YAAAAS - Yet Another Attempt At Adaptive Sampling (#2966)
* Original PR

Signed-off-by: Joe Elliott <number101010@gmail.com>
Co-authored-by: Ashmita Bohara <ashmita.bohara152@gmail.com>

* cleaned up tests and config factory logic

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Wire up lock/samplingstore

Signed-off-by: Joe Elliott <number101010@gmail.com>

* changelog

Signed-off-by: Joe Elliott <number101010@gmail.com>

* lint

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Added method for conditionally instantiating the lock and sampling store

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Additional tests

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Passed appropriate settings and added override hostname

Signed-off-by: Joe Elliott <number101010@gmail.com>

* pass hostname to lock

Signed-off-by: Joe Elliott <number101010@gmail.com>

* First pass cassandra tables

Signed-off-by: Joe Elliott <number101010@gmail.com>

* schema cleanup. start electionParticipant

Signed-off-by: Joe Elliott <number101010@gmail.com>

* revert cqlsh path

Signed-off-by: Joe Elliott <number101010@gmail.com>

* compose updates

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Pass in processspan

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Rearranged hotrod startup to allow env override

Signed-off-by: Joe Elliott <number101010@gmail.com>

* call aggregator.Stop()

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Made aggregator make sense

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Added additional processors test

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Added CreateLockAndSamplingStore tests

Signed-off-by: Joe Elliott <number101010@gmail.com>

* remove overrideHostname in favor of a short random postfix

Signed-off-by: Joe Elliott <number101010@gmail.com>

* added hostname

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Update storage/factory.go

Co-authored-by: Juraci Paixão Kröhling <juraci.github@kroehling.de>
Signed-off-by: Joe Elliott <number101010@gmail.com>

* lint really wants crypto/rand

Signed-off-by: Joe Elliott <number101010@gmail.com>

* logger to last param

Signed-off-by: Joe Elliott <number101010@gmail.com>

* io.Closer

Signed-off-by: Joe Elliott <number101010@gmail.com>

* log error on aggregator close

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Added mocks to test that the right values are returned

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Improved unique name logging

Signed-off-by: Joe Elliott <number101010@gmail.com>

* make fmt

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Additional testing

Signed-off-by: Joe Elliott <number101010@gmail.com>

* lint

Signed-off-by: Joe Elliott <number101010@gmail.com>

* fix test

Signed-off-by: Joe Elliott <number101010@gmail.com>

* tests

Signed-off-by: Joe Elliott <number101010@gmail.com>

* getParam => samplerParamToFloat

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Removed requirelockandsamplingstore

Signed-off-by: Joe Elliott <number101010@gmail.com>

* use constants for constants

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Split out sampling store interface

Signed-off-by: Joe Elliott <number101010@gmail.com>

* lint

Signed-off-by: Joe Elliott <number101010@gmail.com>

* tests

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Move changelog entry

Signed-off-by: Yuri Shkuro <github@ysh.us>

Co-authored-by: Juraci Paixão Kröhling <juraci.github@kroehling.de>
Co-authored-by: Juraci Paixão Kröhling <juraci@kroehling.de>
Co-authored-by: Yuri Shkuro <yurishkuro@users.noreply.github.com>
Co-authored-by: Yuri Shkuro <github@ysh.us>
5 people committed Sep 8, 2021
1 parent 6685913 commit e345aa7
Showing 42 changed files with 1,010 additions and 401 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
@@ -1,6 +1,13 @@
Changes by Version
==================

+next release
+-------------------
+### Backend Changes
+#### New Features
+* Add support for adaptive sampling with a Cassandra backend. ([#2966](https://github.com/jaegertracing/jaeger/pull/2966), [@joe-elliott](https://github.com/joe-elliott))


1.26.0 (2021-09-06)
-------------------
### Backend Changes
@@ -70,6 +77,8 @@ Changes by Version
-------------------
### Backend Changes

+#### New Features

#### Breaking Changes

* Remove unused `--es-archive.max-span-age` flag ([#2865](https://github.com/jaegertracing/jaeger/pull/2865), [@albertteoh](https://github.com/albertteoh)):
16 changes: 13 additions & 3 deletions cmd/all-in-one/main.go
@@ -68,7 +68,11 @@ func main() {
if err != nil {
log.Fatalf("Cannot initialize storage factory: %v", err)
}
-strategyStoreFactory, err := ss.NewFactory(ss.FactoryConfigFromEnv())
+strategyStoreFactoryConfig, err := ss.FactoryConfigFromEnv()
+if err != nil {
+log.Fatalf("Cannot initialize sampling strategy store factory config: %v", err)
+}
+strategyStoreFactory, err := ss.NewFactory(*strategyStoreFactoryConfig)
if err != nil {
log.Fatalf("Cannot initialize sampling strategy store factory: %v", err)
}
@@ -121,11 +125,16 @@ by default uses only in-memory database.`,
logger.Fatal("Failed to create metrics reader", zap.Error(err))
}

+ssFactory, err := storageFactory.CreateSamplingStoreFactory()
+if err != nil {
+logger.Fatal("Failed to create sampling store factory", zap.Error(err))
+}

strategyStoreFactory.InitFromViper(v, logger)
-if err := strategyStoreFactory.Initialize(metricsFactory, logger); err != nil {
+if err := strategyStoreFactory.Initialize(metricsFactory, ssFactory, logger); err != nil {
logger.Fatal("Failed to init sampling strategy store factory", zap.Error(err))
}
-strategyStore, err := strategyStoreFactory.CreateStrategyStore()
+strategyStore, aggregator, err := strategyStoreFactory.CreateStrategyStore()
if err != nil {
logger.Fatal("Failed to create sampling strategy store", zap.Error(err))
}
@@ -143,6 +152,7 @@ by default uses only in-memory database.`,
MetricsFactory: metricsFactory,
SpanWriter: spanWriter,
StrategyStore: strategyStore,
+Aggregator: aggregator,
HealthCheck: svc.HC(),
})
if err := c.Start(cOpts); err != nil {
17 changes: 16 additions & 1 deletion cmd/collector/app/collector.go
@@ -40,6 +40,7 @@ type Collector struct {
metricsFactory metrics.Factory
spanWriter spanstore.Writer
strategyStore strategystore.StrategyStore
+aggregator strategystore.Aggregator
hCheck *healthcheck.HealthCheck
spanProcessor processor.SpanProcessor
spanHandlers *SpanHandlers
@@ -59,6 +60,7 @@ type CollectorParams struct {
MetricsFactory metrics.Factory
SpanWriter spanstore.Writer
StrategyStore strategystore.StrategyStore
+Aggregator strategystore.Aggregator
HealthCheck *healthcheck.HealthCheck
}

@@ -70,6 +72,7 @@ func New(params *CollectorParams) *Collector {
metricsFactory: params.MetricsFactory,
spanWriter: params.SpanWriter,
strategyStore: params.StrategyStore,
+aggregator: params.Aggregator,
hCheck: params.HealthCheck,
}
}
@@ -83,7 +86,12 @@ func (c *Collector) Start(builderOpts *CollectorOptions) error {
MetricsFactory: c.metricsFactory,
}

-c.spanProcessor = handlerBuilder.BuildSpanProcessor()
+var additionalProcessors []ProcessSpan
+if c.aggregator != nil {
+additionalProcessors = append(additionalProcessors, handleRootSpan(c.aggregator, c.logger))
+}
+
+c.spanProcessor = handlerBuilder.BuildSpanProcessor(additionalProcessors...)
c.spanHandlers = handlerBuilder.BuildHandlers(c.spanProcessor)

grpcServer, err := server.StartGRPCServer(&server.GRPCServerParams{
@@ -169,6 +177,13 @@ func (c *Collector) Close() error {
c.logger.Error("failed to close span processor.", zap.Error(err))
}

+// aggregator does not exist for all strategy stores. only Close() if exists.
+if c.aggregator != nil {
+if err := c.aggregator.Close(); err != nil {
+c.logger.Error("failed to close aggregator.", zap.Error(err))
+}
+}

// watchers actually never return errors from Close
_ = c.tlsGRPCCertWatcherCloser.Close()
_ = c.tlsHTTPCertWatcherCloser.Close()
59 changes: 59 additions & 0 deletions cmd/collector/app/collector_test.go
@@ -25,6 +25,8 @@ import (
"github.com/uber/jaeger-lib/metrics/metricstest"
"go.uber.org/zap"

+"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
+"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
)
@@ -98,3 +100,60 @@ func TestCollector_PublishOpts(t *testing.T) {
Value: 42,
})
}

+func TestAggregator(t *testing.T) {
+// prepare
+hc := healthcheck.New()
+logger := zap.NewNop()
+baseMetrics := metricstest.NewFactory(time.Hour)
+spanWriter := &fakeSpanWriter{}
+strategyStore := &mockStrategyStore{}
+agg := &mockAggregator{}
+
+c := New(&CollectorParams{
+ServiceName: "collector",
+Logger: logger,
+MetricsFactory: baseMetrics,
+SpanWriter: spanWriter,
+StrategyStore: strategyStore,
+HealthCheck: hc,
+Aggregator: agg,
+})
+collectorOpts := &CollectorOptions{
+QueueSize: 10,
+NumWorkers: 10,
+}
+
+// test
+c.Start(collectorOpts)
+
+// assert that aggregator was added to the collector
+_, err := c.spanProcessor.ProcessSpans([]*model.Span{
+{
+OperationName: "y",
+Process: &model.Process{
+ServiceName: "x",
+},
+Tags: []model.KeyValue{
+{
+Key: "sampler.type",
+VStr: "probabilistic",
+},
+{
+Key: "sampler.param",
+VStr: "1",
+},
+},
+},
+}, processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat})
+assert.NoError(t, err)
+
+// verify
+assert.NoError(t, c.Close())
+
+// assert that aggregator was used
+assert.Equal(t, 1, agg.callCount)
+
+// assert that aggregator close was called
+assert.Equal(t, 1, agg.closeCount)
+}
@@ -12,17 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

-package adaptive
+package app

import (
"go.uber.org/zap"

-"github.com/jaegertracing/jaeger/cmd/collector/app"
+"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
"github.com/jaegertracing/jaeger/model"
)

-// HandleRootSpan returns a function that records throughput for root spans
-func HandleRootSpan(aggregator Aggregator, logger *zap.Logger) app.ProcessSpan {
+// handleRootSpan returns a function that records throughput for root spans
+func handleRootSpan(aggregator strategystore.Aggregator, logger *zap.Logger) ProcessSpan {
return func(span *model.Span) {
// TODO simply checking parentId to determine if a span is a root span is not sufficient. However,
// we can be sure that only a root span will have sampler tags.
@@ -33,7 +33,7 @@ func HandleRootSpan(aggregator Aggregator, logger *zap.Logger) app.ProcessSpan {
if service == "" || span.OperationName == "" {
return
}
-samplerType, samplerParam := GetSamplerParams(span, logger)
+samplerType, samplerParam := span.GetSamplerParams(logger)
if samplerType == "" {
return
}
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

-package adaptive
+package app

import (
"testing"
@@ -24,18 +24,22 @@ import (
)

type mockAggregator struct {
-callCount int
+callCount int
+closeCount int
}

func (t *mockAggregator) RecordThroughput(service, operation, samplerType string, probability float64) {
t.callCount++
}
func (t *mockAggregator) Start() {}
-func (t *mockAggregator) Stop() {}
+func (t *mockAggregator) Close() error {
+t.closeCount++
+return nil
+}

func TestHandleRootSpan(t *testing.T) {
aggregator := &mockAggregator{}
-processor := HandleRootSpan(aggregator, zap.NewNop())
+processor := handleRootSpan(aggregator, zap.NewNop())

// Testing non-root span
span := &model.Span{References: []model.SpanRef{{SpanID: model.NewSpanID(1), RefType: model.ChildOf}}}
6 changes: 4 additions & 2 deletions cmd/collector/app/sampling/strategystore/factory.go
@@ -17,6 +17,8 @@ package strategystore
import (
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"
+
+"github.com/jaegertracing/jaeger/storage"
)

// Factory defines an interface for a factory that can create implementations of different strategy storage components.
@@ -27,8 +29,8 @@ import (
// plugin.Configurable
type Factory interface {
// Initialize performs internal initialization of the factory.
-Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error
+Initialize(metricsFactory metrics.Factory, ssFactory storage.SamplingStoreFactory, logger *zap.Logger) error

// CreateStrategyStore initializes the StrategyStore and returns it.
-CreateStrategyStore() (StrategyStore, error)
+CreateStrategyStore() (StrategyStore, Aggregator, error)
}
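
The new three-value return from CreateStrategyStore means callers have to cope with an aggregator that may be nil, since not every strategy store aggregates throughput. The sketch below shows one way a caller might guard for that. It is an illustration under stated assumptions, not Jaeger code: the StrategyStore signature is simplified (no context, a string result), and createStrategyStore, staticStore, noopAggregator and closeAggregator are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// StrategyStore is a simplified stand-in for the interface in this package
// (assumption: the real method takes a context and returns a thrift response).
type StrategyStore interface {
	GetSamplingStrategy(serviceName string) (string, error)
}

// Aggregator mirrors the shape of the interface introduced by this commit.
type Aggregator interface {
	io.Closer
	RecordThroughput(service, operation, samplerType string, probability float64)
	Start()
}

// staticStore is a hypothetical store that always answers "probabilistic".
type staticStore struct{}

func (staticStore) GetSamplingStrategy(string) (string, error) { return "probabilistic", nil }

// noopAggregator is a hypothetical aggregator that only remembers being closed.
type noopAggregator struct{ closed bool }

func (a *noopAggregator) RecordThroughput(service, operation, samplerType string, probability float64) {
}
func (a *noopAggregator) Start()       {}
func (a *noopAggregator) Close() error { a.closed = true; return nil }

// createStrategyStore is a hypothetical factory function: the "static" kind
// returns no aggregator, the "adaptive" kind returns one.
func createStrategyStore(kind string) (StrategyStore, Aggregator, error) {
	switch kind {
	case "static":
		return staticStore{}, nil, nil
	case "adaptive":
		return staticStore{}, &noopAggregator{}, nil
	default:
		return nil, nil, errors.New("unknown strategy store type: " + kind)
	}
}

// closeAggregator closes the aggregator only if one was created.
func closeAggregator(agg Aggregator) error {
	if agg == nil {
		return nil
	}
	return agg.Close()
}

func main() {
	for _, kind := range []string{"static", "adaptive"} {
		_, agg, err := createStrategyStore(kind)
		if err != nil {
			fmt.Println(err)
			continue
		}
		fmt.Printf("%s: aggregator present=%t, close error=%v\n", kind, agg != nil, closeAggregator(agg))
	}
}
```

Returning an untyped nil for the missing aggregator matters here: a nil pointer of a concrete type stored in the interface would make the `agg == nil` guard evaluate to false.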
13 changes: 13 additions & 0 deletions cmd/collector/app/sampling/strategystore/interface.go
@@ -16,6 +16,7 @@ package strategystore

import (
"context"
+"io"

"github.com/jaegertracing/jaeger/thrift-gen/sampling"
)
@@ -25,3 +26,15 @@ type StrategyStore interface {
// GetSamplingStrategy retrieves the sampling strategy for the specified service.
GetSamplingStrategy(ctx context.Context, serviceName string) (*sampling.SamplingStrategyResponse, error)
}

+// Aggregator defines an interface used to aggregate operation throughput.
+type Aggregator interface {
+// Close() from io.Closer stops the aggregator from aggregating throughput.
+io.Closer
+
+// RecordThroughput records throughput for an operation for aggregation.
+RecordThroughput(service, operation, samplerType string, probability float64)
+
+// Start starts aggregating operation throughput.
+Start()
+}
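
To make the Aggregator contract above concrete, here is a minimal in-memory sketch that satisfies the same three methods. It is only an illustration: countingAggregator, newCountingAggregator and count are hypothetical names, and the choice to ignore records after Close is an assumption, not something the interface in this commit specifies.

```go
package main

import (
	"fmt"
	"io"
	"sync"
)

// Aggregator mirrors the interface added in this commit.
type Aggregator interface {
	io.Closer
	RecordThroughput(service, operation, samplerType string, probability float64)
	Start()
}

// countingAggregator is a hypothetical implementation that counts recorded
// spans per service/operation pair and ignores the sampler details.
type countingAggregator struct {
	mu     sync.Mutex
	counts map[string]int
	closed bool
}

func newCountingAggregator() *countingAggregator {
	return &countingAggregator{counts: make(map[string]int)}
}

// RecordThroughput increments the counter unless the aggregator was closed
// (dropping late records is an assumption made for this sketch).
func (a *countingAggregator) RecordThroughput(service, operation, samplerType string, probability float64) {
	a.mu.Lock()
	defer a.mu.Unlock()
	if a.closed {
		return
	}
	a.counts[service+"/"+operation]++
}

// Start is a no-op here; a real aggregator would likely start background work.
func (a *countingAggregator) Start() {}

// Close marks the aggregator as closed and never fails.
func (a *countingAggregator) Close() error {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.closed = true
	return nil
}

// count returns how many spans were recorded for the given pair.
func (a *countingAggregator) count(service, operation string) int {
	a.mu.Lock()
	defer a.mu.Unlock()
	return a.counts[service+"/"+operation]
}

// Compile-time check that the sketch satisfies the interface.
var _ Aggregator = (*countingAggregator)(nil)

func main() {
	agg := newCountingAggregator()
	agg.Start()
	agg.RecordThroughput("x", "y", "probabilistic", 1.0)
	fmt.Println(agg.count("x", "y")) // prints 1
	_ = agg.Close()
}
```

Embedding io.Closer, as the commit does, lets the collector treat the aggregator like any other closable resource during shutdown.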
4 changes: 2 additions & 2 deletions cmd/collector/app/span_handler_builder.go
@@ -44,13 +44,14 @@ type SpanHandlers struct {
}

// BuildSpanProcessor builds the span processor to be used with the handlers
-func (b *SpanHandlerBuilder) BuildSpanProcessor() processor.SpanProcessor {
+func (b *SpanHandlerBuilder) BuildSpanProcessor(additional ...ProcessSpan) processor.SpanProcessor {
hostname, _ := os.Hostname()
svcMetrics := b.metricsFactory()
hostMetrics := svcMetrics.Namespace(metrics.NSOptions{Tags: map[string]string{"host": hostname}})

return NewSpanProcessor(
b.SpanWriter,
+additional,
Options.ServiceMetrics(svcMetrics),
Options.HostMetrics(hostMetrics),
Options.Logger(b.logger()),
@@ -61,7 +62,6 @@ func (b *SpanHandlerBuilder) BuildSpanProcessor() processor.SpanProcessor {
Options.DynQueueSizeWarmup(uint(b.CollectorOpts.QueueSize)), // same as queue size for now
Options.DynQueueSizeMemory(b.CollectorOpts.DynQueueSizeMemory),
)

}

// BuildHandlers builds span handlers (Zipkin, Jaeger)
7 changes: 5 additions & 2 deletions cmd/collector/app/span_processor.go
@@ -66,9 +66,10 @@ type queueItem struct {
// NewSpanProcessor returns a SpanProcessor that preProcesses, filters, queues, sanitizes, and processes spans
func NewSpanProcessor(
spanWriter spanstore.Writer,
+additional []ProcessSpan,
opts ...Option,
) processor.SpanProcessor {
-sp := newSpanProcessor(spanWriter, opts...)
+sp := newSpanProcessor(spanWriter, additional, opts...)

sp.queue.StartConsumers(sp.numWorkers, func(item interface{}) {
value := item.(*queueItem)
@@ -84,7 +85,7 @@ func NewSpanProcessor(
return sp
}

-func newSpanProcessor(spanWriter spanstore.Writer, opts ...Option) *spanProcessor {
+func newSpanProcessor(spanWriter spanstore.Writer, additional []ProcessSpan, opts ...Option) *spanProcessor {
options := Options.apply(opts...)
handlerMetrics := NewSpanProcessorMetrics(
options.serviceMetrics,
@@ -122,6 +123,8 @@ func newSpanProcessor(spanWriter spanstore.Writer, opts ...Option) *spanProcesso
processSpanFuncs = append(processSpanFuncs, sp.countSpan)
}

+processSpanFuncs = append(processSpanFuncs, additional...)

sp.processSpan = ChainedProcessSpan(processSpanFuncs...)
return &sp
}
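
The hunk above appends the caller-supplied processors after the built-in ones before chaining them, so they run last for each span. A minimal sketch of that ordering follows. It is an assumption-laden illustration: Span is a stand-in for model.Span, and chainProcessSpan and buildProcessor are hypothetical helpers that only approximate what ChainedProcessSpan and newSpanProcessor appear to do in this diff.

```go
package main

import "fmt"

// Span is a stand-in for model.Span (assumption: only the name matters here).
type Span struct {
	OperationName string
}

// ProcessSpan has the same shape as the collector's per-span callback type.
type ProcessSpan func(span *Span)

// chainProcessSpan returns one ProcessSpan that calls each function in order.
func chainProcessSpan(funcs ...ProcessSpan) ProcessSpan {
	return func(span *Span) {
		for _, f := range funcs {
			f(span)
		}
	}
}

// buildProcessor places the additional processors after the built-in ones,
// copying into a fresh slice so the caller's slices are not modified.
func buildProcessor(builtin []ProcessSpan, additional ...ProcessSpan) ProcessSpan {
	all := make([]ProcessSpan, 0, len(builtin)+len(additional))
	all = append(all, builtin...)
	all = append(all, additional...)
	return chainProcessSpan(all...)
}

func main() {
	var order []string
	save := func(*Span) { order = append(order, "save") }
	record := func(*Span) { order = append(order, "record") }

	p := buildProcessor([]ProcessSpan{save}, record)
	p(&Span{OperationName: "y"})
	fmt.Println(order) // prints [save record]
}
```

Passing the extra processors as a variadic argument keeps existing call sites that supply none unchanged, which matches how BuildSpanProcessor is extended in this commit.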