Skip to content

Commit 3d43831

Browse files
authored
Revert "DPL: Better detection for injected workflows (#15130)"
This reverts commit 20be6e7.
1 parent f67af9e commit 3d43831

File tree

5 files changed

+58
-74
lines changed

5 files changed

+58
-74
lines changed

Framework/Core/src/ArrowSupport.cxx

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -680,12 +680,8 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
680680
workflow.erase(reader);
681681
} else {
682682
// load reader algorithm before deployment
683-
auto tfnsource = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) {
684-
return std::ranges::any_of(spec.outputs, [](OutputSpec const& output) {
685-
return DataSpecUtils::match(output, "TFN", "TFNumber", 0);
686-
});
687-
});
688-
if (tfnsource == workflow.end()) { // add normal reader algorithm only if no on-the-fly generator is injected
683+
auto mctracks2aod = std::find_if(workflow.begin(), workflow.end(), [](auto const& x) { return x.name == "mctracks-to-aod"; });
684+
if (mctracks2aod == workflow.end()) { // add normal reader algorithm only if no on-the-fly generator is injected
689685
reader->algorithm = CommonDataProcessors::wrapWithTimesliceConsumption(PluginManager::loadAlgorithmFromPlugin("O2FrameworkAnalysisSupport", "ROOTFileReader", ctx));
690686
} // otherwise the algorithm was set in injectServiceDevices
691687
}

Framework/Core/src/WorkflowHelpers.cxx

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -411,17 +411,13 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
411411

412412
// add the reader
413413
if (aodReader.outputs.empty() == false) {
414-
auto tfnsource = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) {
415-
return std::ranges::any_of(spec.outputs, [](OutputSpec const& output) {
416-
return DataSpecUtils::match(output, "TFN", "TFNumber", 0);
417-
});
418-
});
419-
if (tfnsource == workflow.end()) {
414+
auto mctracks2aod = std::ranges::find_if(workflow, [](auto const& x) { return x.name == "mctracks-to-aod"; });
415+
if (mctracks2aod == workflow.end()) {
420416
// add normal reader
421417
aodReader.outputs.emplace_back(OutputSpec{"TFN", "TFNumber"});
422418
aodReader.outputs.emplace_back(OutputSpec{"TFF", "TFFilename"});
423419
} else {
424-
// AODs are being injected the tfnsource is the entry point, add error-handler reader
420+
// AODs are being injected on-the-fly, add error-handler reader
425421
aodReader.algorithm = AlgorithmSpec{
426422
adaptStateful(
427423
[](DeviceSpec const& spec) {
@@ -704,11 +700,6 @@ void WorkflowHelpers::injectAODWriter(WorkflowSpec& workflow, ConfigContext cons
704700
return DataSpecUtils::partialMatch(spec, o2::header::DataOrigin("TFN"));
705701
});
706702
dec.isDangling[std::distance(dec.outputsInputs.begin(), it)] = false;
707-
708-
it = std::find_if(dec.outputsInputs.begin(), dec.outputsInputs.end(), [](InputSpec const& spec) -> bool {
709-
return DataSpecUtils::partialMatch(spec, o2::header::DataOrigin("TFF"));
710-
});
711-
dec.isDangling[std::distance(dec.outputsInputs.begin(), it)] = false;
712703
}
713704
}
714705

run/o2sim_hepmc_publisher.cxx

Lines changed: 46 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,7 @@ struct O2simHepmcPublisher {
3737
int tfCounter = 0;
3838
std::shared_ptr<HepMC3::Reader> hepMCReader;
3939
bool eos = false;
40-
41-
std::vector<o2::pmr::vector<o2::MCTrack>*> mctracks_vector;
42-
std::vector<o2::dataformats::MCEventHeader*> mcheader_vector;
40+
std::vector<o2::MCTrack> mcTracks;
4341

4442
void init(o2::framework::InitContext& /*ic*/)
4543
{
@@ -52,19 +50,13 @@ struct O2simHepmcPublisher {
5250
LOGP(fatal, "Cannot open HEPMC kine file {}", (std::string)hepmcFileName);
5351
}
5452
// allocate the memory upfront to prevent reallocations later
55-
mctracks_vector.reserve(aggregate);
56-
mcheader_vector.reserve(aggregate);
53+
mcTracks.reserve(1e3 * aggregate);
5754
}
5855

5956
void run(o2::framework::ProcessingContext& pc)
6057
{
6158
HepMC3::GenEvent event;
62-
auto batch = maxEvents > 0 ? std::min((int)aggregate, (int)maxEvents - eventCounter) : (int)aggregate;
63-
for (auto i = 0; i < batch; ++i) {
64-
mctracks_vector.push_back(&pc.outputs().make<o2::pmr::vector<o2::MCTrack>>(Output{"MC", "MCTRACKS", 0}));
65-
auto& mctracks = mctracks_vector.back();
66-
mcheader_vector.push_back(&pc.outputs().make<o2::dataformats::MCEventHeader>(Output{"MC", "MCHEADER", 0}));
67-
auto& mcheader = mcheader_vector.back();
59+
for (auto i = 0; i < (int)aggregate; ++i) {
6860
// read next entry
6961
hepMCReader->read_event(event);
7062
if (hepMCReader->failed()) {
@@ -74,60 +66,61 @@ struct O2simHepmcPublisher {
7466
}
7567

7668
// create O2 MCHeader and MCtracks vector out of HEPMC event
77-
mcheader->SetEventID(event.event_number());
78-
mcheader->SetVertex(event.event_pos().px(), event.event_pos().py(), event.event_pos().pz());
69+
o2::dataformats::MCEventHeader mcHeader;
70+
mcHeader.SetEventID(event.event_number());
71+
mcHeader.SetVertex(event.event_pos().px(), event.event_pos().py(), event.event_pos().pz());
7972
auto xsecInfo = event.cross_section();
8073
if (xsecInfo != nullptr) {
81-
mcheader->putInfo(MCInfoKeys::acceptedEvents, (uint64_t)xsecInfo->get_accepted_events());
82-
mcheader->putInfo(MCInfoKeys::attemptedEvents, (uint64_t)xsecInfo->get_attempted_events());
83-
mcheader->putInfo(MCInfoKeys::xSection, (float)xsecInfo->xsec());
84-
mcheader->putInfo(MCInfoKeys::xSectionError, (float)xsecInfo->xsec_err());
74+
mcHeader.putInfo(MCInfoKeys::acceptedEvents, (uint64_t)xsecInfo->get_accepted_events());
75+
mcHeader.putInfo(MCInfoKeys::attemptedEvents, (uint64_t)xsecInfo->get_attempted_events());
76+
mcHeader.putInfo(MCInfoKeys::xSection, (float)xsecInfo->xsec());
77+
mcHeader.putInfo(MCInfoKeys::xSectionError, (float)xsecInfo->xsec_err());
8578
}
8679
auto scale = event.attribute<HepMC3::DoubleAttribute>(MCInfoKeys::eventScale);
8780
if (scale != nullptr) {
88-
mcheader->putInfo(MCInfoKeys::eventScale, (float)scale->value());
81+
mcHeader.putInfo(MCInfoKeys::eventScale, (float)scale->value());
8982
}
9083
auto nMPI = event.attribute<HepMC3::IntAttribute>(MCInfoKeys::mpi);
9184
if (nMPI != nullptr) {
92-
mcheader->putInfo(MCInfoKeys::mpi, nMPI->value());
85+
mcHeader.putInfo(MCInfoKeys::mpi, nMPI->value());
9386
}
9487
auto sid = event.attribute<HepMC3::IntAttribute>(MCInfoKeys::processCode);
9588
auto scode = event.attribute<HepMC3::IntAttribute>(MCInfoKeys::processID); // default pythia8 hepmc3 interface uses signal_process_id
9689
if (sid != nullptr) {
97-
mcheader->putInfo(MCInfoKeys::processCode, sid->value());
90+
mcHeader.putInfo(MCInfoKeys::processCode, sid->value());
9891
} else if (scode != nullptr) {
99-
mcheader->putInfo(MCInfoKeys::processCode, scode->value());
92+
mcHeader.putInfo(MCInfoKeys::processCode, scode->value());
10093
}
10194
auto pdfInfo = event.pdf_info();
10295
if (pdfInfo != nullptr) {
103-
mcheader->putInfo(MCInfoKeys::pdfParton1Id, pdfInfo->parton_id[0]);
104-
mcheader->putInfo(MCInfoKeys::pdfParton2Id, pdfInfo->parton_id[1]);
105-
mcheader->putInfo(MCInfoKeys::pdfCode1, pdfInfo->pdf_id[0]);
106-
mcheader->putInfo(MCInfoKeys::pdfCode2, pdfInfo->pdf_id[1]);
107-
mcheader->putInfo(MCInfoKeys::pdfX1, (float)pdfInfo->x[0]);
108-
mcheader->putInfo(MCInfoKeys::pdfX2, (float)pdfInfo->x[1]);
109-
mcheader->putInfo(MCInfoKeys::pdfScale, (float)pdfInfo->scale);
110-
mcheader->putInfo(MCInfoKeys::pdfXF1, (float)pdfInfo->xf[0]);
111-
mcheader->putInfo(MCInfoKeys::pdfXF2, (float)pdfInfo->xf[1]);
96+
mcHeader.putInfo(MCInfoKeys::pdfParton1Id, pdfInfo->parton_id[0]);
97+
mcHeader.putInfo(MCInfoKeys::pdfParton2Id, pdfInfo->parton_id[1]);
98+
mcHeader.putInfo(MCInfoKeys::pdfCode1, pdfInfo->pdf_id[0]);
99+
mcHeader.putInfo(MCInfoKeys::pdfCode2, pdfInfo->pdf_id[1]);
100+
mcHeader.putInfo(MCInfoKeys::pdfX1, (float)pdfInfo->x[0]);
101+
mcHeader.putInfo(MCInfoKeys::pdfX2, (float)pdfInfo->x[1]);
102+
mcHeader.putInfo(MCInfoKeys::pdfScale, (float)pdfInfo->scale);
103+
mcHeader.putInfo(MCInfoKeys::pdfXF1, (float)pdfInfo->xf[0]);
104+
mcHeader.putInfo(MCInfoKeys::pdfXF2, (float)pdfInfo->xf[1]);
112105
}
113106
auto heavyIon = event.heavy_ion();
114107
if (heavyIon != nullptr) {
115-
mcheader->putInfo(MCInfoKeys::nCollHard, heavyIon->Ncoll_hard);
116-
mcheader->putInfo(MCInfoKeys::nPartProjectile, heavyIon->Npart_proj);
117-
mcheader->putInfo(MCInfoKeys::nPartTarget, heavyIon->Npart_targ);
118-
mcheader->putInfo(MCInfoKeys::nColl, heavyIon->Ncoll);
119-
mcheader->putInfo(MCInfoKeys::nCollNNWounded, heavyIon->N_Nwounded_collisions);
120-
mcheader->putInfo(MCInfoKeys::nCollNWoundedN, heavyIon->Nwounded_N_collisions);
121-
mcheader->putInfo(MCInfoKeys::nCollNWoundedNwounded, heavyIon->Nwounded_Nwounded_collisions);
122-
mcheader->putInfo(MCInfoKeys::nSpecProjectileNeutron, heavyIon->Nspec_proj_n);
123-
mcheader->putInfo(MCInfoKeys::nSpecProjectileProton, heavyIon->Nspec_proj_p);
124-
mcheader->putInfo(MCInfoKeys::nSpecTargetNeutron, heavyIon->Nspec_targ_n);
125-
mcheader->putInfo(MCInfoKeys::nSpecTargetProton, heavyIon->Nspec_targ_p);
126-
mcheader->putInfo(MCInfoKeys::impactParameter, (float)heavyIon->impact_parameter);
127-
mcheader->putInfo(MCInfoKeys::planeAngle, (float)heavyIon->event_plane_angle);
128-
mcheader->putInfo("eccentricity", (float)heavyIon->eccentricity);
129-
mcheader->putInfo(MCInfoKeys::sigmaInelNN, (float)heavyIon->sigma_inel_NN);
130-
mcheader->putInfo(MCInfoKeys::centrality, (float)heavyIon->centrality);
108+
mcHeader.putInfo(MCInfoKeys::nCollHard, heavyIon->Ncoll_hard);
109+
mcHeader.putInfo(MCInfoKeys::nPartProjectile, heavyIon->Npart_proj);
110+
mcHeader.putInfo(MCInfoKeys::nPartTarget, heavyIon->Npart_targ);
111+
mcHeader.putInfo(MCInfoKeys::nColl, heavyIon->Ncoll);
112+
mcHeader.putInfo(MCInfoKeys::nCollNNWounded, heavyIon->N_Nwounded_collisions);
113+
mcHeader.putInfo(MCInfoKeys::nCollNWoundedN, heavyIon->Nwounded_N_collisions);
114+
mcHeader.putInfo(MCInfoKeys::nCollNWoundedNwounded, heavyIon->Nwounded_Nwounded_collisions);
115+
mcHeader.putInfo(MCInfoKeys::nSpecProjectileNeutron, heavyIon->Nspec_proj_n);
116+
mcHeader.putInfo(MCInfoKeys::nSpecProjectileProton, heavyIon->Nspec_proj_p);
117+
mcHeader.putInfo(MCInfoKeys::nSpecTargetNeutron, heavyIon->Nspec_targ_n);
118+
mcHeader.putInfo(MCInfoKeys::nSpecTargetProton, heavyIon->Nspec_targ_p);
119+
mcHeader.putInfo(MCInfoKeys::impactParameter, (float)heavyIon->impact_parameter);
120+
mcHeader.putInfo(MCInfoKeys::planeAngle, (float)heavyIon->event_plane_angle);
121+
mcHeader.putInfo("eccentricity", (float)heavyIon->eccentricity);
122+
mcHeader.putInfo(MCInfoKeys::sigmaInelNN, (float)heavyIon->sigma_inel_NN);
123+
mcHeader.putInfo(MCInfoKeys::centrality, (float)heavyIon->centrality);
131124
}
132125

133126
auto particles = event.particles();
@@ -138,21 +131,26 @@ struct O2simHepmcPublisher {
138131
auto has_children = children.size() > 0;
139132
auto p = particle->momentum();
140133
auto v = particle->production_vertex();
141-
mctracks->emplace_back(
134+
mcTracks.emplace_back(
142135
particle->pid(),
143136
has_parents ? parents.front()->id() : -1, has_parents ? parents.back()->id() : -1,
144137
has_children ? children.front()->id() : -1, has_children ? children.back()->id() : -1,
145138
p.px(), p.py(), p.pz(),
146139
v->position().x(), v->position().y(), v->position().z(),
147140
v->position().t(), 0);
148141
}
142+
143+
// add to the message
144+
pc.outputs().snapshot(Output{"MC", "MCHEADER", 0}, mcHeader);
145+
pc.outputs().snapshot(Output{"MC", "MCTRACKS", 0}, mcTracks);
146+
mcTracks.clear();
149147
++eventCounter;
150148
}
151149

152150
// report number of TFs injected for the rate limiter to work
153151
++tfCounter;
154152
pc.services().get<o2::monitoring::Monitoring>().send(o2::monitoring::Metric{(uint64_t)tfCounter, "df-sent"}.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL));
155-
if (eos || (maxEvents > 0 && eventCounter >= maxEvents)) {
153+
if (eos || (maxEvents > 0 && eventCounter == maxEvents)) {
156154
pc.services().get<ControlService>().endOfStream();
157155
pc.services().get<ControlService>().readyToQuit(QuitRequest::Me);
158156
}

run/o2sim_kine_publisher.cxx

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,7 @@ struct O2simKinePublisher {
4040

4141
void run(o2::framework::ProcessingContext& pc)
4242
{
43-
auto batch = std::min((int)aggregate, nEvents - eventCounter);
44-
for (auto i = 0; i < batch; ++i) {
43+
for (auto i = 0; i < std::min((int)aggregate, nEvents - eventCounter); ++i) {
4544
auto mcevent = mcKinReader->getMCEventHeader(0, eventCounter);
4645
auto mctracks = mcKinReader->getTracks(0, eventCounter);
4746
pc.outputs().snapshot(Output{"MC", "MCHEADER", 0}, mcevent);

run/o2sim_mctracks_to_aod.cxx

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ struct MctracksToAod {
7070
/** Run the conversion */
7171
void run(o2::framework::ProcessingContext& pc)
7272
{
73-
LOG(detail) << "=== Running extended MC AOD exporter ===";
73+
LOG(debug) << "=== Running extended MC AOD exporter ===";
7474
using namespace o2::aodmchelpers;
7575
using McHeader = o2::dataformats::MCEventHeader;
7676
using McTrack = o2::MCTrack;
@@ -94,26 +94,26 @@ struct MctracksToAod {
9494
// TODO: include BC simulation
9595
auto bcCounter = 0UL;
9696
size_t offset = 0;
97-
LOG(detail) << "--- Loop over " << nParts << " parts ---";
97+
LOG(debug) << "--- Loop over " << nParts << " parts ---";
9898
for (auto i = 0U; i < nParts; ++i) {
9999
auto record = mSampler.generateCollisionTime();
100100
auto header = pc.inputs().get<McHeader*>("mcheader", i);
101101
auto tracks = pc.inputs().get<McTracks>("mctracks", i);
102102

103-
LOG(detail) << "Updating collision table";
103+
LOG(debug) << "Updating collision table";
104104
auto genID = updateMCCollisions(mCollisions.cursor,
105105
bcCounter,
106106
record.timeInBCNS * 1.e-3,
107107
*header,
108108
0,
109109
i);
110110

111-
LOG(detail) << "Updating HepMC tables";
111+
LOG(debug) << "Updating HepMC tables";
112112
updateHepMCXSection(mXSections.cursor, bcCounter, genID, *header);
113113
updateHepMCPdfInfo(mPdfInfos.cursor, bcCounter, genID, *header);
114114
updateHepMCHeavyIon(mHeavyIons.cursor, bcCounter, genID, *header);
115115

116-
LOG(detail) << "Updating particles table";
116+
LOG(debug) << "Updating particles table";
117117
TrackToIndex preselect;
118118
offset = updateParticles(mParticles.cursor,
119119
bcCounter,
@@ -123,7 +123,7 @@ struct MctracksToAod {
123123
(bool)filt,
124124
false);
125125

126-
LOG(detail) << "Increment BC counter";
126+
LOG(debug) << "Increment BC counter";
127127
bcCounter++;
128128
}
129129

0 commit comments

Comments
 (0)