-
-
Notifications
You must be signed in to change notification settings - Fork 110
Expand file tree
/
Copy pathopencode-get-message
More file actions
executable file
·98 lines (78 loc) · 3.4 KB
/
opencode-get-message
File metadata and controls
executable file
·98 lines (78 loc) · 3.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#!/usr/bin/env python3
"""
Get full OpenCode message payload(s) by message ID from the OpenCode database.

Usage:
    opencode-get-message <message-id> [message-id ...]
    opencode-get-message --session <session-id> <message-id> [message-id ...]
"""
import argparse
import json
from opencode_api import APIError, add_api_arguments, create_client_from_args, list_sessions_across_projects
def normalize_message_payload(payload: dict) -> dict:
    """Reduce a message payload to just its ``info`` and ``parts`` entries.

    Any other keys present in ``payload`` are dropped. A missing ``info``
    falls back to an empty dict and a missing ``parts`` to an empty list.
    """
    return {
        "info": payload.get("info", {}),
        "parts": payload.get("parts", []),
    }
def not_found_result(message_id: str) -> dict:
    """Build the placeholder entry reported for a message that was not found."""
    return dict(id=message_id, error="message_not_found")
def get_message_for_session(client, session: dict, message_id: str) -> dict:
    """Fetch a single message from an already-known session.

    The session's ``id`` (empty string if absent) and ``directory`` (``None``
    if absent) are passed to ``client.get_session_message``.

    Returns the normalized payload, or the ``message_not_found`` placeholder
    when the client raises an ``APIError`` whose ``status_code`` is 404.
    Any other ``APIError`` is re-raised unchanged.
    """
    sid = session.get("id", "")
    workdir = session.get("directory")
    try:
        raw = client.get_session_message(sid, message_id, directory=workdir)
        return normalize_message_payload(raw)
    except APIError as exc:
        # Only a 404 is translated into a result; everything else is a real failure.
        if exc.status_code != 404:
            raise
        return not_found_result(message_id)
def find_messages_without_session(
    client,
    message_ids: list[str],
    scan_sessions: int,
    session_list_limit: int,
) -> list[dict]:
    """Look for the requested message IDs by walking through listed sessions.

    Sessions come from ``list_sessions_across_projects`` (called with
    ``per_project_limit=session_list_limit``). When ``scan_sessions`` is
    positive only the first ``scan_sessions`` of them are examined; zero or a
    negative value leaves the list uncapped.
    NOTE(review): the listing is presumably ordered most-recent first --
    confirm against ``list_sessions_across_projects``.

    Returns one entry per element of ``message_ids``, in the same order:
    the normalized payload if the message was seen, otherwise the
    ``message_not_found`` placeholder.
    """
    pending = set(message_ids)
    hits: dict[str, dict] = {}

    candidates = list_sessions_across_projects(client, per_project_limit=session_list_limit)
    if scan_sessions > 0:
        candidates = candidates[:scan_sessions]

    for candidate in candidates:
        # Stop fetching further sessions once every requested ID is resolved.
        if not pending:
            break
        session_messages = client.get_session_messages(
            candidate.get("id", ""),
            directory=candidate.get("directory"),
        )
        for entry in session_messages:
            entry_id = entry.get("info", {}).get("id")
            if entry_id not in pending:
                continue
            hits[entry_id] = normalize_message_payload(entry)
            pending.remove(entry_id)

    results = []
    for message_id in message_ids:
        fallback = not_found_result(message_id)
        results.append(hits.get(message_id, fallback))
    return results
def main() -> int:
    """Parse the command line, look up the requested messages, and print JSON.

    With ``--session`` the session is fetched once and every message ID is
    looked up directly inside it. Without it, listed sessions are scanned for
    the IDs (bounded by ``--scan-sessions``).

    A single requested ID prints one JSON object; several IDs print a JSON
    list, in the order given on the command line.

    Returns:
        0 after printing the results, or 1 if an ``APIError`` was raised
        (the error text is written to stderr so stdout stays JSON-only).
    """
    # Local import: ``sys`` is needed only for reporting errors on stderr.
    import sys

    parser = argparse.ArgumentParser(description="Get full OpenCode message payload by message ID")
    parser.add_argument("--session", "-s", type=str, default=None, help="Session ID for direct lookup")
    parser.add_argument(
        "--scan-sessions",
        type=int,
        default=200,
        help="When --session is omitted, scan this many recent sessions for message IDs",
    )
    parser.add_argument("message_ids", nargs="+", help="One or more message IDs")
    # NOTE(review): ``args.session_list_limit`` used below is presumably
    # registered by add_api_arguments -- confirm in opencode_api.
    add_api_arguments(parser)
    args = parser.parse_args()

    try:
        with create_client_from_args(args) as client:
            if args.session:
                session = client.get_session(args.session)
                results = [get_message_for_session(client, session, message_id) for message_id in args.message_ids]
            else:
                results = find_messages_without_session(
                    client,
                    args.message_ids,
                    scan_sessions=args.scan_sessions,
                    session_list_limit=args.session_list_limit,
                )
    except APIError as err:
        # Diagnostics go to stderr; stdout is reserved for the JSON result so
        # that consumers piping the output never have to parse an error line.
        print(f"Error: {err}", file=sys.stderr)
        return 1

    # Unwrap the list when exactly one message was requested.
    output = results[0] if len(results) == 1 else results
    print(json.dumps(output, indent=2))
    return 0
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    exit_status = main()
    raise SystemExit(exit_status)