diff --git a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/CallbackAdapter.java b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/CallbackAdapter.java new file mode 100644 index 00000000000..d196e705d6f --- /dev/null +++ b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/CallbackAdapter.java @@ -0,0 +1,58 @@ +/** + * Copyright 2015-2017 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.storage.elasticsearch.http; + +import java.io.IOException; +import okhttp3.Call; +import okhttp3.Response; +import okhttp3.ResponseBody; +import zipkin.storage.Callback; + +import static zipkin.internal.Util.propagateIfFatal; + +class CallbackAdapter implements okhttp3.Callback { + final Call call; + final Callback delegate; + + CallbackAdapter(Call call, zipkin.storage.Callback delegate) { + this.call = call; + this.delegate = delegate; + } + + @Override public void onFailure(Call call, IOException e) { + delegate.onError(e); + } + + void enqueue() { + call.enqueue(this); + } + + /** Note: this runs on the {@link okhttp3.OkHttpClient#dispatcher() dispatcher} thread! */ + @Override public void onResponse(Call call, Response response) { + try (ResponseBody responseBody = response.body()) { + if (response.isSuccessful()) { + delegate.onSuccess(convert(responseBody)); + } else { + delegate.onError(new IllegalStateException("response failed: " + response)); + } + } catch (Throwable t) { + propagateIfFatal(t); + delegate.onError(t); + } + } + + V convert(ResponseBody responseBody) throws IOException { + return null; + } +} diff --git a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/HttpBulkIndexer.java b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/HttpBulkIndexer.java index 62ccd7c27ea..6ac2f67e2ae 100644 --- a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/HttpBulkIndexer.java +++ b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/HttpBulkIndexer.java @@ -1,5 +1,5 @@ /** - * Copyright 2015-2016 The OpenZipkin Authors + * Copyright 2015-2017 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -13,19 +13,18 @@ */ package zipkin.storage.elasticsearch.http; -import com.google.common.base.Joiner; -import com.google.common.util.concurrent.ListenableFuture; -import com.squareup.moshi.JsonWriter; import java.io.IOException; +import java.util.Iterator; import java.util.LinkedHashSet; import java.util.Set; -import okhttp3.Call; import okhttp3.HttpUrl; import okhttp3.MediaType; import okhttp3.Request; import okhttp3.RequestBody; import okhttp3.ResponseBody; import okio.Buffer; +import zipkin.internal.Nullable; +import zipkin.storage.Callback; // See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html // exposed to re-use for testing writes of dependency links @@ -47,24 +46,23 @@ abstract class HttpBulkIndexer { this.tag = "index-" + typeName; } - void add(String index, T object, String id) throws IOException { + void add(String index, T object, @Nullable String id) { writeIndexMetadata(index, id); writeDocument(object); if (client.flushOnWrites) indices.add(index); } - void writeIndexMetadata(String index, String id) throws IOException { - JsonWriter writer = JsonWriter.of(body); - writer.beginObject().name("index").beginObject(); - writer.name("_index").value(index); - writer.name("_type").value(typeName); - writer.name("_id").value(id); - writer.endObject().endObject(); - body.writeByte('\n'); + void writeIndexMetadata(String index, @Nullable String id) { + body.writeUtf8("{\"index\":{\"_index\":\"").writeUtf8(index).writeByte('"'); + body.writeUtf8(",\"_type\":\"").writeUtf8(typeName).writeByte('"'); + if (id != null) { + body.writeUtf8(",\"_id\":\"").writeUtf8(id).writeByte('"'); + } + body.writeUtf8("}}\n"); } - void writeDocument(T object) throws IOException { + void writeDocument(T object) { body.write(toJsonBytes(object)); body.writeByte('\n'); } @@ -72,7 +70,7 @@ void writeDocument(T object) throws IOException { abstract byte[] toJsonBytes(T object); /** Creates a bulk request when there is more than one object to store */ - public ListenableFuture execute() throws IOException { // public to allow interface retrofit + public void execute(Callback callback) { // public to allow interface retrofit HttpUrl url = client.pipeline != null ? client.baseUrl.newBuilder("_bulk").addQueryParameter("pipeline", client.pipeline).build() : client.baseUrl.resolve("_bulk"); @@ -80,11 +78,15 @@ public ListenableFuture execute() throws IOException { // public to allow Request request = new Request.Builder().url(url).tag(tag) .post(RequestBody.create(APPLICATION_JSON, body.readByteString())).build(); - return new CallbackListenableFuture(client.http.newCall(request)) { + new CallbackAdapter(client.http.newCall(request), callback) { @Override Void convert(ResponseBody responseBody) throws IOException { - if (!indices.isEmpty()) { - client.flush(Joiner.on(',').join(indices)); + if (indices.isEmpty()) return null; + Iterator index = indices.iterator(); + StringBuilder indexString = new StringBuilder(index.next()); + while (index.hasNext()) { + indexString.append(',').append(index.next()); } + client.flush(indexString.toString()); return null; } }.enqueue(); diff --git a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/HttpBulkSpanIndexer.java b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/HttpBulkSpanIndexer.java index 53decaa956d..830d4122e2e 100644 --- a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/HttpBulkSpanIndexer.java +++ b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/HttpBulkSpanIndexer.java @@ -1,5 +1,5 @@ /** - * Copyright 2015-2016 The OpenZipkin Authors + * Copyright 2015-2017 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -28,7 +28,7 @@ final class HttpBulkSpanIndexer extends HttpBulkIndexer implements } @Override - public HttpBulkSpanIndexer add(String index, Span span, Long timestampMillis) throws IOException { + public HttpBulkSpanIndexer add(String index, Span span, Long timestampMillis) { String id = null; // Allow ES to choose an ID if (timestampMillis == null) { super.add(index, span, id); diff --git a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/HttpClient.java b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/HttpClient.java index dd788692fef..e12e10c78b7 100644 --- a/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/HttpClient.java +++ b/zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/HttpClient.java @@ -1,5 +1,5 @@ /** - * Copyright 2015-2016 The OpenZipkin Authors + * Copyright 2015-2017 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -102,7 +102,7 @@ static final class Factory implements InternalElasticsearchClient.Factory { this.allIndices = new String[] {allIndices}; } - @Override protected String getVersion() throws IOException { + @Override protected String getVersion() { Request getNode = new Request.Builder().url(baseUrl).tag("get-node").build(); try (Response response = http.newCall(getNode).execute()) { @@ -113,6 +113,8 @@ static final class Factory implements InternalElasticsearchClient.Factory { JsonReader version = enterPath(JsonReader.of(response.body().source()), "version", "number"); if (version == null) throw new IllegalStateException(".version.number not in response"); return version.nextString(); + } catch (IOException e) { + throw new IllegalStateException("Could not get version", e); } } @@ -120,7 +122,7 @@ static final class Factory implements InternalElasticsearchClient.Factory { * This is a blocking call, used inside a lazy. That's because no writes should occur until the * template is available. */ - @Override protected void ensureTemplate(String name, String indexTemplate) throws IOException { + @Override protected void ensureTemplate(String name, String indexTemplate) { HttpUrl templateUrl = baseUrl.newBuilder("_template").addPathSegment(name).build(); Request request = new Request.Builder().url(templateUrl).tag("get-template").build(); @@ -128,6 +130,8 @@ static final class Factory implements InternalElasticsearchClient.Factory { if (response.isSuccessful()) { return; } + } catch (IOException e) { + throw new IllegalStateException("Could not get " + templateUrl.encodedPath(), e); } Call putTemplate = http.newCall(new Request.Builder() @@ -139,6 +143,8 @@ static final class Factory implements InternalElasticsearchClient.Factory { if (!response.isSuccessful()) { throw new IllegalStateException(response.body().string()); } + } catch (IOException e) { + throw new IllegalStateException("Could not put " + templateUrl.encodedPath(), e); } } @@ -217,7 +223,7 @@ protected ListenableFuture> findDependencies(String[] indic } /** This is blocking so that we can determine if the cluster is healthy or not */ - @Override protected void ensureClusterReady(String catchAll) throws IOException { + @Override protected void ensureClusterReady(String catchAll) { Call getHealth = http.newCall( new Request.Builder().url(baseUrl.resolve("/_cluster/health/" + catchAll)) .tag("get-cluster-health").build()); @@ -227,8 +233,9 @@ protected ListenableFuture> findDependencies(String[] indic JsonReader status = enterPath(JsonReader.of(response.body().source()), "status"); checkState(status != null, "Health status couldn't be read %s", response); checkState(!"RED".equalsIgnoreCase(status.nextString()), "Health status is RED"); - return; } + } catch (IOException e) { + throw new IllegalStateException("could not get " + getHealth.request().url(), e); } } diff --git a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/CallbackAdapterTest.java b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/CallbackAdapterTest.java new file mode 100644 index 00000000000..9a493767319 --- /dev/null +++ b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/CallbackAdapterTest.java @@ -0,0 +1,96 @@ +/** + * Copyright 2015-2017 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.storage.elasticsearch.http; + +import com.google.common.util.concurrent.SimpleTimeLimiter; +import com.google.common.util.concurrent.UncheckedTimeoutException; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import okhttp3.Call; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.ResponseBody; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import org.junit.After; +import org.junit.Rule; +import org.junit.Test; +import zipkin.internal.CallbackCaptor; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown; + +public class CallbackAdapterTest { + @Rule + public MockWebServer mws = new MockWebServer(); + + OkHttpClient client = new OkHttpClient(); + CallbackCaptor callback = new CallbackCaptor<>(); + Call call = client.newCall(new Request.Builder().url(mws.url("")).build()); + + @After + public void close() throws IOException { + client.dispatcher().executorService().shutdownNow(); + } + + @Test + public void propagatesOnDispatcherThreadWhenFatal() throws Exception { + mws.enqueue(new MockResponse()); + + new CallbackAdapter(call, callback) { + @Override Void convert(ResponseBody responseBody) throws IOException { + throw new LinkageError(); + } + }.enqueue(); + + SimpleTimeLimiter timeLimiter = new SimpleTimeLimiter(); + try { + timeLimiter.callWithTimeout(callback::get, 100, TimeUnit.MILLISECONDS, true); + failBecauseExceptionWasNotThrown(UncheckedTimeoutException.class); + } catch (UncheckedTimeoutException expected) { + } + } + + @Test + public void executionException_conversionException() throws Exception { + mws.enqueue(new MockResponse()); + + new CallbackAdapter(call, callback) { + @Override Void convert(ResponseBody responseBody) throws IOException { + throw new IllegalArgumentException("eeek"); + } + }.enqueue(); + + try { + callback.get(); + failBecauseExceptionWasNotThrown(IllegalArgumentException.class); + } catch (IllegalArgumentException expected) { + assertThat(expected).isInstanceOf(IllegalArgumentException.class); + } + } + + @Test + public void executionException_httpFailure() throws Exception { + mws.enqueue(new MockResponse().setResponseCode(500)); + + new CallbackAdapter(call, callback).enqueue(); + + try { + callback.get(); + failBecauseExceptionWasNotThrown(IllegalStateException.class); + } catch (IllegalStateException expected) { + assertThat(expected).isInstanceOf(IllegalStateException.class); + } + } +} diff --git a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/HttpBulkSpanIndexerTest.java b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/HttpBulkSpanIndexerTest.java index c0b03f970c5..73ddcb431ba 100644 --- a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/HttpBulkSpanIndexerTest.java +++ b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/HttpBulkSpanIndexerTest.java @@ -1,5 +1,5 @@ /** - * Copyright 2015-2016 The OpenZipkin Authors + * Copyright 2015-2017 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -24,6 +24,7 @@ import org.junit.rules.ExpectedException; import zipkin.Codec; import zipkin.TestObjects; +import zipkin.internal.CallbackCaptor; import static java.util.Arrays.asList; import static org.assertj.core.api.Assertions.assertThat; @@ -35,6 +36,8 @@ public class HttpBulkSpanIndexerTest { @Rule public MockWebServer es = new MockWebServer(); + CallbackCaptor callback = new CallbackCaptor<>(); + HttpBulkSpanIndexer indexer = new HttpBulkSpanIndexer((HttpClient) new HttpClientBuilder(new OkHttpClient()) .hosts(asList(es.url("").toString())) @@ -50,7 +53,8 @@ public void doesntWriteSpanId() throws Exception { es.enqueue(new MockResponse()); indexer.add("test_zipkin_http-2016-10-01", TestObjects.LOTS_OF_SPANS[0], (Long) null); - indexer.execute().get(); + indexer.execute(callback); + callback.get(); RecordedRequest request = es.takeRequest(); assertThat(request.getBody().readByteString().utf8()) @@ -62,7 +66,8 @@ public void writesSpanNaturallyWhenNoTimestamp() throws Exception { es.enqueue(new MockResponse()); indexer.add("test_zipkin_http-2016-10-01", TestObjects.LOTS_OF_SPANS[0], (Long) null); - indexer.execute().get(); + indexer.execute(callback); + callback.get(); RecordedRequest request = es.takeRequest(); assertThat(request.getBody().readByteString().utf8()) diff --git a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/HttpClientTest.java b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/HttpClientTest.java index 0b6ecc73c20..df058f34d7d 100644 --- a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/HttpClientTest.java +++ b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/HttpClientTest.java @@ -1,5 +1,5 @@ /** - * Copyright 2015-2016 The OpenZipkin Authors + * Copyright 2015-2017 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -23,6 +23,7 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import zipkin.TestObjects; +import zipkin.internal.CallbackCaptor; import static java.util.Arrays.asList; import static org.assertj.core.api.Assertions.assertThat; @@ -137,10 +138,11 @@ public void addsPipelineId() throws Exception { es.enqueue(new MockResponse()); + CallbackCaptor callback = new CallbackCaptor<>(); client.bulkSpanIndexer() .add("zipkin-2016-10-01", TestObjects.TRACE.get(0), null) - .execute() - .get(); + .execute(callback); + callback.get(); RecordedRequest request = es.takeRequest(); assertThat(request.getPath()) diff --git a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/HttpElasticsearchDependencyWriter.java b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/HttpElasticsearchDependencyWriter.java index 115ef7f67ee..b110f88882c 100644 --- a/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/HttpElasticsearchDependencyWriter.java +++ b/zipkin-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/HttpElasticsearchDependencyWriter.java @@ -1,5 +1,5 @@ /** - * Copyright 2015-2016 The OpenZipkin Authors + * Copyright 2015-2017 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -16,13 +16,14 @@ import java.util.List; import zipkin.Codec; import zipkin.DependencyLink; +import zipkin.internal.CallbackCaptor; import zipkin.storage.elasticsearch.InternalElasticsearchClient; import static com.google.common.base.Preconditions.checkArgument; public class HttpElasticsearchDependencyWriter { public static void writeDependencyLinks(InternalElasticsearchClient genericClient, - List links, String index, String type) throws Exception { + List links, String index, String type) { checkArgument(genericClient instanceof HttpClient, ""); HttpClient client = (HttpClient) genericClient; HttpBulkIndexer indexer = new HttpBulkIndexer(client, type){ @@ -33,6 +34,8 @@ public static void writeDependencyLinks(InternalElasticsearchClient genericClien for (DependencyLink link : links) { indexer.add(index, link, link.parent + "|" + link.child); // Unique constraint } - indexer.execute().get(); + CallbackCaptor callback = new CallbackCaptor<>(); + indexer.execute(callback); + callback.get(); } } diff --git a/zipkin-storage/elasticsearch/src/main/java/zipkin/storage/elasticsearch/ElasticsearchSpanConsumer.java b/zipkin-storage/elasticsearch/src/main/java/zipkin/storage/elasticsearch/ElasticsearchSpanConsumer.java index 8fc65a8878a..62752e019dd 100755 --- a/zipkin-storage/elasticsearch/src/main/java/zipkin/storage/elasticsearch/ElasticsearchSpanConsumer.java +++ b/zipkin-storage/elasticsearch/src/main/java/zipkin/storage/elasticsearch/ElasticsearchSpanConsumer.java @@ -1,5 +1,5 @@ /** - * Copyright 2015-2016 The OpenZipkin Authors + * Copyright 2015-2017 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -13,23 +13,20 @@ */ package zipkin.storage.elasticsearch; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; import java.io.IOException; import java.util.List; import java.util.concurrent.TimeUnit; import zipkin.Span; +import zipkin.storage.AsyncSpanConsumer; +import zipkin.storage.Callback; import zipkin.storage.elasticsearch.InternalElasticsearchClient.BulkSpanIndexer; -import zipkin.storage.guava.GuavaSpanConsumer; -import static com.google.common.util.concurrent.Futures.immediateFuture; import static zipkin.internal.ApplyTimestampAndDuration.guessTimestamp; import static zipkin.internal.Util.UTF_8; +import static zipkin.internal.Util.propagateIfFatal; -final class ElasticsearchSpanConsumer implements GuavaSpanConsumer { +final class ElasticsearchSpanConsumer implements AsyncSpanConsumer { private static final byte[] TIMESTAMP_MILLIS_PREFIX = "{\"timestamp_millis\":".getBytes(UTF_8); - private static final ListenableFuture VOID = immediateFuture(null); private final InternalElasticsearchClient client; private final IndexNameFormatter indexNameFormatter; @@ -40,12 +37,16 @@ final class ElasticsearchSpanConsumer implements GuavaSpanConsumer { this.indexNameFormatter = indexNameFormatter; } - @Override public ListenableFuture accept(List spans) { - if (spans.isEmpty()) return VOID; + @Override public void accept(List spans, Callback callback) { + if (spans.isEmpty()) { + callback.onSuccess(null); + return; + } try { - return indexSpans(client.bulkSpanIndexer(), spans).execute(); - } catch (Exception e) { - return Futures.immediateFailedFuture(e); + indexSpans(client.bulkSpanIndexer(), spans).execute(callback); + } catch (Throwable t) { + propagateIfFatal(t); + callback.onError(t); } } @@ -71,7 +72,6 @@ BulkSpanIndexer indexSpans(BulkSpanIndexer indexer, List spans) throws IOE * when storing. The cheapest way to do this without changing the codec is prefixing it to the * json. For example. {"traceId":"... becomes {"timestamp_millis":12345,"traceId":"... */ - @VisibleForTesting static byte[] prefixWithTimestampMillis(byte[] input, long timestampMillis) { String dateAsString = Long.toString(timestampMillis); byte[] newSpanBytes = diff --git a/zipkin-storage/elasticsearch/src/main/java/zipkin/storage/elasticsearch/ElasticsearchStorage.java b/zipkin-storage/elasticsearch/src/main/java/zipkin/storage/elasticsearch/ElasticsearchStorage.java index 62c10125c32..173b1d42f10 100644 --- a/zipkin-storage/elasticsearch/src/main/java/zipkin/storage/elasticsearch/ElasticsearchStorage.java +++ b/zipkin-storage/elasticsearch/src/main/java/zipkin/storage/elasticsearch/ElasticsearchStorage.java @@ -1,5 +1,5 @@ /** - * Copyright 2015-2016 The OpenZipkin Authors + * Copyright 2015-2017 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -14,17 +14,17 @@ package zipkin.storage.elasticsearch; import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.UncheckedExecutionException; import java.io.IOException; import java.util.List; -import zipkin.internal.Lazy; import zipkin.storage.StorageComponent; +import zipkin.storage.guava.GuavaSpanConsumer; +import zipkin.storage.guava.GuavaStorageAdapters; import zipkin.storage.guava.LazyGuavaStorageComponent; import static zipkin.internal.Util.checkNotNull; public final class ElasticsearchStorage - extends LazyGuavaStorageComponent { + extends LazyGuavaStorageComponent { public static Builder builder() { return new Builder(new NativeClient.Builder()); @@ -140,8 +140,9 @@ public Builder indexReplicas(int indexReplicas) { return new ElasticsearchSpanStore(client(), indexNameFormatter, strictTraceId); } - @Override protected ElasticsearchSpanConsumer computeGuavaSpanConsumer() { - return new ElasticsearchSpanConsumer(client(), indexNameFormatter); + @Override protected GuavaSpanConsumer computeGuavaSpanConsumer() { + return GuavaStorageAdapters.asyncToGuava( + new ElasticsearchSpanConsumer(client(), indexNameFormatter)); } @VisibleForTesting void clear() throws IOException { @@ -151,9 +152,7 @@ public Builder indexReplicas(int indexReplicas) { @Override public CheckResult check() { try { client().ensureClusterReady(indexNameFormatter.catchAll()); - } catch (UncheckedExecutionException e) { // we have to wrap on LazyClient.compute() - return CheckResult.failed((Exception) e.getCause()); - } catch (Exception e) { + } catch (RuntimeException e) { return CheckResult.failed(e); } return CheckResult.OK; diff --git a/zipkin-storage/elasticsearch/src/main/java/zipkin/storage/elasticsearch/InternalElasticsearchClient.java b/zipkin-storage/elasticsearch/src/main/java/zipkin/storage/elasticsearch/InternalElasticsearchClient.java index 76425b13d63..58d33919555 100644 --- a/zipkin-storage/elasticsearch/src/main/java/zipkin/storage/elasticsearch/InternalElasticsearchClient.java +++ b/zipkin-storage/elasticsearch/src/main/java/zipkin/storage/elasticsearch/InternalElasticsearchClient.java @@ -1,5 +1,5 @@ /** - * Copyright 2015-2016 The OpenZipkin Authors + * Copyright 2015-2017 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -25,6 +25,7 @@ import zipkin.DependencyLink; import zipkin.Span; import zipkin.internal.Lazy; +import zipkin.storage.Callback; import static com.google.common.base.Preconditions.checkNotNull; import static zipkin.storage.elasticsearch.ElasticsearchSpanConsumer.prefixWithTimestampMillis; @@ -85,10 +86,10 @@ public final Builder hosts(final List hosts) { } /** Returns the Elasticsearch version of the transport. */ - protected abstract String getVersion() throws IOException; + protected abstract String getVersion(); /** Ensures the existence of a template, creating it if it does not exist. */ - protected abstract void ensureTemplate(String name, String indexTemplate) throws IOException; + protected abstract void ensureTemplate(String name, String indexTemplate); /** Deletes the specified index pattern is supplied */ protected abstract void clear(String index) throws IOException; @@ -136,9 +137,9 @@ public interface BulkSpanIndexer { * *

For example. {"traceId":".. becomes {"timestamp_millis":12345,"traceId":"... */ - BulkSpanIndexer add(String index, Span span, Long timestampMillis) throws IOException; + BulkSpanIndexer add(String index, Span span, Long timestampMillis); - ListenableFuture execute() throws IOException; + void execute(Callback callback); } protected static abstract class SpanBytesBulkSpanIndexer implements BulkSpanIndexer { @@ -162,7 +163,7 @@ public static byte[] toSpanBytes(Span span, Long timestampMillis) { * * @param catchAll See {@link IndexNameFormatter#catchAll()} */ - protected abstract void ensureClusterReady(String catchAll) throws IOException; + protected abstract void ensureClusterReady(String catchAll); /** Overridden to remove checked exceptions. */ @Override diff --git a/zipkin-storage/elasticsearch/src/main/java/zipkin/storage/elasticsearch/LazyClient.java b/zipkin-storage/elasticsearch/src/main/java/zipkin/storage/elasticsearch/LazyClient.java index c9324d3158e..0975cd71718 100644 --- a/zipkin-storage/elasticsearch/src/main/java/zipkin/storage/elasticsearch/LazyClient.java +++ b/zipkin-storage/elasticsearch/src/main/java/zipkin/storage/elasticsearch/LazyClient.java @@ -1,5 +1,5 @@ /** - * Copyright 2015-2016 The OpenZipkin Authors + * Copyright 2015-2017 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -14,7 +14,6 @@ package zipkin.storage.elasticsearch; import com.google.common.io.Resources; -import com.google.common.util.concurrent.UncheckedExecutionException; import java.io.IOException; import java.nio.charset.StandardCharsets; import zipkin.internal.LazyCloseable; @@ -49,9 +48,9 @@ final class LazyClient extends LazyCloseable { String version = client.getVersion(); String versionSpecificTemplate = versionSpecificTemplate(version); client.ensureTemplate(indexTemplateName, versionSpecificTemplate); - } catch (IOException e) { + } catch (RuntimeException e) { client.close(); - throw new UncheckedExecutionException(e); + throw e; } return client; } @@ -72,4 +71,5 @@ String versionSpecificTemplate(String version) { @Override public String toString() { return clientFactory.toString(); } + } diff --git a/zipkin-storage/elasticsearch/src/main/java/zipkin/storage/elasticsearch/NativeClient.java b/zipkin-storage/elasticsearch/src/main/java/zipkin/storage/elasticsearch/NativeClient.java index bf6a4101c8e..6e4e2391288 100644 --- a/zipkin-storage/elasticsearch/src/main/java/zipkin/storage/elasticsearch/NativeClient.java +++ b/zipkin-storage/elasticsearch/src/main/java/zipkin/storage/elasticsearch/NativeClient.java @@ -1,5 +1,5 @@ /** - * Copyright 2015-2016 The OpenZipkin Authors + * Copyright 2015-2017 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -14,13 +14,11 @@ package zipkin.storage.elasticsearch; import com.google.common.base.Function; -import com.google.common.base.Functions; import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.io.Resources; import com.google.common.net.HostAndPort; -import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import java.io.IOException; @@ -34,6 +32,7 @@ import java.util.Properties; import java.util.Set; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ListenableActionFuture; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; @@ -62,6 +61,7 @@ import zipkin.Span; import zipkin.internal.Lazy; import zipkin.internal.Util; +import zipkin.storage.Callback; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; @@ -175,7 +175,7 @@ private static final class Factory implements InternalElasticsearchClient.Factor } /** Since the wire protocol doesn't support version negotiation, return the library version */ - @Override protected String getVersion() throws IOException { + @Override protected String getVersion() { return clientVersion; } @@ -300,42 +300,59 @@ enum ConvertDependenciesResponse implements Function indexRequests = new LinkedList<>(); - final Set indices = new LinkedHashSet<>(); + final Set indicesToFlush = new LinkedHashSet<>(); @Override protected void add(String index, byte[] spanBytes) { indexRequests.add(client.prepareIndex(index, SPAN).setSource(spanBytes)); - if (flushOnWrites) indices.add(index); + if (flushOnWrites) indicesToFlush.add(index); } // Creates a bulk request when there is more than one span to store - @Override public ListenableFuture execute() { - ListenableFuture future; + @Override public void execute(final Callback callback) { + ActionListener callbackAdapter = new ActionListener() { + @Override public void onResponse(Object input) { + callback.onSuccess(null); + } + + @Override public void onFailure(Throwable throwable) { + callback.onError(throwable); + } + }; + + // Conditionally create a bulk action depending on the count of index requests + ListenableActionFuture future; if (indexRequests.size() == 1) { - future = toGuava(indexRequests.get(0).execute()); + future = indexRequests.get(0).execute(); } else { BulkRequestBuilder request = client.prepareBulk(); for (IndexRequestBuilder span : indexRequests) { request.add(span); } - future = toGuava(request.execute()); + future = request.execute(); } - if (!indices.isEmpty()) { - future = transform(future, new AsyncFunction() { - @Override public ListenableFuture apply(Object input) { - return toGuava(client.admin().indices() - .prepareFlush(indices.toArray(new String[indices.size()])) - .execute()); - } - }); + + // Unless we are in a unit test, this should always be true + if (indicesToFlush.isEmpty()) { + future.addListener(callbackAdapter); + return; } - return transform(future, TO_VOID); + // If we are in a unit test, we need to flush so that we can read our writes + future.addListener(new ActionListener() { + @Override public void onResponse(Object input) { + client.admin().indices() + .prepareFlush(indicesToFlush.toArray(new String[indicesToFlush.size()])) + .execute().addListener(callbackAdapter); + } + + @Override public void onFailure(Throwable throwable) { + callbackAdapter.onFailure(throwable); + } + }); } }; } - private static final Function TO_VOID = Functions.constant(null); - @Override protected void ensureClusterReady(String catchAll) { ClusterHealthResponse health = getUnchecked(client .admin().cluster().prepareHealth(catchAll).execute());