-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Zipkin json encoding via HTTP #334
Changes from 7 commits
8adc8db
31748f3
e6da153
cce519b
2603165
64499c2
2b406ae
3211e3d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,7 +33,7 @@ import ( | |
tchanThrift "github.com/uber/tchannel-go/thrift" | ||
|
||
"github.com/uber/jaeger/cmd/collector/app" | ||
zipkincore "github.com/uber/jaeger/thrift-gen/zipkincore" | ||
"github.com/uber/jaeger/thrift-gen/zipkincore" | ||
) | ||
|
||
// APIHandler handles all HTTP calls to the collector | ||
|
@@ -75,30 +75,36 @@ func (aH *APIHandler) saveSpans(w http.ResponseWriter, r *http.Request) { | |
return | ||
} | ||
|
||
if r.Header.Get("Content-Type") == "application/x-thrift" { | ||
handleZipkinThrift(aH.zipkinSpansHandler, bodyBytes, w) | ||
contentType := r.Header.Get("Content-Type") | ||
var tSpans []*zipkincore.Span | ||
if contentType == "application/x-thrift" { | ||
tSpans, err = deserializeThrift(bodyBytes) | ||
if err != nil { | ||
http.Error(w, fmt.Sprintf(app.UnableToReadBodyErrFormat, err), http.StatusBadRequest) | ||
return | ||
} | ||
} else if contentType == "application/json" { | ||
tSpans, err = deserializeJSON(bodyBytes) | ||
if err != nil { | ||
http.Error(w, fmt.Sprintf(app.UnableToReadBodyErrFormat, err), http.StatusBadRequest) | ||
return | ||
} | ||
} else { | ||
http.Error(w, "Only Content-Type:application/x-thrift is supported at the moment", http.StatusBadRequest) | ||
http.Error(w, "Unsupported Content-Type", http.StatusBadRequest) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. missing return? |
||
} | ||
|
||
w.WriteHeader(http.StatusAccepted) | ||
} | ||
|
||
func handleZipkinThrift(zHandler app.ZipkinSpansHandler, bodyBytes []byte, w http.ResponseWriter) { | ||
spans, err := deserializeZipkin(bodyBytes) | ||
if err != nil { | ||
http.Error(w, fmt.Sprintf(app.UnableToReadBodyErrFormat, err), http.StatusBadRequest) | ||
return | ||
if len(tSpans) > 0 { | ||
ctx, _ := tchanThrift.NewContext(time.Minute) | ||
if _, err = aH.zipkinSpansHandler.SubmitZipkinBatch(ctx, tSpans); err != nil { | ||
http.Error(w, fmt.Sprintf("Cannot submit Zipkin batch: %v", err), http.StatusInternalServerError) | ||
return | ||
} | ||
} | ||
|
||
ctx, _ := tchanThrift.NewContext(time.Minute) | ||
if _, err = zHandler.SubmitZipkinBatch(ctx, spans); err != nil { | ||
http.Error(w, fmt.Sprintf("Cannot submit Zipkin batch: %v", err), http.StatusInternalServerError) | ||
return | ||
} | ||
w.WriteHeader(http.StatusAccepted) | ||
} | ||
|
||
func deserializeZipkin(b []byte) ([]*zipkincore.Span, error) { | ||
func deserializeThrift(b []byte) ([]*zipkincore.Span, error) { | ||
buffer := thrift.NewTMemoryBuffer() | ||
buffer.Write(b) | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -122,6 +122,46 @@ func TestThriftFormat(t *testing.T) { | |
assert.EqualValues(t, "Cannot submit Zipkin batch: Bad times ahead\n", resBodyStr) | ||
} | ||
|
||
func TestJsonFormat(t *testing.T) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: could we use a fixture for this test instead which defines the test cases and then run the cases at once? |
||
server, handler := initializeTestServer(nil) | ||
defer server.Close() | ||
|
||
endpJSON := createEndpoint("foo", "127.0.0.1", "2001:db8::c001", 66) | ||
annoJSON := createAnno("cs", 1515, endpJSON) | ||
binAnnoJSON := createBinAnno("http.status_code", "200", endpJSON) | ||
spanJSON := createSpan("bar", "1234567891234565", "1234567891234567", "1234567891234568", 156, 15145, false, | ||
annoJSON, binAnnoJSON) | ||
statusCode, resBodyStr, err := postBytes(server.URL+`/api/v1/spans`, []byte(spanJSON), createHeader("application/json")) | ||
assert.NoError(t, err) | ||
assert.EqualValues(t, http.StatusAccepted, statusCode) | ||
assert.EqualValues(t, "", resBodyStr) | ||
|
||
endpErrJSON := createEndpoint("", "127.0.0.A", "", 80) | ||
|
||
// error zipkinSpanHandler | ||
handler.zipkinSpansHandler.(*mockZipkinHandler).err = fmt.Errorf("Bad times ahead") | ||
tests := []struct { | ||
payload string | ||
expected string | ||
statusCode int | ||
}{ | ||
{spanJSON, "Cannot submit Zipkin batch: Bad times ahead\n", http.StatusInternalServerError}, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please explicitly define the variable names in the struct: ie {payload: spanJSON, etc.} we've been bad about this in the past but should do it for everything going forward |
||
{createSpan("bar", "", "1", "1", 156, 15145, false, annoJSON, binAnnoJSON), | ||
"Unable to process request body: id is not an unsigned long\n", http.StatusBadRequest}, | ||
{createSpan("bar", "ZTA", "1", "1", 156, 15145, false, "", ""), | ||
"Unable to process request body: id is not an unsigned long\n", http.StatusBadRequest}, | ||
{createSpan("bar", "1", "", "1", 156, 15145, false, "", createAnno("cs", 1, endpErrJSON)), | ||
"Unable to process request body: wrong ipv4\n", http.StatusBadRequest}, | ||
} | ||
|
||
for _, test := range tests { | ||
statusCode, resBodyStr, err = postBytes(server.URL+`/api/v1/spans`, []byte(test.payload), createHeader("application/json")) | ||
require.NoError(t, err) | ||
assert.EqualValues(t, test.statusCode, statusCode) | ||
assert.EqualValues(t, test.expected, resBodyStr) | ||
} | ||
} | ||
|
||
func TestGzipEncoding(t *testing.T) { | ||
server, _ := initializeTestServer(nil) | ||
defer server.Close() | ||
|
@@ -148,7 +188,7 @@ func TestGzipBadBody(t *testing.T) { | |
func TestUnsupportedContentType(t *testing.T) { | ||
server, _ := initializeTestServer(nil) | ||
defer server.Close() | ||
statusCode, _, err := postBytes(server.URL+`/api/v1/spans`, []byte{}, createHeader("application/json")) | ||
statusCode, _, err := postBytes(server.URL+`/api/v1/spans`, []byte{}, createHeader("text/html")) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. xml would be funnier There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
assert.NoError(t, err) | ||
assert.EqualValues(t, http.StatusBadRequest, statusCode) | ||
} | ||
|
@@ -164,7 +204,7 @@ func TestFormatBadBody(t *testing.T) { | |
|
||
func TestDeserializeWithBadListStart(t *testing.T) { | ||
spanBytes := zipkinSerialize([]*zipkincore.Span{{}}) | ||
_, err := deserializeZipkin(append([]byte{0, 255, 255}, spanBytes...)) | ||
_, err := deserializeThrift(append([]byte{0, 255, 255}, spanBytes...)) | ||
assert.Error(t, err) | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,236 @@ | ||
// Copyright (c) 2017 Uber Technologies, Inc. | ||
// | ||
// Permission is hereby granted, free of charge, to any person obtaining a copy | ||
// of this software and associated documentation files (the "Software"), to deal | ||
// in the Software without restriction, including without limitation the rights | ||
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
// copies of the Software, and to permit persons to whom the Software is | ||
// furnished to do so, subject to the following conditions: | ||
// | ||
// The above copyright notice and this permission notice shall be included in | ||
// all copies or substantial portions of the Software. | ||
// | ||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | ||
// THE SOFTWARE. | ||
|
||
package zipkin | ||
|
||
import ( | ||
"encoding/json" | ||
"errors" | ||
"strconv" | ||
"strings" | ||
|
||
"github.com/uber/jaeger/thrift-gen/zipkincore" | ||
) | ||
|
||
type endpoint struct { | ||
ServiceName string `json:"serviceName"` | ||
IPv4 string `json:"ipv4"` | ||
IPv6 string `json:"ipv6"` | ||
Port int16 `json:"port"` | ||
} | ||
type annotation struct { | ||
Endpoint endpoint `json:"endpoint"` | ||
Value string `json:"value"` | ||
Timestamp int64 `json:"timestamp"` | ||
} | ||
type binaryAnnotation struct { | ||
Endpoint endpoint `json:"endpoint"` | ||
Key string `json:"key"` | ||
Value string `json:"value"` | ||
} | ||
type zipkinSpan struct { | ||
ID string `json:"id"` | ||
ParentID string `json:"parentId,omitempty"` | ||
TraceID string `json:"traceId"` | ||
Name string `json:"name"` | ||
Timestamp *int64 `json:"timestamp"` | ||
Duration *int64 `json:"duration"` | ||
Debug bool `json:"debug"` | ||
Annotations []annotation `json:"annotations"` | ||
BinaryAnnotations []binaryAnnotation `json:"binaryAnnotations"` | ||
} | ||
type zipkinSpans []zipkinSpan | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. only used once, better to inline: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It was like that before. @black-adder wanted to move it here. https://github.com/uber/jaeger/pull/334#discussion_r133217513 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's fine to define it here as long as it's used in multiple places (which looks like it can). Every reference to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I prefer to use |
||
|
||
var ( | ||
errIsNotUnsignedLog = errors.New("id is not an unsigned long") | ||
errWrongIpv4 = errors.New("wrong ipv4") | ||
) | ||
|
||
func deserializeJSON(body []byte) ([]*zipkincore.Span, error) { | ||
spans, err := decode(body) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return spansToThrift(spans) | ||
} | ||
|
||
func decode(body []byte) ([]zipkinSpan, error) { | ||
var spans zipkinSpans | ||
if err := json.Unmarshal(body, &spans); err != nil { | ||
return nil, err | ||
} | ||
return spans, nil | ||
} | ||
|
||
func spansToThrift(spans []zipkinSpan) ([]*zipkincore.Span, error) { | ||
var tSpans []*zipkincore.Span | ||
for _, span := range spans { | ||
tSpan, err := spanToThrift(span) | ||
if err != nil { | ||
return nil, err | ||
} | ||
tSpans = append(tSpans, tSpan) | ||
} | ||
return tSpans, nil | ||
} | ||
|
||
func spanToThrift(s zipkinSpan) (*zipkincore.Span, error) { | ||
id, err := hexToUnsignedLong(s.ID) | ||
if err != nil { | ||
return nil, err | ||
} | ||
traceID, err := hexToUnsignedLong(s.TraceID) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
tSpan := &zipkincore.Span{ | ||
ID: int64(id), | ||
TraceID: int64(traceID), | ||
Name: s.Name, | ||
Debug: s.Debug, | ||
Timestamp: s.Timestamp, | ||
Duration: s.Duration, | ||
} | ||
|
||
if len(s.TraceID) == 32 { | ||
// take 0-16 | ||
traceIDHigh, err := hexToUnsignedLong(s.TraceID[:16]) | ||
if err != nil { | ||
return nil, err | ||
} | ||
help := int64(traceIDHigh) | ||
tSpan.TraceIDHigh = &help | ||
} | ||
|
||
if len(s.ParentID) > 0 { | ||
parentID, err := hexToUnsignedLong(s.ParentID) | ||
if err != nil { | ||
return nil, err | ||
} | ||
signed := int64(parentID) | ||
tSpan.ParentID = &signed | ||
} | ||
|
||
for _, a := range s.Annotations { | ||
tA, err := annoToThrift(a) | ||
if err != nil { | ||
return nil, err | ||
} | ||
tSpan.Annotations = append(tSpan.Annotations, tA) | ||
} | ||
for _, ba := range s.BinaryAnnotations { | ||
tBa, err := binAnnoToThrift(ba) | ||
if err != nil { | ||
return nil, err | ||
} | ||
tSpan.BinaryAnnotations = append(tSpan.BinaryAnnotations, tBa) | ||
} | ||
|
||
return tSpan, nil | ||
} | ||
|
||
func endpointToThrift(e endpoint) (*zipkincore.Endpoint, error) { | ||
ipv4, err := parseIpv4(e.IPv4) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return &zipkincore.Endpoint{ | ||
ServiceName: e.ServiceName, | ||
Port: e.Port, | ||
Ipv4: ipv4, | ||
Ipv6: []byte(e.IPv6), | ||
}, nil | ||
} | ||
|
||
func annoToThrift(a annotation) (*zipkincore.Annotation, error) { | ||
endpoint, err := endpointToThrift(a.Endpoint) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return &zipkincore.Annotation{ | ||
Timestamp: a.Timestamp, | ||
Value: a.Value, | ||
Host: endpoint, | ||
}, nil | ||
} | ||
|
||
func binAnnoToThrift(ba binaryAnnotation) (*zipkincore.BinaryAnnotation, error) { | ||
endpoint, err := endpointToThrift(ba.Endpoint) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return &zipkincore.BinaryAnnotation{ | ||
Key: ba.Key, | ||
Value: []byte(ba.Value), | ||
Host: endpoint, | ||
AnnotationType: zipkincore.AnnotationType_STRING, | ||
}, nil | ||
} | ||
|
||
func parseIpv4(str string) (int32, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ParseIP returns IP, which is an alias to []byte. If nil return 0 else pack bytes into long |
||
segments := strings.Split(str, ".") | ||
if len(segments) == 1 { | ||
return 0, nil | ||
} | ||
|
||
var ipv4 int32 | ||
for _, segment := range segments { | ||
parsed, err := strconv.ParseInt(segment, 10, 32) | ||
if err != nil { | ||
return 0, errWrongIpv4 | ||
} | ||
parsed32 := int32(parsed) | ||
ipv4 = ipv4<<8 | (parsed32 & 0xff) | ||
} | ||
|
||
return ipv4, nil | ||
} | ||
|
||
func hexToUnsignedLong(hex string) (uint64, error) { | ||
len := len(hex) | ||
if len < 1 || len > 32 { | ||
return 0, errIsNotUnsignedLog | ||
} | ||
|
||
start := 0 | ||
if len > 16 { | ||
start = len - 16 | ||
} | ||
|
||
var result uint64 | ||
for i := start; i < len; i++ { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we use official the builtin golang functions for this: https://play.golang.org/p/0GUjRvy0kw or would it not work? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it produces different numbers There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why not use model functions like https://github.com/uber/jaeger/blob/master/model/span.go#L146 It automatically takes care of high/low. |
||
c := hex[i] | ||
result <<= 4 | ||
if c >= '0' && c <= '9' { | ||
result = result | uint64(c-'0') | ||
} else if c >= 'a' && c <= 'f' { | ||
result = result | uint64(c-'a'+10) | ||
} else { | ||
return 0, errIsNotUnsignedLog | ||
} | ||
} | ||
|
||
return result, nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can do this outside of the
if contentType==