From c747277a1a01b7601b48dc8a12a9156f8cf41879 Mon Sep 17 00:00:00 2001 From: Smekcio Date: Sat, 14 Feb 2026 13:24:26 +0100 Subject: [PATCH] Add E2E workflows and consolidated token/XAdES tests --- .github/workflows/python-e2e.yml | 335 +++++++++++++++ README.md | 15 +- tests/test_e2e_demo_token_flow.py | 155 ------- tests/test_e2e_token_flows.py | 662 ++++++++++++++++++++++++++++++ 4 files changed, 1007 insertions(+), 160 deletions(-) create mode 100644 .github/workflows/python-e2e.yml delete mode 100644 tests/test_e2e_demo_token_flow.py create mode 100644 tests/test_e2e_token_flows.py diff --git a/.github/workflows/python-e2e.yml b/.github/workflows/python-e2e.yml new file mode 100644 index 0000000..fedeba1 --- /dev/null +++ b/.github/workflows/python-e2e.yml @@ -0,0 +1,335 @@ +name: Python E2E + +on: + push: + pull_request: + branches: ["main"] + workflow_dispatch: + +permissions: + contents: read + +concurrency: + group: python-e2e-${{ github.ref }} + cancel-in-progress: true + +jobs: + e2e-test-token: + name: E2E TEST (token) + if: ${{ github.event_name != 'pull_request' || github.event.pull_request.head.repo.fork == false }} + runs-on: ubuntu-latest + timeout-minutes: 35 + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python 3.12 + uses: actions/setup-python@v5 + with: + python-version: "3.12" + cache: pip + cache-dependency-path: | + requirements.txt + requirements-dev.txt + pyproject.toml + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + python -m pip install -r requirements-dev.txt + + - name: Validate required secrets + shell: bash + env: + KSEF_TEST_TOKEN: ${{ secrets.KSEF_TEST_TOKEN }} + KSEF_TEST_CONTEXT_TYPE: ${{ secrets.KSEF_TEST_CONTEXT_TYPE }} + KSEF_TEST_CONTEXT_VALUE: ${{ secrets.KSEF_TEST_CONTEXT_VALUE }} + run: | + set +x + missing=0 + for name in KSEF_TEST_TOKEN KSEF_TEST_CONTEXT_TYPE KSEF_TEST_CONTEXT_VALUE; do + if [ -z "${!name}" ]; then + echo "::error::Missing required secret: ${name}" + missing=1 + fi + done + if [ "${missing}" -ne 0 ]; then + exit 1 + fi + + - name: Mask sensitive values + shell: bash + env: + KSEF_TEST_TOKEN: ${{ secrets.KSEF_TEST_TOKEN }} + KSEF_TEST_CONTEXT_TYPE: ${{ secrets.KSEF_TEST_CONTEXT_TYPE }} + KSEF_TEST_CONTEXT_VALUE: ${{ secrets.KSEF_TEST_CONTEXT_VALUE }} + run: | + set +x + for value in "${KSEF_TEST_TOKEN}" "${KSEF_TEST_CONTEXT_TYPE}" "${KSEF_TEST_CONTEXT_VALUE}"; do + if [ -n "${value}" ]; then + echo "::add-mask::${value}" + fi + done + + - name: Run E2E TEST (token) + env: + KSEF_E2E: "1" + KSEF_TEST_BASE_URL: https://api-test.ksef.mf.gov.pl + KSEF_TEST_TOKEN: ${{ secrets.KSEF_TEST_TOKEN }} + KSEF_TEST_CONTEXT_TYPE: ${{ secrets.KSEF_TEST_CONTEXT_TYPE }} + KSEF_TEST_CONTEXT_VALUE: ${{ secrets.KSEF_TEST_CONTEXT_VALUE }} + KSEF_TEST_SUBJECT_TYPE: Subject1 + run: | + python -m pytest -q --maxfail=1 --disable-warnings \ + tests/test_e2e_token_flows.py::test_e2e_test_environment_full_flow_token + + e2e-test-xades: + name: E2E TEST (xades) + if: ${{ github.event_name != 'pull_request' || github.event.pull_request.head.repo.fork == false }} + runs-on: ubuntu-latest + timeout-minutes: 40 + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python 3.12 + uses: actions/setup-python@v5 + with: + python-version: "3.12" + cache: pip + cache-dependency-path: | + requirements.txt + requirements-dev.txt + pyproject.toml + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + python -m pip install -r requirements-dev.txt + python -m pip install -e ".[xml]" + + - name: Validate required secrets + shell: bash + env: + KSEF_TEST_CONTEXT_TYPE: ${{ secrets.KSEF_TEST_CONTEXT_TYPE }} + KSEF_TEST_CONTEXT_VALUE: ${{ secrets.KSEF_TEST_CONTEXT_VALUE }} + KSEF_TEST_XADES_CERT_CRT: ${{ secrets.KSEF_TEST_XADES_CERT_CRT }} + KSEF_TEST_XADES_CERT_CRT_B64: ${{ secrets.KSEF_TEST_XADES_CERT_CRT_B64 }} + KSEF_TEST_XADES_PRIVATE_KEY_PEM: ${{ secrets.KSEF_TEST_XADES_PRIVATE_KEY_PEM }} + KSEF_TEST_XADES_PRIVATE_KEY_PEM_B64: ${{ secrets.KSEF_TEST_XADES_PRIVATE_KEY_PEM_B64 }} + KSEF_TEST_XADES_PRIVATE_KEY_PASSWORD: ${{ secrets.KSEF_TEST_XADES_PRIVATE_KEY_PASSWORD }} + run: | + set +x + missing=0 + for name in KSEF_TEST_CONTEXT_TYPE KSEF_TEST_CONTEXT_VALUE; do + if [ -z "${!name}" ]; then + echo "::error::Missing required secret: ${name}" + missing=1 + fi + done + if [ -z "${KSEF_TEST_XADES_CERT_CRT}" ] && [ -z "${KSEF_TEST_XADES_CERT_CRT_B64}" ]; then + echo "::error::Missing required secret: KSEF_TEST_XADES_CERT_CRT or KSEF_TEST_XADES_CERT_CRT_B64" + missing=1 + fi + if [ -z "${KSEF_TEST_XADES_PRIVATE_KEY_PEM}" ] && [ -z "${KSEF_TEST_XADES_PRIVATE_KEY_PEM_B64}" ]; then + echo "::error::Missing required secret: KSEF_TEST_XADES_PRIVATE_KEY_PEM or KSEF_TEST_XADES_PRIVATE_KEY_PEM_B64" + missing=1 + fi + if [ "${missing}" -ne 0 ]; then + exit 1 + fi + + - name: Mask sensitive values + shell: bash + env: + KSEF_TEST_CONTEXT_TYPE: ${{ secrets.KSEF_TEST_CONTEXT_TYPE }} + KSEF_TEST_CONTEXT_VALUE: ${{ secrets.KSEF_TEST_CONTEXT_VALUE }} + KSEF_TEST_XADES_CERT_CRT: ${{ secrets.KSEF_TEST_XADES_CERT_CRT }} + KSEF_TEST_XADES_CERT_CRT_B64: ${{ secrets.KSEF_TEST_XADES_CERT_CRT_B64 }} + KSEF_TEST_XADES_PRIVATE_KEY_PEM: ${{ secrets.KSEF_TEST_XADES_PRIVATE_KEY_PEM }} + KSEF_TEST_XADES_PRIVATE_KEY_PEM_B64: ${{ secrets.KSEF_TEST_XADES_PRIVATE_KEY_PEM_B64 }} + KSEF_TEST_XADES_PRIVATE_KEY_PASSWORD: ${{ secrets.KSEF_TEST_XADES_PRIVATE_KEY_PASSWORD }} + run: | + set +x + for value in "${KSEF_TEST_CONTEXT_TYPE}" "${KSEF_TEST_CONTEXT_VALUE}" "${KSEF_TEST_XADES_CERT_CRT}" "${KSEF_TEST_XADES_CERT_CRT_B64}" "${KSEF_TEST_XADES_PRIVATE_KEY_PEM}" "${KSEF_TEST_XADES_PRIVATE_KEY_PEM_B64}" "${KSEF_TEST_XADES_PRIVATE_KEY_PASSWORD}"; do + if [ -n "${value}" ]; then + echo "::add-mask::${value}" + fi + done + + - name: Run E2E TEST (xades) + env: + KSEF_E2E: "1" + KSEF_TEST_BASE_URL: https://api-test.ksef.mf.gov.pl + KSEF_TEST_CONTEXT_TYPE: ${{ secrets.KSEF_TEST_CONTEXT_TYPE }} + KSEF_TEST_CONTEXT_VALUE: ${{ secrets.KSEF_TEST_CONTEXT_VALUE }} + KSEF_TEST_SUBJECT_TYPE: Subject1 + KSEF_TEST_XADES_CERT_CRT: ${{ secrets.KSEF_TEST_XADES_CERT_CRT }} + KSEF_TEST_XADES_CERT_CRT_B64: ${{ secrets.KSEF_TEST_XADES_CERT_CRT_B64 }} + KSEF_TEST_XADES_PRIVATE_KEY_PEM: ${{ secrets.KSEF_TEST_XADES_PRIVATE_KEY_PEM }} + KSEF_TEST_XADES_PRIVATE_KEY_PEM_B64: ${{ secrets.KSEF_TEST_XADES_PRIVATE_KEY_PEM_B64 }} + KSEF_TEST_XADES_PRIVATE_KEY_PASSWORD: ${{ secrets.KSEF_TEST_XADES_PRIVATE_KEY_PASSWORD }} + KSEF_TEST_XADES_SUBJECT_IDENTIFIER_TYPE: certificateSubject + run: | + python -m pytest -q --maxfail=1 --disable-warnings \ + tests/test_e2e_token_flows.py::test_e2e_test_environment_full_flow_xades + + e2e-demo-token: + name: E2E DEMO (token) + if: ${{ github.event_name != 'pull_request' || github.event.pull_request.head.repo.fork == false }} + runs-on: ubuntu-latest + timeout-minutes: 35 + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python 3.12 + uses: actions/setup-python@v5 + with: + python-version: "3.12" + cache: pip + cache-dependency-path: | + requirements.txt + requirements-dev.txt + pyproject.toml + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + python -m pip install -r requirements-dev.txt + + - name: Validate required secrets + shell: bash + env: + KSEF_DEMO_TOKEN: ${{ secrets.KSEF_DEMO_TOKEN }} + KSEF_DEMO_CONTEXT_TYPE: ${{ secrets.KSEF_DEMO_CONTEXT_TYPE }} + KSEF_DEMO_CONTEXT_VALUE: ${{ secrets.KSEF_DEMO_CONTEXT_VALUE }} + run: | + set +x + missing=0 + for name in KSEF_DEMO_TOKEN KSEF_DEMO_CONTEXT_TYPE KSEF_DEMO_CONTEXT_VALUE; do + if [ -z "${!name}" ]; then + echo "::error::Missing required secret: ${name}" + missing=1 + fi + done + if [ "${missing}" -ne 0 ]; then + exit 1 + fi + + - name: Mask sensitive values + shell: bash + env: + KSEF_DEMO_TOKEN: ${{ secrets.KSEF_DEMO_TOKEN }} + KSEF_DEMO_CONTEXT_TYPE: ${{ secrets.KSEF_DEMO_CONTEXT_TYPE }} + KSEF_DEMO_CONTEXT_VALUE: ${{ secrets.KSEF_DEMO_CONTEXT_VALUE }} + run: | + set +x + for value in "${KSEF_DEMO_TOKEN}" "${KSEF_DEMO_CONTEXT_TYPE}" "${KSEF_DEMO_CONTEXT_VALUE}"; do + if [ -n "${value}" ]; then + echo "::add-mask::${value}" + fi + done + + - name: Run E2E DEMO (token) + env: + KSEF_E2E: "1" + KSEF_DEMO_BASE_URL: https://api-demo.ksef.mf.gov.pl + KSEF_DEMO_TOKEN: ${{ secrets.KSEF_DEMO_TOKEN }} + KSEF_DEMO_CONTEXT_TYPE: ${{ secrets.KSEF_DEMO_CONTEXT_TYPE }} + KSEF_DEMO_CONTEXT_VALUE: ${{ secrets.KSEF_DEMO_CONTEXT_VALUE }} + KSEF_DEMO_SUBJECT_TYPE: Subject1 + run: | + python -m pytest -q --maxfail=1 --disable-warnings \ + tests/test_e2e_token_flows.py::test_e2e_demo_environment_full_flow_token + + e2e-demo-xades: + name: E2E DEMO (xades) + if: ${{ github.event_name != 'pull_request' || github.event.pull_request.head.repo.fork == false }} + runs-on: ubuntu-latest + timeout-minutes: 40 + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python 3.12 + uses: actions/setup-python@v5 + with: + python-version: "3.12" + cache: pip + cache-dependency-path: | + requirements.txt + requirements-dev.txt + pyproject.toml + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + python -m pip install -r requirements-dev.txt + python -m pip install -e ".[xml]" + + - name: Validate required secrets + shell: bash + env: + KSEF_DEMO_CONTEXT_TYPE: ${{ secrets.KSEF_DEMO_CONTEXT_TYPE }} + KSEF_DEMO_CONTEXT_VALUE: ${{ secrets.KSEF_DEMO_CONTEXT_VALUE }} + KSEF_DEMO_XADES_CERT_CRT: ${{ secrets.KSEF_DEMO_XADES_CERT_CRT }} + KSEF_DEMO_XADES_CERT_CRT_B64: ${{ secrets.KSEF_DEMO_XADES_CERT_CRT_B64 }} + KSEF_DEMO_XADES_PRIVATE_KEY_PEM: ${{ secrets.KSEF_DEMO_XADES_PRIVATE_KEY_PEM }} + KSEF_DEMO_XADES_PRIVATE_KEY_PEM_B64: ${{ secrets.KSEF_DEMO_XADES_PRIVATE_KEY_PEM_B64 }} + KSEF_DEMO_XADES_PRIVATE_KEY_PASSWORD: ${{ secrets.KSEF_DEMO_XADES_PRIVATE_KEY_PASSWORD }} + run: | + set +x + missing=0 + for name in KSEF_DEMO_CONTEXT_TYPE KSEF_DEMO_CONTEXT_VALUE; do + if [ -z "${!name}" ]; then + echo "::error::Missing required secret: ${name}" + missing=1 + fi + done + if [ -z "${KSEF_DEMO_XADES_CERT_CRT}" ] && [ -z "${KSEF_DEMO_XADES_CERT_CRT_B64}" ]; then + echo "::error::Missing required secret: KSEF_DEMO_XADES_CERT_CRT or KSEF_DEMO_XADES_CERT_CRT_B64" + missing=1 + fi + if [ -z "${KSEF_DEMO_XADES_PRIVATE_KEY_PEM}" ] && [ -z "${KSEF_DEMO_XADES_PRIVATE_KEY_PEM_B64}" ]; then + echo "::error::Missing required secret: KSEF_DEMO_XADES_PRIVATE_KEY_PEM or KSEF_DEMO_XADES_PRIVATE_KEY_PEM_B64" + missing=1 + fi + if [ "${missing}" -ne 0 ]; then + exit 1 + fi + + - name: Mask sensitive values + shell: bash + env: + KSEF_DEMO_CONTEXT_TYPE: ${{ secrets.KSEF_DEMO_CONTEXT_TYPE }} + KSEF_DEMO_CONTEXT_VALUE: ${{ secrets.KSEF_DEMO_CONTEXT_VALUE }} + KSEF_DEMO_XADES_CERT_CRT: ${{ secrets.KSEF_DEMO_XADES_CERT_CRT }} + KSEF_DEMO_XADES_CERT_CRT_B64: ${{ secrets.KSEF_DEMO_XADES_CERT_CRT_B64 }} + KSEF_DEMO_XADES_PRIVATE_KEY_PEM: ${{ secrets.KSEF_DEMO_XADES_PRIVATE_KEY_PEM }} + KSEF_DEMO_XADES_PRIVATE_KEY_PEM_B64: ${{ secrets.KSEF_DEMO_XADES_PRIVATE_KEY_PEM_B64 }} + KSEF_DEMO_XADES_PRIVATE_KEY_PASSWORD: ${{ secrets.KSEF_DEMO_XADES_PRIVATE_KEY_PASSWORD }} + run: | + set +x + for value in "${KSEF_DEMO_CONTEXT_TYPE}" "${KSEF_DEMO_CONTEXT_VALUE}" "${KSEF_DEMO_XADES_CERT_CRT}" "${KSEF_DEMO_XADES_CERT_CRT_B64}" "${KSEF_DEMO_XADES_PRIVATE_KEY_PEM}" "${KSEF_DEMO_XADES_PRIVATE_KEY_PEM_B64}" "${KSEF_DEMO_XADES_PRIVATE_KEY_PASSWORD}"; do + if [ -n "${value}" ]; then + echo "::add-mask::${value}" + fi + done + + - name: Run E2E DEMO (xades) + env: + KSEF_E2E: "1" + KSEF_DEMO_BASE_URL: https://api-demo.ksef.mf.gov.pl + KSEF_DEMO_CONTEXT_TYPE: ${{ secrets.KSEF_DEMO_CONTEXT_TYPE }} + KSEF_DEMO_CONTEXT_VALUE: ${{ secrets.KSEF_DEMO_CONTEXT_VALUE }} + KSEF_DEMO_SUBJECT_TYPE: Subject1 + KSEF_DEMO_XADES_CERT_CRT: ${{ secrets.KSEF_DEMO_XADES_CERT_CRT }} + KSEF_DEMO_XADES_CERT_CRT_B64: ${{ secrets.KSEF_DEMO_XADES_CERT_CRT_B64 }} + KSEF_DEMO_XADES_PRIVATE_KEY_PEM: ${{ secrets.KSEF_DEMO_XADES_PRIVATE_KEY_PEM }} + KSEF_DEMO_XADES_PRIVATE_KEY_PEM_B64: ${{ secrets.KSEF_DEMO_XADES_PRIVATE_KEY_PEM_B64 }} + KSEF_DEMO_XADES_PRIVATE_KEY_PASSWORD: ${{ secrets.KSEF_DEMO_XADES_PRIVATE_KEY_PASSWORD }} + KSEF_DEMO_XADES_SUBJECT_IDENTIFIER_TYPE: certificateSubject + run: | + python -m pytest -q --maxfail=1 --disable-warnings \ + tests/test_e2e_token_flows.py::test_e2e_demo_environment_full_flow_xades diff --git a/README.md b/README.md index 7e21831..028458b 100644 --- a/README.md +++ b/README.md @@ -209,8 +209,9 @@ Lokalne uruchomienie (XAdES, TEST): KSEF_E2E=1 \ KSEF_TEST_CONTEXT_TYPE=nip \ KSEF_TEST_CONTEXT_VALUE=... \ -KSEF_TEST_XADES_CERT_PEM="$(cat cert.pem)" \ +KSEF_TEST_XADES_CERT_CRT="$(cat cert.crt)" \ KSEF_TEST_XADES_PRIVATE_KEY_PEM="$(cat key.pem)" \ +KSEF_TEST_XADES_PRIVATE_KEY_PASSWORD=... \ pytest tests/test_e2e_token_flows.py::test_e2e_test_environment_full_flow_xades ``` @@ -225,17 +226,21 @@ Workflow uruchamia się: Repozytoryjne sekrety do ustawienia: - `KSEF_TEST_TOKEN`, `KSEF_TEST_CONTEXT_TYPE`, `KSEF_TEST_CONTEXT_VALUE` (token TEST) - `KSEF_DEMO_TOKEN`, `KSEF_DEMO_CONTEXT_TYPE`, `KSEF_DEMO_CONTEXT_VALUE` (token DEMO) -- `KSEF_TEST_XADES_CERT_PEM` albo `KSEF_TEST_XADES_CERT_PEM_B64` (XAdES TEST) +- `KSEF_TEST_XADES_CERT_CRT` albo `KSEF_TEST_XADES_CERT_CRT_B64` (XAdES TEST) +- `KSEF_TEST_XADES_CERT_PEM` albo `KSEF_TEST_XADES_CERT_PEM_B64` (XAdES TEST, kompatybilność wsteczna) - `KSEF_TEST_XADES_PRIVATE_KEY_PEM` albo `KSEF_TEST_XADES_PRIVATE_KEY_PEM_B64` (XAdES TEST) +- `KSEF_TEST_XADES_PRIVATE_KEY_PASSWORD` opcjonalnie, wymagany dla klucza zaszyfrowanego - `KSEF_TEST_XADES_SUBJECT_IDENTIFIER_TYPE` opcjonalnie, domyślnie `certificateSubject` -- `KSEF_DEMO_XADES_CERT_PEM` albo `KSEF_DEMO_XADES_CERT_PEM_B64` (XAdES DEMO) +- `KSEF_DEMO_XADES_CERT_CRT` albo `KSEF_DEMO_XADES_CERT_CRT_B64` (XAdES DEMO) +- `KSEF_DEMO_XADES_CERT_PEM` albo `KSEF_DEMO_XADES_CERT_PEM_B64` (XAdES DEMO, kompatybilność wsteczna) - `KSEF_DEMO_XADES_PRIVATE_KEY_PEM` albo `KSEF_DEMO_XADES_PRIVATE_KEY_PEM_B64` (XAdES DEMO) +- `KSEF_DEMO_XADES_PRIVATE_KEY_PASSWORD` opcjonalnie, wymagany dla klucza zaszyfrowanego - `KSEF_DEMO_XADES_SUBJECT_IDENTIFIER_TYPE` opcjonalnie, domyślnie `certificateSubject` -Przygotowanie sekretów PEM w wariancie Base64 (jedna linia): +Przygotowanie sekretów CRT/PEM w wariancie Base64 (jedna linia): ```bash -base64 < cert.pem | tr -d '\n' +base64 < cert.crt | tr -d '\n' base64 < key.pem | tr -d '\n' ``` diff --git a/tests/test_e2e_demo_token_flow.py b/tests/test_e2e_demo_token_flow.py deleted file mode 100644 index 872835f..0000000 --- a/tests/test_e2e_demo_token_flow.py +++ /dev/null @@ -1,155 +0,0 @@ -from __future__ import annotations - -import os -import time -from datetime import datetime, timedelta, timezone - -import pytest - -from ksef_client import KsefClient, KsefClientOptions -from ksef_client.exceptions import KsefRateLimitError -from ksef_client.services import AuthCoordinator, OnlineSessionWorkflow - - -def _env(name: str) -> str: - value = os.getenv(name) - if not value: - raise RuntimeError(f"Missing env var: {name}") - return value - - -def _select_cert(certs: list[dict], usage_name: str) -> str: - for cert in certs: - if usage_name in (cert.get("usage") or []): - return cert["certificate"] - raise RuntimeError(f"Missing public cert usage: {usage_name}") - - -@pytest.mark.e2e -def test_e2e_token_send_and_list_session_invoices() -> None: - if os.getenv("KSEF_E2E") not in {"1", "true", "yes"}: - pytest.skip("Set KSEF_E2E=1 to enable this test.") - - token = _env("KSEF_TOKEN") - context_type = _env("KSEF_CONTEXT_TYPE") - context_value = _env("KSEF_CONTEXT_VALUE") - base_url = _env("KSEF_BASE_URL") - - options = KsefClientOptions(base_url=base_url) - with KsefClient(options) as client: - certs = client.security.get_public_key_certificates() - token_cert = _select_cert(certs, "KsefTokenEncryption") - symmetric_cert = _select_cert(certs, "SymmetricKeyEncryption") - - access_token = ( - AuthCoordinator(client.auth) - .authenticate_with_ksef_token( - token=token, - public_certificate=token_cert, - context_identifier_type=context_type, - context_identifier_value=context_value, - max_attempts=90, - poll_interval_seconds=2.0, - ) - .tokens.access_token.token - ) - - workflow = OnlineSessionWorkflow(client.sessions) - session = workflow.open_session( - form_code={"systemCode": "FA (3)", "schemaVersion": "1-0E", "value": "FA"}, - public_certificate=symmetric_cert, - access_token=access_token, - ) - - now = datetime.now(timezone.utc).replace(microsecond=0) - invoice_xml = ( - '' - '' - "" - 'FA' - "3" - f"{now.strftime('%Y-%m-%dT%H:%M:%SZ')}" - "pytest" - "" - "" - f"{context_value}pytest seller" - "PLxx" - "" - "" - "1111111111" - "pytest buyer" - "PLxx" - "buyer@example.com555777999" - "9999999922" - "" - "" - "PLN" - f"{now.date().isoformat()}x" - f"FA/PYTEST/{now:%m}/{now:%Y}" - "" - f"{now.date().replace(day=1).isoformat()}" - f"{(now.date() + timedelta(days=14)).isoformat()}" - "" - "1.000.231.23" - "2222" - "11" - "21" - "VAT" - "1xszt1.00" - "1.001.0023" - "0.00x0.00" - "0.00x0.00" - "1.23" - "" - f"{(now.date() + timedelta(days=14)).isoformat()}" - "6" - "73111111111111111111111111xPLN" - "" - "" - "" - ).encode() - - send_result = workflow.send_invoice( - session_reference_number=session.session_reference_number, - invoice_xml=invoice_xml, - encryption_data=session.encryption_data, - access_token=access_token, - ) - invoice_reference_number = send_result["referenceNumber"] - - ksef_number = None - for _ in range(90): - status = client.sessions.get_session_invoice_status( - session.session_reference_number, - invoice_reference_number, - access_token=access_token, - ) - code = int(status.get("status", {}).get("code", 0)) - if code == 200: - ksef_number = status.get("ksefNumber") - break - if code not in {100, 150}: - raise AssertionError(status) - time.sleep(2) - - assert ksef_number, "Expected ksefNumber in invoice status" - - try: - listed = client.sessions.get_session_invoices( - session.session_reference_number, - access_token=access_token, - page_size=10, - ) - except KsefRateLimitError: - time.sleep(5) - listed = client.sessions.get_session_invoices( - session.session_reference_number, - access_token=access_token, - page_size=10, - ) - - invoices = listed.get("invoices") or [] - assert any(inv.get("ksefNumber") == ksef_number for inv in invoices) diff --git a/tests/test_e2e_token_flows.py b/tests/test_e2e_token_flows.py new file mode 100644 index 0000000..0781d72 --- /dev/null +++ b/tests/test_e2e_token_flows.py @@ -0,0 +1,662 @@ +from __future__ import annotations + +import base64 +import os +import time +import uuid +from collections.abc import Callable +from contextlib import suppress +from dataclasses import dataclass +from datetime import datetime, timedelta, timezone +from typing import Any, TypeVar + +import pytest +from cryptography import x509 +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.serialization import load_der_private_key, load_pem_private_key + +from ksef_client import KsefClient, KsefClientOptions, KsefEnvironment +from ksef_client.exceptions import KsefHttpError, KsefRateLimitError +from ksef_client.services import AuthCoordinator, OnlineSessionWorkflow + +pytestmark = pytest.mark.e2e + +FORM_CODE = {"systemCode": "FA (3)", "schemaVersion": "1-0E", "value": "FA"} +ENABLED_VALUES = {"1", "true", "yes"} +AUTH_MODE_TOKEN = "token" +AUTH_MODE_XADES = "xades" +T = TypeVar("T") + + +@dataclass(frozen=True) +class E2EConfig: + name: str + base_url: str + context_type: str + context_value: str + subject_type: str + auth_mode: str + token: str | None = None + certificate_pem: str | None = None + private_key_pem: str | None = None + subject_identifier_type: str | None = None + + +def _optional_any(*names: str) -> str | None: + for name in names: + value = os.getenv(name) + if value: + return value + return None + + +def _required_any(*names: str) -> str: + value = _optional_any(*names) + if value: + return value + raise RuntimeError(f"Missing env var: {' or '.join(names)}") + + +def _env_int(name: str, default: int) -> int: + raw = os.getenv(name) + if raw is None: + return default + return int(raw) + + +def _env_float(name: str, default: float) -> float: + raw = os.getenv(name) + if raw is None: + return default + return float(raw) + + +def _ensure_e2e_enabled() -> None: + if os.getenv("KSEF_E2E", "").strip().lower() not in ENABLED_VALUES: + pytest.skip("Set KSEF_E2E=1 to enable e2e tests.") + + +def _load_test_token_config() -> E2EConfig: + return E2EConfig( + name="test", + base_url=_optional_any("KSEF_TEST_BASE_URL") or KsefEnvironment.TEST.value, + context_type=_required_any("KSEF_TEST_CONTEXT_TYPE"), + context_value=_required_any("KSEF_TEST_CONTEXT_VALUE"), + subject_type=_optional_any("KSEF_TEST_SUBJECT_TYPE") or "Subject1", + auth_mode=AUTH_MODE_TOKEN, + token=_required_any("KSEF_TEST_TOKEN"), + ) + + +def _load_demo_token_config() -> E2EConfig: + return E2EConfig( + name="demo", + base_url=_optional_any("KSEF_DEMO_BASE_URL", "KSEF_BASE_URL") or KsefEnvironment.DEMO.value, + context_type=_required_any("KSEF_DEMO_CONTEXT_TYPE", "KSEF_CONTEXT_TYPE"), + context_value=_required_any("KSEF_DEMO_CONTEXT_VALUE", "KSEF_CONTEXT_VALUE"), + subject_type=_optional_any("KSEF_DEMO_SUBJECT_TYPE", "KSEF_SUBJECT_TYPE") or "Subject1", + auth_mode=AUTH_MODE_TOKEN, + token=_required_any("KSEF_DEMO_TOKEN", "KSEF_TOKEN"), + ) + + +def _select_certificate(certs: list[dict[str, Any]], usage_name: str) -> str: + for cert in certs: + usage = cert.get("usage") or [] + if usage_name in usage and cert.get("certificate"): + return str(cert["certificate"]) + raise RuntimeError(f"Missing public cert usage: {usage_name}") + + +def _with_rate_limit_retry(call: Callable[[], T]) -> T: + retries = _env_int("KSEF_E2E_RATE_LIMIT_RETRIES", 3) + delay = _env_float("KSEF_E2E_RATE_LIMIT_SLEEP_SECONDS", 5.0) + for attempt in range(retries): + try: + return call() + except KsefRateLimitError: + if attempt == retries - 1: + raise + time.sleep(delay) + raise RuntimeError("Rate limit retry loop terminated unexpectedly.") + + +def _poll_for_ksef_number( + client: KsefClient, + session_reference_number: str, + invoice_reference_number: str, + access_token: str, +) -> str: + max_attempts = _env_int("KSEF_E2E_INVOICE_MAX_ATTEMPTS", 90) + poll_interval = _env_float("KSEF_E2E_POLL_INTERVAL_SECONDS", 2.0) + for _ in range(max_attempts): + status = _with_rate_limit_retry( + lambda: client.sessions.get_session_invoice_status( + session_reference_number, + invoice_reference_number, + access_token=access_token, + ) + ) + code = int((status.get("status") or {}).get("code", 0)) + if code == 200: + ksef_number = status.get("ksefNumber") + if isinstance(ksef_number, str) and ksef_number: + return ksef_number + raise AssertionError("Invoice accepted but ksefNumber is missing.") + if code not in {100, 150}: + raise AssertionError(f"Invoice processing failed with status code {code}.") + time.sleep(poll_interval) + raise TimeoutError("Invoice processing did not complete within max_attempts.") + + +def _poll_for_upo(fetch: Callable[[], bytes]) -> bytes: + max_attempts = _env_int("KSEF_E2E_UPO_MAX_ATTEMPTS", 45) + poll_interval = _env_float("KSEF_E2E_POLL_INTERVAL_SECONDS", 2.0) + transient_codes = {404, 409, 425} + + for _ in range(max_attempts): + try: + upo = _with_rate_limit_retry(fetch) + if upo: + return upo + except KsefHttpError as error: + if error.status_code not in transient_codes: + raise + time.sleep(poll_interval) + raise TimeoutError("Invoice UPO was not available within max_attempts.") + + +def _extract_ksef_number(invoice: dict[str, Any]) -> str | None: + value = invoice.get("ksefNumber") + if isinstance(value, str) and value: + return value + return None + + +def _first_ksef_number(invoices: list[dict[str, Any]]) -> str: + for invoice in invoices: + ksef_number = _extract_ksef_number(invoice) + if ksef_number: + return ksef_number + raise AssertionError("No invoice with ksefNumber found in metadata response.") + + +def _normalize_pem(raw: str) -> str: + value = raw.strip() + if "\\n" in value and "\n" not in value: + value = value.replace("\\n", "\n") + if not value.endswith("\n"): + value += "\n" + return value + + +def _decode_b64(value: str) -> bytes: + compact = "".join(value.split()) + return base64.b64decode(compact.encode("ascii"), validate=False) + + +def _required_certificate_pem( + *, + plain_names: tuple[str, ...], + b64_names: tuple[str, ...], +) -> str: + plain_value = _optional_any(*plain_names) + if plain_value: + normalized_plain = _normalize_pem(plain_value) + plain_bytes = normalized_plain.encode("utf-8") + if b"BEGIN CERTIFICATE" in plain_bytes: + return normalized_plain + try: + cert = x509.load_der_x509_certificate(plain_bytes) + except ValueError: + return normalized_plain + return cert.public_bytes(serialization.Encoding.PEM).decode("utf-8") + + b64_value = _optional_any(*b64_names) + if b64_value: + decoded = _decode_b64(b64_value) + if b"BEGIN CERTIFICATE" in decoded: + return _normalize_pem(decoded.decode("utf-8")) + try: + cert = x509.load_der_x509_certificate(decoded) + except ValueError as exc: + raise RuntimeError("Unable to parse XAdES certificate from provided secrets.") from exc + return cert.public_bytes(serialization.Encoding.PEM).decode("utf-8") + + raise RuntimeError( + "Missing certificate value in env vars: " + + ", ".join((*plain_names, *b64_names)) + ) + + +def _required_private_key_pem( + *, + plain_names: tuple[str, ...], + b64_names: tuple[str, ...], + private_key_password: str | None, + password_secret_names: tuple[str, ...], +) -> str: + plain_value = _optional_any(*plain_names) + if plain_value: + key_bytes = _normalize_pem(plain_value).encode("utf-8") + else: + b64_value = _optional_any(*b64_names) + if not b64_value: + raise RuntimeError( + "Missing PEM value in env vars: " + + ", ".join((*plain_names, *b64_names)) + ) + decoded = _decode_b64(b64_value) + if b"BEGIN" in decoded: + key_bytes = _normalize_pem(decoded.decode("utf-8")).encode("utf-8") + else: + key_bytes = decoded + + password_bytes = None if private_key_password is None else private_key_password.encode("utf-8") + try: + if b"BEGIN" in key_bytes: + key = load_pem_private_key(key_bytes, password=password_bytes) + else: + key = load_der_private_key(key_bytes, password=password_bytes) + except TypeError as exc: + raise RuntimeError( + "Invalid private key password configuration. " + f"If key is encrypted, set one of: {', '.join(password_secret_names)}." + ) from exc + except ValueError as exc: + password_hint = ( + f"Check secret value in: {', '.join(password_secret_names)}." + if private_key_password + else f"If key is encrypted, set one of: {', '.join(password_secret_names)}." + ) + raise RuntimeError(f"Unable to load XAdES private key. {password_hint}") from exc + + return key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ).decode("utf-8") + + +def _load_test_xades_config() -> E2EConfig: + key_password = _optional_any( + "KSEF_TEST_XADES_PRIVATE_KEY_PASSWORD", + "KSEF_XADES_PRIVATE_KEY_PASSWORD", + ) + return E2EConfig( + name="test", + base_url=_optional_any("KSEF_TEST_BASE_URL") or KsefEnvironment.TEST.value, + context_type=_required_any("KSEF_TEST_CONTEXT_TYPE"), + context_value=_required_any("KSEF_TEST_CONTEXT_VALUE"), + subject_type=_optional_any("KSEF_TEST_SUBJECT_TYPE") or "Subject1", + auth_mode=AUTH_MODE_XADES, + certificate_pem=_required_certificate_pem( + plain_names=("KSEF_TEST_XADES_CERT_CRT", "KSEF_TEST_XADES_CERT_PEM"), + b64_names=("KSEF_TEST_XADES_CERT_CRT_B64", "KSEF_TEST_XADES_CERT_PEM_B64"), + ), + private_key_pem=_required_private_key_pem( + plain_names=("KSEF_TEST_XADES_PRIVATE_KEY_PEM",), + b64_names=("KSEF_TEST_XADES_PRIVATE_KEY_PEM_B64",), + private_key_password=key_password, + password_secret_names=( + "KSEF_TEST_XADES_PRIVATE_KEY_PASSWORD", + "KSEF_XADES_PRIVATE_KEY_PASSWORD", + ), + ), + subject_identifier_type=( + _optional_any("KSEF_TEST_XADES_SUBJECT_IDENTIFIER_TYPE") or "certificateSubject" + ), + ) + + +def _load_demo_xades_config() -> E2EConfig: + key_password = _optional_any( + "KSEF_DEMO_XADES_PRIVATE_KEY_PASSWORD", + "KSEF_XADES_PRIVATE_KEY_PASSWORD", + ) + return E2EConfig( + name="demo", + base_url=_optional_any("KSEF_DEMO_BASE_URL", "KSEF_BASE_URL") or KsefEnvironment.DEMO.value, + context_type=_required_any("KSEF_DEMO_CONTEXT_TYPE", "KSEF_CONTEXT_TYPE"), + context_value=_required_any("KSEF_DEMO_CONTEXT_VALUE", "KSEF_CONTEXT_VALUE"), + subject_type=_optional_any("KSEF_DEMO_SUBJECT_TYPE", "KSEF_SUBJECT_TYPE") or "Subject1", + auth_mode=AUTH_MODE_XADES, + certificate_pem=_required_certificate_pem( + plain_names=("KSEF_DEMO_XADES_CERT_CRT", "KSEF_DEMO_XADES_CERT_PEM"), + b64_names=("KSEF_DEMO_XADES_CERT_CRT_B64", "KSEF_DEMO_XADES_CERT_PEM_B64"), + ), + private_key_pem=_required_private_key_pem( + plain_names=("KSEF_DEMO_XADES_PRIVATE_KEY_PEM",), + b64_names=("KSEF_DEMO_XADES_PRIVATE_KEY_PEM_B64",), + private_key_password=key_password, + password_secret_names=( + "KSEF_DEMO_XADES_PRIVATE_KEY_PASSWORD", + "KSEF_XADES_PRIVATE_KEY_PASSWORD", + ), + ), + subject_identifier_type=( + _optional_any("KSEF_DEMO_XADES_SUBJECT_IDENTIFIER_TYPE") or "certificateSubject" + ), + ) + + +def _utc_z(dt: datetime) -> str: + return dt.astimezone(timezone.utc).replace(microsecond=0).strftime("%Y-%m-%dT%H:%M:%SZ") + + +def _query_invoice_metadata( + client: KsefClient, + *, + access_token: str, + subject_type: str, +) -> list[dict[str, Any]]: + now = datetime.now(timezone.utc) + lookback_days = _env_int("KSEF_E2E_LOOKBACK_DAYS", 30) + request_payload = { + "subjectType": subject_type, + "dateRange": { + "dateType": "Issue", + "from": _utc_z(now - timedelta(days=lookback_days)), + "to": _utc_z(now + timedelta(minutes=10)), + }, + } + response = _with_rate_limit_retry( + lambda: client.invoices.query_invoice_metadata( + request_payload, + access_token=access_token, + page_offset=0, + page_size=10, + sort_order="Desc", + ) + ) + invoices = response.get("invoices") or response.get("invoiceList") or [] + return [invoice for invoice in invoices if isinstance(invoice, dict)] + + +def _poll_metadata_until_contains( + client: KsefClient, + *, + access_token: str, + subject_type: str, + expected_ksef_number: str, +) -> list[dict[str, Any]]: + max_attempts = _env_int("KSEF_E2E_METADATA_MAX_ATTEMPTS", 45) + poll_interval = _env_float("KSEF_E2E_POLL_INTERVAL_SECONDS", 2.0) + + for _ in range(max_attempts): + invoices = _query_invoice_metadata( + client, + access_token=access_token, + subject_type=subject_type, + ) + if any(_extract_ksef_number(invoice) == expected_ksef_number for invoice in invoices): + return invoices + time.sleep(poll_interval) + raise TimeoutError("Expected sent invoice did not appear in invoice metadata list.") + + +def _build_invoice_xml(*, seller_nip: str, environment_name: str) -> bytes: + now = datetime.now(timezone.utc).replace(microsecond=0) + issue_date = now.date().isoformat() + period_start = now.date().replace(day=1).isoformat() + payment_due = (now.date() + timedelta(days=14)).isoformat() + invoice_number = ( + f"E2E/{environment_name.upper()}/{now.strftime('%Y%m%d%H%M%S')}/{uuid.uuid4().hex[:8]}" + ) + + xml = f""" + + + FA + 3 + {_utc_z(now)} + pytest + + + + {seller_nip} + Automated seller + + + PL + ul. Testowa 1 + 00-001 Warszawa + + + + + 1111111111 + Automated buyer + + + PL + ul. Odbiorcy 1 + 00-002 Warszawa + + + buyer@example.com + 555777999 + + 99999999 + 2 + 2 + + + PLN + {issue_date} + miejscowosc + {invoice_number} + + {period_start} + {payment_due} + + 1.00 + 0.23 + 1.23 + + 2 + 2 + 2 + 2 + + 1 + + + 1 + + 2 + + 1 + + + VAT + + 1 + Pozycja testowa + szt + 1.00 + 1.00 + 1.00 + 23 + + + + 0.00 + brak + + 0.00 + + 0.00 + brak + + 0.00 + 1.23 + + + + {payment_due} + + 6 + + 73111111111111111111111111 + Bank testowy + PLN + + + + +""" + return xml.encode("utf-8") + + +def _authenticate_access_token( + client: KsefClient, + *, + config: E2EConfig, + token_cert: str, +) -> str: + coordinator = AuthCoordinator(client.auth) + poll_interval_seconds = _env_float("KSEF_E2E_POLL_INTERVAL_SECONDS", 2.0) + max_attempts = _env_int("KSEF_E2E_AUTH_MAX_ATTEMPTS", 90) + + if config.auth_mode == AUTH_MODE_TOKEN: + if not config.token: + raise RuntimeError("Missing token for token auth mode.") + result = coordinator.authenticate_with_ksef_token( + token=config.token, + public_certificate=token_cert, + context_identifier_type=config.context_type, + context_identifier_value=config.context_value, + max_attempts=max_attempts, + poll_interval_seconds=poll_interval_seconds, + ) + return result.tokens.access_token.token + + if config.auth_mode == AUTH_MODE_XADES: + if not config.certificate_pem or not config.private_key_pem: + raise RuntimeError("Missing certificate/private key for XAdES auth mode.") + result = coordinator.authenticate_with_xades( + context_identifier_type=config.context_type, + context_identifier_value=config.context_value, + subject_identifier_type=config.subject_identifier_type or "certificateSubject", + certificate_pem=config.certificate_pem, + private_key_pem=config.private_key_pem, + max_attempts=max_attempts, + poll_interval_seconds=poll_interval_seconds, + ) + return result.tokens.access_token.token + + raise RuntimeError(f"Unknown auth mode: {config.auth_mode}") + + +def _run_full_e2e_flow(config: E2EConfig) -> None: + if config.context_type.lower() != "nip": + raise RuntimeError("This E2E scenario requires context_type='nip'.") + + options = KsefClientOptions(base_url=config.base_url) + with KsefClient(options) as client: + certs = _with_rate_limit_retry(lambda: client.security.get_public_key_certificates()) + token_cert = _select_certificate(certs, "KsefTokenEncryption") + symmetric_cert = _select_certificate(certs, "SymmetricKeyEncryption") + + access_token = _authenticate_access_token( + client, + config=config, + token_cert=token_cert, + ) + assert access_token, "Authentication did not return access token." + + workflow = OnlineSessionWorkflow(client.sessions) + session = workflow.open_session( + form_code=FORM_CODE, + public_certificate=symmetric_cert, + access_token=access_token, + ) + + try: + send_result = _with_rate_limit_retry( + lambda: workflow.send_invoice( + session_reference_number=session.session_reference_number, + invoice_xml=_build_invoice_xml( + seller_nip=config.context_value, + environment_name=config.name, + ), + encryption_data=session.encryption_data, + access_token=access_token, + ) + ) + invoice_reference_number = send_result.get("referenceNumber") + assert isinstance(invoice_reference_number, str) and invoice_reference_number, ( + "Missing invoice reference number after send." + ) + + ksef_number = _poll_for_ksef_number( + client, + session.session_reference_number, + invoice_reference_number, + access_token, + ) + + upo_bytes = _poll_for_upo( + lambda: client.sessions.get_session_invoice_upo_by_ksef( + session.session_reference_number, + ksef_number, + access_token=access_token, + ) + ) + assert upo_bytes, "Received empty UPO payload." + + session_invoices = _with_rate_limit_retry( + lambda: client.sessions.get_session_invoices( + session.session_reference_number, + access_token=access_token, + page_size=20, + ) + ) + invoices_in_session = session_invoices.get("invoices") or [] + assert any( + _extract_ksef_number(invoice) == ksef_number + for invoice in invoices_in_session + if isinstance(invoice, dict) + ), "Sent invoice not found in session invoice list." + + metadata_invoices = _poll_metadata_until_contains( + client, + access_token=access_token, + subject_type=config.subject_type, + expected_ksef_number=ksef_number, + ) + latest_ksef_number = _first_ksef_number(metadata_invoices) + + latest_invoice = _with_rate_limit_retry( + lambda: client.invoices.get_invoice_bytes( + latest_ksef_number, + access_token=access_token, + ) + ) + assert latest_invoice.content, "Downloaded latest invoice content is empty." + finally: + with suppress(Exception): + workflow.close_session(session.session_reference_number, access_token) + + +def test_e2e_test_environment_full_flow_token() -> None: + _ensure_e2e_enabled() + _run_full_e2e_flow(_load_test_token_config()) + + +def test_e2e_demo_environment_full_flow_token() -> None: + _ensure_e2e_enabled() + _run_full_e2e_flow(_load_demo_token_config()) + + +def test_e2e_test_environment_full_flow_xades() -> None: + _ensure_e2e_enabled() + _run_full_e2e_flow(_load_test_xades_config()) + + +def test_e2e_demo_environment_full_flow_xades() -> None: + _ensure_e2e_enabled() + _run_full_e2e_flow(_load_demo_xades_config())