
Implement proper FK in MetaDocumentORM and MetaLabelORM to work on PostgreSQL #1990

Merged Jan 14, 2022 · 25 commits · Changes from 13 commits

Commits
fbbca6e
Add unique constraint to avoid PSQL crash and debug hanging tests
ZanSara Jan 10, 2022
7b7a271
Tests seem to be runnable on PG now, still to verify
ZanSara Jan 11, 2022
caadc97
Add latest docstring and tutorial changes
github-actions[bot] Jan 11, 2022
5797f4a
Merge branch 'master' into fix_postgresql_composite_index_2
ZanSara Jan 11, 2022
250d44d
Forgot comma
ZanSara Jan 11, 2022
71a24d3
Merge branch 'fix_postgresql_composite_index_2' of github.com:deepset…
ZanSara Jan 11, 2022
1addbd7
Fix a few more issues left and add isolation_level to set.config()
ZanSara Jan 11, 2022
463ba44
Properly fix MetaDocumentORM and MetaLabelORM with composite foreign …
ZanSara Jan 11, 2022
bb534ee
Fix mypy
ZanSara Jan 11, 2022
0e2ebfb
update_document_meta() was not using index properly
ZanSara Jan 12, 2022
c1589a8
Add latest docstring and tutorial changes
github-actions[bot] Jan 12, 2022
681b036
Another small fix for test_weaviate.py
ZanSara Jan 12, 2022
146cf3c
Merge branch 'fix_postgresql_composite_index_2' of github.com:deepset…
ZanSara Jan 12, 2022
65e7bf3
Improve docstrings for 'isolation_level'
ZanSara Jan 12, 2022
9c3d8fa
Add latest docstring and tutorial changes
github-actions[bot] Jan 12, 2022
115f072
Implement Thomas' feedback
ZanSara Jan 13, 2022
de676e4
Merge branch 'master' into fix_postgresql_composite_index_2
ZanSara Jan 13, 2022
d2a6ad5
Fix bug introduced in merge
ZanSara Jan 13, 2022
b96f83b
Typo
ZanSara Jan 13, 2022
0cb19f4
Another typo from the merge
ZanSara Jan 13, 2022
afe8483
Exclude ES and Memory from the cosine_sanity_check test
ZanSara Jan 14, 2022
3b669b3
Bug in fixture name
ZanSara Jan 14, 2022
d9e14e0
move ensure_ids_are_correct_uuids in conftest and move one test back …
ZanSara Jan 14, 2022
c1913df
Move back tests to faiss&milvus suite, cannot parametrize them properly
ZanSara Jan 14, 2022
70bde27
Remove one failing test for Weaviate
ZanSara Jan 14, 2022
13 changes: 7 additions & 6 deletions docs/_src/api/api/document_store.md
@@ -380,7 +380,7 @@ Write annotation labels into document store.
#### update\_document\_meta

```python
| update_document_meta(id: str, meta: Dict[str, str], headers: Optional[Dict[str, str]] = None)
| update_document_meta(id: str, meta: Dict[str, str], headers: Optional[Dict[str, str]] = None, index: str = None)
```

Update the metadata dictionary of a document by specifying its string id
@@ -952,7 +952,7 @@ class SQLDocumentStore(BaseDocumentStore)
#### \_\_init\_\_

```python
| __init__(url: str = "sqlite://", index: str = "document", label_index: str = "label", duplicate_documents: str = "overwrite", check_same_thread: bool = False)
| __init__(url: str = "sqlite://", index: str = "document", label_index: str = "label", duplicate_documents: str = "overwrite", check_same_thread: bool = False, isolation_level: str = None)
```

An SQL backed DocumentStore. Currently supports SQLite, PostgreSQL and MySQL backends.
@@ -970,6 +970,7 @@ An SQL backed DocumentStore. Currently supports SQLite, PostgreSQL and MySQL backends.
fail: an error is raised if the document ID of the document being added already
exists.
- `check_same_thread`: Set to False to mitigate multithreading issues in older SQLite versions (see https://docs.sqlalchemy.org/en/14/dialects/sqlite.html?highlight=check_same_thread#threading-pooling-behavior)
- `isolation_level`: see SQLAlchemy's `isolation_level` parameter for `create_engine()`

<a name="sql.SQLDocumentStore.get_document_by_id"></a>
#### get\_document\_by\_id
@@ -1094,7 +1095,7 @@ Set vector IDs for all documents as None
#### update\_document\_meta

```python
| update_document_meta(id: str, meta: Dict[str, str])
| update_document_meta(id: str, meta: Dict[str, str], index: str = None)
```

Update the metadata dictionary of a document by specifying its string id
@@ -1202,7 +1203,7 @@ the vector embeddings are indexed in a FAISS Index.
#### \_\_init\_\_

```python
| __init__(sql_url: str = "sqlite:///faiss_document_store.db", vector_dim: int = None, embedding_dim: int = 768, faiss_index_factory_str: str = "Flat", faiss_index: Optional["faiss.swigfaiss.Index"] = None, return_embedding: bool = False, index: str = "document", similarity: str = "dot_product", embedding_field: str = "embedding", progress_bar: bool = True, duplicate_documents: str = 'overwrite', faiss_index_path: Union[str, Path] = None, faiss_config_path: Union[str, Path] = None, **kwargs, ,)
| __init__(sql_url: str = "sqlite:///faiss_document_store.db", vector_dim: int = None, embedding_dim: int = 768, faiss_index_factory_str: str = "Flat", faiss_index: Optional["faiss.swigfaiss.Index"] = None, return_embedding: bool = False, index: str = "document", similarity: str = "dot_product", embedding_field: str = "embedding", progress_bar: bool = True, duplicate_documents: str = 'overwrite', faiss_index_path: Union[str, Path] = None, faiss_config_path: Union[str, Path] = None, isolation_level: str = None, **kwargs, ,)
```

**Arguments**:
@@ -1479,7 +1480,7 @@ Usage:
#### \_\_init\_\_

```python
| __init__(sql_url: str = "sqlite:///", milvus_url: str = "tcp://localhost:19530", connection_pool: str = "SingletonThread", index: str = "document", vector_dim: int = None, embedding_dim: int = 768, index_file_size: int = 1024, similarity: str = "dot_product", index_type: IndexType = IndexType.FLAT, index_param: Optional[Dict[str, Any]] = None, search_param: Optional[Dict[str, Any]] = None, return_embedding: bool = False, embedding_field: str = "embedding", progress_bar: bool = True, duplicate_documents: str = 'overwrite', **kwargs, ,)
| __init__(sql_url: str = "sqlite:///", milvus_url: str = "tcp://localhost:19530", connection_pool: str = "SingletonThread", index: str = "document", vector_dim: int = None, embedding_dim: int = 768, index_file_size: int = 1024, similarity: str = "dot_product", index_type: IndexType = IndexType.FLAT, index_param: Optional[Dict[str, Any]] = None, search_param: Optional[Dict[str, Any]] = None, return_embedding: bool = False, embedding_field: str = "embedding", progress_bar: bool = True, duplicate_documents: str = 'overwrite', isolation_level: str = None, **kwargs, ,)
```

**Arguments**:
@@ -1862,7 +1863,7 @@ None
#### update\_document\_meta

```python
| update_document_meta(id: str, meta: Dict[str, str])
| update_document_meta(id: str, meta: Dict[str, str], index: str = None)
```

Update the metadata dictionary of a document by specifying its string id.
4 changes: 3 additions & 1 deletion haystack/document_stores/elasticsearch.py
@@ -551,10 +551,12 @@ def write_labels(
if labels_to_index:
bulk(self.client, labels_to_index, request_timeout=300, refresh=self.refresh_type, headers=headers)

def update_document_meta(self, id: str, meta: Dict[str, str], headers: Optional[Dict[str, str]] = None):
def update_document_meta(self, id: str, meta: Dict[str, str], headers: Optional[Dict[str, str]] = None, index: str = None):
"""
Update the metadata dictionary of a document by specifying its string id
"""
if not index:
index = self.index
body = {"doc": meta}
self.client.update(index=index, id=id, body=body, refresh=self.refresh_type, headers=headers)
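The defaulting added here (fall back to the store's configured index when the caller passes none) is the same pattern the other stores in this PR use. A tiny standalone sketch, where `DummyStore` is a hypothetical stand-in for a document store, not part of this diff:

```python
class DummyStore:
    # hypothetical stand-in for a document store with a configured default index
    def __init__(self, index="document"):
        self.index = index

    def resolve_index(self, index=None):
        # mirrors the `if not index: index = self.index` defaulting in the diff
        return index if index else self.index


store = DummyStore(index="document")
print(store.resolve_index())          # document
print(store.resolve_index("custom"))  # custom
```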

7 changes: 5 additions & 2 deletions haystack/document_stores/faiss.py
@@ -50,6 +50,7 @@ def __init__(
duplicate_documents: str = 'overwrite',
faiss_index_path: Union[str, Path] = None,
faiss_config_path: Union[str, Path] = None,
isolation_level: str = None,
**kwargs,
):
"""
@@ -115,7 +116,8 @@ def __init__(
index=index,
similarity=similarity,
embedding_field=embedding_field,
progress_bar=progress_bar
progress_bar=progress_bar,
isolation_level=isolation_level
)

if similarity in ("dot_product", "cosine"):
@@ -155,7 +157,8 @@ def __init__(
super().__init__(
url=sql_url,
index=index,
duplicate_documents=duplicate_documents
duplicate_documents=duplicate_documents,
isolation_level=isolation_level
)

self._validate_index_sync()
5 changes: 4 additions & 1 deletion haystack/document_stores/milvus.py
@@ -53,6 +53,7 @@ def __init__(
embedding_field: str = "embedding",
progress_bar: bool = True,
duplicate_documents: str = 'overwrite',
isolation_level: str = None,
**kwargs,
):
"""
@@ -104,6 +105,7 @@ def __init__(
embedding_dim=embedding_dim, index_file_size=index_file_size, similarity=similarity, index_type=index_type, index_param=index_param,
search_param=search_param, duplicate_documents=duplicate_documents,
return_embedding=return_embedding, embedding_field=embedding_field, progress_bar=progress_bar,
isolation_level=isolation_level
)

self.milvus_server = Milvus(uri=milvus_url, pool=connection_pool)
@@ -139,7 +141,8 @@ def __init__(
super().__init__(
url=sql_url,
index=index,
duplicate_documents=duplicate_documents
duplicate_documents=duplicate_documents,
isolation_level=isolation_level,
)

def __del__(self):
5 changes: 4 additions & 1 deletion haystack/document_stores/milvus2x.py
@@ -73,6 +73,7 @@ def __init__(
custom_fields: Optional[List[Any]] = None,
progress_bar: bool = True,
duplicate_documents: str = 'overwrite',
isolation_level: str = None,
):
"""
:param sql_url: SQL connection URL for storing document texts and metadata. It defaults to a local, file based SQLite DB. For large scale
@@ -127,6 +128,7 @@ def __init__(
search_param=search_param, duplicate_documents=duplicate_documents, id_field=id_field,
return_embedding=return_embedding, embedding_field=embedding_field, progress_bar=progress_bar,
custom_fields=custom_fields,
isolation_level=isolation_level
)

logger.warning("Milvus2DocumentStore is in experimental state until Milvus 2.0 is released")
@@ -173,7 +175,8 @@ def __init__(
super().__init__(
url=sql_url,
index=index,
duplicate_documents=duplicate_documents
duplicate_documents=duplicate_documents,
isolation_level=isolation_level,
)

def _create_collection_and_index_if_not_exist(
71 changes: 35 additions & 36 deletions haystack/document_stores/sql.py
@@ -4,7 +4,7 @@
import itertools
import numpy as np
from uuid import uuid4
from sqlalchemy import and_, func, create_engine, Column, String, DateTime, ForeignKey, Boolean, Text, text, JSON
from sqlalchemy import and_, func, create_engine, Column, String, DateTime, ForeignKey, Boolean, Text, text, JSON, ForeignKeyConstraint
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, sessionmaker
from sqlalchemy.sql import case, null
@@ -33,9 +33,6 @@ class DocumentORM(ORMBase):
# primary key in combination with id to allow the same doc in different indices
index = Column(String(100), nullable=False, primary_key=True)
vector_id = Column(String(100), unique=True, nullable=True)

# labels = relationship("LabelORM", back_populates="document")

# speeds up queries for get_documents_by_vector_ids() by having a single query that returns joined metadata
meta = relationship("MetaDocumentORM", back_populates="documents", lazy="joined")

@@ -45,36 +42,18 @@ class MetaDocumentORM(ORMBase):

name = Column(String(100), index=True)
value = Column(String(1000), index=True)
document_id = Column(
String(100),
ForeignKey("document.id", ondelete="CASCADE", onupdate="CASCADE"),
nullable=False,
index=True
)

documents = relationship("DocumentORM", back_populates="meta")


class MetaLabelORM(ORMBase):
__tablename__ = "meta_label"

name = Column(String(100), index=True)
value = Column(String(1000), index=True)
label_id = Column(
String(100),
ForeignKey("label.id", ondelete="CASCADE", onupdate="CASCADE"),
nullable=False,
index=True
)

labels = relationship("LabelORM", back_populates="meta")
document_id = Column(String(100), nullable=False, index=True)
document_index = Column(String(100), nullable=False, index=True)
__table_args__ = (ForeignKeyConstraint([document_id, document_index],
[DocumentORM.id, DocumentORM.index],
ondelete="CASCADE", onupdate="CASCADE"), {}) #type: ignore
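The composite foreign key above, (document_id, document_index) referencing the composite primary key (id, index) on document, is what lets the same document id exist in different indices without breaking referential integrity on PostgreSQL. A standalone sketch of the same idea in plain SQL (stdlib sqlite3 stands in for PostgreSQL here; table and column names mirror the ORM classes), showing the cascade delete:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # sqlite needs FK enforcement enabled
conn.execute("""
    CREATE TABLE document (
        id TEXT NOT NULL,
        "index" TEXT NOT NULL,
        PRIMARY KEY (id, "index")
    )""")
conn.execute("""
    CREATE TABLE meta_document (
        name TEXT,
        value TEXT,
        document_id TEXT NOT NULL,
        document_index TEXT NOT NULL,
        FOREIGN KEY (document_id, document_index)
            REFERENCES document (id, "index")
            ON DELETE CASCADE ON UPDATE CASCADE
    )""")
conn.execute("INSERT INTO document VALUES ('doc1', 'document')")
conn.execute("INSERT INTO meta_document VALUES ('author', 'jane', 'doc1', 'document')")

# deleting the parent row cascades to its metadata
conn.execute("DELETE FROM document WHERE id = 'doc1'")
remaining = conn.execute("SELECT COUNT(*) FROM meta_document").fetchone()[0]
print(remaining)  # 0 (the meta row was cascade-deleted)
```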


class LabelORM(ORMBase):
__tablename__ = "label"

# document_id = Column(String(100), ForeignKey("document.id", ondelete="CASCADE", onupdate="CASCADE"), nullable=False)

index = Column(String(100), nullable=False, primary_key=True)
query = Column(Text, nullable=False)
answer = Column(JSON, nullable=True)
@@ -86,7 +65,21 @@ class LabelORM(ORMBase):
pipeline_id = Column(String(500), nullable=True)

meta = relationship("MetaLabelORM", back_populates="labels", lazy="joined")
# document = relationship("DocumentORM", back_populates="labels")


class MetaLabelORM(ORMBase):
__tablename__ = "meta_label"

name = Column(String(100), index=True)
value = Column(String(1000), index=True)
labels = relationship("LabelORM", back_populates="meta")

label_id = Column(String(100), nullable=False, index=True)
label_index = Column(String(100), nullable=False, index=True)
__table_args__ = (ForeignKeyConstraint([label_id, label_index],
[LabelORM.id, LabelORM.index],
ondelete="CASCADE", onupdate="CASCADE"), {}) #type: ignore



class SQLDocumentStore(BaseDocumentStore):
@@ -96,7 +89,8 @@ def __init__(
index: str = "document",
label_index: str = "label",
duplicate_documents: str = "overwrite",
check_same_thread: bool = False
check_same_thread: bool = False,
isolation_level: str = None
):
"""
An SQL backed DocumentStore. Currently supports SQLite, PostgreSQL and MySQL backends.
@@ -112,18 +106,21 @@
fail: an error is raised if the document ID of the document being added already
exists.
:param check_same_thread: Set to False to mitigate multithreading issues in older SQLite versions (see https://docs.sqlalchemy.org/en/14/dialects/sqlite.html?highlight=check_same_thread#threading-pooling-behavior)
:param isolation_level: see SQLAlchemy's `isolation_level` parameter for `create_engine()`
"""

# save init parameters to enable export of component config as YAML
self.set_config(
url=url, index=index, label_index=label_index, duplicate_documents=duplicate_documents, check_same_thread=check_same_thread, isolation_level=isolation_level
)

create_engine_params = {}
if isolation_level:
create_engine_params["isolation_level"] = isolation_level
if "sqlite" in url:
engine = create_engine(url, connect_args={'check_same_thread': check_same_thread})
engine = create_engine(url, connect_args={'check_same_thread': check_same_thread}, **create_engine_params)
else:
engine = create_engine(url)
ORMBase.metadata.create_all(engine)
engine = create_engine(url, **create_engine_params)
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
self.session = Session()
self.index: str = index
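The `create_engine_params` dict above forwards `isolation_level` to SQLAlchemy's `create_engine()` only when it was explicitly set, so each dialect otherwise keeps its own default. The same pattern in isolation (`build_engine_params` is a hypothetical helper name, not part of this diff):

```python
def build_engine_params(isolation_level=None):
    # Forward the kwarg only when set, so create_engine() falls back
    # to the dialect's default isolation level otherwise.
    params = {}
    if isolation_level:
        params["isolation_level"] = isolation_level
    return params


print(build_engine_params())              # {}
print(build_engine_params("AUTOCOMMIT"))  # {'isolation_level': 'AUTOCOMMIT'}
```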
@@ -461,12 +458,14 @@ def reset_vector_ids(self, index: Optional[str] = None):
self.session.query(DocumentORM).filter_by(index=index).update({DocumentORM.vector_id: null()})
self.session.commit()

def update_document_meta(self, id: str, meta: Dict[str, str]):
def update_document_meta(self, id: str, meta: Dict[str, str], index: str = None):
"""
Update the metadata dictionary of a document by specifying its string id
"""
self.session.query(MetaDocumentORM).filter_by(document_id=id).delete()
meta_orms = [MetaDocumentORM(name=key, value=value, document_id=id) for key, value in meta.items()]
if not index:
index = self.index
self.session.query(MetaDocumentORM).filter_by(document_id=id, document_index=index).delete()
meta_orms = [MetaDocumentORM(name=key, value=value, document_id=id, document_index=index) for key, value in meta.items()]
for m in meta_orms:
self.session.add(m)
self.session.commit()
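`update_document_meta()` is delete-then-reinsert: every existing meta row for the (document_id, document_index) pair is removed, then one row per key in `meta` is written. A minimal in-memory sketch of those semantics, using plain dicts in place of MetaDocumentORM rows:

```python
def update_document_meta(rows, doc_id, meta, index="document"):
    # drop existing meta rows for this (document_id, document_index) pair...
    kept = [r for r in rows
            if not (r["document_id"] == doc_id and r["document_index"] == index)]
    # ...then re-insert one row per metadata key
    kept += [{"name": k, "value": v, "document_id": doc_id, "document_index": index}
             for k, v in meta.items()]
    return kept


rows = [{"name": "author", "value": "old",
         "document_id": "d1", "document_index": "document"}]
rows = update_document_meta(rows, "d1", {"author": "new", "year": "2022"})
print(sorted((r["name"], r["value"]) for r in rows))
# [('author', 'new'), ('year', '2022')]
```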
4 changes: 3 additions & 1 deletion haystack/document_stores/weaviate.py
@@ -483,10 +483,12 @@ def write_documents(
progress_bar.update(batch_size)
progress_bar.close()

def update_document_meta(self, id: str, meta: Dict[str, str]):
def update_document_meta(self, id: str, meta: Dict[str, str], index: str = None):
"""
Update the metadata dictionary of a document by specifying its string id.
"""
if not index:
index = self.index
self.weaviate_client.data_object.update(meta, class_name=index, uuid=id)

def get_embedding_count(self, filters: Optional[Dict[str, List[str]]] = None, index: Optional[str] = None) -> int: