-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathclient.py
More file actions
4485 lines (3835 loc) · 169 KB
/
client.py
File metadata and controls
4485 lines (3835 loc) · 169 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
from __future__ import annotations
"""Main client classes for AdCP."""
import contextlib
import hashlib
import hmac
import json
import logging
import os
import time
from collections.abc import Callable, Iterator
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, TypedDict, cast
from uuid import uuid4
from a2a.types import Task, TaskStatusUpdateEvent
from pydantic import BaseModel
if TYPE_CHECKING:
import httpx
from mcp import ClientSession
from adcp._version import resolve_adcp_version
from adcp.capabilities import TASK_FEATURE_MAP, FeatureResolver, looks_like_v3_capabilities
from adcp.exceptions import ADCPError, ADCPWebhookSignatureError
from adcp.protocols.a2a import A2AAdapter
from adcp.protocols.base import ProtocolAdapter
from adcp.protocols.mcp import MCPAdapter
from adcp.signing.autosign import (
SigningConfig,
operation_needs_signing,
)
from adcp.signing.autosign import (
current_operation as _signing_current_operation,
)
from adcp.signing.signer import sign_request
from adcp.types import (
ActivateSignalRequest,
ActivateSignalResponse,
BuildCreativeRequest,
BuildCreativeResponse,
CreateMediaBuyRequest,
CreateMediaBuyResponse,
GeneratedTaskStatus,
GetAccountFinancialsRequest,
GetAccountFinancialsResponse,
GetCreativeDeliveryRequest,
GetCreativeDeliveryResponse,
GetMediaBuyDeliveryRequest,
GetMediaBuyDeliveryResponse,
GetMediaBuysRequest,
GetMediaBuysResponse,
GetProductsRequest,
GetProductsResponse,
GetSignalsRequest,
GetSignalsResponse,
ListAccountsRequest,
ListAccountsResponse,
ListCreativeFormatsRequest,
ListCreativeFormatsResponse,
ListCreativesRequest,
ListCreativesResponse,
LogEventRequest,
LogEventResponse,
PreviewCreativeRequest,
PreviewCreativeResponse,
ProvidePerformanceFeedbackRequest,
ProvidePerformanceFeedbackResponse,
ReportUsageRequest,
ReportUsageResponse,
SyncAccountsRequest,
SyncAccountsResponse,
SyncAudiencesRequest,
SyncAudiencesResponse,
SyncCatalogsRequest,
SyncCatalogsResponse,
SyncCreativesRequest,
SyncCreativesResponse,
SyncEventSourcesRequest,
SyncEventSourcesResponse,
UpdateMediaBuyRequest,
UpdateMediaBuyResponse,
)
from adcp.types.core import (
Activity,
ActivityType,
AgentConfig,
Protocol,
TaskResult,
TaskStatus,
)
# V3 Governance (Sync Governance) types
from adcp.types.generated_poc.account.sync_governance_request import (
SyncGovernanceRequest,
)
from adcp.types.generated_poc.account.sync_governance_response import (
SyncGovernanceResponse,
)
from adcp.types.generated_poc.brand.acquire_rights_request import AcquireRightsRequest
from adcp.types.generated_poc.brand.acquire_rights_response import (
AcquireRightsResponse,
)
from adcp.types.generated_poc.brand.get_brand_identity_request import (
GetBrandIdentityRequest,
)
from adcp.types.generated_poc.brand.get_brand_identity_response import (
GetBrandIdentityResponse,
)
from adcp.types.generated_poc.brand.get_rights_request import GetRightsRequest
from adcp.types.generated_poc.brand.get_rights_response import GetRightsResponse
from adcp.types.generated_poc.brand.update_rights_request import UpdateRightsRequest
from adcp.types.generated_poc.brand.update_rights_response import (
UpdateRightsResponse,
)
# V3 Governance (Collection Lists) types
from adcp.types.generated_poc.collection.create_collection_list_request import (
CreateCollectionListRequest,
)
from adcp.types.generated_poc.collection.create_collection_list_response import (
CreateCollectionListResponse,
)
from adcp.types.generated_poc.collection.delete_collection_list_request import (
DeleteCollectionListRequest,
)
from adcp.types.generated_poc.collection.delete_collection_list_response import (
DeleteCollectionListResponse,
)
from adcp.types.generated_poc.collection.get_collection_list_request import (
GetCollectionListRequest,
)
from adcp.types.generated_poc.collection.get_collection_list_response import (
GetCollectionListResponse,
)
from adcp.types.generated_poc.collection.list_collection_lists_request import (
ListCollectionListsRequest,
)
from adcp.types.generated_poc.collection.list_collection_lists_response import (
ListCollectionListsResponse,
)
from adcp.types.generated_poc.collection.update_collection_list_request import (
UpdateCollectionListRequest,
)
from adcp.types.generated_poc.collection.update_collection_list_response import (
UpdateCollectionListResponse,
)
from adcp.types.generated_poc.compliance.comply_test_controller_request import (
ComplyTestControllerRequest,
)
from adcp.types.generated_poc.compliance.comply_test_controller_response import (
ComplyTestControllerResponse,
)
from adcp.types.generated_poc.content_standards.calibrate_content_request import (
CalibrateContentRequest,
)
from adcp.types.generated_poc.content_standards.calibrate_content_response import (
CalibrateContentResponse,
)
# V3 Content Standards types
from adcp.types.generated_poc.content_standards.create_content_standards_request import (
CreateContentStandardsRequest,
)
from adcp.types.generated_poc.content_standards.create_content_standards_response import (
CreateContentStandardsResponse,
)
from adcp.types.generated_poc.content_standards.get_content_standards_request import (
GetContentStandardsRequest,
)
from adcp.types.generated_poc.content_standards.get_content_standards_response import (
GetContentStandardsResponse,
)
from adcp.types.generated_poc.content_standards.get_media_buy_artifacts_request import (
GetMediaBuyArtifactsRequest,
)
from adcp.types.generated_poc.content_standards.get_media_buy_artifacts_response import (
GetMediaBuyArtifactsResponse,
)
from adcp.types.generated_poc.content_standards.list_content_standards_request import (
ListContentStandardsRequest,
)
from adcp.types.generated_poc.content_standards.list_content_standards_response import (
ListContentStandardsResponse,
)
from adcp.types.generated_poc.content_standards.update_content_standards_request import (
UpdateContentStandardsRequest,
)
from adcp.types.generated_poc.content_standards.update_content_standards_response import (
UpdateContentStandardsResponse,
)
from adcp.types.generated_poc.content_standards.validate_content_delivery_request import (
ValidateContentDeliveryRequest,
)
from adcp.types.generated_poc.content_standards.validate_content_delivery_response import (
ValidateContentDeliveryResponse,
)
from adcp.types.generated_poc.core.async_response_data import AdcpAsyncResponseData
from adcp.types.generated_poc.creative.get_creative_features_request import (
GetCreativeFeaturesRequest,
)
from adcp.types.generated_poc.creative.get_creative_features_response import (
GetCreativeFeaturesResponse,
)
# V3 Governance (Property Lists) types
from adcp.types.generated_poc.governance.check_governance_request import (
CheckGovernanceRequest,
)
from adcp.types.generated_poc.governance.check_governance_response import (
CheckGovernanceResponse,
)
from adcp.types.generated_poc.governance.get_plan_audit_logs_request import (
GetPlanAuditLogsRequest,
)
from adcp.types.generated_poc.governance.get_plan_audit_logs_response import (
GetPlanAuditLogsResponse,
)
from adcp.types.generated_poc.governance.report_plan_outcome_request import (
ReportPlanOutcomeRequest,
)
from adcp.types.generated_poc.governance.report_plan_outcome_response import (
ReportPlanOutcomeResponse,
)
from adcp.types.generated_poc.governance.sync_plans_request import SyncPlansRequest
from adcp.types.generated_poc.governance.sync_plans_response import SyncPlansResponse
from adcp.types.generated_poc.property.create_property_list_request import (
CreatePropertyListRequest,
)
from adcp.types.generated_poc.property.create_property_list_response import (
CreatePropertyListResponse,
)
from adcp.types.generated_poc.property.delete_property_list_request import (
DeletePropertyListRequest,
)
from adcp.types.generated_poc.property.delete_property_list_response import (
DeletePropertyListResponse,
)
from adcp.types.generated_poc.property.get_property_list_request import (
GetPropertyListRequest,
)
from adcp.types.generated_poc.property.get_property_list_response import (
GetPropertyListResponse,
)
from adcp.types.generated_poc.property.list_property_lists_request import (
ListPropertyListsRequest,
)
from adcp.types.generated_poc.property.list_property_lists_response import (
ListPropertyListsResponse,
)
from adcp.types.generated_poc.property.update_property_list_request import (
UpdatePropertyListRequest,
)
from adcp.types.generated_poc.property.update_property_list_response import (
UpdatePropertyListResponse,
)
# V3 Protocol Discovery types
from adcp.types.generated_poc.protocol.get_adcp_capabilities_request import (
GetAdcpCapabilitiesRequest,
)
from adcp.types.generated_poc.protocol.get_adcp_capabilities_response import (
GetAdcpCapabilitiesResponse,
)
# V3 Sponsored Intelligence types
from adcp.types.generated_poc.sponsored_intelligence.si_get_offering_request import (
SiGetOfferingRequest,
)
from adcp.types.generated_poc.sponsored_intelligence.si_get_offering_response import (
SiGetOfferingResponse,
)
from adcp.types.generated_poc.sponsored_intelligence.si_initiate_session_request import (
SiInitiateSessionRequest,
)
from adcp.types.generated_poc.sponsored_intelligence.si_initiate_session_response import (
SiInitiateSessionResponse,
)
from adcp.types.generated_poc.sponsored_intelligence.si_send_message_request import (
SiSendMessageRequest,
)
from adcp.types.generated_poc.sponsored_intelligence.si_send_message_response import (
SiSendMessageResponse,
)
from adcp.types.generated_poc.sponsored_intelligence.si_terminate_session_request import (
SiTerminateSessionRequest,
)
from adcp.types.generated_poc.sponsored_intelligence.si_terminate_session_response import (
SiTerminateSessionResponse,
)
from adcp.types.generated_poc.tmp.context_match_request import ContextMatchRequest
from adcp.types.generated_poc.tmp.context_match_response import ContextMatchResponse
from adcp.types.generated_poc.tmp.identity_match_request import IdentityMatchRequest
from adcp.types.generated_poc.tmp.identity_match_response import IdentityMatchResponse
from adcp.utils.operation_id import create_operation_id
from adcp.validation.client_hooks import ValidationHookConfig
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class Checkpoint(TypedDict):
    """Persistable session-resume state for an A2A ``ADCPClient``.

    The minimal set of fields needed to reconnect to an in-flight A2A
    conversation after a process restart. Produced by
    ``ADCPClient.checkpoint()``; consumed by
    ``ADCPClient.from_checkpoint()``.

    Keys:
        agent_id: Binds the checkpoint to the agent that minted it, so a
            restore against the wrong ``AgentConfig`` fails loudly
            instead of sending Agent A's ids to Agent B.
        context_id: The A2A conversation id; ``None`` when no
            conversation has been established.
        active_task_id: The in-flight task the next message must echo;
            ``None`` if no task is pending.
    """

    # Id of the agent this checkpoint was minted for.
    agent_id: str
    # A2A conversation id, or None.
    context_id: str | None
    # Pending A2A task id to echo on the next send, or None.
    active_task_id: str | None
class ADCPClient:
"""Client for interacting with a single AdCP agent."""
def __init__(
    self,
    agent_config: AgentConfig,
    webhook_url_template: str | None = None,
    webhook_secret: str | None = None,
    on_activity: Callable[[Activity], None] | None = None,
    webhook_timestamp_tolerance: int = 300,
    capabilities_ttl: float = 3600.0,
    validate_features: bool = False,
    strict_idempotency: bool = False,
    signing: SigningConfig | None = None,
    context_id: str | None = None,
    validation: ValidationHookConfig | None = None,
    force_a2a_version: str | None = None,
    adcp_version: str | None = None,
) -> None:
    """
    Initialize ADCP client for a single agent.

    Args:
        agent_config: Agent configuration
        webhook_url_template: Template for webhook URLs with {agent_id},
            {task_type}, {operation_id}
        webhook_secret: Secret for webhook signature verification
        on_activity: Callback for activity events
        webhook_timestamp_tolerance: Maximum age (in seconds) for webhook
            timestamps. Webhooks with timestamps older than this or more than
            this far in the future are rejected. Defaults to 300 (5 minutes).
        capabilities_ttl: Time-to-live in seconds for cached capabilities
            (default: 1 hour)
        validate_features: When True, automatically check that the seller
            supports required features before making task calls (e.g.,
            sync_audiences requires audience_targeting). Requires
            capabilities to have been fetched first.
        strict_idempotency: When True, verify the seller declared
            ``adcp.idempotency.replay_ttl_seconds`` in capabilities before
            any mutating call. Fetches capabilities lazily on first use.
            Raises ``IdempotencyUnsupportedError`` if the declaration is
            missing — sellers that don't declare it provide no
            retry-safety guarantee per AdCP #2315. Defaults to False for
            backward compatibility.
        signing: Optional RFC 9421 request-signing config. When provided,
            the client automatically attaches ``Signature`` /
            ``Signature-Input`` / ``Content-Digest`` headers to operations
            the seller's ``request_signing`` capability lists in
            ``required_for``, ``warn_for``, or ``supported_for``. The
            seller's ``covers_content_digest`` policy determines whether
            the body is bound to the signature. Generate a key with
            ``adcp-keygen`` and publish the public JWK at your
            ``jwks_uri``. Supported on both A2A and MCP
            (``mcp_transport="streamable_http"``); SSE-transport MCP
            logs a warning and falls through unsigned.
        context_id: A2A-only. Seed the A2A conversation context. Pass a
            previously-returned ``context_id`` to resume a session
            across process restarts, or a self-assigned UUID to name
            the session with your own correlation key (the ADK server
            honors buyer-proposed ids). If omitted, the server mints
            one on the first message and this client auto-retains it
            for subsequent calls. An empty string is treated the same
            as omitting the argument. Read the current value via
            ``client.context_id``; call ``client.reset_context()`` to
            start a fresh conversation. Rule of thumb: one
            ``ADCPClient`` per A2A conversation — if a buyer has
            multiple concurrent briefs with the same agent, construct
            one client per brief rather than sharing.

            For HITL flows that can span a process restart mid-task,
            use ``checkpoint()`` / ``from_checkpoint()`` instead of
            persisting ``context_id`` alone — full resume state is
            both ``context_id`` AND ``active_task_id``.

            Raises ``TypeError`` if passed with a non-A2A protocol.
        validation: Schema-driven validation modes for outgoing
            requests and incoming responses against the bundled AdCP
            JSON schemas. Defaults (matching the TS port): requests
            in ``warn`` mode (drift logged but not blocked — partial
            payloads in error-path tests still work) and responses
            in ``strict`` mode (agent drift fails the task).
            ``ADCP_VALIDATION_MODE=strict|warn|off`` overrides both
            sides at call time (matches the TS port); ``ADCP_ENV``
            set to ``production`` / ``prod`` flips only the response
            default to ``warn``. Generic ``ENV`` / ``ENVIRONMENT`` /
            ``PYTHON_ENV`` are deliberately ignored — they collide
            with unrelated tooling. Storyboards and compliance
            runners that want hard-stop enforcement everywhere pass
            ``validation=ValidationHookConfig(requests="strict",
            responses="strict")``; high-throughput callers can set
            either side to ``"off"`` to skip the validator entirely
            with zero overhead.
        force_a2a_version: A2A-only. Pin the **A2A transport
            version** (e.g. ``"0.3"``, ``"1.0"``) by filtering the
            peer's advertised ``supported_interfaces`` to entries
            whose ``protocol_version`` matches. Not for AdCP
            protocol pinning — see ``adcp_version`` for that.
            Intended for tests or for forcing a 0.3-speaking path
            against a dual-advertising peer. Raises
            :class:`ADCPConnectionError` on the first call if no
            advertised interface matches. ``None`` (default) lets
            the SDK's ``ClientFactory`` pick the most capable
            transport the peer supports. Use
            :attr:`a2a_protocol_versions` to probe what a peer
            advertises before pinning.

            Raises ``TypeError`` if passed with a non-A2A protocol.
        adcp_version: AdCP protocol release this client speaks
            (release-precision string, e.g. ``"3.0"``, ``"3.1"``,
            ``"3.1-beta"``). Stripe-style per-instance pin: the
            value is sent as ``adcp_version`` on every outbound
            request once Stage 3 wires it through the validation
            hooks; today (Stage 2), it's plumbing only — stored on
            the instance and exposed via :meth:`get_adcp_version`,
            with no wire impact yet. ``None`` (default) resolves
            to the SDK's compile-time pin (``ADCP_VERSION``
            packaged with the wheel). Cross-major pins raise
            :class:`ConfigurationError` at construction; install
            the SDK major that targets your wire version instead.

            Patch-precision strings (``"3.0.1"``) and build
            metadata (``"3.0.1+canary"``) are accepted at construction
            but normalized to release-precision before wire emission
            per the spec — patches and build metadata are not part
            of the negotiation contract. ``get_adcp_version()``
            returns the normalized form.

            Caller-supplied ``adcp_version`` on a per-call params
            dict wins over the constructor pin: the enricher is
            the default, not an override. Once Stage 3 threads
            schema selection through, this becomes a supported
            per-call override; today it's plumbing-level only.

            Migration from ``adcp_major_version`` (legacy integer
            wire field): generated request types still expose
            ``adcp_major_version: int | None`` from the pre-#3493
            schema. Both fields will coexist on the wire through
            3.x; servers prefer the new ``adcp_version`` when both
            are present. Stop populating ``adcp_major_version`` on
            request models once your seller advertises 3.1 in
            ``supported_versions``.

    Raises:
        TypeError: If ``force_a2a_version`` or a non-empty
            ``context_id`` is supplied for a non-A2A agent.
        ValueError: If ``agent_config.protocol`` is neither A2A nor MCP.
    """
    # Resolve the version pin first so an invalid pin fails before any
    # adapter is constructed.
    self._adcp_version: str = resolve_adcp_version(adcp_version)

    self.agent_config = agent_config
    self.webhook_url_template = webhook_url_template
    self.webhook_secret = webhook_secret
    self.on_activity = on_activity
    self.webhook_timestamp_tolerance = webhook_timestamp_tolerance
    self.capabilities_ttl = capabilities_ttl
    self.validate_features = validate_features
    self.strict_idempotency = strict_idempotency
    self.signing = signing

    # Capabilities cache
    self._capabilities: GetAdcpCapabilitiesResponse | None = None
    self._feature_resolver: FeatureResolver | None = None
    self._capabilities_fetched_at: float | None = None
    self._idempotency_capability_verified: bool = False

    # Unique per-instance token so use_idempotency_key scopes to this
    # client and does not bleed to siblings (AdCP #2315 cross-seller risk).
    # Uses the module-level ``uuid4`` import; no local re-import needed.
    self._idempotency_client_token: str = uuid4().hex

    if force_a2a_version is not None and agent_config.protocol != Protocol.A2A:
        raise TypeError(
            f"force_a2a_version is only supported for A2A protocol; "
            f"got {agent_config.protocol}"
        )

    # Initialize protocol adapter
    self.adapter: ProtocolAdapter
    if agent_config.protocol == Protocol.A2A:
        self.adapter = A2AAdapter(agent_config, force_a2a_version=force_a2a_version)
    elif agent_config.protocol == Protocol.MCP:
        self.adapter = MCPAdapter(agent_config)
    else:
        raise ValueError(f"Unsupported protocol: {agent_config.protocol}")

    # Wire the per-instance hooks onto the adapter.
    self.adapter.idempotency_client_token = self._idempotency_client_token
    if strict_idempotency:
        self.adapter.idempotency_capability_check = self._ensure_idempotency_capability
    if signing is not None:
        self.adapter.signing_request_hook = self._sign_outgoing_request

    # Apply schema validation modes (default: requests=warn, responses=strict
    # in dev/test, warn in production — see ``ValidationHookConfig`` docs).
    self.adapter.configure_validation(validation)

    # Auto-inject the per-instance ``adcp_version`` pin into every
    # outbound request envelope. Caller-supplied values on the
    # request object win — the enricher is the default, not an
    # override — so per-call overrides remain available once the
    # generated request types declare the field.
    _pinned_version = self._adcp_version

    def _inject_adcp_version(params: dict[str, Any]) -> dict[str, Any]:
        # ``**params`` comes last so a caller-supplied key overrides the pin.
        return {"adcp_version": _pinned_version, **params}

    self.adapter.envelope_enricher = _inject_adcp_version

    if context_id:
        # Empty string is treated as "not provided" — callers using
        # ``context_id=os.getenv("...") or ""`` patterns shouldn't
        # silently seed an empty id on the wire.
        if not isinstance(self.adapter, A2AAdapter):
            raise TypeError(
                f"context_id is only supported for A2A protocol; "
                f"got {agent_config.protocol}"
            )
        self.adapter.set_context_id(context_id)

    # Initialize simple API accessor (lazy import to avoid circular dependency)
    from adcp.simple import SimpleAPI

    self.simple = SimpleAPI(self)
def get_adcp_version(self) -> str:
    """Report which AdCP protocol release this client is pinned to.

    The value is fixed when the client is constructed: it comes from
    the ``adcp_version`` keyword argument, or — when the caller did not
    pin explicitly — from the SDK's compile-time pin (``ADCP_VERSION``
    packaged with the wheel). The pin is per-instance rather than
    per-call, so the same string is returned for the client's lifetime.

    The ``adcp_version`` parameter of ``__init__`` documents the full
    semantics, including the cross-major fence and the Stage 2 vs
    Stage 3 distinction (today the pin is plumbing only; Stage 3
    threads it through schema/validator selection).
    """
    pinned_release = self._adcp_version
    return pinned_release
@property
def context_id(self) -> str | None:
    """The A2A conversation ``context_id`` this client currently holds.

    This is either the id assigned by the A2A server (auto-captured
    from the most recent response) or the one seeded through the
    constructor or ``reset_context()``. ``None`` is returned before the
    first A2A call of a fresh conversation, and always for clients on
    non-A2A protocols. Reads are deliberately lenient across protocols
    so generic code can write ``if client.context_id: ...``; writes
    (the constructor kwarg and ``reset_context``) raise on non-A2A
    because the operation has no meaning there.

    Not safe for concurrent calls on the same client — the adapter
    mutates this on every response. Rule of thumb: one ADCPClient per
    A2A conversation.

    For simple completed-task resume, persist this value and hand it
    back via ``ADCPClient(context_id=...)``. HITL flows that may
    restart mid-``input-required`` should use ``checkpoint()`` /
    ``from_checkpoint()`` instead, because full resume state is both
    this id AND ``active_task_id``.
    """
    adapter = self.adapter
    if not isinstance(adapter, A2AAdapter):
        # Non-A2A protocols have no conversation context.
        return None
    return adapter.context_id
@property
def active_task_id(self) -> str | None:
    """The A2A task id that the next send must echo to resume a task.

    Populated when the most recent A2A response was non-terminal
    (``input-required``, ``working``, ``submitted``,
    ``auth-required``); the adapter echoes it on the next outbound
    message so the server continues the same task. It clears
    automatically once the task reaches a terminal state.

    Resuming after a process restart needs *both* ``context_id`` and
    ``active_task_id`` — persist both (or use ``checkpoint()``) so a
    restart mid-HITL does not orphan the task.

    Always ``None`` for non-A2A clients.
    """
    adapter = self.adapter
    return adapter.active_task_id if isinstance(adapter, A2AAdapter) else None
@property
def a2a_protocol_versions(self) -> list[str] | None:
    """Sorted A2A ``protocol_version`` strings advertised by the peer.

    Filled in lazily, after the first operation that fetches the
    peer's ``AgentCard`` (``fetch_capabilities``, ``list_tools``,
    ``get_agent_info``, or any skill-call). Until the card has been
    fetched the value is ``None``, which lets callers tell "not yet
    known" apart from "peer advertises nothing" (an empty list).
    Non-A2A clients always get ``None``.

    Handy for probing which wire version a peer speaks — buyers that
    run alongside both 0.3-era and 1.0-era agents can use it to
    confirm what they are talking to.
    """
    adapter = self.adapter
    if not isinstance(adapter, A2AAdapter):
        return None
    return adapter.a2a_protocol_versions
def reset_context(self, context_id: str | None = None) -> None:
    """Start a new A2A conversation on this client.

    Passing ``None`` (default) clears the current context so the
    server mints a fresh one on the next call. Passing a string uses
    it as the new conversation id — useful for resuming a specific
    prior session or for naming the conversation with your own
    correlation key. Note: some servers (notably ADK) rewrite
    client-supplied ids into their own session format; the client
    auto-adopts the rewritten id on the next response.

    An empty string is treated the same as ``None``, matching the
    constructor's ``context_id`` handling, so an empty id is never
    seeded on the wire.

    Also clears any active_task_id — starting a new conversation
    discards any in-flight task on the old one.

    Args:
        context_id: New conversation id, or ``None`` / ``""`` to let
            the server mint one.

    Raises:
        TypeError: When called on a non-A2A client.
    """
    if not isinstance(self.adapter, A2AAdapter):
        raise TypeError(
            f"reset_context is only supported for A2A protocol; "
            f"got {self.agent_config.protocol}"
        )
    # Normalize "" to None so callers using ``os.getenv("...") or ""``
    # patterns clear the context instead of sending an empty id.
    self.adapter.set_context_id(context_id or None)
def checkpoint(self) -> Checkpoint:
    """Capture the minimal state required to resume this A2A session.

    Resuming a HITL / multi-turn flow needs *both* ``context_id``
    (which conversation) AND ``active_task_id`` (which in-flight task
    to echo). Persisting ``context_id`` alone reconnects to the right
    conversation but orphans the pending task server-side: the next
    send opens a new task under the same context and the original
    ``input-required`` task is abandoned.

    ``agent_id`` is included so that a later ``from_checkpoint`` call
    made with a different ``AgentConfig`` fails loudly rather than
    sending one agent's session ids to another.

    Pair with ``ADCPClient.from_checkpoint(agent_config, state)``.

    On non-A2A clients a fully-populated ``Checkpoint`` is still
    returned, with ``context_id``/``active_task_id`` set to ``None``,
    so generic persist-and-restore code need not branch on protocol.
    """
    # A TypedDict is a plain dict at runtime, so a literal is equivalent
    # to calling ``Checkpoint(...)``.
    snapshot: Checkpoint = {
        "agent_id": self.agent_config.id,
        "context_id": self.context_id,
        "active_task_id": self.active_task_id,
    }
    return snapshot
@classmethod
def from_checkpoint(
    cls,
    agent_config: AgentConfig,
    state: Checkpoint,
    **kwargs: Any,
) -> ADCPClient:
    """Build an ADCPClient from state saved by ``checkpoint()``.

    Both ``context_id`` and ``active_task_id`` are restored, so a
    process restart mid-``input-required`` resumes the same task
    instead of orphaning it. The checkpoint carries session-resume
    state only; operational config (signing, strict_idempotency, etc.)
    is supplied again by the caller through the same keyword arguments
    ``__init__`` accepts.

    Args:
        agent_config: Configuration of the agent to reconnect to.
        state: Mapping produced by ``checkpoint()``. An empty or absent
            checkpoint round-trips cleanly on any protocol.
        **kwargs: Forwarded unchanged to ``__init__``.

    Raises:
        ValueError: If the checkpoint's ``agent_id`` differs from
            ``agent_config.id`` — a checkpoint minted for Agent A must
            not be restored onto Agent B, or the client would leak
            Agent A's opaque session ids to Agent B on the next message.
        TypeError: On a non-A2A ``agent_config`` when the checkpoint
            carries a non-empty ``context_id`` or ``active_task_id`` —
            resume state on a protocol that cannot use it would be
            silently dropped, masking bugs.
    """
    # Treat a missing/empty checkpoint as one with no fields set.
    saved = state if state else {}

    minted_for = saved.get("agent_id")
    if minted_for and minted_for != agent_config.id:
        raise ValueError(
            f"checkpoint was minted for agent {minted_for!r}, "
            f"cannot restore against {agent_config.id!r}"
        )

    resume_context = saved.get("context_id")
    pending_task = saved.get("active_task_id")

    if pending_task and agent_config.protocol != Protocol.A2A:
        raise TypeError(
            f"active_task_id in checkpoint is only supported for A2A "
            f"protocol; got {agent_config.protocol}"
        )

    # The constructor itself rejects a non-empty context_id on non-A2A.
    restored = cls(agent_config, context_id=resume_context, **kwargs)
    if pending_task and isinstance(restored.adapter, A2AAdapter):
        restored.adapter._restore_active_task_id(pending_task)
    return restored
@classmethod
def from_mcp_client(
    cls,
    client: ClientSession,
    *,
    agent_id: str | None = None,
    validation: ValidationHookConfig | None = None,
    capabilities_ttl: float = 3600.0,
    validate_features: bool = False,
    strict_idempotency: bool = False,
) -> ADCPClient:
    """Wrap an already-connected MCP ``ClientSession`` in an ``ADCPClient``.

    Mirrors JS ``AgentClient.fromMCPClient()`` (v5.19.0). Intended mainly
    for compliance test fleets that need a full ``ADCPClient`` talking to
    an in-process MCP server, with no loopback HTTP server in between.

    Warning:
        The caller keeps ownership of the injected session. ``close()``
        and the ``async with`` ``__aexit__`` of the returned client are
        **no-ops**, so ``async with ADCPClient.from_mcp_client(...) as c:``
        does NOT clean up the session — it leaks unless the caller closes
        it.

        Webhook delivery and ``on_activity`` callbacks are **not wired**
        on the in-process path: there is no HTTP transport for the seller
        to call back through. They are deliberately missing from this
        signature — don't try to pass them.

        The factory never calls ``session.initialize()``. If the session
        was not initialized, the first tool call surfaces as an opaque
        MCP protocol error in ``TaskResult.error`` — verify before
        calling.

    Scope the transport and the session with your own
    ``AsyncExitStack``::

        import contextlib
        from mcp import ClientSession
        from mcp.shared.memory import create_client_server_memory_streams

        async with contextlib.AsyncExitStack() as stack:
            (c_read, c_write), (s_read, s_write) = await stack.enter_async_context(
                create_client_server_memory_streams()
            )
            # wire your in-process server to (s_read, s_write) here
            session = await stack.enter_async_context(
                ClientSession(c_read, c_write)
            )
            await session.initialize()
            # close() is a no-op on injected sessions, so the ADCPClient
            # itself does not need stack.enter_async_context.
            adcp_client = ADCPClient.from_mcp_client(session, agent_id="test-seller")
            result = await adcp_client.get_products(GetProductsRequest(...))

    Note:
        Request signing is not supported on the injected-session path:
        the signing hook lives in the HTTP transport layer, which is
        bypassed here. ``signing=`` is intentionally not a parameter.

    Args:
        client: A pre-connected ``mcp.ClientSession`` whose
            ``initialize()`` has already been awaited.
        agent_id: Identifier for the wrapped agent, used in log messages
            and error objects. Defaults to a unique
            ``in-process-XXXXXXXX`` token; pass an explicit value when
            several in-process agents run concurrently so their log
            lines can be told apart.
        validation: Schema-validation modes (same as ``__init__``).
        capabilities_ttl: TTL for the capability cache in seconds
            (same as ``__init__``).
        validate_features: Gate tool calls on fetched capability
            declarations (same as ``__init__``).
        strict_idempotency: Verify the seller declared idempotency
            support before each mutating call (same as ``__init__``).

    Returns:
        A fully configured ``ADCPClient`` backed by the injected session.
    """
    if agent_id is None:
        # No caller-supplied id: mint a short random suffix so that
        # concurrently created in-process clients get distinct ids.
        resolved_id = f"in-process-{uuid4().hex[:8]}"
    else:
        resolved_id = agent_id

    placeholder_config = AgentConfig(
        id=resolved_id,
        # The RFC 2606 ``.invalid`` TLD satisfies the http:// validator
        # yet can never route to a real host, and it reads clearly when
        # it shows up in an error message.
        agent_uri="http://in-process.invalid",
        protocol=Protocol.MCP,
    )
    wrapped = cls(
        placeholder_config,
        validation=validation,
        strict_idempotency=strict_idempotency,
        validate_features=validate_features,
        capabilities_ttl=capabilities_ttl,
    )

    adapter = wrapped.adapter
    if isinstance(adapter, MCPAdapter):
        # Hand the caller-owned session to the adapter and we are done.
        adapter._inject_session(client)
        return wrapped
    raise RuntimeError(  # pragma: no cover
        f"from_mcp_client: expected MCPAdapter but got {type(adapter).__name__}"
    )
async def _ensure_idempotency_capability(self) -> None:
    """Verify the seller positively declares idempotency support in capabilities.

    Called before every mutating request when ``strict_idempotency=True``.
    Fetches capabilities on first invocation; subsequent calls are no-ops
    once the declaration has been observed.

    ``_idempotency_capability_verified`` is set to ``True`` BEFORE calling
    ``fetch_capabilities`` so any recursive dispatch through the adapter
    terminates (``get_adcp_capabilities`` is non-mutating, so it would
    short-circuit anyway — but this guard protects against future refactors
    that might add it to the mutating set). If verification does not
    complete — for ANY reason, including task cancellation — the flag is
    rolled back to ``False`` so the next mutating call verifies again.

    Raises:
        IdempotencyUnsupportedError: When ``adcp.idempotency`` is missing,
            declares ``supported=False`` (seller does not dedupe — naive
            retry would double-process), or does not provide a
            ``replay_ttl_seconds`` window.
        BaseException: Anything raised by ``fetch_capabilities`` is
            re-raised unchanged after the flag has been rolled back.
    """
    from adcp.exceptions import IdempotencyUnsupportedError

    if self._idempotency_capability_verified:
        return
    # Optimistically mark as verified to break recursion (see docstring);
    # rolled back below if verification does not run to completion.
    self._idempotency_capability_verified = True
    try:
        caps = await self.fetch_capabilities()
        adcp_info = getattr(caps, "adcp", None)
        idempotency_info = getattr(adcp_info, "idempotency", None) if adcp_info else None
        if idempotency_info is None:
            raise IdempotencyUnsupportedError(
                agent_id=self.agent_config.id,
                agent_uri=self.agent_config.agent_uri,
                reason="seller did not declare adcp.idempotency",
            )
        supported = getattr(idempotency_info, "supported", None)
        if supported is False:
            raise IdempotencyUnsupportedError(
                agent_id=self.agent_config.id,
                agent_uri=self.agent_config.agent_uri,
                reason="seller declared adcp.idempotency.supported=false",
            )
        ttl = getattr(idempotency_info, "replay_ttl_seconds", None)
        if ttl is None:
            raise IdempotencyUnsupportedError(
                agent_id=self.agent_config.id,
                agent_uri=self.agent_config.agent_uri,
                reason=(
                    "seller declared adcp.idempotency.supported=true but omitted "
                    "replay_ttl_seconds"
                ),
            )
    except BaseException:
        # BaseException, not Exception: asyncio.CancelledError derives from
        # BaseException, so a cancelled or timed-out capability fetch would
        # otherwise leave the flag stuck at True and every later mutating
        # call would silently skip the strict-idempotency check.
        self._idempotency_capability_verified = False
        raise
async def _sign_outgoing_request(self, request: httpx.Request) -> None:
    """Attach RFC 9421 signature headers to an outgoing httpx request.

    This is an httpx request event hook, installed on the protocol
    adapter's httpx client when ``ADCPClient`` was given a
    ``SigningConfig``. It looks up the seller's advertised
    ``request_signing`` capability and signs only operations the seller
    listed in ``required_for``, ``warn_for``, or ``supported_for``.
    Everything else — including the agent-card fetch and
    ``get_adcp_capabilities`` itself — goes out unsigned.

    Whether the body is bound to the signature follows the seller's
    ``covers_content_digest`` tri-state.

    Args:
        request: The request about to be sent; its headers are updated
            in place when the operation is signed.
    """
    if self.signing is None:
        return

    current_op = _signing_current_operation.get()
    # Two cases return here without ever fetching capabilities:
    #
    # * ContextVar unset → an out-of-band call (agent-card fetch, session
    #   initialize, etc).
    # * get_adcp_capabilities → bootstrap carve-out. Signing it would need
    #   the very capabilities it is fetching, and a pathological seller
    #   that listed it in its own required_for would make us recurse.
    #
    # Keep the carve-out narrow: only operations strictly needed to
    # *obtain* capabilities belong in it. Today that is just
    # get_adcp_capabilities; a future adapter that adds another
    # capabilities-precondition op MUST extend this guard.
    if current_op is None or current_op == "get_adcp_capabilities":
        return

    capabilities = await self.fetch_capabilities()
    signing_caps = getattr(capabilities, "request_signing", None)

    # Surface a contradictory seller config. supported=False means
    # "signatures are ignored", so also populating required_for/warn_for
    # makes no sense. The classifier below still skips (matching verifier
    # behavior), but staying silent would hide a config bug that will
    # bite pilots.
    if signing_caps is not None and not signing_caps.supported:
        if signing_caps.required_for or signing_caps.warn_for:
            logger.warning(
                "Seller %s advertises request_signing.supported=false but "
                "populates required_for/warn_for — treating as unsupported "
                "per spec. Verify the seller's capability advertisement.",
                self.agent_config.id,
            )

    if operation_needs_signing(signing_caps, current_op) == "skip":
        return

    digest_policy: str | None = None
    if signing_caps is not None:
        digest_setting = signing_caps.covers_content_digest
        if digest_setting is not None:
            digest_policy = digest_setting.value
    # Only an explicit "forbidden" turns the content digest off.
    # "required" demands it, and for "either"/absent the choice is the
    # signer's — we default to the stricter option.
    bind_body = digest_policy != "forbidden"

    payload = request.content
    signing_cfg = self.signing
    signature = sign_request(
        method=request.method,
        url=str(request.url),
        headers=dict(request.headers),
        body=payload,
        private_key=signing_cfg.private_key,
        key_id=signing_cfg.key_id,
        alg=signing_cfg.alg,
        cover_content_digest=bind_body,
        tag=signing_cfg.tag,
    )

    # Drop any existing value before writing ours, so the signed headers
    # win even if another hook or an earlier layer already set one with
    # the same name. httpx headers are a case-insensitive MultiDict, so a
    # naive assignment could leave a duplicate value in a different case.
    for name, value in signature.as_dict().items():
        request.headers.pop(name, None)
        request.headers[name] = value
def get_webhook_url(self, task_type: str, operation_id: str) -> str:
    """Render the configured webhook URL template for one task.

    The template is filled via ``str.format`` with the keyword fields
    ``agent_id`` (this client's agent id), ``task_type`` and
    ``operation_id``.

    Args:
        task_type: Value substituted for ``{task_type}``.
        operation_id: Value substituted for ``{operation_id}``.

    Returns:
        The formatted webhook URL.

    Raises:
        ValueError: If no ``webhook_url_template`` is configured.
    """
    template = self.webhook_url_template
    if template:
        substitutions = {
            "agent_id": self.agent_config.id,
            "task_type": task_type,
            "operation_id": operation_id,
        }
        return template.format(**substitutions)
    raise ValueError("webhook_url_template not configured")
def _emit_activity(self, activity: Activity) -> None:
    """Pass ``activity`` to the ``on_activity`` callback when one is set.

    Does nothing if ``self.on_activity`` is falsy (e.g. ``None``).

    Args:
        activity: The activity event handed to the callback unchanged.
    """
    if not self.on_activity:
        return
    self.on_activity(activity)
@contextlib.contextmanager
def use_idempotency_key(self, key: str) -> Iterator[str]:
"""Pin an ``idempotency_key`` for the next mutating call on THIS client.
Use when you've persisted a key (e.g., in a buyer-side database) and
want the SDK to send that exact key on resume or retry across process
restarts. The key is validated against ``^[A-Za-z0-9_.:-]{16,255}$`` on
entry; a ``ValueError`` is raised for malformed keys.
Scope rules:
* **Single-use within scope.** The first mutating call inside the
``with`` block consumes the pinned key; a second mutating call falls
through to a fresh UUID. This protects against ``asyncio.gather``
siblings accidentally sharing the key (which would trigger
``IDEMPOTENCY_CONFLICT`` or silently duplicate work). If you need to
retry, wrap each attempt in its own ``with`` block.
* **Client-scoped.** The pinned key applies only to calls on THIS
client. A mutating call on a sibling ``ADCPClient`` inside the same
``with`` block generates a fresh key and emits a ``UserWarning`` —
keys must be unique per (seller, request) pair (AdCP #2315).
* **No nesting.** Nested ``use_idempotency_key`` on the same client
raises ``RuntimeError``.
Example::
with client.use_idempotency_key(campaign.stored_key):
result = await client.create_media_buy(request)
"""
from adcp import _idempotency