diff --git a/Detectors/CTF/workflow/src/CTFReaderSpec.cxx b/Detectors/CTF/workflow/src/CTFReaderSpec.cxx index 9b16e65c3a2b7..70bb589e8836a 100644 --- a/Detectors/CTF/workflow/src/CTFReaderSpec.cxx +++ b/Detectors/CTF/workflow/src/CTFReaderSpec.cxx @@ -148,6 +148,10 @@ void CTFReaderSpec::init(InitContext& ic) mUseLocalTFCounter = ic.options().get("local-tf-counter"); mImposeRunStartMS = ic.options().get("impose-run-start-timstamp"); mInput.checkTFLimitBeforeReading = ic.options().get("limit-tf-before-reading"); + mInput.maxTFs = ic.options().get("max-tf"); + mInput.maxTFs = mInput.maxTFs > 0 ? mInput.maxTFs : 0x7fffffff; + mInput.maxTFsPerFile = ic.options().get("max-tf-per-file"); + mInput.maxTFsPerFile = mInput.maxTFsPerFile > 0 ? mInput.maxTFsPerFile : 0x7fffffff; mRunning = true; mFileFetcher = std::make_unique(mInput.inpdata, mInput.tffileRegex, mInput.remoteRegex, mInput.copyCmd); mFileFetcher->setMaxFilesInQueue(mInput.maxFileCache); @@ -474,6 +478,9 @@ DataProcessorSpec getCTFReaderSpec(const CTFReaderInp& inp) options.emplace_back(ConfigParamSpec{"local-tf-counter", VariantType::Bool, false, {"reassign header.tfCounter from local TF counter"}}); options.emplace_back(ConfigParamSpec{"fetch-failure-threshold", VariantType::Float, 0.f, {"Fail if too many failures( >0: fraction, <0: abs number, 0: no threshold)"}}); options.emplace_back(ConfigParamSpec{"limit-tf-before-reading", VariantType::Bool, false, {"Check TF limiting before reading new TF, otherwhise before injecting it"}}); + options.emplace_back(ConfigParamSpec{"max-tf", VariantType::Int, -1, {"max CTFs to process (<= 0 : infinite)"}}); + options.emplace_back(ConfigParamSpec{"max-tf-per-file", VariantType::Int, -1, {"max TFs to process per ctf file (<= 0 : infinite)"}}); + if (!inp.metricChannel.empty()) { options.emplace_back(ConfigParamSpec{"channel-config", VariantType::String, inp.metricChannel, {"Out-of-band channel config for TF throttling"}}); } diff --git a/Detectors/CTF/workflow/src/ctf-reader-workflow.cxx b/Detectors/CTF/workflow/src/ctf-reader-workflow.cxx index a12c9c10f9dd8..90d259f4e3a5c 100644 --- a/Detectors/CTF/workflow/src/ctf-reader-workflow.cxx +++ b/Detectors/CTF/workflow/src/ctf-reader-workflow.cxx @@ -54,8 +54,6 @@ void customize(std::vector& workflowOptions) options.push_back(ConfigParamSpec{"ctf-input", VariantType::String, "none", {"comma-separated list CTF input files"}}); options.push_back(ConfigParamSpec{"onlyDet", VariantType::String, std::string{DetID::ALL}, {"comma-separated list of detectors to accept. Overrides skipDet"}}); options.push_back(ConfigParamSpec{"skipDet", VariantType::String, std::string{DetID::NONE}, {"comma-separate list of detectors to skip"}}); - options.push_back(ConfigParamSpec{"max-tf", VariantType::Int, -1, {"max CTFs to process (<= 0 : infinite)"}}); - options.push_back(ConfigParamSpec{"max-tf-per-file", VariantType::Int, -1, {"max TFs to process per ctf file (<= 0 : infinite)"}}); options.push_back(ConfigParamSpec{"loop", VariantType::Int, 0, {"loop N times (infinite for N<0)"}}); options.push_back(ConfigParamSpec{"delay", VariantType::Float, 0.f, {"delay in seconds between consecutive TFs sending"}}); options.push_back(ConfigParamSpec{"copy-cmd", VariantType::String, "alien_cp ?src file://?dst", {"copy command for remote files or no-copy to avoid copying"}}); // Use "XrdSecPROTOCOL=sss,unix xrdcp -N root://eosaliceo2.cern.ch/?src ?dst" for direct EOS access @@ -117,11 +115,6 @@ WorkflowSpec defineDataProcessing(ConfigContext const& configcontext) if (ctfInput.delay_us < 0) { ctfInput.delay_us = 0; } - int n = configcontext.options().get("max-tf"); - ctfInput.maxTFs = n > 0 ? n : 0x7fffffff; - - n = configcontext.options().get("max-tf-per-file"); - ctfInput.maxTFsPerFile = n > 0 ? n : 0x7fffffff; ctfInput.maxFileCache = std::max(1, configcontext.options().get("max-cached-files")); diff --git a/Detectors/Raw/TFReaderDD/src/TFReaderSpec.cxx b/Detectors/Raw/TFReaderDD/src/TFReaderSpec.cxx index 594d26b5682c6..58a2a775537d4 100644 --- a/Detectors/Raw/TFReaderDD/src/TFReaderSpec.cxx +++ b/Detectors/Raw/TFReaderDD/src/TFReaderSpec.cxx @@ -99,6 +99,12 @@ TFReaderSpec::TFReaderSpec(const TFReaderInp& rinp) : mInput(rinp) void TFReaderSpec::init(o2f::InitContext& ic) { mInput.tfIDs = o2::RangeTokenizer::tokenize(ic.options().get("select-tf-ids")); + mInput.maxTFs = ic.options().get("max-tf"); + mInput.maxTFs = mInput.maxTFs > 0 ? mInput.maxTFs : 0x7fffffff; + mInput.maxTFsPerFile = ic.options().get("max-tf-per-file"); + mInput.maxTFsPerFile = mInput.maxTFsPerFile > 0 ? mInput.maxTFsPerFile : 0x7fffffff; + mInput.maxTFCache = std::max(1, ic.options().get("max-cached-tf")); + mInput.maxFileCache = std::max(1, ic.options().get("max-cached-files")); mFileFetcher = std::make_unique(mInput.inpdata, mInput.tffileRegex, mInput.remoteRegex, mInput.copyCmd); mFileFetcher->setMaxFilesInQueue(mInput.maxFileCache); mFileFetcher->setMaxLoops(mInput.maxLoops); @@ -417,15 +423,17 @@ void TFReaderSpec::TFBuilder() } mTFBuilderCounter++; } - if (!acceptTF) { - continue; - } if (mRunning && tf) { - mWaitSendingLast = true; - mTFQueue.push(std::move(tf)); + if (acceptTF) { + mWaitSendingLast = true; + mTFQueue.push(std::move(tf)); + } } else { break; } + if (mInput.maxTFsPerFile > 0 && mInput.maxTFsPerFile >= locID) { // go to next file + break; + } } // remove already processed file from the queue, unless they are needed for further looping if (mFileFetcher) { @@ -527,6 +535,11 @@ o2f::DataProcessorSpec o2::rawdd::getTFReaderSpec(o2::rawdd::TFReaderInp& rinp) } spec.options.emplace_back(o2f::ConfigParamSpec{"select-tf-ids", o2f::VariantType::String, "", {"comma-separated list TF IDs to inject (from cumulative counter of TFs seen)"}}); spec.options.emplace_back(o2f::ConfigParamSpec{"fetch-failure-threshold", o2f::VariantType::Float, 0.f, {"Fatil if too many failures( >0: fraction, <0: abs number, 0: no threshold)"}}); + spec.options.emplace_back(o2f::ConfigParamSpec{"max-tf", o2f::VariantType::Int, -1, {"max TF ID to process (<= 0 : infinite)"}}); + spec.options.emplace_back(o2f::ConfigParamSpec{"max-tf-per-file", o2f::VariantType::Int, -1, {"max TFs to process per raw-tf file (<= 0 : infinite)"}}); + spec.options.emplace_back(o2f::ConfigParamSpec{"max-cached-tf", o2f::VariantType::Int, 3, {"max TFs to cache in memory"}}); + spec.options.emplace_back(o2f::ConfigParamSpec{"max-cached-files", o2f::VariantType::Int, 3, {"max TF files queued (copied for remote source)"}}); + spec.algorithm = o2f::adaptFromTask(rinp); return spec; diff --git a/Detectors/Raw/TFReaderDD/src/TFReaderSpec.h b/Detectors/Raw/TFReaderDD/src/TFReaderSpec.h index b4bb07fad24be..e3a5b5c920010 100644 --- a/Detectors/Raw/TFReaderDD/src/TFReaderSpec.h +++ b/Detectors/Raw/TFReaderDD/src/TFReaderSpec.h @@ -43,6 +43,7 @@ struct TFReaderInp { int64_t delay_us = 0; int maxLoops = 0; int maxTFs = -1; + int maxTFsPerFile = -1; bool sendDummyForMissing = true; bool sup0xccdb = false; std::vector hdVec; diff --git a/Detectors/Raw/TFReaderDD/src/tf-reader-workflow.cxx b/Detectors/Raw/TFReaderDD/src/tf-reader-workflow.cxx index c468d1660fcc7..7d8ee09fe474f 100644 --- a/Detectors/Raw/TFReaderDD/src/tf-reader-workflow.cxx +++ b/Detectors/Raw/TFReaderDD/src/tf-reader-workflow.cxx @@ -28,20 +28,16 @@ void customize(std::vector& workflowOptions) options.push_back(ConfigParamSpec{"onlyDet", VariantType::String, "all", {"list of dectors"}}); options.push_back(ConfigParamSpec{"raw-only-det", VariantType::String, "none", {"do not open non-raw channel for these detectors"}}); options.push_back(ConfigParamSpec{"non-raw-only-det", VariantType::String, "none", {"do not open raw channel for these detectors"}}); - options.push_back(ConfigParamSpec{"max-tf", VariantType::Int, -1, {"max TF ID to process (<= 0 : infinite)"}}); options.push_back(ConfigParamSpec{"loop", VariantType::Int, 0, {"loop N times (-1 = infinite)"}}); options.push_back(ConfigParamSpec{"delay", VariantType::Float, 0.f, {"delay in seconds between consecutive TFs sending"}}); options.push_back(ConfigParamSpec{"copy-cmd", VariantType::String, "alien_cp ?src file://?dst", {"copy command for remote files"}}); // Use "XrdSecPROTOCOL=sss,unix xrdcp -N root://eosaliceo2.cern.ch/?src ?dst" for direct EOS access options.push_back(ConfigParamSpec{"tf-file-regex", VariantType::String, ".+\\.tf$", {"regex string to identify TF files"}}); options.push_back(ConfigParamSpec{"remote-regex", VariantType::String, "^(alien://|)/alice/data/.+", {"regex string to identify remote files"}}); // Use "^/eos/aliceo2/.+" for direct EOS access - options.push_back(ConfigParamSpec{"max-cached-tf", VariantType::Int, 3, {"max TFs to cache in memory"}}); - options.push_back(ConfigParamSpec{"max-cached-files", VariantType::Int, 3, {"max TF files queued (copied for remote source)"}}); options.push_back(ConfigParamSpec{"tf-reader-verbosity", VariantType::Int, 0, {"verbosity level (1 or 2: check RDH, print DH/DPH for 1st or all slices, >2 print RDH)"}}); options.push_back(ConfigParamSpec{"raw-channel-config", VariantType::String, "", {"optional raw FMQ channel for non-DPL output"}}); options.push_back(ConfigParamSpec{"send-diststf-0xccdb", VariantType::Bool, false, {"send explicit FLP/DISTSUBTIMEFRAME/0xccdb output"}}); options.push_back(ConfigParamSpec{"disable-dummy-output", VariantType::Bool, false, {"Disable sending empty output if corresponding data is not found in the data"}}); options.push_back(ConfigParamSpec{"configKeyValues", VariantType::String, "", {"semicolon separated key=value strings"}}); - options.push_back(ConfigParamSpec{"timeframes-shm-limit", VariantType::String, "0", {"Minimum amount of SHM required in order to publish data"}}); options.push_back(ConfigParamSpec{"metric-feedback-channel-format", VariantType::String, "name=metric-feedback,type=pull,method=connect,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0", {"format for the metric-feedback channel for TF rate limiting"}}); @@ -59,8 +55,6 @@ WorkflowSpec defineDataProcessing(ConfigContext const& configcontext) o2::rawdd::TFReaderInp rinp; rinp.inpdata = configcontext.options().get("input-data"); rinp.maxLoops = configcontext.options().get("loop"); - int n = configcontext.options().get("max-tf"); - rinp.maxTFs = n > 0 ? n : 0x7fffffff; auto detlistSelect = configcontext.options().get("onlyDet"); if (detlistSelect == "all") { // Exclude FOCAL from default detlist (must be selected on request) @@ -74,8 +68,6 @@ WorkflowSpec defineDataProcessing(ConfigContext const& configcontext) rinp.rawChannelConfig = configcontext.options().get("raw-channel-config"); rinp.delay_us = uint64_t(1e6 * configcontext.options().get("delay")); // delay in microseconds rinp.verbosity = configcontext.options().get("tf-reader-verbosity"); - rinp.maxTFCache = std::max(1, configcontext.options().get("max-cached-tf")); - rinp.maxFileCache = std::max(1, configcontext.options().get("max-cached-files")); rinp.copyCmd = configcontext.options().get("copy-cmd"); rinp.tffileRegex = configcontext.options().get("tf-file-regex"); rinp.remoteRegex = configcontext.options().get("remote-regex");