diff --git a/deploy/docker/crawler_pool.py b/deploy/docker/crawler_pool.py
index 516d9562a..f3790262c 100644
--- a/deploy/docker/crawler_pool.py
+++ b/deploy/docker/crawler_pool.py
@@ -1,5 +1,5 @@
 # crawler_pool.py - Smart browser pool with tiered management
-import asyncio, json, hashlib, time
+import asyncio, json, hashlib, signal, time
 from contextlib import suppress
 from typing import Dict, Optional
 from crawl4ai import AsyncWebCrawler, BrowserConfig
@@ -11,16 +11,27 @@
 # Pool tiers
 PERMANENT: Optional[AsyncWebCrawler] = None  # Always-ready default browser
-HOT_POOL: Dict[str, AsyncWebCrawler] = {} # Frequent configs
-COLD_POOL: Dict[str, AsyncWebCrawler] = {} # Rare configs
+HOT_POOL: Dict[str, AsyncWebCrawler] = {}  # Frequent configs
+COLD_POOL: Dict[str, AsyncWebCrawler] = {}  # Rare configs
 LAST_USED: Dict[str, float] = {}
 USAGE_COUNT: Dict[str, int] = {}
 LOCK = asyncio.Lock()

 # Config
 MEM_LIMIT = CONFIG.get("crawler", {}).get("memory_threshold_percent", 95.0)
+MEM_RECYCLE_THRESHOLD = CONFIG.get("crawler", {}).get("memory_recycle_percent", 88.0)
 BASE_IDLE_TTL = CONFIG.get("crawler", {}).get("pool", {}).get("idle_ttl_sec", 300)
+PERMANENT_MAX_AGE_S = (
+    CONFIG.get("crawler", {}).get("pool", {}).get("permanent_max_age_sec", 4 * 3600)
+)  # 4 h
+PERMANENT_MAX_REQUESTS = (
+    CONFIG.get("crawler", {}).get("pool", {}).get("permanent_max_requests", 2000)
+)
 DEFAULT_CONFIG_SIG = None  # Cached sig for default config
+_PERMANENT_CFG: Optional[BrowserConfig] = None  # Saved for crash recovery
+_PERMANENT_STARTED_AT: float = (
+    0.0  # Wall-clock time when PERMANENT was last (re)started
+)
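+
+# Sketch of the matching config.yml section (key names taken from the
+# CONFIG.get() chains above; the values shown are the defaults):
+#
+#   crawler:
+#     memory_threshold_percent: 95.0
+#     memory_recycle_percent: 88.0
+#     pool:
+#       idle_ttl_sec: 300
+#       permanent_max_age_sec: 14400   # 4 * 3600
+#       permanent_max_requests: 2000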
+ """ + try: + import psutil + + parent = psutil.Process(pid) + children = parent.children(recursive=True) + for child in children: + with suppress(Exception): + child.send_signal(signal.SIGKILL) + with suppress(Exception): + parent.send_signal(signal.SIGKILL) + logger.debug( + f"Killed Chromium process tree (pid={pid}, children={len(children)})" + ) + except Exception: + pass + + +async def _close_and_kill(crawler: AsyncWebCrawler) -> None: + """Close *crawler* then force-kill any surviving Chromium OS processes. + + ``crawler.close()`` sends a polite shutdown to Chromium, but when the + browser has already crashed the command is never delivered and the OS + process tree stays alive. Capturing the subprocess reference *before* + calling close (while the transport object still exists) and then issuing + SIGKILL afterwards guarantees that no renderer or utility processes linger + regardless of whether the close succeeded. + """ + chrome_proc = _get_browser_proc(crawler) + with suppress(Exception): + await asyncio.wait_for(crawler.close(), timeout=5.0) + if chrome_proc is not None: + _kill_proc_tree(chrome_proc.pid) + + def _sig(cfg: BrowserConfig) -> str: """Generate config signature.""" - payload = json.dumps(cfg.to_dict(), sort_keys=True, separators=(",",":")) + payload = json.dumps(cfg.to_dict(), sort_keys=True, separators=(",", ":")) return hashlib.sha1(payload.encode()).hexdigest() + def _is_default_config(sig: str) -> bool: """Check if config matches default.""" return sig == DEFAULT_CONFIG_SIG + +def _is_crawler_alive(crawler: AsyncWebCrawler) -> bool: + """Check whether a pooled crawler's Playwright browser is still responsive. + + The Chromium process can die from OOM or /dev/shm exhaustion. When that + happens the Playwright driver connection closes, and every subsequent + ``BrowserContext.new_page()`` call raises + ``Connection closed while reading from the driver``. + + We inspect the internal Playwright state rather than attempting a live + page-open probe because the probe itself could hang or raise, and we + already hold the pool LOCK. + + Important: ``browser.is_connected()`` is updated via an async callback + chain (``Connection.close()`` β†’ ``Browser._on_close()``). It can return + True for several event-loop cycles after the OS pipe has already closed. + We therefore also check ``_channel._connection._transport`` directly: the + transport reference is cleared to ``None`` synchronously inside + ``Connection.close()`` the instant asyncio fires ``connection_lost()``, + giving an immediate and reliable dead-browser signal independent of the + async callback backlog. 
+
+    See: https://github.com/unclecode/crawl4ai/issues/842
+    """
+    try:
+        strategy = getattr(crawler, "crawler_strategy", None)
+        if strategy is None:
+            return False
+
+        # AsyncPlaywrightCrawlerStrategy stores a BrowserManager
+        bm = getattr(strategy, "browser_manager", None)
+        if bm is None:
+            return False
+
+        # Persistent context path — no separate browser object
+        if getattr(bm, "_launched_persistent", False):
+            ctx = getattr(bm, "default_context", None)
+            if ctx is None:
+                return False
+            # Persistent contexts expose no is_connected(); check impl internals
+            impl = getattr(ctx, "_impl_obj", None)
+            if impl is None:
+                return False
+            conn = getattr(impl, "_connection", None)
+            if conn is None:
+                return False
+            # _connection._transport is None once the pipe/websocket dies
+            transport = getattr(conn, "_transport", None)
+            return transport is not None
+
+        browser = getattr(bm, "browser", None)
+        if browser is None:
+            return False
+
+        # is_connected() lags — check the transport first for an immediate signal.
+        # Playwright's Browser extends ChannelOwner, so _channel._connection
+        # is the live Connection object whose _transport goes None on pipe close.
+        channel = getattr(browser, "_channel", None)
+        if channel is not None:
+            conn = getattr(channel, "_connection", None)
+            if conn is not None:
+                transport = getattr(conn, "_transport", None)
+                if transport is None:
+                    return False  # pipe gone — browser is dead
+
+        return browser.is_connected()
+    except Exception:
+        return False
+
+
+async def _replace_permanent(reason: str = "dead") -> AsyncWebCrawler:
+    """Close the permanent browser and start a fresh one.
+
+    *reason* is logged to distinguish crash recovery ("dead") from proactive
+    recycling ("memory pressure", "max age", "max requests").
+    Must be called while LOCK is held.
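+
+    Call-site sketch (both callers hold LOCK when they get here; the reason
+    strings are illustrative):
+
+        crawler = await _replace_permanent()                 # crash recovery
+        await _replace_permanent(reason="max age (4.2h)")    # janitor recycle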
+ """ + global PERMANENT, _PERMANENT_STARTED_AT + logger.warning(f"πŸ”§ Permanent browser recycling ({reason}) β€” recreating…") + + if PERMANENT: + await _close_and_kill(PERMANENT) + PERMANENT = None + + # Also reset the class-level Playwright singleton so a fresh one is started + try: + from crawl4ai.browser_manager import BrowserManager + + BrowserManager._playwright_instance = None + except Exception: + pass + + new_crawler = AsyncWebCrawler(config=_PERMANENT_CFG, thread_safe=False) + await new_crawler.start() + PERMANENT = new_crawler + _PERMANENT_STARTED_AT = time.time() + LAST_USED[DEFAULT_CONFIG_SIG] = _PERMANENT_STARTED_AT + logger.info("βœ… Permanent browser recreated successfully") + return PERMANENT + + +async def _replace_pooled( + pool: Dict[str, AsyncWebCrawler], sig: str, cfg: BrowserConfig +) -> AsyncWebCrawler: + """Close a dead pooled browser and start a fresh replacement.""" + logger.warning(f"πŸ”§ Pooled browser dead (sig={sig[:8]}) β€” recreating…") + old = pool.pop(sig, None) + if old: + await _close_and_kill(old) + new_crawler = AsyncWebCrawler(config=cfg, thread_safe=False) + await new_crawler.start() + new_crawler.active_requests = 1 + pool[sig] = new_crawler + LAST_USED[sig] = time.time() + logger.info(f"βœ… Pooled browser recreated (sig={sig[:8]})") + return new_crawler + + async def get_crawler(cfg: BrowserConfig) -> AsyncWebCrawler: """Get crawler from pool with tiered strategy.""" sig = _sig(cfg) async with LOCK: # Check permanent browser for default config if PERMANENT and _is_default_config(sig): + # Crash recovery: verify the browser is still alive + if not _is_crawler_alive(PERMANENT): + crawler = await _replace_permanent() + else: + crawler = PERMANENT LAST_USED[sig] = time.time() USAGE_COUNT[sig] = USAGE_COUNT.get(sig, 0) + 1 - if not hasattr(PERMANENT, 'active_requests'): - PERMANENT.active_requests = 0 - PERMANENT.active_requests += 1 + if not hasattr(crawler, "active_requests"): + crawler.active_requests = 0 + crawler.active_requests += 1 logger.info("πŸ”₯ Using permanent browser") - return PERMANENT + return crawler # Check hot pool if sig in HOT_POOL: + crawler = HOT_POOL[sig] + # Crash recovery: verify the browser is still alive + if not _is_crawler_alive(crawler): + crawler = await _replace_pooled(HOT_POOL, sig, cfg) LAST_USED[sig] = time.time() USAGE_COUNT[sig] = USAGE_COUNT.get(sig, 0) + 1 - crawler = HOT_POOL[sig] - if not hasattr(crawler, 'active_requests'): + if not hasattr(crawler, "active_requests"): crawler.active_requests = 0 crawler.active_requests += 1 - logger.info(f"♨️ Using hot pool browser (sig={sig[:8]}, active={crawler.active_requests})") + logger.info( + f"♨️ Using hot pool browser (sig={sig[:8]}, active={crawler.active_requests})" + ) return crawler # Check cold pool (promote to hot if used 3+ times) if sig in COLD_POOL: + crawler = COLD_POOL[sig] + # Crash recovery: verify the browser is still alive + if not _is_crawler_alive(crawler): + crawler = await _replace_pooled(COLD_POOL, sig, cfg) LAST_USED[sig] = time.time() USAGE_COUNT[sig] = USAGE_COUNT.get(sig, 0) + 1 - crawler = COLD_POOL[sig] - if not hasattr(crawler, 'active_requests'): + if not hasattr(crawler, "active_requests"): crawler.active_requests = 0 crawler.active_requests += 1 if USAGE_COUNT[sig] >= 3: - logger.info(f"⬆️ Promoting to hot pool (sig={sig[:8]}, count={USAGE_COUNT[sig]})") + logger.info( + f"⬆️ Promoting to hot pool (sig={sig[:8]}, count={USAGE_COUNT[sig]})" + ) HOT_POOL[sig] = COLD_POOL.pop(sig) # Track promotion in monitor try: from monitor import 
+
+
 async def get_crawler(cfg: BrowserConfig) -> AsyncWebCrawler:
     """Get crawler from pool with tiered strategy."""
     sig = _sig(cfg)
     async with LOCK:
         # Check permanent browser for default config
         if PERMANENT and _is_default_config(sig):
+            # Crash recovery: verify the browser is still alive
+            if not _is_crawler_alive(PERMANENT):
+                crawler = await _replace_permanent()
+            else:
+                crawler = PERMANENT
             LAST_USED[sig] = time.time()
             USAGE_COUNT[sig] = USAGE_COUNT.get(sig, 0) + 1
-            if not hasattr(PERMANENT, 'active_requests'):
-                PERMANENT.active_requests = 0
-            PERMANENT.active_requests += 1
+            if not hasattr(crawler, "active_requests"):
+                crawler.active_requests = 0
+            crawler.active_requests += 1
             logger.info("🔥 Using permanent browser")
-            return PERMANENT
+            return crawler

         # Check hot pool
         if sig in HOT_POOL:
+            crawler = HOT_POOL[sig]
+            # Crash recovery: verify the browser is still alive
+            if not _is_crawler_alive(crawler):
+                crawler = await _replace_pooled(HOT_POOL, sig, cfg)
             LAST_USED[sig] = time.time()
             USAGE_COUNT[sig] = USAGE_COUNT.get(sig, 0) + 1
-            crawler = HOT_POOL[sig]
-            if not hasattr(crawler, 'active_requests'):
+            if not hasattr(crawler, "active_requests"):
                 crawler.active_requests = 0
             crawler.active_requests += 1
-            logger.info(f"♨️ Using hot pool browser (sig={sig[:8]}, active={crawler.active_requests})")
+            logger.info(
+                f"♨️ Using hot pool browser (sig={sig[:8]}, active={crawler.active_requests})"
+            )
             return crawler

         # Check cold pool (promote to hot if used 3+ times)
         if sig in COLD_POOL:
+            crawler = COLD_POOL[sig]
+            # Crash recovery: verify the browser is still alive
+            if not _is_crawler_alive(crawler):
+                crawler = await _replace_pooled(COLD_POOL, sig, cfg)
             LAST_USED[sig] = time.time()
             USAGE_COUNT[sig] = USAGE_COUNT.get(sig, 0) + 1
-            crawler = COLD_POOL[sig]
-            if not hasattr(crawler, 'active_requests'):
+            if not hasattr(crawler, "active_requests"):
                 crawler.active_requests = 0
             crawler.active_requests += 1
             if USAGE_COUNT[sig] >= 3:
-                logger.info(f"⬆️ Promoting to hot pool (sig={sig[:8]}, count={USAGE_COUNT[sig]})")
+                logger.info(
+                    f"⬆️ Promoting to hot pool (sig={sig[:8]}, count={USAGE_COUNT[sig]})"
+                )
                 HOT_POOL[sig] = COLD_POOL.pop(sig)
                 # Track promotion in monitor
                 try:
                     from monitor import get_monitor
-                    await get_monitor().track_janitor_event("promote", sig, {"count": USAGE_COUNT[sig]})
+
+                    await get_monitor().track_janitor_event(
+                        "promote", sig, {"count": USAGE_COUNT[sig]}
+                    )
                 except:
                     pass

@@ -109,7 +330,9 @@ async def get_crawler(cfg: BrowserConfig) -> AsyncWebCrawler:
             raise MemoryError(f"Memory at {mem_pct:.1f}%, refusing new browser")

         # Create new in cold pool
-        logger.info(f"🆕 Creating new browser in cold pool (sig={sig[:8]}, mem={mem_pct:.1f}%)")
+        logger.info(
+            f"🆕 Creating new browser in cold pool (sig={sig[:8]}, mem={mem_pct:.1f}%)"
+        )
         crawler = AsyncWebCrawler(config=cfg, thread_safe=False)
         await crawler.start()
         crawler.active_requests = 1
@@ -118,6 +341,7 @@ async def get_crawler(cfg: BrowserConfig) -> AsyncWebCrawler:
         USAGE_COUNT[sig] = 1
         return crawler

+
 async def release_crawler(crawler: AsyncWebCrawler):
     """Decrement active request count for a pooled crawler.

@@ -126,36 +350,43 @@ async def release_crawler(crawler: AsyncWebCrawler):
     to close idle browsers.
     """
     async with LOCK:
-        if hasattr(crawler, 'active_requests'):
+        if hasattr(crawler, "active_requests"):
             crawler.active_requests = max(0, crawler.active_requests - 1)

+
 async def init_permanent(cfg: BrowserConfig):
     """Initialize permanent default browser."""
-    global PERMANENT, DEFAULT_CONFIG_SIG
+    global PERMANENT, DEFAULT_CONFIG_SIG, _PERMANENT_CFG, _PERMANENT_STARTED_AT
     async with LOCK:
         if PERMANENT:
             return
         DEFAULT_CONFIG_SIG = _sig(cfg)
+        _PERMANENT_CFG = cfg  # Save config for crash recovery
         logger.info("🔥 Creating permanent default browser")
         PERMANENT = AsyncWebCrawler(config=cfg, thread_safe=False)
         await PERMANENT.start()
-        LAST_USED[DEFAULT_CONFIG_SIG] = time.time()
+        _PERMANENT_STARTED_AT = time.time()
+        LAST_USED[DEFAULT_CONFIG_SIG] = _PERMANENT_STARTED_AT
         USAGE_COUNT[DEFAULT_CONFIG_SIG] = 0
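+
+# Lifecycle sketch (illustrative; `default_browser_cfg`, `request_cfg`, and
+# `url` are hypothetical names, and the arun() call is only a sketch of the
+# crawl step): a server embedding this pool wires it up roughly as follows.
+#
+#   await init_permanent(default_browser_cfg)        # at startup
+#   janitor_task = asyncio.create_task(janitor())    # background sweep
+#
+#   crawler = await get_crawler(request_cfg)         # per request
+#   try:
+#       result = await crawler.arun(url)
+#   finally:
+#       await release_crawler(crawler)
+#
+#   await close_all()                                # at shutdown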

+
 async def close_all():
     """Close all browsers."""
     async with LOCK:
-        tasks = []
+        all_crawlers = []
         if PERMANENT:
-            tasks.append(PERMANENT.close())
-        tasks.extend([c.close() for c in HOT_POOL.values()])
-        tasks.extend([c.close() for c in COLD_POOL.values()])
-        await asyncio.gather(*tasks, return_exceptions=True)
+            all_crawlers.append(PERMANENT)
+        all_crawlers.extend(HOT_POOL.values())
+        all_crawlers.extend(COLD_POOL.values())
+        await asyncio.gather(
+            *[_close_and_kill(c) for c in all_crawlers], return_exceptions=True
+        )
         HOT_POOL.clear()
         COLD_POOL.clear()
         LAST_USED.clear()
         USAGE_COUNT.clear()

+
 async def janitor():
     """Adaptive cleanup based on memory pressure."""
     while True:
@@ -173,24 +404,80 @@ async def janitor():
         now = time.time()

         async with LOCK:
-            # Clean cold pool
+            # Proactive health check: permanent browser
+            if PERMANENT and not _is_crawler_alive(PERMANENT):
+                logger.warning("🧹 Janitor detected dead permanent browser")
+                try:
+                    await _replace_permanent(reason="dead")
+                except Exception as e:
+                    logger.error(f"🧹 Failed to replace permanent browser: {e}")
+
+            # Proactive recycling: permanent browser (memory pressure / age / request count)
+            elif PERMANENT and getattr(PERMANENT, "active_requests", 0) == 0:
+                age_s = now - _PERMANENT_STARTED_AT
+                req_count = USAGE_COUNT.get(DEFAULT_CONFIG_SIG, 0)
+                recycle_reason = None
+                if mem_pct >= MEM_RECYCLE_THRESHOLD and not HOT_POOL and not COLD_POOL:
+                    recycle_reason = f"memory pressure ({mem_pct:.1f}%)"
+                elif age_s >= PERMANENT_MAX_AGE_S:
+                    recycle_reason = f"max age ({age_s / 3600:.1f}h)"
+                elif req_count >= PERMANENT_MAX_REQUESTS:
+                    recycle_reason = f"max requests ({req_count})"
+                if recycle_reason:
+                    try:
+                        await _replace_permanent(reason=recycle_reason)
+                        USAGE_COUNT[DEFAULT_CONFIG_SIG] = 0
+                    except Exception as e:
+                        logger.error(f"🧹 Failed to recycle permanent browser: {e}")
+
+            # Proactive health check: hot pool
+            for sig in list(HOT_POOL.keys()):
+                if not _is_crawler_alive(HOT_POOL[sig]):
+                    logger.warning(
+                        f"🧹 Janitor detected dead hot browser (sig={sig[:8]})"
+                    )
+                    old = HOT_POOL.pop(sig, None)
+                    if old:
+                        await _close_and_kill(old)
+                    LAST_USED.pop(sig, None)
+                    USAGE_COUNT.pop(sig, None)
+
+            # Proactive health check: cold pool
+            for sig in list(COLD_POOL.keys()):
+                if not _is_crawler_alive(COLD_POOL[sig]):
+                    logger.warning(
+                        f"🧹 Janitor detected dead cold browser (sig={sig[:8]})"
+                    )
+                    old = COLD_POOL.pop(sig, None)
+                    if old:
+                        await _close_and_kill(old)
+                    LAST_USED.pop(sig, None)
+                    USAGE_COUNT.pop(sig, None)
+
+            # Clean cold pool (idle timeout)
             for sig in list(COLD_POOL.keys()):
                 if now - LAST_USED.get(sig, now) > cold_ttl:
                     crawler = COLD_POOL[sig]
-                    if getattr(crawler, 'active_requests', 0) > 0:
+                    if getattr(crawler, "active_requests", 0) > 0:
                         continue  # still serving requests, skip
                     idle_time = now - LAST_USED[sig]
-                    logger.info(f"🧹 Closing cold browser (sig={sig[:8]}, idle={idle_time:.0f}s)")
-                    with suppress(Exception):
-                        await crawler.close()
+                    logger.info(
+                        f"🧹 Closing cold browser (sig={sig[:8]}, idle={idle_time:.0f}s)"
+                    )
                     COLD_POOL.pop(sig, None)
                     LAST_USED.pop(sig, None)
                     USAGE_COUNT.pop(sig, None)
+                    await _close_and_kill(crawler)
                     # Track in monitor
                     try:
                         from monitor import get_monitor
-                        await get_monitor().track_janitor_event("close_cold", sig, {"idle_seconds": int(idle_time), "ttl": cold_ttl})
+
+                        await get_monitor().track_janitor_event(
+                            "close_cold",
+                            sig,
+                            {"idle_seconds": int(idle_time), "ttl": cold_ttl},
+                        )
                     except:
                         pass

@@ -198,23 +485,36 @@ async def janitor():
             for sig in list(HOT_POOL.keys()):
                 if now - LAST_USED.get(sig, now) > hot_ttl:
                     crawler = HOT_POOL[sig]
-                    if getattr(crawler, 'active_requests', 0) > 0:
+                    if getattr(crawler, "active_requests", 0) > 0:
                         continue  # still serving requests, skip
                     idle_time = now - LAST_USED[sig]
-                    logger.info(f"🧹 Closing hot browser (sig={sig[:8]}, idle={idle_time:.0f}s)")
-                    with suppress(Exception):
-                        await crawler.close()
+                    logger.info(
+                        f"🧹 Closing hot browser (sig={sig[:8]}, idle={idle_time:.0f}s)"
+                    )
                     HOT_POOL.pop(sig, None)
                     LAST_USED.pop(sig, None)
                     USAGE_COUNT.pop(sig, None)
+                    await _close_and_kill(crawler)
                     # Track in monitor
                     try:
                         from monitor import get_monitor
-                        await get_monitor().track_janitor_event("close_hot", sig, {"idle_seconds": int(idle_time), "ttl": hot_ttl})
+
+                        await get_monitor().track_janitor_event(
+                            "close_hot",
+                            sig,
+                            {"idle_seconds": int(idle_time), "ttl": hot_ttl},
+                        )
                     except:
                         pass

         # Log pool stats
         if mem_pct > 60:
-            logger.info(f"📊 Pool: hot={len(HOT_POOL)}, cold={len(COLD_POOL)}, mem={mem_pct:.1f}%")
+            perm_age = (
+                f", perm_age={int((now - _PERMANENT_STARTED_AT) / 60)}m"
+                if PERMANENT
+                else ""
+            )
+            logger.info(
+                f"📊 Pool: perm={'yes' if PERMANENT else 'no'}, hot={len(HOT_POOL)}, cold={len(COLD_POOL)}, mem={mem_pct:.1f}%{perm_age}"
+            )
diff --git a/tests/docker/test_crash_recovery.py b/tests/docker/test_crash_recovery.py
new file mode 100644
index 000000000..739d75577
--- /dev/null
+++ b/tests/docker/test_crash_recovery.py
@@ -0,0 +1,608 @@
+"""Tests for crash recovery additions to crawler_pool.py.
+
+Covers the crash-recovery mechanism added in the pool-crash-recovery PR,
+across five areas:
+
+    1. _is_crawler_alive() — liveness detection via Playwright internals
+    2. _replace_permanent() — dead permanent browser replacement
+    3. _replace_pooled() — dead hot/cold browser replacement
+    4. get_crawler() recovery — on-demand crash recovery paths
+    5. janitor() health sweep — proactive dead-browser detection
+
+All tests use mocks — no live browser or Docker required.
+
+Import note: crawler_pool.py lives in deploy/docker/ and has a local `utils`
+dependency. We inject a shim module into sys.modules before importing so the
+tests run identically whether or not the container helpers are present.
+"""
+
+import asyncio
+import sys
+import time
+import types
+from pathlib import Path
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+
+# ---------------------------------------------------------------------------
+# One-time module shim — must happen before `import crawler_pool`
+# ---------------------------------------------------------------------------
+
+_DOCKER_DIR = Path(__file__).resolve().parent.parent.parent / "deploy" / "docker"
+
+_fake_utils = types.ModuleType("utils")
+_fake_utils.load_config = lambda: {}
+_fake_utils.get_container_memory_percent = lambda: 50.0
+sys.modules.setdefault("utils", _fake_utils)
+
+if str(_DOCKER_DIR) not in sys.path:
+    sys.path.insert(0, str(_DOCKER_DIR))
+
+import crawler_pool  # noqa: E402
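+
+# Run with pytest from the repo root (pytest-asyncio is assumed, for the
+# @pytest.mark.asyncio tests below):
+#
+#   pytest tests/docker/test_crash_recovery.py -v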
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+
+def _make_standard_crawler(*, alive: bool = True) -> MagicMock:
+    """Return a fake AsyncWebCrawler backed by a standard (non-persistent) browser.
+
+    If *alive* is True the Playwright transport reference is a real MagicMock
+    object (truthy). If False the transport is None, which is the synchronous
+    signal we use as the dead-browser indicator.
+    """
+    transport = MagicMock() if alive else None
+
+    conn = MagicMock()
+    conn._transport = transport
+
+    channel = MagicMock()
+    channel._connection = conn
+
+    browser = MagicMock()
+    browser._channel = channel
+    browser.is_connected.return_value = alive
+
+    bm = MagicMock()
+    bm._launched_persistent = False
+    bm.browser = browser
+
+    strategy = MagicMock()
+    strategy.browser_manager = bm
+
+    crawler = MagicMock()
+    crawler.crawler_strategy = strategy
+    crawler.active_requests = 0
+    return crawler
+
+
+def _make_persistent_crawler(*, alive: bool = True) -> MagicMock:
+    """Return a fake AsyncWebCrawler backed by a persistent browser context."""
+    transport = MagicMock() if alive else None
+
+    conn = MagicMock()
+    conn._transport = transport
+
+    impl = MagicMock()
+    impl._connection = conn
+
+    ctx = MagicMock()
+    ctx._impl_obj = impl
+
+    bm = MagicMock()
+    bm._launched_persistent = True
+    bm.default_context = ctx
+
+    strategy = MagicMock()
+    strategy.browser_manager = bm
+
+    crawler = MagicMock()
+    crawler.crawler_strategy = strategy
+    crawler.active_requests = 0
+    return crawler
+
+
+@pytest.fixture(autouse=True)
+def reset_pool():
+    """Reset all module-level pool globals before (and after) each test."""
+
+    def _clear():
+        crawler_pool.PERMANENT = None
+        crawler_pool.HOT_POOL.clear()
+        crawler_pool.COLD_POOL.clear()
+        crawler_pool.LAST_USED.clear()
+        crawler_pool.USAGE_COUNT.clear()
+        crawler_pool.DEFAULT_CONFIG_SIG = None
+        crawler_pool._PERMANENT_CFG = None
+        crawler_pool._PERMANENT_STARTED_AT = 0.0
+
+    _clear()
+    yield
+    _clear()
+
+
+# ===========================================================================
+# 1. _is_crawler_alive()
+# ===========================================================================
+
+
+class TestIsCrawlerAlive:
+    """Unit tests for the _is_crawler_alive() liveness probe."""
+
+    # --- standard browser ---------------------------------------------------
+
+    def test_alive_standard_browser(self):
+        """Live transport + is_connected()=True → alive."""
+        crawler = _make_standard_crawler(alive=True)
+        assert crawler_pool._is_crawler_alive(crawler) is True
+
+    def test_dead_standard_browser_transport_none(self):
+        """Transport is None (pipe closed) → dead."""
+        crawler = _make_standard_crawler(alive=False)
+        assert crawler_pool._is_crawler_alive(crawler) is False
+
+    def test_transport_none_overrides_is_connected(self):
+        """Transport=None → False even when is_connected() still says True.
+
+        This is the key regression the PR fixes: browser.is_connected() lags
+        by several event-loop cycles after the OS pipe closes, so we check the
+        transport reference directly.
+        """
+        crawler = _make_standard_crawler(alive=True)  # live transport
+        # Simulate the lag: clear the transport but leave is_connected() True
+        bm = crawler.crawler_strategy.browser_manager
+        bm.browser._channel._connection._transport = None
+        bm.browser.is_connected.return_value = True  # still lagging
+
+        assert crawler_pool._is_crawler_alive(crawler) is False
+
+    def test_is_connected_false_marks_dead(self):
+        """When transport exists but is_connected() is False → dead."""
+        crawler = _make_standard_crawler(alive=True)
+        crawler.crawler_strategy.browser_manager.browser.is_connected.return_value = (
+            False
+        )
+        assert crawler_pool._is_crawler_alive(crawler) is False
+
+    # --- persistent context -------------------------------------------------
+
+    def test_alive_persistent_context(self):
+        """Live persistent context transport → alive."""
+        crawler = _make_persistent_crawler(alive=True)
+        assert crawler_pool._is_crawler_alive(crawler) is True
+
+    def test_dead_persistent_context(self):
+        """Dead persistent context (transport=None) → dead."""
+        crawler = _make_persistent_crawler(alive=False)
+        assert crawler_pool._is_crawler_alive(crawler) is False
+
+    def test_persistent_context_none(self):
+        """default_context is None → dead."""
+        crawler = _make_persistent_crawler(alive=True)
+        crawler.crawler_strategy.browser_manager.default_context = None
+        assert crawler_pool._is_crawler_alive(crawler) is False
+
+    def test_persistent_impl_none(self):
+        """_impl_obj is None → dead."""
+        crawler = _make_persistent_crawler(alive=True)
+        crawler.crawler_strategy.browser_manager.default_context._impl_obj = None
+        assert crawler_pool._is_crawler_alive(crawler) is False
+
+    # --- edge cases ---------------------------------------------------------
+
+    def test_crawler_strategy_none(self):
+        """Missing crawler_strategy → False (not a crash)."""
+        crawler = MagicMock()
+        crawler.crawler_strategy = None
+        assert crawler_pool._is_crawler_alive(crawler) is False
+
+    def test_browser_manager_none(self):
+        """Missing browser_manager → False."""
+        crawler = MagicMock()
+        crawler.crawler_strategy.browser_manager = None
+        assert crawler_pool._is_crawler_alive(crawler) is False
+
+    def test_browser_none(self):
+        """bm.browser is None (non-persistent) → False."""
+        crawler = _make_standard_crawler(alive=True)
+        crawler.crawler_strategy.browser_manager.browser = None
+        assert crawler_pool._is_crawler_alive(crawler) is False
+
+    def test_exception_returns_false(self):
+        """Any unexpected exception during introspection → False, never raises."""
+
+        class BrokenCrawler:
+            # Attribute access raises to simulate a completely broken object.
+            # A property on a dedicated class (rather than on type(MagicMock))
+            # keeps the raising attribute from leaking into every other
+            # MagicMock instance in the test session.
+            @property
+            def crawler_strategy(self):
+                raise RuntimeError("corrupted")
+
+        # Must not raise
+        assert crawler_pool._is_crawler_alive(BrokenCrawler()) is False
+
+
+# ===========================================================================
+# 2. _replace_permanent()
+# ===========================================================================
+
+
+class TestReplacePermanent:
+    """Tests for the permanent-browser crash-recovery helper."""
+
+    @pytest.mark.asyncio
+    async def test_closes_old_browser(self):
+        """The dead permanent browser is closed+killed before replacement."""
+        old = AsyncMock()
+        new = AsyncMock()
+        new.start = AsyncMock()
+
+        crawler_pool.PERMANENT = old
+        crawler_pool.DEFAULT_CONFIG_SIG = "sig1"
+        crawler_pool.LAST_USED["sig1"] = 0.0
+        crawler_pool._PERMANENT_CFG = MagicMock()
+
+        with (
+            patch("crawler_pool._close_and_kill", new=AsyncMock()) as mock_kill,
+            patch("crawler_pool.AsyncWebCrawler", return_value=new),
+        ):
+            await crawler_pool._replace_permanent(reason="dead")
+
+        mock_kill.assert_awaited_once_with(old)
+
+    @pytest.mark.asyncio
+    async def test_uses_saved_config_for_new_browser(self):
+        """New browser is started using the config saved at init_permanent() time."""
+        new = AsyncMock()
+        new.start = AsyncMock()
+        cfg = MagicMock()
+
+        crawler_pool.PERMANENT = None
+        crawler_pool._PERMANENT_CFG = cfg
+        crawler_pool.DEFAULT_CONFIG_SIG = "sig1"
+        crawler_pool.LAST_USED["sig1"] = 0.0
+
+        with (
+            patch("crawler_pool._close_and_kill", new=AsyncMock()),
+            patch("crawler_pool.AsyncWebCrawler", return_value=new) as mock_cls,
+        ):
+            await crawler_pool._replace_permanent()
+
+        mock_cls.assert_called_once_with(config=cfg, thread_safe=False)
+        new.start.assert_awaited_once()
+
+    @pytest.mark.asyncio
+    async def test_sets_permanent_and_returns_new_crawler(self):
+        """PERMANENT module global is updated and the new crawler is returned."""
+        new = AsyncMock()
+        new.start = AsyncMock()
+
+        crawler_pool._PERMANENT_CFG = MagicMock()
+        crawler_pool.DEFAULT_CONFIG_SIG = "sig1"
+        crawler_pool.LAST_USED["sig1"] = 0.0
+
+        with (
+            patch("crawler_pool._close_and_kill", new=AsyncMock()),
+            patch("crawler_pool.AsyncWebCrawler", return_value=new),
+        ):
+            result = await crawler_pool._replace_permanent()
+
+        assert result is new
+        assert crawler_pool.PERMANENT is new
+
+    @pytest.mark.asyncio
+    async def test_updates_started_at_timestamp(self):
+        """_PERMANENT_STARTED_AT is refreshed to approximately now."""
+        new = AsyncMock()
+        new.start = AsyncMock()
+
+        crawler_pool._PERMANENT_CFG = MagicMock()
+        crawler_pool.DEFAULT_CONFIG_SIG = "sig1"
+        crawler_pool.LAST_USED["sig1"] = 0.0
+        before = time.time()
+
+        with (
+            patch("crawler_pool._close_and_kill", new=AsyncMock()),
+            patch("crawler_pool.AsyncWebCrawler", return_value=new),
+        ):
+            await crawler_pool._replace_permanent()
+
+        assert crawler_pool._PERMANENT_STARTED_AT >= before
+        assert crawler_pool._PERMANENT_STARTED_AT <= time.time() + 1
+
+
+# ===========================================================================
+# 3. _replace_pooled()
+# ===========================================================================
+
+
+class TestReplacePooled:
+    """Tests for the hot/cold-pool crash-recovery helper."""
+
+    @pytest.mark.asyncio
+    async def test_removes_old_and_inserts_new(self):
+        """Dead crawler is closed+killed; fresh one takes its slot."""
+        old = AsyncMock()
+        new = AsyncMock()
+        new.start = AsyncMock()
+        cfg = MagicMock()
+        sig = "deadpool"
+        pool = {sig: old}
+        crawler_pool.LAST_USED[sig] = 0.0
+
+        with (
+            patch("crawler_pool._close_and_kill", new=AsyncMock()) as mock_kill,
+            patch("crawler_pool.AsyncWebCrawler", return_value=new),
+        ):
+            result = await crawler_pool._replace_pooled(pool, sig, cfg)
+
+        assert result is new
+        assert pool[sig] is new
+        mock_kill.assert_awaited_once_with(old)
+
+    @pytest.mark.asyncio
+    async def test_new_browser_starts_with_active_requests_one(self):
+        """Replacement enters the pool already holding one request slot."""
+        new = AsyncMock()
+        new.start = AsyncMock()
+        cfg = MagicMock()
+        sig = "freshsig"
+
+        with (
+            patch("crawler_pool._close_and_kill", new=AsyncMock()),
+            patch("crawler_pool.AsyncWebCrawler", return_value=new),
+        ):
+            result = await crawler_pool._replace_pooled({}, sig, cfg)
+
+        assert result.active_requests == 1
+
+
+# ===========================================================================
+# 4. get_crawler() — on-demand crash recovery
+# ===========================================================================
+
+
+class TestGetCrawlerRecovery:
+    """Integration-level tests: get_crawler() triggers recovery when browser is dead."""
+
+    @pytest.mark.asyncio
+    async def test_dead_permanent_triggers_replace_permanent(self):
+        """get_crawler() calls _replace_permanent when the permanent browser is dead."""
+        cfg = MagicMock()
+        sig = "perm"
+        fresh = AsyncMock()
+        fresh.active_requests = 0
+
+        crawler_pool.PERMANENT = _make_standard_crawler(alive=False)
+        crawler_pool.DEFAULT_CONFIG_SIG = sig
+        crawler_pool.LAST_USED[sig] = time.time()
+        crawler_pool.USAGE_COUNT[sig] = 0
+        crawler_pool._PERMANENT_CFG = cfg
+
+        with (
+            patch("crawler_pool._sig", return_value=sig),
+            patch("crawler_pool._is_default_config", return_value=True),
+            patch("crawler_pool._is_crawler_alive", return_value=False),
+            patch(
+                "crawler_pool._replace_permanent", new=AsyncMock(return_value=fresh)
+            ) as mock_replace,
+        ):
+            result = await crawler_pool.get_crawler(cfg)
+
+        mock_replace.assert_awaited_once()
+        assert result is fresh
+
+    @pytest.mark.asyncio
+    async def test_alive_permanent_skips_replacement(self):
+        """get_crawler() does NOT call _replace_permanent for a healthy permanent."""
+        cfg = MagicMock()
+        sig = "perm"
+        live = _make_standard_crawler(alive=True)
+        live.active_requests = 0
+
+        crawler_pool.PERMANENT = live
+        crawler_pool.DEFAULT_CONFIG_SIG = sig
+        crawler_pool.LAST_USED[sig] = time.time()
+        crawler_pool.USAGE_COUNT[sig] = 0
+
+        with (
+            patch("crawler_pool._sig", return_value=sig),
+            patch("crawler_pool._is_default_config", return_value=True),
+            patch("crawler_pool._is_crawler_alive", return_value=True),
+            patch(
+                "crawler_pool._replace_permanent", new=AsyncMock()
+            ) as mock_replace,
+        ):
+            result = await crawler_pool.get_crawler(cfg)
+
+        mock_replace.assert_not_awaited()
+        assert result is live
+
+    @pytest.mark.asyncio
+    async def test_dead_hot_browser_triggers_replace_pooled(self):
+        """get_crawler() replaces a dead hot-pool browser inline."""
+        cfg = MagicMock()
+        sig = "hot"
+        dead = _make_standard_crawler(alive=False)
+        fresh = AsyncMock()
+        fresh.active_requests = 1
+
+        crawler_pool.HOT_POOL[sig] = dead
+        crawler_pool.LAST_USED[sig] = time.time()
+        crawler_pool.USAGE_COUNT[sig] = 5
+
+        with (
+            patch("crawler_pool._sig", return_value=sig),
+            patch("crawler_pool._is_default_config", return_value=False),
+            patch("crawler_pool._is_crawler_alive", return_value=False),
+            patch(
+                "crawler_pool._replace_pooled", new=AsyncMock(return_value=fresh)
+            ) as mock_replace,
+        ):
+            result = await crawler_pool.get_crawler(cfg)
+
+        mock_replace.assert_awaited_once_with(crawler_pool.HOT_POOL, sig, cfg)
+        assert result is fresh
+
+    @pytest.mark.asyncio
+    async def test_dead_cold_browser_triggers_replace_pooled(self):
+        """get_crawler() replaces a dead cold-pool browser inline."""
+        cfg = MagicMock()
+        sig = "cold"
+        dead = _make_standard_crawler(alive=False)
+        fresh = AsyncMock()
+        fresh.active_requests = 1
+
+        crawler_pool.COLD_POOL[sig] = dead
+        crawler_pool.LAST_USED[sig] = time.time()
+        crawler_pool.USAGE_COUNT[sig] = 1
+
+        with (
+            patch("crawler_pool._sig", return_value=sig),
+            patch("crawler_pool._is_default_config", return_value=False),
+            patch("crawler_pool._is_crawler_alive", return_value=False),
+            patch(
+                "crawler_pool._replace_pooled", new=AsyncMock(return_value=fresh)
+            ) as mock_replace,
+        ):
+            result = await crawler_pool.get_crawler(cfg)
+
+        mock_replace.assert_awaited_once_with(crawler_pool.COLD_POOL, sig, cfg)
+        assert result is fresh
+
+
+# ===========================================================================
+# 5. janitor() — proactive health sweep
+# ===========================================================================
+
+
+def _janitor_sleep_side_effect():
+    """Return a side_effect list for asyncio.sleep that lets one tick run then stops.
+
+    The janitor loop is structured as:
+
+        while True:
+            mem_pct = ...
+            await asyncio.sleep(interval)   ← 1st call: return normally
+            async with LOCK:
+                # health checks happen here
+            # back to top of loop
+            await asyncio.sleep(interval)   ← 2nd call: raise CancelledError
+    """
+    return [None, asyncio.CancelledError()]
+
+
+class TestJanitorHealthSweep:
+    """Tests for the proactive dead-browser detection inside the janitor loop."""
+
+    @pytest.mark.asyncio
+    async def test_dead_permanent_triggers_replace_permanent(self):
+        """Janitor calls _replace_permanent when the permanent browser is dead."""
+        dead = _make_standard_crawler(alive=False)
+        crawler_pool.PERMANENT = dead
+        crawler_pool.DEFAULT_CONFIG_SIG = "perm"
+        crawler_pool.LAST_USED["perm"] = time.time()
+        crawler_pool._PERMANENT_CFG = MagicMock()
+        crawler_pool._PERMANENT_STARTED_AT = time.time()
+
+        with (
+            patch(
+                "asyncio.sleep",
+                new=AsyncMock(side_effect=_janitor_sleep_side_effect()),
+            ),
+            patch(
+                "crawler_pool.get_container_memory_percent", return_value=50.0
+            ),
+            patch("crawler_pool._is_crawler_alive", return_value=False),
+            patch(
+                "crawler_pool._replace_permanent", new=AsyncMock()
+            ) as mock_replace,
+        ):
+            with pytest.raises(asyncio.CancelledError):
+                await crawler_pool.janitor()
+
+        mock_replace.assert_awaited_once_with(reason="dead")
+
+    @pytest.mark.asyncio
+    async def test_dead_hot_browser_evicted_and_killed(self):
+        """Janitor removes a dead hot-pool browser and force-kills its process."""
+        sig = "hot_dead"
+        dead = _make_standard_crawler(alive=False)
+        crawler_pool.HOT_POOL[sig] = dead
+        crawler_pool.LAST_USED[sig] = time.time()
+        crawler_pool.USAGE_COUNT[sig] = 3
+
+        with (
+            patch(
+                "asyncio.sleep",
+                new=AsyncMock(side_effect=_janitor_sleep_side_effect()),
+            ),
+            patch(
+                "crawler_pool.get_container_memory_percent", return_value=50.0
+            ),
+            patch("crawler_pool._is_crawler_alive", return_value=False),
+            patch(
+                "crawler_pool._close_and_kill", new=AsyncMock()
+            ) as mock_kill,
+        ):
+            with pytest.raises(asyncio.CancelledError):
+                await crawler_pool.janitor()
+
+        assert sig not in crawler_pool.HOT_POOL
+        assert sig not in crawler_pool.LAST_USED
+        mock_kill.assert_awaited_once_with(dead)
+
+    @pytest.mark.asyncio
+    async def test_dead_cold_browser_evicted_and_killed(self):
+        """Janitor removes a dead cold-pool browser."""
+        sig = "cold_dead"
+        dead = _make_standard_crawler(alive=False)
+        crawler_pool.COLD_POOL[sig] = dead
+        crawler_pool.LAST_USED[sig] = time.time()
+        crawler_pool.USAGE_COUNT[sig] = 1
+
+        with (
+            patch(
+                "asyncio.sleep",
+                new=AsyncMock(side_effect=_janitor_sleep_side_effect()),
+            ),
+            patch(
+                "crawler_pool.get_container_memory_percent", return_value=50.0
+            ),
+            patch("crawler_pool._is_crawler_alive", return_value=False),
+            patch(
+                "crawler_pool._close_and_kill", new=AsyncMock()
+            ) as mock_kill,
+        ):
+            with pytest.raises(asyncio.CancelledError):
+                await crawler_pool.janitor()
+
+        assert sig not in crawler_pool.COLD_POOL
+        mock_kill.assert_awaited_once_with(dead)
+
+    @pytest.mark.asyncio
+    async def test_alive_hot_browser_left_in_pool(self):
+        """Janitor does not touch a live hot-pool browser."""
+        sig = "hot_alive"
+        live = _make_standard_crawler(alive=True)
+        live.active_requests = 0
+        # Set last-used to now so the idle-TTL cleanup doesn't fire either
+        crawler_pool.HOT_POOL[sig] = live
+        crawler_pool.LAST_USED[sig] = time.time()
+        crawler_pool.USAGE_COUNT[sig] = 2
+
+        with (
+            patch(
+                "asyncio.sleep",
+                new=AsyncMock(side_effect=_janitor_sleep_side_effect()),
+            ),
+            patch(
+                "crawler_pool.get_container_memory_percent", return_value=50.0
+            ),
patch("crawler_pool._is_crawler_alive", return_value=True), + patch( + "crawler_pool._close_and_kill", new=AsyncMock() + ) as mock_kill, + ): + with pytest.raises(asyncio.CancelledError): + await crawler_pool.janitor() + + assert sig in crawler_pool.HOT_POOL + mock_kill.assert_not_awaited()