diff --git a/cmd/ingester/app/processor/span_processor.go b/cmd/ingester/app/processor/span_processor.go index 3b29bd8c5a6..41b80c2c6cd 100644 --- a/cmd/ingester/app/processor/span_processor.go +++ b/cmd/ingester/app/processor/span_processor.go @@ -16,6 +16,11 @@ package processor import ( "io" + + "github.com/pkg/errors" + + "github.com/jaegertracing/jaeger/plugin/storage/kafka" + "github.com/jaegertracing/jaeger/storage/spanstore" ) //go:generate mockery -name=SpanProcessor @@ -26,7 +31,30 @@ type SpanProcessor interface { io.Closer } +type spanProcessor struct { + unmarshaller kafka.Unmarshaller + writer spanstore.Writer + io.Closer +} + // Message contains the fields of the kafka message that the span processor uses type Message interface { Value() []byte } + +// NewSpanProcessor creates a new SpanProcessor +func NewSpanProcessor(writer spanstore.Writer) SpanProcessor { + return &spanProcessor{ + unmarshaller: kafka.NewProtobufUnmarshaller(), + writer: writer, + } +} + +// Process unmarshals and writes a single kafka message +func (s spanProcessor) Process(message Message) error { + mSpan, err := s.unmarshaller.Unmarshal(message.Value()) + if err != nil { + return errors.Wrap(err, "cannot read message") + } + return s.writer.WriteSpan(mSpan) +} diff --git a/cmd/ingester/app/processor/span_processor_test.go b/cmd/ingester/app/processor/span_processor_test.go new file mode 100644 index 00000000000..74bf0e377b8 --- /dev/null +++ b/cmd/ingester/app/processor/span_processor_test.go @@ -0,0 +1,73 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processor + +import ( + "testing" + + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + + kmocks "github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/mocks" + "github.com/jaegertracing/jaeger/model" + umocks "github.com/jaegertracing/jaeger/pkg/kafka/mocks" + smocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks" +) + +func TestNewSpanProcessor(t *testing.T) { + assert.NotNil(t, NewSpanProcessor(&smocks.Writer{})) +} + +func TestSpanProcessor_Process(t *testing.T) { + writer := &smocks.Writer{} + unmarshallerMock := &umocks.Unmarshaller{} + processor := &spanProcessor{ + unmarshaller: unmarshallerMock, + writer: writer, + } + + message := &kmocks.Message{} + data := []byte("police") + span := &model.Span{} + + message.On("Value").Return(data) + unmarshallerMock.On("Unmarshal", data).Return(span, nil) + writer.On("WriteSpan", span).Return(nil) + + assert.Nil(t, processor.Process(message)) + + message.AssertExpectations(t) + writer.AssertExpectations(t) +} + +func TestSpanProcessor_ProcessError(t *testing.T) { + writer := &smocks.Writer{} + unmarshallerMock := &umocks.Unmarshaller{} + processor := &spanProcessor{ + unmarshaller: unmarshallerMock, + writer: writer, + } + + message := &kmocks.Message{} + data := []byte("police") + + message.On("Value").Return(data) + unmarshallerMock.On("Unmarshal", data).Return(nil, errors.New("moocow")) + + assert.Error(t, processor.Process(message)) + + message.AssertExpectations(t) + writer.AssertNotCalled(t, "WriteSpan") +} diff --git a/pkg/kafka/mocks/Unmarshaller.go b/pkg/kafka/mocks/Unmarshaller.go new file mode 100644 index 00000000000..e879bed9399 --- /dev/null +++ b/pkg/kafka/mocks/Unmarshaller.go @@ -0,0 +1,48 @@ +// Code generated by mockery v1.0.0. DO NOT EDIT. + +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mocks + +import mock "github.com/stretchr/testify/mock" +import model "github.com/jaegertracing/jaeger/model" + +// Unmarshaller is an autogenerated mock type for the Unmarshaller type +type Unmarshaller struct { + mock.Mock +} + +// Unmarshal provides a mock function with given fields: _a0 +func (_m *Unmarshaller) Unmarshal(_a0 []byte) (*model.Span, error) { + ret := _m.Called(_a0) + + var r0 *model.Span + if rf, ok := ret.Get(0).(func([]byte) *model.Span); ok { + r0 = rf(_a0) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*model.Span) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func([]byte) error); ok { + r1 = rf(_a0) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} diff --git a/plugin/storage/kafka/marshaller_test.go b/plugin/storage/kafka/marshalling_test.go similarity index 62% rename from plugin/storage/kafka/marshaller_test.go rename to plugin/storage/kafka/marshalling_test.go index 785551ea641..3ec556ed32f 100644 --- a/plugin/storage/kafka/marshaller_test.go +++ b/plugin/storage/kafka/marshalling_test.go @@ -20,20 +20,22 @@ import ( "github.com/stretchr/testify/assert" ) -func TestProtoMarshaller(t *testing.T) { - marshaller := newProtobufMarshaller() +func TestProtobufMarshallerAndUnmarshaller(t *testing.T) { + testMarshallerAndUnmarshaller(t, newProtobufMarshaller(), NewProtobufUnmarshaller()) +} + +func TestJSONMarshallerAndUnmarshaller(t *testing.T) { + testMarshallerAndUnmarshaller(t, newJSONMarshaller(), NewJSONUnmarshaller()) +} +func testMarshallerAndUnmarshaller(t *testing.T, marshaller Marshaller, unmarshaller Unmarshaller) { bytes, err := marshaller.Marshal(sampleSpan) assert.NoError(t, err) assert.NotNil(t, bytes) -} -func TestJSONMarshaller(t *testing.T) { - marshaller := newJSONMarshaller() - - bytes, err := marshaller.Marshal(sampleSpan) + resultSpan, err := unmarshaller.Unmarshal(bytes) assert.NoError(t, err) - assert.NotNil(t, bytes) + assert.Equal(t, sampleSpan, resultSpan) } diff --git a/plugin/storage/kafka/unmarshaller.go b/plugin/storage/kafka/unmarshaller.go new file mode 100644 index 00000000000..140b85bb771 --- /dev/null +++ b/plugin/storage/kafka/unmarshaller.go @@ -0,0 +1,59 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "bytes" + + "github.com/gogo/protobuf/jsonpb" + "github.com/gogo/protobuf/proto" + + "github.com/jaegertracing/jaeger/model" +) + +// Unmarshaller decodes a byte array to a span +type Unmarshaller interface { + Unmarshal([]byte) (*model.Span, error) +} + +// ProtobufUnmarshaller implements Unmarshaller +type ProtobufUnmarshaller struct{} + +// NewProtobufUnmarshaller constructs a ProtobufUnmarshaller +func NewProtobufUnmarshaller() *ProtobufUnmarshaller { + return &ProtobufUnmarshaller{} +} + +// Unmarshal decodes a protobuf byte array to a span +func (h *ProtobufUnmarshaller) Unmarshal(msg []byte) (*model.Span, error) { + newSpan := &model.Span{} + err := proto.Unmarshal(msg, newSpan) + return newSpan, err +} + +// JSONUnmarshaller implements Unmarshaller +type JSONUnmarshaller struct{} + +// NewJSONUnmarshaller constructs a ProtobufUnmarshaller +func NewJSONUnmarshaller() *JSONUnmarshaller { + return &JSONUnmarshaller{} +} + +// Unmarshal decodes a json byte array to a span +func (h *JSONUnmarshaller) Unmarshal(msg []byte) (*model.Span, error) { + newSpan := &model.Span{} + err := jsonpb.Unmarshal(bytes.NewReader(msg), newSpan) + return newSpan, err +}