Zipkin json encoding via HTTP #334

Closed
wants to merge 8 commits into from
Changes from 7 commits
42 changes: 24 additions & 18 deletions cmd/collector/app/zipkin/http_handler.go
@@ -33,7 +33,7 @@ import (
tchanThrift "github.com/uber/tchannel-go/thrift"

"github.com/uber/jaeger/cmd/collector/app"
zipkincore "github.com/uber/jaeger/thrift-gen/zipkincore"
"github.com/uber/jaeger/thrift-gen/zipkincore"
)

// APIHandler handles all HTTP calls to the collector
@@ -75,30 +75,36 @@ func (aH *APIHandler) saveSpans(w http.ResponseWriter, r *http.Request) {
return
}

if r.Header.Get("Content-Type") == "application/x-thrift" {
handleZipkinThrift(aH.zipkinSpansHandler, bodyBytes, w)
contentType := r.Header.Get("Content-Type")
var tSpans []*zipkincore.Span
if contentType == "application/x-thrift" {
tSpans, err = deserializeThrift(bodyBytes)
if err != nil {
Member
This err != nil check can be done once, outside of the if contentType == ... branches.
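A minimal sketch of the restructuring suggested in this comment: pick the decoder in a switch and check the error once afterwards. The names `span`, `decodeThrift`, `decodeJSON`, `decodeSpans` and `errUnsupported` are hypothetical stand-ins for illustration, not the collector's actual code.

```go
package main

import (
	"errors"
	"fmt"
)

// span is a hypothetical stand-in for *zipkincore.Span.
type span struct{ name string }

// errUnsupported is an illustrative error value, not part of the PR.
var errUnsupported = errors.New("unsupported Content-Type")

// decodeThrift and decodeJSON are placeholder decoders for this sketch only.
func decodeThrift(body []byte) ([]span, error) {
	if len(body) == 0 {
		return nil, errors.New("empty thrift payload")
	}
	return []span{{name: "thrift"}}, nil
}

func decodeJSON(body []byte) ([]span, error) {
	if len(body) == 0 {
		return nil, errors.New("empty json payload")
	}
	return []span{{name: "json"}}, nil
}

// decodeSpans selects a decoder by content type and checks the error once.
func decodeSpans(contentType string, body []byte) ([]span, error) {
	var (
		spans []span
		err   error
	)
	switch contentType {
	case "application/x-thrift":
		spans, err = decodeThrift(body)
	case "application/json":
		spans, err = decodeJSON(body)
	default:
		return nil, errUnsupported
	}
	// Single error check shared by every supported content type.
	if err != nil {
		return nil, fmt.Errorf("unable to process request body: %v", err)
	}
	return spans, nil
}

func main() {
	spans, err := decodeSpans("application/json", []byte("[]"))
	fmt.Println(len(spans), err)
}
```

Adding a further encoding in this shape would only need one more `case`, with no extra error-handling branch.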

http.Error(w, fmt.Sprintf(app.UnableToReadBodyErrFormat, err), http.StatusBadRequest)
return
}
} else if contentType == "application/json" {
tSpans, err = deserializeJSON(bodyBytes)
if err != nil {
http.Error(w, fmt.Sprintf(app.UnableToReadBodyErrFormat, err), http.StatusBadRequest)
return
}
} else {
http.Error(w, "Only Content-Type:application/x-thrift is supported at the moment", http.StatusBadRequest)
http.Error(w, "Unsupported Content-Type", http.StatusBadRequest)
Member
missing return?
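For context on why the return matters: `http.Error` writes the status and body but does not stop the handler, so without a `return` the code after the branch still runs (a later `WriteHeader` call cannot change the status that was already sent). The sketch below shows the early return in isolation; `newHandler`, `post` and the `processed` counter are hypothetical helpers for illustration, not code from this PR.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// newHandler returns a hypothetical handler; processed counts how often the
// code after the content-type check actually runs.
func newHandler(processed *int) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		switch r.Header.Get("Content-Type") {
		case "application/x-thrift", "application/json":
			// supported, continue below
		default:
			http.Error(w, "Unsupported Content-Type", http.StatusBadRequest)
			return // without this, execution falls through to the code below
		}
		*processed++
		w.WriteHeader(http.StatusAccepted)
	}
}

// post sends an in-memory request to h and returns the recorded status code.
func post(h http.Handler, contentType string) int {
	req := httptest.NewRequest(http.MethodPost, "/api/v1/spans", strings.NewReader("[]"))
	req.Header.Set("Content-Type", contentType)
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	processed := 0
	h := newHandler(&processed)
	fmt.Println(post(h, "text/html"), processed)
	fmt.Println(post(h, "application/json"), processed)
}
```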

}

w.WriteHeader(http.StatusAccepted)
}

func handleZipkinThrift(zHandler app.ZipkinSpansHandler, bodyBytes []byte, w http.ResponseWriter) {
spans, err := deserializeZipkin(bodyBytes)
if err != nil {
http.Error(w, fmt.Sprintf(app.UnableToReadBodyErrFormat, err), http.StatusBadRequest)
return
if len(tSpans) > 0 {
ctx, _ := tchanThrift.NewContext(time.Minute)
if _, err = aH.zipkinSpansHandler.SubmitZipkinBatch(ctx, tSpans); err != nil {
http.Error(w, fmt.Sprintf("Cannot submit Zipkin batch: %v", err), http.StatusInternalServerError)
return
}
}

ctx, _ := tchanThrift.NewContext(time.Minute)
if _, err = zHandler.SubmitZipkinBatch(ctx, spans); err != nil {
http.Error(w, fmt.Sprintf("Cannot submit Zipkin batch: %v", err), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusAccepted)
}

func deserializeZipkin(b []byte) ([]*zipkincore.Span, error) {
func deserializeThrift(b []byte) ([]*zipkincore.Span, error) {
buffer := thrift.NewTMemoryBuffer()
buffer.Write(b)

44 changes: 42 additions & 2 deletions cmd/collector/app/zipkin/http_handler_test.go
@@ -122,6 +122,46 @@ func TestThriftFormat(t *testing.T) {
assert.EqualValues(t, "Cannot submit Zipkin batch: Bad times ahead\n", resBodyStr)
}

func TestJsonFormat(t *testing.T) {
Contributor
nit: could we use a fixture for this test instead, one that defines the test cases up front so they can all be run at once?

server, handler := initializeTestServer(nil)
defer server.Close()

endpJSON := createEndpoint("foo", "127.0.0.1", "2001:db8::c001", 66)
annoJSON := createAnno("cs", 1515, endpJSON)
binAnnoJSON := createBinAnno("http.status_code", "200", endpJSON)
spanJSON := createSpan("bar", "1234567891234565", "1234567891234567", "1234567891234568", 156, 15145, false,
annoJSON, binAnnoJSON)
statusCode, resBodyStr, err := postBytes(server.URL+`/api/v1/spans`, []byte(spanJSON), createHeader("application/json"))
assert.NoError(t, err)
assert.EqualValues(t, http.StatusAccepted, statusCode)
assert.EqualValues(t, "", resBodyStr)

endpErrJSON := createEndpoint("", "127.0.0.A", "", 80)

// error zipkinSpanHandler
handler.zipkinSpansHandler.(*mockZipkinHandler).err = fmt.Errorf("Bad times ahead")
tests := []struct {
payload string
expected string
statusCode int
}{
{spanJSON, "Cannot submit Zipkin batch: Bad times ahead\n", http.StatusInternalServerError},
Contributor
Please name the struct fields explicitly, i.e. {payload: spanJSON, ...}.

We've been bad about this in the past, but we should do it for everything going forward.
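A small sketch of the keyed-field style requested here, applied to a table of cases that is run in one loop. The `respond` function and its error message are hypothetical stand-ins for posting to the test server; only the field names `payload`, `expected` and `statusCode` come from the diff.

```go
package main

import "fmt"

// respond is a hypothetical stand-in for posting a payload to the collector.
// The error text is invented for this sketch.
func respond(payload string) (int, string) {
	if payload == "" {
		return 400, "Unable to process request body: empty payload\n"
	}
	return 202, ""
}

// testCase mirrors the anonymous struct used in the test table.
type testCase struct {
	payload    string
	expected   string
	statusCode int
}

// cases uses keyed fields, so each value is labeled at the point of use and
// reordering or adding struct fields cannot silently shift the values.
func cases() []testCase {
	return []testCase{
		{payload: "[]", expected: "", statusCode: 202},
		{
			payload:    "",
			expected:   "Unable to process request body: empty payload\n",
			statusCode: 400,
		},
	}
}

// failures runs every case and returns a description of each mismatch.
func failures(tests []testCase) []string {
	var out []string
	for _, test := range tests {
		statusCode, body := respond(test.payload)
		if statusCode != test.statusCode || body != test.expected {
			out = append(out, fmt.Sprintf("payload %q: got (%d, %q)", test.payload, statusCode, body))
		}
	}
	return out
}

func main() {
	fmt.Println(len(failures(cases())))
}
```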

{createSpan("bar", "", "1", "1", 156, 15145, false, annoJSON, binAnnoJSON),
"Unable to process request body: id is not an unsigned long\n", http.StatusBadRequest},
{createSpan("bar", "ZTA", "1", "1", 156, 15145, false, "", ""),
"Unable to process request body: id is not an unsigned long\n", http.StatusBadRequest},
{createSpan("bar", "1", "", "1", 156, 15145, false, "", createAnno("cs", 1, endpErrJSON)),
"Unable to process request body: wrong ipv4\n", http.StatusBadRequest},
}

for _, test := range tests {
statusCode, resBodyStr, err = postBytes(server.URL+`/api/v1/spans`, []byte(test.payload), createHeader("application/json"))
require.NoError(t, err)
assert.EqualValues(t, test.statusCode, statusCode)
assert.EqualValues(t, test.expected, resBodyStr)
}
}

func TestGzipEncoding(t *testing.T) {
server, _ := initializeTestServer(nil)
defer server.Close()
@@ -148,7 +188,7 @@ func TestGzipBadBody(t *testing.T) {
func TestUnsupportedContentType(t *testing.T) {
server, _ := initializeTestServer(nil)
defer server.Close()
statusCode, _, err := postBytes(server.URL+`/api/v1/spans`, []byte{}, createHeader("application/json"))
statusCode, _, err := postBytes(server.URL+`/api/v1/spans`, []byte{}, createHeader("text/html"))
Member
xml would be funnier

Member Author
video/mp4 :P

assert.NoError(t, err)
assert.EqualValues(t, http.StatusBadRequest, statusCode)
}
@@ -164,7 +204,7 @@ func TestFormatBadBody(t *testing.T) {

func TestDeserializeWithBadListStart(t *testing.T) {
spanBytes := zipkinSerialize([]*zipkincore.Span{{}})
_, err := deserializeZipkin(append([]byte{0, 255, 255}, spanBytes...))
_, err := deserializeThrift(append([]byte{0, 255, 255}, spanBytes...))
assert.Error(t, err)
}

236 changes: 236 additions & 0 deletions cmd/collector/app/zipkin/json.go
@@ -0,0 +1,236 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package zipkin

import (
"encoding/json"
"errors"
"strconv"
"strings"

"github.com/uber/jaeger/thrift-gen/zipkincore"
)

type endpoint struct {
ServiceName string `json:"serviceName"`
IPv4 string `json:"ipv4"`
IPv6 string `json:"ipv6"`
Port int16 `json:"port"`
}
type annotation struct {
Endpoint endpoint `json:"endpoint"`
Value string `json:"value"`
Timestamp int64 `json:"timestamp"`
}
type binaryAnnotation struct {
Endpoint endpoint `json:"endpoint"`
Key string `json:"key"`
Value string `json:"value"`
}
type zipkinSpan struct {
ID string `json:"id"`
ParentID string `json:"parentId,omitempty"`
TraceID string `json:"traceId"`
Name string `json:"name"`
Timestamp *int64 `json:"timestamp"`
Duration *int64 `json:"duration"`
Debug bool `json:"debug"`
Annotations []annotation `json:"annotations"`
BinaryAnnotations []binaryAnnotation `json:"binaryAnnotations"`
}
type zipkinSpans []zipkinSpan
Member
only used once, better to inline: var spans []zipkinSpan

Member Author
@pavolloffay pavolloffay Aug 19, 2017

Contributor
I think it's fine to define it here as long as it's used in multiple places (which it looks like it can be). Every reference to []zipkinSpan can be replaced with zipkinSpans; I count at least 3 instances where this happens.

Member
I prefer to use []zipkinSpan directly; it's less obfuscated. A named slice type is useful when you add extra behavior to it, like we do with []model.Tag (sorting, etc.)
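To illustrate the distinction drawn in this thread, here is a hedged sketch: a named slice type that carries behavior (it implements `sort.Interface`) next to a plain slice that does not need one. The `tag`, `tags`, `span`, `sortedByKey` and `names` identifiers are hypothetical and only loosely modelled on the `[]model.Tag` example mentioned above.

```go
package main

import (
	"fmt"
	"sort"
)

// tag is a hypothetical key/value pair used only for this sketch.
type tag struct{ key, value string }

// tags earns its own named type because it carries behavior:
// the three methods below make it a sort.Interface.
type tags []tag

func (t tags) Len() int           { return len(t) }
func (t tags) Less(i, j int) bool { return t[i].key < t[j].key }
func (t tags) Swap(i, j int)      { t[i], t[j] = t[j], t[i] }

// sortedByKey returns a sorted copy and leaves the input untouched.
func sortedByKey(in tags) tags {
	out := make(tags, len(in))
	copy(out, in)
	sort.Sort(out)
	return out
}

// span has no extra behavior attached to its slice form,
// so functions simply take a plain []span.
type span struct{ name string }

func names(spans []span) []string {
	out := make([]string, 0, len(spans))
	for _, s := range spans {
		out = append(out, s.name)
	}
	return out
}

func main() {
	ts := sortedByKey(tags{{key: "b", value: "2"}, {key: "a", value: "1"}})
	fmt.Println(ts[0].key, names([]span{{name: "bar"}}))
}
```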


var (
errIsNotUnsignedLog = errors.New("id is not an unsigned long")
errWrongIpv4 = errors.New("wrong ipv4")
)

func deserializeJSON(body []byte) ([]*zipkincore.Span, error) {
spans, err := decode(body)
if err != nil {
return nil, err
}

return spansToThrift(spans)
}

func decode(body []byte) ([]zipkinSpan, error) {
var spans zipkinSpans
if err := json.Unmarshal(body, &spans); err != nil {
return nil, err
}
return spans, nil
}

func spansToThrift(spans []zipkinSpan) ([]*zipkincore.Span, error) {
var tSpans []*zipkincore.Span
for _, span := range spans {
tSpan, err := spanToThrift(span)
if err != nil {
return nil, err
}
tSpans = append(tSpans, tSpan)
}
return tSpans, nil
}

func spanToThrift(s zipkinSpan) (*zipkincore.Span, error) {
id, err := hexToUnsignedLong(s.ID)
if err != nil {
return nil, err
}
traceID, err := hexToUnsignedLong(s.TraceID)
if err != nil {
return nil, err
}

tSpan := &zipkincore.Span{
ID: int64(id),
TraceID: int64(traceID),
Name: s.Name,
Debug: s.Debug,
Timestamp: s.Timestamp,
Duration: s.Duration,
}

if len(s.TraceID) == 32 {
// take 0-16
traceIDHigh, err := hexToUnsignedLong(s.TraceID[:16])
if err != nil {
return nil, err
}
help := int64(traceIDHigh)
tSpan.TraceIDHigh = &help
}

if len(s.ParentID) > 0 {
parentID, err := hexToUnsignedLong(s.ParentID)
if err != nil {
return nil, err
}
signed := int64(parentID)
tSpan.ParentID = &signed
}

for _, a := range s.Annotations {
tA, err := annoToThrift(a)
if err != nil {
return nil, err
}
tSpan.Annotations = append(tSpan.Annotations, tA)
}
for _, ba := range s.BinaryAnnotations {
tBa, err := binAnnoToThrift(ba)
if err != nil {
return nil, err
}
tSpan.BinaryAnnotations = append(tSpan.BinaryAnnotations, tBa)
}

return tSpan, nil
}

func endpointToThrift(e endpoint) (*zipkincore.Endpoint, error) {
ipv4, err := parseIpv4(e.IPv4)
if err != nil {
return nil, err
}

return &zipkincore.Endpoint{
ServiceName: e.ServiceName,
Port: e.Port,
Ipv4: ipv4,
Ipv6: []byte(e.IPv6),
}, nil
}

func annoToThrift(a annotation) (*zipkincore.Annotation, error) {
endpoint, err := endpointToThrift(a.Endpoint)
if err != nil {
return nil, err
}

return &zipkincore.Annotation{
Timestamp: a.Timestamp,
Value: a.Value,
Host: endpoint,
}, nil
}

func binAnnoToThrift(ba binaryAnnotation) (*zipkincore.BinaryAnnotation, error) {
endpoint, err := endpointToThrift(ba.Endpoint)
if err != nil {
return nil, err
}

return &zipkincore.BinaryAnnotation{
Key: ba.Key,
Value: []byte(ba.Value),
Host: endpoint,
AnnotationType: zipkincore.AnnotationType_STRING,
}, nil
}

func parseIpv4(str string) (int32, error) {
Member
net.ParseIP returns a net.IP, which is a []byte underneath. If it is nil, return 0; otherwise pack the bytes into a long.
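One way this suggestion could look, as a sketch rather than the code that was eventually merged: parse with `net.ParseIP`, narrow to four bytes with `To4`, and pack them big-endian into an `int32`. Treating an empty string as "no address" and returning an error for unparseable or non-IPv4 input are assumptions made here; `parseIPv4` and `errWrongIPv4` are illustrative names.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"net"
)

// errWrongIPv4 is an illustrative error value for this sketch.
var errWrongIPv4 = errors.New("wrong ipv4")

// parseIPv4 packs a dotted-quad address into a signed 32-bit integer.
// Assumption: an empty string means "no address" and yields 0 without error.
func parseIPv4(s string) (int32, error) {
	if s == "" {
		return 0, nil
	}
	ip := net.ParseIP(s)
	if ip == nil {
		return 0, errWrongIPv4 // not a valid textual IP address
	}
	v4 := ip.To4()
	if v4 == nil {
		return 0, errWrongIPv4 // valid IP, but not an IPv4 address
	}
	// Big-endian packing: the first octet ends up in the highest byte.
	return int32(binary.BigEndian.Uint32(v4)), nil
}

func main() {
	fmt.Println(parseIPv4("127.0.0.1"))
	fmt.Println(parseIPv4("127.0.0.A"))
}
```

Unlike a hand-rolled split on dots, this rejects inputs with too many segments or with octets above 255 instead of masking them.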

segments := strings.Split(str, ".")
if len(segments) == 1 {
return 0, nil
}

var ipv4 int32
for _, segment := range segments {
parsed, err := strconv.ParseInt(segment, 10, 32)
if err != nil {
return 0, errWrongIpv4
}
parsed32 := int32(parsed)
ipv4 = ipv4<<8 | (parsed32 & 0xff)
}

return ipv4, nil
}

func hexToUnsignedLong(hex string) (uint64, error) {
len := len(hex)
if len < 1 || len > 32 {
return 0, errIsNotUnsignedLog
}

start := 0
if len > 16 {
start = len - 16
}

var result uint64
for i := start; i < len; i++ {
Contributor
Could we use the official built-in Go functions for this (https://play.golang.org/p/0GUjRvy0kw), or would that not work?

Member Author
it produces different numbers

Member
why not use model functions like model/TraceIDFromString?

https://github.com/uber/jaeger/blob/master/model/span.go#L146

It automatically takes care of high/low.
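As a rough illustration of the standard-library route raised in this thread, the sketch below parses a hex ID of up to 32 characters into high and low 64-bit halves with `strconv.ParseUint`. It does not use or reproduce the `model` package; `parseTraceID` and `errBadID` are hypothetical names. Note that `strconv.ParseUint` also accepts uppercase hex digits, which the hand-written loop in this diff rejects.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

// errBadID is an illustrative error value for this sketch.
var errBadID = errors.New("id is not an unsigned long")

// parseTraceID splits a hex string of 1 to 32 characters into two uint64
// halves. The last 16 characters form the low half; anything before them
// forms the high half.
func parseTraceID(s string) (high, low uint64, err error) {
	if len(s) == 0 || len(s) > 32 {
		return 0, 0, errBadID
	}
	if len(s) > 16 {
		split := len(s) - 16
		if high, err = strconv.ParseUint(s[:split], 16, 64); err != nil {
			return 0, 0, errBadID
		}
		s = s[split:]
	}
	if low, err = strconv.ParseUint(s, 16, 64); err != nil {
		return 0, 0, errBadID
	}
	return high, low, nil
}

func main() {
	fmt.Println(parseTraceID("00000000000000010000000000000002"))
	fmt.Println(parseTraceID("ZTA"))
}
```

Because the split point is computed from the length, IDs between 17 and 31 characters keep their high bits as well, rather than only the exact 32-character case.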

c := hex[i]
result <<= 4
if c >= '0' && c <= '9' {
result = result | uint64(c-'0')
} else if c >= 'a' && c <= 'f' {
result = result | uint64(c-'a'+10)
} else {
return 0, errIsNotUnsignedLog
}
}

return result, nil
}