Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3.4] Backport clientv3:get AuthToken gracefully without dialing gRPC with balancer API to get extra connection #16826

Merged
merged 3 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 18 additions & 32 deletions clientv3/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ import (
"fmt"
"strings"

"google.golang.org/grpc"

"go.etcd.io/etcd/auth/authpb"
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
"google.golang.org/grpc"
)

type (
Expand Down Expand Up @@ -55,6 +56,9 @@ const (
type UserAddOptions authpb.UserAddOptions

type Auth interface {
// Authenticate login and get token
Authenticate(ctx context.Context, name string, password string) (*AuthenticateResponse, error)

// AuthEnable enables auth of an etcd cluster.
AuthEnable(ctx context.Context) (*AuthEnableResponse, error)

Expand Down Expand Up @@ -117,6 +121,19 @@ func NewAuth(c *Client) Auth {
return api
}

func NewAuthFromAuthClient(remote pb.AuthClient, c *Client) Auth {
api := &authClient{remote: remote}
if c != nil {
api.callOpts = c.callOpts
}
return api
}

func (auth *authClient) Authenticate(ctx context.Context, name string, password string) (*AuthenticateResponse, error) {
resp, err := auth.remote.Authenticate(ctx, &pb.AuthenticateRequest{Name: name, Password: password}, auth.callOpts...)
return (*AuthenticateResponse)(resp), toErr(ctx, err)
}

func (auth *authClient) AuthEnable(ctx context.Context) (*AuthEnableResponse, error) {
resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{}, auth.callOpts...)
return (*AuthEnableResponse)(resp), toErr(ctx, err)
Expand Down Expand Up @@ -209,34 +226,3 @@ func StrToPermissionType(s string) (PermissionType, error) {
}
return PermissionType(-1), fmt.Errorf("invalid permission type: %s", s)
}

type authenticator struct {
conn *grpc.ClientConn // conn in-use
remote pb.AuthClient
callOpts []grpc.CallOption
}

func (auth *authenticator) authenticate(ctx context.Context, name string, password string) (*AuthenticateResponse, error) {
resp, err := auth.remote.Authenticate(ctx, &pb.AuthenticateRequest{Name: name, Password: password}, auth.callOpts...)
return (*AuthenticateResponse)(resp), toErr(ctx, err)
}

func (auth *authenticator) close() {
auth.conn.Close()
}

func newAuthenticator(ctx context.Context, target string, opts []grpc.DialOption, c *Client) (*authenticator, error) {
conn, err := grpc.DialContext(ctx, target, opts...)
if err != nil {
return nil, err
}

api := &authenticator{
conn: conn,
remote: pb.NewAuthClient(conn),
}
if c != nil {
api.callOpts = c.callOpts
}
return api, nil
}
76 changes: 24 additions & 52 deletions clientv3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,43 +292,20 @@ func (c *Client) Dial(ep string) (*grpc.ClientConn, error) {

func (c *Client) getToken(ctx context.Context) error {
var err error // return last error in a case of fail
var auth *authenticator

eps := c.Endpoints()
for _, ep := range eps {
// use dial options without dopts to avoid reusing the client balancer
var dOpts []grpc.DialOption
_, host, _ := endpoint.ParseEndpoint(ep)
target := c.resolverGroup.Target(host)
creds := c.dialWithBalancerCreds(ep)
dOpts, err = c.dialSetupOpts(creds, c.cfg.DialOptions...)
if err != nil {
err = fmt.Errorf("failed to configure auth dialer: %v", err)
continue
}
dOpts = append(dOpts, grpc.WithBalancerName(roundRobinBalancerName))
auth, err = newAuthenticator(ctx, target, dOpts, c)
if err != nil {
continue
}
defer auth.close()

var resp *AuthenticateResponse
resp, err = auth.authenticate(ctx, c.Username, c.Password)
if err != nil {
// return err without retrying other endpoints
if err == rpctypes.ErrAuthNotEnabled {
c.authTokenBundle.UpdateAuthToken("")
return err
}
continue
}

c.authTokenBundle.UpdateAuthToken(resp.Token)
if c.Username == "" || c.Password == "" {
return nil
}

return err
resp, err := c.Auth.Authenticate(ctx, c.Username, c.Password)
if err != nil {
if err == rpctypes.ErrAuthNotEnabled {
return nil
}
return err
}
c.authTokenBundle.UpdateAuthToken(resp.Token)
return nil
}

// dialWithBalancer dials the client's current load balanced resolver group. The scheme of the host
Expand All @@ -349,25 +326,7 @@ func (c *Client) dial(target string, creds grpccredentials.TransportCredentials,

if c.Username != "" && c.Password != "" {
c.authTokenBundle = credentials.NewBundle(credentials.Config{})

ctx, cancel := c.ctx, func() {}
if c.cfg.DialTimeout > 0 {
ctx, cancel = context.WithTimeout(ctx, c.cfg.DialTimeout)
}

err = c.getToken(ctx)
if err != nil {
if toErr(ctx, err) != rpctypes.ErrAuthNotEnabled {
if err == ctx.Err() && ctx.Err() != c.ctx.Err() {
err = context.DeadlineExceeded
}
cancel()
return nil, err
}
} else {
opts = append(opts, grpc.WithPerRPCCredentials(c.authTokenBundle.PerRPCCredentials()))
}
cancel()
opts = append(opts, grpc.WithPerRPCCredentials(c.authTokenBundle.PerRPCCredentials()))
}

opts = append(opts, c.cfg.DialOptions...)
Expand Down Expand Up @@ -510,6 +469,19 @@ func newClient(cfg *Config) (*Client, error) {
client.Auth = NewAuth(client)
client.Maintenance = NewMaintenance(client)

//get token with established connection
ctx, cancel = client.ctx, func() {}
if client.cfg.DialTimeout > 0 {
ctx, cancel = context.WithTimeout(ctx, client.cfg.DialTimeout)
}
err = client.getToken(ctx)
if err != nil {
client.Close()
cancel()
return nil, err
}
cancel()

if cfg.RejectOldCluster {
if err := client.checkVersion(); err != nil {
client.Close()
Expand Down
3 changes: 3 additions & 0 deletions clientv3/credentials/credentials.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ func (rc *perRPCCredential) GetRequestMetadata(ctx context.Context, s ...string)
rc.authTokenMu.RLock()
authToken := rc.authToken
rc.authTokenMu.RUnlock()
if authToken == "" {
return nil, nil
}
return map[string]string{rpctypes.TokenFieldNameGRPC: authToken}, nil
}

Expand Down
14 changes: 13 additions & 1 deletion clientv3/maintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,19 @@ func NewMaintenance(c *Client) Maintenance {
if err != nil {
return nil, nil, fmt.Errorf("failed to dial endpoint %s with maintenance client: %v", endpoint, err)
}
cancel := func() { conn.Close() }

//get token with established connection
dctx := c.ctx
cancel := func() {}
if c.cfg.DialTimeout > 0 {
dctx, cancel = context.WithTimeout(c.ctx, c.cfg.DialTimeout)
}
err = c.getToken(dctx)
cancel()
if err != nil {
return nil, nil, fmt.Errorf("failed to getToken from endpoint %s with maintenance client: %v", endpoint, err)
}
cancel = func() { conn.Close() }
return RetryMaintenanceClient(c, conn), cancel, nil
},
remote: RetryMaintenanceClient(c, c.conn),
Expand Down
17 changes: 10 additions & 7 deletions etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,13 +703,16 @@ func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.In
ID: s.reqIDGen.Next(),
}

authInfo, err := s.AuthInfoFromCtx(ctx)
if err != nil {
return nil, err
}
if authInfo != nil {
r.Header.Username = authInfo.Username
r.Header.AuthRevision = authInfo.Revision
// check authinfo if it is not InternalAuthenticateRequest
if r.Authenticate == nil {
authInfo, err := s.AuthInfoFromCtx(ctx)
if err != nil {
return nil, err
}
if authInfo != nil {
r.Header.Username = authInfo.Username
r.Header.AuthRevision = authInfo.Revision
}
}

data, err := r.Marshal()
Expand Down
Loading