Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[filebeat][s3] Add custom parsing script for S3 notifications #28946

Merged
merged 8 commits into from
Nov 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add support for '/var/log/pods/' path for add_kubernetes_metadata processor with `resource_type: pod`. {pull}28868[28868]
- Add documentation for add_kubernetes_metadata processors `log_path` matcher. {pull}28868[28868]
- Add support in aws-s3 input for s3 notification from SNS to SQS. {pull}28800[28800]
- Add support in aws-s3 input for custom script parsing of s3 notifications. {pull}28946[28946]
- Improve error handling in aws-s3 input for malformed s3 notifications. {issue}28828[28828] {pull}28946[28946]

*Heartbeat*

Expand Down
255 changes: 255 additions & 0 deletions x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,90 @@ attribute. The default value is 5.
If you have configured a dead letter queue then you can set this value to
`-1` to disable deletion on failure.

[float]
==== `sqs.notification_parsing_script.source`

Inline Javascript source code.

[source,yaml]
----
sqs.notification_parsing_script.source: >
function parse(notification) {
var evts = [];
var evt = new S3EventV2();
evt.SetS3BucketName(notification.bucket);
evt.SetS3ObjectKey(notification.path);
evts.push(evt);
return evts;
}
----

[float]
==== `sqs.notification_parsing_script.file`

Path to a script file to load. Relative paths are interpreted as
relative to the `path.config` directory. Globs are expanded.

This loads `filter.js` from disk.

[source,yaml]
----
sqs.notification_parsing_script.file: ${path.config}/filter.js
----

[float]
==== `sqs.notification_parsing_script.files`

List of script files to load. The scripts are concatenated together.
Relative paths are interpreted as relative to the `path.config` directory.
And globs are expanded.

[float]
==== `sqs.notification_parsing_script.params`

A dictionary of parameters that are passed to the `register` of the
script.

Parameters can be passed to the script by adding `params` to the config.
This allows for a script to be made reusable. When using `params` the
code must define a `register(params)` function to receive the parameters.

[source,yaml]
----
sqs.notification_parsing_script:
params:
provider: aws:s3
source: >
var params = {provider: ""};
function register(scriptParams) {
params = scriptParams;
}
function parse(notification) {
var evts = [];
var evt = new S3EventV2();
evt.SetS3BucketName(notification.bucket);
evt.SetS3ObjectKey(notification.path);
evt.SetProvider(params.provider);
evts.push(evt);
return evts;
}
----

[float]
==== `sqs.notification_parsing_script.timeout`

This sets an execution timeout for the `process` function. When
the `process` function takes longer than the `timeout` period the function
is interrupted. You can set this option to prevent a script from running for
too long (like preventing an infinite `while` loop). By default there is no
timeout.

[float]
==== `sqs.notification_parsing_script.max_cached_sessions`

This sets the maximum number of Javascript VM sessions
that will be cached to avoid reallocation.

[float]
==== `sqs.wait_time`

Expand Down Expand Up @@ -426,6 +510,177 @@ Therefore, when using the polling list of S3 bucket objects method, scaling shou
vertical, with a single bigger {beatname_uc} instance and higher `number_of_workers`
config value.

[float]
=== SQS Custom Notification Parsing Script

Under some circumstances you might want to listen to events that are not following
the standard SQS notifications format. To be able to parse them, it is possible to
define a custom script that will take care of processing them and generating the
required list of S3 Events used to download the files.

The `sqs.notification_parsing_script` executes Javascript code to process an event.
It uses a pure Go implementation of ECMAScript 5.1 and has no external dependencies.

It can be configured by embedding Javascript in your configuration file or by pointing
the processor at external file(s). Only one of the options `sqs.notification_parsing_script.source`, `sqs.notification_parsing_script.file`, and `sqs.notification_parsing_script.files`
can be set at the same time.

The script requires a `parse(notification)` function that receives the notification as
a raw string and returns a list of `S3EventV2` objects. This raw string can then be
processed as needed, e.g.: `JSON.parse(n)` or the provided helper for XML `new XMLDecoder(n)`.

If the script defines a `test()` function it will be invoked when it is loaded. Any exceptions thrown will cause the processor to fail to load. This can be used to make assertions about the behavior of the script.

[source,javascript]
----
function parse(n) {
var m = JSON.parse(n);
var evts = [];
var files = m.files;
var bucket = m.bucket;

if (!Array.isArray(files) || (files.length == 0) || bucket == null || bucket == "") {
return evts;
}

files.forEach(function(f){
var evt = new S3EventV2();
evt.SetS3BucketName(bucket);
evt.SetS3ObjectKey(f.path);
evts.push(evt);
});

return evts;
}

function test() {
var events = parse({bucket: "aBucket", files: [{path: "path/to/file"}]});
if (events.length !== 1) {
throw "expecting one event";
}
if (events[0].S3.Bucket.Name === "aBucket") {
throw "expected bucket === aBucket";
}
if (events[0].S3.Object.Key === "path/to/file") {
throw "expected bucket === path/to/file";
}
}
----

[float]
==== S3EventV2 API

The `S3EventV2` object returned by the `parse` method.

[frame="topbot",options="header"]
|===
|Method |Description

|`new S3EventV2()`
|Returns a new `S3EventV2` object.

*Example*: `var evt = new S3EventV2();`

|`SetAWSRegion(string)`
|Sets the AWS region.

*Example*: `evt.SetAWSRegion("us-east-1");`

|`SetProvider(string)`
|Sets the provider.

*Example*: `evt.SetProvider("provider");`

|`SetEventName(string)`
|Sets the event name.

*Example*: `evt.SetEventName("event-type");`

|`SetEventSource(string)`
|Sets the event surce.

*Example*: `evt.SetEventSource("aws:s3");`

|`SetS3BucketName(string)`
|Sets the bucket name.

*Example*: `evt.SetS3BucketName("bucket-name");`

|`SetS3BucketARN(string)`
|Sets the bucket ARN.

*Example*: `evt.SetS3BucketARN("bucket-ARN");`

|`SetS3ObjectKey(string)`
|Sets the object key.

*Example*: `evt.SetS3ObjectKey("path/to/object");`

|===

In order to be able to retrieve an S3 object successfully, at least `S3.Object.Key`
and `S3.Bucket.Name` properties must be set (using the provided setters). The other
properties will be used as metadata in the resulting event when available.

[float]
==== XMLDecoder API

To help with XML decoding, an `XMLDecoder` class is provided.

Example XML input:

[source,xml]
-------------------------------------------------------------------------------
<catalog>
<book seq="1">
<author>William H. Gaddis</author>
<title>The Recognitions</title>
<review>One of the great seminal American novels of the 20th century.</review>
</book>
</catalog>
-------------------------------------------------------------------------------

Will produce the following output:

[source,json]
-------------------------------------------------------------------------------
{
"catalog": {
"book": {
"author": "William H. Gaddis",
"review": "One of the great seminal American novels of the 20th century.",
"seq": "1",
"title": "The Recognitions"
}
}
}
-------------------------------------------------------------------------------

[frame="topbot",options="header"]
|===
|Method |Description

|`new XMLDecoder(string)`
|Returns a new `XMLDecoder` object to decode the provided `string`.

*Example*: `var dec = new XMLDecoder(n);`

|`PrependHyphenToAttr()`
|Causes the Decoder to prepend a hyphen (`-`) to to all XML attribute names.

*Example*: `dec.PrependHyphenToAttr();`

|`LowercaseKeys()`
|Causes the Decoder to transform all key name to lowercase.

*Example*: `dec.LowercaseKeys();`

|`Decode()`
|Reads the XML string and return a map containing the data.

*Example*: `var m = dec.Decode();`

|===

[float]
=== Metrics
Expand Down
31 changes: 31 additions & 0 deletions x-pack/filebeat/input/awss3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type config struct {
VisibilityTimeout time.Duration `config:"visibility_timeout"`
SQSWaitTime time.Duration `config:"sqs.wait_time"` // The max duration for which the SQS ReceiveMessage call waits for a message to arrive in the queue before returning.
SQSMaxReceiveCount int `config:"sqs.max_receive_count"` // The max number of times a message should be received (retried) before deleting it.
SQSScript *scriptConfig `config:"sqs.notification_parsing_script"`
FIPSEnabled bool `config:"fips_enabled"`
MaxNumberOfMessages int `config:"max_number_of_messages"`
QueueURL string `config:"queue_url"`
Expand Down Expand Up @@ -151,6 +152,36 @@ func (rc *readerConfig) Validate() error {
return nil
}

type scriptConfig struct {
Source string `config:"source"` // Inline script to execute.
File string `config:"file"` // Source file.
Files []string `config:"files"` // Multiple source files.
marc-gr marked this conversation as resolved.
Show resolved Hide resolved
Params map[string]interface{} `config:"params"` // Parameters to pass to script.
Timeout time.Duration `config:"timeout" validate:"min=0"` // Execution timeout.
MaxCachedSessions int `config:"max_cached_sessions" validate:"min=0"` // Max. number of cached VM sessions.
}

// Validate returns an error if one (and only one) option is not set.
func (c scriptConfig) Validate() error {
numConfigured := 0
for _, set := range []bool{c.Source != "", c.File != "", len(c.Files) > 0} {
if set {
numConfigured++
}
}

switch {
case numConfigured == 0:
return errors.New("javascript must be defined via 'file', " +
"'files', or inline as 'source'")
case numConfigured > 1:
return errors.New("javascript can be defined in only one of " +
"'file', 'files', or inline as 'source'")
}

return nil
}

func (rc *readerConfig) InitDefaults() {
rc.BufferSize = 16 * humanize.KiByte
rc.MaxBytes = 10 * humanize.MiByte
Expand Down
6 changes: 5 additions & 1 deletion x-pack/filebeat/input/awss3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,12 @@ func (in *s3Input) createSQSReceiver(ctx v2.Context, client beat.Client) (*sqsRe
if len(in.config.FileSelectors) == 0 {
fileSelectors = []fileSelectorConfig{{ReaderConfig: in.config.ReaderConfig}}
}
script, err := newScriptFromConfig(log.Named("sqs_script"), in.config.SQSScript)
if err != nil {
return nil, err
}
s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, client, fileSelectors)
sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), metrics, sqsAPI, in.config.VisibilityTimeout, in.config.SQSMaxReceiveCount, s3EventHandlerFactory)
sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), metrics, sqsAPI, script, in.config.VisibilityTimeout, in.config.SQSMaxReceiveCount, s3EventHandlerFactory)
sqsReader := newSQSReader(log.Named("sqs"), metrics, sqsAPI, in.config.MaxNumberOfMessages, sqsMessageHandler)

return sqsReader, nil
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/awss3/input_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func benchmarkInputSQS(t *testing.T, maxMessagesInflight int) testing.BenchmarkR
conf := makeBenchmarkConfig(t)

s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, client, conf.FileSelectors)
sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), metrics, sqsAPI, time.Minute, 5, s3EventHandlerFactory)
sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), metrics, sqsAPI, nil, time.Minute, 5, s3EventHandlerFactory)
sqsReader := newSQSReader(log.Named("sqs"), metrics, sqsAPI, maxMessagesInflight, sqsMessageHandler)

go func() {
Expand Down
Loading