diff --git a/jina/serve/runtimes/worker/batch_queue.py b/jina/serve/runtimes/worker/batch_queue.py index 8f7e0d283b413..572fcf6fe2743 100644 --- a/jina/serve/runtimes/worker/batch_queue.py +++ b/jina/serve/runtimes/worker/batch_queue.py @@ -84,13 +84,14 @@ async def _sleep_then_set(self): self._flush_trigger.set() self._timer_finished = True - async def push(self, request: DataRequest) -> asyncio.Queue: + async def push(self, request: DataRequest, http = False) -> asyncio.Queue: """Append request to the the list of requests to be processed. This method creates an asyncio Queue for that request and keeps track of it. It returns this queue to the caller so that the caller can now when this request has been processed :param request: The request to append to the queue. + :param http: Flag to determine if the request is served via HTTP for some optims :return: The queue that will receive when the request is processed. """ @@ -103,7 +104,7 @@ async def push(self, request: DataRequest) -> asyncio.Queue: self._start_timer() async with self._data_lock: if not self._flush_task: - self._flush_task = asyncio.create_task(self._await_then_flush()) + self._flush_task = asyncio.create_task(self._await_then_flush(http)) self._big_doc.extend(docs) next_req_idx = len(self._requests) @@ -118,8 +119,10 @@ async def push(self, request: DataRequest) -> asyncio.Queue: return queue - async def _await_then_flush(self) -> None: - """Process all requests in the queue once flush_trigger event is set.""" + async def _await_then_flush(self, http=False) -> None: + """Process all requests in the queue once flush_trigger event is set. + :param http: Flag to determine if the request is served via HTTP for some optims + """ def _get_docs_groups_completed_request_indexes( non_assigned_docs, @@ -200,9 +203,13 @@ async def _assign_results( for docs_group, request_idx in zip(docs_grouped, completed_req_idxs): request = self._requests[request_idx] request_completed = self._requests_completed[request_idx] - request.data.set_docs_convert_arrays( - docs_group, ndarray_type=self._output_array_type - ) + if http is False or self._output_array_type is not None: + request.direct_docs = None # batch queue will work in place, therefore result will need to read from data. + request.data.set_docs_convert_arrays( + docs_group, ndarray_type=self._output_array_type + ) + else: + request.direct_docs = docs_group await request_completed.put(None) return num_assigned_docs diff --git a/jina/serve/runtimes/worker/request_handling.py b/jina/serve/runtimes/worker/request_handling.py index 7d9958c35c049..6d4b3fb31ffc9 100644 --- a/jina/serve/runtimes/worker/request_handling.py +++ b/jina/serve/runtimes/worker/request_handling.py @@ -704,8 +704,9 @@ async def handle( **self._batchqueue_config[exec_endpoint], ) # This is necessary because push might need to await for the queue to be emptied + # the batch queue will change the request in-place queue = await self._batchqueue_instances[exec_endpoint][param_key].push( - requests[0] + requests[0], http=http ) item = await queue.get() queue.task_done() diff --git a/tests/integration/docarray_v2/test_v2.py b/tests/integration/docarray_v2/test_v2.py index eebbafd1a572f..f03fa4ddb9caf 100644 --- a/tests/integration/docarray_v2/test_v2.py +++ b/tests/integration/docarray_v2/test_v2.py @@ -35,7 +35,7 @@ class Image(BaseDoc): lll: List[List[str]] = [[]] texts: DocList[TextDoc] - class MyExec(Executor): + class MyExecDifSchema(Executor): @requests(on='/foo') def foo(self, docs: DocList[Image], **kwargs) -> DocList[Image]: for doc in docs: @@ -45,7 +45,7 @@ def foo(self, docs: DocList[Image], **kwargs) -> DocList[Image]: return docs ports = [random_port() for _ in protocols] - with Flow(port=ports, protocol=protocols, replicas=replicas).add(uses=MyExec) as f: + with Flow(port=ports, protocol=protocols, replicas=replicas).add(uses=MyExecDifSchema) as f: for port, protocol in zip(ports, protocols): c = Client(port=port, protocol=protocol) docs = c.post( @@ -76,14 +76,14 @@ def test_send_custom_doc(protocols, replicas): class MyDoc(BaseDoc): my_text: str - class MyExec(Executor): + class MyExecCustomDoc(Executor): @requests(on='/foo') def foo(self, docs: DocList[MyDoc], **kwargs) -> DocList[MyDoc]: docs[0].my_text = 'hello world' return docs ports = [random_port() for _ in protocols] - with Flow(port=ports, protocol=protocols, replicas=replicas).add(uses=MyExec): + with Flow(port=ports, protocol=protocols, replicas=replicas).add(uses=MyExecCustomDoc): for port, protocol in zip(ports, protocols): c = Client(port=port, protocol=protocol) docs = c.post( @@ -100,7 +100,7 @@ def test_input_response_schema(protocols, replicas): class MyDoc(BaseDoc): text: str - class MyExec(Executor): + class MyExecRespSchema(Executor): @requests( on='/foo', request_schema=DocList[MyDoc], @@ -112,7 +112,7 @@ def foo(self, docs, **kwargs): return docs ports = [random_port() for _ in protocols] - with Flow(port=ports, protocol=protocols, replicas=replicas).add(uses=MyExec): + with Flow(port=ports, protocol=protocols, replicas=replicas).add(uses=MyExecRespSchema): for port, protocol in zip(ports, protocols): c = Client(port=port, protocol=protocol) docs = c.post( @@ -130,7 +130,7 @@ def test_input_response_schema_annotation(protocols, replicas): class MyDoc(BaseDoc): text: str - class MyExec(Executor): + class MyExecAnnotation(Executor): @requests(on='/bar') def bar(self, docs: DocList[MyDoc], **kwargs) -> DocList[MyDoc]: assert docs.__class__.doc_type == MyDoc @@ -138,7 +138,7 @@ def bar(self, docs: DocList[MyDoc], **kwargs) -> DocList[MyDoc]: return docs ports = [random_port() for _ in protocols] - with Flow(port=ports, protocol=protocols, replicas=replicas).add(uses=MyExec): + with Flow(port=ports, protocol=protocols, replicas=replicas).add(uses=MyExecAnnotation): for port, protocol in zip(ports, protocols): c = Client(port=port, protocol=protocol) docs = c.post( @@ -148,7 +148,7 @@ def bar(self, docs: DocList[MyDoc], **kwargs) -> DocList[MyDoc]: assert docs.__class__.doc_type == MyDoc -@pytest.mark.parametrize('endpoint', [f'task{i+1}' for i in range(6)]) +@pytest.mark.parametrize('endpoint', [f'task{i + 1}' for i in range(6)]) def test_generator_endpoints_type_annotations(endpoint): class InputDocument(BaseDoc): input: str @@ -168,13 +168,13 @@ async def task2(self, doc: InputDocument, **kwargs) -> OutputDocument: @requests(on='/task3') async def task3( - self, doc: InputDocument, **kwargs + self, doc: InputDocument, **kwargs ) -> Generator[OutputDocument, None, None]: yield OutputDocument(text='hello world') @requests(on='/task4') async def task4( - self, doc: InputDocument, **kwargs + self, doc: InputDocument, **kwargs ) -> AsyncGenerator[OutputDocument, None]: yield OutputDocument(text='hello world') @@ -184,21 +184,21 @@ async def task5(self, doc: InputDocument, **kwargs) -> Iterator[OutputDocument]: @requests(on='/task6') async def task6( - self, doc: InputDocument, **kwargs + self, doc: InputDocument, **kwargs ) -> AsyncIterator[OutputDocument]: yield OutputDocument(text='hello world') assert ( - GeneratorExecutor.requests_by_class['GeneratorExecutor'][ - f'/{endpoint}' - ].request_schema - == InputDocument + GeneratorExecutor.requests_by_class['GeneratorExecutor'][ + f'/{endpoint}' + ].request_schema + == InputDocument ) assert ( - GeneratorExecutor.requests_by_class['GeneratorExecutor'][ - f'/{endpoint}' - ].response_schema - == OutputDocument + GeneratorExecutor.requests_by_class['GeneratorExecutor'][ + f'/{endpoint}' + ].response_schema + == OutputDocument ) @@ -212,27 +212,77 @@ class InputDoc(BaseDoc): class OutputDoc(BaseDoc): embedding: AnyTensor + t: str - class MyExec(Executor): + class MyExecForFlow(Executor): @requests(on='/bar') def bar(self, docs: DocList[InputDoc], **kwargs) -> DocList[OutputDoc]: docs_return = DocList[OutputDoc]( - [OutputDoc(embedding=np.zeros((100, 1))) for _ in range(len(docs))] + [OutputDoc(embedding=np.zeros((100, 1)), t='hey') for _ in range(len(docs))] ) return docs_return @requests(on='/bar_with_dbatch') @dynamic_batching(preferred_batch_size=4) def bar_with_dbatch( - self, docs: DocList[InputDoc], **kwargs + self, docs: DocList[InputDoc], **kwargs ) -> DocList[OutputDoc]: docs_return = DocList[OutputDoc]( - [OutputDoc(embedding=np.zeros((100, 1))) for _ in range(len(docs))] + [OutputDoc(embedding=np.zeros((100, 1)), t='hey') for _ in range(len(docs))] ) return docs_return ports = [random_port() for _ in protocols] - with Flow(port=ports, protocol=protocols, replicas=replicas).add(uses=MyExec): + with Flow(port=ports, protocol=protocols, replicas=replicas).add(uses=MyExecForFlow): + for port, protocol in zip(ports, protocols): + c = Client(port=port, protocol=protocol) + docs = c.post( + on='/bar', + inputs=InputDoc(img=ImageDoc(tensor=np.zeros((3, 224, 224)))), + return_type=DocList[OutputDoc], + ) + assert docs[0].embedding.shape == (100, 1) + assert docs.__class__.doc_type == OutputDoc + docs = c.post( + on='/bar_with_dbatch', + inputs=InputDoc(img=ImageDoc(tensor=np.zeros((3, 224, 224)))), + return_type=DocList[OutputDoc], + ) + assert docs[0].embedding.shape == (100, 1) + assert docs.__class__.doc_type == OutputDoc + + +@pytest.mark.parametrize( + 'protocols', [['grpc'], ['http'], ['grpc', 'http']] +) +def test_different_output_input_deployment(protocols): + class InputDoc(BaseDoc): + img: ImageDoc + + class OutputDoc(BaseDoc): + embedding: AnyTensor + t: str + + class MyExecForDepl(Executor): + @requests(on='/bar') + def bar(self, docs: DocList[InputDoc], **kwargs) -> DocList[OutputDoc]: + docs_return = DocList[OutputDoc]( + [OutputDoc(embedding=np.zeros((100, 1)), t='hey') for _ in range(len(docs))] + ) + return docs_return + + @requests(on='/bar_with_dbatch') + @dynamic_batching(preferred_batch_size=4) + def bar_with_dbatch( + self, docs: DocList[InputDoc], **kwargs + ) -> DocList[OutputDoc]: + docs_return = DocList[OutputDoc]( + [OutputDoc(embedding=np.zeros((100, 1)), t='hey') for _ in range(len(docs))] + ) + return docs_return + + ports = [random_port() for _ in protocols] + with Deployment(port=ports, protocol=protocols, uses=MyExecForDepl, include_gateway=False): for port, protocol in zip(ports, protocols): c = Client(port=port, protocol=protocol) docs = c.post( @@ -319,20 +369,20 @@ def bar(self, docs: DocList[Output1], **kwargs) -> DocList[Output2]: assert v['output'] == LegacyDocument.schema() v = schema_map['/bar'] assert ( - v['input'] - == _create_pydantic_model_from_schema( - _create_aux_model_doc_list_to_list(Input1).schema(), - 'Input1', - {}, - ).schema() + v['input'] + == _create_pydantic_model_from_schema( + _create_aux_model_doc_list_to_list(Input1).schema(), + 'Input1', + {}, + ).schema() ) assert ( - v['output'] - == _create_pydantic_model_from_schema( - _create_aux_model_doc_list_to_list(Output2).schema(), - 'Output2', - {}, - ).schema() + v['output'] + == _create_pydantic_model_from_schema( + _create_aux_model_doc_list_to_list(Output2).schema(), + 'Output2', + {}, + ).schema() ) @@ -370,7 +420,7 @@ def bar(self, docs: DocList[Output1], **kwargs) -> DocList[Output2]: ports = [random_port() for _ in protocols] with Flow(port=ports, protocol=protocols).add(uses=Exec1Default).add( - uses=Exec2Default + uses=Exec2Default ): for port, protocol in zip(ports, protocols): c = Client(port=port, protocol=protocol) @@ -410,16 +460,16 @@ def bar(self, docs: DocList[Output1], **kwargs) -> DocList[Output2]: assert v['output'] == LegacyDocument.schema() v = schema_map[__default_endpoint__] assert ( - v['input'] - == _create_pydantic_model_from_schema( - _create_aux_model_doc_list_to_list(Input1).schema(), 'Input1', {} - ).schema() + v['input'] + == _create_pydantic_model_from_schema( + _create_aux_model_doc_list_to_list(Input1).schema(), 'Input1', {} + ).schema() ) assert ( - v['output'] - == _create_pydantic_model_from_schema( - _create_aux_model_doc_list_to_list(Output2).schema(), 'Output2', {} - ).schema() + v['output'] + == _create_pydantic_model_from_schema( + _create_aux_model_doc_list_to_list(Output2).schema(), 'Output2', {} + ).schema() ) @@ -449,14 +499,14 @@ def endpoint(self, docs: DocList[DocTest], **kwargs) -> DocList[DocTest]: ports = [random_port() for _ in protocols] flow = ( Flow(protocol=protocols, port=ports) - .add(uses=ExecutorTest, uses_with={'text': 'exec1'}, name='pod0') - .add( + .add(uses=ExecutorTest, uses_with={'text': 'exec1'}, name='pod0') + .add( uses=ExecutorTest, uses_with={'text': 'exec2'}, needs='gateway', name='pod1' ) - .add( + .add( uses=ExecutorTest, uses_with={'text': 'exec3'}, needs='gateway', name='pod2' ) - .add( + .add( needs=['pod0', 'pod1', 'pod2'], uses=ReduceExecutorTest, no_reduce=not reduce, @@ -495,12 +545,12 @@ class ProcessingTestDocConditions(BaseDoc): class ConditionDumpExecutor(Executor): @requests def foo( - self, docs: DocList[ProcessingTestDocConditions], **kwargs + self, docs: DocList[ProcessingTestDocConditions], **kwargs ) -> DocList[ProcessingTestDocConditions]: with open( - os.path.join(str(self.workspace), f'{self.metas.name}.txt'), - 'w', - encoding='utf-8', + os.path.join(str(self.workspace), f'{self.metas.name}.txt'), + 'w', + encoding='utf-8', ) as fp: for doc in docs: fp.write(doc.text) @@ -509,7 +559,7 @@ def foo( class FirstExec(Executor): @requests def foo( - self, docs: DocList[LegacyDocument], **kwargs + self, docs: DocList[LegacyDocument], **kwargs ) -> DocList[ProcessingTestDocConditions]: output_da = DocList[ProcessingTestDocConditions]( [ @@ -522,14 +572,14 @@ def foo( class JoinerExec(Executor): @requests def foo( - self, docs: DocList[ProcessingTestDocConditions], **kwargs + self, docs: DocList[ProcessingTestDocConditions], **kwargs ) -> DocList[ProcessingTestDocConditions]: pass f = ( Flow(protocol=protocol) - .add(uses=FirstExec, name='first') - .add( + .add(uses=FirstExec, name='first') + .add( uses=ConditionDumpExecutor, uses_metas={'name': 'exec1'}, workspace=os.environ['TEMP_WORKSPACE'], @@ -537,7 +587,7 @@ def foo( needs=['first'], when={'tags__type': {'$eq': 1}}, ) - .add( + .add( uses=ConditionDumpExecutor, workspace=os.environ['TEMP_WORKSPACE'], uses_metas={'name': 'exec2'}, @@ -545,7 +595,7 @@ def foo( needs='first', when={'tags__type': {'$gt': 1}}, ) - .needs_all('joiner', uses=JoinerExec) + .needs_all('joiner', uses=JoinerExec) ) with f: @@ -569,12 +619,12 @@ def foo( assert types_set == {1, 2} with open( - os.path.join(str(tmpdir), 'exec1', '0', f'exec1.txt'), 'r', encoding='utf-8' + os.path.join(str(tmpdir), 'exec1', '0', f'exec1.txt'), 'r', encoding='utf-8' ) as fp: assert fp.read() == 'type1' with open( - os.path.join(str(tmpdir), 'exec2', '0', f'exec2.txt'), 'r', encoding='utf-8' + os.path.join(str(tmpdir), 'exec2', '0', f'exec2.txt'), 'r', encoding='utf-8' ) as fp: assert fp.read() == 'type2' @@ -626,7 +676,7 @@ def __init__(self, file_name, *args, **kwargs): @requests def foo( - self, docs: DocList[LegacyDocument], **kwargs + self, docs: DocList[LegacyDocument], **kwargs ) -> DocList[LegacyDocument]: time.sleep(TIME_SLEEP_FLOATING) with open(self.file_name, 'a+', encoding='utf-8') as f: @@ -641,8 +691,8 @@ def foo( f = ( Flow(protocol=protocol) - .add(name='first') - .add( + .add(name='first') + .add( name='second', floating=True, uses=FloatingTestExecutor, @@ -658,8 +708,8 @@ def foo( ) end_time = time.time() assert ( - end_time - start_time - ) < TIME_SLEEP_FLOATING # check that the response arrives before the + end_time - start_time + ) < TIME_SLEEP_FLOATING # check that the response arrives before the # Floating Executor finishes assert len(ret) == 1 assert ret[0].text == '' @@ -727,16 +777,16 @@ def test_send_parameters(protocol, ctxt_manager): if ctxt_manager == 'deployment' and protocol == 'websocket': return - class Foo(Executor): + class FooSendExecutor(Executor): @requests(on='/hello') def foo(self, docs: DocList[TextDoc], parameters, **kwargs) -> DocList[TextDoc]: for doc in docs: doc.text += f'Processed by foo with {parameters["param"]}' if ctxt_manager == 'flow': - ctxt_mgr = Flow(protocol=protocol).add(uses=Foo) + ctxt_mgr = Flow(protocol=protocol).add(uses=FooSendExecutor) else: - ctxt_mgr = Deployment(protocol=protocol, uses=Foo) + ctxt_mgr = Deployment(protocol=protocol, uses=FooSendExecutor) with ctxt_mgr: ret = ctxt_mgr.post( @@ -754,15 +804,15 @@ def test_get_parameters_back(protocol, ctxt_manager): if ctxt_manager == 'deployment' and protocol == 'websocket': return - class Foo(Executor): + class FooParams(Executor): @requests(on='/hello') def foo(self, parameters, **kwargs): return {'back': parameters} if ctxt_manager == 'flow': - ctxt_mgr = Flow(protocol=protocol).add(uses=Foo, name='foo') + ctxt_mgr = Flow(protocol=protocol).add(uses=FooParams, name='foo') else: - ctxt_mgr = Deployment(protocol=protocol, uses=Foo, name='foo') + ctxt_mgr = Deployment(protocol=protocol, uses=FooParams, name='foo') with ctxt_mgr: ret = ctxt_mgr.post( @@ -783,15 +833,15 @@ def test_raise_exception(protocol, ctxt_manager): if ctxt_manager == 'deployment' and protocol == 'websocket': return - class Foo(Executor): + class FooExcep(Executor): @requests(on='/hello') def foo(self, **kwargs): raise Exception('Raising some exception from Executor') if ctxt_manager == 'flow': - ctxt_mgr = Flow(protocol=protocol).add(uses=Foo, name='foo') + ctxt_mgr = Flow(protocol=protocol).add(uses=FooExcep, name='foo') else: - ctxt_mgr = Deployment(protocol=protocol, uses=Foo, name='foo') + ctxt_mgr = Deployment(protocol=protocol, uses=FooExcep, name='foo') with ctxt_mgr: if protocol == 'http': @@ -850,10 +900,10 @@ async def get_endpoint_stream_docs(text: str): ] ) async for resp in self.streamer.stream_docs( - docs, - parameters=PARAMETERS, - target_executor='executor1', - return_type=DocList[TextDoc], + docs, + parameters=PARAMETERS, + target_executor='executor1', + return_type=DocList[TextDoc], ): assert resp.doc_type is TextDoc return {'result': [doc.text for doc in resp]} @@ -867,10 +917,10 @@ async def get_endpoint_stream(text: str): ] ) async for resp, _ in self.streamer.stream( - docs, - parameters=PARAMETERS, - target_executor='executor1', - return_type=DocList[TextDoc], + docs, + parameters=PARAMETERS, + target_executor='executor1', + return_type=DocList[TextDoc], ): assert resp.doc_type is TextDoc return {'result': [doc.text for doc in resp]} @@ -886,13 +936,13 @@ def func(self, docs: DocList[TextDoc], **kwargs) -> DocList[TextDoc]: class SecondExec(Executor): @requests def func( - self, docs: DocList[TextDoc], parameters, **kwargs + self, docs: DocList[TextDoc], parameters, **kwargs ) -> DocList[TextDoc]: for doc in docs: doc.text += f' Second(parameters={str(parameters)})' with Flow().config_gateway(uses=MyGateway, protocol='http').add( - uses=FirstExec, name='executor0' + uses=FirstExec, name='executor0' ).add(uses=SecondExec, name='executor1') as flow: import requests @@ -921,16 +971,16 @@ def test_any_endpoint(protocol, ctxt_manager): if ctxt_manager == 'deployment' and protocol == 'websocket': return - class Foo(Executor): + class FooAny(Executor): @requests def foo(self, docs: DocList[TextDoc], parameters, **kwargs) -> DocList[TextDoc]: for doc in docs: doc.text = 'Foo' if ctxt_manager == 'flow': - ctxt_mgr = Flow(protocol=protocol).add(uses=Foo, name='foo') + ctxt_mgr = Flow(protocol=protocol).add(uses=FooAny, name='foo') else: - ctxt_mgr = Deployment(protocol=protocol, uses=Foo, name='foo') + ctxt_mgr = Deployment(protocol=protocol, uses=FooAny, name='foo') with ctxt_mgr: ret = ctxt_mgr.post(on='/index', inputs=DocList[TextDoc]([TextDoc(text='')])) @@ -1046,10 +1096,10 @@ def foo(self, docs: DocList[TextDoc], **kwargs) -> DocList[TextDoc]: f = ( Flow(protocol=protocol) - .add(uses=Previous, name='previous') - .add(uses=First, name='first', needs='previous') - .add(uses=Second, name='second', needs='previous') - .needs_all() + .add(uses=Previous, name='previous') + .add(uses=First, name='first', needs='previous') + .add(uses=Second, name='second', needs='previous') + .needs_all() ) with pytest.raises(RuntimeFailToStart): @@ -1119,7 +1169,7 @@ def foo(self, docs, *args, **kwargs): @pytest.mark.parametrize('num_shards', [1, 2], indirect=True) @pytest.mark.parametrize('protocol', ['grpc', 'http', 'websocket']) def test_flow_with_external_deployment( - external_deployment, external_deployment_args, input_docs, num_shards, protocol + external_deployment, external_deployment_args, input_docs, num_shards, protocol ): with external_deployment: external_args = vars(external_deployment_args) @@ -1149,7 +1199,7 @@ class InputDoc(BaseDoc): class OutputDoc(BaseDoc): embedding: AnyTensor - class MyExec(Executor): + class MyExecD(Executor): @requests(on='/bar') def bar(self, docs: DocList[InputDoc], **kwargs) -> DocList[OutputDoc]: docs_return = DocList[OutputDoc]( @@ -1158,7 +1208,7 @@ def bar(self, docs: DocList[InputDoc], **kwargs) -> DocList[OutputDoc]: return docs_return ports = [random_port() for _ in protocols] - with Deployment(port=ports, protocol=protocols, replicas=replicas, uses=MyExec): + with Deployment(port=ports, protocol=protocols, replicas=replicas, uses=MyExecD): for port, protocol in zip(ports, protocols): c = Client(port=port, protocol=protocol) docs = c.post( @@ -1191,7 +1241,7 @@ class OutputComplexDoc(BaseDoc): class MyComplexServeExec(Executor): @requests(on='/bar') def bar( - self, docs: DocList[InputComplexDoc], **kwargs + self, docs: DocList[InputComplexDoc], **kwargs ) -> DocList[OutputComplexDoc]: docs_return = DocList[OutputComplexDoc]( [ @@ -1261,14 +1311,14 @@ def __init__(self, *args, **kwargs): @requests(on=['/index']) def index( - self, docs: DocList[TextDocWithId], **kwargs + self, docs: DocList[TextDocWithId], **kwargs ) -> DocList[TextDocWithId]: for doc in docs: self._docs_dict[doc.id] = doc @requests(on=['/search']) def search( - self, docs: DocList[TextDocWithId], **kwargs + self, docs: DocList[TextDocWithId], **kwargs ) -> DocList[TextDocWithId]: for doc in docs: doc.text = self._docs_dict[doc.id].text @@ -1312,14 +1362,14 @@ def __init__(self, sleep_time=0.1, *args, **kwargs): @requests(on=['/index']) def index( - self, docs: DocList[TextDocWithId], **kwargs + self, docs: DocList[TextDocWithId], **kwargs ) -> DocList[TextDocWithId]: for doc in docs: self._docs.append(doc) @requests(on=['/search']) def search( - self, docs: DocList[TextDocWithId], **kwargs + self, docs: DocList[TextDocWithId], **kwargs ) -> DocList[ResultTestDoc]: resp = DocList[ResultTestDoc]() for q in docs: @@ -1328,10 +1378,10 @@ def search( return resp with Deployment( - uses=SimilarityTestIndexer, - uses_with={'sleep_time': sleep_time}, - shards=2, - reduce=reduce, + uses=SimilarityTestIndexer, + uses_with={'sleep_time': sleep_time}, + shards=2, + reduce=reduce, ) as dep: index_da = DocList[TextDocWithId]( [TextDocWithId(id=f'{i}', text=f'ID {i}') for i in range(10)] @@ -1378,14 +1428,14 @@ def __init__(self, sleep_time=0.1, *args, **kwargs): @requests(on=['/index']) def index( - self, docs: DocList[TextDocWithId], **kwargs + self, docs: DocList[TextDocWithId], **kwargs ) -> DocList[TextDocWithId]: for doc in docs: self._docs.append(doc) @requests(on=['/search']) def search( - self, docs: DocList[TextDocWithId], **kwargs + self, docs: DocList[TextDocWithId], **kwargs ) -> DocList[ResultTestDoc]: resp = DocList[ResultTestDoc]() for q in docs: @@ -1395,10 +1445,10 @@ def search( ports = [random_port() for _ in protocols] with Flow(protocol=protocols, port=ports).add( - uses=SimilarityTestIndexer, - uses_with={'sleep_time': sleep_time}, - shards=2, - reduce=reduce, + uses=SimilarityTestIndexer, + uses_with={'sleep_time': sleep_time}, + shards=2, + reduce=reduce, ): for port, protocol in zip(ports, protocols): c = Client(port=port, protocol=protocol) @@ -1426,10 +1476,10 @@ class MyDocWithMatchesAndScores(MyDoc): matches: DocList[MyDoc] scores: List[float] - class MyExec(Executor): + class MyExecIssueShards(Executor): @requests(on='/search') def foo( - self, docs: DocList[MyDoc], **kwargs + self, docs: DocList[MyDoc], **kwargs ) -> DocList[MyDocWithMatchesAndScores]: res = DocList[MyDocWithMatchesAndScores]() for doc in docs: @@ -1443,7 +1493,7 @@ def foo( res.append(new_doc) return res - d = Deployment(uses=MyExec, shards=2) + d = Deployment(uses=MyExecIssueShards, shards=2) with d: res = d.post( on='/search', @@ -1521,7 +1571,7 @@ class OutputDocMonitor(BaseDoc): class MonitorExecTest(Executor): @requests def foo( - self, docs: DocList[InputDocMonitor], **kwargs + self, docs: DocList[InputDocMonitor], **kwargs ) -> DocList[OutputDocMonitor]: ret = DocList[OutputDocMonitor]() for doc in docs: @@ -1563,7 +1613,7 @@ class Config: class MyExecDocWithExample(Executor): @requests def foo( - self, docs: DocList[MyDocWithExample], **kwargs + self, docs: DocList[MyDocWithExample], **kwargs ) -> DocList[MyDocWithExample]: pass @@ -1601,7 +1651,7 @@ class MyInputModel(BaseDoc): class MyFailingExecutor(Executor): @requests(on='/generate') def generate( - self, docs: DocList[MyInputModel], **kwargs + self, docs: DocList[MyInputModel], **kwargs ) -> DocList[MyRandomModel]: return DocList[MyRandomModel]([doc.b for doc in docs])