From b8f3bd51cbbff919ec1c13bc98d2ba823a9c8d08 Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Mon, 14 Jul 2025 16:50:48 +0200 Subject: [PATCH] MINIFICPP-2590 Add Sparkplug B support for PublishMQTT processor --- CONTROLLERS.md | 15 + README.md | 2 +- docker/test/integration/features/mqtt.feature | 36 ++ .../test/integration/features/steps/steps.py | 24 +- .../minifi/controllers/SparkplugBWriter.py | 23 ++ .../mqtt/controllers/SparkplugBWriter.cpp | 354 ++++++++++++++++++ .../mqtt/controllers/SparkplugBWriter.h | 58 +++ extensions/mqtt/tests/PublishMQTTTests.cpp | 38 ++ .../mqtt/tests/SparkplugBWriterTests.cpp | 250 +++++++++++++ minifi-api/include/minifi-cpp/core/Record.h | 4 + 10 files changed, 800 insertions(+), 4 deletions(-) create mode 100644 docker/test/integration/minifi/controllers/SparkplugBWriter.py create mode 100644 extensions/mqtt/controllers/SparkplugBWriter.cpp create mode 100644 extensions/mqtt/controllers/SparkplugBWriter.h create mode 100644 extensions/mqtt/tests/SparkplugBWriterTests.cpp diff --git a/CONTROLLERS.md b/CONTROLLERS.md index 0b8c0676db..f7a0a69adf 100644 --- a/CONTROLLERS.md +++ b/CONTROLLERS.md @@ -30,6 +30,7 @@ limitations under the License. - [RocksDbStateStorage](#RocksDbStateStorage) - [SmbConnectionControllerService](#SmbConnectionControllerService) - [SparkplugBReader](#SparkplugBReader) +- [SparkplugBWriter](#SparkplugBWriter) - [SSLContextService](#SSLContextService) - [UpdatePolicyControllerService](#UpdatePolicyControllerService) - [VolatileMapStateStorage](#VolatileMapStateStorage) @@ -295,6 +296,20 @@ In the list below, the names of required properties appear in bold. Any other pr |------|---------------|------------------|-------------| +## SparkplugBWriter + +### Description + +Serializes recordset to Sparkplug B messages and writes them into a FlowFile. This writer is typically used with MQTT processors like PublishMQTT. + +### Properties + +In the list below, the names of required properties appear in bold. 
Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language. + +| Name | Default Value | Allowable Values | Description | +|------|---------------|------------------|-------------| + + ## SSLContextService ### Description diff --git a/README.md b/README.md index 6940651da6..8f4bcdb2a1 100644 --- a/README.md +++ b/README.md @@ -85,7 +85,7 @@ The next table outlines CMAKE flags that correspond with MiNiFi extensions. Exte | Kubernetes (Linux) | [KubernetesControllerService](CONTROLLERS.md#kubernetescontrollerservice) | -DENABLE_KUBERNETES=ON | | LlamaCpp | [RunLlamaCppInference](PROCESSORS.md#runllamacppinference) | -DENABLE_LLAMACPP=ON | | Lua Scripting | [ExecuteScript](PROCESSORS.md#executescript) | -DENABLE_LUA_SCRIPTING=ON | -| MQTT | [ConsumeMQTT](PROCESSORS.md#consumemqtt)
[PublishMQTT](PROCESSORS.md#publishmqtt)
[SparkplugBReader](PROCESSORS.md#sparkplugbreader) | -DENABLE_MQTT=ON | +| MQTT | [ConsumeMQTT](PROCESSORS.md#consumemqtt)
[PublishMQTT](PROCESSORS.md#publishmqtt)
[SparkplugBReader](CONTROLLERS.md#sparkplugbreader)
[SparkplugBWriter](CONTROLLERS.md#sparkplugbwriter) | -DENABLE_MQTT=ON | | OPC | [FetchOPCProcessor](PROCESSORS.md#fetchopcprocessor)
[PutOPCProcessor](PROCESSORS.md#putopcprocessor) | -DENABLE_OPC=ON | | OpenCV | [CaptureRTSPFrame](PROCESSORS.md#capturertspframe)
[MotionDetector](PROCESSORS.md#motiondetector) | -DENABLE_OPENCV=ON | | PDH (Windows) | [PerformanceDataMonitor](PROCESSORS.md#performancedatamonitor) | -DENABLE_PDH=ON | diff --git a/docker/test/integration/features/mqtt.feature b/docker/test/integration/features/mqtt.feature index 2cacd5b8df..37b94f0b97 100644 --- a/docker/test/integration/features/mqtt.feature +++ b/docker/test/integration/features/mqtt.feature @@ -579,3 +579,39 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT And a flowfile with the JSON content '[{"_isDuplicate": false, "_qos": 0, "_topicSegments": ["spBv1.0", "TestGroup", "DDATA", "TestNode", "TestDevice"], "_topic": "spBv1.0/TestGroup/DDATA/TestNode/TestDevice", "_isRetained": false, "body": "test-body", "uuid": "test-uuid", "seq": 12345, "metrics": [{"int_value": 123, "timestamp": 45345346346, "name": "TestMetric"}], "timestamp": 987654321}]' is placed in the monitored directory in less than 60 seconds And the Minifi logs contain the following message: "key:record.count value:1" in less than 60 seconds And the Minifi logs contain the following message: "key:mqtt.broker value:mqtt-broker-" in less than 1 seconds + + Scenario: A MiNiFi instance publishes then consumes Sparkplug message through MQTT broker + Given an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT + + And a SparkplugBReader controller service is set up in the "consumer-client" flow + And a JsonRecordSetWriter controller service is set up with "Array" output grouping in the "consumer-client" flow + And a SparkplugBWriter controller service is set up in the "publisher-client" flow + And a JsonTreeReader controller service is set up in the "publisher-client" flow + + And a file with the content '{"timestamp":987654321,"metrics":[{"int_value":123,"timestamp":45345346346,"name":"TestMetric"}],"seq":12345,"body":"test-body","uuid":"test-uuid"}' is present in '/tmp/input' + + And a GetFile processor with the "Input Directory" property 
set to "/tmp/input" in the "publisher-client" flow + And a PublishMQTT processor in the "publisher-client" flow + And the "Topic" property of the PublishMQTT processor is set to "spBv1.0/TestGroup/DDATA/TestNode/TestDevice" + And the "MQTT Version" property of the PublishMQTT processor is set to "3.1.1" + And the "Record Reader" property of the PublishMQTT processor is set to "JsonTreeReader" + And the "Record Writer" property of the PublishMQTT processor is set to "SparkplugBWriter" + + And the "success" relationship of the GetFile processor is connected to the PublishMQTT + + And a ConsumeMQTT processor in the "consumer-client" flow + And the "Topic" property of the ConsumeMQTT processor is set to "spBv1.0/TestGroup/DDATA/TestNode/TestDevice" + And the "MQTT Version" property of the ConsumeMQTT processor is set to "3.1.1" + And the "Record Reader" property of the ConsumeMQTT processor is set to "SparkplugBReader" + And the "Record Writer" property of the ConsumeMQTT processor is set to "JsonRecordSetWriter" + And a PutFile processor with the "Directory" property set to "/tmp/output" in the "consumer-client" flow + And a LogAttribute processor with the "Log Payload" property set to "true" in the "consumer-client" flow + And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile + And the "success" relationship of the PutFile processor is connected to the LogAttribute + + When all instances start up + + Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client" + And a flowfile with the JSON content '[{"_isDuplicate": false, "_qos": 0, "_topicSegments": ["spBv1.0", "TestGroup", "DDATA", "TestNode", "TestDevice"], "_topic": "spBv1.0/TestGroup/DDATA/TestNode/TestDevice", "_isRetained": false, "body": "test-body", "uuid": "test-uuid", "seq": 12345, "metrics": [{"int_value": 123, "timestamp": 45345346346, "name": "TestMetric"}], "timestamp": 987654321}]' is placed in the monitored directory in less than 60 
seconds + And the Minifi logs contain the following message: "key:record.count value:1" in less than 60 seconds + And the Minifi logs contain the following message: "key:mqtt.broker value:mqtt-broker-" in less than 1 seconds diff --git a/docker/test/integration/features/steps/steps.py b/docker/test/integration/features/steps/steps.py index 64ae0ba8b3..7d94cd91d2 100644 --- a/docker/test/integration/features/steps/steps.py +++ b/docker/test/integration/features/steps/steps.py @@ -30,6 +30,7 @@ from minifi.controllers.CouchbaseClusterService import CouchbaseClusterService from minifi.controllers.XMLReader import XMLReader from minifi.controllers.SparkplugBReader import SparkplugBReader +from minifi.controllers.SparkplugBWriter import SparkplugBWriter from behave import given, then, when from behave.model_describe import ModelDescriptor @@ -507,13 +508,30 @@ def step_impl(context): context.test.start('mqtt-broker') -@given("a SparkplugBReader controller service is set up") -def step_impl(context): +@given("a SparkplugBReader controller service is set up in the \"{minifi_container_name}\" flow") +def step_impl(context, minifi_container_name: str): sparkplug_record_set_reader = SparkplugBReader("SparkplugBReader") - container = context.test.acquire_container(context=context, name="minifi-cpp-flow") + container = context.test.acquire_container(context=context, name=minifi_container_name) container.add_controller(sparkplug_record_set_reader) +@given("a SparkplugBWriter controller service is set up in the \"{minifi_container_name}\" flow") +def step_impl(context, minifi_container_name: str): + sparkplug_record_set_writer = SparkplugBWriter("SparkplugBWriter") + container = context.test.acquire_container(context=context, name=minifi_container_name) + container.add_controller(sparkplug_record_set_writer) + + +@given("a SparkplugBReader controller service is set up") +def step_impl(context): + context.execute_steps("given a SparkplugBReader controller service is set up in the 
\"minifi-cpp-flow\" flow") + + +@given("a SparkplugBWriter controller service is set up") +def step_impl(context): + context.execute_steps("given a SparkplugBWriter controller service is set up in the \"minifi-cpp-flow\" flow") + + @when("a test Sparkplug payload is published to the topic \"{topic}\"") def step_impl(context, topic): context.test.publish_test_sparkplug_payload(topic) diff --git a/docker/test/integration/minifi/controllers/SparkplugBWriter.py b/docker/test/integration/minifi/controllers/SparkplugBWriter.py new file mode 100644 index 0000000000..d9d5721e34 --- /dev/null +++ b/docker/test/integration/minifi/controllers/SparkplugBWriter.py @@ -0,0 +1,23 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +from ..core.ControllerService import ControllerService + + +class SparkplugBWriter(ControllerService): + def __init__(self, name=None): + super(SparkplugBWriter, self).__init__(name=name) + self.service_class = 'SparkplugBWriter' diff --git a/extensions/mqtt/controllers/SparkplugBWriter.cpp b/extensions/mqtt/controllers/SparkplugBWriter.cpp new file mode 100644 index 0000000000..cdf87d4164 --- /dev/null +++ b/extensions/mqtt/controllers/SparkplugBWriter.cpp @@ -0,0 +1,354 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "SparkplugBWriter.h" + +#include +#include +#include + +#include "sparkplug_b.pb.h" +#include "minifi-cpp/core/Record.h" +#include "core/Resource.h" +#include "utils/expected.h" + +namespace org::apache::nifi::minifi::controllers { + +namespace { + +template +T getIntegral(const std::variant& variant) { + if (std::holds_alternative(variant)) { + return gsl::narrow(std::get(variant)); + } else if (std::holds_alternative(variant)) { + return gsl::narrow(std::get(variant)); + } else { + throw std::invalid_argument("Invalid type for integral conversion"); + } +} + +void writeProperties(const core::RecordObject& properties, org::eclipse::tahu::protobuf::Payload_PropertySet* properties_proto); + +template +void writeMetric(const core::RecordObject& metric, T* metric_proto); + +void writePropertyValue(const core::RecordObject& value, org::eclipse::tahu::protobuf::Payload_PropertyValue* value_proto) { + if (value.contains("type")) { + value_proto->set_type(getIntegral(value.at("type").value_)); + } + + if (value.contains("is_null")) { + value_proto->set_is_null(std::get(value.at("is_null").value_)); + } + + if (value.contains("int_value")) { + value_proto->set_int_value(getIntegral(value.at("int_value").value_)); + } else if (value.contains("long_value")) { + value_proto->set_long_value(getIntegral(value.at("long_value").value_)); + } else if (value.contains("float_value")) { + value_proto->set_float_value(gsl::narrow(std::get(value.at("float_value").value_))); + } else if (value.contains("double_value")) { + value_proto->set_double_value(std::get(value.at("double_value").value_)); + } else if (value.contains("boolean_value")) { + value_proto->set_boolean_value(std::get(value.at("boolean_value").value_)); + } else if (value.contains("string_value")) { + value_proto->set_string_value(std::get(value.at("string_value").value_)); + } else if (value.contains("propertyset_value")) { + const auto& propertyset = std::get(value.at("propertyset_value").value_); + 
auto* propertyset_proto = value_proto->mutable_propertyset_value(); + writeProperties(propertyset, propertyset_proto); + } else if (value.contains("propertysets_value")) { + const auto& propertysets = std::get(value.at("propertysets_value").value_); + auto* propertysets_proto = value_proto->mutable_propertysets_value(); + for (const auto& propertyset : propertysets) { + auto* propertyset_proto = propertysets_proto->add_propertyset(); + writeProperties(std::get(propertyset.value_), propertyset_proto); + } + } +} + +void writeProperties(const core::RecordObject& properties, org::eclipse::tahu::protobuf::Payload_PropertySet* properties_proto) { + if (properties.contains("keys")) { + const auto& keys = std::get(properties.at("keys").value_); + for (const auto& key : keys) { + properties_proto->add_keys(std::get(key.value_)); + } + } + + if (properties.contains("values")) { + const auto& values = std::get(properties.at("values").value_); + for (const auto& value : values) { + auto* value_proto = properties_proto->add_values(); + writePropertyValue(std::get(value.value_), value_proto); + } + } +} + +void writeMetadataToMetric(const core::RecordObject& metadata, org::eclipse::tahu::protobuf::Payload_MetaData* metadata_proto) { + if (metadata.contains("is_multi_part")) { + metadata_proto->set_is_multi_part(std::get(metadata.at("is_multi_part").value_)); + } + if (metadata.contains("content_type")) { + metadata_proto->set_content_type(std::get(metadata.at("content_type").value_)); + } + if (metadata.contains("size")) { + metadata_proto->set_size(getIntegral(metadata.at("size").value_)); + } + if (metadata.contains("seq")) { + metadata_proto->set_seq(getIntegral(metadata.at("seq").value_)); + } + if (metadata.contains("file_name")) { + metadata_proto->set_file_name(std::get(metadata.at("file_name").value_)); + } + if (metadata.contains("file_type")) { + metadata_proto->set_file_type(std::get(metadata.at("file_type").value_)); + } + if (metadata.contains("md5")) { + 
metadata_proto->set_md5(std::get(metadata.at("md5").value_)); + } + if (metadata.contains("description")) { + metadata_proto->set_description(std::get(metadata.at("description").value_)); + } +} + +void writeDataSetValue(const core::RecordObject& dataset_value, org::eclipse::tahu::protobuf::Payload_DataSet_DataSetValue* dataset_value_proto) { + if (dataset_value.contains("int_value")) { + dataset_value_proto->set_int_value(getIntegral(dataset_value.at("int_value").value_)); + } else if (dataset_value.contains("long_value")) { + dataset_value_proto->set_long_value(getIntegral(dataset_value.at("long_value").value_)); + } else if (dataset_value.contains("float_value")) { + dataset_value_proto->set_float_value(gsl::narrow(std::get(dataset_value.at("float_value").value_))); + } else if (dataset_value.contains("double_value")) { + dataset_value_proto->set_double_value(std::get(dataset_value.at("double_value").value_)); + } else if (dataset_value.contains("boolean_value")) { + dataset_value_proto->set_boolean_value(std::get(dataset_value.at("boolean_value").value_)); + } else if (dataset_value.contains("string_value")) { + dataset_value_proto->set_string_value(std::get(dataset_value.at("string_value").value_)); + } +} + +void writeRow(const core::RecordObject& row, org::eclipse::tahu::protobuf::Payload_DataSet_Row* row_proto) { + if (row.contains("elements")) { + const auto& datasetvalue_elements = std::get(row.at("elements").value_); + for (const auto& dataset_value : datasetvalue_elements) { + auto* dataset_value_proto = row_proto->add_elements(); + writeDataSetValue(std::get(dataset_value.value_), dataset_value_proto); + } + } +} + +void writeDataSet(const core::RecordObject& dataset, org::eclipse::tahu::protobuf::Payload_DataSet* dataset_proto) { + if (dataset.contains("num_of_columns")) { + dataset_proto->set_num_of_columns(getIntegral(dataset.at("num_of_columns").value_)); + } + if (dataset.contains("columns")) { + const auto& columns = 
std::get(dataset.at("columns").value_); + for (const auto& column : columns) { + dataset_proto->add_columns(std::get(column.value_)); + } + } + if (dataset.contains("types")) { + const auto& types = std::get(dataset.at("types").value_); + for (const auto& type : types) { + dataset_proto->add_types(getIntegral(type.value_)); + } + } + if (dataset.contains("rows")) { + const auto& rows = std::get(dataset.at("rows").value_); + for (const auto& row : rows) { + auto* row_proto = dataset_proto->add_rows(); + writeRow(std::get(row.value_), row_proto); + } + } +} + +void writeParameter(const core::RecordObject& parameter, org::eclipse::tahu::protobuf::Payload_Template_Parameter* parameter_proto) { + if (parameter.contains("name")) { + parameter_proto->set_name(std::get(parameter.at("name").value_)); + } + if (parameter.contains("type")) { + parameter_proto->set_type(getIntegral(parameter.at("type").value_)); + } + + if (parameter.contains("int_value")) { + parameter_proto->set_int_value(getIntegral(parameter.at("int_value").value_)); + } else if (parameter.contains("long_value")) { + parameter_proto->set_long_value(getIntegral(parameter.at("long_value").value_)); + } else if (parameter.contains("float_value")) { + parameter_proto->set_float_value(gsl::narrow(std::get(parameter.at("float_value").value_))); + } else if (parameter.contains("double_value")) { + parameter_proto->set_double_value(std::get(parameter.at("double_value").value_)); + } else if (parameter.contains("boolean_value")) { + parameter_proto->set_boolean_value(std::get(parameter.at("boolean_value").value_)); + } else if (parameter.contains("string_value")) { + parameter_proto->set_string_value(std::get(parameter.at("string_value").value_)); + } +} + +void writeTemplate(const core::RecordObject& template_value, org::eclipse::tahu::protobuf::Payload_Template* template_proto) { + if (template_value.contains("version")) { + template_proto->set_version(std::get(template_value.at("version").value_)); + } + if 
(template_value.contains("metrics")) { + const auto& metrics = std::get(template_value.at("metrics").value_); + for (const auto& metric : metrics) { + auto* metric_proto = template_proto->add_metrics(); + writeMetric(std::get(metric.value_), metric_proto); + } + } + if (template_value.contains("parameters")) { + const auto& parameters = std::get(template_value.at("parameters").value_); + for (const auto& parameter : parameters) { + auto* parameter_proto = template_proto->add_parameters(); + writeParameter(std::get(parameter.value_), parameter_proto); + } + } + if (template_value.contains("template_ref")) { + template_proto->set_template_ref(std::get(template_value.at("template_ref").value_)); + } + if (template_value.contains("is_definition")) { + template_proto->set_is_definition(std::get(template_value.at("is_definition").value_)); + } +} + +template +void writeMetric(const core::RecordObject& metric, T* metric_proto) { + if (metric.contains("name")) { + metric_proto->set_name(std::get(metric.at("name").value_)); + } + if (metric.contains("alias")) { + metric_proto->set_alias(getIntegral(metric.at("alias").value_)); + } + if (metric.contains("timestamp")) { + metric_proto->set_timestamp(getIntegral(metric.at("timestamp").value_)); + } + if (metric.contains("datatype")) { + metric_proto->set_datatype(getIntegral(metric.at("datatype").value_)); + } + if (metric.contains("is_historical")) { + metric_proto->set_is_historical(std::get(metric.at("is_historical").value_)); + } + if (metric.contains("is_transient")) { + metric_proto->set_is_transient(std::get(metric.at("is_transient").value_)); + } + if (metric.contains("is_null")) { + metric_proto->set_is_null(std::get(metric.at("is_null").value_)); + } + if (metric.contains("metadata")) { + const auto& metadata = std::get(metric.at("metadata").value_); + auto* metadata_proto = metric_proto->mutable_metadata(); + writeMetadataToMetric(metadata, metadata_proto); + } + if (metric.contains("properties")) { + const auto& 
properties = std::get(metric.at("properties").value_); + auto* properties_proto = metric_proto->mutable_properties(); + writeProperties(properties, properties_proto); + } + + if (metric.contains("int_value")) { + metric_proto->set_int_value(getIntegral(metric.at("int_value").value_)); + } else if (metric.contains("long_value")) { + metric_proto->set_long_value(getIntegral(metric.at("long_value").value_)); + } else if (metric.contains("float_value")) { + metric_proto->set_float_value(gsl::narrow(std::get(metric.at("float_value").value_))); + } else if (metric.contains("double_value")) { + metric_proto->set_double_value(std::get(metric.at("double_value").value_)); + } else if (metric.contains("boolean_value")) { + metric_proto->set_boolean_value(std::get(metric.at("boolean_value").value_)); + } else if (metric.contains("string_value")) { + metric_proto->set_string_value(std::get(metric.at("string_value").value_)); + } else if (metric.contains("bytes_value")) { + metric_proto->set_bytes_value(std::get(metric.at("bytes_value").value_)); + } else if (metric.contains("dataset_value")) { + const auto& dataset = std::get(metric.at("dataset_value").value_); + auto* dataset_proto = metric_proto->mutable_dataset_value(); + writeDataSet(dataset, dataset_proto); + } else if (metric.contains("template_value")) { + const auto& template_value = std::get(metric.at("template_value").value_); + auto* template_proto = metric_proto->mutable_template_value(); + writeTemplate(template_value, template_proto); + } +} + +void writeRecordToPayload(const core::Record& record, org::eclipse::tahu::protobuf::Payload& payload) { + if (record.contains("timestamp")) { + payload.set_timestamp(getIntegral(record.at("timestamp").value_)); + } + if (record.contains("seq")) { + payload.set_seq(getIntegral(record.at("seq").value_)); + } + if (record.contains("uuid")) { + payload.set_uuid(std::get(record.at("uuid").value_)); + } + if (record.contains("body")) { + 
payload.set_body(std::get(record.at("body").value_)); + } + if (record.contains("metrics")) { + const auto& metrics = std::get(record.at("metrics").value_); + for (const auto& metric : metrics) { + auto* metric_proto = payload.add_metrics(); + writeMetric(std::get(metric.value_), metric_proto); + } + } +} + +} // namespace + +void SparkplugBWriter::write(const core::RecordSet& record_set, const std::shared_ptr& flow_file, core::ProcessSession& session) { + if (!flow_file) { + logger_->log_error("FlowFile is null, cannot write Sparkplug B message"); + return; + } + + if (record_set.empty()) { + logger_->log_info("No records to write to Sparkplug B message"); + return; + } + + bool is_first = true; + for (const auto& record : record_set) { + org::eclipse::tahu::protobuf::Payload payload; + try { + writeRecordToPayload(record, payload); + } catch (const std::exception& e) { + logger_->log_error("Failed to write record to Sparkplug B payload: {}", e.what()); + continue; + } + + auto size = payload.ByteSizeLong(); + std::vector buffer(size); + + if (!payload.SerializeToArray(buffer.data(), gsl::narrow(size))) { + logger_->log_error("Failed to serialize Sparkplug B payload"); + continue; + } + + auto write_callback = [&buffer](const std::shared_ptr& output_stream) -> int64_t { + const auto ret = output_stream->write(buffer.data(), buffer.size()); + return io::isError(ret) ? 
-1 : gsl::narrow(ret); + }; + if (is_first) { + session.write(flow_file, write_callback); + is_first = false; + } else { + session.append(flow_file, write_callback); + } + } +} + +REGISTER_RESOURCE(SparkplugBWriter, ControllerService); + +} // namespace org::apache::nifi::minifi::controllers diff --git a/extensions/mqtt/controllers/SparkplugBWriter.h b/extensions/mqtt/controllers/SparkplugBWriter.h new file mode 100644 index 0000000000..9c9f229074 --- /dev/null +++ b/extensions/mqtt/controllers/SparkplugBWriter.h @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#pragma once + +#include "minifi-cpp/core/Record.h" +#include "controllers/RecordSetWriter.h" +#include "core/logging/LoggerFactory.h" + +namespace org::apache::nifi::minifi::controllers { + +class SparkplugBWriter final : public core::RecordSetWriterImpl { + public: + explicit SparkplugBWriter(const std::string_view name, const utils::Identifier& uuid = {}) : core::RecordSetWriterImpl(name, uuid) {} + + SparkplugBWriter(SparkplugBWriter&&) = delete; + SparkplugBWriter(const SparkplugBWriter&) = delete; + SparkplugBWriter& operator=(SparkplugBWriter&&) = delete; + SparkplugBWriter& operator=(const SparkplugBWriter&) = delete; + + ~SparkplugBWriter() override = default; + + EXTENSIONAPI static constexpr const char* Description = "Serializes recordset to Sparkplug B messages and writes them into a FlowFile. " + "This writer is typically used with MQTT processors like PublishMQTT."; + + EXTENSIONAPI static constexpr auto Properties = std::array{}; + + EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false; + ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_CONTROLLER_SERVICES + + void write(const core::RecordSet& record_set, const std::shared_ptr& flow_file, core::ProcessSession& session) override; + + void initialize() override { + setSupportedProperties(Properties); + } + void onEnable() override {} + void yield() override {} + bool isRunning() const override { return getState() == core::controller::ControllerServiceState::ENABLED; } + bool isWorkAvailable() override { return false; } + + private: + std::shared_ptr logger_ = core::logging::LoggerFactory::getLogger(); +}; + +} // namespace org::apache::nifi::minifi::controllers diff --git a/extensions/mqtt/tests/PublishMQTTTests.cpp b/extensions/mqtt/tests/PublishMQTTTests.cpp index 98ff172806..126b79586f 100644 --- a/extensions/mqtt/tests/PublishMQTTTests.cpp +++ b/extensions/mqtt/tests/PublishMQTTTests.cpp @@ -26,6 +26,7 @@ #include "core/Resource.h" #include "controllers/XMLRecordSetWriter.h" #include 
"unit/ProcessorUtils.h" +#include "sparkplug_b.pb.h" using namespace std::literals::chrono_literals; @@ -159,4 +160,41 @@ TEST_CASE_METHOD(PublishMQTTTestFixture, "Test scheduling failure if non-existen } } +TEST_CASE_METHOD(PublishMQTTTestFixture, "Test sending SparkplugB message records", "[publishMQTTTest]") { + test_controller_.plan->addController("JsonTreeReader", "JsonTreeReader"); + test_controller_.plan->addController("SparkplugBWriter", "SparkplugBWriter"); + REQUIRE(publish_mqtt_processor_->setProperty(minifi::processors::PublishMQTT::Topic.name, "mytopic")); + REQUIRE(publish_mqtt_processor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883")); + REQUIRE(publish_mqtt_processor_->setProperty(minifi::processors::PublishMQTT::RecordReader.name, "JsonTreeReader")); + REQUIRE(publish_mqtt_processor_->setProperty(minifi::processors::PublishMQTT::RecordWriter.name, "SparkplugBWriter")); + + const auto trigger_results = test_controller_.trigger( + R"([{"timestamp": 1752755515, "metrics": [{"name": "temperature", "int_value": 25, "datatype": 2}], "seq": 1, "uuid": "123e4567-e89b-12d3-a456-426614174000", "body": "testbody"}, + {"timestamp": 1752755516, "metrics": [{"name": "temperature", "int_value": 31, "datatype": 2}], "seq": 2, "uuid": "423e4567-e89b-12d3-a456-426614174111", "body": "testbody2"}])"); + CHECK(trigger_results.at(TestPublishMQTTProcessor::Success).size() == 2); + const auto flow_file_1 = trigger_results.at(TestPublishMQTTProcessor::Success).at(0); + + auto string_content = test_controller_.plan->getContent(flow_file_1); + org::eclipse::tahu::protobuf::Payload payload; + payload.ParseFromArray(static_cast(string_content.data()), gsl::narrow(string_content.size())); + CHECK(payload.timestamp() == 1752755515); + CHECK(payload.seq() == 1); + CHECK(payload.uuid() == "123e4567-e89b-12d3-a456-426614174000"); + CHECK(payload.metrics_size() == 1); + CHECK(payload.metrics(0).name() == "temperature"); + 
CHECK(payload.metrics(0).int_value() == 25); + CHECK(payload.body() == "testbody"); + + const auto flow_file_2 = trigger_results.at(TestPublishMQTTProcessor::Success).at(1); + string_content = test_controller_.plan->getContent(flow_file_2); + payload.ParseFromArray(static_cast(string_content.data()), gsl::narrow(string_content.size())); + CHECK(payload.timestamp() == 1752755516); + CHECK(payload.seq() == 2); + CHECK(payload.uuid() == "423e4567-e89b-12d3-a456-426614174111"); + CHECK(payload.metrics_size() == 1); + CHECK(payload.metrics(0).name() == "temperature"); + CHECK(payload.metrics(0).int_value() == 31); + CHECK(payload.body() == "testbody2"); +} + } // namespace org::apache::nifi::minifi::test diff --git a/extensions/mqtt/tests/SparkplugBWriterTests.cpp b/extensions/mqtt/tests/SparkplugBWriterTests.cpp new file mode 100644 index 0000000000..066a7e5a5b --- /dev/null +++ b/extensions/mqtt/tests/SparkplugBWriterTests.cpp @@ -0,0 +1,250 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "unit/Catch.h" +#include "unit/TestBase.h" +#include "../controllers/SparkplugBWriter.h" +#include "io/BufferStream.h" +#include "sparkplug_b.pb.h" +#include "core/ProcessSession.h" + +namespace org::apache::nifi::minifi::test { + +class SparkplugBWriterTestFixture { /* Fixture shared by the SparkplugBWriter tests in this file: builds a test plan containing one DummyProcessor and exposes a process session created from the current context of that plan. */ + public: + const core::Relationship Success{"success", "everything is fine"}; + const core::Relationship Failure{"failure", "something has gone awry"}; + + SparkplugBWriterTestFixture() { + test_plan_ = test_controller_.createPlan(); + dummy_processor_ = test_plan_->addProcessor("DummyProcessor", "dummyProcessor"); + context_ = [this] { /* immediately invoked lambda: run the next processor of the plan, then take the current context */ + test_plan_->runNextProcessor(); + return test_plan_->getCurrentContext(); + }(); + process_session_ = std::make_unique(context_); /* NOTE(review): make_unique has no template argument in this copy of the patch - it looks stripped; the same applies to the shared_ptr, unique_ptr, vector, static_cast and gsl::narrow uses in this file. Confirm against the original commit. */ + } + + core::ProcessSession &processSession() { return *process_session_; } + + void transferAndCommit(const std::shared_ptr& flow_file) { /* routes flow_file to the Success relationship and commits the session */ + process_session_->transfer(flow_file, Success); + process_session_->commit(); + } + + private: + TestController test_controller_; + + std::shared_ptr test_plan_; + core::Processor* dummy_processor_; /* only assigned in the constructor; no other use is visible in this file */ + std::shared_ptr context_; + std::unique_ptr process_session_; +}; + +TEST_CASE_METHOD(SparkplugBWriterTestFixture, "Test empty record set", "[SparkplugBWriter]") { /* Despite the name, the record set holds one record with no fields; the written payload is expected to have no timestamp, seq, uuid, body or metrics. */ + core::RecordSet record_set; + + record_set.emplace_back(core::RecordObject{}); + auto flow_file = processSession().create(); + + controllers::SparkplugBWriter sparkplug_writer("SparkplugBWriter"); + sparkplug_writer.write(record_set, flow_file, processSession()); + transferAndCommit(flow_file); + org::eclipse::tahu::protobuf::Payload payload; + processSession().read(*flow_file, [&payload](const std::shared_ptr& input_stream) { /* reads the flow file content into a buffer sized from the stream, parses it as a Payload and returns the stream size; the read and ParseFromArray results are not checked */ + std::vector buffer(input_stream->size()); + input_stream->read(buffer); + payload.ParseFromArray(static_cast(buffer.data()), gsl::narrow(input_stream->size())); + return gsl::narrow(input_stream->size()); + }); + + CHECK_FALSE(payload.has_timestamp()); + CHECK_FALSE(payload.has_seq()); +
CHECK_FALSE(payload.has_uuid()); + CHECK_FALSE(payload.has_body()); + CHECK(payload.metrics_size() == 0); +} + +TEST_CASE_METHOD(SparkplugBWriterTestFixture, "Test invalid type set", "[SparkplugBWriter]") { /* A string stored under the timestamp key is expected to yield a payload with nothing set and a logged integral-conversion failure. */ + core::RecordSet record_set; + core::RecordObject payload_record; + payload_record.emplace("timestamp", core::RecordField{std::string("invalid_timestamp")}); /* a string value; the final check of this test expects an integral-conversion error for it */ + record_set.emplace_back(std::move(payload_record)); + + auto flow_file = processSession().create(); + + controllers::SparkplugBWriter sparkplug_writer("SparkplugBWriter"); + sparkplug_writer.write(record_set, flow_file, processSession()); + transferAndCommit(flow_file); + org::eclipse::tahu::protobuf::Payload payload; + processSession().read(*flow_file, [&payload](const std::shared_ptr& input_stream) { /* same read-and-parse callback as in the previous test */ + std::vector buffer(input_stream->size()); + input_stream->read(buffer); + payload.ParseFromArray(static_cast(buffer.data()), gsl::narrow(input_stream->size())); + return gsl::narrow(input_stream->size()); + }); + + CHECK_FALSE(payload.has_timestamp()); + CHECK_FALSE(payload.has_seq()); + CHECK_FALSE(payload.has_uuid()); + CHECK_FALSE(payload.has_body()); + CHECK(payload.metrics_size() == 0); + CHECK(LogTestController::getInstance().contains("Failed to write record to Sparkplug B payload: Invalid type for integral conversion")); +} + +TEST_CASE_METHOD(SparkplugBWriterTestFixture, "Test record set conversion to Sparkplug B payload", "[SparkplugBWriter]") { /* Populates the top-level payload fields plus one metric carrying metadata, properties and a dataset_value, writes the record set and checks each field of the parsed payload. */ + core::RecordSet record_set; + core::RecordObject payload_record; + payload_record.emplace("timestamp", core::RecordField{static_cast(1234)}); + payload_record.emplace("seq", core::RecordField{static_cast(456)}); + payload_record.emplace("uuid", core::RecordField{std::string("my-uuid")}); + payload_record.emplace("body", core::RecordField{std::string("testbody")}); + core::RecordObject metric_object; /* the single metric of the payload */ + metric_object.emplace("name", core::RecordField{std::string("test_metric")}); + metric_object.emplace("alias", core::RecordField{static_cast(1)}); +
metric_object.emplace("timestamp", core::RecordField{static_cast(789)}); + metric_object.emplace("datatype", core::RecordField{static_cast(2)}); + metric_object.emplace("is_historical", core::RecordField{false}); + metric_object.emplace("is_transient", core::RecordField{false}); + metric_object.emplace("is_null", core::RecordField{false}); + + core::RecordObject metadata_object; /* stored on the metric under the metadata key */ + metadata_object.emplace("is_multi_part", core::RecordField{false}); + metadata_object.emplace("content_type", core::RecordField{std::string("application/json")}); + metadata_object.emplace("size", core::RecordField{static_cast(1024)}); + metadata_object.emplace("seq", core::RecordField{static_cast(1)}); + metadata_object.emplace("file_name", core::RecordField{std::string("example.json")}); + metadata_object.emplace("file_type", core::RecordField{std::string("json")}); + metadata_object.emplace("md5", core::RecordField{std::string("d41d8cd98f00b204e9800998ecf8427e")}); + metadata_object.emplace("description", core::RecordField{std::string("Example metadata description")}); + metric_object.emplace("metadata", core::RecordField{std::move(metadata_object)}); + + core::RecordObject propertyset_object; /* stored on the metric under the properties key: two keys and two values */ + core::RecordArray keys_array; + keys_array.emplace_back(core::RecordField{std::string("key1")}); + keys_array.emplace_back(core::RecordField{std::string("key2")}); + core::RecordArray propertyvalues_array; + core::RecordObject propertyvalue1; + propertyvalue1.emplace("type", core::RecordField{static_cast(1)}); + propertyvalue1.emplace("is_null", core::RecordField{false}); + propertyvalue1.emplace("int_value", core::RecordField{static_cast(42)}); + core::RecordObject propertyvalue2; + propertyvalue2.emplace("type", core::RecordField{static_cast(1)}); + propertyvalue2.emplace("is_null", core::RecordField{false}); + propertyvalue2.emplace("int_value", core::RecordField{static_cast(43)}); + propertyvalues_array.emplace_back(core::RecordField{std::move(propertyvalue1)}); +
propertyvalues_array.emplace_back(core::RecordField{std::move(propertyvalue2)}); + propertyset_object.emplace("keys", core::RecordField{std::move(keys_array)}); + propertyset_object.emplace("values", core::RecordField{std::move(propertyvalues_array)}); + metric_object.emplace("properties", core::RecordField{std::move(propertyset_object)}); + + core::RecordObject dataset_object; /* stored on the metric under the dataset_value key: one column, one type, one row with one element */ + + dataset_object.emplace("num_of_columns", core::RecordField{static_cast(1)}); + core::RecordArray columns_array; + columns_array.emplace_back(core::RecordField{std::string("column1")}); + dataset_object.emplace("columns", core::RecordField{std::move(columns_array)}); + core::RecordArray types_array; + types_array.emplace_back(core::RecordField{static_cast(1)}); + dataset_object.emplace("types", core::RecordField{std::move(types_array)}); + core::RecordArray rows_array; + core::RecordObject row_object; + core::RecordArray elements_array; + core::RecordObject element_object; + element_object.emplace("int_value", core::RecordField{static_cast(100)}); + elements_array.emplace_back(core::RecordField{std::move(element_object)}); + row_object.emplace("elements", core::RecordField{std::move(elements_array)}); + rows_array.emplace_back(core::RecordField{std::move(row_object)}); + dataset_object.emplace("rows", core::RecordField{std::move(rows_array)}); + metric_object.emplace("dataset_value", core::RecordField{std::move(dataset_object)}); + + core::RecordArray metric_array; /* metrics array holding the single metric built above */ + core::RecordField metric_field{std::move(metric_object)}; + metric_array.emplace_back(std::move(metric_field)); + payload_record.emplace("metrics", core::RecordField{std::move(metric_array)}); + record_set.emplace_back(std::move(payload_record)); + + record_set.emplace_back(core::RecordObject{}); /* NOTE(review): appends a second record with no fields; the checks that follow only describe the first record - confirm this is intentional */ + auto flow_file = processSession().create(); + + controllers::SparkplugBWriter sparkplug_writer("SparkplugBWriter"); + sparkplug_writer.write(record_set, flow_file, processSession()); + transferAndCommit(flow_file); +
org::eclipse::tahu::protobuf::Payload payload; + processSession().read(*flow_file, [&payload](const std::shared_ptr& input_stream) { /* same read-and-parse callback as in the two tests above */ + std::vector buffer(input_stream->size()); + input_stream->read(buffer); + payload.ParseFromArray(static_cast(buffer.data()), gsl::narrow(input_stream->size())); + return gsl::narrow(input_stream->size()); + }); + + CHECK(payload.timestamp() == 1234); /* top-level fields mirror payload_record */ + CHECK(payload.seq() == 456); + CHECK(payload.uuid() == "my-uuid"); + CHECK(payload.body() == "testbody"); + CHECK(payload.metrics_size() == 1); + const auto& metric = payload.metrics(0); /* the only metric; its fields mirror metric_object */ + CHECK(metric.name() == "test_metric"); + CHECK(metric.alias() == 1); + CHECK(metric.timestamp() == 789); + CHECK(metric.datatype() == 2); + CHECK_FALSE(metric.is_historical()); + CHECK_FALSE(metric.is_transient()); + CHECK_FALSE(metric.is_null()); + CHECK(metric.has_metadata()); /* metadata fields mirror metadata_object */ + CHECK(metric.metadata().is_multi_part() == false); + CHECK(metric.metadata().content_type() == "application/json"); + CHECK(metric.metadata().size() == 1024); + CHECK(metric.metadata().seq() == 1); + CHECK(metric.metadata().file_name() == "example.json"); + CHECK(metric.metadata().file_type() == "json"); + CHECK(metric.metadata().md5() == "d41d8cd98f00b204e9800998ecf8427e"); + CHECK(metric.metadata().description() == "Example metadata description"); + + CHECK(metric.has_properties()); /* properties mirror propertyset_object */ + const auto& properties = metric.properties(); + CHECK(properties.keys_size() == 2); + CHECK(properties.keys(0) == "key1"); + CHECK(properties.keys(1) == "key2"); + CHECK(properties.values_size() == 2); + const auto& property1 = properties.values(0); + CHECK(property1.type() == 1); + CHECK_FALSE(property1.is_null()); + CHECK(property1.int_value() == 42); + const auto& property2 = properties.values(1); + CHECK(property2.type() == 1); + CHECK_FALSE(property2.is_null()); + CHECK(property2.int_value() == 43); + + CHECK(metric.has_dataset_value()); /* dataset mirrors dataset_object */ + const auto& dataset = metric.dataset_value(); + CHECK(dataset.num_of_columns() == 1); +
CHECK(dataset.columns_size() == 1); + CHECK(dataset.columns(0) == "column1"); + CHECK(dataset.types_size() == 1); + CHECK(dataset.types(0) == 1); + CHECK(dataset.rows_size() == 1); + const auto& row = dataset.rows(0); + CHECK(row.elements_size() == 1); + const auto& element = row.elements(0); + CHECK(element.has_int_value()); + CHECK(element.int_value() == 100); + CHECK_FALSE(element.has_long_value()); /* only int_value was set on the element, so long_value is expected to be absent */ +} + +} // namespace org::apache::nifi::minifi::test diff --git a/minifi-api/include/minifi-cpp/core/Record.h b/minifi-api/include/minifi-cpp/core/Record.h index 5c2ca3a615..32d68e5532 100644 --- a/minifi-api/include/minifi-cpp/core/Record.h +++ b/minifi-api/include/minifi-cpp/core/Record.h @@ -43,6 +43,10 @@ class Record final { return fields_.emplace(std::move(key), std::move(field)); } + [[nodiscard]] bool contains(const std::string& key) const { /* forwards to fields_.contains(key) */ + return fields_.contains(key); + } + [[nodiscard]] const RecordField& at(const std::string& key) const { return fields_.at(key); }