From 81aac9f5c6ec0b51bde3ef6c517eb83b94b8d3a7 Mon Sep 17 00:00:00 2001 From: Ankit Patel <8731662+ankitpatel96@users.noreply.github.com> Date: Thu, 2 May 2024 16:28:54 -0400 Subject: [PATCH] Documentation improvements - Comments in key functions (#10029) #### Documentation I wrote comments on a bunch of important functions that helped me understand how the collector works. I also created some other documentation in https://github.com/open-telemetry/opentelemetry-collector/pull/10068 - but split it up from this PR. --- confmap/confmap.go | 23 ++++++++++++++--------- confmap/resolver.go | 1 - otelcol/collector.go | 4 ++++ otelcol/command.go | 1 + service/internal/graph/graph.go | 22 +++++++++++++++++++--- service/service.go | 6 +++++- 6 files changed, 43 insertions(+), 14 deletions(-) diff --git a/confmap/confmap.go b/confmap/confmap.go index f3e0471e1d6..655ccc07315 100644 --- a/confmap/confmap.go +++ b/confmap/confmap.go @@ -245,30 +245,35 @@ func expandNilStructPointersHookFunc() mapstructure.DecodeHookFuncValue { // This is needed in combination with ComponentID, which may produce equal IDs for different strings, // and an error needs to be returned in that case, otherwise the last equivalent ID overwrites the previous one. func mapKeyStringToMapKeyTextUnmarshalerHookFunc() mapstructure.DecodeHookFuncType { - return func(f reflect.Type, t reflect.Type, data any) (any, error) { - if f.Kind() != reflect.Map || f.Key().Kind() != reflect.String { + return func(from reflect.Type, to reflect.Type, data any) (any, error) { + if from.Kind() != reflect.Map || from.Key().Kind() != reflect.String { return data, nil } - if t.Kind() != reflect.Map { + if to.Kind() != reflect.Map { return data, nil } - if _, ok := reflect.New(t.Key()).Interface().(encoding.TextUnmarshaler); !ok { + // Checks that the key type of to implements the TextUnmarshaler interface. + if _, ok := reflect.New(to.Key()).Interface().(encoding.TextUnmarshaler); !ok { return data, nil } - m := reflect.MakeMap(reflect.MapOf(t.Key(), reflect.TypeOf(true))) + // Create a map with key value of to's key to bool. + fieldNameSet := reflect.MakeMap(reflect.MapOf(to.Key(), reflect.TypeOf(true))) for k := range data.(map[string]any) { - tKey := reflect.New(t.Key()) + // Create a new value of the to's key type. + tKey := reflect.New(to.Key()) + + // Use tKey to unmarshal the key of the map. if err := tKey.Interface().(encoding.TextUnmarshaler).UnmarshalText([]byte(k)); err != nil { return nil, err } - - if m.MapIndex(reflect.Indirect(tKey)).IsValid() { + // Checks if the key has already been decoded in a previous iteration. + if fieldNameSet.MapIndex(reflect.Indirect(tKey)).IsValid() { return nil, fmt.Errorf("duplicate name %q after unmarshaling %v", k, tKey) } - m.SetMapIndex(reflect.Indirect(tKey), reflect.ValueOf(true)) + fieldNameSet.SetMapIndex(reflect.Indirect(tKey), reflect.ValueOf(true)) } return data, nil } diff --git a/confmap/resolver.go b/confmap/resolver.go index dc1cd7b7700..05f7f964d03 100644 --- a/confmap/resolver.go +++ b/confmap/resolver.go @@ -151,7 +151,6 @@ func NewResolver(set ResolverSettings) (*Resolver, error) { } // Resolve returns the configuration as a Conf, or error otherwise. -// // Should never be called concurrently with itself, Watch or Shutdown. func (mr *Resolver) Resolve(ctx context.Context) (*Conf, error) { // First check if already an active watching, close that if any. diff --git a/otelcol/collector.go b/otelcol/collector.go index e7983017d3d..df0860446a2 100644 --- a/otelcol/collector.go +++ b/otelcol/collector.go @@ -187,6 +187,7 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error { } col.serviceConfig = &cfg.Service + col.service, err = service.New(ctx, service.Settings{ BuildInfo: col.set.BuildInfo, CollectorConf: conf, @@ -248,6 +249,7 @@ func (col *Collector) DryRun(ctx context.Context) error { // Run starts the collector according to the given configuration, and waits for it to complete. // Consecutive calls to Run are not allowed, Run shouldn't be called once a collector is shut down. +// Sets up the control logic for config reloading and shutdown. func (col *Collector) Run(ctx context.Context) error { if err := col.setupConfigurationComponents(ctx); err != nil { col.setCollectorState(StateClosed) @@ -263,6 +265,8 @@ func (col *Collector) Run(ctx context.Context) error { signal.Notify(col.signalsChannel, os.Interrupt, syscall.SIGTERM) } + // Control loop: selects between channels for various interrupts - when this loop is broken, the collector exits. + // If a configuration reload fails, we return without waiting for graceful shutdown. LOOP: for { select { diff --git a/otelcol/command.go b/otelcol/command.go index 8e5c6284b3b..9db850bcc13 100644 --- a/otelcol/command.go +++ b/otelcol/command.go @@ -41,6 +41,7 @@ func NewCommand(set CollectorSettings) *cobra.Command { return rootCmd } +// Puts command line flags from flags into the CollectorSettings, to be used during config resolution. func updateSettingsUsingFlags(set *CollectorSettings, flags *flag.FlagSet) error { if set.ConfigProvider == nil { resolverSet := &set.ConfigProviderSettings.ResolverSettings diff --git a/service/internal/graph/graph.go b/service/internal/graph/graph.go index 04643a37526..525c269a6de 100644 --- a/service/internal/graph/graph.go +++ b/service/internal/graph/graph.go @@ -53,6 +53,8 @@ type Graph struct { telemetry servicetelemetry.TelemetrySettings } +// Build builds a full pipeline graph. +// Build also validates the configuration of the pipelines and does the actual initialization of each Component in the Graph. func Build(ctx context.Context, set Settings) (*Graph, error) { pipelines := &Graph{ componentGraph: simple.NewDirectedGraph(), @@ -73,18 +75,22 @@ func Build(ctx context.Context, set Settings) (*Graph, error) { return pipelines, pipelines.buildComponents(ctx, set) } -// Creates a node for each instance of a component and adds it to the graph +// Creates a node for each instance of a component and adds it to the graph. +// Validates that connectors are configured to export and receive correctly. func (g *Graph) createNodes(set Settings) error { - // Build a list of all connectors for easy reference + // Build a list of all connectors for easy reference. connectors := make(map[component.ID]struct{}) - // Keep track of connectors and where they are used. (map[connectorID][]pipelineID) + // Keep track of connectors and where they are used. (map[connectorID][]pipelineID). connectorsAsExporter := make(map[component.ID][]component.ID) connectorsAsReceiver := make(map[component.ID][]component.ID) + // Build each pipelineNodes struct for each pipeline by parsing the pipelineCfg. + // Also populates the connectors, connectorsAsExporter and connectorsAsReceiver maps. for pipelineID, pipelineCfg := range set.PipelineConfigs { pipe := g.pipelines[pipelineID] for _, recvID := range pipelineCfg.Receivers { + // Checks if this receiver is a connector or a regular receiver. if set.ConnectorBuilder.IsConfigured(recvID) { connectors[recvID] = struct{}{} connectorsAsReceiver[recvID] = append(connectorsAsReceiver[recvID], pipelineID) @@ -138,6 +144,7 @@ func (g *Graph) createNodes(set Settings) error { for expType := range expTypes { for recType := range recTypes { + // Typechecks the connector's receiving and exporting datatypes. if connectorStability(connFactory, expType, recType) != component.StabilityLevelUndefined { expTypes[expType] = true recTypes[recType] = true @@ -241,12 +248,15 @@ func (g *Graph) createConnector(exprPipelineID, rcvrPipelineID, connID component return connNode } +// Iterates through the pipelines and creates edges between components. func (g *Graph) createEdges() { for _, pg := range g.pipelines { + // Draw edges from each receiver to the capability node. for _, receiver := range pg.receivers { g.componentGraph.SetEdge(g.componentGraph.NewEdge(receiver, pg.capabilitiesNode)) } + // Iterates through processors, chaining them together. starts with the capabilities node. var from, to graph.Node from = pg.capabilitiesNode for _, processor := range pg.processors { @@ -254,6 +264,8 @@ func (g *Graph) createEdges() { g.componentGraph.SetEdge(g.componentGraph.NewEdge(from, to)) from = processor } + // Always inserts a fanout node before any exporters. If there is only one + // exporter, the fanout node is still created and acts as a noop. to = pg.fanOutNode g.componentGraph.SetEdge(g.componentGraph.NewEdge(from, to)) @@ -263,6 +275,9 @@ func (g *Graph) createEdges() { } } +// Uses the already built graph g to instantiate the actual components for each component of each pipeline. +// Handles calling the factories for each component - and hooking up each component to the next. +// Also calculates whether each pipeline mutates data so the receiver can know whether it needs to clone the data. func (g *Graph) buildComponents(ctx context.Context, set Settings) error { nodes, err := topo.Sort(g.componentGraph) if err != nil { @@ -282,6 +297,7 @@ func (g *Graph) buildComponents(ctx context.Context, set Settings) error { case *receiverNode: err = n.buildComponent(ctx, telemetrySettings, set.BuildInfo, set.ReceiverBuilder, g.nextConsumers(n.ID())) case *processorNode: + // nextConsumers is guaranteed to be length 1. Either it is the next processor or it is the fanout node for the exporters. err = n.buildComponent(ctx, telemetrySettings, set.BuildInfo, set.ProcessorBuilder, g.nextConsumers(n.ID())[0]) case *exporterNode: err = n.buildComponent(ctx, telemetrySettings, set.BuildInfo, set.ExporterBuilder) diff --git a/service/service.go b/service/service.go index 283e9c94d09..902454c78f1 100644 --- a/service/service.go +++ b/service/service.go @@ -34,7 +34,7 @@ import ( "go.opentelemetry.io/collector/service/telemetry" ) -// Settings holds configuration for building a new service. +// Settings holds configuration for building a new Service. type Settings struct { // BuildInfo provides collector start information. BuildInfo component.BuildInfo @@ -72,6 +72,7 @@ type Service struct { collectorConf *confmap.Conf } +// New creates a new Service, its telemetry, and Components. func New(ctx context.Context, set Settings, cfg Config) (*Service, error) { disableHighCard := obsreportconfig.DisableHighCardinalityMetricsfeatureGate.IsEnabled() extendedConfig := obsreportconfig.UseOtelWithSDKConfigurationForInternalTelemetryFeatureGate.IsEnabled() @@ -92,6 +93,8 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) { if err != nil { return nil, fmt.Errorf("failed to get logger: %w", err) } + + // Fetch data for internal telemetry like instance id and sdk version to provide for internal telemetry. res := resource.New(set.BuildInfo, cfg.Telemetry.Resource) pcommonRes := pdataFromSdk(res) @@ -247,6 +250,7 @@ func (srv *Service) Shutdown(ctx context.Context) error { return errs } +// Creates extensions and then builds the pipeline graph. func (srv *Service) initExtensionsAndPipeline(ctx context.Context, set Settings, cfg Config) error { var err error extensionsSettings := extensions.Settings{