Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Inject inferred queue settings into agent config generation #27429

Merged
merged 7 commits into from
Aug 17, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,4 @@
- Increase Agent's mem limits in k8s. {pull}27153[27153]
- Add new --enroll-delay option for install and enroll commands. {pull}27118[27118]
- Add link to troubleshooting guide on fatal exits. {issue}26367[26367] {pull}27236[27236]
- Agent now adapts the beats queue size based on output settings. {issue}26638[26638] {pull}27429[27429]
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/pkg/agent/program/supported.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
queue:
mem:
events: 350
flush:
min_events: 50
timeout: 1s

filebeat:
inputs:
- type: log
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
queue:
mem:
events: 350
flush:
min_events: 50
timeout: 1s

filebeat:
inputs:
- type: log
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
queue:
mem:
events: 350
flush:
min_events: 50
timeout: 1s

filebeat:
inputs:
- type: log
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
queue:
mem:
events: 350
flush:
min_events: 50
timeout: 1s

inputs:
- type: synthetics/http
id: unique-http-id
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
queue:
mem:
events: 350
flush:
min_events: 50
timeout: 1s

metricbeat:
modules:
- module: docker
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
queue:
mem:
events: 350
flush:
min_events: 50
timeout: 1s

inputs:
- type: packet
processors:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ output:
hosts:
- "127.0.0.1:9200"
- "127.0.0.1:9300"
bulk_max_size: 23
worker: 10
username: elastic
password: changeme
api_key: TiNAGG4BaaMdaH1tRfuU:KnR6yE41RrSowb0kQ0HWoA
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
queue:
mem:
events: 575
flush:
min_events: 23
timeout: 1s

filebeat:
inputs:
- type: log
Expand Down Expand Up @@ -63,5 +70,7 @@ output:
- 127.0.0.1:9300
username: elastic
password: changeme
bulk_max_size: 23
worker: 10
api_key: TiNAGG4BaaMdaH1tRfuU:KnR6yE41RrSowb0kQ0HWoA
ca_sha256: 7HIpactkIAq2Y49orFOOQKurWxmmSFZhBCoQYcRhJ3Y=
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
queue:
mem:
events: 575
flush:
min_events: 23
timeout: 1s

inputs:
- type: synthetics/http
id: unique-http-id
Expand All @@ -23,6 +30,8 @@ output:
hosts:
- 127.0.0.1:9200
- 127.0.0.1:9300
bulk_max_size: 23
worker: 10
username: elastic
password: changeme
api_key: TiNAGG4BaaMdaH1tRfuU:KnR6yE41RrSowb0kQ0HWoA
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
queue:
mem:
events: 575
flush:
min_events: 23
timeout: 1s

metricbeat:
modules:
- module: docker
Expand Down Expand Up @@ -83,5 +90,7 @@ output:
hosts: [127.0.0.1:9200, 127.0.0.1:9300]
username: elastic
password: changeme
bulk_max_size: 23
worker: 10
api_key: TiNAGG4BaaMdaH1tRfuU:KnR6yE41RrSowb0kQ0HWoA
ca_sha256: 7HIpactkIAq2Y49orFOOQKurWxmmSFZhBCoQYcRhJ3Y=
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
queue:
mem:
events: 575
flush:
min_events: 23
timeout: 1s

inputs:
- type: packet
processors:
Expand Down Expand Up @@ -30,5 +37,7 @@ output:
- 127.0.0.1:9300
username: elastic
password: changeme
bulk_max_size: 23
worker: 10
api_key: TiNAGG4BaaMdaH1tRfuU:KnR6yE41RrSowb0kQ0HWoA
ca_sha256: 7HIpactkIAq2Y49orFOOQKurWxmmSFZhBCoQYcRhJ3Y=
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ outputs:
default:
type: elasticsearch
hosts: [127.0.0.1:9200, 127.0.0.1:9300]
bulk_max_size: 23
worker: 10
username: elastic
password: changeme
api_key: TiNAGG4BaaMdaH1tRfuU:KnR6yE41RrSowb0kQ0HWoA
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
queue:
mem:
events: 350
flush:
min_events: 50
timeout: 1s

inputs:
- type: synthetics/http
id: unique-http-id
Expand Down
99 changes: 99 additions & 0 deletions x-pack/elastic-agent/pkg/agent/transpiler/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ func (r *RuleList) MarshalYAML() (interface{}, error) {
name = "insert_defaults"
case *InjectHeadersRule:
name = "inject_headers"
case *InjectQueueRule:
name = "inject_queue"
default:
return nil, fmt.Errorf("unknown rule of type %T", rule)
}
Expand Down Expand Up @@ -180,6 +182,8 @@ func (r *RuleList) UnmarshalYAML(unmarshal func(interface{}) error) error {
r = &InsertDefaultsRule{}
case "inject_headers":
r = &InjectHeadersRule{}
case "inject_queue":
r = &InjectQueueRule{}
default:
return fmt.Errorf("unknown rule of type %s", name)
}
Expand Down Expand Up @@ -1519,6 +1523,101 @@ func InsertDefaults(path string, selectors ...Selector) *InsertDefaultsRule {
}
}

type InjectQueueRule struct{}

// InjectQueue creates a InjectQueueRule
func InjectQueue() *InjectQueueRule {
return &InjectQueueRule{}
}

func (r *InjectQueueRule) Apply(agentInfo AgentInfo, ast *AST) (err error) {
defer func() {
if err != nil {
err = errors.New(err, "failed to inject queue parameters into configuration")
}
}()

outputNode, found := Lookup(ast, "output")
if !found {
return nil
}

outputDict, ok := outputNode.Value().(*Dict)
if !ok || len(outputDict.value) == 0 {
return nil
}
outputChild := outputDict.value[0]

// Initialize the bulk_max_size and worker parameters to the global defaults,
// then override them if there's an explicit setting.
bulkMaxSize := 50
worker := 1

if bulkMaxSizeNode, ok := outputChild.Find("bulk_max_size"); ok {
if bulkMaxSizeInt, ok := bulkMaxSizeNode.Value().(*IntVal); ok {
bulkMaxSize = bulkMaxSizeInt.value
}
}

if workerNode, ok := outputChild.Find("worker"); ok {
if workerInt, ok := workerNode.Value().(*IntVal); ok {
worker = workerInt.value
}
}

// Insert memory queue settings based on the output params.
queueNode := queueDictFromOutputSettings(bulkMaxSize, worker)
if err := Insert(ast, queueNode, "queue.mem"); err != nil {
return err
}

return nil
}

func queueDictFromOutputSettings(bulkMaxSize, worker int) Node {
events, min_events := queueParamsFromOutputSettings(bulkMaxSize, worker)
dict := &Dict{
value: []Node{
&Key{
name: "events",
value: &IntVal{value: events},
},
&Key{
name: "flush",
value: &Dict{
value: []Node{
&Key{
name: "min_events",
value: &IntVal{value: min_events},
},
&Key{
name: "timeout",
value: &StrVal{value: "1s"},
},
},
},
},
},
}
return dict
}

// Given output settings, returns inferred values for queue.mem.events
// and queue.mem.flush.min_events.
// See https://github.com/elastic/beats/issues/26638.
func queueParamsFromOutputSettings(bulkMaxSize, worker int) (int, int) {
// Create space in the queue for each worker to have a full batch in flight
// and another one pending, plus a correction factor so users with the
// default worker count of 1 aren't surprised by an unreasonably small queue.
// These formulas could and perhaps should be customized further based on
// the specific beats being called, but their default behavior is already to
// significantly reduce the queue size, so let's get some experience using
// these baselines before optimizing further.
events := bulkMaxSize * (2*worker + 5)
minEvents := bulkMaxSize
return events, minEvents
}

// InjectHeadersRule injects headers into output.
type InjectHeadersRule struct{}

Expand Down
56 changes: 56 additions & 0 deletions x-pack/elastic-agent/pkg/agent/transpiler/rules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,62 @@ outputs:
},
},
},
"inject queue settings": {
givenYAML: `
output:
elasticsearch:
hosts:
- "127.0.0.1:9201"
bulk_max_size: 46
worker: 5
`,
expectedYAML: `
queue:
mem:
events: 690
flush:
min_events: 46
timeout: 1s

output:
elasticsearch:
hosts:
- "127.0.0.1:9201"
bulk_max_size: 46
worker: 5
`,
rule: &RuleList{
Rules: []Rule{
InjectQueue(),
},
},
},
"inject queue settings falls back on default values": {
givenYAML: `
output:
elasticsearch:
hosts:
- "127.0.0.1:9201"
`,
expectedYAML: `
queue:
mem:
events: 350
flush:
min_events: 50
timeout: 1s

output:
elasticsearch:
hosts:
- "127.0.0.1:9201"
`,
rule: &RuleList{
Rules: []Rule{
InjectQueue(),
},
},
},
}

for name, test := range testcases {
Expand Down
1 change: 1 addition & 0 deletions x-pack/elastic-agent/spec/apm-server.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,6 @@ rules:
- output
- fleet
- inject_headers: {}
- inject_queue: {}
when: length(${inputs}) > 0 and hasKey(${output}, 'elasticsearch', 'redis',
'kafka', 'logstash')
2 changes: 2 additions & 0 deletions x-pack/elastic-agent/spec/filebeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -109,5 +109,7 @@ rules:

- inject_headers: {}

- inject_queue: {}

when: length(${filebeat.inputs}) > 0 and hasKey(${output}, 'elasticsearch', 'redis',
'kafka', 'logstash')
1 change: 1 addition & 0 deletions x-pack/elastic-agent/spec/heartbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ rules:
- inputs
- output
- keystore
- inject_queue: {}
when: length(${inputs}) > 0 and hasKey(${output}, 'elasticsearch', 'redis',
'kafka', 'logstash')
1 change: 1 addition & 0 deletions x-pack/elastic-agent/spec/metricbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ rules:
- output
- keystore
- inject_headers: {}
- inject_queue: {}

when: length(${metricbeat.modules}) > 0 and hasKey(${output}, 'elasticsearch',
'redis', 'kafka', 'logstash')
Loading