Skip to content

Commit

Permalink
Cherry-pick elastic#19972 to 7.9: [Filebeat] remove delimiter \n from…
Browse files Browse the repository at this point in the history
… log line in s3 input (elastic#20064)

* [Filebeat] remove delimiter \n from log line in s3 input (elastic#19972)

* remove delimiter \n from log line in s3 input

(cherry picked from commit 08a9f95)

* backport elastic#20041
  • Loading branch information
kaiyan-sheng committed Jul 21, 2020
1 parent f6a12b8 commit bb8216d
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fixed `cloudfoundry.access` to have the correct `cloudfoundry.app.id` contents. {pull}17847[17847]
- Fixing `ingress_controller.` fields to be of type keyword instead of text. {issue}17834[17834]
- Fixed typo in log message. {pull}17897[17897]
- Fix S3 input to trim delimiter /n from each log line. {pull}19972[19972]

*Heartbeat*

Expand Down
27 changes: 20 additions & 7 deletions x-pack/filebeat/input/s3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,11 +477,7 @@ func (p *s3Input) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, s3C
// handle s3 objects that are not json content-type
offset := 0
for {
log, err := reader.ReadString('\n')
if log == "" {
break
}

log, err := readStringAndTrimDelimiter(reader)
if err == io.EOF {
// create event for last line
offset += len([]byte(log))
Expand All @@ -494,11 +490,15 @@ func (p *s3Input) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, s3C
}
return nil
} else if err != nil {
err = errors.Wrap(err, "ReadString failed")
err = errors.Wrap(err, "readStringAndTrimDelimiter failed")
p.logger.Error(err)
return err
}

if log == "" {
break
}

// create event per log line
offset += len([]byte(log))
event := createEvent(log, offset, info, objectHash, s3Ctx)
Expand Down Expand Up @@ -566,7 +566,8 @@ func (p *s3Input) decodeJSONWithKey(decoder *json.Decoder, objectHash string, s3

func (p *s3Input) convertJSONToEvent(jsonFields interface{}, offset int, objectHash string, s3Info s3Info, s3Ctx *s3Context) error {
vJSON, err := json.Marshal(jsonFields)
log := string(vJSON)
logOriginal := string(vJSON)
log := trimLogDelimiter(logOriginal)
offset += len([]byte(log))
event := createEvent(log, offset, s3Info, objectHash, s3Ctx)

Expand Down Expand Up @@ -609,6 +610,18 @@ func (p *s3Input) deleteMessage(queueURL string, messagesReceiptHandle string, s
return nil
}

func trimLogDelimiter(log string) string {
return strings.TrimSuffix(log, "\n")
}

func readStringAndTrimDelimiter(reader *bufio.Reader) (string, error) {
logOriginal, err := reader.ReadString('\n')
if err != nil {
return logOriginal, err
}
return trimLogDelimiter(logOriginal), nil
}

func createEvent(log string, offset int, info s3Info, objectHash string, s3Ctx *s3Context) beat.Event {
s3Ctx.Inc()

Expand Down
49 changes: 40 additions & 9 deletions x-pack/filebeat/input/s3/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@ type MockS3Client struct {
}

var (
s3LogString1 = "36c1f test-s3-ks [20/Jun/2019] 1.2.3.4 arn:aws:iam::1234:user/test@elastic.co 5141F REST.HEAD.OBJECT Screen1.png \n"
s3LogString2 = "28kdg test-s3-ks [20/Jun/2019] 1.2.3.4 arn:aws:iam::1234:user/test@elastic.co 5A070 REST.HEAD.OBJECT Screen2.png \n"
mockSvc = &MockS3Client{}
info = s3Info{
s3LogString1 = "36c1f test-s3-ks [20/Jun/2019] 1.2.3.4 arn:aws:iam::1234:user/test@elastic.co 5141F REST.HEAD.OBJECT Screen1.png\n"
s3LogString1Trimmed = "36c1f test-s3-ks [20/Jun/2019] 1.2.3.4 arn:aws:iam::1234:user/test@elastic.co 5141F REST.HEAD.OBJECT Screen1.png"
s3LogString2 = "28kdg test-s3-ks [20/Jun/2019] 1.2.3.4 arn:aws:iam::1234:user/test@elastic.co 5A070 REST.HEAD.OBJECT Screen2.png\n"
s3LogString2Trimmed = "28kdg test-s3-ks [20/Jun/2019] 1.2.3.4 arn:aws:iam::1234:user/test@elastic.co 5A070 REST.HEAD.OBJECT Screen2.png"
mockSvc = &MockS3Client{}
info = s3Info{
name: "test-s3-ks",
key: "log2019-06-21-16-16-54",
region: "us-west-1",
Expand Down Expand Up @@ -182,15 +184,15 @@ func TestNewS3BucketReader(t *testing.T) {
for i := 0; i < 3; i++ {
switch i {
case 0:
log, err := reader.ReadString('\n')
log, err := readStringAndTrimDelimiter(reader)
assert.NoError(t, err)
assert.Equal(t, s3LogString1, log)
assert.Equal(t, s3LogString1Trimmed, log)
case 1:
log, err := reader.ReadString('\n')
log, err := readStringAndTrimDelimiter(reader)
assert.NoError(t, err)
assert.Equal(t, s3LogString2, log)
assert.Equal(t, s3LogString2Trimmed, log)
case 2:
log, err := reader.ReadString('\n')
log, err := readStringAndTrimDelimiter(reader)
assert.Error(t, io.EOF, err)
assert.Equal(t, "", log)
}
Expand Down Expand Up @@ -373,3 +375,32 @@ May 28 03:03:29 Shaunaks-MacBook-Pro-Work VTDecoderXPCService[57953]: DEPRECATED
})
}
}

func TestTrimLogDelimiter(t *testing.T) {
cases := []struct {
title string
logOriginal string
expectedLog string
}{
{"string with delimiter",
`test
`,
"test",
},
{"string without delimiter",
"test",
"test",
},
{"string just with delimiter",
`
`,
"",
},
}
for _, c := range cases {
t.Run(c.title, func(t *testing.T) {
log := trimLogDelimiter(c.logOriginal)
assert.Equal(t, c.expectedLog, log)
})
}
}

0 comments on commit bb8216d

Please sign in to comment.