From 73258cb16feeeef3630b2a200004c957d41505c5 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Mon, 9 Feb 2026 12:06:19 +0100 Subject: [PATCH 1/3] Support closing of `Sender`s Signed-off-by: Sahas Subramanian --- src/frequenz/channels/__init__.py | 3 ++- src/frequenz/channels/_anycast.py | 19 ++++++++++++++++- src/frequenz/channels/_broadcast.py | 18 +++++++++++++++- src/frequenz/channels/_sender.py | 21 +++++++++++++++++++ .../channels/experimental/_relay_sender.py | 9 ++++++-- 5 files changed, 65 insertions(+), 5 deletions(-) diff --git a/src/frequenz/channels/__init__.py b/src/frequenz/channels/__init__.py index 87c86a34..de836443 100644 --- a/src/frequenz/channels/__init__.py +++ b/src/frequenz/channels/__init__.py @@ -100,7 +100,7 @@ select, selected_from, ) -from ._sender import Sender, SenderError +from ._sender import Sender, SenderClosedError, SenderError __all__ = [ "Anycast", @@ -120,6 +120,7 @@ "SelectError", "Selected", "Sender", + "SenderClosedError", "SenderError", "SenderMessageT_co", "SenderMessageT_contra", diff --git a/src/frequenz/channels/_anycast.py b/src/frequenz/channels/_anycast.py index b5184a3f..5c3f8284 100644 --- a/src/frequenz/channels/_anycast.py +++ b/src/frequenz/channels/_anycast.py @@ -15,7 +15,7 @@ from ._exceptions import ChannelClosedError from ._generic import ChannelMessageT from ._receiver import Receiver, ReceiverStoppedError -from ._sender import Sender, SenderError +from ._sender import Sender, SenderClosedError, SenderError _logger = logging.getLogger(__name__) @@ -327,6 +327,9 @@ def __init__(self, channel: Anycast[_T], /) -> None: self._channel: Anycast[_T] = channel """The channel that this sender belongs to.""" + self._closed: bool = False + """Whether the sender is closed.""" + @override async def send(self, message: _T, /) -> None: """Send a message across the channel. @@ -343,7 +346,11 @@ async def send(self, message: _T, /) -> None: SenderError: If the underlying channel was closed. A [ChannelClosedError][frequenz.channels.ChannelClosedError] is set as the cause. + SenderClosedError: If this sender was closed. """ + if self._closed: + raise SenderClosedError(self) + # pylint: disable=protected-access if self._channel._closed: raise SenderError("The channel was closed", self) from ChannelClosedError( @@ -367,6 +374,16 @@ async def send(self, message: _T, /) -> None: self._channel._recv_cv.notify(1) # pylint: enable=protected-access + @override + async def aclose(self) -> None: + """Close this sender. + + After closing, the sender will not be able to send any more messages. Any + attempt to send a message through a closed sender will raise a + [SenderError][frequenz.channels.SenderError]. + """ + self._closed = True + def __str__(self) -> str: """Return a string representation of this sender.""" return f"{self._channel}:{type(self).__name__}" diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py index 2c167d5e..f55003a5 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/_broadcast.py @@ -16,7 +16,7 @@ from ._exceptions import ChannelClosedError from ._generic import ChannelMessageT from ._receiver import Receiver, ReceiverStoppedError -from ._sender import Sender, SenderError +from ._sender import Sender, SenderClosedError, SenderError _logger = logging.getLogger(__name__) @@ -334,6 +334,9 @@ def __init__(self, channel: Broadcast[_T], /) -> None: self._channel: Broadcast[_T] = channel """The broadcast channel this sender belongs to.""" + self._closed: bool = False + """Whether this sender is closed.""" + @override async def send(self, message: _T, /) -> None: """Send a message to all broadcast receivers. @@ -345,7 +348,10 @@ async def send(self, message: _T, /) -> None: SenderError: If the underlying channel was closed. A [ChannelClosedError][frequenz.channels.ChannelClosedError] is set as the cause. + SenderClosedError: If this sender was closed. """ + if self._closed: + raise SenderClosedError(self) # pylint: disable=protected-access if self._channel._closed: raise SenderError("The channel was closed", self) from ChannelClosedError( @@ -365,6 +371,16 @@ async def send(self, message: _T, /) -> None: self._channel._recv_cv.notify_all() # pylint: enable=protected-access + @override + async def aclose(self) -> None: + """Close this sender. + + After a sender is closed, it can no longer be used to send messages. Any + attempt to send a message through a closed sender will raise a + [SenderClosedError][frequenz.channels.SenderClosedError]. + """ + self._closed = True + def __str__(self) -> str: """Return a string representation of this sender.""" return f"{self._channel}:{type(self).__name__}" diff --git a/src/frequenz/channels/_sender.py b/src/frequenz/channels/_sender.py index e225e94c..908d00f8 100644 --- a/src/frequenz/channels/_sender.py +++ b/src/frequenz/channels/_sender.py @@ -70,6 +70,15 @@ async def send(self, message: SenderMessageT_contra, /) -> None: SenderError: If there was an error sending the message. """ + @abstractmethod + async def aclose(self) -> None: + """Close this sender. + + After a sender is closed, it can no longer be used to send messages. Any + attempt to send a message through a closed sender will raise a + [SenderClosedError][frequenz.channels.SenderClosedError]. + """ + class SenderError(Error, Generic[SenderMessageT_co]): """An error that originated in a [Sender][frequenz.channels.Sender]. @@ -88,3 +97,15 @@ def __init__(self, message: str, sender: Sender[SenderMessageT_co]): super().__init__(message) self.sender: Sender[SenderMessageT_co] = sender """The sender where the error happened.""" + + +class SenderClosedError(SenderError[SenderMessageT_co]): + """An error indicating that a send operation was attempted on a closed sender.""" + + def __init__(self, sender: Sender[SenderMessageT_co]): + """Initialize this error. + + Args: + sender: The [Sender][frequenz.channels.Sender] that was closed. + """ + super().__init__("Sender is closed", sender) diff --git a/src/frequenz/channels/experimental/_relay_sender.py b/src/frequenz/channels/experimental/_relay_sender.py index 398ba8d5..0a5d8063 100644 --- a/src/frequenz/channels/experimental/_relay_sender.py +++ b/src/frequenz/channels/experimental/_relay_sender.py @@ -7,7 +7,7 @@ to the senders it was created with. """ -import typing +import asyncio from typing_extensions import override @@ -15,7 +15,7 @@ from .._sender import Sender -class RelaySender(typing.Generic[SenderMessageT_contra], Sender[SenderMessageT_contra]): +class RelaySender(Sender[SenderMessageT_contra]): """A Sender for sending messages to multiple senders. The `RelaySender` class takes multiple senders and forwards all the messages sent to @@ -57,3 +57,8 @@ async def send(self, message: SenderMessageT_contra, /) -> None: """ for sender in self._senders: await sender.send(message) + + @override + async def aclose(self) -> None: + """Close this sender.""" + await asyncio.gather(*(sender.aclose() for sender in self._senders)) From 68b3348c945e80ea3e0b6c10d037b8b1e86db121 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Mon, 9 Feb 2026 18:01:21 +0100 Subject: [PATCH 2/3] Add one-shot channel implementation Signed-off-by: Sahas Subramanian --- src/frequenz/channels/__init__.py | 4 + src/frequenz/channels/_oneshot.py | 142 ++++++++++++++++++++++++++++++ tests/test_oneshot.py | 87 ++++++++++++++++++ 3 files changed, 233 insertions(+) create mode 100644 src/frequenz/channels/_oneshot.py create mode 100644 tests/test_oneshot.py diff --git a/src/frequenz/channels/__init__.py b/src/frequenz/channels/__init__.py index de836443..a6a092a9 100644 --- a/src/frequenz/channels/__init__.py +++ b/src/frequenz/channels/__init__.py @@ -92,6 +92,7 @@ ) from ._latest_value_cache import LatestValueCache from ._merge import Merger, merge +from ._oneshot import OneshotChannel, OneshotReceiver, OneshotSender from ._receiver import Receiver, ReceiverError, ReceiverStoppedError from ._select import ( Selected, @@ -113,6 +114,9 @@ "LatestValueCache", "MappedMessageT_co", "Merger", + "OneshotChannel", + "OneshotReceiver", + "OneshotSender", "Receiver", "ReceiverError", "ReceiverMessageT_co", diff --git a/src/frequenz/channels/_oneshot.py b/src/frequenz/channels/_oneshot.py new file mode 100644 index 00000000..15c8e3a6 --- /dev/null +++ b/src/frequenz/channels/_oneshot.py @@ -0,0 +1,142 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""A channel that can send a single message.""" + +from __future__ import annotations + +import asyncio +import typing + +from ._generic import ChannelMessageT +from ._receiver import Receiver, ReceiverStoppedError +from ._sender import Sender, SenderClosedError + + +class _Empty: + """A sentinel indicating that no message has been sent.""" + + +_EMPTY = _Empty() + + +class _Oneshot(typing.Generic[ChannelMessageT]): + """Internal representation of a one-shot channel. + + A one-shot channel is a channel that can only send one message. After the first + message is sent, the sender is closed and any further attempts to send a message + will raise a `SenderClosedError`. + """ + + def __init__(self) -> None: + """Create a new one-shot channel.""" + self.message: ChannelMessageT | _Empty = _EMPTY + self.closed: bool = False + self.drained: bool = False + self.event: asyncio.Event = asyncio.Event() + + +class OneshotSender(Sender[ChannelMessageT]): + """A sender for a one-shot channel.""" + + def __init__(self, channel: _Oneshot[ChannelMessageT]) -> None: + """Initialize this sender.""" + self._channel = channel + + async def send(self, message: ChannelMessageT, /) -> None: + """Send a message through this sender.""" + if self._channel.closed: + raise SenderClosedError(self) + self._channel.message = message + self._channel.closed = True + self._channel.event.set() + + async def aclose(self) -> None: + """Close this sender.""" + self._channel.closed = True + if isinstance(self._channel.message, _Empty): + self._channel.drained = True + self._channel.event.set() + + +class OneshotReceiver(Receiver[ChannelMessageT]): + """A receiver for a one-shot channel.""" + + def __init__(self, channel: _Oneshot[ChannelMessageT]) -> None: + """Initialize this receiver.""" + self._channel = channel + + async def ready(self) -> bool: + """Check if a message is ready to be received. + + Returns: + `True` if a message is ready to be received, `False` if the sender + is closed and no message will be sent. + """ + if self._channel.drained: + return False + while not self._channel.closed: + await self._channel.event.wait() + if isinstance(self._channel.message, _Empty): + return False + return True + + def consume(self) -> ChannelMessageT: + """Consume a message from this receiver. + + Returns: + The message that was sent through this channel. + + Raises: + ReceiverStoppedError: If the sender was closed without sending a message. + """ + if self._channel.drained: + raise ReceiverStoppedError(self) + + assert not isinstance( + self._channel.message, _Empty + ), "`consume()` must be preceded by a call to `ready()`." + + self._channel.drained = True + self._channel.event.clear() + return self._channel.message + + +class OneshotChannel( + tuple[OneshotSender[ChannelMessageT], OneshotReceiver[ChannelMessageT]] +): + """A channel that can send a single message. + + A one-shot channel is a channel that can only send one message. After the first + message is sent, the sender is closed and any further attempts to send a message + will raise a `SenderClosedError`. + + # Example + + This example demonstrates how to use a one-shot channel to send a message + from one task to another. + + ```python + import asyncio + + from frequenz.channels import OneshotChannel, OneshotSender + + async def send(sender: OneshotSender[int]) -> None: + await sender.send(42) + + async def main() -> None: + sender, receiver = OneshotChannel[int]() + + async with asyncio.TaskGroup() as tg: + tg.create_task(send(sender)) + assert await receiver.receive() == 42 + + asyncio.run(main()) + ``` + """ + + def __new__(cls) -> OneshotChannel[ChannelMessageT]: + """Create a new one-shot channel.""" + channel = _Oneshot[ChannelMessageT]() + + return tuple.__new__(cls, (OneshotSender(channel), OneshotReceiver(channel))) diff --git a/tests/test_oneshot.py b/tests/test_oneshot.py new file mode 100644 index 00000000..728bdcde --- /dev/null +++ b/tests/test_oneshot.py @@ -0,0 +1,87 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Tests for the oneshot channel.""" + +import asyncio + +import pytest + +from frequenz.channels import ( + OneshotChannel, + ReceiverStoppedError, + SenderClosedError, +) + + +async def test_oneshot_recv_after_send() -> None: + """Test the oneshot function. + + `receiver.receive()` is called after `sender.send()`. + """ + sender, receiver = OneshotChannel[int]() + + await sender.send(42) + assert await receiver.receive() == 42 + + with pytest.raises(SenderClosedError): + await sender.send(43) + with pytest.raises(ReceiverStoppedError): + await receiver.receive() + + +async def test_oneshot_recv_before_send() -> None: + """Test the oneshot function. + + `receiver.receive()` is called before `sender.send()`. + """ + sender, receiver = OneshotChannel[int]() + + task = asyncio.create_task(receiver.receive()) + + # Give the receiver a chance to start waiting + await asyncio.sleep(0.0) + + await sender.send(42) + assert await task == 42 + + with pytest.raises(SenderClosedError): + await sender.send(43) + with pytest.raises(ReceiverStoppedError): + await receiver.receive() + + +async def test_oneshot_recv_after_sender_closed() -> None: + """Test that closing sender works without sending a message. + + `receiver.receive()` is called after `sender.aclose()`. + """ + sender, receiver = OneshotChannel[int]() + + await sender.aclose() + + with pytest.raises(ReceiverStoppedError): + await receiver.receive() + with pytest.raises(SenderClosedError): + await sender.send(4) + + +async def test_oneshot_recv_before_sender_closed() -> None: + """Test that closing sender works without sending a message. + + `receiver.receive()` is called before `sender.aclose()`. + """ + sender, receiver = OneshotChannel[int]() + + task = asyncio.create_task(receiver.receive()) + + # Give the receiver a chance to start waiting + await asyncio.sleep(0.0) + + await sender.aclose() + + with pytest.raises(ReceiverStoppedError): + await task + + with pytest.raises(SenderClosedError): + await sender.send(4) From 9c7a4b28643f14e8a3051fa36a4ad73dbcd31d57 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Thu, 5 Mar 2026 14:35:30 +0100 Subject: [PATCH 3/3] Update release notes Signed-off-by: Sahas Subramanian --- RELEASE_NOTES.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 7e1258ff..afebaa77 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -11,7 +11,9 @@ ## New Features - +- There's a new `Oneshot` channel, which returns a sender and a receiver. A single message can be sent using the sender, after which it will be closed. And the receiver will close as soon as the message is received. + +- `Sender`s now have an `aclose`, which must be called, when they are no-longer needed. ## Bug Fixes