Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add zipkin receiver to OTEL collector #2181

Merged
merged 4 commits into from
Apr 16, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 23 additions & 11 deletions cmd/opentelemetry-collector/app/defaults/default_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector/processor/batchprocessor"
"github.com/open-telemetry/opentelemetry-collector/receiver"
"github.com/open-telemetry/opentelemetry-collector/receiver/jaegerreceiver"
"github.com/open-telemetry/opentelemetry-collector/receiver/zipkinreceiver"

"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/cassandra"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/elasticsearch"
Expand All @@ -31,37 +32,42 @@ import (

// Config creates default configuration.
// It enables default Jaeger receivers, processors and exporters.
func Config(storageType string, factories config.Factories) (*configmodels.Config, error) {
func Config(storageType string, zipkinPort string, factories config.Factories) (*configmodels.Config, error) {
exporters, err := createExporters(storageType, factories)
if err != nil {
return nil, err
}
types := []string{}
expTypes := []string{}
for _, v := range exporters {
types = append(types, v.Type())
expTypes = append(expTypes, v.Type())
}
receivers := createReceivers(zipkinPort, factories)
recTypes := []string{}
for _, v := range receivers {
recTypes = append(recTypes, v.Type())
}
return &configmodels.Config{
Receivers: createReceivers(factories),
Receivers: receivers,
Exporters: exporters,
Processors: createProcessors(factories),
Service: configmodels.Service{
Pipelines: map[string]*configmodels.Pipeline{
"traces": {
InputType: configmodels.TracesDataType,
Receivers: []string{"jaeger"},
Exporters: types,
Receivers: recTypes,
Exporters: expTypes,
Processors: []string{"batch"},
},
},
},
}, nil
}

func createReceivers(factories config.Factories) configmodels.Receivers {
rec := factories.Receivers["jaeger"].CreateDefaultConfig().(*jaegerreceiver.Config)
func createReceivers(zipkinPort string, factories config.Factories) configmodels.Receivers {
jaeger := factories.Receivers["jaeger"].CreateDefaultConfig().(*jaegerreceiver.Config)
// TODO load and serve sampling strategies
// TODO bind sampling strategies file
rec.Protocols = map[string]*receiver.SecureReceiverSettings{
jaeger.Protocols = map[string]*receiver.SecureReceiverSettings{
"grpc": {
ReceiverSettings: configmodels.ReceiverSettings{
Endpoint: "localhost:14250",
Expand All @@ -83,9 +89,15 @@ func createReceivers(factories config.Factories) configmodels.Receivers {
},
},
}
return map[string]configmodels.Receiver{
"jaeger": rec,
recvs := map[string]configmodels.Receiver{
"jaeger": jaeger,
}
if zipkinPort != ":0" {
zipkin := factories.Receivers["zipkin"].CreateDefaultConfig().(*zipkinreceiver.Config)
zipkin.Endpoint = zipkinPort
recvs["zipkin"] = zipkin
}
return recvs
}

func createExporters(storageTypes string, factories config.Factories) (configmodels.Exporters, error) {
Expand Down
18 changes: 18 additions & 0 deletions cmd/opentelemetry-collector/app/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
package defaults

import (
"flag"

"github.com/open-telemetry/opentelemetry-collector/config"
"github.com/open-telemetry/opentelemetry-collector/service/defaultcomponents"
"github.com/spf13/pflag"
"github.com/spf13/viper"

"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/cassandra"
Expand All @@ -29,6 +32,10 @@ import (

// Components creates default and Jaeger factories
func Components(v *viper.Viper) config.Factories {
// Add flags to viper to make the default values available.
// We have to add all storage flags to viper because any exporter can be specified in the OTEL config file.
// OTEL collector creates default configurations for all factories to verify they can be created.
addDefaultValuesToViper(v)
kafkaExp := kafka.Factory{OptionsFactory: func() *storageKafka.Options {
opts := kafka.DefaultOptions()
opts.InitFromViper(v)
Expand All @@ -51,3 +58,14 @@ func Components(v *viper.Viper) config.Factories {
factories.Exporters[esExp.Type()] = esExp
return factories
}

// addDefaultValuesToViper adds Jaeger storage flags to viper to make the default values available.
func addDefaultValuesToViper(v *viper.Viper) {
flagSet := &flag.FlagSet{}
kafka.DefaultOptions().AddFlags(flagSet)
elasticsearch.DefaultOptions().AddFlags(flagSet)
cassandra.DefaultOptions().AddFlags(flagSet)
pflagSet := &pflag.FlagSet{}
pflagSet.AddGoFlagSet(flagSet)
v.BindPFlags(pflagSet)
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,6 @@ func (Factory) CreateTraceExporter(log *zap.Logger, cfg configmodels.Exporter) (

// CreateMetricsExporter is not implemented.
// This function implements OTEL component.Factory interface.
func (Factory) CreateMetricsExporter(*zap.Logger, configmodels.Exporter) (component.MetricsExporter, error) {
func (Factory) CreateMetricsExporter(*zap.Logger, configmodels.Exporter) (component.MetricsExporterOld, error) {
return nil, configerror.ErrDataTypeIsNotSupported
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,6 @@ func (Factory) CreateTraceExporter(log *zap.Logger, cfg configmodels.Exporter) (

// CreateMetricsExporter is not implemented.
// This function implements OTEL exporter.Factory interface.
func (Factory) CreateMetricsExporter(*zap.Logger, configmodels.Exporter) (component.MetricsExporter, error) {
func (Factory) CreateMetricsExporter(*zap.Logger, configmodels.Exporter) (component.MetricsExporterOld, error) {
return nil, configerror.ErrDataTypeIsNotSupported
}
2 changes: 1 addition & 1 deletion cmd/opentelemetry-collector/app/exporter/kafka/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,6 @@ func (Factory) CreateTraceExporter(log *zap.Logger, cfg configmodels.Exporter) (

// CreateMetricsExporter is not implemented.
// This function implements OTEL component.Factory interface.
func (Factory) CreateMetricsExporter(*zap.Logger, configmodels.Exporter) (component.TraceExporterOld, error) {
func (Factory) CreateMetricsExporter(*zap.Logger, configmodels.Exporter) (component.MetricsExporterOld, error) {
return nil, configerror.ErrDataTypeIsNotSupported
}
6 changes: 5 additions & 1 deletion cmd/opentelemetry-collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector/service/builder"
"github.com/spf13/viper"

collectorApp "github.com/jaegertracing/jaeger/cmd/collector/app"
jflags "github.com/jaegertracing/jaeger/cmd/flags"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/defaults"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/cassandra"
Expand Down Expand Up @@ -62,7 +63,9 @@ func main() {
if getOTELConfigFile() == "" {
log.Println("Config file not provided, installing default Jaeger components")
cfgFactory = func(*viper.Viper, config.Factories) (*configmodels.Config, error) {
return defaults.Config(storageType, cmpts)
collectorOpts := &collectorApp.CollectorOptions{}
collectorOpts.InitFromViper(v)
return defaults.Config(storageType, collectorOpts.CollectorZipkinHTTPHostPort, cmpts)
}
}

Expand All @@ -82,6 +85,7 @@ func main() {
cmd := svc.Command()
jconfig.AddFlags(v,
cmd,
collectorApp.AddFlags,
jflags.AddConfigFileFlag,
storageFlags,
)
Expand Down