Skip to content

Commit

Permalink
Merge branch 'master' into feature/common-handler
Browse files Browse the repository at this point in the history
# Conflicts:
#	motan-core/src/main/java/com/weibo/api/motan/proxy/spi/JdkProxyFactory.java
  • Loading branch information
sunnights committed Apr 10, 2018
2 parents f7d3559 + 9724ff3 commit a633fe9
Show file tree
Hide file tree
Showing 18 changed files with 177 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ public interface BenchmarkService {

public Map<Long, Integer> getUserTypes(List<Long> uids);

public long[] getLastStausIds(long[] uids);
public long[] getLastStatusIds(long[] uids);

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public Map<Long, Integer> getUserTypes(List<Long> uids) {
}

@Override
public long[] getLastStausIds(long[] uids) {
public long[] getLastStatusIds(long[] uids) {
return new long[0];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.weibo.api.motan.util.CollectionUtil;
import com.weibo.api.motan.util.LoggerUtil;
import com.weibo.api.motan.util.MathUtil;

import org.apache.commons.lang3.StringUtils;

import java.util.*;
Expand Down Expand Up @@ -202,7 +201,7 @@ Referer<T> next() {
String group = randomKeyList.get(ThreadLocalRandom.current().nextInt(randomKeySize));
AtomicInteger ai = cursors.get(group);
List<Referer<T>> referers = groupReferers.get(group);
return referers.get(MathUtil.getPositive(ai.getAndIncrement()) % referers.size());
return referers.get(MathUtil.getNonNegative(ai.getAndIncrement()) % referers.size());
}

// 求最大公约数
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@

package com.weibo.api.motan.cluster.loadbalance;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import com.weibo.api.motan.common.MotanConstants;
import com.weibo.api.motan.core.extension.SpiMeta;
import com.weibo.api.motan.rpc.Referer;
import com.weibo.api.motan.rpc.Request;
import com.weibo.api.motan.util.MathUtil;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
*
* Use consistent hash to choose referer
Expand Down Expand Up @@ -89,7 +89,7 @@ private int getHash(Request request) {
} else {
hashcode = Arrays.hashCode(request.getArguments());
}
return MathUtil.getPositive(hashcode);
return MathUtil.getNonNegative(hashcode);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@

package com.weibo.api.motan.cluster.loadbalance;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import com.weibo.api.motan.core.extension.SpiMeta;
import com.weibo.api.motan.rpc.Referer;
import com.weibo.api.motan.rpc.Request;
import com.weibo.api.motan.util.MathUtil;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/**
*
* Round robin loadbalance.
Expand All @@ -40,7 +40,7 @@ public class RoundRobinLoadBalance<T> extends AbstractLoadBalance<T> {
protected Referer<T> doSelect(Request request) {
List<Referer<T>> referers = getReferers();

int index = getNextPositive();
int index = getNextNonNegative();
for (int i = 0; i < referers.size(); i++) {
Referer<T> ref = referers.get((i + index) % referers.size());
if (ref.isAvailable()) {
Expand All @@ -54,7 +54,7 @@ protected Referer<T> doSelect(Request request) {
protected void doSelectToHolder(Request request, List<Referer<T>> refersHolder) {
List<Referer<T>> referers = getReferers();

int index = getNextPositive();
int index = getNextNonNegative();
for (int i = 0, count = 0; i < referers.size() && count < MAX_REFERER_COUNT; i++) {
Referer<T> referer = referers.get((i + index) % referers.size());
if (referer.isAvailable()) {
Expand All @@ -64,8 +64,8 @@ protected void doSelectToHolder(Request request, List<Referer<T>> refersHolder)
}
}

// get positive int
private int getNextPositive() {
return MathUtil.getPositive(idx.incrementAndGet());
// get non-negative int
private int getNextNonNegative() {
return MathUtil.getNonNegative(idx.incrementAndGet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public ObjectInput createInput(InputStream in) {
}
}

protected static synchronized void initAllSerialziation() {
protected static synchronized void initAllSerialization() {
if (serializations == null) {
serializations = new ConcurrentHashMap<Integer, String>();
try {
Expand All @@ -89,9 +89,9 @@ protected static synchronized void initAllSerialziation() {
}
}

protected Serialization getSerializaiontByNum(int serializationNum) {
protected Serialization getSerializationByNum(int serializationNum) {
if (serializations == null) {
initAllSerialziation();
initAllSerialization();
}
String name = serializations.get(serializationNum);
Serialization s = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class MotanV2Codec extends AbstractCodec {


static {
initAllSerialziation();
initAllSerialization();
}

@Override
Expand Down Expand Up @@ -233,7 +233,7 @@ public Object decode(Channel channel, String remoteIp, byte[] data) throws IOExc
body = ByteUtil.unGzip(body);
}
//默认自适应序列化
Serialization serialization = getSerializaiontByNum(header.getSerialize());
Serialization serialization = getSerializationByNum(header.getSerialize());
obj = new DeserializableObject(serialization, body);
}
if (header.isRequest()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,6 @@ public class JdkProxyFactory implements ProxyFactory {
@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Class<T> clz, List<Cluster<T>> clusters) {
return (T) Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]{clz}, new RefererInvocationHandler<>(clz, clusters));
return (T) Proxy.newProxyInstance(clz.getClassLoader(), new Class[]{clz}, new RefererInvocationHandler<>(clz, clusters));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,15 @@

package com.weibo.api.motan.registry.support.command;

import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.lang3.StringUtils;

import com.weibo.api.motan.registry.NotifyListener;
import com.weibo.api.motan.registry.support.FailbackRegistry;
import com.weibo.api.motan.rpc.URL;
import com.weibo.api.motan.util.LoggerUtil;
import org.apache.commons.lang3.StringUtils;

import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

public abstract class CommandFailbackRegistry extends FailbackRegistry {

Expand Down Expand Up @@ -93,7 +92,7 @@ protected List<URL> doDiscover(URL url) {
finalResult = discoverService(urlCopy);
}

LoggerUtil.info("CommandFailbackRegistry discover size: " + finalResult.size() + ", result:" + finalResult.toString());
LoggerUtil.info("CommandFailbackRegistry discover size: " + (finalResult == null ? "0" : finalResult.size()));

return finalResult;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ public class SimpleSerialization implements Serialization {
@Override
public byte[] serialize(Object obj) throws IOException {
GrowableByteBuffer buffer = new GrowableByteBuffer(4096);
serialize(obj, buffer);
buffer.flip();
byte[] result = new byte[buffer.remaining()];
buffer.get(result);
return result;
}

private void serialize(Object obj, GrowableByteBuffer buffer) throws IOException {
if (obj != null) {
if (obj instanceof String) {
buffer.put((byte) 1);
Expand All @@ -48,19 +56,20 @@ public byte[] serialize(Object obj) throws IOException {
buffer.put(b);
} else if (obj instanceof Map) {
buffer.put((byte) 2);
int pos = buffer.position();
int size = 0;
buffer.position(5);
buffer.position(pos + 4);
for (Entry<Object, Object> entry : ((Map<Object, Object>) obj).entrySet()) {
if (entry.getKey() != null && entry.getValue() != null
&& (entry.getKey() instanceof String) && (entry.getValue() instanceof String)) {
size += putString(buffer, (String) entry.getKey());
size += putString(buffer, (String) entry.getValue());
}
}
buffer.position(1);
buffer.position(pos);
buffer.putInt(size);
buffer.position(5 + size);
} else if(obj instanceof byte[]){
buffer.position(pos + size + 4);
} else if (obj instanceof byte[]) {
buffer.put((byte) 3);
byte[] b = (byte[]) obj;
buffer.putInt(b.length);
Expand All @@ -71,15 +80,15 @@ public byte[] serialize(Object obj) throws IOException {
} else {
buffer.put((byte) 0);
}
buffer.flip();
byte[] result = new byte[buffer.remaining()];
buffer.get(result);
return result;
}

@Override
public <T> T deserialize(byte[] bytes, Class<T> clz) throws IOException {
ByteBuffer buffer = ByteBuffer.wrap(bytes);
return deserialize(buffer, clz);
}

private <T> T deserialize(ByteBuffer buffer, Class<T> clz) throws IOException {
byte type = buffer.get();
switch (type) {
case 0:
Expand Down Expand Up @@ -111,6 +120,7 @@ public <T> T deserialize(byte[] bytes, Class<T> clz) throws IOException {
key = getString(buffer);
}
}
buffer.limit(buffer.capacity());
return (T) map;
} else {
throw new MotanServiceException("SimpleSerialization not support type:" + clz);
Expand All @@ -125,30 +135,24 @@ public <T> T deserialize(byte[] bytes, Class<T> clz) throws IOException {

@Override
public byte[] serializeMulti(Object[] data) throws IOException {
if (data.length == 1) {
return serialize(data[0]);
GrowableByteBuffer buffer = new GrowableByteBuffer(4096);
for (Object o : data) {
serialize(o, buffer);
}
//TODO mulit param support
throw new MotanServiceException("SimpleSerialization not support serialize multi Object");
buffer.flip();
byte[] result = new byte[buffer.remaining()];
buffer.get(result);
return result;
}

@Override
public Object[] deserializeMulti(byte[] data, Class<?>[] classes) throws IOException {
if (classes.length == 1) {
return new Object[]{deserialize(data, classes[0])};
} else {
StringBuilder sb = new StringBuilder(128);
sb.append("[");
for (Class c : classes) {
sb.append(c.getName()).append(",");
}
if (sb.length() > 1) {
sb.deleteCharAt(sb.length() - 1);
}
sb.append("]");
throw new MotanServiceException("SimpleSerialization not support deserialize multi Object of " + classes);
ByteBuffer buffer = ByteBuffer.wrap(data);
Object[] result = new Object[classes.length];
for (int i = 0; i < classes.length; i++) {
result[i] = deserialize(buffer, classes[i]);
}

return result;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class LocalSwitcherService implements SwitcherService {

private static ConcurrentMap<String, Switcher> switchers = new ConcurrentHashMap<String, Switcher>();

private Map<String, List<SwitcherListener>> listenerMap = new ConcurrentHashMap();
private Map<String, List<SwitcherListener>> listenerMap = new ConcurrentHashMap<>();

@Override
public Switcher getSwitcher(String name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ private void createConnections() {
}

protected Channel getChannel() throws MotanServiceException {
int index = MathUtil.getPositive(idx.getAndIncrement());
int index = MathUtil.getNonNegative(idx.getAndIncrement());
Channel channel;

for (int i = index; i < connections + index; i++) {
Expand Down
25 changes: 22 additions & 3 deletions motan-core/src/main/java/com/weibo/api/motan/util/MathUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,15 @@

public class MathUtil {


/**
* 针对int类型字符串进行解析,如果存在格式错误,则返回默认值(defaultValue)
* Parse intStr, return defaultValue when numberFormatException occurs
*
* @param intStr
* @param defaultValue
* @return
*/
public static int parseInt(String intStr, int defaultValue) {
try {
return Integer.parseInt(intStr);
Expand All @@ -32,20 +41,30 @@ public static int parseInt(String intStr, int defaultValue) {
}
}

/**
* 针对long类型字符串进行解析,如果存在格式错误,则返回默认值(defaultValue)
* Parse longStr, return defaultValue when numberFormatException occurs
* @param longStr
* @param defaultValue
* @return
*/
public static long parseLong(String longStr, long defaultValue){
try {
return Long.parseLong(longStr);
} catch (NumberFormatException e) {
return defaultValue;
}
}

/**
* return positive int value of originValue
* 通过二进制位操作将originValue转化为非负数:
* 0和正数返回本身
* 负数通过二进制首位取反转化为正数或0(Integer.MIN_VALUE将转换为0)
* return non-negative int value of originValue
* @param originValue
* @return positive int
*/
public static int getPositive(int originValue){
public static int getNonNegative(int originValue){
return 0x7fffffff & originValue;
}
}
Loading

0 comments on commit a633fe9

Please sign in to comment.