Goal: Build a fully working analytics data warehouse step by step. You start with four CSV files and finish with live Superset charts.
```
data/raw/*.csv
      ↓  Airflow (ETL orchestration)
[bronze schema]  Raw TEXT tables in PostgreSQL
      ↓  dbt staging models
[silver schema]  Typed + surrogate-keyed tables
      ↓  dbt gold models
[gold schema]    Star schema (dims + fact)
      ↓
Apache Superset  Dashboards & SQL Lab
```
| Service | URL | Login |
|---|---|---|
| Airflow | http://localhost:8088 | admin / admin |
| Superset | http://localhost:8089 | admin / admin |
| PostgreSQL | localhost:5432 | admin / admin |
| dbt HTTP | http://localhost:8087 | — |
```bash
docker-compose up -d
# Wait ~60 s for Airflow to init, then open http://localhost:8088
```

- Run `docker-compose up -d`.
- Open Docker Desktop and confirm four containers are running: `postgres`, `dbt`, `airflow`, `superset`.
- Wait about 60 seconds for Airflow to finish its `db init`.
- Open http://localhost:8088 and log in with `admin / admin`.
- Navigate to DAGs and confirm `etl_full_pipeline` is listed (it will be in a failed state; that is expected at this stage).

The folder data/raw/ contains four CSV files:
| File | Key columns |
|---|---|
| `regions.csv` | `region_id`, `region_name`, `country`, `continent` |
| `clients.csv` | `client_id`, `client_name`, `client_type`, `region_id`, `email` |
| `products.csv` | `product_id`, `product_name`, `category`, `subcategory`, `list_price` |
| `orders.csv` | `order_id`, `order_date`, `client_id`, `product_id`, `quantity`, `unit_price`, `discount_pct` |

Open each file and answer:
- What is the data type of every column? Which ones need casting?
- Are there any obviously dirty values (extra spaces, mixed case)?
- What joins would you need to produce a single flat sales table?
File to edit: `dags/etl_pipeline.py`

All bronze tasks call the helper _load_csv(table, csv_path, columns).
Implement the helper first, then wire up each task.
Implement _load_csv() in dags/etl_pipeline.py:
Steps (see docstring in the file):
1. Use glob.glob(csv_path) to find matching files.
→ Raise FileNotFoundError if the list is empty.
2. Open a psycopg2 connection using the DB_CONN dict.
3. CREATE TABLE IF NOT EXISTS bronze.<table>
with every column declared as TEXT.
4. TRUNCATE the table (so re-runs are idempotent).
5. For each file: read with csv.DictReader, build a list of
value tuples, INSERT with cursor.executemany().
6. conn.commit() and close the connection.
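
As a rough sketch of steps 1, 3, 4 and 5, the fragment below builds the SQL strings and reads the CSV rows using only the standard library. The helper names `build_bronze_sql` and `read_rows` are hypothetical and not part of the repository. The psycopg2 part (connecting with `DB_CONN`, executing the statements, committing) is deliberately left out so the example runs without a database.

```python
import csv
import glob


def build_bronze_sql(table, columns):
    """Return (create, truncate, insert) statements for bronze.<table>."""
    col_defs = ", ".join(f"{col} TEXT" for col in columns)
    placeholders = ", ".join(["%s"] * len(columns))  # psycopg2-style parameters
    create = f"CREATE TABLE IF NOT EXISTS bronze.{table} ({col_defs})"
    truncate = f"TRUNCATE TABLE bronze.{table}"
    insert = (
        f"INSERT INTO bronze.{table} ({', '.join(columns)}) "
        f"VALUES ({placeholders})"
    )
    return create, truncate, insert


def read_rows(csv_path, columns):
    """Collect one value tuple per CSV row from every file matching csv_path."""
    files = sorted(glob.glob(csv_path))
    if not files:
        raise FileNotFoundError(f"No CSV files match {csv_path}")
    rows = []
    for path in files:
        with open(path, newline="", encoding="utf-8") as handle:
            for record in csv.DictReader(handle):
                rows.append(tuple(record[col] for col in columns))
    return rows
```

Inside `_load_csv()`, with an open psycopg2 cursor `cur`, these pieces would typically be used as `cur.execute(create)`, `cur.execute(truncate)`, then `cur.executemany(insert, rows)`. Table and column names are interpolated directly into the SQL here, which is only reasonable because they are constants defined in the DAG file.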
Then implement `load_regions()`:

```python
def load_regions():
    _load_csv(
        table="regions",
        csv_path="/opt/data/raw/regions.csv",
        columns=["region_id", "region_name", "country", "continent"],
    )
```

Verify: In Airflow, trigger the DAG and check that only the `load_raw.load_regions` task turns green. Then:

```sql
SELECT * FROM bronze.regions LIMIT 5;
```

Implement `load_clients()` and `load_products()` following the same pattern.
Clients columns : client_id, client_name, client_type, region_id, email
Products columns: product_id, product_name, category, subcategory, list_price
Verify:

```sql
SELECT COUNT(*) FROM bronze.clients;
SELECT COUNT(*) FROM bronze.products;
```

Implement `load_orders()`.
Orders columns: order_id, order_date, client_id, product_id,
quantity, unit_price, discount_pct
Verify: All four `load_raw.*` tasks turn green.

```sql
SELECT MIN(order_date), MAX(order_date), COUNT(*) FROM bronze.orders;
```

Files to edit: `dbt/models/staging/stg_*.sql` and `dags/etl_pipeline.py` (implement `_call_dbt`)

Implement _call_dbt(endpoint) in dags/etl_pipeline.py:
Steps (see docstring):
1. POST to f"{DBT_BASE}/{endpoint}" with timeout=300.
2. Parse the JSON response body.
3. Print result["output"] so logs are visible in Airflow.
4. If result["returncode"] != 0, raise an Exception.
Then implement `run_dbt_staging()`:

```python
def run_dbt_staging():
    _call_dbt("run/staging")
```

Test the dbt endpoint directly:

```bash
curl -X POST http://localhost:8087/run/staging
```

Open `dbt/models/staging/stg_regions.sql` and `dbt/models/staging/stg_clients.sql`.
Each staging model should:
- Read from the corresponding `bronze.*` table.
- `TRIM()` all text columns.
- For clients, also `LOWER()` the email.
- Filter out rows where the primary ID column is NULL.
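
To make the expected result concrete, here is a small Python emulation of those rules for the clients table. It is not how the dbt model is written (that is SQL); the function names are hypothetical and the sketch only shows what a staged row should look like. One difference to keep in mind: SQL `TRIM()` strips spaces by default, while Python's `strip()` removes any surrounding whitespace.

```python
def clean_client(row):
    """Mimic the staging rules for one bronze.clients row (dict of TEXT values)."""
    # TRIM() every text column; NULLs (None) pass through unchanged.
    cleaned = {
        key: (value.strip() if value is not None else None)
        for key, value in row.items()
    }
    # LOWER() the email.
    if cleaned.get("email") is not None:
        cleaned["email"] = cleaned["email"].lower()
    return cleaned


def stage_clients(rows):
    """Drop rows whose primary ID is NULL, then clean the remaining rows."""
    return [clean_client(row) for row in rows if row.get("client_id") is not None]


raw = [
    {"client_id": "1", "client_name": "  Acme Corp ", "email": " Sales@ACME.com"},
    {"client_id": None, "client_name": "Ghost", "email": None},
]
staged = stage_clients(raw)
```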
```sql
SELECT * FROM silver.stg_regions LIMIT 5;
SELECT * FROM silver.stg_clients LIMIT 5;
```

Open `dbt/models/staging/stg_products.sql` and `dbt/models/staging/stg_orders.sql`.
For products and orders, also cast columns:
- `list_price::NUMERIC`
- `order_date::DATE`
- `quantity::INTEGER`
- `unit_price::NUMERIC`, `discount_pct::NUMERIC`

Verify:

```sql
SELECT pg_typeof(list_price) FROM silver.stg_products LIMIT 1;
SELECT pg_typeof(order_date), pg_typeof(quantity) FROM silver.stg_orders LIMIT 1;
```

Files to edit: `dbt/models/silver/silver_*.sql`

The silver layer adds integer surrogate keys using this deterministic MD5 pattern (memorise it; you'll use it in every silver model):

```sql
ABS(('x' || MD5(<natural_key>))::BIT(32)::INT) AS <entity>_sk
```

Open `dbt/models/silver/silver_regions.sql`.
Add a `region_sk` using the surrogate key pattern on `region_id`.
Pass through all columns from `{{ ref('stg_regions') }}`.
Add `CURRENT_TIMESTAMP AS loaded_at`.

```sql
SELECT region_sk, region_id, region_name FROM silver.silver_regions LIMIT 5;
```

`silver_products`: same pattern as `silver_regions`, keyed on `product_id`.
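
To see what the surrogate key pattern computes, the sketch below reproduces it in Python under the usual reading of the casts: `BIT(32)` keeps the first 32 bits of the MD5 hex digest, `::INT` interprets them as a signed 32-bit integer, and `ABS` removes the sign. The function name `surrogate_key` is hypothetical and exists only for illustration.

```python
import hashlib


def surrogate_key(natural_key):
    """Emulate ABS(('x' || MD5(key))::BIT(32)::INT) for a text key."""
    digest = hashlib.md5(str(natural_key).encode("utf-8")).hexdigest()
    unsigned = int(digest[:8], 16)  # first 32 bits = first 8 hex digits
    # ::INT is a signed 32-bit integer, so values >= 2**31 wrap to negatives.
    signed = unsigned - 2**32 if unsigned >= 2**31 else unsigned
    return abs(signed)


# The same natural key always yields the same surrogate key.
assert surrogate_key("1") == surrogate_key("1")
```

Because the result is squeezed into roughly 31 bits, two different natural keys can in principle collide. That is acceptable for a tutorial-sized dataset but worth remembering before reusing the pattern on large tables.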
`silver_clients` needs a JOIN:
- Generate `client_sk` from `client_id`.
- LEFT JOIN `{{ ref('silver_regions') }}` on `region_id` to bring in `region_sk`.

```sql
SELECT client_sk, client_name, region_sk FROM silver.silver_clients LIMIT 5;
```

`silver_orders` joins three upstream models and adds computed fields:
- Generate `order_sk` from `order_id`.
- Add `date_key` as `TO_CHAR(order_date, 'YYYYMMDD')::INTEGER`.
- LEFT JOIN `silver_clients` (for `client_sk`, `region_sk`) and `silver_products` (for `product_sk`).
- Compute `total_amount`: `ROUND(quantity * unit_price * (1 - discount_pct), 2)`
- Add `loaded_at`.
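
The two computed fields can be checked by hand with a short Python sketch. It assumes, as the formula implies, that `discount_pct` is stored as a fraction (0.10 for 10 %) rather than as a whole percentage. `Decimal` is used to mirror the `NUMERIC` type, and `ROUND_HALF_UP` matches how PostgreSQL rounds `NUMERIC` ties away from zero.

```python
from datetime import date
from decimal import Decimal, ROUND_HALF_UP


def date_key(order_date):
    """Equivalent of TO_CHAR(order_date, 'YYYYMMDD')::INTEGER."""
    return int(order_date.strftime("%Y%m%d"))


def total_amount(quantity, unit_price, discount_pct):
    """Equivalent of ROUND(quantity * unit_price * (1 - discount_pct), 2)."""
    raw = quantity * unit_price * (Decimal("1") - discount_pct)
    return raw.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)


print(date_key(date(2024, 3, 15)))                           # 20240315
print(total_amount(3, Decimal("19.99"), Decimal("0.10")))    # 53.97
```

If the values in `orders.csv` turn out to be whole percentages (for example `10`), the formula would need `discount_pct / 100` instead; check the raw file before relying on the totals.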
```sql
SELECT order_sk, date_key, client_sk, product_sk, total_amount
FROM silver.silver_orders LIMIT 5;
```

Open each file in `dbt/models/gold/` and implement the four dimensions.
dim_region — rename region_sk → region_key, pass through other columns.
dim_client — rename client_sk → client_key, region_sk → region_key, pass through client columns.
dim_product — rename product_sk → product_key, pass through product columns.
dim_date — derive calendar attributes from distinct order dates in stg_orders.
Required columns:
date_key (YYYYMMDD integer), full_date, day, month, month_name,
quarter, year, day_name, day_of_week, day_of_year, is_weekend
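
As a reference for the expected values, this Python sketch derives the same attributes from a single date. The numbering of `day_of_week` is an assumption (ISO style, Monday = 1 to Sunday = 7); the solution model may use a different convention such as PostgreSQL's `EXTRACT(DOW ...)`, where Sunday is 0. The function name `dim_date_row` is hypothetical.

```python
from datetime import date


def dim_date_row(d):
    """Build one dim_date row (as a dict) from a datetime.date."""
    return {
        "date_key": int(d.strftime("%Y%m%d")),
        "full_date": d,
        "day": d.day,
        "month": d.month,
        "month_name": d.strftime("%B"),
        "quarter": (d.month - 1) // 3 + 1,
        "year": d.year,
        "day_name": d.strftime("%A"),
        "day_of_week": d.isoweekday(),      # assumption: Monday=1 .. Sunday=7
        "day_of_year": d.timetuple().tm_yday,
        "is_weekend": d.isoweekday() >= 6,  # Saturday or Sunday
    }


row = dim_date_row(date(2024, 3, 16))
```

Comparing a few rows of `gold.dim_date` against this function is a quick way to spot an off-by-one in the quarter or weekend logic.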
```sql
SELECT * FROM gold.dim_date ORDER BY full_date LIMIT 5;
```

Open `dbt/models/gold/fact_sales.sql`.
Thin projection of `silver_orders`:
- Rename `order_sk → sale_key`.
- Pass through `date_key`.
- Rename `client_sk → client_key`, `product_sk → product_key`, `region_sk → region_key`.
- Pass through measures: `quantity`, `unit_price`, `discount_pct`, `total_amount`.
```sql
SELECT COUNT(*), SUM(total_amount) FROM gold.fact_sales;
```

Implement `run_dbt_silver()` and `run_dbt_gold()` in `dags/etl_pipeline.py` (same pattern as `run_dbt_staging`, with different endpoint strings).
Trigger the full DAG in Airflow and wait for all three task groups to turn green:
load_raw ✓ → silver ✓ → gold ✓
Once Exercise 14 is complete, Superset auto-provisions the dashboard.
- Open http://localhost:8089 (admin / admin).
- Navigate to Dashboards → Sales Analytics Dashboard.
- You should see five charts:
  - Total Revenue (KPI big number)
  - Monthly Revenue Trend (line chart)
  - Revenue by Region (bar chart)
  - Revenue by Category (pie chart)
  - Top 10 Clients (table)

Optional explorations:
- Open SQL Lab and write a query that joins `gold.fact_sales` with all four dimension tables to produce a flat row per sale.
- Add a new chart: "Revenue by Continent" on the `sales_analytics` dataset.
- In Airflow, trigger the DAG again. It is idempotent and safe to re-run.

| Symptom | What to check |
|---|---|
| Airflow task stuck in queued | DAG may be paused — click the toggle next to the DAG name |
| `FileNotFoundError` in load task | CSV lives at `/opt/data/raw/` inside the container |
| dbt returns `returncode: 1` | Run `docker-compose logs dbt` for the SQL error |
| Superset "dataset not found" | Run the ETL pipeline first; Superset retries every 60 s |
| Port conflict on startup | Another service is using 8087/8088/8089/5432 |
Working solutions for all files are in solutions/:
```
solutions/
  dags/etl_pipeline.py
  dbt/models/staging/stg_*.sql
  dbt/models/silver/silver_*.sql
  dbt/models/gold/dim_*.sql
  dbt/models/gold/fact_sales.sql
```

If you like this project, support further development with a repost or coffee:
- 🧑💻 Markus Begerow
- 💾 GitHub