Join node should allow reciprocal rank fusion as additional merging method #2133

Merged · 14 commits · Feb 10, 2022
30 changes: 30 additions & 0 deletions docs/_src/api/api/document_store.md
@@ -1150,6 +1150,36 @@ def get_documents_by_id(ids: List[str], index: Optional[str] = None) -> List[Doc

 Fetch documents by specifying a list of text id strings.

+<a id="memory.InMemoryDocumentStore.get_scores_torch"></a>
+
+#### get\_scores\_torch
+
+```python
+def get_scores_torch(query_emb: np.ndarray, document_to_search: List[Document]) -> List[float]
+```
+
+Calculate similarity scores between query embedding and a list of documents using torch.
+
+**Arguments**:
+
+- `query_emb`: Embedding of the query (e.g. gathered from DPR)
+- `document_to_search`: List of documents to compare `query_emb` against.
+
+<a id="memory.InMemoryDocumentStore.get_scores_numpy"></a>
+
+#### get\_scores\_numpy
+
+```python
+def get_scores_numpy(query_emb: np.ndarray, document_to_search: List[Document]) -> List[float]
+```
+
+Calculate similarity scores between query embedding and a list of documents using numpy.
+
+**Arguments**:
+
+- `query_emb`: Embedding of the query (e.g. gathered from DPR)
+- `document_to_search`: List of documents to compare `query_emb` against.
+
 <a id="memory.InMemoryDocumentStore.query_by_embedding"></a>

 #### query\_by\_embedding
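The two helpers documented above can also be called directly. A minimal sketch, assuming documents that already carry embeddings; the 3-dimensional vectors and the store settings are purely illustrative (real DPR embeddings have 768 dimensions):

```python
import numpy as np

from haystack.document_stores import InMemoryDocumentStore
from haystack.schema import Document

store = InMemoryDocumentStore(embedding_dim=3, similarity="dot_product")
docs = [
    Document(content="Carla lives in Berlin", embedding=np.array([0.9, 0.1, 0.0])),
    Document(content="Paul lives in New York", embedding=np.array([0.1, 0.9, 0.0])),
]
query_emb = np.array([1.0, 0.0, 0.0])

# Returns one similarity score per document, in the order they were passed in.
scores = store.get_scores_numpy(query_emb, docs)
print(scores)  # the Berlin document should score highest
```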
1 change: 1 addition & 0 deletions docs/_src/api/api/other.md
@@ -32,4 +32,5 @@ The node allows multiple join modes:
 * concatenate: combine the documents from multiple nodes. Any duplicate documents are discarded.
 * merge: merge scores of documents from multiple nodes. Optionally, each input score can be given a different
   `weight` & a `top_k` limit can be set. This mode can also be used for "reranking" retrieved documents.
+* reciprocal_rank_fusion: combines the documents based on their rank in multiple nodes.
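To make the difference concrete: `merge` combines the raw scores, while `reciprocal_rank_fusion` ignores them and uses only each document's position. A self-contained sketch with toy ids and scores (k = 61 mirrors the constant used by the implementation further down):

```python
from collections import defaultdict

# Two toy result lists, ordered best-first: (document id, retriever score).
bm25 = [("d1", 0.9), ("d2", 0.7), ("d3", 0.5)]
dense = [("d2", 0.8), ("d1", 0.6), ("d4", 0.4)]

def comb_sum(results, weights):
    """`merge` mode: weighted sum of the raw scores."""
    scores = defaultdict(float)
    for result, weight in zip(results, weights):
        for doc_id, score in result:
            scores[doc_id] += score * weight
    return dict(scores)

def rrf(results, k=61):
    """`reciprocal_rank_fusion` mode: only the rank matters, not the raw score."""
    scores = defaultdict(float)
    for result in results:
        for rank, (doc_id, _) in enumerate(result):
            scores[doc_id] += 1 / (k + rank)
    return dict(scores)

print(comb_sum([bm25, dense], weights=[0.5, 0.5]))  # d1 and d2 both end up at 0.75
print(rrf([bm25, dense]))  # d1 and d2 tie at 1/61 + 1/62
```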

89 changes: 63 additions & 26 deletions haystack/nodes/other/join_docs.py
@@ -1,4 +1,5 @@
 from copy import deepcopy
+from collections import defaultdict

 from typing import Optional, List

 from haystack.nodes.base import BaseComponent
@@ -12,6 +13,7 @@ class JoinDocuments(BaseComponent):
     * concatenate: combine the documents from multiple nodes. Any duplicate documents are discarded.
     * merge: merge scores of documents from multiple nodes. Optionally, each input score can be given a different
       `weight` & a `top_k` limit can be set. This mode can also be used for "reranking" retrieved documents.
+    * reciprocal_rank_fusion: combines the documents based on their rank in multiple nodes.
     """

     outgoing_edges = 1
@@ -20,14 +22,18 @@ def __init__(
         self, join_mode: str = "concatenate", weights: Optional[List[float]] = None, top_k_join: Optional[int] = None
     ):
         """
-        :param join_mode: `concatenate` to combine documents from multiple retrievers or `merge` to aggregate scores of
-                          individual documents.
+        :param join_mode: `concatenate` to combine documents from multiple retrievers, `merge` to aggregate scores of
+                          individual documents, or `reciprocal_rank_fusion` to apply rank-based scoring.
         :param weights: A node-wise list (length of list must be equal to the number of input nodes) of weights for
                         adjusting document scores when using the `merge` join_mode. By default, equal weight is given
                         to each retriever score. This param is not compatible with the `concatenate` join_mode.
         :param top_k_join: Limit documents to top_k based on the resulting scores of the join.
         """
-        assert join_mode in ["concatenate", "merge"], f"JoinDocuments node does not support '{join_mode}' join_mode."
+        assert join_mode in [
+            "concatenate",
+            "merge",
+            "reciprocal_rank_fusion",
+        ], f"JoinDocuments node does not support '{join_mode}' join_mode."

         assert not (
             weights is not None and join_mode == "concatenate"
@@ -41,33 +47,64 @@ def __init__(
         self.top_k_join = top_k_join

     def run(self, inputs: List[dict], top_k_join: Optional[int] = None):  # type: ignore
+        results = [inp["documents"] for inp in inputs]
+        document_map = {doc.id: doc for result in results for doc in result}
+
         if self.join_mode == "concatenate":
-            document_map = {}
-            for input_from_node in inputs:
-                for doc in input_from_node["documents"]:
-                    document_map[doc.id] = doc
+            scores_map = self._concatenate_results(results)
         elif self.join_mode == "merge":
-            document_map = {}
-            if self.weights:
-                weights = self.weights
-            else:
-                weights = [1 / len(inputs)] * len(inputs)
-            for input_from_node, weight in zip(inputs, weights):
-                for doc in input_from_node["documents"]:
-                    if document_map.get(doc.id):  # document already exists; update score
-                        document_map[doc.id].score += doc.score * weight
-                    else:  # add the document in map
-                        document_map[doc.id] = deepcopy(doc)
-                        document_map[doc.id].score *= weight
+            scores_map = self._calculate_comb_sum(results)
+        elif self.join_mode == "reciprocal_rank_fusion":
+            scores_map = self._calculate_rrf(results)
         else:
-            raise Exception(f"Invalid join_mode: {self.join_mode}")
+            raise ValueError(f"Invalid join_mode: {self.join_mode}")

-        documents = sorted(document_map.values(), key=lambda d: d.score, reverse=True)
+        sorted_docs = sorted(scores_map.items(), key=lambda d: d[1], reverse=True)

-        if top_k_join is None:
+        if not top_k_join:
             top_k_join = self.top_k_join
+        if not top_k_join:
+            top_k_join = len(sorted_docs)
+
+        docs = []
+        for (id, score) in sorted_docs[:top_k_join]:
+            doc = document_map[id]
+            doc.score = score
+            docs.append(doc)
+
+        output = {"documents": docs, "labels": inputs[0].get("labels", None)}

-        if top_k_join:
-            documents = documents[:top_k_join]
-        output = {"documents": documents, "labels": inputs[0].get("labels", None)}
         return output, "output_1"

+    def _concatenate_results(self, results):
+        """
+        Concatenates multiple document result lists.
+        """
+        return {doc.id: doc.score for result in results for doc in result}
+
+    def _calculate_comb_sum(self, results):
+        """
+        Calculates a combination sum by multiplying each score by its weight.
+        """
+        scores_map = defaultdict(int)
+        weights = self.weights if self.weights else [1 / len(results)] * len(results)
+
+        for result, weight in zip(results, weights):
+            for doc in result:
+                scores_map[doc.id] += doc.score * weight
+
+        return scores_map
+
+    def _calculate_rrf(self, results):
+        """
+        Calculates the reciprocal rank fusion. The constant K is set to 61 (60 was suggested by the original paper,
+        plus 1 as python lists are 0-based and the paper used 1-based ranking).
+        """
+        K = 61
+
+        scores_map = defaultdict(int)
+        for result in results:
+            for rank, doc in enumerate(result):
+                scores_map[doc.id] += 1 / (K + rank)
+
+        return scores_map
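From a pipeline's perspective, the new mode is selected like the existing ones. A minimal usage sketch, assuming two retrievers (`es_retriever` and `dpr_retriever`) configured elsewhere, along the lines of the tests below:

```python
from haystack.nodes import JoinDocuments
from haystack.pipelines import Pipeline

# es_retriever and dpr_retriever are placeholders for retrievers set up elsewhere,
# e.g. an ElasticsearchRetriever and a DensePassageRetriever as in the tests below.
join_node = JoinDocuments(join_mode="reciprocal_rank_fusion", top_k_join=5)

p = Pipeline()
p.add_node(component=es_retriever, name="R1", inputs=["Query"])
p.add_node(component=dpr_retriever, name="R2", inputs=["Query"])
p.add_node(component=join_node, name="Join", inputs=["R1", "R2"])
results = p.run(query="Where does Carla live?")

# Each returned document's score is now the RRF sum of 1 / (61 + rank) over both retrievers.
for doc in results["documents"]:
    print(doc.score, doc.content)
```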
103 changes: 96 additions & 7 deletions test/test_standard_pipelines.py
@@ -1,4 +1,5 @@
 from pathlib import Path
+from collections import defaultdict

 import os
 import math
@@ -136,8 +137,7 @@ def test_most_similar_documents_pipeline(retriever, document_store):

 @pytest.mark.elasticsearch
 @pytest.mark.parametrize("document_store_dot_product_with_docs", ["elasticsearch"], indirect=True)
-@pytest.mark.parametrize("reader", ["farm"], indirect=True)
-def test_join_document_pipeline(document_store_dot_product_with_docs, reader):
+def test_join_merge_no_weights(document_store_dot_product_with_docs):
     es = ElasticsearchRetriever(document_store=document_store_dot_product_with_docs)
     dpr = DensePassageRetriever(
         document_store=document_store_dot_product_with_docs,
@@ -149,7 +149,6 @@ def test_join_document_pipeline(document_store_dot_product_with_docs, reader):

     query = "Where does Carla live?"

-    # test merge without weights
     join_node = JoinDocuments(join_mode="merge")
     p = Pipeline()
     p.add_node(component=es, name="R1", inputs=["Query"])
@@ -158,7 +157,21 @@ def test_join_document_pipeline(document_store_dot_product_with_docs, reader):
     results = p.run(query=query)
     assert len(results["documents"]) == 5

-    # test merge with weights
+
+@pytest.mark.elasticsearch
+@pytest.mark.parametrize("document_store_dot_product_with_docs", ["elasticsearch"], indirect=True)
+def test_join_merge_with_weights(document_store_dot_product_with_docs):
+    es = ElasticsearchRetriever(document_store=document_store_dot_product_with_docs)
+    dpr = DensePassageRetriever(
+        document_store=document_store_dot_product_with_docs,
+        query_embedding_model="facebook/dpr-question_encoder-single-nq-base",
+        passage_embedding_model="facebook/dpr-ctx_encoder-single-nq-base",
+        use_gpu=False,
+    )
+    document_store_dot_product_with_docs.update_embeddings(dpr)
+
+    query = "Where does Carla live?"
+
     join_node = JoinDocuments(join_mode="merge", weights=[1000, 1], top_k_join=2)
     p = Pipeline()
     p.add_node(component=es, name="R1", inputs=["Query"])
@@ -168,7 +181,21 @@ def test_join_document_pipeline(document_store_dot_product_with_docs, reader):
     assert math.isclose(results["documents"][0].score, 0.5481393431183286, rel_tol=0.0001)
     assert len(results["documents"]) == 2

-    # test concatenate
+
+@pytest.mark.elasticsearch
+@pytest.mark.parametrize("document_store_dot_product_with_docs", ["elasticsearch"], indirect=True)
+def test_join_concatenate(document_store_dot_product_with_docs):
+    es = ElasticsearchRetriever(document_store=document_store_dot_product_with_docs)
+    dpr = DensePassageRetriever(
+        document_store=document_store_dot_product_with_docs,
+        query_embedding_model="facebook/dpr-question_encoder-single-nq-base",
+        passage_embedding_model="facebook/dpr-ctx_encoder-single-nq-base",
+        use_gpu=False,
+    )
+    document_store_dot_product_with_docs.update_embeddings(dpr)
+
+    query = "Where does Carla live?"
+
     join_node = JoinDocuments(join_mode="concatenate")
     p = Pipeline()
     p.add_node(component=es, name="R1", inputs=["Query"])
@@ -177,7 +204,21 @@ def test_join_document_pipeline(document_store_dot_product_with_docs, reader):
     results = p.run(query=query)
     assert len(results["documents"]) == 5

-    # test concatenate with top_k_join parameter
+
+@pytest.mark.elasticsearch
+@pytest.mark.parametrize("document_store_dot_product_with_docs", ["elasticsearch"], indirect=True)
+def test_join_concatenate_with_topk(document_store_dot_product_with_docs):
+    es = ElasticsearchRetriever(document_store=document_store_dot_product_with_docs)
+    dpr = DensePassageRetriever(
+        document_store=document_store_dot_product_with_docs,
+        query_embedding_model="facebook/dpr-question_encoder-single-nq-base",
+        passage_embedding_model="facebook/dpr-ctx_encoder-single-nq-base",
+        use_gpu=False,
+    )
+    document_store_dot_product_with_docs.update_embeddings(dpr)
+
+    query = "Where does Carla live?"
+
     join_node = JoinDocuments(join_mode="concatenate")
     p = Pipeline()
     p.add_node(component=es, name="R1", inputs=["Query"])
@@ -188,7 +229,22 @@ def test_join_document_pipeline(document_store_dot_product_with_docs, reader):
     assert len(one_result["documents"]) == 1
     assert len(two_results["documents"]) == 2

-    # test join_node with reader
+
+@pytest.mark.elasticsearch
+@pytest.mark.parametrize("document_store_dot_product_with_docs", ["elasticsearch"], indirect=True)
+@pytest.mark.parametrize("reader", ["farm"], indirect=True)
+def test_join_with_reader(document_store_dot_product_with_docs, reader):
+    es = ElasticsearchRetriever(document_store=document_store_dot_product_with_docs)
+    dpr = DensePassageRetriever(
+        document_store=document_store_dot_product_with_docs,
+        query_embedding_model="facebook/dpr-question_encoder-single-nq-base",
+        passage_embedding_model="facebook/dpr-ctx_encoder-single-nq-base",
+        use_gpu=False,
+    )
+    document_store_dot_product_with_docs.update_embeddings(dpr)
+
+    query = "Where does Carla live?"
+
     join_node = JoinDocuments()
     p = Pipeline()
     p.add_node(component=es, name="R1", inputs=["Query"])
@@ -200,6 +256,39 @@ def test_join_document_pipeline(document_store_dot_product_with_docs, reader):
     assert results["answers"][0].answer == "Berlin" or results["answers"][1].answer == "Berlin"


+@pytest.mark.elasticsearch
+@pytest.mark.parametrize("document_store_dot_product_with_docs", ["elasticsearch"], indirect=True)
+def test_join_with_rrf(document_store_dot_product_with_docs):
+    es = ElasticsearchRetriever(document_store=document_store_dot_product_with_docs)
+    dpr = DensePassageRetriever(
+        document_store=document_store_dot_product_with_docs,
+        query_embedding_model="facebook/dpr-question_encoder-single-nq-base",
+        passage_embedding_model="facebook/dpr-ctx_encoder-single-nq-base",
+        use_gpu=False,
+    )
+    document_store_dot_product_with_docs.update_embeddings(dpr)
+
+    query = "Where does Carla live?"
+
+    join_node = JoinDocuments(join_mode="reciprocal_rank_fusion")
+    p = Pipeline()
+    p.add_node(component=es, name="R1", inputs=["Query"])
+    p.add_node(component=dpr, name="R2", inputs=["Query"])
+    p.add_node(component=join_node, name="Join", inputs=["R1", "R2"])
+    results = p.run(query=query)
+
+    # list of precalculated expected results
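+    # e.g. the top score belongs to a document ranked first by both retrievers:
+    # 1 / (61 + 0) + 1 / (61 + 0) = 0.03278688...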
+    expected_scores = [
+        0.03278688524590164,
+        0.03200204813108039,
+        0.03200204813108039,
+        0.031009615384615385,
+        0.031009615384615385,
+    ]
+
+    assert all([doc.score == expected_scores[idx] for idx, doc in enumerate(results["documents"])])


 def test_query_keyword_statement_classifier():
     class KeywordOutput(RootNode):
         outgoing_edges = 2