-
Notifications
You must be signed in to change notification settings - Fork 335
Adds ImageSpec.with_runtime_packages #3231
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
b5a1aa9
4b0521d
3ea79df
e9d384d
b9a412a
9d19964
b8c2fb2
b6ce00d
d022e45
2ad32c8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -30,7 +30,7 @@ | |||||||||||||||||||||||||||||||||||||||
| from flytekit.core import utils | ||||||||||||||||||||||||||||||||||||||||
| from flytekit.core.base_task import IgnoreOutputs, PythonTask | ||||||||||||||||||||||||||||||||||||||||
| from flytekit.core.checkpointer import SyncCheckpoint | ||||||||||||||||||||||||||||||||||||||||
| from flytekit.core.constants import FLYTE_FAIL_ON_ERROR | ||||||||||||||||||||||||||||||||||||||||
| from flytekit.core.constants import FLYTE_FAIL_ON_ERROR, RUNTIME_PACKAGES_ENV_NAME | ||||||||||||||||||||||||||||||||||||||||
| from flytekit.core.context_manager import ( | ||||||||||||||||||||||||||||||||||||||||
| ExecutionParameters, | ||||||||||||||||||||||||||||||||||||||||
| ExecutionState, | ||||||||||||||||||||||||||||||||||||||||
|
|
@@ -63,6 +63,18 @@ def get_version_message(): | |||||||||||||||||||||||||||||||||||||||
| return f"Welcome to Flyte! Version: {flytekit.__version__}" | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| def _run_subprocess(cmd: List[str], env: Optional[dict] = None) -> int: | ||||||||||||||||||||||||||||||||||||||||
| """Run cmd with proper SIGTERM handling.""" | ||||||||||||||||||||||||||||||||||||||||
| p = subprocess.Popen(cmd, env=env) | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| def handle_sigterm(signum, frame): | ||||||||||||||||||||||||||||||||||||||||
| logger.info(f"passing signum {signum} [frame={frame}] to subprocess") | ||||||||||||||||||||||||||||||||||||||||
| p.send_signal(signum) | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| signal.signal(signal.SIGTERM, handle_sigterm) | ||||||||||||||||||||||||||||||||||||||||
| return p.wait() | ||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+68
to
+75
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Signal handler not restored after subprocess completes
The Code suggestionCheck the AI-generated fix before applying
Suggested change
Code Review Run #b1335b Should Bito avoid suggestions like this for future reviews? (Manage Rules)
|
||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| def _compute_array_job_index(): | ||||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||||
| Computes the absolute index of the current array job. This is determined by summing the compute-environment-specific | ||||||||||||||||||||||||||||||||||||||||
|
|
@@ -432,6 +444,14 @@ def setup_execution( | |||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| compressed_serialization_settings = os.environ.get(SERIALIZED_CONTEXT_ENV_VAR, "") | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| if runtime_packages := os.getenv(RUNTIME_PACKAGES_ENV_NAME): | ||||||||||||||||||||||||||||||||||||||||
| import importlib | ||||||||||||||||||||||||||||||||||||||||
| import site | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| dev_packages_list = runtime_packages.split(" ") | ||||||||||||||||||||||||||||||||||||||||
| _run_subprocess([sys.executable, "-m", "pip", "install", *dev_packages_list]) | ||||||||||||||||||||||||||||||||||||||||
| importlib.reload(site) | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| ctx = FlyteContextManager.current_context() | ||||||||||||||||||||||||||||||||||||||||
| # Create directories | ||||||||||||||||||||||||||||||||||||||||
| user_workspace_dir = ctx.file_access.get_random_local_directory() | ||||||||||||||||||||||||||||||||||||||||
|
|
@@ -751,14 +771,7 @@ def fast_execute_task_cmd(additional_distribution: str, dest_dir: str, task_exec | |||||||||||||||||||||||||||||||||||||||
| env["PYTHONPATH"] += os.pathsep + dest_dir_resolved | ||||||||||||||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||||||||||||
| env["PYTHONPATH"] = dest_dir_resolved | ||||||||||||||||||||||||||||||||||||||||
| p = subprocess.Popen(cmd, env=env) | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| def handle_sigterm(signum, frame): | ||||||||||||||||||||||||||||||||||||||||
| logger.info(f"passing signum {signum} [frame={frame}] to subprocess") | ||||||||||||||||||||||||||||||||||||||||
| p.send_signal(signum) | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| signal.signal(signal.SIGTERM, handle_sigterm) | ||||||||||||||||||||||||||||||||||||||||
| returncode = p.wait() | ||||||||||||||||||||||||||||||||||||||||
| returncode = _run_subprocess(cmd, env) | ||||||||||||||||||||||||||||||||||||||||
| exit(returncode) | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -67,6 +67,12 @@ class ImageSpec: | |
| If the option is set by the user, then that option is of course used. | ||
| copy: List of files/directories to copy to /root. e.g. ["src/file1.txt", "src/file2.txt"] | ||
| python_exec: Python executable to use for install packages | ||
| runtime_packages: List of packages to be installed during runtime. `runtime_packages` requires `pip` to be installed | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we add a comment telling people to not use this? Or use it only as a last resort? |
||
| in your base image. | ||
| - If you are using an ImageSpec as your base image, please include `pip` into your packages: | ||
| `ImageSpec(..., packages=["pip"])`. | ||
| - If you want to install runtime packages into a fixed base_image and not use an image builder, you can | ||
| use `builder="noop"`: `ImageSpec(base_image="ghcr.io/name/my-custom-image", builder="noop").with_runtime_packages(["numpy"])` | ||
| """ | ||
|
|
||
| name: str = "flytekit" | ||
|
|
@@ -95,6 +101,7 @@ class ImageSpec: | |
| source_copy_mode: Optional[CopyFileDetection] = None | ||
| copy: Optional[List[str]] = None | ||
| python_exec: Optional[str] = None | ||
| runtime_packages: Optional[List[str]] = None | ||
|
|
||
| def __post_init__(self): | ||
| self.name = self.name.lower() | ||
|
|
@@ -127,6 +134,7 @@ def __post_init__(self): | |
| "pip_extra_index_url", | ||
| "entrypoint", | ||
| "commands", | ||
| "runtime_packages", | ||
| ] | ||
| for parameter in parameters_str_list: | ||
| attr = getattr(self, parameter) | ||
|
|
@@ -160,7 +168,7 @@ def id(self) -> str: | |
|
|
||
| :return: a unique identifier of the ImageSpec | ||
| """ | ||
| parameters_to_exclude = ["pip_secret_mounts", "builder"] | ||
| parameters_to_exclude = ["pip_secret_mounts", "builder", "runtime_packages"] | ||
| # Only get the non-None values in the ImageSpec to ensure the hash is consistent across different Flytekit versions. | ||
| image_spec_dict = asdict( | ||
| self, dict_factory=lambda x: {k: v for (k, v) in x if v is not None and k not in parameters_to_exclude} | ||
|
|
@@ -358,6 +366,12 @@ def force_push(self) -> "ImageSpec": | |
|
|
||
| return copied_image_spec | ||
|
|
||
| def with_runtime_packages(self, runtime_packages: List[str]) -> "ImageSpec": | ||
| """ | ||
| Builder that returns a new image spec with runtime packages. Dev packages will be installed during runtime. | ||
| """ | ||
| return self._update_attribute("runtime_packages", runtime_packages) | ||
|
|
||
| @classmethod | ||
| def from_env(cls, *, pinned_packages: Optional[List[str]] = None, **kwargs) -> "ImageSpec": | ||
| """Create ImageSpec with the environment's Python version and packages pinned to the ones in the environment.""" | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,17 @@ | ||
| from flytekit.image_spec.image_spec import ImageSpec, ImageSpecBuilder | ||
|
|
||
|
|
||
| class NoOpBuilder(ImageSpecBuilder): | ||
| """Noop image builder.""" | ||
|
|
||
| builder_type = "noop" | ||
|
|
||
| def build_image(self, image_spec: ImageSpec) -> str: | ||
| if not isinstance(image_spec.base_image, str): | ||
| msg = "base_image must be a string to use the noop image builder" | ||
| raise ValueError(msg) | ||
|
|
||
| import click | ||
|
|
||
| click.secho(f"Using image: {image_spec.base_image}", fg="blue") | ||
| return image_spec.base_image |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| import pytest | ||
| from flytekit.image_spec.noop_builder import NoOpBuilder | ||
| from flytekit import ImageSpec | ||
|
|
||
|
|
||
| def test_noop_builder(): | ||
| builder = NoOpBuilder() | ||
|
|
||
| image_spec = ImageSpec(base_image="localhost:30000/flytekit") | ||
| image = builder.build_image(image_spec) | ||
| assert image == "localhost:30000/flytekit" | ||
|
|
||
|
|
||
| @pytest.mark.parametrize("base_image", [ | ||
| None, | ||
| ImageSpec(base_image="another_none") | ||
| ]) | ||
| def test_noop_builder_error(base_image): | ||
| builder = NoOpBuilder() | ||
|
|
||
| msg = "base_image must be a string to use the noop image builder" | ||
| image_spec = ImageSpec(base_image=base_image) | ||
| with pytest.raises(ValueError, match=msg): | ||
| builder.build_image(image_spec) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we uninstall this after?