Skip to content

Commit

Permalink
Adds v2 Call interface based on okhttp and retrofit (#1705)
Browse files Browse the repository at this point in the history
In Zipkin v1, we had a pair of interfaces, like `SpanConsumer` and
`AsyncSpanConsumer`.

For example, this would define the same thing twice:

```java
void accept(List<Span> spans);

// and

void accept(List<Span> spans, Callback<Void> spans);
```

in v2, it looks like this (inspired by OkHttp's call):

```java
Call<Void> accept(List<Span> spans);

// then make a call like this..
call = spanConsumer.accept(spans);

// to do a blocking call..
call.execute();

// to do an async call..
call.enqueue(callback);

// to cancel
call.cancel();
```

The important part of this design is that it reduces the redundant
interfaces. Also, libraries like retrofit prove that this design is
compatible with other async abstractions like guava or rxjava.

We end up making less work for users and implementors at the cost of an
intermediary type: Call.
  • Loading branch information
adriancole authored Aug 27, 2017
1 parent 65e7db8 commit ac0ba02
Show file tree
Hide file tree
Showing 14 changed files with 494 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@
import okio.Buffer;
import okio.ByteString;
import zipkin.Annotation;
import zipkin.internal.v2.Call;
import zipkin.internal.v2.Span;
import zipkin.internal.v2.codec.Encoder;
import zipkin.internal.v2.storage.AsyncSpanConsumer;
import zipkin.storage.Callback;
import zipkin.internal.v2.storage.SpanConsumer;
import zipkin.storage.elasticsearch.http.internal.client.HttpCall;

import static zipkin.internal.Util.propagateIfFatal;
import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.SPAN;

class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { // not final for testing
class ElasticsearchHttpSpanConsumer implements SpanConsumer { // not final for testing
static final Logger LOG = Logger.getLogger(ElasticsearchHttpSpanConsumer.class.getName());

final ElasticsearchHttpStorage es;
Expand All @@ -43,22 +43,14 @@ class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { // not final
this.indexNameFormatter = es.indexNameFormatter();
}

@Override public void accept(List<Span> spans, Callback<Void> callback) {
if (spans.isEmpty()) {
callback.onSuccess(null);
return;
}
try {
BulkSpanIndexer indexer = new BulkSpanIndexer(es);
indexSpans(indexer, spans);
indexer.execute(callback);
} catch (Throwable t) {
propagateIfFatal(t);
callback.onError(t);
}
@Override public Call<Void> accept(List<Span> spans) {
if (spans.isEmpty()) return Call.create(null);
BulkSpanIndexer indexer = new BulkSpanIndexer(es);
indexSpans(indexer, spans);
return indexer.newCall();
}

void indexSpans(BulkSpanIndexer indexer, List<Span> spans) throws IOException {
void indexSpans(BulkSpanIndexer indexer, List<Span> spans) {
for (Span span : spans) {
Long spanTimestamp = span.timestamp();
long indexTimestamp = 0L; // which index to store this span into
Expand All @@ -77,7 +69,7 @@ void indexSpans(BulkSpanIndexer indexer, List<Span> spans) throws IOException {
}
}

static class BulkSpanIndexer {
static final class BulkSpanIndexer {
final HttpBulkIndexer indexer;
final IndexNameFormatter indexNameFormatter;

Expand All @@ -92,8 +84,8 @@ void add(long indexTimestamp, Span span, @Nullable Long timestampMillis) {
indexer.add(index, SPAN, document, null /* Allow ES to choose an ID */);
}

void execute(Callback<Void> callback) throws IOException {
indexer.execute(callback);
HttpCall<Void> newCall() {
return indexer.newCall();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import okhttp3.RequestBody;
import okio.Buffer;
import zipkin.internal.V2StorageComponent;
import zipkin.internal.v2.storage.SpanConsumer;
import zipkin.storage.AsyncSpanStore;
import zipkin.storage.SpanStore;
import zipkin.storage.StorageAdapters;
Expand Down Expand Up @@ -220,7 +221,7 @@ public final Builder dateSeparator(char dateSeparator) {
}
}

@Override protected zipkin.internal.v2.storage.AsyncSpanConsumer v2AsyncSpanConsumer() {
@Override protected SpanConsumer v2AsyncSpanConsumer() {
ensureIndexTemplates();
return new ElasticsearchHttpSpanConsumer(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,23 +69,23 @@ void writeDocument(byte[] document) {
}

/** Creates a bulk request when there is more than one object to store */
void execute(Callback<Void> callback) {
HttpCall<Void> newCall(){
HttpUrl url = pipeline != null
? http.baseUrl.newBuilder("_bulk").addQueryParameter("pipeline", pipeline).build()
: http.baseUrl.resolve("_bulk");

Request request = new Request.Builder().url(url).tag(tag)
.post(RequestBody.create(APPLICATION_JSON, body.readByteString())).build();

http.<Void>newCall(request, b -> {
return http.newCall(request, b -> {
String content = b.readUtf8();
if (content.indexOf("\"errors\":true") != -1) {
throw new IllegalStateException(content);
}
if (indices.isEmpty()) return null;
ElasticsearchHttpStorage.flush(http, join(indices));
return null;
}).submit(callback);
});
}

static String join(Collection<String> parts) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import java.io.Closeable;
import java.io.IOException;
import okhttp3.Call;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Request;
Expand All @@ -26,11 +25,12 @@
import okio.GzipSource;
import okio.Okio;
import zipkin.internal.CallbackCaptor;
import zipkin.storage.Callback;
import zipkin.internal.v2.Call;
import zipkin.internal.v2.Callback;

import static zipkin.internal.Util.propagateIfFatal;

public final class HttpCall<V> {
public final class HttpCall<V> extends Call<V> {

public interface BodyConverter<V> {
V convert(BufferedSource content) throws IOException;
Expand All @@ -46,7 +46,7 @@ public Factory(OkHttpClient ok, HttpUrl baseUrl) {
}

public <V> HttpCall<V> newCall(Request request, BodyConverter<V> bodyConverter) {
return new HttpCall(this, request, bodyConverter);
return new HttpCall<>(this, request, bodyConverter);
}

public <V> V execute(Request request, BodyConverter<V> bodyConverter) {
Expand All @@ -60,57 +60,108 @@ public <V> V execute(Request request, BodyConverter<V> bodyConverter) {
}
}

final Call.Factory ok;
final Request request;
final okhttp3.Call call;
final BodyConverter<V> bodyConverter;

HttpCall(Factory factory, Request request, BodyConverter<V> bodyConverter) {
this.ok = factory.ok;
this.request = request;
this(factory.ok.newCall(request), bodyConverter);
}

HttpCall(okhttp3.Call call, BodyConverter<V> bodyConverter) {
this.call = call;
this.bodyConverter = bodyConverter;
}

public void submit(Callback<V> delegate) {
ok.newCall(request).enqueue(new CallbackAdapter<>(bodyConverter, delegate));
@Override public V execute() throws IOException {
return parseResponse(call.execute(), bodyConverter);
}

static class CallbackAdapter<V> implements okhttp3.Callback {
@Override public void enqueue(Callback<V> delegate) {
call.enqueue(new V2CallbackAdapter<>(bodyConverter, delegate));
}

public void submit(zipkin.storage.Callback<V> delegate) {
call.enqueue(new CallbackAdapter<>(bodyConverter, delegate));
}

@Override public void cancel() {
call.cancel();
}

@Override public boolean isCanceled() {
return call.isCanceled();
}

@Override public HttpCall<V> clone() {
return new HttpCall<V>(call.clone(), bodyConverter);
}

static class V2CallbackAdapter<V> implements okhttp3.Callback {
final BodyConverter<V> bodyConverter;
final Callback<V> delegate;

CallbackAdapter(BodyConverter<V> bodyConverter, Callback<V> delegate) {
V2CallbackAdapter(BodyConverter<V> bodyConverter, Callback<V> delegate) {
this.bodyConverter = bodyConverter;
this.delegate = delegate;
}

@Override public void onFailure(Call call, IOException e) {
@Override public void onFailure(okhttp3.Call call, IOException e) {
delegate.onError(e);
}

/** Note: this runs on the {@link okhttp3.OkHttpClient#dispatcher() dispatcher} thread! */
@Override public void onResponse(Call call, Response response) {
if (!HttpHeaders.hasBody(response)) {
if (response.isSuccessful()) {
delegate.onSuccess(null);
} else {
delegate.onError(new IllegalStateException("response failed: " + response));
}
return;
@Override public void onResponse(okhttp3.Call call, Response response) {
try {
delegate.onSuccess(parseResponse(response, bodyConverter));
} catch (Throwable e) {
propagateIfFatal(e);
delegate.onError(e);
}
}
}

static class CallbackAdapter<V> implements okhttp3.Callback {
final HttpCall.BodyConverter<V> bodyConverter;
final zipkin.storage.Callback<V> delegate;

CallbackAdapter(HttpCall.BodyConverter<V> bodyConverter, zipkin.storage.Callback<V> delegate) {
this.bodyConverter = bodyConverter;
this.delegate = delegate;
}

@Override public void onFailure(okhttp3.Call call, IOException e) {
delegate.onError(e);
}

/** Note: this runs on the {@link okhttp3.OkHttpClient#dispatcher() dispatcher} thread! */
@Override public void onResponse(okhttp3.Call call, Response response) {
try {
delegate.onSuccess(parseResponse(response, bodyConverter));
} catch (Throwable e) {
propagateIfFatal(e);
delegate.onError(e);
}
}
}

static <V> V parseResponse(Response response, BodyConverter<V> bodyConverter) throws IOException {
if (!HttpHeaders.hasBody(response)) {
if (response.isSuccessful()) {
return null;
} else {
throw new IllegalStateException("response failed: " + response);
}
}
try (ResponseBody responseBody = response.body()) {
BufferedSource content = responseBody.source();
if ("gzip".equalsIgnoreCase(response.header("Content-Encoding"))) {
content = Okio.buffer(new GzipSource(responseBody.source()));
}
try (ResponseBody responseBody = response.body()) {
BufferedSource content = responseBody.source();
if ("gzip".equalsIgnoreCase(response.header("Content-Encoding"))) {
content = Okio.buffer(new GzipSource(responseBody.source()));
}
if (response.isSuccessful()) {
delegate.onSuccess(bodyConverter.convert(content));
} else {
delegate.onError(new IllegalStateException(
"response for " + response.request().tag() + " failed: " + content.readUtf8()));
}
} catch (Throwable t) {
propagateIfFatal(t);
delegate.onError(t);
if (response.isSuccessful()) {
return bodyConverter.convert(content);
} else {
throw new IllegalStateException(
"response for " + response.request().tag() + " failed: " + content.readUtf8());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.junit.Rule;
import org.junit.Test;
import zipkin.TestObjects;
import zipkin.internal.CallbackCaptor;
import zipkin.internal.v2.Span;
import zipkin.internal.v2.Span.Kind;
import zipkin.internal.v2.codec.Decoder;
Expand Down Expand Up @@ -213,8 +212,6 @@ public void close() throws IOException {
}

void accept(Span... spans) throws Exception {
CallbackCaptor<Void> callback = new CallbackCaptor<>();
storage.v2AsyncSpanConsumer().accept(asList(spans), callback);
callback.get();
storage.v2AsyncSpanConsumer().accept(asList(spans)).execute();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package zipkin.storage.elasticsearch.http;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import zipkin.Codec;
import zipkin.DependencyLink;
Expand All @@ -32,9 +33,11 @@ public static void writeDependencyLinks(ElasticsearchHttpStorage es, List<Depend
byte[] document = Codec.JSON.writeDependencyLink(link);
indexer.add(index, DEPENDENCY, document, link.parent + "|" + link.child); // Unique constraint
}
CallbackCaptor<Void> callback = new CallbackCaptor<>();
indexer.execute(callback);
callback.get();
try {
indexer.newCall().execute();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

public static void clear(ElasticsearchHttpStorage es) throws IOException {
Expand Down
33 changes: 33 additions & 0 deletions zipkin/src/main/java/zipkin/internal/V2CallbackAdapter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/**
* Copyright 2015-2017 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin.internal;

import javax.annotation.Nullable;
import zipkin.storage.Callback;

final class V2CallbackAdapter implements zipkin.internal.v2.Callback<Void> {
private final Callback<Void> callback;

V2CallbackAdapter(Callback<Void> callback) {
this.callback = callback;
}

@Override public void onSuccess(@Nullable Void value) {
callback.onSuccess(value);
}

@Override public void onError(Throwable t) {
callback.onError(t);
}
}
2 changes: 1 addition & 1 deletion zipkin/src/main/java/zipkin/internal/V2Collector.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void acceptSpans(byte[] serializedSpans, Decoder<Span> decoder, Callback<
}

@Override protected void record(List<Span> sampled, Callback<Void> callback) {
storage.v2AsyncSpanConsumer().accept(sampled, callback);
storage.v2AsyncSpanConsumer().accept(sampled).enqueue(new V2CallbackAdapter(callback));
}

@Override protected String idString(Span span) {
Expand Down
Loading

0 comments on commit ac0ba02

Please sign in to comment.