diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index cc7e1f67..a4062b0b 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -200,7 +200,6 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std: connectTimer_(executor_->createDeadlineTimer()), outgoingBuffer_(SharedBuffer::allocate(DefaultBufferSize)), keepAliveIntervalInSeconds_(clientConfiguration.getKeepAliveIntervalInSeconds()), - consumerStatsRequestTimer_(executor_->createDeadlineTimer()), maxPendingLookupRequest_(clientConfiguration.getConcurrentLookupRequest()), clientVersion_(clientVersion), pool_(pool), @@ -336,49 +335,6 @@ void ClientConnection::handlePulsarConnected(const proto::CommandConnected& cmdC lock.unlock(); connectPromise_.setValue(shared_from_this()); - - if (serverProtocolVersion_ >= proto::v8) { - startConsumerStatsTimer(std::vector()); - } -} - -void ClientConnection::startConsumerStatsTimer(std::vector consumerStatsRequests) { - std::vector> consumerStatsPromises; - Lock lock(mutex_); - - for (int i = 0; i < consumerStatsRequests.size(); i++) { - PendingConsumerStatsMap::iterator it = pendingConsumerStatsMap_.find(consumerStatsRequests[i]); - if (it != pendingConsumerStatsMap_.end()) { - LOG_DEBUG(cnxString() << " removing request_id " << it->first - << " from the pendingConsumerStatsMap_"); - consumerStatsPromises.push_back(it->second); - pendingConsumerStatsMap_.erase(it); - } else { - LOG_DEBUG(cnxString() << "request_id " << it->first << " already fulfilled - not removing it"); - } - } - - consumerStatsRequests.clear(); - for (PendingConsumerStatsMap::iterator it = pendingConsumerStatsMap_.begin(); - it != pendingConsumerStatsMap_.end(); ++it) { - consumerStatsRequests.push_back(it->first); - } - - // If the close operation has reset the consumerStatsRequestTimer_ then the use_count will be zero - // Check if we have a timer still before we set the request timer to pop again. - if (consumerStatsRequestTimer_) { - consumerStatsRequestTimer_->expires_after(operationsTimeout_); - consumerStatsRequestTimer_->async_wait( - [this, self{shared_from_this()}, consumerStatsRequests](const ASIO_ERROR& err) { - handleConsumerStatsTimeout(err, consumerStatsRequests); - }); - } - lock.unlock(); - // Complex logic since promises need to be fulfilled outside the lock - for (int i = 0; i < consumerStatsPromises.size(); i++) { - consumerStatsPromises[i].setFailed(ResultTimeout); - LOG_WARN(cnxString() << " Operation timedout, didn't get response from broker"); - } } /// The number of unacknowledged probes to send before considering the connection dead and notifying the @@ -996,21 +952,38 @@ void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) { Future ClientConnection::newConsumerStats(uint64_t consumerId, uint64_t requestId) { Lock lock(mutex_); - Promise promise; if (isClosed()) { lock.unlock(); LOG_ERROR(cnxString() << " Client is not connected to the broker"); - promise.setFailed(ResultNotConnected); - return promise.getFuture(); + auto request = + std::make_shared(executor_->createTimer(operationsTimeout_), [] {}); + request->fail(ResultNotConnected); + return request->getFuture(); + } + if (serverProtocolVersion_ < proto::v8) { + lock.unlock(); + LOG_ERROR(cnxString() << "ConsumerStats is not supported since server protobuf version " + << serverProtocolVersion_ << " is older than proto::v8"); + auto request = + std::make_shared(executor_->createTimer(operationsTimeout_), [] {}); + request->fail(ResultUnsupportedVersionError); + return request->getFuture(); } - pendingConsumerStatsMap_.insert(std::make_pair(requestId, promise)); + + auto request = std::make_shared( + executor_->createTimer(operationsTimeout_), [cnxString = cnxString(), requestId]() { + LOG_WARN(cnxString << "ConsumerStats request timeout to broker, req_id: " << requestId); + }); + pendingConsumerStatsMap_.emplace(requestId, request); + request->initialize(); lock.unlock(); + if (mockingRequests_.load(std::memory_order_acquire) && mockServer_ != nullptr && mockServer_->sendRequest("CONSUMER_STATS", requestId)) { - return promise.getFuture(); + return request->getFuture(); } sendCommand(Commands::newConsumerStats(consumerId, requestId)); - return promise.getFuture(); + return request->getFuture(); } void ClientConnection::newTopicLookup(const std::string& topicName, bool authoritative, @@ -1029,8 +1002,6 @@ void ClientConnection::newPartitionedMetadataLookup(const std::string& topicName void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId, const char* requestType, const LookupDataResultPromisePtr& promise) { Lock lock(mutex_); - std::shared_ptr lookupDataResult; - lookupDataResult = std::make_shared(); if (isClosed()) { lock.unlock(); promise->setFailed(ResultNotConnected); @@ -1040,16 +1011,22 @@ void ClientConnection::newLookup(const SharedBuffer& cmd, uint64_t requestId, co promise->setFailed(ResultTooManyLookupRequestException); return; } - LookupRequestData requestData; - requestData.promise = promise; - requestData.timer = executor_->createDeadlineTimer(); - requestData.timer->expires_after(operationsTimeout_); - requestData.timer->async_wait([this, self{shared_from_this()}, requestData](const ASIO_ERROR& ec) { - handleLookupTimeout(ec, requestData); + + auto request = std::make_shared( + executor_->createTimer(operationsTimeout_), [cnxString = cnxString(), requestId, requestType]() { + LOG_WARN(cnxString << requestType << " request timeout to broker, req_id: " << requestId); + }); + request->getFuture().addListener([promise](Result result, const LookupDataResultPtr& lookupDataResult) { + if (result == ResultOk) { + promise->setValue(lookupDataResult); + } else { + promise->setFailed(result); + } }); - pendingLookupRequests_.insert(std::make_pair(requestId, requestData)); + pendingLookupRequests_.emplace(requestId, request); numOfPendingLookupRequest_++; + request->initialize(); lock.unlock(); LOG_DEBUG(cnxString() << "Inserted lookup request " << requestType << " (req_id: " << requestId << ")"); sendCommand(cmd); @@ -1159,21 +1136,21 @@ Future ClientConnection::sendRequestWithId(const SharedBuf if (isClosed()) { lock.unlock(); - Promise promise; LOG_DEBUG(cnxString() << "Fail " << requestType << "(req_id: " << requestId << ") to a closed connection"); - promise.setFailed(ResultNotConnected); - return promise.getFuture(); + auto request = std::make_shared(executor_->createTimer(operationsTimeout_), [] {}); + request->fail(ResultNotConnected); + return request->getFuture(); } - PendingRequestData requestData; - requestData.timer = executor_->createDeadlineTimer(); - requestData.timer->expires_after(operationsTimeout_); - requestData.timer->async_wait([this, self{shared_from_this()}, requestData](const ASIO_ERROR& ec) { - handleRequestTimeout(ec, requestData); - }); - - pendingRequests_.insert(std::make_pair(requestId, requestData)); + auto request = std::make_shared( + executor_->createTimer(operationsTimeout_), + [cnxString = cnxString(), physicalAddress = physicalAddress_, requestId, requestType]() { + LOG_WARN(cnxString << "Network request timeout to broker, remote: " << physicalAddress + << ", req_id: " << requestId << ", request: " << requestType); + }); + pendingRequests_.emplace(requestId, request); + request->initialize(); lock.unlock(); LOG_DEBUG(cnxString() << "Inserted request " << requestType << " (req_id: " << requestId << ")"); @@ -1187,31 +1164,7 @@ Future ClientConnection::sendRequestWithId(const SharedBuf } else { sendCommand(cmd); } - return requestData.promise.getFuture(); -} - -void ClientConnection::handleRequestTimeout(const ASIO_ERROR& ec, - const PendingRequestData& pendingRequestData) { - if (!ec && !pendingRequestData.hasGotResponse->load()) { - LOG_WARN(cnxString() << "Network request timeout to broker, remote: " << physicalAddress_); - pendingRequestData.promise.setFailed(ResultTimeout); - } -} - -void ClientConnection::handleLookupTimeout(const ASIO_ERROR& ec, - const LookupRequestData& pendingRequestData) { - if (!ec) { - LOG_WARN(cnxString() << "Lookup request timeout to broker, remote: " << physicalAddress_); - pendingRequestData.promise->setFailed(ResultTimeout); - } -} - -void ClientConnection::handleGetLastMessageIdTimeout(const ASIO_ERROR& ec, - const ClientConnection::LastMessageIdRequestData& data) { - if (!ec) { - LOG_WARN(cnxString() << "GetLastMessageId request timeout to broker, remote: " << physicalAddress_); - data.promise->setFailed(ResultTimeout); - } + return request->getFuture(); } void ClientConnection::handleKeepAliveTimeout(const ASIO_ERROR& ec) { @@ -1240,15 +1193,6 @@ void ClientConnection::handleKeepAliveTimeout(const ASIO_ERROR& ec) { } } -void ClientConnection::handleConsumerStatsTimeout(const ASIO_ERROR& ec, - const std::vector& consumerStatsRequests) { - if (ec) { - LOG_DEBUG(cnxString() << " Ignoring timer cancelled event, code[" << ec << "]"); - return; - } - startConsumerStatsTimer(consumerStatsRequests); -} - const std::future& ClientConnection::close(Result result, bool switchCluster) { Lock lock(mutex_); if (closeFuture_) { @@ -1286,11 +1230,6 @@ const std::future& ClientConnection::close(Result result, bool switchClust keepAliveTimer_.reset(); } - if (consumerStatsRequestTimer_) { - cancelTimer(*consumerStatsRequestTimer_); - consumerStatsRequestTimer_.reset(); - } - cancelTimer(*connectTimer_); lock.unlock(); int refCount = weak_from_this().use_count(); @@ -1344,25 +1283,25 @@ const std::future& ClientConnection::close(Result result, bool switchClust connectPromise_.setFailed(result); - // Fail all pending requests, all these type are map whose value type contains the Promise object + // Fail all pending requests after releasing the lock. for (auto& kv : pendingRequests) { - kv.second.fail(result); + kv.second->fail(result); } for (auto& kv : pendingLookupRequests) { - kv.second.fail(result); + kv.second->fail(result); } for (auto& kv : pendingConsumerStatsMap) { LOG_ERROR(cnxString() << " Closing Client Connection, please try again later"); - kv.second.setFailed(result); + kv.second->fail(result); } for (auto& kv : pendingGetLastMessageIdRequests) { - kv.second.fail(result); + kv.second->fail(result); } for (auto& kv : pendingGetNamespaceTopicsRequests) { - kv.second.setFailed(result); + kv.second->fail(result); } for (auto& kv : pendingGetSchemaRequests) { - kv.second.fail(result); + kv.second->fail(result); } return *closeFuture_; } @@ -1406,77 +1345,70 @@ Commands::ChecksumType ClientConnection::getChecksumType() const { Future ClientConnection::newGetLastMessageId(uint64_t consumerId, uint64_t requestId) { Lock lock(mutex_); - auto promise = std::make_shared(); if (isClosed()) { lock.unlock(); LOG_ERROR(cnxString() << " Client is not connected to the broker"); + auto promise = std::make_shared(); promise->setFailed(ResultNotConnected); return promise->getFuture(); } - LastMessageIdRequestData requestData; - requestData.promise = promise; - requestData.timer = executor_->createDeadlineTimer(); - requestData.timer->expires_after(operationsTimeout_); - requestData.timer->async_wait([this, self{shared_from_this()}, requestData](const ASIO_ERROR& ec) { - handleGetLastMessageIdTimeout(ec, requestData); - }); - pendingGetLastMessageIdRequests_.insert(std::make_pair(requestId, requestData)); + auto request = std::make_shared( + executor_->createTimer(operationsTimeout_), [cnxString = cnxString(), requestId]() { + LOG_WARN(cnxString << "GetLastMessageId request timeout to broker, req_id: " << requestId); + }); + pendingGetLastMessageIdRequests_.emplace(requestId, request); + request->initialize(); lock.unlock(); sendCommand(Commands::newGetLastMessageId(consumerId, requestId)); - return promise->getFuture(); + return request->getFuture(); } Future ClientConnection::newGetTopicsOfNamespace( const std::string& nsName, CommandGetTopicsOfNamespace_Mode mode, uint64_t requestId) { Lock lock(mutex_); - Promise promise; if (isClosed()) { lock.unlock(); LOG_ERROR(cnxString() << "Client is not connected to the broker"); - promise.setFailed(ResultNotConnected); - return promise.getFuture(); + auto request = + std::make_shared(executor_->createTimer(operationsTimeout_), [] {}); + request->fail(ResultNotConnected); + return request->getFuture(); } - pendingGetNamespaceTopicsRequests_.insert(std::make_pair(requestId, promise)); + auto request = std::make_shared( + executor_->createTimer(operationsTimeout_), [cnxString = cnxString(), requestId]() { + LOG_WARN(cnxString << "GetTopicsOfNamespace request timeout to broker, req_id: " << requestId); + }); + pendingGetNamespaceTopicsRequests_.emplace(requestId, request); + request->initialize(); lock.unlock(); sendCommand(Commands::newGetTopicsOfNamespace(nsName, mode, requestId)); - return promise.getFuture(); + return request->getFuture(); } Future ClientConnection::newGetSchema(const std::string& topicName, const std::string& version, uint64_t requestId) { Lock lock(mutex_); - Promise promise; if (isClosed()) { lock.unlock(); LOG_ERROR(cnxString() << "Client is not connected to the broker"); - promise.setFailed(ResultNotConnected); - return promise.getFuture(); + auto request = std::make_shared(executor_->createTimer(operationsTimeout_), [] {}); + request->fail(ResultNotConnected); + return request->getFuture(); } - auto timer = executor_->createDeadlineTimer(); - pendingGetSchemaRequests_.emplace(requestId, GetSchemaRequest{promise, timer}); + auto request = std::make_shared( + executor_->createTimer(operationsTimeout_), [cnxString = cnxString(), requestId]() { + LOG_WARN(cnxString << "GetSchema request timeout to broker, req_id: " << requestId); + }); + pendingGetSchemaRequests_.emplace(requestId, request); + request->initialize(); lock.unlock(); - timer->expires_after(operationsTimeout_); - timer->async_wait([this, self{shared_from_this()}, requestId](const ASIO_ERROR& ec) { - if (ec) { - return; - } - Lock lock(mutex_); - auto it = pendingGetSchemaRequests_.find(requestId); - if (it != pendingGetSchemaRequests_.end()) { - auto promise = std::move(it->second.promise); - pendingGetSchemaRequests_.erase(it); - lock.unlock(); - promise.setFailed(ResultTimeout); - } - }); - sendCommand(Commands::newGetSchema(topicName, version, requestId)); - return promise.getFuture(); + return request->getFuture(); } void ClientConnection::checkServerError(ServerError error, const std::string& message) { @@ -1541,12 +1473,11 @@ void ClientConnection::handleSuccess(const proto::CommandSuccess& success) { Lock lock(mutex_); auto it = pendingRequests_.find(success.request_id()); if (it != pendingRequests_.end()) { - PendingRequestData requestData = it->second; + auto request = std::move(it->second); pendingRequests_.erase(it); lock.unlock(); - requestData.promise.setValue({}); - cancelTimer(*requestData.timer); + request->complete({}); } } @@ -1558,9 +1489,7 @@ void ClientConnection::handlePartitionedMetadataResponse( Lock lock(mutex_); auto it = pendingLookupRequests_.find(partitionMetadataResponse.request_id()); if (it != pendingLookupRequests_.end()) { - cancelTimer(*it->second.timer); - - LookupDataResultPromisePtr lookupDataPromise = it->second.promise; + auto request = std::move(it->second); pendingLookupRequests_.erase(it); numOfPendingLookupRequest_--; lock.unlock(); @@ -1574,17 +1503,17 @@ void ClientConnection::handlePartitionedMetadataResponse( << " error: " << partitionMetadataResponse.error() << " msg: " << partitionMetadataResponse.message()); checkServerError(partitionMetadataResponse.error(), partitionMetadataResponse.message()); - lookupDataPromise->setFailed( + request->fail( getResult(partitionMetadataResponse.error(), partitionMetadataResponse.message())); } else { LOG_ERROR(cnxString() << "Failed partition-metadata lookup req_id: " << partitionMetadataResponse.request_id() << " with empty response: "); - lookupDataPromise->setFailed(ResultConnectError); + request->fail(ResultConnectError); } } else { LookupDataResultPtr lookupResultPtr = std::make_shared(); lookupResultPtr->setPartitions(partitionMetadataResponse.partitions()); - lookupDataPromise->setValue(lookupResultPtr); + request->complete(lookupResultPtr); } } else { @@ -1600,7 +1529,7 @@ void ClientConnection::handleConsumerStatsResponse( Lock lock(mutex_); auto it = pendingConsumerStatsMap_.find(consumerStatsResponse.request_id()); if (it != pendingConsumerStatsMap_.end()) { - Promise consumerStatsPromise = it->second; + auto request = std::move(it->second); pendingConsumerStatsMap_.erase(it); lock.unlock(); @@ -1609,7 +1538,7 @@ void ClientConnection::handleConsumerStatsResponse( LOG_ERROR(cnxString() << " Failed to get consumer stats - " << consumerStatsResponse.error_message()); } - consumerStatsPromise.setFailed( + request->fail( getResult(consumerStatsResponse.error_code(), consumerStatsResponse.error_message())); } else { LOG_DEBUG(cnxString() << "ConsumerStatsResponse command - Received consumer stats " @@ -1622,7 +1551,7 @@ void ClientConnection::handleConsumerStatsResponse( consumerStatsResponse.blockedconsumeronunackedmsgs(), consumerStatsResponse.address(), consumerStatsResponse.connectedsince(), consumerStatsResponse.type(), consumerStatsResponse.msgrateexpired(), consumerStatsResponse.msgbacklog()); - consumerStatsPromise.setValue(brokerStats); + request->complete(brokerStats); } } else { LOG_WARN("ConsumerStatsResponse command - Received unknown request id from server: " @@ -1635,8 +1564,7 @@ void ClientConnection::handleLookupTopicRespose( Lock lock(mutex_); auto it = pendingLookupRequests_.find(lookupTopicResponse.request_id()); if (it != pendingLookupRequests_.end()) { - cancelTimer(*it->second.timer); - LookupDataResultPromisePtr lookupDataPromise = it->second.promise; + auto request = std::move(it->second); pendingLookupRequests_.erase(it); numOfPendingLookupRequest_--; lock.unlock(); @@ -1648,12 +1576,11 @@ void ClientConnection::handleLookupTopicRespose( << " error: " << lookupTopicResponse.error() << " msg: " << lookupTopicResponse.message()); checkServerError(lookupTopicResponse.error(), lookupTopicResponse.message()); - lookupDataPromise->setFailed( - getResult(lookupTopicResponse.error(), lookupTopicResponse.message())); + request->fail(getResult(lookupTopicResponse.error(), lookupTopicResponse.message())); } else { LOG_ERROR(cnxString() << "Failed lookup req_id: " << lookupTopicResponse.request_id() << " with empty response: "); - lookupDataPromise->setFailed(ResultConnectError); + request->fail(ResultConnectError); } } else { LOG_DEBUG(cnxString() << "Received lookup response from server. req_id: " @@ -1676,7 +1603,7 @@ void ClientConnection::handleLookupTopicRespose( lookupResultPtr->setRedirect(lookupTopicResponse.response() == proto::CommandLookupTopicResponse::Redirect); lookupResultPtr->setShouldProxyThroughServiceUrl(lookupTopicResponse.proxy_through_service_url()); - lookupDataPromise->setValue(lookupResultPtr); + request->complete(lookupResultPtr); } } else { @@ -1692,12 +1619,12 @@ void ClientConnection::handleProducerSuccess(const proto::CommandProducerSuccess Lock lock(mutex_); auto it = pendingRequests_.find(producerSuccess.request_id()); if (it != pendingRequests_.end()) { - PendingRequestData requestData = it->second; + auto request = it->second; if (!producerSuccess.producer_ready()) { LOG_INFO(cnxString() << " Producer " << producerSuccess.producer_name() << " has been queued up at broker. req_id: " << producerSuccess.request_id()); - requestData.hasGotResponse->store(true); + request->disableTimeout(); lock.unlock(); } else { pendingRequests_.erase(it); @@ -1713,8 +1640,7 @@ void ClientConnection::handleProducerSuccess(const proto::CommandProducerSuccess } else { data.topicEpoch = std::nullopt; } - requestData.promise.setValue(data); - cancelTimer(*requestData.timer); + request->complete(data); } } } @@ -1729,30 +1655,28 @@ void ClientConnection::handleError(const proto::CommandError& error) { auto it = pendingRequests_.find(error.request_id()); if (it != pendingRequests_.end()) { - PendingRequestData requestData = it->second; + auto request = std::move(it->second); pendingRequests_.erase(it); lock.unlock(); - requestData.promise.setFailed(result); - cancelTimer(*requestData.timer); + request->fail(result); } else { - PendingGetLastMessageIdRequestsMap::iterator it = - pendingGetLastMessageIdRequests_.find(error.request_id()); + auto it = pendingGetLastMessageIdRequests_.find(error.request_id()); if (it != pendingGetLastMessageIdRequests_.end()) { - auto getLastMessageIdPromise = it->second.promise; + auto request = std::move(it->second); pendingGetLastMessageIdRequests_.erase(it); lock.unlock(); - getLastMessageIdPromise->setFailed(result); + request->fail(result); } else { PendingGetNamespaceTopicsMap::iterator it = pendingGetNamespaceTopicsRequests_.find(error.request_id()); if (it != pendingGetNamespaceTopicsRequests_.end()) { - Promise getNamespaceTopicsPromise = it->second; + auto request = std::move(it->second); pendingGetNamespaceTopicsRequests_.erase(it); lock.unlock(); - getNamespaceTopicsPromise.setFailed(result); + request->fail(result); } else { lock.unlock(); } @@ -1904,16 +1828,15 @@ void ClientConnection::handleGetLastMessageIdResponse( auto it = pendingGetLastMessageIdRequests_.find(getLastMessageIdResponse.request_id()); if (it != pendingGetLastMessageIdRequests_.end()) { - auto getLastMessageIdPromise = it->second.promise; + auto request = std::move(it->second); pendingGetLastMessageIdRequests_.erase(it); lock.unlock(); if (getLastMessageIdResponse.has_consumer_mark_delete_position()) { - getLastMessageIdPromise->setValue( - {toMessageId(getLastMessageIdResponse.last_message_id()), - toMessageId(getLastMessageIdResponse.consumer_mark_delete_position())}); + request->complete({toMessageId(getLastMessageIdResponse.last_message_id()), + toMessageId(getLastMessageIdResponse.consumer_mark_delete_position())}); } else { - getLastMessageIdPromise->setValue({toMessageId(getLastMessageIdResponse.last_message_id())}); + request->complete({toMessageId(getLastMessageIdResponse.last_message_id())}); } } else { lock.unlock(); @@ -1931,7 +1854,7 @@ void ClientConnection::handleGetTopicOfNamespaceResponse( auto it = pendingGetNamespaceTopicsRequests_.find(response.request_id()); if (it != pendingGetNamespaceTopicsRequests_.end()) { - Promise getTopicsPromise = it->second; + auto request = std::move(it->second); pendingGetNamespaceTopicsRequests_.erase(it); lock.unlock(); @@ -1953,7 +1876,7 @@ void ClientConnection::handleGetTopicOfNamespaceResponse( NamespaceTopicsPtr topicsPtr = std::make_shared>(topicSet.begin(), topicSet.end()); - getTopicsPromise.setValue(topicsPtr); + request->complete(topicsPtr); } else { lock.unlock(); LOG_WARN( @@ -1968,7 +1891,7 @@ void ClientConnection::handleGetSchemaResponse(const proto::CommandGetSchemaResp Lock lock(mutex_); auto it = pendingGetSchemaRequests_.find(response.request_id()); if (it != pendingGetSchemaRequests_.end()) { - Promise getSchemaPromise = it->second.promise; + auto request = std::move(it->second); pendingGetSchemaRequests_.erase(it); lock.unlock(); @@ -1981,7 +1904,7 @@ void ClientConnection::handleGetSchemaResponse(const proto::CommandGetSchemaResp : "") << " -- req_id: " << response.request_id()); } - getSchemaPromise.setFailed(result); + request->fail(result); return; } @@ -1992,7 +1915,7 @@ void ClientConnection::handleGetSchemaResponse(const proto::CommandGetSchemaResp properties[kv->key()] = kv->value(); } SchemaInfo schemaInfo(static_cast(schema.type()), "", schema.schema_data(), properties); - getSchemaPromise.setValue(schemaInfo); + request->complete(schemaInfo); } else { lock.unlock(); LOG_WARN( @@ -2013,24 +1936,23 @@ void ClientConnection::handleAckResponse(const proto::CommandAckResponse& respon return; } - auto promise = it->second.promise; + auto request = std::move(it->second); pendingRequests_.erase(it); lock.unlock(); if (response.has_error()) { - promise.setFailed(getResult(response.error(), "")); + request->fail(getResult(response.error(), "")); } else { - promise.setValue({}); + request->complete({}); } } void ClientConnection::unsafeRemovePendingRequest(long requestId) { auto it = pendingRequests_.find(requestId); if (it != pendingRequests_.end()) { - it->second.promise.setFailed(ResultDisconnected); - cancelTimer(*it->second.timer); - + auto request = std::move(it->second); pendingRequests_.erase(it); + request->fail(ResultDisconnected); } } diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index 75e4bca8..ad979e5d 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -28,6 +28,7 @@ #include #include #include + #ifdef USE_ASIO #include #include @@ -54,6 +55,7 @@ #include "Commands.h" #include "GetLastMessageIdResponse.h" #include "LookupDataResult.h" +#include "PendingRequest.h" #include "SharedBuffer.h" #include "TimeUtils.h" #include "UtilAllocator.h" @@ -225,47 +227,6 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this promise; - DeadlineTimerPtr timer; - std::shared_ptr hasGotResponse{std::make_shared(false)}; - - void fail(Result result) { - cancelTimer(*timer); - promise.setFailed(result); - } - }; - - struct LookupRequestData { - LookupDataResultPromisePtr promise; - DeadlineTimerPtr timer; - - void fail(Result result) { - cancelTimer(*timer); - promise->setFailed(result); - } - }; - - struct LastMessageIdRequestData { - GetLastMessageIdResponsePromisePtr promise; - DeadlineTimerPtr timer; - - void fail(Result result) { - cancelTimer(*timer); - promise->setFailed(result); - } - }; - - struct GetSchemaRequest { - Promise promise; - DeadlineTimerPtr timer; - - void fail(Result result) { - cancelTimer(*timer); - promise.setFailed(result); - } - }; - /* * handler for connectAsync * creates a ConnectionPtr which has a valid ClientConnection object @@ -303,12 +264,6 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this inline AllocHandler customAllocReadHandler(Handler h) { return AllocHandler(readHandlerAllocator_, h); @@ -385,28 +340,34 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this PendingRequestsMap; + using Request = PendingRequest; + typedef std::unordered_map> PendingRequestsMap; PendingRequestsMap pendingRequests_; - typedef std::map PendingLookupRequestsMap; + using LookupRequest = PendingRequest; + typedef std::unordered_map> PendingLookupRequestsMap; PendingLookupRequestsMap pendingLookupRequests_; - typedef std::map ProducersMap; + typedef std::unordered_map ProducersMap; ProducersMap producers_; - typedef std::map ConsumersMap; + typedef std::unordered_map ConsumersMap; ConsumersMap consumers_; - typedef std::map> PendingConsumerStatsMap; + using ConsumerStatsRequest = PendingRequest; + typedef std::unordered_map> PendingConsumerStatsMap; PendingConsumerStatsMap pendingConsumerStatsMap_; - typedef std::map PendingGetLastMessageIdRequestsMap; - PendingGetLastMessageIdRequestsMap pendingGetLastMessageIdRequests_; + using GetLastMessageId = PendingRequest; + using PendingGetLastMessageIdMap = std::unordered_map>; + PendingGetLastMessageIdMap pendingGetLastMessageIdRequests_; - typedef std::map> PendingGetNamespaceTopicsMap; + using GetTopicsOfNamespace = PendingRequest; + typedef std::unordered_map> PendingGetNamespaceTopicsMap; PendingGetNamespaceTopicsMap pendingGetNamespaceTopicsRequests_; - typedef std::unordered_map PendingGetSchemaMap; + using GetSchema = PendingRequest; + typedef std::unordered_map> PendingGetSchemaMap; PendingGetSchemaMap pendingGetSchemaRequests_; mutable std::mutex mutex_; @@ -426,14 +387,9 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this mockServer_; - - void handleConsumerStatsTimeout(const ASIO_ERROR& ec, const std::vector& consumerStatsRequests); - - void startConsumerStatsTimer(std::vector consumerStatsRequests); uint32_t maxPendingLookupRequest_; uint32_t numOfPendingLookupRequest_ = 0; diff --git a/lib/ExecutorService.h b/lib/ExecutorService.h index 80659d4b..4a36396c 100644 --- a/lib/ExecutorService.h +++ b/lib/ExecutorService.h @@ -28,12 +28,14 @@ #include #include #include +#include #else #include #include #include #include #include +#include #endif #include #include @@ -68,6 +70,13 @@ class PULSAR_PUBLIC ExecutorService : public std::enable_shared_from_this + ASIO::steady_timer createTimer(const Duration &duration) { + auto timer = ASIO::steady_timer(io_context_); + timer.expires_after(duration); + return timer; + } + // Execute the task in the event loop thread asynchronously, i.e. the task will be put in the event loop // queue and executed later. template diff --git a/lib/PendingRequest.h b/lib/PendingRequest.h new file mode 100644 index 00000000..465073f6 --- /dev/null +++ b/lib/PendingRequest.h @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include + +#include +#include +#include + +#include "AsioDefines.h" +#include "AsioTimer.h" +#include "Future.h" + +namespace pulsar { + +template +class PendingRequest : public std::enable_shared_from_this> { + public: + PendingRequest(ASIO::steady_timer timer, std::function timeoutCallback) + : timer_(std::move(timer)), timeoutCallback_(std::move(timeoutCallback)) {} + + void initialize() { + timer_.async_wait([this, weakSelf{this->weak_from_this()}](const auto& error) { + auto self = weakSelf.lock(); + if (!self || error || timeoutDisabled_.load(std::memory_order_acquire)) { + return; + } + timeoutCallback_(); + promise_.setFailed(ResultTimeout); + }); + } + + void complete(const T& value) { + promise_.setValue(value); + cancelTimer(timer_); + } + + void fail(Result result) { + promise_.setFailed(result); + cancelTimer(timer_); + } + + void disableTimeout() { timeoutDisabled_.store(true, std::memory_order_release); } + + auto getFuture() const { return promise_.getFuture(); } + + ~PendingRequest() { cancelTimer(timer_); } + + private: + ASIO::steady_timer timer_; + Promise promise_; + std::function timeoutCallback_; + std::atomic_bool timeoutDisabled_{false}; +}; + +template +using PendingRequestPtr = std::shared_ptr>; + +} // namespace pulsar