Improve test coverage #47

Merged (18 commits, Nov 16, 2022)
37 changes: 34 additions & 3 deletions README.md
@@ -1,8 +1,11 @@
# OTLP Arrow Encoder/Decoder package

Adapter used to convert OTEL batches to/from OTEL Arrow batches.
This package is a reference implementation of the OTLP Arrow Encoder/Decoder specified in this [OTEP](https://github.com/lquerel/oteps/blob/main/text/0156-columnar-encoding.md).
All OTLP entities are covered (metrics, logs, and traces) as well as all sub-elements such as events, links, gauges, sums,
summaries, histograms, ... The overall goal is to optimize the compression ratio for telemetry data transmission as well
as the end-to-end performance between telemetry data producers and receivers.

This package is still experimental and subject to change. It is currently used by an [experimental OTLP/Arrow gRPC
**This package is still experimental and subject to change.** It is currently used by an [experimental OTLP/Arrow gRPC
exporter and receiver](https://github.com/open-telemetry/experimental-arrow-collector).

Other important links:
@@ -13,7 +16,34 @@ Other important links:
rationale, specifications and different phases of this project.


## Developers
## Testing and validation

The testing of this package and the validation of the OTLP Arrow encoding/decoding receive particular
attention because of this package's central position in the future OTEL collector.

Concerning testing, the plan is to:
- reach at least 80% test coverage (probably more),
- implement fuzz tests on the encoding and decoding of OTLP Arrow messages (see the sketch after this list),
- implement integration tests with the experimental collector.
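
A minimal sketch of such a fuzz test using Go's native fuzzing is shown below. `DecodeTraces` is a hypothetical entry point standing in for the package's real decoder; the exact API and placement may differ.

```go
package air

import "testing"

// FuzzDecodeTraces feeds arbitrary byte sequences to the decoder. The fuzzing
// engine reports any panic or crash; malformed input must only produce an error.
// DecodeTraces is a hypothetical decoder entry point used for illustration.
func FuzzDecodeTraces(f *testing.F) {
	f.Add([]byte{})           // seed: empty payload
	f.Add([]byte{0x0a, 0x00}) // seed: short arbitrary bytes
	f.Fuzz(func(t *testing.T, data []byte) {
		if _, err := DecodeTraces(data); err != nil {
			t.Skip("malformed input is expected to be rejected with an error")
		}
	})
}
```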

Concerning the encoding/decoding validation, the plan is to:
- compare the OTLP entities before and after their conversion to OTLP Arrow entities (a round-trip sketch follows this list),
- test the conversion procedure on production data via a CLI tool or directly via the integration in the
experimental collector.
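
A sketch of the before/after comparison is below. `GenTraces`, `EncodeTraces`, and `DecodeTraces` are hypothetical names standing in for the package's real test-data generator, encoder, and decoder; only `cmp` (already used by the tests) is a real dependency.

```go
package air

import (
	"testing"

	"github.com/google/go-cmp/cmp"
)

// TestTracesRoundTrip verifies that OTLP entities are identical before and after
// an OTLP -> OTLP Arrow -> OTLP round trip. All helpers are hypothetical names.
func TestTracesRoundTrip(t *testing.T) {
	t.Parallel()

	original := GenTraces(10) // hypothetical OTLP test-data generator

	encoded, err := EncodeTraces(original) // OTLP -> OTLP Arrow
	if err != nil {
		t.Fatalf("encode failed: %v", err)
	}
	decoded, err := DecodeTraces(encoded) // OTLP Arrow -> OTLP
	if err != nil {
		t.Fatalf("decode failed: %v", err)
	}
	if diff := cmp.Diff(original, decoded); diff != "" {
		t.Errorf("round-trip mismatch (-want +got):\n%s", diff)
	}
}
```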

Validating the stability of the compression ratio is also one of the objectives. This validation will be performed on production data.
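
As a rough illustration, the `Zstd` codec exercised in `pkg/benchmark/compression_test.go` can be used to track the ratio on a fixed payload; the payload and the minimal expected ratio below are illustrative only.

```go
package benchmark

import (
	"bytes"
	"testing"
)

// TestZstdCompressionRatio tracks the compression ratio obtained on a fixed,
// repetitive payload. The payload and the 1.0 threshold are illustrative.
func TestZstdCompressionRatio(t *testing.T) {
	t.Parallel()

	payload := bytes.Repeat([]byte("This is an example of text to compress."), 10)
	compressed, err := Zstd().Compress(payload)
	if err != nil {
		t.Fatalf("expected no error, got %v", err)
	}
	ratio := float64(len(payload)) / float64(len(compressed))
	if ratio < 1.0 {
		t.Errorf("expected the payload to shrink, got ratio %.2f", ratio)
	}
}
```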

## Security

A threat model is being defined [WIP] (untrusted input data, what can go wrong at the protocol level, during the
encoding or decoding phases, ...). Below are the main risks identified so far:
- invalid, or compromised inputs causing security or reliability issues.
- very large input data causing denial of service.
- high cardinality data causing dictionary overflow (over multiple messages); a guard sketch follows this list.
- ... TBD
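
A minimal sketch of the kind of guard the dictionary-overflow risk implies, assuming uint16 dictionary indices as in the Uint16 configurations used by this package; the function and threshold are illustrative, not the package's actual API.

```go
package air

import (
	"fmt"
	"math"
)

// checkDictionaryCardinality is an illustrative guard: with uint16 dictionary
// indices, at most math.MaxUint16+1 distinct values can be referenced across the
// messages sharing a dictionary; beyond that, the encoder must fall back to a
// non-dictionary encoding or reset the dictionary.
func checkDictionaryCardinality(distinctValues int) error {
	if distinctValues > math.MaxUint16+1 {
		return fmt.Errorf("dictionary overflow: %d distinct values exceed the uint16 index space", distinctValues)
	}
	return nil
}
```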


## Developers

### How to change the protobuf specification

@@ -28,6 +58,7 @@ cd ./proto
Once the `*.pb.go` files are generated, you need to replace the content of the `api/collector/arrow/v1` directory by the
generated files present in the `./proto/api/collector/arrow/v1` directory.


## Packages

| Package | Description |
18 changes: 9 additions & 9 deletions docs/arrow_schema.md
@@ -56,15 +56,15 @@ resource_metrics:
dropped_attributes_count: uint32
schema_url: string | string_dictionary
metrics:
- name: string | string_dictionary
- name: string | string_dictionary # required
description: string | string_dictionary
unit: string | string_dictionary
data: # arrow type = sparse union
- gauge:
data_points:
- attributes: *attributes
start_time_unix_nano: uint64
time_unix_nano: uint64
time_unix_nano: uint64 # required
value: # arrow type = dense union
i64: int64
f64: float64
@@ -74,7 +74,7 @@
data_points:
- attributes: *attributes
start_time_unix_nano: uint64
time_unix_nano: uint64
time_unix_nano: uint64 # required
value: # arrow type = dense union
i64: int64
f64: float64
@@ -86,7 +86,7 @@
data_points:
- attributes: *attributes
start_time_unix_nano: uint64
time_unix_nano: uint64
time_unix_nano: uint64 # required
count: uint64
sum: float64
quantile:
@@ -178,13 +178,13 @@ resource_spans:
dropped_attributes_count: uint32
schema_url: string | string_dictionary
spans:
- start_time_unix_nano: uint64
end_time_unix_nano: uint64
trace_id: 16_bytes_binary | 16_bytes_binary_dictionary
span_id: 8_bytes_binary | 8_bytes_binary_dictionary
- start_time_unix_nano: uint64 # required
end_time_unix_nano: uint64 # required
trace_id: 16_bytes_binary | 16_bytes_binary_dictionary # required
span_id: 8_bytes_binary | 8_bytes_binary_dictionary # required
trace_state: string | string_dictionary
parent_span_id: 8_bytes_binary | 8_bytes_binary_dictionary
name: string | string_dictionary
name: string | string_dictionary # required
kind: int32
attributes: *attributes
dropped_attributes_count: uint32
2 changes: 1 addition & 1 deletion go.mod
@@ -48,4 +48,4 @@ require (
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
google.golang.org/genproto v0.0.0-20220126215142-9970aeb2e350 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
)
15 changes: 7 additions & 8 deletions pkg/air_test/record_gen_test.go → pkg/air/record_gen_test.go
@@ -12,26 +12,25 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package air_test
package air

import (
"fmt"

"github.com/f5/otel-arrow-adapter/pkg/air"
"github.com/f5/otel-arrow-adapter/pkg/air/rfield"
)

func GenSimpleRecord(ts int64) *air.Record {
record := air.NewRecord()
func GenSimpleRecord(ts int64) *Record {
record := NewRecord()
record.I64Field("ts", ts)
record.StringField("a", "a")
record.StringField("b", "b")
record.StringField("c", "c")
return record
}

func GenComplexRecord(ts int64) *air.Record {
record := air.NewRecord()
func GenComplexRecord(ts int64) *Record {
record := NewRecord()
record.I64Field("ts", ts)
record.StructField("a", rfield.Struct{
Fields: []*rfield.Field{
@@ -44,8 +43,8 @@ func GenComplexRecord(ts int64) *air.Record {
return record
}

func GenRecord(ts int64, value_a, value_b, value_c int) *air.Record {
record := air.NewRecord()
func GenRecord(ts int64, value_a, value_b, value_c int) *Record {
record := NewRecord()
record.I64Field("ts", ts)
record.StringField("c", fmt.Sprintf("c_%d", value_c))
record.StringField("a", fmt.Sprintf("a___%d", value_a))
19 changes: 9 additions & 10 deletions pkg/air_test/record_test.go → pkg/air/record_test.go
@@ -12,22 +12,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package air_test
package air

import (
"testing"

"github.com/google/go-cmp/cmp"

"github.com/f5/otel-arrow-adapter/pkg/air"
"github.com/f5/otel-arrow-adapter/pkg/air/config"
"github.com/f5/otel-arrow-adapter/pkg/air/rfield"
)

func TestValue(t *testing.T) {
t.Parallel()

record := air.NewRecord()
record := NewRecord()
record.StringField("b", "b")
record.StructField("a", rfield.Struct{
Fields: []*rfield.Field{
@@ -141,7 +140,7 @@ func TestRecordNormalize(t *testing.T) {
t.Parallel()

emptyString := ""
record := air.NewRecord()
record := NewRecord()
record.StringField("b", "")
record.StructField("a", rfield.Struct{
Fields: []*rfield.Field{
@@ -173,7 +172,7 @@ })
})
record.Normalize()

expected_record := air.NewRecord()
expected_record := NewRecord()
expected_record.StructField("a", rfield.Struct{
Fields: []*rfield.Field{
{Name: "b", Value: &rfield.String{Value: &emptyString}},
@@ -204,7 +203,7 @@ })
})
expected_record.StringField("b", "")

if !cmp.Equal(record, expected_record, cmp.AllowUnexported(air.Record{}, rfield.Struct{}, rfield.List{}, rfield.Field{}, rfield.Metadata{})) {
if !cmp.Equal(record, expected_record, cmp.AllowUnexported(Record{}, rfield.Struct{}, rfield.List{}, rfield.Field{}, rfield.Metadata{})) {
t.Errorf("Expected: %+v\nGot: %+v", expected_record, record)
}
}
@@ -214,7 +213,7 @@ func TestRecordSchemaId(t *testing.T) {

vTrue := true
emptyString := ""
record := air.NewRecord()
record := NewRecord()
record.StringField("b", "")
record.StructField("a", rfield.Struct{
Fields: []*rfield.Field{
@@ -275,7 +274,7 @@ func TestRecordSchemaId(t *testing.T) {
func TestRecordWithNestedListSchemaId(t *testing.T) {
t.Parallel()

rr := air.NewRecordRepository(config.NewUint16DefaultConfig())
rr := NewRecordRepository(config.NewUint16DefaultConfig())

resourceSpans := rfield.List{
Values: []rfield.Value{
@@ -383,7 +382,7 @@ func TestRecordWithNestedListSchemaId(t *testing.T) {
},
}

record := air.NewRecord()
record := NewRecord()
record.ListField("resource_spans", resourceSpans)
record.Normalize()
id := record.SchemaId()
@@ -499,7 +498,7 @@ func TestRecordWithNestedListSchemaId(t *testing.T) {
},
}

record = air.NewRecord()
record = NewRecord()
record.ListField("resource_spans", resourceSpans)
record.Normalize()
id = record.SchemaId()
15 changes: 7 additions & 8 deletions pkg/air_test/repo_test.go → pkg/air/repo_test.go
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package air_test
package air

import (
"fmt"
@@ -25,7 +25,6 @@ import (
"github.com/apache/arrow/go/v11/arrow/array"
"github.com/davecgh/go-spew/spew"

"github.com/f5/otel-arrow-adapter/pkg/air"
config2 "github.com/f5/otel-arrow-adapter/pkg/air/config"
"github.com/f5/otel-arrow-adapter/pkg/air/rfield"
arrow2 "github.com/f5/otel-arrow-adapter/pkg/arrow"
@@ -34,7 +33,7 @@
func TestAddRecord(t *testing.T) {
t.Parallel()

rr := air.NewRecordRepository(config2.NewUint8DefaultConfig())
rr := NewRecordRepository(config2.NewUint8DefaultConfig())
rr.AddRecord(GenSimpleRecord(0))
rr.AddRecord(GenComplexRecord(1))
rr.AddRecord(GenSimpleRecord(2))
@@ -98,7 +97,7 @@ func TestOptimize(t *testing.T) {
},
},
}
rr := air.NewRecordRepository(&config)
rr := NewRecordRepository(&config)

for i := 0; i < 100; i++ {
rr.AddRecord(GenRecord(int64(i), i%15, i%2, i))
@@ -183,7 +182,7 @@ func TestBuild(t *testing.T) {
},
},
}
rr := air.NewRecordRepository(&config)
rr := NewRecordRepository(&config)

recordCount := 100

@@ -280,7 +279,7 @@ func TestBuildHeterogeneousListOfStructs(t *testing.T) {
t.Parallel()

config := config2.NewUint8DefaultConfig()
rr := air.NewRecordRepository(config)
rr := NewRecordRepository(config)

rr.AddRecord(RecordWithHeterogeneousListOfStructs(1))

@@ -292,8 +291,8 @@
spew.Dump(records)
}

func RecordWithHeterogeneousListOfStructs(ts uint64) *air.Record {
record := air.NewRecord()
func RecordWithHeterogeneousListOfStructs(ts uint64) *Record {
record := NewRecord()
record.U64Field("ts", ts)
record.ListField("events", rfield.List{
Values: []rfield.Value{
23 changes: 23 additions & 0 deletions pkg/arrow/utils.go
@@ -368,6 +368,11 @@ func (los *ListOfStructs) F64FieldById(fieldId int, row int) (float64, error) {
return F64FromArray(column, row)
}

func (los *ListOfStructs) F64OrNilFieldById(fieldId int, row int) (*float64, error) {
column := los.arr.Field(fieldId)
return F64OrNilFromArray(column, row)
}

func (los *ListOfStructs) BoolFieldById(fieldId int, row int) (bool, error) {
column := los.arr.Field(fieldId)
return BoolFromArray(column, row)
@@ -713,6 +718,24 @@ func F64FromArray(arr arrow.Array, row int) (float64, error) {
}
}

func F64OrNilFromArray(arr arrow.Array, row int) (*float64, error) {
if arr == nil {
return nil, nil
} else {
switch arr := arr.(type) {
case *array.Float64:
if arr.IsNull(row) {
return nil, nil
} else {
v := arr.Value(row)
return &v, nil
}
default:
return nil, fmt.Errorf("column is not of type f64")
}
}
}

func U64FromArray(arr arrow.Array, row int) (uint64, error) {
if arr == nil {
return 0, nil
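For reference, a short usage sketch of the `F64OrNilFromArray` helper added in this file: a present value yields a non-nil pointer, a null slot yields nil with no error. The builder calls come from the Arrow Go v11 API already used by the package, and the `arrow2` import alias mirrors the one in `repo_test.go`; the sketch itself is illustrative and not part of the change set.

```go
package main

import (
	"fmt"

	"github.com/apache/arrow/go/v11/arrow/array"
	"github.com/apache/arrow/go/v11/arrow/memory"

	arrow2 "github.com/f5/otel-arrow-adapter/pkg/arrow"
)

func main() {
	// Build a float64 column containing one value and one null slot.
	b := array.NewFloat64Builder(memory.NewGoAllocator())
	defer b.Release()
	b.Append(1.5)
	b.AppendNull()
	arr := b.NewFloat64Array()
	defer arr.Release()

	// Row 0 holds a value, row 1 is null.
	v0, _ := arrow2.F64OrNilFromArray(arr, 0)
	v1, _ := arrow2.F64OrNilFromArray(arr, 1)
	fmt.Println(*v0, v1 == nil) // 1.5 true
}
```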
@@ -16,14 +16,12 @@ package benchmark

import (
"testing"

"github.com/f5/otel-arrow-adapter/pkg/benchmark"
)

func TestLz4(t *testing.T) {
t.Parallel()

lz4 := benchmark.Lz4()
lz4 := Lz4()
compressed, err := lz4.Compress([]byte("This is an example of text to compress.This is an example of text to compress.This is an example of text to compress."))
if err != nil {
t.Errorf("expected no error, got %v", err)
Expand All @@ -40,7 +38,7 @@ func TestLz4(t *testing.T) {
func TestZstd(t *testing.T) {
t.Parallel()

zstd := benchmark.Zstd()
zstd := Zstd()
compressed, err := zstd.Compress([]byte("This is an example of text to compress.This is an example of text to compress.This is an example of text to compress."))
if err != nil {
t.Errorf("expected no error, got %v", err)