Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
sunnights committed Jan 21, 2020
1 parent ee8fb9b commit b44f83c
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ public class ClusterSupport<T> implements NotifyListener {

protected ConcurrentHashMap<URL, List<URL>> registryUrlsMap = new ConcurrentHashMap<>();
protected ConcurrentHashMap<URL, List<URL>> registryActiveUrlsMap = new ConcurrentHashMap<>();
protected ConcurrentHashMap<String, List<URL>> groupUrlsMap = new ConcurrentHashMap<>();
private int selectNodeCount;

public ClusterSupport(Class<T> interfaceClass, List<URL> registryUrls) {
Expand All @@ -75,9 +74,9 @@ public ClusterSupport(Class<T> interfaceClass, List<URL> registryUrls) {
String urlStr = StringTools.urlDecode(registryUrls.get(0).getParameter(URLParamType.embed.getName()));
this.url = URL.valueOf(urlStr);
protocol = getDecorateProtocol(url.getProtocol());
int connectionCount = this.url.getIntParameter(URLParamType.clientConnectionCount.getName(), URLParamType.clientConnectionCount.getIntValue());
int maxConnectionCount = this.url.getIntParameter(URLParamType.maxConnectionPerGroup.getName(), URLParamType.maxConnectionPerGroup.getIntValue());
int maxClientConnection = this.url.getIntParameter(URLParamType.maxClientConnection.getName(), URLParamType.maxClientConnection.getIntValue());
selectNodeCount = connectionCount / maxClientConnection;
selectNodeCount = maxConnectionCount / maxClientConnection;

if (selectNodeCount != 0) {
executorService = Executors.newScheduledThreadPool(1);
Expand Down Expand Up @@ -196,7 +195,6 @@ public synchronized void notify(URL registryUrl, List<URL> urls) {
// 判断urls中是否包含权重信息,并通知loadbalance。
processWeights(urls);

registryUrlsMap.put(registryUrl, urls);
List<URL> serviceUrls = urls;
if (selectNodeCount > 0 && MotanSwitcherUtil.switcherIsOpenWithDefault("feature.motan.partial.server", true)) {
serviceUrls = selectUrls(registryUrl, urls);
Expand Down Expand Up @@ -243,9 +241,8 @@ protected List<URL> selectUrls(URL registryUrl, List<URL> urls) {
for (Map.Entry<String, List<URL>> entry : groupUrlsMap.entrySet()) {
result.addAll(selectUrlsByGroup(registryUrl, entry.getKey(), entry.getValue()));
}
registryUrlsMap.put(registryUrl, urls);
registryActiveUrlsMap.put(registryUrl, result);
this.groupUrlsMap.clear();
this.groupUrlsMap.putAll(groupUrlsMap);
return result;
}

Expand All @@ -261,21 +258,21 @@ protected List<URL> selectUrlsByGroup(URL registryUrl, String group, List<URL> n
if (activeUrls == null) {
activeUrls = new ArrayList<>();
}
List<URL> oldNotifyUrls = groupUrlsMap.get(group);
if (oldNotifyUrls == null) {
oldNotifyUrls = new ArrayList<>();
List<URL> lastNotifyUrls = registryUrlsMap.get(registryUrl);
if (lastNotifyUrls == null) {
lastNotifyUrls = new ArrayList<>();
}

List<URL> sameUrls = new ArrayList<>(notifyUrls);
sameUrls.retainAll(activeUrls);
Collections.shuffle(sameUrls);
List<URL> addedUrls = new ArrayList<>(notifyUrls);
addedUrls.removeAll(oldNotifyUrls);
addedUrls.removeAll(lastNotifyUrls);
Collections.shuffle(addedUrls);

List<URL> groupUrls = new ArrayList<>();
// 计算重用节点数量
int newCount = Math.round((float) selectNodeCount * addedUrls.size() / (oldNotifyUrls.size() + addedUrls.size()));
int newCount = Math.round((float) selectNodeCount * addedUrls.size() / notifyUrls.size());
int remainCount = selectNodeCount - newCount;
// 至少三分之一的节点参与随机选择
remainCount = Math.min(remainCount, 2 * selectNodeCount / 3);
Expand Down Expand Up @@ -307,6 +304,8 @@ protected List<URL> selectUrlsByGroup(URL registryUrl, String group, List<URL> n
public void checkReferers() {
for (Map.Entry<URL, List<Referer<T>>> entry : registryReferers.entrySet()) {
URL registryUrl = entry.getKey();
LoggerUtil.info("ClusterSupport checkReferers: registry={} service={}",
registryUrl.getUri(), url.getIdentity());
int available = 0;
for (Referer<T> referer : entry.getValue()) {
if (referer.isAvailable()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public enum URLParamType {
* pool max conn number
**/
maxClientConnection("maxClientConnection", 10),
clientConnectionCount("clientConnectionCount", 0),
maxConnectionPerGroup("maxConnectionPerGroup", 0),
/**
* pool max conn number
**/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@

package com.weibo.api.motan.config;

import java.util.Map;

import com.weibo.api.motan.config.annotation.ConfigDesc;

import java.util.Map;

/**
*
* protocol
Expand Down Expand Up @@ -48,6 +48,7 @@ public class ProtocolConfig extends AbstractConfig {
protected Integer minClientConnection;
// client最大连接数
protected Integer maxClientConnection;
protected Integer maxConnectionPerGroup;
// 最小工作pool线程数
protected Integer minWorkerThread;
// 最大工作pool线程数
Expand Down Expand Up @@ -164,6 +165,14 @@ public void setMaxClientConnection(Integer maxClientConnection) {
this.maxClientConnection = maxClientConnection;
}

public Integer getMaxConnectionPerGroup() {
return maxConnectionPerGroup;
}

public void setMaxConnectionPerGroup(Integer maxConnectionPerGroup) {
this.maxConnectionPerGroup = maxConnectionPerGroup;
}

public Integer getMinWorkerThread() {
return minWorkerThread;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class SelectUrlsTest {
private static List<URL> mockRegistryUrls() {
URL refUrl = new URL(MotanConstants.PROTOCOL_MOTAN, NetUtils.getLocalAddress().getHostAddress(), 0, IHello.class.getName());
refUrl.addParameter(URLParamType.check.getName(), "false");
refUrl.addParameter(URLParamType.clientConnectionCount.getName(), String.valueOf(count * URLParamType.maxClientConnection.getIntValue()));
refUrl.addParameter(URLParamType.maxConnectionPerGroup.getName(), String.valueOf(count * URLParamType.maxClientConnection.getIntValue()));

URL url1 = new URL("reg_1", "192.168.1.1", 18081, RegistryService.class.getName());
url1.addParameter(URLParamType.embed.getName(), StringTools.urlEncode(refUrl.toFullStr()));
Expand Down

0 comments on commit b44f83c

Please sign in to comment.