Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
58d4554
feat(agent): add search term support to drive.countFiles tool
AperturePlus May 26, 2026
baa3a16
feat(agent): add SSE event streaming for agent job lifecycle
AperturePlus May 26, 2026
80b88bd
feat(web): render agent activity events in TurnEntry component
AperturePlus May 26, 2026
e3f9744
feat(admin): add usage statistics to admin users panel
AperturePlus May 26, 2026
394ba18
feat: add download rate limiting with user and IP tracking
AperturePlus May 26, 2026
87c935f
docs(agent): spec for agent subsystem improvements (interaction, sess…
AperturePlus May 26, 2026
0d87046
docs(agent): implementation plan for subproject A (interaction layer,…
AperturePlus May 26, 2026
b1ba1c9
docs(agent): implementation plan for subproject A (interaction layer,…
AperturePlus May 26, 2026
604d6ce
feat(agent): add inbox foundation — settings, enums, model, V14 migra…
AperturePlus May 26, 2026
1ae2b4e
feat(agent): add ToolRegistry and EventBus core services
AperturePlus May 26, 2026
cd30f57
refactor(agent): use ToolRegistry in policy, router, deps, and skill …
AperturePlus May 26, 2026
50c2c1d
feat(agent): add AgentInbox, AskProtocol, and POST /agent/jobs/{id}/m…
AperturePlus May 26, 2026
967d7a1
feat(agent): wire runners and worker with EventBus, inbox controls, L…
AperturePlus May 26, 2026
7ba35c9
feat(web): add agent interaction frontend — inbox controls, ask/reply…
AperturePlus May 26, 2026
6f7b200
docs: fix markdown formatting in implementation plans
AperturePlus May 26, 2026
c2dab2b
feat(auth): adjust risk-control default thresholds
AperturePlus May 26, 2026
53f5546
feat(agent): add planning evidence tracking and grounded write summary
AperturePlus May 26, 2026
5182a5d
feat(web): display planning evidence in agent turn entry
AperturePlus May 26, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions app/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ JWT_SECRET_KEY=please-set-at-least-32-bytes-secret-key
# TOKEN_HASH_SECRET=please-set-at-least-32-bytes-and-different-from-jwt-secret
ACCESS_TOKEN_EXPIRE_MINUTES=4320
REFRESH_TOKEN_EXPIRE_DAYS=7
# Auth risk-control defaults (can be overridden per environment)
MAX_FAILED_LOGIN_ATTEMPTS=8
ACCOUNT_LOCK_MINUTES=5
REGISTER_RATE_LIMIT=12
LOGIN_RATE_LIMIT=30

REDIS_URL=redis://localhost:6379/0
# RabbitMQ is optional in this stage, kept for future publisher swap.
Expand Down Expand Up @@ -60,10 +65,11 @@ AGENT_COMPACT_THRESHOLD=0.75
AGENT_USER_DAILY_LIMIT=50
AGENT_USER_CONCURRENT_LIMIT=2
AGENT_STAGING_TTL_SEC=86400
AGENT_SSE_ENABLED=false
AGENT_SSE_ENABLED=true
AGENT_LLM_PROVIDER=anthropic
AGENT_LLM_MODEL=claude-sonnet-4-6
# Configure provider compatibility only via base URL + API key token.
# Example (DeepSeek Anthropic-compatible endpoint): https://api.deepseek.com/anthropic
AGENT_LLM_PLAN_MAX_TOKENS=8192
# For Anthropic-compatible providers, e.g. DeepSeek: https://api.deepseek.com/anthropic
# AGENT_LLM_BASE_URL=
# AGENT_LLM_API_KEY=
AGENT_MCP_ENDPOINTS=[]
Expand Down
5 changes: 5 additions & 0 deletions app/src/fileflash/agents/harness/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from .policy import PolicyDecision, PolicyGuard
from .prompt import PromptBuildRequest, PromptBuilder
from .router import ToolCall, ToolRouter
from .tool_registry import REGISTRY, ToolContext, ToolRegistry, ToolSpec

__all__ = [
"AgentEvent",
Expand All @@ -20,6 +21,10 @@
"PolicyGuard",
"PromptBuildRequest",
"PromptBuilder",
"REGISTRY",
"ToolCall",
"ToolContext",
"ToolRegistry",
"ToolRouter",
"ToolSpec",
]
128 changes: 128 additions & 0 deletions app/src/fileflash/agents/harness/ask.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
from __future__ import annotations

import asyncio
import contextlib
from datetime import UTC, datetime
from typing import Any

from sqlalchemy.ext.asyncio import AsyncSession

from ...repositories import AgentInboxMessageRepository
from .event_bus import AgentEventBus, AgentEventEnvelope, AgentEventStream


class AskTimedOut(Exception):
def __init__(self, *, ask_id: int) -> None:
super().__init__(f"Ask {ask_id} timed out")
self.ask_id = ask_id


class AskProtocol:
def __init__(
self,
*,
db: AsyncSession,
event_bus: AgentEventBus,
job_id: int,
) -> None:
self._db = db
self._bus = event_bus
self._job_id = job_id
self._repo = AgentInboxMessageRepository(db)
self._waiters: dict[int, asyncio.Future[Any]] = {}
self._sub_ctx = None
self._sub_stream: AgentEventStream | None = None
self._sub_task: asyncio.Task[None] | None = None

async def start(self) -> None:
self._sub_ctx = self._bus.subscribe(job_id=self._job_id)
self._sub_stream = await self._sub_ctx.__aenter__()
self._sub_task = asyncio.create_task(self._listen())

async def aclose(self) -> None:
if self._sub_task is not None:
self._sub_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self._sub_task
if self._sub_ctx is not None:
await self._sub_ctx.__aexit__(None, None, None)
for future in self._waiters.values():
if not future.done():
future.cancel()

async def ask(
self,
*,
prompt: str,
schema: dict[str, Any],
timeout_sec: float,
) -> Any:
msg = await self._repo.create_ask(
job_id=self._job_id,
payload={"prompt": prompt, "schema": schema, "timeoutSec": timeout_sec},
)
await self._db.commit()

ask_id = int(msg.inbox_message_id)
loop = asyncio.get_running_loop()
future: asyncio.Future[Any] = loop.create_future()
self._waiters[ask_id] = future

await self._bus.publish(
AgentEventEnvelope(
job_id=self._job_id,
event_type="agent.ask",
payload={
"messageId": str(ask_id),
"prompt": prompt,
"schema": schema,
"timeoutSec": timeout_sec,
},
emitted_at=datetime.now(UTC),
)
)

try:
value = await asyncio.wait_for(future, timeout=timeout_sec)
except TimeoutError as exc:
await self._repo.mark_timed_out(
inbox_message_id=ask_id,
answered_at=datetime.now(UTC),
)
await self._db.commit()
raise AskTimedOut(ask_id=ask_id) from exc
finally:
self._waiters.pop(ask_id, None)

await self._repo.mark_answered(
inbox_message_id=ask_id,
answered_at=datetime.now(UTC),
)
await self._db.commit()
return value

async def _listen(self) -> None:
assert self._sub_stream is not None
while True:
try:
envelope = await self._sub_stream.next(timeout=None)
except asyncio.CancelledError:
raise
except Exception:
continue
if envelope.event_type != "agent.inbox.reply":
continue
reply_to = envelope.payload.get("replyTo")
if reply_to is None:
continue
try:
ask_id = int(reply_to)
except (TypeError, ValueError):
continue
future = self._waiters.get(ask_id)
if future is None or future.done():
continue
future.set_result(envelope.payload.get("value"))


__all__ = ["AskProtocol", "AskTimedOut"]
194 changes: 194 additions & 0 deletions app/src/fileflash/agents/harness/event_bus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
from __future__ import annotations

import asyncio
import contextlib
import json
import logging
from collections.abc import AsyncIterator
from contextlib import AbstractAsyncContextManager
from dataclasses import asdict, dataclass
from datetime import datetime
from typing import Any, Protocol

from fastapi.encoders import jsonable_encoder
from redis.asyncio import Redis

from ...core.settings import Settings, get_settings

logger = logging.getLogger(__name__)


@dataclass(slots=True)
class AgentEventEnvelope:
job_id: int
event_type: str
payload: dict[str, Any]
emitted_at: datetime
event_id: str | None = None

def to_json(self) -> str:
body = jsonable_encoder(asdict(self))
return json.dumps(body, ensure_ascii=False, separators=(",", ":"))

@classmethod
def from_json(cls, raw: str) -> AgentEventEnvelope:
data = json.loads(raw)
return cls(
job_id=int(data["job_id"]),
event_type=str(data["event_type"]),
payload=dict(data.get("payload") or {}),
emitted_at=datetime.fromisoformat(data["emitted_at"]),
event_id=data.get("event_id"),
)


class AgentEventStream(Protocol):
async def next(self, *, timeout: float | None = None) -> AgentEventEnvelope: ...
async def aclose(self) -> None: ...


class AgentEventBus(Protocol):
async def publish(self, envelope: AgentEventEnvelope) -> None: ...

def subscribe(
self,
*,
job_id: int,
) -> AbstractAsyncContextManager[AgentEventStream]: ...


@dataclass(slots=True)
class _InMemoryStream:
queue: asyncio.Queue[AgentEventEnvelope]

async def next(self, *, timeout: float | None = None) -> AgentEventEnvelope:
if timeout is None:
return await self.queue.get()
return await asyncio.wait_for(self.queue.get(), timeout=timeout)

async def aclose(self) -> None:
return None


class InMemoryAgentEventBus:
def __init__(self, *, buffer_size: int = 64) -> None:
self._buffer = buffer_size
self._subscribers: dict[int, list[asyncio.Queue[AgentEventEnvelope]]] = {}

async def publish(self, envelope: AgentEventEnvelope) -> None:
queues = list(self._subscribers.get(envelope.job_id, []))
for queue in queues:
if queue.full():
logger.warning(
"InMemoryAgentEventBus dropped event: queue full job_id=%s",
envelope.job_id,
)
continue
await queue.put(envelope)

@contextlib.asynccontextmanager
async def subscribe(self, *, job_id: int) -> AsyncIterator[_InMemoryStream]:
queue: asyncio.Queue[AgentEventEnvelope] = asyncio.Queue(maxsize=self._buffer)
self._subscribers.setdefault(job_id, []).append(queue)
try:
yield _InMemoryStream(queue=queue)
finally:
subscribers = self._subscribers.get(job_id)
if subscribers is not None:
subscribers.remove(queue)
if not subscribers:
del self._subscribers[job_id]


class RedisAgentEventBus:
def __init__(
self,
*,
redis: Redis,
channel_prefix: str,
buffer_size: int = 64,
) -> None:
self._redis = redis
self._channel_prefix = channel_prefix
self._buffer = buffer_size

def _channel(self, job_id: int) -> str:
return f"{self._channel_prefix}:{job_id}:events"

async def publish(self, envelope: AgentEventEnvelope) -> None:
await self._redis.publish(self._channel(envelope.job_id), envelope.to_json())

@contextlib.asynccontextmanager
async def subscribe(self, *, job_id: int) -> AsyncIterator[_RedisStream]:
pubsub = self._redis.pubsub()
channel = self._channel(job_id)
await pubsub.subscribe(channel)
stream = _RedisStream(pubsub=pubsub)
try:
yield stream
finally:
await pubsub.unsubscribe(channel)
await pubsub.aclose()


@dataclass(slots=True)
class _RedisStream:
pubsub: Any

async def next(self, *, timeout: float | None = None) -> AgentEventEnvelope:
if timeout is None:
async for message in self.pubsub.listen():
envelope = _envelope_from_redis_message(message)
if envelope is not None:
return envelope
else:
message = await self.pubsub.get_message(
ignore_subscribe_messages=True,
timeout=timeout,
)
envelope = _envelope_from_redis_message(message)
if envelope is not None:
return envelope
raise TimeoutError("No event within timeout")

async def aclose(self) -> None:
await self.pubsub.aclose()


def _envelope_from_redis_message(message: Any) -> AgentEventEnvelope | None:
if message is None:
return None
message_type = message.get("type")
if message_type not in {"message", "pmessage"}:
return None
data = message.get("data")
if isinstance(data, bytes):
data = data.decode("utf-8")
return AgentEventEnvelope.from_json(str(data))


def build_agent_event_bus(
*,
settings: Settings | None = None,
redis: Redis | None = None,
) -> AgentEventBus:
cfg = settings or get_settings()
if redis is None:
if not cfg.redis_url:
return InMemoryAgentEventBus(buffer_size=cfg.agent_event_bus_buffer_size)
redis = Redis.from_url(cfg.redis_url, decode_responses=True)
return RedisAgentEventBus(
redis=redis,
channel_prefix=cfg.agent_event_channel_prefix,
buffer_size=cfg.agent_event_bus_buffer_size,
)


__all__ = [
"AgentEventBus",
"AgentEventEnvelope",
"AgentEventStream",
"InMemoryAgentEventBus",
"RedisAgentEventBus",
"build_agent_event_bus",
]
15 changes: 3 additions & 12 deletions app/src/fileflash/agents/harness/events.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,6 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import Any
from .event_bus import AgentEventBus as EventBus
from .event_bus import AgentEventEnvelope as AgentEvent


@dataclass(slots=True)
class AgentEvent:
event_type: str
payload: dict[str, Any]


class EventBus:
async def publish(self, event: AgentEvent) -> None:
raise NotImplementedError("EventBus is scaffolded only in this stage")
__all__ = ["AgentEvent", "EventBus"]
Loading
Loading