39 changes: 24 additions & 15 deletions src/crawlee/crawlers/_basic/_basic_crawler.py
@@ -768,27 +768,36 @@ def sigint_handler() -> None:
        return final_statistics

    async def _run_crawler(self) -> None:
-        event_manager = self._service_locator.get_event_manager()
+        local_event_manager = self._service_locator.get_event_manager()
+        global_event_manager = service_locator.get_event_manager()
+        if local_event_manager is global_event_manager:
+            local_event_manager = None  # Avoid entering the same event manager context twice
+
+        # The event managers are always entered.
+        contexts_to_enter: list[Any] = (
+            [global_event_manager, local_event_manager] if local_event_manager else [global_event_manager]
+        )
Collaborator (on lines +777 to +779):
I'm not so sure about this. Why should the crawler initialize a component that it doesn't use? And then tear it down? I think we should try to see the bigger picture first.

The linked issues are caused by Snapshotter and RecoverableState subscribing to an event manager that doesn't emit anything - how does that happen?

Collaborator Author:

I would prefer to remove event_manager from the BasicCrawler constructor entirely and keep only the global one, configurable via service_locator.set_event_manager. However, this requires discussion and should probably be part of v2.

> I'm not so sure about this. Why should the crawler initialize a component that it doesn't use? And then tear it down? I think we should try to see the bigger picture first.

Starting the global event_manager inside the crawler, even though only the crawler's internal components use it, is an early architectural decision. I assume it was made for a better user experience.

> The linked issues are caused by Snapshotter and RecoverableState subscribing to an event manager that doesn't emit anything - how does that happen?

I believe the root cause is bugs introduced during our work on supporting multiple parallel crawlers:

  • An edge case we didn't account for, demonstrated by test_multiple_crawlers_with_global_event_manager. The first crawler to activate the event_manager is fully responsible for its lifecycle. When the crawler finishes, it tears down the event_manager, which clears all subscriptions, leaving any still-running crawlers without a functioning event_manager.
  • The introduction of an internal _service_locator in BasicCrawler, which caused the user-provided event_manager to be used only for Event.CRAWLER_STATUS in _crawler_state_task, while internal components (Snapshotter, RecoverableState) continued subscribing to the global service_locator, which may never be started.

All that made the behavior of service_locator.get_event_manager() unstable and unpredictable.

This PR is an attempt to fix these issues within the current architecture without introducing breaking changes.
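The first bullet's lifecycle edge case can be reproduced with a toy stand-in (illustrative names only, not crawlee's real classes): a short-lived and a long-lived task share one event manager, and the first task's exit wipes the second task's subscription mid-run.

```python
import asyncio


class ToyEventManager:
    """Toy stand-in for a shared global event manager (not crawlee's real class)."""

    def __init__(self) -> None:
        self.active = False
        self.listeners: list[str] = []

    async def __aenter__(self) -> 'ToyEventManager':
        self.active = True
        return self

    async def __aexit__(self, *exc: object) -> None:
        # Teardown clears ALL subscriptions, including those of a crawler that is still running.
        self.listeners.clear()
        self.active = False


shared = ToyEventManager()


async def crawler(name: str, runtime: float) -> bool:
    """Returns whether this crawler's listener was still registered when it finished."""
    async with shared:
        shared.listeners.append(name)
        await asyncio.sleep(runtime)
        return name in shared.listeners


async def demo() -> list[bool]:
    # The short-lived crawler exits first and tears the shared manager down,
    # stripping the long-lived crawler's subscription while it is still working.
    return await asyncio.gather(crawler('short', 0.01), crawler('long', 0.05))


print(asyncio.run(demo()))  # [True, False]
```

The `[True, False]` result is exactly the failure mode described above: the first crawler to finish leaves any still-running crawler without its subscriptions.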

Collaborator (@Pijukatel, Mar 26, 2026):

I think the fixes that solve two crawlers using the same EventManager are good. Regarding the global event manager, I see this as a quick fix.

> I would prefer to remove event_manager from the BasicCrawler constructor entirely...

I think this would lead to counterintuitive behavior, because Configuration has EventManager-specific fields that would be ignored in cases like this:

# Global event manager already exists
BasicCrawler(configuration=Configuration(persist_state_interval=...))

Maybe it would be possible for each subscriber to the event manager to have at least an optional init argument that would allow passing an explicit event_manager (if it is creating storages, then maybe a service_locator argument would be even better) instead of relying on the global one.

Collaborator:

When porting the service locator to crawlee-js (apify/crawlee#3325), I actually made the global serviceLocator object into a proxy that resolves to the crawler-scoped ServiceLocator when called from a crawler instance and to the global one otherwise.

If we adopted that in Python as well, it would solve the inconsistency caused by Snapshotter and friends still using the global locator.

Collaborator Author:

Thanks, I'll study that solution.

Collaborator:

Let me know if you get stuck on something, it's kinda magic. But that's the cost of making the library usable without factories, builders and explicit dependency containers.


        # Collect the context managers to be entered. Context managers that are already active are excluded,
        # as they were likely entered by the caller, who will also be responsible for exiting them.
-        contexts_to_enter = [
-            cm
-            for cm in (
-                event_manager,
-                self._snapshotter,
-                self._statistics,
-                self._session_pool if self._use_session_pool else None,
-                self._http_client,
-                self._crawler_state_rec_task,
-                *self._additional_context_managers,
-            )
-            if cm and getattr(cm, 'active', False) is False
-        ]
+        contexts_to_enter.extend(
+            [
+                cm
+                for cm in (
+                    self._snapshotter,
+                    self._statistics,
+                    self._session_pool if self._use_session_pool else None,
+                    self._http_client,
+                    self._crawler_state_rec_task,
+                    *self._additional_context_managers,
+                )
+                if cm and getattr(cm, 'active', False) is False
+            ]
+        )

        async with AsyncExitStack() as exit_stack:
            for context in contexts_to_enter:
-                await exit_stack.enter_async_context(context)  # ty: ignore[invalid-argument-type]
+                await exit_stack.enter_async_context(context)

            await self._autoscaled_pool.run()
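The skip-already-active filter used in this hunk can be exercised in isolation with toy components (illustrative only, not crawlee's classes):

```python
import asyncio
from contextlib import AsyncExitStack


class Component:
    """Toy async context manager with an `active` flag, mimicking the filtered components above."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.active = False

    async def __aenter__(self) -> 'Component':
        self.active = True
        return self

    async def __aexit__(self, *exc: object) -> None:
        self.active = False


async def demo() -> tuple[list[str], bool, bool]:
    a, b = Component('a'), Component('b')
    await a.__aenter__()  # 'a' was already entered by the caller

    # Enter only the context managers that are not already active,
    # mirroring the `getattr(cm, 'active', False) is False` filter above.
    contexts_to_enter = [cm for cm in (a, b, None) if cm and getattr(cm, 'active', False) is False]

    async with AsyncExitStack() as stack:
        for cm in contexts_to_enter:
            await stack.enter_async_context(cm)
        entered = [cm.name for cm in contexts_to_enter]

    # 'a' stays active (its caller is responsible for exiting it); 'b' was exited by the stack.
    return entered, a.active, b.active


print(asyncio.run(demo()))  # (['b'], True, False)
```

The filter is what lets a caller-managed context manager survive the crawler's own teardown.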

28 changes: 15 additions & 13 deletions src/crawlee/events/_event_manager.py
@@ -93,24 +93,20 @@ def __init__(
            delay=self._persist_state_interval,
        )

-        # Flag to indicate the context state.
-        self._active = False
+        # Reference count for active contexts.
+        self._active_ref_count = 0

    @property
    def active(self) -> bool:
        """Indicate whether the context is active."""
-        return self._active
+        return self._active_ref_count > 0

    async def __aenter__(self) -> EventManager:
-        """Initialize the event manager upon entering the async context.
+        """Initialize the event manager upon entering the async context."""
+        self._active_ref_count += 1
+        if self._active_ref_count > 1:
+            return self
Collaborator:

Potential task leak for multi-crawler scenarios; from Claude:

Bug: LocalEventManager not updated for ref-counting — causes task leak

The base class now correctly guards _emit_persist_state_event_rec_task.start() behind ref_count == 1 (line 110). However, LocalEventManager.__aenter__ (in _local_event_manager.py:72-79) unconditionally calls self._emit_system_info_event_rec_task.start() on every entry:

async def __aenter__(self) -> LocalEventManager:
    await super().__aenter__()
    self._emit_system_info_event_rec_task.start()  # Called EVERY time!
    return self

Since RecurringTask.start() creates a new asyncio.Task and overwrites self.task without cancelling the previous one (_utils/recurring_task.py:60-66), the old task is leaked (still running, but no reference to cancel it).

In the multi-crawler scenario with the default shared global LocalEventManager:

  1. Crawler 1 enters → ref_count=1, system_info task A created
  2. Crawler 2 enters → ref_count=2, system_info task B created, task A orphaned
  3. Crawler 1 exits → stop() cancels task B, ref_count=1 — but task A is still running with no reference
  4. Crawler 2 exits → stop() tries to cancel already-cancelled task B — task A never stopped

LocalEventManager needs the same ref-count-aware guards:

async def __aenter__(self) -> LocalEventManager:
    await super().__aenter__()
    if self._active_ref_count == 1:  # Only start on first entry
        self._emit_system_info_event_rec_task.start()
    return self

async def __aexit__(self, ...) -> None:
    if self._active_ref_count == 1:  # Only stop on last exit
        await self._emit_system_info_event_rec_task.stop()
    await super().__aexit__(exc_type, exc_value, exc_traceback)

Note: checking == 1 in __aexit__ must happen before super().__aexit__() decrements the count.
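The first-enter/last-exit pattern, including the decrement-after-check ordering from the note above, can be sketched as a standalone ref-counted context manager (a sketch of the pattern, not the real EventManager):

```python
import asyncio


class RefCountedManager:
    """Minimal ref-counted async context manager: start work on first enter, stop on last exit."""

    def __init__(self) -> None:
        self._active_ref_count = 0
        self.starts = 0
        self.stops = 0

    @property
    def active(self) -> bool:
        return self._active_ref_count > 0

    async def __aenter__(self) -> 'RefCountedManager':
        self._active_ref_count += 1
        if self._active_ref_count == 1:
            self.starts += 1  # start background tasks only on the first entry
        return self

    async def __aexit__(self, *exc: object) -> None:
        if self._active_ref_count == 1:
            self.stops += 1  # stop background tasks only on the last exit
        # Decrement AFTER the == 1 check; decrementing first would break the last-exit detection.
        self._active_ref_count -= 1


async def demo() -> tuple[int, int, bool]:
    m = RefCountedManager()
    async with m:       # ref count 0 -> 1: starts
        async with m:   # ref count 1 -> 2: no second start
            assert m.active
    return m.starts, m.stops, m.active


print(asyncio.run(demo()))  # (1, 1, False)
```

Nested or concurrent entries share one start/stop pair, which is the behavior the PR gives the base EventManager and which LocalEventManager would need as well.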


-        Raises:
-            RuntimeError: If the context manager is already active.
-        """
-        if self._active:
-            raise RuntimeError(f'The {self.__class__.__name__} is already active.')
-
-        self._active = True
        self._emit_persist_state_event_rec_task.start()
        return self

@@ -127,17 +123,23 @@ async def __aexit__(
        Raises:
            RuntimeError: If the context manager is not active.
        """
-        if not self._active:
+        if not self.active:
            raise RuntimeError(f'The {self.__class__.__name__} is not active.')

+        if self._active_ref_count > 1:
+            # Emit persist state event to ensure the latest state is saved before closing the context.
+            await self._emit_persist_state_event()
+            self._active_ref_count -= 1
+            return
Collaborator:

We don't call wait_for_all_listeners_to_complete in this branch.

From Claude:

Consideration: Intermediate exit skips listener cleanup

When ref_count > 1, the exit emits a persist-state event but does not call wait_for_all_listeners_to_complete() or clean up listeners registered by the exiting crawler.

In practice this is likely fine because individual components (Snapshotter, RecoverableState) clean up their own listeners via off() in their __aexit__/teardown(). But any component that relies on the event manager's remove_all_listeners() as the cleanup mechanism would have stale listeners firing after its context is torn down.

Worth adding a comment here explaining this design choice — e.g. that individual components are responsible for their own listener cleanup, and remove_all_listeners() on final exit is just a safety net.


        # Stop persist state event periodic emission and manually emit last one to ensure latest state is saved.
        await self._emit_persist_state_event_rec_task.stop()
        await self._emit_persist_state_event()
        await self.wait_for_all_listeners_to_complete(timeout=self._close_timeout)
        self._event_emitter.remove_all_listeners()
        self._listener_tasks.clear()
        self._listeners_to_wrappers.clear()
-        self._active = False
+        self._active_ref_count -= 1

    @overload
    def on(self, *, event: Literal[Event.PERSIST_STATE], listener: EventListener[EventPersistStateData]) -> None: ...
104 changes: 102 additions & 2 deletions tests/unit/crawlers/_basic/test_basic_crawler.py
@@ -26,8 +26,7 @@
from crawlee.configuration import Configuration
from crawlee.crawlers import BasicCrawler
from crawlee.errors import RequestCollisionError, SessionError, UserDefinedErrorHandlerError
-from crawlee.events import Event, EventCrawlerStatusData
-from crawlee.events._local_event_manager import LocalEventManager
+from crawlee.events import Event, EventCrawlerStatusData, LocalEventManager
from crawlee.request_loaders import RequestList, RequestManagerTandem
from crawlee.sessions import Session, SessionPool
from crawlee.statistics import FinalStatistics
@@ -2047,3 +2046,104 @@ async def error_handler(context: BasicCrawlingContext, error: Exception) -> Requ
    assert error_request is not None
    assert error_request.state == RequestState.DONE
    assert error_request.was_already_handled


@pytest.mark.skipif(sys.version_info[:3] < (3, 11), reason='asyncio.Barrier was introduced in Python 3.11.')
async def test_multiple_crawlers_with_global_event_manager() -> None:
    """Test that multiple crawlers work correctly when using the global event manager."""

    rq1 = await RequestQueue.open(alias='rq1')
    rq2 = await RequestQueue.open(alias='rq2')

    crawler_1 = BasicCrawler(request_manager=rq1)
    crawler_2 = BasicCrawler(request_manager=rq2)

    started_event = asyncio.Event()
    finished_event = asyncio.Event()

    async def launch_crawler_1() -> None:
        await crawler_1.run(['https://a.placeholder.com'])
        finished_event.set()

    async def launch_crawler_2() -> None:
        # Ensure that crawler_1 is already running and has activated the event_manager.
        await started_event.wait()
        await crawler_2.run(['https://b.placeholder.com'])

    handler_barrier = asyncio.Barrier(2)  # ty:ignore[unresolved-attribute] # Test is skipped in older Python versions.

    handler_call = AsyncMock()

    @crawler_1.router.default_handler
    async def handler_1(context: BasicCrawlingContext) -> None:
        started_event.set()
        # Ensure that both handlers are running at the same time.
        await handler_barrier.wait()
        event_manager = service_locator.get_event_manager()

        await handler_call(event_manager.active)

    @crawler_2.router.default_handler
    async def handler_2(context: BasicCrawlingContext) -> None:
        # Ensure that both handlers are running at the same time.
        await handler_barrier.wait()
        # Ensure that crawler_1 is finished and has closed all active contexts.
        await finished_event.wait()
        # Check that the event manager is active and can be used in the second crawler.
        event_manager = service_locator.get_event_manager()

        await handler_call(event_manager.active)

    await asyncio.gather(
        launch_crawler_1(),
        launch_crawler_2(),
    )

    assert handler_call.call_count == 2

    first_call = handler_call.call_args_list[0]
    second_call = handler_call.call_args_list[1]

    assert first_call[0][0] is True
    assert second_call[0][0] is True

    event_manager = service_locator.get_event_manager()

    # After both crawlers are finished, the event manager should be inactive.
    assert event_manager.active is False

    await rq1.drop()
    await rq2.drop()


async def test_global_and_local_event_manager_in_crawler_run() -> None:
    """Test that both global and local event managers are used in a crawler run."""

    config = service_locator.get_configuration()

    local_event_manager = LocalEventManager.from_config(config)

    crawler = BasicCrawler(event_manager=local_event_manager)

    handler_call = AsyncMock()

    @crawler.router.default_handler
    async def handler(context: BasicCrawlingContext) -> None:
        global_event_manager = service_locator.get_event_manager()
        handler_call(local_event_manager.active, global_event_manager.active)

    await crawler.run(['https://a.placeholder.com'])

    assert handler_call.call_count == 1

    local_em_state, global_em_state = handler_call.call_args_list[0][0]

    # Both event managers should be active.
    assert local_em_state is True
    assert global_em_state is True

    global_event_manager = service_locator.get_event_manager()

    # After the crawler is finished, both event managers should be inactive.
    assert local_event_manager.active is False
    assert global_event_manager.active is False
4 changes: 0 additions & 4 deletions tests/unit/events/test_event_manager.py
@@ -199,10 +199,6 @@ async def test_methods_raise_error_when_not_active(event_system_info_data: Event
    with pytest.raises(RuntimeError, match=r'EventManager is not active.'):
        await event_manager.wait_for_all_listeners_to_complete()

-    with pytest.raises(RuntimeError, match=r'EventManager is already active.'):
-        async with event_manager, event_manager:
-            pass
-
    async with event_manager:
        event_manager.emit(event=Event.SYSTEM_INFO, event_data=event_system_info_data)
        await event_manager.wait_for_all_listeners_to_complete()