Skip to content

Commit

Permalink
call on_error if timeout in flush (#16485)
Browse files Browse the repository at this point in the history
* call on_error if timeout in flush

* address arch review feedback

* update release date

* remove print statement
  • Loading branch information
xiangyan99 authored Feb 3, 2021
1 parent 6e6db92 commit cfc953f
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 15 deletions.
2 changes: 1 addition & 1 deletion sdk/search/azure-search-documents/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Release History

## 11.1.0 (2021-02-09)
## 11.1.0 (2021-02-10)

**Breaking Changes**

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,8 @@ class SearchIndexingBufferedSender(SearchIndexingBufferedSenderBase, HeadersMixi
:type index_name: str
:param credential: A credential to authorize search client requests
:type credential: ~azure.core.credentials.AzureKeyCredential
:keyword bool auto_flush: if the auto flush mode is on. Default to True.
:keyword int auto_flush_interval: how many max seconds if between 2 flushes. This only takes effect
when auto_flush is on. Default to 60 seconds. If a non-positive number is set, it will be default
to 86400s (1 day)
when auto_flush is on. Default to 60 seconds.
:keyword int initial_batch_action_count: The initial number of actions to group into a batch when
tuning the behavior of the sender. The default value is 512.
:keyword int max_retries_per_action: The number of times to retry a failed document. The default value is 3.
Expand Down Expand Up @@ -107,13 +105,18 @@ def flush(self, timeout=86400, **kwargs): # pylint:disable=unused-argument
:param int timeout: time out setting. Default is 86400s (one day)
:return: True if there are errors. Else False
:rtype: bool
:raises ~azure.core.exceptions.ServiceResponseTimeoutError:
"""
has_error = False
begin_time = int(time.time())
while len(self.actions) > 0:
now = int(time.time())
remaining = timeout - (now - begin_time)
if remaining < 0:
if self._on_error:
actions = self._index_documents_batch.dequeue_actions()
for action in actions:
self._on_error(action)
raise ServiceResponseTimeoutError("Service response time out")
result = self._process(timeout=remaining, raise_error=False)
if result:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def __init__(self, endpoint, index_name, credential, **kwargs):
self._batch_action_count = kwargs.pop('initial_batch_action_count', self._DEFAULT_INITIAL_BATCH_ACTION_COUNT)
self._auto_flush_interval = kwargs.pop('auto_flush_interval', self._DEFAULT_AUTO_FLUSH_INTERVAL)
if self._auto_flush_interval <= 0:
self._auto_flush_interval = 86400
raise ValueError("auto_flush_interval must be a positive number.")
self._max_retries_per_action = kwargs.pop('max_retries_per_action ', self._DEFAULT_MAX_RETRIES)
self._endpoint = endpoint # type: str
self._index_name = index_name # type: str
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,8 @@ class SearchIndexingBufferedSender(SearchIndexingBufferedSenderBase, HeadersMixi
:type index_name: str
:param credential: A credential to authorize search client requests
:type credential: ~azure.core.credentials.AzureKeyCredential
:keyword bool auto_flush: if the auto flush mode is on. Default to True.
:keyword int auto_flush_interval: how many max seconds if between 2 flushes. This only takes effect
when auto_flush is on. Default to 60 seconds. If a non-positive number is set, it will be default
to 86400s (1 day)
when auto_flush is on. Default to 60 seconds.
:keyword int initial_batch_action_count: The initial number of actions to group into a batch when
tuning the behavior of the sender. The default value is 512.
:keyword int max_retries_per_action: The number of times to retry a failed document. The default value is 3.
Expand Down Expand Up @@ -106,13 +104,18 @@ async def flush(self, timeout=86400, **kwargs): # pylint:disable=unused-argum
:param int timeout: time out setting. Default is 86400s (one day)
:return: True if there are errors. Else False
:rtype: bool
:raises ~azure.core.exceptions.ServiceResponseTimeoutError:
"""
has_error = False
begin_time = int(time.time())
while len(self.actions) > 0:
now = int(time.time())
remaining = timeout - (now - begin_time)
if remaining < 0:
if self._on_error:
actions = await self._index_documents_batch.dequeue_actions()
for action in actions:
await self._on_error(action)
raise ServiceResponseTimeoutError("Service response time out")
result = await self._process(timeout=remaining, raise_error=False)
if result:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
from unittest import mock
except ImportError:
import mock

import pytest
from azure.search.documents.aio import (
SearchIndexingBufferedSender,
)
from azure.core.credentials import AzureKeyCredential
from azure.core.exceptions import HttpResponseError
from azure.core.exceptions import HttpResponseError, ServiceResponseTimeoutError
from azure.search.documents.models import IndexingResult

CREDENTIAL = AzureKeyCredential(key="test_api_key")
Expand Down Expand Up @@ -82,7 +82,6 @@ async def test_callback_new(self):
async def test_callback_error(self):
async def mock_fail_index_documents(actions, timeout=86400):
if len(actions) > 0:
print("There is something wrong")
result = IndexingResult()
result.key = actions[0].additional_properties.get('id')
result.status_code = 400
Expand All @@ -102,10 +101,33 @@ async def mock_fail_index_documents(actions, timeout=86400):
await client.flush()
assert on_error.called

async def test_callback_error_on_timeout(self):
async def mock_fail_index_documents(actions, timeout=86400):
if len(actions) > 0:
result = IndexingResult()
result.key = actions[0].additional_properties.get('id')
result.status_code = 400
result.succeeded = False
self.uploaded = self.uploaded + len(actions) - 1
time.sleep(1)
return [result]

on_error = mock.AsyncMock()
async with SearchIndexingBufferedSender("endpoint",
"index name",
CREDENTIAL,
auto_flush=False,
on_error=on_error) as client:
client._index_documents_actions = mock_fail_index_documents
client._index_key = "id"
await client.upload_documents([{"id": 0},{"id": 1}])
with pytest.raises(ServiceResponseTimeoutError):
await client.flush(timeout=-1)
assert on_error.call_count == 2

async def test_callback_progress(self):
async def mock_successful_index_documents(actions, timeout=86400):
if len(actions) > 0:
print("There is something wrong")
result = IndexingResult()
result.key = actions[0].additional_properties.get('id')
result.status_code = 200
Expand Down
30 changes: 27 additions & 3 deletions sdk/search/azure-search-documents/tests/test_buffered_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
except ImportError:
import mock

import pytest
from azure.search.documents import (
SearchIndexingBufferedSender,
)
from azure.core.credentials import AzureKeyCredential
from azure.core.exceptions import HttpResponseError
from azure.core.exceptions import HttpResponseError, ServiceResponseTimeoutError
from azure.search.documents.models import IndexingResult

CREDENTIAL = AzureKeyCredential(key="test_api_key")
Expand Down Expand Up @@ -86,7 +87,6 @@ def test_callback_new(self):
def test_callback_error(self):
def mock_fail_index_documents(actions, timeout=86400):
if len(actions) > 0:
print("There is something wrong")
result = IndexingResult()
result.key = actions[0].additional_properties.get('id')
result.status_code = 400
Expand All @@ -106,10 +106,34 @@ def mock_fail_index_documents(actions, timeout=86400):
client.flush()
assert on_error.called

def test_callback_error_on_timeout(self):
def mock_fail_index_documents(actions, timeout=86400):
import time
if len(actions) > 0:
result = IndexingResult()
result.key = actions[0].additional_properties.get('id')
result.status_code = 400
result.succeeded = False
self.uploaded = self.uploaded + len(actions) - 1
time.sleep(1)
return [result]

on_error = mock.Mock()
with SearchIndexingBufferedSender("endpoint",
"index name",
CREDENTIAL,
auto_flush=False,
on_error=on_error) as client:
client._index_documents_actions = mock_fail_index_documents
client._index_key = "id"
client.upload_documents([{"id": 0},{"id": 1}])
with pytest.raises(ServiceResponseTimeoutError):
client.flush(timeout=-1)
assert on_error.call_count == 2

def test_callback_progress(self):
def mock_successful_index_documents(actions, timeout=86400):
if len(actions) > 0:
print("There is something wrong")
result = IndexingResult()
result.key = actions[0].additional_properties.get('id')
result.status_code = 200
Expand Down

0 comments on commit cfc953f

Please sign in to comment.