From 73d342e5d1674ae7a9b4ecd21cc7af14ca8beee5 Mon Sep 17 00:00:00 2001 From: Mia Kimberly Christie Date: Wed, 11 Mar 2026 10:54:35 +0000 Subject: [PATCH 1/2] Add parser keepalive, drop callback from stream --- src/ahttpx/_parsers.py | 24 +++-- src/ahttpx/_pool.py | 16 ++-- src/ahttpx/_server.py | 10 +-- src/httpx/_connection.py | 190 +++++++++++++++++++++++++++++++++++++++ src/httpx/_parsers.py | 24 +++-- src/httpx/_pool.py | 16 ++-- src/httpx/_server.py | 10 +-- 7 files changed, 236 insertions(+), 54 deletions(-) create mode 100644 src/httpx/_connection.py diff --git a/src/ahttpx/_parsers.py b/src/ahttpx/_parsers.py index 6ac2c33..182df44 100644 --- a/src/ahttpx/_parsers.py +++ b/src/ahttpx/_parsers.py @@ -1,5 +1,6 @@ import enum import io +import time import typing from ._streams import Stream @@ -89,6 +90,7 @@ def __init__(self, stream: Stream, mode: str) -> None: self.stream = stream self.parser = ReadAheadParser(stream) self.mode = {'CLIENT': Mode.CLIENT, 'SERVER': Mode.SERVER}[mode] + self.keepalive_duration = 5.0 # Track state... if self.mode == Mode.CLIENT: @@ -107,6 +109,7 @@ def __init__(self, stream: Stream, mode: str) -> None: # Track connection keep alive... self.send_keep_alive = True self.recv_keep_alive = True + self.keepalive_until: float | None = None # Special states... self.processing_1xx = False @@ -119,6 +122,9 @@ async def send_method_line(self, method: bytes, target: bytes, protocol: bytes) Sending state will switch to SEND_HEADERS state. """ + # Scrub connection keepalive + self.keepalive_until = None + if self.send_state != State.SEND_METHOD_LINE: msg = f"Called 'send_method_line' in invalid state {self.send_state}" raise ProtocolError(msg) @@ -244,6 +250,9 @@ async def recv_method_line(self) -> tuple[bytes, bytes, bytes]: Receive state will switch to RECV_HEADERS. """ + # Scrub connection keepalive + self.keepalive_until = None + if self.recv_state != State.RECV_METHOD_LINE: msg = f"Called 'recv_method_line' in invalid state {self.recv_state}" raise ProtocolError(msg) @@ -409,6 +418,7 @@ async def complete(self): self.send_keep_alive = True self.recv_keep_alive = True self.processing_1xx = False + self.keepalive_until = time.monotonic() + self.keepalive_duration async def close(self): if self.send_state != State.CLOSED: @@ -425,6 +435,9 @@ def is_idle(self) -> bool: def is_closed(self) -> bool: return self.send_state == State.CLOSED + def keepalive_expired(self) -> bool: + return (self.keepalive_until is not None) and (time.monotonic() > self.keepalive_until) + def description(self) -> str: return { State.SEND_METHOD_LINE: "idle", @@ -439,10 +452,9 @@ def __repr__(self) -> str: class HTTPStream(Stream): - def __init__(self, parser: HTTPParser, callback: typing.Callable | None = None): + def __init__(self, parser: HTTPParser): self._parser = parser self._buffer = io.BytesIO() - self._callback = callback async def read(self, size=-1) -> bytes: sections = [] @@ -474,12 +486,8 @@ async def read(self, size=-1) -> bytes: return output async def close(self) -> None: - try: - self._buffer.close() - await self._parser.complete() - finally: - if self._callback is not None: - await self._callback() + self._buffer.close() + await self._parser.complete() class ReadAheadParser: diff --git a/src/ahttpx/_pool.py b/src/ahttpx/_pool.py index c4838da..911e76f 100644 --- a/src/ahttpx/_pool.py +++ b/src/ahttpx/_pool.py @@ -84,9 +84,8 @@ async def _get_connection(self, request: Request) -> "Connection": # Attempt to reuse an existing connection. url = request.url origin = URL(scheme=url.scheme, host=url.host, port=url.port) - now = time.monotonic() for conn in self._connections: - if conn.origin() == origin and conn.is_idle() and not conn.is_expired(now): + if conn.origin() == origin and conn.is_idle() and not conn.is_expired(): return conn # Or else create a new connection. @@ -102,7 +101,7 @@ async def _get_connection(self, request: Request) -> "Connection": async def _cleanup(self) -> None: now = time.monotonic() for conn in list(self._connections): - if conn.is_expired(now): + if conn.is_expired(): await conn.close() if conn.is_closed(): self._connections.remove(conn) @@ -142,8 +141,6 @@ class Connection(Transport): def __init__(self, stream: Stream, origin: URL | str): self._stream = stream self._origin = URL(origin) if not isinstance(origin, URL) else origin - self._keepalive_duration = 5.0 - self._idle_expiry = time.monotonic() + self._keepalive_duration self._request_lock = Lock() self._parser = HTTPParser(stream, mode='CLIENT') @@ -154,8 +151,8 @@ def origin(self) -> URL: def is_idle(self) -> bool: return self._parser.is_idle() - def is_expired(self, when: float) -> bool: - return self._parser.is_idle() and when > self._idle_expiry + def is_expired(self) -> bool: + return self._parser.is_idle() and self._parser.keepalive_expired() def is_closed(self) -> bool: return self._parser.is_closed() @@ -170,7 +167,7 @@ async def send(self, request: Request) -> Response: await self._send_head(request) await self._send_body(request) code, headers = await self._recv_head() - stream = HTTPStream(self._parser, callback=self._complete) + stream = HTTPStream(self._parser) # TODO... return Response(code, headers=headers, content=stream) # finally: @@ -233,9 +230,6 @@ async def _recv_body(self) -> bytes: return await self._parser.recv_body() # Request/response cycle complete... - async def _complete(self) -> None: - self._idle_expiry = time.monotonic() + self._keepalive_duration - async def _close(self) -> None: await self._parser.close() diff --git a/src/ahttpx/_server.py b/src/ahttpx/_server.py index 577d001..d30416e 100644 --- a/src/ahttpx/_server.py +++ b/src/ahttpx/_server.py @@ -24,15 +24,13 @@ def __init__(self, stream, endpoint): self._stream = stream self._endpoint = endpoint self._parser = HTTPParser(stream, mode='SERVER') - self._keepalive_duration = 5.0 - self._idle_expiry = time.monotonic() + self._keepalive_duration # API entry points... async def handle_requests(self): try: - while not await self._parser.recv_close(): + while not (self._parser.keepalive_expired() or await self._parser.recv_close()): method, url, headers = await self._recv_head() - stream = HTTPStream(self._parser, callback=self._complete) + stream = HTTPStream(self._parser) # TODO: Handle endpoint exceptions async with Request(method, url, headers=headers, content=stream) as request: try: @@ -82,10 +80,6 @@ async def _send_body(self, response: Response): await self._parser.send_body(data) await self._parser.send_body(b'') - # Start it all over again... - async def _complete(self): - self._idle_expiry = time.monotonic() + self._keepalive_duration - class HTTPServer: def __init__(self, host, port): diff --git a/src/httpx/_connection.py b/src/httpx/_connection.py new file mode 100644 index 0000000..a395618 --- /dev/null +++ b/src/httpx/_connection.py @@ -0,0 +1,190 @@ +import time +import types + +from ._content import Content +from ._headers import Headers +from ._network import Lock, NetworkBackend +from ._parsers import HTTPParser, HTTPStream +from ._response import Response +from ._request import Method, Request +from ._streams import Stream +from ._urls import URL + + +__all__ = [ + "Transport", + "Connection", + "open_connection", +] + +class Transport: + def send(self, request: Request) -> Response: + raise NotImplementedError() + + def close(self): + pass + + def request( + self, + method: Method | str, + url: URL | str, + headers: Headers | dict[str, str] | None = None, + content: Content | Stream | bytes | None = None, + ) -> Response: + request = Request(method, url, headers=headers, content=content) + with self.send(request) as response: + response.read() + return response + + def stream( + self, + method: Method | str, + url: URL | str, + headers: Headers | dict[str, str] | None = None, + content: Content | Stream | bytes | None = None, + ) -> Response: + request = Request(method, url, headers=headers, content=content) + response = self.send(request) + return response + + +class Connection(Transport): + def __init__(self, stream: Stream, origin: URL | str): + self._stream = stream + self._origin = URL(origin) if not isinstance(origin, URL) else origin + self._keepalive_duration = 5.0 + self._idle_expiry = time.monotonic() + self._keepalive_duration + self._request_lock = Lock() + self._parser = HTTPParser(stream, mode='CLIENT') + + # API for connection pool management... + def origin(self) -> URL: + return self._origin + + def is_idle(self) -> bool: + return self._parser.is_idle() + + def is_expired(self, when: float) -> bool: + return self._parser.is_idle() and when > self._idle_expiry + + def is_closed(self) -> bool: + return self._parser.is_closed() + + def description(self) -> str: + return self._parser.description() + + # API entry points... + def send(self, request: Request) -> Response: + #async with self._request_lock: + # try: + self._send_head(request) + self._send_body(request) + code, headers = self._recv_head() + stream = HTTPStream(self._parser, callback=self._complete) + # TODO... + return Response(code, headers=headers, content=stream) + # finally: + # await self._cycle_complete() + + def close(self) -> None: + with self._request_lock: + self._close() + + # Top-level API for working directly with a connection. + def request( + self, + method: Method | str, + url: URL | str, + headers: Headers | dict[str, str] | None = None, + content: Content | Stream | bytes | None = None, + ) -> Response: + url = self._origin.join(url) + request = Request(method, url, headers=headers, content=content) + with self.send(request) as response: + response.read() + return response + + def stream( + self, + method: Method | str, + url: URL | str, + headers: Headers | dict[str, str] | None = None, + content: Content | Stream | bytes | None = None, + ) -> Response: + url = self._origin.join(url) + request = Request(method, url, headers=headers, content=content) + return self.send(request) + + # Send the request... + def _send_head(self, request: Request) -> None: + method = bytes(request.method) + target = request.url.target.encode('ascii') + protocol = b'HTTP/1.1' + self._parser.send_method_line(method, target, protocol) + headers = request.headers.as_byte_pairs() + self._parser.send_headers(headers) + + def _send_body(self, request: Request) -> None: + while data := request.stream.read(64 * 1024): + self._parser.send_body(data) + self._parser.send_body(b'') + + # Receive the response... + def _recv_head(self) -> tuple[int, Headers]: + _, code, _ = self._parser.recv_status_line() + h = self._parser.recv_headers() + headers = Headers([ + (k.decode('ascii'), v.decode('ascii')) + for k, v in h + ]) + return code, headers + + def _recv_body(self) -> bytes: + return self._parser.recv_body() + + # Request/response cycle complete... + def _complete(self) -> None: + self._idle_expiry = time.monotonic() + self._keepalive_duration + + def _close(self) -> None: + self._parser.close() + + # Builtins... + def __repr__(self) -> str: + return f"" + + def __enter__(self) -> "Connection": + return self + + def __exit__( + self, + exc_type: type[BaseException] | None = None, + exc_value: BaseException | None = None, + traceback: types.TracebackType | None = None, + ): + self.close() + + +def open_connection( + url: URL | str, + hostname: str = '', + backend: NetworkBackend | None = None, + ) -> Connection: + + if isinstance(url, str): + url = URL(url) + + if url.scheme not in ("http", "https"): + raise ValueError("URL scheme must be 'http://' or 'https://'.") + if backend is None: + backend = NetworkBackend() + + host = url.host + port = url.port or {"http": 80, "https": 443}[url.scheme] + + if url.scheme == "https": + stream = backend.connect_tls(host, port, hostname) + else: + stream = backend.connect(host, port) + + return Connection(stream, url) diff --git a/src/httpx/_parsers.py b/src/httpx/_parsers.py index 415bfef..2c7f1af 100644 --- a/src/httpx/_parsers.py +++ b/src/httpx/_parsers.py @@ -1,5 +1,6 @@ import enum import io +import time import typing from ._streams import Stream @@ -89,6 +90,7 @@ def __init__(self, stream: Stream, mode: str) -> None: self.stream = stream self.parser = ReadAheadParser(stream) self.mode = {'CLIENT': Mode.CLIENT, 'SERVER': Mode.SERVER}[mode] + self.keepalive_duration = 5.0 # Track state... if self.mode == Mode.CLIENT: @@ -107,6 +109,7 @@ def __init__(self, stream: Stream, mode: str) -> None: # Track connection keep alive... self.send_keep_alive = True self.recv_keep_alive = True + self.keepalive_until: float | None = None # Special states... self.processing_1xx = False @@ -119,6 +122,9 @@ def send_method_line(self, method: bytes, target: bytes, protocol: bytes) -> Non Sending state will switch to SEND_HEADERS state. """ + # Scrub connection keepalive + self.keepalive_until = None + if self.send_state != State.SEND_METHOD_LINE: msg = f"Called 'send_method_line' in invalid state {self.send_state}" raise ProtocolError(msg) @@ -244,6 +250,9 @@ def recv_method_line(self) -> tuple[bytes, bytes, bytes]: Receive state will switch to RECV_HEADERS. """ + # Scrub connection keepalive + self.keepalive_until = None + if self.recv_state != State.RECV_METHOD_LINE: msg = f"Called 'recv_method_line' in invalid state {self.recv_state}" raise ProtocolError(msg) @@ -409,6 +418,7 @@ def complete(self): self.send_keep_alive = True self.recv_keep_alive = True self.processing_1xx = False + self.keepalive_until = time.monotonic() + self.keepalive_duration def close(self): if self.send_state != State.CLOSED: @@ -425,6 +435,9 @@ def is_idle(self) -> bool: def is_closed(self) -> bool: return self.send_state == State.CLOSED + def keepalive_expired(self) -> bool: + return (self.keepalive_until is not None) and (time.monotonic() > self.keepalive_until) + def description(self) -> str: return { State.SEND_METHOD_LINE: "idle", @@ -439,10 +452,9 @@ def __repr__(self) -> str: class HTTPStream(Stream): - def __init__(self, parser: HTTPParser, callback: typing.Callable | None = None): + def __init__(self, parser: HTTPParser): self._parser = parser self._buffer = io.BytesIO() - self._callback = callback def read(self, size=-1) -> bytes: sections = [] @@ -474,12 +486,8 @@ def read(self, size=-1) -> bytes: return output def close(self) -> None: - try: - self._buffer.close() - self._parser.complete() - finally: - if self._callback is not None: - self._callback() + self._buffer.close() + self._parser.complete() class ReadAheadParser: diff --git a/src/httpx/_pool.py b/src/httpx/_pool.py index 8959159..796c99b 100644 --- a/src/httpx/_pool.py +++ b/src/httpx/_pool.py @@ -84,9 +84,8 @@ def _get_connection(self, request: Request) -> "Connection": # Attempt to reuse an existing connection. url = request.url origin = URL(scheme=url.scheme, host=url.host, port=url.port) - now = time.monotonic() for conn in self._connections: - if conn.origin() == origin and conn.is_idle() and not conn.is_expired(now): + if conn.origin() == origin and conn.is_idle() and not conn.is_expired(): return conn # Or else create a new connection. @@ -102,7 +101,7 @@ def _get_connection(self, request: Request) -> "Connection": def _cleanup(self) -> None: now = time.monotonic() for conn in list(self._connections): - if conn.is_expired(now): + if conn.is_expired(): conn.close() if conn.is_closed(): self._connections.remove(conn) @@ -142,8 +141,6 @@ class Connection(Transport): def __init__(self, stream: Stream, origin: URL | str): self._stream = stream self._origin = URL(origin) if not isinstance(origin, URL) else origin - self._keepalive_duration = 5.0 - self._idle_expiry = time.monotonic() + self._keepalive_duration self._request_lock = Lock() self._parser = HTTPParser(stream, mode='CLIENT') @@ -154,8 +151,8 @@ def origin(self) -> URL: def is_idle(self) -> bool: return self._parser.is_idle() - def is_expired(self, when: float) -> bool: - return self._parser.is_idle() and when > self._idle_expiry + def is_expired(self) -> bool: + return self._parser.is_idle() and self._parser.keepalive_expired() def is_closed(self) -> bool: return self._parser.is_closed() @@ -170,7 +167,7 @@ def send(self, request: Request) -> Response: self._send_head(request) self._send_body(request) code, headers = self._recv_head() - stream = HTTPStream(self._parser, callback=self._complete) + stream = HTTPStream(self._parser) # TODO... return Response(code, headers=headers, content=stream) # finally: @@ -233,9 +230,6 @@ def _recv_body(self) -> bytes: return self._parser.recv_body() # Request/response cycle complete... - def _complete(self) -> None: - self._idle_expiry = time.monotonic() + self._keepalive_duration - def _close(self) -> None: self._parser.close() diff --git a/src/httpx/_server.py b/src/httpx/_server.py index 4f1ca3a..7ed426b 100644 --- a/src/httpx/_server.py +++ b/src/httpx/_server.py @@ -24,15 +24,13 @@ def __init__(self, stream, endpoint): self._stream = stream self._endpoint = endpoint self._parser = HTTPParser(stream, mode='SERVER') - self._keepalive_duration = 5.0 - self._idle_expiry = time.monotonic() + self._keepalive_duration # API entry points... def handle_requests(self): try: - while not self._parser.recv_close(): + while not (self._parser.keepalive_expired() or self._parser.recv_close()): method, url, headers = self._recv_head() - stream = HTTPStream(self._parser, callback=self._complete) + stream = HTTPStream(self._parser) # TODO: Handle endpoint exceptions with Request(method, url, headers=headers, content=stream) as request: try: @@ -82,10 +80,6 @@ def _send_body(self, response: Response): self._parser.send_body(data) self._parser.send_body(b'') - # Start it all over again... - def _complete(self): - self._idle_expiry = time.monotonic() + self._keepalive_duration - class HTTPServer: def __init__(self, host, port): From 5197ccba8189dc71e8208aac0937ce24407febbd Mon Sep 17 00:00:00 2001 From: Mia Kimberly Christie Date: Wed, 11 Mar 2026 10:58:12 +0000 Subject: [PATCH 2/2] Drop erronous checkin --- src/httpx/_connection.py | 190 --------------------------------------- 1 file changed, 190 deletions(-) delete mode 100644 src/httpx/_connection.py diff --git a/src/httpx/_connection.py b/src/httpx/_connection.py deleted file mode 100644 index a395618..0000000 --- a/src/httpx/_connection.py +++ /dev/null @@ -1,190 +0,0 @@ -import time -import types - -from ._content import Content -from ._headers import Headers -from ._network import Lock, NetworkBackend -from ._parsers import HTTPParser, HTTPStream -from ._response import Response -from ._request import Method, Request -from ._streams import Stream -from ._urls import URL - - -__all__ = [ - "Transport", - "Connection", - "open_connection", -] - -class Transport: - def send(self, request: Request) -> Response: - raise NotImplementedError() - - def close(self): - pass - - def request( - self, - method: Method | str, - url: URL | str, - headers: Headers | dict[str, str] | None = None, - content: Content | Stream | bytes | None = None, - ) -> Response: - request = Request(method, url, headers=headers, content=content) - with self.send(request) as response: - response.read() - return response - - def stream( - self, - method: Method | str, - url: URL | str, - headers: Headers | dict[str, str] | None = None, - content: Content | Stream | bytes | None = None, - ) -> Response: - request = Request(method, url, headers=headers, content=content) - response = self.send(request) - return response - - -class Connection(Transport): - def __init__(self, stream: Stream, origin: URL | str): - self._stream = stream - self._origin = URL(origin) if not isinstance(origin, URL) else origin - self._keepalive_duration = 5.0 - self._idle_expiry = time.monotonic() + self._keepalive_duration - self._request_lock = Lock() - self._parser = HTTPParser(stream, mode='CLIENT') - - # API for connection pool management... - def origin(self) -> URL: - return self._origin - - def is_idle(self) -> bool: - return self._parser.is_idle() - - def is_expired(self, when: float) -> bool: - return self._parser.is_idle() and when > self._idle_expiry - - def is_closed(self) -> bool: - return self._parser.is_closed() - - def description(self) -> str: - return self._parser.description() - - # API entry points... - def send(self, request: Request) -> Response: - #async with self._request_lock: - # try: - self._send_head(request) - self._send_body(request) - code, headers = self._recv_head() - stream = HTTPStream(self._parser, callback=self._complete) - # TODO... - return Response(code, headers=headers, content=stream) - # finally: - # await self._cycle_complete() - - def close(self) -> None: - with self._request_lock: - self._close() - - # Top-level API for working directly with a connection. - def request( - self, - method: Method | str, - url: URL | str, - headers: Headers | dict[str, str] | None = None, - content: Content | Stream | bytes | None = None, - ) -> Response: - url = self._origin.join(url) - request = Request(method, url, headers=headers, content=content) - with self.send(request) as response: - response.read() - return response - - def stream( - self, - method: Method | str, - url: URL | str, - headers: Headers | dict[str, str] | None = None, - content: Content | Stream | bytes | None = None, - ) -> Response: - url = self._origin.join(url) - request = Request(method, url, headers=headers, content=content) - return self.send(request) - - # Send the request... - def _send_head(self, request: Request) -> None: - method = bytes(request.method) - target = request.url.target.encode('ascii') - protocol = b'HTTP/1.1' - self._parser.send_method_line(method, target, protocol) - headers = request.headers.as_byte_pairs() - self._parser.send_headers(headers) - - def _send_body(self, request: Request) -> None: - while data := request.stream.read(64 * 1024): - self._parser.send_body(data) - self._parser.send_body(b'') - - # Receive the response... - def _recv_head(self) -> tuple[int, Headers]: - _, code, _ = self._parser.recv_status_line() - h = self._parser.recv_headers() - headers = Headers([ - (k.decode('ascii'), v.decode('ascii')) - for k, v in h - ]) - return code, headers - - def _recv_body(self) -> bytes: - return self._parser.recv_body() - - # Request/response cycle complete... - def _complete(self) -> None: - self._idle_expiry = time.monotonic() + self._keepalive_duration - - def _close(self) -> None: - self._parser.close() - - # Builtins... - def __repr__(self) -> str: - return f"" - - def __enter__(self) -> "Connection": - return self - - def __exit__( - self, - exc_type: type[BaseException] | None = None, - exc_value: BaseException | None = None, - traceback: types.TracebackType | None = None, - ): - self.close() - - -def open_connection( - url: URL | str, - hostname: str = '', - backend: NetworkBackend | None = None, - ) -> Connection: - - if isinstance(url, str): - url = URL(url) - - if url.scheme not in ("http", "https"): - raise ValueError("URL scheme must be 'http://' or 'https://'.") - if backend is None: - backend = NetworkBackend() - - host = url.host - port = url.port or {"http": 80, "https": 443}[url.scheme] - - if url.scheme == "https": - stream = backend.connect_tls(host, port, hostname) - else: - stream = backend.connect(host, port) - - return Connection(stream, url)