Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add aio folder and update GA tag #16178

Merged
merged 4 commits into from
Feb 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion sdk/search/azure-search-documents/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
# Release History

## 11.1.0b5 (Unreleased)
## 11.1.0 (2021-02-09)

**Breaking Changes**

- `IndexDocumentsBatch` does not support `enqueue_action` any longer. `enqueue_actions` takes a single action too.
- `max_retries` of `SearchIndexingBufferedSender` is renamed to `max_retries_per_action`
- `SearchClient` does not support `get_search_indexing_buffered_sender`

## 11.1.0b4 (2020-11-10)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,18 +132,15 @@ def dequeue_actions(self, **kwargs): # pylint: disable=unused-argument
return result

def enqueue_actions(self, new_actions, **kwargs):  # pylint: disable=unused-argument
    # type: (Union[IndexAction, List[IndexAction]]) -> None
    """Enqueue a single IndexAction or a list of IndexActions to index.

    A bare IndexAction is appended; any other value is treated as an
    iterable of IndexActions and extended onto the queue.

    :param new_actions: one IndexAction, or a list of IndexActions.
    """
    # Acquire the lock once and branch inside it, instead of duplicating
    # the `with self._lock:` block in both branches. The isinstance check
    # itself needs no lock, but holding it across the branch is equally
    # correct and removes the repetition.
    with self._lock:
        if isinstance(new_actions, IndexAction):
            self._actions.append(new_actions)
        else:
            self._actions.extend(new_actions)

def _extend_batch(self, documents, action_type):
# type: (List[dict], str) -> List[IndexAction]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
from ._queries import AutocompleteQuery, SearchQuery, SuggestQuery
from .._headers_mixin import HeadersMixin
from .._version import SDK_MONIKER
from ._search_indexing_buffered_sender import SearchIndexingBufferedSender

if TYPE_CHECKING:
# pylint:disable=unused-import,ungrouped-imports
Expand Down Expand Up @@ -100,15 +99,6 @@ def close(self):
"""
return self._client.close()

def get_search_indexing_buffered_sender(self, **kwargs):
# type: (dict) -> SearchIndexingBufferedSender
"""Create a SearchIndexingBufferedSender bound to this client's endpoint,
index name, and credential. All keyword arguments are forwarded to the
sender's constructor.

:rtype: ~azure.search.documents.SearchIndexingBufferedSender
"""
return SearchIndexingBufferedSender(self._endpoint, self._index_name, self._credential, **kwargs)

@distributed_trace
def get_document_count(self, **kwargs):
# type: (**Any) -> int
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,16 @@ class SearchIndexingBufferedSender(SearchIndexingBufferedSenderBase, HeadersMixi
to 86400s (1 day)
:keyword int initial_batch_action_count: The initial number of actions to group into a batch when
tuning the behavior of the sender. The default value is 512.
:keyword int max_retries: The number of times to retry a failed document. The default value is 3.
:keyword int max_retries_per_action: The number of times to retry a failed document. The default value is 3.
:keyword callable on_new: If it is set, the client will call corresponding methods when there
is a new IndexAction added.
is a new IndexAction added. This may be called from main thread or a worker thread.
:keyword callable on_progress: If it is set, the client will call corresponding methods when there
is a IndexAction succeeds.
is a IndexAction succeeds. This may be called from main thread or a worker thread.
:keyword callable on_error: If it is set, the client will call corresponding methods when there
is a IndexAction fails.
is a IndexAction fails. This may be called from main thread or a worker thread.
:keyword callable on_remove: If it is set, the client will call corresponding methods when there
is a IndexAction removed from the queue (succeeds or fails).
is an IndexAction removed from the queue (succeeds or fails). This may be called from the main
thread or a worker thread.
:keyword str api_version: The Search API version to use for requests.
"""
# pylint: disable=too-many-instance-attributes
Expand Down Expand Up @@ -262,6 +263,8 @@ def _index_documents_actions(self, actions, **kwargs):
if len(actions) == 1:
raise
pos = round(len(actions) / 2)
if pos < self._batch_action_count:
self._batch_action_count = pos
now = int(time.time())
remaining = timeout - (now - begin_time)
if remaining < 0:
Expand Down Expand Up @@ -312,11 +315,11 @@ def _retry_action(self, action):
if not counter:
# first time that fails
self._retry_counter[key] = 1
self._index_documents_batch.enqueue_action(action)
elif counter < self._max_retries - 1:
self._index_documents_batch.enqueue_actions(action)
elif counter < self._max_retries_per_action - 1:
# not reach retry limit yet
self._retry_counter[key] = counter + 1
self._index_documents_batch.enqueue_action(action)
self._index_documents_batch.enqueue_actions(action)
else:
self._callback_fail(action)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def __init__(self, endpoint, index_name, credential, **kwargs):
self._auto_flush_interval = kwargs.pop('auto_flush_interval', self._DEFAULT_AUTO_FLUSH_INTERVAL)
if self._auto_flush_interval <= 0:
self._auto_flush_interval = 86400
self._max_retries = kwargs.pop('max_retries', self._DEFAULT_MAX_RETRIES)
self._max_retries_per_action = kwargs.pop('max_retries_per_action', self._DEFAULT_MAX_RETRIES)
self._endpoint = endpoint # type: str
self._index_name = index_name # type: str
self._index_key = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,18 +132,15 @@ async def dequeue_actions(self):
return result

async def enqueue_actions(self, new_actions):
    # type: (Union[IndexAction, List[IndexAction]]) -> None
    """Enqueue a single IndexAction or a list of IndexActions to index.

    A bare IndexAction is appended; any other value is treated as an
    iterable of IndexActions and extended onto the queue.

    :param new_actions: one IndexAction, or a list of IndexActions.
    """
    # Acquire the async lock once and branch inside it, instead of
    # duplicating the `async with self._lock:` block in both branches —
    # same fix as the synchronous IndexDocumentsBatch counterpart.
    async with self._lock:
        if isinstance(new_actions, IndexAction):
            self._actions.append(new_actions)
        else:
            self._actions.extend(new_actions)

async def _extend_batch(self, documents, action_type):
# type: (List[dict], str) -> List[IndexAction]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class SearchIndexingBufferedSender(SearchIndexingBufferedSenderBase, HeadersMixi
to 86400s (1 day)
:keyword int initial_batch_action_count: The initial number of actions to group into a batch when
tuning the behavior of the sender. The default value is 512.
:keyword int max_retries: The number of times to retry a failed document. The default value is 3.
:keyword int max_retries_per_action: The number of times to retry a failed document. The default value is 3.
:keyword callable on_new: If it is set, the client will call corresponding methods when there
is a new IndexAction added.
:keyword callable on_progress: If it is set, the client will call corresponding methods when there
Expand Down Expand Up @@ -258,6 +258,8 @@ async def _index_documents_actions(self, actions, **kwargs):
if len(actions) == 1:
raise
pos = round(len(actions) / 2)
if pos < self._batch_action_count:
self._batch_action_count = pos
now = int(time.time())
remaining = timeout - (now - begin_time)
if remaining < 0:
Expand Down Expand Up @@ -306,11 +308,11 @@ async def _retry_action(self, action):
if not counter:
# first time that fails
self._retry_counter[key] = 1
await self._index_documents_batch.enqueue_action(action)
elif counter < self._max_retries - 1:
await self._index_documents_batch.enqueue_actions(action)
elif counter < self._max_retries_per_action - 1:
# not reach retry limit yet
self._retry_counter[key] = counter + 1
await self._index_documents_batch.enqueue_action(action)
await self._index_documents_batch.enqueue_actions(action)
else:
await self._callback_fail(action)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
# Licensed under the MIT License.
# ------------------------------------

VERSION = "11.1.0b5" # type: str
VERSION = "11.1.0" # type: str

SDK_MONIKER = "search-documents/{}".format(VERSION) # type: str
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
#
# --------------------------------------------------------------------------

from ._internal.aio._search_client_async import AsyncSearchItemPaged, SearchClient
from ._internal.aio._search_indexing_buffered_sender_async import SearchIndexingBufferedSender
from .._internal.aio._search_client_async import AsyncSearchItemPaged, SearchClient
from .._internal.aio._search_indexing_buffered_sender_async import SearchIndexingBufferedSender

__all__ = (
"AsyncSearchItemPaged",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
#
# --------------------------------------------------------------------------

from ._internal.aio._search_index_client import SearchIndexClient
from ._internal.aio._search_indexer_client import SearchIndexerClient
from .._internal.aio._search_index_client import SearchIndexClient
from .._internal.aio._search_indexer_client import SearchIndexerClient

__all__ = (
"SearchIndexClient",
Expand Down
2 changes: 1 addition & 1 deletion sdk/search/azure-search-documents/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
author_email='ascl@microsoft.com',
url='https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/search/azure-search-documents',
classifiers=[
"Development Status :: 4 - Beta",
"Development Status :: 5 - Production/Stable",
'Programming Language :: Python',
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.7',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class TestSearchBatchingClientAsync(object):
async def test_search_indexing_buffered_sender_kwargs(self):
async with SearchIndexingBufferedSender("endpoint", "index name", CREDENTIAL, window=100) as client:
assert client._batch_action_count == 512
assert client._max_retries == 3
assert client._max_retries_per_action == 3
assert client._auto_flush_interval == 60
assert client._auto_flush

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class TestSearchBatchingClient(object):
def test_search_indexing_buffered_sender_kwargs(self):
with SearchIndexingBufferedSender("endpoint", "index name", CREDENTIAL, window=100) as client:
assert client._batch_action_count == 512
assert client._max_retries == 3
assert client._max_retries_per_action == 3
assert client._auto_flush_interval == 60
assert client._auto_flush

Expand All @@ -39,7 +39,7 @@ def test_batch_queue(self):
actions = client._index_documents_batch.dequeue_actions()
assert len(client.actions) == 0
for action in actions:
client._index_documents_batch.enqueue_action(action)
client._index_documents_batch.enqueue_actions(action)
assert len(client.actions) == 7


Expand Down