From e25cb3a255b6b9d0f967464bcd5340625cd47f13 Mon Sep 17 00:00:00 2001 From: Laurent Querel Date: Sun, 13 Nov 2022 15:51:27 -0800 Subject: [PATCH] Implement #46 --- pkg/otel/arrow_record/consumer.go | 25 +++++++++++++++ pkg/otel/arrow_record/producer.go | 23 +++++++++++++ .../arrow_record/producer_consumer_test.go | 32 +++++++++++++++++-- 3 files changed, 77 insertions(+), 3 deletions(-) diff --git a/pkg/otel/arrow_record/consumer.go b/pkg/otel/arrow_record/consumer.go index 7e1c8415..31cad6c0 100644 --- a/pkg/otel/arrow_record/consumer.go +++ b/pkg/otel/arrow_record/consumer.go @@ -22,10 +22,12 @@ import ( "github.com/apache/arrow/go/v11/arrow/ipc" "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" colarspb "github.com/f5/otel-arrow-adapter/api/collector/arrow/v1" logs_otlp "github.com/f5/otel-arrow-adapter/pkg/otel/logs/otlp" + metrics_otlp "github.com/f5/otel-arrow-adapter/pkg/otel/metrics/otlp" traces_otlp "github.com/f5/otel-arrow-adapter/pkg/otel/traces/otlp" ) @@ -55,6 +57,29 @@ func NewConsumer() *Consumer { } } +// MetricsFrom produces an array of [pmetric.Metrics] from a BatchArrowRecords message. +func (c *Consumer) MetricsFrom(bar *colarspb.BatchArrowRecords) ([]pmetric.Metrics, error) { + records, err := c.Consume(bar) + if err != nil { + return nil, err + } + + record2Metrics := func(record *RecordMessage) (pmetric.Metrics, error) { + defer record.record.Release() + return metrics_otlp.MetricsFrom(record.record) + } + + var result []pmetric.Metrics + for _, record := range records { + metrics, err := record2Metrics(record) + if err != nil { + return nil, err + } + result = append(result, metrics) + } + return result, nil +} + // LogsFrom produces an array of [plog.Logs] from a BatchArrowRecords message. func (c *Consumer) LogsFrom(bar *colarspb.BatchArrowRecords) ([]plog.Logs, error) { records, err := c.Consume(bar) diff --git a/pkg/otel/arrow_record/producer.go b/pkg/otel/arrow_record/producer.go index 6520c011..3a908608 100644 --- a/pkg/otel/arrow_record/producer.go +++ b/pkg/otel/arrow_record/producer.go @@ -21,10 +21,12 @@ import ( "github.com/apache/arrow/go/v11/arrow/ipc" "github.com/apache/arrow/go/v11/arrow/memory" "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" colarspb "github.com/f5/otel-arrow-adapter/api/collector/arrow/v1" logs_arrow "github.com/f5/otel-arrow-adapter/pkg/otel/logs/arrow" + metrics_arrow "github.com/f5/otel-arrow-adapter/pkg/otel/metrics/arrow" traces_arrow "github.com/f5/otel-arrow-adapter/pkg/otel/traces/arrow" ) @@ -60,6 +62,27 @@ func NewProducer() *Producer { } } +// BatchArrowRecordsFromMetrics produces a BatchArrowRecords message from a [pmetric.Metrics] messages. +func (p *Producer) BatchArrowRecordsFromMetrics(metrics pmetric.Metrics) (*colarspb.BatchArrowRecords, error) { + mb := metrics_arrow.NewMetricsBuilder(p.pool) + if err := mb.Append(metrics); err != nil { + return nil, err + } + record, err := mb.Build() + if err != nil { + return nil, err + } + defer record.Release() + + rms := []*RecordMessage{NewMetricsMessage(record, colarspb.DeliveryType_BEST_EFFORT)} + + bar, err := p.Produce(rms, colarspb.DeliveryType_BEST_EFFORT) + if err != nil { + return nil, err + } + return bar, nil +} + // BatchArrowRecordsFromLogs produces a BatchArrowRecords message from a [plog.Logs] messages. func (p *Producer) BatchArrowRecordsFromLogs(ls plog.Logs) (*colarspb.BatchArrowRecords, error) { lb := logs_arrow.NewLogsBuilder(p.pool) diff --git a/pkg/otel/arrow_record/producer_consumer_test.go b/pkg/otel/arrow_record/producer_consumer_test.go index 356f4595..8047259c 100644 --- a/pkg/otel/arrow_record/producer_consumer_test.go +++ b/pkg/otel/arrow_record/producer_consumer_test.go @@ -5,12 +5,14 @@ import ( "testing" "time" - arrowpb "github.com/f5/otel-arrow-adapter/api/collector/arrow/v1" - "github.com/f5/otel-arrow-adapter/pkg/datagen" - "github.com/f5/otel-arrow-adapter/pkg/otel/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/plog/plogotlp" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" + + arrowpb "github.com/f5/otel-arrow-adapter/api/collector/arrow/v1" + "github.com/f5/otel-arrow-adapter/pkg/datagen" + "github.com/f5/otel-arrow-adapter/pkg/otel/assert" ) func TestProducerConsumerTraces(t *testing.T) { @@ -60,3 +62,27 @@ func TestProducerConsumerLogs(t *testing.T) { []json.Marshaler{plogotlp.NewExportRequestFromLogs(received[0])}, ) } + +func TestProducerConsumerMetrics(t *testing.T) { + dg := datagen.NewMetricsGenerator( + datagen.DefaultResourceAttributes(), + datagen.DefaultInstrumentationScopes(), + ) + metrics := dg.Generate(10, time.Minute) + + producer := NewProducer() + + batch, err := producer.BatchArrowRecordsFromMetrics(metrics) + require.NoError(t, err) + require.Equal(t, arrowpb.OtlpArrowPayloadType_METRICS, batch.OtlpArrowPayloads[0].Type) + + consumer := NewConsumer() + received, err := consumer.MetricsFrom(batch) + require.Equal(t, 1, len(received)) + + assert.Equiv( + t, + []json.Marshaler{pmetricotlp.NewExportRequestFromMetrics(metrics)}, + []json.Marshaler{pmetricotlp.NewExportRequestFromMetrics(received[0])}, + ) +}