fix(aio): use get_running_loop() instead of get_event_loop() in AIOConsumer#2212
Open
AlexCai26 wants to merge 1 commit intoconfluentinc:masterfrom
Open
fix(aio): use get_running_loop() instead of get_event_loop() in AIOConsumer#2212AlexCai26 wants to merge 1 commit intoconfluentinc:masterfrom
AlexCai26 wants to merge 1 commit intoconfluentinc:masterfrom
Conversation
|
🎉 All Contributor License Agreements have been signed. Ready to merge. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Description
This PR replaces calls to the deprecated
asyncio.get_event_loop()withasyncio.get_running_loop()insideAIOConsumerinitialization and thesubscribemethod.Motivation and Context
When integrating
confluent_kafka.aio.AIOConsumerinto applications running on complex async frameworks like FastAPI / Uvicorn,asyncio.get_event_loop()can return a different event loop than the one currently executing the request handlers.Because
_AIOConsumerusesget_event_loop()to wrap Kafka callbacks (e.g.,on_assign,on_revoke, and internal configuration callbacks), these callbacks end up bound to the wrong loop. WhenAIOConsumerperforms background operations viaThreadPoolExecutorand attempts to dispatch callbacks usingloop.call_soon_threadsafe(...), the callbacks are scheduled on an inactive or incorrect event loop. This leads to silent deadlocks wherepoll()waits indefinitely for events that are never properly propagated to the application's running loop.Using
asyncio.get_running_loop()ensures that all thread-safe callbacks are explicitly scheduled on the exact event loop that instantiate and run the consumer. This perfectly alignsAIOConsumerwith the existing correct implementation inAIOProducer, which already utilizesasyncio.get_running_loop().Types of changes
Checklist