From 4f38f15da542fdbb710320057b1df36b410d06f3 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Fri, 4 Oct 2024 16:50:02 +0200 Subject: [PATCH] Remove context from mvcc public interface Signed-off-by: Marek Siarkowicz --- .../api/v3rpc/validationfuzz_test.go | 7 +- server/etcdserver/apply/apply.go | 8 +- server/etcdserver/txn/txn.go | 12 ++- server/etcdserver/txn/txn_test.go | 80 +------------------ server/etcdserver/v3_server.go | 4 +- 5 files changed, 18 insertions(+), 93 deletions(-) diff --git a/server/etcdserver/api/v3rpc/validationfuzz_test.go b/server/etcdserver/api/v3rpc/validationfuzz_test.go index d921c9602b7..813a1f9b937 100644 --- a/server/etcdserver/api/v3rpc/validationfuzz_test.go +++ b/server/etcdserver/api/v3rpc/validationfuzz_test.go @@ -15,7 +15,6 @@ package v3rpc import ( - "context" "testing" "go.uber.org/zap/zaptest" @@ -167,15 +166,11 @@ func execTransaction(t *testing.T, req *pb.RequestOp) { s := mvcc.NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, mvcc.StoreConfig{}) defer s.Close() - // setup cancelled context - ctx, cancel := context.WithCancel(context.TODO()) - cancel() - request := &pb.TxnRequest{ Success: []*pb.RequestOp{req}, } - _, _, err := txn.Txn(ctx, zaptest.NewLogger(t), request, false, s, &lease.FakeLessor{}) + _, _, err := txn.Txn(zaptest.NewLogger(t), request, false, s, &lease.FakeLessor{}) if err != nil { t.Skipf("Application erroring. %s", err.Error()) } diff --git a/server/etcdserver/apply/apply.go b/server/etcdserver/apply/apply.go index 7629787dbbb..b6659471382 100644 --- a/server/etcdserver/apply/apply.go +++ b/server/etcdserver/apply/apply.go @@ -151,19 +151,19 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest, applyFunc applyFunc) } func (a *applierV3backend) Put(p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) { - return mvcctxn.Put(context.TODO(), a.lg, a.lessor, a.kv, p) + return mvcctxn.Put(a.lg, a.lessor, a.kv, p) } func (a *applierV3backend) DeleteRange(dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, *traceutil.Trace, error) { - return mvcctxn.DeleteRange(context.TODO(), a.lg, a.kv, dr) + return mvcctxn.DeleteRange(a.lg, a.kv, dr) } func (a *applierV3backend) Range(r *pb.RangeRequest) (*pb.RangeResponse, *traceutil.Trace, error) { - return mvcctxn.Range(context.TODO(), a.lg, a.kv, r) + return mvcctxn.Range(a.lg, a.kv, r) } func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) { - return mvcctxn.Txn(context.TODO(), a.lg, rt, a.txnModeWriteWithSharedBuffer, a.kv, a.lessor) + return mvcctxn.Txn(a.lg, rt, a.txnModeWriteWithSharedBuffer, a.kv, a.lessor) } func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) { diff --git a/server/etcdserver/txn/txn.go b/server/etcdserver/txn/txn.go index 8f0e6c4b4a7..df988245cd9 100644 --- a/server/etcdserver/txn/txn.go +++ b/server/etcdserver/txn/txn.go @@ -32,7 +32,8 @@ import ( "go.etcd.io/etcd/server/v3/storage/mvcc" ) -func Put(ctx context.Context, lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) { +func Put(lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) { + ctx := context.Background() trace = traceutil.Get(ctx) // create put tracing if the trace in context is empty if trace.IsEmpty() { @@ -94,7 +95,8 @@ func put(ctx context.Context, txnWrite mvcc.TxnWrite, p *pb.PutRequest) (resp *p return resp, nil } -func DeleteRange(ctx context.Context, lg *zap.Logger, kv mvcc.KV, dr *pb.DeleteRangeRequest) (resp *pb.DeleteRangeResponse, trace *traceutil.Trace, err error) { +func DeleteRange(lg *zap.Logger, kv mvcc.KV, dr *pb.DeleteRangeRequest) (resp *pb.DeleteRangeResponse, trace *traceutil.Trace, err error) { + ctx := context.Background() trace = traceutil.Get(ctx) // create delete tracing if the trace in context is empty if trace.IsEmpty() { @@ -133,7 +135,8 @@ func deleteRange(ctx context.Context, txnWrite mvcc.TxnWrite, dr *pb.DeleteRange return resp, nil } -func Range(ctx context.Context, lg *zap.Logger, kv mvcc.KV, r *pb.RangeRequest) (resp *pb.RangeResponse, trace *traceutil.Trace, err error) { +func Range(lg *zap.Logger, kv mvcc.KV, r *pb.RangeRequest) (resp *pb.RangeResponse, trace *traceutil.Trace, err error) { + ctx := context.Background() trace = traceutil.Get(ctx) if trace.IsEmpty() { trace = traceutil.New("range", lg) @@ -249,7 +252,8 @@ func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r * return resp, nil } -func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWithSharedBuffer bool, kv mvcc.KV, lessor lease.Lessor) (*pb.TxnResponse, *traceutil.Trace, error) { +func Txn(lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWithSharedBuffer bool, kv mvcc.KV, lessor lease.Lessor) (*pb.TxnResponse, *traceutil.Trace, error) { + ctx := context.Background() trace := traceutil.Get(ctx) if trace.IsEmpty() { trace = traceutil.New("transaction", lg) diff --git a/server/etcdserver/txn/txn_test.go b/server/etcdserver/txn/txn_test.go index 54f465baf22..b511d3dbaf6 100644 --- a/server/etcdserver/txn/txn_test.go +++ b/server/etcdserver/txn/txn_test.go @@ -15,8 +15,6 @@ package txn import ( - "context" - "strings" "testing" "time" @@ -223,9 +221,7 @@ func TestCheckTxn(t *testing.T) { t.Run(tc.name, func(t *testing.T) { s, lessor := setup(t, tc.setup) - ctx, cancel := context.WithCancel(context.TODO()) - defer cancel() - _, _, err := Txn(ctx, zaptest.NewLogger(t), tc.txn, false, s, lessor) + _, _, err := Txn(zaptest.NewLogger(t), tc.txn, false, s, lessor) gotErr := "" if err != nil { @@ -243,9 +239,7 @@ func TestCheckPut(t *testing.T) { t.Run(tc.name, func(t *testing.T) { s, lessor := setup(t, tc.setup) - ctx, cancel := context.WithCancel(context.TODO()) - defer cancel() - _, _, err := Put(ctx, zaptest.NewLogger(t), lessor, s, tc.op.GetRequestPut()) + _, _, err := Put(zaptest.NewLogger(t), lessor, s, tc.op.GetRequestPut()) gotErr := "" if err != nil { @@ -263,9 +257,7 @@ func TestCheckRange(t *testing.T) { t.Run(tc.name, func(t *testing.T) { s, _ := setup(t, tc.setup) - ctx, cancel := context.WithCancel(context.TODO()) - defer cancel() - _, _, err := Range(ctx, zaptest.NewLogger(t), s, tc.op.GetRequestRange()) + _, _, err := Range(zaptest.NewLogger(t), s, tc.op.GetRequestRange()) gotErr := "" if err != nil { @@ -304,72 +296,6 @@ func setup(t *testing.T, setup testSetup) (mvcc.KV, lease.Lessor) { return s, lessor } -func TestReadonlyTxnError(t *testing.T) { - b, _ := betesting.NewDefaultTmpBackend(t) - defer betesting.Close(t, b) - s := mvcc.NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, mvcc.StoreConfig{}) - defer s.Close() - - // setup cancelled context - ctx, cancel := context.WithCancel(context.TODO()) - cancel() - - // put some data to prevent early termination in rangeKeys - // we are expecting failure on cancelled context check - s.Put([]byte("foo"), []byte("bar"), lease.NoLease) - - txn := &pb.TxnRequest{ - Success: []*pb.RequestOp{ - { - Request: &pb.RequestOp_RequestRange{ - RequestRange: &pb.RangeRequest{ - Key: []byte("foo"), - }, - }, - }, - }, - } - - _, _, err := Txn(ctx, zaptest.NewLogger(t), txn, false, s, &lease.FakeLessor{}) - if err == nil || !strings.Contains(err.Error(), "applyTxn: failed Range: rangeKeys: context cancelled: context canceled") { - t.Fatalf("Expected context canceled error, got %v", err) - } -} - -func TestWriteTxnPanic(t *testing.T) { - b, _ := betesting.NewDefaultTmpBackend(t) - defer betesting.Close(t, b) - s := mvcc.NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, mvcc.StoreConfig{}) - defer s.Close() - - // setup cancelled context - ctx, cancel := context.WithCancel(context.TODO()) - cancel() - - // write txn that puts some data and then fails in range due to cancelled context - txn := &pb.TxnRequest{ - Success: []*pb.RequestOp{ - { - Request: &pb.RequestOp_RequestPut{ - RequestPut: &pb.PutRequest{ - Key: []byte("foo"), - Value: []byte("bar"), - }, - }, - }, - { - Request: &pb.RequestOp_RequestRange{ - RequestRange: &pb.RangeRequest{ - Key: []byte("foo"), - }, - }, - }, - }, - } - - assert.Panics(t, func() { Txn(ctx, zaptest.NewLogger(t), txn, false, s, &lease.FakeLessor{}) }, "Expected panic in Txn with writes") -} - func TestCheckTxnAuth(t *testing.T) { be, _ := betesting.NewDefaultTmpBackend(t) defer betesting.Close(t, be) diff --git a/server/etcdserver/v3_server.go b/server/etcdserver/v3_server.go index 803f931f5f7..9788e84b6cc 100644 --- a/server/etcdserver/v3_server.go +++ b/server/etcdserver/v3_server.go @@ -132,7 +132,7 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd) } - get := func() { resp, _, err = txn.Range(ctx, s.Logger(), s.KV(), r) } + get := func() { resp, _, err = txn.Range(s.Logger(), s.KV(), r) } if serr := s.doSerialize(ctx, chk, get); serr != nil { err = serr return nil, err @@ -183,7 +183,7 @@ func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse }(time.Now()) get := func() { - resp, _, err = txn.Txn(ctx, s.Logger(), r, s.Cfg.ExperimentalTxnModeWriteWithSharedBuffer, s.KV(), s.lessor) + resp, _, err = txn.Txn(s.Logger(), r, s.Cfg.ExperimentalTxnModeWriteWithSharedBuffer, s.KV(), s.lessor) } if serr := s.doSerialize(ctx, chk, get); serr != nil { return nil, serr