diff --git a/.gitignore b/.gitignore index 9d6cc2acc..772a5356a 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ error pylint.out __pycache__/ build/ +local/reports/ # Ignore generated files from unit tests testing/unit_test/temp/ diff --git a/cmd/install b/cmd/install index baf0f5469..e7b0ca81a 100755 --- a/cmd/install +++ b/cmd/install @@ -68,7 +68,7 @@ deactivate cmd/build # Create local folders -mkdir -p local/{devices,root_certs,risk_profiles} +mkdir -p local/{devices,root_certs,risk_profiles,reports} # Set file permissions on local # This does not work on GitHub actions diff --git a/docs/dev/mockoon.json b/docs/dev/mockoon.json index d8a753259..b35465c08 100644 --- a/docs/dev/mockoon.json +++ b/docs/dev/mockoon.json @@ -605,7 +605,7 @@ "responses": [ { "uuid": "6adc954a-55c9-40ed-8f49-cf38f659d883", - "body": "[\n {\n \"mac_addr\": \"00:1e:42:35:73:c6\",\n \"device\": {\n \"mac_addr\": \"00:1e:42:35:73:c4\",\n \"manufacturer\": \"Teltonika\",\n \"model\": \"TRB140\",\n \"firmware\": \"1.2.3\",\n \"test_modules\": {\n \"connection\": {\n \"enabled\": false\n },\n \"ntp\": {\n \"enabled\": true\n },\n \"dns\": {\n \"enabled\": true\n },\n \"services\": {\n \"enabled\": true\n },\n \"tls\": {\n \"enabled\": true\n },\n \"protocol\": {\n \"enabled\": true\n }\n }\n },\n \"status\": \"Non-Compliant\",\n \"started\": \"2024-05-03 12:09:59\",\n \"finished\": \"2024-05-03 12:15:51\",\n \"tests\": {\n \"total\": 20,\n \"results\": [\n {\n \"name\": \"protocol.valid_bacnet\",\n \"description\": \"BACnet discovery could not resolve any devices\",\n \"expected_behavior\": \"BACnet traffic can be seen on the network and packets are valid and not malformed\",\n \"required_result\": \"Recommended\",\n \"result\": \"Skipped\"\n },\n {\n \"name\": \"protocol.bacnet.version\",\n \"description\": \"No BACnet devices discovered.\",\n \"expected_behavior\": \"The BACnet client implements an up to date version of BACnet\",\n \"required_result\": \"Recommended\",\n \"result\": \"Skipped\"\n },\n {\n \"name\": \"protocol.valid_modbus\",\n \"description\": \"Failed to establish Modbus connection to device\",\n \"expected_behavior\": \"Any Modbus functionality works as expected and valid Modbus traffic can be observed\",\n \"required_result\": \"Recommended\",\n \"result\": \"Non-Compliant\"\n },\n {\n \"name\": \"ntp.network.ntp_support\",\n \"description\": \"Device sent NTPv3 packets. NTPv3 is not allowed.\",\n \"expected_behavior\": \"The device sends an NTPv4 request to the configured NTP server.\",\n \"required_result\": \"Required\",\n \"result\": \"Non-Compliant\"\n },\n {\n \"name\": \"ntp.network.ntp_dhcp\",\n \"description\": \"Device sent NTP request to non-DHCP provided server\",\n \"expected_behavior\": \"Device can accept NTP server address, provided by the DHCP server (DHCP OFFER PACKET)\",\n \"required_result\": \"Roadmap\",\n \"result\": \"Non-Compliant\"\n },\n {\n \"name\": \"security.services.ftp\",\n \"description\": \"No FTP server found\",\n \"expected_behavior\": \"There is no FTP service running on any port\",\n \"required_result\": \"Required\",\n \"result\": \"Compliant\"\n },\n {\n \"name\": \"security.ssh.version\",\n \"description\": \"SSH server found running protocol 2.0\",\n \"expected_behavior\": \"SSH server is not running or server is SSHv2\",\n \"required_result\": \"Required\",\n \"result\": \"Compliant\"\n },\n {\n \"name\": \"security.services.telnet\",\n \"description\": \"No telnet server found\",\n \"expected_behavior\": \"There is no Telnet service running on any port\",\n \"required_result\": \"Required\",\n \"result\": \"Compliant\"\n },\n {\n \"name\": \"security.services.smtp\",\n \"description\": \"No SMTP server found\",\n \"expected_behavior\": \"There is no SMTP service running on any port\",\n \"required_result\": \"Required\",\n \"result\": \"Compliant\"\n },\n {\n \"name\": \"security.services.http\",\n \"description\": \"Found HTTP server running on port 80/tcp\",\n \"expected_behavior\": \"Device is unreachable on port 80 (or any other port) and only responds to HTTPS requests on port 443 (or any other port if HTTP is used at all)\",\n \"required_result\": \"Required\",\n \"result\": \"Non-Compliant\"\n },\n {\n \"name\": \"security.services.pop\",\n \"description\": \"No POP server found\",\n \"expected_behavior\": \"There is no POP service running on any port\",\n \"required_result\": \"Required\",\n \"result\": \"Compliant\"\n },\n {\n \"name\": \"security.services.imap\",\n \"description\": \"No IMAP server found\",\n \"expected_behavior\": \"There is no IMAP service running on any port\",\n \"required_result\": \"Required\",\n \"result\": \"Compliant\"\n },\n {\n \"name\": \"security.services.snmpv3\",\n \"description\": \"No SNMP server found\",\n \"expected_behavior\": \"Device is unreachable on port 161 (or any other port) and device is unreachable on port 162 (or any other port) unless SNMP is essential in which case it is SNMPv3 is used.\",\n \"required_result\": \"Required\",\n \"result\": \"Compliant\"\n },\n {\n \"name\": \"security.services.vnc\",\n \"description\": \"No VNC server found\",\n \"expected_behavior\": \"Device cannot be accessed / connected to via VNC on any port\",\n \"required_result\": \"Required\",\n \"result\": \"Compliant\"\n },\n {\n \"name\": \"security.services.tftp\",\n \"description\": \"No TFTP server found\",\n \"expected_behavior\": \"There is no TFTP service running on any port\",\n \"required_result\": \"Required\",\n \"result\": \"Compliant\"\n },\n {\n \"name\": \"ntp.network.ntp_server\",\n \"description\": \"No NTP server found\",\n \"expected_behavior\": \"The device does not respond to NTP requests when it's IP is set as the NTP server on another device\",\n \"required_result\": \"Required\",\n \"result\": \"Compliant\"\n },\n {\n \"name\": \"dns.network.hostname_resolution\",\n \"description\": \"DNS traffic detected from device\",\n \"expected_behavior\": \"The device sends DNS requests.\",\n \"required_result\": \"Required\",\n \"result\": \"Compliant\"\n },\n {\n \"name\": \"dns.network.from_dhcp\",\n \"description\": \"DNS traffic detected only to DHCP provided server\",\n \"expected_behavior\": \"The device sends DNS requests to the DNS server provided by the DHCP server\",\n \"required_result\": \"Roadmap\",\n \"result\": \"Compliant\"\n },\n {\n \"name\": \"security.tls.v1_2_server\",\n \"description\": \"TLS 1.2 not validated: Certificate has expired\\nEC key length passed: 256 >= 224\\nDevice certificate has not been signed\\nTLS 1.3 not validated: Certificate has expired\\nEC key length passed: 256 >= 224\\nDevice certificate has not been signed\",\n \"expected_behavior\": \"TLS 1.2 certificate is issued to the web browser client when accessed\",\n \"required_result\": \"Required\",\n \"result\": \"Non-Compliant\"\n },\n {\n \"name\": \"security.tls.v1_2_client\",\n \"description\": \"No outbound TLS connections were found.\",\n \"expected_behavior\": \"The packet indicates a TLS connection with at least TLS 1.2 and support for ECDH and ECDSA ciphers\",\n \"required_result\": \"Required\",\n \"result\": \"Skipped\"\n }\n ]\n },\n \"report\": \"http://localhost:8000/report/123 123/2024-05-03T12:09:59\",\n \"export\": \"http://localhost:8000/export/123 123/2024-05-03T12:09:59\"\n \n }\n]", + "body": "[\n {\n \"testrun\": {\n \"version\": \"2.2.2\"\n },\n \"device\": {\n \"manufacturer\": \"vagrant\",\n \"model\": \"vm-compliant\",\n \"mac_addr\": \"f0:d4:e2:f2:f5:41\",\n \"firmware\": \"123\",\n \"test_pack\": \"Device Qualification\"\n },\n \"status\": \"Complete\",\n \"result\": \"Non-Compliant\",\n \"started\": \"2026-02-02 17:24:52\",\n \"finished\": \"2026-02-02 17:34:58\",\n \"report\": \"report/f0d4e2f2f541_2026-02-02T17:24:52\",\n \"export\": \"export/f0d4e2f2f541_2026-02-02T17:24:52\",\n \"folder_name\": \"f0d4e2f2f541_2026-02-02T17:24:52\",\n \"delete\": \"report/f0d4e2f2f541_2026-02-02T17:24:52\"\n }\n]", "latency": 0, "statusCode": 200, "label": "", @@ -824,7 +824,7 @@ "type": "http", "documentation": "Delete a Testrun report", "method": "delete", - "endpoint": "report", + "endpoint": "report/{folder_name}", "responses": [ { "uuid": "35b1585b-13d0-4702-9733-9e2f7fc08ab9", @@ -910,7 +910,7 @@ "type": "http", "documentation": "Get a Testrun PDF report", "method": "get", - "endpoint": "report/{device_name}/{timestamp}", + "endpoint": "report/{folder_name}", "responses": [ { "uuid": "09092f2d-907b-452c-b47b-b2f3b9b47189", @@ -1220,52 +1220,52 @@ "responseMode": null }, { - "uuid": "6d6b9fa9-c961-4007-89f3-9ea9cccc05e7", - "type": "http", - "documentation": "List root CA certificates", - "method": "get", - "endpoint": "system/config/certs", - "responses": [ - { - "uuid": "581b126d-fadd-4461-bece-97b5b1fd97fd", - "body": "[\n {\n \"name\": \"iot.bms.google.com\",\n \"organisation\": \"Google, Inc.\",\n \"expires\": \"2024-09-01T09:00:12Z\",\n \"status\": \"Valid\",\n \"type\": \"root\"\n },\n {\n \"name\": \"sensor.bms.google.com\",\n \"organisation\": \"Google, Inc.\",\n \"expires\": \"2022-09-01T09:00:12Z\",\n \"status\": \"Expired\",\n \"type\": \"intermediate\"\n }\n]", - "latency": 0, - "statusCode": 200, - "label": "Listing 2x certificates", - "headers": [], - "bodyType": "INLINE", - "filePath": "", - "databucketID": "", - "sendFileAsBody": false, - "rules": [], - "rulesOperator": "OR", - "disableTemplating": false, - "fallbackTo404": false, - "default": true, - "crudKey": "id", - "callbacks": [] + "uuid": "6d6b9fa9-c961-4007-89f3-9ea9cccc05e7", + "type": "http", + "documentation": "List root CA certificates", + "method": "get", + "endpoint": "system/config/certs", + "responses": [ + { + "uuid": "581b126d-fadd-4461-bece-97b5b1fd97fd", + "body": "[\n {\n \"name\": \"iot.bms.google.com\",\n \"organisation\": \"Google, Inc.\",\n \"expires\": \"2024-09-01T09:00:12Z\",\n \"status\": \"Valid\",\n \"type\": \"root\"\n },\n {\n \"name\": \"sensor.bms.google.com\",\n \"organisation\": \"Google, Inc.\",\n \"expires\": \"2022-09-01T09:00:12Z\",\n \"status\": \"Expired\",\n \"type\": \"intermediate\"\n }\n]", + "latency": 0, + "statusCode": 200, + "label": "Listing 2x certificates", + "headers": [], + "bodyType": "INLINE", + "filePath": "", + "databucketID": "", + "sendFileAsBody": false, + "rules": [], + "rulesOperator": "OR", + "disableTemplating": false, + "fallbackTo404": false, + "default": true, + "crudKey": "id", + "callbacks": [] }, { - "uuid": "b9fa58e6-95c8-45ea-b6fb-a16287a6e65b", - "body": "[]", - "latency": 0, - "statusCode": 200, - "label": "No certificates", - "headers": [], - "bodyType": "INLINE", - "filePath": "", - "databucketID": "", - "sendFileAsBody": false, - "rules": [], - "rulesOperator": "OR", - "disableTemplating": false, - "fallbackTo404": false, - "default": false, - "crudKey": "id", - "callbacks": [] + "uuid": "b9fa58e6-95c8-45ea-b6fb-a16287a6e65b", + "body": "[]", + "latency": 0, + "statusCode": 200, + "label": "No certificates", + "headers": [], + "bodyType": "INLINE", + "filePath": "", + "databucketID": "", + "sendFileAsBody": false, + "rules": [], + "rulesOperator": "OR", + "disableTemplating": false, + "fallbackTo404": false, + "default": false, + "crudKey": "id", + "callbacks": [] } - ], - "responseMode": null + ], + "responseMode": null }, { "uuid": "fb33213d-4448-431c-8733-1ce292644af6", @@ -1602,111 +1602,6 @@ } ], "responseMode": null - }, - { - "uuid": "af7fdcb0-721d-4198-a8ef-c6d8c4eba8c8", - "type": "http", - "documentation": "Get a Testrun PDF profile", - "method": "post", - "endpoint": "report/{profile_name}", - "responses": [ - { - "uuid": "9a759f46-4bc4-433a-be86-e456f069c217", - "body": "", - "latency": 0, - "statusCode": 200, - "label": "Profile found - no device selected", - "headers": [], - "bodyType": "FILE", - "filePath": "", - "databucketID": "", - "sendFileAsBody": false, - "rules": [], - "rulesOperator": "OR", - "disableTemplating": false, - "fallbackTo404": false, - "default": true, - "crudKey": "id", - "callbacks": [] - }, - { - "uuid": "c9a09ae7-3158-4956-93ac-4c8a90dfced8", - "body": "", - "latency": 0, - "statusCode": 200, - "label": "Profile found - device selected ", - "headers": [], - "bodyType": "FILE", - "filePath": "", - "databucketID": "", - "sendFileAsBody": false, - "rules": [], - "rulesOperator": "OR", - "disableTemplating": false, - "fallbackTo404": false, - "default": false, - "crudKey": "id", - "callbacks": [] - }, - { - "uuid": "5f98471e-15b6-47a4-a68d-e98c3a538b40", - "body": "{\n \"error\": \"Profile could not be found\"\n}", - "latency": 0, - "statusCode": 404, - "label": "Profile not found", - "headers": [], - "bodyType": "INLINE", - "filePath": "", - "databucketID": "", - "sendFileAsBody": false, - "rules": [], - "rulesOperator": "OR", - "disableTemplating": false, - "fallbackTo404": false, - "default": false, - "crudKey": "id", - "callbacks": [] - }, - { - "uuid": "767d9e78-386e-4bf7-bec8-71a005efdce9", - "body": "{\n \"error\": \"A device with that mac address could not be found\"\n}", - "latency": 0, - "statusCode": 404, - "label": "Device not found", - "headers": [], - "bodyType": "INLINE", - "filePath": "", - "databucketID": "", - "sendFileAsBody": false, - "rules": [], - "rulesOperator": "OR", - "disableTemplating": false, - "fallbackTo404": false, - "default": false, - "crudKey": "id", - "callbacks": [] - }, - { - "uuid": "5d76bea0-39c1-45f2-80f1-de6f770cb999", - "body": "{\n \"error\": \"Error retrieving the profile PDF\"\n}", - "latency": 0, - "statusCode": 500, - "label": "Error occurred", - "headers": [], - "bodyType": "INLINE", - "filePath": "", - "databucketID": "", - "sendFileAsBody": false, - "rules": [], - "rulesOperator": "OR", - "disableTemplating": false, - "fallbackTo404": false, - "default": false, - "crudKey": "id", - "callbacks": [] - } - ], - "responseMode": null } ], "rootChildren": [ @@ -1805,10 +1700,6 @@ { "type": "route", "uuid": "26f0f76f-e787-4ebe-a3f8-ea3a6004bc15" - }, - { - "type": "route", - "uuid": "af7fdcb0-721d-4198-a8ef-c6d8c4eba8c8" } ], "proxyMode": false, diff --git a/framework/python/src/api/api.py b/framework/python/src/api/api.py index 56f93c781..f0910eb02 100644 --- a/framework/python/src/api/api.py +++ b/framework/python/src/api/api.py @@ -16,7 +16,6 @@ from fastapi.responses import FileResponse from fastapi.middleware.cors import CORSMiddleware import asyncio -from datetime import datetime import json from json import JSONDecodeError import os @@ -43,6 +42,7 @@ DEVICE_ADDITIONAL_INFO_KEY = "additional_info" DEVICES_PATH = "local/devices" +REPORTS_PATH = "local/reports" PROFILES_PATH = "local/risk_profiles" RESOURCES_PATH = "resources" @@ -96,12 +96,12 @@ def __init__(self, testrun): # Report endpoints self._router.add_api_route("/reports", self.get_reports) - self._router.add_api_route("/report", + self._router.add_api_route("/report/{report_name}", self.delete_report, methods=["DELETE"]) - self._router.add_api_route("/report/{device_name}/{timestamp}", + self._router.add_api_route("/report/{report_name}", self.get_report) - self._router.add_api_route("/export/{device_name}/{timestamp}", + self._router.add_api_route("/export/{report_name}", self.get_results, methods=["POST"]) @@ -441,66 +441,42 @@ async def get_version(self, response: Response): LOGGER.debug(e) return json_response - async def get_reports(self, request: Request): + async def get_reports(self): LOGGER.debug("Received reports list request") # Resolve the server IP from the request so we # can fix the report URL reports = self._session.get_all_reports() for report in reports: - # report URL is currently hard coded as localhost so we can - # replace that to fix the IP dynamically from the requester - report["report"] = report["report"].replace( - "localhost", request.client.host) - report["export"] = report["report"].replace("report", "export") + del report["tests"] + report["device"] = { + "manufacturer": report["device"]["manufacturer"], + "model": report["device"]["model"], + "mac_addr": report["device"]["mac_addr"], + "firmware": report["device"]["firmware"], + "test_pack": report["device"]["test_pack"], + } + report["delete"] = report["report"] return reports - async def delete_report(self, request: Request, response: Response): + async def delete_report(self, response: Response, report_name: str): - body_raw = (await request.body()).decode("UTF-8") - - if len(body_raw) == 0: - response.status_code = 400 - return self._generate_msg(False, "Invalid request received, missing body") - - try: - body_json = json.loads(body_raw) - except JSONDecodeError as e: - LOGGER.error("An error occurred whilst decoding JSON") - LOGGER.debug(e) - response.status_code = status.HTTP_400_BAD_REQUEST - return self._generate_msg(False, "Invalid request received") - - if "mac_addr" not in body_json or "timestamp" not in body_json: - response.status_code = 400 - return self._generate_msg(False, "Missing mac address or timestamp") - - mac_addr = body_json.get("mac_addr").lower() - timestamp = body_json.get("timestamp") - - try: - parsed_timestamp = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S") - timestamp_formatted = parsed_timestamp.strftime("%Y-%m-%dT%H:%M:%S") - - except ValueError: - response.status_code = 400 - return self._generate_msg(False, "Incorrect timestamp format") - - # Get device from MAC address - device = self._session.get_device(mac_addr) + mac = report_name.split("_")[0] + device = self._session.get_device_by_mac_addr(mac) + # If the device not found if device is None: + LOGGER.info("Device not found, returning 404") response.status_code = 404 - return self._generate_msg(False, "Could not find device") - - # Assign the reports folder path from testrun - reports_folder = self._testrun.get_reports_folder(device) + return self._generate_msg(False, "Device not found") - # Check if reports folder exists - if not os.path.exists(reports_folder): + report = device.get_report_by_folder_name(report_name) + LOGGER.debug(f"Looking for report with name {report_name}") + if not report: + LOGGER.info("Report could not be found, returning 404") response.status_code = 404 return self._generate_msg(False, "Report not found") - if self._testrun.delete_report(device, timestamp_formatted): + if self._testrun.delete_report(device, report): return self._generate_msg(True, "Deleted report") response.status_code = 500 @@ -702,8 +678,10 @@ async def edit_device(self, request: Request, response: Response): response.status_code = status.HTTP_400_BAD_REQUEST return self._generate_msg(False, "Invalid JSON received") - async def get_report(self, response: Response, device_name, timestamp): - device = self._session.get_device_by_name(device_name) + async def get_report(self, response: Response, report_name): + """Serve report pdf file for a given report name""" + mac = report_name.split("_")[0] + device = self._session.get_device_by_mac_addr(mac) # If the device not found if device is None: @@ -711,23 +689,19 @@ async def get_report(self, response: Response, device_name, timestamp): response.status_code = 404 return self._generate_msg(False, "Device not found") + report = device.get_report_by_folder_name(report_name) + LOGGER.debug(f"Looking for report with name {report_name}") + if not report: + LOGGER.info("Report could not be found, returning 404") + response.status_code = 404 + return self._generate_msg(False, "Report could not be found") + + # Regenerate the pdf if the device profile has been updated - self._get_testrun().get_test_orc().regenerate_pdf(device, timestamp) - - # 1.3 file path - file_path = os.path.join( - DEVICES_PATH, - device_name, - "reports", - timestamp,"test", - device.mac_addr.replace(":",""), - "report.pdf") - if not os.path.isfile(file_path): - # pre 1.3 file path - file_path = os.path.join(DEVICES_PATH, device_name, "reports", timestamp, - "report.pdf") - - LOGGER.debug(f"Received get report request for {device_name} / {timestamp}") + test_orc = self._get_testrun().get_test_orc() + test_path = test_orc.regenerate_pdf(device, report) + file_path = os.path.join(test_path, "report.pdf") + LOGGER.debug(f"Received get report request for {device.model}") if os.path.isfile(file_path): return FileResponse(file_path) else: @@ -735,10 +709,13 @@ async def get_report(self, response: Response, device_name, timestamp): response.status_code = 404 return self._generate_msg(False, "Report could not be found") - async def get_results(self, request: Request, response: Response, device_name, - timestamp): - LOGGER.debug("Received get results " + - f"request for {device_name} / {timestamp}") + async def get_results( + self, + request: Request, + response: Response, + report_name: str + ): + LOGGER.debug(f"Received get results request for {report_name}") profile = None @@ -761,32 +738,21 @@ async def get_results(self, request: Request, response: Response, device_name, pass # Check if device exists - device = self.get_session().get_device_by_name(device_name) + mac = report_name.split("_")[0] + device = self._session.get_device_by_mac_addr(mac) if device is None: response.status_code = status.HTTP_404_NOT_FOUND return self._generate_msg(False, - "A device with that name could not be found") - - # Check if report exists (1.3 file path) - report_file_path = os.path.join( - DEVICES_PATH, - device_name, - "reports", - timestamp,"test", - device.mac_addr.replace(":","")) - - if not os.path.isdir(report_file_path): - # pre 1.3 file path - report_file_path = os.path.join(DEVICES_PATH, device_name, "reports", - timestamp) - - if not os.path.isdir(report_file_path): + "Device not found") + report = device.get_report_by_folder_name(report_name) + LOGGER.debug(f"Looking for report with name {report_name}") + if not report: LOGGER.info("Report could not be found, returning 404") response.status_code = 404 return self._generate_msg(False, "Report could not be found") zip_file_path = self._get_testrun().get_test_orc().zip_results( - device, timestamp, profile) + device, report, profile) if zip_file_path is None: response.status_code = status.HTTP_500_INTERNAL_SERVER_ERROR diff --git a/framework/python/src/common/device.py b/framework/python/src/common/device.py index 0b2ea3162..b86dfebe0 100644 --- a/framework/python/src/common/device.py +++ b/framework/python/src/common/device.py @@ -14,11 +14,16 @@ """Track device object information.""" +import json +import os from typing import List, Dict from dataclasses import dataclass, field from common.testreport import TestReport from datetime import datetime +_LOCAL_DEVICES_DIR = 'local/devices' +_DEVICE_CONFIG_FILE = 'device_config.json' + @dataclass class Device(): """Represents a physical device and it's configuration.""" @@ -47,17 +52,36 @@ class Device(): def add_report(self, report): self.reports.append(report) + def get_report_by_folder_name(self, folder_name: str) -> TestReport | None: + for report in self.reports: + report_folder_name = report.get_folder_name() + if report_folder_name == folder_name: + return report + if report_folder_name is None or report_folder_name == '': + if report.get_report_url().split('/')[-1] == folder_name: + report.set_report_url(folder_name) + return report + return None + def get_reports(self): return self.reports + def sort_reports(self): + self.reports.sort(key=lambda r: r.get_started() or datetime.min) + def clear_reports(self): self.reports = [] - def remove_report(self, timestamp: datetime): + def remove_report(self, report: TestReport): + if report in self.reports: + self.reports.remove(report) + report.delete_folder() + self.export_config_json() + + def remove_reports(self): for report in self.reports: - if report.get_started().strftime('%Y-%m-%dT%H:%M:%S') == timestamp: - self.reports.remove(report) - return + report.delete_folder() + self.clear_reports() def to_dict(self): """Returns the device as a python dictionary. This is used for the @@ -78,6 +102,8 @@ def to_dict(self): device_json['firmware'] = self.firmware device_json['test_modules'] = self.test_modules + device_json['reports'] = [ + report.to_json() for report in self.reports] if self.reports else [] return device_json def to_config_json(self): @@ -94,9 +120,23 @@ def to_config_json(self): device_json['additional_info'] = self.additional_info device_json['created_at'] = self.created_at.isoformat() device_json['modified_at'] = self.modified_at.isoformat() + device_json['reports'] = [ + report.to_json() for report in self.reports] if self.reports else [] return device_json + def export_config_json(self): + """Exports the device config as a json file to the specified path.""" + # Locate parent directory + current_dir = os.path.dirname(os.path.realpath(__file__)) + root_dir = os.path.dirname( + os.path.dirname(os.path.dirname(os.path.dirname(current_dir)))) + config_file_path = os.path.join(root_dir, _LOCAL_DEVICES_DIR, + self.device_folder, _DEVICE_CONFIG_FILE) + + with open(config_file_path, 'w+', encoding='utf-8') as config_file: + config_file.writelines(json.dumps(self.to_config_json(), indent=4)) + def __post_init__(self): # Store initial values after creation for f in self.__dataclass_fields__: diff --git a/framework/python/src/common/testreport.py b/framework/python/src/common/testreport.py index 8f2c998c5..6cfeef156 100644 --- a/framework/python/src/common/testreport.py +++ b/framework/python/src/common/testreport.py @@ -14,6 +14,7 @@ """Store previous Testrun information.""" from datetime import datetime +import shutil from weasyprint import HTML from io import BytesIO from common import util, logger @@ -39,6 +40,11 @@ RESULTS_SPACE_FIRST_PAGE = 440 RESULTS_SPACE = 800 +_REPORTS_FOLDER = 'local/reports' +_CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) +_ROOT_DIR = os.path.dirname( + os.path.dirname(os.path.dirname(os.path.dirname(_CURRENT_DIR)))) + LOGGER = logger.get_logger('REPORT') @@ -76,10 +82,16 @@ def __init__(self, self._report_url = '' self._export_url = '' self._cur_page = 0 + self._folder_name = '' def update_device_profile(self, additional_info): self._device['device_profile'] = additional_info + def update_device_info(self, device): + self._device['manufacturer'] = device.manufacturer + self._device['model'] = device.model + self._device['device_profile'] = device.additional_info + def add_module_reports(self, module_reports): self._module_reports = module_reports @@ -108,8 +120,9 @@ def get_duration(self): def add_test(self, test): self._results.append(test) - def set_report_url(self, url): - self._report_url = url + def set_report_url(self, folder_name: str): + self._folder_name = folder_name + self._report_url = f'/report/{folder_name}' def get_report_url(self): return self._report_url @@ -117,6 +130,24 @@ def get_report_url(self): def get_export_url(self): return self._export_url + def set_export_url(self, folder_name: str): + self._export_url = f'/export/{folder_name}' + + def get_folder_name(self) -> str: + return self._folder_name + + def delete_folder(self): + # Delete report from disk + if self._folder_name: + report_path = os.path.join(_ROOT_DIR, _REPORTS_FOLDER, self._folder_name) + if os.path.exists(report_path): + try: + shutil.rmtree(report_path) + except FileNotFoundError: + LOGGER.error( + f'Report folder not found for deletion: {report_path}' + ) + def set_mac_addr(self, mac_addr): self._mac_addr = mac_addr @@ -165,6 +196,8 @@ def to_json(self): report_json['tests'] = {'total': self._total_tests, 'results': test_results} report_json['report'] = self._report_url + report_json['export'] = self._export_url + report_json['folder_name'] = self._folder_name return report_json def from_json(self, json_file): @@ -209,6 +242,8 @@ def from_json(self, json_file): self._report_url = json_file['report'] if 'export' in json_file: self._export_url = json_file['export'] + if 'folder_name' in json_file: + self._folder_name = json_file['folder_name'] self._total_tests = json_file['tests']['total'] @@ -244,6 +279,12 @@ def to_pdf(self): HTML(string=report_html).write_pdf(pdf_bytes) return pdf_bytes + def to_pdf_from_html(self, html_content): + # Convert HTML to PDF in memory using weasyprint + pdf_bytes = BytesIO() + HTML(string=html_content).write_pdf(pdf_bytes) + return pdf_bytes + def to_html(self): # Obtain test pack diff --git a/framework/python/src/core/session.py b/framework/python/src/core/session.py index 80f5db7ce..a01efcd9c 100644 --- a/framework/python/src/core/session.py +++ b/framework/python/src/core/session.py @@ -21,6 +21,7 @@ from common import util, logger, mqtt from common.risk_profile import RiskProfile from common.statuses import TestrunStatus, TestResult, TestrunResult +from common.device import Device from net_orc.ip_control import IPControl # Certificate dependencies @@ -383,6 +384,12 @@ def get_device_by_make_and_model(self, make, model): if device.manufacturer == make and device.model == model: return device + def get_device_by_mac_addr(self, mac_addr_simmplified: str) -> Device | None: + for device in self._device_repository: + if (device.mac_addr is not None + and device.mac_addr.replace(':', '') == mac_addr_simmplified): + return device + def get_device_repository(self): return self._device_repository @@ -517,8 +524,9 @@ def get_all_reports(self): for device in self.get_device_repository(): device_reports = device.get_reports() - for device_report in device_reports: - reports.append(device_report.to_json()) + reports.extend( + [device_report.to_json() for device_report in device_reports] + ) return sorted(reports, key=lambda report: report['started'], reverse=True) def add_total_tests(self, no_tests): diff --git a/framework/python/src/core/testrun.py b/framework/python/src/core/testrun.py index 2e81aa1c1..262713631 100644 --- a/framework/python/src/core/testrun.py +++ b/framework/python/src/core/testrun.py @@ -51,9 +51,13 @@ DEVICE_TECHNOLOGY_KEY = 'technology' DEVICE_TEST_PACK_KEY = 'test_pack' DEVICE_ADDITIONAL_INFO_KEY = 'additional_info' +DEVICE_REPORT_NAME_FORMAT = '{mac_addr}_{timestamp}' MAX_DEVICE_REPORTS_KEY = 'max_device_reports' +OLD_REPORTS_FOLDER = 'local/devices/{device_folder}/reports' +REPORTS_FOLDER = 'local/reports' + class Testrun: # pylint: disable=too-few-public-methods """Testrun controller. @@ -171,6 +175,45 @@ def load_all_devices(self): # self._load_devices(device_dir=RESOURCE_DEVICES_DIR) return self.get_session().get_device_repository() + + def _copy_existing_reports(self, device: Device): + old_reports = self._load_test_reports(device) + device.clear_reports() + if old_reports: + for report in old_reports: + timestamp = report.get_started().strftime('%Y-%m-%dT%H:%M:%S') + report_path = os.path.join(self.get_reports_folder(device), timestamp) + if os.path.exists(report_path) and os.path.isdir(report_path): + new_report_folder_name = DEVICE_REPORT_NAME_FORMAT.format( + mac_addr=device.mac_addr.replace(':', ''), + timestamp=timestamp + ) + new_report_path = os.path.join( + self.get_common_reports_folder(), new_report_folder_name + ) + try: + shutil.copytree( + report_path, + os.path.join(self.get_common_reports_folder(), new_report_path) + ) + except (FileExistsError, shutil.Error) as e: + LOGGER.error(f'Error occurred while copying report: {e}') + report.set_report_url(new_report_folder_name) + report.set_export_url(new_report_folder_name) + device.add_report(report) + self.save_device(device) + try: + shutil.rmtree( + OLD_REPORTS_FOLDER.format(device_folder=device.device_folder) + ) + except FileNotFoundError: + LOGGER.error( + f'Old reports folder not found for device {device.model}' + ) + else: + LOGGER.info('No existing reports to copy') + + def _load_devices(self, device_dir): LOGGER.debug('Loading devices from ' + device_dir) @@ -203,7 +246,7 @@ def _load_devices(self, device_dir): device_model = device_config_json.get(DEVICE_MODEL) mac_addr = device_config_json.get(DEVICE_MAC_ADDR) test_modules = device_config_json.get(DEVICE_TEST_MODULES) - + reports = device_config_json.get('reports', []) # Load max device reports max_device_reports = None if 'max_device_reports' in device_config_json: @@ -211,13 +254,22 @@ def _load_devices(self, device_dir): folder_url = os.path.join(device_dir, device_folder) + device_reports = [] + if reports: + for report in reports: + test_report = TestReport() + test_report.from_json(report) + device_reports.append(test_report) + device = Device(folder_url=folder_url, manufacturer=device_manufacturer, model=device_model, mac_addr=mac_addr, test_modules=test_modules, max_device_reports=max_device_reports, - device_folder=device_folder) + device_folder=device_folder, + reports=device_reports + ) # Load in the additional fields if DEVICE_TYPE_KEY in device_config_json: @@ -238,7 +290,10 @@ def _load_devices(self, device_dir): 'Device is outdated and requires further configuration') device.status = 'Invalid' - self._load_test_reports(device) + if not device.get_reports(): + self._copy_existing_reports(device) + + # self._load_test_reports(device) # Add device to device repository self.get_session().add_device(device) @@ -252,7 +307,7 @@ def _load_test_reports(self, device): # Remove the existing reports in memory device.clear_reports() - + reports = [] # Locate reports folder reports_folder = self.get_reports_folder(device) @@ -288,27 +343,24 @@ def _load_test_reports(self, device): test_report.from_json(report_json) test_report.set_mac_addr(device.mac_addr) device.add_report(test_report) + reports.append(test_report) + return reports def get_reports_folder(self, device): """Return the reports folder path for the device""" return os.path.join(self._root_dir, LOCAL_DEVICES_DIR, device.device_folder, 'reports') - def delete_report(self, device: Device, timestamp): - LOGGER.debug(f'Deleting test report for device {device.model} ' + - f'at {timestamp}') - - # Locate reports folder - reports_folder = self.get_reports_folder(device) + def get_common_reports_folder(self): + """Return the common reports folder path for all devices""" + return os.path.join(self._root_dir, REPORTS_FOLDER) - for report_folder in os.listdir(reports_folder): - if report_folder == timestamp: - shutil.rmtree(os.path.join(reports_folder, report_folder)) - device.remove_report(timestamp) - LOGGER.debug('Successfully deleted the report') - return True + def delete_report(self, device: Device, report: TestReport) -> bool: + LOGGER.debug(f'Deleting test report for device {device.model} ' + + f'at {report.get_folder_name()}') - return False + device.remove_report(report) + return True def create_device(self, device: Device): @@ -353,6 +405,9 @@ def delete_device(self, device: Device): device_folder = os.path.join(self._root_dir, LOCAL_DEVICES_DIR, device.device_folder) + # Remove device reports + device.remove_reports() + # Delete the device directory shutil.rmtree(device_folder) diff --git a/framework/python/src/test_orc/test_orchestrator.py b/framework/python/src/test_orc/test_orchestrator.py index 7da1d216d..eef767a0f 100644 --- a/framework/python/src/test_orc/test_orchestrator.py +++ b/framework/python/src/test_orc/test_orchestrator.py @@ -20,15 +20,17 @@ import time import shutil import docker -from datetime import datetime -from common import logger, util +from common import logger, util, risk_profile from common.testreport import TestReport from common.statuses import TestrunStatus, TestrunResult, TestResult +from common.device import Device +from core.testrun import REPORTS_FOLDER, DEVICE_REPORT_NAME_FORMAT from core.docker.test_docker_module import TestModule from test_orc.test_case import TestCase from test_orc.test_pack import TestPack import threading from typing import List +from bs4 import BeautifulSoup LOG_NAME = "test_orc" LOGGER = logger.get_logger("test_orc") @@ -45,7 +47,7 @@ MODULE_CONFIG = "conf/module_config.json" SAVED_DEVICE_REPORTS = "report/{device_folder}/" -LOCAL_DEVICE_REPORTS = "local/devices/{device_folder}/reports" +LOCAL_DEVICE_REPORTS = "local/reports" DEVICE_ROOT_CERTS = "local/root_certs" LOG_REGEX = r"^[A-Z][a-z]{2} [0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2} test_" @@ -181,12 +183,9 @@ def run_test_modules(self): report.from_json(generated_report_json) report.add_module_reports(self.get_session().get_module_reports()) report.add_module_templates(self.get_session().get_module_templates()) - device.add_report(report) self._write_reports(report) self._test_in_progress = False - self.get_session().set_report_url(report.get_report_url()) - self.get_session().set_export_url(report.get_export_url()) # Set testing description test_pack: TestPack = self.get_test_pack(device.test_pack) @@ -199,19 +198,26 @@ def run_test_modules(self): elif report.get_result() == TestrunResult.NON_COMPLIANT: message = test_pack.get_message("non_compliant_description") + # Move testing output from runtime to local device folder + report_folder_name = self._copy_report_to_common_folder(device) + report.set_report_url(report_folder_name) + report.set_export_url(report_folder_name) + + self.get_session().set_report_url(report.get_report_url()) + self.get_session().set_export_url(report.get_export_url()) + device.add_report(report) + self.get_session().set_description(message) # Set result and status at the end self.get_session().set_result(report.get_result()) self.get_session().set_status(report.get_status()) - # Move testing output from runtime to local device folder - self._timestamp_results(device) - LOGGER.debug("Cleaning old test results...") self._cleanup_old_test_results(device) - LOGGER.debug("Old test results cleaned") + LOGGER.debug("Saving device config...") + device.export_config_json() def _write_reports(self, test_report): @@ -265,12 +271,6 @@ def _generate_report(self): report["status"] = status report["tests"] = self.get_session().get_report_tests() - report["report"] = ( - self._api_url + "/" + SAVED_DEVICE_REPORTS.replace( - "{device_folder}", - device.device_folder) + - self.get_session().get_started().strftime("%Y-%m-%dT%H:%M:%S")) - report["export"] = report["report"].replace("report", "export") return report @@ -294,7 +294,6 @@ def _cleanup_modules_html_reports(self, out_dir): except Exception as e: LOGGER.error(f"Error {module_template_path}: {e}") - def _cleanup_old_test_results(self, device): if device.max_device_reports is not None: @@ -303,95 +302,56 @@ def _cleanup_old_test_results(self, device): max_device_reports = self.get_session().get_max_device_reports() if max_device_reports > 0: - completed_results_dir = os.path.join( - self._root_path, - LOCAL_DEVICE_REPORTS.replace("{device_folder}", device.device_folder)) - - completed_tests = os.listdir(completed_results_dir) - cur_test_count = len(completed_tests) - if cur_test_count > max_device_reports: - LOGGER.debug("Current device has more than max results allowed: " + - str(cur_test_count) + ">" + str(max_device_reports)) - - # Find and delete the oldest test - oldest_test = self._find_oldest_test(completed_results_dir) - if oldest_test is not None: - LOGGER.debug("Oldest test found, removing: " + str(oldest_test[1])) - shutil.rmtree(oldest_test[1], ignore_errors=True) - - # Remove oldest test from session - oldest_timestamp = oldest_test[0] - self.get_session().get_target_device().remove_report(oldest_timestamp) - - # Confirm the delete was succesful - new_test_count = len(os.listdir(completed_results_dir)) - if (new_test_count != cur_test_count - and new_test_count > max_device_reports): - # Continue cleaning up until we're under the max - self._cleanup_old_test_results(device) - - def _find_oldest_test(self, completed_tests_dir): - oldest_timestamp = None - oldest_directory = None - for completed_test in os.listdir(completed_tests_dir): - try: - timestamp = datetime.strptime(str(completed_test), "%Y-%m-%dT%H:%M:%S") - - # Occurs when time does not match format - except ValueError as e: - LOGGER.error(e) - continue - - if oldest_timestamp is None or timestamp < oldest_timestamp: - oldest_timestamp = timestamp - oldest_directory = completed_test + device.sort_reports() + while len(device.get_reports()) > max_device_reports: + report = device.get_reports().pop(0) + report.delete_folder() - if oldest_directory: - return oldest_timestamp, os.path.join(completed_tests_dir, - oldest_directory) - else: - return None - - def _timestamp_results(self, device): + def _copy_report_to_common_folder(self, device: Device) -> str: # Define the current device results directory cur_results_dir = os.path.join(self._root_path, RUNTIME_DIR) # Define the directory - completed_results_dir = os.path.join( - self._root_path, - LOCAL_DEVICE_REPORTS.replace("{device_folder}", device.device_folder), - self.get_session().get_started().strftime("%Y-%m-%dT%H:%M:%S")) + report_folder_name = DEVICE_REPORT_NAME_FORMAT.format( + mac_addr=device.mac_addr.replace(":", ""), + timestamp=self.get_session().get_started().strftime("%Y-%m-%dT%H:%M:%S") + ) + report_dir = os.path.join(self._root_path, REPORTS_FOLDER) + report_dir = os.path.join(report_dir, report_folder_name) + LOGGER.info(f"Copying test results from {cur_results_dir} to {report_dir}") # Copy the results to the timestamp directory # leave current copy in place for quick reference to # most recent test - shutil.copytree(cur_results_dir, completed_results_dir, dirs_exist_ok=True) - util.run_command(f"chown -R {self._host_user} '{completed_results_dir}'") + shutil.copytree(cur_results_dir, report_dir, dirs_exist_ok=True) + util.run_command(f"chown -R {self._host_user} '{report_dir}'") # Copy Testrun log to testing directory shutil.copy(os.path.join(self._root_path, "testrun.log"), - os.path.join(completed_results_dir, "testrun.log")) + os.path.join(report_dir, "testrun.log")) - return completed_results_dir + return report_folder_name - def zip_results(self, device, timestamp: str, profile): + def zip_results( + self, device: Device, + report: TestReport, + profile: risk_profile.RiskProfile) -> str: try: LOGGER.debug("Archiving test results") src_path = os.path.join( - LOCAL_DEVICE_REPORTS.replace("{device_folder}", device.device_folder), - timestamp) + LOCAL_DEVICE_REPORTS, report.get_folder_name()) # Regenerate the report if the device profile has been updated - self._regenerate_report_files(device, timestamp) + self._regenerate_report_files(device, report) # Define temp directory to store files before zipping results_dir = os.path.join(f"/tmp/testrun/{time.time()}") # Define where to save the zip file - zip_location = os.path.join("/tmp/testrun", timestamp) + zip_location = os.path.join("/tmp/testrun", report.get_folder_name()) # Delete zip_temp if it already exists if os.path.exists(results_dir): @@ -431,83 +391,113 @@ def zip_results(self, device, timestamp: str, profile): LOGGER.debug(error) return None - def regenerate_pdf(self, device, timestamp): + def regenerate_pdf(self, device: Device, report: TestReport) -> str: """Regenerate the pdf report""" - self._regenerate_report_files(device, timestamp) + return self._regenerate_report_files(device, report) - def _regenerate_report_files(self, device, timestamp): + def _regenerate_report_files(self, device: Device, report: TestReport) -> str: '''Regenerate the report if the device profile has been updated''' - + # Report files path + report_dir = os.path.join(self._root_path, REPORTS_FOLDER) + report_dir = os.path.join(report_dir, report.get_folder_name()) + test_path = os.path.join( + report_dir, + f'test/{device.mac_addr.replace(":", "")}' + ) try: - - # Report files path - report_path = os.path.join( - LOCAL_DEVICE_REPORTS.replace("{device_folder}", device.device_folder), - timestamp, "test", device.mac_addr.replace(":", "")) - - # Parse string timestamp - date_timestamp: datetime.datetime = datetime.strptime( - timestamp, "%Y-%m-%dT%H:%M:%S") - - # Find the report - test_report = None - for report in device.get_reports(): - if report.get_started() == date_timestamp: - test_report = report - - # This should not happen as the timestamp is checked in api.py first - if test_report is None: - return None - # Copy the original report for comparison - test_report_copy = copy.deepcopy(test_report) - + report_copy = copy.deepcopy(report) # Update the report with 'additional_info' field - test_report.update_device_profile(device.additional_info) - + report.update_device_info(device) + device.export_config_json() # Overwrite report only if additional_info has been changed - if test_report.to_json() != test_report_copy.to_json(): + if report.to_json() != report_copy.to_json(): LOGGER.debug("Device profile has been updated, regenerating the report") - # Store the jinja templates - reload_templates = [] - - # Load the jinja templates - if os.path.isdir(report_path): - for dir_path, _, filenames in os.walk(report_path): - for filename in filenames: - try: - if filename.endswith(".j2.html"): - with open(os.path.join(dir_path, filename), "r", - encoding="utf-8") as f: - reload_templates.append(f.read()) - except Exception as e: - LOGGER.debug(f"Could not read the file: {e}") - - # Add the jinja templates to the report - test_report.add_module_templates(reload_templates) - # Rewrite the json report - with open(os.path.join(report_path, "report.json"), + with open(os.path.join(test_path, "report.json"), "w", encoding="utf-8") as f: - json.dump(test_report.to_json(), f, indent=2) + json.dump(report.to_json(), f, indent=2) + with open(os.path.join(test_path, "report.html"), + "r", + encoding="utf-8") as f: + html = f.read() + html = self._update_html_report(report, html) + LOGGER.debug(f"{test_path}") # Rewrite the html report - with open(os.path.join(report_path, "report.html"), + with open(os.path.join(test_path, "report.html"), "w", encoding="utf-8") as f: - f.write(test_report.to_html()) + f.write(html) # Rewrite the pdf report - with open(os.path.join(report_path, "report.pdf"), "wb") as f: - f.write(test_report.to_pdf().getvalue()) + with open(os.path.join(test_path, "report.pdf"), "wb") as f: + f.write(report.to_pdf_from_html(html).getvalue()) LOGGER.debug("Report has been regenerated") except Exception as error: LOGGER.error("Failed to regenerate the report") LOGGER.debug(error) + return test_path + + def _update_html_report(self, report: TestReport, html: str): + """Update the HTML report with the new device information.""" + report_json = report.to_json() + manufacturer = report_json["device"].get("manufacturer", "Unknown") + model = report_json["device"].get("model", "Unknown") + title = f"{manufacturer} {model}" + bs = BeautifulSoup(html, "html.parser") + div_profile = bs.find("div", class_="device-profile-content") + if div_profile is not None: + for item in report_json["device"]["device_profile"]: + question = item["question"] + answer = item["answer"] + # Find the question div with exact text match for 'q' + question_div = div_profile.find( + "div", + class_="device-profile-question", + string=question + ) + if question_div: + # Find the next sibling answer div + answer_div = question_div.find_next_sibling( + "div", + class_="device-profile-answer" + ) + if answer_div: + # Update the answer text with 'a' + answer_div.string = answer + h3 = bs.find("h3", class_="header-info-device") + if h3: + h3.string = title + # Update manufacturer and model in summary section by finding labels + all_labels = bs.find_all("div", class_="summary-item-label") + for label_div in all_labels: + h4 = label_div.find("h4") + if h4 and h4.string: + # Find the next summary-item-value sibling + value_div = label_div.find_next_sibling( + "div", + class_="summary-item-value" + ) + if value_div: + if "Manufacturer" in h4.string: + value_div.string = manufacturer + LOGGER.debug(f"Updated manufacturer to '{value_div.string}'") + elif "Model" in h4.string: + value_div.string = model + LOGGER.debug(f"Updated model to '{value_div.string}'") + all_header_info_divs = bs.find_all("div", class_="header-info") + for header_info_div in all_header_info_divs: + header_span = header_info_div.find_next_sibling("span") + if header_span: + header_span.string = title + LOGGER.debug(f"Updated sibling span to '{header_span.string}'") + + return str(bs) def test_in_progress(self): return self._test_in_progress diff --git a/testing/api/devices/device_1/device_config.json b/testing/api/devices/device_1/device_config.json index 3be69a082..9e3f84328 100644 --- a/testing/api/devices/device_1/device_config.json +++ b/testing/api/devices/device_1/device_config.json @@ -50,5 +50,121 @@ "dns": { "enabled": true } - } + }, + "reports": [{ + "testrun": { + "version": "2.1" + }, + "mac_addr": null, + "device": { + "mac_addr": "00:1e:42:35:73:c4", + "manufacturer": "Teltonika", + "model": "TRB140", + "firmware": "1", + "test_modules": { + "protocol": { + "enabled": false + }, + "services": { + "enabled": false + }, + "connection": { + "enabled": false + }, + "tls": { + "enabled": true + }, + "ntp": { + "enabled": false + }, + "dns": { + "enabled": false + } + }, + "test_pack": "Device Qualification", + "device_profile": [ + { + "question": "What type of device is this?", + "answer": "Building Automation Gateway" + }, + { + "question": "Please select the technology this device falls into", + "answer": "Hardware - Access Control" + }, + { + "question": "Does your device process any sensitive information? ", + "answer": "No" + }, + { + "question": "Can all non-essential services be disabled on your device?", + "answer": "Yes" + }, + { + "question": "Is there a second IP port on the device?", + "answer": "Yes" + }, + { + "question": "Can the second IP port on your device be disabled?", + "answer": "No" + } + ] + }, + "status": "Non-Compliant", + "started": "2024-12-10 16:06:42", + "finished": "2024-12-10 16:08:12", + "tests": { + "total": 5, + "results": [ + { + "name": "security.tls.v1_0_client", + "description": "No outbound connections were found", + "expected_behavior": "The packet indicates a TLS connection with at least TLS 1.0 and support", + "required_result": "Informational", + "result": "Feature Not Detected" + }, + { + "name": "security.tls.v1_2_server", + "description": "TLS 1.2 certificate is invalid", + "expected_behavior": "TLS 1.2 certificate is issued to the web browser client when accessed", + "required_result": "Required if Applicable", + "result": "Non-Compliant", + "recommendations": [ + "Enable TLS 1.2 support in the web server configuration", + "Disable TLS 1.0 and 1.1", + "Sign the certificate used by the web server" + ] + }, + { + "name": "security.tls.v1_2_client", + "description": "An error occurred whilst running this test", + "expected_behavior": "The packet indicates a TLS connection with at least TLS 1.2 and support for ECDH and ECDSA ciphers", + "required_result": "Required if Applicable", + "result": "Error" + }, + { + "name": "security.tls.v1_3_server", + "description": "TLS 1.3 certificate is invalid", + "expected_behavior": "TLS 1.3 certificate is issued to the web browser client when accessed", + "required_result": "Informational", + "result": "Informational", + "optional_recommendations": [ + "Enable TLS 1.3 support in the web server configuration", + "Disable TLS 1.0 and 1.1", + "Sign the certificate used by the web server" + ] + }, + { + "name": "security.tls.v1_3_client", + "description": "An error occurred whilst running this test", + "expected_behavior": "The packet indicates a TLS connection with at least TLS 1.3", + "required_result": "Informational", + "result": "Error" + } + ] + }, + "report": "/report/001e42289e4a_2024-12-10T16:06:42", + "export": "/export/001e42289e4a_2024-12-10T16:06:42", + "folder_name": "001e42289e4a_2024-12-10T16:06:42" +} +] } diff --git a/testing/api/local/reports/001e42289e4a_2024-12-10T16:06:42/test/001e42289e4a/report.json b/testing/api/local/reports/001e42289e4a_2024-12-10T16:06:42/test/001e42289e4a/report.json new file mode 100644 index 000000000..7dc70a7b5 --- /dev/null +++ b/testing/api/local/reports/001e42289e4a_2024-12-10T16:06:42/test/001e42289e4a/report.json @@ -0,0 +1,116 @@ +{ + "testrun": { + "version": "2.1" + }, + "mac_addr": null, + "device": { + "mac_addr": "00:1e:42:35:73:c4", + "manufacturer": "Teltonika", + "model": "TRB140", + "firmware": "1", + "test_modules": { + "protocol": { + "enabled": false + }, + "services": { + "enabled": false + }, + "connection": { + "enabled": false + }, + "tls": { + "enabled": true + }, + "ntp": { + "enabled": false + }, + "dns": { + "enabled": false + } + }, + "test_pack": "Device Qualification", + "device_profile": [ + { + "question": "What type of device is this?", + "answer": "Building Automation Gateway" + }, + { + "question": "Please select the technology this device falls into", + "answer": "Hardware - Access Control" + }, + { + "question": "Does your device process any sensitive information? ", + "answer": "No" + }, + { + "question": "Can all non-essential services be disabled on your device?", + "answer": "Yes" + }, + { + "question": "Is there a second IP port on the device?", + "answer": "Yes" + }, + { + "question": "Can the second IP port on your device be disabled?", + "answer": "No" + } + ] + }, + "status": "Non-Compliant", + "started": "2024-12-10 16:06:42", + "finished": "2024-12-10 16:08:12", + "tests": { + "total": 5, + "results": [ + { + "name": "security.tls.v1_0_client", + "description": "No outbound connections were found", + "expected_behavior": "The packet indicates a TLS connection with at least TLS 1.0 and support", + "required_result": "Informational", + "result": "Feature Not Detected" + }, + { + "name": "security.tls.v1_2_server", + "description": "TLS 1.2 certificate is invalid", + "expected_behavior": "TLS 1.2 certificate is issued to the web browser client when accessed", + "required_result": "Required if Applicable", + "result": "Non-Compliant", + "recommendations": [ + "Enable TLS 1.2 support in the web server configuration", + "Disable TLS 1.0 and 1.1", + "Sign the certificate used by the web server" + ] + }, + { + "name": "security.tls.v1_2_client", + "description": "An error occurred whilst running this test", + "expected_behavior": "The packet indicates a TLS connection with at least TLS 1.2 and support for ECDH and ECDSA ciphers", + "required_result": "Required if Applicable", + "result": "Error" + }, + { + "name": "security.tls.v1_3_server", + "description": "TLS 1.3 certificate is invalid", + "expected_behavior": "TLS 1.3 certificate is issued to the web browser client when accessed", + "required_result": "Informational", + "result": "Informational", + "optional_recommendations": [ + "Enable TLS 1.3 support in the web server configuration", + "Disable TLS 1.0 and 1.1", + "Sign the certificate used by the web server" + ] + }, + { + "name": "security.tls.v1_3_client", + "description": "An error occurred whilst running this test", + "expected_behavior": "The packet indicates a TLS connection with at least TLS 1.3", + "required_result": "Informational", + "result": "Error" + } + ] + }, + "report": "/report/001e42289e4a_2024-12-10T16:06:42", + "export": "/export/001e42289e4a_2024-12-10T16:06:42", + "folder_name": "001e42289e4a_2024-12-10T16:06:42" + +} \ No newline at end of file diff --git a/testing/api/local/reports/001e42289e4a_2024-12-10T16:06:42/test/001e42289e4a/report.pdf b/testing/api/local/reports/001e42289e4a_2024-12-10T16:06:42/test/001e42289e4a/report.pdf new file mode 100644 index 000000000..87fc93be2 Binary files /dev/null and b/testing/api/local/reports/001e42289e4a_2024-12-10T16:06:42/test/001e42289e4a/report.pdf differ diff --git a/testing/api/reports/report.json b/testing/api/reports/report.json index e1d9f7003..7dc70a7b5 100644 --- a/testing/api/reports/report.json +++ b/testing/api/reports/report.json @@ -109,5 +109,8 @@ } ] }, - "report": "http://localhost:8000/report/Teltonika TRB140/2024-12-10T16:06:42" + "report": "/report/001e42289e4a_2024-12-10T16:06:42", + "export": "/export/001e42289e4a_2024-12-10T16:06:42", + "folder_name": "001e42289e4a_2024-12-10T16:06:42" + } \ No newline at end of file diff --git a/testing/api/test_api b/testing/api/test_api index 095123a3b..649720205 100755 --- a/testing/api/test_api +++ b/testing/api/test_api @@ -38,6 +38,7 @@ sudo chown -R $USER local # Copy configuration to testrun sudo cp testing/api/sys_config/system.json local/system.json +sudo cp testing/api/local/reports local/ -r # Needs to be sudo because this invokes bin/testrun sudo venv/bin/python3 -m pytest -v testing/api/test_api.py diff --git a/testing/api/test_api.py b/testing/api/test_api.py index 9ba6c3f08..28c92467d 100644 --- a/testing/api/test_api.py +++ b/testing/api/test_api.py @@ -934,7 +934,6 @@ def test_get_reports(empty_devices_dir, add_devices, # pylint: disable=W0613 "status", "started", "finished", - "tests", "report", "export" ] @@ -948,9 +947,9 @@ def test_get_reports(empty_devices_dir, add_devices, # pylint: disable=W0613 @pytest.mark.parametrize("add_devices", [ ["device_1"] ],indirect=True) -def test_delete_report(empty_devices_dir, add_devices, # pylint: disable=W0613 - create_report_folder, testrun): # pylint: disable=W0613 - """ Test for succesfully delete a report (200) """ +def test_get_report_success(empty_devices_dir, add_devices, # pylint: disable=W0613 + create_report_folder, testrun): # pylint: disable=W0613 + """Test for successfully get report when report exists (200)""" # Load the device using load_json utility method device = load_json("device_config.json", directory=DEVICE_1_PATH) @@ -958,86 +957,85 @@ def test_delete_report(empty_devices_dir, add_devices, # pylint: disable=W0613 # Assign the device mac address mac_addr = device["mac_addr"] - # Assign the device name - device_name = f'{device["manufacturer"]} {device["model"]}' - - # Payload - delete_data = { - "mac_addr": mac_addr, - "timestamp": get_timestamp() - } + # Construct report_name + mac_no_colons = mac_addr.replace(":", "") + timestamp = get_timestamp(formatted=True) + report_name = f"{mac_no_colons}_{timestamp}" - # Send a DELETE request to remove the report - r = requests.delete(f"{API}/report", data=json.dumps(delete_data), timeout=5) + # Send the get request + r = requests.get(f"{API}/report/{report_name}", timeout=5) - # Check if status code is 200 (OK) + # Check if status code is 200 (ok) assert r.status_code == 200 - # Parse the json response - response = r.json() - - # Check if "success" in response - assert "success" in response + # Check if the response is a PDF + assert r.headers["Content-Type"] == "application/pdf" - # Construct the 'reports' folder path - reports_folder = os.path.join(device_name, "reports") +@pytest.mark.parametrize("add_devices, add_profiles", [ + (["device_1"], ["valid_profile.json"]) +], indirect=True) +def test_export_report_with_profile(empty_devices_dir, add_devices, # pylint: disable=W0613 + empty_profiles_dir, add_profiles, # pylint: disable=W0613 + create_report_folder, testrun): # pylint: disable=W0613 + """Test export results with existing profile when report exists (200)""" - # Check if reports folder has been deleted - assert not os.path.exists(reports_folder) + # Load the profile using load_json utility method + profile = load_json("valid_profile.json", directory=PROFILES_PATH) -@pytest.mark.parametrize("add_devices", [ - ["device_1"] -],indirect=True) -def test_delete_report_no_payload(empty_devices_dir, add_devices, # pylint: disable=W0613 - create_report_folder, testrun): # pylint: disable=W0613 - """ Test delete report bad request when the payload is missing (400) """ + # Load the device using load_json utility method + device = load_json("device_config.json", directory=DEVICE_1_PATH) - # Send a DELETE request to remove the report without the payload - r = requests.delete(f"{API}/report", timeout=5) + # Assign the device mac address + mac_addr = device["mac_addr"] - # Check if status code is 400 (bad request) - assert r.status_code == 400 + # Construct report_name + mac_no_colons = mac_addr.replace(":", "") + timestamp = get_timestamp(formatted=True) + report_name = f"{mac_no_colons}_{timestamp}" - # Parse the json response - response = r.json() + # Send the post request + r = requests.post(f"{API}/export/{report_name}", + json=profile, + timeout=5) - # Check if "error" in response - assert "error" in response + # Check if status code is 200 (OK) + assert r.status_code == 200 - # Check if the correct error message returned - assert "Invalid request received, missing body" in response["error"] + # Check if the response is a zip file + assert r.headers["Content-Type"] == "application/zip" @pytest.mark.parametrize("add_devices", [ ["device_1"] ],indirect=True) -def test_delete_report_invalid_payload(empty_devices_dir, add_devices, # pylint: disable=W0613 - create_report_folder, testrun): # pylint: disable=W0613 - """ Test delete report bad request missing mac addr or timestamp (400) """ - - # Empty payload - delete_data = {} +def test_export_results_with_no_profile(empty_devices_dir, add_devices, # pylint: disable=W0613 + create_report_folder, testrun): # pylint: disable=W0613 + """Test export results with no profile when report exists (200)""" + # Load the device using load_json utility method + device = load_json("device_config.json", directory=DEVICE_1_PATH) - # Send a DELETE request to remove the report - r = requests.delete(f"{API}/report", data=json.dumps(delete_data), timeout=5) + # Assign the device mac address + mac_addr = device["mac_addr"] - # Check if status code is 400 (bad request) - assert r.status_code == 400 + # Construct report_name + mac_no_colons = mac_addr.replace(":", "") + timestamp = get_timestamp(formatted=True) + report_name = f"{mac_no_colons}_{timestamp}" - # Parse the json response - response = r.json() + # Send the post request + r = requests.post(f"{API}/export/{report_name}", timeout=5) - # Check if "error" in response - assert "error" in response + # Check if status code is 200 (OK) + assert r.status_code == 200 - # Check if the correct error message returned - assert "Missing mac address or timestamp" in response["error"] + # Check if the response is a zip file + assert r.headers["Content-Type"] == "application/zip" @pytest.mark.parametrize("add_devices", [ ["device_1"] ],indirect=True) -def test_delete_report_invalid_timestamp(empty_devices_dir, add_devices, # pylint: disable=W0613 - create_report_folder, testrun): # pylint: disable=W0613 - """ Test delete report bad request if timestamp format is not valid (400) """ +def test_delete_report(empty_devices_dir, add_devices, # pylint: disable=W0613 + create_report_folder, testrun): # pylint: disable=W0613 + """ Test for succesfully delete a report (200) """ # Load the device using load_json utility method device = load_json("device_config.json", directory=DEVICE_1_PATH) @@ -1045,41 +1043,56 @@ def test_delete_report_invalid_timestamp(empty_devices_dir, add_devices, # pylin # Assign the device mac address mac_addr = device["mac_addr"] - # Assign the incorrect timestamp format - invalid_timestamp = "2024-01-01 invalid" + # Assign the device name + device_name = f'{device["manufacturer"]} {device["model"]}' - # Payload - delete_data = { - "mac_addr": mac_addr, - "timestamp": invalid_timestamp - } + # Construct report_name from mac_addr and timestamp + mac_no_colons = mac_addr.replace(":", "") + timestamp = get_timestamp(formatted=True) + report_name = f"{mac_no_colons}_{timestamp}" # Send a DELETE request to remove the report - r = requests.delete(f"{API}/report", data=json.dumps(delete_data), timeout=5) + r = requests.delete(f"{API}/report/{report_name}", timeout=5) - # Check if status code is 400 (bad request) - assert r.status_code == 400 + # Check if status code is 200 (OK) + assert r.status_code == 200 # Parse the json response response = r.json() - # Check if "error" in response - assert "error" in response + # Check if "success" in response + assert "success" in response + + # Construct the 'reports' folder path + reports_folder = os.path.join(device_name, "reports") + + # Check if reports folder has been deleted + assert not os.path.exists(reports_folder) + +@pytest.mark.parametrize("add_devices", [ + ["device_1"] +],indirect=True) +def test_delete_report_no_payload(empty_devices_dir, add_devices, # pylint: disable=W0613 + create_report_folder, testrun): # pylint: disable=W0613 + """ Test delete report with empty report name (404) """ + + # Send a DELETE request with empty report name + r = requests.delete(f"{API}/report/", timeout=5) + + # Check if status code is 404 (not found) + assert r.status_code == 404 - # Check if the correct error message returned - assert "Incorrect timestamp format" in response["error"] def test_delete_report_no_device(empty_devices_dir, testrun): # pylint: disable=W0613 """ Test delete report when device does not exist (404) """ - # Payload to be deleted for a non existing device - delete_data = { - "mac_addr": "00:1e:42:35:73:c4", - "timestamp": get_timestamp() - } + # Construct report_name for non-existing device + mac_no_colons = "001e423573c4" + timestamp = get_timestamp(formatted=True) + report_name = f"{mac_no_colons}_{timestamp}" # Send the delete request to the endpoint - r = requests.delete(f"{API}/report", data=json.dumps(delete_data), timeout=5) + r = requests.delete(f"{API}/report/{report_name}", timeout=5) # Check if status is 404 (not found) assert r.status_code == 404 @@ -1091,7 +1104,7 @@ def test_delete_report_no_device(empty_devices_dir, testrun): # pylint: disable= assert "error" in response # Check if the correct error message returned - assert "Could not find device" in response["error"] + assert "Device not found" in response["error"] @pytest.mark.parametrize("add_devices", [ ["device_1"] @@ -1105,16 +1118,13 @@ def test_delete_report_no_report(empty_devices_dir, add_devices, testrun): # pyl # Assign the device mac address mac_addr = device["mac_addr"] - # Prepare the payload for the DELETE request - delete_data = { - "mac_addr": mac_addr, - "timestamp": get_timestamp() - } + # Construct report_name for non-existent report + mac_no_colons = mac_addr.replace(":", "") + # Use different timestamp than what exists + invalid_report_name = f"{mac_no_colons}_2020-01-01T00:00:00" # Send the delete request to delete the report - r = requests.delete(f"{API}/report", - data=json.dumps(delete_data), - timeout=5) + r = requests.delete(f"{API}/report/{invalid_report_name}", timeout=5) # Check if status code is 404 (not found) assert r.status_code == 404 @@ -1128,31 +1138,6 @@ def test_delete_report_no_report(empty_devices_dir, add_devices, testrun): # pyl # Check if the correct error message is returned assert "Report not found" in response["error"] -@pytest.mark.parametrize("add_devices", [ - ["device_1"] -],indirect=True) -def test_get_report_success(empty_devices_dir, add_devices, # pylint: disable=W0613 - create_report_folder, testrun): # pylint: disable=W0613 - """Test for successfully get report when report exists (200)""" - - # Load the device using load_json utility method - device = load_json("device_config.json", directory=DEVICE_1_PATH) - - # Assign the device name - device_name = f'{device["manufacturer"]} {device["model"]}' - - # Assign the timestamp and change the format - timestamp = get_timestamp(formatted=True) - - # Send the get request - r = requests.get(f"{API}/report/{device_name}/{timestamp}", timeout=5) - - # Check if status code is 200 (ok) - assert r.status_code == 200 - - # Check if the response is a PDF - assert r.headers["Content-Type"] == "application/pdf" - @pytest.mark.parametrize("add_devices", [ ["device_1"] ],indirect=True) @@ -1162,14 +1147,15 @@ def test_get_report_not_found(empty_devices_dir, add_devices, testrun): # pylint # Load the device using load_json utility method device = load_json("device_config.json", directory=DEVICE_1_PATH) - # Assign the device name - device_name = f'{device["manufacturer"]} {device["model"]}' + # Assign the device mac address + mac_addr = device["mac_addr"] - # Assign the timestamp - timestamp = get_timestamp() + # Construct report_name with non-existent timestamp + mac_no_colons = mac_addr.replace(":", "") + invalid_report_name = f"{mac_no_colons}_2020-01-01T00:00:00" # Send the get request - r = requests.get(f"{API}/report/{device_name}/{timestamp}", timeout=5) + r = requests.get(f"{API}/report/{invalid_report_name}", timeout=5) # Check if status code is 404 (not found) assert r.status_code == 404 @@ -1180,20 +1166,23 @@ def test_get_report_not_found(empty_devices_dir, add_devices, testrun): # pylint # Check if "error" in response assert "error" in response + # Check if the correct error message is returned + assert "Report could not be found" in response["error"] + + # Check if "error" in response + assert "error" in response + # Check if the correct error message returned assert "Report could not be found" in response["error"] def test_get_report_device_not_found(empty_devices_dir, testrun): # pylint: disable=W0613 """Test getting a report when the device is not found (404)""" - # Assign device name - device_name = "nonexistent_device" - # Assign the timestamp timestamp = get_timestamp() # Send the get request - r = requests.get(f"{API}/report/{device_name}/{timestamp}", timeout=5) + r = requests.get(f"{API}/report/001e423573c4_{timestamp}", timeout=5) # Check if is 404 (not found) assert r.status_code == 404 @@ -1211,14 +1200,14 @@ def test_export_report_device_not_found(empty_devices_dir, create_report_folder, testrun): # pylint: disable=W0613 """Test for export the report result when the device could not be found""" - # Assign the non-existing device name - device_name = "non existing device" - + # Assign the non-existing device mac (no colons) + mac_no_colons = "001e423573c4" # Assign the timestamp - timestamp = get_timestamp() + timestamp = get_timestamp(formatted=True) + invalid_report_name = f"{mac_no_colons}_{timestamp}" # Send the post request - r = requests.post(f"{API}/export/{device_name}/{timestamp}", timeout=5) + r = requests.post(f"{API}/export/{invalid_report_name}", timeout=5) # Check if is 404 (not found) assert r.status_code == 404 @@ -1229,8 +1218,8 @@ def test_export_report_device_not_found(empty_devices_dir, create_report_folder, # Check if "error" in response assert "error" in response - # Check if the correct error message returned - assert "A device with that name could not be found" in response["error"] + # Check if the correct error message is returned + assert "Device not found" in response["error"] @pytest.mark.parametrize("add_devices", [ ["device_1"] @@ -1242,17 +1231,19 @@ def test_export_report_profile_not_found(empty_devices_dir, add_devices, # pylin # Load the device using load_json utility method device = load_json("device_config.json", directory=DEVICE_1_PATH) - # Assign the device name - device_name = f'{device["manufacturer"]} {device["model"]}' + # Assign the device mac address + mac_addr = device["mac_addr"] - # Assign the timestamp - timestamp = get_timestamp() + # Construct report_name + mac_no_colons = mac_addr.replace(":", "") + timestamp = get_timestamp(formatted=True) + report_name = f"{mac_no_colons}_{timestamp}" # Add a non existing profile into the payload payload = {"profile": "non_existent_profile"} # Send the post request - r = requests.post(f"{API}/export/{device_name}/{timestamp}", + r = requests.post(f"{API}/export/{report_name}", json=payload, timeout=5) @@ -1277,14 +1268,15 @@ def test_export_report_not_found(empty_devices_dir, add_devices, testrun): # pyl # Load the device using load_json utility method device = load_json("device_config.json", directory=DEVICE_1_PATH) - # Assign the device name - device_name = f'{device["manufacturer"]} {device["model"]}' + # Assign the device mac address + mac_addr = device["mac_addr"] - # Assign the timestamp - timestamp = get_timestamp() + # Construct report_name + mac_no_colons = mac_addr.replace(":", "") + invalid_report_name = f"{mac_no_colons}_2020-01-01T00:00:00" # Send the post request to trigger the zipping process - r = requests.post(f"{API}/export/{device_name}/{timestamp}", timeout=10) + r = requests.post(f"{API}/export/{invalid_report_name}", timeout=10) # Check if status code is 404 (Not Found) assert r.status_code == 404 @@ -1298,62 +1290,6 @@ def test_export_report_not_found(empty_devices_dir, add_devices, testrun): # pyl # Check if the correct error message is returned assert "Report could not be found" in response["error"] -@pytest.mark.parametrize("add_devices, add_profiles", [ - (["device_1"], ["valid_profile.json"]) -], indirect=True) -def test_export_report_with_profile(empty_devices_dir, add_devices, # pylint: disable=W0613 - empty_profiles_dir, add_profiles, # pylint: disable=W0613 - create_report_folder, testrun): # pylint: disable=W0613 - """Test export results with existing profile when report exists (200)""" - - # Load the profile using load_json utility method - profile = load_json("valid_profile.json", directory=PROFILES_PATH) - - # Load the device using load_json utility method - device = load_json("device_config.json", directory=DEVICE_1_PATH) - - # Assign the device name - device_name = f'{device["manufacturer"]} {device["model"]}' - - # Assign the timestamp and change the format - timestamp = get_timestamp(formatted=True) - - # Send the post request - r = requests.post(f"{API}/export/{device_name}/{timestamp}", - json=profile, - timeout=5) - - # Check if status code is 200 (OK) - assert r.status_code == 200 - - # Check if the response is a zip file - assert r.headers["Content-Type"] == "application/zip" - -@pytest.mark.parametrize("add_devices", [ - ["device_1"] -],indirect=True) -def test_export_results_with_no_profile(empty_devices_dir, add_devices, # pylint: disable=W0613 - create_report_folder, testrun): # pylint: disable=W0613 - """Test export results with no profile when report exists (200)""" - - # Load the device using load_json utility method - device = load_json("device_config.json", directory=DEVICE_1_PATH) - - # Assign the device name - device_name = f'{device["manufacturer"]} {device["model"]}' - - # Assign the timestamp and change the format - timestamp = get_timestamp(formatted=True) - - # Send the post request - r = requests.post(f"{API}/export/{device_name}/{timestamp}", timeout=5) - - # Check if status code is 200 (OK) - assert r.status_code == 200 - - # Check if the response is a zip file - assert r.headers["Content-Type"] == "application/zip" - # Tests for device endpoints @pytest.fixture() def add_devices(request):