Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Datalake] Removed list_paths manual paging and deserialization #16309

Merged
merged 6 commits into from
Feb 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ def test_create_largest_blob_from_stream_single_upload_without_network(self, res
payload_dropping_policy = PayloadDroppingPolicy()
credential_policy = _format_shared_key_credential(storage_account.name, storage_account_key)
self._setup(storage_account, storage_account_key, [payload_dropping_policy, credential_policy],
max_single_put_size=LARGEST_SINGLE_UPLOAD_SIZE)
max_single_put_size=LARGEST_SINGLE_UPLOAD_SIZE+1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this in an attempt to resolve the flaky test test_create_largest_blob_from_stream_single_upload_without_network which seems to sometimes do a PUT twice rather than once causing one of the assertions to fail.

blob_name = self._get_blob_reference()
blob = self.bsc.get_blob_client(self.container_name, blob_name)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ async def test_create_largest_blob_from_stream_single_upload_without_network(sel
payload_dropping_policy = PayloadDroppingPolicy()
credential_policy = _format_shared_key_credential(storage_account.name, storage_account_key)
await self._setup(storage_account, storage_account_key, [payload_dropping_policy, credential_policy],
max_single_put_size=LARGEST_SINGLE_UPLOAD_SIZE)
max_single_put_size=LARGEST_SINGLE_UPLOAD_SIZE+1)
blob_name = self._get_blob_reference()
blob = self.bsc.get_blob_client(self.container_name, blob_name)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from azure.core.pipeline.policies import ContentDecodePolicy
from azure.core.exceptions import HttpResponseError, DecodeError, ResourceModifiedError, ClientAuthenticationError, \
ResourceNotFoundError, ResourceExistsError
from ._models import FileProperties, DirectoryProperties, LeaseProperties
from ._models import FileProperties, DirectoryProperties, LeaseProperties, PathProperties
from ._shared.models import StorageErrorCode

if TYPE_CHECKING:
Expand Down Expand Up @@ -44,6 +44,10 @@ def deserialize_file_properties(response, obj, headers):
return file_properties


def deserialize_path_properties(path_list):
return [PathProperties._from_generated(path) for path in path_list] # pylint: disable=protected-access


def from_blob_properties(blob_properties):
file_props = FileProperties()
file_props.name = blob_properties.name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import functools
from typing import Optional

try:
from urllib.parse import urlparse, quote
Expand All @@ -18,11 +18,11 @@
from ._shared.base_client import TransportWrapper, StorageAccountHostsMixin, parse_query, parse_connection_str
from ._serialize import convert_dfs_url_to_blob_url
from ._models import LocationMode, FileSystemProperties, PublicAccess
from ._list_paths_helper import PathPropertiesPaged
from ._data_lake_file_client import DataLakeFileClient
from ._data_lake_directory_client import DataLakeDirectoryClient
from ._data_lake_lease import DataLakeLeaseClient
from ._generated import AzureDataLakeStorageRESTAPI
from ._deserialize import deserialize_path_properties


class FileSystemClient(StorageAccountHostsMixin):
Expand Down Expand Up @@ -463,14 +463,13 @@ def get_paths(self, path=None, # type: Optional[str]
:caption: List the paths in the file system.
"""
timeout = kwargs.pop('timeout', None)
command = functools.partial(
self._client.file_system.list_paths,
return self._client.file_system.list_paths(
recursive=recursive,
max_results=max_results,
path=path,
timeout=timeout,
cls=deserialize_path_properties,
**kwargs)
return ItemPaged(
command, recursive, path=path, max_results=max_results,
page_iterator_class=PathPropertiesPaged, **kwargs)

def create_directory(self, directory, # type: Union[DirectoryProperties, str]
metadata=None, # type: Optional[Dict[str, str]]
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
# --------------------------------------------------------------------------
# pylint: disable=invalid-overridden-method

import functools
from typing import ( # pylint: disable=unused-import
Union, Optional, Any, Dict, TYPE_CHECKING
)
Expand All @@ -21,8 +20,8 @@

from ._data_lake_file_client_async import DataLakeFileClient
from ._data_lake_directory_client_async import DataLakeDirectoryClient
from ._models import PathPropertiesPaged
from ._data_lake_lease_async import DataLakeLeaseClient
from .._deserialize import deserialize_path_properties
from .._file_system_client import FileSystemClient as FileSystemClientBase
from .._generated.aio import AzureDataLakeStorageRESTAPI
from .._shared.base_client_async import AsyncTransportWrapper, AsyncStorageAccountHostsMixin
Expand Down Expand Up @@ -385,7 +384,7 @@ def get_paths(self, path=None, # type: Optional[str]
recursive=True, # type: Optional[bool]
max_results=None, # type: Optional[int]
**kwargs):
# type: (...) -> ItemPaged[PathProperties]
# type: (...) -> AsyncItemPaged[PathProperties]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't this a break? what if somebody checks type? should we alias?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have previously fixed incorrect type annotations. The return object in this method is AsyncItemPaged so this was a mistake that needed to be fixed.

"""Returns a generator to list the paths(could be files or directories) under the specified file system.
The generator will lazily follow the continuation tokens returned by
the service.
Expand Down Expand Up @@ -421,14 +420,13 @@ def get_paths(self, path=None, # type: Optional[str]
:caption: List the blobs in the file system.
"""
timeout = kwargs.pop('timeout', None)
command = functools.partial(
self._client.file_system.list_paths,
return self._client.file_system.list_paths(
recursive=recursive,
max_results=max_results,
path=path,
timeout=timeout,
cls=deserialize_path_properties,
**kwargs)
return AsyncItemPaged(
command, recursive, path=path, max_results=max_results,
page_iterator_class=PathPropertiesPaged, **kwargs)

@distributed_trace_async
async def create_directory(self, directory, # type: Union[DirectoryProperties, str]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,7 @@
# --------------------------------------------------------------------------
# pylint: disable=too-few-public-methods, too-many-instance-attributes
# pylint: disable=super-init-not-called, too-many-lines
from azure.core.async_paging import AsyncPageIterator
from azure.core.exceptions import HttpResponseError
from azure.storage.blob.aio._models import ContainerPropertiesPaged

from .._deserialize import process_storage_error
from .._generated.models import Path
from .._models import PathProperties

from .._models import FileSystemProperties


Expand Down Expand Up @@ -46,68 +39,3 @@ def __init__(self, *args, **kwargs):
@staticmethod
def _build_item(item):
return FileSystemProperties._from_generated(item) # pylint: disable=protected-access


class PathPropertiesPaged(AsyncPageIterator):
"""An Iterable of Path properties.

:ivar str path: Filters the results to return only paths under the specified path.
:ivar int results_per_page: The maximum number of results retrieved per API call.
:ivar str continuation_token: The continuation token to retrieve the next page of results.
:ivar list(~azure.storage.filedatalake.PathProperties) current_page: The current page of listed results.

:param callable command: Function to retrieve the next page of items.
:param str path: Filters the results to return only paths under the specified path.
:param int max_results: The maximum number of psths to retrieve per
call.
:param str continuation_token: An opaque continuation token.
"""

def __init__(
self, command,
recursive,
path=None,
max_results=None,
continuation_token=None,
upn=None):
super(PathPropertiesPaged, self).__init__(
get_next=self._get_next_cb,
extract_data=self._extract_data_cb,
continuation_token=continuation_token or ""
)
self._command = command
self.recursive = recursive
self.results_per_page = max_results
self.path = path
self.upn = upn
self.current_page = None
self.path_list = None

async def _get_next_cb(self, continuation_token):
try:
return self._command(
self.recursive,
continuation=continuation_token or None,
path=self.path,
max_results=self.results_per_page,
upn=self.upn)
except HttpResponseError as error:
process_storage_error(error)

async def _extract_data_cb(self, get_next_return):
path_list = []
async for path in get_next_return:
path_list.append(path)
self.path_list = path_list
self.current_page = [self._build_item(item) for item in self.path_list]

return None, self.current_page

@staticmethod
def _build_item(item):
if isinstance(item, PathProperties):
return item
if isinstance(item, Path):
path = PathProperties._from_generated(item) # pylint: disable=protected-access
return path
return item