Skip to content

Commit

Permalink
Adds v2 Call interface based on okhttp
Browse files Browse the repository at this point in the history
In Zipkin v1, we had a pair of interfaces, like `SpanConsumer` and
`AsyncSpanConsumer`.

For example, this would define the same thing twice:

```java
void accept(List<Span> spans);

// and

void accept(List<Span> spans, Callback<Void> spans);
```

in v2, it looks like this (inspired by OkHttp's call):

```java
Call<Void> accept(List<Span> spans);

// then make a call like this..
call = spanConsumer.accept(spans);

// to do a blocking call..
call.execute();

// to do an async call..
call.enqueue(callback);

// to cancel
call.cancel();
```

The important part of this design is that it reduces the redundant
interfaces. Also, libraries like retrofit prove that this design is
compatible with other async abstractions like guava or rxjava.

We end up making less work for users and implementors at the cost of an
intermediary type: Call.
  • Loading branch information
Adrian Cole committed Aug 24, 2017
1 parent 65e7db8 commit ac7111d
Show file tree
Hide file tree
Showing 13 changed files with 287 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,16 @@
import okio.Buffer;
import okio.ByteString;
import zipkin.Annotation;
import zipkin.internal.v2.Call;
import zipkin.internal.v2.Callback;
import zipkin.internal.v2.Span;
import zipkin.internal.v2.codec.Encoder;
import zipkin.internal.v2.storage.AsyncSpanConsumer;
import zipkin.storage.Callback;
import zipkin.internal.v2.storage.SpanConsumer;
import zipkin.storage.elasticsearch.http.internal.client.HttpCall;

import static zipkin.internal.Util.propagateIfFatal;
import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.SPAN;

class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { // not final for testing
class ElasticsearchHttpSpanConsumer implements SpanConsumer { // not final for testing
static final Logger LOG = Logger.getLogger(ElasticsearchHttpSpanConsumer.class.getName());

final ElasticsearchHttpStorage es;
Expand All @@ -43,22 +44,27 @@ class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { // not final
this.indexNameFormatter = es.indexNameFormatter();
}

@Override public void accept(List<Span> spans, Callback<Void> callback) {
if (spans.isEmpty()) {
callback.onSuccess(null);
return;
}
try {
BulkSpanIndexer indexer = new BulkSpanIndexer(es);
indexSpans(indexer, spans);
indexer.execute(callback);
} catch (Throwable t) {
propagateIfFatal(t);
callback.onError(t);
}
@Override public Call<Void> accept(List<Span> spans) {
if (spans.isEmpty()) return Call.immediateCall(null);
BulkSpanIndexer indexer = new BulkSpanIndexer(es);
indexSpans(indexer, spans);
HttpCall<Void> call = indexer.newCall();
return new Call<Void>() {
@Override public Void execute() throws IOException {
return call.execute();
}

@Override public void enqueue(Callback<Void> callback) {
call.submit(callback);
}

@Override public void cancel() {
call.cancel();
}
};
}

void indexSpans(BulkSpanIndexer indexer, List<Span> spans) throws IOException {
void indexSpans(BulkSpanIndexer indexer, List<Span> spans) {
for (Span span : spans) {
Long spanTimestamp = span.timestamp();
long indexTimestamp = 0L; // which index to store this span into
Expand Down Expand Up @@ -92,8 +98,8 @@ void add(long indexTimestamp, Span span, @Nullable Long timestampMillis) {
indexer.add(index, SPAN, document, null /* Allow ES to choose an ID */);
}

void execute(Callback<Void> callback) throws IOException {
indexer.execute(callback);
HttpCall<Void> newCall() {
return indexer.newCall();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import okhttp3.RequestBody;
import okio.Buffer;
import zipkin.internal.V2StorageComponent;
import zipkin.internal.v2.storage.SpanConsumer;
import zipkin.storage.AsyncSpanStore;
import zipkin.storage.SpanStore;
import zipkin.storage.StorageAdapters;
Expand Down Expand Up @@ -220,7 +221,7 @@ public final Builder dateSeparator(char dateSeparator) {
}
}

@Override protected zipkin.internal.v2.storage.AsyncSpanConsumer v2AsyncSpanConsumer() {
@Override protected SpanConsumer v2AsyncSpanConsumer() {
ensureIndexTemplates();
return new ElasticsearchHttpSpanConsumer(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,23 +69,23 @@ void writeDocument(byte[] document) {
}

/** Creates a bulk request when there is more than one object to store */
void execute(Callback<Void> callback) {
HttpCall<Void> newCall(){
HttpUrl url = pipeline != null
? http.baseUrl.newBuilder("_bulk").addQueryParameter("pipeline", pipeline).build()
: http.baseUrl.resolve("_bulk");

Request request = new Request.Builder().url(url).tag(tag)
.post(RequestBody.create(APPLICATION_JSON, body.readByteString())).build();

http.<Void>newCall(request, b -> {
return http.newCall(request, b -> {
String content = b.readUtf8();
if (content.indexOf("\"errors\":true") != -1) {
throw new IllegalStateException(content);
}
if (indices.isEmpty()) return null;
ElasticsearchHttpStorage.flush(http, join(indices));
return null;
}).submit(callback);
});
}

static String join(Collection<String> parts) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import okio.GzipSource;
import okio.Okio;
import zipkin.internal.CallbackCaptor;
import zipkin.storage.Callback;
import zipkin.internal.v2.Callback;

import static zipkin.internal.Util.propagateIfFatal;

Expand All @@ -46,7 +46,7 @@ public Factory(OkHttpClient ok, HttpUrl baseUrl) {
}

public <V> HttpCall<V> newCall(Request request, BodyConverter<V> bodyConverter) {
return new HttpCall(this, request, bodyConverter);
return new HttpCall<>(this, request, bodyConverter);
}

public <V> V execute(Request request, BodyConverter<V> bodyConverter) {
Expand All @@ -60,25 +60,35 @@ public <V> V execute(Request request, BodyConverter<V> bodyConverter) {
}
}

final Call.Factory ok;
final Request request;
final Call call;
final BodyConverter<V> bodyConverter;

HttpCall(Factory factory, Request request, BodyConverter<V> bodyConverter) {
this.ok = factory.ok;
this.request = request;
this.call = factory.ok.newCall(request);
this.bodyConverter = bodyConverter;
}

public V execute() throws IOException {
return parseResponse(call.execute(), bodyConverter);
}

public void submit(Callback<V> delegate) {
ok.newCall(request).enqueue(new CallbackAdapter<>(bodyConverter, delegate));
call.enqueue(new V2CallbackAdapter<>(bodyConverter, delegate));
}

static class CallbackAdapter<V> implements okhttp3.Callback {
public void submit(zipkin.storage.Callback<V> delegate) {
call.enqueue(new CallbackAdapter<>(bodyConverter, delegate));
}

public void cancel() {
call.cancel();
}

static class V2CallbackAdapter<V> implements okhttp3.Callback {
final BodyConverter<V> bodyConverter;
final Callback<V> delegate;

CallbackAdapter(BodyConverter<V> bodyConverter, Callback<V> delegate) {
V2CallbackAdapter(BodyConverter<V> bodyConverter, Callback<V> delegate) {
this.bodyConverter = bodyConverter;
this.delegate = delegate;
}
Expand All @@ -89,28 +99,57 @@ static class CallbackAdapter<V> implements okhttp3.Callback {

/** Note: this runs on the {@link okhttp3.OkHttpClient#dispatcher() dispatcher} thread! */
@Override public void onResponse(Call call, Response response) {
if (!HttpHeaders.hasBody(response)) {
if (response.isSuccessful()) {
delegate.onSuccess(null);
} else {
delegate.onError(new IllegalStateException("response failed: " + response));
}
return;
try {
delegate.onSuccess(parseResponse(response, bodyConverter));
} catch (Throwable e) {
propagateIfFatal(e);
delegate.onError(e);
}
}
}

static class CallbackAdapter<V> implements okhttp3.Callback {
final HttpCall.BodyConverter<V> bodyConverter;
final zipkin.storage.Callback<V> delegate;

CallbackAdapter(HttpCall.BodyConverter<V> bodyConverter, zipkin.storage.Callback<V> delegate) {
this.bodyConverter = bodyConverter;
this.delegate = delegate;
}

@Override public void onFailure(okhttp3.Call call, IOException e) {
delegate.onError(e);
}

/** Note: this runs on the {@link okhttp3.OkHttpClient#dispatcher() dispatcher} thread! */
@Override public void onResponse(okhttp3.Call call, Response response) {
try {
delegate.onSuccess(parseResponse(response, bodyConverter));
} catch (Throwable e) {
propagateIfFatal(e);
delegate.onError(e);
}
}
}

static <V> V parseResponse(Response response, BodyConverter<V> bodyConverter) throws IOException {
if (!HttpHeaders.hasBody(response)) {
if (response.isSuccessful()) {
return null;
} else {
throw new IllegalStateException("response failed: " + response);
}
}
try (ResponseBody responseBody = response.body()) {
BufferedSource content = responseBody.source();
if ("gzip".equalsIgnoreCase(response.header("Content-Encoding"))) {
content = Okio.buffer(new GzipSource(responseBody.source()));
}
try (ResponseBody responseBody = response.body()) {
BufferedSource content = responseBody.source();
if ("gzip".equalsIgnoreCase(response.header("Content-Encoding"))) {
content = Okio.buffer(new GzipSource(responseBody.source()));
}
if (response.isSuccessful()) {
delegate.onSuccess(bodyConverter.convert(content));
} else {
delegate.onError(new IllegalStateException(
"response for " + response.request().tag() + " failed: " + content.readUtf8()));
}
} catch (Throwable t) {
propagateIfFatal(t);
delegate.onError(t);
if (response.isSuccessful()) {
return bodyConverter.convert(content);
} else {
throw new IllegalStateException(
"response for " + response.request().tag() + " failed: " + content.readUtf8());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.junit.Rule;
import org.junit.Test;
import zipkin.TestObjects;
import zipkin.internal.CallbackCaptor;
import zipkin.internal.v2.Span;
import zipkin.internal.v2.Span.Kind;
import zipkin.internal.v2.codec.Decoder;
Expand Down Expand Up @@ -213,8 +212,6 @@ public void close() throws IOException {
}

void accept(Span... spans) throws Exception {
CallbackCaptor<Void> callback = new CallbackCaptor<>();
storage.v2AsyncSpanConsumer().accept(asList(spans), callback);
callback.get();
storage.v2AsyncSpanConsumer().accept(asList(spans)).execute();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package zipkin.storage.elasticsearch.http;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import zipkin.Codec;
import zipkin.DependencyLink;
Expand All @@ -32,9 +33,11 @@ public static void writeDependencyLinks(ElasticsearchHttpStorage es, List<Depend
byte[] document = Codec.JSON.writeDependencyLink(link);
indexer.add(index, DEPENDENCY, document, link.parent + "|" + link.child); // Unique constraint
}
CallbackCaptor<Void> callback = new CallbackCaptor<>();
indexer.execute(callback);
callback.get();
try {
indexer.newCall().execute();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

public static void clear(ElasticsearchHttpStorage es) throws IOException {
Expand Down
33 changes: 33 additions & 0 deletions zipkin/src/main/java/zipkin/internal/V2CallbackAdapter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/**
* Copyright 2015-2017 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin.internal;

import javax.annotation.Nullable;
import zipkin.storage.Callback;

final class V2CallbackAdapter implements zipkin.internal.v2.Callback<Void> {
private final Callback<Void> callback;

V2CallbackAdapter(Callback<Void> callback) {
this.callback = callback;
}

@Override public void onSuccess(@Nullable Void value) {
callback.onSuccess(value);
}

@Override public void onError(Throwable t) {
callback.onError(t);
}
}
2 changes: 1 addition & 1 deletion zipkin/src/main/java/zipkin/internal/V2Collector.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void acceptSpans(byte[] serializedSpans, Decoder<Span> decoder, Callback<
}

@Override protected void record(List<Span> sampled, Callback<Void> callback) {
storage.v2AsyncSpanConsumer().accept(sampled, callback);
storage.v2AsyncSpanConsumer().accept(sampled).enqueue(new V2CallbackAdapter(callback));
}

@Override protected String idString(Span span) {
Expand Down
10 changes: 6 additions & 4 deletions zipkin/src/main/java/zipkin/internal/V2StorageComponent.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@

import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
import zipkin.internal.v2.Span;
import zipkin.internal.v2.storage.SpanConsumer;
import zipkin.storage.AsyncSpanConsumer;
import zipkin.storage.Callback;
import zipkin.storage.StorageComponent;
Expand All @@ -25,12 +27,12 @@ public abstract class V2StorageComponent implements StorageComponent {
return new V2AsyncSpanConsumerAdapter(v2AsyncSpanConsumer());
}

protected abstract zipkin.internal.v2.storage.AsyncSpanConsumer v2AsyncSpanConsumer();
protected abstract SpanConsumer v2AsyncSpanConsumer();

static class V2AsyncSpanConsumerAdapter implements AsyncSpanConsumer {
final zipkin.internal.v2.storage.AsyncSpanConsumer delegate;
final SpanConsumer delegate;

V2AsyncSpanConsumerAdapter(zipkin.internal.v2.storage.AsyncSpanConsumer delegate) {
V2AsyncSpanConsumerAdapter(SpanConsumer delegate) {
this.delegate = delegate;
}

Expand All @@ -40,7 +42,7 @@ static class V2AsyncSpanConsumerAdapter implements AsyncSpanConsumer {
for (int i = 0; i < length; i++) {
linkSpans.addAll(V2SpanConverter.fromSpan(spans.get(i)));
}
delegate.accept(linkSpans, callback);
delegate.accept(linkSpans).enqueue(new V2CallbackAdapter(callback));
}
}
}
Loading

0 comments on commit ac7111d

Please sign in to comment.