From a536a5a59c1086ef636b22f4d5dcff0e2d859c8b Mon Sep 17 00:00:00 2001 From: JeyBee Date: Thu, 14 May 2026 18:28:53 +0200 Subject: [PATCH 1/2] Add Pool.on_acquire hook with AcquireEvent Adds an optional `on_acquire` callback to `Pool` / `create_pool`, mirroring the existing setup/init/reset style. The callback is invoked synchronously with an `AcquireEvent(wait_seconds, size, idle, max_size)` after every successful `Pool.acquire` dispatch. Lets applications detect pool saturation (long wait, idle == 0) without subclassing the pool or wrapping every callsite. Callback exceptions are logged and suppressed. No behavior change when unused. --- asyncpg/__init__.py | 4 ++-- asyncpg/pool.py | 58 +++++++++++++++++++++++++++++++++++++++++---- tests/test_pool.py | 56 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 112 insertions(+), 6 deletions(-) diff --git a/asyncpg/__init__.py b/asyncpg/__init__.py index e8811a9d..45f3c661 100644 --- a/asyncpg/__init__.py +++ b/asyncpg/__init__.py @@ -8,7 +8,7 @@ from .connection import connect, Connection # NOQA from .exceptions import * # NOQA -from .pool import create_pool, Pool # NOQA +from .pool import create_pool, Pool, AcquireEvent # NOQA from .protocol import Record # NOQA from .types import * # NOQA @@ -19,6 +19,6 @@ __all__: tuple[str, ...] = ( - 'connect', 'create_pool', 'Pool', 'Record', 'Connection' + 'connect', 'create_pool', 'Pool', 'Record', 'Connection', 'AcquireEvent' ) __all__ += exceptions.__all__ # NOQA diff --git a/asyncpg/pool.py b/asyncpg/pool.py index 5c7ea9ca..77c96d8a 100644 --- a/asyncpg/pool.py +++ b/asyncpg/pool.py @@ -8,6 +8,7 @@ import asyncio from collections.abc import Awaitable, Callable +import dataclasses import functools import inspect import logging @@ -25,6 +26,19 @@ logger = logging.getLogger(__name__) +@dataclasses.dataclass(frozen=True) +class AcquireEvent: + """Emitted by :meth:`Pool.acquire` on every successful dispatch. + + .. versionadded:: 0.32.0 + """ + + wait_seconds: float + size: int + idle: int + max_size: int + + class PoolConnectionProxyMeta(type): def __new__( @@ -342,7 +356,8 @@ class Pool: '_init', '_connect', '_reset', '_connect_args', '_connect_kwargs', '_holders', '_initialized', '_initializing', '_closing', '_closed', '_connection_class', '_record_class', '_generation', - '_setup', '_max_queries', '_max_inactive_connection_lifetime' + '_setup', '_max_queries', '_max_inactive_connection_lifetime', + '_on_acquire', ) def __init__(self, *connect_args, @@ -357,6 +372,8 @@ def __init__(self, *connect_args, loop, connection_class, record_class, + on_acquire: Optional[ + Callable[[AcquireEvent], None]] = None, **connect_kwargs): if len(connect_args) > 1: @@ -399,6 +416,8 @@ def __init__(self, *connect_args, 'record_class is expected to be a subclass of ' 'asyncpg.Record, got {!r}'.format(record_class)) + self._on_acquire = on_acquire + self._minsize = min_size self._maxsize = max_size @@ -892,11 +911,29 @@ async def _acquire_impl(): raise exceptions.InterfaceError('pool is closing') self._check_init() + cb = self._on_acquire + if cb is None: + if timeout is None: + return await _acquire_impl() + return await compat.wait_for(_acquire_impl(), timeout=timeout) + + started = time.monotonic() if timeout is None: - return await _acquire_impl() + proxy = await _acquire_impl() else: - return await compat.wait_for( - _acquire_impl(), timeout=timeout) + proxy = await compat.wait_for(_acquire_impl(), timeout=timeout) + event = AcquireEvent( + wait_seconds=time.monotonic() - started, + size=self.get_size(), + idle=self.get_idle_size(), + max_size=self._maxsize, + ) + try: + cb(event) + except Exception: + logger.exception( + 'asyncpg on_acquire callback raised; suppressing') + return proxy async def release(self, connection, *, timeout=None): """Release a database connection back to the pool. @@ -1084,6 +1121,8 @@ def create_pool(dsn=None, *, loop=None, connection_class=connection.Connection, record_class=protocol.Record, + on_acquire: Optional[ + Callable[[AcquireEvent], None]] = None, **connect_kwargs): r"""Create a connection pool. @@ -1230,6 +1269,16 @@ def create_pool(dsn=None, *, .. versionchanged:: 0.30.0 Added the *connect* and *reset* parameters. + + :param on_acquire: + Synchronous callback invoked with an :class:`AcquireEvent` after + every successful :meth:`Pool.acquire` dispatch. ``wait_seconds`` + is wall-clock time spent inside :meth:`Pool.acquire` (queue wait + plus any reconnect or ``setup`` callback). Exceptions are + logged and suppressed. + + .. versionchanged:: 0.32.0 + Added the *on_acquire* parameter. """ return Pool( dsn, @@ -1244,5 +1293,6 @@ def create_pool(dsn=None, *, init=init, reset=reset, max_inactive_connection_lifetime=max_inactive_connection_lifetime, + on_acquire=on_acquire, **connect_kwargs, ) diff --git a/tests/test_pool.py b/tests/test_pool.py index 695363b7..3eaf7d28 100644 --- a/tests/test_pool.py +++ b/tests/test_pool.py @@ -1004,6 +1004,62 @@ async def worker(): conn = await pool.acquire(timeout=0.1) await pool.release(conn) + async def test_pool_on_acquire_reports_saturation_wait(self): + events = [] + pool = await self.create_pool( + database='postgres', + min_size=1, + max_size=1, + on_acquire=events.append, + ) + try: + holder_acquired = asyncio.Event() + release_holder = asyncio.Event() + + async def holder(): + async with pool.acquire(): + holder_acquired.set() + await release_holder.wait() + + async def waiter(): + await holder_acquired.wait() + async with pool.acquire() as con: + await con.fetchval('SELECT 1') + + holder_task = self.loop.create_task(holder()) + waiter_task = self.loop.create_task(waiter()) + await holder_acquired.wait() + await asyncio.sleep(0.15) + release_holder.set() + await asyncio.gather(holder_task, waiter_task) + finally: + await pool.close() + + self.assertEqual(len(events), 2) + for ev in events: + self.assertEqual(ev.max_size, 1) + self.assertGreaterEqual(ev.wait_seconds, 0) + self.assertGreaterEqual( + max(ev.wait_seconds for ev in events), 0.1) + + async def test_pool_on_acquire_not_fired_on_timeout(self): + events = [] + pool = await self.create_pool( + database='postgres', + min_size=1, + max_size=1, + on_acquire=events.append, + ) + try: + async with pool.acquire(): + with self.assertRaises(asyncio.TimeoutError): + await pool.acquire(timeout=0.1) + finally: + await pool.close() + + # one event for the outer successful acquire, none for the timeout + self.assertEqual(len(events), 1) + @unittest.skipIf(os.environ.get('PGHOST'), 'unmanaged cluster') class TestPoolReconnectWithTargetSessionAttrs(tb.ClusterTestCase): From a128258cd86af01a898b80bd37bff8850efe0b3d Mon Sep 17 00:00:00 2001 From: JeyBee Date: Thu, 14 May 2026 18:43:23 +0200 Subject: [PATCH 2/2] Hoist on_acquire None-check out of helper Caller branches in `_acquire` so `_fire_on_acquire` does not need a redundant None check. --- asyncpg/pool.py | 28 +++++++++++++--------------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/asyncpg/pool.py b/asyncpg/pool.py index 77c96d8a..5b4a9f0a 100644 --- a/asyncpg/pool.py +++ b/asyncpg/pool.py @@ -911,29 +911,27 @@ async def _acquire_impl(): raise exceptions.InterfaceError('pool is closing') self._check_init() - cb = self._on_acquire - if cb is None: - if timeout is None: - return await _acquire_impl() - return await compat.wait_for(_acquire_impl(), timeout=timeout) - started = time.monotonic() if timeout is None: proxy = await _acquire_impl() else: - proxy = await compat.wait_for(_acquire_impl(), timeout=timeout) - event = AcquireEvent( - wait_seconds=time.monotonic() - started, - size=self.get_size(), - idle=self.get_idle_size(), - max_size=self._maxsize, - ) + proxy = await compat.wait_for( + _acquire_impl(), timeout=timeout) + if self._on_acquire is not None: + self._fire_on_acquire(time.monotonic() - started) + return proxy + + def _fire_on_acquire(self, wait_seconds: float) -> None: try: - cb(event) + self._on_acquire(AcquireEvent( + wait_seconds=wait_seconds, + size=self.get_size(), + idle=self.get_idle_size(), + max_size=self._maxsize, + )) except Exception: logger.exception( 'asyncpg on_acquire callback raised; suppressing') - return proxy async def release(self, connection, *, timeout=None): """Release a database connection back to the pool.