diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 5f93b7d7b..4fa7acd3d 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -11,6 +11,10 @@ ## New Features - `Resampler`: The resampler can now be configured to have the resampling window closed to the right (default) or left, and to also set the resampler timestamp to the right (default) or left end of the window being resampled. You can configure setting the new options `closed` and `label` in the `ResamplerConfig`. +- `EventResampler`: A new event-driven resampler for cascaded resampling stages. Unlike the timer-based `Resampler`, `EventResampler` emits windows when sample timestamps exceed window boundaries, eliminating data loss at window boundaries in cascaded scenarios. See the class documentation for usage guidelines. +- `StreamingHelper`: Added callback mechanism via `register_sample_callback()` to notify external consumers when samples arrive, enabling event-driven resampling without polling internal buffers. +- `Resampler._emit_window()`: Extracted window emission logic into a dedicated method for code sharing between timer-based and event-driven resampler implementations. + ## Bug Fixes diff --git a/src/frequenz/sdk/timeseries/_resampling/_event_resampler.py b/src/frequenz/sdk/timeseries/_resampling/_event_resampler.py new file mode 100644 index 000000000..5e48f7794 --- /dev/null +++ b/src/frequenz/sdk/timeseries/_resampling/_event_resampler.py @@ -0,0 +1,213 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Event-driven resampler for cascaded resampling stages.""" + +import asyncio +import logging +from datetime import datetime, timedelta, timezone + +from frequenz.quantities import Quantity + +from .._base_types import Sample +from ._base_types import Sink, Source +from ._config import ResamplerConfig +from ._resampler import Resampler, _ResamplingHelper, _StreamingHelper + +_logger = logging.getLogger(__name__) + + +class EventResampler(Resampler): + """Event-driven resampler for cascaded resampling stages. + + Unlike the standard Timer-based Resampler which uses fixed wall-clock + intervals, EventResampler is triggered by incoming data. Windows are + emitted when a sample arrives that falls outside the current window, + not on a fixed timer schedule. + + Problem Solved: + When cascading Timer-based resamplers (e.g., 1s → 10s) with + align_to=UNIX_EPOCH, samples can be lost at window boundaries due to + timing synchronization issues. EventResampler eliminates this by + opening/closing windows based on actual data arrival. + + Important: This resampler is optimized for continuous data streams + where samples arrive at regular or semi-regular intervals. It is not + suitable for handling raw, irregular data directly from sources. + + Best Used: + Stage 1: Timer-based Resampler (handles raw, irregular data) + Stage 2+: Event-based Resampler (handles continuous data from Stage 1) + + Example: + config = ResamplerConfig( + resampling_period=timedelta(seconds=10), + resampling_function=..., + ) + resampler = EventResampler(config) + resampler.add_timeseries("my_source", source, sink) + await resampler.resample() + + Note: If a long gap occurs without incoming samples (no data for multiple periods), + the corresponding windows will be emitted all at once when data resumes. This is + acceptable for cascaded resampling since the input typically comes from another + Resampler with guaranteed continuous output. + """ + + # pylint: disable=super-init-not-called + def __init__(self, config: ResamplerConfig) -> None: + """Initialize EventResampler. + + This does not call super().__init__() to avoid starting any timers + + Args: + config: Resampler configuration + """ + self._config = config + """The configuration for this resampler.""" + + self._resamplers: dict[Source, _StreamingHelper] = {} + """A mapping between sources and the streaming helper handling that source.""" + + window_end, _ = self._calculate_window_end() + self._window_end: datetime = window_end + """The time in which the current window ends. + + This is used to make sure every resampling window is generated at + precise times. We can't rely on the timer timestamp because timers will + never fire at the exact requested time, so if we don't use a precise + time for the end of the window, the resampling windows we produce will + have different sizes. + + The window end will also be aligned to the `config.align_to` time, so + the window end is deterministic. + """ + + self._window_lock = asyncio.Lock() + """Lock protecting access to `_window_end` during window state transitions.""" + + self._sample_queue: asyncio.Queue[Sample[Quantity]] = asyncio.Queue() + """Queue for samples awaiting processing. Filled by `_StreamingHelper` callbacks, + consumed by the event loop in `resample()`. + """ + + # OVERRIDDEN: Register callback to receive samples asynchronously for + # event-driven window management. + def add_timeseries(self, name: str, source: Source, sink: Sink) -> bool: + """Start resampling a new timeseries. + + Registers the timeseries and sets up a sample callback to enqueue + incoming samples for event-driven processing. + + Args: + name: The name of the timeseries (for logging purposes). + source: The source of the timeseries to resample. + sink: The sink to use to send the resampled data. + + Returns: + `True` if the timeseries was added, `False` if the timeseries was + not added because there already a timeseries using the provided + receiver. + """ + if source in self._resamplers: + return False + + resampler = _StreamingHelper( + _ResamplingHelper(name, self._config), source, sink + ) + + # Register the callback to receive samples from the streaming helper. + resampler.register_sample_callback(self._enqueue_sample) + + self._resamplers[source] = resampler + return True + + async def _enqueue_sample(self, sample: Sample[Quantity]) -> None: + """Add a sample to the processing queue. + + Args: + sample: The sample to enqueue. + """ + await self._sample_queue.put(sample) + + # OVERRIDDEN: no warm-up period needed for event-driven sample accumulation. + def _calculate_window_end(self) -> tuple[datetime, timedelta]: + """Calculate the end of the first resampling window. + + Calculates the next multiple of resampling_period after the current time, + respecting align_to configuration. + + Returns: + A tuple (window_end, delay_time) where: + - window_end: datetime when the first window should end + - delay_time: always timedelta(0) for EventResampler + """ + now = datetime.now(timezone.utc) + period = self._config.resampling_period + align_to = self._config.align_to + + if align_to is None: + return (now + period, timedelta(0)) + + elapsed = (now - align_to) % period + + return ( + (now + (period - elapsed), timedelta(0)) + if elapsed > timedelta(0) + else (now, timedelta(0)) + ) + + async def resample(self, *, one_shot: bool = False) -> None: + """Start event-driven resampling. + + Processes incoming samples from the queue continuously. Windows are + emitted when a sample arrives with a timestamp >= current window_end. + This is in contrast to Timer-based resampling which emits windows at + fixed intervals regardless of data arrival. + + Args: + one_shot: If True, waits for the first window to be emitted, then exits. + + Raises: + asyncio.CancelledError: If the task is cancelled. + """ + try: + while True: + sample = await self._sample_queue.get() + emmitted = await self._process_sample(sample) + + if one_shot and emmitted: + return + + except asyncio.CancelledError: + _logger.info("EventResampler task cancelled") + raise + + async def _process_sample( + self, + sample: Sample[Quantity], + ) -> bool: + """Process an incoming sample and manage window state. + + This method checks if the incoming sample falls outside the current + window and emits completed windows as needed. Returns True if any + windows were emitted. + + Args: + sample: Incoming sample to process + + Returns: + True if at least one window was emitted, False otherwise. + """ + async with self._window_lock: + emmitted = False + while sample.timestamp >= self._window_end: + _logger.debug( + "EventResampler: Sample at %s >= window end %s, closing window", + sample.timestamp, + self._window_end, + ) + await self._emit_window(self._window_end) + self._window_end += self._config.resampling_period + emmitted = True + return emmitted diff --git a/src/frequenz/sdk/timeseries/_resampling/_resampler.py b/src/frequenz/sdk/timeseries/_resampling/_resampler.py index db2631c2e..8924ff44a 100644 --- a/src/frequenz/sdk/timeseries/_resampling/_resampler.py +++ b/src/frequenz/sdk/timeseries/_resampling/_resampler.py @@ -12,7 +12,7 @@ from bisect import bisect, bisect_left from collections import deque from datetime import datetime, timedelta, timezone -from typing import assert_never +from typing import Awaitable, Callable, assert_never from frequenz.channels.timer import Timer, TriggerAllMissed, _to_microseconds from frequenz.quantities import Quantity @@ -160,14 +160,6 @@ async def resample(self, *, one_shot: bool = False) -> None: Args: one_shot: Wether the resampling should run only for one resampling period. - - Raises: - ResamplingError: If some timeseries source or sink encounters any - errors while receiving or sending samples. In this case the - timer still runs and the timeseries will keep receiving data. - The user should remove (and re-add if desired) the faulty - timeseries from the resampler before calling this method - again). """ # We use a tolerance of 10% of the resampling period tolerance = timedelta( @@ -200,28 +192,45 @@ async def resample(self, *, one_shot: bool = False) -> None: case unexpected: assert_never(unexpected) - # We need to make a copy here because we need to match the results to the - # current resamplers, and since we await here, new resamplers could be added - # or removed from the dict while we awaiting the resampling, which would - # cause the results to be out of sync. - resampler_sources = list(self._resamplers) - results = await asyncio.gather( - *[r.resample(next_tick_time) for r in self._resamplers.values()], - return_exceptions=True, - ) + await self._emit_window(next_tick_time) - exceptions = { - source: result - for source, result in zip(resampler_sources, results) - # CancelledError inherits from BaseException, but we don't want - # to catch *all* BaseExceptions here. - if isinstance(result, (Exception, asyncio.CancelledError)) - } - if exceptions: - raise ResamplingError(exceptions) if one_shot: break + async def _emit_window(self, window_end: datetime) -> None: + """Emit resampled samples for all timeseries at the given window boundary. + + Args: + window_end: The timestamp marking the end of the resampling window. + + Raises: + ResamplingError: If some timeseries source or sink encounters any + errors while receiving or sending samples. In this case the + timer still runs and the timeseries will keep receiving data. + The user should remove (and re-add if desired) the faulty + timeseries from the resampler before calling this method + again). + """ + # We need to make a copy here because we need to match the results to the + # current resamplers, and since we await here, new resamplers could be added + # or removed from the dict while we awaiting the resampling, which would + # cause the results to be out of sync. + resampler_sources = list(self._resamplers) + results = await asyncio.gather( + *[r.resample(window_end) for r in self._resamplers.values()], + return_exceptions=True, + ) + + exceptions = { + source: result + for source, result in zip(resampler_sources, results) + # CancelledError inherits from BaseException, but we don't want + # to catch *all* BaseExceptions here. + if isinstance(result, (Exception, asyncio.CancelledError)) + } + if exceptions: + raise ResamplingError(exceptions) + def _calculate_window_end(self) -> tuple[datetime, timedelta]: """Calculate the end of the current resampling window. @@ -528,6 +537,9 @@ def __init__( self._helper: _ResamplingHelper = helper self._source: Source = source self._sink: Sink = sink + self._sample_callback: Callable[[Sample[Quantity]], Awaitable[None]] | None = ( + None + ) self._receiving_task: asyncio.Task[None] = asyncio.create_task( self._receive_samples() ) @@ -545,6 +557,22 @@ async def stop(self) -> None: """Cancel the receiving task.""" await cancel_and_await(self._receiving_task) + def register_sample_callback( + self, + callback: Callable[[Sample[Quantity]], Awaitable[None]] | None, + ) -> None: + """Register a callback to be invoked when a sample arrives. + + The callback is called asynchronously each time a sample is received + from the source. This allows consumers (like EventResampler) to be + notified of incoming samples without polling internal buffers. + + Args: + callback: An async function to call when a sample arrives. + If `None`, no callback will be called on new samples. + """ + self._sample_callback = callback + async def _receive_samples(self) -> None: """Pass received samples to the helper. @@ -555,6 +583,9 @@ async def _receive_samples(self) -> None: if sample.value is not None and not sample.value.isnan(): self._helper.add_sample((sample.timestamp, sample.value.base_value)) + if self._sample_callback: + await self._sample_callback(sample) + # We need the noqa because pydoclint can't figure out that `recv_exception` is an # `Exception` instance. async def resample(self, timestamp: datetime) -> None: # noqa: DOC503 diff --git a/tests/timeseries/_resampling/test_event_resampler.py b/tests/timeseries/_resampling/test_event_resampler.py new file mode 100644 index 000000000..0920a6cd9 --- /dev/null +++ b/tests/timeseries/_resampling/test_event_resampler.py @@ -0,0 +1,298 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Tests for the `EventResampler` class.""" + +from dataclasses import dataclass +from datetime import datetime, timedelta, timezone +from unittest.mock import AsyncMock, patch + +import pytest +from frequenz.quantities import Quantity + +from frequenz.sdk.timeseries import Sample +from frequenz.sdk.timeseries._resampling._config import ResamplerConfig +from frequenz.sdk.timeseries._resampling._event_resampler import EventResampler +from frequenz.sdk.timeseries._resampling._resampler import Resampler + +# pylint: disable=protected-access + + +@dataclass +class ResamplerTestCase: + """Data class for holding test case parameters for EventResampler tests.""" + + align_to: datetime | None + """Alignment point for windows. If None, windows are aligned to the first sample time.""" + + first_window_end: datetime + """Expected end time of the first window based on the configuration and start time.""" + + +@pytest.fixture +def now() -> datetime: + """Fixture providing a fixed current time for testing.""" + return datetime(2024, 1, 1, 12, 0, 5, tzinfo=timezone.utc) + + +@pytest.fixture( + params=[ + ResamplerTestCase( + align_to=None, + first_window_end=datetime(2024, 1, 1, 12, 0, 15, tzinfo=timezone.utc), + ), + ResamplerTestCase( + align_to=datetime(1970, 1, 1, tzinfo=timezone.utc), + first_window_end=datetime(2024, 1, 1, 12, 0, 10, tzinfo=timezone.utc), + ), + ], + ids=["no_alignment", "with_alignment"], +) +def resampler_case(request: pytest.FixtureRequest) -> ResamplerTestCase: + """Fixture for EventResampler test cases.""" + assert isinstance(request.param, ResamplerTestCase) + return request.param + + +@pytest.fixture +def resampler_config(resampler_case: ResamplerTestCase) -> ResamplerConfig: + """Create a basic resampler config for testing.""" + return ResamplerConfig( + resampling_period=timedelta(seconds=10), + max_data_age_in_periods=1, + align_to=resampler_case.align_to, + ) + + +@pytest.fixture +def first_window_end(resampler_case: ResamplerTestCase) -> datetime: + """Fixture providing the expected first window end time based on the test case.""" + return resampler_case.first_window_end + + +@patch("frequenz.sdk.timeseries._resampling._event_resampler.datetime") +@pytest.mark.asyncio +async def test_event_resampler_initialization( + mock_datetime: AsyncMock, + resampler_config: ResamplerConfig, + now: datetime, + first_window_end: datetime, +) -> None: + """Event Resampler initializes without errors.""" + mock_datetime.now.return_value = now + resampler = EventResampler(resampler_config) + + assert resampler.config == resampler_config + assert len(resampler._resamplers) == 0 + assert resampler._window_end == first_window_end + + +@pytest.mark.asyncio +async def test_event_resampler_inherits_from_resampler( + resampler_config: ResamplerConfig, +) -> None: + """Event Resampler is a Resampler subclass.""" + resampler = EventResampler(resampler_config) + assert isinstance(resampler, Resampler) + assert hasattr(resampler, "add_timeseries") + assert hasattr(resampler, "remove_timeseries") + assert callable(resampler.add_timeseries) + assert callable(resampler.remove_timeseries) + + +@patch("frequenz.sdk.timeseries._resampling._event_resampler.datetime") +@pytest.mark.asyncio +async def test_window_initialization( + mock_datetime: AsyncMock, + resampler_config: ResamplerConfig, + now: datetime, + first_window_end: datetime, +) -> None: + """Window initializes correctly on first sample.""" + mock_datetime.now.return_value = now + resampler = EventResampler(resampler_config) + + assert resampler._window_end == first_window_end + + sample = Sample(now, Quantity(42.0)) + await resampler._process_sample(sample) + + assert resampler._window_end == first_window_end + + +@patch("frequenz.sdk.timeseries._resampling._event_resampler.datetime") +@pytest.mark.asyncio +async def test_sample_before_first_window_boundary( + mock_datetime: AsyncMock, + resampler_config: ResamplerConfig, + now: datetime, + first_window_end: datetime, +) -> None: + """Samples before window boundary don't trigger emit.""" + mock_datetime.now.return_value = now + resampler = EventResampler(resampler_config) + + with patch.object( + resampler, "_emit_window", new_callable=AsyncMock + ) as mock_emit_window: + # Sample 1 + sample1 = Sample(now + timedelta(seconds=1), Quantity(10.0)) + await resampler._process_sample(sample1) + + assert resampler._window_end == first_window_end + mock_emit_window.assert_not_called() + + # Process sample still within first window + sample2 = Sample(now + timedelta(seconds=3), Quantity(20.0)) + await resampler._process_sample(sample2) + + assert resampler._window_end == first_window_end + mock_emit_window.assert_not_called() + + +@patch("frequenz.sdk.timeseries._resampling._event_resampler.datetime") +@pytest.mark.asyncio +async def test_sample_at_window_boundary_triggers_emit( + mock_datetime: AsyncMock, + resampler_config: ResamplerConfig, + now: datetime, + first_window_end: datetime, +) -> None: + """Sample at window boundary triggers emit and opens new window.""" + mock_datetime.now.return_value = now + resampler = EventResampler(resampler_config) + + with patch.object( + resampler, "_emit_window", new_callable=AsyncMock + ) as mock_emit_window: + # Sample 1 + sample1 = Sample(now + timedelta(seconds=1), Quantity(10.0)) + await resampler._process_sample(sample1) + + assert resampler._window_end == first_window_end + mock_emit_window.assert_not_called() + + # Sample 2 at boundary + sample2 = Sample(now + timedelta(seconds=10), Quantity(20.0)) + await resampler._process_sample(sample2) + + mock_emit_window.assert_called_once_with(first_window_end) + assert resampler._window_end == first_window_end + timedelta(seconds=10) + + +@patch("frequenz.sdk.timeseries._resampling._event_resampler.datetime") +@pytest.mark.asyncio +async def test_sample_after_window_boundary( + mock_datetime: AsyncMock, + resampler_config: ResamplerConfig, + now: datetime, + first_window_end: datetime, +) -> None: + """Sample after window boundary triggers emit.""" + mock_datetime.now.return_value = now + resampler = EventResampler(resampler_config) + + with patch.object( + resampler, "_emit_window", new_callable=AsyncMock + ) as mock_emit_window: + # Sample 1 + sample1 = Sample(now + timedelta(seconds=1), Quantity(10.0)) + await resampler._process_sample(sample1) + + assert resampler._window_end == first_window_end + mock_emit_window.assert_not_called() + + # Sample 2 at boundary + sample2 = Sample(now + timedelta(seconds=11), Quantity(20.0)) + await resampler._process_sample(sample2) + + mock_emit_window.assert_called_once_with(first_window_end) + assert resampler._window_end == first_window_end + timedelta(seconds=10) + + +@patch("frequenz.sdk.timeseries._resampling._event_resampler.datetime") +@pytest.mark.asyncio +async def test_sample_crossing_multiple_windows( + mock_datetime: AsyncMock, + resampler_config: ResamplerConfig, + now: datetime, + first_window_end: datetime, +) -> None: + """Sample crossing multiple windows emits each one.""" + mock_datetime.now.return_value = now + resampler = EventResampler(resampler_config) + + with patch.object( + resampler, "_emit_window", new_callable=AsyncMock + ) as mock_emit_window: + # Sample 1 at 2s + sample1 = Sample(now + timedelta(seconds=2), Quantity(10.0)) + await resampler._process_sample(sample1) + mock_emit_window.assert_not_called() + assert resampler._window_end == first_window_end + + # Sample 2 at 32s + sample2 = Sample(now + timedelta(seconds=32), Quantity(20.0)) + await resampler._process_sample(sample2) + assert mock_emit_window.call_count == 3 + assert resampler._window_end == first_window_end + timedelta(seconds=30) + + +@patch("frequenz.sdk.timeseries._resampling._event_resampler.datetime") +@pytest.mark.asyncio +async def test_window_alignment_maintained( + mock_datetime: AsyncMock, + resampler_config: ResamplerConfig, + now: datetime, + first_window_end: datetime, +) -> None: + """Windows remain aligned when using simple addition.""" + mock_datetime.now.return_value = now + resampler = EventResampler(resampler_config) + send_sequence = [1, 15, 25, 35] # Sample times in seconds + + with patch.object( + resampler, "_emit_window", new_callable=AsyncMock + ) as mock_emit_window: + for offset in send_sequence: + sample = Sample(now + timedelta(seconds=offset), Quantity(float(offset))) + await resampler._process_sample(sample) + + for i, call_args in enumerate(mock_emit_window.call_args_list): + window_end = call_args.args[0] # Extract the first argument from the call + expected = first_window_end + i * resampler_config.resampling_period + assert window_end == expected + + +@patch("frequenz.sdk.timeseries._resampling._event_resampler.datetime") +@pytest.mark.asyncio +async def test_key_benefit_no_data_loss_at_boundaries( + mock_datetime: AsyncMock, + resampler_config: ResamplerConfig, + now: datetime, + first_window_end: datetime, +) -> None: + """ + Key benefit: No data loss at window boundaries. + + This test demonstrates the main value of EventResampler compared + to cascaded TimerResamplers: samples arriving at boundaries are + never lost. + """ + mock_datetime.now.return_value = now + resampler = EventResampler(resampler_config) + arriving_samples = [1.0, 5.0, 9.5, 10.0, 10.1, 15.0, 20.0, 20.5] + + with patch.object( + resampler, "_emit_window", new_callable=AsyncMock + ) as mock_emit_window: + for i, sample_offset in enumerate(arriving_samples): + sample = Sample(now + timedelta(seconds=sample_offset), Quantity(i)) + await resampler._process_sample(sample) + + assert mock_emit_window.call_count == 2 + assert mock_emit_window.call_args_list[0].args[0] == first_window_end + assert mock_emit_window.call_args_list[1].args[ + 0 + ] == first_window_end + timedelta(seconds=10) + assert resampler._window_end == (first_window_end + timedelta(seconds=20))