Skip to content

Commit

Permalink
Begins migration off Elasticsearch 2.x library (and Guava) (#1495)
Browse files Browse the repository at this point in the history
Currently, our Elasticsearch 2.x dependency makes it impossible to load
recent versions of Elasticsearch or Guava.

This begins work to migrate off by porting write and health code to not
use guava types.

See #1431
  • Loading branch information
adriancole authored Jan 18, 2017
1 parent cf4df43 commit b543f09
Show file tree
Hide file tree
Showing 13 changed files with 279 additions and 89 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/**
* Copyright 2015-2017 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin.storage.elasticsearch.http;

import java.io.IOException;
import okhttp3.Call;
import okhttp3.Response;
import okhttp3.ResponseBody;
import zipkin.storage.Callback;

import static zipkin.internal.Util.propagateIfFatal;

class CallbackAdapter<V> implements okhttp3.Callback {
final Call call;
final Callback<V> delegate;

CallbackAdapter(Call call, zipkin.storage.Callback<V> delegate) {
this.call = call;
this.delegate = delegate;
}

@Override public void onFailure(Call call, IOException e) {
delegate.onError(e);
}

void enqueue() {
call.enqueue(this);
}

/** Note: this runs on the {@link okhttp3.OkHttpClient#dispatcher() dispatcher} thread! */
@Override public void onResponse(Call call, Response response) {
try (ResponseBody responseBody = response.body()) {
if (response.isSuccessful()) {
delegate.onSuccess(convert(responseBody));
} else {
delegate.onError(new IllegalStateException("response failed: " + response));
}
} catch (Throwable t) {
propagateIfFatal(t);
delegate.onError(t);
}
}

V convert(ResponseBody responseBody) throws IOException {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2015-2016 The OpenZipkin Authors
* Copyright 2015-2017 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -13,19 +13,18 @@
*/
package zipkin.storage.elasticsearch.http;

import com.google.common.base.Joiner;
import com.google.common.util.concurrent.ListenableFuture;
import com.squareup.moshi.JsonWriter;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Set;
import okhttp3.Call;
import okhttp3.HttpUrl;
import okhttp3.MediaType;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.ResponseBody;
import okio.Buffer;
import zipkin.internal.Nullable;
import zipkin.storage.Callback;

// See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
// exposed to re-use for testing writes of dependency links
Expand All @@ -47,44 +46,47 @@ abstract class HttpBulkIndexer<T> {
this.tag = "index-" + typeName;
}

void add(String index, T object, String id) throws IOException {
void add(String index, T object, @Nullable String id) {
writeIndexMetadata(index, id);
writeDocument(object);

if (client.flushOnWrites) indices.add(index);
}

void writeIndexMetadata(String index, String id) throws IOException {
JsonWriter writer = JsonWriter.of(body);
writer.beginObject().name("index").beginObject();
writer.name("_index").value(index);
writer.name("_type").value(typeName);
writer.name("_id").value(id);
writer.endObject().endObject();
body.writeByte('\n');
void writeIndexMetadata(String index, @Nullable String id) {
body.writeUtf8("{\"index\":{\"_index\":\"").writeUtf8(index).writeByte('"');
body.writeUtf8(",\"_type\":\"").writeUtf8(typeName).writeByte('"');
if (id != null) {
body.writeUtf8(",\"_id\":\"").writeUtf8(id).writeByte('"');
}
body.writeUtf8("}}\n");
}

void writeDocument(T object) throws IOException {
void writeDocument(T object) {
body.write(toJsonBytes(object));
body.writeByte('\n');
}

abstract byte[] toJsonBytes(T object);

/** Creates a bulk request when there is more than one object to store */
public ListenableFuture<Void> execute() throws IOException { // public to allow interface retrofit
public void execute(Callback<Void> callback) { // public to allow interface retrofit
HttpUrl url = client.pipeline != null
? client.baseUrl.newBuilder("_bulk").addQueryParameter("pipeline", client.pipeline).build()
: client.baseUrl.resolve("_bulk");

Request request = new Request.Builder().url(url).tag(tag)
.post(RequestBody.create(APPLICATION_JSON, body.readByteString())).build();

return new CallbackListenableFuture<Void>(client.http.newCall(request)) {
new CallbackAdapter<Void>(client.http.newCall(request), callback) {
@Override Void convert(ResponseBody responseBody) throws IOException {
if (!indices.isEmpty()) {
client.flush(Joiner.on(',').join(indices));
if (indices.isEmpty()) return null;
Iterator<String> index = indices.iterator();
StringBuilder indexString = new StringBuilder(index.next());
while (index.hasNext()) {
indexString.append(',').append(index.next());
}
client.flush(indexString.toString());
return null;
}
}.enqueue();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2015-2016 The OpenZipkin Authors
* Copyright 2015-2017 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -28,7 +28,7 @@ final class HttpBulkSpanIndexer extends HttpBulkIndexer<Span> implements
}

@Override
public HttpBulkSpanIndexer add(String index, Span span, Long timestampMillis) throws IOException {
public HttpBulkSpanIndexer add(String index, Span span, Long timestampMillis) {
String id = null; // Allow ES to choose an ID
if (timestampMillis == null) {
super.add(index, span, id);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2015-2016 The OpenZipkin Authors
* Copyright 2015-2017 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -102,7 +102,7 @@ static final class Factory implements InternalElasticsearchClient.Factory {
this.allIndices = new String[] {allIndices};
}

@Override protected String getVersion() throws IOException {
@Override protected String getVersion() {
Request getNode = new Request.Builder().url(baseUrl).tag("get-node").build();

try (Response response = http.newCall(getNode).execute()) {
Expand All @@ -113,21 +113,25 @@ static final class Factory implements InternalElasticsearchClient.Factory {
JsonReader version = enterPath(JsonReader.of(response.body().source()), "version", "number");
if (version == null) throw new IllegalStateException(".version.number not in response");
return version.nextString();
} catch (IOException e) {
throw new IllegalStateException("Could not get version", e);
}
}

/**
* This is a blocking call, used inside a lazy. That's because no writes should occur until the
* template is available.
*/
@Override protected void ensureTemplate(String name, String indexTemplate) throws IOException {
@Override protected void ensureTemplate(String name, String indexTemplate) {
HttpUrl templateUrl = baseUrl.newBuilder("_template").addPathSegment(name).build();
Request request = new Request.Builder().url(templateUrl).tag("get-template").build();

try (Response response = http.newCall(request).execute()) {
if (response.isSuccessful()) {
return;
}
} catch (IOException e) {
throw new IllegalStateException("Could not get " + templateUrl.encodedPath(), e);
}

Call putTemplate = http.newCall(new Request.Builder()
Expand All @@ -139,6 +143,8 @@ static final class Factory implements InternalElasticsearchClient.Factory {
if (!response.isSuccessful()) {
throw new IllegalStateException(response.body().string());
}
} catch (IOException e) {
throw new IllegalStateException("Could not put " + templateUrl.encodedPath(), e);
}
}

Expand Down Expand Up @@ -217,7 +223,7 @@ protected ListenableFuture<List<DependencyLink>> findDependencies(String[] indic
}

/** This is blocking so that we can determine if the cluster is healthy or not */
@Override protected void ensureClusterReady(String catchAll) throws IOException {
@Override protected void ensureClusterReady(String catchAll) {
Call getHealth = http.newCall(
new Request.Builder().url(baseUrl.resolve("/_cluster/health/" + catchAll))
.tag("get-cluster-health").build());
Expand All @@ -227,8 +233,9 @@ protected ListenableFuture<List<DependencyLink>> findDependencies(String[] indic
JsonReader status = enterPath(JsonReader.of(response.body().source()), "status");
checkState(status != null, "Health status couldn't be read %s", response);
checkState(!"RED".equalsIgnoreCase(status.nextString()), "Health status is RED");
return;
}
} catch (IOException e) {
throw new IllegalStateException("could not get " + getHealth.request().url(), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/**
* Copyright 2015-2017 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin.storage.elasticsearch.http;

import com.google.common.util.concurrent.SimpleTimeLimiter;
import com.google.common.util.concurrent.UncheckedTimeoutException;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import okhttp3.Call;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.ResponseBody;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import zipkin.internal.CallbackCaptor;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;

public class CallbackAdapterTest {
@Rule
public MockWebServer mws = new MockWebServer();

OkHttpClient client = new OkHttpClient();
CallbackCaptor<Void> callback = new CallbackCaptor<>();
Call call = client.newCall(new Request.Builder().url(mws.url("")).build());

@After
public void close() throws IOException {
client.dispatcher().executorService().shutdownNow();
}

@Test
public void propagatesOnDispatcherThreadWhenFatal() throws Exception {
mws.enqueue(new MockResponse());

new CallbackAdapter<Void>(call, callback) {
@Override Void convert(ResponseBody responseBody) throws IOException {
throw new LinkageError();
}
}.enqueue();

SimpleTimeLimiter timeLimiter = new SimpleTimeLimiter();
try {
timeLimiter.callWithTimeout(callback::get, 100, TimeUnit.MILLISECONDS, true);
failBecauseExceptionWasNotThrown(UncheckedTimeoutException.class);
} catch (UncheckedTimeoutException expected) {
}
}

@Test
public void executionException_conversionException() throws Exception {
mws.enqueue(new MockResponse());

new CallbackAdapter<Void>(call, callback) {
@Override Void convert(ResponseBody responseBody) throws IOException {
throw new IllegalArgumentException("eeek");
}
}.enqueue();

try {
callback.get();
failBecauseExceptionWasNotThrown(IllegalArgumentException.class);
} catch (IllegalArgumentException expected) {
assertThat(expected).isInstanceOf(IllegalArgumentException.class);
}
}

@Test
public void executionException_httpFailure() throws Exception {
mws.enqueue(new MockResponse().setResponseCode(500));

new CallbackAdapter<Void>(call, callback).enqueue();

try {
callback.get();
failBecauseExceptionWasNotThrown(IllegalStateException.class);
} catch (IllegalStateException expected) {
assertThat(expected).isInstanceOf(IllegalStateException.class);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2015-2016 The OpenZipkin Authors
* Copyright 2015-2017 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -24,6 +24,7 @@
import org.junit.rules.ExpectedException;
import zipkin.Codec;
import zipkin.TestObjects;
import zipkin.internal.CallbackCaptor;

import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -35,6 +36,8 @@ public class HttpBulkSpanIndexerTest {
@Rule
public MockWebServer es = new MockWebServer();

CallbackCaptor<Void> callback = new CallbackCaptor<>();

HttpBulkSpanIndexer indexer =
new HttpBulkSpanIndexer((HttpClient) new HttpClientBuilder(new OkHttpClient())
.hosts(asList(es.url("").toString()))
Expand All @@ -50,7 +53,8 @@ public void doesntWriteSpanId() throws Exception {
es.enqueue(new MockResponse());

indexer.add("test_zipkin_http-2016-10-01", TestObjects.LOTS_OF_SPANS[0], (Long) null);
indexer.execute().get();
indexer.execute(callback);
callback.get();

RecordedRequest request = es.takeRequest();
assertThat(request.getBody().readByteString().utf8())
Expand All @@ -62,7 +66,8 @@ public void writesSpanNaturallyWhenNoTimestamp() throws Exception {
es.enqueue(new MockResponse());

indexer.add("test_zipkin_http-2016-10-01", TestObjects.LOTS_OF_SPANS[0], (Long) null);
indexer.execute().get();
indexer.execute(callback);
callback.get();

RecordedRequest request = es.takeRequest();
assertThat(request.getBody().readByteString().utf8())
Expand Down
Loading

0 comments on commit b543f09

Please sign in to comment.