From 8d755575a35531333f69d750d89a237c26554556 Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Wed, 2 Jul 2025 12:50:22 +0200 Subject: [PATCH] MINIFICPP-2587 Add Sparkplug support for ConsumeMQTT processor --- CONTROLLERS.md | 15 + LICENSE | 281 ++++++++++++++++++ NOTICE | 1 + README.md | 2 +- docker/requirements.txt | 1 + .../integration/cluster/DockerTestCluster.py | 3 + .../cluster/checkers/MqttHelper.py | 18 ++ .../cluster/checkers/sparkplug_b_pb2.py | 67 +++++ .../MiNiFi_integration_test_driver.py | 3 + docker/test/integration/features/mqtt.feature | 21 ++ .../test/integration/features/steps/steps.py | 13 + .../minifi/controllers/SparkplugBReader.py | 23 ++ extensions/mqtt/CMakeLists.txt | 25 +- .../mqtt/controllers/SparkplugBReader.cpp | 158 ++++++++++ .../mqtt/controllers/SparkplugBReader.h | 58 ++++ extensions/mqtt/protos/sparkplug_b.proto | 224 ++++++++++++++ extensions/mqtt/tests/CMakeLists.txt | 1 + extensions/mqtt/tests/ConsumeMQTTTests.cpp | 125 ++++++++ .../mqtt/tests/SparkplugBReaderTests.cpp | 68 +++++ run_flake8.sh | 2 +- 20 files changed, 1104 insertions(+), 5 deletions(-) create mode 100644 docker/test/integration/cluster/checkers/sparkplug_b_pb2.py create mode 100644 docker/test/integration/minifi/controllers/SparkplugBReader.py create mode 100644 extensions/mqtt/controllers/SparkplugBReader.cpp create mode 100644 extensions/mqtt/controllers/SparkplugBReader.h create mode 100644 extensions/mqtt/protos/sparkplug_b.proto create mode 100644 extensions/mqtt/tests/SparkplugBReaderTests.cpp diff --git a/CONTROLLERS.md b/CONTROLLERS.md index 587470fdde..0b8c0676db 100644 --- a/CONTROLLERS.md +++ b/CONTROLLERS.md @@ -29,6 +29,7 @@ limitations under the License. - [PersistentMapStateStorage](#PersistentMapStateStorage) - [RocksDbStateStorage](#RocksDbStateStorage) - [SmbConnectionControllerService](#SmbConnectionControllerService) +- [SparkplugBReader](#SparkplugBReader) - [SSLContextService](#SSLContextService) - [UpdatePolicyControllerService](#UpdatePolicyControllerService) - [VolatileMapStateStorage](#VolatileMapStateStorage) @@ -280,6 +281,20 @@ In the list below, the names of required properties appear in bold. Any other pr | Password | | | The password used for authentication. Required if Username is set.
**Sensitive Property: true** | +## SparkplugBReader + +### Description + +Reads Sparkplug B messages and turns them into individual Record objects. The reader expects a single Sparkplug B payload in a read operation, which is a protobuf-encoded binary message. This reader is designed to be used with ConsumeMQTT, since Sparkplug B is an MQTT-based protocol. + +### Properties + +In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language. + +| Name | Default Value | Allowable Values | Description | +|------|---------------|------------------|-------------| + + ## SSLContextService ### Description diff --git a/LICENSE b/LICENSE index 9e8c591ecc..d456bdad86 100644 --- a/LICENSE +++ b/LICENSE @@ -3494,3 +3494,284 @@ SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + + +This product bundles 'Sparkplug B' protobuf file from the 'Eclipse Tahu' project which is available under the Eclipse Public License - v 2.0 license below. + +Eclipse Public License - v 2.0 + + THE ACCOMPANYING PROGRAM IS PROVIDED UNDER THE TERMS OF THIS ECLIPSE + PUBLIC LICENSE ("AGREEMENT"). ANY USE, REPRODUCTION OR DISTRIBUTION + OF THE PROGRAM CONSTITUTES RECIPIENT'S ACCEPTANCE OF THIS AGREEMENT. + +1. DEFINITIONS + +"Contribution" means: + + a) in the case of the initial Contributor, the initial content + Distributed under this Agreement, and + + b) in the case of each subsequent Contributor: + i) changes to the Program, and + ii) additions to the Program; + where such changes and/or additions to the Program originate from + and are Distributed by that particular Contributor. A Contribution + "originates" from a Contributor if it was added to the Program by + such Contributor itself or anyone acting on such Contributor's behalf. + Contributions do not include changes or additions to the Program that + are not Modified Works. + +"Contributor" means any person or entity that Distributes the Program. + +"Licensed Patents" mean patent claims licensable by a Contributor which +are necessarily infringed by the use or sale of its Contribution alone +or when combined with the Program. + +"Program" means the Contributions Distributed in accordance with this +Agreement. + +"Recipient" means anyone who receives the Program under this Agreement +or any Secondary License (as applicable), including Contributors. + +"Derivative Works" shall mean any work, whether in Source Code or other +form, that is based on (or derived from) the Program and for which the +editorial revisions, annotations, elaborations, or other modifications +represent, as a whole, an original work of authorship. + +"Modified Works" shall mean any work in Source Code or other form that +results from an addition to, deletion from, or modification of the +contents of the Program, including, for purposes of clarity any new file +in Source Code form that contains any contents of the Program. Modified +Works shall not include works that contain only declarations, +interfaces, types, classes, structures, or files of the Program solely +in each case in order to link to, bind by name, or subclass the Program +or Modified Works thereof. 
+ +"Distribute" means the acts of a) distributing or b) making available +in any manner that enables the transfer of a copy. + +"Source Code" means the form of a Program preferred for making +modifications, including but not limited to software source code, +documentation source, and configuration files. + +"Secondary License" means either the GNU General Public License, +Version 2.0, or any later versions of that license, including any +exceptions or additional permissions as identified by the initial +Contributor. + +2. GRANT OF RIGHTS + + a) Subject to the terms of this Agreement, each Contributor hereby + grants Recipient a non-exclusive, worldwide, royalty-free copyright + license to reproduce, prepare Derivative Works of, publicly display, + publicly perform, Distribute and sublicense the Contribution of such + Contributor, if any, and such Derivative Works. + + b) Subject to the terms of this Agreement, each Contributor hereby + grants Recipient a non-exclusive, worldwide, royalty-free patent + license under Licensed Patents to make, use, sell, offer to sell, + import and otherwise transfer the Contribution of such Contributor, + if any, in Source Code or other form. This patent license shall + apply to the combination of the Contribution and the Program if, at + the time the Contribution is added by the Contributor, such addition + of the Contribution causes such combination to be covered by the + Licensed Patents. The patent license shall not apply to any other + combinations which include the Contribution. No hardware per se is + licensed hereunder. + + c) Recipient understands that although each Contributor grants the + licenses to its Contributions set forth herein, no assurances are + provided by any Contributor that the Program does not infringe the + patent or other intellectual property rights of any other entity. + Each Contributor disclaims any liability to Recipient for claims + brought by any other entity based on infringement of intellectual + property rights or otherwise. As a condition to exercising the + rights and licenses granted hereunder, each Recipient hereby + assumes sole responsibility to secure any other intellectual + property rights needed, if any. For example, if a third party + patent license is required to allow Recipient to Distribute the + Program, it is Recipient's responsibility to acquire that license + before distributing the Program. + + d) Each Contributor represents that to its knowledge it has + sufficient copyright rights in its Contribution, if any, to grant + the copyright license set forth in this Agreement. + + e) Notwithstanding the terms of any Secondary License, no + Contributor makes additional grants to any Recipient (other than + those set forth in this Agreement) as a result of such Recipient's + receipt of the Program under the terms of a Secondary License + (if permitted under the terms of Section 3). + +3. 
REQUIREMENTS + +3.1 If a Contributor Distributes the Program in any form, then: + + a) the Program must also be made available as Source Code, in + accordance with section 3.2, and the Contributor must accompany + the Program with a statement that the Source Code for the Program + is available under this Agreement, and informs Recipients how to + obtain it in a reasonable manner on or through a medium customarily + used for software exchange; and + + b) the Contributor may Distribute the Program under a license + different than this Agreement, provided that such license: + i) effectively disclaims on behalf of all other Contributors all + warranties and conditions, express and implied, including + warranties or conditions of title and non-infringement, and + implied warranties or conditions of merchantability and fitness + for a particular purpose; + + ii) effectively excludes on behalf of all other Contributors all + liability for damages, including direct, indirect, special, + incidental and consequential damages, such as lost profits; + + iii) does not attempt to limit or alter the recipients' rights + in the Source Code under section 3.2; and + + iv) requires any subsequent distribution of the Program by any + party to be under a license that satisfies the requirements + of this section 3. + +3.2 When the Program is Distributed as Source Code: + + a) it must be made available under this Agreement, or if the + Program (i) is combined with other material in a separate file or + files made available under a Secondary License, and (ii) the initial + Contributor attached to the Source Code the notice described in + Exhibit A of this Agreement, then the Program may be made available + under the terms of such Secondary Licenses, and + + b) a copy of this Agreement must be included with each copy of + the Program. + +3.3 Contributors may not remove or alter any copyright, patent, +trademark, attribution notices, disclaimers of warranty, or limitations +of liability ("notices") contained within the Program from any copy of +the Program which they Distribute, provided that Contributors may add +their own appropriate notices. + +4. COMMERCIAL DISTRIBUTION + +Commercial distributors of software may accept certain responsibilities +with respect to end users, business partners and the like. While this +license is intended to facilitate the commercial use of the Program, +the Contributor who includes the Program in a commercial product +offering should do so in a manner which does not create potential +liability for other Contributors. Therefore, if a Contributor includes +the Program in a commercial product offering, such Contributor +("Commercial Contributor") hereby agrees to defend and indemnify every +other Contributor ("Indemnified Contributor") against any losses, +damages and costs (collectively "Losses") arising from claims, lawsuits +and other legal actions brought by a third party against the Indemnified +Contributor to the extent caused by the acts or omissions of such +Commercial Contributor in connection with its distribution of the Program +in a commercial product offering. The obligations in this section do not +apply to any claims or Losses relating to any actual or alleged +intellectual property infringement. 
In order to qualify, an Indemnified +Contributor must: a) promptly notify the Commercial Contributor in +writing of such claim, and b) allow the Commercial Contributor to control, +and cooperate with the Commercial Contributor in, the defense and any +related settlement negotiations. The Indemnified Contributor may +participate in any such claim at its own expense. + +For example, a Contributor might include the Program in a commercial +product offering, Product X. That Contributor is then a Commercial +Contributor. If that Commercial Contributor then makes performance +claims, or offers warranties related to Product X, those performance +claims and warranties are such Commercial Contributor's responsibility +alone. Under this section, the Commercial Contributor would have to +defend claims against the other Contributors related to those performance +claims and warranties, and if a court requires any other Contributor to +pay any damages as a result, the Commercial Contributor must pay +those damages. + +5. NO WARRANTY + +EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, AND TO THE EXTENT +PERMITTED BY APPLICABLE LAW, THE PROGRAM IS PROVIDED ON AN "AS IS" +BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, EITHER EXPRESS OR +IMPLIED INCLUDING, WITHOUT LIMITATION, ANY WARRANTIES OR CONDITIONS OF +TITLE, NON-INFRINGEMENT, MERCHANTABILITY OR FITNESS FOR A PARTICULAR +PURPOSE. Each Recipient is solely responsible for determining the +appropriateness of using and distributing the Program and assumes all +risks associated with its exercise of rights under this Agreement, +including but not limited to the risks and costs of program errors, +compliance with applicable laws, damage to or loss of data, programs +or equipment, and unavailability or interruption of operations. + +6. DISCLAIMER OF LIABILITY + +EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, AND TO THE EXTENT +PERMITTED BY APPLICABLE LAW, NEITHER RECIPIENT NOR ANY CONTRIBUTORS +SHALL HAVE ANY LIABILITY FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING WITHOUT LIMITATION LOST +PROFITS), HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OR DISTRIBUTION OF THE PROGRAM OR THE +EXERCISE OF ANY RIGHTS GRANTED HEREUNDER, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGES. + +7. GENERAL + +If any provision of this Agreement is invalid or unenforceable under +applicable law, it shall not affect the validity or enforceability of +the remainder of the terms of this Agreement, and without further +action by the parties hereto, such provision shall be reformed to the +minimum extent necessary to make such provision valid and enforceable. + +If Recipient institutes patent litigation against any entity +(including a cross-claim or counterclaim in a lawsuit) alleging that the +Program itself (excluding combinations of the Program with other software +or hardware) infringes such Recipient's patent(s), then such Recipient's +rights granted under Section 2(b) shall terminate as of the date such +litigation is filed. + +All Recipient's rights under this Agreement shall terminate if it +fails to comply with any of the material terms or conditions of this +Agreement and does not cure such failure in a reasonable period of +time after becoming aware of such noncompliance. 
If all Recipient's +rights under this Agreement terminate, Recipient agrees to cease use +and distribution of the Program as soon as reasonably practicable. +However, Recipient's obligations under this Agreement and any licenses +granted by Recipient relating to the Program shall continue and survive. + +Everyone is permitted to copy and distribute copies of this Agreement, +but in order to avoid inconsistency the Agreement is copyrighted and +may only be modified in the following manner. The Agreement Steward +reserves the right to publish new versions (including revisions) of +this Agreement from time to time. No one other than the Agreement +Steward has the right to modify this Agreement. The Eclipse Foundation +is the initial Agreement Steward. The Eclipse Foundation may assign the +responsibility to serve as the Agreement Steward to a suitable separate +entity. Each new version of the Agreement will be given a distinguishing +version number. The Program (including Contributions) may always be +Distributed subject to the version of the Agreement under which it was +received. In addition, after a new version of the Agreement is published, +Contributor may elect to Distribute the Program (including its +Contributions) under the new version. + +Except as expressly stated in Sections 2(a) and 2(b) above, Recipient +receives no rights or licenses to the intellectual property of any +Contributor under this Agreement, whether expressly, by implication, +estoppel or otherwise. All rights in the Program not expressly granted +under this Agreement are reserved. Nothing in this Agreement is intended +to be enforceable by any entity that is not a Contributor or Recipient. +No third-party beneficiary rights are created under this Agreement. + +Exhibit A - Form of Secondary Licenses Notice + +"This Source Code may also be made available under the following +Secondary Licenses when the conditions for such availability set forth +in the Eclipse Public License, v. 2.0 are satisfied: {name license(s), +version(s), and exceptions or additional permissions here}." + + Simply including a copy of this Agreement, including this Exhibit A + is not sufficient to license the Source Code under Secondary Licenses. + + If it is not possible or desirable to put the notice in a particular + file, then You may include the notice in a location (such as a LICENSE + file in a relevant directory) where a recipient would be likely to + look for such a notice. + + You may add additional accurate notices of copyright ownership. diff --git a/NOTICE b/NOTICE index e97469bff9..03e8787b55 100644 --- a/NOTICE +++ b/NOTICE @@ -79,6 +79,7 @@ This software includes third party software subject to the following copyrights: - llama.cpp - Copyright (c) 2023-2024 The ggml authors - pugixml - Copyright (C) 2003, by Kristen Wegner (kristen@tima.net) - jsoncons - Copyright Daniel Parker 2013 - 2020. +- Eclipse Tahu - Copyright (c) 2015, 2018 Cirrus Link Solutions and others The licenses for these third party components are included in LICENSE.txt diff --git a/README.md b/README.md index c4da64624d..6940651da6 100644 --- a/README.md +++ b/README.md @@ -85,7 +85,7 @@ The next table outlines CMAKE flags that correspond with MiNiFi extensions. 
Exte | Kubernetes (Linux) | [KubernetesControllerService](CONTROLLERS.md#kubernetescontrollerservice) | -DENABLE_KUBERNETES=ON | | LlamaCpp | [RunLlamaCppInference](PROCESSORS.md#runllamacppinference) | -DENABLE_LLAMACPP=ON | | Lua Scripting | [ExecuteScript](PROCESSORS.md#executescript) | -DENABLE_LUA_SCRIPTING=ON | -| MQTT | [ConsumeMQTT](PROCESSORS.md#consumemqtt)
[PublishMQTT](PROCESSORS.md#publishmqtt) | -DENABLE_MQTT=ON | +| MQTT | [ConsumeMQTT](PROCESSORS.md#consumemqtt)
[PublishMQTT](PROCESSORS.md#publishmqtt)
[SparkplugBReader](CONTROLLERS.md#sparkplugbreader) | -DENABLE_MQTT=ON |
| OPC | [FetchOPCProcessor](PROCESSORS.md#fetchopcprocessor)
[PutOPCProcessor](PROCESSORS.md#putopcprocessor) | -DENABLE_OPC=ON | | OpenCV | [CaptureRTSPFrame](PROCESSORS.md#capturertspframe)
[MotionDetector](PROCESSORS.md#motiondetector) | -DENABLE_OPENCV=ON | | PDH (Windows) | [PerformanceDataMonitor](PROCESSORS.md#performancedatamonitor) | -DENABLE_PDH=ON | diff --git a/docker/requirements.txt b/docker/requirements.txt index 2a2da597a8..bc0df59d9b 100644 --- a/docker/requirements.txt +++ b/docker/requirements.txt @@ -12,3 +12,4 @@ humanfriendly==10.0 requests<2.29 # https://github.com/docker/docker-py/issues/3113 couchbase==4.3.5 paho-mqtt==2.1.0 +protobuf==6.31.1 diff --git a/docker/test/integration/cluster/DockerTestCluster.py b/docker/test/integration/cluster/DockerTestCluster.py index 620f842677..4c785e41d2 100644 --- a/docker/test/integration/cluster/DockerTestCluster.py +++ b/docker/test/integration/cluster/DockerTestCluster.py @@ -462,3 +462,6 @@ def is_data_present_in_couchbase(self, doc_id: str, bucket_name: str, expected_d def publish_test_mqtt_message(self, topic: str, message: str): self.mqtt_helper.publish_test_mqtt_message(topic, message) + + def publish_test_sparkplug_payload(self, topic): + self.mqtt_helper.publish_test_sparkplug_payload(topic) diff --git a/docker/test/integration/cluster/checkers/MqttHelper.py b/docker/test/integration/cluster/checkers/MqttHelper.py index 719911c970..9d39b49d2f 100644 --- a/docker/test/integration/cluster/checkers/MqttHelper.py +++ b/docker/test/integration/cluster/checkers/MqttHelper.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import paho.mqtt.client as mqtt +from .sparkplug_b_pb2 import Payload class MqttHelper: @@ -20,4 +21,21 @@ def publish_test_mqtt_message(self, topic: str, message: str): client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, "docker_test_client_id") client.connect("localhost", 1883, 60) client.publish(topic, message) + + def publish_test_sparkplug_payload(self, topic: str): + client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, "docker_test_client_id") + client.connect("localhost", 1883, 60) + + payload = Payload() + metric = payload.metrics.add() + metric.name = "TestMetric" + metric.int_value = 123 + metric.timestamp = 45345346346 + payload.uuid = "test-uuid" + payload.timestamp = 987654321 + payload.seq = 12345 + payload.body = b"test-body" + payload_bytes = payload.SerializeToString() + + client.publish(topic, payload_bytes) client.disconnect() diff --git a/docker/test/integration/cluster/checkers/sparkplug_b_pb2.py b/docker/test/integration/cluster/checkers/sparkplug_b_pb2.py new file mode 100644 index 0000000000..4231914663 --- /dev/null +++ b/docker/test/integration/cluster/checkers/sparkplug_b_pb2.py @@ -0,0 +1,67 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! 
+# NO CHECKED-IN PROTOBUF GENCODE +# source: sparkplug_b.proto +# Protobuf Python Version: 6.31.1 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import runtime_version as _runtime_version +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +_runtime_version.ValidateProtobufRuntimeVersion( + _runtime_version.Domain.PUBLIC, + 6, + 31, + 1, + '', + 'sparkplug_b.proto' +) +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x11sparkplug_b.proto\x12\x19org.eclipse.tahu.protobuf\"\xee\x15\n\x07Payload\x12\x11\n\ttimestamp\x18\x01 \x01(\x04\x12:\n\x07metrics\x18\x02 \x03(\x0b\x32).org.eclipse.tahu.protobuf.Payload.Metric\x12\x0b\n\x03seq\x18\x03 \x01(\x04\x12\x0c\n\x04uuid\x18\x04 \x01(\t\x12\x0c\n\x04\x62ody\x18\x05 \x01(\x0c\x1a\xa6\x04\n\x08Template\x12\x0f\n\x07version\x18\x01 \x01(\t\x12:\n\x07metrics\x18\x02 \x03(\x0b\x32).org.eclipse.tahu.protobuf.Payload.Metric\x12I\n\nparameters\x18\x03 \x03(\x0b\x32\x35.org.eclipse.tahu.protobuf.Payload.Template.Parameter\x12\x14\n\x0ctemplate_ref\x18\x04 \x01(\t\x12\x15\n\ris_definition\x18\x05 \x01(\x08\x1a\xca\x02\n\tParameter\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0c\n\x04type\x18\x02 \x01(\r\x12\x13\n\tint_value\x18\x03 \x01(\rH\x00\x12\x14\n\nlong_value\x18\x04 \x01(\x04H\x00\x12\x15\n\x0b\x66loat_value\x18\x05 \x01(\x02H\x00\x12\x16\n\x0c\x64ouble_value\x18\x06 \x01(\x01H\x00\x12\x17\n\rboolean_value\x18\x07 \x01(\x08H\x00\x12\x16\n\x0cstring_value\x18\x08 \x01(\tH\x00\x12h\n\x0f\x65xtension_value\x18\t \x01(\x0b\x32M.org.eclipse.tahu.protobuf.Payload.Template.Parameter.ParameterValueExtensionH\x00\x1a#\n\x17ParameterValueExtension*\x08\x08\x01\x10\x80\x80\x80\x80\x02\x42\x07\n\x05value*\x08\x08\x06\x10\x80\x80\x80\x80\x02\x1a\x97\x04\n\x07\x44\x61taSet\x12\x16\n\x0enum_of_columns\x18\x01 \x01(\x04\x12\x0f\n\x07\x63olumns\x18\x02 \x03(\t\x12\r\n\x05types\x18\x03 \x03(\r\x12<\n\x04rows\x18\x04 \x03(\x0b\x32..org.eclipse.tahu.protobuf.Payload.DataSet.Row\x1a\xaf\x02\n\x0c\x44\x61taSetValue\x12\x13\n\tint_value\x18\x01 \x01(\rH\x00\x12\x14\n\nlong_value\x18\x02 \x01(\x04H\x00\x12\x15\n\x0b\x66loat_value\x18\x03 \x01(\x02H\x00\x12\x16\n\x0c\x64ouble_value\x18\x04 \x01(\x01H\x00\x12\x17\n\rboolean_value\x18\x05 \x01(\x08H\x00\x12\x16\n\x0cstring_value\x18\x06 \x01(\tH\x00\x12h\n\x0f\x65xtension_value\x18\x07 \x01(\x0b\x32M.org.eclipse.tahu.protobuf.Payload.DataSet.DataSetValue.DataSetValueExtensionH\x00\x1a!\n\x15\x44\x61taSetValueExtension*\x08\x08\x01\x10\x80\x80\x80\x80\x02\x42\x07\n\x05value\x1aZ\n\x03Row\x12I\n\x08\x65lements\x18\x01 \x03(\x0b\x32\x37.org.eclipse.tahu.protobuf.Payload.DataSet.DataSetValue*\x08\x08\x02\x10\x80\x80\x80\x80\x02*\x08\x08\x05\x10\x80\x80\x80\x80\x02\x1a\xe9\x03\n\rPropertyValue\x12\x0c\n\x04type\x18\x01 \x01(\r\x12\x0f\n\x07is_null\x18\x02 \x01(\x08\x12\x13\n\tint_value\x18\x03 \x01(\rH\x00\x12\x14\n\nlong_value\x18\x04 \x01(\x04H\x00\x12\x15\n\x0b\x66loat_value\x18\x05 \x01(\x02H\x00\x12\x16\n\x0c\x64ouble_value\x18\x06 \x01(\x01H\x00\x12\x17\n\rboolean_value\x18\x07 \x01(\x08H\x00\x12\x16\n\x0cstring_value\x18\x08 \x01(\tH\x00\x12K\n\x11propertyset_value\x18\t \x01(\x0b\x32..org.eclipse.tahu.protobuf.Payload.PropertySetH\x00\x12P\n\x12propertysets_value\x18\n 
\x01(\x0b\x32\x32.org.eclipse.tahu.protobuf.Payload.PropertySetListH\x00\x12\x62\n\x0f\x65xtension_value\x18\x0b \x01(\x0b\x32G.org.eclipse.tahu.protobuf.Payload.PropertyValue.PropertyValueExtensionH\x00\x1a\"\n\x16PropertyValueExtension*\x08\x08\x01\x10\x80\x80\x80\x80\x02\x42\x07\n\x05value\x1ag\n\x0bPropertySet\x12\x0c\n\x04keys\x18\x01 \x03(\t\x12@\n\x06values\x18\x02 \x03(\x0b\x32\x30.org.eclipse.tahu.protobuf.Payload.PropertyValue*\x08\x08\x03\x10\x80\x80\x80\x80\x02\x1a`\n\x0fPropertySetList\x12\x43\n\x0bpropertyset\x18\x01 \x03(\x0b\x32..org.eclipse.tahu.protobuf.Payload.PropertySet*\x08\x08\x02\x10\x80\x80\x80\x80\x02\x1a\xa4\x01\n\x08MetaData\x12\x15\n\ris_multi_part\x18\x01 \x01(\x08\x12\x14\n\x0c\x63ontent_type\x18\x02 \x01(\t\x12\x0c\n\x04size\x18\x03 \x01(\x04\x12\x0b\n\x03seq\x18\x04 \x01(\x04\x12\x11\n\tfile_name\x18\x05 \x01(\t\x12\x11\n\tfile_type\x18\x06 \x01(\t\x12\x0b\n\x03md5\x18\x07 \x01(\t\x12\x13\n\x0b\x64\x65scription\x18\x08 \x01(\t*\x08\x08\t\x10\x80\x80\x80\x80\x02\x1a\xbf\x05\n\x06Metric\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\r\n\x05\x61lias\x18\x02 \x01(\x04\x12\x11\n\ttimestamp\x18\x03 \x01(\x04\x12\x10\n\x08\x64\x61tatype\x18\x04 \x01(\r\x12\x15\n\ris_historical\x18\x05 \x01(\x08\x12\x14\n\x0cis_transient\x18\x06 \x01(\x08\x12\x0f\n\x07is_null\x18\x07 \x01(\x08\x12=\n\x08metadata\x18\x08 \x01(\x0b\x32+.org.eclipse.tahu.protobuf.Payload.MetaData\x12\x42\n\nproperties\x18\t \x01(\x0b\x32..org.eclipse.tahu.protobuf.Payload.PropertySet\x12\x13\n\tint_value\x18\n \x01(\rH\x00\x12\x14\n\nlong_value\x18\x0b \x01(\x04H\x00\x12\x15\n\x0b\x66loat_value\x18\x0c \x01(\x02H\x00\x12\x16\n\x0c\x64ouble_value\x18\r \x01(\x01H\x00\x12\x17\n\rboolean_value\x18\x0e \x01(\x08H\x00\x12\x16\n\x0cstring_value\x18\x0f \x01(\tH\x00\x12\x15\n\x0b\x62ytes_value\x18\x10 \x01(\x0cH\x00\x12\x43\n\rdataset_value\x18\x11 \x01(\x0b\x32*.org.eclipse.tahu.protobuf.Payload.DataSetH\x00\x12\x45\n\x0etemplate_value\x18\x12 \x01(\x0b\x32+.org.eclipse.tahu.protobuf.Payload.TemplateH\x00\x12Y\n\x0f\x65xtension_value\x18\x13 \x01(\x0b\x32>.org.eclipse.tahu.protobuf.Payload.Metric.MetricValueExtensionH\x00\x1a \n\x14MetricValueExtension*\x08\x08\x01\x10\x80\x80\x80\x80\x02\x42\x07\n\x05value*\x08\x08\x06\x10\x80\x80\x80\x80\x02*\xf2\x03\n\x08\x44\x61taType\x12\x0b\n\x07Unknown\x10\x00\x12\x08\n\x04Int8\x10\x01\x12\t\n\x05Int16\x10\x02\x12\t\n\x05Int32\x10\x03\x12\t\n\x05Int64\x10\x04\x12\t\n\x05UInt8\x10\x05\x12\n\n\x06UInt16\x10\x06\x12\n\n\x06UInt32\x10\x07\x12\n\n\x06UInt64\x10\x08\x12\t\n\x05\x46loat\x10\t\x12\n\n\x06\x44ouble\x10\n\x12\x0b\n\x07\x42oolean\x10\x0b\x12\n\n\x06String\x10\x0c\x12\x0c\n\x08\x44\x61teTime\x10\r\x12\x08\n\x04Text\x10\x0e\x12\x08\n\x04UUID\x10\x0f\x12\x0b\n\x07\x44\x61taSet\x10\x10\x12\t\n\x05\x42ytes\x10\x11\x12\x08\n\x04\x46ile\x10\x12\x12\x0c\n\x08Template\x10\x13\x12\x0f\n\x0bPropertySet\x10\x14\x12\x13\n\x0fPropertySetList\x10\x15\x12\r\n\tInt8Array\x10\x16\x12\x0e\n\nInt16Array\x10\x17\x12\x0e\n\nInt32Array\x10\x18\x12\x0e\n\nInt64Array\x10\x19\x12\x0e\n\nUInt8Array\x10\x1a\x12\x0f\n\x0bUInt16Array\x10\x1b\x12\x0f\n\x0bUInt32Array\x10\x1c\x12\x0f\n\x0bUInt64Array\x10\x1d\x12\x0e\n\nFloatArray\x10\x1e\x12\x0f\n\x0b\x44oubleArray\x10\x1f\x12\x10\n\x0c\x42ooleanArray\x10 \x12\x0f\n\x0bStringArray\x10!\x12\x11\n\rDateTimeArray\x10\"B,\n\x19org.eclipse.tahu.protobufB\x0fSparkplugBProto') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'sparkplug_b_pb2', _globals) +if not 
_descriptor._USE_C_DESCRIPTORS:
+  _globals['DESCRIPTOR']._loaded_options = None
+  _globals['DESCRIPTOR']._serialized_options = b'\n\031org.eclipse.tahu.protobufB\017SparkplugBProto'
+  _globals['_DATATYPE']._serialized_start=2850
+  _globals['_DATATYPE']._serialized_end=3348
+  _globals['_PAYLOAD']._serialized_start=49
+  _globals['_PAYLOAD']._serialized_end=2847
+  _globals['_PAYLOAD_TEMPLATE']._serialized_start=181
+  _globals['_PAYLOAD_TEMPLATE']._serialized_end=731
+  _globals['_PAYLOAD_TEMPLATE_PARAMETER']._serialized_start=391
+  _globals['_PAYLOAD_TEMPLATE_PARAMETER']._serialized_end=721
+  _globals['_PAYLOAD_TEMPLATE_PARAMETER_PARAMETERVALUEEXTENSION']._serialized_start=677
+  _globals['_PAYLOAD_TEMPLATE_PARAMETER_PARAMETERVALUEEXTENSION']._serialized_end=712
+  _globals['_PAYLOAD_DATASET']._serialized_start=734
+  _globals['_PAYLOAD_DATASET']._serialized_end=1269
+  _globals['_PAYLOAD_DATASET_DATASETVALUE']._serialized_start=864
+  _globals['_PAYLOAD_DATASET_DATASETVALUE']._serialized_end=1167
+  _globals['_PAYLOAD_DATASET_DATASETVALUE_DATASETVALUEEXTENSION']._serialized_start=1125
+  _globals['_PAYLOAD_DATASET_DATASETVALUE_DATASETVALUEEXTENSION']._serialized_end=1158
+  _globals['_PAYLOAD_DATASET_ROW']._serialized_start=1169
+  _globals['_PAYLOAD_DATASET_ROW']._serialized_end=1259
+  _globals['_PAYLOAD_PROPERTYVALUE']._serialized_start=1272
+  _globals['_PAYLOAD_PROPERTYVALUE']._serialized_end=1761
+  _globals['_PAYLOAD_PROPERTYVALUE_PROPERTYVALUEEXTENSION']._serialized_start=1718
+  _globals['_PAYLOAD_PROPERTYVALUE_PROPERTYVALUEEXTENSION']._serialized_end=1752
+  _globals['_PAYLOAD_PROPERTYSET']._serialized_start=1763
+  _globals['_PAYLOAD_PROPERTYSET']._serialized_end=1866
+  _globals['_PAYLOAD_PROPERTYSETLIST']._serialized_start=1868
+  _globals['_PAYLOAD_PROPERTYSETLIST']._serialized_end=1964
+  _globals['_PAYLOAD_METADATA']._serialized_start=1967
+  _globals['_PAYLOAD_METADATA']._serialized_end=2131
+  _globals['_PAYLOAD_METRIC']._serialized_start=2134
+  _globals['_PAYLOAD_METRIC']._serialized_end=2837
+  _globals['_PAYLOAD_METRIC_METRICVALUEEXTENSION']._serialized_start=2796
+  _globals['_PAYLOAD_METRIC_METRICVALUEEXTENSION']._serialized_end=2828
+# @@protoc_insertion_point(module_scope)
diff --git a/docker/test/integration/features/MiNiFi_integration_test_driver.py b/docker/test/integration/features/MiNiFi_integration_test_driver.py
index e74e1772fd..4c67d9bfca 100644
--- a/docker/test/integration/features/MiNiFi_integration_test_driver.py
+++ b/docker/test/integration/features/MiNiFi_integration_test_driver.py
@@ -520,3 +520,6 @@ def check_is_data_present_on_couchbase(self, doc_id: str, bucket_name: str, expe
 
     def publish_test_mqtt_message(self, topic, message):
         self.cluster.publish_test_mqtt_message(topic, message)
+
+    def publish_test_sparkplug_payload(self, topic):
+        self.cluster.publish_test_sparkplug_payload(topic)
diff --git a/docker/test/integration/features/mqtt.feature b/docker/test/integration/features/mqtt.feature
index 64ef62f6b4..2cacd5b8df 100644
--- a/docker/test/integration/features/mqtt.feature
+++ b/docker/test/integration/features/mqtt.feature
@@ -558,3 +558,24 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
     Then two flowfiles with the contents 'test' and '42' are placed in the monitored directory in less than 60 seconds
     And the MQTT broker has a log line matching "Received PUBLISH from .*testtopic.*\(72 bytes\)"
     And the MQTT broker has a log line matching "Received PUBLISH from .*testtopic.*\(64 bytes\)"
+
+  Scenario: A MiNiFi instance consumes a Sparkplug message
from an MQTT broker + Given a SparkplugBReader controller service is set up + And a JsonRecordSetWriter controller service is set up with "Array" output grouping + And a ConsumeMQTT processor with the "Topic" property set to "spBv1.0/TestGroup/DDATA/TestNode/TestDevice" + And the "MQTT Version" property of the ConsumeMQTT processor is set to "3.1.1" + And the "Record Reader" property of the ConsumeMQTT processor is set to "SparkplugBReader" + And the "Record Writer" property of the ConsumeMQTT processor is set to "JsonRecordSetWriter" + And a PutFile processor with the "Directory" property set to "/tmp/output" + And a LogAttribute processor + And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile + And the "success" relationship of the PutFile processor is connected to the LogAttribute + And an MQTT broker is set up in correspondence with the ConsumeMQTT + + When both instances start up + And a test Sparkplug payload is published to the topic "spBv1.0/TestGroup/DDATA/TestNode/TestDevice" + + Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client" + And a flowfile with the JSON content '[{"_isDuplicate": false, "_qos": 0, "_topicSegments": ["spBv1.0", "TestGroup", "DDATA", "TestNode", "TestDevice"], "_topic": "spBv1.0/TestGroup/DDATA/TestNode/TestDevice", "_isRetained": false, "body": "test-body", "uuid": "test-uuid", "seq": 12345, "metrics": [{"int_value": 123, "timestamp": 45345346346, "name": "TestMetric"}], "timestamp": 987654321}]' is placed in the monitored directory in less than 60 seconds + And the Minifi logs contain the following message: "key:record.count value:1" in less than 60 seconds + And the Minifi logs contain the following message: "key:mqtt.broker value:mqtt-broker-" in less than 1 seconds diff --git a/docker/test/integration/features/steps/steps.py b/docker/test/integration/features/steps/steps.py index 590fa06d0b..64ae0ba8b3 100644 --- a/docker/test/integration/features/steps/steps.py +++ b/docker/test/integration/features/steps/steps.py @@ -29,6 +29,7 @@ from minifi.controllers.XMLRecordSetWriter import XMLRecordSetWriter from minifi.controllers.CouchbaseClusterService import CouchbaseClusterService from minifi.controllers.XMLReader import XMLReader +from minifi.controllers.SparkplugBReader import SparkplugBReader from behave import given, then, when from behave.model_describe import ModelDescriptor @@ -506,6 +507,18 @@ def step_impl(context): context.test.start('mqtt-broker') +@given("a SparkplugBReader controller service is set up") +def step_impl(context): + sparkplug_record_set_reader = SparkplugBReader("SparkplugBReader") + container = context.test.acquire_container(context=context, name="minifi-cpp-flow") + container.add_controller(sparkplug_record_set_reader) + + +@when("a test Sparkplug payload is published to the topic \"{topic}\"") +def step_impl(context, topic): + context.test.publish_test_sparkplug_payload(topic) + + # s3 setup @given("a s3 server is set up in correspondence with the PutS3Object") @given("a s3 server is set up in correspondence with the DeleteS3Object") diff --git a/docker/test/integration/minifi/controllers/SparkplugBReader.py b/docker/test/integration/minifi/controllers/SparkplugBReader.py new file mode 100644 index 0000000000..9dc08aa99d --- /dev/null +++ b/docker/test/integration/minifi/controllers/SparkplugBReader.py @@ -0,0 +1,23 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from ..core.ControllerService import ControllerService + + +class SparkplugBReader(ControllerService): + def __init__(self, name=None): + super(SparkplugBReader, self).__init__(name=name) + self.service_class = 'SparkplugBReader' diff --git a/extensions/mqtt/CMakeLists.txt b/extensions/mqtt/CMakeLists.txt index 3226236646..0530c37337 100644 --- a/extensions/mqtt/CMakeLists.txt +++ b/extensions/mqtt/CMakeLists.txt @@ -21,14 +21,33 @@ if(NOT (ENABLE_ALL OR ENABLE_MQTT)) return() endif() +include(Protobuf) + include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt) -include_directories(./processors ../../libminifi/include ../../libminifi/include/core) -file(GLOB SOURCES "processors/*.cpp") +set(SPARKPLUG_PROTOBUF_GENERATED_DIR ${CMAKE_BINARY_DIR}/sparkplug-protobuf-generated) +file(MAKE_DIRECTORY ${SPARKPLUG_PROTOBUF_GENERATED_DIR}) + +add_custom_command( + OUTPUT ${SPARKPLUG_PROTOBUF_GENERATED_DIR}/sparkplug_b.pb.h ${SPARKPLUG_PROTOBUF_GENERATED_DIR}/sparkplug_b.pb.cc + COMMAND ${PROTOBUF_COMPILER} -I=${CMAKE_CURRENT_SOURCE_DIR}/protos/ -I=${protobuf_SOURCE_DIR}/src --cpp_out=${SPARKPLUG_PROTOBUF_GENERATED_DIR} ${CMAKE_CURRENT_SOURCE_DIR}/protos/sparkplug_b.proto + DEPENDS protobuf::protoc) + +add_library(sparkplug-protos ${SPARKPLUG_PROTOBUF_GENERATED_DIR}/sparkplug_b.pb.cc) +target_include_directories(sparkplug-protos SYSTEM PRIVATE BEFORE "${SPARKPLUG_PROTOBUF_GENERATED_DIR}" "${PROTOBUF_INCLUDE_DIR}") +target_link_libraries(sparkplug-protos protobuf::libprotobuf) + +if (NOT WIN32) + set_source_files_properties(${SPARKPLUG_PROTOBUF_GENERATED_DIR}/sparkplug_b.pb.cc PROPERTIES COMPILE_FLAGS -Wno-error) +endif() + +file(GLOB SOURCES "processors/*.cpp" "controllers/*.cpp") add_minifi_library(minifi-mqtt-extensions SHARED ${SOURCES}) -target_link_libraries(minifi-mqtt-extensions ${LIBMINIFI}) +target_link_libraries(minifi-mqtt-extensions ${LIBMINIFI} sparkplug-protos) +target_include_directories(minifi-mqtt-extensions PRIVATE BEFORE "./processors" "./controllers" "../../libminifi/include" "../../libminifi/include/core") +target_include_directories(minifi-mqtt-extensions SYSTEM PRIVATE BEFORE "${SPARKPLUG_PROTOBUF_GENERATED_DIR}" "${PROTOBUF_INCLUDE_DIR}") include(PahoMqttC) target_link_libraries(minifi-mqtt-extensions paho.mqtt.c) diff --git a/extensions/mqtt/controllers/SparkplugBReader.cpp b/extensions/mqtt/controllers/SparkplugBReader.cpp new file mode 100644 index 0000000000..e4d0a2be4c --- /dev/null +++ b/extensions/mqtt/controllers/SparkplugBReader.cpp @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "SparkplugBReader.h"
+
+#include <string>
+#include <vector>
+
+#include "google/protobuf/message.h"
+#include "google/protobuf/descriptor.h"
+#include "google/protobuf/reflection.h"
+
+#include "sparkplug_b.pb.h"
+#include "minifi-cpp/core/Record.h"
+#include "core/Resource.h"
+#include "utils/expected.h"
+
+namespace org::apache::nifi::minifi::controllers {
+
+namespace {
+
+nonstd::expected<core::RecordObject, std::error_code> parseRecordFromProtobufMessage(const google::protobuf::Message& message) {
+  core::RecordObject result;
+  const google::protobuf::Descriptor* descriptor = message.GetDescriptor();
+  const google::protobuf::Reflection* reflection = message.GetReflection();
+
+  int field_count = descriptor->field_count();
+  for (int i = 0; i < field_count; ++i) {
+    const google::protobuf::FieldDescriptor* field = descriptor->field(i);
+
+    std::string full_name{field->name()};
+
+    if (field->is_repeated()) {
+      core::RecordArray record_array;
+      int field_size = reflection->FieldSize(message, field);
+      for (int j = 0; j < field_size; ++j) {
+        switch (field->cpp_type()) {
+          case google::protobuf::FieldDescriptor::CPPTYPE_INT32:
+            record_array.push_back(core::RecordField(reflection->GetRepeatedInt32(message, field, j)));
+            break;
+          case google::protobuf::FieldDescriptor::CPPTYPE_INT64:
+            record_array.push_back(core::RecordField(reflection->GetRepeatedInt64(message, field, j)));
+            break;
+          case google::protobuf::FieldDescriptor::CPPTYPE_UINT32:
+            record_array.push_back(core::RecordField(reflection->GetRepeatedUInt32(message, field, j)));
+            break;
+          case google::protobuf::FieldDescriptor::CPPTYPE_UINT64:
+            record_array.push_back(core::RecordField(reflection->GetRepeatedUInt64(message, field, j)));
+            break;
+          case google::protobuf::FieldDescriptor::CPPTYPE_DOUBLE:
+            record_array.push_back(core::RecordField(reflection->GetRepeatedDouble(message, field, j)));
+            break;
+          case google::protobuf::FieldDescriptor::CPPTYPE_FLOAT:
+            record_array.push_back(core::RecordField(reflection->GetRepeatedFloat(message, field, j)));
+            break;
+          case google::protobuf::FieldDescriptor::CPPTYPE_BOOL:
+            record_array.push_back(core::RecordField(reflection->GetRepeatedBool(message, field, j)));
+            break;
+          case google::protobuf::FieldDescriptor::CPPTYPE_STRING:
+            record_array.push_back(core::RecordField(reflection->GetRepeatedString(message, field, j)));
+            break;
+          case google::protobuf::FieldDescriptor::CPPTYPE_MESSAGE: {
+            auto parse_result = parseRecordFromProtobufMessage(reflection->GetRepeatedMessage(message, field, j));
+            if (!parse_result) {
+              return nonstd::make_unexpected(parse_result.error());
+            }
+            record_array.push_back(core::RecordField{*parse_result});
+            break;
+          }
+          default:
+            return nonstd::make_unexpected(std::make_error_code(std::errc::invalid_argument));
+        }
+      }
+      result.emplace(full_name, core::RecordField(record_array));
+    } else if (reflection->HasField(message, field)) {
+      switch (field->cpp_type()) {
+        case google::protobuf::FieldDescriptor::CPPTYPE_INT32:
+          result.emplace(full_name, core::RecordField(reflection->GetInt32(message, field)));
+          break;
+        case google::protobuf::FieldDescriptor::CPPTYPE_INT64:
+          result.emplace(full_name, core::RecordField(reflection->GetInt64(message, field)));
+          break;
+        case google::protobuf::FieldDescriptor::CPPTYPE_UINT32:
+          result.emplace(full_name, core::RecordField(reflection->GetUInt32(message, field)));
+          break;
+        case google::protobuf::FieldDescriptor::CPPTYPE_UINT64:
+          result.emplace(full_name, core::RecordField(reflection->GetUInt64(message, field)));
+          break;
+        case google::protobuf::FieldDescriptor::CPPTYPE_DOUBLE:
+          result.emplace(full_name, core::RecordField(reflection->GetDouble(message, field)));
+          break;
+        case google::protobuf::FieldDescriptor::CPPTYPE_FLOAT:
+          result.emplace(full_name, core::RecordField(reflection->GetFloat(message, field)));
+          break;
+        case google::protobuf::FieldDescriptor::CPPTYPE_BOOL:
+          result.emplace(full_name, core::RecordField(reflection->GetBool(message, field)));
+          break;
+        case google::protobuf::FieldDescriptor::CPPTYPE_STRING:
+          result.emplace(full_name, core::RecordField(reflection->GetString(message, field)));
+          break;
+        case google::protobuf::FieldDescriptor::CPPTYPE_MESSAGE: {
+          auto parse_result = parseRecordFromProtobufMessage(reflection->GetMessage(message, field));
+          if (!parse_result) {
+            return nonstd::make_unexpected(parse_result.error());
+          }
+          result.emplace(full_name, core::RecordField{*parse_result});
+          break;
+        }
+        default:
+          return nonstd::make_unexpected(std::make_error_code(std::errc::invalid_argument));
+      }
+    }
+  }
+  return result;
+}
+
+}  // namespace
+
+nonstd::expected<core::RecordSet, std::error_code> SparkplugBReader::read(io::InputStream& input_stream) {
+  org::eclipse::tahu::protobuf::Payload payload;
+  std::vector<std::byte> buffer(input_stream.size());
+  auto result = input_stream.read(buffer);
+
+  if (io::isError(result) || result != input_stream.size()) {
+    return nonstd::make_unexpected(std::make_error_code(std::errc::invalid_argument));
+  }
+
+  if (!payload.ParseFromArray(static_cast<const void*>(buffer.data()), gsl::narrow<int>(input_stream.size()))) {
+    return nonstd::make_unexpected(std::make_error_code(std::errc::invalid_argument));
+  }
+
+  auto parse_result = parseRecordFromProtobufMessage(payload);
+  if (!parse_result) {
+    logger_->log_error("Failed to parse Sparkplug payload: {}", parse_result.error().message());
+    return nonstd::make_unexpected(parse_result.error());
+  }
+  core::Record record{std::move(*parse_result)};
+  core::RecordSet record_set;
+  record_set.push_back(std::move(record));
+  return record_set;
+}
+
+REGISTER_RESOURCE(SparkplugBReader, ControllerService);
+
+}  // namespace org::apache::nifi::minifi::controllers
diff --git a/extensions/mqtt/controllers/SparkplugBReader.h b/extensions/mqtt/controllers/SparkplugBReader.h
new file mode 100644
index 0000000000..aca201790b
--- /dev/null
+++ b/extensions/mqtt/controllers/SparkplugBReader.h
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include "controllers/RecordSetReader.h"
+#include "core/logging/LoggerFactory.h"
+
+namespace org::apache::nifi::minifi::controllers {
+
+class SparkplugBReader final : public core::RecordSetReaderImpl {
+ public:
+  explicit SparkplugBReader(const std::string_view name, const utils::Identifier& uuid = {}) : RecordSetReaderImpl(name, uuid) {}
+
+  SparkplugBReader(SparkplugBReader&&) = delete;
+  SparkplugBReader(const SparkplugBReader&) = delete;
+  SparkplugBReader& operator=(SparkplugBReader&&) = delete;
+  SparkplugBReader& operator=(const SparkplugBReader&) = delete;
+
+  ~SparkplugBReader() override = default;
+
+  EXTENSIONAPI static constexpr const char* Description = "Reads Sparkplug B messages and turns them into individual Record objects. "
+      "The reader expects a single Sparkplug B payload in a read operation, which is a protobuf-encoded binary message. This reader is designed to be used with ConsumeMQTT, "
+      "since Sparkplug B is an MQTT-based protocol.";
+
+  EXTENSIONAPI static constexpr auto Properties = std::array<core::PropertyReference, 0>{};
+
+  EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
+  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_CONTROLLER_SERVICES
+
+  nonstd::expected<core::RecordSet, std::error_code> read(io::InputStream& input_stream) override;
+
+  void initialize() override {
+    setSupportedProperties(Properties);
+  }
+  void onEnable() override {}
+  void yield() override {}
+  bool isRunning() const override { return getState() == core::controller::ControllerServiceState::ENABLED; }
+  bool isWorkAvailable() override { return false; }
+
+ private:
+  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<SparkplugBReader>::getLogger();
+};
+
+}  // namespace org::apache::nifi::minifi::controllers
diff --git a/extensions/mqtt/protos/sparkplug_b.proto b/extensions/mqtt/protos/sparkplug_b.proto
new file mode 100644
index 0000000000..bf72ab5f09
--- /dev/null
+++ b/extensions/mqtt/protos/sparkplug_b.proto
@@ -0,0 +1,224 @@
+// * Copyright (c) 2015, 2018 Cirrus Link Solutions and others
+// *
+// * This program and the accompanying materials are made available under the
+// * terms of the Eclipse Public License 2.0 which is available at
+// * http://www.eclipse.org/legal/epl-2.0.
+// *
+// * SPDX-License-Identifier: EPL-2.0
+// *
+// * Contributors:
+// *   Cirrus Link Solutions - initial implementation
+
+//
+// To compile:
+// cd client_libraries/java
+// protoc --proto_path=../../ --java_out=src/main/java ../../sparkplug_b.proto
+//
+
+syntax = "proto2";
+
+package org.eclipse.tahu.protobuf;
+
+option java_package = "org.eclipse.tahu.protobuf";
+option java_outer_classname = "SparkplugBProto";
+
+enum DataType {
+    // Indexes of Data Types
+
+    // Unknown placeholder for future expansion.
+ Unknown = 0; + + // Basic Types + Int8 = 1; + Int16 = 2; + Int32 = 3; + Int64 = 4; + UInt8 = 5; + UInt16 = 6; + UInt32 = 7; + UInt64 = 8; + Float = 9; + Double = 10; + Boolean = 11; + String = 12; + DateTime = 13; + Text = 14; + + // Additional Metric Types + UUID = 15; + DataSet = 16; + Bytes = 17; + File = 18; + Template = 19; + + // Additional PropertyValue Types + PropertySet = 20; + PropertySetList = 21; + + // Array Types + Int8Array = 22; + Int16Array = 23; + Int32Array = 24; + Int64Array = 25; + UInt8Array = 26; + UInt16Array = 27; + UInt32Array = 28; + UInt64Array = 29; + FloatArray = 30; + DoubleArray = 31; + BooleanArray = 32; + StringArray = 33; + DateTimeArray = 34; +} + +message Payload { + + message Template { + + message Parameter { + optional string name = 1; + optional uint32 type = 2; + + oneof value { + uint32 int_value = 3; + uint64 long_value = 4; + float float_value = 5; + double double_value = 6; + bool boolean_value = 7; + string string_value = 8; + ParameterValueExtension extension_value = 9; + } + + message ParameterValueExtension { + extensions 1 to max; + } + } + + optional string version = 1; // The version of the Template to prevent mismatches + repeated Metric metrics = 2; // Each metric includes a name, datatype, and optionally a value + repeated Parameter parameters = 3; + optional string template_ref = 4; // MUST be a reference to a template definition if this is an instance (i.e. the name of the template definition) - MUST be omitted for template definitions + optional bool is_definition = 5; + extensions 6 to max; + } + + message DataSet { + + message DataSetValue { + + oneof value { + uint32 int_value = 1; + uint64 long_value = 2; + float float_value = 3; + double double_value = 4; + bool boolean_value = 5; + string string_value = 6; + DataSetValueExtension extension_value = 7; + } + + message DataSetValueExtension { + extensions 1 to max; + } + } + + message Row { + repeated DataSetValue elements = 1; + extensions 2 to max; // For third party extensions + } + + optional uint64 num_of_columns = 1; + repeated string columns = 2; + repeated uint32 types = 3; + repeated Row rows = 4; + extensions 5 to max; // For third party extensions + } + + message PropertyValue { + + optional uint32 type = 1; + optional bool is_null = 2; + + oneof value { + uint32 int_value = 3; + uint64 long_value = 4; + float float_value = 5; + double double_value = 6; + bool boolean_value = 7; + string string_value = 8; + PropertySet propertyset_value = 9; + PropertySetList propertysets_value = 10; // List of Property Values + PropertyValueExtension extension_value = 11; + } + + message PropertyValueExtension { + extensions 1 to max; + } + } + + message PropertySet { + repeated string keys = 1; // Names of the properties + repeated PropertyValue values = 2; + extensions 3 to max; + } + + message PropertySetList { + repeated PropertySet propertyset = 1; + extensions 2 to max; + } + + message MetaData { + // Bytes specific metadata + optional bool is_multi_part = 1; + + // General metadata + optional string content_type = 2; // Content/Media type + optional uint64 size = 3; // File size, String size, Multi-part size, etc + optional uint64 seq = 4; // Sequence number for multi-part messages + + // File metadata + optional string file_name = 5; // File name + optional string file_type = 6; // File type (i.e. 
xml, json, txt, cpp, etc) + optional string md5 = 7; // md5 of data + + // Catchalls and future expansion + optional string description = 8; // Could be anything such as json or xml of custom properties + extensions 9 to max; + } + + message Metric { + + optional string name = 1; // Metric name - should only be included on birth + optional uint64 alias = 2; // Metric alias - tied to name on birth and included in all later DATA messages + optional uint64 timestamp = 3; // Timestamp associated with data acquisition time + optional uint32 datatype = 4; // DataType of the metric/tag value + optional bool is_historical = 5; // If this is historical data and should not update real time tag + optional bool is_transient = 6; // Tells consuming clients such as MQTT Engine to not store this as a tag + optional bool is_null = 7; // If this is null - explicitly say so rather than using -1, false, etc for some datatypes. + optional MetaData metadata = 8; // Metadata for the payload + optional PropertySet properties = 9; + + oneof value { + uint32 int_value = 10; + uint64 long_value = 11; + float float_value = 12; + double double_value = 13; + bool boolean_value = 14; + string string_value = 15; + bytes bytes_value = 16; // Bytes, File + DataSet dataset_value = 17; + Template template_value = 18; + MetricValueExtension extension_value = 19; + } + + message MetricValueExtension { + extensions 1 to max; + } + } + + optional uint64 timestamp = 1; // Timestamp at message sending time + repeated Metric metrics = 2; // Repeated forever - no limit in Google Protobufs + optional uint64 seq = 3; // Sequence number + optional string uuid = 4; // UUID to track message type in terms of schema definitions + optional bytes body = 5; // To optionally bypass the whole definition above + extensions 6 to max; // For third party extensions +} diff --git a/extensions/mqtt/tests/CMakeLists.txt b/extensions/mqtt/tests/CMakeLists.txt index 0969fe635a..bba60550af 100644 --- a/extensions/mqtt/tests/CMakeLists.txt +++ b/extensions/mqtt/tests/CMakeLists.txt @@ -24,6 +24,7 @@ FOREACH(testfile ${MQTT_TESTS}) add_minifi_executable("${testfilename}" "${testfile}") target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_CURRENT_SOURCE_DIR}/../../extensions/standard-processors") target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_CURRENT_SOURCE_DIR}/../../../libminifi/test/") + target_include_directories(${testfilename} SYSTEM PRIVATE BEFORE "${CMAKE_BINARY_DIR}/sparkplug-protobuf-generated" "${PROTOBUF_INCLUDE_DIR}") createTests("${testfilename}") target_link_libraries(${testfilename} Catch2WithMain) target_link_libraries(${testfilename} minifi-mqtt-extensions) diff --git a/extensions/mqtt/tests/ConsumeMQTTTests.cpp b/extensions/mqtt/tests/ConsumeMQTTTests.cpp index f367037207..195784abae 100644 --- a/extensions/mqtt/tests/ConsumeMQTTTests.cpp +++ b/extensions/mqtt/tests/ConsumeMQTTTests.cpp @@ -22,10 +22,12 @@ #include "../processors/ConsumeMQTT.h" #include "core/Resource.h" #include "unit/SingleProcessorTestController.h" +#include "sparkplug_b.pb.h" #include "rapidjson/document.h" #include "unit/ProcessorUtils.h" namespace org::apache::nifi::minifi::test { +namespace { void verifyXmlJsonResult(const std::string& json_content, size_t expected_record_count, bool add_attributes_as_fields) { rapidjson::Document document; document.Parse(json_content.c_str()); @@ -75,6 +77,69 @@ void verifyXmlJsonResult(const std::string& json_content, size_t expected_record } } +std::string createTestSparkplugPayload() { + 
org::eclipse::tahu::protobuf::Payload payload;
+  payload.set_uuid("test-uuid");
+  payload.set_timestamp(987654321);
+  payload.set_seq(12345);
+  payload.set_body("test-body");
+  auto metrics = payload.add_metrics();
+  metrics->set_name("test-metric");
+  metrics->set_datatype(99);
+  metrics->set_timestamp(42);
+  std::string payload_string;
+  payload.SerializeToString(&payload_string);
+  return payload_string;
+}
+
+void verifySparkplugJsonResult(const std::string& json_content, size_t expected_record_count, bool add_attributes_as_fields) {
+  rapidjson::Document document;
+  document.Parse(json_content.c_str());
+  REQUIRE(document.IsArray());
+  REQUIRE(document.GetArray().Size() == expected_record_count);
+  for (size_t i = 0; i < expected_record_count; ++i) {
+    auto& current_record = document[gsl::narrow<rapidjson::SizeType>(i)];
+    std::string string_result = current_record["uuid"].GetString();
+    CHECK(string_result == "test-uuid");
+    auto int_result = current_record["timestamp"].GetInt64();
+    CHECK(int_result == 987654321);
+    int_result = current_record["seq"].GetInt64();
+    CHECK(int_result == 12345);
+    string_result = current_record["body"].GetString();
+    CHECK(string_result == "test-body");
+    string_result = current_record["metrics"][0]["name"].GetString();
+    CHECK(string_result == "test-metric");
+    int_result = current_record["metrics"][0]["datatype"].GetInt64();
+    CHECK(int_result == 99);
+    int_result = current_record["metrics"][0]["timestamp"].GetInt64();
+    CHECK(int_result == 42);
+
+    if (add_attributes_as_fields) {
+      string_result = current_record["_topic"].GetString();
+      CHECK(string_result == "mytopic/segment");
+      auto array = current_record["_topicSegments"].GetArray();
+      CHECK(array.Size() == 2);
+      string_result = array[0].GetString();
+      CHECK(string_result == "mytopic");
+      string_result = array[1].GetString();
+      CHECK(string_result == "segment");
+      int_result = current_record["_qos"].GetInt64();
+      CHECK(int_result == 1);
+      bool bool_result = current_record["_isDuplicate"].GetBool();
+      CHECK_FALSE(bool_result);
+      bool_result = current_record["_isRetained"].GetBool();
+      CHECK_FALSE(bool_result);
+    } else {
+      CHECK_FALSE(current_record.HasMember("_topic"));
+      CHECK_FALSE(current_record.HasMember("_qos"));
+      CHECK_FALSE(current_record.HasMember("_isDuplicate"));
+      CHECK_FALSE(current_record.HasMember("_isRetained"));
+    }
+  }
+}
+}  // namespace
+
 class TestConsumeMQTTProcessor : public minifi::processors::ConsumeMQTT {
  public:
  using SmartMessage = processors::AbstractMQTTProcessor::SmartMessage;
@@ -368,4 +433,64 @@ TEST_CASE_METHOD(ConsumeMqttTestFixture, "Test scheduling failure if non-existen
 }
 }
 
+TEST_CASE_METHOD(ConsumeMqttTestFixture, "Read Sparkplug messages and write them to json records", "[consumeMQTTTest]") {
+  test_controller_.plan->addController("SparkplugBReader", "SparkplugBReader");
+  test_controller_.plan->addController("JsonRecordSetWriter", "JsonRecordSetWriter");
+  REQUIRE(consume_mqtt_processor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
+  REQUIRE(consume_mqtt_processor_->setProperty(minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
+  REQUIRE(consume_mqtt_processor_->setProperty(minifi::processors::ConsumeMQTT::RecordReader.name, "SparkplugBReader"));
+  REQUIRE(consume_mqtt_processor_->setProperty(minifi::processors::ConsumeMQTT::RecordWriter.name, "JsonRecordSetWriter"));
+
+  bool add_attributes_as_fields = true;
+  SECTION("Add attributes as fields by default") {
+  }
+
+    add_attributes_as_fields = false;
+    REQUIRE(consume_mqtt_processor_->setProperty(minifi::processors::ConsumeMQTT::AddAttributesAsFields.name, "false"));
+  }
+
+  const size_t expected_record_count = 2;
+  const auto payload = createTestSparkplugPayload();
+  for (size_t i = 0; i < expected_record_count; ++i) {
+    TestConsumeMQTTProcessor::SmartMessage message{std::unique_ptr<MQTTAsync_message>(
+        new MQTTAsync_message{.struct_id = {'M', 'Q', 'T', 'M'}, .struct_version = 1, .payloadlen = gsl::narrow<int>(payload.size()),
+            .payload = const_cast<char*>(payload.data()), .qos = 1, .retained = 0, .dup = 0, .msgid = 42, .properties = {}}),
+      std::string{"mytopic/segment"}};  // NOLINT(clang-analyzer-cplusplus.NewDeleteLeaks)
+    auto& test_processor = dynamic_cast<TestConsumeMQTTProcessor&>(consume_mqtt_processor_->getImpl());
+    test_processor.enqueueReceivedMQTTMsg(std::move(message));
+  }
+  const auto trigger_results = test_controller_.trigger();
+  CHECK(trigger_results.at(TestConsumeMQTTProcessor::Success).size() == 1);
+  const auto flow_file = trigger_results.at(TestConsumeMQTTProcessor::Success).at(0);
+
+  auto string_content = test_controller_.plan->getContent(flow_file);
+  verifySparkplugJsonResult(string_content, expected_record_count, add_attributes_as_fields);
+
+  CHECK(*flow_file->getAttribute("record.count") == "2");
+  CHECK(*flow_file->getAttribute("mqtt.broker") == "127.0.0.1:1883");
+}
+
+TEST_CASE_METHOD(ConsumeMqttTestFixture, "Invalid Sparkplug payload does not result in new flow files", "[consumeMQTTTest]") {
+  test_controller_.plan->addController("SparkplugBReader", "SparkplugBReader");
+  test_controller_.plan->addController("JsonRecordSetWriter", "JsonRecordSetWriter");
+  REQUIRE(consume_mqtt_processor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
+  REQUIRE(consume_mqtt_processor_->setProperty(minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
+  REQUIRE(consume_mqtt_processor_->setProperty(minifi::processors::ConsumeMQTT::RecordReader.name, "SparkplugBReader"));
+  REQUIRE(consume_mqtt_processor_->setProperty(minifi::processors::ConsumeMQTT::RecordWriter.name, "JsonRecordSetWriter"));
+
+  const std::string payload = "invalid sparkplug payload";
+  TestConsumeMQTTProcessor::SmartMessage message{
+    std::unique_ptr<MQTTAsync_message>(
+        new MQTTAsync_message{.struct_id = {'M', 'Q', 'T', 'M'}, .struct_version = 1, .payloadlen = gsl::narrow<int>(payload.size()),
+            .payload = const_cast<char*>(payload.data()), .qos = 1, .retained = 0, .dup = 0, .msgid = 42, .properties = {}}),
+    std::string{"mytopic"}};  // NOLINT(clang-analyzer-cplusplus.NewDeleteLeaks)
+  auto& test_processor = dynamic_cast<TestConsumeMQTTProcessor&>(consume_mqtt_processor_->getImpl());
+  test_processor.enqueueReceivedMQTTMsg(std::move(message));
+
+  const auto trigger_results = test_controller_.trigger();
+  CHECK(trigger_results.at(TestConsumeMQTTProcessor::Success).empty());
+  REQUIRE(LogTestController::getInstance().contains("[error] Failed to read records from MQTT message", 1s));
+}
+
 }  // namespace org::apache::nifi::minifi::test
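Going by the assertions in verifySparkplugJsonResult, each record emitted by the SparkplugBReader / JsonRecordSetWriter pair in the test above would look roughly like the following (field order and formatting depend on the JSON writer; this is an illustrative reconstruction, not captured output):

    {
      "uuid": "test-uuid",
      "timestamp": 987654321,
      "seq": 12345,
      "body": "test-body",
      "metrics": [{"name": "test-metric", "datatype": 99, "timestamp": 42}],
      "_topic": "mytopic/segment",
      "_topicSegments": ["mytopic", "segment"],
      "_qos": 1,
      "_isDuplicate": false,
      "_isRetained": false
    }

The underscore-prefixed fields are present only while "Add attributes as fields" is enabled, which the first SECTION leaves at its default.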
diff --git a/extensions/mqtt/tests/SparkplugBReaderTests.cpp b/extensions/mqtt/tests/SparkplugBReaderTests.cpp
new file mode 100644
index 0000000000..445d3a0173
--- /dev/null
+++ b/extensions/mqtt/tests/SparkplugBReaderTests.cpp
@@ -0,0 +1,68 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "unit/Catch.h"
+#include "unit/TestBase.h"
+#include "../controllers/SparkplugBReader.h"
+#include "io/BufferStream.h"
+#include "sparkplug_b.pb.h"
+
+namespace org::apache::nifi::minifi::test {
+
+TEST_CASE("Test invalid payload read failure", "[SparkplugBReader]") {
+  io::BufferStream buffer_stream;
+  std::string payload_string = "invalid payload data";
+  buffer_stream.write(reinterpret_cast<const uint8_t*>(payload_string.data()), payload_string.size());
+
+  controllers::SparkplugBReader sparkplug_reader("SparkplugBReader");
+  auto record_set = sparkplug_reader.read(buffer_stream);
+  REQUIRE_FALSE(record_set.has_value());
+  REQUIRE(record_set.error().value() == std::make_error_code(std::errc::invalid_argument).value());
+}
+
+TEST_CASE("Test converting serialized Sparkplug payload to recordset", "[SparkplugBReader]") {
+  org::eclipse::tahu::protobuf::Payload payload;
+  payload.set_uuid("test-uuid");
+  payload.set_timestamp(987654321);
+  payload.set_seq(12345);
+  payload.set_body("test-body");
+  auto metrics = payload.add_metrics();
+  metrics->set_name("test-metric");
+  metrics->set_datatype(99);
+  metrics->set_timestamp(42);
+  io::BufferStream buffer_stream;
+  std::string payload_string;
+  payload.SerializeToString(&payload_string);
+  buffer_stream.write(reinterpret_cast<const uint8_t*>(payload_string.data()), payload_string.size());
+
+  controllers::SparkplugBReader sparkplug_reader("SparkplugBReader");
+  auto record_set = sparkplug_reader.read(buffer_stream);
+  REQUIRE(record_set.has_value());
+  REQUIRE(record_set->size() == 1);
+  auto& record = record_set->at(0);
+  CHECK(std::get<std::string>(record.at("uuid").value_) == "test-uuid");
+  CHECK(std::get<uint64_t>(record.at("timestamp").value_) == 987654321);
+  CHECK(std::get<uint64_t>(record.at("seq").value_) == 12345);
+  CHECK(std::get<std::string>(record.at("body").value_) == "test-body");
+  auto record_metric_array = std::get<core::RecordArray>(record.at("metrics").value_);
+  REQUIRE(record_metric_array.size() == 1);
+  auto record_metric = std::get<core::RecordObject>(record_metric_array.at(0).value_);
+  CHECK(std::get<std::string>(record_metric.at("name").value_) == "test-metric");
+}
+
+}  // namespace org::apache::nifi::minifi::test
diff --git a/run_flake8.sh b/run_flake8.sh
index 1e884aa57f..6e19b86852 100755
--- a/run_flake8.sh
+++ b/run_flake8.sh
@@ -19,4 +19,4 @@ set -euo pipefail
 
 directory=${1:-.}
 
-flake8 --exclude venv,thirdparty,build,cmake-build-*,github_env --builtins log,REL_SUCCESS,REL_FAILURE,REL_ORIGINAL,raw_input --ignore E501,W503 --per-file-ignores="steps.py:F811" "${directory}"
+flake8 --exclude venv,thirdparty,build,cmake-build-*,github_env,*sparkplug_b_pb2.py --builtins log,REL_SUCCESS,REL_FAILURE,REL_ORIGINAL,raw_input --ignore E501,W503 --per-file-ignores="steps.py:F811" "${directory}"
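The new flake8 exclusion above covers sparkplug_b_pb2.py, which is machine-generated protobuf code rather than hand-written Python. To confirm the remaining Python sources still lint cleanly, the script can be run as before (the directory argument is optional and defaults to the current directory):

    ./run_flake8.sh .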