From 41486419ef000be542bd1f623a7cd9e29900fff1 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Tue, 4 Jun 2024 02:58:07 +0800 Subject: [PATCH] [chore][exporter/elasticsearch] refactor, consolidate exporters (#33318) **Description:** Replace the `elasticsearchTracesExporter` and `elasticsearchLogsExporter` with a single `elasticsearchExporter` that handles both. Tests have been cleaned up to reduce repetition between traces and logs exporters, e.g. config validation tests have been moved to more specific `TestConfig*` tests. To enable this, some further validation has been added to `Config.Validate`, which was previously done only when reaching the go-elasticsearch code when constructing exporters. Various tests have been updated to use the public APIs rather than internal functions, so we're testing behaviour rather than the implementation; this will enable further refactoring without breaking tests. **Link to tracking Issue:** None **Testing:** Ran the unit tests. This is a non-functional change. **Documentation:** N/A, non-functional change. --------- Co-authored-by: Carson Ip --- exporter/elasticsearchexporter/attribute.go | 2 - exporter/elasticsearchexporter/config.go | 37 +- exporter/elasticsearchexporter/config_test.go | 145 ++-- exporter/elasticsearchexporter/doc.go | 6 + .../elasticsearch_bulk.go | 2 - .../{trace_exporter.go => exporter.go} | 70 +- .../elasticsearchexporter/exporter_test.go | 655 ++++++++++++++++++ exporter/elasticsearchexporter/factory.go | 22 +- .../elasticsearchexporter/factory_test.go | 53 +- .../elasticsearchexporter/logs_exporter.go | 122 ---- .../logs_exporter_test.go | 533 -------------- .../config-use-deprecated-index_option.yaml | 38 - .../testdata/config.yaml | 7 +- .../traces_exporter_test.go | 479 ------------- exporter/elasticsearchexporter/utils_test.go | 34 +- 15 files changed, 896 insertions(+), 1309 deletions(-) create mode 100644 exporter/elasticsearchexporter/doc.go rename exporter/elasticsearchexporter/{trace_exporter.go => exporter.go} (55%) create mode 100644 exporter/elasticsearchexporter/exporter_test.go delete mode 100644 exporter/elasticsearchexporter/logs_exporter.go delete mode 100644 exporter/elasticsearchexporter/logs_exporter_test.go delete mode 100644 exporter/elasticsearchexporter/testdata/config-use-deprecated-index_option.yaml delete mode 100644 exporter/elasticsearchexporter/traces_exporter_test.go diff --git a/exporter/elasticsearchexporter/attribute.go b/exporter/elasticsearchexporter/attribute.go index c2c187899b77..311b4ace84d1 100644 --- a/exporter/elasticsearchexporter/attribute.go +++ b/exporter/elasticsearchexporter/attribute.go @@ -1,8 +1,6 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -// Package elasticsearchexporter contains an opentelemetry-collector exporter -// for Elasticsearch. package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter" import "go.opentelemetry.io/collector/pdata/pcommon" diff --git a/exporter/elasticsearchexporter/config.go b/exporter/elasticsearchexporter/config.go index fd7d93145943..8c3123294ceb 100644 --- a/exporter/elasticsearchexporter/config.go +++ b/exporter/elasticsearchexporter/config.go @@ -4,8 +4,10 @@ package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter" import ( + "encoding/base64" "errors" "fmt" + "net/url" "os" "strings" "time" @@ -182,8 +184,9 @@ const ( ) var ( - errConfigNoEndpoint = errors.New("endpoints or cloudid must be specified") - errConfigEmptyEndpoint = errors.New("endpoints must not include empty entries") + errConfigNoEndpoint = errors.New("endpoints or cloudid must be specified") + errConfigEmptyEndpoint = errors.New("endpoints must not include empty entries") + errConfigCloudIDMutuallyExclusive = errors.New("only one of endpoints or cloudid may be specified") ) func (m MappingMode) String() string { @@ -226,6 +229,15 @@ func (cfg *Config) Validate() error { } } + if cfg.CloudID != "" { + if len(cfg.Endpoints) > 0 { + return errConfigCloudIDMutuallyExclusive + } + if _, err := parseCloudID(cfg.CloudID); err != nil { + return err + } + } + for _, endpoint := range cfg.Endpoints { if endpoint == "" { return errConfigEmptyEndpoint @@ -233,12 +245,31 @@ func (cfg *Config) Validate() error { } if _, ok := mappingModes[cfg.Mapping.Mode]; !ok { - return fmt.Errorf("unknown mapping mode %v", cfg.Mapping.Mode) + return fmt.Errorf("unknown mapping mode %q", cfg.Mapping.Mode) } return nil } +// Based on "addrFromCloudID" in go-elasticsearch. +func parseCloudID(input string) (*url.URL, error) { + _, after, ok := strings.Cut(input, ":") + if !ok { + return nil, fmt.Errorf("invalid CloudID %q", input) + } + + decoded, err := base64.StdEncoding.DecodeString(after) + if err != nil { + return nil, err + } + + before, after, ok := strings.Cut(string(decoded), "$") + if !ok { + return nil, fmt.Errorf("invalid decoded CloudID %q", string(decoded)) + } + return url.Parse(fmt.Sprintf("https://%s.%s", after, before)) +} + // MappingMode returns the mapping.mode defined in the given cfg // object. This method must be called after cfg.Validate() has been // called without returning an error. diff --git a/exporter/elasticsearchexporter/config_test.go b/exporter/elasticsearchexporter/config_test.go index 59c6a290692f..8ff84a45c027 100644 --- a/exporter/elasticsearchexporter/config_test.go +++ b/exporter/elasticsearchexporter/config_test.go @@ -18,72 +18,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/metadata" ) -func TestLoad_DeprecatedIndexConfigOption(t *testing.T) { - cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config-use-deprecated-index_option.yaml")) - require.NoError(t, err) - factory := NewFactory() - cfg := factory.CreateDefaultConfig() - - sub, err := cm.Sub(component.NewIDWithName(metadata.Type, "log").String()) - require.NoError(t, err) - require.NoError(t, component.UnmarshalConfig(sub, cfg)) - - assert.Equal(t, cfg, &Config{ - QueueSettings: exporterhelper.QueueSettings{ - Enabled: false, - NumConsumers: exporterhelper.NewDefaultQueueSettings().NumConsumers, - QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize, - }, - Endpoints: []string{"http://localhost:9200"}, - CloudID: "TRNMxjXlNJEt", - Index: "my_log_index", - LogsIndex: "logs-generic-default", - TracesIndex: "traces-generic-default", - Pipeline: "mypipeline", - ClientConfig: ClientConfig{ - Authentication: AuthenticationSettings{ - User: "elastic", - Password: "search", - APIKey: "AvFsEiPs==", - }, - Timeout: 2 * time.Minute, - Headers: map[string]string{ - "myheader": "test", - }, - }, - Discovery: DiscoverySettings{ - OnStart: true, - }, - Flush: FlushSettings{ - Bytes: 10485760, - }, - Retry: RetrySettings{ - Enabled: true, - MaxRequests: 5, - InitialInterval: 100 * time.Millisecond, - MaxInterval: 1 * time.Minute, - RetryOnStatus: []int{ - http.StatusTooManyRequests, - http.StatusInternalServerError, - http.StatusBadGateway, - http.StatusServiceUnavailable, - http.StatusGatewayTimeout, - }, - }, - Mapping: MappingsSettings{ - Mode: "none", - Dedup: true, - Dedot: true, - }, - LogstashFormat: LogstashFormatSettings{ - Enabled: false, - PrefixSeparator: "-", - DateFormat: "%Y.%m.%d", - }, - }) -} - -func TestLoadConfig(t *testing.T) { +func TestConfig(t *testing.T) { t.Parallel() defaultCfg := createDefaultConfig() @@ -117,7 +52,6 @@ func TestLoadConfig(t *testing.T) { QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize, }, Endpoints: []string{"https://elastic.example.com:9200"}, - CloudID: "TRNMxjXlNJEt", Index: "", LogsIndex: "logs-generic-default", TracesIndex: "trace_index", @@ -168,7 +102,6 @@ func TestLoadConfig(t *testing.T) { QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize, }, Endpoints: []string{"http://localhost:9200"}, - CloudID: "TRNMxjXlNJEt", Index: "", LogsIndex: "my_log_index", TracesIndex: "traces-generic-default", @@ -219,6 +152,21 @@ func TestLoadConfig(t *testing.T) { configFile: "config.yaml", expected: defaultRawCfg, }, + { + id: component.NewIDWithName(metadata.Type, "cloudid"), + configFile: "config.yaml", + expected: withDefaultConfig(func(cfg *Config) { + cfg.CloudID = "foo:YmFyLmNsb3VkLmVzLmlvJGFiYzEyMyRkZWY0NTY=" + }), + }, + { + id: component.NewIDWithName(metadata.Type, "deprecated_index"), + configFile: "config.yaml", + expected: withDefaultConfig(func(cfg *Config) { + cfg.Endpoints = []string{"https://elastic.example.com:9200"} + cfg.Index = "my_log_index" + }), + }, } for _, tt := range tests { @@ -239,6 +187,67 @@ func TestLoadConfig(t *testing.T) { } } +// TestConfig_Validate tests the error cases of Config.Validate. +// +// Successful validation should be covered by TestConfig above. +func TestConfig_Validate(t *testing.T) { + tests := map[string]struct { + config *Config + err string + }{ + "no endpoints": { + config: withDefaultConfig(), + err: "endpoints or cloudid must be specified", + }, + "empty endpoint": { + config: withDefaultConfig(func(cfg *Config) { + cfg.Endpoints = []string{""} + }), + err: "endpoints must not include empty entries", + }, + "invalid cloudid": { + config: withDefaultConfig(func(cfg *Config) { + cfg.CloudID = "invalid" + }), + err: `invalid CloudID "invalid"`, + }, + "invalid decoded cloudid": { + config: withDefaultConfig(func(cfg *Config) { + cfg.CloudID = "foo:YWJj" + }), + err: `invalid decoded CloudID "abc"`, + }, + "endpoint and cloudid both set": { + config: withDefaultConfig(func(cfg *Config) { + cfg.Endpoints = []string{"test:9200"} + cfg.CloudID = "foo:YmFyLmNsb3VkLmVzLmlvJGFiYzEyMyRkZWY0NTY=" + }), + err: "only one of endpoints or cloudid may be specified", + }, + "invalid mapping mode": { + config: withDefaultConfig(func(cfg *Config) { + cfg.Endpoints = []string{"test:9200"} + cfg.Mapping.Mode = "invalid" + }), + err: `unknown mapping mode "invalid"`, + }, + } + + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + err := tt.config.Validate() + assert.EqualError(t, err, tt.err) + }) + } +} + +func TestConfig_Validate_Environment(t *testing.T) { + t.Setenv("ELASTICSEARCH_URL", "test:9200") + config := withDefaultConfig() + err := config.Validate() + require.NoError(t, err) +} + func withDefaultConfig(fns ...func(*Config)) *Config { cfg := createDefaultConfig().(*Config) for _, fn := range fns { diff --git a/exporter/elasticsearchexporter/doc.go b/exporter/elasticsearchexporter/doc.go new file mode 100644 index 000000000000..0f6d2f1342aa --- /dev/null +++ b/exporter/elasticsearchexporter/doc.go @@ -0,0 +1,6 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package elasticsearchexporter contains an opentelemetry-collector exporter +// for Elasticsearch. +package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter" diff --git a/exporter/elasticsearchexporter/elasticsearch_bulk.go b/exporter/elasticsearchexporter/elasticsearch_bulk.go index e52a4cd5d232..ea7f496df08d 100644 --- a/exporter/elasticsearchexporter/elasticsearch_bulk.go +++ b/exporter/elasticsearchexporter/elasticsearch_bulk.go @@ -1,8 +1,6 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -// Package elasticsearchexporter contains an opentelemetry-collector exporter -// for Elasticsearch. package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter" import ( diff --git a/exporter/elasticsearchexporter/trace_exporter.go b/exporter/elasticsearchexporter/exporter.go similarity index 55% rename from exporter/elasticsearchexporter/trace_exporter.go rename to exporter/elasticsearchexporter/exporter.go index 073bed4d8b6a..d3c9f124f596 100644 --- a/exporter/elasticsearchexporter/trace_exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -1,8 +1,6 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -// Package elasticsearchexporter contains an opentelemetry-collector exporter -// for Elasticsearch. package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter" import ( @@ -12,11 +10,12 @@ import ( "time" "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/zap" ) -type elasticsearchTracesExporter struct { +type elasticsearchExporter struct { logger *zap.Logger index string @@ -28,7 +27,7 @@ type elasticsearchTracesExporter struct { model mappingModel } -func newTracesExporter(logger *zap.Logger, cfg *Config) (*elasticsearchTracesExporter, error) { +func newExporter(logger *zap.Logger, cfg *Config, index string, dynamicIndex bool) (*elasticsearchExporter, error) { if err := cfg.Validate(); err != nil { return nil, err } @@ -49,23 +48,74 @@ func newTracesExporter(logger *zap.Logger, cfg *Config) (*elasticsearchTracesExp mode: cfg.MappingMode(), } - return &elasticsearchTracesExporter{ + return &elasticsearchExporter{ logger: logger, client: client, bulkIndexer: bulkIndexer, - index: cfg.TracesIndex, - dynamicIndex: cfg.TracesDynamicIndex.Enabled, + index: index, + dynamicIndex: dynamicIndex, model: model, logstashFormat: cfg.LogstashFormat, }, nil } -func (e *elasticsearchTracesExporter) Shutdown(ctx context.Context) error { +func (e *elasticsearchExporter) Shutdown(ctx context.Context) error { return e.bulkIndexer.Close(ctx) } -func (e *elasticsearchTracesExporter) pushTraceData( +func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs) error { + var errs []error + + rls := ld.ResourceLogs() + for i := 0; i < rls.Len(); i++ { + rl := rls.At(i) + resource := rl.Resource() + ills := rl.ScopeLogs() + for j := 0; j < ills.Len(); j++ { + ill := ills.At(j) + scope := ill.Scope() + logs := ill.LogRecords() + for k := 0; k < logs.Len(); k++ { + if err := e.pushLogRecord(ctx, resource, logs.At(k), scope); err != nil { + if cerr := ctx.Err(); cerr != nil { + return cerr + } + + errs = append(errs, err) + } + } + } + } + + return errors.Join(errs...) +} + +func (e *elasticsearchExporter) pushLogRecord(ctx context.Context, resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope) error { + fIndex := e.index + if e.dynamicIndex { + prefix := getFromAttributes(indexPrefix, resource, scope, record) + suffix := getFromAttributes(indexSuffix, resource, scope, record) + + fIndex = fmt.Sprintf("%s%s%s", prefix, fIndex, suffix) + } + + if e.logstashFormat.Enabled { + formattedIndex, err := generateIndexWithLogstashFormat(fIndex, &e.logstashFormat, time.Now()) + if err != nil { + return err + } + fIndex = formattedIndex + } + + document, err := e.model.encodeLog(resource, record, scope) + if err != nil { + return fmt.Errorf("Failed to encode log event: %w", err) + } + return pushDocuments(ctx, fIndex, document, e.bulkIndexer) +} + +func (e *elasticsearchExporter) pushTraceData( ctx context.Context, td ptrace.Traces, ) error { @@ -94,7 +144,7 @@ func (e *elasticsearchTracesExporter) pushTraceData( return errors.Join(errs...) } -func (e *elasticsearchTracesExporter) pushTraceRecord(ctx context.Context, resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) error { +func (e *elasticsearchExporter) pushTraceRecord(ctx context.Context, resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) error { fIndex := e.index if e.dynamicIndex { prefix := getFromAttributes(indexPrefix, resource, scope, span) diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go new file mode 100644 index 000000000000..77825911a7a2 --- /dev/null +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -0,0 +1,655 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package elasticsearchexporter + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "runtime" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +func TestExporterLogs(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("skipping test on Windows, see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/10178") + } + + t.Run("publish with success", func(t *testing.T) { + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + return itemsAllOK(docs) + }) + + exporter := newTestLogsExporter(t, server.URL) + mustSendLogRecords(t, exporter, plog.NewLogRecord()) + mustSendLogRecords(t, exporter, plog.NewLogRecord()) + + rec.WaitItems(2) + }) + + t.Run("publish with ecs encoding", func(t *testing.T) { + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + + var expectedDoc, actualDoc map[string]any + expected := []byte(`{"attrKey1":"abc","attrKey2":"def","application":"myapp","service":{"name":"myservice"},"error":{"stacktrace":"no no no no"},"agent":{"name":"otlp"},"@timestamp":"1970-01-01T00:00:00.000000000Z","message":"hello world"}`) + err := json.Unmarshal(expected, &expectedDoc) + require.NoError(t, err) + + actual := docs[0].Document + err = json.Unmarshal(actual, &actualDoc) + require.NoError(t, err) + assert.Equal(t, expectedDoc, actualDoc) + + return itemsAllOK(docs) + }) + + exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { + cfg.Mapping.Mode = "ecs" + }) + logs := newLogsWithAttributeAndResourceMap( + // record attrs + map[string]string{ + "application": "myapp", + "service.name": "myservice", + "exception.stacktrace": "no no no no", + }, + // resource attrs + map[string]string{ + "attrKey1": "abc", + "attrKey2": "def", + }, + ) + logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().SetStr("hello world") + mustSendLogs(t, exporter, logs) + rec.WaitItems(1) + }) + + t.Run("publish with dedot", func(t *testing.T) { + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + assert.JSONEq(t, + `{"attr":{"key":"value"},"agent":{"name":"otlp"},"@timestamp":"1970-01-01T00:00:00.000000000Z"}`, + string(docs[0].Document), + ) + rec.Record(docs) + return itemsAllOK(docs) + }) + + exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { + cfg.Mapping.Mode = "ecs" + cfg.Mapping.Dedot = true + }) + logs := newLogsWithAttributeAndResourceMap( + map[string]string{"attr.key": "value"}, + nil, + ) + mustSendLogs(t, exporter, logs) + rec.WaitItems(1) + }) + + t.Run("publish with dedup", func(t *testing.T) { + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + assert.Equal(t, `{"@timestamp":"1970-01-01T00:00:00.000000000Z","Scope":{"name":"","value":"value","version":""},"SeverityNumber":0,"TraceFlags":0}`, string(docs[0].Document)) + rec.Record(docs) + return itemsAllOK(docs) + }) + + exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { + cfg.Mapping.Mode = "raw" + // dedup is the default + }) + logs := newLogsWithAttributeAndResourceMap( + // Scope collides with the top-level "Scope" field, + // so will be removed during deduplication. + map[string]string{"Scope": "value"}, + nil, + ) + mustSendLogs(t, exporter, logs) + rec.WaitItems(1) + }) + + t.Run("publish with headers", func(t *testing.T) { + done := make(chan struct{}, 1) + server := newESTestServerBulkHandlerFunc(t, func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, + fmt.Sprintf("OpenTelemetry Collector/latest (%s/%s)", runtime.GOOS, runtime.GOARCH), + r.UserAgent(), + ) + assert.Equal(t, "bah", r.Header.Get("Foo")) + + w.WriteHeader(http.StatusTeapot) + select { + case done <- struct{}{}: + default: + } + }) + + exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { + cfg.Headers = map[string]string{"foo": "bah"} + }) + mustSendLogRecords(t, exporter, plog.NewLogRecord()) + <-done + }) + + t.Run("publish with configured user-agent header", func(t *testing.T) { + done := make(chan struct{}, 1) + server := newESTestServerBulkHandlerFunc(t, func(w http.ResponseWriter, r *http.Request) { + // User the configured User-Agent header, rather than + // the default one derived from BuildInfo. + assert.Equal(t, "overridden", r.UserAgent()) + + w.WriteHeader(http.StatusTeapot) + select { + case done <- struct{}{}: + default: + } + }) + + exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { + cfg.Headers = map[string]string{"User-Agent": "overridden"} + }) + mustSendLogRecords(t, exporter, plog.NewLogRecord()) + <-done + }) + + t.Run("publish with dynamic index", func(t *testing.T) { + + rec := newBulkRecorder() + var ( + prefix = "resprefix-" + suffix = "-attrsuffix" + index = "someindex" + ) + + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + + data, err := docs[0].Action.MarshalJSON() + assert.NoError(t, err) + + jsonVal := map[string]any{} + err = json.Unmarshal(data, &jsonVal) + assert.NoError(t, err) + + create := jsonVal["create"].(map[string]any) + expected := fmt.Sprintf("%s%s%s", prefix, index, suffix) + assert.Equal(t, expected, create["_index"].(string)) + + return itemsAllOK(docs) + }) + + exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { + cfg.LogsIndex = index + cfg.LogsDynamicIndex.Enabled = true + }) + logs := newLogsWithAttributeAndResourceMap( + map[string]string{ + indexPrefix: "attrprefix-", + indexSuffix: suffix, + }, + map[string]string{ + indexPrefix: prefix, + }, + ) + logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().SetStr("hello world") + mustSendLogs(t, exporter, logs) + + rec.WaitItems(1) + }) + + t.Run("publish with logstash index format enabled and dynamic index disabled", func(t *testing.T) { + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + + data, err := docs[0].Action.MarshalJSON() + assert.NoError(t, err) + + jsonVal := map[string]any{} + err = json.Unmarshal(data, &jsonVal) + assert.NoError(t, err) + + create := jsonVal["create"].(map[string]any) + assert.Contains(t, create["_index"], "not-used-index") + + return itemsAllOK(docs) + }) + + exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { + cfg.LogstashFormat.Enabled = true + cfg.LogsIndex = "not-used-index" + }) + mustSendLogs(t, exporter, newLogsWithAttributeAndResourceMap(nil, nil)) + + rec.WaitItems(1) + }) + + t.Run("publish with logstash index format enabled and dynamic index enabled", func(t *testing.T) { + var ( + prefix = "resprefix-" + suffix = "-attrsuffix" + index = "someindex" + ) + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + + data, err := docs[0].Action.MarshalJSON() + assert.NoError(t, err) + + jsonVal := map[string]any{} + err = json.Unmarshal(data, &jsonVal) + assert.NoError(t, err) + + create := jsonVal["create"].(map[string]any) + expected := fmt.Sprintf("%s%s%s", prefix, index, suffix) + + assert.Equal(t, strings.Contains(create["_index"].(string), expected), true) + + return itemsAllOK(docs) + }) + + exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { + cfg.LogsIndex = index + cfg.LogsDynamicIndex.Enabled = true + cfg.LogstashFormat.Enabled = true + }) + mustSendLogs(t, exporter, newLogsWithAttributeAndResourceMap( + map[string]string{ + indexPrefix: "attrprefix-", + indexSuffix: suffix, + }, + map[string]string{ + indexPrefix: prefix, + }, + )) + rec.WaitItems(1) + }) + + t.Run("retry http request", func(t *testing.T) { + failures := 0 + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + if failures == 0 { + failures++ + return nil, &httpTestError{status: http.StatusTooManyRequests, message: "oops"} + } + + rec.Record(docs) + return itemsAllOK(docs) + }) + + exporter := newTestLogsExporter(t, server.URL) + mustSendLogRecords(t, exporter, plog.NewLogRecord()) + + rec.WaitItems(1) + }) + + t.Run("no retry", func(t *testing.T) { + configurations := map[string]func(*Config){ + "max_requests limited": func(cfg *Config) { + cfg.Retry.MaxRequests = 1 + cfg.Retry.InitialInterval = 1 * time.Millisecond + cfg.Retry.MaxInterval = 10 * time.Millisecond + }, + "retry.enabled is false": func(cfg *Config) { + cfg.Retry.Enabled = false + cfg.Retry.MaxRequests = 10 + cfg.Retry.InitialInterval = 1 * time.Millisecond + cfg.Retry.MaxInterval = 10 * time.Millisecond + }, + } + + handlers := map[string]func(attempts *atomic.Int64) bulkHandler{ + "fail http request": func(attempts *atomic.Int64) bulkHandler { + return func([]itemRequest) ([]itemResponse, error) { + attempts.Add(1) + return nil, &httpTestError{message: "oops"} + } + }, + "fail item": func(attempts *atomic.Int64) bulkHandler { + return func(docs []itemRequest) ([]itemResponse, error) { + attempts.Add(1) + return itemsReportStatus(docs, http.StatusTooManyRequests) + } + }, + } + + for name, handler := range handlers { + handler := handler + t.Run(name, func(t *testing.T) { + t.Parallel() + for name, configurer := range configurations { + configurer := configurer + t.Run(name, func(t *testing.T) { + t.Parallel() + attempts := &atomic.Int64{} + server := newESTestServer(t, handler(attempts)) + + exporter := newTestLogsExporter(t, server.URL, configurer) + mustSendLogRecords(t, exporter, plog.NewLogRecord()) + + time.Sleep(200 * time.Millisecond) + assert.Equal(t, int64(1), attempts.Load()) + }) + } + }) + } + }) + + t.Run("do not retry invalid request", func(t *testing.T) { + attempts := &atomic.Int64{} + server := newESTestServer(t, func(_ []itemRequest) ([]itemResponse, error) { + attempts.Add(1) + return nil, &httpTestError{message: "oops", status: http.StatusBadRequest} + }) + + exporter := newTestLogsExporter(t, server.URL) + mustSendLogRecords(t, exporter, plog.NewLogRecord()) + + time.Sleep(200 * time.Millisecond) + assert.Equal(t, int64(1), attempts.Load()) + }) + + t.Run("retry single item", func(t *testing.T) { + var attempts int + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + attempts++ + + if attempts == 1 { + return itemsReportStatus(docs, http.StatusTooManyRequests) + } + + rec.Record(docs) + return itemsAllOK(docs) + }) + + exporter := newTestLogsExporter(t, server.URL) + mustSendLogRecords(t, exporter, plog.NewLogRecord()) + + rec.WaitItems(1) + }) + + t.Run("do not retry bad item", func(t *testing.T) { + attempts := &atomic.Int64{} + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + attempts.Add(1) + return itemsReportStatus(docs, http.StatusBadRequest) + }) + + exporter := newTestLogsExporter(t, server.URL) + mustSendLogRecords(t, exporter, plog.NewLogRecord()) + + time.Sleep(200 * time.Millisecond) + assert.Equal(t, int64(1), attempts.Load()) + }) + + t.Run("only retry failed items", func(t *testing.T) { + var attempts [3]int + var wg sync.WaitGroup + wg.Add(1) + + const retryIdx = 1 + + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + resp := make([]itemResponse, len(docs)) + for i, doc := range docs { + resp[i].Status = http.StatusOK + + var idxInfo struct { + Attributes struct { + Idx int + } + } + if err := json.Unmarshal(doc.Document, &idxInfo); err != nil { + panic(err) + } + + if idxInfo.Attributes.Idx == retryIdx { + if attempts[retryIdx] == 0 { + resp[i].Status = http.StatusTooManyRequests + } else { + defer wg.Done() + } + } + attempts[idxInfo.Attributes.Idx]++ + } + return resp, nil + }) + + exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { + cfg.Flush.Interval = 50 * time.Millisecond + cfg.Retry.InitialInterval = 1 * time.Millisecond + cfg.Retry.MaxInterval = 10 * time.Millisecond + }) + for i := 0; i < 3; i++ { + logRecord := plog.NewLogRecord() + logRecord.Attributes().PutInt("idx", int64(i)) + mustSendLogRecords(t, exporter, logRecord) + } + + wg.Wait() // <- this blocks forever if the event is not retried + + assert.Equal(t, [3]int{1, 2, 1}, attempts) + }) +} + +func TestExporterTraces(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("skipping test on Windows, see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/14759") + } + + t.Run("publish with success", func(t *testing.T) { + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + return itemsAllOK(docs) + }) + + exporter := newTestTracesExporter(t, server.URL) + mustSendSpans(t, exporter, ptrace.NewSpan()) + mustSendSpans(t, exporter, ptrace.NewSpan()) + + rec.WaitItems(2) + }) + + t.Run("publish with dynamic index", func(t *testing.T) { + + rec := newBulkRecorder() + var ( + prefix = "resprefix-" + suffix = "-attrsuffix" + index = "someindex" + ) + + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + + data, err := docs[0].Action.MarshalJSON() + assert.NoError(t, err) + + jsonVal := map[string]any{} + err = json.Unmarshal(data, &jsonVal) + assert.NoError(t, err) + + create := jsonVal["create"].(map[string]any) + + expected := fmt.Sprintf("%s%s%s", prefix, index, suffix) + assert.Equal(t, expected, create["_index"].(string)) + + return itemsAllOK(docs) + }) + + exporter := newTestTracesExporter(t, server.URL, func(cfg *Config) { + cfg.TracesIndex = index + cfg.TracesDynamicIndex.Enabled = true + }) + + mustSendTraces(t, exporter, newTracesWithAttributeAndResourceMap( + map[string]string{ + indexPrefix: "attrprefix-", + indexSuffix: suffix, + }, + map[string]string{ + indexPrefix: prefix, + }, + )) + + rec.WaitItems(1) + }) + + t.Run("publish with logstash format index", func(t *testing.T) { + var defaultCfg Config + + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + + data, err := docs[0].Action.MarshalJSON() + assert.NoError(t, err) + + jsonVal := map[string]any{} + err = json.Unmarshal(data, &jsonVal) + assert.NoError(t, err) + + create := jsonVal["create"].(map[string]any) + + assert.Equal(t, strings.Contains(create["_index"].(string), defaultCfg.TracesIndex), true) + + return itemsAllOK(docs) + }) + + exporter := newTestTracesExporter(t, server.URL, func(cfg *Config) { + cfg.LogstashFormat.Enabled = true + cfg.TracesIndex = "not-used-index" + defaultCfg = *cfg + }) + + mustSendTraces(t, exporter, newTracesWithAttributeAndResourceMap(nil, nil)) + + rec.WaitItems(1) + }) + + t.Run("publish with logstash format index and dynamic index enabled", func(t *testing.T) { + var ( + prefix = "resprefix-" + suffix = "-attrsuffix" + index = "someindex" + ) + + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + + data, err := docs[0].Action.MarshalJSON() + assert.NoError(t, err) + + jsonVal := map[string]any{} + err = json.Unmarshal(data, &jsonVal) + assert.NoError(t, err) + + create := jsonVal["create"].(map[string]any) + expected := fmt.Sprintf("%s%s%s", prefix, index, suffix) + + assert.Equal(t, strings.Contains(create["_index"].(string), expected), true) + + return itemsAllOK(docs) + }) + + exporter := newTestTracesExporter(t, server.URL, func(cfg *Config) { + cfg.TracesIndex = index + cfg.TracesDynamicIndex.Enabled = true + cfg.LogstashFormat.Enabled = true + }) + + mustSendTraces(t, exporter, newTracesWithAttributeAndResourceMap( + map[string]string{ + indexPrefix: "attrprefix-", + indexSuffix: suffix, + }, + map[string]string{ + indexPrefix: prefix, + }, + )) + rec.WaitItems(1) + }) +} + +func newTestTracesExporter(t *testing.T, url string, fns ...func(*Config)) exporter.Traces { + f := NewFactory() + cfg := withDefaultConfig(append([]func(*Config){func(cfg *Config) { + cfg.Endpoints = []string{url} + cfg.NumWorkers = 1 + cfg.Flush.Interval = 10 * time.Millisecond + }}, fns...)...) + exp, err := f.CreateTracesExporter(context.Background(), exportertest.NewNopCreateSettings(), cfg) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, exp.Shutdown(context.Background())) + }) + return exp +} + +func newTestLogsExporter(t *testing.T, url string, fns ...func(*Config)) exporter.Logs { + f := NewFactory() + cfg := withDefaultConfig(append([]func(*Config){func(cfg *Config) { + cfg.Endpoints = []string{url} + cfg.NumWorkers = 1 + cfg.Flush.Interval = 10 * time.Millisecond + }}, fns...)...) + exp, err := f.CreateLogsExporter(context.Background(), exportertest.NewNopCreateSettings(), cfg) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, exp.Shutdown(context.Background())) + }) + return exp +} + +func mustSendLogRecords(t *testing.T, exporter exporter.Logs, records ...plog.LogRecord) { + logs := plog.NewLogs() + resourceLogs := logs.ResourceLogs().AppendEmpty() + scopeLogs := resourceLogs.ScopeLogs().AppendEmpty() + for _, record := range records { + record.CopyTo(scopeLogs.LogRecords().AppendEmpty()) + } + mustSendLogs(t, exporter, logs) +} + +func mustSendLogs(t *testing.T, exporter exporter.Logs, logs plog.Logs) { + err := exporter.ConsumeLogs(context.Background(), logs) + require.NoError(t, err) +} + +func mustSendSpans(t *testing.T, exporter exporter.Traces, spans ...ptrace.Span) { + traces := ptrace.NewTraces() + resourceSpans := traces.ResourceSpans().AppendEmpty() + scopeSpans := resourceSpans.ScopeSpans().AppendEmpty() + for _, span := range spans { + span.CopyTo(scopeSpans.Spans().AppendEmpty()) + } + mustSendTraces(t, exporter, traces) +} + +func mustSendTraces(t *testing.T, exporter exporter.Traces, traces ptrace.Traces) { + err := exporter.ConsumeTraces(context.Background(), traces) + require.NoError(t, err) +} diff --git a/exporter/elasticsearchexporter/factory.go b/exporter/elasticsearchexporter/factory.go index f50a8e614ecd..afcf2d6197e4 100644 --- a/exporter/elasticsearchexporter/factory.go +++ b/exporter/elasticsearchexporter/factory.go @@ -82,23 +82,26 @@ func createLogsExporter( cfg component.Config, ) (exporter.Logs, error) { cf := cfg.(*Config) + + index := cf.LogsIndex if cf.Index != "" { set.Logger.Warn("index option are deprecated and replaced with logs_index and traces_index.") + index = cf.Index } setDefaultUserAgentHeader(cf, set.BuildInfo) - logsExporter, err := newLogsExporter(set.Logger, cf) + exporter, err := newExporter(set.Logger, cf, index, cf.LogsDynamicIndex.Enabled) if err != nil { - return nil, fmt.Errorf("cannot configure Elasticsearch logsExporter: %w", err) + return nil, fmt.Errorf("cannot configure Elasticsearch exporter: %w", err) } return exporterhelper.NewLogsExporter( ctx, set, cfg, - logsExporter.pushLogsData, - exporterhelper.WithShutdown(logsExporter.Shutdown), + exporter.pushLogsData, + exporterhelper.WithShutdown(exporter.Shutdown), exporterhelper.WithQueue(cf.QueueSettings), ) } @@ -111,17 +114,18 @@ func createTracesExporter(ctx context.Context, setDefaultUserAgentHeader(cf, set.BuildInfo) - tracesExporter, err := newTracesExporter(set.Logger, cf) + exporter, err := newExporter(set.Logger, cf, cf.TracesIndex, cf.TracesDynamicIndex.Enabled) if err != nil { - return nil, fmt.Errorf("cannot configure Elasticsearch tracesExporter: %w", err) + return nil, fmt.Errorf("cannot configure Elasticsearch exporter: %w", err) } return exporterhelper.NewTracesExporter( ctx, set, cfg, - tracesExporter.pushTraceData, - exporterhelper.WithShutdown(tracesExporter.Shutdown), - exporterhelper.WithQueue(cf.QueueSettings)) + exporter.pushTraceData, + exporterhelper.WithShutdown(exporter.Shutdown), + exporterhelper.WithQueue(cf.QueueSettings), + ) } // set default User-Agent header with BuildInfo if User-Agent is empty diff --git a/exporter/elasticsearchexporter/factory_test.go b/exporter/elasticsearchexporter/factory_test.go index 86557365b09b..e4d19f32fd78 100644 --- a/exporter/elasticsearchexporter/factory_test.go +++ b/exporter/elasticsearchexporter/factory_test.go @@ -5,12 +5,10 @@ package elasticsearchexporter import ( "context" - "strings" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/exporter/exportertest" ) @@ -32,7 +30,16 @@ func TestFactory_CreateLogsExporter(t *testing.T) { require.NoError(t, err) require.NotNil(t, exporter) - require.NoError(t, exporter.Shutdown(context.TODO())) + require.NoError(t, exporter.Shutdown(context.Background())) +} + +func TestFactory_CreateLogsExporter_Fail(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + params := exportertest.NewNopCreateSettings() + _, err := factory.CreateLogsExporter(context.Background(), params, cfg) + require.Error(t, err, "expected an error when creating a logs exporter") + assert.EqualError(t, err, "cannot configure Elasticsearch exporter: endpoints or cloudid must be specified") } func TestFactory_CreateMetricsExporter_Fail(t *testing.T) { @@ -41,6 +48,20 @@ func TestFactory_CreateMetricsExporter_Fail(t *testing.T) { params := exportertest.NewNopCreateSettings() _, err := factory.CreateMetricsExporter(context.Background(), params, cfg) require.Error(t, err, "expected an error when creating a traces exporter") + assert.EqualError(t, err, "telemetry type is not supported") +} + +func TestFactory_CreateTracesExporter(t *testing.T) { + factory := NewFactory() + cfg := withDefaultConfig(func(cfg *Config) { + cfg.Endpoints = []string{"test:9200"} + }) + params := exportertest.NewNopCreateSettings() + exporter, err := factory.CreateTracesExporter(context.Background(), params, cfg) + require.NoError(t, err) + require.NotNil(t, exporter) + + require.NoError(t, exporter.Shutdown(context.Background())) } func TestFactory_CreateTracesExporter_Fail(t *testing.T) { @@ -49,6 +70,7 @@ func TestFactory_CreateTracesExporter_Fail(t *testing.T) { params := exportertest.NewNopCreateSettings() _, err := factory.CreateTracesExporter(context.Background(), params, cfg) require.Error(t, err, "expected an error when creating a traces exporter") + assert.EqualError(t, err, "cannot configure Elasticsearch exporter: endpoints or cloudid must be specified") } func TestFactory_CreateLogsAndTracesExporterWithDeprecatedIndexOption(t *testing.T) { @@ -61,31 +83,10 @@ func TestFactory_CreateLogsAndTracesExporterWithDeprecatedIndexOption(t *testing logsExporter, err := factory.CreateLogsExporter(context.Background(), params, cfg) require.NoError(t, err) require.NotNil(t, logsExporter) - require.NoError(t, logsExporter.Shutdown(context.TODO())) + require.NoError(t, logsExporter.Shutdown(context.Background())) tracesExporter, err := factory.CreateTracesExporter(context.Background(), params, cfg) require.NoError(t, err) require.NotNil(t, tracesExporter) - require.NoError(t, tracesExporter.Shutdown(context.TODO())) -} - -func TestSetDefaultUserAgentHeader(t *testing.T) { - t.Run("insert default user agent header into empty", func(t *testing.T) { - factory := NewFactory() - cfg := factory.CreateDefaultConfig().(*Config) - setDefaultUserAgentHeader(cfg, component.BuildInfo{Description: "mock OpenTelemetry Collector", Version: "latest"}) - assert.Equal(t, len(cfg.Headers), 1) - assert.Equal(t, strings.Contains(cfg.Headers[userAgentHeaderKey], "OpenTelemetry Collector"), true) - }) - - t.Run("ignore user agent header if configured", func(t *testing.T) { - factory := NewFactory() - cfg := factory.CreateDefaultConfig().(*Config) - cfg.Headers = map[string]string{ - userAgentHeaderKey: "mock user agent header", - } - setDefaultUserAgentHeader(cfg, component.BuildInfo{Description: "mock OpenTelemetry Collector", Version: "latest"}) - assert.Equal(t, len(cfg.Headers), 1) - assert.Equal(t, cfg.Headers[userAgentHeaderKey], "mock user agent header") - }) + require.NoError(t, tracesExporter.Shutdown(context.Background())) } diff --git a/exporter/elasticsearchexporter/logs_exporter.go b/exporter/elasticsearchexporter/logs_exporter.go deleted file mode 100644 index f7ab2ea8a58f..000000000000 --- a/exporter/elasticsearchexporter/logs_exporter.go +++ /dev/null @@ -1,122 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -// Package elasticsearchexporter contains an opentelemetry-collector exporter -// for Elasticsearch. -package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter" - -import ( - "context" - "errors" - "fmt" - "time" - - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/plog" - "go.uber.org/zap" -) - -type elasticsearchLogsExporter struct { - logger *zap.Logger - - index string - logstashFormat LogstashFormatSettings - dynamicIndex bool - - client *esClientCurrent - bulkIndexer *esBulkIndexerCurrent - model mappingModel -} - -func newLogsExporter(logger *zap.Logger, cfg *Config) (*elasticsearchLogsExporter, error) { - if err := cfg.Validate(); err != nil { - return nil, err - } - - client, err := newElasticsearchClient(logger, cfg) - if err != nil { - return nil, err - } - - bulkIndexer, err := newBulkIndexer(logger, client, cfg) - if err != nil { - return nil, err - } - - model := &encodeModel{ - dedup: cfg.Mapping.Dedup, - dedot: cfg.Mapping.Dedot, - mode: cfg.MappingMode(), - } - - indexStr := cfg.LogsIndex - if cfg.Index != "" { - indexStr = cfg.Index - } - esLogsExp := &elasticsearchLogsExporter{ - logger: logger, - client: client, - bulkIndexer: bulkIndexer, - - index: indexStr, - dynamicIndex: cfg.LogsDynamicIndex.Enabled, - model: model, - logstashFormat: cfg.LogstashFormat, - } - return esLogsExp, nil -} - -func (e *elasticsearchLogsExporter) Shutdown(ctx context.Context) error { - return e.bulkIndexer.Close(ctx) -} - -func (e *elasticsearchLogsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error { - var errs []error - - rls := ld.ResourceLogs() - for i := 0; i < rls.Len(); i++ { - rl := rls.At(i) - resource := rl.Resource() - ills := rl.ScopeLogs() - for j := 0; j < ills.Len(); j++ { - ill := ills.At(j) - scope := ill.Scope() - logs := ill.LogRecords() - for k := 0; k < logs.Len(); k++ { - if err := e.pushLogRecord(ctx, resource, logs.At(k), scope); err != nil { - if cerr := ctx.Err(); cerr != nil { - return cerr - } - - errs = append(errs, err) - } - } - } - } - - return errors.Join(errs...) -} - -func (e *elasticsearchLogsExporter) pushLogRecord(ctx context.Context, resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope) error { - fIndex := e.index - if e.dynamicIndex { - prefix := getFromAttributes(indexPrefix, resource, scope, record) - suffix := getFromAttributes(indexSuffix, resource, scope, record) - - fIndex = fmt.Sprintf("%s%s%s", prefix, fIndex, suffix) - } - - if e.logstashFormat.Enabled { - formattedIndex, err := generateIndexWithLogstashFormat(fIndex, &e.logstashFormat, time.Now()) - if err != nil { - return err - } - fIndex = formattedIndex - } - - document, err := e.model.encodeLog(resource, record, scope) - if err != nil { - return fmt.Errorf("Failed to encode log event: %w", err) - } - return pushDocuments(ctx, fIndex, document, e.bulkIndexer) -} diff --git a/exporter/elasticsearchexporter/logs_exporter_test.go b/exporter/elasticsearchexporter/logs_exporter_test.go deleted file mode 100644 index 28ce2fb0f624..000000000000 --- a/exporter/elasticsearchexporter/logs_exporter_test.go +++ /dev/null @@ -1,533 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package elasticsearchexporter - -import ( - "context" - "encoding/json" - "errors" - "fmt" - "net/http" - "runtime" - "strings" - "sync" - "sync/atomic" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.uber.org/zap" - "go.uber.org/zap/zaptest" -) - -func TestExporter_New(t *testing.T) { - type validate func(*testing.T, *elasticsearchLogsExporter, error) - - success := func(t *testing.T, exporter *elasticsearchLogsExporter, err error) { - require.NoError(t, err) - require.NotNil(t, exporter) - } - successWithInternalModel := func(expectedModel *encodeModel) validate { - return func(t *testing.T, exporter *elasticsearchLogsExporter, err error) { - assert.NoError(t, err) - assert.EqualValues(t, expectedModel, exporter.model) - } - } - successWithDeprecatedIndexOption := func(index string) validate { - return func(t *testing.T, exporter *elasticsearchLogsExporter, err error) { - require.NoError(t, err) - require.NotNil(t, exporter) - require.EqualValues(t, index, exporter.index) - } - } - - failWith := func(want error) validate { - return func(t *testing.T, exporter *elasticsearchLogsExporter, err error) { - require.Nil(t, exporter) - require.Error(t, err) - if !errors.Is(err, want) { - t.Fatalf("Expected error '%v', but got '%v'", want, err) - } - } - } - - failWithMessage := func(msg string) validate { - return func(t *testing.T, exporter *elasticsearchLogsExporter, err error) { - require.Nil(t, exporter) - require.Error(t, err) - require.Contains(t, err.Error(), msg) - } - } - - tests := map[string]struct { - config *Config - want validate - env map[string]string - }{ - "no endpoint": { - config: withDefaultConfig(), - want: failWith(errConfigNoEndpoint), - }, - "create from default config with ELASTICSEARCH_URL environment variable": { - config: withDefaultConfig(), - want: success, - env: map[string]string{defaultElasticsearchEnvName: "localhost:9200"}, - }, - "create from default with endpoints": { - config: withDefaultConfig(func(cfg *Config) { - cfg.Endpoints = []string{"test:9200"} - }), - want: success, - }, - "create from default config with endpoints and deprecated index_option": { - config: withDefaultConfig(func(cfg *Config) { - cfg.Index = "foo-index" - cfg.Endpoints = []string{"test:9200"} - }), - want: successWithDeprecatedIndexOption("foo-index"), - }, - "create with cloudid": { - config: withDefaultConfig(func(cfg *Config) { - cfg.CloudID = "foo:YmFyLmNsb3VkLmVzLmlvJGFiYzEyMyRkZWY0NTY=" - }), - want: success, - }, - "create with invalid cloudid": { - config: withDefaultConfig(func(cfg *Config) { - cfg.CloudID = "invalid" - }), - want: failWithMessage("cannot parse CloudID"), - }, - "fail if endpoint and cloudid are set": { - config: withDefaultConfig(func(cfg *Config) { - cfg.Endpoints = []string{"test:9200"} - cfg.CloudID = "foo:YmFyLmNsb3VkLmVzLmlvJGFiYzEyMyRkZWY0NTY=" - }), - want: failWithMessage("Addresses and CloudID are set"), - }, - "create with custom request header": { - config: withDefaultConfig(func(cfg *Config) { - cfg.Endpoints = []string{"test:9200"} - cfg.Headers = map[string]string{ - "foo": "bah", - } - }), - want: success, - }, - "create with custom dedup and dedot values": { - config: withDefaultConfig(func(cfg *Config) { - cfg.Endpoints = []string{"test:9200"} - cfg.Mapping.Dedot = false - cfg.Mapping.Dedup = true - }), - want: successWithInternalModel(&encodeModel{dedot: false, dedup: true, mode: MappingNone}), - }, - } - - for name, test := range tests { - t.Run(name, func(t *testing.T) { - env := test.env - if len(env) == 0 { - env = map[string]string{defaultElasticsearchEnvName: ""} - } - - for k, v := range env { - t.Setenv(k, v) - } - - exporter, err := newLogsExporter(zap.NewNop(), test.config) - if exporter != nil { - defer func() { - require.NoError(t, exporter.Shutdown(context.TODO())) - }() - } - - test.want(t, exporter, err) - }) - } -} - -func TestExporter_PushEvent(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("skipping test on Windows, see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/10178") - } - - t.Run("publish with success", func(t *testing.T) { - rec := newBulkRecorder() - server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { - rec.Record(docs) - return itemsAllOK(docs) - }) - - exporter := newTestExporter(t, server.URL) - mustSend(t, exporter, `{"message": "test1"}`) - mustSend(t, exporter, `{"message": "test2"}`) - - rec.WaitItems(2) - }) - - t.Run("publish with ecs encoding", func(t *testing.T) { - rec := newBulkRecorder() - server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { - rec.Record(docs) - - var expectedDoc, actualDoc map[string]any - expected := []byte(`{"attrKey1":"abc","attrKey2":"def","application":"myapp","service":{"name":"myservice"},"error":{"stacktrace":"no no no no"},"agent":{"name":"otlp"},"@timestamp":"1970-01-01T00:00:00.000000000Z","message":"hello world"}`) - err := json.Unmarshal(expected, &expectedDoc) - require.NoError(t, err) - - actual := docs[0].Document - err = json.Unmarshal(actual, &actualDoc) - require.NoError(t, err) - assert.Equal(t, expectedDoc, actualDoc) - - return itemsAllOK(docs) - }) - - testConfig := withTestExporterConfig(func(cfg *Config) { - cfg.Mapping.Mode = "ecs" - })(server.URL) - exporter := newTestExporter(t, server.URL, func(cfg *Config) { *cfg = *testConfig }) - mustSendLogsWithAttributes(t, exporter, - // record attrs - map[string]string{ - "application": "myapp", - "service.name": "myservice", - "exception.stacktrace": "no no no no", - }, - // resource attrs - map[string]string{ - "attrKey1": "abc", - "attrKey2": "def", - }, - // record body - "hello world", - ) - rec.WaitItems(1) - }) - - t.Run("publish with dynamic index", func(t *testing.T) { - - rec := newBulkRecorder() - var ( - prefix = "resprefix-" - suffix = "-attrsuffix" - index = "someindex" - ) - - server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { - rec.Record(docs) - - data, err := docs[0].Action.MarshalJSON() - assert.NoError(t, err) - - jsonVal := map[string]any{} - err = json.Unmarshal(data, &jsonVal) - assert.NoError(t, err) - - create := jsonVal["create"].(map[string]any) - expected := fmt.Sprintf("%s%s%s", prefix, index, suffix) - assert.Equal(t, expected, create["_index"].(string)) - - return itemsAllOK(docs) - }) - - exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { - cfg.LogsIndex = index - cfg.LogsDynamicIndex.Enabled = true - }) - - mustSendLogsWithAttributes(t, exporter, - map[string]string{ - indexPrefix: "attrprefix-", - indexSuffix: suffix, - }, - map[string]string{ - indexPrefix: prefix, - }, - "hello world", - ) - - rec.WaitItems(1) - }) - - t.Run("publish with logstash index format enabled and dynamic index disabled", func(t *testing.T) { - var defaultCfg Config - rec := newBulkRecorder() - server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { - rec.Record(docs) - - data, err := docs[0].Action.MarshalJSON() - assert.NoError(t, err) - - jsonVal := map[string]any{} - err = json.Unmarshal(data, &jsonVal) - assert.NoError(t, err) - - create := jsonVal["create"].(map[string]any) - - assert.Equal(t, strings.Contains(create["_index"].(string), defaultCfg.LogsIndex), true) - - return itemsAllOK(docs) - }) - - exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { - cfg.LogstashFormat.Enabled = true - cfg.LogsIndex = "not-used-index" - defaultCfg = *cfg - }) - - mustSendLogsWithAttributes(t, exporter, nil, nil, "") - - rec.WaitItems(1) - }) - - t.Run("publish with logstash index format enabled and dynamic index enabled", func(t *testing.T) { - var ( - prefix = "resprefix-" - suffix = "-attrsuffix" - index = "someindex" - ) - rec := newBulkRecorder() - server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { - rec.Record(docs) - - data, err := docs[0].Action.MarshalJSON() - assert.NoError(t, err) - - jsonVal := map[string]any{} - err = json.Unmarshal(data, &jsonVal) - assert.NoError(t, err) - - create := jsonVal["create"].(map[string]any) - expected := fmt.Sprintf("%s%s%s", prefix, index, suffix) - - assert.Equal(t, strings.Contains(create["_index"].(string), expected), true) - - return itemsAllOK(docs) - }) - - exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { - cfg.LogsIndex = index - cfg.LogsDynamicIndex.Enabled = true - cfg.LogstashFormat.Enabled = true - }) - - mustSendLogsWithAttributes(t, exporter, - map[string]string{ - indexPrefix: "attrprefix-", - indexSuffix: suffix, - }, - map[string]string{ - indexPrefix: prefix, - }, - "", - ) - rec.WaitItems(1) - }) - - t.Run("retry http request", func(t *testing.T) { - failures := 0 - rec := newBulkRecorder() - server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { - if failures == 0 { - failures++ - return nil, &httpTestError{status: http.StatusTooManyRequests, message: "oops"} - } - - rec.Record(docs) - return itemsAllOK(docs) - }) - - exporter := newTestExporter(t, server.URL) - mustSend(t, exporter, `{"message": "test1"}`) - - rec.WaitItems(1) - }) - - t.Run("no retry", func(t *testing.T) { - configurations := map[string]func(string) *Config{ - "max_requests limited": withTestExporterConfig(func(cfg *Config) { - cfg.Retry.MaxRequests = 1 - cfg.Retry.InitialInterval = 1 * time.Millisecond - cfg.Retry.MaxInterval = 10 * time.Millisecond - }), - "retry.enabled is false": withTestExporterConfig(func(cfg *Config) { - cfg.Retry.Enabled = false - cfg.Retry.MaxRequests = 10 - cfg.Retry.InitialInterval = 1 * time.Millisecond - cfg.Retry.MaxInterval = 10 * time.Millisecond - }), - } - - handlers := map[string]func(attempts *atomic.Int64) bulkHandler{ - "fail http request": func(attempts *atomic.Int64) bulkHandler { - return func([]itemRequest) ([]itemResponse, error) { - attempts.Add(1) - return nil, &httpTestError{message: "oops"} - } - }, - "fail item": func(attempts *atomic.Int64) bulkHandler { - return func(docs []itemRequest) ([]itemResponse, error) { - attempts.Add(1) - return itemsReportStatus(docs, http.StatusTooManyRequests) - } - }, - } - - for name, handler := range handlers { - handler := handler - t.Run(name, func(t *testing.T) { - t.Parallel() - for name, configurer := range configurations { - configurer := configurer - t.Run(name, func(t *testing.T) { - t.Parallel() - attempts := &atomic.Int64{} - server := newESTestServer(t, handler(attempts)) - - testConfig := configurer(server.URL) - exporter := newTestExporter(t, server.URL, func(cfg *Config) { *cfg = *testConfig }) - mustSend(t, exporter, `{"message": "test1"}`) - - time.Sleep(200 * time.Millisecond) - assert.Equal(t, int64(1), attempts.Load()) - }) - } - }) - } - }) - - t.Run("do not retry invalid request", func(t *testing.T) { - attempts := &atomic.Int64{} - server := newESTestServer(t, func(_ []itemRequest) ([]itemResponse, error) { - attempts.Add(1) - return nil, &httpTestError{message: "oops", status: http.StatusBadRequest} - }) - - exporter := newTestExporter(t, server.URL) - mustSend(t, exporter, `{"message": "test1"}`) - - time.Sleep(200 * time.Millisecond) - assert.Equal(t, int64(1), attempts.Load()) - }) - - t.Run("retry single item", func(t *testing.T) { - var attempts int - rec := newBulkRecorder() - server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { - attempts++ - - if attempts == 1 { - return itemsReportStatus(docs, http.StatusTooManyRequests) - } - - rec.Record(docs) - return itemsAllOK(docs) - }) - - exporter := newTestExporter(t, server.URL) - mustSend(t, exporter, `{"message": "test1"}`) - - rec.WaitItems(1) - }) - - t.Run("do not retry bad item", func(t *testing.T) { - attempts := &atomic.Int64{} - server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { - attempts.Add(1) - return itemsReportStatus(docs, http.StatusBadRequest) - }) - - exporter := newTestExporter(t, server.URL) - mustSend(t, exporter, `{"message": "test1"}`) - - time.Sleep(200 * time.Millisecond) - assert.Equal(t, int64(1), attempts.Load()) - }) - - t.Run("only retry failed items", func(t *testing.T) { - var attempts [3]int - var wg sync.WaitGroup - wg.Add(1) - - const retryIdx = 1 - - server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { - resp := make([]itemResponse, len(docs)) - for i, doc := range docs { - resp[i].Status = http.StatusOK - - var idxInfo struct{ Idx int } - if err := json.Unmarshal(doc.Document, &idxInfo); err != nil { - panic(err) - } - - if idxInfo.Idx == retryIdx { - if attempts[retryIdx] == 0 { - resp[i].Status = http.StatusTooManyRequests - } else { - defer wg.Done() - } - } - attempts[idxInfo.Idx]++ - } - return resp, nil - }) - - exporter := newTestExporter(t, server.URL, func(cfg *Config) { - cfg.Flush.Interval = 50 * time.Millisecond - cfg.Retry.InitialInterval = 1 * time.Millisecond - cfg.Retry.MaxInterval = 10 * time.Millisecond - }) - mustSend(t, exporter, `{"message": "test1", "idx": 0}`) - mustSend(t, exporter, `{"message": "test2", "idx": 1}`) - mustSend(t, exporter, `{"message": "test3", "idx": 2}`) - - wg.Wait() // <- this blocks forever if the event is not retried - - assert.Equal(t, [3]int{1, 2, 1}, attempts) - }) -} - -func newTestExporter(t *testing.T, url string, fns ...func(*Config)) *elasticsearchLogsExporter { - exporter, err := newLogsExporter(zaptest.NewLogger(t), withTestExporterConfig(fns...)(url)) - require.NoError(t, err) - - t.Cleanup(func() { - require.NoError(t, exporter.Shutdown(context.TODO())) - }) - return exporter -} - -func withTestExporterConfig(fns ...func(*Config)) func(string) *Config { - return func(url string) *Config { - var configMods []func(*Config) - configMods = append(configMods, func(cfg *Config) { - cfg.Endpoints = []string{url} - cfg.NumWorkers = 1 - cfg.Flush.Interval = 10 * time.Millisecond - }) - configMods = append(configMods, fns...) - return withDefaultConfig(configMods...) - } -} - -func mustSend(t *testing.T, exporter *elasticsearchLogsExporter, contents string) { - err := pushDocuments(context.TODO(), exporter.index, []byte(contents), exporter.bulkIndexer) - require.NoError(t, err) -} - -// send trace with span & resource attributes -func mustSendLogsWithAttributes(t *testing.T, exporter *elasticsearchLogsExporter, attrMp map[string]string, resMp map[string]string, body string) { - logs := newLogsWithAttributeAndResourceMap(attrMp, resMp) - resSpans := logs.ResourceLogs().At(0) - scopeLog := resSpans.ScopeLogs().At(0) - logRecords := scopeLog.LogRecords().At(0) - logRecords.Body().SetStr(body) - - err := exporter.pushLogRecord(context.TODO(), resSpans.Resource(), logRecords, scopeLog.Scope()) - require.NoError(t, err) -} diff --git a/exporter/elasticsearchexporter/testdata/config-use-deprecated-index_option.yaml b/exporter/elasticsearchexporter/testdata/config-use-deprecated-index_option.yaml deleted file mode 100644 index 9bf57e686b00..000000000000 --- a/exporter/elasticsearchexporter/testdata/config-use-deprecated-index_option.yaml +++ /dev/null @@ -1,38 +0,0 @@ -elasticsearch/trace: - tls: - insecure: false - endpoints: [ https://elastic.example.com:9200 ] - timeout: 2m - cloudid: TRNMxjXlNJEt - headers: - myheader: test - traces_index: trace_index - pipeline: mypipeline - user: elastic - password: search - api_key: AvFsEiPs== - discover: - on_start: true - flush: - bytes: 10485760 - retry: - max_requests: 5 -elasticsearch/log: - tls: - insecure: false - endpoints: [ http://localhost:9200 ] - index: my_log_index - timeout: 2m - cloudid: TRNMxjXlNJEt - headers: - myheader: test - pipeline: mypipeline - user: elastic - password: search - api_key: AvFsEiPs== - discover: - on_start: true - flush: - bytes: 10485760 - retry: - max_requests: 5 diff --git a/exporter/elasticsearchexporter/testdata/config.yaml b/exporter/elasticsearchexporter/testdata/config.yaml index ba323702ea43..b75fda2cf65a 100644 --- a/exporter/elasticsearchexporter/testdata/config.yaml +++ b/exporter/elasticsearchexporter/testdata/config.yaml @@ -5,7 +5,6 @@ elasticsearch/trace: insecure: false endpoints: [https://elastic.example.com:9200] timeout: 2m - cloudid: TRNMxjXlNJEt headers: myheader: test traces_index: trace_index @@ -28,7 +27,6 @@ elasticsearch/log: endpoints: [http://localhost:9200] logs_index: my_log_index timeout: 2m - cloudid: TRNMxjXlNJEt headers: myheader: test pipeline: mypipeline @@ -54,3 +52,8 @@ elasticsearch/raw: endpoints: [http://localhost:9200] mapping: mode: raw +elasticsearch/cloudid: + cloudid: foo:YmFyLmNsb3VkLmVzLmlvJGFiYzEyMyRkZWY0NTY= +elasticsearch/deprecated_index: + endpoints: [https://elastic.example.com:9200] + index: my_log_index diff --git a/exporter/elasticsearchexporter/traces_exporter_test.go b/exporter/elasticsearchexporter/traces_exporter_test.go deleted file mode 100644 index c5490398a56c..000000000000 --- a/exporter/elasticsearchexporter/traces_exporter_test.go +++ /dev/null @@ -1,479 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package elasticsearchexporter - -import ( - "context" - "encoding/json" - "errors" - "fmt" - "net/http" - "runtime" - "strings" - "sync" - "sync/atomic" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.uber.org/zap" - "go.uber.org/zap/zaptest" -) - -func TestTracesExporter_New(t *testing.T) { - type validate func(*testing.T, *elasticsearchTracesExporter, error) - - success := func(t *testing.T, exporter *elasticsearchTracesExporter, err error) { - require.NoError(t, err) - require.NotNil(t, exporter) - } - successWithInternalModel := func(expectedModel *encodeModel) validate { - return func(t *testing.T, exporter *elasticsearchTracesExporter, err error) { - assert.NoError(t, err) - assert.EqualValues(t, expectedModel, exporter.model) - } - } - - failWith := func(want error) validate { - return func(t *testing.T, exporter *elasticsearchTracesExporter, err error) { - require.Nil(t, exporter) - require.Error(t, err) - if !errors.Is(err, want) { - t.Fatalf("Expected error '%v', but got '%v'", want, err) - } - } - } - - failWithMessage := func(msg string) validate { - return func(t *testing.T, exporter *elasticsearchTracesExporter, err error) { - require.Nil(t, exporter) - require.Error(t, err) - require.Contains(t, err.Error(), msg) - } - } - - tests := map[string]struct { - config *Config - want validate - env map[string]string - }{ - "no endpoint": { - config: withDefaultConfig(), - want: failWith(errConfigNoEndpoint), - }, - "create from default config with ELASTICSEARCH_URL environment variable": { - config: withDefaultConfig(), - want: success, - env: map[string]string{defaultElasticsearchEnvName: "localhost:9200"}, - }, - "create from default with endpoints": { - config: withDefaultConfig(func(cfg *Config) { - cfg.Endpoints = []string{"test:9200"} - }), - want: success, - }, - "create with cloudid": { - config: withDefaultConfig(func(cfg *Config) { - cfg.CloudID = "foo:YmFyLmNsb3VkLmVzLmlvJGFiYzEyMyRkZWY0NTY=" - }), - want: success, - }, - "create with invalid cloudid": { - config: withDefaultConfig(func(cfg *Config) { - cfg.CloudID = "invalid" - }), - want: failWithMessage("cannot parse CloudID"), - }, - "fail if endpoint and cloudid are set": { - config: withDefaultConfig(func(cfg *Config) { - cfg.Endpoints = []string{"test:9200"} - cfg.CloudID = "foo:YmFyLmNsb3VkLmVzLmlvJGFiYzEyMyRkZWY0NTY=" - }), - want: failWithMessage("Addresses and CloudID are set"), - }, - "create with custom dedup and dedot values": { - config: withDefaultConfig(func(cfg *Config) { - cfg.Endpoints = []string{"test:9200"} - cfg.Mapping.Dedot = false - cfg.Mapping.Dedup = true - }), - want: successWithInternalModel(&encodeModel{dedot: false, dedup: true, mode: MappingNone}), - }, - } - - for name, test := range tests { - t.Run(name, func(t *testing.T) { - env := test.env - if len(env) == 0 { - env = map[string]string{defaultElasticsearchEnvName: ""} - } - - for k, v := range env { - t.Setenv(k, v) - } - - exporter, err := newTracesExporter(zap.NewNop(), test.config) - if exporter != nil { - defer func() { - require.NoError(t, exporter.Shutdown(context.TODO())) - }() - } - - test.want(t, exporter, err) - }) - } -} - -func TestExporter_PushTraceRecord(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("skipping test on Windows, see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/14759") - } - - t.Run("publish with success", func(t *testing.T) { - rec := newBulkRecorder() - server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { - rec.Record(docs) - return itemsAllOK(docs) - }) - - exporter := newTestTracesExporter(t, server.URL) - mustSendTraces(t, exporter, `{"message": "test1"}`) - mustSendTraces(t, exporter, `{"message": "test1"}`) - - rec.WaitItems(2) - }) - - t.Run("publish with dynamic index", func(t *testing.T) { - - rec := newBulkRecorder() - var ( - prefix = "resprefix-" - suffix = "-attrsuffix" - index = "someindex" - ) - - server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { - rec.Record(docs) - - data, err := docs[0].Action.MarshalJSON() - assert.NoError(t, err) - - jsonVal := map[string]any{} - err = json.Unmarshal(data, &jsonVal) - assert.NoError(t, err) - - create := jsonVal["create"].(map[string]any) - - expected := fmt.Sprintf("%s%s%s", prefix, index, suffix) - assert.Equal(t, expected, create["_index"].(string)) - - return itemsAllOK(docs) - }) - - exporter := newTestTracesExporter(t, server.URL, func(cfg *Config) { - cfg.TracesIndex = index - cfg.TracesDynamicIndex.Enabled = true - }) - - mustSendTracesWithAttributes(t, exporter, - map[string]string{ - indexPrefix: "attrprefix-", - indexSuffix: suffix, - }, - map[string]string{ - indexPrefix: prefix, - }, - ) - - rec.WaitItems(1) - }) - - t.Run("publish with logstash format index", func(t *testing.T) { - var defaultCfg Config - - rec := newBulkRecorder() - server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { - rec.Record(docs) - - data, err := docs[0].Action.MarshalJSON() - assert.NoError(t, err) - - jsonVal := map[string]any{} - err = json.Unmarshal(data, &jsonVal) - assert.NoError(t, err) - - create := jsonVal["create"].(map[string]any) - - assert.Equal(t, strings.Contains(create["_index"].(string), defaultCfg.TracesIndex), true) - - return itemsAllOK(docs) - }) - - exporter := newTestTracesExporter(t, server.URL, func(cfg *Config) { - cfg.LogstashFormat.Enabled = true - cfg.TracesIndex = "not-used-index" - defaultCfg = *cfg - }) - - mustSendTracesWithAttributes(t, exporter, nil, nil) - - rec.WaitItems(1) - }) - - t.Run("publish with logstash format index and dynamic index enabled ", func(t *testing.T) { - var ( - prefix = "resprefix-" - suffix = "-attrsuffix" - index = "someindex" - ) - - rec := newBulkRecorder() - server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { - rec.Record(docs) - - data, err := docs[0].Action.MarshalJSON() - assert.NoError(t, err) - - jsonVal := map[string]any{} - err = json.Unmarshal(data, &jsonVal) - assert.NoError(t, err) - - create := jsonVal["create"].(map[string]any) - expected := fmt.Sprintf("%s%s%s", prefix, index, suffix) - - assert.Equal(t, strings.Contains(create["_index"].(string), expected), true) - - return itemsAllOK(docs) - }) - - exporter := newTestTracesExporter(t, server.URL, func(cfg *Config) { - cfg.TracesIndex = index - cfg.TracesDynamicIndex.Enabled = true - cfg.LogstashFormat.Enabled = true - }) - - mustSendTracesWithAttributes(t, exporter, - map[string]string{ - indexPrefix: "attrprefix-", - indexSuffix: suffix, - }, - map[string]string{ - indexPrefix: prefix, - }, - ) - rec.WaitItems(1) - }) - - t.Run("retry http request", func(t *testing.T) { - failures := 0 - rec := newBulkRecorder() - server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { - if failures == 0 { - failures++ - return nil, &httpTestError{status: http.StatusTooManyRequests, message: "oops"} - } - - rec.Record(docs) - return itemsAllOK(docs) - }) - - exporter := newTestTracesExporter(t, server.URL) - mustSendTraces(t, exporter, `{"message": "test1"}`) - - rec.WaitItems(1) - }) - - t.Run("no retry", func(t *testing.T) { - configurations := map[string]func(string) *Config{ - "max_requests limited": withTestExporterConfig(func(cfg *Config) { - cfg.Retry.MaxRequests = 1 - cfg.Retry.InitialInterval = 1 * time.Millisecond - cfg.Retry.MaxInterval = 10 * time.Millisecond - }), - "retry.enabled is false": withTestExporterConfig(func(cfg *Config) { - cfg.Retry.Enabled = false - cfg.Retry.MaxRequests = 10 - cfg.Retry.InitialInterval = 1 * time.Millisecond - cfg.Retry.MaxInterval = 10 * time.Millisecond - }), - } - - handlers := map[string]func(attempts *atomic.Int64) bulkHandler{ - "fail http request": func(attempts *atomic.Int64) bulkHandler { - return func([]itemRequest) ([]itemResponse, error) { - attempts.Add(1) - return nil, &httpTestError{message: "oops"} - } - }, - "fail item": func(attempts *atomic.Int64) bulkHandler { - return func(docs []itemRequest) ([]itemResponse, error) { - attempts.Add(1) - return itemsReportStatus(docs, http.StatusTooManyRequests) - } - }, - } - - for name, handler := range handlers { - handler := handler - t.Run(name, func(t *testing.T) { - t.Parallel() - for name, configurer := range configurations { - configurer := configurer - t.Run(name, func(t *testing.T) { - t.Parallel() - attempts := &atomic.Int64{} - server := newESTestServer(t, handler(attempts)) - - testConfig := configurer(server.URL) - exporter := newTestTracesExporter(t, server.URL, func(cfg *Config) { *cfg = *testConfig }) - mustSendTraces(t, exporter, `{"message": "test1"}`) - - time.Sleep(200 * time.Millisecond) - assert.Equal(t, int64(1), attempts.Load()) - }) - } - }) - } - }) - - t.Run("do not retry invalid request", func(t *testing.T) { - attempts := &atomic.Int64{} - server := newESTestServer(t, func(_ []itemRequest) ([]itemResponse, error) { - attempts.Add(1) - return nil, &httpTestError{message: "oops", status: http.StatusBadRequest} - }) - - exporter := newTestTracesExporter(t, server.URL) - mustSendTraces(t, exporter, `{"message": "test1"}`) - - time.Sleep(200 * time.Millisecond) - assert.Equal(t, int64(1), attempts.Load()) - }) - - t.Run("retry single item", func(t *testing.T) { - var attempts int - rec := newBulkRecorder() - server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { - attempts++ - - if attempts == 1 { - return itemsReportStatus(docs, http.StatusTooManyRequests) - } - - rec.Record(docs) - return itemsAllOK(docs) - }) - - exporter := newTestTracesExporter(t, server.URL) - mustSendTraces(t, exporter, `{"message": "test1"}`) - - rec.WaitItems(1) - }) - - t.Run("do not retry bad item", func(t *testing.T) { - attempts := &atomic.Int64{} - server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { - attempts.Add(1) - return itemsReportStatus(docs, http.StatusBadRequest) - }) - - exporter := newTestTracesExporter(t, server.URL) - mustSendTraces(t, exporter, `{"message": "test1"}`) - - time.Sleep(200 * time.Millisecond) - assert.Equal(t, int64(1), attempts.Load()) - }) - - t.Run("only retry failed items", func(t *testing.T) { - var attempts [3]int - var wg sync.WaitGroup - wg.Add(1) - - const retryIdx = 1 - - server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { - resp := make([]itemResponse, len(docs)) - for i, doc := range docs { - resp[i].Status = http.StatusOK - - var idxInfo struct{ Idx int } - if err := json.Unmarshal(doc.Document, &idxInfo); err != nil { - panic(err) - } - - if idxInfo.Idx == retryIdx { - if attempts[retryIdx] == 0 { - resp[i].Status = http.StatusTooManyRequests - } else { - defer wg.Done() - } - } - attempts[idxInfo.Idx]++ - } - return resp, nil - }) - - exporter := newTestTracesExporter(t, server.URL, func(cfg *Config) { - cfg.Flush.Interval = 50 * time.Millisecond - cfg.Retry.InitialInterval = 1 * time.Millisecond - cfg.Retry.MaxInterval = 10 * time.Millisecond - }) - mustSendTraces(t, exporter, `{"message": "test1", "idx": 0}`) - mustSendTraces(t, exporter, `{"message": "test2", "idx": 1}`) - mustSendTraces(t, exporter, `{"message": "test3", "idx": 2}`) - - wg.Wait() // <- this blocks forever if the trace is not retried - - assert.Equal(t, [3]int{1, 2, 1}, attempts) - }) -} -func newTestLogsExporter(t *testing.T, url string, fns ...func(*Config)) *elasticsearchLogsExporter { - exporter, err := newLogsExporter(zaptest.NewLogger(t), withTestTracesExporterConfig(fns...)(url)) - require.NoError(t, err) - - t.Cleanup(func() { - require.NoError(t, exporter.Shutdown(context.TODO())) - }) - return exporter -} - -func newTestTracesExporter(t *testing.T, url string, fns ...func(*Config)) *elasticsearchTracesExporter { - exporter, err := newTracesExporter(zaptest.NewLogger(t), withTestTracesExporterConfig(fns...)(url)) - require.NoError(t, err) - - t.Cleanup(func() { - require.NoError(t, exporter.Shutdown(context.TODO())) - }) - return exporter -} - -func withTestTracesExporterConfig(fns ...func(*Config)) func(string) *Config { - return func(url string) *Config { - var configMods []func(*Config) - configMods = append(configMods, func(cfg *Config) { - cfg.Endpoints = []string{url} - cfg.NumWorkers = 1 - cfg.Flush.Interval = 10 * time.Millisecond - }) - configMods = append(configMods, fns...) - return withDefaultConfig(configMods...) - } -} - -func mustSendTraces(t *testing.T, exporter *elasticsearchTracesExporter, contents string) { - err := pushDocuments(context.TODO(), exporter.index, []byte(contents), exporter.bulkIndexer) - require.NoError(t, err) -} - -// send trace with span & resource attributes -func mustSendTracesWithAttributes(t *testing.T, exporter *elasticsearchTracesExporter, attrMp map[string]string, resMp map[string]string) { - traces := newTracesWithAttributeAndResourceMap(attrMp, resMp) - resSpans := traces.ResourceSpans().At(0) - span := resSpans.ScopeSpans().At(0).Spans().At(0) - scope := resSpans.ScopeSpans().At(0).Scope() - - err := exporter.pushTraceRecord(context.TODO(), resSpans.Resource(), span, scope) - require.NoError(t, err) -} diff --git a/exporter/elasticsearchexporter/utils_test.go b/exporter/elasticsearchexporter/utils_test.go index e53fedfbdd89..de3d60418b24 100644 --- a/exporter/elasticsearchexporter/utils_test.go +++ b/exporter/elasticsearchexporter/utils_test.go @@ -125,23 +125,9 @@ func (r *bulkRecorder) countItems() (count int) { } func newESTestServer(t *testing.T, bulkHandler bulkHandler) *httptest.Server { - mux := http.NewServeMux() - - mux.HandleFunc("/", handleErr(func(w http.ResponseWriter, _ *http.Request) error { - w.Header().Add("X-Elastic-Product", "Elasticsearch") - - enc := json.NewEncoder(w) - return enc.Encode(map[string]any{ - "version": map[string]any{ - "number": currentESVersion, - }, - }) - })) - - mux.HandleFunc("/_bulk", handleErr(func(w http.ResponseWriter, req *http.Request) error { + return newESTestServerBulkHandlerFunc(t, handleErr(func(w http.ResponseWriter, req *http.Request) error { tsStart := time.Now() var items []itemRequest - w.Header().Add("X-Elastic-Product", "Elasticsearch") dec := json.NewDecoder(req.Body) for dec.More() { @@ -171,6 +157,24 @@ func newESTestServer(t *testing.T, bulkHandler bulkHandler) *httptest.Server { enc := json.NewEncoder(w) return enc.Encode(bulkResult{Took: took, Items: resp, HasErrors: itemsHasError(resp)}) })) +} + +func newESTestServerBulkHandlerFunc(t *testing.T, handler http.HandlerFunc) *httptest.Server { + mux := http.NewServeMux() + mux.HandleFunc("/", handleErr(func(w http.ResponseWriter, _ *http.Request) error { + w.Header().Add("X-Elastic-Product", "Elasticsearch") + + enc := json.NewEncoder(w) + return enc.Encode(map[string]any{ + "version": map[string]any{ + "number": currentESVersion, + }, + }) + })) + mux.HandleFunc("/_bulk", func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("X-Elastic-Product", "Elasticsearch") + handler.ServeHTTP(w, r) + }) server := httptest.NewServer(mux) t.Cleanup(server.Close)