Skip to content

Commit

Permalink
Merge pull request #700 from axw/sql-rows-affected
Browse files Browse the repository at this point in the history
module/apmsql: report rows affected
  • Loading branch information
axw committed Jan 10, 2020
2 parents 4b3f304 + 86669ee commit 95fed16
Show file tree
Hide file tree
Showing 13 changed files with 209 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ endif::[]
https://github.com/elastic/apm-agent-go/compare/v1.6.0...master[View commits]
- Add support for API Key auth {pull}698[(#698)]
- module/apmsql: report rows affected {pull}700[(#700)]
[[release-notes-1.x]]
=== Go Agent version 1.x
Expand Down
3 changes: 3 additions & 0 deletions internal/apmschema/jsonschema/context.json
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@
"service": {
"description": "Service related information can be sent per event. Provided information will override the more generic information from metadata, non provided fields will be set according to the metadata information.",
"$ref": "service.json"
},
"message": {
"$ref": "message.json"
}
}
}
43 changes: 43 additions & 0 deletions internal/apmschema/jsonschema/message.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
{
"$id": "doc/spec/message.json",
"title": "Message",
"description": "Details related to message receiving and publishing if the captured event integrates with a messaging system",
"type": ["object", "null"],
"properties": {
"queue": {
"type": ["object", "null"],
"properties": {
"name": {
"description": "Name of the message queue where the message is received.",
"type": ["string","null"],
"maxLength": 1024
}
}
},
"age": {
"type": ["object", "null"],
"properties": {
"ms": {
"description": "The age of the message in milliseconds. If the instrumented messaging framework provides a timestamp for the message, agents may use it. Otherwise, the sending agent can add a timestamp in milliseconds since the Unix epoch to the message's metadata to be retrieved by the receiving agent. If a timestamp is not available, agents should omit this field.",
"type": ["integer", "null"]
}
}
},
"body": {
"description": "messsage body, similar to an http request body",
"type": ["string", "null"]
},
"headers": {
"description": "messsage headers, similar to http request headers",
"type": ["object", "null"],
"patternProperties": {
"[.*]*$": {
"type": ["string", "array", "null"],
"items": {
"type": ["string"]
}
}
}
}
}
}
9 changes: 8 additions & 1 deletion internal/apmschema/jsonschema/spans/span.json
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
"maxLength": 1024
}
},
"required": ["type", "name", "resource"]
"required": ["type", "name", "resource"]
}
}
},
Expand Down Expand Up @@ -102,6 +102,10 @@
"user": {
"type": ["string", "null"],
"description": "Username for accessing database"
},
"rows_affected": {
"type": ["integer", "null"],
"description": "Number of rows affected by the SQL statement (if applicable)"
}
}
},
Expand Down Expand Up @@ -170,6 +174,9 @@
"maxLength": 1024
}
}
},
"message": {
"$ref": "../message.json"
}
}
},
Expand Down
11 changes: 9 additions & 2 deletions internal/apmschema/jsonschema/stacktrace_frame.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@
},
"filename": {
"description": "The relative filename of the code involved in the stack frame, used e.g. to do error checksumming",
"type": "string"
"type": ["string", "null"]
},
"classname": {
"description": "The classname of the code involved in the stack frame",
"type": ["string", "null"]
},
"function": {
"description": "The function involved in the stack frame",
Expand Down Expand Up @@ -58,5 +62,8 @@
"properties": {}
}
},
"required": ["filename"]
"anyOf": [
{ "required": ["filename"], "properties": {"filename": { "type": "string" }} },
{ "required": ["classname"], "properties": {"classname": { "type": "string" }} }
]
}
1 change: 1 addition & 0 deletions internal/apmschema/update.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ FILES=( \
"metricsets/metricset.json" \
"metricsets/sample.json" \
"context.json" \
"message.json" \
"metadata.json" \
"process.json" \
"request.json" \
Expand Down
10 changes: 10 additions & 0 deletions model/marshal_fastjson.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,10 @@ type DatabaseSpanContext struct {
// Statement holds the database statement (e.g. query).
Statement string `json:"statement,omitempty"`

// RowsAffected holds the number of rows affected by the
// database operation.
RowsAffected *int64 `json:"rows_affected,omitempty"`

// Type holds the database type. For any SQL database,
// this should be "sql"; for others, the lower-cased
// database category, e.g. "cassandra", "hbase", "redis".
Expand Down
47 changes: 47 additions & 0 deletions module/apmsql/apmsql_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,50 @@ func benchmarkQueries(b *testing.B, ctx context.Context, stmt *sql.Stmt) {
rows.Close()
}
}

func BenchmarkStmtExecContext(b *testing.B) {
db, err := apmsql.Open("sqlite3", ":memory:")
require.NoError(b, err)
defer db.Close()

_, err = db.Exec("CREATE TABLE foo (bar INT)")
require.NoError(b, err)

stmt, err := db.Prepare("DELETE FROM foo")
require.NoError(b, err)
defer stmt.Close()

b.Run("baseline", func(b *testing.B) {
benchmarkExec(b, context.Background(), stmt)
})
b.Run("instrumented", func(b *testing.B) {
invalidServerURL, err := url.Parse("http://testing.invalid:8200")
if err != nil {
panic(err)
}
httpTransport, err := transport.NewHTTPTransport()
require.NoError(b, err)
httpTransport.SetServerURL(invalidServerURL)

tracer, err := apm.NewTracerOptions(apm.TracerOptions{
ServiceName: "apmhttp_test",
ServiceVersion: "0.1",
Transport: httpTransport,
})
require.NoError(b, err)
defer tracer.Close()

tracer.SetMaxSpans(b.N)
tx := tracer.StartTransaction("name", "type")
ctx := apm.ContextWithTransaction(context.Background(), tx)
benchmarkExec(b, ctx, stmt)
})
}

func benchmarkExec(b *testing.B, ctx context.Context, stmt *sql.Stmt) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := stmt.ExecContext(ctx)
require.NoError(b, err)
}
}
54 changes: 53 additions & 1 deletion module/apmsql/apmsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,67 @@ func TestExecContext(t *testing.T) {
defer db.Close()

db.Ping() // connect
const N = 5
_, spans, _ := apmtest.WithTransaction(func(ctx context.Context) {
_, err := db.ExecContext(ctx, "CREATE TABLE foo (bar INT)")
require.NoError(t, err)
for i := 0; i < N; i++ {
result, err := db.ExecContext(ctx, "INSERT INTO foo VALUES (?)", i)
require.NoError(t, err)

rowsAffected, err := result.RowsAffected()
require.NoError(t, err)
assert.Equal(t, int64(1), rowsAffected)
}
result, err := db.ExecContext(ctx, "DELETE FROM foo")
require.NoError(t, err)
rowsAffected, err := result.RowsAffected()
require.NoError(t, err)
assert.Equal(t, int64(N), rowsAffected)
})
require.Len(t, spans, 1)
require.Len(t, spans, 2+N)

int64ptr := func(n int64) *int64 { return &n }

assert.Equal(t, "CREATE", spans[0].Name)
assert.Equal(t, "db", spans[0].Type)
assert.Equal(t, "sqlite3", spans[0].Subtype)
assert.Equal(t, "exec", spans[0].Action)
assert.Equal(t, &model.SpanContext{
Database: &model.DatabaseSpanContext{
Instance: ":memory:",
// Ideally RowsAffected would not be returned for DDL
// statements, but this is on the driver; it should
// be returning database/sql/driver.ResultNoRows for
// DDL statements, in which case we'll omit this.
RowsAffected: int64ptr(0),
Statement: "CREATE TABLE foo (bar INT)",
Type: "sql",
},
}, spans[0].Context)

for i := 0; i < N; i++ {
span := spans[i+1]
assert.Equal(t, "INSERT INTO foo", span.Name)
assert.Equal(t, &model.SpanContext{
Database: &model.DatabaseSpanContext{
Instance: ":memory:",
RowsAffected: int64ptr(1),
Statement: "INSERT INTO foo VALUES (?)",
Type: "sql",
},
}, span.Context)
}

assert.Equal(t, "DELETE FROM foo", spans[N+1].Name)
assert.Equal(t, &model.SpanContext{
Database: &model.DatabaseSpanContext{
Instance: ":memory:",
RowsAffected: int64ptr(N),
Statement: "DELETE FROM foo",
Type: "sql",
},
}, spans[N+1].Context)
}

func TestQueryContext(t *testing.T) {
Expand Down
21 changes: 14 additions & 7 deletions module/apmsql/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (c *conn) startSpan(ctx context.Context, name, spanType, stmt string) (*apm
return span, ctx
}

func (c *conn) finishSpan(ctx context.Context, span *apm.Span, resultError *error) {
func (c *conn) finishSpan(ctx context.Context, span *apm.Span, result *driver.Result, resultError *error) {
if *resultError == driver.ErrSkip {
// TODO(axw) mark span as abandoned,
// so it's not sent and not counted
Expand All @@ -93,7 +93,14 @@ func (c *conn) finishSpan(ctx context.Context, span *apm.Span, resultError *erro
return
}
switch *resultError {
case nil, driver.ErrBadConn, context.Canceled:
case nil:
if !span.Dropped() && result != nil && *result != nil && *result != driver.ResultNoRows {
rowsAffected, err := (*result).RowsAffected()
if err == nil && rowsAffected >= 0 {
span.Context.SetDatabaseRowsAffected(rowsAffected)
}
}
case driver.ErrBadConn, context.Canceled:
// ErrBadConn is used by the connection pooling
// logic in database/sql, and so is expected and
// should not be reported.
Expand All @@ -113,7 +120,7 @@ func (c *conn) Ping(ctx context.Context) (resultError error) {
return nil
}
span, ctx := c.startSpan(ctx, "ping", c.driver.pingSpanType, "")
defer c.finishSpan(ctx, span, &resultError)
defer c.finishSpan(ctx, span, nil, &resultError)
return c.pinger.Ping(ctx)
}

Expand All @@ -122,7 +129,7 @@ func (c *conn) QueryContext(ctx context.Context, query string, args []driver.Nam
return nil, driver.ErrSkip
}
span, ctx := c.startStmtSpan(ctx, query, c.driver.querySpanType)
defer c.finishSpan(ctx, span, &resultError)
defer c.finishSpan(ctx, span, nil, &resultError)

if c.queryerContext != nil {
return c.queryerContext.QueryContext(ctx, query, args)
Expand All @@ -145,7 +152,7 @@ func (*conn) Query(query string, args []driver.Value) (driver.Rows, error) {

func (c *conn) PrepareContext(ctx context.Context, query string) (_ driver.Stmt, resultError error) {
span, ctx := c.startStmtSpan(ctx, query, c.driver.prepareSpanType)
defer c.finishSpan(ctx, span, &resultError)
defer c.finishSpan(ctx, span, nil, &resultError)
var stmt driver.Stmt
var err error
if c.connPrepareContext != nil {
Expand All @@ -167,12 +174,12 @@ func (c *conn) PrepareContext(ctx context.Context, query string) (_ driver.Stmt,
return stmt, err
}

func (c *conn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (_ driver.Result, resultError error) {
func (c *conn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (result driver.Result, resultError error) {
if c.execerContext == nil && c.execer == nil {
return nil, driver.ErrSkip
}
span, ctx := c.startStmtSpan(ctx, query, c.driver.execSpanType)
defer c.finishSpan(ctx, span, &resultError)
defer c.finishSpan(ctx, span, &result, &resultError)

if c.execerContext != nil {
return c.execerContext.ExecContext(ctx, query, args)
Expand Down
6 changes: 3 additions & 3 deletions module/apmsql/stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ func (s *stmt) ColumnConverter(idx int) driver.ValueConverter {
return driver.DefaultParameterConverter
}

func (s *stmt) ExecContext(ctx context.Context, args []driver.NamedValue) (_ driver.Result, resultError error) {
func (s *stmt) ExecContext(ctx context.Context, args []driver.NamedValue) (result driver.Result, resultError error) {
span, ctx := s.startSpan(ctx, s.conn.driver.execSpanType)
defer s.conn.finishSpan(ctx, span, &resultError)
defer s.conn.finishSpan(ctx, span, &result, &resultError)
if s.stmtExecContext != nil {
return s.stmtExecContext.ExecContext(ctx, args)
}
Expand All @@ -84,7 +84,7 @@ func (s *stmt) ExecContext(ctx context.Context, args []driver.NamedValue) (_ dri

func (s *stmt) QueryContext(ctx context.Context, args []driver.NamedValue) (_ driver.Rows, resultError error) {
span, ctx := s.startSpan(ctx, s.conn.driver.querySpanType)
defer s.conn.finishSpan(ctx, span, &resultError)
defer s.conn.finishSpan(ctx, span, nil, &resultError)
if s.stmtQueryContext != nil {
return s.stmtQueryContext.QueryContext(ctx, args)
}
Expand Down
18 changes: 13 additions & 5 deletions spancontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@ import (

// SpanContext provides methods for setting span context.
type SpanContext struct {
model model.SpanContext
destination model.DestinationSpanContext
destinationService model.DestinationServiceSpanContext
database model.DatabaseSpanContext
http model.HTTPSpanContext
model model.SpanContext
destination model.DestinationSpanContext
destinationService model.DestinationServiceSpanContext
databaseRowsAffected int64
database model.DatabaseSpanContext
http model.HTTPSpanContext
}

// DatabaseSpanContext holds database span context.
Expand Down Expand Up @@ -120,6 +121,13 @@ func (c *SpanContext) SetDatabase(db DatabaseSpanContext) {
c.model.Database = &c.database
}

// SetDatabaseRowsAffected records the number of rows affected by
// a database operation.
func (c *SpanContext) SetDatabaseRowsAffected(n int64) {
c.databaseRowsAffected = n
c.database.RowsAffected = &c.databaseRowsAffected
}

// SetHTTPRequest sets the details of the HTTP request in the context.
//
// This function relates to client requests. If the request URL contains
Expand Down

0 comments on commit 95fed16

Please sign in to comment.