Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
- There's a new `Oneshot` channel, which returns a sender and a receiver. A single message can be sent using the sender, after which it will be closed. And the receiver will close as soon as the message is received.

- `Sender`s now have an `aclose`, which must be called, when they are no-longer needed.

## Bug Fixes

Expand Down
7 changes: 6 additions & 1 deletion src/frequenz/channels/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
)
from ._latest_value_cache import LatestValueCache
from ._merge import Merger, merge
from ._oneshot import OneshotChannel, OneshotReceiver, OneshotSender
from ._receiver import Receiver, ReceiverError, ReceiverStoppedError
from ._select import (
Selected,
Expand All @@ -100,7 +101,7 @@
select,
selected_from,
)
from ._sender import Sender, SenderError
from ._sender import Sender, SenderClosedError, SenderError

__all__ = [
"Anycast",
Expand All @@ -113,13 +114,17 @@
"LatestValueCache",
"MappedMessageT_co",
"Merger",
"OneshotChannel",
"OneshotReceiver",
"OneshotSender",
"Receiver",
"ReceiverError",
"ReceiverMessageT_co",
"ReceiverStoppedError",
"SelectError",
"Selected",
"Sender",
"SenderClosedError",
"SenderError",
"SenderMessageT_co",
"SenderMessageT_contra",
Expand Down
19 changes: 18 additions & 1 deletion src/frequenz/channels/_anycast.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from ._exceptions import ChannelClosedError
from ._generic import ChannelMessageT
from ._receiver import Receiver, ReceiverStoppedError
from ._sender import Sender, SenderError
from ._sender import Sender, SenderClosedError, SenderError

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -327,6 +327,9 @@ def __init__(self, channel: Anycast[_T], /) -> None:
self._channel: Anycast[_T] = channel
"""The channel that this sender belongs to."""

self._closed: bool = False
"""Whether the sender is closed."""

@override
async def send(self, message: _T, /) -> None:
"""Send a message across the channel.
Expand All @@ -343,7 +346,11 @@ async def send(self, message: _T, /) -> None:
SenderError: If the underlying channel was closed.
A [ChannelClosedError][frequenz.channels.ChannelClosedError] is
set as the cause.
SenderClosedError: If this sender was closed.
"""
if self._closed:
raise SenderClosedError(self)

# pylint: disable=protected-access
if self._channel._closed:
raise SenderError("The channel was closed", self) from ChannelClosedError(
Expand All @@ -367,6 +374,16 @@ async def send(self, message: _T, /) -> None:
self._channel._recv_cv.notify(1)
# pylint: enable=protected-access

@override
async def aclose(self) -> None:
"""Close this sender.

After closing, the sender will not be able to send any more messages. Any
attempt to send a message through a closed sender will raise a
[SenderError][frequenz.channels.SenderError].
"""
self._closed = True

def __str__(self) -> str:
"""Return a string representation of this sender."""
return f"{self._channel}:{type(self).__name__}"
Expand Down
18 changes: 17 additions & 1 deletion src/frequenz/channels/_broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from ._exceptions import ChannelClosedError
from ._generic import ChannelMessageT
from ._receiver import Receiver, ReceiverStoppedError
from ._sender import Sender, SenderError
from ._sender import Sender, SenderClosedError, SenderError

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -334,6 +334,9 @@ def __init__(self, channel: Broadcast[_T], /) -> None:
self._channel: Broadcast[_T] = channel
"""The broadcast channel this sender belongs to."""

self._closed: bool = False
"""Whether this sender is closed."""

@override
async def send(self, message: _T, /) -> None:
"""Send a message to all broadcast receivers.
Expand All @@ -345,7 +348,10 @@ async def send(self, message: _T, /) -> None:
SenderError: If the underlying channel was closed.
A [ChannelClosedError][frequenz.channels.ChannelClosedError] is
set as the cause.
SenderClosedError: If this sender was closed.
"""
if self._closed:
raise SenderClosedError(self)
# pylint: disable=protected-access
if self._channel._closed:
raise SenderError("The channel was closed", self) from ChannelClosedError(
Expand All @@ -365,6 +371,16 @@ async def send(self, message: _T, /) -> None:
self._channel._recv_cv.notify_all()
# pylint: enable=protected-access

@override
async def aclose(self) -> None:
"""Close this sender.

After a sender is closed, it can no longer be used to send messages. Any
attempt to send a message through a closed sender will raise a
[SenderClosedError][frequenz.channels.SenderClosedError].
"""
self._closed = True

def __str__(self) -> str:
"""Return a string representation of this sender."""
return f"{self._channel}:{type(self).__name__}"
Expand Down
142 changes: 142 additions & 0 deletions src/frequenz/channels/_oneshot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
# License: MIT
# Copyright © 2026 Frequenz Energy-as-a-Service GmbH

"""A channel that can send a single message."""

from __future__ import annotations

import asyncio
import typing

from ._generic import ChannelMessageT
from ._receiver import Receiver, ReceiverStoppedError
from ._sender import Sender, SenderClosedError


class _Empty:
"""A sentinel indicating that no message has been sent."""


_EMPTY = _Empty()


class _Oneshot(typing.Generic[ChannelMessageT]):
"""Internal representation of a one-shot channel.

A one-shot channel is a channel that can only send one message. After the first
message is sent, the sender is closed and any further attempts to send a message
will raise a `SenderClosedError`.
"""

def __init__(self) -> None:
"""Create a new one-shot channel."""
self.message: ChannelMessageT | _Empty = _EMPTY
self.closed: bool = False
self.drained: bool = False
self.event: asyncio.Event = asyncio.Event()


class OneshotSender(Sender[ChannelMessageT]):
"""A sender for a one-shot channel."""

def __init__(self, channel: _Oneshot[ChannelMessageT]) -> None:
"""Initialize this sender."""
self._channel = channel

async def send(self, message: ChannelMessageT, /) -> None:
"""Send a message through this sender."""
if self._channel.closed:
raise SenderClosedError(self)
self._channel.message = message
self._channel.closed = True
self._channel.event.set()

async def aclose(self) -> None:
"""Close this sender."""
self._channel.closed = True
if isinstance(self._channel.message, _Empty):
self._channel.drained = True
self._channel.event.set()


class OneshotReceiver(Receiver[ChannelMessageT]):
"""A receiver for a one-shot channel."""

def __init__(self, channel: _Oneshot[ChannelMessageT]) -> None:
"""Initialize this receiver."""
self._channel = channel

async def ready(self) -> bool:
"""Check if a message is ready to be received.

Returns:
`True` if a message is ready to be received, `False` if the sender
is closed and no message will be sent.
"""
if self._channel.drained:
return False
while not self._channel.closed:
await self._channel.event.wait()
if isinstance(self._channel.message, _Empty):
return False
return True

def consume(self) -> ChannelMessageT:
"""Consume a message from this receiver.

Returns:
The message that was sent through this channel.

Raises:
ReceiverStoppedError: If the sender was closed without sending a message.
"""
if self._channel.drained:
raise ReceiverStoppedError(self)

assert not isinstance(
self._channel.message, _Empty
), "`consume()` must be preceded by a call to `ready()`."

self._channel.drained = True
self._channel.event.clear()
return self._channel.message


class OneshotChannel(
tuple[OneshotSender[ChannelMessageT], OneshotReceiver[ChannelMessageT]]
):
"""A channel that can send a single message.

A one-shot channel is a channel that can only send one message. After the first
message is sent, the sender is closed and any further attempts to send a message
will raise a `SenderClosedError`.

# Example

This example demonstrates how to use a one-shot channel to send a message
from one task to another.

```python
import asyncio

from frequenz.channels import OneshotChannel, OneshotSender

async def send(sender: OneshotSender[int]) -> None:
await sender.send(42)

async def main() -> None:
sender, receiver = OneshotChannel[int]()

async with asyncio.TaskGroup() as tg:
tg.create_task(send(sender))
assert await receiver.receive() == 42

asyncio.run(main())
```
"""

def __new__(cls) -> OneshotChannel[ChannelMessageT]:
"""Create a new one-shot channel."""
channel = _Oneshot[ChannelMessageT]()

return tuple.__new__(cls, (OneshotSender(channel), OneshotReceiver(channel)))
21 changes: 21 additions & 0 deletions src/frequenz/channels/_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,15 @@ async def send(self, message: SenderMessageT_contra, /) -> None:
SenderError: If there was an error sending the message.
"""

@abstractmethod
async def aclose(self) -> None:
"""Close this sender.

After a sender is closed, it can no longer be used to send messages. Any
attempt to send a message through a closed sender will raise a
[SenderClosedError][frequenz.channels.SenderClosedError].
"""


class SenderError(Error, Generic[SenderMessageT_co]):
"""An error that originated in a [Sender][frequenz.channels.Sender].
Expand All @@ -88,3 +97,15 @@ def __init__(self, message: str, sender: Sender[SenderMessageT_co]):
super().__init__(message)
self.sender: Sender[SenderMessageT_co] = sender
"""The sender where the error happened."""


class SenderClosedError(SenderError[SenderMessageT_co]):
"""An error indicating that a send operation was attempted on a closed sender."""

def __init__(self, sender: Sender[SenderMessageT_co]):
"""Initialize this error.

Args:
sender: The [Sender][frequenz.channels.Sender] that was closed.
"""
super().__init__("Sender is closed", sender)
9 changes: 7 additions & 2 deletions src/frequenz/channels/experimental/_relay_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@
to the senders it was created with.
"""

import typing
import asyncio

from typing_extensions import override

from .._generic import SenderMessageT_contra
from .._sender import Sender


class RelaySender(typing.Generic[SenderMessageT_contra], Sender[SenderMessageT_contra]):
class RelaySender(Sender[SenderMessageT_contra]):
"""A Sender for sending messages to multiple senders.

The `RelaySender` class takes multiple senders and forwards all the messages sent to
Expand Down Expand Up @@ -57,3 +57,8 @@ async def send(self, message: SenderMessageT_contra, /) -> None:
"""
for sender in self._senders:
await sender.send(message)

@override
async def aclose(self) -> None:
"""Close this sender."""
await asyncio.gather(*(sender.aclose() for sender in self._senders))
Loading