Skip to content

Commit

Permalink
WIP part 2
Browse files Browse the repository at this point in the history
  • Loading branch information
Adrian Cole committed Aug 29, 2017
1 parent ab09853 commit d51ec34
Show file tree
Hide file tree
Showing 18 changed files with 1,286 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@
import okhttp3.Request;
import okhttp3.RequestBody;
import okio.Buffer;
import zipkin.internal.LenientDoubleCallbackAsyncSpanStore;
import zipkin.internal.V2StorageComponent;
import zipkin.internal.v2.storage.SpanConsumer;
import zipkin.storage.AsyncSpanStore;
import zipkin.storage.SpanStore;
import zipkin.storage.StorageAdapters;
import zipkin.storage.elasticsearch.http.internal.LenientDoubleCallbackAsyncSpanStore;
import zipkin.storage.elasticsearch.http.internal.client.HttpCall;

import static zipkin.internal.Util.checkNotNull;
Expand Down Expand Up @@ -205,6 +205,10 @@ public final Builder dateSeparator(char dateSeparator) {

abstract boolean legacyReadsEnabled();

@Override public zipkin.internal.v2.storage.SpanStore v2SpanStore() {
throw new UnsupportedOperationException("TODO");
}

@Override public SpanStore spanStore() {
return StorageAdapters.asyncToBlocking(asyncSpanStore());
}
Expand All @@ -221,7 +225,7 @@ public final Builder dateSeparator(char dateSeparator) {
}
}

@Override protected SpanConsumer v2AsyncSpanConsumer() {
@Override public SpanConsumer v2SpanConsumer() {
ensureIndexTemplates();
return new ElasticsearchHttpSpanConsumer(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,6 @@ public void close() throws IOException {
}

void accept(Span... spans) throws Exception {
storage.v2AsyncSpanConsumer().accept(asList(spans)).execute();
storage.v2SpanConsumer().accept(asList(spans)).execute();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin.storage.elasticsearch.http.internal;
package zipkin.internal;

import java.util.logging.Level;
import java.util.logging.Logger;
Expand All @@ -33,7 +33,7 @@ abstract class LenientDoubleCallback<V> implements Callback<V> {
this.delegate = delegate;
}

abstract V merge(V v1, V v2);
abstract @Nullable V merge(@Nullable V v1, @Nullable V v2);

@Override synchronized final public void onSuccess(@Nullable V value) {
if (t != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin.storage.elasticsearch.http.internal;
package zipkin.internal;

import java.util.ArrayList;
import java.util.LinkedHashSet;
Expand All @@ -21,8 +21,6 @@
import javax.annotation.Nullable;
import zipkin.DependencyLink;
import zipkin.Span;
import zipkin.internal.DependencyLinker;
import zipkin.internal.MergeById;
import zipkin.storage.AsyncSpanStore;
import zipkin.storage.Callback;
import zipkin.storage.QueryRequest;
Expand All @@ -31,6 +29,7 @@
* This makes redundant read commands, concatenating results if two answers come back, or accepting
* one if there's an error on the other.
*/
// TODO: temporarily public until elasticsearch-http transitions to V2 SpanStore
public final class LenientDoubleCallbackAsyncSpanStore implements AsyncSpanStore {
final AsyncSpanStore left;
final AsyncSpanStore right;
Expand Down
8 changes: 4 additions & 4 deletions zipkin/src/main/java/zipkin/internal/V2CallbackAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
import javax.annotation.Nullable;
import zipkin.storage.Callback;

final class V2CallbackAdapter implements zipkin.internal.v2.Callback<Void> {
private final Callback<Void> callback;
final class V2CallbackAdapter<T> implements zipkin.internal.v2.Callback<T> {
final Callback<T> callback;

V2CallbackAdapter(Callback<Void> callback) {
V2CallbackAdapter(Callback<T> callback) {
this.callback = callback;
}

@Override public void onSuccess(@Nullable Void value) {
@Override public void onSuccess(@Nullable T value) {
callback.onSuccess(value);
}

Expand Down
2 changes: 1 addition & 1 deletion zipkin/src/main/java/zipkin/internal/V2Collector.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void acceptSpans(byte[] serializedSpans, Decoder<Span> decoder, Callback<
}

@Override protected void record(List<Span> sampled, Callback<Void> callback) {
storage.v2AsyncSpanConsumer().accept(sampled).enqueue(new V2CallbackAdapter(callback));
storage.v2SpanConsumer().accept(sampled).enqueue(new V2CallbackAdapter(callback));
}

@Override protected String idString(Span span) {
Expand Down
44 changes: 44 additions & 0 deletions zipkin/src/main/java/zipkin/internal/V2SpanConsumerAdapter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/**
* Copyright 2015-2017 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin.internal;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import zipkin.internal.v2.Span;
import zipkin.internal.v2.storage.SpanConsumer;
import zipkin.storage.AsyncSpanConsumer;
import zipkin.storage.Callback;

final class V2SpanConsumerAdapter implements AsyncSpanConsumer {
final SpanConsumer delegate;

V2SpanConsumerAdapter(SpanConsumer delegate) {
this.delegate = delegate;
}

@Override public void accept(List<zipkin.Span> spans, Callback<Void> callback) {
delegate.accept(fromSpans(spans)).enqueue(new V2CallbackAdapter<>(callback));
}

static List<Span> fromSpans(List<zipkin.Span> spans) {
if (spans.isEmpty()) return Collections.emptyList();
int length = spans.size();
List<Span> span2s = new ArrayList<>(length);
for (int i = 0; i < length; i++) {
span2s.addAll(V2SpanConverter.fromSpan(spans.get(i)));
}
return span2s;
}
}
192 changes: 192 additions & 0 deletions zipkin/src/main/java/zipkin/internal/V2SpanStoreAdapter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
/**
* Copyright 2015-2017 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin.internal;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
import zipkin.DependencyLink;
import zipkin.internal.v2.Call;
import zipkin.internal.v2.Call.Mapper;
import zipkin.internal.v2.Span;
import zipkin.internal.v2.internal.Platform;
import zipkin.internal.v2.storage.QueryRequest;
import zipkin.internal.v2.storage.SpanStore;
import zipkin.storage.AsyncSpanStore;
import zipkin.storage.Callback;

import static zipkin.internal.GroupByTraceId.TRACE_DESCENDING;

final class V2SpanStoreAdapter implements zipkin.storage.SpanStore, AsyncSpanStore {
final SpanStore delegate;

V2SpanStoreAdapter(SpanStore delegate) {
this.delegate = delegate;
}

@Override public List<List<zipkin.Span>> getTraces(zipkin.storage.QueryRequest request) {
try {
return getTracesCall(request).execute();
} catch (IOException e) {
throw Platform.get().uncheckedIOException(e);
}
}

@Override
public void getTraces(zipkin.storage.QueryRequest request,
Callback<List<List<zipkin.Span>>> callback) {
getTracesCall(request).enqueue(new V2CallbackAdapter<>(callback));
}

Call<List<List<zipkin.Span>>> getTracesCall(zipkin.storage.QueryRequest v1Request) {
return delegate.getTraces(convert(v1Request)).map(getTracesMapper);
}

@Nullable @Override public List<zipkin.Span> getTrace(long traceIdHigh, long traceIdLow) {
try {
return getTraceCall(traceIdHigh, traceIdLow).execute();
} catch (IOException e) {
throw Platform.get().uncheckedIOException(e);
}
}

@Override
public void getTrace(long traceIdHigh, long traceIdLow, Callback<List<zipkin.Span>> callback) {
getTraceCall(traceIdHigh, traceIdLow).enqueue(new V2CallbackAdapter<>(callback));
}

Call<List<zipkin.Span>> getTraceCall(long traceIdHigh, long traceIdLow) {
return delegate.getTrace(traceIdHigh, traceIdLow).map(getTraceMapper);
}

@Nullable @Override public List<zipkin.Span> getRawTrace(long traceIdHigh, long traceIdLow) {
try {
return getRawTraceCall(traceIdHigh, traceIdLow).execute();
} catch (IOException e) {
throw Platform.get().uncheckedIOException(e);
}
}

@Override
public void getRawTrace(long traceIdHigh, long traceIdLow,
Callback<List<zipkin.Span>> callback) {
getRawTraceCall(traceIdHigh, traceIdLow).enqueue(new V2CallbackAdapter<>(callback));
}

Call<List<zipkin.Span>> getRawTraceCall(long traceIdHigh, long traceIdLow) {
return delegate.getTrace(traceIdHigh, traceIdLow).map(getRawTraceMapper);
}

@Override public List<String> getServiceNames() {
try {
return delegate.getServiceNames().execute();
} catch (IOException e) {
throw Platform.get().uncheckedIOException(e);
}
}

@Override public void getServiceNames(Callback<List<String>> callback) {
delegate.getServiceNames().enqueue(new V2CallbackAdapter<>(callback));
}

@Override public List<String> getSpanNames(String serviceName) {
try {
return delegate.getSpanNames(serviceName).execute();
} catch (IOException e) {
throw Platform.get().uncheckedIOException(e);
}
}

@Override public void getSpanNames(String serviceName, Callback<List<String>> callback) {
delegate.getSpanNames(serviceName).enqueue(new V2CallbackAdapter<>(callback));
}

@Override public List<DependencyLink> getDependencies(long endTs, @Nullable Long lookback) {
try {
return getDependenciesCall(endTs, lookback).execute();
} catch (IOException e) {
throw Platform.get().uncheckedIOException(e);
}
}

@Override public void getDependencies(long endTs, @Nullable Long lookback,
Callback<List<DependencyLink>> callback) {
getDependenciesCall(endTs, lookback).enqueue(new V2CallbackAdapter<>(callback));
}

Call<List<DependencyLink>> getDependenciesCall(long endTs, @Nullable Long lookback) {
return delegate.getDependencies(endTs, lookback != null ? lookback : endTs);
}

@Nullable @Override public List<zipkin.Span> getTrace(long traceId) {
return getTrace(0L, traceId);
}

@Override public void getTrace(long id, Callback<List<zipkin.Span>> callback) {
getTrace(0L, id, callback);
}

@Nullable @Override public List<zipkin.Span> getRawTrace(long traceId) {
return getRawTrace(0L, traceId);
}

@Override public void getRawTrace(long traceId, Callback<List<zipkin.Span>> callback) {
getRawTrace(0L, traceId, callback);
}

static final Mapper<List<List<Span>>, List<List<zipkin.Span>>> getTracesMapper = (trace2s) -> {
if (trace2s.isEmpty()) return Collections.emptyList();
int length = trace2s.size();
List<List<zipkin.Span>> trace1s = new ArrayList<>(length);
for (int i = 0; i < length; i++) {
trace1s.add(CorrectForClockSkew.apply(MergeById.apply(convert(trace2s.get(i)))));
}
Collections.sort(trace1s, TRACE_DESCENDING);
return trace1s;
};

static final Mapper<List<Span>, List<zipkin.Span>> getTraceMapper = (spans) -> {
List<zipkin.Span> span1s = CorrectForClockSkew.apply(MergeById.apply(convert(spans)));
return (span1s.isEmpty()) ? null : span1s;
};

static final Mapper<List<Span>, List<zipkin.Span>> getRawTraceMapper = (spans) -> {
List<zipkin.Span> span1s = convert(spans);
return (span1s.isEmpty()) ? null : span1s;
};

static QueryRequest convert(zipkin.storage.QueryRequest v1Request) {
return QueryRequest.newBuilder()
.serviceName(v1Request.serviceName)
.spanName(v1Request.spanName)
.parseAnnotationQuery(v1Request.toAnnotationQuery())
.minDuration(v1Request.minDuration)
.maxDuration(v1Request.maxDuration)
.endTs(v1Request.endTs)
.lookback(v1Request.lookback)
.limit(v1Request.limit).build();
}

static List<zipkin.Span> convert(List<zipkin.internal.v2.Span> spans) {
if (spans.isEmpty()) return Collections.emptyList();
int length = spans.size();
List<zipkin.Span> span1s = new ArrayList<>(length);
for (int i = 0; i < length; i++) {
span1s.add(V2SpanConverter.toSpan(spans.get(i)));
}
return span1s;
}
}
Loading

0 comments on commit d51ec34

Please sign in to comment.