From 764045eb4e60fa3af3516845a516e88b31367070 Mon Sep 17 00:00:00 2001 From: Andrea Spacca Date: Tue, 12 Oct 2021 17:09:52 +0200 Subject: [PATCH] =?UTF-8?q?[Filebeat]=20-=20S3=20Input=20-=20Add=20support?= =?UTF-8?q?=20for=20only=20iterating/accessing=20only=E2=80=A6=20(#28252)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [Filebeat] - S3 Input - Add support for only iterating/accessing only specific folders or datapaths --- CHANGELOG.next.asciidoc | 1 + filebeat/docs/modules/aws.asciidoc | 10 ++++ x-pack/filebeat/filebeat.reference.yml | 18 +++++++ x-pack/filebeat/input/awss3/config.go | 2 + x-pack/filebeat/input/awss3/config_test.go | 16 +++--- x-pack/filebeat/input/awss3/input.go | 2 + .../input/awss3/input_benchmark_test.go | 4 +- .../input/awss3/input_integration_test.go | 49 +++++++++++++++++++ x-pack/filebeat/input/awss3/interfaces.go | 5 +- .../input/awss3/mock_interfaces_test.go | 16 +++--- x-pack/filebeat/input/awss3/s3.go | 5 +- .../filebeat/input/awss3/s3_objects_test.go | 4 +- x-pack/filebeat/input/awss3/s3_test.go | 16 +++--- x-pack/filebeat/module/aws/_meta/config.yml | 18 +++++++ .../filebeat/module/aws/_meta/docs.asciidoc | 10 ++++ .../module/aws/cloudtrail/config/aws-s3.yml | 5 ++ .../module/aws/cloudtrail/manifest.yml | 1 + .../module/aws/cloudwatch/config/aws-s3.yml | 4 ++ .../module/aws/cloudwatch/manifest.yml | 1 + .../filebeat/module/aws/ec2/config/aws-s3.yml | 4 ++ x-pack/filebeat/module/aws/ec2/manifest.yml | 1 + .../filebeat/module/aws/elb/config/aws-s3.yml | 4 ++ x-pack/filebeat/module/aws/elb/manifest.yml | 1 + .../module/aws/s3access/config/aws-s3.yml | 4 ++ .../filebeat/module/aws/s3access/manifest.yml | 1 + .../module/aws/vpcflow/config/input.yml | 4 ++ .../filebeat/module/aws/vpcflow/manifest.yml | 1 + x-pack/filebeat/modules.d/aws.yml.disabled | 18 +++++++ 28 files changed, 195 insertions(+), 30 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 37cd9702391..73310d107c6 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -315,6 +315,7 @@ for a few releases. Please use other tools provided by Elastic to fetch data fro - Update indentation for azure filebeat configuration. {pull}26604[26604] - Update Sophos xg module pipeline to deal with missing `date` and `time` fields. {pull}27834[27834] - sophos/xg fileset: Add missing pipeline for System Health logs. {pull}27827[27827] {issue}27826[27826] +- Add support for passing a prefix on S3 bucket list mode for AWS-S3 input {pull}28252[28252] {issue}27965[27965] - Resolve issue with @timestamp for defender_atp. {pull}28272[28272] - Tolerate faults when Windows Event Log session is interrupted {issue}27947[27947] {pull}28191[28191] - Add support for username in cisco asa security negotiation logs {pull}26975[26975] diff --git a/filebeat/docs/modules/aws.asciidoc b/filebeat/docs/modules/aws.asciidoc index 0035f225e38..097e22a741c 100644 --- a/filebeat/docs/modules/aws.asciidoc +++ b/filebeat/docs/modules/aws.asciidoc @@ -50,6 +50,7 @@ Example config: enabled: false #var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue #var.bucket_arn: 'arn:aws:s3:::mybucket' + #var.bucket_list_prefix: 'prefix' #var.bucket_list_interval: 300s #var.number_of_workers: 5 #var.shared_credential_file: /etc/filebeat/aws_credentials @@ -67,6 +68,7 @@ Example config: enabled: false #var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue #var.bucket_arn: 'arn:aws:s3:::mybucket' + #var.bucket_list_prefix: 'prefix' #var.bucket_list_interval: 300s #var.number_of_workers: 5 #var.shared_credential_file: /etc/filebeat/aws_credentials @@ -84,6 +86,7 @@ Example config: enabled: false #var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue #var.bucket_arn: 'arn:aws:s3:::mybucket' + #var.bucket_list_prefix: 'prefix' #var.bucket_list_interval: 300s #var.number_of_workers: 5 #var.shared_credential_file: /etc/filebeat/aws_credentials @@ -101,6 +104,7 @@ Example config: enabled: false #var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue #var.bucket_arn: 'arn:aws:s3:::mybucket' + #var.bucket_list_prefix: 'prefix' #var.bucket_list_interval: 300s #var.number_of_workers: 5 #var.shared_credential_file: /etc/filebeat/aws_credentials @@ -118,6 +122,7 @@ Example config: enabled: false #var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue #var.bucket_arn: 'arn:aws:s3:::mybucket' + #var.bucket_list_prefix: 'prefix' #var.bucket_list_interval: 300s #var.number_of_workers: 5 #var.shared_credential_file: /etc/filebeat/aws_credentials @@ -135,6 +140,7 @@ Example config: enabled: false #var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue #var.bucket_arn: 'arn:aws:s3:::mybucket' + #var.bucket_list_prefix: 'prefix' #var.bucket_list_interval: 300s #var.number_of_workers: 5 #var.shared_credential_file: /etc/filebeat/aws_credentials @@ -178,6 +184,10 @@ Use to vertically scale the input. Wait interval between completion of a list request to the S3 bucket and beginning of the next one. Default to be 120 seconds. +*`var.bucket_list_prefix`*:: + +Prefix to apply for the list request to the S3 bucket. Default empty. + *`var.endpoint`*:: Custom endpoint used to access AWS APIs. diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index 7fcb4efaa68..61211d8cc1e 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -105,6 +105,9 @@ filebeat.modules: # AWS S3 bucket arn #var.bucket_arn: 'arn:aws:s3:::mybucket' + # AWS S3 list prefix + #var.bucket_list_prefix: 'prefix' + # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s @@ -166,6 +169,9 @@ filebeat.modules: # AWS S3 bucket arn #var.bucket_arn: 'arn:aws:s3:::mybucket' + # AWS S3 list prefix + #var.bucket_list_prefix: 'prefix' + # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s @@ -215,6 +221,9 @@ filebeat.modules: # AWS S3 bucket arn #var.bucket_arn: 'arn:aws:s3:::mybucket' + # AWS S3 list prefix + #var.bucket_list_prefix: 'prefix' + # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s @@ -264,6 +273,9 @@ filebeat.modules: # AWS S3 bucket arn #var.bucket_arn: 'arn:aws:s3:::mybucket' + # AWS S3 list prefix + #var.bucket_list_prefix: 'prefix' + # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s @@ -313,6 +325,9 @@ filebeat.modules: # AWS S3 bucket arn #var.bucket_arn: 'arn:aws:s3:::mybucket' + # AWS S3 list prefix + #var.bucket_list_prefix: 'prefix' + # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s @@ -362,6 +377,9 @@ filebeat.modules: # AWS S3 bucket arn #var.bucket_arn: 'arn:aws:s3:::mybucket' + # AWS S3 list prefix + #var.bucket_list_prefix: 'prefix' + # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s diff --git a/x-pack/filebeat/input/awss3/config.go b/x-pack/filebeat/input/awss3/config.go index 404997ddc60..4105fbd9093 100644 --- a/x-pack/filebeat/input/awss3/config.go +++ b/x-pack/filebeat/input/awss3/config.go @@ -29,6 +29,7 @@ type config struct { QueueURL string `config:"queue_url"` BucketARN string `config:"bucket_arn"` BucketListInterval time.Duration `config:"bucket_list_interval"` + BucketListPrefix string `config:"bucket_list_prefix"` NumberOfWorkers int `config:"number_of_workers"` AWSConfig awscommon.ConfigAWS `config:",inline"` FileSelectors []fileSelectorConfig `config:"file_selectors"` @@ -40,6 +41,7 @@ func defaultConfig() config { APITimeout: 120 * time.Second, VisibilityTimeout: 300 * time.Second, BucketListInterval: 120 * time.Second, + BucketListPrefix: "", SQSWaitTime: 20 * time.Second, SQSMaxReceiveCount: 5, FIPSEnabled: false, diff --git a/x-pack/filebeat/input/awss3/config_test.go b/x-pack/filebeat/input/awss3/config_test.go index cd75d4df19c..f8573d60525 100644 --- a/x-pack/filebeat/input/awss3/config_test.go +++ b/x-pack/filebeat/input/awss3/config_test.go @@ -28,13 +28,15 @@ func TestConfig(t *testing.T) { parserConf := parser.Config{} require.NoError(t, parserConf.Unpack(common.MustNewConfigFrom(""))) return config{ - QueueURL: quequeURL, - BucketARN: s3Bucket, - APITimeout: 120 * time.Second, - VisibilityTimeout: 300 * time.Second, - SQSMaxReceiveCount: 5, - SQSWaitTime: 20 * time.Second, - BucketListInterval: 120 * time.Second, + QueueURL: quequeURL, + BucketARN: s3Bucket, + APITimeout: 120 * time.Second, + VisibilityTimeout: 300 * time.Second, + SQSMaxReceiveCount: 5, + SQSWaitTime: 20 * time.Second, + BucketListInterval: 120 * time.Second, + BucketListPrefix: "", + FIPSEnabled: false, MaxNumberOfMessages: 5, ReaderConfig: readerConfig{ diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 9cdd20d8ca1..4cd5a2c82b7 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -215,6 +215,7 @@ func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, cli log := ctx.Logger.With("bucket_arn", in.config.BucketARN) log.Infof("number_of_workers is set to %v.", in.config.NumberOfWorkers) log.Infof("bucket_list_interval is set to %v.", in.config.BucketListInterval) + log.Infof("bucket_list_prefix is set to %v.", in.config.BucketListPrefix) log.Infof("AWS region is set to %v.", in.awsConfig.Region) log.Debugf("AWS S3 service name is %v.", s3ServiceName) @@ -233,6 +234,7 @@ func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, cli states, persistentStore, in.config.BucketARN, + in.config.BucketListPrefix, in.awsConfig.Region, in.config.NumberOfWorkers, in.config.BucketListInterval) diff --git a/x-pack/filebeat/input/awss3/input_benchmark_test.go b/x-pack/filebeat/input/awss3/input_benchmark_test.go index 00540479d5c..9c718c83ac8 100644 --- a/x-pack/filebeat/input/awss3/input_benchmark_test.go +++ b/x-pack/filebeat/input/awss3/input_benchmark_test.go @@ -134,7 +134,7 @@ func (c constantS3) GetObject(ctx context.Context, bucket, key string) (*s3.GetO return newS3GetObjectResponse(c.filename, c.data, c.contentType), nil } -func (c constantS3) ListObjectsPaginator(bucket string) s3Pager { +func (c constantS3) ListObjectsPaginator(bucket, prefix string) s3Pager { return c.pagerConstant } @@ -277,7 +277,7 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult } s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, client, conf.FileSelectors) - s3Poller := newS3Poller(logp.NewLogger(inputName), metrics, s3API, s3EventHandlerFactory, newStates(inputCtx), store, "bucket", "region", numberOfWorkers, time.Second) + s3Poller := newS3Poller(logp.NewLogger(inputName), metrics, s3API, s3EventHandlerFactory, newStates(inputCtx), store, "bucket", "key-", "region", numberOfWorkers, time.Second) ctx, cancel := context.WithCancel(context.Background()) b.Cleanup(cancel) diff --git a/x-pack/filebeat/input/awss3/input_integration_test.go b/x-pack/filebeat/input/awss3/input_integration_test.go index 9c6ebb3cac9..74866641e11 100644 --- a/x-pack/filebeat/input/awss3/input_integration_test.go +++ b/x-pack/filebeat/input/awss3/input_integration_test.go @@ -384,3 +384,52 @@ func TestGetRegionForBucketARN(t *testing.T) { regionName, err := getRegionForBucketARN(context.Background(), s3Client, tfConfig.BucketName) assert.Equal(t, tfConfig.AWSRegion, regionName) } + +func TestPaginatorListPrefix(t *testing.T) { + logp.TestingSetup() + + // Terraform is used to setup S3 and must be executed manually. + tfConfig := getTerraformOutputs(t) + + uploadS3TestFiles(t, tfConfig.AWSRegion, tfConfig.BucketName, + "testdata/events-array.json", + "testdata/invalid.json", + "testdata/log.json", + "testdata/log.ndjson", + "testdata/multiline.json", + "testdata/multiline.json.gz", + "testdata/multiline.txt", + "testdata/log.txt", // Skipped (no match). + ) + + awsConfig, err := external.LoadDefaultAWSConfig() + awsConfig.Region = tfConfig.AWSRegion + if err != nil { + t.Fatal(err) + } + + s3Client := s3.New(awscommon.EnrichAWSConfigWithEndpoint("", "s3", "", awsConfig)) + + s3API := &awsS3API{ + client: s3Client, + } + + var objects []string + paginator := s3API.ListObjectsPaginator(tfConfig.BucketName, "log") + for paginator.Next(context.Background()) { + page := paginator.CurrentPage() + for _, object := range page.Contents { + objects = append(objects, *object.Key) + } + } + + assert.NoError(t, paginator.Err()) + + expected := []string{ + "log.json", + "log.ndjson", + "log.txt", + } + + assert.Equal(t, expected, objects) +} diff --git a/x-pack/filebeat/input/awss3/interfaces.go b/x-pack/filebeat/input/awss3/interfaces.go index 2e406717348..c777072c6c9 100644 --- a/x-pack/filebeat/input/awss3/interfaces.go +++ b/x-pack/filebeat/input/awss3/interfaces.go @@ -66,7 +66,7 @@ type s3Getter interface { } type s3Lister interface { - ListObjectsPaginator(bucket string) s3Pager + ListObjectsPaginator(bucket, prefix string) s3Pager } type s3Pager interface { @@ -204,9 +204,10 @@ func (a *awsS3API) GetObject(ctx context.Context, bucket, key string) (*s3.GetOb return resp, nil } -func (a *awsS3API) ListObjectsPaginator(bucket string) s3Pager { +func (a *awsS3API) ListObjectsPaginator(bucket, prefix string) s3Pager { req := a.client.ListObjectsRequest(&s3.ListObjectsInput{ Bucket: awssdk.String(bucket), + Prefix: awssdk.String(prefix), }) pager := s3.NewListObjectsPaginator(req) diff --git a/x-pack/filebeat/input/awss3/mock_interfaces_test.go b/x-pack/filebeat/input/awss3/mock_interfaces_test.go index 1929fa7c9ec..85c11e0fe80 100644 --- a/x-pack/filebeat/input/awss3/mock_interfaces_test.go +++ b/x-pack/filebeat/input/awss3/mock_interfaces_test.go @@ -274,17 +274,17 @@ func (mr *MockS3APIMockRecorder) GetObject(ctx, bucket, key interface{}) *gomock } // ListObjectsPaginator mocks base method. -func (m *MockS3API) ListObjectsPaginator(bucket string) s3Pager { +func (m *MockS3API) ListObjectsPaginator(bucket, prefix string) s3Pager { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ListObjectsPaginator", bucket) + ret := m.ctrl.Call(m, "ListObjectsPaginator", bucket, prefix) ret0, _ := ret[0].(s3Pager) return ret0 } // ListObjectsPaginator indicates an expected call of ListObjectsPaginator. -func (mr *MockS3APIMockRecorder) ListObjectsPaginator(bucket interface{}) *gomock.Call { +func (mr *MockS3APIMockRecorder) ListObjectsPaginator(bucket, prefix interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListObjectsPaginator", reflect.TypeOf((*MockS3API)(nil).ListObjectsPaginator), bucket) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListObjectsPaginator", reflect.TypeOf((*MockS3API)(nil).ListObjectsPaginator), bucket, prefix) } // Mocks3Getter is a mock of s3Getter interface. @@ -349,17 +349,17 @@ func (m *Mocks3Lister) EXPECT() *Mocks3ListerMockRecorder { } // ListObjectsPaginator mocks base method. -func (m *Mocks3Lister) ListObjectsPaginator(bucket string) s3Pager { +func (m *Mocks3Lister) ListObjectsPaginator(bucket, prefix string) s3Pager { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ListObjectsPaginator", bucket) + ret := m.ctrl.Call(m, "ListObjectsPaginator", bucket, prefix) ret0, _ := ret[0].(s3Pager) return ret0 } // ListObjectsPaginator indicates an expected call of ListObjectsPaginator. -func (mr *Mocks3ListerMockRecorder) ListObjectsPaginator(bucket interface{}) *gomock.Call { +func (mr *Mocks3ListerMockRecorder) ListObjectsPaginator(bucket, prefix interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListObjectsPaginator", reflect.TypeOf((*Mocks3Lister)(nil).ListObjectsPaginator), bucket) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListObjectsPaginator", reflect.TypeOf((*Mocks3Lister)(nil).ListObjectsPaginator), bucket, prefix) } // MockS3Pager is a mock of s3Pager interface. diff --git a/x-pack/filebeat/input/awss3/s3.go b/x-pack/filebeat/input/awss3/s3.go index 313a71211e7..d69c9eccb44 100644 --- a/x-pack/filebeat/input/awss3/s3.go +++ b/x-pack/filebeat/input/awss3/s3.go @@ -41,6 +41,7 @@ type s3ObjectPayload struct { type s3Poller struct { numberOfWorkers int bucket string + listPrefix string region string bucketPollInterval time.Duration workerSem *sem @@ -61,6 +62,7 @@ func newS3Poller(log *logp.Logger, states *states, store *statestore.Store, bucket string, + listPrefix string, awsRegion string, numberOfWorkers int, bucketPollInterval time.Duration) *s3Poller { @@ -70,6 +72,7 @@ func newS3Poller(log *logp.Logger, return &s3Poller{ numberOfWorkers: numberOfWorkers, bucket: bucket, + listPrefix: listPrefix, region: awsRegion, bucketPollInterval: bucketPollInterval, workerSem: newSem(numberOfWorkers), @@ -142,7 +145,7 @@ func (p *s3Poller) GetS3Objects(ctx context.Context, s3ObjectPayloadChan chan<- bucketMetadata := strings.Split(p.bucket, ":") bucketName := bucketMetadata[len(bucketMetadata)-1] - paginator := p.s3.ListObjectsPaginator(bucketName) + paginator := p.s3.ListObjectsPaginator(bucketName, p.listPrefix) for paginator.Next(ctx) { listingID, err := uuid.NewV4() if err != nil { diff --git a/x-pack/filebeat/input/awss3/s3_objects_test.go b/x-pack/filebeat/input/awss3/s3_objects_test.go index 6cf1ea1fa5a..4ff6e4754f0 100644 --- a/x-pack/filebeat/input/awss3/s3_objects_test.go +++ b/x-pack/filebeat/input/awss3/s3_objects_test.go @@ -212,11 +212,11 @@ func TestNewMockS3Pager(t *testing.T) { defer ctrl.Finish() mockS3Pager := newMockS3Pager(ctrl, 1, fakeObjects) mockS3API := NewMockS3API(ctrl) - mockS3API.EXPECT().ListObjectsPaginator(gomock.Any()).Return(mockS3Pager) + mockS3API.EXPECT().ListObjectsPaginator(gomock.Any(), "").Return(mockS3Pager) // Test the mock. var keys []string - pager := mockS3API.ListObjectsPaginator("nombre") + pager := mockS3API.ListObjectsPaginator("nombre", "") for pager.Next(ctx) { for _, s3Obj := range pager.CurrentPage().Contents { keys = append(keys, *s3Obj.Key) diff --git a/x-pack/filebeat/input/awss3/s3_test.go b/x-pack/filebeat/input/awss3/s3_test.go index dc87356ba65..250730ab055 100644 --- a/x-pack/filebeat/input/awss3/s3_test.go +++ b/x-pack/filebeat/input/awss3/s3_test.go @@ -48,9 +48,9 @@ func TestS3Poller(t *testing.T) { gomock.InOrder( mockAPI.EXPECT(). - ListObjectsPaginator(gomock.Eq(bucket)). + ListObjectsPaginator(gomock.Eq(bucket), gomock.Eq("key")). Times(1). - DoAndReturn(func(_ string) s3Pager { + DoAndReturn(func(_, _ string) s3Pager { return mockPager }), ) @@ -133,7 +133,7 @@ func TestS3Poller(t *testing.T) { Return(nil, errFakeConnectivityFailure) s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockAPI, mockPublisher, nil) - receiver := newS3Poller(logp.NewLogger(inputName), nil, mockAPI, s3ObjProc, newStates(inputCtx), store, bucket, "region", numberOfWorkers, pollInterval) + receiver := newS3Poller(logp.NewLogger(inputName), nil, mockAPI, s3ObjProc, newStates(inputCtx), store, bucket, "key", "region", numberOfWorkers, pollInterval) require.Error(t, context.DeadlineExceeded, receiver.Poll(ctx)) assert.Equal(t, numberOfWorkers, receiver.workerSem.available) }) @@ -158,16 +158,16 @@ func TestS3Poller(t *testing.T) { gomock.InOrder( // Initial ListObjectPaginator gets an error. mockAPI.EXPECT(). - ListObjectsPaginator(gomock.Eq(bucket)). + ListObjectsPaginator(gomock.Eq(bucket), gomock.Eq("key")). Times(1). - DoAndReturn(func(_ string) s3Pager { + DoAndReturn(func(_, _ string) s3Pager { return mockPagerFirst }), // After waiting for pollInterval, it retries. mockAPI.EXPECT(). - ListObjectsPaginator(gomock.Eq(bucket)). + ListObjectsPaginator(gomock.Eq(bucket), gomock.Eq("key")). Times(1). - DoAndReturn(func(_ string) s3Pager { + DoAndReturn(func(_, _ string) s3Pager { return mockPagerSecond }), ) @@ -263,7 +263,7 @@ func TestS3Poller(t *testing.T) { Return(nil, errFakeConnectivityFailure) s3ObjProc := newS3ObjectProcessorFactory(logp.NewLogger(inputName), nil, mockAPI, mockPublisher, nil) - receiver := newS3Poller(logp.NewLogger(inputName), nil, mockAPI, s3ObjProc, newStates(inputCtx), store, bucket, "region", numberOfWorkers, pollInterval) + receiver := newS3Poller(logp.NewLogger(inputName), nil, mockAPI, s3ObjProc, newStates(inputCtx), store, bucket, "key", "region", numberOfWorkers, pollInterval) require.Error(t, context.DeadlineExceeded, receiver.Poll(ctx)) assert.Equal(t, numberOfWorkers, receiver.workerSem.available) }) diff --git a/x-pack/filebeat/module/aws/_meta/config.yml b/x-pack/filebeat/module/aws/_meta/config.yml index 393a197bc4e..1b139779b84 100644 --- a/x-pack/filebeat/module/aws/_meta/config.yml +++ b/x-pack/filebeat/module/aws/_meta/config.yml @@ -8,6 +8,9 @@ # AWS S3 bucket arn #var.bucket_arn: 'arn:aws:s3:::mybucket' + # AWS S3 list prefix + #var.bucket_list_prefix: 'prefix' + # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s @@ -69,6 +72,9 @@ # AWS S3 bucket arn #var.bucket_arn: 'arn:aws:s3:::mybucket' + # AWS S3 list prefix + #var.bucket_list_prefix: 'prefix' + # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s @@ -118,6 +124,9 @@ # AWS S3 bucket arn #var.bucket_arn: 'arn:aws:s3:::mybucket' + # AWS S3 list prefix + #var.bucket_list_prefix: 'prefix' + # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s @@ -167,6 +176,9 @@ # AWS S3 bucket arn #var.bucket_arn: 'arn:aws:s3:::mybucket' + # AWS S3 list prefix + #var.bucket_list_prefix: 'prefix' + # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s @@ -216,6 +228,9 @@ # AWS S3 bucket arn #var.bucket_arn: 'arn:aws:s3:::mybucket' + # AWS S3 list prefix + #var.bucket_list_prefix: 'prefix' + # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s @@ -265,6 +280,9 @@ # AWS S3 bucket arn #var.bucket_arn: 'arn:aws:s3:::mybucket' + # AWS S3 list prefix + #var.bucket_list_prefix: 'prefix' + # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s diff --git a/x-pack/filebeat/module/aws/_meta/docs.asciidoc b/x-pack/filebeat/module/aws/_meta/docs.asciidoc index a55dbc4583c..3fee8460161 100644 --- a/x-pack/filebeat/module/aws/_meta/docs.asciidoc +++ b/x-pack/filebeat/module/aws/_meta/docs.asciidoc @@ -45,6 +45,7 @@ Example config: enabled: false #var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue #var.bucket_arn: 'arn:aws:s3:::mybucket' + #var.bucket_list_prefix: 'prefix' #var.bucket_list_interval: 300s #var.number_of_workers: 5 #var.shared_credential_file: /etc/filebeat/aws_credentials @@ -62,6 +63,7 @@ Example config: enabled: false #var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue #var.bucket_arn: 'arn:aws:s3:::mybucket' + #var.bucket_list_prefix: 'prefix' #var.bucket_list_interval: 300s #var.number_of_workers: 5 #var.shared_credential_file: /etc/filebeat/aws_credentials @@ -79,6 +81,7 @@ Example config: enabled: false #var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue #var.bucket_arn: 'arn:aws:s3:::mybucket' + #var.bucket_list_prefix: 'prefix' #var.bucket_list_interval: 300s #var.number_of_workers: 5 #var.shared_credential_file: /etc/filebeat/aws_credentials @@ -96,6 +99,7 @@ Example config: enabled: false #var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue #var.bucket_arn: 'arn:aws:s3:::mybucket' + #var.bucket_list_prefix: 'prefix' #var.bucket_list_interval: 300s #var.number_of_workers: 5 #var.shared_credential_file: /etc/filebeat/aws_credentials @@ -113,6 +117,7 @@ Example config: enabled: false #var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue #var.bucket_arn: 'arn:aws:s3:::mybucket' + #var.bucket_list_prefix: 'prefix' #var.bucket_list_interval: 300s #var.number_of_workers: 5 #var.shared_credential_file: /etc/filebeat/aws_credentials @@ -130,6 +135,7 @@ Example config: enabled: false #var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue #var.bucket_arn: 'arn:aws:s3:::mybucket' + #var.bucket_list_prefix: 'prefix' #var.bucket_list_interval: 300s #var.number_of_workers: 5 #var.shared_credential_file: /etc/filebeat/aws_credentials @@ -173,6 +179,10 @@ Use to vertically scale the input. Wait interval between completion of a list request to the S3 bucket and beginning of the next one. Default to be 120 seconds. +*`var.bucket_list_prefix`*:: + +Prefix to apply for the list request to the S3 bucket. Default empty. + *`var.endpoint`*:: Custom endpoint used to access AWS APIs. diff --git a/x-pack/filebeat/module/aws/cloudtrail/config/aws-s3.yml b/x-pack/filebeat/module/aws/cloudtrail/config/aws-s3.yml index 6134344678e..c95abb1cdc2 100644 --- a/x-pack/filebeat/module/aws/cloudtrail/config/aws-s3.yml +++ b/x-pack/filebeat/module/aws/cloudtrail/config/aws-s3.yml @@ -13,6 +13,11 @@ number_of_workers: {{ .number_of_workers }} {{ if .bucket_list_interval }} bucket_list_interval: {{ .bucket_list_interval }} {{ end }} + +{{ if .bucket_list_prefix }} +bucket_list_prefix: {{ .bucket_list_prefix }} +{{ end }} + file_selectors: {{ if .process_cloudtrail_logs }} - regex: '/CloudTrail/' diff --git a/x-pack/filebeat/module/aws/cloudtrail/manifest.yml b/x-pack/filebeat/module/aws/cloudtrail/manifest.yml index 6d2c9cdebe0..c0715d7647c 100644 --- a/x-pack/filebeat/module/aws/cloudtrail/manifest.yml +++ b/x-pack/filebeat/module/aws/cloudtrail/manifest.yml @@ -7,6 +7,7 @@ var: - name: bucket_arn - name: number_of_workers - name: bucket_list_interval + - name: bucket_list_prefix - name: shared_credential_file - name: credential_profile_name - name: visibility_timeout diff --git a/x-pack/filebeat/module/aws/cloudwatch/config/aws-s3.yml b/x-pack/filebeat/module/aws/cloudwatch/config/aws-s3.yml index c98582c21ea..5b1fb24f561 100644 --- a/x-pack/filebeat/module/aws/cloudwatch/config/aws-s3.yml +++ b/x-pack/filebeat/module/aws/cloudwatch/config/aws-s3.yml @@ -14,6 +14,10 @@ number_of_workers: {{ .number_of_workers }} bucket_list_interval: {{ .bucket_list_interval }} {{ end }} +{{ if .bucket_list_prefix }} +bucket_list_prefix: {{ .bucket_list_prefix }} +{{ end }} + {{ if .credential_profile_name }} credential_profile_name: {{ .credential_profile_name }} {{ end }} diff --git a/x-pack/filebeat/module/aws/cloudwatch/manifest.yml b/x-pack/filebeat/module/aws/cloudwatch/manifest.yml index 7634f73d8d2..0223142a6a9 100644 --- a/x-pack/filebeat/module/aws/cloudwatch/manifest.yml +++ b/x-pack/filebeat/module/aws/cloudwatch/manifest.yml @@ -7,6 +7,7 @@ var: - name: bucket_arn - name: number_of_workers - name: bucket_list_interval + - name: bucket_list_prefix - name: shared_credential_file - name: credential_profile_name - name: visibility_timeout diff --git a/x-pack/filebeat/module/aws/ec2/config/aws-s3.yml b/x-pack/filebeat/module/aws/ec2/config/aws-s3.yml index c98582c21ea..5b1fb24f561 100644 --- a/x-pack/filebeat/module/aws/ec2/config/aws-s3.yml +++ b/x-pack/filebeat/module/aws/ec2/config/aws-s3.yml @@ -14,6 +14,10 @@ number_of_workers: {{ .number_of_workers }} bucket_list_interval: {{ .bucket_list_interval }} {{ end }} +{{ if .bucket_list_prefix }} +bucket_list_prefix: {{ .bucket_list_prefix }} +{{ end }} + {{ if .credential_profile_name }} credential_profile_name: {{ .credential_profile_name }} {{ end }} diff --git a/x-pack/filebeat/module/aws/ec2/manifest.yml b/x-pack/filebeat/module/aws/ec2/manifest.yml index 7634f73d8d2..0223142a6a9 100644 --- a/x-pack/filebeat/module/aws/ec2/manifest.yml +++ b/x-pack/filebeat/module/aws/ec2/manifest.yml @@ -7,6 +7,7 @@ var: - name: bucket_arn - name: number_of_workers - name: bucket_list_interval + - name: bucket_list_prefix - name: shared_credential_file - name: credential_profile_name - name: visibility_timeout diff --git a/x-pack/filebeat/module/aws/elb/config/aws-s3.yml b/x-pack/filebeat/module/aws/elb/config/aws-s3.yml index c98582c21ea..5b1fb24f561 100644 --- a/x-pack/filebeat/module/aws/elb/config/aws-s3.yml +++ b/x-pack/filebeat/module/aws/elb/config/aws-s3.yml @@ -14,6 +14,10 @@ number_of_workers: {{ .number_of_workers }} bucket_list_interval: {{ .bucket_list_interval }} {{ end }} +{{ if .bucket_list_prefix }} +bucket_list_prefix: {{ .bucket_list_prefix }} +{{ end }} + {{ if .credential_profile_name }} credential_profile_name: {{ .credential_profile_name }} {{ end }} diff --git a/x-pack/filebeat/module/aws/elb/manifest.yml b/x-pack/filebeat/module/aws/elb/manifest.yml index 128dc59791e..da22ed1b1cc 100644 --- a/x-pack/filebeat/module/aws/elb/manifest.yml +++ b/x-pack/filebeat/module/aws/elb/manifest.yml @@ -7,6 +7,7 @@ var: - name: bucket_arn - name: number_of_workers - name: bucket_list_interval + - name: bucket_list_prefix - name: shared_credential_file - name: credential_profile_name - name: visibility_timeout diff --git a/x-pack/filebeat/module/aws/s3access/config/aws-s3.yml b/x-pack/filebeat/module/aws/s3access/config/aws-s3.yml index c98582c21ea..5b1fb24f561 100644 --- a/x-pack/filebeat/module/aws/s3access/config/aws-s3.yml +++ b/x-pack/filebeat/module/aws/s3access/config/aws-s3.yml @@ -14,6 +14,10 @@ number_of_workers: {{ .number_of_workers }} bucket_list_interval: {{ .bucket_list_interval }} {{ end }} +{{ if .bucket_list_prefix }} +bucket_list_prefix: {{ .bucket_list_prefix }} +{{ end }} + {{ if .credential_profile_name }} credential_profile_name: {{ .credential_profile_name }} {{ end }} diff --git a/x-pack/filebeat/module/aws/s3access/manifest.yml b/x-pack/filebeat/module/aws/s3access/manifest.yml index 7634f73d8d2..0223142a6a9 100644 --- a/x-pack/filebeat/module/aws/s3access/manifest.yml +++ b/x-pack/filebeat/module/aws/s3access/manifest.yml @@ -7,6 +7,7 @@ var: - name: bucket_arn - name: number_of_workers - name: bucket_list_interval + - name: bucket_list_prefix - name: shared_credential_file - name: credential_profile_name - name: visibility_timeout diff --git a/x-pack/filebeat/module/aws/vpcflow/config/input.yml b/x-pack/filebeat/module/aws/vpcflow/config/input.yml index f5987c033d5..51b649b1e2e 100644 --- a/x-pack/filebeat/module/aws/vpcflow/config/input.yml +++ b/x-pack/filebeat/module/aws/vpcflow/config/input.yml @@ -16,6 +16,10 @@ number_of_workers: {{ .number_of_workers }} bucket_list_interval: {{ .bucket_list_interval }} {{ end }} +{{ if .bucket_list_prefix }} +bucket_list_prefix: {{ .bucket_list_prefix }} +{{ end }} + {{ if .credential_profile_name }} credential_profile_name: {{ .credential_profile_name }} {{ end }} diff --git a/x-pack/filebeat/module/aws/vpcflow/manifest.yml b/x-pack/filebeat/module/aws/vpcflow/manifest.yml index 87850096ed5..8871cf1cffb 100644 --- a/x-pack/filebeat/module/aws/vpcflow/manifest.yml +++ b/x-pack/filebeat/module/aws/vpcflow/manifest.yml @@ -7,6 +7,7 @@ var: - name: bucket_arn - name: number_of_workers - name: bucket_list_interval + - name: bucket_list_prefix - name: shared_credential_file - name: credential_profile_name - name: visibility_timeout diff --git a/x-pack/filebeat/modules.d/aws.yml.disabled b/x-pack/filebeat/modules.d/aws.yml.disabled index 57e370541c8..3d34116d225 100644 --- a/x-pack/filebeat/modules.d/aws.yml.disabled +++ b/x-pack/filebeat/modules.d/aws.yml.disabled @@ -11,6 +11,9 @@ # AWS S3 bucket arn #var.bucket_arn: 'arn:aws:s3:::mybucket' + # AWS S3 list prefix + #var.bucket_list_prefix: 'prefix' + # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s @@ -72,6 +75,9 @@ # AWS S3 bucket arn #var.bucket_arn: 'arn:aws:s3:::mybucket' + # AWS S3 list prefix + #var.bucket_list_prefix: 'prefix' + # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s @@ -121,6 +127,9 @@ # AWS S3 bucket arn #var.bucket_arn: 'arn:aws:s3:::mybucket' + # AWS S3 list prefix + #var.bucket_list_prefix: 'prefix' + # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s @@ -170,6 +179,9 @@ # AWS S3 bucket arn #var.bucket_arn: 'arn:aws:s3:::mybucket' + # AWS S3 list prefix + #var.bucket_list_prefix: 'prefix' + # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s @@ -219,6 +231,9 @@ # AWS S3 bucket arn #var.bucket_arn: 'arn:aws:s3:::mybucket' + # AWS S3 list prefix + #var.bucket_list_prefix: 'prefix' + # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s @@ -268,6 +283,9 @@ # AWS S3 bucket arn #var.bucket_arn: 'arn:aws:s3:::mybucket' + # AWS S3 list prefix + #var.bucket_list_prefix: 'prefix' + # Bucket list interval on S3 bucket #var.bucket_list_interval: 300s