Skip to content

Commit

Permalink
Drops internal DependencyLinkSpan for Span2
Browse files Browse the repository at this point in the history
This drops the internal type of DependencyLinkSpan in favor of the Span2
type coming in openzipkin#1499. Doing so now gives us practice, solves a few bugs
along the way. When Span2 becomes non-internal, the change will be a
simple package rename.
  • Loading branch information
Adrian Cole authored and abesto committed Sep 10, 2019
1 parent ae8ae76 commit 8f43a55
Show file tree
Hide file tree
Showing 8 changed files with 411 additions and 701 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,32 @@
import java.util.Iterator;
import org.jooq.Record;
import org.jooq.TableField;
import zipkin.internal.DependencyLinkSpan;
import zipkin.Endpoint;
import zipkin.internal.Nullable;
import zipkin.internal.PeekingIterator;
import zipkin.internal.Span2;
import zipkin.storage.mysql.internal.generated.tables.ZipkinSpans;

import static zipkin.Constants.CLIENT_ADDR;
import static zipkin.Constants.CLIENT_SEND;
import static zipkin.Constants.SERVER_ADDR;
import static zipkin.Constants.SERVER_RECV;
import static zipkin.internal.Util.equal;
import static zipkin.storage.mysql.internal.generated.tables.ZipkinAnnotations.ZIPKIN_ANNOTATIONS;

/**
* Convenience that lazy converts rows into {@linkplain DependencyLinkSpan} objects.
* Lazy converts rows into {@linkplain Span2} objects suitable for dependency links. This takes
* short-cuts to require less data. For example, it folds shared RPC spans into one, and doesn't
* include tags, non-core annotations or time units.
*
* <p>Out-of-date schemas may be missing the trace_id_high field. When present, this becomes {@link
* DependencyLinkSpan.TraceId#hi} used as the left-most 16 characters of the traceId in logging
* Span2#traceIdHigh()} used as the left-most 16 characters of the traceId in logging
* statements.
*/
final class DependencyLinkSpanIterator implements Iterator<DependencyLinkSpan> {
final class DependencyLinkSpan2Iterator implements Iterator<Span2> {

/** Assumes the input records are sorted by trace id, span id */
static final class ByTraceId implements Iterator<Iterator<DependencyLinkSpan>> {
static final class ByTraceId implements Iterator<Iterator<Span2>> {
final PeekingIterator<Record> delegate;
final boolean hasTraceIdHigh;

Expand All @@ -53,10 +57,10 @@ static final class ByTraceId implements Iterator<Iterator<DependencyLinkSpan>> {
return delegate.hasNext();
}

@Override public Iterator<DependencyLinkSpan> next() {
@Override public Iterator<Span2> next() {
currentTraceIdHi = hasTraceIdHigh ? traceIdHigh(delegate) : null;
currentTraceIdLo = delegate.peek().getValue(ZipkinSpans.ZIPKIN_SPANS.TRACE_ID);
return new DependencyLinkSpanIterator(delegate, currentTraceIdHi, currentTraceIdLo);
return new DependencyLinkSpan2Iterator(delegate, currentTraceIdHi, currentTraceIdLo);
}

@Override public void remove() {
Expand All @@ -68,7 +72,7 @@ static final class ByTraceId implements Iterator<Iterator<DependencyLinkSpan>> {
@Nullable final Long traceIdHi;
final long traceIdLo;

DependencyLinkSpanIterator(PeekingIterator<Record> delegate, Long traceIdHi, long traceIdLo) {
DependencyLinkSpan2Iterator(PeekingIterator<Record> delegate, Long traceIdHi, long traceIdLo) {
this.delegate = delegate;
this.traceIdHi = traceIdHi;
this.traceIdLo = traceIdLo;
Expand All @@ -83,17 +87,11 @@ public boolean hasNext() {
}

@Override
public DependencyLinkSpan next() {
public Span2 next() {
Record row = delegate.peek();

long spanId = row.getValue(ZipkinSpans.ZIPKIN_SPANS.ID);
DependencyLinkSpan.Builder result = DependencyLinkSpan.builder(
traceIdHi != null ? traceIdHi : 0L,
traceIdLo,
row.getValue(ZipkinSpans.ZIPKIN_SPANS.PARENT_ID),
spanId
);

String srService = null, csService = null, caService = null, saService = null;
while (hasNext()) { // there are more values for this trace
if (spanId != delegate.peek().getValue(ZipkinSpans.ZIPKIN_SPANS.ID)) {
break; // if we are in a new span
Expand All @@ -105,18 +103,48 @@ public DependencyLinkSpan next() {
if (key == null || value == null) continue; // neither client nor server
switch (key) {
case CLIENT_ADDR:
result.caService(value);
caService = value;
break;
case CLIENT_SEND:
result.csService(value);
csService = value;
break;
case SERVER_ADDR:
result.saService(value);
saService = value;
break;
case SERVER_RECV:
result.srService(value);
srService = value;
}
}

// The client address is more authoritative than the client send owner.
if (caService == null) caService = csService;

// Finagle labels two sides of the same socket ("ca", "sa") with the same name.
// Skip the client side, so it isn't mistaken for a loopback request
if (equal(saService, caService)) caService = null;

Span2.Builder result = Span2.builder()
.traceIdHigh(traceIdHi != null ? traceIdHi : 0L)
.traceId(traceIdLo)
.parentId(row.getValue(ZipkinSpans.ZIPKIN_SPANS.PARENT_ID))
.id(spanId);

if (srService != null) {
return result.kind(Span2.Kind.SERVER)
.localEndpoint(ep(srService))
.remoteEndpoint(ep(caService))
.build();
} else if (saService != null) {
return result
.kind(csService != null ? Span2.Kind.CLIENT : null)
.localEndpoint(ep(caService))
.remoteEndpoint(ep(saService))
.build();
} else if (csService != null) {
return result.kind(Span2.Kind.SERVER)
.localEndpoint(ep(caService))
.build();
}
return result.build();
}

Expand All @@ -129,8 +157,12 @@ static long traceIdHigh(PeekingIterator<Record> delegate) {
return delegate.peek().getValue(ZipkinSpans.ZIPKIN_SPANS.TRACE_ID_HIGH);
}

static String emptyToNull(Record next, TableField<Record, String> field) {
static @Nullable String emptyToNull(Record next, TableField<Record, String> field) {
String result = next.getValue(field);
return result != null && !"".equals(result) ? result : null;
}

static Endpoint ep(@Nullable String serviceName) {
return serviceName != null ? Endpoint.builder().serviceName(serviceName).build() : null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@
import zipkin.DependencyLink;
import zipkin.Endpoint;
import zipkin.Span;
import zipkin.internal.DependencyLinkSpan;
import zipkin.internal.DependencyLinker;
import zipkin.internal.GroupByTraceId;
import zipkin.internal.Nullable;
import zipkin.internal.Pair;
import zipkin.internal.Span2;
import zipkin.storage.QueryRequest;
import zipkin.storage.SpanStore;
import zipkin.storage.mysql.internal.generated.tables.ZipkinAnnotations;
Expand Down Expand Up @@ -324,8 +324,8 @@ List<DependencyLink> aggregateDependencies(long endTs, @Nullable Long lookback,
// Grouping so that later code knows when a span or trace is finished.
.groupBy(schema.dependencyLinkGroupByFields).fetchLazy();

Iterator<Iterator<DependencyLinkSpan>> traces =
new DependencyLinkSpanIterator.ByTraceId(cursor.iterator(), schema.hasTraceIdHigh);
Iterator<Iterator<Span2>> traces =
new DependencyLinkSpan2Iterator.ByTraceId(cursor.iterator(), schema.hasTraceIdHigh);

if (!traces.hasNext()) return Collections.emptyList();

Expand Down
Loading

0 comments on commit 8f43a55

Please sign in to comment.