Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Catch exceptions thrown by async download #14946

Merged
merged 2 commits into from
Nov 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from .._deserialize import get_page_ranges_result
from .._download import process_range_and_offset, _ChunkDownloader


async def process_content(data, start_offset, end_offset, encryption):
if data is None:
raise ValueError("Response cannot be None.")
Expand Down Expand Up @@ -390,7 +389,7 @@ async def content_as_text(self, max_concurrency=1, encoding="UTF-8"):

This operation is blocking until all data is downloaded.

:keyword int max_concurrency:
:param int max_concurrency:
The number of parallel connections with which to download.
:param str encoding:
Test encoding to decode the downloaded bytes. Default is UTF-8.
Expand Down Expand Up @@ -458,8 +457,13 @@ async def readinto(self, stream):
]
while running_futures:
# Wait for some download to finish before adding a new one
_done, running_futures = await asyncio.wait(
done, running_futures = await asyncio.wait(
running_futures, return_when=asyncio.FIRST_COMPLETED)
try:
for task in done:
task.result()
except HttpResponseError as error:
process_storage_error(error)
try:
next_chunk = next(dl_tasks)
except StopIteration:
Expand All @@ -469,7 +473,12 @@ async def readinto(self, stream):

if running_futures:
# Wait for the remaining downloads to finish
await asyncio.wait(running_futures)
done, _running_futures = await asyncio.wait(running_futures)
try:
for task in done:
task.result()
except HttpResponseError as error:
process_storage_error(error)
return self.size

async def download_to_stream(self, stream, max_concurrency=1):
Expand All @@ -479,6 +488,8 @@ async def download_to_stream(self, stream, max_concurrency=1):
The stream to download to. This can be an open file-handle,
or any writable stream. The stream must be seekable if the download
uses more than one parallel connection.
:param int max_concurrency:
The number of parallel connections with which to download.
:returns: The properties of the downloaded blob.
:rtype: Any
"""
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

24 changes: 23 additions & 1 deletion sdk/storage/azure-storage-blob/tests/test_get_blob_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
TEST_BLOB_PREFIX = 'blob'



# ------------------------------------------------------------------------------

class AiohttpTestTransport(AioHttpTransport):
Expand Down Expand Up @@ -346,6 +345,29 @@ async def test_get_blob_to_stream_async(self, resource_group, location, storage_
self.assertEqual(self.byte_data, actual)
self._teardown(FILE_PATH)

@pytest.mark.live_test_only
@GlobalStorageAccountPreparer()
@AsyncStorageTestCase.await_prepared_test
async def test_readinto_raises_exceptions(self, resource_group, location, storage_account, storage_account_key):
# parallel tests introduce random order of requests, can only run live
callback_counter = {'value': 0}

def callback(response):
callback_counter['value'] += 1
if callback_counter['value'] > 3:
raise ValueError()

# Arrange
await self._setup(storage_account, storage_account_key)
blob = self.bsc.get_blob_client(self.container_name, self.byte_blob)

# Act
FILE_PATH = 'get_blob_to_stream_async.temp.{}.dat'.format(str(uuid.uuid4()))
with open(FILE_PATH, 'wb') as stream:
downloader = await blob.download_blob(max_concurrency=2, raw_response_hook=callback)
with self.assertRaises(ValueError):
await downloader.readinto(stream)

@pytest.mark.live_test_only
@GlobalStorageAccountPreparer()
@AsyncStorageTestCase.await_prepared_test
Expand Down