Skip to content

Commit

Permalink
[jaeger-v2] Consolidate v1 and v2 Configurations for GRPC Storage (#6042
Browse files Browse the repository at this point in the history
)

## Which problem is this PR solving?
- Resolves #6041

## Description of the changes
- We were currently maintaining two entirely separate configurations for
v1 and v2 for the GRPC Storage Component and were initializing v1
configurations and then translating them to the v2 configurations which
were the ones actually being used. This caused an issue where one
configuration field was left out of the translation method (see
#6030 for more details).
- In this PR, we consolidate the v1 and v2 configurations into a single
config type that is directly initialized to avoid having configurations
that diverge or running into issues like the one described above.

## How was this change tested?
- Unit tests were updated
- Integration tests were not touched but should still pass since this
was just an internal change and the interface was not touched

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `yarn lint` and `yarn test`

---------

Signed-off-by: Mahad Zaryab <mahadzaryab1@gmail.com>
  • Loading branch information
mahadzaryab1 authored Oct 4, 2024
1 parent edc45e0 commit fa31b89
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 85 deletions.
4 changes: 2 additions & 2 deletions cmd/jaeger/internal/extension/jaegerstorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type Config struct {
type Backend struct {
Memory *memory.Configuration `mapstructure:"memory"`
Badger *badger.Config `mapstructure:"badger"`
GRPC *grpc.ConfigV2 `mapstructure:"grpc"`
GRPC *grpc.Config `mapstructure:"grpc"`
Cassandra *cassandra.Options `mapstructure:"cassandra"`
Elasticsearch *esCfg.Configuration `mapstructure:"elasticsearch"`
Opensearch *esCfg.Configuration `mapstructure:"opensearch"`
Expand All @@ -66,7 +66,7 @@ func (cfg *Backend) Unmarshal(conf *confmap.Conf) error {
cfg.Badger = v
}
if conf.IsSet("grpc") {
v := grpc.DefaultConfigV2()
v := grpc.DefaultConfig()
cfg.GRPC = &v
}
if conf.IsSet("cassandra") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func TestGRPC(t *testing.T) {
ext := makeStorageExtenion(t, &Config{
Backends: map[string]Backend{
"foo": {
GRPC: &grpc.ConfigV2{
GRPC: &grpc.Config{
ClientConfig: configgrpc.ClientConfig{
Endpoint: "localhost:12345",
},
Expand Down
31 changes: 4 additions & 27 deletions plugin/storage/grpc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,51 +4,28 @@
package grpc

import (
"time"

"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/exporter/exporterhelper"

"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
)

// Configuration describes the options to customize the storage behavior.
type Configuration struct {
RemoteServerAddr string `yaml:"server" mapstructure:"server"`
RemoteTLS tlscfg.Options
RemoteConnectTimeout time.Duration `yaml:"connection-timeout" mapstructure:"connection-timeout"`
TenancyOpts tenancy.Options
}

type ConfigV2 struct {
// Config describes the options to customize the storage behavior
type Config struct {
Tenancy tenancy.Options `mapstructure:"multi_tenancy"`
configgrpc.ClientConfig `mapstructure:",squash"`
exporterhelper.TimeoutSettings `mapstructure:",squash"`
}

func DefaultConfigV2() ConfigV2 {
return ConfigV2{
func DefaultConfig() Config {
return Config{
TimeoutSettings: exporterhelper.TimeoutConfig{
Timeout: defaultConnectionTimeout,
},
}
}

func (c *Configuration) TranslateToConfigV2() *ConfigV2 {
return &ConfigV2{
Tenancy: c.TenancyOpts,
ClientConfig: configgrpc.ClientConfig{
Endpoint: c.RemoteServerAddr,
TLSSetting: c.RemoteTLS.ToOtelClientConfig(),
},
TimeoutSettings: exporterhelper.TimeoutConfig{
Timeout: c.RemoteConnectTimeout,
},
}
}

// ClientPluginServices defines services plugin can expose and its capabilities
type ClientPluginServices struct {
shared.PluginServices
Expand Down
4 changes: 2 additions & 2 deletions plugin/storage/grpc/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/stretchr/testify/assert"
)

func TestDefaultConfigV2(t *testing.T) {
cfg := DefaultConfigV2()
func TestDefaultConfig(t *testing.T) {
cfg := DefaultConfig()
assert.NotEmpty(t, cfg.Timeout)
}
30 changes: 10 additions & 20 deletions plugin/storage/grpc/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,9 @@ type Factory struct {
metricsFactory metrics.Factory
logger *zap.Logger
tracerProvider trace.TracerProvider

// configV1 is used for backward compatibility. it will be removed in v2.
// In the main initialization logic, only configV2 is used.
configV1 Configuration
configV2 *ConfigV2

services *ClientPluginServices
remoteConn *grpc.ClientConn
config Config
services *ClientPluginServices
remoteConn *grpc.ClientConn
}

// NewFactory creates a new Factory.
Expand All @@ -61,12 +56,12 @@ func NewFactory() *Factory {

// NewFactoryWithConfig is used from jaeger(v2).
func NewFactoryWithConfig(
cfg ConfigV2,
cfg Config,
metricsFactory metrics.Factory,
logger *zap.Logger,
) (*Factory, error) {
f := NewFactory()
f.configV2 = &cfg
f.config = cfg
if err := f.Initialize(metricsFactory, logger); err != nil {
return nil, err
}
Expand All @@ -75,12 +70,12 @@ func NewFactoryWithConfig(

// AddFlags implements plugin.Configurable
func (*Factory) AddFlags(flagSet *flag.FlagSet) {
v1AddFlags(flagSet)
addFlags(flagSet)
}

// InitFromViper implements plugin.Configurable
func (f *Factory) InitFromViper(v *viper.Viper, logger *zap.Logger) {
if err := v1InitFromViper(&f.configV1, v); err != nil {
if err := initFromViper(&f.config, v); err != nil {
logger.Fatal("unable to initialize gRPC storage factory", zap.Error(err))
}
}
Expand All @@ -90,10 +85,6 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
f.metricsFactory, f.logger = metricsFactory, logger
f.tracerProvider = otel.GetTracerProvider()

if f.configV2 == nil {
f.configV2 = f.configV1.TranslateToConfigV2()
}

telset := component.TelemetrySettings{
Logger: logger,
TracerProvider: f.tracerProvider,
Expand All @@ -107,22 +98,22 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
for _, opt := range opts {
clientOpts = append(clientOpts, configgrpc.WithGrpcDialOption(opt))
}
return f.configV2.ToClientConnWithOptions(context.Background(), componenttest.NewNopHost(), telset, clientOpts...)
return f.config.ToClientConnWithOptions(context.Background(), componenttest.NewNopHost(), telset, clientOpts...)
}

var err error
f.services, err = f.newRemoteStorage(telset, newClientFn)
if err != nil {
return fmt.Errorf("grpc storage builder failed to create a store: %w", err)
}
logger.Info("Remote storage configuration", zap.Any("configuration", f.configV2))
logger.Info("Remote storage configuration", zap.Any("configuration", f.config))
return nil
}

type newClientFn func(opts ...grpc.DialOption) (*grpc.ClientConn, error)

func (f *Factory) newRemoteStorage(telset component.TelemetrySettings, newClient newClientFn) (*ClientPluginServices, error) {
c := f.configV2
c := f.config
opts := []grpc.DialOption{
grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(telset.TracerProvider))),
}
Expand Down Expand Up @@ -208,6 +199,5 @@ func (f *Factory) Close() error {
if f.remoteConn != nil {
errs = append(errs, f.remoteConn.Close())
}
errs = append(errs, f.configV1.RemoteTLS.Close())
return errors.Join(errs...)
}
10 changes: 5 additions & 5 deletions plugin/storage/grpc/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func makeFactory(t *testing.T) *Factory {
}

func TestNewFactoryError(t *testing.T) {
cfg := &ConfigV2{
cfg := &Config{
ClientConfig: configgrpc.ClientConfig{
// non-empty Auth is currently not supported
Auth: &configauth.Authentication{},
Expand All @@ -113,15 +113,15 @@ func TestNewFactoryError(t *testing.T) {
t.Run("viper", func(t *testing.T) {
f := NewFactory()
f.InitFromViper(viper.New(), zap.NewNop())
f.configV2 = cfg
f.config = *cfg
err := f.Initialize(metrics.NullFactory, zap.NewNop())
require.Error(t, err)
assert.Contains(t, err.Error(), "authenticator")
})

t.Run("client", func(t *testing.T) {
// this is a silly test to verify handling of error from grpc.NewClient, which cannot be induced via params.
f, err := NewFactoryWithConfig(ConfigV2{}, metrics.NullFactory, zap.NewNop())
f, err := NewFactoryWithConfig(Config{}, metrics.NullFactory, zap.NewNop())
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, f.Close()) })
newClientFn := func(_ ...grpc.DialOption) (conn *grpc.ClientConn, err error) {
Expand Down Expand Up @@ -162,7 +162,7 @@ func TestGRPCStorageFactoryWithConfig(t *testing.T) {
}()
defer s.Stop()

cfg := ConfigV2{
cfg := Config{
ClientConfig: configgrpc.ClientConfig{
Endpoint: lis.Addr().String(),
},
Expand Down Expand Up @@ -265,7 +265,7 @@ func TestWithCLIFlags(t *testing.T) {
})
require.NoError(t, err)
f.InitFromViper(v, zap.NewNop())
assert.Equal(t, "foo:1234", f.configV1.RemoteServerAddr)
assert.Equal(t, "foo:1234", f.config.ClientConfig.Endpoint)
require.NoError(t, f.Close())
}

Expand Down
16 changes: 8 additions & 8 deletions plugin/storage/grpc/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,22 @@ func tlsFlagsConfig() tlscfg.ClientFlagsConfig {
}
}

// AddFlags adds flags for Options
func v1AddFlags(flagSet *flag.FlagSet) {
// addFlags adds flags for Options
func addFlags(flagSet *flag.FlagSet) {
tlsFlagsConfig().AddFlags(flagSet)

flagSet.String(remoteServer, "", "The remote storage gRPC server address as host:port")
flagSet.Duration(remoteConnectionTimeout, defaultConnectionTimeout, "The remote storage gRPC server connection timeout")
}

func v1InitFromViper(cfg *Configuration, v *viper.Viper) error {
cfg.RemoteServerAddr = v.GetString(remoteServer)
var err error
cfg.RemoteTLS, err = tlsFlagsConfig().InitFromViper(v)
func initFromViper(cfg *Config, v *viper.Viper) error {
cfg.ClientConfig.Endpoint = v.GetString(remoteServer)
remoteTLS, err := tlsFlagsConfig().InitFromViper(v)
if err != nil {
return fmt.Errorf("failed to parse gRPC storage TLS options: %w", err)
}
cfg.RemoteConnectTimeout = v.GetDuration(remoteConnectionTimeout)
cfg.TenancyOpts = tenancy.InitFromViper(v)
cfg.ClientConfig.TLSSetting = remoteTLS.ToOtelClientConfig()
cfg.TimeoutSettings.Timeout = v.GetDuration(remoteConnectionTimeout)
cfg.Tenancy = tenancy.InitFromViper(v)
return nil
}
39 changes: 19 additions & 20 deletions plugin/storage/grpc/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,61 +18,60 @@ import (
)

func TestOptionsWithFlags(t *testing.T) {
v, command := config.Viperize(v1AddFlags, tenancy.AddFlags)
v, command := config.Viperize(addFlags, tenancy.AddFlags)
err := command.ParseFlags([]string{
"--grpc-storage.server=foo:12345",
"--multi-tenancy.header=x-scope-orgid",
})
require.NoError(t, err)
var cfg Configuration
require.NoError(t, v1InitFromViper(&cfg, v))
var cfg Config
require.NoError(t, initFromViper(&cfg, v))

assert.Equal(t, "foo:12345", cfg.RemoteServerAddr)
assert.False(t, cfg.TenancyOpts.Enabled)
assert.Equal(t, "x-scope-orgid", cfg.TenancyOpts.Header)
assert.Equal(t, "foo:12345", cfg.ClientConfig.Endpoint)
assert.False(t, cfg.Tenancy.Enabled)
assert.Equal(t, "x-scope-orgid", cfg.Tenancy.Header)
}

func TestRemoteOptionsWithFlags(t *testing.T) {
v, command := config.Viperize(v1AddFlags)
v, command := config.Viperize(addFlags)
err := command.ParseFlags([]string{
"--grpc-storage.server=localhost:2001",
"--grpc-storage.tls.enabled=true",
"--grpc-storage.connection-timeout=60s",
})
require.NoError(t, err)
var cfg Configuration
require.NoError(t, v1InitFromViper(&cfg, v))
var cfg Config
require.NoError(t, initFromViper(&cfg, v))

assert.Equal(t, "localhost:2001", cfg.RemoteServerAddr)
assert.True(t, cfg.RemoteTLS.Enabled)
assert.Equal(t, 60*time.Second, cfg.RemoteConnectTimeout)
assert.Equal(t, "localhost:2001", cfg.ClientConfig.Endpoint)
assert.False(t, cfg.ClientConfig.TLSSetting.Insecure)
assert.Equal(t, 60*time.Second, cfg.TimeoutSettings.Timeout)
}

func TestRemoteOptionsNoTLSWithFlags(t *testing.T) {
v, command := config.Viperize(v1AddFlags)
v, command := config.Viperize(addFlags)
err := command.ParseFlags([]string{
"--grpc-storage.server=localhost:2001",
"--grpc-storage.tls.enabled=false",
"--grpc-storage.connection-timeout=60s",
})
require.NoError(t, err)
var cfg Configuration
require.NoError(t, v1InitFromViper(&cfg, v))
var cfg Config
require.NoError(t, initFromViper(&cfg, v))

assert.Equal(t, "localhost:2001", cfg.RemoteServerAddr)
assert.False(t, cfg.RemoteTLS.Enabled)
assert.Equal(t, 60*time.Second, cfg.RemoteConnectTimeout)
assert.Equal(t, "localhost:2001", cfg.ClientConfig.Endpoint)
assert.True(t, cfg.ClientConfig.TLSSetting.Insecure)
assert.Equal(t, 60*time.Second, cfg.TimeoutSettings.Timeout)
}

func TestFailedTLSFlags(t *testing.T) {
v, command := config.Viperize(v1AddFlags)
v, command := config.Viperize(addFlags)
err := command.ParseFlags([]string{
"--grpc-storage.tls.enabled=false",
"--grpc-storage.tls.cert=blah", // invalid unless tls.enabled=true
})
require.NoError(t, err)
f := NewFactory()
f.configV2 = nil
core, logs := observer.New(zap.NewAtomicLevelAt(zapcore.ErrorLevel))
logger := zap.New(core, zap.WithFatalHook(zapcore.WriteThenPanic))
require.Panics(t, func() { f.InitFromViper(v, logger) })
Expand Down

0 comments on commit fa31b89

Please sign in to comment.