Skip to content

Commit

Permalink
[ISSUE apache#3949] use SendMessageRequestHeaderV2
Browse files Browse the repository at this point in the history
  • Loading branch information
xdkxlk authored and drpmma committed Jul 13, 2022
1 parent e5124e4 commit 6b45c99
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.proxy.common.ProxyContext;
Expand Down Expand Up @@ -137,8 +138,8 @@ protected SendMessageRequestHeader buildSendMessageRequestHeader(List<Message> m

requestHeader.setProducerGroup(producerGroup);
requestHeader.setTopic(message.getTopic());
requestHeader.setDefaultTopic("");
requestHeader.setDefaultTopicQueueNums(0);
requestHeader.setDefaultTopic(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC);
requestHeader.setDefaultTopicQueueNums(4);
requestHeader.setQueueId(queueId);
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
import org.apache.rocketmq.logging.InternalLogger;
Expand Down Expand Up @@ -166,7 +167,8 @@ public CompletableFuture<SendResult> sendMessageAsync(
SendMessageRequestHeader requestHeader,
long timeoutMillis
) {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
request.setBody(msg.getBody());

CompletableFuture<SendResult> future = new CompletableFuture<>();
Expand Down Expand Up @@ -196,7 +198,8 @@ public CompletableFuture<SendResult> sendMessageAsync(
SendMessageRequestHeader requestHeader,
long timeoutMillis
) {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEND_BATCH_MESSAGE, requestHeader);
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEND_BATCH_MESSAGE, requestHeaderV2);

CompletableFuture<SendResult> future = new CompletableFuture<>();
try {
Expand Down

0 comments on commit 6b45c99

Please sign in to comment.