Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gRPC transport initial work with crossdock testing #863

Merged
merged 101 commits into from
Apr 18, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
101 commits
Select commit Hold shift + click to select a range
fea2579
Initial grpc code
bufdev Mar 27, 2017
8ccec60
More grpc work
bufdev Mar 27, 2017
840241a
More grpc work
bufdev Mar 27, 2017
c0c9838
Copy in noopCodec from grpc branch
bufdev Mar 27, 2017
f20daac
More grpc work
bufdev Mar 27, 2017
4af2e29
More grpc work
bufdev Mar 27, 2017
9cb61f4
More grpc work
bufdev Mar 27, 2017
87d286f
More grpc work
bufdev Mar 27, 2017
0c58468
Merge branch 'dev' into pgrpc
bufdev Mar 27, 2017
2613453
Merge branch 'dev' into pgrpc
bufdev Mar 27, 2017
8fdfd51
Fix golint
bufdev Mar 27, 2017
328a2cb
Merge branch 'dev' into pgrpc
bufdev Mar 28, 2017
82a0b02
More grpc work
bufdev Mar 28, 2017
48c4329
Custom codec
bufdev Mar 28, 2017
13b13f2
Merge branch 'dev' into pgrpc
bufdev Mar 29, 2017
77acfec
Generate grpc for protobuf example
bufdev Mar 29, 2017
904c1f2
Add TransportTypeGRPC
bufdev Mar 29, 2017
30740b3
Add grpc ClientConn to testutils
bufdev Mar 29, 2017
47cefc5
Update protobuf example input
bufdev Mar 29, 2017
d3937f3
Fix
bufdev Mar 29, 2017
54735d0
FQSN
bufdev Mar 29, 2017
d1803f7
fix
bufdev Mar 29, 2017
3e38dfe
fix
bufdev Mar 29, 2017
45bf89c
Allow switching example to grpc client
bufdev Mar 29, 2017
bec7415
more logging
bufdev Mar 29, 2017
1589afa
More work
bufdev Mar 29, 2017
2b77918
Transport writer
bufdev Mar 29, 2017
07f2c92
Merge
bufdev Mar 29, 2017
17d99d5
Fix golint
bufdev Mar 29, 2017
d763d72
Commit
bufdev Mar 29, 2017
cc8c00a
Commit
bufdev Mar 29, 2017
c96fc1d
Commit
bufdev Mar 29, 2017
e43151e
Working
bufdev Mar 29, 2017
99c0e26
Merge
bufdev Mar 29, 2017
f9ac97b
Merge branch 'dev' into pgrpc
bufdev Mar 30, 2017
ec85413
Commit
bufdev Mar 30, 2017
1ebc702
Fixes
bufdev Mar 30, 2017
87a4135
Options on protobuf example
bufdev Mar 30, 2017
0ce6d0d
Tests working
bufdev Mar 30, 2017
7fb63ef
Tests working
bufdev Mar 30, 2017
9e8fc53
More testing
bufdev Mar 30, 2017
1dec7c5
Make yarpcmeta procedures work with grpc
bufdev Mar 30, 2017
d223951
Make generate
bufdev Mar 30, 2017
ab09c97
Fix lint
bufdev Mar 30, 2017
ae2a78c
Fix lint
bufdev Mar 30, 2017
92c7315
Basic grpc oneway implementation
bufdev Mar 30, 2017
2a66063
Grpc oneway working
bufdev Mar 30, 2017
45f341a
Merge
bufdev Mar 30, 2017
a8de846
Add grpc to example testing
bufdev Mar 30, 2017
e3de61f
Merge
bufdev Mar 30, 2017
fae985f
More example tests
bufdev Mar 30, 2017
1e2ac25
Fix lint
bufdev Mar 30, 2017
0808d2c
Add grpc-opentracing and glide update
bufdev Mar 31, 2017
8a59b68
Add opentracing to grpc
bufdev Mar 31, 2017
dd7f754
Merge branch 'dev' into pgrpc
bufdev Mar 31, 2017
5466aae
Merge
bufdev Apr 1, 2017
01c1c08
Merge
bufdev Apr 2, 2017
d12b770
Merge
bufdev Apr 2, 2017
b88fc2f
Merge
bufdev Apr 2, 2017
552ffdf
Merge
bufdev Apr 2, 2017
b820f6d
Merge
bufdev Apr 2, 2017
0179178
Merge
bufdev Apr 4, 2017
c82f69f
Address Wills comments
bufdev Apr 4, 2017
70e38bb
Merge branch 'dev' into pgrpc
bufdev Apr 5, 2017
87cf60e
Add tracer as an option for both grpc inbound and outbound
bufdev Apr 5, 2017
293b14a
Remove proto.Message from custom codec
bufdev Apr 5, 2017
6ceae8a
TestGetServiceDescs
bufdev Apr 6, 2017
daef54e
Blank import protobuf example in protobuf testing package
bufdev Apr 6, 2017
922da87
Gofmt
bufdev Apr 6, 2017
3ad0cef
Procedure name tests
bufdev Apr 6, 2017
bcec6ce
Add zap wrapper for grpclog
bufdev Apr 6, 2017
1e4d169
Fix golint
bufdev Apr 6, 2017
f1c57aa
Crossdock echo protobuf files
bufdev Apr 6, 2017
32ed9ce
Update crossdock proto files
bufdev Apr 6, 2017
48a227d
Basic protobuf crossdock tests
bufdev Apr 6, 2017
c9d1e73
Crossdock grpc testing
bufdev Apr 6, 2017
d6d663d
Start preparing crossdock for grpc oneway testing
bufdev Apr 6, 2017
f908f8b
Add more headers for grpc and add test for procedure names
bufdev Apr 6, 2017
9f56edf
Options
bufdev Apr 7, 2017
7ec7ed1
Crossdock oneway grpc but ctx propagation failing
bufdev Apr 7, 2017
8242208
Do WhenRunning check in grpc outbound
bufdev Apr 7, 2017
d608ee7
Do not run oneway context propagation crossdock test on grpc
bufdev Apr 7, 2017
51c4fb4
Merge branch 'dev' into pgrpc
bufdev Apr 7, 2017
65d9cff
Merge branch 'dev' into pgrpc
bufdev Apr 8, 2017
d61fd0f
Rename methodHandler to handler
bufdev Apr 8, 2017
70b259e
Have grpc Inbound take a listener instead of an address
bufdev Apr 8, 2017
b159f50
Fix lint
bufdev Apr 10, 2017
1c500d3
Merge
bufdev Apr 11, 2017
ac5cd33
Add comment about response headers
bufdev Apr 11, 2017
9e8d197
Rename CreateTransport to CreateDispatcherForTransport
bufdev Apr 11, 2017
210af50
Comment about fully-qualified protobuf names
bufdev Apr 11, 2017
3d9f351
Link to issue about grpc codec
bufdev Apr 11, 2017
fb9f0e6
Add comment on grpc package
bufdev Apr 11, 2017
5529d7a
More fixes
bufdev Apr 11, 2017
e0b13f4
Fix issues
bufdev Apr 11, 2017
2cc2444
Fixes
bufdev Apr 11, 2017
725b8cb
Merge
bufdev Apr 12, 2017
c3d649e
Remove GRPC variable that controls GRPC inbound in crossdock
bufdev Apr 12, 2017
9980462
Update FooTransport to FooForTransport in crossdock client
bufdev Apr 12, 2017
00d8552
Merge
bufdev Apr 18, 2017
f3ffdef
Remove grpc log code
bufdev Apr 18, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile.crossdock
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
FROM golang:1.8.1

EXPOSE 8080-8088
EXPOSE 8080-8090
ENV SUPPRESS_DOCKER 1
WORKDIR /go/src/go.uber.org/yarpc
ADD dockercrossdockdeps.mk /go/src/go.uber.org/yarpc/
Expand Down
3 changes: 2 additions & 1 deletion build/local.mk
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Paths besides auto-detected generated files that should be excluded from
# lint results.
LINT_EXCLUDES_EXTRAS =
LINT_EXCLUDES_EXTRAS = \
transport/x/grpc/handler.go

# Regex for 'go vet' rules to ignore
GOVET_IGNORE_RULES = \
Expand Down
12 changes: 9 additions & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ services:
- AXIS_HTTPSERVER=go
- AXIS_CLIENT_ONEWAY=go
- AXIS_SERVER_ONEWAY=go
- AXIS_TRANSPORT_ONEWAY=http,redis,cherami
- AXIS_TRANSPORT_ONEWAY=grpc,http,redis,cherami
- AXIS_TRANSPORT_ONEWAY_CTXPROPAGATION=http,redis,cherami
- AXIS_GO_ENCODING=raw,json,thrift,protobuf
- AXIS_GO_CLIENT=go
- AXIS_GO_SERVER=go

# Transports available to the ctxpropagation behavior for multihop
# requests.
Expand All @@ -45,6 +49,8 @@ services:
- BEHAVIOR_JSON=client,server,transport
- SKIP_JSON=client:java+transport:tchannel,server:java+transport:tchannel
- BEHAVIOR_THRIFT=client,server,transport
- BEHAVIOR_PROTOBUF=go_client,go_server,transport
- BEHAVIOR_GRPC=go_client,go_server,go_encoding
- SKIP_THRIFT=client:java+transport:tchannel,server:java+transport:tchannel
- BEHAVIOR_HEADERS=client,server,transport,encoding
- SKIP_HEADERS=client:java+transport:tchannel,server:java+transport:tchannel
Expand All @@ -70,7 +76,7 @@ services:
- BEHAVIOR_CTXPROPAGATION=ctxclient,ctxserver,transport,ctxavailabletransports
- BEHAVIOR_APACHETHRIFT=apachethriftclient,apachethriftserver
- BEHAVIOR_ONEWAY=client_oneway,server_oneway,transport_oneway,encoding
- BEHAVIOR_ONEWAY_CTXPROPAGATION=client_oneway,server_oneway,transport_oneway
- BEHAVIOR_ONEWAY_CTXPROPAGATION=client_oneway,server_oneway,transport_oneway_ctxpropagation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

task to consolidate context propagation over grpc oneway #904

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack


- REPORT=compact

Expand All @@ -83,7 +89,7 @@ services:
context: .
dockerfile: Dockerfile.crossdock
ports:
- "8080-8088"
- "8080-8090"
environment:
- REDIS=enabled
- CHERAMI=enabled
Expand Down
19 changes: 18 additions & 1 deletion encoding/x/protobuf/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ func (u *unaryHandler) Handle(ctx context.Context, transportRequest *transport.R
}
}
response, appErr := u.handle(ctx, request)
if appErr != nil {
responseWriter.SetApplicationError()
}
if err := call.WriteToResponse(responseWriter); err != nil {
return err
}
Expand All @@ -78,9 +81,23 @@ func (u *unaryHandler) Handle(ctx context.Context, transportRequest *transport.R
}
responseData = protoBuffer.Bytes()
}
// We have to detect if our transport requires a raw response
// It is not possible to propagate this information on ctx with the current API
// we we attach this in the relevant transport (currently only gRPC) on the headers
// If we are sending a raw response back to a YARPC client, it needs to understand
// this is happening, so we attach the headers on the response as well
// Other clients (namely the existing gRPC clients outside of YARPC) understand
// that the response is the raw response.
if isRawResponse(transportRequest.Headers) {
responseWriter.AddHeaders(getRawResponseHeaders())
_, err := responseWriter.Write(responseData)
if err != nil {
return err
}
return appErr
}
var wireError *wirepb.Error
if appErr != nil {
responseWriter.SetApplicationError()
wireError = &wirepb.Error{
appErr.Error(),
}
Expand Down
9 changes: 9 additions & 0 deletions encoding/x/protobuf/outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,15 @@ func (c *client) Call(
if responseData == nil {
return nil, nil
}
// TODO: the error from Call will be the application error, we might
// also have a response returned however
if isRawResponse(transportResponse.Headers) {
response := newResponse()
if err := proto.Unmarshal(responseData, response); err != nil {
return nil, encoding.ResponseBodyDecodeError(transportRequest, err)
}
return response, nil
}
wireResponse := &wirepb.Response{}
if err := proto.Unmarshal(responseData, wireResponse); err != nil {
return nil, encoding.ResponseBodyDecodeError(transportRequest, err)
Expand Down
11 changes: 8 additions & 3 deletions encoding/x/protobuf/protoc-gen-yarpc-go/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ package main
import (
"fmt"
"log"
"strings"
"text/template"

"go.uber.org/yarpc/internal/protoplugin"
Expand Down Expand Up @@ -79,7 +80,7 @@ type {{$service.GetName}}YarpcClient interface {

// New{{$service.GetName}}YarpcClient builds a new yarpc client for the {{$service.GetName}} service.
func New{{$service.GetName}}YarpcClient(clientConfig transport.ClientConfig) {{$service.GetName}}YarpcClient {
return &_{{$service.GetName}}YarpcCaller{protobuf.NewClient("{{$service.GetName}}", clientConfig)}
return &_{{$service.GetName}}YarpcCaller{protobuf.NewClient("{{trimPrefixPeriod $service.FQSN}}", clientConfig)}
}

// {{$service.GetName}}YarpcServer is the yarpc server-side interface for the {{$service.GetName}} service.
Expand All @@ -94,7 +95,7 @@ type {{$service.GetName}}YarpcServer interface {
func Build{{$service.GetName}}YarpcProcedures(server {{$service.GetName}}YarpcServer) []transport.Procedure {
handler := &_{{$service.GetName}}YarpcHandler{server}
return protobuf.BuildProcedures(
"{{$service.GetName}}",
"{{trimPrefixPeriod $service.FQSN}}",
map[string]transport.UnaryHandler{
{{range $method := unaryMethods $service}}"{{$method.GetName}}": protobuf.NewUnaryHandler(handler.{{$method.GetName}}, new{{$service.GetName}}_{{$method.GetName}}YarpcRequest),
{{end}}
Expand Down Expand Up @@ -181,7 +182,7 @@ var (
{{end}}
`

var funcMap = template.FuncMap{"unaryMethods": unaryMethods, "onewayMethods": onewayMethods}
var funcMap = template.FuncMap{"unaryMethods": unaryMethods, "onewayMethods": onewayMethods, "trimPrefixPeriod": trimPrefixPeriod}

func main() {
if err := protoplugin.Run(
Expand Down Expand Up @@ -230,3 +231,7 @@ func onewayMethods(service *protoplugin.Service) ([]*protoplugin.Method, error)
}
return methods, nil
}

func trimPrefixPeriod(s string) string {
return strings.TrimPrefix(s, ".")
}
4 changes: 2 additions & 2 deletions encoding/x/protobuf/testing/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ func benchmarkIntegrationForTransportType(b *testing.B, transportType testutils.
transportType,
keyValueYarpcServer,
sinkYarpcServer,
func(keyValueYarpcClient examplepb.KeyValueYarpcClient, sinkYarpcClient examplepb.SinkYarpcClient) error {
benchmarkIntegration(b, keyValueYarpcClient, sinkYarpcClient, keyValueYarpcServer, sinkYarpcServer)
func(clients *example.Clients) error {
benchmarkIntegration(b, clients.KeyValueYarpcClient, clients.SinkYarpcClient, keyValueYarpcServer, sinkYarpcServer)
return nil
},
)
Expand Down
5 changes: 5 additions & 0 deletions encoding/x/protobuf/testing/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,8 @@
// THE SOFTWARE.

package testing

import (
// this is to make sure scripts/cover.sh picks this up with .Deps
_ "go.uber.org/yarpc/internal/examples/protobuf/example"
)
56 changes: 38 additions & 18 deletions encoding/x/protobuf/testing/testing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"testing"
"time"

"go.uber.org/yarpc/encoding/x/protobuf"
"go.uber.org/yarpc/internal/examples/protobuf/example"
"go.uber.org/yarpc/internal/examples/protobuf/examplepb"
"go.uber.org/yarpc/internal/testutils"
Expand All @@ -50,8 +49,8 @@ func testIntegrationForTransportType(t *testing.T, transportType testutils.Trans
transportType,
keyValueYarpcServer,
sinkYarpcServer,
func(keyValueYarpcClient examplepb.KeyValueYarpcClient, sinkYarpcClient examplepb.SinkYarpcClient) error {
testIntegration(t, keyValueYarpcClient, sinkYarpcClient, keyValueYarpcServer, sinkYarpcServer)
func(clients *example.Clients) error {
testIntegration(t, clients, keyValueYarpcServer, sinkYarpcServer)
return nil
},
),
Expand All @@ -60,37 +59,41 @@ func testIntegrationForTransportType(t *testing.T, transportType testutils.Trans

func testIntegration(
t *testing.T,
keyValueYarpcClient examplepb.KeyValueYarpcClient,
sinkYarpcClient examplepb.SinkYarpcClient,
clients *example.Clients,
keyValueYarpcServer *example.KeyValueYarpcServer,
sinkYarpcServer *example.SinkYarpcServer,
) {
_, err := getValue(keyValueYarpcClient, "foo")
_, err := getValue(clients.KeyValueYarpcClient, "foo")
assert.Error(t, err)
_, err = getValueGRPC(clients.KeyValueGRPCClient, "foo")
assert.Error(t, err)
assert.NotNil(t, protobuf.GetApplicationError(err))

assert.NoError(t, setValue(keyValueYarpcClient, "foo", "bar"))
value, err := getValue(keyValueYarpcClient, "foo")
assert.NoError(t, setValue(clients.KeyValueYarpcClient, "foo", "bar"))
value, err := getValue(clients.KeyValueYarpcClient, "foo")
assert.NoError(t, err)
assert.Equal(t, "bar", value)

assert.NoError(t, setValue(keyValueYarpcClient, "foo", ""))
_, err = getValue(keyValueYarpcClient, "foo")
assert.NoError(t, setValueGRPC(clients.KeyValueGRPCClient, "foo", "barGRPC"))
value, err = getValueGRPC(clients.KeyValueGRPCClient, "foo")
assert.NoError(t, err)
assert.Equal(t, "barGRPC", value)

assert.NoError(t, setValue(clients.KeyValueYarpcClient, "foo", ""))
_, err = getValue(clients.KeyValueYarpcClient, "foo")
assert.Error(t, err)
assert.NotNil(t, protobuf.GetApplicationError(err))

assert.NoError(t, setValue(keyValueYarpcClient, "foo", "baz"))
assert.NoError(t, setValue(keyValueYarpcClient, "baz", "bat"))
value, err = getValue(keyValueYarpcClient, "foo")
assert.NoError(t, setValue(clients.KeyValueYarpcClient, "foo", "baz"))
assert.NoError(t, setValue(clients.KeyValueYarpcClient, "baz", "bat"))
value, err = getValue(clients.KeyValueYarpcClient, "foo")
assert.NoError(t, err)
assert.Equal(t, "baz", value)
value, err = getValue(keyValueYarpcClient, "baz")
value, err = getValue(clients.KeyValueYarpcClient, "baz")
assert.NoError(t, err)
assert.Equal(t, "bat", value)

assert.NoError(t, fire(sinkYarpcClient, "foo"))
assert.NoError(t, fire(clients.SinkYarpcClient, "foo"))
assert.NoError(t, sinkYarpcServer.WaitFireDone())
assert.NoError(t, fire(sinkYarpcClient, "bar"))
assert.NoError(t, fire(clients.SinkYarpcClient, "bar"))
assert.NoError(t, sinkYarpcServer.WaitFireDone())
assert.Equal(t, []string{"foo", "bar"}, sinkYarpcServer.Values())
}
Expand All @@ -112,6 +115,23 @@ func setValue(keyValueYarpcClient examplepb.KeyValueYarpcClient, key string, val
return err
}

func getValueGRPC(keyValueGRPCClient examplepb.KeyValueClient, key string) (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
response, err := keyValueGRPCClient.GetValue(ctx, &examplepb.GetValueRequest{key})
if err != nil {
return "", err
}
return response.Value, nil
}

func setValueGRPC(keyValueGRPCClient examplepb.KeyValueClient, key string, value string) error {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
_, err := keyValueGRPCClient.SetValue(ctx, &examplepb.SetValueRequest{key, value})
return err
}

func fire(sinkYarpcClient examplepb.SinkYarpcClient, value string) error {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
Expand Down
36 changes: 27 additions & 9 deletions encoding/x/protobuf/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,26 @@ import (
"github.com/gogo/protobuf/proto"
)

// Encoding is the name of this encoding.
const Encoding transport.Encoding = "protobuf"
const (
// Encoding is the name of this encoding.
Encoding transport.Encoding = "protobuf"

// GetApplicationError returns the application error from the server, if present.
rawResponseHeaderKey = "yarpc-protobuf-raw-response"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assuming that we don't need to express application errors in the response body for Protobuf.

I’m proposing an alternative to this header #905

Whether the body is present can be implied by an Rpc-Error header with the name of the error case. The name of the error case is in the domain of the procedure definition and must have low cardinality. Over Thrift it has clear semantics, but can be crafted by hand for other transports.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a larger discussion we need to have before moving gRPC out of x/ as to how transports will understand application errors, as not all encodings are able to specify application errors in the same manner as thrift, leaving this for now.

)

// SetRawResponse will set rawResponseHeaderKey to "true".
//
// TODO: this has overlap with IsApplicationError
func GetApplicationError(err error) error {
if applicationError, ok := err.(*applicationError); ok {
return applicationError
}
return nil
// rawResponseHeaderKey is a header key attached to either a request or
// response that signals a UnaryHandler to not encode an application error
// inside a wirepb.Response object, instead marshalling the actual response.
//
// Note per the documentation on transport.Headers#With, the returned Header
// may not be the same as the input header, so the caller should always
// update the header with:
//
// header = protobuf.SetRawResponse(header)
func SetRawResponse(headers transport.Headers) transport.Headers {
return headers.With(rawResponseHeaderKey, "1")
}

// ***all below functions should only be called by generated code***
Expand Down Expand Up @@ -118,3 +127,12 @@ func NewOnewayHandler(
func CastError(expectedType proto.Message, actualType proto.Message) error {
return fmt.Errorf("expected proto.Message to have type %T but had type %T", expectedType, actualType)
}

func isRawResponse(headers transport.Headers) bool {
rawResponse, ok := headers.Get(rawResponseHeaderKey)
return ok && rawResponse == "1"
}

func getRawResponseHeaders() transport.Headers {
return SetRawResponse(transport.Headers{})
}
38 changes: 35 additions & 3 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading