-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcsv_to_db.py
More file actions
55 lines (48 loc) · 1.65 KB
/
csv_to_db.py
File metadata and controls
55 lines (48 loc) · 1.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
"""Simple ETL: CSV to SQLite database"""
import pandas as pd
import sqlite3
from pathlib import Path
def extract(csv_path):
"""Extract data from CSV file."""
print(f"Extracting from {csv_path}...")
df = pd.read_csv(csv_path)
print(f" Rows: {len(df)}, Columns: {len(df.columns)}")
return df
def transform(df):
"""Clean and transform data."""
print("Transforming...")
# Remove duplicates
before = len(df)
df = df.drop_duplicates()
print(f" Removed {before - len(df)} duplicates")
# Handle missing values
for col in df.select_dtypes(include=["object"]).columns:
df[col] = df[col].fillna("Unknown")
for col in df.select_dtypes(include=["number"]).columns:
df[col] = df[col].fillna(0)
# Normalize column names
df.columns = [c.lower().strip().replace(" ", "_") for c in df.columns]
print(f" Final shape: {df.shape}")
return df
def load(df, db_path, table_name):
"""Load data into SQLite database."""
print(f"Loading into {db_path} -> {table_name}...")
conn = sqlite3.connect(db_path)
df.to_sql(table_name, conn, if_exists="replace", index=False)
conn.close()
print(f" Loaded {len(df)} rows successfully!")
def run_pipeline(csv_path, db_path="output.db", table_name="data"):
"""Run the full ETL pipeline."""
print("=" * 50)
print("ETL Pipeline Started")
print("=" * 50)
df = extract(csv_path)
df = transform(df)
load(df, db_path, table_name)
print("\nPipeline complete!")
if __name__ == "__main__":
import sys
if len(sys.argv) < 2:
print("Usage: python csv_to_db.py <csv_file>")
else:
run_pipeline(sys.argv[1])