From 5a92472843ec08762cfbdbb4c9c11a96b29fb853 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Mon, 31 Jul 2023 11:09:37 +0200 Subject: [PATCH] Introduce Kubernetes KV interface to etcd client Signed-off-by: Marek Siarkowicz --- client/v3/auth.go | 36 +++--- client/v3/client.go | 6 +- client/v3/cluster.go | 10 +- client/v3/kubernetes/interface.go | 88 ++++++++++++++ client/v3/kubernetes/kubernetes.go | 179 +++++++++++++++++++++++++++++ client/v3/kv.go | 10 +- client/v3/lease.go | 20 ++-- client/v3/maintenance.go | 26 ++--- client/v3/txn.go | 2 +- client/v3/watch.go | 4 +- 10 files changed, 325 insertions(+), 56 deletions(-) create mode 100644 client/v3/kubernetes/interface.go create mode 100644 client/v3/kubernetes/kubernetes.go diff --git a/client/v3/auth.go b/client/v3/auth.go index a6f75d321592..110918a4c7c4 100644 --- a/client/v3/auth.go +++ b/client/v3/auth.go @@ -134,67 +134,67 @@ func NewAuthFromAuthClient(remote pb.AuthClient, c *Client) Auth { func (auth *authClient) Authenticate(ctx context.Context, name string, password string) (*AuthenticateResponse, error) { resp, err := auth.remote.Authenticate(ctx, &pb.AuthenticateRequest{Name: name, Password: password}, auth.callOpts...) - return (*AuthenticateResponse)(resp), toErr(ctx, err) + return (*AuthenticateResponse)(resp), ContextError(ctx, err) } func (auth *authClient) AuthEnable(ctx context.Context) (*AuthEnableResponse, error) { resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{}, auth.callOpts...) - return (*AuthEnableResponse)(resp), toErr(ctx, err) + return (*AuthEnableResponse)(resp), ContextError(ctx, err) } func (auth *authClient) AuthDisable(ctx context.Context) (*AuthDisableResponse, error) { resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{}, auth.callOpts...) - return (*AuthDisableResponse)(resp), toErr(ctx, err) + return (*AuthDisableResponse)(resp), ContextError(ctx, err) } func (auth *authClient) AuthStatus(ctx context.Context) (*AuthStatusResponse, error) { resp, err := auth.remote.AuthStatus(ctx, &pb.AuthStatusRequest{}, auth.callOpts...) - return (*AuthStatusResponse)(resp), toErr(ctx, err) + return (*AuthStatusResponse)(resp), ContextError(ctx, err) } func (auth *authClient) UserAdd(ctx context.Context, name string, password string) (*AuthUserAddResponse, error) { resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password, Options: &authpb.UserAddOptions{NoPassword: false}}, auth.callOpts...) - return (*AuthUserAddResponse)(resp), toErr(ctx, err) + return (*AuthUserAddResponse)(resp), ContextError(ctx, err) } func (auth *authClient) UserAddWithOptions(ctx context.Context, name string, password string, options *UserAddOptions) (*AuthUserAddResponse, error) { resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password, Options: (*authpb.UserAddOptions)(options)}, auth.callOpts...) - return (*AuthUserAddResponse)(resp), toErr(ctx, err) + return (*AuthUserAddResponse)(resp), ContextError(ctx, err) } func (auth *authClient) UserDelete(ctx context.Context, name string) (*AuthUserDeleteResponse, error) { resp, err := auth.remote.UserDelete(ctx, &pb.AuthUserDeleteRequest{Name: name}, auth.callOpts...) - return (*AuthUserDeleteResponse)(resp), toErr(ctx, err) + return (*AuthUserDeleteResponse)(resp), ContextError(ctx, err) } func (auth *authClient) UserChangePassword(ctx context.Context, name string, password string) (*AuthUserChangePasswordResponse, error) { resp, err := auth.remote.UserChangePassword(ctx, &pb.AuthUserChangePasswordRequest{Name: name, Password: password}, auth.callOpts...) - return (*AuthUserChangePasswordResponse)(resp), toErr(ctx, err) + return (*AuthUserChangePasswordResponse)(resp), ContextError(ctx, err) } func (auth *authClient) UserGrantRole(ctx context.Context, user string, role string) (*AuthUserGrantRoleResponse, error) { resp, err := auth.remote.UserGrantRole(ctx, &pb.AuthUserGrantRoleRequest{User: user, Role: role}, auth.callOpts...) - return (*AuthUserGrantRoleResponse)(resp), toErr(ctx, err) + return (*AuthUserGrantRoleResponse)(resp), ContextError(ctx, err) } func (auth *authClient) UserGet(ctx context.Context, name string) (*AuthUserGetResponse, error) { resp, err := auth.remote.UserGet(ctx, &pb.AuthUserGetRequest{Name: name}, auth.callOpts...) - return (*AuthUserGetResponse)(resp), toErr(ctx, err) + return (*AuthUserGetResponse)(resp), ContextError(ctx, err) } func (auth *authClient) UserList(ctx context.Context) (*AuthUserListResponse, error) { resp, err := auth.remote.UserList(ctx, &pb.AuthUserListRequest{}, auth.callOpts...) - return (*AuthUserListResponse)(resp), toErr(ctx, err) + return (*AuthUserListResponse)(resp), ContextError(ctx, err) } func (auth *authClient) UserRevokeRole(ctx context.Context, name string, role string) (*AuthUserRevokeRoleResponse, error) { resp, err := auth.remote.UserRevokeRole(ctx, &pb.AuthUserRevokeRoleRequest{Name: name, Role: role}, auth.callOpts...) - return (*AuthUserRevokeRoleResponse)(resp), toErr(ctx, err) + return (*AuthUserRevokeRoleResponse)(resp), ContextError(ctx, err) } func (auth *authClient) RoleAdd(ctx context.Context, name string) (*AuthRoleAddResponse, error) { resp, err := auth.remote.RoleAdd(ctx, &pb.AuthRoleAddRequest{Name: name}, auth.callOpts...) - return (*AuthRoleAddResponse)(resp), toErr(ctx, err) + return (*AuthRoleAddResponse)(resp), ContextError(ctx, err) } func (auth *authClient) RoleGrantPermission(ctx context.Context, name string, key, rangeEnd string, permType PermissionType) (*AuthRoleGrantPermissionResponse, error) { @@ -204,27 +204,27 @@ func (auth *authClient) RoleGrantPermission(ctx context.Context, name string, ke PermType: authpb.Permission_Type(permType), } resp, err := auth.remote.RoleGrantPermission(ctx, &pb.AuthRoleGrantPermissionRequest{Name: name, Perm: perm}, auth.callOpts...) - return (*AuthRoleGrantPermissionResponse)(resp), toErr(ctx, err) + return (*AuthRoleGrantPermissionResponse)(resp), ContextError(ctx, err) } func (auth *authClient) RoleGet(ctx context.Context, role string) (*AuthRoleGetResponse, error) { resp, err := auth.remote.RoleGet(ctx, &pb.AuthRoleGetRequest{Role: role}, auth.callOpts...) - return (*AuthRoleGetResponse)(resp), toErr(ctx, err) + return (*AuthRoleGetResponse)(resp), ContextError(ctx, err) } func (auth *authClient) RoleList(ctx context.Context) (*AuthRoleListResponse, error) { resp, err := auth.remote.RoleList(ctx, &pb.AuthRoleListRequest{}, auth.callOpts...) - return (*AuthRoleListResponse)(resp), toErr(ctx, err) + return (*AuthRoleListResponse)(resp), ContextError(ctx, err) } func (auth *authClient) RoleRevokePermission(ctx context.Context, role string, key, rangeEnd string) (*AuthRoleRevokePermissionResponse, error) { resp, err := auth.remote.RoleRevokePermission(ctx, &pb.AuthRoleRevokePermissionRequest{Role: role, Key: []byte(key), RangeEnd: []byte(rangeEnd)}, auth.callOpts...) - return (*AuthRoleRevokePermissionResponse)(resp), toErr(ctx, err) + return (*AuthRoleRevokePermissionResponse)(resp), ContextError(ctx, err) } func (auth *authClient) RoleDelete(ctx context.Context, role string) (*AuthRoleDeleteResponse, error) { resp, err := auth.remote.RoleDelete(ctx, &pb.AuthRoleDeleteRequest{Role: role}, auth.callOpts...) - return (*AuthRoleDeleteResponse)(resp), toErr(ctx, err) + return (*AuthRoleDeleteResponse)(resp), ContextError(ctx, err) } func StrToPermissionType(s string) (PermissionType, error) { diff --git a/client/v3/client.go b/client/v3/client.go index 312d03e7a62a..f7aa65a0a729 100644 --- a/client/v3/client.go +++ b/client/v3/client.go @@ -148,7 +148,7 @@ func (c *Client) Close() error { c.Lease.Close() } if c.conn != nil { - return toErr(c.ctx, c.conn.Close()) + return ContextError(c.ctx, c.conn.Close()) } return c.ctx.Err() } @@ -573,7 +573,9 @@ func isUnavailableErr(ctx context.Context, err error) bool { return false } -func toErr(ctx context.Context, err error) error { +// ContextError converts the error into an EtcdError if the error message matches one of +// the defined messages; otherwise, it tries to retrieve the context error. +func ContextError(ctx context.Context, err error) error { if err == nil { return nil } diff --git a/client/v3/cluster.go b/client/v3/cluster.go index 92d7cdb56b0f..1815c1c96408 100644 --- a/client/v3/cluster.go +++ b/client/v3/cluster.go @@ -93,7 +93,7 @@ func (c *cluster) memberAdd(ctx context.Context, peerAddrs []string, isLearner b } resp, err := c.remote.MemberAdd(ctx, r, c.callOpts...) if err != nil { - return nil, toErr(ctx, err) + return nil, ContextError(ctx, err) } return (*MemberAddResponse)(resp), nil } @@ -102,7 +102,7 @@ func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveRes r := &pb.MemberRemoveRequest{ID: id} resp, err := c.remote.MemberRemove(ctx, r, c.callOpts...) if err != nil { - return nil, toErr(ctx, err) + return nil, ContextError(ctx, err) } return (*MemberRemoveResponse)(resp), nil } @@ -119,7 +119,7 @@ func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []strin if err == nil { return (*MemberUpdateResponse)(resp), nil } - return nil, toErr(ctx, err) + return nil, ContextError(ctx, err) } func (c *cluster) MemberList(ctx context.Context) (*MemberListResponse, error) { @@ -128,14 +128,14 @@ func (c *cluster) MemberList(ctx context.Context) (*MemberListResponse, error) { if err == nil { return (*MemberListResponse)(resp), nil } - return nil, toErr(ctx, err) + return nil, ContextError(ctx, err) } func (c *cluster) MemberPromote(ctx context.Context, id uint64) (*MemberPromoteResponse, error) { r := &pb.MemberPromoteRequest{ID: id} resp, err := c.remote.MemberPromote(ctx, r, c.callOpts...) if err != nil { - return nil, toErr(ctx, err) + return nil, ContextError(ctx, err) } return (*MemberPromoteResponse)(resp), nil } diff --git a/client/v3/kubernetes/interface.go b/client/v3/kubernetes/interface.go new file mode 100644 index 000000000000..e7213c88ee59 --- /dev/null +++ b/client/v3/kubernetes/interface.go @@ -0,0 +1,88 @@ +// Copyright 2024 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kubernetes + +import ( + "context" + + "go.etcd.io/etcd/api/v3/mvccpb" + clientv3 "go.etcd.io/etcd/client/v3" +) + +// Interface defines the minimal client side interface that Kubernetes requires +// to utilize etcd for reconciliation protocol. Methods below are normal +// etcd operations with their semantics adjusted to better suite Kubernetes. +type Interface interface { + // Get retrieves a single key. + // When GetOptions.Revision sets rev > 0, Get retrieves keys at the given revision; + // if the required revision is compacted, the request will fail with ErrCompacted. + Get(ctx context.Context, key string, opts GetOptions) (GetResponse, error) + // List retrieves keys sharing the same prefix. + // When ListOptions.Revision > 0, List retrieves keys at the given revision; + // if the required revision is compacted, the request will fail with ErrCompacted. + // When ListOptions.Limit > 0 , the number of returned keys is bounded by limit. + // When ListOptions.Continue != 0, List will skip all the preceding keys. + List(ctx context.Context, prefix string, opts ListOptions) (ListResponse, error) + // Count retrieves number of keys sharing the same prefix. + Count(ctx context.Context, prefix string) (int64, error) + OptimisticPut(ctx context.Context, key string, value []byte, opts PutOptions) (PutResponse, error) + OptimisticDelete(ctx context.Context, key string, opts DeleteOptions) (DeleteResponse, error) +} + +type GetOptions struct { + Revision int64 +} + +type ListOptions struct { + Revision int64 + Limit int64 + Continue string +} + +type PutOptions struct { + ExpectedRevision int64 + GetOnFailure bool + // LeaseID + // Deprecated: Should be replaced with TTL when Interface starts using one lease per object. + LeaseID clientv3.LeaseID +} + +type DeleteOptions struct { + ExpectedRevision int64 + GetOnFailure bool +} + +type GetResponse struct { + KV *mvccpb.KeyValue + Revision int64 +} + +type ListResponse struct { + KVs []*mvccpb.KeyValue + Count int64 + Revision int64 +} + +type PutResponse struct { + KV *mvccpb.KeyValue + Succeeded bool + Revision int64 +} + +type DeleteResponse struct { + KV *mvccpb.KeyValue + Succeeded bool + Revision int64 +} diff --git a/client/v3/kubernetes/kubernetes.go b/client/v3/kubernetes/kubernetes.go new file mode 100644 index 000000000000..71efe073718a --- /dev/null +++ b/client/v3/kubernetes/kubernetes.go @@ -0,0 +1,179 @@ +// Copyright 2024 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kubernetes + +import ( + "context" + + pb "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/api/v3/mvccpb" + clientv3 "go.etcd.io/etcd/client/v3" +) + +// New creates KubernetesClient from config. +// Caller is responsible to call Close() to clean up client. +func New(cfg clientv3.Config) (*KubernetesClient, error) { + c, err := clientv3.New(cfg) + if err != nil { + return nil, err + } + return fromClient(c, c.Close), nil +} + +// FromClient creates KubernetesClient from existing client. +// Caller can still call Close() however, it's not needed as long caller should call Close() on the original client. +func FromClient(c *clientv3.Client) *KubernetesClient { + return fromClient(c, nil) +} + +func fromClient(c *clientv3.Client, close func() error) *KubernetesClient { + return &KubernetesClient{ + kv: clientv3.RetryKVClient(c), + close: close, + } +} + +type KubernetesClient struct { + kv pb.KVClient + close func() error +} + +var _ Interface = (*KubernetesClient)(nil) + +func (k KubernetesClient) Get(ctx context.Context, key string, opts GetOptions) (resp GetResponse, err error) { + rangeResp, err := k.kv.Range(ctx, getRequest(key, opts.Revision)) + if err != nil { + return resp, clientv3.ContextError(ctx, err) + } + resp.Revision = rangeResp.Header.Revision + if len(rangeResp.Kvs) == 1 { + resp.KV = rangeResp.Kvs[0] + } + return resp, nil +} + +func (k KubernetesClient) List(ctx context.Context, prefix string, opts ListOptions) (resp ListResponse, err error) { + rangeStart := prefix + opts.Continue + rangeEnd := clientv3.GetPrefixRangeEnd(prefix) + + rangeResp, err := k.kv.Range(ctx, &pb.RangeRequest{ + Key: []byte(rangeStart), + RangeEnd: []byte(rangeEnd), + Limit: opts.Limit, + Revision: opts.Revision, + }) + if err != nil { + return resp, clientv3.ContextError(ctx, err) + } + resp.KVs = rangeResp.Kvs + resp.Count = rangeResp.Count + resp.Revision = rangeResp.Header.Revision + return resp, nil +} + +func (k KubernetesClient) Count(ctx context.Context, prefix string) (int64, error) { + resp, err := k.kv.Range(ctx, &pb.RangeRequest{ + Key: []byte(prefix), + RangeEnd: []byte(clientv3.GetPrefixRangeEnd(prefix)), + CountOnly: true, + }) + if err != nil { + return 0, clientv3.ContextError(ctx, err) + } + return resp.Count, nil +} + +func (k KubernetesClient) OptimisticPut(ctx context.Context, key string, value []byte, opts PutOptions) (resp PutResponse, err error) { + onSuccess := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{Key: []byte(key), Value: value, Lease: int64(opts.LeaseID)}}} + + var onFailure *pb.RequestOp + if opts.GetOnFailure { + onFailure = &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: getRequest(key, 0)}} + } + + txnResp, err := k.optimisticTxn(ctx, key, opts.ExpectedRevision, onSuccess, onFailure) + if err != nil { + return resp, clientv3.ContextError(ctx, err) + } + resp.Succeeded = txnResp.Succeeded + resp.Revision = txnResp.Header.Revision + if opts.GetOnFailure && !txnResp.Succeeded { + resp.KV = kvFromTxnResponse(txnResp.Responses[0]) + } + return resp, nil +} + +func (k KubernetesClient) OptimisticDelete(ctx context.Context, key string, opts DeleteOptions) (resp DeleteResponse, err error) { + onSuccess := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{RequestDeleteRange: &pb.DeleteRangeRequest{Key: []byte(key)}}} + + var onFailure *pb.RequestOp + if opts.GetOnFailure { + onFailure = &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: getRequest(key, 0)}} + } + + txnResp, err := k.optimisticTxn(ctx, key, opts.ExpectedRevision, onSuccess, onFailure) + if err != nil { + return resp, clientv3.ContextError(ctx, err) + } + resp.Succeeded = txnResp.Succeeded + resp.Revision = txnResp.Header.Revision + if opts.GetOnFailure && !txnResp.Succeeded { + resp.KV = kvFromTxnResponse(txnResp.Responses[0]) + } + return resp, nil +} + +func (k KubernetesClient) optimisticTxn(ctx context.Context, key string, expectRevision int64, onSuccess, onFailure *pb.RequestOp) (*pb.TxnResponse, error) { + txn := &pb.TxnRequest{ + Compare: []*pb.Compare{ + { + Result: pb.Compare_EQUAL, + Target: pb.Compare_MOD, + Key: []byte(key), + TargetUnion: &pb.Compare_ModRevision{ModRevision: expectRevision}, + }, + }, + } + if onSuccess != nil { + txn.Success = []*pb.RequestOp{onSuccess} + } + if onFailure != nil { + txn.Failure = []*pb.RequestOp{onFailure} + } + return k.kv.Txn(ctx, txn) +} + +func getRequest(key string, revision int64) *pb.RangeRequest { + return &pb.RangeRequest{ + Key: []byte(key), + Revision: revision, + Limit: 1, + } +} + +func kvFromTxnResponse(resp *pb.ResponseOp) *mvccpb.KeyValue { + getResponse := resp.GetResponseRange() + if len(getResponse.Kvs) == 1 { + return getResponse.Kvs[0] + } + return nil +} + +func (k KubernetesClient) Close() error { + if k.close == nil { + return nil + } + return k.close() +} diff --git a/client/v3/kv.go b/client/v3/kv.go index 5e9fb7d45896..be5b508dd615 100644 --- a/client/v3/kv.go +++ b/client/v3/kv.go @@ -112,23 +112,23 @@ func NewKVFromKVClient(remote pb.KVClient, c *Client) KV { func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) { r, err := kv.Do(ctx, OpPut(key, val, opts...)) - return r.put, toErr(ctx, err) + return r.put, ContextError(ctx, err) } func (kv *kv) Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error) { r, err := kv.Do(ctx, OpGet(key, opts...)) - return r.get, toErr(ctx, err) + return r.get, ContextError(ctx, err) } func (kv *kv) Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error) { r, err := kv.Do(ctx, OpDelete(key, opts...)) - return r.del, toErr(ctx, err) + return r.del, ContextError(ctx, err) } func (kv *kv) Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error) { resp, err := kv.remote.Compact(ctx, OpCompact(rev, opts...).toRequest(), kv.callOpts...) if err != nil { - return nil, toErr(ctx, err) + return nil, ContextError(ctx, err) } return (*CompactResponse)(resp), err } @@ -173,5 +173,5 @@ func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) { default: panic("Unknown op") } - return OpResponse{}, toErr(ctx, err) + return OpResponse{}, ContextError(ctx, err) } diff --git a/client/v3/lease.go b/client/v3/lease.go index 19af9c093a35..4e7d1caf8312 100644 --- a/client/v3/lease.go +++ b/client/v3/lease.go @@ -223,7 +223,7 @@ func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, err } return gresp, nil } - return nil, toErr(ctx, err) + return nil, ContextError(ctx, err) } func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) { @@ -232,14 +232,14 @@ func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, if err == nil { return (*LeaseRevokeResponse)(resp), nil } - return nil, toErr(ctx, err) + return nil, ContextError(ctx, err) } func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error) { r := toLeaseTimeToLiveRequest(id, opts...) resp, err := l.remote.LeaseTimeToLive(ctx, r, l.callOpts...) if err != nil { - return nil, toErr(ctx, err) + return nil, ContextError(ctx, err) } gresp := &LeaseTimeToLiveResponse{ ResponseHeader: resp.GetHeader(), @@ -260,7 +260,7 @@ func (l *lessor) Leases(ctx context.Context) (*LeaseLeasesResponse, error) { } return &LeaseLeasesResponse{ResponseHeader: resp.GetHeader(), Leases: leases}, nil } - return nil, toErr(ctx, err) + return nil, ContextError(ctx, err) } func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) { @@ -315,7 +315,7 @@ func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive return resp, err } if isHaltErr(ctx, err) { - return nil, toErr(ctx, err) + return nil, ContextError(ctx, err) } } } @@ -405,13 +405,13 @@ func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (karesp *LeaseKe stream, err := l.remote.LeaseKeepAlive(cctx, l.callOpts...) if err != nil { - return nil, toErr(ctx, err) + return nil, ContextError(ctx, err) } defer func() { if err := stream.CloseSend(); err != nil { if ferr == nil { - ferr = toErr(ctx, err) + ferr = ContextError(ctx, err) } return } @@ -419,12 +419,12 @@ func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (karesp *LeaseKe err = stream.Send(&pb.LeaseKeepAliveRequest{ID: int64(id)}) if err != nil { - return nil, toErr(ctx, err) + return nil, ContextError(ctx, err) } resp, rerr := stream.Recv() if rerr != nil { - return nil, toErr(ctx, rerr) + return nil, ContextError(ctx, rerr) } karesp = &LeaseKeepAliveResponse{ @@ -461,7 +461,7 @@ func (l *lessor) recvKeepAliveLoop() (gerr error) { return err } - if toErr(l.stopCtx, err) == rpctypes.ErrNoLeader { + if ContextError(l.stopCtx, err) == rpctypes.ErrNoLeader { l.closeRequireLeader() } break diff --git a/client/v3/maintenance.go b/client/v3/maintenance.go index a98b8ca51e1a..71b28e6dc3f9 100644 --- a/client/v3/maintenance.go +++ b/client/v3/maintenance.go @@ -130,7 +130,7 @@ func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) { if err == nil { return (*AlarmResponse)(resp), nil } - return nil, toErr(ctx, err) + return nil, ContextError(ctx, err) } func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmResponse, error) { @@ -143,13 +143,13 @@ func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmR if req.MemberID == 0 && req.Alarm == pb.AlarmType_NONE { ar, err := m.AlarmList(ctx) if err != nil { - return nil, toErr(ctx, err) + return nil, ContextError(ctx, err) } ret := AlarmResponse{} for _, am := range ar.Alarms { dresp, derr := m.AlarmDisarm(ctx, (*AlarmMember)(am)) if derr != nil { - return nil, toErr(ctx, derr) + return nil, ContextError(ctx, derr) } ret.Alarms = append(ret.Alarms, dresp.Alarms...) } @@ -160,18 +160,18 @@ func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmR if err == nil { return (*AlarmResponse)(resp), nil } - return nil, toErr(ctx, err) + return nil, ContextError(ctx, err) } func (m *maintenance) Defragment(ctx context.Context, endpoint string) (*DefragmentResponse, error) { remote, cancel, err := m.dial(endpoint) if err != nil { - return nil, toErr(ctx, err) + return nil, ContextError(ctx, err) } defer cancel() resp, err := remote.Defragment(ctx, &pb.DefragmentRequest{}, m.callOpts...) if err != nil { - return nil, toErr(ctx, err) + return nil, ContextError(ctx, err) } return (*DefragmentResponse)(resp), nil } @@ -179,12 +179,12 @@ func (m *maintenance) Defragment(ctx context.Context, endpoint string) (*Defragm func (m *maintenance) Status(ctx context.Context, endpoint string) (*StatusResponse, error) { remote, cancel, err := m.dial(endpoint) if err != nil { - return nil, toErr(ctx, err) + return nil, ContextError(ctx, err) } defer cancel() resp, err := remote.Status(ctx, &pb.StatusRequest{}, m.callOpts...) if err != nil { - return nil, toErr(ctx, err) + return nil, ContextError(ctx, err) } return (*StatusResponse)(resp), nil } @@ -193,12 +193,12 @@ func (m *maintenance) HashKV(ctx context.Context, endpoint string, rev int64) (* remote, cancel, err := m.dial(endpoint) if err != nil { - return nil, toErr(ctx, err) + return nil, ContextError(ctx, err) } defer cancel() resp, err := remote.HashKV(ctx, &pb.HashKVRequest{Revision: rev}, m.callOpts...) if err != nil { - return nil, toErr(ctx, err) + return nil, ContextError(ctx, err) } return (*HashKVResponse)(resp), nil } @@ -206,7 +206,7 @@ func (m *maintenance) HashKV(ctx context.Context, endpoint string, rev int64) (* func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) { ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}, append(m.callOpts, withMax(defaultStreamMaxRetries))...) if err != nil { - return nil, toErr(ctx, err) + return nil, ContextError(ctx, err) } m.lg.Info("opened snapshot stream; downloading") @@ -246,10 +246,10 @@ type snapshotReadCloser struct { func (rc *snapshotReadCloser) Read(p []byte) (n int, err error) { n, err = rc.ReadCloser.Read(p) - return n, toErr(rc.ctx, err) + return n, ContextError(rc.ctx, err) } func (m *maintenance) MoveLeader(ctx context.Context, transfereeID uint64) (*MoveLeaderResponse, error) { resp, err := m.remote.MoveLeader(ctx, &pb.MoveLeaderRequest{TargetID: transfereeID}, m.callOpts...) - return (*MoveLeaderResponse)(resp), toErr(ctx, err) + return (*MoveLeaderResponse)(resp), ContextError(ctx, err) } diff --git a/client/v3/txn.go b/client/v3/txn.go index 3f6a953cf094..e31bfe0b94d2 100644 --- a/client/v3/txn.go +++ b/client/v3/txn.go @@ -144,7 +144,7 @@ func (txn *txn) Commit() (*TxnResponse, error) { var err error resp, err = txn.kv.remote.Txn(txn.ctx, r, txn.callOpts...) if err != nil { - return nil, toErr(txn.ctx, err) + return nil, ContextError(txn.ctx, err) } return (*TxnResponse)(resp), nil } diff --git a/client/v3/watch.go b/client/v3/watch.go index 41a6ec976333..5590b68378f6 100644 --- a/client/v3/watch.go +++ b/client/v3/watch.go @@ -442,7 +442,7 @@ func (w *watchGrpcStream) close() (err error) { case err = <-w.errc: default: } - return toErr(w.ctx, err) + return ContextError(w.ctx, err) } func (w *watcher) closeStream(wgs *watchGrpcStream) { @@ -653,7 +653,7 @@ func (w *watchGrpcStream) run() { // watch client failed on Recv; spawn another if possible case err := <-w.errc: - if isHaltErr(w.ctx, err) || toErr(w.ctx, err) == v3rpc.ErrNoLeader { + if isHaltErr(w.ctx, err) || ContextError(w.ctx, err) == v3rpc.ErrNoLeader { closeErr = err return }