diff --git a/x-pack/elastic-agent/CHANGELOG.next.asciidoc b/x-pack/elastic-agent/CHANGELOG.next.asciidoc index d54022c9034..8340532fcaf 100644 --- a/x-pack/elastic-agent/CHANGELOG.next.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.next.asciidoc @@ -48,3 +48,4 @@ - Update `install` command to perform enroll before starting Elastic Agent {pull}21772[21772] - Update `fleet.kibana.path` from a POLICY_CHANGE {pull}21804[21804] - Removed `install-service.ps1` and `uninstall-service.ps1` from Windows .zip packaging {pull}21694[21694] +- Add `priority` to `AddOrUpdate` on dynamic composable input providers communication channel {pull}22352[22352] diff --git a/x-pack/elastic-agent/pkg/composable/controller.go b/x-pack/elastic-agent/pkg/composable/controller.go index 50cc9b69ead..cb629f4c7e9 100644 --- a/x-pack/elastic-agent/pkg/composable/controller.go +++ b/x-pack/elastic-agent/pkg/composable/controller.go @@ -214,6 +214,7 @@ func (c *contextProviderState) Current() map[string]interface{} { } type dynamicProviderMapping struct { + priority int mapping map[string]interface{} processors transpiler.Processors } @@ -228,7 +229,11 @@ type dynamicProviderState struct { } // AddOrUpdate adds or updates the current mapping for the dynamic provider. -func (c *dynamicProviderState) AddOrUpdate(id string, mapping map[string]interface{}, processors []map[string]interface{}) error { +// +// `priority` ensures that order is maintained when adding the mapping to the current state +// for the processor. Lower priority mappings will always be sorted before higher priority mappings +// to ensure that matching of variables occurs on the lower priority mappings first. +func (c *dynamicProviderState) AddOrUpdate(id string, priority int, mapping map[string]interface{}, processors []map[string]interface{}) error { var err error mapping, err = cloneMap(mapping) if err != nil { @@ -252,6 +257,7 @@ func (c *dynamicProviderState) AddOrUpdate(id string, mapping map[string]interfa return nil } c.mappings[id] = dynamicProviderMapping{ + priority: priority, mapping: mapping, processors: processors, } @@ -276,14 +282,24 @@ func (c *dynamicProviderState) Mappings() []dynamicProviderMapping { c.lock.RLock() defer c.lock.RUnlock() + // add the mappings sorted by (priority,id) mappings := make([]dynamicProviderMapping, 0) - ids := make([]string, 0) - for name := range c.mappings { - ids = append(ids, name) + priorities := make([]int, 0) + for _, mapping := range c.mappings { + priorities = addToSet(priorities, mapping.priority) } - sort.Strings(ids) - for _, name := range ids { - mappings = append(mappings, c.mappings[name]) + sort.Ints(priorities) + for _, priority := range priorities { + ids := make([]string, 0) + for name, mapping := range c.mappings { + if mapping.priority == priority { + ids = append(ids, name) + } + } + sort.Strings(ids) + for _, name := range ids { + mappings = append(mappings, c.mappings[name]) + } } return mappings } @@ -319,3 +335,12 @@ func cloneMapArray(source []map[string]interface{}) ([]map[string]interface{}, e } return dest, nil } + +func addToSet(set []int, i int) []int { + for _, j := range set { + if j == i { + return set + } + } + return append(set, i) +} diff --git a/x-pack/elastic-agent/pkg/composable/dynamic.go b/x-pack/elastic-agent/pkg/composable/dynamic.go index 33fbaca5a52..24c272a322f 100644 --- a/x-pack/elastic-agent/pkg/composable/dynamic.go +++ b/x-pack/elastic-agent/pkg/composable/dynamic.go @@ -18,7 +18,11 @@ type DynamicProviderComm interface { context.Context // AddOrUpdate updates a mapping with given ID with latest mapping and processors. - AddOrUpdate(id string, mapping map[string]interface{}, processors []map[string]interface{}) error + // + // `priority` ensures that order is maintained when adding the mapping to the current state + // for the processor. Lower priority mappings will always be sorted before higher priority mappings + // to ensure that matching of variables occurs on the lower priority mappings first. + AddOrUpdate(id string, priority int, mapping map[string]interface{}, processors []map[string]interface{}) error // Remove removes a mapping by given ID. Remove(id string) } diff --git a/x-pack/elastic-agent/pkg/composable/providers/docker/docker.go b/x-pack/elastic-agent/pkg/composable/providers/docker/docker.go index dee8b96bb88..902ceb7c832 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/docker/docker.go +++ b/x-pack/elastic-agent/pkg/composable/providers/docker/docker.go @@ -18,6 +18,9 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" ) +// ContainerPriority is the priority that container mappings are added to the provider. +const ContainerPriority = 0 + func init() { composable.Providers.AddDynamicProvider("docker", DynamicProviderBuilder) } @@ -76,7 +79,7 @@ func (c *dynamicProvider) Run(comm composable.DynamicProviderComm) error { delete(stoppers, data.container.ID) return } - comm.AddOrUpdate(data.container.ID, data.mapping, data.processors) + comm.AddOrUpdate(data.container.ID, ContainerPriority, data.mapping, data.processors) case event := <-stopListener.Events(): data, err := generateData(event) if err != nil { diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go index 200bedbe878..1a85005df19 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go @@ -28,4 +28,5 @@ type Config struct { func (c *Config) InitDefaults() { c.SyncPeriod = 10 * time.Minute c.CleanupTimeout = 60 * time.Second + c.Scope = "node" } diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go index c777c8a05ce..9cea442dc6b 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go @@ -15,6 +15,13 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" ) +const ( + // PodPriority is the priority that pod mappings are added to the provider. + PodPriority = 0 + // ContainerPriority is the priority that container mappings are added to the provider. + ContainerPriority = 1 +) + func init() { composable.Providers.AddDynamicProvider("kubernetes", DynamicProviderBuilder) } @@ -54,13 +61,13 @@ func (p *dynamicProvider) Run(comm composable.DynamicProviderComm) error { // Ensure that node is set correctly whenever the scope is set to "node". Make sure that node is empty // when cluster scope is enforced. + p.logger.Infof("Kubernetes provider started with %s scope", p.config.Scope) if p.config.Scope == "node" { + p.logger.Debugf("Initializing Kubernetes watcher using node: %v", p.config.Node) p.config.Node = kubernetes.DiscoverKubernetesNode(p.logger, p.config.Node, kubernetes.IsInCluster(p.config.KubeConfig), client) } else { p.config.Node = "" } - p.logger.Infof("Kubernetes provider started with %s scope", p.config.Scope) - p.logger.Debugf("Initializing Kubernetes watcher using node: %v", p.config.Node) watcher, err := kubernetes.NewWatcher(client, &kubernetes.Pod{}, kubernetes.WatchOptions{ SyncTimeout: p.config.SyncPeriod, @@ -70,7 +77,6 @@ func (p *dynamicProvider) Run(comm composable.DynamicProviderComm) error { if err != nil { return errors.New(err, "couldn't create kubernetes watcher") } - watcher.AddEventHandler(&eventWatcher{p.logger, p.config.CleanupTimeout, comm}) err = watcher.Start() @@ -104,7 +110,7 @@ func (p *eventWatcher) emitRunning(pod *kubernetes.Pod) { // Emit the pod // We emit Pod + containers to ensure that configs matching Pod only // get Pod metadata (not specific to any container) - p.comm.AddOrUpdate(string(pod.GetUID()), mapping, processors) + p.comm.AddOrUpdate(string(pod.GetUID()), PodPriority, mapping, processors) // Emit all containers in the pod p.emitContainers(pod, pod.Spec.Containers, pod.Status.ContainerStatuses) @@ -161,7 +167,7 @@ func (p *eventWatcher) emitContainers(pod *kubernetes.Pod, containers []kubernet } // Emit the container - p.comm.AddOrUpdate(eventID, mapping, processors) + p.comm.AddOrUpdate(eventID, ContainerPriority, mapping, processors) } } diff --git a/x-pack/elastic-agent/pkg/composable/providers/localdynamic/localdynamic.go b/x-pack/elastic-agent/pkg/composable/providers/localdynamic/localdynamic.go index 240b7251aa0..b8466055e9e 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/localdynamic/localdynamic.go +++ b/x-pack/elastic-agent/pkg/composable/providers/localdynamic/localdynamic.go @@ -14,6 +14,9 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" ) +// ItemPriority is the priority that item mappings are added to the provider. +const ItemPriority = 0 + func init() { composable.Providers.AddDynamicProvider("local_dynamic", DynamicProviderBuilder) } @@ -30,7 +33,7 @@ type dynamicProvider struct { // Run runs the environment context provider. func (c *dynamicProvider) Run(comm composable.DynamicProviderComm) error { for i, item := range c.Items { - if err := comm.AddOrUpdate(strconv.Itoa(i), item.Mapping, item.Processors); err != nil { + if err := comm.AddOrUpdate(strconv.Itoa(i), ItemPriority, item.Mapping, item.Processors); err != nil { return errors.New(err, fmt.Sprintf("failed to add mapping for index %d", i), errors.TypeUnexpected) } } diff --git a/x-pack/elastic-agent/pkg/composable/providers/localdynamic/localdynamic_test.go b/x-pack/elastic-agent/pkg/composable/providers/localdynamic/localdynamic_test.go index 9e8bc7a394c..1576d51194a 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/localdynamic/localdynamic_test.go +++ b/x-pack/elastic-agent/pkg/composable/providers/localdynamic/localdynamic_test.go @@ -69,11 +69,13 @@ func TestContextProvider(t *testing.T) { curr1, ok1 := comm.Current("0") assert.True(t, ok1) + assert.Equal(t, ItemPriority, curr1.Priority) assert.Equal(t, mapping1, curr1.Mapping) assert.Equal(t, processors1, curr1.Processors) curr2, ok2 := comm.Current("1") assert.True(t, ok2) + assert.Equal(t, ItemPriority, curr2.Priority) assert.Equal(t, mapping2, curr2.Mapping) assert.Equal(t, processors2, curr2.Processors) } diff --git a/x-pack/elastic-agent/pkg/composable/testing/dynamic.go b/x-pack/elastic-agent/pkg/composable/testing/dynamic.go index 3943ac4b3bb..bfa48dff57d 100644 --- a/x-pack/elastic-agent/pkg/composable/testing/dynamic.go +++ b/x-pack/elastic-agent/pkg/composable/testing/dynamic.go @@ -11,6 +11,7 @@ import ( // DynamicState is the state of the dynamic mapping. type DynamicState struct { + Priority int Mapping map[string]interface{} Processors []map[string]interface{} } @@ -34,7 +35,7 @@ func NewDynamicComm(ctx context.Context) *DynamicComm { } // AddOrUpdate adds or updates a current mapping. -func (t *DynamicComm) AddOrUpdate(id string, mapping map[string]interface{}, processors []map[string]interface{}) error { +func (t *DynamicComm) AddOrUpdate(id string, priority int, mapping map[string]interface{}, processors []map[string]interface{}) error { var err error mapping, err = CloneMap(mapping) if err != nil { @@ -53,6 +54,7 @@ func (t *DynamicComm) AddOrUpdate(id string, mapping map[string]interface{}, pro t.previous[id] = prev } t.current[id] = DynamicState{ + Priority: priority, Mapping: mapping, Processors: processors, }