Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rabbitmq queue metricset message rates #6606

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di

*Metricbeat*

- Add connections metricset to RabbitMQ module {pull}6548[6548]
- Change field type of http header from nested to object {pull}5258[5258]
- Fix the fetching of process information when some data is missing under MacOS X. {issue}5337[5337]
- Change `MySQL active connections` visualization title to `MySQL total connections`. {issue}4812[4812]
Expand Down Expand Up @@ -256,6 +255,8 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
- Making the MongoDB module GA. {pull}6554[6554]
- Allow to disable labels `dedot` in Docker module, in favor of a safe way to keep dots. {pull}6490[6490]
- Add experimental module to collect metrics from munin nodes. {pull}6517[6517]
- Add connections metricset to RabbitMQ module {pull}6548[6548]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line should be removed for this PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I can remove it, but currently this line is in the wrong section - Bugfixes, while it should be in Added.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left the line in the change log, let me know if I should revert it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm just realising now that this log line has been moved and not added.

+1 on moving it to the right section.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this line added on purpose? Was it missing in a previews PR? If yes, the PR number seems to be off.

- Add message rates to the RabbitMQ queue metricset {issue}6442[6442] {pull}6606[6606]

*Packetbeat*

Expand Down
5 changes: 5 additions & 0 deletions libbeat/common/schema/mapstriface/mapstriface.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,11 @@ func Bool(key string, opts ...schema.SchemaOption) schema.Conv {
return schema.SetOptions(schema.Conv{Key: key, Func: toBool}, opts)
}

// Float creates a Conv object for parsing floats
func Float(key string, opts ...schema.SchemaOption) schema.Conv {
return schema.SetOptions(schema.Conv{Key: key, Func: toInteger}, opts)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You use the toInteger function here which I think does not return the expected result?

}

func toInteger(key string, data map[string]interface{}) (interface{}, error) {
emptyIface, exists := data[key]
if !exists {
Expand Down
24 changes: 24 additions & 0 deletions metricbeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -9230,6 +9230,14 @@ type: long
Sum of ready and unacknowledged messages (queue depth).


[float]
=== `rabbitmq.queue.messages.total.details.rate`

type: float

How much the queue depth has changed per second in the most recent sampling interval.


[float]
=== `rabbitmq.queue.messages.ready.count`

Expand All @@ -9238,6 +9246,14 @@ type: long
Number of messages ready to be delivered to clients.


[float]
=== `rabbitmq.queue.messages.ready.details.rate`

type: float

How much the count of messages ready has changed per second in the most recent sampling interval.


[float]
=== `rabbitmq.queue.messages.unacknowledged.count`

Expand All @@ -9246,6 +9262,14 @@ type: long
Number of messages delivered to clients but not yet acknowledged.


[float]
=== `rabbitmq.queue.messages.unacknowledged.details.rate`

type: float

How much the count of unacknowledged messages has changed per second in the most recent sampling interval.


[float]
=== `rabbitmq.queue.messages.persistent.count`

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,15 @@
},
"messages": 74,
"messages_details": {
"rate": 0
"rate": 2.2
},
"messages_ready": 71,
"messages_ready_details": {
"rate": 0
},
"messages_unacknowledged": 3,
"messages_unacknowledged_details": {
"rate": 0
"rate": 0.5
},
"idle_since": "2017-07-28 23:45:52",
"consumer_utilisation": 0.7,
Expand Down
15 changes: 12 additions & 3 deletions metricbeat/module/rabbitmq/queue/_meta/data.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,22 @@
"state": "running",
"messages": {
"total": {
"count": 0
"count": 0,
"details": {
"rate": 2.2
}
},
"ready": {
"count": 0
"count": 0,
"details": {
"rate": 0
}
},
"unacknowledged": {
"count": 0
"count": 0,
"details": {
"rate": 0.5
}
},
"persistent": {
"count": 0
Expand Down
12 changes: 12 additions & 0 deletions metricbeat/module/rabbitmq/queue/_meta/fields.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,26 @@
type: long
description: >
Sum of ready and unacknowledged messages (queue depth).
- name: messages.total.details.rate
type: float
description: >
How much the queue depth has changed per second in the most recent sampling interval.
- name: messages.ready.count
type: long
description: >
Number of messages ready to be delivered to clients.
- name: messages.ready.details.rate
type: float
description: >
How much the count of messages ready has changed per second in the most recent sampling interval.
- name: messages.unacknowledged.count
type: long
description: >
Number of messages delivered to clients but not yet acknowledged.
- name: messages.unacknowledged.details.rate
type: float
description: >
How much the count of unacknowledged messages has changed per second in the most recent sampling interval.
- name: messages.persistent.count
type: long
description: >
Expand Down
9 changes: 9 additions & 0 deletions metricbeat/module/rabbitmq/queue/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,21 @@ var (
"messages": s.Object{
"total": s.Object{
"count": c.Int("messages"),
"details": c.Dict("messages_details", s.Schema{
"rate": c.Float("rate"),
}),
},
"ready": s.Object{
"count": c.Int("messages_ready"),
"details": c.Dict("messages_ready_details", s.Schema{
"rate": c.Float("rate"),
}),
},
"unacknowledged": s.Object{
"count": c.Int("messages_unacknowledged"),
"details": c.Dict("messages_unacknowledged_details", s.Schema{
"rate": c.Float("rate"),
}),
},
"persistent": s.Object{
"count": c.Int("messages_persistent"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,15 @@ func TestFetchEventContents(t *testing.T) {
assert.EqualValues(t, 3, unacknowledged["count"])
assert.EqualValues(t, 73, persistent["count"])

totalDetails := total["details"].(common.MapStr)
assert.EqualValues(t, 2.2, totalDetails["rate"])

readyDetails := ready["details"].(common.MapStr)
assert.EqualValues(t, 0, readyDetails["rate"])

unacknowledgedDetails := unacknowledged["details"].(common.MapStr)
assert.EqualValues(t, 0.5, unacknowledgedDetails["rate"])

disk := event["disk"].(common.MapStr)
reads := disk["reads"].(common.MapStr)
writes := disk["writes"].(common.MapStr)
Expand Down