From 9d2506bd5002e143faa2d6d3f0340e68f3b815b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dean=20Qui=C3=B1anola?= Date: Mon, 9 Mar 2026 16:29:35 -0700 Subject: [PATCH 1/2] feat(handler): on-the-fly package install for deployed mode When a deployed handler fails to import due to a missing package (e.g. excluded from the build artifact), the worker now attempts to install it via DependencyInstaller and retries the handler load. This prevents fatal crashes for CPU endpoints that need packages not in the slim base image. Logs a warning on each on-the-fly install so users are aware of the cold start penalty and can add the package to their dependencies list. Capped at 3 recovery attempts with a guard against retrying the same package, preventing unbounded install loops. --- src/constants.py | 9 +++ src/handler.py | 113 ++++++++++++++++++++++++++++++++++--- tests/unit/test_handler.py | 49 ++++++++++++++-- 3 files changed, 159 insertions(+), 12 deletions(-) diff --git a/src/constants.py b/src/constants.py index 2a7c7bc..d8dc2a6 100644 --- a/src/constants.py +++ b/src/constants.py @@ -50,3 +50,12 @@ """Number of times the Flash-deployed endpoint will attempt to unpack the worker-flash tarball from mounted volume.""" DEFAULT_TARBALL_UNPACK_INTERVAL = 30 """Time in seconds the Flash-deployed endpoint will wait between tarball unpack attempts.""" + +# Dependency Recovery Configuration +MAX_IMPORT_RECOVERY_ATTEMPTS = 3 +"""Max on-the-fly package installs before giving up during handler loading. + +When a deployed handler fails to import due to a missing package, the worker +attempts to install it and retry. This caps the retry loop to prevent unbounded +installs (e.g. a package with many missing transitive deps). +""" diff --git a/src/handler.py b/src/handler.py index c9b05a9..229bc65 100644 --- a/src/handler.py +++ b/src/handler.py @@ -1,9 +1,11 @@ import importlib.util import logging import os +import sys from pathlib import Path from typing import Any, Dict, Optional +from constants import MAX_IMPORT_RECOVERY_ATTEMPTS from logger import setup_logging from unpack_volume import maybe_unpack from version import format_version_banner @@ -26,6 +28,100 @@ def _is_deployed_mode() -> bool: return bool(os.getenv("FLASH_RESOURCE_NAME")) +class _HandlerRecoveryError(RuntimeError): + """Raised by _exec_handler_module when on-the-fly recovery fails. + + Distinguished from generic RuntimeError so _load_generated_handler can + re-raise it without wrapping, while still wrapping user-code RuntimeErrors. + """ + + +def _extract_missing_package(error: ImportError) -> str | None: + """Extract the top-level package name from an ImportError. + + Returns the root package name (e.g. 'numpy' from 'numpy.core') or None + if the module name cannot be determined. + """ + module_name: str | None = getattr(error, "name", None) + if not module_name: + return None + return module_name.split(".")[0] + + +def _try_install_missing_package(package_name: str) -> bool: + """Attempt to install a missing package on-the-fly via DependencyInstaller. + + Returns True if installation succeeded, False otherwise. + """ + from dependency_installer import DependencyInstaller + + installer = DependencyInstaller() + result = installer.install_dependencies([package_name]) + return bool(result.success) + + +def _exec_handler_module( + spec: importlib.machinery.ModuleSpec, + handler_file: Path, +) -> Any: + """Execute a handler module spec, installing missing packages on-the-fly. + + When a deployed handler fails to import due to a missing package (e.g. + numpy excluded from the build artifact but needed at runtime), this + function installs the package and retries. This adds to cold start time + but prevents a fatal crash. + + Returns: + The loaded module object. + + Raises: + _HandlerRecoveryError: If the handler cannot be loaded after recovery attempts. + """ + installed_packages: list[str] = [] + + for _attempt in range(MAX_IMPORT_RECOVERY_ATTEMPTS): + mod = importlib.util.module_from_spec(spec) + try: + spec.loader.exec_module(mod) # type: ignore[union-attr] + return mod + except ImportError as e: + package_name = _extract_missing_package(e) + if not package_name or package_name in installed_packages: + # Can't determine package or already tried installing it + raise _HandlerRecoveryError( + f"Generated handler {handler_file} failed to import: {e}. " + f"This usually means a dependency was built for the wrong " + f"Python version. Redeploy with 'flash deploy'." + ) from e + + logger.warning( + "Package '%s' is not in the build artifact. Installing on-the-fly. " + "This adds to cold start time — consider adding it to your " + "dependencies list to include it in the build artifact.", + package_name, + ) + + if not _try_install_missing_package(package_name): + raise _HandlerRecoveryError( + f"Failed to install missing package '{package_name}'. " + f"Generated handler {handler_file} cannot be loaded. " + f"Redeploy with 'flash deploy'." + ) from e + + installed_packages.append(package_name) + # Clear the failed module from sys.modules so the retry gets a fresh import + for key in list(sys.modules): + if key == package_name or key.startswith(f"{package_name}."): + del sys.modules[key] + logger.info("Installed '%s', retrying handler load", package_name) + + raise _HandlerRecoveryError( + f"Generated handler {handler_file} failed to load after installing " + f"{len(installed_packages)} missing packages: {installed_packages}. " + f"Too many missing dependencies — redeploy with 'flash deploy'." + ) + + def _load_generated_handler() -> Optional[Any]: """Load Flash-generated handler for deployed QB mode. @@ -33,6 +129,11 @@ def _load_generated_handler() -> Optional[Any]: build pipeline. These handlers accept plain JSON input without FunctionRequest/cloudpickle serialization. + If the handler fails to import due to a missing package, attempts + on-the-fly installation before giving up. This handles cases where + a package was excluded from the build artifact (e.g. size-prohibitive + packages) but is needed at runtime. + In deployed mode (FLASH_RESOURCE_NAME set), failures are fatal. FunctionRequest fallback is only valid for Live Serverless workers. @@ -68,20 +169,16 @@ def _load_generated_handler() -> Optional[Any]: f"The file may be corrupted. Redeploy with 'flash deploy'." ) - mod = importlib.util.module_from_spec(spec) try: - spec.loader.exec_module(mod) - except ImportError as e: - raise RuntimeError( - f"Generated handler {handler_file} failed to import: {e}. " - f"This usually means a dependency was built for the wrong Python version. " - f"Redeploy with 'flash deploy'." - ) from e + mod = _exec_handler_module(spec, handler_file) except SyntaxError as e: raise RuntimeError( f"Generated handler {handler_file} has a syntax error: {e}. " f"This indicates a bug in the flash build pipeline." ) from e + except _HandlerRecoveryError: + # Recovery-specific RuntimeErrors from _exec_handler_module — already formatted + raise except Exception as e: raise RuntimeError( f"Generated handler {handler_file} failed to load: {e} ({type(e).__name__}). " diff --git a/tests/unit/test_handler.py b/tests/unit/test_handler.py index 09f698a..f14032f 100644 --- a/tests/unit/test_handler.py +++ b/tests/unit/test_handler.py @@ -204,8 +204,8 @@ def test_raises_when_spec_creation_fails(self): with pytest.raises(RuntimeError, match="Failed to create module spec"): _load_generated_handler() - def test_raises_on_import_error(self, tmp_path): - """If generated handler has ImportError, raises RuntimeError.""" + def test_raises_on_import_error_when_install_fails(self, tmp_path): + """If install of missing package fails, raises with install failure message.""" handler_file = tmp_path / "handler_gpu_config.py" handler_file.write_text( "from nonexistent_package import missing_function\ndef handler(event): pass\n" @@ -213,8 +213,49 @@ def test_raises_on_import_error(self, tmp_path): with patch.dict("os.environ", {"FLASH_RESOURCE_NAME": "gpu_config"}): with patch("handler.Path", return_value=handler_file): - with pytest.raises(RuntimeError, match="failed to import"): - _load_generated_handler() + with patch("handler._try_install_missing_package", return_value=False): + with pytest.raises(RuntimeError, match="Failed to install"): + _load_generated_handler() + + def test_recovery_installs_missing_package_and_retries(self, tmp_path): + """Successful on-the-fly install allows handler to load on retry.""" + from handler import _exec_handler_module + + handler_file = tmp_path / "handler_gpu_config.py" + handler_file.write_text("def handler(event): return {'recovered': True}\n") + + import importlib.util + + spec = importlib.util.spec_from_file_location("handler_gpu_config", handler_file) + + call_count = 0 + original_exec = spec.loader.exec_module + + def exec_side_effect(module): + nonlocal call_count + call_count += 1 + if call_count == 1: + raise ImportError("no module", name="fake_recovery_pkg") + original_exec(module) + + with patch.object(spec.loader, "exec_module", side_effect=exec_side_effect): + with patch("handler._try_install_missing_package", return_value=True) as mock_install: + mod = _exec_handler_module(spec, handler_file) + assert hasattr(mod, "handler") + assert callable(mod.handler) + mock_install.assert_called_once_with("fake_recovery_pkg") + + def test_recovery_stops_if_same_package_fails_twice(self, tmp_path): + """If the same package keeps failing after install, raises immediately.""" + handler_file = tmp_path / "handler_gpu_config.py" + # Always raises ImportError for same package, even after "install" + handler_file.write_text("raise ImportError('still missing', name='stubborn_pkg')\n") + + with patch.dict("os.environ", {"FLASH_RESOURCE_NAME": "gpu_config"}): + with patch("handler.Path", return_value=handler_file): + with patch("handler._try_install_missing_package", return_value=True): + with pytest.raises(RuntimeError, match="failed to import"): + _load_generated_handler() def test_raises_on_syntax_error(self, tmp_path): """SyntaxError in generated handler raises RuntimeError.""" From 1f829967c6bcdd7bc9984bffc0680edfe2eb70a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dean=20Qui=C3=B1anola?= Date: Mon, 9 Mar 2026 20:57:29 -0700 Subject: [PATCH 2/2] fix(review): address PR feedback for #77 - Restrict recovery to ModuleNotFoundError instead of broad ImportError - Fix off-by-one: add extra exec attempt after final install (MAX+1 iterations) - Add importlib.invalidate_caches() after on-the-fly install - Improve error message for failed recovery (actionable, not speculative) - Add explicit spec/loader assertions in test for clearer failure diagnostics - Update tests to raise ModuleNotFoundError to match new except clause --- src/handler.py | 21 +++++++++++++++------ tests/unit/test_handler.py | 10 ++++++---- 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/src/handler.py b/src/handler.py index 229bc65..a1245e7 100644 --- a/src/handler.py +++ b/src/handler.py @@ -79,19 +79,27 @@ def _exec_handler_module( """ installed_packages: list[str] = [] - for _attempt in range(MAX_IMPORT_RECOVERY_ATTEMPTS): + for _attempt in range(MAX_IMPORT_RECOVERY_ATTEMPTS + 1): mod = importlib.util.module_from_spec(spec) try: spec.loader.exec_module(mod) # type: ignore[union-attr] return mod - except ImportError as e: + except ModuleNotFoundError as e: + if len(installed_packages) >= MAX_IMPORT_RECOVERY_ATTEMPTS: + raise _HandlerRecoveryError( + f"Generated handler {handler_file} failed to load after " + f"installing {len(installed_packages)} missing packages: " + f"{installed_packages}. Too many missing dependencies — " + f"redeploy with 'flash deploy'." + ) from e + package_name = _extract_missing_package(e) if not package_name or package_name in installed_packages: - # Can't determine package or already tried installing it raise _HandlerRecoveryError( - f"Generated handler {handler_file} failed to import: {e}. " - f"This usually means a dependency was built for the wrong " - f"Python version. Redeploy with 'flash deploy'." + "Import is still failing after attempted automatic recovery " + "or the missing dependency could not be determined. " + "Inspect your handler and its dependencies, then redeploy " + "with 'flash deploy'." ) from e logger.warning( @@ -113,6 +121,7 @@ def _exec_handler_module( for key in list(sys.modules): if key == package_name or key.startswith(f"{package_name}."): del sys.modules[key] + importlib.invalidate_caches() logger.info("Installed '%s', retrying handler load", package_name) raise _HandlerRecoveryError( diff --git a/tests/unit/test_handler.py b/tests/unit/test_handler.py index f14032f..4562d55 100644 --- a/tests/unit/test_handler.py +++ b/tests/unit/test_handler.py @@ -227,6 +227,8 @@ def test_recovery_installs_missing_package_and_retries(self, tmp_path): import importlib.util spec = importlib.util.spec_from_file_location("handler_gpu_config", handler_file) + assert spec is not None, f"Failed to create module spec for {handler_file}" + assert spec.loader is not None, f"Module spec has no loader for {handler_file}" call_count = 0 original_exec = spec.loader.exec_module @@ -235,7 +237,7 @@ def exec_side_effect(module): nonlocal call_count call_count += 1 if call_count == 1: - raise ImportError("no module", name="fake_recovery_pkg") + raise ModuleNotFoundError("no module", name="fake_recovery_pkg") original_exec(module) with patch.object(spec.loader, "exec_module", side_effect=exec_side_effect): @@ -248,13 +250,13 @@ def exec_side_effect(module): def test_recovery_stops_if_same_package_fails_twice(self, tmp_path): """If the same package keeps failing after install, raises immediately.""" handler_file = tmp_path / "handler_gpu_config.py" - # Always raises ImportError for same package, even after "install" - handler_file.write_text("raise ImportError('still missing', name='stubborn_pkg')\n") + # Always raises ModuleNotFoundError for same package, even after "install" + handler_file.write_text("raise ModuleNotFoundError('still missing', name='stubborn_pkg')\n") with patch.dict("os.environ", {"FLASH_RESOURCE_NAME": "gpu_config"}): with patch("handler.Path", return_value=handler_file): with patch("handler._try_install_missing_package", return_value=True): - with pytest.raises(RuntimeError, match="failed to import"): + with pytest.raises(RuntimeError, match="still failing after attempted"): _load_generated_handler() def test_raises_on_syntax_error(self, tmp_path):