Skip to content

Commit

Permalink
Switch paritition metricset from client to broker
Browse files Browse the repository at this point in the history
Update kafka broker query

- on connect try to find the broker id (address must match advertised host).
- check broker is leader before querying offsets
- query offsets for all replicas
- remove 'isr' from event, and replace with boolean flag `insync_replica`
- replace `replicas` from event with per event `replica`-id
- update sarama to get offset per replica id
  • Loading branch information
ruflin committed Nov 30, 2016
1 parent 7ca7848 commit 03cb921
Show file tree
Hide file tree
Showing 10 changed files with 368 additions and 68 deletions.
2 changes: 1 addition & 1 deletion glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ import:
- package: github.com/miekg/dns
version: 5d001d020961ae1c184f9f8152fdc73810481677
- package: github.com/Shopify/sarama
version: fix/sasl-handshake
version: enh/offset-replica-id
repo: https://github.com/urso/sarama
- package: github.com/rcrowley/go-metrics
version: ab2277b1c5d15c3cba104e9cbddbdfc622df5ad8
Expand Down
2 changes: 2 additions & 0 deletions libbeat/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ services:
expose:
- 9092
- 2181
environment:
- ADVERTISED_HOST=kafka

# Overloading kibana with a simple image as it is not needed here
kibana:
Expand Down
67 changes: 65 additions & 2 deletions metricbeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -1959,15 +1959,78 @@ Oldest offset of the partition.
[float]
=== kafka.partition.partition
=== kafka.partition.offset.error
type: long
Error code from fetching offset.
[float]
== partition Fields
Partition data.
[float]
=== kafka.partition.partition.id
type: long
Partition id.
[float]
=== kafka.partition.topic
=== kafka.partition.partition.leader
type: long
Leader id (broker).
[float]
=== kafka.partition.partition.isr
type: list
List of isr ids.
[float]
=== kafka.partition.partition.replica
type: long
Replica id (broker).
[float]
=== kafka.partition.partition.insync_replica
type: boolean
Indicates if replica is included in the in-sync replicate set (ISR).
[float]
=== kafka.partition.partition.error
type: long
Error code from fetching partition.
[float]
=== kafka.partition.topic.error
type: long
topic error.
[float]
=== kafka.partition.topic.name
type: keyword
Expand Down
34 changes: 30 additions & 4 deletions metricbeat/metricbeat.template-es2x.json
Original file line number Diff line number Diff line change
Expand Up @@ -941,6 +941,9 @@
},
"offset": {
"properties": {
"error": {
"type": "long"
},
"newest": {
"type": "long"
},
Expand All @@ -950,12 +953,35 @@
}
},
"partition": {
"type": "long"
"properties": {
"error": {
"type": "long"
},
"id": {
"type": "long"
},
"insync_replica": {
"type": "boolean"
},
"leader": {
"type": "long"
},
"replica": {
"type": "long"
}
}
},
"topic": {
"ignore_above": 1024,
"index": "not_analyzed",
"type": "string"
"properties": {
"error": {
"type": "long"
},
"name": {
"ignore_above": 1024,
"index": "not_analyzed",
"type": "string"
}
}
}
}
}
Expand Down
32 changes: 29 additions & 3 deletions metricbeat/metricbeat.template.json
Original file line number Diff line number Diff line change
Expand Up @@ -948,6 +948,9 @@
},
"offset": {
"properties": {
"error": {
"type": "long"
},
"newest": {
"type": "long"
},
Expand All @@ -957,11 +960,34 @@
}
},
"partition": {
"type": "long"
"properties": {
"error": {
"type": "long"
},
"id": {
"type": "long"
},
"insync_replica": {
"type": "boolean"
},
"leader": {
"type": "long"
},
"replica": {
"type": "long"
}
}
},
"topic": {
"ignore_above": 1024,
"type": "keyword"
"properties": {
"error": {
"type": "long"
},
"name": {
"ignore_above": 1024,
"type": "keyword"
}
}
}
}
}
Expand Down
17 changes: 11 additions & 6 deletions metricbeat/module/kafka/partition/_meta/data.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,19 @@
"id": 0
},
"offset": {
"newest": 13,
"newest": 11,
"oldest": 0
},
"partition": 0,
"replicas": [
0
],
"topic": "testtopic"
"partition": {
"error": 0,
"id": 0,
"insync_replica": true,
"leader": 0,
"replica": 0
},
"topic": {
"name": "test-metricbeat-8760238589576171408"
}
}
},
"metricset": {
Expand Down
45 changes: 43 additions & 2 deletions metricbeat/module/kafka/partition/_meta/fields.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,53 @@
type: long
description: >
Oldest offset of the partition.
- name: error
type: long
description: >
Error code from fetching offset.
- name: partition
type: group
description: >
Partition data.
fields:
- name: id
type: long
description: >
Partition id.
- name: leader
type: long
description: >
Leader id (broker).
- name: isr
type: list
description: >
List of isr ids.
- name: replica
type: long
description: >
Replica id (broker).
- name: insync_replica
type: boolean
description: >
Indicates if replica is included in the in-sync replicate set (ISR).
- name: error
type: long
description: >
Error code from fetching partition.
- name: topic.error
type: long
description: >
Partition id.
- name: topic
topic error.
- name: topic.name
type: keyword
description: >
Topic name
- name: broker.id
type: long
description: >
Expand All @@ -32,3 +71,5 @@
type: keyword
description: >
Broker address
Loading

0 comments on commit 03cb921

Please sign in to comment.