Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#52051
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
YuJuncen authored and ti-chi-bot committed Mar 25, 2024
1 parent b366852 commit 8fe19d3
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 2 deletions.
35 changes: 34 additions & 1 deletion br/pkg/backup/prepare_snap/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ package preparesnap

import (
"context"
<<<<<<< HEAD
=======
"slices"
"sync"
>>>>>>> 02b277a6671 (operator: make gRPC connections synced (#52051))
"time"

"github.com/docker/go-units"
Expand Down Expand Up @@ -128,6 +133,34 @@ func (c CliEnv) GetAllLiveStores(ctx context.Context) ([]*metapb.Store, error) {
return withoutTiFlash, err
}

func AdaptForGRPCInTest(p PrepareClient) PrepareClient {
return &gRPCGoAdapter{
inner: p,
}
}

// GrpcGoAdapter makes the `Send` call synchronous.
// grpc-go doesn't guarantee concurrency call to `Send` or `Recv` is safe.
// But concurrency call to `send` and `recv` is safe.
// This type is exported for testing.
type gRPCGoAdapter struct {
inner PrepareClient
sendMu sync.Mutex
recvMu sync.Mutex
}

func (s *gRPCGoAdapter) Send(req *brpb.PrepareSnapshotBackupRequest) error {
s.sendMu.Lock()
defer s.sendMu.Unlock()
return s.inner.Send(req)
}

func (s *gRPCGoAdapter) Recv() (*brpb.PrepareSnapshotBackupResponse, error) {
s.recvMu.Lock()
defer s.recvMu.Unlock()
return s.inner.Recv()
}

func (c CliEnv) ConnectToStore(ctx context.Context, storeID uint64) (PrepareClient, error) {
var cli brpb.Backup_PrepareSnapshotBackupClient
err := c.Mgr.TryWithConn(ctx, storeID, func(cc *grpc.ClientConn) error {
Expand All @@ -142,7 +175,7 @@ func (c CliEnv) ConnectToStore(ctx context.Context, storeID uint64) (PrepareClie
if err != nil {
return nil, err
}
return cli, nil
return &gRPCGoAdapter{inner: cli}, nil
}

func (c CliEnv) LoadRegionsInKeyRange(ctx context.Context, startKey []byte, endKey []byte) (regions []Region, err error) {
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/backup/prepare_snap/prepare_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (m *mockStores) ConnectToStore(ctx context.Context, storeID uint64) (Prepar
}
m.onCreateStore(m.stores[storeID])
}
return m.stores[storeID], nil
return AdaptForGRPCInTest(m.stores[storeID]), nil
}

func (m *mockStores) LoadRegionsInKeyRange(ctx context.Context, startKey []byte, endKey []byte) (regions []Region, err error) {
Expand Down

0 comments on commit 8fe19d3

Please sign in to comment.