diff --git a/src/constants.py b/src/constants.py index 2a7c7bc..d8dc2a6 100644 --- a/src/constants.py +++ b/src/constants.py @@ -50,3 +50,12 @@ """Number of times the Flash-deployed endpoint will attempt to unpack the worker-flash tarball from mounted volume.""" DEFAULT_TARBALL_UNPACK_INTERVAL = 30 """Time in seconds the Flash-deployed endpoint will wait between tarball unpack attempts.""" + +# Dependency Recovery Configuration +MAX_IMPORT_RECOVERY_ATTEMPTS = 3 +"""Max on-the-fly package installs before giving up during handler loading. + +When a deployed handler fails to import due to a missing package, the worker +attempts to install it and retry. This caps the retry loop to prevent unbounded +installs (e.g. a package with many missing transitive deps). +""" diff --git a/src/handler.py b/src/handler.py index c9b05a9..a1245e7 100644 --- a/src/handler.py +++ b/src/handler.py @@ -1,9 +1,11 @@ import importlib.util import logging import os +import sys from pathlib import Path from typing import Any, Dict, Optional +from constants import MAX_IMPORT_RECOVERY_ATTEMPTS from logger import setup_logging from unpack_volume import maybe_unpack from version import format_version_banner @@ -26,6 +28,109 @@ def _is_deployed_mode() -> bool: return bool(os.getenv("FLASH_RESOURCE_NAME")) +class _HandlerRecoveryError(RuntimeError): + """Raised by _exec_handler_module when on-the-fly recovery fails. + + Distinguished from generic RuntimeError so _load_generated_handler can + re-raise it without wrapping, while still wrapping user-code RuntimeErrors. + """ + + +def _extract_missing_package(error: ImportError) -> str | None: + """Extract the top-level package name from an ImportError. + + Returns the root package name (e.g. 'numpy' from 'numpy.core') or None + if the module name cannot be determined. + """ + module_name: str | None = getattr(error, "name", None) + if not module_name: + return None + return module_name.split(".")[0] + + +def _try_install_missing_package(package_name: str) -> bool: + """Attempt to install a missing package on-the-fly via DependencyInstaller. + + Returns True if installation succeeded, False otherwise. + """ + from dependency_installer import DependencyInstaller + + installer = DependencyInstaller() + result = installer.install_dependencies([package_name]) + return bool(result.success) + + +def _exec_handler_module( + spec: importlib.machinery.ModuleSpec, + handler_file: Path, +) -> Any: + """Execute a handler module spec, installing missing packages on-the-fly. + + When a deployed handler fails to import due to a missing package (e.g. + numpy excluded from the build artifact but needed at runtime), this + function installs the package and retries. This adds to cold start time + but prevents a fatal crash. + + Returns: + The loaded module object. + + Raises: + _HandlerRecoveryError: If the handler cannot be loaded after recovery attempts. + """ + installed_packages: list[str] = [] + + for _attempt in range(MAX_IMPORT_RECOVERY_ATTEMPTS + 1): + mod = importlib.util.module_from_spec(spec) + try: + spec.loader.exec_module(mod) # type: ignore[union-attr] + return mod + except ModuleNotFoundError as e: + if len(installed_packages) >= MAX_IMPORT_RECOVERY_ATTEMPTS: + raise _HandlerRecoveryError( + f"Generated handler {handler_file} failed to load after " + f"installing {len(installed_packages)} missing packages: " + f"{installed_packages}. Too many missing dependencies — " + f"redeploy with 'flash deploy'." + ) from e + + package_name = _extract_missing_package(e) + if not package_name or package_name in installed_packages: + raise _HandlerRecoveryError( + "Import is still failing after attempted automatic recovery " + "or the missing dependency could not be determined. " + "Inspect your handler and its dependencies, then redeploy " + "with 'flash deploy'." + ) from e + + logger.warning( + "Package '%s' is not in the build artifact. Installing on-the-fly. " + "This adds to cold start time — consider adding it to your " + "dependencies list to include it in the build artifact.", + package_name, + ) + + if not _try_install_missing_package(package_name): + raise _HandlerRecoveryError( + f"Failed to install missing package '{package_name}'. " + f"Generated handler {handler_file} cannot be loaded. " + f"Redeploy with 'flash deploy'." + ) from e + + installed_packages.append(package_name) + # Clear the failed module from sys.modules so the retry gets a fresh import + for key in list(sys.modules): + if key == package_name or key.startswith(f"{package_name}."): + del sys.modules[key] + importlib.invalidate_caches() + logger.info("Installed '%s', retrying handler load", package_name) + + raise _HandlerRecoveryError( + f"Generated handler {handler_file} failed to load after installing " + f"{len(installed_packages)} missing packages: {installed_packages}. " + f"Too many missing dependencies — redeploy with 'flash deploy'." + ) + + def _load_generated_handler() -> Optional[Any]: """Load Flash-generated handler for deployed QB mode. @@ -33,6 +138,11 @@ def _load_generated_handler() -> Optional[Any]: build pipeline. These handlers accept plain JSON input without FunctionRequest/cloudpickle serialization. + If the handler fails to import due to a missing package, attempts + on-the-fly installation before giving up. This handles cases where + a package was excluded from the build artifact (e.g. size-prohibitive + packages) but is needed at runtime. + In deployed mode (FLASH_RESOURCE_NAME set), failures are fatal. FunctionRequest fallback is only valid for Live Serverless workers. @@ -68,20 +178,16 @@ def _load_generated_handler() -> Optional[Any]: f"The file may be corrupted. Redeploy with 'flash deploy'." ) - mod = importlib.util.module_from_spec(spec) try: - spec.loader.exec_module(mod) - except ImportError as e: - raise RuntimeError( - f"Generated handler {handler_file} failed to import: {e}. " - f"This usually means a dependency was built for the wrong Python version. " - f"Redeploy with 'flash deploy'." - ) from e + mod = _exec_handler_module(spec, handler_file) except SyntaxError as e: raise RuntimeError( f"Generated handler {handler_file} has a syntax error: {e}. " f"This indicates a bug in the flash build pipeline." ) from e + except _HandlerRecoveryError: + # Recovery-specific RuntimeErrors from _exec_handler_module — already formatted + raise except Exception as e: raise RuntimeError( f"Generated handler {handler_file} failed to load: {e} ({type(e).__name__}). " diff --git a/tests/unit/test_handler.py b/tests/unit/test_handler.py index 09f698a..4562d55 100644 --- a/tests/unit/test_handler.py +++ b/tests/unit/test_handler.py @@ -204,8 +204,8 @@ def test_raises_when_spec_creation_fails(self): with pytest.raises(RuntimeError, match="Failed to create module spec"): _load_generated_handler() - def test_raises_on_import_error(self, tmp_path): - """If generated handler has ImportError, raises RuntimeError.""" + def test_raises_on_import_error_when_install_fails(self, tmp_path): + """If install of missing package fails, raises with install failure message.""" handler_file = tmp_path / "handler_gpu_config.py" handler_file.write_text( "from nonexistent_package import missing_function\ndef handler(event): pass\n" @@ -213,8 +213,51 @@ def test_raises_on_import_error(self, tmp_path): with patch.dict("os.environ", {"FLASH_RESOURCE_NAME": "gpu_config"}): with patch("handler.Path", return_value=handler_file): - with pytest.raises(RuntimeError, match="failed to import"): - _load_generated_handler() + with patch("handler._try_install_missing_package", return_value=False): + with pytest.raises(RuntimeError, match="Failed to install"): + _load_generated_handler() + + def test_recovery_installs_missing_package_and_retries(self, tmp_path): + """Successful on-the-fly install allows handler to load on retry.""" + from handler import _exec_handler_module + + handler_file = tmp_path / "handler_gpu_config.py" + handler_file.write_text("def handler(event): return {'recovered': True}\n") + + import importlib.util + + spec = importlib.util.spec_from_file_location("handler_gpu_config", handler_file) + assert spec is not None, f"Failed to create module spec for {handler_file}" + assert spec.loader is not None, f"Module spec has no loader for {handler_file}" + + call_count = 0 + original_exec = spec.loader.exec_module + + def exec_side_effect(module): + nonlocal call_count + call_count += 1 + if call_count == 1: + raise ModuleNotFoundError("no module", name="fake_recovery_pkg") + original_exec(module) + + with patch.object(spec.loader, "exec_module", side_effect=exec_side_effect): + with patch("handler._try_install_missing_package", return_value=True) as mock_install: + mod = _exec_handler_module(spec, handler_file) + assert hasattr(mod, "handler") + assert callable(mod.handler) + mock_install.assert_called_once_with("fake_recovery_pkg") + + def test_recovery_stops_if_same_package_fails_twice(self, tmp_path): + """If the same package keeps failing after install, raises immediately.""" + handler_file = tmp_path / "handler_gpu_config.py" + # Always raises ModuleNotFoundError for same package, even after "install" + handler_file.write_text("raise ModuleNotFoundError('still missing', name='stubborn_pkg')\n") + + with patch.dict("os.environ", {"FLASH_RESOURCE_NAME": "gpu_config"}): + with patch("handler.Path", return_value=handler_file): + with patch("handler._try_install_missing_package", return_value=True): + with pytest.raises(RuntimeError, match="still failing after attempted"): + _load_generated_handler() def test_raises_on_syntax_error(self, tmp_path): """SyntaxError in generated handler raises RuntimeError."""