
Commit

add comments to and format cortex.go
huyan0 committed Aug 4, 2020
1 parent e9ac299 commit 53fbe2b
Showing 1 changed file with 123 additions and 69 deletions.
192 changes: 123 additions & 69 deletions exporter/cortexexporter/cortex.go
@@ -17,38 +17,38 @@ package cortexexporter
import (
"context"
"fmt"
"log"
"net/http"
"sort"
"strconv"
"strings"
"sync"
"unicode"

"github.com/prometheus/prometheus/prompb"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/consumer/pdatautil"
"go.opentelemetry.io/collector/internal/data"
common "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/common/v1"
otlp "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/metrics/v1"
)

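// cortexExporter converts OTLP metrics to Cortex TimeSeries and sends them to a Cortex gateway at the
// configured endpoint.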
type cortexExporter struct {
namespace string
endpoint string
client http.Client
wg *sync.WaitGroup
closeChan chan struct{}
}

// ByLabelName enables the usage of sort.Sort() with a slice of labels
type ByLabelName []prompb.Label

func (a ByLabelName) Len() int { return len(a) }
func (a ByLabelName) Less(i, j int) bool { return a[i].Name < a[j].Name }
func (a ByLabelName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

// validateMetrics reports whether the metric descriptor has a valid type and temporality combination.
func validateMetrics(desc *otlp.MetricDescriptor) bool {
if desc == nil {
return false
@@ -63,9 +63,8 @@ func validateMetrics(desc *otlp.MetricDescriptor) bool {
return false
}

// addSample finds a TimeSeries in tsMap that corresponds to the label set lbs, and adds the sample to that
// TimeSeries; it creates a new TimeSeries in the map if one is not found. tsMap is unmodified if any of the
// parameters is nil.
func addSample(tsMap map[string]*prompb.TimeSeries, sample *prompb.Sample, lbs []prompb.Label,
ty otlp.MetricDescriptor_Type) {
if sample == nil || lbs == nil || tsMap == nil {
@@ -77,51 +76,52 @@ func addSample(tsMap map[string]*prompb.TimeSeries, sample *prompb.Sample, lbs [
ts.Samples = append(ts.Samples, *sample)
} else {
newTs := &prompb.TimeSeries{
Labels: lbs,
Samples: []prompb.Sample{*sample},
}
tsMap[sig] = newTs
}
}

// timeSeriesSignature returns a string signature in the form of:
// TYPE-label1-value1- ... -labelN-valueN
// The label slice should not contain duplicate label names; this function sorts the slice by label name before
// creating the signature.
func timeSeriesSignature(t otlp.MetricDescriptor_Type, lbs *[]prompb.Label) string {
b := strings.Builder{}
fmt.Fprintf(&b, t.String())

// sort labels by name
sort.Sort(ByLabelName(*lbs))

for _, lb := range *lbs {
fmt.Fprintf(&b, "-%s-%s", lb.GetName(),lb.GetValue())
}

return b.String()
}

// createLabelSet creates a slice of Cortex Labels from OTLP labels and pairs of string values.
// A trailing unpaired string value is ignored. A pair overwrites an OTLP label on name collision, and the
// overwrite is logged. Resulting label names are sanitized.
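// For example (illustrative): createLabelSet(labels, "name", "foo_total", "le", "+Inf") appends the reserved
// "name" and "le" labels, replacing any OTLP label that uses either of those names.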
func createLabelSet(labels []*common.StringKeyValue, extras ...string) []prompb.Label {
l := map[string]prompb.Label{}
for _, lb := range labels {
l[lb.Key] = prompb.Label{
Name: sanitize(lb.Key),
Value: lb.Value,
}
}
for i := 0; i < len(extras); i+=2 {
if i+1 >= len(extras){
break
}
_, found := l[extras[i]]
if found {
log.Println("label " + extras[i] + " is overwritten. Check if Prometheus reserved labels are used.")
}
l[extras[i]] = prompb.Label{
Name: sanitize(extras[i]),
Value: extras[i+1],
}
}
s := make([]prompb.Label,0,len(l))
@@ -131,35 +131,51 @@ func createLabelSet(labels []*common.StringKeyValue, extras ...string) []prompb.
return s
}

// handleScalarMetric processes data points in a single OTLP scalar metric by adding each point as a Sample to
// its corresponding TimeSeries in tsMap. tsMap and metric cannot be nil; metric must have a non-nil descriptor.
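// For example (illustrative): each Int64 or Double data point becomes one Sample, keyed by the metric type and
// label set, with the Prometheus metric name carried in the reserved "name" label.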
func (ce *cortexExporter) handleScalarMetric(tsMap map[string]*prompb.TimeSeries, metric *otlp.Metric) error {
ty := metric.MetricDescriptor.Type
switch ty {
// int points
case otlp.MetricDescriptor_MONOTONIC_INT64,otlp.MetricDescriptor_INT64:
if metric.Int64DataPoints == nil {
return fmt.Errorf("nil data point field in metric %s", metric.GetMetricDescriptor().Name)
}
for _, pt := range metric.Int64DataPoints {
name := getPromMetricName(metric.GetMetricDescriptor(), ce.namespace)
lbs := createLabelSet(pt.GetLabels(), "name", name)
sample := &prompb.Sample{
Value: float64(pt.Value),
Timestamp: int64(pt.TimeUnixNano),
}
addSample(tsMap, sample, lbs, metric.GetMetricDescriptor().GetType())
}
return nil

// double points
case otlp.MetricDescriptor_MONOTONIC_DOUBLE, otlp.MetricDescriptor_DOUBLE:
if metric.DoubleDataPoints == nil {
return fmt.Errorf("nil data point field in metric %s", metric.GetMetricDescriptor().Name)
}
for _, pt := range metric.DoubleDataPoints {
name := getPromMetricName(metric.GetMetricDescriptor(), ce.namespace)
lbs := createLabelSet(pt.GetLabels(), "name", name)
sample := &prompb.Sample{
Value: pt.Value,
Timestamp: int64(pt.TimeUnixNano),
}
addSample(tsMap, sample, lbs, metric.GetMetricDescriptor().GetType())
}
return nil
}
return fmt.Errorf("invalid metric type: wants int or double data points")
}

// handleHistogramMetric processes data points in a single OTLP histogram metric by mapping the sum, count and each
// bucket of every data point as a Sample, and adding each Sample to its corresponding TimeSeries.
// tsMap and metric cannot be nil.
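// For example (illustrative): a histogram with base name "foo" produces the series foo_sum, foo_count, and one
// foo_bucket series per explicit bound plus an implicit "+Inf" bucket, each bucket keyed by its "le" label.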
func (ce *cortexExporter) handleHistogramMetric(tsMap map[string]*prompb.TimeSeries, metric *otlp.Metric) error {
if metric.HistogramDataPoints == nil {
return fmt.Errorf("invalid metric type: wants histogram points")
@@ -168,47 +184,76 @@ func (ce *cortexExporter) handleHistogramMetric(tsMap map[string]*prompb.TimeSer
time := int64(pt.GetTimeUnixNano())
ty := metric.GetMetricDescriptor().GetType()
baseName := getPromMetricName(metric.GetMetricDescriptor(),ce.namespace)
sum := &prompb.Sample{
Value: pt.GetSum(),
Timestamp: time,
}
count := &prompb.Sample{
Value: float64(pt.GetCount()),
Timestamp: time,
}
sumLbs := createLabelSet(pt.GetLabels(), "name", baseName+"_sum")
countLbs := createLabelSet(pt.GetLabels(), "name", baseName+"_count")
addSample(tsMap, sum, sumLbs, ty)
addSample(tsMap, count, countLbs, ty)
var totalCount uint64
for le, bk := range pt.GetBuckets() {
bucket := &prompb.Sample{
Value: float64(bk.Count),
Timestamp: time,
}
boundStr := strconv.FormatFloat(pt.GetExplicitBounds()[le], 'f', -1, 64)
lbs := createLabelSet(pt.GetLabels(), "name", baseName+"_bucket", "le", boundStr)
addSample(tsMap, bucket, lbs, ty)
totalCount += bk.GetCount()
}
infSample := &prompb.Sample{Value: float64(totalCount), Timestamp: time}
infLbs := createLabelSet(pt.GetLabels(), "name", baseName+"_bucket", "le", "+Inf")
addSample(tsMap, infSample, infLbs, ty)
}
return nil
}

// handleSummaryMetric processes data points in a single OTLP summary metric by mapping the sum, count and each
// quantile of every data point as a Sample, and adding each Sample to its corresponding TimeSeries.
// tsMap and metric cannot be nil.
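// For example (illustrative): a summary with base name "foo" produces foo_sum, foo_count, and one "foo" series
// per percentile, keyed by the "quantile" label.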
func (ce *cortexExporter) handleSummaryMetric(tsMap map[string]*prompb.TimeSeries, metric *otlp.Metric) error {
if metric.SummaryDataPoints == nil {
return fmt.Errorf("invalid metric type: wants summary points")
}

for _, pt := range metric.SummaryDataPoints {
time := int64(pt.GetTimeUnixNano())
ty := metric.GetMetricDescriptor().GetType()
baseName := getPromMetricName(metric.GetMetricDescriptor(),ce.namespace)
sum := &prompb.Sample{
Value: pt.GetSum(),
Timestamp: time,
}
count := &prompb.Sample{
Value: float64(pt.GetCount()),
Timestamp: time,
}
sumLbs := createLabelSet(pt.GetLabels(), "name", baseName+"_sum")
countLbs := createLabelSet(pt.GetLabels(), "name", baseName+"_count")
addSample(tsMap, sum, sumLbs, ty)
addSample(tsMap, count, countLbs, ty)
for _, qt := range pt.GetPercentileValues() {
quantile := &prompb.Sample{
Value: float64(qt.Value),
Timestamp: time,
}
qtStr := strconv.FormatFloat(qt.Percentile, 'f', -1, 64)
qtLbs := createLabelSet(pt.GetLabels(), "name", baseName, "quantile", qtStr)
addSample(tsMap, quantile, qtLbs, ty)
}
}
return nil
}

// newCortexExporter initializes a new cortexExporter instance and sets fields accordingly.
// The client parameter cannot be nil.
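// Example (illustrative values): newCortexExporter("test_ns", "http://localhost:9009/api/prom/push", &http.Client{})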
func newCortexExporter(ns string, ep string, client *http.Client) *cortexExporter {
return &cortexExporter{
namespace: ns,
@@ -218,11 +263,17 @@ func newCortexExporter(ns string, ep string, client *http.Client) *cortexExporte
closeChan: make(chan struct{}),
}
}
// shutdown stops the exporter from accepting incoming calls (returning an error instead), and waits for current
// export operations to finish before returning.
func (ce *cortexExporter) shutdown(context.Context) error {
close(ce.closeChan)
ce.wg.Wait()
return nil
}

// pushMetrics converts metrics to Cortex TimeSeries and sends them to the remote endpoint. It maintains a map of
// TimeSeries, validates and handles each individual metric, adding the converted TimeSeries to the map, and finally
// exports the map.
func (ce *cortexExporter) pushMetrics(ctx context.Context, md pdata.Metrics) (int, error) {
ce.wg.Add(1)
defer ce.wg.Done()
@@ -272,16 +323,19 @@ func (ce *cortexExporter) pushMetrics(ctx context.Context, md pdata.Metrics) (in
return 0, nil
}
}

// export sends TimeSeries in tsMap to a Cortex Gateway
func (ce *cortexExporter) export(ctx context.Context, tsMap map[string]*prompb.TimeSeries) error {
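// Illustrative outline only (not part of this commit), assuming the standard Prometheus remote write protocol:
// build a prompb.WriteRequest from tsMap, marshal it with protobuf, snappy-compress the payload, and POST it to
// ce.endpoint with the remote write content type and version headers.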
return nil
}

// getPromMetricName creates a Prometheus metric name by attaching namespace prefix, unit, and _total suffix
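// For example (illustrative, assuming the descriptor name follows the namespace prefix): namespace "test_ns" and
// a monotonic counter named "requests" would produce "test_ns_requests_total" after sanitization.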
func getPromMetricName(desc *otlp.MetricDescriptor, ns string) string {
if desc == nil {
return ""
}
isCounter := desc.Type == otlp.MetricDescriptor_MONOTONIC_INT64 ||
desc.Type == otlp.MetricDescriptor_MONOTONIC_DOUBLE
b := strings.Builder{}
fmt.Fprintf(&b, ns)
if b.Len() > 0 {
@@ -298,7 +352,7 @@ func getPromMetricName(desc *otlp.MetricDescriptor, ns string) string {
fmt.Fprintf(&b, "_")
fmt.Fprintf(&b, "total")
}
return sanitize(b.String())
}

// copied from prometheus-go-metric-exporter
@@ -323,7 +377,7 @@ func sanitize(s string) string {
}

// copied from prometheus-go-metric-exporter
// sanitizeRune converts anything that is not a letter or digit to an underscore
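// For example (illustrative): sanitizeRune('.') and sanitizeRune('-') both map to '_', while letters and digits
// are returned unchanged.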
func sanitizeRune(r rune) rune {
if unicode.IsLetter(r) || unicode.IsDigit(r) {
return r
