Feat/cron#913
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces a new unified memory and session architecture, adding a pluggable MemoryBackend system (with adapters for file-based, ReMe, mem0, mempalace, ByteRover, and Supermemory storage) and a non-destructive ContextAssembler for message history compaction. It also introduces a standalone CronService and CronTool for managing scheduled agent tasks. The review comments correctly identify critical issues, including potential CPU exhaustion/hangs in the cron schedule catch-up logic, platform compatibility issues with os.fork on Windows, and blocking synchronous thread joins (self._sync_thread.join) that degrade the performance of the asynchronous event loop in the Supermemory and ByteRover adapters.
| if schedule.kind == 'cron': | ||
| croniter_cls = _try_import_croniter() | ||
| import pytz | ||
| tz = pytz.timezone(schedule.timezone) if schedule.timezone else timezone.utc | ||
| local_base = base.astimezone(tz) | ||
| cron = croniter_cls(schedule.expr, local_base) | ||
| next_dt = cron.get_next(datetime) | ||
| while next_dt.astimezone(timezone.utc) <= now: | ||
| next_dt = cron.get_next(datetime) | ||
| return next_dt.astimezone(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S+00:00') | ||
|
|
||
| elif schedule.kind == 'interval': | ||
| from datetime import timedelta | ||
| step = timedelta(seconds=schedule.interval_seconds) | ||
| candidate = base + step | ||
| while candidate <= now: | ||
| candidate += step | ||
| return candidate.strftime('%Y-%m-%dT%H:%M:%S+00:00') |
There was a problem hiding this comment.
The while loops used to catch up expired schedules to now can cause severe CPU exhaustion and process hangs if the base (or current_next) timestamp is far in the past (e.g., if the daemon was stopped for a long time or due to clock synchronization issues). For cron schedules, we can fast-forward the base time to now before initializing croniter. For interval schedules, we can mathematically calculate the number of steps required to exceed now in
| if schedule.kind == 'cron': | |
| croniter_cls = _try_import_croniter() | |
| import pytz | |
| tz = pytz.timezone(schedule.timezone) if schedule.timezone else timezone.utc | |
| local_base = base.astimezone(tz) | |
| cron = croniter_cls(schedule.expr, local_base) | |
| next_dt = cron.get_next(datetime) | |
| while next_dt.astimezone(timezone.utc) <= now: | |
| next_dt = cron.get_next(datetime) | |
| return next_dt.astimezone(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S+00:00') | |
| elif schedule.kind == 'interval': | |
| from datetime import timedelta | |
| step = timedelta(seconds=schedule.interval_seconds) | |
| candidate = base + step | |
| while candidate <= now: | |
| candidate += step | |
| return candidate.strftime('%Y-%m-%dT%H:%M:%S+00:00') | |
| if schedule.kind == 'cron': | |
| croniter_cls = _try_import_croniter() | |
| import pytz | |
| tz = pytz.timezone(schedule.timezone) if schedule.timezone else timezone.utc | |
| local_base = max(base, now).astimezone(tz) | |
| cron = croniter_cls(schedule.expr, local_base) | |
| next_dt = cron.get_next(datetime) | |
| return next_dt.astimezone(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S+00:00') | |
| elif schedule.kind == 'interval': | |
| from datetime import timedelta | |
| step = timedelta(seconds=schedule.interval_seconds) | |
| if base <= now: | |
| diff = now - base | |
| steps = int(diff.total_seconds() // schedule.interval_seconds) + 1 | |
| candidate = base + steps * step | |
| else: | |
| candidate = base + step | |
| return candidate.strftime('%Y-%m-%dT%H:%M:%S+00:00') |
| pid = os.fork() | ||
| if pid > 0: | ||
| print(f'Cron daemon started (PID {pid}, workspace: {service.workspace})') | ||
| return | ||
| os.setsid() | ||
| asyncio.run(service.run_forever()) |
There was a problem hiding this comment.
The use of os.fork() and os.setsid() is Unix-only and will raise an AttributeError on Windows platforms. To ensure cross-platform compatibility, guard the daemonization logic with a check for os.fork availability and print a helpful error message on unsupported platforms.
| pid = os.fork() | |
| if pid > 0: | |
| print(f'Cron daemon started (PID {pid}, workspace: {service.workspace})') | |
| return | |
| os.setsid() | |
| asyncio.run(service.run_forever()) | |
| if not hasattr(os, 'fork'): | |
| print('Daemon mode is not supported on this platform. Please use --foreground.', file=sys.stderr) | |
| sys.exit(1) | |
| pid = os.fork() | |
| if pid > 0: | |
| print(f'Cron daemon started (PID {pid}, workspace: {service.workspace})') | |
| return | |
| os.setsid() | |
| asyncio.run(service.run_forever()) |
| def _background_add(self, content: str) -> None: | ||
| """Add a memory document in a background thread.""" | ||
| if self._sync_thread and self._sync_thread.is_alive(): | ||
| self._sync_thread.join(timeout=2.0) | ||
|
|
||
| def _add(): | ||
| try: | ||
| self._client.documents.add( | ||
| content=content, | ||
| container_tags=[self._container_tag], | ||
| entity_context=self._entity_context, | ||
| metadata={ | ||
| "source": "ms-agent", | ||
| "type": "conversation_turn", | ||
| }, | ||
| ) | ||
| except Exception as e: | ||
| logger.debug("[supermemory_backend] add failed: %s", e) |
There was a problem hiding this comment.
Calling self._sync_thread.join(timeout=2.0) synchronously inside the main thread blocks the asyncio event loop, which can severely degrade the responsiveness of the asynchronous agent framework. Since this is a fire-and-forget background operation, consider running it completely asynchronously using loop.run_in_executor to avoid blocking the event loop.
def _background_add(self, content: str) -> None:
"""Add a memory document in a background thread."""
def _add():
try:
self._client.documents.add(
content=content,
container_tags=[self._container_tag],
entity_context=self._entity_context,
metadata={
"source": "ms-agent",
"type": "conversation_turn",
},
)
except Exception as e:
logger.debug("[supermemory_backend] add failed: %s", e)
import asyncio
try:
loop = asyncio.get_running_loop()
loop.run_in_executor(None, _add)
except RuntimeError:
self._sync_thread = threading.Thread(target=_add, daemon=True, name="supermemory-add")
self._sync_thread.start()| def _background_curate(self, content: str, wait: bool = False) -> None: | ||
| """Run ``brv curate`` in a background thread.""" | ||
| if self._sync_thread and self._sync_thread.is_alive(): | ||
| self._sync_thread.join(timeout=5.0) | ||
|
|
||
| def _curate(): | ||
| try: | ||
| _run_brv( | ||
| ["curate", "--", content], | ||
| timeout=self._curate_timeout, cwd=self._cwd, | ||
| ) | ||
| except Exception as e: | ||
| logger.debug("[byterover_backend] curate failed: %s", e) | ||
|
|
||
| self._sync_thread = threading.Thread( | ||
| target=_curate, daemon=True, name="brv-curate") | ||
| self._sync_thread.start() | ||
|
|
||
| if wait: | ||
| self._sync_thread.join(timeout=float(self._curate_timeout)) |
There was a problem hiding this comment.
Calling self._sync_thread.join(timeout=5.0) synchronously inside the main thread blocks the asyncio event loop, which can severely degrade the responsiveness of the asynchronous agent framework. Since this is a fire-and-forget background operation when wait is False, consider running it completely asynchronously using loop.run_in_executor to avoid blocking the event loop.
def _background_curate(self, content: str, wait: bool = False) -> None:
"""Run ``brv curate`` in a background thread."""
def _curate():
try:
_run_brv(
["curate", "--", content],
timeout=self._curate_timeout, cwd=self._cwd,
)
except Exception as e:
logger.debug("[byterover_backend] curate failed: %s", e)
if wait:
_curate()
else:
import asyncio
try:
loop = asyncio.get_running_loop()
loop.run_in_executor(None, _curate)
except RuntimeError:
self._sync_thread = threading.Thread(target=_curate, daemon=True, name="brv-curate")
self._sync_thread.start()
Change Summary
Add support for cron as a CLI, tool, and service.
Related issue number
Checklist
pre-commit installandpre-commit run --all-filesbefore git commit, and passed lint check.