Skip to content

Commit

Permalink
raise error for loop if Python 3.10 (#261)
Browse files Browse the repository at this point in the history
* raise error for loop

* update sample

* adams comments

* adams comments

* fix

* update impl

* add loop props

* update test

* create folder

* comment test and add later

* add async loop test to async samples

* remove unnecessary imports
  • Loading branch information
swathipil authored Aug 10, 2021
1 parent 23f30b9 commit 95ef0cb
Show file tree
Hide file tree
Showing 11 changed files with 157 additions and 76 deletions.
1 change: 1 addition & 0 deletions dev_requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
cython==0.29.21
setuptools>=27.1.2
wheel>=0.32.0
pytest==6.2.4; python_version >= '3.10'
pytest==5.4.1; python_version >= '3.6'
pytest==4.6.9; python_version == '2.7'
pytest-asyncio==0.10.0; python_version >= '3.6'
Expand Down
4 changes: 2 additions & 2 deletions samples/asynctests/test_azure_iothub_cli_extension.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def executor(target, consumer_group, enqueued_time, device_id=None, properties=N
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

future = asyncio.gather(*coroutines, loop=loop, return_exceptions=True)
future = asyncio.gather(*coroutines, return_exceptions=True)
result = None

try:
Expand All @@ -112,7 +112,7 @@ def stop_and_suppress_eloop():
except KeyboardInterrupt:
print('Stopping event monitor...')
remaining_tasks = [t for t in asyncio.Task.all_tasks() if not t.done()]
remaining_future = asyncio.gather(*remaining_tasks, loop=loop, return_exceptions=True)
remaining_future = asyncio.gather(*remaining_tasks, return_exceptions=True)
try:
loop.run_until_complete(asyncio.wait_for(remaining_future, 5))
except concurrent.futures.TimeoutError:
Expand Down
59 changes: 59 additions & 0 deletions samples/asynctests/test_loop_param_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import sys
import pytest

import asyncio
from uamqp.async_ops.mgmt_operation_async import MgmtOperationAsync
from uamqp.async_ops.receiver_async import MessageReceiverAsync
from uamqp.authentication.cbs_auth_async import CBSAsyncAuthMixin
from uamqp.async_ops.sender_async import MessageSenderAsync
from uamqp.async_ops.client_async import (
AMQPClientAsync,
SendClientAsync,
ReceiveClientAsync,
ConnectionAsync,
)

@pytest.mark.asyncio
@pytest.mark.skipif(sys.version_info < (3, 10), reason="raise error if loop passed in >=3.10")
async def test_error_loop_arg_async():
with pytest.raises(ValueError) as e:
AMQPClientAsync("fake_addr", loop=asyncio.get_event_loop())
assert "no longer supports loop" in e
client_async = AMQPClientAsync("sb://resourcename.servicebus.windows.net/")
assert len(client_async._internal_kwargs) == 0 # pylint:disable=protected-access

with pytest.raises(ValueError) as e:
SendClientAsync("fake_addr", loop=asyncio.get_event_loop())
assert "no longer supports loop" in e
client_async = SendClientAsync("sb://resourcename.servicebus.windows.net/")
assert len(client_async._internal_kwargs) == 0 # pylint:disable=protected-access

with pytest.raises(ValueError) as e:
ReceiveClientAsync("fake_addr", loop=asyncio.get_event_loop())
assert "no longer supports loop" in e
client_async = ReceiveClientAsync("sb://resourcename.servicebus.windows.net/")
assert len(client_async._internal_kwargs) == 0 # pylint:disable=protected-access

with pytest.raises(ValueError) as e:
ConnectionAsync("fake_addr", sasl='fake_sasl', loop=asyncio.get_event_loop())
assert "no longer supports loop" in e

with pytest.raises(ValueError) as e:
MgmtOperationAsync("fake_addr", loop=asyncio.get_event_loop())
assert "no longer supports loop" in e

with pytest.raises(ValueError) as e:
MessageReceiverAsync("fake_addr", "session", "target", "on_message_received", loop=asyncio.get_event_loop())
assert "no longer supports loop" in e

with pytest.raises(ValueError) as e:
MessageSenderAsync("fake_addr", "source", "target", loop=asyncio.get_event_loop())
assert "no longer supports loop" in e

async def auth_async_loop():
auth_async = CBSAsyncAuthMixin()
with pytest.raises(ValueError) as e:
await auth_async.create_authenticator_async("fake_conn", loop=asyncio.get_event_loop())
assert "no longer supports loop" in e
loop = asyncio.get_event_loop()
loop.run_until_complete(auth_async_loop())
68 changes: 30 additions & 38 deletions uamqp/async_ops/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
import uuid

from uamqp import address, authentication, client, constants, errors, compat, c_uamqp
from uamqp.utils import get_running_loop
from uamqp.async_ops.connection_async import ConnectionAsync
from uamqp.async_ops.receiver_async import MessageReceiverAsync
from uamqp.async_ops.sender_async import MessageSenderAsync
from uamqp.async_ops.session_async import SessionAsync
from uamqp.async_ops.utils import get_dict_with_loop_if_needed

try:
TimeoutException = TimeoutError
Expand All @@ -43,8 +43,6 @@ class AMQPClientAsync(client.AMQPClient):
:param client_name: The name for the client, also known as the Container ID.
If no name is provided, a random GUID will be used.
:type client_name: str or bytes
:param loop: A user specified event loop.
:type loop: ~asycnio.AbstractEventLoop
:param debug: Whether to turn on network trace logs. If `True`, trace logs
will be logged at INFO level. Default is `False`.
:type debug: bool
Expand Down Expand Up @@ -105,14 +103,7 @@ def __init__(
keep_alive_interval=None,
**kwargs):

if loop:
self.loop = loop
else:
try:
if not self.loop: # from sub class instance
self.loop = get_running_loop()
except AttributeError:
self.loop = get_running_loop()
self._internal_kwargs = get_dict_with_loop_if_needed(loop)

super(AMQPClientAsync, self).__init__(
remote_address,
Expand Down Expand Up @@ -146,9 +137,9 @@ async def _keep_alive_async(self):
_logger.info("Keeping %r connection alive. %r",
self.__class__.__name__,
self._connection.container_id)
await asyncio.shield(self._connection.work_async(), loop=self.loop)
await asyncio.shield(self._connection.work_async(), **self._internal_kwargs)
start_time = current_time
await asyncio.sleep(1, loop=self.loop)
await asyncio.sleep(1, **self._internal_kwargs)
except Exception as e: # pylint: disable=broad-except
_logger.info("Connection keep-alive for %r failed: %r.", self.__class__.__name__, e)

Expand All @@ -163,7 +154,7 @@ async def _client_ready_async(self): # pylint: disable=no-self-use

async def _client_run_async(self):
"""Perform a single Connection iteration."""
await asyncio.shield(self._connection.work_async(), loop=self.loop)
await asyncio.shield(self._connection.work_async(), **self._internal_kwargs)

async def _redirect_async(self, redirect, auth):
"""Redirect the client endpoint using a Link DETACH redirect
Expand All @@ -177,7 +168,7 @@ async def _redirect_async(self, redirect, auth):
# pylint: disable=protected-access
if not self._connection._cbs:
_logger.info("Closing non-CBS session.")
await asyncio.shield(self._session.destroy_async(), loop=self.loop)
await asyncio.shield(self._session.destroy_async(), **self._internal_kwargs)
self._session = None
self._auth = auth
self._hostname = self._remote_address.hostname
Expand All @@ -197,8 +188,8 @@ async def _build_session_async(self):
outgoing_window=self._outgoing_window,
handle_max=self._handle_max,
on_attach=self._on_attach,
loop=self.loop),
loop=self.loop)
**self._internal_kwargs),
**self._internal_kwargs)
self._session = self._auth._session # pylint: disable=protected-access
elif self._connection._cbs:
self._session = self._auth._session # pylint: disable=protected-access
Expand All @@ -209,7 +200,11 @@ async def _build_session_async(self):
outgoing_window=self._outgoing_window,
handle_max=self._handle_max,
on_attach=self._on_attach,
loop=self.loop)
**self._internal_kwargs)

@property
def loop(self):
return self._internal_kwargs.get("loop")

async def open_async(self, connection=None):
"""Asynchronously open the client. The client can create a new Connection
Expand Down Expand Up @@ -242,10 +237,10 @@ async def open_async(self, connection=None):
remote_idle_timeout_empty_frame_send_ratio=self._remote_idle_timeout_empty_frame_send_ratio,
error_policy=self._error_policy,
debug=self._debug_trace,
loop=self.loop)
**self._internal_kwargs)
await self._build_session_async()
if self._keep_alive_interval:
self._keep_alive_thread = asyncio.ensure_future(self._keep_alive_async(), loop=self.loop)
self._keep_alive_thread = asyncio.ensure_future(self._keep_alive_async(), **self._internal_kwargs)
finally:
if self._ext_connection:
connection.release_async()
Expand All @@ -267,13 +262,13 @@ async def close_async(self):
return # already closed.
if not self._connection._cbs: # pylint: disable=protected-access
_logger.info("Closing non-CBS session.")
await asyncio.shield(self._session.destroy_async(), loop=self.loop)
await asyncio.shield(self._session.destroy_async(), **self._internal_kwargs)
else:
_logger.info("CBS session pending %r.", self._connection.container_id)
self._session = None
if not self._ext_connection:
_logger.info("Closing exclusive connection %r.", self._connection.container_id)
await asyncio.shield(self._connection.destroy_async(), loop=self.loop)
await asyncio.shield(self._connection.destroy_async(), **self._internal_kwargs)
else:
_logger.info("Shared connection remaining open.")
self._connection = None
Expand Down Expand Up @@ -314,7 +309,7 @@ async def mgmt_request_async(self, message, operation, op_type=None, node=None,
:rtype: ~uamqp.message.Message
"""
while not await self.auth_complete_async():
await asyncio.sleep(0.05, loop=self.loop)
await asyncio.sleep(0.05, **self._internal_kwargs)
response = await asyncio.shield(
self._session.mgmt_request_async(
message,
Expand All @@ -325,7 +320,7 @@ async def mgmt_request_async(self, message, operation, op_type=None, node=None,
encoding=self._encoding,
debug=self._debug_trace,
**kwargs),
loop=self.loop)
**self._internal_kwargs)
return response

async def auth_complete_async(self):
Expand Down Expand Up @@ -396,8 +391,6 @@ class SendClientAsync(client.SendClient, AMQPClientAsync):
:param client_name: The name for the client, also known as the Container ID.
If no name is provided, a random GUID will be used.
:type client_name: str or bytes
:param loop: A user specified event loop.
:type loop: ~asycnio.AbstractEventLoop
:param debug: Whether to turn on network trace logs. If `True`, trace logs
will be logged at INFO level. Default is `False`.
:type debug: bool
Expand Down Expand Up @@ -474,7 +467,7 @@ def __init__(
error_policy=None,
keep_alive_interval=None,
**kwargs):
self.loop = loop or get_running_loop()
self._internal_kwargs = get_dict_with_loop_if_needed(loop)
client.SendClient.__init__(
self,
target,
Expand All @@ -488,7 +481,7 @@ def __init__(

# AMQP object settings
self.sender_type = MessageSenderAsync
self._pending_messages_lock = asyncio.Lock(loop=self.loop)
self._pending_messages_lock = asyncio.Lock(**self._internal_kwargs)

async def _client_ready_async(self):
"""Determine whether the client is ready to start sending messages.
Expand All @@ -513,8 +506,8 @@ async def _client_ready_async(self):
error_policy=self._error_policy,
encoding=self._encoding,
desired_capabilities=self._desired_capabilities,
loop=self.loop)
await asyncio.shield(self.message_handler.open_async(), loop=self.loop)
**self._internal_kwargs)
await asyncio.shield(self.message_handler.open_async(), **self._internal_kwargs)
return False
if self.message_handler.get_state() == constants.MessageSenderState.Error:
raise errors.MessageHandlerError(
Expand All @@ -528,7 +521,8 @@ async def _client_ready_async(self):
async def _transfer_message_async(self, message, timeout):
sent = await asyncio.shield(
self.message_handler.send_async(message, self._on_message_sent, timeout=timeout),
loop=self.loop)
**self._internal_kwargs
)
if not sent:
_logger.info("Message not sent, raising RuntimeError.")
raise RuntimeError("Message sender failed to add message data to outgoing queue.")
Expand Down Expand Up @@ -567,7 +561,7 @@ async def _client_run_async(self):
"""
# pylint: disable=protected-access
await self.message_handler.work_async()
await asyncio.shield(self._connection.work_async(), loop=self.loop)
await asyncio.shield(self._connection.work_async(), **self._internal_kwargs)
if self._connection._state == c_uamqp.ConnectionState.DISCARDING:
raise errors.ConnectionClose(constants.ErrorCodes.InternalServerError)
self._waiting_messages = 0
Expand Down Expand Up @@ -692,8 +686,6 @@ class ReceiveClientAsync(client.ReceiveClient, AMQPClientAsync):
:param client_name: The name for the client, also known as the Container ID.
If no name is provided, a random GUID will be used.
:type client_name: str or bytes
:param loop: A user specified event loop.
:type loop: ~asycnio.AbstractEventLoop
:param debug: Whether to turn on network trace logs. If `True`, trace logs
will be logged at INFO level. Default is `False`.
:type debug: bool
Expand Down Expand Up @@ -782,7 +774,7 @@ def __init__(
error_policy=None,
keep_alive_interval=None,
**kwargs):
self.loop = loop or get_running_loop()
self._internal_kwargs = get_dict_with_loop_if_needed(loop)
client.ReceiveClient.__init__(
self,
source,
Expand Down Expand Up @@ -823,8 +815,8 @@ async def _client_ready_async(self):
error_policy=self._error_policy,
encoding=self._encoding,
desired_capabilities=self._desired_capabilities,
loop=self.loop)
await asyncio.shield(self.message_handler.open_async(), loop=self.loop)
)
await asyncio.shield(self.message_handler.open_async(), **self._internal_kwargs)
return False
if self.message_handler.get_state() == constants.MessageReceiverState.Error:
raise errors.MessageHandlerError(
Expand All @@ -850,7 +842,7 @@ async def _client_run_async(self):
now = self._counter.get_current_ms()
if self._last_activity_timestamp and not self._was_message_received:
# If no messages are coming through, back off a little to keep CPU use low.
await asyncio.sleep(0.05, loop=self.loop)
await asyncio.sleep(0.05, **self._internal_kwargs)
if self._timeout > 0:
timespan = now - self._last_activity_timestamp
if timespan >= self._timeout:
Expand Down
21 changes: 12 additions & 9 deletions uamqp/async_ops/connection_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@
# license information.
#--------------------------------------------------------------------------

import sys
import asyncio
import logging

import uamqp
from uamqp import c_uamqp, connection
from uamqp.utils import get_running_loop
from uamqp.async_ops.utils import get_dict_with_loop_if_needed

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -56,8 +57,6 @@ class ConnectionAsync(connection.Connection):
:param encoding: The encoding to use for parameters supplied as strings.
Default is 'UTF-8'
:type encoding: str
:param loop: A user specified event loop.
:type loop: ~asyncio.AbstractEventLoop
"""

def __init__(self, hostname, sasl,
Expand All @@ -71,7 +70,7 @@ def __init__(self, hostname, sasl,
debug=False,
encoding='UTF-8',
loop=None):
self.loop = loop or get_running_loop()
self._internal_kwargs = get_dict_with_loop_if_needed(loop)
super(ConnectionAsync, self).__init__(
hostname, sasl,
container_id=container_id,
Expand All @@ -83,7 +82,7 @@ def __init__(self, hostname, sasl,
error_policy=error_policy,
debug=debug,
encoding=encoding)
self._async_lock = asyncio.Lock(loop=self.loop)
self._async_lock = asyncio.Lock(**self._internal_kwargs)

async def __aenter__(self):
"""Open the Connection in an async context manager."""
Expand All @@ -105,8 +104,12 @@ async def _close_async(self):
self.auth.close()
_logger.info("Connection shutdown complete %r.", self.container_id)

@property
def loop(self):
return self._internal_kwargs.get("loop")

async def lock_async(self, timeout=3.0):
await asyncio.wait_for(self._async_lock.acquire(), timeout=timeout, loop=self.loop)
await asyncio.wait_for(self._async_lock.acquire(), timeout=timeout, **self._internal_kwargs)

def release_async(self):
try:
Expand Down Expand Up @@ -135,12 +138,12 @@ async def work_async(self):
if self._closing:
_logger.debug("Connection unlocked but shutting down.")
return
await asyncio.sleep(0, loop=self.loop)
await asyncio.sleep(0, **self._internal_kwargs)
self._conn.do_work()
except asyncio.TimeoutError:
_logger.debug("Connection %r timed out while waiting for lock acquisition.", self.container_id)
finally:
await asyncio.sleep(0, loop=self.loop)
await asyncio.sleep(0, **self._internal_kwargs)
self.release_async()

async def sleep_async(self, seconds):
Expand All @@ -151,7 +154,7 @@ async def sleep_async(self, seconds):
"""
try:
await self.lock_async()
await asyncio.sleep(seconds, loop=self.loop)
await asyncio.sleep(seconds, **self._internal_kwargs)
except asyncio.TimeoutError:
_logger.debug("Connection %r timed out while waiting for lock acquisition.", self.container_id)
finally:
Expand Down
Loading

0 comments on commit 95ef0cb

Please sign in to comment.