Skip to content

Commit

Permalink
golang: Add namespace in Resource and metadata (#753)
Browse files Browse the repository at this point in the history
* metadata和resource支持namespace字段

* add ut

---------

Co-authored-by: guyinyou <guyinyou.gyy@alibaba-inc.com>
  • Loading branch information
guyinyou and guyinyou committed May 16, 2024
1 parent 78a347e commit 5790fc4
Show file tree
Hide file tree
Showing 9 changed files with 78 additions and 23 deletions.
5 changes: 4 additions & 1 deletion golang/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,8 @@ func (cli *defaultClient) queryRoute(ctx context.Context, topic string, duration
func (cli *defaultClient) getQueryRouteRequest(topic string) *v2.QueryRouteRequest {
return &v2.QueryRouteRequest{
Topic: &v2.Resource{
Name: topic,
Name: topic,
ResourceNamespace: cli.config.NameSpace,
},
Endpoints: cli.accessPoint,
}
Expand Down Expand Up @@ -599,6 +600,8 @@ func (cli *defaultClient) Sign(ctx context.Context) context.Context {
innerMD.VersionValue,
innerMD.ClintID,
cli.clientID,
innerMD.NameSpace,
cli.config.NameSpace,
innerMD.DateTime,
now,
innerMD.Authorization,
Expand Down
6 changes: 4 additions & 2 deletions golang/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,8 @@ func TestRestoreDefaultClientSessionTwoErrors(t *testing.T) {
func Test_routeEqual(t *testing.T) {
oldMq := &v2.MessageQueue{
Topic: &v2.Resource{
Name: "topic-test",
Name: "topic-test",
ResourceNamespace: "ns-test",
},
Id: 0,
Permission: v2.Permission_READ_WRITE,
Expand All @@ -313,7 +314,8 @@ func Test_routeEqual(t *testing.T) {
}
newMq := &v2.MessageQueue{
Topic: &v2.Resource{
Name: "topic-test",
Name: "topic-test",
ResourceNamespace: "ns-test",
},
Id: 0,
Permission: v2.Permission_READ_WRITE,
Expand Down
10 changes: 5 additions & 5 deletions golang/metadata/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
package metadata

const (
LanguageKey = "x-mq-language"
ProtocolKey = "x-mq-protocol"
RequestID = "x-mq-request-id"
VersionKey = "x-mq-client-version"
// NameSpace = "x-mq-namespace"
LanguageKey = "x-mq-language"
ProtocolKey = "x-mq-protocol"
RequestID = "x-mq-request-id"
VersionKey = "x-mq-client-version"
NameSpace = "x-mq-namespace"
DateTime = "x-mq-date-time"
ClintID = "x-mq-client-id"
Authorization = "authorization"
Expand Down
13 changes: 8 additions & 5 deletions golang/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ var NewProducer = func(config *Config, opts ...ProducerOption) (Producer, error)
}
for _, topic := range po.topics {
topicResource := &v2.Resource{
Name: topic,
Name: topic,
ResourceNamespace: config.NameSpace,
}
p.pSetting.topics.Store(topic, topicResource)
}
Expand Down Expand Up @@ -287,7 +288,7 @@ func (p *defaultProducer) send0(ctx context.Context, msgs []*UnifiedMessage, txE
var err error
pubMessage = uMsg.pubMsg
if uMsg.pubMsg == nil {
pubMessage, err = NewPublishingMessage(msg, p.pSetting, txEnabled)
pubMessage, err = NewPublishingMessage(msg, p.cli.config.NameSpace, p.pSetting, txEnabled)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -315,7 +316,8 @@ func (p *defaultProducer) send0(ctx context.Context, msgs []*UnifiedMessage, txE
}
if _, ok := p.pSetting.topics.Load(topicName); !ok {
p.pSetting.topics.Store(topicName, &v2.Resource{
Name: topicName,
Name: topicName,
ResourceNamespace: p.cli.config.NameSpace,
})
}
pubLoadBalancer, err := p.getPublishingTopicRouteResult(ctx, topicName)
Expand Down Expand Up @@ -362,7 +364,7 @@ func (p *defaultProducer) SendWithTransaction(ctx context.Context, msg *Message,
return nil, fmt.Errorf("producer is not running")
}
t := transaction.(*transactionImpl)
pubMessage, err := t.tryAddMessage(msg)
pubMessage, err := t.tryAddMessage(msg, p.cli.config.NameSpace)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -394,7 +396,8 @@ func (p *defaultProducer) endTransaction(ctx context.Context, endpoints *v2.Endp
ctx = p.cli.Sign(ctx)
request := &v2.EndTransactionRequest{
Topic: &v2.Resource{
Name: messageCommon.topic,
Name: messageCommon.topic,
ResourceNamespace: p.cli.config.NameSpace,
},
MessageId: messageId,
TransactionId: transactionId,
Expand Down
8 changes: 6 additions & 2 deletions golang/publishing_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@ import (
)

type PublishingMessage struct {
namespace string
msg *Message
encoding v2.Encoding
messageId string
messageType v2.MessageType
traceContext *string
}

var NewPublishingMessage = func(msg *Message, settings *producerSettings, txEnabled bool) (*PublishingMessage, error) {
var NewPublishingMessage = func(msg *Message, namespace string, settings *producerSettings, txEnabled bool) (*PublishingMessage, error) {
if msg == nil {
return nil, fmt.Errorf("message is nil")
}
Expand All @@ -51,6 +52,8 @@ var NewPublishingMessage = func(msg *Message, settings *producerSettings, txEnab
// No need to compress message body.
pMsg.encoding = v2.Encoding_IDENTITY

pMsg.namespace = namespace

// Generate message id.
pMsg.messageId = GetMessageIdCodecInstance().NextMessageId().String()
// Normal message.
Expand Down Expand Up @@ -84,7 +87,8 @@ func (pMsg *PublishingMessage) toProtobuf() (*v2.Message, error) {
msg := &v2.Message{
Topic: &v2.Resource{
// ResourceNamespace: b.conn.Config().NameSpace,
Name: pMsg.msg.Topic,
Name: pMsg.msg.Topic,
ResourceNamespace: pMsg.namespace,
},
SystemProperties: &v2.SystemProperties{
Keys: pMsg.msg.GetKeys(),
Expand Down
37 changes: 37 additions & 0 deletions golang/publishing_message_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package golang

import "testing"

func TestNewPublishingMessage(t *testing.T) {
namespace := "ns-test"
pSetting := &producerSettings{}
msg := &Message{}
pMsg, err := NewPublishingMessage(msg, namespace, pSetting, false)
if err != nil {
t.Error(err)
}
v2Msg, err := pMsg.toProtobuf()
if err != nil {
t.Error(err)
}
if v2Msg.GetTopic().GetResourceNamespace() != namespace {
t.Error("namespace not equal")
}
}
15 changes: 10 additions & 5 deletions golang/simple_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,12 @@ func (sc *defaultSimpleConsumer) changeInvisibleDuration0(messageView *MessageVi
ctx := sc.cli.Sign(context.Background())
request := &v2.ChangeInvisibleDurationRequest{
Topic: &v2.Resource{
Name: messageView.GetTopic(),
Name: messageView.GetTopic(),
ResourceNamespace: sc.cli.config.NameSpace,
},
Group: &v2.Resource{
Name: sc.groupName,
Name: sc.groupName,
ResourceNamespace: sc.cli.config.NameSpace,
},
ReceiptHandle: messageView.GetReceiptHandle(),
InvisibleDuration: durationpb.New(invisibleDuration),
Expand Down Expand Up @@ -166,7 +168,8 @@ func (sc *defaultSimpleConsumer) wrapReceiveMessageRequest(batchSize int, messag

return &v2.ReceiveMessageRequest{
Group: &v2.Resource{
Name: sc.groupName,
Name: sc.groupName,
ResourceNamespace: sc.cli.config.NameSpace,
},
MessageQueue: messageQueue,
FilterExpression: &v2.FilterExpression{
Expand All @@ -183,7 +186,8 @@ func (sc *defaultSimpleConsumer) wrapAckMessageRequest(messageView *MessageView)
return &v2.AckMessageRequest{
Group: sc.scSettings.groupName,
Topic: &v2.Resource{
Name: messageView.GetTopic(),
Name: messageView.GetTopic(),
ResourceNamespace: sc.cli.config.NameSpace,
},
Entries: []*v2.AckMessageEntry{
{
Expand Down Expand Up @@ -369,7 +373,8 @@ var NewSimpleConsumer = func(config *Config, opts ...SimpleConsumerOption) (Simp
requestTimeout: sc.cli.opts.timeout,

groupName: &v2.Resource{
Name: sc.groupName,
Name: sc.groupName,
ResourceNamespace: config.NameSpace,
},
longPollingTimeout: scOpts.awaitDuration,
subscriptionExpressions: scOpts.subscriptionExpressions,
Expand Down
3 changes: 2 additions & 1 deletion golang/simple_consumer_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ func (sc *simpleConsumerSettings) toProtobuf() *v2.Settings {
subscriptions := make([]*v2.SubscriptionEntry, 0)
for k, v := range sc.subscriptionExpressions {
topic := &v2.Resource{
Name: k,
Name: k,
ResourceNamespace: sc.groupName.GetResourceNamespace(),
}
filterExpression := &v2.FilterExpression{
Expression: v.expression,
Expand Down
4 changes: 2 additions & 2 deletions golang/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (t *transactionImpl) RollBack() error {
return nil
}

func (t *transactionImpl) tryAddMessage(message *Message) (*PublishingMessage, error) {
func (t *transactionImpl) tryAddMessage(message *Message, namespace string) (*PublishingMessage, error) {
t.messagesLock.RLock()
if len(t.messages) > MAX_MESSAGE_NUM {
return nil, fmt.Errorf("message in transaction has exceeded the threshold: %d", MAX_MESSAGE_NUM)
Expand All @@ -100,7 +100,7 @@ func (t *transactionImpl) tryAddMessage(message *Message) (*PublishingMessage, e
if len(t.messages) > MAX_MESSAGE_NUM {
return nil, fmt.Errorf("message in transaction has exceeded the threshold: %d", MAX_MESSAGE_NUM)
}
pubMessage, err := NewPublishingMessage(message, t.producerImpl.(*defaultProducer).pSetting, true)
pubMessage, err := NewPublishingMessage(message, namespace, t.producerImpl.(*defaultProducer).pSetting, true)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 5790fc4

Please sign in to comment.