Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
sunnights committed Mar 4, 2020
1 parent b44f83c commit bb9c715
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.weibo.api.motan.cluster.support;

import com.weibo.api.motan.closable.Closable;
import com.weibo.api.motan.closable.ShutDownHook;
import com.weibo.api.motan.cluster.Cluster;
import com.weibo.api.motan.cluster.HaStrategy;
Expand Down Expand Up @@ -56,16 +55,31 @@
public class ClusterSupport<T> implements NotifyListener {

private static ConcurrentHashMap<String, Protocol> protocols = new ConcurrentHashMap<>();
private static ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
private static Set<ClusterSupport> refreshSet = new HashSet<>();

static {
executorService.scheduleAtFixedRate(() -> {
for (ClusterSupport clusterSupport : refreshSet) {
clusterSupport.refreshReferers();
}
}, MotanConstants.REFRESH_PERIOD, MotanConstants.REFRESH_PERIOD, TimeUnit.SECONDS);

ShutDownHook.registerShutdownHook(() -> {
if (!executorService.isShutdown()) {
executorService.shutdown();
}
});
}

protected ConcurrentHashMap<URL, List<URL>> registryUrlsMap = new ConcurrentHashMap<>();
protected ConcurrentHashMap<URL, List<URL>> registryActiveUrlsMap = new ConcurrentHashMap<>();
private Cluster<T> cluster;
private List<URL> registryUrls;
private URL url;
private Class<T> interfaceClass;
private Protocol protocol;
private ConcurrentHashMap<URL, List<Referer<T>>> registryReferers = new ConcurrentHashMap<>();
public ScheduledExecutorService executorService;

protected ConcurrentHashMap<URL, List<URL>> registryUrlsMap = new ConcurrentHashMap<>();
protected ConcurrentHashMap<URL, List<URL>> registryActiveUrlsMap = new ConcurrentHashMap<>();
private int selectNodeCount;

public ClusterSupport(Class<T> interfaceClass, List<URL> registryUrls) {
Expand All @@ -77,25 +91,6 @@ public ClusterSupport(Class<T> interfaceClass, List<URL> registryUrls) {
int maxConnectionCount = this.url.getIntParameter(URLParamType.maxConnectionPerGroup.getName(), URLParamType.maxConnectionPerGroup.getIntValue());
int maxClientConnection = this.url.getIntParameter(URLParamType.maxClientConnection.getName(), URLParamType.maxClientConnection.getIntValue());
selectNodeCount = maxConnectionCount / maxClientConnection;

if (selectNodeCount != 0) {
executorService = Executors.newScheduledThreadPool(1);
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
checkReferers();
}
}, MotanConstants.CHECK_PERIOD, MotanConstants.CHECK_PERIOD, TimeUnit.SECONDS);

ShutDownHook.registerShutdownHook(new Closable() {
@Override
public void close() {
if (!executorService.isShutdown()) {
executorService.shutdown();
}
}
});
}
}

public void init() {
Expand Down Expand Up @@ -181,7 +176,7 @@ private URL toSubscribeUrl(URL url) {
public synchronized void notify(URL registryUrl, List<URL> urls) {
if (CollectionUtil.isEmpty(urls)) {
onRegistryEmpty(registryUrl);
LoggerUtil.warn("ClusterSupport config change notify, urls is empty: registry={} service={} urls={}", registryUrl.getUri(),
LoggerUtil.warn("ClusterSupport config change notify, urls is empty: registry={} service={} urls=[]", registryUrl.getUri(),
url.getIdentity());
return;
}
Expand All @@ -197,7 +192,10 @@ public synchronized void notify(URL registryUrl, List<URL> urls) {

List<URL> serviceUrls = urls;
if (selectNodeCount > 0 && MotanSwitcherUtil.switcherIsOpenWithDefault("feature.motan.partial.server", true)) {
refreshSet.add(this);
serviceUrls = selectUrls(registryUrl, urls);
} else {
refreshSet.remove(this);
}
List<Referer<T>> newReferers = new ArrayList<>();
for (URL u : serviceUrls) {
Expand Down Expand Up @@ -301,20 +299,19 @@ protected List<URL> selectUrlsByGroup(URL registryUrl, String group, List<URL> n
return result;
}

public void checkReferers() {
public void refreshReferers() {
for (Map.Entry<URL, List<Referer<T>>> entry : registryReferers.entrySet()) {
URL registryUrl = entry.getKey();
LoggerUtil.info("ClusterSupport checkReferers: registry={} service={}",
registryUrl.getUri(), url.getIdentity());
LoggerUtil.info("ClusterSupport refreshReferers: registry={} service={}", registryUrl.getUri(), url.getIdentity());
int available = 0;
for (Referer<T> referer : entry.getValue()) {
if (referer.isAvailable()) {
available++;
}
}
List<URL> activeUrls = registryUrlsMap.get(registryUrl);
if (activeUrls.size() > selectNodeCount && available < selectNodeCount) {
notify(registryUrl, activeUrls);
List<URL> urls = registryUrlsMap.get(registryUrl);
if (urls.size() > selectNodeCount && available < selectNodeCount) {
notify(registryUrl, urls);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public class MotanConstants {
public static final String DEFAULT_CHARACTER = "utf-8";
public static final int SLOW_COST = 50; // 50ms
public static final int STATISTIC_PEROID = 30; // 30 seconds
public static final int CHECK_PERIOD = 60;
public static final int REFRESH_PERIOD = 60;
public static final String ASYNC_SUFFIX = "Async";// suffix for async call.
public static final String APPLICATION_STATISTIC = "statisitic";
public static final String REQUEST_REMOTE_ADDR = "requestRemoteAddress";
Expand Down

0 comments on commit bb9c715

Please sign in to comment.