Skip to content

Commit

Permalink
Adds v2 StorageComponent and allows the server to use it directly
Browse files Browse the repository at this point in the history
This adds a type for the v2 StorageComponent so that future storage
implementations can be made without a compile dependency on the v1 jar.

This is internal until everything is verified.
  • Loading branch information
Adrian Cole committed Sep 10, 2017
1 parent f895b83 commit 0baa31f
Show file tree
Hide file tree
Showing 40 changed files with 543 additions and 261 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@
import zipkin.internal.V2SpanConverter;
import zipkin.internal.Util;

import static zipkin.internal.V2SpanConverter.toEndpoint;

@Measurement(iterations = 5, time = 1)
@Warmup(iterations = 10, time = 1)
@Fork(3)
Expand Down Expand Up @@ -97,8 +95,8 @@ public class Span2ConverterBenchmarks {
.name("get")
.kind(Span.Kind.SERVER)
.shared(true)
.localEndpoint(toEndpoint(backend))
.remoteEndpoint(toEndpoint(frontend))
.localEndpoint(V2SpanConverter.fromEndpoint(backend))
.remoteEndpoint(V2SpanConverter.fromEndpoint(frontend))
.timestamp(1472470996250000L)
.duration(100000L)
.putTag(TraceKeys.HTTP_PATH, "/backend")
Expand Down
10 changes: 6 additions & 4 deletions zipkin-junit/src/main/java/zipkin/junit/ZipkinDispatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import zipkin.internal.v2.codec.DependencyLinkBytesCodec;
import zipkin.internal.v2.codec.SpanBytesCodec;
import zipkin.internal.v2.internal.Platform;
import zipkin.internal.v2.storage.StorageComponent;
import zipkin.storage.Callback;
import zipkin.storage.QueryRequest;
import zipkin.storage.SpanStore;
Expand All @@ -51,10 +52,11 @@ final class ZipkinDispatcher extends Dispatcher {
private final CollectorMetrics metrics;
private final MockWebServer server;

ZipkinDispatcher(V2StorageComponent storage, CollectorMetrics metrics, MockWebServer server) {
this.store = storage.spanStore();
this.store2 = storage.v2SpanStore();
this.consumer = Collector.builder(getClass()).storage(storage).metrics(metrics).build();
ZipkinDispatcher(StorageComponent storage, CollectorMetrics metrics, MockWebServer server) {
V2StorageComponent adapted = V2StorageComponent.create(storage);
this.store = adapted.spanStore();
this.store2 = storage.spanStore();
this.consumer = Collector.builder(getClass()).storage(adapted).metrics(metrics).build();
this.metrics = metrics;
this.server = server;
}
Expand Down
16 changes: 9 additions & 7 deletions zipkin-junit/src/main/java/zipkin/junit/ZipkinRule.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@
import org.junit.runners.model.Statement;
import zipkin.Span;
import zipkin.collector.InMemoryCollectorMetrics;
import zipkin.internal.CallbackCaptor;
import zipkin.internal.GroupByTraceId;
import zipkin.internal.V2InMemoryStorage;
import zipkin.internal.V2SpanConverter;
import zipkin.internal.v2.internal.Platform;
import zipkin.internal.v2.storage.InMemoryStorage;

import static okhttp3.mockwebserver.SocketPolicy.KEEP_OPEN;
import static zipkin.internal.GroupByTraceId.TRACE_DESCENDING;
Expand All @@ -48,7 +48,7 @@
* See http://openzipkin.github.io/zipkin-api/#/
*/
public final class ZipkinRule implements TestRule {
private final V2InMemoryStorage storage = V2InMemoryStorage.newBuilder().build();
private final InMemoryStorage storage = InMemoryStorage.newBuilder().build();
private final InMemoryCollectorMetrics metrics = new InMemoryCollectorMetrics();
private final MockWebServer server = new MockWebServer();
private final BlockingQueue<MockResponse> failureQueue = new LinkedBlockingQueue<>();
Expand Down Expand Up @@ -114,9 +114,11 @@ public InMemoryCollectorMetrics collectorMetrics() {
* you'd add the parent here.
*/
public ZipkinRule storeSpans(List<Span> spans) {
CallbackCaptor<Void> callback = new CallbackCaptor<>();
storage.asyncSpanConsumer().accept(spans, callback);
callback.get();
try {
storage.accept(V2SpanConverter.fromSpans(spans)).execute();
} catch (IOException e) {
throw Platform.get().uncheckedIOException(e);
}
return this;
}

Expand All @@ -138,7 +140,7 @@ public ZipkinRule enqueueFailure(HttpFailure failure) {

/** Retrieves all traces this zipkin server has received. */
public List<List<Span>> getTraces() {
List<List<zipkin.internal.v2.Span>> traces = storage.v2SpanStore().getTraces();
List<List<zipkin.internal.v2.Span>> traces = storage.spanStore().getTraces();
List<List<Span>> result = new ArrayList<>(traces.size());
for (List<zipkin.internal.v2.Span> trace2 : traces) {
List<Span> sameTraceId = new ArrayList<>();
Expand Down
9 changes: 5 additions & 4 deletions zipkin-junit/src/test/java/zipkin/junit/v2/HttpV2Storage.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,17 @@

import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import zipkin.internal.V2StorageComponent;
import zipkin.internal.v2.CheckResult;
import zipkin.internal.v2.storage.SpanConsumer;
import zipkin.internal.v2.storage.SpanStore;
import zipkin.internal.v2.storage.StorageComponent;

/**
* Test storage component that forwards requests to an HTTP endpoint.
*
* <p>Note: this inherits the {@link Builder#strictTraceId(boolean)} from the backend.
*/
final class HttpV2Storage extends V2StorageComponent {
final class HttpV2Storage extends StorageComponent {
private final OkHttpClient client;
private final HttpUrl baseUrl;
private final HttpV2SpanStore spanStore;
Expand All @@ -40,11 +41,11 @@ final class HttpV2Storage extends V2StorageComponent {
this.spanConsumer = new HttpV2SpanConsumer(this.client, this.baseUrl);
}

@Override public SpanStore v2SpanStore() {
@Override public SpanStore spanStore() {
return spanStore;
}

@Override public SpanConsumer v2SpanConsumer() {
@Override public SpanConsumer spanConsumer() {
return spanConsumer;
}

Expand Down
10 changes: 6 additions & 4 deletions zipkin-junit/src/test/java/zipkin/junit/v2/ITHttpV2Storage.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
import org.junit.Rule;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import zipkin.internal.V2StorageComponent;
import zipkin.junit.ZipkinRule;
import zipkin.storage.StorageComponent;

@RunWith(Enclosed.class)
public class ITHttpV2Storage {
Expand All @@ -26,8 +28,8 @@ public static class DependenciesTest extends zipkin.storage.DependenciesTest {
@Rule public ZipkinRule server = new ZipkinRule();
HttpV2Storage storage = new HttpV2Storage(server.httpUrl());

@Override protected HttpV2Storage storage() {
return storage;
@Override protected StorageComponent storage() {
return V2StorageComponent.create(storage);
}

@Override public void clear() {
Expand All @@ -39,8 +41,8 @@ public static class SpanStoreTest extends zipkin.storage.SpanStoreTest {
@Rule public ZipkinRule server = new ZipkinRule();
HttpV2Storage storage = new HttpV2Storage(server.httpUrl());

@Override protected HttpV2Storage storage() {
return storage;
@Override protected StorageComponent storage() {
return V2StorageComponent.create(storage);
}

@Override public void clear() throws IOException {
Expand Down
18 changes: 9 additions & 9 deletions zipkin-server/src/main/java/zipkin/server/ZipkinQueryApiV2.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import zipkin.internal.v2.codec.DependencyLinkBytesCodec;
import zipkin.internal.v2.codec.SpanBytesCodec;
import zipkin.internal.v2.storage.QueryRequest;
import zipkin.storage.StorageComponent;
import zipkin.internal.v2.storage.StorageComponent;

import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE;

Expand All @@ -51,21 +51,21 @@ public class ZipkinQueryApiV2 {
static final Charset UTF_8 = Charset.forName("UTF-8");

final String storageType;
final V2StorageComponent storage; // don't cache spanStore here as it can cause the app to crash!
final StorageComponent storage; // don't cache spanStore here as it can cause the app to crash!
final long defaultLookback;
/** The Cache-Control max-age (seconds) for /api/v2/services and /api/v2/spans */
final int namesMaxAge;

volatile int serviceCount; // used as a threshold to start returning cache-control headers

ZipkinQueryApiV2(
StorageComponent storage,
zipkin.storage.StorageComponent storage,
@Value("${zipkin.storage.type:mem}") String storageType,
@Value("${zipkin.query.lookback:86400000}") long defaultLookback, // 1 day in millis
@Value("${zipkin.query.names-max-age:300}") int namesMaxAge // 5 minutes
) {
if (storage instanceof V2StorageComponent) {
this.storage = (V2StorageComponent) storage;
this.storage = ((V2StorageComponent) storage).internalDelegate();
} else {
this.storage = null;
}
Expand All @@ -81,7 +81,7 @@ public byte[] getDependencies(
) throws IOException {
if (storage == null) throw new Version2StorageNotConfigured();

Call<List<DependencyLink>> call = storage.v2SpanStore()
Call<List<DependencyLink>> call = storage.spanStore()
.getDependencies(endTs, lookback != null ? lookback : defaultLookback);
return DependencyLinkBytesCodec.JSON.encodeList(call.execute());
}
Expand All @@ -90,7 +90,7 @@ public byte[] getDependencies(
public ResponseEntity<List<String>> getServiceNames() throws IOException {
if (storage == null) throw new Version2StorageNotConfigured();

List<String> serviceNames = storage.v2SpanStore().getServiceNames().execute();
List<String> serviceNames = storage.spanStore().getServiceNames().execute();
serviceCount = serviceNames.size();
return maybeCacheNames(serviceNames);
}
Expand All @@ -101,7 +101,7 @@ public ResponseEntity<List<String>> getSpanNames(
) throws IOException {
if (storage == null) throw new Version2StorageNotConfigured();

return maybeCacheNames(storage.v2SpanStore().getSpanNames(serviceName).execute());
return maybeCacheNames(storage.spanStore().getSpanNames(serviceName).execute());
}

@RequestMapping(value = "/traces", method = RequestMethod.GET, produces = APPLICATION_JSON_VALUE)
Expand All @@ -127,15 +127,15 @@ public String getTraces(
.lookback(lookback != null ? lookback : defaultLookback)
.limit(limit).build();

List<List<Span>> traces = storage.v2SpanStore().getTraces(queryRequest).execute();
List<List<Span>> traces = storage.spanStore().getTraces(queryRequest).execute();
return new String(SpanBytesCodec.JSON_V2.encodeNestedList(traces), UTF_8);
}

@RequestMapping(value = "/trace/{traceIdHex}", method = RequestMethod.GET, produces = APPLICATION_JSON_VALUE)
public String getTrace(@PathVariable String traceIdHex, WebRequest request) throws IOException {
if (storage == null) throw new Version2StorageNotConfigured();

List<Span> trace = storage.v2SpanStore().getTrace(traceIdHex).execute();
List<Span> trace = storage.spanStore().getTrace(traceIdHex).execute();
if (trace.isEmpty()) throw new TraceNotFoundException(traceIdHex);
return new String(SpanBytesCodec.JSON_V2.encodeList(trace), UTF_8);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@
import org.springframework.boot.actuate.health.HealthAggregator;
import org.springframework.boot.actuate.metrics.buffer.CounterBuffers;
import org.springframework.boot.actuate.metrics.buffer.GaugeBuffers;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import zipkin.collector.CollectorMetrics;
import zipkin.collector.CollectorSampler;
import zipkin.internal.V2InMemoryStorage;
import zipkin.internal.V2StorageComponent;
import zipkin.internal.v2.storage.InMemoryStorage;
import zipkin.server.brave.TracedStorageComponent;
import zipkin.storage.StorageComponent;

Expand Down Expand Up @@ -80,6 +81,10 @@ public Object postProcessAfterInitialization(Object bean, String beanName) {
}
}

/**
* This is a special-case configuration if there's no StorageComponent of any kind. In-Mem can
* supply both read apis, so we add two beans here.
*/
@Configuration
// "matchIfMissing = true" ensures this is used when there's no configured storage type
@ConditionalOnProperty(name = "zipkin.storage.type", havingValue = "mem", matchIfMissing = true)
Expand All @@ -88,10 +93,24 @@ static class InMemoryConfiguration {
@Bean StorageComponent storage(
@Value("${zipkin.storage.strict-trace-id:true}") boolean strictTraceId,
@Value("${zipkin.storage.mem.max-spans:500000}") int maxSpans) {
return V2InMemoryStorage.newBuilder()
return V2StorageComponent.create(InMemoryStorage.newBuilder()
.strictTraceId(strictTraceId)
.maxSpanCount(maxSpans)
.build();
.build());
}

@Bean InMemoryStorage v2Storage(V2StorageComponent component) {
return (InMemoryStorage) component.internalDelegate();
}
}

/** This allows zipkin v2 components to be adapted to v1 components */
@Configuration
@ConditionalOnBean(zipkin.internal.v2.storage.StorageComponent.class)
@ConditionalOnMissingBean(StorageComponent.class)
static class AdapterConfiguration {
@Bean StorageComponent storage(zipkin.internal.v2.storage.StorageComponent in) {
return V2StorageComponent.create(in);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@
import zipkin.Codec;
import zipkin.Span;
import zipkin.internal.ApplyTimestampAndDuration;
import zipkin.internal.V2InMemoryStorage;
import zipkin.internal.V2SpanConverter;
import zipkin.internal.v2.codec.SpanBytesEncoder;
import zipkin.internal.v2.storage.InMemoryStorage;

import static java.lang.String.format;
import static java.util.Arrays.asList;
Expand Down Expand Up @@ -67,7 +67,7 @@ public class ZipkinServerIntegrationTest {
@Autowired
ConfigurableWebApplicationContext context;
@Autowired
V2InMemoryStorage storage;
InMemoryStorage storage;
@Autowired
ActuateCollectorMetrics metrics;
@LocalServerPort
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/**
* Copyright 2015-2017 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin.server;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.boot.actuate.health.HealthAggregator;
import org.springframework.boot.actuate.health.OrderedHealthAggregator;
import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import zipkin.internal.v2.storage.InMemoryStorage;
import zipkin.internal.v2.storage.StorageComponent;

public class ZipkinServerV2StorageTest {
AnnotationConfigApplicationContext context;

@Before public void init() {
context = new AnnotationConfigApplicationContext();
}

@After public void close() {
if (context != null) context.close();
}

@Test public void adaptsStorageComponent() {
context.register(
PropertyPlaceholderAutoConfiguration.class,
V2Storage.class,
ZipkinServerConfiguration.class);
context.refresh();

context.getBean(zipkin.storage.StorageComponent.class);
}

@Configuration
public static class V2Storage {
@Bean public HealthAggregator healthAggregator() {
return new OrderedHealthAggregator();
}

@Bean public StorageComponent component() {
return InMemoryStorage.newBuilder().build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class ITCassandraStorage {

@ClassRule
public static LazyCassandraStorage storage =
new LazyCassandraStorage("openzipkin/zipkin-cassandra:1.29.1", "test_zipkin");
new LazyCassandraStorage("openzipkin/zipkin-cassandra:1.31.1", "test_zipkin");

public static class DependenciesTest extends CassandraDependenciesTest {
@Override protected CassandraStorage storage() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class ITCassandra3Storage {

@ClassRule
public static LazyCassandra3Storage storage =
new LazyCassandra3Storage("openzipkin/zipkin-cassandra:1.29.1", "test_zipkin3");
new LazyCassandra3Storage("openzipkin/zipkin-cassandra:1.31.1", "test_zipkin3");

public static class DependenciesTest extends CassandraDependenciesTest {
@Override protected Cassandra3Storage storage() {
Expand Down
Loading

0 comments on commit 0baa31f

Please sign in to comment.