From 74a3a446221b63a38f0042d902ffb876b11ce659 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Thu, 4 Mar 2021 14:06:20 -0600 Subject: [PATCH] [Heartbeat] Handle data streams (#24223) Handles data streams from fleet, the current heartbeat code doesn't handle data_streams at all, this fixes that. Additionally, it hoists the id from the top level of the yaml config, and merges both levels of data_streams, since one is at the input level, and the other is at the stream level. Additionally, this cleans up the data streams code significantly, introducing a new add_data_streams_index processor that: More efficiently formats index names for data streams Allows individual events to override the dataset (useful for synthetics where we have a base browser dataset, but also browser_screenshot and browser_network for extended data that can take up lots of space, and often requires a different ILM policy). --- CHANGELOG.next.asciidoc | 1 + heartbeat/monitors/factory.go | 62 ++++------- heartbeat/monitors/factory_test.go | 19 +++- heartbeat/monitors/stdfields/unnest.go | 53 +++++++++ heartbeat/monitors/stdfields/unnest_test.go | 103 ++++++++++++++++++ .../add_data_stream_index.go | 101 +++++++++++++++++ .../add_data_stream_index_test.go | 87 +++++++++++++++ .../add_data_stream_index/datastream.go | 18 +++ .../monitors/browser/synthexec/enrich.go | 6 + .../sample-synthetics-config/heartbeat.yml | 8 +- 10 files changed, 413 insertions(+), 45 deletions(-) create mode 100644 heartbeat/monitors/stdfields/unnest.go create mode 100644 heartbeat/monitors/stdfields/unnest_test.go create mode 100644 libbeat/processors/add_data_stream_index/add_data_stream_index.go create mode 100644 libbeat/processors/add_data_stream_index/add_data_stream_index_test.go create mode 100644 libbeat/processors/add_data_stream_index/datastream.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 08c29e07da7..75207d5dedc 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -879,6 +879,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add mime type detection for http responses. {pull}22976[22976] - Bundle synthetics deps with heartbeat docker image. {pull}23274[23274] +- Handle datastreams for fleet. {pull}24223[24223] - Add --sandbox option for browser monitor. {pull}24172[24172] *Journalbeat* diff --git a/heartbeat/monitors/factory.go b/heartbeat/monitors/factory.go index b20df49def1..c71103c083b 100644 --- a/heartbeat/monitors/factory.go +++ b/heartbeat/monitors/factory.go @@ -18,15 +18,15 @@ package monitors import ( - "fmt" - "github.com/elastic/beats/v7/heartbeat/monitors/plugin" + "github.com/elastic/beats/v7/heartbeat/monitors/stdfields" "github.com/elastic/beats/v7/heartbeat/scheduler" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/cfgfile" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/fmtstr" "github.com/elastic/beats/v7/libbeat/processors" + "github.com/elastic/beats/v7/libbeat/processors/add_data_stream_index" "github.com/elastic/beats/v7/libbeat/processors/add_formatted_index" "github.com/elastic/beats/v7/libbeat/publisher/pipetool" ) @@ -52,16 +52,10 @@ type publishSettings struct { KeepNull bool `config:"keep_null"` // Output meta data settings - Pipeline string `config:"pipeline"` // ES Ingest pipeline name - Index fmtstr.EventFormatString `config:"index"` // ES output index pattern - DataStream *datastream `config:"data_stream"` - DataSet string `config:"dataset"` -} - -type datastream struct { - Namespace string `config:"namespace"` - Dataset string `config:"dataset"` - Type string `config:"type"` + Pipeline string `config:"pipeline"` // ES Ingest pipeline name + Index fmtstr.EventFormatString `config:"index"` // ES output index pattern + DataStream *add_data_stream_index.DataStream `config:"data_stream"` + DataSet string `config:"dataset"` } // NewFactory takes a scheduler and creates a RunnerFactory that can create cfgfile.Runner(Monitor) objects. @@ -71,6 +65,11 @@ func NewFactory(info beat.Info, sched *scheduler.Scheduler, allowWatches bool) * // Create makes a new Runner for a new monitor with the given Config. func (f *RunnerFactory) Create(p beat.Pipeline, c *common.Config) (cfgfile.Runner, error) { + c, err := stdfields.UnnestStream(c) + if err != nil { + return nil, err + } + configEditor, err := newCommonPublishConfigs(f.info, c) if err != nil { return nil, err @@ -92,7 +91,12 @@ func newCommonPublishConfigs(info beat.Info, cfg *common.Config) (pipetool.Confi return nil, err } - indexProcessor, err := setupIndexProcessor(info, settings) + stdFields, err := stdfields.ConfigToStdMonitorFields(cfg) + if err != nil { + return nil, err + } + + indexProcessor, err := setupIndexProcessor(info, settings, stdFields.Type) if err != nil { return nil, err } @@ -147,35 +151,17 @@ func newCommonPublishConfigs(info beat.Info, cfg *common.Config) (pipetool.Confi }, nil } -func setupIndexProcessor(info beat.Info, settings publishSettings) (processors.Processor, error) { +func setupIndexProcessor(info beat.Info, settings publishSettings, dataset string) (processors.Processor, error) { var indexProcessor processors.Processor if settings.DataStream != nil { - namespace := settings.DataStream.Namespace - if namespace == "" { - namespace = "default" - } - typ := settings.DataStream.Type - if typ == "" { - typ = "synthetics" - } - - dataset := settings.DataStream.Dataset - if dataset == "" { - dataset = "generic" + ds := settings.DataStream + if ds.Type == "" { + ds.Type = "synthetics" } - - index := fmt.Sprintf( - "%s-%s-%s", - typ, - dataset, - namespace, - ) - compiled, err := fmtstr.CompileEvent(index) - if err != nil { - return nil, fmt.Errorf("could not compile datastream: '%s', this should never happen: %w", index, err) - } else { - settings.Index = *compiled + if ds.Dataset == "" { + ds.Dataset = dataset } + return add_data_stream_index.New(*ds), nil } if !settings.Index.IsEmpty() { diff --git a/heartbeat/monitors/factory_test.go b/heartbeat/monitors/factory_test.go index caa681985d7..7c515885421 100644 --- a/heartbeat/monitors/factory_test.go +++ b/heartbeat/monitors/factory_test.go @@ -26,6 +26,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat/events" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/fmtstr" + "github.com/elastic/beats/v7/libbeat/processors/add_data_stream_index" ) func TestSetupIndexProcessor(t *testing.T) { @@ -37,38 +38,43 @@ func TestSetupIndexProcessor(t *testing.T) { tests := map[string]struct { settings publishSettings expectedIndex string + monitorType string wantProc bool wantErr bool }{ "no settings should yield no processor": { publishSettings{}, "", + "browser", false, false, }, "exact index should be used exactly": { publishSettings{Index: *fmtstr.MustCompileEvent("test")}, "test", + "browser", true, false, }, "data stream should be type-namespace-dataset": { publishSettings{ - DataStream: &datastream{ - Type: "myType", - Dataset: "myDataset", + DataStream: &add_data_stream_index.DataStream{ Namespace: "myNamespace", + Dataset: "myDataset", + Type: "myType", }, }, "myType-myDataset-myNamespace", + "myType", true, false, }, "data stream should use defaults": { publishSettings{ - DataStream: &datastream{}, + DataStream: &add_data_stream_index.DataStream{}, }, - "synthetics-generic-default", + "synthetics-browser-default", + "browser", true, false, }, @@ -77,7 +83,7 @@ func TestSetupIndexProcessor(t *testing.T) { for name, tt := range tests { t.Run(name, func(t *testing.T) { e := beat.Event{Meta: common.MapStr{}, Fields: common.MapStr{}} - proc, err := setupIndexProcessor(binfo, tt.settings) + proc, err := setupIndexProcessor(binfo, tt.settings, tt.monitorType) if tt.wantErr == true { require.Error(t, err) return @@ -89,6 +95,7 @@ func TestSetupIndexProcessor(t *testing.T) { return } + require.NotNil(t, proc) _, err = proc.Run(&e) require.Equal(t, tt.expectedIndex, e.Meta[events.FieldMetaRawIndex]) }) diff --git a/heartbeat/monitors/stdfields/unnest.go b/heartbeat/monitors/stdfields/unnest.go new file mode 100644 index 00000000000..05f0de7e1ca --- /dev/null +++ b/heartbeat/monitors/stdfields/unnest.go @@ -0,0 +1,53 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package stdfields + +import ( + "fmt" + + "github.com/elastic/beats/v7/libbeat/common" +) + +// OptionalStream represents a config that has a stream set, which in practice +// means agent/fleet. In this case we primarily use the first stream for the +// config, but we do pull the Id from the root, and merge the root data stream +// in as well +type OptionalStream struct { + Id string `config:"id"` + DataStream *common.Config `config:"data_stream"` + Streams []*common.Config `config:"streams"` +} + +// UnnestStream detects configs that come from fleet and transforms the config into something compatible +// with heartbeat, by mixing some fields (id, data_stream) with those from the first stream. It assumes +// that there is exactly one stream associated with the input. +func UnnestStream(config *common.Config) (res *common.Config, err error) { + optS := &OptionalStream{} + err = config.Unpack(optS) + if err != nil { + return nil, fmt.Errorf("could not unnest stream: %w", err) + } + + if len(optS.Streams) == 0 { + return config, nil + } + + res = optS.Streams[0] + err = res.MergeWithOpts(common.MapStr{"id": optS.Id, "data_stream": optS.DataStream}) + return +} diff --git a/heartbeat/monitors/stdfields/unnest_test.go b/heartbeat/monitors/stdfields/unnest_test.go new file mode 100644 index 00000000000..cb9450c6327 --- /dev/null +++ b/heartbeat/monitors/stdfields/unnest_test.go @@ -0,0 +1,103 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package stdfields + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/go-lookslike" + "github.com/elastic/go-lookslike/testslike" + "github.com/elastic/go-lookslike/validator" +) + +func TestUnnestStream(t *testing.T) { + type testCase struct { + name string + cfg common.MapStr + v validator.Validator + } + tests := []testCase{ + { + name: "simple", + cfg: common.MapStr{ + "id": "myuuid", + "streams": []common.MapStr{ + common.MapStr{ + "streamid": "mystreamid", + "data_stream": common.MapStr{ + "namespace": "mynamespace", + "dataset": "mydataset", + "type": "mytype", + }, + }, + }, + }, + v: lookslike.MustCompile(common.MapStr{ + "id": "myuuid", + "data_stream": common.MapStr{ + "namespace": "mynamespace", + "dataset": "mydataset", + "type": "mytype", + }, + }), + }, + { + name: "split data stream", + cfg: common.MapStr{ + "id": "myuuid", + "data_stream": common.MapStr{ + "type": "mytype", + }, + "streams": []common.MapStr{ + common.MapStr{ + "data_stream": common.MapStr{ + "namespace": "mynamespace", + "dataset": "mydataset", + }, + }, + }, + }, + v: lookslike.MustCompile(common.MapStr{ + "id": "myuuid", + "data_stream": common.MapStr{ + "namespace": "mynamespace", + "dataset": "mydataset", + "type": "mytype", + }, + }), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + src, err := common.NewConfigFrom(test.cfg) + require.NoError(t, err) + + unnested, err := UnnestStream(src) + require.NoError(t, err) + + unpacked := common.MapStr{} + err = unnested.Unpack(unpacked) + require.NoError(t, err) + testslike.Test(t, test.v, unpacked) + }) + } +} diff --git a/libbeat/processors/add_data_stream_index/add_data_stream_index.go b/libbeat/processors/add_data_stream_index/add_data_stream_index.go new file mode 100644 index 00000000000..7d770c6f082 --- /dev/null +++ b/libbeat/processors/add_data_stream_index/add_data_stream_index.go @@ -0,0 +1,101 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package add_data_stream_index + +import ( + "fmt" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/beat/events" + "github.com/elastic/beats/v7/libbeat/common" +) + +const FieldMetaCustomDataset = "dataset" + +func SetEventDataset(event *beat.Event, ds string) { + if event.Meta == nil { + event.Meta = common.MapStr{ + FieldMetaCustomDataset: ds, + } + } else { + event.Meta[FieldMetaCustomDataset] = ds + } +} + +// AddDataStreamIndex is a Processor to set an event's "raw_index" metadata field +// based on the given type, dataset, and namespace fields. +// If the event's metadata contains an +type AddDataStreamIndex struct { + DataStream DataStream + // cached, compiled version of the index name derived from the data stream + dsCached string + customDsCache string +} + +// New returns a new AddDataStreamIndex processor. +func New(ds DataStream) *AddDataStreamIndex { + if ds.Namespace == "" { + ds.Namespace = "default" + } + if ds.Dataset == "" { + ds.Dataset = "generic" + } + return &AddDataStreamIndex{ + DataStream: ds, + dsCached: ds.indexName(), + customDsCache: ds.datasetFmtString(), + } +} + +// Run runs the processor. +func (p *AddDataStreamIndex) Run(event *beat.Event) (*beat.Event, error) { + if event.Meta == nil { + event.Meta = common.MapStr{ + events.FieldMetaRawIndex: p.dsCached, + } + } else { + customDs, hasCustom := event.Meta[FieldMetaCustomDataset] + if !hasCustom { + event.Meta[events.FieldMetaRawIndex] = p.dsCached + } else { + event.Meta[events.FieldMetaRawIndex] = fmt.Sprintf(p.customDsCache, customDs) + } + } + + return event, nil +} + +func (p *AddDataStreamIndex) String() string { + return fmt.Sprintf("add_data_stream_index=%v", p.DataStream.indexName()) +} + +// DataStream represents the 3-tuple + configuration metadata since it +// can be convenient to import this into other contexts. +type DataStream struct { + Namespace string `config:"namespace"` + Dataset string `config:"dataset"` + Type string `config:"type"` +} + +func (ds DataStream) datasetFmtString() string { + return fmt.Sprintf("%s-%%s-%s", ds.Type, ds.Namespace) +} + +func (ds DataStream) indexName() string { + return fmt.Sprintf(ds.datasetFmtString(), ds.Dataset) +} diff --git a/libbeat/processors/add_data_stream_index/add_data_stream_index_test.go b/libbeat/processors/add_data_stream_index/add_data_stream_index_test.go new file mode 100644 index 00000000000..d71181e3127 --- /dev/null +++ b/libbeat/processors/add_data_stream_index/add_data_stream_index_test.go @@ -0,0 +1,87 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package add_data_stream_index + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/beat/events" + "github.com/elastic/beats/v7/libbeat/common" +) + +func TestAddDataStreamIndex(t *testing.T) { + simpleDs := DataStream{ + "myns", + "myds", + "mytype", + } + tests := []struct { + name string + ds DataStream + event *beat.Event + want string + wantErr bool + }{ + { + "simple", + simpleDs, + &beat.Event{}, + "mytype-myds-myns", + false, + }, + { + "existing meta", + simpleDs, + &beat.Event{Meta: common.MapStr{}}, + "mytype-myds-myns", + false, + }, + { + "custom ds", + simpleDs, + &beat.Event{Meta: common.MapStr{ + FieldMetaCustomDataset: "custom-ds", + }}, + "mytype-custom-ds-myns", + false, + }, + { + "defaults ds/ns", + DataStream{ + Type: "mytype", + }, + &beat.Event{}, + "mytype-generic-default", + false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p := New(tt.ds) + got, err := p.Run(tt.event) + if (err != nil) != tt.wantErr { + t.Errorf("Run() error = %v, wantErr %v", err, tt.wantErr) + return + } + require.Equal(t, tt.want, got.Meta[events.FieldMetaRawIndex]) + }) + } +} diff --git a/libbeat/processors/add_data_stream_index/datastream.go b/libbeat/processors/add_data_stream_index/datastream.go new file mode 100644 index 00000000000..c2a69dc5214 --- /dev/null +++ b/libbeat/processors/add_data_stream_index/datastream.go @@ -0,0 +1,18 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package add_data_stream_index diff --git a/x-pack/heartbeat/monitors/browser/synthexec/enrich.go b/x-pack/heartbeat/monitors/browser/synthexec/enrich.go index 69d8913128d..4e3e05e2170 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/enrich.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/enrich.go @@ -8,6 +8,8 @@ import ( "fmt" "time" + "github.com/elastic/beats/v7/libbeat/processors/add_data_stream_index" + "github.com/gofrs/uuid" "github.com/elastic/beats/v7/heartbeat/eventext" @@ -105,6 +107,10 @@ func (je *journeyEnricher) enrichSynthEvent(event *beat.Event, se *SynthEvent) e return je.createSummary(event) case "step/end": je.stepCount++ + case "step/screenshot": + add_data_stream_index.SetEventDataset(event, "browser_screenshot") + case "journey/network_info": + add_data_stream_index.SetEventDataset(event, "browser_network") } eventext.MergeEventFields(event, se.ToMap()) diff --git a/x-pack/heartbeat/sample-synthetics-config/heartbeat.yml b/x-pack/heartbeat/sample-synthetics-config/heartbeat.yml index bebebdb4673..74fc2f4d885 100644 --- a/x-pack/heartbeat/sample-synthetics-config/heartbeat.yml +++ b/x-pack/heartbeat/sample-synthetics-config/heartbeat.yml @@ -9,6 +9,8 @@ heartbeat.monitors: enabled: true id: todos-suite name: Todos Suite + data_stream: + namespace: myns source: local: path: "/home/andrewvc/projects/synthetics/examples/todos/" @@ -19,10 +21,14 @@ heartbeat.monitors: urls: http://www.google.com schedule: "@every 15s" name: Simple HTTP + data_stream: + namespace: myns - type: browser - enabled: true + enabled: false id: my-monitor name: My Monitor + data_stream: + namespace: myns source: inline: script: