diff --git a/exporter/clickhousetracesexporter/writer.go b/exporter/clickhousetracesexporter/writer.go index 81848a60..b26108a1 100644 --- a/exporter/clickhousetracesexporter/writer.go +++ b/exporter/clickhousetracesexporter/writer.go @@ -93,6 +93,7 @@ func NewSpanWriter(options WriterOptions) *SpanWriter { func (w *SpanWriter) writeIndexBatch(ctx context.Context, batchSpans []*Span) error { var statement driver.Batch var err error + var prepareStart time.Time = time.Now() defer func() { if statement != nil { @@ -151,6 +152,8 @@ func (w *SpanWriter) writeIndexBatch(ctx context.Context, batchSpans []*Span) er } } + w.logger.Info("Time taken to prepare batch for index table", zap.Int64("time", time.Since(prepareStart).Milliseconds())) + start := time.Now() err = statement.Send() @@ -167,6 +170,7 @@ func (w *SpanWriter) writeTagBatch(ctx context.Context, batchSpans []*Span) erro var tagKeyStatement driver.Batch var tagStatement driver.Batch var err error + var prepareStart time.Time = time.Now() defer func() { if tagKeyStatement != nil { @@ -264,6 +268,8 @@ func (w *SpanWriter) writeTagBatch(ctx context.Context, batchSpans []*Span) erro } } + w.logger.Info("Time taken to prepare batch for tag/tagKey tables", zap.Int64("time", time.Since(prepareStart).Milliseconds())) + tagStart := time.Now() err = tagStatement.Send() stats.RecordWithTags(ctx, @@ -298,6 +304,7 @@ func (w *SpanWriter) writeTagBatch(ctx context.Context, batchSpans []*Span) erro func (w *SpanWriter) writeErrorBatch(ctx context.Context, batchSpans []*Span) error { var statement driver.Batch var err error + var prepareStart time.Time = time.Now() defer func() { if statement != nil { @@ -333,6 +340,8 @@ func (w *SpanWriter) writeErrorBatch(ctx context.Context, batchSpans []*Span) er } } + w.logger.Info("Time taken to prepare batch for error table", zap.Int64("time", time.Since(prepareStart).Milliseconds())) + start := time.Now() err = statement.Send() @@ -352,6 +361,7 @@ func stringToBool(s string) bool { func (w *SpanWriter) writeModelBatch(ctx context.Context, batchSpans []*Span) error { var statement driver.Batch var err error + var marshalStart time.Time = time.Now() defer func() { if statement != nil { @@ -388,6 +398,7 @@ func (w *SpanWriter) writeModelBatch(ctx context.Context, batchSpans []*Span) er usage.AddMetric(metrics, *span.Tenant, 1, int64(len(serializedUsage))) } + w.logger.Info("Time taken to marshal spans for model batch", zap.Int64("time", time.Since(marshalStart).Milliseconds())) start := time.Now() err = statement.Send() @@ -408,30 +419,38 @@ func (w *SpanWriter) writeModelBatch(ctx context.Context, batchSpans []*Span) er // WriteBatchOfSpans writes the encoded batch of spans func (w *SpanWriter) WriteBatchOfSpans(ctx context.Context, batch []*Span) error { + start := time.Now() if w.spansTable != "" { if err := w.writeModelBatch(ctx, batch); err != nil { w.logger.Error("Could not write a batch of spans to model table: ", zap.Error(err)) return err } } + w.logger.Info("Time taken to write batch of spans to model table", zap.Int64("time", time.Since(start).Milliseconds())) + start = time.Now() if w.indexTable != "" { if err := w.writeIndexBatch(ctx, batch); err != nil { w.logger.Error("Could not write a batch of spans to index table: ", zap.Error(err)) return err } } + w.logger.Info("Time taken to write batch of spans to index table", zap.Int64("time", time.Since(start).Milliseconds())) + start = time.Now() if w.errorTable != "" { if err := w.writeErrorBatch(ctx, batch); err != nil { w.logger.Error("Could not write a batch of spans to error table: ", zap.Error(err)) return err } } + w.logger.Info("Time taken to write batch of spans to error table", zap.Int64("time", time.Since(start).Milliseconds())) + start = time.Now() if w.attributeTable != "" && w.attributeKeyTable != "" { if err := w.writeTagBatch(ctx, batch); err != nil { w.logger.Error("Could not write a batch of spans to tag/tagKey tables: ", zap.Error(err)) return err } } + w.logger.Info("Time taken to write batch of spans to tag/tagKey tables", zap.Int64("time", time.Since(start).Milliseconds())) return nil } diff --git a/processor/signozspanmetricsprocessor/processor.go b/processor/signozspanmetricsprocessor/processor.go index 29cc04f6..ecc34969 100644 --- a/processor/signozspanmetricsprocessor/processor.go +++ b/processor/signozspanmetricsprocessor/processor.go @@ -439,9 +439,11 @@ func (p *processorImp) Capabilities() consumer.Capabilities { // It aggregates the trace data to generate metrics, forwarding these metrics to the discovered metrics exporter. // The original input trace data will be forwarded to the next consumer, unmodified. func (p *processorImp) ConsumeTraces(ctx context.Context, traces ptrace.Traces) error { + start := time.Now() p.lock.Lock() p.aggregateMetrics(traces) p.lock.Unlock() + p.logger.Info("Time taken to aggregate metrics", zap.Int64("time", time.Since(start).Milliseconds())) // Forward trace data unmodified and propagate trace pipeline errors, if any. return p.tracesConsumer.ConsumeTraces(ctx, traces) diff --git a/receiver/signozkafkareceiver/kafka_receiver.go b/receiver/signozkafkareceiver/kafka_receiver.go index 30082746..fb6a9eb2 100644 --- a/receiver/signozkafkareceiver/kafka_receiver.go +++ b/receiver/signozkafkareceiver/kafka_receiver.go @@ -125,7 +125,7 @@ func newTracesReceiver(config Config, set receiver.CreateSettings, unmarshalers func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) error { ctx, cancel := context.WithCancel(context.Background()) c.cancelConsumeLoop = cancel - + obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ ReceiverID: c.settings.ID, Transport: transport, @@ -275,7 +275,7 @@ func (c *kafkaMetricsConsumer) Shutdown(context.Context) error { func newLogsReceiver(config Config, set receiver.CreateSettings, unmarshalers map[string]LogsUnmarshaler, nextConsumer consumer.Logs) (*kafkaLogsConsumer, error) { // set sarama library's logger to get detailed logs from the library sarama.Logger = zap.NewStdLog(set.Logger) - + c := sarama.NewConfig() c = setSaramaConsumerConfig(c, &config.SaramaConsumerConfig) c.ClientID = config.ClientID @@ -497,6 +497,7 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe } return err } + c.logger.Info("Time taken to unmarshal traces message", zap.Int64("time", time.Since(start).Milliseconds())) spanCount := traces.SpanCount() err = c.nextConsumer.ConsumeTraces(session.Context(), traces)