Skip to content

Commit

Permalink
[chore][exporter/elasticsearch] refactor, consolidate exporters (#33318)
Browse files Browse the repository at this point in the history
**Description:**

Replace the `elasticsearchTracesExporter` and
`elasticsearchLogsExporter` with a single `elasticsearchExporter` that
handles both.

Tests have been cleaned up to reduce repetition between traces and logs
exporters, e.g. config validation tests have been moved to more specific
`TestConfig*` tests. To enable this, some further validation has been
added to `Config.Validate`, which was previously done only when reaching
the go-elasticsearch code when constructing exporters.

Various tests have been updated to use the public APIs rather than
internal functions, so we're testing behaviour rather than the
implementation; this will enable further refactoring without breaking
tests.

**Link to tracking Issue:**

None

**Testing:**

Ran the unit tests. This is a non-functional change.

**Documentation:** 

N/A, non-functional change.

---------

Co-authored-by: Carson Ip <carsonip@users.noreply.github.com>
  • Loading branch information
axw and carsonip committed Jun 3, 2024
1 parent ce09071 commit 4148641
Show file tree
Hide file tree
Showing 15 changed files with 896 additions and 1,309 deletions.
2 changes: 0 additions & 2 deletions exporter/elasticsearchexporter/attribute.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Package elasticsearchexporter contains an opentelemetry-collector exporter
// for Elasticsearch.
package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter"

import "go.opentelemetry.io/collector/pdata/pcommon"
Expand Down
37 changes: 34 additions & 3 deletions exporter/elasticsearchexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter"

import (
"encoding/base64"
"errors"
"fmt"
"net/url"
"os"
"strings"
"time"
Expand Down Expand Up @@ -182,8 +184,9 @@ const (
)

var (
errConfigNoEndpoint = errors.New("endpoints or cloudid must be specified")
errConfigEmptyEndpoint = errors.New("endpoints must not include empty entries")
errConfigNoEndpoint = errors.New("endpoints or cloudid must be specified")
errConfigEmptyEndpoint = errors.New("endpoints must not include empty entries")
errConfigCloudIDMutuallyExclusive = errors.New("only one of endpoints or cloudid may be specified")
)

func (m MappingMode) String() string {
Expand Down Expand Up @@ -226,19 +229,47 @@ func (cfg *Config) Validate() error {
}
}

if cfg.CloudID != "" {
if len(cfg.Endpoints) > 0 {
return errConfigCloudIDMutuallyExclusive
}
if _, err := parseCloudID(cfg.CloudID); err != nil {
return err
}
}

for _, endpoint := range cfg.Endpoints {
if endpoint == "" {
return errConfigEmptyEndpoint
}
}

if _, ok := mappingModes[cfg.Mapping.Mode]; !ok {
return fmt.Errorf("unknown mapping mode %v", cfg.Mapping.Mode)
return fmt.Errorf("unknown mapping mode %q", cfg.Mapping.Mode)
}

return nil
}

// Based on "addrFromCloudID" in go-elasticsearch.
func parseCloudID(input string) (*url.URL, error) {
_, after, ok := strings.Cut(input, ":")
if !ok {
return nil, fmt.Errorf("invalid CloudID %q", input)
}

decoded, err := base64.StdEncoding.DecodeString(after)
if err != nil {
return nil, err
}

before, after, ok := strings.Cut(string(decoded), "$")
if !ok {
return nil, fmt.Errorf("invalid decoded CloudID %q", string(decoded))
}
return url.Parse(fmt.Sprintf("https://%s.%s", after, before))
}

// MappingMode returns the mapping.mode defined in the given cfg
// object. This method must be called after cfg.Validate() has been
// called without returning an error.
Expand Down
145 changes: 77 additions & 68 deletions exporter/elasticsearchexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,72 +18,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/metadata"
)

func TestLoad_DeprecatedIndexConfigOption(t *testing.T) {
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config-use-deprecated-index_option.yaml"))
require.NoError(t, err)
factory := NewFactory()
cfg := factory.CreateDefaultConfig()

sub, err := cm.Sub(component.NewIDWithName(metadata.Type, "log").String())
require.NoError(t, err)
require.NoError(t, component.UnmarshalConfig(sub, cfg))

assert.Equal(t, cfg, &Config{
QueueSettings: exporterhelper.QueueSettings{
Enabled: false,
NumConsumers: exporterhelper.NewDefaultQueueSettings().NumConsumers,
QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize,
},
Endpoints: []string{"http://localhost:9200"},
CloudID: "TRNMxjXlNJEt",
Index: "my_log_index",
LogsIndex: "logs-generic-default",
TracesIndex: "traces-generic-default",
Pipeline: "mypipeline",
ClientConfig: ClientConfig{
Authentication: AuthenticationSettings{
User: "elastic",
Password: "search",
APIKey: "AvFsEiPs==",
},
Timeout: 2 * time.Minute,
Headers: map[string]string{
"myheader": "test",
},
},
Discovery: DiscoverySettings{
OnStart: true,
},
Flush: FlushSettings{
Bytes: 10485760,
},
Retry: RetrySettings{
Enabled: true,
MaxRequests: 5,
InitialInterval: 100 * time.Millisecond,
MaxInterval: 1 * time.Minute,
RetryOnStatus: []int{
http.StatusTooManyRequests,
http.StatusInternalServerError,
http.StatusBadGateway,
http.StatusServiceUnavailable,
http.StatusGatewayTimeout,
},
},
Mapping: MappingsSettings{
Mode: "none",
Dedup: true,
Dedot: true,
},
LogstashFormat: LogstashFormatSettings{
Enabled: false,
PrefixSeparator: "-",
DateFormat: "%Y.%m.%d",
},
})
}

func TestLoadConfig(t *testing.T) {
func TestConfig(t *testing.T) {
t.Parallel()

defaultCfg := createDefaultConfig()
Expand Down Expand Up @@ -117,7 +52,6 @@ func TestLoadConfig(t *testing.T) {
QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize,
},
Endpoints: []string{"https://elastic.example.com:9200"},
CloudID: "TRNMxjXlNJEt",
Index: "",
LogsIndex: "logs-generic-default",
TracesIndex: "trace_index",
Expand Down Expand Up @@ -168,7 +102,6 @@ func TestLoadConfig(t *testing.T) {
QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize,
},
Endpoints: []string{"http://localhost:9200"},
CloudID: "TRNMxjXlNJEt",
Index: "",
LogsIndex: "my_log_index",
TracesIndex: "traces-generic-default",
Expand Down Expand Up @@ -219,6 +152,21 @@ func TestLoadConfig(t *testing.T) {
configFile: "config.yaml",
expected: defaultRawCfg,
},
{
id: component.NewIDWithName(metadata.Type, "cloudid"),
configFile: "config.yaml",
expected: withDefaultConfig(func(cfg *Config) {
cfg.CloudID = "foo:YmFyLmNsb3VkLmVzLmlvJGFiYzEyMyRkZWY0NTY="
}),
},
{
id: component.NewIDWithName(metadata.Type, "deprecated_index"),
configFile: "config.yaml",
expected: withDefaultConfig(func(cfg *Config) {
cfg.Endpoints = []string{"https://elastic.example.com:9200"}
cfg.Index = "my_log_index"
}),
},
}

for _, tt := range tests {
Expand All @@ -239,6 +187,67 @@ func TestLoadConfig(t *testing.T) {
}
}

// TestConfig_Validate tests the error cases of Config.Validate.
//
// Successful validation should be covered by TestConfig above.
func TestConfig_Validate(t *testing.T) {
tests := map[string]struct {
config *Config
err string
}{
"no endpoints": {
config: withDefaultConfig(),
err: "endpoints or cloudid must be specified",
},
"empty endpoint": {
config: withDefaultConfig(func(cfg *Config) {
cfg.Endpoints = []string{""}
}),
err: "endpoints must not include empty entries",
},
"invalid cloudid": {
config: withDefaultConfig(func(cfg *Config) {
cfg.CloudID = "invalid"
}),
err: `invalid CloudID "invalid"`,
},
"invalid decoded cloudid": {
config: withDefaultConfig(func(cfg *Config) {
cfg.CloudID = "foo:YWJj"
}),
err: `invalid decoded CloudID "abc"`,
},
"endpoint and cloudid both set": {
config: withDefaultConfig(func(cfg *Config) {
cfg.Endpoints = []string{"test:9200"}
cfg.CloudID = "foo:YmFyLmNsb3VkLmVzLmlvJGFiYzEyMyRkZWY0NTY="
}),
err: "only one of endpoints or cloudid may be specified",
},
"invalid mapping mode": {
config: withDefaultConfig(func(cfg *Config) {
cfg.Endpoints = []string{"test:9200"}
cfg.Mapping.Mode = "invalid"
}),
err: `unknown mapping mode "invalid"`,
},
}

for name, tt := range tests {
t.Run(name, func(t *testing.T) {
err := tt.config.Validate()
assert.EqualError(t, err, tt.err)
})
}
}

func TestConfig_Validate_Environment(t *testing.T) {
t.Setenv("ELASTICSEARCH_URL", "test:9200")
config := withDefaultConfig()
err := config.Validate()
require.NoError(t, err)
}

func withDefaultConfig(fns ...func(*Config)) *Config {
cfg := createDefaultConfig().(*Config)
for _, fn := range fns {
Expand Down
6 changes: 6 additions & 0 deletions exporter/elasticsearchexporter/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Package elasticsearchexporter contains an opentelemetry-collector exporter
// for Elasticsearch.
package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter"
2 changes: 0 additions & 2 deletions exporter/elasticsearchexporter/elasticsearch_bulk.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Package elasticsearchexporter contains an opentelemetry-collector exporter
// for Elasticsearch.
package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter"

import (
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Package elasticsearchexporter contains an opentelemetry-collector exporter
// for Elasticsearch.
package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter"

import (
Expand All @@ -12,11 +10,12 @@ import (
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"
)

type elasticsearchTracesExporter struct {
type elasticsearchExporter struct {
logger *zap.Logger

index string
Expand All @@ -28,7 +27,7 @@ type elasticsearchTracesExporter struct {
model mappingModel
}

func newTracesExporter(logger *zap.Logger, cfg *Config) (*elasticsearchTracesExporter, error) {
func newExporter(logger *zap.Logger, cfg *Config, index string, dynamicIndex bool) (*elasticsearchExporter, error) {
if err := cfg.Validate(); err != nil {
return nil, err
}
Expand All @@ -49,23 +48,74 @@ func newTracesExporter(logger *zap.Logger, cfg *Config) (*elasticsearchTracesExp
mode: cfg.MappingMode(),
}

return &elasticsearchTracesExporter{
return &elasticsearchExporter{
logger: logger,
client: client,
bulkIndexer: bulkIndexer,

index: cfg.TracesIndex,
dynamicIndex: cfg.TracesDynamicIndex.Enabled,
index: index,
dynamicIndex: dynamicIndex,
model: model,
logstashFormat: cfg.LogstashFormat,
}, nil
}

func (e *elasticsearchTracesExporter) Shutdown(ctx context.Context) error {
func (e *elasticsearchExporter) Shutdown(ctx context.Context) error {
return e.bulkIndexer.Close(ctx)
}

func (e *elasticsearchTracesExporter) pushTraceData(
func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
var errs []error

rls := ld.ResourceLogs()
for i := 0; i < rls.Len(); i++ {
rl := rls.At(i)
resource := rl.Resource()
ills := rl.ScopeLogs()
for j := 0; j < ills.Len(); j++ {
ill := ills.At(j)
scope := ill.Scope()
logs := ill.LogRecords()
for k := 0; k < logs.Len(); k++ {
if err := e.pushLogRecord(ctx, resource, logs.At(k), scope); err != nil {
if cerr := ctx.Err(); cerr != nil {
return cerr
}

errs = append(errs, err)
}
}
}
}

return errors.Join(errs...)
}

func (e *elasticsearchExporter) pushLogRecord(ctx context.Context, resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope) error {
fIndex := e.index
if e.dynamicIndex {
prefix := getFromAttributes(indexPrefix, resource, scope, record)
suffix := getFromAttributes(indexSuffix, resource, scope, record)

fIndex = fmt.Sprintf("%s%s%s", prefix, fIndex, suffix)
}

if e.logstashFormat.Enabled {
formattedIndex, err := generateIndexWithLogstashFormat(fIndex, &e.logstashFormat, time.Now())
if err != nil {
return err
}
fIndex = formattedIndex
}

document, err := e.model.encodeLog(resource, record, scope)
if err != nil {
return fmt.Errorf("Failed to encode log event: %w", err)
}
return pushDocuments(ctx, fIndex, document, e.bulkIndexer)
}

func (e *elasticsearchExporter) pushTraceData(
ctx context.Context,
td ptrace.Traces,
) error {
Expand Down Expand Up @@ -94,7 +144,7 @@ func (e *elasticsearchTracesExporter) pushTraceData(
return errors.Join(errs...)
}

func (e *elasticsearchTracesExporter) pushTraceRecord(ctx context.Context, resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) error {
func (e *elasticsearchExporter) pushTraceRecord(ctx context.Context, resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) error {
fIndex := e.index
if e.dynamicIndex {
prefix := getFromAttributes(indexPrefix, resource, scope, span)
Expand Down
Loading

0 comments on commit 4148641

Please sign in to comment.