From 4fed4f44026a3874c914d982ee6a08cf2ffc91d8 Mon Sep 17 00:00:00 2001 From: Harshvir Potpose <122517264+akagami-harsh@users.noreply.github.com> Date: Sun, 5 May 2024 09:48:25 +0530 Subject: [PATCH] [jaeger-v2] Add Cassandra e2e integration tests (#5398) ## Which problem is this PR solving? - part of #5254 ## Description of the changes - added cassandra integration tests - added method to purge cassandra storage ## How was this change tested? - some tests are failing ## Checklist - [x] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md - [x] I have signed all commits - [x] I have added unit tests for the new functionality - [x] I have run lint and test steps successfully - for `jaeger`: `make lint test` - for `jaeger-ui`: `yarn lint` and `yarn test` --------- Signed-off-by: Harshvir Potpose Signed-off-by: Harshvir Potpose <122517264+akagami-harsh@users.noreply.github.com> Signed-off-by: Yuri Shkuro Signed-off-by: Yuri Shkuro Co-authored-by: Yuri Shkuro Co-authored-by: Yuri Shkuro --- .github/workflows/ci-cassandra.yml | 7 +- cmd/jaeger/config-cassandra.yaml | 16 +++- .../extension/jaegerstorage/config.go | 14 +-- .../extension/jaegerstorage/extension.go | 3 +- .../internal/integration/cassandra_test.go | 30 ++++++ pkg/cassandra/config/config.go | 12 +-- plugin/storage/cassandra/factory.go | 42 ++++++--- plugin/storage/cassandra/factory_test.go | 93 +++++++++++++------ plugin/storage/cassandra/options.go | 16 ++-- plugin/storage/integration/cassandra_test.go | 11 +-- plugin/storage/integration/integration.go | 11 +++ scripts/cassandra-integration-test.sh | 19 +++- 12 files changed, 190 insertions(+), 84 deletions(-) create mode 100644 cmd/jaeger/internal/integration/cassandra_test.go diff --git a/.github/workflows/ci-cassandra.yml b/.github/workflows/ci-cassandra.yml index aa7bb3a5465..e8b67f91dac 100644 --- a/.github/workflows/ci-cassandra.yml +++ b/.github/workflows/ci-cassandra.yml @@ -20,6 +20,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: + jaeger-version: [v1, v2] version: - distribution: cassandra major: 3.x @@ -29,7 +30,7 @@ jobs: major: 4.x image: 4.0 schema: v004 - name: ${{ matrix.version.distribution }} ${{ matrix.version.major }} + name: ${{ matrix.version.distribution }} ${{ matrix.version.major }} ${{ matrix.jaeger-version }} steps: - name: Harden Runner uses: step-security/harden-runner@63c24ba6bd7ba022e95695ff85de572c04a18142 # v2.7.0 @@ -43,10 +44,10 @@ jobs: go-version: 1.22.x - name: Run cassandra integration tests - run: bash scripts/cassandra-integration-test.sh ${{ matrix.version.image }} ${{ matrix.version.schema }} + run: bash scripts/cassandra-integration-test.sh ${{ matrix.version.image }} ${{ matrix.version.schema }} ${{ matrix.jaeger-version }} - name: Upload coverage to codecov uses: ./.github/actions/upload-codecov with: file: cover.out - flags: cassandra-${{ matrix.version.major }} + flags: cassandra-${{ matrix.version.major }}-${{ matrix.jaeger-version }} diff --git a/cmd/jaeger/config-cassandra.yaml b/cmd/jaeger/config-cassandra.yaml index 86efdcfaf29..39cfb319489 100644 --- a/cmd/jaeger/config-cassandra.yaml +++ b/cmd/jaeger/config-cassandra.yaml @@ -17,15 +17,27 @@ extensions: cassandra_main: servers: 127.0.0.1 port: 9042 + keyspace: "jaeger_v1_dc1" + connections_per_host: 2 + index: + tags: true + logs: true + process_tags: true cassandra_archive: servers: 127.0.0.1 port: 9042 + keyspace: "jaeger_v1_dc1" + connections_per_host: 2 + index: + tags: true + logs: true + process_tags: true receivers: otlp: protocols: grpc: http: - + jaeger: protocols: grpc: @@ -38,4 +50,4 @@ processors: exporters: jaeger_storage_exporter: - trace_storage: cassandra_main \ No newline at end of file + trace_storage: cassandra_main diff --git a/cmd/jaeger/internal/extension/jaegerstorage/config.go b/cmd/jaeger/internal/extension/jaegerstorage/config.go index 10afd448c9e..542c5a6437a 100644 --- a/cmd/jaeger/internal/extension/jaegerstorage/config.go +++ b/cmd/jaeger/internal/extension/jaegerstorage/config.go @@ -7,21 +7,21 @@ import ( "fmt" "reflect" - cassandraCfg "github.com/jaegertracing/jaeger/pkg/cassandra/config" esCfg "github.com/jaegertracing/jaeger/pkg/es/config" memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config" badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger" + "github.com/jaegertracing/jaeger/plugin/storage/cassandra" grpcCfg "github.com/jaegertracing/jaeger/plugin/storage/grpc/config" ) // Config has the configuration for jaeger-query, type Config struct { - Memory map[string]memoryCfg.Configuration `mapstructure:"memory"` - Badger map[string]badgerCfg.NamespaceConfig `mapstructure:"badger"` - GRPC map[string]grpcCfg.Configuration `mapstructure:"grpc"` - Opensearch map[string]esCfg.Configuration `mapstructure:"opensearch"` - Elasticsearch map[string]esCfg.Configuration `mapstructure:"elasticsearch"` - Cassandra map[string]cassandraCfg.Configuration `mapstructure:"cassandra"` + Memory map[string]memoryCfg.Configuration `mapstructure:"memory"` + Badger map[string]badgerCfg.NamespaceConfig `mapstructure:"badger"` + GRPC map[string]grpcCfg.Configuration `mapstructure:"grpc"` + Opensearch map[string]esCfg.Configuration `mapstructure:"opensearch"` + Elasticsearch map[string]esCfg.Configuration `mapstructure:"elasticsearch"` + Cassandra map[string]cassandra.Options `mapstructure:"cassandra"` // TODO add other storage types here // TODO how will this work with 3rd party storage implementations? // Option: instead of looking for specific name, check interface. diff --git a/cmd/jaeger/internal/extension/jaegerstorage/extension.go b/cmd/jaeger/internal/extension/jaegerstorage/extension.go index f718ca8cef0..28b3c4920a8 100644 --- a/cmd/jaeger/internal/extension/jaegerstorage/extension.go +++ b/cmd/jaeger/internal/extension/jaegerstorage/extension.go @@ -13,7 +13,6 @@ import ( "go.opentelemetry.io/collector/extension" "go.uber.org/zap" - cassandraCfg "github.com/jaegertracing/jaeger/pkg/cassandra/config" esCfg "github.com/jaegertracing/jaeger/pkg/es/config" memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config" "github.com/jaegertracing/jaeger/pkg/metrics" @@ -136,7 +135,7 @@ func (s *storageExt) Start(ctx context.Context, host component.Host) error { cfg: s.config.Opensearch, builder: es.NewFactoryWithConfig, } - cassandraStarter := &starter[cassandraCfg.Configuration, *cassandra.Factory]{ + cassandraStarter := &starter[cassandra.Options, *cassandra.Factory]{ ext: s, storageKind: "cassandra", cfg: s.config.Cassandra, diff --git a/cmd/jaeger/internal/integration/cassandra_test.go b/cmd/jaeger/internal/integration/cassandra_test.go new file mode 100644 index 00000000000..f04cb335ee7 --- /dev/null +++ b/cmd/jaeger/internal/integration/cassandra_test.go @@ -0,0 +1,30 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package integration + +import ( + "testing" + + "github.com/jaegertracing/jaeger/plugin/storage/integration" +) + +func TestCassandraStorage(t *testing.T) { + integration.SkipUnlessEnv(t, "cassandra") + s := &E2EStorageIntegration{ + ConfigFile: "../../config-cassandra.yaml", + StorageIntegration: integration.StorageIntegration{ + CleanUp: purge, + GetDependenciesReturnsSource: true, + SkipArchiveTest: true, + SkipBinaryAttrs: true, + + SkipList: integration.CassandraSkippedTests, + }, + } + s.e2eInitialize(t, "cassandra") + t.Cleanup(func() { + s.e2eCleanUp(t) + }) + s.RunSpanStoreTests(t) +} diff --git a/pkg/cassandra/config/config.go b/pkg/cassandra/config/config.go index a55ed402756..e898b74d6f6 100644 --- a/pkg/cassandra/config/config.go +++ b/pkg/cassandra/config/config.go @@ -31,14 +31,14 @@ import ( // Configuration describes the configuration properties needed to connect to a Cassandra cluster type Configuration struct { Servers []string `valid:"required,url" mapstructure:"servers"` - Keyspace string `valid:"nonzero" mapstructure:"keyspace"` + Keyspace string `mapstructure:"keyspace"` LocalDC string `mapstructure:"local_dc"` - ConnectionsPerHost int `valid:"min=1" mapstructure:"connections_per_host"` - Timeout time.Duration `valid:"min=500" mapstructure:"-"` + ConnectionsPerHost int `mapstructure:"connections_per_host"` + Timeout time.Duration `mapstructure:"-"` ConnectTimeout time.Duration `mapstructure:"connection_timeout"` - ReconnectInterval time.Duration `valid:"min=500" mapstructure:"reconnect_interval"` - SocketKeepAlive time.Duration `valid:"min=0" mapstructure:"socket_keep_alive"` - MaxRetryAttempts int `valid:"min=0" mapstructure:"max_retry_attempts"` + ReconnectInterval time.Duration `mapstructure:"reconnect_interval"` + SocketKeepAlive time.Duration `mapstructure:"socket_keep_alive"` + MaxRetryAttempts int `mapstructure:"max_retry_attempts"` ProtoVersion int `mapstructure:"proto_version"` Consistency string `mapstructure:"consistency"` DisableCompression bool `mapstructure:"disable_compression"` diff --git a/plugin/storage/cassandra/factory.go b/plugin/storage/cassandra/factory.go index 5e97a9065e5..29335ed08a1 100644 --- a/plugin/storage/cassandra/factory.go +++ b/plugin/storage/cassandra/factory.go @@ -50,6 +50,7 @@ const ( var ( // interface comformance checks _ storage.Factory = (*Factory)(nil) + _ storage.Purger = (*Factory)(nil) _ storage.ArchiveFactory = (*Factory)(nil) _ storage.SamplingStoreFactory = (*Factory)(nil) _ io.Closer = (*Factory)(nil) @@ -81,20 +82,40 @@ func NewFactory() *Factory { // NewFactoryWithConfig initializes factory with Config. func NewFactoryWithConfig( - cfg config.Configuration, + cfg Options, metricsFactory metrics.Factory, logger *zap.Logger, ) (*Factory, error) { - if err := cfg.Validate(); err != nil { + f := NewFactory() + // use this to help with testing + b := &withConfigBuilder{ + f: f, + cfg: &cfg, + metricsFactory: metricsFactory, + logger: logger, + initializer: f.Initialize, // this can be mocked in tests + } + return b.build() +} + +type withConfigBuilder struct { + f *Factory + cfg *Options + metricsFactory metrics.Factory + logger *zap.Logger + initializer func(metricsFactory metrics.Factory, logger *zap.Logger) error +} + +func (b *withConfigBuilder) build() (*Factory, error) { + b.f.InitFromOptions(b.cfg) + if err := b.cfg.Primary.Validate(); err != nil { return nil, err } - f := NewFactory() - f.primaryConfig = &cfg - err := f.Initialize(metricsFactory, logger) + err := b.initializer(b.metricsFactory, b.logger) if err != nil { return nil, err } - return f, nil + return b.f, nil } // AddFlags implements plugin.Configurable @@ -114,6 +135,10 @@ func (f *Factory) InitFromViper(v *viper.Viper, logger *zap.Logger) { // InitFromOptions initializes factory from options. func (f *Factory) InitFromOptions(o *Options) { f.Options = o + // TODO this is a hack because we do not define defaults in Options + if o.others == nil { + o.others = make(map[string]*namespaceConfig) + } f.primaryConfig = o.GetPrimary() if cfg := f.Options.Get(archiveStorageConfig); cfg != nil { f.archiveConfig = cfg // this is so stupid - see https://golang.org/doc/faq#nil_error @@ -248,11 +273,6 @@ func (f *Factory) Close() error { return errors.Join(errs...) } -// PrimarySession is used from integration tests to clean database between tests -func (f *Factory) PrimarySession() cassandra.Session { - return f.primarySession -} - func (f *Factory) Purge(_ context.Context) error { return f.primarySession.Query("TRUNCATE traces").Exec() } diff --git a/plugin/storage/cassandra/factory_test.go b/plugin/storage/cassandra/factory_test.go index 5f9fbb30757..205e56cd303 100644 --- a/plugin/storage/cassandra/factory_test.go +++ b/plugin/storage/cassandra/factory_test.go @@ -16,6 +16,7 @@ package cassandra import ( + "context" "errors" "testing" @@ -200,35 +201,67 @@ func TestInitFromOptions(t *testing.T) { assert.Equal(t, o.Get(archiveStorageConfig), f.archiveConfig) } -func TestConfigurationValidation(t *testing.T) { - testCases := []struct { - name string - cfg cassandraCfg.Configuration - wantErr bool - }{ - { - name: "valid configuration", - cfg: cassandraCfg.Configuration{ - Servers: []string{"http://localhost:9200"}, +func TestNewFactoryWithConfig(t *testing.T) { + t.Run("valid configuration", func(t *testing.T) { + cfg := Options{ + Primary: namespaceConfig{ + Configuration: cassandraCfg.Configuration{ + Servers: []string{"localhost:9200"}, + }, }, - wantErr: false, - }, - { - name: "missing servers", - cfg: cassandraCfg.Configuration{}, - wantErr: true, - }, - } - for _, test := range testCases { - t.Run(test.name, func(t *testing.T) { - err := test.cfg.Validate() - if test.wantErr { - require.Error(t, err) - _, err = NewFactoryWithConfig(test.cfg, metrics.NullFactory, zap.NewNop()) - require.Error(t, err) - } else { - require.NoError(t, err) - } - }) - } + } + f := NewFactory() + b := &withConfigBuilder{ + f: f, + cfg: &cfg, + metricsFactory: metrics.NullFactory, + logger: zap.NewNop(), + initializer: func(metricsFactory metrics.Factory, logger *zap.Logger) error { return nil }, + } + _, err := b.build() + require.NoError(t, err) + }) + t.Run("connection error", func(t *testing.T) { + expErr := errors.New("made-up error") + cfg := Options{ + Primary: namespaceConfig{ + Configuration: cassandraCfg.Configuration{ + Servers: []string{"localhost:9200"}, + }, + }, + } + f := NewFactory() + b := &withConfigBuilder{ + f: f, + cfg: &cfg, + metricsFactory: metrics.NullFactory, + logger: zap.NewNop(), + initializer: func(metricsFactory metrics.Factory, logger *zap.Logger) error { return expErr }, + } + _, err := b.build() + require.ErrorIs(t, err, expErr) + }) + t.Run("invalid configuration", func(t *testing.T) { + cfg := Options{} + _, err := NewFactoryWithConfig(cfg, metrics.NullFactory, zap.NewNop()) + require.Error(t, err) + require.ErrorContains(t, err, "Servers: non zero value required") + }) +} + +func TestFactory_Purge(t *testing.T) { + f := NewFactory() + var ( + session = &mocks.Session{} + query = &mocks.Query{} + ) + session.On("Query", mock.AnythingOfType("string"), mock.Anything).Return(query) + query.On("Exec").Return(nil) + f.primarySession = session + + err := f.Purge(context.Background()) + require.NoError(t, err) + + session.AssertCalled(t, "Query", mock.AnythingOfType("string"), mock.Anything) + query.AssertCalled(t, "Exec") } diff --git a/plugin/storage/cassandra/options.go b/plugin/storage/cassandra/options.go index 4e3f0e09964..b2e76b24710 100644 --- a/plugin/storage/cassandra/options.go +++ b/plugin/storage/cassandra/options.go @@ -53,6 +53,8 @@ const ( suffixIndexLogs = ".index.logs" suffixIndexTags = ".index.tags" suffixIndexProcessTags = ".index.process-tags" + + defaultHost = "127.0.0.1" ) // Options contains various type of Cassandra configs and provides the ability @@ -80,7 +82,6 @@ type IndexConfig struct { // preparing the actual config.Configuration. type namespaceConfig struct { config.Configuration `mapstructure:",squash"` - servers string namespace string Enabled bool `mapstructure:"-"` } @@ -96,8 +97,8 @@ func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options { ProtoVersion: 4, ConnectionsPerHost: 2, ReconnectInterval: 60 * time.Second, + Servers: []string{defaultHost}, }, - servers: "127.0.0.1", namespace: primaryNamespace, Enabled: true, }, @@ -175,7 +176,7 @@ func addFlags(flagSet *flag.FlagSet, nsConfig namespaceConfig) { "Reconnect interval to retry connecting to downed hosts") flagSet.String( nsConfig.namespace+suffixServers, - nsConfig.servers, + strings.Join(nsConfig.Servers, ","), "The comma-separated list of Cassandra servers") flagSet.Int( nsConfig.namespace+suffixPort, @@ -245,7 +246,8 @@ func (cfg *namespaceConfig) initFromViper(v *viper.Viper) { cfg.Timeout = v.GetDuration(cfg.namespace + suffixTimeout) cfg.ConnectTimeout = v.GetDuration(cfg.namespace + suffixConnectTimeout) cfg.ReconnectInterval = v.GetDuration(cfg.namespace + suffixReconnectInterval) - cfg.servers = stripWhiteSpace(v.GetString(cfg.namespace + suffixServers)) + servers := stripWhiteSpace(v.GetString(cfg.namespace + suffixServers)) + cfg.Servers = strings.Split(servers, ",") cfg.Port = v.GetInt(cfg.namespace + suffixPort) cfg.Keyspace = v.GetString(cfg.namespace + suffixKeyspace) cfg.LocalDC = v.GetString(cfg.namespace + suffixDC) @@ -265,7 +267,6 @@ func (cfg *namespaceConfig) initFromViper(v *viper.Viper) { // GetPrimary returns primary configuration. func (opt *Options) GetPrimary() *config.Configuration { - opt.Primary.Servers = strings.Split(opt.Primary.servers, ",") return &opt.Primary.Configuration } @@ -280,10 +281,9 @@ func (opt *Options) Get(namespace string) *config.Configuration { return nil } nsCfg.Configuration.ApplyDefaults(&opt.Primary.Configuration) - if nsCfg.servers == "" { - nsCfg.servers = opt.Primary.servers + if len(nsCfg.Servers) == 0 { + nsCfg.Servers = opt.Primary.Servers } - nsCfg.Servers = strings.Split(nsCfg.servers, ",") return &nsCfg.Configuration } diff --git a/plugin/storage/integration/cassandra_test.go b/plugin/storage/integration/cassandra_test.go index 41d95216a43..ebb153459c1 100644 --- a/plugin/storage/integration/cassandra_test.go +++ b/plugin/storage/integration/cassandra_test.go @@ -40,16 +40,7 @@ func newCassandraStorageIntegration() *CassandraStorageIntegration { GetDependenciesReturnsSource: true, SkipArchiveTest: true, - SkipList: []string{ - "Tags_+_Operation_name_+_Duration_range", - "Tags_+_Duration_range", - "Tags_+_Operation_name_+_max_Duration", - "Tags_+_max_Duration", - "Operation_name_+_Duration_range", - "Duration_range", - "max_Duration", - "Multiple_Traces", - }, + SkipList: CassandraSkippedTests, }, } s.CleanUp = s.cleanUp diff --git a/plugin/storage/integration/integration.go b/plugin/storage/integration/integration.go index 287aac44625..9e73fedeb92 100644 --- a/plugin/storage/integration/integration.go +++ b/plugin/storage/integration/integration.go @@ -113,6 +113,17 @@ func SkipUnlessEnv(t *testing.T, storage ...string) { t.Skipf("This test requires environment variable STORAGE=%s", strings.Join(storage, "|")) } +var CassandraSkippedTests = []string{ + "Tags_+_Operation_name_+_Duration_range", + "Tags_+_Duration_range", + "Tags_+_Operation_name_+_max_Duration", + "Tags_+_max_Duration", + "Operation_name_+_Duration_range", + "Duration_range", + "max_Duration", + "Multiple_Traces", +} + func (s *StorageIntegration) skipIfNeeded(t *testing.T) { for _, pat := range s.SkipList { escapedPat := regexp.QuoteMeta(pat) diff --git a/scripts/cassandra-integration-test.sh b/scripts/cassandra-integration-test.sh index d232c3f7b97..1eb31b69dac 100755 --- a/scripts/cassandra-integration-test.sh +++ b/scripts/cassandra-integration-test.sh @@ -8,8 +8,8 @@ usage() { } check_arg() { - if [ ! $# -eq 2 ]; then - echo "ERROR: need exactly two arguments, " + if [ ! $# -eq 3 ]; then + echo "ERROR: need exactly three arguments, " usage fi } @@ -52,11 +52,20 @@ apply_schema() { run_integration_test() { local version=$1 local schema_version=$2 + local jaegerVersion=$3 local cid cid=$(setup_cassandra "${version}") apply_schema "$2" - STORAGE=cassandra make storage-integration-test - exit_status=$? + if [ "${jaegerVersion}" = "v1" ]; then + STORAGE=cassandra make storage-integration-test + exit_status=$? + elif [ "${jaegerVersion}" == "v2" ]; then + STORAGE=cassandra make jaeger-v2-storage-integration-test + exit_status=$? + else + echo "Unknown jaeger version $jaegerVersion. Valid options are v1 or v2" + exit 1 + fi # shellcheck disable=SC2064 trap "teardown_cassandra ${cid}" EXIT } @@ -65,7 +74,7 @@ main() { check_arg "$@" echo "Executing integration test for $1 with schema $2.cql.tmpl" - run_integration_test "$1" "$2" + run_integration_test "$1" "$2" "$3" } main "$@"