Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raise error for loop if Python 3.10 #261

Merged
merged 12 commits into from
Aug 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dev_requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
cython==0.29.21
setuptools>=27.1.2
wheel>=0.32.0
pytest==6.2.4; python_version >= '3.10'
pytest==5.4.1; python_version >= '3.6'
swathipil marked this conversation as resolved.
Show resolved Hide resolved
pytest==4.6.9; python_version == '2.7'
pytest-asyncio==0.10.0; python_version >= '3.6'
Expand Down
4 changes: 2 additions & 2 deletions samples/asynctests/test_azure_iothub_cli_extension.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def executor(target, consumer_group, enqueued_time, device_id=None, properties=N
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

future = asyncio.gather(*coroutines, loop=loop, return_exceptions=True)
future = asyncio.gather(*coroutines, return_exceptions=True)
result = None

try:
Expand All @@ -112,7 +112,7 @@ def stop_and_suppress_eloop():
except KeyboardInterrupt:
print('Stopping event monitor...')
remaining_tasks = [t for t in asyncio.Task.all_tasks() if not t.done()]
remaining_future = asyncio.gather(*remaining_tasks, loop=loop, return_exceptions=True)
remaining_future = asyncio.gather(*remaining_tasks, return_exceptions=True)
try:
loop.run_until_complete(asyncio.wait_for(remaining_future, 5))
except concurrent.futures.TimeoutError:
Expand Down
59 changes: 59 additions & 0 deletions samples/asynctests/test_loop_param_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import sys
import pytest

import asyncio
from uamqp.async_ops.mgmt_operation_async import MgmtOperationAsync
from uamqp.async_ops.receiver_async import MessageReceiverAsync
from uamqp.authentication.cbs_auth_async import CBSAsyncAuthMixin
from uamqp.async_ops.sender_async import MessageSenderAsync
from uamqp.async_ops.client_async import (
AMQPClientAsync,
SendClientAsync,
ReceiveClientAsync,
ConnectionAsync,
)

@pytest.mark.asyncio
@pytest.mark.skipif(sys.version_info < (3, 10), reason="raise error if loop passed in >=3.10")
async def test_error_loop_arg_async():
with pytest.raises(ValueError) as e:
AMQPClientAsync("fake_addr", loop=asyncio.get_event_loop())
assert "no longer supports loop" in e
client_async = AMQPClientAsync("sb://resourcename.servicebus.windows.net/")
assert len(client_async._internal_kwargs) == 0 # pylint:disable=protected-access

with pytest.raises(ValueError) as e:
SendClientAsync("fake_addr", loop=asyncio.get_event_loop())
assert "no longer supports loop" in e
client_async = SendClientAsync("sb://resourcename.servicebus.windows.net/")
assert len(client_async._internal_kwargs) == 0 # pylint:disable=protected-access

with pytest.raises(ValueError) as e:
ReceiveClientAsync("fake_addr", loop=asyncio.get_event_loop())
assert "no longer supports loop" in e
client_async = ReceiveClientAsync("sb://resourcename.servicebus.windows.net/")
assert len(client_async._internal_kwargs) == 0 # pylint:disable=protected-access

with pytest.raises(ValueError) as e:
ConnectionAsync("fake_addr", sasl='fake_sasl', loop=asyncio.get_event_loop())
assert "no longer supports loop" in e

with pytest.raises(ValueError) as e:
MgmtOperationAsync("fake_addr", loop=asyncio.get_event_loop())
assert "no longer supports loop" in e

with pytest.raises(ValueError) as e:
MessageReceiverAsync("fake_addr", "session", "target", "on_message_received", loop=asyncio.get_event_loop())
assert "no longer supports loop" in e

with pytest.raises(ValueError) as e:
MessageSenderAsync("fake_addr", "source", "target", loop=asyncio.get_event_loop())
assert "no longer supports loop" in e

async def auth_async_loop():
auth_async = CBSAsyncAuthMixin()
with pytest.raises(ValueError) as e:
await auth_async.create_authenticator_async("fake_conn", loop=asyncio.get_event_loop())
assert "no longer supports loop" in e
loop = asyncio.get_event_loop()
loop.run_until_complete(auth_async_loop())
68 changes: 30 additions & 38 deletions uamqp/async_ops/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
import uuid

from uamqp import address, authentication, client, constants, errors, compat, c_uamqp
from uamqp.utils import get_running_loop
from uamqp.async_ops.connection_async import ConnectionAsync
from uamqp.async_ops.receiver_async import MessageReceiverAsync
from uamqp.async_ops.sender_async import MessageSenderAsync
from uamqp.async_ops.session_async import SessionAsync
from uamqp.async_ops.utils import get_dict_with_loop_if_needed

try:
TimeoutException = TimeoutError
Expand All @@ -43,8 +43,6 @@ class AMQPClientAsync(client.AMQPClient):
:param client_name: The name for the client, also known as the Container ID.
If no name is provided, a random GUID will be used.
:type client_name: str or bytes
:param loop: A user specified event loop.
:type loop: ~asycnio.AbstractEventLoop
yunhaoling marked this conversation as resolved.
Show resolved Hide resolved
:param debug: Whether to turn on network trace logs. If `True`, trace logs
will be logged at INFO level. Default is `False`.
:type debug: bool
Expand Down Expand Up @@ -105,14 +103,7 @@ def __init__(
keep_alive_interval=None,
**kwargs):

if loop:
self.loop = loop
else:
try:
if not self.loop: # from sub class instance
self.loop = get_running_loop()
except AttributeError:
self.loop = get_running_loop()
swathipil marked this conversation as resolved.
Show resolved Hide resolved
self._internal_kwargs = get_dict_with_loop_if_needed(loop)

super(AMQPClientAsync, self).__init__(
remote_address,
Expand Down Expand Up @@ -146,9 +137,9 @@ async def _keep_alive_async(self):
_logger.info("Keeping %r connection alive. %r",
self.__class__.__name__,
self._connection.container_id)
await asyncio.shield(self._connection.work_async(), loop=self.loop)
await asyncio.shield(self._connection.work_async(), **self._internal_kwargs)
start_time = current_time
await asyncio.sleep(1, loop=self.loop)
await asyncio.sleep(1, **self._internal_kwargs)
except Exception as e: # pylint: disable=broad-except
_logger.info("Connection keep-alive for %r failed: %r.", self.__class__.__name__, e)

Expand All @@ -163,7 +154,7 @@ async def _client_ready_async(self): # pylint: disable=no-self-use

async def _client_run_async(self):
"""Perform a single Connection iteration."""
await asyncio.shield(self._connection.work_async(), loop=self.loop)
await asyncio.shield(self._connection.work_async(), **self._internal_kwargs)

async def _redirect_async(self, redirect, auth):
"""Redirect the client endpoint using a Link DETACH redirect
Expand All @@ -177,7 +168,7 @@ async def _redirect_async(self, redirect, auth):
# pylint: disable=protected-access
if not self._connection._cbs:
_logger.info("Closing non-CBS session.")
await asyncio.shield(self._session.destroy_async(), loop=self.loop)
await asyncio.shield(self._session.destroy_async(), **self._internal_kwargs)
self._session = None
self._auth = auth
self._hostname = self._remote_address.hostname
Expand All @@ -197,8 +188,8 @@ async def _build_session_async(self):
outgoing_window=self._outgoing_window,
handle_max=self._handle_max,
on_attach=self._on_attach,
loop=self.loop),
loop=self.loop)
**self._internal_kwargs),
**self._internal_kwargs)
self._session = self._auth._session # pylint: disable=protected-access
elif self._connection._cbs:
self._session = self._auth._session # pylint: disable=protected-access
Expand All @@ -209,7 +200,11 @@ async def _build_session_async(self):
outgoing_window=self._outgoing_window,
handle_max=self._handle_max,
on_attach=self._on_attach,
loop=self.loop)
**self._internal_kwargs)

@property
def loop(self):
return self._internal_kwargs.get("loop")

async def open_async(self, connection=None):
"""Asynchronously open the client. The client can create a new Connection
Expand Down Expand Up @@ -242,10 +237,10 @@ async def open_async(self, connection=None):
remote_idle_timeout_empty_frame_send_ratio=self._remote_idle_timeout_empty_frame_send_ratio,
error_policy=self._error_policy,
debug=self._debug_trace,
loop=self.loop)
**self._internal_kwargs)
await self._build_session_async()
if self._keep_alive_interval:
self._keep_alive_thread = asyncio.ensure_future(self._keep_alive_async(), loop=self.loop)
self._keep_alive_thread = asyncio.ensure_future(self._keep_alive_async(), **self._internal_kwargs)
finally:
if self._ext_connection:
connection.release_async()
Expand All @@ -267,13 +262,13 @@ async def close_async(self):
return # already closed.
if not self._connection._cbs: # pylint: disable=protected-access
_logger.info("Closing non-CBS session.")
await asyncio.shield(self._session.destroy_async(), loop=self.loop)
await asyncio.shield(self._session.destroy_async(), **self._internal_kwargs)
else:
_logger.info("CBS session pending %r.", self._connection.container_id)
self._session = None
if not self._ext_connection:
_logger.info("Closing exclusive connection %r.", self._connection.container_id)
await asyncio.shield(self._connection.destroy_async(), loop=self.loop)
await asyncio.shield(self._connection.destroy_async(), **self._internal_kwargs)
else:
_logger.info("Shared connection remaining open.")
self._connection = None
Expand Down Expand Up @@ -314,7 +309,7 @@ async def mgmt_request_async(self, message, operation, op_type=None, node=None,
:rtype: ~uamqp.message.Message
"""
while not await self.auth_complete_async():
await asyncio.sleep(0.05, loop=self.loop)
await asyncio.sleep(0.05, **self._internal_kwargs)
response = await asyncio.shield(
self._session.mgmt_request_async(
message,
Expand All @@ -325,7 +320,7 @@ async def mgmt_request_async(self, message, operation, op_type=None, node=None,
encoding=self._encoding,
debug=self._debug_trace,
**kwargs),
loop=self.loop)
**self._internal_kwargs)
return response

async def auth_complete_async(self):
Expand Down Expand Up @@ -396,8 +391,6 @@ class SendClientAsync(client.SendClient, AMQPClientAsync):
:param client_name: The name for the client, also known as the Container ID.
If no name is provided, a random GUID will be used.
:type client_name: str or bytes
:param loop: A user specified event loop.
:type loop: ~asycnio.AbstractEventLoop
:param debug: Whether to turn on network trace logs. If `True`, trace logs
will be logged at INFO level. Default is `False`.
:type debug: bool
Expand Down Expand Up @@ -474,7 +467,7 @@ def __init__(
error_policy=None,
keep_alive_interval=None,
**kwargs):
self.loop = loop or get_running_loop()
swathipil marked this conversation as resolved.
Show resolved Hide resolved
self._internal_kwargs = get_dict_with_loop_if_needed(loop)
client.SendClient.__init__(
self,
target,
Expand All @@ -488,7 +481,7 @@ def __init__(

# AMQP object settings
self.sender_type = MessageSenderAsync
self._pending_messages_lock = asyncio.Lock(loop=self.loop)
self._pending_messages_lock = asyncio.Lock(**self._internal_kwargs)

async def _client_ready_async(self):
"""Determine whether the client is ready to start sending messages.
Expand All @@ -513,8 +506,8 @@ async def _client_ready_async(self):
error_policy=self._error_policy,
encoding=self._encoding,
desired_capabilities=self._desired_capabilities,
loop=self.loop)
await asyncio.shield(self.message_handler.open_async(), loop=self.loop)
**self._internal_kwargs)
await asyncio.shield(self.message_handler.open_async(), **self._internal_kwargs)
return False
if self.message_handler.get_state() == constants.MessageSenderState.Error:
raise errors.MessageHandlerError(
Expand All @@ -528,7 +521,8 @@ async def _client_ready_async(self):
async def _transfer_message_async(self, message, timeout):
sent = await asyncio.shield(
self.message_handler.send_async(message, self._on_message_sent, timeout=timeout),
loop=self.loop)
**self._internal_kwargs
)
if not sent:
_logger.info("Message not sent, raising RuntimeError.")
raise RuntimeError("Message sender failed to add message data to outgoing queue.")
Expand Down Expand Up @@ -567,7 +561,7 @@ async def _client_run_async(self):
"""
# pylint: disable=protected-access
await self.message_handler.work_async()
await asyncio.shield(self._connection.work_async(), loop=self.loop)
await asyncio.shield(self._connection.work_async(), **self._internal_kwargs)
if self._connection._state == c_uamqp.ConnectionState.DISCARDING:
raise errors.ConnectionClose(constants.ErrorCodes.InternalServerError)
self._waiting_messages = 0
Expand Down Expand Up @@ -692,8 +686,6 @@ class ReceiveClientAsync(client.ReceiveClient, AMQPClientAsync):
:param client_name: The name for the client, also known as the Container ID.
If no name is provided, a random GUID will be used.
:type client_name: str or bytes
:param loop: A user specified event loop.
:type loop: ~asycnio.AbstractEventLoop
:param debug: Whether to turn on network trace logs. If `True`, trace logs
will be logged at INFO level. Default is `False`.
:type debug: bool
Expand Down Expand Up @@ -782,7 +774,7 @@ def __init__(
error_policy=None,
keep_alive_interval=None,
**kwargs):
self.loop = loop or get_running_loop()
self._internal_kwargs = get_dict_with_loop_if_needed(loop)
yunhaoling marked this conversation as resolved.
Show resolved Hide resolved
client.ReceiveClient.__init__(
self,
source,
Expand Down Expand Up @@ -823,8 +815,8 @@ async def _client_ready_async(self):
error_policy=self._error_policy,
encoding=self._encoding,
desired_capabilities=self._desired_capabilities,
loop=self.loop)
await asyncio.shield(self.message_handler.open_async(), loop=self.loop)
)
await asyncio.shield(self.message_handler.open_async(), **self._internal_kwargs)
return False
if self.message_handler.get_state() == constants.MessageReceiverState.Error:
raise errors.MessageHandlerError(
Expand All @@ -850,7 +842,7 @@ async def _client_run_async(self):
now = self._counter.get_current_ms()
if self._last_activity_timestamp and not self._was_message_received:
# If no messages are coming through, back off a little to keep CPU use low.
await asyncio.sleep(0.05, loop=self.loop)
await asyncio.sleep(0.05, **self._internal_kwargs)
if self._timeout > 0:
timespan = now - self._last_activity_timestamp
if timespan >= self._timeout:
Expand Down
21 changes: 12 additions & 9 deletions uamqp/async_ops/connection_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@
# license information.
#--------------------------------------------------------------------------

import sys
import asyncio
import logging

import uamqp
from uamqp import c_uamqp, connection
from uamqp.utils import get_running_loop
from uamqp.async_ops.utils import get_dict_with_loop_if_needed

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -56,8 +57,6 @@ class ConnectionAsync(connection.Connection):
:param encoding: The encoding to use for parameters supplied as strings.
Default is 'UTF-8'
:type encoding: str
:param loop: A user specified event loop.
:type loop: ~asyncio.AbstractEventLoop
"""

def __init__(self, hostname, sasl,
Expand All @@ -71,7 +70,7 @@ def __init__(self, hostname, sasl,
debug=False,
encoding='UTF-8',
loop=None):
self.loop = loop or get_running_loop()
self._internal_kwargs = get_dict_with_loop_if_needed(loop)
yunhaoling marked this conversation as resolved.
Show resolved Hide resolved
super(ConnectionAsync, self).__init__(
hostname, sasl,
container_id=container_id,
Expand All @@ -83,7 +82,7 @@ def __init__(self, hostname, sasl,
error_policy=error_policy,
debug=debug,
encoding=encoding)
self._async_lock = asyncio.Lock(loop=self.loop)
self._async_lock = asyncio.Lock(**self._internal_kwargs)

async def __aenter__(self):
"""Open the Connection in an async context manager."""
Expand All @@ -105,8 +104,12 @@ async def _close_async(self):
self.auth.close()
_logger.info("Connection shutdown complete %r.", self.container_id)

@property
def loop(self):
return self._internal_kwargs.get("loop")

async def lock_async(self, timeout=3.0):
await asyncio.wait_for(self._async_lock.acquire(), timeout=timeout, loop=self.loop)
await asyncio.wait_for(self._async_lock.acquire(), timeout=timeout, **self._internal_kwargs)

def release_async(self):
try:
Expand Down Expand Up @@ -135,12 +138,12 @@ async def work_async(self):
if self._closing:
_logger.debug("Connection unlocked but shutting down.")
return
await asyncio.sleep(0, loop=self.loop)
await asyncio.sleep(0, **self._internal_kwargs)
self._conn.do_work()
except asyncio.TimeoutError:
_logger.debug("Connection %r timed out while waiting for lock acquisition.", self.container_id)
finally:
await asyncio.sleep(0, loop=self.loop)
await asyncio.sleep(0, **self._internal_kwargs)
self.release_async()

async def sleep_async(self, seconds):
Expand All @@ -151,7 +154,7 @@ async def sleep_async(self, seconds):
"""
try:
await self.lock_async()
await asyncio.sleep(seconds, loop=self.loop)
await asyncio.sleep(seconds, **self._internal_kwargs)
except asyncio.TimeoutError:
_logger.debug("Connection %r timed out while waiting for lock acquisition.", self.container_id)
finally:
Expand Down
Loading