Expose the Metadata Refresh strategy for the Kafka output. (#10682)
* Expose the Metadata Refresh strategy for the Kafka output.

By default, the Kafka output periodically refreshes the metadata
for all available topics. This is the default strategy
that Beats use.

But if you have strict permissions in place for your
Kafka cluster, this can lead to errors in your logs when the client
requests metadata for topics that you don't have permission to access.

This commit keeps the default behavior but allows the strategy to be
changed from the config file.
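
As a minimal sketch, a cluster with restricted topic permissions could opt out of the full refresh like this (the broker address and topic name are placeholders):

    output.kafka:
      hosts: ["kafka1:9092"]
      topic: "beats"
      metadata:
        # Only refresh metadata for the configured topics.
        full: false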
ph committed Feb 27, 2019
1 parent be91dbf commit 6829f19
Showing 15 changed files with 80 additions and 36 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -129,6 +129,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix an issue where remote and local configuration didn't match when fetching configuration from Central Management. {issue}10587[10587]
- Fix unauthorized error when loading dashboards by adding username and password into kibana config. {issue}10513[10513] {pull}10675[10675]
- Ensure all beat commands respect configured settings. {pull}10721[10721]
- Allow configuring the Kafka fetching strategy for topic metadata. {pull}10682[10682]
- Using an environment variable for the password when enrolling a beat will now raise an error if the variable doesn't exist. {pull}10936[10936]

*Auditbeat*
7 changes: 5 additions & 2 deletions auditbeat/auditbeat.reference.yml
@@ -228,7 +228,7 @@ auditbeat.modules:
#
# event -> filter1 -> event1 -> filter2 ->event2 ...
#
# The supported processors are drop_fields, drop_event, include_fields,
# The supported processors are drop_fields, drop_event, include_fields,
# decode_json_fields, and add_cloud_metadata.
#
# For example, you can use the following processors to keep the fields that
@@ -319,7 +319,7 @@ auditbeat.modules:
# match_pids: ["system.process.ppid"]
# target: system.process.parent
#
# The following example decodes fields containing JSON strings
# The following example decodes fields containing JSON strings
# and replaces the strings with valid JSON objects.
#
#processors:
@@ -621,6 +621,9 @@ output.elasticsearch:
# Refresh metadata interval. Defaults to every 10 minutes.
#refresh_frequency: 10m

# Strategy for fetching the topic metadata from the broker. Default is true.
#full: true

# The number of concurrent load-balanced Kafka output workers.
#worker: 1

7 changes: 5 additions & 2 deletions filebeat/filebeat.reference.yml
@@ -939,7 +939,7 @@ filebeat.inputs:
#
# event -> filter1 -> event1 -> filter2 ->event2 ...
#
# The supported processors are drop_fields, drop_event, include_fields,
# The supported processors are drop_fields, drop_event, include_fields,
# decode_json_fields, and add_cloud_metadata.
#
# For example, you can use the following processors to keep the fields that
@@ -1030,7 +1030,7 @@ filebeat.inputs:
# match_pids: ["system.process.ppid"]
# target: system.process.parent
#
# The following example decodes fields containing JSON strings
# The following example decodes fields containing JSON strings
# and replaces the strings with valid JSON objects.
#
#processors:
@@ -1332,6 +1332,9 @@ output.elasticsearch:
# Refresh metadata interval. Defaults to every 10 minutes.
#refresh_frequency: 10m

# Strategy for fetching the topic metadata from the broker. Default is true.
#full: true

# The number of concurrent load-balanced Kafka output workers.
#worker: 1

7 changes: 5 additions & 2 deletions heartbeat/heartbeat.reference.yml
@@ -372,7 +372,7 @@ heartbeat.scheduler:
#
# event -> filter1 -> event1 -> filter2 ->event2 ...
#
# The supported processors are drop_fields, drop_event, include_fields,
# The supported processors are drop_fields, drop_event, include_fields,
# decode_json_fields, and add_cloud_metadata.
#
# For example, you can use the following processors to keep the fields that
@@ -463,7 +463,7 @@ heartbeat.scheduler:
# match_pids: ["system.process.ppid"]
# target: system.process.parent
#
# The following example decodes fields containing JSON strings
# The following example decodes fields containing JSON strings
# and replaces the strings with valid JSON objects.
#
#processors:
@@ -765,6 +765,9 @@ output.elasticsearch:
# Refresh metadata interval. Defaults to every 10 minutes.
#refresh_frequency: 10m

# Strategy for fetching the topic metadata from the broker. Default is true.
#full: true

# The number of concurrent load-balanced Kafka output workers.
#worker: 1

7 changes: 5 additions & 2 deletions journalbeat/journalbeat.reference.yml
@@ -168,7 +168,7 @@ setup.template.settings:
#
# event -> filter1 -> event1 -> filter2 ->event2 ...
#
# The supported processors are drop_fields, drop_event, include_fields,
# The supported processors are drop_fields, drop_event, include_fields,
# decode_json_fields, and add_cloud_metadata.
#
# For example, you can use the following processors to keep the fields that
@@ -259,7 +259,7 @@ setup.template.settings:
# match_pids: ["system.process.ppid"]
# target: system.process.parent
#
# The following example decodes fields containing JSON strings
# The following example decodes fields containing JSON strings
# and replaces the strings with valid JSON objects.
#
#processors:
@@ -561,6 +561,9 @@ output.elasticsearch:
# Refresh metadata interval. Defaults to every 10 minutes.
#refresh_frequency: 10m

# Strategy for fetching the topic metadata from the broker. Default is true.
#full: true

# The number of concurrent load-balanced Kafka output workers.
#worker: 1

7 changes: 5 additions & 2 deletions libbeat/_meta/config.reference.yml
@@ -116,7 +116,7 @@
#
# event -> filter1 -> event1 -> filter2 ->event2 ...
#
# The supported processors are drop_fields, drop_event, include_fields,
# The supported processors are drop_fields, drop_event, include_fields,
# decode_json_fields, and add_cloud_metadata.
#
# For example, you can use the following processors to keep the fields that
@@ -207,7 +207,7 @@
# match_pids: ["system.process.ppid"]
# target: system.process.parent
#
# The following example decodes fields containing JSON strings
# The following example decodes fields containing JSON strings
# and replaces the strings with valid JSON objects.
#
#processors:
@@ -509,6 +509,9 @@ output.elasticsearch:
# Refresh metadata interval. Defaults to every 10 minutes.
#refresh_frequency: 10m

# Strategy for fetching the topic metadata from the broker. Default is true.
#full: true

# The number of concurrent load-balanced Kafka output workers.
#worker: 1

28 changes: 16 additions & 12 deletions libbeat/docs/outputconfig.asciidoc
@@ -249,7 +249,7 @@ endif::no_dashboards[]

You can set the index dynamically by using a format string to access any event
field. For example, this configuration uses a custom field, `fields.log_type`,
to set the index:
to set the index:

["source","yaml",subs="attributes"]
------------------------------------------------------------------------------
@@ -261,13 +261,13 @@ output.elasticsearch:
<1> We recommend including +{beat_version_key}+ in the name to avoid mapping issues
when you upgrade.

With this configuration, all events with `log_type: normal` are sent to an
With this configuration, all events with `log_type: normal` are sent to an
index named +normal-{version}-{localdate}+, and all events with
`log_type: critical` are sent to an index named
+critical-{version}-{localdate}+.

TIP: To learn how to add custom fields to events, see the
<<libbeat-configuration-fields,`fields`>> option.
<<libbeat-configuration-fields,`fields`>> option.

See the <<indices-option-es,`indices`>> setting for other ways to set the index
dynamically.
@@ -285,7 +285,7 @@ matches, the <<index-option-es,`index`>> setting is used.
Rule settings:

*`index`*:: The index format string to use. If this string contains field
references, such as `%{[fields.name]}`, the fields must exist, or the rule fails.
references, such as `%{[fields.name]}`, the fields must exist, or the rule fails.

*`mappings`*:: A dictionary that takes the value returned by `index` and maps it
to a new name.
@@ -347,7 +347,7 @@ ifndef::no_ilm[]
[[ilm-es]]
===== `ilm`

Configuration options for index lifecycle management.
Configuration options for index lifecycle management.

See <<ilm>> for more information.
endif::no_ilm[]
@@ -369,7 +369,7 @@ For more information, see <<configuring-ingest-node>>.

You can set the ingest node pipeline dynamically by using a format string to
access any event field. For example, this configuration uses a custom field,
`fields.log_type`, to set the pipeline for each event:
`fields.log_type`, to set the pipeline for each event:

["source","yaml",subs="attributes"]
------------------------------------------------------------------------------
@@ -384,7 +384,7 @@ named `normal_pipeline`, and all events with `log_type: critical` are sent to a
pipeline named `critical_pipeline`.

TIP: To learn how to add custom fields to events, see the
<<libbeat-configuration-fields,`fields`>> option.
<<libbeat-configuration-fields,`fields`>> option.

See the <<pipelines-option-es,`pipelines`>> setting for other ways to set the
ingest node pipeline dynamically.
@@ -403,7 +403,7 @@ Rule settings:

*`pipeline`*:: The pipeline format string to use. If this string contains field
references, such as `%{[fields.name]}`, the fields must exist, or the rule
fails.
fails.

*`mappings`*:: A dictionary that takes the value returned by `pipeline` and maps
it to a new name.
@@ -870,7 +870,7 @@ topic: '%{[fields.log_topic]}'
-----

TIP: To learn how to add custom fields to events, see the
<<libbeat-configuration-fields,`fields`>> option.
<<libbeat-configuration-fields,`fields`>> option.

See the <<topics-option-kafka,`topics`>> setting for other ways to set the
topic dynamically.
@@ -889,7 +889,7 @@ Rule settings:

*`topic`*:: The topic format string to use. If this string contains field
references, such as `%{[fields.name]}`, the fields must exist, or the rule
fails.
fails.

*`mappings`*:: A dictionary that takes the value returned by `topic` and maps it
to a new name.
@@ -901,7 +901,7 @@ match.
ifndef::no-processors[]
All the <<conditions,conditions>> supported by processors are also supported
here.
endif::no-processors[]
endif::no-processors[]


===== `key`
@@ -955,6 +955,10 @@ brokers, topics, partition, and active leaders to use for publishing.

*`refresh_frequency`*:: Metadata refresh interval. Defaults to 10 minutes.

*`full`*:: Strategy to use when fetching metadata. When this option is `true`, the client maintains
a full set of metadata for all available topics; when it is set to `false`, the client only refreshes the
metadata for the configured topics. The default is `true` (see the sketch below).

*`retry.max`*:: Total number of metadata update retries when cluster is in middle of leader election. The default is 3.

*`retry.backoff`*:: Waiting time between retries during leader elections. Default is 250ms.
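
For illustration, here is a sketch that combines the metadata settings above in a Kafka output; the broker address and topic name are placeholders, and the values are examples rather than recommendations:

["source","yaml"]
------------------------------------------------------------------------------
output.kafka:
  hosts: ["kafka1:9092"]      # placeholder broker address
  topic: "beats"              # placeholder topic
  metadata:
    refresh_frequency: 10m    # metadata refresh interval (the default)
    full: false               # refresh only the configured topics
    retry.max: 3              # metadata update retries during leader election
    retry.backoff: 250ms      # wait time between those retries
------------------------------------------------------------------------------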
@@ -1099,7 +1103,7 @@ output.redis:


TIP: To learn how to add custom fields to events, see the
<<libbeat-configuration-fields,`fields`>> option.
<<libbeat-configuration-fields,`fields`>> option.

See the <<keys-option-redis,`keys`>> setting for other ways to set the key
dynamically.
3 changes: 3 additions & 0 deletions libbeat/outputs/kafka/config.go
@@ -62,6 +62,7 @@ type kafkaConfig struct {
type metaConfig struct {
Retry metaRetryConfig `config:"retry"`
RefreshFreq time.Duration `config:"refresh_frequency" validate:"min=0"`
Full bool `config:"full"`
}

type metaRetryConfig struct {
@@ -90,6 +91,7 @@ func defaultConfig() kafkaConfig {
Backoff: 250 * time.Millisecond,
},
RefreshFreq: 10 * time.Minute,
Full: true,
},
KeepAlive: 0,
MaxMessageBytes: nil, // use library default
@@ -177,6 +179,7 @@ func newSaramaConfig(config *kafkaConfig) (*sarama.Config, error) {
k.Metadata.Retry.Max = config.Metadata.Retry.Max
k.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff
k.Metadata.RefreshFrequency = config.Metadata.RefreshFreq
k.Metadata.Full = config.Metadata.Full

// configure producer API properties
if config.MaxMessageBytes != nil {
7 changes: 5 additions & 2 deletions metricbeat/metricbeat.reference.yml
@@ -835,7 +835,7 @@ metricbeat.modules:
#
# event -> filter1 -> event1 -> filter2 ->event2 ...
#
# The supported processors are drop_fields, drop_event, include_fields,
# The supported processors are drop_fields, drop_event, include_fields,
# decode_json_fields, and add_cloud_metadata.
#
# For example, you can use the following processors to keep the fields that
@@ -926,7 +926,7 @@ metricbeat.modules:
# match_pids: ["system.process.ppid"]
# target: system.process.parent
#
# The following example decodes fields containing JSON strings
# The following example decodes fields containing JSON strings
# and replaces the strings with valid JSON objects.
#
#processors:
@@ -1228,6 +1228,9 @@ output.elasticsearch:
# Refresh metadata interval. Defaults to every 10 minutes.
#refresh_frequency: 10m

# Strategy for fetching the topic metadata from the broker. Default is true.
#full: true

# The number of concurrent load-balanced Kafka output workers.
#worker: 1

7 changes: 5 additions & 2 deletions packetbeat/packetbeat.reference.yml
@@ -596,7 +596,7 @@ packetbeat.ignore_outgoing: false
#
# event -> filter1 -> event1 -> filter2 ->event2 ...
#
# The supported processors are drop_fields, drop_event, include_fields,
# The supported processors are drop_fields, drop_event, include_fields,
# decode_json_fields, and add_cloud_metadata.
#
# For example, you can use the following processors to keep the fields that
@@ -687,7 +687,7 @@ packetbeat.ignore_outgoing: false
# match_pids: ["system.process.ppid"]
# target: system.process.parent
#
# The following example decodes fields containing JSON strings
# The following example decodes fields containing JSON strings
# and replaces the strings with valid JSON objects.
#
#processors:
@@ -989,6 +989,9 @@ output.elasticsearch:
# Refresh metadata interval. Defaults to every 10 minutes.
#refresh_frequency: 10m

# Strategy for fetching the topic metadata from the broker. Default is true.
#full: true

# The number of concurrent load-balanced Kafka output workers.
#worker: 1

7 changes: 5 additions & 2 deletions winlogbeat/winlogbeat.reference.yml
@@ -145,7 +145,7 @@ winlogbeat.event_logs:
#
# event -> filter1 -> event1 -> filter2 ->event2 ...
#
# The supported processors are drop_fields, drop_event, include_fields,
# The supported processors are drop_fields, drop_event, include_fields,
# decode_json_fields, and add_cloud_metadata.
#
# For example, you can use the following processors to keep the fields that
@@ -236,7 +236,7 @@ winlogbeat.event_logs:
# match_pids: ["system.process.ppid"]
# target: system.process.parent
#
# The following example decodes fields containing JSON strings
# The following example decodes fields containing JSON strings
# and replaces the strings with valid JSON objects.
#
#processors:
@@ -538,6 +538,9 @@ output.elasticsearch:
# Refresh metadata interval. Defaults to every 10 minutes.
#refresh_frequency: 10m

# Strategy for fetching the topic metadata from the broker. Default is true.
#full: true

# The number of concurrent load-balanced Kafka output workers.
#worker: 1

Expand Down