From 80e380eff4edbfdacb4be1ae7d92c772400b2159 Mon Sep 17 00:00:00 2001 From: longxboy Date: Fri, 26 Mar 2021 04:08:24 +0800 Subject: [PATCH] balancer/base: keep address attributes for pickers (#4253) --- balancer/base/balancer.go | 29 +++++++++++------- balancer/base/balancer_test.go | 54 +++++++++++++++++++++++++++++++++- 2 files changed, 72 insertions(+), 11 deletions(-) diff --git a/balancer/base/balancer.go b/balancer/base/balancer.go index 383d02ec2bf5..c883efa0bbf5 100644 --- a/balancer/base/balancer.go +++ b/balancer/base/balancer.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" + "google.golang.org/grpc/attributes" "google.golang.org/grpc/balancer" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/grpclog" @@ -41,7 +42,7 @@ func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) cc: cc, pickerBuilder: bb.pickerBuilder, - subConns: make(map[resolver.Address]balancer.SubConn), + subConns: make(map[resolver.Address]subConnInfo), scStates: make(map[balancer.SubConn]connectivity.State), csEvltr: &balancer.ConnectivityStateEvaluator{}, config: bb.config, @@ -57,6 +58,11 @@ func (bb *baseBuilder) Name() string { return bb.name } +type subConnInfo struct { + subConn balancer.SubConn + attrs *attributes.Attributes +} + type baseBalancer struct { cc balancer.ClientConn pickerBuilder PickerBuilder @@ -64,7 +70,7 @@ type baseBalancer struct { csEvltr *balancer.ConnectivityStateEvaluator state connectivity.State - subConns map[resolver.Address]balancer.SubConn // `attributes` is stripped from the keys of this map (the addresses) + subConns map[resolver.Address]subConnInfo // `attributes` is stripped from the keys of this map (the addresses) scStates map[balancer.SubConn]connectivity.State picker balancer.Picker config Config @@ -114,7 +120,7 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error { aNoAttrs := a aNoAttrs.Attributes = nil addrsSet[aNoAttrs] = struct{}{} - if sc, ok := b.subConns[aNoAttrs]; !ok { + if scInfo, ok := b.subConns[aNoAttrs]; !ok { // a is a new address (not existing in b.subConns). // // When creating SubConn, the original address with attributes is @@ -125,7 +131,7 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error { logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err) continue } - b.subConns[aNoAttrs] = sc + b.subConns[aNoAttrs] = subConnInfo{subConn: sc, attrs: a.Attributes} b.scStates[sc] = connectivity.Idle sc.Connect() } else { @@ -135,13 +141,15 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error { // The SubConn does a reflect.DeepEqual of the new and old // addresses. So this is a noop if the current address is the same // as the old one (including attributes). - b.cc.UpdateAddresses(sc, []resolver.Address{a}) + scInfo.attrs = a.Attributes + b.subConns[aNoAttrs] = scInfo + b.cc.UpdateAddresses(scInfo.subConn, []resolver.Address{a}) } } - for a, sc := range b.subConns { + for a, scInfo := range b.subConns { // a was removed by resolver. if _, ok := addrsSet[a]; !ok { - b.cc.RemoveSubConn(sc) + b.cc.RemoveSubConn(scInfo.subConn) delete(b.subConns, a) // Keep the state of this sc in b.scStates until sc's state becomes Shutdown. // The entry will be deleted in UpdateSubConnState. @@ -184,9 +192,10 @@ func (b *baseBalancer) regeneratePicker() { readySCs := make(map[balancer.SubConn]SubConnInfo) // Filter out all ready SCs from full subConn map. - for addr, sc := range b.subConns { - if st, ok := b.scStates[sc]; ok && st == connectivity.Ready { - readySCs[sc] = SubConnInfo{Address: addr} + for addr, scInfo := range b.subConns { + if st, ok := b.scStates[scInfo.subConn]; ok && st == connectivity.Ready { + addr.Attributes = scInfo.attrs + readySCs[scInfo.subConn] = SubConnInfo{Address: addr} } } b.picker = b.pickerBuilder.Build(PickerBuildInfo{ReadySCs: readySCs}) diff --git a/balancer/base/balancer_test.go b/balancer/base/balancer_test.go index 03114251a048..f8ff8cf98444 100644 --- a/balancer/base/balancer_test.go +++ b/balancer/base/balancer_test.go @@ -23,6 +23,7 @@ import ( "google.golang.org/grpc/attributes" "google.golang.org/grpc/balancer" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/resolver" ) @@ -35,12 +36,24 @@ func (c *testClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewS return c.newSubConn(addrs, opts) } +func (c *testClientConn) UpdateState(balancer.State) {} + type testSubConn struct{} func (sc *testSubConn) UpdateAddresses(addresses []resolver.Address) {} func (sc *testSubConn) Connect() {} +// testPickBuilder creates balancer.Picker for test. +type testPickBuilder struct { + validate func(info PickerBuildInfo) +} + +func (p *testPickBuilder) Build(info PickerBuildInfo) balancer.Picker { + p.validate(info) + return nil +} + func TestBaseBalancerStripAttributes(t *testing.T) { b := (&baseBuilder{}).Build(&testClientConn{ newSubConn: func(addrs []resolver.Address, _ balancer.NewSubConnOptions) (balancer.SubConn, error) { @@ -64,7 +77,46 @@ func TestBaseBalancerStripAttributes(t *testing.T) { for addr := range b.subConns { if addr.Attributes != nil { - t.Errorf("in b.subConns, got address %+v with nil attributes, want not nil", addr) + t.Errorf("in b.subConns, got address %+v with not nil attributes, want nil", addr) + } + } +} + +func TestBaseBalancerReserveAttributes(t *testing.T) { + var v = func(info PickerBuildInfo) { + for _, sc := range info.ReadySCs { + if sc.Address.Addr == "1.1.1.1" { + if sc.Address.Attributes == nil { + t.Errorf("in picker.validate, got address %+v with nil attributes, want not nil", sc.Address) + } + foo, ok := sc.Address.Attributes.Value("foo").(string) + if !ok || foo != "2233niang" { + t.Errorf("in picker.validate, got address[1.1.1.1] with invalid attributes value %v, want 2233niang", sc.Address.Attributes.Value("foo")) + } + } else if sc.Address.Addr == "2.2.2.2" { + if sc.Address.Attributes != nil { + t.Error("in b.subConns, got address[2.2.2.2] with not nil attributes, want nil") + } + } } } + pickBuilder := &testPickBuilder{validate: v} + b := (&baseBuilder{pickerBuilder: pickBuilder}).Build(&testClientConn{ + newSubConn: func(addrs []resolver.Address, _ balancer.NewSubConnOptions) (balancer.SubConn, error) { + return &testSubConn{}, nil + }, + }, balancer.BuildOptions{}).(*baseBalancer) + + b.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: resolver.State{ + Addresses: []resolver.Address{ + {Addr: "1.1.1.1", Attributes: attributes.New("foo", "2233niang")}, + {Addr: "2.2.2.2", Attributes: nil}, + }, + }, + }) + + for sc := range b.scStates { + b.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready, ConnectionError: nil}) + } }