Skip to content

Commit

Permalink
Add in-memory storage support for adaptive sampling (#3335)
Browse files Browse the repository at this point in the history
  • Loading branch information
srikanthccv committed Oct 25, 2021
1 parent b53d901 commit e9bf6ed
Show file tree
Hide file tree
Showing 12 changed files with 294 additions and 6 deletions.
2 changes: 1 addition & 1 deletion plugin/sampling/strategystore/adaptive/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, ssFactory storage.S
if err != nil {
return err
}
f.store, err = ssFactory.CreateSamplingStore()
f.store, err = ssFactory.CreateSamplingStore(f.options.AggregationBuckets)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion plugin/sampling/strategystore/adaptive/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (m *mockSamplingStoreFactory) CreateLock() (distributedlock.Lock, error) {

return mockLock, nil
}
func (m *mockSamplingStoreFactory) CreateSamplingStore() (samplingstore.Store, error) {
func (m *mockSamplingStoreFactory) CreateSamplingStore(maxBuckets int) (samplingstore.Store, error) {
if m.storeFailsWith != nil {
return nil, m.storeFailsWith
}
Expand Down
2 changes: 1 addition & 1 deletion plugin/sampling/strategystore/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,6 @@ type mockSamplingStoreFactory struct{}
func (m *mockSamplingStoreFactory) CreateLock() (distributedlock.Lock, error) {
return nil, nil
}
func (m *mockSamplingStoreFactory) CreateSamplingStore() (samplingstore.Store, error) {
func (m *mockSamplingStoreFactory) CreateSamplingStore(maxBuckets int) (samplingstore.Store, error) {
return nil, nil
}
2 changes: 1 addition & 1 deletion plugin/storage/cassandra/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (f *Factory) CreateLock() (distributedlock.Lock, error) {
}

// CreateSamplingStore implements storage.SamplingStoreFactory
func (f *Factory) CreateSamplingStore() (samplingstore.Store, error) {
func (f *Factory) CreateSamplingStore(maxBuckets int) (samplingstore.Store, error) {
return cSamplingStore.New(f.primarySession, f.primaryMetricsFactory, f.logger), nil
}

Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/cassandra/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestCassandraFactory(t *testing.T) {
_, err = f.CreateLock()
assert.NoError(t, err)

_, err = f.CreateSamplingStore()
_, err = f.CreateSamplingStore(0)
assert.NoError(t, err)

assert.NoError(t, f.Close())
Expand Down
12 changes: 12 additions & 0 deletions plugin/storage/memory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import (
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/distributedlock"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/samplingstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

Expand Down Expand Up @@ -79,6 +81,16 @@ func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
return f.store, nil
}

// CreateSamplingStore implements storage.SamplingStoreFactory
func (f *Factory) CreateSamplingStore(maxBuckets int) (samplingstore.Store, error) {
return NewSamplingStore(maxBuckets), nil
}

// CreateLock implements storage.SamplingStoreFactory
func (f *Factory) CreateLock() (distributedlock.Lock, error) {
return &lock{}, nil
}

func (f *Factory) publishOpts() {
internalFactory := f.metricsFactory.Namespace(metrics.NSOptions{Name: "internal"})
internalFactory.Gauge(metrics.Options{Name: limit}).
Expand Down
6 changes: 6 additions & 0 deletions plugin/storage/memory/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ func TestMemoryStorageFactory(t *testing.T) {
depReader, err := f.CreateDependencyReader()
assert.NoError(t, err)
assert.Equal(t, f.store, depReader)
samplingStore, err := f.CreateSamplingStore(2)
assert.NoError(t, err)
assert.Equal(t, 2, samplingStore.(*SamplingStore).maxBuckets)
lock, err := f.CreateLock()
assert.NoError(t, err)
assert.NotNil(t, lock)
}

func TestWithConfiguration(t *testing.T) {
Expand Down
29 changes: 29 additions & 0 deletions plugin/storage/memory/lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) 2021 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package memory

import "time"

type lock struct{}

// Acquire always returns true for memory storage because it's a single-node
func (l *lock) Acquire(resource string, ttl time.Duration) (bool, error) {
return true, nil
}

// Forfeit always returns true for memory storage
func (l *lock) Forfeit(resource string) (bool, error) {
return true, nil
}
36 changes: 36 additions & 0 deletions plugin/storage/memory/lock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (c) 2021 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package memory

import (
"testing"
"time"

"github.com/crossdock/crossdock-go/assert"
)

func TestAcquire(t *testing.T) {
l := &lock{}
ok, err := l.Acquire("resource", time.Duration(1))
assert.True(t, ok)
assert.NoError(t, err)
}

func TestForfeit(t *testing.T) {
l := &lock{}
ok, err := l.Forfeit("resource")
assert.True(t, ok)
assert.NoError(t, err)
}
98 changes: 98 additions & 0 deletions plugin/storage/memory/sampling.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright (c) 2021 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package memory

import (
"sync"
"time"

"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model"
)

// SamplingStroe is an in-memory store for sampling data
type SamplingStore struct {
sync.RWMutex
throughputs []*storedThroughput
probabilitiesAndQPS *storedServiceOperationProbabilitiesAndQPS
maxBuckets int
}

type storedThroughput struct {
throughput []*model.Throughput
time time.Time
}

type storedServiceOperationProbabilitiesAndQPS struct {
hostname string
probabilities model.ServiceOperationProbabilities
qps model.ServiceOperationQPS
time time.Time
}

// NewSamplingStore creates an in-memory sampling store.
func NewSamplingStore(maxBuckets int) *SamplingStore {
return &SamplingStore{maxBuckets: maxBuckets}
}

// InsertThroughput implements samplingstore.Store#InsertThroughput.
func (ss *SamplingStore) InsertThroughput(throughput []*model.Throughput) error {
ss.Lock()
defer ss.Unlock()
now := time.Now()
ss.preprendThroughput(&storedThroughput{throughput, now})
return nil
}

// GetThroughput implements samplingstore.Store#GetThroughput.
func (ss *SamplingStore) GetThroughput(start, end time.Time) ([]*model.Throughput, error) {
ss.Lock()
defer ss.Unlock()
var retSlice []*model.Throughput
for _, t := range ss.throughputs {
if t.time.After(start) && (t.time.Before(end) || t.time.Equal(end)) {
retSlice = append(retSlice, t.throughput...)
}
}
return retSlice, nil
}

// InsertProbabilitiesAndQPS implements samplingstore.Store#InsertProbabilitiesAndQPS.
func (ss *SamplingStore) InsertProbabilitiesAndQPS(
hostname string,
probabilities model.ServiceOperationProbabilities,
qps model.ServiceOperationQPS,
) error {
ss.Lock()
defer ss.Unlock()
ss.probabilitiesAndQPS = &storedServiceOperationProbabilitiesAndQPS{hostname, probabilities, qps, time.Now()}
return nil
}

// GetLatestProbabilities implements samplingstore.Store#GetLatestProbabilities.
func (ss *SamplingStore) GetLatestProbabilities() (model.ServiceOperationProbabilities, error) {
ss.Lock()
defer ss.Unlock()
if ss.probabilitiesAndQPS != nil {
return ss.probabilitiesAndQPS.probabilities, nil
}
return model.ServiceOperationProbabilities{}, nil
}

func (ss *SamplingStore) preprendThroughput(throughput *storedThroughput) {
ss.throughputs = append([]*storedThroughput{throughput}, ss.throughputs...)
if len(ss.throughputs) > ss.maxBuckets {
ss.throughputs = ss.throughputs[0:ss.maxBuckets]
}
}
107 changes: 107 additions & 0 deletions plugin/storage/memory/sampling_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright (c) 2021 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package memory

import (
"fmt"
"testing"
"time"

"github.com/crossdock/crossdock-go/assert"

"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model"
)

func withPopulatedSamplingStore(f func(samplingStore *SamplingStore)) {
now := time.Now()
millisAfter := now.Add(time.Millisecond * time.Duration(100))
secondsAfter := now.Add(time.Second * time.Duration(2))
throughputs := []*storedThroughput{
{[]*model.Throughput{{Service: "svc-1", Operation: "op-1", Count: 1}}, now},
{[]*model.Throughput{{Service: "svc-1", Operation: "op-2", Count: 1}}, millisAfter},
{[]*model.Throughput{{Service: "svc-2", Operation: "op-3", Count: 1}}, secondsAfter},
}
pQPS := &storedServiceOperationProbabilitiesAndQPS{
hostname: "guntur38ab8928", probabilities: model.ServiceOperationProbabilities{"svc-1": {"op-1": 0.01}}, qps: model.ServiceOperationQPS{"svc-1": {"op-1": 10.0}}, time: now}
samplingStore := &SamplingStore{throughputs: throughputs, probabilitiesAndQPS: pQPS}
f(samplingStore)
}

func withMemorySamplingStore(f func(samplingStore *SamplingStore)) {
f(NewSamplingStore(5))
}

func TestInsertThroughtput(t *testing.T) {
withMemorySamplingStore(func(samplingStore *SamplingStore) {
start := time.Now()
throughputs := []*model.Throughput{
{Service: "my-svc", Operation: "op"},
{Service: "our-svc", Operation: "op2"},
}
assert.NoError(t, samplingStore.InsertThroughput(throughputs))
ret, _ := samplingStore.GetThroughput(start, start.Add(time.Second*time.Duration(1)))
assert.Equal(t, 2, len(ret))

for i := 0; i < 10; i++ {
in := []*model.Throughput{
{Service: fmt.Sprint("svc-", i), Operation: fmt.Sprint("op-", i)},
}
samplingStore.InsertThroughput(in)
}
assert.Equal(t, 5, len(samplingStore.throughputs))
})
}

func TestGetThroughput(t *testing.T) {
withPopulatedSamplingStore(func(samplingStore *SamplingStore) {
start := time.Now()
ret, err := samplingStore.GetThroughput(start, start.Add(time.Second*time.Duration(1)))
assert.NoError(t, err)
assert.Equal(t, 1, len(ret))
ret1, _ := samplingStore.GetThroughput(start, start)
assert.Equal(t, 0, len(ret1))
ret2, _ := samplingStore.GetThroughput(start, start.Add(time.Hour*time.Duration(1)))
assert.Equal(t, 2, len(ret2))
})
}

func TestInsertProbabilitiesAndQPS(t *testing.T) {
withMemorySamplingStore(func(samplingStore *SamplingStore) {
assert.NoError(t, samplingStore.InsertProbabilitiesAndQPS("dell11eg843d", model.ServiceOperationProbabilities{"new-srv": {"op": 0.1}}, model.ServiceOperationQPS{"new-srv": {"op": 4}}))
assert.NotEmpty(t, 1, samplingStore.probabilitiesAndQPS)
// Only latest one is kept in memory
assert.NoError(t, samplingStore.InsertProbabilitiesAndQPS("lncol73", model.ServiceOperationProbabilities{"my-app": {"hello": 0.3}}, model.ServiceOperationQPS{"new-srv": {"op": 7}}))
assert.Equal(t, 0.3, samplingStore.probabilitiesAndQPS.probabilities["my-app"]["hello"])
})
}

func TestGetLatestProbability(t *testing.T) {
withMemorySamplingStore(func(samplingStore *SamplingStore) {
// No priod data
ret, err := samplingStore.GetLatestProbabilities()
assert.NoError(t, err)
assert.Empty(t, ret)
})

withPopulatedSamplingStore(func(samplingStore *SamplingStore) {
// With some pregenerated data
ret, err := samplingStore.GetLatestProbabilities()
assert.NoError(t, err)
assert.Equal(t, ret, model.ServiceOperationProbabilities{"svc-1": {"op-1": 0.01}})
assert.NoError(t, samplingStore.InsertProbabilitiesAndQPS("utfhyolf", model.ServiceOperationProbabilities{"another-service": {"hello": 0.009}}, model.ServiceOperationQPS{"new-srv": {"op": 5}}))
ret, _ = samplingStore.GetLatestProbabilities()
assert.NotEqual(t, ret, model.ServiceOperationProbabilities{"svc-1": {"op-1": 0.01}})
})
}
2 changes: 1 addition & 1 deletion storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type SamplingStoreFactory interface {
// CreateLock creates a distributed lock.
CreateLock() (distributedlock.Lock, error)
// CreateSamplingStore creates a sampling store.
CreateSamplingStore() (samplingstore.Store, error)
CreateSamplingStore(maxBuckets int) (samplingstore.Store, error)
}

var (
Expand Down

0 comments on commit e9bf6ed

Please sign in to comment.