Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,12 @@
"""Number of times the Flash-deployed endpoint will attempt to unpack the worker-flash tarball from mounted volume."""
DEFAULT_TARBALL_UNPACK_INTERVAL = 30
"""Time in seconds the Flash-deployed endpoint will wait between tarball unpack attempts."""

# Dependency Recovery Configuration
# Caps the handler loader's recovery loop: the handler is loaded once, and
# after each successful on-the-fly install it is loaded again, until this many
# packages have been installed. A further missing package is then fatal.
MAX_IMPORT_RECOVERY_ATTEMPTS = 3
"""Max on-the-fly package installs before giving up during handler loading.

When a deployed handler fails to import due to a missing package, the worker
attempts to install it and retry. This caps the retry loop to prevent unbounded
installs (e.g. a package with many missing transitive deps).
"""
122 changes: 114 additions & 8 deletions src/handler.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import importlib.util
import logging
import os
import sys
from pathlib import Path
from typing import Any, Dict, Optional

from constants import MAX_IMPORT_RECOVERY_ATTEMPTS
from logger import setup_logging
from unpack_volume import maybe_unpack
from version import format_version_banner
Expand All @@ -26,13 +28,121 @@ def _is_deployed_mode() -> bool:
return bool(os.getenv("FLASH_RESOURCE_NAME"))


class _HandlerRecoveryError(RuntimeError):
    """Signal that on-the-fly dependency recovery could not load the handler.

    _exec_handler_module raises this dedicated RuntimeError subclass so that
    _load_generated_handler can tell it apart from a RuntimeError coming out
    of user code: recovery errors are re-raised untouched, while any other
    RuntimeError is still wrapped.
    """


def _extract_missing_package(error: ImportError) -> str | None:
"""Extract the top-level package name from an ImportError.

Returns the root package name (e.g. 'numpy' from 'numpy.core') or None
if the module name cannot be determined.
"""
module_name: str | None = getattr(error, "name", None)
if not module_name:
return None
return module_name.split(".")[0]


def _try_install_missing_package(package_name: str) -> bool:
    """Install a single package at runtime through DependencyInstaller.

    DependencyInstaller is imported inside the function body rather than at
    module level.

    Args:
        package_name: Name handed to the installer as a one-element list.

    Returns:
        True when the installer's result has a truthy ``success`` attribute,
        False otherwise.
    """
    from dependency_installer import DependencyInstaller

    outcome = DependencyInstaller().install_dependencies([package_name])
    return True if outcome.success else False


def _exec_handler_module(
    spec: "importlib.machinery.ModuleSpec",
    handler_file: Path,
) -> Any:
    """Execute a handler module spec, installing missing packages on-the-fly.

    When a deployed handler fails to import due to a missing package (e.g.
    numpy excluded from the build artifact but needed at runtime), this
    function installs the package and retries. This adds to cold start time
    but prevents a fatal crash.

    Only ModuleNotFoundError triggers recovery; any other exception raised
    while executing the module propagates unchanged to the caller.

    Args:
        spec: Module spec for the generated handler file.
        handler_file: Path of the handler file, used in error messages only.

    Returns:
        The loaded module object.

    Raises:
        _HandlerRecoveryError: If the spec has no loader, if the missing
            package cannot be identified or was already installed once, if an
            install fails, or if MAX_IMPORT_RECOVERY_ATTEMPTS packages have
            been installed and the import still fails.
    """
    # The annotation above is a string on purpose: the file imports only
    # importlib.util, which does not guarantee importlib.machinery is loaded
    # when this def statement is evaluated.
    loader = spec.loader
    if loader is None:
        raise _HandlerRecoveryError(
            f"Module spec for generated handler {handler_file} has no loader. "
            f"The file may be corrupted. Redeploy with 'flash deploy'."
        )

    installed_packages: list[str] = []

    # Every pass returns the module, raises, or records exactly one newly
    # installed package. The cap check below therefore bounds the loop at
    # MAX_IMPORT_RECOVERY_ATTEMPTS installs (plus the initial load attempt).
    while True:
        # Start each attempt from a fresh module object.
        mod = importlib.util.module_from_spec(spec)
        try:
            loader.exec_module(mod)
            return mod
        except ModuleNotFoundError as e:
            if len(installed_packages) >= MAX_IMPORT_RECOVERY_ATTEMPTS:
                raise _HandlerRecoveryError(
                    f"Generated handler {handler_file} failed to load after "
                    f"installing {len(installed_packages)} missing packages: "
                    f"{installed_packages}. Too many missing dependencies — "
                    f"redeploy with 'flash deploy'."
                ) from e

            package_name = _extract_missing_package(e)
            # Seeing the same package again means the install did not make it
            # importable; retrying would not help.
            if not package_name or package_name in installed_packages:
                raise _HandlerRecoveryError(
                    "Import is still failing after attempted automatic recovery "
                    "or the missing dependency could not be determined. "
                    "Inspect your handler and its dependencies, then redeploy "
                    "with 'flash deploy'."
                ) from e

            logger.warning(
                "Package '%s' is not in the build artifact. Installing on-the-fly. "
                "This adds to cold start time — consider adding it to your "
                "dependencies list to include it in the build artifact.",
                package_name,
            )

            if not _try_install_missing_package(package_name):
                raise _HandlerRecoveryError(
                    f"Failed to install missing package '{package_name}'. "
                    f"Generated handler {handler_file} cannot be loaded. "
                    f"Redeploy with 'flash deploy'."
                ) from e

            installed_packages.append(package_name)
            # Clear the failed module from sys.modules so the retry gets a fresh import
            submodule_prefix = f"{package_name}."
            for key in list(sys.modules):
                if key == package_name or key.startswith(submodule_prefix):
                    del sys.modules[key]
            # Let the import finders notice the newly installed files.
            importlib.invalidate_caches()
            logger.info("Installed '%s', retrying handler load", package_name)


def _load_generated_handler() -> Optional[Any]:
"""Load Flash-generated handler for deployed QB mode.

Checks for a handler_<resource_name>.py file generated by the flash
build pipeline. These handlers accept plain JSON input without
FunctionRequest/cloudpickle serialization.

If the handler fails to import due to a missing package, attempts
on-the-fly installation before giving up. This handles cases where
a package was excluded from the build artifact (e.g. size-prohibitive
packages) but is needed at runtime.

In deployed mode (FLASH_RESOURCE_NAME set), failures are fatal.
FunctionRequest fallback is only valid for Live Serverless workers.

Expand Down Expand Up @@ -68,20 +178,16 @@ def _load_generated_handler() -> Optional[Any]:
f"The file may be corrupted. Redeploy with 'flash deploy'."
)

mod = importlib.util.module_from_spec(spec)
try:
spec.loader.exec_module(mod)
except ImportError as e:
raise RuntimeError(
f"Generated handler {handler_file} failed to import: {e}. "
f"This usually means a dependency was built for the wrong Python version. "
f"Redeploy with 'flash deploy'."
) from e
mod = _exec_handler_module(spec, handler_file)
except SyntaxError as e:
raise RuntimeError(
f"Generated handler {handler_file} has a syntax error: {e}. "
f"This indicates a bug in the flash build pipeline."
) from e
except _HandlerRecoveryError:
# Recovery-specific RuntimeErrors from _exec_handler_module — already formatted
raise
except Exception as e:
raise RuntimeError(
f"Generated handler {handler_file} failed to load: {e} ({type(e).__name__}). "
Expand Down
51 changes: 47 additions & 4 deletions tests/unit/test_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,17 +204,60 @@ def test_raises_when_spec_creation_fails(self):
with pytest.raises(RuntimeError, match="Failed to create module spec"):
_load_generated_handler()

def test_raises_on_import_error(self, tmp_path):
"""If generated handler has ImportError, raises RuntimeError."""
def test_raises_on_import_error_when_install_fails(self, tmp_path):
"""If install of missing package fails, raises with install failure message."""
handler_file = tmp_path / "handler_gpu_config.py"
handler_file.write_text(
"from nonexistent_package import missing_function\ndef handler(event): pass\n"
)

with patch.dict("os.environ", {"FLASH_RESOURCE_NAME": "gpu_config"}):
with patch("handler.Path", return_value=handler_file):
with pytest.raises(RuntimeError, match="failed to import"):
_load_generated_handler()
with patch("handler._try_install_missing_package", return_value=False):
with pytest.raises(RuntimeError, match="Failed to install"):
_load_generated_handler()

def test_recovery_installs_missing_package_and_retries(self, tmp_path):
    """A successful on-the-fly install lets the second load attempt succeed."""
    from handler import _exec_handler_module

    import importlib.util

    handler_file = tmp_path / "handler_gpu_config.py"
    handler_file.write_text("def handler(event): return {'recovered': True}\n")

    spec = importlib.util.spec_from_file_location("handler_gpu_config", handler_file)
    assert spec is not None, f"Failed to create module spec for {handler_file}"
    assert spec.loader is not None, f"Module spec has no loader for {handler_file}"

    real_exec = spec.loader.exec_module
    seen_modules = []

    def fail_once_then_load(module):
        # The first call simulates the missing dependency; every later call
        # delegates to the real loader so the handler actually executes.
        seen_modules.append(module)
        if len(seen_modules) == 1:
            raise ModuleNotFoundError("no module", name="fake_recovery_pkg")
        real_exec(module)

    with patch.object(spec.loader, "exec_module", side_effect=fail_once_then_load), patch(
        "handler._try_install_missing_package", return_value=True
    ) as mock_install:
        mod = _exec_handler_module(spec, handler_file)

    assert hasattr(mod, "handler")
    assert callable(mod.handler)
    mock_install.assert_called_once_with("fake_recovery_pkg")

def test_recovery_stops_if_same_package_fails_twice(self, tmp_path):
    """Recovery aborts when an already-installed package is reported missing again."""
    # The handler body raises for the same package on every load, so the
    # "successful" install patched in below can never make the import pass.
    handler_source = "raise ModuleNotFoundError('still missing', name='stubborn_pkg')\n"
    handler_file = tmp_path / "handler_gpu_config.py"
    handler_file.write_text(handler_source)

    deployed_env = patch.dict("os.environ", {"FLASH_RESOURCE_NAME": "gpu_config"})
    handler_path = patch("handler.Path", return_value=handler_file)
    install_ok = patch("handler._try_install_missing_package", return_value=True)

    with deployed_env, handler_path, install_ok:
        with pytest.raises(RuntimeError, match="still failing after attempted"):
            _load_generated_handler()

def test_raises_on_syntax_error(self, tmp_path):
"""SyntaxError in generated handler raises RuntimeError."""
Expand Down
Loading