Skip to content

Commit

Permalink
grpc
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Gahlot <gauravgahlot0107@gmail.com>
  • Loading branch information
gauravgahlot committed Nov 5, 2020
1 parent 3092a7d commit e622408
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 14 deletions.
13 changes: 9 additions & 4 deletions balancer_switching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,10 @@ func checkPickFirst(cc *ClientConn, servers []*server) error {
err error
)
connected := false
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
for i := 0; i < 5000; i++ {
if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); errorDesc(err) == servers[0].port {
if err = cc.Invoke(ctx, "/foo/bar", &req, &reply); errorDesc(err) == servers[0].port {
if connected {
// connected is set to false if peer is not server[0]. So if
// connected is true here, this is the second time we saw
Expand All @@ -100,9 +102,10 @@ func checkPickFirst(cc *ClientConn, servers []*server) error {
if !connected {
return fmt.Errorf("pickfirst is not in effect after 5 second, EmptyCall() = _, %v, want _, %v", err, servers[0].port)
}

// The following RPCs should all succeed with the first server.
for i := 0; i < 3; i++ {
err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply)
err = cc.Invoke(ctx, "/foo/bar", &req, &reply)
if errorDesc(err) != servers[0].port {
return fmt.Errorf("index %d: want peer %v, got peer %v", i, servers[0].port, err)
}
Expand All @@ -117,14 +120,16 @@ func checkRoundRobin(cc *ClientConn, servers []*server) error {
err error
)

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// Make sure connections to all servers are up.
for i := 0; i < 2; i++ {
// Do this check twice, otherwise the first RPC's transport may still be
// picked by the closing pickfirst balancer, and the test becomes flaky.
for _, s := range servers {
var up bool
for i := 0; i < 5000; i++ {
if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); errorDesc(err) == s.port {
if err = cc.Invoke(ctx, "/foo/bar", &req, &reply); errorDesc(err) == s.port {
up = true
break
}
Expand All @@ -138,7 +143,7 @@ func checkRoundRobin(cc *ClientConn, servers []*server) error {

serverCount := len(servers)
for i := 0; i < 3*serverCount; i++ {
err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply)
err = cc.Invoke(ctx, "/foo/bar", &req, &reply)
if errorDesc(err) != servers[i%serverCount].port {
return fmt.Errorf("index %d: want peer %v, got peer %v", i, servers[i%serverCount].port, err)
}
Expand Down
26 changes: 19 additions & 7 deletions call_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ var (
canceled = 0
)

const defaultTestTimeout = 10 * time.Second

type testCodec struct {
}

Expand Down Expand Up @@ -237,7 +239,8 @@ func (s) TestUnaryClientInterceptor(t *testing.T) {
}()

var reply string
ctx := context.Background()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
parentCtx := context.WithValue(ctx, ctxKey("parentKey"), 0)
if err := cc.Invoke(parentCtx, "/foo/bar", &expectedRequest, &reply); err != nil || reply != expectedResponse {
t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want <nil>", err)
Expand Down Expand Up @@ -305,7 +308,8 @@ func (s) TestChainUnaryClientInterceptor(t *testing.T) {
}()

var reply string
ctx := context.Background()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
parentCtx := context.WithValue(ctx, ctxKey("parentKey"), 0)
if err := cc.Invoke(parentCtx, "/foo/bar", &expectedRequest, &reply); err != nil || reply != expectedResponse+"321" {
t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want <nil>", err)
Expand Down Expand Up @@ -346,7 +350,8 @@ func (s) TestChainOnBaseUnaryClientInterceptor(t *testing.T) {
}()

var reply string
ctx := context.Background()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
parentCtx := context.WithValue(ctx, ctxKey("parentKey"), 0)
if err := cc.Invoke(parentCtx, "/foo/bar", &expectedRequest, &reply); err != nil || reply != expectedResponse {
t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want <nil>", err)
Expand Down Expand Up @@ -407,7 +412,8 @@ func (s) TestChainStreamClientInterceptor(t *testing.T) {
server.stop()
}()

ctx := context.Background()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
parentCtx := context.WithValue(ctx, ctxKey("parentKey"), 0)
_, err := cc.NewStream(parentCtx, &StreamDesc{}, "/foo/bar")
if err != nil {
Expand All @@ -418,7 +424,9 @@ func (s) TestChainStreamClientInterceptor(t *testing.T) {
func (s) TestInvoke(t *testing.T) {
server, cc := setUp(t, 0, math.MaxUint32)
var reply string
if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply); err != nil || reply != expectedResponse {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply); err != nil || reply != expectedResponse {
t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want <nil>", err)
}
cc.Close()
Expand All @@ -429,7 +437,9 @@ func (s) TestInvokeLargeErr(t *testing.T) {
server, cc := setUp(t, 0, math.MaxUint32)
var reply string
req := "hello"
err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
err := cc.Invoke(ctx, "/foo/bar", &req, &reply)
if _, ok := status.FromError(err); !ok {
t.Fatalf("grpc.Invoke(_, _, _, _, _) receives non rpc error.")
}
Expand All @@ -445,7 +455,9 @@ func (s) TestInvokeErrorSpecialChars(t *testing.T) {
server, cc := setUp(t, 0, math.MaxUint32)
var reply string
req := "weird error"
err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
err := cc.Invoke(ctx, "/foo/bar", &req, &reply)
if _, ok := status.FromError(err); !ok {
t.Fatalf("grpc.Invoke(_, _, _, _, _) receives non rpc error.")
}
Expand Down
10 changes: 7 additions & 3 deletions internal/transport/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ import (
)

const (
envTestAddr = "1.2.3.4:8080"
envProxyAddr = "2.3.4.5:7687"
envTestAddr = "1.2.3.4:8080"
envProxyAddr = "2.3.4.5:7687"
defaultTestTimeout = 10 * time.Second
)

// overwriteAndRestore overwrite function httpProxyFromEnvironment and
Expand Down Expand Up @@ -210,8 +211,11 @@ func (s) TestMapAddressEnv(t *testing.T) {
}
defer overwrite(hpfe)()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

// envTestAddr should be handled by ProxyFromEnvironment.
got, err := mapAddress(context.Background(), envTestAddr)
got, err := mapAddress(ctx, envTestAddr)
if err != nil {
t.Error(err)
}
Expand Down

0 comments on commit e622408

Please sign in to comment.