Skip to content

Commit

Permalink
[Heartbeat] Handle data streams (#24223)
Browse files Browse the repository at this point in the history
Handles data streams from fleet, the current heartbeat code doesn't handle data_streams at all, this fixes that. Additionally, it hoists the id from the top level of the yaml config, and merges both levels of data_streams, since one is at the input level, and the other is at the stream level.

Additionally, this cleans up the data streams code significantly, introducing a new add_data_streams_index processor that:

    More efficiently formats index names for data streams
    Allows individual events to override the dataset (useful for synthetics where we have a base browser dataset, but also browser_screenshot and browser_network for extended data that can take up lots of space, and often requires a different ILM policy).
  • Loading branch information
andrewvc authored Mar 4, 2021
1 parent 270a676 commit 74a3a44
Show file tree
Hide file tree
Showing 10 changed files with 413 additions and 45 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -879,6 +879,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

- Add mime type detection for http responses. {pull}22976[22976]
- Bundle synthetics deps with heartbeat docker image. {pull}23274[23274]
- Handle datastreams for fleet. {pull}24223[24223]
- Add --sandbox option for browser monitor. {pull}24172[24172]

*Journalbeat*
Expand Down
62 changes: 24 additions & 38 deletions heartbeat/monitors/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@
package monitors

import (
"fmt"

"github.com/elastic/beats/v7/heartbeat/monitors/plugin"
"github.com/elastic/beats/v7/heartbeat/monitors/stdfields"
"github.com/elastic/beats/v7/heartbeat/scheduler"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/fmtstr"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/processors/add_data_stream_index"
"github.com/elastic/beats/v7/libbeat/processors/add_formatted_index"
"github.com/elastic/beats/v7/libbeat/publisher/pipetool"
)
Expand All @@ -52,16 +52,10 @@ type publishSettings struct {
KeepNull bool `config:"keep_null"`

// Output meta data settings
Pipeline string `config:"pipeline"` // ES Ingest pipeline name
Index fmtstr.EventFormatString `config:"index"` // ES output index pattern
DataStream *datastream `config:"data_stream"`
DataSet string `config:"dataset"`
}

type datastream struct {
Namespace string `config:"namespace"`
Dataset string `config:"dataset"`
Type string `config:"type"`
Pipeline string `config:"pipeline"` // ES Ingest pipeline name
Index fmtstr.EventFormatString `config:"index"` // ES output index pattern
DataStream *add_data_stream_index.DataStream `config:"data_stream"`
DataSet string `config:"dataset"`
}

// NewFactory takes a scheduler and creates a RunnerFactory that can create cfgfile.Runner(Monitor) objects.
Expand All @@ -71,6 +65,11 @@ func NewFactory(info beat.Info, sched *scheduler.Scheduler, allowWatches bool) *

// Create makes a new Runner for a new monitor with the given Config.
func (f *RunnerFactory) Create(p beat.Pipeline, c *common.Config) (cfgfile.Runner, error) {
c, err := stdfields.UnnestStream(c)
if err != nil {
return nil, err
}

configEditor, err := newCommonPublishConfigs(f.info, c)
if err != nil {
return nil, err
Expand All @@ -92,7 +91,12 @@ func newCommonPublishConfigs(info beat.Info, cfg *common.Config) (pipetool.Confi
return nil, err
}

indexProcessor, err := setupIndexProcessor(info, settings)
stdFields, err := stdfields.ConfigToStdMonitorFields(cfg)
if err != nil {
return nil, err
}

indexProcessor, err := setupIndexProcessor(info, settings, stdFields.Type)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -147,35 +151,17 @@ func newCommonPublishConfigs(info beat.Info, cfg *common.Config) (pipetool.Confi
}, nil
}

func setupIndexProcessor(info beat.Info, settings publishSettings) (processors.Processor, error) {
func setupIndexProcessor(info beat.Info, settings publishSettings, dataset string) (processors.Processor, error) {
var indexProcessor processors.Processor
if settings.DataStream != nil {
namespace := settings.DataStream.Namespace
if namespace == "" {
namespace = "default"
}
typ := settings.DataStream.Type
if typ == "" {
typ = "synthetics"
}

dataset := settings.DataStream.Dataset
if dataset == "" {
dataset = "generic"
ds := settings.DataStream
if ds.Type == "" {
ds.Type = "synthetics"
}

index := fmt.Sprintf(
"%s-%s-%s",
typ,
dataset,
namespace,
)
compiled, err := fmtstr.CompileEvent(index)
if err != nil {
return nil, fmt.Errorf("could not compile datastream: '%s', this should never happen: %w", index, err)
} else {
settings.Index = *compiled
if ds.Dataset == "" {
ds.Dataset = dataset
}
return add_data_stream_index.New(*ds), nil
}

if !settings.Index.IsEmpty() {
Expand Down
19 changes: 13 additions & 6 deletions heartbeat/monitors/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/elastic/beats/v7/libbeat/beat/events"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/fmtstr"
"github.com/elastic/beats/v7/libbeat/processors/add_data_stream_index"
)

func TestSetupIndexProcessor(t *testing.T) {
Expand All @@ -37,38 +38,43 @@ func TestSetupIndexProcessor(t *testing.T) {
tests := map[string]struct {
settings publishSettings
expectedIndex string
monitorType string
wantProc bool
wantErr bool
}{
"no settings should yield no processor": {
publishSettings{},
"",
"browser",
false,
false,
},
"exact index should be used exactly": {
publishSettings{Index: *fmtstr.MustCompileEvent("test")},
"test",
"browser",
true,
false,
},
"data stream should be type-namespace-dataset": {
publishSettings{
DataStream: &datastream{
Type: "myType",
Dataset: "myDataset",
DataStream: &add_data_stream_index.DataStream{
Namespace: "myNamespace",
Dataset: "myDataset",
Type: "myType",
},
},
"myType-myDataset-myNamespace",
"myType",
true,
false,
},
"data stream should use defaults": {
publishSettings{
DataStream: &datastream{},
DataStream: &add_data_stream_index.DataStream{},
},
"synthetics-generic-default",
"synthetics-browser-default",
"browser",
true,
false,
},
Expand All @@ -77,7 +83,7 @@ func TestSetupIndexProcessor(t *testing.T) {
for name, tt := range tests {
t.Run(name, func(t *testing.T) {
e := beat.Event{Meta: common.MapStr{}, Fields: common.MapStr{}}
proc, err := setupIndexProcessor(binfo, tt.settings)
proc, err := setupIndexProcessor(binfo, tt.settings, tt.monitorType)
if tt.wantErr == true {
require.Error(t, err)
return
Expand All @@ -89,6 +95,7 @@ func TestSetupIndexProcessor(t *testing.T) {
return
}

require.NotNil(t, proc)
_, err = proc.Run(&e)
require.Equal(t, tt.expectedIndex, e.Meta[events.FieldMetaRawIndex])
})
Expand Down
53 changes: 53 additions & 0 deletions heartbeat/monitors/stdfields/unnest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package stdfields

import (
"fmt"

"github.com/elastic/beats/v7/libbeat/common"
)

// OptionalStream represents a config that has a stream set, which in practice
// means agent/fleet. In this case we primarily use the first stream for the
// config, but we do pull the Id from the root, and merge the root data stream
// in as well
type OptionalStream struct {
Id string `config:"id"`
DataStream *common.Config `config:"data_stream"`
Streams []*common.Config `config:"streams"`
}

// UnnestStream detects configs that come from fleet and transforms the config into something compatible
// with heartbeat, by mixing some fields (id, data_stream) with those from the first stream. It assumes
// that there is exactly one stream associated with the input.
func UnnestStream(config *common.Config) (res *common.Config, err error) {
optS := &OptionalStream{}
err = config.Unpack(optS)
if err != nil {
return nil, fmt.Errorf("could not unnest stream: %w", err)
}

if len(optS.Streams) == 0 {
return config, nil
}

res = optS.Streams[0]
err = res.MergeWithOpts(common.MapStr{"id": optS.Id, "data_stream": optS.DataStream})
return
}
103 changes: 103 additions & 0 deletions heartbeat/monitors/stdfields/unnest_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package stdfields

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/go-lookslike"
"github.com/elastic/go-lookslike/testslike"
"github.com/elastic/go-lookslike/validator"
)

func TestUnnestStream(t *testing.T) {
type testCase struct {
name string
cfg common.MapStr
v validator.Validator
}
tests := []testCase{
{
name: "simple",
cfg: common.MapStr{
"id": "myuuid",
"streams": []common.MapStr{
common.MapStr{
"streamid": "mystreamid",
"data_stream": common.MapStr{
"namespace": "mynamespace",
"dataset": "mydataset",
"type": "mytype",
},
},
},
},
v: lookslike.MustCompile(common.MapStr{
"id": "myuuid",
"data_stream": common.MapStr{
"namespace": "mynamespace",
"dataset": "mydataset",
"type": "mytype",
},
}),
},
{
name: "split data stream",
cfg: common.MapStr{
"id": "myuuid",
"data_stream": common.MapStr{
"type": "mytype",
},
"streams": []common.MapStr{
common.MapStr{
"data_stream": common.MapStr{
"namespace": "mynamespace",
"dataset": "mydataset",
},
},
},
},
v: lookslike.MustCompile(common.MapStr{
"id": "myuuid",
"data_stream": common.MapStr{
"namespace": "mynamespace",
"dataset": "mydataset",
"type": "mytype",
},
}),
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
src, err := common.NewConfigFrom(test.cfg)
require.NoError(t, err)

unnested, err := UnnestStream(src)
require.NoError(t, err)

unpacked := common.MapStr{}
err = unnested.Unpack(unpacked)
require.NoError(t, err)
testslike.Test(t, test.v, unpacked)
})
}
}
Loading

0 comments on commit 74a3a44

Please sign in to comment.