Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,29 +118,29 @@ void ProcessSession::read(FlowFile& flow_file, const io::InputStreamCallback& ca

void ProcessSession::setAttribute(FlowFile& ff, const std::string_view key, std::string value) { // NOLINT(performance-unnecessary-value-param)
const MinifiStringView value_ref = utils::toStringView(value);
if (MINIFI_STATUS_SUCCESS != MinifiFlowFileSetAttribute(impl_, ff.get(), utils::toStringView(key), &value_ref)) {
if (MINIFI_STATUS_SUCCESS != MinifiProcessSessionSetFlowFileAttribute(impl_, ff.get(), utils::toStringView(key), &value_ref)) {
throw minifi::Exception(minifi::FILE_OPERATION_EXCEPTION, "Failed to set attribute");
}
}

void ProcessSession::removeAttribute(FlowFile& ff, const std::string_view key) {
if (MINIFI_STATUS_SUCCESS != MinifiFlowFileSetAttribute(impl_, ff.get(), utils::toStringView(key), nullptr)) {
if (MINIFI_STATUS_SUCCESS != MinifiProcessSessionSetFlowFileAttribute(impl_, ff.get(), utils::toStringView(key), nullptr)) {
throw minifi::Exception(minifi::FILE_OPERATION_EXCEPTION, "Failed to remove attribute");
}
}

std::optional<std::string> ProcessSession::getAttribute(FlowFile& ff, std::string_view key) {
std::optional<std::string> result;
MinifiFlowFileGetAttribute(impl_, ff.get(), utils::toStringView(key), [] (void* user_ctx, MinifiStringView value) {
MinifiProcessSessionGetFlowFileAttribute(impl_, ff.get(), utils::toStringView(key), [] (void* user_ctx, MinifiStringView value) {
*static_cast<std::optional<std::string>*>(user_ctx) = std::string{value.data, value.length};
}, &result);
return result;
}

std::map<std::string, std::string> ProcessSession::getAttributes(FlowFile& ff) {
std::map<std::string, std::string> result;
MinifiFlowFileGetAttributes(impl_, ff.get(), [] (void* user_ctx, MinifiStringView value, MinifiStringView key) {
static_cast<std::map<std::string, std::string>*>(user_ctx)->insert({std::string{value.data, value.length}, std::string{key.data, key.length}});
MinifiProcessSessionGetFlowFileAttributes(impl_, ff.get(), [] (void* user_ctx, const MinifiStringView key, const MinifiStringView value) {
static_cast<std::map<std::string, std::string>*>(user_ctx)->insert({std::string{key.data, key.length}, std::string{value.data, value.length}});
Comment thread
szaszm marked this conversation as resolved.
}, &result);
return result;
}
Expand Down
71 changes: 66 additions & 5 deletions libminifi/src/minifi-c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "core/ProcessorMetrics.h"
#include "core/extension/ExtensionManager.h"
#include "minifi-cpp/Exception.h"
#include "minifi-cpp/controllers/SSLContextServiceInterface.h"
#include "minifi-cpp/core/Annotation.h"
#include "minifi-cpp/core/ClassLoader.h"
#include "minifi-cpp/core/ProcessContext.h"
Expand All @@ -35,8 +36,8 @@
#include "minifi-cpp/core/PropertyValidator.h"
#include "minifi-cpp/core/logging/Logger.h"
#include "minifi-cpp/core/state/PublishedMetricProvider.h"
#include "utils/CProcessor.h"
#include "utils/CControllerService.h"
#include "utils/CProcessor.h"
#include "utils/PropertyErrors.h"

namespace minifi = org::apache::nifi::minifi;
Expand Down Expand Up @@ -435,6 +436,18 @@ MINIFI_OWNED MinifiFlowFile* MinifiProcessSessionCreate(MinifiProcessSession* se
return MINIFI_NULL;
}

MinifiStatus MinifiProcessSessionPenalize(MinifiProcessSession* session, MinifiFlowFile* flowfile) {
gsl_Assert(session != MINIFI_NULL);
gsl_Assert(flowfile != MINIFI_NULL);
try {
reinterpret_cast<minifi::core::ProcessSession*>(session)->penalize(
*reinterpret_cast<std::shared_ptr<minifi::core::FlowFile>*>(flowfile));
Comment thread
szaszm marked this conversation as resolved.
return MINIFI_STATUS_SUCCESS;
} catch (...) {
return MINIFI_STATUS_UNKNOWN_ERROR;
}
}

MinifiStatus MinifiProcessSessionTransfer(MinifiProcessSession* session, MINIFI_OWNED MinifiFlowFile* flowfile, MinifiStringView relationship_name) {
gsl_Assert(session != MINIFI_NULL);
gsl_Assert(flowfile != MINIFI_NULL);
Expand Down Expand Up @@ -501,7 +514,7 @@ int64_t MinifiOutputStreamWrite(MinifiOutputStream* stream, const char* data, si
return gsl::narrow<int64_t>(reinterpret_cast<minifi::io::OutputStream*>(stream)->write(as_bytes(std::span(data, size))));
}

MinifiStatus MinifiFlowFileSetAttribute(MinifiProcessSession* session, MinifiFlowFile* flowfile, MinifiStringView attribute_name, const MinifiStringView* attribute_value) {
MinifiStatus MinifiProcessSessionSetFlowFileAttribute(MinifiProcessSession* session, MinifiFlowFile* flowfile, MinifiStringView attribute_name, const MinifiStringView* attribute_value) {
gsl_Assert(session != MINIFI_NULL);
gsl_Assert(flowfile != MINIFI_NULL);
if (attribute_value == nullptr) {
Expand All @@ -513,7 +526,7 @@ MinifiStatus MinifiFlowFileSetAttribute(MinifiProcessSession* session, MinifiFlo
return MINIFI_STATUS_SUCCESS;
}

MinifiBool MinifiFlowFileGetAttribute(MinifiProcessSession* session, MinifiFlowFile* flowfile, MinifiStringView attribute_name,
MinifiBool MinifiProcessSessionGetFlowFileAttribute(MinifiProcessSession* session, MinifiFlowFile* flowfile, MinifiStringView attribute_name,
void(*cb)(void* user_ctx, MinifiStringView attribute_value), void* user_ctx) {
gsl_Assert(session != MINIFI_NULL);
gsl_Assert(flowfile != MINIFI_NULL);
Expand All @@ -525,7 +538,7 @@ MinifiBool MinifiFlowFileGetAttribute(MinifiProcessSession* session, MinifiFlowF
return true;
}

void MinifiFlowFileGetAttributes(MinifiProcessSession* session, MinifiFlowFile* flowfile,
void MinifiProcessSessionGetFlowFileAttributes(MinifiProcessSession* session, MinifiFlowFile* flowfile,
Comment thread
martinzink marked this conversation as resolved.
void(*cb)(void* user_ctx, MinifiStringView attribute_name, MinifiStringView attribute_value), void* user_ctx) {
gsl_Assert(session != MINIFI_NULL);
gsl_Assert(flowfile != MINIFI_NULL);
Expand All @@ -534,6 +547,20 @@ void MinifiFlowFileGetAttributes(MinifiProcessSession* session, MinifiFlowFile*
}
}

uint64_t MinifiProcessSessionGetFlowFileSize(MinifiProcessSession* session, MinifiFlowFile* flowfile) {
gsl_Assert(session != MINIFI_NULL);
gsl_Assert(flowfile != MINIFI_NULL);
return (*reinterpret_cast<std::shared_ptr<minifi::core::FlowFile>*>(flowfile))->getSize();
}

MinifiStatus MinifiProcessSessionGetFlowFileId(MinifiProcessSession* session, MinifiFlowFile* flowfile, void(*cb)(void* user_ctx, MinifiStringView flow_file_id), void* user_ctx) {
gsl_Assert(session != MINIFI_NULL);
gsl_Assert(flowfile != MINIFI_NULL);
const auto uuid_small_str = (*reinterpret_cast<std::shared_ptr<minifi::core::FlowFile>*>(flowfile))->getUUIDStr();
cb(user_ctx, minifiStringView(uuid_small_str.view()));
return MINIFI_STATUS_SUCCESS;
}

MinifiStatus MinifiControllerServiceContextGetProperty(MinifiControllerServiceContext* context, MinifiStringView property_name,
void (*result_cb)(void* user_ctx, MinifiStringView result), void* user_ctx) {
gsl_Assert(context != MINIFI_NULL);
Expand All @@ -551,7 +578,6 @@ MinifiStatus MinifiControllerServiceContextGetProperty(MinifiControllerServiceCo
}
}


MinifiStatus MinifiProcessContextGetControllerService(
MinifiProcessContext* process_context,
const MinifiStringView controller_service_name,
Expand All @@ -578,5 +604,40 @@ MinifiStatus MinifiProcessContextGetControllerService(
return MINIFI_STATUS_VALIDATION_FAILED;
}

void MinifiProcessContextGetDynamicProperties(MinifiProcessContext* context, MinifiFlowFile* minifi_flow_file,
void (*cb)(void* user_ctx, MinifiStringView dynamic_property_name, MinifiStringView dynamic_property_value), void* user_ctx) {
gsl_Assert(context != MINIFI_NULL);
auto flow_file = minifi_flow_file != MINIFI_NULL ? reinterpret_cast<std::shared_ptr<minifi::core::FlowFile>*>(minifi_flow_file)->get() : nullptr;
for (auto& [key, value] : reinterpret_cast<minifi::core::ProcessContext*>(context)->getDynamicProperties(flow_file)) {
cb(user_ctx, minifiStringView(key), minifiStringView(value));
}
}

MinifiStatus MinifiProcessContextGetSslData(MinifiProcessContext* process_context, MinifiStringView controller_service_name,
void (*cb)(void* user_ctx, const MinifiSslData* ssl_data), void* user_ctx) {
gsl_Assert(process_context != MINIFI_NULL);
const auto context = reinterpret_cast<minifi::core::ProcessContext*>(process_context);
const auto name_str = std::string{toStringView(controller_service_name)};
const auto service_shared_ptr = context->getControllerService(name_str, context->getProcessorInfo().getUUID());
if (!service_shared_ptr) { return MINIFI_STATUS_VALIDATION_FAILED; }
if (const auto ssl_context_service = dynamic_cast<minifi::controllers::SSLContextServiceInterface*>(service_shared_ptr.get())) {
const std::string ca_cert_file = ssl_context_service->getCACertificate().string();
const std::string passphrase = ssl_context_service->getPassphrase();
const std::string cert_file = ssl_context_service->getCertificateFile().string();
const std::string private_key_file = ssl_context_service->getPrivateKeyFile().string();
Comment on lines +624 to +627
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to handle exceptions here, so that no C++ exceptions cross the C API boundary. And ideally we should handle string allocation errors too.


MinifiSslData ssl_data{
.version = 1,
.ca_certificate_file = minifiStringView(ca_cert_file),
.certificate_file = minifiStringView(cert_file),
.private_key_file = minifiStringView(private_key_file),
.passphrase = minifiStringView(passphrase),
};
cb(user_ctx, &ssl_data);
return MINIFI_STATUS_SUCCESS;
}
return MINIFI_STATUS_VALIDATION_FAILED;
}


} // extern "C"
26 changes: 23 additions & 3 deletions minifi-api/include/minifi-c/minifi-c.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ extern "C" {
#define MINIFI_REGISTER_EXTENSION_FN MinifiRegisterExtension
#endif

/// To allow the proper usage of SSLContextServices set the MinifiPropertyDefinition::type to MINIFI_SSL_CONTEXT_SERVICE_PROPERTY_TYPE
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't really describe what proper usage means, so I would rephrase it to something like: "To declare a processor property that expects an SSLContextService, use MINIFI_SSL_CONTEXT_SERVICE_PROPERTY_TYPE in the type field of the property definition (MinifiPropertyDefinition::type)"

#define MINIFI_SSL_CONTEXT_SERVICE_PROPERTY_TYPE "org.apache.nifi.minifi.controllers.SSLContextServiceInterface"

enum : uint32_t {
MINIFI_API_VERSION = 3
};
Expand Down Expand Up @@ -231,6 +234,8 @@ MinifiBool MinifiProcessContextHasNonEmptyProperty(MinifiProcessContext* context

MinifiStatus MinifiProcessContextGetControllerService(
MinifiProcessContext* process_context, MinifiStringView controller_service_name, MinifiStringView controller_service_type, MinifiControllerService** controller_service_out);
void MinifiProcessContextGetDynamicProperties(MinifiProcessContext* context, MinifiFlowFile* minifi_flow_file,
void (*cb)(void* user_ctx, MinifiStringView dynamic_property_name, MinifiStringView dynamic_property_value), void* user_ctx);

void MinifiLoggerSetMaxLogSize(MinifiLogger*, int32_t);
void MinifiLoggerLogString(MinifiLogger*, MinifiLogLevel, MinifiStringView);
Expand All @@ -240,6 +245,7 @@ MinifiLogLevel MinifiLoggerLevel(MinifiLogger*);
MINIFI_OWNED MinifiFlowFile* MinifiProcessSessionGet(MinifiProcessSession*);
MINIFI_OWNED MinifiFlowFile* MinifiProcessSessionCreate(MinifiProcessSession* session, MinifiFlowFile* parent_flowfile);

MinifiStatus MinifiProcessSessionPenalize(MinifiProcessSession* session, MinifiFlowFile* flowfile);
MinifiStatus MinifiProcessSessionTransfer(MinifiProcessSession* session, MINIFI_OWNED MinifiFlowFile* flowfile, MinifiStringView relationship_name);
MinifiStatus MinifiProcessSessionRemove(MinifiProcessSession* session, MINIFI_OWNED MinifiFlowFile* flowfile);

Expand All @@ -253,16 +259,30 @@ size_t MinifiInputStreamSize(MinifiInputStream*);
int64_t MinifiInputStreamRead(MinifiInputStream* stream, char* buffer, size_t size);
int64_t MinifiOutputStreamWrite(MinifiOutputStream* stream, const char* data, size_t size);

MinifiStatus MinifiFlowFileSetAttribute(MinifiProcessSession* session, MinifiFlowFile* flowfile, MinifiStringView attribute_name, const MinifiStringView* attribute_value);
MinifiBool MinifiFlowFileGetAttribute(MinifiProcessSession* session, MinifiFlowFile* flowfile, MinifiStringView attribute_name,
MinifiStatus MinifiProcessSessionSetFlowFileAttribute(MinifiProcessSession* session, MinifiFlowFile* flowfile, MinifiStringView attribute_name, const MinifiStringView* attribute_value);
MinifiBool MinifiProcessSessionGetFlowFileAttribute(MinifiProcessSession* session, MinifiFlowFile* flowfile, MinifiStringView attribute_name,
void(*cb)(void* user_ctx, MinifiStringView attribute_value), void* user_ctx);
void MinifiFlowFileGetAttributes(MinifiProcessSession* session, MinifiFlowFile* flowfile, void(*cb)(void* user_ctx, MinifiStringView attribute_name, MinifiStringView attribute_value), void* user_ctx);
void MinifiProcessSessionGetFlowFileAttributes(MinifiProcessSession* session, MinifiFlowFile* flowfile,
void (*cb)(void* user_ctx, MinifiStringView attribute_name, MinifiStringView attribute_value), void* user_ctx);
Comment thread
martinzink marked this conversation as resolved.
uint64_t MinifiProcessSessionGetFlowFileSize(MinifiProcessSession* session, MinifiFlowFile* flowfile);
MinifiStatus MinifiProcessSessionGetFlowFileId(MinifiProcessSession* session, MinifiFlowFile* flowfile, void(*cb)(void* user_ctx, MinifiStringView flow_file_id), void* user_ctx);

MinifiStatus MinifiControllerServiceContextGetProperty(MinifiControllerServiceContext* context,
MinifiStringView property_name,
void(*cb)(void* user_ctx, MinifiStringView property_value),
void* user_ctx);

typedef struct MinifiSslData {
uint8_t version;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking we can probably do without this version field, and just rely on the extension API version in case the layout ever changes.

MinifiStringView ca_certificate_file;
MinifiStringView certificate_file;
MinifiStringView private_key_file;
MinifiStringView passphrase;
} MinifiSslData;

MinifiStatus MinifiProcessContextGetSslData(MinifiProcessContext* process_context, MinifiStringView controller_service_name,
void (*cb)(void* user_ctx, const MinifiSslData* ssl_data), void* user_ctx);

#ifdef __cplusplus
} // extern "C"
#endif // __cplusplus
Expand Down
11 changes: 8 additions & 3 deletions minifi-api/minifi-c-api.def
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@ EXPORTS
MinifiProcessSessionCreate
MinifiProcessSessionTransfer
MinifiProcessSessionRemove
MinifiProcessSessionPenalize
MinifiProcessSessionRead
MinifiProcessSessionWrite
MinifiConfigGet
MinifiInputStreamSize
MinifiInputStreamRead
MinifiOutputStreamWrite
MinifiFlowFileSetAttribute
MinifiFlowFileGetAttribute
MinifiFlowFileGetAttributes
MinifiProcessSessionSetFlowFileAttribute
MinifiProcessSessionGetFlowFileAttribute
MinifiProcessSessionGetFlowFileAttributes
MinifiProcessSessionGetFlowFileSize
MinifiProcessSessionGetFlowFileId
MinifiProcessContextGetDynamicProperties
MinifiProcessContextGetSslData
Comment thread
martinzink marked this conversation as resolved.
Loading