diff --git a/CONTROLLERS.md b/CONTROLLERS.md
index 0b8c0676db..f7a0a69adf 100644
--- a/CONTROLLERS.md
+++ b/CONTROLLERS.md
@@ -30,6 +30,7 @@ limitations under the License.
- [RocksDbStateStorage](#RocksDbStateStorage)
- [SmbConnectionControllerService](#SmbConnectionControllerService)
- [SparkplugBReader](#SparkplugBReader)
+- [SparkplugBWriter](#SparkplugBWriter)
- [SSLContextService](#SSLContextService)
- [UpdatePolicyControllerService](#UpdatePolicyControllerService)
- [VolatileMapStateStorage](#VolatileMapStateStorage)
@@ -295,6 +296,20 @@ In the list below, the names of required properties appear in bold. Any other pr
|------|---------------|------------------|-------------|
+## SparkplugBWriter
+
+### Description
+
+Serializes a record set into Sparkplug B messages and writes them into a FlowFile. This writer is typically used with MQTT processors like PublishMQTT.
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description |
+|------|---------------|------------------|-------------|
+
+
## SSLContextService
### Description
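
For context on the mapping the new controller service performs, here is a minimal hand-written equivalent against the generated tahu protobuf API that `SparkplugBWriter.cpp` below compiles against. It builds the payload corresponding to the record used in the integration test further down; the function name `makeExamplePayload` is illustrative and not part of this PR.

```cpp
#include "sparkplug_b.pb.h"  // generated from the Eclipse Tahu sparkplug_b.proto

// Builds the kind of payload SparkplugBWriter produces from the record
// {"timestamp": 987654321, "seq": 12345, "uuid": "test-uuid", "body": "test-body",
//  "metrics": [{"name": "TestMetric", "int_value": 123, "timestamp": 45345346346}]}
org::eclipse::tahu::protobuf::Payload makeExamplePayload() {
  org::eclipse::tahu::protobuf::Payload payload;
  payload.set_timestamp(987654321);
  payload.set_seq(12345);
  payload.set_uuid("test-uuid");
  payload.set_body("test-body");
  auto* metric = payload.add_metrics();  // one metric entry per record array element
  metric->set_name("TestMetric");
  metric->set_timestamp(45345346346);
  metric->set_int_value(123);
  return payload;
}
```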
diff --git a/README.md b/README.md
index 6940651da6..8f4bcdb2a1 100644
--- a/README.md
+++ b/README.md
@@ -85,7 +85,7 @@ The next table outlines CMAKE flags that correspond with MiNiFi extensions. Exte
| Kubernetes (Linux) | [KubernetesControllerService](CONTROLLERS.md#kubernetescontrollerservice) | -DENABLE_KUBERNETES=ON |
| LlamaCpp | [RunLlamaCppInference](PROCESSORS.md#runllamacppinference) | -DENABLE_LLAMACPP=ON |
| Lua Scripting | [ExecuteScript](PROCESSORS.md#executescript) | -DENABLE_LUA_SCRIPTING=ON |
-| MQTT | [ConsumeMQTT](PROCESSORS.md#consumemqtt)<br/>[PublishMQTT](PROCESSORS.md#publishmqtt)<br/>[SparkplugBReader](PROCESSORS.md#sparkplugbreader) | -DENABLE_MQTT=ON |
+| MQTT | [ConsumeMQTT](PROCESSORS.md#consumemqtt)<br/>[PublishMQTT](PROCESSORS.md#publishmqtt)<br/>[SparkplugBReader](PROCESSORS.md#sparkplugbreader)<br/>[SparkplugBWriter](PROCESSORS.md#sparkplugbwriter) | -DENABLE_MQTT=ON |
| OPC | [FetchOPCProcessor](PROCESSORS.md#fetchopcprocessor)<br/>[PutOPCProcessor](PROCESSORS.md#putopcprocessor) | -DENABLE_OPC=ON |
| OpenCV | [CaptureRTSPFrame](PROCESSORS.md#capturertspframe)<br/>[MotionDetector](PROCESSORS.md#motiondetector) | -DENABLE_OPENCV=ON |
| PDH (Windows) | [PerformanceDataMonitor](PROCESSORS.md#performancedatamonitor) | -DENABLE_PDH=ON |
diff --git a/docker/test/integration/features/mqtt.feature b/docker/test/integration/features/mqtt.feature
index 2cacd5b8df..37b94f0b97 100644
--- a/docker/test/integration/features/mqtt.feature
+++ b/docker/test/integration/features/mqtt.feature
@@ -579,3 +579,39 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
And a flowfile with the JSON content '[{"_isDuplicate": false, "_qos": 0, "_topicSegments": ["spBv1.0", "TestGroup", "DDATA", "TestNode", "TestDevice"], "_topic": "spBv1.0/TestGroup/DDATA/TestNode/TestDevice", "_isRetained": false, "body": "test-body", "uuid": "test-uuid", "seq": 12345, "metrics": [{"int_value": 123, "timestamp": 45345346346, "name": "TestMetric"}], "timestamp": 987654321}]' is placed in the monitored directory in less than 60 seconds
And the Minifi logs contain the following message: "key:record.count value:1" in less than 60 seconds
And the Minifi logs contain the following message: "key:mqtt.broker value:mqtt-broker-" in less than 1 seconds
+
+ Scenario: A MiNiFi instance publishes then consumes Sparkplug message through MQTT broker
+ Given an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT
+
+ And a SparkplugBReader controller service is set up in the "consumer-client" flow
+ And a JsonRecordSetWriter controller service is set up with "Array" output grouping in the "consumer-client" flow
+ And a SparkplugBWriter controller service is set up in the "publisher-client" flow
+ And a JsonTreeReader controller service is set up in the "publisher-client" flow
+
+ And a file with the content '{"timestamp":987654321,"metrics":[{"int_value":123,"timestamp":45345346346,"name":"TestMetric"}],"seq":12345,"body":"test-body","uuid":"test-uuid"}' is present in '/tmp/input'
+
+ And a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "publisher-client" flow
+ And a PublishMQTT processor in the "publisher-client" flow
+ And the "Topic" property of the PublishMQTT processor is set to "spBv1.0/TestGroup/DDATA/TestNode/TestDevice"
+ And the "MQTT Version" property of the PublishMQTT processor is set to "3.1.1"
+ And the "Record Reader" property of the PublishMQTT processor is set to "JsonTreeReader"
+ And the "Record Writer" property of the PublishMQTT processor is set to "SparkplugBWriter"
+
+ And the "success" relationship of the GetFile processor is connected to the PublishMQTT
+
+ And a ConsumeMQTT processor in the "consumer-client" flow
+ And the "Topic" property of the ConsumeMQTT processor is set to "spBv1.0/TestGroup/DDATA/TestNode/TestDevice"
+ And the "MQTT Version" property of the ConsumeMQTT processor is set to "3.1.1"
+ And the "Record Reader" property of the ConsumeMQTT processor is set to "SparkplugBReader"
+ And the "Record Writer" property of the ConsumeMQTT processor is set to "JsonRecordSetWriter"
+ And a PutFile processor with the "Directory" property set to "/tmp/output" in the "consumer-client" flow
+ And a LogAttribute processor with the "Log Payload" property set to "true" in the "consumer-client" flow
+ And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile
+ And the "success" relationship of the PutFile processor is connected to the LogAttribute
+
+ When all instances start up
+
+ Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client"
+ And a flowfile with the JSON content '[{"_isDuplicate": false, "_qos": 0, "_topicSegments": ["spBv1.0", "TestGroup", "DDATA", "TestNode", "TestDevice"], "_topic": "spBv1.0/TestGroup/DDATA/TestNode/TestDevice", "_isRetained": false, "body": "test-body", "uuid": "test-uuid", "seq": 12345, "metrics": [{"int_value": 123, "timestamp": 45345346346, "name": "TestMetric"}], "timestamp": 987654321}]' is placed in the monitored directory in less than 60 seconds
+ And the Minifi logs contain the following message: "key:record.count value:1" in less than 60 seconds
+ And the Minifi logs contain the following message: "key:mqtt.broker value:mqtt-broker-" in less than 1 seconds
diff --git a/docker/test/integration/features/steps/steps.py b/docker/test/integration/features/steps/steps.py
index 64ae0ba8b3..7d94cd91d2 100644
--- a/docker/test/integration/features/steps/steps.py
+++ b/docker/test/integration/features/steps/steps.py
@@ -30,6 +30,7 @@
from minifi.controllers.CouchbaseClusterService import CouchbaseClusterService
from minifi.controllers.XMLReader import XMLReader
from minifi.controllers.SparkplugBReader import SparkplugBReader
+from minifi.controllers.SparkplugBWriter import SparkplugBWriter
from behave import given, then, when
from behave.model_describe import ModelDescriptor
@@ -507,13 +508,30 @@ def step_impl(context):
context.test.start('mqtt-broker')
-@given("a SparkplugBReader controller service is set up")
-def step_impl(context):
+@given("a SparkplugBReader controller service is set up in the \"{minifi_container_name}\" flow")
+def step_impl(context, minifi_container_name: str):
sparkplug_record_set_reader = SparkplugBReader("SparkplugBReader")
- container = context.test.acquire_container(context=context, name="minifi-cpp-flow")
+ container = context.test.acquire_container(context=context, name=minifi_container_name)
container.add_controller(sparkplug_record_set_reader)
+@given("a SparkplugBWriter controller service is set up in the \"{minifi_container_name}\" flow")
+def step_impl(context, minifi_container_name: str):
+ sparkplug_record_set_writer = SparkplugBWriter("SparkplugBWriter")
+ container = context.test.acquire_container(context=context, name=minifi_container_name)
+ container.add_controller(sparkplug_record_set_writer)
+
+
+@given("a SparkplugBReader controller service is set up")
+def step_impl(context):
+ context.execute_steps("given a SparkplugBReader controller service is set up in the \"minifi-cpp-flow\" flow")
+
+
+@given("a SparkplugBWriter controller service is set up")
+def step_impl(context):
+ context.execute_steps("given a SparkplugBWriter controller service is set up in the \"minifi-cpp-flow\" flow")
+
+
@when("a test Sparkplug payload is published to the topic \"{topic}\"")
def step_impl(context, topic):
context.test.publish_test_sparkplug_payload(topic)
diff --git a/docker/test/integration/minifi/controllers/SparkplugBWriter.py b/docker/test/integration/minifi/controllers/SparkplugBWriter.py
new file mode 100644
index 0000000000..d9d5721e34
--- /dev/null
+++ b/docker/test/integration/minifi/controllers/SparkplugBWriter.py
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ..core.ControllerService import ControllerService
+
+
+class SparkplugBWriter(ControllerService):
+ def __init__(self, name=None):
+ super(SparkplugBWriter, self).__init__(name=name)
+ self.service_class = 'SparkplugBWriter'
diff --git a/extensions/mqtt/controllers/SparkplugBWriter.cpp b/extensions/mqtt/controllers/SparkplugBWriter.cpp
new file mode 100644
index 0000000000..cdf87d4164
--- /dev/null
+++ b/extensions/mqtt/controllers/SparkplugBWriter.cpp
@@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "SparkplugBWriter.h"
+
+#include <memory>
+#include <variant>
+#include <vector>
+
+#include "sparkplug_b.pb.h"
+#include "minifi-cpp/core/Record.h"
+#include "core/Resource.h"
+#include "utils/expected.h"
+
+namespace org::apache::nifi::minifi::controllers {
+
+namespace {
+
+// The record field variant is expected to store integers as int64_t or uint64_t;
+// any other alternative is rejected.
+template<typename T, typename... Alternatives>
+T getIntegral(const std::variant<Alternatives...>& variant) {
+  if (std::holds_alternative<int64_t>(variant)) {
+    return gsl::narrow<T>(std::get<int64_t>(variant));
+  } else if (std::holds_alternative<uint64_t>(variant)) {
+    return gsl::narrow<T>(std::get<uint64_t>(variant));
+  } else {
+    throw std::invalid_argument("Invalid type for integral conversion");
+  }
+}
+
+void writeProperties(const core::RecordObject& properties, org::eclipse::tahu::protobuf::Payload_PropertySet* properties_proto);
+
+template<typename T>
+void writeMetric(const core::RecordObject& metric, T* metric_proto);
+
+void writePropertyValue(const core::RecordObject& value, org::eclipse::tahu::protobuf::Payload_PropertyValue* value_proto) {
+  if (value.contains("type")) {
+    value_proto->set_type(getIntegral<uint32_t>(value.at("type").value_));
+  }
+
+  if (value.contains("is_null")) {
+    value_proto->set_is_null(std::get<bool>(value.at("is_null").value_));
+  }
+
+  if (value.contains("int_value")) {
+    value_proto->set_int_value(getIntegral<uint32_t>(value.at("int_value").value_));
+  } else if (value.contains("long_value")) {
+    value_proto->set_long_value(getIntegral<uint64_t>(value.at("long_value").value_));
+  } else if (value.contains("float_value")) {
+    value_proto->set_float_value(gsl::narrow<float>(std::get<double>(value.at("float_value").value_)));
+  } else if (value.contains("double_value")) {
+    value_proto->set_double_value(std::get<double>(value.at("double_value").value_));
+  } else if (value.contains("boolean_value")) {
+    value_proto->set_boolean_value(std::get<bool>(value.at("boolean_value").value_));
+  } else if (value.contains("string_value")) {
+    value_proto->set_string_value(std::get<std::string>(value.at("string_value").value_));
+  } else if (value.contains("propertyset_value")) {
+    const auto& propertyset = std::get<core::RecordObject>(value.at("propertyset_value").value_);
+    auto* propertyset_proto = value_proto->mutable_propertyset_value();
+    writeProperties(propertyset, propertyset_proto);
+  } else if (value.contains("propertysets_value")) {
+    const auto& propertysets = std::get<core::RecordArray>(value.at("propertysets_value").value_);
+    auto* propertysets_proto = value_proto->mutable_propertysets_value();
+    for (const auto& propertyset : propertysets) {
+      auto* propertyset_proto = propertysets_proto->add_propertyset();
+      writeProperties(std::get<core::RecordObject>(propertyset.value_), propertyset_proto);
+    }
+  }
+}
+
+void writeProperties(const core::RecordObject& properties, org::eclipse::tahu::protobuf::Payload_PropertySet* properties_proto) {
+  if (properties.contains("keys")) {
+    const auto& keys = std::get<core::RecordArray>(properties.at("keys").value_);
+    for (const auto& key : keys) {
+      properties_proto->add_keys(std::get<std::string>(key.value_));
+    }
+  }
+
+  if (properties.contains("values")) {
+    const auto& values = std::get<core::RecordArray>(properties.at("values").value_);
+    for (const auto& value : values) {
+      auto* value_proto = properties_proto->add_values();
+      writePropertyValue(std::get<core::RecordObject>(value.value_), value_proto);
+    }
+  }
+}
+
+void writeMetadataToMetric(const core::RecordObject& metadata, org::eclipse::tahu::protobuf::Payload_MetaData* metadata_proto) {
+  if (metadata.contains("is_multi_part")) {
+    metadata_proto->set_is_multi_part(std::get<bool>(metadata.at("is_multi_part").value_));
+  }
+  if (metadata.contains("content_type")) {
+    metadata_proto->set_content_type(std::get<std::string>(metadata.at("content_type").value_));
+  }
+  if (metadata.contains("size")) {
+    metadata_proto->set_size(getIntegral<uint64_t>(metadata.at("size").value_));
+  }
+  if (metadata.contains("seq")) {
+    metadata_proto->set_seq(getIntegral<uint64_t>(metadata.at("seq").value_));
+  }
+  if (metadata.contains("file_name")) {
+    metadata_proto->set_file_name(std::get<std::string>(metadata.at("file_name").value_));
+  }
+  if (metadata.contains("file_type")) {
+    metadata_proto->set_file_type(std::get<std::string>(metadata.at("file_type").value_));
+  }
+  if (metadata.contains("md5")) {
+    metadata_proto->set_md5(std::get<std::string>(metadata.at("md5").value_));
+  }
+  if (metadata.contains("description")) {
+    metadata_proto->set_description(std::get<std::string>(metadata.at("description").value_));
+  }
+}
+
+void writeDataSetValue(const core::RecordObject& dataset_value, org::eclipse::tahu::protobuf::Payload_DataSet_DataSetValue* dataset_value_proto) {
+  if (dataset_value.contains("int_value")) {
+    dataset_value_proto->set_int_value(getIntegral<uint32_t>(dataset_value.at("int_value").value_));
+  } else if (dataset_value.contains("long_value")) {
+    dataset_value_proto->set_long_value(getIntegral<uint64_t>(dataset_value.at("long_value").value_));
+  } else if (dataset_value.contains("float_value")) {
+    dataset_value_proto->set_float_value(gsl::narrow<float>(std::get<double>(dataset_value.at("float_value").value_)));
+  } else if (dataset_value.contains("double_value")) {
+    dataset_value_proto->set_double_value(std::get<double>(dataset_value.at("double_value").value_));
+  } else if (dataset_value.contains("boolean_value")) {
+    dataset_value_proto->set_boolean_value(std::get<bool>(dataset_value.at("boolean_value").value_));
+  } else if (dataset_value.contains("string_value")) {
+    dataset_value_proto->set_string_value(std::get<std::string>(dataset_value.at("string_value").value_));
+  }
+}
+
+void writeRow(const core::RecordObject& row, org::eclipse::tahu::protobuf::Payload_DataSet_Row* row_proto) {
+  if (row.contains("elements")) {
+    const auto& datasetvalue_elements = std::get<core::RecordArray>(row.at("elements").value_);
+    for (const auto& dataset_value : datasetvalue_elements) {
+      auto* dataset_value_proto = row_proto->add_elements();
+      writeDataSetValue(std::get<core::RecordObject>(dataset_value.value_), dataset_value_proto);
+    }
+  }
+}
+
+void writeDataSet(const core::RecordObject& dataset, org::eclipse::tahu::protobuf::Payload_DataSet* dataset_proto) {
+  if (dataset.contains("num_of_columns")) {
+    dataset_proto->set_num_of_columns(getIntegral<uint64_t>(dataset.at("num_of_columns").value_));
+  }
+  if (dataset.contains("columns")) {
+    const auto& columns = std::get<core::RecordArray>(dataset.at("columns").value_);
+    for (const auto& column : columns) {
+      dataset_proto->add_columns(std::get<std::string>(column.value_));
+    }
+  }
+  if (dataset.contains("types")) {
+    const auto& types = std::get<core::RecordArray>(dataset.at("types").value_);
+    for (const auto& type : types) {
+      dataset_proto->add_types(getIntegral<uint32_t>(type.value_));
+    }
+  }
+  if (dataset.contains("rows")) {
+    const auto& rows = std::get<core::RecordArray>(dataset.at("rows").value_);
+    for (const auto& row : rows) {
+      auto* row_proto = dataset_proto->add_rows();
+      writeRow(std::get<core::RecordObject>(row.value_), row_proto);
+    }
+  }
+}
+
+void writeParameter(const core::RecordObject& parameter, org::eclipse::tahu::protobuf::Payload_Template_Parameter* parameter_proto) {
+  if (parameter.contains("name")) {
+    parameter_proto->set_name(std::get<std::string>(parameter.at("name").value_));
+  }
+  if (parameter.contains("type")) {
+    parameter_proto->set_type(getIntegral<uint32_t>(parameter.at("type").value_));
+  }
+
+  if (parameter.contains("int_value")) {
+    parameter_proto->set_int_value(getIntegral<uint32_t>(parameter.at("int_value").value_));
+  } else if (parameter.contains("long_value")) {
+    parameter_proto->set_long_value(getIntegral<uint64_t>(parameter.at("long_value").value_));
+  } else if (parameter.contains("float_value")) {
+    parameter_proto->set_float_value(gsl::narrow<float>(std::get<double>(parameter.at("float_value").value_)));
+  } else if (parameter.contains("double_value")) {
+    parameter_proto->set_double_value(std::get<double>(parameter.at("double_value").value_));
+  } else if (parameter.contains("boolean_value")) {
+    parameter_proto->set_boolean_value(std::get<bool>(parameter.at("boolean_value").value_));
+  } else if (parameter.contains("string_value")) {
+    parameter_proto->set_string_value(std::get<std::string>(parameter.at("string_value").value_));
+  }
+}
+
+void writeTemplate(const core::RecordObject& template_value, org::eclipse::tahu::protobuf::Payload_Template* template_proto) {
+  if (template_value.contains("version")) {
+    template_proto->set_version(std::get<std::string>(template_value.at("version").value_));
+  }
+  if (template_value.contains("metrics")) {
+    const auto& metrics = std::get<core::RecordArray>(template_value.at("metrics").value_);
+    for (const auto& metric : metrics) {
+      auto* metric_proto = template_proto->add_metrics();
+      writeMetric(std::get<core::RecordObject>(metric.value_), metric_proto);
+    }
+  }
+  if (template_value.contains("parameters")) {
+    const auto& parameters = std::get<core::RecordArray>(template_value.at("parameters").value_);
+    for (const auto& parameter : parameters) {
+      auto* parameter_proto = template_proto->add_parameters();
+      writeParameter(std::get<core::RecordObject>(parameter.value_), parameter_proto);
+    }
+  }
+  if (template_value.contains("template_ref")) {
+    template_proto->set_template_ref(std::get<std::string>(template_value.at("template_ref").value_));
+  }
+  if (template_value.contains("is_definition")) {
+    template_proto->set_is_definition(std::get<bool>(template_value.at("is_definition").value_));
+  }
+}
+
+template<typename T>
+void writeMetric(const core::RecordObject& metric, T* metric_proto) {
+  if (metric.contains("name")) {
+    metric_proto->set_name(std::get<std::string>(metric.at("name").value_));
+  }
+  if (metric.contains("alias")) {
+    metric_proto->set_alias(getIntegral<uint64_t>(metric.at("alias").value_));
+  }
+  if (metric.contains("timestamp")) {
+    metric_proto->set_timestamp(getIntegral<uint64_t>(metric.at("timestamp").value_));
+  }
+  if (metric.contains("datatype")) {
+    metric_proto->set_datatype(getIntegral<uint32_t>(metric.at("datatype").value_));
+  }
+  if (metric.contains("is_historical")) {
+    metric_proto->set_is_historical(std::get<bool>(metric.at("is_historical").value_));
+  }
+  if (metric.contains("is_transient")) {
+    metric_proto->set_is_transient(std::get<bool>(metric.at("is_transient").value_));
+  }
+  if (metric.contains("is_null")) {
+    metric_proto->set_is_null(std::get<bool>(metric.at("is_null").value_));
+  }
+  if (metric.contains("metadata")) {
+    const auto& metadata = std::get<core::RecordObject>(metric.at("metadata").value_);
+    auto* metadata_proto = metric_proto->mutable_metadata();
+    writeMetadataToMetric(metadata, metadata_proto);
+  }
+  if (metric.contains("properties")) {
+    const auto& properties = std::get<core::RecordObject>(metric.at("properties").value_);
+    auto* properties_proto = metric_proto->mutable_properties();
+    writeProperties(properties, properties_proto);
+  }
+
+  if (metric.contains("int_value")) {
+    metric_proto->set_int_value(getIntegral<uint32_t>(metric.at("int_value").value_));
+  } else if (metric.contains("long_value")) {
+    metric_proto->set_long_value(getIntegral<uint64_t>(metric.at("long_value").value_));
+  } else if (metric.contains("float_value")) {
+    metric_proto->set_float_value(gsl::narrow<float>(std::get<double>(metric.at("float_value").value_)));
+  } else if (metric.contains("double_value")) {
+    metric_proto->set_double_value(std::get<double>(metric.at("double_value").value_));
+  } else if (metric.contains("boolean_value")) {
+    metric_proto->set_boolean_value(std::get<bool>(metric.at("boolean_value").value_));
+  } else if (metric.contains("string_value")) {
+    metric_proto->set_string_value(std::get<std::string>(metric.at("string_value").value_));
+  } else if (metric.contains("bytes_value")) {
+    metric_proto->set_bytes_value(std::get<std::string>(metric.at("bytes_value").value_));
+  } else if (metric.contains("dataset_value")) {
+    const auto& dataset = std::get<core::RecordObject>(metric.at("dataset_value").value_);
+    auto* dataset_proto = metric_proto->mutable_dataset_value();
+    writeDataSet(dataset, dataset_proto);
+  } else if (metric.contains("template_value")) {
+    const auto& template_value = std::get<core::RecordObject>(metric.at("template_value").value_);
+    auto* template_proto = metric_proto->mutable_template_value();
+    writeTemplate(template_value, template_proto);
+  }
+}
+
+void writeRecordToPayload(const core::Record& record, org::eclipse::tahu::protobuf::Payload& payload) {
+  if (record.contains("timestamp")) {
+    payload.set_timestamp(getIntegral<uint64_t>(record.at("timestamp").value_));
+  }
+  if (record.contains("seq")) {
+    payload.set_seq(getIntegral<uint64_t>(record.at("seq").value_));
+  }
+  if (record.contains("uuid")) {
+    payload.set_uuid(std::get<std::string>(record.at("uuid").value_));
+  }
+  if (record.contains("body")) {
+    payload.set_body(std::get<std::string>(record.at("body").value_));
+  }
+  if (record.contains("metrics")) {
+    const auto& metrics = std::get<core::RecordArray>(record.at("metrics").value_);
+    for (const auto& metric : metrics) {
+      auto* metric_proto = payload.add_metrics();
+      writeMetric(std::get<core::RecordObject>(metric.value_), metric_proto);
+    }
+  }
+}
+
+} // namespace
+
+void SparkplugBWriter::write(const core::RecordSet& record_set, const std::shared_ptr<core::FlowFile>& flow_file, core::ProcessSession& session) {
+  if (!flow_file) {
+    logger_->log_error("FlowFile is null, cannot write Sparkplug B message");
+    return;
+  }
+
+  if (record_set.empty()) {
+    logger_->log_info("No records to write to Sparkplug B message");
+    return;
+  }
+
+  bool is_first = true;
+  for (const auto& record : record_set) {
+    org::eclipse::tahu::protobuf::Payload payload;
+    try {
+      writeRecordToPayload(record, payload);
+    } catch (const std::exception& e) {
+      logger_->log_error("Failed to write record to Sparkplug B payload: {}", e.what());
+      continue;
+    }
+
+    auto size = payload.ByteSizeLong();
+    std::vector<uint8_t> buffer(size);
+
+    if (!payload.SerializeToArray(buffer.data(), gsl::narrow<int>(size))) {
+      logger_->log_error("Failed to serialize Sparkplug B payload");
+      continue;
+    }
+
+    auto write_callback = [&buffer](const std::shared_ptr<io::OutputStream>& output_stream) -> int64_t {
+      const auto ret = output_stream->write(buffer.data(), buffer.size());
+      return io::isError(ret) ? -1 : gsl::narrow<int64_t>(ret);
+    };
+    if (is_first) {
+      session.write(flow_file, write_callback);
+      is_first = false;
+    } else {
+      session.append(flow_file, write_callback);
+    }
+  }
+}
+
+REGISTER_RESOURCE(SparkplugBWriter, ControllerService);
+
+} // namespace org::apache::nifi::minifi::controllers
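
Each record is serialized with `ByteSizeLong()` plus `SerializeToArray()` and written (or appended) to the FlowFile, so a consumer decodes the content with `ParseFromArray`, as the unit tests further down do. A minimal round-trip sketch, standalone and not part of the writer's API:

```cpp
#include <cstdint>
#include <vector>

#include "sparkplug_b.pb.h"

// Serializes a payload the same way SparkplugBWriter does, then parses it
// back the way a consumer of the FlowFile content would.
bool roundTrip(const org::eclipse::tahu::protobuf::Payload& payload) {
  std::vector<uint8_t> buffer(payload.ByteSizeLong());
  if (!payload.SerializeToArray(buffer.data(), static_cast<int>(buffer.size()))) {
    return false;
  }
  org::eclipse::tahu::protobuf::Payload decoded;
  return decoded.ParseFromArray(buffer.data(), static_cast<int>(buffer.size()));
}
```

Note that serialized protobuf messages are not self-delimiting, so a FlowFile holding several appended payloads cannot be split back into individual records without extra framing; in the PublishMQTT record path exercised below, each record appears to end up in its own FlowFile (the test checks two output FlowFiles for a two-record input), which avoids the issue.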
diff --git a/extensions/mqtt/controllers/SparkplugBWriter.h b/extensions/mqtt/controllers/SparkplugBWriter.h
new file mode 100644
index 0000000000..9c9f229074
--- /dev/null
+++ b/extensions/mqtt/controllers/SparkplugBWriter.h
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include "minifi-cpp/core/Record.h"
+#include "controllers/RecordSetWriter.h"
+#include "core/logging/LoggerFactory.h"
+
+namespace org::apache::nifi::minifi::controllers {
+
+class SparkplugBWriter final : public core::RecordSetWriterImpl {
+ public:
+ explicit SparkplugBWriter(const std::string_view name, const utils::Identifier& uuid = {}) : core::RecordSetWriterImpl(name, uuid) {}
+
+ SparkplugBWriter(SparkplugBWriter&&) = delete;
+ SparkplugBWriter(const SparkplugBWriter&) = delete;
+ SparkplugBWriter& operator=(SparkplugBWriter&&) = delete;
+ SparkplugBWriter& operator=(const SparkplugBWriter&) = delete;
+
+ ~SparkplugBWriter() override = default;
+
+  EXTENSIONAPI static constexpr const char* Description = "Serializes a record set into Sparkplug B messages and writes them into a FlowFile. "
+      "This writer is typically used with MQTT processors like PublishMQTT.";
+
+  EXTENSIONAPI static constexpr auto Properties = std::array<core::PropertyReference, 0>{};
+
+ EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
+ ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_CONTROLLER_SERVICES
+
+  void write(const core::RecordSet& record_set, const std::shared_ptr<core::FlowFile>& flow_file, core::ProcessSession& session) override;
+
+ void initialize() override {
+ setSupportedProperties(Properties);
+ }
+ void onEnable() override {}
+ void yield() override {}
+ bool isRunning() const override { return getState() == core::controller::ControllerServiceState::ENABLED; }
+ bool isWorkAvailable() override { return false; }
+
+ private:
+  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<SparkplugBWriter>::getLogger();
+};
+
+} // namespace org::apache::nifi::minifi::controllers
diff --git a/extensions/mqtt/tests/PublishMQTTTests.cpp b/extensions/mqtt/tests/PublishMQTTTests.cpp
index 98ff172806..126b79586f 100644
--- a/extensions/mqtt/tests/PublishMQTTTests.cpp
+++ b/extensions/mqtt/tests/PublishMQTTTests.cpp
@@ -26,6 +26,7 @@
#include "core/Resource.h"
#include "controllers/XMLRecordSetWriter.h"
#include "unit/ProcessorUtils.h"
+#include "sparkplug_b.pb.h"
using namespace std::literals::chrono_literals;
@@ -159,4 +160,41 @@ TEST_CASE_METHOD(PublishMQTTTestFixture, "Test scheduling failure if non-existen
}
}
+TEST_CASE_METHOD(PublishMQTTTestFixture, "Test sending SparkplugB message records", "[publishMQTTTest]") {
+ test_controller_.plan->addController("JsonTreeReader", "JsonTreeReader");
+ test_controller_.plan->addController("SparkplugBWriter", "SparkplugBWriter");
+ REQUIRE(publish_mqtt_processor_->setProperty(minifi::processors::PublishMQTT::Topic.name, "mytopic"));
+ REQUIRE(publish_mqtt_processor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
+ REQUIRE(publish_mqtt_processor_->setProperty(minifi::processors::PublishMQTT::RecordReader.name, "JsonTreeReader"));
+ REQUIRE(publish_mqtt_processor_->setProperty(minifi::processors::PublishMQTT::RecordWriter.name, "SparkplugBWriter"));
+
+ const auto trigger_results = test_controller_.trigger(
+ R"([{"timestamp": 1752755515, "metrics": [{"name": "temperature", "int_value": 25, "datatype": 2}], "seq": 1, "uuid": "123e4567-e89b-12d3-a456-426614174000", "body": "testbody"},
+ {"timestamp": 1752755516, "metrics": [{"name": "temperature", "int_value": 31, "datatype": 2}], "seq": 2, "uuid": "423e4567-e89b-12d3-a456-426614174111", "body": "testbody2"}])");
+ CHECK(trigger_results.at(TestPublishMQTTProcessor::Success).size() == 2);
+ const auto flow_file_1 = trigger_results.at(TestPublishMQTTProcessor::Success).at(0);
+
+ auto string_content = test_controller_.plan->getContent(flow_file_1);
+ org::eclipse::tahu::protobuf::Payload payload;
+  payload.ParseFromArray(static_cast<const void*>(string_content.data()), gsl::narrow<int>(string_content.size()));
+ CHECK(payload.timestamp() == 1752755515);
+ CHECK(payload.seq() == 1);
+ CHECK(payload.uuid() == "123e4567-e89b-12d3-a456-426614174000");
+ CHECK(payload.metrics_size() == 1);
+ CHECK(payload.metrics(0).name() == "temperature");
+ CHECK(payload.metrics(0).int_value() == 25);
+ CHECK(payload.body() == "testbody");
+
+ const auto flow_file_2 = trigger_results.at(TestPublishMQTTProcessor::Success).at(1);
+ string_content = test_controller_.plan->getContent(flow_file_2);
+  payload.ParseFromArray(static_cast<const void*>(string_content.data()), gsl::narrow<int>(string_content.size()));
+ CHECK(payload.timestamp() == 1752755516);
+ CHECK(payload.seq() == 2);
+ CHECK(payload.uuid() == "423e4567-e89b-12d3-a456-426614174111");
+ CHECK(payload.metrics_size() == 1);
+ CHECK(payload.metrics(0).name() == "temperature");
+ CHECK(payload.metrics(0).int_value() == 31);
+ CHECK(payload.body() == "testbody2");
+}
+
} // namespace org::apache::nifi::minifi::test
diff --git a/extensions/mqtt/tests/SparkplugBWriterTests.cpp b/extensions/mqtt/tests/SparkplugBWriterTests.cpp
new file mode 100644
index 0000000000..066a7e5a5b
--- /dev/null
+++ b/extensions/mqtt/tests/SparkplugBWriterTests.cpp
@@ -0,0 +1,250 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "unit/Catch.h"
+#include "unit/TestBase.h"
+#include "../controllers/SparkplugBWriter.h"
+#include "io/BufferStream.h"
+#include "sparkplug_b.pb.h"
+#include "core/ProcessSession.h"
+
+namespace org::apache::nifi::minifi::test {
+
+class SparkplugBWriterTestFixture {
+ public:
+ const core::Relationship Success{"success", "everything is fine"};
+ const core::Relationship Failure{"failure", "something has gone awry"};
+
+ SparkplugBWriterTestFixture() {
+ test_plan_ = test_controller_.createPlan();
+ dummy_processor_ = test_plan_->addProcessor("DummyProcessor", "dummyProcessor");
+ context_ = [this] {
+ test_plan_->runNextProcessor();
+ return test_plan_->getCurrentContext();
+ }();
+    process_session_ = std::make_unique<core::ProcessSession>(context_);
+ }
+
+ core::ProcessSession &processSession() { return *process_session_; }
+
+  void transferAndCommit(const std::shared_ptr<core::FlowFile>& flow_file) {
+ process_session_->transfer(flow_file, Success);
+ process_session_->commit();
+ }
+
+ private:
+ TestController test_controller_;
+
+  std::shared_ptr<TestPlan> test_plan_;
+  core::Processor* dummy_processor_;
+  std::shared_ptr<core::ProcessContext> context_;
+  std::unique_ptr<core::ProcessSession> process_session_;
+};
+
+TEST_CASE_METHOD(SparkplugBWriterTestFixture, "Test empty record set", "[SparkplugBWriter]") {
+ core::RecordSet record_set;
+
+ record_set.emplace_back(core::RecordObject{});
+ auto flow_file = processSession().create();
+
+ controllers::SparkplugBWriter sparkplug_writer("SparkplugBWriter");
+ sparkplug_writer.write(record_set, flow_file, processSession());
+ transferAndCommit(flow_file);
+ org::eclipse::tahu::protobuf::Payload payload;
+  processSession().read(*flow_file, [&payload](const std::shared_ptr<io::InputStream>& input_stream) {
+    std::vector<std::byte> buffer(input_stream->size());
+    input_stream->read(buffer);
+    payload.ParseFromArray(static_cast<const void*>(buffer.data()), gsl::narrow<int>(input_stream->size()));
+    return gsl::narrow<int64_t>(input_stream->size());
+  });
+
+ CHECK_FALSE(payload.has_timestamp());
+ CHECK_FALSE(payload.has_seq());
+ CHECK_FALSE(payload.has_uuid());
+ CHECK_FALSE(payload.has_body());
+ CHECK(payload.metrics_size() == 0);
+}
+
+TEST_CASE_METHOD(SparkplugBWriterTestFixture, "Test invalid type set", "[SparkplugBWriter]") {
+ core::RecordSet record_set;
+ core::RecordObject payload_record;
+ payload_record.emplace("timestamp", core::RecordField{std::string("invalid_timestamp")});
+ record_set.emplace_back(std::move(payload_record));
+
+ auto flow_file = processSession().create();
+
+ controllers::SparkplugBWriter sparkplug_writer("SparkplugBWriter");
+ sparkplug_writer.write(record_set, flow_file, processSession());
+ transferAndCommit(flow_file);
+ org::eclipse::tahu::protobuf::Payload payload;
+  processSession().read(*flow_file, [&payload](const std::shared_ptr<io::InputStream>& input_stream) {
+    std::vector<std::byte> buffer(input_stream->size());
+    input_stream->read(buffer);
+    payload.ParseFromArray(static_cast<const void*>(buffer.data()), gsl::narrow<int>(input_stream->size()));
+    return gsl::narrow<int64_t>(input_stream->size());
+  });
+
+ CHECK_FALSE(payload.has_timestamp());
+ CHECK_FALSE(payload.has_seq());
+ CHECK_FALSE(payload.has_uuid());
+ CHECK_FALSE(payload.has_body());
+ CHECK(payload.metrics_size() == 0);
+ CHECK(LogTestController::getInstance().contains("Failed to write record to Sparkplug B payload: Invalid type for integral conversion"));
+}
+
+TEST_CASE_METHOD(SparkplugBWriterTestFixture, "Test record set conversion to Sparkplug B payload", "[SparkplugBWriter]") {
+ core::RecordSet record_set;
+ core::RecordObject payload_record;
+ payload_record.emplace("timestamp", core::RecordField{static_cast(1234)});
+ payload_record.emplace("seq", core::RecordField{static_cast(456)});
+ payload_record.emplace("uuid", core::RecordField{std::string("my-uuid")});
+ payload_record.emplace("body", core::RecordField{std::string("testbody")});
+ core::RecordObject metric_object;
+ metric_object.emplace("name", core::RecordField{std::string("test_metric")});
+ metric_object.emplace("alias", core::RecordField{static_cast(1)});
+ metric_object.emplace("timestamp", core::RecordField{static_cast(789)});
+ metric_object.emplace("datatype", core::RecordField{static_cast(2)});
+ metric_object.emplace("is_historical", core::RecordField{false});
+ metric_object.emplace("is_transient", core::RecordField{false});
+ metric_object.emplace("is_null", core::RecordField{false});
+
+ core::RecordObject metadata_object;
+ metadata_object.emplace("is_multi_part", core::RecordField{false});
+ metadata_object.emplace("content_type", core::RecordField{std::string("application/json")});
+ metadata_object.emplace("size", core::RecordField{static_cast(1024)});
+ metadata_object.emplace("seq", core::RecordField{static_cast(1)});
+ metadata_object.emplace("file_name", core::RecordField{std::string("example.json")});
+ metadata_object.emplace("file_type", core::RecordField{std::string("json")});
+ metadata_object.emplace("md5", core::RecordField{std::string("d41d8cd98f00b204e9800998ecf8427e")});
+ metadata_object.emplace("description", core::RecordField{std::string("Example metadata description")});
+ metric_object.emplace("metadata", core::RecordField{std::move(metadata_object)});
+
+ core::RecordObject propertyset_object;
+ core::RecordArray keys_array;
+ keys_array.emplace_back(core::RecordField{std::string("key1")});
+ keys_array.emplace_back(core::RecordField{std::string("key2")});
+ core::RecordArray propertyvalues_array;
+ core::RecordObject propertyvalue1;
+ propertyvalue1.emplace("type", core::RecordField{static_cast(1)});
+ propertyvalue1.emplace("is_null", core::RecordField{false});
+ propertyvalue1.emplace("int_value", core::RecordField{static_cast(42)});
+ core::RecordObject propertyvalue2;
+ propertyvalue2.emplace("type", core::RecordField{static_cast(1)});
+ propertyvalue2.emplace("is_null", core::RecordField{false});
+ propertyvalue2.emplace("int_value", core::RecordField{static_cast(43)});
+ propertyvalues_array.emplace_back(core::RecordField{std::move(propertyvalue1)});
+ propertyvalues_array.emplace_back(core::RecordField{std::move(propertyvalue2)});
+ propertyset_object.emplace("keys", core::RecordField{std::move(keys_array)});
+ propertyset_object.emplace("values", core::RecordField{std::move(propertyvalues_array)});
+ metric_object.emplace("properties", core::RecordField{std::move(propertyset_object)});
+
+ core::RecordObject dataset_object;
+
+ dataset_object.emplace("num_of_columns", core::RecordField{static_cast(1)});
+ core::RecordArray columns_array;
+ columns_array.emplace_back(core::RecordField{std::string("column1")});
+ dataset_object.emplace("columns", core::RecordField{std::move(columns_array)});
+ core::RecordArray types_array;
+  types_array.emplace_back(core::RecordField{static_cast<uint64_t>(1)});
+ dataset_object.emplace("types", core::RecordField{std::move(types_array)});
+ core::RecordArray rows_array;
+ core::RecordObject row_object;
+ core::RecordArray elements_array;
+ core::RecordObject element_object;
+  element_object.emplace("int_value", core::RecordField{static_cast<uint64_t>(100)});
+ elements_array.emplace_back(core::RecordField{std::move(element_object)});
+ row_object.emplace("elements", core::RecordField{std::move(elements_array)});
+ rows_array.emplace_back(core::RecordField{std::move(row_object)});
+ dataset_object.emplace("rows", core::RecordField{std::move(rows_array)});
+ metric_object.emplace("dataset_value", core::RecordField{std::move(dataset_object)});
+
+ core::RecordArray metric_array;
+ core::RecordField metric_field{std::move(metric_object)};
+ metric_array.emplace_back(std::move(metric_field));
+ payload_record.emplace("metrics", core::RecordField{std::move(metric_array)});
+ record_set.emplace_back(std::move(payload_record));
+
+ record_set.emplace_back(core::RecordObject{});
+ auto flow_file = processSession().create();
+
+ controllers::SparkplugBWriter sparkplug_writer("SparkplugBWriter");
+ sparkplug_writer.write(record_set, flow_file, processSession());
+ transferAndCommit(flow_file);
+ org::eclipse::tahu::protobuf::Payload payload;
+  processSession().read(*flow_file, [&payload](const std::shared_ptr<io::InputStream>& input_stream) {
+    std::vector<std::byte> buffer(input_stream->size());
+    input_stream->read(buffer);
+    payload.ParseFromArray(static_cast<const void*>(buffer.data()), gsl::narrow<int>(input_stream->size()));
+    return gsl::narrow<int64_t>(input_stream->size());
+  });
+
+ CHECK(payload.timestamp() == 1234);
+ CHECK(payload.seq() == 456);
+ CHECK(payload.uuid() == "my-uuid");
+ CHECK(payload.body() == "testbody");
+ CHECK(payload.metrics_size() == 1);
+ const auto& metric = payload.metrics(0);
+ CHECK(metric.name() == "test_metric");
+ CHECK(metric.alias() == 1);
+ CHECK(metric.timestamp() == 789);
+ CHECK(metric.datatype() == 2);
+ CHECK_FALSE(metric.is_historical());
+ CHECK_FALSE(metric.is_transient());
+ CHECK_FALSE(metric.is_null());
+ CHECK(metric.has_metadata());
+ CHECK(metric.metadata().is_multi_part() == false);
+ CHECK(metric.metadata().content_type() == "application/json");
+ CHECK(metric.metadata().size() == 1024);
+ CHECK(metric.metadata().seq() == 1);
+ CHECK(metric.metadata().file_name() == "example.json");
+ CHECK(metric.metadata().file_type() == "json");
+ CHECK(metric.metadata().md5() == "d41d8cd98f00b204e9800998ecf8427e");
+ CHECK(metric.metadata().description() == "Example metadata description");
+
+ CHECK(metric.has_properties());
+ const auto& properties = metric.properties();
+ CHECK(properties.keys_size() == 2);
+ CHECK(properties.keys(0) == "key1");
+ CHECK(properties.keys(1) == "key2");
+ CHECK(properties.values_size() == 2);
+ const auto& property1 = properties.values(0);
+ CHECK(property1.type() == 1);
+ CHECK_FALSE(property1.is_null());
+ CHECK(property1.int_value() == 42);
+ const auto& property2 = properties.values(1);
+ CHECK(property2.type() == 1);
+ CHECK_FALSE(property2.is_null());
+ CHECK(property2.int_value() == 43);
+
+ CHECK(metric.has_dataset_value());
+ const auto& dataset = metric.dataset_value();
+ CHECK(dataset.num_of_columns() == 1);
+ CHECK(dataset.columns_size() == 1);
+ CHECK(dataset.columns(0) == "column1");
+ CHECK(dataset.types_size() == 1);
+ CHECK(dataset.types(0) == 1);
+ CHECK(dataset.rows_size() == 1);
+ const auto& row = dataset.rows(0);
+ CHECK(row.elements_size() == 1);
+ const auto& element = row.elements(0);
+ CHECK(element.has_int_value());
+ CHECK(element.int_value() == 100);
+ CHECK_FALSE(element.has_long_value());
+}
+
+} // namespace org::apache::nifi::minifi::test
diff --git a/minifi-api/include/minifi-cpp/core/Record.h b/minifi-api/include/minifi-cpp/core/Record.h
index 5c2ca3a615..32d68e5532 100644
--- a/minifi-api/include/minifi-cpp/core/Record.h
+++ b/minifi-api/include/minifi-cpp/core/Record.h
@@ -43,6 +43,10 @@ class Record final {
return fields_.emplace(std::move(key), std::move(field));
}
+ [[nodiscard]] bool contains(const std::string& key) const {
+ return fields_.contains(key);
+ }
+
[[nodiscard]] const RecordField& at(const std::string& key) const {
return fields_.at(key);
}