diff --git a/exporter/cortexexporter/cortex.go b/exporter/cortexexporter/cortex.go index dbf2d9fff72..6bb617f8b31 100644 --- a/exporter/cortexexporter/cortex.go +++ b/exporter/cortexexporter/cortex.go @@ -17,38 +17,38 @@ package cortexexporter import ( "context" "fmt" - "github.com/prometheus/prometheus/prompb" - "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/consumer/pdatautil" - "go.opentelemetry.io/collector/internal/data" - common "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/common/v1" - otlp "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/metrics/v1" "net/http" "sort" "strconv" "strings" "sync" "unicode" + "log" + "github.com/prometheus/prometheus/prompb" + "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/consumer/pdatautil" + "go.opentelemetry.io/collector/internal/data" + common "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/common/v1" + otlp "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/metrics/v1" + ) type cortexExporter struct { - namespace string - endpoint string - client http.Client - wg *sync.WaitGroup + namespace string + endpoint string + client http.Client + wg *sync.WaitGroup closeChan chan struct{} } // ByLabelName enables the usage of sort.Sort() with a slice of labels type ByLabelName []prompb.Label - func (a ByLabelName) Len() int { return len(a) } func (a ByLabelName) Less(i, j int) bool { return a[i].Name < a[j].Name } func (a ByLabelName) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -//check whether the metric has the correct type and kind combination -//desc cannot be nil +// validateMetrics returns a bool representing whether the metric has a valid type and temporality combination. func validateMetrics(desc *otlp.MetricDescriptor) bool { if desc == nil { return false @@ -63,9 +63,8 @@ func validateMetrics(desc *otlp.MetricDescriptor) bool { return false } -// find the TimeSeries corresponding to the label set in the map, and add sample to the TimeSeries; create new -// TimeSeries if not found. -// Non of the parameter can be nil +// addSample finds a TimeSeries in tsMap that corresponds to the label set lbs, and add sample to the TimeSeries; it +// creates a new TimeSeries in the map if not found. tsMap is unmodified if either of its parameters is nil. func addSample(tsMap map[string]*prompb.TimeSeries, sample *prompb.Sample, lbs []prompb.Label, ty otlp.MetricDescriptor_Type) { if sample == nil || lbs == nil || tsMap == nil { @@ -77,22 +76,23 @@ func addSample(tsMap map[string]*prompb.TimeSeries, sample *prompb.Sample, lbs [ ts.Samples = append(ts.Samples, *sample) } else { newTs := &prompb.TimeSeries{ - Labels: lbs, - Samples: []prompb.Sample{*sample}, - XXX_NoUnkeyedLiteral: struct{}{}, - XXX_unrecognized: nil, - XXX_sizecache: 0, + Labels: lbs, + Samples: []prompb.Sample{*sample}, } tsMap[sig] = newTs } } -// TYPE - label 1 - value 1 - ... label N - value N; labels should be in sorted order + +// timeSeries return a string signature in the form of: +// TYPE-label1-value1- ... -labelN-valueN +// the label slice should not contain duplicate label names; this method sorts the slice by label name before creating +// the signature. func timeSeriesSignature(t otlp.MetricDescriptor_Type, lbs *[]prompb.Label) string { b := strings.Builder{} fmt.Fprintf(&b, t.String()) - // sort labels by name sort.Sort(ByLabelName(*lbs)) + for _, lb := range *lbs { fmt.Fprintf(&b, "-%s-%s", lb.GetName(),lb.GetValue()) } @@ -100,28 +100,28 @@ func timeSeriesSignature(t otlp.MetricDescriptor_Type, lbs *[]prompb.Label) stri return b.String() } -// sanitize labels as well; label in extras overwrites label in labels if collision happens, perhaps log the overwrite +// createLabelSet creates a slice of Cortex Label with OTLP labels and paris of string values. +// Unpaired string value is ignored. String pairs overwrites OTLP labels if collision happens, and the overwrite is +// logged. Resultant label names are sanitized. func createLabelSet(labels []*common.StringKeyValue, extras ...string) []prompb.Label { l := map[string]prompb.Label{} for _, lb := range labels { l[lb.Key] = prompb.Label{ - Name: sanitize(lb.Key), - Value: lb.Value, - XXX_NoUnkeyedLiteral: struct{}{}, - XXX_unrecognized: nil, - XXX_sizecache: 0, + Name: sanitize(lb.Key), + Value: lb.Value, } } for i := 0; i < len(extras); i+=2 { if i+1 >= len(extras){ break } - l[extras[i]] = prompb.Label{ - Name: sanitize(extras[i]), - Value: extras[i+1], - XXX_NoUnkeyedLiteral: struct{}{}, - XXX_unrecognized: nil, - XXX_sizecache: 0, + _,found:= l[extras[i]] + if found { + log.Println("label " + extras[i] + " is overwritten. Check if Prometheus reserved labels are used.") + } + l[extras[i]] = prompb.Label{ + Name: sanitize(extras[i]), + Value: extras[i+1], } } s := make([]prompb.Label,0,len(l)) @@ -131,35 +131,51 @@ func createLabelSet(labels []*common.StringKeyValue, extras ...string) []prompb. return s } +// handleScalarMetric processes data points in a single OTLP scalar metric by adding the each point as a Sample into +// its corresponding TimeSeries in tsMap. tsMap and metric cannot be nil. metric must have a non-nil descriptor func (ce *cortexExporter) handleScalarMetric(tsMap map[string]*prompb.TimeSeries, metric *otlp.Metric) error { - ty := metric.MetricDescriptor.Type + ty := metric.MetricDescriptor.Type switch ty { + // int points case otlp.MetricDescriptor_MONOTONIC_INT64,otlp.MetricDescriptor_INT64: if metric.Int64DataPoints == nil { return fmt.Errorf("nil data point field in metric" + metric.GetMetricDescriptor().Name) } for _, pt := range metric.Int64DataPoints { - lbs := createLabelSet(pt.GetLabels(),"name", - getPromMetricName(metric.GetMetricDescriptor(),ce.namespace)) - sample := &prompb.Sample{Value:float64(pt.Value), Timestamp:int64(pt.TimeUnixNano)} - addSample(tsMap,sample, lbs, metric.GetMetricDescriptor().GetType()) + name := getPromMetricName(metric.GetMetricDescriptor(), ce.namespace) + lbs := createLabelSet(pt.GetLabels(), "name", name) + sample := &prompb.Sample{ + Value: float64(pt.Value), + Timestamp: int64(pt.TimeUnixNano), + } + addSample(tsMap,sample, lbs, metric.GetMetricDescriptor().GetType()) } return nil + + // double points case otlp.MetricDescriptor_MONOTONIC_DOUBLE, otlp.MetricDescriptor_DOUBLE: if metric.DoubleDataPoints == nil { return fmt.Errorf("nil data point field in metric" + metric.GetMetricDescriptor().Name) } for _, pt := range metric.DoubleDataPoints { - lbs := createLabelSet(pt.GetLabels(),"name", - getPromMetricName(metric.GetMetricDescriptor(),ce.namespace)) - sample := &prompb.Sample{Value:pt.Value, Timestamp:int64(pt.TimeUnixNano)} + name := getPromMetricName(metric.GetMetricDescriptor(), ce.namespace) + lbs := createLabelSet(pt.GetLabels(),"name", name) + sample := &prompb.Sample{ + Value: pt.Value, + Timestamp: int64(pt.TimeUnixNano), + } addSample(tsMap,sample, lbs, metric.GetMetricDescriptor().GetType()) } - return nil; + return nil } - return fmt.Errorf("invalid metric type: wants int or double points"); + + return fmt.Errorf("invalid metric type: wants int or double data points"); } + +// handleHistogramMetric processes data points in a single OTLP histogram metric by mapping the sum, count and each +// bucket of every data point as a Sample, and adding each Sample to its corresponding TimeSeries. +// tsMap and metric cannot be nil. func (ce *cortexExporter) handleHistogramMetric(tsMap map[string]*prompb.TimeSeries, metric *otlp.Metric) error { if metric.HistogramDataPoints == nil { return fmt.Errorf("invalid metric type: wants histogram points") @@ -168,47 +184,76 @@ func (ce *cortexExporter) handleHistogramMetric(tsMap map[string]*prompb.TimeSer time := int64(pt.GetTimeUnixNano()) ty := metric.GetMetricDescriptor().GetType() baseName := getPromMetricName(metric.GetMetricDescriptor(),ce.namespace) - sum := &prompb.Sample{Value:pt.GetSum(),Timestamp:time} - count := &prompb.Sample{Value:float64(pt.GetCount()),Timestamp:time} - - addSample(tsMap, sum, createLabelSet(pt.GetLabels(),"name", baseName+"_sum"),ty) - addSample(tsMap, count, createLabelSet(pt.GetLabels(),"name", baseName+"_count"),ty) + sum := &prompb.Sample{ + Value: pt.GetSum(), + Timestamp: time, + } + count := &prompb.Sample{ + Value: float64(pt.GetCount()), + Timestamp: time, + } + sumLbs := createLabelSet(pt.GetLabels(),"name", baseName+"_sum") + countLbs := createLabelSet(pt.GetLabels(),"name", baseName+"_count") + addSample(tsMap, sum, sumLbs, ty) + addSample(tsMap, count, countLbs, ty) var totalCount uint64 for le, bk := range pt.GetBuckets(){ - bucket := &prompb.Sample{Value:float64(bk.Count),Timestamp:time} - addSample(tsMap, bucket, createLabelSet(pt.GetLabels(),"name", baseName+"_bucket", "le", - strconv.FormatFloat(pt.GetExplicitBounds()[le], 'f',-1, 64)),ty) + bucket := &prompb.Sample{ + Value: float64(bk.Count), + Timestamp: time, + } + boundStr := strconv.FormatFloat(pt.GetExplicitBounds()[le], 'f',-1, 64) + lbs := createLabelSet(pt.GetLabels(),"name", baseName+"_bucket", "le",boundStr) + addSample(tsMap, bucket, lbs ,ty) totalCount += bk.GetCount() } - addSample(tsMap, &prompb.Sample{Value:float64(totalCount),Timestamp:time}, - createLabelSet(pt.GetLabels(),"name", baseName+"_bucket", "le","+Inf"),ty) + infSample := &prompb.Sample{Value:float64(totalCount),Timestamp:time} + infLbs := createLabelSet(pt.GetLabels(),"name", baseName+"_bucket", "le","+Inf") + addSample(tsMap, infSample, infLbs, ty) } return nil } + +// handleSummaryMetric processes data points in a single OTLP summary metric by mapping the sum, count and each +// quantile of every data point as a Sample, and adding each Sample to its corresponding TimeSeries. +// tsMap and metric cannot be nil. func (ce *cortexExporter) handleSummaryMetric(tsMap map[string]*prompb.TimeSeries, metric *otlp.Metric) error { if metric.SummaryDataPoints == nil { return fmt.Errorf("invalid metric type: wants summary points") } + for _, pt := range metric.SummaryDataPoints { time := int64(pt.GetTimeUnixNano()) ty := metric.GetMetricDescriptor().GetType() baseName := getPromMetricName(metric.GetMetricDescriptor(),ce.namespace) - sum := &prompb.Sample{Value:pt.GetSum(),Timestamp:time} - count := &prompb.Sample{Value:float64(pt.GetCount()),Timestamp:time} - - addSample(tsMap, sum, createLabelSet(pt.GetLabels(),"name", baseName+"_sum"),ty) - addSample(tsMap, count, createLabelSet(pt.GetLabels(),"name", baseName+"_count"),ty) + sum := &prompb.Sample{ + Value: pt.GetSum(), + Timestamp: time, + } + count := &prompb.Sample{ + Value: float64(pt.GetCount()), + Timestamp: time, + } + sumLbs := createLabelSet(pt.GetLabels(),"name", baseName+"_sum") + countLbs := createLabelSet(pt.GetLabels(),"name", baseName+"_count") + addSample(tsMap, sum, sumLbs, ty) + addSample(tsMap, count, countLbs, ty) for _, qt := range pt.GetPercentileValues(){ - quantile := &prompb.Sample{Value:float64(qt.Value), Timestamp:time} - addSample(tsMap, quantile, createLabelSet(pt.GetLabels(),"name", baseName, "quantile", - strconv.FormatFloat(qt.Percentile, 'f',-1, 64)),ty) + quantile := &prompb.Sample{ + Value: float64(qt.Value), + Timestamp: time, + } + qtStr := strconv.FormatFloat(qt.Percentile, 'f',-1, 64) + qtLbs := createLabelSet(pt.GetLabels(),"name", baseName, "quantile", qtStr) + addSample(tsMap, quantile, qtLbs, ty) } } return nil } -// client cannot be nil +// newCortexExporter initializes a new cortexExporter instance and sets fields accordingly. +// client parameter cannot be nil. func newCortexExporter(ns string, ep string, client *http.Client) *cortexExporter { return &cortexExporter{ namespace: ns, @@ -218,11 +263,17 @@ func newCortexExporter(ns string, ep string, client *http.Client) *cortexExporte closeChan: make(chan struct{}), } } +// shutdown stops the exporter from accepting incoming calls(and return error), and wait for current export operations +// to finish before returning func (ce *cortexExporter)shutdown(context.Context) error{ close(ce.closeChan) ce.wg.Wait() return nil } + +// pushMetrics converts metrics to Cortex TimeSeries and send to remote endpoint. It maintain a map of TimeSeries, +// validates and handles each individual metric, adding the converted TimeSeries to the map, and finally +// exports the map. func (ce *cortexExporter) pushMetrics(ctx context.Context, md pdata.Metrics) (int, error) { ce.wg.Add(1) defer ce.wg.Done() @@ -272,16 +323,19 @@ func (ce *cortexExporter) pushMetrics(ctx context.Context, md pdata.Metrics) (in return 0, nil } } -func (c *cortexExporter) Export(ctx context.Context, tsMap map[string]*prompb.TimeSeries) error { + +// export sends TimeSeries in tsMap to a Cortex Gateway +func (ce *cortexExporter) export(ctx context.Context, tsMap map[string]*prompb.TimeSeries) error { return nil } -// create Prometheus metric name by attaching namespace prefix, unit, and _total suffix +// getPromMetricName creates a Prometheus metric name by attaching namespace prefix, unit, and _total suffix func getPromMetricName(desc *otlp.MetricDescriptor, ns string) string { if desc == nil { return "" } - isCounter := desc.Type == otlp.MetricDescriptor_MONOTONIC_INT64 || desc.Type == otlp.MetricDescriptor_MONOTONIC_DOUBLE + isCounter := desc.Type == otlp.MetricDescriptor_MONOTONIC_INT64 || + desc.Type == otlp.MetricDescriptor_MONOTONIC_DOUBLE b := strings.Builder{} fmt.Fprintf(&b, ns) if b.Len() > 0 { @@ -298,7 +352,7 @@ func getPromMetricName(desc *otlp.MetricDescriptor, ns string) string { fmt.Fprintf(&b, "_") fmt.Fprintf(&b, "total") } - return b.String() + return sanitize(b.String()) } // copied from prometheus-go-metric-exporter @@ -323,7 +377,7 @@ func sanitize(s string) string { } // copied from prometheus-go-metric-exporter -// converts anything that is not a letter or digit to an underscore +// sanitizeRune converts anything that is not a letter or digit to an underscore func sanitizeRune(r rune) rune { if unicode.IsLetter(r) || unicode.IsDigit(r) { return r