Skip to content

Commit

Permalink
Remove context from appliers
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
  • Loading branch information
serathius committed Oct 4, 2024
1 parent 620a00b commit 08d54bc
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 59 deletions.
38 changes: 19 additions & 19 deletions server/etcdserver/apply/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ type applierV3 interface {
// delegates the actual execution to the applyFunc method.
Apply(r *pb.InternalRaftRequest, applyFunc applyFunc) *Result

Put(ctx context.Context, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error)
Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, *traceutil.Trace, error)
DeleteRange(ctx context.Context, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, *traceutil.Trace, error)
Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error)
Put(p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error)
Range(r *pb.RangeRequest) (*pb.RangeResponse, *traceutil.Trace, error)
DeleteRange(dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, *traceutil.Trace, error)
Txn(rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error)
Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error)

LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error)
Expand Down Expand Up @@ -150,20 +150,20 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest, applyFunc applyFunc)
return applyFunc(r)
}

func (a *applierV3backend) Put(ctx context.Context, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
return mvcctxn.Put(ctx, a.lg, a.lessor, a.kv, p)
func (a *applierV3backend) Put(p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
return mvcctxn.Put(context.TODO(), a.lg, a.lessor, a.kv, p)
}

func (a *applierV3backend) DeleteRange(ctx context.Context, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, *traceutil.Trace, error) {
return mvcctxn.DeleteRange(ctx, a.lg, a.kv, dr)
func (a *applierV3backend) DeleteRange(dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, *traceutil.Trace, error) {
return mvcctxn.DeleteRange(context.TODO(), a.lg, a.kv, dr)
}

func (a *applierV3backend) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, *traceutil.Trace, error) {
return mvcctxn.Range(ctx, a.lg, a.kv, r)
func (a *applierV3backend) Range(r *pb.RangeRequest) (*pb.RangeResponse, *traceutil.Trace, error) {
return mvcctxn.Range(context.TODO(), a.lg, a.kv, r)
}

func (a *applierV3backend) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
return mvcctxn.Txn(ctx, a.lg, rt, a.txnModeWriteWithSharedBuffer, a.kv, a.lessor)
func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
return mvcctxn.Txn(context.TODO(), a.lg, rt, a.txnModeWriteWithSharedBuffer, a.kv, a.lessor)
}

func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) {
Expand Down Expand Up @@ -248,15 +248,15 @@ type applierV3Capped struct {
// with Puts so that the number of keys in the store is capped.
func newApplierV3Capped(base applierV3) applierV3 { return &applierV3Capped{applierV3: base} }

func (a *applierV3Capped) Put(_ context.Context, _ *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
func (a *applierV3Capped) Put(_ *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
return nil, nil, errors.ErrNoSpace
}

func (a *applierV3Capped) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
func (a *applierV3Capped) Txn(r *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
if a.q.Cost(r) > 0 {
return nil, nil, errors.ErrNoSpace
}
return a.applierV3.Txn(ctx, r)
return a.applierV3.Txn(r)
}

func (a *applierV3Capped) LeaseGrant(_ *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
Expand Down Expand Up @@ -454,18 +454,18 @@ func newQuotaApplierV3(lg *zap.Logger, quotaBackendBytesCfg int64, be backend.Ba
return &quotaApplierV3{app, serverstorage.NewBackendQuota(lg, quotaBackendBytesCfg, be, "v3-applier")}
}

func (a *quotaApplierV3) Put(ctx context.Context, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
func (a *quotaApplierV3) Put(p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
ok := a.q.Available(p)
resp, trace, err := a.applierV3.Put(ctx, p)
resp, trace, err := a.applierV3.Put(p)
if err == nil && !ok {
err = errors.ErrNoSpace
}
return resp, trace, err
}

func (a *quotaApplierV3) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
func (a *quotaApplierV3) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
ok := a.q.Available(rt)
resp, trace, err := a.applierV3.Txn(ctx, rt)
resp, trace, err := a.applierV3.Txn(rt)
if err == nil && !ok {
err = errors.ErrNoSpace
}
Expand Down
17 changes: 8 additions & 9 deletions server/etcdserver/apply/apply_auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package apply

import (
"context"
"sync"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
Expand Down Expand Up @@ -63,7 +62,7 @@ func (aa *authApplierV3) Apply(r *pb.InternalRaftRequest, applyFunc applyFunc) *
return ret
}

func (aa *authApplierV3) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
func (aa *authApplierV3) Put(r *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
if err := aa.as.IsPutPermitted(&aa.authInfo, r.Key); err != nil {
return nil, nil, err
}
Expand All @@ -82,17 +81,17 @@ func (aa *authApplierV3) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResp
return nil, nil, err
}
}
return aa.applierV3.Put(ctx, r)
return aa.applierV3.Put(r)
}

func (aa *authApplierV3) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, *traceutil.Trace, error) {
func (aa *authApplierV3) Range(r *pb.RangeRequest) (*pb.RangeResponse, *traceutil.Trace, error) {
if err := aa.as.IsRangePermitted(&aa.authInfo, r.Key, r.RangeEnd); err != nil {
return nil, nil, err
}
return aa.applierV3.Range(ctx, r)
return aa.applierV3.Range(r)
}

func (aa *authApplierV3) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, *traceutil.Trace, error) {
func (aa *authApplierV3) DeleteRange(r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, *traceutil.Trace, error) {
if err := aa.as.IsDeleteRangePermitted(&aa.authInfo, r.Key, r.RangeEnd); err != nil {
return nil, nil, err
}
Expand All @@ -103,14 +102,14 @@ func (aa *authApplierV3) DeleteRange(ctx context.Context, r *pb.DeleteRangeReque
}
}

return aa.applierV3.DeleteRange(ctx, r)
return aa.applierV3.DeleteRange(r)
}

func (aa *authApplierV3) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
func (aa *authApplierV3) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
if err := txn.CheckTxnAuth(aa.as, &aa.authInfo, rt); err != nil {
return nil, nil, err
}
return aa.applierV3.Txn(ctx, rt)
return aa.applierV3.Txn(rt)
}

func (aa *authApplierV3) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
Expand Down
27 changes: 8 additions & 19 deletions server/etcdserver/apply/apply_auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package apply

import (
"context"
"errors"
"testing"
"time"
Expand Down Expand Up @@ -437,12 +436,10 @@ func TestAuthApplierV3_Put(t *testing.T) {

authApplier := defaultAuthApplierV3(t)
mustCreateRolesAndEnableAuth(t, authApplier)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
setAuthInfo(authApplier, tc.userName)
_, _, err := authApplier.Put(ctx, tc.request)
_, _, err := authApplier.Put(tc.request)
require.Equalf(t, tc.expectError, err, "Put returned unexpected error (or lack thereof), expected: %v, got: %v", tc.expectError, err)
})
}
Expand All @@ -452,8 +449,6 @@ func TestAuthApplierV3_Put(t *testing.T) {
func TestAuthApplierV3_LeasePut(t *testing.T) {
authApplier := defaultAuthApplierV3(t)
mustCreateRolesAndEnableAuth(t, authApplier)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

_, err := authApplier.LeaseGrant(&pb.LeaseGrantRequest{
TTL: lease.MaxLeaseTTL,
Expand All @@ -463,7 +458,7 @@ func TestAuthApplierV3_LeasePut(t *testing.T) {

// The user should be able to put the key
setAuthInfo(authApplier, userWriteOnly)
_, _, err = authApplier.Put(ctx, &pb.PutRequest{
_, _, err = authApplier.Put(&pb.PutRequest{
Key: []byte(key),
Value: []byte("1"),
Lease: leaseID,
Expand All @@ -472,7 +467,7 @@ func TestAuthApplierV3_LeasePut(t *testing.T) {

// Put a key under the lease outside user's key range
setAuthInfo(authApplier, userRoot)
_, _, err = authApplier.Put(ctx, &pb.PutRequest{
_, _, err = authApplier.Put(&pb.PutRequest{
Key: []byte(keyOutsideRange),
Value: []byte("1"),
Lease: leaseID,
Expand All @@ -481,7 +476,7 @@ func TestAuthApplierV3_LeasePut(t *testing.T) {

// The user should not be able to put the key anymore
setAuthInfo(authApplier, userWriteOnly)
_, _, err = authApplier.Put(ctx, &pb.PutRequest{
_, _, err = authApplier.Put(&pb.PutRequest{
Key: []byte(key),
Value: []byte("1"),
Lease: leaseID,
Expand Down Expand Up @@ -524,12 +519,10 @@ func TestAuthApplierV3_Range(t *testing.T) {

authApplier := defaultAuthApplierV3(t)
mustCreateRolesAndEnableAuth(t, authApplier)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
setAuthInfo(authApplier, tc.userName)
_, _, err := authApplier.Range(ctx, tc.request)
_, _, err := authApplier.Range(tc.request)
require.Equalf(t, tc.expectError, err, "Range returned unexpected error (or lack thereof), expected: %v, got: %v", tc.expectError, err)
})
}
Expand Down Expand Up @@ -593,7 +586,7 @@ func TestAuthApplierV3_DeleteRange(t *testing.T) {
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
setAuthInfo(authApplier, tc.userName)
_, _, err := authApplier.DeleteRange(context.Background(), tc.request)
_, _, err := authApplier.DeleteRange(tc.request)
require.Equalf(t, tc.expectError, err, "Range returned unexpected error (or lack thereof), expected: %v, got: %v", tc.expectError, err)
})
}
Expand Down Expand Up @@ -660,12 +653,10 @@ func TestAuthApplierV3_Txn(t *testing.T) {

authApplier := defaultAuthApplierV3(t)
mustCreateRolesAndEnableAuth(t, authApplier)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
setAuthInfo(authApplier, tc.userName)
_, _, err := authApplier.Txn(ctx, tc.request)
_, _, err := authApplier.Txn(tc.request)
require.Equalf(t, tc.expectError, err, "Range returned unexpected error (or lack thereof), expected: %v, got: %v", tc.expectError, err)
})
}
Expand All @@ -676,8 +667,6 @@ func TestAuthApplierV3_Txn(t *testing.T) {
func TestAuthApplierV3_LeaseRevoke(t *testing.T) {
authApplier := defaultAuthApplierV3(t)
mustCreateRolesAndEnableAuth(t, authApplier)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

_, err := authApplier.LeaseGrant(&pb.LeaseGrantRequest{
TTL: lease.MaxLeaseTTL,
Expand All @@ -700,7 +689,7 @@ func TestAuthApplierV3_LeaseRevoke(t *testing.T) {

// Put a key under the lease outside user's key range
setAuthInfo(authApplier, userRoot)
_, _, err = authApplier.Put(ctx, &pb.PutRequest{
_, _, err = authApplier.Put(&pb.PutRequest{
Key: []byte(keyOutsideRange),
Value: []byte("1"),
Lease: leaseID,
Expand Down
10 changes: 4 additions & 6 deletions server/etcdserver/apply/corrupt.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
package apply

import (
"context"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/pkg/v3/traceutil"
"go.etcd.io/etcd/server/v3/etcdserver/errors"
Expand All @@ -28,19 +26,19 @@ type applierV3Corrupt struct {

func newApplierV3Corrupt(a applierV3) *applierV3Corrupt { return &applierV3Corrupt{a} }

func (a *applierV3Corrupt) Put(_ context.Context, _ *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
func (a *applierV3Corrupt) Put(_ *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
return nil, nil, errors.ErrCorrupt
}

func (a *applierV3Corrupt) Range(_ context.Context, _ *pb.RangeRequest) (*pb.RangeResponse, *traceutil.Trace, error) {
func (a *applierV3Corrupt) Range(_ *pb.RangeRequest) (*pb.RangeResponse, *traceutil.Trace, error) {
return nil, nil, errors.ErrCorrupt
}

func (a *applierV3Corrupt) DeleteRange(_ context.Context, _ *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, *traceutil.Trace, error) {
func (a *applierV3Corrupt) DeleteRange(_ *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, *traceutil.Trace, error) {
return nil, nil, errors.ErrCorrupt
}

func (a *applierV3Corrupt) Txn(_ context.Context, _ *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
func (a *applierV3Corrupt) Txn(_ *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
return nil, nil, errors.ErrCorrupt
}

Expand Down
10 changes: 4 additions & 6 deletions server/etcdserver/apply/uber_applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package apply

import (
"context"
"errors"
"time"

Expand Down Expand Up @@ -121,7 +120,6 @@ func (a *uberApplier) Apply(r *pb.InternalRaftRequest) *Result {
// dispatch translates the request (r) into appropriate call (like Put) on
// the underlying applyV3 object.
func (a *uberApplier) dispatch(r *pb.InternalRaftRequest) *Result {
ctx := context.TODO()
op := "unknown"
ar := &Result{}
defer func(start time.Time) {
Expand All @@ -136,16 +134,16 @@ func (a *uberApplier) dispatch(r *pb.InternalRaftRequest) *Result {
switch {
case r.Range != nil:
op = "Range"
ar.Resp, ar.Trace, ar.Err = a.applyV3.Range(ctx, r.Range)
ar.Resp, ar.Trace, ar.Err = a.applyV3.Range(r.Range)
case r.Put != nil:
op = "Put"
ar.Resp, ar.Trace, ar.Err = a.applyV3.Put(ctx, r.Put)
ar.Resp, ar.Trace, ar.Err = a.applyV3.Put(r.Put)
case r.DeleteRange != nil:
op = "DeleteRange"
ar.Resp, ar.Trace, ar.Err = a.applyV3.DeleteRange(ctx, r.DeleteRange)
ar.Resp, ar.Trace, ar.Err = a.applyV3.DeleteRange(r.DeleteRange)
case r.Txn != nil:
op = "Txn"
ar.Resp, ar.Trace, ar.Err = a.applyV3.Txn(ctx, r.Txn)
ar.Resp, ar.Trace, ar.Err = a.applyV3.Txn(r.Txn)
case r.Compaction != nil:
op = "Compaction"
ar.Resp, ar.Physc, ar.Trace, ar.Err = a.applyV3.Compaction(r.Compaction)
Expand Down

0 comments on commit 08d54bc

Please sign in to comment.