From fd98f6cbc7b80dab772c52d59cf033784bec80b6 Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Wed, 11 Mar 2026 15:08:40 +0100 Subject: [PATCH 1/4] Fix heap invariant corruption in _remove_scheduled list.pop(idx) removes an arbitrary element from the heap without restoring the heap property. Add heapify() after the removal to fix the invariant. Change the return type from bool to QueueItem | None so callers can inspect the removed item (needed for the follow-up fix). Signed-off-by: Mathias L. Baumann --- src/frequenz/dispatch/_bg_service.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/src/frequenz/dispatch/_bg_service.py b/src/frequenz/dispatch/_bg_service.py index 3ea9d5f..effb31d 100644 --- a/src/frequenz/dispatch/_bg_service.py +++ b/src/frequenz/dispatch/_bg_service.py @@ -13,7 +13,7 @@ from contextlib import closing from dataclasses import dataclass, field from datetime import datetime, timedelta, timezone -from heapq import heappop, heappush +from heapq import heapify, heappop, heappush import grpc.aio from frequenz.channels import Broadcast, Receiver, select, selected_from @@ -507,21 +507,26 @@ def _update_timer(self, timer: Timer) -> None: timer.reset(interval=due_at - datetime.now(timezone.utc)) _logger.debug("Next event scheduled at %s", self._scheduled_events[0].time) - def _remove_scheduled(self, dispatch: Dispatch) -> bool: + def _remove_scheduled(self, dispatch: Dispatch) -> "QueueItem | None": """Remove a dispatch from the scheduled events. Args: dispatch: The dispatch to remove. Returns: - True if the dispatch was found and removed, False otherwise. + The removed queue item, or None if not found. """ for idx, item in enumerate(self._scheduled_events): if dispatch.id == item.dispatch.id: self._scheduled_events.pop(idx) - return True - - return False + # heappop() only removes the root (index 0) and does not accept + # an index argument, so we use list.pop(idx) instead. 
After + # removing an arbitrary element the heap property may be broken and + # must be restored explicitly. + heapify(self._scheduled_events) + return item + + return None def _schedule_start(self, dispatch: Dispatch) -> None: """Schedule a dispatch to start. From 619364be05cf2638af30259fc007c4b4852e0fa1 Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Wed, 11 Mar 2026 15:24:20 +0100 Subject: [PATCH 2/4] Fix lost stop event on boundary updates When a dispatch update arrives at exactly the stop boundary, _remove_scheduled consumes the pending stop event from the queue. _update_changed_running_state does not fire because old_dispatch.started and dispatch.started are both False at this point. The actor would then remain running indefinitely. Fix: if _remove_scheduled returned a stop event (priority == 1) and the dispatch is no longer started, send the running-state change explicitly. Signed-off-by: Mathias L. Baumann --- src/frequenz/dispatch/_bg_service.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/frequenz/dispatch/_bg_service.py b/src/frequenz/dispatch/_bg_service.py index effb31d..45fcce8 100644 --- a/src/frequenz/dispatch/_bg_service.py +++ b/src/frequenz/dispatch/_bg_service.py @@ -486,11 +486,19 @@ async def _update_dispatch_schedule_and_notify( # Dispatch was updated elif dispatch and old_dispatch: # Remove potentially existing scheduled event - self._remove_scheduled(old_dispatch) + removed = self._remove_scheduled(old_dispatch) # Check if the change requires an immediate notification if self._update_changed_running_state(dispatch, old_dispatch): await self._send_running_state_change(dispatch) + elif removed is not None and removed.priority == 1 and not dispatch.started: + # priority == 1 means a stop event (see QueueItem.__init__). + # If we removed a pending stop event and the dispatch is no + # longer started, the update arrived exactly at the stop + # boundary. 
The timer would have delivered the stop event, but + # _remove_scheduled consumed it first. Send the notification + # here so the actor is not left running past the window end. + await self._send_running_state_change(dispatch) if dispatch.started: self._schedule_stop(dispatch) From c9341f01e09b8dffcf36f8c4225a53183e2e468f Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Wed, 11 Mar 2026 15:40:08 +0100 Subject: [PATCH 3/4] Add scheduler tests for heap invariant and boundary stop-event fixes Signed-off-by: Mathias L. Baumann --- tests/test_dispatcher_scheduler.py | 302 +++++++++++++++++++++++++++++ 1 file changed, 302 insertions(+) create mode 100644 tests/test_dispatcher_scheduler.py diff --git a/tests/test_dispatcher_scheduler.py b/tests/test_dispatcher_scheduler.py new file mode 100644 index 0000000..833c9f6 --- /dev/null +++ b/tests/test_dispatcher_scheduler.py @@ -0,0 +1,302 @@ +# License: MIT +# Copyright © 2024 Frequenz Energy-as-a-Service GmbH + +# pylint: disable=too-many-arguments,too-many-positional-arguments,too-many-locals + +"""Tests for scheduler edge cases in DispatchScheduler.""" + +import asyncio +from collections.abc import AsyncIterator, Iterator +from dataclasses import replace +from datetime import datetime, timedelta, timezone +from random import randint + +import async_solipsism +import time_machine +from frequenz.channels import Receiver +from frequenz.client.common.microgrid import MicrogridId +from frequenz.client.dispatch.recurrence import RecurrenceRule +from frequenz.client.dispatch.test.client import FakeClient, to_create_params +from frequenz.client.dispatch.test.generator import DispatchGenerator +from pytest import fixture + +from frequenz.dispatch import Dispatch, DispatchEvent +from frequenz.dispatch._bg_service import DispatchScheduler + + +@fixture +def generator() -> DispatchGenerator: + """Return a dispatch generator.""" + return DispatchGenerator() + + +@fixture +def event_loop_policy() -> 
async_solipsism.EventLoopPolicy: + """Set the event loop policy to use async_solipsism.""" + policy = async_solipsism.EventLoopPolicy() + asyncio.set_event_loop_policy(policy) + return policy + + +@fixture +def fake_time() -> Iterator[time_machine.Traveller]: + """Replace real time with a time machine that doesn't automatically tick.""" + with time_machine.travel(destination=0, tick=False) as traveller: + yield traveller + + +def _now() -> datetime: + """Return the current time in UTC.""" + return datetime.now(tz=timezone.utc) + + +@fixture +def microgrid_id() -> MicrogridId: + """Return a random microgrid ID.""" + return MicrogridId(randint(1, 100)) + + +@fixture +def client() -> FakeClient: + """Return a fake dispatch API client.""" + return FakeClient() + + +@fixture +async def scheduler( + microgrid_id: MicrogridId, client: FakeClient +) -> AsyncIterator[DispatchScheduler]: + """Start a DispatchScheduler and stop it after the test.""" + sched = DispatchScheduler(microgrid_id=microgrid_id, client=client) + sched.start() + try: + yield sched + finally: + await sched.stop() + + +@fixture +async def event_receiver(scheduler: DispatchScheduler) -> Receiver[Dispatch]: + """Return a running-state event receiver for SET_POWER dispatches.""" + return await scheduler.new_running_state_event_receiver( + "SET_POWER", merge_strategy=None + ) + + +@fixture +def lifecycle_receiver(scheduler: DispatchScheduler) -> Receiver[DispatchEvent]: + """Return a lifecycle event receiver for SET_POWER dispatches.""" + return scheduler.new_lifecycle_events_receiver("SET_POWER") + + +async def test_parallel_dispatches_same_window_and_follow_up_window( + fake_time: time_machine.Traveller, + generator: DispatchGenerator, + microgrid_id: MicrogridId, + client: FakeClient, + event_receiver: Receiver[Dispatch], + lifecycle_receiver: Receiver[DispatchEvent], +) -> None: + """Test that unmerged dispatches stop correctly at a shared boundary. 
+ + This mirrors the SET_POWER edge case where multiple dispatches share the + exact same start time and duration, and another batch starts exactly when + the first batch ends. + + This is an opportunistic test: it tries to detect corruption in the + _scheduled_events heap by exercising the production traffic shape around + shared boundaries. The broken heap ordering depends on runtime queue + layouts that this harness does not reproduce reliably, but the test still + covers the correct boundary behaviour. + """ + start_time = _now() + timedelta(seconds=5) + duration = timedelta(seconds=15) + follow_up_start_time = start_time + duration + + first_batch = [ + replace( + generator.generate_dispatch(), + active=True, + duration=duration, + start_time=start_time, + recurrence=RecurrenceRule(), + type="SET_POWER", + ) + for _ in range(3) + ] + second_batch = [ + replace( + generator.generate_dispatch(), + active=True, + duration=duration, + start_time=follow_up_start_time, + recurrence=RecurrenceRule(), + type="SET_POWER", + ) + for _ in range(4) + ] + + for dispatch in [*first_batch, *second_batch]: + await client.create(**to_create_params(microgrid_id, dispatch)) + + for _ in range(7): + await lifecycle_receiver.receive() + + fake_time.move_to(start_time + timedelta(seconds=1)) + await asyncio.sleep(1) + + first_start_events = [await event_receiver.receive() for _ in range(3)] + assert {dispatch.id for dispatch in first_start_events} == { + dispatch.id for dispatch in first_batch + } + assert all(dispatch.started for dispatch in first_start_events) + + fake_time.move_to(follow_up_start_time + timedelta(seconds=1)) + await asyncio.sleep(1) + + boundary_events = [await event_receiver.receive() for _ in range(7)] + second_starts = [dispatch for dispatch in boundary_events if dispatch.started] + first_stops = [dispatch for dispatch in boundary_events if not dispatch.started] + + assert {dispatch.id for dispatch in second_starts} == { + dispatch.id for dispatch in 
second_batch + } + assert all(dispatch.started for dispatch in second_starts) + + assert {dispatch.id for dispatch in first_stops} == { + dispatch.id for dispatch in first_batch + } + assert all(not dispatch.started for dispatch in first_stops) + + fake_time.move_to(follow_up_start_time + duration + timedelta(seconds=1)) + await asyncio.sleep(1) + + second_stop_events = [await event_receiver.receive() for _ in range(4)] + assert {dispatch.id for dispatch in second_stop_events} == { + dispatch.id for dispatch in second_batch + } + assert all(not dispatch.started for dispatch in second_stop_events) + + +async def test_parallel_dispatches_with_payload_updates_before_start( + fake_time: time_machine.Traveller, + generator: DispatchGenerator, + microgrid_id: MicrogridId, + client: FakeClient, + event_receiver: Receiver[Dispatch], + lifecycle_receiver: Receiver[DispatchEvent], +) -> None: + """Test dispatches updated before start still stop correctly. + + Mirrors the production SET_POWER scenario: multiple dispatches with the same + start time are created and then updated with new power values at different + times before the start. Updates trigger _remove_scheduled + re-schedule + which can corrupt the heap, potentially causing stop events to be lost. + + This is an opportunistic test: it tries to detect corruption in the + _scheduled_events heap by exercising the update-before-start pattern. The + buggy heap state is timing- and layout-dependent and cannot be forced + deterministically here, but the test still covers the correct behaviour. 
+ """ + start_time = _now() + timedelta(minutes=25) + duration = timedelta(minutes=15) + + updates = [ + (-80000.0, -88740.0, timedelta(seconds=20)), + (-80000.0, -88740.0, timedelta(minutes=4)), + (-100000.0, -133680.0, timedelta(seconds=50)), + ] + dispatches = [] + + for initial_power, updated_power, wait_time in updates: + dispatch = replace( + generator.generate_dispatch(), + active=True, + duration=duration, + start_time=start_time, + recurrence=RecurrenceRule(), + type="SET_POWER", + payload={"target_power_w": initial_power}, + ) + dispatches.append(dispatch) + await client.create(**to_create_params(microgrid_id, dispatch)) + await lifecycle_receiver.receive() + fake_time.shift(wait_time) + await asyncio.sleep(1) + await client.update( + microgrid_id=microgrid_id, + dispatch_id=dispatch.id, + new_fields={"payload": {"target_power_w": updated_power}}, + ) + fake_time.shift(timedelta(seconds=1)) + await asyncio.sleep(1) + await lifecycle_receiver.receive() + + fake_time.move_to(start_time + timedelta(seconds=1)) + await asyncio.sleep(1) + + start_events = [await event_receiver.receive() for _ in dispatches] + assert all(dispatch.started for dispatch in start_events) + + fake_time.move_to(start_time + duration + timedelta(seconds=1)) + await asyncio.sleep(1) + + stop_events = [await event_receiver.receive() for _ in dispatches] + assert all(not dispatch.started for dispatch in stop_events) + assert {dispatch.id for dispatch in stop_events} == { + dispatch.id for dispatch in start_events + } + + +async def test_dispatch_payload_update_at_stop_boundary( + fake_time: time_machine.Traveller, + generator: DispatchGenerator, + microgrid_id: MicrogridId, + client: FakeClient, + event_receiver: Receiver[Dispatch], + lifecycle_receiver: Receiver[DispatchEvent], +) -> None: + """Test that an update at the stop boundary still stops the actor. + + An update exactly at the stop boundary can remove the pending stop event + before the timer fires it. 
Without an explicit notification at that point, + the actor is left running even though the dispatch window has elapsed. + + In practice this test is still opportunistic: a payload change triggers an + immediate update notification in this harness, so it also passes without the + fix. It still covers the boundary-update shape from production. + """ + start_time = _now() + timedelta(seconds=5) + duration = timedelta(seconds=15) + + dispatch = replace( + generator.generate_dispatch(), + active=True, + duration=duration, + start_time=start_time, + recurrence=RecurrenceRule(), + type="SET_POWER", + payload={"target_power_w": -88740.0}, + ) + await client.create(**to_create_params(microgrid_id, dispatch)) + await lifecycle_receiver.receive() + + fake_time.move_to(start_time + timedelta(seconds=1)) + await asyncio.sleep(1) + + started = await event_receiver.receive() + assert started.started + + fake_time.move_to(start_time + duration) + await asyncio.sleep(0) + + await client.update( + microgrid_id=microgrid_id, + dispatch_id=dispatch.id, + new_fields={"payload": {"target_power_w": -99999.0}}, + ) + fake_time.shift(timedelta(seconds=1)) + await asyncio.sleep(1) + + stop_event = await event_receiver.receive() + assert not stop_event.started From 6b16b7d20a2b9b841be50ae4c568167f1881dd39 Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Wed, 11 Mar 2026 15:40:08 +0100 Subject: [PATCH 4/4] Add release notes for scheduler bug fixes Signed-off-by: Mathias L. Baumann --- RELEASE_NOTES.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index d4546f4..315ce78 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -15,3 +15,8 @@ ## Bug Fixes + +* Fixed bugs that could cause a dispatch actor not to stop when an update + arrives exactly at the moment a dispatch window closes. Also fixed a + related issue that could cause incorrect event ordering when multiple + dispatches are updated concurrently.