[exporter/elasticsearch] Data stream routing based on data_stream.* attributes #33794

Merged
Changes from 9 commits
25 commits
6ceef08
[exporter/elasticsearch] route based on data stream attributes
andrzej-stencel Jun 3, 2024
dc8e980
add issue number to changelog entry
andrzej-stencel Jun 25, 2024
cb88ebc
fill in missing data stream attributes
andrzej-stencel Jun 26, 2024
7f41b41
make gotidy
andrzej-stencel Jun 26, 2024
2aa1c72
Add back metrics grouping logic; Fix missing scope DS attrs
carsonip Jun 27, 2024
ad4a019
Update changelog
carsonip Jun 27, 2024
1eb99c5
Fix scope override bug
carsonip Jun 27, 2024
d134ef7
Remove ability to override DS type
carsonip Jun 27, 2024
cf382bc
Refactor tests
carsonip Jun 27, 2024
fcbdd95
Update exporter/elasticsearchexporter/model.go
carsonip Jun 28, 2024
608d88e
Rename to data_stream_router.go
carsonip Jun 28, 2024
0567fb1
Remove distinction between data_stream and prefix_suffix mode
carsonip Jun 28, 2024
aa93bde
Update README
carsonip Jun 28, 2024
b17fb7c
Merge branch 'main' into route-on-data-stream-attributes-with-grouping
carsonip Jun 28, 2024
1a5e9ba
Remove mode config completely
carsonip Jun 28, 2024
2283d74
Remove other ref to modes
carsonip Jun 28, 2024
d2f7d37
Fix tests
carsonip Jun 28, 2024
17a0e2b
Clarify fallback behavior
carsonip Jun 28, 2024
8ac0cb1
Update changelog
carsonip Jun 28, 2024
1b7377d
Update issues
carsonip Jun 28, 2024
4b3b73b
Try to fix changelog
carsonip Jun 28, 2024
1470c92
Remove unused func
carsonip Jun 28, 2024
88c6687
Rename func
carsonip Jun 28, 2024
7bed538
Make linter happy
carsonip Jun 28, 2024
a61917f
Make linter happy again
carsonip Jun 28, 2024
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: exporter/elasticsearch

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add data stream routing; Add metrics grouping

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33755, 33756]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
58 changes: 40 additions & 18 deletions exporter/elasticsearchexporter/README.md
@@ -14,7 +14,7 @@
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
<!-- end autogenerated section -->

This exporter supports sending OpenTelemetry logs and traces to [Elasticsearch](https://www.elastic.co/elasticsearch).
This exporter supports sending logs, metrics and traces to [Elasticsearch](https://www.elastic.co/elasticsearch).

## Configuration options

@@ -83,32 +83,52 @@ The Elasticsearch exporter supports the common [`sending_queue` settings][export
### Elasticsearch document routing

Telemetry data will be written to signal specific data streams by default:
logs to `logs-generic-default`, and traces to `traces-generic-default`.
logs to `logs-generic-default`, metrics to `metrics-generic-default`, and traces to `traces-generic-default`.
This can be customised through the following settings:

- `index` (DEPRECATED, please use `logs_index` for logs, `traces_index` for traces): The [index] or [data stream] name to publish events to.
- `index` (DEPRECATED, please use `logs_index` for logs, `metrics_index` for metrics, `traces_index` for traces): The [index] or [data stream] name to publish events to.
The default value is `logs-generic-default`.

- `logs_index`: The [index] or [data stream] name to publish events to. The default value is `logs-generic-default`
- `logs_dynamic_index` (optional):
takes resource or log record attribute named `elasticsearch.index.prefix` and `elasticsearch.index.suffix`
resulting dynamically prefixed / suffixed indexing based on `logs_index`. (priority: resource attribute > log record attribute)

- `logs_dynamic_index` (optional): uses resource or log record attributes to dynamically construct index name. See `mode` for details.
- `enabled`(default=false): Enable/Disable dynamic index for log records
- `metrics_index`: The [index] or [data stream] name to publish metrics to. The default value is `metrics-generic-default`.
- `mode` (default=`prefix_suffix`): defines how dynamic index name is constructed.
- `data_stream` - uses resource, scope or log record attributes `data_stream.dataset` and `data_stream.namespace`
to dynamically construct index name in the form `logs-${data_stream.dataset}-${data_stream.namespace}`.
Log record attributes take precedence over scope attributes, which take precedence over resource attributes.
- `prefix_suffix` - uses resource or log record attributes `elasticsearch.index.prefix` and `elasticsearch.index.suffix`
to dynamically construct index name in the form `${elasticsearch.index.prefix}${logs_index}${elasticsearch.index.suffix}`. (priority: resource attribute > log record attribute)

- `metrics_index` (optional): The [index] or [data stream] name to publish metrics to. The default value is `metrics-generic-default`.
⚠️ Note that metrics support is currently in development.
- `metrics_dynamic_index` (optional):
takes resource attributes named `elasticsearch.index.prefix` and `elasticsearch.index.suffix`
resulting dynamically prefixed / suffixed indexing based on `metrics_index`.

- `metrics_dynamic_index` (optional): uses resource, scope or data point attributes to dynamically construct index name. See `mode` for details.
⚠️ Note that metrics support is currently in development.
- `enabled`(default=false): Enable/Disable dynamic index for metrics
- `enabled`(default=true): Enable/disable dynamic index for metrics
- `mode` (default=`data_stream`): defines how dynamic index name is constructed.
- `data_stream` - uses resource, scope or data point attributes `data_stream.dataset` and `data_stream.namespace`
to dynamically construct index name in the form `metrics-${data_stream.dataset}-${data_stream.namespace}`.
Data point attributes take precedence over scope attributes, which take precedence over resource attributes.
- `prefix_suffix` - uses resource, scope or data point attributes `elasticsearch.index.prefix` and `elasticsearch.index.suffix`
to dynamically construct index name in the form `${elasticsearch.index.prefix}${metrics_index}${elasticsearch.index.suffix}`.
Data point attributes take precedence over scope attributes, which take precedence over resource attributes.

- `traces_index`: The [index] or [data stream] name to publish traces to. The default value is `traces-generic-default`.
- `traces_dynamic_index` (optional):
takes resource or span attribute named `elasticsearch.index.prefix` and `elasticsearch.index.suffix`
resulting dynamically prefixed / suffixed indexing based on `traces_index`. (priority: resource attribute > span attribute)

- `traces_dynamic_index` (optional): uses resource or span attributes to dynamically construct index name. See `mode` for details.
- `enabled`(default=false): Enable/Disable dynamic index for trace spans
- `logstash_format` (optional): Logstash format compatibility. Traces or Logs data can be written into an index in logstash format.
- `enabled`(default=false): Enable/Disable Logstash format compatibility. When `logstash_format.enabled` is `true`, the index name is composed using `traces/logs_index` or `traces/logs_dynamic_index` as prefix and the date,
e.g: If `traces/logs_index` or `traces/logs_dynamic_index` is equals to `otlp-generic-default` your index will become `otlp-generic-default-YYYY.MM.DD`.
The last string appended belongs to the date when the data is being generated.
- `mode` (default=`prefix_suffix`): defines how dynamic index name is constructed.
- `data_stream` - uses resource attributes `data_stream.dataset` and `data_stream.namespace`
to dynamically construct index name in the form `traces-${data_stream.dataset}-${data_stream.namespace}`.
Span attributes take precedence over scope attributes, which take precedence over resource attributes.
- `prefix_suffix` - uses resource or span attributes `elasticsearch.index.prefix` and `elasticsearch.index.suffix`
to dynamically construct index name in the form `${elasticsearch.index.prefix}${traces_index}${elasticsearch.index.suffix}`. (priority: resource attribute > span attribute)

- `logstash_format` (optional): Logstash format compatibility. Logs, metrics and traces can be written into an index in Logstash format.
- `enabled`(default=false): Enable/disable Logstash format compatibility. When `logstash_format.enabled` is `true`, the index name is composed using `(logs|metrics|traces)_index` or `(logs|metrics|traces)_dynamic_index` as prefix and the date as suffix,
e.g: If `logs_index` or `logs_dynamic_index` is equal to `logs-generic-default`, your index will become `logs-generic-default-YYYY.MM.DD`.
The last string appended belongs to the date when the data is being generated.
- `prefix_separator`(default=`-`): Set a separator between logstash_prefix and date.
- `date_format`(default=`%Y.%m.%d`): Time format (based on strftime) to generate the second part of the Index name.

@@ -189,6 +209,8 @@ The only metric types supported are:

Other metric types (Histogram, Exponential Histogram, Summary) are ignored.

Dynamic indexing in `data_stream` mode is enabled by default for metrics. See `metrics_dynamic_index` configuration property for details.

[confighttp]: https://github.com/open-telemetry/opentelemetry-collector/tree/main/config/confighttp/README.md#http-configuration-settings
[configtls]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md#tls-configuration-settings
[configauth]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configauth/README.md#authentication-configuration
12 changes: 10 additions & 2 deletions exporter/elasticsearchexporter/attribute.go
@@ -7,8 +7,16 @@ import "go.opentelemetry.io/collector/pdata/pcommon"

// dynamic index attribute key constants
const (
indexPrefix = "elasticsearch.index.prefix"
indexSuffix = "elasticsearch.index.suffix"
indexPrefix = "elasticsearch.index.prefix"
indexSuffix = "elasticsearch.index.suffix"
dataStreamDataset = "data_stream.dataset"
dataStreamNamespace = "data_stream.namespace"
dataStreamType = "data_stream.type"
defaultDataStreamDataset = "generic"
defaultDataStreamNamespace = "default"
defaultDataStreamTypeLogs = "logs"
defaultDataStreamTypeMetrics = "metrics"
defaultDataStreamTypeTraces = "traces"
)

// resource is prioritized higher than record attribute
29 changes: 28 additions & 1 deletion exporter/elasticsearchexporter/config.go
@@ -4,6 +4,7 @@
package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter"

import (
"encoding"
"encoding/base64"
"errors"
"fmt"
@@ -80,7 +81,33 @@ type LogstashFormatSettings struct {
}

type DynamicIndexSetting struct {
Enabled bool `mapstructure:"enabled"`
Enabled bool `mapstructure:"enabled"`
Mode DynamicIndexMode `mapstructure:"mode"`
}

type DynamicIndexMode string

const DynamicIndexModeDataStream DynamicIndexMode = "data_stream"
const DynamicIndexModePrefixSuffix DynamicIndexMode = "prefix_suffix"

var _ encoding.TextUnmarshaler = (*DynamicIndexMode)(nil)

func (m *DynamicIndexMode) UnmarshalText(text []byte) error {
if m == nil {
return errors.New("cannot unmarshal to a nil *DynamicIndexMode")
}

str := string(text)
switch str {
case string(DynamicIndexModeDataStream):
*m = DynamicIndexModeDataStream
case string(DynamicIndexModePrefixSuffix):
*m = DynamicIndexModePrefixSuffix
default:
return fmt.Errorf("unknown dynamic index mode %s", str)
}

return nil
}

// AuthenticationSettings defines user authentication related settings.
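
A small usage sketch for the `UnmarshalText` method added in this hunk. The type, constants and method are copied from the diff so the block compiles on its own; `parseMode` is a hypothetical helper added only for illustration. The sketch calls `UnmarshalText` directly; how the collector's config decoding reaches this method is not shown in the diff, so that part is left out.

```go
package main

import (
	"encoding"
	"errors"
	"fmt"
)

type DynamicIndexMode string

const (
	DynamicIndexModeDataStream   DynamicIndexMode = "data_stream"
	DynamicIndexModePrefixSuffix DynamicIndexMode = "prefix_suffix"
)

// Compile-time check that the pointer type implements encoding.TextUnmarshaler.
var _ encoding.TextUnmarshaler = (*DynamicIndexMode)(nil)

// UnmarshalText accepts only the two known mode strings (exact match).
func (m *DynamicIndexMode) UnmarshalText(text []byte) error {
	if m == nil {
		return errors.New("cannot unmarshal to a nil *DynamicIndexMode")
	}
	switch str := string(text); str {
	case string(DynamicIndexModeDataStream):
		*m = DynamicIndexModeDataStream
	case string(DynamicIndexModePrefixSuffix):
		*m = DynamicIndexModePrefixSuffix
	default:
		return fmt.Errorf("unknown dynamic index mode %s", str)
	}
	return nil
}

// parseMode is a hypothetical convenience wrapper, not part of the PR.
func parseMode(s string) (DynamicIndexMode, error) {
	var m DynamicIndexMode
	err := m.UnmarshalText([]byte(s))
	return m, err
}

func main() {
	for _, in := range []string{"data_stream", "prefix_suffix", "Data_Stream"} {
		mode, err := parseMode(in)
		fmt.Printf("%q -> mode=%q err=%v\n", in, mode, err)
	}
}
```

Because the switch compares exact strings, a value such as `Data_Stream` is rejected with the "unknown dynamic index mode" error and the mode stays at its zero value.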
104 changes: 89 additions & 15 deletions exporter/elasticsearchexporter/config_test.go
@@ -57,12 +57,24 @@ func TestConfig(t *testing.T) {
NumConsumers: exporterhelper.NewDefaultQueueSettings().NumConsumers,
QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize,
},
Endpoints: []string{"https://elastic.example.com:9200"},
Index: "",
LogsIndex: "logs-generic-default",
Endpoints: []string{"https://elastic.example.com:9200"},
Index: "",
LogsIndex: "logs-generic-default",
LogsDynamicIndex: DynamicIndexSetting{
Enabled: false,
Mode: DynamicIndexModePrefixSuffix,
},
MetricsIndex: "metrics-generic-default",
TracesIndex: "trace_index",
Pipeline: "mypipeline",
MetricsDynamicIndex: DynamicIndexSetting{
Enabled: true,
Mode: DynamicIndexModeDataStream,
},
TracesIndex: "trace_index",
TracesDynamicIndex: DynamicIndexSetting{
Enabled: false,
Mode: DynamicIndexModePrefixSuffix,
},
Pipeline: "mypipeline",
ClientConfig: confighttp.ClientConfig{
Timeout: 2 * time.Minute,
MaxIdleConns: &defaultMaxIdleConns,
@@ -110,12 +122,24 @@ func TestConfig(t *testing.T) {
NumConsumers: exporterhelper.NewDefaultQueueSettings().NumConsumers,
QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize,
},
Endpoints: []string{"http://localhost:9200"},
Index: "",
LogsIndex: "my_log_index",
Endpoints: []string{"http://localhost:9200"},
Index: "",
LogsIndex: "my_log_index",
LogsDynamicIndex: DynamicIndexSetting{
Enabled: false,
Mode: DynamicIndexModePrefixSuffix,
},
MetricsIndex: "metrics-generic-default",
TracesIndex: "traces-generic-default",
Pipeline: "mypipeline",
MetricsDynamicIndex: DynamicIndexSetting{
Enabled: true,
Mode: DynamicIndexModeDataStream,
},
TracesIndex: "traces-generic-default",
TracesDynamicIndex: DynamicIndexSetting{
Enabled: false,
Mode: DynamicIndexModePrefixSuffix,
},
Pipeline: "mypipeline",
ClientConfig: confighttp.ClientConfig{
Timeout: 2 * time.Minute,
MaxIdleConns: &defaultMaxIdleConns,
@@ -163,12 +187,24 @@ func TestConfig(t *testing.T) {
NumConsumers: exporterhelper.NewDefaultQueueSettings().NumConsumers,
QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize,
},
Endpoints: []string{"http://localhost:9200"},
Index: "",
LogsIndex: "logs-generic-default",
Endpoints: []string{"http://localhost:9200"},
Index: "",
LogsIndex: "logs-generic-default",
LogsDynamicIndex: DynamicIndexSetting{
Enabled: false,
Mode: DynamicIndexModePrefixSuffix,
},
MetricsIndex: "my_metric_index",
TracesIndex: "traces-generic-default",
Pipeline: "mypipeline",
MetricsDynamicIndex: DynamicIndexSetting{
Enabled: true,
Mode: DynamicIndexModeDataStream,
},
TracesIndex: "traces-generic-default",
TracesDynamicIndex: DynamicIndexSetting{
Enabled: false,
Mode: DynamicIndexModePrefixSuffix,
},
Pipeline: "mypipeline",
ClientConfig: confighttp.ClientConfig{
Timeout: 2 * time.Minute,
MaxIdleConns: &defaultMaxIdleConns,
@@ -239,6 +275,44 @@ func TestConfig(t *testing.T) {
cfg.Endpoint = "https://elastic.example.com:9200"
}),
},
{
id: component.NewIDWithName(metadata.Type, "data-stream-mode"),
configFile: "config.yaml",
expected: withDefaultConfig(func(cfg *Config) {
cfg.Endpoint = "https://elastic.example.com:9200"
cfg.LogsDynamicIndex = DynamicIndexSetting{
Enabled: true,
Mode: DynamicIndexModeDataStream,
}
cfg.MetricsDynamicIndex = DynamicIndexSetting{
Enabled: true,
Mode: DynamicIndexModeDataStream,
}
cfg.TracesDynamicIndex = DynamicIndexSetting{
Enabled: true,
Mode: DynamicIndexModeDataStream,
}
}),
},
{
id: component.NewIDWithName(metadata.Type, "prefix-suffix-mode"),
configFile: "config.yaml",
expected: withDefaultConfig(func(cfg *Config) {
cfg.Endpoint = "https://elastic.example.com:9200"
cfg.LogsDynamicIndex = DynamicIndexSetting{
Enabled: true,
Mode: DynamicIndexModePrefixSuffix,
}
cfg.MetricsDynamicIndex = DynamicIndexSetting{
Enabled: true,
Mode: DynamicIndexModePrefixSuffix,
}
cfg.TracesDynamicIndex = DynamicIndexSetting{
Enabled: true,
Mode: DynamicIndexModePrefixSuffix,
}
}),
},
}

for _, tt := range tests {
65 changes: 65 additions & 0 deletions exporter/elasticsearchexporter/data-stream-router.go
@@ -0,0 +1,65 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter"

import (
"fmt"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
)

// routeLogRecord returns the name of the index to send the log record to according to data stream routing attributes.
// It searches for the routing attributes on the log record, scope, and resource.
// It creates missing routing attributes on the log record if they are not found.
func routeLogRecord(
record *plog.LogRecord,
scope pcommon.InstrumentationScope,
resource pcommon.Resource,
) string {
dataSet := ensureAttribute(dataStreamDataset, defaultDataStreamDataset, record.Attributes(), scope.Attributes(), resource.Attributes())
namespace := ensureAttribute(dataStreamNamespace, defaultDataStreamNamespace, record.Attributes(), scope.Attributes(), resource.Attributes())
record.Attributes().PutStr(dataStreamType, defaultDataStreamTypeLogs)
return fmt.Sprintf("%s-%s-%s", defaultDataStreamTypeLogs, dataSet, namespace)
}

// routeDataPoint returns the name of the index to send the data point to according to data stream routing attributes.
// It searches for the routing attributes on the data point, scope, and resource.
// It creates missing routing attributes on the data point if they are not found.
func routeDataPoint(
dataPoint pmetric.NumberDataPoint,
scope pcommon.InstrumentationScope,
resource pcommon.Resource,
) string {
dataSet := ensureAttribute(dataStreamDataset, defaultDataStreamDataset, dataPoint.Attributes(), scope.Attributes(), resource.Attributes())
namespace := ensureAttribute(dataStreamNamespace, defaultDataStreamNamespace, dataPoint.Attributes(), scope.Attributes(), resource.Attributes())
dataPoint.Attributes().PutStr(dataStreamType, defaultDataStreamTypeMetrics)
return fmt.Sprintf("%s-%s-%s", defaultDataStreamTypeMetrics, dataSet, namespace)
}

// routeSpan returns the name of the index to send the span to according to data stream routing attributes.
// It searches for the routing attributes on the span, scope, and resource.
// It creates missing routing attributes on the span if they are not found.
func routeSpan(
span ptrace.Span,
scope pcommon.InstrumentationScope,
resource pcommon.Resource,
) string {
dataSet := ensureAttribute(dataStreamDataset, defaultDataStreamDataset, span.Attributes(), scope.Attributes(), resource.Attributes())
namespace := ensureAttribute(dataStreamNamespace, defaultDataStreamNamespace, span.Attributes(), scope.Attributes(), resource.Attributes())
span.Attributes().PutStr(dataStreamType, defaultDataStreamTypeTraces)
return fmt.Sprintf("%s-%s-%s", defaultDataStreamTypeTraces, dataSet, namespace)
}

func ensureAttribute(attributeName string, defaultValue string, recordAttributes, scopeAttributes, resourceAttributes pcommon.Map) string {
// Fetch value according to precedence and default.
value := getFromAttributesNew(attributeName, defaultValue, recordAttributes, scopeAttributes, resourceAttributes)

// Always set the value on the record, as record attributes have the highest precedence.
Review comment (Contributor):

Suggested change
// Always set the value on the record, as record attributes have the highest precedence.
// Set the attribute in case the default value is used.
//
// Always set the value on the record, as record attributes have the highest precedence.

I think? Should we only do this if value == defaultValue?

Reply (Contributor Author):

Scope attributes are not read in the encodeModel, meaning that only Resource and DataPoint attributes are added to the Document. This is aligned with the behavior of apm-data, where scope attributes are only read for data_stream.* and anything else is ignored, see code. Let me know if you find this behavior problematic.

> Should we only do this if value == defaultValue

Therefore, the problem of doing this is that if data_stream.* is present in scope attributes, they will be missing from the resulting doc.

Reply (Contributor):

I see, thanks. Seems odd, but not something we need to change right now.

recordAttributes.PutStr(attributeName, value)

return value
}
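
The precedence and write-back behavior discussed in the review thread can be sketched with plain maps. This is an illustration under stated assumptions, not the exporter's implementation: `attrs` stands in for `pcommon.Map`, `lookup` stands in for `getFromAttributesNew` (whose body is not shown in this diff, so first-match-wins in the order record, scope, resource is an assumption, as is treating an empty string as a present value), and `ensureIfDefault` is a hypothetical variant modelling the "only if value == defaultValue" alternative raised in review.

```go
package main

import "fmt"

// attrs is a stand-in for pcommon.Map (assumption for illustration only).
type attrs map[string]string

// lookup returns the first value found for key, checking sources in order,
// and falls back to def when no source has the key.
func lookup(key, def string, sources ...attrs) string {
	for _, src := range sources {
		if v, ok := src[key]; ok {
			return v
		}
	}
	return def
}

// ensure mirrors ensureAttribute: resolve by precedence, then always write
// the resolved value onto the record.
func ensure(key, def string, record, scope, resource attrs) string {
	v := lookup(key, def, record, scope, resource)
	record[key] = v
	return v
}

// ensureIfDefault is the alternative raised in review: write back only when
// the default value was used.
func ensureIfDefault(key, def string, record, scope, resource attrs) string {
	v := lookup(key, def, record, scope, resource)
	if v == def {
		record[key] = v
	}
	return v
}

// routeLog mirrors routeLogRecord using the map stand-ins.
func routeLog(record, scope, resource attrs) string {
	dataset := ensure("data_stream.dataset", "generic", record, scope, resource)
	namespace := ensure("data_stream.namespace", "default", record, scope, resource)
	record["data_stream.type"] = "logs"
	return fmt.Sprintf("logs-%s-%s", dataset, namespace)
}

func main() {
	scope := attrs{"data_stream.dataset": "scope_ds"}
	resource := attrs{"data_stream.dataset": "resource_ds", "data_stream.namespace": "prod"}

	record := attrs{}
	fmt.Println(routeLog(record, scope, resource))
	fmt.Println(record["data_stream.dataset"], record["data_stream.namespace"], record["data_stream.type"])

	// With the alternative, a value found only on the scope is never copied
	// to the record.
	other := attrs{}
	ensureIfDefault("data_stream.dataset", "generic", other, scope, resource)
	_, copied := other["data_stream.dataset"]
	fmt.Println("copied to record:", copied)
}
```

In this sketch the scope value wins over the resource value, so the route is `logs-scope_ds-prod` and the record ends up carrying all three `data_stream.*` attributes. With `ensureIfDefault`, the scope-only dataset is not copied to the record, which matches the author's concern that such values would be missing from a document built only from resource and record attributes.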