Skip to content

Commit

Permalink
[filebeat][s3] Add custom parsing script for S3 notifications (#28946)
Browse files Browse the repository at this point in the history
* Add custom parsing script for S3 notifications

* Remove unnecessary custom jsmapstr type.
It can be used as a regular JS map since its only purpose is to be read.

* add docs and changelog entry

* Remove commented code

* Document script options restriction

* Better error if Records are missing in notification

* Fix test

* Pass notification as string and add xml parsing options for the scripts

(cherry picked from commit df3fcec)
  • Loading branch information
marc-gr authored and mergify-bot committed Nov 18, 2021
1 parent 0471467 commit 3904dbe
Show file tree
Hide file tree
Showing 12 changed files with 1,148 additions and 11 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Sophos UTM: Support logs containing hostname in syslog header. {pull}28638[28638]
- Moving Oracle Filebeat module to GA. {pull}28754[28754]
- Add support in aws-s3 input for s3 notification from SNS to SQS. {pull}28800[28800]
- Add support in aws-s3 input for custom script parsing of s3 notifications. {pull}28946[28946]
- Improve error handling in aws-s3 input for malformed s3 notifications. {issue}28828[28828] {pull}28946[28946]

*Heartbeat*

Expand Down
255 changes: 255 additions & 0 deletions x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,90 @@ attribute. The default value is 5.
If you have configured a dead letter queue then you can set this value to
`-1` to disable deletion on failure.

[float]
==== `sqs.notification_parsing_script.source`

Inline Javascript source code.

[source,yaml]
----
sqs.notification_parsing_script.source: >
function parse(notification) {
var evts = [];
var evt = new S3EventV2();
evt.SetS3BucketName(notification.bucket);
evt.SetS3ObjectKey(notification.path);
evts.push(evt);
return evts;
}
----

[float]
==== `sqs.notification_parsing_script.file`

Path to a script file to load. Relative paths are interpreted as
relative to the `path.config` directory. Globs are expanded.

This loads `filter.js` from disk.

[source,yaml]
----
sqs.notification_parsing_script.file: ${path.config}/filter.js
----

[float]
==== `sqs.notification_parsing_script.files`

List of script files to load. The scripts are concatenated together.
Relative paths are interpreted as relative to the `path.config` directory.
And globs are expanded.

[float]
==== `sqs.notification_parsing_script.params`

A dictionary of parameters that are passed to the `register` of the
script.

Parameters can be passed to the script by adding `params` to the config.
This allows for a script to be made reusable. When using `params` the
code must define a `register(params)` function to receive the parameters.

[source,yaml]
----
sqs.notification_parsing_script:
params:
provider: aws:s3
source: >
var params = {provider: ""};
function register(scriptParams) {
params = scriptParams;
}
function parse(notification) {
var evts = [];
var evt = new S3EventV2();
evt.SetS3BucketName(notification.bucket);
evt.SetS3ObjectKey(notification.path);
evt.SetProvider(params.provider);
evts.push(evt);
return evts;
}
----

[float]
==== `sqs.notification_parsing_script.timeout`

This sets an execution timeout for the `process` function. When
the `process` function takes longer than the `timeout` period the function
is interrupted. You can set this option to prevent a script from running for
too long (like preventing an infinite `while` loop). By default there is no
timeout.

[float]
==== `sqs.notification_parsing_script.max_cached_sessions`

This sets the maximum number of Javascript VM sessions
that will be cached to avoid reallocation.

[float]
==== `sqs.wait_time`

Expand Down Expand Up @@ -426,6 +510,177 @@ Therefore, when using the polling list of S3 bucket objects method, scaling shou
vertical, with a single bigger {beatname_uc} instance and higher `number_of_workers`
config value.

[float]
=== SQS Custom Notification Parsing Script

Under some circumstances you might want to listen to events that are not following
the standard SQS notifications format. To be able to parse them, it is possible to
define a custom script that will take care of processing them and generating the
required list of S3 Events used to download the files.

The `sqs.notification_parsing_script` executes Javascript code to process an event.
It uses a pure Go implementation of ECMAScript 5.1 and has no external dependencies.

It can be configured by embedding Javascript in your configuration file or by pointing
the processor at external file(s). Only one of the options `sqs.notification_parsing_script.source`, `sqs.notification_parsing_script.file`, and `sqs.notification_parsing_script.files`
can be set at the same time.

The script requires a `parse(notification)` function that receives the notification as
a raw string and returns a list of `S3EventV2` objects. This raw string can then be
processed as needed, e.g.: `JSON.parse(n)` or the provided helper for XML `new XMLDecoder(n)`.

If the script defines a `test()` function it will be invoked when it is loaded. Any exceptions thrown will cause the processor to fail to load. This can be used to make assertions about the behavior of the script.

[source,javascript]
----
function parse(n) {
var m = JSON.parse(n);
var evts = [];
var files = m.files;
var bucket = m.bucket;
if (!Array.isArray(files) || (files.length == 0) || bucket == null || bucket == "") {
return evts;
}
files.forEach(function(f){
var evt = new S3EventV2();
evt.SetS3BucketName(bucket);
evt.SetS3ObjectKey(f.path);
evts.push(evt);
});
return evts;
}
function test() {
var events = parse({bucket: "aBucket", files: [{path: "path/to/file"}]});
if (events.length !== 1) {
throw "expecting one event";
}
if (events[0].S3.Bucket.Name === "aBucket") {
throw "expected bucket === aBucket";
}
if (events[0].S3.Object.Key === "path/to/file") {
throw "expected bucket === path/to/file";
}
}
----

[float]
==== S3EventV2 API

The `S3EventV2` object returned by the `parse` method.

[frame="topbot",options="header"]
|===
|Method |Description

|`new S3EventV2()`
|Returns a new `S3EventV2` object.

*Example*: `var evt = new S3EventV2();`

|`SetAWSRegion(string)`
|Sets the AWS region.

*Example*: `evt.SetAWSRegion("us-east-1");`

|`SetProvider(string)`
|Sets the provider.

*Example*: `evt.SetProvider("provider");`

|`SetEventName(string)`
|Sets the event name.

*Example*: `evt.SetEventName("event-type");`

|`SetEventSource(string)`
|Sets the event surce.

*Example*: `evt.SetEventSource("aws:s3");`

|`SetS3BucketName(string)`
|Sets the bucket name.

*Example*: `evt.SetS3BucketName("bucket-name");`

|`SetS3BucketARN(string)`
|Sets the bucket ARN.

*Example*: `evt.SetS3BucketARN("bucket-ARN");`

|`SetS3ObjectKey(string)`
|Sets the object key.

*Example*: `evt.SetS3ObjectKey("path/to/object");`

|===

In order to be able to retrieve an S3 object successfully, at least `S3.Object.Key`
and `S3.Bucket.Name` properties must be set (using the provided setters). The other
properties will be used as metadata in the resulting event when available.

[float]
==== XMLDecoder API

To help with XML decoding, an `XMLDecoder` class is provided.

Example XML input:

[source,xml]
-------------------------------------------------------------------------------
<catalog>
<book seq="1">
<author>William H. Gaddis</author>
<title>The Recognitions</title>
<review>One of the great seminal American novels of the 20th century.</review>
</book>
</catalog>
-------------------------------------------------------------------------------

Will produce the following output:

[source,json]
-------------------------------------------------------------------------------
{
"catalog": {
"book": {
"author": "William H. Gaddis",
"review": "One of the great seminal American novels of the 20th century.",
"seq": "1",
"title": "The Recognitions"
}
}
}
-------------------------------------------------------------------------------

[frame="topbot",options="header"]
|===
|Method |Description

|`new XMLDecoder(string)`
|Returns a new `XMLDecoder` object to decode the provided `string`.

*Example*: `var dec = new XMLDecoder(n);`

|`PrependHyphenToAttr()`
|Causes the Decoder to prepend a hyphen (`-`) to to all XML attribute names.

*Example*: `dec.PrependHyphenToAttr();`

|`LowercaseKeys()`
|Causes the Decoder to transform all key name to lowercase.

*Example*: `dec.LowercaseKeys();`

|`Decode()`
|Reads the XML string and return a map containing the data.

*Example*: `var m = dec.Decode();`

|===

[float]
=== Metrics
Expand Down
31 changes: 31 additions & 0 deletions x-pack/filebeat/input/awss3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type config struct {
VisibilityTimeout time.Duration `config:"visibility_timeout"`
SQSWaitTime time.Duration `config:"sqs.wait_time"` // The max duration for which the SQS ReceiveMessage call waits for a message to arrive in the queue before returning.
SQSMaxReceiveCount int `config:"sqs.max_receive_count"` // The max number of times a message should be received (retried) before deleting it.
SQSScript *scriptConfig `config:"sqs.notification_parsing_script"`
FIPSEnabled bool `config:"fips_enabled"`
MaxNumberOfMessages int `config:"max_number_of_messages"`
QueueURL string `config:"queue_url"`
Expand Down Expand Up @@ -151,6 +152,36 @@ func (rc *readerConfig) Validate() error {
return nil
}

type scriptConfig struct {
Source string `config:"source"` // Inline script to execute.
File string `config:"file"` // Source file.
Files []string `config:"files"` // Multiple source files.
Params map[string]interface{} `config:"params"` // Parameters to pass to script.
Timeout time.Duration `config:"timeout" validate:"min=0"` // Execution timeout.
MaxCachedSessions int `config:"max_cached_sessions" validate:"min=0"` // Max. number of cached VM sessions.
}

// Validate returns an error if one (and only one) option is not set.
func (c scriptConfig) Validate() error {
numConfigured := 0
for _, set := range []bool{c.Source != "", c.File != "", len(c.Files) > 0} {
if set {
numConfigured++
}
}

switch {
case numConfigured == 0:
return errors.New("javascript must be defined via 'file', " +
"'files', or inline as 'source'")
case numConfigured > 1:
return errors.New("javascript can be defined in only one of " +
"'file', 'files', or inline as 'source'")
}

return nil
}

func (rc *readerConfig) InitDefaults() {
rc.BufferSize = 16 * humanize.KiByte
rc.MaxBytes = 10 * humanize.MiByte
Expand Down
6 changes: 5 additions & 1 deletion x-pack/filebeat/input/awss3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,12 @@ func (in *s3Input) createSQSReceiver(ctx v2.Context, client beat.Client) (*sqsRe
if len(in.config.FileSelectors) == 0 {
fileSelectors = []fileSelectorConfig{{ReaderConfig: in.config.ReaderConfig}}
}
script, err := newScriptFromConfig(log.Named("sqs_script"), in.config.SQSScript)
if err != nil {
return nil, err
}
s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, client, fileSelectors)
sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), metrics, sqsAPI, in.config.VisibilityTimeout, in.config.SQSMaxReceiveCount, s3EventHandlerFactory)
sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), metrics, sqsAPI, script, in.config.VisibilityTimeout, in.config.SQSMaxReceiveCount, s3EventHandlerFactory)
sqsReader := newSQSReader(log.Named("sqs"), metrics, sqsAPI, in.config.MaxNumberOfMessages, sqsMessageHandler)

return sqsReader, nil
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/awss3/input_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func benchmarkInputSQS(t *testing.T, maxMessagesInflight int) testing.BenchmarkR
conf := makeBenchmarkConfig(t)

s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, client, conf.FileSelectors)
sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), metrics, sqsAPI, time.Minute, 5, s3EventHandlerFactory)
sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), metrics, sqsAPI, nil, time.Minute, 5, s3EventHandlerFactory)
sqsReader := newSQSReader(log.Named("sqs"), metrics, sqsAPI, maxMessagesInflight, sqsMessageHandler)

go func() {
Expand Down
Loading

0 comments on commit 3904dbe

Please sign in to comment.