diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index d1b7b077e7b..4b4eed1a05f 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -48,6 +48,7 @@ https://github.com/elastic/beats/compare/v6.4.0...6.x[Check the HEAD diff] *Filebeat* - Fix date format in Mongodb Ingest pipeline. {pull}7974[7974] +- Fixed a docker input error due to the offset update bug in partial log join.{pull}8177[8177] *Heartbeat* diff --git a/filebeat/reader/docker_json/docker_json.go b/filebeat/reader/docker_json/docker_json.go index df87e9a161f..f3913da1eb4 100644 --- a/filebeat/reader/docker_json/docker_json.go +++ b/filebeat/reader/docker_json/docker_json.go @@ -136,6 +136,7 @@ func (p *Reader) Next() (reader.Message, error) { return message, err } message.Content = append(message.Content, next.Content...) + message.Bytes += next.Bytes } } else { message, err = parseCRILog(message, &crioLine) diff --git a/filebeat/reader/docker_json/docker_json_test.go b/filebeat/reader/docker_json/docker_json_test.go index 2b838f0e079..4f2cbcb8f7a 100644 --- a/filebeat/reader/docker_json/docker_json_test.go +++ b/filebeat/reader/docker_json/docker_json_test.go @@ -43,6 +43,7 @@ func TestDockerJSON(t *testing.T) { Content: []byte("1:M 09 Nov 13:27:36.276 # User requested shutdown...\n"), Fields: common.MapStr{"stream": "stdout"}, Ts: time.Date(2017, 11, 9, 13, 27, 36, 277747246, time.UTC), + Bytes: 122, }, }, // Wrong JSON @@ -77,6 +78,7 @@ func TestDockerJSON(t *testing.T) { Content: []byte("2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache"), Fields: common.MapStr{"stream": "stdout"}, Ts: time.Date(2017, 9, 12, 22, 32, 21, 212861448, time.UTC), + Bytes: 115, }, }, // Filtering stream @@ -91,6 +93,7 @@ func TestDockerJSON(t *testing.T) { Content: []byte("unfiltered\n"), Fields: common.MapStr{"stream": "stderr"}, Ts: time.Date(2017, 11, 9, 13, 27, 36, 277747246, time.UTC), + Bytes: 80, }, }, // Filtering stream @@ -105,6 +108,7 @@ func TestDockerJSON(t *testing.T) { Content: []byte("2017-11-12 23:32:21.212 [ERROR][77] table.go 111: error"), Fields: common.MapStr{"stream": "stderr"}, Ts: time.Date(2017, 11, 12, 23, 32, 21, 212771448, time.UTC), + Bytes: 93, }, }, // Split lines @@ -119,6 +123,7 @@ func TestDockerJSON(t *testing.T) { Content: []byte("1:M 09 Nov 13:27:36.276 # User requested shutdown...\n"), Fields: common.MapStr{"stream": "stdout"}, Ts: time.Date(2017, 11, 9, 13, 27, 36, 277747246, time.UTC), + Bytes: 190, }, }, // Split lines with partial disabled @@ -133,6 +138,7 @@ func TestDockerJSON(t *testing.T) { Content: []byte("1:M 09 Nov 13:27:36.276 # User requested "), Fields: common.MapStr{"stream": "stdout"}, Ts: time.Date(2017, 11, 9, 13, 27, 36, 277747246, time.UTC), + Bytes: 109, }, }, } @@ -159,5 +165,6 @@ func (m *mockReader) Next() (reader.Message, error) { m.messages = m.messages[1:] return reader.Message{ Content: message, + Bytes: len(message), }, nil }