Skip to content

Commit

Permalink
Switch partition metricset from client to broker (elastic#3029)
Browse files Browse the repository at this point in the history
Update kafka broker query

- Switch paritition metricset from client to broker
- on connect try to find the broker id (address must match advertised host).
- check broker is leader before querying offsets
- query offsets for all replicas
- remove 'isr' from event, and replace with boolean flag `insync_replica`
- replace `replicas` from event with per event `replica`-id
- update sarama to get offset per replica id

(cherry picked from commit 4c4889f)
  • Loading branch information
ruflin committed Dec 1, 2016
1 parent 25335ad commit 3d60927
Show file tree
Hide file tree
Showing 15 changed files with 589 additions and 66 deletions.
2 changes: 1 addition & 1 deletion glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ import:
- package: github.com/miekg/dns
version: 5d001d020961ae1c184f9f8152fdc73810481677
- package: github.com/Shopify/sarama
version: fix/sasl-handshake
version: enh/offset-replica-id
repo: https://github.com/urso/sarama
- package: github.com/rcrowley/go-metrics
version: ab2277b1c5d15c3cba104e9cbddbdfc622df5ad8
Expand Down
2 changes: 2 additions & 0 deletions libbeat/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ services:
expose:
- 9092
- 2181
environment:
- ADVERTISED_HOST=kafka

# Overloading kibana with a simple image as it is not needed here
kibana:
Expand Down
21 changes: 21 additions & 0 deletions metricbeat/_meta/beat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,27 @@ metricbeat.modules:
#period: 10s
#hosts: ["localhost:9092"]

#client_id: metricbeat

#metadata.retries: 3
#metadata.backoff: 250ms

# List of Topics to query metadata for. If empty, all topics will be queried.
#topics: []

# Optional SSL. By default is off.
# List of root certificates for HTTPS server verifications
#ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

# Certificate for SSL client authentication
#ssl.certificate: "/etc/pki/client/cert.pem"

# Client Certificate Key
#ssl.key: "/etc/pki/client/cert.key"

# SASL authentication
#username: ""
#password: ""

#------------------------------- MongoDB Module ------------------------------
#- module: mongodb
Expand Down
59 changes: 57 additions & 2 deletions metricbeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -1959,15 +1959,70 @@ Oldest offset of the partition.
[float]
=== kafka.partition.partition
== partition Fields
Partition data.
[float]
=== kafka.partition.partition.id
type: long
Partition id.
[float]
=== kafka.partition.topic
=== kafka.partition.partition.leader
type: long
Leader id (broker).
[float]
=== kafka.partition.partition.isr
type: list
List of isr ids.
[float]
=== kafka.partition.partition.replica
type: long
Replica id (broker).
[float]
=== kafka.partition.partition.insync_replica
type: boolean
Indicates if replica is included in the in-sync replicate set (ISR).
[float]
=== kafka.partition.partition.error.code
type: long
Error code from fetching partition.
[float]
=== kafka.partition.topic.error.code
type: long
topic error code.
[float]
=== kafka.partition.topic.name
type: keyword
Expand Down
21 changes: 21 additions & 0 deletions metricbeat/docs/modules/kafka.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,27 @@ metricbeat.modules:
#period: 10s
#hosts: ["localhost:9092"]
#client_id: metricbeat
#metadata.retries: 3
#metadata.backoff: 250ms
# List of Topics to query metadata for. If empty, all topics will be queried.
#topics: []
# Optional SSL. By default is off.
# List of root certificates for HTTPS server verifications
#ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]
# Certificate for SSL client authentication
#ssl.certificate: "/etc/pki/client/cert.pem"
# Client Certificate Key
#ssl.key: "/etc/pki/client/cert.key"
# SASL authentication
#username: ""
#password: ""
----

[float]
Expand Down
21 changes: 21 additions & 0 deletions metricbeat/metricbeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,27 @@ metricbeat.modules:
#period: 10s
#hosts: ["localhost:9092"]

#client_id: metricbeat

#metadata.retries: 3
#metadata.backoff: 250ms

# List of Topics to query metadata for. If empty, all topics will be queried.
#topics: []

# Optional SSL. By default is off.
# List of root certificates for HTTPS server verifications
#ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

# Certificate for SSL client authentication
#ssl.certificate: "/etc/pki/client/cert.pem"

# Client Certificate Key
#ssl.key: "/etc/pki/client/cert.key"

# SASL authentication
#username: ""
#password: ""

#------------------------------- MongoDB Module ------------------------------
#- module: mongodb
Expand Down
39 changes: 35 additions & 4 deletions metricbeat/metricbeat.template-es2x.json
Original file line number Diff line number Diff line change
Expand Up @@ -950,12 +950,43 @@
}
},
"partition": {
"type": "long"
"properties": {
"error": {
"properties": {
"code": {
"type": "long"
}
}
},
"id": {
"type": "long"
},
"insync_replica": {
"type": "boolean"
},
"leader": {
"type": "long"
},
"replica": {
"type": "long"
}
}
},
"topic": {
"ignore_above": 1024,
"index": "not_analyzed",
"type": "string"
"properties": {
"error": {
"properties": {
"code": {
"type": "long"
}
}
},
"name": {
"ignore_above": 1024,
"index": "not_analyzed",
"type": "string"
}
}
}
}
}
Expand Down
37 changes: 34 additions & 3 deletions metricbeat/metricbeat.template.json
Original file line number Diff line number Diff line change
Expand Up @@ -957,11 +957,42 @@
}
},
"partition": {
"type": "long"
"properties": {
"error": {
"properties": {
"code": {
"type": "long"
}
}
},
"id": {
"type": "long"
},
"insync_replica": {
"type": "boolean"
},
"leader": {
"type": "long"
},
"replica": {
"type": "long"
}
}
},
"topic": {
"ignore_above": 1024,
"type": "keyword"
"properties": {
"error": {
"properties": {
"code": {
"type": "long"
}
}
},
"name": {
"ignore_above": 1024,
"type": "keyword"
}
}
}
}
}
Expand Down
21 changes: 21 additions & 0 deletions metricbeat/module/kafka/_meta/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,24 @@
#period: 10s
#hosts: ["localhost:9092"]

#client_id: metricbeat

#metadata.retries: 3
#metadata.backoff: 250ms

# List of Topics to query metadata for. If empty, all topics will be queried.
#topics: []

# Optional SSL. By default is off.
# List of root certificates for HTTPS server verifications
#ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

# Certificate for SSL client authentication
#ssl.certificate: "/etc/pki/client/cert.pem"

# Client Certificate Key
#ssl.key: "/etc/pki/client/cert.key"

# SASL authentication
#username: ""
#password: ""
17 changes: 11 additions & 6 deletions metricbeat/module/kafka/partition/_meta/data.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,19 @@
"id": 0
},
"offset": {
"newest": 13,
"newest": 11,
"oldest": 0
},
"partition": 0,
"replicas": [
0
],
"topic": "testtopic"
"partition": {
"error": 0,
"id": 0,
"insync_replica": true,
"leader": 0,
"replica": 0
},
"topic": {
"name": "test-metricbeat-8760238589576171408"
}
}
},
"metricset": {
Expand Down
41 changes: 39 additions & 2 deletions metricbeat/module/kafka/partition/_meta/fields.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,49 @@
type: long
description: >
Oldest offset of the partition.
- name: partition
type: group
description: >
Partition data.
fields:
- name: id
type: long
description: >
Partition id.
- name: leader
type: long
description: >
Leader id (broker).
- name: isr
type: list
description: >
List of isr ids.
- name: replica
type: long
description: >
Replica id (broker).
- name: insync_replica
type: boolean
description: >
Indicates if replica is included in the in-sync replicate set (ISR).
- name: error.code
type: long
description: >
Error code from fetching partition.
- name: topic.error.code
type: long
description: >
Partition id.
- name: topic
topic error code.
- name: topic.name
type: keyword
description: >
Topic name
- name: broker.id
type: long
description: >
Expand All @@ -32,3 +67,5 @@
type: keyword
description: >
Broker address
Loading

0 comments on commit 3d60927

Please sign in to comment.