
Replace es-spanstore tracing instrumentation with OpenTelemetry #4596

Merged · 16 commits · Jul 30, 2023
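The change applies the same mechanical pattern throughout the Elasticsearch span store: spans are started from an injected trace.Tracer instead of opentracing.StartSpanFromContext, Finish() becomes End(), log fields become attributes, and error logging becomes RecordError. A minimal before/after sketch of that pattern, not taken from the diff (the worker type, doWork, and process are hypothetical placeholders):

// Before (OpenTracing, as removed by this PR):
//   span, ctx := opentracing.StartSpanFromContext(ctx, "doWork")
//   defer span.Finish()
//   span.LogFields(otlog.Object("ids", ids))
//
// After (OpenTelemetry, as introduced by this PR):
package example

import (
	"context"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
)

type worker struct {
	// Typically obtained from a trace.TracerProvider, e.g. tp.Tracer("component-name").
	tracer trace.Tracer
}

func (w *worker) doWork(ctx context.Context, ids []string) error {
	ctx, span := w.tracer.Start(ctx, "doWork")
	defer span.End()

	// Attributes replace OpenTracing log fields; guard conversions behind IsRecording.
	if span.IsRecording() {
		span.SetAttributes(attribute.Key("ids").StringSlice(ids))
	}

	if err := process(ctx, ids); err != nil {
		// Replaces ottag.Error.Set(span, true) + span.LogFields(otlog.Error(err)).
		span.RecordError(err)
		return err
	}
	return nil
}

func process(ctx context.Context, ids []string) error { return nil }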
2 changes: 2 additions & 0 deletions cmd/all-in-one/main.go
@@ -24,6 +24,7 @@ import (

"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.opentelemetry.io/otel"
_ "go.uber.org/automaxprocs"
"go.uber.org/zap"

@@ -104,6 +105,7 @@ by default uses only in-memory database.`,
if err != nil {
logger.Fatal("Failed to initialize tracer", zap.Error(err))
}
otel.SetTracerProvider(tracer.OTEL)

storageFactory.InitFromViper(v, logger)
if err := storageFactory.Initialize(metricsFactory, logger); err != nil {
2 changes: 2 additions & 0 deletions cmd/query/main.go
@@ -23,6 +23,7 @@ import (

"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.opentelemetry.io/otel"
_ "go.uber.org/automaxprocs"
"go.uber.org/zap"

@@ -76,6 +77,7 @@ func main() {
if err != nil {
logger.Fatal("Failed to create tracer:", zap.Error(err))
}
otel.SetTracerProvider(jtracer.OTEL)
queryOpts, err := new(app.QueryOptions).InitFromViper(v, logger)
if err != nil {
logger.Fatal("Failed to configure query service", zap.Error(err))
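The two main.go changes above register the process-wide provider so that otel.GetTracerProvider(), which the Elasticsearch factory below now calls in NewFactory(), resolves to the real provider instead of the no-op default. A minimal sketch of that wiring, using a plain OpenTelemetry SDK provider as a stand-in for Jaeger's tracer.OTEL / jtracer.OTEL fields (the SDK-based setup is an assumption for illustration only):

package main

import (
	"context"

	"go.opentelemetry.io/otel"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

func main() {
	// Build a concrete TracerProvider; exporters and samplers omitted for brevity.
	tp := sdktrace.NewTracerProvider()
	defer tp.Shutdown(context.Background())

	// Register it globally; any later otel.GetTracerProvider() call,
	// such as the one added in es.NewFactory(), now returns this provider.
	otel.SetTracerProvider(tp)
}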
34 changes: 20 additions & 14 deletions plugin/storage/es/factory.go
@@ -21,6 +21,8 @@ import (
"io"

"github.com/spf13/viper"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/es"
@@ -50,6 +52,7 @@ type Factory struct {

metricsFactory metrics.Factory
logger *zap.Logger
tracer trace.TracerProvider

newClientFn func(c *config.Configuration, logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error)

@@ -64,6 +67,7 @@ func NewFactory() *Factory {
return &Factory{
Options: NewOptions(primaryNamespace, archiveNamespace),
newClientFn: config.NewClient,
tracer: otel.GetTracerProvider(),
}
}

@@ -108,49 +112,48 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)

// CreateSpanReader implements storage.Factory
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
return createSpanReader(f.metricsFactory, f.logger, f.primaryClient, f.primaryConfig, false)
return createSpanReader(f.primaryClient, f.primaryConfig, false, f.metricsFactory, f.logger, f.tracer)
}

// CreateSpanWriter implements storage.Factory
func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
return createSpanWriter(f.metricsFactory, f.logger, f.primaryClient, f.primaryConfig, false)
return createSpanWriter(f.primaryClient, f.primaryConfig, false, f.metricsFactory, f.logger)
}

// CreateDependencyReader implements storage.Factory
func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
return createDependencyReader(f.logger, f.primaryClient, f.primaryConfig)
return createDependencyReader(f.primaryClient, f.primaryConfig, f.logger)
}

// CreateArchiveSpanReader implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) {
if !f.archiveConfig.Enabled {
return nil, nil
}
return createSpanReader(f.metricsFactory, f.logger, f.archiveClient, f.archiveConfig, true)
return createSpanReader(f.archiveClient, f.archiveConfig, true, f.metricsFactory, f.logger, f.tracer)
}

// CreateArchiveSpanWriter implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
if !f.archiveConfig.Enabled {
return nil, nil
}
return createSpanWriter(f.metricsFactory, f.logger, f.archiveClient, f.archiveConfig, true)
return createSpanWriter(f.archiveClient, f.archiveConfig, true, f.metricsFactory, f.logger)
}

func createSpanReader(
mFactory metrics.Factory,
logger *zap.Logger,
client es.Client,
cfg *config.Configuration,
archive bool,
mFactory metrics.Factory,
logger *zap.Logger,
tp trace.TracerProvider,
) (spanstore.Reader, error) {
if cfg.UseILM && !cfg.UseReadWriteAliases {
return nil, fmt.Errorf("--es.use-ilm must always be used in conjunction with --es.use-aliases to ensure ES writers and readers refer to the single index mapping")
}
return esSpanStore.NewSpanReader(esSpanStore.SpanReaderParams{
Client: client,
Logger: logger,
MetricsFactory: mFactory,
MaxDocCount: cfg.MaxDocCount,
MaxSpanAge: cfg.MaxSpanAge,
IndexPrefix: cfg.IndexPrefix,
@@ -162,15 +165,18 @@ func createSpanReader(
UseReadWriteAliases: cfg.UseReadWriteAliases,
Archive: archive,
RemoteReadClusters: cfg.RemoteReadClusters,
Logger: logger,
MetricsFactory: mFactory,
Tracer: tp.Tracer("esSpanStore.SpanReader"),
}), nil
}

func createSpanWriter(
mFactory metrics.Factory,
logger *zap.Logger,
client es.Client,
cfg *config.Configuration,
archive bool,
mFactory metrics.Factory,
logger *zap.Logger,
) (spanstore.Writer, error) {
var tags []string
var err error
@@ -197,8 +203,6 @@ }
}
writer := esSpanStore.NewSpanWriter(esSpanStore.SpanWriterParams{
Client: client,
Logger: logger,
MetricsFactory: mFactory,
IndexPrefix: cfg.IndexPrefix,
SpanIndexDateLayout: cfg.IndexDateLayoutSpans,
ServiceIndexDateLayout: cfg.IndexDateLayoutServices,
@@ -207,6 +211,8 @@
TagDotReplacement: cfg.Tags.DotReplacement,
Archive: archive,
UseReadWriteAliases: cfg.UseReadWriteAliases,
Logger: logger,
MetricsFactory: mFactory,
})

// Creating a template here would conflict with the one created for ILM, resulting in no index rollover
@@ -220,9 +226,9 @@ }
}

func createDependencyReader(
logger *zap.Logger,
client es.Client,
cfg *config.Configuration,
logger *zap.Logger,
) (dependencystore.Reader, error) {
reader := esDepStore.NewDependencyStore(esDepStore.DependencyStoreParams{
Client: client,
58 changes: 33 additions & 25 deletions plugin/storage/es/spanstore/reader.go
@@ -24,9 +24,8 @@ import (
"time"

"github.com/olivere/elastic"
"github.com/opentracing/opentracing-go"
ottag "github.com/opentracing/opentracing-go/ext"
otlog "github.com/opentracing/opentracing-go/log"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/model"
@@ -93,7 +92,6 @@ var (
// SpanReader can query for and load traces from ElasticSearch
type SpanReader struct {
client es.Client
logger *zap.Logger
// The age of the oldest service/operation we will look for. Because indices in ElasticSearch are by day,
// this will be rounded down to UTC 00:00 of that day.
maxSpanAge time.Duration
@@ -109,15 +107,15 @@ type SpanReader struct {
sourceFn sourceFn
maxDocCount int
useReadWriteAliases bool
logger *zap.Logger
tracer trace.Tracer
}

// SpanReaderParams holds constructor params for NewSpanReader
type SpanReaderParams struct {
Client es.Client
Logger *zap.Logger
MaxSpanAge time.Duration
MaxDocCount int
MetricsFactory metrics.Factory
IndexPrefix string
SpanIndexDateLayout string
ServiceIndexDateLayout string
@@ -127,6 +125,9 @@ type SpanReaderParams struct {
Archive bool
UseReadWriteAliases bool
RemoteReadClusters []string
MetricsFactory metrics.Factory
Logger *zap.Logger
Tracer trace.Tracer
}

// NewSpanReader returns a new SpanReader with metrics.
@@ -139,7 +140,6 @@ func NewSpanReader(p SpanReaderParams) *SpanReader {
}
return &SpanReader{
client: p.Client,
logger: p.Logger,
maxSpanAge: maxSpanAge,
serviceOperationStorage: NewServiceOperationStorage(p.Client, p.Logger, 0), // the decorator takes care of metrics
spanIndexPrefix: indexNames(p.IndexPrefix, spanIndex),
@@ -153,6 +153,8 @@ func NewSpanReader(p SpanReaderParams) *SpanReader {
sourceFn: getSourceFn(p.Archive, p.MaxDocCount),
maxDocCount: p.MaxDocCount,
useReadWriteAliases: p.UseReadWriteAliases,
logger: p.Logger,
tracer: p.Tracer,
}
}

@@ -238,8 +240,8 @@ func indexNames(prefix, index string) string {

// GetTrace takes a traceID and returns a Trace associated with that traceID
func (s *SpanReader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "GetTrace")
defer span.Finish()
ctx, span := s.tracer.Start(ctx, "GetTrace")
defer span.End()
currentTime := time.Now()
traces, err := s.multiRead(ctx, []model.TraceID{traceID}, currentTime.Add(-s.maxSpanAge), currentTime)
if err != nil {
@@ -283,8 +285,8 @@ func (s *SpanReader) unmarshalJSONSpan(esSpanRaw *elastic.SearchHit) (*dbmodel.S

// GetServices returns all services traced by Jaeger, ordered by frequency
func (s *SpanReader) GetServices(ctx context.Context) ([]string, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "GetServices")
defer span.Finish()
ctx, span := s.tracer.Start(ctx, "GetServices")
defer span.End()
currentTime := time.Now()
jaegerIndices := s.timeRangeIndices(s.serviceIndexPrefix, s.serviceIndexDateLayout, currentTime.Add(-s.maxSpanAge), currentTime, s.serviceIndexRolloverFrequency)
return s.serviceOperationStorage.getServices(ctx, jaegerIndices, s.maxDocCount)
@@ -295,8 +297,8 @@ func (s *SpanReader) GetOperations(
ctx context.Context,
query spanstore.OperationQueryParameters,
) ([]spanstore.Operation, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "GetOperations")
defer span.Finish()
ctx, span := s.tracer.Start(ctx, "GetOperations")
defer span.End()
currentTime := time.Now()
jaegerIndices := s.timeRangeIndices(s.serviceIndexPrefix, s.serviceIndexDateLayout, currentTime.Add(-s.maxSpanAge), currentTime, s.serviceIndexRolloverFrequency)
operations, err := s.serviceOperationStorage.getOperations(ctx, jaegerIndices, query.ServiceName, s.maxDocCount)
@@ -329,8 +331,8 @@ func bucketToStringArray(buckets []*elastic.AggregationBucketKeyItem) ([]string,

// FindTraces retrieves traces that match the traceQuery
func (s *SpanReader) FindTraces(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) ([]*model.Trace, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "FindTraces")
defer span.Finish()
ctx, span := s.tracer.Start(ctx, "FindTraces")
defer span.End()

uniqueTraceIDs, err := s.FindTraceIDs(ctx, traceQuery)
if err != nil {
@@ -341,8 +343,8 @@ func (s *SpanReader) FindTraces(ctx context.Context, traceQuery *spanstore.Trace

// FindTraceIDs retrieves traces IDs that match the traceQuery
func (s *SpanReader) FindTraceIDs(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) ([]model.TraceID, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "FindTraceIDs")
defer span.Finish()
ctx, span := s.tracer.Start(ctx, "FindTraceIDs")
defer span.End()

if err := validateQuery(traceQuery); err != nil {
return nil, err
@@ -360,9 +362,16 @@ }
}

func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, startTime, endTime time.Time) ([]*model.Trace, error) {
childSpan, _ := opentracing.StartSpanFromContext(ctx, "multiRead")
childSpan.LogFields(otlog.Object("trace_ids", traceIDs))
defer childSpan.Finish()
ctx, childSpan := s.tracer.Start(ctx, "multiRead")
defer childSpan.End()

if childSpan.IsRecording() {
tracesIDs := make([]string, len(traceIDs))
for i, traceID := range traceIDs {
tracesIDs[i] = traceID.String()
}
childSpan.SetAttributes(attribute.Key("trace_ids").StringSlice(tracesIDs))
}

if len(traceIDs) == 0 {
return []*model.Trace{}, nil
@@ -503,8 +512,8 @@ func validateQuery(p *spanstore.TraceQueryParameters) error {
}

func (s *SpanReader) findTraceIDs(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) ([]string, error) {
childSpan, _ := opentracing.StartSpanFromContext(ctx, "findTraceIDs")
defer childSpan.Finish()
ctx, childSpan := s.tracer.Start(ctx, "findTraceIDs")
defer childSpan.End()
// Below is the JSON body to our HTTP GET request to ElasticSearch. This function creates this.
// {
// "size": 0,
Expand Down Expand Up @@ -686,7 +695,6 @@ func (s *SpanReader) buildObjectQuery(field string, k string, v string) elastic.
return elastic.NewBoolQuery().Must(keyQuery)
}

func logErrorToSpan(span opentracing.Span, err error) {
ottag.Error.Set(span, true)
span.LogFields(otlog.Error(err))
func logErrorToSpan(span trace.Span, err error) {
span.RecordError(err)
}