From 1ba2a7eeb2f825d7472bad4ecb019beff7ad2960 Mon Sep 17 00:00:00 2001
From: A_A <21040751+Otto-AA@users.noreply.github.com>
Date: Sun, 29 Jun 2025 14:07:16 +0200
Subject: [PATCH 1/3] Refactor to support starting processes with 'spawn'

This should allow running mutmut on windows.
Timeouts are disabled while this is WIP.
---
 e2e_projects/config/pyproject.toml |   2 +-
 e2e_projects/my_lib/pyproject.toml |   2 +-
 mutmut/__main__.py                 | 128 ++++++++++++++++++-----------
 3 files changed, 81 insertions(+), 51 deletions(-)

diff --git a/e2e_projects/config/pyproject.toml b/e2e_projects/config/pyproject.toml
index 1a3220c8..4eab40b9 100644
--- a/e2e_projects/config/pyproject.toml
+++ b/e2e_projects/config/pyproject.toml
@@ -23,7 +23,7 @@ dev = [
 ]
 
 [tool.mutmut]
-debug = true
+debug = false
 paths_to_mutate = [ "config_pkg/" ]
 do_not_mutate = [ "*ignore*" ]
 also_copy = [ "data" ]
diff --git a/e2e_projects/my_lib/pyproject.toml b/e2e_projects/my_lib/pyproject.toml
index 1f1ce736..62693630 100644
--- a/e2e_projects/my_lib/pyproject.toml
+++ b/e2e_projects/my_lib/pyproject.toml
@@ -17,4 +17,4 @@ dev = [
 ]
 
 [tool.mutmut]
-debug = true
\ No newline at end of file
+debug = false
\ No newline at end of file
diff --git a/mutmut/__main__.py b/mutmut/__main__.py
index 24646d7a..9f2b4627 100644
--- a/mutmut/__main__.py
+++ b/mutmut/__main__.py
@@ -4,7 +4,9 @@
 import inspect
 import itertools
 import json
-from multiprocessing import Pool, set_start_method
+from multiprocessing import Pool, Process, set_start_method
+import multiprocessing
+import multiprocessing.connection
 import os
 import resource
 import shutil
@@ -68,7 +70,7 @@ status_by_exit_code = {
     1: 'killed',
     3: 'killed',  # internal error in pytest means a kill
-    -24: 'killed',
+    -24: 'timeout',  # SIGXCPU (via timeout handler thread)
     0: 'survived',
     5: 'no tests',
     2: 'check was interrupted by user',
@@ -187,12 +189,16 @@ def copy_src_dir():
 
 
 def create_mutants(max_children: int):
-    with Pool(processes=max_children) as p:
+    with Pool(processes=max_children, initializer=_setup_globals, initargs=_get_global_args()) as p:
         p.map(create_file_mutants, walk_source_files())
 
 
+def _get_global_args():
+    return (mutmut.config, )
+
+def _setup_globals(config):
+    mutmut.config = config
 
 def create_file_mutants(path: Path):
-    print(path)
     output_path = Path('mutants') / path
     makedirs(output_path.parent, exist_ok=True)
@@ -868,9 +874,10 @@ def inner_timout_checker():
             now = datetime.now()
 
             for m, mutant_name, result in mutants:
+                # TODO: this is not multiprocessing safe
                 for pid, start_time in m.start_time_by_pid.items():
                     run_time = now - start_time
-                    if run_time.total_seconds() > (m.estimated_time_of_tests_by_mutant[mutant_name] + 1) * 4:
+                    if run_time.total_seconds() > 100 + (m.estimated_time_of_tests_by_mutant[mutant_name] + 1) * 4:
                         try:
                             os.kill(pid, signal.SIGXCPU)
                         except ProcessLookupError:
@@ -882,8 +889,7 @@ def inner_timout_checker():
 @click.option('--max-children', type=int)
 @click.argument('mutant_names', required=False, nargs=-1)
 def run(mutant_names, *, max_children):
-    # used to copy the global mutmut.config to subprocesses
-    set_start_method('fork')
+    set_start_method('spawn')
     assert isinstance(mutant_names, (tuple, list)), mutant_names
     _run(mutant_names, max_children)
@@ -948,14 +954,26 @@ def _run(mutant_names: Union[tuple, list], max_children: Union[None, int]):
 
     runner.prepare_main_test_run()
 
-    def read_one_child_exit_status():
-        pid, wait_status = os.wait()
-        exit_code = os.waitstatus_to_exitcode(wait_status)
-        if mutmut.config.debug:
-            print(' worker exit code', exit_code)
-        source_file_mutation_data_by_pid[pid].register_result(pid=pid, exit_code=exit_code)
+    running_processes: set[Process] = set()
+
+    def handle_finished_processes() -> int:
+        nonlocal running_processes
+        sentinels = [p.sentinel for p in running_processes]
+        multiprocessing.connection.wait(sentinels)
+
+        finished_processes = {p for p in running_processes if not p.is_alive()}
+        running_processes -= finished_processes
+
+        for p in finished_processes:
+            if mutmut.config.debug:
+                print(' worker exit code', p.exitcode)
+            source_file_mutation_data_by_pid[p.pid].register_result(pid=p.pid, exit_code=p.exitcode)
 
-    source_file_mutation_data_by_pid: Dict[int, SourceFileMutationData] = {}  # many pids map to one MutationData
+            p.close()
+
+        return len(finished_processes)
+
+    source_file_mutation_data_by_pid: dict[int, SourceFileMutationData] = {}  # many pids map to one MutationData
     running_children = 0
     count_tried = 0
@@ -975,7 +993,8 @@ def read_one_child_exit_status():
         estimated_time_of_tests = sum(mutmut.duration_by_test[test_name] for test_name in tests)
         m.estimated_time_of_tests_by_mutant[mutant_name] = estimated_time_of_tests
 
-    Thread(target=timeout_checker(mutants), daemon=True).start()
+    # TODO: implement timeout for windows + unix
+    # Thread(target=timeout_checker(mutants), daemon=True).start()
 
     # Now do mutation
     for m, mutant_name, result in mutants:
@@ -988,51 +1007,34 @@ def read_one_child_exit_status():
             continue
 
         tests = mutmut.tests_by_mangled_function_name.get(mangled_name_from_mutant_name(mutant_name), [])
+        # Run fast tests first
+        tests = sorted(tests, key=lambda test_name: mutmut.duration_by_test[test_name])
 
-        # print(tests)
         if not tests:
             m.exit_code_by_key[mutant_name] = 33
             m.save()
             continue
 
-        pid = os.fork()
-        if not pid:
-            # In the child
-            os.environ['MUTANT_UNDER_TEST'] = mutant_name
-            setproctitle(f'mutmut: {mutant_name}')
-
-            # Run fast tests first
-            tests = sorted(tests, key=lambda test_name: mutmut.duration_by_test[test_name])
-            if not tests:
-                os._exit(33)
-
-            estimated_time_of_tests = m.estimated_time_of_tests_by_mutant[mutant_name]
-            cpu_time_limit = ceil((estimated_time_of_tests + 1) * 2 + process_time()) * 10
-            resource.setrlimit(resource.RLIMIT_CPU, (cpu_time_limit, cpu_time_limit))
-
-            with CatchOutput():
-                result = runner.run_tests(mutant_name=mutant_name, tests=tests)
-
-            if result != 0:
-                # TODO: write failure information to stdout?
-                pass
-            os._exit(result)
-        else:
-            # in the parent
-            source_file_mutation_data_by_pid[pid] = m
-            m.register_pid(pid=pid, key=mutant_name, estimated_time_of_tests=estimated_time_of_tests)
-            running_children += 1
+        p = Process(target=_test_mutation, args=(runner, m, mutant_name, tests, mutmut.config))
+        running_processes.add(p)
+        p.start()
+        pid = p.pid
+        # in the parent
+        source_file_mutation_data_by_pid[pid] = m
+        m.register_pid(pid=pid, key=mutant_name, estimated_time_of_tests=estimated_time_of_tests)
+        running_children += 1
 
         if running_children >= max_children:
-            read_one_child_exit_status()
-            count_tried += 1
-            running_children -= 1
+            count_finished = handle_finished_processes()
+            count_tried += count_finished
+            running_children -= count_finished
 
     try:
         while running_children:
-            read_one_child_exit_status()
-            count_tried += 1
-            running_children -= 1
+            print_stats(source_file_mutation_data_by_path)
+            count_finished = handle_finished_processes()
+            count_tried += count_finished
+            running_children -= count_finished
     except ChildProcessError:
         pass
     except KeyboardInterrupt:
@@ -1060,6 +1062,34 @@ def read_one_child_exit_status():
     print()
 
 
+def _test_mutation(runner: TestRunner, m: SourceFileMutationData, mutant_name: str, tests, config):
+    try:
+        mutmut.config = config
+
+        with CatchOutput():
+            runner.list_all_tests()
+
+        os.environ['MUTANT_UNDER_TEST'] = mutant_name
+        setproctitle(f'mutmut: {mutant_name}')
+
+        if not tests:
+            result = 33
+        else:
+            # TODO: implement timeout for windows + unix
+            # estimated_time_of_tests = m.estimated_time_of_tests_by_mutant[mutant_name]
+            # cpu_time_limit = ceil((estimated_time_of_tests + 1) * 2 + process_time()) * 10
+            # resource.setrlimit(resource.RLIMIT_CPU, (cpu_time_limit, cpu_time_limit))
+
+            with CatchOutput():
+                result = runner.run_tests(mutant_name=mutant_name, tests=tests)
+        os._exit(result)
+    except Exception as e:
+        with open(f'error.{mutant_name}.log', 'w') as log:
+            log.write(str(e))
+        os._exit(-1)
+
+
+
 def tests_for_mutant_names(mutant_names):
     tests = set()
     for mutant_name in mutant_names:

From 8ed5a9c3f9db6db6339df5b2e252c0302c59934c Mon Sep 17 00:00:00 2001
From: A_A <21040751+Otto-AA@users.noreply.github.com>
Date: Thu, 3 Jul 2025 19:14:57 +0200
Subject: [PATCH 2/3] use process pool for testing mutations

---
 mutmut/__main__.py                     | 101 +++++++++----------
 mutmut/custom_process_pool.py          | 131 +++++++++++++++++++++++++
 mutmut/file_mutation.py                |   2 +
 tests/e2e/test_e2e_result_snapshots.py |   3 +
 tests/test_custom_pool.py              |  31 ++++++
 5 files changed, 218 insertions(+), 50 deletions(-)
 create mode 100644 mutmut/custom_process_pool.py
 create mode 100644 tests/test_custom_pool.py

diff --git a/mutmut/__main__.py b/mutmut/__main__.py
index 9f2b4627..4d356419 100644
--- a/mutmut/__main__.py
+++ b/mutmut/__main__.py
@@ -1,10 +1,13 @@
+import platform
+from mutmut.custom_process_pool import Task
+from mutmut.custom_process_pool import CustomProcessPool
 import ast
 import fnmatch
 import gc
 import inspect
 import itertools
 import json
-from multiprocessing import Pool, Process, set_start_method
+from multiprocessing import JoinableQueue, Pool, Process, Queue, set_start_method
 import multiprocessing
 import multiprocessing.connection
 import os
@@ -47,7 +50,9 @@
 )
 from typing import (
     Dict,
+    Generic,
     List,
+    TypeVar,
     Union,
 )
@@ -373,6 +378,8 @@ def new_tests(self):
         return self.ids - collected_test_names()
 
 
+_pytest_initialized = False
+
 class PytestRunner(TestRunner):
     # noinspection PyMethodMayBeStatic
     def execute_pytest(self, params: list[str], **kwargs):
@@ -386,6 +393,10 @@ def execute_pytest(self, params: list[str], **kwargs):
             print(' exit code', exit_code)
         if exit_code == 4:
             raise BadTestExecutionCommandsException(params)
+
+        global _pytest_initialized
+        _pytest_initialized = True
+
         return exit_code
 
     def run_stats(self, *, tests):
@@ -889,7 +900,10 @@ def inner_timout_checker():
 @click.option('--max-children', type=int)
 @click.argument('mutant_names', required=False, nargs=-1)
 def run(mutant_names, *, max_children):
-    set_start_method('spawn')
+    if platform.system() == 'Windows':
+        set_start_method('spawn')
+    else:
+        set_start_method('fork')
     assert isinstance(mutant_names, (tuple, list)), mutant_names
     _run(mutant_names, max_children)
@@ -956,23 +970,6 @@ def _run(mutant_names: Union[tuple, list], max_children: Union[None, int]):
 
     running_processes: set[Process] = set()
 
-    def handle_finished_processes() -> int:
-        nonlocal running_processes
-        sentinels = [p.sentinel for p in running_processes]
-        multiprocessing.connection.wait(sentinels)
-
-        finished_processes = {p for p in running_processes if not p.is_alive()}
-        running_processes -= finished_processes
-
-        for p in finished_processes:
-            if mutmut.config.debug:
-                print(' worker exit code', p.exitcode)
-            source_file_mutation_data_by_pid[p.pid].register_result(pid=p.pid, exit_code=p.exitcode)
-
-            p.close()
-
-        return len(finished_processes)
-
     source_file_mutation_data_by_pid: dict[int, SourceFileMutationData] = {}  # many pids map to one MutationData
     running_children = 0
     count_tried = 0
@@ -996,6 +993,8 @@ def handle_finished_processes() -> int:
     # TODO: implement timeout for windows + unix
     # Thread(target=timeout_checker(mutants), daemon=True).start()
 
+    args: list[tuple[TestRunner, SourceFileMutationData, str, list[str], Config]] = []
+
     # Now do mutation
     for m, mutant_name, result in mutants:
         print_stats(source_file_mutation_data_by_path)
@@ -1015,37 +1014,32 @@ def handle_finished_processes() -> int:
             m.save()
             continue
 
-        p = Process(target=_test_mutation, args=(runner, m, mutant_name, tests, mutmut.config))
-        running_processes.add(p)
-        p.start()
-        pid = p.pid
-        # in the parent
-        source_file_mutation_data_by_pid[pid] = m
-        m.register_pid(pid=pid, key=mutant_name, estimated_time_of_tests=estimated_time_of_tests)
-        running_children += 1
-
-        if running_children >= max_children:
-            count_finished = handle_finished_processes()
-            count_tried += count_finished
-            running_children -= count_finished
-
-    try:
-        while running_children:
-            print_stats(source_file_mutation_data_by_path)
-            count_finished = handle_finished_processes()
-            count_tried += count_finished
-            running_children -= count_finished
-    except ChildProcessError:
-        pass
+        args.append((runner, m, mutant_name, tests, mutmut.config))
+        source_file_mutation_data_by_pid[mutant_name] = m
+        m.register_pid(pid=mutant_name, key=mutant_name, estimated_time_of_tests=estimated_time_of_tests)
+
+    tasks: list[Task] = []
+    for arg in args:
+        tasks.append(Task(id=arg[2], args=arg, timeout_seconds=1000))
+    pool = CustomProcessPool(tasks, _test_mutation, max_children)
+    done = 0
+    for finished_task in pool.run():
+        done += 1
+        # print(f'Finished {done} tasks')
+        if finished_task.error:
+            print(finished_task)
+        # print(finished_task)
+        source_file_mutation_data_by_pid[finished_task.id].register_result(pid=finished_task.id, exit_code=finished_task.result)
+        print_stats(source_file_mutation_data_by_path)
     except KeyboardInterrupt:
+        pool.shutdown()
         print('Stopping...')
-        stop_all_children(mutants)
 
     t = datetime.now() - start
     print_stats(source_file_mutation_data_by_path, force_output=True)
     print()
-    print(f'{count_tried / t.total_seconds():.2f} mutations/second')
+    print(f'{len(tasks) / t.total_seconds():.2f} mutations/second')
 
     if mutant_names:
         print()
@@ -1062,12 +1056,18 @@ def handle_finished_processes() -> int:
     print()
 
 
-def _test_mutation(runner: TestRunner, m: SourceFileMutationData, mutant_name: str, tests, config):
+def _test_mutation(task: Task):
+    args: tuple[TestRunner, SourceFileMutationData, str, list[str], Config] = task.args
+    runner, m, mutant_name, tests, config = args
     try:
         mutmut.config = config
 
-        with CatchOutput():
-            runner.list_all_tests()
+        # ensure that we imported all files at least once per process
+        # before we set MUTANT_UNDER_TEST (so everything that runs at import
+        # time is not mutated)
+        if not _pytest_initialized:
+            with CatchOutput():
+                runner.list_all_tests()
 
         os.environ['MUTANT_UNDER_TEST'] = mutant_name
         setproctitle(f'mutmut: {mutant_name}')
@@ -1082,13 +1082,14 @@ def _test_mutation(runner: TestRunner, m: SourceFileMutationData, mutant_name: s
 
             with CatchOutput():
                 result = runner.run_tests(mutant_name=mutant_name, tests=tests)
-        os._exit(result)
+
+        return result
+        # os._exit(result)
     except Exception as e:
         with open(f'error.{mutant_name}.log', 'w') as log:
             log.write(str(e))
-        os._exit(-1)
-
-
+            log.flush()
+        return -24
 
 def tests_for_mutant_names(mutant_names):
     tests = set()
diff --git a/mutmut/custom_process_pool.py b/mutmut/custom_process_pool.py
new file mode 100644
index 00000000..fee43f44
--- /dev/null
+++ b/mutmut/custom_process_pool.py
@@ -0,0 +1,131 @@
+from __future__ import annotations
+
+from typing import Generic, Union, Any, Callable, Iterable, TypeVar
+from typing_extensions import ParamSpec
+from dataclasses import dataclass
+
+import multiprocessing.connection
+from multiprocessing import Queue, Process
+import queue
+import os
+
+
+TaskArgs = ParamSpec('TaskArgs')
+TaskResult = TypeVar('TaskResult')
+
+@dataclass
+class Task:
+    id: str
+    args: tuple[Any, ...]
+    # this timeout is real time, not process cpu time
+    timeout_seconds: int
+
+@dataclass
+class TaskError:
+    id: str
+    error: Exception
+
+@dataclass
+class FinishedTask(Generic[TaskResult]):
+    id: str
+    result: Union[TaskResult, None]
+    error: Union[Exception, None]
+
+class JobTimeoutException(Exception):
+    pass
+
+class CustomProcessPool(Generic[TaskArgs, TaskResult]):
+    def __init__(self, tasks: list[Task], job: Callable[TaskArgs, TaskResult], max_workers: int):
+        self._tasks = tasks
+        self._job = job
+        self._remaining_tasks_queue: Queue[Task] = Queue()
+        self._remaining_tasks_count = len(tasks)
+        self._results: Queue[FinishedTask[TaskResult]] = Queue()
+        self._max_workers = max_workers
+        self._workers: set[Process] = set()
+        self._killed_workers = 0
+        self._shutdown = False
+
+    def run(self) -> Iterable[FinishedTask]:
+        for task in self._tasks:
+            self._remaining_tasks_queue.put(task)
+
+        self._start_missing_workers()
+
+        while not self.done() and not self._shutdown:
+            self._remove_stopped_workers()
+            self._start_missing_workers()
+
+            yield from self._get_new_results(timeout=1)
+
+        self.shutdown()
+
+    def shutdown(self):
+        # TODO: is this a good way to shutdown processes?
+        for p in self._workers:
+            if p.is_alive():
+                p.kill()
+        for p in self._workers:
+            p.join()
+        self._remaining_tasks_queue.close()
+        self._results.close()
+        self._shutdown = True
+
+    def _start_missing_workers(self):
+        self._workers = {p for p in self._workers if p.is_alive()}
+
+        desired_workers = min(self._max_workers, self._remaining_tasks_count)
+        missing_workers = desired_workers - len(self._workers)
+
+        for _ in range(missing_workers):
+            self._start_worker()
+
+    def _remove_stopped_workers(self):
+        """Start a new worker for all stopped workers. We kill workers for timeouts."""
+        killed_workers = {p for p in self._workers if not p.is_alive()}
+        self._workers -= killed_workers
+
+        for worker in killed_workers:
+            print(f'Worker {worker.pid} stopped with exitcode {worker.exitcode}')
+
+    def _get_new_results(self, timeout: int) -> Iterable[FinishedTask]:
+        try:
+            result = self._results.get(timeout=timeout)
+            self._remaining_tasks_count -= 1
+            yield result
+        except queue.Empty:
+            pass
+
+    def _start_worker(self):
+        p = Process(target=CustomProcessPool._pool_job_executor, args=(self._job, self._remaining_tasks_queue, self._results))
+        p.start()
+        self._workers.add(p)
+
+    def done(self) -> bool:
+        return self._remaining_tasks_count == 0
+
+    @staticmethod
+    def _pool_job_executor(job: Callable[..., TaskResult], task_queue: Queue[Task], results: Queue[FinishedTask[TaskResult]]):
+        while True:
+            try:
+                task = task_queue.get(timeout=1)
+                # f = open(f'logs/log-{task.id}.txt', 'w')
+                # pid = os.getpid()
+            except queue.Empty:
+                os._exit(0)
+
+            try:
+                result = job(task)
+                finished_task: FinishedTask[TaskResult] = FinishedTask(id=task.id, result=result, error=None)
+            except Exception as e:
+                finished_task = FinishedTask(id=task.id, result=None, error=e)
+            finally:
+                # f.write(f'Finished job: {finished_task}\n')
+                # f.flush()
+                results.put(finished_task)
+                # f.write(f'Added job to queue\n')
+                # f.write(f'Finished qsize: {results.qsize()}\n')
+                # f.flush()
+
+
+
diff --git a/mutmut/file_mutation.py b/mutmut/file_mutation.py
index 7cc625fb..2540d01e 100644
--- a/mutmut/file_mutation.py
+++ b/mutmut/file_mutation.py
@@ -176,6 +176,8 @@ def combine_mutations_to_source(module: cst.Module, mutations: Sequence[Mutation
     :param mutations: Mutations that should be applied.
     :return: Mutated code and list of mutation names"""
+    # mutations = mutations[0:10]
+
     # copy start of the module (in particular __future__ imports)
     result: list[MODULE_STATEMENT] = get_statements_until_func_or_class(module.body)
     mutation_names: list[str] = []
diff --git a/tests/e2e/test_e2e_result_snapshots.py b/tests/e2e/test_e2e_result_snapshots.py
index 74cd4fa2..81018222 100644
--- a/tests/e2e/test_e2e_result_snapshots.py
+++ b/tests/e2e/test_e2e_result_snapshots.py
@@ -78,3 +78,6 @@ def test_my_lib_result_snapshot():
 def test_config_result_snapshot():
     mutmut._reset_globals()
     asserts_results_did_not_change("config")
+
+if __name__ == '__main__':
+    test_my_lib_result_snapshot()
\ No newline at end of file
diff --git a/tests/test_custom_pool.py b/tests/test_custom_pool.py
new file mode 100644
index 00000000..005a3b83
--- /dev/null
+++ b/tests/test_custom_pool.py
@@ -0,0 +1,31 @@
+from mutmut.__main__ import CustomProcessPool, Task
+import pytest
+import time
+
+def test_custom_process_pool():
+    tasks = [
+        Task(id='a-small', args=(1, 2), timeout_seconds=1000),
+        Task(id='b-medium', args=(30, 20), timeout_seconds=1000),
+        Task(id='c-neg', args=(-2, -2), timeout_seconds=1000),
+        Task(id='d-div-by-zero', args=(-2, 0), timeout_seconds=1000),
+    ]
+    pool = CustomProcessPool(tasks, _divide, max_workers=2)
+
+    results = []
+    for result in pool.run():
+        print(result)
+        results.append(result)
+
+    assert len(results) == 4
+
+    results = sorted(results, key=lambda result: result.id)
+    assert results[0].result == pytest.approx(0.5)
+    assert results[1].result == pytest.approx(1.5)
+    assert results[2].result == pytest.approx(1)
+    assert results[3].result == None
+    assert isinstance(results[3].error, ZeroDivisionError)
+
+def _divide(task: Task):
+    a, b = task.args
+    # time.sleep(timeout)
+    return a / b

From c7b38fb67765324954d3f1791b5f9da5203930bb Mon Sep 17 00:00:00 2001
From: A_A <21040751+Otto-AA@users.noreply.github.com>
Date: Sun, 6 Jul 2025 19:40:42 +0200
Subject: [PATCH 3/3] Run CI tests also on windows

---
 .github/workflows/tests.yml            |  6 +++---
 e2e_projects/config/tests/test_main.py |  2 +-
 mutmut/__main__.py                     | 27 ++++++++++++++------------
 tests/e2e/test_e2e_result_snapshots.py |  6 +++---
 4 files changed, 22 insertions(+), 19 deletions(-)

diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index cc295fdd..00db05bf 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -4,13 +4,13 @@ on: [push, pull_request]
 
 jobs:
   tests:
-    name: Test Python ${{ matrix.python-version }}
-    # Todo: Revert to ubuntu-latest when Python 3.7 support no longer needed
-    runs-on: ubuntu-22.04
+    name: Test ${{ matrix.os }} - ${{ matrix.python-version }}
+    runs-on: ${{ matrix.os }}
     strategy:
       fail-fast: false
       matrix:
         python-version: ["3.13", "3.12", "3.11", "3.10", "3.9"]
+        os: [ubuntu-latest, windows-latest]
     steps:
       - name: Checkout
         uses: actions/checkout@v3
diff --git a/e2e_projects/config/tests/test_main.py b/e2e_projects/config/tests/test_main.py
index ef066d0b..a900a389 100644
--- a/e2e_projects/config/tests/test_main.py
+++ b/e2e_projects/config/tests/test_main.py
@@ -21,6 +21,6 @@ def test_max_stack_depth():
 def test_data_exists():
     path = (Path("data") / "data.json").resolve()
     assert path.exists()
-    with open(path) as f:
+    with open(path, encoding='utf-8') as f:
         data = json.load(f)
         assert data['comment'] == 'this should be copied to the mutants folder'
\ No newline at end of file
diff --git a/mutmut/__main__.py b/mutmut/__main__.py
index 4d356419..55b6b8b7 100644
--- a/mutmut/__main__.py
+++ b/mutmut/__main__.py
@@ -11,7 +11,6 @@
 import multiprocessing
 import multiprocessing.connection
 import os
-import resource
 import shutil
 import signal
 import subprocess
@@ -230,14 +229,14 @@ def copy_also_copy_files():
 def create_mutants_for_file(filename, output_path):
     input_stat = os.stat(filename)
 
-    with open(filename) as f:
+    with open(filename, encoding='utf-8') as f:
         source = f.read()
 
-    with open(output_path, 'w') as out:
+    with open(output_path, 'w', encoding='utf-8') as out:
         mutant_names, hash_by_function_name = write_all_mutants_to_file(out=out, source=source, filename=filename)
 
     # validate no syntax errors of mutants
-    with open(output_path) as f:
+    with open(output_path, encoding='utf-8') as f:
         try:
             ast.parse(f.read())
         except (IndentationError, SyntaxError) as e:
@@ -282,7 +281,7 @@ def __init__(self, *, path):
 
     def load(self):
         try:
-            with open(self.meta_path) as f:
+            with open(self.meta_path, encoding='utf-8') as f:
                 self.meta = json.load(f)
         except FileNotFoundError:
             return
@@ -309,7 +308,7 @@ def stop_children(self):
             os.kill(pid, SIGTERM)
 
     def save(self):
-        with open(self.meta_path, 'w') as f:
+        with open(self.meta_path, 'w', encoding='utf-8') as f:
             json.dump(dict(
                 exit_code_by_key=self.exit_code_by_key,
                 hash_by_function_name=self.hash_by_function_name,
@@ -507,6 +506,9 @@ def status_printer():
     last_update = [datetime(1900, 1, 1)]
     update_threshold = timedelta(seconds=0.1)
 
+    # support the spinner chars on windows
+    sys.__stdout__.reconfigure(encoding='utf-8')
+
     def p(s, *, force_output=False):
         if not force_output and (datetime.now() - last_update[0]) < update_threshold:
             return
@@ -781,7 +783,7 @@ def collect_or_load_stats(runner):
 def load_stats():
     did_load = False
     try:
-        with open('mutants/mutmut-stats.json') as f:
+        with open('mutants/mutmut-stats.json', encoding='utf-8') as f:
             data = json.load(f)
             for k, v in data.pop('tests_by_mangled_function_name').items():
                 mutmut.tests_by_mangled_function_name[k] |= set(v)
@@ -795,7 +797,7 @@ def load_stats():
 
 
 def save_stats():
-    with open('mutants/mutmut-stats.json', 'w') as f:
+    with open('mutants/mutmut-stats.json', 'w', encoding='utf-8') as f:
         json.dump(dict(
             tests_by_mangled_function_name={k: list(v) for k, v in mutmut.tests_by_mangled_function_name.items()},
             duration_by_test=mutmut.duration_by_test,
@@ -1078,6 +1080,7 @@ def _test_mutation(task: Task):
             # TODO: implement timeout for windows + unix
             # estimated_time_of_tests = m.estimated_time_of_tests_by_mutant[mutant_name]
             # cpu_time_limit = ceil((estimated_time_of_tests + 1) * 2 + process_time()) * 10
+            # import resource
             # resource.setrlimit(resource.RLIMIT_CPU, (cpu_time_limit, cpu_time_limit))
 
             with CatchOutput():
                 result = runner.run_tests(mutant_name=mutant_name, tests=tests)
@@ -1086,7 +1089,7 @@ def _test_mutation(task: Task):
         return result
         # os._exit(result)
     except Exception as e:
-        with open(f'error.{mutant_name}.log', 'w') as log:
+        with open(f'error.{mutant_name}.log', 'w', encoding='utf-8') as log:
             log.write(str(e))
             log.flush()
         return -24
@@ -1120,12 +1123,12 @@ def results(all):
 
 
 def read_mutants_module(path) -> cst.Module:
-    with open(Path('mutants') / path) as f:
+    with open(Path('mutants') / path, encoding='utf-8') as f:
         return cst.parse_module(f.read())
 
 
 def read_orig_module(path) -> cst.Module:
-    with open(path) as f:
+    with open(path, encoding='utf-8') as f:
         return cst.parse_module(f.read())
@@ -1226,7 +1229,7 @@ def apply_mutant(mutant_name):
 
     new_module: cst.Module = orig_module.deep_replace(original_function, mutant_function)  # type: ignore
 
-    with open(path, 'w') as f:
+    with open(path, 'w', encoding='utf-8') as f:
         f.write(new_module.code)
diff --git a/tests/e2e/test_e2e_result_snapshots.py b/tests/e2e/test_e2e_result_snapshots.py
index 81018222..313a3fdc 100644
--- a/tests/e2e/test_e2e_result_snapshots.py
+++ b/tests/e2e/test_e2e_result_snapshots.py
@@ -29,18 +29,18 @@ def read_all_stats_for_project(project_path: Path) -> dict[str, dict]:
             continue
         data = SourceFileMutationData(path=p)
         data.load()
-        stats[str(data.meta_path)] = data.exit_code_by_key
+        stats[str(data.meta_path.as_posix())] = data.exit_code_by_key
 
     return stats
 
 
 def read_json_file(path: Path):
-    with open(path, 'r') as file:
+    with open(path, 'r', encoding='utf-8') as file:
         return json.load(file)
 
 
 def write_json_file(path: Path, data: Any):
-    with open(path, 'w') as file:
+    with open(path, 'w', encoding='utf-8') as file:
         json.dump(data, file, indent=2)