diff --git a/astrbot/cli/__main__.py b/astrbot/cli/__main__.py index 6d48ec28d5..f10f14ca59 100644 --- a/astrbot/cli/__main__.py +++ b/astrbot/cli/__main__.py @@ -5,7 +5,7 @@ import click from . import __version__ -from .commands import conf, init, plug, run +from .commands import conf, init, migrate, plug, run logo_tmpl = r""" ___ _______.___________..______ .______ ______ .___________. @@ -54,6 +54,7 @@ def help(command_name: str | None) -> None: cli.add_command(help) cli.add_command(plug) cli.add_command(conf) +cli.add_command(migrate) if __name__ == "__main__": cli() diff --git a/astrbot/cli/commands/__init__.py b/astrbot/cli/commands/__init__.py index 1d3e0bca2f..fdabff70c4 100644 --- a/astrbot/cli/commands/__init__.py +++ b/astrbot/cli/commands/__init__.py @@ -1,6 +1,7 @@ from .cmd_conf import conf from .cmd_init import init +from .cmd_migrate import migrate from .cmd_plug import plug from .cmd_run import run -__all__ = ["conf", "init", "plug", "run"] +__all__ = ["conf", "init", "migrate", "plug", "run"] diff --git a/astrbot/cli/commands/cmd_migrate.py b/astrbot/cli/commands/cmd_migrate.py new file mode 100644 index 0000000000..1a77fb80bf --- /dev/null +++ b/astrbot/cli/commands/cmd_migrate.py @@ -0,0 +1,85 @@ +from __future__ import annotations + +from pathlib import Path + +import click + +from ..utils import get_astrbot_root +from ..utils.openclaw_migrate import run_openclaw_migration + + +@click.group(name="migrate") +def migrate() -> None: + """Data migration utilities for external runtimes.""" + + +@migrate.command(name="openclaw") +@click.option( + "--source", + "source_path", + type=click.Path(path_type=Path, file_okay=False, resolve_path=True), + default=None, + help="Path to OpenClaw root directory (default: ~/.openclaw).", +) +@click.option( + "--target", + "target_path", + type=click.Path(path_type=Path, file_okay=False, resolve_path=False), + default=None, + help=( + "Custom output directory. 
If omitted, writes to " "data/migrations/openclaw/run-<timestamp>." ), ) @click.option( "--dry-run", is_flag=True, default=False, help="Preview migration candidates without writing files.", ) def migrate_openclaw( source_path: Path | None, target_path: Path | None, dry_run: bool, ) -> None: """Migrate OpenClaw workspace snapshots into AstrBot migration artifacts.""" astrbot_root = get_astrbot_root() source_root = source_path or (Path.home() / ".openclaw") report = run_openclaw_migration( source_root=source_root, astrbot_root=astrbot_root, dry_run=dry_run, target_dir=target_path, ) click.echo("OpenClaw migration report:") click.echo(f" Source root: {report.source_root}") click.echo(f" Source workspace: {report.source_workspace}") click.echo(f" Dry run: {report.dry_run}") click.echo(f" Memory entries: {report.memory_entries_total}") click.echo(f" - sqlite: {report.memory_entries_from_sqlite}") click.echo(f" - markdown: {report.memory_entries_from_markdown}") click.echo(f" Workspace files: {report.workspace_files_total}") click.echo(f" Workspace size: {report.workspace_bytes_total} bytes") click.echo(f" Config found: {report.config_found}") if dry_run: click.echo("") click.echo("Dry-run mode: no files were written.") if target_path is not None: click.echo("Note: --target is ignored when --dry-run is enabled.") click.echo("Run without --dry-run to perform migration.") return click.echo("") click.echo(f"Migration output: {report.target_dir}") click.echo(f" Copied files: {report.copied_workspace_files}") click.echo(f" Imported memories: {report.copied_memory_entries}") click.echo(f" Timeline written: {report.wrote_timeline}") click.echo(f" Config TOML written: {report.wrote_config_toml}") click.echo("Done.") __all__ = ["migrate"] diff --git a/astrbot/cli/utils/openclaw_artifacts.py b/astrbot/cli/utils/openclaw_artifacts.py new file mode 100644 index 0000000000..a094de3f1f --- 
/dev/null +++ b/astrbot/cli/utils/openclaw_artifacts.py @@ -0,0 +1,169 @@ +from __future__ import annotations + +import datetime as dt +import json +import os +import shutil +from pathlib import Path +from typing import Any + +import click + +from .openclaw_models import MemoryEntry +from .openclaw_toml import json_to_toml + + +def _is_within(path: Path, parent: Path) -> bool: + try: + path.resolve().relative_to(parent.resolve()) + return True + except (OSError, ValueError): + return False + + +def collect_workspace_files( + workspace_dir: Path, *, exclude_dir: Path | None = None +) -> list[Path]: + files: list[Path] = [] + exclude_resolved = exclude_dir.resolve() if exclude_dir is not None else None + + for root, dirnames, filenames in os.walk( + workspace_dir, topdown=True, followlinks=False + ): + root_path = Path(root) + + pruned_dirs: list[str] = [] + for dirname in dirnames: + dir_path = root_path / dirname + if dir_path.is_symlink(): + continue + if exclude_resolved is not None and _is_within(dir_path, exclude_resolved): + continue + pruned_dirs.append(dirname) + dirnames[:] = pruned_dirs + + for filename in filenames: + path = root_path / filename + if path.is_symlink() or not path.is_file(): + continue + if exclude_resolved is not None and _is_within(path, exclude_resolved): + continue + files.append(path) + + return sorted(files) + + +def workspace_total_size(files: list[Path]) -> int: + total_bytes = 0 + for path in files: + try: + total_bytes += path.stat().st_size + except OSError: + # Best-effort accounting: files may disappear or become unreadable + # during migration scans. 
+ continue + return total_bytes + + +def _write_jsonl(path: Path, entries: list[MemoryEntry]) -> None: + with path.open("w", encoding="utf-8") as fp: + for entry in entries: + fp.write( + json.dumps( + { + "key": entry.key, + "content": entry.content, + "category": entry.category, + "timestamp": entry.timestamp, + "source": entry.source, + }, + ensure_ascii=False, + ) + + "\n" + ) + + +def _write_timeline(path: Path, entries: list[MemoryEntry], source_root: Path) -> None: + ordered = sorted(entries, key=lambda e: (e.timestamp or "", e.order)) + + lines: list[str] = [] + lines.append("# OpenClaw Migration - Time Brief History") + lines.append("") + lines.append("> 时间简史(初步方案):按时间汇总可迁移记忆条目。") + lines.append("") + lines.append(f"- Generated at: {dt.datetime.now(dt.timezone.utc).isoformat()}") + lines.append(f"- Source: `{source_root}`") + lines.append(f"- Total entries: {len(ordered)}") + lines.append("") + lines.append("## Timeline") + lines.append("") + + for entry in ordered: + ts = entry.timestamp or "unknown" + snippet = entry.content.replace("\n", " ").strip() + if len(snippet) > 160: + snippet = snippet[:157] + "..." 
+ safe_key = (entry.key or "").replace("`", "\\`") + safe_snippet = snippet.replace("`", "\\`") + lines.append(f"- [{ts}] ({entry.category}) `{safe_key}`: {safe_snippet}") + + lines.append("") + path.write_text("\n".join(lines), encoding="utf-8") + + +def write_migration_artifacts( + *, + workspace_dir: Path, + workspace_files: list[Path], + resolved_target: Path, + source_root: Path, + memory_entries: list[MemoryEntry], + config_obj: dict[str, Any] | None, + config_json_path: Path | None, +) -> tuple[int, int, bool, bool]: + workspace_target = resolved_target / "workspace" + workspace_target.mkdir(parents=True, exist_ok=True) + + copied_workspace_files = 0 + for src_file in workspace_files: + rel_path = src_file.relative_to(workspace_dir) + dst_file = workspace_target / rel_path + dst_file.parent.mkdir(parents=True, exist_ok=True) + shutil.copy2(src_file, dst_file) + copied_workspace_files += 1 + + copied_memory_entries = 0 + wrote_timeline = False + if memory_entries: + _write_jsonl(resolved_target / "memory_entries.jsonl", memory_entries) + copied_memory_entries = len(memory_entries) + _write_timeline( + resolved_target / "time_brief_history.md", + memory_entries, + source_root, + ) + wrote_timeline = True + + wrote_config_toml = False + if config_obj is not None: + (resolved_target / "config.original.json").write_text( + json.dumps(config_obj, ensure_ascii=False, indent=2), + encoding="utf-8", + ) + try: + converted_toml = json_to_toml(config_obj) + except ValueError as exc: + source_hint = str(config_json_path) if config_json_path else "config JSON" + raise click.ClickException( + f"Failed to convert {source_hint} to TOML: {exc}" + ) from exc + (resolved_target / "config.migrated.toml").write_text( + converted_toml, + encoding="utf-8", + ) + wrote_config_toml = True + + return copied_workspace_files, copied_memory_entries, wrote_timeline, wrote_config_toml + + +__all__ = ["collect_workspace_files", "workspace_total_size", "write_migration_artifacts"] diff 
--git a/astrbot/cli/utils/openclaw_memory.py b/astrbot/cli/utils/openclaw_memory.py new file mode 100644 index 0000000000..fd440d80eb --- /dev/null +++ b/astrbot/cli/utils/openclaw_memory.py @@ -0,0 +1,291 @@ +from __future__ import annotations + +import datetime as dt +import sqlite3 +from pathlib import Path +from typing import Any + +import click + +from .openclaw_models import MemoryEntry + +SQLITE_KEY_CANDIDATES = ("key", "id", "name") +SQLITE_CONTENT_CANDIDATES = ("content", "value", "text", "memory") +SQLITE_CATEGORY_CANDIDATES = ("category", "kind", "type") +SQLITE_TS_CANDIDATES = ("updated_at", "created_at", "timestamp", "ts", "time") + + +def _pick_existing_column(columns: set[str], candidates: tuple[str, ...]) -> str | None: + for candidate in candidates: + if candidate in columns: + return candidate + return None + + +def _timestamp_from_epoch(raw: float | int | str) -> str | None: + try: + ts = float(raw) + if ts > 1e12: + ts /= 1000.0 + return dt.datetime.fromtimestamp(ts, tz=dt.timezone.utc).isoformat() + except Exception: + return None + + +def _normalize_timestamp(raw: Any) -> str | None: + if raw is None: + return None + + if isinstance(raw, (int, float)): + normalized = _timestamp_from_epoch(raw) + return normalized if normalized is not None else str(raw) + + text = str(raw).strip() + if not text: + return None + + if text.isdigit(): + normalized = _timestamp_from_epoch(text) + return normalized if normalized is not None else text + + maybe_iso = text.replace("Z", "+00:00") + try: + parsed = dt.datetime.fromisoformat(maybe_iso) + if parsed.tzinfo is None: + parsed = parsed.replace(tzinfo=dt.timezone.utc) + return parsed.isoformat() + except Exception: + return text + + +def _normalize_key(raw: Any, fallback_idx: int) -> str: + text = str(raw).strip() if raw is not None else "" + if text: + return text + return f"openclaw_{fallback_idx}" + + +def _parse_structured_line(line: str) -> tuple[str, str] | None: + if not line.startswith("**"): + return 
None + rest = line[2:] + marker = "**:" + marker_idx = rest.find(marker) + if marker_idx <= 0: + return None + key = rest[:marker_idx].strip() + value = rest[marker_idx + len(marker) :].strip() + if not key or not value: + return None + return key, value + + +def _discover_memory_columns( + cursor: sqlite3.Cursor, db_path: Path +) -> tuple[str, str, str | None, str | None]: + table_info_rows = cursor.execute("PRAGMA table_info(memories)").fetchall() + columns_in_order = [ + str(row[1]).strip().lower() + for row in table_info_rows + if str(row[1]).strip() + ] + columns = set(columns_in_order) + + key_col = _pick_existing_column(columns, SQLITE_KEY_CANDIDATES) + if key_col is None: + pk_columns = sorted( + ( + (int(row[5]), str(row[1]).strip().lower()) + for row in table_info_rows + if int(row[5]) > 0 and str(row[1]).strip() + ), + key=lambda item: item[0], + ) + if pk_columns: + key_col = pk_columns[0][1] + if key_col is None: + try: + cursor.execute("SELECT rowid FROM memories LIMIT 1").fetchone() + key_col = "rowid" + except sqlite3.Error: + key_col = columns_in_order[0] if columns_in_order else None + + content_col = _pick_existing_column(columns, SQLITE_CONTENT_CANDIDATES) + if content_col is None: + raise click.ClickException( + f"OpenClaw sqlite exists at {db_path}, but no content-like column found" + ) + if key_col is None: + raise click.ClickException( + f"OpenClaw sqlite exists at {db_path}, but no key-like or usable fallback column found" + ) + category_col = _pick_existing_column(columns, SQLITE_CATEGORY_CANDIDATES) + ts_col = _pick_existing_column(columns, SQLITE_TS_CANDIDATES) + return key_col, content_col, category_col, ts_col + + +def _read_openclaw_sqlite_entries(db_path: Path) -> list[MemoryEntry]: + if not db_path.exists(): + return [] + + conn: sqlite3.Connection | None = None + try: + db_uri = f"{db_path.resolve().as_uri()}?mode=ro" + conn = sqlite3.connect(db_uri, uri=True) + conn.row_factory = sqlite3.Row + cursor = conn.cursor() + table_exists 
= cursor.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='memories' LIMIT 1" + ).fetchone() + if table_exists is None: + return [] + + key_col, content_col, category_col, ts_col = _discover_memory_columns( + cursor, db_path + ) + + select_clauses = [ + f"{key_col} AS __key__", + f"{content_col} AS __content__", + ( + f"{category_col} AS __category__" + if category_col is not None + else "'core' AS __category__" + ), + f"{ts_col} AS __timestamp__" if ts_col is not None else "NULL AS __timestamp__", + ] + order_by_clause = ( + " ORDER BY __timestamp__ ASC, __key__ ASC" + if ts_col is not None + else " ORDER BY __key__ ASC" + ) + rows = cursor.execute( + "SELECT " + ", ".join(select_clauses) + " FROM memories" + order_by_clause + ).fetchall() + + entries: list[MemoryEntry] = [] + for idx, row in enumerate(rows): + content = str(row["__content__"] or "").strip() + if not content: + continue + + entries.append( + MemoryEntry( + key=_normalize_key(row["__key__"], idx), + content=content, + category=str(row["__category__"] or "core").strip().lower() or "core", + timestamp=_normalize_timestamp(row["__timestamp__"]), + source=f"sqlite:{db_path}", + order=idx, + ) + ) + + return entries + except sqlite3.Error as exc: + raise click.ClickException( + f"Failed to read OpenClaw sqlite at {db_path}: {exc}" + ) from exc + finally: + if conn is not None: + conn.close() + + +def _parse_markdown_file( + path: Path, default_category: str, stem: str, order_offset: int +) -> list[MemoryEntry]: + content = path.read_text(encoding="utf-8", errors="replace") + mtime = _normalize_timestamp(path.stat().st_mtime) + entries: list[MemoryEntry] = [] + line_no = 0 + for raw_line in content.splitlines(): + line_no += 1 + stripped = raw_line.strip() + if not stripped or stripped.startswith("#"): + continue + + line = stripped[2:] if stripped.startswith("- ") else stripped + parsed = _parse_structured_line(line) + if parsed is not None: + key, text = parsed + key = 
_normalize_key(key, line_no) + body = text.strip() + else: + key = f"openclaw_{stem}_{line_no}" + body = line.strip() + + if not body: + continue + + entries.append( + MemoryEntry( + key=key, + content=body, + category=default_category, + timestamp=mtime, + source=f"markdown:{path}", + order=order_offset + len(entries), + ) + ) + return entries + + +def _read_openclaw_markdown_entries(workspace_dir: Path) -> list[MemoryEntry]: + entries: list[MemoryEntry] = [] + + core_path = workspace_dir / "MEMORY.md" + if core_path.exists(): + entries.extend( + _parse_markdown_file( + core_path, + default_category="core", + stem="core", + order_offset=len(entries), + ) + ) + + daily_dir = workspace_dir / "memory" + if daily_dir.exists(): + for md_path in sorted(daily_dir.glob("*.md")): + stem = md_path.stem or "daily" + entries.extend( + _parse_markdown_file( + md_path, + default_category="daily", + stem=stem, + order_offset=len(entries), + ) + ) + + return entries + + +def _dedup_entries(entries: list[MemoryEntry]) -> list[MemoryEntry]: + seen_exact: set[tuple[str, str, str, str]] = set() + seen_semantic: set[tuple[str, str]] = set() + deduped: list[MemoryEntry] = [] + + for item in entries: + exact_key = ( + item.key.strip(), + item.content.strip(), + item.category.strip(), + item.timestamp or "", + ) + semantic_key = (item.content.strip(), item.category.strip()) + if exact_key in seen_exact or semantic_key in seen_semantic: + continue + seen_exact.add(exact_key) + seen_semantic.add(semantic_key) + deduped.append(item) + + return deduped + + +def collect_memory_entries(workspace_dir: Path) -> tuple[list[MemoryEntry], int, int]: + sqlite_entries = _read_openclaw_sqlite_entries(workspace_dir / "memory" / "brain.db") + markdown_entries = _read_openclaw_markdown_entries(workspace_dir) + memory_entries = _dedup_entries([*sqlite_entries, *markdown_entries]) + return memory_entries, len(sqlite_entries), len(markdown_entries) + + +__all__ = ["collect_memory_entries"] diff --git 
a/astrbot/cli/utils/openclaw_migrate.py b/astrbot/cli/utils/openclaw_migrate.py new file mode 100644 index 0000000000..bb9231cc99 --- /dev/null +++ b/astrbot/cli/utils/openclaw_migrate.py @@ -0,0 +1,155 @@ +from __future__ import annotations + +import datetime as dt +import json +from dataclasses import asdict +from pathlib import Path +from typing import Any + +import click + +from .basic import check_astrbot_root +from .openclaw_artifacts import ( + collect_workspace_files, + workspace_total_size, + write_migration_artifacts, +) +from .openclaw_memory import collect_memory_entries +from .openclaw_models import MemoryEntry, MigrationReport + + +def _find_source_workspace(source_root: Path) -> Path: + candidate = source_root / "workspace" + if candidate.exists() and candidate.is_dir(): + return candidate + return source_root + + +def _find_openclaw_config_json(source_root: Path, workspace_dir: Path) -> Path | None: + candidates = [ + source_root / "config.json", + source_root / "settings.json", + workspace_dir / "config.json", + workspace_dir / "settings.json", + ] + for candidate in candidates: + if candidate.exists() and candidate.is_file(): + return candidate + return None + + +def _load_json_or_raise(path: Path) -> dict[str, Any]: + try: + return json.loads(path.read_text(encoding="utf-8")) + except json.JSONDecodeError as exc: + raise click.ClickException( + f"Failed to parse OpenClaw config JSON at {path}: {exc.msg} " + f"(line {exc.lineno}, column {exc.colno})" + ) from exc + + +def _resolve_explicit_target_dir( + astrbot_root: Path, target_dir: Path | None +) -> Path | None: + if target_dir is None: + return None + return target_dir if target_dir.is_absolute() else (astrbot_root / target_dir) + + +def _resolve_output_target_dir( + astrbot_root: Path, target_dir: Path | None, dry_run: bool +) -> Path | None: + if dry_run: + return None + explicit_target = _resolve_explicit_target_dir(astrbot_root, target_dir) + if explicit_target is not None: + return 
explicit_target + run_id = dt.datetime.now().strftime("%Y%m%d-%H%M%S") + return astrbot_root / "data" / "migrations" / "openclaw" / f"run-{run_id}" + + +def run_openclaw_migration( + *, + source_root: Path, + astrbot_root: Path, + dry_run: bool = False, + target_dir: Path | None = None, +) -> MigrationReport: + if not source_root.exists() or not source_root.is_dir(): + raise click.ClickException(f"OpenClaw source not found: {source_root}") + + if not check_astrbot_root(astrbot_root): + raise click.ClickException( + f"{astrbot_root} is not a valid AstrBot root. Run from initialized AstrBot root." + ) + + workspace_dir = _find_source_workspace(source_root) + memory_entries, from_sqlite, from_markdown = collect_memory_entries(workspace_dir) + + explicit_target_dir = _resolve_explicit_target_dir(astrbot_root, target_dir) + workspace_files = collect_workspace_files( + workspace_dir, + exclude_dir=explicit_target_dir, + ) + workspace_total_bytes = workspace_total_size(workspace_files) + + config_json_path = _find_openclaw_config_json(source_root, workspace_dir) + config_obj: dict[str, Any] | None = None + if config_json_path is not None: + config_obj = _load_json_or_raise(config_json_path) + + resolved_target = _resolve_output_target_dir(astrbot_root, target_dir, dry_run) + + copied_workspace_files = 0 + copied_memory_entries = 0 + wrote_timeline = False + wrote_config_toml = False + + if not dry_run and resolved_target is not None: + resolved_target.mkdir(parents=True, exist_ok=True) + ( + copied_workspace_files, + copied_memory_entries, + wrote_timeline, + wrote_config_toml, + ) = write_migration_artifacts( + workspace_dir=workspace_dir, + workspace_files=workspace_files, + resolved_target=resolved_target, + source_root=source_root, + memory_entries=memory_entries, + config_obj=config_obj, + config_json_path=config_json_path, + ) + + report = MigrationReport( + source_root=str(source_root), + source_workspace=str(workspace_dir), + target_dir=str(resolved_target) if 
resolved_target else None, + dry_run=dry_run, + memory_entries_total=len(memory_entries), + memory_entries_from_sqlite=from_sqlite, + memory_entries_from_markdown=from_markdown, + workspace_files_total=len(workspace_files), + workspace_bytes_total=workspace_total_bytes, + config_found=config_obj is not None, + copied_workspace_files=copied_workspace_files, + copied_memory_entries=copied_memory_entries, + wrote_timeline=wrote_timeline, + wrote_config_toml=wrote_config_toml, + ) + + if not dry_run and resolved_target is not None: + (resolved_target / "migration_summary.json").write_text( + json.dumps(asdict(report), ensure_ascii=False, indent=2), + encoding="utf-8", + ) + + return report + + +__all__ = [ + "MemoryEntry", + "MigrationReport", + "run_openclaw_migration", +] diff --git a/astrbot/cli/utils/openclaw_models.py b/astrbot/cli/utils/openclaw_models.py new file mode 100644 index 0000000000..3503b8c1e6 --- /dev/null +++ b/astrbot/cli/utils/openclaw_models.py @@ -0,0 +1,34 @@ +from __future__ import annotations + +from dataclasses import dataclass + + +@dataclass(slots=True) +class MemoryEntry: + key: str + content: str + category: str + timestamp: str | None + source: str + order: int + + +@dataclass(slots=True) +class MigrationReport: + source_root: str + source_workspace: str + target_dir: str | None + dry_run: bool + memory_entries_total: int + memory_entries_from_sqlite: int + memory_entries_from_markdown: int + workspace_files_total: int + workspace_bytes_total: int + config_found: bool + copied_workspace_files: int + copied_memory_entries: int + wrote_timeline: bool + wrote_config_toml: bool + + +__all__ = ["MemoryEntry", "MigrationReport"] diff --git a/astrbot/cli/utils/openclaw_toml.py b/astrbot/cli/utils/openclaw_toml.py new file mode 100644 index 0000000000..090c682fd5 --- /dev/null +++ b/astrbot/cli/utils/openclaw_toml.py @@ -0,0 +1,110 @@ +from __future__ import annotations + +import math +from typing import Any + +# TOML has no null literal. 
Keep this centralized so behavior is explicit and +# easy to adjust in future migrations. +NULL_SENTINEL = "__ASTRBOT_OPENCLAW_NULL_SENTINEL_V1__" + + +def _toml_quote(value: str) -> str: + escaped = value.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n") + return f'"{escaped}"' + + +def _format_toml_path(path: list[str]) -> str: + return ".".join(_toml_quote(str(part)) for part in path) + + +def _classify_items( + obj: dict[str, Any], +) -> tuple[ + list[tuple[str, Any]], + list[tuple[str, dict[str, Any]]], + list[tuple[str, list[dict[str, Any]]]], +]: + scalar_items: list[tuple[str, Any]] = [] + nested_dicts: list[tuple[str, dict[str, Any]]] = [] + array_tables: list[tuple[str, list[dict[str, Any]]]] = [] + + for key, value in obj.items(): + key_text = str(key) + if isinstance(value, dict): + nested_dicts.append((key_text, value)) + elif isinstance(value, list) and value and all(isinstance(item, dict) for item in value): + array_tables.append((key_text, value)) + else: + scalar_items.append((key_text, value)) + + return scalar_items, nested_dicts, array_tables + + +def _toml_literal(value: Any) -> str: + if value is None: + # TOML has no null literal; preserve previous output contract. + return _toml_quote(NULL_SENTINEL) + if isinstance(value, bool): + return "true" if value else "false" + if isinstance(value, int): + return str(value) + if isinstance(value, float): + if not math.isfinite(value): + # TOML 1.0 does not allow NaN/Infinity. 
+ raise ValueError(f"non-finite float value is not TOML-compatible: {value}") + return repr(value) + if isinstance(value, str): + return _toml_quote(value) + if isinstance(value, list): + return "[" + ", ".join(_toml_literal(v) for v in value) + "]" + if isinstance(value, dict): + pairs = ", ".join( + f"{_toml_quote(str(k))} = {_toml_literal(v)}" for k, v in value.items() + ) + return "{ " + pairs + " }" + return _toml_quote(str(value)) + + +def json_to_toml(data: dict[str, Any]) -> str: + """Serialize a JSON-like dict to TOML text used by migration snapshots. + + Notes: + - Empty lists are emitted as `key = []`. + - Only non-empty `list[dict]` values are emitted as array-of-tables. + For empty lists we intentionally preserve literal emptiness because the + element schema is unknown at serialization time. + """ + lines: list[str] = [] + + def emit_table(obj: dict[str, Any], path: list[str]) -> None: + scalar_items, nested_dicts, array_tables = _classify_items(obj) + + if path: + lines.append(f"[{_format_toml_path(path)}]") + for key, value in scalar_items: + lines.append(f"{_toml_quote(key)} = {_toml_literal(value)}") + if scalar_items and (nested_dicts or array_tables): + lines.append("") + + for idx, (key, value) in enumerate(nested_dicts): + emit_table(value, [*path, key]) + if idx != len(nested_dicts) - 1 or array_tables: + lines.append("") + + for t_idx, (key, items) in enumerate(array_tables): + table_path = [*path, key] + for item in items: + lines.append(f"[[{_format_toml_path(table_path)}]]") + for sub_key, sub_value in item.items(): + lines.append(f"{_toml_quote(str(sub_key))} = {_toml_literal(sub_value)}") + lines.append("") + if t_idx == len(array_tables) - 1 and lines and lines[-1] == "": + lines.pop() + + emit_table(data, []) + if not lines: + return "" + return "\n".join(lines).rstrip() + "\n" + + +__all__ = ["NULL_SENTINEL", "json_to_toml"] diff --git a/docs/en/deploy/astrbot/cli.md b/docs/en/deploy/astrbot/cli.md index 857e0d6a61..ecac9ae82b 100644 
--- a/docs/en/deploy/astrbot/cli.md +++ b/docs/en/deploy/astrbot/cli.md @@ -90,3 +90,17 @@ If there are no errors, you will see a log message similar to `🌈 Dashboard st Next, you need to deploy any messaging platform to use AstrBot on that platform. + +## OpenClaw Migration (Preliminary) + +If you previously used OpenClaw, AstrBot now provides a preliminary migration command that can import OpenClaw workspace snapshots into AstrBot migration artifacts (including memory entries, workspace files, config conversion, and a generated `time_brief_history.md`): + +```bash +# Preview only (no writes to disk) +astrbot migrate openclaw --dry-run + +# Execute migration +astrbot migrate openclaw +``` + +By default, AstrBot reads from `~/.openclaw` and writes migration artifacts to `data/migrations/openclaw/run-<timestamp>` under your AstrBot root. diff --git a/docs/zh/deploy/astrbot/cli.md b/docs/zh/deploy/astrbot/cli.md index 623eb583fb..44392d857a 100644 --- a/docs/zh/deploy/astrbot/cli.md +++ b/docs/zh/deploy/astrbot/cli.md @@ -90,3 +90,17 @@ python main.py 接下来,你需要部署任何一个消息平台,才能够实现在消息平台上使用 AstrBot。 + +## OpenClaw 迁移(初步方案) + +如果你之前使用过 OpenClaw,AstrBot 现在提供了一个初步迁移命令,可将 OpenClaw 工作区快照导入为 AstrBot 迁移产物(包含记忆条目、工作区文件、配置转换和自动生成的 `time_brief_history.md`): + +```bash +# 仅预览(不写入) +astrbot migrate openclaw --dry-run + +# 执行迁移 +astrbot migrate openclaw +``` + +默认从 `~/.openclaw` 读取,并将迁移结果写入 AstrBot 根目录下 `data/migrations/openclaw/run-<timestamp>`。 diff --git a/tests/unit/test_cli_cmd_migrate.py b/tests/unit/test_cli_cmd_migrate.py new file mode 100644 index 0000000000..27bf35caa9 --- /dev/null +++ b/tests/unit/test_cli_cmd_migrate.py @@ -0,0 +1,105 @@ +from __future__ import annotations + +from pathlib import Path + +from click.testing import CliRunner + +from astrbot.cli.commands import cmd_migrate + +from astrbot.cli.utils.openclaw_migrate import MigrationReport + + +def test_migrate_openclaw_reports_config_toml_field_and_relative_target( + monkeypatch, + tmp_path: Path, +) -> None: + source_root = tmp_path / 
".openclaw" + source_root.mkdir(parents=True) + astrbot_root = tmp_path / "astrbot" + astrbot_root.mkdir(parents=True) + + captured: dict[str, object] = {} + + def _fake_run_openclaw_migration(**kwargs: object) -> MigrationReport: + captured.update(kwargs) + return MigrationReport( + source_root=str(source_root), + source_workspace=str(source_root / "workspace"), + target_dir=str(astrbot_root / "data" / "migrations" / "openclaw" / "run-test"), + dry_run=False, + memory_entries_total=3, + memory_entries_from_sqlite=2, + memory_entries_from_markdown=1, + workspace_files_total=5, + workspace_bytes_total=1024, + config_found=True, + copied_workspace_files=5, + copied_memory_entries=3, + wrote_timeline=False, + wrote_config_toml=True, + ) + + monkeypatch.setattr(cmd_migrate, "get_astrbot_root", lambda: astrbot_root) + monkeypatch.setattr(cmd_migrate, "run_openclaw_migration", _fake_run_openclaw_migration) + + runner = CliRunner() + result = runner.invoke( + cmd_migrate.migrate, + ["openclaw", "--source", str(source_root), "--target", "data/migrations/custom"], + ) + + assert result.exit_code == 0, result.output + assert captured["target_dir"] == Path("data/migrations/custom") + assert "Timeline written: False" in result.output + assert "Config TOML written: True" in result.output + + +def test_migrate_openclaw_dry_run_explicit_target_prints_ignore_note( + monkeypatch, + tmp_path: Path, +) -> None: + source_root = tmp_path / ".openclaw" + source_root.mkdir(parents=True) + astrbot_root = tmp_path / "astrbot" + astrbot_root.mkdir(parents=True) + + captured: dict[str, object] = {} + + def _fake_run_openclaw_migration(**kwargs: object) -> MigrationReport: + captured.update(kwargs) + return MigrationReport( + source_root=str(source_root), + source_workspace=str(source_root / "workspace"), + target_dir=None, + dry_run=True, + memory_entries_total=0, + memory_entries_from_sqlite=0, + memory_entries_from_markdown=0, + workspace_files_total=0, + workspace_bytes_total=0, + 
config_found=False, + copied_workspace_files=0, + copied_memory_entries=0, + wrote_timeline=False, + wrote_config_toml=False, + ) + + monkeypatch.setattr(cmd_migrate, "get_astrbot_root", lambda: astrbot_root) + monkeypatch.setattr(cmd_migrate, "run_openclaw_migration", _fake_run_openclaw_migration) + + runner = CliRunner() + result = runner.invoke( + cmd_migrate.migrate, + [ + "openclaw", + "--source", + str(source_root), + "--dry-run", + "--target", + "data/migrations/custom", + ], + ) + + assert result.exit_code == 0, result.output + assert captured["target_dir"] == Path("data/migrations/custom") + assert "Dry-run mode: no files were written." in result.output + assert "Note: --target is ignored when --dry-run is enabled." in result.output diff --git a/tests/unit/test_cli_openclaw_migrate.py b/tests/unit/test_cli_openclaw_migrate.py new file mode 100644 index 0000000000..fc17173a08 --- /dev/null +++ b/tests/unit/test_cli_openclaw_migrate.py @@ -0,0 +1,553 @@ +from __future__ import annotations + +import json +import sqlite3 +from pathlib import Path + +import pytest + +from astrbot.cli.utils.openclaw_migrate import run_openclaw_migration +from astrbot.cli.utils.openclaw_toml import json_to_toml + + +def _prepare_astrbot_root(root: Path) -> None: + (root / ".astrbot").touch() + (root / "data").mkdir(parents=True, exist_ok=True) + + +def _prepare_openclaw_source(source_root: Path) -> None: + workspace = source_root / "workspace" + (workspace / "memory").mkdir(parents=True, exist_ok=True) + (workspace / "notes").mkdir(parents=True, exist_ok=True) + + db_path = workspace / "memory" / "brain.db" + conn = sqlite3.connect(db_path) + try: + conn.execute( + "CREATE TABLE memories (id TEXT, value TEXT, type TEXT, updated_at INTEGER)" + ) + conn.execute( + "INSERT INTO memories (id, value, type, updated_at) VALUES (?, ?, ?, ?)", + ("user_pref", "likes rust", "core", 1700000000), + ) + conn.commit() + finally: + conn.close() + + (workspace / "MEMORY.md").write_text( + "# 
Memory\n- **style**: concise\n- keep logs\n", + encoding="utf-8", + ) + (workspace / "memory" / "2026-03-20.md").write_text( + "- **todo**: migrate artifacts\n", + encoding="utf-8", + ) + (workspace / "notes" / "readme.txt").write_text( + "workspace artifact", + encoding="utf-8", + ) + (source_root / "config.json").write_text( + json.dumps( + { + "model": "gpt-4.1-mini", + "memory": {"enabled": True, "limit": 4096}, + "skills": [{"name": "planner", "enabled": True}], + } + ), + encoding="utf-8", + ) + + +def _read_migrated_memory_entries(target_dir: Path) -> list[dict[str, str | None]]: + memory_jsonl = target_dir / "memory_entries.jsonl" + entries: list[dict[str, str | None]] = [] + for line in memory_jsonl.read_text(encoding="utf-8").splitlines(): + if not line.strip(): + continue + payload = json.loads(line) + entries.append(payload) + return entries + + +def test_migration_supports_legacy_sqlite_columns(tmp_path: Path) -> None: + source_root = tmp_path / ".openclaw" + workspace = source_root / "workspace" + db_dir = workspace / "memory" + db_dir.mkdir(parents=True) + (workspace / "notes").mkdir(parents=True, exist_ok=True) + (workspace / "MEMORY.md").write_text("", encoding="utf-8") + + db_path = db_dir / "brain.db" + conn = sqlite3.connect(db_path) + try: + conn.execute( + "CREATE TABLE memories (id TEXT, value TEXT, type TEXT, updated_at INTEGER)" + ) + conn.execute( + "INSERT INTO memories (id, value, type, updated_at) VALUES (?, ?, ?, ?)", + ("legacy_key", "legacy_value", "daily", 1700000000), + ) + conn.commit() + finally: + conn.close() + + astrbot_root = tmp_path / "astrbot" + astrbot_root.mkdir(parents=True) + _prepare_astrbot_root(astrbot_root) + + report = run_openclaw_migration( + source_root=source_root, + astrbot_root=astrbot_root, + dry_run=False, + target_dir=Path("data/migrations/openclaw/test-legacy-sqlite"), + ) + + assert report.target_dir is not None + entries = _read_migrated_memory_entries(Path(report.target_dir)) + assert len(entries) == 
1 + assert entries[0].get("key") == "legacy_key" + assert entries[0].get("content") == "legacy_value" + assert entries[0].get("category") == "daily" + assert entries[0].get("timestamp") is not None + + +def test_migration_handles_without_rowid_memories_table(tmp_path: Path) -> None: + source_root = tmp_path / ".openclaw" + workspace = source_root / "workspace" + db_dir = workspace / "memory" + db_dir.mkdir(parents=True) + (workspace / "notes").mkdir(parents=True, exist_ok=True) + (workspace / "MEMORY.md").write_text("", encoding="utf-8") + + db_path = db_dir / "brain.db" + conn = sqlite3.connect(db_path) + try: + conn.execute( + """ + CREATE TABLE memories ( + value TEXT NOT NULL, + type TEXT NOT NULL, + updated_at INTEGER, + PRIMARY KEY (value, type) + ) WITHOUT ROWID + """ + ) + conn.execute( + "INSERT INTO memories (value, type, updated_at) VALUES (?, ?, ?)", + ("without-rowid-content", "core", 1700000000), + ) + conn.commit() + finally: + conn.close() + + astrbot_root = tmp_path / "astrbot" + astrbot_root.mkdir(parents=True) + _prepare_astrbot_root(astrbot_root) + + report = run_openclaw_migration( + source_root=source_root, + astrbot_root=astrbot_root, + dry_run=False, + target_dir=Path("data/migrations/openclaw/test-without-rowid"), + ) + + assert report.target_dir is not None + entries = _read_migrated_memory_entries(Path(report.target_dir)) + assert len(entries) == 1 + assert entries[0].get("content") == "without-rowid-content" + assert entries[0].get("category") == "core" + assert entries[0].get("key") == "without-rowid-content" + + +def test_run_openclaw_migration_dry_run(tmp_path: Path) -> None: + source_root = tmp_path / ".openclaw" + source_root.mkdir(parents=True) + _prepare_openclaw_source(source_root) + + astrbot_root = tmp_path / "astrbot" + astrbot_root.mkdir(parents=True) + _prepare_astrbot_root(astrbot_root) + + report = run_openclaw_migration( + source_root=source_root, + astrbot_root=astrbot_root, + dry_run=True, + ) + + assert report.dry_run 
is True + assert report.memory_entries_total >= 3 + assert report.workspace_files_total >= 3 + assert report.config_found is True + assert report.target_dir is None + assert not (astrbot_root / "data" / "migrations" / "openclaw").exists() + + +def test_run_openclaw_migration_dry_run_with_explicit_target_reports_none( + tmp_path: Path, +) -> None: + source_root = tmp_path / ".openclaw" + source_root.mkdir(parents=True) + _prepare_openclaw_source(source_root) + + astrbot_root = tmp_path / "astrbot" + astrbot_root.mkdir(parents=True) + _prepare_astrbot_root(astrbot_root) + + explicit_target = astrbot_root / "data" / "migrations" / "openclaw" / "dry-run-target" + report = run_openclaw_migration( + source_root=source_root, + astrbot_root=astrbot_root, + dry_run=True, + target_dir=explicit_target, + ) + + assert report.dry_run is True + assert report.target_dir is None + assert not explicit_target.exists() + + +def test_run_openclaw_migration_writes_artifacts(tmp_path: Path) -> None: + source_root = tmp_path / ".openclaw" + source_root.mkdir(parents=True) + _prepare_openclaw_source(source_root) + + astrbot_root = tmp_path / "astrbot" + astrbot_root.mkdir(parents=True) + _prepare_astrbot_root(astrbot_root) + + report = run_openclaw_migration( + source_root=source_root, + astrbot_root=astrbot_root, + dry_run=False, + target_dir=Path("data/migrations/openclaw/test-run"), + ) + + assert report.dry_run is False + assert report.target_dir is not None + target = Path(report.target_dir) + assert target.exists() + + assert (target / "migration_summary.json").exists() + assert (target / "memory_entries.jsonl").exists() + assert (target / "time_brief_history.md").exists() + assert (target / "config.original.json").exists() + assert (target / "config.migrated.toml").exists() + assert (target / "workspace" / "notes" / "readme.txt").exists() + + timeline = (target / "time_brief_history.md").read_text(encoding="utf-8") + assert "Time Brief History" in timeline + assert "时间简史" in 
timeline + + toml_text = (target / "config.migrated.toml").read_text(encoding="utf-8") + assert '"model" = ' in toml_text + assert '["memory"]' in toml_text + assert '[["skills"]]' in toml_text + + +def test_run_openclaw_migration_writes_to_default_timestamp_target(tmp_path: Path) -> None: + source_root = tmp_path / ".openclaw" + source_root.mkdir(parents=True) + _prepare_openclaw_source(source_root) + + astrbot_root = tmp_path / "astrbot" + astrbot_root.mkdir(parents=True) + _prepare_astrbot_root(astrbot_root) + + report = run_openclaw_migration( + source_root=source_root, + astrbot_root=astrbot_root, + dry_run=False, + target_dir=None, + ) + + assert report.target_dir is not None + target = Path(report.target_dir) + assert target.exists() + expected_root = astrbot_root / "data" / "migrations" / "openclaw" + assert target.parent == expected_root + assert target.name.startswith("run-") + + +def test_run_openclaw_migration_excludes_target_inside_workspace(tmp_path: Path) -> None: + source_root = tmp_path / ".openclaw" + source_root.mkdir(parents=True) + _prepare_openclaw_source(source_root) + + workspace = source_root / "workspace" + target_inside_workspace = workspace / "snapshot-output" + target_inside_workspace.mkdir(parents=True, exist_ok=True) + (target_inside_workspace / "stale.txt").write_text("old artifact", encoding="utf-8") + + astrbot_root = tmp_path / "astrbot" + astrbot_root.mkdir(parents=True) + _prepare_astrbot_root(astrbot_root) + + report = run_openclaw_migration( + source_root=source_root, + astrbot_root=astrbot_root, + dry_run=False, + target_dir=target_inside_workspace, + ) + + assert report.target_dir is not None + target = Path(report.target_dir) + assert target == target_inside_workspace + + # Files from the output directory itself must not be re-copied into snapshot workspace. 
+ assert not (target / "workspace" / "snapshot-output" / "stale.txt").exists() + + +def test_run_openclaw_migration_does_not_follow_symlinked_workspace_dirs( + tmp_path: Path, +) -> None: + source_root = tmp_path / ".openclaw" + source_root.mkdir(parents=True) + _prepare_openclaw_source(source_root) + + workspace = source_root / "workspace" + external_dir = tmp_path / "external-data" + external_dir.mkdir(parents=True, exist_ok=True) + (external_dir / "outside.txt").write_text("outside", encoding="utf-8") + + symlink_dir = workspace / "symlinked-outside" + try: + symlink_dir.symlink_to(external_dir, target_is_directory=True) + except (NotImplementedError, OSError) as exc: + pytest.skip(f"symlink unsupported in test environment: {exc}") + + astrbot_root = tmp_path / "astrbot" + astrbot_root.mkdir(parents=True) + _prepare_astrbot_root(astrbot_root) + + report = run_openclaw_migration( + source_root=source_root, + astrbot_root=astrbot_root, + dry_run=False, + target_dir=Path("data/migrations/openclaw/test-symlink-scan"), + ) + + assert report.target_dir is not None + target = Path(report.target_dir) + assert not (target / "workspace" / "symlinked-outside" / "outside.txt").exists() + + +def test_markdown_parsing_structured_and_plain_lines(tmp_path: Path) -> None: + source_root = tmp_path / ".openclaw" + source_root.mkdir(parents=True) + _prepare_openclaw_source(source_root) + + astrbot_root = tmp_path / "astrbot" + astrbot_root.mkdir(parents=True) + _prepare_astrbot_root(astrbot_root) + + report = run_openclaw_migration( + source_root=source_root, + astrbot_root=astrbot_root, + dry_run=False, + target_dir=Path("data/migrations/openclaw/test-markdown"), + ) + assert report.target_dir is not None + entries = _read_migrated_memory_entries(Path(report.target_dir)) + + memory_md_entries = [ + entry + for entry in entries + if str(entry.get("source", "")).endswith("workspace/MEMORY.md") + ] + style_entries = [entry for entry in memory_md_entries if entry.get("key") == 
"style"] + assert len(style_entries) == 1 + assert style_entries[0].get("content") == "concise" + + plain_entries = [ + entry for entry in memory_md_entries if entry.get("content") == "keep logs" + ] + assert len(plain_entries) == 1 + assert str(plain_entries[0].get("key", "")).startswith("openclaw_core_") + + +def test_deduplication_between_sqlite_and_markdown_preserves_order( + tmp_path: Path, +) -> None: + source_root = tmp_path / ".openclaw" + source_root.mkdir(parents=True) + _prepare_openclaw_source(source_root) + + memory_md = source_root / "workspace" / "MEMORY.md" + memory_md.write_text( + memory_md.read_text(encoding="utf-8") + "- likes rust\n", + encoding="utf-8", + ) + + astrbot_root = tmp_path / "astrbot" + astrbot_root.mkdir(parents=True) + _prepare_astrbot_root(astrbot_root) + + report = run_openclaw_migration( + source_root=source_root, + astrbot_root=astrbot_root, + dry_run=False, + target_dir=Path("data/migrations/openclaw/test-dedup"), + ) + assert report.target_dir is not None + entries = _read_migrated_memory_entries(Path(report.target_dir)) + contents = [str(entry.get("content", "")) for entry in entries] + + assert contents.count("likes rust") == 1 + assert contents.index("likes rust") < contents.index("keep logs") + + +def test_run_openclaw_migration_invalid_config_json_raises_click_exception( + tmp_path: Path, +) -> None: + source_root = tmp_path / ".openclaw" + source_root.mkdir(parents=True) + _prepare_openclaw_source(source_root) + (source_root / "config.json").write_text("{ invalid json", encoding="utf-8") + + astrbot_root = tmp_path / "astrbot" + astrbot_root.mkdir(parents=True) + _prepare_astrbot_root(astrbot_root) + + import click + import pytest + + with pytest.raises(click.ClickException) as exc_info: + run_openclaw_migration( + source_root=source_root, + astrbot_root=astrbot_root, + dry_run=False, + target_dir=Path("data/migrations/openclaw/test-invalid-config"), + ) + + assert "Failed to parse OpenClaw config JSON" in 
str(exc_info.value) + + +def test_run_openclaw_migration_invalid_sqlite_raises_click_exception( + tmp_path: Path, +) -> None: + source_root = tmp_path / ".openclaw" + source_root.mkdir(parents=True) + workspace = source_root / "workspace" + (workspace / "memory").mkdir(parents=True, exist_ok=True) + (workspace / "notes").mkdir(parents=True, exist_ok=True) + (workspace / "MEMORY.md").write_text("", encoding="utf-8") + (workspace / "memory" / "brain.db").write_text( + "not a sqlite database", + encoding="utf-8", + ) + + astrbot_root = tmp_path / "astrbot" + astrbot_root.mkdir(parents=True) + _prepare_astrbot_root(astrbot_root) + + import click + import pytest + + with pytest.raises(click.ClickException) as exc_info: + run_openclaw_migration( + source_root=source_root, + astrbot_root=astrbot_root, + dry_run=True, + ) + + err_text = str(exc_info.value) + assert "Failed to read OpenClaw sqlite at" in err_text + assert "brain.db" in err_text + + +def test_run_openclaw_migration_supports_sqlite_path_with_spaces(tmp_path: Path) -> None: + source_root = tmp_path / ".openclaw with spaces" + source_root.mkdir(parents=True) + _prepare_openclaw_source(source_root) + + astrbot_root = tmp_path / "astrbot" + astrbot_root.mkdir(parents=True) + _prepare_astrbot_root(astrbot_root) + + report = run_openclaw_migration( + source_root=source_root, + astrbot_root=astrbot_root, + dry_run=True, + ) + + assert report.memory_entries_from_sqlite >= 1 + + +def test_run_openclaw_migration_timeline_escapes_backticks(tmp_path: Path) -> None: + source_root = tmp_path / ".openclaw" + source_root.mkdir(parents=True) + _prepare_openclaw_source(source_root) + + memory_md = source_root / "workspace" / "MEMORY.md" + memory_md.write_text( + memory_md.read_text(encoding="utf-8") + + "- **key`tick**: content has `tick` too\n", + encoding="utf-8", + ) + + astrbot_root = tmp_path / "astrbot" + astrbot_root.mkdir(parents=True) + _prepare_astrbot_root(astrbot_root) + + report = run_openclaw_migration( + 
source_root=source_root, + astrbot_root=astrbot_root, + dry_run=False, + target_dir=Path("data/migrations/openclaw/test-timeline-backticks"), + ) + + assert report.target_dir is not None + timeline = (Path(report.target_dir) / "time_brief_history.md").read_text( + encoding="utf-8" + ) + assert "`key\\`tick`" in timeline + assert "content has \\`tick\\` too" in timeline + + +def test_json_to_toml_quotes_special_keys() -> None: + payload = { + "normal key": "ok", + "nested.obj": {"x y": 1}, + "list": [{"dot.key": True}], + } + toml_text = json_to_toml(payload) + + assert '"normal key" = "ok"' in toml_text + assert '["nested.obj"]' in toml_text + assert '"x y" = 1' in toml_text + assert '[["list"]]' in toml_text + assert '"dot.key" = true' in toml_text + + +def test_json_to_toml_rejects_non_finite_float() -> None: + import pytest + + with pytest.raises(ValueError): + json_to_toml({"invalid": float("nan")}) + + +def test_json_to_toml_preserves_null_sentinel_behavior() -> None: + toml_text = json_to_toml( + { + "nullable": None, + "nested": {"inner": None}, + "list": [None, 1], + } + ) + + assert '"nullable" = "__ASTRBOT_OPENCLAW_NULL_SENTINEL_V1__"' in toml_text + assert '["nested"]' in toml_text + assert '"inner" = "__ASTRBOT_OPENCLAW_NULL_SENTINEL_V1__"' in toml_text + assert '"list" = ["__ASTRBOT_OPENCLAW_NULL_SENTINEL_V1__", 1]' in toml_text + + +def test_json_to_toml_escapes_quotes_backslashes_and_newlines() -> None: + toml_text = json_to_toml( + { + 'k"ey': "line1\nline2", + "path": "C:\\tmp\\file.txt", + } + ) + + assert '"k\\"ey" = "line1\\nline2"' in toml_text + assert '"path" = "C:\\\\tmp\\\\file.txt"' in toml_text