Skip to content

Commit

Permalink
pubsub: pull messages concurrently
Browse files Browse the repository at this point in the history
Instead of using one goroutine to pull messages,
we allow for arbitrarily many goroutines.
The default is 10*CPU, the same with Java.

Change-Id: Ib8884506cfed204ff6b690f73d7260e0a695d25e
Reviewed-on: https://code-review.googlesource.com/14370
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
  • Loading branch information
pongad committed Jun 30, 2017
1 parent 7c205da commit 927c812
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 24 deletions.
3 changes: 1 addition & 2 deletions pubsub/fake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ import (
"sync"
"time"

"golang.org/x/net/context"

"cloud.google.com/go/internal/testutil"
"golang.org/x/net/context"
pb "google.golang.org/genproto/googleapis/pubsub/v1"
)

Expand Down
13 changes: 5 additions & 8 deletions pubsub/loadtest/loadtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"bytes"
"errors"
"log"
"runtime"
"strconv"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -149,13 +148,11 @@ func (s *SubServer) Start(ctx context.Context, req *pb.StartRequest) (*pb.StartR
}

// Load test API doesn't define any way to stop right now.
for i := 0; i < 30*runtime.GOMAXPROCS(0); i++ {
go func() {
sub := c.Subscription(req.GetPubsubOptions().Subscription)
err := sub.Receive(context.Background(), s.callback)
log.Fatal(err)
}()
}
go func() {
sub := c.Subscription(req.GetPubsubOptions().Subscription)
err := sub.Receive(context.Background(), s.callback)
log.Fatal(err)
}()

log.Println("started")
return &pb.StartResponse{}, nil
Expand Down
2 changes: 1 addition & 1 deletion pubsub/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ func (p *streamingPuller) openLocked() {
// No opens in flight; start one.
p.inFlight = true
p.c.L.Unlock()
spc, err := p.subc.StreamingPull(p.ctx)
spc, err := p.subc.StreamingPull(p.ctx, gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(maxSendRecvBytes)))
if err == nil {
err = spc.Send(&pb.StreamingPullRequest{
Subscription: p.subName,
Expand Down
39 changes: 26 additions & 13 deletions pubsub/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ package pubsub
import (
"errors"
"fmt"
"runtime"
"strings"
"sync"
"time"

"cloud.google.com/go/iam"
"golang.org/x/net/context"
"golang.org/x/sync/errgroup"
"google.golang.org/api/iterator"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -147,13 +149,19 @@ type ReceiveSettings struct {
// the value is negative, then there will be no limit on the number of bytes
// for unprocessed messages.
MaxOutstandingBytes int

// NumGoroutines is the number of goroutines Receive will spawn to pull
// messages concurrently. If NumGoroutines is less than 1, it will be treated
// as if it were DefaultReceiveSettings.NumGoroutines.
NumGoroutines int
}

// DefaultReceiveSettings holds the default values for ReceiveSettings.
var DefaultReceiveSettings = ReceiveSettings{
MaxExtension: 10 * time.Minute,
MaxOutstandingMessages: 1000,
MaxOutstandingBytes: 1e9, // 1G
NumGoroutines: 10 * runtime.GOMAXPROCS(0),
}

// Delete deletes the subscription.
Expand Down Expand Up @@ -286,6 +294,10 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
// If MaxExtension is negative, disable automatic extension.
maxExt = 0
}
numGoroutines := s.ReceiveSettings.NumGoroutines
if numGoroutines < 1 {
numGoroutines = DefaultReceiveSettings.NumGoroutines
}
// TODO(jba): add tests that verify that ReceiveSettings are correctly processed.
po := &pullOptions{
maxExtension: maxExt,
Expand All @@ -296,13 +308,16 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes

// Wait for all goroutines started by Receive to return, so instead of an
// obscure goroutine leak we have an obvious blocked call to Receive.
var wg sync.WaitGroup
defer wg.Wait()

return s.receive(ctx, &wg, po, fc, f)
group, gctx := errgroup.WithContext(ctx)
for i := 0; i < numGoroutines; i++ {
group.Go(func() error {
return s.receive(gctx, group, po, fc, f)
})
}
return group.Wait()
}

func (s *Subscription) receive(ctx context.Context, wg *sync.WaitGroup, po *pullOptions, fc *flowController, f func(context.Context, *Message)) error {
func (s *Subscription) receive(ctx context.Context, group *errgroup.Group, po *pullOptions, fc *flowController, f func(context.Context, *Message)) error {
// Cancel a sub-context when we return, to kick the context-aware callbacks
// and the goroutine below.
ctx2, cancel := context.WithCancel(ctx)
Expand All @@ -313,12 +328,11 @@ func (s *Subscription) receive(ctx context.Context, wg *sync.WaitGroup, po *pull
// that context would immediately stop the iterator without waiting for unacked
// messages.
iter := newMessageIterator(context.Background(), s.s, s.name, po)
wg.Add(1)
go func() {
group.Go(func() error {
<-ctx2.Done()
iter.Stop()
wg.Done()
}()
return nil
})
defer cancel()
for {
msg, err := iter.Next()
Expand All @@ -334,15 +348,14 @@ func (s *Subscription) receive(ctx context.Context, wg *sync.WaitGroup, po *pull
msg.Nack()
return nil
}
wg.Add(1)
go func() {
defer wg.Done()
group.Go(func() error {
// TODO(jba): call release when the message is available for GC.
// This considers the message to be released when
// f is finished, but f may ack early or not at all.
defer fc.release(len(msg.Data))
f(ctx2, msg)
}()
return nil
})
}
}

Expand Down

0 comments on commit 927c812

Please sign in to comment.