diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 47484cd1fef..24e31d17720 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -118,6 +118,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fixed `cloudfoundry.access` to have the correct `cloudfoundry.app.id` contents. {pull}17847[17847] - Fixing `ingress_controller.` fields to be of type keyword instead of text. {issue}17834[17834] - Fixed typo in log message. {pull}17897[17897] +- Fix S3 input to trim delimiter /n from each log line. {pull}19972[19972] *Heartbeat* diff --git a/x-pack/filebeat/input/s3/input.go b/x-pack/filebeat/input/s3/input.go index b5201686597..e74800ae127 100644 --- a/x-pack/filebeat/input/s3/input.go +++ b/x-pack/filebeat/input/s3/input.go @@ -477,11 +477,7 @@ func (p *s3Input) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, s3C // handle s3 objects that are not json content-type offset := 0 for { - log, err := reader.ReadString('\n') - if log == "" { - break - } - + log, err := readStringAndTrimDelimiter(reader) if err == io.EOF { // create event for last line offset += len([]byte(log)) @@ -494,11 +490,15 @@ func (p *s3Input) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, s3C } return nil } else if err != nil { - err = errors.Wrap(err, "ReadString failed") + err = errors.Wrap(err, "readStringAndTrimDelimiter failed") p.logger.Error(err) return err } + if log == "" { + break + } + // create event per log line offset += len([]byte(log)) event := createEvent(log, offset, info, objectHash, s3Ctx) @@ -566,7 +566,8 @@ func (p *s3Input) decodeJSONWithKey(decoder *json.Decoder, objectHash string, s3 func (p *s3Input) convertJSONToEvent(jsonFields interface{}, offset int, objectHash string, s3Info s3Info, s3Ctx *s3Context) error { vJSON, err := json.Marshal(jsonFields) - log := string(vJSON) + logOriginal := string(vJSON) + log := trimLogDelimiter(logOriginal) offset += len([]byte(log)) event := createEvent(log, offset, s3Info, objectHash, s3Ctx) @@ -609,6 +610,18 @@ func (p *s3Input) deleteMessage(queueURL string, messagesReceiptHandle string, s return nil } +func trimLogDelimiter(log string) string { + return strings.TrimSuffix(log, "\n") +} + +func readStringAndTrimDelimiter(reader *bufio.Reader) (string, error) { + logOriginal, err := reader.ReadString('\n') + if err != nil { + return logOriginal, err + } + return trimLogDelimiter(logOriginal), nil +} + func createEvent(log string, offset int, info s3Info, objectHash string, s3Ctx *s3Context) beat.Event { s3Ctx.Inc() diff --git a/x-pack/filebeat/input/s3/input_test.go b/x-pack/filebeat/input/s3/input_test.go index 5eddbaad956..c9ff86780de 100644 --- a/x-pack/filebeat/input/s3/input_test.go +++ b/x-pack/filebeat/input/s3/input_test.go @@ -32,10 +32,12 @@ type MockS3Client struct { } var ( - s3LogString1 = "36c1f test-s3-ks [20/Jun/2019] 1.2.3.4 arn:aws:iam::1234:user/test@elastic.co 5141F REST.HEAD.OBJECT Screen1.png \n" - s3LogString2 = "28kdg test-s3-ks [20/Jun/2019] 1.2.3.4 arn:aws:iam::1234:user/test@elastic.co 5A070 REST.HEAD.OBJECT Screen2.png \n" - mockSvc = &MockS3Client{} - info = s3Info{ + s3LogString1 = "36c1f test-s3-ks [20/Jun/2019] 1.2.3.4 arn:aws:iam::1234:user/test@elastic.co 5141F REST.HEAD.OBJECT Screen1.png\n" + s3LogString1Trimmed = "36c1f test-s3-ks [20/Jun/2019] 1.2.3.4 arn:aws:iam::1234:user/test@elastic.co 5141F REST.HEAD.OBJECT Screen1.png" + s3LogString2 = "28kdg test-s3-ks [20/Jun/2019] 1.2.3.4 arn:aws:iam::1234:user/test@elastic.co 5A070 REST.HEAD.OBJECT Screen2.png\n" + s3LogString2Trimmed = "28kdg test-s3-ks [20/Jun/2019] 1.2.3.4 arn:aws:iam::1234:user/test@elastic.co 5A070 REST.HEAD.OBJECT Screen2.png" + mockSvc = &MockS3Client{} + info = s3Info{ name: "test-s3-ks", key: "log2019-06-21-16-16-54", region: "us-west-1", @@ -182,15 +184,15 @@ func TestNewS3BucketReader(t *testing.T) { for i := 0; i < 3; i++ { switch i { case 0: - log, err := reader.ReadString('\n') + log, err := readStringAndTrimDelimiter(reader) assert.NoError(t, err) - assert.Equal(t, s3LogString1, log) + assert.Equal(t, s3LogString1Trimmed, log) case 1: - log, err := reader.ReadString('\n') + log, err := readStringAndTrimDelimiter(reader) assert.NoError(t, err) - assert.Equal(t, s3LogString2, log) + assert.Equal(t, s3LogString2Trimmed, log) case 2: - log, err := reader.ReadString('\n') + log, err := readStringAndTrimDelimiter(reader) assert.Error(t, io.EOF, err) assert.Equal(t, "", log) } @@ -373,3 +375,32 @@ May 28 03:03:29 Shaunaks-MacBook-Pro-Work VTDecoderXPCService[57953]: DEPRECATED }) } } + +func TestTrimLogDelimiter(t *testing.T) { + cases := []struct { + title string + logOriginal string + expectedLog string + }{ + {"string with delimiter", + `test +`, + "test", + }, + {"string without delimiter", + "test", + "test", + }, + {"string just with delimiter", + ` +`, + "", + }, + } + for _, c := range cases { + t.Run(c.title, func(t *testing.T) { + log := trimLogDelimiter(c.logOriginal) + assert.Equal(t, c.expectedLog, log) + }) + } +}