MINIFICPP-2769 Move Kafka Extension to stable C API#2175
Conversation
| namespace org::apache::nifi::minifi::processors { | ||
| // The upper limit for Max Poll Time is 4 seconds. This is because Watchdog would potentially start | ||
| // reporting issues with the processor health otherwise | ||
| bool consume_kafka::ConsumeKafkaMaxPollTimePropertyValidator::validate(const std::string_view input) const { |
There was a problem hiding this comment.
Removed this custom validator (because C Api only supports standard validators ATM), and added a custom validation into onSchedule
| } | ||
|
|
||
| namespace { | ||
| void rebalance_cb(rd_kafka_t* rk, rd_kafka_resp_err_t trigger, rd_kafka_topic_partition_list_t* partitions, void* /*opaque*/) { |
There was a problem hiding this comment.
I've moved this rebalence_cb into a helper class thats available through an opaque handle (needed for logging which is not available anymore through statis functions)
https://github.com/apache/nifi-minifi-cpp/pull/2175/changes#diff-93908b3b2326640601fa590e78c35c046654a1ac90c6bf70dcd7dd1bcf0d2c5cR73
| setKafkaConfigurationField(*conf_, "isolation.level", utils::parseBoolProperty(context, HonorTransactions) ? "read_committed" : "read_uncommitted"); | ||
| setKafkaConfigurationField(*conf_, "group.id", utils::parseProperty(context, GroupID)); | ||
| setKafkaConfigurationField(*conf_, "client.id", this->getUUIDStr()); | ||
| // setKafkaConfigurationField(*conf_, "client.id", client_id); No need to set id since its autogenerated, and we don't access it anywhere from minifi |
There was a problem hiding this comment.
We dont have access to the processor's UUID, but this seems unnessary anyways, any other ideas are welcome :D
| topics_[topicName] = topic; | ||
| } | ||
|
|
||
| void KafkaConnection::logCallback(const rd_kafka_t* rk, const int level, const char* /*fac*/, const char* buf) { |
There was a problem hiding this comment.
This is also been moved into KafkaOpaque class https://github.com/apache/nifi-minifi-cpp/pull/2175/changes#diff-93908b3b2326640601fa590e78c35c046654a1ac90c6bf70dcd7dd1bcf0d2c5cR46
There was a problem hiding this comment.
Pull request overview
This PR migrates the Kafka extension (and some related extension infrastructure) toward the stable C API / cpp-extension-lib surface by refactoring processor implementations to api::core::* interfaces, switching extension registration to the C-API initializer path, and adjusting supporting utilities/tests accordingly.
Changes:
- Rename/process-context SSL data C API entry point and wire it through the CFFI
ProcessContextwrapper. - Refactor Kafka processors (PublishKafka/ConsumeKafka) to
api::core::{ProcessContext,ProcessSession}and introduceKafkaOpaquefor librdkafka callbacks/logging. - Update extension initializers/CMake targets (Kafka + LlamaCpp) and modernize Kafka tests to use the mock C-API framework.
Reviewed changes
Copilot reviewed 25 out of 25 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| minifi-api/minifi-c-api.def | Updates exported C API symbol list for SSL data retrieval. |
| minifi-api/include/minifi-c/minifi-c.h | Renames the SSL data retrieval C API declaration. |
| libminifi/src/minifi-c.cpp | Implements renamed SSL data retrieval function by resolving controller service name from a property. |
| extensions/llamacpp/processors/RunLlamaCppInference.cpp | Switches to parseOptionalProperty helper for optional property parsing. |
| extensions/llamacpp/ExtensionInitializer.cpp | Updates include path and registers processors via C API extension initializer helper. |
| extensions/llamacpp/CMakeLists.txt | Ensures ExtensionInitializer.cpp is part of the build sources. |
| extensions/kafka/tests/PublishKafkaTests.cpp | Replaces legacy test harness with mock C-API process context/session based tests. |
| extensions/kafka/tests/CMakeLists.txt | Updates test linking/runtime output settings for the new test approach. |
| extensions/kafka/rdkafka_utils.h | Refactors utilities and introduces KafkaOpaque for callbacks; updates includes/types. |
| extensions/kafka/rdkafka_utils.cpp | Implements KafkaOpaque log + rebalance callbacks and topic list debug printing. |
| extensions/kafka/PublishKafka.h | Migrates processor interface to api::core and updates SSL/type handling. |
| extensions/kafka/PublishKafka.cpp | Migrates scheduling/triggering/session interactions to api::core and updates config callbacks. |
| extensions/kafka/KafkaProcessorBase.h | Migrates base class to api::core::ProcessorImpl and updates SSL property allowed type. |
| extensions/kafka/KafkaProcessorBase.cpp | Reworks SSL retrieval and authentication parameter setup for the new API surface. |
| extensions/kafka/KafkaConnection.h | Removes legacy logger plumbing and static log callback approach. |
| extensions/kafka/KafkaConnection.cpp | Removes legacy logger mapping/callback implementation. |
| extensions/kafka/ExtensionInitializer.cpp | Adds C API extension initializer registering Kafka processors. |
| extensions/kafka/ConsumeKafka.h | Migrates processor interface to api::core, updates validator usage, refactors helper signatures. |
| extensions/kafka/ConsumeKafka.cpp | Migrates scheduling/triggering/session interactions to api::core and updates callbacks/config handling. |
| extensions/kafka/CMakeLists.txt | Renames the extension library target and registers it as a C-API extension. |
| extension-framework/cpp-extension-lib/src/core/ProcessContext.cpp | Routes SSL retrieval through the renamed C API function. |
| extension-framework/cpp-extension-lib/mocklib/src/mock-minifi-c.cpp | Updates mocked symbol name and wraps exports in extern "C". |
| extension-framework/cpp-extension-lib/mocklib/CMakeLists.txt | Renames mock library target to mock-minifi. |
| core-framework/common/include/core/PropertyDefinitionBuilder.h | Adds string-literal-based withAllowedType() support for stable allowed-type storage. |
| CMakeLists.txt | Updates default-enabled extension name from the old Kafka target to the new one. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
e45e239 to
2878fa2
Compare
2878fa2 to
527195a
Compare
| const std::string ca_cert_file = ssl_context_service->getCACertificate().string(); | ||
| const std::string passphrase = ssl_context_service->getPassphrase(); | ||
| const std::string cert_file = ssl_context_service->getCertificateFile().string(); | ||
| const std::string private_key_file = ssl_context_service->getPrivateKeyFile().string(); |
| duplicate_header_handling_ = utils::parseEnumProperty<consume_kafka::MessageHeaderPolicyEnum>(context, DuplicateHeaderHandling); | ||
| max_poll_time_milliseconds_ = utils::parseDurationProperty(context, MaxPollTime); | ||
| if (max_poll_time_milliseconds_ > 4s) { | ||
| throw Exception(PROCESS_SCHEDULE_EXCEPTION, "MaxPollTime is too large (it should be less than 4s)"); |
| #include "rdkafka_utils.h" | ||
|
|
||
| #include <array> | ||
|
|
||
| #include "minifi-cpp/Exception.h" | ||
| #include "minifi-cpp/core/logging/Logger.h" | ||
| #include "utils/StringUtils.h" | ||
|
|
| if (!result) { throw std::bad_alloc{}; } | ||
|
|
||
| for (const auto& [attribute_key, attribute_value]: flow_file.getAttributes()) { | ||
| for (const auto& [attribute_key, attribute_value]: session.getAttributes(flow_file)) { |
Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:
For all changes:
Is there a JIRA ticket associated with this PR? Is it referenced
in the commit message?
Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
Has your PR been rebased against the latest commit within the target branch (typically main)?
Is your initial contribution a single, squashed commit?
For code changes:
For documentation related changes:
Note:
Please ensure that once the PR is submitted, you check GitHub Actions CI results for build issues and submit an update to your PR as soon as possible.