Skip to content

Commit

Permalink
[ISSUE #3949] return acceptMessageTypes for quering topic route.
Browse files Browse the repository at this point in the history
  • Loading branch information
lollipopjin authored and zhouxinyu committed Jul 26, 2022
1 parent 8034690 commit 1e41097
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,5 +146,6 @@ public class MessageConst {
STRING_HASH_SET.add(PROPERTY_TIMER_DEL_UNIQKEY);
STRING_HASH_SET.add(PROPERTY_TIMER_DELAY_LEVEL);
STRING_HASH_SET.add(PROPERTY_BORN_HOST);
STRING_HASH_SET.add(PROPERTY_BORN_TIMESTAMP);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import apache.rocketmq.v2.Code;
import apache.rocketmq.v2.Endpoints;
import apache.rocketmq.v2.MessageQueue;
import apache.rocketmq.v2.MessageType;
import apache.rocketmq.v2.Permission;
import apache.rocketmq.v2.QueryAssignmentRequest;
import apache.rocketmq.v2.QueryAssignmentResponse;
Expand All @@ -36,6 +37,7 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.proxy.common.ProxyContext;
Expand All @@ -61,22 +63,22 @@ public CompletableFuture<QueryRouteResponse> queryRoute(ProxyContext ctx, QueryR
validateTopic(request.getTopic());
List<org.apache.rocketmq.proxy.common.Address> addressList = this.convertToAddressList(request.getEndpoints());

String topicName = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getTopic());
ProxyTopicRouteData proxyTopicRouteData = this.messagingProcessor.getTopicRouteDataForProxy(
ctx,
addressList,
GrpcConverter.getInstance().wrapResourceWithNamespace(request.getTopic()));
ctx, addressList, topicName);

List<MessageQueue> messageQueueList = new ArrayList<>();
Map<String, Map<Long, Broker>> brokerMap = buildBrokerMap(proxyTopicRouteData.getBrokerDatas());

TopicMessageType topicMessageType = messagingProcessor.getMetadataService().getTopicMessageType(topicName);
for (QueueData queueData : proxyTopicRouteData.getQueueDatas()) {
String brokerName = queueData.getBrokerName();
Map<Long, Broker> brokerIdMap = brokerMap.get(brokerName);
if (brokerIdMap == null) {
break;
}
for (Broker broker : brokerIdMap.values()) {
messageQueueList.addAll(this.genMessageQueueFromQueueData(queueData, request.getTopic(), broker));
messageQueueList.addAll(this.genMessageQueueFromQueueData(queueData, request.getTopic(), topicMessageType, broker));
}
}

Expand Down Expand Up @@ -205,7 +207,7 @@ protected List<org.apache.rocketmq.proxy.common.Address> convertToAddressList(En
return brokerMap;
}

protected List<MessageQueue> genMessageQueueFromQueueData(QueueData queueData, Resource topic, Broker broker) {
protected List<MessageQueue> genMessageQueueFromQueueData(QueueData queueData, Resource topic, TopicMessageType topicMessageType, Broker broker) {
List<MessageQueue> messageQueueList = new ArrayList<>();

int r = 0;
Expand All @@ -227,6 +229,7 @@ protected List<MessageQueue> genMessageQueueFromQueueData(QueueData queueData, R
MessageQueue messageQueue = MessageQueue.newBuilder().setBroker(broker).setTopic(topic)
.setId(queueIdIndex++)
.setPermission(Permission.READ)
.addAcceptMessageTypes(parseTopicMessageType(topicMessageType))
.build();
messageQueueList.add(messageQueue);
}
Expand All @@ -235,6 +238,7 @@ protected List<MessageQueue> genMessageQueueFromQueueData(QueueData queueData, R
MessageQueue messageQueue = MessageQueue.newBuilder().setBroker(broker).setTopic(topic)
.setId(queueIdIndex++)
.setPermission(Permission.WRITE)
.addAcceptMessageTypes(parseTopicMessageType(topicMessageType))
.build();
messageQueueList.add(messageQueue);
}
Expand All @@ -243,10 +247,26 @@ protected List<MessageQueue> genMessageQueueFromQueueData(QueueData queueData, R
MessageQueue messageQueue = MessageQueue.newBuilder().setBroker(broker).setTopic(topic)
.setId(queueIdIndex++)
.setPermission(Permission.READ_WRITE)
.addAcceptMessageTypes(parseTopicMessageType(topicMessageType))
.build();
messageQueueList.add(messageQueue);
}

return messageQueueList;
}

private MessageType parseTopicMessageType(TopicMessageType topicMessageType) {
switch (topicMessageType) {
case NORMAL:
return MessageType.NORMAL;
case FIFO:
return MessageType.FIFO;
case TRANSACTION:
return MessageType.TRANSACTION;
case DELAY:
return MessageType.DELAY;
default:
return MessageType.MESSAGE_TYPE_UNSPECIFIED;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import apache.rocketmq.v2.Broker;
import apache.rocketmq.v2.Code;
import apache.rocketmq.v2.Endpoints;
import apache.rocketmq.v2.MessageType;
import apache.rocketmq.v2.MessageQueue;
import apache.rocketmq.v2.Permission;
import apache.rocketmq.v2.QueryAssignmentRequest;
Expand All @@ -33,16 +34,20 @@
import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.grpc.v2.BaseActivityTest;
import org.apache.rocketmq.proxy.grpc.v2.common.ResponseBuilder;
import org.apache.rocketmq.proxy.service.metadata.LocalMetadataService;
import org.apache.rocketmq.proxy.service.metadata.MetadataService;
import org.apache.rocketmq.proxy.service.route.ProxyTopicRouteData;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
Expand Down Expand Up @@ -93,6 +98,9 @@ public void testQueryRoute() throws Throwable {
ArgumentCaptor<List<org.apache.rocketmq.proxy.common.Address>> addressListCaptor = ArgumentCaptor.forClass(List.class);
when(this.messagingProcessor.getTopicRouteDataForProxy(any(), addressListCaptor.capture(), anyString()))
.thenReturn(createProxyTopicRouteData(2, 2, 6));
MetadataService metadataService = Mockito.mock(LocalMetadataService.class);
when(this.messagingProcessor.getMetadataService()).thenReturn(metadataService);
when(metadataService.getTopicMessageType(anyString())).thenReturn(TopicMessageType.NORMAL);

QueryRouteResponse response = this.routeActivity.queryRoute(
createContext(),
Expand Down Expand Up @@ -199,40 +207,45 @@ private static ProxyTopicRouteData createProxyTopicRouteData(int r, int w, int p
public void testGenPartitionFromQueueData() throws Exception {
// test queueData with 8 read queues, 8 write queues, and rw permission, expect 8 rw queues.
QueueData queueDataWith8R8WPermRW = createQueueData(8, 8, PermName.PERM_READ | PermName.PERM_WRITE);
List<MessageQueue> partitionWith8R8WPermRW = this.routeActivity.genMessageQueueFromQueueData(queueDataWith8R8WPermRW, GRPC_TOPIC, GRPC_BROKER);
List<MessageQueue> partitionWith8R8WPermRW = this.routeActivity.genMessageQueueFromQueueData(queueDataWith8R8WPermRW, GRPC_TOPIC, TopicMessageType.NORMAL, GRPC_BROKER);
assertEquals(8, partitionWith8R8WPermRW.size());
assertEquals(8, partitionWith8R8WPermRW.stream().filter(a -> a.getAcceptMessageTypesValue(0) == MessageType.NORMAL.getNumber()).count());
assertEquals(8, partitionWith8R8WPermRW.stream().filter(a -> a.getPermission() == Permission.READ_WRITE).count());
assertEquals(0, partitionWith8R8WPermRW.stream().filter(a -> a.getPermission() == Permission.READ).count());
assertEquals(0, partitionWith8R8WPermRW.stream().filter(a -> a.getPermission() == Permission.WRITE).count());

// test queueData with 8 read queues, 8 write queues, and read only permission, expect 8 read only queues.
QueueData queueDataWith8R8WPermR = createQueueData(8, 8, PermName.PERM_READ);
List<MessageQueue> partitionWith8R8WPermR = this.routeActivity.genMessageQueueFromQueueData(queueDataWith8R8WPermR, GRPC_TOPIC, GRPC_BROKER);
List<MessageQueue> partitionWith8R8WPermR = this.routeActivity.genMessageQueueFromQueueData(queueDataWith8R8WPermR, GRPC_TOPIC, TopicMessageType.FIFO, GRPC_BROKER);
assertEquals(8, partitionWith8R8WPermR.size());
assertEquals(8, partitionWith8R8WPermR.stream().filter(a -> a.getAcceptMessageTypesValue(0) == MessageType.FIFO.getNumber()).count());
assertEquals(8, partitionWith8R8WPermR.stream().filter(a -> a.getPermission() == Permission.READ).count());
assertEquals(0, partitionWith8R8WPermR.stream().filter(a -> a.getPermission() == Permission.READ_WRITE).count());
assertEquals(0, partitionWith8R8WPermR.stream().filter(a -> a.getPermission() == Permission.WRITE).count());

// test queueData with 8 read queues, 8 write queues, and write only permission, expect 8 write only queues.
QueueData queueDataWith8R8WPermW = createQueueData(8, 8, PermName.PERM_WRITE);
List<MessageQueue> partitionWith8R8WPermW = this.routeActivity.genMessageQueueFromQueueData(queueDataWith8R8WPermW, GRPC_TOPIC, GRPC_BROKER);
List<MessageQueue> partitionWith8R8WPermW = this.routeActivity.genMessageQueueFromQueueData(queueDataWith8R8WPermW, GRPC_TOPIC, TopicMessageType.TRANSACTION, GRPC_BROKER);
assertEquals(8, partitionWith8R8WPermW.size());
assertEquals(8, partitionWith8R8WPermW.stream().filter(a -> a.getAcceptMessageTypesValue(0) == MessageType.TRANSACTION.getNumber()).count());
assertEquals(8, partitionWith8R8WPermW.stream().filter(a -> a.getPermission() == Permission.WRITE).count());
assertEquals(0, partitionWith8R8WPermW.stream().filter(a -> a.getPermission() == Permission.READ_WRITE).count());
assertEquals(0, partitionWith8R8WPermW.stream().filter(a -> a.getPermission() == Permission.READ).count());

// test queueData with 8 read queues, 0 write queues, and rw permission, expect 8 read only queues.
QueueData queueDataWith8R0WPermRW = createQueueData(8, 0, PermName.PERM_READ | PermName.PERM_WRITE);
List<MessageQueue> partitionWith8R0WPermRW = this.routeActivity.genMessageQueueFromQueueData(queueDataWith8R0WPermRW, GRPC_TOPIC, GRPC_BROKER);
List<MessageQueue> partitionWith8R0WPermRW = this.routeActivity.genMessageQueueFromQueueData(queueDataWith8R0WPermRW, GRPC_TOPIC, TopicMessageType.DELAY, GRPC_BROKER);
assertEquals(8, partitionWith8R0WPermRW.size());
assertEquals(8, partitionWith8R0WPermRW.stream().filter(a -> a.getAcceptMessageTypesValue(0) == MessageType.DELAY.getNumber()).count());
assertEquals(8, partitionWith8R0WPermRW.stream().filter(a -> a.getPermission() == Permission.READ).count());
assertEquals(0, partitionWith8R0WPermRW.stream().filter(a -> a.getPermission() == Permission.READ_WRITE).count());
assertEquals(0, partitionWith8R0WPermRW.stream().filter(a -> a.getPermission() == Permission.WRITE).count());

// test queueData with 4 read queues, 8 write queues, and rw permission, expect 4 rw queues and 4 write only queues.
QueueData queueDataWith4R8WPermRW = createQueueData(4, 8, PermName.PERM_READ | PermName.PERM_WRITE);
List<MessageQueue> partitionWith4R8WPermRW = this.routeActivity.genMessageQueueFromQueueData(queueDataWith4R8WPermRW, GRPC_TOPIC, GRPC_BROKER);
List<MessageQueue> partitionWith4R8WPermRW = this.routeActivity.genMessageQueueFromQueueData(queueDataWith4R8WPermRW, GRPC_TOPIC, TopicMessageType.NORMAL, GRPC_BROKER);
assertEquals(8, partitionWith4R8WPermRW.size());
assertEquals(8, partitionWith4R8WPermRW.stream().filter(a -> a.getAcceptMessageTypesValue(0) == MessageType.NORMAL.getNumber()).count());
assertEquals(4, partitionWith4R8WPermRW.stream().filter(a -> a.getPermission() == Permission.WRITE).count());
assertEquals(4, partitionWith4R8WPermRW.stream().filter(a -> a.getPermission() == Permission.READ_WRITE).count());
assertEquals(0, partitionWith4R8WPermRW.stream().filter(a -> a.getPermission() == Permission.READ).count());
Expand Down

0 comments on commit 1e41097

Please sign in to comment.