From 58d455436a81648b923bb54fe77e7ff57b6d21cc Mon Sep 17 00:00:00 2001 From: 0neStep <146049978+AperturePlus@users.noreply.github.com> Date: Tue, 26 May 2026 10:24:07 +0800 Subject: [PATCH 01/18] feat(agent): add search term support to drive.countFiles tool - Extract search terms from user count questions and pass as filename filter - Include search qualifier in execution answer text - Add deterministic count plan bypass that extracts search from input - Update mock to extract and reflect search terms --- app/src/fileflash/agents/harness/router.py | 3 +- .../agents/runtime/execute_runner.py | 43 ++++- .../fileflash/agents/runtime/plan_runner.py | 164 +++++++++++++---- app/tests/test_agent_plan_execute_runtime.py | 173 ++++++++++++++++++ web/src/mock/handlers/agent.ts | 56 +++++- 5 files changed, 391 insertions(+), 48 deletions(-) diff --git a/app/src/fileflash/agents/harness/router.py b/app/src/fileflash/agents/harness/router.py index 3c46f39..81a4dde 100644 --- a/app/src/fileflash/agents/harness/router.py +++ b/app/src/fileflash/agents/harness/router.py @@ -137,7 +137,7 @@ async def _count_files(self, args: dict[str, Any]) -> dict[str, Any]: folder_id = str(_first_value(args, "folderId", "parentFolderId") or "root") recursive = _bool_arg(args.get("recursive"), default=True) category = _normalize_category(args.get("category")) - search = str(args.get("search") or "").strip().lower() + search = str(args.get("search") or "").strip() root_folder_id = await _resolve_folder_id( self.db, user_id=self.user_id, @@ -208,6 +208,7 @@ async def _count_files(self, args: dict[str, Any]) -> dict[str, Any]: "category": category, "recursive": recursive, "folderId": str(root_folder_id), + "search": search or None, "byMimeType": dict(sorted(by_mime_type.items())), "sampleItems": sample_items, } diff --git a/app/src/fileflash/agents/runtime/execute_runner.py b/app/src/fileflash/agents/runtime/execute_runner.py index 499b489..7aa35bb 100644 --- a/app/src/fileflash/agents/runtime/execute_runner.py +++ b/app/src/fileflash/agents/runtime/execute_runner.py @@ -240,21 +240,50 @@ def _build_execution_answer( return _count_files_answer(output) if actions and all(action.side_effect == "read" for action in actions): - return f"已完成 {len(step_outputs)} 个只读操作。" + return _read_only_answer(actions=actions, step_outputs=step_outputs) return None def _count_files_answer(output: dict[str, Any]) -> str: total_items = int(output.get("totalItems") or 0) category = str(output.get("category") or "").strip().lower() + qualifier = _search_qualifier(output) if category == "video": - return f"你上传了 {total_items} 部电影(按视频文件统计)。" + return f"你上传了 {total_items} 部{qualifier}电影(按视频文件统计)。" if category == "audio": - return f"你上传了 {total_items} 个音频文件。" + return f"你上传了 {total_items} 个{qualifier}音频文件。" if category == "image": - return f"你上传了 {total_items} 张图片。" + return f"你上传了 {total_items} 张{qualifier}图片。" if category == "document": - return f"你上传了 {total_items} 个文档。" + return f"你上传了 {total_items} 个{qualifier}文档。" if category == "archive": - return f"你上传了 {total_items} 个压缩包。" - return f"你上传了 {total_items} 个文件。" + return f"你上传了 {total_items} 个{qualifier}压缩包。" + return f"你上传了 {total_items} 个{qualifier}文件。" + + +def _search_qualifier(output: dict[str, Any]) -> str: + search = str(output.get("search") or "").strip() + if not search: + return "" + return f"名称包含“{search}”的" + + +def _read_only_answer( + *, + actions: list[AgentProposedAction], + step_outputs: dict[int, dict[str, Any]], +) -> str: + for action in actions: + output = step_outputs.get(action.step) + if not isinstance(output, dict): + continue + if action.tool == "drive.listFolder": + pagination = output.get("pagination") + total_items = None + if isinstance(pagination, dict): + total_items = pagination.get("totalItems") + if total_items is None: + items = output.get("items") + total_items = len(items) if isinstance(items, list) else 0 + return f"已读取当前文件夹,共 {int(total_items or 0)} 个项目。" + return "查询已完成,但没有可展示的结果。" diff --git a/app/src/fileflash/agents/runtime/plan_runner.py b/app/src/fileflash/agents/runtime/plan_runner.py index e3421d0..96ce41e 100644 --- a/app/src/fileflash/agents/runtime/plan_runner.py +++ b/app/src/fileflash/agents/runtime/plan_runner.py @@ -2,6 +2,7 @@ import hashlib import json +import re from datetime import UTC, datetime from typing import Any @@ -64,26 +65,32 @@ async def run(self, *, db: AsyncSession, job: BackgroundJob) -> AgentPlanResult: ) metadata = await _collect_context_metadata(db, user_id=user_id, request=request) allowed_tools = _skill_tool_whitelist(skill) - try: - llm_payload = await self.planner_client.create_plan( - system_prompt=_system_prompt(), - user_prompt=_user_prompt( - request=request, - skill=skill, - allowed_tools=allowed_tools, - metadata=metadata, - ), - max_tokens=request.hints.budget_tokens, - reasoning_effort=request.hints.reasoning_effort, - ) - except ApiError as exc: - if exc.status_code != 502: - raise - llm_payload = _safe_fallback_payload( + if "drive.countFiles" in allowed_tools and _looks_like_count_question(request.input): + llm_payload = _count_question_payload( request=request, metadata=metadata, - allowed_tools=allowed_tools, ) + else: + try: + llm_payload = await self.planner_client.create_plan( + system_prompt=_system_prompt(), + user_prompt=_user_prompt( + request=request, + skill=skill, + allowed_tools=allowed_tools, + metadata=metadata, + ), + max_tokens=request.hints.budget_tokens, + reasoning_effort=request.hints.reasoning_effort, + ) + except ApiError as exc: + if exc.status_code != 502: + raise + llm_payload = _safe_fallback_payload( + request=request, + metadata=metadata, + allowed_tools=allowed_tools, + ) actions = _normalize_actions( llm_payload=llm_payload, @@ -453,7 +460,8 @@ def _tool_schemas(allowed_tools: tuple[str, ...]) -> list[dict[str, Any]]: "drive.listFolder": "List direct folder contents by folderId.", "drive.countFiles": ( "Count files under folderId. Supports recursive=true and category values " - "video, audio, image, document, archive, other. Use category=video for movie/电影 questions." + "video, audio, image, document, archive, other. Supports search for file-name " + "contains filters. Use category=video for movie/电影/几部 questions." ), "drive.createFolder": "Create a folder under parentFolderId with name.", "drive.moveFile": "Move fileId into targetFolderId.", @@ -474,24 +482,7 @@ def _safe_fallback_payload( ) -> dict[str, Any]: fallback_actions: list[dict[str, Any]] = [] if "drive.countFiles" in allowed_tools and _looks_like_count_question(request.input): - fallback_actions.append( - { - "step": 1, - "tool": "drive.countFiles", - "input": { - "folderId": metadata.get("rootFolderId") or request.context.root_folder_id or "root", - "recursive": True, - "category": _fallback_count_category(request.input), - }, - "sideEffect": "read", - "riskLevel": "low", - "requiresConfirmation": False, - } - ) - return { - "summary": "Planner fallback mode: generated a safe read-only count plan.", - "proposedActions": fallback_actions, - } + return _count_question_payload(request=request, metadata=metadata) if "drive.listFolder" in allowed_tools: root_folder_id = str( metadata.get("rootFolderId") @@ -519,9 +510,37 @@ def _looks_like_count_question(text: str) -> bool: return any(token in normalized for token in ("多少", "几个", "几部", "count", "how many", "number of")) +def _count_question_payload( + *, + request: PlanAgentRequest, + metadata: dict[str, Any], +) -> dict[str, Any]: + action_input: dict[str, Any] = { + "folderId": metadata.get("rootFolderId") or request.context.root_folder_id or "root", + "recursive": True, + "category": _fallback_count_category(request.input), + } + search = _fallback_count_search(request.input) + if search: + action_input["search"] = search + return { + "summary": "已准备按文件名和类型统计你的文件。", + "proposedActions": [ + { + "step": 1, + "tool": "drive.countFiles", + "input": action_input, + "sideEffect": "read", + "riskLevel": "low", + "requiresConfirmation": False, + } + ], + } + + def _fallback_count_category(text: str) -> str | None: normalized = text.lower() - if any(token in normalized for token in ("电影", "影片", "视频", "movie", "film", "video")): + if any(token in normalized for token in ("电影", "影片", "视频", "几部", "movie", "film", "video")): return "video" if any(token in normalized for token in ("图片", "照片", "image", "photo", "picture")): return "image" @@ -534,6 +553,77 @@ def _fallback_count_category(text: str) -> str | None: return None +def _fallback_count_search(text: str) -> str | None: + cleaned = re.sub(r"[??!!。.,,;;::]", " ", text).strip() + candidate = cleaned + for phrase in ( + "我上传了多少部", + "我上传了多少个", + "我上传了几部", + "我上传了几个", + "上传了多少部", + "上传了多少个", + "上传了几部", + "上传了几个", + "有多少部", + "有多少个", + "有几部", + "有几个", + ): + candidate = candidate.replace(phrase, " ") + for token in ( + "我", + "上传", + "了", + "有", + "多少", + "几个", + "几部", + "多少部", + "多少个", + "部", + "个", + "文件", + "电影", + "影片", + "视频", + "音频", + "音乐", + "图片", + "照片", + "文档", + "压缩包", + ): + candidate = candidate.replace(token, " ") + candidate = " ".join(candidate.split()).strip() + if candidate: + return candidate + + english = cleaned.lower() + for phrase in ( + "how many", + "number of", + "did i upload", + "have i uploaded", + "i uploaded", + "uploaded", + "upload", + "movies", + "movie", + "films", + "film", + "videos", + "video", + "files", + "file", + "are there", + "do i have", + ): + english = english.replace(phrase, " ") + english = " ".join(english.split()).strip() + return english or None + + def _normalize_actions( *, llm_payload: dict[str, Any], diff --git a/app/tests/test_agent_plan_execute_runtime.py b/app/tests/test_agent_plan_execute_runtime.py index 46fd8ef..8df67ba 100644 --- a/app/tests/test_agent_plan_execute_runtime.py +++ b/app/tests/test_agent_plan_execute_runtime.py @@ -682,6 +682,58 @@ async def test_plan_runner_fallback_uses_count_files_for_movie_count_question( assert result.proposed_actions[0].side_effect == "read" +@pytest.mark.asyncio +async def test_plan_runner_uses_deterministic_count_plan_with_search_term( + monkeypatch: pytest.MonkeyPatch, +): + monkeypatch.setattr(plan_module, "_choose_skill", AsyncMock(return_value=None)) + monkeypatch.setattr( + plan_module, + "_collect_context_metadata", + AsyncMock(return_value={"scope": "currentFolder", "rootFolderId": "root", "files": [], "folders": []}), + ) + monkeypatch.setattr(plan_module, "_upsert_agent_plan", AsyncMock(return_value=None)) + + planner = AsyncMock(return_value={"summary": "wrong", "proposedActions": []}) + runner = PlanRunner( + settings=settings(), + planner_client=SimpleNamespace(create_plan=planner), # type: ignore[arg-type] + ) + request = PlanAgentRequest.model_validate( + { + "input": "我上传了几部银翼杀手?", + "context": { + "rootFolderId": "root", + "selectedFileIds": [], + "selectedFolderIds": [], + "currentPath": "/My Files", + }, + "executionPolicy": "confirm", + } + ) + job = BackgroundJob( + job_id=335, + task_type="agent.plan", + status="running", + payload=request.model_dump(by_alias=True), + result={}, + requested_by=7, + scheduled_at=datetime.now(UTC), + created_at=datetime.now(UTC), + updated_at=datetime.now(UTC), + ) + + result = await runner.run(db=DummyDb(), job=job) # type: ignore[arg-type] + + planner.assert_not_awaited() + assert len(result.proposed_actions) == 1 + action = result.proposed_actions[0] + assert action.tool == "drive.countFiles" + assert action.input["category"] == "video" + assert action.input["search"] == "银翼杀手" + assert action.side_effect == "read" + + def test_normalize_actions_rejects_symbolic_placeholder_target_folder(): with pytest.raises(ApiError) as exc: plan_module._normalize_actions( @@ -837,6 +889,38 @@ async def test_tool_router_count_files_counts_recursive_videos(): assert "file.is_latest" in executed_statement +@pytest.mark.asyncio +async def test_tool_router_count_files_filters_by_search_term(): + db = DummyDb() + db.scalar = AsyncMock(return_value=1) + db.scalars = AsyncMock(return_value=[1]) + db.execute = AsyncMock( + return_value=SimpleNamespace( + all=lambda: [ + (10, "银翼杀手.mp4", 100, "video/mp4", "mp4", 1), + ] + ) + ) + router = ToolRouter(db=db, user_id=7) # type: ignore[arg-type] + + result = await router.dispatch( + ToolCall( + tool_name="drive.countFiles", + arguments={ + "folderId": "root", + "recursive": True, + "category": "video", + "search": "银翼杀手", + }, + ) + ) + + assert result["totalItems"] == 1 + assert result["search"] == "银翼杀手" + executed_statement = str(db.execute.await_args.args[0]) + assert "file_name" in executed_statement + + @pytest.mark.asyncio async def test_execute_runner_normalizes_tool_output_before_action_log(monkeypatch: pytest.MonkeyPatch): started = datetime.now(UTC) @@ -993,3 +1077,92 @@ async def test_execute_runner_returns_count_files_answer(monkeypatch: pytest.Mon assert result.answer == "你上传了 3 部电影(按视频文件统计)。" assert result.applied_actions == 1 + + +@pytest.mark.asyncio +async def test_execute_runner_returns_count_files_answer_with_search_term( + monkeypatch: pytest.MonkeyPatch, +): + started = datetime.now(UTC) + job = BackgroundJob( + job_id=602, + task_type="agent.execute", + status="running", + payload={ + "planJobId": "502", + "planHash": "sha256:test", + "approval": { + "confirmedBy": "7", + "confirmedAt": started.isoformat(), + "highRiskConfirmed": False, + }, + }, + result={}, + requested_by=7, + scheduled_at=started, + created_at=started, + updated_at=started, + ) + action = { + "step": 1, + "tool": "drive.countFiles", + "input": { + "folderId": "root", + "recursive": True, + "category": "video", + "search": "银翼杀手", + }, + "sideEffect": "read", + "riskLevel": "low", + "requiresConfirmation": False, + } + db = DummyDb() + db.refresh = AsyncMock() + + monkeypatch.setattr( + execute_module, + "AgentPlanRepository", + lambda _db: SimpleNamespace( + get_for_execute_binding=AsyncMock( + return_value=SimpleNamespace(proposed_actions_json=[action]) + ) + ), + ) + monkeypatch.setattr( + execute_module, + "AgentWorkSessionRepository", + lambda _db: SimpleNamespace( + create_for_job=AsyncMock(return_value=None), + close_session=AsyncMock(return_value=None), + ), + ) + monkeypatch.setattr( + execute_module, + "AgentActionLogRepository", + lambda _db: SimpleNamespace( + append_step=AsyncMock(return_value=None), + finish_step=AsyncMock(return_value=None), + ), + ) + monkeypatch.setattr( + execute_module, + "ToolRouter", + lambda **kwargs: SimpleNamespace( + dispatch=AsyncMock( + return_value={ + "totalItems": 2, + "category": "video", + "recursive": True, + "folderId": "1", + "search": "银翼杀手", + "byMimeType": {"video/mp4": 2}, + "sampleItems": [], + } + ) + ), + ) + + result = await ExecuteRunner().run(db=db, job=job) # type: ignore[arg-type] + + assert result.answer == "你上传了 2 部名称包含“银翼杀手”的电影(按视频文件统计)。" + assert "只读操作" not in (result.answer or "") diff --git a/web/src/mock/handlers/agent.ts b/web/src/mock/handlers/agent.ts index 09feeeb..be704e3 100644 --- a/web/src/mock/handlers/agent.ts +++ b/web/src/mock/handlers/agent.ts @@ -13,6 +13,52 @@ const nowIso = () => new Date().toISOString(); const isTerminal = (status: string) => ['succeeded', 'failed', 'canceled'].includes(status); +const extractCountSearch = (input: string) => { + let text = input.replace(/[??!!。.,,;;::]/g, ' ').trim(); + [ + '我上传了多少部', + '我上传了多少个', + '我上传了几部', + '我上传了几个', + '上传了多少部', + '上传了多少个', + '上传了几部', + '上传了几个', + '有多少部', + '有多少个', + '有几部', + '有几个', + ].forEach((phrase) => { + text = text.split(phrase).join(' '); + }); + [ + '我', + '上传', + '了', + '有', + '多少', + '几个', + '几部', + '多少部', + '多少个', + '部', + '个', + '文件', + '电影', + '影片', + '视频', + '音频', + '音乐', + '图片', + '照片', + '文档', + '压缩包', + ].forEach((token) => { + text = text.split(token).join(' '); + }); + return text.split(/\s+/).filter(Boolean).join(' ') || undefined; +}; + const pickPlanActions = (input: string): AgentProposedAction[] => { const normalized = input.toLowerCase(); if ( @@ -33,9 +79,10 @@ const pickPlanActions = (input: string): AgentProposedAction[] => { folderId: 'root', recursive: true, category: - normalized.includes('电影') || normalized.includes('视频') || normalized.includes('movie') + normalized.includes('电影') || normalized.includes('视频') || normalized.includes('几部') || normalized.includes('movie') ? 'video' : undefined, + search: extractCountSearch(input), }, }, ]; @@ -272,10 +319,13 @@ const scheduleExecuteLifecycle = (job: AgentBackgroundJob, plan: AgentPlanResult const mockExecutionAnswer = (plan: AgentPlanResult) => { const countAction = plan.proposedActions.find((action) => action.tool === 'drive.countFiles'); if (!countAction) return null; + const search = String(countAction.input.search || '').trim(); + const qualifier = search ? `名称包含“${search}”的` : ''; if (countAction.input.category === 'video') { - return '你上传了 7 部电影(按视频文件统计)。'; + const total = search === '银翼杀手' ? 2 : 7; + return `你上传了 ${total} 部${qualifier}电影(按视频文件统计)。`; } - return '你上传了 12 个文件。'; + return `你上传了 12 个${qualifier}文件。`; }; export const setupAgentMocks = () => { From baa3a1634a20b049143f21ccb3404d7cac67c762 Mon Sep 17 00:00:00 2001 From: 0neStep <146049978+AperturePlus@users.noreply.github.com> Date: Tue, 26 May 2026 10:24:16 +0800 Subject: [PATCH 02/18] feat(agent): add SSE event streaming for agent job lifecycle - Add GET /agent/jobs/{jobId}/events SSE endpoint that streams job status, plan ready, and tool execution events - Add AgentJobEvent schema with event types for job and tool lifecycle - Implement frontend SSE parser and streamAgentJobEvents API helper - Integrate streaming into useAgentSession with polling fallback - Stream events update ChatMessage events array, status, planResult, and executeResult in real-time --- app/src/fileflash/routers/agent.py | 248 +++++++++++++++++++- app/src/fileflash/schemas/__init__.py | 2 + app/src/fileflash/schemas/agent.py | 25 ++ app/tests/test_agent_routes.py | 61 ++++- web/src/api/agent.spec.ts | 47 ++++ web/src/api/agent.ts | 76 ++++++ web/src/composables/useAgentSession.spec.ts | 81 ++++++- web/src/composables/useAgentSession.ts | 121 +++++++++- web/src/types/agent.d.ts | 22 ++ 9 files changed, 670 insertions(+), 13 deletions(-) create mode 100644 web/src/api/agent.spec.ts diff --git a/app/src/fileflash/routers/agent.py b/app/src/fileflash/routers/agent.py index 2e56108..b017eaf 100644 --- a/app/src/fileflash/routers/agent.py +++ b/app/src/fileflash/routers/agent.py @@ -1,21 +1,25 @@ from __future__ import annotations +import asyncio +import json from datetime import UTC, datetime from typing import Annotated from fastapi import APIRouter, Depends +from fastapi.responses import StreamingResponse from sqlalchemy import and_, select from sqlalchemy.ext.asyncio import AsyncSession from ..core.deps import get_agent_execute_service, get_agent_plan_service, get_current_user from ..core.errors import ApiError, api_success from ..db.deps import get_db -from ..models import BackgroundJob +from ..models import AgentActionLog, BackgroundJob from ..models.tables_identity import User -from ..schemas.agent import CancelAgentResponse, ExecuteAgentRequest, PlanAgentRequest +from ..schemas.agent import AgentJobEvent, CancelAgentResponse, ExecuteAgentRequest, PlanAgentRequest from ..services.agent import ExecuteService, PlanService router = APIRouter(prefix="/agent", tags=["agent"]) +AGENT_EVENT_POLL_INTERVAL_SEC = 0.6 @router.post("/plan") @@ -44,6 +48,51 @@ async def execute_agent_plan( ) +@router.get("/jobs/{job_id}/events") +async def stream_agent_job_events( + job_id: str, + current_user: Annotated[User, Depends(get_current_user)], + db: Annotated[AsyncSession, Depends(get_db)], +): + parsed_job_id = _parse_job_id(job_id) + initial_events, initial_terminal = await _agent_job_events_for_job( + db=db, + job_id=parsed_job_id, + user_id=int(current_user.user_id), + ) + + async def event_stream(): + seen: set[str] = set() + for event in initial_events: + seen.add(event.id) + yield _format_sse_event(event) + if initial_terminal: + return + while True: + events, terminal = await _agent_job_events_for_job( + db=db, + job_id=parsed_job_id, + user_id=int(current_user.user_id), + ) + for event in events: + if event.id in seen: + continue + seen.add(event.id) + yield _format_sse_event(event) + if terminal: + break + await asyncio.sleep(AGENT_EVENT_POLL_INTERVAL_SEC) + + return StreamingResponse( + event_stream(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "X-Accel-Buffering": "no", + }, + ) + + @router.post("/cancel/{job_id}") async def cancel_agent_job( job_id: str, @@ -86,4 +135,199 @@ async def cancel_agent_job( return api_success(data=data.model_dump(by_alias=True), message="Job canceled") +def _parse_job_id(raw: str) -> int: + try: + parsed_job_id = int(raw) + except ValueError as exc: + raise ApiError(status_code=400, code=400, message="Invalid jobId") from exc + if parsed_job_id <= 0: + raise ApiError(status_code=400, code=400, message="Invalid jobId") + return parsed_job_id + + +async def _agent_job_events_for_job( + *, + db: AsyncSession, + job_id: int, + user_id: int, +) -> tuple[list[AgentJobEvent], bool]: + job = await db.scalar( + select(BackgroundJob).where( + and_( + BackgroundJob.job_id == job_id, + BackgroundJob.requested_by == user_id, + BackgroundJob.task_type.in_(["agent.plan", "agent.execute"]), + ) + ) + ) + if job is None: + raise ApiError(status_code=404, code=404, message="Job not found") + + terminal = str(job.status) in {"succeeded", "failed", "canceled"} + events: list[AgentJobEvent] = [] + if job.task_type == "agent.plan" and job.status == "succeeded" and job.result: + events.append(_plan_ready_event(job)) + events.append(_job_status_event(job)) + elif job.task_type != "agent.execute" or not terminal: + events.append(_job_status_event(job)) + + if job.task_type == "agent.execute": + action_logs = list( + await db.scalars( + select(AgentActionLog) + .where(AgentActionLog.job_id == job_id) + .order_by(AgentActionLog.step_no.asc()) + ) + ) + for action_log in action_logs: + events.extend(_tool_events(job=job, action_log=action_log)) + if terminal: + events.append(_job_status_event(job)) + + return events, terminal + + +def _job_status_event(job: BackgroundJob) -> AgentJobEvent: + status = str(job.status) + event_type = { + "pending": "job.queued", + "running": "job.running", + "succeeded": "job.succeeded", + "failed": "job.failed", + "canceled": "job.canceled", + }.get(status, "job.running") + timestamp = job.updated_at or job.created_at + return AgentJobEvent( + id=f"{job.job_id}:job:{status}:{timestamp.isoformat()}", + job_id=str(job.job_id), + task_type=str(job.task_type), + type=event_type, # type: ignore[arg-type] + status=status, + agent_phase=job.agent_phase, + message=_job_status_message(job), + data=_job_status_data(job), + timestamp=timestamp, + ) + + +def _plan_ready_event(job: BackgroundJob) -> AgentJobEvent: + timestamp = job.finished_at or job.updated_at or job.created_at + return AgentJobEvent( + id=f"{job.job_id}:plan-ready", + job_id=str(job.job_id), + task_type=str(job.task_type), + type="plan.ready", + status=str(job.status), + agent_phase=job.agent_phase, + message="计划已生成。", + data={"result": dict(job.result or {})}, + timestamp=timestamp, + ) + + +def _tool_events(*, job: BackgroundJob, action_log: AgentActionLog) -> list[AgentJobEvent]: + events = [ + AgentJobEvent( + id=f"{job.job_id}:tool:{action_log.action_log_id}:started", + job_id=str(job.job_id), + task_type=str(job.task_type), + type="tool.started", + status=str(job.status), + agent_phase=job.agent_phase, + message=_tool_started_message(action_log), + data=_tool_event_data(action_log, include_output=False), + timestamp=action_log.started_at, + ) + ] + if action_log.status in {"succeeded", "failed"} and action_log.finished_at is not None: + events.append( + AgentJobEvent( + id=f"{job.job_id}:tool:{action_log.action_log_id}:{action_log.status}", + job_id=str(job.job_id), + task_type=str(job.task_type), + type="tool.succeeded" if action_log.status == "succeeded" else "tool.failed", + status=str(job.status), + agent_phase=job.agent_phase, + message=_tool_finished_message(action_log), + data=_tool_event_data(action_log, include_output=True), + timestamp=action_log.finished_at, + ) + ) + return events + + +def _job_status_message(job: BackgroundJob) -> str: + status = str(job.status) + if status == "pending": + return "任务已排队。" + if status == "running": + return "正在规划任务。" if job.task_type == "agent.plan" else "正在执行计划。" + if status == "succeeded": + result = dict(job.result or {}) + answer = result.get("answer") + if isinstance(answer, str) and answer.strip(): + return "答案已生成。" + return "任务已完成。" + if status == "failed": + return str(job.error_message or "任务失败。") + if status == "canceled": + return "任务已取消。" + return "任务状态已更新。" + + +def _job_status_data(job: BackgroundJob) -> dict[str, object]: + data: dict[str, object] = {} + if job.status in {"succeeded", "failed", "canceled"}: + data["result"] = dict(job.result or {}) + if job.error_message: + data["errorMessage"] = job.error_message + return data + + +def _tool_started_message(action_log: AgentActionLog) -> str: + if action_log.tool_name == "drive.countFiles": + inputs = dict(action_log.inputs_json or {}) + search = str(inputs.get("search") or "").strip() + category = str(inputs.get("category") or "").strip() + target = "视频文件" if category == "video" else "文件" + if search: + return f"正在读取名称包含“{search}”的{target}数量。" + return f"正在读取{target}数量。" + return f"正在调用 {action_log.tool_name}。" + + +def _tool_finished_message(action_log: AgentActionLog) -> str: + if action_log.status == "failed": + return str(action_log.error_message or f"{action_log.tool_name} 调用失败。") + if action_log.tool_name == "drive.countFiles": + outputs = dict(action_log.outputs_json or {}) + total_items = int(outputs.get("totalItems") or 0) + return f"读取完成,匹配 {total_items} 个文件。" + return f"{action_log.tool_name} 已完成。" + + +def _tool_event_data(action_log: AgentActionLog, *, include_output: bool) -> dict[str, object]: + data: dict[str, object] = { + "step": int(action_log.step_no), + "tool": str(action_log.tool_name), + "input": dict(action_log.inputs_json or {}), + } + if include_output: + data["output"] = dict(action_log.outputs_json or {}) + if action_log.duration_ms is not None: + data["durationMs"] = int(action_log.duration_ms) + if action_log.error_message: + data["errorMessage"] = action_log.error_message + return data + + +def _format_sse_event(event: AgentJobEvent) -> str: + payload = event.model_dump(by_alias=True, mode="json") + return ( + f"id: {event.id}\n" + f"event: {event.type}\n" + f"data: {json.dumps(payload, ensure_ascii=False, separators=(',', ':'))}\n\n" + ) + + __all__ = ["router"] diff --git a/app/src/fileflash/schemas/__init__.py b/app/src/fileflash/schemas/__init__.py index c970aae..58714ab 100644 --- a/app/src/fileflash/schemas/__init__.py +++ b/app/src/fileflash/schemas/__init__.py @@ -25,6 +25,7 @@ AgentDataPolicy, AgentExecutionResult, AgentHints, + AgentJobEvent, AgentPlanContext, AgentPlanResult, AgentProposedAction, @@ -178,6 +179,7 @@ "AgentDataPolicy", "AgentExecutionResult", "AgentHints", + "AgentJobEvent", "AgentPlanContext", "AgentPlanResult", "AgentProposedAction", diff --git a/app/src/fileflash/schemas/agent.py b/app/src/fileflash/schemas/agent.py index e253be0..31e503f 100644 --- a/app/src/fileflash/schemas/agent.py +++ b/app/src/fileflash/schemas/agent.py @@ -20,6 +20,17 @@ "failed", "canceled", ] +AgentJobEventType = Literal[ + "job.queued", + "job.running", + "plan.ready", + "tool.started", + "tool.succeeded", + "tool.failed", + "job.succeeded", + "job.failed", + "job.canceled", +] class AgentDataPolicy(CamelModel): @@ -123,6 +134,18 @@ class AgentExecutionResult(CamelModel): finished_at: datetime +class AgentJobEvent(CamelModel): + id: str + job_id: str + task_type: str + type: AgentJobEventType + status: str + agent_phase: str | None = None + message: str + data: dict[str, Any] = Field(default_factory=dict) + timestamp: datetime + + __all__ = [ "AgentActionSideEffect", "AgentApproval", @@ -133,6 +156,8 @@ class AgentExecutionResult(CamelModel): "AgentExecutionResult", "AgentHints", "AgentJobPhase", + "AgentJobEvent", + "AgentJobEventType", "AgentPlanContext", "AgentPlanResult", "AgentProposedAction", diff --git a/app/tests/test_agent_routes.py b/app/tests/test_agent_routes.py index 6b50f90..27f4009 100644 --- a/app/tests/test_agent_routes.py +++ b/app/tests/test_agent_routes.py @@ -8,7 +8,7 @@ from fileflash.core.deps import get_agent_execute_service, get_agent_plan_service, get_current_user from fileflash.core.errors import ApiError, api_error_handler from fileflash.db.deps import get_db -from fileflash.models import BackgroundJob +from fileflash.models import AgentActionLog, BackgroundJob from fileflash.models.tables_identity import User from fileflash.routers.agent import router from fileflash.schemas.agent import ExecuteAgentResponse, PlanAgentResponse @@ -55,6 +55,40 @@ def __init__(self) -> None: self.job.status = "running" +class EventsDb(StubDb): + def __init__(self) -> None: + super().__init__() + now = datetime.now(UTC) + self.job.status = "succeeded" + self.job.result = { + "planJobId": "10", + "executeJobId": "12", + "summary": "done", + "answer": "你上传了 2 部名称包含“银翼杀手”的电影(按视频文件统计)。", + "appliedActions": 1, + "skippedActions": 0, + "warnings": [], + "finishedAt": now.isoformat(), + } + self.job.finished_at = now + self.job.updated_at = now + self.action_log = AgentActionLog( + action_log_id=1, + job_id=12, + step_no=1, + tool_name="drive.countFiles", + inputs_json={"folderId": "root", "category": "video", "search": "银翼杀手"}, + outputs_json={"totalItems": 2, "category": "video", "search": "银翼杀手"}, + status="succeeded", + duration_ms=12, + started_at=now, + finished_at=now, + ) + + async def scalars(self, _query): # noqa: ANN001 + return [self.action_log] + + def _user() -> User: return User(user_id=7, username="u7", email="u7@example.com", password_hash="x") @@ -81,6 +115,17 @@ def _client_with_running_job() -> TestClient: return TestClient(app) +def _client_with_events() -> TestClient: + app = FastAPI() + app.include_router(router, prefix="/api/v1") + app.add_exception_handler(ApiError, api_error_handler) + app.dependency_overrides[get_current_user] = _user + app.dependency_overrides[get_agent_plan_service] = lambda: StubPlanService() + app.dependency_overrides[get_agent_execute_service] = lambda: StubExecuteService() + app.dependency_overrides[get_db] = lambda: EventsDb() + return TestClient(app) + + def test_plan_route_returns_response_shell(): response = _client().post( "/api/v1/agent/plan", @@ -150,3 +195,17 @@ def test_cancel_route_marks_running_job_as_canceled(): assert body["success"] is True assert body["data"]["jobId"] == "12" assert body["data"]["status"] == "canceled" + + +def test_job_events_route_streams_tool_and_final_answer_events(): + response = _client_with_events().get("/api/v1/agent/jobs/12/events") + + assert response.status_code == 200 + assert response.headers["content-type"].startswith("text/event-stream") + body = response.text + assert "event: tool.started" in body + assert "event: tool.succeeded" in body + assert "event: job.succeeded" in body + assert "正在读取名称包含" in body + assert "银翼杀手" in body + assert "answer" in body diff --git a/web/src/api/agent.spec.ts b/web/src/api/agent.spec.ts new file mode 100644 index 0000000..0466f2d --- /dev/null +++ b/web/src/api/agent.spec.ts @@ -0,0 +1,47 @@ +import { describe, expect, it, vi, beforeEach, afterEach } from 'vitest'; + +vi.mock('../store/user', () => ({ + useUserStore: () => ({ token: 'test-token' }), +})); + +import { createAgentSseParser, streamAgentJobEvents } from './agent'; +import type { AgentJobEvent } from '../types/agent'; + +describe('api/agent event stream helpers', () => { + const originalFetch = globalThis.fetch; + + beforeEach(() => { + vi.restoreAllMocks(); + }); + + afterEach(() => { + globalThis.fetch = originalFetch; + }); + + it('parses SSE events across multiple chunks', () => { + const events: AgentJobEvent[] = []; + const parser = createAgentSseParser((event) => events.push(event)); + + parser.feed('event: job.running\n'); + parser.feed( + 'data: {"id":"1","jobId":"j1","taskType":"agent.plan","type":"job.running","status":"running","message":"正在规划","data":{},"timestamp":"2026-05-20T00:00:00Z"}\n\n', + ); + parser.feed( + 'event: plan.ready\ndata: {"id":"2","jobId":"j1","taskType":"agent.plan","type":"plan.ready","status":"succeeded","message":"计划已生成","data":{"result":{"planHash":"h1"}},"timestamp":"2026-05-20T00:00:01Z"}\n\n', + ); + parser.flush(); + + expect(events).toHaveLength(2); + expect(events[0].type).toBe('job.running'); + expect(events[1].data.result.planHash).toBe('h1'); + }); + + it('throws on stream setup failure so callers can fall back to polling', async () => { + globalThis.fetch = vi.fn().mockResolvedValue({ + ok: false, + status: 503, + } as Response) as unknown as typeof fetch; + + await expect(streamAgentJobEvents('job-1')).rejects.toThrow('503'); + }); +}); diff --git a/web/src/api/agent.ts b/web/src/api/agent.ts index b05f22a..7a16b4f 100644 --- a/web/src/api/agent.ts +++ b/web/src/api/agent.ts @@ -1,6 +1,8 @@ import http from '../utils/http'; +import { useUserStore } from '../store/user'; import type { AgentBackgroundJob, + AgentJobEvent, CancelAgentResponse, ExecuteAgentRequest, ExecuteAgentResponse, @@ -23,3 +25,77 @@ export const cancelAgentJob = (jobId: string) => { export const getAgentJob = >(jobId: string) => { return http.get>(`/jobs/${encodeURIComponent(jobId)}`); }; + +export interface AgentJobEventHandlers { + onEvent?: (event: AgentJobEvent) => void; +} + +export const createAgentSseParser = (onEvent: (event: AgentJobEvent) => void) => { + let buffer = ''; + + const parseBlock = (block: string) => { + const dataLines = block + .split('\n') + .filter((line) => line.startsWith('data:')) + .map((line) => line.slice(5).trimStart()); + if (!dataLines.length) return; + const raw = dataLines.join('\n').trim(); + if (!raw) return; + onEvent(JSON.parse(raw) as AgentJobEvent); + }; + + const feed = (chunk: string) => { + buffer += chunk.replace(/\r\n/g, '\n'); + let boundary = buffer.indexOf('\n\n'); + while (boundary >= 0) { + const block = buffer.slice(0, boundary); + buffer = buffer.slice(boundary + 2); + parseBlock(block); + boundary = buffer.indexOf('\n\n'); + } + }; + + const flush = () => { + if (!buffer.trim()) return; + parseBlock(buffer); + buffer = ''; + }; + + return { feed, flush }; +}; + +export const streamAgentJobEvents = async ( + jobId: string, + handlers: AgentJobEventHandlers = {}, + signal?: AbortSignal, +) => { + const userStore = useUserStore(); + const baseUrl = (import.meta.env.VITE_BASE_URL || '/api/v1').replace(/\/$/, ''); + const headers: Record = { Accept: 'text/event-stream' }; + if (userStore.token) { + headers.Authorization = `Bearer ${userStore.token}`; + } + const response = await fetch(`${baseUrl}/agent/jobs/${encodeURIComponent(jobId)}/events`, { + method: 'GET', + headers, + credentials: 'include', + signal, + }); + if (!response.ok) { + throw new Error(`Agent event stream failed: ${response.status}`); + } + if (!response.body) { + throw new Error('Agent event stream is not readable'); + } + + const parser = createAgentSseParser((event) => handlers.onEvent?.(event)); + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + while (true) { + const { value, done } = await reader.read(); + if (done) break; + parser.feed(decoder.decode(value, { stream: true })); + } + parser.feed(decoder.decode()); + parser.flush(); +}; diff --git a/web/src/composables/useAgentSession.spec.ts b/web/src/composables/useAgentSession.spec.ts index ef1b101..e5c9618 100644 --- a/web/src/composables/useAgentSession.spec.ts +++ b/web/src/composables/useAgentSession.spec.ts @@ -6,6 +6,7 @@ vi.mock('../api/agent', () => ({ executeAgentPlan: vi.fn(), cancelAgentJob: vi.fn(), getAgentJob: vi.fn(), + streamAgentJobEvents: vi.fn(), })); vi.mock('../store/user', () => ({ @@ -93,6 +94,7 @@ describe('useAgentSession', () => { vi.clearAllMocks(); localStorage.clear(); vi.useFakeTimers(); + vi.mocked(agentApi.streamAgentJobEvents).mockRejectedValue(new Error('stream unavailable')); }); afterEach(async () => { @@ -291,6 +293,80 @@ describe('useAgentSession', () => { expect(turn.agent.status).toBe('succeeded'); }); + it('uses streamed plan and execute events when available', async () => { + vi.mocked(agentApi.planAgentTask).mockResolvedValue({ + jobId: 'job-1', + status: 'pending', + taskType: 'agent.plan', + }); + vi.mocked(agentApi.executeAgentPlan).mockResolvedValue({ + jobId: 'job-2', + status: 'pending', + taskType: 'agent.execute', + }); + vi.mocked(agentApi.streamAgentJobEvents) + .mockImplementationOnce(async (_jobId, handlers) => { + handlers?.onEvent?.({ + id: 'plan-ready-1', + jobId: 'job-1', + taskType: 'agent.plan', + type: 'plan.ready', + status: 'succeeded', + agentPhase: 'completed', + message: '计划已生成。', + data: { result: readOnlyPlanResult }, + timestamp: '2026-05-20T00:00:00Z', + }); + handlers?.onEvent?.({ + id: 'plan-done-1', + jobId: 'job-1', + taskType: 'agent.plan', + type: 'job.succeeded', + status: 'succeeded', + agentPhase: 'completed', + message: '任务已完成。', + data: { result: readOnlyPlanResult }, + timestamp: '2026-05-20T00:00:01Z', + }); + }) + .mockImplementationOnce(async (_jobId, handlers) => { + handlers?.onEvent?.({ + id: 'tool-start-1', + jobId: 'job-2', + taskType: 'agent.execute', + type: 'tool.started', + status: 'running', + agentPhase: 'executing', + message: '正在读取名称包含“银翼杀手”的视频文件数量。', + data: { step: 1, tool: 'drive.countFiles' }, + timestamp: '2026-05-20T00:00:02Z', + }); + handlers?.onEvent?.({ + id: 'execute-done-1', + jobId: 'job-2', + taskType: 'agent.execute', + type: 'job.succeeded', + status: 'succeeded', + agentPhase: 'completed', + message: '答案已生成。', + data: { result: execResult }, + timestamp: '2026-05-20T00:00:03Z', + }); + }); + + const { default: useAgentSession } = await loadComposable(); + const { taskInput, sendMessage, activeTurns } = useAgentSession(); + taskInput.value = '我上传了几部银翼杀手?'; + await sendMessage(); + + const turn = activeTurns.value[0]; + expect(agentApi.getAgentJob).not.toHaveBeenCalled(); + expect(agentApi.executeAgentPlan).toHaveBeenCalled(); + expect(turn.agent.events.map((event) => event.id)).toContain('tool-start-1'); + expect(turn.agent.executeResult?.answer).toContain('3 部电影'); + expect(turn.agent.status).toBe('succeeded'); + }); + it('does not auto-execute write plans in confirm policy', async () => { vi.mocked(agentApi.planAgentTask).mockResolvedValue({ jobId: 'job-1', @@ -497,8 +573,9 @@ describe('useAgentSession', () => { taskInput.value = 'hello'; const sendTask = sendMessage(); - await Promise.resolve(); - await Promise.resolve(); + for (let i = 0; i < 6 && vi.mocked(agentApi.getAgentJob).mock.calls.length === 0; i += 1) { + await Promise.resolve(); + } const turn = activeTurns.value[0]; await cancel(turn.agent); expect(turn.agent.status).toBe('canceled'); diff --git a/web/src/composables/useAgentSession.ts b/web/src/composables/useAgentSession.ts index a8a14b4..1d0e54c 100644 --- a/web/src/composables/useAgentSession.ts +++ b/web/src/composables/useAgentSession.ts @@ -4,6 +4,7 @@ import { executeAgentPlan, getAgentJob, planAgentTask, + streamAgentJobEvents, } from '../api/agent'; import { useUserStore } from '../store/user'; import { useLocaleStore } from '../store/locale'; @@ -11,6 +12,7 @@ import { ui } from '../utils/ui'; import type { AgentExecutionPolicy, AgentExecutionResult, + AgentJobEvent, AgentPlanResult, AgentReasoningEffort, PlanAgentRequest, @@ -28,6 +30,7 @@ export interface ChatMessage { planResult?: AgentPlanResult; executeJobId?: string; executeResult?: AgentExecutionResult; + events: AgentJobEvent[]; errorMessage?: string; timestamp: string; } @@ -92,7 +95,10 @@ const normalizeSessions = (value: unknown): Session[] => { const session: Session = { id: record.id, title: typeof record.title === 'string' ? record.title : 'New session', - messages: record.messages as ChatMessage[], + messages: (record.messages as ChatMessage[]).map((message) => ({ + ...message, + events: Array.isArray(message.events) ? message.events : [], + })), createdAt: typeof record.createdAt === 'string' ? record.createdAt : now, updatedAt: typeof record.updatedAt === 'string' ? record.updatedAt : now, }; @@ -141,6 +147,7 @@ interface SessionState { isSending: Ref; pollGenerations: Map; pollSleepTimers: Map>; + streamControllers: Map; canceledTurns: Set; } @@ -157,6 +164,7 @@ const getState = (): SessionState => { const isSending = ref(false); const pollGenerations = new Map(); const pollSleepTimers = new Map>(); + const streamControllers = new Map(); const canceledTurns = new Set(); watch(sessions, (v) => persistSessions(v), { deep: true }); @@ -171,6 +179,7 @@ const getState = (): SessionState => { isSending, pollGenerations, pollSleepTimers, + streamControllers, canceledTurns, }; return _state; @@ -180,6 +189,8 @@ export const __resetForTests = () => { if (_state) { _state.pollSleepTimers.forEach((t) => clearTimeout(t)); _state.pollSleepTimers.clear(); + _state.streamControllers.forEach((controller) => controller.abort()); + _state.streamControllers.clear(); _state.pollGenerations.clear(); _state.canceledTurns.clear(); } @@ -258,9 +269,18 @@ export default function useAgentSession() { clearSleepTimer(key); }; + const stopStream = (key: string) => { + const controller = s.streamControllers.get(key); + if (!controller) return; + controller.abort(); + s.streamControllers.delete(key); + }; + const stopAllPolling = () => { s.pollSleepTimers.forEach((t) => clearTimeout(t)); s.pollSleepTimers.clear(); + s.streamControllers.forEach((controller) => controller.abort()); + s.streamControllers.clear(); s.pollGenerations.clear(); }; @@ -346,6 +366,8 @@ export default function useAgentSession() { clearTurnCanceled(msg); stopPolling(`${msg.id}:plan`); stopPolling(`${msg.id}:execute`); + stopStream(`${msg.id}:plan`); + stopStream(`${msg.id}:execute`); }); s.sessions.value.splice(idx, 1); if (s.activeSessionId.value === id) { @@ -362,6 +384,8 @@ export default function useAgentSession() { clearTurnCanceled(msg); stopPolling(`${msg.id}:plan`); stopPolling(`${msg.id}:execute`); + stopStream(`${msg.id}:plan`); + stopStream(`${msg.id}:execute`); }); stopAllPolling(); activeSession.value.messages = []; @@ -371,6 +395,79 @@ export default function useAgentSession() { const ensureSession = (): Session => activeSession.value ?? createSession(); + const appendAgentEvent = (msg: ChatMessage, event: AgentJobEvent) => { + if (msg.events.some((item) => item.id === event.id)) return; + msg.events.push(event); + }; + + const applyAgentEvent = (msg: ChatMessage, event: AgentJobEvent, kind: 'plan' | 'execute') => { + appendAgentEvent(msg, event); + if (event.type === 'job.queued') { + msg.status = 'pending'; + } else if (event.type === 'job.running' || event.type === 'tool.started') { + msg.status = 'running'; + } else if (event.type === 'job.failed' || event.type === 'tool.failed') { + msg.status = 'failed'; + const errorMessage = event.data?.errorMessage; + msg.errorMessage = typeof errorMessage === 'string' ? errorMessage : event.message; + } else if (event.type === 'job.canceled') { + msg.status = 'canceled'; + } else if (event.type === 'job.succeeded') { + msg.status = 'succeeded'; + } + + const result = event.data?.result; + if (event.type === 'plan.ready' && result) { + msg.planResult = result as AgentPlanResult; + msg.planHash = msg.planResult.planHash; + } + if (event.type === 'job.succeeded' && result) { + if (kind === 'plan') { + msg.planResult = result as AgentPlanResult; + msg.planHash = msg.planResult.planHash; + } else { + msg.executeResult = result as AgentExecutionResult; + } + } + }; + + const shouldAutoExecutePlan = (msg: ChatMessage): boolean => + Boolean( + msg.planResult && + ((s.policy.value === 'autopilot' && !msg.planResult.requiresConfirmation) || + (s.policy.value === 'confirm' && isReadOnlyAutoExecutable(msg.planResult))), + ); + + async function streamJobEvents( + kind: 'plan' | 'execute', + msg: ChatMessage, + jobId: string, + ): Promise { + const timerKey = `${msg.id}:${kind}`; + stopStream(timerKey); + const controller = new AbortController(); + s.streamControllers.set(timerKey, controller); + try { + await streamAgentJobEvents( + jobId, + { + onEvent: (event) => { + if (!ensureTurnNotCanceled(msg)) return; + applyAgentEvent(msg, event, kind); + }, + }, + controller.signal, + ); + return true; + } catch { + return controller.signal.aborted; + } finally { + if (s.streamControllers.get(timerKey) === controller) { + s.streamControllers.delete(timerKey); + } + } + } + async function pollPlanJob(msg: ChatMessage, jobId: string): Promise { const timerKey = `${msg.id}:plan`; await startPollLoop(timerKey, msg, async (generation) => { @@ -387,11 +484,7 @@ export default function useAgentSession() { msg.errorMessage = job.errorMessage || 'Plan failed.'; } if (isTerminalStatus(job.status)) { - const shouldAutoExecute = - msg.planResult && - ((s.policy.value === 'autopilot' && !msg.planResult.requiresConfirmation) || - (s.policy.value === 'confirm' && isReadOnlyAutoExecutable(msg.planResult))); - if (shouldAutoExecute) { + if (shouldAutoExecutePlan(msg)) { await runExecute(msg); } return false; @@ -439,6 +532,7 @@ export default function useAgentSession() { role: 'user', content: input, status: 'succeeded', + events: [], timestamp: now, }; const agentMsg: ChatMessage = { @@ -446,6 +540,7 @@ export default function useAgentSession() { role: 'agent', content: '', status: 'pending', + events: [], timestamp: now, }; session.messages.push(userMsg, agentMsg); @@ -470,7 +565,12 @@ export default function useAgentSession() { return; } reactiveAgent.status = 'pending'; - await pollPlanJob(reactiveAgent, res.jobId); + const streamed = await streamJobEvents('plan', reactiveAgent, res.jobId); + if (!streamed && ensureTurnNotCanceled(reactiveAgent)) { + await pollPlanJob(reactiveAgent, res.jobId); + } else if (streamed && ensureTurnNotCanceled(reactiveAgent) && shouldAutoExecutePlan(reactiveAgent)) { + await runExecute(reactiveAgent); + } } catch (error) { if (isTurnCanceled(reactiveAgent) || reactiveAgent.status === 'canceled') return; reactiveAgent.status = 'failed'; @@ -527,7 +627,10 @@ export default function useAgentSession() { } return; } - await pollExecuteJob(msg, res.jobId); + const streamed = await streamJobEvents('execute', msg, res.jobId); + if (!streamed && ensureTurnNotCanceled(msg)) { + await pollExecuteJob(msg, res.jobId); + } } catch (error) { if (!ensureTurnNotCanceled(msg)) return; msg.status = 'failed'; @@ -541,6 +644,8 @@ export default function useAgentSession() { const jobId = msg.executeJobId || msg.planJobId; stopPolling(`${msg.id}:plan`); stopPolling(`${msg.id}:execute`); + stopStream(`${msg.id}:plan`); + stopStream(`${msg.id}:execute`); if (!jobId) return; try { await cancelAgentJob(jobId); diff --git a/web/src/types/agent.d.ts b/web/src/types/agent.d.ts index f143e2b..23f78e4 100644 --- a/web/src/types/agent.d.ts +++ b/web/src/types/agent.d.ts @@ -12,6 +12,16 @@ export type AgentJobPhase = | 'completed' | 'failed' | 'canceled'; +export type AgentJobEventType = + | 'job.queued' + | 'job.running' + | 'plan.ready' + | 'tool.started' + | 'tool.succeeded' + | 'tool.failed' + | 'job.succeeded' + | 'job.failed' + | 'job.canceled'; export interface AgentDataPolicy { allowFileContent: boolean; @@ -112,6 +122,18 @@ export interface AgentExecutionResult { finishedAt: string; } +export interface AgentJobEvent { + id: string; + jobId: string; + taskType: string; + type: AgentJobEventType; + status: string; + agentPhase?: AgentJobPhase | string | null; + message: string; + data: Record; + timestamp: string; +} + export type AgentBackgroundJob> = BackgroundJob & { agentPhase?: AgentJobPhase | null; cancelRequestedAt?: string | null; From 80b88bdae689f34fdd4802aca0ead9c0377bbcc0 Mon Sep 17 00:00:00 2001 From: 0neStep <146049978+AperturePlus@users.noreply.github.com> Date: Tue, 26 May 2026 10:24:25 +0800 Subject: [PATCH 03/18] feat(web): render agent activity events in TurnEntry component - Display lightweight agent activity events (e.g. tool calls) above the answer - Filter out terminal job.succeeded events, show last 4 events - Update dev Library page turns with sample event data --- .../organisms/agent/TurnEntry.spec.ts | 39 +++++++++++++++++++ .../components/organisms/agent/TurnEntry.vue | 37 ++++++++++++++++++ web/src/pages/__dev/Library.vue | 15 ++++++- 3 files changed, 90 insertions(+), 1 deletion(-) diff --git a/web/src/components/organisms/agent/TurnEntry.spec.ts b/web/src/components/organisms/agent/TurnEntry.spec.ts index bd18def..a841c78 100644 --- a/web/src/components/organisms/agent/TurnEntry.spec.ts +++ b/web/src/components/organisms/agent/TurnEntry.spec.ts @@ -9,6 +9,7 @@ const baseTurn = (overrides: Partial = {}): AgentTurn => ({ role: 'user', content: 'do it', status: 'succeeded', + events: [], timestamp: '2026-05-20T00:00:00Z', }, agent: { @@ -16,6 +17,7 @@ const baseTurn = (overrides: Partial = {}): AgentTurn => ({ role: 'agent', content: '', status: 'succeeded', + events: [], timestamp: '2026-05-20T00:00:00Z', planHash: 'hash-1', planResult: { @@ -62,6 +64,43 @@ describe('organisms/agent/TurnEntry', () => { expect(w.text()).not.toContain('plan summary text'); }); + it('renders lightweight agent activity events before the answer', () => { + const w = mount(TurnEntry, { + props: { + turn: baseTurn({ + events: [ + { + id: 'ev-1', + jobId: 'e-1', + taskType: 'agent.execute', + type: 'tool.started', + status: 'running', + agentPhase: 'executing', + message: '正在读取名称包含“银翼杀手”的视频文件数量。', + data: {}, + timestamp: '2026-05-20T00:00:01Z', + }, + ], + executeResult: { + planJobId: 'p-1', + executeJobId: 'e-1', + summary: 'execution summary text', + answer: '你上传了 2 部名称包含“银翼杀手”的电影(按视频文件统计)。', + appliedActions: 1, + skippedActions: 0, + warnings: [], + finishedAt: '2026-05-20T00:01:00Z', + }, + }), + policy: 'confirm', + focused: false, + }, + }); + + expect(w.text()).toContain('正在读取名称包含'); + expect(w.text()).toContain('你上传了 2 部名称包含'); + }); + it('hides Execute button when policy=planOnly', () => { const w = mount(TurnEntry, { props: { turn: baseTurn(), policy: 'planOnly', focused: false }, diff --git a/web/src/components/organisms/agent/TurnEntry.vue b/web/src/components/organisms/agent/TurnEntry.vue index 0173611..a8c0a34 100644 --- a/web/src/components/organisms/agent/TurnEntry.vue +++ b/web/src/components/organisms/agent/TurnEntry.vue @@ -35,6 +35,12 @@ const resultText = computed( () => props.turn.agent.executeResult?.answer || props.turn.agent.executeResult?.summary || '', ); +const activityEvents = computed(() => + (props.turn.agent.events || []) + .filter((event) => event.message && !event.type.startsWith('job.succeeded')) + .slice(-4), +); + const statusLabel = computed(() => { const key = `agent.v2.turn.status.${props.turn.agent.status}` as LocaleKey; return t(key); @@ -71,6 +77,13 @@ const formatTime = (iso: string) => {
+
    +
  1. + + {{ event.message }} +
  2. +
+

{{ resultText }}

@@ -191,6 +204,30 @@ const formatTime = (iso: string) => { .ff-te__sum { margin: 0; color: var(--text-primary); } .ff-te__answer { white-space: pre-wrap; } +.ff-te__events { + display: flex; + flex-direction: column; + gap: 4px; + margin: 0; + padding: 0; + list-style: none; +} +.ff-te__event { + display: flex; + align-items: center; + gap: 8px; + min-height: 18px; + font-family: var(--font-mono); + font-size: var(--text-small); + color: var(--text-tertiary); +} +.ff-te__event-dot { + width: 5px; + height: 5px; + background: var(--ac); + flex: 0 0 auto; +} + .ff-te__actions { border: 1px solid var(--border-subtle); border-bottom: 0; diff --git a/web/src/pages/__dev/Library.vue b/web/src/pages/__dev/Library.vue index 4802ede..29a6091 100644 --- a/web/src/pages/__dev/Library.vue +++ b/web/src/pages/__dev/Library.vue @@ -217,9 +217,22 @@ const agPolicy = ref<'planOnly' | 'confirm' | 'autopilot'>('confirm'); const agReasoningEffort = ref<'adaptive' | 'low' | 'medium' | 'high' | 'xhigh' | 'max'>('adaptive'); const makeTurn = (status: 'pending' | 'running' | 'succeeded' | 'failed' | 'canceled', withPlan = true): AgentTurn => ({ - user: { id: `u-${status}`, role: 'user', content: 'Sort by year then month', status: 'succeeded', timestamp: '2026-05-20T00:00:00Z' }, + user: { id: `u-${status}`, role: 'user', content: 'Sort by year then month', status: 'succeeded', events: [], timestamp: '2026-05-20T00:00:00Z' }, agent: { id: `a-${status}`, role: 'agent', content: '', status, + events: [ + { + id: `e-${status}-1`, + jobId: `job-${status}`, + taskType: 'agent.execute', + type: 'job.running', + status: 'running', + agentPhase: 'executing', + message: '正在执行计划。', + data: {}, + timestamp: '2026-05-20T00:00:00Z', + }, + ], timestamp: '2026-05-20T00:00:00Z', planHash: withPlan && status !== 'pending' ? 'h-' + status : undefined, planResult: withPlan && status !== 'pending' ? { From e3f97441a703ea1f6dee069702962f073f445a4c Mon Sep 17 00:00:00 2001 From: 0neStep <146049978+AperturePlus@users.noreply.github.com> Date: Tue, 26 May 2026 10:24:34 +0800 Subject: [PATCH 04/18] feat(admin): add usage statistics to admin users panel - Add AdminUserUsageStats schema with trafficBytes and agentTokens - Extend AdminUserItem with usageStats, storage usage percentage - Implement backend collection of upload traffic and agent token usage within a configurable time window (default 7 days, max 90) - Add usageFrom/usageTo query params with validation - Update frontend UsersPage with date range filters and usage display - Add mock usage events and stats generation --- app/src/fileflash/routers/admin_users.py | 12 +- app/src/fileflash/schemas/admin/users.py | 37 ++++++- app/src/fileflash/services/admin/users.py | 104 +++++++++++++++++- app/tests/test_admin_users_routes.py | 30 ++++- app/tests/test_admin_users_service.py | 43 ++++++++ web/src/api/user.ts | 14 +-- web/src/mock/handlers/user.ts | 34 +++++- web/src/mock/state.ts | 26 +++++ web/src/pages/console/users/UsersPage.spec.ts | 101 +++++++++++++++++ web/src/pages/console/users/UsersPage.vue | 91 ++++++++++++--- web/src/types/user.d.ts | 32 ++++++ 11 files changed, 488 insertions(+), 36 deletions(-) create mode 100644 web/src/pages/console/users/UsersPage.spec.ts diff --git a/app/src/fileflash/routers/admin_users.py b/app/src/fileflash/routers/admin_users.py index 09278c0..0eb8619 100644 --- a/app/src/fileflash/routers/admin_users.py +++ b/app/src/fileflash/routers/admin_users.py @@ -11,9 +11,19 @@ router = APIRouter(prefix="/admin/users", tags=["admin"]) +def get_list_admin_users_query(query: ListAdminUsersQuery = Depends()) -> ListAdminUsersQuery: + try: + query.resolve_usage_window() + except ValueError as exc: + from ..core.errors import ApiError + + raise ApiError(status_code=400, code=400, message=str(exc)) from exc + return query + + @router.get("") async def list_admin_users( - query: ListAdminUsersQuery = Depends(), + query: ListAdminUsersQuery = Depends(get_list_admin_users_query), _: User = Depends(require_admin), service: AdminUsersService = Depends(get_admin_users_service), ): diff --git a/app/src/fileflash/schemas/admin/users.py b/app/src/fileflash/schemas/admin/users.py index a62d4f8..9eab779 100644 --- a/app/src/fileflash/schemas/admin/users.py +++ b/app/src/fileflash/schemas/admin/users.py @@ -1,12 +1,28 @@ from __future__ import annotations -from datetime import datetime +from datetime import UTC, datetime, timedelta from typing import Literal +from pydantic import Field + from ..common import CamelModel, PageQuery ExternalUserStatus = Literal["active", "suspended", "pending_verification"] +DEFAULT_USAGE_WINDOW = timedelta(days=7) +MAX_USAGE_WINDOW = timedelta(days=90) + + +def _normalize_datetime(value: datetime) -> datetime: + if value.tzinfo is None: + return value.replace(tzinfo=UTC) + return value.astimezone(UTC) + + +class AdminUserUsageStats(CamelModel): + traffic_bytes: int = Field(ge=0) + agent_tokens: int = Field(ge=0) + class AdminUserItem(CamelModel): user_id: str @@ -22,6 +38,7 @@ class AdminUserItem(CamelModel): last_login_at: datetime | None = None last_active_at: datetime | None = None created_at: datetime + usage_stats: AdminUserUsageStats class ListAdminUsersQuery(PageQuery): @@ -30,6 +47,23 @@ class ListAdminUsersQuery(PageQuery): role: Literal["USER", "ADMIN"] | None = None sort: Literal["username", "createdAt", "storageUsed"] = "createdAt" order: Literal["asc", "desc"] = "desc" + usage_from: datetime | None = None + usage_to: datetime | None = None + + def resolve_usage_window(self, *, now: datetime | None = None) -> tuple[datetime, datetime]: + resolved_now = _normalize_datetime(now or datetime.now(UTC)) + if self.usage_from is None and self.usage_to is None: + return resolved_now - DEFAULT_USAGE_WINDOW, resolved_now + if self.usage_from is None or self.usage_to is None: + raise ValueError("usageFrom and usageTo must be provided together") + + usage_from = _normalize_datetime(self.usage_from) + usage_to = _normalize_datetime(self.usage_to) + if usage_from > usage_to: + raise ValueError("usageFrom must be earlier than or equal to usageTo") + if usage_to - usage_from > MAX_USAGE_WINDOW: + raise ValueError("usage window must not exceed 90 days") + return usage_from, usage_to class UpdateUserStatusRequest(CamelModel): @@ -44,6 +78,7 @@ class UpdateUserStatusResponse(CamelModel): __all__ = [ "AdminUserItem", + "AdminUserUsageStats", "ListAdminUsersQuery", "UpdateUserStatusRequest", "UpdateUserStatusResponse", diff --git a/app/src/fileflash/services/admin/users.py b/app/src/fileflash/services/admin/users.py index e53a881..7c86a9a 100644 --- a/app/src/fileflash/services/admin/users.py +++ b/app/src/fileflash/services/admin/users.py @@ -6,9 +6,16 @@ from sqlalchemy.ext.asyncio import AsyncSession from ...core.errors import ApiError -from ...models.enums import UserRole, UserStatus +from ...models.enums import UploadTaskStatus, UserRole, UserStatus from ...models.tables_identity import User, UserSession -from ...schemas.admin.users import AdminUserItem, ListAdminUsersQuery, UpdateUserStatusResponse +from ...models.tables_storage import UploadTask +from ...models.tables_worker import BackgroundJob +from ...schemas.admin.users import ( + AdminUserItem, + AdminUserUsageStats, + ListAdminUsersQuery, + UpdateUserStatusResponse, +) from ...schemas.common import PaginatedData, PaginationMeta from ._status import external_to_internal, internal_to_external @@ -42,8 +49,25 @@ async def list_users(self, *, query: ListAdminUsersQuery) -> PaginatedData[Admin offset = (query.page - 1) * query.per_page rows = list(await self.db.scalars(statement.offset(offset).limit(query.per_page))) - last_seen_map = await self._collect_last_seen([int(row.user_id) for row in rows]) - items = [self._to_item(row, last_seen_map.get(int(row.user_id))) for row in rows] + user_ids = [int(row.user_id) for row in rows] + last_seen_map = await self._collect_last_seen(user_ids) + usage_from, usage_to = self._resolve_usage_window(query) + usage_map = await self._collect_usage_stats( + user_ids=user_ids, + usage_from=usage_from, + usage_to=usage_to, + ) + items = [ + self._to_item( + row, + last_seen_map.get(int(row.user_id)), + usage_map.get( + int(row.user_id), + AdminUserUsageStats(traffic_bytes=0, agent_tokens=0), + ), + ) + for row in rows + ] return PaginatedData( items=items, pagination=PaginationMeta( @@ -56,6 +80,71 @@ async def list_users(self, *, query: ListAdminUsersQuery) -> PaginatedData[Admin ), ) + @staticmethod + def _resolve_usage_window(query: ListAdminUsersQuery) -> tuple[datetime, datetime]: + try: + return query.resolve_usage_window() + except ValueError as exc: + raise ApiError(status_code=400, code=400, message=str(exc)) from exc + + async def _collect_usage_stats( + self, + *, + user_ids: list[int], + usage_from: datetime, + usage_to: datetime, + ) -> dict[int, AdminUserUsageStats]: + if not user_ids: + return {} + + traffic_rows = await self.db.execute( + select(UploadTask.user_id, func.coalesce(func.sum(UploadTask.total_size), 0)) + .where( + and_( + UploadTask.user_id.in_(user_ids), + UploadTask.status == UploadTaskStatus.COMPLETED, + UploadTask.completed_at.is_not(None), + UploadTask.completed_at >= usage_from, + UploadTask.completed_at <= usage_to, + ) + ) + .group_by(UploadTask.user_id) + ) + stats: dict[int, AdminUserUsageStats] = { + int(user_id): AdminUserUsageStats(traffic_bytes=int(total or 0), agent_tokens=0) + for user_id, total in traffic_rows.all() + } + + token_expr = BackgroundJob.result["costEstimate"]["tokens"].as_integer() + agent_rows = await self.db.execute( + select( + BackgroundJob.requested_by, + func.coalesce(func.sum(func.coalesce(token_expr, 0)), 0), + ) + .where( + and_( + BackgroundJob.requested_by.in_(user_ids), + BackgroundJob.task_type == "agent.plan", + BackgroundJob.status == "succeeded", + BackgroundJob.finished_at.is_not(None), + BackgroundJob.finished_at >= usage_from, + BackgroundJob.finished_at <= usage_to, + ) + ) + .group_by(BackgroundJob.requested_by) + ) + for user_id, total in agent_rows.all(): + if user_id is None: + continue + key = int(user_id) + current = stats.get(key, AdminUserUsageStats(traffic_bytes=0, agent_tokens=0)) + stats[key] = AdminUserUsageStats( + traffic_bytes=current.traffic_bytes, + agent_tokens=int(total or 0), + ) + + return stats + async def set_status(self, *, user_id: int, external_status: str) -> UpdateUserStatusResponse: target = await self.db.get(User, user_id) if target is None or target.deleted_at is not None: @@ -113,7 +202,11 @@ async def _collect_last_seen(self, user_ids: list[int]) -> dict[int, datetime]: return {int(user_id): seen for user_id, seen in rows.all()} @staticmethod - def _to_item(row: User, last_active_at: datetime | None) -> AdminUserItem: + def _to_item( + row: User, + last_active_at: datetime | None, + usage_stats: AdminUserUsageStats, + ) -> AdminUserItem: limit = max(int(row.storage_limit), 1) return AdminUserItem( user_id=str(row.user_id), @@ -129,6 +222,7 @@ def _to_item(row: User, last_active_at: datetime | None) -> AdminUserItem: last_login_at=row.last_login_at, last_active_at=last_active_at, created_at=row.created_at, + usage_stats=usage_stats, ) diff --git a/app/tests/test_admin_users_routes.py b/app/tests/test_admin_users_routes.py index b5cccae..bff1702 100644 --- a/app/tests/test_admin_users_routes.py +++ b/app/tests/test_admin_users_routes.py @@ -9,7 +9,7 @@ from fileflash.core.deps import get_admin_users_service, require_admin from fileflash.core.errors import ApiError, api_error_handler from fileflash.routers.admin_users import router as admin_users_router -from fileflash.schemas.admin.users import AdminUserItem, UpdateUserStatusResponse +from fileflash.schemas.admin.users import AdminUserItem, AdminUserUsageStats, UpdateUserStatusResponse from fileflash.schemas.common import PaginatedData, PaginationMeta @@ -29,6 +29,7 @@ async def list_users(self, *, query): # noqa: ANN001 last_login_at=None, last_active_at=None, created_at=datetime.now(UTC), + usage_stats=AdminUserUsageStats(traffic_bytes=1024, agent_tokens=42), ) return PaginatedData( items=[item], @@ -73,6 +74,7 @@ def test_admin_can_list_users() -> None: body = resp.json() assert body["success"] is True assert body["data"]["items"][0]["username"] == "alice" + assert body["data"]["items"][0]["usageStats"] == {"trafficBytes": 1024, "agentTokens": 42} def test_non_admin_gets_403() -> None: @@ -81,6 +83,32 @@ def test_non_admin_gets_403() -> None: assert resp.status_code == 403 +def test_usage_window_requires_both_bounds() -> None: + with _client(admin=True) as c: + resp = c.get("/api/v1/admin/users?usageFrom=2026-01-01T00:00:00Z") + assert resp.status_code == 400 + + +def test_usage_window_rejects_reversed_bounds() -> None: + with _client(admin=True) as c: + resp = c.get( + "/api/v1/admin/users" + "?usageFrom=2026-02-01T00:00:00Z" + "&usageTo=2026-01-01T00:00:00Z" + ) + assert resp.status_code == 400 + + +def test_usage_window_rejects_more_than_90_days() -> None: + with _client(admin=True) as c: + resp = c.get( + "/api/v1/admin/users" + "?usageFrom=2026-01-01T00:00:00Z" + "&usageTo=2026-04-02T00:00:00Z" + ) + assert resp.status_code == 400 + + def test_admin_can_patch_status() -> None: with _client(admin=True) as c: resp = c.patch("/api/v1/admin/users/42/status", json={"status": "suspended"}) diff --git a/app/tests/test_admin_users_service.py b/app/tests/test_admin_users_service.py index 9e455a8..1370023 100644 --- a/app/tests/test_admin_users_service.py +++ b/app/tests/test_admin_users_service.py @@ -42,6 +42,14 @@ def __init__(self) -> None: self.execute = AsyncMock() +class ResultRows: + def __init__(self, rows) -> None: # noqa: ANN001 + self._rows = rows + + def all(self): # noqa: ANN201 + return self._rows + + @pytest.mark.asyncio async def test_list_users_returns_paginated_items() -> None: session = DummySession() @@ -55,6 +63,41 @@ async def test_list_users_returns_paginated_items() -> None: assert result.pagination.total_items == 1 assert result.items[0].username == "alice" assert result.items[0].status == "active" + assert result.items[0].usage_stats.traffic_bytes == 0 + assert result.items[0].usage_stats.agent_tokens == 0 + + +def test_list_users_query_default_usage_window() -> None: + now = datetime(2026, 5, 26, 12, 0, tzinfo=UTC) + usage_from, usage_to = ListAdminUsersQuery().resolve_usage_window(now=now) + + assert usage_to == now + assert (usage_to - usage_from).days == 7 + + +@pytest.mark.asyncio +async def test_collect_usage_stats_aggregates_traffic_and_tokens() -> None: + session = DummySession() + session.execute = AsyncMock( + side_effect=[ + ResultRows([(1, 2048), (2, 4096)]), + ResultRows([(1, 1500), (3, None)]), + ] + ) + service = AdminUsersService(db=session) # type: ignore[arg-type] + + stats = await service._collect_usage_stats( + user_ids=[1, 2, 3], + usage_from=datetime(2026, 5, 1, tzinfo=UTC), + usage_to=datetime(2026, 5, 26, tzinfo=UTC), + ) + + assert stats[1].traffic_bytes == 2048 + assert stats[1].agent_tokens == 1500 + assert stats[2].traffic_bytes == 4096 + assert stats[2].agent_tokens == 0 + assert stats[3].traffic_bytes == 0 + assert stats[3].agent_tokens == 0 @pytest.mark.asyncio diff --git a/web/src/api/user.ts b/web/src/api/user.ts index 4c02748..13256ea 100644 --- a/web/src/api/user.ts +++ b/web/src/api/user.ts @@ -13,7 +13,9 @@ import type { StorageStats, ActivityLog, GetActivityLogRequest, - User + User, + AdminUserItem, + GetAdminUsersParams, } from '../types/user'; import type { PaginatedData } from '../types/base'; @@ -144,14 +146,8 @@ export const getUsers = (params: { search?: string; page?: number; perPage?: num return http.get>('/users', params); }; -export const getAdminUsers = (params: { - page?: number; - perPage?: number; - search?: string; - status?: 'active' | 'suspended'; - role?: 'user' | 'admin'; -}) => { - return http.get>('/admin/users', params); +export const getAdminUsers = (params: GetAdminUsersParams) => { + return http.get>('/admin/users', params); }; export const updateUserStatus = (userId: string, status: 'active' | 'suspended') => { diff --git a/web/src/mock/handlers/user.ts b/web/src/mock/handlers/user.ts index fbd0d9d..7bbdf21 100644 --- a/web/src/mock/handlers/user.ts +++ b/web/src/mock/handlers/user.ts @@ -5,6 +5,7 @@ import { getCurrentUser, mockLogs, mockRegistrationEmailDomainRules, + mockUsageEvents, mockUsers, paginate, } from '../state'; @@ -43,6 +44,32 @@ function isAllowedEmailDomain(email: string) { }); } +function usageWindow(url: URL) { + const usageFrom = url.searchParams.get('usageFrom'); + const usageTo = url.searchParams.get('usageTo'); + const now = Date.now(); + const from = usageFrom ? Date.parse(usageFrom) : now - 7 * 24 * 60 * 60 * 1000; + const to = usageTo ? Date.parse(usageTo) : now; + return { + from: Number.isFinite(from) ? from : now - 7 * 24 * 60 * 60 * 1000, + to: Number.isFinite(to) ? to : now, + }; +} + +function usageStatsForUser(userId: string, window: { from: number; to: number }) { + return mockUsageEvents.reduce((stats, event) => { + if (event.userId !== userId) return stats; + const occurredAt = Date.parse(event.occurredAt); + if (!Number.isFinite(occurredAt) || occurredAt < window.from || occurredAt > window.to) { + return stats; + } + return { + trafficBytes: stats.trafficBytes + event.trafficBytes, + agentTokens: stats.agentTokens + event.agentTokens, + }; + }, { trafficBytes: 0, agentTokens: 0 }); +} + export const setupUserMocks = () => { Mock.mock(/\/api\/v1\/users(?:\?.*)?$/, 'get', (options) => { const url = new URL(options.url, 'http://localhost'); @@ -80,6 +107,7 @@ export const setupUserMocks = () => { const search = (url.searchParams.get('search') || '').toLowerCase(); const statusFilter = url.searchParams.get('status'); const roleFilter = url.searchParams.get('role'); + const window = usageWindow(url); const filtered = mockUsers.filter((user) => { if (search) { @@ -88,7 +116,7 @@ export const setupUserMocks = () => { if (!hit) return false; } if (statusFilter && user.status !== statusFilter) return false; - if (roleFilter && user.role !== roleFilter) return false; + if (roleFilter && user.role.toUpperCase() !== roleFilter.toUpperCase()) return false; return true; }); @@ -101,8 +129,10 @@ export const setupUserMocks = () => { emailVerified: user.emailVerified, emailVerifiedAt: user.emailVerifiedAt, createdAt: user.createdAt, - role: user.role, + role: user.role.toUpperCase(), status: user.status, + usagePercentage: Number(((user.storageUsed / user.storageLimit) * 100).toFixed(2)), + usageStats: usageStatsForUser(user.userId, window), lastActiveAt: new Date(Date.now() - Mock.Random.integer(1, 72) * 3600000).toISOString(), lastLoginAt: new Date(Date.now() - Mock.Random.integer(1, 240) * 3600000).toISOString(), })); diff --git a/web/src/mock/state.ts b/web/src/mock/state.ts index 8789787..e2e70e1 100644 --- a/web/src/mock/state.ts +++ b/web/src/mock/state.ts @@ -15,6 +15,13 @@ export type MockUserRecord = User & { preference: UserPreference; }; +export interface MockUsageEvent { + userId: string; + occurredAt: string; + trafficBytes: number; + agentTokens: number; +} + const now = () => new Date().toISOString(); const randomRecentTime = (maxHours = 72) => { @@ -174,6 +181,25 @@ export const mockUsers: MockUserRecord[] = [ }, ]; +export const mockUsageEvents: MockUsageEvent[] = mockUsers.flatMap((user, index) => { + const recentAt = new Date(Date.now() - (index + 1) * 24 * 60 * 60 * 1000).toISOString(); + const olderAt = new Date(Date.now() - (index + 20) * 24 * 60 * 60 * 1000).toISOString(); + return [ + { + userId: user.userId, + occurredAt: recentAt, + trafficBytes: (index + 1) * 256 * 1024 * 1024, + agentTokens: (index + 1) * 1250, + }, + { + userId: user.userId, + occurredAt: olderAt, + trafficBytes: (index + 1) * 64 * 1024 * 1024, + agentTokens: (index + 1) * 300, + }, + ]; +}); + export const mockShares: Array = [ { shareId: 'share_1001', diff --git a/web/src/pages/console/users/UsersPage.spec.ts b/web/src/pages/console/users/UsersPage.spec.ts new file mode 100644 index 0000000..c91475b --- /dev/null +++ b/web/src/pages/console/users/UsersPage.spec.ts @@ -0,0 +1,101 @@ +import { flushPromises, mount } from '@vue/test-utils'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; + +vi.mock('../../../api/user', () => ({ + getAdminUsers: vi.fn(), + updateUserStatus: vi.fn(), +})); + +vi.mock('../../../utils/ui', () => ({ + ui: { + toast: vi.fn(), + }, +})); + +import { getAdminUsers } from '../../../api/user'; +import UsersPage from './UsersPage.vue'; + +const getAdminUsersMock = vi.mocked(getAdminUsers); + +function pageData() { + return { + items: [ + { + userId: '1', + username: 'alice', + email: 'alice@example.com', + role: 'USER' as const, + status: 'active' as const, + emailVerified: true, + emailVerifiedAt: null, + storageLimit: 1024, + storageUsed: 0, + usagePercentage: 0, + lastLoginAt: null, + lastActiveAt: null, + createdAt: '2026-05-01T00:00:00Z', + usageStats: { + trafficBytes: 1536, + agentTokens: 12345, + }, + }, + ], + pagination: { + totalItems: 1, + totalPages: 1, + perPage: 20, + currentPage: 1, + hasPrev: false, + hasNext: false, + }, + }; +} + +describe('UsersPage', () => { + beforeEach(() => { + vi.useFakeTimers(); + vi.setSystemTime(new Date('2026-05-26T12:00:00.000Z')); + getAdminUsersMock.mockResolvedValue(pageData()); + }); + + afterEach(() => { + vi.useRealTimers(); + vi.clearAllMocks(); + }); + + it('loads the default previous 7 day usage window', async () => { + mount(UsersPage); + await flushPromises(); + + expect(getAdminUsersMock).toHaveBeenCalledWith(expect.objectContaining({ + page: 1, + perPage: 20, + usageFrom: '2026-05-19T00:00:00.000Z', + usageTo: '2026-05-26T23:59:59.999Z', + })); + }); + + it('reloads with the edited usage window after apply', async () => { + const wrapper = mount(UsersPage); + await flushPromises(); + + const dateInputs = wrapper.findAll('input[type="date"]'); + await dateInputs[0].setValue('2026-05-01'); + await dateInputs[1].setValue('2026-05-10'); + await wrapper.find('.filter-bar__apply').trigger('click'); + await flushPromises(); + + expect(getAdminUsersMock).toHaveBeenLastCalledWith(expect.objectContaining({ + usageFrom: '2026-05-01T00:00:00.000Z', + usageTo: '2026-05-10T23:59:59.999Z', + })); + }); + + it('renders upload traffic and agent tokens', async () => { + const wrapper = mount(UsersPage); + await flushPromises(); + + expect(wrapper.text()).toContain('Uploaded 1.5 KB'); + expect(wrapper.text()).toContain('Agent 12,345 tokens'); + }); +}); diff --git a/web/src/pages/console/users/UsersPage.vue b/web/src/pages/console/users/UsersPage.vue index db14822..1a27a93 100644 --- a/web/src/pages/console/users/UsersPage.vue +++ b/web/src/pages/console/users/UsersPage.vue @@ -2,23 +2,46 @@ import { onMounted, ref } from 'vue'; import { getAdminUsers, updateUserStatus } from '../../../api/user'; import { AdminTable, FilterBar, StatusBadge } from '../../../components/console'; +import type { AdminUserItem } from '../../../types/user'; import { ui } from '../../../utils/ui'; -interface AdminUser { - userId: string; - username: string; - email: string; - role: string; - status: 'active' | 'suspended'; - lastLoginAt: string | null; - createdAt: string; +const DAY_MS = 24 * 60 * 60 * 1000; + +function toDateInput(date: Date) { + return date.toISOString().slice(0, 10); +} + +function startOfUtcDay(value: string) { + return `${value}T00:00:00.000Z`; +} + +function endOfUtcDay(value: string) { + return `${value}T23:59:59.999Z`; +} + +function formatBytes(bytes: number) { + const units = ['B', 'KB', 'MB', 'GB', 'TB']; + let value = Math.max(0, bytes); + let unitIndex = 0; + while (value >= 1024 && unitIndex < units.length - 1) { + value /= 1024; + unitIndex += 1; + } + return `${value.toFixed(unitIndex === 0 ? 0 : 1)} ${units[unitIndex]}`; +} + +function formatNumber(value: number) { + return new Intl.NumberFormat('en-US').format(value); } -const items = ref([]); +const today = new Date(); +const items = ref([]); const totalPages = ref(1); const currentPage = ref(1); const search = ref(''); const status = ref<'all' | 'active' | 'suspended'>('all'); +const usageFrom = ref(toDateInput(new Date(today.getTime() - 7 * DAY_MS))); +const usageTo = ref(toDateInput(today)); const loading = ref(false); async function load(page = 1) { @@ -29,8 +52,12 @@ async function load(page = 1) { perPage: 20, ...(search.value ? { search: search.value.trim() } : {}), ...(status.value !== 'all' ? { status: status.value } : {}), + ...(usageFrom.value && usageTo.value ? { + usageFrom: startOfUtcDay(usageFrom.value), + usageTo: endOfUtcDay(usageTo.value), + } : {}), }); - items.value = resp.items as AdminUser[]; + items.value = resp.items; totalPages.value = resp.pagination.totalPages; currentPage.value = resp.pagination.currentPage; } finally { @@ -38,7 +65,8 @@ async function load(page = 1) { } } -async function toggleStatus(user: AdminUser) { +async function toggleStatus(user: AdminUserItem) { + if (user.status !== 'active' && user.status !== 'suspended') return; const next = user.status === 'active' ? 'suspended' : 'active'; await updateUserStatus(user.userId, next); user.status = next; @@ -59,6 +87,14 @@ onMounted(() => load(1)); + + load(1));