From 890e17d7d9b2bccf5293b7024eba8c779a249b17 Mon Sep 17 00:00:00 2001 From: agent-of-mkmeral Date: Wed, 25 Mar 2026 16:06:39 +0000 Subject: [PATCH] feat: add Sandbox abstraction for agent code execution environments Add the Sandbox interface that decouples tool logic from where code runs. Tools that need to execute code or access a filesystem receive a Sandbox instead of managing their own execution, enabling portability across local, Docker, and cloud environments. Core components: - Sandbox ABC with streaming AsyncGenerator interface (base.py) - LocalSandbox for host-process execution via asyncio subprocesses (local.py) - DockerSandbox for containerized execution via docker exec (docker.py) - Agent integration: sandbox parameter on Agent.__init__, defaults to LocalSandbox Key design decisions: - Only core abstractions in SDK; AgentCoreSandbox and sandbox tools belong in separate packages (external dependencies, different release cycles) - Streaming output via AsyncGenerator[str | ExecutionResult] - yields lines as they arrive, ExecutionResult as the final yield - Security: randomized heredoc delimiters, shlex.quote for all paths, stdin piping in DockerSandbox to prevent injection - Auto-start lifecycle: sandbox starts on first execute() call - Zero external dependencies for core sandbox package Tests: 76 new tests (base: 22, local: 17, docker: 18, agent: 6, + shared) All 76 sandbox tests passing, 1686 existing tests still passing. 
--- src/strands/__init__.py | 10 +- src/strands/agent/agent.py | 9 + src/strands/sandbox/__init__.py | 22 ++ src/strands/sandbox/base.py | 263 +++++++++++++++ src/strands/sandbox/docker.py | 354 ++++++++++++++++++++ src/strands/sandbox/local.py | 161 +++++++++ tests/strands/sandbox/__init__.py | 0 tests/strands/sandbox/test_agent_sandbox.py | 49 +++ tests/strands/sandbox/test_base.py | 239 +++++++++++++ tests/strands/sandbox/test_docker.py | 314 +++++++++++++++++ tests/strands/sandbox/test_local.py | 201 +++++++++++ 11 files changed, 1621 insertions(+), 1 deletion(-) create mode 100644 src/strands/sandbox/__init__.py create mode 100644 src/strands/sandbox/base.py create mode 100644 src/strands/sandbox/docker.py create mode 100644 src/strands/sandbox/local.py create mode 100644 tests/strands/sandbox/__init__.py create mode 100644 tests/strands/sandbox/test_agent_sandbox.py create mode 100644 tests/strands/sandbox/test_base.py create mode 100644 tests/strands/sandbox/test_docker.py create mode 100644 tests/strands/sandbox/test_local.py diff --git a/src/strands/__init__.py b/src/strands/__init__.py index 2078f16ce..075c5e4de 100644 --- a/src/strands/__init__.py +++ b/src/strands/__init__.py @@ -1,10 +1,13 @@ """A framework for building, deploying, and managing AI agents.""" -from . import agent, models, telemetry, types +from . 
import agent, models, sandbox, telemetry, types from .agent.agent import Agent from .agent.base import AgentBase from .event_loop._retry import ModelRetryStrategy from .plugins import Plugin +from .sandbox.base import ExecutionResult, Sandbox +from .sandbox.docker import DockerSandbox +from .sandbox.local import LocalSandbox from .tools.decorator import tool from .types.tools import ToolContext from .vended_plugins.skills import AgentSkills, Skill @@ -14,9 +17,14 @@ "AgentBase", "AgentSkills", "agent", + "DockerSandbox", + "ExecutionResult", + "LocalSandbox", "models", "ModelRetryStrategy", "Plugin", + "sandbox", + "Sandbox", "Skill", "tool", "ToolContext", diff --git a/src/strands/agent/agent.py b/src/strands/agent/agent.py index f378a886a..82b1982f0 100644 --- a/src/strands/agent/agent.py +++ b/src/strands/agent/agent.py @@ -28,6 +28,8 @@ from .._async import run_async from ..event_loop._retry import ModelRetryStrategy from ..event_loop.event_loop import INITIAL_DELAY, MAX_ATTEMPTS, MAX_DELAY, event_loop_cycle +from ..sandbox.base import Sandbox +from ..sandbox.local import LocalSandbox from ..tools._tool_helpers import generate_missing_tool_result_content if TYPE_CHECKING: @@ -135,6 +137,7 @@ def __init__( tool_executor: ToolExecutor | None = None, retry_strategy: ModelRetryStrategy | _DefaultRetryStrategySentinel | None = _DEFAULT_RETRY_STRATEGY, concurrent_invocation_mode: ConcurrentInvocationMode = ConcurrentInvocationMode.THROW, + sandbox: Sandbox | None = None, ): """Initialize the Agent with the specified configuration. @@ -201,6 +204,9 @@ def __init__( Set to "unsafe_reentrant" to skip lock acquisition entirely, allowing concurrent invocations. Warning: "unsafe_reentrant" makes no guarantees about resulting behavior and is provided only for advanced use cases where the caller understands the risks. + sandbox: Execution environment for agent tools. 
Tools access the sandbox via + tool_context.agent.sandbox to execute commands, code, and filesystem operations. + Defaults to LocalSandbox (local host execution) when not specified. Raises: ValueError: If agent id contains path separators. @@ -273,6 +279,9 @@ def __init__( self.tool_caller = _ToolCaller(self) + # Initialize sandbox for tool execution environment + self.sandbox: Sandbox = sandbox if sandbox is not None else LocalSandbox() + self.hooks = HookRegistry() self._plugin_registry = _PluginRegistry(self) diff --git a/src/strands/sandbox/__init__.py b/src/strands/sandbox/__init__.py new file mode 100644 index 000000000..98f812bd8 --- /dev/null +++ b/src/strands/sandbox/__init__.py @@ -0,0 +1,22 @@ +"""Sandbox abstraction for agent code execution environments. + +This module provides the Sandbox interface that decouples tool logic from where code runs. +Tools that need to execute code or access a filesystem receive a Sandbox instead of managing +their own execution, enabling portability across local, Docker, and cloud environments. + +Concrete implementations: + +- ``LocalSandbox`` — runs on the host via asyncio subprocesses (default) +- ``DockerSandbox`` — runs inside a Docker container +""" + +from .base import ExecutionResult, Sandbox +from .docker import DockerSandbox +from .local import LocalSandbox + +__all__ = [ + "DockerSandbox", + "ExecutionResult", + "LocalSandbox", + "Sandbox", +] diff --git a/src/strands/sandbox/base.py b/src/strands/sandbox/base.py new file mode 100644 index 000000000..dc4c1631f --- /dev/null +++ b/src/strands/sandbox/base.py @@ -0,0 +1,263 @@ +"""Base sandbox interface for agent code execution environments. + +This module defines the abstract Sandbox class and the ExecutionResult dataclass. +Sandbox implementations provide the runtime context where tools execute code, run commands, +and interact with a filesystem. 
Multiple tools share the same Sandbox instance, giving them +a common working directory, environment variables, and filesystem. + +Implementations only need to provide execute(). All other methods are built on top of it. +Implementations may override convenience methods with native versions for better performance. +""" + +import logging +import secrets +import shlex +from abc import ABC, abstractmethod +from collections.abc import AsyncGenerator +from dataclasses import dataclass +from typing import Any + +logger = logging.getLogger(__name__) + + +@dataclass +class ExecutionResult: + """Result of code or command execution in a sandbox. + + Attributes: + exit_code: The exit code of the command or code execution. + stdout: Standard output captured from execution. + stderr: Standard error captured from execution. + """ + + exit_code: int + stdout: str + stderr: str + + +class Sandbox(ABC): + """Abstract execution environment for agent tools. + + A Sandbox provides the runtime context where tools execute code, + run commands, and interact with a filesystem. Multiple tools + share the same Sandbox instance, giving them a common working + directory, environment variables, and filesystem. + + Implementations only need to provide execute(). All other methods + are built on top of it. Implementations may override convenience + methods with native versions for better performance (for example, + LocalSandbox overrides read_file/write_file with native file I/O). + + The sandbox auto-starts on the first ``execute()`` call if not already + started, so callers do not need to manually call ``start()`` or use + the async context manager. 
+ + Example: + ```python + from strands.sandbox import LocalSandbox + + sandbox = LocalSandbox(working_dir="/tmp/workspace") + async for chunk in sandbox.execute("echo hello"): + if isinstance(chunk, str): + print(chunk, end="") # stream output + ``` + """ + + def __init__(self) -> None: + """Initialize base sandbox state.""" + self._started = False + + @abstractmethod + async def execute( + self, + command: str, + timeout: int | None = None, + ) -> AsyncGenerator[str | ExecutionResult, None]: + """Execute a shell command, streaming output. + + Yields stdout/stderr lines as they arrive. The final yield + is an ExecutionResult with the exit code and complete output. + + This is the only method implementations must provide. All other + methods are built on top of this one by default. + + The sandbox is auto-started on the first call if not already started. + + Args: + command: The shell command to execute. + timeout: Maximum execution time in seconds. None means no timeout. + + Yields: + str lines of output as they arrive, then a final ExecutionResult. + """ + ... + # Make the method signature an async generator for type checkers. + # Concrete subclasses must yield at least one ExecutionResult. + yield # type: ignore[misc] # pragma: no cover + + async def execute_code( + self, + code: str, + language: str = "python", + timeout: int | None = None, + ) -> AsyncGenerator[str | ExecutionResult, None]: + """Execute code in the sandbox, streaming output. + + Override for native code execution support. The default implementation + passes code to the language interpreter via ``-c`` with proper shell + quoting. + + Args: + code: The source code to execute. + language: The programming language interpreter to use. + timeout: Maximum execution time in seconds. None means no timeout. + + Yields: + str lines of output as they arrive, then a final ExecutionResult. 
+ """ + async for chunk in self.execute(f"{language} -c {shlex.quote(code)}", timeout=timeout): + yield chunk + + async def _execute_to_result(self, command: str, timeout: int | None = None) -> ExecutionResult: + """Helper: consume the execute() stream and return the final ExecutionResult. + + Convenience methods like read_file, write_file, and list_files use + this to get just the final result without dealing with the stream. + + Args: + command: The shell command to execute. + timeout: Maximum execution time in seconds. + + Returns: + The final ExecutionResult from the stream. + + Raises: + RuntimeError: If execute() did not yield an ExecutionResult. + """ + result = None + async for chunk in self.execute(command, timeout=timeout): + if isinstance(chunk, ExecutionResult): + result = chunk + if result is None: + raise RuntimeError("execute() did not yield an ExecutionResult") + return result + + async def _execute_code_to_result( + self, code: str, language: str = "python", timeout: int | None = None + ) -> ExecutionResult: + """Helper: consume the execute_code() stream and return the final ExecutionResult. + + Args: + code: The source code to execute. + language: The programming language interpreter to use. + timeout: Maximum execution time in seconds. + + Returns: + The final ExecutionResult from the stream. + + Raises: + RuntimeError: If execute_code() did not yield an ExecutionResult. + """ + result = None + async for chunk in self.execute_code(code, language=language, timeout=timeout): + if isinstance(chunk, ExecutionResult): + result = chunk + if result is None: + raise RuntimeError("execute_code() did not yield an ExecutionResult") + return result + + async def read_file(self, path: str) -> str: + """Read a file from the sandbox filesystem. + + Override for native file I/O support. The default implementation + uses shell commands. + + Args: + path: Path to the file to read. + + Returns: + The file contents as a string. 
+ + Raises: + FileNotFoundError: If the file does not exist or cannot be read. + """ + result = await self._execute_to_result(f"cat {shlex.quote(path)}") + if result.exit_code != 0: + raise FileNotFoundError(result.stderr) + return result.stdout + + async def write_file(self, path: str, content: str) -> None: + """Write a file to the sandbox filesystem. + + Override for native file I/O support. The default implementation + uses a shell heredoc with a randomized delimiter to prevent + content injection. + + Args: + path: Path to the file to write. + content: The content to write to the file. + + Raises: + IOError: If the file cannot be written. + """ + # Use a randomized heredoc delimiter to prevent injection when content + # contains the delimiter string. + delimiter = f"STRANDS_EOF_{secrets.token_hex(8)}" + result = await self._execute_to_result( + f"cat > {shlex.quote(path)} << '{delimiter}'\n{content}\n{delimiter}" + ) + if result.exit_code != 0: + raise IOError(result.stderr) + + async def list_files(self, path: str = ".") -> list[str]: + """List files in a sandbox directory. + + Override for native directory listing support. The default + implementation uses shell commands. + + Args: + path: Path to the directory to list. + + Returns: + A list of filenames in the directory. + + Raises: + FileNotFoundError: If the directory does not exist. + """ + result = await self._execute_to_result(f"ls -1 {shlex.quote(path)}") + if result.exit_code != 0: + raise FileNotFoundError(result.stderr) + return [f for f in result.stdout.strip().split("\n") if f] + + async def _ensure_started(self) -> None: + """Auto-start the sandbox if it has not been started yet.""" + if not self._started: + await self.start() + self._started = True + + async def start(self) -> None: + """Initialize the sandbox. + + Called once before first use. Override to perform setup such as + starting containers or creating temporary directories. 
+ """ + self._started = True + + async def stop(self) -> None: + """Clean up sandbox resources. + + Override to perform cleanup such as stopping containers or + removing temporary directories. + """ + self._started = False + + async def __aenter__(self) -> "Sandbox": + """Enter the async context manager, starting the sandbox.""" + await self.start() + self._started = True + return self + + async def __aexit__(self, *args: Any) -> None: + """Exit the async context manager, stopping the sandbox.""" + await self.stop() + self._started = False diff --git a/src/strands/sandbox/docker.py b/src/strands/sandbox/docker.py new file mode 100644 index 000000000..34b252e76 --- /dev/null +++ b/src/strands/sandbox/docker.py @@ -0,0 +1,354 @@ +"""Docker sandbox implementation for containerized execution. + +This module implements the DockerSandbox, which executes commands and code +inside a Docker container. The container is created on start() and destroyed +on stop(). Each execute() call uses ``docker exec`` on the running container. + +Docker must be available on the host and the user must have permission to run +containers. +""" + +import asyncio +import logging +import shlex +from collections.abc import AsyncGenerator +from typing import Any + +from .base import ExecutionResult, Sandbox + +logger = logging.getLogger(__name__) + + +class DockerSandbox(Sandbox): + """Execute code and commands in a Docker container. + + The container is created during start() and removed during stop(). + Commands run via ``docker exec`` on the running container, so filesystem + state persists across execute() calls for the lifetime of the container. + Working directory and environment variables set via ``export`` do not + carry across calls (each ``docker exec`` starts a new shell process). + + Args: + image: Docker image to use for the container. + volumes: Host-to-container volume mounts as ``{host_path: container_path}``. + environment: Environment variables to set in the container. 
+ working_dir: Working directory inside the container. + docker_command: Path to the docker CLI binary. + + Example: + ```python + from strands.sandbox.docker import DockerSandbox + + async with DockerSandbox(image="python:3.12-slim") as sandbox: + async for chunk in sandbox.execute("python -c 'print(1+1)'"): + if isinstance(chunk, str): + print(chunk, end="") + ``` + """ + + def __init__( + self, + image: str = "python:3.12-slim", + volumes: dict[str, str] | None = None, + environment: dict[str, str] | None = None, + working_dir: str = "/workspace", + docker_command: str = "docker", + ) -> None: + """Initialize the DockerSandbox. + + Args: + image: Docker image to use for the container. + volumes: Host-to-container volume mounts as ``{host_path: container_path}``. + environment: Environment variables to set in the container. + working_dir: Working directory inside the container. + docker_command: Path to the docker CLI binary. + """ + super().__init__() + self.image = image + self.volumes = volumes or {} + self.environment = environment or {} + self.working_dir = working_dir + self.docker_command = docker_command + self._container_id: str | None = None + + async def _run_docker( + self, + args: list[str], + timeout: int | None = None, + stdin_data: bytes | None = None, + ) -> ExecutionResult: + """Run a docker CLI command and return the result. + + This is a low-level helper used by lifecycle methods (start/stop) + and write_file. It does NOT stream — it collects all output at once. + + Args: + args: Arguments to pass to the docker command. + timeout: Maximum execution time in seconds. + stdin_data: Optional data to send to stdin. + + Returns: + The result of the docker command. + + Raises: + asyncio.TimeoutError: If the command exceeds the timeout. 
+ """ + cmd = [self.docker_command] + args + logger.debug("docker_args=<%s> | running docker command", " ".join(args)) + + proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + stdin=asyncio.subprocess.PIPE if stdin_data else asyncio.subprocess.DEVNULL, + ) + try: + stdout, stderr = await asyncio.wait_for( + proc.communicate(input=stdin_data), + timeout=timeout, + ) + except asyncio.TimeoutError: + proc.kill() + await proc.communicate() + raise + + return ExecutionResult( + exit_code=proc.returncode or 0, + stdout=stdout.decode(), + stderr=stderr.decode(), + ) + + # ------------------------------------------------------------------ + # Lifecycle + # ------------------------------------------------------------------ + + async def start(self) -> None: + """Create and start the Docker container. + + Raises: + RuntimeError: If the container cannot be created. + """ + if self._container_id is not None: + self._started = True + return + + create_args = ["create", "--rm", "-i"] + + # Working directory + create_args += ["-w", self.working_dir] + + # Volume mounts + for host_path, container_path in self.volumes.items(): + create_args += ["-v", f"{host_path}:{container_path}"] + + # Environment variables + for key, value in self.environment.items(): + create_args += ["-e", f"{key}={value}"] + + create_args.append(self.image) + # Keep the container alive with a long-running sleep + create_args += ["sleep", "infinity"] + + result = await self._run_docker(create_args, timeout=60) + if result.exit_code != 0: + raise RuntimeError(f"failed to create docker container: {result.stderr}") + + self._container_id = result.stdout.strip() + logger.debug("container_id=<%s> | created docker container", self._container_id) + + # Start the container + start_result = await self._run_docker(["start", self._container_id], timeout=30) + if start_result.exit_code != 0: + raise RuntimeError(f"failed to start docker container: 
{start_result.stderr}") + + self._started = True + + # Ensure working directory exists + await self._execute_to_result(f"mkdir -p {shlex.quote(self.working_dir)}") + + logger.info("container_id=<%s>, image=<%s> | docker sandbox started", self._container_id, self.image) + + async def stop(self) -> None: + """Stop and remove the Docker container.""" + if self._container_id is None: + self._started = False + return + + container_id = self._container_id + self._container_id = None + self._started = False + + try: + await self._run_docker(["rm", "-f", container_id], timeout=30) + logger.info("container_id=<%s> | docker sandbox stopped", container_id) + except Exception as e: + logger.warning("container_id=<%s>, error=<%s> | failed to remove container", container_id, e) + + async def __aenter__(self) -> "DockerSandbox": + """Enter the async context manager, starting the sandbox.""" + await self.start() + return self + + async def __aexit__(self, *args: Any) -> None: + """Exit the async context manager, stopping the sandbox.""" + await self.stop() + + # ------------------------------------------------------------------ + # Execution + # ------------------------------------------------------------------ + + async def execute( + self, + command: str, + timeout: int | None = None, + ) -> AsyncGenerator[str | ExecutionResult, None]: + """Execute a shell command inside the Docker container, streaming output. + + Reads stdout and stderr line by line from the ``docker exec`` process + and yields each line. The final yield is an ExecutionResult. + + Args: + command: The shell command to execute. + timeout: Maximum execution time in seconds. + + Yields: + str lines of output, then a final ExecutionResult. + + Raises: + RuntimeError: If the sandbox has not been started. + asyncio.TimeoutError: If the command exceeds the timeout. 
+ """ + await self._ensure_started() + if self._container_id is None: + raise RuntimeError("docker sandbox has not been started, call start() or use as async context manager") + + exec_args = [ + self.docker_command, + "exec", + "-w", + self.working_dir, + self._container_id, + "sh", + "-c", + command, + ] + + logger.debug("docker_exec=<%s> | executing in container", command) + + proc = await asyncio.create_subprocess_exec( + *exec_args, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + stdout_lines: list[str] = [] + stderr_lines: list[str] = [] + + async def _read_stream(stream: asyncio.StreamReader | None, collected: list[str]) -> None: + if stream is None: + return + while True: + line_bytes = await stream.readline() + if not line_bytes: + break + collected.append(line_bytes.decode()) + + try: + read_task = asyncio.gather( + _read_stream(proc.stdout, stdout_lines), + _read_stream(proc.stderr, stderr_lines), + ) + await asyncio.wait_for(read_task, timeout=timeout) + await proc.wait() + except asyncio.TimeoutError: + proc.kill() + await proc.communicate() + raise + + stdout_text = "".join(stdout_lines) + stderr_text = "".join(stderr_lines) + + for line in stdout_lines: + yield line + for line in stderr_lines: + yield line + + yield ExecutionResult( + exit_code=proc.returncode or 0, + stdout=stdout_text, + stderr=stderr_text, + ) + + # ------------------------------------------------------------------ + # File I/O overrides (use stdin pipe for reliability) + # ------------------------------------------------------------------ + + async def write_file(self, path: str, content: str) -> None: + """Write a file into the container by piping content via stdin. + + Uses ``docker exec`` with stdin to avoid heredoc injection issues. + Content is piped directly to ``cat`` inside the container, so any + file content (including shell metacharacters) is handled safely. + + Args: + path: Path inside the container. 
Relative paths are resolved + against the working directory. + content: The content to write. + + Raises: + RuntimeError: If the sandbox has not been started. + IOError: If the file cannot be written. + """ + await self._ensure_started() + if self._container_id is None: + raise RuntimeError("docker sandbox has not been started") + + # Resolve relative paths + if not path.startswith("/"): + path = f"{self.working_dir}/{path}" + + # Ensure parent directory exists + parent = "/".join(path.split("/")[:-1]) + if parent: + await self._execute_to_result(f"mkdir -p {shlex.quote(parent)}") + + # Pipe content via stdin to avoid heredoc injection + exec_args = [ + "exec", + "-i", + "-w", + self.working_dir, + self._container_id, + "sh", + "-c", + f"cat > {shlex.quote(path)}", + ] + result = await self._run_docker(exec_args, stdin_data=content.encode()) + if result.exit_code != 0: + raise IOError(result.stderr) + + async def read_file(self, path: str) -> str: + """Read a file from the container. + + Args: + path: Path inside the container. Relative paths are resolved + against the working directory. + + Returns: + The file contents as a string. + + Raises: + RuntimeError: If the sandbox has not been started. + FileNotFoundError: If the file does not exist. + """ + await self._ensure_started() + if self._container_id is None: + raise RuntimeError("docker sandbox has not been started") + + if not path.startswith("/"): + path = f"{self.working_dir}/{path}" + + result = await self._execute_to_result(f"cat {shlex.quote(path)}") + if result.exit_code != 0: + raise FileNotFoundError(result.stderr) + return result.stdout diff --git a/src/strands/sandbox/local.py b/src/strands/sandbox/local.py new file mode 100644 index 000000000..0baf8eed3 --- /dev/null +++ b/src/strands/sandbox/local.py @@ -0,0 +1,161 @@ +"""Local sandbox implementation for host-process execution. 
+ +This module implements the LocalSandbox, which executes commands and code +on the local host using asyncio subprocesses. It overrides read_file and +write_file with native filesystem calls for encoding safety. + +This is the default sandbox used when no explicit sandbox is configured. +""" + +import asyncio +import logging +import os +from collections.abc import AsyncGenerator + +from .base import ExecutionResult, Sandbox + +logger = logging.getLogger(__name__) + + +class LocalSandbox(Sandbox): + """Execute code and commands on the local host. + + Uses asyncio subprocesses for command execution and native filesystem + operations for file I/O. This is the default sandbox, providing the + same behavior as running commands directly on the host. + + Args: + working_dir: The working directory for command execution. + Defaults to the current working directory. + + Example: + ```python + from strands.sandbox import LocalSandbox + + sandbox = LocalSandbox(working_dir="/tmp/workspace") + async for chunk in sandbox.execute("echo hello"): + if isinstance(chunk, str): + print(chunk, end="") + ``` + """ + + def __init__(self, working_dir: str | None = None) -> None: + """Initialize the LocalSandbox. + + Args: + working_dir: The working directory for command execution. + Defaults to the current working directory at construction time. + """ + super().__init__() + self.working_dir = working_dir or os.getcwd() + + async def execute( + self, + command: str, + timeout: int | None = None, + ) -> AsyncGenerator[str | ExecutionResult, None]: + """Execute a shell command on the local host, streaming output. + + Reads stdout and stderr line by line and yields each line as it + arrives. The final yield is an ExecutionResult with the exit code + and complete captured output. + + Args: + command: The shell command to execute. + timeout: Maximum execution time in seconds. None means no timeout. + + Yields: + str lines of output, then a final ExecutionResult. 
+ + Raises: + asyncio.TimeoutError: If the command exceeds the timeout. + """ + await self._ensure_started() + logger.debug("command=<%s>, timeout=<%s> | executing local command", command, timeout) + proc = await asyncio.create_subprocess_shell( + command, + cwd=self.working_dir, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + stdout_lines: list[str] = [] + stderr_lines: list[str] = [] + + async def _read_stream( + stream: asyncio.StreamReader | None, + collected: list[str], + is_stderr: bool = False, + ) -> None: + if stream is None: + return + while True: + line_bytes = await stream.readline() + if not line_bytes: + break + line = line_bytes.decode() + collected.append(line) + + try: + read_task = asyncio.gather( + _read_stream(proc.stdout, stdout_lines), + _read_stream(proc.stderr, stderr_lines, is_stderr=True), + ) + await asyncio.wait_for(read_task, timeout=timeout) + await proc.wait() + except asyncio.TimeoutError: + proc.kill() + await proc.communicate() + raise + + stdout_text = "".join(stdout_lines) + stderr_text = "".join(stderr_lines) + + # Yield each collected line as a streaming chunk + for line in stdout_lines: + yield line + for line in stderr_lines: + yield line + + # Final yield: the complete ExecutionResult + yield ExecutionResult( + exit_code=proc.returncode or 0, + stdout=stdout_text, + stderr=stderr_text, + ) + + async def read_file(self, path: str) -> str: + """Read a file from the local filesystem. + + Uses native file I/O instead of shell commands for encoding safety. + + Args: + path: Path to the file to read. Relative paths are resolved + against the working directory. + + Returns: + The file contents as a string. + + Raises: + FileNotFoundError: If the file does not exist. 
+        """
+        full_path = os.path.join(self.working_dir, path) if not os.path.isabs(path) else path
+        with open(full_path, encoding="utf-8") as f:  # decode as UTF-8, matching write_file
+            return f.read()
+
+    async def write_file(self, path: str, content: str) -> None:
+        """Write a file to the local filesystem.
+
+        Uses native file I/O (always UTF-8) instead of shell commands for encoding safety.
+
+        Args:
+            path: Path to the file to write. Relative paths are resolved
+                against the working directory.
+            content: The content to write to the file.
+        """
+        full_path = os.path.join(self.working_dir, path) if not os.path.isabs(path) else path
+        parent_dir = os.path.dirname(full_path)
+        if parent_dir:
+            os.makedirs(parent_dir, exist_ok=True)
+        with open(full_path, "w", encoding="utf-8") as f:  # the locale default is not UTF-8 on every host
+            f.write(content)
diff --git a/tests/strands/sandbox/__init__.py b/tests/strands/sandbox/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/tests/strands/sandbox/test_agent_sandbox.py b/tests/strands/sandbox/test_agent_sandbox.py
new file mode 100644
index 000000000..16b9c1f86
--- /dev/null
+++ b/tests/strands/sandbox/test_agent_sandbox.py
@@ -0,0 +1,52 @@
+"""Tests for Agent sandbox integration."""
+
+import unittest.mock
+from collections.abc import AsyncGenerator
+
+import pytest
+
+from strands import Agent
+from strands.sandbox.base import ExecutionResult, Sandbox
+from strands.sandbox.local import LocalSandbox
+
+
+class CustomSandbox(Sandbox):
+    """Custom sandbox for testing sandbox parameter."""
+
+    async def execute(self, command: str, timeout: int | None = None) -> AsyncGenerator[str | ExecutionResult, None]:
+        yield ExecutionResult(exit_code=0, stdout="custom", stderr="")  # only chunk: execute() is an async generator
+
+
+class TestAgentSandbox:
+    """Checks which sandbox ends up on ``Agent.sandbox`` for default, explicit and ``None`` arguments."""
+
+    def test_default_sandbox_is_local(self):
+        agent = Agent()
+        assert isinstance(agent.sandbox, LocalSandbox)
+
+    def test_custom_sandbox(self):
+        custom = CustomSandbox()
+        agent = Agent(sandbox=custom)
+        assert agent.sandbox is custom
+
+    def test_explicit_local_sandbox(self, tmp_path):
+        sandbox = LocalSandbox(working_dir=str(tmp_path))
+        agent = Agent(sandbox=sandbox)
+        assert agent.sandbox is sandbox
+        assert agent.sandbox.working_dir == str(tmp_path)
+
+    def test_sandbox_accessible_via_tool_context(self):
+        """Verify sandbox is accessible via agent.sandbox (tool_context.agent.sandbox path)."""
+        custom = CustomSandbox()
+        agent = Agent(sandbox=custom)
+        # Tools access via tool_context.agent.sandbox
+        assert agent.sandbox is custom
+
+    def test_multiple_agents_independent_sandboxes(self):
+        agent1 = Agent()
+        agent2 = Agent()
+        assert agent1.sandbox is not agent2.sandbox
+
+    def test_agent_with_none_sandbox_uses_default(self):
+        agent = Agent(sandbox=None)
+        assert isinstance(agent.sandbox, LocalSandbox)
diff --git a/tests/strands/sandbox/test_base.py b/tests/strands/sandbox/test_base.py
new file mode 100644
index 000000000..ee27dc1f7
--- /dev/null
+++ b/tests/strands/sandbox/test_base.py
@@ -0,0 +1,243 @@
+"""Tests for the Sandbox ABC and ExecutionResult dataclass."""
+
+from collections.abc import AsyncGenerator
+
+import pytest
+
+from strands.sandbox.base import ExecutionResult, Sandbox
+
+
+class ConcreteSandbox(Sandbox):
+    """Minimal concrete implementation for testing the ABC."""
+
+    def __init__(self):
+        super().__init__()
+        self.commands: list[str] = []
+        self.started_count = 0
+        self.stopped_count = 0
+
+    async def execute(self, command: str, timeout: int | None = None) -> AsyncGenerator[str | ExecutionResult, None]:
+        await self._ensure_started()
+        self.commands.append(command)
+        if "fail" in command:
+            yield ExecutionResult(exit_code=1, stdout="", stderr="command failed")
+            return
+        stdout = f"output of: {command}\n"
+        yield stdout
+        yield ExecutionResult(exit_code=0, stdout=stdout, stderr="")
+
+    async def start(self) -> None:
+        self.started_count += 1
+        self._started = True
+
+    async def stop(self) -> None:
+        self.stopped_count += 1
+        self._started = False
+
+
+class TestExecutionResult:
+    """Field access and equality of ``ExecutionResult``."""
+
+    def test_execution_result_fields(self):
+        result = ExecutionResult(exit_code=0, stdout="hello", stderr="")
+        assert result.exit_code == 0
+        assert result.stdout == "hello"
+        assert result.stderr == ""
+
+    def test_execution_result_error(self):
+        result = ExecutionResult(exit_code=1, stdout="", stderr="error msg")
+        assert result.exit_code == 1
+        assert result.stderr == "error msg"
+
+    def test_execution_result_equality(self):
+        r1 = ExecutionResult(exit_code=0, stdout="out", stderr="err")
+        r2 = ExecutionResult(exit_code=0, stdout="out", stderr="err")
+        assert r1 == r2
+
+
+class TestSandboxABC:
+    """Exercises the ``Sandbox`` base-class helpers, mostly through the command-recording ``ConcreteSandbox``."""
+
+    def test_cannot_instantiate_abstract(self):
+        with pytest.raises(TypeError):
+            Sandbox()  # type: ignore
+
+    @pytest.mark.asyncio
+    async def test_execute_yields_lines_and_result(self):
+        sandbox = ConcreteSandbox()
+        chunks = []
+        async for chunk in sandbox.execute("echo hello"):
+            chunks.append(chunk)
+        # Last chunk is ExecutionResult
+        assert isinstance(chunks[-1], ExecutionResult)
+        assert chunks[-1].exit_code == 0
+        # Earlier chunks are strings
+        assert any(isinstance(c, str) for c in chunks[:-1])
+        assert sandbox.commands == ["echo hello"]
+
+    @pytest.mark.asyncio
+    async def test_execute_to_result_helper(self):
+        sandbox = ConcreteSandbox()
+        result = await sandbox._execute_to_result("echo hello")
+        assert isinstance(result, ExecutionResult)
+        assert result.exit_code == 0
+        assert "echo hello" in result.stdout
+
+    @pytest.mark.asyncio
+    async def test_execute_code_default(self):
+        sandbox = ConcreteSandbox()
+        result = await sandbox._execute_code_to_result("print('hi')")
+        assert result.exit_code == 0
+        # Default implementation pipes code through shell via shlex.quote
+        assert len(sandbox.commands) == 1
+        assert "python" in sandbox.commands[0]
+        assert "print" in sandbox.commands[0]
+
+    @pytest.mark.asyncio
+    async def test_execute_code_streams(self):
+        sandbox = ConcreteSandbox()
+        chunks = []
+        async for chunk in sandbox.execute_code("print('hi')"):
+            chunks.append(chunk)
+        assert isinstance(chunks[-1], ExecutionResult)
+        assert chunks[-1].exit_code == 0
+
+    @pytest.mark.asyncio
+    async def test_execute_code_custom_language(self):
+        sandbox = ConcreteSandbox()
+        result = await sandbox._execute_code_to_result("puts 'hi'", language="ruby")
+        assert result.exit_code == 0
+        assert "ruby" in sandbox.commands[0]
+
+    @pytest.mark.asyncio
+    async def test_read_file_success(self):
+        sandbox = ConcreteSandbox()
+        content = await sandbox.read_file("/tmp/test.txt")
+        assert "cat" in sandbox.commands[0]
+        assert "/tmp/test.txt" in sandbox.commands[0]
+        assert content is not None
+
+    @pytest.mark.asyncio
+    async def test_read_file_not_found(self):
+        sandbox = ConcreteSandbox()
+        with pytest.raises(FileNotFoundError):
+            await sandbox.read_file("/tmp/fail.txt")
+
+    @pytest.mark.asyncio
+    async def test_write_file_success(self):
+        sandbox = ConcreteSandbox()
+        await sandbox.write_file("/tmp/test.txt", "hello content")
+        assert len(sandbox.commands) == 1
+        assert "/tmp/test.txt" in sandbox.commands[0]
+        assert "hello content" in sandbox.commands[0]
+
+    @pytest.mark.asyncio
+    async def test_write_file_failure(self):
+        sandbox = ConcreteSandbox()
+        with pytest.raises(IOError):
+            await sandbox.write_file("/tmp/fail.txt", "content")
+
+    @pytest.mark.asyncio
+    async def test_write_file_uses_random_delimiter(self):
+        sandbox = ConcreteSandbox()
+        await sandbox.write_file("/tmp/test.txt", "content with STRANDS_EOF inside")
+        assert "STRANDS_EOF_" in sandbox.commands[0]
+
+    @pytest.mark.asyncio
+    async def test_write_file_path_is_shell_quoted(self):
+        sandbox = ConcreteSandbox()
+        await sandbox.write_file("/tmp/test file.txt", "content")
+        assert "'/tmp/test file.txt'" in sandbox.commands[0]
+
+    @pytest.mark.asyncio
+    async def test_read_file_path_is_shell_quoted(self):
+        sandbox = ConcreteSandbox()
+        await sandbox.read_file("/tmp/test file.txt")  # only the recorded command is inspected
+        assert "'/tmp/test file.txt'" in sandbox.commands[0]
+
+    @pytest.mark.asyncio
+    async def test_list_files_success(self):
+        sandbox = ConcreteSandbox()
+        await sandbox.list_files("/tmp")  # only the recorded command is inspected
+        assert len(sandbox.commands) == 1
+        assert "ls" in sandbox.commands[0]
+
+    @pytest.mark.asyncio
+    async def test_list_files_not_found(self):
+        sandbox = ConcreteSandbox()
+        with pytest.raises(FileNotFoundError):
+            await sandbox.list_files("/tmp/fail")
+
+    @pytest.mark.asyncio
+    async def test_list_files_path_is_shell_quoted(self):
+        sandbox = ConcreteSandbox()
+        await sandbox.list_files("/tmp/my dir")
+        assert "'/tmp/my dir'" in sandbox.commands[0]
+
+    @pytest.mark.asyncio
+    async def test_lifecycle_start_stop(self):
+        sandbox = ConcreteSandbox()
+        assert not sandbox._started
+
+        await sandbox.start()
+        assert sandbox._started
+
+        await sandbox.stop()
+        assert not sandbox._started
+
+    @pytest.mark.asyncio
+    async def test_async_context_manager(self):
+        sandbox = ConcreteSandbox()
+        async with sandbox as s:
+            assert s is sandbox
+            assert sandbox._started
+        assert not sandbox._started
+
+    @pytest.mark.asyncio
+    async def test_default_start_stop_are_noop(self):
+        """Test that the base class default start/stop work correctly."""
+
+        class MinimalSandbox(Sandbox):
+            async def execute(self, command: str, timeout: int | None = None) -> AsyncGenerator[str | ExecutionResult, None]:
+                yield ExecutionResult(exit_code=0, stdout="", stderr="")
+
+        sandbox = MinimalSandbox()
+        await sandbox.start()
+        assert sandbox._started
+        await sandbox.stop()
+        assert not sandbox._started
+
+    @pytest.mark.asyncio
+    async def test_execute_code_uses_shlex_quote(self):
+        sandbox = ConcreteSandbox()
+        code = "print('hello')"
+        await sandbox._execute_code_to_result(code)  # only the recorded command is inspected
+        assert "python" in sandbox.commands[0]
+        assert "print" in sandbox.commands[0]
+
+    @pytest.mark.asyncio
+    async def test_auto_start_on_first_execute(self):
+        sandbox = ConcreteSandbox()
+        assert not sandbox._started
+        result = await sandbox._execute_to_result("echo hello")
+        assert sandbox._started
+        assert result.exit_code == 0
+
+    @pytest.mark.asyncio
+    async def test_auto_start_only_once(self):
+        sandbox = ConcreteSandbox()
+        await sandbox._execute_to_result("echo 1")
+        await sandbox._execute_to_result("echo 2")
+        assert sandbox.started_count == 1
+
+    @pytest.mark.asyncio
+    async def test_execute_to_result_raises_on_missing_result(self):
+        """_execute_to_result raises if execute() yields no ExecutionResult."""
+
+        class BadSandbox(Sandbox):
+            async def execute(self, command: str, timeout: int | None = None) -> AsyncGenerator[str | ExecutionResult, None]:
+                yield "just a string, no result"
+
+        sandbox = BadSandbox()
+        with pytest.raises(RuntimeError, match="did not yield an ExecutionResult"):
+            await sandbox._execute_to_result("anything")
diff --git a/tests/strands/sandbox/test_docker.py b/tests/strands/sandbox/test_docker.py
new file mode 100644
index 000000000..5b218d1fa
--- /dev/null
+++ b/tests/strands/sandbox/test_docker.py
@@ -0,0 +1,324 @@
+"""Tests for the DockerSandbox implementation."""
+
+import asyncio
+import unittest.mock
+from collections.abc import AsyncGenerator
+
+import pytest
+
+from strands.sandbox.base import ExecutionResult
+from strands.sandbox.docker import DockerSandbox
+
+
+@pytest.fixture
+def sandbox():
+    """Create a DockerSandbox with a fake container ID for unit tests."""
+    s = DockerSandbox(image="python:3.12-slim", working_dir="/workspace")
+    s._container_id = "fake-container-123"
+    s._started = True
+    return s
+
+
+class TestDockerSandboxInit:
+    """Constructor defaults and overrides (``_container_id`` starts as ``None``)."""
+
+    def test_defaults(self):
+        s = DockerSandbox()
+        assert s.image == "python:3.12-slim"
+        assert s.working_dir == "/workspace"
+        assert s.volumes == {}
+        assert s.environment == {}
+        assert s._container_id is None
+
+    def test_custom_params(self):
+        s = DockerSandbox(
+            image="node:20",
+            volumes={"/host": "/container"},
+            environment={"FOO": "bar"},
+            working_dir="/app",
+        )
+        assert s.image == "node:20"
+        assert s.volumes == {"/host": "/container"}
+        assert s.environment == {"FOO": "bar"}
+        assert s.working_dir == "/app"
+
+
+class TestDockerSandboxExecute:
+    """``execute()`` streaming and exit codes, with ``start`` or ``asyncio.create_subprocess_exec`` patched."""
+
+    @pytest.mark.asyncio
+    async def test_execute_not_started_raises(self):
+        s = DockerSandbox()
+        # _ensure_started will call start(), which will fail because docker isn't available
+        # We mock start to simply set _started=True but leave _container_id as None
+        async def mock_start():
+            s._started = True
+
+        with unittest.mock.patch.object(s, "start", side_effect=mock_start):
+            with pytest.raises(RuntimeError, match="has not been started"):
+                async for _ in s.execute("echo hello"):
+                    pass
+
+    @pytest.mark.asyncio
+    async def test_execute_yields_lines_and_result(self, sandbox):
+        """execute() streams lines and yields a final ExecutionResult."""
+
+        async def mock_create_subprocess_exec(*args, **kwargs):
+            proc = unittest.mock.AsyncMock()
+            proc.returncode = 0
+            proc.stdout = _make_stream_reader(b"hello\nworld\n")
+            proc.stderr = _make_stream_reader(b"")
+            proc.wait = unittest.mock.AsyncMock(return_value=0)
+            return proc
+
+        with unittest.mock.patch("asyncio.create_subprocess_exec", side_effect=mock_create_subprocess_exec):
+            chunks = []
+            async for chunk in sandbox.execute("echo hello"):
+                chunks.append(chunk)
+
+        str_chunks = [c for c in chunks if isinstance(c, str)]
+        result_chunks = [c for c in chunks if isinstance(c, ExecutionResult)]
+        assert len(result_chunks) == 1
+        assert result_chunks[0].exit_code == 0
+        assert result_chunks[0].stdout == "hello\nworld\n"
+        assert "hello\n" in str_chunks
+        assert "world\n" in str_chunks
+
+    @pytest.mark.asyncio
+    async def test_execute_returns_exit_code(self, sandbox):
+        async def mock_create_subprocess_exec(*args, **kwargs):
+            proc = unittest.mock.AsyncMock()
+            proc.returncode = 42
+            proc.stdout = _make_stream_reader(b"")
+            proc.stderr = _make_stream_reader(b"bad command\n")
+            proc.wait = unittest.mock.AsyncMock(return_value=42)
+            return proc
+
+        with unittest.mock.patch("asyncio.create_subprocess_exec", side_effect=mock_create_subprocess_exec):
+            result = await sandbox._execute_to_result("bad_cmd")
+
+        assert result.exit_code == 42
+        assert "bad command" in result.stderr
+
+
+class TestDockerSandboxLifecycle:
+    """``start()``/``stop()``/context-manager behaviour with ``_run_docker`` or the lifecycle methods mocked."""
+
+    @pytest.mark.asyncio
+    async def test_start_creates_and_starts_container(self):
+        s = DockerSandbox(image="python:3.12-slim", volumes={"/host": "/cont"}, environment={"A": "1"})
+
+        call_count = 0
+
+        async def mock_run_docker(args, timeout=None, stdin_data=None):
+            nonlocal call_count
+            call_count += 1
+            if args[0] == "create":
+                assert "-v" in args
+                assert "-e" in args
+                return ExecutionResult(exit_code=0, stdout="container-abc123\n", stderr="")
+            elif args[0] == "start":
+                return ExecutionResult(exit_code=0, stdout="", stderr="")
+            return ExecutionResult(exit_code=1, stdout="", stderr="unexpected")
+
+        async def mock_execute_to_result(command, timeout=None):
+            return ExecutionResult(exit_code=0, stdout="", stderr="")
+
+        with unittest.mock.patch.object(s, "_run_docker", side_effect=mock_run_docker):
+            with unittest.mock.patch.object(s, "_execute_to_result", side_effect=mock_execute_to_result):
+                await s.start()
+
+        assert s._container_id == "container-abc123"
+        assert s._started
+        assert call_count >= 2  # create + start
+
+    @pytest.mark.asyncio
+    async def test_start_raises_on_create_failure(self):
+        s = DockerSandbox()
+
+        async def mock_run_docker(args, timeout=None, stdin_data=None):
+            return ExecutionResult(exit_code=1, stdout="", stderr="no such image")
+
+        with unittest.mock.patch.object(s, "_run_docker", side_effect=mock_run_docker):
+            with pytest.raises(RuntimeError, match="failed to create"):
+                await s.start()
+
+    @pytest.mark.asyncio
+    async def test_start_idempotent(self, sandbox):
+        """start() is a no-op if container already exists."""
+        with unittest.mock.patch.object(sandbox, "_run_docker") as mock:
+            await sandbox.start()
+            mock.assert_not_called()
+
+    @pytest.mark.asyncio
+    async def test_stop_removes_container(self, sandbox):
+        mock_result = ExecutionResult(exit_code=0, stdout="", stderr="")
+        with unittest.mock.patch.object(sandbox, "_run_docker", return_value=mock_result) as mock_run:
+            await sandbox.stop()
+
+        mock_run.assert_called_once_with(["rm", "-f", "fake-container-123"], timeout=30)
+        assert sandbox._container_id is None
+        assert not sandbox._started
+
+    @pytest.mark.asyncio
+    async def test_stop_noop_if_not_started(self):
+        s = DockerSandbox()
+        await s.stop()  # Should not raise
+
+    @pytest.mark.asyncio
+    async def test_context_manager(self):
+        s = DockerSandbox()
+        start_calls = []
+        stop_calls = []
+
+        async def mock_start():
+            s._container_id = "ctx-container"
+            s._started = True
+            start_calls.append(True)
+
+        async def mock_stop():
+            s._container_id = None
+            s._started = False
+            stop_calls.append(True)
+
+        with unittest.mock.patch.object(s, "start", side_effect=mock_start):
+            with unittest.mock.patch.object(s, "stop", side_effect=mock_stop):
+                async with s as ctx:
+                    assert ctx is s
+                    assert len(start_calls) == 1
+
+        assert len(stop_calls) == 1
+
+
+class TestDockerSandboxFileOps:
+    """``read_file``/``write_file`` with ``start``, ``_execute_to_result`` and ``_run_docker`` mocked."""
+
+    @pytest.mark.asyncio
+    async def test_write_file_not_started(self):
+        s = DockerSandbox()
+        # Mock start to not actually start docker
+        async def mock_start():
+            s._started = True
+
+        with unittest.mock.patch.object(s, "start", side_effect=mock_start):
+            with pytest.raises(RuntimeError, match="has not been started"):
+                await s.write_file("test.txt", "content")
+
+    @pytest.mark.asyncio
+    async def test_read_file_not_started(self):
+        s = DockerSandbox()
+        async def mock_start():
+            s._started = True
+
+        with unittest.mock.patch.object(s, "start", side_effect=mock_start):
+            with pytest.raises(RuntimeError, match="has not been started"):
+                await s.read_file("test.txt")
+
+    @pytest.mark.asyncio
+    async def test_write_file_relative_path(self, sandbox):
+        """write_file resolves relative paths, creates dirs, and pipes via stdin."""
+        calls = []
+
+        async def mock_execute_to_result(command, timeout=None):
+            calls.append(("execute_to_result", command))
+            return ExecutionResult(exit_code=0, stdout="", stderr="")
+
+        async def mock_run_docker(args, timeout=None, stdin_data=None):
+            calls.append(("run_docker", args, stdin_data))
+            return ExecutionResult(exit_code=0, stdout="", stderr="")
+
+        with unittest.mock.patch.object(sandbox, "_execute_to_result", side_effect=mock_execute_to_result):
+            with unittest.mock.patch.object(sandbox, "_run_docker", side_effect=mock_run_docker):
+                await sandbox.write_file("data/test.txt", "hello")
+
+        assert any("mkdir" in str(c) for c in calls)
+        assert any(
+            isinstance(c, tuple) and len(c) == 3 and c[2] == b"hello"
+            for c in calls
+        )
+
+    @pytest.mark.asyncio
+    async def test_write_file_uses_stdin_pipe(self, sandbox):
+        """Verify write_file uses stdin piping instead of heredoc."""
+        calls = []
+
+        async def mock_execute_to_result(command, timeout=None):
+            return ExecutionResult(exit_code=0, stdout="", stderr="")
+
+        async def mock_run_docker(args, timeout=None, stdin_data=None):
+            calls.append({"args": args, "stdin_data": stdin_data})
+            return ExecutionResult(exit_code=0, stdout="", stderr="")
+
+        with unittest.mock.patch.object(sandbox, "_execute_to_result", side_effect=mock_execute_to_result):
+            with unittest.mock.patch.object(sandbox, "_run_docker", side_effect=mock_run_docker):
+                await sandbox.write_file("test.txt", "content with STRANDS_EOF inside")
+
+        write_calls = [c for c in calls if c["stdin_data"] is not None]
+        assert len(write_calls) == 1
+        assert write_calls[0]["stdin_data"] == b"content with STRANDS_EOF inside"
+        assert "-i" in write_calls[0]["args"]
+
+    @pytest.mark.asyncio
+    async def test_read_file_success(self, sandbox):
+        async def mock_execute_to_result(command, timeout=None):
+            return ExecutionResult(exit_code=0, stdout="file content", stderr="")
+
+        with unittest.mock.patch.object(sandbox, "_execute_to_result", side_effect=mock_execute_to_result):
+            content = await sandbox.read_file("test.txt")
+            assert content == "file content"
+
+    @pytest.mark.asyncio
+    async def test_read_file_not_found(self, sandbox):
+        async def mock_execute_to_result(command, timeout=None):
+            return ExecutionResult(exit_code=1, stdout="", stderr="No such file")
+
+        with unittest.mock.patch.object(sandbox, "_execute_to_result", side_effect=mock_execute_to_result):
+            with pytest.raises(FileNotFoundError):
+                await sandbox.read_file("missing.txt")
+
+    @pytest.mark.asyncio
+    async def test_write_file_io_error(self, sandbox):
+        async def mock_execute_to_result(command, timeout=None):
+            return ExecutionResult(exit_code=0, stdout="", stderr="")
+
+        async def mock_run_docker(args, timeout=None, stdin_data=None):
+            if stdin_data is not None:
+                return ExecutionResult(exit_code=1, stdout="", stderr="permission denied")
+            return ExecutionResult(exit_code=0, stdout="", stderr="")
+
+        with unittest.mock.patch.object(sandbox, "_execute_to_result", side_effect=mock_execute_to_result):
+            with unittest.mock.patch.object(sandbox, "_run_docker", side_effect=mock_run_docker):
+                with pytest.raises(IOError):
+                    await sandbox.write_file("readonly/test.txt", "content")
+
+
+class TestDockerSandboxExecuteCode:
+    """``execute_code()`` streaming with ``asyncio.create_subprocess_exec`` patched."""
+
+    @pytest.mark.asyncio
+    async def test_execute_code_streams(self, sandbox):
+        """execute_code uses the base class default and streams."""
+
+        async def mock_create_subprocess_exec(*args, **kwargs):
+            proc = unittest.mock.AsyncMock()
+            proc.returncode = 0
+            proc.stdout = _make_stream_reader(b"42\n")
+            proc.stderr = _make_stream_reader(b"")
+            proc.wait = unittest.mock.AsyncMock(return_value=0)
+            return proc
+
+        with unittest.mock.patch("asyncio.create_subprocess_exec", side_effect=mock_create_subprocess_exec):
+            chunks = []
+            async for chunk in sandbox.execute_code("print(42)"):
+                chunks.append(chunk)
+
+        assert isinstance(chunks[-1], ExecutionResult)
+        assert chunks[-1].stdout == "42\n"
+
+
+def _make_stream_reader(data: bytes) -> asyncio.StreamReader:
+    """Create an asyncio.StreamReader pre-loaded with data."""
+    reader = asyncio.StreamReader()
+    reader.feed_data(data)
+    reader.feed_eof()
+    return reader
diff --git a/tests/strands/sandbox/test_local.py
b/tests/strands/sandbox/test_local.py
new file mode 100644
index 000000000..b9cc82040
--- /dev/null
+++ b/tests/strands/sandbox/test_local.py
@@ -0,0 +1,216 @@
+"""Tests for the LocalSandbox implementation."""
+
+import asyncio
+import os
+
+import pytest
+
+from strands.sandbox.base import ExecutionResult
+from strands.sandbox.local import LocalSandbox
+
+
+class TestLocalSandboxInit:
+    """``working_dir`` defaults to the current directory and can be overridden."""
+
+    def test_default_working_dir(self):
+        sandbox = LocalSandbox()
+        assert sandbox.working_dir == os.getcwd()
+
+    def test_custom_working_dir(self, tmp_path):
+        sandbox = LocalSandbox(working_dir=str(tmp_path))
+        assert sandbox.working_dir == str(tmp_path)
+
+
+class TestLocalSandboxExecute:
+    """Runs real shell commands through ``LocalSandbox`` with ``tmp_path`` as the working directory."""
+
+    @pytest.mark.asyncio
+    async def test_execute_echo(self, tmp_path):
+        sandbox = LocalSandbox(working_dir=str(tmp_path))
+        result = await sandbox._execute_to_result("echo hello")
+        assert result.exit_code == 0
+        assert result.stdout.strip() == "hello"
+        assert result.stderr == ""
+
+    @pytest.mark.asyncio
+    async def test_execute_streams_lines(self, tmp_path):
+        sandbox = LocalSandbox(working_dir=str(tmp_path))
+        chunks = []
+        async for chunk in sandbox.execute("echo line1 && echo line2"):
+            chunks.append(chunk)
+        # Should have string lines and a final ExecutionResult
+        str_chunks = [c for c in chunks if isinstance(c, str)]
+        result_chunks = [c for c in chunks if isinstance(c, ExecutionResult)]
+        assert len(result_chunks) == 1
+        assert len(str_chunks) >= 2
+        assert "line1\n" in str_chunks
+        assert "line2\n" in str_chunks
+
+    @pytest.mark.asyncio
+    async def test_execute_failure(self, tmp_path):
+        sandbox = LocalSandbox(working_dir=str(tmp_path))
+        result = await sandbox._execute_to_result("exit 42")
+        assert result.exit_code == 42
+
+    @pytest.mark.asyncio
+    async def test_execute_stderr(self, tmp_path):
+        sandbox = LocalSandbox(working_dir=str(tmp_path))
+        result = await sandbox._execute_to_result("echo error >&2")
+        assert result.exit_code == 0
+        assert result.stderr.strip() == "error"
+
+    @pytest.mark.asyncio
+    async def test_execute_uses_working_dir(self, tmp_path):
+        sandbox = LocalSandbox(working_dir=str(tmp_path))
+        result = await sandbox._execute_to_result("pwd")
+        assert result.exit_code == 0
+        assert result.stdout.strip() == str(tmp_path)
+
+    @pytest.mark.asyncio
+    async def test_execute_timeout(self, tmp_path):
+        sandbox = LocalSandbox(working_dir=str(tmp_path))
+        with pytest.raises(asyncio.TimeoutError):
+            await sandbox._execute_to_result("sleep 10", timeout=1)
+
+    @pytest.mark.asyncio
+    async def test_execute_no_timeout(self, tmp_path):
+        sandbox = LocalSandbox(working_dir=str(tmp_path))
+        result = await sandbox._execute_to_result("echo fast", timeout=None)
+        assert result.exit_code == 0
+        assert result.stdout.strip() == "fast"
+
+    @pytest.mark.asyncio
+    async def test_auto_start(self, tmp_path):
+        sandbox = LocalSandbox(working_dir=str(tmp_path))
+        assert not sandbox._started
+        result = await sandbox._execute_to_result("echo hello")
+        assert sandbox._started
+        assert result.exit_code == 0
+
+
+class TestLocalSandboxExecuteCode:
+    """Runs Python snippets through ``execute_code`` and ``_execute_code_to_result``."""
+
+    @pytest.mark.asyncio
+    async def test_execute_python_code(self, tmp_path):
+        sandbox = LocalSandbox(working_dir=str(tmp_path))
+        result = await sandbox._execute_code_to_result("print('hello from python')")
+        assert result.exit_code == 0
+        assert result.stdout.strip() == "hello from python"
+
+    @pytest.mark.asyncio
+    async def test_execute_code_streams(self, tmp_path):
+        sandbox = LocalSandbox(working_dir=str(tmp_path))
+        chunks = []
+        # Real newline between statements; a literal backslash-n would be a SyntaxError in the child interpreter
+        async for chunk in sandbox.execute_code("print('line1')\nprint('line2')"):
+            chunks.append(chunk)
+        assert isinstance(chunks[-1], ExecutionResult)
+        assert chunks[-1].exit_code == 0
+        assert chunks[-1].stdout.split() == ["line1", "line2"]
+
+    @pytest.mark.asyncio
+    async def test_execute_python_code_error(self, tmp_path):
+        sandbox = LocalSandbox(working_dir=str(tmp_path))
+        result = await sandbox._execute_code_to_result("raise ValueError('test error')")
+        assert result.exit_code != 0
+        assert "ValueError" in result.stderr
+
+    @pytest.mark.asyncio
+    async def test_execute_python_multiline(self, tmp_path):
+        sandbox = LocalSandbox(working_dir=str(tmp_path))
+        code = "x = 42\nprint(f'x = {x}')"
+        result = await sandbox._execute_code_to_result(code)
+        assert result.exit_code == 0
+        assert "x = 42" in result.stdout
+
+
+class TestLocalSandboxFileOps:
+    """``read_file``/``write_file``/``list_files`` against a real temporary directory."""
+
+    @pytest.mark.asyncio
+    async def test_write_and_read_file(self, tmp_path):
+        sandbox = LocalSandbox(working_dir=str(tmp_path))
+        await sandbox.write_file("test.txt", "hello world")
+        content = await sandbox.read_file("test.txt")
+        assert content == "hello world"
+
+    @pytest.mark.asyncio
+    async def test_read_file_absolute_path(self, tmp_path):
+        test_file = tmp_path / "abs_test.txt"
+        test_file.write_text("absolute content")
+        sandbox = LocalSandbox(working_dir=str(tmp_path))
+        content = await sandbox.read_file(str(test_file))
+        assert content == "absolute content"
+
+    @pytest.mark.asyncio
+    async def test_read_file_not_found(self, tmp_path):
+        sandbox = LocalSandbox(working_dir=str(tmp_path))
+        with pytest.raises(FileNotFoundError):
+            await sandbox.read_file("nonexistent.txt")
+
+    @pytest.mark.asyncio
+    async def test_write_file_creates_directories(self, tmp_path):
+        sandbox = LocalSandbox(working_dir=str(tmp_path))
+        await sandbox.write_file("subdir/nested/test.txt", "nested content")
+        content = await sandbox.read_file("subdir/nested/test.txt")
+        assert content == "nested content"
+
+    @pytest.mark.asyncio
+    async def test_write_file_absolute_path(self, tmp_path):
+        sandbox = LocalSandbox(working_dir=str(tmp_path))
+        abs_path = str(tmp_path / "abs_write.txt")
+        await sandbox.write_file(abs_path, "absolute write")
+        content = await sandbox.read_file(abs_path)
+        assert content == "absolute write"
+
+    @pytest.mark.asyncio
+    async def test_write_file_unicode(self, tmp_path):
+        sandbox = LocalSandbox(working_dir=str(tmp_path))
+        await sandbox.write_file("unicode.txt", "héllo wörld 🌍")
+        content = await sandbox.read_file("unicode.txt")
+        assert content == "héllo wörld 🌍"
+        # The bytes on disk must be UTF-8 regardless of the host locale
+        assert (tmp_path / "unicode.txt").read_bytes() == "héllo wörld 🌍".encode("utf-8")
+
+    @pytest.mark.asyncio
+    async def test_list_files(self, tmp_path):
+        (tmp_path / "file1.txt").write_text("a")
+        (tmp_path / "file2.txt").write_text("b")
+        (tmp_path / "file3.py").write_text("c")
+
+        sandbox = LocalSandbox(working_dir=str(tmp_path))
+        files = await sandbox.list_files(".")
+        assert sorted(files) == ["file1.txt", "file2.txt", "file3.py"]
+
+    @pytest.mark.asyncio
+    async def test_list_files_empty_dir(self, tmp_path):
+        empty_dir = tmp_path / "empty"
+        empty_dir.mkdir()
+        sandbox = LocalSandbox(working_dir=str(tmp_path))
+        files = await sandbox.list_files("empty")
+        assert files == []
+
+    @pytest.mark.asyncio
+    async def test_list_files_not_found(self, tmp_path):
+        sandbox = LocalSandbox(working_dir=str(tmp_path))
+        with pytest.raises(FileNotFoundError):
+            await sandbox.list_files("nonexistent")
+
+
+class TestLocalSandboxLifecycle:
+    """Explicit ``start``/``stop`` calls and ``async with`` usage."""
+
+    @pytest.mark.asyncio
+    async def test_start_stop(self, tmp_path):
+        sandbox = LocalSandbox(working_dir=str(tmp_path))
+        await sandbox.start()
+        assert sandbox._started
+        await sandbox.stop()
+        assert not sandbox._started
+
+    @pytest.mark.asyncio
+    async def test_context_manager(self, tmp_path):
+        async with LocalSandbox(working_dir=str(tmp_path)) as sandbox:
+            result = await sandbox._execute_to_result("echo context")
+            assert result.stdout.strip() == "context"