Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 28 additions & 19 deletions Extensions.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,36 @@ To enable all extensions for your platform, you may use -DENABLE_ALL=TRUE OR sel
Extensions are dynamic libraries loaded at runtime by the agent.

## C extensions
You can build a shared library depending on the C capabilities of the agent as given in the `minifi-c.h` file.
For the shared library to be considered a valid extension, it has to have a global symbol with the name `MinifiCApiVersion`
with its value as a null terminated string (`const char*`) of the macro `MINIFI_API_VERSION` from `minifi-c.h`.
You can build shared libraries using the API defined in `minifi-c.h`
For the shared library to be considered a valid extension, it must export a global symbol with the name `MinifiApiVersion`
with its value equal to the uint32_t constant `MINIFI_API_VERSION` from `minifi-c.h`.

Moreover the actual resource registration (processors/controller services) has to happen during the `MinifiInitExtension` call.
### Resource Lifetime

Unless otherwise specified, the following lifetime rules apply to all functions called by the agent (e.g., `MinifiInitExtension`, `MinifiProcessorCallbacks::onTrigger`, or other callbacks):

* Arguments: The lifetime of any resource provided as a function argument is limited to the duration of that function call.

* Created Resources: The lifetime of resources created within these functions (e.g., a handle returned by `MinifiProcessSessionGet` inside `MinifiProcessorCallbacks::onTrigger`)
is limited to the scope of the innermost callback.
(the return value of `MinifiRegisterExtension` is only valid during the execution of `MinifiInitExtension`).

Because of these scoping rules, all processor and controller service registrations must occur within the `MinifiInitExtension` call.
One possible example of this is:

```C++
extern "C" const char* const MinifiApiVersion = MINIFI_API_VERSION;
extern "C" const uint32_t MinifiApiVersion = MINIFI_API_VERSION;

extern "C" void MinifiInitExtension(MinifiExtension* extension, MinifiConfig* /*config*/) {
extern "C" void MinifiInitExtension(MinifiExtensionContext* extension_context) {
MinifiExtensionDefinition extension_definition{
.name = minifi::api::utils::toStringView(MAKESTRING(EXTENSION_NAME)),
.version = minifi::api::utils::toStringView(MAKESTRING(EXTENSION_VERSION)),
.deinit = nullptr,
.user_data = nullptr
};
auto* extension = MinifiRegisterExtension(extension_context, &extension_definition);
minifi::api::core::useProcessorClassDescription<minifi::extensions::llamacpp::processors::RunLlamaCppInference>([&] (const MinifiProcessorClassDefinition& description) {
MinifiExtensionCreateInfo ext_create_info{
.name = minifi::api::utils::toStringView(MAKESTRING(EXTENSION_NAME)),
.version = minifi::api::utils::toStringView(MAKESTRING(EXTENSION_VERSION)),
.deinit = nullptr,
.user_data = nullptr,
.processors_count = 1,
.processors_ptr = &description,
};
MinifiCreateExtension(extension, &ext_create_info);
MinifiRegisterProcessor(extension, &description);
});
Comment thread
adamdebreceni marked this conversation as resolved.
}
```
Expand All @@ -64,23 +73,23 @@ REGISTER_RESOURCE(RESTSender, DescriptionOnly);
```

Some extensions (e.g. `OpenCVExtension`) require initialization before use.
You need to define an `MinifiInitCppExtension` function of type `MinifiExtension*(MinifiConfig*)` to be called.
You need to define an `MinifiInitCppExtension` function of type `MinifiExtension*(MinifiExtensionContext*)` to be called.

```C++
extern "C" void MinifiInitCppExtension(MinifiExtension* extension, MinifiConfig* /*config*/) {
extern "C" void MinifiInitCppExtension(MinifiExtensionContext* /*extension_context*/) {
const auto success = org::apache::nifi::minifi::utils::Environment::setEnvironmentVariable("OPENCV_FFMPEG_CAPTURE_OPTIONS", "rtsp_transport;udp", false /*overwrite*/);
if (!success) {
return nullptr;
}
MinifiExtensionCreateInfo ext_create_info{
MinifiExtensionDefinition extension_definition{
.name = minifi::utils::toStringView(MAKESTRING(MODULE_NAME)),
.version = minifi::utils::toStringView(minifi::AgentBuild::VERSION),
.deinit = nullptr,
.user_data = nullptr,
.processors_count = 0,
.processors_ptr = nullptr
};
minifi::utils::MinifiCreateCppExtension(extension, &ext_create_info);
minifi::utils::MinifiRegisterCppExtension(extension, &extension_definition);
}
```

Expand Down
4 changes: 2 additions & 2 deletions extension-framework/include/utils/ExtensionInitUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ inline MinifiStringView toStringView(std::string_view str) {

using ConfigReader = std::function<std::optional<std::string>(std::string_view key)>;

static inline void MinifiCreateCppExtension(MinifiExtension* extension, const MinifiExtensionCreateInfo* create_info) {
MINIFI_CREATE_EXTENSION_FN(extension, create_info);
static inline void MinifiRegisterCppExtension(MinifiExtensionContext* extension_context, const MinifiExtensionDefinition* extension_definition) {
MINIFI_REGISTER_EXTENSION_FN(extension_context, extension_definition);
}

} // namespace org::apache::nifi::minifi::utils
10 changes: 4 additions & 6 deletions extensions/ExtensionInitializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,12 @@

namespace minifi = org::apache::nifi::minifi;

extern "C" void MinifiInitCppExtension(MinifiExtension* extension, MinifiConfig* /*config*/) {
MinifiExtensionCreateInfo ext_create_info{
extern "C" void MinifiInitCppExtension(MinifiExtensionContext* extension_context) {
MinifiExtensionDefinition extension_definition{
.name = minifi::utils::toStringView(MAKESTRING(MODULE_NAME)),
.version = minifi::utils::toStringView(minifi::AgentBuild::VERSION),
.deinit = nullptr,
.user_data = nullptr,
.processors_count = 0,
.processors_ptr = nullptr
.user_data = nullptr
};
minifi::utils::MinifiCreateCppExtension(extension, &ext_create_info);
minifi::utils::MinifiRegisterCppExtension(extension_context, &extension_definition);
}
10 changes: 4 additions & 6 deletions extensions/aws/AwsLoader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,17 @@ void deinit(gsl::owner<void*> sdk_opts_ptr) {
}
} // namespace org::apache::nifi::minifi::aws::init

extern "C" void MinifiInitCppExtension(MinifiExtension* extension, MinifiConfig*) {
extern "C" void MinifiInitCppExtension(MinifiExtensionContext* extension_context) {
using minifi::aws::init::toStringView;
auto sdk_options = std::make_unique<Aws::SDKOptions>();
Aws::InitAPI(*sdk_options);
Aws::Utils::Logging::InitializeAWSLogging(std::make_shared<minifi::aws::utils::AWSSdkLogger>());

MinifiExtensionCreateInfo ext_create_info{.name = toStringView(MAKESTRING(MODULE_NAME)),
MinifiExtensionDefinition ext_definition{.name = toStringView(MAKESTRING(MODULE_NAME)),
.version = toStringView(minifi::AgentBuild::VERSION),
.deinit = &minifi::aws::init::deinit,
.user_data = sdk_options.get(),
.processors_count = 0,
.processors_ptr = nullptr};
.user_data = sdk_options.get()};

minifi::utils::MinifiCreateCppExtension(extension, &ext_create_info);
minifi::utils::MinifiRegisterCppExtension(extension_context, &ext_definition);
std::ignore = sdk_options.release(); // ownership is transferred to deinit
}
19 changes: 9 additions & 10 deletions extensions/llamacpp/processors/ExtensionInitializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,15 @@ namespace minifi = org::apache::nifi::minifi;

CEXTENSIONAPI const uint32_t MinifiApiVersion = MINIFI_API_VERSION;

CEXTENSIONAPI void MinifiInitExtension(MinifiExtension* extension, MinifiConfig* /*config*/) {
CEXTENSIONAPI void MinifiInitExtension(MinifiExtensionContext* extension_context) {
MinifiExtensionDefinition extension_definition{
.name = minifi::api::utils::toStringView(MAKESTRING(EXTENSION_NAME)),
.version = minifi::api::utils::toStringView(MAKESTRING(EXTENSION_VERSION)),
.deinit = nullptr,
.user_data = nullptr
};
auto* extension = MinifiRegisterExtension(extension_context, &extension_definition);
minifi::api::core::useProcessorClassDescription<minifi::extensions::llamacpp::processors::RunLlamaCppInference>([&] (const MinifiProcessorClassDefinition& description) {
MinifiExtensionCreateInfo ext_create_info{
.name = minifi::api::utils::toStringView(MAKESTRING(EXTENSION_NAME)),
.version = minifi::api::utils::toStringView(MAKESTRING(EXTENSION_VERSION)),
.deinit = nullptr,
.user_data = nullptr,
.processors_count = 1,
.processors_ptr = &description,
};
MinifiCreateExtension(extension, &ext_create_info);
MinifiRegisterProcessor(extension, &description);
});
}
10 changes: 4 additions & 6 deletions extensions/opencv/OpenCVLoader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

namespace minifi = org::apache::nifi::minifi;

extern "C" void MinifiInitCppExtension(MinifiExtension* extension, MinifiConfig* /*config*/) {
extern "C" void MinifiInitCppExtension(MinifiExtensionContext* extension_context) {
// By default in OpenCV, ffmpeg capture is hardcoded to use TCP and this is a workaround
// also if UDP timeout, ffmpeg will retry with TCP
// Note:
Expand All @@ -34,13 +34,11 @@ extern "C" void MinifiInitCppExtension(MinifiExtension* extension, MinifiConfig*
if (!success) {
return;
}
MinifiExtensionCreateInfo ext_create_info{
MinifiExtensionDefinition extension_definition{
.name = minifi::utils::toStringView(MAKESTRING(MODULE_NAME)),
.version = minifi::utils::toStringView(minifi::AgentBuild::VERSION),
.deinit = nullptr,
.user_data = nullptr,
.processors_count = 0,
.processors_ptr = nullptr
.user_data = nullptr
};
minifi::utils::MinifiCreateCppExtension(extension, &ext_create_info);
minifi::utils::MinifiRegisterCppExtension(extension_context, &extension_definition);
}
12 changes: 5 additions & 7 deletions extensions/python/pythonlibloader/PythonLibLoader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,21 +98,19 @@ class PythonLibLoader {
std::shared_ptr<minifi::core::logging::Logger> logger_ = minifi::core::logging::LoggerFactory<PythonLibLoader>::getLogger();
};

extern "C" void MinifiInitCppExtension(MinifiExtension* extension, MinifiConfig* config) {
extern "C" void MinifiInitCppExtension(MinifiExtensionContext* extension_context) {
static PythonLibLoader python_lib_loader([&] (std::string_view key) -> std::optional<std::string> {
std::optional<std::string> result;
MinifiConfigGet(config, minifi::utils::toStringView(key), [] (void* user_data, MinifiStringView value) {
MinifiConfigGet(extension_context, minifi::utils::toStringView(key), [] (void* user_data, MinifiStringView value) {
*static_cast<std::optional<std::string>*>(user_data) = std::string{value.data, value.length};
}, &result);
return result;
});
MinifiExtensionCreateInfo ext_create_info{
MinifiExtensionDefinition extension_definition{
.name = minifi::utils::toStringView(MAKESTRING(MODULE_NAME)),
.version = minifi::utils::toStringView(minifi::AgentBuild::VERSION),
.deinit = nullptr,
.user_data = nullptr,
.processors_count = 0,
.processors_ptr = nullptr
.user_data = nullptr
};
minifi::utils::MinifiCreateCppExtension(extension, &ext_create_info);
minifi::utils::MinifiRegisterCppExtension(extension_context, &extension_definition);
}
12 changes: 5 additions & 7 deletions extensions/python/pythonloader/PyProcLoader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,19 @@ static minifi::extensions::python::PythonCreator& getPythonCreator() {
// the symbols of the python library
extern "C" const int LOAD_MODULE_AS_GLOBAL = 1;

extern "C" void MinifiInitCppExtension(MinifiExtension* extension, MinifiConfig* config) {
extern "C" void MinifiInitCppExtension(MinifiExtensionContext* extension_context) {
getPythonCreator().configure([&] (std::string_view key) -> std::optional<std::string> {
std::optional<std::string> result;
MinifiConfigGet(config, minifi::utils::toStringView(key), [] (void* user_data, MinifiStringView value) {
MinifiConfigGet(extension_context, minifi::utils::toStringView(key), [] (void* user_data, MinifiStringView value) {
*static_cast<std::optional<std::string>*>(user_data) = std::string{value.data, value.length};
}, &result);
return result;
});
MinifiExtensionCreateInfo ext_create_info{
MinifiExtensionDefinition extension_definition{
.name = minifi::utils::toStringView(MAKESTRING(MODULE_NAME)),
.version = minifi::utils::toStringView(minifi::AgentBuild::VERSION),
.deinit = nullptr,
.user_data = nullptr,
.processors_count = 0,
.processors_ptr = nullptr
.user_data = nullptr
};
minifi::utils::MinifiCreateCppExtension(extension, &ext_create_info);
minifi::utils::MinifiRegisterCppExtension(extension_context, &extension_definition);
}
10 changes: 4 additions & 6 deletions extensions/sftp/SFTPLoader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,22 @@

namespace minifi = org::apache::nifi::minifi;

extern "C" void MinifiInitCppExtension(MinifiExtension* extension, MinifiConfig* /*config*/) {
extern "C" void MinifiInitCppExtension(MinifiExtensionContext* extension_context) {
if (libssh2_init(0) != 0) {
return;
}
if (curl_global_init(CURL_GLOBAL_DEFAULT) != CURLE_OK) {
libssh2_exit();
return;
}
MinifiExtensionCreateInfo ext_create_info{
MinifiExtensionDefinition extension_definition{
.name = minifi::utils::toStringView(MAKESTRING(MODULE_NAME)),
.version = minifi::utils::toStringView(minifi::AgentBuild::VERSION),
.deinit = [] (void* /*user_data*/) {
curl_global_cleanup();
libssh2_exit();
},
.user_data = nullptr,
.processors_count = 0,
.processors_ptr = nullptr
.user_data = nullptr
};
minifi::utils::MinifiCreateCppExtension(extension, &ext_create_info);
minifi::utils::MinifiRegisterCppExtension(extension_context, &extension_definition);
}
11 changes: 10 additions & 1 deletion libminifi/include/core/extension/Extension.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
#pragma once

#include <filesystem>
#include <functional>
#include <map>
#include <memory>
#include <optional>
#include <string>

#include "minifi-c/minifi-c.h"
Expand All @@ -40,6 +42,11 @@ class Extension {
void* user_data;
};

struct Context {
std::shared_ptr<minifi::Configure> config;
std::function<Extension*(Info)> create;
};
Comment thread
adamdebreceni marked this conversation as resolved.

Extension(std::string name, std::filesystem::path library_path);

Extension(const Extension&) = delete;
Expand All @@ -51,7 +58,9 @@ class Extension {

bool initialize(const std::shared_ptr<minifi::Configure>& configure);

bool setInfo(Info info);
std::optional<Info> getInfo() const {
return info_;
}

private:
#ifdef WIN32
Expand Down
25 changes: 13 additions & 12 deletions libminifi/src/core/extension/Extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,14 +139,6 @@ Extension::~Extension() {
#endif
}

bool Extension::setInfo(Info info) {
if (info_) {
return false;
}
info_ = std::move(info);
return true;
}

bool Extension::initialize(const std::shared_ptr<minifi::Configure>& configure) {
logger_->log_trace("Initializing extension '{}'", library_name_);
void* init_symbol_ptr = findSymbol("MinifiInitCppExtension");
Expand All @@ -158,10 +150,19 @@ bool Extension::initialize(const std::shared_ptr<minifi::Configure>& configure)
return false;
}
logger_->log_debug("Found initializer for '{}'", library_name_);
auto init_fn = reinterpret_cast<void(*)(MinifiExtension*, MinifiConfig*)>(init_symbol_ptr);
auto config_handle = reinterpret_cast<MinifiConfig*>(configure.get());
auto extension_handle = reinterpret_cast<MinifiExtension*>(this);
init_fn(extension_handle, config_handle);

auto init_fn = reinterpret_cast<void(*)(MinifiExtensionContext*)>(init_symbol_ptr);
Context extension_context{
.config = configure,
.create = [&] (Info info) -> Extension* {
if (info_) {
return nullptr;
}
info_ = std::move(info);
return this;
}
};
init_fn(reinterpret_cast<MinifiExtensionContext*>(&extension_context));
if (!info_) {
Comment thread
adamdebreceni marked this conversation as resolved.
logger_->log_error("Failed to initialize extension '{}'", library_name_);
return false;
Expand Down
Loading