diff --git a/deps/test-requirements.txt b/deps/test-requirements.txt index e8ab486b2..a006aa4db 100644 --- a/deps/test-requirements.txt +++ b/deps/test-requirements.txt @@ -15,3 +15,5 @@ pytest-mock >= 2.0.0 mock >= 4.0; python_version < "3.8" pytest-xdist >= 1.31.0 pytest >= 6.0.0,<9.0.0 +# pyO3 only supports pypy 3.11 (or any cpython version) +cryptography >= 43.0.0; platform_python_implementation != "PyPy" or python_version >= "3.11" diff --git a/pyproject.toml b/pyproject.toml index ff6c7b0e0..90a7f33a7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,6 +35,8 @@ dependencies = [ [project.optional-dependencies] # `pip install stripe[async]` gets you everything + `httpx`, so our async stuff works out of the box async = ["httpx"] +# `pip install stripe[request-signing]` enables request signing with ECDSA P-256 keys +request-signing = ["cryptography >= 43.0"] [project.urls] homepage = "https://stripe.com/" diff --git a/stripe/__init__.py b/stripe/__init__.py index ec8c10d93..b3b51f31e 100644 --- a/stripe/__init__.py +++ b/stripe/__init__.py @@ -431,6 +431,9 @@ def set_app_info( from stripe._refund_service import RefundService as RefundService from stripe._reporting_service import ReportingService as ReportingService from stripe._request_options import RequestOptions as RequestOptions + from stripe._request_signing_authenticator import ( + RequestSigningOptions as RequestSigningOptions, + ) from stripe._requestor_options import RequestorOptions as RequestorOptions from stripe._reserve_transaction import ( ReserveTransaction as ReserveTransaction, @@ -801,6 +804,8 @@ def set_app_info( "RefundService": ("stripe._refund_service", False), "ReportingService": ("stripe._reporting_service", False), "RequestOptions": ("stripe._request_options", False), + # TODO: move this to codegen + "RequestSigningOptions": ("stripe._request_signing_authenticator", False), "RequestorOptions": ("stripe._requestor_options", False), "ReserveTransaction": ("stripe._reserve_transaction", False), "Reversal": ("stripe._reversal", False), diff --git a/stripe/_api_requestor.py b/stripe/_api_requestor.py index 237a32030..8a9d3fa39 100644 --- a/stripe/_api_requestor.py +++ b/stripe/_api_requestor.py @@ -65,6 +65,9 @@ if TYPE_CHECKING: from stripe._app_info import AppInfo from stripe._stripe_object import StripeObject + from stripe._request_signing_authenticator import ( + RequestSigningAuthenticator, + ) HttpVerb = Literal["get", "post", "delete"] @@ -78,6 +81,7 @@ def is_v2_delete_resp(method: str, api_mode: ApiMode) -> bool: class _APIRequestor(object): _instance: ClassVar["_APIRequestor|None"] = None + _authenticator: Optional["RequestSigningAuthenticator"] = None def __init__( self, @@ -89,6 +93,15 @@ def __init__( self._options = options self._client = client + if options.signing_keys is not None: + from stripe._request_signing_authenticator import ( + RequestSigningAuthenticator, + ) + + self._authenticator = RequestSigningAuthenticator( + options.signing_keys + ) + # In the case of client=None, we should use the current value of stripe.default_http_client # or lazily initialize it. Since stripe.default_http_client can change throughout the lifetime of # an _APIRequestor, we shouldn't set it as stripe._client and should access it only through this @@ -534,9 +547,12 @@ def request_headers( headers: Dict[str, str] = { "X-Stripe-Client-User-Agent": json.dumps(ua), "User-Agent": user_agent, - "Authorization": "Bearer %s" % (options.get("api_key"),), } + api_key = options.get("api_key") + if api_key is not None: + headers["Authorization"] = "Bearer %s" % (api_key,) + stripe_account = options.get("stripe_account") if stripe_account: headers["Stripe-Account"] = stripe_account @@ -590,15 +606,6 @@ def _args_for_request_with_retries( # If user specified an API version, honor it request_options["stripe_version"] = options["stripe_version"] - if request_options.get("api_key") is None: - raise error.AuthenticationError( - "No API key provided. (HINT: set your API key using " - '"stripe.api_key = "). You can generate API keys ' - "from the Stripe web interface. See https://stripe.com/api " - "for details, or email support@stripe.com if you have any " - "questions." - ) - abs_url = "%s%s" % ( self._options.base_addresses.get(base_address), url, @@ -697,6 +704,19 @@ def _args_for_request_with_retries( max_network_retries = request_options.get("max_network_retries") + if self._authenticator is not None: + headers = self._authenticator.apply_signing_headers( + method, abs_url, headers, post_data + ) + + if not headers.get("Authorization"): + raise error.AuthenticationError( + "No API key or request signing mechanism provided. You can generate API keys " + "from the Stripe web interface. See https://stripe.com/api " + "for details, or email support@stripe.com if you have any " + "questions." + ) + return ( # Actual args method, diff --git a/stripe/_request_signing_authenticator.py b/stripe/_request_signing_authenticator.py new file mode 100644 index 000000000..4ece8134e --- /dev/null +++ b/stripe/_request_signing_authenticator.py @@ -0,0 +1,168 @@ +import base64 +import hashlib +import os +import time +from email.utils import formatdate +from typing import Dict, List, Optional, Union +from typing_extensions import TypedDict + + +class RequestSigningOptions(TypedDict): + """Options for ECDSA P-256 request signing. Requires `pip install stripe[request-signing]`.""" + + key_id: str + """The signing key ID from the Stripe dashboard (starts with `keyid_`).""" + + private_key: Union[str, bytes] + """PEM-encoded ECDSA P-256 (secp256r1) private key.""" + + +class RequestSigningAuthenticator: + """ + Authenticates requests using ECDSA P-256 request signing (RFC 9421 patterns). + Used as an alternative to Bearer token auth. + + Requires the `cryptography` package: `pip install stripe[request-signing]` + """ + + _SIGNATURE_VALIDITY_SECONDS = 300 + _ALGORITHM = "ecdsa-p256-sha256" + + def __init__(self, options: RequestSigningOptions): + try: + from cryptography.hazmat.primitives.serialization import ( + load_pem_private_key, + ) + from cryptography.hazmat.primitives.asymmetric.ec import ( + EllipticCurvePrivateKey, + SECP256R1, + ) + except ImportError: + raise ImportError( + "The `cryptography` package is required for request signing. " + "Install it with: `pip install stripe[request-signing]`" + ) + + pem = options["private_key"] + if isinstance(pem, str): + pem = pem.encode("utf-8") + + private_key = load_pem_private_key(pem, password=None) + + if not isinstance( + private_key, EllipticCurvePrivateKey + ) or not isinstance(private_key.curve, SECP256R1): + raise ValueError( + "private_key must be an ECDSA P-256 (secp256r1) private key" + ) + + self._signing_key = options["key_id"] + self._private_key = private_key + + def apply_signing_headers( + self, + method: str, + abs_url: str, + headers: Dict[str, str], + body: Optional[Union[str, bytes]], + ) -> Dict[str, str]: + """ + Adds request signing headers to an outgoing request, replacing the + Authorization header with STRIPE-V1-SIG scheme. + """ + headers["Authorization"] = f"STRIPE-V1-SIG {self._signing_key}" + + if isinstance(body, str): + body_bytes = body.encode("utf-8") + elif body is not None: + body_bytes = bytes(body) + else: + body_bytes = b"" + digest = base64.b64encode(hashlib.sha256(body_bytes).digest()).decode( + "ascii" + ) + headers["Content-Digest"] = f"sha-256=:{digest}:" + + if "Date" not in headers: + headers["Date"] = formatdate( + timeval=None, localtime=False, usegmt=True + ) + + now = int(time.time()) + nonce = base64.b64encode(os.urandom(16)).decode("ascii") + covered = self._covered_components(headers) + sig_params = self._signature_params(covered, now, nonce) + sig_base = self._signature_base( + method, abs_url, headers, covered, sig_params + ) + + raw_sig = self._sign(sig_base.encode("utf-8")) + encoded_sig = base64.b64encode(raw_sig).decode("ascii") + headers["Signature-Input"] = f"sig1={sig_params}" + headers["Signature"] = f"sig1=:{encoded_sig}:" + + return headers + + def _covered_components(self, headers: Dict[str, str]) -> List[str]: + components = [ + '"@method"', + '"@target-uri"', + '"authorization"', + '"content-digest"', + ] + if "Stripe-Account" in headers: + components.append('"stripe-account"') + if "Stripe-Context" in headers: + components.append('"stripe-context"') + if "Date" in headers: + components.append('"date"') + return components + + def _signature_params( + self, covered: List[str], created: int, nonce: str + ) -> str: + expires = created + self._SIGNATURE_VALIDITY_SECONDS + return ( + f"({' '.join(covered)});created={created};expires={expires};" + f'nonce="{nonce}";alg="{self._ALGORITHM}"' + ) + + def _signature_base( + self, + method: str, + abs_url: str, + headers: Dict[str, str], + covered: List[str], + sig_params: str, + ) -> str: + lines: List[str] = [] + for component in covered: + name = component.strip('"') + if name == "@method": + lines.append(f'"@method": {method.upper()}') + elif name == "@target-uri": + lines.append(f'"@target-uri": {abs_url}') + else: + value = self._header_value(headers, name) + lines.append(f'"{name}": {value}') + lines.append(f'"@signature-params": {sig_params}') + return "\n".join(lines) + + def _header_value(self, headers: Dict[str, str], name: str) -> str: + name_lower = name.lower() + for key, value in headers.items(): + if key.lower() == name_lower: + return value + return "" + + def _sign(self, data: bytes) -> bytes: + from cryptography.hazmat.primitives.asymmetric.ec import ECDSA + from cryptography.hazmat.primitives.hashes import SHA256 + from cryptography.hazmat.primitives.asymmetric.utils import ( + decode_dss_signature, + ) + + der_sig = self._private_key.sign(data, ECDSA(SHA256())) + r, s = decode_dss_signature(der_sig) + # Convert to fixed 64-byte raw (r||s) format + return r.to_bytes(32, "big") + s.to_bytes(32, "big") diff --git a/stripe/_requestor_options.py b/stripe/_requestor_options.py index ad5c0f084..4ac894ab9 100644 --- a/stripe/_requestor_options.py +++ b/stripe/_requestor_options.py @@ -6,6 +6,7 @@ from typing_extensions import TYPE_CHECKING if TYPE_CHECKING: + from stripe._request_signing_authenticator import RequestSigningOptions from stripe._stripe_context import StripeContext @@ -16,6 +17,7 @@ class RequestorOptions(object): stripe_version: Optional[str] base_addresses: BaseAddresses max_network_retries: Optional[int] + signing_keys: Optional["RequestSigningOptions"] def __init__( self, @@ -25,12 +27,14 @@ def __init__( stripe_version: Optional[str] = None, base_addresses: Optional[BaseAddresses] = None, max_network_retries: Optional[int] = None, + signing_keys: Optional["RequestSigningOptions"] = None, ): self.api_key = api_key self.stripe_account = stripe_account self.stripe_context = stripe_context self.stripe_version = stripe_version self.base_addresses = {} + self.signing_keys = signing_keys if base_addresses: # Base addresses can be unset (for correct merging). @@ -59,6 +63,7 @@ def to_dict(self): "stripe_version": self.stripe_version, "base_addresses": self.base_addresses, "max_network_retries": self.max_network_retries, + "signing_keys": self.signing_keys, } @@ -94,3 +99,7 @@ def stripe_context(self): @property def max_network_retries(self): return stripe.max_network_retries + + @property + def signing_keys(self): + return None diff --git a/stripe/_stripe_client.py b/stripe/_stripe_client.py index 8f6c23ebc..6f8e9c2a3 100644 --- a/stripe/_stripe_client.py +++ b/stripe/_stripe_client.py @@ -14,6 +14,7 @@ from stripe._error import AuthenticationError from stripe._request_options import extract_options_from_dict from stripe._requestor_options import RequestorOptions, BaseAddresses +from stripe._request_signing_authenticator import RequestSigningOptions from stripe._client_options import _ClientOptions from stripe._http_client import ( new_default_http_client, @@ -136,7 +137,7 @@ class StripeClient(object): def __init__( self, - api_key: str, + api_key: Optional[str] = None, *, stripe_account: Optional[str] = None, stripe_context: "Optional[Union[str, StripeContext]]" = None, @@ -147,13 +148,18 @@ def __init__( proxy: Optional[str] = None, max_network_retries: Optional[int] = None, http_client: Optional["HTTPClient"] = None, + signing_keys: "Optional[RequestSigningOptions]" = None, ): - # The types forbid this, but let's give users without types a friendly error. - if api_key is None: # pyright: ignore[reportUnnecessaryComparison] + if api_key is not None and signing_keys is not None: + raise ValueError( + "Cannot use both `api_key` and `signing_keys`. " + "Request signing_keys replaces API key authentication." + ) + + if api_key is None and signing_keys is None: raise AuthenticationError( - "No API key provided. (HINT: set your API key using " - '"client = stripe.StripeClient()"). You can ' - "generate API keys from the Stripe web interface. " + "No authentication provided. Supply an `api_key` or a `signing_keys` " + "config. You can generate API keys from the Stripe web interface. " "See https://stripe.com/api for details, or email " "support@stripe.com if you have any questions." ) @@ -182,6 +188,7 @@ def __init__( stripe_version=stripe_version or _ApiVersion.CURRENT, base_addresses=base_addresses, max_network_retries=max_network_retries, + signing_keys=signing_keys, ) if http_client is None: diff --git a/tests/test_request_signing.py b/tests/test_request_signing.py new file mode 100644 index 000000000..3c2003211 --- /dev/null +++ b/tests/test_request_signing.py @@ -0,0 +1,318 @@ +import base64 +import hashlib +import re +from typing import Any, Dict +from unittest.mock import patch + +import pytest + +import stripe + +pytest.importorskip( + "cryptography", reason="cryptography not available (PyPy <= 3.10)" +) + + +from cryptography.hazmat.primitives.asymmetric.ec import ( # noqa: E402 + ECDSA, + SECP256R1, + generate_private_key, +) +from cryptography.hazmat.primitives.hashes import SHA256 # noqa: E402 +from cryptography.hazmat.primitives.serialization import ( # noqa: E402 + Encoding, + NoEncryption, + PrivateFormat, +) +from cryptography.hazmat.primitives.asymmetric.utils import ( # noqa: E402 + encode_dss_signature, +) + +from stripe._request_signing_authenticator import ( # noqa: E402 + RequestSigningAuthenticator, + RequestSigningOptions, +) + + +@pytest.fixture +def key_pair(): + private_key = generate_private_key(SECP256R1()) + pem = private_key.private_bytes( + Encoding.PEM, PrivateFormat.PKCS8, NoEncryption() + ) + return private_key, pem + + +@pytest.fixture +def authenticator(key_pair): + _, pem = key_pair + return RequestSigningAuthenticator( + RequestSigningOptions(key_id="keyid_test_abc123", private_key=pem) + ) + + +def _make_headers(**extra: str) -> Dict[str, str]: + return {"Authorization": "Bearer sk_test_xxx", **extra} + + +class TestRequestSigningAuthenticator: + def test_authorization_header_format(self, authenticator): + headers = _make_headers() + authenticator.apply_signing_headers( + "post", + "https://api.stripe.com/v1/customers", + headers, + "email=test%40example.com", + ) + assert headers["Authorization"] == "STRIPE-V1-SIG keyid_test_abc123" + + def test_content_digest_for_known_body(self, authenticator): + body = "email=test%40example.com" + expected = base64.b64encode( + hashlib.sha256(body.encode()).digest() + ).decode() + headers = _make_headers() + authenticator.apply_signing_headers( + "post", "https://api.stripe.com/v1/customers", headers, body + ) + assert headers["Content-Digest"] == f"sha-256=:{expected}:" + + def test_content_digest_for_empty_body(self, authenticator): + expected = base64.b64encode(hashlib.sha256(b"").digest()).decode() + headers = _make_headers() + authenticator.apply_signing_headers( + "get", "https://api.stripe.com/v1/customers", headers, None + ) + assert headers["Content-Digest"] == f"sha-256=:{expected}:" + + def test_date_header_added(self, authenticator): + headers = _make_headers() + authenticator.apply_signing_headers( + "get", "https://api.stripe.com/v1/customers", headers, None + ) + assert "Date" in headers + + def test_date_header_not_overwritten(self, authenticator): + existing_date = "Wed, 28 Sep 2022 21:40:47 GMT" + headers = _make_headers(Date=existing_date) + authenticator.apply_signing_headers( + "get", "https://api.stripe.com/v1/customers", headers, None + ) + assert headers["Date"] == existing_date + + def test_signature_and_signature_input_headers_set(self, authenticator): + headers = _make_headers() + authenticator.apply_signing_headers( + "post", "https://api.stripe.com/v1/customers", headers, "name=test" + ) + assert "Signature" in headers + assert "Signature-Input" in headers + assert headers["Signature"].startswith("sig1=:") + assert headers["Signature"].endswith(":") + assert headers["Signature-Input"].startswith("sig1=(") + + def test_signature_input_contains_required_components(self, authenticator): + headers = _make_headers() + authenticator.apply_signing_headers( + "post", "https://api.stripe.com/v1/customers", headers, "name=test" + ) + sig_input = headers["Signature-Input"] + assert '"@method"' in sig_input + assert '"@target-uri"' in sig_input + assert '"authorization"' in sig_input + assert '"content-digest"' in sig_input + + def test_signature_input_includes_stripe_account_when_present( + self, authenticator + ): + headers = _make_headers(**{"Stripe-Account": "acct_123"}) + authenticator.apply_signing_headers( + "post", "https://api.stripe.com/v1/customers", headers, "name=test" + ) + assert '"stripe-account"' in headers["Signature-Input"] + + def test_signature_input_excludes_stripe_account_when_absent( + self, authenticator + ): + headers = _make_headers() + authenticator.apply_signing_headers( + "post", "https://api.stripe.com/v1/customers", headers, "name=test" + ) + assert '"stripe-account"' not in headers["Signature-Input"] + + def test_signature_params_format(self, authenticator): + headers = _make_headers() + authenticator.apply_signing_headers( + "post", "https://api.stripe.com/v1/customers", headers, "" + ) + sig_input = headers["Signature-Input"] + assert re.search(r"created=\d+", sig_input) + assert re.search(r"expires=\d+", sig_input) + assert re.search(r'nonce="[A-Za-z0-9+/=]+"', sig_input) + assert 'alg="ecdsa-p256-sha256"' in sig_input + + def test_expires_is_300_seconds_after_created(self, authenticator): + headers = _make_headers() + authenticator.apply_signing_headers( + "post", "https://api.stripe.com/v1/customers", headers, "" + ) + sig_input = headers["Signature-Input"] + created = int(re.search(r"created=(\d+)", sig_input).group(1)) # type: ignore + expires = int(re.search(r"expires=(\d+)", sig_input).group(1)) # type: ignore + assert expires - created == 300 + + def test_signature_is_valid(self, key_pair, authenticator): + private_key, _ = key_pair + public_key = private_key.public_key() + body = "email=test%40example.com" + headers = _make_headers(**{"Stripe-Account": "acct_test"}) + authenticator.apply_signing_headers( + "post", "https://api.stripe.com/v1/customers", headers, body + ) + + sig_b64 = headers["Signature"][len("sig1=:") :][:-1] + raw_sig = base64.b64decode(sig_b64) + assert len(raw_sig) == 64 + + r = int.from_bytes(raw_sig[:32], "big") + s = int.from_bytes(raw_sig[32:], "big") + der_sig = encode_dss_signature(r, s) + + sig_params_value = headers["Signature-Input"][len("sig1=") :] + covered = re.search(r"\(([^)]*)\)", sig_params_value).group(1) # type: ignore + component_names = [c.strip('"') for c in covered.split()] + + lines = [] + for name in component_names: + if name == "@method": + lines.append('"@method": POST') + elif name == "@target-uri": + lines.append( + '"@target-uri": https://api.stripe.com/v1/customers' + ) + else: + value = next( + v for k, v in headers.items() if k.lower() == name + ) + lines.append(f'"{name}": {value}') + lines.append(f'"@signature-params": {sig_params_value}') + sig_base = "\n".join(lines).encode("utf-8") + + public_key.verify(der_sig, sig_base, ECDSA(SHA256())) + + def test_accepts_pem_as_string(self, key_pair): + _, pem_bytes = key_pair + auth = RequestSigningAuthenticator( + RequestSigningOptions( + key_id="keyid_test_abc", + private_key=pem_bytes.decode("utf-8"), + ) + ) + headers = _make_headers() + auth.apply_signing_headers( + "get", "https://api.stripe.com/v1/customers", headers, None + ) + assert headers["Authorization"] == "STRIPE-V1-SIG keyid_test_abc" + + def test_rejects_non_p256_key(self): + from cryptography.hazmat.primitives.asymmetric.ec import SECP384R1 + + wrong_key = generate_private_key(SECP384R1()) + pem = wrong_key.private_bytes( + Encoding.PEM, PrivateFormat.PKCS8, NoEncryption() + ) + with pytest.raises(ValueError, match="secp256r1"): + RequestSigningAuthenticator( + RequestSigningOptions(key_id="keyid_test", private_key=pem) + ) + + def test_import_error_without_cryptography(self, key_pair): + _, pem = key_pair + with patch.dict( + "sys.modules", + { + "cryptography": None, + "cryptography.hazmat": None, + "cryptography.hazmat.primitives": None, + "cryptography.hazmat.primitives.serialization": None, + "cryptography.hazmat.primitives.asymmetric": None, + "cryptography.hazmat.primitives.asymmetric.ec": None, + }, + ): + with pytest.raises( + ImportError, match="pip install stripe\\[request-signing\\]" + ): + RequestSigningAuthenticator( + RequestSigningOptions(key_id="keyid_test", private_key=pem) + ) + + +class TestStripeClientIntegration: + def test_api_key_only_uses_bearer_auth(self): + client = stripe.StripeClient( + "sk_test_123", + base_addresses={"api": "http://localhost"}, + ) + assert client._requestor._authenticator is None + + def test_signing_only_creates_authenticator(self, key_pair): + _, pem = key_pair + client = stripe.StripeClient( + signing_keys=RequestSigningOptions( + key_id="keyid_test_abc", private_key=pem + ), + base_addresses={"api": "http://localhost"}, + ) + assert client._requestor._authenticator is not None + + def test_both_api_key_and_signing_raises(self, key_pair): + _, pem = key_pair + with pytest.raises(ValueError, match="Cannot use both"): + stripe.StripeClient( + "sk_test_123", + signing_keys=RequestSigningOptions( + key_id="keyid_test_abc", private_key=pem + ), + ) + + def test_neither_api_key_nor_signing_raises(self): + with pytest.raises(stripe.AuthenticationError): + stripe.StripeClient() + + def test_request_uses_signing_headers(self, key_pair): + _, pem = key_pair + captured: Dict[str, Any] = {} + + class CapturingHttpClient(stripe.HTTPClient): + name = "capturing" + + def request(self, method, url, headers, post_data=None): + captured["headers"] = dict(headers) + return "{}", 200, {"Content-Type": "application/json"} + + def request_stream(self, method, url, headers, post_data=None): + raise NotImplementedError + + client = stripe.StripeClient( + signing_keys=RequestSigningOptions( + key_id="keyid_test_my_key", private_key=pem + ), + http_client=CapturingHttpClient(), + base_addresses={"api": "https://api.stripe.com"}, + ) + + try: + client.v1.customers.retrieve("cus_123") + except Exception: + pass + + if captured.get("headers"): + h = captured["headers"] + assert h.get("Authorization", "").startswith("STRIPE-V1-SIG ") + assert "Bearer" not in h.get("Authorization", "") + assert "Content-Digest" in h + assert "Signature-Input" in h + assert "Signature" in h + + def test_request_signing_options_exported_from_stripe(self): + assert hasattr(stripe, "RequestSigningOptions") diff --git a/tests/test_requestor_options.py b/tests/test_requestor_options.py index b818d39a8..9057fdb2d 100644 --- a/tests/test_requestor_options.py +++ b/tests/test_requestor_options.py @@ -30,6 +30,7 @@ def test_to_dict(self): "files": "https://files.example.com", }, "max_network_retries": 3, + "signing_keys": None, } def test_global_options_get_updated(