Skip to content

Commit

Permalink
[Filebeat] Add max_number_of_messages config parameter for S3 input (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
kaiyan-sheng committed Oct 20, 2020
1 parent 610e998 commit a10dca7
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 64 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Adding support for Microsoft 365 Defender (Microsoft Threat Protection) {pull}21446[21446]
- Adding support for FIPS in s3 input {pull}21446[21446]
- Add SSL option to checkpoint module {pull}19560[19560]
- Add max_number_of_messages config into s3 input. {pull}21993[21993]

*Heartbeat*

Expand Down
89 changes: 45 additions & 44 deletions x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -38,24 +38,32 @@ The `s3` input supports the following configuration options plus the
<<{beatname_lc}-input-{type}-common-options>> described later.

[float]
==== `queue_url`

URL of the AWS SQS queue that messages will be received from. Required.

[float]
==== `fips_enabled`

Enabling this option changes the service name from `s3` to `s3-fips` for connecting to the correct service endpoint. For example: `s3-fips.us-gov-east-1.amazonaws.com`.
==== `api_timeout`

[float]
==== `visibility_timeout`
The maximum duration of the AWS API call. If it exceeds the timeout, the AWS API
call will be interrupted.
The default AWS API call timeout for a message is 120 seconds. The minimum
is 0 seconds. The maximum is half of the visibility timeout value.

The duration that the received messages are hidden from subsequent
retrieve requests after being retrieved by a ReceiveMessage request.
This value needs to be a lot bigger than {beatname_uc} collection frequency so
if it took too long to read the s3 log, this sqs message will not be reprocessed.
The default visibility timeout for a message is 300 seconds. The minimum
is 0 seconds. The maximum is 12 hours.
["source","json"]
----
{
"Records": [
{
"eventVersion": "1.07",
"eventTime": "2019-11-14T00:51:00Z",
"awsRegion": "us-east-1",
"eventID": "EXAMPLE8-9621-4d00-b913-beca2EXAMPLE",
},
{
"eventVersion": "1.07",
"eventTime": "2019-11-14T00:52:00Z",
"awsRegion": "us-east-1",
"eventID": "EXAMPLEc-28be-486c-8928-49ce6EXAMPLE",
}
]
}
----

[float]
==== `expand_event_list_from_field`
Expand Down Expand Up @@ -93,40 +101,33 @@ file_selectors:
- regex: '^AWSLogs/\d+/CloudTrail/'
expand_event_list_from_field: 'Records'
- regex: '^AWSLogs/\d+/CloudTrail-Digest'
```
----

[float]
==== `fips_enabled`

Enabling this option changes the service name from `s3` to `s3-fips` for connecting to the correct service endpoint. For example: `s3-fips.us-gov-east-1.amazonaws.com`.

[float]
==== `api_timeout`
==== `max_number_of_messages`
The maximum number of messages to return. Amazon SQS never returns more messages
than this value (however, fewer messages might be returned).
Valid values: 1 to 10. Default: 5.

The maximum duration of AWS API can take. If it exceeds the timeout, AWS API
will be interrupted.
The default AWS API timeout for a message is 120 seconds. The minimum
is 0 seconds. The maximum is half of the visibility timeout value.
[float]
==== `queue_url`

["source","json"]
----
{
"Records": [
{
"eventVersion": "1.07",
"eventTime": "2019-11-14T00:51:00Z",
"awsRegion": "us-east-1",
"eventID": "EXAMPLE8-9621-4d00-b913-beca2EXAMPLE",
...
},
{
"eventVersion": "1.07",
"eventTime": "2019-11-14T00:52:00Z",
"awsRegion": "us-east-1",
"eventID": "EXAMPLEc-28be-486c-8928-49ce6EXAMPLE",
...
}
]
}
```
----
URL of the AWS SQS queue that messages will be received from. Required.

[float]
==== `visibility_timeout`

The duration that the received messages are hidden from subsequent
retrieve requests after being retrieved by a ReceiveMessage request.
This value needs to be a lot bigger than {beatname_uc} collection frequency so
if it took too long to read the s3 log, this sqs message will not be reprocessed.
The default visibility timeout for a message is 300 seconds. The minimum
is 0 seconds. The maximum is 12 hours.

[float]
==== `aws credentials`
Expand Down
18 changes: 6 additions & 12 deletions x-pack/filebeat/input/s3/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,11 @@ type s3Context struct {
errC chan error
}

var (
// The maximum number of messages to return. Amazon SQS never returns more messages
// than this value (however, fewer messages might be returned).
maxNumberOfMessage uint8 = 10

// The duration (in seconds) for which the call waits for a message to arrive
// in the queue before returning. If a message is available, the call returns
// sooner than WaitTimeSeconds. If no messages are available and the wait time
// expires, the call returns successfully with an empty list of messages.
waitTimeSecond uint8 = 10
)
// The duration (in seconds) for which the call waits for a message to arrive
// in the queue before returning. If a message is available, the call returns
// sooner than WaitTimeSeconds. If no messages are available and the wait time
// expires, the call returns successfully with an empty list of messages.
var waitTimeSecond uint8 = 10

func (c *s3Collector) run() {
defer c.logger.Info("s3 input worker has stopped.")
Expand Down Expand Up @@ -205,7 +199,7 @@ func (c *s3Collector) receiveMessage(svcSQS sqsiface.ClientAPI, visibilityTimeou
&sqs.ReceiveMessageInput{
QueueUrl: &c.config.QueueURL,
MessageAttributeNames: []string{"All"},
MaxNumberOfMessages: awssdk.Int64(int64(maxNumberOfMessage)),
MaxNumberOfMessages: awssdk.Int64(int64(c.config.MaxNumberOfMessages)),
VisibilityTimeout: &visibilityTimeout,
WaitTimeSeconds: awssdk.Int64(int64(waitTimeSecond)),
})
Expand Down
22 changes: 15 additions & 7 deletions x-pack/filebeat/input/s3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ import (
)

type config struct {
APITimeout time.Duration `config:"api_timeout"`
ExpandEventListFromField string `config:"expand_event_list_from_field"`
FileSelectors []FileSelectorCfg `config:"file_selectors"`
FipsEnabled bool `config:"fips_enabled"`
MaxNumberOfMessages int `config:"max_number_of_messages"`
QueueURL string `config:"queue_url" validate:"nonzero,required"`
VisibilityTimeout time.Duration `config:"visibility_timeout"`
FipsEnabled bool `config:"fips_enabled"`
AwsConfig awscommon.ConfigAWS `config:",inline"`
ExpandEventListFromField string `config:"expand_event_list_from_field"`
APITimeout time.Duration `config:"api_timeout"`
FileSelectors []FileSelectorCfg `config:"file_selectors"`
}

// FileSelectorCfg defines type and configuration of FileSelectors
Expand All @@ -31,9 +32,10 @@ type FileSelectorCfg struct {

func defaultConfig() config {
return config{
VisibilityTimeout: 300 * time.Second,
APITimeout: 120 * time.Second,
FipsEnabled: false,
APITimeout: 120 * time.Second,
FipsEnabled: false,
MaxNumberOfMessages: 5,
VisibilityTimeout: 300 * time.Second,
}
}

Expand All @@ -42,16 +44,22 @@ func (c *config) Validate() error {
return fmt.Errorf("visibility timeout %v is not within the "+
"required range 0s to 12h", c.VisibilityTimeout)
}

if c.APITimeout < 0 || c.APITimeout > c.VisibilityTimeout/2 {
return fmt.Errorf("api timeout %v needs to be larger than"+
" 0s and smaller than half of the visibility timeout", c.APITimeout)
}

for i := range c.FileSelectors {
r, err := regexp.Compile(c.FileSelectors[i].RegexString)
if err != nil {
return err
}
c.FileSelectors[i].Regex = r
}

if c.MaxNumberOfMessages > 10 || c.MaxNumberOfMessages < 1 {
return fmt.Errorf(" max_number_of_messages %v needs to be between 1 and 10", c.MaxNumberOfMessages)
}
return nil
}
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/s3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (in *s3Input) createCollector(ctx v2.Context, pipeline beat.Pipeline) (*s3C
}

log.Debug("s3 service name = ", s3Servicename)

log.Debug("s3 input config max_number_of_messages = ", in.config.MaxNumberOfMessages)
return &s3Collector{
cancellation: ctxtool.FromCanceller(ctx.Cancelation),
logger: log,
Expand Down

0 comments on commit a10dca7

Please sign in to comment.