Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bug: replace decorator with counter attribute for pipeline event #3462

Merged
merged 4 commits into from
Oct 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 16 additions & 9 deletions haystack/pipelines/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
)
from haystack.pipelines.utils import generate_code, print_eval_report
from haystack.utils import DeepsetCloud, calculate_context_similarity
from haystack.utils.reflection import pipeline_invocation_counter
from haystack.schema import Answer, EvaluationResult, MultiLabel, Document, Span
from haystack.errors import HaystackError, PipelineError, PipelineConfigError
from haystack.nodes.base import BaseComponent, RootNode
Expand Down Expand Up @@ -78,6 +77,7 @@ def __init__(self):
self.event_time_interval = datetime.timedelta(hours=24)
self.event_run_total_threshold = 100
self.last_window_run_total = 0
self.run_total = 0
self.sent_event_in_window = False

@property
Expand Down Expand Up @@ -450,7 +450,6 @@ def set_node(self, name: str, component):
def _run_node(self, node_id: str, node_input: Dict[str, Any]) -> Tuple[Dict, str]:
return self.graph.nodes[node_id]["component"]._dispatch_run(**node_input)

@pipeline_invocation_counter
def run( # type: ignore
self,
query: Optional[str] = None,
Expand Down Expand Up @@ -571,10 +570,11 @@ def run( # type: ignore
i = 0
else:
i += 1 # attempt executing next node in the queue as current `node_id` has unprocessed predecessors

self.run_total += 1
self.send_pipeline_event_if_needed(is_indexing=file_paths is not None)
return node_output

@pipeline_invocation_counter
def run_batch( # type: ignore
self,
queries: List[str] = None,
Expand Down Expand Up @@ -722,6 +722,15 @@ def run_batch( # type: ignore
i = 0
else:
i += 1 # attempt executing next node in the queue as current `node_id` has unprocessed predecessors

# increase counter of how many queries/documents have been processed by the pipeline
if queries:
self.run_total += len(queries)
elif documents:
self.run_total += len(documents)
else:
self.run_total += 1

self.send_pipeline_event_if_needed()
return node_output

Expand Down Expand Up @@ -2239,20 +2248,19 @@ def uptime(self) -> timedelta:

def send_pipeline_event(self, is_indexing: bool = False):
fingerprint = sha1(json.dumps(self.get_config(), sort_keys=True).encode()).hexdigest()
run_total = self.run.counter + self.run_batch.counter
send_custom_event(
"pipeline",
payload={
"fingerprint": fingerprint,
"type": "Indexing" if is_indexing else self.get_type(),
"uptime": int(self.uptime().total_seconds()),
"run_total": run_total,
"run_total_window": run_total - self.last_window_run_total,
"run_total": self.run_total,
"run_total_window": self.run_total - self.last_window_run_total,
},
)
now = datetime.datetime.now(datetime.timezone.utc)
self.time_of_last_sent_event = datetime.datetime(now.year, now.month, now.day, tzinfo=datetime.timezone.utc)
self.last_window_run_total = run_total
self.last_window_run_total = self.run_total

def send_pipeline_event_if_needed(self, is_indexing: bool = False):
should_send_event = self.has_event_time_interval_exceeded() or self.has_event_run_total_threshold_exceeded()
Expand All @@ -2267,8 +2275,7 @@ def has_event_time_interval_exceeded(self):
return now - self.time_of_last_sent_event > self.event_time_interval

def has_event_run_total_threshold_exceeded(self):
run_total = self.run.counter + self.run_batch.counter
return run_total - self.last_window_run_total > self.event_run_total_threshold
return self.run_total - self.last_window_run_total > self.event_run_total_threshold


class _HaystackBeirRetrieverAdapter:
Expand Down
23 changes: 0 additions & 23 deletions haystack/utils/reflection.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import inspect
import functools
import logging
import time
from random import random
Expand All @@ -20,28 +19,6 @@ def args_to_kwargs(args: Tuple, func: Callable) -> Dict[str, Any]:
return args_as_kwargs


def pipeline_invocation_counter(func):
    """Decorator that tracks how many queries/documents the wrapped callable has processed.

    The running total is exposed on the wrapper as ``wrapper_invocation_counter.counter``
    and is incremented *before* the wrapped function runs.

    :param func: The callable to wrap (typically a pipeline ``run``/``run_batch``).
    :return: The wrapped callable, carrying a ``counter`` attribute initialized to 0.
    """

    @functools.wraps(func)
    def wrapper_invocation_counter(*args, **kwargs):
        # Default: a single query/document per invocation.
        this_invocation_count = 1
        # Named arguments: count the batch size if a non-empty list was passed.
        if "queries" in kwargs:
            this_invocation_count = len(kwargs["queries"]) if kwargs["queries"] else 1
        elif "documents" in kwargs:
            this_invocation_count = len(kwargs["documents"]) if kwargs["documents"] else 1
        # Positional arguments: infer the count from the first positional parameter.
        # BUGFIX: guard `args` being empty — the original `args[0]` raised IndexError
        # on keyword-only calls that used neither `queries` nor `documents`.
        # NOTE(review): when decorating a bound method, args[0] is `self`, so this
        # branch only ever counts batches for plain functions — confirm intent.
        elif args and args[0] and isinstance(args[0], list):
            this_invocation_count = len(args[0])

        wrapper_invocation_counter.counter += this_invocation_count
        return func(*args, **kwargs)

    wrapper_invocation_counter.counter = 0
    return wrapper_invocation_counter


def retry_with_exponential_backoff(
backoff_in_seconds: float = 1, max_retries: int = 10, errors: tuple = (OpenAIRateLimitError,)
):
Expand Down
20 changes: 19 additions & 1 deletion test/pipelines/test_pipeline_debug_and_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import json
import pytest

from haystack.pipelines import Pipeline, RootNode
from haystack.pipelines import Pipeline, RootNode, DocumentSearchPipeline
from haystack.nodes import FARMReader, BM25Retriever, JoinDocuments

from ..conftest import SAMPLES_PATH, MockRetriever as BaseMockRetriever, MockReader
Expand Down Expand Up @@ -208,6 +208,24 @@ def test_unexpected_node_arg():
assert "Invalid parameter 'invalid' for the node 'Retriever'" in str(exc.value)


@pytest.mark.parametrize("retriever", ["embedding"], indirect=True)
@pytest.mark.parametrize("document_store", ["memory"], indirect=True)
def test_pipeline_run_counters(retriever, document_store):
    """run_total grows per run; last_window_run_total resets once the event threshold is crossed."""
    docs = [{"content": "Sample text for document-1", "meta": {"source": "wiki1"}}]
    document_store.write_documents(docs)
    document_store.update_embeddings(retriever)

    search_pipeline = DocumentSearchPipeline(retriever=retriever)

    # A single run bumps the counter by exactly one.
    search_pipeline.run(query="Irrelevant", params={"top_k": 1})
    assert search_pipeline.pipeline.run_total == 1

    # Push past the event threshold so a telemetry window closes.
    extra_runs = search_pipeline.pipeline.event_run_total_threshold + 1
    for _ in range(extra_runs):
        search_pipeline.run(query="Irrelevant", params={"top_k": 1})

    assert search_pipeline.pipeline.run_total == 102
    assert search_pipeline.pipeline.last_window_run_total == 101


def test_debug_info_propagation():
class A(RootNode):
def run(self):
Expand Down