diff --git a/README.md b/README.md index 9c12192..3e48553 100644 --- a/README.md +++ b/README.md @@ -65,6 +65,7 @@ API key priority (lowest to highest): config file → `HOTDATA_API_KEY` env var | `auth` | `login`, `status`, `logout` | `login` or bare `auth` opens browser login; `status` / `logout` manage the saved profile | | `workspaces` | `list`, `set` | Manage workspaces | | `connections` | `list`, `create`, `refresh`, `new` | Manage connections | +| `databases` | `list`, `create`, `delete`, `tables` | Managed databases (create and load tables via parquet) | | `tables` | `list` | List tables and columns | | `datasets` | `list`, `create`, `update` | Manage uploaded datasets | | `context` | `list`, `show`, `pull`, `push` | Workspace Markdown context (e.g. data model `DATAMODEL`) via the context API | @@ -127,6 +128,34 @@ hotdata connections create list --format json hotdata connections create --name "my-conn" --type postgres --config '{"host":"...","port":5432,...}' ``` +## Databases + +Managed databases are Hotdata-owned catalogs you create and populate yourself (no remote source to sync). Query them with SQL as `database_name.schema.table` — the database name is the connection name. + +```sh +hotdata databases list [-w ] [-o table|json|yaml] +hotdata databases create --name [--table ...] [--schema public] [-o table|json|yaml] +hotdata databases [-o table|json|yaml] +hotdata databases delete + +hotdata databases tables list [--schema ] [-o table|json|yaml] +hotdata databases tables load
--file ./data.parquet [--schema public] +hotdata databases tables load
--upload-id [--schema public] +hotdata databases tables delete
[--schema public] +``` + +- `create` registers a managed connection (`source_type: managed`) with no external credentials. Use `--table` to declare tables up front (required before `tables load` on the current API). +- `tables load` uploads a **parquet** file (or uses a staged `upload_id` from `POST /v1/files`) and publishes it as the table generation (`replace` mode). +- For CSV/JSON uploads without a managed database, use `hotdata datasets create` instead (`datasets.main.*`). + +Example: + +```sh +hotdata databases create --name sales --table orders +hotdata databases tables load sales orders --file ./orders.parquet +hotdata query "SELECT count(*) FROM sales.public.orders" +``` + ## Tables ```sh diff --git a/skills/hotdata/SKILL.md b/skills/hotdata/SKILL.md index 665e40d..ec31ef1 100644 --- a/skills/hotdata/SKILL.md +++ b/skills/hotdata/SKILL.md @@ -1,6 +1,6 @@ --- name: hotdata -description: Use this skill when the user wants to run hotdata CLI commands, query the Hotdata API, list workspaces, list connections, create connections, list tables, manage datasets, execute SQL queries, inspect query run history, search tables, manage indexes, manage sandboxes, manage workspace context and stored docs such as context:DATAMODEL via the context API (`hotdata context`), install or update the bundled agent skills (`hotdata skills`), generate shell completions (`hotdata completions`), or interact with the hotdata service. Activate when the user says "run hotdata", "query hotdata", "list workspaces", "list connections", "create a connection", "list tables", "list datasets", "create a dataset", "upload a dataset", "execute a query", "search a table", "list indexes", "create an index", "list query runs", "list past queries", "query history", "list sandboxes", "create a sandbox", "run a sandbox", "workspace context", "pull context", "push context", "data model", "context:DATAMODEL", or asks you to use the hotdata CLI. +description: Use this skill when the user wants to run hotdata CLI commands, query the Hotdata API, list workspaces, list connections, create connections, list or create managed databases, load parquet into database tables, list tables, manage datasets, execute SQL queries, inspect query run history, search tables, manage indexes, manage sandboxes, manage workspace context and stored docs such as context:DATAMODEL via the context API (`hotdata context`), install or update the bundled agent skills (`hotdata skills`), generate shell completions (`hotdata completions`), or interact with the hotdata service. Activate when the user says "run hotdata", "query hotdata", "list workspaces", "list connections", "create a connection", "list databases", "create a database", "managed database", "load parquet", "list tables", "list datasets", "create a dataset", "upload a dataset", "execute a query", "search a table", "list indexes", "create an index", "list query runs", "list past queries", "query history", "list sandboxes", "create a sandbox", "run a sandbox", "workspace context", "pull context", "push context", "data model", "context:DATAMODEL", or asks you to use the hotdata CLI. version: 0.2.2 --- @@ -73,14 +73,14 @@ These are **patterns** built from the commands below—not separate CLI subcomma - **Model (`context:DATAMODEL`)** — The **shared** Markdown semantic map of the workspace (entities, keys, joins across connections). **Store and read it only via workspace context** (`hotdata context list`, then `hotdata context show DATAMODEL` **only when listed**, `context push DATAMODEL`); refresh using `connections`, `connections refresh`, `tables list`, and `datasets list`. For a **deep** pass (connector enrichment, indexes, per-table detail), see [references/MODEL_BUILD.md](references/MODEL_BUILD.md). Contrast **analysis modeling** in sandboxes or chat (see [Analysis modeling vs context:DATAMODEL](#analysis-modeling-vs-contextdatamodel)). - **History** — Inspect prior activity via `hotdata queries list` (query runs) and `hotdata results list` / `results ` (row data). -- **Chain** — Follow-ups via **`datasets create`** then `query` against `datasets..
`. +- **Chain** — Follow-ups via **`datasets create`** then `query` against `datasets..
`, or via **`databases create`** + **`databases tables load`** (parquet) then `query` against `..
`. - **Indexes** — Review SQL and schema, compare to existing indexes, create **sorted**, **bm25**, or **vector** indexes when it clearly helps; see [references/WORKFLOWS.md](references/WORKFLOWS.md#indexes). Full step-by-step procedures: [references/WORKFLOWS.md](references/WORKFLOWS.md). ## Available Commands -Top-level subcommands (each detailed below): **`auth`**, **`datasets`**, **`query`**, **`workspaces`**, **`connections`**, **`tables`**, **`skills`**, **`results`**, **`jobs`**, **`indexes`**, **`embedding-providers`**, **`search`**, **`queries`**, **`sandbox`**, **`context`**, **`completions`**. +Top-level subcommands (each detailed below): **`auth`**, **`datasets`**, **`query`**, **`workspaces`**, **`connections`**, **`databases`**, **`tables`**, **`skills`**, **`results`**, **`jobs`**, **`indexes`**, **`embedding-providers`**, **`search`**, **`queries`**, **`sandbox`**, **`context`**, **`completions`**. Global CLI options: **`--api-key`**, **`-v` / `--version`**, **`-h` / `--help`**. Hidden developer flag: **`--debug`** (verbose HTTP logs). @@ -167,6 +167,43 @@ hotdata connections create \ - Fields with `"type": "array"` must be JSON arrays (e.g. `"spreadsheet_ids": ["abc", "def"]`). - Nested `oneOf` fields must be a JSON object including a `"type"` discriminator field matching the chosen variant's `const` value. +### Managed databases (`databases`) + +**Managed databases** are Hotdata-owned catalogs (`source_type: managed`) you create and populate yourself—no remote source to sync. Query them in SQL as **`..
`** (the database name is the connection name). Prefer **`hotdata databases`** over **`hotdata connections create --type managed`** for this workflow. + +**Parquet vs datasets:** `databases tables load` accepts **parquet only**. For CSV/JSON uploads without a managed database, use **`hotdata datasets create`**. + +**Declare tables at create time:** On the current API, each table must be declared with **`--table`** when creating the database before **`tables load`** will succeed. If load fails with *not declared*, recreate with `--table` or add declaration support when the API allows it. + +``` +hotdata databases list [--workspace-id ] [--output table|json|yaml] +hotdata databases create --name [--table
...] [--schema public] [--workspace-id ] [--output table|json|yaml] +hotdata databases [--workspace-id ] [--output table|json|yaml] +hotdata databases delete [--workspace-id ] + +hotdata databases tables list [--schema ] [--workspace-id ] [--output table|json|yaml] +hotdata databases tables load
--file ./data.parquet [--schema public] [--workspace-id ] +hotdata databases tables load
--upload-id [--schema public] [--workspace-id ] +hotdata databases tables delete
[--schema public] [--workspace-id ] +``` + +- `list` — managed databases only (filters `source_type: managed` from connections). +- `create` — registers a managed connection with optional `config.schemas[].tables[]` from repeated **`--table`**. Default schema is **`public`**. +- `` — inspect one database (name, id, table counts, SQL prefix hint). +- `delete` — removes the managed database and its tables. +- `tables list` — tables with `TABLE` (`..
`), `SYNCED`, `LAST_SYNC` (via `information_schema`). +- `tables load` — uploads a local **parquet** file (or uses **`--upload-id`** from a prior `POST /v1/files` staging) and publishes with **`replace`** mode. **`--file`** and **`--upload-id`** are mutually exclusive. +- `tables delete` — drops a table from the managed database. +- Resolving by **name** or **connection id** works for all subcommands that take `` or ``. Non-managed connections error with a hint to use **`hotdata connections`**. + +Example: + +``` +hotdata databases create --name sales --table orders +hotdata databases tables load sales orders --file ./orders.parquet +hotdata query "SELECT count(*) FROM sales.public.orders" +``` + ### List Tables and Columns ``` hotdata tables list [--workspace-id ] [--connection-id ] [--schema ] [--table ] [--limit ] [--cursor ] [--output table|json|yaml] @@ -499,6 +536,24 @@ Use a sandbox to explore tables and capture **analysis-oriented** notes in sandb hotdata query "SELECT \"CustomerName\" FROM datasets.main.my_csv LIMIT 10" ``` +## Workflow: Creating a managed database (parquet) + +1. Create the database and declare tables up front: + ``` + hotdata databases create --name mydb --table events --table users + ``` +2. Load parquet into each table: + ``` + hotdata databases tables load mydb events --file ./events.parquet + ``` +3. Confirm tables and query: + ``` + hotdata databases tables list mydb + hotdata query "SELECT * FROM mydb.public.events LIMIT 10" + ``` + +For CSV/JSON file uploads, use **`hotdata datasets create`** instead. + ## Workflow: Creating a Connection 1. List available connection types: diff --git a/src/command.rs b/src/command.rs index 3013c70..2480ce8 100644 --- a/src/command.rs +++ b/src/command.rs @@ -69,6 +69,23 @@ pub enum Commands { command: Option, }, + /// Managed databases you create and populate with tables (parquet uploads) + Databases { + /// Database name or connection ID (omit to use a subcommand) + name_or_id: Option, + + /// Workspace ID (defaults to first workspace from login) + #[arg(long, short = 'w', global = true)] + workspace_id: Option, + + /// Output format + #[arg(long = "output", short = 'o', default_value = "table", value_parser = ["table", "json", "yaml"])] + output: String, + + #[command(subcommand)] + command: Option, + }, + /// Manage tables in a workspace Tables { #[command(subcommand)] @@ -515,6 +532,98 @@ pub enum ConnectionsCreateCommands { }, } +#[derive(Subcommand)] +pub enum DatabasesCommands { + /// List managed databases in the workspace + List { + /// Output format + #[arg(long = "output", short = 'o', default_value = "table", value_parser = ["table", "json", "yaml"])] + output: String, + }, + + /// Create a new managed database + Create { + /// Database name (used as the connection name in SQL: `name.schema.table`) + #[arg(long)] + name: String, + + /// Schema for tables declared at create time (default: public) + #[arg(long, default_value = "public")] + schema: String, + + /// Table to declare up front (repeatable). Required before load on current API. + #[arg(long = "table")] + tables: Vec, + + /// Output format + #[arg(long = "output", short = 'o', default_value = "table", value_parser = ["table", "json", "yaml"])] + output: String, + }, + + /// Delete a managed database and its tables + Delete { + /// Database name or connection ID + name_or_id: String, + }, + + /// Manage tables inside a managed database + Tables { + #[command(subcommand)] + command: DatabaseTablesCommands, + }, +} + +#[derive(Subcommand)] +pub enum DatabaseTablesCommands { + /// List tables in a managed database + List { + /// Database name or connection ID + database: String, + + /// Filter by schema name + #[arg(long)] + schema: Option, + + /// Output format + #[arg(long = "output", short = 'o', default_value = "table", value_parser = ["table", "json", "yaml"])] + output: String, + }, + + /// Load a parquet file into a table (creates or replaces the table) + Load { + /// Database name or connection ID + database: String, + + /// Table name + table: String, + + /// Schema name (default: public) + #[arg(long, default_value = "public")] + schema: String, + + /// Path to a local parquet file to upload and load + #[arg(long, conflicts_with = "upload_id")] + file: Option, + + /// Use a previously staged upload ID from `POST /v1/files` instead of uploading + #[arg(long)] + upload_id: Option, + }, + + /// Delete a table from a managed database + Delete { + /// Database name or connection ID + database: String, + + /// Table name + table: String, + + /// Schema name (default: public) + #[arg(long, default_value = "public")] + schema: String, + }, +} + #[derive(Subcommand)] pub enum ConnectionsCommands { /// Interactively create a new connection diff --git a/src/databases.rs b/src/databases.rs new file mode 100644 index 0000000..87b7b4a --- /dev/null +++ b/src/databases.rs @@ -0,0 +1,832 @@ +use crate::api::ApiClient; +use indicatif::{ProgressBar, ProgressStyle}; +use serde::{Deserialize, Serialize}; +use std::path::Path; + +const MANAGED_SOURCE_TYPE: &str = "managed"; +const DEFAULT_SCHEMA: &str = "public"; + +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] +pub struct Database { + pub id: String, + pub name: String, + pub source_type: String, +} + +#[derive(Deserialize)] +struct ListConnectionsResponse { + connections: Vec, +} + +#[derive(Deserialize, Serialize)] +struct DatabaseDetail { + id: String, + name: String, + source_type: String, + #[serde(default)] + table_count: u64, + #[serde(default)] + synced_table_count: u64, +} + +#[derive(Deserialize)] +struct InfoTable { + #[allow(dead_code)] + connection: String, + schema: String, + table: String, + synced: bool, + last_sync: Option, +} + +#[derive(Deserialize)] +struct InfoListResponse { + tables: Vec, + has_more: bool, + next_cursor: Option, +} + +#[derive(Deserialize, Serialize)] +struct TableRow { + full_name: String, + schema: String, + table: String, + synced: bool, + last_sync: Option, +} + +#[derive(Deserialize, Serialize)] +struct CreateConnectionResponse { + id: String, + name: String, + source_type: String, +} + +#[derive(Deserialize)] +struct LoadManagedTableResponse { + #[allow(dead_code)] + connection_id: String, + schema_name: String, + table_name: String, + row_count: u64, + #[allow(dead_code)] + arrow_schema_json: String, +} + +fn is_managed(db: &Database) -> bool { + db.source_type == MANAGED_SOURCE_TYPE +} + +pub fn try_resolve_database(api: &ApiClient, name_or_id: &str) -> Result { + let body: ListConnectionsResponse = api.get("/connections"); + let by_id = body + .connections + .iter() + .find(|c| c.id == name_or_id) + .cloned(); + let found = by_id.or_else(|| { + body.connections + .iter() + .find(|c| c.name == name_or_id) + .cloned() + }); + match found { + Some(db) if is_managed(&db) => Ok(db), + Some(db) => Err(format!( + "'{}' is not a managed database (source_type: {})", + db.name, db.source_type + )), + None => Err(format!("no database named or with id '{name_or_id}'")), + } +} + +pub fn resolve_database(api: &ApiClient, name_or_id: &str) -> Database { + match try_resolve_database(api, name_or_id) { + Ok(db) => db, + Err(e) => { + use crossterm::style::Stylize; + if e.contains("not a managed database") { + eprintln!("{}", format!("error: {e}. Use `hotdata connections` for remote sources.").red()); + } else { + eprintln!("{}", format!("error: {e}").red()); + } + std::process::exit(1); + } + } +} + +fn schema_name(schema: Option<&str>) -> &str { + schema.unwrap_or(DEFAULT_SCHEMA) +} + +/// Build managed-connection `config` with declared schemas/tables. +pub fn build_managed_config(schema: &str, tables: &[String]) -> serde_json::Value { + if tables.is_empty() { + return serde_json::json!({}); + } + let table_objs: Vec = tables + .iter() + .map(|t| serde_json::json!({ "name": t })) + .collect(); + serde_json::json!({ + "schemas": [{ "name": schema, "tables": table_objs }] + }) +} + +/// Request body for `POST /v1/connections` when creating a managed database. +pub fn create_connection_request(name: &str, schema: &str, tables: &[String]) -> serde_json::Value { + serde_json::json!({ + "name": name, + "source_type": MANAGED_SOURCE_TYPE, + "config": build_managed_config(schema, tables), + "skip_discovery": true, + }) +} + +pub fn managed_table_load_path(connection_id: &str, schema: &str, table: &str) -> String { + format!("/connections/{connection_id}/schemas/{schema}/tables/{table}/loads") +} + +pub fn managed_table_delete_path(connection_id: &str, schema: &str, table: &str) -> String { + format!("/connections/{connection_id}/schemas/{schema}/tables/{table}") +} + +pub fn load_table_request(upload_id: &str) -> serde_json::Value { + serde_json::json!({ + "mode": "replace", + "upload_id": upload_id, + }) +} + +/// Returns true when `path` looks like a parquet file by extension. +pub fn is_parquet_path(path: &str) -> bool { + path.to_ascii_lowercase().ends_with(".parquet") + || Path::new(path).extension().and_then(|e| e.to_str()) == Some("parquet") +} + +fn table_rows_for_database(db_name: &str, tables: Vec) -> Vec { + tables + .into_iter() + .map(|t| TableRow { + full_name: format!("{}.{}.{}", db_name, t.schema, t.table), + schema: t.schema, + table: t.table, + synced: t.synced, + last_sync: t.last_sync, + }) + .collect() +} + +fn upload_parquet_file(api: &ApiClient, path: &str) -> String { + if !is_parquet_path(path) { + eprintln!( + "error: managed table loads require a parquet file (got '{}'). \ + Convert your data to parquet or use `hotdata datasets create` for CSV/JSON.", + path + ); + std::process::exit(1); + } + + let f = match std::fs::File::open(path) { + Ok(f) => f, + Err(e) => { + eprintln!("error opening file '{path}': {e}"); + std::process::exit(1); + } + }; + + let file_size = f.metadata().map(|m| m.len()).unwrap_or(0); + let pb = ProgressBar::new(file_size); + pb.set_style( + ProgressStyle::with_template( + "{spinner:.green} [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({eta})", + ) + .unwrap() + .progress_chars("=>-"), + ); + let reader = pb.wrap_read(f); + + let (status, resp_body) = api.post_body( + "/files", + "application/octet-stream", + reader, + Some(file_size), + ); + pb.finish_and_clear(); + + if !status.is_success() { + use crossterm::style::Stylize; + eprintln!("{}", crate::util::api_error(resp_body).red()); + std::process::exit(1); + } + + let body: serde_json::Value = match serde_json::from_str(&resp_body) { + Ok(v) => v, + Err(e) => { + eprintln!("error parsing upload response: {e}"); + std::process::exit(1); + } + }; + match body["id"].as_str() { + Some(id) => id.to_string(), + None => { + eprintln!("error: upload response missing id"); + std::process::exit(1); + } + } +} + +fn collect_tables(api: &ApiClient, connection_id: &str, schema: Option<&str>) -> Vec { + let mut out = Vec::new(); + let mut cursor: Option = None; + loop { + let mut params: Vec<(&str, Option)> = vec![ + ("connection_id", Some(connection_id.to_string())), + ]; + if let Some(s) = schema { + params.push(("schema", Some(s.to_string()))); + } + if let Some(ref c) = cursor { + params.push(("cursor", Some(c.clone()))); + } + let body: InfoListResponse = api.get_with_params("/information_schema", ¶ms); + out.extend(body.tables); + if !body.has_more { + break; + } + let Some(c) = body.next_cursor else { + break; + }; + cursor = Some(c); + } + out.sort_by(|a, b| { + a.schema + .cmp(&b.schema) + .then_with(|| a.table.cmp(&b.table)) + }); + out +} + +pub fn list(workspace_id: &str, format: &str) { + let api = ApiClient::new(Some(workspace_id)); + let body: ListConnectionsResponse = api.get("/connections"); + let databases: Vec<&Database> = body + .connections + .iter() + .filter(|c| is_managed(c)) + .collect(); + + match format { + "json" => println!("{}", serde_json::to_string_pretty(&databases).unwrap()), + "yaml" => print!("{}", serde_yaml::to_string(&databases).unwrap()), + "table" => { + if databases.is_empty() { + use crossterm::style::Stylize; + eprintln!("{}", "No databases found.".dark_grey()); + eprintln!( + "{}", + "Create one with: hotdata databases create --name ".dark_grey() + ); + } else { + let rows: Vec> = databases + .iter() + .map(|d| vec![d.name.clone(), d.id.clone()]) + .collect(); + crate::table::print(&["NAME", "ID"], &rows); + } + } + _ => unreachable!(), + } +} + +pub fn get(workspace_id: &str, name_or_id: &str, format: &str) { + let api = ApiClient::new(Some(workspace_id)); + let db = resolve_database(&api, name_or_id); + let detail: DatabaseDetail = api.get(&format!("/connections/{}", db.id)); + + match format { + "json" => println!("{}", serde_json::to_string_pretty(&detail).unwrap()), + "yaml" => print!("{}", serde_yaml::to_string(&detail).unwrap()), + "table" => { + use crossterm::style::Stylize; + let label = |l: &str| format!("{:<16}", l).dark_grey().to_string(); + println!("{}{}", label("name:"), detail.name.clone().white()); + println!("{}{}", label("id:"), detail.id.dark_cyan()); + println!( + "{}{} synced / {} total", + label("tables:"), + detail.synced_table_count.to_string().cyan(), + detail.table_count.to_string().cyan(), + ); + println!( + "{}{}", + label("sql_prefix:"), + format!("{}.{{schema}}.{{table}}", detail.name).green() + ); + } + _ => unreachable!(), + } +} + +pub fn create(workspace_id: &str, name: &str, schema: &str, tables: &[String], format: &str) { + use crossterm::style::Stylize; + + let body = create_connection_request(name, schema, tables); + + let api = ApiClient::new(Some(workspace_id)); + let spinner = (format == "table").then(|| crate::util::spinner("Creating database...")); + let (status, resp_body) = api.post_raw("/connections", &body); + if let Some(s) = &spinner { + s.finish_and_clear(); + } + + if !status.is_success() { + eprintln!("{}", crate::util::api_error(resp_body).red()); + std::process::exit(1); + } + + let result: CreateConnectionResponse = match serde_json::from_str(&resp_body) { + Ok(v) => v, + Err(e) => { + eprintln!("error parsing response: {e}"); + std::process::exit(1); + } + }; + + match format { + "json" => println!("{}", serde_json::to_string_pretty(&result).unwrap()), + "yaml" => print!("{}", serde_yaml::to_string(&result).unwrap()), + "table" => { + println!("{}", "Database created".green()); + println!("name: {}", result.name); + println!("id: {}", result.id); + println!( + "load: hotdata databases tables load {}
--file ./data.parquet", + result.name + ); + } + _ => unreachable!(), + } +} + +pub fn delete(workspace_id: &str, name_or_id: &str) { + use crossterm::style::Stylize; + + let api = ApiClient::new(Some(workspace_id)); + let db = resolve_database(&api, name_or_id); + let (status, resp_body) = api.delete_raw(&format!("/connections/{}", db.id)); + + if !status.is_success() { + eprintln!("{}", crate::util::api_error(resp_body).red()); + std::process::exit(1); + } + + println!( + "{}", + format!("Database '{}' deleted.", db.name).green() + ); +} + +pub fn tables_list( + workspace_id: &str, + database: &str, + schema: Option<&str>, + format: &str, +) { + let api = ApiClient::new(Some(workspace_id)); + let db = resolve_database(&api, database); + let tables = collect_tables(&api, &db.id, schema); + + let rows = table_rows_for_database(&db.name, tables); + + match format { + "json" => println!("{}", serde_json::to_string_pretty(&rows).unwrap()), + "yaml" => print!("{}", serde_yaml::to_string(&rows).unwrap()), + "table" => { + if rows.is_empty() { + use crossterm::style::Stylize; + eprintln!("{}", "No tables found.".dark_grey()); + } else { + let table_rows: Vec> = rows + .iter() + .map(|r| { + vec![ + r.full_name.clone(), + r.synced.to_string(), + r.last_sync + .as_deref() + .map(crate::util::format_date) + .unwrap_or_else(|| "-".to_string()), + ] + }) + .collect(); + crate::table::print(&["TABLE", "SYNCED", "LAST_SYNC"], &table_rows); + } + } + _ => unreachable!(), + } +} + +pub fn tables_load( + workspace_id: &str, + database: &str, + table: &str, + schema: Option<&str>, + file: Option<&str>, + upload_id: Option<&str>, +) { + use crossterm::style::Stylize; + + let api = ApiClient::new(Some(workspace_id)); + let db = resolve_database(&api, database); + let schema = schema_name(schema); + + // clap rejects `--file` and `--upload-id` together; the `(Some, Some)` arm is unreachable. + let upload_id = match (upload_id, file) { + (Some(id), None) => id.to_string(), + (None, Some(path)) => upload_parquet_file(&api, path), + (None, None) => { + eprintln!("error: --file or --upload-id is required"); + std::process::exit(1); + } + (Some(_), Some(_)) => unreachable!(), + }; + + let path = managed_table_load_path(&db.id, schema, table); + let body = load_table_request(&upload_id); + + let spinner = crate::util::spinner("Loading table..."); + let (status, resp_body) = api.post_raw(&path, &body); + spinner.finish_and_clear(); + + if !status.is_success() { + let msg = crate::util::api_error(resp_body); + if msg.contains("not declared") { + eprintln!("{}", msg.red()); + eprintln!( + "{}", + format!( + "Declare the table when creating the database, e.g.:\n \ + hotdata databases create --name {} --table {}", + db.name, table + ) + .dark_grey() + ); + } else { + eprintln!("{}", msg.red()); + } + std::process::exit(1); + } + + let result: LoadManagedTableResponse = match serde_json::from_str(&resp_body) { + Ok(v) => v, + Err(e) => { + eprintln!("error parsing response: {e}"); + std::process::exit(1); + } + }; + + let full_name = format!("{}.{}.{}", db.name, result.schema_name, result.table_name); + println!("{}", "Table loaded".green()); + println!("full_name: {}", full_name.green()); + println!("rows: {}", result.row_count); +} + +pub fn tables_delete( + workspace_id: &str, + database: &str, + table: &str, + schema: Option<&str>, +) { + use crossterm::style::Stylize; + + let api = ApiClient::new(Some(workspace_id)); + let db = resolve_database(&api, database); + let schema = schema_name(schema); + + let path = managed_table_delete_path(&db.id, schema, table); + let (status, resp_body) = api.delete_raw(&path); + + if !status.is_success() { + eprintln!("{}", crate::util::api_error(resp_body).red()); + std::process::exit(1); + } + + println!( + "{}", + format!("Table '{}.{}.{}' deleted.", db.name, schema, table).green() + ); +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn schema_name_defaults_to_public() { + assert_eq!(schema_name(None), "public"); + assert_eq!(schema_name(Some("custom")), "custom"); + } + + #[test] + fn build_managed_config_empty_without_tables() { + assert_eq!(build_managed_config("public", &[]), serde_json::json!({})); + } + + #[test] + fn build_managed_config_declares_tables() { + let cfg = build_managed_config("public", &["orders".to_string(), "customers".to_string()]); + assert_eq!( + cfg, + serde_json::json!({ + "schemas": [{ + "name": "public", + "tables": [{ "name": "orders" }, { "name": "customers" }] + }] + }) + ); + } + + #[test] + fn is_managed_only_matches_managed_type() { + let db = Database { + id: "c1".into(), + name: "sales".into(), + source_type: "managed".into(), + }; + assert!(is_managed(&db)); + let pg = Database { + id: "c2".into(), + name: "warehouse".into(), + source_type: "postgres".into(), + }; + assert!(!is_managed(&pg)); + } + + #[test] + fn resolve_database_by_name_and_id() { + let mut server = mockito::Server::new(); + let mock = server + .mock("GET", "/connections") + .with_status(200) + .with_body( + r#"{"connections":[ + {"id":"conn_abc","name":"sales","source_type":"managed"}, + {"id":"conn_xyz","name":"warehouse","source_type":"postgres"} + ]}"#, + ) + .expect(2) + .create(); + + let api = ApiClient::test_new(&server.url(), "k", Some("ws")); + let by_name = resolve_database(&api, "sales"); + assert_eq!(by_name.id, "conn_abc"); + let by_id = resolve_database(&api, "conn_abc"); + assert_eq!(by_id.name, "sales"); + mock.assert(); + } + + #[test] + fn try_resolve_database_rejects_non_managed() { + let mut server = mockito::Server::new(); + let mock = server + .mock("GET", "/connections") + .with_status(200) + .with_body( + r#"{"connections":[{"id":"c1","name":"warehouse","source_type":"postgres"}]}"#, + ) + .create(); + + let api = ApiClient::test_new(&server.url(), "k", None); + let err = try_resolve_database(&api, "warehouse").unwrap_err(); + assert!(err.contains("not a managed database")); + mock.assert(); + } + + #[test] + fn try_resolve_database_not_found() { + let mut server = mockito::Server::new(); + let mock = server + .mock("GET", "/connections") + .with_status(200) + .with_body(r#"{"connections":[]}"#) + .create(); + + let api = ApiClient::test_new(&server.url(), "k", None); + let err = try_resolve_database(&api, "missing").unwrap_err(); + assert!(err.contains("no database named")); + mock.assert(); + } + + #[test] + fn create_connection_request_includes_declared_tables() { + let body = create_connection_request( + "sales", + "public", + &["orders".to_string(), "customers".to_string()], + ); + assert_eq!(body["name"], "sales"); + assert_eq!(body["source_type"], "managed"); + assert_eq!(body["skip_discovery"], true); + assert_eq!( + body["config"]["schemas"][0]["tables"][0]["name"], + "orders" + ); + } + + #[test] + fn create_connection_request_empty_config_without_tables() { + let body = create_connection_request("sales", "public", &[]); + assert_eq!(body["config"], serde_json::json!({})); + } + + #[test] + fn managed_table_paths() { + assert_eq!( + managed_table_load_path("conn1", "public", "orders"), + "/connections/conn1/schemas/public/tables/orders/loads" + ); + assert_eq!( + managed_table_delete_path("conn1", "analytics", "events"), + "/connections/conn1/schemas/analytics/tables/events" + ); + } + + #[test] + fn load_table_request_is_replace_mode() { + let body = load_table_request("upl_abc"); + assert_eq!(body["mode"], "replace"); + assert_eq!(body["upload_id"], "upl_abc"); + } + + #[test] + fn is_parquet_path_by_extension() { + assert!(is_parquet_path("/data/orders.parquet")); + assert!(is_parquet_path("/data/ORDERS.PARQUET")); + assert!(is_parquet_path("file.parquet")); + assert!(!is_parquet_path("/data/orders.csv")); + assert!(!is_parquet_path("/data/orders")); + } + + #[test] + fn table_rows_for_database_builds_full_names() { + let rows = table_rows_for_database( + "sales", + vec![InfoTable { + connection: "sales".into(), + schema: "public".into(), + table: "orders".into(), + synced: true, + last_sync: Some("2026-05-19T00:00:00Z".into()), + }], + ); + assert_eq!(rows.len(), 1); + assert_eq!(rows[0].full_name, "sales.public.orders"); + assert!(rows[0].synced); + } + + #[test] + fn collect_tables_follows_cursor() { + let mut server = mockito::Server::new(); + let page1 = server + .mock("GET", "/information_schema") + .match_query(mockito::Matcher::AllOf(vec![ + mockito::Matcher::UrlEncoded("connection_id".into(), "conn1".into()), + mockito::Matcher::UrlEncoded("cursor".into(), "cur2".into()), + ])) + .with_status(200) + .with_body( + r#"{"tables":[{"connection":"sales","schema":"public","table":"b","synced":true,"last_sync":null}],"has_more":false,"next_cursor":null}"#, + ) + .create(); + let page0 = server + .mock("GET", "/information_schema") + .match_query(mockito::Matcher::UrlEncoded( + "connection_id".into(), + "conn1".into(), + )) + .with_status(200) + .with_body( + r#"{"tables":[{"connection":"sales","schema":"public","table":"a","synced":false,"last_sync":null}],"has_more":true,"next_cursor":"cur2"}"#, + ) + .create(); + + let api = ApiClient::test_new(&server.url(), "k", Some("ws")); + let tables = collect_tables(&api, "conn1", None); + page0.assert(); + page1.assert(); + assert_eq!(tables.len(), 2); + assert_eq!(tables[0].table, "a"); + assert_eq!(tables[1].table, "b"); + } + + #[test] + fn create_posts_managed_connection_with_schemas() { + let mut server = mockito::Server::new(); + let mock = server + .mock("POST", "/connections") + .match_header("X-Workspace-Id", "ws-test") + .with_status(201) + .with_body( + r#"{"id":"conn_new","name":"mydb","source_type":"managed","tables_discovered":1,"discovery_status":"skipped"}"#, + ) + .match_body(mockito::Matcher::JsonString( + serde_json::to_string(&create_connection_request( + "mydb", + "public", + &["gdp".to_string()], + )) + .unwrap(), + )) + .create(); + + let api = ApiClient::test_new(&server.url(), "k", Some("ws-test")); + let body = create_connection_request("mydb", "public", &["gdp".to_string()]); + let (status, resp_body) = api.post_raw("/connections", &body); + assert_eq!(status.as_u16(), 201); + let parsed: CreateConnectionResponse = serde_json::from_str(&resp_body).unwrap(); + assert_eq!(parsed.name, "mydb"); + assert_eq!(parsed.source_type, "managed"); + mock.assert(); + } + + #[test] + fn tables_load_posts_replace_with_upload_id() { + let mut server = mockito::Server::new(); + let list = server + .mock("GET", "/connections") + .with_status(200) + .with_body( + r#"{"connections":[{"id":"conn1","name":"sales","source_type":"managed"}]}"#, + ) + .create(); + let load = server + .mock("POST", "/connections/conn1/schemas/public/tables/orders/loads") + .match_body(mockito::Matcher::JsonString( + serde_json::to_string(&load_table_request("upl_123")).unwrap(), + )) + .with_status(200) + .with_body( + r#"{ + "connection_id":"conn1", + "schema_name":"public", + "table_name":"orders", + "row_count":42, + "arrow_schema_json":"{}" + }"#, + ) + .create(); + + let api = ApiClient::test_new(&server.url(), "k", Some("ws1")); + let db = resolve_database(&api, "sales"); + let path = managed_table_load_path(&db.id, "public", "orders"); + let body = load_table_request("upl_123"); + let (status, resp_body) = api.post_raw(&path, &body); + assert!(status.is_success()); + let parsed: LoadManagedTableResponse = serde_json::from_str(&resp_body).unwrap(); + assert_eq!(parsed.row_count, 42); + assert_eq!(parsed.table_name, "orders"); + list.assert(); + load.assert(); + } + + #[test] + fn tables_delete_calls_managed_table_endpoint() { + let mut server = mockito::Server::new(); + let list = server + .mock("GET", "/connections") + .with_status(200) + .with_body( + r#"{"connections":[{"id":"conn1","name":"sales","source_type":"managed"}]}"#, + ) + .create(); + let delete = server + .mock("DELETE", "/connections/conn1/schemas/public/tables/orders") + .with_status(204) + .with_body("") + .create(); + + let api = ApiClient::test_new(&server.url(), "k", None); + let db = resolve_database(&api, "sales"); + let path = managed_table_delete_path(&db.id, "public", "orders"); + let (status, _) = api.delete_raw(&path); + assert_eq!(status.as_u16(), 204); + list.assert(); + delete.assert(); + } + + #[test] + fn load_response_parses_row_count_and_names() { + let body = r#"{ + "connection_id":"conn1", + "schema_name":"analytics", + "table_name":"events", + "row_count":99, + "arrow_schema_json":"{}" + }"#; + let parsed: LoadManagedTableResponse = serde_json::from_str(body).unwrap(); + assert_eq!(parsed.schema_name, "analytics"); + assert_eq!(parsed.table_name, "events"); + assert_eq!(parsed.row_count, 99); + } +} diff --git a/src/main.rs b/src/main.rs index 3b2b72d..ca20713 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,6 +5,7 @@ mod config; mod connections; mod connections_new; mod context; +mod databases; mod datasets; mod embedding_providers; mod indexes; @@ -25,9 +26,9 @@ use anstyle::AnsiColor; use clap::{Parser, builder::Styles}; use command::{ AuthCommands, Commands, ConnectionsCommands, ConnectionsCreateCommands, ContextCommands, - DatasetsCommands, EmbeddingProvidersCommands, IndexesCommands, JobsCommands, QueriesCommands, - QueryCommands, ResultsCommands, SandboxCommands, SkillCommands, TablesCommands, - WorkspaceCommands, + DatabaseTablesCommands, DatabasesCommands, DatasetsCommands, EmbeddingProvidersCommands, + IndexesCommands, JobsCommands, QueriesCommands, QueryCommands, ResultsCommands, + SandboxCommands, SkillCommands, TablesCommands, WorkspaceCommands, }; #[derive(Parser)] @@ -358,6 +359,83 @@ fn main() { } } } + Commands::Databases { + name_or_id, + workspace_id, + output, + command, + } => { + let workspace_id = resolve_workspace(workspace_id); + if let Some(name_or_id) = name_or_id { + databases::get(&workspace_id, &name_or_id, &output); + } else { + match command { + Some(DatabasesCommands::List { output }) => { + databases::list(&workspace_id, &output) + } + Some(DatabasesCommands::Create { + name, + schema, + tables, + output, + }) => databases::create( + &workspace_id, + &name, + &schema, + &tables, + &output, + ), + Some(DatabasesCommands::Delete { name_or_id }) => { + databases::delete(&workspace_id, &name_or_id) + } + Some(DatabasesCommands::Tables { command }) => match command { + DatabaseTablesCommands::List { + database, + schema, + output, + } => databases::tables_list( + &workspace_id, + &database, + schema.as_deref(), + &output, + ), + DatabaseTablesCommands::Load { + database, + table, + schema, + file, + upload_id, + } => databases::tables_load( + &workspace_id, + &database, + &table, + Some(schema.as_str()), + file.as_deref(), + upload_id.as_deref(), + ), + DatabaseTablesCommands::Delete { + database, + table, + schema, + } => databases::tables_delete( + &workspace_id, + &database, + &table, + Some(schema.as_str()), + ), + }, + None => { + use clap::CommandFactory; + let mut cmd = Cli::command(); + cmd.build(); + cmd.find_subcommand_mut("databases") + .unwrap() + .print_help() + .unwrap(); + } + } + } + } Commands::Tables { command } => match command { TablesCommands::List { workspace_id, diff --git a/tests/databases_cli.rs b/tests/databases_cli.rs new file mode 100644 index 0000000..d9cd4ba --- /dev/null +++ b/tests/databases_cli.rs @@ -0,0 +1,85 @@ +use std::process::Command; + +fn hotdata() -> Command { + Command::new(env!("CARGO_BIN_EXE_hotdata")) +} + +#[test] +fn databases_help_lists_subcommands() { + let output = hotdata().args(["databases", "--help"]).output().unwrap(); + assert!( + output.status.success(), + "stderr: {}", + String::from_utf8_lossy(&output.stderr) + ); + let help = String::from_utf8_lossy(&output.stdout); + assert!(help.contains("list")); + assert!(help.contains("create")); + assert!(help.contains("delete")); + assert!(help.contains("tables")); +} + +#[test] +fn databases_create_help_documents_table_flag() { + let output = hotdata() + .args(["databases", "create", "--help"]) + .output() + .unwrap(); + assert!(output.status.success()); + let help = String::from_utf8_lossy(&output.stdout); + assert!(help.contains("--table")); + assert!(help.contains("--name")); +} + +#[test] +fn databases_tables_load_help_documents_file_and_upload_id() { + let output = hotdata() + .args(["databases", "tables", "load", "--help"]) + .output() + .unwrap(); + assert!(output.status.success()); + let help = String::from_utf8_lossy(&output.stdout); + assert!(help.contains("load")); + assert!(help.contains("--file")); + assert!(help.contains("--upload-id")); + assert!(help.contains("parquet")); +} + +#[test] +fn databases_create_requires_name() { + let output = hotdata().args(["databases", "create"]).output().unwrap(); + assert!(!output.status.success()); + let stderr = String::from_utf8_lossy(&output.stderr); + assert!( + stderr.contains("--name") || stderr.contains("required"), + "stderr: {stderr}" + ); +} + +#[test] +fn databases_tables_load_rejects_both_file_and_upload_id_at_parse_time() { + let output = hotdata() + .args([ + "databases", + "tables", + "load", + "mydb", + "t1", + "--file", + "a.parquet", + "--upload-id", + "upl_1", + ]) + .output() + .unwrap(); + assert!(!output.status.success()); + let combined = format!( + "{}{}", + String::from_utf8_lossy(&output.stdout), + String::from_utf8_lossy(&output.stderr) + ); + assert!( + combined.contains("cannot be used with"), + "output: {combined}" + ); +}