Skip to content

Commit

Permalink
[jaeger-v2] Add Cassandra e2e integration tests (#5398)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
-  part of #5254 

## Description of the changes
- added cassandra integration tests
- added method to purge cassandra storage

## How was this change tested?
- some tests are failing

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `yarn lint` and `yarn test`

---------

Signed-off-by: Harshvir Potpose <hpotpose62@gmail.com>
Signed-off-by: Harshvir Potpose <122517264+akagami-harsh@users.noreply.github.com>
Signed-off-by: Yuri Shkuro <yurishkuro@users.noreply.github.com>
Signed-off-by: Yuri Shkuro <github@ysh.us>
Co-authored-by: Yuri Shkuro <yurishkuro@users.noreply.github.com>
Co-authored-by: Yuri Shkuro <github@ysh.us>
  • Loading branch information
3 people authored May 5, 2024
1 parent 13cbaed commit 4fed4f4
Show file tree
Hide file tree
Showing 12 changed files with 190 additions and 84 deletions.
7 changes: 4 additions & 3 deletions .github/workflows/ci-cassandra.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
jaeger-version: [v1, v2]
version:
- distribution: cassandra
major: 3.x
Expand All @@ -29,7 +30,7 @@ jobs:
major: 4.x
image: 4.0
schema: v004
name: ${{ matrix.version.distribution }} ${{ matrix.version.major }}
name: ${{ matrix.version.distribution }} ${{ matrix.version.major }} ${{ matrix.jaeger-version }}
steps:
- name: Harden Runner
uses: step-security/harden-runner@63c24ba6bd7ba022e95695ff85de572c04a18142 # v2.7.0
Expand All @@ -43,10 +44,10 @@ jobs:
go-version: 1.22.x

- name: Run cassandra integration tests
run: bash scripts/cassandra-integration-test.sh ${{ matrix.version.image }} ${{ matrix.version.schema }}
run: bash scripts/cassandra-integration-test.sh ${{ matrix.version.image }} ${{ matrix.version.schema }} ${{ matrix.jaeger-version }}

- name: Upload coverage to codecov
uses: ./.github/actions/upload-codecov
with:
file: cover.out
flags: cassandra-${{ matrix.version.major }}
flags: cassandra-${{ matrix.version.major }}-${{ matrix.jaeger-version }}
16 changes: 14 additions & 2 deletions cmd/jaeger/config-cassandra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,27 @@ extensions:
cassandra_main:
servers: 127.0.0.1
port: 9042
keyspace: "jaeger_v1_dc1"
connections_per_host: 2
index:
tags: true
logs: true
process_tags: true
cassandra_archive:
servers: 127.0.0.1
port: 9042
keyspace: "jaeger_v1_dc1"
connections_per_host: 2
index:
tags: true
logs: true
process_tags: true
receivers:
otlp:
protocols:
grpc:
http:

jaeger:
protocols:
grpc:
Expand All @@ -38,4 +50,4 @@ processors:

exporters:
jaeger_storage_exporter:
trace_storage: cassandra_main
trace_storage: cassandra_main
14 changes: 7 additions & 7 deletions cmd/jaeger/internal/extension/jaegerstorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,21 @@ import (
"fmt"
"reflect"

cassandraCfg "github.com/jaegertracing/jaeger/pkg/cassandra/config"
esCfg "github.com/jaegertracing/jaeger/pkg/es/config"
memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config"
badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
grpcCfg "github.com/jaegertracing/jaeger/plugin/storage/grpc/config"
)

// Config has the configuration for jaeger-query,
type Config struct {
Memory map[string]memoryCfg.Configuration `mapstructure:"memory"`
Badger map[string]badgerCfg.NamespaceConfig `mapstructure:"badger"`
GRPC map[string]grpcCfg.Configuration `mapstructure:"grpc"`
Opensearch map[string]esCfg.Configuration `mapstructure:"opensearch"`
Elasticsearch map[string]esCfg.Configuration `mapstructure:"elasticsearch"`
Cassandra map[string]cassandraCfg.Configuration `mapstructure:"cassandra"`
Memory map[string]memoryCfg.Configuration `mapstructure:"memory"`
Badger map[string]badgerCfg.NamespaceConfig `mapstructure:"badger"`
GRPC map[string]grpcCfg.Configuration `mapstructure:"grpc"`
Opensearch map[string]esCfg.Configuration `mapstructure:"opensearch"`
Elasticsearch map[string]esCfg.Configuration `mapstructure:"elasticsearch"`
Cassandra map[string]cassandra.Options `mapstructure:"cassandra"`
// TODO add other storage types here
// TODO how will this work with 3rd party storage implementations?
// Option: instead of looking for specific name, check interface.
Expand Down
3 changes: 1 addition & 2 deletions cmd/jaeger/internal/extension/jaegerstorage/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"go.opentelemetry.io/collector/extension"
"go.uber.org/zap"

cassandraCfg "github.com/jaegertracing/jaeger/pkg/cassandra/config"
esCfg "github.com/jaegertracing/jaeger/pkg/es/config"
memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config"
"github.com/jaegertracing/jaeger/pkg/metrics"
Expand Down Expand Up @@ -136,7 +135,7 @@ func (s *storageExt) Start(ctx context.Context, host component.Host) error {
cfg: s.config.Opensearch,
builder: es.NewFactoryWithConfig,
}
cassandraStarter := &starter[cassandraCfg.Configuration, *cassandra.Factory]{
cassandraStarter := &starter[cassandra.Options, *cassandra.Factory]{
ext: s,
storageKind: "cassandra",
cfg: s.config.Cassandra,
Expand Down
30 changes: 30 additions & 0 deletions cmd/jaeger/internal/integration/cassandra_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package integration

import (
"testing"

"github.com/jaegertracing/jaeger/plugin/storage/integration"
)

func TestCassandraStorage(t *testing.T) {
integration.SkipUnlessEnv(t, "cassandra")
s := &E2EStorageIntegration{
ConfigFile: "../../config-cassandra.yaml",
StorageIntegration: integration.StorageIntegration{
CleanUp: purge,
GetDependenciesReturnsSource: true,
SkipArchiveTest: true,
SkipBinaryAttrs: true,

SkipList: integration.CassandraSkippedTests,
},
}
s.e2eInitialize(t, "cassandra")
t.Cleanup(func() {
s.e2eCleanUp(t)
})
s.RunSpanStoreTests(t)
}
12 changes: 6 additions & 6 deletions pkg/cassandra/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ import (
// Configuration describes the configuration properties needed to connect to a Cassandra cluster
type Configuration struct {
Servers []string `valid:"required,url" mapstructure:"servers"`
Keyspace string `valid:"nonzero" mapstructure:"keyspace"`
Keyspace string `mapstructure:"keyspace"`
LocalDC string `mapstructure:"local_dc"`
ConnectionsPerHost int `valid:"min=1" mapstructure:"connections_per_host"`
Timeout time.Duration `valid:"min=500" mapstructure:"-"`
ConnectionsPerHost int `mapstructure:"connections_per_host"`
Timeout time.Duration `mapstructure:"-"`
ConnectTimeout time.Duration `mapstructure:"connection_timeout"`
ReconnectInterval time.Duration `valid:"min=500" mapstructure:"reconnect_interval"`
SocketKeepAlive time.Duration `valid:"min=0" mapstructure:"socket_keep_alive"`
MaxRetryAttempts int `valid:"min=0" mapstructure:"max_retry_attempts"`
ReconnectInterval time.Duration `mapstructure:"reconnect_interval"`
SocketKeepAlive time.Duration `mapstructure:"socket_keep_alive"`
MaxRetryAttempts int `mapstructure:"max_retry_attempts"`
ProtoVersion int `mapstructure:"proto_version"`
Consistency string `mapstructure:"consistency"`
DisableCompression bool `mapstructure:"disable_compression"`
Expand Down
42 changes: 31 additions & 11 deletions plugin/storage/cassandra/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ const (

var ( // interface comformance checks
_ storage.Factory = (*Factory)(nil)
_ storage.Purger = (*Factory)(nil)
_ storage.ArchiveFactory = (*Factory)(nil)
_ storage.SamplingStoreFactory = (*Factory)(nil)
_ io.Closer = (*Factory)(nil)
Expand Down Expand Up @@ -81,20 +82,40 @@ func NewFactory() *Factory {

// NewFactoryWithConfig initializes factory with Config.
func NewFactoryWithConfig(
cfg config.Configuration,
cfg Options,
metricsFactory metrics.Factory,
logger *zap.Logger,
) (*Factory, error) {
if err := cfg.Validate(); err != nil {
f := NewFactory()
// use this to help with testing
b := &withConfigBuilder{
f: f,
cfg: &cfg,
metricsFactory: metricsFactory,
logger: logger,
initializer: f.Initialize, // this can be mocked in tests
}
return b.build()
}

type withConfigBuilder struct {
f *Factory
cfg *Options
metricsFactory metrics.Factory
logger *zap.Logger
initializer func(metricsFactory metrics.Factory, logger *zap.Logger) error
}

func (b *withConfigBuilder) build() (*Factory, error) {
b.f.InitFromOptions(b.cfg)
if err := b.cfg.Primary.Validate(); err != nil {
return nil, err
}
f := NewFactory()
f.primaryConfig = &cfg
err := f.Initialize(metricsFactory, logger)
err := b.initializer(b.metricsFactory, b.logger)
if err != nil {
return nil, err
}
return f, nil
return b.f, nil
}

// AddFlags implements plugin.Configurable
Expand All @@ -114,6 +135,10 @@ func (f *Factory) InitFromViper(v *viper.Viper, logger *zap.Logger) {
// InitFromOptions initializes factory from options.
func (f *Factory) InitFromOptions(o *Options) {
f.Options = o
// TODO this is a hack because we do not define defaults in Options
if o.others == nil {
o.others = make(map[string]*namespaceConfig)
}
f.primaryConfig = o.GetPrimary()
if cfg := f.Options.Get(archiveStorageConfig); cfg != nil {
f.archiveConfig = cfg // this is so stupid - see https://golang.org/doc/faq#nil_error
Expand Down Expand Up @@ -248,11 +273,6 @@ func (f *Factory) Close() error {
return errors.Join(errs...)
}

// PrimarySession is used from integration tests to clean database between tests
func (f *Factory) PrimarySession() cassandra.Session {
return f.primarySession
}

func (f *Factory) Purge(_ context.Context) error {
return f.primarySession.Query("TRUNCATE traces").Exec()
}
93 changes: 63 additions & 30 deletions plugin/storage/cassandra/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package cassandra

import (
"context"
"errors"
"testing"

Expand Down Expand Up @@ -200,35 +201,67 @@ func TestInitFromOptions(t *testing.T) {
assert.Equal(t, o.Get(archiveStorageConfig), f.archiveConfig)
}

func TestConfigurationValidation(t *testing.T) {
testCases := []struct {
name string
cfg cassandraCfg.Configuration
wantErr bool
}{
{
name: "valid configuration",
cfg: cassandraCfg.Configuration{
Servers: []string{"http://localhost:9200"},
func TestNewFactoryWithConfig(t *testing.T) {
t.Run("valid configuration", func(t *testing.T) {
cfg := Options{
Primary: namespaceConfig{
Configuration: cassandraCfg.Configuration{
Servers: []string{"localhost:9200"},
},
},
wantErr: false,
},
{
name: "missing servers",
cfg: cassandraCfg.Configuration{},
wantErr: true,
},
}
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
err := test.cfg.Validate()
if test.wantErr {
require.Error(t, err)
_, err = NewFactoryWithConfig(test.cfg, metrics.NullFactory, zap.NewNop())
require.Error(t, err)
} else {
require.NoError(t, err)
}
})
}
}
f := NewFactory()
b := &withConfigBuilder{
f: f,
cfg: &cfg,
metricsFactory: metrics.NullFactory,
logger: zap.NewNop(),
initializer: func(metricsFactory metrics.Factory, logger *zap.Logger) error { return nil },
}
_, err := b.build()
require.NoError(t, err)
})
t.Run("connection error", func(t *testing.T) {
expErr := errors.New("made-up error")
cfg := Options{
Primary: namespaceConfig{
Configuration: cassandraCfg.Configuration{
Servers: []string{"localhost:9200"},
},
},
}
f := NewFactory()
b := &withConfigBuilder{
f: f,
cfg: &cfg,
metricsFactory: metrics.NullFactory,
logger: zap.NewNop(),
initializer: func(metricsFactory metrics.Factory, logger *zap.Logger) error { return expErr },
}
_, err := b.build()
require.ErrorIs(t, err, expErr)
})
t.Run("invalid configuration", func(t *testing.T) {
cfg := Options{}
_, err := NewFactoryWithConfig(cfg, metrics.NullFactory, zap.NewNop())
require.Error(t, err)
require.ErrorContains(t, err, "Servers: non zero value required")
})
}

func TestFactory_Purge(t *testing.T) {
f := NewFactory()
var (
session = &mocks.Session{}
query = &mocks.Query{}
)
session.On("Query", mock.AnythingOfType("string"), mock.Anything).Return(query)
query.On("Exec").Return(nil)
f.primarySession = session

err := f.Purge(context.Background())
require.NoError(t, err)

session.AssertCalled(t, "Query", mock.AnythingOfType("string"), mock.Anything)
query.AssertCalled(t, "Exec")
}
Loading

0 comments on commit 4fed4f4

Please sign in to comment.