Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

internal: fix client send preface problems #2380

Merged
merged 9 commits into from
Oct 18, 2018
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 23 additions & 7 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1059,6 +1059,10 @@ func (ac *addrConn) createTransport(backoffNum int, addr resolver.Address, copts
prefaceReceived := make(chan struct{})
onCloseCalled := make(chan struct{})

var prefaceMu sync.Mutex
var serverPrefaceReceived bool
var clientPrefaceWrote bool

onGoAway := func(r transport.GoAwayReason) {
ac.mu.Lock()
ac.adjustParams(r)
Expand Down Expand Up @@ -1100,11 +1104,18 @@ func (ac *addrConn) createTransport(backoffNum int, addr resolver.Address, copts

// TODO(deklerk): optimization; does anyone else actually use this lock? maybe we can just remove it for this scope
ac.mu.Lock()
ac.successfulHandshake = true
ac.backoffDeadline = time.Time{}
ac.connectDeadline = time.Time{}
ac.addrIdx = 0
ac.backoffIdx = 0

prefaceMu.Lock()
serverPrefaceReceived = true
if clientPrefaceWrote {
jeanbza marked this conversation as resolved.
Show resolved Hide resolved
ac.successfulHandshake = true
ac.backoffDeadline = time.Time{}
ac.connectDeadline = time.Time{}
ac.addrIdx = 0
ac.backoffIdx = 0
}
prefaceMu.Unlock()

ac.mu.Unlock()
}

Expand All @@ -1117,6 +1128,13 @@ func (ac *addrConn) createTransport(backoffNum int, addr resolver.Address, copts
newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)

if err == nil {
prefaceMu.Lock()
clientPrefaceWrote = true
if serverPrefaceReceived {
ac.successfulHandshake = true
}
prefaceMu.Unlock()

if ac.dopts.waitForHandshake {
select {
case <-prefaceTimer.C:
Expand Down Expand Up @@ -1160,8 +1178,6 @@ func (ac *addrConn) createTransport(backoffNum int, addr resolver.Address, copts

return errConnClosing
}
ac.updateConnectivityState(connectivity.TransientFailure)
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
ac.mu.Unlock()
grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)

Expand Down
135 changes: 111 additions & 24 deletions clientconn_state_transition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package grpc
import (
"net"
"sync"
"sync/atomic"
"testing"
"time"

Expand All @@ -29,6 +30,7 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/leakcheck"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
)
Expand All @@ -41,76 +43,135 @@ func init() {
balancer.Register(testBalancer)
}

// These tests use a pipeListener. This listener is similar to net.Listener except that it is unbuffered, so each read
// and write will wait for the other side's corresponding write or read.
func TestStateTransitions_SingleAddress(t *testing.T) {
defer leakcheck.Check(t)

mctBkp := getMinConnectTimeout()
defer func() {
atomic.StoreInt64((*int64)(&mutableMinConnectTimeout), int64(mctBkp))
}()
atomic.StoreInt64((*int64)(&mutableMinConnectTimeout), int64(time.Millisecond)*100)

for _, test := range []struct {
name string
desc string
want []connectivity.State
server func(net.Listener)
server func(net.Listener) net.Conn
}{
// When the server returns server preface, the client enters READY.
{
name: "ServerEntersReadyOnPrefaceReceipt",
desc: "When the server returns server preface, the client enters READY.",
want: []connectivity.State{
connectivity.Connecting,
connectivity.Ready,
},
server: func(lis net.Listener) {
server: func(lis net.Listener) net.Conn {
conn, err := lis.Accept()
if err != nil {
t.Error(err)
return
return nil
}

go keepReading(conn)

framer := http2.NewFramer(conn, conn)
if err := framer.WriteSettings(http2.Setting{}); err != nil {
t.Errorf("Error while writing settings frame. %v", err)
return
return nil
}

return conn
},
},
// When the connection is closed, the client enters TRANSIENT FAILURE.
{
name: "ServerEntersTransientFailureOnClose",
desc: "When the connection is closed, the client enters TRANSIENT FAILURE.",
want: []connectivity.State{
connectivity.Connecting,
connectivity.TransientFailure,
},
server: func(lis net.Listener) {
server: func(lis net.Listener) net.Conn {
conn, err := lis.Accept()
if err != nil {
t.Error(err)
return
return nil
}

conn.Close()
return nil
},
},
{
desc: `When the server sends its connection preface, but the connection dies before the client can write its
connection preface, the client enters TRANSIENT FAILURE.`,
want: []connectivity.State{
connectivity.Connecting,
connectivity.TransientFailure,
},
server: func(lis net.Listener) net.Conn {
conn, err := lis.Accept()
if err != nil {
t.Error(err)
return nil
}

framer := http2.NewFramer(conn, conn)
if err := framer.WriteSettings(http2.Setting{}); err != nil {
t.Errorf("Error while writing settings frame. %v", err)
return nil
}

conn.Close()
return nil
},
},
{
desc: `When the server reads the client connection preface but does not send its connection preface, the
client enters TRANSIENT FAILURE.`,
want: []connectivity.State{
connectivity.Connecting,
connectivity.TransientFailure,
},
server: func(lis net.Listener) net.Conn {
conn, err := lis.Accept()
if err != nil {
t.Error(err)
return nil
}

go keepReading(conn)

return conn
},
},
} {
t.Logf("Test %s", test.name)
t.Log(test.desc)
testStateTransitionSingleAddress(t, test.want, test.server)
}
}

func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, server func(net.Listener)) {
func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, server func(net.Listener) net.Conn) {
defer leakcheck.Check(t)

stateNotifications := make(chan connectivity.State, len(want))
testBalancer.ResetNotifier(stateNotifications)
defer close(stateNotifications)

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Error while listening. Err: %v", err)
}
defer lis.Close()
pl := testutils.NewPipeListener()
defer pl.Close()

// Launch the server.
go server(lis)
var conn net.Conn
var connMu sync.Mutex
go func() {
connMu.Lock()
conn = server(pl)
connMu.Unlock()
}()

client, err := DialContext(ctx, lis.Addr().String(), WithWaitForHandshake(), WithInsecure(), WithBalancerName(stateRecordingBalancerName))
client, err := DialContext(ctx, "", WithWaitForHandshake(), WithInsecure(),
WithBalancerName(stateRecordingBalancerName), WithDialer(pl.Dialer()), withBackoff(noBackoff{}))
if err != nil {
t.Fatal(err)
}
Expand All @@ -128,6 +189,15 @@ func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, s
}
}
}

connMu.Lock()
defer connMu.Unlock()
if conn != nil {
err = conn.Close()
if err != nil {
t.Fatal(err)
}
}
}

// When a READY connection is closed, the client enters TRANSIENT FAILURE before CONNECTING.
Expand All @@ -143,7 +213,6 @@ func TestStateTransition_ReadyToTransientFailure(t *testing.T) {

stateNotifications := make(chan connectivity.State, len(want))
testBalancer.ResetNotifier(stateNotifications)
defer close(stateNotifications)

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
Expand All @@ -164,6 +233,8 @@ func TestStateTransition_ReadyToTransientFailure(t *testing.T) {
return
}

go keepReading(conn)

framer := http2.NewFramer(conn, conn)
if err := framer.WriteSettings(http2.Setting{}); err != nil {
t.Errorf("Error while writing settings frame. %v", err)
Expand Down Expand Up @@ -211,7 +282,6 @@ func TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T) {

stateNotifications := make(chan connectivity.State, len(want))
testBalancer.ResetNotifier(stateNotifications)
defer close(stateNotifications)

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
Expand Down Expand Up @@ -250,11 +320,14 @@ func TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T) {
return
}

go keepReading(conn)

framer := http2.NewFramer(conn, conn)
if err := framer.WriteSettings(http2.Setting{}); err != nil {
t.Errorf("Error while writing settings frame. %v", err)
return
}

close(server2Done)
}()

Expand Down Expand Up @@ -307,7 +380,6 @@ func TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) {

stateNotifications := make(chan connectivity.State, len(want))
testBalancer.ResetNotifier(stateNotifications)
defer close(stateNotifications)

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
Expand Down Expand Up @@ -336,6 +408,8 @@ func TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) {
return
}

go keepReading(conn)

framer := http2.NewFramer(conn, conn)
if err := framer.WriteSettings(http2.Setting{}); err != nil {
t.Errorf("Error while writing settings frame. %v", err)
Expand Down Expand Up @@ -426,3 +500,16 @@ func (b *stateRecordingBalancer) Build(cc balancer.ClientConn, opts balancer.Bui
b.mu.Unlock()
return b
}

// noBackoff is a backoff implementation whose delay is always zero. The tests
// in this file pass it to withBackoff when dialing.
type noBackoff struct{}

// Backoff ignores its argument and always returns a zero duration.
func (noBackoff) Backoff(int) time.Duration { return 0 }

// keepReading reads from conn, discarding the data, until a Read returns an
// error (EOF, server closed, etc). It is a tool for mindlessly keeping the
// connection healthy, since the client will error if things like client
// prefaces are not accepted in a timely fashion.
func keepReading(conn net.Conn) {
	scratch := make([]byte, 1024)
	for {
		// The bytes themselves are irrelevant; only a failed Read ends the loop.
		if _, err := conn.Read(scratch); err != nil {
			return
		}
	}
}
2 changes: 0 additions & 2 deletions clientconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,10 +225,8 @@ func TestCloseConnectionWhenServerPrefaceNotReceived(t *testing.T) {
// 3. The new server sends its preface.
// 4. Client doesn't kill the connection this time.
mctBkp := getMinConnectTimeout()
// Call this only after transportMonitor goroutine has ended.
defer func() {
atomic.StoreInt64((*int64)(&mutableMinConnectTimeout), int64(mctBkp))

}()
defer leakcheck.Check(t)
atomic.StoreInt64((*int64)(&mutableMinConnectTimeout), int64(time.Millisecond)*500)
Expand Down
Loading