Skip to content

Commit

Permalink
Add processing timings for each step in traces pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
srikanthccv committed May 1, 2024
1 parent 076a33a commit d20e242
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 2 deletions.
19 changes: 19 additions & 0 deletions exporter/clickhousetracesexporter/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func NewSpanWriter(options WriterOptions) *SpanWriter {
func (w *SpanWriter) writeIndexBatch(ctx context.Context, batchSpans []*Span) error {
var statement driver.Batch
var err error
var prepareStart time.Time = time.Now()

defer func() {
if statement != nil {
Expand Down Expand Up @@ -151,6 +152,8 @@ func (w *SpanWriter) writeIndexBatch(ctx context.Context, batchSpans []*Span) er
}
}

w.logger.Info("Time taken to prepare batch for index table", zap.Int64("time", time.Since(prepareStart).Milliseconds()))

start := time.Now()

err = statement.Send()
Expand All @@ -167,6 +170,7 @@ func (w *SpanWriter) writeTagBatch(ctx context.Context, batchSpans []*Span) erro
var tagKeyStatement driver.Batch
var tagStatement driver.Batch
var err error
var prepareStart time.Time = time.Now()

defer func() {
if tagKeyStatement != nil {
Expand Down Expand Up @@ -264,6 +268,8 @@ func (w *SpanWriter) writeTagBatch(ctx context.Context, batchSpans []*Span) erro
}
}

w.logger.Info("Time taken to prepare batch for tag/tagKey tables", zap.Int64("time", time.Since(prepareStart).Milliseconds()))

tagStart := time.Now()
err = tagStatement.Send()
stats.RecordWithTags(ctx,
Expand Down Expand Up @@ -298,6 +304,7 @@ func (w *SpanWriter) writeTagBatch(ctx context.Context, batchSpans []*Span) erro
func (w *SpanWriter) writeErrorBatch(ctx context.Context, batchSpans []*Span) error {
var statement driver.Batch
var err error
var prepareStart time.Time = time.Now()

defer func() {
if statement != nil {
Expand Down Expand Up @@ -333,6 +340,8 @@ func (w *SpanWriter) writeErrorBatch(ctx context.Context, batchSpans []*Span) er
}
}

w.logger.Info("Time taken to prepare batch for error table", zap.Int64("time", time.Since(prepareStart).Milliseconds()))

start := time.Now()

err = statement.Send()
Expand All @@ -352,6 +361,7 @@ func stringToBool(s string) bool {
func (w *SpanWriter) writeModelBatch(ctx context.Context, batchSpans []*Span) error {
var statement driver.Batch
var err error
var marshalStart time.Time = time.Now()

defer func() {
if statement != nil {
Expand Down Expand Up @@ -388,6 +398,7 @@ func (w *SpanWriter) writeModelBatch(ctx context.Context, batchSpans []*Span) er

usage.AddMetric(metrics, *span.Tenant, 1, int64(len(serializedUsage)))
}
w.logger.Info("Time taken to marshal spans for model batch", zap.Int64("time", time.Since(marshalStart).Milliseconds()))
start := time.Now()

err = statement.Send()
Expand All @@ -408,30 +419,38 @@ func (w *SpanWriter) writeModelBatch(ctx context.Context, batchSpans []*Span) er

// WriteBatchOfSpans writes the encoded batch of spans
func (w *SpanWriter) WriteBatchOfSpans(ctx context.Context, batch []*Span) error {
start := time.Now()
if w.spansTable != "" {
if err := w.writeModelBatch(ctx, batch); err != nil {
w.logger.Error("Could not write a batch of spans to model table: ", zap.Error(err))
return err
}
}
w.logger.Info("Time taken to write batch of spans to model table", zap.Int64("time", time.Since(start).Milliseconds()))
start = time.Now()
if w.indexTable != "" {
if err := w.writeIndexBatch(ctx, batch); err != nil {
w.logger.Error("Could not write a batch of spans to index table: ", zap.Error(err))
return err
}
}
w.logger.Info("Time taken to write batch of spans to index table", zap.Int64("time", time.Since(start).Milliseconds()))
start = time.Now()
if w.errorTable != "" {
if err := w.writeErrorBatch(ctx, batch); err != nil {
w.logger.Error("Could not write a batch of spans to error table: ", zap.Error(err))
return err
}
}
w.logger.Info("Time taken to write batch of spans to error table", zap.Int64("time", time.Since(start).Milliseconds()))
start = time.Now()
if w.attributeTable != "" && w.attributeKeyTable != "" {
if err := w.writeTagBatch(ctx, batch); err != nil {
w.logger.Error("Could not write a batch of spans to tag/tagKey tables: ", zap.Error(err))
return err
}
}
w.logger.Info("Time taken to write batch of spans to tag/tagKey tables", zap.Int64("time", time.Since(start).Milliseconds()))
return nil
}

Expand Down
2 changes: 2 additions & 0 deletions processor/signozspanmetricsprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,9 +439,11 @@ func (p *processorImp) Capabilities() consumer.Capabilities {
// It aggregates the trace data to generate metrics, forwarding these metrics to the discovered metrics exporter.
// The original input trace data will be forwarded to the next consumer, unmodified.
func (p *processorImp) ConsumeTraces(ctx context.Context, traces ptrace.Traces) error {
start := time.Now()
p.lock.Lock()
p.aggregateMetrics(traces)
p.lock.Unlock()
p.logger.Info("Time taken to aggregate metrics", zap.Int64("time", time.Since(start).Milliseconds()))

// Forward trace data unmodified and propagate trace pipeline errors, if any.
return p.tracesConsumer.ConsumeTraces(ctx, traces)
Expand Down
5 changes: 3 additions & 2 deletions receiver/signozkafkareceiver/kafka_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func newTracesReceiver(config Config, set receiver.CreateSettings, unmarshalers
func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) error {
ctx, cancel := context.WithCancel(context.Background())
c.cancelConsumeLoop = cancel

obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
ReceiverID: c.settings.ID,
Transport: transport,
Expand Down Expand Up @@ -275,7 +275,7 @@ func (c *kafkaMetricsConsumer) Shutdown(context.Context) error {
func newLogsReceiver(config Config, set receiver.CreateSettings, unmarshalers map[string]LogsUnmarshaler, nextConsumer consumer.Logs) (*kafkaLogsConsumer, error) {
// set sarama library's logger to get detailed logs from the library
sarama.Logger = zap.NewStdLog(set.Logger)

c := sarama.NewConfig()
c = setSaramaConsumerConfig(c, &config.SaramaConsumerConfig)
c.ClientID = config.ClientID
Expand Down Expand Up @@ -497,6 +497,7 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
}
return err
}
c.logger.Info("Time taken to unmarshal traces message", zap.Int64("time", time.Since(start).Milliseconds()))

spanCount := traces.SpanCount()
err = c.nextConsumer.ConsumeTraces(session.Context(), traces)
Expand Down

0 comments on commit d20e242

Please sign in to comment.