2020#include < pulsar/Client.h>
2121
2222#include < atomic>
23+ #include < chrono>
2324#include < memory>
2425
26+ #ifdef USE_ASIO
27+ #include < asio/connect.hpp>
28+ #include < asio/io_context.hpp>
29+ #include < asio/ip/tcp.hpp>
30+ #include < asio/steady_timer.hpp>
31+ #else
32+ #include < boost/asio/connect.hpp>
33+ #include < boost/asio/io_context.hpp>
34+ #include < boost/asio/ip/tcp.hpp>
35+ #include < boost/asio/steady_timer.hpp>
36+ #endif
37+
38+ #include " AsioDefines.h"
2539#include " ClientImpl.h"
2640#include " ExecutorService.h"
2741#include " LogUtils.h"
42+ #include " ServiceURI.h"
2843
2944DECLARE_LOG_OBJECT ()
3045
3146namespace pulsar {
3247
48+ // Probe whether a Pulsar service URL is reachable by attempting a TCP connection.
49+ // Parses the first host:port from the service URL and tries to connect within timeoutMs.
50+ static bool probeAvailable (const std::string& serviceUrl, int timeoutMs) {
51+ try {
52+ ServiceURI uri (serviceUrl);
53+ const auto & hosts = uri.getServiceHosts ();
54+ if (hosts.empty ()) {
55+ return false ;
56+ }
57+
58+ // Each entry in getServiceHosts() is a full URL like "pulsar://host:port".
59+ // Strip the scheme prefix to get "host:port".
60+ const auto & hostAddr = hosts[0 ];
61+ const auto schemeEnd = hostAddr.find (" ://" );
62+ if (schemeEnd == std::string::npos) {
63+ return false ;
64+ }
65+ const std::string hostPort = hostAddr.substr (schemeEnd + 3 );
66+ const auto colonPos = hostPort.rfind (' :' );
67+ if (colonPos == std::string::npos) {
68+ return false ;
69+ }
70+ const std::string host = hostPort.substr (0 , colonPos);
71+ const std::string port = hostPort.substr (colonPos + 1 );
72+
73+ ASIO::io_context ioCtx;
74+ ASIO::ip::tcp::resolver resolver (ioCtx);
75+ ASIO_ERROR ec;
76+ const auto endpoints = resolver.resolve (host, port, ec);
77+ if (ec) {
78+ LOG_WARN (" probeAvailable: failed to resolve " << host << " :" << port << " - " << ec.message ());
79+ return false ;
80+ }
81+
82+ ASIO::ip::tcp::socket socket (ioCtx);
83+ bool connected = false ;
84+ ASIO::steady_timer timer (ioCtx);
85+ timer.expires_after (std::chrono::milliseconds (timeoutMs));
86+ timer.async_wait ([&](const ASIO_ERROR& timerEc) {
87+ if (!timerEc) {
88+ ASIO_ERROR closeEc;
89+ socket.close (closeEc);
90+ }
91+ });
92+ ASIO::async_connect (socket, endpoints,
93+ [&](const ASIO_ERROR& connectEc, const ASIO::ip::tcp::endpoint&) {
94+ if (!connectEc) {
95+ connected = true ;
96+ }
97+ timer.cancel ();
98+ });
99+ ioCtx.run ();
100+
101+ if (connected) {
102+ ASIO_ERROR closeEc;
103+ socket.close (closeEc);
104+ }
105+ return connected;
106+ } catch (const std::exception& e) {
107+ LOG_WARN (" probeAvailable: exception probing " << serviceUrl << " : " << e.what ());
108+ return false ;
109+ }
110+ }
111+
112+ static int64_t nowMs () {
113+ return std::chrono::duration_cast<std::chrono::milliseconds>(
114+ std::chrono::steady_clock::now ().time_since_epoch ())
115+ .count ();
116+ }
117+
118+ static constexpr int kProbeTimeoutMs = 30000 ;
119+
33120class AutoClusterFailoverImpl : public std ::enable_shared_from_this<AutoClusterFailoverImpl> {
34121 public:
35122 explicit AutoClusterFailoverImpl (AutoClusterFailover::Config&& config) : config_(std::move(config)) {}
@@ -52,7 +139,7 @@ class AutoClusterFailoverImpl : public std::enable_shared_from_this<AutoClusterF
52139 return ;
53140 }
54141 auto closed = !running_->load (std::memory_order_acquire);
55- if (! error || closed) {
142+ if (error || closed) {
56143 LOG_INFO (" AutoClusterFailover exited, timer error: " << error.message ()
57144 << " , closed: " << closed);
58145 return ;
@@ -64,13 +151,75 @@ class AutoClusterFailoverImpl : public std::enable_shared_from_this<AutoClusterF
64151
65152 void probeAndUpdateServiceUrl () {
66153 auto currentServiceInfo = client_->getServiceInfo ();
154+ const auto now = nowMs ();
155+
67156 if (currentServiceInfo == config_.primary ) {
68- // TODO: probe whether primary is down
157+ // Currently on primary: probe it and fail over to first available secondary if it's down.
158+ if (probeAvailable (currentServiceInfo.serviceUrl , kProbeTimeoutMs )) {
159+ failedTimestamp_ = -1 ;
160+ } else {
161+ if (failedTimestamp_ < 0 ) {
162+ failedTimestamp_ = now;
163+ } else if (now - failedTimestamp_ >= config_.failoverDelay .count ()) {
164+ for (const auto & secondary : config_.secondary ) {
165+ if (probeAvailable (secondary.serviceUrl , kProbeTimeoutMs )) {
166+ LOG_INFO (" Primary " << currentServiceInfo.serviceUrl << " has been down for "
167+ << (now - failedTimestamp_) << " ms, switching to secondary "
168+ << secondary.serviceUrl );
169+ auto info = secondary;
170+ client_->updateServiceInfo (std::move (info));
171+ failedTimestamp_ = -1 ;
172+ break ;
173+ } else {
174+ LOG_WARN (" Primary " << currentServiceInfo.serviceUrl << " has been down for "
175+ << (now - failedTimestamp_) << " ms. Secondary "
176+ << secondary.serviceUrl
177+ << " is also unavailable, trying next." );
178+ }
179+ }
180+ }
181+ }
69182 } else {
70- // TODO:
71- // 1. probe whether current (one secondary) is down
72- // 2. if not, check whether primary is up and switch back if it is
183+ // Currently on a secondary: probe it.
184+ if (probeAvailable (currentServiceInfo.serviceUrl , kProbeTimeoutMs )) {
185+ failedTimestamp_ = -1 ;
186+ // Secondary is up; check whether primary has recovered long enough to switch back.
187+ if (probeAvailable (config_.primary .serviceUrl , kProbeTimeoutMs )) {
188+ if (recoverTimestamp_ < 0 ) {
189+ recoverTimestamp_ = now;
190+ } else if (now - recoverTimestamp_ >= config_.switchBackDelay .count ()) {
191+ LOG_INFO (" Primary " << config_.primary .serviceUrl << " has been recovered for "
192+ << (now - recoverTimestamp_) << " ms, switching back from "
193+ << currentServiceInfo.serviceUrl );
194+ auto info = config_.primary ;
195+ client_->updateServiceInfo (std::move (info));
196+ recoverTimestamp_ = -1 ;
197+ }
198+ } else {
199+ recoverTimestamp_ = -1 ;
200+ }
201+ } else {
202+ // Current secondary is down; reset recovery clock and attempt to switch back to primary.
203+ recoverTimestamp_ = -1 ;
204+ if (failedTimestamp_ < 0 ) {
205+ failedTimestamp_ = now;
206+ } else if (now - failedTimestamp_ >= config_.failoverDelay .count ()) {
207+ if (probeAvailable (config_.primary .serviceUrl , kProbeTimeoutMs )) {
208+ LOG_INFO (" Secondary " << currentServiceInfo.serviceUrl << " has been down for "
209+ << (now - failedTimestamp_) << " ms, switching back to primary "
210+ << config_.primary .serviceUrl );
211+ auto info = config_.primary ;
212+ client_->updateServiceInfo (std::move (info));
213+ failedTimestamp_ = -1 ;
214+ } else {
215+ LOG_ERROR (" Secondary " << currentServiceInfo.serviceUrl << " has been down for "
216+ << (now - failedTimestamp_) << " ms and primary "
217+ << config_.primary .serviceUrl << " is also unavailable." );
218+ }
219+ }
220+ }
73221 }
222+
74223 scheduleProbeAndUpdateServiceUrl ();
75224 }
76225
@@ -79,6 +228,10 @@ class AutoClusterFailoverImpl : public std::enable_shared_from_this<AutoClusterF
79228 ClientImplPtr client_;
80229 ExecutorServicePtr executor_;
81230 std::shared_ptr<std::atomic_bool> running_{std::make_shared<std::atomic_bool>(true )};
231+ // Timestamp (ms) when the current service first became unreachable; -1 when it is reachable.
232+ int64_t failedTimestamp_{-1 };
233+ // Timestamp (ms) when the primary first became reachable again while on a secondary; -1 otherwise.
234+ int64_t recoverTimestamp_{-1 };
82235};
83236
84237AutoClusterFailover::AutoClusterFailover (AutoClusterFailover::Config&& config)
0 commit comments