Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bump upstream base version to 0.111.0 #418

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 116 additions & 10 deletions components/components.go

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions exporter/clickhouselogsexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import (

// Config defines configuration for ClickHouse exporter.
type Config struct {
exporterhelper.TimeoutSettings `mapstructure:",squash"`
configretry.BackOffConfig `mapstructure:"retry_on_failure"`
exporterhelper.QueueSettings `mapstructure:"sending_queue"`
exporterhelper.TimeoutConfig `mapstructure:",squash"`
configretry.BackOffConfig `mapstructure:"retry_on_failure"`
exporterhelper.QueueConfig `mapstructure:"sending_queue"`

// DSN is the ClickHouse server Data Source Name.
// For tcp protocol reference: [ClickHouse/clickhouse-go#dsn](https://github.com/ClickHouse/clickhouse-go#dsn).
Expand Down
4 changes: 2 additions & 2 deletions exporter/clickhouselogsexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestLoadConfig(t *testing.T) {
r1 := cfg.Exporters[component.NewIDWithName(component.MustNewType(typeStr), "full")].(*Config)
assert.Equal(t, r1, &Config{
DSN: "tcp://127.0.0.1:9000/?dial_timeout=5s",
TimeoutSettings: exporterhelper.TimeoutSettings{
TimeoutConfig: exporterhelper.TimeoutConfig{
Timeout: 5 * time.Second,
},
BackOffConfig: configretry.BackOffConfig{
Expand All @@ -58,7 +58,7 @@ func TestLoadConfig(t *testing.T) {
RandomizationFactor: 0.7,
Multiplier: 1.3,
},
QueueSettings: exporterhelper.QueueSettings{
QueueConfig: exporterhelper.QueueConfig{
Enabled: true,
NumConsumers: 10,
QueueSize: 100,
Expand Down
8 changes: 4 additions & 4 deletions exporter/clickhouselogsexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ import (
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pipeline"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -443,7 +443,7 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
sendDuration := <-chDuration
stats.RecordWithTags(ctx,
[]tag.Mutator{
tag.Upsert(exporterKey, component.DataTypeLogs.String()),
tag.Upsert(exporterKey, pipeline.SignalLogs.String()),
tag.Upsert(tableKey, sendDuration.Name),
},
writeLatencyMillis.M(int64(sendDuration.duration.Milliseconds())),
Expand All @@ -470,7 +470,7 @@ func (e *clickhouseLogsExporter) pushToClickhouse(ctx context.Context, ld plog.L
err = tagStatement.Send()
stats.RecordWithTags(ctx,
[]tag.Mutator{
tag.Upsert(exporterKey, component.DataTypeLogs.String()),
tag.Upsert(exporterKey, pipeline.SignalLogs.String()),
tag.Upsert(tableKey, DISTRIBUTED_TAG_ATTRIBUTES),
},
writeLatencyMillis.M(int64(time.Since(tagWriteStart).Milliseconds())),
Expand Down Expand Up @@ -787,7 +787,7 @@ func newClickhouseClient(logger *zap.Logger, cfg *Config) (clickhouse.Conn, erro
}

// setting maxIdleConnections = numConsumers + 1 to avoid `prepareBatch:clickhouse: acquire conn timeout` error
maxIdleConnections := cfg.QueueSettings.NumConsumers + 1
maxIdleConnections := cfg.QueueConfig.NumConsumers + 1
if options.MaxIdleConns < maxIdleConnections {
options.MaxIdleConns = maxIdleConnections
options.MaxOpenConns = maxIdleConnections + 5
Expand Down
12 changes: 6 additions & 6 deletions exporter/clickhouselogsexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,17 @@ func NewFactory() exporter.Factory {

func createDefaultConfig() component.Config {
return &Config{
TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(),
QueueSettings: exporterhelper.NewDefaultQueueSettings(),
BackOffConfig: configretry.NewDefaultBackOffConfig(),
TimeoutConfig: exporterhelper.NewDefaultTimeoutConfig(),
QueueConfig: exporterhelper.NewDefaultQueueConfig(),
BackOffConfig: configretry.NewDefaultBackOffConfig(),
}
}

// createLogsExporter creates a new exporter for logs.
// Logs are directly insert into clickhouse.
func createLogsExporter(
ctx context.Context,
set exporter.CreateSettings,
set exporter.Settings,
cfg component.Config,
) (exporter.Logs, error) {
c := cfg.(*Config)
Expand All @@ -91,8 +91,8 @@ func createLogsExporter(
cfg,
exporter.pushLogsData,
exporterhelper.WithShutdown(exporter.Shutdown),
exporterhelper.WithTimeout(c.TimeoutSettings),
exporterhelper.WithQueue(c.QueueSettings),
exporterhelper.WithTimeout(c.TimeoutConfig),
exporterhelper.WithQueue(c.QueueConfig),
exporterhelper.WithRetry(c.BackOffConfig),
)
}
2 changes: 1 addition & 1 deletion exporter/clickhouselogsexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestCreateDefaultConfig(t *testing.T) {
func TestFactory_CreateLogsExporter_Fail(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
params := exportertest.NewNopCreateSettings()
params := exportertest.NewNopSettings()
_, err := factory.CreateLogsExporter(context.Background(), params, cfg)
require.Error(t, err, "expected an error when creating a logs exporter")
}
12 changes: 6 additions & 6 deletions exporter/clickhousemetricsexporter/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
"github.com/sirupsen/logrus"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pipeline"
semconv "go.opentelemetry.io/collector/semconv/v1.13.0"

"github.com/SigNoz/signoz-otel-collector/exporter/clickhousemetricsexporter/base"
Expand Down Expand Up @@ -300,7 +300,7 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
start := time.Now()
err = statement.Send()
ctx, _ = tag.New(ctx,
tag.Upsert(exporterKey, string(component.DataTypeMetrics.String())),
tag.Upsert(exporterKey, pipeline.SignalMetrics.String()),
tag.Upsert(tableKey, DISTRIBUTED_TIME_SERIES_TABLE),
)
stats.Record(ctx, writeLatencyMillis.M(int64(time.Since(start).Milliseconds())))
Expand Down Expand Up @@ -340,7 +340,7 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
start := time.Now()
err = statement.Send()
ctx, _ = tag.New(ctx,
tag.Upsert(exporterKey, string(component.DataTypeMetrics.String())),
tag.Upsert(exporterKey, pipeline.SignalMetrics.String()),
tag.Upsert(tableKey, DISTRIBUTED_SAMPLES_TABLE),
)
stats.Record(ctx, writeLatencyMillis.M(int64(time.Since(start).Milliseconds())))
Expand Down Expand Up @@ -397,7 +397,7 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
start := time.Now()
err = statement.Send()
ctx, _ = tag.New(ctx,
tag.Upsert(exporterKey, string(component.DataTypeMetrics.String())),
tag.Upsert(exporterKey, pipeline.SignalMetrics.String()),
tag.Upsert(tableKey, DISTRIBUTED_SAMPLES_TABLE_V4),
)
stats.Record(ctx, writeLatencyMillis.M(int64(time.Since(start).Milliseconds())))
Expand Down Expand Up @@ -450,7 +450,7 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
start := time.Now()
err = statement.Send()
ctx, _ = tag.New(ctx,
tag.Upsert(exporterKey, string(component.DataTypeMetrics.String())),
tag.Upsert(exporterKey, pipeline.SignalMetrics.String()),
tag.Upsert(tableKey, DISTRIBUTED_TIME_SERIES_TABLE_V4),
)
stats.Record(ctx, writeLatencyMillis.M(int64(time.Since(start).Milliseconds())))
Expand Down Expand Up @@ -539,7 +539,7 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
start := time.Now()
err = statement.Send()
ctx, _ = tag.New(ctx,
tag.Upsert(exporterKey, string(component.DataTypeMetrics.String())),
tag.Upsert(exporterKey, pipeline.SignalMetrics.String()),
tag.Upsert(tableKey, DISTRIBUTED_EXP_HIST_TABLE),
)
stats.Record(ctx, writeLatencyMillis.M(int64(time.Since(start).Milliseconds())))
Expand Down
4 changes: 2 additions & 2 deletions exporter/clickhousemetricsexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (

// Config defines configuration for Remote Write exporter.
type Config struct {
exporterhelper.TimeoutSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
configretry.BackOffConfig `mapstructure:"retry_on_failure"`
exporterhelper.TimeoutConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
configretry.BackOffConfig `mapstructure:"retry_on_failure"`
// QueueConfig allows users to fine tune the queues
// that handle outgoing requests.
RemoteWriteQueue RemoteWriteQueue `mapstructure:"remote_write_queue"`
Expand Down
2 changes: 1 addition & 1 deletion exporter/clickhousemetricsexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func Test_loadConfig(t *testing.T) {
e1 := cfg.Exporters[component.NewIDWithName(component.MustNewType(typeStr), "2")]
assert.Equal(t, e1,
&Config{
TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(),
TimeoutConfig: exporterhelper.NewDefaultTimeoutConfig(),
BackOffConfig: configretry.BackOffConfig{
Enabled: true,
InitialInterval: 10 * time.Second,
Expand Down
2 changes: 1 addition & 1 deletion exporter/clickhousemetricsexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type PrwExporter struct {

// NewPrwExporter initializes a new PrwExporter instance and sets fields accordingly.
// client parameter cannot be nil.
func NewPrwExporter(cfg *Config, set exporter.CreateSettings) (*PrwExporter, error) {
func NewPrwExporter(cfg *Config, set exporter.Settings) (*PrwExporter, error) {

sanitizedLabels, err := validateAndSanitizeExternalLabels(cfg.ExternalLabels)
if err != nil {
Expand Down
22 changes: 11 additions & 11 deletions exporter/clickhousemetricsexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import (
// FIXME(srikanthccv): Enable the tests once this issue is fixed: https://github.com/SigNoz/signoz-otel-collector/issues/65
func skip_Test_NewPRWExporter(t *testing.T) {
cfg := &Config{
TimeoutSettings: exporterhelper.TimeoutSettings{},
TimeoutConfig: exporterhelper.TimeoutConfig{},
BackOffConfig: configretry.BackOffConfig{},
Namespace: "",
ExternalLabels: map[string]string{},
Expand All @@ -56,7 +56,7 @@ func skip_Test_NewPRWExporter(t *testing.T) {
Description: "OpenTelemetry Collector",
Version: "1.0",
}
set := exportertest.NewNopCreateSettings()
set := exportertest.NewNopSettings()
set.BuildInfo = buildInfo

tests := []struct {
Expand All @@ -67,7 +67,7 @@ func skip_Test_NewPRWExporter(t *testing.T) {
concurrency int
externalLabels map[string]string
returnErrorOnCreate bool
set exporter.CreateSettings
set exporter.Settings
}{
{
name: "invalid_URL",
Expand Down Expand Up @@ -138,16 +138,16 @@ func skip_Test_NewPRWExporter(t *testing.T) {
// FIXME(srikanthccv): Enable the tests once this issue is fixed: https://github.com/SigNoz/signoz-otel-collector/issues/65
func skip_Test_Start(t *testing.T) {
cfg := &Config{
TimeoutSettings: exporterhelper.TimeoutSettings{},
BackOffConfig: configretry.BackOffConfig{},
Namespace: "",
ExternalLabels: map[string]string{},
TimeoutConfig: exporterhelper.TimeoutConfig{},
BackOffConfig: configretry.BackOffConfig{},
Namespace: "",
ExternalLabels: map[string]string{},
}
buildInfo := component.BuildInfo{
Description: "OpenTelemetry Collector",
Version: "1.0",
}
set := exportertest.NewNopCreateSettings()
set := exportertest.NewNopSettings()
set.BuildInfo = buildInfo
tests := []struct {
name string
Expand All @@ -156,7 +156,7 @@ func skip_Test_Start(t *testing.T) {
concurrency int
externalLabels map[string]string
returnErrorOnStartUp bool
set exporter.CreateSettings
set exporter.Settings
endpoint string
clientSettings confighttp.ClientConfig
}{
Expand Down Expand Up @@ -340,7 +340,7 @@ func runExportPipeline(ts *prompb.TimeSeries, endpoint *url.URL) []error {
Description: "OpenTelemetry Collector",
Version: "1.0",
}
set := exportertest.NewNopCreateSettings()
set := exportertest.NewNopSettings()
set.BuildInfo = buildInfo
// after this, instantiate a CortexExporter with the current HTTP client and endpoint set to passed in endpoint
prwe, err := NewPrwExporter(cfg, set)
Expand Down Expand Up @@ -634,7 +634,7 @@ func temp_dis_Test_PushMetrics(t *testing.T) {
Description: "OpenTelemetry Collector",
Version: "1.0",
}
set := exportertest.NewNopCreateSettings()
set := exportertest.NewNopSettings()
set.BuildInfo = buildInfo
prwe, nErr := NewPrwExporter(cfg, set)
require.NoError(t, nErr)
Expand Down
16 changes: 8 additions & 8 deletions exporter/clickhousemetricsexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func NewFactory() exporter.Factory {
exporter.WithMetrics(createMetricsExporter, component.StabilityLevelUndefined))
}

func createMetricsExporter(ctx context.Context, set exporter.CreateSettings,
func createMetricsExporter(ctx context.Context, set exporter.Settings,
cfg component.Config) (exporter.Metrics, error) {

prwCfg, ok := cfg.(*Config)
Expand All @@ -86,8 +86,8 @@ func createMetricsExporter(ctx context.Context, set exporter.CreateSettings,
set,
cfg,
prwe.PushMetrics,
exporterhelper.WithTimeout(prwCfg.TimeoutSettings),
exporterhelper.WithQueue(exporterhelper.QueueSettings{
exporterhelper.WithTimeout(prwCfg.TimeoutConfig),
exporterhelper.WithQueue(exporterhelper.QueueConfig{
Enabled: prwCfg.RemoteWriteQueue.Enabled,
NumConsumers: 1,
QueueSize: prwCfg.RemoteWriteQueue.QueueSize,
Expand All @@ -106,16 +106,16 @@ func createMetricsExporter(ctx context.Context, set exporter.CreateSettings,

func createDefaultConfig() component.Config {
return &Config{
Namespace: "",
ExternalLabels: map[string]string{},
TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(),
BackOffConfig: configretry.NewDefaultBackOffConfig(),
Namespace: "",
ExternalLabels: map[string]string{},
TimeoutConfig: exporterhelper.NewDefaultTimeoutConfig(),
BackOffConfig: configretry.NewDefaultBackOffConfig(),
HTTPClientSettings: confighttp.ClientConfig{
Endpoint: "http://some.url:9411/api/prom/push",
// We almost read 0 bytes, so no need to tune ReadBufferSize.
ReadBufferSize: 0,
WriteBufferSize: 512 * 1024,
Timeout: exporterhelper.NewDefaultTimeoutSettings().Timeout,
Timeout: exporterhelper.NewDefaultTimeoutConfig().Timeout,
Headers: map[string]configopaque.String{},
},
// TODO(jbd): Adjust the default queue size.
Expand Down
10 changes: 5 additions & 5 deletions exporter/clickhousemetricsexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,31 +53,31 @@ func skip_Test_createMetricsExporter(t *testing.T) {
tests := []struct {
name string
cfg component.Config
set exporter.CreateSettings
set exporter.Settings
returnErrorOnCreate bool
returnErrorOnStart bool
}{
{"success_case",
createDefaultConfig(),
exportertest.NewNopCreateSettings(),
exportertest.NewNopSettings(),
false,
false,
},
{"fail_case",
nil,
exportertest.NewNopCreateSettings(),
exportertest.NewNopSettings(),
true,
false,
},
{"invalid_config_case",
invalidConfig,
exportertest.NewNopCreateSettings(),
exportertest.NewNopSettings(),
true,
false,
},
{"invalid_tls_config_case",
invalidTLSConfig,
exportertest.NewNopCreateSettings(),
exportertest.NewNopSettings(),
false,
true,
},
Expand Down
2 changes: 1 addition & 1 deletion exporter/clickhousemetricsexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ exporters:
write_buffer_size: 524288
headers:
Prometheus-Remote-Write-Version: "0.1.0"
X-Scope-OrgID: 234
X-Scope-OrgID: '234'
external_labels:
key1: value1
key2: value2
Expand Down
2 changes: 1 addition & 1 deletion exporter/clickhousetracesexporter/clickhouse_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func newExporter(cfg component.Config, logger *zap.Logger) (*storage, error) {

id := uuid.New()

f := ClickHouseNewFactory(id, configClickHouse.Migrations, configClickHouse.Datasource, configClickHouse.DockerMultiNodeCluster, configClickHouse.QueueSettings.NumConsumers)
f := ClickHouseNewFactory(id, configClickHouse.Migrations, configClickHouse.Datasource, configClickHouse.DockerMultiNodeCluster, configClickHouse.QueueConfig.NumConsumers)

err := f.Initialize(logger)
if err != nil {
Expand Down
14 changes: 7 additions & 7 deletions exporter/clickhousetracesexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,25 @@ type Config struct {
// Docker Multi Node Cluster is a flag to enable the docker multi node cluster. Default is false.
DockerMultiNodeCluster bool `mapstructure:"docker_multi_node_cluster"`
// LowCardinalExceptionGrouping is a flag to enable exception grouping by serviceName + exceptionType. Default is false.
LowCardinalExceptionGrouping bool `mapstructure:"low_cardinal_exception_grouping"`
exporterhelper.TimeoutSettings `mapstructure:",squash"`
configretry.BackOffConfig `mapstructure:"retry_on_failure"`
exporterhelper.QueueSettings `mapstructure:"sending_queue"`
LowCardinalExceptionGrouping bool `mapstructure:"low_cardinal_exception_grouping"`
exporterhelper.TimeoutConfig `mapstructure:",squash"`
configretry.BackOffConfig `mapstructure:"retry_on_failure"`
exporterhelper.QueueConfig `mapstructure:"sending_queue"`
}

var _ component.Config = (*Config)(nil)

// Validate checks if the exporter configuration is valid
func (cfg *Config) Validate() error {
if cfg.QueueSettings.QueueSize < 0 {
if cfg.QueueConfig.QueueSize < 0 {
return fmt.Errorf("remote write queue size can't be negative")
}

if cfg.QueueSettings.Enabled && cfg.QueueSettings.QueueSize == 0 {
if cfg.QueueConfig.Enabled && cfg.QueueConfig.QueueSize == 0 {
return fmt.Errorf("a 0 size queue will drop all the data")
}

if cfg.QueueSettings.NumConsumers < 0 {
if cfg.QueueConfig.NumConsumers < 0 {
return fmt.Errorf("remote write consumer number can't be negative")
}
return nil
Expand Down
Loading