diff --git a/extension-framework/cpp-extension-lib/src/core/ProcessSession.cpp b/extension-framework/cpp-extension-lib/src/core/ProcessSession.cpp index cc7ab50e1d..8b13b09c2a 100644 --- a/extension-framework/cpp-extension-lib/src/core/ProcessSession.cpp +++ b/extension-framework/cpp-extension-lib/src/core/ProcessSession.cpp @@ -118,20 +118,20 @@ void ProcessSession::read(FlowFile& flow_file, const io::InputStreamCallback& ca void ProcessSession::setAttribute(FlowFile& ff, const std::string_view key, std::string value) { // NOLINT(performance-unnecessary-value-param) const MinifiStringView value_ref = utils::toStringView(value); - if (MINIFI_STATUS_SUCCESS != MinifiFlowFileSetAttribute(impl_, ff.get(), utils::toStringView(key), &value_ref)) { + if (MINIFI_STATUS_SUCCESS != MinifiProcessSessionSetFlowFileAttribute(impl_, ff.get(), utils::toStringView(key), &value_ref)) { throw minifi::Exception(minifi::FILE_OPERATION_EXCEPTION, "Failed to set attribute"); } } void ProcessSession::removeAttribute(FlowFile& ff, const std::string_view key) { - if (MINIFI_STATUS_SUCCESS != MinifiFlowFileSetAttribute(impl_, ff.get(), utils::toStringView(key), nullptr)) { + if (MINIFI_STATUS_SUCCESS != MinifiProcessSessionSetFlowFileAttribute(impl_, ff.get(), utils::toStringView(key), nullptr)) { throw minifi::Exception(minifi::FILE_OPERATION_EXCEPTION, "Failed to remove attribute"); } } std::optional ProcessSession::getAttribute(FlowFile& ff, std::string_view key) { std::optional result; - MinifiFlowFileGetAttribute(impl_, ff.get(), utils::toStringView(key), [] (void* user_ctx, MinifiStringView value) { + MinifiProcessSessionGetFlowFileAttribute(impl_, ff.get(), utils::toStringView(key), [] (void* user_ctx, MinifiStringView value) { *static_cast*>(user_ctx) = std::string{value.data, value.length}; }, &result); return result; @@ -139,8 +139,8 @@ std::optional ProcessSession::getAttribute(FlowFile& ff, std::strin std::map ProcessSession::getAttributes(FlowFile& ff) { std::map result; - MinifiFlowFileGetAttributes(impl_, ff.get(), [] (void* user_ctx, MinifiStringView value, MinifiStringView key) { - static_cast*>(user_ctx)->insert({std::string{value.data, value.length}, std::string{key.data, key.length}}); + MinifiProcessSessionGetFlowFileAttributes(impl_, ff.get(), [] (void* user_ctx, const MinifiStringView key, const MinifiStringView value) { + static_cast*>(user_ctx)->insert({std::string{key.data, key.length}, std::string{value.data, value.length}}); }, &result); return result; } diff --git a/libminifi/src/minifi-c.cpp b/libminifi/src/minifi-c.cpp index 54a3dba8fb..1c5d98fcf0 100644 --- a/libminifi/src/minifi-c.cpp +++ b/libminifi/src/minifi-c.cpp @@ -24,6 +24,7 @@ #include "core/ProcessorMetrics.h" #include "core/extension/ExtensionManager.h" #include "minifi-cpp/Exception.h" +#include "minifi-cpp/controllers/SSLContextServiceInterface.h" #include "minifi-cpp/core/Annotation.h" #include "minifi-cpp/core/ClassLoader.h" #include "minifi-cpp/core/ProcessContext.h" @@ -35,8 +36,8 @@ #include "minifi-cpp/core/PropertyValidator.h" #include "minifi-cpp/core/logging/Logger.h" #include "minifi-cpp/core/state/PublishedMetricProvider.h" -#include "utils/CProcessor.h" #include "utils/CControllerService.h" +#include "utils/CProcessor.h" #include "utils/PropertyErrors.h" namespace minifi = org::apache::nifi::minifi; @@ -435,6 +436,18 @@ MINIFI_OWNED MinifiFlowFile* MinifiProcessSessionCreate(MinifiProcessSession* se return MINIFI_NULL; } +MinifiStatus MinifiProcessSessionPenalize(MinifiProcessSession* session, MinifiFlowFile* flowfile) { + gsl_Assert(session != MINIFI_NULL); + gsl_Assert(flowfile != MINIFI_NULL); + try { + reinterpret_cast(session)->penalize( + *reinterpret_cast*>(flowfile)); + return MINIFI_STATUS_SUCCESS; + } catch (...) { + return MINIFI_STATUS_UNKNOWN_ERROR; + } +} + MinifiStatus MinifiProcessSessionTransfer(MinifiProcessSession* session, MINIFI_OWNED MinifiFlowFile* flowfile, MinifiStringView relationship_name) { gsl_Assert(session != MINIFI_NULL); gsl_Assert(flowfile != MINIFI_NULL); @@ -501,7 +514,7 @@ int64_t MinifiOutputStreamWrite(MinifiOutputStream* stream, const char* data, si return gsl::narrow(reinterpret_cast(stream)->write(as_bytes(std::span(data, size)))); } -MinifiStatus MinifiFlowFileSetAttribute(MinifiProcessSession* session, MinifiFlowFile* flowfile, MinifiStringView attribute_name, const MinifiStringView* attribute_value) { +MinifiStatus MinifiProcessSessionSetFlowFileAttribute(MinifiProcessSession* session, MinifiFlowFile* flowfile, MinifiStringView attribute_name, const MinifiStringView* attribute_value) { gsl_Assert(session != MINIFI_NULL); gsl_Assert(flowfile != MINIFI_NULL); if (attribute_value == nullptr) { @@ -513,7 +526,7 @@ MinifiStatus MinifiFlowFileSetAttribute(MinifiProcessSession* session, MinifiFlo return MINIFI_STATUS_SUCCESS; } -MinifiBool MinifiFlowFileGetAttribute(MinifiProcessSession* session, MinifiFlowFile* flowfile, MinifiStringView attribute_name, +MinifiBool MinifiProcessSessionGetFlowFileAttribute(MinifiProcessSession* session, MinifiFlowFile* flowfile, MinifiStringView attribute_name, void(*cb)(void* user_ctx, MinifiStringView attribute_value), void* user_ctx) { gsl_Assert(session != MINIFI_NULL); gsl_Assert(flowfile != MINIFI_NULL); @@ -525,7 +538,7 @@ MinifiBool MinifiFlowFileGetAttribute(MinifiProcessSession* session, MinifiFlowF return true; } -void MinifiFlowFileGetAttributes(MinifiProcessSession* session, MinifiFlowFile* flowfile, +void MinifiProcessSessionGetFlowFileAttributes(MinifiProcessSession* session, MinifiFlowFile* flowfile, void(*cb)(void* user_ctx, MinifiStringView attribute_name, MinifiStringView attribute_value), void* user_ctx) { gsl_Assert(session != MINIFI_NULL); gsl_Assert(flowfile != MINIFI_NULL); @@ -534,6 +547,20 @@ void MinifiFlowFileGetAttributes(MinifiProcessSession* session, MinifiFlowFile* } } +uint64_t MinifiProcessSessionGetFlowFileSize(MinifiProcessSession* session, MinifiFlowFile* flowfile) { + gsl_Assert(session != MINIFI_NULL); + gsl_Assert(flowfile != MINIFI_NULL); + return (*reinterpret_cast*>(flowfile))->getSize(); +} + +MinifiStatus MinifiProcessSessionGetFlowFileId(MinifiProcessSession* session, MinifiFlowFile* flowfile, void(*cb)(void* user_ctx, MinifiStringView flow_file_id), void* user_ctx) { + gsl_Assert(session != MINIFI_NULL); + gsl_Assert(flowfile != MINIFI_NULL); + const auto uuid_small_str = (*reinterpret_cast*>(flowfile))->getUUIDStr(); + cb(user_ctx, minifiStringView(uuid_small_str.view())); + return MINIFI_STATUS_SUCCESS; +} + MinifiStatus MinifiControllerServiceContextGetProperty(MinifiControllerServiceContext* context, MinifiStringView property_name, void (*result_cb)(void* user_ctx, MinifiStringView result), void* user_ctx) { gsl_Assert(context != MINIFI_NULL); @@ -551,7 +578,6 @@ MinifiStatus MinifiControllerServiceContextGetProperty(MinifiControllerServiceCo } } - MinifiStatus MinifiProcessContextGetControllerService( MinifiProcessContext* process_context, const MinifiStringView controller_service_name, @@ -578,5 +604,43 @@ MinifiStatus MinifiProcessContextGetControllerService( return MINIFI_STATUS_VALIDATION_FAILED; } +void MinifiProcessContextGetDynamicProperties(MinifiProcessContext* context, MinifiFlowFile* minifi_flow_file, + void (*cb)(void* user_ctx, MinifiStringView dynamic_property_name, MinifiStringView dynamic_property_value), void* user_ctx) { + gsl_Assert(context != MINIFI_NULL); + auto flow_file = minifi_flow_file != MINIFI_NULL ? reinterpret_cast*>(minifi_flow_file)->get() : nullptr; + for (auto& [key, value] : reinterpret_cast(context)->getDynamicProperties(flow_file)) { + cb(user_ctx, minifiStringView(key), minifiStringView(value)); + } +} + +MinifiStatus MinifiProcessContextGetSslData(MinifiProcessContext* process_context, MinifiStringView controller_service_name, + void (*cb)(void* user_ctx, const MinifiSslData* ssl_data), void* user_ctx) { + gsl_Assert(process_context != MINIFI_NULL); + try { + const auto context = reinterpret_cast(process_context); + const auto name_str = std::string{toStringView(controller_service_name)}; + const auto service_shared_ptr = context->getControllerService(name_str, context->getProcessorInfo().getUUID()); + if (!service_shared_ptr) { return MINIFI_STATUS_VALIDATION_FAILED; } + if (const auto ssl_context_service = dynamic_cast(service_shared_ptr.get())) { + const std::string ca_cert_file = ssl_context_service->getCACertificate().string(); + const std::string passphrase = ssl_context_service->getPassphrase(); + const std::string cert_file = ssl_context_service->getCertificateFile().string(); + const std::string private_key_file = ssl_context_service->getPrivateKeyFile().string(); + + MinifiSslData ssl_data{ + .ca_certificate_file = minifiStringView(ca_cert_file), + .certificate_file = minifiStringView(cert_file), + .private_key_file = minifiStringView(private_key_file), + .passphrase = minifiStringView(passphrase), + }; + cb(user_ctx, &ssl_data); + return MINIFI_STATUS_SUCCESS; + } + return MINIFI_STATUS_VALIDATION_FAILED; + } catch (...) { + return MINIFI_STATUS_UNKNOWN_ERROR; + } +} + } // extern "C" diff --git a/minifi-api/include/minifi-c/minifi-c.h b/minifi-api/include/minifi-c/minifi-c.h index 97c969ab49..e846efbdcf 100644 --- a/minifi-api/include/minifi-c/minifi-c.h +++ b/minifi-api/include/minifi-c/minifi-c.h @@ -41,6 +41,10 @@ extern "C" { #define MINIFI_REGISTER_EXTENSION_FN MinifiRegisterExtension #endif +/// To declare a processor property that expects an SSLContextService, +/// use MINIFI_SSL_CONTEXT_SERVICE_PROPERTY_TYPE in the type field of the property definition (MinifiPropertyDefinition::type) +#define MINIFI_SSL_CONTEXT_SERVICE_PROPERTY_TYPE "org.apache.nifi.minifi.controllers.SSLContextServiceInterface" + enum : uint32_t { MINIFI_API_VERSION = 3 }; @@ -231,6 +235,8 @@ MinifiBool MinifiProcessContextHasNonEmptyProperty(MinifiProcessContext* context MinifiStatus MinifiProcessContextGetControllerService( MinifiProcessContext* process_context, MinifiStringView controller_service_name, MinifiStringView controller_service_type, MinifiControllerService** controller_service_out); +void MinifiProcessContextGetDynamicProperties(MinifiProcessContext* context, MinifiFlowFile* minifi_flow_file, + void (*cb)(void* user_ctx, MinifiStringView dynamic_property_name, MinifiStringView dynamic_property_value), void* user_ctx); void MinifiLoggerSetMaxLogSize(MinifiLogger*, int32_t); void MinifiLoggerLogString(MinifiLogger*, MinifiLogLevel, MinifiStringView); @@ -240,6 +246,7 @@ MinifiLogLevel MinifiLoggerLevel(MinifiLogger*); MINIFI_OWNED MinifiFlowFile* MinifiProcessSessionGet(MinifiProcessSession*); MINIFI_OWNED MinifiFlowFile* MinifiProcessSessionCreate(MinifiProcessSession* session, MinifiFlowFile* parent_flowfile); +MinifiStatus MinifiProcessSessionPenalize(MinifiProcessSession* session, MinifiFlowFile* flowfile); MinifiStatus MinifiProcessSessionTransfer(MinifiProcessSession* session, MINIFI_OWNED MinifiFlowFile* flowfile, MinifiStringView relationship_name); MinifiStatus MinifiProcessSessionRemove(MinifiProcessSession* session, MINIFI_OWNED MinifiFlowFile* flowfile); @@ -253,16 +260,29 @@ size_t MinifiInputStreamSize(MinifiInputStream*); int64_t MinifiInputStreamRead(MinifiInputStream* stream, char* buffer, size_t size); int64_t MinifiOutputStreamWrite(MinifiOutputStream* stream, const char* data, size_t size); -MinifiStatus MinifiFlowFileSetAttribute(MinifiProcessSession* session, MinifiFlowFile* flowfile, MinifiStringView attribute_name, const MinifiStringView* attribute_value); -MinifiBool MinifiFlowFileGetAttribute(MinifiProcessSession* session, MinifiFlowFile* flowfile, MinifiStringView attribute_name, +MinifiStatus MinifiProcessSessionSetFlowFileAttribute(MinifiProcessSession* session, MinifiFlowFile* flowfile, MinifiStringView attribute_name, const MinifiStringView* attribute_value); +MinifiBool MinifiProcessSessionGetFlowFileAttribute(MinifiProcessSession* session, MinifiFlowFile* flowfile, MinifiStringView attribute_name, void(*cb)(void* user_ctx, MinifiStringView attribute_value), void* user_ctx); -void MinifiFlowFileGetAttributes(MinifiProcessSession* session, MinifiFlowFile* flowfile, void(*cb)(void* user_ctx, MinifiStringView attribute_name, MinifiStringView attribute_value), void* user_ctx); +void MinifiProcessSessionGetFlowFileAttributes(MinifiProcessSession* session, MinifiFlowFile* flowfile, + void (*cb)(void* user_ctx, MinifiStringView attribute_name, MinifiStringView attribute_value), void* user_ctx); +uint64_t MinifiProcessSessionGetFlowFileSize(MinifiProcessSession* session, MinifiFlowFile* flowfile); +MinifiStatus MinifiProcessSessionGetFlowFileId(MinifiProcessSession* session, MinifiFlowFile* flowfile, void(*cb)(void* user_ctx, MinifiStringView flow_file_id), void* user_ctx); MinifiStatus MinifiControllerServiceContextGetProperty(MinifiControllerServiceContext* context, MinifiStringView property_name, void(*cb)(void* user_ctx, MinifiStringView property_value), void* user_ctx); +typedef struct MinifiSslData { + MinifiStringView ca_certificate_file; + MinifiStringView certificate_file; + MinifiStringView private_key_file; + MinifiStringView passphrase; +} MinifiSslData; + +MinifiStatus MinifiProcessContextGetSslData(MinifiProcessContext* process_context, MinifiStringView controller_service_name, + void (*cb)(void* user_ctx, const MinifiSslData* ssl_data), void* user_ctx); + #ifdef __cplusplus } // extern "C" #endif // __cplusplus diff --git a/minifi-api/minifi-c-api.def b/minifi-api/minifi-c-api.def index b2cdcf4f43..0fcaf2ad97 100644 --- a/minifi-api/minifi-c-api.def +++ b/minifi-api/minifi-c-api.def @@ -16,12 +16,17 @@ EXPORTS MinifiProcessSessionCreate MinifiProcessSessionTransfer MinifiProcessSessionRemove + MinifiProcessSessionPenalize MinifiProcessSessionRead MinifiProcessSessionWrite MinifiConfigGet MinifiInputStreamSize MinifiInputStreamRead MinifiOutputStreamWrite - MinifiFlowFileSetAttribute - MinifiFlowFileGetAttribute - MinifiFlowFileGetAttributes + MinifiProcessSessionSetFlowFileAttribute + MinifiProcessSessionGetFlowFileAttribute + MinifiProcessSessionGetFlowFileAttributes + MinifiProcessSessionGetFlowFileSize + MinifiProcessSessionGetFlowFileId + MinifiProcessContextGetDynamicProperties + MinifiProcessContextGetSslData