Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 126 additions & 48 deletions mypy/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
import sys
import time
import types
from collections.abc import Callable, Iterator, Mapping, Sequence, Set as AbstractSet
from collections.abc import Callable, Iterable, Iterator, Mapping, Sequence, Set as AbstractSet
from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor, wait
from heapq import heappop, heappush
from textwrap import dedent
from typing import (
Expand Down Expand Up @@ -947,6 +948,63 @@ def dump_stats(self) -> None:
for key, value in sorted(self.stats_summary().items()):
print(f"{key + ':':24}{value}")

def parse_all(self, states: Iterable[State]) -> None:
    """Parse multiple files in parallel (if possible) and compute dependencies.

    Note: this duplicates a bit of logic from State.parse_file(). This is done
    as a micro-optimization to parallelize only those parts of the code that
    can be parallelized efficiently.
    """
    # The states are traversed several times below (submit loop, post-parse
    # loop, dependency loop), so materialize them once: callers may hand us
    # a one-shot iterator, which would otherwise be exhausted after the
    # first pass and silently skip the remaining work.
    states = list(states)
    if self.options.native_parser:
        futures = []
        parsed_states = set()
        # TODO: we should probably use psutil instead.
        # With psutil we can get a number of physical cores, while all stdlib
        # functions include virtual cores (which is not optimal for performance).
        available_threads = os.cpu_count() or 2  # conservative fallback
        # For some reason there is no visible improvement with more than 8 threads.
        # TODO: consider writing our own ThreadPool as an optimization.
        with ThreadPoolExecutor(max_workers=min(available_threads, 8)) as executor:
            for state in states:
                state.needs_parse = False
                if state.tree is not None:
                    # The file was already parsed.
                    continue
                # New parser reads source from file directly, we do this only for
                # the side effect of parsing inline mypy configurations.
                state.get_source()
                if state.id not in self.ast_cache:
                    futures.append(executor.submit(state.parse_file_inner, state.source or ""))
                    parsed_states.add(state)
                else:
                    self.log(f"Using cached AST for {state.xpath} ({state.id})")
                    state.tree, state.early_errors = self.ast_cache[state.id]
            for fut in wait(futures, return_when=FIRST_EXCEPTION).done:
                # This will raise exceptions, if any.
                fut.result()

        for state in states:
            assert state.tree is not None
            if state in parsed_states:
                # Freshly parsed in a worker thread: record parse-time errors
                # and run the first semantic-analysis pass serially (only the
                # parsing itself is threaded).
                state.early_errors = list(self.errors.error_info_map.get(state.xpath, []))
                state.semantic_analysis_pass1()
                self.ast_cache[state.id] = (state.tree, state.early_errors)
            self.modules[state.id] = state.tree
            state.check_blockers()
            state.setup_errors()
    else:
        # Old parser cannot be parallelized.
        for state in states:
            state.parse_file()

    for state in states:
        state.compute_dependencies()
        if self.workers and state.tree:
            # We don't need imports in coordinator process anymore, we parse only to
            # compute dependencies.
            state.tree.imports = []
            del self.ast_cache[state.id]

def use_fine_grained_cache(self) -> bool:
    """Report whether the fine-grained dependency cache should be used.

    True only when caching is enabled at all and the fine-grained cache
    was explicitly requested via the options.
    """
    if not self.cache_enabled:
        return self.cache_enabled
    return self.options.use_fine_grained_cache

Expand Down Expand Up @@ -2505,8 +2563,7 @@ def new_state(
# we need to re-calculate dependencies.
# NOTE: see comment below for why we skip this in fine-grained mode.
if exist_added_packages(suppressed, manager):
state.parse_file() # This is safe because the cache is anyway stale.
state.compute_dependencies()
state.needs_parse = True # This is safe because the cache is anyway stale.
# This is an inverse to the situation above. If we had an import like this:
# from pkg import mod
# and then mod was deleted, we need to force recompute dependencies, to
Expand All @@ -2515,8 +2572,7 @@ def new_state(
# import pkg
# import pkg.mod
if exist_removed_submodules(dependencies, manager):
state.parse_file() # Same as above, the current state is stale anyway.
state.compute_dependencies()
state.needs_parse = True # Same as above, the current state is stale anyway.
state.size_hint = meta.size
else:
# When doing a fine-grained cache load, pretend we only
Expand All @@ -2526,14 +2582,17 @@ def new_state(
manager.log(f"Deferring module to fine-grained update {path} ({id})")
raise ModuleNotFound

# Parse the file (and then some) to get the dependencies.
state.parse_file(temporary=temporary)
state.compute_dependencies()
if manager.workers and state.tree:
# We don't need imports in coordinator process anymore, we parse only to
# compute dependencies.
state.tree.imports = []
del manager.ast_cache[id]
if temporary:
# Eagerly parse temporary states, they are needed rarely.
state.parse_file(temporary=True)
state.compute_dependencies()
if state.manager.workers and state.tree:
# We don't need imports in coordinator process anymore, we parse only to
# compute dependencies.
state.tree.imports = []
del state.manager.ast_cache[state.id]
else:
state.needs_parse = True

return state

Expand Down Expand Up @@ -2596,6 +2655,8 @@ def __init__(
# Pre-computed opaque value of suppressed_deps_opts() used
# to minimize amount of data sent to parallel workers.
self.known_suppressed_deps_opts: bytes | None = None
# An internal flag used by build manager to schedule states for parsing.
self.needs_parse = False

def write(self, buf: WriteBuffer) -> None:
"""Serialize State for sending to build worker.
Expand Down Expand Up @@ -2819,26 +2880,9 @@ def fix_cross_refs(self) -> None:

# Methods for processing modules from source code.

def parse_file(self, *, temporary: bool = False, raw_data: FileRawData | None = None) -> None:
"""Parse file and run first pass of semantic analysis.

Everything done here is local to the file. Don't depend on imported
modules in any way. Also record module dependencies based on imports.
"""
if self.tree is not None:
# The file was already parsed (in __init__()).
return

def get_source(self) -> str:
"""Get module source and parse inline mypy configurations."""
manager = self.manager

# Can we reuse a previously parsed AST? This avoids redundant work in daemon.
cached = self.id in manager.ast_cache
modules = manager.modules
if not cached:
manager.log(f"Parsing {self.xpath} ({self.id})")
else:
manager.log(f"Using cached AST for {self.xpath} ({self.id})")

t0 = time_ref()

with self.wrap_context():
Expand Down Expand Up @@ -2880,33 +2924,53 @@ def parse_file(self, *, temporary: bool = False, raw_data: FileRawData | None =
self.check_for_invalid_options()

self.size_hint = len(source)
if not cached:
ignore_errors = self.ignore_all or self.options.ignore_errors
self.tree = manager.parse_file(
self.id,
self.xpath,
source,
ignore_errors=ignore_errors,
options=self.options,
raw_data=raw_data,
)
else:
# Reuse a cached AST
self.tree = manager.ast_cache[self.id][0]
self.time_spent_us += time_spent_us(t0)
return source

def parse_file_inner(self, source: str, raw_data: FileRawData | None = None) -> None:
    """Parse ``source`` and store the resulting AST in ``self.tree``.

    This is only the parse step proper: no AST-cache lookup and no semantic
    analysis happens here, which is what allows BuildManager.parse_all() to
    submit this method to its thread pool.

    Args:
        source: the module source text to parse.
        raw_data: optional pre-read raw file data, passed through unchanged
            to manager.parse_file().
    """
    t0 = time_ref()
    self.manager.log(f"Parsing {self.xpath} ({self.id})")
    # NOTE(review): wrap_context() presumably converts exceptions raised during
    # parsing into reported errors -- confirm against its definition.
    with self.wrap_context():
        # Errors are suppressed either for the whole module (ignore_all) or
        # via the per-module ignore_errors option.
        ignore_errors = self.ignore_all or self.options.ignore_errors
        self.tree = self.manager.parse_file(
            self.id,
            self.xpath,
            source,
            ignore_errors=ignore_errors,
            options=self.options,
            raw_data=raw_data,
        )
    # Attribute the elapsed wall-clock time of the parse to this state.
    self.time_spent_us += time_spent_us(t0)

if not cached:
def parse_file(self, *, temporary: bool = False, raw_data: FileRawData | None = None) -> None:
"""Parse file and run first pass of semantic analysis.

Everything done here is local to the file. Don't depend on imported
modules in any way. Logic here should be kept in sync with BuildManager.parse_all().
"""
self.needs_parse = False
if self.tree is not None:
# The file was already parsed.
return

source = self.get_source()
manager = self.manager
# Can we reuse a previously parsed AST? This avoids redundant work in daemon.
if self.id not in manager.ast_cache:
self.parse_file_inner(source, raw_data)
# Make a copy of any errors produced during parse time so that
# fine-grained mode can repeat them when the module is
# reprocessed.
self.early_errors = list(manager.errors.error_info_map.get(self.xpath, []))
self.semantic_analysis_pass1()
else:
self.early_errors = manager.ast_cache[self.id][1]
# Reuse a cached AST
manager.log(f"Using cached AST for {self.xpath} ({self.id})")
self.tree, self.early_errors = manager.ast_cache[self.id]

assert self.tree is not None
if not temporary:
modules[self.id] = self.tree
manager.modules[self.id] = self.tree
self.check_blockers()

manager.ast_cache[self.id] = (self.tree, self.early_errors)
Expand Down Expand Up @@ -3912,14 +3976,23 @@ def load_graph(
graph[st.id] = st
new.append(st)
entry_points.add(bs.module)
manager.parse_all([state for state in new if state.needs_parse])

# Note: Running this each time could be slow in the daemon. If it's a problem, we
# can do more work to maintain this incrementally.
seen_files = {st.abspath: st for st in graph.values() if st.path}

# Collect dependencies. We go breadth-first.
# More nodes might get added to new as we go, but that's fine.
ready = set(new)
not_ready: set[State] = set()
for st in new:
if st not in ready:
# We have run out of states, parse all we have.
assert st in not_ready
manager.parse_all(not_ready)
ready |= not_ready
not_ready.clear()
assert st.ancestors is not None
# Strip out indirect dependencies. These will be dealt with
# when they show up as direct dependencies, and there's a
Expand Down Expand Up @@ -3975,6 +4048,7 @@ def load_graph(
newst_path = newst.abspath

if newst_path in seen_files:
manager.errors.set_file(newst.xpath, newst.id, manager.options)
manager.error(
None,
"Source file found twice under different module names: "
Expand All @@ -3995,6 +4069,10 @@ def load_graph(
assert newst.id not in graph, newst.id
graph[newst.id] = newst
new.append(newst)
if newst.needs_parse:
not_ready.add(newst)
else:
ready.add(newst)
# There are two things we need to do after the initial load loop. One is up-suppress
# modules that are back in graph. We need to do this after the loop to cover edge cases
# like where a namespace package ancestor is shared by a typed and an untyped package.
Expand Down
2 changes: 1 addition & 1 deletion mypy/metastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ def close(self) -> None:
def connect_db(db_file: str, sync_off: bool = False) -> sqlite3.Connection:
import sqlite3.dbapi2

db = sqlite3.dbapi2.connect(db_file)
db = sqlite3.dbapi2.connect(db_file, check_same_thread=False)
if sync_off:
# This is a bit unfortunate (as we may get corrupt cache after e.g. Ctrl + C),
# but without this flag, commits are *very* slow, especially when using HDDs,
Expand Down
5 changes: 5 additions & 0 deletions mypy/nativeparse.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from __future__ import annotations

import os
import time
from typing import Any, Final, cast

import ast_serialize # type: ignore[import-untyped, import-not-found, unused-ignore]
Expand Down Expand Up @@ -273,6 +274,10 @@ def read_statements(state: State, data: ReadBuffer, n: int) -> list[Statement]:
def parse_to_binary_ast(
filename: str, options: Options, skip_function_bodies: bool = False
) -> tuple[bytes, list[dict[str, Any]], TypeIgnores, bytes, bool, bool]:
# This is a horrible hack to work around a mypyc bug where imported
# module may be not ready in a thread sometimes.
while ast_serialize is None:
time.sleep(0.0001) # type: ignore[unreachable]
ast_bytes, errors, ignores, import_bytes, ast_data = ast_serialize.parse(
filename,
skip_function_bodies=skip_function_bodies,
Expand Down
2 changes: 2 additions & 0 deletions mypy/test/testgraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ def test_sorted_components(self) -> None:
"c": State.new_state("c", None, "import b, d", manager),
"builtins": State.new_state("builtins", None, "", manager),
}
manager.parse_all(graph.values())
res = [scc.mod_ids for scc in sorted_components(graph)]
assert_equal(res, [{"builtins"}, {"d"}, {"c", "b"}, {"a"}])

Expand All @@ -129,6 +130,7 @@ def test_order_ascc(self) -> None:
"c": State.new_state("c", None, "import b, d", manager),
"builtins": State.new_state("builtins", None, "", manager),
}
manager.parse_all(graph.values())
res = [scc.mod_ids for scc in sorted_components(graph)]
assert_equal(res, [{"builtins"}, {"a", "d", "c", "b"}])
ascc = res[1]
Expand Down
Loading