diff --git a/flocks/channel/base.py b/flocks/channel/base.py index 7bac9f2e..5d58315d 100644 --- a/flocks/channel/base.py +++ b/flocks/channel/base.py @@ -53,6 +53,7 @@ class InboundMessage: chat_type: ChatType = ChatType.DIRECT text: str = "" media_url: Optional[str] = None + media_mime: Optional[str] = None reply_to_id: Optional[str] = None thread_id: Optional[str] = None mentioned: bool = False diff --git a/flocks/channel/builtin/weixin/__init__.py b/flocks/channel/builtin/weixin/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/flocks/channel/builtin/weixin/cdn.py b/flocks/channel/builtin/weixin/cdn.py new file mode 100644 index 00000000..60980e28 --- /dev/null +++ b/flocks/channel/builtin/weixin/cdn.py @@ -0,0 +1,162 @@ +""" +WeChat CDN (novac2c.cdn.weixin.qq.com) URL builders, SSRF protection, +and raw download / upload helpers for AES-encrypted media payloads. + +The CDN protocol: +- Inbound media is fetched from ``/c2c/download?encrypted_query_param=...`` + and decrypted client-side with the AES key embedded in the iLink frame. +- Outbound media is encrypted client-side and uploaded with POST to either + ``/c2c/upload?encrypted_query_param=&filekey=`` + or directly to ``upload_full_url`` returned by ``getuploadurl``. +""" + +from __future__ import annotations + +import asyncio +from typing import TYPE_CHECKING, Optional +from urllib.parse import quote, urlparse + +if TYPE_CHECKING: + import aiohttp + + +# Hosts the channel is allowed to fetch media from. SSRF guard. +_WEIXIN_CDN_ALLOWLIST: frozenset[str] = frozenset( + { + "novac2c.cdn.weixin.qq.com", + "ilinkai.weixin.qq.com", + "wx.qlogo.cn", + "thirdwx.qlogo.cn", + "res.wx.qq.com", + "mmbiz.qpic.cn", + "mmbiz.qlogo.cn", + } +) + + +def cdn_download_url(cdn_base_url: str, encrypted_query_param: str) -> str: + return ( + f"{cdn_base_url.rstrip('/')}/download" + f"?encrypted_query_param={quote(encrypted_query_param, safe='')}" + ) + + +def cdn_upload_url(cdn_base_url: str, upload_param: str, filekey: str) -> str: + return ( + f"{cdn_base_url.rstrip('/')}/upload" + f"?encrypted_query_param={quote(upload_param, safe='')}" + f"&filekey={quote(filekey, safe='')}" + ) + + +def assert_weixin_cdn_url(url: str) -> None: + """Raise ``ValueError`` if *url* is not on a known WeChat CDN host. + + Used as an SSRF guard before fetching ``full_url`` (which the iLink + server controls) — without this, a malicious frame could redirect + downloads to arbitrary internal hosts. + """ + try: + parsed = urlparse(url) + scheme = parsed.scheme.lower() + host = parsed.hostname or "" + except Exception as exc: # noqa: BLE001 + raise ValueError(f"Unparseable media URL: {url!r}") from exc + + if scheme not in ("http", "https"): + raise ValueError( + f"Media URL has disallowed scheme {scheme!r}; only http/https permitted." + ) + if host not in _WEIXIN_CDN_ALLOWLIST: + raise ValueError( + f"Media URL host {host!r} is not in the WeChat CDN allowlist. " + "Refusing to fetch to prevent SSRF." + ) + + +async def download_bytes( + session: "aiohttp.ClientSession", + *, + url: str, + timeout_seconds: float = 60.0, +) -> bytes: + """GET *url* and return the response body bytes. + + Uses ``asyncio.wait_for`` rather than ``aiohttp.ClientTimeout`` so the + coroutine can be safely scheduled via ``run_coroutine_threadsafe`` from + callers running outside the aiohttp event loop. + """ + async def _do() -> bytes: + async with session.get(url) as resp: + resp.raise_for_status() + return await resp.read() + return await asyncio.wait_for(_do(), timeout=timeout_seconds) + + +async def upload_ciphertext( + session: "aiohttp.ClientSession", + *, + ciphertext: bytes, + upload_url: str, + timeout_seconds: float = 120.0, +) -> str: + """POST encrypted bytes to the WeChat CDN, return ``x-encrypted-param`` echo. + + Both the constructed CDN URL (from ``upload_param``) and the direct + ``upload_full_url`` use POST with the raw ciphertext as the body. + """ + async def _do() -> str: + async with session.post( + upload_url, + data=ciphertext, + headers={"Content-Type": "application/octet-stream"}, + ) as resp: + if resp.status == 200: + encrypted_param = resp.headers.get("x-encrypted-param") + if encrypted_param: + await resp.read() + return encrypted_param + raw = await resp.text() + raise RuntimeError(f"CDN upload missing x-encrypted-param header: {raw[:200]}") + raw = await resp.text() + raise RuntimeError(f"CDN upload HTTP {resp.status}: {raw[:200]}") + return await asyncio.wait_for(_do(), timeout=timeout_seconds) + + +def media_reference(item: dict, key: str) -> dict: + """Pull the ``.media`` sub-dict out of an item like ``image_item``/``file_item``.""" + return (item.get(key) or {}).get("media") or {} + + +async def download_and_decrypt_media( + session: "aiohttp.ClientSession", + *, + cdn_base_url: str, + encrypted_query_param: Optional[str], + aes_key_b64: Optional[str], + full_url: Optional[str], + timeout_seconds: float, +) -> bytes: + """Fetch + AES-decrypt a single media payload. + + Caller supplies whichever of ``encrypted_query_param`` / ``full_url`` is + present in the iLink frame. ``aes_key_b64`` is decoded by ``crypto.parse_aes_key``. + """ + # Local import to avoid a circular dependency between cdn and crypto. + from .crypto import aes128_ecb_decrypt, parse_aes_key + + if encrypted_query_param: + raw = await download_bytes( + session, + url=cdn_download_url(cdn_base_url, encrypted_query_param), + timeout_seconds=timeout_seconds, + ) + elif full_url: + assert_weixin_cdn_url(full_url) + raw = await download_bytes(session, url=full_url, timeout_seconds=timeout_seconds) + else: + raise RuntimeError("media item had neither encrypt_query_param nor full_url") + + if aes_key_b64: + raw = aes128_ecb_decrypt(raw, parse_aes_key(aes_key_b64)) + return raw diff --git a/flocks/channel/builtin/weixin/channel.py b/flocks/channel/builtin/weixin/channel.py new file mode 100644 index 00000000..cb1e30d5 --- /dev/null +++ b/flocks/channel/builtin/weixin/channel.py @@ -0,0 +1,696 @@ +""" +Weixin (微信) ChannelPlugin implementation. + +Connects Flocks to WeChat personal accounts via Tencent's iLink Bot API. +Only accounts registered as iLink bots (via QR scan) are supported. + +Design notes: +- Long-poll ``getupdates`` drives inbound delivery. +- Every outbound reply should echo the latest ``context_token`` for the peer. +- Media files move through an AES-128-ECB encrypted CDN protocol — see + ``media.py`` and ``cdn.py``. +- Token / credentials are obtained via QR login on the iLink Bot developer + portal, then configured under the ``weixin`` channel. +""" + +from __future__ import annotations + +import asyncio +import hashlib +import os +import uuid +from typing import Awaitable, Callable, Optional +from urllib.parse import urlparse + +from flocks.channel.base import ( + ChannelCapabilities, + ChannelMeta, + ChannelPlugin, + ChatType, + DeliveryResult, + InboundMessage, + OutboundContext, +) +from flocks.utils.log import Log + +from . import client as ilink +from .config import ( + AIOHTTP_AVAILABLE, + BACKOFF_DELAY_SECONDS, + CRYPTO_AVAILABLE, + ILINK_BASE_URL, + LONG_POLL_TIMEOUT_MS, + MAX_CONSECUTIVE_FAILURES, + MAX_MESSAGE_LENGTH, + RATE_LIMIT_ERRCODE, + RETRY_DELAY_SECONDS, + SESSION_EXPIRED_ERRCODE, + WEIXIN_CDN_BASE_URL, +) +from .format import format_for_weixin, split_chunks +from .inbound import extract_text, guess_chat_type, safe_id +from .media import ( + MediaCache, + download_inbound_item, + fetch_remote_to_temp, + is_downloadable_media_item, + send_outbound_file, +) +from .store import ( + ContextTokenStore, + MessageDedup, + load_sync_buf, + save_sync_buf, +) + +log = Log.create(service="channel.weixin") + +# Local alias to keep type hints readable when aiohttp is missing at import time +if AIOHTTP_AVAILABLE: + import aiohttp # type: ignore[import-untyped] + + +class WeixinChannel(ChannelPlugin): + """WeChat (微信) personal account channel via Tencent iLink Bot API. + + Prerequisites: + - A WeChat account registered as an iLink bot (QR scan on the iLink portal). + - ``aiohttp`` and ``cryptography`` Python packages installed. + + Required config keys: + - ``token`` — iLink bot token (``WEIXIN_TOKEN`` env var as fallback) + - ``accountId`` — iLink bot account ID (``WEIXIN_ACCOUNT_ID`` env var as fallback) + + Optional config keys: + - ``baseUrl`` — iLink API base URL (defaults to ilinkai.weixin.qq.com) + - ``cdnBaseUrl`` — iLink CDN base URL (defaults to novac2c.cdn.weixin.qq.com) + - ``dmPolicy`` — ``"open"`` (default) | ``"disabled"`` | ``"allowlist"`` + - ``allowFrom`` — list of allowed sender user IDs for DM allowlist mode + - ``groupPolicy`` — ``"all"`` (default) | ``"disabled"`` | ``"allowlist"`` + Controls whether group chat messages are processed. + Note: iLink Bot accounts may not receive group events in + ordinary WeChat groups depending on account type. + - ``groupAllowFrom`` — list of allowed group / room IDs for group allowlist mode + - ``sendChunkDelay`` — seconds between multi-chunk messages (default 1.5) + - ``dataDir`` — override path for storing sync_buf / context-token / media cache + (default: ~/.flocks/workspace/channels/weixin) + """ + + def __init__(self) -> None: + super().__init__() + self._token: str = "" + self._account_id: str = "" + self._base_url: str = ILINK_BASE_URL + self._cdn_base_url: str = WEIXIN_CDN_BASE_URL + self._dm_policy: str = "open" + self._allow_from: list[str] = [] + self._group_policy: str = "all" + self._group_allow_from: list[str] = [] + self._send_chunk_delay: float = 1.5 + self._send_chunk_retries: int = 4 + self._data_dir: Optional[str] = None + + self._token_store: ContextTokenStore = ContextTokenStore() + self._dedup: MessageDedup = MessageDedup() + self._media_cache: Optional[MediaCache] = None + + self._poll_session: "Optional[aiohttp.ClientSession]" = None + self._send_session: "Optional[aiohttp.ClientSession]" = None + + # ------------------------------------------------------------------ + # ChannelPlugin interface + # ------------------------------------------------------------------ + + def meta(self) -> ChannelMeta: + return ChannelMeta( + id="weixin", + label="微信", + aliases=["wechat", "wx"], + order=30, + ) + + def capabilities(self) -> ChannelCapabilities: + return ChannelCapabilities( + chat_types=[ChatType.DIRECT, ChatType.GROUP], + media=True, + threads=False, + reactions=False, + edit=False, + rich_text=True, + ) + + def validate_config(self, config: dict) -> Optional[str]: + token = config.get("token") or os.getenv("WEIXIN_TOKEN", "") + account_id = config.get("accountId") or os.getenv("WEIXIN_ACCOUNT_ID", "") + if not str(token).strip(): + return "Missing required config: token (or WEIXIN_TOKEN env var)" + if not str(account_id).strip(): + return "Missing required config: accountId (or WEIXIN_ACCOUNT_ID env var)" + return None + + def config_schema(self) -> Optional[dict]: + return { + "type": "object", + "properties": { + "token": {"type": "string", "description": "iLink bot token (从 QR 登录获取)"}, + "accountId": {"type": "string", "description": "iLink bot account ID (从 QR 登录获取)"}, + "baseUrl": {"type": "string", "description": "iLink API 地址", "default": ILINK_BASE_URL}, + "cdnBaseUrl": {"type": "string", "description": "iLink CDN 地址", "default": WEIXIN_CDN_BASE_URL}, + "dmPolicy": { + "type": "string", + "enum": ["open", "disabled", "allowlist"], + "description": "私信策略", + "default": "open", + }, + "allowFrom": {"type": "string", "description": "allowlist 模式下允许的发送者 user_id,逗号分隔"}, + "groupPolicy": { + "type": "string", + "enum": ["all", "disabled", "allowlist"], + "description": "群聊策略", + "default": "all", + }, + "groupAllowFrom": {"type": "string", "description": "群聊 allowlist 模式下允许的群 / 房间 ID,逗号分隔"}, + "sendChunkDelay": {"type": "number", "description": "多段消息发送间隔(秒)", "default": 1.5}, + "dataDir": {"type": "string", "description": "状态文件 / 媒体缓存存储目录(默认 ~/.flocks/workspace/channels/weixin)"}, + }, + "required": ["token", "accountId"], + } + + def target_hint(self) -> str: + return "" + + @property + def text_chunk_limit(self) -> int: + return MAX_MESSAGE_LENGTH + + def format_message(self, text: str, format_hint: str = "markdown") -> str: + return format_for_weixin(text) + + # ------------------------------------------------------------------ + # Lifecycle + # ------------------------------------------------------------------ + + async def start( + self, + config: dict, + on_message: Callable[[InboundMessage], Awaitable[None]], + abort_event: Optional[asyncio.Event] = None, + ) -> None: + if not (AIOHTTP_AVAILABLE and CRYPTO_AVAILABLE): + raise RuntimeError( + "Weixin channel requires ``aiohttp`` and ``cryptography``. " + "Run: pip install aiohttp cryptography" + ) + + self._token = str(config.get("token") or os.getenv("WEIXIN_TOKEN", "")).strip() + self._account_id = str(config.get("accountId") or os.getenv("WEIXIN_ACCOUNT_ID", "")).strip() + self._base_url = str(config.get("baseUrl") or ILINK_BASE_URL).rstrip("/") + self._cdn_base_url = str(config.get("cdnBaseUrl") or WEIXIN_CDN_BASE_URL).rstrip("/") + self._dm_policy = str(config.get("dmPolicy") or "open").lower() + raw_allow = config.get("allowFrom") or "" + self._allow_from = [s.strip() for s in str(raw_allow).split(",") if s.strip()] + self._group_policy = str(config.get("groupPolicy") or "all").lower() + raw_group_allow = config.get("groupAllowFrom") or "" + self._group_allow_from = [s.strip() for s in str(raw_group_allow).split(",") if s.strip()] + self._send_chunk_delay = float(config.get("sendChunkDelay") or 1.5) + self._send_chunk_retries = int(config.get("sendChunkRetries") or 4) + self._data_dir = config.get("dataDir") + + self._token_store = ContextTokenStore(self._data_dir) + self._token_store.restore(self._account_id) + self._dedup = MessageDedup() + self._media_cache = MediaCache(self._data_dir) + + no_timeout = aiohttp.ClientTimeout( + total=None, connect=None, sock_connect=None, sock_read=None, + ) + self._poll_session = aiohttp.ClientSession( + trust_env=True, connector=ilink.make_ssl_connector(), + ) + self._send_session = aiohttp.ClientSession( + trust_env=True, connector=ilink.make_ssl_connector(), timeout=no_timeout, + ) + + self.mark_connected() + log.info("weixin.connected", { + "account_id": safe_id(self._account_id), + "base_url": self._base_url, + }) + if self._group_policy != "disabled": + log.warning("weixin.group_policy.note", { + "group_policy": self._group_policy, + "note": ( + "QR-login connects an iLink bot identity (e.g. ...@im.bot), not a " + "normal personal WeChat account. Ordinary WeChat group messages are " + "typically NOT delivered by iLink for this account type. " + "groupPolicy only takes effect if iLink actually delivers group events." + ), + }) + + try: + await self._poll_loop(on_message, abort_event) + finally: + await self._close_sessions() + self.mark_disconnected() + + async def stop(self) -> None: + await self._close_sessions() + + async def _close_sessions(self) -> None: + for attr in ("_poll_session", "_send_session"): + session = getattr(self, attr, None) + if session and not session.closed: + try: + await session.close() + except Exception: + pass + setattr(self, attr, None) + + # ------------------------------------------------------------------ + # Inbound long-poll loop + # ------------------------------------------------------------------ + + async def _poll_loop( + self, + on_message: Callable[[InboundMessage], Awaitable[None]], + abort_event: Optional[asyncio.Event], + ) -> None: + assert self._poll_session is not None + sync_buf = load_sync_buf(self._account_id, self._data_dir) + timeout_ms = LONG_POLL_TIMEOUT_MS + consecutive_failures = 0 + + while abort_event is None or not abort_event.is_set(): + try: + response = await ilink.get_updates( + self._poll_session, + base_url=self._base_url, + token=self._token, + sync_buf=sync_buf, + timeout_ms=timeout_ms, + ) + + suggested = response.get("longpolling_timeout_ms") + if isinstance(suggested, int) and suggested > 0: + timeout_ms = suggested + + ret = response.get("ret", 0) + errcode = response.get("errcode", 0) + + if ret not in (0, None) or errcode not in (0, None): + if ( + ret == SESSION_EXPIRED_ERRCODE + or errcode == SESSION_EXPIRED_ERRCODE + or ilink.is_stale_session(ret, errcode, response.get("errmsg")) + ): + log.error("weixin.session_expired", { + "account_id": safe_id(self._account_id), + }) + await asyncio.sleep(600) + consecutive_failures = 0 + continue + + consecutive_failures += 1 + log.warning("weixin.getupdates_error", { + "ret": ret, "errcode": errcode, + "errmsg": response.get("errmsg", ""), + "attempt": consecutive_failures, + }) + await asyncio.sleep( + BACKOFF_DELAY_SECONDS + if consecutive_failures >= MAX_CONSECUTIVE_FAILURES + else RETRY_DELAY_SECONDS + ) + if consecutive_failures >= MAX_CONSECUTIVE_FAILURES: + consecutive_failures = 0 + continue + + consecutive_failures = 0 + new_sync_buf = str(response.get("get_updates_buf") or "") + if new_sync_buf: + sync_buf = new_sync_buf + save_sync_buf(self._account_id, sync_buf, self._data_dir) + + for message in response.get("msgs") or []: + asyncio.create_task(self._process_message_safe(message, on_message)) + + except asyncio.CancelledError: + break + except Exception as exc: + consecutive_failures += 1 + log.error("weixin.poll_error", { + "error": str(exc), "attempt": consecutive_failures, + }) + await asyncio.sleep( + BACKOFF_DELAY_SECONDS + if consecutive_failures >= MAX_CONSECUTIVE_FAILURES + else RETRY_DELAY_SECONDS + ) + if consecutive_failures >= MAX_CONSECUTIVE_FAILURES: + consecutive_failures = 0 + + async def _process_message_safe( + self, + message: dict, + on_message: Callable[[InboundMessage], Awaitable[None]], + ) -> None: + try: + await self._process_message(message, on_message) + except Exception as exc: + log.error("weixin.process_error", { + "from": safe_id(message.get("from_user_id")), + "error": str(exc), + }) + + async def _process_message( + self, + message: dict, + on_message: Callable[[InboundMessage], Awaitable[None]], + ) -> None: + sender_id = str(message.get("from_user_id") or "").strip() + if not sender_id or sender_id == self._account_id: + return + + message_id = str(message.get("message_id") or "").strip() + if message_id and self._dedup.is_duplicate(message_id): + return + + item_list = message.get("item_list") or [] + text = extract_text(item_list) + + if text: + content_key = f"content:{sender_id}:{hashlib.md5(text.encode()).hexdigest()}" + if self._dedup.is_duplicate(content_key): + log.debug("weixin.dedup_content", {"sender": safe_id(sender_id)}) + return + + chat_type_str, effective_chat_id = guess_chat_type(message, self._account_id) + + if chat_type_str == "group": + if not self._is_group_allowed(effective_chat_id): + return + elif not self._is_dm_allowed(sender_id): + return + + # Download the first inbound media item (image / video / voice / file). + # InboundMessage.media_url is single-valued, so any extras are dropped. + media_url, media_mime = await self._collect_inbound_media(item_list, sender_id) + + if not text and not media_url: + return + + context_token = str(message.get("context_token") or "").strip() + if context_token: + self._token_store.set(self._account_id, sender_id, context_token) + + chat_type = ChatType.GROUP if chat_type_str == "group" else ChatType.DIRECT + inbound = InboundMessage( + channel_id="weixin", + account_id=self._account_id, + message_id=message_id or str(uuid.uuid4()), + sender_id=sender_id, + sender_name=sender_id, + chat_id=effective_chat_id, + chat_type=chat_type, + text=text, + media_url=media_url, + media_mime=media_mime, + mentioned=False, + raw=message, + ) + log.info("weixin.inbound", { + "from": safe_id(sender_id), + "chat_type": chat_type_str, + "text_preview": text[:50], + "media_mime": media_mime, + }) + await on_message(inbound) + + async def _collect_inbound_media( + self, item_list: list, sender_id: str, + ) -> tuple[Optional[str], Optional[str]]: + """Download the first downloadable media item and return ``(uri, mime)``. + + ``InboundMessage.media_url`` is single-valued, so we deliberately do NOT + download items beyond the first — only count them so a warning is logged. + """ + if not self._poll_session or not self._media_cache: + return None, None + media_items = [item for item in item_list if is_downloadable_media_item(item)] + if not media_items: + return None, None + + sender_log = safe_id(sender_id) + if len(media_items) > 1: + log.warning("weixin.media.extra_dropped", { + "from": sender_log, + "dropped": len(media_items) - 1, + }) + + result = await download_inbound_item( + self._poll_session, + item=media_items[0], + cdn_base_url=self._cdn_base_url, + cache=self._media_cache, + sender_log_id=sender_log, + ) + return (result[0], result[1]) if result else (None, None) + + def _is_dm_allowed(self, sender_id: str) -> bool: + if self._dm_policy == "disabled": + return False + if self._dm_policy == "allowlist": + return sender_id in self._allow_from + return True + + def _is_group_allowed(self, chat_id: str) -> bool: + if self._group_policy == "disabled": + return False + if self._group_policy == "allowlist": + return chat_id in self._group_allow_from + return True + + # ------------------------------------------------------------------ + # Outbound: text + # ------------------------------------------------------------------ + + async def send_text(self, ctx: OutboundContext) -> DeliveryResult: + if not self._send_session or not self._token: + return DeliveryResult( + channel_id="weixin", message_id="", + success=False, error="Not connected", + ) + + formatted = format_for_weixin(ctx.text) + chunks = split_chunks(formatted, MAX_MESSAGE_LENGTH) + if not chunks: + return DeliveryResult(channel_id="weixin", message_id="") + + context_token = self._token_store.get(self._account_id, ctx.to) + last_message_id = "" + try: + for idx, chunk in enumerate(chunks): + client_id = f"flocks-weixin-{uuid.uuid4().hex}" + await self._send_chunk_with_retry( + to=ctx.to, chunk=chunk, + context_token=context_token, client_id=client_id, + ) + last_message_id = client_id + if idx < len(chunks) - 1 and self._send_chunk_delay > 0: + await asyncio.sleep(self._send_chunk_delay) + except Exception as exc: + log.error("weixin.send_text.error", { + "to": safe_id(ctx.to), "error": str(exc), + }) + return DeliveryResult( + channel_id="weixin", message_id="", + success=False, error=str(exc), + ) + return DeliveryResult(channel_id="weixin", message_id=last_message_id) + + async def _send_chunk_with_retry( + self, + *, + to: str, + chunk: str, + context_token: Optional[str], + client_id: str, + ) -> None: + """Send a single text chunk with per-chunk retry and backoff. + + - On session-expired (errcode -14): retry once *without* ``context_token`` + and drop it from the local store. + - On rate-limit (errcode -2): back off 3× and retry. + """ + last_error: Optional[Exception] = None + retried_without_token = False + retry_delay = 1.0 + + for attempt in range(self._send_chunk_retries + 1): + try: + resp = await ilink.send_text_message( + self._send_session, + base_url=self._base_url, + token=self._token, + to=to, text=chunk, + context_token=context_token, client_id=client_id, + ) + + if isinstance(resp, dict): + ret = resp.get("ret") + errcode = resp.get("errcode") + # Always log the iLink response so we can confirm whether + # the message was actually accepted (vs silently dropped). + log.info("weixin.send.response", { + "to": safe_id(to), + "client_id": client_id[:24], + "ret": ret, + "errcode": errcode, + "errmsg": resp.get("errmsg"), + "msg_id": resp.get("msg_id") or resp.get("message_id"), + "has_context_token": bool(context_token), + "chunk_len": len(chunk), + }) + if (ret is not None and ret != 0) or (errcode is not None and errcode != 0): + is_session_expired = ( + ret == SESSION_EXPIRED_ERRCODE + or errcode == SESSION_EXPIRED_ERRCODE + or ilink.is_stale_session(ret, errcode, resp.get("errmsg")) + ) + if is_session_expired and not retried_without_token and context_token: + retried_without_token = True + context_token = None + self._token_store.clear(self._account_id, to) + log.warning("weixin.send.session_expired_retry", { + "to": safe_id(to), + }) + continue + + is_rate_limited = ( + ret == RATE_LIMIT_ERRCODE or errcode == RATE_LIMIT_ERRCODE + ) + if is_rate_limited: + errmsg = resp.get("errmsg") or resp.get("msg") or "rate limited" + last_error = RuntimeError( + f"iLink sendmessage rate limited: " + f"ret={ret} errcode={errcode} errmsg={errmsg}" + ) + if attempt >= self._send_chunk_retries: + break + wait = retry_delay * 3 + log.warning("weixin.send.rate_limited", { + "to": safe_id(to), "wait": wait, + }) + await asyncio.sleep(wait) + continue + + errmsg = resp.get("errmsg") or resp.get("msg") or "unknown error" + raise RuntimeError( + f"iLink sendmessage error: ret={ret} errcode={errcode} errmsg={errmsg}" + ) + return + + except Exception as exc: + last_error = exc + if attempt >= self._send_chunk_retries: + break + wait = retry_delay * (attempt + 1) + log.warning("weixin.send.retry", { + "to": safe_id(to), + "attempt": attempt + 1, + "wait": wait, + "error": str(exc), + }) + await asyncio.sleep(wait) + + if last_error is not None: + raise last_error + + # ------------------------------------------------------------------ + # Outbound: media + # ------------------------------------------------------------------ + + async def send_media(self, ctx: OutboundContext) -> DeliveryResult: + """Send a media file (image / video / voice / document). + + ``ctx.media_url`` may be: + - a local path (``/abs/path/to/file.png``) + - a ``file://`` URI + - a remote ``http(s)://`` URL on the WeChat CDN allowlist + """ + if not self._send_session or not self._token: + return DeliveryResult( + channel_id="weixin", message_id="", + success=False, error="Not connected", + ) + if not ctx.media_url: + # No media to send — fall back to plain text via send_text. + return await self.send_text(ctx) + + local_path, cleanup = await self._resolve_media_to_path(ctx.media_url) + if not local_path: + return DeliveryResult( + channel_id="weixin", message_id="", + success=False, error=f"Could not resolve media URL: {ctx.media_url}", + ) + + context_token = self._token_store.get(self._account_id, ctx.to) + try: + # Caption first (if any) so the file appears under it in chat order. + if ctx.text and ctx.text.strip(): + caption_result = await self.send_text(ctx) + if not caption_result.success: + return caption_result + + client_id = await send_outbound_file( + self._send_session, + base_url=self._base_url, + cdn_base_url=self._cdn_base_url, + token=self._token, + chat_id=ctx.to, + path=local_path, + context_token=context_token, + ) + return DeliveryResult(channel_id="weixin", message_id=client_id) + + except Exception as exc: + log.error("weixin.send_media.error", { + "to": safe_id(ctx.to), "error": str(exc), + }) + return DeliveryResult( + channel_id="weixin", message_id="", + success=False, error=str(exc), + ) + finally: + if cleanup and local_path: + try: + os.unlink(local_path) + except OSError: + pass + + async def _resolve_media_to_path(self, media_url: str) -> tuple[Optional[str], bool]: + """Resolve *media_url* to an on-disk path. Returns ``(path, should_cleanup)``.""" + parsed = urlparse(media_url) + scheme = parsed.scheme.lower() + + if scheme in ("", "file"): + path = parsed.path if scheme == "file" else media_url + if not os.path.isabs(path): + path = os.path.abspath(path) + return (path, False) if os.path.exists(path) else (None, False) + + if scheme in ("http", "https"): + try: + # Validate host before downloading to prevent SSRF. + from .cdn import assert_weixin_cdn_url + assert_weixin_cdn_url(media_url) + path = await fetch_remote_to_temp(self._send_session, url=media_url) + return path, True + except Exception as exc: + log.warning("weixin.media.fetch_failed", { + "url": media_url, "error": str(exc), + }) + return None, False + + log.warning("weixin.media.unsupported_scheme", {"scheme": scheme}) + return None, False diff --git a/flocks/channel/builtin/weixin/client.py b/flocks/channel/builtin/weixin/client.py new file mode 100644 index 00000000..1069fabd --- /dev/null +++ b/flocks/channel/builtin/weixin/client.py @@ -0,0 +1,229 @@ +""" +Low-level iLink Bot HTTP API helpers. + +Each function maps 1:1 to an iLink endpoint and returns the parsed JSON dict. +Higher-level retry/backoff is handled by the channel itself. +""" + +from __future__ import annotations + +import asyncio +import base64 +import json +import secrets +import ssl +import struct +from typing import TYPE_CHECKING, Optional + +from .config import ( + API_TIMEOUT_MS, + CHANNEL_VERSION, + EP_GET_UPDATES, + EP_GET_UPLOAD_URL, + EP_SEND_MESSAGE, + ILINK_APP_CLIENT_VERSION, + ILINK_APP_ID, + ITEM_TEXT, + MSG_STATE_FINISH, + MSG_TYPE_BOT, + RATE_LIMIT_ERRCODE, +) + +if TYPE_CHECKING: + import aiohttp + + +def make_ssl_connector() -> "Optional[aiohttp.TCPConnector]": + """Return a TCPConnector with certifi CA bundle for iLink TLS verification. + + Tencent's ``ilinkai.weixin.qq.com`` is not always verifiable against + Homebrew OpenSSL on macOS; certifi's Mozilla bundle is the reliable choice. + Returns ``None`` if certifi or aiohttp is unavailable; caller falls back + to aiohttp defaults. + """ + try: + import aiohttp # local import keeps module importable without aiohttp + import certifi + except ImportError: + return None + ssl_ctx = ssl.create_default_context(cafile=certifi.where()) + return aiohttp.TCPConnector(ssl=ssl_ctx) + + +def random_wechat_uin() -> str: + value = struct.unpack(">I", secrets.token_bytes(4))[0] + return base64.b64encode(str(value).encode("utf-8")).decode("ascii") + + +def base_info() -> dict: + return {"channel_version": CHANNEL_VERSION} + + +def make_headers(token: Optional[str], body: str) -> dict: + headers = { + "Content-Type": "application/json", + "AuthorizationType": "ilink_bot_token", + "Content-Length": str(len(body.encode("utf-8"))), + "X-WECHAT-UIN": random_wechat_uin(), + "iLink-App-Id": ILINK_APP_ID, + "iLink-App-ClientVersion": str(ILINK_APP_CLIENT_VERSION), + } + if token: + headers["Authorization"] = f"Bearer {token}" + return headers + + +def is_stale_session( + ret: Optional[int], errcode: Optional[int], errmsg: Optional[str] +) -> bool: + """Detect the iLink "stale session" disguise of errcode -2. + + iLink occasionally returns ret/errcode = -2 with errmsg "unknown error" + for an expired session, rather than the documented errcode -14. + """ + if ret != RATE_LIMIT_ERRCODE and errcode != RATE_LIMIT_ERRCODE: + return False + return (errmsg or "").lower() == "unknown error" + + +def _json_dumps(payload: dict) -> str: + return json.dumps(payload, ensure_ascii=False, separators=(",", ":")) + + +async def api_post( + session: "aiohttp.ClientSession", + *, + base_url: str, + endpoint: str, + payload: dict, + token: Optional[str], + timeout_ms: int, +) -> dict: + """POST *payload* + ``base_info`` to ``{base_url}/{endpoint}``.""" + import aiohttp + + body = _json_dumps({**payload, "base_info": base_info()}) + url = f"{base_url.rstrip('/')}/{endpoint}" + timeout = aiohttp.ClientTimeout(total=timeout_ms / 1000) + async with session.post(url, data=body, headers=make_headers(token, body), timeout=timeout) as resp: + raw = await resp.text() + if not resp.ok: + raise RuntimeError(f"iLink POST {endpoint} HTTP {resp.status}: {raw[:200]}") + return json.loads(raw) + + +async def get_updates( + session: "aiohttp.ClientSession", + *, + base_url: str, + token: str, + sync_buf: str, + timeout_ms: int, +) -> dict: + try: + return await api_post( + session, + base_url=base_url, + endpoint=EP_GET_UPDATES, + payload={"get_updates_buf": sync_buf}, + token=token, + timeout_ms=timeout_ms, + ) + except asyncio.TimeoutError: + return {"ret": 0, "msgs": [], "get_updates_buf": sync_buf} + + +async def send_text_message( + session: "aiohttp.ClientSession", + *, + base_url: str, + token: str, + to: str, + text: str, + context_token: Optional[str], + client_id: str, +) -> dict: + if not text or not text.strip(): + raise ValueError("send_text_message: text must not be empty") + message: dict = { + "from_user_id": "", + "to_user_id": to, + "client_id": client_id, + "message_type": MSG_TYPE_BOT, + "message_state": MSG_STATE_FINISH, + "item_list": [{"type": ITEM_TEXT, "text_item": {"text": text}}], + } + if context_token: + message["context_token"] = context_token + return await api_post( + session, + base_url=base_url, + endpoint=EP_SEND_MESSAGE, + payload={"msg": message}, + token=token, + timeout_ms=API_TIMEOUT_MS, + ) + + +async def send_media_message( + session: "aiohttp.ClientSession", + *, + base_url: str, + token: str, + to: str, + item: dict, + context_token: Optional[str], + client_id: str, +) -> dict: + """Send a single pre-built media item (image/video/voice/file).""" + message: dict = { + "from_user_id": "", + "to_user_id": to, + "client_id": client_id, + "message_type": MSG_TYPE_BOT, + "message_state": MSG_STATE_FINISH, + "item_list": [item], + } + if context_token: + message["context_token"] = context_token + return await api_post( + session, + base_url=base_url, + endpoint=EP_SEND_MESSAGE, + payload={"msg": message}, + token=token, + timeout_ms=API_TIMEOUT_MS, + ) + + +async def get_upload_url( + session: "aiohttp.ClientSession", + *, + base_url: str, + token: str, + to_user_id: str, + media_type: int, + filekey: str, + rawsize: int, + rawfilemd5: str, + filesize: int, + aeskey_hex: str, +) -> dict: + """Request a CDN upload slot for an outbound media file.""" + return await api_post( + session, + base_url=base_url, + endpoint=EP_GET_UPLOAD_URL, + payload={ + "filekey": filekey, + "media_type": media_type, + "to_user_id": to_user_id, + "rawsize": rawsize, + "rawfilemd5": rawfilemd5, + "filesize": filesize, + "no_need_thumb": True, + "aeskey": aeskey_hex, + }, + token=token, + timeout_ms=API_TIMEOUT_MS, + ) diff --git a/flocks/channel/builtin/weixin/config.py b/flocks/channel/builtin/weixin/config.py new file mode 100644 index 00000000..50ed3150 --- /dev/null +++ b/flocks/channel/builtin/weixin/config.py @@ -0,0 +1,99 @@ +""" +Constants, regex patterns, and dependency guards for the Weixin channel. + +All public constants for the iLink Bot API live here so that other modules +in this package import a single source of truth. +""" + +from __future__ import annotations + +import re + +# --------------------------------------------------------------------------- +# iLink Bot API constants +# --------------------------------------------------------------------------- +ILINK_BASE_URL = "https://ilinkai.weixin.qq.com" +WEIXIN_CDN_BASE_URL = "https://novac2c.cdn.weixin.qq.com/c2c" +ILINK_APP_ID = "bot" +CHANNEL_VERSION = "2.2.0" +ILINK_APP_CLIENT_VERSION = (2 << 16) | (2 << 8) | 0 + +EP_GET_UPDATES = "ilink/bot/getupdates" +EP_SEND_MESSAGE = "ilink/bot/sendmessage" +EP_GET_CONFIG = "ilink/bot/getconfig" +EP_GET_UPLOAD_URL = "ilink/bot/getuploadurl" +EP_GET_BOT_QR = "ilink/bot/get_bot_qrcode" +EP_GET_QR_STATUS = "ilink/bot/get_qrcode_status" +QR_TIMEOUT_MS = 35_000 + +# --------------------------------------------------------------------------- +# Timeouts (milliseconds for API calls, seconds for media transfers) +# --------------------------------------------------------------------------- +LONG_POLL_TIMEOUT_MS = 35_000 +API_TIMEOUT_MS = 15_000 + +MEDIA_DOWNLOAD_IMAGE_TIMEOUT_S = 30.0 +MEDIA_DOWNLOAD_VIDEO_TIMEOUT_S = 120.0 +MEDIA_DOWNLOAD_FILE_TIMEOUT_S = 60.0 +MEDIA_DOWNLOAD_VOICE_TIMEOUT_S = 60.0 +MEDIA_UPLOAD_TIMEOUT_S = 120.0 +MEDIA_REMOTE_FETCH_TIMEOUT_S = 30.0 + +# --------------------------------------------------------------------------- +# Retry / backoff tuning +# --------------------------------------------------------------------------- +MAX_CONSECUTIVE_FAILURES = 3 +RETRY_DELAY_SECONDS = 2.0 +BACKOFF_DELAY_SECONDS = 30.0 +SESSION_EXPIRED_ERRCODE = -14 +RATE_LIMIT_ERRCODE = -2 +MESSAGE_DEDUP_TTL_SECONDS = 300 +MAX_MESSAGE_LENGTH = 2000 + +# --------------------------------------------------------------------------- +# iLink message / item type constants +# --------------------------------------------------------------------------- +ITEM_TEXT = 1 +ITEM_IMAGE = 2 +ITEM_VOICE = 3 +ITEM_FILE = 4 +ITEM_VIDEO = 5 +MSG_TYPE_BOT = 2 +MSG_STATE_FINISH = 2 + +MEDIA_IMAGE = 1 +MEDIA_VIDEO = 2 +MEDIA_FILE = 3 +MEDIA_VOICE = 4 + +# --------------------------------------------------------------------------- +# Markdown / format regex helpers (shared with format.py) +# --------------------------------------------------------------------------- +HEADER_RE = re.compile(r"^(#{1,6})\s+(.+?)\s*$") +TABLE_RULE_RE = re.compile(r"^\s*\|?(?:\s*:?-{3,}:?\s*\|)+\s*:?-{3,}:?\s*\|?\s*$") +FENCE_RE = re.compile(r"^```([^\n`]*)\s*$") + +# --------------------------------------------------------------------------- +# Dependency guards (importable feature flags) +# --------------------------------------------------------------------------- +try: + import aiohttp # type: ignore[import-untyped] # noqa: F401 + AIOHTTP_AVAILABLE = True +except ImportError: + AIOHTTP_AVAILABLE = False + +try: + from cryptography.hazmat.backends import default_backend # noqa: F401 + from cryptography.hazmat.primitives.ciphers import ( # noqa: F401 + Cipher, + algorithms, + modes, + ) + CRYPTO_AVAILABLE = True +except ImportError: + CRYPTO_AVAILABLE = False + + +def check_requirements() -> bool: + """Return True when both runtime dependencies are installed.""" + return AIOHTTP_AVAILABLE and CRYPTO_AVAILABLE diff --git a/flocks/channel/builtin/weixin/crypto.py b/flocks/channel/builtin/weixin/crypto.py new file mode 100644 index 00000000..c40729f0 --- /dev/null +++ b/flocks/channel/builtin/weixin/crypto.py @@ -0,0 +1,59 @@ +""" +AES-128-ECB encryption helpers used by the WeChat iLink CDN protocol. + +iLink encrypts/decrypts media payloads with a per-file 16-byte AES key +in ECB mode with PKCS7 padding. Key wire format is base64 of either the +raw 16 bytes or the 32-character hex string of the same key. +""" + +from __future__ import annotations + +import base64 + +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes + + +def pkcs7_pad(data: bytes, block_size: int = 16) -> bytes: + pad_len = block_size - (len(data) % block_size) + return data + bytes([pad_len] * pad_len) + + +def aes128_ecb_encrypt(plaintext: bytes, key: bytes) -> bytes: + cipher = Cipher(algorithms.AES(key), modes.ECB(), backend=default_backend()) + encryptor = cipher.encryptor() + return encryptor.update(pkcs7_pad(plaintext)) + encryptor.finalize() + + +def aes128_ecb_decrypt(ciphertext: bytes, key: bytes) -> bytes: + cipher = Cipher(algorithms.AES(key), modes.ECB(), backend=default_backend()) + decryptor = cipher.decryptor() + padded = decryptor.update(ciphertext) + decryptor.finalize() + if not padded: + return padded + pad_len = padded[-1] + if 1 <= pad_len <= 16 and padded.endswith(bytes([pad_len]) * pad_len): + return padded[:-pad_len] + return padded + + +def aes_padded_size(size: int) -> int: + """PKCS7-padded output size for *size* plaintext bytes (block=16).""" + return ((size + 1 + 15) // 16) * 16 + + +def parse_aes_key(aes_key_b64: str) -> bytes: + """Parse an iLink-style AES key. + + Accepts either: + - base64 of raw 16 bytes (decoded length 16), or + - base64 of the 32-char ASCII hex of the same key (decoded length 32). + """ + decoded = base64.b64decode(aes_key_b64) + if len(decoded) == 16: + return decoded + if len(decoded) == 32: + text = decoded.decode("ascii", errors="ignore") + if text and all(ch in "0123456789abcdefABCDEF" for ch in text): + return bytes.fromhex(text) + raise ValueError(f"unexpected aes_key format ({len(decoded)} decoded bytes)") diff --git a/flocks/channel/builtin/weixin/format.py b/flocks/channel/builtin/weixin/format.py new file mode 100644 index 00000000..f36fb873 --- /dev/null +++ b/flocks/channel/builtin/weixin/format.py @@ -0,0 +1,170 @@ +""" +Markdown normalization and message-chunk splitting for WeChat delivery. + +WeChat clients render most Markdown but truncate very long lines awkwardly. +We: +- Collapse runs of blank lines to at most one. +- Hard-wrap non-code, non-table lines longer than ``LINE_WRAP_WIDTH``. +- Pack content into messages under ``MAX_MESSAGE_LENGTH`` while keeping + fenced code blocks (``` ``` ```) intact. +""" + +from __future__ import annotations + +import textwrap +from typing import Optional + +from .config import FENCE_RE, TABLE_RULE_RE + +LINE_WRAP_WIDTH = 120 + + +def normalize_markdown(content: str) -> str: + """Collapse multi-blank-line runs (outside code blocks) to a single blank.""" + lines = content.splitlines() + result: list[str] = [] + in_code_block = False + blank_run = 0 + + for raw_line in lines: + line = raw_line.rstrip() + if FENCE_RE.match(line.strip()): + in_code_block = not in_code_block + result.append(line) + blank_run = 0 + continue + if in_code_block: + result.append(line) + continue + if not line.strip(): + blank_run += 1 + if blank_run <= 1: + result.append("") + continue + blank_run = 0 + result.append(line) + + return "\n".join(result).strip() + + +def wrap_long_lines(content: str, width: int = LINE_WRAP_WIDTH) -> str: + """Soft-wrap copy-unfriendly long lines while preserving code/tables.""" + wrapped: list[str] = [] + in_code_block = False + + for raw_line in content.splitlines(): + line = raw_line.rstrip() + stripped = line.strip() + + if FENCE_RE.match(stripped): + in_code_block = not in_code_block + wrapped.append(line) + continue + + if ( + in_code_block + or len(line) <= width + or not stripped + or stripped.startswith("|") + or TABLE_RULE_RE.match(stripped) + ): + wrapped.append(line) + continue + + wrapped_lines = textwrap.wrap( + line, width=width, + break_long_words=False, break_on_hyphens=False, + replace_whitespace=False, drop_whitespace=True, + ) + wrapped.extend(wrapped_lines or [line]) + + return "\n".join(wrapped).strip() + + +def format_for_weixin(content: Optional[str]) -> str: + """Top-level formatter: normalize whitespace + soft-wrap long lines.""" + if not content: + return "" + return wrap_long_lines(normalize_markdown(content)) + + +def split_markdown_blocks(content: str) -> list[str]: + """Split content into markdown-aware blocks, keeping fenced code intact.""" + if not content: + return [] + blocks: list[str] = [] + current: list[str] = [] + in_code_block = False + + for raw_line in content.splitlines(): + line = raw_line.rstrip() + if FENCE_RE.match(line.strip()): + if not in_code_block and current: + blocks.append("\n".join(current).strip()) + current = [] + current.append(line) + in_code_block = not in_code_block + if not in_code_block: + blocks.append("\n".join(current).strip()) + current = [] + continue + if in_code_block: + current.append(line) + continue + if not line.strip(): + if current: + blocks.append("\n".join(current).strip()) + current = [] + continue + current.append(line) + + if current: + blocks.append("\n".join(current).strip()) + return [b for b in blocks if b] + + +def split_chunks(content: str, max_length: int) -> list[str]: + """Pack markdown blocks into chunks under *max_length*, preserving code fences. + + Long single blocks (e.g. a large code block) are force-split by line then + by character as a last resort. + """ + if not content: + return [] + if len(content) <= max_length: + return [content] + + chunks: list[str] = [] + current = "" + for block in split_markdown_blocks(content): + candidate = block if not current else f"{current}\n\n{block}" + if len(candidate) <= max_length: + current = candidate + continue + if current: + chunks.append(current) + current = "" + if len(block) <= max_length: + current = block + continue + # Block itself oversized — fall back to line-then-char split. + line_buf = "" + for line in block.splitlines(): + if len(line) > max_length: + if line_buf: + chunks.append(line_buf) + line_buf = "" + for i in range(0, len(line), max_length): + chunks.append(line[i:i + max_length]) + continue + if len(line_buf) + len(line) + 1 > max_length: + if line_buf: + chunks.append(line_buf) + line_buf = line + else: + line_buf = f"{line_buf}\n{line}" if line_buf else line + if line_buf: + current = line_buf + if current: + chunks.append(current) + return [c for c in chunks if c] or [content[:max_length]] diff --git a/flocks/channel/builtin/weixin/inbound.py b/flocks/channel/builtin/weixin/inbound.py new file mode 100644 index 00000000..1ac2af99 --- /dev/null +++ b/flocks/channel/builtin/weixin/inbound.py @@ -0,0 +1,65 @@ +""" +Inbound message parsing helpers for the iLink frame format. + +These are pure functions over the raw frame dicts emitted by ``getupdates``, +suitable for unit-testing without an aiohttp session. +""" + +from __future__ import annotations + +from .config import ITEM_FILE, ITEM_IMAGE, ITEM_TEXT, ITEM_VIDEO, ITEM_VOICE + + +def extract_text(item_list: list) -> str: + """Pull a flat text string out of an iLink ``item_list``. + + Handles plain text, replies / quotes (``ref_msg``), and voice-to-text + transcription fallback. + """ + for item in item_list: + if item.get("type") == ITEM_TEXT: + text = str((item.get("text_item") or {}).get("text") or "") + ref = item.get("ref_msg") or {} + ref_item = ref.get("message_item") or {} + ref_type = ref_item.get("type") + if ref_type in (ITEM_IMAGE, ITEM_VIDEO, ITEM_FILE, ITEM_VOICE): + title = ref.get("title") or "" + prefix = f"[引用媒体: {title}]\n" if title else "[引用媒体]\n" + return f"{prefix}{text}".strip() + if ref_item: + parts: list[str] = [] + if ref.get("title"): + parts.append(str(ref["title"])) + ref_text = extract_text([ref_item]) + if ref_text: + parts.append(ref_text) + if parts: + return f"[引用: {' | '.join(parts)}]\n{text}".strip() + return text + for item in item_list: + if item.get("type") == ITEM_VOICE: + voice_text = str((item.get("voice_item") or {}).get("text") or "") + if voice_text: + return voice_text + return "" + + +def guess_chat_type(message: dict, account_id: str) -> tuple[str, str]: + """Return ``(chat_type, effective_chat_id)`` where chat_type ∈ ``"dm"`` | ``"group"``.""" + room_id = str(message.get("room_id") or message.get("chat_room_id") or "").strip() + to_user_id = str(message.get("to_user_id") or "").strip() + is_group = bool(room_id) or ( + to_user_id and account_id and to_user_id != account_id + and message.get("msg_type") == 1 + ) + if is_group: + return "group", room_id or to_user_id or str(message.get("from_user_id") or "") + return "dm", str(message.get("from_user_id") or "") + + +def safe_id(value: object, keep: int = 8) -> str: + """Truncate IDs for log output while keeping enough to be useful.""" + raw = str(value or "").strip() + if not raw: + return "?" + return raw[:keep] if len(raw) > keep else raw diff --git a/flocks/channel/builtin/weixin/media.py b/flocks/channel/builtin/weixin/media.py new file mode 100644 index 00000000..95b5a48a --- /dev/null +++ b/flocks/channel/builtin/weixin/media.py @@ -0,0 +1,449 @@ +""" +High-level media orchestration for the Weixin channel. + +- ``MediaCache`` writes decrypted inbound bytes to a content-addressed disk + cache and returns local ``file://`` URIs that can travel through the rest + of the Flocks pipeline. +- ``download_inbound_item`` dispatches on iLink item type to fetch + decrypt + + cache an image / video / file / voice payload, returning ``(local_uri, + mime_type)``. +- ``send_outbound_file`` encrypts a local file, requests a CDN upload slot, + uploads the ciphertext, and posts the media item via ``send_media_message``. +- ``fetch_remote_to_temp`` resolves remote URLs to local temp files (used + when ``OutboundContext.media_url`` is an http(s) URL rather than a path). +""" + +from __future__ import annotations + +import base64 +import hashlib +import mimetypes +import secrets +import tempfile +import uuid +from pathlib import Path +from typing import TYPE_CHECKING, Callable, Optional + +from flocks.utils.log import Log + +from .cdn import ( + cdn_upload_url, + download_and_decrypt_media, + download_bytes, + media_reference, + upload_ciphertext, +) +from .client import get_upload_url, send_media_message +from .config import ( + ITEM_FILE, + ITEM_IMAGE, + ITEM_VIDEO, + ITEM_VOICE, + MEDIA_DOWNLOAD_FILE_TIMEOUT_S, + MEDIA_DOWNLOAD_IMAGE_TIMEOUT_S, + MEDIA_DOWNLOAD_VIDEO_TIMEOUT_S, + MEDIA_DOWNLOAD_VOICE_TIMEOUT_S, + MEDIA_FILE, + MEDIA_IMAGE, + MEDIA_REMOTE_FETCH_TIMEOUT_S, + MEDIA_UPLOAD_TIMEOUT_S, + MEDIA_VIDEO, + MEDIA_VOICE, +) +from .crypto import aes128_ecb_encrypt, aes_padded_size +from .inbound import safe_id +from .store import ensure_state_dir + +if TYPE_CHECKING: + import aiohttp + +log = Log.create(service="channel.weixin.media") + + +# --------------------------------------------------------------------------- +# Local content-addressed cache for inbound media +# --------------------------------------------------------------------------- + +class MediaCache: + """Write decrypted inbound bytes to ``/media/`` and yield URIs. + + Content-addressed by sha256 of the plaintext to deduplicate re-deliveries + of the same image / file across restarts. + """ + + def __init__(self, data_dir: Optional[str] = None) -> None: + self._root = ensure_state_dir(data_dir) / "media" + self._root.mkdir(parents=True, exist_ok=True) + + def write(self, data: bytes, suffix: str, original_name: Optional[str] = None) -> str: + """Cache *data* under sha256(data) + *suffix* and return a ``file://`` URI.""" + digest = hashlib.sha256(data).hexdigest() + if original_name: + stem = Path(original_name).stem.replace("/", "_") or "media" + name = f"{stem}-{digest[:16]}{suffix}" + else: + name = f"{digest}{suffix}" + path = self._root / name + if not path.exists(): + try: + path.write_bytes(data) + except Exception as exc: + log.warning("weixin.media.cache_write_error", {"error": str(exc)}) + return "" + return path.resolve().as_uri() + + +# --------------------------------------------------------------------------- +# Inbound dispatch +# --------------------------------------------------------------------------- + +def is_downloadable_media_item(item: dict) -> bool: + """Return True iff *item* is a media item that ``download_inbound_item`` + would actually fetch (i.e. would produce bytes, not text-only fallback). + """ + item_type = item.get("type") + if item_type in (ITEM_IMAGE, ITEM_VIDEO, ITEM_FILE): + return True + if item_type == ITEM_VOICE: + # Voice items already transcribed to text are not downloaded as media. + voice_item = item.get("voice_item") or {} + return not voice_item.get("text") + return False + + +async def download_inbound_item( + session: "aiohttp.ClientSession", + *, + item: dict, + cdn_base_url: str, + cache: MediaCache, + sender_log_id: str = "?", +) -> Optional[tuple[str, str]]: + """Download + decrypt + cache a single ``item_list`` entry. + + Returns ``(local_file_uri, mime_type)`` on success, or ``None`` for non-media + items / failures (logged at WARN). + """ + item_type = item.get("type") + try: + if item_type == ITEM_IMAGE: + return await _download_image(session, item, cdn_base_url, cache) + if item_type == ITEM_VIDEO: + return await _download_video(session, item, cdn_base_url, cache) + if item_type == ITEM_FILE: + return await _download_file(session, item, cdn_base_url, cache) + if item_type == ITEM_VOICE: + return await _download_voice(session, item, cdn_base_url, cache) + except Exception as exc: + log.warning("weixin.media.download_failed", { + "type": item_type, "from": sender_log_id, "error": str(exc), + }) + return None + + +async def _download_image( + session: "aiohttp.ClientSession", + item: dict, + cdn_base_url: str, + cache: MediaCache, +) -> Optional[tuple[str, str]]: + image_item = item.get("image_item") or {} + media = image_item.get("media") or {} + aes_key = _normalize_image_aes_key(image_item, media) + data = await download_and_decrypt_media( + session, + cdn_base_url=cdn_base_url, + encrypted_query_param=media.get("encrypt_query_param"), + aes_key_b64=aes_key, + full_url=media.get("full_url"), + timeout_seconds=MEDIA_DOWNLOAD_IMAGE_TIMEOUT_S, + ) + uri = cache.write(data, ".jpg") + return (uri, "image/jpeg") if uri else None + + +async def _download_video( + session: "aiohttp.ClientSession", + item: dict, + cdn_base_url: str, + cache: MediaCache, +) -> Optional[tuple[str, str]]: + media = media_reference(item, "video_item") + data = await download_and_decrypt_media( + session, + cdn_base_url=cdn_base_url, + encrypted_query_param=media.get("encrypt_query_param"), + aes_key_b64=media.get("aes_key"), + full_url=media.get("full_url"), + timeout_seconds=MEDIA_DOWNLOAD_VIDEO_TIMEOUT_S, + ) + uri = cache.write(data, ".mp4") + return (uri, "video/mp4") if uri else None + + +async def _download_file( + session: "aiohttp.ClientSession", + item: dict, + cdn_base_url: str, + cache: MediaCache, +) -> Optional[tuple[str, str]]: + file_item = item.get("file_item") or {} + media = file_item.get("media") or {} + filename = str(file_item.get("file_name") or "document.bin") + mime = mime_from_filename(filename) + data = await download_and_decrypt_media( + session, + cdn_base_url=cdn_base_url, + encrypted_query_param=media.get("encrypt_query_param"), + aes_key_b64=media.get("aes_key"), + full_url=media.get("full_url"), + timeout_seconds=MEDIA_DOWNLOAD_FILE_TIMEOUT_S, + ) + suffix = Path(filename).suffix or ".bin" + uri = cache.write(data, suffix, original_name=filename) + return (uri, mime) if uri else None + + +async def _download_voice( + session: "aiohttp.ClientSession", + item: dict, + cdn_base_url: str, + cache: MediaCache, +) -> Optional[tuple[str, str]]: + voice_item = item.get("voice_item") or {} + if voice_item.get("text"): + # Voice already transcribed by iLink; treat as text, no media to cache. + return None + media = voice_item.get("media") or {} + data = await download_and_decrypt_media( + session, + cdn_base_url=cdn_base_url, + encrypted_query_param=media.get("encrypt_query_param"), + aes_key_b64=media.get("aes_key"), + full_url=media.get("full_url"), + timeout_seconds=MEDIA_DOWNLOAD_VOICE_TIMEOUT_S, + ) + uri = cache.write(data, ".silk") + return (uri, "audio/silk") if uri else None + + +def _normalize_image_aes_key(image_item: dict, media: dict) -> Optional[str]: + """iLink image frames may stash the AES key under ``image_item.aeskey`` (hex) + instead of ``media.aes_key`` (b64). Reconcile both into a base64 string. + """ + if media.get("aes_key"): + return media["aes_key"] + aeskey_hex = image_item.get("aeskey") + if isinstance(aeskey_hex, str) and aeskey_hex: + try: + return base64.b64encode(bytes.fromhex(aeskey_hex)).decode("ascii") + except Exception: + return None + return None + + +# --------------------------------------------------------------------------- +# Outbound dispatch +# --------------------------------------------------------------------------- + +OutboundItemBuilder = Callable[..., dict] + + +def select_outbound_media( + path: str, force_file_attachment: bool = False +) -> tuple[int, OutboundItemBuilder]: + """Pick the right ``media_type`` + ``item`` constructor for *path*'s mime.""" + mime = mimetypes.guess_type(path)[0] or "application/octet-stream" + + if mime.startswith("image/"): + return MEDIA_IMAGE, _build_image_item + if mime.startswith("video/"): + return MEDIA_VIDEO, _build_video_item + if path.endswith(".silk") and not force_file_attachment: + return MEDIA_VOICE, _build_voice_item + if mime.startswith("audio/"): + # Non-silk audio: send as file attachment (silk is required for native voice bubble). + return MEDIA_FILE, _build_file_item + return MEDIA_FILE, _build_file_item + + +def _build_image_item(**kw) -> dict: + return { + "type": ITEM_IMAGE, + "image_item": { + "media": { + "encrypt_query_param": kw["encrypt_query_param"], + "aes_key": kw["aes_key_for_api"], + "encrypt_type": 1, + }, + "mid_size": kw["ciphertext_size"], + }, + } + + +def _build_video_item(**kw) -> dict: + return { + "type": ITEM_VIDEO, + "video_item": { + "media": { + "encrypt_query_param": kw["encrypt_query_param"], + "aes_key": kw["aes_key_for_api"], + "encrypt_type": 1, + }, + "video_size": kw["ciphertext_size"], + "play_length": kw.get("play_length", 0), + "video_md5": kw.get("rawfilemd5", ""), + }, + } + + +def _build_voice_item(**kw) -> dict: + return { + "type": ITEM_VOICE, + "voice_item": { + "media": { + "encrypt_query_param": kw["encrypt_query_param"], + "aes_key": kw["aes_key_for_api"], + "encrypt_type": 1, + }, + "encode_type": kw.get("encode_type", 6), + "bits_per_sample": kw.get("bits_per_sample", 16), + "sample_rate": kw.get("sample_rate", 24000), + "playtime": kw.get("playtime", 0), + }, + } + + +def _build_file_item(**kw) -> dict: + return { + "type": ITEM_FILE, + "file_item": { + "media": { + "encrypt_query_param": kw["encrypt_query_param"], + "aes_key": kw["aes_key_for_api"], + "encrypt_type": 1, + }, + "file_name": kw["filename"], + "len": str(kw["plaintext_size"]), + }, + } + + +async def send_outbound_file( + session: "aiohttp.ClientSession", + *, + base_url: str, + cdn_base_url: str, + token: str, + chat_id: str, + path: str, + context_token: Optional[str], + context_token_setter: Optional[Callable[[str, Optional[str]], None]] = None, + force_file_attachment: bool = False, +) -> str: + """Encrypt + upload + send a single local file. Returns the client_id used.""" + plaintext = Path(path).read_bytes() + media_type, item_builder = select_outbound_media( + path, force_file_attachment=force_file_attachment, + ) + filekey = secrets.token_hex(16) + aes_key = secrets.token_bytes(16) + rawsize = len(plaintext) + rawfilemd5 = hashlib.md5(plaintext).hexdigest() + + upload_resp = await get_upload_url( + session, + base_url=base_url, + token=token, + to_user_id=chat_id, + media_type=media_type, + filekey=filekey, + rawsize=rawsize, + rawfilemd5=rawfilemd5, + filesize=aes_padded_size(rawsize), + aeskey_hex=aes_key.hex(), + ) + + upload_param = str(upload_resp.get("upload_param") or "") + upload_full_url = str(upload_resp.get("upload_full_url") or "") + if upload_full_url: + upload_url = upload_full_url + elif upload_param: + upload_url = cdn_upload_url(cdn_base_url, upload_param, filekey) + else: + raise RuntimeError( + "getUploadUrl returned neither upload_param nor upload_full_url: " + f"{upload_resp}" + ) + + ciphertext = aes128_ecb_encrypt(plaintext, aes_key) + encrypted_query_param = await upload_ciphertext( + session, + ciphertext=ciphertext, + upload_url=upload_url, + timeout_seconds=MEDIA_UPLOAD_TIMEOUT_S, + ) + + # iLink expects aes_key as base64(hex_string), not base64(raw_bytes). + aes_key_for_api = base64.b64encode(aes_key.hex().encode("ascii")).decode("ascii") + + media_item_kwargs = { + "encrypt_query_param": encrypted_query_param, + "aes_key_for_api": aes_key_for_api, + "ciphertext_size": len(ciphertext), + "plaintext_size": rawsize, + "filename": Path(path).name, + "rawfilemd5": rawfilemd5, + } + media_item = item_builder(**media_item_kwargs) + + client_id = f"flocks-weixin-{uuid.uuid4().hex}" + await send_media_message( + session, + base_url=base_url, + token=token, + to=chat_id, + item=media_item, + context_token=context_token, + client_id=client_id, + ) + log.info("weixin.media.sent", { + "to": safe_id(chat_id), + "media_type": media_type, + "size": rawsize, + }) + return client_id + + +async def fetch_remote_to_temp( + session: "aiohttp.ClientSession", + *, + url: str, + timeout_seconds: float = MEDIA_REMOTE_FETCH_TIMEOUT_S, +) -> str: + """Download an http(s) URL into a temp file, return the local path. + + Caller is responsible for unlinking the temp file when done. + Only use after validating the URL belongs to the WeChat CDN. + """ + data = await download_bytes(session, url=url, timeout_seconds=timeout_seconds) + suffix = Path(url.split("?", 1)[0]).suffix or ".bin" + with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as handle: + handle.write(data) + return handle.name + + +def mime_from_filename(filename: str) -> str: + return mimetypes.guess_type(filename)[0] or "application/octet-stream" + + +# Re-exported for the channel to schedule async tasks without a circular import. +__all__ = [ + "MediaCache", + "download_inbound_item", + "is_downloadable_media_item", + "send_outbound_file", + "fetch_remote_to_temp", + "mime_from_filename", + "select_outbound_media", +] diff --git a/flocks/channel/builtin/weixin/qr_login.py b/flocks/channel/builtin/weixin/qr_login.py new file mode 100644 index 00000000..e364fca8 --- /dev/null +++ b/flocks/channel/builtin/weixin/qr_login.py @@ -0,0 +1,172 @@ +""" +iLink Bot QR-code login flow for the Weixin channel. + +Two API endpoints are exposed by the Flocks server so the web UI can: + +1. ``POST /api/channel/weixin/qr-login/start`` + → Call ``ilink/bot/get_bot_qrcode`` (no token required — this *is* the + pre-auth step). Returns ``{qrcode_value, qrcode_url}`` so the frontend + can render a QR code with e.g. ``qrcode.react``. + +2. ``GET /api/channel/weixin/qr-login/status?qrcode=`` + → Poll ``ilink/bot/get_qrcode_status``. Returns + ``{status, account_id, token}`` where ``status`` is one of: + "waiting" — waiting for scan + "scaned" — phone scanned, waiting for phone confirmation tap + "confirmed"— login complete; ``account_id`` and ``token`` populated + "expired" — QR code expired; frontend should call /start again + +These helpers are pure async functions that accept an explicit ``base_url`` +so callers can override the iLink endpoint without touching global state. +""" + +from __future__ import annotations + +import json +import ssl +from typing import Optional + +from .config import ( + EP_GET_QR_STATUS, + EP_GET_BOT_QR, + ILINK_BASE_URL, + ILINK_APP_ID, + ILINK_APP_CLIENT_VERSION, + CHANNEL_VERSION, + QR_TIMEOUT_MS, +) + + +# --------------------------------------------------------------------------- +# HTTP helpers (no aiohttp session shared with the channel — login creates +# a throwaway session so it doesn't interfere with the poll loop) +# --------------------------------------------------------------------------- + +def _make_ssl_ctx(): + try: + import certifi + return ssl.create_default_context(cafile=certifi.where()) + except ImportError: + return True # aiohttp default + + +def _login_headers() -> dict: + return { + "Content-Type": "application/json", + "AuthorizationType": "ilink_bot_token", + "iLink-App-Id": ILINK_APP_ID, + "iLink-App-ClientVersion": str(ILINK_APP_CLIENT_VERSION), + } + + +async def _api_get(base_url: str, endpoint: str) -> dict: + """Simple GET against the iLink API with a short timeout.""" + import aiohttp + + url = f"{base_url.rstrip('/')}/{endpoint}" + timeout = aiohttp.ClientTimeout(total=QR_TIMEOUT_MS / 1000) + connector = aiohttp.TCPConnector(ssl=_make_ssl_ctx()) + async with aiohttp.ClientSession( + trust_env=True, connector=connector + ) as session: + async with session.get( + url, headers=_login_headers(), timeout=timeout + ) as resp: + raw = await resp.text() + if not resp.ok: + raise RuntimeError( + f"iLink GET {endpoint} HTTP {resp.status}: {raw[:200]}" + ) + return json.loads(raw) + + +# --------------------------------------------------------------------------- +# Public helpers called by the route handlers +# --------------------------------------------------------------------------- + +async def start_qr_login( + base_url: str = ILINK_BASE_URL, + bot_type: str = "3", +) -> dict: + """Request a fresh QR code from iLink. + + Returns ``{"qrcode_value": str, "qrcode_url": str}`` where + - ``qrcode_value`` is the raw hex token used to poll status + - ``qrcode_url`` is the WeChat mini-app URL to encode in the rendered QR + """ + resp = await _api_get( + base_url, + f"{EP_GET_BOT_QR}?bot_type={bot_type}", + ) + qrcode_value: str = str(resp.get("qrcode") or "") + qrcode_url: str = str(resp.get("qrcode_img_content") or "") + if not qrcode_value: + raise RuntimeError( + f"iLink get_bot_qrcode returned no qrcode field: {resp}" + ) + # WeChat must scan the full mini-app URL, not the raw hex token. + scan_data = qrcode_url if qrcode_url else qrcode_value + return { + "qrcode_value": qrcode_value, + "qrcode_url": scan_data, + } + + +async def poll_qr_status( + qrcode_value: str, + base_url: str = ILINK_BASE_URL, +) -> dict: + """Poll the QR code status once. + + Returns one of:: + + {"status": "waiting"} + {"status": "scaned"} + {"status": "expired"} + {"status": "redirect", "redirect_base_url": "https://..."} + {"status": "confirmed", "account_id": "...", "token": "...", + "base_url": "https://..."} + + ``redirect`` is returned when iLink routes the account to a regional node. + The frontend must pass the new ``redirect_base_url`` as ``base_url`` for all + subsequent calls so that the final ``confirmed`` response comes from the + correct node. It must also persist ``base_url`` from ``confirmed`` into the + channel config — otherwise the long-poll loop will connect to the wrong node. + + The caller (route handler) is responsible for looping / error handling. + """ + resp = await _api_get( + base_url, + f"{EP_GET_QR_STATUS}?qrcode={qrcode_value}", + ) + status: str = str(resp.get("status") or "waiting").lower() + + if status == "confirmed": + account_id = str(resp.get("ilink_bot_id") or "") + token = str(resp.get("bot_token") or "") + # iLink returns the canonical base_url for this account on confirmed. + # This may differ from ILINK_BASE_URL for accounts on regional nodes. + confirmed_base_url = str(resp.get("baseurl") or "").rstrip("/") or base_url + if not account_id or not token: + raise RuntimeError( + f"QR confirmed but missing credentials: {resp}" + ) + return { + "status": "confirmed", + "account_id": account_id, + "token": token, + "base_url": confirmed_base_url, + } + + if status == "scaned_but_redirect": + redirect_host = str(resp.get("redirect_host") or "").strip() + redirect_base_url = ( + f"https://{redirect_host}" if redirect_host else base_url + ) + return {"status": "redirect", "redirect_base_url": redirect_base_url} + + if status == "scaned": + return {"status": "scaned"} + if status == "expired": + return {"status": "expired"} + return {"status": "waiting"} diff --git a/flocks/channel/builtin/weixin/store.py b/flocks/channel/builtin/weixin/store.py new file mode 100644 index 00000000..cf08592f --- /dev/null +++ b/flocks/channel/builtin/weixin/store.py @@ -0,0 +1,144 @@ +""" +Disk-backed state stores for the Weixin channel: + +- ``ContextTokenStore`` — per-account, per-peer ``context_token`` cache + required to maintain conversation continuity with the iLink server. +- ``MessageDedup`` — in-memory dedup with TTL-based pruning. +- ``sync_buf`` helpers — long-poll cursor persistence. + +State files default to ``~/.flocks/workspace/channels/weixin/`` but the channel +can override the root via the ``dataDir`` config key (useful for multi-profile +setups). When ``dataDir`` is set it is used as-is (no ``weixin/`` sub-dir is +appended). +""" + +from __future__ import annotations + +import json +import time +from pathlib import Path +from typing import Optional + +from flocks.utils.log import Log + +from .config import MESSAGE_DEDUP_TTL_SECONDS + +log = Log.create(service="channel.weixin.store") + + +# --------------------------------------------------------------------------- +# Filesystem helpers +# --------------------------------------------------------------------------- + +def state_dir(data_dir: Optional[str] = None) -> Path: + if data_dir: + return Path(data_dir) + return Path.home() / ".flocks" / "workspace" / "channels" / "weixin" + + +def ensure_state_dir(data_dir: Optional[str] = None) -> Path: + path = state_dir(data_dir) + path.mkdir(parents=True, exist_ok=True) + return path + + +# --------------------------------------------------------------------------- +# Sync-buf cursor (long-poll position) +# --------------------------------------------------------------------------- + +def load_sync_buf(account_id: str, data_dir: Optional[str] = None) -> str: + path = state_dir(data_dir) / f"{account_id}.sync.json" + if not path.exists(): + return "" + try: + return json.loads(path.read_text(encoding="utf-8")).get("get_updates_buf", "") + except Exception: + return "" + + +def save_sync_buf(account_id: str, sync_buf: str, data_dir: Optional[str] = None) -> None: + try: + path = ensure_state_dir(data_dir) / f"{account_id}.sync.json" + path.write_text(json.dumps({"get_updates_buf": sync_buf}), encoding="utf-8") + except Exception as exc: + log.warning("weixin.sync_buf.save_error", {"error": str(exc)}) + + +# --------------------------------------------------------------------------- +# Per-peer context token cache +# --------------------------------------------------------------------------- + +class ContextTokenStore: + """Disk-backed ``context_token`` cache keyed by ``(account_id, user_id)``.""" + + def __init__(self, data_dir: Optional[str] = None) -> None: + self._root = state_dir(data_dir) + self._cache: dict[str, str] = {} + + def _path(self, account_id: str) -> Path: + return self._root / f"{account_id}.context-tokens.json" + + @staticmethod + def _key(account_id: str, user_id: str) -> str: + return f"{account_id}:{user_id}" + + def restore(self, account_id: str) -> None: + path = self._path(account_id) + if not path.exists(): + return + try: + data = json.loads(path.read_text(encoding="utf-8")) + except Exception: + return + for user_id, token in data.items(): + if isinstance(token, str) and token: + self._cache[self._key(account_id, user_id)] = token + + def get(self, account_id: str, user_id: str) -> Optional[str]: + return self._cache.get(self._key(account_id, user_id)) + + def set(self, account_id: str, user_id: str, token: str) -> None: + self._cache[self._key(account_id, user_id)] = token + self._persist(account_id) + + def clear(self, account_id: str, user_id: str) -> None: + """Drop a stale token (called on session-expired errors).""" + if self._cache.pop(self._key(account_id, user_id), None) is not None: + self._persist(account_id) + + def _persist(self, account_id: str) -> None: + prefix = f"{account_id}:" + payload = { + key[len(prefix):]: value + for key, value in self._cache.items() + if key.startswith(prefix) + } + try: + self._root.mkdir(parents=True, exist_ok=True) + self._path(account_id).write_text(json.dumps(payload), encoding="utf-8") + except Exception as exc: + log.warning("weixin.context_token.persist_error", {"error": str(exc)}) + + +# --------------------------------------------------------------------------- +# In-memory dedup with TTL pruning +# --------------------------------------------------------------------------- + +class MessageDedup: + """Track recent message ids / content hashes to drop redelivered messages.""" + + def __init__(self, ttl_seconds: float = MESSAGE_DEDUP_TTL_SECONDS) -> None: + self._ttl = ttl_seconds + self._seen: dict[str, float] = {} + + def is_duplicate(self, key: str) -> bool: + now = time.time() + cutoff = now - self._ttl + seen_at = self._seen.get(key) + if seen_at is not None and seen_at > cutoff: + return True + # Prune stale entries lazily every ~100 inserts to bound memory growth. + if len(self._seen) >= 100 and len(self._seen) % 100 == 0: + self._seen = {k: v for k, v in self._seen.items() if v > cutoff} + self._seen[key] = now + return False diff --git a/flocks/channel/inbound/dispatcher.py b/flocks/channel/inbound/dispatcher.py index 466ffc50..1512c8cd 100644 --- a/flocks/channel/inbound/dispatcher.py +++ b/flocks/channel/inbound/dispatcher.py @@ -959,6 +959,11 @@ async def _append_user_message( model: Optional[dict] = None, agent: Optional[str] = None, ) -> None: + import mimetypes + import os + from pathlib import Path + from urllib.parse import unquote, urlparse + from flocks.session.message import FilePart, Message, MessageRole create_kwargs: dict = dict( @@ -980,29 +985,70 @@ async def _append_user_message( message = await Message.create(**create_kwargs) - if msg.channel_id != "feishu" or not msg.media_url or channel_config is None: + if not msg.media_url: return try: - from flocks.channel.builtin.feishu.inbound_media import download_inbound_media + parsed = urlparse(msg.media_url) + scheme = parsed.scheme.lower() + + if msg.channel_id == "feishu" and channel_config is not None: + # Feishu: media is still on the remote server, download first. + from flocks.channel.builtin.feishu.inbound_media import download_inbound_media + + raw_cfg = channel_config.model_dump(by_alias=True, exclude_none=True) + media = await download_inbound_media(msg, raw_cfg) + if not media: + return + + await Message.store_part( + session_id, + message.id, + FilePart( + sessionID=session_id, + messageID=message.id, + mime=media.mime, + filename=media.filename, + url=media.url, + source=media.source, + ), + ) - raw_cfg = channel_config.model_dump(by_alias=True, exclude_none=True) - media = await download_inbound_media(msg, raw_cfg) - if not media: - return + elif scheme in ("", "file"): + # Local file already downloaded by the channel plugin (e.g. weixin). + # file:// URIs may have URL-encoded paths (e.g. Chinese filenames). + local_path = unquote(parsed.path) if scheme == "file" else msg.media_url + if not os.path.isfile(local_path): + log.warning("dispatcher.inbound_media_missing", { + "channel_id": msg.channel_id, + "path": local_path, + }) + return + filename = Path(local_path).name + mime = ( + msg.media_mime + or mimetypes.guess_type(local_path)[0] + or "application/octet-stream" + ) + file_uri = Path(local_path).resolve().as_uri() + await Message.store_part( + session_id, + message.id, + FilePart( + sessionID=session_id, + messageID=message.id, + mime=mime, + filename=filename, + url=file_uri, + source=None, + ), + ) + log.info("dispatcher.inbound_media_attached", { + "channel_id": msg.channel_id, + "filename": filename, + "mime": mime, + }) - await Message.store_part( - session_id, - message.id, - FilePart( - sessionID=session_id, - messageID=message.id, - mime=media.mime, - filename=media.filename, - url=media.url, - source=media.source, - ), - ) except Exception as e: log.warning("dispatcher.inbound_media_download_failed", { "channel_id": msg.channel_id, diff --git a/flocks/channel/registry.py b/flocks/channel/registry.py index aff20c7d..03819050 100644 --- a/flocks/channel/registry.py +++ b/flocks/channel/registry.py @@ -80,10 +80,12 @@ def _register_builtin_channels(self) -> None: from flocks.channel.builtin.feishu.channel import FeishuChannel from flocks.channel.builtin.telegram.channel import TelegramChannel from flocks.channel.builtin.wecom.channel import WeComChannel + from flocks.channel.builtin.weixin.channel import WeixinChannel self.register(FeishuChannel()) self.register(WeComChannel()) self.register(TelegramChannel()) self.register(DingTalkChannel()) + self.register(WeixinChannel()) def _register_plugin_extension_point(self) -> None: from flocks.plugin import PluginLoader, ExtensionPoint diff --git a/flocks/server/routes/channel.py b/flocks/server/routes/channel.py index fa456a1d..4ee07d1a 100644 --- a/flocks/server/routes/channel.py +++ b/flocks/server/routes/channel.py @@ -300,6 +300,52 @@ async def _do(): return {"ok": True} +# --------------------------------------------------------------------------- +# Weixin QR login +# --------------------------------------------------------------------------- + +class WeixinQrStartRequest(BaseModel): + baseUrl: Optional[str] = None + + +@router.post("/weixin/qr-login/start") +async def weixin_qr_login_start(req: WeixinQrStartRequest): + """Request a fresh iLink Bot QR code for WeChat account login. + + No credentials needed — this is the pre-authentication step. + Returns ``{qrcode_value, qrcode_url}`` for the frontend to render. + """ + from flocks.channel.builtin.weixin.config import ILINK_BASE_URL + from flocks.channel.builtin.weixin.qr_login import start_qr_login + + base_url = (req.baseUrl or "").strip() or ILINK_BASE_URL + try: + result = await start_qr_login(base_url=base_url) + return {"ok": True, **result} + except Exception as exc: + log.error("weixin.qr_login.start_failed", {"error": str(exc)}) + raise HTTPException(status_code=502, detail=str(exc)) + + +@router.get("/weixin/qr-login/status") +async def weixin_qr_login_status(qrcode: str, baseUrl: Optional[str] = None): + """Poll the QR code scan status once. + + Returns ``{status}`` where status ∈ waiting | scaned | expired | confirmed. + On ``confirmed`` also returns ``{account_id, token}``. + """ + from flocks.channel.builtin.weixin.config import ILINK_BASE_URL + from flocks.channel.builtin.weixin.qr_login import poll_qr_status + + base_url = (baseUrl or "").strip() or ILINK_BASE_URL + try: + result = await poll_qr_status(qrcode_value=qrcode, base_url=base_url) + return {"ok": True, **result} + except Exception as exc: + log.error("weixin.qr_login.poll_failed", {"error": str(exc)}) + raise HTTPException(status_code=502, detail=str(exc)) + + # --------------------------------------------------------------------------- # Telegram pairing # --------------------------------------------------------------------------- diff --git a/webui/package-lock.json b/webui/package-lock.json index 232a16ce..3d3f9d2c 100644 --- a/webui/package-lock.json +++ b/webui/package-lock.json @@ -16,6 +16,7 @@ "i18next": "^25.8.14", "i18next-browser-languagedetector": "^8.2.1", "lucide-react": "^0.562.0", + "qrcode.react": "^4.2.0", "react": "^19.2.0", "react-dom": "^19.2.0", "react-i18next": "^16.5.6", @@ -6390,6 +6391,15 @@ "node": ">=6" } }, + "node_modules/qrcode.react": { + "version": "4.2.0", + "resolved": "https://registry.npmjs.org/qrcode.react/-/qrcode.react-4.2.0.tgz", + "integrity": "sha512-QpgqWi8rD9DsS9EP3z7BT+5lY5SFhsqGjpgW5DY/i3mK4M9DTBNz3ErMi8BWYEfI3L0d8GIbGmcdFAS1uIRGjA==", + "license": "ISC", + "peerDependencies": { + "react": "^16.8.0 || ^17.0.0 || ^18.0.0 || ^19.0.0" + } + }, "node_modules/querystringify": { "version": "2.2.0", "resolved": "https://registry.npmjs.org/querystringify/-/querystringify-2.2.0.tgz", diff --git a/webui/package.json b/webui/package.json index 44c05b6b..ee2f6e88 100644 --- a/webui/package.json +++ b/webui/package.json @@ -22,6 +22,7 @@ "i18next": "^25.8.14", "i18next-browser-languagedetector": "^8.2.1", "lucide-react": "^0.562.0", + "qrcode.react": "^4.2.0", "react": "^19.2.0", "react-dom": "^19.2.0", "react-i18next": "^16.5.6", diff --git a/webui/public/channel-weixin.png b/webui/public/channel-weixin.png new file mode 100644 index 00000000..916104af Binary files /dev/null and b/webui/public/channel-weixin.png differ diff --git a/webui/src/locales/en-US/channel.json b/webui/src/locales/en-US/channel.json index 5a02ae3a..fa058f8f 100644 --- a/webui/src/locales/en-US/channel.json +++ b/webui/src/locales/en-US/channel.json @@ -11,7 +11,8 @@ "feishu": "Feishu", "wecom": "WeCom", "dingtalk": "DingTalk", - "telegram": "Telegram" + "telegram": "Telegram", + "weixin": "WeChat" }, "status": { "enabled": "Enabled", @@ -232,5 +233,57 @@ "dedupTtlSeconds": "Dedup TTL (seconds)", "dedupTtlSecondsHint": "Retention time for message deduplication records (default 86400s = 24h)", "optional": "Optional" + }, + "weixin": { + "enableTitle": "Enable WeChat Channel", + "enableDesc": "When enabled, the bot connects to WeChat personal accounts via the iLink Bot long-poll API.", + "credentials": "Account Credentials", + "credentialsDesc": "Obtain Token and Account ID after scanning the iLink Bot QR code.", + "tokenHint": "iLink Bot Token (obtained after QR login)", + "accountIdHint": "iLink Bot Account ID (obtained after QR login, format: xxx@im.bot)", + "baseUrl": "iLink API Base URL", + "baseUrlHint": "Custom iLink API base URL (leave empty to use default https://ilinkai.weixin.qq.com)", + "optional": "Optional", + "behavior": "Message Behavior", + "behaviorDesc": "Configure message routing and DM policy.", + "defaultAgent": "Default Agent", + "defaultAgentHint": "Default Agent ID used when no Agent is specified", + "dmPolicy": "DM Policy", + "dmPolicyHint": "Controls which users' direct messages can trigger the Agent", + "dmPolicyOpen": "Open (everyone allowed)", + "dmPolicyAllowlist": "Allowlist (only listed users)", + "dmPolicyDisabled": "Disabled (no DMs accepted)", + "allowFrom": "Allowed User IDs", + "allowFromHint": "In allowlist mode, only these WeChat user_ids can trigger the Agent", + "allowFromPlaceholder": "Enter WeChat user_id and press Enter", + "groupPolicy": "Group Policy", + "groupPolicyHint": "Controls whether group chat messages trigger the Agent", + "groupPolicyAll": "All (process all group messages)", + "groupPolicyAllowlist": "Allowlist (allowed groups only)", + "groupPolicyDisabled": "Disabled (ignore group messages)", + "groupAllowFrom": "Allowed Group IDs", + "groupAllowFromHint": "In allowlist mode, only messages from these group / room IDs trigger the Agent", + "groupAllowFromPlaceholder": "Enter group ID and press Enter", + "advanced": "Advanced Settings", + "advancedDesc": "Message chunk delay and state storage directory.", + "sendChunkDelay": "Chunk Send Delay (s)", + "sendChunkDelayHint": "Delay in seconds between multi-chunk messages (default 1.5s)", + "dataDir": "State Storage Directory", + "dataDirHint": "Directory for sync_buf and context-token state files (leave empty to use ~/.flocks/workspace/channels/weixin)", + "qrLoginButton": "Connect via QR Code", + "qrLoading": "Fetching QR code…", + "qrAlreadyLinked": "Account already linked — scan again to replace credentials", + "qrModalTitle": "WeChat QR Login", + "qrHintScanning": "Scan the QR code above with WeChat", + "qrHintScaned": "Scanned — please tap Confirm on your phone", + "qrHintConfirmed": "Login successful — credentials auto-filled", + "qrScaned": "Scanned, confirm on phone", + "qrConfirmed": "Login successful!", + "qrExpired": "QR code expired — please refresh", + "qrRefresh": "Refresh QR Code", + "qrRetry": "Retry", + "qrError": "Failed to fetch QR code — check your network connection", + "qrDone": "Done", + "qrSuccess": "WeChat account connected — credentials auto-filled" } } diff --git a/webui/src/locales/zh-CN/channel.json b/webui/src/locales/zh-CN/channel.json index a7e25b18..fbd4c2cb 100644 --- a/webui/src/locales/zh-CN/channel.json +++ b/webui/src/locales/zh-CN/channel.json @@ -11,7 +11,8 @@ "feishu": "飞书", "wecom": "企业微信", "dingtalk": "钉钉", - "telegram": "Telegram" + "telegram": "Telegram", + "weixin": "微信" }, "status": { "enabled": "已启用", @@ -234,5 +235,57 @@ "dedupTtlSeconds": "去重 TTL (秒)", "dedupTtlSecondsHint": "消息去重记录的保留时间(默认 86400 秒 = 24 小时)", "optional": "选填" + }, + "weixin": { + "enableTitle": "启用微信通道", + "enableDesc": "启用后,机器人将通过 iLink Bot 长轮询接入微信,并开始接收和回复消息。", + "credentials": "账号凭证", + "credentialsDesc": "通过微信 iLink Bot QR 扫码登录后获取 Token 和 Account ID。", + "tokenHint": "iLink Bot Token(扫码登录后获取)", + "accountIdHint": "iLink Bot Account ID(扫码登录后获取,格式如 xxx@im.bot)", + "baseUrl": "iLink API 地址", + "baseUrlHint": "自定义 iLink API 地址(留空使用默认 https://ilinkai.weixin.qq.com)", + "optional": "选填", + "behavior": "消息行为", + "behaviorDesc": "配置消息路由和私信接收策略。", + "defaultAgent": "默认 Agent", + "defaultAgentHint": "未指定 Agent 时使用的默认 Agent ID", + "dmPolicy": "私信策略", + "dmPolicyHint": "控制哪些用户的私信可以触发 Agent", + "dmPolicyOpen": "开放(所有人均可)", + "dmPolicyAllowlist": "白名单(仅允许名单内用户)", + "dmPolicyDisabled": "关闭(不接受私信)", + "allowFrom": "允许的用户 ID", + "allowFromHint": "白名单模式下,仅允许这些微信 user_id 触发 Agent", + "allowFromPlaceholder": "输入微信 user_id 后按回车添加", + "groupPolicy": "群聊策略", + "groupPolicyHint": "控制群聊消息是否触发 Agent", + "groupPolicyAll": "全部(处理所有群消息)", + "groupPolicyAllowlist": "白名单(仅允许名单内群组)", + "groupPolicyDisabled": "关闭(不处理群消息)", + "groupAllowFrom": "允许的群 ID", + "groupAllowFromHint": "白名单模式下,仅允许这些群 / 房间 ID 触发 Agent", + "groupAllowFromPlaceholder": "输入群 ID 后按回车添加", + "advanced": "高级设置", + "advancedDesc": "消息分块发送间隔及状态存储目录。", + "sendChunkDelay": "分块发送间隔 (秒)", + "sendChunkDelayHint": "多段消息之间的发送间隔秒数(默认 1.5 秒)", + "dataDir": "状态存储目录", + "dataDirHint": "sync_buf 与 context-token 等状态文件存储目录(留空使用默认 ~/.flocks/workspace/channels/weixin)", + "qrLoginButton": "扫码登录微信", + "qrLoading": "正在获取二维码…", + "qrAlreadyLinked": "已连接账号,可重新扫码替换", + "qrModalTitle": "微信扫码登录", + "qrHintScanning": "请用微信扫描上方二维码", + "qrHintScaned": "扫码成功,请在手机上点击「确认登录」", + "qrHintConfirmed": "登录成功,凭证已自动填入", + "qrScaned": "已扫码,请确认", + "qrConfirmed": "登录成功!", + "qrExpired": "二维码已过期,请点击刷新", + "qrRefresh": "刷新二维码", + "qrRetry": "重试", + "qrError": "获取二维码失败,请检查网络连接", + "qrDone": "完成", + "qrSuccess": "微信账号连接成功,凭证已自动填入" } } diff --git a/webui/src/pages/Channel/index.tsx b/webui/src/pages/Channel/index.tsx index ef246969..09ca07a6 100644 --- a/webui/src/pages/Channel/index.tsx +++ b/webui/src/pages/Channel/index.tsx @@ -1,4 +1,5 @@ import { useState, useEffect, useCallback, useRef } from 'react'; +import { QRCodeSVG } from 'qrcode.react'; import { Radio, Save, @@ -128,7 +129,22 @@ interface TelegramChannelConfig { streamingCoalesceMs?: number; } -type ChannelConfig = FeishuChannelConfig | WeComChannelConfig | DingTalkChannelConfig | TelegramChannelConfig; +interface WeixinChannelConfig { + enabled: boolean; + token?: string; + accountId?: string; + baseUrl?: string; + cdnBaseUrl?: string; + defaultAgent?: string; + dmPolicy?: string; + allowFrom?: string[]; + groupPolicy?: string; + groupAllowFrom?: string[]; + sendChunkDelay?: number; + dataDir?: string; +} + +type ChannelConfig = FeishuChannelConfig | WeComChannelConfig | DingTalkChannelConfig | TelegramChannelConfig | WeixinChannelConfig; function defaultFeishuConfig(): FeishuChannelConfig { return { @@ -175,6 +191,15 @@ function defaultTelegramConfig(): TelegramChannelConfig { }; } +function defaultWeixinConfig(): WeixinChannelConfig { + return { + enabled: false, + dmPolicy: 'open', + groupPolicy: 'all', + sendChunkDelay: 1.5, + }; +} + // ============================================================================ // Form primitives // ============================================================================ @@ -430,6 +455,7 @@ const CHANNEL_ICON_SRC: Record = { wecom: '/channel-wecom.png', dingtalk: '/channel-dingtalk.png', telegram: '/channel-telegram.png', + weixin: '/channel-weixin.png', }; const FEISHU_GUIDE_PDF_URL = '/feishu-bot-guide.pdf'; @@ -616,6 +642,7 @@ function ConnectionStatusPanel({ status, config, channelId }: ConnectionStatusPa {channelId === 'feishu' && 'WebSocket'} {channelId === 'wecom' && 'WebSocket'} {channelId === 'dingtalk' && 'Stream'} + {channelId === 'weixin' && 'Long-Poll'} {channelId === 'telegram' && ((config as TelegramChannelConfig).mode === 'webhook' ? 'Webhook' : 'Polling')} @@ -1369,6 +1396,354 @@ function TelegramPanel({ config, onChange, onRefresh }: TelegramPanelProps) { ); } +// ============================================================================ +// Weixin Config Panel +// ============================================================================ + +interface WeixinPanelProps { + config: WeixinChannelConfig; + onChange: (c: WeixinChannelConfig) => void; + /** Persist QR-obtained credentials to flocks.json + restart the channel. + * Called automatically when the QR login flow completes. */ + onQrLoginSuccess?: (creds: { token: string; accountId: string; baseUrl?: string }) => Promise | void; +} + +type QrPhase = + | 'idle' // initial / closed + | 'loading' // fetching QR from backend + | 'scanning' // QR shown, waiting for phone scan + | 'scaned' // phone scanned, waiting for confirmation tap + | 'confirmed' // login complete — credentials filled + | 'expired' // QR expired, allow restart + | 'error'; // network / API error + +function WeixinPanel({ config, onChange, onQrLoginSuccess }: WeixinPanelProps) { + const { t } = useTranslation('channel'); + const toast = useToast(); + const set = useCallback( + (key: K, value: WeixinChannelConfig[K]) => + onChange({ ...config, [key]: value }), + [config, onChange] + ); + + // ── QR login state ────────────────────────────────────────────────────── + const [qrPhase, setQrPhase] = useState('idle'); + const [qrUrl, setQrUrl] = useState(''); // URL to encode into QR SVG + const [qrValue, setQrValue] = useState(''); // hex token used for polling + const [qrError, setQrError] = useState(''); + const pollRef = useRef | null>(null); + // Guard: multiple in-flight requests may all resolve with "confirmed". + // Only the first one should act; the rest are no-ops. + const confirmedRef = useRef(false); + // Tracks the current polling base_url; may change on scaned_but_redirect. + const currentBaseUrlRef = useRef(undefined); + + const stopPolling = () => { + if (pollRef.current) { + clearInterval(pollRef.current); + pollRef.current = null; + } + }; + + // Cleanup on unmount + useEffect(() => () => stopPolling(), []); + + const startQrLogin = async () => { + stopPolling(); + confirmedRef.current = false; + currentBaseUrlRef.current = config.baseUrl?.trim() || undefined; + setQrError(''); + setQrPhase('loading'); + try { + const baseUrl = config.baseUrl?.trim() || undefined; + const res = await client.post('/api/channel/weixin/qr-login/start', { baseUrl: baseUrl ?? null }); + const { qrcode_value, qrcode_url } = res.data; + setQrValue(qrcode_value); + setQrUrl(qrcode_url); + setQrPhase('scanning'); + + // Poll status every 2 s. + // NOTE: each tick is an async call; multiple ticks can be in-flight + // simultaneously. confirmedRef prevents duplicate side-effects. + // currentBaseUrlRef tracks regional redirects (scaned_but_redirect). + pollRef.current = setInterval(async () => { + try { + const statusRes = await client.get('/api/channel/weixin/qr-login/status', { + params: { qrcode: qrcode_value, baseUrl: currentBaseUrlRef.current ?? undefined }, + }); + const { status, account_id, token, base_url, redirect_base_url } = statusRes.data; + if (status === 'scaned') { + setQrPhase('scaned'); + } else if (status === 'redirect') { + // iLink is routing this account to a different regional node. + // Update base_url so subsequent polls hit the correct host. + if (redirect_base_url) currentBaseUrlRef.current = redirect_base_url; + setQrPhase('scaned'); + } else if (status === 'confirmed') { + if (confirmedRef.current) return; // already handled + confirmedRef.current = true; + stopPolling(); + setQrPhase('confirmed'); + // Auto-fill credentials including the canonical base_url for this + // account — it may differ from the default when iLink redirected. + const newConfig: WeixinChannelConfig = { + ...config, + accountId: account_id, + token, + ...(base_url ? { baseUrl: base_url } : {}), + }; + onChange(newConfig); + // Persist immediately — without this the gateway keeps trying to + // start with the (still empty) on-disk config and the channel never + // actually connects to WeChat. + if (onQrLoginSuccess) { + try { + await onQrLoginSuccess({ + token, + accountId: account_id, + ...(base_url ? { baseUrl: base_url } : {}), + }); + } catch (err: any) { + toast.error(t('weixin.qrError'), err?.message ?? ''); + } + } + toast.success(t('weixin.qrSuccess')); + } else if (status === 'expired') { + stopPolling(); + setQrPhase('expired'); + } + // 'waiting' → keep polling + } catch { + // transient network error — keep polling + } + }, 2000); + } catch (err: any) { + const detail = err?.response?.data?.detail ?? err?.message ?? ''; + setQrError(detail); + setQrPhase('error'); + } + }; + + const closeQrModal = () => { + stopPolling(); + setQrPhase('idle'); + setQrUrl(''); + setQrValue(''); + setQrError(''); + }; + + const showModal = qrPhase !== 'idle'; + + return ( + <> +
+ {/* QR login launcher */} +
+ + {config.token && config.accountId && ( +

+ + {t('weixin.qrAlreadyLinked')} +

+ )} +
+ + {/* QR modal overlay */} + {showModal && ( +
+
+ {/* Close button */} + + +

{t('weixin.qrModalTitle')}

+ + {/* QR code display area */} + {qrPhase === 'loading' && ( +
+ +
+ )} + {(qrPhase === 'scanning' || qrPhase === 'scaned') && qrUrl && ( +
+
+ +
+ {qrPhase === 'scaned' && ( +
+
+ +

{t('weixin.qrScaned')}

+
+
+ )} +
+ )} + {qrPhase === 'confirmed' && ( +
+ +

{t('weixin.qrConfirmed')}

+
+ )} + {qrPhase === 'expired' && ( +
+ +

{t('weixin.qrExpired')}

+ +
+ )} + {qrPhase === 'error' && ( +
+ +

{qrError || t('weixin.qrError')}

+ +
+ )} + + {/* Status hint */} +

+ {qrPhase === 'scanning' && t('weixin.qrHintScanning')} + {qrPhase === 'scaned' && t('weixin.qrHintScaned')} + {qrPhase === 'confirmed' && t('weixin.qrHintConfirmed')} + {qrPhase === 'expired' && ''} + {qrPhase === 'error' && ''} +

+ + {qrPhase === 'confirmed' && ( + + )} +
+
+ )} + +
+ + + set('token', v || undefined)} + placeholder="xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + /> + + + set('accountId', v || undefined)} + placeholder="xxxxxxxxxxxxxxxxx@im.bot" + /> + + + set('baseUrl', v || undefined)} + placeholder={t('weixin.optional')} + /> + +
+ +
+ + set('defaultAgent', v || undefined)} + placeholder={t('weixin.optional')} + /> + + + set('groupPolicy', v)} + options={[ + { value: 'all', label: t('weixin.groupPolicyAll') }, + { value: 'allowlist', label: t('weixin.groupPolicyAllowlist') }, + { value: 'disabled', label: t('weixin.groupPolicyDisabled') }, + ]} + /> + + {(config.groupPolicy ?? 'all') === 'allowlist' && ( + + set('groupAllowFrom', v.length ? v : undefined)} + placeholder={t('weixin.groupAllowFromPlaceholder')} + /> + + )} +
+ +
+ + set('sendChunkDelay', v)} + min={0} + /> + + + set('dataDir', v || undefined)} + placeholder={t('weixin.optional')} + /> + +
+ + ); +} + // ============================================================================ // Detail Panel Header // ============================================================================ @@ -1581,6 +1956,8 @@ export default function ChannelPage() { configs[ch.id] = { ...defaultDingTalkConfig(), ...saved }; } else if (ch.id === 'telegram') { configs[ch.id] = { ...defaultTelegramConfig(), ...saved }; + } else if (ch.id === 'weixin') { + configs[ch.id] = { ...defaultWeixinConfig(), ...saved }; } else { configs[ch.id] = { enabled: false, ...saved }; } @@ -1669,6 +2046,50 @@ export default function ChannelPage() { } }; + // Persist credentials obtained via WeChat QR login + auto-enable + restart. + // The user explicitly initiated the QR scan, so we treat that as consent to + // enable the channel — no extra "save & enable" click required. + // Mirrors handleToggleEnabled's single-field update pattern so that any + // other unsaved channel edits are not flushed prematurely. + const handleWeixinQrSuccess = async ( + creds: { token: string; accountId: string; baseUrl?: string } + ) => { + const channelId = 'weixin'; + const savedChannelCfg = (fullConfig.channels?.[channelId] ?? {}) as Record; + const updatedChannelCfg: Record = { + ...savedChannelCfg, + enabled: true, + token: creds.token, + accountId: creds.accountId, + }; + if (creds.baseUrl) updatedChannelCfg.baseUrl = creds.baseUrl; + + const updatedChannels = { ...(fullConfig.channels ?? {}), [channelId]: updatedChannelCfg }; + const updated = { ...fullConfig, channels: updatedChannels }; + + await client.patch('/api/config/', updated); + setFullConfig(updated); + + // Sync the in-memory editor state so the UI immediately reflects the + // newly-saved values (token + accountId fields, enabled toggle, baseUrl). + setChannelConfigs((prev) => ({ + ...prev, + [channelId]: { ...prev[channelId], ...updatedChannelCfg } as ChannelConfig, + })); + originalConfigsRef.current = { + ...originalConfigsRef.current, + [channelId]: { ...originalConfigsRef.current[channelId], ...updatedChannelCfg }, + }; + + // Restart the channel so the new credentials take effect immediately. + // Fire-and-forget — server may take time to disconnect WebSocket. + client.post(`/api/channel/${channelId}/restart`, {}, { timeout: 5000 }).catch(() => {}); + + // Sync UI state after the connection has had time to come up. + setTimeout(() => { fetchAll(); fetchStatuses(true); }, 3000); + setTimeout(() => { fetchAll(); fetchStatuses(true); }, 8000); + }; + // Manual restart — useful when connection drops and user wants to reconnect const handleRestart = async (channelId?: string) => { const id = channelId ?? selectedId; @@ -1860,6 +2281,13 @@ export default function ChannelPage() { onRefresh={fetchAll} /> )} + {selectedId === 'weixin' && ( + handleChannelConfigChange('weixin', cfg)} + onQrLoginSuccess={handleWeixinQrSuccess} + /> + )} ) : (