Skip to content

Commit

Permalink
Merge pull request #990 from weibocom/feat/export_additional_group
Browse files Browse the repository at this point in the history
Feat/export additional group
  • Loading branch information
rayzhang0603 authored Aug 3, 2022
2 parents 4f1f473 + 180da70 commit a87996c
Show file tree
Hide file tree
Showing 16 changed files with 284 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,11 @@ public class MotanConstants {
// ------------------ attachment constants -----------------
public static final String ATT_PRINT_TRACE_LOG = "print_trace_log"; // 针对单请求是否打印(access)日志

// ------------------ common env name -----------------
public static final String ENV_ADDITIONAL_GROUP = "MOTAN_SERVICE_ADDITIONAL_GROUP"; //motan service 追加导出分组。例如可以自动追加云平台上的分组
public static final String ENV_MESH_PROXY = "MOTAN_MESH_PROXY"; //使用mesh代理motan请求的环境变量名
public static final String ENV_MOTAN_IP_PREFIX = "MOTAN_IP_PREFIX";

private MotanConstants() {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ public class AbstractInterfaceConfig extends AbstractConfig {

protected String mixGroups;

protected Boolean asyncInitConnection;

protected Integer fusingThreshold;

private Integer connectTimeout;

public Integer getRetries() {
return retries;
}
Expand Down Expand Up @@ -380,6 +386,34 @@ public void setMixGroups(String mixGroups) {
this.mixGroups = mixGroups;
}

public void setRegistryUrls(List<URL> registryUrls) {
this.registryUrls = registryUrls;
}

public Boolean getAsyncInitConnection() {
return asyncInitConnection;
}

public void setAsyncInitConnection(Boolean asyncInitConnection) {
this.asyncInitConnection = asyncInitConnection;
}

public Integer getFusingThreshold() {
return fusingThreshold;
}

public void setFusingThreshold(Integer fusingThreshold) {
this.fusingThreshold = fusingThreshold;
}

public Integer getConnectTimeout() {
return connectTimeout;
}

public void setConnectTimeout(Integer connectTimeout) {
this.connectTimeout = connectTimeout;
}

// 解析注册中心URL
protected void loadRegistryUrls() {
registryUrls.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.weibo.api.motan.config;

/**
*
* Abstract referer config.
*
* @author fishermen
Expand All @@ -34,7 +33,6 @@ public abstract class AbstractRefererConfig extends AbstractInterfaceConfig {
protected String p99;
protected String p999;
protected String errorRate;
protected Boolean asyncInitConnection;

public String getMean() {
return mean;
Expand Down Expand Up @@ -75,13 +73,4 @@ public String getErrorRate() {
public void setErrorRate(String errorRate) {
this.errorRate = errorRate;
}

public Boolean getAsyncInitConnection() {
return asyncInitConnection;
}

public void setAsyncInitConnection(Boolean asyncInitConnection) {
this.asyncInitConnection = asyncInitConnection;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.weibo.api.motan.util.ConcurrentHashSet;
import com.weibo.api.motan.util.LoggerUtil;
import com.weibo.api.motan.util.NetUtils;
import com.weibo.api.motan.util.StringTools;
import org.apache.commons.lang3.StringUtils;

import java.util.*;
Expand All @@ -43,15 +44,15 @@
public class ServiceConfig<T> extends AbstractServiceConfig {

private static final long serialVersionUID = -3342374271064293224L;
private static ConcurrentHashSet<String> existingServices = new ConcurrentHashSet<String>();
private static ConcurrentHashSet<String> existingServices = new ConcurrentHashSet<>();
// 具体到方法的配置
protected List<MethodConfig> methods;

// 接口实现类引用
private T ref;

// service 对应的exporters,用于管理service服务的生命周期
private List<Exporter<T>> exporters = new CopyOnWriteArrayList<Exporter<T>>();
private List<Exporter<T>> exporters = new CopyOnWriteArrayList<>();
private Class<T> interfaceClass;
private BasicServiceInterfaceConfig basicService;
private AtomicBoolean exported = new AtomicBoolean(false);
Expand Down Expand Up @@ -120,7 +121,7 @@ public synchronized void export() {
for (ProtocolConfig protocolConfig : protocols) {
Integer port = protocolPorts.get(protocolConfig.getId());
if (port == null) {
throw new MotanServiceException(String.format("Unknow port in service:%s, protocol:%s", interfaceClass.getName(),
throw new MotanServiceException(String.format("Unknown port in service:%s, protocol:%s", interfaceClass.getName(),
protocolConfig.getId()));
}
doExport(protocolConfig, port);
Expand All @@ -142,7 +143,6 @@ public synchronized void unexport() {
}
}

@SuppressWarnings("unchecked")
private void doExport(ProtocolConfig protocolConfig, int port) {
String protocolName = protocolConfig.getName();
if (protocolName == null || protocolName.length() == 0) {
Expand All @@ -157,7 +157,7 @@ private void doExport(ProtocolConfig protocolConfig, int port) {
hostAddress = getLocalHostAddress();
}

Map<String, String> map = new HashMap<String, String>();
Map<String, String> map = new HashMap<>();

map.put(URLParamType.nodeType.getName(), MotanConstants.NODE_TYPE_SERVICE);
map.put(URLParamType.refreshTimestamp.getName(), String.valueOf(System.currentTimeMillis()));
Expand All @@ -167,31 +167,33 @@ private void doExport(ProtocolConfig protocolConfig, int port) {

URL serviceUrl = new URL(protocolName, hostAddress, port, interfaceClass.getName(), map);

String groupString = serviceUrl.getParameter(URLParamType.group.getName(), ""); // do not with default group value
String additionalGroup = System.getenv(MotanConstants.ENV_ADDITIONAL_GROUP);
if (StringUtils.isNotBlank(additionalGroup)) { // check additional groups
groupString = StringUtils.isBlank(groupString) ? additionalGroup : groupString + "," + additionalGroup;
serviceUrl.addParameter(URLParamType.group.getName(), groupString);
}
// check multi group.
String groupString = serviceUrl.getGroup();
if (groupString.contains(MotanConstants.COMMA_SEPARATOR)){
String[] groups = groupString.split(MotanConstants.COMMA_SEPARATOR);
for (String group : groups){
if (StringUtils.isNotBlank(group)){ // create new service for each group
URL newGroupServiceUrl = serviceUrl.createCopy();
newGroupServiceUrl.addParameter(URLParamType.group.getName(), group.trim());
exportService(hostAddress, protocolName, newGroupServiceUrl);
}
if (groupString.contains(MotanConstants.COMMA_SEPARATOR)) {
for (String group : StringTools.splitSet(groupString, MotanConstants.COMMA_SEPARATOR)) {
URL newGroupServiceUrl = serviceUrl.createCopy();
newGroupServiceUrl.addParameter(URLParamType.group.getName(), group);
exportService(hostAddress, protocolName, newGroupServiceUrl);
}
}else {
} else {
exportService(hostAddress, protocolName, serviceUrl);
}
}

private void exportService(String hostAddress, String protocol, URL serviceUrl){
private void exportService(String hostAddress, String protocol, URL serviceUrl) {
if (serviceExists(serviceUrl)) {
LoggerUtil.warn(String.format("%s configService is malformed, for same service (%s) already exists ", interfaceClass.getName(),
serviceUrl.getIdentity()));
throw new MotanFrameworkException(String.format("%s configService is malformed, for same service (%s) already exists ",
interfaceClass.getName(), serviceUrl.getIdentity()), MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);
}
LoggerUtil.info("export for service url :" + serviceUrl.toFullStr());
List<URL> urls = new ArrayList<URL>();
List<URL> urls = new ArrayList<>();

// injvm 协议只支持注册到本地,其他协议可以注册到local、remote
if (MotanConstants.PROTOCOL_INJVM.equals(protocol)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
/*
* Copyright 2009-2016 Weibo, Inc.
*
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
Expand All @@ -26,13 +26,11 @@
import com.weibo.api.motan.filter.InitializableFilter;
import com.weibo.api.motan.rpc.*;
import com.weibo.api.motan.util.LoggerUtil;
import com.weibo.api.motan.util.StringTools;
import org.apache.commons.lang3.StringUtils;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.*;

import static com.weibo.api.motan.common.MotanConstants.DISABLE_FILTER_PREFIX;

Expand Down Expand Up @@ -210,7 +208,7 @@ public T getImpl() {
*/
protected List<Filter> getFilters(URL url, String key) {
// load default filters
List<Filter> filters = new ArrayList<Filter>();
List<Filter> filters = new ArrayList<>();
List<Filter> defaultFilters = ExtensionLoader.getExtensionLoader(Filter.class).getExtensions(key);
if (defaultFilters != null && defaultFilters.size() > 0) {
filters.addAll(defaultFilters);
Expand All @@ -220,25 +218,26 @@ protected List<Filter> getFilters(URL url, String key) {
String filterStr = url.getParameter(URLParamType.filter.getName());
if (StringUtils.isNotBlank(filterStr)) {
HashSet<String> removedFilters = new HashSet<>();
String[] filterNames = MotanConstants.COMMA_SPLIT_PATTERN.split(filterStr);
Set<String> filterNames = StringTools.splitSet(filterStr, MotanConstants.COMMA_SEPARATOR);
for (String fn : filterNames) {
if (StringUtils.isBlank(fn)) {
continue;
}
fn = fn.trim();
if (fn.startsWith(DISABLE_FILTER_PREFIX)){ // disable filter
if (fn.length() > DISABLE_FILTER_PREFIX.length()){
if (fn.startsWith(DISABLE_FILTER_PREFIX)) { // disable filter
if (fn.length() > DISABLE_FILTER_PREFIX.length()) {
removedFilters.add(fn.substring(DISABLE_FILTER_PREFIX.length()).trim());
}
}else {
addIfAbsent(filters, fn);
} else {
Filter extFilter = ExtensionLoader.getExtensionLoader(Filter.class).getExtension(fn, false);
if (extFilter == null) {
LoggerUtil.warn("filter extension not found. filer name: " + fn);
continue;
}
filters.add(extFilter);
}
}

// remove disabled filters
if (!removedFilters.isEmpty()){
if (!removedFilters.isEmpty()) {
for (String removedName : removedFilters) {
filters.removeIf((filter)-> removedName.equals(filter.getClass().getAnnotation(SpiMeta.class).name()));
filters.removeIf((filter) -> removedName.equals(filter.getClass().getAnnotation(SpiMeta.class).name()));
}
}
}
Expand All @@ -248,24 +247,4 @@ protected List<Filter> getFilters(URL url, String key) {
Collections.reverse(filters);
return filters;
}

private void addIfAbsent(List<Filter> filters, String extensionName) {
Filter extFilter = ExtensionLoader.getExtensionLoader(Filter.class).getExtension(extensionName, false);
if (extFilter == null) {
LoggerUtil.warn("filter extension not found. filer name: " + extensionName);
return;
}

boolean exists = false;
for (Filter f : filters) {
if (f.getClass() == extFilter.getClass()) {
exists = true;
break;
}
}
if (!exists) {
filters.add(extFilter);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package com.weibo.api.motan.util;

import com.weibo.api.motan.common.MotanConstants;
import com.weibo.api.motan.common.URLParamType;
import com.weibo.api.motan.core.extension.ExtensionLoader;
import com.weibo.api.motan.registry.RegistryFactory;
Expand All @@ -32,8 +33,6 @@
* @date 2022/7/14.
*/
public class MeshProxyUtil {
public static final String MESH_PROXY_ENV_NAME = "MOTAN_MESH_PROXY"; //使用mesh代理motan请求的环境变量名

// config keys
private static final String MODE_KEY = "mode"; // proxy type key
private static final String PORT_KEY = "port"; // mesh transport port for client end
Expand Down Expand Up @@ -155,7 +154,7 @@ private static String getValue(Map<String, String> configs, String key, String d
// 检查是否支持mesh proxy
private static void initCheck() {
// check env set
String meshProxyString = System.getenv(MESH_PROXY_ENV_NAME);
String meshProxyString = System.getenv(MotanConstants.ENV_MESH_PROXY);
if (StringUtils.isNotBlank(meshProxyString)) {
LoggerUtil.info("find MOTAN_MESH_PROXY env, value:" + meshProxyString);
proxyConfig = parseProxyConfig(meshProxyString);
Expand Down
Loading

0 comments on commit a87996c

Please sign in to comment.