diff --git a/sdk/search/azure-search-documents/CHANGELOG.md b/sdk/search/azure-search-documents/CHANGELOG.md index 36d7b750591f..a5863ebae763 100644 --- a/sdk/search/azure-search-documents/CHANGELOG.md +++ b/sdk/search/azure-search-documents/CHANGELOG.md @@ -1,6 +1,6 @@ # Release History -## 11.1.0 (2021-02-09) +## 11.1.0 (2021-02-10) **Breaking Changes** diff --git a/sdk/search/azure-search-documents/azure/search/documents/_internal/_search_indexing_buffered_sender.py b/sdk/search/azure-search-documents/azure/search/documents/_internal/_search_indexing_buffered_sender.py index 83396e00bf6a..ab1ce8947635 100644 --- a/sdk/search/azure-search-documents/azure/search/documents/_internal/_search_indexing_buffered_sender.py +++ b/sdk/search/azure-search-documents/azure/search/documents/_internal/_search_indexing_buffered_sender.py @@ -32,10 +32,8 @@ class SearchIndexingBufferedSender(SearchIndexingBufferedSenderBase, HeadersMixi :type index_name: str :param credential: A credential to authorize search client requests :type credential: ~azure.core.credentials.AzureKeyCredential - :keyword bool auto_flush: if the auto flush mode is on. Default to True. :keyword int auto_flush_interval: how many max seconds if between 2 flushes. This only takes effect - when auto_flush is on. Default to 60 seconds. If a non-positive number is set, it will be default - to 86400s (1 day) + when auto_flush is on. Default to 60 seconds. :keyword int initial_batch_action_count: The initial number of actions to group into a batch when tuning the behavior of the sender. The default value is 512. :keyword int max_retries_per_action: The number of times to retry a failed document. The default value is 3. @@ -107,6 +105,7 @@ def flush(self, timeout=86400, **kwargs): # pylint:disable=unused-argument :param int timeout: time out setting. Default is 86400s (one day) :return: True if there are errors. Else False :rtype: bool + :raises ~azure.core.exceptions.ServiceResponseTimeoutError: """ has_error = False begin_time = int(time.time()) @@ -114,6 +113,10 @@ def flush(self, timeout=86400, **kwargs): # pylint:disable=unused-argument now = int(time.time()) remaining = timeout - (now - begin_time) if remaining < 0: + if self._on_error: + actions = self._index_documents_batch.dequeue_actions() + for action in actions: + self._on_error(action) raise ServiceResponseTimeoutError("Service response time out") result = self._process(timeout=remaining, raise_error=False) if result: diff --git a/sdk/search/azure-search-documents/azure/search/documents/_internal/_search_indexing_buffered_sender_base.py b/sdk/search/azure-search-documents/azure/search/documents/_internal/_search_indexing_buffered_sender_base.py index 226f81d220ff..11b926ad2ff9 100644 --- a/sdk/search/azure-search-documents/azure/search/documents/_internal/_search_indexing_buffered_sender_base.py +++ b/sdk/search/azure-search-documents/azure/search/documents/_internal/_search_indexing_buffered_sender_base.py @@ -30,7 +30,7 @@ def __init__(self, endpoint, index_name, credential, **kwargs): self._batch_action_count = kwargs.pop('initial_batch_action_count', self._DEFAULT_INITIAL_BATCH_ACTION_COUNT) self._auto_flush_interval = kwargs.pop('auto_flush_interval', self._DEFAULT_AUTO_FLUSH_INTERVAL) if self._auto_flush_interval <= 0: - self._auto_flush_interval = 86400 + raise ValueError("auto_flush_interval must be a positive number.") self._max_retries_per_action = kwargs.pop('max_retries_per_action ', self._DEFAULT_MAX_RETRIES) self._endpoint = endpoint # type: str self._index_name = index_name # type: str diff --git a/sdk/search/azure-search-documents/azure/search/documents/_internal/aio/_search_indexing_buffered_sender_async.py b/sdk/search/azure-search-documents/azure/search/documents/_internal/aio/_search_indexing_buffered_sender_async.py index 1555990195bb..62a571c35122 100644 --- a/sdk/search/azure-search-documents/azure/search/documents/_internal/aio/_search_indexing_buffered_sender_async.py +++ b/sdk/search/azure-search-documents/azure/search/documents/_internal/aio/_search_indexing_buffered_sender_async.py @@ -34,10 +34,8 @@ class SearchIndexingBufferedSender(SearchIndexingBufferedSenderBase, HeadersMixi :type index_name: str :param credential: A credential to authorize search client requests :type credential: ~azure.core.credentials.AzureKeyCredential - :keyword bool auto_flush: if the auto flush mode is on. Default to True. :keyword int auto_flush_interval: how many max seconds if between 2 flushes. This only takes effect - when auto_flush is on. Default to 60 seconds. If a non-positive number is set, it will be default - to 86400s (1 day) + when auto_flush is on. Default to 60 seconds. :keyword int initial_batch_action_count: The initial number of actions to group into a batch when tuning the behavior of the sender. The default value is 512. :keyword int max_retries_per_action: The number of times to retry a failed document. The default value is 3. @@ -106,6 +104,7 @@ async def flush(self, timeout=86400, **kwargs): # pylint:disable=unused-argum :param int timeout: time out setting. Default is 86400s (one day) :return: True if there are errors. Else False :rtype: bool + :raises ~azure.core.exceptions.ServiceResponseTimeoutError: """ has_error = False begin_time = int(time.time()) @@ -113,6 +112,10 @@ async def flush(self, timeout=86400, **kwargs): # pylint:disable=unused-argum now = int(time.time()) remaining = timeout - (now - begin_time) if remaining < 0: + if self._on_error: + actions = await self._index_documents_batch.dequeue_actions() + for action in actions: + await self._on_error(action) raise ServiceResponseTimeoutError("Service response time out") result = await self._process(timeout=remaining, raise_error=False) if result: diff --git a/sdk/search/azure-search-documents/tests/async_tests/test_buffered_sender_async.py b/sdk/search/azure-search-documents/tests/async_tests/test_buffered_sender_async.py index 9b18ee7fbb07..bba904af524b 100644 --- a/sdk/search/azure-search-documents/tests/async_tests/test_buffered_sender_async.py +++ b/sdk/search/azure-search-documents/tests/async_tests/test_buffered_sender_async.py @@ -6,12 +6,12 @@ from unittest import mock except ImportError: import mock - +import pytest from azure.search.documents.aio import ( SearchIndexingBufferedSender, ) from azure.core.credentials import AzureKeyCredential -from azure.core.exceptions import HttpResponseError +from azure.core.exceptions import HttpResponseError, ServiceResponseTimeoutError from azure.search.documents.models import IndexingResult CREDENTIAL = AzureKeyCredential(key="test_api_key") @@ -82,7 +82,6 @@ async def test_callback_new(self): async def test_callback_error(self): async def mock_fail_index_documents(actions, timeout=86400): if len(actions) > 0: - print("There is something wrong") result = IndexingResult() result.key = actions[0].additional_properties.get('id') result.status_code = 400 @@ -102,10 +101,33 @@ async def mock_fail_index_documents(actions, timeout=86400): await client.flush() assert on_error.called + async def test_callback_error_on_timeout(self): + async def mock_fail_index_documents(actions, timeout=86400): + if len(actions) > 0: + result = IndexingResult() + result.key = actions[0].additional_properties.get('id') + result.status_code = 400 + result.succeeded = False + self.uploaded = self.uploaded + len(actions) - 1 + time.sleep(1) + return [result] + + on_error = mock.AsyncMock() + async with SearchIndexingBufferedSender("endpoint", + "index name", + CREDENTIAL, + auto_flush=False, + on_error=on_error) as client: + client._index_documents_actions = mock_fail_index_documents + client._index_key = "id" + await client.upload_documents([{"id": 0},{"id": 1}]) + with pytest.raises(ServiceResponseTimeoutError): + await client.flush(timeout=-1) + assert on_error.call_count == 2 + async def test_callback_progress(self): async def mock_successful_index_documents(actions, timeout=86400): if len(actions) > 0: - print("There is something wrong") result = IndexingResult() result.key = actions[0].additional_properties.get('id') result.status_code = 200 diff --git a/sdk/search/azure-search-documents/tests/test_buffered_sender.py b/sdk/search/azure-search-documents/tests/test_buffered_sender.py index 550bed07f8b3..e8b5439b505f 100644 --- a/sdk/search/azure-search-documents/tests/test_buffered_sender.py +++ b/sdk/search/azure-search-documents/tests/test_buffered_sender.py @@ -7,11 +7,12 @@ except ImportError: import mock +import pytest from azure.search.documents import ( SearchIndexingBufferedSender, ) from azure.core.credentials import AzureKeyCredential -from azure.core.exceptions import HttpResponseError +from azure.core.exceptions import HttpResponseError, ServiceResponseTimeoutError from azure.search.documents.models import IndexingResult CREDENTIAL = AzureKeyCredential(key="test_api_key") @@ -86,7 +87,6 @@ def test_callback_new(self): def test_callback_error(self): def mock_fail_index_documents(actions, timeout=86400): if len(actions) > 0: - print("There is something wrong") result = IndexingResult() result.key = actions[0].additional_properties.get('id') result.status_code = 400 @@ -106,10 +106,34 @@ def mock_fail_index_documents(actions, timeout=86400): client.flush() assert on_error.called + def test_callback_error_on_timeout(self): + def mock_fail_index_documents(actions, timeout=86400): + import time + if len(actions) > 0: + result = IndexingResult() + result.key = actions[0].additional_properties.get('id') + result.status_code = 400 + result.succeeded = False + self.uploaded = self.uploaded + len(actions) - 1 + time.sleep(1) + return [result] + + on_error = mock.Mock() + with SearchIndexingBufferedSender("endpoint", + "index name", + CREDENTIAL, + auto_flush=False, + on_error=on_error) as client: + client._index_documents_actions = mock_fail_index_documents + client._index_key = "id" + client.upload_documents([{"id": 0},{"id": 1}]) + with pytest.raises(ServiceResponseTimeoutError): + client.flush(timeout=-1) + assert on_error.call_count == 2 + def test_callback_progress(self): def mock_successful_index_documents(actions, timeout=86400): if len(actions) > 0: - print("There is something wrong") result = IndexingResult() result.key = actions[0].additional_properties.get('id') result.status_code = 200