Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
8f1634b
feat: migrate asyncio usage to anyio across the codebase
raballew May 13, 2026
060ad9e
fix: correct type annotation for get_minikube_ip profile parameter
raballew May 13, 2026
793d993
fix: remove redundant inner timeout from TLS connection attempts
raballew May 13, 2026
0938967
fix: add task group cleanup for websocket telemetry handler
raballew May 13, 2026
db17b76
fix: resolve lint and type-check failures in asyncio-to-anyio migration
raballew May 13, 2026
42a0089
fix: replace process.communicate() with anyio.run_process() in operator
raballew May 13, 2026
f83ae20
fix: use BufferedByteReceiveStream for line reading in QEMU driver
raballew May 13, 2026
4632430
fix: replace remaining pytest.mark.asyncio with pytest.mark.anyio
raballew May 13, 2026
82ebbc6
fix: migrate dut-network tcpdump from asyncio to anyio
raballew May 13, 2026
a210995
fix: read all chunks until EndOfStream in shell driver read_all mode
raballew May 13, 2026
fcf1757
fix: handle ExceptionGroup in mitmproxy websocket handler shutdown
raballew May 13, 2026
2e506dc
fix: rename timeout test and document output discard design decision
raballew May 13, 2026
a4f76a4
fix: add sniffio as explicit dependency
raballew May 13, 2026
a726a96
fix: document pyserial asyncio usage as known exception to anyio migr…
raballew May 13, 2026
20b2bea
fix: strengthen QEMU inner wait timeout test assertions
raballew May 13, 2026
710683c
fix: document CPython-internal _sslobj usage for certificate extraction
raballew May 13, 2026
1104b95
fix: handle IncompleteRead from BufferedByteReceiveStream
raballew May 13, 2026
600334a
fix: defer errors from task group to avoid ExceptionGroup wrapping
raballew May 13, 2026
9a7cf4b
fix: resolve lint violations and reduce shell driver complexity
raballew May 13, 2026
136865d
fix: narrow shell driver exception handler and add process cleanup
raballew May 13, 2026
cd40fd0
fix: resolve CI failures from anyio migration
raballew May 13, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions python/docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
# -- Project information -----------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#project-information

import asyncio
import os
import sys

import anyio
from jumpstarter_kubernetes.controller import get_latest_compatible_controller_version

os.environ["TERM"] = "dumb"
Expand Down Expand Up @@ -64,7 +64,9 @@ def get_controller_version():
else:
version = None

return asyncio.run(get_latest_compatible_controller_version(client_version=version))
async def _run():
return await get_latest_compatible_controller_version(client_version=version)
return anyio.run(_run)


def get_index_url():
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from asyncio import run
from functools import wraps

import anyio


def blocking(f):
@wraps(f)
def wrapper(*args, **kwargs):
return run(f(*args, **kwargs))
async def _run():
return await f(*args, **kwargs)
return anyio.run(_run)

return wrapper
16 changes: 10 additions & 6 deletions python/packages/jumpstarter-cli/jumpstarter_cli/login_test.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import asyncio
import json
import ssl
from unittest.mock import AsyncMock, MagicMock, patch

import anyio
import click
import pytest
from click.testing import CliRunner
Expand Down Expand Up @@ -84,7 +84,9 @@ def get(self, *args, **kwargs):
monkeypatch.setattr("jumpstarter_cli.login.aiohttp.ClientSession", FakeClientSession)

with pytest.raises(click.ClickException, match="Timed out while connecting"):
asyncio.run(fetch_auth_config("login.example.com"))
async def _run():
return await fetch_auth_config("login.example.com")
anyio.run(_run)


def test_fetch_auth_config_maps_json_decode_error(monkeypatch) -> None:
Expand Down Expand Up @@ -116,7 +118,9 @@ def get(self, *args, **kwargs):
monkeypatch.setattr("jumpstarter_cli.login.aiohttp.ClientSession", FakeClientSession)

with pytest.raises(click.ClickException, match="Invalid JSON response received"):
asyncio.run(fetch_auth_config("login.example.com"))
async def _run():
return await fetch_auth_config("login.example.com")
anyio.run(_run)


def test_login_cli_shows_timeout_message(monkeypatch) -> None:
Expand Down Expand Up @@ -151,13 +155,13 @@ async def fake_fetch_auth_config(*args, **kwargs):
assert "TLS certificate verification failed" in result.output


@pytest.mark.asyncio
@pytest.mark.anyio
async def test_fetch_auth_config_rejects_http_without_insecure_tls():
with pytest.raises(click.UsageError, match="--insecure-tls"):
await fetch_auth_config("http://login.example.com", insecure_tls=False)


@pytest.mark.asyncio
@pytest.mark.anyio
async def test_fetch_auth_config_allows_explicit_http_with_insecure_tls():
mock_response = MagicMock()
mock_response.status = 200
Expand All @@ -183,7 +187,7 @@ async def test_fetch_auth_config_allows_explicit_http_with_insecure_tls():
assert result["grpcEndpoint"] == "grpc.example.com"


@pytest.mark.asyncio
@pytest.mark.anyio
async def test_fetch_auth_config_defaults_to_https_with_insecure_tls():
mock_response = MagicMock()
mock_response.status = 200
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import asyncio
import asyncio.subprocess
import ipaddress
import shutil
import socket
Expand All @@ -8,8 +6,14 @@
from collections.abc import AsyncGenerator
from dataclasses import dataclass, field
from pathlib import Path
from subprocess import PIPE
from typing import Literal, TypedDict

import anyio
from anyio import IncompleteRead
from anyio.abc import Process
from anyio.streams.buffered import BufferedByteReceiveStream

from . import dnsmasq, iproute, nftables
from .ntp_server import NtpServer
from jumpstarter.driver import Driver, export
Expand Down Expand Up @@ -67,7 +71,7 @@ class DutNetwork(Driver):
_added_aliases: set[str] = field(init=False, default_factory=set)
_fwd_rule_handles: list[int] = field(init=False, default_factory=list)
_ntp_server: NtpServer | None = field(init=False, default=None)
_tcpdump_process: asyncio.subprocess.Process | None = field(init=False, default=None)
_tcpdump_process: Process | None = field(init=False, default=None)

@classmethod
def client(cls) -> str:
Expand Down Expand Up @@ -467,21 +471,18 @@ async def tcpdump(self, args: list[str] | None = None) -> AsyncGenerator[str, No

self.logger.info("Starting tcpdump: %s", " ".join(cmd))

proc = await asyncio.subprocess.create_subprocess_exec(
cmd[0],
*cmd[1:],
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
)
proc = await anyio.open_process(cmd, stdout=PIPE, stderr=PIPE)
self._tcpdump_process = proc

try:
assert proc.stdout is not None
buffered = BufferedByteReceiveStream(proc.stdout)
while True:
line = await proc.stdout.readline()
if not line:
try:
line = await buffered.receive_until(b"\n", 1048576)
except (anyio.EndOfStream, anyio.ClosedResourceError, IncompleteRead):
break
yield line.decode("utf-8", errors="replace").rstrip("\n")
yield line.decode("utf-8", errors="replace")
finally:
if proc.returncode is None:
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
and the streaming driver method using mocked subprocesses.
"""

import asyncio
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch

import anyio
import anyio.abc
import pytest

from .driver import DutNetwork
Expand Down Expand Up @@ -161,44 +162,50 @@ def test_multiple_blocked_flags(self):
assert DutNetwork._sanitize_tcpdump_args(args) == ["-c", "5"]


def _create_mock_byte_stream(data: bytes):
"""Create a mock ByteReceiveStream that returns data then raises EndOfStream."""
stream = MagicMock(spec=anyio.abc.ByteReceiveStream)
call_count = {"n": 0}

async def mock_receive(max_bytes=65536):
if call_count["n"] >= 1:
raise anyio.EndOfStream()
call_count["n"] += 1
return data

stream.receive = mock_receive
stream.aclose = AsyncMock()
return stream


class TestTcpdumpMethod:
def test_tcpdump_raises_when_disabled(self, tmp_path: Path):
driver = _make_driver(tmp_path, enable_tcpdump=False)
with pytest.raises(RuntimeError, match="tcpdump is not enabled"):
asyncio.run(
_consume_async_gen(driver.tcpdump())
anyio.run(
_consume_async_gen, driver.tcpdump()
)

def test_tcpdump_streams_output(self, tmp_path: Path):
driver = _make_driver(tmp_path, enable_tcpdump=True)

mock_stdout = AsyncMock()
lines = [
b"12:00:00.000000 IP 192.168.100.10 > 8.8.8.8: ICMP echo request\n",
b"12:00:00.001000 IP 8.8.8.8 > 192.168.100.10: ICMP echo reply\n",
b"", # EOF
]
state = {"call_count": 0}

async def mock_readline():
if state["call_count"] < len(lines):
result = lines[state["call_count"]]
state["call_count"] += 1
return result
return b""

mock_stdout.readline = mock_readline
data = (
b"12:00:00.000000 IP 192.168.100.10 > 8.8.8.8: ICMP echo request\n"
b"12:00:00.001000 IP 8.8.8.8 > 192.168.100.10: ICMP echo reply\n"
)
mock_stdout = _create_mock_byte_stream(data)

mock_proc = AsyncMock()
mock_proc.stdout = mock_stdout
mock_proc.stderr = _create_mock_byte_stream(b"")
mock_proc.returncode = None
mock_proc.terminate = MagicMock()
mock_proc.wait = AsyncMock()

with patch(f"{_DRIVER_MODULE}.asyncio.subprocess.create_subprocess_exec",
return_value=mock_proc):
output = asyncio.run(
_consume_async_gen(driver.tcpdump())
with patch(f"{_DRIVER_MODULE}.anyio.open_process",
new_callable=AsyncMock, return_value=mock_proc):
output = anyio.run(
_consume_async_gen, driver.tcpdump()
)

assert len(output) == 2
Expand All @@ -208,82 +215,55 @@ async def mock_readline():
def test_tcpdump_enforces_interface(self, tmp_path: Path):
driver = _make_driver(tmp_path, enable_tcpdump=True)

mock_stdout = AsyncMock()
mock_stdout.readline = AsyncMock(return_value=b"")
mock_stdout = _create_mock_byte_stream(b"")

mock_proc = AsyncMock()
mock_proc.stdout = mock_stdout
mock_proc.stderr = _create_mock_byte_stream(b"")
mock_proc.returncode = 0
mock_proc.terminate = MagicMock()
mock_proc.wait = AsyncMock()

with patch(f"{_DRIVER_MODULE}.asyncio.subprocess.create_subprocess_exec",
return_value=mock_proc) as mock_exec:
asyncio.run(
_consume_async_gen(driver.tcpdump(args=["-i", "evil-iface", "-c", "1"]))
with patch(f"{_DRIVER_MODULE}.anyio.open_process",
new_callable=AsyncMock, return_value=mock_proc) as mock_exec:
anyio.run(
_consume_async_gen, driver.tcpdump(args=["-i", "evil-iface", "-c", "1"])
)

# Verify the command was called with the correct interface
call_args = mock_exec.call_args[0]
cmd = list(call_args)
cmd = list(call_args[0])
assert cmd[0] == "tcpdump"
assert "-i" in cmd
iface_idx = cmd.index("-i")
assert cmd[iface_idx + 1] == "eth-dut"
# The user-specified -i should have been removed by sanitization
assert cmd.count("-i") == 1

def test_tcpdump_passes_extra_args(self, tmp_path: Path):
driver = _make_driver(tmp_path, enable_tcpdump=True)

mock_stdout = AsyncMock()
mock_stdout.readline = AsyncMock(return_value=b"")
mock_stdout = _create_mock_byte_stream(b"")

mock_proc = AsyncMock()
mock_proc.stdout = mock_stdout
mock_proc.stderr = _create_mock_byte_stream(b"")
mock_proc.returncode = 0
mock_proc.terminate = MagicMock()
mock_proc.wait = AsyncMock()

with patch(f"{_DRIVER_MODULE}.asyncio.subprocess.create_subprocess_exec",
return_value=mock_proc) as mock_exec:
asyncio.run(
_consume_async_gen(driver.tcpdump(args=["-c", "10", "-n", "port", "80"]))
with patch(f"{_DRIVER_MODULE}.anyio.open_process",
new_callable=AsyncMock, return_value=mock_proc) as mock_exec:
anyio.run(
_consume_async_gen, driver.tcpdump(args=["-c", "10", "-n", "port", "80"])
)

call_args = mock_exec.call_args[0]
cmd = list(call_args)
cmd = list(call_args[0])
assert "-c" in cmd
assert "10" in cmd
assert "-n" in cmd
assert "port" in cmd
assert "80" in cmd

def test_tcpdump_cleanup_on_cancel(self, tmp_path: Path):
driver = _make_driver(tmp_path, enable_tcpdump=True)

mock_stdout = AsyncMock()
# Simulate a stream that never ends
mock_stdout.readline = AsyncMock(
side_effect=[b"line 1\n", b"line 2\n", asyncio.CancelledError()]
)

mock_proc = AsyncMock()
mock_proc.stdout = mock_stdout
mock_proc.returncode = None
mock_proc.terminate = MagicMock()
mock_proc.wait = AsyncMock()

with patch(f"{_DRIVER_MODULE}.asyncio.subprocess.create_subprocess_exec",
return_value=mock_proc):
with pytest.raises(asyncio.CancelledError):
asyncio.run(
_consume_async_gen(driver.tcpdump())
)

# Verify the process was terminated
mock_proc.terminate.assert_called_once()


class TestTcpdumpCleanup:
def test_cleanup_terminates_tcpdump_process(self, tmp_path: Path):
Expand Down
Loading
Loading