[exporter/elasticsearch] Add OTel mapping mode for metrics #34248

Status: Merged (35 commits, Aug 19, 2024)

Commits
5815374  Add metrics grouping for otel mode (carsonip, Jul 25, 2024)
c816d4d  Add test (carsonip, Jul 25, 2024)
50a46fa  Add hack (carsonip, Jul 25, 2024)
786d0f0  Emit dynamic templates (carsonip, Jul 25, 2024)
4d03f98  Fix dynamic template name (carsonip, Jul 26, 2024)
98f5d7c  Workaround tsdb not supporting bool dimension (carsonip, Jul 26, 2024)
9d806e9  Temporarily use start_time instead of start_timestamp (carsonip, Jul 26, 2024)
2b5c24f  Revert to use start_timestamp (carsonip, Jul 26, 2024)
c8663a0  Add complex object to fix histogram falling into doc _ignored (carsonip, Jul 29, 2024)
fdcf8ca  Only emit start_timestamp when it is non-zero (carsonip, Jul 29, 2024)
91e10ae  Fix otel test (carsonip, Jul 29, 2024)
76ad969  Add sum to test (carsonip, Jul 29, 2024)
552a0b2  Fix tests (carsonip, Jul 29, 2024)
53b0253  Revert "Workaround tsdb not supporting bool dimension" (carsonip, Jul 30, 2024)
51bca38  Merge branch 'main' into otel-mode-metrics (carsonip, Aug 12, 2024)
8ded84c  Bump go-docappender to v2.3.0 (carsonip, Aug 12, 2024)
497fbb5  go mod tidy (carsonip, Aug 12, 2024)
bcae04a  Remove go mod replace (carsonip, Aug 12, 2024)
6246e32  Fix NumberDataPointValueTypeEmpty dynamicTemplates (carsonip, Aug 12, 2024)
c48624f  Rename to KindUnflattenableObject (carsonip, Aug 12, 2024)
66b3cf4  Merge branch 'main' into otel-mode-metrics (carsonip, Aug 12, 2024)
0190df2  Fix typo (carsonip, Aug 13, 2024)
2998221  Fix quantiles (carsonip, Aug 13, 2024)
ff9d86a  Dynamically map summary (carsonip, Aug 13, 2024)
8ccf3a1  Use summary_metrics instead of summary_gauge (carsonip, Aug 13, 2024)
88128ac  Add FIXME (carsonip, Aug 13, 2024)
7086ea3  Exclude DS attr in metric hash (carsonip, Aug 13, 2024)
e371e53  Fix test (carsonip, Aug 13, 2024)
69ac0b3  Change quantiles from FIXME to TODO (carsonip, Aug 13, 2024)
dd1a70e  Merge branch 'main' into otel-mode-metrics (carsonip, Aug 13, 2024)
03228a4  Add changelog (carsonip, Aug 13, 2024)
2290cc0  Revert go mod diff (carsonip, Aug 13, 2024)
5c7d991  Make linter happy (carsonip, Aug 13, 2024)
20ed43f  Make ECS use mapHashExcludeDataStreamAttr (carsonip, Aug 14, 2024)
bd7cbd7  Describe metricDpToDynamicTemplate (carsonip, Aug 14, 2024)

Files changed

27 changes: 27 additions & 0 deletions .chloggen/elasticsearchexporter_otel-mode-metrics.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add OTel mapping mode for metrics

Review comment (Contributor): that's a bit mysterious, can you add to what mapping we map to?

Suggested change:
- note: Add OTel mapping mode for metrics
+ note: Add support for Elastic ECS data model mapping from OpenTelemetry metrics. See <URL> for the mapping.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34248]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
13 changes: 7 additions & 6 deletions exporter/elasticsearchexporter/bulkindexer.go
@@ -28,7 +28,7 @@ type bulkIndexer interface {
 
 type bulkIndexerSession interface {
     // Add adds a document to the bulk indexing session.
-    Add(ctx context.Context, index string, document io.WriterTo) error
+    Add(ctx context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string) error
 
     // End must be called on the session object once it is no longer
     // needed, in order to release any associated resources.
@@ -108,8 +108,8 @@ type syncBulkIndexerSession struct {
 }
 
 // Add adds an item to the sync bulk indexer session.
-func (s *syncBulkIndexerSession) Add(_ context.Context, index string, document io.WriterTo) error {
-    return s.bi.Add(docappender.BulkIndexerItem{Index: index, Body: document})
+func (s *syncBulkIndexerSession) Add(_ context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string) error {
+    return s.bi.Add(docappender.BulkIndexerItem{Index: index, Body: document, DynamicTemplates: dynamicTemplates})
 }
 
 // End is a no-op.
@@ -243,10 +243,11 @@ func (a *asyncBulkIndexer) Close(ctx context.Context) error {
 // Add adds an item to the async bulk indexer session.
 //
 // Adding an item after a call to Close() will panic.
-func (s asyncBulkIndexerSession) Add(ctx context.Context, index string, document io.WriterTo) error {
+func (s asyncBulkIndexerSession) Add(ctx context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string) error {
     item := docappender.BulkIndexerItem{
-        Index: index,
-        Body:  document,
+        Index:            index,
+        Body:             document,
+        DynamicTemplates: dynamicTemplates,
     }
     select {
     case <-ctx.Done():
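
Note on the new parameter: the dynamicTemplates map is handed to go-docappender's BulkIndexerItem.DynamicTemplates and, per the expected Action lines in the otel-mode test further down, surfaces as the dynamic_templates field of the bulk create action. The standalone Go program below only sketches that wire format; bulkCreateAction is a hypothetical helper for illustration, not code from this PR or from go-docappender.

package main

import (
    "encoding/json"
    "fmt"
)

// bulkCreateAction builds the metadata line that precedes a document in an
// Elasticsearch _bulk request, including the per-document dynamic_templates.
func bulkCreateAction(index string, dynamicTemplates map[string]string) ([]byte, error) {
    type createMeta struct {
        Index            string            `json:"_index"`
        DynamicTemplates map[string]string `json:"dynamic_templates,omitempty"`
    }
    return json.Marshal(map[string]createMeta{"create": {Index: index, DynamicTemplates: dynamicTemplates}})
}

func main() {
    action, err := bulkCreateAction("metrics-generic.otel-default", map[string]string{
        "metrics.metric.sum": "gauge_double", // document field path -> dynamic template name
    })
    if err != nil {
        panic(err)
    }
    fmt.Println(string(action))
    // {"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.metric.sum":"gauge_double"}}}
}
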
6 changes: 3 additions & 3 deletions exporter/elasticsearchexporter/bulkindexer_test.go
@@ -67,7 +67,7 @@ func TestAsyncBulkIndexer_flushOnClose(t *testing.T) {
     session, err := bulkIndexer.StartSession(context.Background())
     require.NoError(t, err)
 
-    assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`)))
+    assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil))
     assert.NoError(t, bulkIndexer.Close(context.Background()))
     assert.Equal(t, int64(1), bulkIndexer.stats.docsIndexed.Load())
 }
@@ -106,7 +106,7 @@ func TestAsyncBulkIndexer_flush(t *testing.T) {
     session, err := bulkIndexer.StartSession(context.Background())
     require.NoError(t, err)
 
-    assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`)))
+    assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil))
     // should flush
     time.Sleep(100 * time.Millisecond)
     assert.Equal(t, int64(1), bulkIndexer.stats.docsIndexed.Load())
@@ -164,7 +164,7 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) {
     session, err := bulkIndexer.StartSession(context.Background())
     require.NoError(t, err)
 
-    assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`)))
+    assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil))
     // should flush
     time.Sleep(100 * time.Millisecond)
     assert.Equal(t, int64(0), bulkIndexer.stats.docsIndexed.Load())
9 changes: 5 additions & 4 deletions exporter/elasticsearchexporter/exporter.go
@@ -175,7 +175,7 @@ func (e *elasticsearchExporter) pushLogRecord(
     if err != nil {
         return fmt.Errorf("failed to encode log event: %w", err)
     }
-    return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document))
+    return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document), nil)
 }
 
 func (e *elasticsearchExporter) pushMetricsData(
@@ -215,7 +215,8 @@ func (e *elasticsearchExporter) pushMetricsData(
                 resourceDocs[fIndex] = make(map[uint32]objmodel.Document)
             }
 
-            if err = e.model.upsertMetricDataPointValue(resourceDocs[fIndex], resource, scope, metric, dp, dpValue); err != nil {
+            if err = e.model.upsertMetricDataPointValue(resourceDocs[fIndex], resource,
+                resourceMetric.SchemaUrl(), scope, scopeMetrics.SchemaUrl(), metric, dp, dpValue); err != nil {
                 return err
             }
             return nil
@@ -290,7 +291,7 @@ func (e *elasticsearchExporter) pushMetricsData(
                 errs = append(errs, err)
                 continue
             }
-            if err := session.Add(ctx, fIndex, bytes.NewReader(docBytes)); err != nil {
+            if err := session.Add(ctx, fIndex, bytes.NewReader(docBytes), doc.DynamicTemplates()); err != nil {
                 if cerr := ctx.Err(); cerr != nil {
                     return cerr
                 }
@@ -397,5 +398,5 @@ func (e *elasticsearchExporter) pushTraceRecord(
     if err != nil {
         return fmt.Errorf("failed to encode trace record: %w", err)
     }
-    return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document))
+    return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document), nil)
 }
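
In OTel mode, pushMetricsData buckets data points into shared documents keyed by index and a uint32 hash (resourceDocs[fIndex] above), and the commits "Exclude DS attr in metric hash" and "Make ECS use mapHashExcludeDataStreamAttr" indicate that data_stream.* attributes are excluded from that hash so routing attributes do not split groups. The sketch below is a rough, hypothetical illustration of such a grouping key; the exporter's actual hashing lives in the model code, which is not part of this diff.

package main

import (
    "fmt"
    "hash/fnv"
    "sort"
    "strings"
)

// dataPointGroupHash sketches a grouping key over a data point's timestamp and
// attributes, skipping data_stream.* attributes (illustrative only).
func dataPointGroupHash(timestampNanos uint64, attrs map[string]string) uint32 {
    h := fnv.New32a()
    fmt.Fprintf(h, "%d;", timestampNanos)

    keys := make([]string, 0, len(attrs))
    for k := range attrs {
        if strings.HasPrefix(k, "data_stream.") { // excluded from the hash
            continue
        }
        keys = append(keys, k)
    }
    sort.Strings(keys) // deterministic order
    for _, k := range keys {
        fmt.Fprintf(h, "%s=%s;", k, attrs[k])
    }
    return h.Sum32()
}

func main() {
    a := dataPointGroupHash(3600000000000, map[string]string{"data_stream.dataset": "generic.otel", "host.name": "h1"})
    b := dataPointGroupHash(3600000000000, map[string]string{"host.name": "h1"})
    fmt.Println(a == b) // true: data_stream.* attributes do not affect grouping
}
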
75 changes: 74 additions & 1 deletion exporter/elasticsearchexporter/exporter_test.go
@@ -496,7 +496,9 @@ func TestExporterMetrics(t *testing.T) {
         return itemsAllOK(docs)
     })
 
-    exporter := newTestMetricsExporter(t, server.URL)
+    exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) {
+        cfg.Mapping.Mode = "ecs"
+    })
     dp := pmetric.NewNumberDataPoint()
     dp.SetDoubleValue(123.456)
     dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
@@ -519,6 +521,7 @@
 
     exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) {
         cfg.MetricsIndex = "metrics.index"
+        cfg.Mapping.Mode = "ecs"
     })
     metrics := newMetricsWithAttributeAndResourceMap(
         map[string]string{
@@ -549,6 +552,7 @@
 
     exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) {
         cfg.MetricsIndex = "metrics.index"
+        cfg.Mapping.Mode = "ecs"
     })
     metrics := newMetricsWithAttributeAndResourceMap(
         map[string]string{
@@ -767,6 +771,75 @@
         assertItemsEqual(t, expected, rec.Items(), false)
     })
 
+    t.Run("otel mode", func(t *testing.T) {
+        rec := newBulkRecorder()
+        server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
+            rec.Record(docs)
+            return itemsAllOK(docs)
+        })
+
+        exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) {
+            cfg.Mapping.Mode = "otel"
+        })
+
+        metrics := pmetric.NewMetrics()
+        resourceMetrics := metrics.ResourceMetrics().AppendEmpty()
+        scopeA := resourceMetrics.ScopeMetrics().AppendEmpty()
+        metricSlice := scopeA.Metrics()
+        fooMetric := metricSlice.AppendEmpty()
+        fooMetric.SetName("metric.foo")
+        fooDps := fooMetric.SetEmptyHistogram().DataPoints()
+        fooDp := fooDps.AppendEmpty()
+        fooDp.ExplicitBounds().FromRaw([]float64{1.0, 2.0, 3.0})
+        fooDp.BucketCounts().FromRaw([]uint64{1, 2, 3, 4})
+        fooOtherDp := fooDps.AppendEmpty()
+        fooOtherDp.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(3600, 0)))
+        fooOtherDp.ExplicitBounds().FromRaw([]float64{4.0, 5.0, 6.0})
+        fooOtherDp.BucketCounts().FromRaw([]uint64{4, 5, 6, 7})
+
+        sumMetric := metricSlice.AppendEmpty()
+        sumMetric.SetName("metric.sum")
+        sumDps := sumMetric.SetEmptySum().DataPoints()
+        sumDp := sumDps.AppendEmpty()
+        sumDp.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(3600, 0)))
+        sumDp.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Unix(7200, 0)))
+        sumDp.SetDoubleValue(1.5)
+
+        summaryMetric := metricSlice.AppendEmpty()
+        summaryMetric.SetName("metric.summary")
+        summaryDps := summaryMetric.SetEmptySummary().DataPoints()
+        summaryDp := summaryDps.AppendEmpty()
+        summaryDp.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(3*3600, 0)))
+        summaryDp.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Unix(3*3600, 0)))
+        summaryDp.SetCount(1)
+        summaryDp.SetSum(1.5)
+
+        mustSendMetrics(t, exporter, metrics)
+
+        rec.WaitItems(2)
+
+        expected := []itemRequest{
+            {
+                Action:   []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.metric.foo":"histogram"}}}`),
+                Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"metric.foo":{"counts":[1,2,3,4],"values":[0.5,1.5,2.5,3]}},"resource":{"dropped_attributes_count":0,"schema_url":""}}`),
+            },
+            {
+                Action:   []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.metric.foo":"histogram"}}}`),
+                Document: []byte(`{"@timestamp":"1970-01-01T01:00:00.000000000Z","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"metric.foo":{"counts":[4,5,6,7],"values":[2,4.5,5.5,6]}},"resource":{"dropped_attributes_count":0,"schema_url":""}}`),
+            },
+            {
+                Action:   []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.metric.sum":"gauge_double"}}}`),
+                Document: []byte(`{"@timestamp":"1970-01-01T01:00:00.000000000Z","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"metric.sum":1.5},"resource":{"dropped_attributes_count":0,"schema_url":""},"start_timestamp":"1970-01-01T02:00:00.000000000Z"}`),
+            },
+            {
+                Action:   []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.metric.summary":"summary_metrics"}}}`),
+                Document: []byte(`{"@timestamp":"1970-01-01T03:00:00.000000000Z","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"metric.summary":{"sum":1.5,"value_count":1}},"resource":{"dropped_attributes_count":0,"schema_url":""},"start_timestamp":"1970-01-01T03:00:00.000000000Z"}`),
+            },
+        }
+
+        assertItemsEqual(t, expected, rec.Items(), false)
+    })
+
     t.Run("publish summary", func(t *testing.T) {
         rec := newBulkRecorder()
         server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
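
A note on the expected histogram documents in the otel-mode test: Elasticsearch's histogram field type stores parallel counts and values arrays, so each OTLP bucket needs a representative value. Matching the expectations above (bounds [1,2,3] -> values [0.5,1.5,2.5,3]; bounds [4,5,6] -> values [2,4.5,5.5,6]), the sketch below assumes half of the lowest bound for the underflow bucket, midpoints for inner buckets, and the highest bound for the overflow bucket; the exporter's actual conversion is not part of this diff and may treat edge cases such as negative or empty bounds differently.

package main

import "fmt"

// histogramValues derives a representative value per bucket from OTLP explicit
// bounds (assumes at least one bound; illustrative only).
func histogramValues(bounds []float64) []float64 {
    values := make([]float64, len(bounds)+1)
    for i := range values {
        switch {
        case i == 0:
            values[i] = bounds[0] / 2 // underflow bucket (-inf, bounds[0]]
        case i == len(bounds):
            values[i] = bounds[len(bounds)-1] // overflow bucket (bounds[last], +inf)
        default:
            values[i] = (bounds[i-1] + bounds[i]) / 2 // midpoint of (bounds[i-1], bounds[i]]
        }
    }
    return values
}

func main() {
    fmt.Println(histogramValues([]float64{1, 2, 3})) // [0.5 1.5 2.5 3]
    fmt.Println(histogramValues([]float64{4, 5, 6})) // [2 4.5 5.5 6]
}
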
32 changes: 29 additions & 3 deletions exporter/elasticsearchexporter/internal/objmodel/objmodel.go
@@ -34,6 +34,7 @@ package objmodel // import "github.com/open-telemetry/opentelemetry-collector-co
 import (
     "encoding/hex"
     "io"
+    "maps"
     "math"
     "sort"
     "strings"
@@ -48,7 +49,8 @@ import (
 // Document is an intermediate representation for converting open telemetry records with arbitrary attributes
 // into a JSON document that can be processed by Elasticsearch.
 type Document struct {
-    fields []field
+    fields           []field
+    dynamicTemplates map[string]string
 }
 
 type field struct {
@@ -81,6 +83,7 @@ const (
     KindObject
     KindTimestamp
     KindIgnore
+    KindUnflattenableObject // Unflattenable object is an object that should not be flattened at serialization time
 )
 
 const tsLayout = "2006-01-02T15:04:05.000000000Z"
@@ -105,13 +108,24 @@ func DocumentFromAttributesWithPath(path string, am pcommon.Map) Document {
 
     fields := make([]field, 0, am.Len())
     fields = appendAttributeFields(fields, path, am)
-    return Document{fields}
+    return Document{fields: fields}
 }
 
 func (doc *Document) Clone() *Document {
     fields := make([]field, len(doc.fields))
     copy(fields, doc.fields)
-    return &Document{fields}
+    return &Document{fields: fields, dynamicTemplates: maps.Clone(doc.dynamicTemplates)}
+}
+
+func (doc *Document) AddDynamicTemplate(path, template string) {
+    if doc.dynamicTemplates == nil {
+        doc.dynamicTemplates = make(map[string]string)
+    }
+    doc.dynamicTemplates[path] = template
+}
+
+func (doc *Document) DynamicTemplates() map[string]string {
+    return doc.dynamicTemplates
 }
 
 // AddTimestamp adds a raw timestamp value to the Document.
@@ -293,6 +307,7 @@ func (doc *Document) iterJSONFlat(w *json.Visitor, otel bool) error {
 // for current use cases and the proper fix will be slightly too complex. YAGNI.
 var otelPrefixSet = map[string]struct{}{
     "attributes.": {},
+    "metrics.":    {},
 }
 
 func (doc *Document) iterJSONDedot(w *json.Visitor, otel bool) error {
@@ -422,6 +437,12 @@ func TimestampValue(ts time.Time) Value {
     return Value{kind: KindTimestamp, ts: ts}
 }
 
+// UnflattenableObjectValue creates a unflattenable object from a map
+func UnflattenableObjectValue(m pcommon.Map) Value {
+    sub := DocumentFromAttributes(m)
+    return Value{kind: KindUnflattenableObject, doc: sub}
+}
+
 // ValueFromAttribute converts a AttributeValue into a value.
 func ValueFromAttribute(attr pcommon.Value) Value {
     switch attr.Type() {
@@ -506,6 +527,11 @@ func (v *Value) iterJSON(w *json.Visitor, dedot bool, otel bool) error {
             return w.OnNil()
         }
         return v.doc.iterJSON(w, dedot, otel)
+    case KindUnflattenableObject:
+        if len(v.doc.fields) == 0 {
+            return w.OnNil()
+        }
+        return v.doc.iterJSON(w, true, otel)
     case KindArr:
         if err := w.OnArrayStart(-1, structform.AnyType); err != nil {
             return err
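
On KindUnflattenableObject: per the diff, a value of this kind is always serialized via iterJSON with dedot=true, so it stays a single nested object instead of being flattened into dotted keys, and the otel-mode test expectations show histogram bodies as exactly that shape under metrics.<name> so the histogram dynamic template can match them (see the "Add complex object to fix histogram falling into doc _ignored" commit). The standalone sketch below only contrasts the two shapes; it is not the objmodel serializer.

package main

import (
    "encoding/json"
    "fmt"
)

func main() {
    histogram := map[string]any{
        "counts": []uint64{1, 2, 3, 4},
        "values": []float64{0.5, 1.5, 2.5, 3},
    }

    // Flattened: each leaf becomes its own dotted key. A dynamic template that
    // expects one object-valued field would not match these.
    flattened := map[string]any{
        "metrics.metric.foo.counts": histogram["counts"],
        "metrics.metric.foo.values": histogram["values"],
    }

    // Unflattened: the histogram stays one nested object under the metric name,
    // which is the shape the test expectations in this PR show.
    nested := map[string]any{
        "metrics": map[string]any{"metric.foo": histogram},
    }

    a, _ := json.Marshal(flattened)
    b, _ := json.Marshal(nested)
    fmt.Println(string(a))
    fmt.Println(string(b))
}
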