-
Notifications
You must be signed in to change notification settings - Fork 310
Description
Describe the bug
I was trying to migrate a repository from 0.5 to 0.6 and I uncovered a bug that doesn't seem documented nor could I find any relevant issue.
How to reproduce
Include source code:
import asyncio
from faststream.rabbit import ExchangeType, RabbitExchange, RabbitQueue
from faststream.rabbit.fastapi import RabbitRouter
from faststream.rabbit.testing import TestRabbitBroker
from pydantic import BaseModel
exchange = RabbitExchange(name="test", type=ExchangeType.TOPIC)
queue = RabbitQueue(name="test_queue", routing_key="test.key")
result_queue = RabbitQueue(name="result_queue")
router = RabbitRouter()
class InputMessage(BaseModel):
value: int
class OutputMessage(BaseModel):
result: int
@router.subscriber(exchange=exchange, queue=queue)
@router.publisher(exchange=exchange, queue=result_queue)
async def process_message(message: InputMessage) -> OutputMessage:
return OutputMessage(result=message.value * 2)
@router.subscriber(exchange=exchange, queue=result_queue)
async def receive_result(message: OutputMessage) -> None:
print(f"Received result: {message.result}")
async def test_process_message():
async with TestRabbitBroker(router.broker) as broker:
await broker.publish(InputMessage(value=21), exchange=exchange, routing_key="test.key")
asyncio.run(test_process_message())And/Or steps to reproduce the behavior:
- ...
Expected behavior
In FastStream 0.5.47:
2025-11-05 17:58:48,239 INFO - test | test_queue | cbca55f0-7 - Received
2025-11-05 17:58:48,239 INFO - test | result_queue | 6198e1ee-7 - Received
Received result: 42
2025-11-05 17:58:48,239 INFO - test | result_queue | 6198e1ee-7 - Processed
2025-11-05 17:58:48,239 INFO - test | test_queue | cbca55f0-7 - ProcessedObserved behavior
In FastStream 0.6.0:
... [Big traceback]
File "/Users/lachaib/Library/Caches/pypoetry/virtualenvs/fastapi-regression-mBBImc5S-py3.13/lib/python3.13/site-packages/faststream/rabbit/testing.py", line 240, in publish
raise SubscriberNotFound
faststream.exceptions.SubscriberNotFoundEnvironment
Running FastStream 0.6.0 with CPython 3.13.7 on DarwinAdditional context
I did a lot of digging but TBH with all the rework done for version 0.6 I got lost in history and couldn't pinpoint properly the change causing the issue.
As far as I could go, the exchange attributed passed to @publisher gets omitted when the result is sent for publication, and a new one is created with type=DIRECT, leading to mismatch when looking up for the handler, which is though properly registered.
By the way, in version 0.5 in my unit tests there was no need for an actual handler to be registered, given in the production architecture, the listener for the result is inside another process.
My main concern is that as far as I could go, I saw:
in faststream/_internal/endpoint/subscriber/usecase.py
for p in chain(
self.__get_response_publisher(message),
h.handler._publishers,
):
await p._publish(
result_msg.as_publish_command(),
_extra_middlewares=(
m.publish_scope for m in middlewares[::-1]
),calling faststream/rabbit/publisher/usecase.py
@override
async def _publish(
self,
cmd: Union["RabbitPublishCommand", "PublishCommand"],
*,
_extra_middlewares: Iterable["PublisherMiddleware"],
) -> None:
"""This method should be called in subscriber flow only."""
cmd = RabbitPublishCommand.from_cmd(cmd)
cmd.destination = self.routing()
cmd.reply_to = cmd.reply_to or self.reply_to
cmd.add_headers(self.headers, override=False)
cmd.timeout = cmd.timeout or self.timeout
cmd.message_options = {**self.message_options, **cmd.message_options}
cmd.publish_options = {**self.publish_options, **cmd.publish_options}
await self._basic_publish(
cmd,
producer=self._outer_config.producer,
_extra_middlewares=_extra_middlewares,
)Apparently this design is new in 0.6.0, and from what I see, it affects other brokers (redis, kafka)
From these 2 snippets, we can see the cmd being created without information on the exchange of the publisher, nor see it added in _publish
Given it's not in the testing layers, I'm concerned this issue might affect real publications in production.
I don't know to which extent this pattern is used with people who are using 0.6.
I'd appreciate a confirmation of my investigation and tips to move on with migration.
Also, fix seems as trivial as adding to _publish
cmd.exchange = RabbitExchange.validate(self.exchange)If that's ok, I'm game to add myself as contributor for a fix.