diff --git a/requirements-runners.txt b/requirements-runners.txt
index d1c31a8..cbc1fba 100644
--- a/requirements-runners.txt
+++ b/requirements-runners.txt
@@ -1,4 +1,5 @@
-ARS_Test_Runner==0.2.4
+# ARS_Test_Runner==0.2.4
 # benchmarks-runner==0.1.3
 # ui-test-runner==0.0.2
 # graph-validation-test-runners==0.1.5
+locust==2.38.1
diff --git a/setup.py b/setup.py
index 27b8d68..16e2380 100644
--- a/setup.py
+++ b/setup.py
@@ -7,7 +7,7 @@
 
 setup(
     name="sri-test-harness",
-    version="0.4.1",
+    version="0.5.0",
     author="Max Wang",
     author_email="max@covar.com",
     url="https://github.com/TranslatorSRI/TestHarness",
diff --git a/test_harness/acceptance_test_runner.py b/test_harness/acceptance_test_runner.py
new file mode 100644
index 0000000..80bcbda
--- /dev/null
+++ b/test_harness/acceptance_test_runner.py
@@ -0,0 +1,123 @@
+"""Acceptance Test Pass Fail Analysis Runner."""
+
+from typing import Any, Dict, List
+
+
+def run_acceptance_pass_fail_analysis(
+    report: Dict[str, Any],
+    agent: str,
+    results: List[Dict[str, Any]],
+    out_curie: str,
+    expect_output: str,
+):
+    """Function to run pass fail analysis on individual results."""
+    # get the top_n result's ids
+    try:
+        all_ids = []
+        for res in results:
+            for res_node, res_value in res["node_bindings"].items():
+                for val in res_value:
+                    ids = str(val["id"])
+                    if ids not in all_ids:
+                        all_ids.append(ids)
+        if expect_output == "TopAnswer":
+            n_perc_res = results[0:30]
+        elif expect_output == "Acceptable":
+            n_perc_res = results[0 : int(len(results) * (float(50) / 100))]
+        elif expect_output == "BadButForgivable":
+            n_perc_res = results[int(len(results) * (float(50) / 100)) :]
+        elif expect_output == "NeverShow":
+            n_perc_res = results
+        else:
+            error_mesg = {
+                "error": f"Invalid expected output category: {expect_output}",
+            }
+            return error_mesg
+        n_perc_ids = []
+        for res in n_perc_res:
+            for res_value in res["node_bindings"].values():
+                for val in res_value:
+                    ids = str(val["id"])
+                    if ids not in n_perc_ids:
+                        n_perc_ids.append(ids)
+        # get the sugeno score & rank
+        for idx, res in enumerate(results):
+            node_bindings = res.get("node_bindings", {})
+            for k in node_bindings.keys():
+                nb = node_bindings[k]
+                the_id = None
+                for c in nb:
+                    the_id = c.get("id")
+                if the_id == out_curie:
+                    if "sugeno" in res.keys() and "rank" in res.keys():
+                        ars_score = res["sugeno"]
+                        ars_rank = res["rank"]
+                        ara_score = None
+                        ara_rank = None
+                    else:
+                        ars_score = None
+                        ars_rank = None
+                        for anal in res["analyses"]:
+                            if "score" in anal.keys():
+                                ara_score = anal["score"]
+                            else:
+                                ara_score = None
+                        ara_rank = idx + 1
+
+        report[agent]["actual_output"] = {}
+        if ars_score is not None and ars_rank is not None:
+            report[agent]["actual_output"]["ars_score"] = ars_score
+            report[agent]["actual_output"]["ars_rank"] = ars_rank
+
+        if ara_score is not None and ara_rank is not None:
+            report[agent]["actual_output"]["ara_score"] = ara_score
+            report[agent]["actual_output"]["ara_rank"] = ara_rank
+
+        if expect_output in ["TopAnswer", "Acceptable"]:
+            if out_curie in n_perc_ids:
+                report[agent]["status"] = "PASSED"
+            elif out_curie not in n_perc_ids:
+                if out_curie in all_ids:
+                    report[agent]["status"] = "FAILED"
+                else:
+                    report[agent]["status"] = "FAILED"
+                    report[agent]["actual_output"] = {}
+                    if agent == "ars":
+                        report[agent]["actual_output"]["ars_score"] = None
+                        report[agent]["actual_output"]["ars_rank"] = None
+                    else:
+                        report[agent]["actual_output"]["ara_score"] = None
+                        report[agent]["actual_output"]["ara_rank"] = None
+
+        elif expect_output ==
"BadButForgivable": + if out_curie in n_perc_ids: + report[agent]["status"] = "PASSED" + elif out_curie not in n_perc_ids and out_curie in all_ids: + report[agent]["status"] = "FAILED" + elif out_curie not in n_perc_ids and out_curie not in all_ids: + report[agent]["status"] = "PASSED" + report[agent]["actual_output"] = {} + if agent == "ars": + report[agent]["actual_output"]["ars_score"] = None + report[agent]["actual_output"]["ars_rank"] = None + else: + report[agent]["actual_output"]["ara_score"] = None + report[agent]["actual_output"]["ara_rank"] = None + + elif expect_output == "NeverShow": + if out_curie in n_perc_ids: + report[agent]["status"] = "FAILED" + elif out_curie not in all_ids: + report[agent]["status"] = "PASSED" + report[agent]["actual_output"] = {} + if agent == "ars": + report[agent]["actual_output"]["ars_score"] = None + report[agent]["actual_output"]["ars_rank"] = None + else: + report[agent]["actual_output"]["ara_score"] = None + report[agent]["actual_output"]["ara_rank"] = None + except Exception as e: + report[agent]["status"] = "FAILED" + report[agent]["message"] = f"An exception happened: {type(e), str(e)}" + + return report diff --git a/test_harness/download.py b/test_harness/download.py index 58d2dd8..6559fc7 100644 --- a/test_harness/download.py +++ b/test_harness/download.py @@ -1,25 +1,25 @@ """Download tests.""" import glob -import httpx import io import json import logging -from pathlib import Path import tempfile -from typing import List, Union, Dict import zipfile +from pathlib import Path +from typing import Dict, List, Union +import httpx from translator_testing_model.datamodel.pydanticmodel import ( - TestCase, PathfinderTestCase, + TestCase, TestSuite, ) def download_tests( suite: Union[str, List[str]], - url: Path, + url: str, logger: logging.Logger, ) -> Dict[str, Union[TestCase, PathfinderTestCase]]: """Download tests from specified location.""" @@ -87,5 +87,15 @@ def download_tests( # test.test_case_type = "acceptance" # tests = all_tests # tests = list(filter((lambda x: x for x in all_tests for asset in x.test_assets if asset.output_id), all_tests)) - logger.info(f"Passing along {len(test_suite.test_cases)} queries") + logger.info(f"Passing along {len(test_suite.test_cases.keys())} queries") return test_suite.test_cases + + +if __name__ == "__main__": + tests = download_tests( + "performance_tests", + "https://github.com/NCATSTranslator/Tests/archive/refs/heads/performance_tests.zip", + logging.Logger("tester"), + ) + for test_case_id, test in tests.items(): + print(type(test)) diff --git a/test_harness/main.py b/test_harness/main.py index 7d33416..831ddfc 100644 --- a/test_harness/main.py +++ b/test_harness/main.py @@ -1,16 +1,22 @@ """Translator SRI Automated Test Harness.""" -from argparse import ArgumentParser -import asyncio +from gevent import monkey + +monkey.patch_all() + import json -from setproctitle import setproctitle +from argparse import ArgumentParser +import time from urllib.parse import urlparse from uuid import uuid4 -from test_harness.run import run_tests +from setproctitle import setproctitle + from test_harness.download import download_tests from test_harness.logger import get_logger, setup_logger from test_harness.reporter import Reporter +from test_harness.result_collector import ResultCollector +from test_harness.run import run_tests from test_harness.slacker import Slacker setproctitle("TestHarness") @@ -24,7 +30,7 @@ def url_type(arg): raise TypeError("Invalid URL") -async def main(args): +def main(args): """Main Test 
Harness entrypoint.""" qid = str(uuid4())[:8] logger = get_logger(qid, args["log_level"]) @@ -47,18 +53,57 @@ async def main(args): refresh_token=args.get("reporter_access_token"), logger=logger, ) - await reporter.get_auth() - await reporter.create_test_run(next(iter(tests.values())).test_env, args["suite"]) + reporter.get_auth() + reporter.create_test_run(next(iter(tests.values())).test_env, args["suite"]) slacker = Slacker() - report = await run_tests(reporter, slacker, tests, logger, args) + collector = ResultCollector(logger) + queried_envs = set() + for test in tests.values(): + queried_envs.add(test.test_env) + slacker.post_notification( + messages=[ + f"Running {args['suite']} ({sum([len(test.test_assets) for test in tests.values()])} tests, {len(tests.values())} queries)...\n<{reporter.base_path}/test-runs/{reporter.test_run_id}|View in the Information Radiator>" + ] + ) + start_time = time.time() + run_tests(tests, reporter, collector, logger, args) + + slacker.post_notification( + messages=[ + """Test Suite: {test_suite}\nDuration: {duration} | Environment(s): {envs}\n<{ir_url}|View in the Information Radiator>\n{result_summary}""".format( + test_suite=args["suite"], + duration=round(time.time() - start_time, 2), + envs=(",").join(list(queried_envs)), + ir_url=f"{reporter.base_path}/test-runs/{reporter.test_run_id}", + result_summary=collector.dump_result_summary(), + ) + ] + ) + if collector.has_acceptance_results: + slacker.upload_test_results_file( + reporter.test_name, + "json", + collector.acceptance_stats, + ) + slacker.upload_test_results_file( + reporter.test_name, + "csv", + collector.acceptance_csv, + ) + if collector.has_performance_results: + slacker.upload_test_results_file( + reporter.test_name, + "json", + collector.performance_stats, + ) logger.info("Finishing up test run...") - await reporter.finish_test_run() + reporter.finish_test_run() if args["json_output"]: # logger.info("Saving report as JSON...") with open("test_report.json", "w") as f: - json.dump(report, f) + json.dump(collector.acceptance_report, f) return logger.info("All tests have completed!") @@ -135,7 +180,7 @@ def cli(): ) args = parser.parse_args() - asyncio.run(main(vars(args))) + main(vars(args)) if __name__ == "__main__": diff --git a/test_harness/pathfinder_test_runner.py b/test_harness/pathfinder_test_runner.py index 0799d31..90f1c91 100644 --- a/test_harness/pathfinder_test_runner.py +++ b/test_harness/pathfinder_test_runner.py @@ -1,13 +1,13 @@ -from typing import Dict, Union, List +from typing import Any, Dict, List -async def pathfinder_pass_fail_analysis( - report: Dict[str, any], +def pathfinder_pass_fail_analysis( + report: Dict[str, Any], agent: str, - message: Dict[str, any], + message: Dict[str, Any], path_nodes: List[List[str]], minimum_required_path_nodes: int, -) -> Dict[str, any]: +) -> Dict[str, Any]: found_path_nodes = set() unmatched_paths = set() for analysis in message["results"][0]["analyses"]: diff --git a/test_harness/performance_test_runner.py b/test_harness/performance_test_runner.py new file mode 100644 index 0000000..3572fd0 --- /dev/null +++ b/test_harness/performance_test_runner.py @@ -0,0 +1,276 @@ +"""Translator Performance Test Runner.""" + +import logging +import time +from typing import Dict + +import gevent +from locust import HttpUser, LoadTestShape, task +from locust.env import Environment +from locust.stats import stats_history, stats_printer +from translator_testing_model.datamodel.pydanticmodel import ( + AcceptanceTestAsset, + ComponentEnum, + 
PerformanceTestCase, + TestEnvEnum, + TestObjectiveEnum, +) + +from test_harness.runner.generate_query import generate_query +from test_harness.runner.query_runner import QueryRunner, env_map + + +def run_locust_tests( + host: str, + test_query: Dict, + test_run_time: int, + spawn_rate: float, + target: str, +): + print("Starting locust testing") + + class RetryPoll(Exception): + pass + + class QueryCompleted(Exception): + pass + + class TestShape(LoadTestShape): + time_limit = test_run_time + user_spawn_rate = spawn_rate + + def tick(self): + run_time = self.get_run_time() + if run_time < self.time_limit: + user_count = round(run_time, -1) * self.user_spawn_rate + return (user_count, self.user_spawn_rate) + + return None + + class ARAUser(HttpUser): + @task + def send_query(self): + with self.client.post( + "/query", json=test_query, catch_response=True + ) as response: + # do stuff with the response + if response.status_code == 200: + response.success() + else: + response.failure(f"Got a bad response: {response.status_code}") + + class ARSUser(HttpUser): + @task + def send_query(self): + parent_pk = "" + try: + print("Sending query to ARS") + with self.client.post( + "/ars/api/submit", json=test_query, catch_response=True + ) as response: + # do stuff with the response + if response.status_code != 201: + response.failure( + f"Failed to start a query: {response.content}, {response.status_code}" + ) + return + + parent_pk = response.json().get("pk", "") + raise QueryCompleted() + except QueryCompleted: + pass + + start_time = time.time() + now = time.time() + total_time = 0 + try: + while now - start_time <= 3600: + now = time.time() + try: + with self.client.get( + f"/ars/api/messages/{parent_pk}?trace=y", + catch_response=True, + name=parent_pk, + ) as response: + total_time = (now - start_time) * 1000 + if response.status_code != 200: + self.environment.events.request.fire( + request_type="GET_RESPONSE", + name="/ars/api/messages", + response_time=total_time, + response_length=0, + exception=f"Failed to poll the query: {response.content}, {response.status_code}", + context=self.context(), + ) + res = response.json() + status = res.get("status") + if status == "Error": + self.environment.events.request.fire( + request_type="GET_RESPONSE", + name="/ars/api/messages", + response_time=total_time, + response_length=0, + exception=f"ARS had an error: {parent_pk}", + context=self.context(), + ) + elif status == "Done": + self.environment.events.request.fire( + request_type="GET_RESPONSE", + name="/ars/api/messages", + response_time=total_time, + response_length=( + len(response.content) if response.content else 0 + ), + exception=None, + context=self.context(), + ) + raise QueryCompleted() + time.sleep(5) + raise RetryPoll("Polling again") + except RetryPoll: + continue + + self.environment.events.request.fire( + request_type="GET_RESPONSE", + name="/ars/api/messages", + response_time=total_time, + response_length=0, + exception=f"ARS timed out: {parent_pk}", + context=self.context(), + ) + except QueryCompleted: + print("Query complete!") + pass + + # Create environment + USER_TYPE_MAP = { + "ars": ARSUser, + "aragorn": ARAUser, + "arax": ARAUser, + "bte": ARAUser, + } + user_class = USER_TYPE_MAP.get(target, ARAUser) + env = Environment(user_classes=[user_class], host=host, shape_class=TestShape()) + runner = env.create_local_runner() + + # Start stats printer + gevent.spawn(stats_printer(env.stats)) + gevent.spawn(stats_history, runner) + + # Start test + runner.start_shape() + + # Run for 
specified duration + gevent.spawn_later(test_run_time, runner.quit) + + # Wait for completion + runner.greenlet.join() + runner.quit() + + print("Done with locust testing!") + + return { + "stats": env.stats.serialize_stats(), + "failures": env.stats.serialize_errors(), + } + + +def run_performance_test(test: PerformanceTestCase, test_query: Dict, host: str): + """Wrapper function to run load tests with custom parameters""" + target = test.components[0] + + results = run_locust_tests( + host, + test_query, + test.test_run_time, + test.spawn_rate, + target, + ) + + return results + + +def initialize(): + test_asset = AcceptanceTestAsset.model_validate( + { + "id": "Asset_1", + "name": "NeverShow: Iron (PUBCHEM) treats Aceruloplasminemia", + "description": "NeverShow: Iron (PUBCHEM) treats Aceruloplasminemia", + "tags": [], + "test_runner_settings": ["inferred"], + "input_id": "MONDO:0011426", + "input_name": "Aceruloplasminemia", + "input_category": "biolink:Disease", + "predicate_id": "biolink:treats", + "predicate_name": "treats", + "output_id": "PUBCHEM.COMPOUND:23925", + "output_name": "Iron (PUBCHEM)", + "output_category": "biolink:ChemicalEntity", + "association": None, + "qualifiers": [ + {"parameter": "biolink_qualified_predicate", "value": "biolink:treats"}, + {"parameter": "biolink_object_aspect_qualifier", "value": ""}, + {"parameter": "biolink_object_direction_qualifier", "value": ""}, + ], + "expected_output": "NeverShow", + "test_issue": None, + "semantic_severity": None, + "in_v1": None, + "well_known": False, + "test_reference": None, + "test_metadata": { + "id": "1", + "name": None, + "description": None, + "tags": [], + "test_runner_settings": [], + "test_source": "SMURF", + "test_reference": "https://github.com/NCATSTranslator/Feedback/issues/506", + "test_objective": "AcceptanceTest", + "test_annotations": [], + }, + } + ) + test = PerformanceTestCase( + id="1", + name="ExamplePerformanceTest", + description="Iron treats Aceruloplasminemia", + tags=[], + test_runner_settings=["inferred"], + test_run_time=20, + spawn_rate=0.1, + query_type=None, + test_assets=[test_asset], + preconditions=[], + trapi_template=None, + test_case_objective=TestObjectiveEnum.QuantitativeTest, + test_case_source=None, + test_case_predicate_name="treats", + test_case_predicate_id="biolink_treats", + test_case_input_id="MONDO:0011426", + qualifiers=[], + input_category="biolink:Disease", + output_category=None, + components=[ComponentEnum.ars], + test_env=TestEnvEnum.ci, + ) + logger = logging.getLogger(__name__) + query_runner = QueryRunner(logger) + query_runner.retrieve_registry("1.6.0") + # print(query_runner.registry) + + host = query_runner.registry[env_map[test.test_env]][test.components[0]][0]["url"] + + test_query = generate_query(test.test_assets[0]) + + results = run_performance_test( + test, + test_query, + host, + ) + + print(results) + + +if __name__ == "__main__": + initialize() diff --git a/test_harness/reporter.py b/test_harness/reporter.py index 5a953fb..f37ef2a 100644 --- a/test_harness/reporter.py +++ b/test_harness/reporter.py @@ -1,16 +1,17 @@ """Information Radiator Reporter.""" -from datetime import datetime -import httpx import logging import os +from datetime import datetime from typing import List, Union +import httpx from translator_testing_model.datamodel.pydanticmodel import ( - TestCase, + PathfinderTestAsset, PathfinderTestCase, + PerformanceTestCase, TestAsset, - PathfinderTestAsset, + TestCase, ) @@ -32,10 +33,10 @@ def __init__( self.test_name = "" self.logger 
= logger - async def get_auth(self): + def get_auth(self): """Get access token for subsequent calls.""" - async with httpx.AsyncClient() as client: - res = await client.post( + with httpx.Client() as client: + res = client.post( url=f"{self.base_path}/api/iam/v1/auth/refresh", json={ "refreshToken": self.refresh_token, @@ -43,16 +44,16 @@ async def get_auth(self): ) res.raise_for_status() auth_response = res.json() - self.authenticated_client = httpx.AsyncClient( + self.authenticated_client = httpx.Client( headers={ "Authorization": f"Bearer {auth_response['authToken']}", } ) - async def create_test_run(self, test_env, suite_name): + def create_test_run(self, test_env, suite_name): """Create a test run in the IR.""" self.test_name = f"{suite_name}: {datetime.now().strftime('%Y_%m_%d_%H_%M')}" - res = await self.authenticated_client.post( + res = self.authenticated_client.post( url=f"{self.base_path}/api/reporting/v1/test-runs", json={ "name": self.test_name, @@ -68,9 +69,9 @@ async def create_test_run(self, test_env, suite_name): self.test_run_id = res_json["id"] return self.test_run_id - async def create_test( + def create_test( self, - test: Union[TestCase, PathfinderTestCase], + test: Union[TestCase, PathfinderTestCase, PerformanceTestCase], asset: Union[TestAsset, PathfinderTestAsset], ): """Create a test in the IR.""" @@ -95,7 +96,7 @@ async def create_test( }, ], } - if isinstance(test, TestCase) and isinstance(asset, TestAsset): + if isinstance(test, PerformanceTestCase) and isinstance(asset, TestAsset): test_json["labels"].extend( [ { @@ -103,10 +104,14 @@ async def create_test( "value": asset.input_id, }, { - "key": "OutputCurie", - "value": asset.output_id, + "key": "TestRunTime", + "value": test.test_run_time, }, - ] + { + "key": "UserSpawnRate", + "value": test.spawn_rate, + }, + ], ) elif isinstance(test, PathfinderTestCase) and isinstance( asset, PathfinderTestAsset @@ -123,9 +128,23 @@ async def create_test( }, ] ) + elif isinstance(test, TestCase) and isinstance(asset, TestAsset): + test_json["labels"].extend( + [ + { + "key": "InputCurie", + "value": asset.input_id, + }, + { + "key": "OutputCurie", + "value": asset.output_id, + }, + ] + ) else: + print("made it to the error section") raise Exception - res = await self.authenticated_client.post( + res = self.authenticated_client.post( url=f"{self.base_path}/api/reporting/v1/test-runs/{self.test_run_id}/tests", json=test_json, ) @@ -133,10 +152,10 @@ async def create_test( res_json = res.json() return res_json["id"] - async def upload_labels(self, test_id: int, labels: List[dict]): + def upload_labels(self, test_id: int, labels: List[dict]): """Upload labels to the IR.""" self.logger.info(labels) - res = await self.authenticated_client.put( + res = self.authenticated_client.put( url=f"{self.base_path}/api/reporting/v1/test-runs/{self.test_run_id}/tests/{test_id}/labels", json={ "items": labels, @@ -144,9 +163,9 @@ async def upload_labels(self, test_id: int, labels: List[dict]): ) res.raise_for_status() - async def upload_logs(self, test_id: int, logs: List[str]): + def upload_logs(self, test_id: int, logs: List[str]): """Upload logs to the IR.""" - res = await self.authenticated_client.post( + res = self.authenticated_client.post( url=f"{self.base_path}/api/reporting/v1/test-runs/{self.test_run_id}/logs", json=[ { @@ -160,17 +179,17 @@ async def upload_logs(self, test_id: int, logs: List[str]): ) res.raise_for_status() - async def upload_artifact_references(self, test_id, artifact_references): + def 
upload_artifact_references(self, test_id, artifact_references): """Upload artifact references to the IR.""" - res = await self.authenticated_client.put( + res = self.authenticated_client.put( url=f"{self.base_path}/api/reporting/v1/test-runs/{self.test_run_id}/tests/{test_id}/artifact-references", json=artifact_references, ) res.raise_for_status() - async def upload_screenshot(self, test_id, screenshot): + def upload_screenshot(self, test_id, screenshot): """Upload screenshots to the IR.""" - res = await self.authenticated_client.post( + res = self.authenticated_client.post( url=f"{self.base_path}/api/reporting/v1/test-runs/{self.test_run_id}/tests/{test_id}/screenshots", headers={ "Content-Type": "image/png", @@ -180,9 +199,9 @@ async def upload_screenshot(self, test_id, screenshot): ) res.raise_for_status() - async def upload_log(self, test_id, message): + def upload_log(self, test_id, message): """Upload logs to the IR.""" - res = await self.authenticated_client.post( + res = self.authenticated_client.post( url=f"{self.base_path}/api/reporting/v1/test-runs/{self.test_run_id}/logs", json=[ { @@ -195,9 +214,9 @@ async def upload_log(self, test_id, message): ) res.raise_for_status() - async def finish_test(self, test_id, result): + def finish_test(self, test_id, result): """Set the final status of a test.""" - res = await self.authenticated_client.put( + res = self.authenticated_client.put( url=f"{self.base_path}/api/reporting/v1/test-runs/{self.test_run_id}/tests/{test_id}", json={ "result": result, @@ -208,9 +227,9 @@ async def finish_test(self, test_id, result): res_json = res.json() return res_json["result"] - async def finish_test_run(self): + def finish_test_run(self): """Set the final status of a test run.""" - res = await self.authenticated_client.put( + res = self.authenticated_client.put( url=f"{self.base_path}/api/reporting/v1/test-runs/{self.test_run_id}", json={ "endedAt": datetime.now().astimezone().isoformat(), diff --git a/test_harness/result_collector.py b/test_harness/result_collector.py index 622ba29..244d26c 100644 --- a/test_harness/result_collector.py +++ b/test_harness/result_collector.py @@ -1,23 +1,41 @@ """The Collector of Results.""" import logging -from typing import Union +from typing import Dict, Union + from translator_testing_model.datamodel.pydanticmodel import ( - TestAsset, PathfinderTestAsset, - TestCase, PathfinderTestCase, + TestAsset, + TestCase, ) from test_harness.utils import get_tag +def median_from_dict(total: int, count: dict[int, int]) -> int: + """ + total is the number of requests made + count is a dict {response_time: count} + """ + pos = (total - 1) / 2 + k = 0 + for k in sorted(count.keys()): + if pos < count[k]: + return k + pos -= count[k] + + return k + + class ResultCollector: """Collect results for easy dissemination.""" def __init__(self, logger: logging.Logger): """Initialize the Collector.""" self.logger = logger + self.has_acceptance_results = False + self.has_performance_results = False self.agents = [ "ars", "aragorn", @@ -28,25 +46,35 @@ def __init__(self, logger: logging.Logger): "cqs", ] self.query_types = ["TopAnswer", "Acceptable", "BadButForgivable", "NeverShow"] + self.acceptance_report = { + "PASSED": 0, + "FAILED": 0, + "SKIPPED": 0, + } self.result_types = { "PASSED": "PASSED", "FAILED": "FAILED", "No results": "No results", "-": "Test Error", } - self.stats = {} + self.acceptance_stats = {} for agent in self.agents: - self.stats[agent] = {} + self.acceptance_stats[agent] = {} for query_type in self.query_types: - 
self.stats[agent][query_type] = {} + self.acceptance_stats[agent][query_type] = {} for result_type in self.result_types.values(): - self.stats[agent][query_type][result_type] = 0 + self.acceptance_stats[agent][query_type][result_type] = 0 self.columns = ["name", "url", "pk", "TestCase", "TestAsset", *self.agents] header = ",".join(self.columns) - self.csv = f"{header}\n" + self.acceptance_csv = f"{header}\n" + self.performance_stats = {} + self.performance_report = { + "stats": {}, + "failures": {}, + } - def collect_result( + def collect_acceptance_result( self, test: Union[TestCase, PathfinderTestCase], asset: Union[TestAsset, PathfinderTestAsset], @@ -55,6 +83,7 @@ def collect_result( url: str, ): """Add a single report to the total output.""" + self.has_acceptance_results = True # add result to stats for agent in self.agents: query_type = asset.expected_output @@ -63,10 +92,10 @@ def collect_result( get_tag(report[agent]), "Test Error" ) if ( - query_type in self.stats[agent] - and result_type in self.stats[agent][query_type] + query_type in self.acceptance_stats[agent] + and result_type in self.acceptance_stats[agent][query_type] ): - self.stats[agent][query_type][result_type] += 1 + self.acceptance_stats[agent][query_type][result_type] += 1 else: self.logger.error( f"Got {query_type} and {result_type} and can't put into stats!" @@ -80,6 +109,80 @@ def collect_result( pk_url = ( f"https://arax.ci.ncats.io/?r={parent_pk}" if parent_pk is not None else "" ) - self.csv += ( + self.acceptance_csv += ( f""""{asset.name}",{url},{pk_url},{test.id},{asset.id},{agent_results}\n""" ) + + def collect_performance_result( + self, + test: Union[TestCase, PathfinderTestCase], + asset: Union[TestAsset, PathfinderTestAsset], + url: str, + host_url: str, + results: Dict, + ): + """Add a single report for a performance test.""" + self.has_performance_results = True + results_stats = results.get("stats") or [] + for result_stat in results_stats: + self.performance_report["stats"][host_url] = { + "endpoint": f"{host_url}{result_stat['name']}", + "num_requests": result_stat.get("num_requests", 0) + - result_stat.get("num_none_requests", 0), + "num_failures": result_stat.get("num_failures", 0), + "max_response_time": result_stat.get("max_response_time", -1), + "min_response_time": result_stat.get("min_response_time", -1), + "requests_per_second": result_stat.get("num_requests", 1) + / ( + result_stat.get("last_request_timestamp", -1) + - result_stat.get("start_time", 0) + ), + "average_response_time": result_stat.get("total_response_time") + / ( + result_stat.get("num_requests", 0) + - result_stat.get("num_none_requests", 0) + ), + "median_response_time": median_from_dict( + result_stat.get("num_requests", 0) + - result_stat.get("num_none_requests", 0), + result_stat.get("response_times", {}), + ), + } + self.performance_report["failures"] = results.get("failures") or {} + + stats_id = f"{host_url}_case_{test.id}_asset_{asset.id}" + self.performance_stats[stats_id] = { + "information_radiator_url": url, + **results, + } + + def dump_result_summary(self): + """Format test results summary for Slack.""" + results_formatted = "" + if self.has_acceptance_results: + results_formatted += f""" +> Acceptance Test Results: +> Passed: {self.acceptance_report['PASSED']}, +> Failed: {self.acceptance_report['FAILED']}, +> Skipped: {self.acceptance_report['SKIPPED']} +""" + if self.has_performance_results: + results_formatted += """ +> Performance Test Results:""" + for target_url, target_stats in 
self.performance_report["stats"].items(): + results_formatted += f"""> {target_url} +> - Endpoint: {target_stats["endpoint"]} +> - Number of Requests: {target_stats["num_requests"]} +> - Number of Failures: {target_stats["num_failures"]} +> - Average Response Time: {target_stats["average_response_time"] / 1000} seconds +> - Median Response Time: {target_stats["median_response_time"] / 1000} seconds +> - Average Requests Per Second: {target_stats["requests_per_second"]}""" + if len(self.performance_report["failures"].keys()): + results_formatted += "> Failures:" + for failure_stat in self.performance_report["failures"].values(): + results_formatted += f"""--- +> {failure_stat.get("name", "Unknown")} +> {failure_stat.get("error", "Unknown Error")} +> {failure_stat.get("occurrences", 0)}""" + + return results_formatted diff --git a/test_harness/run.py b/test_harness/run.py index 0384931..a60337b 100644 --- a/test_harness/run.py +++ b/test_harness/run.py @@ -2,66 +2,51 @@ import json import logging -import time -from tqdm import tqdm -import traceback -from typing import Dict, Union +from typing import Any, Dict, Union -from ARS_Test_Runner.semantic_test import pass_fail_analysis +from tqdm import tqdm # from standards_validation_test_runner import StandardsValidationTest - # from benchmarks_runner import run_benchmarks - from translator_testing_model.datamodel.pydanticmodel import ( - TestCase, PathfinderTestCase, + PerformanceTestCase, + TestCase, ) -from test_harness.runner.query_runner import QueryRunner +from test_harness.acceptance_test_runner import run_acceptance_pass_fail_analysis +from test_harness.pathfinder_test_runner import pathfinder_pass_fail_analysis +from test_harness.performance_test_runner import run_performance_test from test_harness.reporter import Reporter -from test_harness.slacker import Slacker from test_harness.result_collector import ResultCollector +from test_harness.runner.generate_query import generate_query +from test_harness.runner.query_runner import QueryRunner, env_map from test_harness.utils import get_tag, hash_test_asset -from test_harness.pathfinder_test_runner import pathfinder_pass_fail_analysis -async def run_tests( - reporter: Reporter, - slacker: Slacker, +def run_tests( tests: Dict[str, Union[TestCase, PathfinderTestCase]], + reporter: Reporter, + collector: ResultCollector, logger: logging.Logger = logging.getLogger(__name__), - args: Dict[str, any] = {}, -) -> Dict: + args: Dict[str, Any] = {}, +) -> None: """Send tests through the Test Runners.""" - start_time = time.time() logger.info(f"Running {len(tests)} queries...") - full_report = { - "PASSED": 0, - "FAILED": 0, - "SKIPPED": 0, - } - env = "None" - await slacker.post_notification( - messages=[ - f"Running {args['suite']} ({sum([len(test.test_assets) for test in tests.values()])} tests, {len(tests.values())} queries)...\n<{reporter.base_path}/test-runs/{reporter.test_run_id}|View in the Information Radiator>" - ] - ) query_runner = QueryRunner(logger) logger.info("Runner is getting service registry") - await query_runner.retrieve_registry(trapi_version=args["trapi_version"]) - collector = ResultCollector(logger) + query_runner.retrieve_registry(trapi_version=args["trapi_version"]) # loop over all tests - for test in tqdm(tests.values()): + for test in tqdm(list(tests.values())[:2]): status = "PASSED" - env = test.test_env # check if acceptance test if not test.test_assets or not test.test_case_objective: logger.warning(f"Test has missing required fields: {test.id}") continue - 
query_responses, normalized_curies = await query_runner.run_queries(test) + query_responses = {} if test.test_case_objective == "AcceptanceTest": + query_responses, normalized_curies = query_runner.run_queries(test) test_ids = [] for asset in test.test_assets: @@ -74,7 +59,7 @@ async def run_tests( # create test in Test Dashboard test_id = "" try: - test_id = await reporter.create_test(test, asset) + test_id = reporter.create_test(test, asset) test_ids.append(test_id) except Exception as e: logger.error(f"Failed to create test: {test.id}") @@ -86,7 +71,7 @@ async def run_tests( message = json.dumps(test_query["query"], indent=2) else: message = "Unable to retrieve response for test asset." - await reporter.upload_log( + reporter.upload_log( test_id, message, ) @@ -168,7 +153,7 @@ async def run_tests( agent_report["message"] = "No results" continue if isinstance(test, PathfinderTestCase): - await pathfinder_pass_fail_analysis( + pathfinder_pass_fail_analysis( report["result"], agent, response["response"]["message"], @@ -182,7 +167,7 @@ async def run_tests( asset.minimum_required_path_nodes, ) else: - await pass_fail_analysis( + run_acceptance_pass_fail_analysis( report["result"], agent, response["response"]["message"]["results"], @@ -200,9 +185,9 @@ async def run_tests( # grab only ars result if it exists, otherwise default to failed ars_status = report["result"].get("ars", {}).get("status") status = ars_status if ars_status is not None else "SKIPPED" - full_report[status] += 1 + collector.acceptance_report[status] += 1 - collector.collect_result( + collector.collect_acceptance_result( test, asset, report["result"], @@ -221,89 +206,98 @@ async def run_tests( for ara in collector.agents if ara in report["result"] ] - await reporter.upload_labels(test_id, labels) + reporter.upload_labels(test_id, labels) except Exception as e: logger.warning(f"[{test.id}] failed to upload labels: {e}") try: - await reporter.upload_log(test_id, json.dumps(report, indent=4)) + reporter.upload_log(test_id, json.dumps(report, indent=4)) except Exception: logger.error(f"[{test.id}] failed to upload logs.") else: status = "SKIPPED" try: - await reporter.finish_test(test_id, status) + reporter.finish_test(test_id, status) except Exception: logger.error(f"[{test.id}] failed to upload finished status.") elif test.test_case_objective == "QuantitativeTest": - continue - assets = test.test_assets[0] - try: - test_id = await reporter.create_test(test, assets) - except Exception: - logger.error(f"Failed to create test: {test.id}") - continue - try: - test_inputs = [ - assets.id, - # TODO: update this. 
Assumes is going to be ARS - test.components[0], - ] - await reporter.upload_log( - test_id, - f"Calling Benchmark Test Runner with: {json.dumps(test_inputs, indent=4)}", - ) - benchmark_results, screenshots = await run_benchmarks(*test_inputs) - await reporter.upload_log(test_id, ("\n").join(benchmark_results)) - # ex: - # { - # "aragorn": { - # "precision": screenshot - # } - # } - for target_screenshots in screenshots.values(): - for screenshot in target_screenshots.values(): - await reporter.upload_screenshot(test_id, screenshot) - await reporter.finish_test(test_id, "PASSED") - full_report["PASSED"] += 1 - except Exception as e: - logger.error(f"Benchmarks failed with {e}: {traceback.format_exc()}") - full_report["FAILED"] += 1 + # create test in Test Dashboard + test_ids = [] + for asset in test.test_assets: + test_id = "" try: - await reporter.upload_log(test_id, traceback.format_exc()) - except Exception: - logger.error( - f"Failed to upload fail logs for test {test_id}: {traceback.format_exc()}" + test_id = reporter.create_test(test, asset) + test_ids.append(test_id) + except Exception as e: + logger.error(f"Failed to create test: {test.id}", e) + continue + + if isinstance(test, PerformanceTestCase): + test_query = generate_query(asset) + if test_query is not None: + message = json.dumps(test_query, indent=2) + else: + message = "Unable to retrieve response for test asset." + reporter.upload_log( + test_id, + message, ) - await reporter.finish_test(test_id, "FAILED") + host = query_runner.registry[env_map[test.test_env]][ + test.components[0] + ][0]["url"] + results = run_performance_test(test, test_query, host) + + collector.collect_performance_result( + test, + asset, + f"{reporter.base_path}/test-runs/{reporter.test_run_id}/tests/{test_id}", + host, + results, + ) + # try: + # test_inputs = [ + # assets.id, + # # TODO: update this. 
Assumes is going to be ARS + # test.components[0], + # ] + # await reporter.upload_log( + # test_id, + # f"Calling Benchmark Test Runner with: {json.dumps(test_inputs, indent=4)}", + # ) + # benchmark_results, screenshots = await run_benchmarks(*test_inputs) + # await reporter.upload_log(test_id, ("\n").join(benchmark_results)) + # # ex: + # # { + # # "aragorn": { + # # "precision": screenshot + # # } + # # } + # for target_screenshots in screenshots.values(): + # for screenshot in target_screenshots.values(): + # await reporter.upload_screenshot(test_id, screenshot) + # await reporter.finish_test(test_id, "PASSED") + # collector.full_report["PASSED"] += 1 + # except Exception as e: + # logger.error(f"Benchmarks failed with {e}: {traceback.format_exc()}") + # collector.full_report["FAILED"] += 1 + # try: + # await reporter.upload_log(test_id, traceback.format_exc()) + # except Exception: + # logger.error( + # f"Failed to upload fail logs for test {test_id}: {traceback.format_exc()}" + # ) + # await reporter.finish_test(test_id, "FAILED") else: try: - test_id = await reporter.create_test(test, test) + test_id = reporter.create_test(test, test.test_assets[0]) logger.error(f"Unsupported test type: {test.id}") - await reporter.upload_log( + reporter.upload_log( test_id, f"Unsupported test type in test: {test.id}" ) status = "FAILED" - await reporter.finish_test(test_id, status) + reporter.finish_test(test_id, status) except Exception: logger.error(f"Failed to report errors with: {test.id}") # delete this big object to help out the garbage collector del query_responses - - await slacker.post_notification( - messages=[ - """Test Suite: {test_suite}\nDuration: {duration} | Environment: {env}\n<{ir_url}|View in the Information Radiator>\n> Test Results:\n> Passed: {num_passed}, Failed: {num_failed}, Skipped: {num_skipped}""".format( - test_suite=args["suite"], - duration=round(time.time() - start_time, 2), - env=env, - ir_url=f"{reporter.base_path}/test-runs/{reporter.test_run_id}", - num_passed=full_report["PASSED"], - num_failed=full_report["FAILED"], - num_skipped=full_report["SKIPPED"], - ) - ] - ) - await slacker.upload_test_results_file(reporter.test_name, "json", collector.stats) - await slacker.upload_test_results_file(reporter.test_name, "csv", collector.csv) - return full_report diff --git a/test_harness/runner/generate_query.py b/test_harness/runner/generate_query.py index 8da5d5d..f65f942 100644 --- a/test_harness/runner/generate_query.py +++ b/test_harness/runner/generate_query.py @@ -2,14 +2,14 @@ import copy from typing import Union + from translator_testing_model.datamodel.pydanticmodel import ( - TestAsset, PathfinderTestAsset, + TestAsset, ) from test_harness.utils import get_qualifier_constraints - MVP1 = { "message": { "query_graph": { @@ -144,3 +144,54 @@ def generate_query(test_asset: Union[TestAsset, PathfinderTestAsset]) -> dict: raise Exception(f"Unsupported predicate: {test_asset.predicate_id}") return query + + +if __name__ == "__main__": + test_asset = TestAsset.parse_obj( + { + "id": "Asset_450", + "name": "NeverShow: MMP3 increases activity or abundance of Potassium ion", + "description": "NeverShow: MMP3 increases activity or abundance of Potassium ion", + "tags": [], + "test_runner_settings": ["inferred"], + "input_id": "CHEBI:29103", + "input_name": "Potassium ion", + "input_category": "biolink:ChemicalEntity", + "predicate_id": "biolink:affects", + "predicate_name": "affects", + "output_id": "NCBIGene:4314", + "output_name": "MMP3", + "output_category": 
"biolink:Gene", + "association": None, + "qualifiers": [ + {"parameter": "biolink_qualified_predicate", "value": "biolink:causes"}, + { + "parameter": "biolink_object_aspect_qualifier", + "value": "activity_or_abundance", + }, + { + "parameter": "biolink_object_direction_qualifier", + "value": "increased", + }, + ], + "expected_output": "NeverShow", + "test_issue": None, + "semantic_severity": None, + "in_v1": None, + "well_known": False, + "test_reference": None, + "test_metadata": { + "id": "1", + "name": None, + "description": None, + "tags": [], + "test_runner_settings": [], + "test_source": "SMURF", + "test_reference": "https://github.com/NCATSTranslator/Feedback/issues/740", + "test_objective": "AcceptanceTest", + "test_annotations": [], + }, + } + ) + query = generate_query(test_asset) + print(query) diff --git a/test_harness/runner/query_runner.py b/test_harness/runner/query_runner.py index 6f9470c..8854e7f 100644 --- a/test_harness/runner/query_runner.py +++ b/test_harness/runner/query_runner.py @@ -1,17 +1,17 @@ """Translator Test Query Runner.""" -import asyncio -import httpx import logging import time -from typing import Tuple, Dict, Union +from typing import Dict, Tuple, Union +import httpx from translator_testing_model.datamodel.pydanticmodel import ( - TestCase, PathfinderTestCase, + TestCase, ) -from test_harness.runner.smart_api_registry import retrieve_registry_from_smartapi + from test_harness.runner.generate_query import generate_query +from test_harness.runner.smart_api_registry import retrieve_registry_from_smartapi from test_harness.utils import hash_test_asset, normalize_curies MAX_QUERY_TIME = 600 @@ -32,57 +32,56 @@ def __init__(self, logger: logging.Logger): self.registry = {} self.logger = logger - async def retrieve_registry(self, trapi_version: str): - self.registry = await retrieve_registry_from_smartapi(trapi_version) + def retrieve_registry(self, trapi_version: str): + self.registry = retrieve_registry_from_smartapi(trapi_version) - async def run_query( - self, query_hash, semaphore, message, base_url, infores + def run_query( + self, query_hash, message, base_url, infores ) -> Tuple[int, Dict[str, dict], Dict[str, str]]: """Generate and run a single TRAPI query against a component.""" # wait for opening in semaphore before sending next query responses = {} pks = {} - async with semaphore: - # handle some outlier urls - if infores == "infores:ars": - url = base_url + "/ars/api/submit" - elif infores == "infores:sri-answer-appraiser": - url = base_url + "/get_appraisal" - elif infores == "infores:sri-node-normalizer": - url = base_url + "/get_normalized_nodes" - elif "annotator" in base_url: - url = base_url - pass - else: - url = base_url + "/query" - # send message - response = {} - status_code = 418 - async with httpx.AsyncClient(timeout=600) as client: - try: - res = await client.post(url, json=message) - status_code = res.status_code - res.raise_for_status() - response = res.json() - except Exception as e: - self.logger.error(f"Something went wrong: {e}") + # handle some outlier urls + if infores == "infores:ars": + url = base_url + "/ars/api/submit" + elif infores == "infores:sri-answer-appraiser": + url = base_url + "/get_appraisal" + elif infores == "infores:sri-node-normalizer": + url = base_url + "/get_normalized_nodes" + elif "annotator" in base_url: + url = base_url + pass + else: + url = base_url + "/query" + # send message + response = {} + status_code = 418 + with httpx.Client(timeout=600) as client: + try: + res = client.post(url, 
json=message) + status_code = res.status_code + res.raise_for_status() + response = res.json() + except Exception as e: + self.logger.error(f"Something went wrong: {e}") - if infores == "infores:ars": - # handle the ARS polling - parent_pk = response.get("pk", "") - ars_responses, pks = await self.get_ars_responses(parent_pk, base_url) - responses.update(ars_responses) - else: - single_infores = infores.split("infores:")[1] - # TODO: normalize this response - responses[single_infores] = { - "response": response, - "status_code": status_code, - } + if infores == "infores:ars": + # handle the ARS polling + parent_pk = response.get("pk", "") + ars_responses, pks = self.get_ars_responses(parent_pk, base_url) + responses.update(ars_responses) + else: + single_infores = infores.split("infores:")[1] + # TODO: normalize this response + responses[single_infores] = { + "response": response, + "status_code": status_code, + } return query_hash, responses, pks - async def get_ars_child_response( + def get_ars_child_response( self, child_pk: str, base_url: str, @@ -100,8 +99,8 @@ async def get_ars_child_response( # while we stay within the query max time while current_time - start_time <= MAX_ARA_TIME: # get query status of child query - async with httpx.AsyncClient(timeout=30) as client: - res = await client.get(f"{base_url}/ars/api/messages/{child_pk}") + with httpx.Client(timeout=30) as client: + res = client.get(f"{base_url}/ars/api/messages/{child_pk}") res.raise_for_status() response = res.json() status = response.get("fields", {}).get("status") @@ -113,7 +112,7 @@ async def get_ars_child_response( elif status == "Running": self.logger.info(f"{infores} is still Running...") current_time = time.time() - await asyncio.sleep(10) + time.sleep(10) else: self.logger.info(f"Got unhandled status: {status}") break @@ -151,7 +150,7 @@ async def get_ars_child_response( return infores, response - async def get_ars_responses( + def get_ars_responses( self, parent_pk: str, base_url: str ) -> Tuple[Dict[str, dict], Dict[str, str]]: """Given a parent pk, get responses for all ARS things.""" @@ -159,43 +158,36 @@ async def get_ars_responses( pks = { "parent_pk": parent_pk, } - async with httpx.AsyncClient(timeout=30) as client: + with httpx.Client(timeout=30) as client: # retain this response for testing - res = await client.post(f"{base_url}/ars/api/retain/{parent_pk}") + res = client.post(f"{base_url}/ars/api/retain/{parent_pk}") res.raise_for_status() # Get all children queries - res = await client.get(f"{base_url}/ars/api/messages/{parent_pk}?trace=y") + res = client.get(f"{base_url}/ars/api/messages/{parent_pk}?trace=y") res.raise_for_status() response = res.json() start_time = time.time() - child_tasks = [] + child_responses = [] for child in response.get("children", []): child_pk = child["message"] infores = child["actor"]["inforesid"].split("infores:")[1] # add child pk pks[infores] = child_pk - child_tasks.append( + child_responses.append( self.get_ars_child_response(child_pk, base_url, infores, start_time) ) - try: - child_responses = await asyncio.gather(*child_tasks) - - for child_response in child_responses: - infores, response = child_response - responses[infores] = response - except Exception as e: - self.logger.warning(f"Failed to get all child responses: {e}") + for child_response in child_responses: + infores, response = child_response + responses[infores] = response try: # After getting all individual ARA responses, get and save the merged version current_time = time.time() while current_time - 
start_time <= MAX_QUERY_TIME: - async with httpx.AsyncClient(timeout=30) as client: - res = await client.get( - f"{base_url}/ars/api/messages/{parent_pk}?trace=y" - ) + with httpx.Client(timeout=30) as client: + res = client.get(f"{base_url}/ars/api/messages/{parent_pk}?trace=y") res.raise_for_status() response = res.json() status = response.get("status") @@ -214,9 +206,7 @@ async def get_ars_responses( # add final ars pk pks["ars"] = merged_pk # get full merged pk - res = await client.get( - f"{base_url}/ars/api/messages/{merged_pk}" - ) + res = client.get(f"{base_url}/ars/api/messages/{merged_pk}") res.raise_for_status() merged_message = res.json() responses["ars"] = { @@ -232,7 +222,7 @@ async def get_ars_responses( else: self.logger.info("ARS merging not done, waiting...") current_time = time.time() - await asyncio.sleep(10) + time.sleep(10) else: self.logger.warning( f"ARS merging took greater than {MAX_QUERY_TIME / 60} minutes." @@ -252,14 +242,13 @@ async def get_ars_responses( return responses, pks - async def run_queries( + def run_queries( self, test_case: Union[TestCase, PathfinderTestCase], - concurrency: int = 1, # for performance testing ) -> Tuple[Dict[int, dict], Dict[str, str]]: """Run all queries specified in a Test Case.""" # normalize all the curies in a test case - normalized_curies = await normalize_curies(test_case, self.logger) + normalized_curies = normalize_curies(test_case, self.logger) # TODO: figure out the right way to handle input category wrt normalization queries: Dict[int, dict] = {} @@ -291,28 +280,24 @@ async def run_queries( for component in test_case.components: # component = "ara" # loop over all specified components, i.e. ars, ara, kp, utilities - semaphore = asyncio.Semaphore(concurrency) self.logger.info( f"Sending queries to {self.registry[env_map[test_case.test_env]][component]}" ) - tasks = [ - asyncio.create_task( - self.run_query( - query_hash, - semaphore, - query["query"], - service["url"], - service["infores"], - ) - ) - for service in self.registry[env_map[test_case.test_env]][component] - for query_hash, query in queries.items() - ] try: - all_responses = await asyncio.gather(*tasks, return_exceptions=True) - for query_hash, responses, pks in all_responses: - queries[query_hash]["responses"].update(responses) - queries[query_hash]["pks"].update(pks) + all_responses = [] + for service in self.registry[env_map[test_case.test_env]][component]: + for query_hash, query in queries.items(): + all_responses.append( + self.run_query( + query_hash, + query["query"], + service["url"], + service["infores"], + ) + ) + for query_hash, responses, pks in all_responses: + queries[query_hash]["responses"].update(responses) + queries[query_hash]["pks"].update(pks) except Exception as e: self.logger.error(f"Something went wrong with the queries: {e}") diff --git a/test_harness/runner/smart_api_registry.py b/test_harness/runner/smart_api_registry.py index ab6e31c..0a00c88 100644 --- a/test_harness/runner/smart_api_registry.py +++ b/test_harness/runner/smart_api_registry.py @@ -1,16 +1,16 @@ """KP registry.""" -import asyncio -from collections import defaultdict -import httpx import json import logging import re +from collections import defaultdict + +import httpx LOGGER = logging.getLogger(__name__) -async def retrieve_registry_from_smartapi( +def retrieve_registry_from_smartapi( target_trapi_version="1.6.0", ): """Returns a dict of smart api service endpoints defined with a dict like @@ -21,11 +21,9 @@ async def retrieve_registry_from_smartapi( "version": 
version, } """ - async with httpx.AsyncClient(timeout=30) as client: + with httpx.Client(timeout=30) as client: try: - response = await client.get( - "https://smart-api.info/api/query?limit=1000&q=TRAPI" - ) + response = client.get("https://smart-api.info/api/query?limit=1000&q=TRAPI") response.raise_for_status() except httpx.HTTPError as e: LOGGER.error("Failed to query smart api. Exiting...") @@ -122,5 +120,5 @@ async def retrieve_registry_from_smartapi( if __name__ == "__main__": - registry = asyncio.run(retrieve_registry_from_smartapi()) + registry = retrieve_registry_from_smartapi() print(json.dumps(registry)) diff --git a/test_harness/slacker.py b/test_harness/slacker.py index 968634b..d470c57 100644 --- a/test_harness/slacker.py +++ b/test_harness/slacker.py @@ -1,11 +1,12 @@ """Slack notification integration class.""" -import httpx import json import os -from slack_sdk import WebClient import tempfile +import httpx +from slack_sdk import WebClient + class Slacker: """Slack notification poster.""" @@ -18,7 +19,7 @@ def __init__(self, url=None, token=None, slack_channel=None): slack_token = token if token is not None else os.getenv("SLACK_TOKEN") self.client = WebClient(slack_token) - async def post_notification(self, messages=[]): + def post_notification(self, messages=[]): """Post a notification to Slack.""" # https://gist.github.com/mrjk/079b745c4a8a118df756b127d6499aa0 blocks = [] @@ -32,8 +33,8 @@ async def post_notification(self, messages=[]): }, } ) - async with httpx.AsyncClient() as client: - res = await client.post( + with httpx.Client() as client: + res = client.post( url=self.url, json={ "text": ", ".join(block["text"]["text"] for block in blocks), @@ -41,7 +42,7 @@ async def post_notification(self, messages=[]): }, ) - async def upload_test_results_file(self, filename, extension, results): + def upload_test_results_file(self, filename, extension, results): """Upload a results file to Slack.""" with tempfile.TemporaryDirectory() as td: tmp_path = os.path.join(td, f"{filename}.{extension}") diff --git a/test_harness/utils.py b/test_harness/utils.py index e78041a..e1e085c 100644 --- a/test_harness/utils.py +++ b/test_harness/utils.py @@ -1,14 +1,14 @@ """General utilities for the Test Harness.""" -import httpx import logging -from typing import Dict, Union, List, Tuple +from typing import Dict, List, Tuple, Union +import httpx from translator_testing_model.datamodel.pydanticmodel import ( - TestCase, + PathfinderTestAsset, PathfinderTestCase, TestAsset, - PathfinderTestAsset, + TestCase, ) NODE_NORM_URL = { @@ -19,7 +19,7 @@ } -async def normalize_curies( +def normalize_curies( test: Union[TestCase, PathfinderTestCase], logger: logging.Logger = logging.getLogger(__name__), ) -> Dict[str, Dict[str, Union[Dict[str, str], List[str]]]]: @@ -43,9 +43,9 @@ async def normalize_curies( curies.add(test.test_case_input_id) normalized_curies = {} - async with httpx.AsyncClient() as client: + with httpx.Client() as client: try: - response = await client.post( + response = client.post( node_norm + "/get_normalized_nodes", json={ "curies": list(curies), diff --git a/tests/helpers/example_tests.py b/tests/helpers/example_tests.py index 40eb295..297e8ea 100644 --- a/tests/helpers/example_tests.py +++ b/tests/helpers/example_tests.py @@ -2,7 +2,7 @@ from translator_testing_model.datamodel.pydanticmodel import TestSuite -example_test_cases = TestSuite.parse_obj( +example_test_cases = TestSuite.model_validate( { "id": "TestSuite_1", "name": None, @@ -110,7 +110,6 @@ "description": "imatinib 
to asthma", "tags": [], "test_runner_settings": ["pathfinder"], - "query_type": None, "test_assets": [ { "id": "PTFQ_1", @@ -136,31 +135,10 @@ {"ids": ["NCBIGene:4254"], "name": "SCF-1"}, {"ids": ["CL:0000097"], "name": "Mast Cell"}, ], - "association": None, - "qualifiers": None, "expected_output": "TopAnswer", - "test_issue": None, - "semantic_severity": None, - "in_v1": None, - "well_known": False, - "test_reference": None, - "test_metadata": { - "id": "1", - "name": None, - "description": None, - "tags": [], - "test_runner_settings": [], - "test_source": "SMURF", - "test_reference": None, - "test_objective": "AcceptanceTest", - "test_annotations": [], - }, } ], - "preconditions": [], - "trapi_template": None, "test_case_objective": "AcceptanceTest", - "test_case_source": None, "components": ["ars"], "test_env": "ci", }, diff --git a/tests/helpers/mocks.py b/tests/helpers/mocks.py index 1e4288a..920b557 100644 --- a/tests/helpers/mocks.py +++ b/tests/helpers/mocks.py @@ -1,4 +1,12 @@ +from typing import Dict +from translator_testing_model.datamodel.pydanticmodel import ( + PathfinderTestAsset, + PathfinderTestCase, + TestAsset, + TestCase, +) from test_harness.reporter import Reporter +from test_harness.result_collector import ResultCollector from test_harness.slacker import Slacker from test_harness.runner.query_runner import QueryRunner @@ -10,34 +18,34 @@ def __init__(self, base_url=None, refresh_token=None, logger=None): self.test_run_id = 1 pass - async def get_auth(self): + def get_auth(self): pass - async def create_test_run(self, test_env, suite_name): + def create_test_run(self, test_env, suite_name): return 1 - async def create_test(self, test, asset): + def create_test(self, test, asset): return 2 - async def upload_labels(self, test_id, labels): + def upload_labels(self, test_id, labels): pass - async def upload_logs(self, test_id, logs): + def upload_logs(self, test_id, logs): pass - async def upload_artifact_references(self, test_id, artifact_references): + def upload_artifact_references(self, test_id, artifact_references): pass - async def upload_screenshots(self, test_id, screenshot): + def upload_screenshots(self, test_id, screenshot): pass - async def upload_log(self, test_id, message): + def upload_log(self, test_id, message): pass - async def finish_test(self, test_id, result): + def finish_test(self, test_id, result): return result - async def finish_test_run(self): + def finish_test_run(self): pass @@ -45,16 +53,16 @@ class MockSlacker(Slacker): def __init__(self): pass - async def post_notification(self, messages=[]): + def post_notification(self, messages=[]): print(f"posting messages: {messages}") pass - async def upload_test_results_file(self, filename, extension, results): + def upload_test_results_file(self, filename, extension, results): pass class MockQueryRunner(QueryRunner): - async def retrieve_registry(self, trapi_version: str): + def retrieve_registry(self, trapi_version: str): self.registry = { "staging": { "ars": [ @@ -67,3 +75,28 @@ async def retrieve_registry(self, trapi_version: str): ], }, } + + +class MockResultCollector(ResultCollector): + def collect_acceptance_result( + self, + test: TestCase | PathfinderTestCase, + asset: TestAsset | PathfinderTestAsset, + report: dict, + parent_pk: str | None, + url: str, + ): + return super().collect_acceptance_result(test, asset, report, parent_pk, url) + + def collect_performance_result( + self, + test: TestCase | PathfinderTestCase, + asset: TestAsset | PathfinderTestAsset, + url: str, + host_url: str, 
+ results: Dict, + ): + return super().collect_performance_result(test, asset, url, host_url, results) + + def dump_result_summary(self): + return super().dump_result_summary() diff --git a/tests/test_main.py b/tests/test_main.py index d3e20b5..5554487 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -9,14 +9,13 @@ ) -@pytest.mark.asyncio -async def test_main(mocker): +def test_main(mocker): """Test the main function.""" # This article is awesome: https://nedbatchelder.com/blog/201908/why_your_mock_doesnt_work.html run_tests = mocker.patch("test_harness.main.run_tests", return_value={}) mocker.patch("test_harness.main.Slacker", return_value=MockSlacker()) mocker.patch("test_harness.main.Reporter", return_value=MockReporter()) - await main( + main( { "tests": example_test_cases, "suite": "testing", diff --git a/tests/test_run.py b/tests/test_run.py index 517eaa9..d5ad5e3 100644 --- a/tests/test_run.py +++ b/tests/test_run.py @@ -8,6 +8,7 @@ from .helpers.example_tests import example_test_cases from .helpers.mocks import ( MockReporter, + MockResultCollector, MockSlacker, MockQueryRunner, ) @@ -17,8 +18,7 @@ logger = setup_logger() -@pytest.mark.asyncio -async def test_run_tests(mocker, httpx_mock: HTTPXMock): +def test_run_tests(mocker, httpx_mock: HTTPXMock): """Test the run_tests function.""" # This article is awesome: https://nedbatchelder.com/blog/201908/why_your_mock_doesnt_work.html mocker.patch( @@ -44,16 +44,15 @@ async def test_run_tests(mocker, httpx_mock: HTTPXMock): "PR:000049994": None, }, ) - full_report = await run_tests( + run_tests( + tests=example_test_cases, reporter=MockReporter( base_url="http://test", ), - slacker=MockSlacker(), - tests=example_test_cases, + collector=MockResultCollector(logger), logger=logger, args={ "suite": "testing", "trapi_version": "1.6.0", }, ) - assert full_report["SKIPPED"] == 3
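
Reviewer note (not part of the patch): below is a minimal, self-contained sketch of how the new run_acceptance_pass_fail_analysis function introduced in this diff can be exercised. The TRAPI-style results are toy data made up for illustration; only the field names the function actually reads (node_bindings, sugeno, rank, analyses/score) and the example CURIEs (taken from the test asset in performance_test_runner.initialize) come from the diff, and the import assumes this branch is installed or on PYTHONPATH.

# sketch_acceptance_analysis.py -- illustrative only, not part of this diff
from test_harness.acceptance_test_runner import run_acceptance_pass_fail_analysis

# Two toy TRAPI results: the first binds the expected output CURIE and carries
# ARS-level "sugeno"/"rank" fields, the second is a non-matching answer.
results = [
    {
        "node_bindings": {
            "sn": [{"id": "MONDO:0011426"}],
            "on": [{"id": "PUBCHEM.COMPOUND:23925"}],
        },
        "analyses": [{"score": 0.91}],
        "sugeno": 0.95,
        "rank": 1,
    },
    {
        "node_bindings": {
            "sn": [{"id": "MONDO:0011426"}],
            "on": [{"id": "CHEBI:29103"}],
        },
        "analyses": [{"score": 0.40}],
        "sugeno": 0.42,
        "rank": 2,
    },
]

# The runner mutates the per-agent report dict in place and also returns it.
report = {"ars": {}}
run_acceptance_pass_fail_analysis(
    report,
    "ars",
    results,
    out_curie="PUBCHEM.COMPOUND:23925",
    expect_output="TopAnswer",
)
print(report["ars"]["status"])         # PASSED: the CURIE is within the top results
print(report["ars"]["actual_output"])  # {'ars_score': 0.95, 'ars_rank': 1}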