Skip to content

Commit

Permalink
Implement #46
Browse files Browse the repository at this point in the history
  • Loading branch information
lquerel committed Nov 13, 2022
1 parent 0140de5 commit e25cb3a
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 3 deletions.
25 changes: 25 additions & 0 deletions pkg/otel/arrow_record/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import (

"github.com/apache/arrow/go/v11/arrow/ipc"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"

colarspb "github.com/f5/otel-arrow-adapter/api/collector/arrow/v1"
logs_otlp "github.com/f5/otel-arrow-adapter/pkg/otel/logs/otlp"
metrics_otlp "github.com/f5/otel-arrow-adapter/pkg/otel/metrics/otlp"
traces_otlp "github.com/f5/otel-arrow-adapter/pkg/otel/traces/otlp"
)

Expand Down Expand Up @@ -55,6 +57,29 @@ func NewConsumer() *Consumer {
}
}

// MetricsFrom produces an array of [pmetric.Metrics] from a BatchArrowRecords message.
func (c *Consumer) MetricsFrom(bar *colarspb.BatchArrowRecords) ([]pmetric.Metrics, error) {
records, err := c.Consume(bar)
if err != nil {
return nil, err
}

record2Metrics := func(record *RecordMessage) (pmetric.Metrics, error) {
defer record.record.Release()
return metrics_otlp.MetricsFrom(record.record)
}

var result []pmetric.Metrics
for _, record := range records {
metrics, err := record2Metrics(record)
if err != nil {
return nil, err
}
result = append(result, metrics)
}
return result, nil
}

// LogsFrom produces an array of [plog.Logs] from a BatchArrowRecords message.
func (c *Consumer) LogsFrom(bar *colarspb.BatchArrowRecords) ([]plog.Logs, error) {
records, err := c.Consume(bar)
Expand Down
23 changes: 23 additions & 0 deletions pkg/otel/arrow_record/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ import (
"github.com/apache/arrow/go/v11/arrow/ipc"
"github.com/apache/arrow/go/v11/arrow/memory"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"

colarspb "github.com/f5/otel-arrow-adapter/api/collector/arrow/v1"
logs_arrow "github.com/f5/otel-arrow-adapter/pkg/otel/logs/arrow"
metrics_arrow "github.com/f5/otel-arrow-adapter/pkg/otel/metrics/arrow"
traces_arrow "github.com/f5/otel-arrow-adapter/pkg/otel/traces/arrow"
)

Expand Down Expand Up @@ -60,6 +62,27 @@ func NewProducer() *Producer {
}
}

// BatchArrowRecordsFromMetrics produces a BatchArrowRecords message from a [pmetric.Metrics] messages.
func (p *Producer) BatchArrowRecordsFromMetrics(metrics pmetric.Metrics) (*colarspb.BatchArrowRecords, error) {
mb := metrics_arrow.NewMetricsBuilder(p.pool)
if err := mb.Append(metrics); err != nil {
return nil, err
}
record, err := mb.Build()
if err != nil {
return nil, err
}
defer record.Release()

rms := []*RecordMessage{NewMetricsMessage(record, colarspb.DeliveryType_BEST_EFFORT)}

bar, err := p.Produce(rms, colarspb.DeliveryType_BEST_EFFORT)
if err != nil {
return nil, err
}
return bar, nil
}

// BatchArrowRecordsFromLogs produces a BatchArrowRecords message from a [plog.Logs] messages.
func (p *Producer) BatchArrowRecordsFromLogs(ls plog.Logs) (*colarspb.BatchArrowRecords, error) {
lb := logs_arrow.NewLogsBuilder(p.pool)
Expand Down
32 changes: 29 additions & 3 deletions pkg/otel/arrow_record/producer_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import (
"testing"
"time"

arrowpb "github.com/f5/otel-arrow-adapter/api/collector/arrow/v1"
"github.com/f5/otel-arrow-adapter/pkg/datagen"
"github.com/f5/otel-arrow-adapter/pkg/otel/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/plog/plogotlp"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"

arrowpb "github.com/f5/otel-arrow-adapter/api/collector/arrow/v1"
"github.com/f5/otel-arrow-adapter/pkg/datagen"
"github.com/f5/otel-arrow-adapter/pkg/otel/assert"
)

func TestProducerConsumerTraces(t *testing.T) {
Expand Down Expand Up @@ -60,3 +62,27 @@ func TestProducerConsumerLogs(t *testing.T) {
[]json.Marshaler{plogotlp.NewExportRequestFromLogs(received[0])},
)
}

func TestProducerConsumerMetrics(t *testing.T) {
dg := datagen.NewMetricsGenerator(
datagen.DefaultResourceAttributes(),
datagen.DefaultInstrumentationScopes(),
)
metrics := dg.Generate(10, time.Minute)

producer := NewProducer()

batch, err := producer.BatchArrowRecordsFromMetrics(metrics)
require.NoError(t, err)
require.Equal(t, arrowpb.OtlpArrowPayloadType_METRICS, batch.OtlpArrowPayloads[0].Type)

consumer := NewConsumer()
received, err := consumer.MetricsFrom(batch)
require.Equal(t, 1, len(received))

assert.Equiv(
t,
[]json.Marshaler{pmetricotlp.NewExportRequestFromMetrics(metrics)},
[]json.Marshaler{pmetricotlp.NewExportRequestFromMetrics(received[0])},
)
}

0 comments on commit e25cb3a

Please sign in to comment.