diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 990d09ff645..21da2ec3d06 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -57,6 +57,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff] - Fixed a memory leak when harvesters are closed. {pull}7820[7820] - Fix date format in Mongodb Ingest pipeline. {pull}7974[7974] - Mark the TCP and UDP input as GA. {pull}8125[8125] +- Fixed a docker input error due to the offset update bug in partial log join.{pull}8177[8177] *Heartbeat* diff --git a/filebeat/reader/readjson/docker_json.go b/filebeat/reader/readjson/docker_json.go index 1ec2514b787..f9d4e80626d 100644 --- a/filebeat/reader/readjson/docker_json.go +++ b/filebeat/reader/readjson/docker_json.go @@ -136,6 +136,7 @@ func (p *DockerJSONReader) Next() (reader.Message, error) { return message, err } message.Content = append(message.Content, next.Content...) + message.Bytes += next.Bytes } } else { message, err = parseCRILog(message, &crioLine) diff --git a/filebeat/reader/readjson/docker_json_test.go b/filebeat/reader/readjson/docker_json_test.go index 7bb997581d2..642113a1e90 100644 --- a/filebeat/reader/readjson/docker_json_test.go +++ b/filebeat/reader/readjson/docker_json_test.go @@ -43,6 +43,7 @@ func TestDockerJSON(t *testing.T) { Content: []byte("1:M 09 Nov 13:27:36.276 # User requested shutdown...\n"), Fields: common.MapStr{"stream": "stdout"}, Ts: time.Date(2017, 11, 9, 13, 27, 36, 277747246, time.UTC), + Bytes: 122, }, }, // Wrong JSON @@ -77,6 +78,7 @@ func TestDockerJSON(t *testing.T) { Content: []byte("2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache"), Fields: common.MapStr{"stream": "stdout"}, Ts: time.Date(2017, 9, 12, 22, 32, 21, 212861448, time.UTC), + Bytes: 115, }, }, // Filtering stream @@ -91,6 +93,7 @@ func TestDockerJSON(t *testing.T) { Content: []byte("unfiltered\n"), Fields: common.MapStr{"stream": "stderr"}, Ts: time.Date(2017, 11, 9, 13, 27, 36, 277747246, time.UTC), + Bytes: 80, }, }, // Filtering stream @@ -105,6 +108,7 @@ func TestDockerJSON(t *testing.T) { Content: []byte("2017-11-12 23:32:21.212 [ERROR][77] table.go 111: error"), Fields: common.MapStr{"stream": "stderr"}, Ts: time.Date(2017, 11, 12, 23, 32, 21, 212771448, time.UTC), + Bytes: 93, }, }, // Split lines @@ -119,6 +123,7 @@ func TestDockerJSON(t *testing.T) { Content: []byte("1:M 09 Nov 13:27:36.276 # User requested shutdown...\n"), Fields: common.MapStr{"stream": "stdout"}, Ts: time.Date(2017, 11, 9, 13, 27, 36, 277747246, time.UTC), + Bytes: 190, }, }, // Split lines with partial disabled @@ -133,6 +138,7 @@ func TestDockerJSON(t *testing.T) { Content: []byte("1:M 09 Nov 13:27:36.276 # User requested "), Fields: common.MapStr{"stream": "stdout"}, Ts: time.Date(2017, 11, 9, 13, 27, 36, 277747246, time.UTC), + Bytes: 109, }, }, } @@ -159,5 +165,6 @@ func (m *mockReader) Next() (reader.Message, error) { m.messages = m.messages[1:] return reader.Message{ Content: message, + Bytes: len(message), }, nil }