Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions controller/MiNiFiController.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
#include "c2/ControllerSocketProtocol.h"
#include "controllers/SSLContextService.h"
#include "core/ConfigurationFactory.h"
#include "minifi-cpp/core/controller/ControllerService.h"
#include "core/extension/ExtensionManager.h"
#include "properties/Configure.h"
#include "range/v3/algorithm/contains.hpp"
Expand All @@ -42,8 +41,7 @@ std::shared_ptr<minifi::controllers::SSLContextServiceInterface> getSSLContextSe
std::shared_ptr<minifi::controllers::SSLContextServiceInterface> secure_context;
std::string secure_str;
if (configuration->get(minifi::Configure::nifi_remote_input_secure, secure_str) && minifi::utils::string::toBool(secure_str).value_or(false)) {
secure_context = std::make_shared<minifi::controllers::SSLContextService>("ControllerSocketProtocolSSL", configuration);
secure_context->onEnable();
secure_context = minifi::controllers::SSLContextService::createAndEnable("ControllerSocketProtocolSSL", configuration);
}

return secure_context;
Expand Down
3 changes: 1 addition & 2 deletions controller/tests/ControllerTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,7 @@ class ControllerTestFixture {
configuration_->set(minifi::Configure::nifi_security_client_private_key, (minifi::utils::file::FileUtils::get_executable_dir() / "resources" / "minifi-cpp-flow.key").string());
configuration_->set(minifi::Configure::nifi_security_client_pass_phrase, "abcdefgh");
configuration_->set(minifi::Configure::nifi_security_client_ca_certificate, (minifi::utils::file::FileUtils::get_executable_dir() / "resources" / "root-ca.pem").string());
ssl_context_service_ = std::make_shared<controllers::SSLContextService>("SSLContextService", configuration_);
ssl_context_service_->onEnable();
ssl_context_service_ = controllers::SSLContextService::createAndEnable("SSLContextService", configuration_);
controller_socket_data_.host = "localhost";
controller_socket_data_.port = 9997;
}
Expand Down
6 changes: 6 additions & 0 deletions core-framework/include/core/Resource.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "utils/OptionalUtils.h"
#include "utils/Macro.h"
#include "core/ProcessorFactoryImpl.h"
#include "core/controller/ControllerServiceFactoryImpl.h"
#include "core/ObjectFactory.h"

namespace org::apache::nifi::minifi::core {
Expand Down Expand Up @@ -61,6 +62,11 @@ class StaticClassType {
auto factory = std::unique_ptr<ProcessorFactory>(new ProcessorFactoryImpl<Class>(module_name));
getClassLoader().registerClass(construction_name, std::move(factory));
}
} else if constexpr (Type == ResourceType::ControllerService) {
for (const auto& construction_name : construction_names_) {
auto factory = std::unique_ptr<controller::ControllerServiceFactory>(new controller::ControllerServiceFactoryImpl<Class>(module_name));
getClassLoader().registerClass(construction_name, std::move(factory));
}
} else {
for (const auto& construction_name : construction_names_) {
auto factory = std::unique_ptr<ObjectFactory>(new DefaultObjectFactory<Class>(module_name));
Expand Down
121 changes: 0 additions & 121 deletions core-framework/include/core/controller/ControllerService.h

This file was deleted.

116 changes: 116 additions & 0 deletions core-framework/include/core/controller/ControllerServiceBase.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "minifi-cpp/properties/Configure.h"
#include "core/Core.h"
#include "core/ConfigurableComponentImpl.h"
#include "core/Connectable.h"
#include "minifi-cpp/core/controller/ControllerServiceApi.h"
#include "minifi-cpp/core/controller/ControllerServiceInterface.h"
#include "minifi-cpp/core/ControllerServiceApiDefinition.h"
#include "minifi-cpp/core/controller/ControllerServiceMetadata.h"

namespace org::apache::nifi::minifi::core::controller {

/**
* Controller Service base class that contains some pure virtual methods.
*
* Design: OnEnable is executed when the controller service is being enabled.
* Note that keeping state here must be protected in this function.
*/
class ControllerServiceBase : public ControllerServiceApi {
public:
explicit ControllerServiceBase(ControllerServiceMetadata metadata)
: name_(std::move(metadata.name)),
uuid_(metadata.uuid),
logger_(std::move(metadata.logger)) {}

virtual void initialize() {}

void initialize(ControllerServiceDescriptor& descriptor) final {
gsl_Expects(!descriptor_);
descriptor_ = &descriptor;
auto guard = gsl::finally([&] {descriptor_ = nullptr;});
initialize();
}

void setSupportedProperties(std::span<const PropertyReference> properties) {
gsl_Expects(descriptor_);
descriptor_->setSupportedProperties(properties);
}

~ControllerServiceBase() override {}

virtual void onEnable() {}

/**
* Function is called when Controller Services are enabled and being run
*/
void onEnable(ControllerServiceContext& context, const std::shared_ptr<Configure>& configuration, const std::vector<std::shared_ptr<ControllerServiceInterface>>& linked_services) final {
configuration_ = configuration;
linked_services_ = linked_services;
gsl_Expects(!context_);
context_ = &context;
auto guard = gsl::finally([&] {context_ = nullptr;});
onEnable();
}

[[nodiscard]] nonstd::expected<std::string, std::error_code> getProperty(std::string_view name) const {
gsl_Expects(context_);
return context_->getProperty(name);
}

[[nodiscard]] nonstd::expected<std::vector<std::string>, std::error_code> getAllPropertyValues(std::string_view name) const {
gsl_Expects(context_);
return context_->getAllPropertyValues(name);
}

/**
* Function is called when Controller Services are disabled
*/
void notifyStop() override {}

std::string getName() const {
return name_;
}

utils::Identifier getUUID() const {
return uuid_;
}


static constexpr auto ImplementsApis = std::array<ControllerServiceApiDefinition, 0>{};

protected:
std::string name_;
utils::Identifier uuid_;
std::vector<std::shared_ptr<controller::ControllerServiceInterface> > linked_services_;
std::shared_ptr<Configure> configuration_;
ControllerServiceDescriptor* descriptor_{nullptr};
ControllerServiceContext* context_{nullptr};

std::shared_ptr<core::logging::Logger> logger_;
};

} // namespace org::apache::nifi::minifi::core::controller
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <string>
#include <memory>
#include <utility>
#include "core/ClassName.h"
#include "minifi-cpp/core/controller/ControllerServiceFactory.h"

namespace org::apache::nifi::minifi::core::controller {

template<class T>
class ControllerServiceFactoryImpl : public ControllerServiceFactory {
public:
ControllerServiceFactoryImpl()
: class_name_(core::className<T>()) {
}

explicit ControllerServiceFactoryImpl(std::string group_name)
: group_name_(std::move(group_name)),
class_name_(core::className<T>()) {
}

std::string getGroupName() const override {
return group_name_;
}

std::unique_ptr<ControllerServiceApi> create(ControllerServiceMetadata metadata) override {
return std::make_unique<T>(metadata);
}

std::string getClassName() const override {
return std::string{class_name_};
}

protected:
std::string group_name_;
std::string_view class_name_;
};

} // namespace org::apache::nifi::minifi::core::controller
9 changes: 5 additions & 4 deletions core-framework/include/utils/ThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
#include "Monitors.h"
#include "core/expect.h"
#include "minifi-cpp/controllers/ThreadManagementService.h"
#include "minifi-cpp/core/controller/ControllerServiceLookup.h"
#include "minifi-cpp/core/logging/Logger.h"

namespace org::apache::nifi::minifi::utils {
Expand Down Expand Up @@ -137,8 +136,10 @@ class WorkerThread {
*/
class ThreadPool {
public:
using ControllerServiceProvider = std::function<std::shared_ptr<core::controller::ControllerServiceInterface>(std::string_view)>;

ThreadPool(int max_worker_threads = 2,
core::controller::ControllerServiceLookup* controller_service_provider = nullptr, std::string name = "NamelessPool");
ControllerServiceProvider controller_service_provider = nullptr, std::string name = "NamelessPool");

ThreadPool(const ThreadPool &other) = delete;
ThreadPool& operator=(const ThreadPool &other) = delete;
Expand Down Expand Up @@ -231,7 +232,7 @@ class ThreadPool {
start();
}

void setControllerServiceProvider(core::controller::ControllerServiceLookup* controller_service_provider) {
void setControllerServiceProvider(ControllerServiceProvider controller_service_provider) {
std::lock_guard<std::recursive_mutex> lock(manager_mutex_);
bool was_running = running_;
if (was_running) {
Expand Down Expand Up @@ -272,7 +273,7 @@ class ThreadPool {
std::thread manager_thread_;
std::thread delayed_scheduler_thread_;
std::atomic<bool> running_;
core::controller::ControllerServiceLookup* controller_service_provider_;
ControllerServiceProvider controller_service_provider_;
std::shared_ptr<controllers::ThreadManagementService> thread_manager_;
ConcurrentQueue<std::shared_ptr<WorkerThread>> deceased_thread_queue_;
ConditionConcurrentQueue<Worker> worker_queue_;
Expand Down
Loading
Loading