Removes double-conversion when collecting into Elasticsearch (openzipkin#1700)

Before, when accepting the span2 format, we needlessly converted data to and from the legacy Span format. This change removes the double-conversion in favor of a native version 2 span consumer.
adriancole authored and abesto committed Sep 10, 2019
1 parent aadb002 commit 6a3d889
Showing 10 changed files with 274 additions and 95 deletions.
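To make the change concrete, here is a minimal sketch of the write path before and after this commit, using stand-in types. It is not the actual Zipkin code: Span, Span2, toLegacy, fromLegacy, and index below are simplified placeholders for zipkin.Span, zipkin.internal.Span2, zipkin.internal.Span2Converter, and the bulk indexer; only the shape of the conversion is illustrated.

import java.util.ArrayList;
import java.util.List;

public class DoubleConversionSketch {
  static class Span {}    // stand-in for the legacy zipkin.Span model
  static class Span2 {}   // stand-in for the zipkin.internal.Span2 (v2) model

  // Hypothetical converters standing in for zipkin.internal.Span2Converter.
  static Span toLegacy(Span2 span2) {
    return new Span();
  }

  static List<Span2> fromLegacy(Span span) {
    List<Span2> out = new ArrayList<>();
    out.add(new Span2());
    return out;
  }

  // Old path: v2 input was adapted onto the legacy consumer, and the
  // Elasticsearch consumer converted each legacy span back to v2 before
  // building its JSON documents.
  static void acceptBefore(List<Span2> spans) {
    List<Span> legacy = new ArrayList<>();
    for (Span2 span2 : spans) legacy.add(toLegacy(span2));   // conversion #1
    for (Span span : legacy) {
      for (Span2 span2 : fromLegacy(span)) index(span2);     // conversion #2
    }
  }

  // New path: the consumer accepts List<Span2> directly, so neither
  // conversion happens for spans reported in the v2 format.
  static void acceptAfter(List<Span2> spans) {
    for (Span2 span2 : spans) index(span2);
  }

  static void index(Span2 span2) {
    // stands in for adding one JSON document to the bulk span index request
  }

  public static void main(String[] args) {
    List<Span2> spans = new ArrayList<>();
    spans.add(new Span2());
    acceptBefore(spans); // old path: two conversions per span
    acceptAfter(spans);  // new path: none
  }
}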
@@ -23,15 +23,12 @@
import okio.Buffer;
import okio.ByteString;
import zipkin.Annotation;
import zipkin.Span;
import zipkin.internal.Nullable;
import zipkin.internal.Span2;
import zipkin.internal.Span2Converter;
import zipkin.internal.v2.codec.Encoder;
import zipkin.storage.AsyncSpanConsumer;
import zipkin.internal.v2.storage.AsyncSpanConsumer;
import zipkin.storage.Callback;

import static zipkin.internal.ApplyTimestampAndDuration.guessTimestamp;
import static zipkin.internal.Util.propagateIfFatal;
import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.SPAN;

@@ -46,13 +43,13 @@ class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { // not final
this.indexNameFormatter = es.indexNameFormatter();
}

@Override public void accept(List<Span> spans, Callback<Void> callback) {
@Override public void accept(List<Span2> spans, Callback<Void> callback) {
if (spans.isEmpty()) {
callback.onSuccess(null);
return;
}
try {
BulkSpanIndexer indexer = newBulkSpanIndexer(es);
BulkSpanIndexer indexer = new BulkSpanIndexer(es);
indexSpans(indexer, spans);
indexer.execute(callback);
} catch (Throwable t) {
@@ -61,19 +58,17 @@ class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { // not final
}
}

void indexSpans(BulkSpanIndexer indexer, List<Span> spans) throws IOException {
for (Span span : spans) {
Long timestamp = guessTimestamp(span);
void indexSpans(BulkSpanIndexer indexer, List<Span2> spans) throws IOException {
for (Span2 span : spans) {
Long spanTimestamp = span.timestamp();
long indexTimestamp = 0L; // which index to store this span into
Long spanTimestamp;
if (timestamp != null) {
indexTimestamp = spanTimestamp = TimeUnit.MICROSECONDS.toMillis(timestamp);
if (spanTimestamp != null) {
indexTimestamp = spanTimestamp = TimeUnit.MICROSECONDS.toMillis(spanTimestamp);
} else {
spanTimestamp = null;
// guessTimestamp is made for determining the span's authoritative timestamp. When choosing
// the index bucket, any annotation is better than using current time.
for (int i = 0, length = span.annotations.size(); i < length; i++) {
indexTimestamp = span.annotations.get(i).timestamp / 1000;
for (int i = 0, length = span.annotations().size(); i < length; i++) {
indexTimestamp = span.annotations().get(i).timestamp / 1000;
break;
}
if (indexTimestamp == 0L) indexTimestamp = System.currentTimeMillis();
@@ -82,10 +77,6 @@ void indexSpans(BulkSpanIndexer indexer, List<Span> spans) throws IOException {
}
}

BulkSpanIndexer newBulkSpanIndexer(ElasticsearchHttpStorage es) {
return new BulkSpanIndexer(es);
}

static class BulkSpanIndexer {
final HttpBulkIndexer indexer;
final IndexNameFormatter indexNameFormatter;
@@ -95,12 +86,10 @@ static class BulkSpanIndexer {
this.indexNameFormatter = es.indexNameFormatter();
}

void add(long indexTimestamp, Span span, @Nullable Long timestampMillis) {
void add(long indexTimestamp, Span2 span, @Nullable Long timestampMillis) {
String index = indexNameFormatter.formatTypeAndTimestamp(SPAN, indexTimestamp);
for (Span2 span2 : Span2Converter.fromSpan(span)) {
byte[] document = prefixWithTimestampMillisAndQuery(span2, timestampMillis);
indexer.add(index, SPAN, document, null /* Allow ES to choose an ID */);
}
byte[] document = prefixWithTimestampMillisAndQuery(span, timestampMillis);
indexer.add(index, SPAN, document, null /* Allow ES to choose an ID */);
}

void execute(Callback<Void> callback) throws IOException {
@@ -27,7 +27,9 @@
import okhttp3.Request;
import okhttp3.RequestBody;
import okio.Buffer;
import zipkin.internal.AsyncSpan2ConsumerAdapter;
import zipkin.internal.Nullable;
import zipkin.internal.Span2Component;
import zipkin.storage.AsyncSpanConsumer;
import zipkin.storage.AsyncSpanStore;
import zipkin.storage.SpanStore;
@@ -42,8 +44,7 @@
import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.SPAN;

@AutoValue
public abstract class ElasticsearchHttpStorage implements StorageComponent {

public abstract class ElasticsearchHttpStorage extends Span2Component implements StorageComponent {
/**
* A list of elasticsearch nodes to connect to, in http://host:port or https://host:port
* format. Note this value is only read once.
@@ -222,6 +223,10 @@ public final Builder dateSeparator(char dateSeparator) {
}

@Override public AsyncSpanConsumer asyncSpanConsumer() {
return AsyncSpan2ConsumerAdapter.create(asyncSpan2Consumer());
}

@Override protected zipkin.internal.v2.storage.AsyncSpanConsumer asyncSpan2Consumer() {
ensureIndexTemplates();
return new ElasticsearchHttpSpanConsumer(this);
}
@@ -22,23 +22,17 @@
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import zipkin.Annotation;
import zipkin.BinaryAnnotation;
import zipkin.Codec;
import zipkin.Span;
import zipkin.TestObjects;
import zipkin.internal.CallbackCaptor;
import zipkin.internal.Span2;
import zipkin.internal.Span2Converter;
import zipkin.internal.v2.codec.MessageEncoder;
import zipkin.internal.Span2.Kind;
import zipkin.internal.v2.codec.Decoder;
import zipkin.internal.v2.codec.Encoder;
import zipkin.internal.v2.codec.MessageEncoder;

import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;
import static zipkin.Constants.CLIENT_SEND;
import static zipkin.Constants.SERVER_RECV;
import static zipkin.TestObjects.TODAY;
import static zipkin.internal.ApplyTimestampAndDuration.guessTimestamp;
import static zipkin.internal.Util.UTF_8;
import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanConsumer.prefixWithTimestampMillisAndQuery;

@@ -69,7 +63,7 @@ public void close() throws IOException {
@Test public void addsTimestamp_millisIntoJson() throws Exception {
es.enqueue(new MockResponse());

Span span = Span.builder().traceId(20L).id(20L).name("get")
Span2 span = Span2.builder().traceId(20L).id(20L).name("get")
.timestamp(TODAY * 1000).build();

accept(span);
@@ -79,77 +73,69 @@ public void close() throws IOException {
}

@Test public void prefixWithTimestampMillisAndQuery_skipsWhenNoData() throws Exception {
Span span = Span.builder().traceId(20L).id(22L).name("").parentId(21L).timestamp(0L)
.addAnnotation(Annotation.create(0, CLIENT_SEND, TestObjects.WEB_ENDPOINT))
Span2 span = Span2.builder().traceId(20L).id(22L).name("").parentId(21L).timestamp(0L)
.localEndpoint(TestObjects.WEB_ENDPOINT)
.kind(Kind.CLIENT)
.build();

byte[] result = prefixWithTimestampMillisAndQuery(
Span2Converter.fromSpan(span).get(0),
span.timestamp
);
byte[] result = prefixWithTimestampMillisAndQuery(span, span.timestamp());

assertThat(new String(result, UTF_8))
.startsWith("{\"traceId\":\"");
}

@Test public void prefixWithTimestampMillisAndQuery_addsTimestampMillis() throws Exception {
Span span = Span.builder().traceId(20L).id(22L).name("").parentId(21L).timestamp(1L)
.addAnnotation(Annotation.create(1L, CLIENT_SEND, TestObjects.WEB_ENDPOINT))
Span2 span = Span2.builder().traceId(20L).id(22L).name("").parentId(21L).timestamp(1L)
.localEndpoint(TestObjects.WEB_ENDPOINT)
.kind(Kind.CLIENT)
.build();

byte[] result = prefixWithTimestampMillisAndQuery(
Span2Converter.fromSpan(span).get(0),
span.timestamp
);
byte[] result = prefixWithTimestampMillisAndQuery(span, span.timestamp());

assertThat(new String(result, UTF_8))
.startsWith("{\"timestamp_millis\":1,\"traceId\":");
}

@Test public void prefixWithTimestampMillisAndQuery_addsAnnotationQuery() throws Exception {
Span span = Span.builder().traceId(20L).id(22L).name("").parentId(21L)
.addAnnotation(Annotation.create(1L, "\"foo", TestObjects.WEB_ENDPOINT))
Span2 span = Span2.builder().traceId(20L).id(22L).name("").parentId(21L)
.localEndpoint(TestObjects.WEB_ENDPOINT)
.addAnnotation(1L, "\"foo")
.build();

byte[] result = prefixWithTimestampMillisAndQuery(
Span2Converter.fromSpan(span).get(0),
span.timestamp
);
byte[] result = prefixWithTimestampMillisAndQuery(span, span.timestamp());

assertThat(new String(result, UTF_8))
.startsWith("{\"_q\":[\"\\\"foo\"],\"traceId");
}

@Test public void prefixWithTimestampMillisAndQuery_addsAnnotationQueryTags() throws Exception {
Span span = Span.builder().traceId(20L).id(22L).name("").parentId(21L)
.addBinaryAnnotation(BinaryAnnotation.create("\"foo", "\"bar", TestObjects.WEB_ENDPOINT))
Span2 span = Span2.builder().traceId(20L).id(22L).name("").parentId(21L)
.localEndpoint(TestObjects.WEB_ENDPOINT)
.putTag("\"foo", "\"bar")
.build();

byte[] result = prefixWithTimestampMillisAndQuery(Span2Converter.fromSpan(span).get(0),
span.timestamp
);
byte[] result = prefixWithTimestampMillisAndQuery(span, span.timestamp());

assertThat(new String(result, UTF_8))
.startsWith("{\"_q\":[\"\\\"foo\",\"\\\"foo=\\\"bar\"],\"traceId");
}

@Test public void prefixWithTimestampMillisAndQuery_readable() throws Exception {
Span span = Span.builder().traceId(20L).id(20L).name("get")
Span2 span = Span2.builder().traceId(20L).id(20L).name("get")
.timestamp(TODAY * 1000).build();
Span2 span2 = Span2Converter.fromSpan(span).get(0);

byte[] message = MessageEncoder.JSON_BYTES.encode(asList(
prefixWithTimestampMillisAndQuery(span2, span.timestamp)
prefixWithTimestampMillisAndQuery(span, span.timestamp())
));

assertThat(Decoder.JSON.decodeList(message))
.containsOnly(span2); // ignores timestamp_millis field
.containsOnly(span); // ignores timestamp_millis field
}

@Test public void doesntWriteSpanId() throws Exception {
@Test public void doesntWriteDocumentId() throws Exception {
es.enqueue(new MockResponse());

accept(TestObjects.LOTS_OF_SPANS[0]);
accept(Span2.builder().traceId(1L).id(1L).name("foo").build());

RecordedRequest request = es.takeRequest();
assertThat(request.getBody().readByteString().utf8())
@@ -159,30 +145,34 @@ public void close() throws IOException {
@Test public void writesSpanNaturallyWhenNoTimestamp() throws Exception {
es.enqueue(new MockResponse());

Span span = Span.builder().traceId(1L).id(1L).name("foo").build();
accept(span);
Span2 span = Span2.builder().traceId(1L).id(1L).name("foo").build();
accept(Span2.builder().traceId(1L).id(1L).name("foo").build());

assertThat(es.takeRequest().getBody().readByteString().utf8())
.contains("\n" + new String(Codec.JSON.writeSpan(span), UTF_8) + "\n");
.contains("\n" + new String(Encoder.JSON.encode(span), UTF_8) + "\n");
}

@Test public void traceIsSearchableBySRServiceName() throws Exception {
@Test public void traceIsSearchableByServerServiceName() throws Exception {
es.enqueue(new MockResponse());

Span clientSpan = Span.builder().traceId(20L).id(22L).name("").parentId(21L).timestamp(0L)
.addAnnotation(Annotation.create(0, CLIENT_SEND, TestObjects.WEB_ENDPOINT))
Span2 clientSpan = Span2.builder().traceId(20L).id(22L).name("").parentId(21L)
.timestamp(1000L)
.kind(Kind.CLIENT)
.localEndpoint(TestObjects.WEB_ENDPOINT)
.build();

Span serverSpan = Span.builder().traceId(20L).id(22L).name("get").parentId(21L)
.addAnnotation(Annotation.create(1000, SERVER_RECV, TestObjects.APP_ENDPOINT))
Span2 serverSpan = Span2.builder().traceId(20L).id(22L).name("get").parentId(21L)
.timestamp(2000L)
.kind(Kind.SERVER)
.localEndpoint(TestObjects.APP_ENDPOINT)
.build();

accept(serverSpan, clientSpan);

// make sure that both timestamps are in the index
assertThat(es.takeRequest().getBody().readByteString().utf8())
.contains("{\"timestamp_millis\":1")
.contains("{\"timestamp_millis\":0");
.contains("{\"timestamp_millis\":2")
.contains("{\"timestamp_millis\":1");
}

@Test public void addsPipelineId() throws Exception {
@@ -196,7 +186,7 @@ public void close() throws IOException {

es.enqueue(new MockResponse());

accept(TestObjects.TRACE.get(0));
accept(Span2.builder().traceId(1L).id(1L).name("foo").build());

RecordedRequest request = es.takeRequest();
assertThat(request.getPath())
@@ -206,17 +196,13 @@ public void close() throws IOException {
@Test public void choosesTypeSpecificIndex() throws Exception {
es.enqueue(new MockResponse());

Annotation foo = Annotation.create(
TimeUnit.DAYS.toMicros(365), // 1971-01-01
"foo",
TestObjects.APP_ENDPOINT
);

Span span = Span.builder().traceId(1L).id(2L).parentId(1L).name("s").addAnnotation(foo).build();
Span2 span = Span2.builder().traceId(1L).id(2L).parentId(1L).name("s")
.localEndpoint(TestObjects.APP_ENDPOINT)
.addAnnotation(TimeUnit.DAYS.toMicros(365) /* 1971-01-01 */, "foo")
.build();

// sanity check data
assertThat(span.timestamp).isNull();
assertThat(guessTimestamp(span)).isNull();
assertThat(span.timestamp()).isNull();

accept(span);

@@ -226,9 +212,9 @@ public void close() throws IOException {
);
}

void accept(Span... spans) throws Exception {
void accept(Span2... spans) throws Exception {
CallbackCaptor<Void> callback = new CallbackCaptor<>();
storage.asyncSpanConsumer().accept(asList(spans), callback);
storage.asyncSpan2Consumer().accept(asList(spans), callback);
callback.get();
}
}
@@ -80,7 +80,7 @@ public class ElasticsearchHttpStorageTest {
es.enqueue(new MockResponse()); // get dependency template

// check this isn't the legacy consumer
assertThat(storage.asyncSpanConsumer())
assertThat(storage.asyncSpan2Consumer())
.isInstanceOf(ElasticsearchHttpSpanConsumer.class);
// check this isn't the double reading span store
assertThat(storage.asyncSpanStore())
@@ -101,9 +101,6 @@ public class ElasticsearchHttpStorageTest {
es.enqueue(new MockResponse()); // get span template
es.enqueue(new MockResponse()); // get dependency template

// check this isn't the legacy consumer
assertThat(storage.asyncSpanConsumer())
.isInstanceOf(ElasticsearchHttpSpanConsumer.class);
// check that we do double-reads on the legacy and new format
assertThat(storage.asyncSpanStore())
.isInstanceOf(LenientDoubleCallbackAsyncSpanStore.class);
@@ -124,9 +121,6 @@ public class ElasticsearchHttpStorageTest {
es.enqueue(new MockResponse()); // get span template
es.enqueue(new MockResponse()); // get dependency template

// check this isn't the legacy consumer
assertThat(storage.asyncSpanConsumer())
.isInstanceOf(ElasticsearchHttpSpanConsumer.class);
// check this isn't the double reading span store
assertThat(storage.asyncSpanStore())
.isInstanceOf(ElasticsearchHttpSpanStore.class);