diff --git a/motan-core/src/main/java/com/weibo/api/motan/common/MotanConstants.java b/motan-core/src/main/java/com/weibo/api/motan/common/MotanConstants.java index 3a1f1009..dcb4a955 100644 --- a/motan-core/src/main/java/com/weibo/api/motan/common/MotanConstants.java +++ b/motan-core/src/main/java/com/weibo/api/motan/common/MotanConstants.java @@ -142,6 +142,7 @@ public class MotanConstants { public static final String M2_ERROR = "M_e"; public static final String M2_PROCESS_TIME = "M_pt"; + // ------------------ request trace point constants ----------------- public static final String TRACE_INVOKE = "TRACE_INVOKE"; //client 发起请求 public static final String TRACE_CONNECTION = "TRACE_CONNECTION"; // client获取链接 public static final String TRACE_CENCODE = "TRACE_CENCODE"; //client编码 @@ -157,6 +158,9 @@ public class MotanConstants { public static final String TRACE_CRECEIVE = "TRACE_CRECEIVE";// client端接收response public static final String TRACE_CDECODE = "TRACE_CDECODE"; // client端解码response + // ------------------ attachment constants ----------------- + public static final String ATT_PRINT_TRACE_LOG = "print_trace_log"; // 针对单请求是否打印(access)日志 + private MotanConstants() { } diff --git a/motan-core/src/main/java/com/weibo/api/motan/filter/AccessLogFilter.java b/motan-core/src/main/java/com/weibo/api/motan/filter/AccessLogFilter.java index 34fa3ab7..89eba62d 100644 --- a/motan-core/src/main/java/com/weibo/api/motan/filter/AccessLogFilter.java +++ b/motan-core/src/main/java/com/weibo/api/motan/filter/AccessLogFilter.java @@ -20,14 +20,8 @@ import com.weibo.api.motan.common.URLParamType; import com.weibo.api.motan.core.extension.Activation; import com.weibo.api.motan.core.extension.SpiMeta; -import com.weibo.api.motan.rpc.Caller; -import com.weibo.api.motan.rpc.Provider; -import com.weibo.api.motan.rpc.Request; -import com.weibo.api.motan.rpc.Response; -import com.weibo.api.motan.util.LoggerUtil; -import com.weibo.api.motan.util.MotanSwitcherUtil; -import com.weibo.api.motan.util.NetUtils; -import com.weibo.api.motan.util.StringTools; +import com.weibo.api.motan.rpc.*; +import com.weibo.api.motan.util.*; import org.apache.commons.lang3.StringUtils; /** @@ -47,12 +41,14 @@ public class AccessLogFilter implements Filter { public static final String ACCESS_LOG_SWITCHER_NAME = "feature.motan.filter.accessLog"; + public static final String PRINT_TRACE_LOG_SWITCHER_NAME = "feature.motan.printTraceLog.enable"; private String side; private Boolean accessLog; static { - // init global switcher, default value is false + // init global switcher MotanSwitcherUtil.initSwitcher(ACCESS_LOG_SWITCHER_NAME, false); + MotanSwitcherUtil.initSwitcher(PRINT_TRACE_LOG_SWITCHER_NAME, true); } @Override @@ -60,23 +56,63 @@ public Response filter(Caller caller, Request request) { if (accessLog == null) { accessLog = caller.getUrl().getBooleanParameter(URLParamType.accessLog.getName(), URLParamType.accessLog.getBooleanValue()); } - if (accessLog || MotanSwitcherUtil.isOpen(ACCESS_LOG_SWITCHER_NAME)) { - long t1 = System.currentTimeMillis(); + if (accessLog || needLog(request)) { + long start = System.currentTimeMillis(); boolean success = false; + Response response = null; try { - Response response = caller.call(request); - success = true; + response = caller.call(request); + if (response != null && response.getException() == null) { + success = true; + } return response; } finally { - long consumeTime = System.currentTimeMillis() - t1; - logAccess(caller, request, consumeTime, success); + processFinalLog(caller, request, response, start, success); } } else { return caller.call(request); } } - private void logAccess(Caller caller, Request request, long consumeTime, boolean success) { + private void processFinalLog(final Caller caller, final Request request, final Response response, final long start, final boolean success) { + long wholeTime = System.currentTimeMillis() - start; + long segmentTime = wholeTime; // 分段耗时。server侧是内部业务耗时,client侧时server整体耗时+网络接收耗时 + + if (request instanceof Traceable && response instanceof Traceable) { // 可以取得细分时间点 + if (caller instanceof Provider) { // server end + if (response instanceof Callbackable) {// 因server侧完整耗时包括response发送时间,需要通过callback机制异步记录日志。 + long finalSegmentTime = segmentTime; + ((Callbackable) response).addFinishCallback(() -> { + long responseSend = ((Traceable) response).getTraceableContext().getSendTime(); + long requestReceive = ((Traceable) request).getTraceableContext().getReceiveTime(); + long finalWholeTime = responseSend - requestReceive; + logAccess(caller, request, response, finalSegmentTime, finalWholeTime, success); + }, null); + return; + } + } else { // client end + long requestSend = ((Traceable) request).getTraceableContext().getSendTime(); + long responseReceive = ((Traceable) response).getTraceableContext().getReceiveTime(); + segmentTime = responseReceive - requestSend; + } + } + logAccess(caller, request, response, segmentTime, wholeTime, success); // 同步记录access日志 + } + + // 除了access log配置外,其他需要动态打印access的情况 + private boolean needLog(Request request) { + if (MotanSwitcherUtil.isOpen(ACCESS_LOG_SWITCHER_NAME)) { + return true; + } + + // check trace log + if (!MotanSwitcherUtil.isOpen(PRINT_TRACE_LOG_SWITCHER_NAME)) { + return false; + } + return "true".equalsIgnoreCase(request.getAttachments().get(MotanConstants.ATT_PRINT_TRACE_LOG)); + } + + private void logAccess(Caller caller, Request request, Response response, long segmentTime, long wholeTime, boolean success) { if (getSide() == null) { String side = caller instanceof Provider ? MotanConstants.NODE_TYPE_SERVICE : MotanConstants.NODE_TYPE_REFERER; setSide(side); @@ -107,7 +143,10 @@ private void logAccess(Caller caller, Request request, long consumeTime, bool requestId = String.valueOf(request.getRequestId()); } append(builder, requestId); - append(builder, consumeTime); + append(builder, request.getAttachments().get(MotanConstants.CONTENT_LENGTH)); + append(builder, response.getAttachments().get(MotanConstants.CONTENT_LENGTH)); + append(builder, segmentTime); + append(builder, wholeTime); LoggerUtil.accessLog(builder.substring(0, builder.length() - 1)); } diff --git a/motan-core/src/main/java/com/weibo/api/motan/rpc/DefaultProvider.java b/motan-core/src/main/java/com/weibo/api/motan/rpc/DefaultProvider.java index 0e73710a..d18b17b2 100644 --- a/motan-core/src/main/java/com/weibo/api/motan/rpc/DefaultProvider.java +++ b/motan-core/src/main/java/com/weibo/api/motan/rpc/DefaultProvider.java @@ -102,7 +102,6 @@ public Response invoke(Request request) { ExceptionUtil.setMockStackTrace(response.getException().getCause()); } } - response.setAttachments(request.getAttachments()); return response; } } diff --git a/motan-core/src/main/java/com/weibo/api/motan/rpc/DefaultResponse.java b/motan-core/src/main/java/com/weibo/api/motan/rpc/DefaultResponse.java index c09bf808..7060e1e2 100644 --- a/motan-core/src/main/java/com/weibo/api/motan/rpc/DefaultResponse.java +++ b/motan-core/src/main/java/com/weibo/api/motan/rpc/DefaultResponse.java @@ -16,6 +16,8 @@ package com.weibo.api.motan.rpc; +import com.weibo.api.motan.core.DefaultThreadFactory; +import com.weibo.api.motan.core.StandardThreadExecutor; import com.weibo.api.motan.exception.MotanServiceException; import com.weibo.api.motan.protocol.rpc.RpcProtocolVersion; import com.weibo.api.motan.util.LoggerUtil; @@ -24,8 +26,12 @@ import java.io.Serializable; import java.util.*; import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; +import static com.weibo.api.motan.core.StandardThreadExecutor.DEFAULT_MAX_IDLE_TIME; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + /** * Response received via rpc. * @@ -34,6 +40,9 @@ */ public class DefaultResponse implements Response, Traceable, Callbackable, Serializable { private static final long serialVersionUID = 4281186647291615871L; + protected static ThreadPoolExecutor defaultCallbackExecutor = new StandardThreadExecutor(20, 200, + DEFAULT_MAX_IDLE_TIME, MILLISECONDS, 5000, + new DefaultThreadFactory("defaultResponseCallbackPool-", true), new ThreadPoolExecutor.DiscardPolicy()); private Object value; private Exception exception; @@ -42,7 +51,7 @@ public class DefaultResponse implements Response, Traceable, Callbackable, Seria private int timeout; private Map attachments;// rpc协议版本兼容时可以回传一些额外的信息 private byte rpcProtocolVersion = RpcProtocolVersion.VERSION_1.getVersion(); - private int serializeNumber = 0;// default serialization is hession2 + private int serializeNumber = 0;// default serialization is hessian2 private List> taskList = new ArrayList<>(); private AtomicBoolean isFinished = new AtomicBoolean(); private TraceableContext traceableContext = new TraceableContext(); @@ -75,6 +84,7 @@ public DefaultResponse(Object value) { public DefaultResponse(Object value, long requestId) { this.value = value; + this.requestId = requestId; } @Override @@ -126,7 +136,7 @@ public int getTimeout() { @Override public Map getAttachments() { - return attachments != null ? attachments : Collections.emptyMap(); + return attachments != null ? attachments : Collections.emptyMap(); } public void setAttachments(Map attachments) { @@ -162,6 +172,12 @@ public int getSerializeNumber() { return serializeNumber; } + /** + * 未指定线程池时,统一使用默认线程池执行。默认线程池满时采用丢弃策略,不保证任务一定会被执行。 + * 如果默认线程池不满足需求时,可以自行携带executor。 + * @param runnable 准备在response on finish时执行的任务 + * @param executor 指定执行任务的线程池 + */ public void addFinishCallback(Runnable runnable, Executor executor) { if (!isFinished.get()) { taskList.add(Pair.of(runnable, executor)); @@ -177,13 +193,12 @@ public void onFinish() { Runnable runnable = pair.getKey(); Executor executor = pair.getValue(); if (executor == null) { - runnable.run(); - } else { - try { - executor.execute(runnable); - } catch (Exception e) { - LoggerUtil.error("Callbackable response exec callback task error, e: ", e); - } + executor = defaultCallbackExecutor; + } + try { + executor.execute(runnable); + } catch (Exception e) { + LoggerUtil.error("Callbackable response exec callback task error, e: ", e); } } } diff --git a/motan-core/src/test/java/com/weibo/api/motan/filter/AccessLogFilterTest.java b/motan-core/src/test/java/com/weibo/api/motan/filter/AccessLogFilterTest.java index 0b562099..79e678c2 100644 --- a/motan-core/src/test/java/com/weibo/api/motan/filter/AccessLogFilterTest.java +++ b/motan-core/src/test/java/com/weibo/api/motan/filter/AccessLogFilterTest.java @@ -23,15 +23,13 @@ import com.weibo.api.motan.log.LogService; import com.weibo.api.motan.protocol.example.IHello; import com.weibo.api.motan.registry.RegistryService; -import com.weibo.api.motan.rpc.Caller; -import com.weibo.api.motan.rpc.Request; -import com.weibo.api.motan.rpc.Response; -import com.weibo.api.motan.rpc.URL; +import com.weibo.api.motan.rpc.*; import com.weibo.api.motan.util.LoggerUtil; import com.weibo.api.motan.util.MotanSwitcherUtil; import com.weibo.api.motan.util.NetUtils; import org.jmock.Expectations; +import java.lang.reflect.Field; import java.util.HashMap; import java.util.Map; @@ -49,89 +47,181 @@ public class AccessLogFilterTest extends BaseTestCase { @Override public void setUp() throws Exception { super.setUp(); + accessLogFilter = new AccessLogFilter(); } - @SuppressWarnings("unchecked") - public void testCall() { - final Request request = mockery.mock(Request.class); - final Response response = mockery.mock(Response.class); - final URL url = - new URL(MotanConstants.PROTOCOL_MOTAN, NetUtils.getLocalAddress().getHostAddress(), 0, RegistryService.class.getName()); + public void testCall() throws Exception { + final URL url = getDefaultUrl(); + url.addParameter(URLParamType.accessLog.getName(), String.valueOf(false)); + final Map attachments = getDefaultAttachment(); + checkProcess(url, attachments, false); + url.addParameter(URLParamType.accessLog.getName(), String.valueOf(true)); + checkProcess(url, attachments, true); + } - final Caller caller = mockery.mock(Caller.class); - final Map attachments = new HashMap(); + public void testSwitcher() throws Exception { + URL url = getDefaultUrl(); + url.addParameter(URLParamType.accessLog.getName(), String.valueOf(false)); + final Map attachments = getDefaultAttachment(); + checkProcess(url, attachments, false); + + MotanSwitcherUtil.setSwitcherValue(AccessLogFilter.ACCESS_LOG_SWITCHER_NAME, true); + checkProcess(url, attachments, true); + + MotanSwitcherUtil.setSwitcherValue(AccessLogFilter.ACCESS_LOG_SWITCHER_NAME, false); + checkProcess(url, attachments, false); + } + + public void testTraceLog() throws Exception { + URL url = getDefaultUrl(); + url.addParameter(URLParamType.accessLog.getName(), String.valueOf(false)); // not log access + final Map attachments = getDefaultAttachment(); + // 强制log开关关闭 + MotanSwitcherUtil.setSwitcherValue(AccessLogFilter.ACCESS_LOG_SWITCHER_NAME, false); + checkProcess(url, attachments, false); + + // set trace log attachment + attachments.put(MotanConstants.ATT_PRINT_TRACE_LOG, "true"); + checkProcess(url, attachments, true); + + } + + private URL getDefaultUrl() { + return new URL(MotanConstants.PROTOCOL_MOTAN, NetUtils.getLocalAddress().getHostAddress(), 0, RegistryService.class.getName()); + } + + private Map getDefaultAttachment() { + Map attachments = new HashMap<>(); attachments.put(URLParamType.host.getName(), URLParamType.host.getValue()); attachments.put(URLParamType.application.getName(), URLParamType.application.getValue()); attachments.put(URLParamType.module.getName(), URLParamType.module.getValue()); + return attachments; + } - mockery.checking(new Expectations() { - { - atLeast(1).of(caller).getUrl(); - will(returnValue(url)); - exactly(1).of(caller).call(request); - will(returnValue(response)); - exactly(1).of(request).getInterfaceName(); - will(returnValue(IHello.class.getName())); - exactly(1).of(request).getMethodName(); - will(returnValue("get")); - exactly(1).of(request).getParamtersDesc(); - will(returnValue("param_desc")); - atLeast(1).of(request).getAttachments(); - will(returnValue(attachments)); - allowing(request).getRequestId(); - } - }); - - accessLogFilter.filter(caller, request); - mockery.assertIsSatisfied(); + private void checkProcess(URL url, Map attachments, boolean isProcess) throws Exception { + checkProcessNormal(url, attachments, isProcess); + checkProcessWithTraceable(url, attachments, isProcess, true); + checkProcessWithTraceable(url, attachments, isProcess, false); } - public void testSwitcher() { + private void checkProcessNormal(URL url, Map attachments, boolean isProcess) throws Exception { + resetMockery(); final Request request = mockery.mock(Request.class); final Caller caller = mockery.mock(Caller.class); - URL url = new URL(MotanConstants.PROTOCOL_MOTAN, NetUtils.getLocalAddress().getHostAddress(), 0, RegistryService.class.getName()); - url.addParameter(URLParamType.accessLog.getName(), String.valueOf(false)); - final Map attachments = new HashMap<>(); - attachments.put(URLParamType.host.getName(), URLParamType.host.getValue()); - attachments.put(URLParamType.application.getName(), URLParamType.application.getValue()); - attachments.put(URLParamType.module.getName(), URLParamType.module.getValue()); final LogService logService = mockery.mock(LogService.class); LoggerUtil.setLogService(logService); mockery.checking(new Expectations() { { - atLeast(1).of(caller).getUrl(); + allowing(caller).getUrl(); will(returnValue(url)); atLeast(1).of(caller).call(request); + allowing(request).getAttachments(); + will(returnValue(attachments)); } }); - + if (isProcess) { + mockery.checking(new Expectations() { + { + exactly(1).of(request).getInterfaceName(); + will(returnValue(IHello.class.getName())); + exactly(1).of(request).getMethodName(); + will(returnValue("get")); + exactly(1).of(request).getParamtersDesc(); + will(returnValue("param_desc")); + exactly(1).of(logService).accessLog(with(any(String.class))); + allowing(request).getRequestId(); + } + }); + } else { + mockery.checking(new Expectations() { + { + never(request).getInterfaceName(); + never(request).getMethodName(); + never(logService).accessLog(with(any(String.class))); + } + }); + } accessLogFilter.filter(caller, request); mockery.assertIsSatisfied(); - MotanSwitcherUtil.setSwitcherValue(AccessLogFilter.ACCESS_LOG_SWITCHER_NAME, true); + LoggerUtil.setLogService(new DefaultLogService()); + } + + private void checkProcessWithTraceable(URL url, Map attachments, boolean isProcess, boolean isServerEnd) throws Exception { + resetMockery(); + final DefaultRequest request = mockery.mock(DefaultRequest.class); + final DefaultResponse response = new DefaultResponse(); + final Caller caller; + if (isServerEnd) { + caller = mockery.mock(Provider.class); + } else { + caller = mockery.mock(Referer.class); + } + final LogService logService = mockery.mock(LogService.class); + final TraceableContext requestTraceableContext = mockery.mock(TraceableContext.class, "requestTraceableContext"); + final TraceableContext responseTraceableContext = mockery.mock(TraceableContext.class, "responseTraceableContext"); + Field field = DefaultResponse.class.getDeclaredField("traceableContext"); + field.setAccessible(true); + field.set(response, responseTraceableContext); + LoggerUtil.setLogService(logService); mockery.checking(new Expectations() { { - exactly(1).of(request).getInterfaceName(); - will(returnValue(IHello.class.getName())); - exactly(1).of(request).getMethodName(); - will(returnValue("get")); - exactly(1).of(request).getParamtersDesc(); - will(returnValue("param_desc")); - atLeast(1).of(request).getAttachments(); + allowing(caller).getUrl(); + will(returnValue(url)); + atLeast(1).of(caller).call(request); + will(returnValue(response)); + allowing(request).getAttachments(); will(returnValue(attachments)); - exactly(1).of(logService).accessLog(with(any(String.class))); - allowing(request).getRequestId(); } }); - + if (isProcess) { + mockery.checking(new Expectations() { + { + exactly(1).of(request).getInterfaceName(); + will(returnValue(IHello.class.getName())); + exactly(1).of(request).getMethodName(); + will(returnValue("get")); + exactly(1).of(request).getParamtersDesc(); + will(returnValue("param_desc")); + exactly(1).of(logService).accessLog(with(any(String.class))); + allowing(request).getRequestId(); + allowing(request).getTraceableContext(); + will(returnValue(requestTraceableContext)); + if (isServerEnd) { + exactly(1).of(requestTraceableContext).getReceiveTime(); // request receive time for server end + will(returnValue(10L)); + exactly(1).of(responseTraceableContext).getSendTime(); // response send time for server end + will(returnValue(15L)); + } else { + exactly(1).of(requestTraceableContext).getSendTime(); // request send time for client end + will(returnValue(10L)); + exactly(1).of(responseTraceableContext).getReceiveTime(); // response receive time for client end + will(returnValue(18L)); + } + } + }); + } else { + mockery.checking(new Expectations() { + { + never(request).getInterfaceName(); + never(request).getMethodName(); + never(logService).accessLog(with(any(String.class))); + never(request).getTraceableContext(); + } + }); + } accessLogFilter.filter(caller, request); + response.onFinish(); + if (isServerEnd) { + Thread.sleep(100); + } mockery.assertIsSatisfied(); - MotanSwitcherUtil.setSwitcherValue(AccessLogFilter.ACCESS_LOG_SWITCHER_NAME, false); - accessLogFilter.filter(caller, request); - mockery.assertIsSatisfied(); + LoggerUtil.setLogService(new DefaultLogService()); + } - LoggerUtil.setLogService( new DefaultLogService()); + private void resetMockery() throws Exception { + setUp(); } } diff --git a/motan-transport-netty/src/main/java/com/weibo/api/motan/transport/netty/NettyDecoder.java b/motan-transport-netty/src/main/java/com/weibo/api/motan/transport/netty/NettyDecoder.java index 9f98f861..48f1c5dc 100644 --- a/motan-transport-netty/src/main/java/com/weibo/api/motan/transport/netty/NettyDecoder.java +++ b/motan-transport-netty/src/main/java/com/weibo/api/motan/transport/netty/NettyDecoder.java @@ -58,6 +58,7 @@ protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffe return null; } buffer.markReaderIndex(); + int startIndex = buffer.readerIndex(); short type = buffer.readShort(); if (type != MotanConstants.NETTY_MAGIC_TYPE) { @@ -81,11 +82,15 @@ protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffe } if (result instanceof Request) { - MotanFrameworkUtil.logEvent((Request) result, MotanConstants.TRACE_SRECEIVE, requestStart); - MotanFrameworkUtil.logEvent((Request) result, MotanConstants.TRACE_SDECODE); + Request request = (Request) result; + MotanFrameworkUtil.logEvent(request, MotanConstants.TRACE_SRECEIVE, requestStart); + MotanFrameworkUtil.logEvent(request, MotanConstants.TRACE_SDECODE); + request.setAttachment(MotanConstants.CONTENT_LENGTH, String.valueOf(buffer.readerIndex() - startIndex)); } else if (result instanceof Response) { - MotanFrameworkUtil.logEvent((Response) result, MotanConstants.TRACE_CRECEIVE, requestStart); - MotanFrameworkUtil.logEvent((Response) result, MotanConstants.TRACE_CDECODE); + Response response = (Response) result; + MotanFrameworkUtil.logEvent(response, MotanConstants.TRACE_CRECEIVE, requestStart); + MotanFrameworkUtil.logEvent(response, MotanConstants.TRACE_CDECODE); + response.setAttachment(MotanConstants.CONTENT_LENGTH, String.valueOf(buffer.readerIndex() - startIndex)); } return result; } diff --git a/motan-transport-netty/src/main/java/com/weibo/api/motan/transport/netty/NettyEncoder.java b/motan-transport-netty/src/main/java/com/weibo/api/motan/transport/netty/NettyEncoder.java index 404a5ff9..75ffa729 100644 --- a/motan-transport-netty/src/main/java/com/weibo/api/motan/transport/netty/NettyEncoder.java +++ b/motan-transport-netty/src/main/java/com/weibo/api/motan/transport/netty/NettyEncoder.java @@ -56,17 +56,18 @@ protected Object encode(ChannelHandlerContext ctx, Channel nettyChannel, Object if (type == DefaultRpcCodec.MAGIC) { channelBuffer = encodeV1(message, data); } else if (type == MotanV2Header.MAGIC) { - channelBuffer = encodeV2(data); + channelBuffer = encodeV2(data); } else { throw new MotanFrameworkException("can not encode message, unknown magic:" + type); } - if (message instanceof Response) { - ((Response) message).setAttachment(MotanConstants.CONTENT_LENGTH, String.valueOf(channelBuffer.readableBytes())); - } if (message instanceof Request) { - MotanFrameworkUtil.logEvent((Request) message, MotanConstants.TRACE_CENCODE); + Request request = (Request) message; + MotanFrameworkUtil.logEvent(request, MotanConstants.TRACE_CENCODE); + request.setAttachment(MotanConstants.CONTENT_LENGTH, String.valueOf(channelBuffer.readableBytes())); } else if (message instanceof Response) { - MotanFrameworkUtil.logEvent((Response) message, MotanConstants.TRACE_SENCODE); + Response response = (Response) message; + MotanFrameworkUtil.logEvent(response, MotanConstants.TRACE_SENCODE); + response.setAttachment(MotanConstants.CONTENT_LENGTH, String.valueOf(channelBuffer.readableBytes())); } return channelBuffer; } diff --git a/motan-transport-netty4/src/main/java/com/weibo/api/motan/transport/netty4/NettyChannel.java b/motan-transport-netty4/src/main/java/com/weibo/api/motan/transport/netty4/NettyChannel.java index f284e110..3b6935f9 100644 --- a/motan-transport-netty4/src/main/java/com/weibo/api/motan/transport/netty4/NettyChannel.java +++ b/motan-transport-netty4/src/main/java/com/weibo/api/motan/transport/netty4/NettyChannel.java @@ -59,6 +59,7 @@ public Response request(Request request) throws TransportException { this.nettyClient.registerCallback(request.getRequestId(), response); byte[] msg = CodecUtil.encodeObjectToBytes(this, codec, request); ChannelFuture writeFuture = this.channel.writeAndFlush(msg); + request.setAttachment(MotanConstants.CONTENT_LENGTH, String.valueOf(msg.length)); boolean result = writeFuture.awaitUninterruptibly(timeout, TimeUnit.MILLISECONDS); if (result && writeFuture.isSuccess()) { diff --git a/motan-transport-netty4/src/main/java/com/weibo/api/motan/transport/netty4/NettyChannelHandler.java b/motan-transport-netty4/src/main/java/com/weibo/api/motan/transport/netty4/NettyChannelHandler.java index 1db5dfe0..4b18c222 100644 --- a/motan-transport-netty4/src/main/java/com/weibo/api/motan/transport/netty4/NettyChannelHandler.java +++ b/motan-transport-netty4/src/main/java/com/weibo/api/motan/transport/netty4/NettyChannelHandler.java @@ -7,6 +7,7 @@ import com.weibo.api.motan.exception.MotanErrorMsgConstant; import com.weibo.api.motan.exception.MotanFrameworkException; import com.weibo.api.motan.exception.MotanServiceException; +import com.weibo.api.motan.protocol.rpc.RpcProtocolVersion; import com.weibo.api.motan.rpc.DefaultResponse; import com.weibo.api.motan.rpc.Request; import com.weibo.api.motan.rpc.Response; @@ -121,15 +122,22 @@ private void processMessage(ChannelHandlerContext ctx, NettyMessage msg) { } return; } - + long length = msg.getData().length; + if (RpcProtocolVersion.VERSION_1 == msg.getVersion() || RpcProtocolVersion.VERSION_1_Compress == msg.getVersion()) { + length += RpcProtocolVersion.VERSION_1.getHeaderLength(); + } if (result instanceof Request) { - MotanFrameworkUtil.logEvent((Request) result, MotanConstants.TRACE_SRECEIVE, msg.getStartTime()); - MotanFrameworkUtil.logEvent((Request) result, MotanConstants.TRACE_SEXECUTOR_START, startTime); - MotanFrameworkUtil.logEvent((Request) result, MotanConstants.TRACE_SDECODE); - processRequest(ctx, (Request) result); + Request request = (Request) result; + MotanFrameworkUtil.logEvent(request, MotanConstants.TRACE_SRECEIVE, msg.getStartTime()); + MotanFrameworkUtil.logEvent(request, MotanConstants.TRACE_SEXECUTOR_START, startTime); + MotanFrameworkUtil.logEvent(request, MotanConstants.TRACE_SDECODE); + request.setAttachment(MotanConstants.CONTENT_LENGTH, String.valueOf(length)); + processRequest(ctx, request); } else if (result instanceof Response) { - MotanFrameworkUtil.logEvent((Response) result, MotanConstants.TRACE_CRECEIVE, msg.getStartTime()); - MotanFrameworkUtil.logEvent((Response) result, MotanConstants.TRACE_CDECODE); + Response response = (Response) result; + MotanFrameworkUtil.logEvent(response, MotanConstants.TRACE_CRECEIVE, msg.getStartTime()); + MotanFrameworkUtil.logEvent(response, MotanConstants.TRACE_CDECODE); + response.setAttachment(MotanConstants.CONTENT_LENGTH, String.valueOf(length)); processResponse(result); } }