Skip to content

Commit

Permalink
Inject inferred queue settings into agent config generation (#27429)
Browse files Browse the repository at this point in the history
(cherry picked from commit 9e6c6e1)
  • Loading branch information
faec authored and mergify-bot committed Aug 17, 2021
1 parent 7cc33f5 commit 1cc53ba
Show file tree
Hide file tree
Showing 23 changed files with 259 additions and 1 deletion.
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,4 @@
- Increase Agent's mem limits in k8s. {pull}27153[27153]
- Add new --enroll-delay option for install and enroll commands. {pull}27118[27118]
- Add link to troubleshooting guide on fatal exits. {issue}26367[26367] {pull}27236[27236]
- Agent now adapts the beats queue size based on output settings. {issue}26638[26638] {pull}27429[27429]
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/pkg/agent/program/supported.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
queue:
mem:
events: 350
flush:
min_events: 50
timeout: 1s

filebeat:
inputs:
- type: log
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
queue:
mem:
events: 350
flush:
min_events: 50
timeout: 1s

filebeat:
inputs:
- type: log
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
queue:
mem:
events: 350
flush:
min_events: 50
timeout: 1s

filebeat:
inputs:
- type: log
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
queue:
mem:
events: 350
flush:
min_events: 50
timeout: 1s

inputs:
- type: synthetics/http
id: unique-http-id
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
queue:
mem:
events: 350
flush:
min_events: 50
timeout: 1s

metricbeat:
modules:
- module: docker
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
queue:
mem:
events: 350
flush:
min_events: 50
timeout: 1s

inputs:
- type: packet
processors:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ output:
hosts:
- "127.0.0.1:9200"
- "127.0.0.1:9300"
bulk_max_size: 23
worker: 10
username: elastic
password: changeme
api_key: TiNAGG4BaaMdaH1tRfuU:KnR6yE41RrSowb0kQ0HWoA
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
queue:
mem:
events: 575
flush:
min_events: 23
timeout: 1s

filebeat:
inputs:
- type: log
Expand Down Expand Up @@ -63,5 +70,7 @@ output:
- 127.0.0.1:9300
username: elastic
password: changeme
bulk_max_size: 23
worker: 10
api_key: TiNAGG4BaaMdaH1tRfuU:KnR6yE41RrSowb0kQ0HWoA
ca_sha256: 7HIpactkIAq2Y49orFOOQKurWxmmSFZhBCoQYcRhJ3Y=
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
queue:
mem:
events: 575
flush:
min_events: 23
timeout: 1s

inputs:
- type: synthetics/http
id: unique-http-id
Expand All @@ -23,6 +30,8 @@ output:
hosts:
- 127.0.0.1:9200
- 127.0.0.1:9300
bulk_max_size: 23
worker: 10
username: elastic
password: changeme
api_key: TiNAGG4BaaMdaH1tRfuU:KnR6yE41RrSowb0kQ0HWoA
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
queue:
mem:
events: 575
flush:
min_events: 23
timeout: 1s

metricbeat:
modules:
- module: docker
Expand Down Expand Up @@ -83,5 +90,7 @@ output:
hosts: [127.0.0.1:9200, 127.0.0.1:9300]
username: elastic
password: changeme
bulk_max_size: 23
worker: 10
api_key: TiNAGG4BaaMdaH1tRfuU:KnR6yE41RrSowb0kQ0HWoA
ca_sha256: 7HIpactkIAq2Y49orFOOQKurWxmmSFZhBCoQYcRhJ3Y=
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
queue:
mem:
events: 575
flush:
min_events: 23
timeout: 1s

inputs:
- type: packet
processors:
Expand Down Expand Up @@ -30,5 +37,7 @@ output:
- 127.0.0.1:9300
username: elastic
password: changeme
bulk_max_size: 23
worker: 10
api_key: TiNAGG4BaaMdaH1tRfuU:KnR6yE41RrSowb0kQ0HWoA
ca_sha256: 7HIpactkIAq2Y49orFOOQKurWxmmSFZhBCoQYcRhJ3Y=
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ outputs:
default:
type: elasticsearch
hosts: [127.0.0.1:9200, 127.0.0.1:9300]
bulk_max_size: 23
worker: 10
username: elastic
password: changeme
api_key: TiNAGG4BaaMdaH1tRfuU:KnR6yE41RrSowb0kQ0HWoA
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
queue:
mem:
events: 350
flush:
min_events: 50
timeout: 1s

inputs:
- type: synthetics/http
id: unique-http-id
Expand Down
103 changes: 103 additions & 0 deletions x-pack/elastic-agent/pkg/agent/transpiler/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ func (r *RuleList) MarshalYAML() (interface{}, error) {
name = "insert_defaults"
case *InjectHeadersRule:
name = "inject_headers"
case *InjectQueueRule:
name = "inject_queue"
default:
return nil, fmt.Errorf("unknown rule of type %T", rule)
}
Expand Down Expand Up @@ -180,6 +182,8 @@ func (r *RuleList) UnmarshalYAML(unmarshal func(interface{}) error) error {
r = &InsertDefaultsRule{}
case "inject_headers":
r = &InjectHeadersRule{}
case "inject_queue":
r = &InjectQueueRule{}
default:
return fmt.Errorf("unknown rule of type %s", name)
}
Expand Down Expand Up @@ -1519,6 +1523,105 @@ func InsertDefaults(path string, selectors ...Selector) *InsertDefaultsRule {
}
}

// InjectQueueRule injects inferred queue parameters into program
// configurations.
type InjectQueueRule struct{}

// InjectQueue creates a InjectQueueRule
func InjectQueue() *InjectQueueRule {
return &InjectQueueRule{}
}

// Apply adds queue parameters to a program configuration based on the
// output settings "worker" and "bulk_max_size".
func (r *InjectQueueRule) Apply(agentInfo AgentInfo, ast *AST) (err error) {
defer func() {
if err != nil {
err = errors.New(err, "failed to inject queue parameters into configuration")
}
}()

outputNode, found := Lookup(ast, "output")
if !found {
return nil
}

outputDict, ok := outputNode.Value().(*Dict)
if !ok || len(outputDict.value) == 0 {
return nil
}
outputChild := outputDict.value[0]

// Initialize the bulk_max_size and worker parameters to the global defaults,
// then override them if there's an explicit setting.
bulkMaxSize := 50
worker := 1

if bulkMaxSizeNode, ok := outputChild.Find("bulk_max_size"); ok {
if bulkMaxSizeInt, ok := bulkMaxSizeNode.Value().(*IntVal); ok {
bulkMaxSize = bulkMaxSizeInt.value
}
}

if workerNode, ok := outputChild.Find("worker"); ok {
if workerInt, ok := workerNode.Value().(*IntVal); ok {
worker = workerInt.value
}
}

// Insert memory queue settings based on the output params.
queueNode := queueDictFromOutputSettings(bulkMaxSize, worker)
if err := Insert(ast, queueNode, "queue.mem"); err != nil {
return err
}

return nil
}

func queueDictFromOutputSettings(bulkMaxSize, worker int) Node {
events, minEvents := queueParamsFromOutputSettings(bulkMaxSize, worker)
dict := &Dict{
value: []Node{
&Key{
name: "events",
value: &IntVal{value: events},
},
&Key{
name: "flush",
value: &Dict{
value: []Node{
&Key{
name: "min_events",
value: &IntVal{value: minEvents},
},
&Key{
name: "timeout",
value: &StrVal{value: "1s"},
},
},
},
},
},
}
return dict
}

// Given output settings, returns inferred values for queue.mem.events
// and queue.mem.flush.min_events.
// See https://github.com/elastic/beats/issues/26638.
func queueParamsFromOutputSettings(bulkMaxSize, worker int) (int, int) {
// Create space in the queue for each worker to have a full batch in flight
// and another one pending, plus a correction factor so users with the
// default worker count of 1 aren't surprised by an unreasonably small queue.
// These formulas could and perhaps should be customized further based on
// the specific beats being called, but their default behavior is already to
// significantly reduce the queue size, so let's get some experience using
// these baselines before optimizing further.
events := bulkMaxSize * (2*worker + 5)
minEvents := bulkMaxSize
return events, minEvents
}

// InjectHeadersRule injects headers into output.
type InjectHeadersRule struct{}

Expand Down
56 changes: 56 additions & 0 deletions x-pack/elastic-agent/pkg/agent/transpiler/rules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,62 @@ outputs:
},
},
},
"inject queue settings": {
givenYAML: `
output:
elasticsearch:
hosts:
- "127.0.0.1:9201"
bulk_max_size: 46
worker: 5
`,
expectedYAML: `
queue:
mem:
events: 690
flush:
min_events: 46
timeout: 1s
output:
elasticsearch:
hosts:
- "127.0.0.1:9201"
bulk_max_size: 46
worker: 5
`,
rule: &RuleList{
Rules: []Rule{
InjectQueue(),
},
},
},
"inject queue settings falls back on default values": {
givenYAML: `
output:
elasticsearch:
hosts:
- "127.0.0.1:9201"
`,
expectedYAML: `
queue:
mem:
events: 350
flush:
min_events: 50
timeout: 1s
output:
elasticsearch:
hosts:
- "127.0.0.1:9201"
`,
rule: &RuleList{
Rules: []Rule{
InjectQueue(),
},
},
},
}

for name, test := range testcases {
Expand Down
1 change: 1 addition & 0 deletions x-pack/elastic-agent/spec/apm-server.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,6 @@ rules:
- output
- fleet
- inject_headers: {}
- inject_queue: {}
when: length(${inputs}) > 0 and hasKey(${output}, 'elasticsearch', 'redis',
'kafka', 'logstash')
2 changes: 2 additions & 0 deletions x-pack/elastic-agent/spec/filebeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -109,5 +109,7 @@ rules:

- inject_headers: {}

- inject_queue: {}

when: length(${filebeat.inputs}) > 0 and hasKey(${output}, 'elasticsearch', 'redis',
'kafka', 'logstash')
1 change: 1 addition & 0 deletions x-pack/elastic-agent/spec/heartbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ rules:
- inputs
- output
- keystore
- inject_queue: {}
when: length(${inputs}) > 0 and hasKey(${output}, 'elasticsearch', 'redis',
'kafka', 'logstash')
Loading

0 comments on commit 1cc53ba

Please sign in to comment.