Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support export at random port #912

Merged
merged 2 commits into from
Sep 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,10 @@ public class ClusterSupport<T> implements NotifyListener {
private int selectNodeCount;
private ConcurrentHashMap<URL, Map<String, GroupUrlsSelector>> registryGroupUrlsSelectorMap = new ConcurrentHashMap<>();

public ClusterSupport(Class<T> interfaceClass, List<URL> registryUrls) {
public ClusterSupport(Class<T> interfaceClass, List<URL> registryUrls, URL refUrl) {
this.registryUrls = registryUrls;
this.interfaceClass = interfaceClass;
String urlStr = StringTools.urlDecode(registryUrls.get(0).getParameter(URLParamType.embed.getName()));
this.url = URL.valueOf(urlStr);
this.url = refUrl;
protocol = getDecorateProtocol(url.getProtocol());
int maxConnectionCount = this.url.getIntParameter(URLParamType.maxConnectionPerGroup.getName(), URLParamType.maxConnectionPerGroup.getIntValue());
int maxClientConnection = this.url.getIntParameter(URLParamType.maxClientConnection.getName(), URLParamType.maxClientConnection.getIntValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ public class AbstractInterfaceConfig extends AbstractConfig {
// 注册中心的配置列表
protected List<RegistryConfig> registries;

// 解析后的所有注册中心url
protected List<URL> registryUrls = new ArrayList<>();

// 扩展配置点
protected ExtConfig extConfig;

Expand Down Expand Up @@ -371,8 +374,9 @@ public void setTransExceptionStack(Boolean transExceptionStack) {
this.transExceptionStack = transExceptionStack;
}

protected List<URL> loadRegistryUrls() {
List<URL> registryList = new ArrayList<URL>();
// 解析注册中心URL
protected void loadRegistryUrls() {
registryUrls.clear();
if (registries != null && !registries.isEmpty()) {
for (RegistryConfig config : registries) {
String address = config.getAddress();
Expand All @@ -399,12 +403,11 @@ protected List<URL> loadRegistryUrls() {
if (urls != null && !urls.isEmpty()) {
for (URL url : urls) {
url.removeParameter(URLParamType.protocol.getName());
registryList.add(url);
registryUrls.add(url);
}
}
}
}
return registryList;
}

protected void checkInterfaceAndMethods(Class<?> interfaceClass, List<MethodConfig> methods) {
Expand Down Expand Up @@ -449,12 +452,12 @@ protected void checkInterfaceAndMethods(Class<?> interfaceClass, List<MethodConf
}
}

protected String getLocalHostAddress(List<URL> registryURLs) {
protected String getLocalHostAddress() {

String localAddress = null;

Map<String, Integer> regHostPorts = new HashMap<String, Integer>();
for (URL ru : registryURLs) {
for (URL ru : registryUrls) {
if (StringUtils.isNotBlank(ru.getHost()) && ru.getPort() > 0) {
regHostPorts.put(ru.getHost(), ru.getPort());
}
Expand All @@ -479,4 +482,8 @@ public Integer getSlowThreshold() {
public void setSlowThreshold(int slowThreshold) {
this.slowThreshold = slowThreshold;
}

public List<URL> getRegistryUrls() {
return registryUrls;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ public synchronized void initRef() {

ConfigHandler configHandler = ExtensionLoader.getExtensionLoader(ConfigHandler.class).getExtension(MotanConstants.DEFAULT_VALUE);

List<URL> registryUrls = loadRegistryUrls();
String localIp = getLocalHostAddress(registryUrls);
loadRegistryUrls();
String localIp = getLocalHostAddress();
for (ProtocolConfig protocol : protocols) {
Map<String, String> params = new HashMap<>();
params.put(URLParamType.nodeType.getName(), MotanConstants.NODE_TYPE_REFERER);
Expand All @@ -135,7 +135,7 @@ public synchronized void initRef() {

String path = StringUtils.isBlank(serviceInterface) ? interfaceClass.getName() : serviceInterface;
URL refUrl = new URL(protocol.getName(), localIp, MotanConstants.DEFAULT_INT_VALUE, path, params);
ClusterSupport<T> clusterSupport = createClusterSupport(refUrl, configHandler, registryUrls);
ClusterSupport<T> clusterSupport = createClusterSupport(refUrl, configHandler);

clusterSupports.add(clusterSupport);
clusters.add(clusterSupport.getCluster());
Expand All @@ -151,7 +151,7 @@ public synchronized void initRef() {
initialized.set(true);
}

private ClusterSupport<T> createClusterSupport(URL refUrl, ConfigHandler configHandler, List<URL> registryUrls) {
private ClusterSupport<T> createClusterSupport(URL refUrl, ConfigHandler configHandler) {
List<URL> regUrls = new ArrayList<>();

// 如果用户指定directUrls 或者 injvm协议访问,则使用local registry
Expand Down Expand Up @@ -190,10 +190,7 @@ private ClusterSupport<T> createClusterSupport(URL refUrl, ConfigHandler configH
}
}

for (URL url : regUrls) {
url.addParameter(URLParamType.embed.getName(), StringTools.urlEncode(refUrl.toFullStr()));
}
return configHandler.buildClusterSupport(interfaceClass, regUrls);
return configHandler.buildClusterSupport(interfaceClass, regUrls, refUrl);
}

public synchronized void destroy() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import com.weibo.api.motan.util.ConcurrentHashSet;
import com.weibo.api.motan.util.LoggerUtil;
import com.weibo.api.motan.util.NetUtils;
import com.weibo.api.motan.util.StringTools;
import org.apache.commons.lang3.StringUtils;

import java.util.*;
Expand All @@ -56,9 +55,6 @@ public class ServiceConfig<T> extends AbstractServiceConfig {
private Class<T> interfaceClass;
private BasicServiceInterfaceConfig basicService;
private AtomicBoolean exported = new AtomicBoolean(false);
// service的用于注册的url,用于管理service注册的生命周期,url为regitry url,内部嵌套service url。
private ConcurrentHashSet<URL> registereUrls = new ConcurrentHashSet<URL>();

public static ConcurrentHashSet<String> getExistingServices() {
return existingServices;
}
Expand Down Expand Up @@ -114,7 +110,7 @@ public synchronized void export() {

checkInterfaceAndMethods(interfaceClass, methods);

List<URL> registryUrls = loadRegistryUrls();
loadRegistryUrls();
if (registryUrls == null || registryUrls.size() == 0) {
throw new IllegalStateException("Should set registry config for service:" + interfaceClass.getName());
}
Expand All @@ -126,7 +122,7 @@ public synchronized void export() {
throw new MotanServiceException(String.format("Unknow port in service:%s, protocol:%s", interfaceClass.getName(),
protocolConfig.getId()));
}
doExport(protocolConfig, port, registryUrls);
doExport(protocolConfig, port);
}

afterExport();
Expand All @@ -139,14 +135,14 @@ public synchronized void unexport() {
try {
ConfigHandler configHandler =
ExtensionLoader.getExtensionLoader(ConfigHandler.class).getExtension(MotanConstants.DEFAULT_VALUE);
configHandler.unexport(exporters, registereUrls);
configHandler.unexport(exporters, registryUrls);
} finally {
afterUnexport();
}
}

@SuppressWarnings("unchecked")
private void doExport(ProtocolConfig protocolConfig, int port, List<URL> registryURLs) {
private void doExport(ProtocolConfig protocolConfig, int port) {
String protocolName = protocolConfig.getName();
if (protocolName == null || protocolName.length() == 0) {
protocolName = URLParamType.protocol.getValue();
Expand All @@ -157,7 +153,7 @@ private void doExport(ProtocolConfig protocolConfig, int port, List<URL> registr
hostAddress = basicService.getHost();
}
if (NetUtils.isInvalidLocalHost(hostAddress)) {
hostAddress = getLocalHostAddress(registryURLs);
hostAddress = getLocalHostAddress();
}

Map<String, String> map = new HashMap<String, String>();
Expand All @@ -182,7 +178,7 @@ private void doExport(ProtocolConfig protocolConfig, int port, List<URL> registr
// injvm 协议只支持注册到本地,其他协议可以注册到local、remote
if (MotanConstants.PROTOCOL_INJVM.equals(protocolConfig.getId())) {
URL localRegistryUrl = null;
for (URL ru : registryURLs) {
for (URL ru : registryUrls) {
if (MotanConstants.REGISTRY_PROTOCOL_LOCAL.equals(ru.getProtocol())) {
localRegistryUrl = ru.createCopy();
break;
Expand All @@ -196,19 +192,14 @@ private void doExport(ProtocolConfig protocolConfig, int port, List<URL> registr

urls.add(localRegistryUrl);
} else {
for (URL ru : registryURLs) {
for (URL ru : registryUrls) {
urls.add(ru.createCopy());
}
}

for (URL u : urls) {
u.addParameter(URLParamType.embed.getName(), StringTools.urlEncode(serviceUrl.toFullStr()));
registereUrls.add(u.createCopy());
}

ConfigHandler configHandler = ExtensionLoader.getExtensionLoader(ConfigHandler.class).getExtension(MotanConstants.DEFAULT_VALUE);

exporters.add(configHandler.export(interfaceClass, ref, urls));
exporters.add(configHandler.export(interfaceClass, ref, urls, serviceUrl));
}

private void afterExport() {
Expand All @@ -222,10 +213,8 @@ private void afterUnexport() {
exported.set(false);
for (Exporter<T> ep : exporters) {
existingServices.remove(ep.getProvider().getUrl().getIdentity());
exporters.remove(ep);
}
exporters.clear();
registereUrls.clear();
}

@ConfigDesc(excluded = true)
Expand Down Expand Up @@ -256,9 +245,4 @@ public void setHost(String host) {
public AtomicBoolean getExported() {
return exported;
}

public ConcurrentHashSet<URL> getRegistereUrls() {
return registereUrls;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@

package com.weibo.api.motan.config.handler;

import java.util.Collection;
import java.util.List;

import com.weibo.api.motan.cluster.Cluster;
import com.weibo.api.motan.cluster.support.ClusterSupport;
import com.weibo.api.motan.core.extension.Scope;
import com.weibo.api.motan.core.extension.Spi;
import com.weibo.api.motan.rpc.Exporter;
import com.weibo.api.motan.rpc.URL;

import java.util.Collection;
import java.util.List;

/**
*
* Handle urls which are from config.
Expand All @@ -36,11 +36,11 @@
@Spi(scope = Scope.SINGLETON)
public interface ConfigHandler {

<T> ClusterSupport<T> buildClusterSupport(Class<T> interfaceClass, List<URL> registryUrls);
<T> ClusterSupport<T> buildClusterSupport(Class<T> interfaceClass, List<URL> registryUrls, URL refUrl);

<T> T refer(Class<T> interfaceClass, List<Cluster<T>> cluster, String proxyType);

<T> Exporter<T> export(Class<T> interfaceClass, T ref, List<URL> registryUrls);
<T> Exporter<T> export(Class<T> interfaceClass, T ref, List<URL> registryUrls, URL serviceUrl);

<T> void unexport(List<Exporter<T>> exporters, Collection<URL> registryUrls);
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import com.weibo.api.motan.registry.RegistryFactory;
import com.weibo.api.motan.rpc.*;
import com.weibo.api.motan.util.LoggerUtil;
import com.weibo.api.motan.util.StringTools;

import java.util.Collection;
import java.util.List;
Expand All @@ -49,8 +48,8 @@
public class SimpleConfigHandler implements ConfigHandler {

@Override
public <T> ClusterSupport<T> buildClusterSupport(Class<T> interfaceClass, List<URL> registryUrls) {
ClusterSupport<T> clusterSupport = new ClusterSupport<T>(interfaceClass, registryUrls);
public <T> ClusterSupport<T> buildClusterSupport(Class<T> interfaceClass, List<URL> registryUrls, URL refUrl) {
ClusterSupport<T> clusterSupport = new ClusterSupport<T>(interfaceClass, registryUrls, refUrl);
clusterSupport.init();

return clusterSupport;
Expand All @@ -63,11 +62,7 @@ public <T> T refer(Class<T> interfaceClass, List<Cluster<T>> clusters, String pr
}

@Override
public <T> Exporter<T> export(Class<T> interfaceClass, T ref, List<URL> registryUrls) {

String serviceStr = StringTools.urlDecode(registryUrls.get(0).getParameter(URLParamType.embed.getName()));
URL serviceUrl = URL.valueOf(serviceStr);

public <T> Exporter<T> export(Class<T> interfaceClass, T ref, List<URL> registryUrls, URL serviceUrl) {
// export service
// 利用protocol decorator来增加filter特性
String protocolName = serviceUrl.getParameter(URLParamType.protocol.getName(), URLParamType.protocol.getValue());
Expand All @@ -93,17 +88,13 @@ protected <T> Provider<T> getProvider(Protocol protocol, T proxyImpl, URL url, C

@Override
public <T> void unexport(List<Exporter<T>> exporters, Collection<URL> registryUrls) {
try {
unRegister(registryUrls);
} catch (Exception e1) {
LoggerUtil.warn("Exception when unregister urls:" + registryUrls);
}
try {
for (Exporter<T> exporter : exporters) {
for (Exporter<T> exporter : exporters) {
try {
unRegister(registryUrls, exporter.getUrl());
exporter.unexport();
} catch (Exception e) {
LoggerUtil.warn("Exception when unexport exporters:" + exporters);
}
} catch (Exception e) {
LoggerUtil.warn("Exception when unexport exporters:" + exporters);
}
}

Expand All @@ -122,13 +113,10 @@ private void register(List<URL> registryUrls, URL serviceUrl) {
}
}

private void unRegister(Collection<URL> registryUrls) {
private void unRegister(Collection<URL> registryUrls, URL serviceUrl) {
for (URL url : registryUrls) {
// 不管check的设置如何,做完所有unregistry,做好清理工作
try {
String serviceStr = StringTools.urlDecode(url.getParameter(URLParamType.embed.getName()));
URL serviceUrl = URL.valueOf(serviceStr);

RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension(url.getProtocol());
Registry registry = registryFactory.getRegistry(url);
registry.unregister(serviceUrl);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public <T> Exporter<T> export(Provider<T> provider, URL url) {
exporter = createExporter(provider, url);
exporter.init();

protocolKey = MotanFrameworkUtil.getProtocolKey(url);// rebuild protocolKey,maybe port change when using random port
exporterMap.put(protocolKey, exporter);

LoggerUtil.info(this.getClass().getSimpleName() + " export Success: url=" + url);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.weibo.api.motan.common.URLParamType;
import com.weibo.api.motan.core.extension.ExtensionLoader;
import com.weibo.api.motan.exception.MotanFrameworkException;
import com.weibo.api.motan.rpc.AbstractExporter;
import com.weibo.api.motan.rpc.Exporter;
import com.weibo.api.motan.rpc.Provider;
Expand Down Expand Up @@ -76,7 +77,16 @@ public void unexport() {

@Override
protected boolean doInit() {
return server.open();
boolean result = server.open();
if (result && getUrl().getPort() == 0){ // use random port
ProviderMessageRouter requestRouter = this.ipPort2RequestRouter.remove(getUrl().getServerPortStr());
if (requestRouter == null){
throw new MotanFrameworkException("can not find message router. url:" + getUrl().getIdentity());
}
updateRealServerPort(server.getLocalAddress().getPort());
this.ipPort2RequestRouter.put(getUrl().getServerPortStr(), requestRouter);
}
return result;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
*
* @author maijunsheng
* @version 创建时间:2013-5-21
*
* AbstractExporter
*/
public abstract class AbstractExporter<T> extends AbstractNode implements Exporter<T> {
protected Provider<T> provider;
Expand All @@ -39,4 +39,12 @@ public Provider<T> getProvider() {
public String desc() {
return "[" + this.getClass().getSimpleName() + "] url=" + url;
}

/**
* update real listened port
* @param port
*/
protected void updateRealServerPort(int port){
getUrl().setPort(port);
}
}
Loading