Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ error
pylint.out
__pycache__/
build/
local/reports/

# Ignore generated files from unit tests
testing/unit_test/temp/
Expand Down
2 changes: 1 addition & 1 deletion cmd/install
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ deactivate
cmd/build

# Create local folders
mkdir -p local/{devices,root_certs,risk_profiles}
mkdir -p local/{devices,root_certs,risk_profiles,reports}

# Set file permissions on local
# This does not work on GitHub actions
Expand Down
201 changes: 46 additions & 155 deletions docs/dev/mockoon.json

Large diffs are not rendered by default.

144 changes: 55 additions & 89 deletions framework/python/src/api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
from fastapi.responses import FileResponse
from fastapi.middleware.cors import CORSMiddleware
import asyncio
from datetime import datetime
import json
from json import JSONDecodeError
import os
Expand All @@ -43,6 +42,7 @@
DEVICE_ADDITIONAL_INFO_KEY = "additional_info"

DEVICES_PATH = "local/devices"
REPORTS_PATH = "local/reports"
PROFILES_PATH = "local/risk_profiles"

RESOURCES_PATH = "resources"
Expand Down Expand Up @@ -96,12 +96,12 @@ def __init__(self, testrun):

# Report endpoints
self._router.add_api_route("/reports", self.get_reports)
self._router.add_api_route("/report",
self._router.add_api_route("/report/{report_name}",
self.delete_report,
methods=["DELETE"])
self._router.add_api_route("/report/{device_name}/{timestamp}",
self._router.add_api_route("/report/{report_name}",
self.get_report)
self._router.add_api_route("/export/{device_name}/{timestamp}",
self._router.add_api_route("/export/{report_name}",
self.get_results,
methods=["POST"])

Expand Down Expand Up @@ -441,66 +441,42 @@ async def get_version(self, response: Response):
LOGGER.debug(e)
return json_response

async def get_reports(self, request: Request):
async def get_reports(self):
LOGGER.debug("Received reports list request")
# Resolve the server IP from the request so we
# can fix the report URL
reports = self._session.get_all_reports()
for report in reports:
# report URL is currently hard coded as localhost so we can
# replace that to fix the IP dynamically from the requester
report["report"] = report["report"].replace(
"localhost", request.client.host)
report["export"] = report["report"].replace("report", "export")
del report["tests"]
report["device"] = {
"manufacturer": report["device"]["manufacturer"],
"model": report["device"]["model"],
"mac_addr": report["device"]["mac_addr"],
"firmware": report["device"]["firmware"],
"test_pack": report["device"]["test_pack"],
}
report["delete"] = report["report"]
return reports

async def delete_report(self, request: Request, response: Response):
async def delete_report(self, response: Response, report_name: str):

body_raw = (await request.body()).decode("UTF-8")

if len(body_raw) == 0:
response.status_code = 400
return self._generate_msg(False, "Invalid request received, missing body")

try:
body_json = json.loads(body_raw)
except JSONDecodeError as e:
LOGGER.error("An error occurred whilst decoding JSON")
LOGGER.debug(e)
response.status_code = status.HTTP_400_BAD_REQUEST
return self._generate_msg(False, "Invalid request received")

if "mac_addr" not in body_json or "timestamp" not in body_json:
response.status_code = 400
return self._generate_msg(False, "Missing mac address or timestamp")

mac_addr = body_json.get("mac_addr").lower()
timestamp = body_json.get("timestamp")

try:
parsed_timestamp = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")
timestamp_formatted = parsed_timestamp.strftime("%Y-%m-%dT%H:%M:%S")

except ValueError:
response.status_code = 400
return self._generate_msg(False, "Incorrect timestamp format")

# Get device from MAC address
device = self._session.get_device(mac_addr)
mac = report_name.split("_")[0]
device = self._session.get_device_by_mac_addr(mac)

# If the device not found
if device is None:
LOGGER.info("Device not found, returning 404")
response.status_code = 404
return self._generate_msg(False, "Could not find device")

# Assign the reports folder path from testrun
reports_folder = self._testrun.get_reports_folder(device)
return self._generate_msg(False, "Device not found")

# Check if reports folder exists
if not os.path.exists(reports_folder):
report = device.get_report_by_folder_name(report_name)
LOGGER.debug(f"Looking for report with name {report_name}")
if not report:
LOGGER.info("Report could not be found, returning 404")
response.status_code = 404
return self._generate_msg(False, "Report not found")

if self._testrun.delete_report(device, timestamp_formatted):
if self._testrun.delete_report(device, report):
return self._generate_msg(True, "Deleted report")

response.status_code = 500
Expand Down Expand Up @@ -702,43 +678,44 @@ async def edit_device(self, request: Request, response: Response):
response.status_code = status.HTTP_400_BAD_REQUEST
return self._generate_msg(False, "Invalid JSON received")

async def get_report(self, response: Response, device_name, timestamp):
device = self._session.get_device_by_name(device_name)
async def get_report(self, response: Response, report_name):
"""Serve report pdf file for a given report name"""
mac = report_name.split("_")[0]
device = self._session.get_device_by_mac_addr(mac)

# If the device not found
if device is None:
LOGGER.info("Device not found, returning 404")
response.status_code = 404
return self._generate_msg(False, "Device not found")

report = device.get_report_by_folder_name(report_name)
LOGGER.debug(f"Looking for report with name {report_name}")
if not report:
LOGGER.info("Report could not be found, returning 404")
response.status_code = 404
return self._generate_msg(False, "Report could not be found")


# Regenerate the pdf if the device profile has been updated
self._get_testrun().get_test_orc().regenerate_pdf(device, timestamp)

# 1.3 file path
file_path = os.path.join(
DEVICES_PATH,
device_name,
"reports",
timestamp,"test",
device.mac_addr.replace(":",""),
"report.pdf")
if not os.path.isfile(file_path):
# pre 1.3 file path
file_path = os.path.join(DEVICES_PATH, device_name, "reports", timestamp,
"report.pdf")

LOGGER.debug(f"Received get report request for {device_name} / {timestamp}")
test_orc = self._get_testrun().get_test_orc()
test_path = test_orc.regenerate_pdf(device, report)
file_path = os.path.join(test_path, "report.pdf")
LOGGER.debug(f"Received get report request for {device.model}")
if os.path.isfile(file_path):
return FileResponse(file_path)
else:
LOGGER.info("Report could not be found, returning 404")
response.status_code = 404
return self._generate_msg(False, "Report could not be found")

async def get_results(self, request: Request, response: Response, device_name,
timestamp):
LOGGER.debug("Received get results " +
f"request for {device_name} / {timestamp}")
async def get_results(
self,
request: Request,
response: Response,
report_name: str
):
LOGGER.debug(f"Received get results request for {report_name}")

profile = None

Expand All @@ -761,32 +738,21 @@ async def get_results(self, request: Request, response: Response, device_name,
pass

# Check if device exists
device = self.get_session().get_device_by_name(device_name)
mac = report_name.split("_")[0]
device = self._session.get_device_by_mac_addr(mac)
if device is None:
response.status_code = status.HTTP_404_NOT_FOUND
return self._generate_msg(False,
"A device with that name could not be found")

# Check if report exists (1.3 file path)
report_file_path = os.path.join(
DEVICES_PATH,
device_name,
"reports",
timestamp,"test",
device.mac_addr.replace(":",""))

if not os.path.isdir(report_file_path):
# pre 1.3 file path
report_file_path = os.path.join(DEVICES_PATH, device_name, "reports",
timestamp)

if not os.path.isdir(report_file_path):
"Device not found")
report = device.get_report_by_folder_name(report_name)
LOGGER.debug(f"Looking for report with name {report_name}")
if not report:
LOGGER.info("Report could not be found, returning 404")
response.status_code = 404
return self._generate_msg(False, "Report could not be found")

zip_file_path = self._get_testrun().get_test_orc().zip_results(
device, timestamp, profile)
device, report, profile)

if zip_file_path is None:
response.status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
Expand Down
48 changes: 44 additions & 4 deletions framework/python/src/common/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,16 @@

"""Track device object information."""

import json
import os
from typing import List, Dict
from dataclasses import dataclass, field
from common.testreport import TestReport
from datetime import datetime

_LOCAL_DEVICES_DIR = 'local/devices'
_DEVICE_CONFIG_FILE = 'device_config.json'

@dataclass
class Device():
"""Represents a physical device and it's configuration."""
Expand Down Expand Up @@ -47,17 +52,36 @@ class Device():
def add_report(self, report):
self.reports.append(report)

def get_report_by_folder_name(self, folder_name: str) -> TestReport | None:
for report in self.reports:
report_folder_name = report.get_folder_name()
if report_folder_name == folder_name:
return report
if report_folder_name is None or report_folder_name == '':
if report.get_report_url().split('/')[-1] == folder_name:
report.set_report_url(folder_name)
return report
return None

def get_reports(self):
return self.reports

def sort_reports(self):
self.reports.sort(key=lambda r: r.get_started() or datetime.min)

def clear_reports(self):
self.reports = []

def remove_report(self, timestamp: datetime):
def remove_report(self, report: TestReport):
if report in self.reports:
self.reports.remove(report)
report.delete_folder()
self.export_config_json()

def remove_reports(self):
for report in self.reports:
if report.get_started().strftime('%Y-%m-%dT%H:%M:%S') == timestamp:
self.reports.remove(report)
return
report.delete_folder()
self.clear_reports()

def to_dict(self):
"""Returns the device as a python dictionary. This is used for the
Expand All @@ -78,6 +102,8 @@ def to_dict(self):
device_json['firmware'] = self.firmware

device_json['test_modules'] = self.test_modules
device_json['reports'] = [
report.to_json() for report in self.reports] if self.reports else []
return device_json

def to_config_json(self):
Expand All @@ -94,9 +120,23 @@ def to_config_json(self):
device_json['additional_info'] = self.additional_info
device_json['created_at'] = self.created_at.isoformat()
device_json['modified_at'] = self.modified_at.isoformat()
device_json['reports'] = [
report.to_json() for report in self.reports] if self.reports else []

return device_json

def export_config_json(self):
"""Exports the device config as a json file to the specified path."""
# Locate parent directory
current_dir = os.path.dirname(os.path.realpath(__file__))
root_dir = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.dirname(current_dir))))
config_file_path = os.path.join(root_dir, _LOCAL_DEVICES_DIR,
self.device_folder, _DEVICE_CONFIG_FILE)

with open(config_file_path, 'w+', encoding='utf-8') as config_file:
config_file.writelines(json.dumps(self.to_config_json(), indent=4))

def __post_init__(self):
# Store initial values after creation
for f in self.__dataclass_fields__:
Expand Down
Loading
Loading