From 9e79106c6ad547b45bbbc296671f08df51543652 Mon Sep 17 00:00:00 2001 From: Brian O'Kelley Date: Thu, 30 Apr 2026 10:09:12 -0400 Subject: [PATCH 1/2] feat(signing): SigningProvider Protocol for KMS-backed signing (#283) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Decouples the RFC 9421 request-signing profile from in-process key storage so KMS / HSM / Vault deployments can plug in without forking the signer or pulling private material out of the managed store at boot. * `SigningProvider` Protocol — async `sign(signature_base) -> bytes`, `key_id() -> str`, `algorithm() -> Literal["ed25519","ecdsa-p256-sha256"]`. Contract docstring covers lazy init, public-key type-checking, rotation tripwire, key separation by `adcp_use`, and fingerprint redaction. * `InMemorySigningProvider` — default adapter; validates Ed25519 vs. EC, EC curve = SECP256R1, and key_id non-empty at construction. * `async_sign_request(provider=...)` — async entry point sharing the canonicalization spine (`_prepare_signature`) with the sync `sign_request`, so both paths produce byte-identical Signature-Input. * `pem_to_adcp_jwk(pem, *, kid, purpose, password=None)` — derives the public JWK for KMS adopters whose private half never leaves the managed store. Accepts both PKCS#8 private and SPKI public PEMs. * RFC 8941 §3.3.3 escaping fix at `signer.py:_escape_sf_string` — applies to `keyid`, `nonce`, `tag`. Rejects characters outside printable ASCII 0x20-0x7E to close header-injection / parser- divergence vectors at non-httpx integrators. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/adcp/signing/__init__.py | 13 +- src/adcp/signing/keygen.py | 144 +++++-- src/adcp/signing/provider.py | 248 +++++++++++ src/adcp/signing/signer.py | 183 +++++++- .../signing/test_signing_provider.py | 403 ++++++++++++++++++ 5 files changed, 946 insertions(+), 45 deletions(-) create mode 100644 src/adcp/signing/provider.py create mode 100644 tests/conformance/signing/test_signing_provider.py diff --git a/src/adcp/signing/__init__.py b/src/adcp/signing/__init__.py index 4d85fe391..6e90042db 100644 --- a/src/adcp/signing/__init__.py +++ b/src/adcp/signing/__init__.py @@ -181,12 +181,17 @@ verify_detached_jws, verify_jws_document, ) -from adcp.signing.keygen import generate_signing_keypair +from adcp.signing.keygen import generate_signing_keypair, pem_to_adcp_jwk from adcp.signing.middleware import ( unauthorized_response_headers, verify_flask_request, verify_starlette_request, ) +from adcp.signing.provider import ( + InMemorySigningProvider, + SigningAlgorithm, + SigningProvider, +) from adcp.signing.replay import InMemoryReplayStore, ReplayStore from adcp.signing.revocation import RevocationChecker, RevocationList from adcp.signing.revocation_fetcher import ( @@ -205,6 +210,7 @@ ) from adcp.signing.signer import ( SignedHeaders, + async_sign_request, sign_request, ) from adcp.signing.verifier import ( @@ -259,6 +265,7 @@ def __init__(self, *args: object, **kwargs: object) -> None: "DEFAULT_TAG", "FetchResult", "InMemoryReplayStore", + "InMemorySigningProvider", "IpPinnedTransport", "JwksResolver", "JwsError", @@ -299,8 +306,10 @@ def __init__(self, *args: object, **kwargs: object) -> None: "SignatureInputLabel", "SignatureVerificationError", "SignedHeaders", + "SigningAlgorithm", "SigningConfig", "SigningDecision", + "SigningProvider", "StaticJwksResolver", "VerifiedSigner", "VerifierCapability", @@ -310,6 +319,7 @@ def __init__(self, *args: object, **kwargs: object) -> None: "as_async_resolver", "async_default_jwks_fetcher", "async_default_revocation_list_fetcher", + "async_sign_request", "averify_detached_jws", "averify_jws_document", "b64url_decode", @@ -330,6 +340,7 @@ def __init__(self, *args: object, **kwargs: object) -> None: "load_private_key_pem", "operation_needs_signing", "parse_signature_input_header", + "pem_to_adcp_jwk", "private_key_from_jwk", "public_key_from_jwk", "resolve_and_validate_host", diff --git a/src/adcp/signing/keygen.py b/src/adcp/signing/keygen.py index 3db3a57fd..6807b02a9 100644 --- a/src/adcp/signing/keygen.py +++ b/src/adcp/signing/keygen.py @@ -53,7 +53,7 @@ from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import ec, ed25519 -from adcp.signing.crypto import ALG_ED25519, ALG_ES256, b64url_encode +from adcp.signing.crypto import ALG_ED25519, ALG_ES256, b64url_encode, load_private_key_pem def _encryption_algorithm( @@ -67,21 +67,14 @@ def _encryption_algorithm( _ADCP_USE_VALUES = ("request-signing", "webhook-signing") -def generate_ed25519( - kid: str, passphrase: bytes | None = None, adcp_use: str = "request-signing" -) -> tuple[bytes, dict[str, Any]]: - private = ed25519.Ed25519PrivateKey.generate() - pem = private.private_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.PKCS8, - encryption_algorithm=_encryption_algorithm(passphrase), - ) - public = private.public_key() - x = public.public_bytes( +def _public_jwk_ed25519( + public_key: ed25519.Ed25519PublicKey, *, kid: str, adcp_use: str +) -> dict[str, Any]: + x = public_key.public_bytes( encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw, ) - jwk = { + return { "kty": "OKP", "crv": "Ed25519", "alg": "EdDSA", @@ -91,20 +84,13 @@ def generate_ed25519( "kid": kid, "x": b64url_encode(x), } - return pem, jwk -def generate_es256( - kid: str, passphrase: bytes | None = None, adcp_use: str = "request-signing" -) -> tuple[bytes, dict[str, Any]]: - private = ec.generate_private_key(ec.SECP256R1()) - pem = private.private_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.PKCS8, - encryption_algorithm=_encryption_algorithm(passphrase), - ) - numbers = private.public_key().public_numbers() - jwk = { +def _public_jwk_es256( + public_key: ec.EllipticCurvePublicKey, *, kid: str, adcp_use: str +) -> dict[str, Any]: + numbers = public_key.public_numbers() + return { "kty": "EC", "crv": "P-256", "alg": "ES256", @@ -115,9 +101,117 @@ def generate_es256( "x": b64url_encode(numbers.x.to_bytes(32, "big")), "y": b64url_encode(numbers.y.to_bytes(32, "big")), } + + +def generate_ed25519( + kid: str, passphrase: bytes | None = None, adcp_use: str = "request-signing" +) -> tuple[bytes, dict[str, Any]]: + private = ed25519.Ed25519PrivateKey.generate() + pem = private.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=_encryption_algorithm(passphrase), + ) + jwk = _public_jwk_ed25519(private.public_key(), kid=kid, adcp_use=adcp_use) + return pem, jwk + + +def generate_es256( + kid: str, passphrase: bytes | None = None, adcp_use: str = "request-signing" +) -> tuple[bytes, dict[str, Any]]: + private = ec.generate_private_key(ec.SECP256R1()) + pem = private.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=_encryption_algorithm(passphrase), + ) + jwk = _public_jwk_es256(private.public_key(), kid=kid, adcp_use=adcp_use) return pem, jwk +def pem_to_adcp_jwk( + pem: bytes, + *, + kid: str, + purpose: Literal["request-signing", "webhook-signing"], + password: bytes | None = None, +) -> dict[str, Any]: + """Derive the public JWK for an existing AdCP signing PEM. + + Companion to :func:`generate_signing_keypair` for the case where + the key was minted elsewhere — typically in a managed key store + (KMS / HSM / Vault) that exports the public half as a PEM. The + output JWK is byte-shape-identical to what + :func:`generate_signing_keypair` would have produced for the same + key material, so it is safe to publish at the agent's + ``jwks_uri`` directly. + + Why a helper at all? Three fields in the JWK are easy to mis-emit + by hand and every wrong value yields a verifier rejection at the + first signed request: + + * ``alg`` — MUST be ``"EdDSA"`` for Ed25519, ``"ES256"`` for P-256 + (NOT the RFC 9421 ``alg`` casing used in ``Signature-Input``). + * ``adcp_use`` — required by AdCP #2423; verifiers reject keys + lacking it. MUST match the signing surface (``"request-signing"`` + vs. ``"webhook-signing"``). + * ``key_ops`` — MUST be ``["verify"]`` (the public half cannot + sign). + + :param pem: PEM-encoded private key (PKCS#8). Pass the PEM only + when the private half is at hand — for KMS deployments where + the private material never leaves the managed store, pass an + SPKI public-key PEM (``-----BEGIN PUBLIC KEY-----``) instead; + the loader handles both forms. + :param kid: JWK ``kid`` to embed. MUST match the value the signer + will advertise via :meth:`SigningProvider.key_id`. + :param purpose: Which AdCP signing profile this key is for. Sets + ``adcp_use``. Generate distinct keys per purpose — sharing + material across request-signing and webhook-signing is a spec + violation, not just a convention. + :param password: Passphrase if ``pem`` is an encrypted private + key. + + :returns: Public JWK ready to publish in the agent's ``jwks_uri``. + The private scalar (``d``) is NEVER included in the output. + + :raises ValueError: ``purpose`` is not in + ``("request-signing", "webhook-signing")``; the PEM is not + Ed25519 or ECDSA-P-256; the EC curve is not P-256. + """ + if purpose not in _ADCP_USE_VALUES: + raise ValueError(f"purpose must be one of {_ADCP_USE_VALUES}, got {purpose!r}") + if not kid: + raise ValueError("kid must be a non-empty string") + + # SPKI public-key PEMs use the exact header `-----BEGIN PUBLIC KEY-----`; + # private-key PEMs use a different header. Match the full sentinel rather + # than a substring so a future PEM type whose header contains the words + # "PUBLIC" + "KEY" (e.g., a hypothetical encrypted-public-key form) + # doesn't silently dispatch to the wrong loader. + if b"-----BEGIN PUBLIC KEY-----" in pem[:128]: + loaded = serialization.load_pem_public_key(pem) + if not isinstance(loaded, (ed25519.Ed25519PublicKey, ec.EllipticCurvePublicKey)): + raise ValueError( + f"unsupported public key type {type(loaded).__name__} — " + f"AdCP signing accepts Ed25519 or ECDSA-P-256 only" + ) + if isinstance(loaded, ec.EllipticCurvePublicKey) and not isinstance( + loaded.curve, ec.SECP256R1 + ): + raise ValueError( + f"EC public key curve {loaded.curve.name} is not supported — only " + f"P-256 (SECP256R1) is allowed" + ) + public_key: ed25519.Ed25519PublicKey | ec.EllipticCurvePublicKey = loaded + else: + public_key = load_private_key_pem(pem, password=password).public_key() + + if isinstance(public_key, ed25519.Ed25519PublicKey): + return _public_jwk_ed25519(public_key, kid=kid, adcp_use=purpose) + return _public_jwk_es256(public_key, kid=kid, adcp_use=purpose) + + def _default_kid(alg: str) -> str: """Default ``kid`` — opaque, collision-resistant. diff --git a/src/adcp/signing/provider.py b/src/adcp/signing/provider.py new file mode 100644 index 000000000..2342eb859 --- /dev/null +++ b/src/adcp/signing/provider.py @@ -0,0 +1,248 @@ +"""SigningProvider abstraction for external key management (KMS / HSM / Vault). + +The pure :func:`adcp.signing.sign_request` primitive accepts a private +key already loaded into process memory. That is the right shape for +tests, the ``adcp-keygen`` PEM path, and small deployments — but it +forces operators who store key material in AWS KMS, GCP KMS, Azure Key +Vault, or Vault Transit to either fork the signer or pull the private +half out of the managed key store at boot, which defeats the point of +using one. + +This module defines a Protocol that decouples the request-signing +profile from how the key is held. The default :class:`InMemorySigningProvider` +matches the existing in-memory path; KMS adapters implement the same +Protocol by calling the provider's signing API in :meth:`SigningProvider.sign`. + +Companion API: :func:`adcp.signing.async_sign_request` accepts a +``SigningProvider`` and is the entry point for KMS-backed signing. The +sync :func:`adcp.signing.sign_request` continues to take the +in-memory ``private_key`` directly — it does not call into a network +KMS. + +Adapter contract +================ + +1. **Lazy init, not eager.** Do not warm the KMS client (fetch the + public key, ping the API, etc.) before the calling process has + bound its listener. gRPC retries inside KMS clients can block + indefinitely; the process never opens its port; an infra + health-check times out without the underlying KMS error surfacing. + Defer any KMS contact to the first :meth:`SigningProvider.sign` + call. Cache success aggressively, but never cache the error result + of a failed warm-up — retry on the next call. + +2. **Type-check the public key against** :meth:`SigningProvider.algorithm`. + Adapters that fetch their public key (for JWKS publication or + rotation tripwires) MUST verify the returned key type matches the + algorithm they advertise. A KMS cryptoKeyVersion that lands on a + P-256 curve when the adapter advertises ``ed25519`` will produce + signatures every verifier rejects. + +3. **Tripwire on rotation.** Managed key stores can silently rotate + the underlying material (the cryptoKeyVersion changes; the + resource name does not). Commit the expected SPKI bytes alongside + code; assert at process start that the provider returns the same + bytes. Mismatch should fail loudly rather than emit signatures no + verifier will accept. + +4. **Distinct key material per** ``adcp_use``. The AdCP signing + profile requires distinct keys per signing purpose — + request-signing, webhook-signing — not just RFC 9421 ``tag`` + isolation. Verifiers enforce the JWK ``adcp_use`` claim per + AdCP #2423. A ``SigningProvider`` instance is bound to ONE purpose; + sharing one provider across request-signing and webhook-signing + emission silently fails at first delivery against a strict-mode + verifier. + +5. **Fingerprint logging caveat.** Cloud-KMS resource names typically + embed the project / account ID. If your adapter exposes a + fingerprint for cache disambiguation, document that it MUST be + redacted before logging, or expose a separate accessor that returns + a redacted form. Raw resource names in observability pipelines leak + the project ID. +""" + +from __future__ import annotations + +from typing import Literal, Protocol, runtime_checkable + +from cryptography.hazmat.primitives.asymmetric import ec, ed25519 + +from adcp.signing.crypto import ( + ALG_ED25519, + ALLOWED_ALGS, + PrivateKey, + sign_signature_base, +) + +#: The algorithm identifiers an AdCP-conformant SigningProvider may +#: advertise. Casing matches :data:`adcp.signing.ALLOWED_ALGS` and the +#: ``alg`` parameter that ends up in the ``Signature-Input`` header. +SigningAlgorithm = Literal["ed25519", "ecdsa-p256-sha256"] + + +@runtime_checkable +class SigningProvider(Protocol): + """Decoupled signing surface for the AdCP request-signing profile. + + A ``SigningProvider`` produces a raw signature over a payload that + is the RFC 9421 *signature base* (NOT a pre-hashed digest). For + Ed25519 this is the message Ed25519 will sign directly; for + ECDSA-P-256 / ES256 the provider MUST hash the input with SHA-256 + internally — passing an already-hashed digest to a KMS that expects + the raw message is the classic ECDSA double-hash pitfall. + + Concretely: KMS adapters using GCP KMS / AWS KMS for ECDSA-P-256 + MUST use ``DigestSign`` rather than the pre-hashed ``Sign`` variant, + and pass the SHA-256 digest of the supplied payload as the digest + argument. They MUST NOT hash twice. + + Lifecycle (see module docstring for full rationale): + + * Lazy init on first :meth:`sign` — no eager warm-up. + * Cache success, never errors. + * Dedup in-flight requests if the underlying client doesn't already. + * One provider instance is bound to one ``adcp_use`` (request-signing + OR webhook-signing — never both). + + Implementations MUST be safe to call :meth:`sign` from multiple + coroutines concurrently. The default :class:`InMemorySigningProvider` + is trivially safe because the underlying ``cryptography`` private-key + objects are immutable. + """ + + async def sign(self, signature_base: bytes) -> bytes: + """Sign ``signature_base`` and return the signature bytes. + + ``signature_base`` is the RFC 9421 signature base — the + canonicalized component list joined by LF, with + ``"@signature-params": ...`` as the final line. The provider + MUST treat it as the raw message, NOT as a pre-hashed digest. + + For ``ecdsa-p256-sha256``: hash with SHA-256 internally, then + produce an IEEE P1363-encoded signature (``r || s``, 64 bytes + total — NOT DER). KMS adapters typically use the platform's + ``DigestSign`` operation with the SHA-256 digest of + ``signature_base``; return the raw concatenated ``r || s`` + (most KMS APIs return DER and require conversion). + + For ``ed25519``: sign the raw input directly. The result is + always 64 bytes. + + :param signature_base: The RFC 9421 signature base bytes to sign. + :returns: The signature bytes (64 bytes for both supported algorithms). + :raises Exception: Any error surfaced by the underlying signer. + The caller (:func:`async_sign_request`) does not retry — that + is the provider's responsibility. + """ + ... + + def key_id(self) -> str: + """Return the JWK ``kid`` advertised at the buyer's ``jwks_uri``. + + Embedded in the ``keyid="..."`` parameter of the + ``Signature-Input`` header. The verifier will look this up in + the seller-side JWKS cache. The signer escapes the value per + RFC 8941 §3.3.3 before writing the header, so a ``kid`` + containing ``"`` or ``\\`` will not break header parsing — but + adapter authors should still prefer opaque, well-formed kids. + """ + ... + + def algorithm(self) -> SigningAlgorithm: + """Return the RFC 9421 alg this provider produces. + + MUST match the algorithm of the published JWK and (for KMS + adapters that fetch their public key at init) MUST be type-checked + against the returned key. Mismatch produces signatures every + conformant verifier rejects. + """ + ... + + +class InMemorySigningProvider: + """Default :class:`SigningProvider` backed by an in-process private key. + + Wraps the same in-memory signing path as the original + :func:`sign_request`. Suitable for tests, local dev, and small + deployments where managed-key-store integration is overkill. + + Treat the key as sensitive: prefer constructing one of these in a + short-lived scope (e.g. inside a ``with`` block that loads the PEM + just before use) rather than holding the instance for the lifetime + of the process when the host runs untrusted code. + + :param private_key: Loaded ``cryptography`` private key. Use + :func:`adcp.signing.load_private_key_pem` to load from a PEM. + :param key_id: The JWK ``kid`` matching the public half published at + ``jwks_uri``. + :param algorithm: One of ``"ed25519"`` or ``"ecdsa-p256-sha256"``. + Defaults to ``"ed25519"``. MUST be the algorithm of + ``private_key`` — passing an Ed25519 key with + ``algorithm="ecdsa-p256-sha256"`` raises :class:`ValueError` + on :meth:`sign`, which fails on the first signed request rather + than producing valid-looking but verifier-rejected output. + """ + + def __init__( + self, + *, + private_key: PrivateKey, + key_id: str, + algorithm: SigningAlgorithm = "ed25519", + ) -> None: + if algorithm not in ALLOWED_ALGS: + raise ValueError(f"algorithm must be one of {sorted(ALLOWED_ALGS)}, got {algorithm!r}") + if not key_id: + raise ValueError("key_id must be a non-empty string") + # Bind private_key type and curve to algorithm at construction. The + # underlying sign_signature_base catches Ed25519/EC mismatch but does + # not check the EC curve — a P-384 or secp256k1 key with + # algorithm="ecdsa-p256-sha256" produces an opaque OverflowError at + # the r||s encoding step. Fail loudly here instead. + if algorithm == ALG_ED25519: + if not isinstance(private_key, ed25519.Ed25519PrivateKey): + raise ValueError( + f"algorithm={algorithm!r} requires an Ed25519 private key, " + f"got {type(private_key).__name__}" + ) + else: # ALG_ES256 + if not isinstance(private_key, ec.EllipticCurvePrivateKey): + raise ValueError( + f"algorithm={algorithm!r} requires an EC private key, " + f"got {type(private_key).__name__}" + ) + if not isinstance(private_key.curve, ec.SECP256R1): + raise ValueError( + f"algorithm={algorithm!r} requires SECP256R1 (P-256), " + f"got curve {private_key.curve.name}" + ) + self._private_key = private_key + self._key_id = key_id + self._algorithm: SigningAlgorithm = algorithm + + async def sign(self, signature_base: bytes) -> bytes: + return sign_signature_base( + alg=self._algorithm, + private_key=self._private_key, + signature_base=signature_base, + ) + + def key_id(self) -> str: + return self._key_id + + def algorithm(self) -> SigningAlgorithm: + return self._algorithm + + def __repr__(self) -> str: + return ( + f"InMemorySigningProvider(key_id={self._key_id!r}, " + f"algorithm={self._algorithm!r}, private_key=)" + ) + + +__all__ = [ + "InMemorySigningProvider", + "SigningAlgorithm", + "SigningProvider", +] diff --git a/src/adcp/signing/signer.py b/src/adcp/signing/signer.py index e993ea9ff..799460cab 100644 --- a/src/adcp/signing/signer.py +++ b/src/adcp/signing/signer.py @@ -2,7 +2,15 @@ Produces the `Signature-Input`, `Signature`, and (optionally) `Content-Digest` headers for a request, per the verifier checklist the other side will run. -The signer is a single pure function; there is no ambient state. + +Two entry points share the same canonicalization spine: + +* :func:`sign_request` — synchronous, takes a loaded ``PrivateKey`` and + signs in process. Use when the key lives in process memory. +* :func:`async_sign_request` — asynchronous, takes a + :class:`adcp.signing.SigningProvider`. Use when the key lives in a + managed key store (KMS / HSM / Vault) or anywhere ``sign`` may + involve I/O. """ from __future__ import annotations @@ -34,6 +42,7 @@ sign_signature_base, ) from adcp.signing.digest import compute_content_digest_sha256 +from adcp.signing.provider import SigningProvider @dataclass(frozen=True) @@ -54,26 +63,62 @@ def as_dict(self) -> dict[str, str]: return headers -def sign_request( +_SF_STRING_ALLOWED: frozenset[str] = frozenset(chr(c) for c in range(0x20, 0x7F)) + + +def _escape_sf_string(value: str, *, field: str) -> str: + """Escape ``value`` for embedding in an RFC 8941 §3.3.3 sf-string. + + The §3.3.3 grammar permits only printable ASCII 0x20-0x7E; ``"`` + and ``\\`` require escaping (and are the only escapes). Control + bytes and non-ASCII have no representation at all and are rejected + here — passing one through would emit a header line that conformant + verifiers parse differently from this serializer (a parser-divergence + bug class) and, in the CRLF case, can also turn into HTTP header + injection at non-httpx integrators that don't sanitize embedded + line terminators. + """ + bad = next((c for c in value if c not in _SF_STRING_ALLOWED), None) + if bad is not None: + raise ValueError( + f"{field} contains character {bad!r} (codepoint {ord(bad):#06x}) " + "not allowed in RFC 8941 sf-string — only printable ASCII 0x20-0x7E " + "may appear in keyid, nonce, or tag values" + ) + return value.replace("\\", "\\\\").replace('"', '\\"') + + +@dataclass(frozen=True) +class _PreparedSignature: + """Result of canonicalizing a request — everything except the raw signature.""" + + base: bytes + raw_value: str + label: str + content_digest_value: str | None + + +def _prepare_signature( *, method: str, url: str, headers: Mapping[str, str], body: bytes, - private_key: PrivateKey, key_id: str, alg: str, - cover_content_digest: bool = False, - created: int | None = None, - expires_in_seconds: int = DEFAULT_EXPIRES_IN_SECONDS, - nonce: str | None = None, - tag: str = DEFAULT_TAG, - label: str = SIG_LABEL_DEFAULT, -) -> SignedHeaders: - """Sign a request and return the headers to add to it. + cover_content_digest: bool, + created: int | None, + expires_in_seconds: int, + nonce: str | None, + tag: str, + label: str, +) -> _PreparedSignature: + """Canonicalize a request into a signable RFC 9421 base. - The caller is responsible for attaching `SignedHeaders.as_dict()` to the - outgoing HTTP request before sending. + Pure: takes only the request shape and the key metadata that lands in + ``Signature-Input``. The actual key material (private key OR + SigningProvider) is applied separately so the sync and async signers + produce byte-identical bases. """ if alg not in ALLOWED_ALGS: raise ValueError(f"alg must be one of {sorted(ALLOWED_ALGS)}, got {alg!r}") @@ -81,6 +126,8 @@ def sign_request( raise ValueError( f"expires_in_seconds must be in (0, {MAX_WINDOW_SECONDS}], got {expires_in_seconds}" ) + if not key_id: + raise ValueError("key_id must be a non-empty string") if created is None: created = int(time.time()) @@ -99,9 +146,12 @@ def sign_request( components.append("content-digest") comp_serialized = "(" + " ".join(f'"{c}"' for c in components) + ")" + nonce_escaped = _escape_sf_string(nonce, field="nonce") + key_id_escaped = _escape_sf_string(key_id, field="key_id") + tag_escaped = _escape_sf_string(tag, field="tag") params_serialized = ( - f';created={created};expires={expires};nonce="{nonce}"' - f';keyid="{key_id}";alg="{alg}";tag="{tag}"' + f';created={created};expires={expires};nonce="{nonce_escaped}"' + f';keyid="{key_id_escaped}";alg="{alg}";tag="{tag_escaped}"' ) raw_value = comp_serialized + params_serialized @@ -121,13 +171,107 @@ def sign_request( base = build_signature_base( method=method, url=url, headers=outgoing_headers, parsed=parsed ).encode("utf-8") - sig_bytes = sign_signature_base(alg=alg, private_key=private_key, signature_base=base) + return _PreparedSignature( + base=base, + raw_value=raw_value, + label=label, + content_digest_value=content_digest_value, + ) + +def _assemble_headers(prepared: _PreparedSignature, sig_bytes: bytes) -> SignedHeaders: return SignedHeaders( - signature_input=f"{label}={raw_value}", - signature=format_signature_header(sig_bytes, label=label), - content_digest=content_digest_value, + signature_input=f"{prepared.label}={prepared.raw_value}", + signature=format_signature_header(sig_bytes, label=prepared.label), + content_digest=prepared.content_digest_value, + ) + + +def sign_request( + *, + method: str, + url: str, + headers: Mapping[str, str], + body: bytes, + private_key: PrivateKey, + key_id: str, + alg: str, + cover_content_digest: bool = False, + created: int | None = None, + expires_in_seconds: int = DEFAULT_EXPIRES_IN_SECONDS, + nonce: str | None = None, + tag: str = DEFAULT_TAG, + label: str = SIG_LABEL_DEFAULT, +) -> SignedHeaders: + """Sign a request and return the headers to add to it. + + The caller is responsible for attaching `SignedHeaders.as_dict()` to the + outgoing HTTP request before sending. + """ + prepared = _prepare_signature( + method=method, + url=url, + headers=headers, + body=body, + key_id=key_id, + alg=alg, + cover_content_digest=cover_content_digest, + created=created, + expires_in_seconds=expires_in_seconds, + nonce=nonce, + tag=tag, + label=label, + ) + sig_bytes = sign_signature_base(alg=alg, private_key=private_key, signature_base=prepared.base) + return _assemble_headers(prepared, sig_bytes) + + +async def async_sign_request( + *, + method: str, + url: str, + headers: Mapping[str, str], + body: bytes, + provider: SigningProvider, + cover_content_digest: bool = False, + created: int | None = None, + expires_in_seconds: int = DEFAULT_EXPIRES_IN_SECONDS, + nonce: str | None = None, + tag: str = DEFAULT_TAG, + label: str = SIG_LABEL_DEFAULT, +) -> SignedHeaders: + """Sign a request via a :class:`SigningProvider` and return its headers. + + Async counterpart to :func:`sign_request`. Use this entry point when + the key lives in a managed key store (KMS / HSM / Vault) — the + provider's :meth:`SigningProvider.sign` may involve network I/O. + + The provider's :meth:`SigningProvider.key_id` and + :meth:`SigningProvider.algorithm` are read once and embedded in the + ``Signature-Input`` header. Calling them MUST NOT trigger any + expensive work in the adapter — those are constant-like accessors. + The KMS round-trip belongs in :meth:`SigningProvider.sign`. + + The signature base passed to :meth:`SigningProvider.sign` is the + raw RFC 9421 base — NOT a pre-hashed digest. See the + :class:`SigningProvider` docstring for the ECDSA double-hash caveat. + """ + prepared = _prepare_signature( + method=method, + url=url, + headers=headers, + body=body, + key_id=provider.key_id(), + alg=provider.algorithm(), + cover_content_digest=cover_content_digest, + created=created, + expires_in_seconds=expires_in_seconds, + nonce=nonce, + tag=tag, + label=label, ) + sig_bytes = await provider.sign(prepared.base) + return _assemble_headers(prepared, sig_bytes) __all__ = [ @@ -136,5 +280,6 @@ def sign_request( "DEFAULT_EXPIRES_IN_SECONDS", "DEFAULT_TAG", "SignedHeaders", + "async_sign_request", "sign_request", ] diff --git a/tests/conformance/signing/test_signing_provider.py b/tests/conformance/signing/test_signing_provider.py new file mode 100644 index 000000000..94e480071 --- /dev/null +++ b/tests/conformance/signing/test_signing_provider.py @@ -0,0 +1,403 @@ +"""Tests for the SigningProvider abstraction (issue #283). + +Coverage: + +* :class:`InMemorySigningProvider` round-trips through + :func:`async_sign_request` + :func:`verify_request_signature` for both + ed25519 and ecdsa-p256-sha256. +* :func:`async_sign_request` produces byte-identical headers to + :func:`sign_request` given the same inputs (modulo signature value + for ECDSA, which is non-deterministic). +* RFC 8941 escaping: a ``key_id`` containing ``"`` and ``\\`` survives + the sign → header → re-parse → verify round-trip. +* :func:`pem_to_adcp_jwk` produces a JWK byte-shape-identical to + :func:`generate_signing_keypair`'s public half, accepts SPKI public + PEMs, and rejects unsupported key types. +* Protocol is :func:`runtime_checkable` so adapter authors can use + ``isinstance``. +""" + +from __future__ import annotations + +import asyncio +import json +import time +from pathlib import Path + +import pytest +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import rsa + +from adcp.signing import ( + InMemorySigningProvider, + SigningProvider, + StaticJwksResolver, + VerifierCapability, + VerifyOptions, + async_sign_request, + generate_signing_keypair, + parse_signature_input_header, + pem_to_adcp_jwk, + private_key_from_jwk, + sign_request, + verify_request_signature, +) + +VECTORS_DIR = Path(__file__).parent.parent / "vectors" / "request-signing" +KEYS = json.loads((VECTORS_DIR / "keys.json").read_text())["keys"] +ED25519_KEY = next(k for k in KEYS if k["kid"] == "test-ed25519-2026") +ES256_KEY = next(k for k in KEYS if k["kid"] == "test-es256-2026") + + +def _verify_options(keys: list[dict]) -> VerifyOptions: + return VerifyOptions( + now=float(int(time.time())), + capability=VerifierCapability( + covers_content_digest="either", + required_for=frozenset({"create_media_buy"}), + ), + operation="create_media_buy", + jwks_resolver=StaticJwksResolver({"keys": keys}), + ) + + +def _run(coro): + return asyncio.run(coro) + + +def test_in_memory_provider_satisfies_protocol() -> None: + private_key = private_key_from_jwk(ED25519_KEY, d_field="_private_d_for_test_only") + provider = InMemorySigningProvider( + private_key=private_key, key_id="test-ed25519-2026", algorithm="ed25519" + ) + assert isinstance(provider, SigningProvider) + assert provider.key_id() == "test-ed25519-2026" + assert provider.algorithm() == "ed25519" + + +def test_async_sign_then_verify_ed25519() -> None: + body = b'{"plan_id":"plan_001"}' + private_key = private_key_from_jwk(ED25519_KEY, d_field="_private_d_for_test_only") + provider = InMemorySigningProvider( + private_key=private_key, key_id="test-ed25519-2026", algorithm="ed25519" + ) + + signed = _run( + async_sign_request( + method="POST", + url="https://seller.example.com/adcp/create_media_buy", + headers={"Content-Type": "application/json"}, + body=body, + provider=provider, + ) + ) + + headers = {"Content-Type": "application/json", **signed.as_dict()} + result = verify_request_signature( + method="POST", + url="https://seller.example.com/adcp/create_media_buy", + headers=headers, + body=body, + options=_verify_options([ED25519_KEY]), + ) + assert result.key_id == "test-ed25519-2026" + assert result.alg == "ed25519" + + +def test_async_sign_then_verify_es256() -> None: + body = b'{"plan_id":"plan_001"}' + private_key = private_key_from_jwk(ES256_KEY, d_field="_private_d_for_test_only") + provider = InMemorySigningProvider( + private_key=private_key, + key_id="test-es256-2026", + algorithm="ecdsa-p256-sha256", + ) + + signed = _run( + async_sign_request( + method="POST", + url="https://seller.example.com/adcp/create_media_buy", + headers={"Content-Type": "application/json"}, + body=body, + provider=provider, + ) + ) + + headers = {"Content-Type": "application/json", **signed.as_dict()} + result = verify_request_signature( + method="POST", + url="https://seller.example.com/adcp/create_media_buy", + headers=headers, + body=body, + options=_verify_options([ES256_KEY]), + ) + assert result.alg == "ecdsa-p256-sha256" + + +def test_async_sign_includes_content_digest_when_requested() -> None: + body = b'{"plan_id":"plan_001"}' + private_key = private_key_from_jwk(ED25519_KEY, d_field="_private_d_for_test_only") + provider = InMemorySigningProvider( + private_key=private_key, key_id="test-ed25519-2026", algorithm="ed25519" + ) + signed = _run( + async_sign_request( + method="POST", + url="https://seller.example.com/adcp/create_media_buy", + headers={"Content-Type": "application/json"}, + body=body, + provider=provider, + cover_content_digest=True, + ) + ) + assert signed.content_digest is not None + assert '"content-digest"' in signed.signature_input + + +def test_sync_and_async_byte_identical_for_ed25519() -> None: + """With identical inputs (including nonce/created), sync and async + paths produce byte-identical headers — the canonicalization spine is + shared and Ed25519 is deterministic. + """ + body = b'{"plan_id":"plan_001"}' + private_key = private_key_from_jwk(ED25519_KEY, d_field="_private_d_for_test_only") + provider = InMemorySigningProvider( + private_key=private_key, key_id="test-ed25519-2026", algorithm="ed25519" + ) + + pinned = { + "method": "POST", + "url": "https://seller.example.com/adcp/create_media_buy", + "headers": {"Content-Type": "application/json"}, + "body": body, + "created": 1714500000, + "nonce": "AAAAAAAAAAAAAAAAAAAAAA", + } + sync_signed = sign_request( + **pinned, private_key=private_key, key_id="test-ed25519-2026", alg="ed25519" + ) + async_signed = _run(async_sign_request(**pinned, provider=provider)) + + assert sync_signed.signature_input == async_signed.signature_input + assert sync_signed.signature == async_signed.signature + assert sync_signed.content_digest == async_signed.content_digest + + +def test_sync_and_async_signature_input_identical_for_es256() -> None: + """Signature-Input and Content-Digest must match between sync and + async ECDSA paths. The signature value itself is non-deterministic + (random k), so don't compare it — but everything fed INTO the + signer is the same canonicalization spine, and that's the + regression risk: a future change to `_prepare_signature` that + diverges only on one alg path would slip past the Ed25519 test. + """ + body = b'{"plan_id":"plan_001"}' + private_key = private_key_from_jwk(ES256_KEY, d_field="_private_d_for_test_only") + provider = InMemorySigningProvider( + private_key=private_key, + key_id="test-es256-2026", + algorithm="ecdsa-p256-sha256", + ) + + pinned = { + "method": "POST", + "url": "https://seller.example.com/adcp/create_media_buy", + "headers": {"Content-Type": "application/json"}, + "body": body, + "created": 1714500000, + "nonce": "AAAAAAAAAAAAAAAAAAAAAA", + "cover_content_digest": True, + } + sync_signed = sign_request( + **pinned, + private_key=private_key, + key_id="test-es256-2026", + alg="ecdsa-p256-sha256", + ) + async_signed = _run(async_sign_request(**pinned, provider=provider)) + + assert sync_signed.signature_input == async_signed.signature_input + assert sync_signed.content_digest == async_signed.content_digest + + +def test_key_id_with_quotes_and_backslash_round_trips() -> None: + """RFC 8941 §3.3.3: ``"`` and ``\\`` are the only sf-string escapes. + + Without the escaping fix, a key_id containing ``"`` would terminate + the keyid param early, the rest would bleed into the next param, + parse would diverge, and verify would fail. The fix escapes both + chars at the serialization point. + """ + body = b"{}" + private_key = private_key_from_jwk(ED25519_KEY, d_field="_private_d_for_test_only") + weird_kid = 'kid"with\\quotes-and-backslash' + + # Build a JWK that advertises the same weird kid so verify can find it. + kid_jwk = dict(ED25519_KEY) + kid_jwk["kid"] = weird_kid + + signed = sign_request( + method="POST", + url="https://seller.example.com/adcp/create_media_buy", + headers={"Content-Type": "application/json"}, + body=body, + private_key=private_key, + key_id=weird_kid, + alg="ed25519", + ) + + parsed = parse_signature_input_header(signed.signature_input) + assert parsed["sig1"].params["keyid"] == weird_kid + + headers = {"Content-Type": "application/json", **signed.as_dict()} + result = verify_request_signature( + method="POST", + url="https://seller.example.com/adcp/create_media_buy", + headers=headers, + body=body, + options=_verify_options([kid_jwk]), + ) + assert result.key_id == weird_kid + + +def test_pem_to_adcp_jwk_matches_generated_keypair_ed25519() -> None: + """The helper produces the same JWK shape as ``generate_signing_keypair``.""" + pem, expected_jwk = generate_signing_keypair( + alg="ed25519", kid="round-trip-test", purpose="webhook-signing" + ) + derived = pem_to_adcp_jwk(pem, kid="round-trip-test", purpose="webhook-signing") + assert derived == expected_jwk + + +def test_pem_to_adcp_jwk_matches_generated_keypair_es256() -> None: + pem, expected_jwk = generate_signing_keypair( + alg="es256", kid="es256-round-trip", purpose="request-signing" + ) + derived = pem_to_adcp_jwk(pem, kid="es256-round-trip", purpose="request-signing") + assert derived == expected_jwk + + +def test_pem_to_adcp_jwk_accepts_spki_public_pem() -> None: + """KMS deployments often only have the SPKI public PEM available — + the private half never leaves the managed store.""" + pem, _ = generate_signing_keypair(alg="ed25519", kid="kms-style", purpose="request-signing") + private = serialization.load_pem_private_key(pem, password=None) + spki_pem = private.public_key().public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ) + + derived = pem_to_adcp_jwk(spki_pem, kid="kms-style", purpose="request-signing") + assert derived["adcp_use"] == "request-signing" + assert derived["alg"] == "EdDSA" + assert derived["key_ops"] == ["verify"] + assert "d" not in derived # never include private scalar + assert derived["kid"] == "kms-style" + + +def test_pem_to_adcp_jwk_rejects_invalid_purpose() -> None: + pem, _ = generate_signing_keypair(alg="ed25519", kid="x", purpose="request-signing") + with pytest.raises(ValueError, match="purpose must be one of"): + pem_to_adcp_jwk(pem, kid="x", purpose="some-other-purpose") # type: ignore[arg-type] + + +def test_pem_to_adcp_jwk_rejects_empty_kid() -> None: + pem, _ = generate_signing_keypair(alg="ed25519", kid="x", purpose="request-signing") + with pytest.raises(ValueError, match="kid must be a non-empty string"): + pem_to_adcp_jwk(pem, kid="", purpose="request-signing") + + +def test_pem_to_adcp_jwk_rejects_rsa_pem() -> None: + rsa_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) + rsa_pem = rsa_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ) + with pytest.raises(ValueError, match="unsupported private key type"): + pem_to_adcp_jwk(rsa_pem, kid="x", purpose="request-signing") + + +def test_in_memory_provider_rejects_unknown_algorithm() -> None: + private_key = private_key_from_jwk(ED25519_KEY, d_field="_private_d_for_test_only") + with pytest.raises(ValueError, match="algorithm must be one of"): + InMemorySigningProvider( + private_key=private_key, + key_id="x", + algorithm="rsa-pss", # type: ignore[arg-type] + ) + + +def test_in_memory_provider_rejects_empty_key_id() -> None: + private_key = private_key_from_jwk(ED25519_KEY, d_field="_private_d_for_test_only") + with pytest.raises(ValueError, match="key_id must be a non-empty string"): + InMemorySigningProvider(private_key=private_key, key_id="", algorithm="ed25519") + + +def test_in_memory_provider_rejects_ed25519_key_with_es256_algorithm() -> None: + private_key = private_key_from_jwk(ED25519_KEY, d_field="_private_d_for_test_only") + with pytest.raises(ValueError, match="requires an EC private key"): + InMemorySigningProvider(private_key=private_key, key_id="x", algorithm="ecdsa-p256-sha256") + + +def test_in_memory_provider_rejects_es256_key_with_ed25519_algorithm() -> None: + private_key = private_key_from_jwk(ES256_KEY, d_field="_private_d_for_test_only") + with pytest.raises(ValueError, match="requires an Ed25519 private key"): + InMemorySigningProvider(private_key=private_key, key_id="x", algorithm="ed25519") + + +def test_in_memory_provider_rejects_wrong_curve_for_es256() -> None: + """A non-P-256 EC key would silently pass `sign_signature_base`'s + isinstance check and fail later with an opaque OverflowError on + `r.to_bytes(32, ...)`. The constructor-time check fails clearly.""" + from cryptography.hazmat.primitives.asymmetric import ec + + p384_key = ec.generate_private_key(ec.SECP384R1()) + with pytest.raises(ValueError, match="requires SECP256R1"): + InMemorySigningProvider(private_key=p384_key, key_id="x", algorithm="ecdsa-p256-sha256") + + +def test_sign_request_rejects_key_id_with_control_characters() -> None: + """RFC 8941 §3.3.3 sf-string permits only printable ASCII. CRLF in a + key_id would otherwise produce a Signature-Input header with a literal + line break — a header-injection vector at non-httpx integrators.""" + private_key = private_key_from_jwk(ED25519_KEY, d_field="_private_d_for_test_only") + with pytest.raises(ValueError, match="key_id contains character"): + sign_request( + method="POST", + url="https://seller.example.com/x", + headers={}, + body=b"{}", + private_key=private_key, + key_id="kid\r\nInjected: 1", + alg="ed25519", + ) + + +def test_sign_request_rejects_key_id_with_non_ascii() -> None: + private_key = private_key_from_jwk(ED25519_KEY, d_field="_private_d_for_test_only") + with pytest.raises(ValueError, match="key_id contains character"): + sign_request( + method="POST", + url="https://seller.example.com/x", + headers={}, + body=b"{}", + private_key=private_key, + key_id="kid”", # right double quotation mark — sf-string parser-divergence risk + alg="ed25519", + ) + + +def test_sign_request_rejects_tag_with_control_characters() -> None: + private_key = private_key_from_jwk(ED25519_KEY, d_field="_private_d_for_test_only") + with pytest.raises(ValueError, match="tag contains character"): + sign_request( + method="POST", + url="https://seller.example.com/x", + headers={}, + body=b"{}", + private_key=private_key, + key_id="ok", + alg="ed25519", + tag="adcp\x00bad", + ) From c62a422fdf9ba5b564a2a9ebcde90650029b3f61 Mon Sep 17 00:00:00 2001 From: Brian O'Kelley Date: Thu, 30 Apr 2026 10:43:39 -0400 Subject: [PATCH 2/2] fix(signing): validate sig label as RFC 8941 sf-key (#283) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The `label` kwarg on `sign_request` / `async_sign_request` is a public input that lands unquoted in both the `Signature-Input` and `Signature` headers. Without validation, a CRLF or other non-token character in the label would inject extra header bytes (at non-httpx integrators that don't sanitize embedded line terminators) or produce a label that conformant verifiers parse differently from this serializer. Add `_validate_sf_key()` enforcing the RFC 8941 §3.1.2 token grammar: must start with `[a-z*]`, then `[a-z0-9_\-.*]`. Applied at the entry of `_prepare_signature` so both sync and async signers are covered. Same parser-divergence / header-injection class as the `keyid`/`nonce`/`tag` escaping fix already in this PR — just for the remaining unguarded input. Also adds the SPKI-public-key RSA rejection test for `pem_to_adcp_jwk` that was missing from the previous coverage round. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/adcp/signing/signer.py | 31 ++++++++++ .../signing/test_signing_provider.py | 61 +++++++++++++++++++ 2 files changed, 92 insertions(+) diff --git a/src/adcp/signing/signer.py b/src/adcp/signing/signer.py index 799460cab..0a500a268 100644 --- a/src/adcp/signing/signer.py +++ b/src/adcp/signing/signer.py @@ -65,6 +65,36 @@ def as_dict(self) -> dict[str, str]: _SF_STRING_ALLOWED: frozenset[str] = frozenset(chr(c) for c in range(0x20, 0x7F)) +# RFC 8941 §3.1.2 key grammar: ( lcalpha / "*" ) *( lcalpha / DIGIT / "_" / "-" / "." / "*" ) +_SF_KEY_HEAD_ALLOWED: frozenset[str] = frozenset("abcdefghijklmnopqrstuvwxyz*") +_SF_KEY_TAIL_ALLOWED: frozenset[str] = frozenset("abcdefghijklmnopqrstuvwxyz0123456789_-.*") + + +def _validate_sf_key(value: str, *, field: str) -> str: + """Validate ``value`` as an RFC 8941 §3.1.2 sf-key (token). + + Sig-labels are emitted unquoted in both ``Signature-Input`` and + ``Signature`` headers, so they cannot use sf-string escaping — + they must be valid tokens. Anything outside the §3.1.2 grammar + would either inject extra header bytes (CRLF) or produce a + label conformant verifiers parse differently. + """ + if not value: + raise ValueError(f"{field} must be a non-empty RFC 8941 sf-key") + if value[0] not in _SF_KEY_HEAD_ALLOWED: + raise ValueError( + f"{field}={value!r} is not a valid RFC 8941 sf-key — must start with " + "a lowercase letter or '*'" + ) + bad = next((c for c in value[1:] if c not in _SF_KEY_TAIL_ALLOWED), None) + if bad is not None: + raise ValueError( + f"{field}={value!r} is not a valid RFC 8941 sf-key — character " + f"{bad!r} (codepoint {ord(bad):#06x}) is not allowed; " + "only [a-z0-9_-.*] permitted after the first character" + ) + return value + def _escape_sf_string(value: str, *, field: str) -> str: """Escape ``value`` for embedding in an RFC 8941 §3.3.3 sf-string. @@ -128,6 +158,7 @@ def _prepare_signature( ) if not key_id: raise ValueError("key_id must be a non-empty string") + _validate_sf_key(label, field="label") if created is None: created = int(time.time()) diff --git a/tests/conformance/signing/test_signing_provider.py b/tests/conformance/signing/test_signing_provider.py index 94e480071..72a7ab68d 100644 --- a/tests/conformance/signing/test_signing_provider.py +++ b/tests/conformance/signing/test_signing_provider.py @@ -388,6 +388,67 @@ def test_sign_request_rejects_key_id_with_non_ascii() -> None: ) +def test_sign_request_rejects_label_with_crlf() -> None: + """``label`` lands unquoted in both Signature-Input and Signature + headers; a CRLF here would inject extra header bytes. RFC 8941 + §3.1.2 sf-keys are restricted to a token grammar.""" + private_key = private_key_from_jwk(ED25519_KEY, d_field="_private_d_for_test_only") + with pytest.raises(ValueError, match="not a valid RFC 8941 sf-key"): + sign_request( + method="POST", + url="https://seller.example.com/x", + headers={}, + body=b"{}", + private_key=private_key, + key_id="ok", + alg="ed25519", + label="sig1\r\nX-Injected: 1", + ) + + +def test_sign_request_rejects_label_starting_with_uppercase() -> None: + """RFC 8941 §3.1.2 sf-keys must start with lowercase letter or '*'.""" + private_key = private_key_from_jwk(ED25519_KEY, d_field="_private_d_for_test_only") + with pytest.raises(ValueError, match="must start with a lowercase letter"): + sign_request( + method="POST", + url="https://seller.example.com/x", + headers={}, + body=b"{}", + private_key=private_key, + key_id="ok", + alg="ed25519", + label="Sig1", + ) + + +def test_sign_request_rejects_empty_label() -> None: + private_key = private_key_from_jwk(ED25519_KEY, d_field="_private_d_for_test_only") + with pytest.raises(ValueError, match="label must be a non-empty"): + sign_request( + method="POST", + url="https://seller.example.com/x", + headers={}, + body=b"{}", + private_key=private_key, + key_id="ok", + alg="ed25519", + label="", + ) + + +def test_pem_to_adcp_jwk_rejects_rsa_public_pem() -> None: + """The SPKI public-PEM path through `load_pem_public_key` must + reject RSA the same way the private-PEM path does.""" + rsa_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) + rsa_spki = rsa_key.public_key().public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ) + with pytest.raises(ValueError, match="unsupported public key type"): + pem_to_adcp_jwk(rsa_spki, kid="x", purpose="request-signing") + + def test_sign_request_rejects_tag_with_control_characters() -> None: private_key = private_key_from_jwk(ED25519_KEY, d_field="_private_d_for_test_only") with pytest.raises(ValueError, match="tag contains character"):