|
4 | 4 | # src/rotator_library/providers/anthropic_provider.py |
5 | 5 |
|
6 | 6 | import copy |
| 7 | +import hashlib |
7 | 8 | import json |
8 | 9 | import os |
9 | 10 | import time |
|
18 | 19 |
|
19 | 20 | from .provider_interface import ProviderInterface |
20 | 21 | from .anthropic_auth_base import AnthropicAuthBase |
| 22 | +from .provider_cache import create_provider_cache |
21 | 23 | from ..timeout_config import TimeoutConfig |
22 | 24 | from ..transaction_logger import ProviderLogger |
23 | 25 |
|
|
62 | 64 | "pause_turn": "stop", |
63 | 65 | } |
64 | 66 |
|
# Lazy-initialised server-side cache for thinking block signatures.
# Allows us to re-attach signatures when OpenAI-format clients send back
# reasoning_content without the signature (which they can't preserve).
_thinking_sig_cache = None


def _get_thinking_cache():
    """Return the module-wide signature cache, creating it on first use.

    Lazily constructed so importing this module never touches the disk
    cache; subsequent calls return the same cache instance.
    """
    global _thinking_sig_cache
    if _thinking_sig_cache is not None:
        return _thinking_sig_cache
    _thinking_sig_cache = create_provider_cache(
        "anthropic_thinking_signatures",
        memory_ttl_seconds=7200,  # 2 hours in memory
        disk_ttl_seconds=172800,  # 48 hours on disk
    )
    return _thinking_sig_cache
| 82 | + |
65 | 83 |
|
66 | 84 | class AnthropicProvider(AnthropicAuthBase, ProviderInterface): |
67 | 85 | """ |
@@ -155,15 +173,23 @@ def _openai_messages_to_anthropic(self, messages: List[Dict[str, Any]]) -> tuple |
155 | 173 | blocks = [] |
156 | 174 |
|
157 | 175 | reasoning = msg.get("reasoning_content") |
158 | | - thinking_sig = msg.get("thinking_signature") |
159 | 176 | if reasoning: |
160 | | - thinking_block = { |
161 | | - "type": "thinking", |
162 | | - "thinking": reasoning, |
163 | | - } |
164 | | - if thinking_sig and len(thinking_sig) >= 100: |
165 | | - thinking_block["signature"] = thinking_sig |
166 | | - blocks.append(thinking_block) |
| 177 | + # Try server-side cache first (signature preserved from |
| 178 | + # the original Anthropic response) |
| 179 | + cached = self._retrieve_thinking_blocks(reasoning) |
| 180 | + if cached: |
| 181 | + blocks.extend(cached) |
| 182 | + else: |
| 183 | + # Fallback: inline signature from client (custom clients) |
| 184 | + thinking_sig = msg.get("thinking_signature") |
| 185 | + if thinking_sig and len(thinking_sig) >= 100: |
| 186 | + blocks.append({ |
| 187 | + "type": "thinking", |
| 188 | + "thinking": reasoning, |
| 189 | + "signature": thinking_sig, |
| 190 | + }) |
| 191 | + # else: no signature → drop thinking block, |
| 192 | + # model generates fresh thinking (cache miss on prefix) |
167 | 193 |
|
168 | 194 | if isinstance(content, str) and content.strip(): |
169 | 195 | blocks.append({"type": "text", "text": content}) |
@@ -251,6 +277,29 @@ def _openai_messages_to_anthropic(self, messages: List[Dict[str, Any]]) -> tuple |
251 | 277 |
|
252 | 278 | return system_blocks, anthropic_messages |
253 | 279 |
|
| 280 | + def _retrieve_thinking_blocks( |
| 281 | + self, reasoning_content: str |
| 282 | + ) -> Optional[List[Dict[str, Any]]]: |
| 283 | + """Look up cached thinking blocks with signatures for given thinking content.""" |
| 284 | + cache_key = hashlib.sha256(reasoning_content.encode()).hexdigest() |
| 285 | + cached = _get_thinking_cache().retrieve(cache_key) |
| 286 | + if not cached: |
| 287 | + return None |
| 288 | + try: |
| 289 | + blocks_data = json.loads(cached) |
| 290 | + result = [ |
| 291 | + { |
| 292 | + "type": "thinking", |
| 293 | + "thinking": b["thinking"], |
| 294 | + "signature": b["signature"], |
| 295 | + } |
| 296 | + for b in blocks_data |
| 297 | + if b.get("signature") |
| 298 | + ] |
| 299 | + return result if result else None |
| 300 | + except (json.JSONDecodeError, KeyError, TypeError): |
| 301 | + return None |
| 302 | + |
254 | 303 | def _openai_tools_to_anthropic( |
255 | 304 | self, tools: Optional[List[Dict[str, Any]]] |
256 | 305 | ) -> Optional[List[Dict[str, Any]]]: |
@@ -348,6 +397,10 @@ def _anthropic_event_to_openai_chunks( |
348 | 397 | stream_state["current_block_type"] = block_type |
349 | 398 | stream_state["current_block_index"] = index |
350 | 399 |
|
| 400 | + if block_type == "thinking": |
| 401 | + stream_state["_block_thinking"] = "" |
| 402 | + stream_state["_block_signature"] = "" |
| 403 | + |
351 | 404 | if block_type == "tool_use": |
352 | 405 | tool_id = block.get("id", f"toolu_{uuid.uuid4().hex[:12]}") |
353 | 406 | raw_name = block.get("name", "") |
@@ -423,6 +476,9 @@ def _anthropic_event_to_openai_chunks( |
423 | 476 | stream_state["accumulated_thinking"] = ( |
424 | 477 | stream_state.get("accumulated_thinking", "") + thinking |
425 | 478 | ) |
| 479 | + stream_state["_block_thinking"] = ( |
| 480 | + stream_state.get("_block_thinking", "") + thinking |
| 481 | + ) |
426 | 482 | yield { |
427 | 483 | "choices": [ |
428 | 484 | { |
@@ -473,10 +529,21 @@ def _anthropic_event_to_openai_chunks( |
473 | 529 | stream_state["thinking_signature"] = ( |
474 | 530 | stream_state.get("thinking_signature", "") + sig |
475 | 531 | ) |
| 532 | + stream_state["_block_signature"] = ( |
| 533 | + stream_state.get("_block_signature", "") + sig |
| 534 | + ) |
476 | 535 |
|
477 | 536 | return |
478 | 537 |
|
479 | 538 | if event_type == "content_block_stop": |
| 539 | + if stream_state.get("current_block_type") == "thinking": |
| 540 | + block_thinking = stream_state.pop("_block_thinking", "") |
| 541 | + block_sig = stream_state.pop("_block_signature", "") |
| 542 | + if block_thinking and block_sig: |
| 543 | + stream_state.setdefault("_thinking_blocks", []).append({ |
| 544 | + "thinking": block_thinking, |
| 545 | + "signature": block_sig, |
| 546 | + }) |
480 | 547 | return |
481 | 548 |
|
482 | 549 | if event_type == "message_delta": |
@@ -509,6 +576,14 @@ def _anthropic_event_to_openai_chunks( |
509 | 576 | "total_tokens": input_tokens + output_tokens, |
510 | 577 | }, |
511 | 578 | } |
| 579 | + |
| 580 | + # Cache thinking blocks with signatures for multi-turn preservation |
| 581 | + thinking_blocks = stream_state.get("_thinking_blocks") |
| 582 | + if thinking_blocks: |
| 583 | + full_thinking = "".join(b["thinking"] for b in thinking_blocks) |
| 584 | + cache_key = hashlib.sha256(full_thinking.encode()).hexdigest() |
| 585 | + _get_thinking_cache().store(cache_key, json.dumps(thinking_blocks)) |
| 586 | + |
512 | 587 | return |
513 | 588 |
|
514 | 589 | # ========================================================================= |
|
0 commit comments