Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion extension-framework/cpp-extension-lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@ target_include_directories(minifi-cpp-extension-lib PUBLIC include)
target_link_libraries(minifi-cpp-extension-lib PUBLIC minifi-core-framework-common minifi-c-api)

add_subdirectory(libtest)

add_subdirectory(mocklib)
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
Expand All @@ -20,23 +20,48 @@
#include <string>
#include <expected>

#include "api/core/FlowFile.h"
#include "api/utils/Ssl.h"
#include "minifi-c.h"
#include "minifi-cpp/core/PropertyDefinition.h"
#include "api/core/FlowFile.h"

namespace org::apache::nifi::minifi::api::core {

class ProcessContext {
public:
explicit ProcessContext(MinifiProcessContext* impl): impl_(impl) {}
virtual ~ProcessContext() noexcept = default;

ProcessContext() = default;
ProcessContext(const ProcessContext&) = delete;
ProcessContext(ProcessContext&&) = delete;
ProcessContext& operator=(const ProcessContext&) = delete;
ProcessContext& operator=(ProcessContext&&) = delete;

[[nodiscard]] virtual std::expected<std::string, std::error_code> getProperty(const minifi::core::PropertyReference& prop,
const FlowFile* ff) const = 0;
[[nodiscard]] virtual std::expected<MinifiControllerService*, std::error_code> getControllerService(std::string_view name,
std::string_view type) const = 0;
[[nodiscard]] virtual bool hasNonEmptyProperty(std::string_view name) const = 0;
[[nodiscard]] virtual std::map<std::string, std::string> getDynamicProperties(const FlowFile* flow_file) const = 0;

[[nodiscard]] virtual std::expected<utils::net::SslData, std::error_code> getSslData(std::string_view name) const = 0;
};

class CffiProcessContext : public ProcessContext {
public:
explicit CffiProcessContext(MinifiProcessContext* impl) : impl_(impl) {}

std::expected<std::string, std::error_code> getProperty(std::string_view name, const FlowFile* flow_file = nullptr) const;
std::expected<std::string, std::error_code> getProperty(const minifi::core::PropertyReference& property_reference, const FlowFile* flow_file = nullptr) const {
return getProperty(property_reference.name, flow_file);
}
[[nodiscard]] std::expected<MinifiControllerService*, std::error_code> getControllerService(std::string_view controller_service_name, std::string_view controller_service_class) const;
[[nodiscard]] std::expected<std::string, std::error_code> getProperty(const minifi::core::PropertyReference& property_reference,
const FlowFile* flow_file) const override;
[[nodiscard]] std::expected<MinifiControllerService*, std::error_code> getControllerService(std::string_view name,
std::string_view type) const override;
[[nodiscard]] std::map<std::string, std::string> getDynamicProperties(const FlowFile* flow_file) const override;
[[nodiscard]] bool hasNonEmptyProperty(std::string_view name) const override;

[[nodiscard]] bool hasNonEmptyProperty(std::string_view name) const;
[[nodiscard]] std::expected<utils::net::SslData, std::error_code> getSslData(std::string_view name) const override;

private:
[[nodiscard]] std::expected<std::string, std::error_code> getProperty(std::string_view name, const FlowFile* flow_file) const;

private:
MinifiProcessContext* impl_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,54 @@ namespace org::apache::nifi::minifi::api::core {

class ProcessSession {
public:
explicit ProcessSession(MinifiProcessSession* impl): impl_(impl) {}
virtual ~ProcessSession() = default;

FlowFile create(const FlowFile* parent = nullptr);
FlowFile get();
void transfer(FlowFile ff, const minifi::core::Relationship& relationship);
void remove(FlowFile ff);
void write(FlowFile& flow, const io::OutputStreamCallback& callback);
void read(FlowFile& flow, const io::InputStreamCallback& callback);
ProcessSession() = default;

void setAttribute(FlowFile& ff, std::string_view key, std::string value);
void removeAttribute(FlowFile& ff, std::string_view key);
std::optional<std::string> getAttribute(FlowFile& ff, std::string_view key);
std::map<std::string, std::string> getAttributes(FlowFile& ff);
ProcessSession(const ProcessSession&) = delete;
ProcessSession(ProcessSession&&) = delete;
ProcessSession& operator=(const ProcessSession&) = delete;
ProcessSession& operator=(ProcessSession&&) = delete;

virtual FlowFile create(const FlowFile* parent = nullptr) = 0;
virtual FlowFile get() = 0;

virtual void penalize(FlowFile& ff) = 0;
virtual void transfer(FlowFile ff, const minifi::core::Relationship& relationship) = 0;
virtual void remove(FlowFile ff) = 0;
virtual void write(FlowFile& flow, const io::OutputStreamCallback& callback) = 0;
virtual void read(FlowFile& flow, const io::InputStreamCallback& callback) = 0;

virtual void setAttribute(FlowFile& ff, std::string_view key, std::string value) = 0;
virtual void removeAttribute(FlowFile& ff, std::string_view key) = 0;
[[nodiscard]] virtual std::optional<std::string> getAttribute(FlowFile& ff, std::string_view key) = 0;
[[nodiscard]] virtual std::map<std::string, std::string> getAttributes(const FlowFile& ff) const = 0;
[[nodiscard]] virtual std::string getFlowFileId(const FlowFile& ff) const = 0;
[[nodiscard]] virtual uint64_t getFlowFileSize(const FlowFile& ff) const = 0;

void writeBuffer(FlowFile& flow_file, std::span<const char> buffer);
void writeBuffer(FlowFile& flow_file, std::span<const std::byte> buffer);
std::vector<std::byte> readBuffer(FlowFile& flow_file);
[[nodiscard]] std::vector<std::byte> readBuffer(FlowFile& flow_file);
};

class CffiProcessSession : public ProcessSession {
public:
explicit CffiProcessSession(MinifiProcessSession* impl): impl_(impl) {}

FlowFile create(const FlowFile* parent = nullptr) override;
FlowFile get() override;
void penalize(FlowFile& ff) override;
void transfer(FlowFile ff, const minifi::core::Relationship& relationship) override;
void remove(FlowFile ff) override;
void write(FlowFile& flow, const io::OutputStreamCallback& callback) override;
void read(FlowFile& flow, const io::InputStreamCallback& callback) override;

void setAttribute(FlowFile& ff, std::string_view key, std::string value) override;
void removeAttribute(FlowFile& ff, std::string_view key) override;
[[nodiscard]] std::optional<std::string> getAttribute(FlowFile& ff, std::string_view key) override;
[[nodiscard]] std::map<std::string, std::string> getAttributes(const FlowFile& ff) const override;
[[nodiscard]] std::string getFlowFileId(const FlowFile& ff) const override;
[[nodiscard]] uint64_t getFlowFileSize(const FlowFile& ff) const override;

private:
MinifiProcessSession* impl_;
Expand Down
50 changes: 32 additions & 18 deletions extension-framework/cpp-extension-lib/include/api/core/Resource.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,38 +49,38 @@ void useProcessorClassDefinition(Fn&& fn) {
std::vector<MinifiDynamicPropertyDefinition> dynamic_properties;
for (auto& prop : Class::DynamicProperties) {
dynamic_properties.push_back(MinifiDynamicPropertyDefinition {
.name = utils::toStringView(prop.name),
.value = utils::toStringView(prop.value),
.description = utils::toStringView(prop.description),
.name = utils::minifiStringView(prop.name),
.value = utils::minifiStringView(prop.value),
.description = utils::minifiStringView(prop.description),
.supports_expression_language = prop.supports_expression_language
});
}
std::vector<MinifiRelationshipDefinition> relationships;
for (auto& rel : Class::Relationships) {
relationships.push_back(MinifiRelationshipDefinition{
.name = utils::toStringView(rel.name),
.description = utils::toStringView(rel.description)
.name = utils::minifiStringView(rel.name),
.description = utils::minifiStringView(rel.description)
});
}
std::vector<std::vector<MinifiStringView>> attribute_relationships_cache;
std::vector<MinifiOutputAttributeDefinition> output_attributes;
for (auto& attr : Class::OutputAttributes) {
std::vector<MinifiStringView> rel_cache;
for (auto& rel : attr.relationships) {
rel_cache.push_back(utils::toStringView(rel.name));
rel_cache.push_back(utils::minifiStringView(rel.name));
}
output_attributes.push_back(MinifiOutputAttributeDefinition {
.name = utils::toStringView(attr.name),
.name = utils::minifiStringView(attr.name),
.relationships_count = gsl::narrow<uint32_t>(attr.relationships.size()),
.relationships_ptr = rel_cache.data(),
.description = utils::toStringView(attr.description)
.description = utils::minifiStringView(attr.description)
});
attribute_relationships_cache.push_back(std::move(rel_cache));
}

MinifiProcessorClassDefinition definition{
.full_name = utils::toStringView(full_name),
.description = utils::toStringView(Class::Description),
.full_name = utils::minifiStringView(full_name),
.description = utils::minifiStringView(Class::Description),
.class_properties_count = gsl::narrow<uint32_t>(class_properties.size()),
.class_properties_ptr = class_properties.data(),
.dynamic_properties_count = gsl::narrow<uint32_t>(dynamic_properties.size()),
Expand All @@ -100,7 +100,7 @@ void useProcessorClassDefinition(Fn&& fn) {
return new Class{minifi::core::ProcessorMetadata{
.uuid = minifi::utils::Identifier::parse(std::string{metadata.uuid.data, metadata.uuid.length}).value(),
.name = std::string{metadata.name.data, metadata.name.length},
.logger = std::make_shared<logging::Logger>(metadata.logger)}};
.logger = std::make_shared<logging::CffiLogger>(metadata.logger)}};
} catch (...) { return nullptr; }
},
.destroy = [] (MINIFI_OWNED void* self) -> void {
Expand All @@ -110,16 +110,16 @@ void useProcessorClassDefinition(Fn&& fn) {
return static_cast<Class*>(self)->getTriggerWhenEmpty();
},
.onTrigger = [] (void* self, MinifiProcessContext* context, MinifiProcessSession* session) -> MinifiStatus {
ProcessContext context_wrapper(context);
ProcessSession session_wrapper(session);
CffiProcessContext context_wrapper(context);
CffiProcessSession session_wrapper(session);
try {
return static_cast<Class*>(self)->onTrigger(context_wrapper, session_wrapper);
} catch (...) {
return MINIFI_STATUS_UNKNOWN_ERROR;
}
},
.onSchedule = [] (void* self, MinifiProcessContext* context) -> MinifiStatus {
ProcessContext context_wrapper(context);
CffiProcessContext context_wrapper(context);
try {
return static_cast<Class*>(self)->onSchedule(context_wrapper);
} catch (...) {
Expand All @@ -136,7 +136,7 @@ void useProcessorClassDefinition(Fn&& fn) {
std::vector<MinifiStringView> names;
std::vector<double> values;
for (auto& [name, val] : metrics) {
names.push_back(utils::toStringView(name));
names.push_back(utils::minifiStringView(name));
values.push_back(val);
}
return MinifiPublishedMetricsCreate(gsl::narrow<uint32_t>(metrics.size()), names.data(), values.data());
Expand All @@ -155,8 +155,8 @@ void useControllerServiceClassDefinition(Fn&& fn) {

std::vector<MinifiPropertyDefinition> class_properties = utils::toProperties(Class::Properties, string_vector_cache);

MinifiControllerServiceClassDefinition definition{.full_name = utils::toStringView(full_name),
.description = utils::toStringView(Class::Description),
MinifiControllerServiceClassDefinition definition{.full_name = utils::minifiStringView(full_name),
.description = utils::minifiStringView(Class::Description),
.class_properties_count = gsl::narrow<uint32_t>(class_properties.size()),
.class_properties_ptr = class_properties.data(),

Expand All @@ -166,7 +166,7 @@ void useControllerServiceClassDefinition(Fn&& fn) {
return new Class{minifi::core::ControllerServiceMetadata{
.uuid = minifi::utils::Identifier::parse(std::string{metadata.uuid.data, metadata.uuid.length}).value(),
.name = std::string{metadata.name.data, metadata.name.length},
.logger = std::make_shared<logging::Logger>(metadata.logger)}};
.logger = std::make_shared<logging::CffiLogger>(metadata.logger)}};
} catch (...) { return nullptr; }
},
.destroy = [](MINIFI_OWNED void* self) -> void { delete static_cast<Class*>(self); },
Expand All @@ -186,4 +186,18 @@ void useControllerServiceClassDefinition(Fn&& fn) {
fn(definition);
}

template <typename... Processors>
void registerProcessors(MinifiExtension* extension) {
(core::useProcessorClassDefinition<Processors>([&](const MinifiProcessorClassDefinition& definition) {
MinifiRegisterProcessor(extension, &definition);
}), ...);
}

template <typename... ControllerServices>
void registerControllerServices(MinifiExtension* extension) {
(core::useControllerServiceClassDefinition<ControllerServices>([&](const MinifiControllerServiceClassDefinition& definition) {
MinifiRegisterControllerService(extension, &definition);
}), ...);
}

} // namespace org::apache::nifi::minifi::api::core
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,14 @@
#include <iostream>
#include <string>

#include "fmt/chrono.h"
#include "minifi-c.h"
#include "minifi-cpp/core/logging/Logger.h"

namespace org::apache::nifi::minifi::api::core::logging {

class Logger : public minifi::core::logging::Logger {
class CffiLogger : public minifi::core::logging::Logger {
public:
explicit Logger(MinifiLogger* impl): impl_(impl) {}
explicit CffiLogger(MinifiLogger* impl): impl_(impl) {}

void set_max_log_size(int size) override;
void log_string(minifi::core::logging::LOG_LEVEL level, std::string str) override;
Expand Down
Loading
Loading