Skip to content
Merged
24 changes: 24 additions & 0 deletions framework/python/src/test_orc/test_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import copy
import os
import json
import pathlib
import re
import time
import shutil
Expand Down Expand Up @@ -234,6 +235,8 @@ def _write_reports(self, test_report):

util.run_command(f"chown -R {self._host_user} {out_dir}")

self._cleanup_modules_html_reports(out_dir)

def _generate_report(self):

device = self.get_session().get_target_device()
Expand Down Expand Up @@ -271,6 +274,27 @@ def _generate_report(self):

return report

def _cleanup_modules_html_reports(self, out_dir):
"""Cleans up any HTML reports generated by test modules to save space."""

for module in self._test_modules:
module_template_path = os.path.join(
os.path.join(out_dir, module.name),
f"{module.name}_report.j2.html")
module_report_path = os.path.join(
os.path.join(out_dir, module.name),
f"{module.name}_report.jinja2")
try:
if os.path.exists(module_template_path):
os.remove(module_template_path)
LOGGER.debug(f"Removed module template: {module_template_path}")
if os.path.exists(module_report_path):
p = pathlib.Path(module_report_path)
p.rename(p.with_suffix(".html"))
except Exception as e:
LOGGER.error(f"Error {module_template_path}: {e}")


def _cleanup_old_test_results(self, device):

if device.max_device_reports is not None:
Expand Down
20 changes: 20 additions & 0 deletions modules/test/base/base.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,29 @@ ENV PATH="/opt/venv/bin:$PATH"
ENV REPORT_TEMPLATE_PATH=/testrun/resources
# Jinja base template
ENV BASE_TEMPLATE_FILE=module_report_base.jinja2
# Jinja preview template
ENV BASE_TEMPLATE_PREVIEW_FILE=module_report_base_preview.jinja2
# Jinja base template
ENV BASE_TEMPLATE_STYLED_FILE=module_report_styled.jinja2
# Styles
ENV CSS_FILE=test_report_styles.css
# ICON
ENV LOGO_FILE=testrun.png

# Copy base template
COPY resources/report/$BASE_TEMPLATE_FILE $REPORT_TEMPLATE_PATH/

# Copy base preview template
COPY resources/report/$BASE_TEMPLATE_PREVIEW_FILE $REPORT_TEMPLATE_PATH/

# Copy base template (with styles)
COPY resources/report/$BASE_TEMPLATE_STYLED_FILE $REPORT_TEMPLATE_PATH/

# Copy styles
COPY resources/report/$CSS_FILE $REPORT_TEMPLATE_PATH/

# Copy icon
COPY resources/report/$LOGO_FILE $REPORT_TEMPLATE_PATH/

# Start the test module
ENTRYPOINT [ "/testrun/bin/start" ]
44 changes: 44 additions & 0 deletions modules/test/base/python/src/test_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Base class for all core test module functions"""

import base64
import json
import logger
import os
import util
from datetime import datetime
import traceback
from jinja2 import Environment, FileSystemLoader, BaseLoader

from common.statuses import TestResult

Expand All @@ -44,6 +47,12 @@ def __init__(self,
self._device_test_pack = json.loads(os.environ.get('DEVICE_TEST_PACK', ''))
self._report_template_folder = os.environ.get('REPORT_TEMPLATE_PATH')
self._base_template_file=os.environ.get('BASE_TEMPLATE_FILE')
self._base_template_file_preview=os.environ.get(
'BASE_TEMPLATE_PREVIEW_FILE'
)
self._base_template_styled_file=os.environ.get('BASE_TEMPLATE_STYLED_FILE')
self._css_file=os.environ.get('CSS_FILE')
self._logo_file=os.environ.get('LOGO_FILE')
self._log_level = os.environ.get('LOG_LEVEL', None)
self._add_logger(log_name=log_name)
self._config = self._read_config(
Expand Down Expand Up @@ -229,3 +238,38 @@ def _get_device_ipv4(self):
if text:
return text.split('\n')[0]
return None

def _render_styled_report(self, jinja_report, result_path):
# Report styles
with open(os.path.join(self._report_template_folder,
self._css_file),
'r',
encoding='UTF-8'
) as style_file:
styles = style_file.read()

# Load Testrun logo to base64
with open(os.path.join(self._report_template_folder,
self._logo_file), 'rb') as f:
logo = base64.b64encode(f.read()).decode('utf-8')

loader=FileSystemLoader(self._report_template_folder)
template = Environment(
loader=loader,
trim_blocks=True,
lstrip_blocks=True
).get_template(self._base_template_styled_file)

module_template = Environment(loader=BaseLoader()
).from_string(jinja_report).render(
title = 'Testrun report',
logo=logo,
)

report_jinja_styled = template.render(
template=module_template,
styles=styles
)
# Write the styled content to a file
with open(result_path, 'w', encoding='utf-8') as file:
file.write(report_jinja_styled)
22 changes: 19 additions & 3 deletions modules/test/dns/python/src/dns_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

LOG_NAME = 'test_dns'
MODULE_REPORT_FILE_NAME = 'dns_report.j2.html'
MODULE_REPORT_STYLED_FILE_NAME = 'dns_report.jinja2'
DNS_SERVER_CAPTURE_FILE = '/runtime/network/dns.pcap'
STARTUP_CAPTURE_FILE = '/runtime/device/startup.pcap'
MONITOR_CAPTURE_FILE = '/runtime/device/monitor.pcap'
Expand Down Expand Up @@ -161,6 +162,7 @@ def generate_module_report(self):
pages_content.append(current_page_rows)

report_html = ''
report_jinja_preview = ''
if not pages_content:
pages_content = [[]]

Expand All @@ -175,9 +177,23 @@ def generate_module_report(self):
module_data=page_rows
)
report_html += page_html
page_html = template.render(
base_template=self._base_template_file_preview,
module_header=module_header_repr,
summary_headers=summary_headers,
summary_data=summary_data,
module_data_headers=module_data_headers,
module_data=page_rows
)
report_jinja_preview += page_html

LOGGER.debug('Module report:\n' + report_html)

# Generate styled report for a preview
jinja_path_styled = os.path.join(
self._results_dir, MODULE_REPORT_STYLED_FILE_NAME)
self._render_styled_report(report_jinja_preview, jinja_path_styled)

# Use os.path.join to create the complete file path
report_path = os.path.join(self._results_dir, MODULE_REPORT_FILE_NAME)

Expand Down Expand Up @@ -219,7 +235,6 @@ def extract_dns_data(self):
qname = dns_layer.qd.qname.decode() if dns_layer.qd.qname else 'N/A'
else:
qname = 'N/A'

resolved_ip = 'N/A'
# If it's a response packet, extract the resolved IP address
# from the answer section
Expand All @@ -237,14 +252,15 @@ def extract_dns_data(self):
elif answer.type == 28: # Indicates AAAA record (IPv6 address)
resolved_ip = answer.rdata # Extract IPv6 address
break # Stop after finding the first valid resolved IP

qname = qname.rstrip('.') if (isinstance(qname, str)
and qname.endswith('.')) else qname
dns_data.append({
'Timestamp': float(packet.time), # Timestamp of the DNS packet
'Source': source_ip,
'Destination': destination_ip,
'ResolvedIP': resolved_ip, # Adding the resolved IP address
'Type': dns_type,
'Data': qname[:-1]
'Data': qname,
})

# Filter unique entries based on 'Timestamp'
Expand Down
34 changes: 27 additions & 7 deletions modules/test/ntp/python/src/ntp_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

LOG_NAME = 'test_ntp'
MODULE_REPORT_FILE_NAME = 'ntp_report.j2.html'
MODULE_REPORT_STYLED_FILE_NAME = 'ntp_report.jinja2'
NTP_SERVER_CAPTURE_FILE = '/runtime/network/ntp.pcap'
STARTUP_CAPTURE_FILE = '/runtime/device/startup.pcap'
MONITOR_CAPTURE_FILE = '/runtime/device/monitor.pcap'
Expand Down Expand Up @@ -142,7 +143,7 @@ def generate_module_report(self):

# Generate the HTML table with the count column
for (src, dst, typ,
version), avg_diff in average_time_between_requests.items():
version), avg_diff in average_time_between_requests.items():
cnt = len(timestamps[(src, dst, typ, version)])

# Sync Average only applies to client requests
Expand All @@ -169,23 +170,42 @@ def generate_module_report(self):
rows_on_page = ((page_useful_space) // row_height) - 1
start = 0
report_html = ''
report_html_styled = ''
for page in range(pages + 1):
end = start + min(len(module_table_data), rows_on_page)
module_header_repr = module_header if page == 0 else None
page_html = template.render(base_template=self._base_template_file,
module_header=module_header_repr,
summary_headers=summary_headers,
summary_data=summary_data,
module_data_headers=module_data_headers,
module_data=module_table_data[start:end])
page_html = template.render(
base_template=self._base_template_file,
module_header=module_header_repr,
summary_headers=summary_headers,
summary_data=summary_data,
module_data_headers=module_data_headers,
module_data=module_table_data[start:end]
)
page_html_styled = template.render(
base_template=self._base_template_file_preview,
module_header=module_header_repr,
summary_headers=summary_headers,
summary_data=summary_data,
module_data_headers=module_data_headers,
module_data=module_table_data[start:end]
)
report_html += page_html
report_html_styled += page_html_styled
start = end

LOGGER.debug('Module report:\n' + report_html)

# Use os.path.join to create the complete file path
report_path = os.path.join(self._results_dir, MODULE_REPORT_FILE_NAME)

# Use os.path.join to create the complete file path for styled report
report_path_styled = os.path.join(
self._results_dir, MODULE_REPORT_STYLED_FILE_NAME
)
# Generate the styled report for preview
self._render_styled_report(report_html_styled, report_path_styled)

# Write the content to a file
with open(report_path, 'w', encoding='utf-8') as file:
file.write(report_html)
Expand Down
15 changes: 13 additions & 2 deletions modules/test/ntp/python/src/ntp_white_list.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Module to resolve NTP whitelist domains to IP addresses asynchronously."""

import asyncio
import concurrent.futures
import dns.asyncresolver
from logging import Logger

Expand Down Expand Up @@ -99,8 +100,18 @@ def _get_ntp_whitelist_ips(
semaphore_limit: int = 50,
timeout: int = 30
) -> set[str]:
return asyncio.run(
self._get_ips_whitelist(self.config, semaphore_limit, timeout))
# Always run in a separate thread to ensure we have a clean event loop
def run_in_thread():
new_loop = asyncio.new_event_loop()
asyncio.set_event_loop(new_loop)
try:
return new_loop.run_until_complete(
self._get_ips_whitelist(self.config, semaphore_limit, timeout))
finally:
new_loop.close()

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
return executor.submit(run_in_thread).result()

# Check if an IP is whitelisted
def is_ip_whitelisted(self, ip: str) -> bool:
Expand Down
14 changes: 14 additions & 0 deletions modules/test/services/python/src/services_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

LOG_NAME = 'test_services'
MODULE_REPORT_FILE_NAME = 'services_report.j2.html'
MODULE_REPORT_STYLED_FILE_NAME = 'services_report.jinja2'
NMAP_SCAN_RESULTS_SCAN_FILE = 'services_scan_results.json'
LOGGER = None
REPORT_TEMPLATE_FILE = 'report_template.jinja2'
Expand Down Expand Up @@ -129,9 +130,22 @@ def generate_module_report(self):
module_data_headers=module_data_headers,
module_data=module_data,
)
html_content_preview = template.render(
base_template=self._base_template_file_preview,
module_header=module_header,
summary_headers=summary_headers,
summary_data=summary_data,
module_data_headers=module_data_headers,
module_data=module_data,
)

LOGGER.debug('Module report:\n' + html_content)

# Generate styled report for a preview
jinja_path_styled = os.path.join(
self._results_dir, MODULE_REPORT_STYLED_FILE_NAME)
self._render_styled_report(html_content_preview, jinja_path_styled)

# Use os.path.join to create the complete file path
report_path = os.path.join(self._results_dir, MODULE_REPORT_FILE_NAME)

Expand Down
Loading
Loading