diff --git a/src/opencmo/report_charts.py b/src/opencmo/report_charts.py new file mode 100644 index 0000000..d72b7af --- /dev/null +++ b/src/opencmo/report_charts.py @@ -0,0 +1,252 @@ +"""Deterministic SVG chart generation for persisted AI CMO reports.""" + +from __future__ import annotations + +import html +import os +import re +import uuid +from dataclasses import asdict, dataclass +from pathlib import Path +from typing import Any, Iterable + + +@dataclass(frozen=True) +class ReportChart: + title: str + description: str + data_source: str + points_count: int + asset_id: str + markdown: str + degraded: bool = False + + def to_meta(self) -> dict[str, Any]: + payload = asdict(self) + payload.pop("markdown", None) + return payload + + +def get_report_asset_dir() -> Path: + configured = os.environ.get("OPENCMO_REPORT_ASSET_DIR") + if configured: + return Path(configured) + db_path = os.environ.get("OPENCMO_DB_PATH") + if db_path: + return Path(db_path).expanduser().parent / "report_assets" + return Path.home() / ".opencmo" / "report_assets" + + +def get_report_asset_path(asset_id: str) -> Path | None: + if not re.fullmatch(r"[a-f0-9]{32}", asset_id): + return None + return get_report_asset_dir() / f"{asset_id}.svg" + + +def delete_chart_assets(asset_ids: Iterable[str]) -> None: + for asset_id in asset_ids: + asset_path = get_report_asset_path(asset_id) + if not asset_path: + continue + try: + asset_path.unlink(missing_ok=True) + except OSError: + continue + + +def charts_to_markdown(charts: list[ReportChart]) -> str: + if not charts: + return "当前数据不足,未生成图表。" + blocks = [] + for chart in charts: + blocks.append( + "\n".join( + [ + f"### {chart.title}", + chart.markdown, + f"图表说明:{chart.description}", + f"数据来源:`{chart.data_source}`;数据点:{chart.points_count}。", + "数据限制:图表只使用系统已采集到的真实数据,缺失值不会被补造。", + ] + ) + ) + return "\n\n".join(blocks) + + +def build_report_charts(kind: str, facts: dict, meta: dict) -> list[ReportChart]: + charts: list[ReportChart] = [] + charts.extend(_strategic_charts(facts, meta) if kind == "strategic" else _periodic_charts(facts, meta)) + return charts[:4] + + +def _asset_id() -> str: + return uuid.uuid4().hex + + +def _write_svg(asset_id: str, svg: str) -> None: + directory = get_report_asset_dir() + directory.mkdir(parents=True, exist_ok=True) + (directory / f"{asset_id}.svg").write_text(svg, encoding="utf-8") + + +def _markdown(asset_id: str, title: str) -> str: + return f"![{title}](/api/v1/report-assets/{asset_id}.svg)" + + +def _to_percent(value: Any) -> float | None: + if value is None: + return None + try: + num = float(value) + except (TypeError, ValueError): + return None + if 0 <= num <= 1: + return round(num * 100, 1) + return round(num, 1) + + +def _number(value: Any) -> float | None: + if value is None: + return None + try: + return float(value) + except (TypeError, ValueError): + return None + + +def _latest_first(items: list[dict], date_key: str) -> list[dict]: + return sorted(items or [], key=lambda item: str(item.get(date_key) or ""), reverse=True) + + +def _strategic_charts(facts: dict, meta: dict) -> list[ReportChart]: + charts: list[ReportChart] = [] + latest = facts.get("latest_scans") or {} + kpis = [ + ("SEO", _to_percent((latest.get("seo") or {}).get("score"))), + ("GEO", _number((latest.get("geo") or {}).get("score"))), + ("Citability", _to_percent((facts.get("citability") or [{}])[0].get("avg_score") if facts.get("citability") else None)), + ("Brand", _number((facts.get("brand_presence") or [{}])[0].get("footprint_score") if facts.get("brand_presence") else None)), + ("Community", _number((latest.get("community") or {}).get("total_hits"))), + ] + kpis = [(label, value) for label, value in kpis if value is not None] + if kpis: + charts.append(_bar_chart("关键指标快照", kpis, "latest_scans/citability/brand_presence", "SEO、GEO、AI 引文可信度、品牌足迹与社区命中的当前快照。")) + + serp = [ + (str(item.get("keyword") or "keyword")[:24], _number(item.get("position"))) + for item in (facts.get("serp_latest") or []) + if item.get("position") is not None + ][:8] + if serp: + charts.append(_bar_chart("SERP 当前排名(数字越小越靠前)", serp, "serp_latest.position", "已跟踪关键词的当前自然搜索排名。")) + + coverage = [ + ("有数据", _number(meta.get("sample_count"))), + ("总数据源", _number(meta.get("total_data_sources"))), + ] + if all(value is not None for _, value in coverage): + charts.append(_bar_chart("数据覆盖度", coverage, "meta.sample_count/meta.total_data_sources", "本报告事实包的数据源覆盖情况。")) + + distribution = _finding_distribution(facts) + if distribution: + charts.append(_bar_chart("风险与建议分布", distribution, "findings/recommendations", "近期发现与建议按优先级聚合后的执行压力。")) + return charts + + +def _periodic_charts(facts: dict, meta: dict) -> list[ReportChart]: + charts: list[ReportChart] = [] + trend_series = [ + ("SEO", [(item.get("scanned_at"), _to_percent(item.get("score_performance"))) for item in _latest_first(facts.get("seo_history") or [], "scanned_at")]), + ("GEO", [(item.get("scanned_at"), _number(item.get("geo_score"))) for item in _latest_first(facts.get("geo_history") or [], "scanned_at")]), + ("Community", [(item.get("scanned_at"), _number(item.get("total_hits"))) for item in _latest_first(facts.get("community_history") or [], "scanned_at")]), + ] + for title, series in trend_series: + points = [(label, value) for label, value in reversed(series) if value is not None] + if len(points) >= 2: + charts.append(_line_chart(f"{title} 趋势", points[-10:], f"{title.lower()}_history", f"{title} 在本报告窗口内的真实历史走势。")) + + citability = [(item.get("created_at") or item.get("scanned_at"), _to_percent(item.get("avg_score"))) for item in _latest_first(facts.get("citability") or [], "created_at")] + citability_points = [(label, value) for label, value in reversed(citability) if value is not None] + if len(citability_points) >= 2: + charts.append(_line_chart("AI 引文可信度趋势", citability_points[-10:], "citability.avg_score", "AI 引文可信度在最近样本中的走势。")) + + distribution = _finding_distribution(facts) + if distribution: + charts.append(_bar_chart("本周风险与建议分布", distribution, "findings/recommendations", "本周期可行动问题按优先级聚合后的分布。")) + return charts + + +def _finding_distribution(facts: dict) -> list[tuple[str, float]]: + counts = {"high": 0, "medium": 0, "low": 0, "unknown": 0} + for item in facts.get("findings") or []: + priority = str((item.get("severity") or item.get("priority") or "unknown")).lower() + counts[priority if priority in counts else "unknown"] += 1 + for item in facts.get("recommendations") or []: + priority = str(item.get("priority") or "unknown").lower() + counts[priority if priority in counts else "unknown"] += 1 + return [(label, count) for label, count in counts.items() if count] + + +def _bar_chart(title: str, values: list[tuple[str, float | None]], source: str, description: str) -> ReportChart: + values = [(label, float(value)) for label, value in values if value is not None] + asset_id = _asset_id() + max_value = max((value for _, value in values), default=1) or 1 + width = 760 + row_h = 42 + height = 110 + row_h * len(values) + rows = [] + for index, (label, value) in enumerate(values): + y = 76 + index * row_h + bar_w = max(4, int((value / max_value) * 460)) + rows.append(f'{html.escape(label)}') + rows.append(f'') + rows.append(f'{value:g}') + svg = _svg_frame(width, height, title, "\n".join(rows)) + _write_svg(asset_id, svg) + return ReportChart(title, description, source, len(values), asset_id, _markdown(asset_id, title)) + + +def _line_chart(title: str, points: list[tuple[Any, float]], source: str, description: str) -> ReportChart: + points = [(str(label or index + 1), float(value)) for index, (label, value) in enumerate(points)] + asset_id = _asset_id() + width = 760 + height = 320 + min_v = min(value for _, value in points) + max_v = max(value for _, value in points) + span = max(max_v - min_v, 1) + left, right, top, bottom = 70, 700, 70, 250 + coords = [] + for index, (_, value) in enumerate(points): + x = left + (right - left) * (index / max(len(points) - 1, 1)) + y = bottom - ((value - min_v) / span) * (bottom - top) + coords.append((x, y, value)) + path = " ".join(("M" if index == 0 else "L") + f" {x:.1f} {y:.1f}" for index, (x, y, _) in enumerate(coords)) + circles = "\n".join( + f'{value:g}' + for x, y, value in coords + ) + labels = "\n".join( + f'{html.escape(label[:10])}' + for (label, _), (x, _, _) in zip(points, coords) + ) + body = ( + f'' + f'' + f'{max_v:g}' + f'{min_v:g}' + f'' + f"{circles}{labels}" + ) + svg = _svg_frame(width, height, title, body) + _write_svg(asset_id, svg) + return ReportChart(title, description, source, len(points), asset_id, _markdown(asset_id, title)) + + +def _svg_frame(width: int, height: int, title: str, body: str) -> str: + return ( + f'' + '' + f'' + f'{html.escape(title)}' + f"{body}" + ) diff --git a/src/opencmo/report_pipeline.py b/src/opencmo/report_pipeline.py index f15890f..d8672fa 100644 --- a/src/opencmo/report_pipeline.py +++ b/src/opencmo/report_pipeline.py @@ -458,6 +458,8 @@ async def _bounded_distill(dim): 3. 每个章节必须指定使用哪些 insights (用 id 引用) 作为论据 4. 章节数量:4-6 个主体章节 5. 引言和战略建议章节标记为 is_final_section: true(它们最后写) +6. 标题层级必须清晰:最终 Markdown 只能使用 `#`、`##`、`###`,不能规划更深层级 +7. 必须规划一个图表解释章节,用于解释后端提供的真实图表,不要要求模型自行创造图表数据 输出 JSON 格式: { @@ -493,6 +495,8 @@ async def _phase_plan_outline( f" 类别:{project['category']}\n" f" 网址:{project['url']}\n" f" 数据质量:{reflection.get('data_quality_score', '?')}/100\n\n" + f"可用真实图表:\n" + f"{facts.get('report_charts_markdown', '当前数据不足,未生成图表。')}\n\n" f"分析发现(共 {len(distilled.get('insights', []))} 条):\n" f"{_json_dump(distilled)}" ) @@ -535,6 +539,8 @@ async def _phase_plan_outline( 11. 对于问题诊断,必须进行根因分析(回答"为什么会这样"),列出2-3个可能原因 12. 如果有历史趋势数据,说明趋势方向和变化速度(如"过去3个月下降30%") 13. 每个问题都要关联到商业影响(流量、收入、市场份额等) +14. 标题层级只能使用 `##` 和 `###`,禁止使用 `####` 或更深标题 +15. 不要自行生成图表数据;如果需要提到图表,只能引用输入中已经提供的图表 输出纯 Markdown 文本(不要 JSON,不要代码块包裹)。 以 ## 开头写章节标题,然后是正文段落。""" @@ -691,6 +697,7 @@ async def _phase_revise_section( 4. 明确指出 1-3 个最高优先级行动和建议时间窗口,但不要编造 ROI、流量损失或竞品增速 5. 添加紧迫性提示,但只能基于输入中已经给出的事实和趋势 6. 面向CMO决策者,30秒内让人理解"为什么现在必须行动" +7. 如果输入包含真实图表,必须把图表作为证据引用,但不能改写图表数字 输出纯 Markdown(以 ## 执行摘要 开头)。""" @@ -778,6 +785,7 @@ async def _bounded_summarize(sec, content): f"网址:{project['url']}\n" f"报告标题:{outline.get('report_title', '深度分析报告')}\n" f"叙事线索:{outline.get('narrative_arc', '无')}\n\n" + f"真实图表证据:\n{facts.get('report_charts_markdown', '当前数据不足,未生成图表。')}\n\n" f"核心发现要点:\n" + "\n".join(f"- {p}" for p in distilled.get("executive_summary_points", [])) + f"\n\n贯穿主题:{', '.join(distilled.get('cross_cutting_themes', []))}\n\n" diff --git a/src/opencmo/reports.py b/src/opencmo/reports.py index d1263e9..3bba67d 100644 --- a/src/opencmo/reports.py +++ b/src/opencmo/reports.py @@ -6,6 +6,7 @@ import html import json import logging +import re from datetime import datetime, timedelta, timezone from opencmo import storage @@ -16,6 +17,8 @@ _REPORT_MODEL_DEFAULT = "gpt-5.4" _PERIODIC_WINDOW_DAYS = 7 _REPORT_LLM_TIMEOUT_SECONDS = 300.0 +_CHART_ASSET_SRC_RE = re.compile(r"^/api/v1/report-assets/[a-f0-9]{32}\.svg$") +_CHART_ASSET_REF_RE = re.compile(r"/api/v1/report-assets/[a-f0-9]{32}\.svg") _REPORT_SYSTEM_COMMON = ( "你是 AI CMO(首席营销官),拥有完整的多智能体营销系统:SEO审计专家、GEO(AI搜索可见性)分析师、" "SERP排名追踪器、社区舆情监控(Reddit/HN/Dev.to/知乎/V2EX/掘金等)、AI引文可信度(Citability)评估引擎、" @@ -120,6 +123,15 @@ def close_list() -> None: if not stripped: close_list() continue + image_match = re.fullmatch(r"!\[([^\]]*)\]\(([^)]+)\)", stripped) + if image_match: + raw_src = image_match.group(2) + if _CHART_ASSET_SRC_RE.fullmatch(raw_src): + close_list() + alt = html.escape(image_match.group(1)) + src = html.escape(raw_src, quote=True) + html_lines.append(f'
{alt}
{alt}
') + continue if stripped.startswith("### "): close_list() html_lines.append(f"

{html.escape(stripped[4:])}

") @@ -145,6 +157,74 @@ def close_list() -> None: return "\n".join(html_lines) +def _normalize_report_headings(markdown_text: str) -> str: + """Keep report heading hierarchy to H1/H2/H3 only.""" + lines: list[str] = [] + seen_h1 = False + for raw_line in markdown_text.splitlines(): + match = re.match(r"^(#{1,6})\s+(.+?)\s*$", raw_line) + if not match: + lines.append(raw_line.rstrip()) + continue + level = len(match.group(1)) + title = match.group(2).strip() + if level == 1 and not seen_h1: + seen_h1 = True + lines.append(f"# {title}") + elif level == 1: + lines.append(f"## {title}") + elif level == 2: + lines.append(f"## {title}") + else: + lines.append(f"### {title}") + return "\n".join(lines).strip() + + +def _insert_after_first_section(markdown_text: str, section: str) -> str: + lines = markdown_text.splitlines() + h2_indices = [idx for idx, line in enumerate(lines) if line.startswith("## ")] + if len(h2_indices) >= 2: + insert_at = h2_indices[1] + elif len(lines) >= 1 and lines[0].startswith("# "): + insert_at = 1 + else: + insert_at = 0 + return "\n".join([*lines[:insert_at], "", section.strip(), "", *lines[insert_at:]]).strip() + + +def _postprocess_human_report_content(content: str, charts_markdown: str) -> str: + content = _normalize_report_headings(content) + if _CHART_ASSET_REF_RE.search(content): + return content + chart_section = f"## 2. 数据图表速览\n\n{charts_markdown or '当前数据不足,未生成图表。'}" + return _insert_after_first_section(content, chart_section) + + +def _prepare_report_charts(kind: str, facts: dict, meta: dict) -> tuple[dict, dict, str]: + """Generate deterministic charts and return facts/meta copies enriched for prompts.""" + enriched_facts = dict(facts) + enriched_meta = dict(meta) + try: + from opencmo.report_charts import build_report_charts, charts_to_markdown + + charts = build_report_charts(kind, facts, meta) + charts_markdown = charts_to_markdown(charts) + enriched_facts["report_charts"] = [chart.to_meta() | {"markdown": chart.markdown} for chart in charts] + enriched_facts["report_charts_markdown"] = charts_markdown + enriched_meta["charts"] = [chart.to_meta() for chart in charts] + enriched_meta["chart_count"] = len(charts) + return enriched_facts, enriched_meta, charts_markdown + except Exception as exc: + logger.exception("Report chart generation failed for %s", kind) + enriched_meta["chart_error"] = str(exc) or exc.__class__.__name__ + charts_markdown = "当前数据不足或图表生成失败,未生成图表。" + enriched_facts["report_charts"] = [] + enriched_facts["report_charts_markdown"] = charts_markdown + enriched_meta["charts"] = [] + enriched_meta["chart_count"] = 0 + return enriched_facts, enriched_meta, charts_markdown + + async def _generate_llm_markdown(system_prompt: str, user_prompt: str, *, model_override: str | None = None) -> str: """Generate markdown with the configured LLM.""" from opencmo import llm @@ -637,6 +717,11 @@ def _prompts(kind: str, audience: str, facts: dict, meta: dict, previous_exists: if kind == "strategic" and audience == "human": system = _compose_report_system_prompt( "你的任务是生成一份极其深入的战略分析报告。输出 Markdown,报告总长度应在 2000-4000 字之间。\n\n" + "【标题与可视化硬性要求】\n" + "- `#` 只用于报告总标题,`##` 只用于一级章节,`###` 只用于二级小节,禁止使用 `####` 或更深标题。\n" + "- 必须保留并解释输入中提供的真实图表 Markdown,不能修改图表链接、标题或图表数字。\n" + "- 必须包含 `## 数据图表速览`,每张图后解释:图表说明、业务含义、数据限制。\n" + "- 一级章节标题要清晰可扫读,二级标题必须是结论型标题,不要写空泛标题。\n\n" "严格按以下 6 大模块结构生成,每个模块都必须展开详细论述,不能用简短的一两句话敷衍:\n\n" "## 1. 执行摘要与项目定性 (Executive Summary)\n" " - 一句话定义项目当前所处的增长阶段\n" @@ -675,6 +760,8 @@ def _prompts(kind: str, audience: str, facts: dict, meta: dict, previous_exists: f"版本是否已有历史报告:{previous_exists}\n" f"数据来源覆盖度:{meta.get('sample_count', 0)}/{meta.get('total_data_sources', 0)} 个数据源有数据\n" f"摘要元数据:{_json_dump(meta)}\n\n" + f"=== 必须引用的真实图表(由后端基于事实包生成,不得改写数字或链接)===\n" + f"{facts.get('report_charts_markdown', '当前数据不足,未生成图表。')}\n\n" f"=== 完整事实包(来自所有智能体的采集结果)===\n{_json_dump(facts)}" ) return system, user @@ -706,6 +793,11 @@ def _prompts(kind: str, audience: str, facts: dict, meta: dict, previous_exists: if kind == "periodic" and audience == "human": system = _compose_report_system_prompt( "你的任务是生成一份深度周报。输出 Markdown,报告总长度应在 1500-3000 字之间。\n\n" + "【标题与可视化硬性要求】\n" + "- `#` 只用于报告总标题,`##` 只用于一级章节,`###` 只用于二级小节,禁止使用 `####` 或更深标题。\n" + "- 必须保留并解释输入中提供的真实图表 Markdown,不能修改图表链接、标题或图表数字。\n" + "- 必须包含 `## 数据图表速览`,每张图后解释:图表说明、业务含义、数据限制。\n" + "- 少于 2 个时间点的指标不能写成趋势,只能写成当前快照。\n\n" "严格按以下结构生成,每个模块都要做深入的业务推导,不能停留在数据罗列层面:\n\n" "## 1. 本周最重要的变化 (Top Changes)\n" " - 列出 3-5 个最重要的变化,每个变化不仅要说「发生了什么」,还要解释「为什么重要」「对增长意味着什么」\n" @@ -733,6 +825,8 @@ def _prompts(kind: str, audience: str, facts: dict, meta: dict, previous_exists: f"统计窗口:{meta.get('window_start', '未知')} 到 {meta.get('window_end', '未知')}\n" f"数据来源覆盖度:{meta.get('sample_count', 0)}/{meta.get('total_data_sources', 0)} 个数据源有数据\n" f"元数据:{_json_dump(meta)}\n\n" + f"=== 必须引用的真实图表(由后端基于事实包生成,不得改写数字或链接)===\n" + f"{facts.get('report_charts_markdown', '当前数据不足,未生成图表。')}\n\n" f"=== 完整事实包(来自所有智能体的采集结果)===\n{_json_dump(facts)}" ) return system, user @@ -766,6 +860,16 @@ async def _generate_report_record( model = await _get_report_model() report_model = model content = "" + charts_markdown = "" + chart_asset_ids: list[str] = [] + + if audience == "human": + facts, meta, charts_markdown = _prepare_report_charts(kind, facts, meta) + chart_asset_ids = [ + chart["asset_id"] + for chart in meta.get("charts", []) + if isinstance(chart.get("asset_id"), str) + ] # Human reports use the deep multi-agent pipeline; # Agent briefs stay single-call (they need to be concise). @@ -780,6 +884,7 @@ async def _generate_report_record( used_pipeline = True if not content.strip(): raise RuntimeError("Pipeline returned empty report.") + content = _postprocess_human_report_content(content, charts_markdown) except Exception as pipeline_exc: pipeline_error = str(pipeline_exc) or pipeline_exc.__class__.__name__ logger.warning( @@ -797,9 +902,17 @@ async def _generate_report_record( used_fallback = True if fallback_model: report_model = fallback_model + content = _postprocess_human_report_content(content, charts_markdown) except Exception as exc: llm_error = str(exc) or exc.__class__.__name__ logger.exception("Report generation failed for %s/%s", kind, audience) + if chart_asset_ids: + try: + from opencmo.report_charts import delete_chart_assets + + delete_chart_assets(chart_asset_ids) + except Exception: + logger.exception("Failed to clean up chart assets for failed %s/%s report", kind, audience) return _failed_report_payload( meta, model, diff --git a/src/opencmo/web/routers/report.py b/src/opencmo/web/routers/report.py index 97137c5..6cd2ea2 100644 --- a/src/opencmo/web/routers/report.py +++ b/src/opencmo/web/routers/report.py @@ -5,7 +5,7 @@ import asyncio from fastapi import APIRouter, Request -from fastapi.responses import JSONResponse +from fastapi.responses import FileResponse, JSONResponse from opencmo import storage from opencmo.background import service as bg_service @@ -85,6 +85,16 @@ async def api_v1_report_detail(report_id: int): return JSONResponse(report) +@router.api_route("/report-assets/{asset_id}.svg", methods=["GET", "HEAD"]) +async def api_v1_report_asset(asset_id: str): + from opencmo.report_charts import get_report_asset_path + + asset_path = get_report_asset_path(asset_id) + if not asset_path or not asset_path.exists() or not asset_path.is_file(): + return JSONResponse({"error": "Not found"}, status_code=404) + return FileResponse(asset_path, media_type="image/svg+xml") + + @router.post("/projects/{project_id}/reports/{kind}/regenerate") async def api_v1_regenerate_report(project_id: int, kind: str, request: Request): project = await storage.get_project(project_id) diff --git a/tests/test_report_charts.py b/tests/test_report_charts.py new file mode 100644 index 0000000..dc7b875 --- /dev/null +++ b/tests/test_report_charts.py @@ -0,0 +1,134 @@ +from __future__ import annotations + +from fastapi.testclient import TestClient + +from opencmo.report_charts import build_report_charts, delete_chart_assets, get_report_asset_path +from opencmo.reports import ( + _normalize_report_headings, + _postprocess_human_report_content, + _simple_markdown_to_html, +) +from opencmo.web.app import app + + +def test_strategic_chart_builder_uses_real_fact_values(tmp_path, monkeypatch): + monkeypatch.setenv("OPENCMO_REPORT_ASSET_DIR", str(tmp_path)) + facts = { + "latest_scans": { + "seo": {"score": 0.82}, + "geo": {"score": 57}, + "community": {"total_hits": 12}, + }, + "citability": [{"avg_score": 0.41}], + "brand_presence": [{"footprint_score": 33}], + "serp_latest": [{"keyword": "ai cmo", "position": 4}], + "findings": [{"severity": "high"}], + "recommendations": [{"priority": "medium"}], + } + charts = build_report_charts("strategic", facts, {"sample_count": 3, "total_data_sources": 5}) + + assert charts + assert charts[0].markdown.startswith("![") + svg = get_report_asset_path(charts[0].asset_id).read_text(encoding="utf-8") + assert "SEO" in svg + assert "82" in svg + assert "57" in svg + + +def test_periodic_chart_builder_requires_two_points_for_trends(tmp_path, monkeypatch): + monkeypatch.setenv("OPENCMO_REPORT_ASSET_DIR", str(tmp_path)) + facts = { + "seo_history": [{"scanned_at": "2026-05-01T00:00:00", "score_performance": 0.7}], + "geo_history": [ + {"scanned_at": "2026-05-01T00:00:00", "geo_score": 40}, + {"scanned_at": "2026-05-02T00:00:00", "geo_score": 50}, + ], + "community_history": [], + "citability": [], + "findings": [], + "recommendations": [], + } + charts = build_report_charts("periodic", facts, {"sample_count": 2, "total_data_sources": 8}) + + assert [chart.title for chart in charts] == ["GEO 趋势"] + svg = get_report_asset_path(charts[0].asset_id).read_text(encoding="utf-8") + assert "40" in svg + assert "50" in svg + + +def test_report_heading_normalization_and_chart_section_insertion(): + content = "# 总标题\n\n## 1. 执行摘要\n\n正文\n\n#### 深层标题\n\n内容" + + normalized = _normalize_report_headings(content) + assert "####" not in normalized + assert "### 深层标题" in normalized + + processed = _postprocess_human_report_content(normalized, "### 图表\n![图](/api/v1/report-assets/abc.svg)") + assert "## 2. 数据图表速览" in processed + assert processed.count("# 总标题") == 1 + + +def test_simple_markdown_to_html_supports_images(): + asset_id = "a" * 32 + html = _simple_markdown_to_html(f"![关键指标](/api/v1/report-assets/{asset_id}.svg)") + + assert f'关键指标' in html + assert "
关键指标
" in html + + +def test_simple_markdown_to_html_rejects_external_images(): + html = _simple_markdown_to_html("![x](https://attacker.com/pixel.gif)") + + assert "![x](https://attacker.com/pixel.gif)

" in html + + +def test_simple_markdown_to_html_rejects_javascript_url(): + html = _simple_markdown_to_html("![x](javascript:alert(1))") + + assert "![x](javascript:alert(1))

" in html + + +def test_postprocess_skips_chart_section_when_already_referenced(): + asset_id = "b" * 32 + content = f"# 总标题\n\n## 二、数据图表速览\n\n正文 /api/v1/report-assets/{asset_id}.svg" + + processed = _postprocess_human_report_content(content, "### 图表\n![图](/api/v1/report-assets/c.svg)") + + assert processed == content + assert "## 2. 数据图表速览" not in processed + + +def test_delete_chart_assets_removes_files_and_ignores_missing(tmp_path, monkeypatch): + monkeypatch.setenv("OPENCMO_REPORT_ASSET_DIR", str(tmp_path)) + asset_ids = ["c" * 32, "d" * 32] + for asset_id in asset_ids: + (tmp_path / f"{asset_id}.svg").write_text("", encoding="utf-8") + + delete_chart_assets([*asset_ids, "e" * 32, "not-valid"]) + + assert not (tmp_path / f"{asset_ids[0]}.svg").exists() + assert not (tmp_path / f"{asset_ids[1]}.svg").exists() + + +def test_report_asset_route_serves_svg(tmp_path, monkeypatch): + monkeypatch.setenv("OPENCMO_REPORT_ASSET_DIR", str(tmp_path)) + asset_id = "a" * 32 + (tmp_path / f"{asset_id}.svg").write_text("", encoding="utf-8") + + response = TestClient(app).get(f"/api/v1/report-assets/{asset_id}.svg") + + assert response.status_code == 200 + assert response.headers["content-type"].startswith("image/svg+xml") + + +def test_report_asset_route_rejects_missing_or_invalid_assets(tmp_path, monkeypatch): + monkeypatch.setenv("OPENCMO_REPORT_ASSET_DIR", str(tmp_path)) + client = TestClient(app) + + invalid_response = client.get("/api/v1/report-assets/not-valid.svg") + missing_response = client.get(f"/api/v1/report-assets/{'f' * 32}.svg") + + assert invalid_response.status_code == 404 + assert missing_response.status_code == 404