diff --git a/zipkin-collector/kafka/src/main/java/zipkin/collector/kafka/KafkaStreamProcessor.java b/zipkin-collector/kafka/src/main/java/zipkin/collector/kafka/KafkaStreamProcessor.java index da597ab3b76..cd768bddb2b 100644 --- a/zipkin-collector/kafka/src/main/java/zipkin/collector/kafka/KafkaStreamProcessor.java +++ b/zipkin-collector/kafka/src/main/java/zipkin/collector/kafka/KafkaStreamProcessor.java @@ -1,5 +1,5 @@ /** - * Copyright 2015-2016 The OpenZipkin Authors + * Copyright 2015-2017 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -13,10 +13,8 @@ */ package zipkin.collector.kafka; -import java.util.Collections; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; -import zipkin.Codec; import zipkin.collector.Collector; import zipkin.collector.CollectorMetrics; @@ -47,24 +45,7 @@ public void run() { continue; } - // In TBinaryProtocol encoding, the first byte is the TType, in a range 0-16 - // .. If the first byte isn't in that range, it isn't a thrift. - // - // When byte(0) == '[' (91), assume it is a list of json-encoded spans - // - // When byte(0) <= 16, assume it is a TBinaryProtocol-encoded thrift - // .. When serializing a Span (Struct), the first byte will be the type of a field - // .. When serializing a List[ThriftSpan], the first byte is the member type, TType.STRUCT(12) - // .. As ThriftSpan has no STRUCT fields: so, if the first byte is TType.STRUCT(12), it is a list. - if (bytes[0] == '[') { - collector.acceptSpans(bytes, Codec.JSON, NOOP); - } else { - if (bytes[0] == 12 /* TType.STRUCT */) { - collector.acceptSpans(bytes, Codec.THRIFT, NOOP); - } else { - collector.acceptSpans(Collections.singletonList(bytes), Codec.THRIFT, NOOP); - } - } + collector.acceptSpans(bytes, NOOP); } } } diff --git a/zipkin-collector/kafka/src/test/java/zipkin/collector/kafka/KafkaCollectorTest.java b/zipkin-collector/kafka/src/test/java/zipkin/collector/kafka/KafkaCollectorTest.java index 74fb9cec96f..92d84013837 100644 --- a/zipkin-collector/kafka/src/test/java/zipkin/collector/kafka/KafkaCollectorTest.java +++ b/zipkin-collector/kafka/src/test/java/zipkin/collector/kafka/KafkaCollectorTest.java @@ -1,5 +1,5 @@ /** - * Copyright 2015-2016 The OpenZipkin Authors + * Copyright 2015-2017 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -13,6 +13,7 @@ */ package zipkin.collector.kafka; +import java.util.Arrays; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; @@ -29,12 +30,16 @@ import zipkin.TestObjects; import zipkin.collector.InMemoryCollectorMetrics; import zipkin.collector.kafka.KafkaCollector.Builder; +import zipkin.internal.ApplyTimestampAndDuration; +import zipkin.internal.Span2Codec; +import zipkin.internal.Span2Converter; import zipkin.storage.AsyncSpanConsumer; import zipkin.storage.AsyncSpanStore; import zipkin.storage.SpanStore; import zipkin.storage.StorageComponent; import static org.assertj.core.api.Assertions.assertThat; +import static zipkin.TestObjects.LOTS_OF_SPANS; import static zipkin.TestObjects.TRACE; public class KafkaCollectorTest { @@ -130,6 +135,32 @@ public void messageWithMultipleSpans_json() throws Exception { assertThat(kafkaMetrics.spans()).isEqualTo(TestObjects.TRACE.size()); } + /** Ensures list encoding works: a version 2 json encoded list of spans */ + @Test + public void messageWithMultipleSpans_json2() throws Exception { + Builder builder = builder("multiple_spans_json2"); + + List spans = Arrays.asList( + ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[0]), + ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[1]) + ); + + byte[] bytes = Span2Codec.JSON.writeSpans(Arrays.asList( + Span2Converter.fromSpan(spans.get(0)).get(0), + Span2Converter.fromSpan(spans.get(1)).get(0) + )); + + producer.send(new KeyedMessage<>(builder.topic, bytes)); + + try (KafkaCollector collector = newKafkaTransport(builder, consumer)) { + assertThat(recvdSpans.take()).containsAll(spans); + } + + assertThat(kafkaMetrics.messages()).isEqualTo(1); + assertThat(kafkaMetrics.bytes()).isEqualTo(bytes.length); + assertThat(kafkaMetrics.spans()).isEqualTo(spans.size()); + } + /** Ensures malformed spans don't hang the collector */ @Test public void skipsMalformedData() throws Exception { diff --git a/zipkin-collector/kafka10/src/main/java/zipkin/collector/kafka10/KafkaCollectorWorker.java b/zipkin-collector/kafka10/src/main/java/zipkin/collector/kafka10/KafkaCollectorWorker.java index 1dd673a8819..f6f849a0cd7 100644 --- a/zipkin-collector/kafka10/src/main/java/zipkin/collector/kafka10/KafkaCollectorWorker.java +++ b/zipkin-collector/kafka10/src/main/java/zipkin/collector/kafka10/KafkaCollectorWorker.java @@ -27,7 +27,6 @@ import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import zipkin.Codec; import zipkin.collector.Collector; import zipkin.collector.CollectorMetrics; @@ -74,24 +73,7 @@ public void run() { if (bytes.length == 0) { metrics.incrementMessagesDropped(); } else { - // In TBinaryProtocol encoding, the first byte is the TType, in a range 0-16 - // .. If the first byte isn't in that range, it isn't a thrift. - // - // When byte(0) == '[' (91), assume it is a list of json-encoded spans - // - // When byte(0) <= 16, assume it is a TBinaryProtocol-encoded thrift - // .. When serializing a Span (Struct), the first byte will be the type of a field - // .. When serializing a List[ThriftSpan], the first byte is the member type, TType.STRUCT(12) - // .. As ThriftSpan has no STRUCT fields: so, if the first byte is TType.STRUCT(12), it is a list. - if (bytes[0] == '[') { - collector.acceptSpans(bytes, Codec.JSON, NOOP); - } else { - if (bytes[0] == 12 /* TType.STRUCT */) { - collector.acceptSpans(bytes, Codec.THRIFT, NOOP); - } else { - collector.acceptSpans(Collections.singletonList(bytes), Codec.THRIFT, NOOP); - } - } + collector.acceptSpans(bytes, NOOP); } } } diff --git a/zipkin-collector/kafka10/src/test/java/zipkin/collector/kafka10/KafkaCollectorTest.java b/zipkin-collector/kafka10/src/test/java/zipkin/collector/kafka10/KafkaCollectorTest.java index 1f88f29eb54..ac3d095b4a2 100644 --- a/zipkin-collector/kafka10/src/test/java/zipkin/collector/kafka10/KafkaCollectorTest.java +++ b/zipkin-collector/kafka10/src/test/java/zipkin/collector/kafka10/KafkaCollectorTest.java @@ -15,6 +15,7 @@ import com.github.charithe.kafka.EphemeralKafkaBroker; import com.github.charithe.kafka.KafkaJunitRule; +import java.util.Arrays; import java.util.List; import java.util.Properties; import java.util.concurrent.CopyOnWriteArraySet; @@ -36,12 +37,16 @@ import zipkin.Span; import zipkin.collector.InMemoryCollectorMetrics; import zipkin.collector.kafka10.KafkaCollector.Builder; +import zipkin.internal.ApplyTimestampAndDuration; +import zipkin.internal.Span2Codec; +import zipkin.internal.Span2Converter; import zipkin.storage.AsyncSpanConsumer; import zipkin.storage.AsyncSpanStore; import zipkin.storage.SpanStore; import zipkin.storage.StorageComponent; import static org.assertj.core.api.Assertions.assertThat; +import static zipkin.TestObjects.LOTS_OF_SPANS; import static zipkin.TestObjects.TRACE; public class KafkaCollectorTest { @@ -184,6 +189,33 @@ public void messageWithMultipleSpans_json() throws Exception { assertThat(kafkaMetrics.spans()).isEqualTo(TRACE.size()); } + /** Ensures list encoding works: a version 2 json encoded list of spans */ + @Test + public void messageWithMultipleSpans_json2() throws Exception { + Builder builder = builder("multiple_spans_json2"); + + List spans = Arrays.asList( + ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[0]), + ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[1]) + ); + + byte[] bytes = Span2Codec.JSON.writeSpans(Arrays.asList( + Span2Converter.fromSpan(spans.get(0)).get(0), + Span2Converter.fromSpan(spans.get(1)).get(0) + )); + + produceSpans(bytes, builder.topic); + + try (KafkaCollector collector = builder.build()) { + collector.start(); + assertThat(receivedSpans.take()).containsAll(spans); + } + + assertThat(kafkaMetrics.messages()).isEqualTo(1); + assertThat(kafkaMetrics.bytes()).isEqualTo(bytes.length); + assertThat(kafkaMetrics.spans()).isEqualTo(spans.size()); + } + /** Ensures malformed spans don't hang the collector */ @Test public void skipsMalformedData() throws Exception { diff --git a/zipkin-junit/src/main/java/zipkin/junit/ZipkinDispatcher.java b/zipkin-junit/src/main/java/zipkin/junit/ZipkinDispatcher.java index ee643bdbe18..37a3cf1fe8d 100644 --- a/zipkin-junit/src/main/java/zipkin/junit/ZipkinDispatcher.java +++ b/zipkin-junit/src/main/java/zipkin/junit/ZipkinDispatcher.java @@ -1,5 +1,5 @@ /** - * Copyright 2015-2016 The OpenZipkin Authors + * Copyright 2015-2017 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -25,8 +25,10 @@ import zipkin.Codec; import zipkin.DependencyLink; import zipkin.Span; +import zipkin.SpanDecoder; import zipkin.collector.Collector; import zipkin.collector.CollectorMetrics; +import zipkin.internal.Span2JsonDecoder; import zipkin.storage.Callback; import zipkin.storage.QueryRequest; import zipkin.storage.SpanStore; @@ -35,6 +37,8 @@ import static zipkin.internal.Util.lowerHexToUnsignedLong; final class ZipkinDispatcher extends Dispatcher { + static final SpanDecoder JSON2_DECODER = new Span2JsonDecoder(); + private final SpanStore store; private final Collector consumer; private final CollectorMetrics metrics; @@ -77,35 +81,13 @@ public MockResponse dispatch(RecordedRequest request) { } } else if (request.getMethod().equals("POST")) { if (url.encodedPath().equals("/api/v1/spans")) { - metrics.incrementMessages(); - byte[] body = request.getBody().readByteArray(); - String encoding = request.getHeader("Content-Encoding"); - if (encoding != null && encoding.contains("gzip")) { - try { - Buffer result = new Buffer(); - GzipSource source = new GzipSource(new Buffer().write(body)); - while (source.read(result, Integer.MAX_VALUE) != -1) ; - body = result.readByteArray(); - } catch (IOException e) { - metrics.incrementMessagesDropped(); - return new MockResponse().setResponseCode(400).setBody("Cannot gunzip spans"); - } - } String type = request.getHeader("Content-Type"); - Codec codec = type != null && type.contains("/x-thrift") ? Codec.THRIFT : Codec.JSON; - - final MockResponse result = new MockResponse(); - consumer.acceptSpans(body, codec, new Callback() { - @Override public void onSuccess(Void value) { - result.setResponseCode(202); - } - - @Override public void onError(Throwable t) { - String message = t.getMessage(); - result.setBody(message).setResponseCode(message.startsWith("Cannot store") ? 500 : 400); - } - }); - return result; + SpanDecoder decoder = type != null && type.contains("/x-thrift") + ? SpanDecoder.THRIFT_DECODER + : SpanDecoder.JSON_DECODER; + return acceptSpans(request, decoder); + } else if (url.encodedPath().equals("/api/v2/spans")) { + return acceptSpans(request, JSON2_DECODER); } } else { // unsupported method return new MockResponse().setResponseCode(405); @@ -113,6 +95,36 @@ public MockResponse dispatch(RecordedRequest request) { return new MockResponse().setResponseCode(404); } + MockResponse acceptSpans(RecordedRequest request, SpanDecoder decoder) { + metrics.incrementMessages(); + byte[] body = request.getBody().readByteArray(); + String encoding = request.getHeader("Content-Encoding"); + if (encoding != null && encoding.contains("gzip")) { + try { + Buffer result = new Buffer(); + GzipSource source = new GzipSource(new Buffer().write(body)); + while (source.read(result, Integer.MAX_VALUE) != -1) ; + body = result.readByteArray(); + } catch (IOException e) { + metrics.incrementMessagesDropped(); + return new MockResponse().setResponseCode(400).setBody("Cannot gunzip spans"); + } + } + + final MockResponse result = new MockResponse(); + consumer.acceptSpans(body, decoder, new Callback() { + @Override public void onSuccess(Void value) { + result.setResponseCode(202); + } + + @Override public void onError(Throwable t) { + String message = t.getMessage(); + result.setBody(message).setResponseCode(message.startsWith("Cannot store") ? 500 : 400); + } + }); + return result; + } + static QueryRequest toQueryRequest(HttpUrl url) { return QueryRequest.builder().serviceName(url.queryParameter("serviceName")) .spanName(url.queryParameter("spanName")) diff --git a/zipkin-junit/src/test/java/zipkin/junit/ZipkinRuleTest.java b/zipkin-junit/src/test/java/zipkin/junit/ZipkinRuleTest.java index bbebf7e586b..2d04b0918c8 100644 --- a/zipkin-junit/src/test/java/zipkin/junit/ZipkinRuleTest.java +++ b/zipkin-junit/src/test/java/zipkin/junit/ZipkinRuleTest.java @@ -30,6 +30,9 @@ import zipkin.Annotation; import zipkin.Codec; import zipkin.Span; +import zipkin.internal.ApplyTimestampAndDuration; +import zipkin.internal.Span2Codec; +import zipkin.internal.Span2Converter; import static java.lang.String.format; import static java.util.Arrays.asList; @@ -58,6 +61,30 @@ public void getTraces_storedViaPost() throws IOException { .containsOnly(trace); } + @Test + public void getTraces_storedViaPostVersion2() throws IOException { + List spans = Arrays.asList( + ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[0]), + ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[1]) + ); + + byte[] bytes = Span2Codec.JSON.writeSpans(Arrays.asList( + Span2Converter.fromSpan(spans.get(0)).get(0), + Span2Converter.fromSpan(spans.get(1)).get(0) + )); + + // write the span to the zipkin using http api v2 + Response response = client.newCall(new Request.Builder() + .url(zipkin.httpUrl() + "/api/v2/spans") + .post(RequestBody.create(MediaType.parse("application/json"), bytes)).build() + ).execute(); + assertThat(response.code()).isEqualTo(202); + + // read the traces directly + assertThat(zipkin.getTraces()) + .containsOnly(asList(spans.get(0)), asList(spans.get(1))); + } + /** The rule is here to help debugging. Even partial spans should be returned */ @Test public void getTraces_whenMissingTimestamps() throws IOException { diff --git a/zipkin-server/src/main/java/zipkin/server/ZipkinHttpCollector.java b/zipkin-server/src/main/java/zipkin/server/ZipkinHttpCollector.java index 57040aebedb..82a24aec248 100644 --- a/zipkin-server/src/main/java/zipkin/server/ZipkinHttpCollector.java +++ b/zipkin-server/src/main/java/zipkin/server/ZipkinHttpCollector.java @@ -27,11 +27,13 @@ import org.springframework.web.bind.annotation.RequestHeader; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; -import zipkin.Codec; +import zipkin.Span; +import zipkin.SpanDecoder; import zipkin.collector.Collector; import zipkin.collector.CollectorMetrics; import zipkin.collector.CollectorSampler; import zipkin.internal.Nullable; +import zipkin.internal.Span2JsonDecoder; import zipkin.storage.Callback; import zipkin.storage.StorageComponent; @@ -46,6 +48,7 @@ public class ZipkinHttpCollector { static final ResponseEntity SUCCESS = ResponseEntity.accepted().build(); static final String APPLICATION_THRIFT = "application/x-thrift"; + static final SpanDecoder JSON2_DECODER = new Span2JsonDecoder(); final CollectorMetrics metrics; final Collector collector; @@ -57,12 +60,20 @@ public class ZipkinHttpCollector { .storage(storage).sampler(sampler).metrics(this.metrics).build(); } + @RequestMapping(value = "/api/v2/spans", method = POST) + public ListenableFuture> uploadSpansJson2( + @RequestHeader(value = "Content-Encoding", required = false) String encoding, + @RequestBody byte[] body + ) { + return validateAndStoreSpans(encoding, JSON2_DECODER, body); + } + @RequestMapping(value = "/api/v1/spans", method = POST) public ListenableFuture> uploadSpansJson( @RequestHeader(value = "Content-Encoding", required = false) String encoding, @RequestBody byte[] body ) { - return validateAndStoreSpans(encoding, Codec.JSON, body); + return validateAndStoreSpans(encoding, SpanDecoder.JSON_DECODER, body); } @RequestMapping(value = "/api/v1/spans", method = POST, consumes = APPLICATION_THRIFT) @@ -70,10 +81,10 @@ public ListenableFuture> uploadSpansThrift( @RequestHeader(value = "Content-Encoding", required = false) String encoding, @RequestBody byte[] body ) { - return validateAndStoreSpans(encoding, Codec.THRIFT, body); + return validateAndStoreSpans(encoding, SpanDecoder.THRIFT_DECODER, body); } - ListenableFuture> validateAndStoreSpans(String encoding, Codec codec, + ListenableFuture> validateAndStoreSpans(String encoding, SpanDecoder decoder, byte[] body) { SettableListenableFuture> result = new SettableListenableFuture<>(); metrics.incrementMessages(); @@ -85,7 +96,7 @@ ListenableFuture> validateAndStoreSpans(String encoding, Codec result.set(ResponseEntity.badRequest().body("Cannot gunzip spans: " + e.getMessage() + "\n")); } } - collector.acceptSpans(body, codec, new Callback() { + collector.acceptSpans(body, decoder, new Callback() { @Override public void onSuccess(@Nullable Void value) { result.set(SUCCESS); } diff --git a/zipkin-server/src/test/java/zipkin/server/ZipkinServerIntegrationTest.java b/zipkin-server/src/test/java/zipkin/server/ZipkinServerIntegrationTest.java index 34012cbb7df..4532d95a709 100644 --- a/zipkin-server/src/test/java/zipkin/server/ZipkinServerIntegrationTest.java +++ b/zipkin-server/src/test/java/zipkin/server/ZipkinServerIntegrationTest.java @@ -13,6 +13,7 @@ */ package zipkin.server; +import java.util.Collections; import okio.Buffer; import okio.GzipSink; import org.junit.Before; @@ -31,6 +32,9 @@ import org.springframework.web.context.ConfigurableWebApplicationContext; import zipkin.Codec; import zipkin.Span; +import zipkin.internal.ApplyTimestampAndDuration; +import zipkin.internal.Span2Codec; +import zipkin.internal.Span2Converter; import zipkin.storage.InMemoryStorage; import static java.lang.String.format; @@ -43,6 +47,7 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.header; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; +import static zipkin.TestObjects.LOTS_OF_SPANS; import static zipkin.TestObjects.TRACE; import static zipkin.TestObjects.span; import static zipkin.internal.Util.UTF_8; @@ -76,6 +81,26 @@ public void writeSpans_noContentTypeIsJson() throws Exception { .andExpect(status().isAccepted()); } + @Test + public void writeSpans_version2() throws Exception { + Span span = ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[0]); + + byte[] bytes = Span2Codec.JSON.writeSpans(Collections.singletonList( + Span2Converter.fromSpan(span).get(0) + )); + + performAsync(post("/api/v2/spans").content(bytes)) + .andExpect(status().isAccepted()); + + // sleep as the the storage operation is async + Thread.sleep(1500); + + // We read it back in span v1 format + mockMvc.perform(get(format("/api/v1/trace/" + span.traceIdString()))) + .andExpect(status().isOk()) + .andExpect(content().string(new String(Codec.JSON.writeSpans(asList(span)), UTF_8))); + } + @Test public void writeSpans_updatesMetrics() throws Exception { byte[] body = Codec.JSON.writeSpans(TRACE); diff --git a/zipkin/src/main/java/zipkin/Codec.java b/zipkin/src/main/java/zipkin/Codec.java index e5f063a5f80..160a3ef7acc 100644 --- a/zipkin/src/main/java/zipkin/Codec.java +++ b/zipkin/src/main/java/zipkin/Codec.java @@ -1,5 +1,5 @@ /** - * Copyright 2015-2016 The OpenZipkin Authors + * Copyright 2015-2017 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -20,21 +20,15 @@ /** * Methods make an attempt to perform codec operations, failing to null. */ -public interface Codec { +public interface Codec extends SpanDecoder { JsonCodec JSON = new JsonCodec(); ThriftCodec THRIFT = new ThriftCodec(); - /** throws {@linkplain IllegalArgumentException} if the span couldn't be decoded */ - Span readSpan(byte[] bytes); - int sizeInBytes(Span value); byte[] writeSpan(Span value); - /** throws {@linkplain IllegalArgumentException} if the spans couldn't be decoded */ - List readSpans(byte[] bytes); - byte[] writeSpans(List value); byte[] writeTraces(List> value); diff --git a/zipkin/src/main/java/zipkin/SpanDecoder.java b/zipkin/src/main/java/zipkin/SpanDecoder.java new file mode 100644 index 00000000000..79b39907890 --- /dev/null +++ b/zipkin/src/main/java/zipkin/SpanDecoder.java @@ -0,0 +1,31 @@ +/** + * Copyright 2015-2017 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package zipkin; + +import java.util.List; +import zipkin.internal.JsonCodec; +import zipkin.internal.ThriftCodec; + +/** Decodes spans from serialized bytes. */ +public interface SpanDecoder { + SpanDecoder JSON_DECODER = new JsonCodec(); + SpanDecoder THRIFT_DECODER = new ThriftCodec(); + + /** throws {@linkplain IllegalArgumentException} if a span couldn't be decoded */ + Span readSpan(byte[] span); + + /** throws {@linkplain IllegalArgumentException} if the spans couldn't be decoded */ + List readSpans(byte[] span); +} diff --git a/zipkin/src/main/java/zipkin/collector/Collector.java b/zipkin/src/main/java/zipkin/collector/Collector.java index 873ab86736c..51a5f3630da 100644 --- a/zipkin/src/main/java/zipkin/collector/Collector.java +++ b/zipkin/src/main/java/zipkin/collector/Collector.java @@ -1,5 +1,5 @@ /** - * Copyright 2015-2016 The OpenZipkin Authors + * Copyright 2015-2017 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -14,11 +14,14 @@ package zipkin.collector; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.logging.Logger; -import zipkin.Codec; import zipkin.Span; +import zipkin.SpanDecoder; +import zipkin.internal.Span2JsonDecoder; +import zipkin.internal.Util; import zipkin.storage.Callback; import zipkin.storage.StorageComponent; @@ -86,11 +89,46 @@ public Collector build() { this.metrics = builder.metrics == null ? CollectorMetrics.NOOP_METRICS : builder.metrics; } - public void acceptSpans(byte[] serializedSpans, Codec codec, Callback callback) { + /** zipkin v2 will have this tag, and others won't. */ + static final byte[] LOCAL_ENDPOINT_TAG = "\"localEndpoint\"".getBytes(Util.UTF_8); + static final SpanDecoder JSON2_DECODER = new Span2JsonDecoder(); + + public void acceptSpans(byte[] bytes, Callback callback) { + // In TBinaryProtocol encoding, the first byte is the TType, in a range 0-16 + // .. If the first byte isn't in that range, it isn't a thrift. + // + // When byte(0) == '[' (91), assume it is a list of json-encoded spans + // + // When byte(0) <= 16, assume it is a TBinaryProtocol-encoded thrift + // .. When serializing a Span (Struct), the first byte will be the type of a field + // .. When serializing a List[ThriftSpan], the first byte is the member type, TType.STRUCT(12) + // .. As ThriftSpan has no STRUCT fields: so, if the first byte is TType.STRUCT(12), it is a list. + if (bytes[0] == '[') { + bytes: // searches for a substring matching zipkin v2 format. otherwise assume it isn't. + for (int i = 0; i < bytes.length - LOCAL_ENDPOINT_TAG.length + 1; i++) { + for (int j = 0; j < LOCAL_ENDPOINT_TAG.length; j++) { + if (bytes[i + j] != LOCAL_ENDPOINT_TAG[j]) { + continue bytes; + } + } + acceptSpans(bytes, JSON2_DECODER, callback); + return; + } + acceptSpans(bytes, SpanDecoder.JSON_DECODER, callback); + } else { + if (bytes[0] == 12 /* TType.STRUCT */) { + acceptSpans(bytes, SpanDecoder.THRIFT_DECODER, callback); + } else { + acceptSpans(Collections.singletonList(bytes), SpanDecoder.THRIFT_DECODER, callback); + } + } + } + + public void acceptSpans(byte[] serializedSpans, SpanDecoder decoder, Callback callback) { metrics.incrementBytes(serializedSpans.length); List spans; try { - spans = codec.readSpans(serializedSpans); + spans = decoder.readSpans(serializedSpans); } catch (RuntimeException e) { callback.onError(errorReading(e)); return; @@ -98,13 +136,14 @@ public void acceptSpans(byte[] serializedSpans, Codec codec, Callback call accept(spans, callback); } - public void acceptSpans(List serializedSpans, Codec codec, Callback callback) { + public void acceptSpans(List serializedSpans, SpanDecoder decoder, + Callback callback) { List spans = new ArrayList<>(serializedSpans.size()); try { int bytesRead = 0; for (byte[] serializedSpan : serializedSpans) { bytesRead += serializedSpan.length; - spans.add(codec.readSpan(serializedSpan)); + spans.add(decoder.readSpan(serializedSpan)); } metrics.incrementBytes(bytesRead); } catch (RuntimeException e) { diff --git a/zipkin/src/main/java/zipkin/internal/Span2JsonDecoder.java b/zipkin/src/main/java/zipkin/internal/Span2JsonDecoder.java new file mode 100644 index 00000000000..4241e085e4c --- /dev/null +++ b/zipkin/src/main/java/zipkin/internal/Span2JsonDecoder.java @@ -0,0 +1,38 @@ +/** + * Copyright 2015-2017 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.internal; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import zipkin.Span; +import zipkin.SpanDecoder; + +/** Decodes a span from zipkin v2 encoding */ +public final class Span2JsonDecoder implements SpanDecoder { + @Override public Span readSpan(byte[] span) { + return Span2Converter.toSpan(Span2Codec.JSON.readSpan(span)); + } + + @Override public List readSpans(byte[] span) { + List span2s = Span2Codec.JSON.readSpans(span); + if (span2s.isEmpty()) return Collections.emptyList(); + int length = span2s.size(); + List result = new ArrayList<>(length); + for (int i = 0; i < length; i++) { + result.add(Span2Converter.toSpan(span2s.get(i))); + } + return result; + } +} diff --git a/zipkin/src/test/java/zipkin/collector/CollectorTest.java b/zipkin/src/test/java/zipkin/collector/CollectorTest.java index 1e538fd528d..fcbd3fec584 100644 --- a/zipkin/src/test/java/zipkin/collector/CollectorTest.java +++ b/zipkin/src/test/java/zipkin/collector/CollectorTest.java @@ -1,5 +1,5 @@ /** - * Copyright 2015-2016 The OpenZipkin Authors + * Copyright 2015-2017 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -14,18 +14,25 @@ package zipkin.collector; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; import org.junit.Test; +import zipkin.Codec; +import zipkin.Span; +import zipkin.internal.ApplyTimestampAndDuration; +import zipkin.internal.Span2Codec; +import zipkin.internal.Span2Converter; import zipkin.storage.Callback; import zipkin.storage.InMemoryStorage; -import zipkin.Span; +import zipkin.storage.QueryRequest; import static java.util.Arrays.asList; import static org.assertj.core.api.Assertions.assertThat; -import static zipkin.storage.Callback.NOOP; +import static zipkin.TestObjects.LOTS_OF_SPANS; import static zipkin.TestObjects.span; +import static zipkin.storage.Callback.NOOP; public class CollectorTest { List messages = new ArrayList<>(); @@ -38,14 +45,13 @@ public void log(Level level, String msg, Throwable thrown) { } }).storage(new InMemoryStorage()).build(); - Span span1 = Span.builder().traceId(1L).id(1L).name("foo").build(); - Span span2 = Span.builder().traceId(1L).parentId(1L).id(2L).name("bar").build(); + Span span1 = ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[0]); + Span span2 = ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[1]); @Test public void acceptSpansCallback_toStringIncludesSpanIds() { assertThat(collector.acceptSpansCallback(asList(span1, span2))) - .hasToString( - "AcceptSpans([0000000000000001.0000000000000001<:0000000000000001, 0000000000000001.0000000000000002<:0000000000000001])"); + .hasToString("AcceptSpans([" + span1.idString() + ", " + span2.idString() + "])"); } @Test @@ -54,8 +60,7 @@ public void acceptSpansCallback_onErrorWithNullMessage() { callback.onError(new RuntimeException()); assertThat(messages) - .containsExactly( - "Cannot store spans [0000000000000001.0000000000000001<:0000000000000001] due to RuntimeException()"); + .containsExactly("Cannot store spans [" + span1.idString() + "] due to RuntimeException()"); } @Test @@ -64,8 +69,8 @@ public void acceptSpansCallback_onErrorWithMessage() { callback.onError(new IllegalArgumentException("no beer")); assertThat(messages) - .containsExactly( - "Cannot store spans [0000000000000001.0000000000000001<:0000000000000001] due to IllegalArgumentException(no beer)"); + .containsExactly( + "Cannot store spans [" + span1.idString() + "] due to IllegalArgumentException(no beer)"); } @Test @@ -75,8 +80,7 @@ public void errorAcceptingSpans_onErrorWithNullMessage() { assertThat(messages) .containsExactly(message) - .containsExactly( - "Cannot store spans [0000000000000001.0000000000000001<:0000000000000001] due to RuntimeException()"); + .containsExactly("Cannot store spans [" + span1.idString() + "] due to RuntimeException()"); } @Test @@ -87,8 +91,8 @@ public void errorAcceptingSpans_onErrorWithMessage() { assertThat(messages) .containsExactly(message) - .containsExactly( - "Cannot store spans [0000000000000001.0000000000000001<:0000000000000001] due to IllegalArgumentException(no beer)"); + .containsExactly( + "Cannot store spans [" + span1.idString() + "] due to IllegalArgumentException(no beer)"); } @Test @@ -121,10 +125,39 @@ public void errorDecoding_doesntWrapMalformedException() { } @Test - public void debugFlagWins() { - collector.accept(asList(span(Long.MIN_VALUE).toBuilder().debug(true).build()), NOOP); + public void acceptSpans_detectsThrift() { + collector.acceptSpans(Codec.THRIFT.writeSpan(span1), NOOP); - assertThat(collector.storage.spanStore().getServiceNames()).containsExactly("service"); + assertThat(collector.storage.spanStore().getTraces(QueryRequest.builder().build())) + .hasSize(1); + } + + @Test + public void acceptSpans_detectsThriftList() { + collector.acceptSpans(Codec.THRIFT.writeSpans(asList(span1, span2)), NOOP); + + assertThat(collector.storage.spanStore().getTraces(QueryRequest.builder().build())) + .hasSize(2); + } + + @Test + public void acceptSpans_detectsJsonList() { + collector.acceptSpans(Codec.JSON.writeSpans(asList(span1, span2)), NOOP); + + assertThat(collector.storage.spanStore().getTraces(QueryRequest.builder().build())) + .hasSize(2); + } + + @Test + public void acceptSpans_detectsJson2List() { + byte[] bytes = Span2Codec.JSON.writeSpans(Arrays.asList( + Span2Converter.fromSpan(span1).get(0), + Span2Converter.fromSpan(span2).get(0) + )); + collector.acceptSpans(bytes, NOOP); + + assertThat(collector.storage.spanStore().getTraces(QueryRequest.builder().build())) + .hasSize(2); } @Test @@ -137,4 +170,11 @@ public void unsampledSpansArentStored() { assertThat(collector.storage.spanStore().getServiceNames()).isEmpty(); } + + @Test + public void debugFlagWins() { + collector.accept(asList(span(Long.MIN_VALUE).toBuilder().debug(true).build()), NOOP); + + assertThat(collector.storage.spanStore().getServiceNames()).containsExactly("service"); + } } diff --git a/zipkin/src/test/java/zipkin/internal/Span2JsonDecoderTest.java b/zipkin/src/test/java/zipkin/internal/Span2JsonDecoderTest.java new file mode 100644 index 00000000000..26ee8a72c48 --- /dev/null +++ b/zipkin/src/test/java/zipkin/internal/Span2JsonDecoderTest.java @@ -0,0 +1,41 @@ +/** + * Copyright 2015-2017 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin.internal; + +import org.junit.Test; +import zipkin.Span; +import zipkin.SpanDecoder; + +import static java.util.Arrays.asList; +import static org.assertj.core.api.Assertions.assertThat; +import static zipkin.TestObjects.LOTS_OF_SPANS; + +public class Span2JsonDecoderTest { + Span span1 = ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[0]); + Span span2 = ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[1]); + Span2 span2_1 = Span2Converter.fromSpan(span1).get(0); + Span2 span2_2 = Span2Converter.fromSpan(span2).get(0); + + SpanDecoder decoder = new Span2JsonDecoder(); + + @Test public void readSpan() { + assertThat(decoder.readSpan(Span2Codec.JSON.writeSpan(span2_1))) + .isEqualTo(span1); + } + + @Test public void readSpans() { + assertThat(decoder.readSpans(Span2Codec.JSON.writeSpans(asList(span2_1, span2_2)))) + .containsExactly(span1, span2); + } +}