Skip to content

Commit

Permalink
Fix passing default grpc call options in Kubernetes client
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
  • Loading branch information
serathius committed Jul 24, 2024
1 parent f2ea60d commit 9446f6d
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 28 deletions.
34 changes: 10 additions & 24 deletions client/v3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,14 +370,13 @@ func newClient(cfg *Config) (*Client, error) {

ctx, cancel := context.WithCancel(baseCtx)
client := &Client{
conn: nil,
cfg: *cfg,
creds: creds,
ctx: ctx,
cancel: cancel,
mu: new(sync.RWMutex),
callOpts: defaultCallOpts,
lgMu: new(sync.RWMutex),
conn: nil,
cfg: *cfg,
creds: creds,
ctx: ctx,
cancel: cancel,
mu: new(sync.RWMutex),
lgMu: new(sync.RWMutex),
}

var err error
Expand All @@ -400,22 +399,9 @@ func newClient(cfg *Config) (*Client, error) {
client.Password = cfg.Password
client.authTokenBundle = credentials.NewBundle(credentials.Config{})
}
if cfg.MaxCallSendMsgSize > 0 || cfg.MaxCallRecvMsgSize > 0 {
if cfg.MaxCallRecvMsgSize > 0 && cfg.MaxCallSendMsgSize > cfg.MaxCallRecvMsgSize {
return nil, fmt.Errorf("gRPC message recv limit (%d bytes) must be greater than send limit (%d bytes)", cfg.MaxCallRecvMsgSize, cfg.MaxCallSendMsgSize)
}
callOpts := []grpc.CallOption{
defaultWaitForReady,
defaultMaxCallSendMsgSize,
defaultMaxCallRecvMsgSize,
}
if cfg.MaxCallSendMsgSize > 0 {
callOpts[1] = grpc.MaxCallSendMsgSize(cfg.MaxCallSendMsgSize)
}
if cfg.MaxCallRecvMsgSize > 0 {
callOpts[2] = grpc.MaxCallRecvMsgSize(cfg.MaxCallRecvMsgSize)
}
client.callOpts = callOpts
client.callOpts, err = Options(cfg)
if err != nil {
return nil, err
}

client.resolver = resolver.New(cfg.Endpoints...)
Expand Down
15 changes: 11 additions & 4 deletions client/v3/kubernetes/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package kubernetes
import (
"context"

"google.golang.org/grpc"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
Expand All @@ -33,6 +35,10 @@ func New(cfg clientv3.Config) (*Client, error) {
Client: c,
kv: clientv3.RetryKVClient(c),
}
kc.callOpts, err = clientv3.Options(&cfg)
if err != nil {
return nil, err
}
kc.Kubernetes = kc
return kc, nil
}
Expand All @@ -41,12 +47,13 @@ type Client struct {
*clientv3.Client
Kubernetes Interface
kv pb.KVClient
callOpts []grpc.CallOption
}

var _ Interface = (*Client)(nil)

func (k Client) Get(ctx context.Context, key string, opts GetOptions) (resp GetResponse, err error) {
rangeResp, err := k.kv.Range(ctx, getRequest(key, opts.Revision))
rangeResp, err := k.kv.Range(ctx, getRequest(key, opts.Revision), k.callOpts...)
if err != nil {
return resp, clientv3.ContextError(ctx, err)
}
Expand All @@ -66,7 +73,7 @@ func (k Client) List(ctx context.Context, prefix string, opts ListOptions) (resp
RangeEnd: []byte(rangeEnd),
Limit: opts.Limit,
Revision: opts.Revision,
})
}, k.callOpts...)
if err != nil {
return resp, clientv3.ContextError(ctx, err)
}
Expand All @@ -81,7 +88,7 @@ func (k Client) Count(ctx context.Context, prefix string, _ CountOptions) (int64
Key: []byte(prefix),
RangeEnd: []byte(clientv3.GetPrefixRangeEnd(prefix)),
CountOnly: true,
})
}, k.callOpts...)
if err != nil {
return 0, clientv3.ContextError(ctx, err)
}
Expand Down Expand Up @@ -145,7 +152,7 @@ func (k Client) optimisticTxn(ctx context.Context, key string, expectedRevision
if onFailure != nil {
txn.Failure = []*pb.RequestOp{onFailure}
}
return k.kv.Txn(ctx, txn)
return k.kv.Txn(ctx, txn, k.callOpts...)
}

func getRequest(key string, revision int64) *pb.RangeRequest {
Expand Down
22 changes: 22 additions & 0 deletions client/v3/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package clientv3

import (
"fmt"
"math"
"time"

Expand Down Expand Up @@ -67,3 +68,24 @@ var defaultCallOpts = []grpc.CallOption{

// MaxLeaseTTL is the maximum lease TTL value
const MaxLeaseTTL = 9000000000

func Options(cfg *Config) ([]grpc.CallOption, error) {
callOpts := defaultCallOpts
if cfg.MaxCallSendMsgSize > 0 || cfg.MaxCallRecvMsgSize > 0 {
if cfg.MaxCallRecvMsgSize > 0 && cfg.MaxCallSendMsgSize > cfg.MaxCallRecvMsgSize {
return nil, fmt.Errorf("gRPC message recv limit (%d bytes) must be greater than send limit (%d bytes)", cfg.MaxCallRecvMsgSize, cfg.MaxCallSendMsgSize)
}
callOpts = []grpc.CallOption{
defaultWaitForReady,
defaultMaxCallSendMsgSize,
defaultMaxCallRecvMsgSize,
}
if cfg.MaxCallSendMsgSize > 0 {
callOpts[1] = grpc.MaxCallSendMsgSize(cfg.MaxCallSendMsgSize)
}
if cfg.MaxCallRecvMsgSize > 0 {
callOpts[2] = grpc.MaxCallRecvMsgSize(cfg.MaxCallRecvMsgSize)
}
}
return callOpts, nil
}

0 comments on commit 9446f6d

Please sign in to comment.