Skip to content

Commit

Permalink
Support searchable attributes for unstructured indexes (#968)
Browse files Browse the repository at this point in the history
  • Loading branch information
papa99do authored Oct 7, 2024
1 parent 78d8227 commit fcbc982
Show file tree
Hide file tree
Showing 110 changed files with 8,768 additions and 1,135 deletions.
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ pynvml==11.5.0 # For cuda utilization
readerwriterlock==1.0.9
kazoo==2.10.0
pycurl==7.45.3
huggingface-hub==0.25.0
huggingface-hub==0.25.0
jinja2==3.1.4
2 changes: 2 additions & 0 deletions src/marqo/api/configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,7 @@ def default_env_vars() -> dict:
EnvVars.MARQO_INFERENCE_CACHE_SIZE: 0,
EnvVars.MARQO_INFERENCE_CACHE_TYPE: "LRU",
EnvVars.MARQO_BEST_AVAILABLE_DEVICE: "cpu", # on_start_script will determine this.
EnvVars.MARQO_MAX_TENSOR_FIELD_COUNT_UNSTRUCTURED: 100,
EnvVars.MARQO_MAX_LEXICAL_FIELD_COUNT_UNSTRUCTURED: 100,
EnvVars.ZOOKEEPER_CONNECTION_TIMEOUT: 15,
}
40 changes: 40 additions & 0 deletions src/marqo/api/models/add_docs_objects.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from typing import List
from typing import Optional, Union, Any, Sequence

import numpy as np
from pydantic import BaseModel, root_validator
from pydantic import Field

from marqo.core.models.add_docs_params import BatchVectorisationMode
from marqo.tensor_search.enums import EnvVars
from marqo.tensor_search.models.private_models import ModelAuth
from marqo.tensor_search.utils import read_env_vars_and_defaults_ints


class AddDocsBodyParams(BaseModel):
"""The parameters of the body parameters of tensor_search_add_documents() function"""

class Config:
arbitrary_types_allowed = True
allow_mutation = False
extra = "forbid" # Raise error on unknown fields

tensorFields: Optional[List] = None
useExistingTensors: bool = False
imageDownloadHeaders: dict = Field(default_factory=dict)
modelAuth: Optional[ModelAuth] = None
mappings: Optional[dict] = None
documents: Union[Sequence[Union[dict, Any]], np.ndarray]
imageDownloadThreadCount: int = Field(default_factory=lambda: read_env_vars_and_defaults_ints(EnvVars.MARQO_IMAGE_DOWNLOAD_THREAD_COUNT_PER_REQUEST))
mediaDownloadThreadCount: Optional[int]
textChunkPrefix: Optional[str] = None
# This parameter is experimental for now. we will add it to the document and py-marqo once it has been verified
batchVectorisationMode: BatchVectorisationMode = BatchVectorisationMode.PER_DOCUMENT

@root_validator
def validate_thread_counts(cls, values):
image_count = values.get('imageDownloadThreadCount')
media_count = values.get('mediaDownloadThreadCount')
if media_count is not None and image_count != read_env_vars_and_defaults_ints(EnvVars.MARQO_IMAGE_DOWNLOAD_THREAD_COUNT_PER_REQUEST):
raise ValueError("Cannot set both imageDownloadThreadCount and mediaDownloadThreadCount")
return values
64 changes: 29 additions & 35 deletions src/marqo/core/document/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,18 @@

import marqo.api.exceptions as api_exceptions
from marqo.core.constants import MARQO_DOC_ID
from marqo.core.exceptions import UnsupportedFeatureError, ParsingError
from marqo.core.models.add_docs_params import AddDocsParams
from marqo.core.exceptions import UnsupportedFeatureError, ParsingError, InternalError
from marqo.core.index_management.index_management import IndexManagement
from marqo.core.models.marqo_add_documents_response import MarqoAddDocumentsResponse, MarqoAddDocumentsItem
from marqo.core.models.marqo_index import IndexType
from marqo.core.models.marqo_index import IndexType, SemiStructuredMarqoIndex, StructuredMarqoIndex, \
UnstructuredMarqoIndex
from marqo.core.models.marqo_update_documents_response import MarqoUpdateDocumentsResponse, MarqoUpdateDocumentsItem
from marqo.core.vespa_index import for_marqo_index as vespa_index_factory
from marqo.core.semi_structured_vespa_index.semi_structured_add_document_handler import \
SemiStructuredAddDocumentsHandler, SemiStructuredFieldCountConfig
from marqo.core.structured_vespa_index.structured_add_document_handler import StructuredAddDocumentsHandler
from marqo.core.unstructured_vespa_index.unstructured_add_document_handler import UnstructuredAddDocumentsHandler
from marqo.core.vespa_index.vespa_index import for_marqo_index as vespa_index_factory
from marqo.logging import get_logger
from marqo.vespa.models import UpdateDocumentsBatchResponse, VespaDocument
from marqo.vespa.models.delete_document_response import DeleteAllDocumentsResponse
Expand All @@ -25,6 +31,23 @@ def __init__(self, vespa_client: VespaClient, index_management: IndexManagement)
self.vespa_client = vespa_client
self.index_management = index_management

def add_documents(self, add_docs_params: AddDocsParams,
field_count_config=SemiStructuredFieldCountConfig()) -> MarqoAddDocumentsResponse:
marqo_index = self.index_management.get_index(add_docs_params.index_name)

if isinstance(marqo_index, StructuredMarqoIndex):
add_docs_handler = StructuredAddDocumentsHandler(marqo_index, add_docs_params, self.vespa_client)
elif isinstance(marqo_index, SemiStructuredMarqoIndex):
add_docs_handler = SemiStructuredAddDocumentsHandler(marqo_index, add_docs_params,
self.vespa_client, self.index_management,
field_count_config)
elif isinstance(marqo_index, UnstructuredMarqoIndex):
add_docs_handler = UnstructuredAddDocumentsHandler(marqo_index, add_docs_params, self.vespa_client)
else:
raise InternalError(f"Unknown index type {type(marqo_index)}")

return add_docs_handler.add_documents()

def delete_all_docs_by_index_name(self, index_name: str) -> int:
"""Delete all documents in the given index by index name.
Expand Down Expand Up @@ -81,7 +104,7 @@ def partial_update_documents(self, partial_documents: List[Dict], marqo_index) \
Return:
MarqoUpdateDocumentsResponse containing the response of the partial update operation
"""
if marqo_index.type == IndexType.Unstructured:
if marqo_index.type in [IndexType.Unstructured, IndexType.SemiStructured]:
raise UnsupportedFeatureError("Partial document update is not supported for unstructured indexes. "
"Please use add_documents with use_existing_tensor=True instead")
elif marqo_index.type == IndexType.Structured:
Expand Down Expand Up @@ -136,7 +159,7 @@ def _translate_update_document_response(self, responses: UpdateDocumentsBatchRes
if responses is not None:
for resp in responses.responses:
doc_id = resp.id.split('::')[-1] if resp.id else None
status, message = self.translate_vespa_document_response(resp.status)
status, message = self.vespa_client.translate_vespa_document_response(resp.status, None)
new_item = MarqoUpdateDocumentsItem(id=doc_id, status=status, message=message, error=message)
items.append(new_item)

Expand Down Expand Up @@ -200,7 +223,7 @@ def translate_add_documents_response(self, responses: Optional[FeedBatchResponse
if responses is not None:
for resp in responses.responses:
doc_id = resp.id.split('::')[-1] if resp.id else None
status, message = self.translate_vespa_document_response(resp.status, message=resp.message)
status, message = self.vespa_client.translate_vespa_document_response(resp.status, resp.message)
new_item = MarqoAddDocumentsItem(id=doc_id, status=status, message=message)
new_items.append(new_item)

Expand All @@ -210,32 +233,3 @@ def translate_add_documents_response(self, responses: Optional[FeedBatchResponse

return MarqoAddDocumentsResponse(errors=errors, index_name=index_name, items=new_items,
processingTimeMs=add_docs_processing_time_ms)

def translate_vespa_document_response(self, status: int, message: Optional[str]=None) -> Tuple[int, Optional[str]]:
"""A helper function to translate Vespa document response into the expected status, message that
is used in Marqo document API responses.
Args:
status: The status code from Vespa document response
Return:
A tuple of status code and the message in the response
"""
if status == 200:
return 200, None
elif status == 404:
return 404, "Document does not exist in the index"
# Update documents get 412 from Vespa for document not found as we use condition
elif status == 412:
return 404, "Document does not exist in the index"
elif status == 429:
return 429, "Marqo vector store receives too many requests. Please try again later"
elif status == 507:
return 400, "Marqo vector store is out of memory or disk space"
# TODO Block the invalid special characters before sending to Vespa
elif status == 400 and isinstance(message, str) and "could not parse field" in message.lower():
return 400, f"The document contains invalid characters in the fields. Original error: {message} "
else:
logger.error(f"An unexpected error occurred from the Vespa document response. "
f"status: {status}, message: {message}")
return 500, f"Marqo vector store returns an unexpected error with this document. Original error: {message}"
34 changes: 33 additions & 1 deletion src/marqo/core/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from http import HTTPStatus

from marqo.exceptions import (
MarqoError,
InvalidArgumentError,
Expand Down Expand Up @@ -75,5 +77,35 @@ class InvalidTensorFieldError(MarqoDocumentParsingError):
class UnsupportedFeatureError(InvalidArgumentError):
pass


class ZeroMagnitudeVectorError(InvalidArgumentError):
pass
pass


class FieldTypeMismatchError(InvalidArgumentError):
pass


class ModelError(MarqoError):
pass


class AddDocumentsError(Exception):
status_code: int = int(HTTPStatus.BAD_REQUEST)
error_code: str = 'invalid_argument'
error_message: str

def __init__(self, error_message: str,
error_code: str = 'invalid_argument',
status_code: int = int(HTTPStatus.BAD_REQUEST)) -> None:
self.error_code = error_code
self.error_message = error_message
self.status_code = int(status_code)


class DuplicateDocumentError(AddDocumentsError):
pass


class TooManyFieldsError(MarqoError):
pass
57 changes: 50 additions & 7 deletions src/marqo/core/index_management/index_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,20 @@

import marqo.logging
import marqo.vespa.vespa_client
from marqo import version
from marqo import version, marqo_docs
from marqo.core import constants
from marqo.core.distributed_lock.zookeeper_distributed_lock import get_deployment_lock
from marqo.core.exceptions import IndexNotFoundError, ApplicationNotInitializedError
from marqo.core.exceptions import OperationConflictError
from marqo.core.exceptions import ZookeeperLockNotAcquiredError, InternalError
from marqo.core.index_management.vespa_application_package import VespaApplicationPackage, MarqoConfig, \
VespaApplicationFileStore, ApplicationPackageDeploymentSessionStore
from marqo.core.index_management.vespa_application_package import VespaApplicationPackage, VespaApplicationFileStore, \
ApplicationPackageDeploymentSessionStore
from marqo.core.models import MarqoIndex
from marqo.core.models.marqo_index import SemiStructuredMarqoIndex
from marqo.core.models.marqo_index_request import MarqoIndexRequest
from marqo.core.vespa_schema import for_marqo_index_request as vespa_schema_factory
from marqo.core.semi_structured_vespa_index.semi_structured_vespa_schema import SemiStructuredVespaSchema
from marqo.core.vespa_index.vespa_schema import for_marqo_index_request as vespa_schema_factory
from marqo.tensor_search.models.index_settings import IndexSettings
from marqo.vespa.exceptions import VespaStatusError
from marqo.vespa.vespa_client import VespaClient
from marqo.vespa.zookeeper_client import ZookeeperClient

Expand All @@ -27,6 +28,7 @@

class IndexManagement:
_MINIMUM_VESPA_VERSION_TO_SUPPORT_UPLOAD_BINARY_FILES = semver.VersionInfo.parse('8.382.22')
_MINIMUM_VESPA_VERSION_TO_SUPPORT_FAST_FILE_DISTRIBUTION = semver.VersionInfo.parse('8.396.18')
_MARQO_SETTINGS_SCHEMA_NAME = 'marqo__settings'
_MARQO_CONFIG_DOC_ID = 'marqo__config'

Expand Down Expand Up @@ -184,10 +186,30 @@ def batch_delete_indexes_by_name(self, index_names: List[str]) -> None:
with self._vespa_deployment_lock():
self._get_vespa_application().batch_delete_index_setting_and_schema(index_names)

def update_index(self, marqo_index: SemiStructuredMarqoIndex) -> None:
"""
Update index settings and schema
Aars:
marqo_index: Index to update, only SemiStructuredMarqoIndex is supported
Raises:
IndexNotFoundError: If an index does not exist
InternalError: If the index is not a SemiStructuredMarqoIndex.
RuntimeError: If deployment lock is not instantiated
OperationConflictError: If another index creation/deletion operation is
in progress and the lock cannot be acquired
"""
if not isinstance(marqo_index, SemiStructuredMarqoIndex):
# This is just a sanity check, it should not happen since we do not expose this method to end user.
raise InternalError(f'Index {marqo_index.name} can not be updated.')

with self._vespa_deployment_lock():
schema = SemiStructuredVespaSchema.generate_vespa_schema(marqo_index)
self._get_vespa_application().update_index_setting_and_schema(marqo_index, schema)

def _get_existing_indexes(self) -> List[MarqoIndex]:
"""
Get all Marqo indexes storing in _MARQO_SETTINGS_SCHEMA_NAME schema (used prior to Marqo v2.12.0).
This method is now only used to retrieve the existing indexes for bootstrapping from v2.12.0
Get all Marqo indexes storing in _MARQO_SETTINGS_SCHEMA_NAME schema (used prior to Marqo v2.13.0).
This method is now only used to retrieve the existing indexes for bootstrapping from v2.13.0
Returns:
List of Marqo indexes
Expand Down Expand Up @@ -252,6 +274,27 @@ def _get_vespa_application(self, check_configured: bool = True, need_binary_file
The VespaApplicationPackage instance we can use to do bootstrapping/rollback and any index operations.
"""
vespa_version = semver.VersionInfo.parse(self.vespa_client.get_vespa_version())

if vespa_version < self._MINIMUM_VESPA_VERSION_TO_SUPPORT_UPLOAD_BINARY_FILES:
# Please note that this warning message will only be logged out for OS users running Marqo on external
# Vespa servers with version prior to 8.382.22. This will be displayed when Marqo starts up and before
# each index CUD operation
logger.warning(f'Your Vespa version {vespa_version} is lower than the minimum recommended Vespa version '
f'{self._MINIMUM_VESPA_VERSION_TO_SUPPORT_FAST_FILE_DISTRIBUTION}. This could cause '
f'unexpected behavior when bootstrapping Marqo. Please upgrade '
f'Vespa to version {self._MINIMUM_VESPA_VERSION_TO_SUPPORT_FAST_FILE_DISTRIBUTION} or '
f'later. Please see {marqo_docs.troubleshooting()} for more details.')

if vespa_version < self._MINIMUM_VESPA_VERSION_TO_SUPPORT_FAST_FILE_DISTRIBUTION:
# Please note that this warning message will only be logged out for OS users running Marqo on external
# Vespa servers with version prior to 8.396.18. This will be displayed when Marqo starts up and before
# each index CUD operation
logger.warning(f'Your Vespa version {vespa_version} is lower than the minimum recommended Vespa version '
f'{self._MINIMUM_VESPA_VERSION_TO_SUPPORT_FAST_FILE_DISTRIBUTION}. You may encounter slower '
f'response times when creating a Marqo index or adding documents to unstructured indexes. '
f'Please upgrade Vespa to version {self._MINIMUM_VESPA_VERSION_TO_SUPPORT_FAST_FILE_DISTRIBUTION} or '
f'later. Please see {marqo_docs.troubleshooting()} for more details.')

if need_binary_file_support and vespa_version < self._MINIMUM_VESPA_VERSION_TO_SUPPORT_UPLOAD_BINARY_FILES:
# Binary files are only supported using VespaApplicationFileStore prior to Vespa version 8.382.22
application_package_store = VespaApplicationFileStore(
Expand Down
14 changes: 13 additions & 1 deletion src/marqo/core/index_management/vespa_application_package.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,9 @@ def save_index_setting(self, index_setting: MarqoIndex) -> None:
current_version = self._index_settings[name].version
if current_version + 1 != target_version:
raise OperationConflictError(f'Conflict in version detected while saving index {name}. '
f'Current version {current_version}, new version {target_version}.')
f'Current version is {current_version}, and cannot be upgraded to '
f'target version {target_version}. Some other request might have changed '
f'the index. Please try again. ')
self._move_to_history(name)
else:
if target_version != 1:
Expand Down Expand Up @@ -678,6 +680,16 @@ def batch_delete_index_setting_and_schema(self, index_names: List[str]) -> None:
self._store.save_file(self._service_xml.to_xml(), self._SERVICES_XML_FILE)
self._deploy()

def update_index_setting_and_schema(self, index: MarqoIndex, schema: str) -> None:
if not self.has_index(index.name):
raise IndexNotFoundError(f"Index {index.name} not found")

version = index.version + 1 if index.version is not None else 1
self._store.save_file(schema, 'schemas', f'{index.schema_name}.sd')
self._index_setting_store.save_index_setting(index.copy(update={'version': version}))
self._persist_index_settings()
self._deploy()

def has_schema(self, name: str) -> bool:
return self._store.file_exists('schemas', f'{name}.sd')

Expand Down
Loading

0 comments on commit fcbc982

Please sign in to comment.