diff --git a/src/command.rs b/src/command.rs index a8ac3de..12e659d 100644 --- a/src/command.rs +++ b/src/command.rs @@ -8,7 +8,7 @@ pub enum Commands { command: Option, }, - /// Upload and query Parquet, CSV, and JSON files + /// Derived views — virtual SQL tables built from queries over your data Datasets { /// Dataset ID to show details id: Option, @@ -453,53 +453,37 @@ pub enum DatasetsCommands { output: String, }, - /// Create a new dataset from a file, piped stdin, upload ID, or SQL query + /// Create a derived view from a SQL query or saved query Create { - /// Dataset label (derived from filename if omitted) + /// SQL table name the dataset is addressable as (e.g. my_view) #[arg(long)] - label: Option, + name: String, - /// Table name (derived from label if omitted) + /// Human-readable display label #[arg(long)] - table_name: Option, - - /// Path to a file to upload (omit to read from stdin) - #[arg(long, conflicts_with_all = ["upload_id", "sql"])] - file: Option, - - /// Skip upload and use a pre-existing upload ID directly - #[arg(long, conflicts_with_all = ["file", "sql"])] - upload_id: Option, - - /// Source format when using --upload-id (csv, json, parquet) - #[arg(long, default_value = "csv", value_parser = ["csv", "json", "parquet"], requires = "upload_id")] - format: String, + description: Option, /// SQL query to create the dataset from - #[arg(long, conflicts_with_all = ["file", "upload_id", "query_id", "url"])] + #[arg(long, conflicts_with = "query_id", required_unless_present = "query_id")] sql: Option, /// Saved query ID to create the dataset from - #[arg(long, conflicts_with_all = ["file", "upload_id", "sql", "url"])] + #[arg(long, conflicts_with = "sql", required_unless_present = "sql")] query_id: Option, - - /// URL to import data from - #[arg(long, conflicts_with_all = ["file", "upload_id", "sql", "query_id"])] - url: Option, }, - /// Update a dataset's label and/or table name + /// Update a dataset's description and/or name Update { /// Dataset ID id: String, /// New display label #[arg(long)] - label: Option, + description: Option, /// New SQL table name (must be a valid identifier) #[arg(long)] - table_name: Option, + name: Option, /// Output format #[arg(long = "output", short = 'o', default_value = "table", value_parser = ["table", "json", "yaml"])] diff --git a/src/datasets.rs b/src/datasets.rs index bab4604..65219ee 100644 --- a/src/datasets.rs +++ b/src/datasets.rs @@ -1,8 +1,6 @@ use crate::api::ApiClient; -use indicatif::{ProgressBar, ProgressStyle}; use serde::{Deserialize, Serialize}; use serde_json::json; -use std::path::Path; #[derive(Deserialize, Serialize)] struct Dataset { @@ -72,187 +70,20 @@ struct UpdateResponse { updated_at: String, } -struct FileType { - content_type: &'static str, - format: &'static str, -} - -fn detect_from_bytes(bytes: &[u8]) -> FileType { - if bytes.starts_with(b"PAR1") { - return FileType { - content_type: "application/octet-stream", - format: "parquet", - }; - } - let first = bytes.iter().find(|&&b| !b.is_ascii_whitespace()).copied(); - if matches!(first, Some(b'{') | Some(b'[')) { - return FileType { - content_type: "application/json", - format: "json", - }; - } - FileType { - content_type: "text/csv", - format: "csv", - } -} - -fn detect_from_path(path: &str) -> Option { - match Path::new(path).extension().and_then(|e| e.to_str()) { - Some("csv") => Some(FileType { - content_type: "text/csv", - format: "csv", - }), - Some("json") => Some(FileType { - content_type: "application/json", - format: "json", - }), - Some("parquet") => Some(FileType { - content_type: "application/octet-stream", - format: "parquet", - }), - _ => None, - } -} - -/// Try to resolve the filename of the file redirected into stdin. -/// Works for `cmd < file.csv` but not for pipes (`cat file.csv | cmd`). -fn stdin_redirect_filename() -> Option { - #[cfg(target_os = "linux")] - { - std::fs::read_link("/proc/self/fd/0") - .ok() - .and_then(|p| p.file_stem().map(|s| s.to_string_lossy().into_owned())) - } - #[cfg(target_os = "macos")] - { - use nix::fcntl::{FcntlArg, fcntl}; - use std::os::unix::io::AsRawFd; - let fd = std::io::stdin().as_raw_fd(); - let mut path = std::path::PathBuf::new(); - match fcntl(fd, FcntlArg::F_GETPATH(&mut path)) { - Ok(_) => path.file_stem().map(|s| s.to_string_lossy().into_owned()), - Err(_) => None, - } - } - #[cfg(not(any(target_os = "linux", target_os = "macos")))] - { - None - } -} - -fn make_progress_bar(total: u64) -> ProgressBar { - let pb = ProgressBar::new(total); - pb.set_style( - ProgressStyle::with_template( - "{spinner:.green} [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({eta})", - ) - .unwrap() - .progress_chars("=>-"), - ); - pb -} - -fn do_upload( - api: &ApiClient, - content_type: &str, - reader: R, - pb: ProgressBar, - content_length: Option, -) -> String { - let (status, resp_body) = api.post_body("/files", content_type, reader, content_length); - - pb.finish_and_clear(); - - if !status.is_success() { - use crossterm::style::Stylize; - eprintln!("{}", crate::util::api_error(resp_body).red()); - std::process::exit(1); - } - - let body: serde_json::Value = match serde_json::from_str(&resp_body) { - Ok(v) => v, - Err(e) => { - eprintln!("error parsing upload response: {e}"); - std::process::exit(1); - } - }; - - match body["id"].as_str() { - Some(id) => id.to_string(), - None => { - eprintln!("error: upload response missing id"); - std::process::exit(1); - } - } -} - -// Returns (upload_id, format) -fn upload_from_file(api: &ApiClient, path: &str) -> (String, &'static str) { - let mut f = match std::fs::File::open(path) { - Ok(f) => f, - Err(e) => { - eprintln!("error opening file '{path}': {e}"); - std::process::exit(1); - } - }; - - let ft = detect_from_path(path).unwrap_or_else(|| { - use std::io::{Read, Seek}; - let mut probe = [0u8; 512]; - let n = f.read(&mut probe).unwrap_or(0); - let _ = f.seek(std::io::SeekFrom::Start(0)); - detect_from_bytes(&probe[..n]) - }); - - let file_size = f.metadata().map(|m| m.len()).unwrap_or(0); - let pb = make_progress_bar(file_size); - let reader = pb.wrap_read(f); - - let id = do_upload(api, ft.content_type, reader, pb, Some(file_size)); - (id, ft.format) -} - -// Returns (upload_id, format) -fn upload_from_stdin(api: &ApiClient) -> (String, &'static str) { - use std::io::Read; - let mut probe = [0u8; 512]; - let n = std::io::stdin().read(&mut probe).unwrap_or(0); - let ft = detect_from_bytes(&probe[..n]); - - let reader = std::io::Cursor::new(probe[..n].to_vec()).chain(std::io::stdin()); - - let pb = ProgressBar::new_spinner(); - pb.set_style( - ProgressStyle::with_template("{spinner:.green} {bytes} uploaded ({elapsed})").unwrap(), - ); - pb.enable_steady_tick(std::time::Duration::from_millis(80)); - let reader = pb.wrap_read(reader); - - let id = do_upload(api, ft.content_type, reader, pb, None); - (id, ft.format) -} - fn create_dataset( api: &ApiClient, - label: &str, - table_name: Option<&str>, + description: Option<&str>, + name: &str, source: serde_json::Value, - on_failure: Option>, ) { - let mut body = json!({ "label": label, "source": source }); - if let Some(tn) = table_name { - body["table_name"] = json!(tn); - } + let label = description.unwrap_or(name); + let body = json!({ "table_name": name, "label": label, "source": source }); let (status, resp_body) = api.post_raw("/datasets", &body); if !status.is_success() { use crossterm::style::Stylize; eprintln!("{}", crate::util::api_error(resp_body).red()); - if let Some(f) = on_failure { - f(); - } std::process::exit(1); } @@ -274,144 +105,19 @@ fn create_dataset( ); } -pub fn create_from_upload( - workspace_id: &str, - label: Option<&str>, - table_name: Option<&str>, - file: Option<&str>, - upload_id: Option<&str>, - source_format: &str, -) { +pub fn create_from_query(workspace_id: &str, sql: &str, description: Option<&str>, name: &str) { let api = ApiClient::new(Some(workspace_id)); - - let label_derived; - let label: &str = match label { - Some(l) => l, - None => match file { - Some(path) => { - label_derived = Path::new(path) - .file_stem() - .and_then(|s| s.to_str()) - .unwrap_or("dataset") - .to_string(); - &label_derived - } - None => { - if upload_id.is_some() { - eprintln!("error: no label provided. Use --label to name the dataset."); - std::process::exit(1); - } - match stdin_redirect_filename() { - Some(name) => { - label_derived = name; - &label_derived - } - None => { - eprintln!("error: no label provided. Use --label to name the dataset."); - std::process::exit(1); - } - } - } - }, - }; - - let (upload_id, format, upload_id_was_uploaded): (String, &str, bool) = if let Some(id) = - upload_id - { - (id.to_string(), source_format, false) - } else { - let (id, fmt) = match file { - Some(path) => upload_from_file(&api, path), - None => { - use std::io::IsTerminal; - if std::io::stdin().is_terminal() { - eprintln!( - "error: no input data. Use --file , --upload-id , or pipe data via stdin." - ); - std::process::exit(1); - } - upload_from_stdin(&api) - } - }; - (id, fmt, true) - }; - - let source = json!({ "upload_id": upload_id, "format": format }); - - let on_failure: Option> = if upload_id_was_uploaded { - let uid = upload_id.clone(); - Some(Box::new(move || { - use crossterm::style::Stylize; - eprintln!( - "{}", - format!( - "Resume dataset creation without re-uploading by passing --upload-id {uid}" - ) - .yellow() - ); - })) - } else { - None - }; - - create_dataset(&api, label, table_name, source, on_failure); -} - -pub fn create_from_url( - workspace_id: &str, - url: &str, - label: Option<&str>, - table_name: Option<&str>, -) { - let label = match label { - Some(l) => l, - None => { - eprintln!("error: --label is required when using --url"); - std::process::exit(1); - } - }; - let api = ApiClient::new(Some(workspace_id)); - create_dataset(&api, label, table_name, json!({ "url": url }), None); -} - -pub fn create_from_query( - workspace_id: &str, - sql: &str, - label: Option<&str>, - table_name: Option<&str>, -) { - let label = match label { - Some(l) => l, - None => { - eprintln!("error: --label is required when using --sql"); - std::process::exit(1); - } - }; - let api = ApiClient::new(Some(workspace_id)); - create_dataset(&api, label, table_name, json!({ "sql": sql }), None); + create_dataset(&api, description, name, json!({ "sql": sql })); } pub fn create_from_saved_query( workspace_id: &str, query_id: &str, - label: Option<&str>, - table_name: Option<&str>, + description: Option<&str>, + name: &str, ) { - let label = match label { - Some(l) => l, - None => { - eprintln!("error: --label is required when using --query-id"); - std::process::exit(1); - } - }; let api = ApiClient::new(Some(workspace_id)); - create_dataset( - &api, - label, - table_name, - json!({ "saved_query_id": query_id }), - None, - ); + create_dataset(&api, description, name, json!({ "saved_query_id": query_id })); } pub fn list(workspace_id: &str, limit: Option, offset: Option, format: &str) { @@ -502,23 +208,23 @@ pub fn get(dataset_id: &str, workspace_id: &str, format: &str) { pub fn update( dataset_id: &str, workspace_id: &str, - label: Option<&str>, - table_name: Option<&str>, + description: Option<&str>, + name: Option<&str>, format: &str, ) { - if label.is_none() && table_name.is_none() { - eprintln!("error: provide at least one of --label or --table-name."); + if description.is_none() && name.is_none() { + eprintln!("error: provide at least one of --description or --name."); std::process::exit(1); } let api = ApiClient::new(Some(workspace_id)); let mut body = json!({}); - if let Some(l) = label { - body["label"] = json!(l); + if let Some(d) = description { + body["label"] = json!(d); } - if let Some(tn) = table_name { - body["table_name"] = json!(tn); + if let Some(n) = name { + body["table_name"] = json!(n); } let d: UpdateResponse = api.put(&format!("/datasets/{dataset_id}"), &body); diff --git a/src/main.rs b/src/main.rs index 4fb4a38..8957ae7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -208,57 +208,37 @@ fn main() { output, }) => datasets::list(&workspace_id, limit, offset, &output), Some(DatasetsCommands::Create { - label, - table_name, - file, - upload_id, - format, + name, + description, sql, query_id, - url, }) => { if let Some(sql) = sql { datasets::create_from_query( &workspace_id, &sql, - label.as_deref(), - table_name.as_deref(), - ) - } else if let Some(query_id) = query_id { - datasets::create_from_saved_query( - &workspace_id, - &query_id, - label.as_deref(), - table_name.as_deref(), - ) - } else if let Some(url) = url { - datasets::create_from_url( - &workspace_id, - &url, - label.as_deref(), - table_name.as_deref(), + description.as_deref(), + &name, ) } else { - datasets::create_from_upload( + datasets::create_from_saved_query( &workspace_id, - label.as_deref(), - table_name.as_deref(), - file.as_deref(), - upload_id.as_deref(), - &format, + query_id.as_deref().unwrap_or_else(|| unreachable!("clap enforces --sql or --query-id")), + description.as_deref(), + &name, ) } } Some(DatasetsCommands::Update { id, - label, - table_name, + description, + name, output, }) => datasets::update( &id, &workspace_id, - label.as_deref(), - table_name.as_deref(), + description.as_deref(), + name.as_deref(), &output, ), Some(DatasetsCommands::Refresh { id, r#async }) => {