From 73258cb16feeeef3630b2a200004c957d41505c5 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Mon, 9 Feb 2026 12:06:19 +0100 Subject: [PATCH 1/9] Support closing of `Sender`s Signed-off-by: Sahas Subramanian --- src/frequenz/channels/__init__.py | 3 ++- src/frequenz/channels/_anycast.py | 19 ++++++++++++++++- src/frequenz/channels/_broadcast.py | 18 +++++++++++++++- src/frequenz/channels/_sender.py | 21 +++++++++++++++++++ .../channels/experimental/_relay_sender.py | 9 ++++++-- 5 files changed, 65 insertions(+), 5 deletions(-) diff --git a/src/frequenz/channels/__init__.py b/src/frequenz/channels/__init__.py index 87c86a34..de836443 100644 --- a/src/frequenz/channels/__init__.py +++ b/src/frequenz/channels/__init__.py @@ -100,7 +100,7 @@ select, selected_from, ) -from ._sender import Sender, SenderError +from ._sender import Sender, SenderClosedError, SenderError __all__ = [ "Anycast", @@ -120,6 +120,7 @@ "SelectError", "Selected", "Sender", + "SenderClosedError", "SenderError", "SenderMessageT_co", "SenderMessageT_contra", diff --git a/src/frequenz/channels/_anycast.py b/src/frequenz/channels/_anycast.py index b5184a3f..5c3f8284 100644 --- a/src/frequenz/channels/_anycast.py +++ b/src/frequenz/channels/_anycast.py @@ -15,7 +15,7 @@ from ._exceptions import ChannelClosedError from ._generic import ChannelMessageT from ._receiver import Receiver, ReceiverStoppedError -from ._sender import Sender, SenderError +from ._sender import Sender, SenderClosedError, SenderError _logger = logging.getLogger(__name__) @@ -327,6 +327,9 @@ def __init__(self, channel: Anycast[_T], /) -> None: self._channel: Anycast[_T] = channel """The channel that this sender belongs to.""" + self._closed: bool = False + """Whether the sender is closed.""" + @override async def send(self, message: _T, /) -> None: """Send a message across the channel. @@ -343,7 +346,11 @@ async def send(self, message: _T, /) -> None: SenderError: If the underlying channel was closed. A [ChannelClosedError][frequenz.channels.ChannelClosedError] is set as the cause. + SenderClosedError: If this sender was closed. """ + if self._closed: + raise SenderClosedError(self) + # pylint: disable=protected-access if self._channel._closed: raise SenderError("The channel was closed", self) from ChannelClosedError( @@ -367,6 +374,16 @@ async def send(self, message: _T, /) -> None: self._channel._recv_cv.notify(1) # pylint: enable=protected-access + @override + async def aclose(self) -> None: + """Close this sender. + + After closing, the sender will not be able to send any more messages. Any + attempt to send a message through a closed sender will raise a + [SenderError][frequenz.channels.SenderError]. + """ + self._closed = True + def __str__(self) -> str: """Return a string representation of this sender.""" return f"{self._channel}:{type(self).__name__}" diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py index 2c167d5e..f55003a5 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/_broadcast.py @@ -16,7 +16,7 @@ from ._exceptions import ChannelClosedError from ._generic import ChannelMessageT from ._receiver import Receiver, ReceiverStoppedError -from ._sender import Sender, SenderError +from ._sender import Sender, SenderClosedError, SenderError _logger = logging.getLogger(__name__) @@ -334,6 +334,9 @@ def __init__(self, channel: Broadcast[_T], /) -> None: self._channel: Broadcast[_T] = channel """The broadcast channel this sender belongs to.""" + self._closed: bool = False + """Whether this sender is closed.""" + @override async def send(self, message: _T, /) -> None: """Send a message to all broadcast receivers. @@ -345,7 +348,10 @@ async def send(self, message: _T, /) -> None: SenderError: If the underlying channel was closed. A [ChannelClosedError][frequenz.channels.ChannelClosedError] is set as the cause. + SenderClosedError: If this sender was closed. """ + if self._closed: + raise SenderClosedError(self) # pylint: disable=protected-access if self._channel._closed: raise SenderError("The channel was closed", self) from ChannelClosedError( @@ -365,6 +371,16 @@ async def send(self, message: _T, /) -> None: self._channel._recv_cv.notify_all() # pylint: enable=protected-access + @override + async def aclose(self) -> None: + """Close this sender. + + After a sender is closed, it can no longer be used to send messages. Any + attempt to send a message through a closed sender will raise a + [SenderClosedError][frequenz.channels.SenderClosedError]. + """ + self._closed = True + def __str__(self) -> str: """Return a string representation of this sender.""" return f"{self._channel}:{type(self).__name__}" diff --git a/src/frequenz/channels/_sender.py b/src/frequenz/channels/_sender.py index e225e94c..908d00f8 100644 --- a/src/frequenz/channels/_sender.py +++ b/src/frequenz/channels/_sender.py @@ -70,6 +70,15 @@ async def send(self, message: SenderMessageT_contra, /) -> None: SenderError: If there was an error sending the message. """ + @abstractmethod + async def aclose(self) -> None: + """Close this sender. + + After a sender is closed, it can no longer be used to send messages. Any + attempt to send a message through a closed sender will raise a + [SenderClosedError][frequenz.channels.SenderClosedError]. + """ + class SenderError(Error, Generic[SenderMessageT_co]): """An error that originated in a [Sender][frequenz.channels.Sender]. @@ -88,3 +97,15 @@ def __init__(self, message: str, sender: Sender[SenderMessageT_co]): super().__init__(message) self.sender: Sender[SenderMessageT_co] = sender """The sender where the error happened.""" + + +class SenderClosedError(SenderError[SenderMessageT_co]): + """An error indicating that a send operation was attempted on a closed sender.""" + + def __init__(self, sender: Sender[SenderMessageT_co]): + """Initialize this error. + + Args: + sender: The [Sender][frequenz.channels.Sender] that was closed. + """ + super().__init__("Sender is closed", sender) diff --git a/src/frequenz/channels/experimental/_relay_sender.py b/src/frequenz/channels/experimental/_relay_sender.py index 398ba8d5..0a5d8063 100644 --- a/src/frequenz/channels/experimental/_relay_sender.py +++ b/src/frequenz/channels/experimental/_relay_sender.py @@ -7,7 +7,7 @@ to the senders it was created with. """ -import typing +import asyncio from typing_extensions import override @@ -15,7 +15,7 @@ from .._sender import Sender -class RelaySender(typing.Generic[SenderMessageT_contra], Sender[SenderMessageT_contra]): +class RelaySender(Sender[SenderMessageT_contra]): """A Sender for sending messages to multiple senders. The `RelaySender` class takes multiple senders and forwards all the messages sent to @@ -57,3 +57,8 @@ async def send(self, message: SenderMessageT_contra, /) -> None: """ for sender in self._senders: await sender.send(message) + + @override + async def aclose(self) -> None: + """Close this sender.""" + await asyncio.gather(*(sender.aclose() for sender in self._senders)) From 68b3348c945e80ea3e0b6c10d037b8b1e86db121 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Mon, 9 Feb 2026 18:01:21 +0100 Subject: [PATCH 2/9] Add one-shot channel implementation Signed-off-by: Sahas Subramanian --- src/frequenz/channels/__init__.py | 4 + src/frequenz/channels/_oneshot.py | 142 ++++++++++++++++++++++++++++++ tests/test_oneshot.py | 87 ++++++++++++++++++ 3 files changed, 233 insertions(+) create mode 100644 src/frequenz/channels/_oneshot.py create mode 100644 tests/test_oneshot.py diff --git a/src/frequenz/channels/__init__.py b/src/frequenz/channels/__init__.py index de836443..a6a092a9 100644 --- a/src/frequenz/channels/__init__.py +++ b/src/frequenz/channels/__init__.py @@ -92,6 +92,7 @@ ) from ._latest_value_cache import LatestValueCache from ._merge import Merger, merge +from ._oneshot import OneshotChannel, OneshotReceiver, OneshotSender from ._receiver import Receiver, ReceiverError, ReceiverStoppedError from ._select import ( Selected, @@ -113,6 +114,9 @@ "LatestValueCache", "MappedMessageT_co", "Merger", + "OneshotChannel", + "OneshotReceiver", + "OneshotSender", "Receiver", "ReceiverError", "ReceiverMessageT_co", diff --git a/src/frequenz/channels/_oneshot.py b/src/frequenz/channels/_oneshot.py new file mode 100644 index 00000000..15c8e3a6 --- /dev/null +++ b/src/frequenz/channels/_oneshot.py @@ -0,0 +1,142 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""A channel that can send a single message.""" + +from __future__ import annotations + +import asyncio +import typing + +from ._generic import ChannelMessageT +from ._receiver import Receiver, ReceiverStoppedError +from ._sender import Sender, SenderClosedError + + +class _Empty: + """A sentinel indicating that no message has been sent.""" + + +_EMPTY = _Empty() + + +class _Oneshot(typing.Generic[ChannelMessageT]): + """Internal representation of a one-shot channel. + + A one-shot channel is a channel that can only send one message. After the first + message is sent, the sender is closed and any further attempts to send a message + will raise a `SenderClosedError`. + """ + + def __init__(self) -> None: + """Create a new one-shot channel.""" + self.message: ChannelMessageT | _Empty = _EMPTY + self.closed: bool = False + self.drained: bool = False + self.event: asyncio.Event = asyncio.Event() + + +class OneshotSender(Sender[ChannelMessageT]): + """A sender for a one-shot channel.""" + + def __init__(self, channel: _Oneshot[ChannelMessageT]) -> None: + """Initialize this sender.""" + self._channel = channel + + async def send(self, message: ChannelMessageT, /) -> None: + """Send a message through this sender.""" + if self._channel.closed: + raise SenderClosedError(self) + self._channel.message = message + self._channel.closed = True + self._channel.event.set() + + async def aclose(self) -> None: + """Close this sender.""" + self._channel.closed = True + if isinstance(self._channel.message, _Empty): + self._channel.drained = True + self._channel.event.set() + + +class OneshotReceiver(Receiver[ChannelMessageT]): + """A receiver for a one-shot channel.""" + + def __init__(self, channel: _Oneshot[ChannelMessageT]) -> None: + """Initialize this receiver.""" + self._channel = channel + + async def ready(self) -> bool: + """Check if a message is ready to be received. + + Returns: + `True` if a message is ready to be received, `False` if the sender + is closed and no message will be sent. + """ + if self._channel.drained: + return False + while not self._channel.closed: + await self._channel.event.wait() + if isinstance(self._channel.message, _Empty): + return False + return True + + def consume(self) -> ChannelMessageT: + """Consume a message from this receiver. + + Returns: + The message that was sent through this channel. + + Raises: + ReceiverStoppedError: If the sender was closed without sending a message. + """ + if self._channel.drained: + raise ReceiverStoppedError(self) + + assert not isinstance( + self._channel.message, _Empty + ), "`consume()` must be preceded by a call to `ready()`." + + self._channel.drained = True + self._channel.event.clear() + return self._channel.message + + +class OneshotChannel( + tuple[OneshotSender[ChannelMessageT], OneshotReceiver[ChannelMessageT]] +): + """A channel that can send a single message. + + A one-shot channel is a channel that can only send one message. After the first + message is sent, the sender is closed and any further attempts to send a message + will raise a `SenderClosedError`. + + # Example + + This example demonstrates how to use a one-shot channel to send a message + from one task to another. + + ```python + import asyncio + + from frequenz.channels import OneshotChannel, OneshotSender + + async def send(sender: OneshotSender[int]) -> None: + await sender.send(42) + + async def main() -> None: + sender, receiver = OneshotChannel[int]() + + async with asyncio.TaskGroup() as tg: + tg.create_task(send(sender)) + assert await receiver.receive() == 42 + + asyncio.run(main()) + ``` + """ + + def __new__(cls) -> OneshotChannel[ChannelMessageT]: + """Create a new one-shot channel.""" + channel = _Oneshot[ChannelMessageT]() + + return tuple.__new__(cls, (OneshotSender(channel), OneshotReceiver(channel))) diff --git a/tests/test_oneshot.py b/tests/test_oneshot.py new file mode 100644 index 00000000..728bdcde --- /dev/null +++ b/tests/test_oneshot.py @@ -0,0 +1,87 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Tests for the oneshot channel.""" + +import asyncio + +import pytest + +from frequenz.channels import ( + OneshotChannel, + ReceiverStoppedError, + SenderClosedError, +) + + +async def test_oneshot_recv_after_send() -> None: + """Test the oneshot function. + + `receiver.receive()` is called after `sender.send()`. + """ + sender, receiver = OneshotChannel[int]() + + await sender.send(42) + assert await receiver.receive() == 42 + + with pytest.raises(SenderClosedError): + await sender.send(43) + with pytest.raises(ReceiverStoppedError): + await receiver.receive() + + +async def test_oneshot_recv_before_send() -> None: + """Test the oneshot function. + + `receiver.receive()` is called before `sender.send()`. + """ + sender, receiver = OneshotChannel[int]() + + task = asyncio.create_task(receiver.receive()) + + # Give the receiver a chance to start waiting + await asyncio.sleep(0.0) + + await sender.send(42) + assert await task == 42 + + with pytest.raises(SenderClosedError): + await sender.send(43) + with pytest.raises(ReceiverStoppedError): + await receiver.receive() + + +async def test_oneshot_recv_after_sender_closed() -> None: + """Test that closing sender works without sending a message. + + `receiver.receive()` is called after `sender.aclose()`. + """ + sender, receiver = OneshotChannel[int]() + + await sender.aclose() + + with pytest.raises(ReceiverStoppedError): + await receiver.receive() + with pytest.raises(SenderClosedError): + await sender.send(4) + + +async def test_oneshot_recv_before_sender_closed() -> None: + """Test that closing sender works without sending a message. + + `receiver.receive()` is called before `sender.aclose()`. + """ + sender, receiver = OneshotChannel[int]() + + task = asyncio.create_task(receiver.receive()) + + # Give the receiver a chance to start waiting + await asyncio.sleep(0.0) + + await sender.aclose() + + with pytest.raises(ReceiverStoppedError): + await task + + with pytest.raises(SenderClosedError): + await sender.send(4) From 9c7a4b28643f14e8a3051fa36a4ad73dbcd31d57 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Thu, 5 Mar 2026 14:35:30 +0100 Subject: [PATCH 3/9] Update release notes Signed-off-by: Sahas Subramanian --- RELEASE_NOTES.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 7e1258ff..afebaa77 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -11,7 +11,9 @@ ## New Features - +- There's a new `Oneshot` channel, which returns a sender and a receiver. A single message can be sent using the sender, after which it will be closed. And the receiver will close as soon as the message is received. + +- `Sender`s now have an `aclose`, which must be called, when they are no-longer needed. ## Bug Fixes From 8570c96dabc4d0907b79a5651db4c24bb43c5a27 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Mon, 9 Feb 2026 11:40:40 +0100 Subject: [PATCH 4/9] Make broadcast sender clonable and subscribable Signed-off-by: Sahas Subramanian --- src/frequenz/channels/_broadcast.py | 23 ++++++++++++++++--- src/frequenz/channels/_sender.py | 35 +++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 3 deletions(-) diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py index f55003a5..ff06880c 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/_broadcast.py @@ -16,7 +16,7 @@ from ._exceptions import ChannelClosedError from ._generic import ChannelMessageT from ._receiver import Receiver, ReceiverStoppedError -from ._sender import Sender, SenderClosedError, SenderError +from ._sender import ClonableSubscribableSender, SenderClosedError, SenderError _logger = logging.getLogger(__name__) @@ -269,7 +269,7 @@ async def close(self) -> None: # noqa: D402 """Close the channel, deprecated alias for `aclose()`.""" # noqa: D402 return await self.aclose() - def new_sender(self) -> Sender[ChannelMessageT]: + def new_sender(self) -> ClonableSubscribableSender[ChannelMessageT]: """Return a new sender attached to this channel.""" return _Sender(self) @@ -317,7 +317,7 @@ def __repr__(self) -> str: _T = TypeVar("_T") -class _Sender(Sender[_T]): +class _Sender(ClonableSubscribableSender[_T]): """A sender to send messages to the broadcast channel. Should not be created directly, but through the @@ -381,6 +381,23 @@ async def aclose(self) -> None: """ self._closed = True + @override + def clone(self) -> _Sender[_T]: + """Return a clone of this sender.""" + return _Sender(self._channel) + + @override + def subscribe( + self, + name: str | None = None, + limit: int = 50, + warn_on_overflow: bool = True, + ) -> Receiver[_T]: + """Return a new receiver attached to this sender's channel.""" + return self._channel.new_receiver( + name=name, limit=limit, warn_on_overflow=warn_on_overflow + ) + def __str__(self) -> str: """Return a string representation of this sender.""" return f"{self._channel}:{type(self).__name__}" diff --git a/src/frequenz/channels/_sender.py b/src/frequenz/channels/_sender.py index 908d00f8..4c1a6d07 100644 --- a/src/frequenz/channels/_sender.py +++ b/src/frequenz/channels/_sender.py @@ -49,11 +49,14 @@ ``` """ +from __future__ import annotations + from abc import ABC, abstractmethod from typing import Generic from ._exceptions import Error from ._generic import SenderMessageT_co, SenderMessageT_contra +from ._receiver import Receiver class Sender(ABC, Generic[SenderMessageT_contra]): @@ -109,3 +112,35 @@ def __init__(self, sender: Sender[SenderMessageT_co]): sender: The [Sender][frequenz.channels.Sender] that was closed. """ super().__init__("Sender is closed", sender) + + +class SubscribableSender(Sender[SenderMessageT_contra], ABC): + """A [Sender][frequenz.channels.Sender] that can be subscribed to.""" + + @abstractmethod + def subscribe(self) -> Receiver[SenderMessageT_contra]: + """Subscribe to this sender. + + Returns: + A new sender that sends messages to the same channel as this sender. + """ + + +class ClonableSender(Sender[SenderMessageT_contra], ABC): + """A [Sender][frequenz.channels.Sender] that can be cloned.""" + + @abstractmethod + def clone(self) -> ClonableSender[SenderMessageT_contra]: + """Clone this sender. + + Returns: + A new sender that sends messages to the same channel as this sender. + """ + + +class ClonableSubscribableSender( + SubscribableSender[SenderMessageT_contra], + ClonableSender[SenderMessageT_contra], + ABC, +): + """A [Sender][frequenz.channels.Sender] that can be both cloned and subscribed to.""" From b6fd947ba69fc2a65bbefcdc72629c92d49deeba Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Mon, 9 Feb 2026 12:19:28 +0100 Subject: [PATCH 5/9] Track sender count on sender create and close Signed-off-by: Sahas Subramanian --- src/frequenz/channels/_broadcast.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py index ff06880c..35fa8341 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/_broadcast.py @@ -207,6 +207,9 @@ def __init__(self, *, name: str, resend_latest: bool = False) -> None: self._recv_cv: Condition = Condition() """The condition to wait for data in the channel's buffer.""" + self._sender_count: int = 0 + """The number of senders attached to this channel.""" + self._receivers: dict[ int, weakref.ReferenceType[_Receiver[ChannelMessageT]] ] = {} @@ -337,6 +340,13 @@ def __init__(self, channel: Broadcast[_T], /) -> None: self._closed: bool = False """Whether this sender is closed.""" + self._channel._sender_count += 1 + + @property + def sender_count(self) -> int: + """Return the number of open senders attached to this sender's channel.""" + return self._channel._sender_count # pylint: disable=protected-access + @override async def send(self, message: _T, /) -> None: """Send a message to all broadcast receivers. @@ -379,7 +389,15 @@ async def aclose(self) -> None: attempt to send a message through a closed sender will raise a [SenderClosedError][frequenz.channels.SenderClosedError]. """ + if self._closed: + return self._closed = True + self._channel._sender_count -= 1 + + def __del__(self) -> None: + """Clean up this sender.""" + if not self._closed: + self._channel._sender_count -= 1 @override def clone(self) -> _Sender[_T]: From 6786edf85d55453fd25462becebe3006aa229a95 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Mon, 9 Mar 2026 13:54:03 +0100 Subject: [PATCH 6/9] Expose strongly-typed Senders and Receivers from Broadcast Signed-off-by: Sahas Subramanian --- src/frequenz/channels/_broadcast.py | 20 ++++++++++---------- tests/test_broadcast.py | 6 +++--- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py index 35fa8341..3fc3fa90 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/_broadcast.py @@ -211,7 +211,7 @@ def __init__(self, *, name: str, resend_latest: bool = False) -> None: """The number of senders attached to this channel.""" self._receivers: dict[ - int, weakref.ReferenceType[_Receiver[ChannelMessageT]] + int, weakref.ReferenceType[BroadcastReceiver[ChannelMessageT]] ] = {} """The receivers attached to the channel, indexed by their hash().""" @@ -272,13 +272,13 @@ async def close(self) -> None: # noqa: D402 """Close the channel, deprecated alias for `aclose()`.""" # noqa: D402 return await self.aclose() - def new_sender(self) -> ClonableSubscribableSender[ChannelMessageT]: + def new_sender(self) -> BroadcastSender[ChannelMessageT]: """Return a new sender attached to this channel.""" - return _Sender(self) + return BroadcastSender(self) def new_receiver( self, *, name: str | None = None, limit: int = 50, warn_on_overflow: bool = True - ) -> Receiver[ChannelMessageT]: + ) -> BroadcastReceiver[ChannelMessageT]: """Return a new receiver attached to this channel. Broadcast receivers have their own buffer, and when messages are not @@ -294,7 +294,7 @@ def new_receiver( Returns: A new receiver attached to this channel. """ - recv: _Receiver[ChannelMessageT] = _Receiver( + recv: BroadcastReceiver[ChannelMessageT] = BroadcastReceiver( self, name=name, limit=limit, warn_on_overflow=warn_on_overflow ) self._receivers[hash(recv)] = weakref.ref(recv) @@ -320,7 +320,7 @@ def __repr__(self) -> str: _T = TypeVar("_T") -class _Sender(ClonableSubscribableSender[_T]): +class BroadcastSender(ClonableSubscribableSender[_T]): """A sender to send messages to the broadcast channel. Should not be created directly, but through the @@ -400,9 +400,9 @@ def __del__(self) -> None: self._channel._sender_count -= 1 @override - def clone(self) -> _Sender[_T]: + def clone(self) -> BroadcastSender[_T]: """Return a clone of this sender.""" - return _Sender(self._channel) + return BroadcastSender(self._channel) @override def subscribe( @@ -410,7 +410,7 @@ def subscribe( name: str | None = None, limit: int = 50, warn_on_overflow: bool = True, - ) -> Receiver[_T]: + ) -> BroadcastReceiver[_T]: """Return a new receiver attached to this sender's channel.""" return self._channel.new_receiver( name=name, limit=limit, warn_on_overflow=warn_on_overflow @@ -425,7 +425,7 @@ def __repr__(self) -> str: return f"{type(self).__name__}({self._channel!r})" -class _Receiver(Receiver[_T]): +class BroadcastReceiver(Receiver[_T]): """A receiver to receive messages from the broadcast channel. Should not be created directly, but through the diff --git a/tests/test_broadcast.py b/tests/test_broadcast.py index 0cc89f33..e2b8b199 100644 --- a/tests/test_broadcast.py +++ b/tests/test_broadcast.py @@ -107,7 +107,7 @@ async def test_broadcast_after_close() -> None: async def test_broadcast_overflow() -> None: """Ensure messages sent to full broadcast receivers get dropped.""" from frequenz.channels._broadcast import ( # pylint: disable=import-outside-toplevel - _Receiver, + BroadcastReceiver, ) bcast: Broadcast[int] = Broadcast(name="meter_5") @@ -117,9 +117,9 @@ async def test_broadcast_overflow() -> None: sender = bcast.new_sender() big_receiver = bcast.new_receiver(name="named-recv", limit=big_recv_size) - assert isinstance(big_receiver, _Receiver) + assert isinstance(big_receiver, BroadcastReceiver) small_receiver = bcast.new_receiver(limit=small_recv_size) - assert isinstance(small_receiver, _Receiver) + assert isinstance(small_receiver, BroadcastReceiver) async def drain_receivers() -> tuple[int, int]: big_sum = 0 From 1ece451657b4228e4090834c391912561935f071 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Mon, 9 Feb 2026 16:39:58 +0100 Subject: [PATCH 7/9] Implement auto-close support for Broadcast channels Signed-off-by: Sahas Subramanian --- src/frequenz/channels/_broadcast.py | 32 +++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py index 3fc3fa90..1c88a8d6 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/_broadcast.py @@ -21,7 +21,9 @@ _logger = logging.getLogger(__name__) -class Broadcast(Generic[ChannelMessageT]): +class Broadcast( # pylint: disable=too-many-instance-attributes + Generic[ChannelMessageT] +): """A channel that deliver all messages to all receivers. # Description @@ -184,7 +186,13 @@ async def main() -> None: ``` """ - def __init__(self, *, name: str, resend_latest: bool = False) -> None: + def __init__( + self, + *, + name: str, + resend_latest: bool = False, + auto_close: bool = False, + ) -> None: """Initialize this channel. Args: @@ -197,6 +205,8 @@ def __init__(self, *, name: str, resend_latest: bool = False) -> None: wait for the next message on the channel to arrive. It is safe to be set in data/reporting channels, but is not recommended for use in channels that stream control instructions. + auto_close: If True, the channel will be closed when all senders or all + receivers are closed. """ self._name: str = name """The name of the broadcast channel. @@ -221,6 +231,9 @@ def __init__(self, *, name: str, resend_latest: bool = False) -> None: self._latest: ChannelMessageT | None = None """The latest message sent to the channel.""" + self._auto_close_enabled: bool = auto_close + """Whether to close the channel when all senders or all receivers are closed.""" + self.resend_latest: bool = resend_latest """Whether to resend the latest message to new receivers. @@ -367,6 +380,10 @@ async def send(self, message: _T, /) -> None: raise SenderError("The channel was closed", self) from ChannelClosedError( self._channel ) + if self._channel._auto_close_enabled and len(self._channel._receivers) == 0: + raise SenderError("The channel was closed", self) from ChannelClosedError( + self._channel + ) self._channel._latest = message stale_refs = [] for _hash, recv_ref in self._channel._receivers.items(): @@ -394,6 +411,12 @@ async def aclose(self) -> None: self._closed = True self._channel._sender_count -= 1 + if ( + self._channel._sender_count == 0 # pylint: disable=protected-access + and self._channel._auto_close_enabled # pylint: disable=protected-access + ): + await self._channel.aclose() + def __del__(self) -> None: """Clean up this sender.""" if not self._closed: @@ -527,6 +550,11 @@ async def ready(self) -> bool: while len(self._q) == 0: if self._channel._closed or self._closed: return False + if self._channel._auto_close_enabled and ( + self._channel._sender_count == 0 or len(self._channel._receivers) == 0 + ): + await self._channel.aclose() + return False async with self._channel._recv_cv: await self._channel._recv_cv.wait() return True From f65161fd2c1f9f50fec59eda3d601cc6d24d32f3 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Mon, 9 Feb 2026 16:45:38 +0100 Subject: [PATCH 8/9] Add `make_broadcast` function and deprecate `Broadcast` class The `make_broadcast` function would only return a sender and a receiver from an auto-closing channel. Signed-off-by: Sahas Subramanian --- src/frequenz/channels/__init__.py | 15 +++++++++++++-- src/frequenz/channels/_broadcast.py | 30 +++++++++++++++++++++++++++++ 2 files changed, 43 insertions(+), 2 deletions(-) diff --git a/src/frequenz/channels/__init__.py b/src/frequenz/channels/__init__.py index a6a092a9..eaf125d1 100644 --- a/src/frequenz/channels/__init__.py +++ b/src/frequenz/channels/__init__.py @@ -80,7 +80,7 @@ """ from ._anycast import Anycast -from ._broadcast import Broadcast +from ._broadcast import Broadcast, BroadcastChannel from ._exceptions import ChannelClosedError, ChannelError, Error from ._generic import ( ChannelMessageT, @@ -101,14 +101,24 @@ select, selected_from, ) -from ._sender import Sender, SenderClosedError, SenderError +from ._sender import ( + ClonableSender, + ClonableSubscribableSender, + Sender, + SenderClosedError, + SenderError, + SubscribableSender, +) __all__ = [ "Anycast", "Broadcast", + "BroadcastChannel", "ChannelClosedError", "ChannelError", "ChannelMessageT", + "ClonableSender", + "ClonableSubscribableSender", "Error", "ErroredChannelT_co", "LatestValueCache", @@ -128,6 +138,7 @@ "SenderError", "SenderMessageT_co", "SenderMessageT_contra", + "SubscribableSender", "UnhandledSelectedError", "merge", "select", diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py index 1c88a8d6..73f908d0 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/_broadcast.py @@ -21,6 +21,7 @@ _logger = logging.getLogger(__name__) +@deprecated("Please use BroadcastChannel channel instead.") class Broadcast( # pylint: disable=too-many-instance-attributes Generic[ChannelMessageT] ): @@ -604,3 +605,32 @@ def __repr__(self) -> str: f"{type(self).__name__}(name={self._name!r}, limit={limit!r}, " f"{self._channel!r}):" ) + + +class BroadcastChannel( + tuple[BroadcastSender[ChannelMessageT], BroadcastReceiver[ChannelMessageT]] +): + """A broadcast channel, deprecated alias for Broadcast.""" + + def __new__( + cls, + name: str, + resend_latest: bool = False, + limit: int = 50, + warn_on_overflow: bool = True, + ) -> BroadcastChannel[ChannelMessageT]: + """Create a new broadcast channel, deprecated alias for Broadcast.""" + channel = Broadcast[ChannelMessageT]( + name=name, resend_latest=resend_latest, auto_close=True + ) + return tuple.__new__( + cls, + ( + channel.new_sender(), + channel.new_receiver( + name=f"{name}_receiver", + limit=limit, + warn_on_overflow=warn_on_overflow, + ), + ), + ) From 49b5b42315314f4ef78910f16b7d444349efa578 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Mon, 9 Feb 2026 16:48:42 +0100 Subject: [PATCH 9/9] Test the auto-close feature of broadcast channels Signed-off-by: Sahas Subramanian --- tests/test_broadcast.py | 48 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/tests/test_broadcast.py b/tests/test_broadcast.py index e2b8b199..1f391844 100644 --- a/tests/test_broadcast.py +++ b/tests/test_broadcast.py @@ -12,6 +12,7 @@ from frequenz.channels import ( Broadcast, + BroadcastChannel, ChannelClosedError, Receiver, ReceiverStoppedError, @@ -425,3 +426,50 @@ async def test_broadcast_close_receiver() -> None: with pytest.raises(ReceiverStoppedError): _ = await receiver_2.receive() + + +async def test_broadcast_auto_close_1() -> None: + """Ensure broadcast auto close works when all receivers are closed.""" + sender, receiver = BroadcastChannel[int](name="auto-close-test") + + receiver_2 = sender.subscribe() + + await sender.send(1) + + assert (await receiver.receive()) == 1 + assert (await receiver_2.receive()) == 1 + + receiver.close() + + await sender.send(2) + + assert (await receiver_2.receive()) == 2 + + receiver_2.close() + + with pytest.raises(SenderError) as excinfo: + await sender.send(3) + assert isinstance(excinfo.value.__cause__, ChannelClosedError) + + +async def test_broadcast_auto_close_2() -> None: + """Ensure broadcast auto close works when all senders are closed.""" + sender, receiver = BroadcastChannel[int](name="auto-close-test") + + await sender.send(1) + + assert (await receiver.receive()) == 1 + + sender_2 = sender.clone() + + await sender.aclose() + + await sender_2.send(2) + + await sender_2.aclose() + + assert (await receiver.receive()) == 2 + + with pytest.raises(ReceiverStoppedError) as excinfo: + await receiver.receive() + assert isinstance(excinfo.value.__cause__, ChannelClosedError)