Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ingester Processor #944

Merged
merged 5 commits into from
Jul 26, 2018
Merged

Conversation

davit-y
Copy link
Contributor

@davit-y davit-y commented Jul 19, 2018

Which problem is this PR solving?

Short description of the changes

  • Unmarshal consumed messages into spans and write them to storage

@codecov
Copy link

codecov bot commented Jul 20, 2018

Codecov Report

Merging #944 into master will not change coverage.
The diff coverage is 100%.

Impacted file tree graph

@@          Coverage Diff          @@
##           master   #944   +/-   ##
=====================================
  Coverage     100%   100%           
=====================================
  Files         136    138    +2     
  Lines        6322   6343   +21     
=====================================
+ Hits         6322   6343   +21
Impacted Files Coverage Δ
plugin/storage/kafka/unmarshaller.go 100% <100%> (ø)
cmd/ingester/app/processor/span_processor.go 100% <100%> (ø)

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update cac7343...0f34c30. Read the comment docs.

@davit-y davit-y changed the base branch from add-ingester to master July 20, 2018 15:42
@davit-y davit-y force-pushed the ingester-processor branch 2 times, most recently from 250f585 to ece2341 Compare July 20, 2018 17:19
// NewSpanProcessor creates a new SpanProcessor
func NewSpanProcessor(writer spanstore.Writer) SpanProcessor {
return &spanProcessor{
unmarshaller: kafka.NewProtobufUnmarshaller(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend making the unmarshaller configurable by using an Option.

That way, we can default to protobuf, but allow people to easily conveniently override it to anything else.

Copy link
Contributor

@black-adder black-adder left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good stuff

// JSONUnmarshaller implements Unmarshaller
type JSONUnmarshaller struct{}

// NewJSONUnmarshaller constructs a ProtobufUnmarshaller
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this was school, you'd be reprimanded

func (s spanProcessor) Process(message Message) error {
mSpan, err := s.unmarshaller.Unmarshal(message.Value())
if err != nil {
return errors.Wrap(err, "cannot read message")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this message is a bit too generic, maybe could not unmarshal kafka message?

// Message contains the fields of the kafka message that the span processor uses
type Message interface {
Value() []byte
}

// NewSpanProcessor creates a new SpanProcessor
func NewSpanProcessor(writer spanstore.Writer, unmarshaller kafka.Unmarshaller) SpanProcessor {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason not to use functional options here and default to protobuf?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please don't use functional options. Pass an options struct if necessary.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will create a Params struct similar to the consumer

@davit-y davit-y force-pushed the ingester-processor branch 3 times, most recently from 990848b to 2017c2e Compare July 25, 2018 05:57
}

// NewSpanProcessor creates a new KafkaSpanProcessor
func NewSpanProcessor(params SpanProcessorParams) KafkaSpanProcessor {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you return a pointer instead?

Davit Yeghshatyan added 4 commits July 26, 2018 13:36
Signed-off-by: Davit Yeghshatyan <davo@uber.com>
Signed-off-by: Davit Yeghshatyan <davo@uber.com>
Signed-off-by: Davit Yeghshatyan <davo@uber.com>
Signed-off-by: Davit Yeghshatyan <davo@uber.com>
Signed-off-by: Davit Yeghshatyan <davo@uber.com>
@vprithvi vprithvi merged commit 8f7e497 into jaegertracing:master Jul 26, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants