Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix mysql parser handling of connection phase #8173

Merged
merged 5 commits into from
Sep 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff]
- Fixed a seccomp related error where the `fcntl64` syscall was not permitted
on 32-bit Linux and the sniffer failed to start. {issue}7839[7839]
- Added missing `cmdline` and `client_cmdline` fields to index template. {pull}8258[8258]
- Fixed the mysql missing transactions if monitoring a connection from the start. {pull}8173[8173]

*Winlogbeat*

Expand Down
39 changes: 20 additions & 19 deletions packetbeat/protos/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,10 +208,18 @@ func (stream *mysqlStream) prepareForNewMessage() {
stream.data = stream.data[stream.parseOffset:]
stream.parseState = mysqlStateStart
stream.parseOffset = 0
stream.isClient = false
stream.message = nil
}

func (mysql *mysqlPlugin) isServerPort(port uint16) bool {
for _, sPort := range mysql.ports {
if uint16(sPort) == port {
return true
}
}
return false
}

func mysqlMessageParser(s *mysqlStream) (bool, bool) {
logp.Debug("mysqldetailed", "MySQL parser called. parseState = %s", s.parseState)

Expand All @@ -229,27 +237,20 @@ func mysqlMessageParser(s *mysqlStream) (bool, bool) {
m.seq = hdr[3]
m.typ = hdr[4]

logp.Debug("mysqldetailed", "MySQL Header: Packet length %d, Seq %d, Type=%d", m.packetLength, m.seq, m.typ)
logp.Debug("mysqldetailed", "MySQL Header: Packet length %d, Seq %d, Type=%d isClient=%v", m.packetLength, m.seq, m.typ, s.isClient)

if m.seq == 0 {
if s.isClient {
// starts Command Phase

if m.typ == mysqlCmdQuery {
if m.seq == 0 && m.typ == mysqlCmdQuery {
// parse request
m.isRequest = true
m.start = s.parseOffset
s.parseState = mysqlStateEatMessage

} else {
// ignore command
m.ignoreMessage = true
s.parseState = mysqlStateEatMessage
}

if !s.isClient {
s.isClient = true
}

} else if !s.isClient {
// parse response
m.isRequest = false
Expand All @@ -275,11 +276,6 @@ func mysqlMessageParser(s *mysqlStream) (bool, bool) {
m.ignoreMessage = true
s.parseState = mysqlStateEatMessage
}

} else {
// something else, not expected
logp.Debug("mysql", "Unexpected MySQL message of type %d received.", m.typ)
return false, false
}

case mysqlStateEatMessage:
Expand Down Expand Up @@ -500,9 +496,14 @@ func (mysql *mysqlPlugin) Parse(pkt *protos.Packet, tcptuple *common.TCPTuple,
}

if priv.data[dir] == nil {
dstPort := tcptuple.DstPort
if dir == tcp.TCPDirectionReverse {
dstPort = tcptuple.SrcPort
}
priv.data[dir] = &mysqlStream{
data: pkt.Payload,
message: &mysqlMessage{ts: pkt.Ts},
data: pkt.Payload,
message: &mysqlMessage{ts: pkt.Ts},
isClient: mysql.isServerPort(dstPort),
}
} else {
// concatenate bytes
Expand All @@ -521,7 +522,7 @@ func (mysql *mysqlPlugin) Parse(pkt *protos.Packet, tcptuple *common.TCPTuple,
}

ok, complete := mysqlMessageParser(priv.data[dir])
//logp.Debug("mysqldetailed", "mysqlMessageParser returned ok=%b complete=%b", ok, complete)
logp.Debug("mysqldetailed", "mysqlMessageParser returned ok=%v complete=%v", ok, complete)
if !ok {
// drop this tcp stream. Will retry parsing with the next
// segment in it
Expand Down
27 changes: 15 additions & 12 deletions packetbeat/protos/mysql/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"encoding/hex"
"net"
"testing"
"time"

"github.com/stretchr/testify/assert"

Expand All @@ -31,10 +32,11 @@ import (
"github.com/elastic/beats/libbeat/logp"

"github.com/elastic/beats/packetbeat/protos"

"time"
"github.com/elastic/beats/packetbeat/protos/tcp"
)

const serverPort = 3306

type eventStore struct {
events []beat.Event
}
Expand All @@ -55,6 +57,7 @@ func mysqlModForTests(store *eventStore) *mysqlPlugin {

var mysql mysqlPlugin
config := defaultConfig
config.Ports = []int{serverPort}
mysql.init(callback, &config)
return &mysql
}
Expand Down Expand Up @@ -82,7 +85,7 @@ func TestMySQLParser_simpleRequest(t *testing.T) {
t.Errorf("Failed to decode hex string")
}

stream := &mysqlStream{data: message, message: new(mysqlMessage)}
stream := &mysqlStream{data: message, message: new(mysqlMessage), isClient: true}

ok, complete := mysqlMessageParser(stream)

Expand Down Expand Up @@ -364,7 +367,7 @@ func TestParseMySQL_simpleUpdateResponse(t *testing.T) {
countHandleMysql++
}

mysql.Parse(&pkt, &tuple, 1, private)
mysql.Parse(&pkt, &tuple, tcp.TCPDirectionOriginal, private)

if countHandleMysql != 1 {
t.Errorf("handleMysql not called")
Expand Down Expand Up @@ -405,7 +408,7 @@ func TestParseMySQL_threeResponses(t *testing.T) {
countHandleMysql++
}

mysql.Parse(&pkt, &tuple, 1, private)
mysql.Parse(&pkt, &tuple, tcp.TCPDirectionOriginal, private)

if countHandleMysql != 3 {
t.Errorf("handleMysql not called three times")
Expand Down Expand Up @@ -447,7 +450,7 @@ func TestParseMySQL_splitResponse(t *testing.T) {
countHandleMysql++
}

private = mysql.Parse(&pkt, &tuple, 1, private).(mysqlPrivateData)
private = mysql.Parse(&pkt, &tuple, tcp.TCPDirectionOriginal, private).(mysqlPrivateData)
if countHandleMysql != 0 {
t.Errorf("handleMysql called on first run")
}
Expand Down Expand Up @@ -482,7 +485,7 @@ func testTCPTuple() *common.TCPTuple {
IPLength: 4,
BaseTuple: common.BaseTuple{
SrcIP: net.IPv4(192, 168, 0, 1), DstIP: net.IPv4(192, 168, 0, 2),
SrcPort: 6512, DstPort: 3306,
SrcPort: 6512, DstPort: serverPort,
},
}
t.ComputeHashables()
Expand Down Expand Up @@ -540,17 +543,17 @@ func Test_gap_in_response(t *testing.T) {

private := protos.ProtocolData(new(mysqlPrivateData))

private = mysql.Parse(&req, tcptuple, 0, private)
private = mysql.Parse(&resp, tcptuple, 1, private)
private = mysql.Parse(&req, tcptuple, tcp.TCPDirectionOriginal, private)
private = mysql.Parse(&resp, tcptuple, tcp.TCPDirectionReverse, private)

logp.Debug("mysql", "Now sending gap..")

_, drop := mysql.GapInStream(tcptuple, 1, 10, private)
_, drop := mysql.GapInStream(tcptuple, tcp.TCPDirectionReverse, 10, private)
assert.Equal(t, true, drop)

trans := expectTransaction(t, store)
assert.NotNil(t, trans)
assert.Equal(t, trans["notes"], []string{"Packet loss while capturing the response"})
assert.Equal(t, []string{"Packet loss while capturing the response"}, trans["notes"])
}

// Test that loss of data during the request doesn't result in a
Expand All @@ -567,7 +570,7 @@ func Test_gap_in_eat_message(t *testing.T) {
"66726f6d20746573")
assert.Nil(t, err)

stream := &mysqlStream{data: reqData, message: new(mysqlMessage)}
stream := &mysqlStream{data: reqData, message: new(mysqlMessage), isClient: true}
ok, complete := mysqlMessageParser(stream)
assert.Equal(t, true, ok)
assert.Equal(t, false, complete)
Expand Down
Binary file not shown.
23 changes: 23 additions & 0 deletions packetbeat/tests/system/test_0067_mysql_connection_phase.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from packetbeat import BaseTest

"""
Tests that the negotiation phase at the beginning of a mysql connection
doesn't leave the parser in a broken state.
"""


class Test(BaseTest):

def test_connection_phase(self):
"""
This tests that requests are still captured from a mysql stream that
starts with the "connection phase" negotiation.
"""
self.render_config_template(
mysql_ports=[3306],
)
self.run_packetbeat(pcap="mysql_connection.pcap")

objs = self.read_output()
assert len(objs) == 1
assert objs[0]['query'] == 'SELECT DATABASE()'