From f36998f950d294d05aa9416a82c308d0a1fac1dc Mon Sep 17 00:00:00 2001 From: Martin Zink Date: Wed, 1 Apr 2026 13:03:10 +0200 Subject: [PATCH 1/5] MINIFICPP-2764 Controller Service CPP API --- .../api/core/ControllerServiceContext.h | 41 ++++++++++++ .../include/api/core/ControllerServiceImpl.h | 61 +++++++++++++++++ .../include/api/core/ProcessContext.h | 1 + .../include/api/core/ProcessorImpl.h | 7 +- .../include/api/core/Resource.h | 66 +++++++++++++++---- .../include/api/utils/ProcessorConfigUtils.h | 24 +++++++ .../libtest/CProcessorTestUtils.h | 30 +++++++-- .../src/core/ControllerServiceContext.cpp | 40 +++++++++++ .../src/core/ControllerServiceImpl.cpp | 65 ++++++++++++++++++ .../src/core/ProcessContext.cpp | 13 ++++ libminifi/include/utils/CControllerService.h | 9 ++- libminifi/src/minifi-c.cpp | 2 +- libminifi/test/libtest/unit/TestBase.cpp | 64 +++++++++++------- libminifi/test/libtest/unit/TestBase.h | 3 +- 14 files changed, 376 insertions(+), 50 deletions(-) create mode 100644 extension-framework/cpp-extension-lib/include/api/core/ControllerServiceContext.h create mode 100644 extension-framework/cpp-extension-lib/include/api/core/ControllerServiceImpl.h create mode 100644 extension-framework/cpp-extension-lib/src/core/ControllerServiceContext.cpp create mode 100644 extension-framework/cpp-extension-lib/src/core/ControllerServiceImpl.cpp diff --git a/extension-framework/cpp-extension-lib/include/api/core/ControllerServiceContext.h b/extension-framework/cpp-extension-lib/include/api/core/ControllerServiceContext.h new file mode 100644 index 0000000000..b30764304b --- /dev/null +++ b/extension-framework/cpp-extension-lib/include/api/core/ControllerServiceContext.h @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "minifi-c.h" +#include "minifi-cpp/core/PropertyDefinition.h" +#include "nonstd/expected.hpp" + +namespace org::apache::nifi::minifi::api::core { + +class ControllerServiceContext { + public: + explicit ControllerServiceContext(MinifiControllerServiceContext* impl) : impl_(impl) {} + + nonstd::expected getProperty(std::string_view name) const; + nonstd::expected getProperty(const minifi::core::PropertyReference& property_reference) const { + return getProperty(property_reference.name); + } + + private: + MinifiControllerServiceContext* impl_; +}; + +} // namespace org::apache::nifi::minifi::api::core diff --git a/extension-framework/cpp-extension-lib/include/api/core/ControllerServiceImpl.h b/extension-framework/cpp-extension-lib/include/api/core/ControllerServiceImpl.h new file mode 100644 index 0000000000..77fe1e3427 --- /dev/null +++ b/extension-framework/cpp-extension-lib/include/api/core/ControllerServiceImpl.h @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include + +#include "ControllerServiceContext.h" +#include "minifi-cpp/core/ControllerServiceMetadata.h" +#include "minifi-cpp/utils/Id.h" +#include "utils/SmallString.h" + +namespace org::apache::nifi::minifi::api { + +class Connection; + +namespace core { + +class ControllerServiceImpl { + public: + explicit ControllerServiceImpl(minifi::core::ControllerServiceMetadata metadata); + + ControllerServiceImpl(const ControllerServiceImpl&) = delete; + ControllerServiceImpl(ControllerServiceImpl&&) = delete; + ControllerServiceImpl& operator=(const ControllerServiceImpl&) = delete; + ControllerServiceImpl& operator=(ControllerServiceImpl&&) = delete; + + virtual ~ControllerServiceImpl(); + + MinifiStatus enable(ControllerServiceContext&); + void notifyStop(); + + [[nodiscard]] std::string getName() const; + [[nodiscard]] minifi::utils::Identifier getUUID() const; + [[nodiscard]] minifi::utils::SmallString<36> getUUIDStr() const; + + protected: + virtual MinifiStatus enableImpl(api::core::ControllerServiceContext&) = 0; + virtual void notifyStopImpl() {} + + minifi::core::ControllerServiceMetadata metadata_; + + std::shared_ptr logger_; +}; + +} // namespace core +} // namespace org::apache::nifi::minifi::api diff --git a/extension-framework/cpp-extension-lib/include/api/core/ProcessContext.h b/extension-framework/cpp-extension-lib/include/api/core/ProcessContext.h index 442f9bbf46..cb0a123325 100644 --- a/extension-framework/cpp-extension-lib/include/api/core/ProcessContext.h +++ b/extension-framework/cpp-extension-lib/include/api/core/ProcessContext.h @@ -34,6 +34,7 @@ class ProcessContext { std::expected getProperty(const minifi::core::PropertyReference& property_reference, const FlowFile* flow_file = nullptr) const { return getProperty(property_reference.name, flow_file); } + nonstd::expected getControllerService(std::string_view controller_service_name, std::string_view controller_service_class) const; bool hasNonEmptyProperty(std::string_view name) const; diff --git a/extension-framework/cpp-extension-lib/include/api/core/ProcessorImpl.h b/extension-framework/cpp-extension-lib/include/api/core/ProcessorImpl.h index a5753b2d3f..43200f58df 100644 --- a/extension-framework/cpp-extension-lib/include/api/core/ProcessorImpl.h +++ b/extension-framework/cpp-extension-lib/include/api/core/ProcessorImpl.h @@ -17,9 +17,7 @@ #pragma once #include -#include #include -#include #include #include @@ -53,7 +51,7 @@ class ProcessorImpl { virtual ~ProcessorImpl(); - void setTriggerWhenEmpty(bool trigger_when_empty) { + void setTriggerWhenEmpty(const bool trigger_when_empty) { trigger_when_empty_ = trigger_when_empty; } @@ -89,9 +87,6 @@ class ProcessorImpl { std::atomic trigger_when_empty_; std::shared_ptr logger_; - - private: - mutable std::mutex mutex_; }; } // namespace core diff --git a/extension-framework/cpp-extension-lib/include/api/core/Resource.h b/extension-framework/cpp-extension-lib/include/api/core/Resource.h index 292a996398..c9777330bc 100644 --- a/extension-framework/cpp-extension-lib/include/api/core/Resource.h +++ b/extension-framework/cpp-extension-lib/include/api/core/Resource.h @@ -26,14 +26,16 @@ #define WIN32_LEAN_AND_MEAN 1 #endif -#include "minifi-c.h" -#include "core/ClassName.h" -#include "api/utils/minifi-c-utils.h" +#include "ControllerServiceContext.h" +#include "FlowFile.h" #include "ProcessContext.h" #include "ProcessSession.h" -#include "FlowFile.h" -#include "minifi-cpp/core/ProcessorMetadata.h" +#include "api/utils/minifi-c-utils.h" +#include "core/ClassName.h" #include "logging/Logger.h" +#include "minifi-c.h" +#include "minifi-cpp/core/ControllerServiceMetadata.h" +#include "minifi-cpp/core/ProcessorMetadata.h" namespace org::apache::nifi::minifi::api::core { @@ -94,11 +96,12 @@ void useProcessorClassDescription(Fn&& fn) { .callbacks = MinifiProcessorCallbacks{ .create = [] (MinifiProcessorMetadata metadata) -> MINIFI_OWNED void* { - return new Class{minifi::core::ProcessorMetadata{ - .uuid = minifi::utils::Identifier::parse(std::string{metadata.uuid.data, metadata.uuid.length}).value(), - .name = std::string{metadata.name.data, metadata.name.length}, - .logger = std::make_shared(metadata.logger) - }}; + try { + return new Class{minifi::core::ProcessorMetadata{ + .uuid = minifi::utils::Identifier::parse(std::string{metadata.uuid.data, metadata.uuid.length}).value(), + .name = std::string{metadata.name.data, metadata.name.length}, + .logger = std::make_shared(metadata.logger)}}; + } catch (...) { return nullptr; } }, .destroy = [] (MINIFI_OWNED void* self) -> void { delete static_cast(self); @@ -127,7 +130,9 @@ void useProcessorClassDescription(Fn&& fn) { } }, .onUnSchedule = [] (void* self) -> void { - static_cast(self)->onUnSchedule(); + try { + static_cast(self)->onUnSchedule(); + } catch (...) {} }, .calculateMetrics = [] (void* self) -> MINIFI_OWNED MinifiPublishedMetrics* { auto metrics = static_cast(self)->calculateMetrics(); @@ -145,4 +150,43 @@ void useProcessorClassDescription(Fn&& fn) { fn(description); } +template +void useControllerServiceClassDescription(Fn&& fn) { + std::vector> string_vector_cache; + + const auto full_name = minifi::core::className(); + + std::vector class_properties = utils::toProperties(Class::Properties, string_vector_cache); + + MinifiControllerServiceClassDefinition description{.full_name = utils::toStringView(full_name), + .description = utils::toStringView(Class::Description), + .class_properties_count = gsl::narrow(class_properties.size()), + .class_properties_ptr = class_properties.data(), + + .callbacks = MinifiControllerServiceCallbacks{ + .create = [](MinifiControllerServiceMetadata metadata) -> MINIFI_OWNED void* { + try { + return new Class{minifi::core::ControllerServiceMetadata{ + .uuid = minifi::utils::Identifier::parse(std::string{metadata.uuid.data, metadata.uuid.length}).value(), + .name = std::string{metadata.name.data, metadata.name.length}, + .logger = std::make_shared(metadata.logger)}}; + } catch (...) { return nullptr; } + }, + .destroy = [](MINIFI_OWNED void* self) -> void { delete static_cast(self); }, + .enable = [](void* self, MinifiControllerServiceContext* context) -> MinifiStatus { + ControllerServiceContext context_wrapper(context); + try { + return static_cast(self)->enable(context_wrapper); + } catch (...) { return MINIFI_STATUS_UNKNOWN_ERROR; } + }, + .notifyStop = [](void* self) -> void { + try { + static_cast(self)->notifyStop(); + } catch (...) {} + }, + }}; + + fn(description); +} + } // namespace org::apache::nifi::minifi::api::core diff --git a/extension-framework/cpp-extension-lib/include/api/utils/ProcessorConfigUtils.h b/extension-framework/cpp-extension-lib/include/api/utils/ProcessorConfigUtils.h index 6a14699d7b..802ae02fc9 100644 --- a/extension-framework/cpp-extension-lib/include/api/utils/ProcessorConfigUtils.h +++ b/extension-framework/cpp-extension-lib/include/api/utils/ProcessorConfigUtils.h @@ -167,4 +167,28 @@ std::optional parseOptionalEnumProperty(const core::ProcessContext& context, return result.value(); } +template +ControllerServiceType* parseOptionalControllerService(const core::ProcessContext& context, const minifi::core::PropertyReference& prop) { + const auto controller_service_name = context.getProperty(prop.name); + if (!controller_service_name || controller_service_name->empty()) { + return nullptr; + } + + nonstd::expected service = context.getControllerService(*controller_service_name, minifi::core::className()); + if (!service) { + return nullptr; + } + + return reinterpret_cast(*service); +} + +template +gsl::not_null parseControllerService(const core::ProcessContext& context, const minifi::core::PropertyReference& prop) { + auto controller_service = parseOptionalControllerService(context, prop); + if (!controller_service) { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Required controller service property '{}' is missing", prop.name)); + } + return gsl::make_not_null(controller_service); +} + } // namespace org::apache::nifi::minifi::api::utils diff --git a/extension-framework/cpp-extension-lib/libtest/CProcessorTestUtils.h b/extension-framework/cpp-extension-lib/libtest/CProcessorTestUtils.h index 142f4709ef..cc61aea1a7 100644 --- a/extension-framework/cpp-extension-lib/libtest/CProcessorTestUtils.h +++ b/extension-framework/cpp-extension-lib/libtest/CProcessorTestUtils.h @@ -1,5 +1,5 @@ /** -* + * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -17,22 +17,40 @@ */ #pragma once +#include "api/core/Resource.h" #include "core/Processor.h" +#include "core/controller/ControllerService.h" +#include "minifi-cpp/core/ControllerServiceMetadata.h" #include "minifi-cpp/core/ProcessorMetadata.h" -#include "api/core/Resource.h" +#include "utils/CControllerService.h" #include "utils/CProcessor.h" +#include "minifi-cpp/agent/agent_docs.h" namespace org::apache::nifi::minifi::test::utils { -template -std::unique_ptr make_custom_c_processor(minifi::core::ProcessorMetadata metadata, Args&&... args) { // NOLINT(cppcoreguidelines-missing-std-forward) +template +std::unique_ptr make_custom_c_processor(minifi::core::ProcessorMetadata metadata, + Args&&... args) { // NOLINT(cppcoreguidelines-missing-std-forward) std::unique_ptr processor_impl; - minifi::api::core::useProcessorClassDescription([&] (const MinifiProcessorClassDefinition& description) { - minifi::utils::useCProcessorClassDescription(description, [&] (const auto&, auto c_description) { + minifi::api::core::useProcessorClassDescription([&](const MinifiProcessorClassDefinition& description) { + minifi::utils::useCProcessorClassDescription(description, [&](const auto&, auto c_description) { processor_impl = std::make_unique(std::move(c_description), metadata, new T(metadata, std::forward(args)...)); }); }); return std::make_unique(metadata.name, metadata.uuid, std::move(processor_impl)); } +template +std::shared_ptr make_custom_c_controller_service(minifi::core::ControllerServiceMetadata metadata, + Args&&... args) { // NOLINT(cppcoreguidelines-missing-std-forward) + std::unique_ptr controller_service_impl; + minifi::api::core::useControllerServiceClassDescription([&](const MinifiControllerServiceClassDefinition& description) { + minifi::utils::useCControllerServiceClassDescription(description, [&](const auto&, auto c_description) { + controller_service_impl = std::make_unique(std::move(c_description), + metadata, + new T(metadata, std::forward(args)...)); + }); + }); + return std::make_shared(metadata.name, metadata.uuid, std::move(controller_service_impl)); +} } // namespace org::apache::nifi::minifi::test::utils diff --git a/extension-framework/cpp-extension-lib/src/core/ControllerServiceContext.cpp b/extension-framework/cpp-extension-lib/src/core/ControllerServiceContext.cpp new file mode 100644 index 0000000000..a3b9254835 --- /dev/null +++ b/extension-framework/cpp-extension-lib/src/core/ControllerServiceContext.cpp @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "api/core/ControllerServiceContext.h" +#include "api/utils/minifi-c-utils.h" + +namespace org::apache::nifi::minifi::api::core { + +nonstd::expected ControllerServiceContext::getProperty(const std::string_view name) const { + std::optional value = std::nullopt; + const MinifiStatus status = MinifiControllerServiceContextGetProperty(impl_, utils::toStringView(name), + [] (void* data, const MinifiStringView result) { + (*static_cast*>(data)) = std::string(result.data, result.length); + }, &value); + + if (status != MINIFI_STATUS_SUCCESS) { + return nonstd::make_unexpected(utils::make_error_code(status)); + } + + if (!value) { + return nonstd::make_unexpected(utils::make_error_code(MINIFI_STATUS_UNKNOWN_ERROR)); + } + return value.value(); +} + +} // namespace org::apache::nifi::minifi::api::core diff --git a/extension-framework/cpp-extension-lib/src/core/ControllerServiceImpl.cpp b/extension-framework/cpp-extension-lib/src/core/ControllerServiceImpl.cpp new file mode 100644 index 0000000000..b17dd77424 --- /dev/null +++ b/extension-framework/cpp-extension-lib/src/core/ControllerServiceImpl.cpp @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "api/core/ControllerServiceImpl.h" + +#include +#include + +#include "fmt/format.h" +namespace org::apache::nifi::minifi::api::core { + +ControllerServiceImpl::ControllerServiceImpl(minifi::core::ControllerServiceMetadata metadata) + : metadata_(std::move(metadata)), + logger_(metadata_.logger) { + logger_->log_debug("ControllerService {} created with uuid {}", getName(), getUUIDStr()); +} + +ControllerServiceImpl::~ControllerServiceImpl() { + logger_->log_debug("Destroying controller service {} with uuid {}", getName(), getUUIDStr()); +} + +MinifiStatus ControllerServiceImpl::enable(ControllerServiceContext& ctx) { + try { + return enableImpl(ctx); + } catch (const std::exception& e) { + logger_->log_error("{}", e.what()); + throw; + } +} + +void ControllerServiceImpl::notifyStop() { + try { + notifyStopImpl(); + } catch (const std::exception& e) { + logger_->log_error("{}", e.what()); + throw; + } +} + +std::string ControllerServiceImpl::getName() const { + return metadata_.name; +} + +utils::Identifier ControllerServiceImpl::getUUID() const { + return metadata_.uuid; +} + +utils::SmallString<36> ControllerServiceImpl::getUUIDStr() const { + return getUUID().to_string(); +} + +} // namespace org::apache::nifi::minifi::api::core diff --git a/extension-framework/cpp-extension-lib/src/core/ProcessContext.cpp b/extension-framework/cpp-extension-lib/src/core/ProcessContext.cpp index 958274d7d5..7f164d3f3c 100644 --- a/extension-framework/cpp-extension-lib/src/core/ProcessContext.cpp +++ b/extension-framework/cpp-extension-lib/src/core/ProcessContext.cpp @@ -38,4 +38,17 @@ bool ProcessContext::hasNonEmptyProperty(std::string_view name) const { return MinifiProcessContextHasNonEmptyProperty(impl_, utils::toStringView(name)); } +nonstd::expected ProcessContext::getControllerService(const std::string_view controller_service_name, + const std::string_view controller_service_class) const { + void* controller_service = nullptr; + if (const MinifiStatus status = MinifiProcessContextGetControllerService(impl_, + utils::toStringView(controller_service_name), + utils::toStringView(controller_service_class), + &controller_service); + status != MINIFI_STATUS_SUCCESS) { + return nonstd::make_unexpected(utils::make_error_code(status)); + } + return static_cast(controller_service); +} + } // namespace org::apache::nifi::minifi::api::core diff --git a/libminifi/include/utils/CControllerService.h b/libminifi/include/utils/CControllerService.h index 9dea54dbef..6e000548df 100644 --- a/libminifi/include/utils/CControllerService.h +++ b/libminifi/include/utils/CControllerService.h @@ -26,8 +26,12 @@ #include "minifi-cpp/core/Property.h" #include "minifi-cpp/core/controller/ControllerServiceApi.h" +namespace org::apache::nifi::minifi { +struct ClassDescription; +} + namespace org::apache::nifi::minifi::utils { -class CControllerService; + struct CControllerServiceClassDescription { @@ -108,7 +112,6 @@ class CControllerService final : public core::controller::ControllerServiceApi, }; void useCControllerServiceClassDescription(const MinifiControllerServiceClassDefinition& class_description, - const BundleIdentifier& bundle_id, - const std::function& fn); + const std::function& fn); } // namespace org::apache::nifi::minifi::utils diff --git a/libminifi/src/minifi-c.cpp b/libminifi/src/minifi-c.cpp index 90ccb969b7..54a3dba8fb 100644 --- a/libminifi/src/minifi-c.cpp +++ b/libminifi/src/minifi-c.cpp @@ -255,7 +255,7 @@ void useCProcessorClassDescription(const MinifiProcessorClassDefinition& class_d } void useCControllerServiceClassDescription(const MinifiControllerServiceClassDefinition& class_description, - const std::function& fn) { + const std::function& fn) { std::vector properties; properties.reserve(class_description.class_properties_count); for (size_t i = 0; i < class_description.class_properties_count; ++i) { diff --git a/libminifi/test/libtest/unit/TestBase.cpp b/libminifi/test/libtest/unit/TestBase.cpp index fad99f2c43..23a967ae9a 100644 --- a/libminifi/test/libtest/unit/TestBase.cpp +++ b/libminifi/test/libtest/unit/TestBase.cpp @@ -24,30 +24,30 @@ #include #include -#include "core/Processor.h" +#include "Connection.h" +#include "LogUtils.h" +#include "ProvenanceTestHelper.h" +#include "ResourceClaim.h" +#include "TestUtils.h" #include "core/ProcessContextImpl.h" -#include "minifi-cpp/core/PropertyDefinition.h" +#include "core/ProcessSessionFactory.h" +#include "core/Processor.h" +#include "core/controller/StandardControllerServiceNode.h" +#include "core/controller/StandardControllerServiceProvider.h" #include "core/logging/LoggerConfiguration.h" #include "core/state/nodes/FlowInformation.h" -#include "core/controller/StandardControllerServiceProvider.h" -#include "ProvenanceTestHelper.h" +#include "fmt/format.h" +#include "io/StreamPipe.h" +#include "minifi-cpp/core/ProcessContext.h" +#include "minifi-cpp/core/PropertyDefinition.h" +#include "spdlog/sinks/dist_sink.h" +#include "spdlog/sinks/ostream_sink.h" +#include "spdlog/sinks/stdout_sinks.h" #include "utils/ClassUtils.h" -#include "TestUtils.h" +#include "utils/GeneralUtils.h" #include "utils/Id.h" #include "utils/StringUtils.h" #include "utils/span.h" -#include "LogUtils.h" -#include "utils/GeneralUtils.h" -#include "Connection.h" -#include "minifi-cpp/core/ProcessContext.h" -#include "core/ProcessSessionFactory.h" -#include "ResourceClaim.h" -#include "io/StreamPipe.h" - -#include "fmt/format.h" -#include "spdlog/sinks/stdout_sinks.h" -#include "spdlog/sinks/ostream_sink.h" -#include "spdlog/sinks/dist_sink.h" std::shared_ptr LogTestController::getInstance(const std::shared_ptr& logger_properties) { static std::map, std::shared_ptr> map; @@ -360,6 +360,27 @@ minifi::Connection* TestPlan::addConnection(minifi::core::Processor* source_proc return retVal; } +std::shared_ptr TestPlan::addController(const std::string& name, const std::shared_ptr& cs) { + if (finalized) { + return nullptr; + } + std::lock_guard guard(mutex); + + std::shared_ptr controller_service_node = std::make_shared(cs, name, configuration_); + + controller_service_nodes_.push_back(controller_service_node); + + minifi::utils::Identifier uuid = minifi::utils::IdGenerator::getIdGenerator()->generate(); + + controller_service_node->initialize(); + controller_service_node->setUUID(uuid); + + root_process_group_->addControllerService(name, controller_service_node, uuid.to_string()); + controller_services_provider_->putControllerServiceNode(name, controller_service_node, root_process_group_.get(), uuid.to_string()); + + return controller_service_node; +} + std::shared_ptr TestPlan::addController(const std::string &controller_name, const std::string &name) { if (finalized) { return nullptr; @@ -486,7 +507,7 @@ std::shared_ptr TestPlan::getProcessContextForProc } void TestPlan::scheduleProcessor(minifi::core::Processor* processor, const std::shared_ptr& context) { - if (std::find(configured_processors_.begin(), configured_processors_.end(), processor) == configured_processors_.end()) { + if (std::ranges::find(configured_processors_, processor) == configured_processors_.end()) { // Ordering on factories and list of configured processors do not matter const auto factory = std::make_shared(context); factories_.push_back(factory); @@ -507,7 +528,7 @@ void TestPlan::scheduleProcessors() { } } -bool TestPlan::runProcessor(minifi::core::Processor* processor, const PreTriggerVerifier& verify) { +bool TestPlan::runProcessor(const minifi::core::Processor* processor, const PreTriggerVerifier& verify) { const auto processor_location = gsl::narrow(std::distance(processor_queue_.begin(), getProcessorItByUuid(processor->getUUIDStr()))); return runProcessor(processor_location, verify); } @@ -578,7 +599,7 @@ bool TestPlan::runCurrentProcessorUntilFlowfileIsProduced(std::chrono::milliseco const auto isFlowFileProduced = [&] { runCurrentProcessor(); const std::vector connections = getProcessorOutboundConnections(processor_queue_.at(location)); - return std::any_of(connections.cbegin(), connections.cend(), [] (const minifi::Connection* conn) { return !conn->isEmpty(); }); + return std::ranges::any_of(connections, [] (const minifi::Connection* conn) { return !conn->isEmpty(); }); }; return verifyEventHappenedInPollTime(wait_duration, isFlowFileProduced); } @@ -602,8 +623,7 @@ std::shared_ptr TestPlan::getFlowFileProducedByCurrentPr std::vector connections = getProcessorOutboundConnections(processor); for (auto connection : connections) { std::set> expiredFlowRecords; - std::shared_ptr flowfile = connection->poll(expiredFlowRecords); - if (flowfile) { + if (std::shared_ptr flowfile = connection->poll(expiredFlowRecords)) { return flowfile; } if (expiredFlowRecords.empty()) { diff --git a/libminifi/test/libtest/unit/TestBase.h b/libminifi/test/libtest/unit/TestBase.h index f3b85eb633..9e55571944 100644 --- a/libminifi/test/libtest/unit/TestBase.h +++ b/libminifi/test/libtest/unit/TestBase.h @@ -275,6 +275,7 @@ class TestPlan { minifi::Connection* addConnection(minifi::core::Processor* source_proc, const minifi::core::Relationship& source_relationship, minifi::core::Processor* destination_proc); + std::shared_ptr addController(const std::string& name, const std::shared_ptr& cs); std::shared_ptr addController(const std::string &controller_name, const std::string &name); bool setProperty(minifi::core::Processor* processor, const core::PropertyReference& property, std::string_view value); @@ -298,7 +299,7 @@ class TestPlan { void scheduleProcessor(minifi::core::Processor* processor); void scheduleProcessors(); - bool runProcessor(minifi::core::Processor* processor, const PreTriggerVerifier& verify = nullptr); + bool runProcessor(const minifi::core::Processor* processor, const PreTriggerVerifier& verify = nullptr); bool runProcessor(size_t target_location, const PreTriggerVerifier& verify = nullptr); bool runNextProcessor(const PreTriggerVerifier& verify = nullptr); bool runCurrentProcessor(); From d40b59ee748fffdc6281760c72d2afd09cc458f7 Mon Sep 17 00:00:00 2001 From: Martin Zink Date: Wed, 29 Apr 2026 09:21:49 +0200 Subject: [PATCH 2/5] rebase fix --- .../include/api/core/ControllerServiceContext.h | 6 +++--- .../cpp-extension-lib/include/api/core/ProcessContext.h | 4 ++-- .../include/api/utils/ProcessorConfigUtils.h | 2 +- .../src/core/ControllerServiceContext.cpp | 6 +++--- .../cpp-extension-lib/src/core/ProcessContext.cpp | 8 ++++---- 5 files changed, 13 insertions(+), 13 deletions(-) diff --git a/extension-framework/cpp-extension-lib/include/api/core/ControllerServiceContext.h b/extension-framework/cpp-extension-lib/include/api/core/ControllerServiceContext.h index b30764304b..561bef0ace 100644 --- a/extension-framework/cpp-extension-lib/include/api/core/ControllerServiceContext.h +++ b/extension-framework/cpp-extension-lib/include/api/core/ControllerServiceContext.h @@ -18,10 +18,10 @@ #pragma once #include +#include #include "minifi-c.h" #include "minifi-cpp/core/PropertyDefinition.h" -#include "nonstd/expected.hpp" namespace org::apache::nifi::minifi::api::core { @@ -29,8 +29,8 @@ class ControllerServiceContext { public: explicit ControllerServiceContext(MinifiControllerServiceContext* impl) : impl_(impl) {} - nonstd::expected getProperty(std::string_view name) const; - nonstd::expected getProperty(const minifi::core::PropertyReference& property_reference) const { + [[nodiscard]] std::expected getProperty(std::string_view name) const; + [[nodiscard]] std::expected getProperty(const minifi::core::PropertyReference& property_reference) const { return getProperty(property_reference.name); } diff --git a/extension-framework/cpp-extension-lib/include/api/core/ProcessContext.h b/extension-framework/cpp-extension-lib/include/api/core/ProcessContext.h index cb0a123325..6f012d719a 100644 --- a/extension-framework/cpp-extension-lib/include/api/core/ProcessContext.h +++ b/extension-framework/cpp-extension-lib/include/api/core/ProcessContext.h @@ -34,9 +34,9 @@ class ProcessContext { std::expected getProperty(const minifi::core::PropertyReference& property_reference, const FlowFile* flow_file = nullptr) const { return getProperty(property_reference.name, flow_file); } - nonstd::expected getControllerService(std::string_view controller_service_name, std::string_view controller_service_class) const; + [[nodiscard]] std::expected getControllerService(std::string_view controller_service_name, std::string_view controller_service_class) const; - bool hasNonEmptyProperty(std::string_view name) const; + [[nodiscard]] bool hasNonEmptyProperty(std::string_view name) const; private: MinifiProcessContext* impl_; diff --git a/extension-framework/cpp-extension-lib/include/api/utils/ProcessorConfigUtils.h b/extension-framework/cpp-extension-lib/include/api/utils/ProcessorConfigUtils.h index 802ae02fc9..a73dd75498 100644 --- a/extension-framework/cpp-extension-lib/include/api/utils/ProcessorConfigUtils.h +++ b/extension-framework/cpp-extension-lib/include/api/utils/ProcessorConfigUtils.h @@ -174,7 +174,7 @@ ControllerServiceType* parseOptionalControllerService(const core::ProcessContext return nullptr; } - nonstd::expected service = context.getControllerService(*controller_service_name, minifi::core::className()); + auto service = context.getControllerService(*controller_service_name, minifi::core::className()); if (!service) { return nullptr; } diff --git a/extension-framework/cpp-extension-lib/src/core/ControllerServiceContext.cpp b/extension-framework/cpp-extension-lib/src/core/ControllerServiceContext.cpp index a3b9254835..089112c042 100644 --- a/extension-framework/cpp-extension-lib/src/core/ControllerServiceContext.cpp +++ b/extension-framework/cpp-extension-lib/src/core/ControllerServiceContext.cpp @@ -20,7 +20,7 @@ namespace org::apache::nifi::minifi::api::core { -nonstd::expected ControllerServiceContext::getProperty(const std::string_view name) const { +std::expected ControllerServiceContext::getProperty(const std::string_view name) const { std::optional value = std::nullopt; const MinifiStatus status = MinifiControllerServiceContextGetProperty(impl_, utils::toStringView(name), [] (void* data, const MinifiStringView result) { @@ -28,11 +28,11 @@ nonstd::expected ControllerServiceContext::getProp }, &value); if (status != MINIFI_STATUS_SUCCESS) { - return nonstd::make_unexpected(utils::make_error_code(status)); + return std::unexpected{utils::make_error_code(status)}; } if (!value) { - return nonstd::make_unexpected(utils::make_error_code(MINIFI_STATUS_UNKNOWN_ERROR)); + return std::unexpected{utils::make_error_code(MINIFI_STATUS_UNKNOWN_ERROR)}; } return value.value(); } diff --git a/extension-framework/cpp-extension-lib/src/core/ProcessContext.cpp b/extension-framework/cpp-extension-lib/src/core/ProcessContext.cpp index 7f164d3f3c..d260b9c0c9 100644 --- a/extension-framework/cpp-extension-lib/src/core/ProcessContext.cpp +++ b/extension-framework/cpp-extension-lib/src/core/ProcessContext.cpp @@ -38,17 +38,17 @@ bool ProcessContext::hasNonEmptyProperty(std::string_view name) const { return MinifiProcessContextHasNonEmptyProperty(impl_, utils::toStringView(name)); } -nonstd::expected ProcessContext::getControllerService(const std::string_view controller_service_name, +std::expected ProcessContext::getControllerService(const std::string_view controller_service_name, const std::string_view controller_service_class) const { - void* controller_service = nullptr; + MinifiControllerService* controller_service = nullptr; if (const MinifiStatus status = MinifiProcessContextGetControllerService(impl_, utils::toStringView(controller_service_name), utils::toStringView(controller_service_class), &controller_service); status != MINIFI_STATUS_SUCCESS) { - return nonstd::make_unexpected(utils::make_error_code(status)); + return std::unexpected{utils::make_error_code(status)}; } - return static_cast(controller_service); + return controller_service; } } // namespace org::apache::nifi::minifi::api::core From 0bb815eed51377c02b5fcb99ef31818860c0e926 Mon Sep 17 00:00:00 2001 From: Martin Zink Date: Wed, 29 Apr 2026 09:46:41 +0200 Subject: [PATCH 3/5] useProcessorClassDescription -> useProcessorClassDefinition --- Extensions.md | 4 ++-- .../cpp-extension-lib/include/api/core/Resource.h | 14 +++++++------- .../libtest/CProcessorTestUtils.h | 8 ++++---- .../llamacpp/processors/ExtensionInitializer.cpp | 4 ++-- .../extension-verification-test/CApiExtension.cpp | 4 ++-- 5 files changed, 17 insertions(+), 17 deletions(-) diff --git a/Extensions.md b/Extensions.md index 51077009fe..81df822a1a 100644 --- a/Extensions.md +++ b/Extensions.md @@ -48,8 +48,8 @@ extern "C" void MinifiInitExtension(MinifiExtensionContext* extension_context) { .user_data = nullptr }; auto* extension = MinifiRegisterExtension(extension_context, &extension_definition); - minifi::api::core::useProcessorClassDescription([&] (const MinifiProcessorClassDefinition& description) { - MinifiRegisterProcessor(extension, &description); + minifi::api::core::useProcessorClassDefinition([&] (const MinifiProcessorClassDefinition& definition) { + MinifiRegisterProcessor(extension, &definition); }); } ``` diff --git a/extension-framework/cpp-extension-lib/include/api/core/Resource.h b/extension-framework/cpp-extension-lib/include/api/core/Resource.h index c9777330bc..2a24f90e06 100644 --- a/extension-framework/cpp-extension-lib/include/api/core/Resource.h +++ b/extension-framework/cpp-extension-lib/include/api/core/Resource.h @@ -40,7 +40,7 @@ namespace org::apache::nifi::minifi::api::core { template -void useProcessorClassDescription(Fn&& fn) { +void useProcessorClassDefinition(Fn&& fn) { std::vector> string_vector_cache; const auto full_name = minifi::core::className(); @@ -78,7 +78,7 @@ void useProcessorClassDescription(Fn&& fn) { attribute_relationships_cache.push_back(std::move(rel_cache)); } - MinifiProcessorClassDefinition description{ + MinifiProcessorClassDefinition definition{ .full_name = utils::toStringView(full_name), .description = utils::toStringView(Class::Description), .class_properties_count = gsl::narrow(class_properties.size()), @@ -147,18 +147,18 @@ void useProcessorClassDescription(Fn&& fn) { } }; - fn(description); + fn(definition); } template -void useControllerServiceClassDescription(Fn&& fn) { +void useControllerServiceClassDefinition(Fn&& fn) { std::vector> string_vector_cache; const auto full_name = minifi::core::className(); std::vector class_properties = utils::toProperties(Class::Properties, string_vector_cache); - MinifiControllerServiceClassDefinition description{.full_name = utils::toStringView(full_name), + MinifiControllerServiceClassDefinition definition{.full_name = utils::toStringView(full_name), .description = utils::toStringView(Class::Description), .class_properties_count = gsl::narrow(class_properties.size()), .class_properties_ptr = class_properties.data(), @@ -179,14 +179,14 @@ void useControllerServiceClassDescription(Fn&& fn) { return static_cast(self)->enable(context_wrapper); } catch (...) { return MINIFI_STATUS_UNKNOWN_ERROR; } }, - .notifyStop = [](void* self) -> void { + .disable = [](void* self) -> void { try { static_cast(self)->notifyStop(); } catch (...) {} }, }}; - fn(description); + fn(definition); } } // namespace org::apache::nifi::minifi::api::core diff --git a/extension-framework/cpp-extension-lib/libtest/CProcessorTestUtils.h b/extension-framework/cpp-extension-lib/libtest/CProcessorTestUtils.h index cc61aea1a7..2defdff5c8 100644 --- a/extension-framework/cpp-extension-lib/libtest/CProcessorTestUtils.h +++ b/extension-framework/cpp-extension-lib/libtest/CProcessorTestUtils.h @@ -32,8 +32,8 @@ template std::unique_ptr make_custom_c_processor(minifi::core::ProcessorMetadata metadata, Args&&... args) { // NOLINT(cppcoreguidelines-missing-std-forward) std::unique_ptr processor_impl; - minifi::api::core::useProcessorClassDescription([&](const MinifiProcessorClassDefinition& description) { - minifi::utils::useCProcessorClassDescription(description, [&](const auto&, auto c_description) { + minifi::api::core::useProcessorClassDefinition([&](const MinifiProcessorClassDefinition& definition) { + minifi::utils::useCProcessorClassDescription(definition, [&](const auto&, auto c_description) { processor_impl = std::make_unique(std::move(c_description), metadata, new T(metadata, std::forward(args)...)); }); }); @@ -44,8 +44,8 @@ template std::shared_ptr make_custom_c_controller_service(minifi::core::ControllerServiceMetadata metadata, Args&&... args) { // NOLINT(cppcoreguidelines-missing-std-forward) std::unique_ptr controller_service_impl; - minifi::api::core::useControllerServiceClassDescription([&](const MinifiControllerServiceClassDefinition& description) { - minifi::utils::useCControllerServiceClassDescription(description, [&](const auto&, auto c_description) { + minifi::api::core::useControllerServiceClassDefinition([&](const MinifiControllerServiceClassDefinition& definition) { + minifi::utils::useCControllerServiceClassDescription(definition, [&](const auto&, auto c_description) { controller_service_impl = std::make_unique(std::move(c_description), metadata, new T(metadata, std::forward(args)...)); diff --git a/extensions/llamacpp/processors/ExtensionInitializer.cpp b/extensions/llamacpp/processors/ExtensionInitializer.cpp index f11b539568..85d610a7d9 100644 --- a/extensions/llamacpp/processors/ExtensionInitializer.cpp +++ b/extensions/llamacpp/processors/ExtensionInitializer.cpp @@ -34,7 +34,7 @@ CEXTENSIONAPI void MinifiInitExtension(MinifiExtensionContext* extension_context .user_data = nullptr }; auto* extension = MinifiRegisterExtension(extension_context, &extension_definition); - minifi::api::core::useProcessorClassDescription([&] (const MinifiProcessorClassDefinition& description) { - MinifiRegisterProcessor(extension, &description); + minifi::api::core::useProcessorClassDefinition([&] (const MinifiProcessorClassDefinition& definition) { + MinifiRegisterProcessor(extension, &definition); }); } diff --git a/libminifi/test/integration/extension-verification-test/CApiExtension.cpp b/libminifi/test/integration/extension-verification-test/CApiExtension.cpp index 2b2b981ae8..546a229056 100644 --- a/libminifi/test/integration/extension-verification-test/CApiExtension.cpp +++ b/libminifi/test/integration/extension-verification-test/CApiExtension.cpp @@ -50,7 +50,7 @@ CEXTENSIONAPI void MinifiInitExtension(MinifiExtensionContext* extension_context .user_data = nullptr }; auto extension = MinifiRegisterExtension(extension_context, &extension_definition); - minifi::api::core::useProcessorClassDescription([&] (const MinifiProcessorClassDefinition& description) { - MinifiRegisterProcessor(extension, &description); + minifi::api::core::useProcessorClassDefinition([&] (const MinifiProcessorClassDefinition& definition) { + MinifiRegisterProcessor(extension, &definition); }); } From a3960293ebcebfa01feb2aff7c2aed8f01349f87 Mon Sep 17 00:00:00 2001 From: Martin Zink Date: Tue, 5 May 2026 09:28:22 +0200 Subject: [PATCH 4/5] add explanatory comments to ControllerServiceImpl.h --- .../include/api/core/ControllerServiceImpl.h | 6 ++++-- .../cpp-extension-lib/include/api/core/Resource.h | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/extension-framework/cpp-extension-lib/include/api/core/ControllerServiceImpl.h b/extension-framework/cpp-extension-lib/include/api/core/ControllerServiceImpl.h index 77fe1e3427..b4b7473848 100644 --- a/extension-framework/cpp-extension-lib/include/api/core/ControllerServiceImpl.h +++ b/extension-framework/cpp-extension-lib/include/api/core/ControllerServiceImpl.h @@ -30,6 +30,8 @@ class Connection; namespace core { +/// Helper class that makes implementing the stable API easier +/// It is used in conjunction with useControllerServiceClassDefinition class ControllerServiceImpl { public: explicit ControllerServiceImpl(minifi::core::ControllerServiceMetadata metadata); @@ -42,7 +44,7 @@ class ControllerServiceImpl { virtual ~ControllerServiceImpl(); MinifiStatus enable(ControllerServiceContext&); - void notifyStop(); + void disable(); [[nodiscard]] std::string getName() const; [[nodiscard]] minifi::utils::Identifier getUUID() const; @@ -50,7 +52,7 @@ class ControllerServiceImpl { protected: virtual MinifiStatus enableImpl(api::core::ControllerServiceContext&) = 0; - virtual void notifyStopImpl() {} + virtual void disableImpl() {} minifi::core::ControllerServiceMetadata metadata_; diff --git a/extension-framework/cpp-extension-lib/include/api/core/Resource.h b/extension-framework/cpp-extension-lib/include/api/core/Resource.h index 2a24f90e06..0f98a8d9e0 100644 --- a/extension-framework/cpp-extension-lib/include/api/core/Resource.h +++ b/extension-framework/cpp-extension-lib/include/api/core/Resource.h @@ -181,7 +181,7 @@ void useControllerServiceClassDefinition(Fn&& fn) { }, .disable = [](void* self) -> void { try { - static_cast(self)->notifyStop(); + static_cast(self)->disable(); } catch (...) {} }, }}; From ee86904b425a115f055cef73bb72cf7816adcdbd Mon Sep 17 00:00:00 2001 From: Martin Zink Date: Tue, 5 May 2026 10:36:57 +0200 Subject: [PATCH 5/5] notifyStop -> disable in CPP API aswell --- .../cpp-extension-lib/src/core/ControllerServiceImpl.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/extension-framework/cpp-extension-lib/src/core/ControllerServiceImpl.cpp b/extension-framework/cpp-extension-lib/src/core/ControllerServiceImpl.cpp index b17dd77424..920dabf97f 100644 --- a/extension-framework/cpp-extension-lib/src/core/ControllerServiceImpl.cpp +++ b/extension-framework/cpp-extension-lib/src/core/ControllerServiceImpl.cpp @@ -41,9 +41,9 @@ MinifiStatus ControllerServiceImpl::enable(ControllerServiceContext& ctx) { } } -void ControllerServiceImpl::notifyStop() { +void ControllerServiceImpl::disable() { try { - notifyStopImpl(); + disableImpl(); } catch (const std::exception& e) { logger_->log_error("{}", e.what()); throw;