diff --git a/client/v3/auth.go b/client/v3/auth.go index 382172b21bf..ae85ec9a942 100644 --- a/client/v3/auth.go +++ b/client/v3/auth.go @@ -135,67 +135,67 @@ func NewAuthFromAuthClient(remote pb.AuthClient, c *Client) Auth { func (auth *authClient) Authenticate(ctx context.Context, name string, password string) (*AuthenticateResponse, error) { resp, err := auth.remote.Authenticate(ctx, &pb.AuthenticateRequest{Name: name, Password: password}, auth.callOpts...) - return (*AuthenticateResponse)(resp), ContextError(ctx, err) + return (*AuthenticateResponse)(resp), toErr(ctx, err) } func (auth *authClient) AuthEnable(ctx context.Context) (*AuthEnableResponse, error) { resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{}, auth.callOpts...) - return (*AuthEnableResponse)(resp), ContextError(ctx, err) + return (*AuthEnableResponse)(resp), toErr(ctx, err) } func (auth *authClient) AuthDisable(ctx context.Context) (*AuthDisableResponse, error) { resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{}, auth.callOpts...) - return (*AuthDisableResponse)(resp), ContextError(ctx, err) + return (*AuthDisableResponse)(resp), toErr(ctx, err) } func (auth *authClient) AuthStatus(ctx context.Context) (*AuthStatusResponse, error) { resp, err := auth.remote.AuthStatus(ctx, &pb.AuthStatusRequest{}, auth.callOpts...) - return (*AuthStatusResponse)(resp), ContextError(ctx, err) + return (*AuthStatusResponse)(resp), toErr(ctx, err) } func (auth *authClient) UserAdd(ctx context.Context, name string, password string) (*AuthUserAddResponse, error) { resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password, Options: &authpb.UserAddOptions{NoPassword: false}}, auth.callOpts...) - return (*AuthUserAddResponse)(resp), ContextError(ctx, err) + return (*AuthUserAddResponse)(resp), toErr(ctx, err) } func (auth *authClient) UserAddWithOptions(ctx context.Context, name string, password string, options *UserAddOptions) (*AuthUserAddResponse, error) { resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password, Options: (*authpb.UserAddOptions)(options)}, auth.callOpts...) - return (*AuthUserAddResponse)(resp), ContextError(ctx, err) + return (*AuthUserAddResponse)(resp), toErr(ctx, err) } func (auth *authClient) UserDelete(ctx context.Context, name string) (*AuthUserDeleteResponse, error) { resp, err := auth.remote.UserDelete(ctx, &pb.AuthUserDeleteRequest{Name: name}, auth.callOpts...) - return (*AuthUserDeleteResponse)(resp), ContextError(ctx, err) + return (*AuthUserDeleteResponse)(resp), toErr(ctx, err) } func (auth *authClient) UserChangePassword(ctx context.Context, name string, password string) (*AuthUserChangePasswordResponse, error) { resp, err := auth.remote.UserChangePassword(ctx, &pb.AuthUserChangePasswordRequest{Name: name, Password: password}, auth.callOpts...) - return (*AuthUserChangePasswordResponse)(resp), ContextError(ctx, err) + return (*AuthUserChangePasswordResponse)(resp), toErr(ctx, err) } func (auth *authClient) UserGrantRole(ctx context.Context, user string, role string) (*AuthUserGrantRoleResponse, error) { resp, err := auth.remote.UserGrantRole(ctx, &pb.AuthUserGrantRoleRequest{User: user, Role: role}, auth.callOpts...) - return (*AuthUserGrantRoleResponse)(resp), ContextError(ctx, err) + return (*AuthUserGrantRoleResponse)(resp), toErr(ctx, err) } func (auth *authClient) UserGet(ctx context.Context, name string) (*AuthUserGetResponse, error) { resp, err := auth.remote.UserGet(ctx, &pb.AuthUserGetRequest{Name: name}, auth.callOpts...) - return (*AuthUserGetResponse)(resp), ContextError(ctx, err) + return (*AuthUserGetResponse)(resp), toErr(ctx, err) } func (auth *authClient) UserList(ctx context.Context) (*AuthUserListResponse, error) { resp, err := auth.remote.UserList(ctx, &pb.AuthUserListRequest{}, auth.callOpts...) - return (*AuthUserListResponse)(resp), ContextError(ctx, err) + return (*AuthUserListResponse)(resp), toErr(ctx, err) } func (auth *authClient) UserRevokeRole(ctx context.Context, name string, role string) (*AuthUserRevokeRoleResponse, error) { resp, err := auth.remote.UserRevokeRole(ctx, &pb.AuthUserRevokeRoleRequest{Name: name, Role: role}, auth.callOpts...) - return (*AuthUserRevokeRoleResponse)(resp), ContextError(ctx, err) + return (*AuthUserRevokeRoleResponse)(resp), toErr(ctx, err) } func (auth *authClient) RoleAdd(ctx context.Context, name string) (*AuthRoleAddResponse, error) { resp, err := auth.remote.RoleAdd(ctx, &pb.AuthRoleAddRequest{Name: name}, auth.callOpts...) - return (*AuthRoleAddResponse)(resp), ContextError(ctx, err) + return (*AuthRoleAddResponse)(resp), toErr(ctx, err) } func (auth *authClient) RoleGrantPermission(ctx context.Context, name string, key, rangeEnd string, permType PermissionType) (*AuthRoleGrantPermissionResponse, error) { @@ -205,27 +205,27 @@ func (auth *authClient) RoleGrantPermission(ctx context.Context, name string, ke PermType: authpb.Permission_Type(permType), } resp, err := auth.remote.RoleGrantPermission(ctx, &pb.AuthRoleGrantPermissionRequest{Name: name, Perm: perm}, auth.callOpts...) - return (*AuthRoleGrantPermissionResponse)(resp), ContextError(ctx, err) + return (*AuthRoleGrantPermissionResponse)(resp), toErr(ctx, err) } func (auth *authClient) RoleGet(ctx context.Context, role string) (*AuthRoleGetResponse, error) { resp, err := auth.remote.RoleGet(ctx, &pb.AuthRoleGetRequest{Role: role}, auth.callOpts...) - return (*AuthRoleGetResponse)(resp), ContextError(ctx, err) + return (*AuthRoleGetResponse)(resp), toErr(ctx, err) } func (auth *authClient) RoleList(ctx context.Context) (*AuthRoleListResponse, error) { resp, err := auth.remote.RoleList(ctx, &pb.AuthRoleListRequest{}, auth.callOpts...) - return (*AuthRoleListResponse)(resp), ContextError(ctx, err) + return (*AuthRoleListResponse)(resp), toErr(ctx, err) } func (auth *authClient) RoleRevokePermission(ctx context.Context, role string, key, rangeEnd string) (*AuthRoleRevokePermissionResponse, error) { resp, err := auth.remote.RoleRevokePermission(ctx, &pb.AuthRoleRevokePermissionRequest{Role: role, Key: []byte(key), RangeEnd: []byte(rangeEnd)}, auth.callOpts...) - return (*AuthRoleRevokePermissionResponse)(resp), ContextError(ctx, err) + return (*AuthRoleRevokePermissionResponse)(resp), toErr(ctx, err) } func (auth *authClient) RoleDelete(ctx context.Context, role string) (*AuthRoleDeleteResponse, error) { resp, err := auth.remote.RoleDelete(ctx, &pb.AuthRoleDeleteRequest{Role: role}, auth.callOpts...) - return (*AuthRoleDeleteResponse)(resp), ContextError(ctx, err) + return (*AuthRoleDeleteResponse)(resp), toErr(ctx, err) } func StrToPermissionType(s string) (PermissionType, error) { diff --git a/client/v3/client.go b/client/v3/client.go index 8789acb38c8..79654db9bb9 100644 --- a/client/v3/client.go +++ b/client/v3/client.go @@ -154,7 +154,7 @@ func (c *Client) Close() error { c.Lease.Close() } if c.conn != nil { - return ContextError(c.ctx, c.conn.Close()) + return toErr(c.ctx, c.conn.Close()) } return c.ctx.Err() } @@ -598,9 +598,7 @@ func isUnavailableErr(ctx context.Context, err error) bool { return false } -// ContextError converts the error into an EtcdError if the error message matches one of -// the defined messages; otherwise, it tries to retrieve the context error. -func ContextError(ctx context.Context, err error) error { +func toErr(ctx context.Context, err error) error { if err == nil { return nil } diff --git a/client/v3/cluster.go b/client/v3/cluster.go index 1b7e83375c3..faab2a1add5 100644 --- a/client/v3/cluster.go +++ b/client/v3/cluster.go @@ -93,7 +93,7 @@ func (c *cluster) memberAdd(ctx context.Context, peerAddrs []string, isLearner b } resp, err := c.remote.MemberAdd(ctx, r, c.callOpts...) if err != nil { - return nil, ContextError(ctx, err) + return nil, toErr(ctx, err) } return (*MemberAddResponse)(resp), nil } @@ -102,7 +102,7 @@ func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveRes r := &pb.MemberRemoveRequest{ID: id} resp, err := c.remote.MemberRemove(ctx, r, c.callOpts...) if err != nil { - return nil, ContextError(ctx, err) + return nil, toErr(ctx, err) } return (*MemberRemoveResponse)(resp), nil } @@ -119,7 +119,7 @@ func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []strin if err == nil { return (*MemberUpdateResponse)(resp), nil } - return nil, ContextError(ctx, err) + return nil, toErr(ctx, err) } func (c *cluster) MemberList(ctx context.Context, opts ...OpOption) (*MemberListResponse, error) { @@ -128,14 +128,14 @@ func (c *cluster) MemberList(ctx context.Context, opts ...OpOption) (*MemberList if err == nil { return (*MemberListResponse)(resp), nil } - return nil, ContextError(ctx, err) + return nil, toErr(ctx, err) } func (c *cluster) MemberPromote(ctx context.Context, id uint64) (*MemberPromoteResponse, error) { r := &pb.MemberPromoteRequest{ID: id} resp, err := c.remote.MemberPromote(ctx, r, c.callOpts...) if err != nil { - return nil, ContextError(ctx, err) + return nil, toErr(ctx, err) } return (*MemberPromoteResponse)(resp), nil } diff --git a/client/v3/kubernetes/client.go b/client/v3/kubernetes/client.go index 4ca0e5fe8c1..bf789b3322a 100644 --- a/client/v3/kubernetes/client.go +++ b/client/v3/kubernetes/client.go @@ -31,24 +31,20 @@ func New(cfg clientv3.Config) (*Client, error) { } kc := &Client{ Client: c, - kv: clientv3.RetryKVClient(c), } - kc.Kubernetes = kc return kc, nil } type Client struct { *clientv3.Client - Kubernetes Interface - kv pb.KVClient } var _ Interface = (*Client)(nil) func (k Client) Get(ctx context.Context, key string, opts GetOptions) (resp GetResponse, err error) { - rangeResp, err := k.kv.Range(ctx, getRequest(key, opts.Revision)) + rangeResp, err := k.KV.Get(ctx, key, clientv3.WithRev(opts.Revision), clientv3.WithLimit(1)) if err != nil { - return resp, clientv3.ContextError(ctx, err) + return resp, err } resp.Revision = rangeResp.Header.Revision if len(rangeResp.Kvs) == 1 { @@ -58,17 +54,14 @@ func (k Client) Get(ctx context.Context, key string, opts GetOptions) (resp GetR } func (k Client) List(ctx context.Context, prefix string, opts ListOptions) (resp ListResponse, err error) { - rangeStart := prefix + opts.Continue + rangeStart := prefix + if opts.Continue != "" { + rangeStart = opts.Continue + "\x00" + } rangeEnd := clientv3.GetPrefixRangeEnd(prefix) - - rangeResp, err := k.kv.Range(ctx, &pb.RangeRequest{ - Key: []byte(rangeStart), - RangeEnd: []byte(rangeEnd), - Limit: opts.Limit, - Revision: opts.Revision, - }) + rangeResp, err := k.KV.Get(ctx, rangeStart, clientv3.WithRange(rangeEnd), clientv3.WithLimit(opts.Limit), clientv3.WithRev(opts.Revision)) if err != nil { - return resp, clientv3.ContextError(ctx, err) + return resp, err } resp.Kvs = rangeResp.Kvs resp.Count = rangeResp.Count @@ -77,28 +70,27 @@ func (k Client) List(ctx context.Context, prefix string, opts ListOptions) (resp } func (k Client) Count(ctx context.Context, prefix string, _ CountOptions) (int64, error) { - resp, err := k.kv.Range(ctx, &pb.RangeRequest{ - Key: []byte(prefix), - RangeEnd: []byte(clientv3.GetPrefixRangeEnd(prefix)), - CountOnly: true, - }) + resp, err := k.KV.Get(ctx, prefix, clientv3.WithPrefix(), clientv3.WithCountOnly()) if err != nil { - return 0, clientv3.ContextError(ctx, err) + return 0, err } return resp.Count, nil } func (k Client) OptimisticPut(ctx context.Context, key string, value []byte, expectedRevision int64, opts PutOptions) (resp PutResponse, err error) { - onSuccess := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{Key: []byte(key), Value: value, Lease: int64(opts.LeaseID)}}} + txn := k.KV.Txn(ctx).If( + clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision), + ).Then( + clientv3.OpPut(key, string(value), clientv3.WithLease(opts.LeaseID)), + ) - var onFailure *pb.RequestOp if opts.GetOnFailure { - onFailure = &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: getRequest(key, 0)}} + txn = txn.Else(clientv3.OpGet(key)) } - txnResp, err := k.optimisticTxn(ctx, key, expectedRevision, onSuccess, onFailure) + txnResp, err := txn.Commit() if err != nil { - return resp, clientv3.ContextError(ctx, err) + return resp, err } resp.Succeeded = txnResp.Succeeded resp.Revision = txnResp.Header.Revision @@ -109,16 +101,17 @@ func (k Client) OptimisticPut(ctx context.Context, key string, value []byte, exp } func (k Client) OptimisticDelete(ctx context.Context, key string, expectedRevision int64, opts DeleteOptions) (resp DeleteResponse, err error) { - onSuccess := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{RequestDeleteRange: &pb.DeleteRangeRequest{Key: []byte(key)}}} - - var onFailure *pb.RequestOp + txn := k.KV.Txn(ctx).If( + clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision), + ).Then( + clientv3.OpDelete(key), + ) if opts.GetOnFailure { - onFailure = &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: getRequest(key, 0)}} + txn = txn.Else(clientv3.OpGet(key)) } - - txnResp, err := k.optimisticTxn(ctx, key, expectedRevision, onSuccess, onFailure) + txnResp, err := txn.Commit() if err != nil { - return resp, clientv3.ContextError(ctx, err) + return resp, err } resp.Succeeded = txnResp.Succeeded resp.Revision = txnResp.Header.Revision @@ -128,34 +121,6 @@ func (k Client) OptimisticDelete(ctx context.Context, key string, expectedRevisi return resp, nil } -func (k Client) optimisticTxn(ctx context.Context, key string, expectedRevision int64, onSuccess, onFailure *pb.RequestOp) (*pb.TxnResponse, error) { - txn := &pb.TxnRequest{ - Compare: []*pb.Compare{ - { - Result: pb.Compare_EQUAL, - Target: pb.Compare_MOD, - Key: []byte(key), - TargetUnion: &pb.Compare_ModRevision{ModRevision: expectedRevision}, - }, - }, - } - if onSuccess != nil { - txn.Success = []*pb.RequestOp{onSuccess} - } - if onFailure != nil { - txn.Failure = []*pb.RequestOp{onFailure} - } - return k.kv.Txn(ctx, txn) -} - -func getRequest(key string, revision int64) *pb.RangeRequest { - return &pb.RangeRequest{ - Key: []byte(key), - Revision: revision, - Limit: 1, - } -} - func kvFromTxnResponse(resp *pb.ResponseOp) *mvccpb.KeyValue { getResponse := resp.GetResponseRange() if len(getResponse.Kvs) == 1 { diff --git a/client/v3/kv.go b/client/v3/kv.go index 8fa8d56c249..f50f9595ce1 100644 --- a/client/v3/kv.go +++ b/client/v3/kv.go @@ -113,23 +113,23 @@ func NewKVFromKVClient(remote pb.KVClient, c *Client) KV { func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) { r, err := kv.Do(ctx, OpPut(key, val, opts...)) - return r.put, ContextError(ctx, err) + return r.put, toErr(ctx, err) } func (kv *kv) Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error) { r, err := kv.Do(ctx, OpGet(key, opts...)) - return r.get, ContextError(ctx, err) + return r.get, toErr(ctx, err) } func (kv *kv) Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error) { r, err := kv.Do(ctx, OpDelete(key, opts...)) - return r.del, ContextError(ctx, err) + return r.del, toErr(ctx, err) } func (kv *kv) Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error) { resp, err := kv.remote.Compact(ctx, OpCompact(rev, opts...).toRequest(), kv.callOpts...) if err != nil { - return nil, ContextError(ctx, err) + return nil, toErr(ctx, err) } return (*CompactResponse)(resp), err } @@ -178,5 +178,5 @@ func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) { default: panic("Unknown op") } - return OpResponse{}, ContextError(ctx, err) + return OpResponse{}, toErr(ctx, err) } diff --git a/client/v3/lease.go b/client/v3/lease.go index 0f0c57a9572..bd067c63032 100644 --- a/client/v3/lease.go +++ b/client/v3/lease.go @@ -223,7 +223,7 @@ func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, err } return gresp, nil } - return nil, ContextError(ctx, err) + return nil, toErr(ctx, err) } func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) { @@ -232,14 +232,14 @@ func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, if err == nil { return (*LeaseRevokeResponse)(resp), nil } - return nil, ContextError(ctx, err) + return nil, toErr(ctx, err) } func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error) { r := toLeaseTimeToLiveRequest(id, opts...) resp, err := l.remote.LeaseTimeToLive(ctx, r, l.callOpts...) if err != nil { - return nil, ContextError(ctx, err) + return nil, toErr(ctx, err) } gresp := &LeaseTimeToLiveResponse{ ResponseHeader: resp.GetHeader(), @@ -260,7 +260,7 @@ func (l *lessor) Leases(ctx context.Context) (*LeaseLeasesResponse, error) { } return &LeaseLeasesResponse{ResponseHeader: resp.GetHeader(), Leases: leases}, nil } - return nil, ContextError(ctx, err) + return nil, toErr(ctx, err) } func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) { @@ -315,7 +315,7 @@ func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive return resp, err } if isHaltErr(ctx, err) { - return nil, ContextError(ctx, err) + return nil, toErr(ctx, err) } } } @@ -405,13 +405,13 @@ func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (karesp *LeaseKe stream, err := l.remote.LeaseKeepAlive(cctx, l.callOpts...) if err != nil { - return nil, ContextError(ctx, err) + return nil, toErr(ctx, err) } defer func() { if cerr := stream.CloseSend(); cerr != nil { if ferr == nil { - ferr = ContextError(ctx, cerr) + ferr = toErr(ctx, cerr) } return } @@ -419,12 +419,12 @@ func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (karesp *LeaseKe err = stream.Send(&pb.LeaseKeepAliveRequest{ID: int64(id)}) if err != nil { - return nil, ContextError(ctx, err) + return nil, toErr(ctx, err) } resp, rerr := stream.Recv() if rerr != nil { - return nil, ContextError(ctx, rerr) + return nil, toErr(ctx, rerr) } karesp = &LeaseKeepAliveResponse{ @@ -464,7 +464,7 @@ func (l *lessor) recvKeepAliveLoop() (gerr error) { return err } - if ContextError(l.stopCtx, err) == rpctypes.ErrNoLeader { + if toErr(l.stopCtx, err) == rpctypes.ErrNoLeader { l.closeRequireLeader() } break diff --git a/client/v3/maintenance.go b/client/v3/maintenance.go index f290f7ce137..d8cd137179f 100644 --- a/client/v3/maintenance.go +++ b/client/v3/maintenance.go @@ -155,7 +155,7 @@ func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) { if err == nil { return (*AlarmResponse)(resp), nil } - return nil, ContextError(ctx, err) + return nil, toErr(ctx, err) } func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmResponse, error) { @@ -168,13 +168,13 @@ func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmR if req.MemberID == 0 && req.Alarm == pb.AlarmType_NONE { ar, err := m.AlarmList(ctx) if err != nil { - return nil, ContextError(ctx, err) + return nil, toErr(ctx, err) } ret := AlarmResponse{} for _, am := range ar.Alarms { dresp, derr := m.AlarmDisarm(ctx, (*AlarmMember)(am)) if derr != nil { - return nil, ContextError(ctx, derr) + return nil, toErr(ctx, derr) } ret.Alarms = append(ret.Alarms, dresp.Alarms...) } @@ -185,18 +185,18 @@ func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmR if err == nil { return (*AlarmResponse)(resp), nil } - return nil, ContextError(ctx, err) + return nil, toErr(ctx, err) } func (m *maintenance) Defragment(ctx context.Context, endpoint string) (*DefragmentResponse, error) { remote, cancel, err := m.dial(endpoint) if err != nil { - return nil, ContextError(ctx, err) + return nil, toErr(ctx, err) } defer cancel() resp, err := remote.Defragment(ctx, &pb.DefragmentRequest{}, m.callOpts...) if err != nil { - return nil, ContextError(ctx, err) + return nil, toErr(ctx, err) } return (*DefragmentResponse)(resp), nil } @@ -204,12 +204,12 @@ func (m *maintenance) Defragment(ctx context.Context, endpoint string) (*Defragm func (m *maintenance) Status(ctx context.Context, endpoint string) (*StatusResponse, error) { remote, cancel, err := m.dial(endpoint) if err != nil { - return nil, ContextError(ctx, err) + return nil, toErr(ctx, err) } defer cancel() resp, err := remote.Status(ctx, &pb.StatusRequest{}, m.callOpts...) if err != nil { - return nil, ContextError(ctx, err) + return nil, toErr(ctx, err) } return (*StatusResponse)(resp), nil } @@ -218,12 +218,12 @@ func (m *maintenance) HashKV(ctx context.Context, endpoint string, rev int64) (* remote, cancel, err := m.dial(endpoint) if err != nil { - return nil, ContextError(ctx, err) + return nil, toErr(ctx, err) } defer cancel() resp, err := remote.HashKV(ctx, &pb.HashKVRequest{Revision: rev}, m.callOpts...) if err != nil { - return nil, ContextError(ctx, err) + return nil, toErr(ctx, err) } return (*HashKVResponse)(resp), nil } @@ -231,7 +231,7 @@ func (m *maintenance) HashKV(ctx context.Context, endpoint string, rev int64) (* func (m *maintenance) SnapshotWithVersion(ctx context.Context) (*SnapshotResponse, error) { ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}, append(m.callOpts, withMax(defaultStreamMaxRetries))...) if err != nil { - return nil, ContextError(ctx, err) + return nil, toErr(ctx, err) } m.lg.Info("opened snapshot stream; downloading") @@ -274,7 +274,7 @@ func (m *maintenance) SnapshotWithVersion(ctx context.Context) (*SnapshotRespons func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) { ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}, append(m.callOpts, withMax(defaultStreamMaxRetries))...) if err != nil { - return nil, ContextError(ctx, err) + return nil, toErr(ctx, err) } m.lg.Info("opened snapshot stream; downloading") @@ -326,12 +326,12 @@ type snapshotReadCloser struct { func (rc *snapshotReadCloser) Read(p []byte) (n int, err error) { n, err = rc.ReadCloser.Read(p) - return n, ContextError(rc.ctx, err) + return n, toErr(rc.ctx, err) } func (m *maintenance) MoveLeader(ctx context.Context, transfereeID uint64) (*MoveLeaderResponse, error) { resp, err := m.remote.MoveLeader(ctx, &pb.MoveLeaderRequest{TargetID: transfereeID}, m.callOpts...) - return (*MoveLeaderResponse)(resp), ContextError(ctx, err) + return (*MoveLeaderResponse)(resp), toErr(ctx, err) } func (m *maintenance) Downgrade(ctx context.Context, action DowngradeAction, version string) (*DowngradeResponse, error) { @@ -347,5 +347,5 @@ func (m *maintenance) Downgrade(ctx context.Context, action DowngradeAction, ver return nil, errors.New("etcdclient: unknown downgrade action") } resp, err := m.remote.Downgrade(ctx, &pb.DowngradeRequest{Action: actionType, Version: version}, m.callOpts...) - return (*DowngradeResponse)(resp), ContextError(ctx, err) + return (*DowngradeResponse)(resp), toErr(ctx, err) } diff --git a/client/v3/txn.go b/client/v3/txn.go index 0a57332ac78..a4e3706fa96 100644 --- a/client/v3/txn.go +++ b/client/v3/txn.go @@ -144,7 +144,7 @@ func (txn *txn) Commit() (*TxnResponse, error) { var err error resp, err = txn.kv.remote.Txn(txn.ctx, r, txn.callOpts...) if err != nil { - return nil, ContextError(txn.ctx, err) + return nil, toErr(txn.ctx, err) } return (*TxnResponse)(resp), nil } diff --git a/client/v3/watch.go b/client/v3/watch.go index bfa753cfd71..117c662b432 100644 --- a/client/v3/watch.go +++ b/client/v3/watch.go @@ -442,7 +442,7 @@ func (w *watchGRPCStream) close() (err error) { case err = <-w.errc: default: } - return ContextError(w.ctx, err) + return toErr(w.ctx, err) } func (w *watcher) closeStream(wgs *watchGRPCStream) { @@ -653,7 +653,7 @@ func (w *watchGRPCStream) run() { // watch client failed on Recv; spawn another if possible case err := <-w.errc: - if isHaltErr(w.ctx, err) || ContextError(w.ctx, err) == v3rpc.ErrNoLeader { + if isHaltErr(w.ctx, err) || toErr(w.ctx, err) == v3rpc.ErrNoLeader { closeErr = err return }