Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions deps/test-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@ pytest-mock >= 2.0.0
mock >= 4.0; python_version < "3.8"
pytest-xdist >= 1.31.0
pytest >= 6.0.0,<9.0.0
# pyO3 only supports pypy 3.11 (or any cpython version)
cryptography >= 43.0.0; platform_python_implementation != "PyPy" or python_version >= "3.11"
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ dependencies = [
[project.optional-dependencies]
# `pip install stripe[async]` gets you everything + `httpx`, so our async stuff works out of the box
async = ["httpx"]
# `pip install stripe[request-signing]` enables request signing with ECDSA P-256 keys
request-signing = ["cryptography >= 43.0"]

[project.urls]
homepage = "https://stripe.com/"
Expand Down
5 changes: 5 additions & 0 deletions stripe/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,9 @@ def set_app_info(
from stripe._refund_service import RefundService as RefundService
from stripe._reporting_service import ReportingService as ReportingService
from stripe._request_options import RequestOptions as RequestOptions
from stripe._request_signing_authenticator import (
RequestSigningOptions as RequestSigningOptions,
)
from stripe._requestor_options import RequestorOptions as RequestorOptions
from stripe._reserve_transaction import (
ReserveTransaction as ReserveTransaction,
Expand Down Expand Up @@ -801,6 +804,8 @@ def set_app_info(
"RefundService": ("stripe._refund_service", False),
"ReportingService": ("stripe._reporting_service", False),
"RequestOptions": ("stripe._request_options", False),
# TODO: move this to codegen
"RequestSigningOptions": ("stripe._request_signing_authenticator", False),
"RequestorOptions": ("stripe._requestor_options", False),
"ReserveTransaction": ("stripe._reserve_transaction", False),
"Reversal": ("stripe._reversal", False),
Expand Down
40 changes: 30 additions & 10 deletions stripe/_api_requestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@
if TYPE_CHECKING:
from stripe._app_info import AppInfo
from stripe._stripe_object import StripeObject
from stripe._request_signing_authenticator import (
RequestSigningAuthenticator,
)

HttpVerb = Literal["get", "post", "delete"]

Expand All @@ -78,6 +81,7 @@ def is_v2_delete_resp(method: str, api_mode: ApiMode) -> bool:

class _APIRequestor(object):
_instance: ClassVar["_APIRequestor|None"] = None
_authenticator: Optional["RequestSigningAuthenticator"] = None

def __init__(
self,
Expand All @@ -89,6 +93,15 @@ def __init__(
self._options = options
self._client = client

if options.signing_keys is not None:
from stripe._request_signing_authenticator import (
RequestSigningAuthenticator,
)

self._authenticator = RequestSigningAuthenticator(
options.signing_keys
)

# In the case of client=None, we should use the current value of stripe.default_http_client
# or lazily initialize it. Since stripe.default_http_client can change throughout the lifetime of
# an _APIRequestor, we shouldn't set it as stripe._client and should access it only through this
Expand Down Expand Up @@ -534,9 +547,12 @@ def request_headers(
headers: Dict[str, str] = {
"X-Stripe-Client-User-Agent": json.dumps(ua),
"User-Agent": user_agent,
"Authorization": "Bearer %s" % (options.get("api_key"),),
}

api_key = options.get("api_key")
if api_key is not None:
headers["Authorization"] = "Bearer %s" % (api_key,)

stripe_account = options.get("stripe_account")
if stripe_account:
headers["Stripe-Account"] = stripe_account
Expand Down Expand Up @@ -590,15 +606,6 @@ def _args_for_request_with_retries(
# If user specified an API version, honor it
request_options["stripe_version"] = options["stripe_version"]

if request_options.get("api_key") is None:
raise error.AuthenticationError(
"No API key provided. (HINT: set your API key using "
'"stripe.api_key = <API-KEY>"). You can generate API keys '
"from the Stripe web interface. See https://stripe.com/api "
"for details, or email support@stripe.com if you have any "
"questions."
)

abs_url = "%s%s" % (
self._options.base_addresses.get(base_address),
url,
Expand Down Expand Up @@ -697,6 +704,19 @@ def _args_for_request_with_retries(

max_network_retries = request_options.get("max_network_retries")

if self._authenticator is not None:
headers = self._authenticator.apply_signing_headers(
method, abs_url, headers, post_data
)

if not headers.get("Authorization"):
raise error.AuthenticationError(
"No API key or request signing mechanism provided. You can generate API keys "
"from the Stripe web interface. See https://stripe.com/api "
"for details, or email support@stripe.com if you have any "
"questions."
)

return (
# Actual args
method,
Expand Down
168 changes: 168 additions & 0 deletions stripe/_request_signing_authenticator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
import base64
import hashlib
import os
import time
from email.utils import formatdate
from typing import Dict, List, Optional, Union
from typing_extensions import TypedDict


class RequestSigningOptions(TypedDict):
"""Options for ECDSA P-256 request signing. Requires `pip install stripe[request-signing]`."""

key_id: str
"""The signing key ID from the Stripe dashboard (starts with `keyid_`)."""

private_key: Union[str, bytes]
"""PEM-encoded ECDSA P-256 (secp256r1) private key."""


class RequestSigningAuthenticator:
"""
Authenticates requests using ECDSA P-256 request signing (RFC 9421 patterns).
Used as an alternative to Bearer token auth.

Requires the `cryptography` package: `pip install stripe[request-signing]`
"""

_SIGNATURE_VALIDITY_SECONDS = 300
_ALGORITHM = "ecdsa-p256-sha256"

def __init__(self, options: RequestSigningOptions):
try:
from cryptography.hazmat.primitives.serialization import (
load_pem_private_key,
)
from cryptography.hazmat.primitives.asymmetric.ec import (
EllipticCurvePrivateKey,
SECP256R1,
)
except ImportError:
raise ImportError(
"The `cryptography` package is required for request signing. "
"Install it with: `pip install stripe[request-signing]`"
)

pem = options["private_key"]
if isinstance(pem, str):
pem = pem.encode("utf-8")

private_key = load_pem_private_key(pem, password=None)

if not isinstance(
private_key, EllipticCurvePrivateKey
) or not isinstance(private_key.curve, SECP256R1):
raise ValueError(
"private_key must be an ECDSA P-256 (secp256r1) private key"
)

self._signing_key = options["key_id"]
self._private_key = private_key

def apply_signing_headers(
self,
method: str,
abs_url: str,
headers: Dict[str, str],
body: Optional[Union[str, bytes]],
) -> Dict[str, str]:
"""
Adds request signing headers to an outgoing request, replacing the
Authorization header with STRIPE-V1-SIG scheme.
"""
headers["Authorization"] = f"STRIPE-V1-SIG {self._signing_key}"

if isinstance(body, str):
body_bytes = body.encode("utf-8")
elif body is not None:
body_bytes = bytes(body)
else:
body_bytes = b""
digest = base64.b64encode(hashlib.sha256(body_bytes).digest()).decode(
"ascii"
)
headers["Content-Digest"] = f"sha-256=:{digest}:"

if "Date" not in headers:
headers["Date"] = formatdate(
timeval=None, localtime=False, usegmt=True
)

now = int(time.time())
nonce = base64.b64encode(os.urandom(16)).decode("ascii")
covered = self._covered_components(headers)
sig_params = self._signature_params(covered, now, nonce)
sig_base = self._signature_base(
method, abs_url, headers, covered, sig_params
)

raw_sig = self._sign(sig_base.encode("utf-8"))
encoded_sig = base64.b64encode(raw_sig).decode("ascii")
headers["Signature-Input"] = f"sig1={sig_params}"
headers["Signature"] = f"sig1=:{encoded_sig}:"

return headers

def _covered_components(self, headers: Dict[str, str]) -> List[str]:
components = [
'"@method"',
'"@target-uri"',
'"authorization"',
'"content-digest"',
]
if "Stripe-Account" in headers:
components.append('"stripe-account"')
if "Stripe-Context" in headers:
components.append('"stripe-context"')
if "Date" in headers:
components.append('"date"')
return components

def _signature_params(
self, covered: List[str], created: int, nonce: str
) -> str:
expires = created + self._SIGNATURE_VALIDITY_SECONDS
return (
f"({' '.join(covered)});created={created};expires={expires};"
f'nonce="{nonce}";alg="{self._ALGORITHM}"'
)

def _signature_base(
self,
method: str,
abs_url: str,
headers: Dict[str, str],
covered: List[str],
sig_params: str,
) -> str:
lines: List[str] = []
for component in covered:
name = component.strip('"')
if name == "@method":
lines.append(f'"@method": {method.upper()}')
elif name == "@target-uri":
lines.append(f'"@target-uri": {abs_url}')
else:
value = self._header_value(headers, name)
lines.append(f'"{name}": {value}')
lines.append(f'"@signature-params": {sig_params}')
return "\n".join(lines)

def _header_value(self, headers: Dict[str, str], name: str) -> str:
name_lower = name.lower()
for key, value in headers.items():
if key.lower() == name_lower:
return value
return ""

def _sign(self, data: bytes) -> bytes:
from cryptography.hazmat.primitives.asymmetric.ec import ECDSA
from cryptography.hazmat.primitives.hashes import SHA256
from cryptography.hazmat.primitives.asymmetric.utils import (
decode_dss_signature,
)

der_sig = self._private_key.sign(data, ECDSA(SHA256()))
r, s = decode_dss_signature(der_sig)
# Convert to fixed 64-byte raw (r||s) format
return r.to_bytes(32, "big") + s.to_bytes(32, "big")
9 changes: 9 additions & 0 deletions stripe/_requestor_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing_extensions import TYPE_CHECKING

if TYPE_CHECKING:
from stripe._request_signing_authenticator import RequestSigningOptions
from stripe._stripe_context import StripeContext


Expand All @@ -16,6 +17,7 @@ class RequestorOptions(object):
stripe_version: Optional[str]
base_addresses: BaseAddresses
max_network_retries: Optional[int]
signing_keys: Optional["RequestSigningOptions"]

def __init__(
self,
Expand All @@ -25,12 +27,14 @@ def __init__(
stripe_version: Optional[str] = None,
base_addresses: Optional[BaseAddresses] = None,
max_network_retries: Optional[int] = None,
signing_keys: Optional["RequestSigningOptions"] = None,
):
self.api_key = api_key
self.stripe_account = stripe_account
self.stripe_context = stripe_context
self.stripe_version = stripe_version
self.base_addresses = {}
self.signing_keys = signing_keys

if base_addresses:
# Base addresses can be unset (for correct merging).
Expand Down Expand Up @@ -59,6 +63,7 @@ def to_dict(self):
"stripe_version": self.stripe_version,
"base_addresses": self.base_addresses,
"max_network_retries": self.max_network_retries,
"signing_keys": self.signing_keys,
}


Expand Down Expand Up @@ -94,3 +99,7 @@ def stripe_context(self):
@property
def max_network_retries(self):
return stripe.max_network_retries

@property
def signing_keys(self):
return None
19 changes: 13 additions & 6 deletions stripe/_stripe_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from stripe._error import AuthenticationError
from stripe._request_options import extract_options_from_dict
from stripe._requestor_options import RequestorOptions, BaseAddresses
from stripe._request_signing_authenticator import RequestSigningOptions
from stripe._client_options import _ClientOptions
from stripe._http_client import (
new_default_http_client,
Expand Down Expand Up @@ -136,7 +137,7 @@
class StripeClient(object):
def __init__(
self,
api_key: str,
api_key: Optional[str] = None,
*,
stripe_account: Optional[str] = None,
stripe_context: "Optional[Union[str, StripeContext]]" = None,
Expand All @@ -147,13 +148,18 @@ def __init__(
proxy: Optional[str] = None,
max_network_retries: Optional[int] = None,
http_client: Optional["HTTPClient"] = None,
signing_keys: "Optional[RequestSigningOptions]" = None,
):
# The types forbid this, but let's give users without types a friendly error.
if api_key is None: # pyright: ignore[reportUnnecessaryComparison]
if api_key is not None and signing_keys is not None:
raise ValueError(
"Cannot use both `api_key` and `signing_keys`. "
"Request signing_keys replaces API key authentication."
)

if api_key is None and signing_keys is None:
raise AuthenticationError(
"No API key provided. (HINT: set your API key using "
'"client = stripe.StripeClient(<API-KEY>)"). You can '
"generate API keys from the Stripe web interface. "
"No authentication provided. Supply an `api_key` or a `signing_keys` "
"config. You can generate API keys from the Stripe web interface. "
"See https://stripe.com/api for details, or email "
"support@stripe.com if you have any questions."
)
Expand Down Expand Up @@ -182,6 +188,7 @@ def __init__(
stripe_version=stripe_version or _ApiVersion.CURRENT,
base_addresses=base_addresses,
max_network_retries=max_network_retries,
signing_keys=signing_keys,
)

if http_client is None:
Expand Down
Loading
Loading