Skip to content

Commit

Permalink
Properly update offset in case of unparasable line (#22685)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrsMark authored Apr 1, 2021
1 parent ade9a88 commit 655984e
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix handing missing eventtime and assignip field being set to N/A for fortinet module. {pull}22361[22361]
- Fix Zeek dashboard reference to `zeek.ssl.server.name` field. {pull}21696[21696]
- Fix for `field [source] not present as part of path [source.ip]` error in azure pipelines. {pull}22377[22377]
- Properly update offset in case of unparasable line. {pull}22685[22685]
- Drop aws.vpcflow.pkt_srcaddr and aws.vpcflow.pkt_dstaddr when equal to "-". {pull}22721[22721] {issue}22716[22716]
- Fix cisco umbrella module config by adding input variable. {pull}22892[22892]
- Fix network.direction logic in zeek connection fileset. {pull}22967[22967]
Expand Down
3 changes: 0 additions & 3 deletions filebeat/input/filestream/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,9 +298,6 @@ func (inp *filestream) readFromSource(
s.Offset = 0
case ErrClosed:
log.Info("Reader was closed. Closing.")
case reader.ErrLineUnparsable:
log.Info("Skipping unparsable line in file.")
continue
default:
log.Errorf("Read line error: %v", err)
}
Expand Down
4 changes: 0 additions & 4 deletions filebeat/input/log/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,10 +331,6 @@ func (h *Harvester) Run() error {
logp.Info("End of file reached: %s. Closing because close_eof is enabled.", h.state.Source)
case ErrInactive:
logp.Info("File is inactive: %s. Closing because close_inactive of %v reached.", h.state.Source, h.config.CloseInactive)
case reader.ErrLineUnparsable:
logp.Info("Skipping unparsable line in file: %v", h.state.Source)
//line unparsable, go to next line
continue
default:
logp.Err("Read line error: %v; File: %v", err, h.state.Source)
}
Expand Down
21 changes: 21 additions & 0 deletions filebeat/tests/files/logs/docker_corrupted.log
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{"log":"Fetching main repository github.com/elastic/beats...\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"}
{"log":"Fetching dependencies...\n","stream":"stdout","time":"2016-03-02T22:59:04.609292428Z"}
{"log":"Execute /scripts/packetbeat_before_build.sh\n","stream":"stdout","time":"2016-03-02T22:59:04.617434682Z"}
{"log":"patching file vendor/github.com/tsg/gopacket/pcap/pcap.go\n","stream":"stdout","time":"2016-03-02T22:59:04.626534779Z"}
{"log":"cp etc/packetbeat.template.json /build/packetbeat.template.json\n","stream":"stdout","time":"2016-03-02T22:59:04.639782988Z"}
{"log":"# linux\n","stream":"stdout","time":"2016-03-02T22:59:04.646276053Z"}
"log":"cp packetbeat.yml /build/packetbeat-linux.yml\n","stream":"stdout","time":"2016-03-02T22:59:04.647847045Z"}
{"log":"# binary\n","stream":"stdout","time":"2016-03-02T22:59:04.653740138Z"}
{"log":"cp packetbeat.yml /build/packetbeat-binary.yml\n","stream":"stdout","time":"2016-03-02T22:59:04.655979016Z"}
{"log":"# darwin\n","stream":"stdout","time":"2016-03-02T22:59:04.661181197Z"}
{"log":"cp packetbeat.yml /build/packetbeat-darwin.yml\n","stream":"stdout","time":"2016-03-02T22:59:04.662859769Z"}
{"log":"sed -i.bk 's/device: any/device: en0/' /build/packetbeat-darwin.yml\n","stream":"stdout","time":"2016-03-02T22:59:04.66649744Z"}
{"log":"rm /build/packetbeat-darwin.yml.bk\n","stream":"stdout","time":"2016-03-02T22:59:04.701199002Z"}
{"log":"# win\n","stream":"stdout","time":"2016-03-02T22:59:04.705067809Z"}
{"log":"cp packetbeat.yml /build/packetbeat-win.yml\n","stream":"stdout","time":"2016-03-02T22:59:04.706629907Z"}
{"log":"sed -i.bk 's/device: any/device: 0/' /build/packetbeat-win.yml\n","stream":"stdout","time":"2016-03-02T22:59:04.711993313Z"}
{"log":"rm /build/packetbeat-win.yml.bk\n","stream":"stdout","time":"2016-03-02T22:59:04.757913979Z"}
{"log":"Compiling for windows/amd64...\n","stream":"stdout","time":"2016-03-02T22:59:04.761895467Z"}
{"log":"Compiling for windows/386...\n","stream":"stdout","time":"2016-03-02T22:59:29.481736885Z"}
{"log":"Compiling for darwin/amd64...\n","stream":"stdout","time":"2016-03-02T22:59:55.205334574Z"}
{"log":"Moving binaries to host...\n","stream":"stdout","time":"2016-03-02T23:00:15.140397826Z"}
39 changes: 39 additions & 0 deletions filebeat/tests/system/test_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,42 @@ def test_container_input_cri(self):
output = self.read_output()
assert len(output) == 1
assert output[0]["stream"] == "stdout"

def test_container_input_registry_for_unparsable_lines(self):
"""
Test container input properly updates registry offset in case
of unparsable lines
"""
input_raw = """
- type: container
paths:
- {}/logs/*.log
"""
self.render_config_template(
input_raw=input_raw.format(os.path.abspath(self.working_dir)),
inputs=False,
)

os.mkdir(self.working_dir + "/logs/")
self.copy_files(["logs/docker_corrupted.log"],
target_dir="logs")

filebeat = self.start_beat()

self.wait_until(lambda: self.output_has(lines=20))

filebeat.check_kill_and_wait()

output = self.read_output()
assert len(output) == 20
assert output[19]["message"] == "Moving binaries to host..."
for o in output:
assert o["stream"] == "stdout"

# Check that file exist
data = self.get_registry()
logs = self.log_access()
assert logs.contains("Parse line error") == True
# bytes of healthy file are 2244 so for the corrupted one should
# be 2244-1=2243 since we removed one character
assert data[0]["offset"] == 2243
6 changes: 0 additions & 6 deletions libbeat/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package reader

import (
"errors"
"io"
)

Expand All @@ -30,8 +29,3 @@ type Reader interface {
io.Closer
Next() (Message, error)
}

var (
//ErrLineUnparsable is error thrown when Next() element from input is corrupted and can not be parsed
ErrLineUnparsable = errors.New("line is unparsable")
)
4 changes: 2 additions & 2 deletions libbeat/reader/readjson/docker_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (p *DockerJSONReader) Next() (reader.Message, error) {
err = p.parseLine(&message, &logLine)
if err != nil {
p.logger.Errorf("Parse line error: %v", err)
return message, reader.ErrLineUnparsable
continue
}

// Handle multiline messages, join partial lines
Expand All @@ -219,7 +219,7 @@ func (p *DockerJSONReader) Next() (reader.Message, error) {
err = p.parseLine(&next, &logLine)
if err != nil {
p.logger.Errorf("Parse line error: %v", err)
return message, reader.ErrLineUnparsable
continue
}
message.Content = append(message.Content, next.Content...)
}
Expand Down
37 changes: 29 additions & 8 deletions libbeat/reader/readjson/docker_json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package readjson

import (
"io"
"testing"
"time"

Expand Down Expand Up @@ -53,7 +54,7 @@ func TestDockerJSON(t *testing.T) {
name: "Wrong JSON",
input: [][]byte{[]byte(`this is not JSON`)},
stream: "all",
expectedError: reader.ErrLineUnparsable,
expectedError: io.EOF,
expectedMessage: reader.Message{
Bytes: 16,
},
Expand All @@ -73,7 +74,7 @@ func TestDockerJSON(t *testing.T) {
name: "Wrong CRI",
input: [][]byte{[]byte(`2017-09-12T22:32:21.212861448Z stdout`)},
stream: "all",
expectedError: reader.ErrLineUnparsable,
expectedError: io.EOF,
expectedMessage: reader.Message{
Bytes: 37,
},
Expand All @@ -82,7 +83,7 @@ func TestDockerJSON(t *testing.T) {
name: "Wrong CRI",
input: [][]byte{[]byte(`{this is not JSON nor CRI`)},
stream: "all",
expectedError: reader.ErrLineUnparsable,
expectedError: io.EOF,
expectedMessage: reader.Message{
Bytes: 25,
},
Expand All @@ -91,7 +92,7 @@ func TestDockerJSON(t *testing.T) {
name: "Missing time",
input: [][]byte{[]byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout"}`)},
stream: "all",
expectedError: reader.ErrLineUnparsable,
expectedError: io.EOF,
expectedMessage: reader.Message{
Bytes: 82,
},
Expand Down Expand Up @@ -218,7 +219,7 @@ func TestDockerJSON(t *testing.T) {
input: [][]byte{[]byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout"}`)},
stream: "all",
format: "cri",
expectedError: reader.ErrLineUnparsable,
expectedError: io.EOF,
expectedMessage: reader.Message{
Bytes: 82,
},
Expand All @@ -228,7 +229,7 @@ func TestDockerJSON(t *testing.T) {
input: [][]byte{[]byte(`2017-09-12T22:32:21.212861448Z stdout 2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache`)},
stream: "all",
format: "docker",
expectedError: reader.ErrLineUnparsable,
expectedError: io.EOF,
expectedMessage: reader.Message{
Bytes: 115,
},
Expand Down Expand Up @@ -300,7 +301,7 @@ func TestDockerJSON(t *testing.T) {
[]byte(`{"log":"shutdown...\n","stream`),
},
stream: "stdout",
expectedError: reader.ErrLineUnparsable,
expectedError: io.EOF,
expectedMessage: reader.Message{
Bytes: 139,
},
Expand All @@ -324,11 +325,25 @@ func TestDockerJSON(t *testing.T) {
name: "Corrupted log message line",
input: [][]byte{[]byte(`36.276 # User requested shutdown...\n","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`)},
stream: "all",
expectedError: reader.ErrLineUnparsable,
expectedError: io.EOF,
expectedMessage: reader.Message{
Bytes: 97,
},
},
{
name: "Corrupted log message line is skipped, keep correct bytes count",
input: [][]byte{
[]byte(`36.276 # User requested shutdown...\n","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`),
[]byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`),
},
stream: "all",
expectedMessage: reader.Message{
Content: []byte("1:M 09 Nov 13:27:36.276 # User requested"),
Fields: common.MapStr{"stream": "stdout"},
Ts: time.Date(2017, 11, 9, 13, 27, 36, 277747246, time.UTC),
Bytes: 205,
},
},
}

for _, test := range tests {
Expand Down Expand Up @@ -358,6 +373,12 @@ type mockReader struct {
}

func (m *mockReader) Next() (reader.Message, error) {
if len(m.messages) < 1 {
return reader.Message{
Content: []byte{},
Bytes: 0,
}, io.EOF
}
message := m.messages[0]
m.messages = m.messages[1:]
return reader.Message{
Expand Down

0 comments on commit 655984e

Please sign in to comment.