From 83da5ff575e5c070d0149cf1698dc9942cc22d03 Mon Sep 17 00:00:00 2001 From: Chao Chen Date: Wed, 25 Oct 2023 20:55:17 -0700 Subject: [PATCH] [3.4] Backport #12671 clientv3: Replace balancer with upstream grpc solution Signed-off-by: Chao Chen --- bill-of-materials.json | 9 - clientv3/balancer/balancer.go | 293 ---------------- clientv3/balancer/balancer_test.go | 323 ------------------ .../balancer/connectivity/connectivity.go | 93 ----- clientv3/balancer/picker/doc.go | 16 - clientv3/balancer/picker/err.go | 39 --- clientv3/balancer/picker/picker.go | 91 ----- .../balancer/picker/roundrobin_balanced.go | 95 ------ .../balancer/resolver/endpoint/endpoint.go | 248 -------------- clientv3/balancer/utils.go | 68 ---- clientv3/balancer/utils_test.go | 34 -- clientv3/client.go | 140 ++------ clientv3/credentials/credentials.go | 3 +- clientv3/internal/endpoint/endpoint.go | 68 ++++ clientv3/internal/endpoint/endpoint_test.go | 65 ++++ clientv3/internal/resolver/resolver.go | 71 ++++ go.mod | 1 - go.sum | 2 - 18 files changed, 235 insertions(+), 1424 deletions(-) delete mode 100644 clientv3/balancer/balancer.go delete mode 100644 clientv3/balancer/balancer_test.go delete mode 100644 clientv3/balancer/connectivity/connectivity.go delete mode 100644 clientv3/balancer/picker/doc.go delete mode 100644 clientv3/balancer/picker/err.go delete mode 100644 clientv3/balancer/picker/picker.go delete mode 100644 clientv3/balancer/picker/roundrobin_balanced.go delete mode 100644 clientv3/balancer/resolver/endpoint/endpoint.go delete mode 100644 clientv3/balancer/utils.go delete mode 100644 clientv3/balancer/utils_test.go create mode 100644 clientv3/internal/endpoint/endpoint.go create mode 100644 clientv3/internal/endpoint/endpoint_test.go create mode 100644 clientv3/internal/resolver/resolver.go diff --git a/bill-of-materials.json b/bill-of-materials.json index e72b0bd89d6..19d63aca21f 100644 --- a/bill-of-materials.json +++ b/bill-of-materials.json @@ -98,15 +98,6 @@ } ] }, - { - "project": "github.com/google/uuid", - "licenses": [ - { - "type": "BSD 3-clause \"New\" or \"Revised\" License", - "confidence": 0.9663865546218487 - } - ] - }, { "project": "github.com/gorilla/websocket", "licenses": [ diff --git a/clientv3/balancer/balancer.go b/clientv3/balancer/balancer.go deleted file mode 100644 index d02a7eec7c3..00000000000 --- a/clientv3/balancer/balancer.go +++ /dev/null @@ -1,293 +0,0 @@ -// Copyright 2018 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package balancer implements client balancer. -package balancer - -import ( - "strconv" - "sync" - "time" - - "go.etcd.io/etcd/clientv3/balancer/connectivity" - "go.etcd.io/etcd/clientv3/balancer/picker" - - "go.uber.org/zap" - "google.golang.org/grpc/balancer" - grpcconnectivity "google.golang.org/grpc/connectivity" - "google.golang.org/grpc/resolver" - _ "google.golang.org/grpc/resolver/dns" // register DNS resolver - _ "google.golang.org/grpc/resolver/passthrough" // register passthrough resolver -) - -// Config defines balancer configurations. -type Config struct { - // Policy configures balancer policy. - Policy picker.Policy - - // Picker implements gRPC picker. - // Leave empty if "Policy" field is not custom. - // TODO: currently custom policy is not supported. - // Picker picker.Picker - - // Name defines an additional name for balancer. - // Useful for balancer testing to avoid register conflicts. - // If empty, defaults to policy name. - Name string - - // Logger configures balancer logging. - // If nil, logs are discarded. - Logger *zap.Logger -} - -// RegisterBuilder creates and registers a builder. Since this function calls balancer.Register, it -// must be invoked at initialization time. -func RegisterBuilder(cfg Config) { - bb := &builder{cfg} - balancer.Register(bb) - - bb.cfg.Logger.Debug( - "registered balancer", - zap.String("policy", bb.cfg.Policy.String()), - zap.String("name", bb.cfg.Name), - ) -} - -type builder struct { - cfg Config -} - -// Build is called initially when creating "ccBalancerWrapper". -// "grpc.Dial" is called to this client connection. -// Then, resolved addresses will be handled via "HandleResolvedAddrs". -func (b *builder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer { - bb := &baseBalancer{ - id: strconv.FormatInt(time.Now().UnixNano(), 36), - policy: b.cfg.Policy, - name: b.cfg.Name, - lg: b.cfg.Logger, - - addrToSc: make(map[resolver.Address]balancer.SubConn), - scToAddr: make(map[balancer.SubConn]resolver.Address), - scToSt: make(map[balancer.SubConn]grpcconnectivity.State), - - currentConn: nil, - connectivityRecorder: connectivity.New(b.cfg.Logger), - - // initialize picker always returns "ErrNoSubConnAvailable" - picker: picker.NewErr(balancer.ErrNoSubConnAvailable), - } - - // TODO: support multiple connections - bb.mu.Lock() - bb.currentConn = cc - bb.mu.Unlock() - - bb.lg.Info( - "built balancer", - zap.String("balancer-id", bb.id), - zap.String("policy", bb.policy.String()), - zap.String("resolver-target", cc.Target()), - ) - return bb -} - -// Name implements "grpc/balancer.Builder" interface. -func (b *builder) Name() string { return b.cfg.Name } - -// Balancer defines client balancer interface. -type Balancer interface { - // Balancer is called on specified client connection. Client initiates gRPC - // connection with "grpc.Dial(addr, grpc.WithBalancerName)", and then those resolved - // addresses are passed to "grpc/balancer.Balancer.HandleResolvedAddrs". - // For each resolved address, balancer calls "balancer.ClientConn.NewSubConn". - // "grpc/balancer.Balancer.HandleSubConnStateChange" is called when connectivity state - // changes, thus requires failover logic in this method. - balancer.Balancer - - // Picker calls "Pick" for every client request. - picker.Picker -} - -type baseBalancer struct { - id string - policy picker.Policy - name string - lg *zap.Logger - - mu sync.RWMutex - - addrToSc map[resolver.Address]balancer.SubConn - scToAddr map[balancer.SubConn]resolver.Address - scToSt map[balancer.SubConn]grpcconnectivity.State - - currentConn balancer.ClientConn - connectivityRecorder connectivity.Recorder - - picker picker.Picker -} - -// HandleResolvedAddrs implements "grpc/balancer.Balancer" interface. -// gRPC sends initial or updated resolved addresses from "Build". -func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) { - if err != nil { - bb.lg.Warn("HandleResolvedAddrs called with error", zap.String("balancer-id", bb.id), zap.Error(err)) - return - } - bb.lg.Info("resolved", - zap.String("picker", bb.picker.String()), - zap.String("balancer-id", bb.id), - zap.Strings("addresses", addrsToStrings(addrs)), - ) - - bb.mu.Lock() - defer bb.mu.Unlock() - - resolved := make(map[resolver.Address]struct{}) - for _, addr := range addrs { - resolved[addr] = struct{}{} - if _, ok := bb.addrToSc[addr]; !ok { - sc, err := bb.currentConn.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{}) - if err != nil { - bb.lg.Warn("NewSubConn failed", zap.String("picker", bb.picker.String()), zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr)) - continue - } - bb.lg.Info("created subconn", zap.String("address", addr.Addr)) - bb.addrToSc[addr] = sc - bb.scToAddr[sc] = addr - bb.scToSt[sc] = grpcconnectivity.Idle - sc.Connect() - } - } - - for addr, sc := range bb.addrToSc { - if _, ok := resolved[addr]; !ok { - // was removed by resolver or failed to create subconn - bb.currentConn.RemoveSubConn(sc) - delete(bb.addrToSc, addr) - - bb.lg.Info( - "removed subconn", - zap.String("picker", bb.picker.String()), - zap.String("balancer-id", bb.id), - zap.String("address", addr.Addr), - zap.String("subconn", scToString(sc)), - ) - - // Keep the state of this sc in bb.scToSt until sc's state becomes Shutdown. - // The entry will be deleted in HandleSubConnStateChange. - // (DO NOT) delete(bb.scToAddr, sc) - // (DO NOT) delete(bb.scToSt, sc) - } - } -} - -// HandleSubConnStateChange implements "grpc/balancer.Balancer" interface. -func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s grpcconnectivity.State) { - bb.mu.Lock() - defer bb.mu.Unlock() - - old, ok := bb.scToSt[sc] - if !ok { - bb.lg.Warn( - "state change for an unknown subconn", - zap.String("picker", bb.picker.String()), - zap.String("balancer-id", bb.id), - zap.String("subconn", scToString(sc)), - zap.Int("subconn-size", len(bb.scToAddr)), - zap.String("state", s.String()), - ) - return - } - - bb.lg.Info( - "state changed", - zap.String("picker", bb.picker.String()), - zap.String("balancer-id", bb.id), - zap.Bool("connected", s == grpcconnectivity.Ready), - zap.String("subconn", scToString(sc)), - zap.Int("subconn-size", len(bb.scToAddr)), - zap.String("address", bb.scToAddr[sc].Addr), - zap.String("old-state", old.String()), - zap.String("new-state", s.String()), - ) - - bb.scToSt[sc] = s - switch s { - case grpcconnectivity.Idle: - sc.Connect() - case grpcconnectivity.Shutdown: - // When an address was removed by resolver, b called RemoveSubConn but - // kept the sc's state in scToSt. Remove state for this sc here. - delete(bb.scToAddr, sc) - delete(bb.scToSt, sc) - } - - oldAggrState := bb.connectivityRecorder.GetCurrentState() - bb.connectivityRecorder.RecordTransition(old, s) - - // Update balancer picker when one of the following happens: - // - this sc became ready from not-ready - // - this sc became not-ready from ready - // - the aggregated state of balancer became TransientFailure from non-TransientFailure - // - the aggregated state of balancer became non-TransientFailure from TransientFailure - if (s == grpcconnectivity.Ready) != (old == grpcconnectivity.Ready) || - (bb.connectivityRecorder.GetCurrentState() == grpcconnectivity.TransientFailure) != (oldAggrState == grpcconnectivity.TransientFailure) { - bb.updatePicker() - } - - bb.currentConn.UpdateBalancerState(bb.connectivityRecorder.GetCurrentState(), bb.picker) -} - -func (bb *baseBalancer) updatePicker() { - if bb.connectivityRecorder.GetCurrentState() == grpcconnectivity.TransientFailure { - bb.picker = picker.NewErr(balancer.ErrTransientFailure) - bb.lg.Info( - "updated picker to transient error picker", - zap.String("picker", bb.picker.String()), - zap.String("balancer-id", bb.id), - zap.String("policy", bb.policy.String()), - ) - return - } - - // only pass ready subconns to picker - scToAddr := make(map[balancer.SubConn]resolver.Address) - for addr, sc := range bb.addrToSc { - if st, ok := bb.scToSt[sc]; ok && st == grpcconnectivity.Ready { - scToAddr[sc] = addr - } - } - - bb.picker = picker.New(picker.Config{ - Policy: bb.policy, - Logger: bb.lg, - SubConnToResolverAddress: scToAddr, - }) - bb.lg.Info( - "updated picker", - zap.String("picker", bb.picker.String()), - zap.String("balancer-id", bb.id), - zap.String("policy", bb.policy.String()), - zap.Strings("subconn-ready", scsToStrings(scToAddr)), - zap.Int("subconn-size", len(scToAddr)), - ) -} - -// Close implements "grpc/balancer.Balancer" interface. -// Close is a nop because base balancer doesn't have internal state to clean up, -// and it doesn't need to call RemoveSubConn for the SubConns. -func (bb *baseBalancer) Close() { - // TODO -} diff --git a/clientv3/balancer/balancer_test.go b/clientv3/balancer/balancer_test.go deleted file mode 100644 index 3eea2b77956..00000000000 --- a/clientv3/balancer/balancer_test.go +++ /dev/null @@ -1,323 +0,0 @@ -// Copyright 2018 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package balancer - -import ( - "context" - "fmt" - "strings" - "testing" - - "go.etcd.io/etcd/clientv3/balancer/picker" - "go.etcd.io/etcd/clientv3/balancer/resolver/endpoint" - pb "go.etcd.io/etcd/etcdserver/etcdserverpb" - "go.etcd.io/etcd/pkg/mock/mockserver" - - "go.uber.org/zap" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/peer" - "google.golang.org/grpc/status" -) - -// TestRoundRobinBalancedResolvableNoFailover ensures that -// requests to a resolvable endpoint can be balanced between -// multiple, if any, nodes. And there needs be no failover. -func TestRoundRobinBalancedResolvableNoFailover(t *testing.T) { - testCases := []struct { - name string - serverCount int - reqN int - network string - }{ - {name: "rrBalanced_1", serverCount: 1, reqN: 5, network: "tcp"}, - {name: "rrBalanced_1_unix_sockets", serverCount: 1, reqN: 5, network: "unix"}, - {name: "rrBalanced_3", serverCount: 3, reqN: 7, network: "tcp"}, - {name: "rrBalanced_5", serverCount: 5, reqN: 10, network: "tcp"}, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - ms, err := mockserver.StartMockServersOnNetwork(tc.serverCount, tc.network) - if err != nil { - t.Fatalf("failed to start mock servers: %v", err) - } - defer ms.Stop() - - var eps []string - for _, svr := range ms.Servers { - eps = append(eps, svr.ResolverAddress().Addr) - } - - rsv, err := endpoint.NewResolverGroup("nofailover") - if err != nil { - t.Fatal(err) - } - defer rsv.Close() - rsv.SetEndpoints(eps) - - name := genName() - cfg := Config{ - Policy: picker.RoundrobinBalanced, - Name: name, - Logger: zap.NewExample(), - } - RegisterBuilder(cfg) - conn, err := grpc.Dial(fmt.Sprintf("endpoint://nofailover/*"), grpc.WithInsecure(), grpc.WithBalancerName(name)) - if err != nil { - t.Fatalf("failed to dial mock server: %v", err) - } - defer conn.Close() - cli := pb.NewKVClient(conn) - - reqFunc := func(ctx context.Context) (picked string, err error) { - var p peer.Peer - _, err = cli.Range(ctx, &pb.RangeRequest{Key: []byte("/x")}, grpc.Peer(&p)) - if p.Addr != nil { - picked = p.Addr.String() - } - return picked, err - } - - _, picked, err := warmupConnections(reqFunc, tc.serverCount, "") - if err != nil { - t.Fatalf("Unexpected failure %v", err) - } - - // verify that we round robin - prev, switches := picked, 0 - for i := 0; i < tc.reqN; i++ { - picked, err = reqFunc(context.Background()) - if err != nil { - t.Fatalf("#%d: unexpected failure %v", i, err) - } - if prev != picked { - switches++ - } - prev = picked - } - if tc.serverCount > 1 && switches != tc.reqN { - t.Fatalf("expected balanced loads for %d requests, got switches %d", tc.reqN, switches) - } - }) - } -} - -// TestRoundRobinBalancedResolvableFailoverFromServerFail ensures that -// loads be rebalanced while one server goes down and comes back. -func TestRoundRobinBalancedResolvableFailoverFromServerFail(t *testing.T) { - serverCount := 5 - ms, err := mockserver.StartMockServers(serverCount) - if err != nil { - t.Fatalf("failed to start mock servers: %s", err) - } - defer ms.Stop() - var eps []string - for _, svr := range ms.Servers { - eps = append(eps, svr.ResolverAddress().Addr) - } - - rsv, err := endpoint.NewResolverGroup("serverfail") - if err != nil { - t.Fatal(err) - } - defer rsv.Close() - rsv.SetEndpoints(eps) - - name := genName() - cfg := Config{ - Policy: picker.RoundrobinBalanced, - Name: name, - Logger: zap.NewExample(), - } - RegisterBuilder(cfg) - conn, err := grpc.Dial(fmt.Sprintf("endpoint://serverfail/mock.server"), grpc.WithInsecure(), grpc.WithBalancerName(name)) - if err != nil { - t.Fatalf("failed to dial mock server: %s", err) - } - defer conn.Close() - cli := pb.NewKVClient(conn) - - reqFunc := func(ctx context.Context) (picked string, err error) { - var p peer.Peer - _, err = cli.Range(ctx, &pb.RangeRequest{Key: []byte("/x")}, grpc.Peer(&p)) - if p.Addr != nil { - picked = p.Addr.String() - } - return picked, err - } - - // stop first server, loads should be redistributed - ms.StopAt(0) - // stopped server will be transitioned into TRANSIENT_FAILURE state - // but it doesn't happen instantaneously and it can still be picked for a short period of time - // we ignore "transport is closing" in such case - available, picked, err := warmupConnections(reqFunc, serverCount-1, "transport is closing") - if err != nil { - t.Fatalf("Unexpected failure %v", err) - } - - reqN := 10 - prev, switches := picked, 0 - for i := 0; i < reqN; i++ { - picked, err = reqFunc(context.Background()) - if err != nil { - t.Fatalf("#%d: unexpected failure %v", i, err) - } - if _, ok := available[picked]; !ok { - t.Fatalf("picked unavailable address %q (available %v)", picked, available) - } - if prev != picked { - switches++ - } - prev = picked - } - if switches != reqN { - t.Fatalf("expected balanced loads for %d requests, got switches %d", reqN, switches) - } - - // now failed server comes back - ms.StartAt(0) - available, picked, err = warmupConnections(reqFunc, serverCount, "") - if err != nil { - t.Fatalf("Unexpected failure %v", err) - } - - prev, switches = picked, 0 - recoveredAddr, recovered := eps[0], 0 - available[recoveredAddr] = struct{}{} - - for i := 0; i < 2*reqN; i++ { - picked, err := reqFunc(context.Background()) - if err != nil { - t.Fatalf("#%d: unexpected failure %v", i, err) - } - if _, ok := available[picked]; !ok { - t.Fatalf("#%d: picked unavailable address %q (available %v)", i, picked, available) - } - if prev != picked { - switches++ - } - if picked == recoveredAddr { - recovered++ - } - prev = picked - } - if switches != 2*reqN { - t.Fatalf("expected balanced loads for %d requests, got switches %d", reqN, switches) - } - if recovered != 2*reqN/serverCount { - t.Fatalf("recovered server %q got only %d requests", recoveredAddr, recovered) - } -} - -// TestRoundRobinBalancedResolvableFailoverFromRequestFail ensures that -// loads be rebalanced while some requests are failed. -func TestRoundRobinBalancedResolvableFailoverFromRequestFail(t *testing.T) { - serverCount := 5 - ms, err := mockserver.StartMockServers(serverCount) - if err != nil { - t.Fatalf("failed to start mock servers: %s", err) - } - defer ms.Stop() - var eps []string - for _, svr := range ms.Servers { - eps = append(eps, svr.ResolverAddress().Addr) - } - - rsv, err := endpoint.NewResolverGroup("requestfail") - if err != nil { - t.Fatal(err) - } - defer rsv.Close() - rsv.SetEndpoints(eps) - - name := genName() - cfg := Config{ - Policy: picker.RoundrobinBalanced, - Name: name, - Logger: zap.NewExample(), - } - RegisterBuilder(cfg) - conn, err := grpc.Dial(fmt.Sprintf("endpoint://requestfail/mock.server"), grpc.WithInsecure(), grpc.WithBalancerName(name)) - if err != nil { - t.Fatalf("failed to dial mock server: %s", err) - } - defer conn.Close() - cli := pb.NewKVClient(conn) - - reqFunc := func(ctx context.Context) (picked string, err error) { - var p peer.Peer - _, err = cli.Range(ctx, &pb.RangeRequest{Key: []byte("/x")}, grpc.Peer(&p)) - if p.Addr != nil { - picked = p.Addr.String() - } - return picked, err - } - - available, picked, err := warmupConnections(reqFunc, serverCount, "") - if err != nil { - t.Fatalf("Unexpected failure %v", err) - } - - reqN := 20 - prev, switches := "", 0 - for i := 0; i < reqN; i++ { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - if i%2 == 0 { - cancel() - } - picked, err = reqFunc(ctx) - if i%2 == 0 { - if s, ok := status.FromError(err); ok && s.Code() != codes.Canceled { - t.Fatalf("#%d: expected %v, got %v", i, context.Canceled, err) - } - continue - } - if _, ok := available[picked]; !ok { - t.Fatalf("#%d: picked unavailable address %q (available %v)", i, picked, available) - } - if prev != picked { - switches++ - } - prev = picked - } - if switches != reqN/2 { - t.Fatalf("expected balanced loads for %d requests, got switches %d", reqN, switches) - } -} - -type reqFuncT = func(ctx context.Context) (picked string, err error) - -func warmupConnections(reqFunc reqFuncT, serverCount int, ignoreErr string) (map[string]struct{}, string, error) { - var picked string - var err error - available := make(map[string]struct{}) - // cycle through all peers to indirectly verify that balancer subconn list is fully loaded - // otherwise we can't reliably count switches between 'picked' peers in the test assert phase - for len(available) < serverCount { - picked, err = reqFunc(context.Background()) - if err != nil { - if ignoreErr != "" && strings.Contains(err.Error(), ignoreErr) { - // skip ignored errors - continue - } - return available, picked, err - } - available[picked] = struct{}{} - } - return available, picked, err -} diff --git a/clientv3/balancer/connectivity/connectivity.go b/clientv3/balancer/connectivity/connectivity.go deleted file mode 100644 index 5b03e0c112c..00000000000 --- a/clientv3/balancer/connectivity/connectivity.go +++ /dev/null @@ -1,93 +0,0 @@ -// Copyright 2019 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package connectivity implements client connectivity operations. -package connectivity - -import ( - "sync" - - "go.uber.org/zap" - "google.golang.org/grpc/connectivity" -) - -// Recorder records gRPC connectivity. -type Recorder interface { - GetCurrentState() connectivity.State - RecordTransition(oldState, newState connectivity.State) -} - -// New returns a new Recorder. -func New(lg *zap.Logger) Recorder { - return &recorder{lg: lg} -} - -// recorder takes the connectivity states of multiple SubConns -// and returns one aggregated connectivity state. -// ref. https://github.com/grpc/grpc-go/blob/master/balancer/balancer.go -type recorder struct { - lg *zap.Logger - - mu sync.RWMutex - - cur connectivity.State - - numReady uint64 // Number of addrConns in ready state. - numConnecting uint64 // Number of addrConns in connecting state. - numTransientFailure uint64 // Number of addrConns in transientFailure. -} - -func (rc *recorder) GetCurrentState() (state connectivity.State) { - rc.mu.RLock() - defer rc.mu.RUnlock() - return rc.cur -} - -// RecordTransition records state change happening in subConn and based on that -// it evaluates what aggregated state should be. -// -// - If at least one SubConn in Ready, the aggregated state is Ready; -// - Else if at least one SubConn in Connecting, the aggregated state is Connecting; -// - Else the aggregated state is TransientFailure. -// -// Idle and Shutdown are not considered. -// -// ref. https://github.com/grpc/grpc-go/blob/master/balancer/balancer.go -func (rc *recorder) RecordTransition(oldState, newState connectivity.State) { - rc.mu.Lock() - defer rc.mu.Unlock() - - for idx, state := range []connectivity.State{oldState, newState} { - updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new. - switch state { - case connectivity.Ready: - rc.numReady += updateVal - case connectivity.Connecting: - rc.numConnecting += updateVal - case connectivity.TransientFailure: - rc.numTransientFailure += updateVal - default: - rc.lg.Warn("connectivity recorder received unknown state", zap.String("connectivity-state", state.String())) - } - } - - switch { // must be exclusive, no overlap - case rc.numReady > 0: - rc.cur = connectivity.Ready - case rc.numConnecting > 0: - rc.cur = connectivity.Connecting - default: - rc.cur = connectivity.TransientFailure - } -} diff --git a/clientv3/balancer/picker/doc.go b/clientv3/balancer/picker/doc.go deleted file mode 100644 index 35dabf5532f..00000000000 --- a/clientv3/balancer/picker/doc.go +++ /dev/null @@ -1,16 +0,0 @@ -// Copyright 2018 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package picker defines/implements client balancer picker policy. -package picker diff --git a/clientv3/balancer/picker/err.go b/clientv3/balancer/picker/err.go deleted file mode 100644 index f4b941d6529..00000000000 --- a/clientv3/balancer/picker/err.go +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright 2018 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package picker - -import ( - "context" - - "google.golang.org/grpc/balancer" -) - -// NewErr returns a picker that always returns err on "Pick". -func NewErr(err error) Picker { - return &errPicker{p: Error, err: err} -} - -type errPicker struct { - p Policy - err error -} - -func (ep *errPicker) String() string { - return ep.p.String() -} - -func (ep *errPicker) Pick(context.Context, balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) { - return nil, nil, ep.err -} diff --git a/clientv3/balancer/picker/picker.go b/clientv3/balancer/picker/picker.go deleted file mode 100644 index bd1a5d25e8b..00000000000 --- a/clientv3/balancer/picker/picker.go +++ /dev/null @@ -1,91 +0,0 @@ -// Copyright 2018 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package picker - -import ( - "fmt" - - "go.uber.org/zap" - "google.golang.org/grpc/balancer" - "google.golang.org/grpc/resolver" -) - -// Picker defines balancer Picker methods. -type Picker interface { - balancer.Picker - String() string -} - -// Config defines picker configuration. -type Config struct { - // Policy specifies etcd clientv3's built in balancer policy. - Policy Policy - - // Logger defines picker logging object. - Logger *zap.Logger - - // SubConnToResolverAddress maps each gRPC sub-connection to an address. - // Basically, it is a list of addresses that the Picker can pick from. - SubConnToResolverAddress map[balancer.SubConn]resolver.Address -} - -// Policy defines balancer picker policy. -type Policy uint8 - -const ( - // Error is error picker policy. - Error Policy = iota - - // RoundrobinBalanced balances loads over multiple endpoints - // and implements failover in roundrobin fashion. - RoundrobinBalanced - - // Custom defines custom balancer picker. - // TODO: custom picker is not supported yet. - Custom -) - -func (p Policy) String() string { - switch p { - case Error: - return "picker-error" - - case RoundrobinBalanced: - return "picker-roundrobin-balanced" - - case Custom: - panic("'custom' picker policy is not supported yet") - - default: - panic(fmt.Errorf("invalid balancer picker policy (%d)", p)) - } -} - -// New creates a new Picker. -func New(cfg Config) Picker { - switch cfg.Policy { - case Error: - panic("'error' picker policy is not supported here; use 'picker.NewErr'") - - case RoundrobinBalanced: - return newRoundrobinBalanced(cfg) - - case Custom: - panic("'custom' picker policy is not supported yet") - - default: - panic(fmt.Errorf("invalid balancer picker policy (%d)", cfg.Policy)) - } -} diff --git a/clientv3/balancer/picker/roundrobin_balanced.go b/clientv3/balancer/picker/roundrobin_balanced.go deleted file mode 100644 index e3971ecc421..00000000000 --- a/clientv3/balancer/picker/roundrobin_balanced.go +++ /dev/null @@ -1,95 +0,0 @@ -// Copyright 2018 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package picker - -import ( - "context" - "sync" - - "go.uber.org/zap" - "go.uber.org/zap/zapcore" - "google.golang.org/grpc/balancer" - "google.golang.org/grpc/resolver" -) - -// newRoundrobinBalanced returns a new roundrobin balanced picker. -func newRoundrobinBalanced(cfg Config) Picker { - scs := make([]balancer.SubConn, 0, len(cfg.SubConnToResolverAddress)) - for sc := range cfg.SubConnToResolverAddress { - scs = append(scs, sc) - } - return &rrBalanced{ - p: RoundrobinBalanced, - lg: cfg.Logger, - scs: scs, - scToAddr: cfg.SubConnToResolverAddress, - } -} - -type rrBalanced struct { - p Policy - - lg *zap.Logger - - mu sync.RWMutex - next int - scs []balancer.SubConn - scToAddr map[balancer.SubConn]resolver.Address -} - -func (rb *rrBalanced) String() string { return rb.p.String() } - -// Pick is called for every client request. -func (rb *rrBalanced) Pick(ctx context.Context, opts balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) { - rb.mu.RLock() - n := len(rb.scs) - rb.mu.RUnlock() - if n == 0 { - return nil, nil, balancer.ErrNoSubConnAvailable - } - - rb.mu.Lock() - cur := rb.next - sc := rb.scs[cur] - picked := rb.scToAddr[sc].Addr - rb.next = (rb.next + 1) % len(rb.scs) - rb.mu.Unlock() - - rb.lg.Debug( - "picked", - zap.String("picker", rb.p.String()), - zap.String("address", picked), - zap.Int("subconn-index", cur), - zap.Int("subconn-size", n), - ) - - doneFunc := func(info balancer.DoneInfo) { - // TODO: error handling? - fss := []zapcore.Field{ - zap.Error(info.Err), - zap.String("picker", rb.p.String()), - zap.String("address", picked), - zap.Bool("success", info.Err == nil), - zap.Bool("bytes-sent", info.BytesSent), - zap.Bool("bytes-received", info.BytesReceived), - } - if info.Err == nil { - rb.lg.Debug("balancer done", fss...) - } else { - rb.lg.Warn("balancer failed", fss...) - } - } - return sc, doneFunc, nil -} diff --git a/clientv3/balancer/resolver/endpoint/endpoint.go b/clientv3/balancer/resolver/endpoint/endpoint.go deleted file mode 100644 index 86992cbe680..00000000000 --- a/clientv3/balancer/resolver/endpoint/endpoint.go +++ /dev/null @@ -1,248 +0,0 @@ -// Copyright 2018 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package endpoint resolves etcd entpoints using grpc targets of the form 'endpoint:///'. -package endpoint - -import ( - "context" - "fmt" - "net" - "net/url" - "strings" - "sync" - - "google.golang.org/grpc/resolver" -) - -const scheme = "endpoint" - -var ( - targetPrefix = fmt.Sprintf("%s://", scheme) - - bldr *builder -) - -func init() { - bldr = &builder{ - resolverGroups: make(map[string]*ResolverGroup), - } - resolver.Register(bldr) -} - -type builder struct { - mu sync.RWMutex - resolverGroups map[string]*ResolverGroup -} - -// NewResolverGroup creates a new ResolverGroup with the given id. -func NewResolverGroup(id string) (*ResolverGroup, error) { - return bldr.newResolverGroup(id) -} - -// ResolverGroup keeps all endpoints of resolvers using a common endpoint:/// target -// up-to-date. -type ResolverGroup struct { - mu sync.RWMutex - id string - endpoints []string - resolvers []*Resolver -} - -func (e *ResolverGroup) addResolver(r *Resolver) { - e.mu.Lock() - addrs := epsToAddrs(e.endpoints...) - e.resolvers = append(e.resolvers, r) - e.mu.Unlock() - r.cc.NewAddress(addrs) -} - -func (e *ResolverGroup) removeResolver(r *Resolver) { - e.mu.Lock() - for i, er := range e.resolvers { - if er == r { - e.resolvers = append(e.resolvers[:i], e.resolvers[i+1:]...) - break - } - } - e.mu.Unlock() -} - -// SetEndpoints updates the endpoints for ResolverGroup. All registered resolver are updated -// immediately with the new endpoints. -func (e *ResolverGroup) SetEndpoints(endpoints []string) { - addrs := epsToAddrs(endpoints...) - e.mu.Lock() - e.endpoints = endpoints - for _, r := range e.resolvers { - r.cc.NewAddress(addrs) - } - e.mu.Unlock() -} - -// Target constructs a endpoint target using the endpoint id of the ResolverGroup. -func (e *ResolverGroup) Target(endpoint string) string { - return Target(e.id, endpoint) -} - -// Target constructs a endpoint resolver target. -func Target(id, endpoint string) string { - return fmt.Sprintf("%s://%s/%s", scheme, id, endpoint) -} - -// IsTarget checks if a given target string in an endpoint resolver target. -func IsTarget(target string) bool { - return strings.HasPrefix(target, "endpoint://") -} - -func (e *ResolverGroup) Close() { - bldr.close(e.id) -} - -// Build creates or reuses an etcd resolver for the etcd cluster name identified by the authority part of the target. -func (b *builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { - if len(target.Authority) < 1 { - return nil, fmt.Errorf("'etcd' target scheme requires non-empty authority identifying etcd cluster being routed to") - } - id := target.Authority - es, err := b.getResolverGroup(id) - if err != nil { - return nil, fmt.Errorf("failed to build resolver: %v", err) - } - r := &Resolver{ - endpointID: id, - cc: cc, - } - es.addResolver(r) - return r, nil -} - -func (b *builder) newResolverGroup(id string) (*ResolverGroup, error) { - b.mu.RLock() - _, ok := b.resolverGroups[id] - b.mu.RUnlock() - if ok { - return nil, fmt.Errorf("Endpoint already exists for id: %s", id) - } - - es := &ResolverGroup{id: id} - b.mu.Lock() - b.resolverGroups[id] = es - b.mu.Unlock() - return es, nil -} - -func (b *builder) getResolverGroup(id string) (*ResolverGroup, error) { - b.mu.RLock() - es, ok := b.resolverGroups[id] - b.mu.RUnlock() - if !ok { - return nil, fmt.Errorf("ResolverGroup not found for id: %s", id) - } - return es, nil -} - -func (b *builder) close(id string) { - b.mu.Lock() - delete(b.resolverGroups, id) - b.mu.Unlock() -} - -func (b *builder) Scheme() string { - return scheme -} - -// Resolver provides a resolver for a single etcd cluster, identified by name. -type Resolver struct { - endpointID string - cc resolver.ClientConn - sync.RWMutex -} - -// TODO: use balancer.epsToAddrs -func epsToAddrs(eps ...string) (addrs []resolver.Address) { - addrs = make([]resolver.Address, 0, len(eps)) - for _, ep := range eps { - _, host, _ := ParseEndpoint(ep) - addrs = append(addrs, resolver.Address{Addr: ep, ServerName: host}) - } - return addrs -} - -func (*Resolver) ResolveNow(o resolver.ResolveNowOptions) {} - -func (r *Resolver) Close() { - es, err := bldr.getResolverGroup(r.endpointID) - if err != nil { - return - } - es.removeResolver(r) -} - -// ParseEndpoint endpoint parses an endpoint of the form -// (http|https)://*|(unix|unixs)://) -// and returns a protocol ('tcp' or 'unix'), -// host (or filepath if a unix socket), -// scheme (http, https, unix, unixs). -func ParseEndpoint(endpoint string) (proto string, host string, scheme string) { - proto = "tcp" - host = endpoint - url, uerr := url.Parse(endpoint) - if uerr != nil || !strings.Contains(endpoint, "://") { - return proto, host, scheme - } - scheme = url.Scheme - - // strip scheme:// prefix since grpc dials by host - host = url.Host - switch url.Scheme { - case "http", "https": - case "unix", "unixs": - proto = "unix" - host = url.Host + url.Path - default: - proto, host = "", "" - } - return proto, host, scheme -} - -// ParseTarget parses a endpoint:/// string and returns the parsed id and endpoint. -// If the target is malformed, an error is returned. -func ParseTarget(target string) (string, string, error) { - noPrefix := strings.TrimPrefix(target, targetPrefix) - if noPrefix == target { - return "", "", fmt.Errorf("malformed target, %s prefix is required: %s", targetPrefix, target) - } - parts := strings.SplitN(noPrefix, "/", 2) - if len(parts) != 2 { - return "", "", fmt.Errorf("malformed target, expected %s:///, but got %s", scheme, target) - } - return parts[0], parts[1], nil -} - -// Dialer dials a endpoint using net.Dialer. -// Context cancelation and timeout are supported. -func Dialer(ctx context.Context, dialEp string) (net.Conn, error) { - proto, host, _ := ParseEndpoint(dialEp) - select { - case <-ctx.Done(): - return nil, ctx.Err() - default: - } - dialer := &net.Dialer{} - if deadline, ok := ctx.Deadline(); ok { - dialer.Deadline = deadline - } - return dialer.DialContext(ctx, proto, host) -} diff --git a/clientv3/balancer/utils.go b/clientv3/balancer/utils.go deleted file mode 100644 index 48eb8750740..00000000000 --- a/clientv3/balancer/utils.go +++ /dev/null @@ -1,68 +0,0 @@ -// Copyright 2018 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package balancer - -import ( - "fmt" - "net/url" - "sort" - "sync/atomic" - "time" - - "google.golang.org/grpc/balancer" - "google.golang.org/grpc/resolver" -) - -func scToString(sc balancer.SubConn) string { - return fmt.Sprintf("%p", sc) -} - -func scsToStrings(scs map[balancer.SubConn]resolver.Address) (ss []string) { - ss = make([]string, 0, len(scs)) - for sc, a := range scs { - ss = append(ss, fmt.Sprintf("%s (%s)", a.Addr, scToString(sc))) - } - sort.Strings(ss) - return ss -} - -func addrsToStrings(addrs []resolver.Address) (ss []string) { - ss = make([]string, len(addrs)) - for i := range addrs { - ss[i] = addrs[i].Addr - } - sort.Strings(ss) - return ss -} - -func epsToAddrs(eps ...string) (addrs []resolver.Address) { - addrs = make([]resolver.Address, 0, len(eps)) - for _, ep := range eps { - u, err := url.Parse(ep) - if err != nil { - addrs = append(addrs, resolver.Address{Addr: ep, Type: resolver.Backend}) - continue - } - addrs = append(addrs, resolver.Address{Addr: u.Host, Type: resolver.Backend}) - } - return addrs -} - -var genN = new(uint32) - -func genName() string { - now := time.Now().UnixNano() - return fmt.Sprintf("%X%X", now, atomic.AddUint32(genN, 1)) -} diff --git a/clientv3/balancer/utils_test.go b/clientv3/balancer/utils_test.go deleted file mode 100644 index e58cd349576..00000000000 --- a/clientv3/balancer/utils_test.go +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright 2018 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package balancer - -import ( - "reflect" - "testing" - - "google.golang.org/grpc/resolver" -) - -func Test_epsToAddrs(t *testing.T) { - eps := []string{"https://example.com:2379", "127.0.0.1:2379"} - exp := []resolver.Address{ - {Addr: "example.com:2379", Type: resolver.Backend}, - {Addr: "127.0.0.1:2379", Type: resolver.Backend}, - } - rs := epsToAddrs(eps...) - if !reflect.DeepEqual(rs, exp) { - t.Fatalf("expected %v, got %v", exp, rs) - } -} diff --git a/clientv3/client.go b/clientv3/client.go index b943ae0f660..46c903054c1 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -18,14 +18,11 @@ import ( "context" "errors" "fmt" - "net" - "os" "strconv" "strings" "sync" "time" - "github.com/google/uuid" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -33,10 +30,9 @@ import ( "google.golang.org/grpc/keepalive" "google.golang.org/grpc/status" - "go.etcd.io/etcd/clientv3/balancer" - "go.etcd.io/etcd/clientv3/balancer/picker" - "go.etcd.io/etcd/clientv3/balancer/resolver/endpoint" "go.etcd.io/etcd/clientv3/credentials" + "go.etcd.io/etcd/clientv3/internal/endpoint" + "go.etcd.io/etcd/clientv3/internal/resolver" "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" "go.etcd.io/etcd/pkg/logutil" ) @@ -44,31 +40,8 @@ import ( var ( ErrNoAvailableEndpoints = errors.New("etcdclient: no available endpoints") ErrOldCluster = errors.New("etcdclient: old cluster version") - - roundRobinBalancerName = fmt.Sprintf("etcd-%s", picker.RoundrobinBalanced.String()) ) -func init() { - lg := zap.NewNop() - if os.Getenv("ETCD_CLIENT_DEBUG") != "" { - lcfg := logutil.DefaultZapLoggerConfig - lcfg.Level = zap.NewAtomicLevelAt(zap.DebugLevel) - - var err error - lg, err = lcfg.Build() // info level logging - if err != nil { - panic(err) - } - } - - // TODO: support custom balancer - balancer.RegisterBuilder(balancer.Config{ - Policy: picker.RoundrobinBalanced, - Name: roundRobinBalancerName, - Logger: lg, - }) -} - // Client provides and manages an etcd v3 client session. type Client struct { Cluster @@ -80,10 +53,10 @@ type Client struct { conn *grpc.ClientConn - cfg Config - creds grpccredentials.TransportCredentials - resolverGroup *endpoint.ResolverGroup - mu *sync.RWMutex + cfg Config + creds grpccredentials.TransportCredentials + resolver *resolver.EtcdManualResolver + mu *sync.RWMutex ctx context.Context cancel context.CancelFunc @@ -153,9 +126,6 @@ func (c *Client) Close() error { if c.Lease != nil { c.Lease.Close() } - if c.resolverGroup != nil { - c.resolverGroup.Close() - } if c.conn != nil { return toErr(c.ctx, c.conn.Close()) } @@ -182,7 +152,8 @@ func (c *Client) SetEndpoints(eps ...string) { c.mu.Lock() defer c.mu.Unlock() c.cfg.Endpoints = eps - c.resolverGroup.SetEndpoints(eps) + + c.resolver.SetEndpoints(eps) } // Sync synchronizes client's endpoints with the known endpoints from the etcd membership. @@ -215,29 +186,12 @@ func (c *Client) autoSync() { err := c.Sync(ctx) cancel() if err != nil && err != c.ctx.Err() { - lg.Lvl(4).Infof("Auto sync endpoints failed: %v", err) + c.lg.Info("Auto sync endpoints failed.", zap.Error(err)) } } } } -func (c *Client) processCreds(scheme string) (creds grpccredentials.TransportCredentials) { - creds = c.creds - switch scheme { - case "unix": - case "http": - creds = nil - case "https", "unixs": - if creds != nil { - break - } - creds = credentials.NewBundle(credentials.Config{}).TransportCredentials() - default: - creds = nil - } - return creds -} - // dialSetupOpts gives the dial opts prior to any authentication. func (c *Client) dialSetupOpts(creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (opts []grpc.DialOption, err error) { if c.cfg.DialKeepAliveTime > 0 { @@ -250,13 +204,12 @@ func (c *Client) dialSetupOpts(creds grpccredentials.TransportCredentials, dopts } opts = append(opts, dopts...) - dialer := endpoint.Dialer if creds != nil { opts = append(opts, grpc.WithTransportCredentials(creds)) } else { opts = append(opts, grpc.WithInsecure()) } - opts = append(opts, grpc.WithContextDialer(dialer)) + grpc.WithDisableRetry() // Interceptor retry and backoff. // TODO: Replace all of clientv3/retry.go with interceptor based retry, or with @@ -275,15 +228,11 @@ func (c *Client) dialSetupOpts(creds grpccredentials.TransportCredentials, dopts // Dial connects to a single endpoint using the client's config. func (c *Client) Dial(ep string) (*grpc.ClientConn, error) { - creds, err := c.directDialCreds(ep) - if err != nil { - return nil, err - } - // Use the grpc passthrough resolver to directly dial a single endpoint. - // This resolver passes through the 'unix' and 'unixs' endpoints schemes used - // by etcd without modification, allowing us to directly dial endpoints and - // using the same dial functions that we use for load balancer dialing. - return c.dial(fmt.Sprintf("passthrough:///%s", ep), creds) + creds := c.credentialsForEndpoint(ep) + + // Using ad-hoc created resolver, to guarantee only explicitly given + // endpoint is used. + return c.dial(creds, grpc.WithResolvers(resolver.New(ep))) } func (c *Client) getToken(ctx context.Context) error { @@ -307,19 +256,17 @@ func (c *Client) getToken(ctx context.Context) error { // dialWithBalancer dials the client's current load balanced resolver group. The scheme of the host // of the provided endpoint determines the scheme used for all endpoints of the client connection. func (c *Client) dialWithBalancer(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) { - _, host, _ := endpoint.ParseEndpoint(ep) - target := c.resolverGroup.Target(host) - creds := c.dialWithBalancerCreds(ep) - return c.dial(target, creds, dopts...) + creds := c.credentialsForEndpoint(ep) + opts := append(dopts, grpc.WithResolvers(c.resolver)) + return c.dial(creds, opts...) } // dial configures and dials any grpc balancer target. -func (c *Client) dial(target string, creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (*grpc.ClientConn, error) { +func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (*grpc.ClientConn, error) { opts, err := c.dialSetupOpts(creds, dopts...) if err != nil { return nil, fmt.Errorf("failed to configure dialer: %v", err) } - if c.Username != "" && c.Password != "" { c.authTokenBundle = credentials.NewBundle(credentials.Config{}) opts = append(opts, grpc.WithPerRPCCredentials(c.authTokenBundle.PerRPCCredentials())) @@ -334,43 +281,21 @@ func (c *Client) dial(target string, creds grpccredentials.TransportCredentials, defer cancel() // TODO: Is this right for cases where grpc.WithBlock() is not set on the dial options? } - conn, err := grpc.DialContext(dctx, target, opts...) + conn, err := grpc.DialContext(dctx, c.resolver.Scheme()+":///", opts...) if err != nil { return nil, err } return conn, nil } -func (c *Client) directDialCreds(ep string) (grpccredentials.TransportCredentials, error) { - _, host, scheme := endpoint.ParseEndpoint(ep) - creds := c.creds - if len(scheme) != 0 { - creds = c.processCreds(scheme) - if creds != nil { - clone := creds.Clone() - // Set the server name must to the endpoint hostname without port since grpc - // otherwise attempts to check if x509 cert is valid for the full endpoint - // including the scheme and port, which fails. - overrideServerName, _, err := net.SplitHostPort(host) - if err != nil { - // Either the host didn't have a port or the host could not be parsed. Either way, continue with the - // original host string. - overrideServerName = host - } - clone.OverrideServerName(overrideServerName) - creds = clone - } +func (c *Client) credentialsForEndpoint(ep string) grpccredentials.TransportCredentials { + if c.creds != nil { + return c.creds } - return creds, nil -} - -func (c *Client) dialWithBalancerCreds(ep string) grpccredentials.TransportCredentials { - _, _, scheme := endpoint.ParseEndpoint(ep) - creds := c.creds - if len(scheme) != 0 { - creds = c.processCreds(scheme) + if endpoint.RequiresCredentials(ep) { + return credentials.NewBundle(credentials.Config{}).TransportCredentials() } - return creds + return nil } func newClient(cfg *Config) (*Client, error) { @@ -432,14 +357,7 @@ func newClient(cfg *Config) (*Client, error) { client.callOpts = callOpts } - // Prepare a 'endpoint:///' resolver for the client and create a endpoint target to pass - // to dial so the client knows to use this resolver. - client.resolverGroup, err = endpoint.NewResolverGroup(fmt.Sprintf("client-%s", uuid.New().String())) - if err != nil { - client.cancel() - return nil, err - } - client.resolverGroup.SetEndpoints(cfg.Endpoints) + client.resolver = resolver.New(cfg.Endpoints...) if len(cfg.Endpoints) < 1 { return nil, fmt.Errorf("at least one Endpoint must is required in client config") @@ -448,10 +366,10 @@ func newClient(cfg *Config) (*Client, error) { // Use a provided endpoint target so that for https:// without any tls config given, then // grpc will assume the certificate server name is the endpoint host. - conn, err := client.dialWithBalancer(dialEndpoint, grpc.WithBalancerName(roundRobinBalancerName)) + conn, err := client.dialWithBalancer(dialEndpoint) if err != nil { client.cancel() - client.resolverGroup.Close() + client.resolver.Close() return nil, err } // TODO: With the old grpc balancer interface, we waited until the dial timeout diff --git a/clientv3/credentials/credentials.go b/clientv3/credentials/credentials.go index f3121a1c7eb..1dd5817f699 100644 --- a/clientv3/credentials/credentials.go +++ b/clientv3/credentials/credentials.go @@ -22,8 +22,9 @@ import ( "net" "sync" - "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" grpccredentials "google.golang.org/grpc/credentials" + + "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" ) // Config defines gRPC credential configuration. diff --git a/clientv3/internal/endpoint/endpoint.go b/clientv3/internal/endpoint/endpoint.go new file mode 100644 index 00000000000..c0a2f9acbfa --- /dev/null +++ b/clientv3/internal/endpoint/endpoint.go @@ -0,0 +1,68 @@ +// Copyright 2021 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package endpoint + +import ( + "net/url" + "regexp" +) + +var ( + STRIP_PORT_REGEXP = regexp.MustCompile("(.*):([0-9]+)") +) + +func stripPort(ep string) string { + return STRIP_PORT_REGEXP.ReplaceAllString(ep, "$1") +} + +func translateEndpoint(ep string) (addr string, serverName string, requireCreds bool) { + url, err := url.Parse(ep) + if err != nil { + return ep, stripPort(ep), false + } + switch url.Scheme { + case "http", "https": + return url.Host, url.Hostname(), url.Scheme == "https" + case "unix", "unixs": + requireCreds = url.Scheme == "unixs" + if url.Opaque != "" { + return "unix:" + url.Opaque, stripPort(url.Opaque), requireCreds + } else if url.Path != "" { + return "unix://" + url.Host + url.Path, url.Host + url.Path, requireCreds + } else { + return "unix:" + url.Host, url.Hostname(), requireCreds + } + case "": + return url.Host + url.Path, url.Host + url.Path, false + default: + return ep, stripPort(ep), false + } +} + +// RequiresCredentials returns whether given endpoint requires +// credentials/certificates for connection. +func RequiresCredentials(ep string) bool { + _, _, requireCreds := translateEndpoint(ep) + return requireCreds +} + +// Interpret endpoint parses an endpoint of the form +// (http|https)://*|(unix|unixs)://) +// and returns low-level address (supported by 'net') to connect to, +// and a server name used for x509 certificate matching. +func Interpret(ep string) (address string, serverName string) { + addr, serverName, _ := translateEndpoint(ep) + return addr, serverName +} diff --git a/clientv3/internal/endpoint/endpoint_test.go b/clientv3/internal/endpoint/endpoint_test.go new file mode 100644 index 00000000000..6eb810cfae9 --- /dev/null +++ b/clientv3/internal/endpoint/endpoint_test.go @@ -0,0 +1,65 @@ +// Copyright 2021 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package endpoint + +import ( + "testing" +) + +func TestInterpret(t *testing.T) { + tests := []struct { + endpoint string + wantAddress string + wantServerName string + }{ + {"127.0.0.1", "127.0.0.1", "127.0.0.1"}, + {"localhost", "localhost", "localhost"}, + {"localhost:8080", "localhost:8080", "localhost"}, + + {"unix:127.0.0.1", "unix:127.0.0.1", "127.0.0.1"}, + {"unix:127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1"}, + + {"unix://127.0.0.1", "unix:127.0.0.1", "127.0.0.1"}, + {"unix://127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1"}, + + {"unixs:127.0.0.1", "unix:127.0.0.1", "127.0.0.1"}, + {"unixs:127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1"}, + {"unixs://127.0.0.1", "unix:127.0.0.1", "127.0.0.1"}, + {"unixs://127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1"}, + + {"http://127.0.0.1", "127.0.0.1", "127.0.0.1"}, + {"http://127.0.0.1:8080", "127.0.0.1:8080", "127.0.0.1"}, + {"https://127.0.0.1", "127.0.0.1", "127.0.0.1"}, + {"https://127.0.0.1:8080", "127.0.0.1:8080", "127.0.0.1"}, + {"https://localhost:20000", "localhost:20000", "localhost"}, + + {"unix:///tmp/abc", "unix:///tmp/abc", "/tmp/abc"}, + {"unixs:///tmp/abc", "unix:///tmp/abc", "/tmp/abc"}, + {"etcd.io", "etcd.io", "etcd.io"}, + {"http://etcd.io/abc", "etcd.io", "etcd.io"}, + {"dns://something-other", "dns://something-other", "dns://something-other"}, + } + for _, tt := range tests { + t.Run(tt.endpoint, func(t *testing.T) { + gotAddress, gotServerName := Interpret(tt.endpoint) + if gotAddress != tt.wantAddress { + t.Errorf("Interpret() gotAddress = %v, want %v", gotAddress, tt.wantAddress) + } + if gotServerName != tt.wantServerName { + t.Errorf("Interpret() gotServerName = %v, want %v", gotServerName, tt.wantServerName) + } + }) + } +} diff --git a/clientv3/internal/resolver/resolver.go b/clientv3/internal/resolver/resolver.go new file mode 100644 index 00000000000..6ca7b051c6e --- /dev/null +++ b/clientv3/internal/resolver/resolver.go @@ -0,0 +1,71 @@ +// Copyright 2021 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package resolver + +import ( + "go.etcd.io/etcd/clientv3/internal/endpoint" + + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/resolver/manual" + "google.golang.org/grpc/serviceconfig" +) + +// EtcdManualResolver is a Resolver (and resolver.Builder) that can be updated +// using SetEndpoints. +type EtcdManualResolver struct { + *manual.Resolver + endpoints []string + serviceConfig *serviceconfig.ParseResult +} + +func New(endpoints ...string) *EtcdManualResolver { + r := manual.NewBuilderWithScheme("etcd-endpoints") + return &EtcdManualResolver{Resolver: r, endpoints: endpoints, serviceConfig: nil} +} + +// Build returns itself for Resolver, because it's both a builder and a resolver. +func (r *EtcdManualResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { + r.serviceConfig = cc.ParseServiceConfig(`{"loadBalancingPolicy": "round_robin"}`) + if r.serviceConfig.Err != nil { + return nil, r.serviceConfig.Err + } + res, err := r.Resolver.Build(target, cc, opts) + if err != nil { + return nil, err + } + // Populates endpoints stored in r into ClientConn (cc). + r.updateState() + return res, nil +} + +func (r *EtcdManualResolver) SetEndpoints(endpoints []string) { + r.endpoints = endpoints + r.updateState() +} + +func (r EtcdManualResolver) updateState() { + if r.CC != nil { + addresses := make([]resolver.Address, len(r.endpoints)) + for i, ep := range r.endpoints { + addr, serverName := endpoint.Interpret(ep) + addresses[i] = resolver.Address{Addr: addr, ServerName: serverName} + } + state := resolver.State{ + Addresses: addresses, + ServiceConfig: r.serviceConfig, + } + r.UpdateState(state) + } +} diff --git a/go.mod b/go.mod index bf8835cfd3a..a1ea8b564d2 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,6 @@ require ( github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903 github.com/golang/protobuf v1.4.3 github.com/google/btree v1.0.0 - github.com/google/uuid v1.0.0 github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/grpc-ecosystem/grpc-gateway v1.11.0 diff --git a/go.sum b/go.sum index 442bbb1ce93..79e29ddb62b 100644 --- a/go.sum +++ b/go.sum @@ -76,8 +76,6 @@ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= -github.com/google/uuid v1.0.0 h1:b4Gk+7WdP/d3HZH8EJsZpvV7EtDOgaZLtnaNGIu1adA= -github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4 h1:z53tR0945TRRQO/fLEVPI6SMv7ZflF0TEaTAoU7tOzg=