diff --git a/modules/test/protocol/python/requirements.txt b/modules/test/protocol/python/requirements.txt index 1fe889fe9..4d9dc64b9 100644 --- a/modules/test/protocol/python/requirements.txt +++ b/modules/test/protocol/python/requirements.txt @@ -1,13 +1,13 @@ # Dependencies to user defined packages # Package dependencies should always be defined before the user defined # packages to prevent auto-upgrades of stable dependencies -bacpypes==0.18.7 +bacpypes3==0.0.104 colorama==0.4.6 # User defined packages # Required for BACnet protocol tests netifaces==0.11.0 -BAC0==23.7.3 +BAC0==2025.9.15 pytz==2024.2 # Required for Modbus protocol tests diff --git a/modules/test/protocol/python/src/protocol_bacnet.py b/modules/test/protocol/python/src/protocol_bacnet.py index 3b6d8f0ce..0981b7c89 100644 --- a/modules/test/protocol/python/src/protocol_bacnet.py +++ b/modules/test/protocol/python/src/protocol_bacnet.py @@ -14,6 +14,7 @@ """Module to run all the BACnet related methods for testing""" import BAC0 +from dataclasses import dataclass import logging import json from common import util @@ -29,15 +30,25 @@ DEFAULT_CAPTURE_FILE = 'protocol.pcap' DEFAULT_BIN_DIR = '/testrun/bin' + +@dataclass +class BACnetDevice: + device_id: str + ip: str + + class BACnet(): """BACnet Test module""" + devices: list[BACnetDevice] = [] + bacnet: BAC0.lite def __init__(self, log, + device_hw_addr, captures_dir=DEFAULT_CAPTURES_DIR, capture_file=DEFAULT_CAPTURE_FILE, bin_dir=DEFAULT_BIN_DIR, - device_hw_addr=None): + ): # Set the log global LOGGER LOGGER = log @@ -51,23 +62,30 @@ def __init__(self, self._capture_file = capture_file self._bin_dir = bin_dir self.device_hw_addr = device_hw_addr - self.devices = [] - self.bacnet = None self._bin_dir = bin_dir - def discover(self, local_ip=None): + async def discover(self, local_ip): LOGGER.info('Performing BACnet discovery...') self.bacnet = BAC0.lite(local_ip) LOGGER.info('Local BACnet object: ' + str(self.bacnet)) try: - self.bacnet.discover(global_broadcast=True) - except Exception as e: # pylint: disable=W0718 + await self.bacnet._discover(global_broadcast=True) # pylint: disable=protected-access + except Exception as e: # pylint: disable=W0718 LOGGER.error(e) LOGGER.info('BACnet discovery complete') with open(BAC0_LOG, 'r', encoding='utf-8') as f: bac0_log = f.read() LOGGER.info('BAC0 Log:\n' + bac0_log) - self.devices = self.bacnet.devices + # Extract discovered devices as a BACnetDevice. + self.devices = [] + if self.bacnet.discoveredDevices is not None: + for device_info in self.bacnet.discoveredDevices.values(): + self.devices.append( + BACnetDevice( + device_id=str(device_info['object_instance'][1]), + ip=str(device_info['address']) + ) + ) LOGGER.info('BACnet devices found: ' + str(len(self.devices))) # Check if the device being tested is in the discovered devices list @@ -79,10 +97,9 @@ def validate_device(self): if len(self.devices) > 0: result = True for device in self.devices: - object_id = str(device[3]) # BACnet Object ID - LOGGER.info('Checking device: ' + str(device)) + LOGGER.info(f'Checking device: {device.device_id}') device_valid = self.validate_bacnet_source( - object_id=object_id, device_hw_addr=self.device_hw_addr) + device=device, device_hw_addr=self.device_hw_addr) if device_valid is not None: result &= device_valid description = ('BACnet device discovered' if result else @@ -96,13 +113,15 @@ def validate_device(self): return result, description - def validate_protocol_version(self, device_addr, device_id): - LOGGER.info(f'Resolving protocol version for BACnet device: {device_id}') + def validate_protocol_version(self, device: BACnetDevice) -> tuple[bool, str]: + LOGGER.info( + f'Resolving protocol version for BACnet device: {device.device_id}' + ) try: version = self.bacnet.read( - f'{device_addr} device {device_id} protocolVersion') + f'{device.ip} device {device.device_id} protocolVersion') revision = self.bacnet.read( - f'{device_addr} device {device_id} protocolRevision') + f'{device.ip} device {device.device_id} protocolRevision') protocol_version = f'{version}.{revision}' result = True result_description = f'Device uses BACnet version {protocol_version}' @@ -115,17 +134,23 @@ def validate_protocol_version(self, device_addr, device_id): # Validate that all traffic to/from BACnet device from # discovered object id matches the MAC address of the device - def validate_bacnet_source(self, object_id, device_hw_addr): + def validate_bacnet_source( + self, device: BACnetDevice, + device_hw_addr: str + ) -> bool: try: - LOGGER.info(f'Checking BACnet traffic for object id {object_id}') + LOGGER.info(f'Checking BACnet traffic for object id {device.device_id}') capture_file = os.path.join(self._captures_dir, self._capture_file) - packets = self.get_bacnet_packets(capture_file, object_id) + packets = self.get_bacnet_packets(capture_file, device) valid = None # If no packets are found in protocol.pcap if not packets: - LOGGER.debug(f'No BACnet packets found for object id {object_id}') + LOGGER.debug( + f'No BACnet packets found for object id {device.device_id}' + ) for packet in packets: - if object_id in packet['_source']['layers']['bacapp.instance_number']: + pakcet_bac = packet['_source']['layers']['bacapp.instance_number'] + if device.device_id in pakcet_bac: if device_hw_addr.lower() in packet['_source']['layers']['eth.src']: LOGGER.debug('BACnet detected from device') valid = True if valid is None else valid and True @@ -143,9 +168,13 @@ def validate_bacnet_source(self, object_id, device_hw_addr): LOGGER.error('Error occurred when validating source', exc_info=True) return False - def get_bacnet_packets(self, capture_file, object_id): + def get_bacnet_packets( + self, + capture_file: str, + device: BACnetDevice + ) -> list[dict]: bin_file = self._bin_dir + '/get_bacnet_packets.sh' - args = f'"{capture_file}" {object_id}' + args = f'"{capture_file}" {device.device_id}' command = f'{bin_file} {args}' response = util.run_command(command) return json.loads(response[0].strip()) diff --git a/modules/test/protocol/python/src/protocol_module.py b/modules/test/protocol/python/src/protocol_module.py index 92891465d..646f912d1 100644 --- a/modules/test/protocol/python/src/protocol_module.py +++ b/modules/test/protocol/python/src/protocol_module.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. """Protocol test module""" +import asyncio from test_module import TestModule import netifaces from protocol_bacnet import BACnet @@ -36,8 +37,7 @@ def _protocol_valid_bacnet(self): result = None interface_name = 'veth0' # If the ipv4 address wasn't resolved yet, try again - if self._device_ipv4_addr is None: - self._device_ipv4_addr = self._get_device_ipv4() + self._device_ipv4_addr = self._get_device_ipv4() if self._device_ipv4_addr is None: LOGGER.error('No device IP could be resolved') @@ -46,7 +46,12 @@ def _protocol_valid_bacnet(self): # Resolve the appropriate IP for BACnet comms local_address = self.get_local_ip(interface_name) if local_address: - self._bacnet.discover(local_address + '/24') + local_address += '/24' + try: + loop = asyncio.get_running_loop() + loop.run_until_complete(self._bacnet.discover(local_address)) + except RuntimeError: + asyncio.run(self._bacnet.discover(local_address)) result = self._bacnet.validate_device() if result[0]: self._supports_bacnet = True @@ -73,10 +78,8 @@ def _protocol_bacnet_version(self): if len(self._bacnet.devices) > 0: for device in self._bacnet.devices: LOGGER.debug(f'Checking BACnet version for device: {device}') - device_addr = device[2] - device_id = device[3] result_status, result_description = \ - self._bacnet.validate_protocol_version(device_addr,device_id) + self._bacnet.validate_protocol_version(device) break LOGGER.info(result_description) diff --git a/testing/unit/protocol/protocol_module_test.py b/testing/unit/protocol/protocol_module_test.py index 9d474ab91..063fd9401 100644 --- a/testing/unit/protocol/protocol_module_test.py +++ b/testing/unit/protocol/protocol_module_test.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. """Module run all the DNS related unit tests""" -from protocol_bacnet import BACnet +from protocol_bacnet import BACnet, BACnetDevice import unittest import os import sys @@ -52,8 +52,10 @@ def setUpClass(cls): # Test the BACNet traffic for a matching Object ID and HW address def bacnet_protocol_traffic_test(self): LOGGER.info(f'Running { inspect.currentframe().f_code.co_name}') - result = BACNET.validate_bacnet_source(object_id='1761001', - device_hw_addr=HW_ADDR) + result = BACNET.validate_bacnet_source( + BACnetDevice(device_id='1761001', ip='10.10.10.14'), + device_hw_addr=HW_ADDR + ) LOGGER.info(f'Test Result: {result}') self.assertEqual(result, True) @@ -61,8 +63,10 @@ def bacnet_protocol_traffic_test(self): # do not match def bacnet_protocol_traffic_fail_test(self): LOGGER.info(f'Running { inspect.currentframe().f_code.co_name}') - result = BACNET.validate_bacnet_source(object_id='1761001', - device_hw_addr=HW_ADDR_BAD) + result = BACNET.validate_bacnet_source( + BACnetDevice(device_id='1761001', ip='10.10.10.14'), + device_hw_addr=HW_ADDR_BAD + ) LOGGER.info(f'Test Result: {result}') self.assertEqual(result, False) @@ -71,7 +75,7 @@ def bacnet_protocol_traffic_fail_test(self): def bacnet_protocol_validate_device_test(self): LOGGER.info(f'Running { inspect.currentframe().f_code.co_name}') # Load bacnet devices to simulate a discovery - bac_dev = ('TestDevice', 'Testrun', '10.10.10.14', 1761001) + bac_dev = BACnetDevice(device_id='1761001', ip='10.10.10.14') BACNET.devices = [bac_dev] result = BACNET.validate_device() LOGGER.info(f'Test Result: {result}') @@ -82,7 +86,7 @@ def bacnet_protocol_validate_device_test(self): def bacnet_protocol_validate_device_fail_test(self): LOGGER.info(f'Running { inspect.currentframe().f_code.co_name}') # Load bacnet devices to simulate a discovery - bac_dev = ('TestDevice', 'Testrun', '10.10.10.14', 1761001) + bac_dev = BACnetDevice(device_id='1761001', ip='10.10.10.14') BACNET.devices = [bac_dev] # Change the MAC address to a different device than expected BACNET.device_hw_addr = HW_ADDR_BAD