Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EventHubs] add fixed backoff retry mode #21884

Merged
merged 10 commits into from
Jan 4, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions sdk/eventhub/azure-eventhub/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

### Features Added

- Added support for fixed (linear) retry backoff:
- Sync/async `EventHubProducerClient` and `EventHubConsumerClient` constructors and `from_connection_string` take `retry_mode` as a keyword argument.
- `RetryMode` enum has been added to `azure.eventhub`, with values `Fixed` and `Exponential`.

### Breaking Changes

### Bugs Fixed
Expand Down
4 changes: 3 additions & 1 deletion sdk/eventhub/azure-eventhub/azure/eventhub/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
parse_connection_string,
EventHubConnectionStringProperties
)
from ._retry import RetryMode

TransportType = constants.TransportType

Expand All @@ -33,5 +34,6 @@
"LoadBalancingStrategy",
"PartitionContext",
"parse_connection_string",
"EventHubConnectionStringProperties"
"EventHubConnectionStringProperties",
"RetryMode"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are API changes; remember to ask @annatisch to take a look at the API view before CC day :)

]
14 changes: 11 additions & 3 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_client_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

from .exceptions import _handle_exception, ClientClosedError, ConnectError
from ._configuration import Configuration
from ._retry import RetryMode
from ._utils import utc_from_timestamp, parse_sas_credential
from ._connection_manager import get_connection_manager
from ._constants import (
Expand Down Expand Up @@ -154,6 +155,13 @@ def _build_uri(address, entity):
address += "/" + str(entity)
return address

def _get_backoff_time(retry_mode, backoff_factor, retried_times):
    """Compute how long to back off before the next retry attempt.

    :param retry_mode: The retry strategy; it is compared against
     ``RetryMode.Fixed``, and every other value takes the exponential path.
    :param backoff_factor: The base delay.
    :param retried_times: The retry count used as the exponent in the
     exponential path.
    :returns: ``backoff_factor`` unchanged in fixed mode, otherwise
     ``backoff_factor * (2 ** retried_times)``.
    """
    if retry_mode == RetryMode.Fixed:
        # Fixed mode: the delay does not depend on the retry count.
        return backoff_factor
    # Anything else is handled as exponential: the delay doubles per retry.
    return backoff_factor * (2 ** retried_times)
swathipil marked this conversation as resolved.
Show resolved Hide resolved


class EventHubSharedKeyCredential(object):
"""The shared access key credential used for authentication.
Expand Down Expand Up @@ -330,7 +338,7 @@ def _backoff(
):
# type: (int, Exception, Optional[int], Optional[str]) -> None
entity_name = entity_name or self._container_id
backoff = self._config.backoff_factor * 2 ** retried_times
backoff = _get_backoff_time(self._config.retry_mode, self._config.backoff_factor, retried_times)
if backoff <= self._config.backoff_max and (
timeout_time is None or time.time() + backoff <= timeout_time
): # pylint:disable=no-else-return
Expand Down Expand Up @@ -358,9 +366,9 @@ def _management_request(self, mgmt_msg, op_type):
self._mgmt_target, auth=mgmt_auth, debug=self._config.network_tracing
)
try:
conn = self._conn_manager.get_connection(
conn = self._conn_manager.get_connection( # pylint:disable=assignment-from-none
self._address.hostname, mgmt_auth
) # pylint:disable=assignment-from-none
)
mgmt_client.open(connection=conn)
mgmt_msg.application_properties["security_token"] = mgmt_auth.token
response = mgmt_client.mgmt_request(
Expand Down
3 changes: 2 additions & 1 deletion sdk/eventhub/azure-eventhub/azure/eventhub/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def __init__(self, body=None):
# Internal usage only for transforming AmqpAnnotatedMessage to outgoing EventData
self._raw_amqp_message = AmqpAnnotatedMessage( # type: ignore
data_body=body, annotations={}, application_properties={}
)
)
self.message = (self._raw_amqp_message._message) # pylint:disable=protected-access
self._raw_amqp_message.header = AmqpMessageHeader()
self._raw_amqp_message.properties = AmqpMessageProperties()
Expand Down Expand Up @@ -181,6 +181,7 @@ def _from_message(cls, message, raw_amqp_message=None):
"""
event_data = cls(body="")
event_data.message = message
# pylint: disable=protected-access
event_data._raw_amqp_message = raw_amqp_message if raw_amqp_message else AmqpAnnotatedMessage(message=message)
return event_data

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@
from urllib.parse import urlparse

from uamqp.constants import TransportType, DEFAULT_AMQPS_PORT, DEFAULT_AMQP_WSS_PORT
from ._retry import RetryMode


class Configuration(object): # pylint:disable=too-many-instance-attributes
def __init__(self, **kwargs):
self.user_agent = kwargs.get("user_agent") # type: Optional[str]
self.retry_total = kwargs.get("retry_total", 3) # type: int
self.max_retries = self.retry_total # type: int
self.retry_mode = kwargs.get("retry_mode", RetryMode.Exponential)
self.backoff_factor = kwargs.get("retry_backoff_factor", 0.8) # type: float
self.backoff_max = kwargs.get("retry_backoff_max", 120) # type: int
self.network_tracing = kwargs.get("network_tracing", False) # type: bool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@
# --------------------------------------------------------------------------------------------

from typing import TYPE_CHECKING
from threading import Lock
from enum import Enum

from uamqp import Connection, TransportType, c_uamqp
from uamqp import Connection

if TYPE_CHECKING:
from uamqp.authentication import JWTTokenAuth
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ class EventHubConsumerClient(ClientBase):
The failed internal partition consumer will be closed (`on_partition_close` will be called if provided) and
new internal partition consumer will be created (`on_partition_initialize` will be called if provided) to resume
receiving.
:keyword retry_mode: Fixed or exponential delay between attempts, default is exponential.
swathipil marked this conversation as resolved.
Show resolved Hide resolved
:paramtype retry_mode: ~azure.eventhub.RetryMode
:keyword float idle_timeout: Timeout, in seconds, after which this client will close the underlying connection
if there is no further activity. By default the value is None, meaning that the client will not shutdown due to
inactivity unless initiated by the service.
Expand Down Expand Up @@ -219,6 +221,8 @@ def from_connection_string(cls, conn_str, consumer_group, **kwargs):
information. The failed internal partition consumer will be closed (`on_partition_close` will be called
if provided) and new internal partition consumer will be created (`on_partition_initialize` will be called if
provided) to resume receiving.
:keyword retry_mode: Fixed or exponential delay between attempts, default is exponential.
:paramtype retry_mode: ~azure.eventhub.RetryMode
:keyword float idle_timeout: Timeout, in seconds, after which this client will close the underlying connection
if there is no further activity. By default the value is None, meaning that the client will not shutdown due
to inactivity unless initiated by the service.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ class EventHubProducerClient(ClientBase):
:keyword str user_agent: If specified, this will be added in front of the user agent string.
:keyword int retry_total: The total number of attempts to redo a failed operation when an error occurs. Default
value is 3.
:keyword retry_mode: Fixed or exponential delay between attempts, default is exponential.
:paramtype retry_mode: ~azure.eventhub.RetryMode
:keyword float idle_timeout: Timeout, in seconds, after which this client will close the underlying connection
if there is no activity. By default the value is None, meaning that the client will not shutdown due to inactivity
unless initiated by the service.
Expand Down Expand Up @@ -178,6 +180,8 @@ def from_connection_string(cls, conn_str, **kwargs):
:keyword str user_agent: If specified, this will be added in front of the user agent string.
:keyword int retry_total: The total number of attempts to redo a failed operation when an error occurs.
Default value is 3.
:keyword retry_mode: Fixed or exponential delay between attempts, default is exponential.
:paramtype retry_mode: ~azure.eventhub.RetryMode
:keyword float idle_timeout: Timeout, in seconds, after which this client will close the underlying connection
if there is no activity. By default the value is None, meaning that the client will not shutdown due to
inactivity unless initiated by the service.
Expand Down
13 changes: 13 additions & 0 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from typing import Optional, Dict, Any

from enum import Enum

# INCLUDE?: _LOGGER = logging.getLogger(__name__)
swathipil marked this conversation as resolved.
Show resolved Hide resolved

class RetryMode(str, Enum):
    """Selects how the delay between retry attempts is computed.

    Members are also ``str`` instances (the class mixes in ``str``), so each
    member compares equal to its lowercase string value.
    """
    # Delay grows with the retry count: backoff_factor * (2 ** retried_times).
    Exponential = 'exponential'
    # Delay is the same for every attempt: backoff_factor.
    Fixed = 'fixed'
swathipil marked this conversation as resolved.
Show resolved Hide resolved
14 changes: 7 additions & 7 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import datetime
import calendar
import logging
from typing import TYPE_CHECKING, Type, Optional, Dict, Union, Any, Iterable, Tuple, List, Mapping
from typing import TYPE_CHECKING, Type, Optional, Dict, Union, Any, Iterable, Tuple, Mapping

import six

Expand Down Expand Up @@ -172,9 +172,9 @@ def trace_message(event, parent_span=None):


def get_event_links(events):
trace_events = (
events if isinstance(events, Iterable) else (events,)
) # pylint:disable=isinstance-second-argument-not-valid-type
trace_events = (
events if isinstance(events, Iterable) else (events,) # pylint:disable=isinstance-second-argument-not-valid-type
)
links = []
try:
for event in trace_events: # type: ignore
Expand Down Expand Up @@ -309,14 +309,14 @@ def decode_with_recurse(data, encoding="UTF-8"):
return data
if isinstance(data, six.binary_type):
return data.decode(encoding)
if isinstance(data, Mapping):
if isinstance(data, Mapping): # pylint:disable=isinstance-second-argument-not-valid-type
decoded_mapping = {}
for k,v in data.items():
for k, v in data.items():
decoded_key = decode_with_recurse(k, encoding)
decoded_val = decode_with_recurse(v, encoding)
decoded_mapping[decoded_key] = decoded_val
return decoded_mapping
if isinstance(data, Iterable):
if isinstance(data, Iterable): # pylint:disable=isinstance-second-argument-not-valid-type
decoded_list = []
for d in data:
decoded_list.append(decode_with_recurse(d, encoding))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#--------------------------------------------------------------------------

import sys
import asyncio

def get_dict_with_loop_if_needed(loop):
if sys.version_info >= (3, 10):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
)
from azure.core.credentials import AccessToken, AzureSasCredential, AzureNamedKeyCredential

from .._client_base import ClientBase, _generate_sas_token, _parse_conn_str
from .._client_base import ClientBase, _generate_sas_token, _parse_conn_str, _get_backoff_time
from .._utils import utc_from_timestamp, parse_sas_credential
from ..exceptions import ClientClosedError, ConnectError
from .._constants import (
Expand Down Expand Up @@ -211,7 +211,7 @@ async def _backoff_async(
entity_name: Optional[str] = None,
) -> None:
entity_name = entity_name or self._container_id
backoff = self._config.backoff_factor * 2 ** retried_times
backoff = _get_backoff_time(self._config.retry_mode, self._config.backoff_factor, retried_times)
if backoff <= self._config.backoff_max and (
timeout_time is None or time.time() + backoff <= timeout_time
): # pylint:disable=no-else-return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@
# --------------------------------------------------------------------------------------------

from typing import TYPE_CHECKING
from asyncio import Lock

from uamqp import TransportType, c_uamqp
from uamqp.async_ops import ConnectionAsync

if TYPE_CHECKING:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from ._client_base_async import ClientBaseAsync
from .._constants import ALL_PARTITIONS
from .._eventprocessor.common import LoadBalancingStrategy
from .._retry import RetryMode


if TYPE_CHECKING:
Expand Down Expand Up @@ -79,6 +80,8 @@ class EventHubConsumerClient(ClientBaseAsync):
The failed internal partition consumer will be closed (`on_partition_close` will be called if provided) and
new internal partition consumer will be created (`on_partition_initialize` will be called if provided) to resume
receiving.
:keyword retry_mode: Fixed or exponential delay between attempts, default is exponential.
:paramtype retry_mode: ~azure.eventhub.RetryMode
:keyword float idle_timeout: Timeout, in seconds, after which this client will close the underlying connection
if there is no further activity. By default the value is None, meaning that the client will not shutdown due to
inactivity unless initiated by the service.
Expand Down Expand Up @@ -239,6 +242,8 @@ def from_connection_string(
information. The failed internal partition consumer will be closed (`on_partition_close` will be called
if provided) and new internal partition consumer will be created (`on_partition_initialize` will be called if
provided) to resume receiving.
:keyword retry_mode: Fixed or exponential delay between attempts, default is exponential.
:paramtype retry_mode: ~azure.eventhub.RetryMode
:keyword float idle_timeout: Timeout, in seconds, after which this client will close the underlying connection
if there is no further activity. By default the value is None, meaning that the client will not shutdown due
to inactivity unless initiated by the service.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from ._producer_async import EventHubProducer
from .._constants import ALL_PARTITIONS
from .._common import EventDataBatch, EventData
from .._retry import RetryMode

if TYPE_CHECKING:
from azure.core.credentials_async import AsyncTokenCredential
Expand Down Expand Up @@ -46,6 +47,8 @@ class EventHubProducerClient(ClientBaseAsync):
:keyword str user_agent: If specified, this will be added in front of the user agent string.
:keyword int retry_total: The total number of attempts to redo a failed operation when an error occurs. Default
value is 3.
:keyword retry_mode: Fixed or exponential delay between attempts, default is exponential.
:paramtype retry_mode: ~azure.eventhub.RetryMode
:keyword float idle_timeout: Timeout, in seconds, after which this client will close the underlying connection
if there is no activity. By default the value is None, meaning that the client will not shutdown due to inactivity
unless initiated by the service.
Expand Down Expand Up @@ -196,6 +199,8 @@ def from_connection_string(
:keyword str user_agent: If specified, this will be added in front of the user agent string.
:keyword int retry_total: The total number of attempts to redo a failed operation when an error occurs.
Default value is 3.
:keyword retry_mode: Fixed or exponential delay between attempts, default is exponential.
:paramtype retry_mode: ~azure.eventhub.RetryMode
:keyword float idle_timeout: Timeout, in seconds, after which this client will close the underlying connection
if there is no activity. By default the value is None, meaning that the client will not shutdown due to
inactivity unless initiated by the service.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
from azure.eventhub import EventHubProducerClient, EventHubConsumerClient, TransportType
#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------

import time
from azure.eventhub import EventHubProducerClient, EventHubConsumerClient, TransportType, RetryMode


def test_custom_endpoint():
Expand Down Expand Up @@ -123,3 +130,19 @@ def test_custom_certificate():
connection_verify='D:/local/certfile'
)
assert consumer._config.connection_verify == 'D:/local/certfile'

def test_backoff_fixed_retry():
    """A client built with ``RetryMode.Fixed`` backs off for less time than
    the exponential formula would require at the same retry count."""
    producer = EventHubProducerClient(
        'fake.host.com',
        'fake_eh',
        None,
        retry_mode=RetryMode.Fixed
    )
    fixed_delay = producer._config.backoff_factor
    # With retried_times=1 the exponential formula gives
    # backoff_factor * (2 ** 1), i.e. 0.8 * 2 = 1.6 with the default factor.
    # An exponential backoff would sleep AT LEAST that long, so finishing
    # sooner shows the fixed delay was the one applied.
    exponential_delay = fixed_delay * (2 ** 1)

    began = time.time()
    producer._backoff(retried_times=1, last_exception=Exception('fake'), timeout_time=None)
    elapsed = time.time() - began

    assert elapsed < exponential_delay