diff --git a/Cargo.lock b/Cargo.lock index fdd7420a93f..515e1ecd9f9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9073,8 +9073,11 @@ name = "vortex-tui" version = "0.1.0" dependencies = [ "anyhow", + "arrow-array", + "arrow-schema", "clap", "crossterm 0.29.0", + "datafusion", "env_logger", "flatbuffers", "futures", @@ -9084,9 +9087,12 @@ dependencies = [ "itertools 0.14.0", "parquet", "ratatui", + "serde", + "serde_json", "taffy", "tokio", "vortex", + "vortex-datafusion", ] [[package]] diff --git a/vortex-tui/Cargo.toml b/vortex-tui/Cargo.toml index 01b3553acf2..0916e18f0e7 100644 --- a/vortex-tui/Cargo.toml +++ b/vortex-tui/Cargo.toml @@ -15,8 +15,11 @@ version = { workspace = true } [dependencies] anyhow = { workspace = true } +arrow-array = { workspace = true } +arrow-schema = { workspace = true } clap = { workspace = true, features = ["derive"] } crossterm = { workspace = true } +datafusion = { workspace = true } env_logger = { version = "0.11" } flatbuffers = { workspace = true } futures = { workspace = true, features = ["executor"] } @@ -26,9 +29,12 @@ indicatif = { workspace = true, features = ["futures"] } itertools = { workspace = true } parquet = { workspace = true, features = ["arrow", "async"] } ratatui = { workspace = true } +serde = { workspace = true, features = ["derive"] } +serde_json = { workspace = true } taffy = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread"] } vortex = { workspace = true, features = ["tokio"] } +vortex-datafusion = { workspace = true } [lints] workspace = true diff --git a/vortex-tui/src/browse/app.rs b/vortex-tui/src/browse/app.rs index 38b8ab64da7..a00ae258efb 100644 --- a/vortex-tui/src/browse/app.rs +++ b/vortex-tui/src/browse/app.rs @@ -12,6 +12,7 @@ use vortex::dtype::DType; use vortex::error::VortexExpect; use vortex::error::VortexResult; use vortex::error::VortexUnwrap; +use vortex::error::vortex_err; use vortex::file::Footer; use vortex::file::OpenOptionsSessionExt; use 
vortex::file::SegmentSpec; @@ -24,6 +25,7 @@ use vortex::layout::segments::SegmentId; use vortex::layout::segments::SegmentSource; use crate::SESSION; +use crate::browse::ui::QueryState; use crate::browse::ui::SegmentGridState; #[derive(Default, Copy, Clone, Eq, PartialEq)] @@ -34,8 +36,9 @@ pub enum Tab { /// Show a segment map of the file Segments, - // TODO(aduffy): SQL query page powered by DF - // Query, + + /// SQL query interface powered by DataFusion + Query, } /// A pointer into the `Layout` hierarchy that can be advanced. @@ -199,6 +202,12 @@ pub struct AppState<'a> { /// Scroll offset for the encoding tree display in FlatLayout view pub tree_scroll_offset: u16, + + /// State for the Query tab + pub query_state: QueryState, + + /// File path for use in query execution + pub file_path: String, } impl AppState<'_> { @@ -210,6 +219,11 @@ impl AppState<'_> { /// Create an app backed from a file path. pub async fn create_file_app<'a>(path: impl AsRef) -> VortexResult> { + let file_path = path + .as_ref() + .to_str() + .ok_or_else(|| vortex_err!("Path is not valid UTF-8"))? 
+ .to_string(); let vxf = SESSION.open_options().open(path.as_ref()).await?; let cursor = LayoutCursor::new(vxf.footer().clone(), vxf.segment_source()); @@ -225,5 +239,7 @@ pub async fn create_file_app<'a>(path: impl AsRef) -> VortexResult HandleResult { if let Event::Key(key) = event && key.kind == KeyEventKind::Press { + // Check if we're in Query tab with SQL input focus - handle text input first + let in_sql_input = + app.current_tab == Tab::Query && app.query_state.focus == QueryFocus::SqlInput; + + // Handle SQL input mode - most keys should type into the input + if in_sql_input { + match (key.code, key.modifiers) { + // These keys exit/switch even in SQL input mode + (KeyCode::Tab, _) => { + app.current_tab = Tab::Layout; + } + (KeyCode::Esc, _) => { + app.query_state.toggle_focus(); + } + (KeyCode::Enter, _) => { + // Execute the SQL query with COUNT(*) for pagination + app.query_state.sort_column = None; + app.query_state.sort_direction = SortDirection::None; + let file_path = app.file_path.clone(); + app.query_state.execute_initial_query(&file_path); + // Switch focus to results table after executing + app.query_state.focus = QueryFocus::ResultsTable; + } + // Navigation keys + (KeyCode::Left, _) => app.query_state.move_cursor_left(), + (KeyCode::Right, _) => app.query_state.move_cursor_right(), + (KeyCode::Home, _) => app.query_state.move_cursor_start(), + (KeyCode::End, _) => app.query_state.move_cursor_end(), + // Control key shortcuts + (KeyCode::Char('a'), KeyModifiers::CONTROL) => app.query_state.move_cursor_start(), + (KeyCode::Char('e'), KeyModifiers::CONTROL) => app.query_state.move_cursor_end(), + (KeyCode::Char('u'), KeyModifiers::CONTROL) => app.query_state.clear_input(), + (KeyCode::Char('b'), KeyModifiers::CONTROL) => app.query_state.move_cursor_left(), + (KeyCode::Char('f'), KeyModifiers::CONTROL) => app.query_state.move_cursor_right(), + (KeyCode::Char('d'), KeyModifiers::CONTROL) => { + app.query_state.delete_char_forward() + } + // 
Delete keys + (KeyCode::Backspace, _) => app.query_state.delete_char(), + (KeyCode::Delete, _) => app.query_state.delete_char_forward(), + // All other characters get typed into the input + (KeyCode::Char(c), KeyModifiers::NONE | KeyModifiers::SHIFT) => { + app.query_state.insert_char(c); + } + _ => {} + } + return HandleResult::Continue; + } + + // Normal mode handling for all other cases match (key.code, key.modifiers) { (KeyCode::Char('q'), _) => { // Close the process down. @@ -60,9 +114,25 @@ fn handle_normal_mode(app: &mut AppState, event: Event) -> HandleResult { // toggle between tabs app.current_tab = match app.current_tab { Tab::Layout => Tab::Segments, - Tab::Segments => Tab::Layout, + Tab::Segments => Tab::Query, + Tab::Query => Tab::Layout, }; } + + // Query tab: Ctrl+h for previous page + (KeyCode::Char('h'), KeyModifiers::CONTROL) => { + if app.current_tab == Tab::Query { + app.query_state.prev_page(&app.file_path.clone()); + } + } + + // Query tab: Ctrl+l for next page + (KeyCode::Char('l'), KeyModifiers::CONTROL) => { + if app.current_tab == Tab::Query { + app.query_state.next_page(&app.file_path.clone()); + } + } + (KeyCode::Up | KeyCode::Char('k'), _) | (KeyCode::Char('p'), KeyModifiers::CONTROL) => { // We send the key-up to the list state if we're looking at // the Layouts tab. 
@@ -75,6 +145,9 @@ fn handle_normal_mode(app: &mut AppState, event: Event) -> HandleResult { } } Tab::Segments => app.segment_grid_state.scroll_up(10), + Tab::Query => { + app.query_state.table_state.select_previous(); + } } } (KeyCode::Down | KeyCode::Char('j'), _) @@ -87,6 +160,9 @@ fn handle_normal_mode(app: &mut AppState, event: Event) -> HandleResult { } } Tab::Segments => app.segment_grid_state.scroll_down(10), + Tab::Query => { + app.query_state.table_state.select_next(); + } }, (KeyCode::PageUp, _) | (KeyCode::Char('v'), KeyModifiers::ALT) => { match app.current_tab { @@ -98,6 +174,9 @@ fn handle_normal_mode(app: &mut AppState, event: Event) -> HandleResult { } } Tab::Segments => app.segment_grid_state.scroll_up(100), + Tab::Query => { + app.query_state.prev_page(&app.file_path.clone()); + } } } (KeyCode::PageDown, _) | (KeyCode::Char('v'), KeyModifiers::CONTROL) => { @@ -110,25 +189,39 @@ fn handle_normal_mode(app: &mut AppState, event: Event) -> HandleResult { } } Tab::Segments => app.segment_grid_state.scroll_down(100), + Tab::Query => { + app.query_state.next_page(&app.file_path.clone()); + } } } (KeyCode::Home, _) | (KeyCode::Char('<'), KeyModifiers::ALT) => match app.current_tab { Tab::Layout => app.layouts_list_state.select_first(), Tab::Segments => app.segment_grid_state.scroll_left(200), + Tab::Query => { + app.query_state.table_state.select_first(); + } }, (KeyCode::End, _) | (KeyCode::Char('>'), KeyModifiers::ALT) => match app.current_tab { Tab::Layout => app.layouts_list_state.select_last(), Tab::Segments => app.segment_grid_state.scroll_right(200), + Tab::Query => { + app.query_state.table_state.select_last(); + } }, (KeyCode::Enter, _) => { - if app.current_tab == Tab::Layout && app.cursor.layout().nchildren() > 0 { - // Descend into the layout subtree for the selected child. 
- let selected = app.layouts_list_state.selected().unwrap_or_default(); - app.cursor = app.cursor.child(selected); + match app.current_tab { + Tab::Layout => { + if app.cursor.layout().nchildren() > 0 { + // Descend into the layout subtree for the selected child. + let selected = app.layouts_list_state.selected().unwrap_or_default(); + app.cursor = app.cursor.child(selected); - // Reset the list scroll state and tree scroll offset. - app.layouts_list_state = ListState::default().with_selected(Some(0)); - app.tree_scroll_offset = 0; + // Reset the list scroll state and tree scroll offset. + app.layouts_list_state = ListState::default().with_selected(Some(0)); + app.tree_scroll_offset = 0; + } + } + Tab::Query | Tab::Segments => {} } } (KeyCode::Left | KeyCode::Char('h'), _) @@ -142,17 +235,44 @@ fn handle_normal_mode(app: &mut AppState, event: Event) -> HandleResult { app.tree_scroll_offset = 0; } Tab::Segments => app.segment_grid_state.scroll_left(20), + Tab::Query => { + app.query_state.horizontal_scroll = + app.query_state.horizontal_scroll.saturating_sub(1); + } } } (KeyCode::Right | KeyCode::Char('l'), _) | (KeyCode::Char('b'), KeyModifiers::ALT) => { match app.current_tab { Tab::Layout => {} Tab::Segments => app.segment_grid_state.scroll_right(20), + Tab::Query => { + let max_col = app.query_state.column_count().saturating_sub(1); + if app.query_state.horizontal_scroll < max_col { + app.query_state.horizontal_scroll += 1; + } + } } } (KeyCode::Char('/'), _) | (KeyCode::Char('s'), KeyModifiers::CONTROL) => { - app.key_mode = KeyMode::Search; + if app.current_tab != Tab::Query { + app.key_mode = KeyMode::Search; + } + } + + (KeyCode::Char('s'), KeyModifiers::NONE) => { + if app.current_tab == Tab::Query { + // Sort by selected column - modifies the SQL query + let col = app.query_state.selected_column(); + app.query_state.apply_sort(col, &app.file_path); + } + } + + (KeyCode::Esc, _) => { + if app.current_tab == Tab::Query { + // Toggle focus in Query tab + 
app.query_state.toggle_focus(); + } } // Most events not handled diff --git a/vortex-tui/src/browse/ui/mod.rs b/vortex-tui/src/browse/ui/mod.rs index cbe30878ec6..90cf8bf5deb 100644 --- a/vortex-tui/src/browse/ui/mod.rs +++ b/vortex-tui/src/browse/ui/mod.rs @@ -2,9 +2,14 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors mod layouts; +mod query; mod segments; use layouts::render_layouts; +pub use query::QueryFocus; +pub use query::QueryState; +pub use query::SortDirection; +use query::render_query; use ratatui::prelude::*; use ratatui::widgets::Block; use ratatui::widgets::BorderType; @@ -57,17 +62,13 @@ pub fn render_app(app: &mut AppState<'_>, frame: &mut Frame<'_>) { let selected_tab = match app.current_tab { Tab::Layout => 0, Tab::Segments => 1, + Tab::Query => 2, }; - let tabs = Tabs::new([ - "File Layout", - "Segments", - // TODO(aduffy): add SQL query interface - // "Query", - ]) - .style(Style::default().bold().white()) - .highlight_style(Style::default().bold().black().on_white()) - .select(Some(selected_tab)); + let tabs = Tabs::new(["File Layout", "Segments", "Query"]) + .style(Style::default().bold().white()) + .highlight_style(Style::default().bold().black().on_white()) + .select(Some(selected_tab)); frame.render_widget(tabs, tab_view); @@ -77,5 +78,6 @@ pub fn render_app(app: &mut AppState<'_>, frame: &mut Frame<'_>) { render_layouts(app, app_view, frame.buffer_mut()); } Tab::Segments => segments_ui(app, app_view, frame.buffer_mut()), + Tab::Query => render_query(app, app_view, frame.buffer_mut()), } } diff --git a/vortex-tui/src/browse/ui/query.rs b/vortex-tui/src/browse/ui/query.rs new file mode 100644 index 00000000000..62ff89b5954 --- /dev/null +++ b/vortex-tui/src/browse/ui/query.rs @@ -0,0 +1,644 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use arrow_array::RecordBatch; +use ratatui::buffer::Buffer; +use ratatui::layout::Constraint; +use ratatui::layout::Layout; +use 
ratatui::layout::Rect; +use ratatui::style::Color; +use ratatui::style::Style; +use ratatui::style::Stylize; +use ratatui::text::Line; +use ratatui::text::Span; +use ratatui::widgets::Block; +use ratatui::widgets::BorderType; +use ratatui::widgets::Borders; +use ratatui::widgets::Cell; +use ratatui::widgets::Paragraph; +use ratatui::widgets::Row; +use ratatui::widgets::Scrollbar; +use ratatui::widgets::ScrollbarOrientation; +use ratatui::widgets::ScrollbarState; +use ratatui::widgets::StatefulWidget; +use ratatui::widgets::Table; +use ratatui::widgets::TableState; +use ratatui::widgets::Widget; +use tokio::runtime::Handle; +use tokio::task::block_in_place; + +use crate::browse::app::AppState; +use crate::datafusion_helper::arrow_value_to_json; +use crate::datafusion_helper::execute_query; +use crate::datafusion_helper::json_value_to_display; + +/// Sort direction for table columns. +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] +pub enum SortDirection { + #[default] + None, + Ascending, + Descending, +} + +impl SortDirection { + pub fn cycle(self) -> Self { + match self { + SortDirection::None => SortDirection::Ascending, + SortDirection::Ascending => SortDirection::Descending, + SortDirection::Descending => SortDirection::None, + } + } + + pub fn indicator(self) -> &'static str { + match self { + SortDirection::None => "", + SortDirection::Ascending => " ▲", + SortDirection::Descending => " ▼", + } + } +} + +/// Focus state within the Query tab. +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] +pub enum QueryFocus { + #[default] + SqlInput, + ResultsTable, +} + +/// State for the SQL query interface. +pub struct QueryState { + /// The SQL query input text. + pub sql_input: String, + /// Cursor position in the SQL input. + pub cursor_position: usize, + /// Current focus within the Query tab. + pub focus: QueryFocus, + /// Query results as RecordBatches. + pub results: Option, + /// Error message if query failed. 
+ pub error: Option, + /// Whether a query is currently running. + pub running: bool, + /// Table state for the results view. + pub table_state: TableState, + /// Horizontal scroll offset for the results table. + pub horizontal_scroll: usize, + /// Column being sorted (if any). + pub sort_column: Option, + /// Sort direction. + pub sort_direction: SortDirection, + /// Current page (0-indexed). + pub current_page: usize, + /// Rows per page (parsed from LIMIT clause). + pub page_size: usize, + /// Total row count from COUNT(*) query. + pub total_row_count: Option, + /// Base SQL query (without LIMIT/OFFSET) for pagination. + pub base_query: String, + /// ORDER BY clause if any. + pub order_clause: Option, +} + +impl Default for QueryState { + fn default() -> Self { + let default_sql = "SELECT * FROM data LIMIT 20"; + Self { + sql_input: default_sql.to_string(), + cursor_position: default_sql.len(), + focus: QueryFocus::default(), + results: None, + error: None, + running: false, + table_state: TableState::default(), + horizontal_scroll: 0, + sort_column: None, + sort_direction: SortDirection::default(), + current_page: 0, + page_size: 20, + total_row_count: None, + base_query: "SELECT * FROM data".to_string(), + order_clause: None, + } + } +} + +impl QueryState { + /// Insert a character at the cursor position. + pub fn insert_char(&mut self, c: char) { + self.sql_input.insert(self.cursor_position, c); + self.cursor_position += 1; + } + + /// Delete the character before the cursor. + pub fn delete_char(&mut self) { + if self.cursor_position > 0 { + self.cursor_position -= 1; + self.sql_input.remove(self.cursor_position); + } + } + + /// Delete the character at the cursor. + pub fn delete_char_forward(&mut self) { + if self.cursor_position < self.sql_input.len() { + self.sql_input.remove(self.cursor_position); + } + } + + /// Move cursor left. 
+ pub fn move_cursor_left(&mut self) { + self.cursor_position = self.cursor_position.saturating_sub(1); + } + + /// Move cursor right. + pub fn move_cursor_right(&mut self) { + if self.cursor_position < self.sql_input.len() { + self.cursor_position += 1; + } + } + + /// Move cursor to start. + pub fn move_cursor_start(&mut self) { + self.cursor_position = 0; + } + + /// Move cursor to end. + pub fn move_cursor_end(&mut self) { + self.cursor_position = self.sql_input.len(); + } + + /// Clear the SQL input. + pub fn clear_input(&mut self) { + self.sql_input.clear(); + self.cursor_position = 0; + } + + /// Toggle focus between SQL input and results table. + pub fn toggle_focus(&mut self) { + self.focus = match self.focus { + QueryFocus::SqlInput => QueryFocus::ResultsTable, + QueryFocus::ResultsTable => QueryFocus::SqlInput, + }; + } + + /// Execute initial query - parses SQL, gets total count, fetches first page. + pub fn execute_initial_query(&mut self, file_path: &str) { + self.running = true; + self.error = None; + + // Parse the SQL to extract base query, order clause, and page size + let (base_sql, order_clause, limit) = self.parse_sql_parts(); + self.base_query = base_sql; + self.order_clause = order_clause; + self.page_size = limit.unwrap_or(20); + self.current_page = 0; + + // Get total row count + self.total_row_count = get_row_count(file_path, &self.base_query).ok(); + + // Build and execute the query + self.rebuild_and_execute(file_path); + } + + /// Navigate to next page. + pub fn next_page(&mut self, file_path: &str) { + let total_pages = self.total_pages(); + if self.current_page + 1 < total_pages { + self.current_page += 1; + self.rebuild_and_execute(file_path); + } + } + + /// Navigate to previous page. + pub fn prev_page(&mut self, file_path: &str) { + if self.current_page > 0 { + self.current_page -= 1; + self.rebuild_and_execute(file_path); + } + } + + /// Get total number of pages. 
+ pub fn total_pages(&self) -> usize { + match self.total_row_count { + Some(total) if total > 0 => total.div_ceil(self.page_size), + _ => 1, + } + } + + /// Build SQL query from current state and execute it. + fn rebuild_and_execute(&mut self, file_path: &str) { + let offset = self.current_page * self.page_size; + + let new_sql = match &self.order_clause { + Some(order) => { + format!( + "{} {} LIMIT {} OFFSET {}", + self.base_query, order, self.page_size, offset + ) + } + None => { + format!( + "{} LIMIT {} OFFSET {}", + self.base_query, self.page_size, offset + ) + } + }; + + self.sql_input = new_sql; + self.cursor_position = self.sql_input.len(); + + self.running = true; + self.error = None; + + match exec_query(file_path, &self.sql_input) { + Ok(results) => { + self.results = Some(results); + self.table_state.select(Some(0)); + } + Err(e) => { + self.error = Some(e); + } + } + self.running = false; + } + + /// Parse SQL to extract base query, ORDER BY clause, and LIMIT value. + fn parse_sql_parts(&self) -> (String, Option, Option) { + let sql = &self.sql_input; + let sql_upper = sql.to_uppercase(); + + // Find positions of clauses + let order_idx = sql_upper.find(" ORDER BY "); + let limit_idx = sql_upper.find(" LIMIT "); + let offset_idx = sql_upper.find(" OFFSET "); + + // Extract limit value if present + let limit_value = if let Some(li) = limit_idx { + let after_limit = &sql[li + 7..]; // Skip " LIMIT " + let end_idx = after_limit + .find(|c: char| !c.is_ascii_digit() && c != ' ') + .unwrap_or(after_limit.len()); + after_limit[..end_idx].trim().parse::().ok() + } else { + None + }; + + // Find the earliest of LIMIT or OFFSET to know where to cut + let cut_idx = match (limit_idx, offset_idx) { + (Some(li), Some(oi)) => Some(li.min(oi)), + (Some(li), None) => Some(li), + (None, Some(oi)) => Some(oi), + (None, None) => None, + }; + + match (order_idx, cut_idx) { + (Some(oi), Some(ci)) if oi < ci => { + // ORDER BY comes before LIMIT/OFFSET + let base = 
sql[..oi].trim().to_string(); + let order = sql[oi..ci].trim().to_string(); + (base, Some(order), limit_value) + } + (Some(oi), None) => { + // Only ORDER BY, no LIMIT/OFFSET + let base = sql[..oi].trim().to_string(); + let order = sql[oi..].trim().to_string(); + (base, Some(order), limit_value) + } + (None, Some(ci)) => { + // No ORDER BY, just LIMIT/OFFSET + let base = sql[..ci].trim().to_string(); + (base, None, limit_value) + } + (Some(_oi), Some(ci)) => { + // ORDER BY comes after LIMIT (unusual) - just cut at LIMIT + let base = sql[..ci].trim().to_string(); + (base, None, limit_value) + } + (None, None) => { + // No ORDER BY or LIMIT/OFFSET + (sql.clone(), None, limit_value) + } + } + } + + /// Get the currently selected column index. + pub fn selected_column(&self) -> usize { + self.horizontal_scroll + } + + /// Total number of columns in results. + pub fn column_count(&self) -> usize { + self.results + .as_ref() + .and_then(|r| r.batches.first()) + .map(|b| b.num_columns()) + .unwrap_or(0) + } + + /// Apply sort on a column by modifying the ORDER BY clause and re-executing. 
+ pub fn apply_sort(&mut self, column: usize, file_path: &str) { + // Get the column name from results + let column_name = match &self.results { + Some(results) if column < results.column_names.len() => { + results.column_names[column].clone() + } + _ => return, + }; + + // Cycle sort direction + if self.sort_column == Some(column) { + self.sort_direction = self.sort_direction.cycle(); + if self.sort_direction == SortDirection::None { + self.sort_column = None; + } + } else { + self.sort_column = Some(column); + self.sort_direction = SortDirection::Ascending; + } + + // Update the ORDER BY clause + self.order_clause = if self.sort_direction == SortDirection::None { + None + } else { + let direction = match self.sort_direction { + SortDirection::Ascending => "ASC", + SortDirection::Descending => "DESC", + SortDirection::None => unreachable!(), + }; + Some(format!("ORDER BY \"{column_name}\" {direction}")) + }; + + // Reset to first page and re-execute + self.current_page = 0; + self.rebuild_and_execute(file_path); + } +} + +/// Holds query results for display. +pub struct QueryResults { + pub batches: Vec, + pub total_rows: usize, + pub column_names: Vec, +} + +/// Execute a SQL query against the Vortex file. +fn exec_query(file_path: &str, sql: &str) -> Result { + block_in_place(|| { + Handle::current().block_on(async { + let batches = execute_query(file_path, sql).await?; + + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + + let column_names = if let Some(batch) = batches.first() { + let schema = batch.schema(); + schema.fields().iter().map(|f| f.name().clone()).collect() + } else { + vec![] + }; + + Ok(QueryResults { + batches, + total_rows, + column_names, + }) + }) + }) +} + +/// Get total row count for a base query using COUNT(*). 
+pub fn get_row_count(file_path: &str, base_query: &str) -> Result { + block_in_place(|| { + Handle::current().block_on(async { + let count_sql = format!("SELECT COUNT(*) as count FROM ({base_query}) AS subquery"); + + let batches = execute_query(file_path, &count_sql).await?; + + // Extract count from result + if let Some(batch) = batches.first() + && batch.num_rows() > 0 + && batch.num_columns() > 0 + { + use arrow_array::Int64Array; + if let Some(arr) = batch.column(0).as_any().downcast_ref::() { + #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)] + return Ok(arr.value(0) as usize); + } + } + + Ok(0) + }) + }) +} + +/// Render the Query tab UI. +pub fn render_query(app: &mut AppState<'_>, area: Rect, buf: &mut Buffer) { + let [input_area, results_area] = + Layout::vertical([Constraint::Length(5), Constraint::Min(10)]).areas(area); + + render_sql_input(app, input_area, buf); + render_results_table(app, results_area, buf); +} + +fn render_sql_input(app: &mut AppState<'_>, area: Rect, buf: &mut Buffer) { + let is_focused = app.query_state.focus == QueryFocus::SqlInput; + + let border_color = if is_focused { + Color::Cyan + } else { + Color::DarkGray + }; + + let block = Block::default() + .title("SQL Query (Enter to execute, Esc to switch focus)") + .borders(Borders::ALL) + .border_type(BorderType::Rounded) + .border_style(Style::default().fg(border_color)); + + let inner = block.inner(area); + block.render(area, buf); + + // Create the input text with cursor + let sql = &app.query_state.sql_input; + let cursor_pos = app.query_state.cursor_position; + + let (before_cursor, after_cursor) = sql.split_at(cursor_pos.min(sql.len())); + + let first_char = after_cursor.chars().next(); + let cursor_char = if is_focused { + match first_char { + None => Span::styled(" ", Style::default().bg(Color::White).fg(Color::Black)), + Some(c) => Span::styled( + c.to_string(), + Style::default().bg(Color::White).fg(Color::Black), + ), + } + } else { + match first_char 
{ + None => Span::raw(""), + Some(c) => Span::raw(c.to_string()), + } + }; + + let rest = match first_char { + Some(c) if after_cursor.len() > c.len_utf8() => &after_cursor[c.len_utf8()..], + _ => "", + }; + + let line = Line::from(vec![Span::raw(before_cursor), cursor_char, Span::raw(rest)]); + + let paragraph = Paragraph::new(line).style(Style::default().fg(Color::White)); + + paragraph.render(inner, buf); +} + +fn render_results_table(app: &mut AppState<'_>, area: Rect, buf: &mut Buffer) { + let is_focused = app.query_state.focus == QueryFocus::ResultsTable; + + let border_color = if is_focused { + Color::Cyan + } else { + Color::DarkGray + }; + + // Show status in title + let title = if app.query_state.running { + "Results (running...)".to_string() + } else if let Some(ref error) = app.query_state.error { + format!("Results (error: {})", truncate_str(error, 50)) + } else if let Some(ref _results) = app.query_state.results { + let total_rows = app.query_state.total_row_count.unwrap_or(0); + let total_pages = app.query_state.total_pages(); + format!( + "Results ({} rows, page {}/{}) [hjkl navigate, ^h/^l pages, s sort]", + total_rows, + app.query_state.current_page + 1, + total_pages, + ) + } else { + "Results (press Enter to execute query)".to_string() + }; + + let block = Block::default() + .title(title) + .borders(Borders::ALL) + .border_type(BorderType::Rounded) + .border_style(Style::default().fg(border_color)); + + let inner = block.inner(area); + block.render(area, buf); + + if let Some(ref error) = app.query_state.error { + let error_text = Paragraph::new(error.as_str()) + .style(Style::default().fg(Color::Red)) + .wrap(ratatui::widgets::Wrap { trim: true }); + error_text.render(inner, buf); + return; + } + + let Some(ref results) = app.query_state.results else { + let help = Paragraph::new("Enter a SQL query above and press Enter to execute.\nThe table is available as 'data'.\n\nExample: SELECT * FROM data WHERE column > 10 LIMIT 100") + 
.style(Style::default().fg(Color::Gray)); + help.render(inner, buf); + return; + }; + + if results.batches.is_empty() || results.total_rows == 0 { + let empty = + Paragraph::new("Query returned no results.").style(Style::default().fg(Color::Yellow)); + empty.render(inner, buf); + return; + } + + // Build header row with sort indicators + let header_cells: Vec = results + .column_names + .iter() + .enumerate() + .map(|(i, name)| { + let indicator = if app.query_state.sort_column == Some(i) { + app.query_state.sort_direction.indicator() + } else { + "" + }; + + let style = if is_focused && i == app.query_state.horizontal_scroll { + Style::default().fg(Color::Black).bg(Color::Cyan).bold() + } else { + Style::default().fg(Color::Green).bold() + }; + + Cell::from(format!("{name}{indicator}")).style(style) + }) + .collect(); + + let header = Row::new(header_cells).height(1); + + // Since we use LIMIT/OFFSET in SQL, batches contain only the current page's data + // Display all rows from the batches + let rows = get_all_rows(results, &app.query_state); + + // Calculate column widths + #[allow(clippy::cast_possible_truncation)] + let widths: Vec = results + .column_names + .iter() + .map(|name| Constraint::Min((name.len() + 3).max(10) as u16)) + .collect(); + + let table = Table::new(rows, widths) + .header(header) + .row_highlight_style(Style::default().bg(Color::DarkGray)); + + // Split area for table and scrollbar + let [table_area, scrollbar_area] = + Layout::horizontal([Constraint::Min(0), Constraint::Length(1)]).areas(inner); + + StatefulWidget::render(table, table_area, buf, &mut app.query_state.table_state); + + // Render vertical scrollbar + let total_pages = app.query_state.total_pages(); + if total_pages > 1 { + let mut scrollbar_state = ScrollbarState::new(total_pages) + .position(app.query_state.current_page) + .viewport_content_length(1); + + Scrollbar::new(ScrollbarOrientation::VerticalRight) + .begin_symbol(Some("▲")) + .end_symbol(Some("▼")) + 
.render(scrollbar_area, buf, &mut scrollbar_state); + } +} + +/// Get all rows from batches (pagination is handled via SQL LIMIT/OFFSET). +fn get_all_rows<'a>(results: &'a QueryResults, query_state: &QueryState) -> Vec> { + let mut rows = Vec::new(); + + for batch in &results.batches { + for row_idx in 0..batch.num_rows() { + let cells: Vec = (0..batch.num_columns()) + .map(|col_idx| { + let json_value = arrow_value_to_json(batch.column(col_idx).as_ref(), row_idx); + let value = json_value_to_display(json_value); + let style = if query_state.sort_column == Some(col_idx) { + Style::default().fg(Color::Cyan) + } else { + Style::default() + }; + Cell::from(truncate_str(&value, 30).to_string()).style(style) + }) + .collect(); + rows.push(Row::new(cells)); + } + } + + rows +} + +fn truncate_str(s: &str, max_len: usize) -> &str { + if s.len() <= max_len { + s + } else { + &s[..max_len.saturating_sub(3)] + } +} diff --git a/vortex-tui/src/browse/ui/segments.rs b/vortex-tui/src/browse/ui/segments.rs index abed64388b5..dce3ac641ea 100644 --- a/vortex-tui/src/browse/ui/segments.rs +++ b/vortex-tui/src/browse/ui/segments.rs @@ -1,8 +1,6 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -use std::sync::Arc; - use humansize::DECIMAL; use ratatui::buffer::Buffer; use ratatui::layout::Rect; @@ -30,15 +28,13 @@ use taffy::TaffyTree; use taffy::TraversePartialTree; use vortex::dtype::FieldName; use vortex::error::VortexExpect; -use vortex::error::VortexResult; use vortex::error::VortexUnwrap; use vortex::error::vortex_err; -use vortex::file::SegmentSpec; -use vortex::layout::Layout; -use vortex::layout::LayoutChildType; use vortex::utils::aliases::hash_map::HashMap; use crate::browse::app::AppState; +use crate::segment_tree::SegmentTree; +use crate::segment_tree::collect_segment_tree; #[derive(Debug, Clone, Default)] pub struct SegmentGridState<'a> { @@ -90,13 +86,6 @@ pub struct NodeContents<'a> { contents: Vec>, } -pub struct 
SegmentDisplay { - name: FieldName, - spec: SegmentSpec, - row_offset: u64, - row_count: u64, -} - #[expect( clippy::cast_possible_truncation, reason = "UI coordinates are small enough" @@ -382,107 +371,3 @@ fn to_display_segment_tree<'a>( )?; Ok((tree, root, node_contents)) } - -fn collect_segment_tree(root_layout: &dyn Layout, segments: &Arc<[SegmentSpec]>) -> SegmentTree { - let mut tree = SegmentTree { - segments: HashMap::new(), - segment_ordering: Vec::new(), - }; - segments_by_name_impl(root_layout, None, None, Some(0), segments, &mut tree).vortex_unwrap(); - - tree -} - -struct SegmentTree { - segments: HashMap>, - segment_ordering: Vec, -} - -fn segments_by_name_impl( - root: &dyn Layout, - group_name: Option, - name: Option, - row_offset: Option, - segments: &Arc<[SegmentSpec]>, - segment_tree: &mut SegmentTree, -) -> VortexResult<()> { - // Recurse into children - for (child, child_type) in root.children()?.into_iter().zip(root.child_types()) { - match child_type { - LayoutChildType::Transparent(sub_name) => segments_by_name_impl( - child.as_ref(), - group_name.clone(), - Some( - name.as_ref() - .map(|n| format!("{n}.{sub_name}").into()) - .unwrap_or_else(|| sub_name.into()), - ), - row_offset, - segments, - segment_tree, - )?, - LayoutChildType::Auxiliary(aux_name) => segments_by_name_impl( - child.as_ref(), - group_name.clone(), - Some( - name.as_ref() - .map(|n| format!("{n}.{aux_name}").into()) - .unwrap_or_else(|| aux_name.into()), - ), - Some(0), - segments, - segment_tree, - )?, - LayoutChildType::Chunk((idx, chunk_row_offset)) => { - segments_by_name_impl( - child.as_ref(), - group_name.clone(), - Some( - name.as_ref() - .map(|n| format!("{n}.[{idx}]")) - .unwrap_or_else(|| format!("[{idx}]")) - .into(), - ), - // Compute absolute row offset. - Some(chunk_row_offset + row_offset.unwrap_or(0)), - segments, - segment_tree, - )? 
- } - LayoutChildType::Field(field_name) => { - // Step into a new group name - let group_name = group_name - .as_ref() - .map(|n| format!("{n}.{field_name}").into()) - .unwrap_or_else(|| field_name); - segment_tree.segment_ordering.push(group_name.clone()); - - segments_by_name_impl( - child.as_ref(), - Some(group_name), - None, - row_offset, - segments, - segment_tree, - )? - } - } - } - - let current_segments = segment_tree - .segments - .entry(group_name.unwrap_or_else(|| FieldName::from("root"))) - .or_default(); - - for segment_id in root.segment_ids() { - let segment_spec = segments[*segment_id as usize].clone(); - current_segments.push(SegmentDisplay { - name: name.clone().unwrap_or_else(|| "".into()), - spec: segment_spec, - row_count: root.row_count(), - row_offset: row_offset.unwrap_or(0), - }) - } - - Ok(()) -} diff --git a/vortex-tui/src/datafusion_helper.rs b/vortex-tui/src/datafusion_helper.rs new file mode 100644 index 00000000000..d6c176fae94 --- /dev/null +++ b/vortex-tui/src/datafusion_helper.rs @@ -0,0 +1,218 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Shared DataFusion query execution utilities for both CLI and TUI. + +use std::sync::Arc; + +use arrow_array::Array as ArrowArray; +use arrow_array::RecordBatch; +use datafusion::datasource::listing::ListingOptions; +use datafusion::datasource::listing::ListingTable; +use datafusion::datasource::listing::ListingTableConfig; +use datafusion::datasource::listing::ListingTableUrl; +use datafusion::prelude::SessionContext; +use vortex_datafusion::VortexFormat; + +use crate::SESSION; + +/// Execute a SQL query against a Vortex file. +/// +/// The file is registered as a table named "data". +/// Returns the result as a vector of RecordBatches. 
+pub async fn execute_query(file_path: &str, sql: &str) -> Result<Vec<RecordBatch>, String> { + let ctx = create_context(file_path).await?; + + let df = ctx.sql(sql).await.map_err(|e| format!("SQL error: {e}"))?; + + df.collect() + .await + .map_err(|e| format!("Query execution error: {e}")) +} + +/// Create a DataFusion `SessionContext` with the file at `file_path` registered as "data". +async fn create_context(file_path: &str) -> Result<SessionContext, String> { + let ctx = SessionContext::new(); + let format = Arc::new(VortexFormat::new(SESSION.clone())); + + let table_url = + ListingTableUrl::parse(file_path).map_err(|e| format!("Failed to parse file path: {e}"))?; + + let config = ListingTableConfig::new(table_url) + .with_listing_options( + ListingOptions::new(format).with_session_config_options(ctx.state().config()), + ) + .infer_schema(&ctx.state()) + .await + .map_err(|e| format!("Failed to infer schema: {e}"))?; + + let listing_table = Arc::new( + ListingTable::try_new(config).map_err(|e| format!("Failed to create table: {e}"))?, + ); + + ctx.register_table("data", listing_table) + .map_err(|e| format!("Failed to register table: {e}"))?; + + Ok(ctx) +} + +/// Convert an Arrow array value at a given index to a JSON value.
+#[allow(clippy::unwrap_used)] +pub fn arrow_value_to_json(array: &dyn ArrowArray, idx: usize) -> serde_json::Value { + use arrow_array::*; + use arrow_schema::DataType; + + if array.is_null(idx) { + return serde_json::Value::Null; + } + + match array.data_type() { + DataType::Null => serde_json::Value::Null, + DataType::Boolean => { + let arr = array.as_any().downcast_ref::().unwrap(); + serde_json::Value::Bool(arr.value(idx)) + } + DataType::Int8 => { + let arr = array.as_any().downcast_ref::().unwrap(); + serde_json::json!(arr.value(idx)) + } + DataType::Int16 => { + let arr = array.as_any().downcast_ref::().unwrap(); + serde_json::json!(arr.value(idx)) + } + DataType::Int32 => { + let arr = array.as_any().downcast_ref::().unwrap(); + serde_json::json!(arr.value(idx)) + } + DataType::Int64 => { + let arr = array.as_any().downcast_ref::().unwrap(); + serde_json::json!(arr.value(idx)) + } + DataType::UInt8 => { + let arr = array.as_any().downcast_ref::().unwrap(); + serde_json::json!(arr.value(idx)) + } + DataType::UInt16 => { + let arr = array.as_any().downcast_ref::().unwrap(); + serde_json::json!(arr.value(idx)) + } + DataType::UInt32 => { + let arr = array.as_any().downcast_ref::().unwrap(); + serde_json::json!(arr.value(idx)) + } + DataType::UInt64 => { + let arr = array.as_any().downcast_ref::().unwrap(); + serde_json::json!(arr.value(idx)) + } + DataType::Float16 => { + let arr = array.as_any().downcast_ref::().unwrap(); + serde_json::json!(arr.value(idx).to_f32()) + } + DataType::Float32 => { + let arr = array.as_any().downcast_ref::().unwrap(); + serde_json::json!(arr.value(idx)) + } + DataType::Float64 => { + let arr = array.as_any().downcast_ref::().unwrap(); + serde_json::json!(arr.value(idx)) + } + DataType::Utf8 => { + let arr = array.as_any().downcast_ref::().unwrap(); + serde_json::Value::String(arr.value(idx).to_string()) + } + DataType::LargeUtf8 => { + let arr = array.as_any().downcast_ref::().unwrap(); + 
serde_json::Value::String(arr.value(idx).to_string()) + } + DataType::Utf8View => { + let arr = array.as_any().downcast_ref::().unwrap(); + serde_json::Value::String(arr.value(idx).to_string()) + } + DataType::Binary => { + let arr = array.as_any().downcast_ref::().unwrap(); + let hex: String = arr.value(idx).iter().map(|b| format!("{b:02x}")).collect(); + serde_json::Value::String(hex) + } + DataType::LargeBinary => { + let arr = array.as_any().downcast_ref::().unwrap(); + let hex: String = arr.value(idx).iter().map(|b| format!("{b:02x}")).collect(); + serde_json::Value::String(hex) + } + DataType::BinaryView => { + let arr = array.as_any().downcast_ref::().unwrap(); + let hex: String = arr.value(idx).iter().map(|b| format!("{b:02x}")).collect(); + serde_json::Value::String(hex) + } + DataType::Date32 => { + let arr = array.as_any().downcast_ref::().unwrap(); + serde_json::json!(arr.value(idx)) + } + DataType::Date64 => { + let arr = array.as_any().downcast_ref::().unwrap(); + serde_json::json!(arr.value(idx)) + } + DataType::Timestamp(_, _) => { + if let Some(arr) = array.as_any().downcast_ref::() { + serde_json::json!(arr.value(idx)) + } else if let Some(arr) = array.as_any().downcast_ref::() { + serde_json::json!(arr.value(idx)) + } else if let Some(arr) = array.as_any().downcast_ref::() { + serde_json::json!(arr.value(idx)) + } else if let Some(arr) = array.as_any().downcast_ref::() { + serde_json::json!(arr.value(idx)) + } else { + serde_json::Value::String("".to_string()) + } + } + DataType::Decimal128(_, _) => { + let arr = array.as_any().downcast_ref::().unwrap(); + serde_json::Value::String(arr.value_as_string(idx)) + } + DataType::Decimal256(_, _) => { + let arr = array.as_any().downcast_ref::().unwrap(); + serde_json::Value::String(arr.value_as_string(idx)) + } + DataType::List(_) => { + let arr = array.as_any().downcast_ref::().unwrap(); + let value_arr = arr.value(idx); + let elements: Vec = (0..value_arr.len()) + .map(|i| 
arrow_value_to_json(value_arr.as_ref(), i)) + .collect(); + serde_json::Value::Array(elements) + } + DataType::LargeList(_) => { + let arr = array.as_any().downcast_ref::().unwrap(); + let value_arr = arr.value(idx); + let elements: Vec = (0..value_arr.len()) + .map(|i| arrow_value_to_json(value_arr.as_ref(), i)) + .collect(); + serde_json::Value::Array(elements) + } + DataType::Struct(_) => { + let arr = array.as_any().downcast_ref::().unwrap(); + let mut obj = serde_json::Map::new(); + for (i, field) in arr.fields().iter().enumerate() { + let col = arr.column(i); + obj.insert(field.name().clone(), arrow_value_to_json(col.as_ref(), idx)); + } + serde_json::Value::Object(obj) + } + _ => { + // Fallback for unsupported types + serde_json::Value::String(format!("<{}>", array.data_type())) + } + } +} + +/// Format a JSON value for display in the TUI. +/// +/// - Null becomes "NULL" +/// - Strings are displayed without quotes +/// - Other values use their JSON string representation +pub fn json_value_to_display(value: serde_json::Value) -> String { + match value { + serde_json::Value::Null => "NULL".to_string(), + serde_json::Value::String(s) => s, + other => other.to_string(), + } +} diff --git a/vortex-tui/src/inspect.rs b/vortex-tui/src/inspect.rs index 2f4f1e0432a..1f1edea153a 100644 --- a/vortex-tui/src/inspect.rs +++ b/vortex-tui/src/inspect.rs @@ -6,11 +6,13 @@ use std::fs::File; use std::io::Read; use std::io::Seek; use std::io::SeekFrom; +use std::path::Path; use std::path::PathBuf; use std::sync::Arc; use flatbuffers::root; use itertools::Itertools; +use serde::Serialize; use vortex::buffer::Alignment; use vortex::buffer::ByteBuffer; use vortex::error::VortexExpect; @@ -36,6 +38,10 @@ pub struct InspectArgs { /// Path to the Vortex file to inspect pub file: PathBuf, + + /// Output as JSON + #[arg(long, global = true)] + pub json: bool, } #[derive(Debug, clap::Subcommand)] @@ -50,15 +56,189 @@ pub enum InspectMode { Footer, } +#[derive(Serialize)] +pub struct 
InspectOutput { + pub file_path: String, + pub file_size: u64, + pub eof: EofInfoJson, + #[serde(skip_serializing_if = "Option::is_none")] + pub postscript: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub footer: Option, +} + +#[derive(Serialize)] +pub struct EofInfoJson { + pub version: u16, + pub current_version: u16, + pub postscript_size: u16, + pub magic_bytes: String, + pub valid_magic: bool, +} + +#[derive(Serialize)] +pub struct SegmentInfoJson { + pub offset: u64, + pub length: u32, + pub alignment: usize, +} + +#[derive(Serialize)] +pub struct PostscriptInfoJson { + pub dtype: Option, + pub layout: SegmentInfoJson, + pub statistics: Option, + pub footer: SegmentInfoJson, +} + +#[derive(Serialize)] +pub struct FooterInfoJson { + pub total_segments: usize, + pub total_data_size: u64, + pub segments: Vec, +} + +#[derive(Serialize)] +pub struct FooterSegmentJson { + pub index: usize, + pub offset: u64, + pub end_offset: u64, + pub length: u32, + pub alignment: usize, + pub path: Option, +} + pub async fn exec_inspect(args: InspectArgs) -> anyhow::Result<()> { let mut inspector = VortexInspector::new(args.file.clone())?; - println!("File: {}", args.file.display()); + let mode = args.mode.unwrap_or(InspectMode::Footer); + + if args.json { + exec_inspect_json(&mut inspector, &args.file, mode).await + } else { + exec_inspect_text(&mut inspector, &args.file, mode).await + } +} + +async fn exec_inspect_json( + inspector: &mut VortexInspector, + file_path: &Path, + mode: InspectMode, +) -> anyhow::Result<()> { + let eof = inspector.read_eof()?; + let eof_json = EofInfoJson { + version: eof.version, + current_version: VERSION, + postscript_size: eof.postscript_size, + magic_bytes: std::str::from_utf8(&eof.magic_bytes) + .unwrap_or("") + .to_string(), + valid_magic: eof.valid_magic, + }; + + let postscript_json = + if matches!(mode, InspectMode::Postscript | InspectMode::Footer) && eof.valid_magic { + inspector + .read_postscript(eof.postscript_size) 
+ .ok() + .map(|ps| PostscriptInfoJson { + dtype: ps.dtype.map(|s| SegmentInfoJson { + offset: s.offset, + length: s.length, + alignment: *s.alignment, + }), + layout: SegmentInfoJson { + offset: ps.layout.offset, + length: ps.layout.length, + alignment: *ps.layout.alignment, + }, + statistics: ps.statistics.map(|s| SegmentInfoJson { + offset: s.offset, + length: s.length, + alignment: *s.alignment, + }), + footer: SegmentInfoJson { + offset: ps.footer.offset, + length: ps.footer.length, + alignment: *ps.footer.alignment, + }, + }) + } else { + None + }; + + let footer_json = + if matches!(mode, InspectMode::Footer) && eof.valid_magic && postscript_json.is_some() { + inspector.read_footer().await.ok().map(|footer| { + let segment_map = footer.segment_map().clone(); + let root_layout = footer.layout().clone(); + + let mut segment_paths: Vec>>> = vec![None; segment_map.len()]; + let mut queue = + VecDeque::<(Vec>, LayoutRef)>::from_iter([(Vec::new(), root_layout)]); + while !queue.is_empty() { + let (path, layout) = queue.pop_front().vortex_expect("queue is not empty"); + for segment in layout.segment_ids() { + segment_paths[*segment as usize] = Some(path.clone()); + } + if let Ok(children) = layout.children() { + for (child_layout, child_name) in + children.into_iter().zip(layout.child_names()) + { + let child_path = path.iter().cloned().chain([child_name]).collect(); + queue.push_back((child_path, child_layout)); + } + } + } + + let segments: Vec = segment_map + .iter() + .enumerate() + .map(|(i, segment)| FooterSegmentJson { + index: i, + offset: segment.offset, + end_offset: segment.offset + segment.length as u64, + length: segment.length, + alignment: *segment.alignment, + path: segment_paths[i] + .as_ref() + .map(|p| p.iter().map(|s| s.as_ref()).collect::>().join(".")), + }) + .collect(); + + FooterInfoJson { + total_segments: segment_map.len(), + total_data_size: segment_map.iter().map(|s| s.length as u64).sum(), + segments, + } + }) + } else { + None + }; + + 
let output = InspectOutput { + file_path: file_path.display().to_string(), + file_size: inspector.file_size, + eof: eof_json, + postscript: postscript_json, + footer: footer_json, + }; + + let json_output = serde_json::to_string_pretty(&output)?; + println!("{json_output}"); + + Ok(()) +} + +async fn exec_inspect_text( + inspector: &mut VortexInspector, + file_path: &Path, + mode: InspectMode, +) -> anyhow::Result<()> { + println!("File: {}", file_path.display()); println!("Size: {} bytes", inspector.file_size); println!(); - let mode = args.mode.unwrap_or(InspectMode::Footer); - match mode { InspectMode::Eof => { let eof = inspector.read_eof()?; diff --git a/vortex-tui/src/main.rs b/vortex-tui/src/main.rs index f9f4ad4a8fe..0f54a2d7a4c 100644 --- a/vortex-tui/src/main.rs +++ b/vortex-tui/src/main.rs @@ -4,7 +4,11 @@ #![allow(clippy::expect_used)] mod browse; mod convert; +mod datafusion_helper; mod inspect; +mod query; +mod segment_tree; +mod segments; mod tree; use std::path::PathBuf; @@ -21,6 +25,7 @@ use vortex::io::session::RuntimeSessionExt; use vortex::session::VortexSession; use crate::inspect::InspectArgs; +use crate::segments::SegmentsArgs; #[derive(clap::Parser)] #[command(version)] @@ -39,18 +44,24 @@ enum Commands { Browse { file: PathBuf }, /// Inspect Vortex file footer and metadata Inspect(InspectArgs), + /// Execute a SQL query against a Vortex file using DataFusion + Query(query::QueryArgs), + /// Display segment information for a Vortex file + Segments(SegmentsArgs), } impl Commands { fn file_path(&self) -> &PathBuf { match self { Commands::Tree(args) => match &args.mode { - tree::TreeMode::Array { file } => file, + tree::TreeMode::Array { file, .. } => file, tree::TreeMode::Layout { file, .. 
} => file, }, Commands::Browse { file } => file, Commands::Convert(flags) => &flags.file, Commands::Inspect(args) => &args.file, + Commands::Query(args) => &args.file, + Commands::Segments(args) => &args.file, } } } @@ -82,6 +93,8 @@ async fn main() -> anyhow::Result<()> { Commands::Convert(flags) => convert::exec_convert(flags).await?, Commands::Browse { file } => exec_tui(file).await?, Commands::Inspect(args) => inspect::exec_inspect(args).await?, + Commands::Query(args) => query::exec_query(args).await?, + Commands::Segments(args) => segments::exec_segments(args).await?, }; Ok(()) diff --git a/vortex-tui/src/query.rs b/vortex-tui/src/query.rs new file mode 100644 index 00000000000..2c69ebfafcb --- /dev/null +++ b/vortex-tui/src/query.rs @@ -0,0 +1,112 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::path::PathBuf; + +use arrow_array::RecordBatch; +use serde::Serialize; +use vortex::error::VortexResult; +use vortex::error::vortex_err; + +use crate::datafusion_helper::arrow_value_to_json; +use crate::datafusion_helper::execute_query; + +#[derive(Debug, clap::Parser)] +pub struct QueryArgs { + /// Path to the Vortex file + pub file: PathBuf, + + /// SQL query to execute. The table is available as 'data'. 
+ /// Example: "SELECT * FROM data WHERE col > 10 LIMIT 100" + #[arg(long, short)] + pub sql: String, +} + +#[derive(Serialize)] +struct QueryOutput { + schema: SchemaInfo, + total_rows: u64, + rows: Vec, +} + +#[derive(Serialize)] +struct SchemaInfo { + fields: Vec, +} + +#[derive(Serialize)] +struct FieldInfo { + name: String, + dtype: String, + nullable: bool, +} + +pub async fn exec_query(args: QueryArgs) -> VortexResult<()> { + let file_path = args + .file + .to_str() + .ok_or_else(|| vortex_err!("Path is not valid UTF-8"))?; + + let batches: Vec = execute_query(file_path, &args.sql) + .await + .map_err(|e| vortex_err!("{e}"))?; + + // Build schema info from the result + let schema = if let Some(batch) = batches.first() { + build_schema_from_arrow(batch.schema().as_ref()) + } else { + SchemaInfo { fields: vec![] } + }; + + // Convert batches to JSON rows + let mut rows = Vec::new(); + for batch in &batches { + batch_to_json_rows(batch, &mut rows)?; + } + + let total_rows = rows.len() as u64; + + let output = QueryOutput { + schema, + total_rows, + rows, + }; + + let json_output = serde_json::to_string_pretty(&output) + .map_err(|e| vortex_err!("Failed to serialize JSON: {e}"))?; + println!("{json_output}"); + + Ok(()) +} + +fn build_schema_from_arrow(schema: &arrow_schema::Schema) -> SchemaInfo { + let fields = schema + .fields() + .iter() + .map(|f| FieldInfo { + name: f.name().clone(), + dtype: f.data_type().to_string(), + nullable: f.is_nullable(), + }) + .collect(); + + SchemaInfo { fields } +} + +fn batch_to_json_rows(batch: &RecordBatch, rows: &mut Vec) -> VortexResult<()> { + let schema = batch.schema(); + + for row_idx in 0..batch.num_rows() { + let mut obj = serde_json::Map::new(); + + for (col_idx, field) in schema.fields().iter().enumerate() { + let column = batch.column(col_idx); + let value = arrow_value_to_json(column.as_ref(), row_idx); + obj.insert(field.name().clone(), value); + } + + rows.push(serde_json::Value::Object(obj)); + } + + Ok(()) 
+} diff --git a/vortex-tui/src/segment_tree.rs b/vortex-tui/src/segment_tree.rs new file mode 100644 index 00000000000..79dd419722d --- /dev/null +++ b/vortex-tui/src/segment_tree.rs @@ -0,0 +1,141 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Shared segment tree collection logic used by both the TUI browse view and the CLI segments command. + +use std::sync::Arc; + +use vortex::dtype::FieldName; +use vortex::error::VortexResult; +use vortex::file::SegmentSpec; +use vortex::layout::Layout; +use vortex::layout::LayoutChildType; +use vortex::utils::aliases::hash_map::HashMap; + +/// Information about a single segment for display purposes. +pub struct SegmentDisplay { + /// Name of the segment (e.g., "data", "[0]", "zones") + pub name: FieldName, + /// The underlying segment specification + pub spec: SegmentSpec, + /// Row offset within the file + pub row_offset: u64, + /// Number of rows in this segment + pub row_count: u64, +} + +/// A tree of segments organized by field name. +pub struct SegmentTree { + /// Map from field name to list of segments for that field + pub segments: HashMap>, + /// Ordered list of field names (columns) in display order + pub segment_ordering: Vec, +} + +/// Collect segment tree from a layout and segment map. 
+pub fn collect_segment_tree( + root_layout: &dyn Layout, + segments: &Arc<[SegmentSpec]>, +) -> SegmentTree { + let mut tree = SegmentTree { + segments: HashMap::new(), + segment_ordering: Vec::new(), + }; + // Ignore errors during traversal - we want to collect as much as possible + drop(segments_by_name_impl( + root_layout, + None, + None, + Some(0), + segments, + &mut tree, + )); + tree +} + +fn segments_by_name_impl( + root: &dyn Layout, + group_name: Option, + name: Option, + row_offset: Option, + segments: &Arc<[SegmentSpec]>, + segment_tree: &mut SegmentTree, +) -> VortexResult<()> { + // Recurse into children + for (child, child_type) in root.children()?.into_iter().zip(root.child_types()) { + match child_type { + LayoutChildType::Transparent(sub_name) => segments_by_name_impl( + child.as_ref(), + group_name.clone(), + Some( + name.as_ref() + .map(|n| format!("{n}.{sub_name}").into()) + .unwrap_or_else(|| sub_name.into()), + ), + row_offset, + segments, + segment_tree, + )?, + LayoutChildType::Auxiliary(aux_name) => segments_by_name_impl( + child.as_ref(), + group_name.clone(), + Some( + name.as_ref() + .map(|n| format!("{n}.{aux_name}").into()) + .unwrap_or_else(|| aux_name.into()), + ), + Some(0), + segments, + segment_tree, + )?, + LayoutChildType::Chunk((idx, chunk_row_offset)) => segments_by_name_impl( + child.as_ref(), + group_name.clone(), + Some( + name.as_ref() + .map(|n| format!("{n}.[{idx}]")) + .unwrap_or_else(|| format!("[{idx}]")) + .into(), + ), + // Compute absolute row offset. 
+ Some(chunk_row_offset + row_offset.unwrap_or(0)), + segments, + segment_tree, + )?, + LayoutChildType::Field(field_name) => { + // Step into a new group name + let new_group_name = group_name + .as_ref() + .map(|n| format!("{n}.{field_name}").into()) + .unwrap_or_else(|| field_name); + segment_tree.segment_ordering.push(new_group_name.clone()); + + segments_by_name_impl( + child.as_ref(), + Some(new_group_name), + None, + row_offset, + segments, + segment_tree, + )? + } + } + } + + let current_segments = segment_tree + .segments + .entry(group_name.unwrap_or_else(|| FieldName::from("root"))) + .or_default(); + + for segment_id in root.segment_ids() { + let segment_spec = segments[*segment_id as usize].clone(); + current_segments.push(SegmentDisplay { + name: name.clone().unwrap_or_else(|| "".into()), + spec: segment_spec, + row_count: root.row_count(), + row_offset: row_offset.unwrap_or(0), + }) + } + + Ok(()) +} diff --git a/vortex-tui/src/segments.rs b/vortex-tui/src/segments.rs new file mode 100644 index 00000000000..3118203a03d --- /dev/null +++ b/vortex-tui/src/segments.rs @@ -0,0 +1,105 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::path::PathBuf; + +use serde::Serialize; +use vortex::error::VortexResult; +use vortex::file::OpenOptionsSessionExt; + +use crate::SESSION; +use crate::segment_tree::collect_segment_tree; + +#[derive(Debug, clap::Parser)] +pub struct SegmentsArgs { + /// Path to the Vortex file + pub file: PathBuf, +} + +#[derive(Serialize)] +struct SegmentsOutput { + /// Columns in display order + columns: Vec, +} + +#[derive(Serialize)] +struct ColumnInfo { + /// Field name (column header) + name: String, + /// Segments within this column + segments: Vec, +} + +#[derive(Serialize)] +struct SegmentInfo { + /// Segment name (e.g., "[0]", "data", etc.) 
+ name: String, + /// Row range start + row_offset: u64, + /// Number of rows + row_count: u64, + /// Byte offset in file + byte_offset: u64, + /// Length in bytes + byte_length: u32, + /// Alignment requirement + alignment: usize, + /// Gap from previous segment end + byte_gap: u64, +} + +/// Print per-column segment information for a Vortex file as pretty-printed JSON. +/// +/// Columns are emitted in `segment_ordering` order and the segments of each column are sorted by +/// byte offset; `byte_gap` is measured from the end of the previous segment (0 for the first). +pub async fn exec_segments(args: SegmentsArgs) -> VortexResult<()> { + let vxf = SESSION.open_options().open(args.file).await?; + + let footer = vxf.footer(); + let mut segment_tree = collect_segment_tree(footer.layout().as_ref(), footer.segment_map()); + + // Convert to output format + let columns: Vec<ColumnInfo> = segment_tree + .segment_ordering + .iter() + .filter_map(|name| { + let mut segments = segment_tree.segments.remove(name)?; + + // Sort by byte offset + segments.sort_by_key(|seg| seg.spec.offset); + + // Convert to output format; `prev_end` stays `None` until a segment has been seen + let mut prev_end: Option<u64> = None; + let segment_infos: Vec<SegmentInfo> = segments + .into_iter() + .map(|seg| { + let byte_gap = prev_end.map_or(0, |end| seg.spec.offset.saturating_sub(end)); + prev_end = Some(seg.spec.offset + u64::from(seg.spec.length)); + + SegmentInfo { + name: seg.name.to_string(), + row_offset: seg.row_offset, + row_count: seg.row_count, + byte_offset: seg.spec.offset, + byte_length: seg.spec.length, + alignment: *seg.spec.alignment, + byte_gap, + } + }) + .collect(); + + Some(ColumnInfo { + name: name.to_string(), + segments: segment_infos, + }) + }) + .collect(); + + let output = SegmentsOutput { columns }; + + let json_output = serde_json::to_string_pretty(&output) + .map_err(|e| vortex::error::vortex_err!("Failed to serialize JSON: {e}"))?; + println!("{json_output}"); + + Ok(()) +} diff --git a/vortex-tui/src/tree.rs b/vortex-tui/src/tree.rs index c33b857d1a3..119378b1e5a 100644 --- a/vortex-tui/src/tree.rs +++ b/vortex-tui/src/tree.rs @@ -4,9 +4,11 @@ use std::path::Path; use std::path::PathBuf; +use serde::Serialize; use vortex::array::stream::ArrayStreamExt; use
vortex::error::VortexResult; use vortex::file::OpenOptionsSessionExt; +use vortex::layout::LayoutRef; use crate::SESSION; @@ -23,6 +25,9 @@ pub enum TreeMode { Array { /// Path to the Vortex file file: PathBuf, + /// Output as JSON + #[arg(long)] + json: bool, }, /// Display the layout tree structure (metadata only, no array loading) Layout { @@ -31,19 +36,43 @@ pub enum TreeMode { /// Show additional metadata information including buffer sizes (requires fetching segments) #[arg(short, long)] verbose: bool, + /// Output as JSON + #[arg(long)] + json: bool, }, } +#[derive(Serialize)] +pub struct LayoutTreeNode { + pub encoding: String, + pub dtype: String, + pub row_count: u64, + pub metadata_bytes: usize, + pub segment_ids: Vec, + pub children: Vec, +} + +#[derive(Serialize)] +pub struct LayoutTreeNodeWithName { + pub name: String, + #[serde(flatten)] + pub node: LayoutTreeNode, +} + pub async fn exec_tree(args: TreeArgs) -> VortexResult<()> { match args.mode { - TreeMode::Array { file } => exec_array_tree(&file).await?, - TreeMode::Layout { file, verbose } => exec_layout_tree(&file, verbose).await?, + TreeMode::Array { file, json } => exec_array_tree(&file, json).await?, + TreeMode::Layout { + file, + verbose, + json, + } => exec_layout_tree(&file, verbose, json).await?, } Ok(()) } -async fn exec_array_tree(file: &Path) -> VortexResult<()> { +async fn exec_array_tree(file: &Path, _json: bool) -> VortexResult<()> { let full = SESSION .open_options() .open(file) @@ -58,9 +87,18 @@ async fn exec_array_tree(file: &Path) -> VortexResult<()> { Ok(()) } -async fn exec_layout_tree(file: &Path, verbose: bool) -> VortexResult<()> { +async fn exec_layout_tree(file: &Path, verbose: bool, json: bool) -> VortexResult<()> { let vxf = SESSION.open_options().open(file).await?; + if json { + let tree = layout_to_json(vxf.footer().layout().clone())?; + let json_output = serde_json::to_string_pretty(&tree) + .map_err(|e| vortex::error::vortex_err!("Failed to serialize JSON: {e}"))?; 
+ println!("{json_output}"); + + return Ok(()); + } + if verbose { // In verbose mode, fetch segments to display buffer sizes let output = vxf @@ -76,3 +114,30 @@ async fn exec_layout_tree(file: &Path, verbose: bool) -> VortexResult<()> { Ok(()) } + +/// Recursively convert a layout into a serializable [`LayoutTreeNode`]. +/// +/// Children are converted depth-first and paired, in order, with the names yielded by +/// `child_names()`; the first failing child aborts the whole conversion. +fn layout_to_json(layout: LayoutRef) -> VortexResult<LayoutTreeNode> { + let children_json: Vec<LayoutTreeNodeWithName> = layout + .children()? + .into_iter() + .zip(layout.child_names()) + .map(|(child, name)| { + Ok(LayoutTreeNodeWithName { + name: name.to_string(), + node: layout_to_json(child)?, + }) + }) + .collect::<VortexResult<Vec<_>>>()?; + + Ok(LayoutTreeNode { + encoding: layout.encoding().to_string(), + dtype: layout.dtype().to_string(), + row_count: layout.row_count(), + metadata_bytes: layout.metadata().len(), + segment_ids: layout.segment_ids().iter().map(|s| **s).collect(), + children: children_json, + }) +}