Fix connection leak by request timers not cancelled in time#555
Draft
BewareMyPower wants to merge 3 commits intoapache:mainfrom
Draft
Fix connection leak by request timers not cancelled in time#555BewareMyPower wants to merge 3 commits intoapache:mainfrom
BewareMyPower wants to merge 3 commits intoapache:mainfrom
Conversation
There was a problem hiding this comment.
Pull request overview
This PR addresses #554 by refactoring how ClientConnection tracks in-flight requests and their timeouts, aiming to avoid connection objects being kept alive by outstanding timer handlers.
Changes:
- Introduces a
PendingRequest<T>helper that owns a timer + promise and usesweak_from_this()in the timer handler. - Adds
ExecutorService::createTimer()to construct anASIO::steady_timerwith an expiry. - Migrates multiple
ClientConnectionpending-request maps (requests/lookups/stats/schema/etc.) to storePendingRequestinstances and removes the old consumer-stats sweep timer logic.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
lib/PendingRequest.h |
New helper abstraction for request promise + timeout timer. |
lib/ExecutorService.h |
Adds a createTimer() convenience factory for steady_timer. |
lib/ClientConnection.h |
Replaces several pending-request structs/maps with PendingRequest-based maps (mostly unordered_map). |
lib/ClientConnection.cc |
Switches request creation/timeout wiring to PendingRequest, removes consumer-stats periodic timer cleanup. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
Comment on lines
+1402
to
+1404
| auto request = std::make_shared<GetSchema>( | ||
| executor_->createTimer(operationsTimeout_), [cnxString = cnxString(), requestId]() { | ||
| LOG_WARN(cnxString << "GetSchema request timeout to broker, req_id: " << requestId); |
Comment on lines
1370
to
1378
| if (isClosed()) { | ||
| lock.unlock(); | ||
| LOG_ERROR(cnxString() << "Client is not connected to the broker"); | ||
| promise.setFailed(ResultNotConnected); | ||
| return promise.getFuture(); | ||
| auto request = | ||
| std::make_shared<GetTopicsOfNamespace>(executor_->createTimer(operationsTimeout_), [] {}); | ||
| request->fail(ResultNotConnected); | ||
| return request->getFuture(); | ||
| } | ||
|
|
Comment on lines
+1357
to
+1358
| executor_->createTimer(operationsTimeout_), [cnxString = cnxString(), requestId]() { | ||
| LOG_WARN(cnxString << "GetLastMessageId request timeout to broker, req_id: " << requestId); |
Comment on lines
+21
to
+30
| #include <pulsar/Result.h> | ||
|
|
||
| #include <atomic> | ||
| #include <functional> | ||
| #include <memory> | ||
|
|
||
| #include "AsioDefines.h" | ||
| #include "AsioTimer.h" | ||
| #include "Future.h" | ||
|
|
Comment on lines
+73
to
+78
| template <typename Duration> | ||
| ASIO::steady_timer createTimer(const Duration &duration) { | ||
| auto timer = ASIO::steady_timer(io_context_); | ||
| timer.expires_after(duration); | ||
| return timer; | ||
| } |
Comment on lines
+964
to
+969
| lock.unlock(); | ||
| LOG_ERROR(cnxString() << "ConsumerStats is not supported since server protobuf version " | ||
| << serverProtocolVersion_ << " is older than proto::v8"); | ||
| auto request = | ||
| std::make_shared<ConsumerStatsRequest>(executor_->createTimer(operationsTimeout_), [] {}); | ||
| request->fail(ResultUnsupportedVersionError); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Fixes #554