Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 22 additions & 9 deletions flytekit/bin/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from flytekit.core import utils
from flytekit.core.base_task import IgnoreOutputs, PythonTask
from flytekit.core.checkpointer import SyncCheckpoint
from flytekit.core.constants import FLYTE_FAIL_ON_ERROR
from flytekit.core.constants import FLYTE_FAIL_ON_ERROR, RUNTIME_PACKAGES_ENV_NAME
from flytekit.core.context_manager import (
ExecutionParameters,
ExecutionState,
Expand Down Expand Up @@ -63,6 +63,18 @@ def get_version_message():
return f"Welcome to Flyte! Version: {flytekit.__version__}"


def _run_subprocess(cmd: List[str], env: Optional[dict] = None) -> int:
"""Run cmd with proper SIGTERM handling."""
p = subprocess.Popen(cmd, env=env)

def handle_sigterm(signum, frame):
logger.info(f"passing signum {signum} [frame={frame}] to subprocess")
p.send_signal(signum)

signal.signal(signal.SIGTERM, handle_sigterm)
return p.wait()


def _compute_array_job_index():
"""
Computes the absolute index of the current array job. This is determined by summing the compute-environment-specific
Expand Down Expand Up @@ -432,6 +444,14 @@ def setup_execution(

compressed_serialization_settings = os.environ.get(SERIALIZED_CONTEXT_ENV_VAR, "")

if runtime_packages := os.getenv(RUNTIME_PACKAGES_ENV_NAME):
import importlib
import site

dev_packages_list = runtime_packages.split(" ")
_run_subprocess([sys.executable, "-m", "pip", "install", *dev_packages_list])
importlib.reload(site)

ctx = FlyteContextManager.current_context()
# Create directories
user_workspace_dir = ctx.file_access.get_random_local_directory()
Expand Down Expand Up @@ -751,14 +771,7 @@ def fast_execute_task_cmd(additional_distribution: str, dest_dir: str, task_exec
env["PYTHONPATH"] += os.pathsep + dest_dir_resolved
else:
env["PYTHONPATH"] = dest_dir_resolved
p = subprocess.Popen(cmd, env=env)

def handle_sigterm(signum, frame):
logger.info(f"passing signum {signum} [frame={frame}] to subprocess")
p.send_signal(signum)

signal.signal(signal.SIGTERM, handle_sigterm)
returncode = p.wait()
returncode = _run_subprocess(cmd, env)
exit(returncode)


Expand Down
3 changes: 3 additions & 0 deletions flytekit/core/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,6 @@
# Shared memory mount name and path
SHARED_MEMORY_MOUNT_NAME = "flyte-shared-memory"
SHARED_MEMORY_MOUNT_PATH = "/dev/shm"

# Packages to be installed at the beginning of runtime
RUNTIME_PACKAGES_ENV_NAME = "_F_RUNTIME_PACKAGES"
7 changes: 7 additions & 0 deletions flytekit/core/python_auto_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from flytekit.configuration import ImageConfig, SerializationSettings
from flytekit.constants import CopyFileDetection
from flytekit.core.base_task import PythonTask, TaskMetadata, TaskResolverMixin
from flytekit.core.constants import RUNTIME_PACKAGES_ENV_NAME
from flytekit.core.context_manager import FlyteContextManager
from flytekit.core.pod_template import PodTemplate
from flytekit.core.resources import Resources, ResourceSpec, construct_extended_resources
Expand Down Expand Up @@ -231,6 +232,12 @@ def _get_container(self, settings: SerializationSettings) -> _task_model.Contain
for elem in (settings.env, self.environment):
if elem:
env.update(elem)

# Add runtime dependencies into environment
if isinstance(self.container_image, ImageSpec) and self.container_image.runtime_packages:
runtime_packages = " ".join(self.container_image.runtime_packages)
env[RUNTIME_PACKAGES_ENV_NAME] = runtime_packages

return _get_container_definition(
image=self.get_image(settings),
resource_spec=self.resources,
Expand Down
3 changes: 3 additions & 0 deletions flytekit/image_spec/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

from .default_builder import DefaultImageBuilder
from .image_spec import ImageBuildEngine, ImageSpec
from .noop_builder import NoOpBuilder

# Set this to a lower priority compared to `envd` to maintain backward compatibility
ImageBuildEngine.register(DefaultImageBuilder.builder_type, DefaultImageBuilder(), priority=1)
# Lower priority compared to Default.
ImageBuildEngine.register(NoOpBuilder.builder_type, NoOpBuilder(), priority=0)
109 changes: 74 additions & 35 deletions flytekit/image_spec/image_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from dataclasses import asdict, dataclass
from functools import cached_property, lru_cache
from importlib import metadata
from typing import Dict, List, Optional, Tuple, Union
from typing import Any, Dict, List, Optional, Tuple, Union

import click
import requests
Expand All @@ -30,50 +30,57 @@ class ImageSpec:
"""
This class is used to specify the docker image that will be used to run the task.

Args:
name: name of the image.
python_version: python version of the image. Use default python in the base image if None.
builder: Type of plugin to build the image. Use envd by default.
source_root: source root of the image.
env: environment variables of the image.
registry: registry of the image.
packages: list of python packages to install.
conda_packages: list of conda packages to install.
conda_channels: list of conda channels.
requirements: path to the requirements.txt file.
apt_packages: list of apt packages to install.
cuda: version of cuda to install.
cudnn: version of cudnn to install.
base_image: base image of the image.
platform: Specify the target platforms for the build output (for example, windows/amd64 or linux/amd64,darwin/arm64
pip_index: Specify the custom pip index url
pip_extra_index_url: Specify one or more pip index urls as a list
pip_secret_mounts: Specify a list of tuples to mount secret for pip install. Each tuple should contain the path to
Attributes:
name (str): Name of the image.
python_version (str): Python version of the image. Use default python in the base image if None.
builder (Optional[str]): Type of plugin to build the image. Use envd by default.
source_root (Optional[str]): Source root of the image.
env (Optional[Dict[str, str]]): Environment variables of the image.
registry (Optional[str]): Registry of the image.
packages (Optional[List[str]]): List of python packages to install.
conda_packages (Optional[List[str]]): List of conda packages to install.
conda_channels (Optional[List[str]]): List of conda channels.
requirements (Optional[str]): Path to the requirements.txt file.
apt_packages (Optional[List[str]]): List of apt packages to install.
cuda (Optional[str]): Version of cuda to install.
cudnn (Optional[str]): Version of cudnn to install.
base_image (Optional[Union[str, 'ImageSpec']]): Base image of the image.
platform (str): Specify the target platforms for the build output (for example, windows/amd64 or linux/amd64,darwin/arm64).
pip_index (Optional[str]): Specify the custom pip index url.
pip_extra_index_url (Optional[List[str]]): Specify one or more pip index urls as a list.
pip_secret_mounts (Optional[List[Tuple[str, str]]]): Specify a list of tuples to mount secret for pip install. Each tuple should contain the path to
the secret file and the mount path. For example, [(".gitconfig", "/etc/gitconfig")]. This is experimental and
the interface may change in the future. Configuring this should not change the built image.
pip_extra_args: Specify one or more extra pip install arguments as a space-delimited string
registry_config: Specify the path to a JSON registry config file
entrypoint: List of strings to overwrite the entrypoint of the base image with, set to [] to remove the entrypoint.
commands: Command to run during the building process
tag_format: Custom string format for image tag. The ImageSpec hash passed in as `spec_hash`. For example,
to add a "dev" suffix to the image tag, set `tag_format="{spec_hash}-dev"`
source_copy_mode: This option allows the user to specify which source files to copy from the local host, into the image.
pip_extra_args (Optional[str]): Specify one or more extra pip install arguments as a space-delimited string.
registry_config (Optional[str]): Specify the path to a JSON registry config file.
entrypoint (Optional[List[str]]): List of strings to overwrite the entrypoint of the base image with, set to [] to remove the entrypoint.
commands (Optional[List[str]]): Command to run during the building process.
tag_format (Optional[str]): Custom string format for image tag. The ImageSpec hash passed in as `spec_hash`. For example,
to add a "dev" suffix to the image tag, set `tag_format="{spec_hash}-dev"`.
source_copy_mode (Optional[CopyFileDetection]): This option allows the user to specify which source files to copy from the local host, into the image.
Not setting this option means to use the default flytekit behavior. The default behavior is:
- if fast register is used, source files are not copied into the image (because they're already copied
into the fast register tar layer).
- if fast register is not used, then the LOADED_MODULES (aka 'auto') option is used to copy loaded
Python files into the image.

If the option is set by the user, then that option is of course used.
copy: List of files/directories to copy to /root. e.g. ["src/file1.txt", "src/file2.txt"]
python_exec: Python executable to use for install packages
copy (Optional[List[str]]): List of files/directories to copy to /root. e.g. ["src/file1.txt", "src/file2.txt"].
python_exec (Optional[str]): Python executable to use for install packages.
runtime_packages (Optional[List[str]]): List of packages to be installed during runtime. `runtime_packages` requires `pip` to be installed
in your base image.
- If you are using an ImageSpec as your base image, please include `pip` into your packages:
`ImageSpec(..., packages=["pip"])`.
- If you want to install runtime packages into a fixed base_image and not use an image builder, you can
use `builder="noop"`: `ImageSpec(base_image="ghcr.io/name/my-custom-image", builder="noop").with_runtime_packages(["numpy"])`.
builder_options (Optional[Dict[str, Any]]): Additional options for the builder. This is a dictionary that will be passed to the builder.
The options are builder-specific and may not be supported by all builders.
"""

name: str = "flytekit"
python_version: str = None # Use default python in the base image if None.
builder: Optional[str] = None
source_root: Optional[str] = None # a.txt:auto
env: Optional[typing.Dict[str, str]] = None
env: Optional[Dict[str, str]] = None
registry: Optional[str] = None
packages: Optional[List[str]] = None
conda_packages: Optional[List[str]] = None
Expand All @@ -95,6 +102,8 @@ class ImageSpec:
source_copy_mode: Optional[CopyFileDetection] = None
copy: Optional[List[str]] = None
python_exec: Optional[str] = None
runtime_packages: Optional[List[str]] = None
builder_options: Optional[Dict[str, Any]] = None

def __post_init__(self):
self.name = self.name.lower()
Expand Down Expand Up @@ -127,6 +136,7 @@ def __post_init__(self):
"pip_extra_index_url",
"entrypoint",
"commands",
"runtime_packages",
]
for parameter in parameters_str_list:
attr = getattr(self, parameter)
Expand All @@ -145,6 +155,9 @@ def __post_init__(self):
error_msg = "pip_secret_mounts must be a list of tuples of two strings or None"
raise ValueError(error_msg)

if self.builder_options is not None and not isinstance(self.builder_options, dict):
raise ValueError("builder_options must be a dictionary or None")

@cached_property
def id(self) -> str:
"""
Expand All @@ -160,7 +173,7 @@ def id(self) -> str:

:return: a unique identifier of the ImageSpec
"""
parameters_to_exclude = ["pip_secret_mounts", "builder"]
parameters_to_exclude = ["pip_secret_mounts", "builder", "runtime_packages"]
# Only get the non-None values in the ImageSpec to ensure the hash is consistent across different Flytekit versions.
image_spec_dict = asdict(
self, dict_factory=lambda x: {k: v for (k, v) in x if v is not None and k not in parameters_to_exclude}
Expand Down Expand Up @@ -310,16 +323,30 @@ def exist(self) -> Optional[bool]:
click.secho(f"Failed to check if the image exists with error:\n {e}", fg="red")
return None

def _update_attribute(self, attr_name: str, values: Union[str, List[str]]) -> "ImageSpec":
def _update_attribute(self, attr_name: str, values: Union[str, List[str], Dict[str, Any]]) -> "ImageSpec":
"""
Generic method to update a specified list attribute, either appending or extending.
Generic method to update a specified attribute, handling strings, lists, and dictionaries.
"""
current_value = copy.deepcopy(getattr(self, attr_name)) or []
current_value = copy.deepcopy(getattr(self, attr_name))

if current_value is None:
if isinstance(values, dict):
current_value = {}
else:
current_value = []

if isinstance(values, str):
if not isinstance(current_value, list):
raise TypeError(f"Cannot append string to non-list attribute {attr_name}")
current_value.append(values)
elif isinstance(values, list):
if not isinstance(current_value, list):
raise TypeError(f"Cannot extend non-list attribute {attr_name}")
current_value.extend(values)
elif isinstance(values, dict):
if not isinstance(current_value, dict):
raise TypeError(f"Cannot update non-dict attribute {attr_name}")
current_value.update(values)

return dataclasses.replace(self, **{attr_name: current_value})

Expand Down Expand Up @@ -358,6 +385,18 @@ def force_push(self) -> "ImageSpec":

return copied_image_spec

def with_runtime_packages(self, runtime_packages: List[str]) -> "ImageSpec":
"""
Builder that returns a new image spec with runtime packages. Dev packages will be installed during runtime.
"""
return self._update_attribute("runtime_packages", runtime_packages)

def with_builder_options(self, builder_options: Dict[str, Any]) -> "ImageSpec":
"""
Builder that returns a new image spec with additional builder options.
"""
return self._update_attribute("builder_options", builder_options)

@classmethod
def from_env(cls, *, pinned_packages: Optional[List[str]] = None, **kwargs) -> "ImageSpec":
"""Create ImageSpec with the environment's Python version and packages pinned to the ones in the environment."""
Expand Down
17 changes: 17 additions & 0 deletions flytekit/image_spec/noop_builder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from flytekit.image_spec.image_spec import ImageSpec, ImageSpecBuilder


class NoOpBuilder(ImageSpecBuilder):
"""Noop image builder."""

builder_type = "noop"

def build_image(self, image_spec: ImageSpec) -> str:
if not isinstance(image_spec.base_image, str):
msg = "base_image must be a string to use the noop image builder"
raise ValueError(msg)

import click

click.secho(f"Using image: {image_spec.base_image}", fg="blue")
return image_spec.base_image
2 changes: 0 additions & 2 deletions pydoclint-errors-baseline.txt
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,6 @@ flytekit/extras/tensorflow/record.py
DOC603: Class `TFRecordDatasetConfig`: Class docstring attributes are different from actual class attributes. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Attributes in the class definition but not in the docstring: [buffer_size: Optional[int], compression_type: Optional[str], name: Optional[str], num_parallel_reads: Optional[int]]. (Please read https://jsh9.github.io/pydoclint/checking_class_attributes.html on how to correctly document class attributes.)
--------------------
flytekit/image_spec/image_spec.py
DOC601: Class `ImageSpec`: Class docstring contains fewer class attributes than actual class attributes. (Please read https://jsh9.github.io/pydoclint/checking_class_attributes.html on how to correctly document class attributes.)
DOC603: Class `ImageSpec`: Class docstring attributes are different from actual class attributes. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Attributes in the class definition but not in the docstring: [apt_packages: Optional[List[str]], base_image: Optional[Union[str, 'ImageSpec']], builder: Optional[str], commands: Optional[List[str]], conda_channels: Optional[List[str]], conda_packages: Optional[List[str]], copy: Optional[List[str]], cuda: Optional[str], cudnn: Optional[str], entrypoint: Optional[List[str]], env: Optional[typing.Dict[str, str]], name: str, packages: Optional[List[str]], pip_extra_args: Optional[str], pip_extra_index_url: Optional[List[str]], pip_index: Optional[str], pip_secret_mounts: Optional[List[Tuple[str, str]]], platform: str, python_exec: Optional[str], python_version: str, registry: Optional[str], registry_config: Optional[str], requirements: Optional[str], source_copy_mode: Optional[CopyFileDetection], source_root: Optional[str], tag_format: Optional[str]]. (Please read https://jsh9.github.io/pydoclint/checking_class_attributes.html on how to correctly document class attributes.)
DOC109: Method `ImageSpecBuilder.build_image`: The option `--arg-type-hints-in-docstring` is `True` but there are no type hints in the docstring arg list
DOC110: Method `ImageSpecBuilder.build_image`: The option `--arg-type-hints-in-docstring` is `True` but not all args in the docstring arg list have type hints
DOC105: Method `ImageSpecBuilder.build_image`: Argument names match, but type hints in these args do not match: image_spec
Expand Down
Loading
Loading