diff --git a/.env.example b/.env.example index 4d0119d..811e352 100644 --- a/.env.example +++ b/.env.example @@ -3,6 +3,7 @@ HOTDATA_WORKSPACE= HOTDATA_DATABASE=dlt HOTDATA_SCHEMA=public HOTDATA_WRITE_DISPOSITION=append +HOTDATA_DECLARED_TABLES=customers,orders HOTDATA_CREATE_DATABASE_IF_MISSING=true HOTDATA_API_BASE_URL=https://api.hotdata.dev HOTDATA_MAX_RETRIES=5 diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 66c6c1e..a6cf124 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -4,6 +4,12 @@ on: push: tags: - 'v[0-9]*' + workflow_dispatch: + inputs: + tag: + description: 'Existing tag to create or update a GitHub Release for (e.g. v0.3.0)' + required: true + type: string permissions: contents: write @@ -12,8 +18,23 @@ jobs: release: name: Create GitHub Release runs-on: ubuntu-latest + env: + RELEASE_TAG: ${{ github.event_name == 'workflow_dispatch' && github.event.inputs.tag || github.ref_name }} steps: + - name: Validate release tag format + if: github.event_name == 'workflow_dispatch' + run: | + set -euo pipefail + tag="${RELEASE_TAG}" + if [[ ! 
"$tag" =~ ^v[0-9]+\.[0-9]+\.[0-9]+$ ]]; then + echo "tag must look like vX.Y.Z, got: $tag" >&2 + exit 1 + fi + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6 + with: + ref: ${{ env.RELEASE_TAG }} + fetch-depth: 0 - uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6 with: @@ -23,7 +44,7 @@ jobs: id: meta run: | pkg_name=$(python -c "import tomllib,pathlib; print(tomllib.loads(pathlib.Path('pyproject.toml').read_text())['project']['name'])") - pkg_version="${GITHUB_REF_NAME#v}" + pkg_version="${RELEASE_TAG#v}" echo "name=${pkg_name}" >> "$GITHUB_OUTPUT" echo "version=${pkg_version}" >> "$GITHUB_OUTPUT" @@ -31,7 +52,7 @@ jobs: id: notes run: | set -euo pipefail - version="${GITHUB_REF_NAME#v}" + version="${RELEASE_TAG#v}" if [[ -f CHANGELOG.md ]]; then body="$(python scripts/extract-changelog.py "$version")" else @@ -45,9 +66,9 @@ jobs: } >> "$GITHUB_OUTPUT" - name: Create GitHub Release - uses: softprops/action-gh-release@1e812e8210a4a8a0b23075e5795f2a4e2b2a0b7 # v2.2.2 + uses: softprops/action-gh-release@da05d552573ad5aba039eaac05058a918a7bf631 # v2.2.2 with: - tag_name: ${{ github.ref_name }} + tag_name: ${{ env.RELEASE_TAG }} name: ${{ steps.meta.outputs.name }} ${{ steps.meta.outputs.version }} body: ${{ steps.notes.outputs.body }} generate_release_notes: false diff --git a/CHANGELOG.md b/CHANGELOG.md index cbbe505..3ba9171 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,26 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] + +## [0.3.0] - 2026-05-20 + +### Changed + +- Replace SQL staging append/merge with read-modify-write using supported API operations only (`SELECT`, `upload_parquet`, `load_managed_table(replace)`). +- Switch to dlt parquet file mode (`batch_size=0`) instead of re-encoding typed-jsonl batches. +- Require `hotdata>=0.2.2` for reliable `ApiClient.close()` lifecycle support. 
+ +### Added + +- Per-resource `write_disposition` and `primary_key` from dlt table schema. +- `declared_tables` destination config (and `HOTDATA_DECLARED_TABLES` env var) for multi-table managed databases. +- `DestinationTerminalException` mapping for non-retryable Hotdata errors. +- Synced-table guard before `SELECT` to avoid 500s on never-loaded managed tables. + +### Removed + +- `sql.py` DML staging path (Hotdata query API does not support DML/DDL on managed tables). + ## [0.2.0] - 2026-05-20 ### Changed diff --git a/README.md b/README.md index 33fa488..c0130ac 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,8 @@ ## What this repo includes - Custom destination via `@dlt.destination` in `src/hotdata_dlt_destination/destination.py` -- Managed-database ingestion through `hotdata-runtime` (`upload_parquet`, `load_managed_table`, SQL merge) +- Managed-database ingestion through `hotdata-runtime` (`upload_parquet`, `load_managed_table`, `SELECT`) +- Read-modify-write append/merge using only supported API operations - Deterministic batch and row idempotency keys - Example pipelines: - `hotdata-dlt-basic-pipeline` (append) @@ -20,11 +21,10 @@ - Schema: `public` - Table name: normalized lowercase dlt table identifier - Nested table names: `{parent}__{child}` -- Staging tables: `_dlt_staging_{table}` for append/merge batches -- Write semantics: - - `append`: parquet upload → staging load → SQL insert into target - - `replace`: parquet upload → direct managed-table replace on target - - `upsert`/`merge`: parquet upload → staging load → SQL delete by `_hotdata_row_key` + insert +- Write semantics (all use `load_managed_table(replace)` under the hood): + - `replace`: upload batch parquet and replace the target table + - `append`: read existing target rows, append batch in Python, replace target + - `upsert`/`merge`: read existing rows, upsert by dlt `primary_key` (or `_hotdata_row_key`), replace target - Idempotency: - Batch key `_hotdata_batch_key` = hash(table + full 
batch payload) - Row key `_hotdata_row_key` = hash(table + canonical row payload) @@ -36,7 +36,16 @@ Set environment variables (or pass destination kwargs / dlt secrets): - `HOTDATA_API_KEY` - `HOTDATA_WORKSPACE` - `HOTDATA_DATABASE` (managed database name, default `dlt`) -- optional: `HOTDATA_SCHEMA`, `HOTDATA_WRITE_DISPOSITION`, retry tuning +- optional: `HOTDATA_SCHEMA`, `HOTDATA_WRITE_DISPOSITION`, `HOTDATA_DECLARED_TABLES`, retry tuning + +For pipelines with multiple tables, declare every target table when the managed database is first created: + +```python +hotdata_destination( + database_name="analytics", + declared_tables=["customers", "orders", "orders__items"], +) +``` ## Usage @@ -49,11 +58,14 @@ pipeline = dlt.pipeline( destination=hotdata_destination( database_name="analytics", write_disposition="append", + declared_tables=["customers"], ), ) pipeline.run(my_resource()) ``` +Per-resource `write_disposition` and `primary_key` from dlt take precedence over the destination default. + ## Developer workflow ```bash diff --git a/RELEASING.md b/RELEASING.md index 0b50ff4..6516555 100644 --- a/RELEASING.md +++ b/RELEASING.md @@ -34,6 +34,18 @@ Pushing a `vX.Y.Z` tag triggers two workflows: | `publish.yml` | Build wheel/sdist and publish to PyPI | | `release.yml` | Create the GitHub Release with notes from `CHANGELOG.md` | +## Recover a missing GitHub Release + +If PyPI publish succeeded but the GitHub Release workflow failed, rerun it from `master` +without retagging: + +```bash +gh workflow run "GitHub Release" --ref master -f tag=vX.Y.Z +``` + +The tag must already exist on the remote. The workflow checks out that tag, extracts the +matching `CHANGELOG.md` section, and creates or updates the GitHub Release. + ## Enforcement - **PR check** (`check-release.yml`): if `pyproject.toml` version changes, `CHANGELOG.md` must contain a matching `## [X.Y.Z]` section. 
diff --git a/docs/architecture.md b/docs/architecture.md index 24f70d7..eeb2bb9 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -8,8 +8,8 @@ Provide a custom `dlt` destination for Hotdata managed databases with explicit c - `destination.py`: custom destination entrypoint (`@dlt.destination`) - `hotdata_client.py`: retry wrapper over `hotdata-runtime` managed-database APIs -- `parquet.py`: batch row serialization to parquet uploads -- `sql.py`: append/replace/merge SQL against qualified managed tables +- `parquet.py`: parquet read/write for dlt load files and uploads +- `merge.py`: read-modify-write row combining for append/merge dispositions - `contracts.py`: deterministic database/schema/table mapping - `idempotency.py`: stable batch and row key generation - `errors.py`: transient vs terminal error mapping @@ -17,24 +17,26 @@ Provide a custom `dlt` destination for Hotdata managed databases with explicit c ## Ingestion flow -1. `dlt` sends `(items, table)` into `hotdata_destination`. +1. `dlt` sends a parquet load file path and `table` schema into `hotdata_destination`. 2. Contract mapping converts table metadata into `{database}.{schema}.{table}` naming. 3. Each row receives `_hotdata_batch_key`, `_hotdata_row_key`, and `_hotdata_loaded_at`. -4. Rows are written to a temporary parquet file and uploaded via `upload_parquet`. +4. Write disposition comes from the dlt table schema, falling back to the destination default. 5. Managed database is resolved or created (`create_managed_database` when enabled). -6. Load path depends on write disposition: - - `replace`: `load_managed_table` directly on the target table (`mode=replace`) - - `append` / `merge`: `load_managed_table` into `_dlt_staging_{table}`, then SQL into target +6. 
Load path uses only supported API operations: + - `replace`: upload parquet batch and `load_managed_table(replace)` on the target + - `append` / `merge`: `SELECT *` existing target rows, combine in Python, then replace the target ## Reliability model - Retries: bounded retries with linear backoff - Retryable classes: HTTP 408/409/425/429, HTTP 5xx, network timeout/connect failures -- Terminal classes: remaining HTTP/client errors +- Terminal classes: remaining HTTP/client errors, surfaced as `DestinationTerminalException` - Idempotency: stable row and batch keys derived from canonical JSON +- Parallelism: `table-sequential` load jobs to avoid concurrent read-modify-write races ## Known limitations -- Managed-table loads currently support `mode=replace` only; append/merge use staging + SQL. -- `replace` replaces the full target table per batch (best for single-batch resources). +- Managed-table loads only support `mode=replace`; append/merge are emulated via read-modify-write. +- Tables must be declared when the managed database is created; use `declared_tables` for multi-table pipelines. +- Read-modify-write reads the full target table on every append/merge batch. - This implementation is a custom destination callable, not a native dlt destination plugin package. diff --git a/docs/runbook.md b/docs/runbook.md index e5540e9..af8f375 100644 --- a/docs/runbook.md +++ b/docs/runbook.md @@ -55,7 +55,7 @@ uv run pytest tests/test_e2e_linear_hotdata.py -m integration 1. Create a new module in `src/hotdata_dlt_destination/pipelines/`. 2. Define a `dlt.resource` with explicit `name` and `write_disposition`. -3. Build pipeline with `destination=hotdata_destination(database_name=..., write_disposition=...)`. +3. Build pipeline with `destination=hotdata_destination(database_name=..., declared_tables=[...])`. 4. Add script entrypoint in `pyproject.toml`. 5. Add tests covering schema shape, idempotency key behavior, and retry/error handling. 
@@ -70,4 +70,5 @@ uv run pytest tests/test_e2e_linear_hotdata.py -m integration - `401` / `403`: verify `HOTDATA_API_KEY` and `HOTDATA_WORKSPACE`. - `404` on destination paths: verify `HOTDATA_API_BASE_URL` is the API host (for example `https://api.hotdata.dev`). - `429` / `5xx`: increase retry/backoff values. -- SQL merge errors: verify managed database/table names exist and columns are valid in your workspace. +- `table not declared`: tables can only be declared when the managed database is created, so recreate the managed database with every target table listed in `declared_tables`. +- Append/merge loads re-read the full target table each batch; large tables may be slow until native append/merge API support lands. diff --git a/pyproject.toml b/pyproject.toml index 008d357..1a01014 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "hotdata-dlt-destination" -version = "0.2.0" +version = "0.3.0" description = "dlt destination for loading data into Hotdata managed databases." readme = "README.md" authors = [ @@ -9,7 +9,7 @@ authors = [ requires-python = ">=3.11" dependencies = [ "dlt>=1.26.0", - "hotdata>=0.2.0", + "hotdata>=0.2.2", "hotdata-runtime>=0.1.1", "pyarrow>=14", ] diff --git a/src/hotdata_dlt_destination/config.py b/src/hotdata_dlt_destination/config.py index 7c90859..f1a510d 100644 --- a/src/hotdata_dlt_destination/config.py +++ b/src/hotdata_dlt_destination/config.py @@ -13,11 +13,16 @@ class HotdataDestinationConfig: schema: str = "public" write_disposition: str = "append" create_database_if_missing: bool = True + declared_tables: tuple[str, ...] 
= () max_retries: int = 5 retry_backoff_seconds: float = 1.0 @classmethod def from_env(cls) -> HotdataDestinationConfig: + declared = os.environ.get("HOTDATA_DECLARED_TABLES", "") + declared_tables = tuple( + table.strip() for table in declared.split(",") if table.strip() + ) return cls( api_key=os.environ["HOTDATA_API_KEY"], workspace_id=os.environ["HOTDATA_WORKSPACE"], @@ -29,6 +34,7 @@ def from_env(cls) -> HotdataDestinationConfig: "HOTDATA_CREATE_DATABASE_IF_MISSING", "true" ).lower() in {"1", "true", "yes"}, + declared_tables=declared_tables, max_retries=int(os.environ.get("HOTDATA_MAX_RETRIES", "5")), retry_backoff_seconds=float(os.environ.get("HOTDATA_RETRY_BACKOFF_SECONDS", "1.0")), ) diff --git a/src/hotdata_dlt_destination/contracts.py b/src/hotdata_dlt_destination/contracts.py index 70846aa..aba5bec 100644 --- a/src/hotdata_dlt_destination/contracts.py +++ b/src/hotdata_dlt_destination/contracts.py @@ -6,7 +6,6 @@ from dlt.common.schema import TTableSchema IDENTIFIER_RE = re.compile(r"[^a-zA-Z0-9_]") -STAGING_PREFIX = "_dlt_staging_" def normalize_identifier(value: str) -> str: @@ -19,16 +18,11 @@ class TableContract: database_name: str schema: str table_name: str - staging_table_name: str @property def qualified_target(self) -> str: return f"{self.database_name}.{self.schema}.{self.table_name}" - @property - def qualified_staging(self) -> str: - return f"{self.database_name}.{self.schema}.{self.staging_table_name}" - @classmethod def from_table_schema( cls, @@ -50,5 +44,23 @@ def from_table_schema( database_name=normalize_identifier(database_name), schema=normalize_identifier(schema), table_name=normalized_table_name, - staging_table_name=f"{STAGING_PREFIX}{normalized_table_name}", + ) + + @classmethod + def declared_table_names( + cls, + *, + database_name: str, + schema: str, + table_names: list[str], + ) -> list[str]: + return sorted( + { + cls.from_table_schema( + {"name": table_name}, + database_name=database_name, + schema=schema, + ).table_name + 
for table_name in table_names + } ) diff --git a/src/hotdata_dlt_destination/destination.py b/src/hotdata_dlt_destination/destination.py index 4bee36a..1760a72 100644 --- a/src/hotdata_dlt_destination/destination.py +++ b/src/hotdata_dlt_destination/destination.py @@ -6,23 +6,35 @@ from typing import Any import dlt +from dlt.common.destination.exceptions import DestinationTerminalException from dlt.common.schema import TTableSchema from dlt.common.typing import TDataItems from hotdata_dlt_destination.config import HotdataDestinationConfig from hotdata_dlt_destination.contracts import TableContract +from hotdata_dlt_destination.errors import HotdataTerminalError from hotdata_dlt_destination.hotdata_client import HotdataClient from hotdata_dlt_destination.idempotency import compute_batch_key, compute_row_key -from hotdata_dlt_destination.parquet import write_rows_parquet -from hotdata_dlt_destination.sql import append_sql, merge_sql +from hotdata_dlt_destination.merge import ( + combine_rows, + resolve_primary_key, + resolve_write_disposition, +) +from hotdata_dlt_destination.parquet import read_parquet_rows, write_rows_parquet + + +def _load_batch_rows(items: TDataItems | str) -> list[dict[str, Any]]: + if isinstance(items, str): + return read_parquet_rows(items) + return [dict(item) for item in items] def _augment_rows( *, table_name: str, - items: TDataItems, + items: TDataItems | str, ) -> tuple[str, list[dict[str, Any]]]: - rows = [dict(item) for item in items] + rows = _load_batch_rows(items) batch_key = compute_batch_key(table_name, rows) loaded_at = datetime.now(UTC).isoformat() augmented_rows = [ @@ -37,16 +49,30 @@ def _augment_rows( return batch_key, augmented_rows +def _declared_tables( + *, + contract: TableContract, + declared_tables: list[str] | None, +) -> list[str]: + normalized_declared = TableContract.declared_table_names( + database_name=contract.database_name, + schema=contract.schema, + table_names=declared_tables or [], + ) + return 
sorted({*normalized_declared, contract.table_name}) + + @dlt.destination( - batch_size=500, - loader_file_format="typed-jsonl", + batch_size=0, + loader_file_format="parquet", + loader_parallelism_strategy="table-sequential", name="hotdata", naming_convention="direct", max_table_nesting=0, skip_dlt_columns_and_tables=True, ) def hotdata_destination( - items: TDataItems, + items: TDataItems | str, table: TTableSchema, api_key: str = dlt.secrets.value, workspace_id: str = dlt.secrets.value, @@ -54,6 +80,7 @@ def hotdata_destination( database_name: str = "dlt", schema: str = "public", write_disposition: str = "append", + declared_tables: list[str] | None = None, create_database_if_missing: bool = True, max_retries: int = 5, retry_backoff_seconds: float = 1.0, @@ -65,6 +92,7 @@ def hotdata_destination( database_name=database_name, schema=schema, write_disposition=write_disposition, + declared_tables=tuple(declared_tables or ()), create_database_if_missing=create_database_if_missing, max_retries=max_retries, retry_backoff_seconds=retry_backoff_seconds, @@ -74,7 +102,9 @@ def hotdata_destination( database_name=config.database_name, schema=config.schema, ) - _, rows = _augment_rows(table_name=contract.table_name, items=items) + disposition = resolve_write_disposition(table, config.write_disposition) + primary_key = resolve_primary_key(table) + _, batch_rows = _augment_rows(table_name=contract.table_name, items=items) client = HotdataClient( api_key=config.api_key, @@ -86,47 +116,43 @@ def hotdata_destination( parquet_path = "" try: - with tempfile.NamedTemporaryFile(suffix=".parquet", delete=False) as handle: - parquet_path = handle.name - write_rows_parquet(rows, parquet_path) - client.ensure_managed_database( contract.database_name, schema=contract.schema, - tables=[contract.table_name, contract.staging_table_name], + tables=_declared_tables( + contract=contract, + declared_tables=list(config.declared_tables), + ), create_if_missing=config.create_database_if_missing, ) 
- upload_id = client.upload_parquet(parquet_path) - if config.write_disposition == "replace": - client.load_managed_table( - contract.database_name, - contract.table_name, + rows_to_load = batch_rows + if disposition != "replace": + existing_rows = client.fetch_table_rows( + database=contract.database_name, schema=contract.schema, - upload_id=upload_id, + table=contract.table_name, + ) + rows_to_load = combine_rows( + disposition=disposition, + existing=existing_rows, + incoming=batch_rows, + primary_key=primary_key, ) - return + with tempfile.NamedTemporaryFile(suffix=".parquet", delete=False) as handle: + parquet_path = handle.name + write_rows_parquet(rows_to_load, parquet_path) + + upload_id = client.upload_parquet(parquet_path) client.load_managed_table( contract.database_name, - contract.staging_table_name, + contract.table_name, schema=contract.schema, upload_id=upload_id, ) - - if config.write_disposition in ("merge", "upsert"): - statements = merge_sql( - target=contract.qualified_target, - staging=contract.qualified_staging, - ) - else: - statements = append_sql( - target=contract.qualified_target, - staging=contract.qualified_staging, - ) - - for statement in statements: - client.execute_sql(statement) + except HotdataTerminalError as error: + raise DestinationTerminalException(str(error)) from error finally: if parquet_path: os.unlink(parquet_path) diff --git a/src/hotdata_dlt_destination/hotdata_client.py b/src/hotdata_dlt_destination/hotdata_client.py index a9aabd6..52edacb 100644 --- a/src/hotdata_dlt_destination/hotdata_client.py +++ b/src/hotdata_dlt_destination/hotdata_client.py @@ -2,7 +2,7 @@ import time from collections.abc import Callable -from typing import TypeVar +from typing import Any, TypeVar from hotdata.rest import ApiException from hotdata_runtime.client import HotdataClient as RuntimeClient @@ -62,6 +62,34 @@ def operation() -> ManagedDatabase: return self._request_with_retry(operation) + def table_is_synced( + self, + database: str, 
+ table: str, + *, + schema: str, + ) -> bool: + for managed_table in self._runtime.list_managed_tables(database, schema=schema): + if managed_table.table == table: + return managed_table.synced + return False + + def fetch_table_rows( + self, + *, + database: str, + schema: str, + table: str, + ) -> list[dict[str, Any]]: + def operation() -> list[dict[str, Any]]: + if not self.table_is_synced(database, table, schema=schema): + return [] + qualified_table = f"{database}.{schema}.{table}" + result = self._runtime.execute_sql(f"SELECT * FROM {qualified_table}") + return result.to_records() + + return self._request_with_retry(operation) + def upload_parquet(self, path: str) -> str: return self._request_with_retry(lambda: self._runtime.upload_parquet(path)) @@ -82,9 +110,6 @@ def load_managed_table( ) ) - def execute_sql(self, sql: str) -> None: - self._request_with_retry(lambda: self._runtime.execute_sql(sql)) - def _request_with_retry(self, operation: Callable[[], T]) -> T: for attempt in range(1, self._max_retries + 1): try: diff --git a/src/hotdata_dlt_destination/merge.py b/src/hotdata_dlt_destination/merge.py new file mode 100644 index 0000000..86e24fa --- /dev/null +++ b/src/hotdata_dlt_destination/merge.py @@ -0,0 +1,70 @@ +from __future__ import annotations + +from typing import Any + +from dlt.common.schema import TTableSchema + +SUPPORTED_WRITE_DISPOSITIONS = frozenset({"replace", "append", "merge", "upsert"}) + + +def resolve_write_disposition(table: TTableSchema, default: str) -> str: + disposition = table.get("write_disposition") or default + return str(disposition).lower() + + +def resolve_primary_key(table: TTableSchema) -> list[str] | None: + primary_key = table.get("primary_key") + if primary_key is None: + return None + if isinstance(primary_key, str): + return [primary_key] + return [str(key) for key in primary_key] + + +def row_key(row: dict[str, Any], keys: list[str]) -> tuple[Any, ...]: + return tuple(row.get(key) for key in keys) + + +def 
append_rows( + existing: list[dict[str, Any]], + incoming: list[dict[str, Any]], +) -> list[dict[str, Any]]: + return [*existing, *incoming] + + +def merge_rows( + existing: list[dict[str, Any]], + incoming: list[dict[str, Any]], + *, + primary_key: list[str], +) -> list[dict[str, Any]]: + merged = list(existing) + index = {row_key(row, primary_key): position for position, row in enumerate(merged)} + for row in incoming: + key = row_key(row, primary_key) + if key in index: + merged[index[key]] = row + else: + index[key] = len(merged) + merged.append(row) + return merged + + +def combine_rows( + *, + disposition: str, + existing: list[dict[str, Any]], + incoming: list[dict[str, Any]], + primary_key: list[str] | None, +) -> list[dict[str, Any]]: + if disposition == "replace": + return incoming + if disposition in ("merge", "upsert"): + keys = primary_key or ["_hotdata_row_key"] + return merge_rows(existing, incoming, primary_key=keys) + if disposition == "append": + return append_rows(existing, incoming) + raise ValueError( + f"Unsupported write_disposition {disposition!r}. 
" + f"Expected one of: {', '.join(sorted(SUPPORTED_WRITE_DISPOSITIONS))}" + ) diff --git a/src/hotdata_dlt_destination/parquet.py b/src/hotdata_dlt_destination/parquet.py index 24cb14d..c6b5a85 100644 --- a/src/hotdata_dlt_destination/parquet.py +++ b/src/hotdata_dlt_destination/parquet.py @@ -7,6 +7,11 @@ import pyarrow.parquet as pq +def read_parquet_rows(path: str | Path) -> list[dict[str, Any]]: + table = pq.read_table(Path(path)) + return table.to_pylist() + + def write_rows_parquet(rows: list[dict[str, Any]], path: str | Path) -> None: table = pa.Table.from_pylist(rows) pq.write_table(table, Path(path)) diff --git a/src/hotdata_dlt_destination/pipelines/basic_pipeline.py b/src/hotdata_dlt_destination/pipelines/basic_pipeline.py index 0315827..1baf494 100644 --- a/src/hotdata_dlt_destination/pipelines/basic_pipeline.py +++ b/src/hotdata_dlt_destination/pipelines/basic_pipeline.py @@ -17,7 +17,10 @@ def customers_resource() -> list[dict[str, object]]: def main() -> None: pipeline = dlt.pipeline( pipeline_name="hotdata_basic", - destination=hotdata_destination(write_disposition="append"), # type: ignore[call-arg] + destination=hotdata_destination( + write_disposition="append", + declared_tables=["customers"], + ), # type: ignore[call-arg] dataset_name="hotdata_basic", ) load_info = pipeline.run(customers_resource()) diff --git a/src/hotdata_dlt_destination/pipelines/incremental_pipeline.py b/src/hotdata_dlt_destination/pipelines/incremental_pipeline.py index 18861ae..995675f 100644 --- a/src/hotdata_dlt_destination/pipelines/incremental_pipeline.py +++ b/src/hotdata_dlt_destination/pipelines/incremental_pipeline.py @@ -25,7 +25,10 @@ def orders_resource(since: str = "2026-01-01T00:00:00+00:00") -> list[dict[str, def main() -> None: pipeline = dlt.pipeline( pipeline_name="hotdata_incremental", - destination=hotdata_destination(write_disposition="upsert"), # type: ignore[call-arg] + destination=hotdata_destination( + write_disposition="upsert", + 
declared_tables=["orders"], + ), # type: ignore[call-arg] dataset_name="hotdata_incremental", ) load_info = pipeline.run(orders_resource()) diff --git a/src/hotdata_dlt_destination/pipelines/linear_pipeline.py b/src/hotdata_dlt_destination/pipelines/linear_pipeline.py index 206eb93..a65bd62 100644 --- a/src/hotdata_dlt_destination/pipelines/linear_pipeline.py +++ b/src/hotdata_dlt_destination/pipelines/linear_pipeline.py @@ -92,7 +92,11 @@ def linear_issues_resource( def main() -> None: pipeline = dlt.pipeline( pipeline_name="hotdata_linear", - destination=hotdata_destination(write_disposition="upsert", database_name="linear"), # type: ignore[call-arg] + destination=hotdata_destination( + write_disposition="upsert", + database_name="linear", + declared_tables=["linear_issues"], + ), # type: ignore[call-arg] dataset_name="hotdata_linear", ) linear_api_key = os.environ["LINEAR_API_KEY"] diff --git a/src/hotdata_dlt_destination/sql.py b/src/hotdata_dlt_destination/sql.py deleted file mode 100644 index cd1c66b..0000000 --- a/src/hotdata_dlt_destination/sql.py +++ /dev/null @@ -1,24 +0,0 @@ -from __future__ import annotations - - -def append_sql(*, target: str, staging: str) -> list[str]: - return [ - f"CREATE TABLE IF NOT EXISTS {target} AS SELECT * FROM {staging} WHERE 1=0", - f"INSERT INTO {target} SELECT * FROM {staging}", - ] - - -def replace_from_staging_sql(*, target: str, staging: str) -> list[str]: - return [ - f"DROP TABLE IF EXISTS {target}", - f"CREATE TABLE {target} AS SELECT * FROM {staging}", - ] - - -def merge_sql(*, target: str, staging: str) -> list[str]: - return [ - f"CREATE TABLE IF NOT EXISTS {target} AS SELECT * FROM {staging} WHERE 1=0", - f"DELETE FROM {target} t USING {staging} s " - "WHERE t._hotdata_row_key = s._hotdata_row_key", - f"INSERT INTO {target} SELECT * FROM {staging}", - ] diff --git a/tests/test_client.py b/tests/test_client.py index f259ccf..9bb3af5 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -61,3 +61,77 @@ 
def close(self) -> None: assert fake_runtime.upload_calls == 1 assert fake_runtime.load_calls == 1 client.close() + + +def test_fetch_table_rows_skips_unsynced_tables() -> None: + class FakeRuntime: + def list_managed_tables(self, database: str, *, schema: str): + assert database == "dlt" + assert schema == "public" + return [SimpleNamespace(table="orders", synced=False)] + + def close(self) -> None: + return None + + client = HotdataClient( + api_key="k", + workspace_id="ws_1", + api_base_url="https://api.hotdata.dev", + max_retries=1, + retry_backoff_seconds=0.0, + ) + client._runtime = FakeRuntime() # noqa: SLF001 + + rows = client.fetch_table_rows(database="dlt", schema="public", table="orders") + assert rows == [] + client.close() + + +def test_fetch_table_rows_reads_synced_table() -> None: + class FakeRuntime: + def list_managed_tables(self, database: str, *, schema: str): + return [SimpleNamespace(table="orders", synced=True)] + + def execute_sql(self, sql: str): + assert sql == "SELECT * FROM dlt.public.orders" + return SimpleNamespace( + to_records=lambda: [{"id": 1, "name": "alpha"}], + ) + + def close(self) -> None: + return None + + client = HotdataClient( + api_key="k", + workspace_id="ws_1", + api_base_url="https://api.hotdata.dev", + max_retries=1, + retry_backoff_seconds=0.0, + ) + client._runtime = FakeRuntime() # noqa: SLF001 + + rows = client.fetch_table_rows(database="dlt", schema="public", table="orders") + assert rows == [{"id": 1, "name": "alpha"}] + client.close() + + +def test_fetch_table_rows_returns_empty_when_table_missing() -> None: + class FakeRuntime: + def list_managed_tables(self, database: str, *, schema: str): + return [] + + def close(self) -> None: + return None + + client = HotdataClient( + api_key="k", + workspace_id="ws_1", + api_base_url="https://api.hotdata.dev", + max_retries=1, + retry_backoff_seconds=0.0, + ) + client._runtime = FakeRuntime() # noqa: SLF001 + + rows = client.fetch_table_rows(database="dlt", 
schema="public", table="orders") + assert rows == [] + client.close() diff --git a/tests/test_contracts.py b/tests/test_contracts.py index db5caf6..1e8481f 100644 --- a/tests/test_contracts.py +++ b/tests/test_contracts.py @@ -15,6 +15,13 @@ def test_table_contract_mapping() -> None: assert contract.database_name == "linear" assert contract.schema == "public" assert contract.table_name == "lineitems" - assert contract.staging_table_name == "_dlt_staging_lineitems" assert contract.qualified_target == "linear.public.lineitems" - assert contract.qualified_staging == "linear.public._dlt_staging_lineitems" + + +def test_declared_table_names_normalizes_identifiers() -> None: + names = TableContract.declared_table_names( + database_name="Linear", + schema="public", + table_names=["LineItems", "Customers"], + ) + assert names == ["customers", "lineitems"] diff --git a/tests/test_destination.py b/tests/test_destination.py index b9548ad..39f2772 100644 --- a/tests/test_destination.py +++ b/tests/test_destination.py @@ -1,5 +1,4 @@ from hotdata_dlt_destination.destination import _augment_rows -from hotdata_dlt_destination.sql import append_sql, merge_sql, replace_from_staging_sql def test_augment_rows_adds_metadata() -> None: @@ -11,15 +10,3 @@ def test_augment_rows_adds_metadata() -> None: assert rows[0]["_hotdata_batch_key"] == batch_key assert "_hotdata_row_key" in rows[0] assert "_hotdata_loaded_at" in rows[0] - - -def test_sql_generation() -> None: - append = append_sql(target="dlt.public.orders", staging="dlt.public._dlt_staging_orders") - replace = replace_from_staging_sql( - target="dlt.public.orders", - staging="dlt.public._dlt_staging_orders", - ) - merge = merge_sql(target="dlt.public.orders", staging="dlt.public._dlt_staging_orders") - assert "INSERT INTO dlt.public.orders" in append[1] - assert "DROP TABLE IF EXISTS dlt.public.orders" in replace[0] - assert "DELETE FROM dlt.public.orders" in merge[1] diff --git a/tests/test_e2e_linear_hotdata.py 
b/tests/test_e2e_linear_hotdata.py index 0cf186b..9490d8f 100644 --- a/tests/test_e2e_linear_hotdata.py +++ b/tests/test_e2e_linear_hotdata.py @@ -37,6 +37,7 @@ def test_e2e_linear_to_hotdata() -> None: api_base_url=os.environ.get("HOTDATA_API_BASE_URL", "https://api.hotdata.dev"), write_disposition="upsert", database_name=f"linear_e2e_{run_suffix}", + declared_tables=["linear_issues"], ), # type: ignore[call-arg] dataset_name=f"hotdata_linear_e2e_{run_suffix}", ) diff --git a/tests/test_merge.py b/tests/test_merge.py new file mode 100644 index 0000000..39383e4 --- /dev/null +++ b/tests/test_merge.py @@ -0,0 +1,71 @@ +import pytest + +from hotdata_dlt_destination.merge import ( + append_rows, + combine_rows, + merge_rows, + resolve_primary_key, + resolve_write_disposition, +) + + +def test_resolve_write_disposition_prefers_table_schema() -> None: + table = {"name": "orders", "write_disposition": "merge"} + assert resolve_write_disposition(table, "append") == "merge" + + +def test_resolve_primary_key_supports_composite_keys() -> None: + table = {"name": "orders", "primary_key": ["tenant_id", "id"]} + assert resolve_primary_key(table) == ["tenant_id", "id"] + + +def test_append_rows_extends_existing() -> None: + combined = append_rows([{"id": 1}], [{"id": 2}]) + assert combined == [{"id": 1}, {"id": 2}] + + +def test_merge_rows_updates_matching_primary_key() -> None: + existing = [{"id": 1, "value": "old"}, {"id": 2, "value": "keep"}] + incoming = [{"id": 1, "value": "new"}] + merged = merge_rows(existing, incoming, primary_key=["id"]) + assert merged == [{"id": 1, "value": "new"}, {"id": 2, "value": "keep"}] + + +def test_combine_rows_replace_uses_incoming_only() -> None: + combined = combine_rows( + disposition="replace", + existing=[{"id": 1}], + incoming=[{"id": 2}], + primary_key=["id"], + ) + assert combined == [{"id": 2}] + + +def test_combine_rows_merge_falls_back_to_hotdata_row_key() -> None: + combined = combine_rows( + disposition="merge", + 
existing=[{"_hotdata_row_key": "a", "value": 1}], + incoming=[{"_hotdata_row_key": "a", "value": 2}], + primary_key=None, + ) + assert combined == [{"_hotdata_row_key": "a", "value": 2}] + + +def test_combine_rows_append_extends_existing() -> None: + combined = combine_rows( + disposition="append", + existing=[{"id": 1}], + incoming=[{"id": 2}], + primary_key=["id"], + ) + assert combined == [{"id": 1}, {"id": 2}] + + +def test_combine_rows_rejects_unknown_disposition() -> None: + with pytest.raises(ValueError, match="Unsupported write_disposition 'appned'"): + combine_rows( + disposition="appned", + existing=[{"id": 1}], + incoming=[{"id": 2}], + primary_key=["id"], + ) diff --git a/tests/test_parquet.py b/tests/test_parquet.py index f687fe6..c73622e 100644 --- a/tests/test_parquet.py +++ b/tests/test_parquet.py @@ -1,4 +1,4 @@ -from hotdata_dlt_destination.parquet import write_rows_parquet +from hotdata_dlt_destination.parquet import read_parquet_rows, write_rows_parquet def test_write_rows_parquet_roundtrip(tmp_path) -> None: @@ -7,3 +7,4 @@ def test_write_rows_parquet_roundtrip(tmp_path) -> None: write_rows_parquet(rows, path) assert path.exists() assert path.stat().st_size > 0 + assert read_parquet_rows(path) == rows diff --git a/uv.lock b/uv.lock index 1e4c837..e7ef040 100644 --- a/uv.lock +++ b/uv.lock @@ -221,7 +221,7 @@ wheels = [ [[package]] name = "hotdata" -version = "0.2.1" +version = "0.2.2" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "pydantic" }, @@ -229,14 +229,14 @@ dependencies = [ { name = "typing-extensions" }, { name = "urllib3" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/e7/a2/3c88c39dc287230901069a64d001bc589c2ea6ef653a323a98d27a065fb2/hotdata-0.2.1.tar.gz", hash = "sha256:2b7cd511110c03221f9c82bb907ad50e514a2ac0dcaf77cb401439aa4667e6f6", size = 109863, upload-time = "2026-05-20T03:48:05.846Z" } +sdist = { url = 
"https://files.pythonhosted.org/packages/f9/05/81988ab357f05621b9258e4f1943d175f28720341e7ac1a98399f22bf24e/hotdata-0.2.2.tar.gz", hash = "sha256:0a66c37bc7a53d4ff8cfbd14b77d43402ebbd006a88e84cf321aaafcbb4bd2ab", size = 110864, upload-time = "2026-05-20T16:46:09.589Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/ab/c7/7f689954b860b8f90c377564a7ea9e8e0659e4ec49220f3222b75ce2d0e3/hotdata-0.2.1-py3-none-any.whl", hash = "sha256:96213c9b4db1aa1da4aad42f365306209fde2c7aad29a1be04b0a406dd366a42", size = 257273, upload-time = "2026-05-20T03:48:04.031Z" }, + { url = "https://files.pythonhosted.org/packages/3f/2b/beb0422d82e4d15d9639d4d27be5487af591cf16dbbc94fe83c5b597e2f2/hotdata-0.2.2-py3-none-any.whl", hash = "sha256:06882d99df7f6828b21aed3a66a560b7f65f3cdd368b5916a65c166c75d60a62", size = 257323, upload-time = "2026-05-20T16:46:08.185Z" }, ] [[package]] name = "hotdata-dlt-destination" -version = "0.2.0" +version = "0.3.0" source = { editable = "." } dependencies = [ { name = "dlt" }, @@ -254,7 +254,7 @@ dev = [ [package.metadata] requires-dist = [ { name = "dlt", specifier = ">=1.26.0" }, - { name = "hotdata", specifier = ">=0.2.0" }, + { name = "hotdata", specifier = ">=0.2.2" }, { name = "hotdata-runtime", specifier = ">=0.1.1" }, { name = "pyarrow", specifier = ">=14" }, ]