diff --git a/client/v3/kubernetes/client.go b/client/v3/kubernetes/client.go index 4ca0e5fe8c1..11f2a456447 100644 --- a/client/v3/kubernetes/client.go +++ b/client/v3/kubernetes/client.go @@ -16,6 +16,7 @@ package kubernetes import ( "context" + "fmt" pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/mvccpb" @@ -31,7 +32,6 @@ func New(cfg clientv3.Config) (*Client, error) { } kc := &Client{ Client: c, - kv: clientv3.RetryKVClient(c), } kc.Kubernetes = kc return kc, nil @@ -40,15 +40,14 @@ func New(cfg clientv3.Config) (*Client, error) { type Client struct { *clientv3.Client Kubernetes Interface - kv pb.KVClient } var _ Interface = (*Client)(nil) func (k Client) Get(ctx context.Context, key string, opts GetOptions) (resp GetResponse, err error) { - rangeResp, err := k.kv.Range(ctx, getRequest(key, opts.Revision)) + rangeResp, err := k.KV.Get(ctx, key, clientv3.WithRev(opts.Revision), clientv3.WithLimit(1)) if err != nil { - return resp, clientv3.ContextError(ctx, err) + return resp, err } resp.Revision = rangeResp.Header.Revision if len(rangeResp.Kvs) == 1 { @@ -58,17 +57,14 @@ func (k Client) Get(ctx context.Context, key string, opts GetOptions) (resp GetR } func (k Client) List(ctx context.Context, prefix string, opts ListOptions) (resp ListResponse, err error) { - rangeStart := prefix + opts.Continue + rangeStart := prefix + if opts.Continue != "" { + rangeStart = opts.Continue + } rangeEnd := clientv3.GetPrefixRangeEnd(prefix) - - rangeResp, err := k.kv.Range(ctx, &pb.RangeRequest{ - Key: []byte(rangeStart), - RangeEnd: []byte(rangeEnd), - Limit: opts.Limit, - Revision: opts.Revision, - }) + rangeResp, err := k.KV.Get(ctx, rangeStart, clientv3.WithRange(rangeEnd), clientv3.WithLimit(opts.Limit), clientv3.WithRev(opts.Revision)) if err != nil { - return resp, clientv3.ContextError(ctx, err) + return resp, err } resp.Kvs = rangeResp.Kvs resp.Count = rangeResp.Count @@ -77,48 +73,51 @@ func (k Client) List(ctx context.Context, prefix string, opts ListOptions) (resp } func (k Client) Count(ctx context.Context, prefix string, _ CountOptions) (int64, error) { - resp, err := k.kv.Range(ctx, &pb.RangeRequest{ - Key: []byte(prefix), - RangeEnd: []byte(clientv3.GetPrefixRangeEnd(prefix)), - CountOnly: true, - }) + resp, err := k.KV.Get(ctx, prefix, clientv3.WithPrefix(), clientv3.WithCountOnly()) if err != nil { - return 0, clientv3.ContextError(ctx, err) + return 0, err } return resp.Count, nil } func (k Client) OptimisticPut(ctx context.Context, key string, value []byte, expectedRevision int64, opts PutOptions) (resp PutResponse, err error) { - onSuccess := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{Key: []byte(key), Value: value, Lease: int64(opts.LeaseID)}}} + txn := k.KV.Txn(ctx).If( + clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision), + ).Then( + clientv3.OpPut(key, string(value), clientv3.WithLease(opts.LeaseID)), + ) - var onFailure *pb.RequestOp if opts.GetOnFailure { - onFailure = &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: getRequest(key, 0)}} + txn = txn.Else(clientv3.OpGet(key)) } - txnResp, err := k.optimisticTxn(ctx, key, expectedRevision, onSuccess, onFailure) + txnResp, err := txn.Commit() if err != nil { - return resp, clientv3.ContextError(ctx, err) + return resp, err } resp.Succeeded = txnResp.Succeeded resp.Revision = txnResp.Header.Revision if opts.GetOnFailure && !txnResp.Succeeded { + if len(txnResp.Responses) == 0 { + return resp, fmt.Errorf("invalid OptimisticPut response: %v", txnResp.Responses) + } resp.KV = kvFromTxnResponse(txnResp.Responses[0]) } return resp, nil } func (k Client) OptimisticDelete(ctx context.Context, key string, expectedRevision int64, opts DeleteOptions) (resp DeleteResponse, err error) { - onSuccess := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{RequestDeleteRange: &pb.DeleteRangeRequest{Key: []byte(key)}}} - - var onFailure *pb.RequestOp + txn := k.KV.Txn(ctx).If( + clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision), + ).Then( + clientv3.OpDelete(key), + ) if opts.GetOnFailure { - onFailure = &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: getRequest(key, 0)}} + txn = txn.Else(clientv3.OpGet(key)) } - - txnResp, err := k.optimisticTxn(ctx, key, expectedRevision, onSuccess, onFailure) + txnResp, err := txn.Commit() if err != nil { - return resp, clientv3.ContextError(ctx, err) + return resp, err } resp.Succeeded = txnResp.Succeeded resp.Revision = txnResp.Header.Revision @@ -128,34 +127,6 @@ func (k Client) OptimisticDelete(ctx context.Context, key string, expectedRevisi return resp, nil } -func (k Client) optimisticTxn(ctx context.Context, key string, expectedRevision int64, onSuccess, onFailure *pb.RequestOp) (*pb.TxnResponse, error) { - txn := &pb.TxnRequest{ - Compare: []*pb.Compare{ - { - Result: pb.Compare_EQUAL, - Target: pb.Compare_MOD, - Key: []byte(key), - TargetUnion: &pb.Compare_ModRevision{ModRevision: expectedRevision}, - }, - }, - } - if onSuccess != nil { - txn.Success = []*pb.RequestOp{onSuccess} - } - if onFailure != nil { - txn.Failure = []*pb.RequestOp{onFailure} - } - return k.kv.Txn(ctx, txn) -} - -func getRequest(key string, revision int64) *pb.RangeRequest { - return &pb.RangeRequest{ - Key: []byte(key), - Revision: revision, - Limit: 1, - } -} - func kvFromTxnResponse(resp *pb.ResponseOp) *mvccpb.KeyValue { getResponse := resp.GetResponseRange() if len(getResponse.Kvs) == 1 {