1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@ HOTDATA_WORKSPACE=
HOTDATA_DATABASE=dlt
HOTDATA_SCHEMA=public
HOTDATA_WRITE_DISPOSITION=append
HOTDATA_DECLARED_TABLES=customers,orders
HOTDATA_CREATE_DATABASE_IF_MISSING=true
HOTDATA_API_BASE_URL=https://api.hotdata.dev
HOTDATA_MAX_RETRIES=5
29 changes: 25 additions & 4 deletions .github/workflows/release.yml
@@ -4,6 +4,12 @@ on:
push:
tags:
- 'v[0-9]*'
workflow_dispatch:
inputs:
tag:
description: 'Existing tag to create or update a GitHub Release for (e.g. v0.3.0)'
required: true
type: string

permissions:
contents: write
@@ -12,8 +18,23 @@ jobs:
release:
name: Create GitHub Release
runs-on: ubuntu-latest
env:
RELEASE_TAG: ${{ github.event_name == 'workflow_dispatch' && github.event.inputs.tag || github.ref_name }}
steps:
- name: Validate release tag format
if: github.event_name == 'workflow_dispatch'
run: |
set -euo pipefail
tag="${{ github.event.inputs.tag }}"
if [[ ! "$tag" =~ ^v[0-9]+\.[0-9]+\.[0-9]+$ ]]; then
echo "tag must look like vX.Y.Z, got: $tag" >&2
exit 1
fi

- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6
with:
ref: ${{ env.RELEASE_TAG }}
fetch-depth: 0

- uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6
with:
@@ -23,15 +44,15 @@ jobs:
id: meta
run: |
pkg_name=$(python -c "import tomllib,pathlib; print(tomllib.loads(pathlib.Path('pyproject.toml').read_text())['project']['name'])")
pkg_version="${GITHUB_REF_NAME#v}"
pkg_version="${RELEASE_TAG#v}"
echo "name=${pkg_name}" >> "$GITHUB_OUTPUT"
echo "version=${pkg_version}" >> "$GITHUB_OUTPUT"

- name: Extract changelog notes
id: notes
run: |
set -euo pipefail
version="${GITHUB_REF_NAME#v}"
version="${RELEASE_TAG#v}"
if [[ -f CHANGELOG.md ]]; then
body="$(python scripts/extract-changelog.py "$version")"
else
@@ -45,9 +66,9 @@ jobs:
} >> "$GITHUB_OUTPUT"

- name: Create GitHub Release
uses: softprops/action-gh-release@1e812e8210a4a8a0b23075e5795f2a4e2b2a0b7 # v2.2.2
uses: softprops/action-gh-release@da05d552573ad5aba039eaac05058a918a7bf631 # v2.2.2
with:
tag_name: ${{ github.ref_name }}
tag_name: ${{ env.RELEASE_TAG }}
name: ${{ steps.meta.outputs.name }} ${{ steps.meta.outputs.version }}
body: ${{ steps.notes.outputs.body }}
generate_release_notes: false
20 changes: 20 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,26 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]


## [0.3.0] - 2026-05-20

### Changed

- Replace SQL staging append/merge with read-modify-write using supported API operations only (`SELECT`, `upload_parquet`, `load_managed_table(replace)`).
- Switch to dlt parquet file mode (`batch_size=0`) instead of re-encoding typed-jsonl batches.
- Require `hotdata>=0.2.2` for reliable `ApiClient.close()` lifecycle support.

### Added

- Per-resource `write_disposition` and `primary_key` from dlt table schema.
- `declared_tables` destination config (and `HOTDATA_DECLARED_TABLES` env var) for multi-table managed databases.
- `DestinationTerminalException` mapping for non-retryable Hotdata errors.
- Synced-table guard before `SELECT` to avoid 500s on never-loaded managed tables.

### Removed

- `sql.py` DML staging path (Hotdata query API does not support DML/DDL on managed tables).

## [0.2.0] - 2026-05-20

### Changed
26 changes: 19 additions & 7 deletions README.md
@@ -5,7 +5,8 @@
## What this repo includes

- Custom destination via `@dlt.destination` in `src/hotdata_dlt_destination/destination.py`
- Managed-database ingestion through `hotdata-runtime` (`upload_parquet`, `load_managed_table`, SQL merge)
- Managed-database ingestion through `hotdata-runtime` (`upload_parquet`, `load_managed_table`, `SELECT`)
- Read-modify-write append/merge using only supported API operations
- Deterministic batch and row idempotency keys
- Example pipelines:
- `hotdata-dlt-basic-pipeline` (append)
@@ -20,11 +21,10 @@
- Schema: `public`
- Table name: normalized lowercase dlt table identifier
- Nested table names: `{parent}__{child}`
- Staging tables: `_dlt_staging_{table}` for append/merge batches
- Write semantics:
- `append`: parquet upload → staging load → SQL insert into target
- `replace`: parquet upload → direct managed-table replace on target
- `upsert`/`merge`: parquet upload → staging load → SQL delete by `_hotdata_row_key` + insert
- Write semantics (all use `load_managed_table(replace)` under the hood):
- `replace`: upload batch parquet and replace the target table
- `append`: read existing target rows, append batch in Python, replace target
- `upsert`/`merge`: read existing rows, upsert by dlt `primary_key` (or `_hotdata_row_key`), replace target
- Idempotency:
- Batch key `_hotdata_batch_key` = hash(table + full batch payload)
- Row key `_hotdata_row_key` = hash(table + canonical row payload)
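
The key derivation above could look roughly like the sketch below. It is an illustration, not the repository's `idempotency.py`: the function names and the choice of SHA-256 are assumptions, since the README only says "hash".

```python
import hashlib
import json
from typing import Any


def canonical_json(value: Any) -> str:
    # Sorted keys and fixed separators make the encoding independent of dict
    # insertion order; default=str is an assumed fallback for values such as
    # datetimes that json cannot encode natively.
    return json.dumps(value, sort_keys=True, separators=(",", ":"), default=str)


def row_key(table: str, row: dict[str, Any]) -> str:
    # hash(table + canonical row payload); SHA-256 is an assumption.
    payload = canonical_json({"table": table, "row": row})
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def batch_key(table: str, rows: list[dict[str, Any]]) -> str:
    # hash(table + full batch payload); row order is part of the payload.
    payload = canonical_json({"table": table, "rows": rows})
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()
```

Because the payload is canonicalized before hashing, two rows with the same columns and values produce the same key regardless of the order in which the columns were emitted.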
@@ -36,7 +36,16 @@ Set environment variables (or pass destination kwargs / dlt secrets):
- `HOTDATA_API_KEY`
- `HOTDATA_WORKSPACE`
- `HOTDATA_DATABASE` (managed database name, default `dlt`)
- optional: `HOTDATA_SCHEMA`, `HOTDATA_WRITE_DISPOSITION`, retry tuning
- optional: `HOTDATA_SCHEMA`, `HOTDATA_WRITE_DISPOSITION`, `HOTDATA_DECLARED_TABLES`, retry tuning

For pipelines with multiple tables, declare every target table when the managed database is first created:

```python
hotdata_destination(
database_name="analytics",
declared_tables=["customers", "orders", "orders__items"],
)
```

## Usage

@@ -49,11 +58,14 @@ pipeline = dlt.pipeline(
destination=hotdata_destination(
database_name="analytics",
write_disposition="append",
declared_tables=["customers"],
),
)
pipeline.run(my_resource())
```

Per-resource `write_disposition` and `primary_key` from dlt take precedence over the destination default.

## Developer workflow

```bash
12 changes: 12 additions & 0 deletions RELEASING.md
@@ -34,6 +34,18 @@ Pushing a `vX.Y.Z` tag triggers two workflows:
| `publish.yml` | Build wheel/sdist and publish to PyPI |
| `release.yml` | Create the GitHub Release with notes from `CHANGELOG.md` |

## Recover a missing GitHub Release

If PyPI publish succeeded but the GitHub Release workflow failed, rerun it from `master`
without retagging:

```bash
gh workflow run "GitHub Release" --ref master -f tag=vX.Y.Z
```

The tag must already exist on the remote. The workflow checks out that tag, extracts the
matching `CHANGELOG.md` section, and creates or updates the GitHub Release.

## Enforcement

- **PR check** (`check-release.yml`): if `pyproject.toml` version changes, `CHANGELOG.md` must contain a matching `## [X.Y.Z]` section.
22 changes: 12 additions & 10 deletions docs/architecture.md
@@ -8,33 +8,35 @@ Provide a custom `dlt` destination for Hotdata managed databases with explicit c

- `destination.py`: custom destination entrypoint (`@dlt.destination`)
- `hotdata_client.py`: retry wrapper over `hotdata-runtime` managed-database APIs
- `parquet.py`: batch row serialization to parquet uploads
- `sql.py`: append/replace/merge SQL against qualified managed tables
- `parquet.py`: parquet read/write for dlt load files and uploads
- `merge.py`: read-modify-write row combining for append/merge dispositions
- `contracts.py`: deterministic database/schema/table mapping
- `idempotency.py`: stable batch and row key generation
- `errors.py`: transient vs terminal error mapping
- `pipelines/`: example data pipelines

## Ingestion flow

1. `dlt` sends `(items, table)` into `hotdata_destination`.
1. `dlt` sends a parquet load file path and `table` schema into `hotdata_destination`.
2. Contract mapping converts table metadata into `{database}.{schema}.{table}` naming.
3. Each row receives `_hotdata_batch_key`, `_hotdata_row_key`, and `_hotdata_loaded_at`.
4. Rows are written to a temporary parquet file and uploaded via `upload_parquet`.
4. Write disposition comes from the dlt table schema, falling back to the destination default.
5. Managed database is resolved or created (`create_managed_database` when enabled).
6. Load path depends on write disposition:
- `replace`: `load_managed_table` directly on the target table (`mode=replace`)
- `append` / `merge`: `load_managed_table` into `_dlt_staging_{table}`, then SQL into target
6. Load path uses only supported API operations:
- `replace`: upload parquet batch and `load_managed_table(replace)` on the target
- `append` / `merge`: `SELECT *` existing target rows, combine in Python, then replace the target
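
The "combine in Python" step in item 6 can be sketched as follows. This is a minimal illustration over lists of dicts, not the contents of `merge.py`; `combine_rows` and its parameters are hypothetical names, and the real module may handle batch-level deduplication differently.

```python
from typing import Any

Row = dict[str, Any]


def combine_rows(
    existing: list[Row],
    batch: list[Row],
    disposition: str,
    key_columns: tuple[str, ...] = ("_hotdata_row_key",),
) -> list[Row]:
    """Return the full row set that would replace the target table."""
    if disposition == "replace":
        # The batch alone becomes the new table contents.
        return list(batch)
    if disposition == "append":
        # Plain concatenation: existing rows first, then the new batch.
        return [*existing, *batch]
    if disposition in {"merge", "upsert"}:
        # Later rows win on key collision; dicts keep first-insertion order,
        # so an updated row stays at its original position.
        merged: dict[tuple[Any, ...], Row] = {}
        for row in [*existing, *batch]:
            merged[tuple(row[column] for column in key_columns)] = row
        return list(merged.values())
    raise ValueError(f"unsupported write disposition: {disposition}")
```

Whatever the disposition, the result is uploaded as parquet and loaded with `mode=replace`, which is why the full target table has to be read first for `append` and `merge`.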

## Reliability model

- Retries: bounded retries with linear backoff
- Retryable classes: HTTP 408/409/425/429, HTTP 5xx, network timeout/connect failures
- Terminal classes: remaining HTTP/client errors
- Terminal classes: remaining HTTP/client errors, surfaced as `DestinationTerminalException`
- Idempotency: stable row and batch keys derived from canonical JSON
- Parallelism: `table-sequential` load jobs to avoid concurrent read-modify-write races
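
A bounded retry loop with linear backoff, using the retryable classes listed above, might look like this sketch. `HttpError`, `is_retryable`, and `call_with_retries` are hypothetical stand-ins rather than the `hotdata_client.py` API, and the delay formula (`backoff_seconds * attempt`) is an assumption about what "linear" means here. The sketch re-raises the original error and does not perform the `DestinationTerminalException` mapping.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")

# Non-5xx statuses the reliability model lists as retryable.
RETRYABLE_STATUS = {408, 409, 425, 429}


class HttpError(Exception):
    """Hypothetical stand-in for an HTTP error raised by the API client."""

    def __init__(self, status: int) -> None:
        super().__init__(f"HTTP {status}")
        self.status = status


def is_retryable(exc: Exception) -> bool:
    if isinstance(exc, HttpError):
        return exc.status in RETRYABLE_STATUS or 500 <= exc.status < 600
    # Network timeout / connect failures.
    return isinstance(exc, (TimeoutError, ConnectionError))


def call_with_retries(
    fn: Callable[[], T],
    *,
    max_retries: int = 5,
    backoff_seconds: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    attempt = 0
    while True:
        try:
            return fn()
        except Exception as exc:
            # Terminal errors and exhausted budgets propagate unchanged.
            if not is_retryable(exc) or attempt >= max_retries:
                raise
            attempt += 1
            # Linear backoff: 1x, 2x, 3x ... the base delay.
            sleep(backoff_seconds * attempt)
```

The `sleep` parameter is injectable so tests can record delays instead of waiting.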

## Known limitations

- Managed-table loads currently support `mode=replace` only; append/merge use staging + SQL.
- `replace` replaces the full target table per batch (best for single-batch resources).
- Managed-table loads only support `mode=replace`; append/merge are emulated via read-modify-write.
- Tables must be declared when the managed database is created; use `declared_tables` for multi-table pipelines.
- Read-modify-write reads the full target table on every append/merge batch.
- This implementation is a custom destination callable, not a native dlt destination plugin package.
5 changes: 3 additions & 2 deletions docs/runbook.md
@@ -55,7 +55,7 @@ uv run pytest tests/test_e2e_linear_hotdata.py -m integration

1. Create a new module in `src/hotdata_dlt_destination/pipelines/`.
2. Define a `dlt.resource` with explicit `name` and `write_disposition`.
3. Build pipeline with `destination=hotdata_destination(database_name=..., write_disposition=...)`.
3. Build pipeline with `destination=hotdata_destination(database_name=..., declared_tables=[...])`.
4. Add script entrypoint in `pyproject.toml`.
5. Add tests covering schema shape, idempotency key behavior, and retry/error handling.

@@ -70,4 +70,5 @@ uv run pytest tests/test_e2e_linear_hotdata.py -m integration
- `401` / `403`: verify `HOTDATA_API_KEY` and `HOTDATA_WORKSPACE`.
- `404` on destination paths: verify `HOTDATA_API_BASE_URL` is the API host (for example `https://api.hotdata.dev`).
- `429` / `5xx`: increase retry/backoff values.
- SQL merge errors: verify managed database/table names exist and columns are valid in your workspace.
- `table not declared`: recreate the managed database with every target table listed in `declared_tables`; tables can only be declared when the database is created.
- Append/merge loads re-read the full target table each batch; large tables may be slow until native append/merge API support lands.
4 changes: 2 additions & 2 deletions pyproject.toml
@@ -1,6 +1,6 @@
[project]
name = "hotdata-dlt-destination"
version = "0.2.0"
version = "0.3.0"
description = "dlt destination for loading data into Hotdata managed databases."
readme = "README.md"
authors = [
@@ -9,7 +9,7 @@ authors = [
requires-python = ">=3.11"
dependencies = [
"dlt>=1.26.0",
"hotdata>=0.2.0",
"hotdata>=0.2.2",
"hotdata-runtime>=0.1.1",
"pyarrow>=14",
]
6 changes: 6 additions & 0 deletions src/hotdata_dlt_destination/config.py
@@ -13,11 +13,16 @@ class HotdataDestinationConfig:
schema: str = "public"
write_disposition: str = "append"
create_database_if_missing: bool = True
declared_tables: tuple[str, ...] = ()
max_retries: int = 5
retry_backoff_seconds: float = 1.0

@classmethod
def from_env(cls) -> HotdataDestinationConfig:
declared = os.environ.get("HOTDATA_DECLARED_TABLES", "")
declared_tables = tuple(
table.strip() for table in declared.split(",") if table.strip()
)
return cls(
api_key=os.environ["HOTDATA_API_KEY"],
workspace_id=os.environ["HOTDATA_WORKSPACE"],
@@ -29,6 +34,7 @@ def from_env(cls) -> HotdataDestinationConfig:
"HOTDATA_CREATE_DATABASE_IF_MISSING", "true"
).lower()
in {"1", "true", "yes"},
declared_tables=declared_tables,
max_retries=int(os.environ.get("HOTDATA_MAX_RETRIES", "5")),
retry_backoff_seconds=float(os.environ.get("HOTDATA_RETRY_BACKOFF_SECONDS", "1.0")),
)
26 changes: 19 additions & 7 deletions src/hotdata_dlt_destination/contracts.py
@@ -6,7 +6,6 @@
from dlt.common.schema import TTableSchema

IDENTIFIER_RE = re.compile(r"[^a-zA-Z0-9_]")
STAGING_PREFIX = "_dlt_staging_"


def normalize_identifier(value: str) -> str:
@@ -19,16 +18,11 @@ class TableContract:
database_name: str
schema: str
table_name: str
staging_table_name: str

@property
def qualified_target(self) -> str:
return f"{self.database_name}.{self.schema}.{self.table_name}"

@property
def qualified_staging(self) -> str:
return f"{self.database_name}.{self.schema}.{self.staging_table_name}"

@classmethod
def from_table_schema(
cls,
@@ -50,5 +44,23 @@ def from_table_schema(
database_name=normalize_identifier(database_name),
schema=normalize_identifier(schema),
table_name=normalized_table_name,
staging_table_name=f"{STAGING_PREFIX}{normalized_table_name}",
)

@classmethod
def declared_table_names(
cls,
*,
database_name: str,
schema: str,
table_names: list[str],
) -> list[str]:
return sorted(
{
cls.from_table_schema(
{"name": table_name},
database_name=database_name,
schema=schema,
).table_name
for table_name in table_names
}
)