diff --git a/.github/workflows/python-e2e.yml b/.github/workflows/python-e2e.yml
new file mode 100644
index 0000000..fedeba1
--- /dev/null
+++ b/.github/workflows/python-e2e.yml
@@ -0,0 +1,335 @@
+name: Python E2E
+
+on:
+ push:
+ pull_request:
+ branches: ["main"]
+ workflow_dispatch:
+
+permissions:
+ contents: read
+
+concurrency:
+ group: python-e2e-${{ github.ref }}
+ cancel-in-progress: true
+
+jobs:
+ e2e-test-token:
+ name: E2E TEST (token)
+ if: ${{ github.event_name != 'pull_request' || github.event.pull_request.head.repo.fork == false }}
+ runs-on: ubuntu-latest
+ timeout-minutes: 35
+
+ steps:
+ - uses: actions/checkout@v4
+
+ - name: Set up Python 3.12
+ uses: actions/setup-python@v5
+ with:
+ python-version: "3.12"
+ cache: pip
+ cache-dependency-path: |
+ requirements.txt
+ requirements-dev.txt
+ pyproject.toml
+
+ - name: Install dependencies
+ run: |
+ python -m pip install --upgrade pip
+ python -m pip install -r requirements-dev.txt
+
+ - name: Validate required secrets
+ shell: bash
+ env:
+ KSEF_TEST_TOKEN: ${{ secrets.KSEF_TEST_TOKEN }}
+ KSEF_TEST_CONTEXT_TYPE: ${{ secrets.KSEF_TEST_CONTEXT_TYPE }}
+ KSEF_TEST_CONTEXT_VALUE: ${{ secrets.KSEF_TEST_CONTEXT_VALUE }}
+ run: |
+ set +x
+ missing=0
+ for name in KSEF_TEST_TOKEN KSEF_TEST_CONTEXT_TYPE KSEF_TEST_CONTEXT_VALUE; do
+ if [ -z "${!name}" ]; then
+ echo "::error::Missing required secret: ${name}"
+ missing=1
+ fi
+ done
+ if [ "${missing}" -ne 0 ]; then
+ exit 1
+ fi
+
+ - name: Mask sensitive values
+ shell: bash
+ env:
+ KSEF_TEST_TOKEN: ${{ secrets.KSEF_TEST_TOKEN }}
+ KSEF_TEST_CONTEXT_TYPE: ${{ secrets.KSEF_TEST_CONTEXT_TYPE }}
+ KSEF_TEST_CONTEXT_VALUE: ${{ secrets.KSEF_TEST_CONTEXT_VALUE }}
+ run: |
+ set +x
+ for value in "${KSEF_TEST_TOKEN}" "${KSEF_TEST_CONTEXT_TYPE}" "${KSEF_TEST_CONTEXT_VALUE}"; do
+ if [ -n "${value}" ]; then
+ echo "::add-mask::${value}"
+ fi
+ done
+
+ - name: Run E2E TEST (token)
+ env:
+ KSEF_E2E: "1"
+ KSEF_TEST_BASE_URL: https://api-test.ksef.mf.gov.pl
+ KSEF_TEST_TOKEN: ${{ secrets.KSEF_TEST_TOKEN }}
+ KSEF_TEST_CONTEXT_TYPE: ${{ secrets.KSEF_TEST_CONTEXT_TYPE }}
+ KSEF_TEST_CONTEXT_VALUE: ${{ secrets.KSEF_TEST_CONTEXT_VALUE }}
+ KSEF_TEST_SUBJECT_TYPE: Subject1
+ run: |
+ python -m pytest -q --maxfail=1 --disable-warnings \
+ tests/test_e2e_token_flows.py::test_e2e_test_environment_full_flow_token
+
+ e2e-test-xades:
+ name: E2E TEST (xades)
+ if: ${{ github.event_name != 'pull_request' || github.event.pull_request.head.repo.fork == false }}
+ runs-on: ubuntu-latest
+ timeout-minutes: 40
+
+ steps:
+ - uses: actions/checkout@v4
+
+ - name: Set up Python 3.12
+ uses: actions/setup-python@v5
+ with:
+ python-version: "3.12"
+ cache: pip
+ cache-dependency-path: |
+ requirements.txt
+ requirements-dev.txt
+ pyproject.toml
+
+ - name: Install dependencies
+ run: |
+ python -m pip install --upgrade pip
+ python -m pip install -r requirements-dev.txt
+ python -m pip install -e ".[xml]"
+
+ - name: Validate required secrets
+ shell: bash
+ env:
+ KSEF_TEST_CONTEXT_TYPE: ${{ secrets.KSEF_TEST_CONTEXT_TYPE }}
+ KSEF_TEST_CONTEXT_VALUE: ${{ secrets.KSEF_TEST_CONTEXT_VALUE }}
+ KSEF_TEST_XADES_CERT_CRT: ${{ secrets.KSEF_TEST_XADES_CERT_CRT }}
+ KSEF_TEST_XADES_CERT_CRT_B64: ${{ secrets.KSEF_TEST_XADES_CERT_CRT_B64 }}
+ KSEF_TEST_XADES_PRIVATE_KEY_PEM: ${{ secrets.KSEF_TEST_XADES_PRIVATE_KEY_PEM }}
+ KSEF_TEST_XADES_PRIVATE_KEY_PEM_B64: ${{ secrets.KSEF_TEST_XADES_PRIVATE_KEY_PEM_B64 }}
+ KSEF_TEST_XADES_PRIVATE_KEY_PASSWORD: ${{ secrets.KSEF_TEST_XADES_PRIVATE_KEY_PASSWORD }}
+ run: |
+ set +x
+ missing=0
+ for name in KSEF_TEST_CONTEXT_TYPE KSEF_TEST_CONTEXT_VALUE; do
+ if [ -z "${!name}" ]; then
+ echo "::error::Missing required secret: ${name}"
+ missing=1
+ fi
+ done
+ if [ -z "${KSEF_TEST_XADES_CERT_CRT}" ] && [ -z "${KSEF_TEST_XADES_CERT_CRT_B64}" ]; then
+ echo "::error::Missing required secret: KSEF_TEST_XADES_CERT_CRT or KSEF_TEST_XADES_CERT_CRT_B64"
+ missing=1
+ fi
+ if [ -z "${KSEF_TEST_XADES_PRIVATE_KEY_PEM}" ] && [ -z "${KSEF_TEST_XADES_PRIVATE_KEY_PEM_B64}" ]; then
+ echo "::error::Missing required secret: KSEF_TEST_XADES_PRIVATE_KEY_PEM or KSEF_TEST_XADES_PRIVATE_KEY_PEM_B64"
+ missing=1
+ fi
+ if [ "${missing}" -ne 0 ]; then
+ exit 1
+ fi
+
+ - name: Mask sensitive values
+ shell: bash
+ env:
+ KSEF_TEST_CONTEXT_TYPE: ${{ secrets.KSEF_TEST_CONTEXT_TYPE }}
+ KSEF_TEST_CONTEXT_VALUE: ${{ secrets.KSEF_TEST_CONTEXT_VALUE }}
+ KSEF_TEST_XADES_CERT_CRT: ${{ secrets.KSEF_TEST_XADES_CERT_CRT }}
+ KSEF_TEST_XADES_CERT_CRT_B64: ${{ secrets.KSEF_TEST_XADES_CERT_CRT_B64 }}
+ KSEF_TEST_XADES_PRIVATE_KEY_PEM: ${{ secrets.KSEF_TEST_XADES_PRIVATE_KEY_PEM }}
+ KSEF_TEST_XADES_PRIVATE_KEY_PEM_B64: ${{ secrets.KSEF_TEST_XADES_PRIVATE_KEY_PEM_B64 }}
+ KSEF_TEST_XADES_PRIVATE_KEY_PASSWORD: ${{ secrets.KSEF_TEST_XADES_PRIVATE_KEY_PASSWORD }}
+ run: |
+ set +x
+ for value in "${KSEF_TEST_CONTEXT_TYPE}" "${KSEF_TEST_CONTEXT_VALUE}" "${KSEF_TEST_XADES_CERT_CRT}" "${KSEF_TEST_XADES_CERT_CRT_B64}" "${KSEF_TEST_XADES_PRIVATE_KEY_PEM}" "${KSEF_TEST_XADES_PRIVATE_KEY_PEM_B64}" "${KSEF_TEST_XADES_PRIVATE_KEY_PASSWORD}"; do
+ if [ -n "${value}" ]; then
+ echo "::add-mask::${value}"
+ fi
+ done
+
+ - name: Run E2E TEST (xades)
+ env:
+ KSEF_E2E: "1"
+ KSEF_TEST_BASE_URL: https://api-test.ksef.mf.gov.pl
+ KSEF_TEST_CONTEXT_TYPE: ${{ secrets.KSEF_TEST_CONTEXT_TYPE }}
+ KSEF_TEST_CONTEXT_VALUE: ${{ secrets.KSEF_TEST_CONTEXT_VALUE }}
+ KSEF_TEST_SUBJECT_TYPE: Subject1
+ KSEF_TEST_XADES_CERT_CRT: ${{ secrets.KSEF_TEST_XADES_CERT_CRT }}
+ KSEF_TEST_XADES_CERT_CRT_B64: ${{ secrets.KSEF_TEST_XADES_CERT_CRT_B64 }}
+ KSEF_TEST_XADES_PRIVATE_KEY_PEM: ${{ secrets.KSEF_TEST_XADES_PRIVATE_KEY_PEM }}
+ KSEF_TEST_XADES_PRIVATE_KEY_PEM_B64: ${{ secrets.KSEF_TEST_XADES_PRIVATE_KEY_PEM_B64 }}
+ KSEF_TEST_XADES_PRIVATE_KEY_PASSWORD: ${{ secrets.KSEF_TEST_XADES_PRIVATE_KEY_PASSWORD }}
+ KSEF_TEST_XADES_SUBJECT_IDENTIFIER_TYPE: certificateSubject
+ run: |
+ python -m pytest -q --maxfail=1 --disable-warnings \
+ tests/test_e2e_token_flows.py::test_e2e_test_environment_full_flow_xades
+
+ e2e-demo-token:
+ name: E2E DEMO (token)
+ if: ${{ github.event_name != 'pull_request' || github.event.pull_request.head.repo.fork == false }}
+ runs-on: ubuntu-latest
+ timeout-minutes: 35
+
+ steps:
+ - uses: actions/checkout@v4
+
+ - name: Set up Python 3.12
+ uses: actions/setup-python@v5
+ with:
+ python-version: "3.12"
+ cache: pip
+ cache-dependency-path: |
+ requirements.txt
+ requirements-dev.txt
+ pyproject.toml
+
+ - name: Install dependencies
+ run: |
+ python -m pip install --upgrade pip
+ python -m pip install -r requirements-dev.txt
+
+ - name: Validate required secrets
+ shell: bash
+ env:
+ KSEF_DEMO_TOKEN: ${{ secrets.KSEF_DEMO_TOKEN }}
+ KSEF_DEMO_CONTEXT_TYPE: ${{ secrets.KSEF_DEMO_CONTEXT_TYPE }}
+ KSEF_DEMO_CONTEXT_VALUE: ${{ secrets.KSEF_DEMO_CONTEXT_VALUE }}
+ run: |
+ set +x
+ missing=0
+ for name in KSEF_DEMO_TOKEN KSEF_DEMO_CONTEXT_TYPE KSEF_DEMO_CONTEXT_VALUE; do
+ if [ -z "${!name}" ]; then
+ echo "::error::Missing required secret: ${name}"
+ missing=1
+ fi
+ done
+ if [ "${missing}" -ne 0 ]; then
+ exit 1
+ fi
+
+ - name: Mask sensitive values
+ shell: bash
+ env:
+ KSEF_DEMO_TOKEN: ${{ secrets.KSEF_DEMO_TOKEN }}
+ KSEF_DEMO_CONTEXT_TYPE: ${{ secrets.KSEF_DEMO_CONTEXT_TYPE }}
+ KSEF_DEMO_CONTEXT_VALUE: ${{ secrets.KSEF_DEMO_CONTEXT_VALUE }}
+ run: |
+ set +x
+ for value in "${KSEF_DEMO_TOKEN}" "${KSEF_DEMO_CONTEXT_TYPE}" "${KSEF_DEMO_CONTEXT_VALUE}"; do
+ if [ -n "${value}" ]; then
+ echo "::add-mask::${value}"
+ fi
+ done
+
+ - name: Run E2E DEMO (token)
+ env:
+ KSEF_E2E: "1"
+ KSEF_DEMO_BASE_URL: https://api-demo.ksef.mf.gov.pl
+ KSEF_DEMO_TOKEN: ${{ secrets.KSEF_DEMO_TOKEN }}
+ KSEF_DEMO_CONTEXT_TYPE: ${{ secrets.KSEF_DEMO_CONTEXT_TYPE }}
+ KSEF_DEMO_CONTEXT_VALUE: ${{ secrets.KSEF_DEMO_CONTEXT_VALUE }}
+ KSEF_DEMO_SUBJECT_TYPE: Subject1
+ run: |
+ python -m pytest -q --maxfail=1 --disable-warnings \
+ tests/test_e2e_token_flows.py::test_e2e_demo_environment_full_flow_token
+
+ e2e-demo-xades:
+ name: E2E DEMO (xades)
+ if: ${{ github.event_name != 'pull_request' || github.event.pull_request.head.repo.fork == false }}
+ runs-on: ubuntu-latest
+ timeout-minutes: 40
+
+ steps:
+ - uses: actions/checkout@v4
+
+ - name: Set up Python 3.12
+ uses: actions/setup-python@v5
+ with:
+ python-version: "3.12"
+ cache: pip
+ cache-dependency-path: |
+ requirements.txt
+ requirements-dev.txt
+ pyproject.toml
+
+ - name: Install dependencies
+ run: |
+ python -m pip install --upgrade pip
+ python -m pip install -r requirements-dev.txt
+ python -m pip install -e ".[xml]"
+
+ - name: Validate required secrets
+ shell: bash
+ env:
+ KSEF_DEMO_CONTEXT_TYPE: ${{ secrets.KSEF_DEMO_CONTEXT_TYPE }}
+ KSEF_DEMO_CONTEXT_VALUE: ${{ secrets.KSEF_DEMO_CONTEXT_VALUE }}
+ KSEF_DEMO_XADES_CERT_CRT: ${{ secrets.KSEF_DEMO_XADES_CERT_CRT }}
+ KSEF_DEMO_XADES_CERT_CRT_B64: ${{ secrets.KSEF_DEMO_XADES_CERT_CRT_B64 }}
+ KSEF_DEMO_XADES_PRIVATE_KEY_PEM: ${{ secrets.KSEF_DEMO_XADES_PRIVATE_KEY_PEM }}
+ KSEF_DEMO_XADES_PRIVATE_KEY_PEM_B64: ${{ secrets.KSEF_DEMO_XADES_PRIVATE_KEY_PEM_B64 }}
+ KSEF_DEMO_XADES_PRIVATE_KEY_PASSWORD: ${{ secrets.KSEF_DEMO_XADES_PRIVATE_KEY_PASSWORD }}
+ run: |
+ set +x
+ missing=0
+ for name in KSEF_DEMO_CONTEXT_TYPE KSEF_DEMO_CONTEXT_VALUE; do
+ if [ -z "${!name}" ]; then
+ echo "::error::Missing required secret: ${name}"
+ missing=1
+ fi
+ done
+ if [ -z "${KSEF_DEMO_XADES_CERT_CRT}" ] && [ -z "${KSEF_DEMO_XADES_CERT_CRT_B64}" ]; then
+ echo "::error::Missing required secret: KSEF_DEMO_XADES_CERT_CRT or KSEF_DEMO_XADES_CERT_CRT_B64"
+ missing=1
+ fi
+ if [ -z "${KSEF_DEMO_XADES_PRIVATE_KEY_PEM}" ] && [ -z "${KSEF_DEMO_XADES_PRIVATE_KEY_PEM_B64}" ]; then
+ echo "::error::Missing required secret: KSEF_DEMO_XADES_PRIVATE_KEY_PEM or KSEF_DEMO_XADES_PRIVATE_KEY_PEM_B64"
+ missing=1
+ fi
+ if [ "${missing}" -ne 0 ]; then
+ exit 1
+ fi
+
+ - name: Mask sensitive values
+ shell: bash
+ env:
+ KSEF_DEMO_CONTEXT_TYPE: ${{ secrets.KSEF_DEMO_CONTEXT_TYPE }}
+ KSEF_DEMO_CONTEXT_VALUE: ${{ secrets.KSEF_DEMO_CONTEXT_VALUE }}
+ KSEF_DEMO_XADES_CERT_CRT: ${{ secrets.KSEF_DEMO_XADES_CERT_CRT }}
+ KSEF_DEMO_XADES_CERT_CRT_B64: ${{ secrets.KSEF_DEMO_XADES_CERT_CRT_B64 }}
+ KSEF_DEMO_XADES_PRIVATE_KEY_PEM: ${{ secrets.KSEF_DEMO_XADES_PRIVATE_KEY_PEM }}
+ KSEF_DEMO_XADES_PRIVATE_KEY_PEM_B64: ${{ secrets.KSEF_DEMO_XADES_PRIVATE_KEY_PEM_B64 }}
+ KSEF_DEMO_XADES_PRIVATE_KEY_PASSWORD: ${{ secrets.KSEF_DEMO_XADES_PRIVATE_KEY_PASSWORD }}
+ run: |
+ set +x
+ for value in "${KSEF_DEMO_CONTEXT_TYPE}" "${KSEF_DEMO_CONTEXT_VALUE}" "${KSEF_DEMO_XADES_CERT_CRT}" "${KSEF_DEMO_XADES_CERT_CRT_B64}" "${KSEF_DEMO_XADES_PRIVATE_KEY_PEM}" "${KSEF_DEMO_XADES_PRIVATE_KEY_PEM_B64}" "${KSEF_DEMO_XADES_PRIVATE_KEY_PASSWORD}"; do
+ if [ -n "${value}" ]; then
+ echo "::add-mask::${value}"
+ fi
+ done
+
+ - name: Run E2E DEMO (xades)
+ env:
+ KSEF_E2E: "1"
+ KSEF_DEMO_BASE_URL: https://api-demo.ksef.mf.gov.pl
+ KSEF_DEMO_CONTEXT_TYPE: ${{ secrets.KSEF_DEMO_CONTEXT_TYPE }}
+ KSEF_DEMO_CONTEXT_VALUE: ${{ secrets.KSEF_DEMO_CONTEXT_VALUE }}
+ KSEF_DEMO_SUBJECT_TYPE: Subject1
+ KSEF_DEMO_XADES_CERT_CRT: ${{ secrets.KSEF_DEMO_XADES_CERT_CRT }}
+ KSEF_DEMO_XADES_CERT_CRT_B64: ${{ secrets.KSEF_DEMO_XADES_CERT_CRT_B64 }}
+ KSEF_DEMO_XADES_PRIVATE_KEY_PEM: ${{ secrets.KSEF_DEMO_XADES_PRIVATE_KEY_PEM }}
+ KSEF_DEMO_XADES_PRIVATE_KEY_PEM_B64: ${{ secrets.KSEF_DEMO_XADES_PRIVATE_KEY_PEM_B64 }}
+ KSEF_DEMO_XADES_PRIVATE_KEY_PASSWORD: ${{ secrets.KSEF_DEMO_XADES_PRIVATE_KEY_PASSWORD }}
+ KSEF_DEMO_XADES_SUBJECT_IDENTIFIER_TYPE: certificateSubject
+ run: |
+ python -m pytest -q --maxfail=1 --disable-warnings \
+ tests/test_e2e_token_flows.py::test_e2e_demo_environment_full_flow_xades
diff --git a/README.md b/README.md
index 7e21831..028458b 100644
--- a/README.md
+++ b/README.md
@@ -209,8 +209,9 @@ Lokalne uruchomienie (XAdES, TEST):
KSEF_E2E=1 \
KSEF_TEST_CONTEXT_TYPE=nip \
KSEF_TEST_CONTEXT_VALUE=... \
-KSEF_TEST_XADES_CERT_PEM="$(cat cert.pem)" \
+KSEF_TEST_XADES_CERT_CRT="$(cat cert.crt)" \
KSEF_TEST_XADES_PRIVATE_KEY_PEM="$(cat key.pem)" \
+KSEF_TEST_XADES_PRIVATE_KEY_PASSWORD=... \
pytest tests/test_e2e_token_flows.py::test_e2e_test_environment_full_flow_xades
```
@@ -225,17 +226,21 @@ Workflow uruchamia się:
Repozytoryjne sekrety do ustawienia:
- `KSEF_TEST_TOKEN`, `KSEF_TEST_CONTEXT_TYPE`, `KSEF_TEST_CONTEXT_VALUE` (token TEST)
- `KSEF_DEMO_TOKEN`, `KSEF_DEMO_CONTEXT_TYPE`, `KSEF_DEMO_CONTEXT_VALUE` (token DEMO)
-- `KSEF_TEST_XADES_CERT_PEM` albo `KSEF_TEST_XADES_CERT_PEM_B64` (XAdES TEST)
+- `KSEF_TEST_XADES_CERT_CRT` albo `KSEF_TEST_XADES_CERT_CRT_B64` (XAdES TEST)
+- `KSEF_TEST_XADES_CERT_PEM` albo `KSEF_TEST_XADES_CERT_PEM_B64` (XAdES TEST, kompatybilność wsteczna)
- `KSEF_TEST_XADES_PRIVATE_KEY_PEM` albo `KSEF_TEST_XADES_PRIVATE_KEY_PEM_B64` (XAdES TEST)
+- `KSEF_TEST_XADES_PRIVATE_KEY_PASSWORD` opcjonalnie, wymagany dla klucza zaszyfrowanego
- `KSEF_TEST_XADES_SUBJECT_IDENTIFIER_TYPE` opcjonalnie, domyślnie `certificateSubject`
-- `KSEF_DEMO_XADES_CERT_PEM` albo `KSEF_DEMO_XADES_CERT_PEM_B64` (XAdES DEMO)
+- `KSEF_DEMO_XADES_CERT_CRT` albo `KSEF_DEMO_XADES_CERT_CRT_B64` (XAdES DEMO)
+- `KSEF_DEMO_XADES_CERT_PEM` albo `KSEF_DEMO_XADES_CERT_PEM_B64` (XAdES DEMO, kompatybilność wsteczna)
- `KSEF_DEMO_XADES_PRIVATE_KEY_PEM` albo `KSEF_DEMO_XADES_PRIVATE_KEY_PEM_B64` (XAdES DEMO)
+- `KSEF_DEMO_XADES_PRIVATE_KEY_PASSWORD` opcjonalnie, wymagany dla klucza zaszyfrowanego
- `KSEF_DEMO_XADES_SUBJECT_IDENTIFIER_TYPE` opcjonalnie, domyślnie `certificateSubject`
-Przygotowanie sekretów PEM w wariancie Base64 (jedna linia):
+Przygotowanie sekretów CRT/PEM w wariancie Base64 (jedna linia):
```bash
-base64 < cert.pem | tr -d '\n'
+base64 < cert.crt | tr -d '\n'
base64 < key.pem | tr -d '\n'
```
diff --git a/tests/test_e2e_demo_token_flow.py b/tests/test_e2e_demo_token_flow.py
deleted file mode 100644
index 872835f..0000000
--- a/tests/test_e2e_demo_token_flow.py
+++ /dev/null
@@ -1,155 +0,0 @@
-from __future__ import annotations
-
-import os
-import time
-from datetime import datetime, timedelta, timezone
-
-import pytest
-
-from ksef_client import KsefClient, KsefClientOptions
-from ksef_client.exceptions import KsefRateLimitError
-from ksef_client.services import AuthCoordinator, OnlineSessionWorkflow
-
-
-def _env(name: str) -> str:
- value = os.getenv(name)
- if not value:
- raise RuntimeError(f"Missing env var: {name}")
- return value
-
-
-def _select_cert(certs: list[dict], usage_name: str) -> str:
- for cert in certs:
- if usage_name in (cert.get("usage") or []):
- return cert["certificate"]
- raise RuntimeError(f"Missing public cert usage: {usage_name}")
-
-
-@pytest.mark.e2e
-def test_e2e_token_send_and_list_session_invoices() -> None:
- if os.getenv("KSEF_E2E") not in {"1", "true", "yes"}:
- pytest.skip("Set KSEF_E2E=1 to enable this test.")
-
- token = _env("KSEF_TOKEN")
- context_type = _env("KSEF_CONTEXT_TYPE")
- context_value = _env("KSEF_CONTEXT_VALUE")
- base_url = _env("KSEF_BASE_URL")
-
- options = KsefClientOptions(base_url=base_url)
- with KsefClient(options) as client:
- certs = client.security.get_public_key_certificates()
- token_cert = _select_cert(certs, "KsefTokenEncryption")
- symmetric_cert = _select_cert(certs, "SymmetricKeyEncryption")
-
- access_token = (
- AuthCoordinator(client.auth)
- .authenticate_with_ksef_token(
- token=token,
- public_certificate=token_cert,
- context_identifier_type=context_type,
- context_identifier_value=context_value,
- max_attempts=90,
- poll_interval_seconds=2.0,
- )
- .tokens.access_token.token
- )
-
- workflow = OnlineSessionWorkflow(client.sessions)
- session = workflow.open_session(
- form_code={"systemCode": "FA (3)", "schemaVersion": "1-0E", "value": "FA"},
- public_certificate=symmetric_cert,
- access_token=access_token,
- )
-
- now = datetime.now(timezone.utc).replace(microsecond=0)
- invoice_xml = (
- ''
- ''
- ""
- 'FA'
- "3"
- f"{now.strftime('%Y-%m-%dT%H:%M:%SZ')}"
- "pytest"
- ""
- ""
- f"{context_value}pytest seller"
- "PLxx"
- ""
- ""
- "1111111111"
- "pytest buyer"
- "PLxx"
- "buyer@example.com555777999"
- "9999999922"
- ""
- ""
- "PLN"
- f"{now.date().isoformat()}x"
- f"FA/PYTEST/{now:%m}/{now:%Y}"
- ""
- f"{now.date().replace(day=1).isoformat()}"
- f"{(now.date() + timedelta(days=14)).isoformat()}"
- ""
- "1.000.231.23"
- "2222"
- "11"
- "21"
- "VAT"
- "1xszt1.00"
- "1.001.0023"
- "0.00x0.00"
- "0.00x0.00"
- "1.23"
- ""
- f"{(now.date() + timedelta(days=14)).isoformat()}"
- "6"
- "73111111111111111111111111xPLN"
- ""
- ""
- ""
- ).encode()
-
- send_result = workflow.send_invoice(
- session_reference_number=session.session_reference_number,
- invoice_xml=invoice_xml,
- encryption_data=session.encryption_data,
- access_token=access_token,
- )
- invoice_reference_number = send_result["referenceNumber"]
-
- ksef_number = None
- for _ in range(90):
- status = client.sessions.get_session_invoice_status(
- session.session_reference_number,
- invoice_reference_number,
- access_token=access_token,
- )
- code = int(status.get("status", {}).get("code", 0))
- if code == 200:
- ksef_number = status.get("ksefNumber")
- break
- if code not in {100, 150}:
- raise AssertionError(status)
- time.sleep(2)
-
- assert ksef_number, "Expected ksefNumber in invoice status"
-
- try:
- listed = client.sessions.get_session_invoices(
- session.session_reference_number,
- access_token=access_token,
- page_size=10,
- )
- except KsefRateLimitError:
- time.sleep(5)
- listed = client.sessions.get_session_invoices(
- session.session_reference_number,
- access_token=access_token,
- page_size=10,
- )
-
- invoices = listed.get("invoices") or []
- assert any(inv.get("ksefNumber") == ksef_number for inv in invoices)
diff --git a/tests/test_e2e_token_flows.py b/tests/test_e2e_token_flows.py
new file mode 100644
index 0000000..0781d72
--- /dev/null
+++ b/tests/test_e2e_token_flows.py
@@ -0,0 +1,662 @@
+from __future__ import annotations
+
+import base64
+import os
+import time
+import uuid
+from collections.abc import Callable
+from contextlib import suppress
+from dataclasses import dataclass
+from datetime import datetime, timedelta, timezone
+from typing import Any, TypeVar
+
+import pytest
+from cryptography import x509
+from cryptography.hazmat.primitives import serialization
+from cryptography.hazmat.primitives.serialization import load_der_private_key, load_pem_private_key
+
+from ksef_client import KsefClient, KsefClientOptions, KsefEnvironment
+from ksef_client.exceptions import KsefHttpError, KsefRateLimitError
+from ksef_client.services import AuthCoordinator, OnlineSessionWorkflow
+
+pytestmark = pytest.mark.e2e
+
+FORM_CODE = {"systemCode": "FA (3)", "schemaVersion": "1-0E", "value": "FA"}
+ENABLED_VALUES = {"1", "true", "yes"}
+AUTH_MODE_TOKEN = "token"
+AUTH_MODE_XADES = "xades"
+T = TypeVar("T")
+
+
+@dataclass(frozen=True)
+class E2EConfig:
+ name: str
+ base_url: str
+ context_type: str
+ context_value: str
+ subject_type: str
+ auth_mode: str
+ token: str | None = None
+ certificate_pem: str | None = None
+ private_key_pem: str | None = None
+ subject_identifier_type: str | None = None
+
+
+def _optional_any(*names: str) -> str | None:
+ for name in names:
+ value = os.getenv(name)
+ if value:
+ return value
+ return None
+
+
+def _required_any(*names: str) -> str:
+ value = _optional_any(*names)
+ if value:
+ return value
+ raise RuntimeError(f"Missing env var: {' or '.join(names)}")
+
+
+def _env_int(name: str, default: int) -> int:
+ raw = os.getenv(name)
+ if raw is None:
+ return default
+ return int(raw)
+
+
+def _env_float(name: str, default: float) -> float:
+ raw = os.getenv(name)
+ if raw is None:
+ return default
+ return float(raw)
+
+
+def _ensure_e2e_enabled() -> None:
+ if os.getenv("KSEF_E2E", "").strip().lower() not in ENABLED_VALUES:
+ pytest.skip("Set KSEF_E2E=1 to enable e2e tests.")
+
+
+def _load_test_token_config() -> E2EConfig:
+ return E2EConfig(
+ name="test",
+ base_url=_optional_any("KSEF_TEST_BASE_URL") or KsefEnvironment.TEST.value,
+ context_type=_required_any("KSEF_TEST_CONTEXT_TYPE"),
+ context_value=_required_any("KSEF_TEST_CONTEXT_VALUE"),
+ subject_type=_optional_any("KSEF_TEST_SUBJECT_TYPE") or "Subject1",
+ auth_mode=AUTH_MODE_TOKEN,
+ token=_required_any("KSEF_TEST_TOKEN"),
+ )
+
+
+def _load_demo_token_config() -> E2EConfig:
+ return E2EConfig(
+ name="demo",
+ base_url=_optional_any("KSEF_DEMO_BASE_URL", "KSEF_BASE_URL") or KsefEnvironment.DEMO.value,
+ context_type=_required_any("KSEF_DEMO_CONTEXT_TYPE", "KSEF_CONTEXT_TYPE"),
+ context_value=_required_any("KSEF_DEMO_CONTEXT_VALUE", "KSEF_CONTEXT_VALUE"),
+ subject_type=_optional_any("KSEF_DEMO_SUBJECT_TYPE", "KSEF_SUBJECT_TYPE") or "Subject1",
+ auth_mode=AUTH_MODE_TOKEN,
+ token=_required_any("KSEF_DEMO_TOKEN", "KSEF_TOKEN"),
+ )
+
+
+def _select_certificate(certs: list[dict[str, Any]], usage_name: str) -> str:
+ for cert in certs:
+ usage = cert.get("usage") or []
+ if usage_name in usage and cert.get("certificate"):
+ return str(cert["certificate"])
+ raise RuntimeError(f"Missing public cert usage: {usage_name}")
+
+
+def _with_rate_limit_retry(call: Callable[[], T]) -> T:
+ retries = _env_int("KSEF_E2E_RATE_LIMIT_RETRIES", 3)
+ delay = _env_float("KSEF_E2E_RATE_LIMIT_SLEEP_SECONDS", 5.0)
+ for attempt in range(retries):
+ try:
+ return call()
+ except KsefRateLimitError:
+ if attempt == retries - 1:
+ raise
+ time.sleep(delay)
+ raise RuntimeError("Rate limit retry loop terminated unexpectedly.")
+
+
+def _poll_for_ksef_number(
+ client: KsefClient,
+ session_reference_number: str,
+ invoice_reference_number: str,
+ access_token: str,
+) -> str:
+ max_attempts = _env_int("KSEF_E2E_INVOICE_MAX_ATTEMPTS", 90)
+ poll_interval = _env_float("KSEF_E2E_POLL_INTERVAL_SECONDS", 2.0)
+ for _ in range(max_attempts):
+ status = _with_rate_limit_retry(
+ lambda: client.sessions.get_session_invoice_status(
+ session_reference_number,
+ invoice_reference_number,
+ access_token=access_token,
+ )
+ )
+ code = int((status.get("status") or {}).get("code", 0))
+ if code == 200:
+ ksef_number = status.get("ksefNumber")
+ if isinstance(ksef_number, str) and ksef_number:
+ return ksef_number
+ raise AssertionError("Invoice accepted but ksefNumber is missing.")
+ if code not in {100, 150}:
+ raise AssertionError(f"Invoice processing failed with status code {code}.")
+ time.sleep(poll_interval)
+ raise TimeoutError("Invoice processing did not complete within max_attempts.")
+
+
+def _poll_for_upo(fetch: Callable[[], bytes]) -> bytes:
+ max_attempts = _env_int("KSEF_E2E_UPO_MAX_ATTEMPTS", 45)
+ poll_interval = _env_float("KSEF_E2E_POLL_INTERVAL_SECONDS", 2.0)
+ transient_codes = {404, 409, 425}
+
+ for _ in range(max_attempts):
+ try:
+ upo = _with_rate_limit_retry(fetch)
+ if upo:
+ return upo
+ except KsefHttpError as error:
+ if error.status_code not in transient_codes:
+ raise
+ time.sleep(poll_interval)
+ raise TimeoutError("Invoice UPO was not available within max_attempts.")
+
+
+def _extract_ksef_number(invoice: dict[str, Any]) -> str | None:
+ value = invoice.get("ksefNumber")
+ if isinstance(value, str) and value:
+ return value
+ return None
+
+
+def _first_ksef_number(invoices: list[dict[str, Any]]) -> str:
+ for invoice in invoices:
+ ksef_number = _extract_ksef_number(invoice)
+ if ksef_number:
+ return ksef_number
+ raise AssertionError("No invoice with ksefNumber found in metadata response.")
+
+
+def _normalize_pem(raw: str) -> str:
+ value = raw.strip()
+ if "\\n" in value and "\n" not in value:
+ value = value.replace("\\n", "\n")
+ if not value.endswith("\n"):
+ value += "\n"
+ return value
+
+
+def _decode_b64(value: str) -> bytes:
+ compact = "".join(value.split())
+ return base64.b64decode(compact.encode("ascii"), validate=False)
+
+
+def _required_certificate_pem(
+ *,
+ plain_names: tuple[str, ...],
+ b64_names: tuple[str, ...],
+) -> str:
+ plain_value = _optional_any(*plain_names)
+ if plain_value:
+ normalized_plain = _normalize_pem(plain_value)
+ plain_bytes = normalized_plain.encode("utf-8")
+ if b"BEGIN CERTIFICATE" in plain_bytes:
+ return normalized_plain
+ try:
+ cert = x509.load_der_x509_certificate(plain_bytes)
+ except ValueError:
+ return normalized_plain
+ return cert.public_bytes(serialization.Encoding.PEM).decode("utf-8")
+
+ b64_value = _optional_any(*b64_names)
+ if b64_value:
+ decoded = _decode_b64(b64_value)
+ if b"BEGIN CERTIFICATE" in decoded:
+ return _normalize_pem(decoded.decode("utf-8"))
+ try:
+ cert = x509.load_der_x509_certificate(decoded)
+ except ValueError as exc:
+ raise RuntimeError("Unable to parse XAdES certificate from provided secrets.") from exc
+ return cert.public_bytes(serialization.Encoding.PEM).decode("utf-8")
+
+ raise RuntimeError(
+ "Missing certificate value in env vars: "
+ + ", ".join((*plain_names, *b64_names))
+ )
+
+
+def _required_private_key_pem(
+ *,
+ plain_names: tuple[str, ...],
+ b64_names: tuple[str, ...],
+ private_key_password: str | None,
+ password_secret_names: tuple[str, ...],
+) -> str:
+ plain_value = _optional_any(*plain_names)
+ if plain_value:
+ key_bytes = _normalize_pem(plain_value).encode("utf-8")
+ else:
+ b64_value = _optional_any(*b64_names)
+ if not b64_value:
+ raise RuntimeError(
+ "Missing PEM value in env vars: "
+ + ", ".join((*plain_names, *b64_names))
+ )
+ decoded = _decode_b64(b64_value)
+ if b"BEGIN" in decoded:
+ key_bytes = _normalize_pem(decoded.decode("utf-8")).encode("utf-8")
+ else:
+ key_bytes = decoded
+
+ password_bytes = None if private_key_password is None else private_key_password.encode("utf-8")
+ try:
+ if b"BEGIN" in key_bytes:
+ key = load_pem_private_key(key_bytes, password=password_bytes)
+ else:
+ key = load_der_private_key(key_bytes, password=password_bytes)
+ except TypeError as exc:
+ raise RuntimeError(
+ "Invalid private key password configuration. "
+ f"If key is encrypted, set one of: {', '.join(password_secret_names)}."
+ ) from exc
+ except ValueError as exc:
+ password_hint = (
+ f"Check secret value in: {', '.join(password_secret_names)}."
+ if private_key_password
+ else f"If key is encrypted, set one of: {', '.join(password_secret_names)}."
+ )
+ raise RuntimeError(f"Unable to load XAdES private key. {password_hint}") from exc
+
+ return key.private_bytes(
+ encoding=serialization.Encoding.PEM,
+ format=serialization.PrivateFormat.PKCS8,
+ encryption_algorithm=serialization.NoEncryption(),
+ ).decode("utf-8")
+
+
+def _load_test_xades_config() -> E2EConfig:
+ key_password = _optional_any(
+ "KSEF_TEST_XADES_PRIVATE_KEY_PASSWORD",
+ "KSEF_XADES_PRIVATE_KEY_PASSWORD",
+ )
+ return E2EConfig(
+ name="test",
+ base_url=_optional_any("KSEF_TEST_BASE_URL") or KsefEnvironment.TEST.value,
+ context_type=_required_any("KSEF_TEST_CONTEXT_TYPE"),
+ context_value=_required_any("KSEF_TEST_CONTEXT_VALUE"),
+ subject_type=_optional_any("KSEF_TEST_SUBJECT_TYPE") or "Subject1",
+ auth_mode=AUTH_MODE_XADES,
+ certificate_pem=_required_certificate_pem(
+ plain_names=("KSEF_TEST_XADES_CERT_CRT", "KSEF_TEST_XADES_CERT_PEM"),
+ b64_names=("KSEF_TEST_XADES_CERT_CRT_B64", "KSEF_TEST_XADES_CERT_PEM_B64"),
+ ),
+ private_key_pem=_required_private_key_pem(
+ plain_names=("KSEF_TEST_XADES_PRIVATE_KEY_PEM",),
+ b64_names=("KSEF_TEST_XADES_PRIVATE_KEY_PEM_B64",),
+ private_key_password=key_password,
+ password_secret_names=(
+ "KSEF_TEST_XADES_PRIVATE_KEY_PASSWORD",
+ "KSEF_XADES_PRIVATE_KEY_PASSWORD",
+ ),
+ ),
+ subject_identifier_type=(
+ _optional_any("KSEF_TEST_XADES_SUBJECT_IDENTIFIER_TYPE") or "certificateSubject"
+ ),
+ )
+
+
+def _load_demo_xades_config() -> E2EConfig:
+ key_password = _optional_any(
+ "KSEF_DEMO_XADES_PRIVATE_KEY_PASSWORD",
+ "KSEF_XADES_PRIVATE_KEY_PASSWORD",
+ )
+ return E2EConfig(
+ name="demo",
+ base_url=_optional_any("KSEF_DEMO_BASE_URL", "KSEF_BASE_URL") or KsefEnvironment.DEMO.value,
+ context_type=_required_any("KSEF_DEMO_CONTEXT_TYPE", "KSEF_CONTEXT_TYPE"),
+ context_value=_required_any("KSEF_DEMO_CONTEXT_VALUE", "KSEF_CONTEXT_VALUE"),
+ subject_type=_optional_any("KSEF_DEMO_SUBJECT_TYPE", "KSEF_SUBJECT_TYPE") or "Subject1",
+ auth_mode=AUTH_MODE_XADES,
+ certificate_pem=_required_certificate_pem(
+ plain_names=("KSEF_DEMO_XADES_CERT_CRT", "KSEF_DEMO_XADES_CERT_PEM"),
+ b64_names=("KSEF_DEMO_XADES_CERT_CRT_B64", "KSEF_DEMO_XADES_CERT_PEM_B64"),
+ ),
+ private_key_pem=_required_private_key_pem(
+ plain_names=("KSEF_DEMO_XADES_PRIVATE_KEY_PEM",),
+ b64_names=("KSEF_DEMO_XADES_PRIVATE_KEY_PEM_B64",),
+ private_key_password=key_password,
+ password_secret_names=(
+ "KSEF_DEMO_XADES_PRIVATE_KEY_PASSWORD",
+ "KSEF_XADES_PRIVATE_KEY_PASSWORD",
+ ),
+ ),
+ subject_identifier_type=(
+ _optional_any("KSEF_DEMO_XADES_SUBJECT_IDENTIFIER_TYPE") or "certificateSubject"
+ ),
+ )
+
+
+def _utc_z(dt: datetime) -> str:
+ return dt.astimezone(timezone.utc).replace(microsecond=0).strftime("%Y-%m-%dT%H:%M:%SZ")
+
+
+def _query_invoice_metadata(
+ client: KsefClient,
+ *,
+ access_token: str,
+ subject_type: str,
+) -> list[dict[str, Any]]:
+ now = datetime.now(timezone.utc)
+ lookback_days = _env_int("KSEF_E2E_LOOKBACK_DAYS", 30)
+ request_payload = {
+ "subjectType": subject_type,
+ "dateRange": {
+ "dateType": "Issue",
+ "from": _utc_z(now - timedelta(days=lookback_days)),
+ "to": _utc_z(now + timedelta(minutes=10)),
+ },
+ }
+ response = _with_rate_limit_retry(
+ lambda: client.invoices.query_invoice_metadata(
+ request_payload,
+ access_token=access_token,
+ page_offset=0,
+ page_size=10,
+ sort_order="Desc",
+ )
+ )
+ invoices = response.get("invoices") or response.get("invoiceList") or []
+ return [invoice for invoice in invoices if isinstance(invoice, dict)]
+
+
+def _poll_metadata_until_contains(
+ client: KsefClient,
+ *,
+ access_token: str,
+ subject_type: str,
+ expected_ksef_number: str,
+) -> list[dict[str, Any]]:
+ max_attempts = _env_int("KSEF_E2E_METADATA_MAX_ATTEMPTS", 45)
+ poll_interval = _env_float("KSEF_E2E_POLL_INTERVAL_SECONDS", 2.0)
+
+ for _ in range(max_attempts):
+ invoices = _query_invoice_metadata(
+ client,
+ access_token=access_token,
+ subject_type=subject_type,
+ )
+ if any(_extract_ksef_number(invoice) == expected_ksef_number for invoice in invoices):
+ return invoices
+ time.sleep(poll_interval)
+ raise TimeoutError("Expected sent invoice did not appear in invoice metadata list.")
+
+
+def _build_invoice_xml(*, seller_nip: str, environment_name: str) -> bytes:
+ now = datetime.now(timezone.utc).replace(microsecond=0)
+ issue_date = now.date().isoformat()
+ period_start = now.date().replace(day=1).isoformat()
+ payment_due = (now.date() + timedelta(days=14)).isoformat()
+ invoice_number = (
+ f"E2E/{environment_name.upper()}/{now.strftime('%Y%m%d%H%M%S')}/{uuid.uuid4().hex[:8]}"
+ )
+
+ xml = f"""
+
+
+ FA
+ 3
+ {_utc_z(now)}
+ pytest
+
+
+
+ {seller_nip}
+ Automated seller
+
+
+ PL
+ ul. Testowa 1
+ 00-001 Warszawa
+
+
+
+
+ 1111111111
+ Automated buyer
+
+
+ PL
+ ul. Odbiorcy 1
+ 00-002 Warszawa
+
+
+ buyer@example.com
+ 555777999
+
+ 99999999
+ 2
+ 2
+
+
+ PLN
+ {issue_date}
+ miejscowosc
+ {invoice_number}
+
+ {period_start}
+ {payment_due}
+
+ 1.00
+ 0.23
+ 1.23
+
+ 2
+ 2
+ 2
+ 2
+
+ 1
+
+
+ 1
+
+ 2
+
+ 1
+
+
+ VAT
+
+ 1
+ Pozycja testowa
+ szt
+ 1.00
+ 1.00
+ 1.00
+ 23
+
+
+
+ 0.00
+ brak
+
+ 0.00
+
+ 0.00
+ brak
+
+ 0.00
+ 1.23
+
+
+
+ {payment_due}
+
+ 6
+
+ 73111111111111111111111111
+ Bank testowy
+ PLN
+
+
+
+
+"""
+ return xml.encode("utf-8")
+
+
+def _authenticate_access_token(
+ client: KsefClient,
+ *,
+ config: E2EConfig,
+ token_cert: str,
+) -> str:
+ coordinator = AuthCoordinator(client.auth)
+ poll_interval_seconds = _env_float("KSEF_E2E_POLL_INTERVAL_SECONDS", 2.0)
+ max_attempts = _env_int("KSEF_E2E_AUTH_MAX_ATTEMPTS", 90)
+
+ if config.auth_mode == AUTH_MODE_TOKEN:
+ if not config.token:
+ raise RuntimeError("Missing token for token auth mode.")
+ result = coordinator.authenticate_with_ksef_token(
+ token=config.token,
+ public_certificate=token_cert,
+ context_identifier_type=config.context_type,
+ context_identifier_value=config.context_value,
+ max_attempts=max_attempts,
+ poll_interval_seconds=poll_interval_seconds,
+ )
+ return result.tokens.access_token.token
+
+ if config.auth_mode == AUTH_MODE_XADES:
+ if not config.certificate_pem or not config.private_key_pem:
+ raise RuntimeError("Missing certificate/private key for XAdES auth mode.")
+ result = coordinator.authenticate_with_xades(
+ context_identifier_type=config.context_type,
+ context_identifier_value=config.context_value,
+ subject_identifier_type=config.subject_identifier_type or "certificateSubject",
+ certificate_pem=config.certificate_pem,
+ private_key_pem=config.private_key_pem,
+ max_attempts=max_attempts,
+ poll_interval_seconds=poll_interval_seconds,
+ )
+ return result.tokens.access_token.token
+
+ raise RuntimeError(f"Unknown auth mode: {config.auth_mode}")
+
+
+def _run_full_e2e_flow(config: E2EConfig) -> None:
+ if config.context_type.lower() != "nip":
+ raise RuntimeError("This E2E scenario requires context_type='nip'.")
+
+ options = KsefClientOptions(base_url=config.base_url)
+ with KsefClient(options) as client:
+ certs = _with_rate_limit_retry(lambda: client.security.get_public_key_certificates())
+ token_cert = _select_certificate(certs, "KsefTokenEncryption")
+ symmetric_cert = _select_certificate(certs, "SymmetricKeyEncryption")
+
+ access_token = _authenticate_access_token(
+ client,
+ config=config,
+ token_cert=token_cert,
+ )
+ assert access_token, "Authentication did not return access token."
+
+ workflow = OnlineSessionWorkflow(client.sessions)
+ session = workflow.open_session(
+ form_code=FORM_CODE,
+ public_certificate=symmetric_cert,
+ access_token=access_token,
+ )
+
+ try:
+ send_result = _with_rate_limit_retry(
+ lambda: workflow.send_invoice(
+ session_reference_number=session.session_reference_number,
+ invoice_xml=_build_invoice_xml(
+ seller_nip=config.context_value,
+ environment_name=config.name,
+ ),
+ encryption_data=session.encryption_data,
+ access_token=access_token,
+ )
+ )
+ invoice_reference_number = send_result.get("referenceNumber")
+ assert isinstance(invoice_reference_number, str) and invoice_reference_number, (
+ "Missing invoice reference number after send."
+ )
+
+ ksef_number = _poll_for_ksef_number(
+ client,
+ session.session_reference_number,
+ invoice_reference_number,
+ access_token,
+ )
+
+ upo_bytes = _poll_for_upo(
+ lambda: client.sessions.get_session_invoice_upo_by_ksef(
+ session.session_reference_number,
+ ksef_number,
+ access_token=access_token,
+ )
+ )
+ assert upo_bytes, "Received empty UPO payload."
+
+ session_invoices = _with_rate_limit_retry(
+ lambda: client.sessions.get_session_invoices(
+ session.session_reference_number,
+ access_token=access_token,
+ page_size=20,
+ )
+ )
+ invoices_in_session = session_invoices.get("invoices") or []
+ assert any(
+ _extract_ksef_number(invoice) == ksef_number
+ for invoice in invoices_in_session
+ if isinstance(invoice, dict)
+ ), "Sent invoice not found in session invoice list."
+
+ metadata_invoices = _poll_metadata_until_contains(
+ client,
+ access_token=access_token,
+ subject_type=config.subject_type,
+ expected_ksef_number=ksef_number,
+ )
+ latest_ksef_number = _first_ksef_number(metadata_invoices)
+
+ latest_invoice = _with_rate_limit_retry(
+ lambda: client.invoices.get_invoice_bytes(
+ latest_ksef_number,
+ access_token=access_token,
+ )
+ )
+ assert latest_invoice.content, "Downloaded latest invoice content is empty."
+ finally:
+ with suppress(Exception):
+ workflow.close_session(session.session_reference_number, access_token)
+
+
+def test_e2e_test_environment_full_flow_token() -> None:
+ _ensure_e2e_enabled()
+ _run_full_e2e_flow(_load_test_token_config())
+
+
+def test_e2e_demo_environment_full_flow_token() -> None:
+ _ensure_e2e_enabled()
+ _run_full_e2e_flow(_load_demo_token_config())
+
+
+def test_e2e_test_environment_full_flow_xades() -> None:
+ _ensure_e2e_enabled()
+ _run_full_e2e_flow(_load_test_xades_config())
+
+
+def test_e2e_demo_environment_full_flow_xades() -> None:
+ _ensure_e2e_enabled()
+ _run_full_e2e_flow(_load_demo_xades_config())