Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove context from mvcc public interface #18678

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions server/etcdserver/api/v3rpc/validationfuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package v3rpc

import (
"context"
"testing"

"go.uber.org/zap/zaptest"
Expand Down Expand Up @@ -167,15 +166,11 @@ func execTransaction(t *testing.T, req *pb.RequestOp) {
s := mvcc.NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, mvcc.StoreConfig{})
defer s.Close()

// setup cancelled context
ctx, cancel := context.WithCancel(context.TODO())
cancel()

request := &pb.TxnRequest{
Success: []*pb.RequestOp{req},
}

_, _, err := txn.Txn(ctx, zaptest.NewLogger(t), request, false, s, &lease.FakeLessor{})
_, _, err := txn.Txn(zaptest.NewLogger(t), request, false, s, &lease.FakeLessor{})
if err != nil {
t.Skipf("Application erroring. %s", err.Error())
}
Expand Down
8 changes: 4 additions & 4 deletions server/etcdserver/apply/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,19 +151,19 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest, applyFunc applyFunc)
}

func (a *applierV3backend) Put(p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
return mvcctxn.Put(context.TODO(), a.lg, a.lessor, a.kv, p)
return mvcctxn.Put(a.lg, a.lessor, a.kv, p)
}

func (a *applierV3backend) DeleteRange(dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, *traceutil.Trace, error) {
return mvcctxn.DeleteRange(context.TODO(), a.lg, a.kv, dr)
return mvcctxn.DeleteRange(a.lg, a.kv, dr)
}

func (a *applierV3backend) Range(r *pb.RangeRequest) (*pb.RangeResponse, *traceutil.Trace, error) {
return mvcctxn.Range(context.TODO(), a.lg, a.kv, r)
return mvcctxn.Range(a.lg, a.kv, r)
}

func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
return mvcctxn.Txn(context.TODO(), a.lg, rt, a.txnModeWriteWithSharedBuffer, a.kv, a.lessor)
return mvcctxn.Txn(a.lg, rt, a.txnModeWriteWithSharedBuffer, a.kv, a.lessor)
}

func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) {
Expand Down
12 changes: 8 additions & 4 deletions server/etcdserver/txn/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ import (
"go.etcd.io/etcd/server/v3/storage/mvcc"
)

func Put(ctx context.Context, lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
func Put(lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
ctx := context.Background()
trace = traceutil.Get(ctx)
// create put tracing if the trace in context is empty
if trace.IsEmpty() {
Expand Down Expand Up @@ -94,7 +95,8 @@ func put(ctx context.Context, txnWrite mvcc.TxnWrite, p *pb.PutRequest) (resp *p
return resp, nil
}

func DeleteRange(ctx context.Context, lg *zap.Logger, kv mvcc.KV, dr *pb.DeleteRangeRequest) (resp *pb.DeleteRangeResponse, trace *traceutil.Trace, err error) {
func DeleteRange(lg *zap.Logger, kv mvcc.KV, dr *pb.DeleteRangeRequest) (resp *pb.DeleteRangeResponse, trace *traceutil.Trace, err error) {
ctx := context.Background()
trace = traceutil.Get(ctx)
// create delete tracing if the trace in context is empty
if trace.IsEmpty() {
Expand Down Expand Up @@ -133,7 +135,8 @@ func deleteRange(ctx context.Context, txnWrite mvcc.TxnWrite, dr *pb.DeleteRange
return resp, nil
}

func Range(ctx context.Context, lg *zap.Logger, kv mvcc.KV, r *pb.RangeRequest) (resp *pb.RangeResponse, trace *traceutil.Trace, err error) {
func Range(lg *zap.Logger, kv mvcc.KV, r *pb.RangeRequest) (resp *pb.RangeResponse, trace *traceutil.Trace, err error) {
ctx := context.Background()
trace = traceutil.Get(ctx)
if trace.IsEmpty() {
trace = traceutil.New("range", lg)
Expand Down Expand Up @@ -249,7 +252,8 @@ func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r *
return resp, nil
}

func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWithSharedBuffer bool, kv mvcc.KV, lessor lease.Lessor) (*pb.TxnResponse, *traceutil.Trace, error) {
func Txn(lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWithSharedBuffer bool, kv mvcc.KV, lessor lease.Lessor) (*pb.TxnResponse, *traceutil.Trace, error) {
ctx := context.Background()
trace := traceutil.Get(ctx)
if trace.IsEmpty() {
trace = traceutil.New("transaction", lg)
Expand Down
80 changes: 3 additions & 77 deletions server/etcdserver/txn/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
package txn

import (
"context"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -223,9 +221,7 @@ func TestCheckTxn(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
s, lessor := setup(t, tc.setup)

ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
_, _, err := Txn(ctx, zaptest.NewLogger(t), tc.txn, false, s, lessor)
_, _, err := Txn(zaptest.NewLogger(t), tc.txn, false, s, lessor)

gotErr := ""
if err != nil {
Expand All @@ -243,9 +239,7 @@ func TestCheckPut(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
s, lessor := setup(t, tc.setup)

ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
_, _, err := Put(ctx, zaptest.NewLogger(t), lessor, s, tc.op.GetRequestPut())
_, _, err := Put(zaptest.NewLogger(t), lessor, s, tc.op.GetRequestPut())

gotErr := ""
if err != nil {
Expand All @@ -263,9 +257,7 @@ func TestCheckRange(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
s, _ := setup(t, tc.setup)

ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
_, _, err := Range(ctx, zaptest.NewLogger(t), s, tc.op.GetRequestRange())
_, _, err := Range(zaptest.NewLogger(t), s, tc.op.GetRequestRange())

gotErr := ""
if err != nil {
Expand Down Expand Up @@ -304,72 +296,6 @@ func setup(t *testing.T, setup testSetup) (mvcc.KV, lease.Lessor) {
return s, lessor
}

func TestReadonlyTxnError(t *testing.T) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to double check the original motivation for the test and if we need to rewrite it. Marking the PR as draft until then.

b, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, b)
s := mvcc.NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, mvcc.StoreConfig{})
defer s.Close()

// setup cancelled context
ctx, cancel := context.WithCancel(context.TODO())
cancel()

// put some data to prevent early termination in rangeKeys
// we are expecting failure on cancelled context check
s.Put([]byte("foo"), []byte("bar"), lease.NoLease)

txn := &pb.TxnRequest{
Success: []*pb.RequestOp{
{
Request: &pb.RequestOp_RequestRange{
RequestRange: &pb.RangeRequest{
Key: []byte("foo"),
},
},
},
},
}

_, _, err := Txn(ctx, zaptest.NewLogger(t), txn, false, s, &lease.FakeLessor{})
if err == nil || !strings.Contains(err.Error(), "applyTxn: failed Range: rangeKeys: context cancelled: context canceled") {
t.Fatalf("Expected context canceled error, got %v", err)
}
}

func TestWriteTxnPanic(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, b)
s := mvcc.NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, mvcc.StoreConfig{})
defer s.Close()

// setup cancelled context
ctx, cancel := context.WithCancel(context.TODO())
cancel()

// write txn that puts some data and then fails in range due to cancelled context
txn := &pb.TxnRequest{
Success: []*pb.RequestOp{
{
Request: &pb.RequestOp_RequestPut{
RequestPut: &pb.PutRequest{
Key: []byte("foo"),
Value: []byte("bar"),
},
},
},
{
Request: &pb.RequestOp_RequestRange{
RequestRange: &pb.RangeRequest{
Key: []byte("foo"),
},
},
},
},
}

assert.Panics(t, func() { Txn(ctx, zaptest.NewLogger(t), txn, false, s, &lease.FakeLessor{}) }, "Expected panic in Txn with writes")
}

func TestCheckTxnAuth(t *testing.T) {
be, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, be)
Expand Down
4 changes: 2 additions & 2 deletions server/etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe
return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
}

get := func() { resp, _, err = txn.Range(ctx, s.Logger(), s.KV(), r) }
get := func() { resp, _, err = txn.Range(s.Logger(), s.KV(), r) }
if serr := s.doSerialize(ctx, chk, get); serr != nil {
err = serr
return nil, err
Expand Down Expand Up @@ -183,7 +183,7 @@ func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse
}(time.Now())

get := func() {
resp, _, err = txn.Txn(ctx, s.Logger(), r, s.Cfg.ExperimentalTxnModeWriteWithSharedBuffer, s.KV(), s.lessor)
resp, _, err = txn.Txn(s.Logger(), r, s.Cfg.ExperimentalTxnModeWriteWithSharedBuffer, s.KV(), s.lessor)
}
if serr := s.doSerialize(ctx, chk, get); serr != nil {
return nil, serr
Expand Down
Loading