diff --git a/pubsub/go.mod b/pubsub/go.mod index 93bb409bedc4..fe9e9a7bb8c8 100644 --- a/pubsub/go.mod +++ b/pubsub/go.mod @@ -7,6 +7,7 @@ require ( cloud.google.com/go/iam v1.1.8 cloud.google.com/go/kms v1.17.1 github.com/google/go-cmp v0.6.0 + github.com/google/uuid v1.6.0 github.com/googleapis/gax-go/v2 v2.12.5 go.einride.tech/aip v0.67.1 go.opencensus.io v0.24.0 diff --git a/pubsub/go.sum b/pubsub/go.sum index 7adcbdaa0551..b7ada61b1b70 100644 --- a/pubsub/go.sum +++ b/pubsub/go.sum @@ -59,6 +59,7 @@ github.com/google/s2a-go v0.1.7 h1:60BLSyTrOV4/haCDW4zb1guZItoSq8foHCXrAnjBo/o= github.com/google/s2a-go v0.1.7/go.mod h1:50CgR4k1jNlWBu4UfS4AcfhVe1r6pdZPygJ3R8F0Qdw= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfFxPRy3Bf7vr3h0cechB90XaQs= github.com/googleapis/enterprise-certificate-proxy v0.3.2/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0= github.com/googleapis/gax-go/v2 v2.12.5 h1:8gw9KZK8TiVKB6q3zHY3SBzLnrGp6HQjyfYBYGmXdxA= diff --git a/pubsub/iterator.go b/pubsub/iterator.go index 85740b57a469..e06d52ded4af 100644 --- a/pubsub/iterator.go +++ b/pubsub/iterator.go @@ -121,7 +121,7 @@ func newMessageIterator(subc *vkit.SubscriberClient, subName string, po *pullOpt maxMessages = 0 maxBytes = 0 } - ps = newPullStream(context.Background(), subc.StreamingPull, subName, maxMessages, maxBytes, po.maxExtensionPeriod) + ps = newPullStream(context.Background(), subc.StreamingPull, subName, po.clientID, maxMessages, maxBytes, po.maxExtensionPeriod) } // The period will update each tick based on the distribution of acks. We'll start by arbitrarily sending // the first keepAlive halfway towards the minimum ack deadline. diff --git a/pubsub/pullstream.go b/pubsub/pullstream.go index bfdc11e2a02e..c5ea8f510af8 100644 --- a/pubsub/pullstream.go +++ b/pubsub/pullstream.go @@ -42,7 +42,7 @@ type pullStream struct { // for testing type streamingPullFunc func(context.Context, ...gax.CallOption) (pb.Subscriber_StreamingPullClient, error) -func newPullStream(ctx context.Context, streamingPull streamingPullFunc, subName string, maxOutstandingMessages, maxOutstandingBytes int, maxDurationPerLeaseExtension time.Duration) *pullStream { +func newPullStream(ctx context.Context, streamingPull streamingPullFunc, subName, clientID string, maxOutstandingMessages, maxOutstandingBytes int, maxDurationPerLeaseExtension time.Duration) *pullStream { ctx = withSubscriptionKey(ctx, subName) hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "subscription", url.QueryEscape(subName))} ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...) @@ -62,6 +62,7 @@ func newPullStream(ctx context.Context, streamingPull streamingPullFunc, subName } err = spc.Send(&pb.StreamingPullRequest{ Subscription: subName, + ClientId: clientID, StreamAckDeadlineSeconds: streamAckDeadline, MaxOutstandingMessages: int64(maxOutstandingMessages), MaxOutstandingBytes: int64(maxOutstandingBytes), diff --git a/pubsub/pullstream_test.go b/pubsub/pullstream_test.go index 091cab445b49..b8282fe7ec83 100644 --- a/pubsub/pullstream_test.go +++ b/pubsub/pullstream_test.go @@ -67,7 +67,7 @@ func TestPullStreamGet(t *testing.T) { test.errors = test.errors[1:] return &testStreamingPullClient{sendError: err}, nil } - ps := newPullStream(context.Background(), streamingPull, "", 100, 1000, 0) + ps := newPullStream(context.Background(), streamingPull, "", "", 100, 1000, 0) _, err := ps.get(nil) if got := status.Code(err); got != test.wantCode { t.Errorf("%s: got %s, want %s", test.desc, got, test.wantCode) diff --git a/pubsub/subscription.go b/pubsub/subscription.go index 15e72e55a44b..51e5cf8a0ddf 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -27,6 +27,7 @@ import ( "cloud.google.com/go/internal/optional" pb "cloud.google.com/go/pubsub/apiv1/pubsubpb" "cloud.google.com/go/pubsub/internal/scheduler" + "github.com/google/uuid" gax "github.com/googleapis/gax-go/v2" "golang.org/x/sync/errgroup" "google.golang.org/grpc/codes" @@ -50,6 +51,11 @@ type Subscription struct { mu sync.Mutex receiveActive bool + + // clientID to be used across all streaming pull connections that are created. + // This indicates to the server that any guarantees made for a stream that + // disconnected will be made for the stream that is created to replace it. + clientID string } // Subscription creates a reference to a subscription. @@ -60,8 +66,9 @@ func (c *Client) Subscription(id string) *Subscription { // SubscriptionInProject creates a reference to a subscription in a given project. func (c *Client) SubscriptionInProject(id, projectID string) *Subscription { return &Subscription{ - c: c, - name: fmt.Sprintf("projects/%s/subscriptions/%s", projectID, id), + c: c, + name: fmt.Sprintf("projects/%s/subscriptions/%s", projectID, id), + clientID: uuid.NewString(), } } @@ -1280,6 +1287,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes maxOutstandingMessages: maxCount, maxOutstandingBytes: maxBytes, useLegacyFlowControl: s.ReceiveSettings.UseLegacyFlowControl, + clientID: s.clientID, } fc := newSubscriptionFlowController(FlowControlSettings{ MaxOutstandingMessages: maxCount, @@ -1446,4 +1454,5 @@ type pullOptions struct { maxOutstandingMessages int maxOutstandingBytes int useLegacyFlowControl bool + clientID string }