Skip to content

Commit

Permalink
Documentation improvements - Comments in key functions (open-telemetr…
Browse files Browse the repository at this point in the history
…y#10029)

#### Documentation
I wrote comments on a bunch of important functions that helped me
understand how the collector works.
I also created some other documentation in
open-telemetry#10068 -
but split it up from this PR.
  • Loading branch information
ankitpatel96 authored and andrzej-stencel committed May 27, 2024
1 parent cbf61d0 commit 81aac9f
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 14 deletions.
23 changes: 14 additions & 9 deletions confmap/confmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,30 +245,35 @@ func expandNilStructPointersHookFunc() mapstructure.DecodeHookFuncValue {
// This is needed in combination with ComponentID, which may produce equal IDs for different strings,
// and an error needs to be returned in that case, otherwise the last equivalent ID overwrites the previous one.
func mapKeyStringToMapKeyTextUnmarshalerHookFunc() mapstructure.DecodeHookFuncType {
return func(f reflect.Type, t reflect.Type, data any) (any, error) {
if f.Kind() != reflect.Map || f.Key().Kind() != reflect.String {
return func(from reflect.Type, to reflect.Type, data any) (any, error) {
if from.Kind() != reflect.Map || from.Key().Kind() != reflect.String {
return data, nil
}

if t.Kind() != reflect.Map {
if to.Kind() != reflect.Map {
return data, nil
}

if _, ok := reflect.New(t.Key()).Interface().(encoding.TextUnmarshaler); !ok {
// Checks that the key type of to implements the TextUnmarshaler interface.
if _, ok := reflect.New(to.Key()).Interface().(encoding.TextUnmarshaler); !ok {
return data, nil
}

m := reflect.MakeMap(reflect.MapOf(t.Key(), reflect.TypeOf(true)))
// Create a map with key value of to's key to bool.
fieldNameSet := reflect.MakeMap(reflect.MapOf(to.Key(), reflect.TypeOf(true)))
for k := range data.(map[string]any) {
tKey := reflect.New(t.Key())
// Create a new value of the to's key type.
tKey := reflect.New(to.Key())

// Use tKey to unmarshal the key of the map.
if err := tKey.Interface().(encoding.TextUnmarshaler).UnmarshalText([]byte(k)); err != nil {
return nil, err
}

if m.MapIndex(reflect.Indirect(tKey)).IsValid() {
// Checks if the key has already been decoded in a previous iteration.
if fieldNameSet.MapIndex(reflect.Indirect(tKey)).IsValid() {
return nil, fmt.Errorf("duplicate name %q after unmarshaling %v", k, tKey)
}
m.SetMapIndex(reflect.Indirect(tKey), reflect.ValueOf(true))
fieldNameSet.SetMapIndex(reflect.Indirect(tKey), reflect.ValueOf(true))
}
return data, nil
}
Expand Down
1 change: 0 additions & 1 deletion confmap/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ func NewResolver(set ResolverSettings) (*Resolver, error) {
}

// Resolve returns the configuration as a Conf, or error otherwise.
//
// Should never be called concurrently with itself, Watch or Shutdown.
func (mr *Resolver) Resolve(ctx context.Context) (*Conf, error) {
// First check if already an active watching, close that if any.
Expand Down
4 changes: 4 additions & 0 deletions otelcol/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
}

col.serviceConfig = &cfg.Service

col.service, err = service.New(ctx, service.Settings{
BuildInfo: col.set.BuildInfo,
CollectorConf: conf,
Expand Down Expand Up @@ -248,6 +249,7 @@ func (col *Collector) DryRun(ctx context.Context) error {

// Run starts the collector according to the given configuration, and waits for it to complete.
// Consecutive calls to Run are not allowed, Run shouldn't be called once a collector is shut down.
// Sets up the control logic for config reloading and shutdown.
func (col *Collector) Run(ctx context.Context) error {
if err := col.setupConfigurationComponents(ctx); err != nil {
col.setCollectorState(StateClosed)
Expand All @@ -263,6 +265,8 @@ func (col *Collector) Run(ctx context.Context) error {
signal.Notify(col.signalsChannel, os.Interrupt, syscall.SIGTERM)
}

// Control loop: selects between channels for various interrupts - when this loop is broken, the collector exits.
// If a configuration reload fails, we return without waiting for graceful shutdown.
LOOP:
for {
select {
Expand Down
1 change: 1 addition & 0 deletions otelcol/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func NewCommand(set CollectorSettings) *cobra.Command {
return rootCmd
}

// Puts command line flags from flags into the CollectorSettings, to be used during config resolution.
func updateSettingsUsingFlags(set *CollectorSettings, flags *flag.FlagSet) error {
if set.ConfigProvider == nil {
resolverSet := &set.ConfigProviderSettings.ResolverSettings
Expand Down
22 changes: 19 additions & 3 deletions service/internal/graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ type Graph struct {
telemetry servicetelemetry.TelemetrySettings
}

// Build builds a full pipeline graph.
// Build also validates the configuration of the pipelines and does the actual initialization of each Component in the Graph.
func Build(ctx context.Context, set Settings) (*Graph, error) {
pipelines := &Graph{
componentGraph: simple.NewDirectedGraph(),
Expand All @@ -73,18 +75,22 @@ func Build(ctx context.Context, set Settings) (*Graph, error) {
return pipelines, pipelines.buildComponents(ctx, set)
}

// Creates a node for each instance of a component and adds it to the graph
// Creates a node for each instance of a component and adds it to the graph.
// Validates that connectors are configured to export and receive correctly.
func (g *Graph) createNodes(set Settings) error {
// Build a list of all connectors for easy reference
// Build a list of all connectors for easy reference.
connectors := make(map[component.ID]struct{})

// Keep track of connectors and where they are used. (map[connectorID][]pipelineID)
// Keep track of connectors and where they are used. (map[connectorID][]pipelineID).
connectorsAsExporter := make(map[component.ID][]component.ID)
connectorsAsReceiver := make(map[component.ID][]component.ID)

// Build each pipelineNodes struct for each pipeline by parsing the pipelineCfg.
// Also populates the connectors, connectorsAsExporter and connectorsAsReceiver maps.
for pipelineID, pipelineCfg := range set.PipelineConfigs {
pipe := g.pipelines[pipelineID]
for _, recvID := range pipelineCfg.Receivers {
// Checks if this receiver is a connector or a regular receiver.
if set.ConnectorBuilder.IsConfigured(recvID) {
connectors[recvID] = struct{}{}
connectorsAsReceiver[recvID] = append(connectorsAsReceiver[recvID], pipelineID)
Expand Down Expand Up @@ -138,6 +144,7 @@ func (g *Graph) createNodes(set Settings) error {

for expType := range expTypes {
for recType := range recTypes {
// Typechecks the connector's receiving and exporting datatypes.
if connectorStability(connFactory, expType, recType) != component.StabilityLevelUndefined {
expTypes[expType] = true
recTypes[recType] = true
Expand Down Expand Up @@ -241,19 +248,24 @@ func (g *Graph) createConnector(exprPipelineID, rcvrPipelineID, connID component
return connNode
}

// Iterates through the pipelines and creates edges between components.
func (g *Graph) createEdges() {
for _, pg := range g.pipelines {
// Draw edges from each receiver to the capability node.
for _, receiver := range pg.receivers {
g.componentGraph.SetEdge(g.componentGraph.NewEdge(receiver, pg.capabilitiesNode))
}

// Iterates through processors, chaining them together. starts with the capabilities node.
var from, to graph.Node
from = pg.capabilitiesNode
for _, processor := range pg.processors {
to = processor
g.componentGraph.SetEdge(g.componentGraph.NewEdge(from, to))
from = processor
}
// Always inserts a fanout node before any exporters. If there is only one
// exporter, the fanout node is still created and acts as a noop.
to = pg.fanOutNode
g.componentGraph.SetEdge(g.componentGraph.NewEdge(from, to))

Expand All @@ -263,6 +275,9 @@ func (g *Graph) createEdges() {
}
}

// Uses the already built graph g to instantiate the actual components for each component of each pipeline.
// Handles calling the factories for each component - and hooking up each component to the next.
// Also calculates whether each pipeline mutates data so the receiver can know whether it needs to clone the data.
func (g *Graph) buildComponents(ctx context.Context, set Settings) error {
nodes, err := topo.Sort(g.componentGraph)
if err != nil {
Expand All @@ -282,6 +297,7 @@ func (g *Graph) buildComponents(ctx context.Context, set Settings) error {
case *receiverNode:
err = n.buildComponent(ctx, telemetrySettings, set.BuildInfo, set.ReceiverBuilder, g.nextConsumers(n.ID()))
case *processorNode:
// nextConsumers is guaranteed to be length 1. Either it is the next processor or it is the fanout node for the exporters.
err = n.buildComponent(ctx, telemetrySettings, set.BuildInfo, set.ProcessorBuilder, g.nextConsumers(n.ID())[0])
case *exporterNode:
err = n.buildComponent(ctx, telemetrySettings, set.BuildInfo, set.ExporterBuilder)
Expand Down
6 changes: 5 additions & 1 deletion service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
"go.opentelemetry.io/collector/service/telemetry"
)

// Settings holds configuration for building a new service.
// Settings holds configuration for building a new Service.
type Settings struct {
// BuildInfo provides collector start information.
BuildInfo component.BuildInfo
Expand Down Expand Up @@ -72,6 +72,7 @@ type Service struct {
collectorConf *confmap.Conf
}

// New creates a new Service, its telemetry, and Components.
func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
disableHighCard := obsreportconfig.DisableHighCardinalityMetricsfeatureGate.IsEnabled()
extendedConfig := obsreportconfig.UseOtelWithSDKConfigurationForInternalTelemetryFeatureGate.IsEnabled()
Expand All @@ -92,6 +93,8 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
if err != nil {
return nil, fmt.Errorf("failed to get logger: %w", err)
}

// Fetch data for internal telemetry like instance id and sdk version to provide for internal telemetry.
res := resource.New(set.BuildInfo, cfg.Telemetry.Resource)
pcommonRes := pdataFromSdk(res)

Expand Down Expand Up @@ -247,6 +250,7 @@ func (srv *Service) Shutdown(ctx context.Context) error {
return errs
}

// Creates extensions and then builds the pipeline graph.
func (srv *Service) initExtensionsAndPipeline(ctx context.Context, set Settings, cfg Config) error {
var err error
extensionsSettings := extensions.Settings{
Expand Down

0 comments on commit 81aac9f

Please sign in to comment.