diff --git a/CMakeLists.txt b/CMakeLists.txt
index 96257bf095..ad953bf787 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -748,7 +748,7 @@ set(EXTENSIONS_ENABLED_BY_DEFAULT
     minifi-grafana-loki
     minifi-archive-extensions
     minifi-mqtt-extensions
-    minifi-rdkafka-extensions
+    minifi-kafka
     minifi-pdh
     minifi-prometheus
     minifi-rocksdb-repos
diff --git a/core-framework/common/include/core/PropertyDefinitionBuilder.h b/core-framework/common/include/core/PropertyDefinitionBuilder.h
index 221f5d917a..b2bde14b0e 100644
--- a/core-framework/common/include/core/PropertyDefinitionBuilder.h
+++ b/core-framework/common/include/core/PropertyDefinitionBuilder.h
@@ -27,7 +27,21 @@ namespace org::apache::nifi::minifi::core {
 namespace detail {
 template<typename... Types>
 inline constexpr auto TypeNames = std::array{core::className<Types>()...};
-}
+
+template<size_t N>
+struct StringLiteral {
+  char value[N];
+  constexpr StringLiteral(const char (&str)[N]) {  // NOLINT(runtime/explicit)
+    for (size_t i = 0; i < N; ++i) {
+      value[i] = str[i];
+    }
+  }
+};
+
+// A variable template that creates permanent static memory for the span to point to
+template<StringLiteral str>
+inline constexpr auto StaticAllowedType = std::array{std::string_view{str.value, sizeof(str.value) - 1}};
+}  // namespace detail
 
 template<size_t NumAllowedValues = 0, size_t NumDependentProperties = 0, size_t NumExclusiveOfProperties = 0>
 struct PropertyDefinitionBuilder {
@@ -81,6 +95,12 @@ struct PropertyDefinitionBuilder {
     return *this;
   }
 
+  template<detail::StringLiteral str>
+  constexpr PropertyDefinitionBuilder withAllowedType() {
+    property.allowed_types = detail::StaticAllowedType<str>;
+    return *this;
+  }
+
   constexpr PropertyDefinitionBuilder withValidator(const PropertyValidator& property_validator) {
     property.validator = gsl::make_not_null(&property_validator);
     return *this;
diff --git a/extension-framework/cpp-extension-lib/include/api/core/ProcessContext.h b/extension-framework/cpp-extension-lib/include/api/core/ProcessContext.h
index ee9eb0f727..87209577cc 100644
--- a/extension-framework/cpp-extension-lib/include/api/core/ProcessContext.h
+++ b/extension-framework/cpp-extension-lib/include/api/core/ProcessContext.h
@@ -44,7 +44,7 @@ class ProcessContext {
   [[nodiscard]] virtual bool hasNonEmptyProperty(std::string_view name) const = 0;
   [[nodiscard]] virtual std::map<std::string, std::string> getDynamicProperties(const FlowFile* flow_file) const = 0;
 
-  [[nodiscard]] virtual std::expected<utils::net::SslData, std::error_code> getSslData(std::string_view name) const = 0;
+  [[nodiscard]] virtual std::expected<utils::net::SslData, std::error_code> getSslData(const minifi::core::PropertyReference& prop) const = 0;
 };
 
 class CffiProcessContext : public ProcessContext {
@@ -58,7 +58,7 @@ class CffiProcessContext : public ProcessContext {
   [[nodiscard]] std::map<std::string, std::string> getDynamicProperties(const FlowFile* flow_file) const override;
   [[nodiscard]] bool hasNonEmptyProperty(std::string_view name) const override;
 
-  [[nodiscard]] std::expected<utils::net::SslData, std::error_code> getSslData(std::string_view name) const override;
+  [[nodiscard]] std::expected<utils::net::SslData, std::error_code> getSslData(const minifi::core::PropertyReference& prop) const override;
 
  private:
   [[nodiscard]] std::expected<std::string, std::error_code> getProperty(std::string_view name, const FlowFile* flow_file) const;
diff --git a/extension-framework/cpp-extension-lib/mocklib/CMakeLists.txt b/extension-framework/cpp-extension-lib/mocklib/CMakeLists.txt
index f93870f375..28aaede166 100644
--- a/extension-framework/cpp-extension-lib/mocklib/CMakeLists.txt
+++ b/extension-framework/cpp-extension-lib/mocklib/CMakeLists.txt
@@ -19,6 +19,6 @@
 
 file(GLOB SRC_FILES CONFIGURE_DEPENDS "src/*.cpp")
 
-add_library(libmock-minifi STATIC ${SRC_FILES})
-target_link_libraries(libmock-minifi PUBLIC minifi-cpp-extension-lib minifi-core-framework-common minifi-c-api)
-target_include_directories(libmock-minifi PUBLIC include)
+add_library(mock-minifi STATIC ${SRC_FILES})
+target_link_libraries(mock-minifi PUBLIC minifi-cpp-extension-lib minifi-core-framework-common minifi-c-api)
+target_include_directories(mock-minifi PUBLIC include)
diff --git a/extension-framework/cpp-extension-lib/mocklib/include/MockProcessContext.h b/extension-framework/cpp-extension-lib/mocklib/include/MockProcessContext.h
index 62a0bc57af..76a13c8d81 100644
--- a/extension-framework/cpp-extension-lib/mocklib/include/MockProcessContext.h
+++ b/extension-framework/cpp-extension-lib/mocklib/include/MockProcessContext.h
@@ -34,7 +34,7 @@ class MockProcessContext : public api::core::ProcessContext {
   [[nodiscard]] std::map<std::string, std::string> getDynamicProperties(const api::core::FlowFile* flow_file) const override;
   [[nodiscard]] bool hasNonEmptyProperty(std::string_view name) const override;
 
-  [[nodiscard]] std::expected<api::utils::net::SslData, std::error_code> getSslData(std::string_view name) const override;
+  [[nodiscard]] std::expected<api::utils::net::SslData, std::error_code> getSslData(const minifi::core::PropertyReference& prop) const override;
 
   std::map<std::string, std::string, std::less<>> properties_;
diff --git a/extension-framework/cpp-extension-lib/mocklib/src/MockProcessContext.cpp b/extension-framework/cpp-extension-lib/mocklib/src/MockProcessContext.cpp
index b8bb1e5fc6..afd5b9b2f6 100644
--- a/extension-framework/cpp-extension-lib/mocklib/src/MockProcessContext.cpp
+++ b/extension-framework/cpp-extension-lib/mocklib/src/MockProcessContext.cpp
@@ -44,7 +44,7 @@ bool MockProcessContext::hasNonEmptyProperty(const std::string_view name) const
   return properties_.contains(name);
 }
 
-std::expected<api::utils::net::SslData, std::error_code> MockProcessContext::getSslData(std::string_view) const {
+std::expected<api::utils::net::SslData, std::error_code> MockProcessContext::getSslData(const minifi::core::PropertyReference&) const {
   return api::utils::net::SslData{};
 }
 }  // namespace org::apache::nifi::minifi::mock
diff --git a/extension-framework/cpp-extension-lib/mocklib/src/mock-minifi-c.cpp b/extension-framework/cpp-extension-lib/mocklib/src/mock-minifi-c.cpp
index a546f07ad3..a50ca712b2 100644
--- a/extension-framework/cpp-extension-lib/mocklib/src/mock-minifi-c.cpp
+++ b/extension-framework/cpp-extension-lib/mocklib/src/mock-minifi-c.cpp
@@ -19,6 +19,7 @@
 
 #include "minifi-c.h"
 
+extern "C" {
 MinifiExtension* MINIFI_REGISTER_EXTENSION_FN(MinifiExtensionContext*, const MinifiExtensionDefinition*) {
   throw std::runtime_error("Not implemented");
 }
@@ -51,7 +52,7 @@ void MinifiProcessContextGetDynamicProperties(MinifiProcessContext*, MinifiFlowF
     throw std::runtime_error("Not implemented");
 }
 
-MinifiStatus MinifiProcessContextGetSslData(MinifiProcessContext*, MinifiStringView,
+MinifiStatus MinifiProcessContextGetSslDataFromProperty(MinifiProcessContext*, MinifiStringView,
     void (*)(void* user_ctx, const MinifiSslData* ssl_data), void*) {
   throw std::runtime_error("Not implemented");
 }
@@ -133,3 +134,4 @@ MinifiStatus MinifiControllerServiceContextGetProperty(MinifiControllerServiceCo
     void (*)(void* user_ctx, MinifiStringView property_value), void*) {
   throw std::runtime_error("Not implemented");
 }
+}  // extern "C"
diff --git a/extension-framework/cpp-extension-lib/src/core/ProcessContext.cpp b/extension-framework/cpp-extension-lib/src/core/ProcessContext.cpp
index a4dd9505cd..90c333f0dd 100644
--- a/extension-framework/cpp-extension-lib/src/core/ProcessContext.cpp
+++ b/extension-framework/cpp-extension-lib/src/core/ProcessContext.cpp
@@ -69,10 +69,10 @@ std::map<std::string, std::string> CffiProcessContext::getDynamicProperties(cons
   return result;
 }
 
-std::expected<utils::net::SslData, std::error_code> CffiProcessContext::getSslData(const std::string_view name) const {
+std::expected<utils::net::SslData, std::error_code> CffiProcessContext::getSslData(const minifi::core::PropertyReference& prop) const {
   auto ssl_data = utils::net::SslData{};
 
-  if (const auto status = MinifiProcessContextGetSslData(impl_, utils::minifiStringView(name), [](void* data, const MinifiSslData* minifi_ssl_data) {
+  if (const auto status = MinifiProcessContextGetSslDataFromProperty(impl_, utils::minifiStringView(prop.name), [](void* data, const MinifiSslData* minifi_ssl_data) {
     auto* my_ssl_data = static_cast<utils::net::SslData*>(data);
     my_ssl_data->ca_loc = utils::toString(minifi_ssl_data->ca_certificate_file);
     my_ssl_data->cert_loc = utils::toString(minifi_ssl_data->certificate_file);
diff --git a/extensions/kafka/CMakeLists.txt b/extensions/kafka/CMakeLists.txt
index 13f4b3a386..3e7bfa1e28 100644
--- a/extensions/kafka/CMakeLists.txt
+++ b/extensions/kafka/CMakeLists.txt
@@ -27,13 +27,13 @@ include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
 
 file(GLOB SOURCES "*.cpp")
 
-add_minifi_library(minifi-rdkafka-extensions SHARED ${SOURCES})
+add_minifi_library(minifi-kafka SHARED ${SOURCES})
 
-target_link_libraries(minifi-rdkafka-extensions ${LIBMINIFI} Threads::Threads)
+target_link_libraries(minifi-kafka minifi-cpp-extension-lib Threads::Threads)
 
 get_target_property(RDKAFKA_INCLUDE_DIRS rdkafka INCLUDE_DIRECTORIES)
-target_include_directories(minifi-rdkafka-extensions SYSTEM PUBLIC ${RDKAFKA_INCLUDE_DIRS})
-target_link_libraries(minifi-rdkafka-extensions rdkafka)
+target_include_directories(minifi-kafka SYSTEM PUBLIC ${RDKAFKA_INCLUDE_DIRS})
+target_link_libraries(minifi-kafka rdkafka)
 
-register_extension(minifi-rdkafka-extensions "RDKAFKA EXTENSIONS" RDKAFKA-EXTENSIONS "This Enables librdkafka functionality including PublishKafka" "extensions/kafka/tests")
+register_c_api_extension(minifi-kafka "RDKAFKA EXTENSIONS" RDKAFKA-EXTENSIONS "This Enables librdkafka functionality including PublishKafka" "extensions/kafka/tests")
diff --git a/extensions/kafka/ConsumeKafka.cpp b/extensions/kafka/ConsumeKafka.cpp
index adbff6630a..acad0ff195 100644
--- a/extensions/kafka/ConsumeKafka.cpp
+++ b/extensions/kafka/ConsumeKafka.cpp
@@ -20,14 +20,11 @@
 #include <memory>
 #include <string>
 
-#include "minifi-cpp/core/FlowFile.h"
-#include "core/ProcessSession.h"
-#include "minifi-cpp/core/PropertyValidator.h"
-#include "core/Resource.h"
-#include "utils/OptionalUtils.h"
-#include "utils/AttributeErrors.h"
-#include "utils/ProcessorConfigUtils.h"
+#include "api/utils/ProcessorConfigUtils.h"
 #include "minifi-cpp/utils/gsl.h"
+#include "utils/AttributeErrors.h"
+#include "utils/Hash.h"
+#include "utils/OptionalUtils.h"
 #include "utils/expected.h"
 
 using namespace std::literals::chrono_literals;
@@ -41,33 +38,27 @@ struct std::hash<org::apache::nifi::minifi::processors::ConsumeKafka::KafkaMessa
 
 namespace org::apache::nifi::minifi::processors {
 
-bool consume_kafka::ConsumeKafkaMaxPollTimePropertyValidator::validate(const std::string_view input) const {
-  const auto parsed_time = parsing::parseDurationMinMax<std::chrono::milliseconds>(input, 0ms, 4s);
-  return parsed_time.has_value();
-}
-
-void ConsumeKafka::initialize() {
-  setSupportedProperties(Properties);
-  setSupportedRelationships(Relationships);
-}
-
-void ConsumeKafka::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
+MinifiStatus ConsumeKafka::onScheduleImpl(api::core::ProcessContext& context) {
+  using utils::KafkaEncoding;
+  namespace utils = api::utils;
   // Required properties
-  topic_names_ = utils::string::splitAndTrim(utils::parseProperty(context, TopicNames), ",");
+  topic_names_ = minifi::utils::string::splitAndTrim(utils::parseProperty(context, TopicNames), ",");
   topic_name_format_ = utils::parseEnumProperty<consume_kafka::TopicNameFormatEnum>(context, TopicNameFormat);
   commit_policy_ = utils::parseEnumProperty<consume_kafka::CommitPolicyEnum>(context, CommitPolicy);
-  key_attribute_encoding_ = utils::parseEnumProperty<utils::KafkaEncoding>(context, KeyAttributeEncoding);
-  message_header_encoding_ = utils::parseEnumProperty<utils::KafkaEncoding>(context, MessageHeaderEncoding);
+  key_attribute_encoding_ = utils::parseEnumProperty<KafkaEncoding>(context, KeyAttributeEncoding);
+  message_header_encoding_ = utils::parseEnumProperty<KafkaEncoding>(context, MessageHeaderEncoding);
   duplicate_header_handling_ = utils::parseEnumProperty<consume_kafka::DuplicateHeaderHandlingEnum>(context, DuplicateHeaderHandling);
   max_poll_time_milliseconds_ = utils::parseDurationProperty(context, MaxPollTime);
+  if (max_poll_time_milliseconds_ > 4s) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "MaxPollTime is too large (it should be less than 4s)");
+  }
   max_poll_records_ = gsl::narrow<size_t>(utils::parseU64Property(context, MaxPollRecords));
 
   // Optional properties
   message_demarcator_ = utils::parseOptionalProperty(context, MessageDemarcator);
-  headers_to_add_as_attributes_ = parseOptionalProperty(context, HeadersToAddAsAttributes)
-      | utils::transform([](const std::string& headers_to_add_str) { return utils::string::splitAndTrim(headers_to_add_str, ","); });
+  headers_to_add_as_attributes_ = utils::parseOptionalProperty(context, HeadersToAddAsAttributes)
+      | minifi::utils::transform([](const std::string& headers_to_add_str) { return minifi::utils::string::splitAndTrim(headers_to_add_str, ","); });
   if (message_demarcator_ && headers_to_add_as_attributes_) {
     logger_->log_error("Message merging with header extraction is not yet supported");
     throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Message merging with header extraction is not yet supported");
@@ -76,40 +67,9 @@ void ConsumeKafka::onSchedule(core::ProcessContext& context, core::ProcessSessio
   configureNewConnection(context);
   if (commit_policy_ == consume_kafka::CommitPolicyEnum::CommitFromIncomingFlowFiles) {
     setTriggerWhenEmpty(true);
-  } else if (context.hasIncomingConnections()) {
-    logger_->log_error("Incoming connections are not allowed with {}", magic_enum::enum_name(commit_policy_));
-    throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Incoming connections are not allowed with {}", magic_enum::enum_name(commit_policy_)));
-  }
-}
-
-namespace {
-void rebalance_cb(rd_kafka_t* rk, rd_kafka_resp_err_t trigger, rd_kafka_topic_partition_list_t* partitions, void* /*opaque*/) {
-  // Cooperative, incremental assignment is not supported in the current librdkafka version
-  std::shared_ptr<core::logging::Logger> logger{core::logging::LoggerFactory<ConsumeKafka>::getLogger()};
-  logger->log_debug("Rebalance triggered.");
-  rd_kafka_resp_err_t assign_error = RD_KAFKA_RESP_ERR_NO_ERROR;
-  switch (trigger) {
-    case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
-      logger->log_debug("assigned:");
-      if (logger->should_log(core::logging::LOG_LEVEL::debug)) { utils::print_topics_list(*logger, *partitions); }
-      assign_error = rd_kafka_assign(rk, partitions);
-      break;
-
-    case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
-      logger->log_debug("revoked:");
-      rd_kafka_commit(rk, partitions, /* async = */ 0);  // Sync commit, maybe unnecessary
-      if (logger->should_log(core::logging::LOG_LEVEL::debug)) { utils::print_topics_list(*logger, *partitions); }
-      assign_error = rd_kafka_assign(rk, nullptr);
-      break;
-
-    default:
-      logger->log_debug("failed: {}", rd_kafka_err2str(trigger));
-      assign_error = rd_kafka_assign(rk, nullptr);
-      break;
   }
-  logger->log_debug("assign failure: {}", rd_kafka_err2str(assign_error));
+  return MINIFI_STATUS_SUCCESS;
 }
-}  // namespace
 
 void ConsumeKafka::createTopicPartitionList() {
   kf_topic_partition_list_ = utils::rd_kafka_topic_partition_list_unique_ptr{
       rd_kafka_topic_partition_list_new(gsl::narrow<int>(topic_names_.size()))};
@@ -136,25 +96,17 @@ void ConsumeKafka::createTopicPartitionList() {
   }
 }
 
-void ConsumeKafka::extendConfigFromDynamicProperties(const core::ProcessContext& context) const {
-  using utils::setKafkaConfigurationField;
-
-  const std::vector<std::string> dynamic_prop_keys = context.getDynamicPropertyKeys();
-  if (dynamic_prop_keys.empty()) { return; }
-  logger_->log_info("Loading {} extra kafka configuration fields from ConsumeKafka dynamic properties:", dynamic_prop_keys.size());
-  for (const std::string& key: dynamic_prop_keys) {
-    std::string value = context.getDynamicProperty(key)
-        | utils::orThrow(fmt::format("This shouldn't happen, dynamic property {} is expected because we just queried the list of dynamic properties", key));
-    logger_->log_info("{}: {}", key.c_str(), value.c_str());
-    setKafkaConfigurationField(*conf_, key, value);
+void ConsumeKafka::extendConfigFromDynamicProperties(const api::core::ProcessContext& context) const {
+  for (const auto& [key, value] : context.getDynamicProperties(nullptr)) {
+    logger_->log_trace("{}: {}", key, value);
+    utils::setKafkaConfigurationField(*conf_, key, value);
   }
 }
 
-void ConsumeKafka::configureNewConnection(core::ProcessContext& context) {
-  using utils::setKafkaConfigurationField;
-
+void ConsumeKafka::configureNewConnection(api::core::ProcessContext& context) {
   conf_ = {rd_kafka_conf_new(), utils::rd_kafka_conf_deleter()};
   if (conf_ == nullptr) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Failed to create rd_kafka_conf_t object"); }
+  rd_kafka_conf_set_opaque(conf_.get(), &kafka_opaque_);
 
   // Set rebalance callback for use with coordinated consumer group balancing
   // Rebalance handlers are needed for the initial configuration of the consumer
@@ -162,15 +114,14 @@ void ConsumeKafka::configureNewConnection(core::ProcessContext& context) {
   // Registering a rebalance_cb turns off librdkafka's automatic partition assignment/revocation and instead delegates that
   // responsibility to the application's rebalance_cb.
   if (commit_policy_ != consume_kafka::CommitPolicyEnum::CommitFromIncomingFlowFiles) {
-    rd_kafka_conf_set_rebalance_cb(conf_.get(), rebalance_cb);
+    rd_kafka_conf_set_rebalance_cb(conf_.get(), utils::KafkaOpaque::rebalance_cb);
   }
 
-  // Uncomment this for librdkafka debug logs:
-  // logger_->log_info("Enabling all debug logs for kafka consumer.");
-  // setKafkaConfigurationField(*conf_, "debug", "all");
-
   setKafkaAuthenticationParameters(context, gsl::make_not_null(conf_.get()));
 
+  using utils::setKafkaConfigurationField;
+  namespace utils = api::utils;
+
   setKafkaConfigurationField(*conf_, "bootstrap.servers", utils::parseProperty(context, KafkaBrokers));
   setKafkaConfigurationField(*conf_, "allow.auto.create.topics", "true");
   setKafkaConfigurationField(*conf_,
@@ -180,7 +131,7 @@ void ConsumeKafka::configureNewConnection(core::ProcessContext& context) {
   setKafkaConfigurationField(*conf_, "enable.auto.offset.store", std::to_string(commit_policy_ == consume_kafka::CommitPolicyEnum::AutoCommit));
   setKafkaConfigurationField(*conf_, "isolation.level", utils::parseBoolProperty(context, HonorTransactions) ? "read_committed" : "read_uncommitted");
"read_committed" : "read_uncommitted"); setKafkaConfigurationField(*conf_, "group.id", utils::parseProperty(context, GroupID)); - setKafkaConfigurationField(*conf_, "client.id", this->getUUIDStr()); + // setKafkaConfigurationField(*conf_, "client.id", client_id); No need to set id since its autogenerated, and we don't access it anywhere from minifi setKafkaConfigurationField(*conf_, "session.timeout.ms", std::to_string(utils::parseDurationProperty(context, SessionTimeout).count())); // Twice the default, arbitrarily chosen setKafkaConfigurationField(*conf_, "max.poll.interval.ms", "600000"); @@ -188,7 +139,7 @@ void ConsumeKafka::configureNewConnection(core::ProcessContext& context) { extendConfigFromDynamicProperties(context); std::array errstr{}; - consumer_ = {rd_kafka_new(RD_KAFKA_CONSUMER, conf_.release(), errstr.data(), errstr.size()), utils::rd_kafka_consumer_deleter()}; + consumer_ = {rd_kafka_new(RD_KAFKA_CONSUMER, conf_.release(), errstr.data(), errstr.size()), minifi::utils::rd_kafka_consumer_deleter()}; if (consumer_ == nullptr) { const std::string error_msg{errstr.data()}; throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Failed to create Kafka consumer " + error_msg); @@ -252,7 +203,7 @@ std::string ConsumeKafka::resolve_duplicate_headers(const std::vector> ConsumeKafka::getFlowFilesAttri return attributes_from_headers; } -void ConsumeKafka::addAttributesToSingleMessageFlowFile(core::FlowFile& flow_file, const rd_kafka_message_t& message) const { - flow_file.setAttribute(KafkaCountAttribute.name, "1"); +void ConsumeKafka::addAttributesToSingleMessageFlowFile(api::core::ProcessSession& session, api::core::FlowFile& flow_file, const rd_kafka_message_t& message) const { + session.setAttribute(flow_file, KafkaCountAttribute.name, "1"); if (const auto message_key = get_encoded_message_key(message, key_attribute_encoding_)) { - flow_file.setAttribute(KafkaKeyAttribute.name, *message_key); + session.setAttribute(flow_file, KafkaKeyAttribute.name, *message_key); } - flow_file.setAttribute(KafkaOffsetAttribute.name, std::to_string(message.offset)); - flow_file.setAttribute(KafkaPartitionAttribute.name, std::to_string(message.partition)); - flow_file.setAttribute(KafkaTopicAttribute.name, rd_kafka_topic_name(message.rkt)); + session.setAttribute(flow_file, KafkaOffsetAttribute.name, std::to_string(message.offset)); + session.setAttribute(flow_file, KafkaPartitionAttribute.name, std::to_string(message.partition)); + session.setAttribute(flow_file, KafkaTopicAttribute.name, rd_kafka_topic_name(message.rkt)); if (headers_to_add_as_attributes_) { - for (const auto& [attr_key, attr_value]: getFlowFilesAttributesFromMessageHeaders(message)) { flow_file.setAttribute(attr_key, attr_value); } + for (const auto& [attr_key, attr_value]: getFlowFilesAttributesFromMessageHeaders(message)) { session.setAttribute(flow_file, attr_key, attr_value); } } } -void ConsumeKafka::addAttributesToMessageBundleFlowFile(core::FlowFile& flow_file, const MessageBundle& message_bundle) const { +void ConsumeKafka::addAttributesToMessageBundleFlowFile(api::core::ProcessSession& session, api::core::FlowFile& flow_file, const MessageBundle& message_bundle) const { gsl_Assert(!headers_to_add_as_attributes_); - flow_file.setAttribute(KafkaCountAttribute.name, std::to_string(message_bundle.getMessages().size())); - flow_file.setAttribute(KafkaOffsetAttribute.name, std::to_string(message_bundle.getLargestOffset())); - flow_file.setAttribute(KafkaPartitionAttribute.name, 
-  flow_file.setAttribute(KafkaTopicAttribute.name, rd_kafka_topic_name(message_bundle.getMessages().front()->rkt));
+  session.setAttribute(flow_file, KafkaCountAttribute.name, std::to_string(message_bundle.getMessages().size()));
+  session.setAttribute(flow_file, KafkaOffsetAttribute.name, std::to_string(message_bundle.getLargestOffset()));
+  session.setAttribute(flow_file, KafkaPartitionAttribute.name, std::to_string(message_bundle.getMessages().front()->partition));
+  session.setAttribute(flow_file, KafkaTopicAttribute.name, rd_kafka_topic_name(message_bundle.getMessages().front()->rkt));
 }
 
 void ConsumeKafka::commitOffsetsFromMessages(const std::unordered_map<KafkaMessageLocation, MessageBundle>& message_bundles) const {
@@ -333,19 +284,19 @@ void ConsumeKafka::commitOffsetsFromMessages(const std::unordered_map<KafkaMess
   }
 }
 
-void ConsumeKafka::commitOffsetsFromIncomingFlowFiles(core::ProcessSession& session) const {
-  std::vector<std::pair<std::shared_ptr<core::FlowFile>, core::Relationship>> flow_files;
-  while (auto ff = session.get()) { flow_files.push_back({ff, Committed}); }
+void ConsumeKafka::commitOffsetsFromIncomingFlowFiles(api::core::ProcessSession& session) const {
+  std::vector<std::pair<api::core::FlowFile, core::Relationship>> flow_files;
+  while (auto ff = session.get()) { flow_files.emplace_back(std::move(ff), Committed); }
   if (flow_files.empty()) { return; }
   std::unordered_map<KafkaMessageLocation, int64_t> max_offsets;
   for (auto& [flow_file, relationship]: std::ranges::reverse_view(flow_files)) {
-    auto topic_name = flow_file->getAttribute(KafkaTopicAttribute.name)
+    auto topic_name = session.getAttribute(flow_file, KafkaTopicAttribute.name)
        | utils::toExpected(make_error_code(core::AttributeErrorCode::MissingAttribute));
-    const auto offset = flow_file->getAttribute(KafkaOffsetAttribute.name)
+    const auto offset = session.getAttribute(flow_file, KafkaOffsetAttribute.name)
        | utils::toExpected(make_error_code(core::AttributeErrorCode::MissingAttribute))
        | utils::andThen(parsing::parseIntegral<int64_t>);
-    const auto partition = flow_file->getAttribute(KafkaPartitionAttribute.name)
+    const auto partition = session.getAttribute(flow_file, KafkaPartitionAttribute.name)
        | utils::toExpected(make_error_code(core::AttributeErrorCode::MissingAttribute))
        | utils::andThen(parsing::parseIntegral<int32_t>);
     if (!topic_name || !offset || !partition) {
@@ -367,8 +318,7 @@ void ConsumeKafka::commitOffsetsFromIncomingFlowFiles(core::ProcessSession& sess
   for (const auto& [location, max_offset] : max_offsets) {
     rd_kafka_topic_partition_list_add(partitions.get(), location.topic.data(), location.partition)->offset = max_offset + 1;
   }
-  const auto commit_res = rd_kafka_commit(consumer_.get(), partitions.get(), 0);
-  switch (commit_res) {
+  switch (const auto commit_res = rd_kafka_commit(consumer_.get(), partitions.get(), 0)) {
    case RD_KAFKA_RESP_ERR_NO_ERROR: {
      logger_->log_debug("Commit successfully from {} flowfiles", flow_files.size());
      break;
@@ -386,24 +336,24 @@ void ConsumeKafka::commitOffsetsFromIncomingFlowFiles(core::ProcessSession& sess
      throw Exception(PROCESS_SESSION_EXCEPTION, fmt::format("Committing offset failed: {}: {}", magic_enum::enum_underlying(commit_res), rd_kafka_err2str(commit_res)));
    }
  }
-  for (const auto& [ff, relationship]: flow_files) { session.transfer(ff, relationship); }
+  for (auto& [ff, relationship]: flow_files) { session.transfer(std::move(ff), relationship); }
 }
 
-void ConsumeKafka::processMessages(core::ProcessSession& session, const std::unordered_map<KafkaMessageLocation, MessageBundle>& message_bundles) const {
+MinifiStatus ConsumeKafka::processMessages(api::core::ProcessSession& session, const std::unordered_map<KafkaMessageLocation, MessageBundle>& message_bundles) const {
   for (const auto& msg_bundle: message_bundles | std::views::values) {
     for (const auto& message: msg_bundle.getMessages()) {
       std::string message_content = extractMessage(*message);
       auto flow_file = session.create();
       session.writeBuffer(flow_file, message_content);
-      addAttributesToSingleMessageFlowFile(*flow_file, *message);
-      session.transfer(flow_file, Success);
+      addAttributesToSingleMessageFlowFile(session, flow_file, *message);
+      session.transfer(std::move(flow_file), Success);
     }
   }
-  session.commit();
   if (commit_policy_ == consume_kafka::CommitPolicyEnum::CommitAfterBatch) { commitOffsetsFromMessages(message_bundles); }
+  return MINIFI_STATUS_SUCCESS;
 }
 
-void ConsumeKafka::processMessageBundles(core::ProcessSession& session,
+MinifiStatus ConsumeKafka::processMessageBundles(api::core::ProcessSession& session,
     const std::unordered_map<KafkaMessageLocation, MessageBundle>& message_bundles, const std::string_view message_demarcator) const {
   for (const auto& msg_bundle: message_bundles | std::views::values) {
     auto flow_file = session.create();
@@ -411,24 +361,23 @@ void ConsumeKafka::processMessageBundles(core::ProcessSession& session,
       return extractMessage(*message);
     });
     session.writeBuffer(flow_file, merged_message_content);
-    addAttributesToMessageBundleFlowFile(*flow_file, msg_bundle);
-    session.transfer(flow_file, Success);
+    addAttributesToMessageBundleFlowFile(session, flow_file, msg_bundle);
+    session.transfer(std::move(flow_file), Success);
   }
+  return MINIFI_STATUS_SUCCESS;
 }
 
-void ConsumeKafka::onTrigger(core::ProcessContext&, core::ProcessSession& session) {
+MinifiStatus ConsumeKafka::onTriggerImpl(api::core::ProcessContext&, api::core::ProcessSession& session) {
   if (commit_policy_ == consume_kafka::CommitPolicyEnum::CommitFromIncomingFlowFiles) { commitOffsetsFromIncomingFlowFiles(session); }
   const auto message_bundles = pollKafkaMessages();
   if (message_bundles.empty()) {
     logger_->log_debug("No new messages");
-    return;
+    return MINIFI_STATUS_PROCESSOR_YIELD;
   }
 
   if (!message_demarcator_) {
-    processMessages(session, message_bundles);
-  } else {
-    processMessageBundles(session, message_bundles, *message_demarcator_);
+    return processMessages(session, message_bundles);
   }
+  return processMessageBundles(session, message_bundles, *message_demarcator_);
 }
 
-REGISTER_RESOURCE(ConsumeKafka, Processor);
 }  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/kafka/ConsumeKafka.h b/extensions/kafka/ConsumeKafka.h
index 96dbe58e5f..ded235f09e 100644
--- a/extensions/kafka/ConsumeKafka.h
+++ b/extensions/kafka/ConsumeKafka.h
@@ -28,24 +28,15 @@
 #include "core/PropertyDefinitionBuilder.h"
 #include "minifi-cpp/core/PropertyValidator.h"
 #include "minifi-cpp/core/RelationshipDefinition.h"
-#include "core/logging/LoggerFactory.h"
-#include "io/StreamPipe.h"
 #include "rdkafka.h"
 #include "rdkafka_utils.h"
 #include "utils/ArrayUtils.h"
+#include "api/core/ProcessSession.h"
+#include "api/core/ProcessContext.h"
+#include "minifi-cpp/core/Annotation.h"
 
 namespace org::apache::nifi::minifi::processors::consume_kafka {
 
-class ConsumeKafkaMaxPollTimePropertyValidator final : public minifi::core::PropertyValidator {
- public:
-  constexpr ~ConsumeKafkaMaxPollTimePropertyValidator() override { }  // NOLINT see comment at grandparent
-
-  [[nodiscard]] bool validate(std::string_view input) const override;
-  [[nodiscard]] std::optional<std::string_view> getEquivalentNifiStandardValidatorName() const override { return std::nullopt; }
-};
-
-inline constexpr ConsumeKafkaMaxPollTimePropertyValidator CONSUME_KAFKA_MAX_POLL_TIME_TYPE{};
-
 enum class CommitPolicyEnum { NoCommit, AutoCommit, CommitAfterBatch, CommitFromIncomingFlowFiles };
 enum class OffsetResetPolicyEnum { earliest, latest, none };
@@ -215,7 +206,7 @@ class ConsumeKafka final : public KafkaProcessorBase {
       .withDescription(
           "Specifies the maximum amount of time the consumer can use for polling data from the brokers. "
          "Polling is a blocking operation, so the upper limit of this value is specified in 4 seconds.")
-      .withValidator(consume_kafka::CONSUME_KAFKA_MAX_POLL_TIME_TYPE)
+      .withValidator(core::StandardPropertyValidators::TIME_PERIOD_VALIDATOR)
      .withDefaultValue(DEFAULT_MAX_POLL_TIME)
      .isRequired(true)
      .build();
@@ -284,7 +275,6 @@ class ConsumeKafka final : public KafkaProcessorBase {
   EXTENSIONAPI static constexpr auto KafkaOffsetAttribute = core::OutputAttributeDefinition<>{
       "kafka.offset", {Success}, "The offset of the message (or largest offset of the message bundle)"};
 
-  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
 
   using KafkaProcessorBase::KafkaProcessorBase;
@@ -294,9 +284,8 @@ class ConsumeKafka final : public KafkaProcessorBase {
   ConsumeKafka& operator=(ConsumeKafka&&) = delete;
   ~ConsumeKafka() override = default;
 
-  void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override;
-  void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override;
-  void initialize() override;
+  MinifiStatus onScheduleImpl(api::core::ProcessContext& context) override;
+  MinifiStatus onTriggerImpl(api::core::ProcessContext& context, api::core::ProcessSession& session) override;
 
  private:
   struct KafkaMessageLocation {
@@ -323,21 +312,21 @@ class ConsumeKafka final : public KafkaProcessorBase {
   friend struct ::std::hash<KafkaMessageLocation>;
 
   void createTopicPartitionList();
-  void extendConfigFromDynamicProperties(const core::ProcessContext& context) const;
-  void configureNewConnection(core::ProcessContext& context);
+  void extendConfigFromDynamicProperties(const api::core::ProcessContext& context) const;
+  void configureNewConnection(api::core::ProcessContext& context);
   static std::string extractMessage(const rd_kafka_message_t& rkmessage);
   std::unordered_map<KafkaMessageLocation, MessageBundle> pollKafkaMessages();
   std::string resolve_duplicate_headers(const std::vector<std::string>& matching_headers) const;
   std::vector<std::string> get_matching_headers(const rd_kafka_message_t& message, const std::string& header_name) const;
   std::vector<std::pair<std::string, std::string>> getFlowFilesAttributesFromMessageHeaders(const rd_kafka_message_t& message) const;
-  void addAttributesToSingleMessageFlowFile(core::FlowFile& flow_file, const rd_kafka_message_t& message) const;
-  void addAttributesToMessageBundleFlowFile(core::FlowFile& flow_file, const MessageBundle& message_bundle) const;
-  void processMessages(core::ProcessSession& session, const std::unordered_map<KafkaMessageLocation, MessageBundle>& message_bundles) const;
-  void processMessageBundles(core::ProcessSession& session, const std::unordered_map<KafkaMessageLocation, MessageBundle>& message_bundles,
+  void addAttributesToSingleMessageFlowFile(api::core::ProcessSession& session, api::core::FlowFile& flow_file, const rd_kafka_message_t& message) const;
+  void addAttributesToMessageBundleFlowFile(api::core::ProcessSession& session, api::core::FlowFile& flow_file, const MessageBundle& message_bundle) const;
+  MinifiStatus processMessages(api::core::ProcessSession& session, const std::unordered_map<KafkaMessageLocation, MessageBundle>& message_bundles) const;
+  MinifiStatus processMessageBundles(api::core::ProcessSession& session, const std::unordered_map<KafkaMessageLocation, MessageBundle>& message_bundles,
      std::string_view message_demarcator) const;
   void commitOffsetsFromMessages(const std::unordered_map<KafkaMessageLocation, MessageBundle>& message_bundles) const;
-  void commitOffsetsFromIncomingFlowFiles(core::ProcessSession& session) const;
+  void commitOffsetsFromIncomingFlowFiles(api::core::ProcessSession& session) const;
 
   std::vector<std::string> topic_names_{};
   consume_kafka::TopicNameFormatEnum topic_name_format_ = consume_kafka::TopicNameFormatEnum::Names;
diff --git a/extensions/kafka/ExtensionInitializer.cpp b/extensions/kafka/ExtensionInitializer.cpp
new file mode 100644
index 0000000000..792ab083b5
--- /dev/null
+++ b/extensions/kafka/ExtensionInitializer.cpp
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ConsumeKafka.h"
+#include "PublishKafka.h"
+#include "api/core/Resource.h"
+#include "api/utils/minifi-c-utils.h"
+
+#define MKSOC(x) #x
+#define MAKESTRING(x) MKSOC(x)  // NOLINT(cppcoreguidelines-macro-usage)
+
+namespace minifi = org::apache::nifi::minifi;
+
+CEXTENSIONAPI const uint32_t MinifiApiVersion = MINIFI_API_VERSION;
+
+CEXTENSIONAPI void MinifiInitExtension(MinifiExtensionContext* extension_context) {
+  const MinifiExtensionDefinition extension_definition{.name = minifi::api::utils::minifiStringView(MAKESTRING(EXTENSION_NAME)),
+                                                       .version = minifi::api::utils::minifiStringView(MAKESTRING(EXTENSION_VERSION)),
+                                                       .deinit = nullptr,
+                                                       .user_data = nullptr};
+  auto* extension = MinifiRegisterExtension(extension_context, &extension_definition);
+  minifi::api::core::registerProcessors<minifi::processors::ConsumeKafka, minifi::processors::PublishKafka>(extension);
+}
diff --git a/extensions/kafka/KafkaConnection.cpp b/extensions/kafka/KafkaConnection.cpp
index ac1387fa9e..e7b09f8af6 100644
--- a/extensions/kafka/KafkaConnection.cpp
+++ b/extensions/kafka/KafkaConnection.cpp
@@ -25,8 +25,7 @@
 namespace org::apache::nifi::minifi::processors {
 
 KafkaConnection::KafkaConnection(KafkaConnectionKey key)
-    : logger_(core::logging::LoggerFactory<KafkaConnection>::getLogger()),
-      initialized_(false),
+    : initialized_(false),
       key_(std::move(key)),
       poll_(false) {}
@@ -40,18 +39,13 @@ void KafkaConnection::remove() {
 }
 
 void KafkaConnection::removeConnection() {
-  logger_->log_trace("KafkaConnection::removeConnection START: Client = {} -- Broker = {}", key_.client_id_, key_.brokers_);
   stopPoll();
   if (kafka_connection_) {
     rd_kafka_flush(kafka_connection_, 10 * 1000); /* wait for max 10 seconds */
     rd_kafka_destroy(kafka_connection_);
-    modifyLoggers([&](std::unordered_map<const rd_kafka_t*, std::weak_ptr<core::logging::Logger>>& loggers) {
-      loggers.erase(kafka_connection_);
-    });
     kafka_connection_ = nullptr;
   }
   initialized_ = false;
-  logger_->log_trace("KafkaConnection::removeConnection FINISH: Client = {} -- Broker = {}", key_.client_id_, key_.brokers_);
 }
 
 bool KafkaConnection::initialized() const {
@@ -62,9 +56,6 @@ void KafkaConnection::setConnection(utils::rd_kafka_producer_unique_ptr producer
   removeConnection();
   kafka_connection_ = gsl::owner<rd_kafka_t*>{producer.release()};  // kafka_connection_ takes ownership from producer
   initialized_ = true;
-  modifyLoggers([&](std::unordered_map<const rd_kafka_t*, std::weak_ptr<core::logging::Logger>>& loggers) {
-    loggers[kafka_connection_] = logger_;
-  });
   startPoll();
 }
@@ -89,37 +80,4 @@ void KafkaConnection::putTopic(const std::string& topicName, const std::shared_p
   topics_[topicName] = topic;
 }
 
-void KafkaConnection::logCallback(const rd_kafka_t* rk, const int level, const char* /*fac*/, const char* buf) {
-  std::shared_ptr<core::logging::Logger> logger;
-  try {
-    modifyLoggers([&](const std::unordered_map<const rd_kafka_t*, std::weak_ptr<core::logging::Logger>>& loggers) {
-      logger = loggers.at(rk).lock();
-    });
-  } catch (...) {}
-
-  if (!logger) { return; }
-
-  switch (level) {
-    case 0:  // LOG_EMERG
-    case 1:  // LOG_ALERT
-    case 2:  // LOG_CRIT
-      logger->log_critical("{}", buf);
-      break;
-    case 3:  // LOG_ERR
-      logger->log_error("{}", buf);
-      break;
-    case 4:  // LOG_WARNING
-      logger->log_warn("{}", buf);
-      break;
-    case 5:  // LOG_NOTICE
-    case 6:  // LOG_INFO
-      logger->log_info("{}", buf);
-      break;
-    case 7:  // LOG_DEBUG
-      logger->log_debug("{}", buf);
-      break;
-    default: gsl_FailFast();
-  }
-}
-
 }  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/kafka/KafkaConnection.h b/extensions/kafka/KafkaConnection.h
index 121ab2f3d1..2960f0172f 100644
--- a/extensions/kafka/KafkaConnection.h
+++ b/extensions/kafka/KafkaConnection.h
@@ -24,7 +24,6 @@
 #include <mutex>
 #include <unordered_map>
 
-#include "core/logging/LoggerFactory.h"
 #include "minifi-cpp/core/logging/Logger.h"
 #include "rdkafka.h"
 #include "KafkaTopic.h"
@@ -73,11 +72,7 @@ class KafkaConnection {
 
   void putTopic(const std::string &topicName, const std::shared_ptr<KafkaTopic> &topic);
 
-  static void logCallback(const rd_kafka_t* rk, int level, const char* /*fac*/, const char* buf);
-
  private:
-  std::shared_ptr<core::logging::Logger> logger_;
-
   bool initialized_;
 
   KafkaConnectionKey key_;
@@ -89,17 +84,8 @@ class KafkaConnection {
   std::atomic<bool> poll_;
   std::thread thread_kafka_poll_;
 
-  static void modifyLoggers(const std::function<void(std::unordered_map<const rd_kafka_t*, std::weak_ptr<core::logging::Logger>>&)>& func) {
-    static std::mutex loggers_mutex;
-    static std::unordered_map<const rd_kafka_t*, std::weak_ptr<core::logging::Logger>> loggers;
-
-    std::lock_guard<std::mutex> lock(loggers_mutex);
-    func(loggers);
-  }
-
   void stopPoll() {
     poll_ = false;
-    logger_->log_debug("Stop polling");
     if (thread_kafka_poll_.joinable()) {
       thread_kafka_poll_.join();
     }
@@ -107,7 +93,6 @@ class KafkaConnection {
 
   void startPoll() {
     poll_ = true;
-    logger_->log_debug("Start polling");
     thread_kafka_poll_ = std::thread([this]{
       while (this->poll_) {
         rd_kafka_poll(this->kafka_connection_, 1000);
diff --git a/extensions/kafka/KafkaProcessorBase.cpp b/extensions/kafka/KafkaProcessorBase.cpp
index 250bef64bc..cfd5b0679d 100644
--- a/extensions/kafka/KafkaProcessorBase.cpp
+++ b/extensions/kafka/KafkaProcessorBase.cpp
@@ -16,22 +16,24 @@
  */
 #include "KafkaProcessorBase.h"
 
-#include "minifi-cpp/controllers/SSLContextServiceInterface.h"
+#include "api/utils/ProcessorConfigUtils.h"
 #include "rdkafka_utils.h"
-#include "utils/ProcessorConfigUtils.h"
 
 namespace org::apache::nifi::minifi::processors {
 
-std::optional<utils::net::SslData> KafkaProcessorBase::getSslData(core::ProcessContext& context) const {
-  return utils::net::getSslData(context, SSLContextService, logger_);
+KafkaProcessorBase::KafkaProcessorBase(core::ProcessorMetadata metadata) : ProcessorImpl(std::move(metadata)), kafka_opaque_(*logger_) {
 }
 
-void KafkaProcessorBase::setKafkaAuthenticationParameters(core::ProcessContext& context, gsl::not_null<rd_kafka_conf_t*> config) {
-  security_protocol_ = utils::parseEnumProperty<kafka::SecurityProtocolOption>(context, SecurityProtocol);
+std::optional<api::utils::net::SslData> KafkaProcessorBase::getSslData(api::core::ProcessContext& context) const {
+  return context.getSslData(SSLContextService) | utils::toOptional();
+}
+
+void KafkaProcessorBase::setKafkaAuthenticationParameters(api::core::ProcessContext& context, gsl::not_null<rd_kafka_conf_t*> config) {
+  security_protocol_ = api::utils::parseEnumProperty<kafka::SecurityProtocolOption>(context, SecurityProtocol);
   utils::setKafkaConfigurationField(*config, "security.protocol", std::string{magic_enum::enum_name(security_protocol_)});
   logger_->log_debug("Kafka security.protocol [{}]", magic_enum::enum_name(security_protocol_));
   if (security_protocol_ == kafka::SecurityProtocolOption::ssl || security_protocol_ == kafka::SecurityProtocolOption::sasl_ssl) {
-    if (auto ssl_data = getSslData(context)) {
+    if (auto ssl_data = context.getSslData(SSLContextService) | utils::toOptional()) {
       if (ssl_data->ca_loc.empty() && ssl_data->cert_loc.empty() && ssl_data->key_loc.empty() && ssl_data->key_pw.empty()) {
         logger_->log_warn("Security protocol is set to {}, but no valid security parameters are set in the properties or in the SSL Context Service.",
             magic_enum::enum_name(security_protocol_));
@@ -48,12 +50,12 @@ void KafkaProcessorBase::setKafkaAuthenticationParameters(core::ProcessContext&
     }
   }
 
-  auto sasl_mechanism = utils::parseEnumProperty<kafka::SASLMechanismOption>(context, SASLMechanism);
+  auto sasl_mechanism = api::utils::parseEnumProperty<kafka::SASLMechanismOption>(context, SASLMechanism);
   utils::setKafkaConfigurationField(*config, "sasl.mechanism", std::string{magic_enum::enum_name(sasl_mechanism)});
   logger_->log_debug("Kafka sasl.mechanism [{}]", magic_enum::enum_name(sasl_mechanism));
 
   auto setKafkaConfigIfNotEmpty = [this, &context, config](const core::PropertyReference& property, const std::string& kafka_config_name, bool log_value = true) {
-    const std::string value = context.getProperty(property).value_or("");
+    const std::string value = context.getProperty(property, nullptr).value_or("");
     if (!value.empty()) {
       utils::setKafkaConfigurationField(*config, kafka_config_name, value);
       if (log_value) {
diff --git a/extensions/kafka/KafkaProcessorBase.h b/extensions/kafka/KafkaProcessorBase.h
index 129882ac44..6cdc2feec8 100644
--- a/extensions/kafka/KafkaProcessorBase.h
+++ b/extensions/kafka/KafkaProcessorBase.h
@@ -20,12 +20,13 @@
 #include <optional>
 #include <string>
 
-#include "minifi-cpp/controllers/SSLContextServiceInterface.h"
-#include "core/ProcessorImpl.h"
+#include "api/core/ProcessorImpl.h"
+#include "api/utils/Export.h"
+#include "api/utils/Ssl.h"
 #include "core/PropertyDefinitionBuilder.h"
 #include "rdkafka_utils.h"
 #include "utils/Enum.h"
-#include "utils/net/Ssl.h"
+#include "minifi-c.h"
 
 namespace org::apache::nifi::minifi::processors {
@@ -35,12 +36,12 @@ enum class SecurityProtocolOption { plaintext, ssl, sasl_plaintext, sasl_ssl };
 enum class SASLMechanismOption { GSSAPI, PLAIN };
 }  // namespace kafka
 
-class KafkaProcessorBase : public core::ProcessorImpl {
+class KafkaProcessorBase : public api::core::ProcessorImpl {
  public:
   EXTENSIONAPI static constexpr auto SSLContextService = core::PropertyDefinitionBuilder<>::createProperty("SSL Context Service")
       .withDescription("SSL Context Service Name")
-      .withAllowedTypes<minifi::controllers::SSLContextServiceInterface>()
+      .withAllowedType<"SSLContextService">()
      .build();
 
   EXTENSIONAPI static constexpr auto SecurityProtocol = core::PropertyDefinitionBuilder<magic_enum::enum_count<kafka::SecurityProtocolOption>()>::createProperty(
@@ -88,7 +89,7 @@ class KafkaProcessorBase : public core::ProcessorImpl {
   EXTENSIONAPI static constexpr auto Properties = std::to_array<core::PropertyReference>({SSLContextService, SecurityProtocol, KerberosServiceName, KerberosPrincipal,
      KerberosKeytabPath, SASLMechanism, Username, Password});
 
-  using ProcessorImpl::ProcessorImpl;
+  explicit KafkaProcessorBase(core::ProcessorMetadata metadata);
 
   KafkaProcessorBase(const KafkaProcessorBase&) = delete;
   KafkaProcessorBase(KafkaProcessorBase&&) = delete;
@@ -97,10 +98,11 @@ class KafkaProcessorBase : public core::ProcessorImpl {
   ~KafkaProcessorBase() override = default;
 
  protected:
-  virtual std::optional<utils::net::SslData> getSslData(core::ProcessContext& context) const;
-  void setKafkaAuthenticationParameters(core::ProcessContext& context, gsl::not_null<rd_kafka_conf_t*> config);
+  virtual std::optional<api::utils::net::SslData> getSslData(api::core::ProcessContext& context) const;
+  void setKafkaAuthenticationParameters(api::core::ProcessContext& context, gsl::not_null<rd_kafka_conf_t*> config);
 
   kafka::SecurityProtocolOption security_protocol_{};
+  utils::KafkaOpaque kafka_opaque_;
 };
 
 }  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/kafka/PublishKafka.cpp b/extensions/kafka/PublishKafka.cpp
index 6f04186cd5..60b6cde7d9 100644
--- a/extensions/kafka/PublishKafka.cpp
+++ b/extensions/kafka/PublishKafka.cpp
@@ -21,14 +21,15 @@
 #include <memory>
 #include <string>
 
-#include "minifi-cpp/core/ProcessContext.h"
-#include "core/ProcessSession.h"
-#include "core/Resource.h"
+#include "api/core/ProcessSession.h"
+#include "api/core/Resource.h"
+#include "api/utils/ProcessorConfigUtils.h"
+#include "minifi-cpp/io/InputStream.h"
+#include "minifi-cpp/io/Stream.h"
+#include "minifi-cpp/utils/gsl.h"
 #include "range/v3/algorithm/all_of.hpp"
 #include "rdkafka_utils.h"
-#include "utils/ProcessorConfigUtils.h"
 #include "utils/StringUtils.h"
-#include "minifi-cpp/utils/gsl.h"
 
 namespace org::apache::nifi::minifi::processors {
@@ -158,12 +159,13 @@ class ReadCallback {
     });
   }
 
-  static utils::rd_kafka_headers_unique_ptr make_headers(const core::FlowFile& flow_file,
+  static utils::rd_kafka_headers_unique_ptr make_headers(const api::core::FlowFile& flow_file,
+      const api::core::ProcessSession& session,
       const std::optional<utils::Regex>& attribute_name_regex) {
     utils::rd_kafka_headers_unique_ptr result{rd_kafka_headers_new(8)};
     if (!result) { throw std::bad_alloc{}; }
 
-    for (const auto& [attribute_key, attribute_value]: flow_file.getAttributes()) {
+    for (const auto& [attribute_key, attribute_value]: session.getAttributes(flow_file)) {
       if (attribute_name_regex && utils::regexMatch(attribute_key, *attribute_name_regex)) {
         rd_kafka_header_add(result.get(),
             attribute_key.c_str(),
@@ -228,15 +230,15 @@ class ReadCallback {
  public:
   ReadCallback(const uint64_t max_seg_size, std::string key, rd_kafka_topic_t* const rkt, rd_kafka_t* const rk,
-      const core::FlowFile& flowFile, const std::optional<utils::Regex>& attributeNameRegex,
-      std::shared_ptr<Messages> messages, const size_t flow_file_index, const bool fail_empty_flow_files,
-      std::shared_ptr<core::logging::Logger> logger)
-    : flow_size_(flowFile.getSize()),
+      uint64_t flow_file_size, const api::core::FlowFile& flowFile, const api::core::ProcessSession& session,
+      const std::optional<utils::Regex>& attributeNameRegex, std::shared_ptr<Messages> messages,
+      const size_t flow_file_index, const bool fail_empty_flow_files, std::shared_ptr<core::logging::Logger> logger)
+    : flow_size_(flow_file_size),
       max_seg_size_(max_seg_size == 0 || flow_size_ < max_seg_size ? flow_size_ : max_seg_size),
       key_(std::move(key)),
       rkt_(rkt),
       rk_(rk),
-      hdrs(make_headers(flowFile, attributeNameRegex)),
+      hdrs(make_headers(flowFile, session, attributeNameRegex)),
       messages_(std::move(messages)),
       flow_file_index_(flow_file_index),
       fail_empty_flow_files_(fail_empty_flow_files),
@@ -326,33 +328,28 @@ void messageDeliveryCallback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage
   }
 }  // namespace
 
-void PublishKafka::initialize() {
-  setSupportedProperties(Properties);
-  setSupportedRelationships(Relationships);
-}
-
-void PublishKafka::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
+MinifiStatus PublishKafka::onScheduleImpl(api::core::ProcessContext& context) {
   interrupted_ = false;
 
   // Try to get a KafkaConnection
-  std::string client_id = utils::parseProperty(context, ClientName);
-  std::string brokers = utils::parseProperty(context, SeedBrokers);
+  std::string client_id = api::utils::parseProperty(context, ClientName);
+  std::string brokers = api::utils::parseProperty(context, SeedBrokers);
 
   // Get some properties not (only) used directly to set up librdkafka
 
   // Batch Size
-  batch_size_ = utils::parseU64Property(context, BatchSize);
+  batch_size_ = api::utils::parseU64Property(context, BatchSize);
   logger_->log_debug("PublishKafka: Batch Size [{}]", batch_size_);
 
   // Target Batch Payload Size
-  target_batch_payload_size_ = utils::parseDataSizeProperty(context, TargetBatchPayloadSize);
+  target_batch_payload_size_ = api::utils::parseDataSizeProperty(context, TargetBatchPayloadSize);
   logger_->log_debug("PublishKafka: Target Batch Payload Size [{}]", target_batch_payload_size_);
 
   // Max Flow Segment Size
-  max_flow_seg_size_ = utils::parseDataSizeProperty(context, MaxFlowSegSize);
+  max_flow_seg_size_ = api::utils::parseDataSizeProperty(context, MaxFlowSegSize);
   logger_->log_debug("PublishKafka: Max Flow Segment Size [{}]", max_flow_seg_size_);
 
   // Attributes to Send as Headers
-  attributeNameRegex_ = context.getProperty(AttributeNameRegex)
+  attributeNameRegex_ = context.getProperty(AttributeNameRegex, nullptr)
      | utils::transform([](auto pattern_str) { return utils::Regex{std::move(pattern_str)}; })
      | utils::toOptional();
@@ -362,15 +359,16 @@ void PublishKafka::onSchedule(core::ProcessContext& context, core::ProcessSessio
   conn_ = std::make_unique<KafkaConnection>(key_);
   configureNewConnection(context);
 
-  if (const auto message_key_field = context.getProperty(MessageKeyField); message_key_field && !message_key_field->empty()) {
+  if (const auto message_key_field = context.getProperty(MessageKeyField, nullptr); message_key_field && !message_key_field->empty()) {
     logger_->log_error("The {} property is set. This property is DEPRECATED and has no effect; please use Kafka Key instead.", MessageKeyField.name);
   }
 
   logger_->log_debug("Successfully configured PublishKafka");
+  return MINIFI_STATUS_SUCCESS;
 }
 
-void PublishKafka::notifyStop() {
-  logger_->log_debug("notifyStop called");
+void PublishKafka::onUnSchedule() {
+  logger_->log_debug("onUnSchedule called");
   interrupted_ = true;
   {
     // Normally when we need both connection_mutex_ and messages_mutex_, we need to take connection_mutex_ first to avoid a
@@ -386,18 +384,20 @@ void PublishKafka::notifyStop() {
   conn_.reset();
 }
 
-bool PublishKafka::configureNewConnection(core::ProcessContext& context) {
+void PublishKafka::configureNewConnection(api::core::ProcessContext& context) {
   std::array<char, 512U> err_chars{};
   rd_kafka_conf_res_t result = RD_KAFKA_CONF_OK;
 
   constexpr std::string_view PREFIX_ERROR_MSG = "PublishKafka: configure error result: ";
 
-  utils::rd_kafka_conf_unique_ptr conf_{rd_kafka_conf_new()};
-  if (conf_ == nullptr) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Failed to create rd_kafka_conf_t object"); }
+  utils::rd_kafka_conf_unique_ptr conf{rd_kafka_conf_new()};
+  if (conf == nullptr) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Failed to create rd_kafka_conf_t object"); }
+
+  rd_kafka_conf_set_opaque(conf.get(), &kafka_opaque_);
 
   const auto* const key = conn_->getKey();
 
   if (key->brokers_.empty()) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, "There are no brokers"); }
-  result = rd_kafka_conf_set(conf_.get(), "bootstrap.servers", key->brokers_.c_str(), err_chars.data(), err_chars.size());
+  result = rd_kafka_conf_set(conf.get(), "bootstrap.servers", key->brokers_.c_str(), err_chars.data(), err_chars.size());
   logger_->log_debug("PublishKafka: bootstrap.servers [{}]", key->brokers_);
   if (result != RD_KAFKA_CONF_OK) {
     auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
@@ -405,15 +405,15 @@ bool PublishKafka::configureNewConnection(core::ProcessContext& context) {
   }
 
   if (key->client_id_.empty()) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Client id is empty"); }
-  result = rd_kafka_conf_set(conf_.get(), "client.id", key->client_id_.c_str(), err_chars.data(), err_chars.size());
+  result = rd_kafka_conf_set(conf.get(), "client.id", key->client_id_.c_str(), err_chars.data(), err_chars.size());
   logger_->log_debug("PublishKafka: client.id [{}]", key->client_id_);
   if (result != RD_KAFKA_CONF_OK) {
     auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
     throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
   }
 
-  if (const auto debug_context = context.getProperty(DebugContexts)) {
-    result = rd_kafka_conf_set(conf_.get(), "debug", debug_context->c_str(), err_chars.data(), err_chars.size());
+  if (const auto debug_context = context.getProperty(DebugContexts, nullptr)) {
+    result = rd_kafka_conf_set(conf.get(), "debug", debug_context->c_str(), err_chars.data(), err_chars.size());
     logger_->log_debug("PublishKafka: debug [{}]", *debug_context);
     if (result != RD_KAFKA_CONF_OK) {
       auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
@@ -421,8 +421,8 @@ bool PublishKafka::configureNewConnection(core::ProcessContext& context) {
     }
   }
 
-  if (const auto max_message_size = context.getProperty(MaxMessageSize); max_message_size && !max_message_size->empty()) {
-    result = rd_kafka_conf_set(conf_.get(), "message.max.bytes", max_message_size->c_str(), err_chars.data(), err_chars.size());
+  if (const auto max_message_size = context.getProperty(MaxMessageSize, nullptr); max_message_size && !max_message_size->empty()) {
+    result = rd_kafka_conf_set(conf.get(), "message.max.bytes", max_message_size->c_str(), err_chars.data(), err_chars.size());
     logger_->log_debug("PublishKafka: message.max.bytes [{}]", *max_message_size);
     if (result != RD_KAFKA_CONF_OK) {
       auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
@@ -430,13 +430,13 @@ bool PublishKafka::configureNewConnection(core::ProcessContext& context) {
     }
   }
 
-  if (const auto queue_buffer_max_message = utils::parseOptionalU64Property(context, QueueBufferMaxMessage)) {
+  if (const auto queue_buffer_max_message = api::utils::parseOptionalU64Property(context, QueueBufferMaxMessage)) {
     if (*queue_buffer_max_message < batch_size_) {
       throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid configuration: Batch Size cannot be larger than Queue Max Message");
     }
     const auto value = std::to_string(*queue_buffer_max_message);
-    result = rd_kafka_conf_set(conf_.get(), "queue.buffering.max.messages", value.c_str(), err_chars.data(), err_chars.size());
+    result = rd_kafka_conf_set(conf.get(), "queue.buffering.max.messages", value.c_str(), err_chars.data(), err_chars.size());
     logger_->log_debug("PublishKafka: queue.buffering.max.messages [{}]", value);
     if (result != RD_KAFKA_CONF_OK) {
       auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
@@ -444,10 +444,10 @@ bool PublishKafka::configureNewConnection(core::ProcessContext& context) {
     }
   }
 
-  if (const auto queue_buffer_max_size = utils::parseOptionalDataSizeProperty(context, QueueBufferMaxSize)) {
+  if (const auto queue_buffer_max_size = api::utils::parseOptionalDataSizeProperty(context, QueueBufferMaxSize)) {
     auto valInt = *queue_buffer_max_size / 1024;
     auto valueConf = std::to_string(valInt);
-    result = rd_kafka_conf_set(conf_.get(), "queue.buffering.max.kbytes", valueConf.c_str(), err_chars.data(), err_chars.size());
+    result = rd_kafka_conf_set(conf.get(), "queue.buffering.max.kbytes", valueConf.c_str(), err_chars.data(), err_chars.size());
     logger_->log_debug("PublishKafka: queue.buffering.max.kbytes [{}]", valueConf);
     if (result != RD_KAFKA_CONF_OK) {
       auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
@@ -455,9 +455,9 @@ bool PublishKafka::configureNewConnection(core::ProcessContext& context) {
     }
   }
 
-  if (const auto queue_buffer_max_time = utils::parseOptionalDurationProperty(context, QueueBufferMaxTime)) {
+  if (const auto queue_buffer_max_time = api::utils::parseOptionalDurationProperty(context, QueueBufferMaxTime)) {
     auto valueConf = std::to_string(queue_buffer_max_time->count());
-    result = rd_kafka_conf_set(conf_.get(), "queue.buffering.max.ms", valueConf.c_str(), err_chars.data(), err_chars.size());
+    result = rd_kafka_conf_set(conf.get(), "queue.buffering.max.ms", valueConf.c_str(), err_chars.data(), err_chars.size());
     logger_->log_debug("PublishKafka: queue.buffering.max.ms [{}]", valueConf);
     if (result != RD_KAFKA_CONF_OK) {
       auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
@@ -465,9 +465,9 @@ bool PublishKafka::configureNewConnection(core::ProcessContext& context) {
     }
   }
 
-  if (const auto batch_size = utils::parseOptionalU64Property(context, BatchSize)) {
+  if (const auto batch_size = api::utils::parseOptionalU64Property(context, BatchSize)) {
     auto value = std::to_string(*batch_size);
-    result = rd_kafka_conf_set(conf_.get(), "batch.num.messages", value.c_str(), err_chars.data(), err_chars.size());
+    result = rd_kafka_conf_set(conf.get(), "batch.num.messages", value.c_str(), err_chars.data(), err_chars.size());
     logger_->log_debug("PublishKafka: batch.num.messages [{}]", value);
     if (result != RD_KAFKA_CONF_OK) {
       auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
@@ -475,8 +475,8 @@ bool PublishKafka::configureNewConnection(core::ProcessContext& context) {
     }
   }
 
-  if (const auto compress_codec = context.getProperty(CompressCodec); compress_codec && !compress_codec->empty() && *compress_codec != "none") {
-    result = rd_kafka_conf_set(conf_.get(), "compression.codec", compress_codec->c_str(), err_chars.data(), err_chars.size());
+  if (const auto compress_codec = context.getProperty(CompressCodec, nullptr); compress_codec && !compress_codec->empty() && *compress_codec != "none") {
+    result = rd_kafka_conf_set(conf.get(), "compression.codec", compress_codec->c_str(), err_chars.data(), err_chars.size());
     logger_->log_debug("PublishKafka: compression.codec [{}]", *compress_codec);
     if (result != RD_KAFKA_CONF_OK) {
       auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
@@ -484,49 +484,47 @@ bool PublishKafka::configureNewConnection(core::ProcessContext& context) {
     }
   }
 
-  setKafkaAuthenticationParameters(context, gsl::make_not_null(conf_.get()));
+  setKafkaAuthenticationParameters(context, gsl::make_not_null(conf.get()));
 
   // Add all the dynamic properties as librdkafka configurations
-  const auto& dynamic_prop_keys = context.getDynamicPropertyKeys();
-  logger_->log_info("PublishKafka registering {} librdkafka dynamic properties", dynamic_prop_keys.size());
-
-  for (const auto& prop_key : dynamic_prop_keys) {
-    if (const auto dynamic_property_value = context.getDynamicProperty(prop_key, nullptr); dynamic_property_value && !dynamic_property_value->empty()) {
-      logger_->log_debug("PublishKafka: DynamicProperty: [{}] -> [{}]", prop_key, *dynamic_property_value);
-      result = rd_kafka_conf_set(conf_.get(), prop_key.c_str(), dynamic_property_value->c_str(), err_chars.data(), err_chars.size());
-      if (result != RD_KAFKA_CONF_OK) {
-        auto error_msg = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
-        throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
-      }
-    } else {
+
+  std::optional<std::string> error_during_on_dyn_props = std::nullopt;
+  for (auto [prop_key, prop_value] : context.getDynamicProperties(nullptr)) {
+    if (prop_value.empty()) {
       logger_->log_warn(
-          "PublishKafka Dynamic Property '{}' is empty and therefore will not "
-          "be configured",
+          "PublishKafka Dynamic Property '{}' is empty and therefore will not be configured",
          prop_key);
+      continue;
+    }
+    logger_->log_debug("PublishKafka: DynamicProperty: [{}] -> [{}]", prop_key, prop_value);
+    result = rd_kafka_conf_set(conf.get(), prop_key.data(), prop_value.data(), err_chars.data(), err_chars.size());
+    if (result != RD_KAFKA_CONF_OK) {
+      error_during_on_dyn_props = utils::string::join_pack(PREFIX_ERROR_MSG, err_chars.data());
    }
  }
+  if (error_during_on_dyn_props) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, *error_during_on_dyn_props);
+  }
 
   // Set the delivery callback
-  rd_kafka_conf_set_dr_msg_cb(conf_.get(), &messageDeliveryCallback);
+  rd_kafka_conf_set_dr_msg_cb(conf.get(), &messageDeliveryCallback);
 
   // Set the logger callback
-  rd_kafka_conf_set_log_cb(conf_.get(), &KafkaConnection::logCallback);
+  rd_kafka_conf_set_log_cb(conf.get(), &utils::KafkaOpaque::logCallback);
 
   // The producer takes ownership of the configuration, we must not free it
   utils::rd_kafka_producer_unique_ptr producer{
-      rd_kafka_new(RD_KAFKA_PRODUCER, conf_.release(), err_chars.data(), err_chars.size())};
err_chars.size())}; + rd_kafka_new(RD_KAFKA_PRODUCER, conf.release(), err_chars.data(), err_chars.size())}; if (producer == nullptr) { auto error_msg = utils::string::join_pack("Failed to create Kafka producer ", err_chars.data()); throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg); } conn_->setConnection(std::move(producer)); - - return true; } -bool PublishKafka::createNewTopic(core::ProcessContext& context, const std::string& topic_name, - const std::shared_ptr& flow_file) const { +bool PublishKafka::createNewTopic(const api::core::ProcessContext& context, const std::string& topic_name, + const api::core::FlowFile& flow_file) const { utils::rd_kafka_topic_conf_unique_ptr topic_conf_{rd_kafka_topic_conf_new()}; if (topic_conf_ == nullptr) { logger_->log_error("Failed to create rd_kafka_topic_conf_t object"); @@ -536,7 +534,7 @@ bool PublishKafka::createNewTopic(core::ProcessContext& context, const std::stri rd_kafka_conf_res_t result = RD_KAFKA_CONF_OK; std::array err_chars{}; - if (auto delivery_guarantee = context.getProperty(DeliveryGuarantee, flow_file.get())) { + if (auto delivery_guarantee = api::utils::parseOptionalProperty(context, DeliveryGuarantee, &flow_file)) { /* * Because of a previous error in this processor, the default value of this property was "DELIVERY_ONE_NODE". * As this is not a valid value for "request.required.acks", the following rd_kafka_topic_conf_set call failed, @@ -560,7 +558,7 @@ bool PublishKafka::createNewTopic(core::ProcessContext& context, const std::stri } } - if (const auto request_timeout = utils::parseOptionalDurationProperty(context, RequestTimeOut)) { + if (const auto request_timeout = api::utils::parseOptionalDurationProperty(context, RequestTimeOut)) { auto valueConf = std::to_string(request_timeout->count()); result = rd_kafka_topic_conf_set(topic_conf_.get(), "request.timeout.ms", valueConf.c_str(), err_chars.data(), err_chars.size()); logger_->log_debug("PublishKafka: request.timeout.ms [{}]", valueConf); @@ -570,7 +568,7 @@ bool PublishKafka::createNewTopic(core::ProcessContext& context, const std::stri } } - if (const auto message_timeout = utils::parseOptionalDurationProperty(context, MessageTimeOut)) { + if (const auto message_timeout = api::utils::parseOptionalDurationProperty(context, MessageTimeOut)) { auto valueConf = std::to_string(message_timeout->count()); result = rd_kafka_topic_conf_set(topic_conf_.get(), "message.timeout.ms", valueConf.c_str(), err_chars.data(), err_chars.size()); logger_->log_debug("PublishKafka: message.timeout.ms [{}]", valueConf); @@ -596,25 +594,24 @@ bool PublishKafka::createNewTopic(core::ProcessContext& context, const std::stri return true; } -std::optional PublishKafka::getSslData(core::ProcessContext& context) const { +std::optional PublishKafka::getSslData(api::core::ProcessContext& context) const { if (auto result = KafkaProcessorBase::getSslData(context); result) { return result; } - utils::net::SslData ssl_data; - if (auto security_ca = context.getProperty(SecurityCA)) { ssl_data.ca_loc = *security_ca; } - if (auto security_cert = context.getProperty(SecurityCert)) { ssl_data.cert_loc = *security_cert; } - if (auto security_private_key = context.getProperty(SecurityPrivateKey)) { ssl_data.key_loc = *security_private_key; } - if (auto security_private_key_pass = context.getProperty(SecurityPrivateKeyPassWord)) { + api::utils::net::SslData ssl_data; + if (auto security_ca = context.getProperty(SecurityCA, nullptr)) { ssl_data.ca_loc = *security_ca; } + if (auto security_cert = 
context.getProperty(SecurityCert, nullptr)) { ssl_data.cert_loc = *security_cert; } + if (auto security_private_key = context.getProperty(SecurityPrivateKey, nullptr)) { ssl_data.key_loc = *security_private_key; } + if (auto security_private_key_pass = context.getProperty(SecurityPrivateKeyPassWord, nullptr)) { ssl_data.key_pw = *security_private_key_pass; } return ssl_data; } -void PublishKafka::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { +MinifiStatus PublishKafka::onTriggerImpl(api::core::ProcessContext& context, api::core::ProcessSession& session) { // Check whether we have been interrupted if (interrupted_) { logger_->log_info("The processor has been interrupted, not running onTrigger"); - context.yield(); - return; + return MINIFI_STATUS_PROCESSOR_YIELD; } std::lock_guard lock_connection(connection_mutex_); @@ -622,17 +619,16 @@ void PublishKafka::onTrigger(core::ProcessContext& context, core::ProcessSession // Collect FlowFiles to process uint64_t actual_bytes = 0U; - std::vector> flowFiles; + std::vector flowFiles; for (uint64_t i = 0; i < batch_size_; i++) { - std::shared_ptr flowFile = session.get(); + auto flowFile = session.get(); if (flowFile == nullptr) { break; } - actual_bytes += flowFile->getSize(); + actual_bytes += session.getFlowFileSize(flowFile); flowFiles.emplace_back(std::move(flowFile)); if (target_batch_payload_size_ != 0U && actual_bytes >= target_batch_payload_size_) { break; } } if (flowFiles.empty()) { - context.yield(); - return; + return MINIFI_STATUS_PROCESSOR_YIELD; } logger_->log_debug("Processing {} flow files with a total size of {} B", flowFiles.size(), actual_bytes); @@ -649,15 +645,17 @@ void PublishKafka::onTrigger(core::ProcessContext& context, core::ProcessSession }); // Process FlowFiles - for (auto& flowFile: flowFiles) { + for (auto& flow_file: flowFiles) { const size_t flow_file_index = messages->addFlowFile(); // Get Topic (FlowFile-dependent EL property) - const auto topic = context.getProperty(Topic, flowFile.get()); + const auto topic = api::utils::parseOptionalProperty(context, Topic, &flow_file); + const std::string flow_file_id = session.getFlowFileId(flow_file); + const uint64_t flow_file_size = session.getFlowFileSize(flow_file); if (topic) { - logger_->log_debug("PublishKafka: topic for flow file {} is '{}'", flowFile->getUUIDStr(), *topic); + logger_->log_debug("PublishKafka: topic for flow file {} is '{}'", flow_file_id, *topic); } else { - logger_->log_error("Flow file {} does not have a valid Topic", flowFile->getUUIDStr()); + logger_->log_error("Flow file {} does not have a valid Topic", flow_file_id); messages->modifyResult(flow_file_index, [](FlowFileResult& flow_file_result) { flow_file_result.flow_file_error = true; }); continue; @@ -665,7 +663,7 @@ void PublishKafka::onTrigger(core::ProcessContext& context, core::ProcessSession // Add topic to the connection if needed if (!conn_->hasTopic(*topic)) { - if (!createNewTopic(context, *topic, flowFile)) { + if (!createNewTopic(context, *topic, flow_file)) { logger_->log_error("Failed to add topic {}", *topic); messages->modifyResult(flow_file_index, [](FlowFileResult& flow_file_result) { flow_file_result.flow_file_error = true; @@ -674,7 +672,7 @@ void PublishKafka::onTrigger(core::ProcessContext& context, core::ProcessSession } } - std::string kafkaKey = context.getProperty(KafkaKey, flowFile.get()).value_or(flowFile->getUUIDStr()); + std::string kafkaKey = api::utils::parseOptionalProperty(context, KafkaKey, &flow_file).value_or(flow_file_id); 
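
Note how every FlowFile-scoped lookup above now goes through api::utils::parseOptionalProperty instead of context.getProperty(..., flowFile.get()). The pattern it enables is worth spelling out (a sketch only, assuming the helper returns std::optional<std::string>, which is what its uses in this patch suggest):

    // Sketch: evaluate an expression-language property against one flow file,
    // falling back to a per-flow-file default when the property is unset.
    // Here the Kafka message key defaults to the flow file's id, exactly as in
    // the hunk above; parseOptionalProperty yields std::nullopt when the
    // property has no value, so value_or supplies the fallback.
    const std::string kafka_key =
        api::utils::parseOptionalProperty(context, KafkaKey, &flow_file)
            .value_or(session.getFlowFileId(flow_file));
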
logger_->log_debug("PublishKafka: Message Key [{}]", kafkaKey); @@ -687,30 +685,33 @@ void PublishKafka::onTrigger(core::ProcessContext& context, core::ProcessSession continue; } - bool failEmptyFlowFiles = utils::parseBoolProperty(context, FailEmptyFlowFiles); + bool failEmptyFlowFiles = api::utils::parseBoolProperty(context, FailEmptyFlowFiles); + ReadCallback callback(max_flow_seg_size_, kafkaKey, thisTopic->getTopic(), conn_->getConnection(), - *flowFile, + flow_file_size, + flow_file, + session, attributeNameRegex_, messages, flow_file_index, failEmptyFlowFiles, logger_); - session.read(flowFile, std::ref(callback)); + session.read(flow_file, std::ref(callback)); if (!callback.called_) { // workaround: call callback since ProcessSession doesn't do so for empty flow files without resource claims callback(nullptr); } - if (flowFile->getSize() == 0 && failEmptyFlowFiles) { + if (flow_file_size == 0 && failEmptyFlowFiles) { logger_->log_debug( "Deprecated behavior, use connections to drop empty flow files! " "Failing empty flow file with uuid: {}", - flowFile->getUUIDStr()); + flow_file_id); messages->modifyResult(flow_file_index, [](FlowFileResult& flow_file_result) { flow_file_result.flow_file_error = true; }); } @@ -748,33 +749,32 @@ void PublishKafka::onTrigger(core::ProcessContext& context, core::ProcessSession logger_->log_error( "Waiting for delivery confirmation was interrupted for flow " "file {} segment {}", - flowFiles[index]->getUUIDStr(), + session.getFlowFileId(flowFiles[index]), segment_num); break; case MessageStatus::Error: success = false; logger_->log_error("Failed to deliver flow file {} segment {}, error: {}", - flowFiles[index]->getUUIDStr(), + session.getFlowFileId(flowFiles[index]), segment_num, rd_kafka_err2str(message.err_code)); break; case MessageStatus::Success: logger_->log_debug("Successfully delivered flow file {} segment {}", - flowFiles[index]->getUUIDStr(), + session.getFlowFileId(flowFiles[index]), segment_num); break; } } } if (success) { - session.transfer(flowFiles[index], Success); + session.transfer(std::move(flowFiles[index]), Success); } else { session.penalize(flowFiles[index]); - session.transfer(flowFiles[index], Failure); + session.transfer(std::move(flowFiles[index]), Failure); } }); + return MINIFI_STATUS_SUCCESS; } -REGISTER_RESOURCE(PublishKafka, Processor); - } // namespace org::apache::nifi::minifi::processors diff --git a/extensions/kafka/PublishKafka.h b/extensions/kafka/PublishKafka.h index 61cd6ccd3b..682e4218ee 100644 --- a/extensions/kafka/PublishKafka.h +++ b/extensions/kafka/PublishKafka.h @@ -29,19 +29,16 @@ #include "KafkaConnection.h" #include "KafkaProcessorBase.h" -#include "minifi-cpp/controllers/SSLContextServiceInterface.h" -#include "core/Core.h" -#include "minifi-cpp/core/FlowFile.h" -#include "core/ProcessSession.h" +#include "api/core/ProcessSession.h" #include "minifi-cpp/core/PropertyDefinition.h" #include "core/PropertyDefinitionBuilder.h" #include "minifi-cpp/core/RelationshipDefinition.h" #include "minifi-cpp/core/logging/Logger.h" -#include "core/logging/LoggerFactory.h" #include "minifi-cpp/core/PropertyValidator.h" #include "rdkafka.h" #include "utils/ArrayUtils.h" #include "utils/RegexUtils.h" +#include "minifi-cpp/core/Annotation.h" namespace org::apache::nifi::minifi::processors { @@ -245,8 +242,6 @@ class PublishKafka final : public KafkaProcessorBase { EXTENSIONAPI static constexpr auto InputRequirement = core::annotation::Input::INPUT_REQUIRED; EXTENSIONAPI static constexpr bool IsSingleThreaded = 
false; - ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS - using KafkaProcessorBase::KafkaProcessorBase; PublishKafka(const PublishKafka&) = delete; @@ -255,18 +250,17 @@ class PublishKafka final : public KafkaProcessorBase { PublishKafka& operator=(PublishKafka&&) = delete; ~PublishKafka() override = default; - void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override; - void initialize() override; - void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& sessionFactory) override; - void notifyStop() override; + MinifiStatus onTriggerImpl(api::core::ProcessContext& context, api::core::ProcessSession& session) override; + MinifiStatus onScheduleImpl(api::core::ProcessContext& context) override; + void onUnSchedule() override; class Messages; protected: - bool configureNewConnection(core::ProcessContext& context); + void configureNewConnection(api::core::ProcessContext& context); bool createNewTopic( - core::ProcessContext& context, const std::string& topic_name, const std::shared_ptr& flow_file) const; - std::optional getSslData(core::ProcessContext& context) const override; + const api::core::ProcessContext& context, const std::string& topic_name, const api::core::FlowFile& flow_file) const; + std::optional getSslData(api::core::ProcessContext& context) const override; private: KafkaConnectionKey key_; diff --git a/extensions/kafka/rdkafka_utils.cpp b/extensions/kafka/rdkafka_utils.cpp index d3c38dab1a..ab1ed7d978 100644 --- a/extensions/kafka/rdkafka_utils.cpp +++ b/extensions/kafka/rdkafka_utils.cpp @@ -20,6 +20,7 @@ #include #include "minifi-cpp/Exception.h" +#include "minifi-cpp/core/logging/Logger.h" #include "utils/StringUtils.h" namespace org::apache::nifi::minifi::utils { @@ -34,12 +35,70 @@ void setKafkaConfigurationField(rd_kafka_conf_t& configuration, const std::strin } } -void print_topics_list(core::logging::Logger& logger, const rd_kafka_topic_partition_list_t& kf_topic_partition_list) { +void KafkaOpaque::print_topics_list(const rd_kafka_topic_partition_list_t& kf_topic_partition_list) const { + if (!logger_.should_log(core::logging::debug)) + return; for (int i = 0; i < kf_topic_partition_list.cnt; ++i) { - logger.log_debug("kf_topic_partition_list: topic: {}, partition: {}, offset: {}.", kf_topic_partition_list.elems[i].topic, + logger_.log_debug("kf_topic_partition_list: topic: {}, partition: {}, offset: {}.", kf_topic_partition_list.elems[i].topic, kf_topic_partition_list.elems[i].partition, kf_topic_partition_list.elems[i].offset); } } +void KafkaOpaque::logCallback(const rd_kafka_t* rk, const int level, const char*, const char* buf) { + const auto kafka_opaque = static_cast(rd_kafka_opaque(rk)); + + if (!kafka_opaque) { return; } + + switch (level) { + case 0: // LOG_EMERG + case 1: // LOG_ALERT + case 2: // LOG_CRIT + kafka_opaque->logger_.log_critical("{}", buf); + break; + case 3: // LOG_ERR + kafka_opaque->logger_.log_error("{}", buf); + break; + case 4: // LOG_WARNING + kafka_opaque->logger_.log_warn("{}", buf); + break; + case 5: // LOG_NOTICE + case 6: // LOG_INFO + kafka_opaque->logger_.log_info("{}", buf); + break; + case 7: // LOG_DEBUG + kafka_opaque->logger_.log_debug("{}", buf); + break; + default: gsl_FailFast(); + } +} +void KafkaOpaque::rebalance_cb(rd_kafka_t* rk, const rd_kafka_resp_err_t trigger, rd_kafka_topic_partition_list_t* partitions, void* opaque_ptr) { + const auto* kafka_opaque = static_cast(opaque_ptr); + if (!kafka_opaque) { + return; + } + kafka_opaque->logger_.log_debug("Rebalance 
triggered."); + rd_kafka_resp_err_t assign_error = RD_KAFKA_RESP_ERR_NO_ERROR; + switch (trigger) { + case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: + kafka_opaque->logger_.log_debug("assigned:"); + kafka_opaque->print_topics_list(*partitions); + assign_error = rd_kafka_assign(rk, partitions); + break; + + case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: + kafka_opaque->logger_.log_debug("revoked:"); + rd_kafka_commit(rk, partitions, /* async = */ 0); // Sync commit, maybe unnecessary + kafka_opaque->print_topics_list(*partitions); + + assign_error = rd_kafka_assign(rk, nullptr); + break; + + default: + kafka_opaque->logger_.log_debug("failed: {}", rd_kafka_err2str(trigger)); + assign_error = rd_kafka_assign(rk, nullptr); + break; + } + kafka_opaque->logger_.log_debug("assign failure: {}", rd_kafka_err2str(assign_error)); +} std::string get_human_readable_kafka_message_timestamp(const rd_kafka_message_t& rkmessage) { rd_kafka_timestamp_type_t tstype{}; diff --git a/extensions/kafka/rdkafka_utils.h b/extensions/kafka/rdkafka_utils.h index b09e51a8a2..2bd3711a0b 100644 --- a/extensions/kafka/rdkafka_utils.h +++ b/extensions/kafka/rdkafka_utils.h @@ -17,16 +17,14 @@ #pragma once -#include #include #include #include -#include -#include +#include -#include "core/logging/LoggerFactory.h" #include "rdkafka.h" -#include "utils/net/Ssl.h" +#include "api/utils/Ssl.h" +#include "minifi-cpp/core/logging/Logger.h" namespace org::apache::nifi::minifi::utils { @@ -87,18 +85,28 @@ template void kafka_headers_for_each(const rd_kafka_headers_t& headers, T key_value_handle) { const char* key = nullptr; // Null terminated, not to be freed const void* value = nullptr; - std::size_t size; + std::size_t size = 0; for (std::size_t i = 0; RD_KAFKA_RESP_ERR_NO_ERROR == rd_kafka_header_get_all(&headers, i, &key, &value, &size); ++i) { key_value_handle(std::string(key), std::span(static_cast(value), size)); } } void setKafkaConfigurationField(rd_kafka_conf_t& configuration, const std::string& field_name, const std::string& value); -void print_topics_list(core::logging::Logger& logger, const rd_kafka_topic_partition_list_t& kf_topic_partition_list); void print_kafka_message(const rd_kafka_message_t& rkmessage, core::logging::Logger& logger); std::string get_encoded_string(const std::string& input, KafkaEncoding encoding); std::optional get_encoded_message_key(const rd_kafka_message_t& message, KafkaEncoding encoding); +class KafkaOpaque { + public: + explicit KafkaOpaque(core::logging::Logger& logger) : logger_(logger) {} + + void print_topics_list(const rd_kafka_topic_partition_list_t& kf_topic_partition_list) const; + static void logCallback(const rd_kafka_t* rk, int level, const char* /*fac*/, const char* buf); + static void rebalance_cb(rd_kafka_t* rk, rd_kafka_resp_err_t trigger, rd_kafka_topic_partition_list_t* partitions, void* opaque_ptr); + + private: + core::logging::Logger& logger_; +}; } // namespace org::apache::nifi::minifi::utils namespace magic_enum::customize { diff --git a/extensions/kafka/tests/CMakeLists.txt b/extensions/kafka/tests/CMakeLists.txt index 98341cf8fc..c1e2858735 100644 --- a/extensions/kafka/tests/CMakeLists.txt +++ b/extensions/kafka/tests/CMakeLists.txt @@ -24,10 +24,10 @@ FOREACH (testfile ${KAFKA_INTEGRATION_TESTS}) get_filename_component(testfilename "${testfile}" NAME_WE) add_minifi_executable("${testfilename}" "${testfile}") target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/kafka") - createTests("${testfilename}") + 
+  rd_kafka_resp_err_t assign_error = RD_KAFKA_RESP_ERR_NO_ERROR;
+  switch (trigger) {
+    case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
+      kafka_opaque->logger_.log_debug("assigned:");
+      kafka_opaque->print_topics_list(*partitions);
+      assign_error = rd_kafka_assign(rk, partitions);
+      break;
+
+    case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
+      kafka_opaque->logger_.log_debug("revoked:");
+      rd_kafka_commit(rk, partitions, /* async = */ 0);  // Sync commit, maybe unnecessary
+      kafka_opaque->print_topics_list(*partitions);
+
+      assign_error = rd_kafka_assign(rk, nullptr);
+      break;
+
+    default:
+      kafka_opaque->logger_.log_debug("failed: {}", rd_kafka_err2str(trigger));
+      assign_error = rd_kafka_assign(rk, nullptr);
+      break;
+  }
+  kafka_opaque->logger_.log_debug("assign failure: {}", rd_kafka_err2str(assign_error));
+}
 
 std::string get_human_readable_kafka_message_timestamp(const rd_kafka_message_t& rkmessage) {
   rd_kafka_timestamp_type_t tstype{};
diff --git a/extensions/kafka/rdkafka_utils.h b/extensions/kafka/rdkafka_utils.h
index b09e51a8a2..2bd3711a0b 100644
--- a/extensions/kafka/rdkafka_utils.h
+++ b/extensions/kafka/rdkafka_utils.h
@@ -17,16 +17,14 @@
 
 #pragma once
 
-#include
 #include
 #include
 #include
-#include
-#include
+#include
 
-#include "core/logging/LoggerFactory.h"
 #include "rdkafka.h"
-#include "utils/net/Ssl.h"
+#include "api/utils/Ssl.h"
+#include "minifi-cpp/core/logging/Logger.h"
 
 namespace org::apache::nifi::minifi::utils {
@@ -87,18 +85,28 @@ template
 void kafka_headers_for_each(const rd_kafka_headers_t& headers, T key_value_handle) {
   const char* key = nullptr;  // Null terminated, not to be freed
   const void* value = nullptr;
-  std::size_t size;
+  std::size_t size = 0;
   for (std::size_t i = 0; RD_KAFKA_RESP_ERR_NO_ERROR == rd_kafka_header_get_all(&headers, i, &key, &value, &size); ++i) {
     key_value_handle(std::string(key), std::span(static_cast(value), size));
   }
 }
 
 void setKafkaConfigurationField(rd_kafka_conf_t& configuration, const std::string& field_name, const std::string& value);
-void print_topics_list(core::logging::Logger& logger, const rd_kafka_topic_partition_list_t& kf_topic_partition_list);
 void print_kafka_message(const rd_kafka_message_t& rkmessage, core::logging::Logger& logger);
 std::string get_encoded_string(const std::string& input, KafkaEncoding encoding);
 std::optional<std::string> get_encoded_message_key(const rd_kafka_message_t& message, KafkaEncoding encoding);
 
+class KafkaOpaque {
+ public:
+  explicit KafkaOpaque(core::logging::Logger& logger) : logger_(logger) {}
+
+  void print_topics_list(const rd_kafka_topic_partition_list_t& kf_topic_partition_list) const;
+  static void logCallback(const rd_kafka_t* rk, int level, const char* /*fac*/, const char* buf);
+  static void rebalance_cb(rd_kafka_t* rk, rd_kafka_resp_err_t trigger, rd_kafka_topic_partition_list_t* partitions, void* opaque_ptr);
+
+ private:
+  core::logging::Logger& logger_;
+};
 }  // namespace org::apache::nifi::minifi::utils
 
 namespace magic_enum::customize {
diff --git a/extensions/kafka/tests/CMakeLists.txt b/extensions/kafka/tests/CMakeLists.txt
index 98341cf8fc..c1e2858735 100644
--- a/extensions/kafka/tests/CMakeLists.txt
+++ b/extensions/kafka/tests/CMakeLists.txt
@@ -24,10 +24,10 @@ FOREACH (testfile ${KAFKA_INTEGRATION_TESTS})
     get_filename_component(testfilename "${testfile}" NAME_WE)
     add_minifi_executable("${testfilename}" "${testfile}")
     target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/kafka")
-    createTests("${testfilename}")
+    set_target_properties(${testfilename} PROPERTIES RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/bin")
     target_link_libraries(${testfilename} Catch2WithMain)
-    target_link_libraries(${testfilename} minifi-rdkafka-extensions)
-    target_link_libraries(${testfilename} minifi-standard-processors)
+    target_link_libraries(${testfilename} minifi-kafka)
+    target_link_libraries(${testfilename} mock-minifi)
     MATH(EXPR EXTENSIONS_TEST_COUNT "${EXTENSIONS_TEST_COUNT}+1")
     add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${TEST_DIR})
     set_tests_properties("${testfilename}" PROPERTIES LABELS "librdkafka;memchecked")
diff --git a/extensions/kafka/tests/PublishKafkaTests.cpp b/extensions/kafka/tests/PublishKafkaTests.cpp
index aeb286531b..445c78d394 100644
--- a/extensions/kafka/tests/PublishKafkaTests.cpp
+++ b/extensions/kafka/tests/PublishKafkaTests.cpp
@@ -15,57 +15,44 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include "unit/TestBase.h"
-#include "unit/TestUtils.h"
-#include "unit/Catch.h"
+#include "MockLogger.h"
+#include "MockProcessContext.h"
+#include "MockProcessSession.h"
+#include "MockUtils.h"
 #include "PublishKafka.h"
-#include "unit/SingleProcessorTestController.h"
+#include "catch2/catch_test_macros.hpp"
+#include "catch2/matchers/catch_matchers.hpp"
 
-namespace org::apache::nifi::minifi::test {
+namespace org::apache::nifi::minifi::processors::test {
 
-TEST_CASE("Scheduling should fail when batch size is larger than the max queue message count", "[testPublishKafka]") {
-  LogTestController::getInstance().setTrace();
-  LogTestController::getInstance().setTrace();
-  SingleProcessorTestController test_controller(minifi::test::utils::make_processor<processors::PublishKafka>("PublishKafka"));
-  const auto publish_kafka = test_controller.getProcessor();
-  REQUIRE(publish_kafka->setProperty(processors::PublishKafka::ClientName.name, "test_client"));
-  REQUIRE(publish_kafka->setProperty(processors::PublishKafka::SeedBrokers.name, "test_seedbroker"));
-  REQUIRE(publish_kafka->setProperty(processors::PublishKafka::QueueBufferMaxMessage.name, "1000"));
-  REQUIRE(publish_kafka->setProperty(processors::PublishKafka::BatchSize.name, "1500"));
-  REQUIRE_THROWS_WITH(test_controller.trigger(""), "Process Schedule Operation: Invalid configuration: Batch Size cannot be larger than Queue Max Message");
+TEST_CASE("Batch Size cannot be larger than Queue Max Message", "[PublishKafka]") {
+  auto publish_kafka = PublishKafka(mock::getMockMetadata());
+  auto context = mock::MockProcessContext{};
+  context.properties_.emplace(PublishKafka::ClientName.name, "test_client");
+  context.properties_.emplace(PublishKafka::SeedBrokers.name, "test_seedbroker");
+  context.properties_.emplace(PublishKafka::QueueBufferMaxMessage.name, "1000");
+  context.properties_.emplace(PublishKafka::BatchSize.name, "1500");
+
+  REQUIRE_THROWS_WITH(publish_kafka.onScheduleImpl(context),
+      "Process Schedule Operation: Invalid configuration: Batch Size cannot be larger than Queue Max Message");
 }
 
-TEST_CASE("Compress Codec property") {
-  using processors::PublishKafka;
-  SingleProcessorTestController test_controller(minifi::test::utils::make_processor<PublishKafka>("PublishKafka"));
-  REQUIRE(test_controller.getProcessor()->setProperty(PublishKafka::ClientName.name, "test_client"));
-  REQUIRE(test_controller.getProcessor()->setProperty(PublishKafka::SeedBrokers.name, "test_seedbroker"));
-  REQUIRE(test_controller.getProcessor()->setProperty(PublishKafka::Topic.name, "test_topic"));
-  REQUIRE(test_controller.getProcessor()->setProperty(PublishKafka::MessageTimeOut.name, "10ms"));
+TEST_CASE("Trigger without valid broker", "[PublishKafka]") {
+  const auto logger = std::make_shared<mock::MockLogger>();
+  auto publish_kafka = PublishKafka(mock::metadataWithLogger(logger));
+  auto context = mock::MockProcessContext{};
+  context.properties_.emplace(PublishKafka::ClientName.name, "test_client");
+  context.properties_.emplace(PublishKafka::SeedBrokers.name, "test_seedbroker");
+  context.properties_.emplace(PublishKafka::Topic.name, "test_topic");
+  context.properties_.emplace(PublishKafka::MessageTimeOut.name, "10 ms");
+
+  REQUIRE_NOTHROW(publish_kafka.onScheduleImpl(context));
+  auto session = mock::MockProcessSession{};
+  session.addInputFlowFile(mock::MockFlowFileData("test_input_flow_file"));
+  REQUIRE_NOTHROW(publish_kafka.onTriggerImpl(context, session));
 
-  SECTION("none") {
-    REQUIRE(test_controller.getProcessor()->setProperty(PublishKafka::CompressCodec.name, "none"));
-    REQUIRE_NOTHROW(test_controller.trigger("input"));
-  }
-  SECTION("gzip") {
-    REQUIRE(test_controller.getProcessor()->setProperty(PublishKafka::CompressCodec.name, "gzip"));
-    REQUIRE_NOTHROW(test_controller.trigger("input"));
-  }
-  SECTION("snappy") {
-    REQUIRE(test_controller.getProcessor()->setProperty(PublishKafka::CompressCodec.name, "snappy"));
-    REQUIRE_NOTHROW(test_controller.trigger("input"));
-  }
-  SECTION("lz4") {
-    REQUIRE(test_controller.getProcessor()->setProperty(PublishKafka::CompressCodec.name, "lz4"));
-    REQUIRE_NOTHROW(test_controller.trigger("input"));
-  }
-  SECTION("zstd") {
-    REQUIRE(test_controller.getProcessor()->setProperty(PublishKafka::CompressCodec.name, "zstd"));
-    REQUIRE_NOTHROW(test_controller.trigger("input"));
-  }
-  SECTION("foo") {
-    REQUIRE_FALSE(test_controller.getProcessor()->setProperty(PublishKafka::CompressCodec.name, "foo"));
-  }
+  CHECK_FALSE(logger->logs_.empty());
 }
 
-}  // namespace org::apache::nifi::minifi::test
+}  // namespace org::apache::nifi::minifi::processors::test
diff --git a/extensions/llamacpp/CMakeLists.txt b/extensions/llamacpp/CMakeLists.txt
index 421143f692..9caae4033f 100644
--- a/extensions/llamacpp/CMakeLists.txt
+++ b/extensions/llamacpp/CMakeLists.txt
@@ -25,7 +25,7 @@ include(LlamaCpp)
 
 include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
 
-file(GLOB SOURCES "processors/*.cpp")
+file(GLOB SOURCES "processors/*.cpp" "ExtensionInitializer.cpp")
 
 add_minifi_library(minifi-llamacpp SHARED ${SOURCES})
 target_include_directories(minifi-llamacpp PUBLIC "${CMAKE_SOURCE_DIR}/extensions/llamacpp")
diff --git a/extensions/llamacpp/processors/ExtensionInitializer.cpp b/extensions/llamacpp/ExtensionInitializer.cpp
similarity index 85%
rename from extensions/llamacpp/processors/ExtensionInitializer.cpp
rename to extensions/llamacpp/ExtensionInitializer.cpp
index 8e06bf0cc3..09b142ff21 100644
--- a/extensions/llamacpp/processors/ExtensionInitializer.cpp
+++ b/extensions/llamacpp/ExtensionInitializer.cpp
@@ -15,9 +15,9 @@
  * limitations under the License.
  */
 
-#include "RunLlamaCppInference.h"
 #include "api/core/Resource.h"
 #include "api/utils/minifi-c-utils.h"
+#include "processors/RunLlamaCppInference.h"
 
 #define MKSOC(x) #x
 #define MAKESTRING(x) MKSOC(x)  // NOLINT(cppcoreguidelines-macro-usage)
@@ -34,7 +34,5 @@ CEXTENSIONAPI void MinifiInitExtension(MinifiExtensionContext* extension_context
       .user_data = nullptr
   };
   auto* extension = MinifiRegisterExtension(extension_context, &extension_definition);
-  minifi::api::core::useProcessorClassDefinition([&] (const MinifiProcessorClassDefinition& definition) {
-    MinifiRegisterProcessor(extension, &definition);
-  });
+  minifi::api::core::registerProcessors(extension);
 }
diff --git a/extensions/llamacpp/processors/RunLlamaCppInference.cpp b/extensions/llamacpp/processors/RunLlamaCppInference.cpp
index a8d587521c..49c712f7ef 100644
--- a/extensions/llamacpp/processors/RunLlamaCppInference.cpp
+++ b/extensions/llamacpp/processors/RunLlamaCppInference.cpp
@@ -31,7 +31,7 @@ namespace org::apache::nifi::minifi::extensions::llamacpp::processors {
 MinifiStatus RunLlamaCppInference::onScheduleImpl(api::core::ProcessContext& context) {
   model_path_.clear();
   model_path_ = api::utils::parseProperty(context, ModelPath);
-  system_prompt_ = context.getProperty(SystemPrompt, nullptr).value_or("");
+  system_prompt_ = api::utils::parseOptionalProperty(context, SystemPrompt).value_or("");
 
   LlamaSamplerParams llama_sampler_params;
   llama_sampler_params.temperature = api::utils::parseOptionalFloatProperty(context, Temperature);
diff --git a/libminifi/src/minifi-c.cpp b/libminifi/src/minifi-c.cpp
index 1c5d98fcf0..1045c5a5b4 100644
--- a/libminifi/src/minifi-c.cpp
+++ b/libminifi/src/minifi-c.cpp
@@ -613,19 +613,21 @@ void MinifiProcessContextGetDynamicProperties(MinifiProcessContext* context, Min
   }
 }
 
-MinifiStatus MinifiProcessContextGetSslData(MinifiProcessContext* process_context, MinifiStringView controller_service_name,
+MinifiStatus MinifiProcessContextGetSslDataFromProperty(MinifiProcessContext* process_context, MinifiStringView property_name,
     void (*cb)(void* user_ctx, const MinifiSslData* ssl_data), void* user_ctx) {
   gsl_Assert(process_context != MINIFI_NULL);
   try {
     const auto context = reinterpret_cast(process_context);
-    const auto name_str = std::string{toStringView(controller_service_name)};
-    const auto service_shared_ptr = context->getControllerService(name_str, context->getProcessorInfo().getUUID());
+    const auto property_name_str = std::string{toStringView(property_name)};
+    const auto name_str = context->getProperty(property_name_str, nullptr);
+    if (!name_str) { return MINIFI_STATUS_PROPERTY_NOT_SET; }
+    const auto service_shared_ptr = context->getControllerService(*name_str, context->getProcessorInfo().getUUID());
     if (!service_shared_ptr) { return MINIFI_STATUS_VALIDATION_FAILED; }
     if (const auto ssl_context_service = dynamic_cast<controllers::SSLContextServiceInterface*>(service_shared_ptr.get())) {
-      const std::string ca_cert_file = ssl_context_service->getCACertificate().string();
-      const std::string passphrase = ssl_context_service->getPassphrase();
-      const std::string cert_file = ssl_context_service->getCertificateFile().string();
-      const std::string private_key_file = ssl_context_service->getPrivateKeyFile().string();
+      const std::string ca_cert_file = ssl_context_service->getCACertificate().string();
+      const std::string passphrase = ssl_context_service->getPassphrase();
+      const std::string cert_file = ssl_context_service->getCertificateFile().string();
+      const std::string private_key_file = ssl_context_service->getPrivateKeyFile().string();
 
       MinifiSslData ssl_data{
           .ca_certificate_file = minifiStringView(ca_cert_file),
diff --git a/minifi-api/include/minifi-c/minifi-c.h b/minifi-api/include/minifi-c/minifi-c.h
index e846efbdcf..b885cd614f 100644
--- a/minifi-api/include/minifi-c/minifi-c.h
+++ b/minifi-api/include/minifi-c/minifi-c.h
@@ -280,7 +280,7 @@ typedef struct MinifiSslData {
   MinifiStringView passphrase;
 } MinifiSslData;
 
-MinifiStatus MinifiProcessContextGetSslData(MinifiProcessContext* process_context, MinifiStringView controller_service_name,
+MinifiStatus MinifiProcessContextGetSslDataFromProperty(MinifiProcessContext* process_context, MinifiStringView property_name,
     void (*cb)(void* user_ctx, const MinifiSslData* ssl_data), void* user_ctx);
 
 #ifdef __cplusplus
diff --git a/minifi-api/minifi-c-api.def b/minifi-api/minifi-c-api.def
index 0fcaf2ad97..0baa3063c7 100644
--- a/minifi-api/minifi-c-api.def
+++ b/minifi-api/minifi-c-api.def
@@ -29,4 +29,4 @@ EXPORTS
     MinifiProcessSessionGetFlowFileSize
     MinifiProcessSessionGetFlowFileId
     MinifiProcessContextGetDynamicProperties
-    MinifiProcessContextGetSslData
+    MinifiProcessContextGetSslDataFromProperty
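
Taken together, the renamed MinifiProcessContextGetSslDataFromProperty export and the getSslData(const minifi::core::PropertyReference&) overload it backs shift SSL resolution from the controller service's resolved *name* to the *property* that references the service: the framework reads the property, looks up the controller service it names, and hands back the extracted SSL data. On the C++ extension side, consumption looks roughly like this (a sketch under assumptions: SSLContextServiceProperty and configureTls are placeholder names, and the error type of the returned std::expected is not spelled out in this patch):

    // Sketch only. The processor passes its own property definition to the
    // context; on success the expected holds the SSL data extracted from the
    // referenced controller service, with the fields PublishKafka::getSslData
    // uses above (ca_loc, cert_loc, key_loc, key_pw).
    if (const auto ssl_data = context.getSslData(SSLContextServiceProperty)) {
      configureTls(ssl_data->ca_loc, ssl_data->cert_loc, ssl_data->key_loc, ssl_data->key_pw);
    } else {
      // Mirrors the C-side statuses: the property may be unset, or the named
      // controller service may be missing or of the wrong type.
      logger_->log_error("No usable SSL context service is referenced by the property");
    }
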