Skip to content

Commit

Permalink
Teaches the v2 tree how to encode v1 format (openzipkin#1728)
Browse files Browse the repository at this point in the history
The zipkin v2 library is intended to be used by itself in tracers. Since
tracers cannot control the span transport, it likely will need to write
the old (historical) json format for at least months. To avoid requiring
dual versions of jars to accomplish this, we add an encoder that writes
json v1 format given a v2 span.

Note: we don't decode as the library problem is easier to manage on the
server side. For example, zipkin-server will need both versions of the
jars for things like collection for a while.
  • Loading branch information
adriancole authored and abesto committed Sep 10, 2019
1 parent efccda5 commit ff86e6e
Show file tree
Hide file tree
Showing 31 changed files with 947 additions and 308 deletions.
37 changes: 24 additions & 13 deletions benchmarks/src/main/java/zipkin/benchmarks/CodecBenchmarks.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import zipkin.Endpoint;
import zipkin.internal.v2.Span;
import zipkin.internal.v2.codec.SpanBytesCodec;
import zipkin.internal.v2.codec.SpanBytesEncoder;

/**
* This compares the speed of the bundled java codec with the approach used in the scala
Expand Down Expand Up @@ -155,29 +156,39 @@ public byte[] writeClientSpan_thrift_libthrift() throws TException {
return serialize(clientSpanLibThrift);
}

static final byte[] span2Json = read("/span2.json");
static final Span span2 = SpanBytesCodec.JSON.decode(span2Json);
static final List<Span> tenSpan2s = Collections.nCopies(10, span2);
static final byte[] tenSpan2sJson = SpanBytesCodec.JSON.encodeList(tenSpan2s);
static final byte[] zipkin2Json = read("/zipkin2-client.json");
static final Span zipkin2 = SpanBytesCodec.JSON_V2.decode(zipkin2Json);
static final List<Span> tenSpan2s = Collections.nCopies(10, zipkin2);
static final byte[] tenSpan2sJson = SpanBytesCodec.JSON_V2.encodeList(tenSpan2s);

@Benchmark
public Span readClientSpan_json_span2() {
return SpanBytesCodec.JSON.decode(span2Json);
public Span readClientSpan_json_zipkin2() {
return SpanBytesCodec.JSON_V2.decode(zipkin2Json);
}

@Benchmark
public List<Span> readTenClientSpans_json_span2() {
return SpanBytesCodec.JSON.decodeList(tenSpan2sJson);
public List<Span> readTenClientSpans_json_zipkin2() {
return SpanBytesCodec.JSON_V2.decodeList(tenSpan2sJson);
}

@Benchmark
public byte[] writeClientSpan_json_span2() {
return SpanBytesCodec.JSON.encode(span2);
public byte[] writeClientSpan_json_zipkin2() {
return SpanBytesEncoder.JSON_V2.encode(zipkin2);
}

@Benchmark
public byte[] writeTenClientSpans_json_span2() {
return SpanBytesCodec.JSON.encodeList(tenSpan2s);
public byte[] writeTenClientSpans_json_zipkin2() {
return SpanBytesEncoder.JSON_V2.encodeList(tenSpan2s);
}

@Benchmark
public byte[] writeClientSpan_json_zipkin2_legacy() {
return SpanBytesEncoder.JSON_V1.encode(zipkin2);
}

@Benchmark
public byte[] writeTenClientSpans_json_zipkin2_legacy() {
return SpanBytesEncoder.JSON_V1.encodeList(tenSpan2s);
}

static final byte[] rpcSpanJson = read("/span-rpc.json");
Expand Down Expand Up @@ -253,7 +264,7 @@ public byte[] writeRpcV6Span_thrift_libthrift() throws TException {
// Convenience main entry-point
public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include(".*" + CodecBenchmarks.class.getSimpleName() + ".*ClientSpan.*")
.include(".*" + CodecBenchmarks.class.getSimpleName() + ".*")
.build();

new Runner(opt).run();
Expand Down
4 changes: 2 additions & 2 deletions benchmarks/src/main/resources/span-client.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@
},
{
"timestamp": 1472470996238000,
"value": "ws",
"value": "foo",
"endpoint": {
"serviceName": "frontend",
"ipv4": "127.0.0.1"
}
},
{
"timestamp": 1472470996403000,
"value": "wr",
"value": "bar",
"endpoint": {
"serviceName": "frontend",
"ipv4": "127.0.0.1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
"annotations": [
{
"timestamp": 1472470996238000,
"value": "ws"
"value": "foo"
},
{
"timestamp": 1472470996403000,
"value": "wr"
"value": "bar"
}
],
"tags": {
Expand Down
2 changes: 1 addition & 1 deletion zipkin-collector/kafka/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,4 @@ for (int i = 0; i < count; i++) {

### Legacy encoding
Older versions of zipkin accepted a single span per message, as opposed
to a list per message. This practice is deprecated, but still supported.
to a list per message. This practice is deprecated, but still supported.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import zipkin.collector.kafka.KafkaCollector.Builder;
import zipkin.internal.ApplyTimestampAndDuration;
import zipkin.internal.V2SpanConverter;
import zipkin.internal.v2.codec.SpanBytesCodec;
import zipkin.internal.v2.codec.SpanBytesEncoder;
import zipkin.storage.AsyncSpanConsumer;
import zipkin.storage.AsyncSpanStore;
import zipkin.storage.SpanStore;
Expand Down Expand Up @@ -146,7 +146,7 @@ public void messageWithMultipleSpans_json2() throws Exception {
ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[1])
);

byte[] message = SpanBytesCodec.JSON.encodeList(asList(
byte[] message = SpanBytesEncoder.JSON_V2.encodeList(asList(
V2SpanConverter.fromSpan(spans.get(0)).get(0),
V2SpanConverter.fromSpan(spans.get(1)).get(0)
));
Expand Down
8 changes: 4 additions & 4 deletions zipkin-collector/kafka10/README.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
# collector-kafka10

## KafkaCollector
This collector is implemented as a Kafka consumer supporting Kafka brokers running
This collector is implemented as a Kafka consumer supporting Kafka brokers running
version 0.10.0.0 or later. It polls a Kafka topic for messages that contain
a list of spans in json or TBinaryProtocol big-endian encoding. These
spans are pushed to a span consumer.

For information about running this collector as a module in Zipkin server, see
For information about running this collector as a module in Zipkin server, see
[zipkin-autoconfigure/collector-kafka10](../../zipkin-autoconfigure/collector-kafka10/).

When using this collector as a library outside of Zipkin server,
When using this collector as a library outside of Zipkin server,
[zipkin.collector.kafka10.KafkaCollector.Builder](src/main/java/zipkin/collector/kafka10/KafkaCollector.java)
includes defaults that will operate against a Kafka topic name `zipkin`.

Expand Down Expand Up @@ -43,4 +43,4 @@ for (int i = 0; i < count; i++) {

### Legacy encoding
Older versions of zipkin accepted a single span per message, as opposed
to a list per message. This practice is deprecated, but still supported.
to a list per message. This practice is deprecated, but still supported.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import zipkin.collector.kafka10.KafkaCollector.Builder;
import zipkin.internal.ApplyTimestampAndDuration;
import zipkin.internal.V2SpanConverter;
import zipkin.internal.v2.codec.SpanBytesCodec;
import zipkin.internal.v2.codec.SpanBytesEncoder;
import zipkin.storage.AsyncSpanConsumer;
import zipkin.storage.AsyncSpanStore;
import zipkin.storage.SpanStore;
Expand Down Expand Up @@ -200,7 +200,7 @@ public void messageWithMultipleSpans_json2() throws Exception {
ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[1])
);

byte[] message = SpanBytesCodec.JSON.encodeList(asList(
byte[] message = SpanBytesEncoder.JSON_V2.encodeList(asList(
V2SpanConverter.fromSpan(spans.get(0)).get(0),
V2SpanConverter.fromSpan(spans.get(1)).get(0)
));
Expand Down
4 changes: 2 additions & 2 deletions zipkin-junit/src/main/java/zipkin/junit/ZipkinDispatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,11 @@ MockResponse queryV2(HttpUrl url) throws IOException {
return jsonResponse(DependencyLinkBytesCodec.JSON.encodeList(result));
} else if (url.encodedPath().equals("/api/v2/traces")) {
List<List<zipkin.internal.v2.Span>> traces = store2.getTraces(toQueryRequest2(url)).execute();
return jsonResponse(SpanBytesCodec.JSON.encodeNestedList(traces));
return jsonResponse(SpanBytesCodec.JSON_V2.encodeNestedList(traces));
} else if (url.encodedPath().startsWith("/api/v2/trace/")) {
String traceIdHex = url.encodedPath().replace("/api/v2/trace/", "");
List<zipkin.internal.v2.Span> trace = store2.getTrace(normalizeTraceId(traceIdHex)).execute();
if (!trace.isEmpty()) return jsonResponse(SpanBytesCodec.JSON.encodeList(trace));
if (!trace.isEmpty()) return jsonResponse(SpanBytesCodec.JSON_V2.encodeList(trace));
}
return new MockResponse().setResponseCode(404);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void getTraces_storedViaPostVersion2() throws IOException {
ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[1])
);

byte[] message = SpanBytesCodec.JSON.encodeList(asList(
byte[] message = SpanBytesCodec.JSON_V2.encodeList(asList(
V2SpanConverter.fromSpan(spans.get(0)).get(0),
V2SpanConverter.fromSpan(spans.get(1)).get(0)
));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ final class HttpV2SpanConsumer implements SpanConsumer {
}

@Override public zipkin.internal.v2.Call<Void> accept(List<Span> spans) {
byte[] json = SpanBytesCodec.JSON.encodeList(spans);
byte[] json = SpanBytesCodec.JSON_V2.encodeList(spans);
return factory.newCall(new Request.Builder()
.url(factory.baseUrl.resolve("/api/v2/spans"))
.post(RequestBody.create(MediaType.parse("application/json"), json)).build(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ final class HttpV2SpanStore implements SpanStore {
maybeAddQueryParam(url, "lookback", request.lookback());
maybeAddQueryParam(url, "limit", request.limit());
return factory.newCall(new Request.Builder().url(url.build()).build(),
content -> SpanBytesCodec.JSON.decodeNestedList(content.readByteArray()));
content -> SpanBytesCodec.JSON_V2.decodeNestedList(content.readByteArray()));
}

@Override public Call<List<Span>> getTrace(String traceId) {
return factory.newCall(new Request.Builder()
.url(factory.baseUrl.resolve("/api/v2/trace/" + Span.normalizeTraceId(traceId)))
.build(), content -> SpanBytesCodec.JSON.decodeList(content.readByteArray()))
.build(), content -> SpanBytesCodec.JSON_V2.decodeList(content.readByteArray()))
.handleError(((error, callback) -> {
if (error instanceof HttpException && ((HttpException) error).code == 404) {
callback.onSuccess(Collections.emptyList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public String getTraces(
.limit(limit).build();

List<List<Span>> traces = storage.v2SpanStore().getTraces(queryRequest).execute();
return new String(SpanBytesCodec.JSON.encodeNestedList(traces), UTF_8);
return new String(SpanBytesCodec.JSON_V2.encodeNestedList(traces), UTF_8);
}

@RequestMapping(value = "/trace/{traceIdHex}", method = RequestMethod.GET, produces = APPLICATION_JSON_VALUE)
Expand All @@ -137,7 +137,7 @@ public String getTrace(@PathVariable String traceIdHex, WebRequest request) thro

List<Span> trace = storage.v2SpanStore().getTrace(traceIdHex).execute();
if (trace.isEmpty()) throw new TraceNotFoundException(traceIdHex);
return new String(SpanBytesCodec.JSON.encodeList(trace), UTF_8);
return new String(SpanBytesCodec.JSON_V2.encodeList(trace), UTF_8);
}

@ExceptionHandler(Version2StorageNotConfigured.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import zipkin.internal.ApplyTimestampAndDuration;
import zipkin.internal.V2InMemoryStorage;
import zipkin.internal.V2SpanConverter;
import zipkin.internal.v2.codec.SpanBytesCodec;
import zipkin.internal.v2.codec.SpanBytesEncoder;

import static java.lang.String.format;
import static java.util.Arrays.asList;
Expand Down Expand Up @@ -93,7 +93,7 @@ public void writeSpans_noContentTypeIsJson() throws Exception {
public void writeSpans_version2() throws Exception {
Span span = ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[0]);

byte[] message = SpanBytesCodec.JSON.encodeList(asList(
byte[] message = SpanBytesEncoder.JSON_V2.encodeList(asList(
V2SpanConverter.fromSpan(span).get(0)
));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,9 @@ static byte[] prefixWithTimestampMillisAndQuery(Span span, @Nullable Long timest
if (LOG.isLoggable(Level.FINE)) {
LOG.log(Level.FINE, "Error indexing query for span: " + span, e);
}
return SpanBytesEncoder.JSON.encode(span);
return SpanBytesEncoder.JSON_V2.encode(span);
}
byte[] document = SpanBytesEncoder.JSON.encode(span);
byte[] document = SpanBytesEncoder.JSON_V2.encode(span);
if (query.rangeEquals(0L, ByteString.of(new byte[] {'{', '}'}))) {
return document;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public void close() throws IOException {
Span span = Span.newBuilder().traceId("20").id("20").name("get")
.timestamp(TODAY * 1000).build();

assertThat(SpanBytesCodec.JSON.decode(prefixWithTimestampMillisAndQuery(span, span.timestamp())))
assertThat(SpanBytesCodec.JSON_V2.decode(prefixWithTimestampMillisAndQuery(span, span.timestamp())))
.isEqualTo(span); // ignores timestamp_millis field
}

Expand All @@ -144,7 +144,7 @@ public void close() throws IOException {
accept(Span.newBuilder().traceId("1").id("1").name("foo").build());

assertThat(es.takeRequest().getBody().readByteString().utf8())
.contains("\n" + new String(SpanBytesCodec.JSON.encode(span), "UTF-8") + "\n");
.contains("\n" + new String(SpanBytesCodec.JSON_V2.encode(span), "UTF-8") + "\n");
}

@Test public void traceIsSearchableByServerServiceName() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public void span_roundTrip() throws IOException {
zipkin.Span span = ApplyTimestampAndDuration.apply(TestObjects.LOTS_OF_SPANS[0]);
Span span2 = V2SpanConverter.fromSpan(span).get(0);
Buffer bytes = new Buffer();
bytes.write(SpanBytesEncoder.JSON.encode(span2));
bytes.write(SpanBytesEncoder.JSON_V2.encode(span2));
assertThat(SPAN_ADAPTER.fromJson(bytes))
.isEqualTo(span2);
}
Expand All @@ -158,7 +158,7 @@ public void span_specialCharsInJson() throws IOException {
.build();

Buffer bytes = new Buffer();
bytes.write(SpanBytesEncoder.JSON.encode(worstSpanInTheWorld));
bytes.write(SpanBytesEncoder.JSON_V2.encode(worstSpanInTheWorld));
assertThat(SPAN_ADAPTER.fromJson(bytes))
.isEqualTo(worstSpanInTheWorld);
}
Expand Down
2 changes: 1 addition & 1 deletion zipkin/src/main/java/zipkin/collector/Collector.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public void acceptSpans(byte[] serializedSpans, SpanDecoder decoder, Callback<Vo
return;
}
if (storage2 != null && decoder instanceof V2JsonSpanDecoder) {
storage2.acceptSpans(serializedSpans, SpanBytesCodec.JSON, callback);
storage2.acceptSpans(serializedSpans, SpanBytesCodec.JSON_V2, callback);
} else {
super.acceptSpans(serializedSpans, decoder, callback);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public final class V2JsonSpanDecoder implements SpanDecoder {
}

@Override public List<zipkin.Span> readSpans(byte[] span) {
List<Span> span2s = SpanBytesCodec.JSON.decodeList(span);
List<Span> span2s = SpanBytesCodec.JSON_V2.decodeList(span);
if (span2s.isEmpty()) return Collections.emptyList();
int length = span2s.size();
List<zipkin.Span> result = new ArrayList<>(length);
Expand Down
2 changes: 1 addition & 1 deletion zipkin/src/main/java/zipkin/internal/v2/Span.java
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ public Span build() {
}

@Override public String toString() {
return new String(SpanBytesEncoder.JSON.encode(this), UTF_8);
return new String(SpanBytesEncoder.JSON_V2.encode(this), UTF_8);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,26 @@
import zipkin.internal.v2.Endpoint;
import zipkin.internal.v2.Span;
import zipkin.internal.v2.internal.JsonCodec;
import zipkin.internal.v2.internal.V2SpanWriter;

/** This is separate from {@link SpanBytesEncoder}, as it isn't needed for instrumentation */
public enum SpanBytesCodec implements BytesEncoder<Span>, BytesDecoder<Span> {
/** Corresponds to the Zipkin v2 json format */
JSON {
JSON_V2 {
@Override public Encoding encoding() {
return Encoding.JSON;
}

@Override public int sizeInBytes(Span input) {
return SpanBytesEncoder.JSON.sizeInBytes(input);
return SpanBytesEncoder.JSON_V2.sizeInBytes(input);
}

@Override public byte[] encode(Span input) {
return SpanBytesEncoder.JSON.encode(input);
return SpanBytesEncoder.JSON_V2.encode(input);
}

@Override public byte[] encodeList(List<Span> input) {
return SpanBytesEncoder.JSON.encodeList(input);
return SpanBytesEncoder.JSON_V2.encodeList(input);
}

@Override public Span decode(byte[] span) { // ex decode span in dependencies job
Expand All @@ -53,7 +54,7 @@ public enum SpanBytesCodec implements BytesEncoder<Span>, BytesDecoder<Span> {
}

@Override public byte[] encodeNestedList(List<List<Span>> traces) {
return JsonCodec.writeNestedList(SpanBytesEncoder.SPAN_WRITER, traces);
return JsonCodec.writeNestedList(new V2SpanWriter(), traces);
}

@Override public List<List<Span>> decodeNestedList(byte[] traces) { // ex getTraces
Expand Down
Loading

0 comments on commit ff86e6e

Please sign in to comment.