diff --git a/mypy/build.py b/mypy/build.py index 98caaaec2dcf9..e2ddb124bbeae 100644 --- a/mypy/build.py +++ b/mypy/build.py @@ -26,7 +26,8 @@ import sys import time import types -from collections.abc import Callable, Iterator, Mapping, Sequence, Set as AbstractSet +from collections.abc import Callable, Iterable, Iterator, Mapping, Sequence, Set as AbstractSet +from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor, wait from heapq import heappop, heappush from textwrap import dedent from typing import ( @@ -947,6 +948,63 @@ def dump_stats(self) -> None: for key, value in sorted(self.stats_summary().items()): print(f"{key + ':':24}{value}") + def parse_all(self, states: Iterable[State]) -> None: + """Parse multiple files in parallel (if possible) and compute dependencies. + + Note: this duplicates a bit of logic from State.parse_file(). This is done + as a micro-optimization to parallelize only those parts of the code that + can be parallelized efficiently. + """ + if self.options.native_parser: + futures = [] + parsed_states = set() + # TODO: we should probably use psutil instead. + # With psutil we can get a number of physical cores, while all stdlib + # functions include virtual cores (which is not optimal for performance). + available_threads = os.cpu_count() or 2 # conservative fallback + # For some reason there is no visible improvement with more than 8 threads. + # TODO: consider writing our own ThreadPool as an optimization. + with ThreadPoolExecutor(max_workers=min(available_threads, 8)) as executor: + for state in states: + state.needs_parse = False + if state.tree is not None: + # The file was already parsed. + continue + # New parser reads source from file directly, we do this only for + # the side effect of parsing inline mypy configurations. + state.get_source() + if state.id not in self.ast_cache: + futures.append(executor.submit(state.parse_file_inner, state.source or "")) + parsed_states.add(state) + else: + self.log(f"Using cached AST for {state.xpath} ({state.id})") + state.tree, state.early_errors = self.ast_cache[state.id] + for fut in wait(futures, return_when=FIRST_EXCEPTION).done: + # This will raise exceptions, if any. + fut.result() + + for state in states: + assert state.tree is not None + if state in parsed_states: + state.early_errors = list(self.errors.error_info_map.get(state.xpath, [])) + state.semantic_analysis_pass1() + self.ast_cache[state.id] = (state.tree, state.early_errors) + self.modules[state.id] = state.tree + state.check_blockers() + state.setup_errors() + else: + # Old parser cannot be parallelized. + for state in states: + state.parse_file() + + for state in states: + state.compute_dependencies() + if self.workers and state.tree: + # We don't need imports in coordinator process anymore, we parse only to + # compute dependencies. + state.tree.imports = [] + del self.ast_cache[state.id] + def use_fine_grained_cache(self) -> bool: return self.cache_enabled and self.options.use_fine_grained_cache @@ -2505,8 +2563,7 @@ def new_state( # we need to re-calculate dependencies. # NOTE: see comment below for why we skip this in fine-grained mode. if exist_added_packages(suppressed, manager): - state.parse_file() # This is safe because the cache is anyway stale. - state.compute_dependencies() + state.needs_parse = True # This is safe because the cache is anyway stale. # This is an inverse to the situation above. If we had an import like this: # from pkg import mod # and then mod was deleted, we need to force recompute dependencies, to @@ -2515,8 +2572,7 @@ def new_state( # import pkg # import pkg.mod if exist_removed_submodules(dependencies, manager): - state.parse_file() # Same as above, the current state is stale anyway. - state.compute_dependencies() + state.needs_parse = True # Same as above, the current state is stale anyway. state.size_hint = meta.size else: # When doing a fine-grained cache load, pretend we only @@ -2526,14 +2582,17 @@ def new_state( manager.log(f"Deferring module to fine-grained update {path} ({id})") raise ModuleNotFound - # Parse the file (and then some) to get the dependencies. - state.parse_file(temporary=temporary) - state.compute_dependencies() - if manager.workers and state.tree: - # We don't need imports in coordinator process anymore, we parse only to - # compute dependencies. - state.tree.imports = [] - del manager.ast_cache[id] + if temporary: + # Eagerly parse temporary states, they are needed rarely. + state.parse_file(temporary=True) + state.compute_dependencies() + if state.manager.workers and state.tree: + # We don't need imports in coordinator process anymore, we parse only to + # compute dependencies. + state.tree.imports = [] + del state.manager.ast_cache[state.id] + else: + state.needs_parse = True return state @@ -2596,6 +2655,8 @@ def __init__( # Pre-computed opaque value of suppressed_deps_opts() used # to minimize amount of data sent to parallel workers. self.known_suppressed_deps_opts: bytes | None = None + # An internal flag used by build manager to schedule states for parsing. + self.needs_parse = False def write(self, buf: WriteBuffer) -> None: """Serialize State for sending to build worker. @@ -2819,26 +2880,9 @@ def fix_cross_refs(self) -> None: # Methods for processing modules from source code. - def parse_file(self, *, temporary: bool = False, raw_data: FileRawData | None = None) -> None: - """Parse file and run first pass of semantic analysis. - - Everything done here is local to the file. Don't depend on imported - modules in any way. Also record module dependencies based on imports. - """ - if self.tree is not None: - # The file was already parsed (in __init__()). - return - + def get_source(self) -> str: + """Get module source and parse inline mypy configurations.""" manager = self.manager - - # Can we reuse a previously parsed AST? This avoids redundant work in daemon. - cached = self.id in manager.ast_cache - modules = manager.modules - if not cached: - manager.log(f"Parsing {self.xpath} ({self.id})") - else: - manager.log(f"Using cached AST for {self.xpath} ({self.id})") - t0 = time_ref() with self.wrap_context(): @@ -2880,33 +2924,53 @@ def parse_file(self, *, temporary: bool = False, raw_data: FileRawData | None = self.check_for_invalid_options() self.size_hint = len(source) - if not cached: - ignore_errors = self.ignore_all or self.options.ignore_errors - self.tree = manager.parse_file( - self.id, - self.xpath, - source, - ignore_errors=ignore_errors, - options=self.options, - raw_data=raw_data, - ) - else: - # Reuse a cached AST - self.tree = manager.ast_cache[self.id][0] + self.time_spent_us += time_spent_us(t0) + return source + def parse_file_inner(self, source: str, raw_data: FileRawData | None = None) -> None: + t0 = time_ref() + self.manager.log(f"Parsing {self.xpath} ({self.id})") + with self.wrap_context(): + ignore_errors = self.ignore_all or self.options.ignore_errors + self.tree = self.manager.parse_file( + self.id, + self.xpath, + source, + ignore_errors=ignore_errors, + options=self.options, + raw_data=raw_data, + ) self.time_spent_us += time_spent_us(t0) - if not cached: + def parse_file(self, *, temporary: bool = False, raw_data: FileRawData | None = None) -> None: + """Parse file and run first pass of semantic analysis. + + Everything done here is local to the file. Don't depend on imported + modules in any way. Logic here should be kept in sync with BuildManager.parse_all(). + """ + self.needs_parse = False + if self.tree is not None: + # The file was already parsed. + return + + source = self.get_source() + manager = self.manager + # Can we reuse a previously parsed AST? This avoids redundant work in daemon. + if self.id not in manager.ast_cache: + self.parse_file_inner(source, raw_data) # Make a copy of any errors produced during parse time so that # fine-grained mode can repeat them when the module is # reprocessed. self.early_errors = list(manager.errors.error_info_map.get(self.xpath, [])) self.semantic_analysis_pass1() else: - self.early_errors = manager.ast_cache[self.id][1] + # Reuse a cached AST + manager.log(f"Using cached AST for {self.xpath} ({self.id})") + self.tree, self.early_errors = manager.ast_cache[self.id] + assert self.tree is not None if not temporary: - modules[self.id] = self.tree + manager.modules[self.id] = self.tree self.check_blockers() manager.ast_cache[self.id] = (self.tree, self.early_errors) @@ -3912,6 +3976,7 @@ def load_graph( graph[st.id] = st new.append(st) entry_points.add(bs.module) + manager.parse_all([state for state in new if state.needs_parse]) # Note: Running this each time could be slow in the daemon. If it's a problem, we # can do more work to maintain this incrementally. @@ -3919,7 +3984,15 @@ def load_graph( # Collect dependencies. We go breadth-first. # More nodes might get added to new as we go, but that's fine. + ready = set(new) + not_ready: set[State] = set() for st in new: + if st not in ready: + # We have run out of states, parse all we have. + assert st in not_ready + manager.parse_all(not_ready) + ready |= not_ready + not_ready.clear() assert st.ancestors is not None # Strip out indirect dependencies. These will be dealt with # when they show up as direct dependencies, and there's a @@ -3975,6 +4048,7 @@ def load_graph( newst_path = newst.abspath if newst_path in seen_files: + manager.errors.set_file(newst.xpath, newst.id, manager.options) manager.error( None, "Source file found twice under different module names: " @@ -3995,6 +4069,10 @@ def load_graph( assert newst.id not in graph, newst.id graph[newst.id] = newst new.append(newst) + if newst.needs_parse: + not_ready.add(newst) + else: + ready.add(newst) # There are two things we need to do after the initial load loop. One is up-suppress # modules that are back in graph. We need to do this after the loop to cover edge cases # like where a namespace package ancestor is shared by a typed and an untyped package. diff --git a/mypy/metastore.py b/mypy/metastore.py index 64839bf8a79c0..58a63d0a9ca42 100644 --- a/mypy/metastore.py +++ b/mypy/metastore.py @@ -157,7 +157,7 @@ def close(self) -> None: def connect_db(db_file: str, sync_off: bool = False) -> sqlite3.Connection: import sqlite3.dbapi2 - db = sqlite3.dbapi2.connect(db_file) + db = sqlite3.dbapi2.connect(db_file, check_same_thread=False) if sync_off: # This is a bit unfortunate (as we may get corrupt cache after e.g. Ctrl + C), # but without this flag, commits are *very* slow, especially when using HDDs, diff --git a/mypy/nativeparse.py b/mypy/nativeparse.py index 3ff184155bac2..5b73d76fde36c 100644 --- a/mypy/nativeparse.py +++ b/mypy/nativeparse.py @@ -20,6 +20,7 @@ from __future__ import annotations import os +import time from typing import Any, Final, cast import ast_serialize # type: ignore[import-untyped, import-not-found, unused-ignore] @@ -273,6 +274,10 @@ def read_statements(state: State, data: ReadBuffer, n: int) -> list[Statement]: def parse_to_binary_ast( filename: str, options: Options, skip_function_bodies: bool = False ) -> tuple[bytes, list[dict[str, Any]], TypeIgnores, bytes, bool, bool]: + # This is a horrible hack to work around a mypyc bug where imported + # module may be not ready in a thread sometimes. + while ast_serialize is None: + time.sleep(0.0001) # type: ignore[unreachable] ast_bytes, errors, ignores, import_bytes, ast_data = ast_serialize.parse( filename, skip_function_bodies=skip_function_bodies, diff --git a/mypy/test/testgraph.py b/mypy/test/testgraph.py index 491fcf427e65d..ae5cb2111da4d 100644 --- a/mypy/test/testgraph.py +++ b/mypy/test/testgraph.py @@ -117,6 +117,7 @@ def test_sorted_components(self) -> None: "c": State.new_state("c", None, "import b, d", manager), "builtins": State.new_state("builtins", None, "", manager), } + manager.parse_all(graph.values()) res = [scc.mod_ids for scc in sorted_components(graph)] assert_equal(res, [{"builtins"}, {"d"}, {"c", "b"}, {"a"}]) @@ -129,6 +130,7 @@ def test_order_ascc(self) -> None: "c": State.new_state("c", None, "import b, d", manager), "builtins": State.new_state("builtins", None, "", manager), } + manager.parse_all(graph.values()) res = [scc.mod_ids for scc in sorted_components(graph)] assert_equal(res, [{"builtins"}, {"a", "d", "c", "b"}]) ascc = res[1]