Skip to content

Commit

Permalink
Use OpenTelemetry SDK in HotROD 🚗 (jaegertracing#4187)
Browse files Browse the repository at this point in the history
Based on earlier PR jaegertracing#3390 by @rbroggi. 

## Which problem is this PR solving?
- Resolves jaegertracing#3380

## Short description of the changes
- Switch from jaeger-client-go SDK to OTel SDK paired with ot-otel
bridge
- Add cli flag to select which Otel Exporter to use (Jaeger, OTLP or
stdout)
- Add favicon 🚗
- Added OTEL version of rpcmetrics from jaeger-client-go
- Remove dependency on github.com/uber/jaeger-lib in HotROD (addresses
one of outstanding tasks in jaegertracing#3766)

---------

Signed-off-by: rbroggi <ro_broggi@hotmail.com>
Signed-off-by: Yuri Shkuro <github@ysh.us>
Co-authored-by: rbroggi <ro_broggi@hotmail.com>
  • Loading branch information
2 people authored and shubbham1215 committed Feb 22, 2023
1 parent d8f2d65 commit 19bd501
Show file tree
Hide file tree
Showing 24 changed files with 816 additions and 68 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
go.work
go.work.sum

*.out
*.test
*.xml
Expand Down
3 changes: 1 addition & 2 deletions examples/hotrod/cmd/customer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/examples/hotrod/pkg/log"
"github.com/jaegertracing/jaeger/examples/hotrod/pkg/tracing"
"github.com/jaegertracing/jaeger/examples/hotrod/services/customer"
)

Expand All @@ -37,7 +36,7 @@ var customerCmd = &cobra.Command{
logger := log.NewFactory(zapLogger)
server := customer.NewServer(
net.JoinHostPort("0.0.0.0", strconv.Itoa(customerPort)),
tracing.Init("customer", metricsFactory, logger),
otelExporter,
metricsFactory,
logger,
)
Expand Down
3 changes: 1 addition & 2 deletions examples/hotrod/cmd/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/examples/hotrod/pkg/log"
"github.com/jaegertracing/jaeger/examples/hotrod/pkg/tracing"
"github.com/jaegertracing/jaeger/examples/hotrod/services/driver"
)

Expand All @@ -37,7 +36,7 @@ var driverCmd = &cobra.Command{
logger := log.NewFactory(zapLogger)
server := driver.NewServer(
net.JoinHostPort("0.0.0.0", strconv.Itoa(driverPort)),
tracing.Init("driver", metricsFactory, logger),
otelExporter,
metricsFactory,
logger,
)
Expand Down
2 changes: 1 addition & 1 deletion examples/hotrod/cmd/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ var frontendCmd = &cobra.Command{
logger := log.NewFactory(zapLogger)
server := frontend.NewServer(
options,
tracing.Init("frontend", metricsFactory, logger),
tracing.Init("frontend", otelExporter, metricsFactory, logger),
logger,
)
return logError(zapLogger, server.Run())
Expand Down
13 changes: 8 additions & 5 deletions examples/hotrod/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,20 @@ import (
"time"

"github.com/spf13/cobra"
"github.com/uber/jaeger-lib/metrics"
jexpvar "github.com/uber/jaeger-lib/metrics/expvar"
jprom "github.com/uber/jaeger-lib/metrics/prometheus"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/jaegertracing/jaeger/examples/hotrod/services/config"
"github.com/jaegertracing/jaeger/internal/metrics/expvar"
"github.com/jaegertracing/jaeger/internal/metrics/prometheus"
"github.com/jaegertracing/jaeger/pkg/metrics"
)

var (
metricsBackend string
logger *zap.Logger
metricsFactory metrics.Factory
otelExporter string // jaeger, otlp, stdout

fixDBConnDelay time.Duration
fixDBConnDisableMutex bool
Expand Down Expand Up @@ -66,6 +67,8 @@ func Execute() {

func init() {
RootCmd.PersistentFlags().StringVarP(&metricsBackend, "metrics", "m", "expvar", "Metrics backend (expvar|prometheus)")
RootCmd.PersistentFlags().StringVarP(&otelExporter, "otel-exporter", "x", "jaeger", "OpenTelemetry exporter (jaeger|otlp|stdout)")

RootCmd.PersistentFlags().DurationVarP(&fixDBConnDelay, "fix-db-query-delay", "D", 300*time.Millisecond, "Average latency of MySQL DB query")
RootCmd.PersistentFlags().BoolVarP(&fixDBConnDisableMutex, "fix-disable-db-conn-mutex", "M", false, "Disables the mutex guarding db connection")
RootCmd.PersistentFlags().IntVarP(&fixRouteWorkerPoolSize, "fix-route-worker-pool-size", "W", 3, "Default worker pool size")
Expand All @@ -92,10 +95,10 @@ func init() {
func onInitialize() {
switch metricsBackend {
case "expvar":
metricsFactory = jexpvar.NewFactory(10) // 10 buckets for histograms
metricsFactory = expvar.NewFactory(10) // 10 buckets for histograms
logger.Info("Using expvar as metrics backend")
case "prometheus":
metricsFactory = jprom.New().Namespace(metrics.NSOptions{Name: "hotrod", Tags: nil})
metricsFactory = prometheus.New().Namespace(metrics.NSOptions{Name: "hotrod", Tags: nil})
logger.Info("Using Prometheus as metrics backend")
default:
logger.Fatal("unsupported metrics backend " + metricsBackend)
Expand Down
2 changes: 1 addition & 1 deletion examples/hotrod/cmd/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ var routeCmd = &cobra.Command{
logger := log.NewFactory(zapLogger)
server := route.NewServer(
net.JoinHostPort("0.0.0.0", strconv.Itoa(routePort)),
tracing.Init("route", metricsFactory, logger),
tracing.Init("route", otelExporter, metricsFactory, logger),
logger,
)
return logError(zapLogger, server.Run())
Expand Down
14 changes: 7 additions & 7 deletions examples/hotrod/pkg/log/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ package log
import (
"context"

"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go"
ot "github.com/opentracing/opentracing-go"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
Expand All @@ -44,13 +44,13 @@ func (b Factory) Bg() Logger {
// contains an OpenTracing span, all logging calls are also
// echo-ed into the span.
func (b Factory) For(ctx context.Context) Logger {
if span := opentracing.SpanFromContext(ctx); span != nil {
logger := spanLogger{span: span, logger: b.logger}
if otSpan := ot.SpanFromContext(ctx); otSpan != nil {
logger := spanLogger{span: otSpan, logger: b.logger}

if jaegerCtx, ok := span.Context().(jaeger.SpanContext); ok {
if otelSpan := trace.SpanFromContext(ctx); otelSpan != nil {
logger.spanFields = []zapcore.Field{
zap.String("trace_id", jaegerCtx.TraceID().String()),
zap.String("span_id", jaegerCtx.SpanID().String()),
zap.String("trace_id", otelSpan.SpanContext().TraceID().String()),
zap.String("span_id", otelSpan.SpanContext().SpanID().String()),
}
}

Expand Down
91 changes: 55 additions & 36 deletions examples/hotrod/pkg/tracing/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,56 +16,75 @@
package tracing

import (
"context"
"fmt"
"time"
"sync"

"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go/config"
"github.com/uber/jaeger-client-go/rpcmetrics"
"github.com/uber/jaeger-lib/metrics"
"go.opentelemetry.io/otel"
otbridge "go.opentelemetry.io/otel/bridge/opentracing"
"go.opentelemetry.io/otel/exporters/jaeger"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.7.0"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/examples/hotrod/pkg/log"
"github.com/jaegertracing/jaeger/examples/hotrod/pkg/tracing/rpcmetrics"
"github.com/jaegertracing/jaeger/pkg/metrics"
)

// Init creates a new instance of Jaeger tracer.
func Init(serviceName string, metricsFactory metrics.Factory, logger log.Factory) opentracing.Tracer {
cfg := &config.Configuration{
Sampler: &config.SamplerConfig{},
}
cfg.ServiceName = serviceName
cfg.Sampler.Type = "const"
cfg.Sampler.Param = 1
var once sync.Once

_, err := cfg.FromEnv()
if err != nil {
logger.Bg().Fatal("cannot parse Jaeger env vars", zap.Error(err))
}
// Init initializes OpenTelemetry SDK and uses OTel-OpenTracing Bridge
// to return an OpenTracing-compatible tracer.
func Init(serviceName string, exporterType string, metricsFactory metrics.Factory, logger log.Factory) opentracing.Tracer {
once.Do(func() {
otel.SetTextMapPropagator(propagation.TraceContext{})
})

// TODO(ys) a quick hack to ensure random generators get different seeds, which are based on current time.
time.Sleep(100 * time.Millisecond)
jaegerLogger := jaegerLoggerAdapter{logger.Bg()}

metricsFactory = metricsFactory.Namespace(metrics.NSOptions{Name: serviceName, Tags: nil})
tracer, _, err := cfg.NewTracer(
config.Logger(jaegerLogger),
config.Metrics(metricsFactory),
config.Observer(rpcmetrics.NewObserver(metricsFactory, rpcmetrics.DefaultNameNormalizer)),
)
exp, err := createOtelExporter(exporterType)
if err != nil {
logger.Bg().Fatal("cannot initialize Jaeger Tracer", zap.Error(err))
logger.Bg().Fatal("cannot create exporter", zap.String("exporterType", exporterType), zap.Error(err))
}
return tracer
}
logger.Bg().Info("using " + exporterType + " trace exporter")

type jaegerLoggerAdapter struct {
logger log.Logger
}
rpcmetricsObserver := rpcmetrics.NewObserver(metricsFactory, rpcmetrics.DefaultNameNormalizer)

func (l jaegerLoggerAdapter) Error(msg string) {
l.logger.Error(msg)
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exp),
sdktrace.WithSpanProcessor(rpcmetricsObserver),
sdktrace.WithResource(resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String(serviceName),
)),
)
otTracer, _ := otbridge.NewTracerPair(tp.Tracer(""))
logger.Bg().Info("created OTEL->OT brige", zap.String("service-name", serviceName))
return otTracer
}

func (l jaegerLoggerAdapter) Infof(msg string, args ...interface{}) {
l.logger.Info(fmt.Sprintf(msg, args...))
func createOtelExporter(exporterType string) (sdktrace.SpanExporter, error) {
var exporter sdktrace.SpanExporter
var err error
switch exporterType {
case "jaeger":
exporter, err = jaeger.New(
jaeger.WithCollectorEndpoint(),
)
case "otlp":
client := otlptracehttp.NewClient(
otlptracehttp.WithInsecure(),
)
exporter, err = otlptrace.New(context.Background(), client)
case "stdout":
exporter, err = stdouttrace.New()
default:
return nil, fmt.Errorf("unrecognized exporter type %s", exporterType)
}
return exporter, err
}
3 changes: 3 additions & 0 deletions examples/hotrod/pkg/tracing/rpcmetrics/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Package rpcmetrics implements an OpenTelemetry SpanProcessor that can be used to emit RPC metrics.

This package is copied from jaeger-client-go and adapted to work with OpenTelemtery SDK.
63 changes: 63 additions & 0 deletions examples/hotrod/pkg/tracing/rpcmetrics/endpoints.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright (c) 2023 The Jaeger Authors.
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rpcmetrics

import "sync"

// normalizedEndpoints is a cache for endpointName -> safeName mappings.
type normalizedEndpoints struct {
names map[string]string
maxSize int
normalizer NameNormalizer
mux sync.RWMutex
}

func newNormalizedEndpoints(maxSize int, normalizer NameNormalizer) *normalizedEndpoints {
return &normalizedEndpoints{
maxSize: maxSize,
normalizer: normalizer,
names: make(map[string]string, maxSize),
}
}

// normalize looks up the name in the cache, if not found it uses normalizer
// to convert the name to a safe name. If called with more than maxSize unique
// names it returns "" for all other names beyond those already cached.
func (n *normalizedEndpoints) normalize(name string) string {
n.mux.RLock()
norm, ok := n.names[name]
l := len(n.names)
n.mux.RUnlock()
if ok {
return norm
}
if l >= n.maxSize {
return ""
}
return n.normalizeWithLock(name)
}

func (n *normalizedEndpoints) normalizeWithLock(name string) string {
norm := n.normalizer.Normalize(name)
n.mux.Lock()
defer n.mux.Unlock()
// cache may have grown while we were not holding the lock
if len(n.names) >= n.maxSize {
return ""
}
n.names[name] = norm
return norm
}
44 changes: 44 additions & 0 deletions examples/hotrod/pkg/tracing/rpcmetrics/endpoints_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright (c) 2023 The Jaeger Authors.
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rpcmetrics

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestNormalizedEndpoints(t *testing.T) {
n := newNormalizedEndpoints(1, DefaultNameNormalizer)

assertLen := func(l int) {
n.mux.RLock()
defer n.mux.RUnlock()
assert.Len(t, n.names, l)
}

assert.Equal(t, "ab_cd", n.normalize("ab^cd"), "one translation")
assert.Equal(t, "ab_cd", n.normalize("ab^cd"), "cache hit")
assertLen(1)
assert.Equal(t, "", n.normalize("xys"), "cache overflow")
assertLen(1)
}

func TestNormalizedEndpointsDoubleLocking(t *testing.T) {
n := newNormalizedEndpoints(1, DefaultNameNormalizer)
assert.Equal(t, "ab_cd", n.normalize("ab^cd"), "fill out the cache")
assert.Equal(t, "", n.normalizeWithLock("xys"), "cache overflow")
}
Loading

0 comments on commit 19bd501

Please sign in to comment.