Skip to content

Commit 64fbda0

Browse files
Fix connection leak by request timers not cancelled in time (#555)
* Fix connection leak by request timers not cancelled in time
* fix
* fix include style
* revert consumer stats changes and speed up tests
* abstract a common method to insert a request and add tests
* remove duplicated emplace
* remove unexpected error log
1 parent 1a39e53 commit 64fbda0

File tree

7 files changed

+376
-197
lines changed

7 files changed

+376
-197
lines changed

lib/ClientConnection.cc

Lines changed: 104 additions & 131 deletions
Large diffs are not rendered by default.

lib/ClientConnection.h

Lines changed: 35 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include <cstdint>
2929
#include <future>
3030
#include <optional>
31+
3132
#ifdef USE_ASIO
3233
#include <asio/bind_executor.hpp>
3334
#include <asio/io_context.hpp>
@@ -52,8 +53,10 @@
5253

5354
#include "AsioTimer.h"
5455
#include "Commands.h"
56+
#include "ExecutorService.h"
5557
#include "GetLastMessageIdResponse.h"
5658
#include "LookupDataResult.h"
59+
#include "PendingRequest.h"
5760
#include "SharedBuffer.h"
5861
#include "TimeUtils.h"
5962
#include "UtilAllocator.h"
@@ -66,9 +69,6 @@ class PulsarFriend;
6669

6770
using TcpResolverPtr = std::shared_ptr<ASIO::ip::tcp::resolver>;
6871

69-
class ExecutorService;
70-
using ExecutorServicePtr = std::shared_ptr<ExecutorService>;
71-
7272
class ConnectionPool;
7373
class ClientConnection;
7474
typedef std::shared_ptr<ClientConnection> ClientConnectionPtr;
@@ -225,47 +225,6 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
225225
void handleKeepAliveTimeout(const ASIO_ERROR& ec);
226226

227227
private:
228-
struct PendingRequestData {
229-
Promise<Result, ResponseData> promise;
230-
DeadlineTimerPtr timer;
231-
std::shared_ptr<std::atomic_bool> hasGotResponse{std::make_shared<std::atomic_bool>(false)};
232-
233-
void fail(Result result) {
234-
cancelTimer(*timer);
235-
promise.setFailed(result);
236-
}
237-
};
238-
239-
struct LookupRequestData {
240-
LookupDataResultPromisePtr promise;
241-
DeadlineTimerPtr timer;
242-
243-
void fail(Result result) {
244-
cancelTimer(*timer);
245-
promise->setFailed(result);
246-
}
247-
};
248-
249-
struct LastMessageIdRequestData {
250-
GetLastMessageIdResponsePromisePtr promise;
251-
DeadlineTimerPtr timer;
252-
253-
void fail(Result result) {
254-
cancelTimer(*timer);
255-
promise->setFailed(result);
256-
}
257-
};
258-
259-
struct GetSchemaRequest {
260-
Promise<Result, SchemaInfo> promise;
261-
DeadlineTimerPtr timer;
262-
263-
void fail(Result result) {
264-
cancelTimer(*timer);
265-
promise.setFailed(result);
266-
}
267-
};
268-
269228
/*
270229
* handler for connectAsync
271230
* creates a ConnectionPtr which has a valid ClientConnection object
@@ -303,12 +262,6 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
303262
void newLookup(const SharedBuffer& cmd, uint64_t requestId, const char* requestType,
304263
const LookupDataResultPromisePtr& promise);
305264

306-
void handleRequestTimeout(const ASIO_ERROR& ec, const PendingRequestData& pendingRequestData);
307-
308-
void handleLookupTimeout(const ASIO_ERROR&, const LookupRequestData&);
309-
310-
void handleGetLastMessageIdTimeout(const ASIO_ERROR&, const LastMessageIdRequestData& data);
311-
312265
template <typename Handler>
313266
inline AllocHandler<Handler> customAllocReadHandler(Handler h) {
314267
return AllocHandler<Handler>(readHandlerAllocator_, h);
@@ -385,33 +338,49 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
385338
const std::chrono::milliseconds connectTimeout_;
386339
const DeadlineTimerPtr connectTimer_;
387340

388-
typedef std::map<long, PendingRequestData> PendingRequestsMap;
389-
PendingRequestsMap pendingRequests_;
341+
template <typename T>
342+
using RequestMap = std::unordered_map<uint64_t, PendingRequestPtr<T>>;
390343

391-
typedef std::map<long, LookupRequestData> PendingLookupRequestsMap;
392-
PendingLookupRequestsMap pendingLookupRequests_;
344+
RequestMap<ResponseData> pendingRequests_;
345+
RequestMap<LookupDataResultPtr> pendingLookupRequests_;
346+
RequestMap<GetLastMessageIdResponse> pendingGetLastMessageIdRequests_;
347+
RequestMap<NamespaceTopicsPtr> pendingGetNamespaceTopicsRequests_;
348+
RequestMap<SchemaInfo> pendingGetSchemaRequests_;
393349

394-
typedef std::map<long, ProducerImplWeakPtr> ProducersMap;
350+
typedef std::unordered_map<long, ProducerImplWeakPtr> ProducersMap;
395351
ProducersMap producers_;
396352

397-
typedef std::map<long, ConsumerImplWeakPtr> ConsumersMap;
353+
typedef std::unordered_map<long, ConsumerImplWeakPtr> ConsumersMap;
398354
ConsumersMap consumers_;
399355

400356
typedef std::map<uint64_t, Promise<Result, BrokerConsumerStatsImpl>> PendingConsumerStatsMap;
401357
PendingConsumerStatsMap pendingConsumerStatsMap_;
402358

403-
typedef std::map<long, LastMessageIdRequestData> PendingGetLastMessageIdRequestsMap;
404-
PendingGetLastMessageIdRequestsMap pendingGetLastMessageIdRequests_;
405-
406-
typedef std::map<long, Promise<Result, NamespaceTopicsPtr>> PendingGetNamespaceTopicsMap;
407-
PendingGetNamespaceTopicsMap pendingGetNamespaceTopicsRequests_;
408-
409-
typedef std::unordered_map<uint64_t, GetSchemaRequest> PendingGetSchemaMap;
410-
PendingGetSchemaMap pendingGetSchemaRequests_;
411-
412359
mutable std::mutex mutex_;
413360
typedef std::unique_lock<std::mutex> Lock;
414361

362+
// Note: this method must be called when holding `mutex_`
363+
template <typename T, typename OnTimeout>
364+
auto insertRequest(RequestMap<T>& pendingRequests, uint64_t requestId, OnTimeout onTimeout) {
365+
auto request = std::make_shared<PendingRequest<T>>(
366+
executor_->createTimer(operationsTimeout_),
367+
[this, self{shared_from_this()}, requestId, onTimeout{std::move(onTimeout)},
368+
&pendingRequests]() mutable {
369+
{
370+
std::lock_guard lock{mutex_};
371+
if (auto it = pendingRequests.find(requestId); it != pendingRequests.end()) {
372+
pendingRequests.erase(it);
373+
}
374+
}
375+
onTimeout();
376+
});
377+
auto [iterator, inserted] = pendingRequests.emplace(requestId, request);
378+
if (inserted) {
379+
request->initialize();
380+
} // else: the request id is duplicated
381+
return iterator->second;
382+
}
383+
415384
// Pending buffers to write on the socket
416385
std::deque<std::any> pendingWriteBuffers_;
417386
int pendingWriteOperations_ = 0;
@@ -435,7 +404,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
435404

436405
void startConsumerStatsTimer(std::vector<uint64_t> consumerStatsRequests);
437406
uint32_t maxPendingLookupRequest_;
438-
uint32_t numOfPendingLookupRequest_ = 0;
407+
std::atomic_uint32_t numOfPendingLookupRequest_{0};
439408

440409
bool isTlsAllowInsecureConnection_ = false;
441410

lib/ExecutorService.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,14 @@
2828
#include <asio/ip/tcp.hpp>
2929
#include <asio/post.hpp>
3030
#include <asio/ssl.hpp>
31+
#include <asio/steady_timer.hpp>
3132
#else
3233
#include <boost/asio/dispatch.hpp>
3334
#include <boost/asio/io_context.hpp>
3435
#include <boost/asio/ip/tcp.hpp>
3536
#include <boost/asio/post.hpp>
3637
#include <boost/asio/ssl.hpp>
38+
#include <boost/asio/steady_timer.hpp>
3739
#endif
3840
#include <chrono>
3941
#include <condition_variable>
@@ -68,6 +70,13 @@ class PULSAR_PUBLIC ExecutorService : public std::enable_shared_from_this<Execut
6870
// throws std::runtime_error if failed
6971
DeadlineTimerPtr createDeadlineTimer();
7072

73+
template <typename Duration>
74+
ASIO::steady_timer createTimer(const Duration &duration) {
75+
auto timer = ASIO::steady_timer(io_context_);
76+
timer.expires_after(duration);
77+
return timer;
78+
}
79+
7180
// Execute the task in the event loop thread asynchronously, i.e. the task will be put in the event loop
7281
// queue and executed later.
7382
template <typename T>

lib/MockServer.h

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,26 @@ class MockServer : public std::enable_shared_from_this<MockServer> {
8181
proto::CommandConsumerStatsResponse response;
8282
response.set_request_id(requestId);
8383
connection->handleConsumerStatsResponse(response);
84+
} else if (request == "LOOKUP") {
85+
proto::CommandLookupTopicResponse response;
86+
response.set_request_id(requestId);
87+
response.set_response(proto::CommandLookupTopicResponse_LookupType_Connect);
88+
response.set_brokerserviceurl("pulsar://localhost:6650");
89+
connection->handleLookupTopicRespose(response);
90+
} else if (request == "GET_LAST_MESSAGE_ID") {
91+
proto::CommandGetLastMessageIdResponse response;
92+
response.set_request_id(requestId);
93+
response.mutable_last_message_id();
94+
connection->handleGetLastMessageIdResponse(response);
95+
} else if (request == "GET_TOPICS_OF_NAMESPACE") {
96+
proto::CommandGetTopicsOfNamespaceResponse response;
97+
response.set_request_id(requestId);
98+
connection->handleGetTopicOfNamespaceResponse(response);
99+
} else if (request == "GET_SCHEMA") {
100+
proto::CommandGetSchemaResponse response;
101+
response.set_request_id(requestId);
102+
response.mutable_schema()->set_type(proto::Schema_Type_String);
103+
connection->handleGetSchemaResponse(response);
84104
} else {
85105
proto::CommandSuccess success;
86106
success.set_request_id(requestId);

lib/PendingRequest.h

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
#pragma once
20+
21+
#include <pulsar/Result.h>
22+
23+
#include <atomic>
24+
#include <functional>
25+
#include <memory>
26+
27+
#include "AsioDefines.h"
28+
#include "AsioTimer.h"
29+
#include "Future.h"
30+
31+
namespace pulsar {
32+
33+
template <typename T>
34+
class PendingRequest : public std::enable_shared_from_this<PendingRequest<T>> {
35+
public:
36+
PendingRequest(ASIO::steady_timer timer, std::function<void()> timeoutCallback)
37+
: timer_(std::move(timer)), timeoutCallback_(std::move(timeoutCallback)) {}
38+
39+
void initialize() {
40+
timer_.async_wait([this, weakSelf{this->weak_from_this()}](const auto& error) {
41+
auto self = weakSelf.lock();
42+
if (!self || error || timeoutDisabled_.load(std::memory_order_acquire)) {
43+
return;
44+
}
45+
timeoutCallback_();
46+
promise_.setFailed(ResultTimeout);
47+
});
48+
}
49+
50+
void complete(const T& value) {
51+
promise_.setValue(value);
52+
cancelTimer(timer_);
53+
}
54+
55+
void fail(Result result) {
56+
promise_.setFailed(result);
57+
cancelTimer(timer_);
58+
}
59+
60+
void disableTimeout() { timeoutDisabled_.store(true, std::memory_order_release); }
61+
62+
auto getFuture() const { return promise_.getFuture(); }
63+
64+
~PendingRequest() { cancelTimer(timer_); }
65+
66+
private:
67+
ASIO::steady_timer timer_;
68+
Promise<Result, T> promise_;
69+
std::function<void()> timeoutCallback_;
70+
std::atomic_bool timeoutDisabled_{false};
71+
};
72+
73+
template <typename T>
74+
using PendingRequestPtr = std::shared_ptr<PendingRequest<T>>;
75+
76+
} // namespace pulsar

0 commit comments

Comments
 (0)