From 3f7d6a6147ef37e0cb5f8b2fdc4ef96367b2dc11 Mon Sep 17 00:00:00 2001 From: Jarek Miszkinis Date: Tue, 23 Oct 2018 23:23:45 +0900 Subject: [PATCH] Reduce casting between []byte and string in CRI log processing (#8424) * Reduce casting between []byte and string in CRI log processing * Refactor CRI and json-file detection * Improve handling of new lines in linux/windows CRI logs. * Move new line removal to OS specific functions. * Add option to force CRI log parsing, otherwise autodetect log type. * Fixed unit tests and new line removal * Fixed new line removal * Added unit tests to check Forced CRI flag --- filebeat/input/log/config.go | 1 + filebeat/input/log/harvester.go | 2 +- libbeat/reader/readjson/docker_json.go | 32 ++++---- libbeat/reader/readjson/docker_json_test.go | 76 ++++++++++++++++++- libbeat/reader/readjson/docker_json_unix.go | 30 ++++++++ .../reader/readjson/docker_json_windows.go | 30 ++++++++ 6 files changed, 153 insertions(+), 18 deletions(-) create mode 100644 libbeat/reader/readjson/docker_json_unix.go create mode 100644 libbeat/reader/readjson/docker_json_windows.go diff --git a/filebeat/input/log/config.go b/filebeat/input/log/config.go index a6a117db374..12ebaa5ebd4 100644 --- a/filebeat/input/log/config.go +++ b/filebeat/input/log/config.go @@ -106,6 +106,7 @@ type config struct { DockerJSON *struct { Stream string `config:"stream"` Partial bool `config:"partial"` + ForceCRI bool `config:"force_cri_logs"` CRIFlags bool `config:"cri_flags"` } `config:"docker-json"` } diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index ad67637c0cf..cac56e78bb6 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -561,7 +561,7 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) { if h.config.DockerJSON != nil { // Docker json-file format, add custom parsing to the pipeline - r = readjson.New(r, h.config.DockerJSON.Stream, h.config.DockerJSON.Partial, h.config.DockerJSON.CRIFlags) + r = readjson.New(r, h.config.DockerJSON.Stream, h.config.DockerJSON.Partial, h.config.DockerJSON.ForceCRI, h.config.DockerJSON.CRIFlags) } if h.config.JSON != nil { diff --git a/libbeat/reader/readjson/docker_json.go b/libbeat/reader/readjson/docker_json.go index a67bef53406..a5331bc9c67 100644 --- a/libbeat/reader/readjson/docker_json.go +++ b/libbeat/reader/readjson/docker_json.go @@ -20,7 +20,6 @@ package readjson import ( "bytes" "encoding/json" - "strings" "time" "github.com/elastic/beats/libbeat/common" @@ -38,6 +37,9 @@ type DockerJSONReader struct { // join partial lines partial bool + // Force log format: json-file | cri + forceCRI bool + // parse CRI flags criflags bool } @@ -51,11 +53,12 @@ type logLine struct { } // New creates a new reader renaming a field -func New(r reader.Reader, stream string, partial bool, CRIFlags bool) *DockerJSONReader { +func New(r reader.Reader, stream string, partial bool, forceCRI bool, CRIFlags bool) *DockerJSONReader { return &DockerJSONReader{ stream: stream, partial: partial, reader: r, + forceCRI: forceCRI, criflags: CRIFlags, } } @@ -74,11 +77,11 @@ func (p *DockerJSONReader) parseCRILog(message *reader.Message, msg *logLine) er i := 0 // timestamp - log := strings.SplitN(string(message.Content), " ", split) + log := bytes.SplitN(message.Content, []byte{' '}, split) if len(log) < split { return errors.New("invalid CRI log format") } - ts, err := time.Parse(time.RFC3339, log[i]) + ts, err := time.Parse(time.RFC3339, string(log[i])) if err != nil { return errors.Wrap(err, "parsing CRI timestamp") } @@ -86,16 +89,16 @@ func (p *DockerJSONReader) parseCRILog(message *reader.Message, msg *logLine) er i++ // stream - msg.Stream = log[i] + msg.Stream = string(log[i]) i++ // tags partial := false if p.criflags { // currently only P(artial) or F(ull) are available - tags := strings.Split(log[i], ":") + tags := bytes.Split(log[i], []byte{':'}) for _, tag := range tags { - if tag == "P" { + if len(tag) == 1 && tag[0] == 'P' { partial = true } } @@ -106,12 +109,10 @@ func (p *DockerJSONReader) parseCRILog(message *reader.Message, msg *logLine) er message.AddFields(common.MapStr{ "stream": msg.Stream, }) - // Remove ending \n for partial messages - message.Content = []byte(log[i]) + // Remove \n ending for partial messages + message.Content = log[i] if partial { - message.Content = bytes.TrimRightFunc(message.Content, func(r rune) bool { - return r == '\n' || r == '\r' - }) + stripNewLine(message) } return nil @@ -144,7 +145,12 @@ func (p *DockerJSONReader) parseDockerJSONLog(message *reader.Message, msg *logL } func (p *DockerJSONReader) parseLine(message *reader.Message, msg *logLine) error { - if strings.HasPrefix(string(message.Content), "{") { + if p.forceCRI { + return p.parseCRILog(message, msg) + } + + // If froceCRI isn't set, autodetect file type + if len(message.Content) > 0 && message.Content[0] == '{' { return p.parseDockerJSONLog(message, msg) } diff --git a/libbeat/reader/readjson/docker_json_test.go b/libbeat/reader/readjson/docker_json_test.go index b28247d8374..1d475cec8c7 100644 --- a/libbeat/reader/readjson/docker_json_test.go +++ b/libbeat/reader/readjson/docker_json_test.go @@ -33,6 +33,7 @@ func TestDockerJSON(t *testing.T) { input [][]byte stream string partial bool + forceCRI bool criflags bool expectedError bool expectedMessage reader.Message @@ -143,19 +144,19 @@ func TestDockerJSON(t *testing.T) { }, }, { - name: "Split lines", + name: "CRI Split lines", input: [][]byte{ []byte(`2017-10-12T13:32:21.232861448Z stdout P 2017-10-12 13:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache`), []byte(`2017-11-12T23:32:21.212771448Z stdout F error`), }, - stream: "stdout", + stream: "stdout", + partial: true, expectedMessage: reader.Message{ Content: []byte("2017-10-12 13:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache error"), Fields: common.MapStr{"stream": "stdout"}, Ts: time.Date(2017, 10, 12, 13, 32, 21, 232861448, time.UTC), Bytes: 163, }, - partial: true, criflags: true, }, { @@ -189,12 +190,79 @@ func TestDockerJSON(t *testing.T) { Bytes: 109, }, }, + { + name: "Force CRI with JSON logs", + input: [][]byte{[]byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout"}`)}, + stream: "all", + forceCRI: true, + expectedError: true, + }, + { + name: "Force CRI log no tags", + input: [][]byte{[]byte(`2017-09-12T22:32:21.212861448Z stdout 2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache`)}, + stream: "all", + expectedMessage: reader.Message{ + Content: []byte("2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache"), + Fields: common.MapStr{"stream": "stdout"}, + Ts: time.Date(2017, 9, 12, 22, 32, 21, 212861448, time.UTC), + Bytes: 115, + }, + forceCRI: true, + criflags: false, + }, + { + name: "Force CRI log with flags", + input: [][]byte{[]byte(`2017-09-12T22:32:21.212861448Z stdout F 2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache`)}, + stream: "all", + expectedMessage: reader.Message{ + Content: []byte("2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache"), + Fields: common.MapStr{"stream": "stdout"}, + Ts: time.Date(2017, 9, 12, 22, 32, 21, 212861448, time.UTC), + Bytes: 117, + }, + forceCRI: true, + criflags: true, + }, + { + name: "Force CRI split lines", + input: [][]byte{ + []byte(`2017-10-12T13:32:21.232861448Z stdout P 2017-10-12 13:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache`), + []byte(`2017-11-12T23:32:21.212771448Z stdout F error`), + }, + stream: "stdout", + partial: true, + expectedMessage: reader.Message{ + Content: []byte("2017-10-12 13:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache error"), + Fields: common.MapStr{"stream": "stdout"}, + Ts: time.Date(2017, 10, 12, 13, 32, 21, 232861448, time.UTC), + Bytes: 163, + }, + forceCRI: true, + criflags: true, + }, + { + name: "Force CRI split lines and remove \\n", + input: [][]byte{ + []byte("2017-10-12T13:32:21.232861448Z stdout P 2017-10-12 13:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache\n"), + []byte("2017-11-12T23:32:21.212771448Z stdout F error"), + }, + stream: "stdout", + expectedMessage: reader.Message{ + Content: []byte("2017-10-12 13:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache error"), + Fields: common.MapStr{"stream": "stdout"}, + Ts: time.Date(2017, 10, 12, 13, 32, 21, 232861448, time.UTC), + Bytes: 164, + }, + partial: true, + forceCRI: true, + criflags: true, + }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { r := &mockReader{messages: test.input} - json := New(r, test.stream, test.partial, test.criflags) + json := New(r, test.stream, test.partial, test.forceCRI, test.criflags) message, err := json.Next() if test.expectedError { diff --git a/libbeat/reader/readjson/docker_json_unix.go b/libbeat/reader/readjson/docker_json_unix.go new file mode 100644 index 00000000000..40eb75b5825 --- /dev/null +++ b/libbeat/reader/readjson/docker_json_unix.go @@ -0,0 +1,30 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +build linux darwin + +package readjson + +import ( + "github.com/elastic/beats/libbeat/reader" +) + +func stripNewLine(msg *reader.Message) { + l := len(msg.Content) + if l > 0 && msg.Content[l-1] == '\n' { + msg.Content = msg.Content[:l-1] + } +} diff --git a/libbeat/reader/readjson/docker_json_windows.go b/libbeat/reader/readjson/docker_json_windows.go new file mode 100644 index 00000000000..e750b4095ee --- /dev/null +++ b/libbeat/reader/readjson/docker_json_windows.go @@ -0,0 +1,30 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package readjson + +import ( + "bytes" + + "github.com/elastic/beats/libbeat/reader" +) + +func stripNewLine(msg *reader.Message) { + msg.Content = bytes.TrimRightFunc(msg.Content, func(r rune) bool { + return r == '\n' || r == '\r' + }) +}