Handle overlapping metrics from different jobs in prometheus exporter (#1096)

This commit fixes the following bug:

**The bug**
If the otel-collector/agent scrapes two endpoints that emit metrics with the same name (even with different labels), it only exports metrics from one of the endpoints, seemingly chosen at random.
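
To make the failure mode concrete, the sketch below is illustrative only (it is not part of the original report or of this commit): it builds the kind of conflicting input the exporter receives, namely two metrics with the same name and label keys whose timeseries differ only in their `source` label value. The metric name and the `conflictingMetrics` helper are hypothetical; the field names come from the opencensus-proto metrics types used elsewhere in this change.

```go
package main

import (
	"fmt"

	metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
)

// conflictingMetrics returns two metrics that share a name and label keys but
// carry timeseries with different "source" label values, mimicking what the
// collector holds after scraping metrics-load-generator and
// metrics-load-generator2. (Hypothetical helper for illustration.)
func conflictingMetrics() []*metricspb.Metric {
	newMetric := func(source string) *metricspb.Metric {
		return &metricspb.Metric{
			MetricDescriptor: &metricspb.MetricDescriptor{
				Name:      "promexample_opdemo_process_counts",
				LabelKeys: []*metricspb.LabelKey{{Key: "source"}},
			},
			Timeseries: []*metricspb.TimeSeries{
				{LabelValues: []*metricspb.LabelValue{{Value: source}}},
			},
		}
	}
	return []*metricspb.Metric{newMetric("source1"), newMetric("source2")}
}

func main() {
	// Both metrics map to the same Prometheus metric family. Before this fix,
	// exporting them one after the other let the second overwrite the first
	// instead of contributing a second labelled timeseries.
	for _, m := range conflictingMetrics() {
		fmt.Println(m.GetMetricDescriptor().GetName(), m.GetTimeseries()[0].GetLabelValues()[0].GetValue())
	}
}
```

The fix in this commit merges such metrics by their signature (name plus label keys) before handing them to the underlying exporter.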

**Steps to reproduce**
1. Modify the [demo example](https://github.com/open-telemetry/opentelemetry-collector/tree/master/examples/demo) as in [this commit](jhengy@933679d):
   - create a clone, `metrics-load-generator2`, which emits exactly the same metrics as `metrics-load-generator` except for the `source` label
2. Use [an older image](jhengy@bad29d9) (from 28 May 2020) because of a problem encountered with the latest otel-collector Docker image; details can be found in [this issue](#1075)
3. Run the modified demo example:
   - `cd examples/demo`
   - `docker-compose up`
   - `curl localhost:8889/metrics`

**What did you expect to see?**
Metrics from both metrics-load-generator (source=source1) and metrics-load-generator2 (source=source2) should be visible.

**What did you see instead?**
At any time, only metrics from one of the sources are visible.
Sometimes only metrics from the metrics-load-generator service appear:
```
# HELP promexample_opdemo_latency The various latencies of the methods
# TYPE promexample_opdemo_latency histogram
promexample_opdemo_latency_bucket{client="cli",label1="value1",method="repl",source="source1",le="10"} 86
promexample_opdemo_latency_bucket{client="cli",label1="value1",method="repl",source="source1",le="50"} 448
promexample_opdemo_latency_bucket{client="cli",label1="value1",method="repl",source="source1",le="100"} 783
promexample_opdemo_latency_bucket{client="cli",label1="value1",method="repl",source="source1",le="200"} 802
promexample_opdemo_latency_bucket{client="cli",label1="value1",method="repl",source="source1",le="400"} 846
promexample_opdemo_latency_bucket{client="cli",label1="value1",method="repl",source="source1",le="800"} 939
promexample_opdemo_latency_bucket{client="cli",label1="value1",method="repl",source="source1",le="1000"} 973
promexample_opdemo_latency_bucket{client="cli",label1="value1",method="repl",source="source1",le="1400"} 1000
promexample_opdemo_latency_bucket{client="cli",label1="value1",method="repl",source="source1",le="2000"} 1007
promexample_opdemo_latency_bucket{client="cli",label1="value1",method="repl",source="source1",le="5000"} 1040
promexample_opdemo_latency_bucket{client="cli",label1="value1",method="repl",source="source1",le="10000"} 1088
promexample_opdemo_latency_bucket{client="cli",label1="value1",method="repl",source="source1",le="15000"} 1122
promexample_opdemo_latency_bucket{client="cli",label1="value1",method="repl",source="source1",le="+Inf"} 1132
promexample_opdemo_latency_sum{client="cli",label1="value1",method="repl",source="source1"} 1.2317093098059976e+06
promexample_opdemo_latency_count{client="cli",label1="value1",method="repl",source="source1"} 1132
# HELP promexample_opdemo_line_counts The counts of the lines in
# TYPE promexample_opdemo_line_counts counter
promexample_opdemo_line_counts{client="cli",label1="value1",method="repl",source="source1"} 3424
# HELP promexample_opdemo_line_lengths The lengths of the various lines in
# TYPE promexample_opdemo_line_lengths histogram
promexample_opdemo_line_lengths_bucket{client="cli",label1="value1",method="repl",source="source1",le="10"} 27
promexample_opdemo_line_lengths_bucket{client="cli",label1="value1",method="repl",source="source1",le="20"} 61
promexample_opdemo_line_lengths_bucket{client="cli",label1="value1",method="repl",source="source1",le="50"} 155
promexample_opdemo_line_lengths_bucket{client="cli",label1="value1",method="repl",source="source1",le="100"} 324
promexample_opdemo_line_lengths_bucket{client="cli",label1="value1",method="repl",source="source1",le="150"} 481
promexample_opdemo_line_lengths_bucket{client="cli",label1="value1",method="repl",source="source1",le="200"} 662
promexample_opdemo_line_lengths_bucket{client="cli",label1="value1",method="repl",source="source1",le="500"} 1669
promexample_opdemo_line_lengths_bucket{client="cli",label1="value1",method="repl",source="source1",le="800"} 2722
promexample_opdemo_line_lengths_bucket{client="cli",label1="value1",method="repl",source="source1",le="+Inf"} 3424
promexample_opdemo_line_lengths_sum{client="cli",label1="value1",method="repl",source="source1"} 1.7351559999999993e+06
promexample_opdemo_line_lengths_count{client="cli",label1="value1",method="repl",source="source1"} 3424
# HELP promexample_opdemo_process_counts The various counts
# TYPE promexample_opdemo_process_counts counter
promexample_opdemo_process_counts{client="cli",label1="value1",method="repl",source="source1"} 1132
```
and sometimes only metrics from the metrics-load-generator2 service appear:
```
# HELP promexample_opdemo_latency The various latencies of the methods
# TYPE promexample_opdemo_latency histogram
promexample_opdemo_latency_bucket{client="cli",label1="value1",method="repl",source="source2",le="10"} 100
promexample_opdemo_latency_bucket{client="cli",label1="value1",method="repl",source="source2",le="50"} 526
promexample_opdemo_latency_bucket{client="cli",label1="value1",method="repl",source="source2",le="100"} 937
promexample_opdemo_latency_bucket{client="cli",label1="value1",method="repl",source="source2",le="200"} 960
promexample_opdemo_latency_bucket{client="cli",label1="value1",method="repl",source="source2",le="400"} 1013
promexample_opdemo_latency_bucket{client="cli",label1="value1",method="repl",source="source2",le="800"} 1122
promexample_opdemo_latency_bucket{client="cli",label1="value1",method="repl",source="source2",le="1000"} 1171
promexample_opdemo_latency_bucket{client="cli",label1="value1",method="repl",source="source2",le="1400"} 1206
promexample_opdemo_latency_bucket{client="cli",label1="value1",method="repl",source="source2",le="2000"} 1214
promexample_opdemo_latency_bucket{client="cli",label1="value1",method="repl",source="source2",le="5000"} 1257
promexample_opdemo_latency_bucket{client="cli",label1="value1",method="repl",source="source2",le="10000"} 1308
promexample_opdemo_latency_bucket{client="cli",label1="value1",method="repl",source="source2",le="15000"} 1343
promexample_opdemo_latency_bucket{client="cli",label1="value1",method="repl",source="source2",le="+Inf"} 1352
promexample_opdemo_latency_sum{client="cli",label1="value1",method="repl",source="source2"} 1.3510892105500018e+06
promexample_opdemo_latency_count{client="cli",label1="value1",method="repl",source="source2"} 1352
# HELP promexample_opdemo_line_counts The counts of the lines in
# TYPE promexample_opdemo_line_counts counter
promexample_opdemo_line_counts{client="cli",label1="value1",method="repl",source="source2"} 4113
# HELP promexample_opdemo_line_lengths The lengths of the various lines in
# TYPE promexample_opdemo_line_lengths histogram
promexample_opdemo_line_lengths_bucket{client="cli",label1="value1",method="repl",source="source2",le="10"} 38
promexample_opdemo_line_lengths_bucket{client="cli",label1="value1",method="repl",source="source2",le="20"} 92
promexample_opdemo_line_lengths_bucket{client="cli",label1="value1",method="repl",source="source2",le="50"} 211
promexample_opdemo_line_lengths_bucket{client="cli",label1="value1",method="repl",source="source2",le="100"} 419
promexample_opdemo_line_lengths_bucket{client="cli",label1="value1",method="repl",source="source2",le="150"} 626
promexample_opdemo_line_lengths_bucket{client="cli",label1="value1",method="repl",source="source2",le="200"} 814
promexample_opdemo_line_lengths_bucket{client="cli",label1="value1",method="repl",source="source2",le="500"} 2025
promexample_opdemo_line_lengths_bucket{client="cli",label1="value1",method="repl",source="source2",le="800"} 3270
promexample_opdemo_line_lengths_bucket{client="cli",label1="value1",method="repl",source="source2",le="+Inf"} 4113
promexample_opdemo_line_lengths_sum{client="cli",label1="value1",method="repl",source="source2"} 2.0698130000000026e+06
promexample_opdemo_line_lengths_count{client="cli",label1="value1",method="repl",source="source2"} 4113
# HELP promexample_opdemo_process_counts The various counts
# TYPE promexample_opdemo_process_counts counter
promexample_opdemo_process_counts{client="cli",label1="value1",method="repl",source="source2"} 1352
```
liamawhite committed Jun 16, 2020
1 parent 69a9fab commit f134f50
Showing 2 changed files with 123 additions and 44 deletions.
30 changes: 30 additions & 0 deletions exporter/prometheusexporter/prometheus.go
@@ -15,9 +15,11 @@
package prometheusexporter

import (
	"bytes"
	"context"
	"errors"

	metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
	// TODO: once this repository has been transferred to the
	// official census-ecosystem location, update this import path.
	"github.com/orijtech/prometheus-go-metrics-exporter"
@@ -42,12 +44,40 @@ func (pe *prometheusExporter) Start(_ context.Context, _ component.Host) error {
}

func (pe *prometheusExporter) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error {
	merged := make(map[string]*metricspb.Metric)
	for _, metric := range md.Metrics {
		merge(merged, metric)
	}
	for _, metric := range merged {
		_ = pe.exporter.ExportMetric(ctx, md.Node, md.Resource, metric)
	}
	return nil
}

// The underlying exporter overwrites timeseries when there are conflicting metric signatures.
// Therefore, we need to merge timeseries that share a metric signature into a single metric before sending.
func merge(m map[string]*metricspb.Metric, metric *metricspb.Metric) {
	key := metricSignature(metric)
	current, ok := m[key]
	if !ok {
		m[key] = metric
		return
	}
	current.Timeseries = append(current.Timeseries, metric.Timeseries...)
}

// Unique identifier of a given Prometheus metric.
// Assumes label keys are always in the same order.
func metricSignature(metric *metricspb.Metric) string {
	var buf bytes.Buffer
	buf.WriteString(metric.GetMetricDescriptor().GetName())
	labelKeys := metric.GetMetricDescriptor().GetLabelKeys()
	for _, labelKey := range labelKeys {
		buf.WriteString("-" + labelKey.Key)
	}
	return buf.String()
}

// Shutdown stops the exporter and is invoked during shutdown.
func (pe *prometheusExporter) Shutdown(context.Context) error {
	return pe.shutdownFunc()
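
For illustration, here is a minimal sketch of a hypothetical in-package test (not part of this commit) showing how the new `merge` and `metricSignature` helpers collapse two conflicting metrics into a single metric that carries both timeseries; the metric and label names are taken from the bug report above, and the test name is made up.

```go
package prometheusexporter

import (
	"testing"

	metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
)

// TestMergeCombinesConflictingMetrics is a hypothetical test sketch: two
// metrics with the same descriptor (same name and label keys) but different
// "source" label values should end up as one metric with two timeseries.
func TestMergeCombinesConflictingMetrics(t *testing.T) {
	descriptor := &metricspb.MetricDescriptor{
		Name:      "promexample_opdemo_latency",
		LabelKeys: []*metricspb.LabelKey{{Key: "source"}},
	}
	fromSource1 := &metricspb.Metric{
		MetricDescriptor: descriptor,
		Timeseries: []*metricspb.TimeSeries{
			{LabelValues: []*metricspb.LabelValue{{Value: "source1"}}},
		},
	}
	fromSource2 := &metricspb.Metric{
		MetricDescriptor: descriptor,
		Timeseries: []*metricspb.TimeSeries{
			{LabelValues: []*metricspb.LabelValue{{Value: "source2"}}},
		},
	}

	// Both metrics share the signature "promexample_opdemo_latency-source",
	// so merge appends the second metric's timeseries onto the first.
	merged := make(map[string]*metricspb.Metric)
	merge(merged, fromSource1)
	merge(merged, fromSource2)

	if len(merged) != 1 {
		t.Fatalf("expected 1 merged metric, got %d", len(merged))
	}
	if got := len(merged[metricSignature(fromSource1)].Timeseries); got != 2 {
		t.Fatalf("expected 2 timeseries after merging, got %d", got)
	}
}
```

ConsumeMetricsData above does exactly this for every batch it receives and then exports each merged metric once, so a single scrape of the collector exposes the timeseries from both jobs.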
137 changes: 93 additions & 44 deletions exporter/prometheusexporter/prometheus_test.go
@@ -16,8 +16,10 @@ package prometheusexporter

import (
	"context"
	"fmt"
	"io/ioutil"
	"net/http"
	"strings"
	"testing"

	metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
@@ -87,57 +89,104 @@ func TestPrometheusExporter_endToEnd(t *testing.T) {

assert.NotNil(t, consumer)

var metric1 = &metricspb.Metric{
MetricDescriptor: &metricspb.MetricDescriptor{
Name: "this/one/there(where)",
Description: "Extra ones",
Unit: "1",
Type: metricspb.MetricDescriptor_CUMULATIVE_INT64,
LabelKeys: []*metricspb.LabelKey{
{Key: "os", Description: "Operating system"},
{Key: "arch", Description: "Architecture"},
for delta := 0; delta <= 20; delta += 10 {
consumer.ConsumeMetricsData(context.Background(), consumerdata.MetricsData{Metrics: metricBuilder(int64(delta))})

res, err := http.Get("http://localhost:7777/metrics")
if err != nil {
t.Fatalf("Failed to perform a scrape: %v", err)
}
if g, w := res.StatusCode, 200; g != w {
t.Errorf("Mismatched HTTP response status code: Got: %d Want: %d", g, w)
}
blob, _ := ioutil.ReadAll(res.Body)
_ = res.Body.Close()
want := []string{
`# HELP test_this_one_there_where_ Extra ones`,
`# TYPE test_this_one_there_where_ counter`,
fmt.Sprintf(`test_this_one_there_where_{arch="x86",code="one",foo="bar",os="windows"} %v`, 99+delta),
fmt.Sprintf(`test_this_one_there_where_{arch="x86",code="one",foo="bar",os="linux"} %v`, 100+delta),
}

for _, w := range want {
if !strings.Contains(string(blob), w) {
t.Errorf("Missing %v from response:\n%v", w, string(blob))
}
}
}
}

func metricBuilder(delta int64) []*metricspb.Metric {
return []*metricspb.Metric{
{
MetricDescriptor: &metricspb.MetricDescriptor{
Name: "this/one/there(where)",
Description: "Extra ones",
Unit: "1",
Type: metricspb.MetricDescriptor_CUMULATIVE_INT64,
LabelKeys: []*metricspb.LabelKey{
{Key: "os", Description: "Operating system"},
{Key: "arch", Description: "Architecture"},
},
},
},
Timeseries: []*metricspb.TimeSeries{
{
StartTimestamp: &timestamp.Timestamp{
Seconds: 1543160298,
Nanos: 100000090,
Timeseries: []*metricspb.TimeSeries{
{
StartTimestamp: &timestamp.Timestamp{
Seconds: 1543160298,
Nanos: 100000090,
},
LabelValues: []*metricspb.LabelValue{
{Value: "windows"},
{Value: "x86"},
},
Points: []*metricspb.Point{
{
Timestamp: &timestamp.Timestamp{
Seconds: 1543160298,
Nanos: 100000997,
},
Value: &metricspb.Point_Int64Value{
Int64Value: 99 + delta,
},
},
},
},
LabelValues: []*metricspb.LabelValue{
{Value: "windows"},
{Value: "x86"},
},
},
{
MetricDescriptor: &metricspb.MetricDescriptor{
Name: "this/one/there(where)",
Description: "Extra ones",
Unit: "1",
Type: metricspb.MetricDescriptor_CUMULATIVE_INT64,
LabelKeys: []*metricspb.LabelKey{
{Key: "os", Description: "Operating system"},
{Key: "arch", Description: "Architecture"},
},
Points: []*metricspb.Point{
{
Timestamp: &timestamp.Timestamp{
Seconds: 1543160298,
Nanos: 100000997,
},
Value: &metricspb.Point_Int64Value{
Int64Value: 99,
},
Timeseries: []*metricspb.TimeSeries{
{
StartTimestamp: &timestamp.Timestamp{
Seconds: 1543160298,
Nanos: 100000090,
},
LabelValues: []*metricspb.LabelValue{
{Value: "linux"},
{Value: "x86"},
},
Points: []*metricspb.Point{
{
Timestamp: &timestamp.Timestamp{
Seconds: 1543160298,
Nanos: 100000997,
},
Value: &metricspb.Point_Int64Value{
Int64Value: 100 + delta,
},
},
},
},
},
},
}
consumer.ConsumeMetricsData(context.Background(), consumerdata.MetricsData{Metrics: []*metricspb.Metric{metric1}})

res, err := http.Get("http://localhost:7777/metrics")
if err != nil {
t.Fatalf("Failed to perform a scrape: %v", err)
}
if g, w := res.StatusCode, 200; g != w {
t.Errorf("Mismatched HTTP response status code: Got: %d Want: %d", g, w)
}
blob, _ := ioutil.ReadAll(res.Body)
_ = res.Body.Close()
want := `# HELP test_this_one_there_where_ Extra ones
# TYPE test_this_one_there_where_ counter
test_this_one_there_where_{arch="x86",code="one",foo="bar",os="windows"} 99
`
if got := string(blob); got != want {
t.Errorf("Response mismatch\nGot:\n%s\n\nWant:\n%s", got, want)
}
}
