Send early errors to workers #20822
base: master
Changes from all commits
1dc730a
83f283e
f5efcf0
89f12ec
0278aae
a9ecc04
@@ -399,7 +399,7 @@ def default_flush_errors(
     finally:
         for worker in workers:
             try:
-                send(worker.conn, SccRequestMessage(scc_id=None))
+                send(worker.conn, SccRequestMessage(scc_id=None, import_errors={}))
            except (OSError, IPCException):
                pass
        for worker in workers:
@@ -437,6 +437,9 @@ def build_inner(
     source_set = BuildSourceSet(sources)
     cached_read = fscache.read
     errors = Errors(options, read_source=lambda path: read_py_file(path, cached_read))
+    # Record import errors so that they can be replayed by the workers.
+    if workers:
+        errors.global_watcher = True
     plugin, snapshot = load_plugins(options, errors, stdout, extra_plugins)

     # Validate error codes after plugins are loaded.
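Errors.global_watcher is not defined in this diff. As a rough sketch of the assumed semantics (while the flag is on, reported errors are also recorded per file path so they can later be shipped to workers), with MiniErrors and report() as hypothetical stand-ins:

class MiniErrors:
    """Illustrative stand-in for mypy's Errors; the watcher semantics here
    are inferred from how errors.recorded is consumed in submit_to_workers."""

    def __init__(self) -> None:
        self.global_watcher = False
        self.recorded: dict[str, list[str]] = {}

    def report(self, path: str, msg: str) -> None:
        if self.global_watcher:
            # Keep a copy keyed by file path so the coordinator can later
            # ship it to whichever worker checks the containing SCC.
            self.recorded.setdefault(path, []).append(msg)
        # ... normal error emission would continue here ...

errors = MiniErrors()
errors.global_watcher = True  # enabled only when workers exist
errors.report("pkg/mod.py", "Cannot find implementation or library stub")
assert errors.recorded == {"pkg/mod.py": ["Cannot find implementation or library stub"]}

The flag is switched off again at the end of load_graph() further down, so only errors produced while loading the graph (i.e. import resolution) get recorded.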
@@ -904,6 +907,10 @@ def __init__(
         self.import_options: dict[str, bytes] = {}
         # Cache for transitive dependency check (expensive).
         self.transitive_deps_cache: dict[tuple[int, int], bool] = {}
+        # Resolved paths for each module in build.
+        self.path_by_id: dict[str, str] = {}
+        # Packages for which we know presence or absence of __getattr__().
+        self.known_partial_packages: dict[str, bool] = {}

     def dump_stats(self) -> None:
         if self.options.dump_build_stats:
@@ -1045,8 +1052,6 @@ def parse_file(
         if self.errors.is_blockers():
             self.log("Bailing due to parse errors")
             self.errors.raise_error()
-
-        self.errors.set_file_ignored_lines(path, tree.ignored_lines, ignore_errors)
         return tree

     def load_fine_grained_deps(self, id: str) -> dict[str, set[str]]:
@@ -1118,7 +1123,15 @@ def submit_to_workers(self, sccs: list[SCC] | None = None) -> None:
         while self.scc_queue and self.free_workers:
             idx = self.free_workers.pop()
             _, _, scc = heappop(self.scc_queue)
-            send(self.workers[idx].conn, SccRequestMessage(scc_id=scc.id))
+            import_errors = {
+                mod_id: self.errors.recorded[path]
+                for mod_id in scc.mod_ids
+                if (path := self.path_by_id[mod_id]) in self.errors.recorded
+            }
+            send(
+                self.workers[idx].conn,
+                SccRequestMessage(scc_id=scc.id, import_errors=import_errors),
+            )

     def wait_for_done(
         self, graph: Graph
@@ -2399,8 +2412,10 @@ def new_state(
         state.compute_dependencies()
         if manager.workers:
             # We don't need parsed trees in coordinator process, we parse only to
-            # compute dependencies.
-            state.tree = None
+            # compute dependencies. Keep temporary tree until the caller uses it
+            if not temporary:
Collaborator
Should we only have the del statement behind the

Member (Author)
No, we need the tree until it is used by the caller who created temporary state to check for

Member (Author)
Actually, I decided to add a little cache for partial packages, otherwise it looks like a lot of potential overhead for a niche use case.
+                state.tree = None
+                del manager.modules[id]
+                del manager.ast_cache[id]
         return state
@@ -2533,7 +2548,8 @@ def read(cls, buf: ReadBuffer, manager: BuildManager) -> State:
             id=id,
             path=path,
             source=source,
-            options=manager.options.clone_for_module(id),
+            # The caller must call clone_for_module().
+            options=manager.options,
             ignore_all=ignore_all,
             caller_line=caller_line,
             import_context=import_context,
@@ -2721,7 +2737,7 @@ def parse_file(self, *, temporary: bool = False) -> None:
             assert ioerr.errno is not None
             raise CompileError(
                 [
-                    "mypy: can't read file '{}': {}".format(
+                    "mypy: error: cannot read file '{}': {}".format(
                        self.path.replace(os.getcwd() + os.sep, ""),
                        os.strerror(ioerr.errno),
                    )
@@ -2730,9 +2746,9 @@ def parse_file(self, *, temporary: bool = False) -> None:
             ) from ioerr
         except (UnicodeDecodeError, DecodeError) as decodeerr:
             if self.path.endswith(".pyd"):
-                err = f"mypy: stubgen does not support .pyd files: '{self.path}'"
+                err = f"{self.path}: error: stubgen does not support .pyd files"
             else:
-                err = f"mypy: can't decode file '{self.path}': {str(decodeerr)}"
+                err = f"{self.path}: error: cannot decode file: {str(decodeerr)}"
             raise CompileError([err], module_with_blocker=self.id) from decodeerr
         elif self.path and self.manager.fscache.isdir(self.path):
             source = ""
@@ -2746,22 +2762,13 @@ def parse_file(self, *, temporary: bool = False) -> None:

         self.size_hint = len(source)
         if not cached:
+            ignore_errors = self.ignore_all or self.options.ignore_errors
             self.tree = manager.parse_file(
-                self.id,
-                self.xpath,
-                source,
-                ignore_errors=self.ignore_all or self.options.ignore_errors,
-                options=self.options,
+                self.id, self.xpath, source, ignore_errors=ignore_errors, options=self.options
             )
         else:
             # Reuse a cached AST
             self.tree = manager.ast_cache[self.id][0]
-            manager.errors.set_file_ignored_lines(
-                self.xpath,
-                self.tree.ignored_lines,
-                self.ignore_all or self.options.ignore_errors,
-            )

         self.time_spent_us += time_spent_us(t0)
@@ -2770,19 +2777,23 @@ def parse_file(self, *, temporary: bool = False) -> None:
             # fine-grained mode can repeat them when the module is
             # reprocessed.
             self.early_errors = list(manager.errors.error_info_map.get(self.xpath, []))
-            self.semantic_analysis_pass1()
         else:
             self.early_errors = manager.ast_cache[self.id][1]

         if not temporary:
             modules[self.id] = self.tree

+        if not cached:
+            self.semantic_analysis_pass1()
+
+        if not temporary:
             self.check_blockers()

         manager.ast_cache[self.id] = (self.tree, self.early_errors)
+        self.setup_errors()
+
+    def setup_errors(self) -> None:
+        assert self.tree is not None
+        self.manager.errors.set_file_ignored_lines(
+            self.xpath, self.tree.ignored_lines, self.ignore_all or self.options.ignore_errors
+        )
+        self.manager.errors.set_skipped_lines(self.xpath, self.tree.skipped_lines)

     def parse_inline_configuration(self, source: str) -> None:
         """Check for inline mypy: options directive and parse them."""
@@ -2821,7 +2832,6 @@ def semantic_analysis_pass1(self) -> None:
         analyzer = SemanticAnalyzerPreAnalysis()
         with self.wrap_context():
             analyzer.visit_file(self.tree, self.xpath, self.id, options)
-        self.manager.errors.set_skipped_lines(self.xpath, self.tree.skipped_lines)
         # TODO: Do this while constructing the AST?
         self.tree.names = SymbolTable()
         if not self.tree.is_stub:
@@ -3362,23 +3372,28 @@ def in_partial_package(id: str, manager: BuildManager) -> bool:
     defines a module-level __getattr__ (a.k.a. partial stub package).
     """
     while "." in id:
-        parent, _ = id.rsplit(".", 1)
-        if parent in manager.modules:
-            parent_mod: MypyFile | None = manager.modules[parent]
+        ancestor, _ = id.rsplit(".", 1)
+        if ancestor in manager.known_partial_packages:
+            return manager.known_partial_packages[ancestor]
+        if ancestor in manager.modules:
+            ancestor_mod: MypyFile | None = manager.modules[ancestor]
         else:
-            # Parent is not in build, try quickly if we can find it.
+            # Ancestor is not in build, try quickly if we can find it.
             try:
-                parent_st = State.new_state(
-                    id=parent, path=None, source=None, manager=manager, temporary=True
+                ancestor_st = State.new_state(
+                    id=ancestor, path=None, source=None, manager=manager, temporary=True
                 )
             except (ModuleNotFound, CompileError):
-                parent_mod = None
+                ancestor_mod = None
             else:
-                parent_mod = parent_st.tree
-        if parent_mod is not None:
+                ancestor_mod = ancestor_st.tree
+                # We will not need this anymore.
+                ancestor_st.tree = None
+        if ancestor_mod is not None:
             # Bail out soon, complete subpackage found
-            return parent_mod.is_partial_stub_package
-        id = parent
+            manager.known_partial_packages[ancestor] = ancestor_mod.is_partial_stub_package
+            return ancestor_mod.is_partial_stub_package
+        id = ancestor
     return False
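As a rough illustration of the cache added above and discussed in the review thread earlier, here is a self-contained toy in which resolve() stands in for the expensive State.new_state() probe; all names and numbers are made up:

resolutions = 0

def resolve(ancestor: str) -> tuple[bool, bool]:
    """Stand-in for the expensive resolution attempt.

    Returns (found, is_partial_stub_package)."""
    global resolutions
    resolutions += 1
    return True, False  # pretend the ancestor exists and is a normal package

known: dict[str, bool] = {}

def in_partial_package_toy(mod_id: str) -> bool:
    while "." in mod_id:
        ancestor = mod_id.rsplit(".", 1)[0]
        if ancestor in known:
            return known[ancestor]
        found, partial = resolve(ancestor)
        if found:
            # Complete subpackage found: cache the verdict and bail out.
            known[ancestor] = partial
            return partial
        mod_id = ancestor
    return False

for _ in range(1000):
    in_partial_package_toy("pkg.sub.mod")
assert resolutions == 1  # "pkg.sub" is probed once, then served from the cache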
@@ -3537,7 +3552,7 @@ def dispatch(sources: list[BuildSource], manager: BuildManager, stdout: TextIO)
     initial_gc_freeze_done = True

     for id in graph:
-        manager.import_map[id] = set(graph[id].dependencies + graph[id].suppressed)
+        manager.import_map[id] = graph[id].dependencies_set
Collaborator
Is this change related to error processing or the earlier changes to how we process suppressed modules?

Member (Author)
This is just a minor perf optimization (we never check for imports of suppressed deps after graph is loaded). I was scrolling past this line and decided it is not worth a separate PR.

     t1 = time.time()
     manager.add_stats(
@@ -3839,6 +3854,8 @@ def load_graph(
             if dep not in graph:
                 st.suppress_dependency(dep)
     manager.plugin.set_modules(manager.modules)
+    manager.path_by_id = {id: graph[id].xpath for id in graph}
+    manager.errors.global_watcher = False
     return graph
@@ -3966,7 +3983,9 @@ def find_stale_sccs(
 def process_graph(graph: Graph, manager: BuildManager) -> None:
     """Process everything in dependency order."""
     # Broadcast graph to workers before computing SCCs to save a bit of time.
-    graph_message = GraphMessage(graph=graph)
+    # TODO: check if we can optimize by sending only part of the graph needed for given SCC.
+    # For example only send modules in the SCC and their dependencies.
+    graph_message = GraphMessage(graph=graph, missing_modules=set(manager.missing_modules))
     buf = WriteBuffer()
     graph_message.write(buf)
     graph_data = buf.getvalue()
@@ -4108,7 +4127,7 @@ def process_fresh_modules(graph: Graph, modules: list[str], manager: BuildManage

 def process_stale_scc(
-    graph: Graph, ascc: SCC, manager: BuildManager
+    graph: Graph, ascc: SCC, manager: BuildManager, from_cache: set[str] | None = None
 ) -> dict[str, tuple[str, list[str]]]:
     """Process the modules in one SCC from source code."""
     # First verify if all transitive dependencies are loaded in the current process.
@@ -4173,7 +4192,9 @@ def process_stale_scc(
     stale = scc
     for id in stale:
         # Re-generate import errors in case this module was loaded from the cache.
-        if graph[id].meta:
+        # Deserialized states all have meta=None, so the caller should specify
+        # explicitly which of them are from cache.
+        if graph[id].meta or from_cache and id in from_cache:
             graph[id].verify_dependencies(suppressed_only=True)
         # We may already have parsed the module, or not.
         # If the former, parse_file() is a no-op.
@@ -4436,17 +4457,30 @@ class SccRequestMessage(IPCMessage):
     If scc_id is None, then it means that the coordinator requested a shutdown.
     """

-    def __init__(self, *, scc_id: int | None) -> None:
+    def __init__(self, *, scc_id: int | None, import_errors: dict[str, list[ErrorInfo]]) -> None:
         self.scc_id = scc_id
+        self.import_errors = import_errors

     @classmethod
     def read(cls, buf: ReadBuffer) -> SccRequestMessage:
         assert read_tag(buf) == SCC_REQUEST_MESSAGE
-        return SccRequestMessage(scc_id=read_int_opt(buf))
+        return SccRequestMessage(
+            scc_id=read_int_opt(buf),
+            import_errors={
+                read_str(buf): [ErrorInfo.read(buf) for _ in range(read_int_bare(buf))]
+                for _ in range(read_int_bare(buf))
+            },
+        )

     def write(self, buf: WriteBuffer) -> None:
         write_tag(buf, SCC_REQUEST_MESSAGE)
         write_int_opt(buf, self.scc_id)
+        write_int_bare(buf, len(self.import_errors))
+        for path, errors in self.import_errors.items():
+            write_str(buf, path)
+            write_int_bare(buf, len(errors))
+            for error in errors:
+                error.write(buf)


 class SccResponseMessage(IPCMessage):
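The buffer helpers (write_str, read_int_bare, and friends) are defined elsewhere in the codebase. The following self-contained sketch mimics the length-prefixed layout of the import_errors payload under assumed encodings, with plain strings standing in for serialized ErrorInfo records:

from io import BytesIO
import struct

def write_int_bare(buf: BytesIO, value: int) -> None:
    buf.write(struct.pack("<q", value))  # fixed-width stand-in encoding

def read_int_bare(buf: BytesIO) -> int:
    return struct.unpack("<q", buf.read(8))[0]

def write_str(buf: BytesIO, value: str) -> None:
    data = value.encode("utf-8")
    write_int_bare(buf, len(data))
    buf.write(data)

def read_str(buf: BytesIO) -> str:
    return buf.read(read_int_bare(buf)).decode("utf-8")

def write_import_errors(buf: BytesIO, import_errors: dict[str, list[str]]) -> None:
    # Outer count, then per entry: path, inner count, serialized errors.
    write_int_bare(buf, len(import_errors))
    for path, errors in import_errors.items():
        write_str(buf, path)
        write_int_bare(buf, len(errors))
        for error in errors:  # write_str stands in for ErrorInfo.write()
            write_str(buf, error)

def read_import_errors(buf: BytesIO) -> dict[str, list[str]]:
    # Dict comprehensions evaluate the key before the value (guaranteed
    # since Python 3.8), so read_str() consumes the path first, exactly
    # as SccRequestMessage.read() above relies on.
    return {
        read_str(buf): [read_str(buf) for _ in range(read_int_bare(buf))]
        for _ in range(read_int_bare(buf))
    }

payload = {"pkg/mod.py": ["error one", "error two"]}
buf = BytesIO()
write_import_errors(buf, payload)
assert read_import_errors(BytesIO(buf.getvalue())) == payload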
@@ -4570,19 +4604,31 @@ def write(self, buf: WriteBuffer) -> None:

 class GraphMessage(IPCMessage):
     """A message wrapping the build graph computed by the coordinator."""

-    def __init__(self, *, graph: Graph) -> None:
+    def __init__(self, *, graph: Graph, missing_modules: set[str]) -> None:
         self.graph = graph
+        self.missing_modules = missing_modules
+        # Send this data separately as it will be lost during state serialization.
+        self.from_cache = {mod_id for mod_id in graph if graph[mod_id].meta}
Collaborator
It looks like we might calculate this on demand using

Member (Author)
The receiver will not be able to do this after we stop calling
     @classmethod
     def read(cls, buf: ReadBuffer, manager: BuildManager | None = None) -> GraphMessage:
         assert manager is not None
         assert read_tag(buf) == GRAPH_MESSAGE
         graph = {read_str_bare(buf): State.read(buf, manager) for _ in range(read_int_bare(buf))}
-        return GraphMessage(graph=graph)
+        missing_modules = {read_str_bare(buf) for _ in range(read_int_bare(buf))}
+        message = GraphMessage(graph=graph, missing_modules=missing_modules)
+        message.from_cache = {read_str_bare(buf) for _ in range(read_int_bare(buf))}
+        return message

     def write(self, buf: WriteBuffer) -> None:
         write_tag(buf, GRAPH_MESSAGE)
         write_int_bare(buf, len(self.graph))
         for mod_id, state in self.graph.items():
             write_str_bare(buf, mod_id)
             state.write(buf)
+        write_int_bare(buf, len(self.missing_modules))
+        for module in self.missing_modules:
+            write_str_bare(buf, module)
+        write_int_bare(buf, len(self.from_cache))
+        for module in self.from_cache:
+            write_str_bare(buf, module)
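A toy model of the constraint discussed in the thread above: State.write (not shown in this diff) is assumed not to serialize meta, so membership in from_cache must be captured on the sender side. MiniState is hypothetical:

class MiniState:
    """Hypothetical stand-in for State; write() drops meta, like State.write."""

    def __init__(self, mod_id: str, meta: object | None) -> None:
        self.id = mod_id
        self.meta = meta

    def write(self) -> dict[str, str]:
        return {"id": self.id}  # meta is intentionally not serialized

graph = {"a": MiniState("a", meta=object()), "b": MiniState("b", meta=None)}
# Sender side: capture "loaded from cache" before serialization loses meta.
from_cache = {mod_id for mod_id, state in graph.items() if state.meta}
assert from_cache == {"a"}
# Receiver side: every deserialized state has meta=None, so recomputing
# from_cache there would always produce the empty set.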
@@ -112,6 +112,12 @@ def main(argv: list[str]) -> None:

 def serve(server: IPCServer, ctx: ServerContext) -> None:
+    """Main server loop of the worker.
+
+    Receive initial state from the coordinator, then process each
+    SCC checking request and reply to client (coordinator). See module
+    docstring for more details on the protocol.
+    """
     sources = SourcesDataMessage.read(receive(server)).sources
     manager = setup_worker_manager(sources, ctx)
     if manager is None:
@@ -130,13 +136,18 @@ def serve(server: IPCServer, ctx: ServerContext) -> None:
     gc.unfreeze()
     gc.enable()
     for id in graph:
-        manager.import_map[id] = set(graph[id].dependencies + graph[id].suppressed)
+        manager.import_map[id] = graph[id].dependencies_set
+    # Ignore errors during local graph loading to check that receiving
+    # early errors from coordinator works correctly.
+    manager.errors.reset()

     # Notify worker we are done loading graph.
     send(server, AckMessage())

     # Compare worker graph and coordinator, with parallel parser we will only use the latter.
-    coordinator_graph = GraphMessage.read(receive(server), manager).graph
+    graph_data = GraphMessage.read(receive(server), manager)
+    assert set(manager.missing_modules) == graph_data.missing_modules
+    coordinator_graph = graph_data.graph
     assert coordinator_graph.keys() == graph.keys()
     for id in graph:
         assert graph[id].dependencies_set == coordinator_graph[id].dependencies_set
@@ -150,14 +161,29 @@ def serve(server: IPCServer, ctx: ServerContext) -> None:
     # Notify coordinator we are ready to process SCCs.
     send(server, AckMessage())
     while True:
-        scc_id = SccRequestMessage.read(receive(server)).scc_id
+        scc_message = SccRequestMessage.read(receive(server))
+        scc_id = scc_message.scc_id
         if scc_id is None:
             manager.dump_stats()
             break
         scc = manager.scc_by_id[scc_id]
         t0 = time.time()
         try:
-            result = process_stale_scc(graph, scc, manager)
+            for id in scc.mod_ids:
+                state = graph[id]
+                # Extra if below is needed only because we are using local graph.
+                # TODO: clone options when switching to coordinator graph.
+                if state.tree is None:
+                    # Parse early to get errors related data, such as ignored
+                    # and skipped lines before replaying the errors.
+                    state.parse_file()
Collaborator
Why are we parsing the file here? Is this related to error processing?

Member (Author)
Yes, we want to get the side effect of setting ignored/skipped lines.
+                else:
+                    state.setup_errors()
+                if id in scc_message.import_errors:
+                    manager.errors.set_file(state.xpath, id, state.options)
+                    for err_info in scc_message.import_errors[id]:
+                        manager.errors.add_error_info(err_info)
+            result = process_stale_scc(graph, scc, manager, from_cache=graph_data.from_cache)
             # We must commit after each SCC, otherwise we break --sqlite-cache.
             manager.metastore.commit()
         except CompileError as blocker:
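A toy model of the ordering described in the reply above, with MiniErrors as an illustrative stand-in for mypy's Errors bookkeeping: ignored lines must be registered (the side effect of parse_file() or setup_errors()) before recorded errors are replayed, otherwise a # type: ignore comment would lose its effect:

class MiniErrors:
    """Illustrative stand-in for mypy's Errors bookkeeping."""

    def __init__(self) -> None:
        self.ignored_lines: dict[str, set[int]] = {}
        self.emitted: list[tuple[str, int, str]] = []

    def set_file_ignored_lines(self, path: str, lines: set[int]) -> None:
        self.ignored_lines[path] = lines

    def add_error_info(self, path: str, line: int, msg: str) -> None:
        if line in self.ignored_lines.get(path, set()):
            return  # suppressed by a "# type: ignore" found during parsing
        self.emitted.append((path, line, msg))

errors = MiniErrors()
errors.set_file_ignored_lines("pkg/mod.py", {3})  # side effect of parsing
errors.add_error_info("pkg/mod.py", 3, "replayed import error")  # suppressed
errors.add_error_info("pkg/mod.py", 7, "replayed import error")  # reported
assert errors.emitted == [("pkg/mod.py", 7, "replayed import error")]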
Is this because we now delete modules in the middle of a build, or is there another reason for maintaining this here?

This is mostly for convenience, to avoid passing graph around just for the purpose of mapping id to xpath. In most cases we identify modules by id, but in error-tracking-related code it is always xpath.
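For illustration, the convenience described here mirrors how submit_to_workers builds the per-SCC payload; the data below is made up:

# Hypothetical data: two modules, one with a recorded import error.
path_by_id = {"pkg.mod": "pkg/mod.py", "pkg": "pkg/__init__.py"}
recorded = {"pkg/mod.py": ["<ErrorInfo for a failed import>"]}

def import_errors_for(mod_ids: list[str]) -> dict[str, list[str]]:
    # Same shape as the comprehension in submit_to_workers: errors are
    # recorded under xpath, but shipped to workers keyed by module id.
    return {
        mod_id: recorded[path]
        for mod_id in mod_ids
        if (path := path_by_id[mod_id]) in recorded
    }

assert import_errors_for(["pkg.mod", "pkg"]) == {
    "pkg.mod": ["<ErrorInfo for a failed import>"]
}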