Skip to content

Commit

Permalink
Fix mysql parser handling of connection phase (#8173)
Browse files Browse the repository at this point in the history
The mysql protocol parser assumes that transactions can only be started
by the client. This is true once the connection has been negotiated
("command phase"), but not during initial handshake ("connection phase").

This causes parsing problems when a connection is monitored from the
start, as sometimes the connection phase leaves the parser confused on
which side is client.

This patch modifies how client-side is detected, which can only be done
by looking at the destination port.
  • Loading branch information
adriansr committed Sep 7, 2018
1 parent a84e51a commit 9828935
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 31 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff]
- Fixed a seccomp related error where the `fcntl64` syscall was not permitted
on 32-bit Linux and the sniffer failed to start. {issue}7839[7839]
- Added missing `cmdline` and `client_cmdline` fields to index template. {pull}8258[8258]
- Fixed the mysql missing transactions if monitoring a connection from the start. {pull}8173[8173]

*Winlogbeat*

Expand Down
39 changes: 20 additions & 19 deletions packetbeat/protos/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,10 +208,18 @@ func (stream *mysqlStream) prepareForNewMessage() {
stream.data = stream.data[stream.parseOffset:]
stream.parseState = mysqlStateStart
stream.parseOffset = 0
stream.isClient = false
stream.message = nil
}

func (mysql *mysqlPlugin) isServerPort(port uint16) bool {
for _, sPort := range mysql.ports {
if uint16(sPort) == port {
return true
}
}
return false
}

func mysqlMessageParser(s *mysqlStream) (bool, bool) {
logp.Debug("mysqldetailed", "MySQL parser called. parseState = %s", s.parseState)

Expand All @@ -229,27 +237,20 @@ func mysqlMessageParser(s *mysqlStream) (bool, bool) {
m.seq = hdr[3]
m.typ = hdr[4]

logp.Debug("mysqldetailed", "MySQL Header: Packet length %d, Seq %d, Type=%d", m.packetLength, m.seq, m.typ)
logp.Debug("mysqldetailed", "MySQL Header: Packet length %d, Seq %d, Type=%d isClient=%v", m.packetLength, m.seq, m.typ, s.isClient)

if m.seq == 0 {
if s.isClient {
// starts Command Phase

if m.typ == mysqlCmdQuery {
if m.seq == 0 && m.typ == mysqlCmdQuery {
// parse request
m.isRequest = true
m.start = s.parseOffset
s.parseState = mysqlStateEatMessage

} else {
// ignore command
m.ignoreMessage = true
s.parseState = mysqlStateEatMessage
}

if !s.isClient {
s.isClient = true
}

} else if !s.isClient {
// parse response
m.isRequest = false
Expand All @@ -275,11 +276,6 @@ func mysqlMessageParser(s *mysqlStream) (bool, bool) {
m.ignoreMessage = true
s.parseState = mysqlStateEatMessage
}

} else {
// something else, not expected
logp.Debug("mysql", "Unexpected MySQL message of type %d received.", m.typ)
return false, false
}

case mysqlStateEatMessage:
Expand Down Expand Up @@ -500,9 +496,14 @@ func (mysql *mysqlPlugin) Parse(pkt *protos.Packet, tcptuple *common.TCPTuple,
}

if priv.data[dir] == nil {
dstPort := tcptuple.DstPort
if dir == tcp.TCPDirectionReverse {
dstPort = tcptuple.SrcPort
}
priv.data[dir] = &mysqlStream{
data: pkt.Payload,
message: &mysqlMessage{ts: pkt.Ts},
data: pkt.Payload,
message: &mysqlMessage{ts: pkt.Ts},
isClient: mysql.isServerPort(dstPort),
}
} else {
// concatenate bytes
Expand All @@ -521,7 +522,7 @@ func (mysql *mysqlPlugin) Parse(pkt *protos.Packet, tcptuple *common.TCPTuple,
}

ok, complete := mysqlMessageParser(priv.data[dir])
//logp.Debug("mysqldetailed", "mysqlMessageParser returned ok=%b complete=%b", ok, complete)
logp.Debug("mysqldetailed", "mysqlMessageParser returned ok=%v complete=%v", ok, complete)
if !ok {
// drop this tcp stream. Will retry parsing with the next
// segment in it
Expand Down
27 changes: 15 additions & 12 deletions packetbeat/protos/mysql/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"encoding/hex"
"net"
"testing"
"time"

"github.com/stretchr/testify/assert"

Expand All @@ -31,10 +32,11 @@ import (
"github.com/elastic/beats/libbeat/logp"

"github.com/elastic/beats/packetbeat/protos"

"time"
"github.com/elastic/beats/packetbeat/protos/tcp"
)

const serverPort = 3306

type eventStore struct {
events []beat.Event
}
Expand All @@ -55,6 +57,7 @@ func mysqlModForTests(store *eventStore) *mysqlPlugin {

var mysql mysqlPlugin
config := defaultConfig
config.Ports = []int{serverPort}
mysql.init(callback, &config)
return &mysql
}
Expand Down Expand Up @@ -82,7 +85,7 @@ func TestMySQLParser_simpleRequest(t *testing.T) {
t.Errorf("Failed to decode hex string")
}

stream := &mysqlStream{data: message, message: new(mysqlMessage)}
stream := &mysqlStream{data: message, message: new(mysqlMessage), isClient: true}

ok, complete := mysqlMessageParser(stream)

Expand Down Expand Up @@ -364,7 +367,7 @@ func TestParseMySQL_simpleUpdateResponse(t *testing.T) {
countHandleMysql++
}

mysql.Parse(&pkt, &tuple, 1, private)
mysql.Parse(&pkt, &tuple, tcp.TCPDirectionOriginal, private)

if countHandleMysql != 1 {
t.Errorf("handleMysql not called")
Expand Down Expand Up @@ -405,7 +408,7 @@ func TestParseMySQL_threeResponses(t *testing.T) {
countHandleMysql++
}

mysql.Parse(&pkt, &tuple, 1, private)
mysql.Parse(&pkt, &tuple, tcp.TCPDirectionOriginal, private)

if countHandleMysql != 3 {
t.Errorf("handleMysql not called three times")
Expand Down Expand Up @@ -447,7 +450,7 @@ func TestParseMySQL_splitResponse(t *testing.T) {
countHandleMysql++
}

private = mysql.Parse(&pkt, &tuple, 1, private).(mysqlPrivateData)
private = mysql.Parse(&pkt, &tuple, tcp.TCPDirectionOriginal, private).(mysqlPrivateData)
if countHandleMysql != 0 {
t.Errorf("handleMysql called on first run")
}
Expand Down Expand Up @@ -482,7 +485,7 @@ func testTCPTuple() *common.TCPTuple {
IPLength: 4,
BaseTuple: common.BaseTuple{
SrcIP: net.IPv4(192, 168, 0, 1), DstIP: net.IPv4(192, 168, 0, 2),
SrcPort: 6512, DstPort: 3306,
SrcPort: 6512, DstPort: serverPort,
},
}
t.ComputeHashables()
Expand Down Expand Up @@ -540,17 +543,17 @@ func Test_gap_in_response(t *testing.T) {

private := protos.ProtocolData(new(mysqlPrivateData))

private = mysql.Parse(&req, tcptuple, 0, private)
private = mysql.Parse(&resp, tcptuple, 1, private)
private = mysql.Parse(&req, tcptuple, tcp.TCPDirectionOriginal, private)
private = mysql.Parse(&resp, tcptuple, tcp.TCPDirectionReverse, private)

logp.Debug("mysql", "Now sending gap..")

_, drop := mysql.GapInStream(tcptuple, 1, 10, private)
_, drop := mysql.GapInStream(tcptuple, tcp.TCPDirectionReverse, 10, private)
assert.Equal(t, true, drop)

trans := expectTransaction(t, store)
assert.NotNil(t, trans)
assert.Equal(t, trans["notes"], []string{"Packet loss while capturing the response"})
assert.Equal(t, []string{"Packet loss while capturing the response"}, trans["notes"])
}

// Test that loss of data during the request doesn't result in a
Expand All @@ -567,7 +570,7 @@ func Test_gap_in_eat_message(t *testing.T) {
"66726f6d20746573")
assert.Nil(t, err)

stream := &mysqlStream{data: reqData, message: new(mysqlMessage)}
stream := &mysqlStream{data: reqData, message: new(mysqlMessage), isClient: true}
ok, complete := mysqlMessageParser(stream)
assert.Equal(t, true, ok)
assert.Equal(t, false, complete)
Expand Down
Binary file not shown.
23 changes: 23 additions & 0 deletions packetbeat/tests/system/test_0067_mysql_connection_phase.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from packetbeat import BaseTest

"""
Tests that the negotiation phase at the beginning of a mysql connection
doesn't leave the parser in a broken state.
"""


class Test(BaseTest):

def test_connection_phase(self):
"""
This tests that requests are still captured from a mysql stream that
starts with the "connection phase" negotiation.
"""
self.render_config_template(
mysql_ports=[3306],
)
self.run_packetbeat(pcap="mysql_connection.pcap")

objs = self.read_output()
assert len(objs) == 1
assert objs[0]['query'] == 'SELECT DATABASE()'

0 comments on commit 9828935

Please sign in to comment.