Skip to content

Commit

Permalink
Merge pull request #682 from sunnights/feature/common-handler
Browse files Browse the repository at this point in the history
feature: support common handler
  • Loading branch information
rayzhang0603 authored May 2, 2018
2 parents 85aa36f + 5b7db53 commit 042f735
Show file tree
Hide file tree
Showing 22 changed files with 498 additions and 253 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class MotanConstants {
public static final String PROTOCOL_INJVM = "injvm";
public static final String PROTOCOL_MOTAN = "motan";
public static final String PROXY_JDK = "jdk";
public static final String PROXY_COMMON = "common";
public static final String PROXY_JAVASSIST = "javassist";
public static final String FRAMEWORK_NAME = "motan";
public static final String PROTOCOL_SWITCHER_PREFIX = "protocol:";
Expand All @@ -69,7 +70,7 @@ public class MotanConstants {
public static final int STATISTIC_PEROID = 30; // 30 seconds
public static final String ASYNC_SUFFIX = "Async";// suffix for async call.
public static final String APPLICATION_STATISTIC = "statisitic";

/**
* netty channel constants start
**/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@
import java.util.concurrent.atomic.AtomicBoolean;

/**
*
*
* Referer config.
*
*
* @author fishermen
* @version V1.0 created at: 2013-5-17
*/
Expand All @@ -49,6 +49,16 @@ public class RefererConfig<T> extends AbstractRefererConfig {

private Class<T> interfaceClass;

private String serviceInterface;

public String getServiceInterface() {
return serviceInterface;
}

public void setServiceInterface(String serviceInterface) {
this.serviceInterface = serviceInterface;
}

// 具体到方法的配置
protected List<MethodConfig> methods;

Expand Down Expand Up @@ -123,7 +133,8 @@ public synchronized void initRef() {
collectConfigParams(params, protocol, basicReferer, extConfig, this);
collectMethodConfigParams(params, this.getMethods());

URL refUrl = new URL(protocol.getName(), localIp, MotanConstants.DEFAULT_INT_VALUE, interfaceClass.getName(), params);
String path = StringUtils.isBlank(serviceInterface) ? interfaceClass.getName() : serviceInterface;
URL refUrl = new URL(protocol.getName(), localIp, MotanConstants.DEFAULT_INT_VALUE, path, params);
ClusterSupport<T> clusterSupport = createClusterSupport(refUrl, configHandler, registryUrls);

clusterSupports.add(clusterSupport);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.weibo.api.motan.exception.MotanFrameworkException;
import com.weibo.api.motan.protocol.support.ProtocolFilterDecorator;
import com.weibo.api.motan.proxy.ProxyFactory;
import com.weibo.api.motan.proxy.RefererInvocationHandler;
import com.weibo.api.motan.registry.Registry;
import com.weibo.api.motan.registry.RegistryFactory;
import com.weibo.api.motan.rpc.*;
Expand All @@ -39,7 +38,7 @@


/**
*
*
* Handle refUrl to get referers, assemble to a cluster, create a proxy
*
* @author fishermen
Expand All @@ -60,7 +59,7 @@ public <T> ClusterSupport<T> buildClusterSupport(Class<T> interfaceClass, List<U
@Override
public <T> T refer(Class<T> interfaceClass, List<Cluster<T>> clusters, String proxyType) {
ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(proxyType);
return proxyFactory.getProxy(interfaceClass, new RefererInvocationHandler<T>(interfaceClass, clusters));
return proxyFactory.getProxy(interfaceClass, clusters);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package com.weibo.api.motan.proxy;

import com.weibo.api.motan.cluster.Cluster;
import com.weibo.api.motan.common.MotanConstants;
import com.weibo.api.motan.common.URLParamType;
import com.weibo.api.motan.core.extension.ExtensionLoader;
import com.weibo.api.motan.exception.MotanErrorMsgConstant;
import com.weibo.api.motan.exception.MotanFrameworkException;
import com.weibo.api.motan.exception.MotanServiceException;
import com.weibo.api.motan.rpc.*;
import com.weibo.api.motan.serialize.DeserializableObject;
import com.weibo.api.motan.switcher.Switcher;
import com.weibo.api.motan.switcher.SwitcherService;
import com.weibo.api.motan.util.ExceptionUtil;
import com.weibo.api.motan.util.LoggerUtil;
import com.weibo.api.motan.util.MotanFrameworkUtil;
import org.apache.commons.lang3.StringUtils;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* @author sunnights
*/
public class AbstractRefererHandler<T> {
protected List<Cluster<T>> clusters;
protected Class<T> clz;
protected SwitcherService switcherService = null;
protected String interfaceName;

void init() {
// clusters 不应该为空
String switchName = this.clusters.get(0).getUrl().getParameter(URLParamType.switcherService.getName(), URLParamType.switcherService.getValue());
switcherService = ExtensionLoader.getExtensionLoader(SwitcherService.class).getExtension(switchName);
}

Object invokeRequest(Request request, Class returnType, boolean async) throws Throwable {
RpcContext curContext = RpcContext.getContext();
curContext.putAttribute(MotanConstants.ASYNC_SUFFIX, async);

// set rpc context attachments to request
Map<String, String> attachments = curContext.getRpcAttachments();
if (!attachments.isEmpty()) {
for (Map.Entry<String, String> entry : attachments.entrySet()) {
request.setAttachment(entry.getKey(), entry.getValue());
}
}

// add to attachment if client request id is set
if (StringUtils.isNotBlank(curContext.getClientRequestId())) {
request.setAttachment(URLParamType.requestIdFromClient.getName(), curContext.getClientRequestId());
}

// 当 referer配置多个protocol的时候,比如A,B,C,
// 那么正常情况下只会使用A,如果A被开关降级,那么就会使用B,B也被降级,那么会使用C
for (Cluster<T> cluster : clusters) {
String protocolSwitcher = MotanConstants.PROTOCOL_SWITCHER_PREFIX + cluster.getUrl().getProtocol();

Switcher switcher = switcherService.getSwitcher(protocolSwitcher);

if (switcher != null && !switcher.isOn()) {
continue;
}

request.setAttachment(URLParamType.version.getName(), cluster.getUrl().getVersion());
request.setAttachment(URLParamType.clientGroup.getName(), cluster.getUrl().getGroup());
// 带上client的application和module
request.setAttachment(URLParamType.application.getName(), cluster.getUrl().getApplication());
request.setAttachment(URLParamType.module.getName(), cluster.getUrl().getModule());

Response response;
boolean throwException = Boolean.parseBoolean(cluster.getUrl().getParameter(URLParamType.throwException.getName(), URLParamType.throwException.getValue()));
try {
response = cluster.call(request);
if (async) {
if (response instanceof ResponseFuture) {
((ResponseFuture) response).setReturnType(returnType);
return response;
} else {
ResponseFuture responseFuture = new DefaultResponseFuture(request, 0, cluster.getUrl());
if (response.getException() != null) {
responseFuture.onFailure(response);
} else {
responseFuture.onSuccess(response);
}
responseFuture.setReturnType(returnType);
return responseFuture;
}
} else {
Object value = response.getValue();
if (value != null && value instanceof DeserializableObject) {
try {
value = ((DeserializableObject) value).deserialize(returnType);
} catch (IOException e) {
LoggerUtil.error("deserialize response value fail! deserialize type:" + returnType, e);
throw new MotanFrameworkException("deserialize return value fail! deserialize type:" + returnType, e);
}
}
return value;
}
} catch (RuntimeException e) {
if (ExceptionUtil.isBizException(e)) {
Throwable t = e.getCause();
// 只抛出Exception,防止抛出远程的Error
if (t != null && t instanceof Exception) {
throw t;
} else {
String msg = t == null ? "biz exception cause is null. origin error msg : " + e.getMessage() : ("biz exception cause is throwable error:" + t.getClass() + ", errmsg:" + t.getMessage());
throw new MotanServiceException(msg, MotanErrorMsgConstant.SERVICE_DEFAULT_ERROR);
}
} else if (!throwException) {
LoggerUtil.warn("RefererInvocationHandler invoke false, so return default value: uri=" + cluster.getUrl().getUri() + " " + MotanFrameworkUtil.toString(request), e);
return getDefaultReturnValue(returnType);
} else {
LoggerUtil.error("RefererInvocationHandler invoke Error: uri=" + cluster.getUrl().getUri() + " " + MotanFrameworkUtil.toString(request), e);
throw e;
}
}
}
throw new MotanServiceException("Referer call Error: cluster not exist, interface=" + interfaceName + " " + MotanFrameworkUtil.toString(request), MotanErrorMsgConstant.SERVICE_UNFOUND);
}

private Object getDefaultReturnValue(Class<?> returnType) {
if (returnType != null && returnType.isPrimitive()) {
return PrimitiveDefault.getDefaultReturnValue(returnType);
}
return null;
}

private static class PrimitiveDefault {
private static boolean defaultBoolean;
private static char defaultChar;
private static byte defaultByte;
private static short defaultShort;
private static int defaultInt;
private static long defaultLong;
private static float defaultFloat;
private static double defaultDouble;

private static Map<Class<?>, Object> primitiveValues = new HashMap<Class<?>, Object>();

static {
primitiveValues.put(boolean.class, defaultBoolean);
primitiveValues.put(char.class, defaultChar);
primitiveValues.put(byte.class, defaultByte);
primitiveValues.put(short.class, defaultShort);
primitiveValues.put(int.class, defaultInt);
primitiveValues.put(long.class, defaultLong);
primitiveValues.put(float.class, defaultFloat);
primitiveValues.put(double.class, defaultDouble);
}

public static Object getDefaultReturnValue(Class<?> returnType) {
return primitiveValues.get(returnType);
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.weibo.api.motan.proxy;

import com.weibo.api.motan.rpc.Request;

/**
* @author sunnights
*/
public interface CommonHandler {
/**
* call a service method with general handler
*
* @param methodName the method name of remote service
* @param arguments an array of objects containing the values of the arguments passed in the method invocation
* @param returnType the class type that the method returns
* @return
* @throws Throwable
*/
Object call(String methodName, Object[] arguments, Class returnType) throws Throwable;

/**
* async call a service with general handler
*
* @param methodName
* @param arguments
* @param returnType
* @return
* @throws Throwable
*/
Object asyncCall(String methodName, Object[] arguments, Class returnType) throws Throwable;

/**
* call a service method with request
*
* @param request
* @param returnType
* @return
*/
Object call(Request request, Class returnType) throws Throwable;

/**
* async call a service with request
*
* @param request
* @param returnType
* @return
*/
Object asyncCall(Request request, Class returnType) throws Throwable;

/**
* build request with methodName and arguments
*
* @param methodName
* @param arguments
* @return
*/
Request buildRequest(String methodName, Object[] arguments);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,19 @@

package com.weibo.api.motan.proxy;

import java.lang.reflect.InvocationHandler;

import com.weibo.api.motan.cluster.Cluster;
import com.weibo.api.motan.core.extension.Spi;

import java.util.List;

/**
*
*
* @author maijunsheng
*
*
*/
@Spi
public interface ProxyFactory {

<T> T getProxy(Class<T> clz, InvocationHandler invocationHandler);
<T> T getProxy(Class<T> clz, List<Cluster<T>> clusters);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package com.weibo.api.motan.proxy;

import com.weibo.api.motan.cluster.Cluster;
import com.weibo.api.motan.rpc.DefaultRequest;
import com.weibo.api.motan.rpc.Request;
import com.weibo.api.motan.util.ReflectUtil;
import com.weibo.api.motan.util.RequestIdGenerator;

import java.util.List;
import java.util.Map;

/**
* @author sunnights
*/
public class RefererCommonHandler<T> extends AbstractRefererHandler<T> implements CommonHandler {

public RefererCommonHandler(String interfaceName, List<Cluster<T>> clusters) {
this.interfaceName = interfaceName;
this.clusters = clusters;
init();
}

public Object call(String methodName, Object[] arguments, Class returnType, Map<String, String> attachments, boolean async) throws Throwable {
DefaultRequest request = new DefaultRequest();
request.setRequestId(RequestIdGenerator.getRequestId());
request.setInterfaceName(interfaceName);
request.setMethodName(methodName);
request.setArguments(arguments);
request.setAttachments(attachments);
request.setParamtersDesc(ReflectUtil.getParamsDesc(arguments));
return invokeRequest(request, returnType, async);
}

@Override
public Object call(String methodName, Object[] arguments, Class returnType) throws Throwable {
return call(methodName, arguments, returnType, null, false);
}

@Override
public Object asyncCall(String methodName, Object[] arguments, Class returnType) throws Throwable {
return call(methodName, arguments, returnType, null, true);
}

@Override
public Object call(Request request, Class returnType) throws Throwable {
return invokeRequest(request, returnType, false);
}

@Override
public Object asyncCall(Request request, Class returnType) throws Throwable {
return invokeRequest(request, returnType, true);
}

@Override
public Request buildRequest(String methodName, Object[] arguments) {
DefaultRequest request = new DefaultRequest();
request.setRequestId(RequestIdGenerator.getRequestId());
request.setInterfaceName(interfaceName);
request.setMethodName(methodName);
request.setArguments(arguments);
request.setParamtersDesc(ReflectUtil.getParamsDesc(arguments));
return request;
}

}
Loading

0 comments on commit 042f735

Please sign in to comment.