Skip to content

Commit

Permalink
Fix passing default grpc call options in Kubernetes client
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
  • Loading branch information
serathius committed Aug 29, 2024
1 parent fe796ab commit 880b3cb
Showing 1 changed file with 26 additions and 59 deletions.
85 changes: 26 additions & 59 deletions client/v3/kubernetes/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ func New(cfg clientv3.Config) (*Client, error) {
}
kc := &Client{
Client: c,
kv: clientv3.RetryKVClient(c),
}
kc.Kubernetes = kc
return kc, nil
Expand All @@ -40,15 +39,14 @@ func New(cfg clientv3.Config) (*Client, error) {
type Client struct {
*clientv3.Client
Kubernetes Interface
kv pb.KVClient
}

var _ Interface = (*Client)(nil)

func (k Client) Get(ctx context.Context, key string, opts GetOptions) (resp GetResponse, err error) {
rangeResp, err := k.kv.Range(ctx, getRequest(key, opts.Revision))
rangeResp, err := k.KV.Get(ctx, key, clientv3.WithRev(opts.Revision), clientv3.WithLimit(1))
if err != nil {
return resp, clientv3.ContextError(ctx, err)
return resp, err
}
resp.Revision = rangeResp.Header.Revision
if len(rangeResp.Kvs) == 1 {
Expand All @@ -58,17 +56,14 @@ func (k Client) Get(ctx context.Context, key string, opts GetOptions) (resp GetR
}

func (k Client) List(ctx context.Context, prefix string, opts ListOptions) (resp ListResponse, err error) {
rangeStart := prefix + opts.Continue
rangeStart := prefix
if opts.Continue != "" {
rangeStart = opts.Continue + "\x00"
}
rangeEnd := clientv3.GetPrefixRangeEnd(prefix)

rangeResp, err := k.kv.Range(ctx, &pb.RangeRequest{
Key: []byte(rangeStart),
RangeEnd: []byte(rangeEnd),
Limit: opts.Limit,
Revision: opts.Revision,
})
rangeResp, err := k.KV.Get(ctx, rangeStart, clientv3.WithRange(rangeEnd), clientv3.WithLimit(opts.Limit), clientv3.WithRev(opts.Revision))
if err != nil {
return resp, clientv3.ContextError(ctx, err)
return resp, err
}
resp.Kvs = rangeResp.Kvs
resp.Count = rangeResp.Count
Expand All @@ -77,28 +72,27 @@ func (k Client) List(ctx context.Context, prefix string, opts ListOptions) (resp
}

func (k Client) Count(ctx context.Context, prefix string, _ CountOptions) (int64, error) {
resp, err := k.kv.Range(ctx, &pb.RangeRequest{
Key: []byte(prefix),
RangeEnd: []byte(clientv3.GetPrefixRangeEnd(prefix)),
CountOnly: true,
})
resp, err := k.KV.Get(ctx, prefix, clientv3.WithPrefix(), clientv3.WithCountOnly())
if err != nil {
return 0, clientv3.ContextError(ctx, err)
return 0, err
}
return resp.Count, nil
}

func (k Client) OptimisticPut(ctx context.Context, key string, value []byte, expectedRevision int64, opts PutOptions) (resp PutResponse, err error) {
onSuccess := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{Key: []byte(key), Value: value, Lease: int64(opts.LeaseID)}}}
txn := k.KV.Txn(ctx).If(
clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision),
).Then(
clientv3.OpPut(key, string(value), clientv3.WithLease(opts.LeaseID)),
)

var onFailure *pb.RequestOp
if opts.GetOnFailure {
onFailure = &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: getRequest(key, 0)}}
txn = txn.Else(clientv3.OpGet(key))
}

txnResp, err := k.optimisticTxn(ctx, key, expectedRevision, onSuccess, onFailure)
txnResp, err := txn.Commit()
if err != nil {
return resp, clientv3.ContextError(ctx, err)
return resp, err
}
resp.Succeeded = txnResp.Succeeded
resp.Revision = txnResp.Header.Revision
Expand All @@ -109,16 +103,17 @@ func (k Client) OptimisticPut(ctx context.Context, key string, value []byte, exp
}

func (k Client) OptimisticDelete(ctx context.Context, key string, expectedRevision int64, opts DeleteOptions) (resp DeleteResponse, err error) {
onSuccess := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{RequestDeleteRange: &pb.DeleteRangeRequest{Key: []byte(key)}}}

var onFailure *pb.RequestOp
txn := k.KV.Txn(ctx).If(
clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision),
).Then(
clientv3.OpDelete(key),
)
if opts.GetOnFailure {
onFailure = &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: getRequest(key, 0)}}
txn = txn.Else(clientv3.OpGet(key))
}

txnResp, err := k.optimisticTxn(ctx, key, expectedRevision, onSuccess, onFailure)
txnResp, err := txn.Commit()
if err != nil {
return resp, clientv3.ContextError(ctx, err)
return resp, err
}
resp.Succeeded = txnResp.Succeeded
resp.Revision = txnResp.Header.Revision
Expand All @@ -128,34 +123,6 @@ func (k Client) OptimisticDelete(ctx context.Context, key string, expectedRevisi
return resp, nil
}

func (k Client) optimisticTxn(ctx context.Context, key string, expectedRevision int64, onSuccess, onFailure *pb.RequestOp) (*pb.TxnResponse, error) {
txn := &pb.TxnRequest{
Compare: []*pb.Compare{
{
Result: pb.Compare_EQUAL,
Target: pb.Compare_MOD,
Key: []byte(key),
TargetUnion: &pb.Compare_ModRevision{ModRevision: expectedRevision},
},
},
}
if onSuccess != nil {
txn.Success = []*pb.RequestOp{onSuccess}
}
if onFailure != nil {
txn.Failure = []*pb.RequestOp{onFailure}
}
return k.kv.Txn(ctx, txn)
}

func getRequest(key string, revision int64) *pb.RangeRequest {
return &pb.RangeRequest{
Key: []byte(key),
Revision: revision,
Limit: 1,
}
}

func kvFromTxnResponse(resp *pb.ResponseOp) *mvccpb.KeyValue {
getResponse := resp.GetResponseRange()
if len(getResponse.Kvs) == 1 {
Expand Down

0 comments on commit 880b3cb

Please sign in to comment.