hotdata-dlt-destination is a Python package that implements a custom dlt destination for loading data into Hotdata managed databases with deterministic idempotency keys and explicit write semantics.
- Custom destination via `@dlt.destination` in `src/hotdata_dlt_destination/destination.py`
- Managed-database ingestion through `hotdata-runtime` (`upload_parquet`, `load_managed_table`, `SELECT`)
- Read-modify-write append/merge using only supported API operations
- Deterministic batch and row idempotency keys
- Example pipelines:
  - `hotdata-dlt-basic-pipeline` (append)
  - `hotdata-dlt-incremental-pipeline` (upsert/merge)
  - `hotdata-dlt-linear-pipeline` (Linear issues → Hotdata)
- Unit tests in `tests/`
- Architecture and runbook docs in `docs/`
- Managed database: `database_name` (default `dlt`, created on first load when missing)
- Schema: `public`
- Table name: normalized lowercase dlt table identifier
- Nested table names: `{parent}__{child}`
- Write semantics (all use `load_managed_table(replace)` under the hood):
  - `replace`: upload batch parquet and replace the target table
  - `append`: read existing target rows, append batch in Python, replace target
  - `upsert`/`merge`: read existing rows, upsert by dlt `primary_key` (or `_hotdata_row_key`), replace target
- Idempotency:
  - Batch key `_hotdata_batch_key` = hash(table + full batch payload)
  - Row key `_hotdata_row_key` = hash(table + canonical row payload)
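The idempotency keys and the read-modify-write semantics above can be sketched in plain Python. This is a minimal illustration, not the package's actual implementation: the helper names (`row_key`, `batch_key`, `append_rows`, `upsert_rows`), the choice of SHA-256, and the canonical-JSON encoding are all assumptions made for the example.

```python
import hashlib
import json
from typing import Any, Optional

Row = dict[str, Any]


def _digest(table: str, payload: Any) -> str:
    # Canonical JSON (sorted keys, fixed separators) so that logically equal
    # payloads always serialize to the same bytes. SHA-256 is an assumption.
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"), default=str)
    return hashlib.sha256(f"{table}\n{canonical}".encode("utf-8")).hexdigest()


def row_key(table: str, row: Row) -> str:
    # Hypothetical stand-in for _hotdata_row_key: hash(table + canonical row payload).
    return _digest(table, row)


def batch_key(table: str, rows: list[Row]) -> str:
    # Hypothetical stand-in for _hotdata_batch_key: hash(table + full batch payload).
    return _digest(table, rows)


def append_rows(existing: list[Row], batch: list[Row]) -> list[Row]:
    # append: existing rows followed by the new batch; the result replaces the target.
    return existing + batch


def upsert_rows(
    existing: list[Row],
    batch: list[Row],
    table: str,
    primary_key: Optional[str] = None,
) -> list[Row]:
    # upsert/merge: key each row by its primary key, or by the row hash when
    # no primary key is given; later rows overwrite earlier ones with the same key.
    def key_of(row: Row) -> Any:
        if primary_key is not None:
            return row[primary_key]
        return row_key(table, row)

    merged: dict[Any, Row] = {key_of(row): row for row in existing}
    for row in batch:
        merged[key_of(row)] = row
    return list(merged.values())
```

Because the keys depend only on the table name and the payload, replaying the same batch produces the same keys, which is what makes a retried load detectable. In this sketch the merged list would then be written back with a single replace, matching the "replace target" step described above.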
Set environment variables (or pass destination kwargs / dlt secrets):
- `HOTDATA_API_KEY`
- `HOTDATA_WORKSPACE`
- `HOTDATA_DATABASE` (managed database name, default `dlt`)
- optional: `HOTDATA_SCHEMA`, `HOTDATA_WRITE_DISPOSITION`, `HOTDATA_DECLARED_TABLES`, retry tuning
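As a rough sketch of how these variables might be read, the snippet below collects them into a settings object. The `HotdataSettings` class and `load_settings` function are hypothetical and not part of the package. It also assumes that `HOTDATA_API_KEY` and `HOTDATA_WORKSPACE` are required, that the schema defaults to `public`, and that `HOTDATA_DECLARED_TABLES` is a comma-separated list; check the package's own docs for the actual format.

```python
import os
from dataclasses import dataclass, field
from typing import Mapping, Optional


@dataclass
class HotdataSettings:
    # Hypothetical container; field names mirror the environment variables.
    api_key: str
    workspace: str
    database_name: str = "dlt"
    schema: str = "public"
    write_disposition: Optional[str] = None  # None: leave the package default in place
    declared_tables: list[str] = field(default_factory=list)


def load_settings(env: Mapping[str, str] = os.environ) -> HotdataSettings:
    # Fail early if the (assumed) required credentials are missing.
    missing = [name for name in ("HOTDATA_API_KEY", "HOTDATA_WORKSPACE") if not env.get(name)]
    if missing:
        raise RuntimeError(f"missing environment variables: {', '.join(missing)}")

    # Assumption: declared tables are given as a comma-separated string.
    raw_tables = env.get("HOTDATA_DECLARED_TABLES", "")
    tables = [name.strip() for name in raw_tables.split(",") if name.strip()]

    return HotdataSettings(
        api_key=env["HOTDATA_API_KEY"],
        workspace=env["HOTDATA_WORKSPACE"],
        database_name=env.get("HOTDATA_DATABASE", "dlt"),
        schema=env.get("HOTDATA_SCHEMA", "public"),
        write_disposition=env.get("HOTDATA_WRITE_DISPOSITION"),
        declared_tables=tables,
    )
```

Passing a plain dictionary instead of `os.environ` makes the function easy to exercise in tests without touching the real environment.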
For pipelines with multiple tables, declare every target table when the managed database is first created:
```python
hotdata_destination(
    database_name="analytics",
    declared_tables=["customers", "orders", "orders__items"],
)
```

```python
import dlt
from hotdata_dlt_destination import hotdata_destination

pipeline = dlt.pipeline(
    pipeline_name="my_pipeline",
    destination=hotdata_destination(
        database_name="analytics",
        write_disposition="append",
        declared_tables=["customers"],
    ),
)
pipeline.run(my_resource())
```

Per-resource `write_disposition` and `primary_key` from dlt take precedence over the destination default.
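The precedence rule can be illustrated with a small standalone function. This is only a sketch of the described behavior: `resolve_load_plan` and its parameters are hypothetical names, not part of the package or of dlt. The fallback to `_hotdata_row_key` follows the upsert/merge description earlier in this README.

```python
from typing import Optional

ROW_KEY_COLUMN = "_hotdata_row_key"


def resolve_load_plan(
    destination_default: str,
    resource_write_disposition: Optional[str] = None,
    resource_primary_key: Optional[str] = None,
) -> tuple[str, Optional[str]]:
    # A per-resource hint wins; otherwise fall back to the destination default.
    disposition = resource_write_disposition or destination_default

    # Only upsert/merge needs a key column: the resource's primary key if
    # present, else the row idempotency key.
    if disposition in ("upsert", "merge"):
        return disposition, resource_primary_key or ROW_KEY_COLUMN
    return disposition, None
```

For example, a destination configured with `write_disposition="append"` would still merge a resource that declares `write_disposition="merge"` and `primary_key="id"`, under the behavior sketched here.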
```bash
uv sync
uv run ruff check .
uv run pytest
uv run hotdata-dlt-destination
```

Run pipelines:

```bash
uv run hotdata-dlt-basic-pipeline
uv run hotdata-dlt-incremental-pipeline
uv run hotdata-dlt-linear-pipeline
```

Run the live end-to-end integration test (requires Hotdata + Linear env vars):

```bash
uv run pytest tests/test_e2e_linear_hotdata.py -m integration
```