Skip to content

Commit

Permalink
More timings
Browse files Browse the repository at this point in the history
  • Loading branch information
srikanthccv committed May 1, 2024
1 parent 7183195 commit 0213cae
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 0 deletions.
3 changes: 3 additions & 0 deletions exporter/clickhousetracesexporter/clickhouse_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/SigNoz/signoz-otel-collector/usage"
"github.com/SigNoz/signoz-otel-collector/utils"
Expand Down Expand Up @@ -416,6 +417,7 @@ func (s *storage) pushTraceData(ctx context.Context, td ptrace.Traces) error {
case <-s.closeChan:
return errors.New("shutdown has been called")
default:
start := time.Now()
rss := td.ResourceSpans()
var batchOfSpans []*Span
for i := 0; i < rss.Len(); i++ {
Expand All @@ -436,6 +438,7 @@ func (s *storage) pushTraceData(ctx context.Context, td ptrace.Traces) error {
}
}
}
zap.L().Info("Time taken to convert spans to structured spans", zap.Int64("time", time.Since(start).Milliseconds()))
err := s.Writer.WriteBatchOfSpans(ctx, batchOfSpans)
if err != nil {
zap.S().Error("Error in writing spans to clickhouse: ", err)
Expand Down
3 changes: 3 additions & 0 deletions receiver/signozkafkareceiver/kafka_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,7 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
if !c.messageMarking.After {
session.MarkMessage(message, "")
}
c.logger.Info("Time taken to claim message", zap.Int64("time", time.Since(start).Milliseconds()))

ctx := c.obsrecv.StartTracesOp(session.Context())
statsTags := []tag.Mutator{tag.Upsert(tagInstanceName, c.id.String())}
Expand All @@ -501,6 +502,7 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
spanCount := traces.SpanCount()
c.logger.Info("Time taken to unmarshal traces message", zap.Int64("time", time.Since(start).Milliseconds()), zap.Int("spanCount", spanCount))
err = c.nextConsumer.ConsumeTraces(session.Context(), traces)
c.logger.Info("Time taken to consume traces message", zap.Int64("time", time.Since(start).Milliseconds()))
c.obsrecv.EndTracesOp(ctx, c.unmarshaler.Encoding(), spanCount, err)
if err != nil {
c.logger.Error("kafka receiver: failed to export traces", zap.Error(err), zap.Int32("partition", claim.Partition()), zap.String("topic", claim.Topic()))
Expand All @@ -515,6 +517,7 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
if !c.autocommitEnabled {
session.Commit()
}
c.logger.Info("Time taken to process traces message", zap.Int64("time", time.Since(start).Milliseconds()))
err = stats.RecordWithTags(ctx, statsTags, processingTime.M(time.Since(start).Milliseconds()))
if err != nil {
c.logger.Error("failed to record processing time", zap.Error(err))
Expand Down

0 comments on commit 0213cae

Please sign in to comment.