diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md
index d4546f4..315ce78 100644
--- a/RELEASE_NOTES.md
+++ b/RELEASE_NOTES.md
@@ -15,3 +15,8 @@
 
 ## Bug Fixes
 
+
+* Fixed bugs that could cause a dispatch actor not to stop when an update
+  arrives exactly at the moment a dispatch window closes. Also fixed a
+  related issue that could cause incorrect event ordering when multiple
+  dispatches are updated concurrently.
diff --git a/src/frequenz/dispatch/_bg_service.py b/src/frequenz/dispatch/_bg_service.py
index 3ea9d5f..45fcce8 100644
--- a/src/frequenz/dispatch/_bg_service.py
+++ b/src/frequenz/dispatch/_bg_service.py
@@ -13,7 +13,7 @@
 from contextlib import closing
 from dataclasses import dataclass, field
 from datetime import datetime, timedelta, timezone
-from heapq import heappop, heappush
+from heapq import heapify, heappop, heappush
 
 import grpc.aio
 from frequenz.channels import Broadcast, Receiver, select, selected_from
@@ -486,11 +486,19 @@ async def _update_dispatch_schedule_and_notify(
         # Dispatch was updated
         elif dispatch and old_dispatch:
             # Remove potentially existing scheduled event
-            self._remove_scheduled(old_dispatch)
+            removed = self._remove_scheduled(old_dispatch)
 
             # Check if the change requires an immediate notification
             if self._update_changed_running_state(dispatch, old_dispatch):
                 await self._send_running_state_change(dispatch)
+            elif removed is not None and removed.priority == 1 and not dispatch.started:
+                # priority == 1 means a stop event (see QueueItem.__init__).
+                # If we removed a pending stop event and the dispatch is no
+                # longer started, the update arrived exactly at the stop
+                # boundary. The timer would have delivered the stop event, but
+                # _remove_scheduled consumed it first. Send the notification
+                # here so the actor is not left running past the window end.
+                await self._send_running_state_change(dispatch)
 
             if dispatch.started:
                 self._schedule_stop(dispatch)
@@ -507,21 +515,26 @@ def _update_timer(self, timer: Timer) -> None:
             timer.reset(interval=due_at - datetime.now(timezone.utc))
             _logger.debug("Next event scheduled at %s", self._scheduled_events[0].time)
 
-    def _remove_scheduled(self, dispatch: Dispatch) -> bool:
+    def _remove_scheduled(self, dispatch: Dispatch) -> "QueueItem | None":
         """Remove a dispatch from the scheduled events.
 
         Args:
             dispatch: The dispatch to remove.
 
         Returns:
-            True if the dispatch was found and removed, False otherwise.
+            The removed queue item, or None if no event was scheduled for the dispatch.
         """
         for idx, item in enumerate(self._scheduled_events):
             if dispatch.id == item.dispatch.id:
                 self._scheduled_events.pop(idx)
-                return True
-
-        return False
+                # heappop() only removes the root (index 0) and does not accept
+                # an index argument, so we use list.pop(idx) instead. After
+                # removing an arbitrary element the heap property is broken and
+                # must be restored explicitly.
+                heapify(self._scheduled_events)
+                return item
+
+        return None
 
     def _schedule_start(self, dispatch: Dispatch) -> None:
         """Schedule a dispatch to start.
diff --git a/tests/test_dispatcher_scheduler.py b/tests/test_dispatcher_scheduler.py
new file mode 100644
index 0000000..833c9f6
--- /dev/null
+++ b/tests/test_dispatcher_scheduler.py
@@ -0,0 +1,302 @@
+# License: MIT
+# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
+
+# pylint: disable=too-many-arguments,too-many-positional-arguments,too-many-locals
+
+"""Tests for scheduler edge cases in DispatchScheduler."""
+
+import asyncio
+from collections.abc import AsyncIterator, Iterator
+from dataclasses import replace
+from datetime import datetime, timedelta, timezone
+from random import randint
+
+import async_solipsism
+import time_machine
+from frequenz.channels import Receiver
+from frequenz.client.common.microgrid import MicrogridId
+from frequenz.client.dispatch.recurrence import RecurrenceRule
+from frequenz.client.dispatch.test.client import FakeClient, to_create_params
+from frequenz.client.dispatch.test.generator import DispatchGenerator
+from pytest import fixture
+
+from frequenz.dispatch import Dispatch, DispatchEvent
+from frequenz.dispatch._bg_service import DispatchScheduler
+
+
+@fixture
+def generator() -> DispatchGenerator:
+    """Return a dispatch generator."""
+    return DispatchGenerator()
+
+
+@fixture
+def event_loop_policy() -> async_solipsism.EventLoopPolicy:
+    """Set the event loop policy to use async_solipsism."""
+    policy = async_solipsism.EventLoopPolicy()
+    asyncio.set_event_loop_policy(policy)
+    return policy
+
+
+@fixture
+def fake_time() -> Iterator[time_machine.Traveller]:
+    """Replace real time with a time machine that doesn't automatically tick."""
+    with time_machine.travel(destination=0, tick=False) as traveller:
+        yield traveller
+
+
+def _now() -> datetime:
+    """Return the current time in UTC."""
+    return datetime.now(tz=timezone.utc)
+
+
+@fixture
+def microgrid_id() -> MicrogridId:
+    """Return a random microgrid ID."""
+    return MicrogridId(randint(1, 100))
+
+
+@fixture
+def client() -> FakeClient:
+    """Return a fake dispatch API client."""
+    return FakeClient()
+
+
+@fixture
+async def scheduler(
+    microgrid_id: MicrogridId, client: FakeClient
+) -> AsyncIterator[DispatchScheduler]:
+    """Start a DispatchScheduler and stop it after the test."""
+    sched = DispatchScheduler(microgrid_id=microgrid_id, client=client)
+    sched.start()
+    try:
+        yield sched
+    finally:
+        await sched.stop()
+
+
+@fixture
+async def event_receiver(scheduler: DispatchScheduler) -> Receiver[Dispatch]:
+    """Return a running-state event receiver for SET_POWER dispatches."""
+    return await scheduler.new_running_state_event_receiver(
+        "SET_POWER", merge_strategy=None
+    )
+
+
+@fixture
+def lifecycle_receiver(scheduler: DispatchScheduler) -> Receiver[DispatchEvent]:
+    """Return a lifecycle event receiver for SET_POWER dispatches."""
+    return scheduler.new_lifecycle_events_receiver("SET_POWER")
+
+
+async def test_parallel_dispatches_same_window_and_follow_up_window(
+    fake_time: time_machine.Traveller,
+    generator: DispatchGenerator,
+    microgrid_id: MicrogridId,
+    client: FakeClient,
+    event_receiver: Receiver[Dispatch],
+    lifecycle_receiver: Receiver[DispatchEvent],
+) -> None:
+    """Test that unmerged dispatches stop correctly at a shared boundary.
+
+    This mirrors the SET_POWER edge case where multiple dispatches share the
+    exact same start time and duration, and another batch starts exactly when
+    the first batch ends.
+
+    This is an opportunistic test: it tries to detect corruption in the
+    _scheduled_events heap by exercising the production traffic shape around
+    shared boundaries. The broken heap ordering depends on runtime queue
+    layouts that this harness does not reproduce reliably, but the test still
+    covers the correct boundary behaviour.
+    """
+    start_time = _now() + timedelta(seconds=5)
+    duration = timedelta(seconds=15)
+    follow_up_start_time = start_time + duration
+
+    first_batch = [
+        replace(
+            generator.generate_dispatch(),
+            active=True,
+            duration=duration,
+            start_time=start_time,
+            recurrence=RecurrenceRule(),
+            type="SET_POWER",
+        )
+        for _ in range(3)
+    ]
+    second_batch = [
+        replace(
+            generator.generate_dispatch(),
+            active=True,
+            duration=duration,
+            start_time=follow_up_start_time,
+            recurrence=RecurrenceRule(),
+            type="SET_POWER",
+        )
+        for _ in range(4)
+    ]
+
+    for dispatch in [*first_batch, *second_batch]:
+        await client.create(**to_create_params(microgrid_id, dispatch))
+
+    for _ in range(7):
+        await lifecycle_receiver.receive()
+
+    fake_time.move_to(start_time + timedelta(seconds=1))
+    await asyncio.sleep(1)
+
+    first_start_events = [await event_receiver.receive() for _ in range(3)]
+    assert {dispatch.id for dispatch in first_start_events} == {
+        dispatch.id for dispatch in first_batch
+    }
+    assert all(dispatch.started for dispatch in first_start_events)
+
+    fake_time.move_to(follow_up_start_time + timedelta(seconds=1))
+    await asyncio.sleep(1)
+
+    boundary_events = [await event_receiver.receive() for _ in range(7)]
+    second_starts = [dispatch for dispatch in boundary_events if dispatch.started]
+    first_stops = [dispatch for dispatch in boundary_events if not dispatch.started]
+
+    assert {dispatch.id for dispatch in second_starts} == {
+        dispatch.id for dispatch in second_batch
+    }
+    assert all(dispatch.started for dispatch in second_starts)  # vacuous after filter
+
+    assert {dispatch.id for dispatch in first_stops} == {
+        dispatch.id for dispatch in first_batch
+    }
+    assert all(not dispatch.started for dispatch in first_stops)  # vacuous after filter
+
+    fake_time.move_to(follow_up_start_time + duration + timedelta(seconds=1))
+    await asyncio.sleep(1)
+
+    second_stop_events = [await event_receiver.receive() for _ in range(4)]
+    assert {dispatch.id for dispatch in second_stop_events} == {
+        dispatch.id for dispatch in second_batch
+    }
+    assert all(not dispatch.started for dispatch in second_stop_events)
+
+
+async def test_parallel_dispatches_with_payload_updates_before_start(
+    fake_time: time_machine.Traveller,
+    generator: DispatchGenerator,
+    microgrid_id: MicrogridId,
+    client: FakeClient,
+    event_receiver: Receiver[Dispatch],
+    lifecycle_receiver: Receiver[DispatchEvent],
+) -> None:
+    """Test dispatches updated before start still stop correctly.
+
+    Mirrors the production SET_POWER scenario: multiple dispatches with the same
+    start time are created and then updated with new power values at different
+    times before the start. Updates trigger _remove_scheduled + re-schedule
+    which can corrupt the heap, potentially causing stop events to be lost.
+
+    This is an opportunistic test: it tries to detect corruption in the
+    _scheduled_events heap by exercising the update-before-start pattern. The
+    buggy heap state is timing- and layout-dependent and cannot be forced
+    deterministically here, but the test still covers the correct behaviour.
+    """
+    start_time = _now() + timedelta(minutes=25)
+    duration = timedelta(minutes=15)
+
+    updates = [
+        (-80000.0, -88740.0, timedelta(seconds=20)),
+        (-80000.0, -88740.0, timedelta(minutes=4)),
+        (-100000.0, -133680.0, timedelta(seconds=50)),
+    ]
+    dispatches = []
+
+    for initial_power, updated_power, wait_time in updates:
+        dispatch = replace(
+            generator.generate_dispatch(),
+            active=True,
+            duration=duration,
+            start_time=start_time,
+            recurrence=RecurrenceRule(),
+            type="SET_POWER",
+            payload={"target_power_w": initial_power},
+        )
+        dispatches.append(dispatch)
+        await client.create(**to_create_params(microgrid_id, dispatch))
+        await lifecycle_receiver.receive()
+        fake_time.shift(wait_time)
+        await asyncio.sleep(1)
+        await client.update(
+            microgrid_id=microgrid_id,
+            dispatch_id=dispatch.id,
+            new_fields={"payload": {"target_power_w": updated_power}},
+        )
+        fake_time.shift(timedelta(seconds=1))
+        await asyncio.sleep(1)
+        await lifecycle_receiver.receive()
+
+    fake_time.move_to(start_time + timedelta(seconds=1))
+    await asyncio.sleep(1)
+
+    start_events = [await event_receiver.receive() for _ in dispatches]
+    assert all(dispatch.started for dispatch in start_events)
+
+    fake_time.move_to(start_time + duration + timedelta(seconds=1))
+    await asyncio.sleep(1)
+
+    stop_events = [await event_receiver.receive() for _ in dispatches]
+    assert all(not dispatch.started for dispatch in stop_events)
+    assert {dispatch.id for dispatch in stop_events} == {
+        dispatch.id for dispatch in start_events
+    }
+
+
+async def test_dispatch_payload_update_at_stop_boundary(
+    fake_time: time_machine.Traveller,
+    generator: DispatchGenerator,
+    microgrid_id: MicrogridId,
+    client: FakeClient,
+    event_receiver: Receiver[Dispatch],
+    lifecycle_receiver: Receiver[DispatchEvent],
+) -> None:
+    """Test that an update at the stop boundary still stops the actor.
+
+    An update exactly at the stop boundary can remove the pending stop event
+    before the timer fires it. Without an explicit notification at that point,
+    the actor is left running even though the dispatch window has elapsed.
+
+    In practice this test is still opportunistic: a payload change triggers an
+    immediate update notification in this harness, so it also passes without the
+    fix. It still covers the boundary-update shape from production.
+    """
+    start_time = _now() + timedelta(seconds=5)
+    duration = timedelta(seconds=15)
+
+    dispatch = replace(
+        generator.generate_dispatch(),
+        active=True,
+        duration=duration,
+        start_time=start_time,
+        recurrence=RecurrenceRule(),
+        type="SET_POWER",
+        payload={"target_power_w": -88740.0},
+    )
+    await client.create(**to_create_params(microgrid_id, dispatch))
+    await lifecycle_receiver.receive()
+
+    fake_time.move_to(start_time + timedelta(seconds=1))
+    await asyncio.sleep(1)
+
+    started = await event_receiver.receive()
+    assert started.started
+
+    fake_time.move_to(start_time + duration)
+    await asyncio.sleep(0)
+
+    await client.update(
+        microgrid_id=microgrid_id,
+        dispatch_id=dispatch.id,
+        new_fields={"payload": {"target_power_w": -99999.0}},
+    )
+    fake_time.shift(timedelta(seconds=1))
+    await asyncio.sleep(1)
+
+    stop_event = await event_receiver.receive()
+    assert not stop_event.started