Skip to content

Commit

Permalink
[exporter/elasticsearch] OTel mode serialization (#33290)
Browse files Browse the repository at this point in the history
**Description:**

Implements OTel (OpenTelemetry-native) mode serialization for
elasticsearch exporter.
This is an initial cut in order to get the discussion going.
This approach was tested as an internal proof of concept (POC).

It leverages Elasticsearch ```"passthrough"``` fields mapping initially
introduced in Elasticsearch 8.13 allowing users to query the
document/scope/resources attributes as top level fields, making the ECS
queries compatible with OTel semantic convention schema. Another benefit
is the simplicity of conversion of stored document from Elasticsearch
back to Otel data model format.

The document/scope/resources attributes are dynamically mapped and
stored as flattened keys.

Here is an example of index template mappings with ```"passthrough"```
fields:
```
PUT _index_template/logs_otel
{
  "priority": 250,
  "template": {
    "settings": {
      "index": {
        "lifecycle": {
          "name": "logs"
        },
        "codec": "best_compression",
        "mapping": {
          "ignore_malformed": "true"
        }
      }
    },
    "mappings": {
      "_source": {
        "enabled": true
      },
      "date_detection": false,
      "dynamic": "strict",
      "dynamic_templates": [
        {
          "all_strings_to_keywords": {
            "mapping": {
              "ignore_above": 1024,
              "type": "keyword"
            },
            "match_mapping_type": "string"
          }
        },
        {
          "complex_attributes": {
            "path_match": [
              "resource.attributes.*",
              "scope.attributes.*",
              "attributes.*"
            ],
            "match_mapping_type": "object",
            "mapping": {
              "type": "flattened"
            }
          }
        }
      ],
      "properties": {
        "@timestamp": {
          "type": "date_nanos",
          "ignore_malformed": false
        },
        "data_stream": {
          "type": "object",
          "properties": {
            "type": {
              "type": "constant_keyword"
            },
            "dataset": {
              "type": "constant_keyword"
            },
            "namespace": {
              "type": "constant_keyword"
            }
          }
        },
        "observed_timestamp": {
          "type": "date_nanos",
          "ignore_malformed": true
        },
        "severity_number": {
          "type": "long"
        },
        "severity_text": {
          "type": "keyword"
        },
        "body_text": {
          "type": "match_only_text"
        },
        "body_structured": {
          "type": "flattened"
        },
        "attributes": {
          "type": "passthrough",
          "dynamic": true,
          "priority": 2
        },
        "dropped_attributes_count": {
          "type": "long"
        },
        "trace_flags": {
          "type": "byte"
        },
        "trace_id": {
          "type": "keyword"
        },
        "span_id": {
          "type": "keyword"
        },
        "scope": {
          "properties": {
            "name": {
              "type": "keyword"
            },
            "version": {
              "type": "keyword"
            },
            "attributes": {
              "type": "passthrough",
              "dynamic": true,
              "priority": 1
            },
            "dropped_attributes_count": {
              "type": "long"
            },
            "schema_url": {
              "type": "keyword"
            }
          }
        },
        "resource": {
          "properties": {
            "dropped_attributes_count": {
              "type": "long"
            },
            "schema_url": {
              "type": "keyword"
            },
            "attributes": {
              "type": "passthrough",
              "dynamic": true,
              "priority": 0
            }
          }
        }
      }
    }
  },
  "index_patterns": [
    "logs-*.otel-*"
  ],
  "data_stream": {}
}
```

Here is an example of the auditd document in Elasticsearch abbreviated:
```
{
    "@timestamp": "2024-05-29T13:30:25.085926000Z",
    "attributes": {
        "foo": "bar",
        "some.bool": true
    },
    "body_structured": {
        "MESSAGE": "AVC apparmor=\"STATUS\" operation=\"profile_replace\" info=\"same as current profile, skipping\" profile=\"unconfined\" name=\"/usr/bin/evince-previewer\" pid=2702 comm=\"apparmor_parser\"",
        "SYSLOG_FACILITY": "4",
        "SYSLOG_IDENTIFIER": "audit",
        "_SOURCE_REALTIME_TIMESTAMP": "1716989425080000",
        "_TRANSPORT": "audit"
    },
    "dropped_attributes_count": 0,
    "observed_timestamp": "2024-05-29T14:49:26.534908898Z",
    "resource": {
        "attributes": {
            "data_stream.dataset": "auditd.otel",
            "data_stream.namespace": "default",
            "data_stream.type": "logs",
            "host.arch": "arm64",
            "host.cpu.cache.l2.size": 0,
            "host.cpu.family": "",
            "host.cpu.model.id": "0x000",
            "host.cpu.model.name": "",
            "host.cpu.stepping": "0",
            "host.cpu.vendor.id": "Apple",
            "host.id": "cae0e0147d454a80971b0b747c8b62b9",
            "host.ip": [
                "172.16.3.131",
                "fe80::20c:29ff:fe66:3012"
            ],
            "host.name": "lebuntu",
            "host.os.description": "Ubuntu 22.04.4 LTS (Jammy Jellyfish) (Linux lebuntu 5.15.0-107-generic #117-Ubuntu SMP Mon Apr 29 14:37:09 UTC 2024 aarch64)",
            "host.os.type": "linux",
            "os.description": "Ubuntu 22.04.4 LTS (Jammy Jellyfish) (Linux lebuntu 5.15.0-107-generic #117-Ubuntu SMP Mon Apr 29 14:37:09 UTC 2024 aarch64)",
            "os.type": "linux"
        },
        "dropped_attributes_count": 0,
        "schema_url": "https://opentelemetry.io/schemas/1.6.1"
    },
    "severity_number": 0,
    "trace_flags": 0
}
```

Here is an example of ECS compatible query that works on this Otel
native schema:
```
GET logs-auditd.otel-default/_search
{
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "host.name": "lebuntu"
          }
        }
      ]
    }
  }
}
```


**Link to tracking Issue:**
No tracking issue yet.

**Testing:**
Added unit test for OTel transformation.
Tested with journald OTel receiver. 

**Documentation:**
No documentation is added yet.

---------

Co-authored-by: Felix Barnsteiner <felixbarny@users.noreply.github.com>
Co-authored-by: Carson Ip <carsonip@users.noreply.github.com>
  • Loading branch information
3 people committed Jul 23, 2024
1 parent 1c98261 commit 13366cc
Show file tree
Hide file tree
Showing 10 changed files with 647 additions and 39 deletions.
27 changes: 27 additions & 0 deletions .chloggen/feature_elasticsearch_otel_model.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Introduce an experimental OTel native mapping mode for logs

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33290]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
4 changes: 4 additions & 0 deletions exporter/elasticsearchexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ behaviours, which may be configured through the following settings:
- `mode` (default=none): The fields naming mode. valid modes are:
- `none`: Use original fields and event structure from the OTLP event.
- `ecs`: Try to map fields to [Elastic Common Schema (ECS)][ECS]
- `otel`: Elastic's preferred "OTel-native" mapping mode. Uses original fields and event structure from the OTLP event.
  :warning: This mode's behavior is unstable; it is currently experimental and undergoing changes.
There's a special treatment for the following attributes: `data_stream.type`, `data_stream.dataset`, `data_stream.namespace`. Instead of serializing these values under the `*attributes.*` namespace, they're put at the root of the document, to conform with the conventions of the data stream naming scheme that maps these as `constant_keyword` fields.

- `raw`: Omit the `Attributes.` string prefixed to field names for log and
span attributes as well as omit the `Events.` string prefixed to
field names for span events.
Expand Down
4 changes: 4 additions & 0 deletions exporter/elasticsearchexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ type MappingMode int
const (
MappingNone MappingMode = iota
MappingECS
MappingOTel
MappingRaw
)

Expand All @@ -193,6 +194,8 @@ func (m MappingMode) String() string {
return ""
case MappingECS:
return "ecs"
case MappingOTel:
return "otel"
case MappingRaw:
return "raw"
default:
Expand All @@ -205,6 +208,7 @@ var mappingModes = func() map[string]MappingMode {
for _, m := range []MappingMode{
MappingNone,
MappingECS,
MappingOTel,
MappingRaw,
} {
table[strings.ToLower(m.String())] = m
Expand Down
18 changes: 15 additions & 3 deletions exporter/elasticsearchexporter/data_stream_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ func routeWithDefaults(defaultDSType, defaultDSDataset, defaultDSNamespace strin
pcommon.Map,
pcommon.Map,
string,
bool,
) string {
return func(
recordAttr pcommon.Map,
scopeAttr pcommon.Map,
resourceAttr pcommon.Map,
fIndex string,
otel bool,
) string {
// Order:
// 1. read data_stream.* from attributes
Expand All @@ -37,6 +39,13 @@ func routeWithDefaults(defaultDSType, defaultDSDataset, defaultDSNamespace strin
return fmt.Sprintf("%s%s%s", prefix, fIndex, suffix)
}
}

// The naming convention for datastream is expected to be "logs-[dataset].otel-[namespace]".
// This is in order to match the soon to be built-in logs-*.otel-* index template.
if otel {
dataset += ".otel"
}

recordAttr.PutStr(dataStreamDataset, dataset)
recordAttr.PutStr(dataStreamNamespace, namespace)
recordAttr.PutStr(dataStreamType, defaultDSType)
Expand All @@ -51,9 +60,10 @@ func routeLogRecord(
scope pcommon.InstrumentationScope,
resource pcommon.Resource,
fIndex string,
otel bool,
) string {
route := routeWithDefaults(defaultDataStreamTypeLogs, defaultDataStreamDataset, defaultDataStreamNamespace)
return route(record.Attributes(), scope.Attributes(), resource.Attributes(), fIndex)
return route(record.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, otel)
}

// routeDataPoint returns the name of the index to send the data point to according to data stream routing attributes.
Expand All @@ -63,9 +73,10 @@ func routeDataPoint(
scope pcommon.InstrumentationScope,
resource pcommon.Resource,
fIndex string,
otel bool,
) string {
route := routeWithDefaults(defaultDataStreamTypeMetrics, defaultDataStreamDataset, defaultDataStreamNamespace)
return route(dataPoint.Attributes(), scope.Attributes(), resource.Attributes(), fIndex)
return route(dataPoint.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, otel)
}

// routeSpan returns the name of the index to send the span to according to data stream routing attributes.
Expand All @@ -75,7 +86,8 @@ func routeSpan(
scope pcommon.InstrumentationScope,
resource pcommon.Resource,
fIndex string,
otel bool,
) string {
route := routeWithDefaults(defaultDataStreamTypeTraces, defaultDataStreamDataset, defaultDataStreamNamespace)
return route(span.Attributes(), scope.Attributes(), resource.Attributes(), fIndex)
return route(span.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, otel)
}
78 changes: 78 additions & 0 deletions exporter/elasticsearchexporter/data_stream_router_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package elasticsearchexporter

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
)

// routeTestInfo describes one data stream routing test case.
type routeTestInfo struct {
	name string // subtest name
	otel bool   // whether the OTel mapping mode is enabled
	want string // expected data stream name produced by routing
}

// createRouteTests builds the standard pair of routing test cases for the
// given data stream type: one with the default mapping mode and one with the
// OTel mapping mode (which inserts the ".otel" dataset suffix).
func createRouteTests(dsType string) []routeTestInfo {
	defaultRoute := fmt.Sprintf("%s-%s-%s", dsType, defaultDataStreamDataset, defaultDataStreamNamespace)
	otelRoute := fmt.Sprintf("%s-%s.otel-%s", dsType, defaultDataStreamDataset, defaultDataStreamNamespace)

	return []routeTestInfo{
		{name: "default", otel: false, want: defaultRoute},
		{name: "otel", otel: true, want: otelRoute},
	}
}

// TestRouteLogRecord verifies log record data stream routing in both the
// default and OTel mapping modes.
func TestRouteLogRecord(t *testing.T) {
	for _, tc := range createRouteTests(defaultDataStreamTypeLogs) {
		t.Run(tc.name, func(t *testing.T) {
			record := plog.NewLogRecord()
			scope := plog.NewScopeLogs().Scope()
			resource := plog.NewResourceLogs().Resource()
			assert.Equal(t, tc.want, routeLogRecord(record, scope, resource, "", tc.otel))
		})
	}
}

// TestRouteDataPoint verifies metric data point data stream routing in both
// the default and OTel mapping modes.
func TestRouteDataPoint(t *testing.T) {
	tests := createRouteTests(defaultDataStreamTypeMetrics)

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			// Construct the scope/resource via pmetric so the test exercises
			// the metrics signal types. The original used plog.NewScopeLogs()
			// and plog.NewResourceLogs(), which only worked because Scope()
			// and Resource() return the shared pcommon types.
			ds := routeDataPoint(pmetric.NewNumberDataPoint(), pmetric.NewScopeMetrics().Scope(), pmetric.NewResourceMetrics().Resource(), "", tc.otel)
			assert.Equal(t, tc.want, ds)
		})
	}
}

// TestRouteSpan verifies span data stream routing in both the default and
// OTel mapping modes.
func TestRouteSpan(t *testing.T) {
	tests := createRouteTests(defaultDataStreamTypeTraces)

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			// Construct the scope/resource via ptrace so the test exercises
			// the traces signal types. The original used plog.NewScopeLogs()
			// and plog.NewResourceLogs(), which only worked because Scope()
			// and Resource() return the shared pcommon types.
			ds := routeSpan(ptrace.NewSpan(), ptrace.NewScopeSpans().Scope(), ptrace.NewResourceSpans().Resource(), "", tc.otel)
			assert.Equal(t, tc.want, ds)
		})
	}
}
16 changes: 11 additions & 5 deletions exporter/elasticsearchexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type elasticsearchExporter struct {
logstashFormat LogstashFormatSettings
dynamicIndex bool
model mappingModel
otel bool

bulkIndexer bulkIndexer
}
Expand All @@ -49,6 +50,8 @@ func newExporter(
mode: cfg.MappingMode(),
}

otel := model.mode == MappingOTel

userAgent := fmt.Sprintf(
"%s/%s (%s/%s)",
set.BuildInfo.Description,
Expand All @@ -66,6 +69,7 @@ func newExporter(
dynamicIndex: dynamicIndex,
model: model,
logstashFormat: cfg.LogstashFormat,
otel: otel,
}, nil
}

Expand Down Expand Up @@ -107,7 +111,7 @@ func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs)
scope := ill.Scope()
logs := ill.LogRecords()
for k := 0; k < logs.Len(); k++ {
if err := e.pushLogRecord(ctx, resource, logs.At(k), scope, session); err != nil {
if err := e.pushLogRecord(ctx, resource, rl.SchemaUrl(), logs.At(k), scope, ill.SchemaUrl(), session); err != nil {
if cerr := ctx.Err(); cerr != nil {
return cerr
}
Expand All @@ -130,13 +134,15 @@ func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs)
func (e *elasticsearchExporter) pushLogRecord(
ctx context.Context,
resource pcommon.Resource,
resourceSchemaURL string,
record plog.LogRecord,
scope pcommon.InstrumentationScope,
scopeSchemaURL string,
bulkIndexerSession bulkIndexerSession,
) error {
fIndex := e.index
if e.dynamicIndex {
fIndex = routeLogRecord(record, scope, resource, fIndex)
fIndex = routeLogRecord(record, scope, resource, fIndex, e.otel)
}

if e.logstashFormat.Enabled {
Expand All @@ -147,7 +153,7 @@ func (e *elasticsearchExporter) pushLogRecord(
fIndex = formattedIndex
}

document, err := e.model.encodeLog(resource, record, scope)
document, err := e.model.encodeLog(resource, resourceSchemaURL, record, scope, scopeSchemaURL)
if err != nil {
return fmt.Errorf("failed to encode log event: %w", err)
}
Expand Down Expand Up @@ -279,7 +285,7 @@ func (e *elasticsearchExporter) getMetricDataPointIndex(
) (string, error) {
fIndex := e.index
if e.dynamicIndex {
fIndex = routeDataPoint(dataPoint, scope, resource, fIndex)
fIndex = routeDataPoint(dataPoint, scope, resource, fIndex, e.otel)
}

if e.logstashFormat.Enabled {
Expand Down Expand Up @@ -342,7 +348,7 @@ func (e *elasticsearchExporter) pushTraceRecord(
) error {
fIndex := e.index
if e.dynamicIndex {
fIndex = routeSpan(span, scope, resource, fIndex)
fIndex = routeSpan(span, scope, resource, fIndex, e.otel)
}

if e.logstashFormat.Enabled {
Expand Down
41 changes: 29 additions & 12 deletions exporter/elasticsearchexporter/internal/objmodel/objmodel.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,19 +244,19 @@ func (doc *Document) Dedup() {
// Serialize writes the document to the given writer. The serializer will create nested objects if dedot is true.
//
// NOTE: The documented MUST be sorted if dedot is true.
func (doc *Document) Serialize(w io.Writer, dedot bool) error {
func (doc *Document) Serialize(w io.Writer, dedot bool, otel bool) error {
v := json.NewVisitor(w)
return doc.iterJSON(v, dedot)
return doc.iterJSON(v, dedot, otel)
}

func (doc *Document) iterJSON(v *json.Visitor, dedot bool) error {
func (doc *Document) iterJSON(v *json.Visitor, dedot bool, otel bool) error {
if dedot {
return doc.iterJSONDedot(v)
return doc.iterJSONDedot(v, otel)
}
return doc.iterJSONFlat(v)
return doc.iterJSONFlat(v, otel)
}

func (doc *Document) iterJSONFlat(w *json.Visitor) error {
func (doc *Document) iterJSONFlat(w *json.Visitor, otel bool) error {
err := w.OnObjectStart(-1, structform.AnyType)
if err != nil {
return err
Expand All @@ -275,15 +275,22 @@ func (doc *Document) iterJSONFlat(w *json.Visitor) error {
return err
}

if err := fld.value.iterJSON(w, true); err != nil {
if err := fld.value.iterJSON(w, true, otel); err != nil {
return err
}
}

return nil
}

func (doc *Document) iterJSONDedot(w *json.Visitor) error {
// otelPrefixSet holds the key prefixes that, in OTel mapping mode, must be
// kept flattened (serialized with dotted keys) rather than expanded into
// nested objects during de-dotted serialization.
var otelPrefixSet = map[string]struct{}{
	"attributes.": {},
	"resource.attributes.": {},
	"scope.attributes.": {},
}

func (doc *Document) iterJSONDedot(w *json.Visitor, otel bool) error {
objPrefix := ""
level := 0

Expand Down Expand Up @@ -335,6 +342,16 @@ func (doc *Document) iterJSONDedot(w *json.Visitor) error {

// increase object level up to current field
for {

// Otel mode serialization
if otel {
// Check the prefix
_, isOtelPrefix := otelPrefixSet[objPrefix]
if isOtelPrefix {
break
}
}

start := len(objPrefix)
idx := strings.IndexByte(key[start:], '.')
if idx < 0 {
Expand All @@ -357,7 +374,7 @@ func (doc *Document) iterJSONDedot(w *json.Visitor) error {
if err := w.OnKey(fieldName); err != nil {
return err
}
if err := fld.value.iterJSON(w, true); err != nil {
if err := fld.value.iterJSON(w, true, otel); err != nil {
return err
}
}
Expand Down Expand Up @@ -460,7 +477,7 @@ func (v *Value) IsEmpty() bool {
}
}

func (v *Value) iterJSON(w *json.Visitor, dedot bool) error {
func (v *Value) iterJSON(w *json.Visitor, dedot bool, otel bool) error {
switch v.kind {
case KindNil:
return w.OnNil()
Expand All @@ -483,13 +500,13 @@ func (v *Value) iterJSON(w *json.Visitor, dedot bool) error {
if len(v.doc.fields) == 0 {
return w.OnNil()
}
return v.doc.iterJSON(w, dedot)
return v.doc.iterJSON(w, dedot, otel)
case KindArr:
if err := w.OnArrayStart(-1, structform.AnyType); err != nil {
return err
}
for i := range v.arr {
if err := v.arr[i].iterJSON(w, dedot); err != nil {
if err := v.arr[i].iterJSON(w, dedot, otel); err != nil {
return err
}
}
Expand Down
Loading

0 comments on commit 13366cc

Please sign in to comment.