Skip to content

Commit

Permalink
Remove Parallel (Processes) in Marqo (#523)
Browse files Browse the repository at this point in the history
* remove parallel

* remove parallel

* remove parallel

* remove processes in unittests

* remove processes in unittests

* add an error for processes

* fix device

* fix device

* remove files and respond to Farshid's review requests
  • Loading branch information
wanliAlex authored Jun 30, 2023
1 parent 5bcf3ae commit 183ab42
Show file tree
Hide file tree
Showing 9 changed files with 19 additions and 383 deletions.
3 changes: 1 addition & 2 deletions src/marqo/tensor_search/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ def add_or_replace_documents(
refresh: bool = True,
marqo_config: config.Config = Depends(generate_config),
batch_size: int = 0,
processes: int = 1,
non_tensor_fields: List[str] = Query(default=[]),
device: str = Depends(api_validation.validate_device),
use_existing_tensors: bool = False,
Expand All @@ -194,7 +193,7 @@ def add_or_replace_documents(
with RequestMetricsStore.for_request().time(f"POST /indexes/{index_name}/documents"):
return tensor_search.add_documents_orchestrator(
config=marqo_config, add_docs_params=add_docs_params,
batch_size=batch_size, processes=processes
batch_size=batch_size
)


Expand Down
245 changes: 0 additions & 245 deletions src/marqo/tensor_search/parallel.py

This file was deleted.

52 changes: 9 additions & 43 deletions src/marqo/tensor_search/tensor_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
EnvVars
)
from marqo.tensor_search.enums import IndexSettingsField as NsField
from marqo.tensor_search import utils, backend, validation, configs, parallel, add_docs
from marqo.tensor_search import utils, backend, validation, configs, add_docs
from marqo.tensor_search.formatting import _clean_doc
from marqo.tensor_search.index_meta_cache import get_cache, get_index_info
from marqo.tensor_search import index_meta_cache
Expand Down Expand Up @@ -230,8 +230,7 @@ def get_stats(config: Config, index_name: str):

def add_documents_orchestrator(
config: Config, add_docs_params: AddDocsParams,
batch_size: int = 0, processes: int = 1,
):
batch_size: int = 0):
# Default device calculated here and not in add_documents call
if add_docs_params.device is None:
selected_device = utils.read_env_vars_and_defaults("MARQO_BEST_AVAILABLE_DEVICE")
Expand All @@ -241,47 +240,14 @@ def add_documents_orchestrator(
logger.debug(f"No device given for add_documents_orchestrator. Defaulting to best available device: {selected_device}")
else:
add_docs_params_with_device = add_docs_params

if batch_size is None or batch_size == 0:
logger.debug(f"batch_size={batch_size} and processes={processes} - not doing any marqo side batching")
logger.debug(f"batch_size={batch_size} - not doing any marqo side batching")
return add_documents(config=config, add_docs_params=add_docs_params_with_device)
elif processes is not None and processes > 1:

# verify index exists and update cache
try:
backend.get_index_info(config=config, index_name=add_docs_params.index_name)
except errors.IndexNotFoundError:
raise errors.IndexNotFoundError(f"Cannot add documents to non-existent index {add_docs_params.index_name}")

try:
# Empty text search:
# 1. loads model into memory, 2. updates cache for multiprocessing
_vector_text_search(
config=config, index_name=add_docs_params.index_name, query='',
model_auth=add_docs_params.model_auth, device=add_docs_params_with_device.device,
image_download_headers=add_docs_params.image_download_headers)
except Exception as e:
logger.warning(
f"add_documents orchestrator's call to vector text search, prior to parallel add_docs, raised an error. "
f"Continuing to parallel add_docs. "
f"Message: {e}"
)

logger.debug(f"batch_size={batch_size} and processes={processes} - using multi-processing")
results = parallel.add_documents_mp(
config=config, batch_size=batch_size, processes=processes, add_docs_params=add_docs_params_with_device
)
# we need to force the cache to update as it does not propagate using mp
# we just clear this index's entry and it will re-populate when needed next
if add_docs_params.index_name in get_cache():
logger.info(f'deleting cache entry for {add_docs_params.index_name} after parallel add documents')
del get_cache()[add_docs_params.index_name]

return results
else:
if batch_size < 0:
raise errors.InvalidArgError("Batch size can't be less than 1!")
logger.debug(f"batch_size={batch_size} and processes={processes} - batching using a single process")
logger.debug(f"batch_size={batch_size} - batching inside marqo")
return _batch_request(config=config, verbose=False, add_docs_params=add_docs_params_with_device, batch_size=batch_size)


Expand Down Expand Up @@ -643,7 +609,7 @@ def add_documents(config: Config, add_docs_params: AddDocsParams):
# TODO: we may want to use chunks_to_append here to make it uniform with use_existing_tensors and normal vectorisation
chunks.append({**combo_chunk, **chunk_values_for_filtering})
continue

# Add chunks_to_append along with doc metadata to total chunks
for chunk in chunks_to_append:
chunks.append({**chunk, **chunk_values_for_filtering})
Expand Down Expand Up @@ -1279,7 +1245,7 @@ def get_vector_properties_to_search(searchable_attributes: Union[None, List[str]
properties_to_search = list(searchable_attributes_as_vectors.intersection(
index_info.get_vector_properties().keys()
))

# Validation for offset (pagination is single field) if offset not provided, validation is not run.
if len(properties_to_search) != 1 and offset > 0:
human_readable_vector_properties = [v.replace(TensorField.vector_prefix, '') for v in
Expand Down Expand Up @@ -1634,7 +1600,7 @@ def _bulk_vector_text_search(config: Config, queries: List[BulkSearchQueryEntity

if not device:
raise errors.InternalError("_bulk_vector_text_search cannot be called without `device`!")

with RequestMetricsStore.for_request().time("bulk_search.vector.processing_before_opensearch",
lambda t : logger.debug(f"bulk search (tensor) pre-processing: took {t:.3f}ms")
):
Expand All @@ -1657,7 +1623,7 @@ def _bulk_vector_text_search(config: Config, queries: List[BulkSearchQueryEntity
if not aggregate_body:
# Must return empty response, per search query
return create_empty_query_response(queries)

## 5. POST aggregate to /_msearch
responses = bulk_msearch(config, aggregate_body)

Expand Down
4 changes: 2 additions & 2 deletions tests/tensor_search/test_add_documents.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ def test_add_documents_list_data_type_validation(self):
assert add_res['errors'] is True
assert all(['error' in item for item in add_res['items'] if item['_id'].startswith('to_fail')])

def test_add_documents_orchestrator_set_device_single_process(self):
def test_add_documents_orchestrator_set_device(self):
mock_config = copy.deepcopy(self.config)

mock_vectorise = mock.MagicMock()
Expand All @@ -467,7 +467,7 @@ def run():
index_name=self.index_name_1, device="cuda:22", docs=[{"some": "doc"}, {"som other": "doc"}],
auto_refresh=True,
),
batch_size=1, processes=1
batch_size=1
)
return True

Expand Down
2 changes: 1 addition & 1 deletion tests/tensor_search/test_bulk_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -1422,7 +1422,7 @@ def test_limit_results_none(self):
tensor_search.add_documents_orchestrator(
config=self.config, add_docs_params=AddDocsParams(index_name=self.index_name_1,
docs=[{"Title": "a " + (" ".join(random.choices(population=vocab, k=25)))}
for _ in range(700)], auto_refresh=False), processes=4, batch_size=50
for _ in range(700)], auto_refresh=False), batch_size=50
)
tensor_search.refresh_index(config=self.config, index_name=self.index_name_1)

Expand Down
Loading

0 comments on commit 183ab42

Please sign in to comment.