
Commit

Add options for handling duplicate documents (skip, fail, overwrite) (#1088)

* [document_stores] Duplicate document implementation added for memory store.

* [document_stores] Duplicate documents implementation added for FAISS store.

* [document_store] Duplicate document feature added for Elasticsearch document store. Fixed #1069

* [document_store] Duplicate documents feature added for Milvus document store and bug fixed in FAISS document store. Fixed #1069

* [document_store] Code refactored. Fixed #1069

* [document_store] Test cases refactored.

* [document_store] mypy issue fixed.

* [test_case] FAISS and Milvus test cases refactored to support duplicate documents implementation. Fixed #1069

* [document_store] duplicate_documents_options code refactored.

* [document_store] Code refactored.
akkefa authored May 25, 2021
1 parent c4ee32d commit b76ed4c
Showing 9 changed files with 213 additions and 78 deletions.
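In short, write_documents and the document store constructors gain a duplicate_documents option with three values: 'skip', 'overwrite', and 'fail'. The sketch below illustrates the intended usage of the new API; it is not part of the diff, and it assumes a running Elasticsearch instance on localhost and Haystack's hash-based document ids (identical text resolves to the same id).

from haystack.document_store.elasticsearch import ElasticsearchDocumentStore
from haystack.errors import DuplicateDocumentError

# Assumed setup: Elasticsearch reachable on localhost:9200.
document_store = ElasticsearchDocumentStore(host="localhost", duplicate_documents="skip")

# Two identical dicts resolve to the same hash-based document id.
docs = [
    {"text": "Haystack can now handle duplicate documents."},
    {"text": "Haystack can now handle duplicate documents."},
]

# 'skip' drops documents whose id already exists (or is repeated within the batch).
document_store.write_documents(docs, duplicate_documents="skip")

# 'fail' raises DuplicateDocumentError instead of silently dropping.
try:
    document_store.write_documents(docs, duplicate_documents="fail")
except DuplicateDocumentError as err:
    print(f"Refused to index duplicates: {err}")

# 'overwrite' re-indexes documents that share an id with an existing document.
document_store.write_documents(docs, duplicate_documents="overwrite")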
63 changes: 62 additions & 1 deletion haystack/document_store/base.py
@@ -6,6 +6,7 @@
import numpy as np

from haystack import Document, Label, MultiLabel, BaseComponent
from haystack.errors import DuplicateDocumentError
from haystack.preprocessor.preprocessor import PreProcessor
from haystack.preprocessor.utils import eval_data_from_json, eval_data_from_jsonl, squad_json_to_jsonl

@@ -19,9 +20,11 @@ class BaseDocumentStore(BaseComponent):
index: Optional[str]
label_index: Optional[str]
similarity: Optional[str]
duplicate_documents_options: tuple = ('skip', 'overwrite', 'fail')

@abstractmethod
def write_documents(self, documents: Union[List[dict], List[Document]], index: Optional[str] = None):
def write_documents(self, documents: Union[List[dict], List[Document]], index: Optional[str] = None,
batch_size: int = 10_000, duplicate_documents: Optional[str] = None):
"""
Indexes documents for later queries.
@@ -32,6 +35,13 @@ def write_documents(self, documents: Union[List[dict], List[Document]], index: O
It can be used for filtering and is accessible in the responses of the Finder.
:param index: Optional name of index where the documents shall be written to.
If None, the DocumentStore's default index (self.index) will be used.
:param batch_size: Number of documents that are passed to bulk function at a time.
:param duplicate_documents: Handle duplicate documents based on parameter options.
Parameter options: ('skip', 'overwrite', 'fail')
skip: Ignore duplicate documents.
overwrite: Update any existing documents with the same ID when adding documents.
fail: An error is raised if the document ID of the document being added already
exists.
:return: None
"""
@@ -214,3 +224,54 @@ def delete_documents(self, index: Optional[str] = None, filters: Optional[Dict[s
def run(self, documents: List[dict], index: Optional[str] = None, **kwargs): # type: ignore
self.write_documents(documents=documents, index=index)
return kwargs, "output_1"

@abstractmethod
def get_documents_by_id(self, ids: List[str], index: Optional[str] = None,
batch_size: int = 10_000) -> List[Document]:
pass

def _drop_duplicate_documents(self, documents: List[Document]) -> List[Document]:
"""
Drop duplicate documents based on the same hash ID
:param documents: A list of Haystack Document objects.
:return: A list of Haystack Document objects.
"""
_hash_ids: list = []
_documents: List[Document] = []

for document in documents:
if document.id in _hash_ids:
logger.warning(f"Duplicate Documents: Document with id '{document.id}' already exists in index "
f"'{self.index}'")
continue
_documents.append(document)
_hash_ids.append(document.id)

return _documents

def _handle_duplicate_documents(self, documents: List[Document], duplicate_documents: Optional[str] = None):
"""
Handle duplicate documents
:param documents: A list of Haystack Document objects.
:param duplicate_documents: Handle duplicate documents based on parameter options.
Parameter options: ('skip', 'overwrite', 'fail')
skip (default option): Ignore duplicate documents.
overwrite: Update any existing documents with the same ID when adding documents.
fail: An error is raised if the document ID of the document being added already
exists.
:return: A list of Haystack Document objects.
"""
if duplicate_documents in ('skip', 'fail'):
documents = self._drop_duplicate_documents(documents)
documents_found = self.get_documents_by_id(ids=[doc.id for doc in documents], index=self.index)
ids_exist_in_db = [doc.id for doc in documents_found]

if len(ids_exist_in_db) > 0 and duplicate_documents == 'fail':
raise DuplicateDocumentError(f"Document with ids '{', '.join(ids_exist_in_db)}' already exists"
f" in index = '{self.index}'.")

documents = list(filter(lambda doc: doc.id not in ids_exist_in_db, documents))

return documents
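These base-class helpers do the batch-level work for every store: _drop_duplicate_documents removes ids repeated within a batch, and _handle_duplicate_documents additionally consults the index for 'skip' and 'fail'. Below is a minimal sketch of how that behaves through a concrete subclass, using the in-memory store mentioned in the commit message; the document text and counts are illustrative, and the module path follows the directory layout in this diff.

from haystack import Document
from haystack.document_store.memory import InMemoryDocumentStore

store = InMemoryDocumentStore()
doc = Document(text="FAISS stores dense vectors.")

store.write_documents([doc], duplicate_documents="skip")       # indexed
store.write_documents([doc], duplicate_documents="skip")       # dropped with a warning, id already exists
store.write_documents([doc], duplicate_documents="overwrite")  # replaces the stored copy in place

print(store.get_document_count())  # still 1 -- the duplicate was never indexed twice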
52 changes: 30 additions & 22 deletions haystack/document_store/elasticsearch.py
@@ -11,7 +11,7 @@
from scipy.special import expit
from tqdm.auto import tqdm

from haystack.document_store.base import BaseDocumentStore
from haystack.document_store.base import BaseDocumentStore, DuplicateDocumentError
from haystack import Document, Label
from haystack.utils import get_batches_from_generator

@@ -43,11 +43,11 @@ def __init__(
ca_certs: Optional[str] = None,
verify_certs: bool = True,
create_index: bool = True,
update_existing_documents: bool = False,
refresh_type: str = "wait_for",
similarity="dot_product",
timeout=30,
return_embedding: bool = False,
duplicate_documents: str = 'overwrite',
):
"""
A DocumentStore using Elasticsearch to store and query the documents for our search.
@@ -80,11 +80,7 @@ def __init__(
:param scheme: 'https' or 'http', protocol used to connect to your elasticsearch instance
:param ca_certs: Root certificates for SSL: it is a path to certificate authority (CA) certs on disk. You can use certifi package with certifi.where() to find where the CA certs file is located in your machine.
:param verify_certs: Whether to be strict about ca certificates
:param create_index: Whether to try creating a new index (If the index of that name is already existing, we will just continue in any case)
:param update_existing_documents: Whether to update any existing documents with the same ID when adding
documents. When set as True, any document with an existing ID gets updated.
If set to False, an error is raised if the document ID of the document being
added already exists.
:param create_index: Whether to try creating a new index (If the index of that name is already existing, we will just continue in any case)
:param refresh_type: Type of ES refresh used to control when changes made by a request (e.g. bulk) are made visible to search.
If set to 'wait_for', continue only after changes are visible (slow, but safe).
If set to 'false', continue directly (fast, but sometimes unintuitive behaviour when docs are not immediately available after ingestion).
@@ -93,6 +89,12 @@
more performant with DPR embeddings. 'cosine' is recommended if you are using a Sentence BERT model.
:param timeout: Number of seconds after which an ElasticSearch request times out.
:param return_embedding: To return document embedding
:param duplicate_documents: Handle duplicate documents based on parameter options.
Parameter options: ('skip', 'overwrite', 'fail')
skip: Ignore duplicate documents.
overwrite: Update any existing documents with the same ID when adding documents.
fail: An error is raised if the document ID of the document being added already
exists.
"""
# save init parameters to enable export of component config as YAML
@@ -102,7 +104,7 @@ def __init__(
name_field=name_field, embedding_field=embedding_field, embedding_dim=embedding_dim,
custom_mapping=custom_mapping, excluded_meta_data=excluded_meta_data, analyzer=analyzer, scheme=scheme,
ca_certs=ca_certs, verify_certs=verify_certs, create_index=create_index,
update_existing_documents=update_existing_documents, refresh_type=refresh_type, similarity=similarity,
duplicate_documents=duplicate_documents, refresh_type=refresh_type, similarity=similarity,
timeout=timeout, return_embedding=return_embedding,
)

@@ -137,7 +139,7 @@ def __init__(
self._create_document_index(index)
self._create_label_index(label_index)

self.update_existing_documents = update_existing_documents
self.duplicate_documents = duplicate_documents
self.refresh_type = refresh_type

def _init_elastic_client(self,
@@ -303,7 +305,7 @@ def get_document_by_id(self, id: str, index: Optional[str] = None) -> Optional[D
else:
return None

def get_documents_by_id(self, ids: List[str], index: Optional[str] = None) -> List[Document]:
def get_documents_by_id(self, ids: List[str], index: Optional[str] = None) -> List[Document]: # type: ignore
"""Fetch documents by specifying a list of text id strings"""
index = index or self.index
query = {"query": {"ids": {"values": ids}}}
@@ -349,9 +351,8 @@ def get_metadata_values_by_key(
bucket["value"] = bucket.pop("key")
return buckets

def write_documents(
self, documents: Union[List[dict], List[Document]], index: Optional[str] = None, batch_size: int = 10_000
):
def write_documents(self, documents: Union[List[dict], List[Document]], index: Optional[str] = None,
batch_size: int = 10_000, duplicate_documents: Optional[str] = None):
"""
Indexes documents for later queries in Elasticsearch.
@@ -371,6 +372,13 @@ def write_documents(
should be changed to what you have set for self.text_field and self.name_field.
:param index: Elasticsearch index where the documents should be indexed. If not supplied, self.index will be used.
:param batch_size: Number of documents that are passed to Elasticsearch's bulk function at a time.
:param duplicate_documents: Handle duplicate documents based on parameter options.
Parameter options: ('skip', 'overwrite', 'fail')
skip: Ignore duplicate documents.
overwrite: Update any existing documents with the same ID when adding documents.
fail: An error is raised if the document ID of the document being added already
exists.
:raises DuplicateDocumentError: Exception triggered on a duplicate document.
:return: None
"""

@@ -379,17 +387,17 @@ def write_documents(

if index is None:
index = self.index
duplicate_documents = duplicate_documents or self.duplicate_documents
assert duplicate_documents in self.duplicate_documents_options, \
f"duplicate_documents parameter must be {', '.join(self.duplicate_documents_options)}"

field_map = self._create_document_field_map()
document_objects = [Document.from_dict(d, field_map=field_map) if isinstance(d, dict) else d for d in documents]
document_objects = self._handle_duplicate_documents(document_objects, duplicate_documents)
documents_to_index = []
for document in documents:
# Make sure we comply to Document class format
if isinstance(document, dict):
doc = Document.from_dict(document, field_map=self._create_document_field_map())
else:
doc = document

for doc in document_objects:
_doc = {
"_op_type": "index" if self.update_existing_documents else "create",
"_op_type": "index" if duplicate_documents == 'overwrite' else "create",
"_index": index,
**doc.to_dict(field_map=self._create_document_field_map())
} # type: Dict[str, Any]
@@ -450,7 +458,7 @@ def write_labels(
label.updated_at = label.created_at

_label = {
"_op_type": "index" if self.update_existing_documents else "create",
"_op_type": "index" if self.duplicate_documents == "overwrite" else "create",
"_index": index,
**label.to_dict()
} # type: Dict[str, Any]
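On the Elasticsearch side the three options reduce to a choice of bulk op type: 'overwrite' uses 'index' (which replaces an existing _id), while 'skip' and 'fail' use 'create', because duplicates have already been dropped or rejected client-side by _handle_duplicate_documents. A standalone sketch of that mapping follows; the action dict is illustrative and not the exact payload the store builds.

def op_type_for(duplicate_documents: str) -> str:
    # 'index' overwrites an existing _id; 'create' would conflict on an existing _id,
    # but duplicates never reach the bulk call for 'skip'/'fail'.
    return "index" if duplicate_documents == "overwrite" else "create"

# Illustrative bulk action (field names are assumptions, not the store's exact payload)
action = {
    "_op_type": op_type_for("skip"),   # -> "create"
    "_index": "document",
    "_id": "adb33f",                   # hypothetical hash id
    "text": "Haystack can now handle duplicate documents.",
}
print(action["_op_type"])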
43 changes: 28 additions & 15 deletions haystack/document_store/faiss.py
@@ -13,7 +13,7 @@
from haystack.retriever.base import BaseRetriever
from haystack.utils import get_batches_from_generator
from scipy.special import expit

from haystack.document_store.base import DuplicateDocumentError

logger = logging.getLogger(__name__)

@@ -37,11 +37,11 @@ def __init__(
faiss_index_factory_str: str = "Flat",
faiss_index: Optional["faiss.swigfaiss.Index"] = None,
return_embedding: bool = False,
update_existing_documents: bool = False,
index: str = "document",
similarity: str = "dot_product",
embedding_field: str = "embedding",
progress_bar: bool = True,
duplicate_documents: str = 'overwrite',
**kwargs,
):
"""
@@ -66,23 +66,25 @@
:param faiss_index: Pass an existing FAISS Index, i.e. an empty one that you configured manually
or one with docs that you used in Haystack before and want to load again.
:param return_embedding: To return document embedding
:param update_existing_documents: Whether to update any existing documents with the same ID when adding
documents. When set as True, any document with an existing ID gets updated.
If set to False, an error is raised if the document ID of the document being
added already exists.
:param index: Name of index in document store to use.
:param similarity: The similarity function used to compare document vectors. 'dot_product' is the default since it is
more performant with DPR embeddings. 'cosine' is recommended if you are using a Sentence BERT model.
:param embedding_field: Name of field containing an embedding vector.
:param progress_bar: Whether to show a tqdm progress bar or not.
Can be helpful to disable in production deployments to keep the logs clean.
:param duplicate_documents: Handle duplicate documents based on parameter options.
Parameter options: ('skip', 'overwrite', 'fail')
skip: Ignore duplicate documents.
overwrite: Update any existing documents with the same ID when adding documents.
fail: An error is raised if the document ID of the document being added already
exists.
"""

# save init parameters to enable export of component config as YAML
self.set_config(
sql_url=sql_url, vector_dim=vector_dim, faiss_index_factory_str=faiss_index_factory_str,
faiss_index=faiss_index, return_embedding=return_embedding,
update_existing_documents=update_existing_documents, index=index, similarity=similarity,
duplicate_documents=duplicate_documents, index=index, similarity=similarity,
embedding_field=embedding_field, progress_bar=progress_bar
)

@@ -107,10 +109,10 @@ def __init__(
raise ValueError("The FAISS document store can currently only support dot_product similarity. "
"Please set similarity=\"dot_product\"")
self.progress_bar = progress_bar
self.duplicate_documents = duplicate_documents

super().__init__(
url=sql_url,
update_existing_documents=update_existing_documents,
index=index
)

@@ -130,20 +132,30 @@ def _create_new_index(self, vector_dim: int, metric_type, index_factory: str = "
index = faiss.index_factory(vector_dim, index_factory, metric_type)
return index

def write_documents(
self, documents: Union[List[dict], List[Document]], index: Optional[str] = None, batch_size: int = 10_000
):
def write_documents(self, documents: Union[List[dict], List[Document]], index: Optional[str] = None,
batch_size: int = 10_000, duplicate_documents: Optional[str] = None):
"""
Add new documents to the DocumentStore.
:param documents: List of `Dicts` or List of `Documents`. If they already contain the embeddings, we'll index
them right away in FAISS. If not, you can later call update_embeddings() to create & index them.
:param index: (SQL) index name for storing the docs and metadata
:param batch_size: When working with large number of documents, batching can help reduce memory footprint.
:param duplicate_documents: Handle duplicate documents based on parameter options.
Parameter options: ('skip', 'overwrite', 'fail')
skip: Ignore duplicate documents.
overwrite: Update any existing documents with the same ID when adding documents.
fail: An error is raised if the document ID of the document being added already
exists.
:raises DuplicateDocumentError: Exception triggered on a duplicate document.
:return:
"""

index = index or self.index
duplicate_documents = duplicate_documents or self.duplicate_documents
assert duplicate_documents in self.duplicate_documents_options, \
f"duplicate_documents parameter must be {', '.join(self.duplicate_documents_options)}"

if not self.faiss_indexes.get(index):
self.faiss_indexes[index] = self._create_new_index(
vector_dim=self.vector_dim,
@@ -153,11 +165,11 @@ def write_documents(

field_map = self._create_document_field_map()
document_objects = [Document.from_dict(d, field_map=field_map) if isinstance(d, dict) else d for d in documents]

document_objects = self._handle_duplicate_documents(document_objects, duplicate_documents)
add_vectors = False if document_objects[0].embedding is None else True

if self.update_existing_documents and add_vectors:
logger.warning("You have enabled `update_existing_documents` feature and "
if self.duplicate_documents == "overwrite" and add_vectors:
logger.warning("You have to provide `duplicate_documents = 'overwrite'` arg and "
"`FAISSDocumentStore` does not support update in existing `faiss_index`.\n"
"Please call `update_embeddings` method to repopulate `faiss_index`")

@@ -176,7 +188,8 @@ def write_documents(
vector_id += 1
docs_to_write_in_sql.append(doc)

super(FAISSDocumentStore, self).write_documents(docs_to_write_in_sql, index=index)
super(FAISSDocumentStore, self).write_documents(docs_to_write_in_sql, index=index,
duplicate_documents=duplicate_documents)

def _create_document_field_map(self) -> Dict:
return {
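For FAISS the same options apply to the SQL-backed document metadata, but, as the warning in the diff notes, 'overwrite' cannot replace vectors already added to a faiss_index; a subsequent update_embeddings() call is needed to rebuild them. A hedged usage sketch follows; the sqlite URL and the mention of a retriever are assumptions, not part of this diff.

from haystack.document_store.faiss import FAISSDocumentStore

# Assumed local setup; sql_url is illustrative.
store = FAISSDocumentStore(sql_url="sqlite:///faiss_doc_store.db", duplicate_documents="skip")

docs = [{"text": "FAISS indexes dense vectors."}]
store.write_documents(docs, duplicate_documents="skip")  # duplicates are dropped before hitting SQL/FAISS

# With duplicate_documents="overwrite" and embeddings present, the store only warns:
# vectors already in the FAISS index are not replaced until update_embeddings(retriever)
# is called with a retriever of your choice.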
