Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

opentelemetry-instrumentation-kafka-python: wait for metadata #1260

Merged
merged 20 commits into from
Nov 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
c5a026f
fix kafka: wait for metadata
rayrapetyan Sep 1, 2022
2733cd8
upd: changelog
rayrapetyan Sep 1, 2022
e60c6a4
fix: changelog
rayrapetyan Sep 1, 2022
bfd272c
Merge branch 'main' into fix_kafka_wait_metadata
srikanthccv Sep 13, 2022
6bef7c4
fix: import KafkaErrors
rayrapetyan Sep 14, 2022
4cf76da
Merge branch 'main' into fix_kafka_wait_metadata
rayrapetyan Sep 14, 2022
38b9bbf
Merge branch 'main' into fix_kafka_wait_metadata
srikanthccv Sep 15, 2022
37b772d
fix: tox -e lint errors
rayrapetyan Sep 15, 2022
39b38bb
Merge branch 'fix_kafka_wait_metadata' of gitpro.ttaallkk.top-rayrapetyan:rayr…
rayrapetyan Sep 15, 2022
79a7c63
Merge branch 'open-telemetry:main' into fix_kafka_wait_metadata
rayrapetyan Sep 15, 2022
df3fe62
Merge branch 'main' into fix_kafka_wait_metadata
lzchen Sep 20, 2022
23b117e
Merge branch 'main' into fix_kafka_wait_metadata
srikanthccv Sep 20, 2022
9f442ec
Merge branch 'main' into fix_kafka_wait_metadata
rayrapetyan Sep 25, 2022
1a149c4
fix: refact and added unit test
rayrapetyan Sep 25, 2022
02f8b7a
Merge branch 'main' into fix_kafka_wait_metadata
srikanthccv Oct 1, 2022
6f69e94
Merge branch 'main' into fix_kafka_wait_metadata
srikanthccv Oct 5, 2022
42bbdf6
Merge branch 'main' into fix_kafka_wait_metadata
rayrapetyan Oct 10, 2022
25b4fb6
Merge branch 'main' into fix_kafka_wait_metadata
srikanthccv Oct 10, 2022
2575b24
Merge branch 'main' into fix_kafka_wait_metadata
srikanthccv Oct 11, 2022
bd08811
Merge branch 'main' into fix_kafka_wait_metadata
ocelotl Nov 15, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed

- `opentelemetry-instrumentation-kafka-python`: wait for metadata
([#1260](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1260))
- `opentelemetry-instrumentation-boto3sqs` Make propagation compatible with other SQS instrumentations, add 'messaging.url' span attribute, and fix missing package dependencies.
([#1234](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1234))
- `opentelemetry-instrumentation-pymongo` Change span names to not contain queries but only database name and command name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ def extract_send_partition(instance, args, kwargs):
):
return None

all_partitions = instance._metadata.partitions_for_topic(topic)
if all_partitions is None or len(all_partitions) == 0:
return None
instance._wait_on_metadata(
topic, instance.config["max_block_ms"] / 1000.0
)

return instance._partition(
topic, partition, key, value, key_bytes, value_bytes
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from unittest import TestCase, mock

from opentelemetry.instrumentation.kafka.utils import (
KafkaPropertiesExtractor,
_create_consumer_span,
_get_span_name,
_kafka_getter,
Expand Down Expand Up @@ -208,3 +209,28 @@ def test_create_consumer_span(
span, record, self.args, self.kwargs
)
detach.assert_called_once_with(attach.return_value)

@mock.patch(
"opentelemetry.instrumentation.kafka.utils.KafkaPropertiesExtractor"
)
def test_kafka_properties_extractor(
self,
kafka_properties_extractor: mock.MagicMock,
):
kafka_properties_extractor._serialize.return_value = None
kafka_properties_extractor._partition.return_value = "partition"
assert (
KafkaPropertiesExtractor.extract_send_partition(
kafka_properties_extractor, self.args, self.kwargs
)
== "partition"
)
kafka_properties_extractor._wait_on_metadata.side_effect = Exception(
"mocked error"
)
assert (
KafkaPropertiesExtractor.extract_send_partition(
kafka_properties_extractor, self.args, self.kwargs
)
is None
)