Skip to content

Commit c42d97e

Browse files
committed
sched: debug partition terminate phase
1 parent a544077 commit c42d97e

File tree

6 files changed: 25 additions and 43 deletions.

6 files changed: 25 additions and 43 deletions.

src/TfScheduler/TfSchedulerConnManager.h

Lines changed: 3 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -33,9 +33,7 @@
3333
#include <list>
3434
#include <future>
3535

36-
namespace o2
37-
{
38-
namespace DataDistribution
36+
namespace o2::DataDistribution
3937
{
4038

4139
enum StfSenderState {
@@ -167,7 +165,7 @@ class TfSchedulerConnManager
167165

168166
/// StfSender RPC-client channels
169167
std::recursive_mutex mStfSenderClientsLock;
170-
StfSenderRpcClientCollection<ConsulTfSchedulerInstance> mStfSenderRpcClients;
168+
StfSenderRpcClientCollection<ConsulTfSchedulerInstance> mStfSenderRpcClients;
171169
/// TfBuilder RPC-client channels
172170
TfBuilderRpcClientCollection<ConsulTfSchedulerInstance> mTfBuilderRpcClients;
173171

@@ -176,7 +174,7 @@ class TfSchedulerConnManager
176174
std::condition_variable_any mStfDropFuturesCV;
177175
std::list<std::future<std::uint64_t>> mStfDropFutures;
178176
};
179-
}
177+
180178
} /* namespace o2::DataDistribution */
181179

182180
#endif /* ALICEO2_TF_SCHEDULER_CONNMANAGER_H_ */

src/TfScheduler/TfSchedulerInstanceRpc.cxx

Lines changed: 9 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -18,9 +18,7 @@
1818
#include <condition_variable>
1919
#include <stdexcept>
2020

21-
namespace o2
22-
{
23-
namespace DataDistribution
21+
namespace o2::DataDistribution
2422
{
2523

2624
using namespace std::chrono_literals;
@@ -158,42 +156,42 @@ ::grpc::Status TfSchedulerInstanceRpcImpl::HeartBeat(::grpc::ServerContext* /*co
158156
::grpc::Status TfSchedulerInstanceRpcImpl::GetPartitionState(::grpc::ServerContext* /*context*/,
159157
const ::o2::DataDistribution::PartitionInfo* /*request*/, ::o2::DataDistribution::PartitionResponse* response)
160158
{
161-
DDDLOG("gRPC server: GetPartitionState");
162-
159+
// Terminating?
163160
if (!accepting_updates()) {
164161
response->set_partition_state(mPartitionState);
162+
DDDLOG("gRPC server: GetPartitionState() state={}", PartitionState_Name(mPartitionState));
165163
return Status::OK;
166164
}
167165

168-
// Terminating?
169166
switch (mConnManager.getStfSenderState()) {
170167
case StfSenderState::STF_SENDER_STATE_OK:
171168
{
172-
response->set_partition_state(o2::DataDistribution::PartitionState::PARTITION_CONFIGURED);
169+
response->set_partition_state(PartitionState::PARTITION_CONFIGURED);
173170
response->set_info_message("Partition is fully configured.");
174171
break;
175172
}
176173
case StfSenderState::STF_SENDER_STATE_INITIALIZING:
177174
{
178-
response->set_partition_state(o2::DataDistribution::PartitionState::PARTITION_CONFIGURING);
175+
response->set_partition_state(PartitionState::PARTITION_CONFIGURING);
179176
const auto lMsg = fmt::format("Partition is being configured. Found {} out of {} StfSenders.",
180177
mConnManager.getStfSenderCount(), mConnManager.getStfSenderSet().size());
181178
response->set_info_message(lMsg);
182179
break;
183180
}
184181
case StfSenderState::STF_SENDER_STATE_INCOMPLETE:
185182
{
186-
response->set_partition_state(o2::DataDistribution::PartitionState::PARTITION_ERROR);
183+
response->set_partition_state(PartitionState::PARTITION_ERROR);
187184
response->set_info_message("Not all StfSenders are reachable.");
188185
break;
189186
}
190187
default:
191188
{
192-
response->set_partition_state(o2::DataDistribution::PartitionState::PARTITION_ERROR);
189+
response->set_partition_state(PartitionState::PARTITION_ERROR);
193190
response->set_info_message("Unknown partition state.");
194191
}
195192
}
196193

194+
DDDLOG("gRPC server: GetPartitionState() state={}", PartitionState_Name(response->partition_state()));
197195
return Status::OK;
198196
}
199197

@@ -207,7 +205,7 @@ ::grpc::Status TfSchedulerInstanceRpcImpl::TerminatePartition(::grpc::ServerCont
207205
updatePartitionState(PartitionState::PARTITION_TERMINATING);
208206
response->set_info_message("Terminate started.");
209207
} else {
210-
const auto lMsg = fmt::format("Terminate is already requested. partition_id={}", request->partition_id());
208+
const auto lMsg = fmt::format("Terminate was already requested. partition_id={}", request->partition_id());
211209
response->set_info_message(lMsg);
212210
WDDLOG(lMsg);
213211
}
@@ -298,5 +296,4 @@ ::grpc::Status TfSchedulerInstanceRpcImpl::StfSenderStfUpdate(::grpc::ServerCont
298296
}
299297

300298

301-
}
302299
} /* o2::DataDistribution */

src/TfScheduler/TfSchedulerStfInfo.cxx

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,7 @@
2222
#include <tuple>
2323
#include <algorithm>
2424

25-
namespace o2
26-
{
27-
namespace DataDistribution
25+
namespace o2::DataDistribution
2826
{
2927

3028
using namespace std::chrono_literals;
@@ -327,7 +325,7 @@ void TfSchedulerStfInfo::HighWatermarkThread()
327325
}
328326
}
329327

330-
DDDLOG("Exiting StfInfo Scheduling thread.");
328+
DDDLOG("Exiting HighWatermarkThread thread.");
331329
}
332330

333331

@@ -476,5 +474,4 @@ void TfSchedulerStfInfo::addStfInfo(const StfSenderStfInfo &pStfInfo, SchedulerS
476474
}
477475
}
478476

479-
}
480477
} /* o2::DataDistribution */

src/TfScheduler/TfSchedulerStfInfo.h

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,7 @@
3333
#include <thread>
3434
#include <chrono>
3535

36-
namespace o2
37-
{
38-
namespace DataDistribution
36+
namespace o2::DataDistribution
3937
{
4038

4139
using namespace std::chrono_literals;
@@ -85,6 +83,7 @@ class TfSchedulerStfInfo
8583
}
8684

8785
void stop() {
86+
DDDLOG("TfSchedulerStfInfo::stop()");
8887
mRunning = false;
8988
mDropQueue.stop();
9089
mCompleteStfsInfoQueue.stop();
@@ -120,7 +119,7 @@ class TfSchedulerStfInfo
120119

121120
private:
122121
/// Discard timeout for incomplete TFs
123-
static constexpr auto sStfDiscardTimeout = 10s;
122+
static constexpr auto sStfDiscardTimeout = 5s;
124123

125124
std::atomic_bool mRunning = false;
126125

@@ -174,7 +173,7 @@ class TfSchedulerStfInfo
174173
/// stale cleanup thread
175174
std::thread mStaleStfThread;
176175
};
177-
}
176+
178177
} /* namespace o2::DataDistribution */
179178

180179
#endif /* ALICEO2_TF_SCHEDULER_STF_INFO_H_ */

src/common/discovery/StfSenderRpcClient.h

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,7 @@
2727
#include <map>
2828
#include <thread>
2929

30-
namespace o2
31-
{
32-
namespace DataDistribution
30+
namespace o2::DataDistribution
3331
{
3432

3533
using grpc::Server;
@@ -68,7 +66,8 @@ class StfSenderRpcClient {
6866
PartitionInfo lPartInfo; // TODO: specify and check partition ID
6967
PartitionResponse lRet;
7068

71-
return mStub->TerminatePartition(&lContext, lPartInfo, &lRet).ok();
69+
mStub->TerminatePartition(&lContext, lPartInfo, &lRet);
70+
return true; // could have been stopped by the ECS
7271
}
7372

7473
bool is_ready() const;
@@ -230,8 +229,6 @@ class StfSenderRpcClientCollection {
230229
std::map<std::string, std::unique_ptr<StfSenderRpcClient>> mClients;
231230
};
232231

233-
}
234232
} /* namespace o2::DataDistribution */
235233

236234
#endif /* ALICEO2_DATADIST_StfSender_RPC_CLIENT_H_ */
237-

src/common/discovery/TfBuilderRpcClient.h

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,7 @@
2424
#include <map>
2525
#include <thread>
2626

27-
namespace o2
28-
{
29-
namespace DataDistribution
27+
namespace o2::DataDistribution
3028
{
3129

3230
using grpc::Server;
@@ -108,13 +106,10 @@ class TfBuilderRpcClientCtx {
108106
PartitionResponse lResponse;
109107

110108
auto lStatus = mStub->TerminatePartition(&lContext, lPartitionInfo, &lResponse);
111-
if (!lStatus.ok()) {
112-
return false;
113-
}
114109

115-
if (lStatus.ok() && lResponse.partition_state() != PartitionState::PARTITION_TERMINATING) {
116-
EDDLOG("TerminatePartition: TfBuilder. state={} message={}", lResponse.partition_state(), lStatus.error_message());
117-
}
110+
// this is best effort only. ECS could have already stopped them
111+
DDDLOG("TerminatePartition: TfBuilder. tfb_id={} state={} message={}",
112+
mTfBuilderConf.info().process_id(), lResponse.partition_state(), lStatus.error_message());
118113
return true;
119114
}
120115

@@ -279,7 +274,6 @@ class TfBuilderRpcClientCollection {
279274
};
280275

281276

282-
}
283277
} /* namespace o2::DataDistribution */
284278

285279
#endif /* ALICEO2_DATADIST_TFBUILDER_RPC_CLIENT_H_ */

0 commit comments

Comments
 (0)