Skip to content

Commit

Permalink
Merge pull request #14 from Xorlev/streaming
Browse files Browse the repository at this point in the history
Server Streaming RPC support
  • Loading branch information
Xorlev committed Jan 1, 2018
2 parents 2ffa419 + 7202d45 commit 8a022db
Show file tree
Hide file tree
Showing 16 changed files with 551 additions and 80 deletions.
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ service TestService {
body: "*";
};
}
rpc StreamMethod1 (TestRequest) returns (stream TestResponse) {
option (google.api.http).get = "/stream/{s}";
}
}
message TestRequest {
string id = 1;
Expand Down Expand Up @@ -210,8 +213,9 @@ Short-term roadmap:
- [ ] Performance tests
- [ ] Generic/pluggable error handling
- [ ] Supporting streaming RPCs
- [ ] Server streaming
- [X] Server streaming
- [ ] Client streaming
- [ ] BiDi streaming (true bidi streaming is impossible without websockets)

## Build Process

Expand Down
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ project(":jersey-rpc-support") {
compile "io.grpc:grpc-protobuf:${grpcVersion}"
compile "io.grpc:grpc-stub:${grpcVersion}"
compile "javax.ws.rs:javax.ws.rs-api:2.0.1"
provided "org.glassfish.jersey.core:jersey-server:2.25"
}

protobuf {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import com.fullcontact.rpc.TestRequest;
import com.fullcontact.rpc.TestResponse;
import com.fullcontact.rpc.TestServiceGrpc;

import com.google.rpc.DebugInfo;
import io.grpc.Metadata;
import io.grpc.stub.StreamObserver;

/**
Expand Down Expand Up @@ -47,4 +48,35 @@ public void testMethod6(TestRequest request, StreamObserver<TestResponse> respon
responseObserver.onNext(TestResponse.newBuilder().setRequest(request).build());
responseObserver.onCompleted();
}

@Override
public void streamMethod1(TestRequest request, StreamObserver<TestResponse> responseObserver) {
for(int i = 0; i < request.getInt3(); i++) {
responseObserver.onNext(TestResponse.newBuilder().setRequest(request).build());

try {
Thread.sleep(100);
}
catch(InterruptedException e) {
e.printStackTrace();
}
}

if (request.getS().equals("explode")) {
responseObserver.onError(new IllegalStateException("Explode called."));
} else if (request.getS().equals("grpc_data_loss")) {
Metadata metadata = new Metadata();
metadata.put(GrpcErrorUtil.DEBUG_INFO_KEY,
DebugInfo.newBuilder().setDetail("test2").build());

io.grpc.Status status = io.grpc.Status.DATA_LOSS
.withCause(new IllegalStateException("Grue detected."))
.withDescription("Fail-fast: Grue found in write-path.")
.augmentDescription("test");

responseObserver.onError(status.asRuntimeException(metadata));
} else {
responseObserver.onCompleted();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,17 @@
import com.fullcontact.rpc.TestResponse;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.util.JsonFormat;
import com.google.rpc.Status;
import io.dropwizard.testing.junit.ResourceTestRule;
import org.assertj.core.util.Strings;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

import javax.ws.rs.client.Entity;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;

import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -286,4 +291,136 @@ public void testPost__nestedBindingYaml() throws Exception {

assertThat(response.getRequest().getNt()).isEqualTo(request);
}

@Test
public void testStreamGet() throws Exception {
InputStream response = resources().getJerseyTest()
.target("/stream/hello")
.queryParam("d", 1234.5678)
.queryParam("enu", "SECOND")
.queryParam("int3", "10")
.queryParam("x", "y")
.queryParam("nt.f1", "abcd")
.request()
.buildGet()
.invoke(InputStream.class);

BufferedReader reader = new BufferedReader(new InputStreamReader(response));

int count = 0;
long now = System.currentTimeMillis();
while(true) {
String json = reader.readLine();

if(Strings.isNullOrEmpty(json))
break;

TestResponse.Builder responseFromJson = TestResponse.newBuilder();
JsonFormat.parser().merge(json, responseFromJson);
TestResponse r = responseFromJson.build();

assertThat(r.getRequest().getS()).isEqualTo("hello");
assertThat(r.getRequest().getInt3()).isEqualTo(10);
assertThat(r.getRequest().getD()).isEqualTo(1234.5678);
assertThat(r.getRequest().getEnu()).isEqualTo(TestEnum.SECOND);
assertThat(r.getRequest().getNt().getF1()).isEqualTo("abcd");

count++;

long after = System.currentTimeMillis();
long duration = after - now;

// This might be flaky, but we want to ensure that we're actually streaming
assertThat(duration).isLessThan(1000/2);
now = after;
}

assertThat(count).isEqualTo(10);
}

@Test
public void testStreamGetStatusError() throws Exception {
InputStream response = resources().getJerseyTest()
.target("/stream/grpc_data_loss")
.queryParam("d", 1234.5678)
.queryParam("enu", "SECOND")
.queryParam("int3", "10")
.queryParam("x", "y")
.queryParam("nt.f1", "abcd")
.request()
.buildGet()
.invoke(InputStream.class);

BufferedReader reader = new BufferedReader(new InputStreamReader(response));

// int3 controls "successful" messages. Next request will throw.
for (int i = 0; i < 10; i++) {
String json = reader.readLine();

if(Strings.isNullOrEmpty(json))
break;

TestResponse.Builder responseFromJson = TestResponse.newBuilder();
JsonFormat.parser().merge(json, responseFromJson);
TestResponse r = responseFromJson.build();

assertThat(r.getRequest().getS()).isEqualTo("grpc_data_loss");
}

String json = reader.readLine();
Status.Builder statusBuilder = Status.newBuilder();
JsonFormat.parser().merge(json, statusBuilder);

// As expected, Status loses "cause" and "details" after transmission.
// Normally, details would be set, but JsonFormat doesn't support serializing Any.
Status expected = Status
.newBuilder()
.setCode(15)
.setMessage("HTTP 500 (gRPC: DATA_LOSS): Fail-fast: Grue found in write-path.\ntest")
.build();

assertThat(statusBuilder.build()).isEqualTo(expected);
}

@Test
public void testStreamGetUnhandledError() throws Exception {
InputStream response = resources().getJerseyTest()
.target("/stream/explode")
.queryParam("d", 1234.5678)
.queryParam("enu", "SECOND")
.queryParam("int3", "10")
.queryParam("x", "y")
.queryParam("nt.f1", "abcd")
.request()
.buildGet()
.invoke(InputStream.class);

BufferedReader reader = new BufferedReader(new InputStreamReader(response));

// int3 controls "successful" messages. Next request will throw.
for (int i = 0; i < 10; i++) {
String json = reader.readLine();

if(Strings.isNullOrEmpty(json))
break;

TestResponse.Builder responseFromJson = TestResponse.newBuilder();
JsonFormat.parser().merge(json, responseFromJson);
TestResponse r = responseFromJson.build();

assertThat(r.getRequest().getS()).isEqualTo("explode");
}

String json = reader.readLine();
Status.Builder statusBuilder = Status.newBuilder();
JsonFormat.parser().merge(json, statusBuilder);

Status expected = Status
.newBuilder()
.setCode(2)
.setMessage("HTTP 500 (gRPC: UNKNOWN)")
.build();

assertThat(statusBuilder.build()).isEqualTo(expected);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
public class ProxyIntegration extends IntegrationBase {
private static Server server = InProcessServerBuilder.forName("TestService")
.addService(new EchoTestService())
.directExecutor()
.build();

static {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import io.dropwizard.Configuration;
import io.dropwizard.setup.Environment;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;

Expand All @@ -25,7 +24,6 @@ public static void main(String[] args) throws Exception {
public void run(Configuration configuration, Environment environment) throws Exception {
Server server = InProcessServerBuilder.forName("TestService")
.addService(new EchoTestService())
.directExecutor()
.build();
server.start();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.fullcontact.rpc.jersey;

import javax.ws.rs.container.AsyncResponse;
import java.io.IOException;
import java.util.Optional;

/**
* Registry for (currently) JVM-global gRPC error handlers. This allows users to override the error handling
* methodology without patching the library.
*
* This class should be considered unstable, a more comprehensive plugin mechanism will be built in the future.
*/
public final class ErrorHandler {
private static GrpcJerseyErrorHandler errorHandler = new GrpcJerseyErrorHandler.Default();

private ErrorHandler() {}

static void handleUnaryError(Throwable t, AsyncResponse response) {
errorHandler.handleUnaryError(t, response);
}

static Optional<String> handleStreamingError(Throwable t) throws IOException {
return errorHandler.handleStreamingError(t);
}

/**
* Overrides the default error handler on a global basis. Beware, this shouldn't be done after requests start.
*/
public static void setErrorHandler(GrpcJerseyErrorHandler errorHandler) {
ErrorHandler.errorHandler = errorHandler;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
* @author Michael Rose (xorlev)
*/
public class GrpcErrorUtil {
private static final Metadata.Key<DebugInfo> DEBUG_INFO_KEY = ProtoUtils.keyForProto(DebugInfo.getDefaultInstance());
private static final Metadata.Key<RetryInfo> RETRY_INFO_KEY = ProtoUtils.keyForProto(RetryInfo.getDefaultInstance());
public static final Metadata.Key<DebugInfo> DEBUG_INFO_KEY = ProtoUtils.keyForProto(DebugInfo.getDefaultInstance());
public static final Metadata.Key<RetryInfo> RETRY_INFO_KEY = ProtoUtils.keyForProto(RetryInfo.getDefaultInstance());

public static int grpcToHttpStatus(io.grpc.Status status) {
switch(status.getCode()) {
Expand Down Expand Up @@ -79,12 +79,15 @@ public static GrpcError throwableToStatus(Throwable t) {

com.google.rpc.Status.Builder payload = com.google.rpc.Status.newBuilder();
payload.setCode(status.getCode().value());
payload.setMessage(status.getCode().name());

if(status.getDescription() != null) {
payload.setMessage(payload.getMessage() + ": " + Strings.nullToEmpty(status.getDescription()));
StringBuilder errorMessage = new StringBuilder("HTTP " + statusCode + " (gRPC: "+status.getCode().name()+")");

if(!Strings.isNullOrEmpty(status.getDescription())) {
errorMessage.append(": ").append(Strings.nullToEmpty(status.getDescription()));
}

payload.setMessage(errorMessage.toString());

if(trailer != null) {
if(trailer.containsKey(RETRY_INFO_KEY)) {
RetryInfo retryInfo = trailer.get(RETRY_INFO_KEY);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.fullcontact.rpc.jersey;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.rpc.Status;

import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.util.Optional;

/**
* Pluggable error handler used by the {@link JerseyUnaryObserver} and {@link JerseyStreamingObserver}.
*/
public interface GrpcJerseyErrorHandler {
/**
* Handles an exception raised in a unary (request/response) RPC handler.
*
* @param t throwable raised
* @param response JAX-RS AsyncResponse, can call cancel() or resume() with a string payload or {@link Response}.
*/
void handleUnaryError(Throwable t, AsyncResponse response);

/**
* Handles an exception raised in a server streaming RPC handler. As HTTP/1.1 practically doesn't support trailers,
* there isn't a real way to signal well-formed errors except via another streaming payload.
*
* @param t throwable raised.
* @return Literal string, if you want JSON-encoded data use the {@link JsonHandler#streamPrinter()} to
* retain server-sent events compatibility. Return {@link Optional#empty()} to silently abort.
* @throws IOException usually if serialization of errors break.
*/
Optional<String> handleStreamingError(Throwable t) throws IOException;

class Default implements GrpcJerseyErrorHandler {

@Override
public void handleUnaryError(Throwable t, AsyncResponse asyncResponse) {
if(t instanceof InvalidProtocolBufferException) {
asyncResponse.resume(Response.status(Response.Status.BAD_REQUEST).entity(t.getMessage()).build());
} else {
asyncResponse.resume(GrpcErrorUtil.createJerseyResponse(t));
}
}

@Override
public Optional<String> handleStreamingError(Throwable t) throws InvalidProtocolBufferException {
Status grpcError = GrpcErrorUtil.throwableToStatus(t).getPayload();

// JsonFormat doesn't support serializing Any.
if (!grpcError.getDetailsList().isEmpty()) {
grpcError = grpcError.toBuilder().clearDetails().build();
}

return Optional.of(JsonHandler.streamPrinter().print(grpcError));
}
}
}
Loading

0 comments on commit 8a022db

Please sign in to comment.