diff --git a/.github/workflows/sync-cloud-run-env.yml b/.github/workflows/sync-cloud-run-env.yml index 7bcc69e..2db75f7 100644 --- a/.github/workflows/sync-cloud-run-env.yml +++ b/.github/workflows/sync-cloud-run-env.yml @@ -24,6 +24,11 @@ jobs: STRATEGY_PROFILE: ${{ vars.STRATEGY_PROFILE }} ACCOUNT_GROUP: ${{ vars.ACCOUNT_GROUP }} IB_ACCOUNT_GROUP_CONFIG_SECRET_NAME: ${{ vars.IB_ACCOUNT_GROUP_CONFIG_SECRET_NAME }} + IBKR_FEATURE_SNAPSHOT_PATH: ${{ vars.IBKR_FEATURE_SNAPSHOT_PATH }} + IBKR_FEATURE_SNAPSHOT_MANIFEST_PATH: ${{ vars.IBKR_FEATURE_SNAPSHOT_MANIFEST_PATH }} + IBKR_STRATEGY_CONFIG_PATH: ${{ vars.IBKR_STRATEGY_CONFIG_PATH }} + IBKR_RECONCILIATION_OUTPUT_PATH: ${{ vars.IBKR_RECONCILIATION_OUTPUT_PATH }} + IBKR_DRY_RUN_ONLY: ${{ vars.IBKR_DRY_RUN_ONLY }} IB_GATEWAY_ZONE: ${{ vars.IB_GATEWAY_ZONE }} IB_GATEWAY_IP_MODE: ${{ vars.IB_GATEWAY_IP_MODE }} GLOBAL_TELEGRAM_CHAT_ID: ${{ vars.GLOBAL_TELEGRAM_CHAT_ID }} @@ -51,6 +56,14 @@ jobs: NOTIFY_LANG ) + if [ "${STRATEGY_PROFILE:-}" = "russell_1000_multi_factor_defensive" ] || [ "${STRATEGY_PROFILE:-}" = "cash_buffer_branch_default" ]; then + required_vars+=(IBKR_FEATURE_SNAPSHOT_PATH) + fi + + if [ "${STRATEGY_PROFILE:-}" = "cash_buffer_branch_default" ]; then + required_vars+=(IBKR_FEATURE_SNAPSHOT_MANIFEST_PATH IBKR_STRATEGY_CONFIG_PATH IBKR_RECONCILIATION_OUTPUT_PATH) + fi + missing_vars=() for var_name in "${required_vars[@]}"; do if [ -z "${!var_name:-}" ]; then @@ -129,6 +142,36 @@ jobs: remove_env_vars+=("IB_GATEWAY_IP_MODE") fi + if [ -n "${IBKR_FEATURE_SNAPSHOT_PATH:-}" ]; then + env_pairs+=("IBKR_FEATURE_SNAPSHOT_PATH=${IBKR_FEATURE_SNAPSHOT_PATH}") + else + remove_env_vars+=("IBKR_FEATURE_SNAPSHOT_PATH") + fi + + if [ -n "${IBKR_FEATURE_SNAPSHOT_MANIFEST_PATH:-}" ]; then + env_pairs+=("IBKR_FEATURE_SNAPSHOT_MANIFEST_PATH=${IBKR_FEATURE_SNAPSHOT_MANIFEST_PATH}") + else + remove_env_vars+=("IBKR_FEATURE_SNAPSHOT_MANIFEST_PATH") + fi + + if [ -n "${IBKR_STRATEGY_CONFIG_PATH:-}" ]; then + env_pairs+=("IBKR_STRATEGY_CONFIG_PATH=${IBKR_STRATEGY_CONFIG_PATH}") + else + remove_env_vars+=("IBKR_STRATEGY_CONFIG_PATH") + fi + + if [ -n "${IBKR_RECONCILIATION_OUTPUT_PATH:-}" ]; then + env_pairs+=("IBKR_RECONCILIATION_OUTPUT_PATH=${IBKR_RECONCILIATION_OUTPUT_PATH}") + else + remove_env_vars+=("IBKR_RECONCILIATION_OUTPUT_PATH") + fi + + if [ -n "${IBKR_DRY_RUN_ONLY:-}" ]; then + env_pairs+=("IBKR_DRY_RUN_ONLY=${IBKR_DRY_RUN_ONLY}") + else + remove_env_vars+=("IBKR_DRY_RUN_ONLY") + fi + gcloud_args=( run services update "${CLOUD_RUN_SERVICE}" --region "${CLOUD_RUN_REGION}" diff --git a/README.md b/README.md index cb114f4..c3bffb6 100644 --- a/README.md +++ b/README.md @@ -14,9 +14,13 @@ Quarterly momentum rotation across 22 global ETFs (international markets, commodities, US sectors, US broad market, tech, and semiconductors) with daily canary emergency check. Designed to stay more stable than high-beta tech strategies while still allowing major tech leadership to enter the rotation. Deployed on GCP Cloud Run, connecting to IB Gateway on GCE. -The current `global_etf_rotation` implementation is sourced from `UsEquityStrategies`. +This runtime now supports multiple `us_equity` profiles sourced from `UsEquityStrategies`. -Full strategy documentation now lives in [`UsEquityStrategies`](https://github.com/QuantStrategyLab/UsEquityStrategies#global_etf_rotation). The strategy section below is kept as an execution-side summary. +Current runtime-facing profiles: +- `global_etf_rotation` — quarterly ETF rotation (current rollback line) +- `cash_buffer_branch_default` — monthly stock-selection branch with an explicit `80%` stock cap and `BOXX` parking + +Full strategy documentation lives in `UsEquityStrategies`; the sections here stay focused on execution and deployment. ### Strategy @@ -109,13 +113,18 @@ The selected `ACCOUNT_GROUP` is now the runtime identity. Keep broker-specific i |----------|----------|-------------| | `IB_GATEWAY_ZONE` | Optional fallback | GCE zone (for example `us-central1-a`). Recommended to keep in the selected account-group entry; this env var is only a transition fallback. | | `IB_GATEWAY_IP_MODE` | Optional fallback | `internal` (default) or `external`. Recommended to keep in the selected account-group entry; this env var is only a transition fallback. | -| `STRATEGY_PROFILE` | Yes | Strategy profile selector. Current required `us_equity` value: `global_etf_rotation` | +| `STRATEGY_PROFILE` | Yes | Strategy profile selector. Supported `us_equity` values: `global_etf_rotation`, `cash_buffer_branch_default` | | `ACCOUNT_GROUP` | Yes | Account-group selector. No default fallback. | | `IB_ACCOUNT_GROUP_CONFIG_SECRET_NAME` | Yes for Cloud Run | Secret Manager secret name for account-group config JSON. Recommended production source. | | `IB_ACCOUNT_GROUP_CONFIG_JSON` | No | Local/dev JSON fallback for account-group config. Not recommended for production Cloud Run. | | `TELEGRAM_TOKEN` | Yes | Telegram bot token. For Cloud Run, prefer a Secret Manager reference instead of a literal env var. | | `GLOBAL_TELEGRAM_CHAT_ID` | Yes | Telegram chat ID used by this service. | | `NOTIFY_LANG` | No | `en` (default) or `zh` | +| `IBKR_FEATURE_SNAPSHOT_PATH` | Required for snapshot-based profiles | Latest feature snapshot file path. Required for `cash_buffer_branch_default`. | +| `IBKR_FEATURE_SNAPSHOT_MANIFEST_PATH` | Required for `cash_buffer_branch_default` | Sidecar manifest path used by runtime freshness / contract checks. | +| `IBKR_STRATEGY_CONFIG_PATH` | Required for `cash_buffer_branch_default` | Canonical runtime config path used for manifest/config matching. | +| `IBKR_RECONCILIATION_OUTPUT_PATH` | No | Optional structured reconciliation output path for dry-run / paper execution logs. | +| `IBKR_DRY_RUN_ONLY` | No | `true` to block order submission and only emit planned actions; `false` for real paper orders. | The selected account-group entry must provide at least: @@ -135,17 +144,32 @@ If you use instance-name resolution with `ib_gateway_zone`, the Cloud Run runtim For the current first rollout, keep GitHub / Cloud Run focused on service-level values: ```bash +# rollback / legacy ETF line STRATEGY_PROFILE=global_etf_rotation ACCOUNT_GROUP=default IB_ACCOUNT_GROUP_CONFIG_SECRET_NAME=ibkr-account-groups GLOBAL_TELEGRAM_CHAT_ID= NOTIFY_LANG=zh -# Optional transition fallback only: +# optional transition fallback only: IB_GATEWAY_ZONE=us-central1-c IB_GATEWAY_IP_MODE=internal ``` +```bash +# snapshot-based stock paper branch +STRATEGY_PROFILE=cash_buffer_branch_default +ACCOUNT_GROUP=default +IB_ACCOUNT_GROUP_CONFIG_SECRET_NAME=ibkr-account-groups +IBKR_FEATURE_SNAPSHOT_PATH=/var/data/cash_buffer_branch_feature_snapshot_latest.csv +IBKR_FEATURE_SNAPSHOT_MANIFEST_PATH=/var/data/cash_buffer_branch_feature_snapshot_latest.csv.manifest.json +IBKR_STRATEGY_CONFIG_PATH=/app/research/configs/growth_pullback_cash_buffer_branch_default.json +IBKR_RECONCILIATION_OUTPUT_PATH=/var/log/ibkr_cash_buffer_branch_reconciliation.json +IBKR_DRY_RUN_ONLY=true +GLOBAL_TELEGRAM_CHAT_ID= +NOTIFY_LANG=zh +``` + This shared-config mode is only for the **IBKR pair** (`InteractiveBrokersPlatform` + `IBKRGatewayManager`). It is not meant to become a global secret bundle for unrelated quant repos. Across multiple quant projects, the only broadly reusable runtime settings are usually `GLOBAL_TELEGRAM_CHAT_ID` and `NOTIFY_LANG`. Recommended account-group config payload: @@ -195,7 +219,7 @@ Recommended setup: - `CLOUD_RUN_REGION` - `CLOUD_RUN_SERVICE` - `TELEGRAM_TOKEN_SECRET_NAME` (recommended when Cloud Run already uses Secret Manager for `TELEGRAM_TOKEN`) - - `STRATEGY_PROFILE` (recommended: `global_etf_rotation`) + - `STRATEGY_PROFILE` (recommended: `global_etf_rotation` for rollback, or `cash_buffer_branch_default` for the snapshot-based paper branch) - `ACCOUNT_GROUP` (recommended: `default`) - `IB_ACCOUNT_GROUP_CONFIG_SECRET_NAME` - `GLOBAL_TELEGRAM_CHAT_ID` @@ -208,7 +232,7 @@ Recommended setup: On every push to `main`, the workflow updates the existing Cloud Run service with the values above and removes legacy env vars that should now live in the account-group config (`IB_CLIENT_ID`, `IB_GATEWAY_INSTANCE_NAME`, `IB_GATEWAY_MODE`) plus the older transport vars (`IB_GATEWAY_HOST`, `IB_GATEWAY_PORT`, `TELEGRAM_CHAT_ID`). If `IB_GATEWAY_ZONE` or `IB_GATEWAY_IP_MODE` are blank in GitHub, the workflow also removes them from Cloud Run to avoid drift. -For now, `STRATEGY_PROFILE` still only supports one strategy profile. The current strategy domain is `us_equity`, and the repo now keeps a thin strategy registry so future expansion can grow by domain + profile instead of mixing strategy and platform in one layer. `ACCOUNT_GROUP` now selects one account-group config entry, and the service fails fast if that runtime identity is incomplete. +`STRATEGY_PROFILE` now supports both `global_etf_rotation` and `cash_buffer_branch_default` under the shared `us_equity` domain. `ACCOUNT_GROUP` selects one account-group config entry, and the service still fails fast if that runtime identity is incomplete. Important: @@ -265,9 +289,13 @@ gcloud run services update ibkr-quant \ 基于 IBKR 的全球 ETF 季度轮动策略(国际市场、商品、美股行业、美股宽基、科技和半导体),含每日金丝雀应急机制。定位上比 `TQQQ`、`SOXL` 这类高弹性科技策略更稳健,但不再把科技完全排除在外。部署在 GCP Cloud Run,连接 GCE 上的 IB Gateway。 -当前 `global_etf_rotation` 的策略实现来自 `UsEquityStrategies`。 +当前这个 runtime 已经可以承载多个来自 `UsEquityStrategies` 的 `us_equity` profile。 + +当前运行侧最相关的两个 profile: +- `global_etf_rotation`:季度 ETF 轮动,也是当前 rollback 线 +- `cash_buffer_branch_default`:月频个股分支,`risk_on` 明确只上 `80%` 股票,其余停在 `BOXX` -完整策略说明现在放在 [`UsEquityStrategies`](https://github.com/QuantStrategyLab/UsEquityStrategies#global_etf_rotation)。下面的策略章节主要保留执行侧摘要。 +完整策略说明放在 `UsEquityStrategies`;这里主要保留执行和部署侧摘要。 ### 策略 @@ -324,13 +352,18 @@ IBKR 账户 |------|------|------| | `IB_GATEWAY_ZONE` | 可选过渡项 | GCE zone(如 `us-central1-a`)。推荐直接放进选中的账号组配置里;这里只保留过渡 fallback。 | | `IB_GATEWAY_IP_MODE` | 可选过渡项 | `internal`(默认)或 `external`。推荐直接放进选中的账号组配置里;这里只保留过渡 fallback。 | -| `STRATEGY_PROFILE` | 是 | 策略档位选择。当前必填的 `us_equity` 值:`global_etf_rotation` | +| `STRATEGY_PROFILE` | 是 | 策略档位选择。当前支持的 `us_equity` 值:`global_etf_rotation`、`cash_buffer_branch_default` | | `ACCOUNT_GROUP` | 是 | 账号组选择器,不再提供默认回退。 | | `IB_ACCOUNT_GROUP_CONFIG_SECRET_NAME` | Cloud Run 建议必填 | 账号组配置 JSON 在 Secret Manager 里的密钥名。生产环境推荐使用。 | | `IB_ACCOUNT_GROUP_CONFIG_JSON` | 否 | 本地开发用的账号组配置 JSON fallback。不建议在生产 Cloud Run 直接使用。 | | `TELEGRAM_TOKEN` | 是 | Telegram 机器人 Token。Cloud Run 上更推荐走 Secret Manager 引用,不要直接写成明文 env。 | | `GLOBAL_TELEGRAM_CHAT_ID` | 是 | 这个服务使用的 Telegram Chat ID。 | | `NOTIFY_LANG` | 否 | `en`(默认)或 `zh` | +| `IBKR_FEATURE_SNAPSHOT_PATH` | snapshot 型策略必填 | 最新 feature snapshot 文件路径。`cash_buffer_branch_default` 必填。 | +| `IBKR_FEATURE_SNAPSHOT_MANIFEST_PATH` | `cash_buffer_branch_default` 必填 | snapshot sidecar manifest 路径,用于 freshness / contract 检查。 | +| `IBKR_STRATEGY_CONFIG_PATH` | `cash_buffer_branch_default` 必填 | runtime 侧 canonical config 路径,用于 manifest/config 匹配。 | +| `IBKR_RECONCILIATION_OUTPUT_PATH` | 否 | dry-run / paper 执行后的结构化对账输出路径。 | +| `IBKR_DRY_RUN_ONLY` | 否 | `true` 时只输出计划动作不下单;`false` 时允许真正的 paper 下单。 | 选中的账号组配置里,至少要有: @@ -410,7 +443,7 @@ IB_GATEWAY_IP_MODE=internal - `CLOUD_RUN_REGION` - `CLOUD_RUN_SERVICE` - `TELEGRAM_TOKEN_SECRET_NAME`(如果 Cloud Run 上的 `TELEGRAM_TOKEN` 已经改成 Secret Manager,建议配置) - - `STRATEGY_PROFILE`(建议设为 `global_etf_rotation`) + - `STRATEGY_PROFILE`(rollback 建议用 `global_etf_rotation`,snapshot 个股 paper 分支用 `cash_buffer_branch_default`) - `ACCOUNT_GROUP`(建议设为 `default`) - `IB_ACCOUNT_GROUP_CONFIG_SECRET_NAME` - `GLOBAL_TELEGRAM_CHAT_ID` @@ -423,7 +456,7 @@ IB_GATEWAY_IP_MODE=internal 每次 push 到 `main` 时,这个 workflow 会把上面这些值同步到现有 Cloud Run 服务里,并清掉已经转移到账号组配置里的旧 env(`IB_CLIENT_ID`、`IB_GATEWAY_INSTANCE_NAME`、`IB_GATEWAY_MODE`)以及更早的传输层 env(`IB_GATEWAY_HOST`、`IB_GATEWAY_PORT`、`TELEGRAM_CHAT_ID`)。如果 GitHub 里没有配置 `IB_GATEWAY_ZONE` 或 `IB_GATEWAY_IP_MODE`,workflow 也会把 Cloud Run 上这两个旧值一起删除,避免双配置源漂移。 -`STRATEGY_PROFILE` 当前只有一个可用值;当前策略域是 `us_equity`,本地策略注册表只用于域和 profile 校验。`ACCOUNT_GROUP` 是严格必填项,并会选中一份账号组配置。运行身份不完整时,服务会直接失败,不再静默回退。 +`STRATEGY_PROFILE` 现在支持 `global_etf_rotation` 和 `cash_buffer_branch_default` 两个 `us_equity` profile。`ACCOUNT_GROUP` 仍然是严格必填项,并会选中一份账号组配置;运行身份不完整时,服务会直接失败,不再静默回退。 注意: diff --git a/application/execution_service.py b/application/execution_service.py index de29c0d..12fc611 100644 --- a/application/execution_service.py +++ b/application/execution_service.py @@ -2,7 +2,15 @@ from __future__ import annotations +import hashlib +import json +import tempfile import time +from datetime import datetime +from pathlib import Path +from typing import Any + +import pandas as pd def get_market_prices(ib, symbols, *, fetch_quote_snapshots): @@ -16,7 +24,32 @@ def check_order_submitted(report, *, translator): order_id = report.broker_order_id status = report.status - if status in ["Submitted", "PreSubmitted", "Filled"]: + if status == "Filled": + return ( + True, + translator( + "order_filled", + symbol=report.symbol, + side=report.side, + qty=int(report.filled_quantity or report.quantity), + price=f"{float(report.average_fill_price or 0.0):.2f}", + order_id=order_id, + ), + ) + if status in {"PartiallyFilled", "Partial"}: + return ( + True, + translator( + "order_partial", + symbol=report.symbol, + side=report.side, + executed=int(report.filled_quantity or 0), + qty=int(report.quantity or 0), + price=f"{float(report.average_fill_price or 0.0):.2f}", + order_id=order_id, + ), + ) + if status in ["Submitted", "PreSubmitted"]: return True, f"✅ {translator('submitted', order_id=order_id)}" return False, f"❌ {translator('failed', reason=status)}" @@ -29,6 +62,227 @@ def get_available_buying_power(ib, fallback_buying_power): return buying_power +def _iter_open_orders(ib) -> list[Any]: + open_trades = getattr(ib, "openTrades", None) + if callable(open_trades): + return list(open_trades() or []) + open_orders = getattr(ib, "openOrders", None) + if callable(open_orders): + return list(open_orders() or []) + return [] + + +def _extract_open_order_symbol(order_like: Any) -> str | None: + contract = getattr(order_like, "contract", None) + if contract is None and hasattr(order_like, "order"): + contract = getattr(order_like, "contract", None) + symbol = getattr(contract, "symbol", None) + if symbol is None and hasattr(order_like, "symbol"): + symbol = getattr(order_like, "symbol") + symbol_text = str(symbol or "").strip().upper() + return symbol_text or None + + +def _extract_open_order_status(order_like: Any) -> str: + order_status = getattr(order_like, "orderStatus", None) + status = getattr(order_status, "status", None) + if status is None: + status = getattr(order_like, "status", None) + return str(status or "").strip() + + +def _collect_pending_symbols(ib, symbols: set[str]) -> tuple[str, ...]: + pending = [] + for order_like in _iter_open_orders(ib): + status = _extract_open_order_status(order_like) + if status in {"Cancelled", "ApiCancelled", "Inactive", "Filled"}: + continue + symbol = _extract_open_order_symbol(order_like) + if symbol and symbol in symbols: + pending.append(symbol) + return tuple(sorted(dict.fromkeys(pending))) + + +def _iter_fills(ib) -> list[Any]: + fills = getattr(ib, "fills", None) + if callable(fills): + return list(fills() or []) + return [] + + +def _extract_fill_symbol(fill_like: Any) -> str | None: + contract = getattr(fill_like, "contract", None) + symbol = getattr(contract, "symbol", None) + symbol_text = str(symbol or "").strip().upper() + return symbol_text or None + + +def _normalize_date_like(value: Any) -> str | None: + if value in {None, ""}: + return None + ts = pd.Timestamp(value) + if getattr(ts, "tzinfo", None) is not None: + ts = ts.tz_convert(None) + else: + ts = ts.tz_localize(None) + return ts.normalize().date().isoformat() + + +def _extract_fill_date(fill_like: Any) -> str | None: + execution = getattr(fill_like, "execution", None) + for candidate in ( + getattr(execution, "time", None), + getattr(fill_like, "time", None), + ): + normalized = _normalize_date_like(candidate) + if normalized is not None: + return normalized + return None + + +def _collect_same_day_filled_symbols(ib, symbols: set[str], trade_date: str | None) -> tuple[str, ...]: + if not trade_date: + return () + matched = [] + for fill_like in _iter_fills(ib): + symbol = _extract_fill_symbol(fill_like) + if not symbol or symbol not in symbols: + continue + fill_date = _extract_fill_date(fill_like) + if fill_date == trade_date: + matched.append(symbol) + return tuple(sorted(dict.fromkeys(matched))) + + +def _round_weight(value: float) -> float: + return round(float(value or 0.0), 8) + + +def _build_target_hash(target_weights: dict[str, float]) -> str: + payload = [[str(symbol), _round_weight(weight)] for symbol, weight in sorted(target_weights.items())] + return hashlib.sha256(json.dumps(payload, separators=(",", ":"), ensure_ascii=True).encode("utf-8")).hexdigest() + + +def _sanitize_token(value: str | None) -> str: + text = str(value or "").strip() + if not text: + return "none" + safe = "".join(ch if ch.isalnum() or ch in {"-", "_", "."} else "_" for ch in text) + return safe or "none" + + +def _resolve_execution_lock_path( + *, + strategy_profile: str | None, + account_group: str | None, + service_name: str | None, + trade_date: str | None, + snapshot_date: str | None, + dry_run_only: bool, + execution_lock_dir: str | Path | None, +) -> Path: + lock_dir = Path(execution_lock_dir) if execution_lock_dir else Path(tempfile.gettempdir()) / "ibkr_execution_locks" + mode = "dry_run" if dry_run_only else "paper" + scope = "__".join( + [ + _sanitize_token(account_group or "default"), + _sanitize_token(service_name or "service"), + _sanitize_token(strategy_profile or "unknown"), + _sanitize_token(mode), + _sanitize_token(trade_date), + _sanitize_token(snapshot_date or "no_snapshot"), + ] + ) + return lock_dir / f"{scope}.json" + + +def _read_execution_lock(lock_path: Path) -> dict[str, Any] | None: + if not lock_path.exists(): + return None + return json.loads(lock_path.read_text(encoding="utf-8")) + + +def _try_create_execution_lock(lock_path: Path, payload: dict[str, Any]) -> bool: + lock_path.parent.mkdir(parents=True, exist_ok=True) + try: + with lock_path.open("x", encoding="utf-8") as fh: + json.dump(payload, fh, ensure_ascii=False, indent=2, sort_keys=True) + return True + except FileExistsError: + return False + + +def _build_execution_lock_payload( + *, + strategy_profile: str | None, + account_group: str | None, + service_name: str | None, + account_ids: tuple[str, ...] | list[str] | None, + trade_date: str | None, + snapshot_date: str | None, + target_hash: str, + dry_run_only: bool, +) -> dict[str, Any]: + return { + "strategy_profile": strategy_profile, + "account_group": account_group, + "service_name": service_name, + "account_ids": list(account_ids or ()), + "trade_date": trade_date, + "snapshot_date": snapshot_date, + "mode": "dry_run" if dry_run_only else "paper", + "target_hash": target_hash, + "created_at": datetime.utcnow().isoformat(timespec="seconds") + "Z", + } + + +def _format_target_lines( + target_weights: dict[str, float], + current_mv: dict[str, float], + equity: float, +) -> list[str]: + current_weight = { + symbol: (current_mv.get(symbol, 0.0) / equity if equity > 0 else 0.0) + for symbol in set(target_weights) | set(current_mv) + } + target_lines = [] + for symbol, target_weight in sorted(target_weights.items(), key=lambda item: (-item[1], item[0])): + delta = target_weight - current_weight.get(symbol, 0.0) + target_lines.append( + f"target_diff {symbol}: current={current_weight.get(symbol, 0.0):.1%} target={target_weight:.1%} delta={delta:.1%}" + ) + return target_lines + + +def _build_target_diff_rows( + target_weights: dict[str, float], + current_mv: dict[str, float], + equity: float, +) -> list[dict[str, float | str]]: + current_weight = { + symbol: (current_mv.get(symbol, 0.0) / equity if equity > 0 else 0.0) + for symbol in set(target_weights) | set(current_mv) + } + rows = [] + for symbol, target_weight in sorted(target_weights.items(), key=lambda item: (-item[1], item[0])): + current_value = current_weight.get(symbol, 0.0) + rows.append( + { + "symbol": symbol, + "current_weight": current_value, + "target_weight": float(target_weight), + "delta_weight": float(target_weight - current_value), + } + ) + return rows + + +def _finalize_result(trade_logs, execution_summary, *, return_summary: bool): + if return_summary: + return trade_logs, execution_summary + return trade_logs + + def execute_rebalance( ib, target_weights, @@ -39,25 +293,63 @@ def execute_rebalance( submit_order_intent, order_intent_cls, translator, - ranking_pool, - safe_haven, + strategy_symbols=None, + signal_metadata=None, + strategy_profile=None, + account_group=None, + service_name=None, + account_ids=None, + dry_run_only=False, cash_reserve_ratio, rebalance_threshold_ratio, limit_buy_premium, sell_settle_delay_sec, + execution_lock_dir=None, + return_summary=False, ): """Execute trades to reach target weights.""" + signal_metadata = signal_metadata or {} + trade_date = str(signal_metadata.get("trade_date") or "").strip() or None + snapshot_date = _normalize_date_like(signal_metadata.get("snapshot_as_of")) + safe_haven_symbol = str(signal_metadata.get("safe_haven_symbol") or "").strip().upper() or None equity = account_values.get("equity", 0) + execution_summary = { + "mode": "dry_run" if dry_run_only else "paper", + "strategy_profile": strategy_profile, + "trade_date": trade_date, + "snapshot_as_of": snapshot_date, + "safe_haven_symbol": safe_haven_symbol, + "target_stock_weight": signal_metadata.get("target_stock_weight"), + "realized_stock_weight": signal_metadata.get("realized_stock_weight"), + "target_safe_haven_weight": signal_metadata.get("safe_haven_weight"), + "realized_safe_haven_weight": signal_metadata.get("safe_haven_weight"), + "orders_submitted": [], + "orders_filled": [], + "orders_partially_filled": [], + "orders_skipped": [], + "skipped_reasons": [], + "target_vs_current": [], + "execution_status": "not_started", + "no_op_reason": None, + "cash_reserve_dollars": 0.0, + "residual_cash_estimate": float(account_values.get("buying_power", 0.0) or 0.0), + "current_stock_weight": 0.0, + "current_safe_haven_weight": 0.0, + "lock_path": None, + } if equity <= 0: - return ["❌ No equity"] + execution_summary["execution_status"] = "blocked" + execution_summary["no_op_reason"] = "no_equity" + return _finalize_result(["❌ No equity"], execution_summary, return_summary=return_summary) reserved = equity * cash_reserve_ratio investable = equity - reserved threshold = equity * rebalance_threshold_ratio + execution_summary["cash_reserve_dollars"] = float(reserved) all_symbols = set(target_weights.keys()) | set(positions.keys()) - strategy_symbols = set(ranking_pool + [safe_haven]) - all_symbols = all_symbols & strategy_symbols + if strategy_symbols: + all_symbols = all_symbols & set(strategy_symbols) prices = get_market_prices(ib, all_symbols, fetch_quote_snapshots=fetch_quote_snapshots) @@ -69,6 +361,120 @@ def execute_rebalance( target_mv = {symbol: investable * weight for symbol, weight in target_weights.items()} trade_logs = [] + target_hash = _build_target_hash(target_weights) + execution_summary["target_vs_current"] = _build_target_diff_rows(target_weights, current_mv, equity) + if equity > 0: + current_safe_haven_mv = current_mv.get(safe_haven_symbol, 0.0) if safe_haven_symbol else 0.0 + execution_summary["current_safe_haven_weight"] = float(current_safe_haven_mv / equity) + execution_summary["current_stock_weight"] = float( + (sum(current_mv.values()) - current_safe_haven_mv) / equity + ) + + pending_symbols = _collect_pending_symbols(ib, set(all_symbols)) + if pending_symbols: + reason = f"pending_orders_detected:{','.join(pending_symbols)}" + execution_summary["execution_status"] = "blocked" + execution_summary["no_op_reason"] = reason + execution_summary["skipped_reasons"].append(reason) + trade_logs.append( + f"pending_orders_detected profile={strategy_profile or ''} symbols={','.join(pending_symbols)}" + ) + return _finalize_result(trade_logs, execution_summary, return_summary=return_summary) + + trade_logs.append( + f"profile={strategy_profile or ''} regime={signal_metadata.get('regime')} " + f"breadth={signal_metadata.get('breadth_ratio', 0.0):.1%} " + f"target_stock={signal_metadata.get('target_stock_weight', 0.0):.1%} " + f"realized_stock={signal_metadata.get('realized_stock_weight', 0.0):.1%} " + f"snapshot_as_of={snapshot_date or ''} trade_date={trade_date or ''}" + ) + trade_logs.extend(_format_target_lines(target_weights, current_mv, equity)) + + has_sell_plan = False + for symbol in all_symbols: + current = current_mv.get(symbol, 0.0) + target = target_mv.get(symbol, 0.0) + price = prices.get(symbol) + if price and current > target + threshold and int((current - target) / price) > 0: + has_sell_plan = True + break + + anticipated_buying_power = get_available_buying_power( + ib, + account_values.get("buying_power", 0), + ) + has_buy_plan = False + for symbol, target in target_mv.items(): + current = current_mv.get(symbol, 0.0) + price = prices.get(symbol) + if not price or current >= target - threshold: + continue + buy_value = min(target - current, anticipated_buying_power * 0.95) + limit_price = round(price * limit_buy_premium, 2) + qty = int(buy_value / limit_price) if limit_price > 0 else 0 + if qty > 0 and buy_value >= 50: + has_buy_plan = True + break + + if not has_sell_plan and not has_buy_plan: + execution_summary["execution_status"] = "no_op" + execution_summary["no_op_reason"] = "target_diff_below_threshold" + return _finalize_result(trade_logs, execution_summary, return_summary=return_summary) + + same_day_filled_symbols = _collect_same_day_filled_symbols(ib, set(all_symbols), trade_date) + if same_day_filled_symbols: + reason = f"same_day_fills_detected:{','.join(same_day_filled_symbols)}" + execution_summary["execution_status"] = "blocked" + execution_summary["no_op_reason"] = reason + execution_summary["skipped_reasons"].append(reason) + trade_logs.append( + f"same_day_fills_detected profile={strategy_profile or ''} mode={'dry_run' if dry_run_only else 'paper'} " + f"symbols={','.join(same_day_filled_symbols)} trade_date={trade_date}" + ) + return _finalize_result(trade_logs, execution_summary, return_summary=return_summary) + + lock_path = _resolve_execution_lock_path( + strategy_profile=strategy_profile, + account_group=account_group, + service_name=service_name, + trade_date=trade_date, + snapshot_date=snapshot_date, + dry_run_only=dry_run_only, + execution_lock_dir=execution_lock_dir, + ) + lock_payload = _build_execution_lock_payload( + strategy_profile=strategy_profile, + account_group=account_group, + service_name=service_name, + account_ids=tuple(account_ids or ()), + trade_date=trade_date, + snapshot_date=snapshot_date, + target_hash=target_hash, + dry_run_only=dry_run_only, + ) + if not _try_create_execution_lock(lock_path, lock_payload): + existing = _read_execution_lock(lock_path) or {} + reason = ( + f"same_day_execution_locked:mode={'dry_run' if dry_run_only else 'paper'}:" + f"target_hash={existing.get('target_hash', '')}" + ) + execution_summary["execution_status"] = "blocked" + execution_summary["no_op_reason"] = reason + execution_summary["skipped_reasons"].append(reason) + execution_summary["lock_path"] = str(lock_path) + trade_logs.append( + "same_day_execution_locked " + f"profile={strategy_profile or ''} mode={'dry_run' if dry_run_only else 'paper'} " + f"trade_date={trade_date or ''} snapshot_date={snapshot_date or ''} " + f"target_hash={existing.get('target_hash', '')} lock_path={lock_path}" + ) + return _finalize_result(trade_logs, execution_summary, return_summary=return_summary) + execution_summary["lock_path"] = str(lock_path) + trade_logs.append( + f"execution_lock_acquired mode={'dry_run' if dry_run_only else 'paper'} " + f"trade_date={trade_date or ''} snapshot_date={snapshot_date or ''} lock_path={lock_path}" + ) + execution_summary["execution_status"] = "executing" sell_executed = False for symbol in all_symbols: @@ -78,16 +484,42 @@ def execute_rebalance( sell_value = current - target price = prices.get(symbol) if not price: + execution_summary["orders_skipped"].append({"symbol": symbol, "side": "sell", "reason": "missing_price"}) + execution_summary["skipped_reasons"].append(f"missing_price:{symbol}") continue qty = int(sell_value / price) if qty <= 0: + execution_summary["orders_skipped"].append({"symbol": symbol, "side": "sell", "reason": "quantity_zero"}) continue + if dry_run_only: + execution_summary["orders_submitted"].append( + {"symbol": symbol, "side": "sell", "quantity": qty, "status": "dry_run"} + ) + trade_logs.append(f"DRY_RUN sell {symbol} {qty}") + continue report = submit_order_intent( ib, order_intent_cls(symbol=symbol, side="sell", quantity=qty), ) ok, status_msg = check_order_submitted(report, translator=translator) + status = str(getattr(report, "status", "") or "") + order_payload = { + "symbol": symbol, + "side": "sell", + "quantity": qty, + "status": status, + "broker_order_id": getattr(report, "broker_order_id", None), + } + if status == "Filled": + execution_summary["orders_filled"].append(order_payload) + elif status in {"PartiallyFilled", "Partial"}: + execution_summary["orders_partially_filled"].append(order_payload) + elif ok: + execution_summary["orders_submitted"].append(order_payload) + else: + execution_summary["orders_skipped"].append({**order_payload, "reason": status or "submit_failed"}) + execution_summary["skipped_reasons"].append(f"submit_failed:{symbol}:{status or 'unknown'}") trade_logs.append(translator("market_sell", symbol=symbol, qty=qty) + f" {status_msg}") if ok: sell_executed = True @@ -95,7 +527,7 @@ def execute_rebalance( if sell_executed: time.sleep(sell_settle_delay_sec) - buying_power = get_available_buying_power( + buying_power = anticipated_buying_power if not sell_executed else get_available_buying_power( ib, account_values.get("buying_power", 0), ) @@ -105,14 +537,33 @@ def execute_rebalance( if current < target - threshold: buy_value = min(target - current, buying_power * 0.95) price = prices.get(symbol) - if not price or buy_value < 50: + if not price: + execution_summary["orders_skipped"].append({"symbol": symbol, "side": "buy", "reason": "missing_price"}) + execution_summary["skipped_reasons"].append(f"missing_price:{symbol}") + continue + if buy_value < 50: + execution_summary["orders_skipped"].append({"symbol": symbol, "side": "buy", "reason": "min_notional"}) continue limit_price = round(price * limit_buy_premium, 2) qty = int(buy_value / limit_price) if qty <= 0: + execution_summary["orders_skipped"].append({"symbol": symbol, "side": "buy", "reason": "quantity_zero"}) continue + if dry_run_only: + execution_summary["orders_submitted"].append( + { + "symbol": symbol, + "side": "buy", + "quantity": qty, + "limit_price": limit_price, + "status": "dry_run", + } + ) + trade_logs.append(f"DRY_RUN buy {symbol} {qty} @{limit_price:.2f}") + buying_power -= qty * limit_price + continue report = submit_order_intent( ib, order_intent_cls( @@ -125,10 +576,38 @@ def execute_rebalance( ), ) ok, status_msg = check_order_submitted(report, translator=translator) + status = str(getattr(report, "status", "") or "") + order_payload = { + "symbol": symbol, + "side": "buy", + "quantity": qty, + "limit_price": limit_price, + "status": status, + "broker_order_id": getattr(report, "broker_order_id", None), + } + if status == "Filled": + execution_summary["orders_filled"].append(order_payload) + elif status in {"PartiallyFilled", "Partial"}: + execution_summary["orders_partially_filled"].append(order_payload) + elif ok: + execution_summary["orders_submitted"].append(order_payload) + else: + execution_summary["orders_skipped"].append({**order_payload, "reason": status or "submit_failed"}) + execution_summary["skipped_reasons"].append(f"submit_failed:{symbol}:{status or 'unknown'}") trade_logs.append( translator("limit_buy", symbol=symbol, qty=qty, price=f"{limit_price:.2f}") + f" {status_msg}" ) if ok: buying_power -= qty * limit_price - return trade_logs + execution_summary["execution_status"] = ( + "executed" + if ( + execution_summary["orders_submitted"] + or execution_summary["orders_filled"] + or execution_summary["orders_partially_filled"] + ) + else "no_op" + ) + execution_summary["residual_cash_estimate"] = float(max(buying_power, 0.0)) + return _finalize_result(trade_logs, execution_summary, return_summary=return_summary) diff --git a/application/feature_snapshot_service.py b/application/feature_snapshot_service.py new file mode 100644 index 0000000..4e0558c --- /dev/null +++ b/application/feature_snapshot_service.py @@ -0,0 +1,581 @@ +"""Feature snapshot loading helpers for InteractiveBrokersPlatform.""" + +from __future__ import annotations + +import hashlib +import json +from dataclasses import dataclass +from datetime import timezone +from pathlib import Path +from typing import Iterable + +import pandas as pd + + +DEFAULT_SNAPSHOT_DATE_COLUMNS = ("as_of", "snapshot_date") +DEFAULT_MAX_SNAPSHOT_MONTH_LAG = 1 +DEFAULT_SNAPSHOT_MANIFEST_SUFFIX = ".manifest.json" + + +@dataclass(frozen=True) +class FeatureSnapshotGuardResult: + frame: pd.DataFrame | None + metadata: dict[str, object] + + +def _load_snapshot_frame(snapshot_path: Path) -> pd.DataFrame: + suffix = snapshot_path.suffix.lower() + if suffix == ".csv": + return pd.read_csv(snapshot_path) + if suffix in {".json", ".jsonl"}: + return pd.read_json(snapshot_path, orient="records", lines=suffix == ".jsonl") + if suffix == ".parquet": + return pd.read_parquet(snapshot_path) + + raise ValueError( + "Unsupported feature snapshot format; expected .csv, .json, .jsonl, or .parquet" + ) + + +def _normalize_timestamp(value) -> pd.Timestamp | None: + if value is None or value == "": + return None + ts = pd.Timestamp(value) + if ts.tzinfo is not None: + ts = ts.tz_convert(None) + else: + ts = ts.tz_localize(None) + return ts.normalize() + + +def _month_lag(snapshot_as_of: pd.Timestamp, run_as_of: pd.Timestamp) -> int: + return (run_as_of.year - snapshot_as_of.year) * 12 + (run_as_of.month - snapshot_as_of.month) + + +def _build_guard_metadata( + *, + snapshot_path: Path, + decision: str, + snapshot_format: str | None = None, + snapshot_exists: bool, + snapshot_as_of: pd.Timestamp | None = None, + file_timestamp: str | None = None, + age_days: int | None = None, + no_op_reason: str | None = None, + fail_reason: str | None = None, + **extra, +) -> dict[str, object]: + payload = { + "feature_snapshot_path": str(snapshot_path), + "snapshot_path": str(snapshot_path), + "snapshot_format": snapshot_format, + "snapshot_exists": bool(snapshot_exists), + "snapshot_as_of": snapshot_as_of, + "snapshot_file_timestamp": file_timestamp, + "snapshot_age_days": age_days, + "snapshot_guard_decision": decision, + "no_op_reason": no_op_reason, + "fail_reason": fail_reason, + } + payload.update(extra) + return payload + + +def _sha256_file(path: Path) -> str: + hasher = hashlib.sha256() + with path.open("rb") as fh: + while True: + chunk = fh.read(1024 * 1024) + if not chunk: + break + hasher.update(chunk) + return hasher.hexdigest() + + +def _resolve_manifest_path(snapshot_path: Path, manifest_path: str | None) -> Path: + raw_manifest = str(manifest_path or "").strip() + if raw_manifest: + return Path(raw_manifest) + return Path(f"{snapshot_path}{DEFAULT_SNAPSHOT_MANIFEST_SUFFIX}") + + +def _normalize_manifest_payload(payload: dict[str, object]) -> dict[str, object]: + normalized = dict(payload) + if "snapshot_as_of" in normalized: + normalized["snapshot_as_of"] = _normalize_timestamp(normalized.get("snapshot_as_of")) + return normalized + + +def _load_manifest_payload(manifest_path: Path) -> dict[str, object]: + payload = json.loads(manifest_path.read_text(encoding="utf-8")) + if not isinstance(payload, dict): + raise ValueError("snapshot manifest must be a JSON object") + return _normalize_manifest_payload(payload) + + +def load_feature_snapshot(path: str) -> pd.DataFrame: + raw_path = str(path or "").strip() + if not raw_path: + raise EnvironmentError("Feature snapshot path is required") + snapshot_path = Path(raw_path) + if not snapshot_path.exists(): + raise FileNotFoundError(f"Feature snapshot not found: {snapshot_path}") + return _load_snapshot_frame(snapshot_path) + + +def load_feature_snapshot_guarded( + path: str, + *, + run_as_of, + required_columns: Iterable[str] | None = None, + snapshot_date_columns: Iterable[str] = DEFAULT_SNAPSHOT_DATE_COLUMNS, + max_snapshot_month_lag: int = DEFAULT_MAX_SNAPSHOT_MONTH_LAG, + manifest_path: str | None = None, + require_manifest: bool = False, + expected_strategy_profile: str | None = None, + expected_config_name: str | None = None, + expected_config_path: str | None = None, + expected_contract_version: str | None = None, +) -> FeatureSnapshotGuardResult: + raw_path = str(path or "").strip() + if not raw_path: + snapshot_path = Path("") + return FeatureSnapshotGuardResult( + frame=None, + metadata=_build_guard_metadata( + snapshot_path=snapshot_path, + decision="fail_closed", + snapshot_exists=False, + fail_reason="feature_snapshot_path_missing", + ), + ) + + snapshot_path = Path(raw_path) + manifest_file = _resolve_manifest_path(snapshot_path, manifest_path) + file_timestamp = None + if snapshot_path.exists(): + stat = snapshot_path.stat() + file_timestamp = pd.Timestamp(stat.st_mtime, unit="s", tz=timezone.utc).isoformat() + manifest_file_timestamp = None + if manifest_file.exists(): + manifest_stat = manifest_file.stat() + manifest_file_timestamp = pd.Timestamp( + manifest_stat.st_mtime, + unit="s", + tz=timezone.utc, + ).isoformat() + + if not snapshot_path.exists(): + return FeatureSnapshotGuardResult( + frame=None, + metadata=_build_guard_metadata( + snapshot_path=snapshot_path, + decision="fail_closed", + snapshot_exists=False, + snapshot_manifest_path=str(manifest_file), + snapshot_manifest_exists=False, + fail_reason=f"feature_snapshot_missing:{snapshot_path}", + ), + ) + + try: + frame = _load_snapshot_frame(snapshot_path) + except Exception as exc: # pragma: no cover - exercised in tests through ValueError path + return FeatureSnapshotGuardResult( + frame=None, + metadata=_build_guard_metadata( + snapshot_path=snapshot_path, + decision="fail_closed", + snapshot_format=snapshot_path.suffix.lower() or None, + snapshot_exists=True, + file_timestamp=file_timestamp, + snapshot_manifest_path=str(manifest_file), + snapshot_manifest_exists=manifest_file.exists(), + snapshot_manifest_file_timestamp=manifest_file_timestamp, + fail_reason=f"feature_snapshot_parse_failed:{type(exc).__name__}:{exc}", + ), + ) + + if frame.empty: + return FeatureSnapshotGuardResult( + frame=None, + metadata=_build_guard_metadata( + snapshot_path=snapshot_path, + decision="fail_closed", + snapshot_format=snapshot_path.suffix.lower() or None, + snapshot_exists=True, + file_timestamp=file_timestamp, + snapshot_manifest_path=str(manifest_file), + snapshot_manifest_exists=manifest_file.exists(), + snapshot_manifest_file_timestamp=manifest_file_timestamp, + fail_reason="feature_snapshot_empty", + ), + ) + + required = {str(column) for column in (required_columns or ()) if str(column).strip()} + missing_columns = required - set(frame.columns) + if missing_columns: + missing_text = ",".join(sorted(missing_columns)) + return FeatureSnapshotGuardResult( + frame=None, + metadata=_build_guard_metadata( + snapshot_path=snapshot_path, + decision="fail_closed", + snapshot_format=snapshot_path.suffix.lower() or None, + snapshot_exists=True, + file_timestamp=file_timestamp, + snapshot_manifest_path=str(manifest_file), + snapshot_manifest_exists=manifest_file.exists(), + snapshot_manifest_file_timestamp=manifest_file_timestamp, + fail_reason=f"feature_snapshot_missing_columns:{missing_text}", + ), + ) + + date_columns = tuple(str(column) for column in snapshot_date_columns if str(column).strip()) + selected_date_column = next((column for column in date_columns if column in frame.columns), None) + if selected_date_column is None: + return FeatureSnapshotGuardResult( + frame=None, + metadata=_build_guard_metadata( + snapshot_path=snapshot_path, + decision="fail_closed", + snapshot_format=snapshot_path.suffix.lower() or None, + snapshot_exists=True, + file_timestamp=file_timestamp, + snapshot_manifest_path=str(manifest_file), + snapshot_manifest_exists=manifest_file.exists(), + snapshot_manifest_file_timestamp=manifest_file_timestamp, + fail_reason=f"feature_snapshot_missing_date_column:candidates={','.join(date_columns)}", + ), + ) + + snapshot_dates = pd.to_datetime(frame[selected_date_column], errors="coerce", utc=False) + if getattr(snapshot_dates.dt, "tz", None) is not None: + snapshot_dates = snapshot_dates.dt.tz_localize(None) + snapshot_dates = snapshot_dates.dt.normalize() + if snapshot_dates.notna().sum() == 0: + return FeatureSnapshotGuardResult( + frame=None, + metadata=_build_guard_metadata( + snapshot_path=snapshot_path, + decision="fail_closed", + snapshot_format=snapshot_path.suffix.lower() or None, + snapshot_exists=True, + file_timestamp=file_timestamp, + snapshot_manifest_path=str(manifest_file), + snapshot_manifest_exists=manifest_file.exists(), + snapshot_manifest_file_timestamp=manifest_file_timestamp, + fail_reason=f"feature_snapshot_invalid_date_column:{selected_date_column}", + ), + ) + + snapshot_as_of = pd.Timestamp(snapshot_dates.max()).normalize() + run_date = _normalize_timestamp(run_as_of) + if run_date is None: + raise ValueError("run_as_of is required for guarded feature snapshot loading") + + age_days = int((run_date - snapshot_as_of).days) + if age_days < 0: + return FeatureSnapshotGuardResult( + frame=None, + metadata=_build_guard_metadata( + snapshot_path=snapshot_path, + decision="fail_closed", + snapshot_format=snapshot_path.suffix.lower() or None, + snapshot_exists=True, + snapshot_as_of=snapshot_as_of, + file_timestamp=file_timestamp, + age_days=age_days, + snapshot_manifest_path=str(manifest_file), + snapshot_manifest_exists=manifest_file.exists(), + snapshot_manifest_file_timestamp=manifest_file_timestamp, + fail_reason=f"feature_snapshot_future_as_of:{snapshot_as_of.date()}", + ), + ) + + if _month_lag(snapshot_as_of, run_date) > int(max_snapshot_month_lag): + return FeatureSnapshotGuardResult( + frame=None, + metadata=_build_guard_metadata( + snapshot_path=snapshot_path, + decision="fail_closed", + snapshot_format=snapshot_path.suffix.lower() or None, + snapshot_exists=True, + snapshot_as_of=snapshot_as_of, + file_timestamp=file_timestamp, + age_days=age_days, + snapshot_manifest_path=str(manifest_file), + snapshot_manifest_exists=manifest_file.exists(), + snapshot_manifest_file_timestamp=manifest_file_timestamp, + fail_reason=( + "feature_snapshot_stale:" + f"snapshot_as_of={snapshot_as_of.date()} run_as_of={run_date.date()} " + f"max_month_lag={int(max_snapshot_month_lag)}" + ), + ), + ) + + actual_snapshot_sha256 = None + actual_config_sha256 = None + manifest_payload: dict[str, object] | None = None + manifest_exists = manifest_file.exists() + if require_manifest: + if not manifest_exists: + return FeatureSnapshotGuardResult( + frame=None, + metadata=_build_guard_metadata( + snapshot_path=snapshot_path, + decision="fail_closed", + snapshot_format=snapshot_path.suffix.lower() or None, + snapshot_exists=True, + snapshot_as_of=snapshot_as_of, + file_timestamp=file_timestamp, + age_days=age_days, + snapshot_manifest_path=str(manifest_file), + snapshot_manifest_exists=False, + snapshot_manifest_file_timestamp=manifest_file_timestamp, + fail_reason=f"feature_snapshot_manifest_missing:{manifest_file}", + ), + ) + try: + manifest_payload = _load_manifest_payload(manifest_file) + except Exception as exc: + return FeatureSnapshotGuardResult( + frame=None, + metadata=_build_guard_metadata( + snapshot_path=snapshot_path, + decision="fail_closed", + snapshot_format=snapshot_path.suffix.lower() or None, + snapshot_exists=True, + snapshot_as_of=snapshot_as_of, + file_timestamp=file_timestamp, + age_days=age_days, + snapshot_manifest_path=str(manifest_file), + snapshot_manifest_exists=True, + snapshot_manifest_file_timestamp=manifest_file_timestamp, + fail_reason=f"feature_snapshot_manifest_parse_failed:{type(exc).__name__}:{exc}", + ), + ) + + required_manifest_fields = { + "contract_version", + "strategy_profile", + "config_name", + "snapshot_as_of", + "snapshot_sha256", + "config_sha256", + } + missing_manifest_fields = sorted( + field for field in required_manifest_fields if not str(manifest_payload.get(field) or "").strip() + ) + if missing_manifest_fields: + missing_text = ",".join(missing_manifest_fields) + return FeatureSnapshotGuardResult( + frame=None, + metadata=_build_guard_metadata( + snapshot_path=snapshot_path, + decision="fail_closed", + snapshot_format=snapshot_path.suffix.lower() or None, + snapshot_exists=True, + snapshot_as_of=snapshot_as_of, + file_timestamp=file_timestamp, + age_days=age_days, + snapshot_manifest_path=str(manifest_file), + snapshot_manifest_exists=True, + snapshot_manifest_file_timestamp=manifest_file_timestamp, + fail_reason=f"feature_snapshot_manifest_missing_fields:{missing_text}", + ), + ) + + manifest_as_of = manifest_payload.get("snapshot_as_of") + if manifest_as_of != snapshot_as_of: + return FeatureSnapshotGuardResult( + frame=None, + metadata=_build_guard_metadata( + snapshot_path=snapshot_path, + decision="fail_closed", + snapshot_format=snapshot_path.suffix.lower() or None, + snapshot_exists=True, + snapshot_as_of=snapshot_as_of, + file_timestamp=file_timestamp, + age_days=age_days, + snapshot_manifest_path=str(manifest_file), + snapshot_manifest_exists=True, + snapshot_manifest_file_timestamp=manifest_file_timestamp, + snapshot_manifest_contract_version=manifest_payload.get("contract_version"), + snapshot_manifest_strategy_profile=manifest_payload.get("strategy_profile"), + snapshot_manifest_config_name=manifest_payload.get("config_name"), + fail_reason=( + "feature_snapshot_manifest_as_of_mismatch:" + f"manifest={manifest_as_of} snapshot={snapshot_as_of.date()}" + ), + ), + ) + + if expected_strategy_profile and str(manifest_payload.get("strategy_profile")).strip() != str(expected_strategy_profile).strip(): + return FeatureSnapshotGuardResult( + frame=None, + metadata=_build_guard_metadata( + snapshot_path=snapshot_path, + decision="fail_closed", + snapshot_format=snapshot_path.suffix.lower() or None, + snapshot_exists=True, + snapshot_as_of=snapshot_as_of, + file_timestamp=file_timestamp, + age_days=age_days, + snapshot_manifest_path=str(manifest_file), + snapshot_manifest_exists=True, + snapshot_manifest_file_timestamp=manifest_file_timestamp, + snapshot_manifest_contract_version=manifest_payload.get("contract_version"), + snapshot_manifest_strategy_profile=manifest_payload.get("strategy_profile"), + snapshot_manifest_config_name=manifest_payload.get("config_name"), + fail_reason=( + "feature_snapshot_manifest_strategy_profile_mismatch:" + f"expected={expected_strategy_profile} actual={manifest_payload.get('strategy_profile')}" + ), + ), + ) + + if expected_config_name and str(manifest_payload.get("config_name")).strip() != str(expected_config_name).strip(): + return FeatureSnapshotGuardResult( + frame=None, + metadata=_build_guard_metadata( + snapshot_path=snapshot_path, + decision="fail_closed", + snapshot_format=snapshot_path.suffix.lower() or None, + snapshot_exists=True, + snapshot_as_of=snapshot_as_of, + file_timestamp=file_timestamp, + age_days=age_days, + snapshot_manifest_path=str(manifest_file), + snapshot_manifest_exists=True, + snapshot_manifest_file_timestamp=manifest_file_timestamp, + snapshot_manifest_contract_version=manifest_payload.get("contract_version"), + snapshot_manifest_strategy_profile=manifest_payload.get("strategy_profile"), + snapshot_manifest_config_name=manifest_payload.get("config_name"), + fail_reason=( + "feature_snapshot_manifest_config_name_mismatch:" + f"expected={expected_config_name} actual={manifest_payload.get('config_name')}" + ), + ), + ) + + if expected_contract_version and str(manifest_payload.get("contract_version")).strip() != str(expected_contract_version).strip(): + return FeatureSnapshotGuardResult( + frame=None, + metadata=_build_guard_metadata( + snapshot_path=snapshot_path, + decision="fail_closed", + snapshot_format=snapshot_path.suffix.lower() or None, + snapshot_exists=True, + snapshot_as_of=snapshot_as_of, + file_timestamp=file_timestamp, + age_days=age_days, + snapshot_manifest_path=str(manifest_file), + snapshot_manifest_exists=True, + snapshot_manifest_file_timestamp=manifest_file_timestamp, + snapshot_manifest_contract_version=manifest_payload.get("contract_version"), + snapshot_manifest_strategy_profile=manifest_payload.get("strategy_profile"), + snapshot_manifest_config_name=manifest_payload.get("config_name"), + fail_reason=( + "feature_snapshot_manifest_contract_version_mismatch:" + f"expected={expected_contract_version} actual={manifest_payload.get('contract_version')}" + ), + ), + ) + + actual_snapshot_sha256 = _sha256_file(snapshot_path) + if str(manifest_payload.get("snapshot_sha256")).strip() != actual_snapshot_sha256: + return FeatureSnapshotGuardResult( + frame=None, + metadata=_build_guard_metadata( + snapshot_path=snapshot_path, + decision="fail_closed", + snapshot_format=snapshot_path.suffix.lower() or None, + snapshot_exists=True, + snapshot_as_of=snapshot_as_of, + file_timestamp=file_timestamp, + age_days=age_days, + snapshot_manifest_path=str(manifest_file), + snapshot_manifest_exists=True, + snapshot_manifest_file_timestamp=manifest_file_timestamp, + snapshot_manifest_contract_version=manifest_payload.get("contract_version"), + snapshot_manifest_strategy_profile=manifest_payload.get("strategy_profile"), + snapshot_manifest_config_name=manifest_payload.get("config_name"), + snapshot_manifest_snapshot_sha256=manifest_payload.get("snapshot_sha256"), + fail_reason="feature_snapshot_manifest_snapshot_checksum_mismatch", + ), + ) + + if expected_config_path: + config_file = Path(str(expected_config_path)) + if not config_file.exists(): + return FeatureSnapshotGuardResult( + frame=None, + metadata=_build_guard_metadata( + snapshot_path=snapshot_path, + decision="fail_closed", + snapshot_format=snapshot_path.suffix.lower() or None, + snapshot_exists=True, + snapshot_as_of=snapshot_as_of, + file_timestamp=file_timestamp, + age_days=age_days, + snapshot_manifest_path=str(manifest_file), + snapshot_manifest_exists=True, + snapshot_manifest_file_timestamp=manifest_file_timestamp, + snapshot_manifest_contract_version=manifest_payload.get("contract_version"), + snapshot_manifest_strategy_profile=manifest_payload.get("strategy_profile"), + snapshot_manifest_config_name=manifest_payload.get("config_name"), + fail_reason=f"feature_snapshot_expected_config_missing:{config_file}", + ), + ) + actual_config_sha256 = _sha256_file(config_file) + if str(manifest_payload.get("config_sha256")).strip() != actual_config_sha256: + return FeatureSnapshotGuardResult( + frame=None, + metadata=_build_guard_metadata( + snapshot_path=snapshot_path, + decision="fail_closed", + snapshot_format=snapshot_path.suffix.lower() or None, + snapshot_exists=True, + snapshot_as_of=snapshot_as_of, + file_timestamp=file_timestamp, + age_days=age_days, + snapshot_manifest_path=str(manifest_file), + snapshot_manifest_exists=True, + snapshot_manifest_file_timestamp=manifest_file_timestamp, + snapshot_manifest_contract_version=manifest_payload.get("contract_version"), + snapshot_manifest_strategy_profile=manifest_payload.get("strategy_profile"), + snapshot_manifest_config_name=manifest_payload.get("config_name"), + snapshot_manifest_config_sha256=manifest_payload.get("config_sha256"), + fail_reason="feature_snapshot_manifest_config_checksum_mismatch", + ), + ) + + return FeatureSnapshotGuardResult( + frame=frame, + metadata=_build_guard_metadata( + snapshot_path=snapshot_path, + decision="proceed", + snapshot_format=snapshot_path.suffix.lower() or None, + snapshot_exists=True, + snapshot_as_of=snapshot_as_of, + file_timestamp=file_timestamp, + age_days=age_days, + snapshot_manifest_path=str(manifest_file), + snapshot_manifest_exists=manifest_exists, + snapshot_manifest_file_timestamp=manifest_file_timestamp, + snapshot_manifest_contract_version=(manifest_payload or {}).get("contract_version"), + snapshot_manifest_strategy_profile=(manifest_payload or {}).get("strategy_profile"), + snapshot_manifest_config_name=(manifest_payload or {}).get("config_name"), + snapshot_manifest_config_path=(manifest_payload or {}).get("config_path"), + snapshot_manifest_snapshot_sha256=(manifest_payload or {}).get("snapshot_sha256"), + snapshot_manifest_config_sha256=(manifest_payload or {}).get("config_sha256"), + expected_strategy_profile=expected_strategy_profile, + expected_config_name=expected_config_name, + expected_config_path=expected_config_path, + expected_contract_version=expected_contract_version, + actual_snapshot_sha256=actual_snapshot_sha256, + actual_config_sha256=actual_config_sha256, + ), + ) diff --git a/application/rebalance_service.py b/application/rebalance_service.py index 23b9b75..a7276ab 100644 --- a/application/rebalance_service.py +++ b/application/rebalance_service.py @@ -2,8 +2,27 @@ from __future__ import annotations +import json -def build_dashboard(positions, account_values, signal_desc, canary_str, *, translator, separator): +from application.reconciliation_service import ( + build_reconciliation_record, + write_reconciliation_record, +) + + +def build_dashboard( + positions, + account_values, + signal_desc, + status_desc, + *, + strategy_profile=None, + target_weights=None, + signal_metadata=None, + translator, + separator, + status_icon="🐤", +): equity = account_values.get("equity", 0) buying_power = account_values.get("buying_power", 0) position_lines = [] @@ -13,13 +32,50 @@ def build_dashboard(positions, account_values, signal_desc, canary_str, *, trans market_value = qty * avg position_lines.append(f" {symbol}: {qty}股 ${market_value:,.2f}") position_text = "\n".join(position_lines) if position_lines else " (空仓)" + signal_metadata = signal_metadata or {} + target_lines = [] + if target_weights: + for symbol, weight in sorted(target_weights.items(), key=lambda item: (-item[1], item[0])): + target_lines.append(f" {symbol}: {weight:.1%}") + target_text = "\n".join(target_lines) if target_lines else " (无目标持仓)" + profile_line = f"strategy_profile={strategy_profile}" if strategy_profile else "strategy_profile=" + regime = signal_metadata.get("regime") + breadth_ratio = signal_metadata.get("breadth_ratio") + target_stock_weight = signal_metadata.get("target_stock_weight") + realized_stock_weight = signal_metadata.get("realized_stock_weight") + safe_haven_weight = signal_metadata.get("safe_haven_weight") + config_source = signal_metadata.get("strategy_config_source") + snapshot_as_of = signal_metadata.get("snapshot_as_of") + snapshot_path = signal_metadata.get("feature_snapshot_path") or signal_metadata.get("snapshot_path") + snapshot_age_days = signal_metadata.get("snapshot_age_days") + snapshot_file_timestamp = signal_metadata.get("snapshot_file_timestamp") + snapshot_decision = signal_metadata.get("snapshot_guard_decision") + diagnostics = [ + profile_line, + f"regime={regime}" if regime else None, + f"breadth={breadth_ratio:.1%}" if isinstance(breadth_ratio, (int, float)) else None, + f"risk_target={target_stock_weight:.1%}" if isinstance(target_stock_weight, (int, float)) else None, + f"realized_stock={realized_stock_weight:.1%}" if isinstance(realized_stock_weight, (int, float)) else None, + f"safe_haven_target={safe_haven_weight:.1%}" if isinstance(safe_haven_weight, (int, float)) else None, + f"snapshot_decision={snapshot_decision}" if snapshot_decision else None, + f"snapshot_as_of={snapshot_as_of}" if snapshot_as_of else None, + f"snapshot_age_days={snapshot_age_days}" if isinstance(snapshot_age_days, (int, float)) else None, + f"snapshot_file_ts={snapshot_file_timestamp}" if snapshot_file_timestamp else None, + f"snapshot_path={snapshot_path}" if snapshot_path else None, + f"config_source={config_source}" if config_source else None, + ] + diagnostics_text = " | ".join(part for part in diagnostics if part) return ( f"{translator('equity')}: ${equity:,.2f} | {translator('buying_power')}: ${buying_power:,.2f}\n" f"{separator}\n" f"{position_text}\n" f"{separator}\n" - f"🐤 {canary_str}\n" - f"🎯 {signal_desc}" + f"{diagnostics_text}\n" + f"{separator}\n" + f"{status_icon} {status_desc}\n" + f"🎯 {signal_desc}\n" + f"{separator}\n" + f"Target Weights:\n{target_text}" ) @@ -32,30 +88,102 @@ def run_strategy_core( send_tg_message, translator, separator, + reconciliation_output_path=None, ): ib = None try: ib = connect_ib() positions, account_values = get_current_portfolio(ib) current_holdings = set(positions.keys()) - target_weights, signal_desc, _is_emergency, canary_str = compute_signals(ib, current_holdings) + signal_result = compute_signals(ib, current_holdings) + if len(signal_result) == 5: + target_weights, signal_desc, _is_emergency, status_desc, signal_metadata = signal_result + else: + target_weights, signal_desc, _is_emergency, status_desc = signal_result + signal_metadata = {} dashboard = build_dashboard( positions, account_values, signal_desc, - canary_str, + status_desc, + strategy_profile=signal_metadata.get("strategy_profile"), + target_weights=target_weights, + signal_metadata=signal_metadata, translator=translator, separator=separator, + status_icon=signal_metadata.get("status_icon", "🐤"), ) if target_weights is None: - message = f"{translator('heartbeat_title')}\n{dashboard}\n{separator}\n{translator('no_trades')}" + decision = signal_metadata.get("snapshot_guard_decision") + no_op_reason = signal_metadata.get("no_op_reason") + fail_reason = signal_metadata.get("fail_reason") + no_op_text = translator("no_trades") + if decision: + no_op_text = f"{no_op_text} | decision={decision}" + if no_op_reason: + no_op_text = f"{no_op_text} | reason={no_op_reason}" + if fail_reason: + no_op_text = f"{no_op_text} | fail_reason={fail_reason}" + record = build_reconciliation_record( + strategy_profile=signal_metadata.get("strategy_profile"), + mode="dry_run" if signal_metadata.get("dry_run_only") else "paper", + trade_date=signal_metadata.get("trade_date"), + snapshot_as_of=signal_metadata.get("snapshot_as_of"), + signal_metadata=signal_metadata, + target_weights=None, + execution_summary=None, + no_op_reason=no_op_reason or fail_reason or decision, + ) + record_path = write_reconciliation_record(record, output_path=reconciliation_output_path) + print( + "reconciliation_record " + + json.dumps({"path": str(record_path), "status": record.get("execution_status"), "no_op_reason": record.get("no_op_reason")}, ensure_ascii=False), + flush=True, + ) + message = f"{translator('heartbeat_title')}\n{dashboard}\n{separator}\n{no_op_text}" send_tg_message(message) print(message, flush=True) return "OK - heartbeat" - trade_logs = execute_rebalance(ib, target_weights, positions, account_values) + execution_result = execute_rebalance( + ib, + target_weights, + positions, + account_values, + strategy_symbols=signal_metadata.get("managed_symbols"), + signal_metadata=signal_metadata, + ) + if isinstance(execution_result, tuple) and len(execution_result) == 2: + trade_logs, execution_summary = execution_result + else: + trade_logs = execution_result + execution_summary = None + record = build_reconciliation_record( + strategy_profile=signal_metadata.get("strategy_profile"), + mode="dry_run" if execution_summary and execution_summary.get("mode") == "dry_run" else "paper", + trade_date=signal_metadata.get("trade_date"), + snapshot_as_of=signal_metadata.get("snapshot_as_of"), + signal_metadata=signal_metadata, + target_weights=target_weights, + execution_summary=execution_summary, + ) + record_path = write_reconciliation_record(record, output_path=reconciliation_output_path) + print( + "reconciliation_record " + + json.dumps( + { + "path": str(record_path), + "status": record.get("execution_status"), + "orders_submitted": len(record.get("orders_submitted") or ()), + "orders_filled": len(record.get("orders_filled") or ()), + "orders_skipped": len(record.get("orders_skipped") or ()), + }, + ensure_ascii=False, + ), + flush=True, + ) if trade_logs: trade_lines = "\n".join(trade_logs) message = ( @@ -73,4 +201,3 @@ def run_strategy_core( finally: if ib is not None and ib.isConnected(): ib.disconnect() - diff --git a/application/reconciliation_service.py b/application/reconciliation_service.py new file mode 100644 index 0000000..e88591a --- /dev/null +++ b/application/reconciliation_service.py @@ -0,0 +1,86 @@ +"""Structured reconciliation record helpers for InteractiveBrokersPlatform.""" + +from __future__ import annotations + +import json +import tempfile +from pathlib import Path +from typing import Any + +import pandas as pd + + +def _json_safe(value: Any): + if isinstance(value, pd.Timestamp): + return value.isoformat() + if isinstance(value, Path): + return str(value) + if isinstance(value, dict): + return {str(key): _json_safe(item) for key, item in value.items()} + if isinstance(value, (list, tuple)): + return [_json_safe(item) for item in value] + return value + + +def default_reconciliation_output_path(strategy_profile: str | None) -> Path: + profile = str(strategy_profile or "unknown").strip() or "unknown" + safe_profile = "".join(ch if ch.isalnum() or ch in {"-", "_", "."} else "_" for ch in profile) + return Path(tempfile.gettempdir()) / f"ibkr_reconciliation_{safe_profile}.json" + + +def build_reconciliation_record( + *, + strategy_profile: str | None, + mode: str, + trade_date: str | None, + snapshot_as_of, + signal_metadata: dict[str, Any] | None, + target_weights: dict[str, float] | None, + execution_summary: dict[str, Any] | None, + no_op_reason: str | None = None, +) -> dict[str, Any]: + signal_metadata = dict(signal_metadata or {}) + execution_summary = dict(execution_summary or {}) + target_weights = dict(target_weights or {}) + record = { + "strategy_profile": strategy_profile, + "mode": mode, + "trade_date": trade_date, + "snapshot_as_of": snapshot_as_of, + "snapshot_guard_decision": signal_metadata.get("snapshot_guard_decision"), + "snapshot_path": signal_metadata.get("feature_snapshot_path") or signal_metadata.get("snapshot_path"), + "regime": signal_metadata.get("regime"), + "breadth": signal_metadata.get("breadth_ratio"), + "target_stock_weight": signal_metadata.get("target_stock_weight"), + "realized_stock_weight": signal_metadata.get("realized_stock_weight"), + "target_safe_haven_weight": signal_metadata.get("safe_haven_weight"), + "realized_safe_haven_weight": execution_summary.get("realized_safe_haven_weight"), + "safe_haven_symbol": execution_summary.get("safe_haven_symbol") or signal_metadata.get("safe_haven_symbol"), + "target_holdings": [ + {"symbol": symbol, "target_weight": float(weight)} + for symbol, weight in sorted(target_weights.items(), key=lambda item: (-item[1], item[0])) + ], + "target_vs_current": execution_summary.get("target_vs_current") or [], + "orders_submitted": execution_summary.get("orders_submitted") or [], + "orders_filled": execution_summary.get("orders_filled") or [], + "orders_partially_filled": execution_summary.get("orders_partially_filled") or [], + "orders_skipped": execution_summary.get("orders_skipped") or [], + "skipped_reasons": execution_summary.get("skipped_reasons") or [], + "residual_cash_estimate": execution_summary.get("residual_cash_estimate"), + "cash_reserve_dollars": execution_summary.get("cash_reserve_dollars"), + "current_stock_weight": execution_summary.get("current_stock_weight"), + "current_safe_haven_weight": execution_summary.get("current_safe_haven_weight"), + "execution_status": execution_summary.get("execution_status") or ("no_op" if no_op_reason else "executed"), + "lock_path": execution_summary.get("lock_path"), + "no_op_reason": no_op_reason or execution_summary.get("no_op_reason"), + "fail_reason": signal_metadata.get("fail_reason"), + "status_icon": signal_metadata.get("status_icon"), + } + return _json_safe(record) + + +def write_reconciliation_record(record: dict[str, Any], *, output_path: str | Path | None = None) -> Path: + path = Path(output_path) if output_path else default_reconciliation_output_path(record.get("strategy_profile")) + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(json.dumps(_json_safe(record), ensure_ascii=False, indent=2, sort_keys=True), encoding="utf-8") + return path diff --git a/configs/paper_cash_buffer_branch.dryrun.env.example b/configs/paper_cash_buffer_branch.dryrun.env.example new file mode 100644 index 0000000..7a3f15e --- /dev/null +++ b/configs/paper_cash_buffer_branch.dryrun.env.example @@ -0,0 +1,11 @@ +# IBKR paper runtime switch: cash_buffer_branch_default (dry-run) +STRATEGY_PROFILE=cash_buffer_branch_default +ACCOUNT_GROUP=default +IB_ACCOUNT_GROUP_CONFIG_SECRET_NAME=ibkr-account-groups +IBKR_FEATURE_SNAPSHOT_PATH=/var/data/cash_buffer_branch_feature_snapshot_latest.csv +IBKR_FEATURE_SNAPSHOT_MANIFEST_PATH=/var/data/cash_buffer_branch_feature_snapshot_latest.csv.manifest.json +IBKR_STRATEGY_CONFIG_PATH=/app/research/configs/growth_pullback_cash_buffer_branch_default.json +IBKR_RECONCILIATION_OUTPUT_PATH=/var/log/ibkr_cash_buffer_branch_reconciliation.json +IBKR_DRY_RUN_ONLY=true +GLOBAL_TELEGRAM_CHAT_ID= +NOTIFY_LANG=zh diff --git a/configs/paper_cash_buffer_branch.env.example b/configs/paper_cash_buffer_branch.env.example new file mode 100644 index 0000000..f66cfe5 --- /dev/null +++ b/configs/paper_cash_buffer_branch.env.example @@ -0,0 +1,11 @@ +# IBKR paper runtime switch: cash_buffer_branch_default +STRATEGY_PROFILE=cash_buffer_branch_default +ACCOUNT_GROUP=default +IB_ACCOUNT_GROUP_CONFIG_SECRET_NAME=ibkr-account-groups +IBKR_FEATURE_SNAPSHOT_PATH=/var/data/cash_buffer_branch_feature_snapshot_latest.csv +IBKR_FEATURE_SNAPSHOT_MANIFEST_PATH=/var/data/cash_buffer_branch_feature_snapshot_latest.csv.manifest.json +IBKR_STRATEGY_CONFIG_PATH=/app/research/configs/growth_pullback_cash_buffer_branch_default.json +IBKR_RECONCILIATION_OUTPUT_PATH=/var/log/ibkr_cash_buffer_branch_reconciliation.json +IBKR_DRY_RUN_ONLY=true +GLOBAL_TELEGRAM_CHAT_ID= +NOTIFY_LANG=zh diff --git a/configs/paper_cash_buffer_branch.paper.env.example b/configs/paper_cash_buffer_branch.paper.env.example new file mode 100644 index 0000000..3cf14a0 --- /dev/null +++ b/configs/paper_cash_buffer_branch.paper.env.example @@ -0,0 +1,11 @@ +# IBKR paper runtime switch: cash_buffer_branch_default (paper ordering) +STRATEGY_PROFILE=cash_buffer_branch_default +ACCOUNT_GROUP=default +IB_ACCOUNT_GROUP_CONFIG_SECRET_NAME=ibkr-account-groups +IBKR_FEATURE_SNAPSHOT_PATH=/var/data/cash_buffer_branch_feature_snapshot_latest.csv +IBKR_FEATURE_SNAPSHOT_MANIFEST_PATH=/var/data/cash_buffer_branch_feature_snapshot_latest.csv.manifest.json +IBKR_STRATEGY_CONFIG_PATH=/app/research/configs/growth_pullback_cash_buffer_branch_default.json +IBKR_RECONCILIATION_OUTPUT_PATH=/var/log/ibkr_cash_buffer_branch_reconciliation.json +IBKR_DRY_RUN_ONLY=false +GLOBAL_TELEGRAM_CHAT_ID= +NOTIFY_LANG=zh diff --git a/configs/paper_global_etf_rotation.rollback.env.example b/configs/paper_global_etf_rotation.rollback.env.example new file mode 100644 index 0000000..7daa578 --- /dev/null +++ b/configs/paper_global_etf_rotation.rollback.env.example @@ -0,0 +1,12 @@ +# IBKR paper runtime rollback: global_etf_rotation +STRATEGY_PROFILE=global_etf_rotation +ACCOUNT_GROUP=default +IB_ACCOUNT_GROUP_CONFIG_SECRET_NAME=ibkr-account-groups +GLOBAL_TELEGRAM_CHAT_ID= +NOTIFY_LANG=zh +# remove if previously set for cash_buffer_branch_default: +# IBKR_FEATURE_SNAPSHOT_PATH +# IBKR_FEATURE_SNAPSHOT_MANIFEST_PATH +# IBKR_STRATEGY_CONFIG_PATH +# IBKR_RECONCILIATION_OUTPUT_PATH +# IBKR_DRY_RUN_ONLY diff --git a/entrypoints/cloud_run.py b/entrypoints/cloud_run.py index 103df03..fa9735e 100644 --- a/entrypoints/cloud_run.py +++ b/entrypoints/cloud_run.py @@ -3,14 +3,33 @@ from __future__ import annotations from datetime import datetime +from importlib import import_module -import pandas_market_calendars as mcal import pytz -def is_market_open_today(*, calendar_name="NYSE", timezone_name="America/New_York") -> bool: +def _load_market_calendar(calendar_name: str, *, logger) -> object | None: + try: + module = import_module("pandas_market_calendars") + return module.get_calendar(calendar_name) + except Exception as exc: + logger( + f"pandas_market_calendars unavailable for {calendar_name}: {exc}; " + "falling back to weekday-only market-open check" + ) + return None + + +def is_market_open_today( + *, + calendar_name="NYSE", + timezone_name="America/New_York", + logger=lambda _message: None, +) -> bool: tz_ny = pytz.timezone(timezone_name) now_ny = datetime.now(tz_ny) - calendar = mcal.get_calendar(calendar_name) + calendar = _load_market_calendar(calendar_name, logger=logger) + if calendar is None: + return now_ny.weekday() < 5 schedule = calendar.schedule(start_date=now_ny.date(), end_date=now_ny.date()) - return not schedule.empty + return len(getattr(schedule, "index", ())) > 0 diff --git a/main.py b/main.py index 2eb7fbb..9167057 100644 --- a/main.py +++ b/main.py @@ -1,10 +1,9 @@ -""" -IBKR Global ETF Rotation Strategy. -Quarterly momentum rotation across 22 global ETFs + daily canary emergency. -Runs on Cloud Run; connects to IB Gateway on GCE via ib_insync, alerts via Telegram. -""" +"""IBKR strategy runner for shared us_equity strategy profiles.""" import os import traceback +from datetime import datetime +from zoneinfo import ZoneInfo + import requests import pandas as pd from flask import Flask, request @@ -30,6 +29,7 @@ execute_rebalance as application_execute_rebalance, get_market_prices as application_get_market_prices, ) +from application.feature_snapshot_service import load_feature_snapshot_guarded from application.rebalance_service import run_strategy_core as run_rebalance_cycle from entrypoints.cloud_run import is_market_open_today from runtime_config_support import ( @@ -40,6 +40,7 @@ app = Flask(__name__) ensure_event_loop = ibkr_ensure_event_loop +NEW_YORK_TZ = ZoneInfo("America/New_York") # --------------------------------------------------------------------------- @@ -132,23 +133,122 @@ def get_ib_port(): ACCOUNT_IDS = RUNTIME_SETTINGS.account_ids STRATEGY_LOGIC = load_signal_logic_module(STRATEGY_PROFILE) -RANKING_POOL = STRATEGY_LOGIC.RANKING_POOL -CANARY_ASSETS = STRATEGY_LOGIC.CANARY_ASSETS -SAFE_HAVEN = STRATEGY_LOGIC.SAFE_HAVEN -TOP_N = STRATEGY_LOGIC.TOP_N -SMA_PERIOD = STRATEGY_LOGIC.SMA_PERIOD -HOLD_BONUS = STRATEGY_LOGIC.HOLD_BONUS -CANARY_BAD_THRESHOLD = STRATEGY_LOGIC.CANARY_BAD_THRESHOLD -REBALANCE_MONTHS = STRATEGY_LOGIC.REBALANCE_MONTHS -strategy_check_sma = STRATEGY_LOGIC.check_sma -strategy_compute_13612w_momentum = STRATEGY_LOGIC.compute_13612w_momentum +STRATEGY_SIGNAL_SOURCE = getattr(STRATEGY_LOGIC, "SIGNAL_SOURCE", "market_data") +STRATEGY_STATUS_ICON = getattr(STRATEGY_LOGIC, "STATUS_ICON", "🐤") +SAFE_HAVEN = getattr(STRATEGY_LOGIC, "SAFE_HAVEN", "BIL") +RANKING_POOL = list(getattr(STRATEGY_LOGIC, "RANKING_POOL", ())) +CANARY_ASSETS = list(getattr(STRATEGY_LOGIC, "CANARY_ASSETS", ())) +TOP_N = getattr(STRATEGY_LOGIC, "TOP_N", None) +SMA_PERIOD = getattr(STRATEGY_LOGIC, "SMA_PERIOD", 200) +CANARY_BAD_THRESHOLD = getattr(STRATEGY_LOGIC, "CANARY_BAD_THRESHOLD", None) +REBALANCE_MONTHS = getattr(STRATEGY_LOGIC, "REBALANCE_MONTHS", None) +FEATURE_SNAPSHOT_PATH = RUNTIME_SETTINGS.feature_snapshot_path +FEATURE_SNAPSHOT_MANIFEST_PATH = RUNTIME_SETTINGS.feature_snapshot_manifest_path +FEATURE_REQUIRE_SNAPSHOT_MANIFEST = bool(getattr(STRATEGY_LOGIC, "REQUIRE_SNAPSHOT_MANIFEST", False)) +FEATURE_SNAPSHOT_CONTRACT_VERSION = getattr(STRATEGY_LOGIC, "SNAPSHOT_CONTRACT_VERSION", None) +feature_runtime_loader = getattr(STRATEGY_LOGIC, "load_runtime_parameters", None) +FEATURE_RUNTIME_PARAMETERS = ( + feature_runtime_loader( + config_path=RUNTIME_SETTINGS.strategy_config_path, + logger=lambda message: print(message, flush=True), + ) + if STRATEGY_SIGNAL_SOURCE == "feature_snapshot" and callable(feature_runtime_loader) + else {} +) +HOLD_BONUS = FEATURE_RUNTIME_PARAMETERS.get( + "hold_bonus", + getattr(STRATEGY_LOGIC, "HOLD_BONUS", getattr(STRATEGY_LOGIC, "DEFAULT_HOLD_BONUS", 0.0)), +) +FEATURE_SIGNAL_KWARG_KEYS = tuple( + getattr( + STRATEGY_LOGIC, + "FEATURE_SIGNAL_KWARG_KEYS", + ( + "benchmark_symbol", + "safe_haven", + "holdings_count", + "single_name_cap", + "sector_cap", + "hold_bonus", + "soft_defense_exposure", + "hard_defense_exposure", + "soft_breadth_threshold", + "hard_breadth_threshold", + ), + ) +) +FEATURE_BENCHMARK_SYMBOL = FEATURE_RUNTIME_PARAMETERS.get( + "benchmark_symbol", + getattr(STRATEGY_LOGIC, "BENCHMARK_SYMBOL", "SPY"), +) +FEATURE_HOLDINGS_COUNT = FEATURE_RUNTIME_PARAMETERS.get( + "holdings_count", + getattr(STRATEGY_LOGIC, "DEFAULT_HOLDINGS_COUNT", 24), +) +FEATURE_SINGLE_NAME_CAP = FEATURE_RUNTIME_PARAMETERS.get( + "single_name_cap", + getattr(STRATEGY_LOGIC, "DEFAULT_SINGLE_NAME_CAP", 0.06), +) +FEATURE_SECTOR_CAP = FEATURE_RUNTIME_PARAMETERS.get( + "sector_cap", + getattr(STRATEGY_LOGIC, "DEFAULT_SECTOR_CAP", 0.20), +) +FEATURE_RISK_ON_EXPOSURE = FEATURE_RUNTIME_PARAMETERS.get( + "risk_on_exposure", + 1.0, +) +FEATURE_SOFT_DEFENSE_EXPOSURE = FEATURE_RUNTIME_PARAMETERS.get( + "soft_defense_exposure", + getattr(STRATEGY_LOGIC, "DEFAULT_SOFT_DEFENSE_EXPOSURE", 0.50), +) +FEATURE_HARD_DEFENSE_EXPOSURE = FEATURE_RUNTIME_PARAMETERS.get( + "hard_defense_exposure", + getattr(STRATEGY_LOGIC, "DEFAULT_HARD_DEFENSE_EXPOSURE", 0.10), +) +FEATURE_SOFT_BREADTH_THRESHOLD = FEATURE_RUNTIME_PARAMETERS.get( + "soft_breadth_threshold", + getattr(STRATEGY_LOGIC, "DEFAULT_SOFT_BREADTH_THRESHOLD", 0.55), +) +FEATURE_HARD_BREADTH_THRESHOLD = FEATURE_RUNTIME_PARAMETERS.get( + "hard_breadth_threshold", + getattr(STRATEGY_LOGIC, "DEFAULT_HARD_BREADTH_THRESHOLD", 0.35), +) +FEATURE_RUNTIME_EXECUTION_WINDOW_TRADING_DAYS = FEATURE_RUNTIME_PARAMETERS.get( + "runtime_execution_window_trading_days", +) +FEATURE_MIN_ADV20_USD = FEATURE_RUNTIME_PARAMETERS.get("min_adv20_usd") +FEATURE_SECTOR_WHITELIST = FEATURE_RUNTIME_PARAMETERS.get("sector_whitelist") +FEATURE_NORMALIZATION = FEATURE_RUNTIME_PARAMETERS.get("normalization") +FEATURE_SCORE_TEMPLATE = FEATURE_RUNTIME_PARAMETERS.get("score_template") +FEATURE_RESIDUAL_PROXY = FEATURE_RUNTIME_PARAMETERS.get("residual_proxy") +FEATURE_RUNTIME_CONFIG_NAME = FEATURE_RUNTIME_PARAMETERS.get( + "runtime_config_name", + RUNTIME_SETTINGS.strategy_profile, +) +FEATURE_RUNTIME_CONFIG_PATH = FEATURE_RUNTIME_PARAMETERS.get( + "runtime_config_path", + RUNTIME_SETTINGS.strategy_config_path, +) +FEATURE_RUNTIME_CONFIG_SOURCE = FEATURE_RUNTIME_PARAMETERS.get( + "runtime_config_source", + RUNTIME_SETTINGS.strategy_config_source, +) +RECONCILIATION_OUTPUT_PATH = RUNTIME_SETTINGS.reconciliation_output_path +strategy_check_sma = getattr(STRATEGY_LOGIC, "check_sma", None) +strategy_compute_13612w_momentum = getattr(STRATEGY_LOGIC, "compute_13612w_momentum", None) strategy_compute_signals = STRATEGY_LOGIC.compute_signals TG_TOKEN = RUNTIME_SETTINGS.tg_token TG_CHAT_ID = RUNTIME_SETTINGS.tg_chat_id NOTIFY_LANG = RUNTIME_SETTINGS.notify_lang -CASH_RESERVE_RATIO = 0.03 +DEFAULT_CASH_RESERVE_RATIO = 0.03 +CASH_RESERVE_RATIO = float( + FEATURE_RUNTIME_PARAMETERS.get( + "execution_cash_reserve_ratio", + DEFAULT_CASH_RESERVE_RATIO, + ) +) REBALANCE_THRESHOLD_RATIO = 0.02 # 2% of equity to trigger trades LIMIT_BUY_PREMIUM = 1.005 @@ -177,6 +277,13 @@ def connect_ib(): return ibkr_connect_ib(IB_HOST, IB_PORT, IB_CLIENT_ID) +def resolve_run_as_of_date() -> pd.Timestamp: + explicit = os.getenv("IBKR_RUN_AS_OF_DATE") + if explicit: + return pd.Timestamp(explicit).normalize() + return pd.Timestamp(datetime.now(NEW_YORK_TZ).date()) + + def get_historical_close(ib, symbol, duration="2 Y", bar_size="1 day"): """Fetch daily close prices from IBKR via QuantPlatformKit.""" series = fetch_historical_price_series( @@ -197,14 +304,187 @@ def get_historical_close(ib, symbol, duration="2 Y", bar_size="1 day"): # Strategy logic # --------------------------------------------------------------------------- def compute_13612w_momentum(closes, as_of_date=None): + if strategy_compute_13612w_momentum is None: + raise NotImplementedError(f"{STRATEGY_PROFILE} does not expose 13612W momentum") return strategy_compute_13612w_momentum(closes, as_of_date=as_of_date) def check_sma(closes, period=SMA_PERIOD): + if strategy_check_sma is None: + raise NotImplementedError(f"{STRATEGY_PROFILE} does not expose SMA filtering") return strategy_check_sma(closes, period=period) def compute_signals(ib, current_holdings): + if STRATEGY_SIGNAL_SOURCE == "feature_snapshot": + run_as_of = resolve_run_as_of_date() + if not FEATURE_SNAPSHOT_PATH: + return ( + None, + "feature snapshot required", + False, + "fail_closed | reason=feature_snapshot_path_missing", + { + "strategy_profile": STRATEGY_PROFILE, + "feature_snapshot_path": None, + "strategy_config_path": FEATURE_RUNTIME_CONFIG_PATH, + "strategy_config_source": FEATURE_RUNTIME_CONFIG_SOURCE, + "dry_run_only": RUNTIME_SETTINGS.dry_run_only, + "snapshot_guard_decision": "fail_closed", + "fail_reason": "feature_snapshot_path_missing", + "managed_symbols": (), + "status_icon": "🛑", + }, + ) + guard_result = load_feature_snapshot_guarded( + FEATURE_SNAPSHOT_PATH, + run_as_of=run_as_of, + required_columns=getattr(STRATEGY_LOGIC, "REQUIRED_FEATURE_COLUMNS", ()), + snapshot_date_columns=getattr( + STRATEGY_LOGIC, + "SNAPSHOT_DATE_COLUMNS", + ("as_of", "snapshot_date"), + ), + max_snapshot_month_lag=int( + getattr(STRATEGY_LOGIC, "MAX_SNAPSHOT_MONTH_LAG", 1) + ), + manifest_path=FEATURE_SNAPSHOT_MANIFEST_PATH, + require_manifest=FEATURE_REQUIRE_SNAPSHOT_MANIFEST, + expected_strategy_profile=STRATEGY_PROFILE, + expected_config_name=FEATURE_RUNTIME_CONFIG_NAME, + expected_config_path=FEATURE_RUNTIME_CONFIG_PATH, + expected_contract_version=FEATURE_SNAPSHOT_CONTRACT_VERSION, + ) + guard_metadata = dict(guard_result.metadata) + print( + "snapshot_manifest_summary | " + f"profile={STRATEGY_PROFILE} decision={guard_metadata.get('snapshot_guard_decision')} " + f"snapshot_path={guard_metadata.get('snapshot_path')} " + f"snapshot_as_of={guard_metadata.get('snapshot_as_of')} " + f"snapshot_age_days={guard_metadata.get('snapshot_age_days')} " + f"snapshot_file_ts={guard_metadata.get('snapshot_file_timestamp')} " + f"manifest_path={guard_metadata.get('snapshot_manifest_path')} " + f"manifest_exists={guard_metadata.get('snapshot_manifest_exists')} " + f"manifest_contract={guard_metadata.get('snapshot_manifest_contract_version')} " + f"expected_config={FEATURE_RUNTIME_CONFIG_PATH} " + f"expected_profile={STRATEGY_PROFILE}", + flush=True, + ) + if guard_result.metadata.get("snapshot_guard_decision") != "proceed": + decision = guard_metadata.get("snapshot_guard_decision") + reason = guard_metadata.get("fail_reason") or guard_metadata.get("no_op_reason") + return ( + None, + "feature snapshot guard blocked execution", + False, + f"{decision} | reason={reason}", + { + "strategy_profile": STRATEGY_PROFILE, + "strategy_config_path": FEATURE_RUNTIME_CONFIG_PATH, + "strategy_config_source": FEATURE_RUNTIME_CONFIG_SOURCE, + "dry_run_only": RUNTIME_SETTINGS.dry_run_only, + "managed_symbols": (), + "status_icon": "🛑", + **guard_metadata, + }, + ) + feature_snapshot = guard_result.frame + feature_kwargs = { + "benchmark_symbol": FEATURE_BENCHMARK_SYMBOL, + "safe_haven": SAFE_HAVEN, + "holdings_count": FEATURE_HOLDINGS_COUNT, + "single_name_cap": FEATURE_SINGLE_NAME_CAP, + "sector_cap": FEATURE_SECTOR_CAP, + "hold_bonus": HOLD_BONUS, + "risk_on_exposure": FEATURE_RISK_ON_EXPOSURE, + "soft_defense_exposure": FEATURE_SOFT_DEFENSE_EXPOSURE, + "hard_defense_exposure": FEATURE_HARD_DEFENSE_EXPOSURE, + "soft_breadth_threshold": FEATURE_SOFT_BREADTH_THRESHOLD, + "hard_breadth_threshold": FEATURE_HARD_BREADTH_THRESHOLD, + "min_adv20_usd": FEATURE_MIN_ADV20_USD, + "sector_whitelist": FEATURE_SECTOR_WHITELIST, + "normalization": FEATURE_NORMALIZATION, + "score_template": FEATURE_SCORE_TEMPLATE, + "run_as_of": run_as_of, + "runtime_execution_window_trading_days": FEATURE_RUNTIME_EXECUTION_WINDOW_TRADING_DAYS, + "runtime_config_name": FEATURE_RUNTIME_CONFIG_NAME, + "runtime_config_path": FEATURE_RUNTIME_CONFIG_PATH, + "runtime_config_source": FEATURE_RUNTIME_CONFIG_SOURCE, + "residual_proxy": FEATURE_RESIDUAL_PROXY, + } + feature_kwargs = { + key: value + for key, value in feature_kwargs.items() + if key in FEATURE_SIGNAL_KWARG_KEYS and value is not None + } + try: + result = strategy_compute_signals( + feature_snapshot, + current_holdings, + **feature_kwargs, + ) + except Exception as exc: + return ( + None, + "feature snapshot compute failed", + False, + f"fail_closed | reason=feature_snapshot_compute_failed:{type(exc).__name__}:{exc}", + { + "strategy_profile": STRATEGY_PROFILE, + "strategy_config_path": FEATURE_RUNTIME_CONFIG_PATH, + "strategy_config_source": FEATURE_RUNTIME_CONFIG_SOURCE, + "dry_run_only": RUNTIME_SETTINGS.dry_run_only, + "managed_symbols": (), + "status_icon": "🛑", + **guard_metadata, + "snapshot_guard_decision": "fail_closed", + "fail_reason": f"feature_snapshot_compute_failed:{type(exc).__name__}:{exc}", + }, + ) + if len(result) == 5: + target_weights, signal_desc, is_emergency, status_desc, metadata = result + return ( + target_weights, + signal_desc, + is_emergency, + status_desc, + { + "strategy_profile": STRATEGY_PROFILE, + "feature_snapshot_path": FEATURE_SNAPSHOT_PATH, + "strategy_config_path": FEATURE_RUNTIME_CONFIG_PATH, + "strategy_config_source": FEATURE_RUNTIME_CONFIG_SOURCE, + "safe_haven_symbol": SAFE_HAVEN, + "dry_run_only": RUNTIME_SETTINGS.dry_run_only, + "trade_date": run_as_of.date().isoformat(), + **guard_metadata, + **metadata, + }, + ) + target_weights, signal_desc, is_emergency, status_desc = result + return ( + target_weights, + signal_desc, + is_emergency, + status_desc, + { + "strategy_profile": STRATEGY_PROFILE, + "feature_snapshot_path": FEATURE_SNAPSHOT_PATH, + "strategy_config_path": FEATURE_RUNTIME_CONFIG_PATH, + "strategy_config_source": FEATURE_RUNTIME_CONFIG_SOURCE, + "safe_haven_symbol": SAFE_HAVEN, + "dry_run_only": RUNTIME_SETTINGS.dry_run_only, + "trade_date": run_as_of.date().isoformat(), + **guard_metadata, + "managed_symbols": tuple( + getattr( + STRATEGY_LOGIC, + "extract_managed_symbols", + )(feature_snapshot, benchmark_symbol=FEATURE_BENCHMARK_SYMBOL, safe_haven=SAFE_HAVEN) + ), + "status_icon": STRATEGY_STATUS_ICON, + }, + ) + return strategy_compute_signals( ib, current_holdings, @@ -219,6 +499,14 @@ def compute_signals(ib, current_holdings): translator=t, pacing_sec=HIST_DATA_PACING_SEC, sma_period=SMA_PERIOD, + ) + ( + { + "strategy_profile": STRATEGY_PROFILE, + "managed_symbols": tuple(RANKING_POOL + [SAFE_HAVEN]), + "status_icon": STRATEGY_STATUS_ICON, + "safe_haven_symbol": SAFE_HAVEN, + "dry_run_only": RUNTIME_SETTINGS.dry_run_only, + }, ) @@ -255,7 +543,15 @@ def check_order_submitted(report): return application_check_order_submitted(report, translator=t) -def execute_rebalance(ib, target_weights, positions, account_values): +def execute_rebalance( + ib, + target_weights, + positions, + account_values, + *, + strategy_symbols=None, + signal_metadata=None, +): return application_execute_rebalance( ib, target_weights, @@ -265,12 +561,18 @@ def execute_rebalance(ib, target_weights, positions, account_values): submit_order_intent=submit_order_intent, order_intent_cls=OrderIntent, translator=t, - ranking_pool=RANKING_POOL, - safe_haven=SAFE_HAVEN, + strategy_symbols=strategy_symbols, + signal_metadata=signal_metadata or {}, + strategy_profile=STRATEGY_PROFILE, + account_group=ACCOUNT_GROUP, + service_name=SERVICE_NAME, + account_ids=ACCOUNT_IDS, + dry_run_only=RUNTIME_SETTINGS.dry_run_only, cash_reserve_ratio=CASH_RESERVE_RATIO, rebalance_threshold_ratio=REBALANCE_THRESHOLD_RATIO, limit_buy_premium=LIMIT_BUY_PREMIUM, sell_settle_delay_sec=SELL_SETTLE_DELAY_SEC, + return_summary=True, ) @@ -286,6 +588,7 @@ def run_strategy_core(): send_tg_message=send_tg_message, translator=t, separator=SEPARATOR, + reconciliation_output_path=RECONCILIATION_OUTPUT_PATH, ) diff --git a/requirements.txt b/requirements.txt index d598c91..f190f4d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,7 @@ flask gunicorn quant-platform-kit @ git+https://github.com/QuantStrategyLab/QuantPlatformKit.git@v0.6.0 -us-equity-strategies @ git+https://github.com/QuantStrategyLab/UsEquityStrategies.git@v0.6.0 +us-equity-strategies @ git+https://github.com/QuantStrategyLab/UsEquityStrategies.git@5c40d49927b7a86e27feee23b82da81338320404 pandas numpy requests diff --git a/research/configs/growth_pullback_cash_buffer_branch_default.json b/research/configs/growth_pullback_cash_buffer_branch_default.json new file mode 100644 index 0000000..324cb22 --- /dev/null +++ b/research/configs/growth_pullback_cash_buffer_branch_default.json @@ -0,0 +1,41 @@ +{ + "role": "cash_buffer_branch_default", + "status": "research_only", + "strategy": "growth_pullback_systematic_v1", + "branch_name": "cash_buffer_branch", + "name": "cash_buffer_branch_default", + "previous_candidate_name": "cash_buffer_a__hb10__base__adv50", + "family": "tech_heavy_pullback", + "universe": "tech_heavy", + "normalization": "universe_cross_sectional", + "min_adv20_usd": 50000000.0, + "sector_whitelist": [ + "Information Technology", + "Communication" + ], + "symbol_whitelist": [], + "notes": "Sector proxy only: Information Technology + Communication; no sub-industry data available", + "score_template": "balanced_pullback", + "holdings_count": 8, + "single_name_cap": 0.1, + "sector_cap": 0.4, + "hold_bonus": 0.1, + "regime": "qqq_breadth", + "benchmark_symbol": "QQQ", + "breadth_mode": "broad", + "breadth_symbols": [], + "breadth_thresholds": { + "soft": 0.55, + "hard": 0.35 + }, + "exposures": { + "risk_on": 0.8, + "soft_defense": 0.6, + "hard_defense": 0.0 + }, + "execution_cash_reserve_ratio": 0.0, + "residual_proxy": "simple_excess_return_vs_QQQ", + "cost_assumption_bps_one_way": 5.0, + "branch_role": "cash-buffered parallel branch", + "canonicalized_from": "growth_pullback_systematic_v1.4 cash_buffer_refinement" +} diff --git a/runtime_config_support.py b/runtime_config_support.py index 435c2cd..e6aad02 100644 --- a/runtime_config_support.py +++ b/runtime_config_support.py @@ -3,6 +3,7 @@ import json import os from dataclasses import dataclass +from pathlib import Path from typing import Any, Callable from strategy_registry import ( @@ -36,6 +37,12 @@ class PlatformRuntimeSettings: ib_client_id: int strategy_profile: str strategy_domain: str + feature_snapshot_path: str | None + feature_snapshot_manifest_path: str | None + strategy_config_path: str | None + strategy_config_source: str | None + reconciliation_output_path: str | None + dry_run_only: bool account_group: str service_name: str | None account_ids: tuple[str, ...] @@ -55,6 +62,13 @@ def load_platform_runtime_settings( os.getenv("STRATEGY_PROFILE"), platform_id=IBKR_PLATFORM, ) + strategy_config_path, strategy_config_source = resolve_strategy_config_path( + strategy_definition.profile, + explicit_path=first_non_empty( + os.getenv("IBKR_STRATEGY_CONFIG_PATH"), + os.getenv("STRATEGY_CONFIG_PATH"), + ), + ) account_group = resolve_account_group(os.getenv("ACCOUNT_GROUP")) group_config = load_account_group_config( project_id=project_id, @@ -96,6 +110,21 @@ def load_platform_runtime_settings( ), strategy_profile=strategy_definition.profile, strategy_domain=strategy_definition.domain, + feature_snapshot_path=first_non_empty( + os.getenv("IBKR_FEATURE_SNAPSHOT_PATH"), + os.getenv("FEATURE_SNAPSHOT_PATH"), + ), + feature_snapshot_manifest_path=first_non_empty( + os.getenv("IBKR_FEATURE_SNAPSHOT_MANIFEST_PATH"), + os.getenv("FEATURE_SNAPSHOT_MANIFEST_PATH"), + ), + strategy_config_path=strategy_config_path, + strategy_config_source=strategy_config_source, + reconciliation_output_path=first_non_empty( + os.getenv("IBKR_RECONCILIATION_OUTPUT_PATH"), + os.getenv("RECONCILIATION_OUTPUT_PATH"), + ), + dry_run_only=resolve_bool_env(os.getenv("IBKR_DRY_RUN_ONLY")), account_group=account_group, service_name=group_config.service_name, account_ids=group_config.account_ids, @@ -119,6 +148,27 @@ def resolve_account_group(raw_value: str | None) -> str: return value +def resolve_strategy_config_path( + strategy_profile: str, + *, + explicit_path: str | None, +) -> tuple[str | None, str | None]: + path = first_non_empty(explicit_path) + if path is not None: + return path, "env" + + if str(strategy_profile).strip().lower() == "cash_buffer_branch_default": + bundled = ( + Path(__file__).resolve().parent + / "research" + / "configs" + / "growth_pullback_cash_buffer_branch_default.json" + ) + if bundled.exists(): + return str(bundled), "bundled_canonical_default" + return None, None + + def load_account_group_config( *, project_id: str | None, @@ -232,6 +282,11 @@ def first_non_empty(*values: str | None) -> str | None: return None +def resolve_bool_env(raw_value: str | None) -> bool: + value = str(raw_value or "").strip().lower() + return value in {"1", "true", "yes", "y", "on"} + + def require_group_string( raw_value: str | None, *, diff --git a/tests/conftest.py b/tests/conftest.py index a773e2a..4acf171 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -21,6 +21,8 @@ def load_strategy_module(**env_overrides): "STRATEGY_PROFILE": "global_etf_rotation", "ACCOUNT_GROUP": "default", "IB_CLIENT_ID": "1", + "IBKR_FEATURE_SNAPSHOT_MANIFEST_PATH": None, + "IBKR_RECONCILIATION_OUTPUT_PATH": None, "IB_ACCOUNT_GROUP_CONFIG_JSON": ( '{"groups":{"default":{"ib_gateway_instance_name":"127.0.0.1",' '"ib_gateway_mode":"live","ib_client_id":1}}}' diff --git a/tests/test_cloud_run_entrypoint.py b/tests/test_cloud_run_entrypoint.py new file mode 100644 index 0000000..d173f37 --- /dev/null +++ b/tests/test_cloud_run_entrypoint.py @@ -0,0 +1,28 @@ +from __future__ import annotations + +from datetime import datetime + +import pytz + +from entrypoints.cloud_run import is_market_open_today + + +def test_is_market_open_today_falls_back_to_weekday_when_calendar_import_fails(monkeypatch): + monkeypatch.setattr( + "entrypoints.cloud_run.import_module", + lambda _name: (_ for _ in ()).throw(TypeError("broken calendar")), + ) + monkeypatch.setattr( + "entrypoints.cloud_run.datetime", + type( + "FakeDatetime", + (), + { + "now": staticmethod( + lambda _tz: datetime(2026, 4, 6, 12, 0, 0, tzinfo=pytz.timezone("America/New_York")) + ) + }, + ), + ) + + assert is_market_open_today() is True diff --git a/tests/test_execution_service.py b/tests/test_execution_service.py index a92d231..fb44180 100644 --- a/tests/test_execution_service.py +++ b/tests/test_execution_service.py @@ -22,8 +22,11 @@ def test_check_order_submitted_accepts_submitted_like_status(): assert "submitted 123" in message -def test_execute_rebalance_submits_limit_buy_for_underweight_position(monkeypatch): +def test_execute_rebalance_submits_limit_buy_for_underweight_position(monkeypatch, tmp_path): class FakeIB: + def openTrades(self): + return [] + def accountValues(self): return [SimpleNamespace(tag="AvailableFunds", currency="USD", value="5000")] @@ -47,16 +50,230 @@ def fake_fetch_quote_snapshots(_ib, symbols): submit_order_intent=fake_submit_order_intent, order_intent_cls=OrderIntent, translator=translate, - ranking_pool=["VOO"], - safe_haven="BIL", + strategy_symbols=["VOO", "BIL"], + strategy_profile="cash_buffer_branch_default", + signal_metadata={ + "regime": "risk_on", + "breadth_ratio": 0.6, + "target_stock_weight": 0.8, + "realized_stock_weight": 0.8, + "trade_date": "2026-04-01", + "snapshot_as_of": "2026-03-31", + }, + dry_run_only=False, cash_reserve_ratio=0.03, rebalance_threshold_ratio=0.02, limit_buy_premium=1.005, sell_settle_delay_sec=0, + execution_lock_dir=tmp_path, ) assert len(submitted) == 1 assert submitted[0].side == "buy" assert submitted[0].symbol == "VOO" assert submitted[0].order_type == "limit" - assert trade_logs and trade_logs[0].startswith("buy VOO") + assert any(log.startswith("buy VOO") for log in trade_logs) + + +def test_execute_rebalance_skips_when_pending_orders_exist(): + class FakeIB: + def openTrades(self): + return [ + SimpleNamespace( + contract=SimpleNamespace(symbol="VOO"), + orderStatus=SimpleNamespace(status="Submitted"), + ) + ] + + def accountValues(self): + return [SimpleNamespace(tag="AvailableFunds", currency="USD", value="5000")] + + trade_logs = execute_rebalance( + FakeIB(), + {"VOO": 1.0}, + {}, + {"equity": 1000.0, "buying_power": 1000.0}, + fetch_quote_snapshots=lambda *_args, **_kwargs: {"VOO": SimpleNamespace(last_price=100.0)}, + submit_order_intent=lambda *_args, **_kwargs: None, + order_intent_cls=OrderIntent, + translator=translate, + strategy_symbols=["VOO"], + strategy_profile="cash_buffer_branch_default", + signal_metadata={}, + dry_run_only=False, + cash_reserve_ratio=0.03, + rebalance_threshold_ratio=0.02, + limit_buy_premium=1.005, + sell_settle_delay_sec=0, + ) + + assert trade_logs == ["pending_orders_detected profile=cash_buffer_branch_default symbols=VOO"] + + +def test_execute_rebalance_blocks_same_day_repeat_via_execution_lock(tmp_path, monkeypatch): + class FakeIB: + def openTrades(self): + return [] + + def fills(self): + return [] + + def accountValues(self): + return [SimpleNamespace(tag="AvailableFunds", currency="USD", value="5000")] + + def fake_fetch_quote_snapshots(_ib, symbols): + return {symbol: SimpleNamespace(last_price=100.0) for symbol in symbols} + + monkeypatch.setattr("application.execution_service.time.sleep", lambda _seconds: None) + + kwargs = dict( + fetch_quote_snapshots=fake_fetch_quote_snapshots, + submit_order_intent=lambda *_args, **_kwargs: SimpleNamespace(broker_order_id="1", status="Submitted"), + order_intent_cls=OrderIntent, + translator=translate, + strategy_symbols=["VOO", "BOXX"], + strategy_profile="cash_buffer_branch_default", + account_group="default", + service_name="ibkr-paper", + account_ids=("DU123",), + signal_metadata={ + "regime": "risk_on", + "breadth_ratio": 0.6, + "target_stock_weight": 0.8, + "realized_stock_weight": 0.8, + "trade_date": "2026-04-01", + "snapshot_as_of": "2026-03-31", + }, + cash_reserve_ratio=0.0, + rebalance_threshold_ratio=0.02, + limit_buy_premium=1.005, + sell_settle_delay_sec=0, + execution_lock_dir=tmp_path, + ) + + first_logs = execute_rebalance( + FakeIB(), + {"VOO": 0.8, "BOXX": 0.2}, + {}, + {"equity": 1000.0, "buying_power": 1000.0}, + dry_run_only=True, + **kwargs, + ) + second_logs = execute_rebalance( + FakeIB(), + {"VOO": 0.8, "BOXX": 0.2}, + {}, + {"equity": 1000.0, "buying_power": 1000.0}, + dry_run_only=True, + **kwargs, + ) + paper_logs = execute_rebalance( + FakeIB(), + {"VOO": 0.8, "BOXX": 0.2}, + {}, + {"equity": 1000.0, "buying_power": 1000.0}, + dry_run_only=False, + **kwargs, + ) + + assert any("execution_lock_acquired" in log for log in first_logs) + assert any("same_day_execution_locked" in log for log in second_logs) + assert any("execution_lock_acquired" in log for log in paper_logs) + + +def test_execute_rebalance_skips_when_same_day_fills_detected(): + class FakeIB: + def openTrades(self): + return [] + + def fills(self): + return [ + SimpleNamespace( + contract=SimpleNamespace(symbol="VOO"), + execution=SimpleNamespace(time="2026-04-01 10:30:00"), + ) + ] + + def accountValues(self): + return [SimpleNamespace(tag="AvailableFunds", currency="USD", value="5000")] + + trade_logs = execute_rebalance( + FakeIB(), + {"VOO": 1.0}, + {}, + {"equity": 1000.0, "buying_power": 1000.0}, + fetch_quote_snapshots=lambda *_args, **_kwargs: {"VOO": SimpleNamespace(last_price=100.0)}, + submit_order_intent=lambda *_args, **_kwargs: None, + order_intent_cls=OrderIntent, + translator=translate, + strategy_symbols=["VOO"], + strategy_profile="cash_buffer_branch_default", + account_group="default", + service_name="ibkr-paper", + account_ids=("DU123",), + signal_metadata={"trade_date": "2026-04-01"}, + dry_run_only=False, + cash_reserve_ratio=0.0, + rebalance_threshold_ratio=0.02, + limit_buy_premium=1.005, + sell_settle_delay_sec=0, + ) + + assert any("same_day_fills_detected" in log for log in trade_logs) + + +def test_execute_rebalance_returns_structured_summary_when_requested(monkeypatch, tmp_path): + class FakeIB: + def openTrades(self): + return [] + + def fills(self): + return [] + + def accountValues(self): + return [SimpleNamespace(tag="AvailableFunds", currency="USD", value="5000")] + + def fake_fetch_quote_snapshots(_ib, symbols): + return {symbol: SimpleNamespace(last_price=100.0) for symbol in symbols} + + monkeypatch.setattr("application.execution_service.time.sleep", lambda _seconds: None) + + trade_logs, summary = execute_rebalance( + FakeIB(), + {"VOO": 0.8, "BOXX": 0.2}, + {}, + {"equity": 1000.0, "buying_power": 1000.0}, + fetch_quote_snapshots=fake_fetch_quote_snapshots, + submit_order_intent=lambda *_args, **_kwargs: SimpleNamespace(broker_order_id="1", status="Submitted"), + order_intent_cls=OrderIntent, + translator=translate, + strategy_symbols=["VOO", "BOXX"], + strategy_profile="cash_buffer_branch_default", + account_group="default", + service_name="ibkr-paper", + account_ids=("DU123",), + signal_metadata={ + "regime": "risk_on", + "breadth_ratio": 0.6, + "target_stock_weight": 0.8, + "realized_stock_weight": 0.8, + "safe_haven_weight": 0.2, + "safe_haven_symbol": "BOXX", + "trade_date": "2026-04-01", + "snapshot_as_of": "2026-03-31", + }, + dry_run_only=True, + cash_reserve_ratio=0.0, + rebalance_threshold_ratio=0.02, + limit_buy_premium=1.005, + sell_settle_delay_sec=0, + execution_lock_dir=tmp_path, + return_summary=True, + ) + + assert any("execution_lock_acquired" in log for log in trade_logs) + assert summary["execution_status"] == "executed" + assert summary["mode"] == "dry_run" + assert summary["safe_haven_symbol"] == "BOXX" + assert summary["orders_submitted"] + assert summary["target_vs_current"] diff --git a/tests/test_feature_snapshot_service.py b/tests/test_feature_snapshot_service.py new file mode 100644 index 0000000..ba99a33 --- /dev/null +++ b/tests/test_feature_snapshot_service.py @@ -0,0 +1,150 @@ +from __future__ import annotations + +import hashlib +import json +import unittest +from pathlib import Path +from tempfile import TemporaryDirectory + + +def _sha256_file(path: Path) -> str: + hasher = hashlib.sha256() + with path.open("rb") as fh: + while True: + chunk = fh.read(1024 * 1024) + if not chunk: + break + hasher.update(chunk) + return hasher.hexdigest() + + +def _skip_if_missing_pandas() -> None: + try: + import pandas # noqa: F401 + except ModuleNotFoundError as exc: + raise unittest.SkipTest(f"pandas unavailable: {exc}") from exc + + +class FeatureSnapshotServiceTest(unittest.TestCase): + def test_load_feature_snapshot_reads_csv(self): + _skip_if_missing_pandas() + from application.feature_snapshot_service import load_feature_snapshot + + with TemporaryDirectory() as tmp_dir: + path = Path(tmp_dir) / "snapshot.csv" + path.write_text("symbol,sector,mom_6_1\nAAA,tech,0.1\n", encoding="utf-8") + + frame = load_feature_snapshot(str(path)) + + self.assertEqual(frame.to_dict(orient="records"), [{"symbol": "AAA", "sector": "tech", "mom_6_1": 0.1}]) + + def test_load_feature_snapshot_rejects_unknown_format(self): + _skip_if_missing_pandas() + from application.feature_snapshot_service import load_feature_snapshot + + with TemporaryDirectory() as tmp_dir: + path = Path(tmp_dir) / "snapshot.txt" + path.write_text("hello", encoding="utf-8") + + with self.assertRaisesRegex(ValueError, "Unsupported feature snapshot format"): + load_feature_snapshot(str(path)) + + def test_load_feature_snapshot_guarded_fails_closed_when_missing(self): + _skip_if_missing_pandas() + from application.feature_snapshot_service import load_feature_snapshot_guarded + + result = load_feature_snapshot_guarded( + "/tmp/definitely-missing-snapshot.csv", + run_as_of="2026-04-05", + required_columns=("symbol", "as_of"), + ) + + self.assertIsNone(result.frame) + self.assertEqual(result.metadata["snapshot_guard_decision"], "fail_closed") + self.assertIn("feature_snapshot_missing", result.metadata["fail_reason"]) + + def test_load_feature_snapshot_guarded_fails_closed_when_stale(self): + _skip_if_missing_pandas() + from application.feature_snapshot_service import load_feature_snapshot_guarded + + with TemporaryDirectory() as tmp_dir: + path = Path(tmp_dir) / "snapshot.csv" + path.write_text("as_of,symbol,sector,mom_6_1\n2026-01-31,AAA,tech,0.1\n", encoding="utf-8") + + result = load_feature_snapshot_guarded( + str(path), + run_as_of="2026-04-05", + required_columns=("as_of", "symbol", "sector", "mom_6_1"), + ) + + self.assertIsNone(result.frame) + self.assertEqual(result.metadata["snapshot_guard_decision"], "fail_closed") + self.assertIn("feature_snapshot_stale", result.metadata["fail_reason"]) + + def test_load_feature_snapshot_guarded_requires_manifest_when_requested(self): + _skip_if_missing_pandas() + from application.feature_snapshot_service import load_feature_snapshot_guarded + + with TemporaryDirectory() as tmp_dir: + path = Path(tmp_dir) / "snapshot.csv" + path.write_text("as_of,symbol,sector,mom_6_1\n2026-03-31,AAA,tech,0.1\n", encoding="utf-8") + + result = load_feature_snapshot_guarded( + str(path), + run_as_of="2026-04-01", + required_columns=("as_of", "symbol", "sector", "mom_6_1"), + require_manifest=True, + expected_strategy_profile="cash_buffer_branch_default", + expected_config_name="cash_buffer_branch_default", + expected_contract_version="cash_buffer_branch_default.feature_snapshot.v1", + ) + + self.assertIsNone(result.frame) + self.assertEqual(result.metadata["snapshot_guard_decision"], "fail_closed") + self.assertIn("feature_snapshot_manifest_missing", result.metadata["fail_reason"]) + + def test_load_feature_snapshot_guarded_validates_manifest_checksums(self): + _skip_if_missing_pandas() + from application.feature_snapshot_service import load_feature_snapshot_guarded + + with TemporaryDirectory() as tmp_dir: + snapshot_path = Path(tmp_dir) / "snapshot.csv" + config_path = Path(tmp_dir) / "config.json" + snapshot_path.write_text("as_of,symbol,sector,mom_6_1\n2026-03-31,AAA,tech,0.1\n", encoding="utf-8") + config_path.write_text(json.dumps({"name": "cash_buffer_branch_default"}), encoding="utf-8") + manifest_path = Path(f"{snapshot_path}.manifest.json") + manifest_path.write_text( + json.dumps( + { + "contract_version": "cash_buffer_branch_default.feature_snapshot.v1", + "strategy_profile": "cash_buffer_branch_default", + "config_name": "cash_buffer_branch_default", + "config_path": str(config_path), + "config_sha256": _sha256_file(config_path), + "snapshot_path": str(snapshot_path), + "snapshot_sha256": _sha256_file(snapshot_path), + "snapshot_as_of": "2026-03-31", + } + ), + encoding="utf-8", + ) + + result = load_feature_snapshot_guarded( + str(snapshot_path), + run_as_of="2026-04-01", + required_columns=("as_of", "symbol", "sector", "mom_6_1"), + require_manifest=True, + expected_strategy_profile="cash_buffer_branch_default", + expected_config_name="cash_buffer_branch_default", + expected_config_path=str(config_path), + expected_contract_version="cash_buffer_branch_default.feature_snapshot.v1", + ) + + self.assertIsNotNone(result.frame) + self.assertEqual(result.metadata["snapshot_guard_decision"], "proceed") + self.assertEqual(result.metadata["snapshot_manifest_strategy_profile"], "cash_buffer_branch_default") + self.assertEqual(result.metadata["snapshot_manifest_config_name"], "cash_buffer_branch_default") + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_rebalance_service.py b/tests/test_rebalance_service.py new file mode 100644 index 0000000..75d1e90 --- /dev/null +++ b/tests/test_rebalance_service.py @@ -0,0 +1,125 @@ +import json + +from application.rebalance_service import run_strategy_core + + +def test_run_strategy_core_passes_signal_metadata_to_execution(): + observed = {"messages": [], "strategy_symbols": None} + + class FakeIB: + def isConnected(self): + return True + + def disconnect(self): + observed["disconnected"] = True + + def fake_execute_rebalance( + _ib, + _weights, + _positions, + _account_values, + *, + strategy_symbols=None, + signal_metadata=None, + ): + observed["strategy_symbols"] = strategy_symbols + observed["signal_metadata"] = signal_metadata + return [] + + result = run_strategy_core( + connect_ib=lambda: FakeIB(), + get_current_portfolio=lambda _ib: ({}, {"equity": 1000.0, "buying_power": 500.0}), + compute_signals=lambda _ib, _holdings: ( + {"AAA": 0.9, "BOXX": 0.1}, + "signal", + False, + "breadth=60.0%", + {"managed_symbols": ("AAA", "BOXX"), "status_icon": "📏"}, + ), + execute_rebalance=fake_execute_rebalance, + send_tg_message=lambda message: observed["messages"].append(message), + translator=lambda key, **_kwargs: { + "heartbeat_title": "heartbeat", + "rebalance_title": "rebalance", + "no_trades": "no trades", + "equity": "equity", + "buying_power": "buying_power", + }.get(key, key), + separator="---", + ) + + assert result == "OK - executed" + assert observed["strategy_symbols"] == ("AAA", "BOXX") + assert observed["signal_metadata"]["managed_symbols"] == ("AAA", "BOXX") + assert observed["messages"] + assert "📏 breadth=60.0%" in observed["messages"][0] + + +def test_run_strategy_core_writes_reconciliation_record(tmp_path): + observed = {"messages": []} + + class FakeIB: + def isConnected(self): + return True + + def disconnect(self): + return None + + output_path = tmp_path / "reconciliation.json" + + result = run_strategy_core( + connect_ib=lambda: FakeIB(), + get_current_portfolio=lambda _ib: ({}, {"equity": 1000.0, "buying_power": 500.0}), + compute_signals=lambda _ib, _holdings: ( + {"AAA": 0.6, "BOXX": 0.4}, + "signal", + False, + "breadth=41.0%", + { + "strategy_profile": "cash_buffer_branch_default", + "managed_symbols": ("AAA", "BOXX"), + "status_icon": "🧲", + "trade_date": "2026-04-01", + "snapshot_as_of": "2026-03-31", + "snapshot_guard_decision": "proceed", + "regime": "soft_defense", + "breadth_ratio": 0.41, + "target_stock_weight": 0.6, + "realized_stock_weight": 0.6, + "safe_haven_weight": 0.4, + "safe_haven_symbol": "BOXX", + "dry_run_only": True, + }, + ), + execute_rebalance=lambda *_args, **_kwargs: ( + ["DRY_RUN buy AAA 1 @100.00"], + { + "mode": "dry_run", + "execution_status": "executed", + "orders_submitted": [{"symbol": "AAA", "side": "buy", "quantity": 1, "status": "dry_run"}], + "orders_filled": [], + "orders_partially_filled": [], + "orders_skipped": [], + "skipped_reasons": [], + "residual_cash_estimate": 400.0, + "realized_safe_haven_weight": 0.4, + "target_vs_current": [{"symbol": "AAA", "current_weight": 0.0, "target_weight": 0.6, "delta_weight": 0.6}], + }, + ), + send_tg_message=lambda message: observed["messages"].append(message), + translator=lambda key, **_kwargs: { + "heartbeat_title": "heartbeat", + "rebalance_title": "rebalance", + "no_trades": "no trades", + "equity": "equity", + "buying_power": "buying_power", + }.get(key, key), + separator="---", + reconciliation_output_path=output_path, + ) + + assert result == "OK - executed" + payload = json.loads(output_path.read_text(encoding="utf-8")) + assert payload["strategy_profile"] == "cash_buffer_branch_default" + assert payload["snapshot_as_of"] == "2026-03-31" + assert payload["orders_submitted"][0]["symbol"] == "AAA" diff --git a/tests/test_runtime_config_support.py b/tests/test_runtime_config_support.py index 737414a..7a888ad 100644 --- a/tests/test_runtime_config_support.py +++ b/tests/test_runtime_config_support.py @@ -68,6 +68,12 @@ def test_load_platform_runtime_settings_uses_minimal_group_config(monkeypatch): assert settings.ib_client_id == 1 assert settings.strategy_profile == DEFAULT_STRATEGY_PROFILE assert settings.strategy_domain == US_EQUITY_DOMAIN + assert settings.feature_snapshot_path is None + assert settings.feature_snapshot_manifest_path is None + assert settings.strategy_config_path is None + assert settings.strategy_config_source is None + assert settings.reconciliation_output_path is None + assert settings.dry_run_only is False assert settings.account_group == "default" assert settings.service_name is None assert settings.account_ids == () @@ -101,6 +107,8 @@ def test_load_platform_runtime_settings_supports_explicit_group_config_values(mo assert settings.ib_client_id == 7 assert settings.strategy_profile == DEFAULT_STRATEGY_PROFILE assert settings.strategy_domain == US_EQUITY_DOMAIN + assert settings.feature_snapshot_path is None + assert settings.feature_snapshot_manifest_path is None assert settings.account_group == "taxable_main" assert settings.service_name == "interactive-brokers-quant-global-etf-rotation-taxable-main" assert settings.account_ids == ("U1234567",) @@ -120,7 +128,59 @@ def test_load_platform_runtime_settings_rejects_unknown_strategy_profile(monkeyp def test_platform_supported_profiles_are_filtered_by_registry(): - assert get_supported_profiles_for_platform(IBKR_PLATFORM) == frozenset({DEFAULT_STRATEGY_PROFILE}) + assert get_supported_profiles_for_platform(IBKR_PLATFORM) == frozenset( + { + "cash_buffer_branch_default", + "global_etf_rotation", + "russell_1000_multi_factor_defensive", + } + ) + + +def test_load_platform_runtime_settings_reads_feature_snapshot_path(monkeypatch): + monkeypatch.setenv("STRATEGY_PROFILE", "russell_1000_multi_factor_defensive") + monkeypatch.setenv("ACCOUNT_GROUP", "default") + monkeypatch.setenv("IB_ACCOUNT_GROUP_CONFIG_JSON", MINIMAL_GROUP_JSON) + monkeypatch.setenv("IBKR_FEATURE_SNAPSHOT_PATH", "/tmp/r1000-latest.csv") + + settings = load_platform_runtime_settings(project_id_resolver=lambda: "project-1") + + assert settings.feature_snapshot_path == "/tmp/r1000-latest.csv" + + +def test_load_platform_runtime_settings_reads_cash_buffer_runtime_config(monkeypatch): + monkeypatch.setenv("STRATEGY_PROFILE", "cash_buffer_branch_default") + monkeypatch.setenv("ACCOUNT_GROUP", "default") + monkeypatch.setenv("IB_ACCOUNT_GROUP_CONFIG_JSON", MINIMAL_GROUP_JSON) + monkeypatch.setenv("IBKR_FEATURE_SNAPSHOT_PATH", "/tmp/cash-buffer.csv") + monkeypatch.setenv("IBKR_FEATURE_SNAPSHOT_MANIFEST_PATH", "/tmp/cash-buffer.csv.manifest.json") + monkeypatch.setenv("IBKR_STRATEGY_CONFIG_PATH", "/tmp/cash-buffer-config.json") + monkeypatch.setenv("IBKR_RECONCILIATION_OUTPUT_PATH", "/tmp/reconciliation.json") + monkeypatch.setenv("IBKR_DRY_RUN_ONLY", "true") + + settings = load_platform_runtime_settings(project_id_resolver=lambda: "project-1") + + assert settings.feature_snapshot_path == "/tmp/cash-buffer.csv" + assert settings.feature_snapshot_manifest_path == "/tmp/cash-buffer.csv.manifest.json" + assert settings.strategy_config_path == "/tmp/cash-buffer-config.json" + assert settings.strategy_config_source == "env" + assert settings.reconciliation_output_path == "/tmp/reconciliation.json" + assert settings.dry_run_only is True + + +def test_load_platform_runtime_settings_uses_bundled_cash_buffer_config_when_env_missing(monkeypatch): + monkeypatch.setenv("STRATEGY_PROFILE", "cash_buffer_branch_default") + monkeypatch.setenv("ACCOUNT_GROUP", "default") + monkeypatch.setenv("IB_ACCOUNT_GROUP_CONFIG_JSON", MINIMAL_GROUP_JSON) + monkeypatch.setenv("IBKR_FEATURE_SNAPSHOT_PATH", "/tmp/cash-buffer.csv") + monkeypatch.delenv("IBKR_STRATEGY_CONFIG_PATH", raising=False) + monkeypatch.delenv("STRATEGY_CONFIG_PATH", raising=False) + + settings = load_platform_runtime_settings(project_id_resolver=lambda: "project-1") + + assert settings.strategy_config_path is not None + assert settings.strategy_config_path.endswith("growth_pullback_cash_buffer_branch_default.json") + assert settings.strategy_config_source == "bundled_canonical_default" diff --git a/tests/test_snapshot_strategy_runtime.py b/tests/test_snapshot_strategy_runtime.py new file mode 100644 index 0000000..584a777 --- /dev/null +++ b/tests/test_snapshot_strategy_runtime.py @@ -0,0 +1,260 @@ +import pytest +from pathlib import Path +import hashlib +import json +from types import SimpleNamespace + + +def _sha256_file(path: Path) -> str: + hasher = hashlib.sha256() + with path.open("rb") as fh: + while True: + chunk = fh.read(1024 * 1024) + if not chunk: + break + hasher.update(chunk) + return hasher.hexdigest() + + +def _write_cash_buffer_manifest(snapshot_path: Path, config_path: Path, *, snapshot_as_of: str) -> Path: + manifest_path = Path(f"{snapshot_path}.manifest.json") + manifest_path.write_text( + json.dumps( + { + "manifest_type": "feature_snapshot", + "contract_version": "cash_buffer_branch_default.feature_snapshot.v1", + "strategy_profile": "cash_buffer_branch_default", + "config_name": "cash_buffer_branch_default", + "config_path": str(config_path), + "config_sha256": _sha256_file(config_path), + "snapshot_path": str(snapshot_path), + "snapshot_sha256": _sha256_file(snapshot_path), + "snapshot_as_of": snapshot_as_of, + } + ), + encoding="utf-8", + ) + return manifest_path + + +def test_compute_signals_uses_feature_snapshot_for_russell_1000(strategy_module_factory, monkeypatch): + pytest.importorskip("pandas") + + module = strategy_module_factory( + STRATEGY_PROFILE="russell_1000_multi_factor_defensive", + IBKR_FEATURE_SNAPSHOT_PATH="/tmp/r1000.csv", + ) + + observed = {} + + def fake_load_feature_snapshot_guarded(path, **_kwargs): + observed["path"] = path + return SimpleNamespace( + frame=[ + { + "as_of": "2026-03-31", + "symbol": "SPY", + "sector": "benchmark", + "mom_6_1": 0.1, + "mom_12_1": 0.1, + "sma200_gap": 0.1, + "vol_63": 0.1, + "maxdd_126": 0.1, + "eligible": False, + } + ], + metadata={ + "snapshot_guard_decision": "proceed", + "snapshot_as_of": "2026-03-31", + "snapshot_path": path, + "snapshot_age_days": 1, + }, + ) + + monkeypatch.setattr(module, "load_feature_snapshot_guarded", fake_load_feature_snapshot_guarded) + monkeypatch.setattr( + module, + "strategy_compute_signals", + lambda snapshot, holdings, **kwargs: ( + {"BOXX": 1.0}, + "signal", + False, + "breadth=0.0%", + {"managed_symbols": ("BOXX",), "status_icon": "📏"}, + ), + ) + + result = module.compute_signals(None, {"AAA"}) + + assert observed["path"] == "/tmp/r1000.csv" + assert result[0] == {"BOXX": 1.0} + assert result[4]["status_icon"] == "📏" + assert result[4]["snapshot_guard_decision"] == "proceed" + + +def test_compute_signals_loads_cash_buffer_branch_default_runtime(strategy_module_factory, monkeypatch, tmp_path): + pytest.importorskip("pandas") + + snapshot_path = tmp_path / "cash_buffer_snapshot.csv" + config_path = tmp_path / "cash_buffer_branch_default.json" + snapshot_path.write_text( + "\n".join( + [ + "as_of,symbol,sector,close,volume,adv20_usd,history_days,mom_6_1,mom_12_1,sma20_gap,sma50_gap,sma200_gap,ma50_over_ma200,vol_63,maxdd_126,breakout_252,dist_63_high,dist_126_high,rebound_20,base_eligible", + "2026-03-31,QQQ,benchmark,500,1000000,1000000000,400,0.20,0.30,0.03,0.05,0.08,0.04,0.22,-0.12,-0.01,-0.03,-0.05,0.04,false", + "2026-03-31,BOXX,defense,101,200000,20000000,400,0.02,0.04,0.00,0.00,0.01,0.00,0.03,-0.01,0.00,-0.01,-0.01,0.00,false", + "2026-03-31,AAPL,Information Technology,200,1000000,150000000,400,0.20,0.35,0.03,0.05,0.10,0.05,0.18,-0.08,-0.01,-0.03,-0.05,0.05,true", + "2026-03-31,MSFT,Information Technology,350,1000000,150000000,400,0.18,0.33,0.03,0.05,0.09,0.04,0.17,-0.09,-0.02,-0.04,-0.06,0.04,true", + "2026-03-31,NVDA,Information Technology,900,1000000,150000000,400,0.30,0.60,0.07,0.09,0.18,0.10,0.35,-0.05,-0.01,-0.02,-0.04,0.12,true", + "2026-03-31,META,Communication,520,1000000,150000000,400,0.22,0.40,0.04,0.06,0.11,0.05,0.24,-0.07,-0.03,-0.05,-0.07,0.07,true", + "2026-03-31,GOOGL,Communication,180,1000000,150000000,400,0.17,0.28,0.02,0.04,0.08,0.03,0.20,-0.08,-0.04,-0.06,-0.08,0.05,true", + "2026-03-31,NFLX,Communication,620,1000000,150000000,400,0.18,0.31,0.03,0.05,0.09,0.04,0.22,-0.07,-0.03,-0.05,-0.07,0.05,true", + "2026-03-31,TTWO,Communication,210,1000000,150000000,400,0.14,0.20,0.01,0.03,0.06,0.02,0.18,-0.09,-0.04,-0.06,-0.09,0.03,true", + "2026-03-31,ADBE,Information Technology,600,1000000,150000000,400,0.16,0.27,0.02,0.04,0.07,0.03,0.19,-0.08,-0.05,-0.06,-0.08,0.04,true", + "2026-03-31,CRM,Information Technology,320,1000000,150000000,400,0.15,0.26,0.02,0.03,0.07,0.03,0.18,-0.09,-0.05,-0.06,-0.09,0.03,true", + "2026-03-31,NOW,Information Technology,780,1000000,150000000,400,0.16,0.29,0.02,0.04,0.08,0.03,0.19,-0.07,-0.04,-0.06,-0.08,0.05,true", + ] + ), + encoding="utf-8", + ) + config_path.write_text( + json.dumps( + { + "name": "cash_buffer_branch_default", + "family": "tech_heavy_pullback", + "branch_role": "cash-buffered parallel branch", + "benchmark_symbol": "QQQ", + "holdings_count": 8, + "single_name_cap": 0.10, + "sector_cap": 0.40, + "hold_bonus": 0.10, + "min_adv20_usd": 50_000_000.0, + "normalization": "universe_cross_sectional", + "score_template": "balanced_pullback", + "sector_whitelist": ["Information Technology", "Communication"], + "breadth_thresholds": {"soft": 0.55, "hard": 0.35}, + "exposures": {"risk_on": 0.8, "soft_defense": 0.6, "hard_defense": 0.0}, + "execution_cash_reserve_ratio": 0.0, + "residual_proxy": "simple_excess_return_vs_QQQ", + } + ), + encoding="utf-8", + ) + _write_cash_buffer_manifest(snapshot_path, config_path, snapshot_as_of="2026-03-31") + + module = strategy_module_factory( + STRATEGY_PROFILE="cash_buffer_branch_default", + IBKR_FEATURE_SNAPSHOT_PATH=str(snapshot_path), + IBKR_FEATURE_SNAPSHOT_MANIFEST_PATH=str(Path(f"{snapshot_path}.manifest.json")), + IBKR_STRATEGY_CONFIG_PATH=str(config_path), + IBKR_RUN_AS_OF_DATE="2026-04-01", + ) + result = module.compute_signals(None, {"AAPL"}) + + assert result[0]["BOXX"] == pytest.approx(0.2) + assert result[4]["strategy_profile"] == "cash_buffer_branch_default" + assert result[4]["strategy_config_source"] in {"env", "external_config"} + assert result[4]["realized_stock_weight"] == pytest.approx(0.8) + assert result[4]["snapshot_guard_decision"] == "proceed" + assert module.CASH_RESERVE_RATIO == pytest.approx(0.0) + + +def test_compute_signals_fail_closes_when_snapshot_missing(strategy_module_factory): + module = strategy_module_factory( + STRATEGY_PROFILE="cash_buffer_branch_default", + IBKR_FEATURE_SNAPSHOT_PATH="/tmp/definitely-missing-cash-buffer-snapshot.csv", + IBKR_RUN_AS_OF_DATE="2026-04-01", + ) + + result = module.compute_signals(None, {"AAPL"}) + + assert result[0] is None + assert result[4]["snapshot_guard_decision"] == "fail_closed" + assert "feature_snapshot_missing" in result[4]["fail_reason"] + + +def test_compute_signals_fail_closes_when_snapshot_is_stale(strategy_module_factory, tmp_path): + snapshot_path = tmp_path / "stale_snapshot.csv" + snapshot_path.write_text( + "\n".join( + [ + "as_of,symbol,sector,close,adv20_usd,history_days,mom_6_1,mom_12_1,sma20_gap,sma50_gap,sma200_gap,ma50_over_ma200,vol_63,maxdd_126,breakout_252,dist_63_high,dist_126_high,rebound_20,base_eligible", + "2026-01-31,QQQ,benchmark,500,1000000000,400,0.20,0.30,0.03,0.05,0.08,0.04,0.22,-0.12,-0.01,-0.03,-0.05,0.04,false", + "2026-01-31,BOXX,defense,101,20000000,400,0.02,0.04,0.00,0.00,0.01,0.00,0.03,-0.01,0.00,-0.01,-0.01,0.00,false", + "2026-01-31,AAPL,Information Technology,200,150000000,400,0.20,0.35,0.03,0.05,0.10,0.05,0.18,-0.08,-0.01,-0.03,-0.05,0.05,true", + ] + ), + encoding="utf-8", + ) + + module = strategy_module_factory( + STRATEGY_PROFILE="cash_buffer_branch_default", + IBKR_FEATURE_SNAPSHOT_PATH=str(snapshot_path), + IBKR_RUN_AS_OF_DATE="2026-04-05", + ) + + result = module.compute_signals(None, {"AAPL"}) + + assert result[0] is None + assert result[4]["snapshot_guard_decision"] == "fail_closed" + assert "feature_snapshot_stale" in result[4]["fail_reason"] + + +def test_compute_signals_fail_closes_when_manifest_missing(strategy_module_factory, tmp_path): + snapshot_path = tmp_path / "snapshot.csv" + config_path = tmp_path / "cash_buffer_branch_default.json" + snapshot_path.write_text( + "\n".join( + [ + "as_of,symbol,sector,close,adv20_usd,history_days,mom_6_1,mom_12_1,sma20_gap,sma50_gap,sma200_gap,ma50_over_ma200,vol_63,maxdd_126,breakout_252,dist_63_high,dist_126_high,rebound_20,base_eligible", + "2026-03-31,QQQ,benchmark,500,1000000000,400,0.20,0.30,0.03,0.05,0.08,0.04,0.22,-0.12,-0.01,-0.03,-0.05,0.04,false", + "2026-03-31,BOXX,defense,101,20000000,400,0.02,0.04,0.00,0.00,0.01,0.00,0.03,-0.01,0.00,-0.01,-0.01,0.00,false", + "2026-03-31,AAPL,Information Technology,200,150000000,400,0.20,0.35,0.03,0.05,0.10,0.05,0.18,-0.08,-0.01,-0.03,-0.05,0.05,true", + ] + ), + encoding="utf-8", + ) + config_path.write_text( + json.dumps( + { + "name": "cash_buffer_branch_default", + "family": "tech_heavy_pullback", + "branch_role": "cash-buffered parallel branch", + "benchmark_symbol": "QQQ", + "holdings_count": 8, + "single_name_cap": 0.10, + "sector_cap": 0.40, + "hold_bonus": 0.10, + "min_adv20_usd": 50_000_000.0, + "normalization": "universe_cross_sectional", + "score_template": "balanced_pullback", + "sector_whitelist": ["Information Technology", "Communication"], + "breadth_thresholds": {"soft": 0.55, "hard": 0.35}, + "exposures": {"risk_on": 0.8, "soft_defense": 0.6, "hard_defense": 0.0}, + "execution_cash_reserve_ratio": 0.0, + "residual_proxy": "simple_excess_return_vs_QQQ", + } + ), + encoding="utf-8", + ) + + module = strategy_module_factory( + STRATEGY_PROFILE="cash_buffer_branch_default", + IBKR_FEATURE_SNAPSHOT_PATH=str(snapshot_path), + IBKR_STRATEGY_CONFIG_PATH=str(config_path), + IBKR_RUN_AS_OF_DATE="2026-04-01", + ) + + result = module.compute_signals(None, {"AAPL"}) + + assert result[0] is None + assert result[4]["snapshot_guard_decision"] == "fail_closed" + assert "feature_snapshot_manifest_missing" in result[4]["fail_reason"] + + +def test_global_etf_rotation_keeps_default_cash_reserve(strategy_module_factory): + module = strategy_module_factory( + STRATEGY_PROFILE="global_etf_rotation", + ) + + assert module.CASH_RESERVE_RATIO == pytest.approx(0.03) diff --git a/tests/test_strategy_loader.py b/tests/test_strategy_loader.py index 9701c12..bc36170 100644 --- a/tests/test_strategy_loader.py +++ b/tests/test_strategy_loader.py @@ -1,8 +1,45 @@ +import sys +import types + from strategy_loader import load_signal_logic_module -def test_load_signal_logic_module_resolves_global_etf_rotation(): +def test_load_signal_logic_module_resolves_global_etf_rotation(monkeypatch): + market_calendars_module = types.ModuleType("pandas_market_calendars") + market_calendars_module.get_calendar = lambda name: None + monkeypatch.setitem(sys.modules, "pandas_market_calendars", market_calendars_module) + sys.modules.pop("us_equity_strategies.strategies.global_etf_rotation", None) + module = load_signal_logic_module("global_etf_rotation") assert module.__name__ == "us_equity_strategies.strategies.global_etf_rotation" assert module.TOP_N == 2 + + +def test_load_signal_logic_module_resolves_russell_1000_multi_factor_defensive(): + try: + import pandas # noqa: F401 + except ModuleNotFoundError: + return + + module = load_signal_logic_module("russell_1000_multi_factor_defensive") + + assert module.__name__ == "us_equity_strategies.strategies.russell_1000_multi_factor_defensive" + assert module.SIGNAL_SOURCE == "feature_snapshot" + + +def test_load_signal_logic_module_resolves_cash_buffer_branch_default(monkeypatch): + try: + import pandas # noqa: F401 + except ModuleNotFoundError: + return + + market_calendars_module = types.ModuleType("pandas_market_calendars") + market_calendars_module.get_calendar = lambda name: None + monkeypatch.setitem(sys.modules, "pandas_market_calendars", market_calendars_module) + sys.modules.pop("us_equity_strategies.strategies.cash_buffer_branch_default", None) + + module = load_signal_logic_module("cash_buffer_branch_default") + + assert module.__name__ == "us_equity_strategies.strategies.cash_buffer_branch_default" + assert module.SIGNAL_SOURCE == "feature_snapshot" diff --git a/tests/test_sync_cloud_run_env_workflow.sh b/tests/test_sync_cloud_run_env_workflow.sh index a8c55d0..d5267ac 100644 --- a/tests/test_sync_cloud_run_env_workflow.sh +++ b/tests/test_sync_cloud_run_env_workflow.sh @@ -17,6 +17,11 @@ grep -Fq 'TELEGRAM_TOKEN_SECRET_NAME: ${{ vars.TELEGRAM_TOKEN_SECRET_NAME }}' "$ grep -Fq 'STRATEGY_PROFILE: ${{ vars.STRATEGY_PROFILE }}' "$workflow_file" grep -Fq 'ACCOUNT_GROUP: ${{ vars.ACCOUNT_GROUP }}' "$workflow_file" grep -Fq 'IB_ACCOUNT_GROUP_CONFIG_SECRET_NAME: ${{ vars.IB_ACCOUNT_GROUP_CONFIG_SECRET_NAME }}' "$workflow_file" +grep -Fq 'IBKR_FEATURE_SNAPSHOT_PATH: ${{ vars.IBKR_FEATURE_SNAPSHOT_PATH }}' "$workflow_file" +grep -Fq 'IBKR_FEATURE_SNAPSHOT_MANIFEST_PATH: ${{ vars.IBKR_FEATURE_SNAPSHOT_MANIFEST_PATH }}' "$workflow_file" +grep -Fq 'IBKR_STRATEGY_CONFIG_PATH: ${{ vars.IBKR_STRATEGY_CONFIG_PATH }}' "$workflow_file" +grep -Fq 'IBKR_RECONCILIATION_OUTPUT_PATH: ${{ vars.IBKR_RECONCILIATION_OUTPUT_PATH }}' "$workflow_file" +grep -Fq 'IBKR_DRY_RUN_ONLY: ${{ vars.IBKR_DRY_RUN_ONLY }}' "$workflow_file" grep -Fq 'IB_GATEWAY_ZONE: ${{ vars.IB_GATEWAY_ZONE }}' "$workflow_file" grep -Fq 'IB_GATEWAY_IP_MODE: ${{ vars.IB_GATEWAY_IP_MODE }}' "$workflow_file" grep -Fq 'GLOBAL_TELEGRAM_CHAT_ID: ${{ vars.GLOBAL_TELEGRAM_CHAT_ID }}' "$workflow_file" @@ -33,6 +38,18 @@ grep -Fq 'remove_secret_vars=(' "$workflow_file" grep -Fq 'STRATEGY_PROFILE=${STRATEGY_PROFILE}' "$workflow_file" grep -Fq 'ACCOUNT_GROUP=${ACCOUNT_GROUP}' "$workflow_file" grep -Fq 'IB_ACCOUNT_GROUP_CONFIG_SECRET_NAME=${IB_ACCOUNT_GROUP_CONFIG_SECRET_NAME}' "$workflow_file" +grep -Fq 'required_vars+=(IBKR_FEATURE_SNAPSHOT_PATH)' "$workflow_file" +grep -Fq 'required_vars+=(IBKR_FEATURE_SNAPSHOT_MANIFEST_PATH IBKR_STRATEGY_CONFIG_PATH IBKR_RECONCILIATION_OUTPUT_PATH)' "$workflow_file" +grep -Fq 'env_pairs+=("IBKR_FEATURE_SNAPSHOT_PATH=${IBKR_FEATURE_SNAPSHOT_PATH}")' "$workflow_file" +grep -Fq 'env_pairs+=("IBKR_FEATURE_SNAPSHOT_MANIFEST_PATH=${IBKR_FEATURE_SNAPSHOT_MANIFEST_PATH}")' "$workflow_file" +grep -Fq 'env_pairs+=("IBKR_STRATEGY_CONFIG_PATH=${IBKR_STRATEGY_CONFIG_PATH}")' "$workflow_file" +grep -Fq 'env_pairs+=("IBKR_RECONCILIATION_OUTPUT_PATH=${IBKR_RECONCILIATION_OUTPUT_PATH}")' "$workflow_file" +grep -Fq 'env_pairs+=("IBKR_DRY_RUN_ONLY=${IBKR_DRY_RUN_ONLY}")' "$workflow_file" +grep -Fq 'remove_env_vars+=("IBKR_FEATURE_SNAPSHOT_PATH")' "$workflow_file" +grep -Fq 'remove_env_vars+=("IBKR_FEATURE_SNAPSHOT_MANIFEST_PATH")' "$workflow_file" +grep -Fq 'remove_env_vars+=("IBKR_STRATEGY_CONFIG_PATH")' "$workflow_file" +grep -Fq 'remove_env_vars+=("IBKR_RECONCILIATION_OUTPUT_PATH")' "$workflow_file" +grep -Fq 'remove_env_vars+=("IBKR_DRY_RUN_ONLY")' "$workflow_file" grep -Fq 'secret_pairs+=("TELEGRAM_TOKEN=${TELEGRAM_TOKEN_SECRET_NAME}:latest")' "$workflow_file" grep -Fq 'env_pairs+=("TELEGRAM_TOKEN=${TELEGRAM_TOKEN}")' "$workflow_file" grep -Fq 'remove_env_vars+=("IB_GATEWAY_ZONE")' "$workflow_file"