Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update filter processor to use pdata #1885

Merged
merged 5 commits into from
Oct 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions internal/processor/filtermetric/filtermetric.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
package filtermetric

import (
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"

"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/processor/filterset"
)

Expand All @@ -25,13 +24,6 @@ type Matcher struct {
nameFilters filterset.FilterSet
}

// MatchMetric matches a metric using the metric properties configured on the Matcher.
// A metric only matches if every metric property configured on the Matcher is a match.
func (m *Matcher) MatchMetric(metric *metricspb.Metric) bool {
name := metric.GetMetricDescriptor().GetName()
return m.nameFilters.Matches(name)
}

// NewMatcher constructs a metric Matcher that can be used to match metrics by metric properties.
// For each supported metric property, the Matcher accepts a set of prespecified values. An incoming metric
// matches on a property if the property matches at least one of the prespecified values.
Expand All @@ -49,3 +41,9 @@ func NewMatcher(config *MatchProperties) (Matcher, error) {
nameFilters: nameFS,
}, nil
}

// MatchMetric matches a metric using the metric properties configured on the Matcher.
// A metric only matches if every metric property configured on the Matcher is a match.
func (m *Matcher) MatchMetric(metric pdata.Metric) bool {
return m.nameFilters.Matches(metric.Name())
}
15 changes: 7 additions & 8 deletions internal/processor/filtermetric/filtermetric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ package filtermetric
import (
"testing"

metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
"github.com/stretchr/testify/assert"

"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/processor/filterset"
)

Expand All @@ -42,19 +42,18 @@ var (
}
)

func createMetric(name string) *metricspb.Metric {
return &metricspb.Metric{
MetricDescriptor: &metricspb.MetricDescriptor{
Name: name,
},
}
func createMetric(name string) pdata.Metric {
metric := pdata.NewMetric()
metric.InitEmpty()
metric.SetName(name)
return metric
}

func TestMatcherMatches(t *testing.T) {
tests := []struct {
name string
cfg *MatchProperties
metric *metricspb.Metric
metric pdata.Metric
shouldMatch bool
}{
{
Expand Down
45 changes: 25 additions & 20 deletions processor/filterprocessor/filter_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,9 @@ package filterprocessor
import (
"context"

metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"

"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/processor/filtermetric"
"go.opentelemetry.io/collector/processor/processorhelper"
"go.opentelemetry.io/collector/translator/internaldata"
)

type filterMetricProcessor struct {
Expand Down Expand Up @@ -64,31 +61,39 @@ func createMatcher(mp *filtermetric.MatchProperties) (*filtermetric.Matcher, err
}

// ProcessMetrics filters the given metrics based off the filterMetricProcessor's filters.
func (fmp *filterMetricProcessor) ProcessMetrics(_ context.Context, md pdata.Metrics) (pdata.Metrics, error) {
mds := internaldata.MetricsToOC(md)
foundMetricToKeep := false
for i := range mds {
if len(mds[i].Metrics) == 0 {
func (fmp *filterMetricProcessor) ProcessMetrics(_ context.Context, pdm pdata.Metrics) (pdata.Metrics, error) {
rms := pdm.ResourceMetrics()
idx := newMetricIndex()
for i := 0; i < rms.Len(); i++ {
rm := rms.At(i)
if rm.IsNil() {
continue
}
keep := make([]*metricspb.Metric, 0, len(mds[i].Metrics))
for _, m := range mds[i].Metrics {
if fmp.shouldKeepMetric(m) {
foundMetricToKeep = true
keep = append(keep, m)
ilms := rm.InstrumentationLibraryMetrics()
for j := 0; j < ilms.Len(); j++ {
ilm := ilms.At(j)
if ilm.IsNil() {
continue
}
ms := ilm.Metrics()
for k := 0; k < ms.Len(); k++ {
metric := ms.At(k)
if metric.IsNil() {
continue
}
if fmp.shouldKeepMetric(metric) {
idx.add(i, j, k)
}
}
}
mds[i].Metrics = keep
}

if !foundMetricToKeep {
return md, processorhelper.ErrSkipProcessingData
if idx.isEmpty() {
return pdm, processorhelper.ErrSkipProcessingData
}
return internaldata.OCSliceToMetrics(mds), nil
return idx.extract(pdm), nil
}

// shouldKeepMetric determines whether a metric should be kept based off the filterMetricProcessor's filters.
func (fmp *filterMetricProcessor) shouldKeepMetric(metric *metricspb.Metric) bool {
func (fmp *filterMetricProcessor) shouldKeepMetric(metric pdata.Metric) bool {
if fmp.include != nil {
if !fmp.include.MatchMetric(metric) {
return false
Expand Down
206 changes: 135 additions & 71 deletions processor/filterprocessor/filter_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package filterprocessor

import (
"context"
"fmt"
"testing"
"time"

Expand All @@ -27,7 +28,9 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer/consumerdata"
"go.opentelemetry.io/collector/consumer/pdata"
etest "go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/internal/goldendataset"
"go.opentelemetry.io/collector/internal/processor/filtermetric"
"go.opentelemetry.io/collector/internal/processor/filterset"
"go.opentelemetry.io/collector/translator/internaldata"
Expand Down Expand Up @@ -240,77 +243,6 @@ func TestFilterMetricProcessor(t *testing.T) {
}
}

func BenchmarkFilter_MetricNames(b *testing.B) {
// runs 1000 metrics through a filterprocessor with both include and exclude filters.
stressTest := metricNameTest{
name: "includeAndExcludeFilter1000Metrics",
inc: regexpMetricsFilterProperties,
exc: &filtermetric.MatchProperties{
Config: filterset.Config{
MatchType: filterset.Strict,
},
MetricNames: []string{
"prefix_test_match",
"test_contains_match",
},
},
outMN: [][]string{{
"full_name_match",
"prefix/test/match",
// "prefix_test_match", excluded by exclude filter
"prefixprefix/test/match",
"test/match/suffix",
"test_match_suffix",
"test/match/suffixsuffix",
"test/contains/match",
// "test_contains_match", excluded by exclude filter
"full/name/match",
"full_name_match",
}},
}

for len(stressTest.inMN[0]) < 1000 {
stressTest.inMN[0] = append(stressTest.inMN[0], metricsWithName(inMetricNames)...)
}

benchmarkTests := append(standardTests, stressTest)

for _, test := range benchmarkTests {
// next stores the results of the filter metric processor
next := &etest.SinkMetricsExporter{}
cfg := &Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: typeStr,
NameVal: typeStr,
},
Metrics: MetricFilters{
Include: test.inc,
Exclude: test.exc,
},
}
factory := NewFactory()
fmp, err := factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, next)
assert.NotNil(b, fmp)
assert.Nil(b, err)

md := consumerdata.MetricsData{
Metrics: make([]*metricspb.Metric, len(test.inMN)),
}

mds := make([]consumerdata.MetricsData, len(test.inMN))
for i, metrics := range test.inMN {
mds[i] = consumerdata.MetricsData{
Metrics: metrics,
}
}

pdm := internaldata.OCToMetrics(md)
b.Run(test.name, func(b *testing.B) {
assert.NoError(b, fmp.ConsumeMetrics(context.Background(), pdm))
})
}
}

func metricsWithName(names []string) []*metricspb.Metric {
ret := make([]*metricspb.Metric, len(names))
now := time.Now()
Expand All @@ -336,3 +268,135 @@ func metricsWithName(names []string) []*metricspb.Metric {
}
return ret
}

func BenchmarkFilter(b *testing.B) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
pcfg := cfg.(*Config)
pcfg.Metrics = MetricFilters{
Exclude: &filtermetric.MatchProperties{
Config: filterset.Config{
MatchType: "strict",
},
MetricNames: []string{"p10_metric_0"},
},
}
next := &etest.SinkMetricsExporter{}
ctx := context.Background()
proc, _ := factory.CreateMetricsProcessor(
ctx,
component.ProcessorCreateParams{},
cfg,
next,
)
pdms := metricSlice(128)
for i := 0; i < b.N; i++ {
for _, pdm := range pdms {
_ = proc.ConsumeMetrics(ctx, pdm)
}
}
}

func metricSlice(numMetrics int) []pdata.Metrics {
var out []pdata.Metrics
for i := 0; i < numMetrics; i++ {
const size = 2
out = append(out, pdm(fmt.Sprintf("p%d_", i), size))
}
return out
}

func pdm(prefix string, size int) pdata.Metrics {
c := goldendataset.MetricCfg{
MetricDescriptorType: pdata.MetricDataTypeIntGauge,
MetricNamePrefix: prefix,
NumILMPerResource: size,
NumMetricsPerILM: size,
NumPtLabels: size,
NumPtsPerMetric: size,
NumResourceAttrs: size,
NumResourceMetrics: size,
}
return goldendataset.MetricDataFromCfg(c)
}

func TestMetricIndexSingle(t *testing.T) {
metrics := pdm("", 1)
idx := newMetricIndex()
idx.add(0, 0, 0)
extracted := idx.extract(metrics)
require.Equal(t, metrics, extracted)
}

func TestMetricIndexAll(t *testing.T) {
metrics := pdm("", 2)
idx := newMetricIndex()
idx.add(0, 0, 0)
idx.add(0, 0, 1)
idx.add(0, 1, 0)
idx.add(0, 1, 1)
idx.add(1, 0, 0)
idx.add(1, 0, 1)
idx.add(1, 1, 0)
idx.add(1, 1, 1)
extracted := idx.extract(metrics)
require.Equal(t, metrics, extracted)
}

func TestNilResourceMetrics(t *testing.T) {
metrics := pdata.NewMetrics()
rms := metrics.ResourceMetrics()
rms.Append(pdata.NewResourceMetrics())
requireNotPanics(t, metrics)
}

func TestNilILM(t *testing.T) {
metrics := pdata.NewMetrics()
rms := metrics.ResourceMetrics()
rm := pdata.NewResourceMetrics()
rm.InitEmpty()
rms.Append(rm)
ilms := rm.InstrumentationLibraryMetrics()
ilms.Append(pdata.NewInstrumentationLibraryMetrics())
requireNotPanics(t, metrics)
}

func TestNilMetric(t *testing.T) {
metrics := pdata.NewMetrics()
rms := metrics.ResourceMetrics()
rm := pdata.NewResourceMetrics()
rm.InitEmpty()
rms.Append(rm)
ilms := rm.InstrumentationLibraryMetrics()
ilm := pdata.NewInstrumentationLibraryMetrics()
ilms.Append(ilm)
ilm.InitEmpty()
ms := ilm.Metrics()
ms.Append(pdata.NewMetric())
requireNotPanics(t, metrics)
}

func requireNotPanics(t *testing.T, metrics pdata.Metrics) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
pcfg := cfg.(*Config)
pcfg.Metrics = MetricFilters{
Exclude: &filtermetric.MatchProperties{
Config: filterset.Config{
MatchType: "strict",
},
MetricNames: []string{"foo"},
},
}
next := &etest.SinkMetricsExporter{}
ctx := context.Background()
proc, _ := factory.CreateMetricsProcessor(
ctx,
component.ProcessorCreateParams{},
cfg,
next,
)
require.NotPanics(t, func() {
_ = proc.ConsumeMetrics(ctx, metrics)
})
}
Loading