diff --git a/br/pkg/backup/prepare_snap/env.go b/br/pkg/backup/prepare_snap/env.go index e0998adc392e3..afefe8d8b4aa4 100644 --- a/br/pkg/backup/prepare_snap/env.go +++ b/br/pkg/backup/prepare_snap/env.go @@ -17,6 +17,7 @@ package preparesnap import ( "context" "slices" + "sync" "time" "github.com/docker/go-units" @@ -110,6 +111,34 @@ func (c CliEnv) GetAllLiveStores(ctx context.Context) ([]*metapb.Store, error) { return withoutTiFlash, err } +func AdaptForGRPCInTest(p PrepareClient) PrepareClient { + return &gRPCGoAdapter{ + inner: p, + } +} + +// GrpcGoAdapter makes the `Send` call synchronous. +// grpc-go doesn't guarantee concurrency call to `Send` or `Recv` is safe. +// But concurrency call to `send` and `recv` is safe. +// This type is exported for testing. +type gRPCGoAdapter struct { + inner PrepareClient + sendMu sync.Mutex + recvMu sync.Mutex +} + +func (s *gRPCGoAdapter) Send(req *brpb.PrepareSnapshotBackupRequest) error { + s.sendMu.Lock() + defer s.sendMu.Unlock() + return s.inner.Send(req) +} + +func (s *gRPCGoAdapter) Recv() (*brpb.PrepareSnapshotBackupResponse, error) { + s.recvMu.Lock() + defer s.recvMu.Unlock() + return s.inner.Recv() +} + func (c CliEnv) ConnectToStore(ctx context.Context, storeID uint64) (PrepareClient, error) { var cli brpb.Backup_PrepareSnapshotBackupClient err := c.Mgr.TryWithConn(ctx, storeID, func(cc *grpc.ClientConn) error { @@ -124,7 +153,7 @@ func (c CliEnv) ConnectToStore(ctx context.Context, storeID uint64) (PrepareClie if err != nil { return nil, err } - return cli, nil + return &gRPCGoAdapter{inner: cli}, nil } func (c CliEnv) LoadRegionsInKeyRange(ctx context.Context, startKey []byte, endKey []byte) (regions []Region, err error) { diff --git a/br/pkg/backup/prepare_snap/prepare_test.go b/br/pkg/backup/prepare_snap/prepare_test.go index 5f3d2e28d44dc..f635d729833af 100644 --- a/br/pkg/backup/prepare_snap/prepare_test.go +++ b/br/pkg/backup/prepare_snap/prepare_test.go @@ -177,7 +177,7 @@ func (m *mockStores) ConnectToStore(ctx context.Context, storeID uint64) (Prepar } m.onCreateStore(m.stores[storeID]) } - return m.stores[storeID], nil + return AdaptForGRPCInTest(m.stores[storeID]), nil } func (m *mockStores) LoadRegionsInKeyRange(ctx context.Context, startKey []byte, endKey []byte) (regions []Region, err error) {