Skip to content

Commit

Permalink
Move latest_sample from FallbackFormulaMetricFetcher to MetricFetcher
Browse files Browse the repository at this point in the history
Because that's where it is used.

Signed-off-by: Elzbieta Kotulska <elzbieta.kotulska@frequenz.com>
  • Loading branch information
ela-kotulska-frequenz committed Aug 30, 2024
1 parent 1731ad4 commit c70ea97
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ def __init__(self, formula_generator: FormulaGenerator[QuantityT]):
self._formula_generator: FormulaGenerator[QuantityT] = formula_generator
self._formula_engine: FormulaEngine[QuantityT] | None = None
self._receiver: Receiver[Sample[QuantityT]] | None = None
self._latest_sample: Sample[QuantityT] | None = None

@property
def name(self) -> str:
Expand All @@ -46,15 +45,6 @@ def is_running(self) -> bool:
"""Check whether the formula engine is running."""
return self._receiver is not None

@property
def latest_sample(self) -> Sample[QuantityT] | None:
"""Get the latest fetched sample.
Returns:
The latest fetched sample, or `None` if no sample has been fetched yet.
"""
return self._latest_sample

def start(self) -> None:
"""Initialize the formula engine and start fetching samples."""
engine = self._formula_generator.generate()
Expand Down Expand Up @@ -87,5 +77,4 @@ def consume(self) -> Sample[QuantityT]:
self._receiver is not None
), f"Fallback metric fetcher: {self.name} was not started"

self._latest_sample = self._receiver.consume()
return self._latest_sample
return self._receiver.consume()
25 changes: 7 additions & 18 deletions src/frequenz/sdk/timeseries/formula_engine/_formula_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,16 +365,6 @@ def name(self) -> str:
def is_running(self) -> bool:
"""Check whether the metric fetcher is running."""

@property
@abstractmethod
def latest_sample(self) -> Sample[QuantityT] | None:
"""Get the latest fetched value.
Returns:
The latest fetched value. None if no value has been fetched
of fetcher is not running.
"""

@abstractmethod
def start(self) -> None:
"""Initialize the metric fetcher and start fetching samples."""
Expand Down Expand Up @@ -406,6 +396,7 @@ def __init__(
self._next_value: Sample[QuantityT] | None = None
self._nones_are_zeros = nones_are_zeros
self._fallback: FallbackMetricFetcher[QuantityT] | None = fallback
self._latest_fallback_sample: Sample[QuantityT] | None = None

@property
def stream(self) -> Receiver[Sample[QuantityT]]:
Expand Down Expand Up @@ -444,9 +435,9 @@ async def _synchronize_and_fetch_fallback(
fetcher or if the fallback fetcher fails to fetch the next value.
"""
# fallback_fetcher was not used, yet. We need to fetch first value.
if fallback_fetcher.latest_sample is None:
if self._latest_fallback_sample is None:
try:
fallback = await fallback_fetcher.receive()
self._latest_fallback_sample = await fallback_fetcher.receive()
except ReceiverError[Any] as err:
_logger.error(
"Fallback metric fetcher %s failed to fetch next value: %s."
Expand All @@ -455,16 +446,14 @@ async def _synchronize_and_fetch_fallback(
err,
)
return None
else:
fallback = fallback_fetcher.latest_sample

if primary_fetcher_sample.timestamp < fallback.timestamp:
if primary_fetcher_sample.timestamp < self._latest_fallback_sample.timestamp:
return None

# Synchronize the fallback fetcher with primary one
while primary_fetcher_sample.timestamp > fallback.timestamp:
while primary_fetcher_sample.timestamp > self._latest_fallback_sample.timestamp:
try:
fallback = await fallback_fetcher.receive()
self._latest_fallback_sample = await fallback_fetcher.receive()
except ReceiverError[Any] as err:
_logger.error(
"Fallback metric fetcher %s failed to fetch next value: %s."
Expand All @@ -474,7 +463,7 @@ async def _synchronize_and_fetch_fallback(
)
return None

return fallback
return self._latest_fallback_sample

async def fetch_next_with_fallback(
self, fallback_fetcher: FallbackMetricFetcher[QuantityT]
Expand Down

0 comments on commit c70ea97

Please sign in to comment.