Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions modules/test/protocol/python/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
# Dependencies to user defined packages
# Package dependencies should always be defined before the user defined
# packages to prevent auto-upgrades of stable dependencies
bacpypes==0.18.7
bacpypes3==0.0.104
colorama==0.4.6

# User defined packages
# Required for BACnet protocol tests
netifaces==0.11.0
BAC0==23.7.3
BAC0==2025.9.15
pytz==2024.2

# Required for Modbus protocol tests
Expand Down
71 changes: 50 additions & 21 deletions modules/test/protocol/python/src/protocol_bacnet.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"""Module to run all the BACnet related methods for testing"""

import BAC0
from dataclasses import dataclass
import logging
import json
from common import util
Expand All @@ -29,15 +30,25 @@
DEFAULT_CAPTURE_FILE = 'protocol.pcap'
DEFAULT_BIN_DIR = '/testrun/bin'


@dataclass
class BACnetDevice:
device_id: str
ip: str


class BACnet():
"""BACnet Test module"""
devices: list[BACnetDevice] = []
bacnet: BAC0.lite

def __init__(self,
log,
device_hw_addr,
captures_dir=DEFAULT_CAPTURES_DIR,
capture_file=DEFAULT_CAPTURE_FILE,
bin_dir=DEFAULT_BIN_DIR,
device_hw_addr=None):
):
# Set the log
global LOGGER
LOGGER = log
Expand All @@ -51,23 +62,30 @@ def __init__(self,
self._capture_file = capture_file
self._bin_dir = bin_dir
self.device_hw_addr = device_hw_addr
self.devices = []
self.bacnet = None
self._bin_dir = bin_dir

def discover(self, local_ip=None):
async def discover(self, local_ip):
LOGGER.info('Performing BACnet discovery...')
self.bacnet = BAC0.lite(local_ip)
LOGGER.info('Local BACnet object: ' + str(self.bacnet))
try:
self.bacnet.discover(global_broadcast=True)
except Exception as e: # pylint: disable=W0718
await self.bacnet._discover(global_broadcast=True) # pylint: disable=protected-access
except Exception as e: # pylint: disable=W0718
LOGGER.error(e)
LOGGER.info('BACnet discovery complete')
with open(BAC0_LOG, 'r', encoding='utf-8') as f:
bac0_log = f.read()
LOGGER.info('BAC0 Log:\n' + bac0_log)
self.devices = self.bacnet.devices
# Extract discovered devices as a BACnetDevice.
self.devices = []
if self.bacnet.discoveredDevices is not None:
for device_info in self.bacnet.discoveredDevices.values():
self.devices.append(
BACnetDevice(
device_id=str(device_info['object_instance'][1]),
ip=str(device_info['address'])
)
)
LOGGER.info('BACnet devices found: ' + str(len(self.devices)))

# Check if the device being tested is in the discovered devices list
Expand All @@ -79,10 +97,9 @@ def validate_device(self):
if len(self.devices) > 0:
result = True
for device in self.devices:
object_id = str(device[3]) # BACnet Object ID
LOGGER.info('Checking device: ' + str(device))
LOGGER.info(f'Checking device: {device.device_id}')
device_valid = self.validate_bacnet_source(
object_id=object_id, device_hw_addr=self.device_hw_addr)
device=device, device_hw_addr=self.device_hw_addr)
if device_valid is not None:
result &= device_valid
description = ('BACnet device discovered' if result else
Expand All @@ -96,13 +113,15 @@ def validate_device(self):
return result, description


def validate_protocol_version(self, device_addr, device_id):
LOGGER.info(f'Resolving protocol version for BACnet device: {device_id}')
def validate_protocol_version(self, device: BACnetDevice) -> tuple[bool, str]:
LOGGER.info(
f'Resolving protocol version for BACnet device: {device.device_id}'
)
try:
version = self.bacnet.read(
f'{device_addr} device {device_id} protocolVersion')
f'{device.ip} device {device.device_id} protocolVersion')
revision = self.bacnet.read(
f'{device_addr} device {device_id} protocolRevision')
f'{device.ip} device {device.device_id} protocolRevision')
protocol_version = f'{version}.{revision}'
result = True
result_description = f'Device uses BACnet version {protocol_version}'
Expand All @@ -115,17 +134,23 @@ def validate_protocol_version(self, device_addr, device_id):

# Validate that all traffic to/from BACnet device from
# discovered object id matches the MAC address of the device
def validate_bacnet_source(self, object_id, device_hw_addr):
def validate_bacnet_source(
self, device: BACnetDevice,
device_hw_addr: str
) -> bool:
try:
LOGGER.info(f'Checking BACnet traffic for object id {object_id}')
LOGGER.info(f'Checking BACnet traffic for object id {device.device_id}')
capture_file = os.path.join(self._captures_dir, self._capture_file)
packets = self.get_bacnet_packets(capture_file, object_id)
packets = self.get_bacnet_packets(capture_file, device)
valid = None
# If no packets are found in protocol.pcap
if not packets:
LOGGER.debug(f'No BACnet packets found for object id {object_id}')
LOGGER.debug(
f'No BACnet packets found for object id {device.device_id}'
)
for packet in packets:
if object_id in packet['_source']['layers']['bacapp.instance_number']:
pakcet_bac = packet['_source']['layers']['bacapp.instance_number']
if device.device_id in pakcet_bac:
if device_hw_addr.lower() in packet['_source']['layers']['eth.src']:
LOGGER.debug('BACnet detected from device')
valid = True if valid is None else valid and True
Expand All @@ -143,9 +168,13 @@ def validate_bacnet_source(self, object_id, device_hw_addr):
LOGGER.error('Error occurred when validating source', exc_info=True)
return False

def get_bacnet_packets(self, capture_file, object_id):
def get_bacnet_packets(
self,
capture_file: str,
device: BACnetDevice
) -> list[dict]:
bin_file = self._bin_dir + '/get_bacnet_packets.sh'
args = f'"{capture_file}" {object_id}'
args = f'"{capture_file}" {device.device_id}'
command = f'{bin_file} {args}'
response = util.run_command(command)
return json.loads(response[0].strip())
15 changes: 9 additions & 6 deletions modules/test/protocol/python/src/protocol_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Protocol test module"""
import asyncio
from test_module import TestModule
import netifaces
from protocol_bacnet import BACnet
Expand All @@ -36,8 +37,7 @@ def _protocol_valid_bacnet(self):
result = None
interface_name = 'veth0'
# If the ipv4 address wasn't resolved yet, try again
if self._device_ipv4_addr is None:
self._device_ipv4_addr = self._get_device_ipv4()
self._device_ipv4_addr = self._get_device_ipv4()

if self._device_ipv4_addr is None:
LOGGER.error('No device IP could be resolved')
Expand All @@ -46,7 +46,12 @@ def _protocol_valid_bacnet(self):
# Resolve the appropriate IP for BACnet comms
local_address = self.get_local_ip(interface_name)
if local_address:
self._bacnet.discover(local_address + '/24')
local_address += '/24'
try:
loop = asyncio.get_running_loop()
loop.run_until_complete(self._bacnet.discover(local_address))
except RuntimeError:
asyncio.run(self._bacnet.discover(local_address))
result = self._bacnet.validate_device()
if result[0]:
self._supports_bacnet = True
Expand All @@ -73,10 +78,8 @@ def _protocol_bacnet_version(self):
if len(self._bacnet.devices) > 0:
for device in self._bacnet.devices:
LOGGER.debug(f'Checking BACnet version for device: {device}')
device_addr = device[2]
device_id = device[3]
result_status, result_description = \
self._bacnet.validate_protocol_version(device_addr,device_id)
self._bacnet.validate_protocol_version(device)
break

LOGGER.info(result_description)
Expand Down
18 changes: 11 additions & 7 deletions testing/unit/protocol/protocol_module_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module run all the DNS related unit tests"""
from protocol_bacnet import BACnet
from protocol_bacnet import BACnet, BACnetDevice
import unittest
import os
import sys
Expand Down Expand Up @@ -52,17 +52,21 @@ def setUpClass(cls):
# Test the BACNet traffic for a matching Object ID and HW address
def bacnet_protocol_traffic_test(self):
LOGGER.info(f'Running { inspect.currentframe().f_code.co_name}')
result = BACNET.validate_bacnet_source(object_id='1761001',
device_hw_addr=HW_ADDR)
result = BACNET.validate_bacnet_source(
BACnetDevice(device_id='1761001', ip='10.10.10.14'),
device_hw_addr=HW_ADDR
)
LOGGER.info(f'Test Result: {result}')
self.assertEqual(result, True)

# Test the BACNet test when Object ID and HW address
# do not match
def bacnet_protocol_traffic_fail_test(self):
LOGGER.info(f'Running { inspect.currentframe().f_code.co_name}')
result = BACNET.validate_bacnet_source(object_id='1761001',
device_hw_addr=HW_ADDR_BAD)
result = BACNET.validate_bacnet_source(
BACnetDevice(device_id='1761001', ip='10.10.10.14'),
device_hw_addr=HW_ADDR_BAD
)
LOGGER.info(f'Test Result: {result}')
self.assertEqual(result, False)

Expand All @@ -71,7 +75,7 @@ def bacnet_protocol_traffic_fail_test(self):
def bacnet_protocol_validate_device_test(self):
LOGGER.info(f'Running { inspect.currentframe().f_code.co_name}')
# Load bacnet devices to simulate a discovery
bac_dev = ('TestDevice', 'Testrun', '10.10.10.14', 1761001)
bac_dev = BACnetDevice(device_id='1761001', ip='10.10.10.14')
BACNET.devices = [bac_dev]
result = BACNET.validate_device()
LOGGER.info(f'Test Result: {result}')
Expand All @@ -82,7 +86,7 @@ def bacnet_protocol_validate_device_test(self):
def bacnet_protocol_validate_device_fail_test(self):
LOGGER.info(f'Running { inspect.currentframe().f_code.co_name}')
# Load bacnet devices to simulate a discovery
bac_dev = ('TestDevice', 'Testrun', '10.10.10.14', 1761001)
bac_dev = BACnetDevice(device_id='1761001', ip='10.10.10.14')
BACNET.devices = [bac_dev]
# Change the MAC address to a different device than expected
BACNET.device_hw_addr = HW_ADDR_BAD
Expand Down
Loading