From 22929aa0f513126f137cac0635cf8ec591cf8ea6 Mon Sep 17 00:00:00 2001 From: AdaWorldAPI Date: Fri, 30 Jan 2026 08:25:21 +0100 Subject: [PATCH 1/9] Add LanceDB storage substrate --- src/storage/lance.rs | 823 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 823 insertions(+) create mode 100644 src/storage/lance.rs diff --git a/src/storage/lance.rs b/src/storage/lance.rs new file mode 100644 index 0000000..b0dce36 --- /dev/null +++ b/src/storage/lance.rs @@ -0,0 +1,823 @@ +//! LanceDB Storage Substrate +//! +//! Provides the persistent storage layer using Lance columnar format. +//! All data (nodes, edges, fingerprints) stored in Lance tables +//! with native vector/Hamming index support. +//! +//! # Architecture +//! +//! ```text +//! ┌─────────────────────────────────────────────────────────────────┐ +//! │ LANCE SUBSTRATE │ +//! ├─────────────────────────────────────────────────────────────────┤ +//! │ │ +//! │ nodes table → id, label, fingerprint, embedding, props │ +//! │ edges table → from_id, to_id, type, weight, amplification │ +//! │ sessions table → session state, consciousness snapshots │ +//! │ │ +//! │ Indices: │ +//! │ - IVF-PQ on embedding (vector ANN) │ +//! │ - Scalar on label, type (filtering) │ +//! │ - Custom Hamming index on fingerprint │ +//! │ │ +//! └─────────────────────────────────────────────────────────────────┘ +//! ``` + +use arrow::array::*; +use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; +use arrow::record_batch::RecordBatch; +use lance::dataset::WriteParams; +use lance::Dataset; +use std::path::Path; +use std::sync::Arc; + +use crate::core::Fingerprint; +use crate::cognitive::Thought; +use crate::graph::Edge; +use crate::{Error, Result}; + +/// Fingerprint size in bytes (10,000 bits = 1,250 bytes) +pub const FINGERPRINT_BYTES: usize = 1250; + +/// Jina embedding dimension +pub const EMBEDDING_DIM: usize = 1024; + +/// Thinking style vector dimension (7 axes) +pub const THINKING_STYLE_DIM: usize = 7; + +// ============================================================================= +// SCHEMA DEFINITIONS +// ============================================================================= + +/// Create the nodes table schema +pub fn nodes_schema() -> Schema { + Schema::new(vec![ + Field::new("id", DataType::Utf8, false), + Field::new("label", DataType::Utf8, false), + Field::new("fingerprint", DataType::FixedSizeBinary(FINGERPRINT_BYTES as i32), true), + Field::new( + "embedding", + DataType::FixedSizeList( + Arc::new(Field::new("item", DataType::Float32, false)), + EMBEDDING_DIM as i32, + ), + true, + ), + Field::new("qidx", DataType::UInt8, false), + Field::new( + "thinking_style", + DataType::FixedSizeList( + Arc::new(Field::new("item", DataType::Float32, false)), + THINKING_STYLE_DIM as i32, + ), + true, + ), + Field::new("content", DataType::Utf8, true), + Field::new("properties", DataType::Utf8, true), // JSON + Field::new("created_at", DataType::Timestamp(TimeUnit::Microsecond, None), false), + Field::new("version", DataType::Int64, false), + ]) +} + +/// Create the edges table schema +pub fn edges_schema() -> Schema { + Schema::new(vec![ + Field::new("id", DataType::Utf8, false), + Field::new("from_id", DataType::Utf8, false), + Field::new("to_id", DataType::Utf8, false), + Field::new("type", DataType::Utf8, false), + Field::new("weight", DataType::Float32, false), + Field::new("amplification", DataType::Float32, false), + Field::new("properties", DataType::Utf8, true), // JSON + Field::new("created_at", 
DataType::Timestamp(TimeUnit::Microsecond, None), false), + ]) +} + +/// Create the sessions table schema (for consciousness snapshots) +pub fn sessions_schema() -> Schema { + Schema::new(vec![ + Field::new("id", DataType::Utf8, false), + Field::new("name", DataType::Utf8, true), + Field::new("thinking_style", + DataType::FixedSizeList( + Arc::new(Field::new("item", DataType::Float32, false)), + THINKING_STYLE_DIM as i32, + ), + true, + ), + Field::new("coherence", DataType::Float32, false), + Field::new("ice_cake_layers", DataType::Int32, false), + Field::new("state", DataType::Utf8, true), // JSON blob + Field::new("created_at", DataType::Timestamp(TimeUnit::Microsecond, None), false), + Field::new("updated_at", DataType::Timestamp(TimeUnit::Microsecond, None), false), + ]) +} + +// ============================================================================= +// LANCE STORE +// ============================================================================= + +/// LanceDB-backed storage for LadybugDB +pub struct LanceStore { + /// Path to database directory + path: String, + /// Nodes dataset (lazy-loaded) + nodes: Option, + /// Edges dataset (lazy-loaded) + edges: Option, + /// Sessions dataset (lazy-loaded) + sessions: Option, +} + +impl LanceStore { + /// Open or create a Lance store at the given path + pub async fn open>(path: P) -> Result { + let path_str = path.as_ref().to_string_lossy().to_string(); + + // Create directory if needed + std::fs::create_dir_all(&path_str)?; + + Ok(Self { + path: path_str, + nodes: None, + edges: None, + sessions: None, + }) + } + + /// Create in-memory store (for testing) + pub fn memory() -> Self { + Self { + path: ":memory:".to_string(), + nodes: None, + edges: None, + sessions: None, + } + } + + // ------------------------------------------------------------------------- + // TABLE MANAGEMENT + // ------------------------------------------------------------------------- + + /// Get or create the nodes table + pub async fn nodes(&mut self) -> Result<&Dataset> { + if self.nodes.is_none() { + let table_path = format!("{}/nodes.lance", self.path); + + self.nodes = Some(if Path::new(&table_path).exists() { + Dataset::open(&table_path).await? + } else { + // Create empty table with schema + let schema = Arc::new(nodes_schema()); + let batch = RecordBatch::new_empty(schema.clone()); + Dataset::write(&[batch], &table_path, None).await? + }); + } + + Ok(self.nodes.as_ref().unwrap()) + } + + /// Get or create the edges table + pub async fn edges(&mut self) -> Result<&Dataset> { + if self.edges.is_none() { + let table_path = format!("{}/edges.lance", self.path); + + self.edges = Some(if Path::new(&table_path).exists() { + Dataset::open(&table_path).await? + } else { + let schema = Arc::new(edges_schema()); + let batch = RecordBatch::new_empty(schema.clone()); + Dataset::write(&[batch], &table_path, None).await? + }); + } + + Ok(self.edges.as_ref().unwrap()) + } + + /// Get or create the sessions table + pub async fn sessions(&mut self) -> Result<&Dataset> { + if self.sessions.is_none() { + let table_path = format!("{}/sessions.lance", self.path); + + self.sessions = Some(if Path::new(&table_path).exists() { + Dataset::open(&table_path).await? + } else { + let schema = Arc::new(sessions_schema()); + let batch = RecordBatch::new_empty(schema.clone()); + Dataset::write(&[batch], &table_path, None).await? 
+ }); + } + + Ok(self.sessions.as_ref().unwrap()) + } + + // ------------------------------------------------------------------------- + // NODE OPERATIONS + // ------------------------------------------------------------------------- + + /// Insert a node + pub async fn insert_node(&mut self, node: &NodeRecord) -> Result<()> { + let table_path = format!("{}/nodes.lance", self.path); + let batch = node.to_record_batch()?; + + if self.nodes.is_some() { + // Append to existing + let params = WriteParams::default(); + Dataset::write(&[batch], &table_path, Some(params)).await?; + // Invalidate cache to reload + self.nodes = None; + } else { + Dataset::write(&[batch], &table_path, None).await?; + } + + Ok(()) + } + + /// Insert multiple nodes + pub async fn insert_nodes(&mut self, nodes: &[NodeRecord]) -> Result<()> { + if nodes.is_empty() { + return Ok(()); + } + + let table_path = format!("{}/nodes.lance", self.path); + let batch = NodeRecord::batch_to_record_batch(nodes)?; + + let params = WriteParams::default(); + Dataset::write(&[batch], &table_path, Some(params)).await?; + self.nodes = None; + + Ok(()) + } + + /// Get a node by ID + pub async fn get_node(&mut self, id: &str) -> Result> { + let dataset = self.nodes().await?; + + // Scan with filter + let scanner = dataset + .scan() + .filter(format!("id = '{}'", id).as_str())? + .try_into_stream() + .await?; + + use futures::StreamExt; + let mut batches = Vec::new(); + let mut stream = scanner; + while let Some(batch) = stream.next().await { + batches.push(batch?); + } + + if batches.is_empty() || batches[0].num_rows() == 0 { + return Ok(None); + } + + Ok(Some(NodeRecord::from_record_batch(&batches[0], 0)?)) + } + + // ------------------------------------------------------------------------- + // EDGE OPERATIONS + // ------------------------------------------------------------------------- + + /// Insert an edge + pub async fn insert_edge(&mut self, edge: &EdgeRecord) -> Result<()> { + let table_path = format!("{}/edges.lance", self.path); + let batch = edge.to_record_batch()?; + + let params = WriteParams::default(); + Dataset::write(&[batch], &table_path, Some(params)).await?; + self.edges = None; + + Ok(()) + } + + /// Get edges from a node + pub async fn get_edges_from(&mut self, from_id: &str) -> Result> { + let dataset = self.edges().await?; + + let scanner = dataset + .scan() + .filter(format!("from_id = '{}'", from_id).as_str())? + .try_into_stream() + .await?; + + use futures::StreamExt; + let mut results = Vec::new(); + let mut stream = scanner; + while let Some(batch) = stream.next().await { + let batch = batch?; + for i in 0..batch.num_rows() { + results.push(EdgeRecord::from_record_batch(&batch, i)?); + } + } + + Ok(results) + } + + /// Get edges to a node + pub async fn get_edges_to(&mut self, to_id: &str) -> Result> { + let dataset = self.edges().await?; + + let scanner = dataset + .scan() + .filter(format!("to_id = '{}'", to_id).as_str())? 
+ .try_into_stream() + .await?; + + use futures::StreamExt; + let mut results = Vec::new(); + let mut stream = scanner; + while let Some(batch) = stream.next().await { + let batch = batch?; + for i in 0..batch.num_rows() { + results.push(EdgeRecord::from_record_batch(&batch, i)?); + } + } + + Ok(results) + } + + // ------------------------------------------------------------------------- + // VECTOR SEARCH + // ------------------------------------------------------------------------- + + /// Vector similarity search using Lance native ANN + pub async fn vector_search( + &mut self, + embedding: &[f32], + k: usize, + filter: Option<&str>, + ) -> Result> { + let dataset = self.nodes().await?; + + let mut query = dataset + .query() + .nearest_to(embedding)? + .limit(k); + + if let Some(f) = filter { + query = query.filter(f)?; + } + + let results = query.execute().await?; + + use futures::StreamExt; + let mut nodes = Vec::new(); + let mut stream = results; + while let Some(batch) = stream.next().await { + let batch = batch?; + // Distance is in "_distance" column + let distances = batch + .column_by_name("_distance") + .map(|c| c.as_any().downcast_ref::().unwrap()); + + for i in 0..batch.num_rows() { + let node = NodeRecord::from_record_batch(&batch, i)?; + let dist = distances.map(|d| d.value(i)).unwrap_or(0.0); + nodes.push((node, dist)); + } + } + + Ok(nodes) + } + + // ------------------------------------------------------------------------- + // HAMMING SEARCH (Fingerprint similarity) + // ------------------------------------------------------------------------- + + /// Fingerprint similarity search using Hamming distance + /// + /// This loads fingerprints and uses SIMD for comparison. + /// For very large datasets, consider building a custom index. + pub async fn hamming_search( + &mut self, + query_fp: &Fingerprint, + k: usize, + threshold: Option, + ) -> Result> { + use crate::core::HammingEngine; + + let dataset = self.nodes().await?; + + // Load all fingerprints (for now - TODO: index) + let scanner = dataset + .scan() + .project(&["id", "label", "fingerprint", "qidx", "content", "properties", "created_at", "version"])? + .filter("fingerprint IS NOT NULL")? 
+ .try_into_stream() + .await?; + + use futures::StreamExt; + let mut candidates: Vec<(NodeRecord, u32)> = Vec::new(); + let mut stream = scanner; + + while let Some(batch) = stream.next().await { + let batch = batch?; + let fp_col = batch + .column_by_name("fingerprint") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + + for i in 0..batch.num_rows() { + if fp_col.is_null(i) { + continue; + } + + let fp_bytes = fp_col.value(i); + let candidate_fp = Fingerprint::from_bytes(fp_bytes)?; + let distance = query_fp.hamming_distance(&candidate_fp); + + let node = NodeRecord::from_record_batch(&batch, i)?; + candidates.push((node, distance)); + } + } + + // Sort by distance + candidates.sort_by_key(|(_, d)| *d); + + // Apply threshold and limit + let max_distance = threshold.map(|t| ((1.0 - t) * 10000.0) as u32); + + let results: Vec<(NodeRecord, u32, f32)> = candidates + .into_iter() + .filter(|(_, d)| max_distance.map(|m| *d <= m).unwrap_or(true)) + .take(k) + .map(|(node, dist)| { + let similarity = 1.0 - (dist as f32 / 10000.0); + (node, dist, similarity) + }) + .collect(); + + Ok(results) + } + + // ------------------------------------------------------------------------- + // GRAPH QUERIES + // ------------------------------------------------------------------------- + + /// Execute raw SQL via DataFusion (delegated to query module) + pub async fn sql(&mut self, _query: &str) -> Result { + // This will be implemented in the datafusion module + // and called from here + todo!("Delegate to DataFusion execution engine") + } +} + +// ============================================================================= +// DATA RECORDS +// ============================================================================= + +/// Node record for insert/query operations +#[derive(Debug, Clone)] +pub struct NodeRecord { + pub id: String, + pub label: String, + pub fingerprint: Option>, + pub embedding: Option>, + pub qidx: u8, + pub thinking_style: Option>, + pub content: Option, + pub properties: Option, + pub created_at: i64, // microseconds since epoch + pub version: i64, +} + +impl NodeRecord { + /// Create a new node record + pub fn new(id: impl Into, label: impl Into) -> Self { + Self { + id: id.into(), + label: label.into(), + fingerprint: None, + embedding: None, + qidx: 128, + thinking_style: None, + content: None, + properties: None, + created_at: chrono::Utc::now().timestamp_micros(), + version: 1, + } + } + + /// Set fingerprint + pub fn with_fingerprint(mut self, fp: &Fingerprint) -> Self { + self.fingerprint = Some(fp.to_bytes().to_vec()); + self + } + + /// Set embedding + pub fn with_embedding(mut self, emb: Vec) -> Self { + self.embedding = Some(emb); + self + } + + /// Set qualia index + pub fn with_qidx(mut self, qidx: u8) -> Self { + self.qidx = qidx; + self + } + + /// Set thinking style + pub fn with_thinking_style(mut self, style: Vec) -> Self { + self.thinking_style = Some(style); + self + } + + /// Set content + pub fn with_content(mut self, content: impl Into) -> Self { + self.content = Some(content.into()); + self + } + + /// Set properties (JSON) + pub fn with_properties(mut self, props: impl Into) -> Self { + self.properties = Some(props.into()); + self + } + + /// Convert to Arrow RecordBatch + pub fn to_record_batch(&self) -> Result { + Self::batch_to_record_batch(std::slice::from_ref(self)) + } + + /// Convert multiple nodes to Arrow RecordBatch + pub fn batch_to_record_batch(nodes: &[Self]) -> Result { + let schema = Arc::new(nodes_schema()); + + let ids: StringArray = 
nodes.iter().map(|n| Some(n.id.as_str())).collect();
+        let labels: StringArray = nodes.iter().map(|n| Some(n.label.as_str())).collect();
+
+        // Fingerprints
+        let fp_builder = FixedSizeBinaryBuilder::new(FINGERPRINT_BYTES as i32);
+        let mut fp_array = fp_builder;
+        for node in nodes {
+            if let Some(ref fp) = node.fingerprint {
+                fp_array.append_value(fp)?;
+            } else {
+                fp_array.append_null();
+            }
+        }
+        let fingerprints = fp_array.finish();
+
+        // Embeddings (FixedSizeList of Float32)
+        let embedding_values: Vec<Option<Vec<f32>>> = nodes.iter()
+            .map(|n| n.embedding.clone())
+            .collect();
+        let embeddings = create_fixed_size_list_f32(&embedding_values, EMBEDDING_DIM)?;
+
+        let qidxs: UInt8Array = nodes.iter().map(|n| Some(n.qidx)).collect();
+
+        // Thinking styles
+        let style_values: Vec<Option<Vec<f32>>> = nodes.iter()
+            .map(|n| n.thinking_style.clone())
+            .collect();
+        let thinking_styles = create_fixed_size_list_f32(&style_values, THINKING_STYLE_DIM)?;
+
+        let contents: StringArray = nodes.iter()
+            .map(|n| n.content.as_deref())
+            .collect();
+        let properties: StringArray = nodes.iter()
+            .map(|n| n.properties.as_deref())
+            .collect();
+        let created_ats: TimestampMicrosecondArray = nodes.iter()
+            .map(|n| Some(n.created_at))
+            .collect();
+        let versions: Int64Array = nodes.iter()
+            .map(|n| Some(n.version))
+            .collect();
+
+        Ok(RecordBatch::try_new(
+            schema,
+            vec![
+                Arc::new(ids),
+                Arc::new(labels),
+                Arc::new(fingerprints),
+                Arc::new(embeddings),
+                Arc::new(qidxs),
+                Arc::new(thinking_styles),
+                Arc::new(contents),
+                Arc::new(properties),
+                Arc::new(created_ats),
+                Arc::new(versions),
+            ],
+        )?)
+    }
+
+    /// Extract from RecordBatch at given row index
+    pub fn from_record_batch(batch: &RecordBatch, row: usize) -> Result<Self> {
+        let id = batch.column(0)
+            .as_any().downcast_ref::<StringArray>()
+            .unwrap().value(row).to_string();
+        let label = batch.column(1)
+            .as_any().downcast_ref::<StringArray>()
+            .unwrap().value(row).to_string();
+
+        let fp_col = batch.column(2)
+            .as_any().downcast_ref::<FixedSizeBinaryArray>()
+            .unwrap();
+        let fingerprint = if fp_col.is_null(row) {
+            None
+        } else {
+            Some(fp_col.value(row).to_vec())
+        };
+
+        let qidx = batch.column(4)
+            .as_any().downcast_ref::<UInt8Array>()
+            .unwrap().value(row);
+
+        let content_col = batch.column(6)
+            .as_any().downcast_ref::<StringArray>()
+            .unwrap();
+        let content = if content_col.is_null(row) {
+            None
+        } else {
+            Some(content_col.value(row).to_string())
+        };
+
+        let props_col = batch.column(7)
+            .as_any().downcast_ref::<StringArray>()
+            .unwrap();
+        let properties = if props_col.is_null(row) {
+            None
+        } else {
+            Some(props_col.value(row).to_string())
+        };
+
+        let created_at = batch.column(8)
+            .as_any().downcast_ref::<TimestampMicrosecondArray>()
+            .unwrap().value(row);
+        let version = batch.column(9)
+            .as_any().downcast_ref::<Int64Array>()
+            .unwrap().value(row);
+
+        Ok(Self {
+            id,
+            label,
+            fingerprint,
+            embedding: None,      // TODO: extract from FixedSizeList
+            qidx,
+            thinking_style: None, // TODO: extract from FixedSizeList
+            content,
+            properties,
+            created_at,
+            version,
+        })
+    }
+}
+
+/// Edge record for insert/query operations
+#[derive(Debug, Clone)]
+pub struct EdgeRecord {
+    pub id: String,
+    pub from_id: String,
+    pub to_id: String,
+    pub edge_type: String,
+    pub weight: f32,
+    pub amplification: f32,
+    pub properties: Option<String>,
+    pub created_at: i64,
+}
+
+impl EdgeRecord {
+    /// Create a new edge
+    pub fn new(from_id: impl Into<String>, to_id: impl Into<String>, edge_type: impl Into<String>) -> Self {
+        let from = from_id.into();
+        let to = to_id.into();
+        let etype = edge_type.into();
+        Self {
+            id: format!("{}->{}:{}", from, to, etype),
+            from_id: from,
+            to_id: to,
+            edge_type: etype,
+            weight: 1.0,
+            amplification: 1.0,
+            properties: None,
+            created_at: chrono::Utc::now().timestamp_micros(),
+        }
+    }
+
+    /// Set weight
+    pub fn with_weight(mut self, weight: f32) -> Self {
+        self.weight = weight;
+        self
+    }
+
+    /// Set amplification factor
+    pub fn with_amplification(mut self, amp: f32) -> Self {
+        self.amplification = amp;
+        self
+    }
+
+    /// Convert to RecordBatch
+    pub fn to_record_batch(&self) -> Result<RecordBatch> {
+        let schema = Arc::new(edges_schema());
+
+        let ids: StringArray = [Some(self.id.as_str())].into_iter().collect();
+        let from_ids: StringArray = [Some(self.from_id.as_str())].into_iter().collect();
+        let to_ids: StringArray = [Some(self.to_id.as_str())].into_iter().collect();
+        let types: StringArray = [Some(self.edge_type.as_str())].into_iter().collect();
+        let weights: Float32Array = [Some(self.weight)].into_iter().collect();
+        let amplifications: Float32Array = [Some(self.amplification)].into_iter().collect();
+        let properties: StringArray = [self.properties.as_deref()].into_iter().collect();
+        let created_ats: TimestampMicrosecondArray = [Some(self.created_at)].into_iter().collect();
+
+        Ok(RecordBatch::try_new(
+            schema,
+            vec![
+                Arc::new(ids),
+                Arc::new(from_ids),
+                Arc::new(to_ids),
+                Arc::new(types),
+                Arc::new(weights),
+                Arc::new(amplifications),
+                Arc::new(properties),
+                Arc::new(created_ats),
+            ],
+        )?)
+    }
+
+    /// Extract from RecordBatch
+    pub fn from_record_batch(batch: &RecordBatch, row: usize) -> Result<Self> {
+        Ok(Self {
+            id: batch.column(0).as_any().downcast_ref::<StringArray>().unwrap().value(row).to_string(),
+            from_id: batch.column(1).as_any().downcast_ref::<StringArray>().unwrap().value(row).to_string(),
+            to_id: batch.column(2).as_any().downcast_ref::<StringArray>().unwrap().value(row).to_string(),
+            edge_type: batch.column(3).as_any().downcast_ref::<StringArray>().unwrap().value(row).to_string(),
+            weight: batch.column(4).as_any().downcast_ref::<Float32Array>().unwrap().value(row),
+            amplification: batch.column(5).as_any().downcast_ref::<Float32Array>().unwrap().value(row),
+            properties: {
+                let col = batch.column(6).as_any().downcast_ref::<StringArray>().unwrap();
+                if col.is_null(row) { None } else { Some(col.value(row).to_string()) }
+            },
+            created_at: batch.column(7).as_any().downcast_ref::<TimestampMicrosecondArray>().unwrap().value(row),
+        })
+    }
+}
+
+// =============================================================================
+// HELPERS
+// =============================================================================
+
+/// Create a FixedSizeList array from optional vectors
+fn create_fixed_size_list_f32(values: &[Option<Vec<f32>>], size: usize) -> Result<FixedSizeListArray> {
+    let mut builder = FixedSizeListBuilder::new(Float32Builder::new(), size as i32);
+
+    for val in values {
+        if let Some(v) = val {
+            if v.len() != size {
+                return Err(Error::Storage(format!(
+                    "Expected {} elements, got {}", size, v.len()
+                )));
+            }
+            for &f in v {
+                builder.values().append_value(f);
+            }
+            builder.append(true);
+        } else {
+            for _ in 0..size {
+                builder.values().append_null();
+            }
+            builder.append(false);
+        }
+    }
+
+    Ok(builder.finish())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[tokio::test]
+    async fn test_node_record_roundtrip() {
+        let node = NodeRecord::new("test-1", "Thought")
+            .with_qidx(42)
+            .with_content("Hello world");
+
+        let batch = node.to_record_batch().unwrap();
+        assert_eq!(batch.num_rows(), 1);
+
+        let recovered = NodeRecord::from_record_batch(&batch, 0).unwrap();
+        assert_eq!(recovered.id, "test-1");
+        assert_eq!(recovered.label, "Thought");
+        assert_eq!(recovered.qidx, 42);
+        assert_eq!(recovered.content, Some("Hello world".to_string()));
+    }
+
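+    // A minimal sketch (not part of the original patch) exercising the
+    // `create_fixed_size_list_f32` helper directly. The expected null and
+    // length behaviour is an assumption read off the builder logic above:
+    // `append(false)` marks the entry null, length mismatches are rejected.
+    #[test]
+    fn test_fixed_size_list_helper() {
+        let values = vec![Some(vec![1.0_f32; THINKING_STYLE_DIM]), None];
+        let arr = create_fixed_size_list_f32(&values, THINKING_STYLE_DIM).unwrap();
+        assert_eq!(arr.len(), 2);
+        assert!(arr.is_valid(0));
+        assert!(arr.is_null(1));
+
+        // A vector of the wrong length must be rejected, not padded.
+        let bad = create_fixed_size_list_f32(&[Some(vec![0.0_f32; 3])], THINKING_STYLE_DIM);
+        assert!(bad.is_err());
+    }
+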
+    #[tokio::test]
+    async fn test_edge_record_roundtrip() {
+        let edge = EdgeRecord::new("a", "b", "CAUSES")
+            .with_weight(0.8)
+            .with_amplification(1.5);
+
+        let batch = edge.to_record_batch().unwrap();
+        let recovered = EdgeRecord::from_record_batch(&batch, 0).unwrap();
+
+        assert_eq!(recovered.from_id, "a");
+        assert_eq!(recovered.to_id, "b");
+        assert_eq!(recovered.edge_type, "CAUSES");
+        assert_eq!(recovered.weight, 0.8);
+        assert_eq!(recovered.amplification, 1.5);
+    }
+}

From a57ac0287cef2cbbaeb60c190f860ae4070f708e Mon Sep 17 00:00:00 2001
From: AdaWorldAPI
Date: Fri, 30 Jan 2026 08:25:27 +0100
Subject: [PATCH 2/9] Add Cypher parser and SQL transpiler

---
 src/query/cypher.rs | 1397 +++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 1397 insertions(+)
 create mode 100644 src/query/cypher.rs

diff --git a/src/query/cypher.rs b/src/query/cypher.rs
new file mode 100644
index 0000000..1a4fd0c
--- /dev/null
+++ b/src/query/cypher.rs
@@ -0,0 +1,1397 @@
+//! Cypher Parser and Transpiler
+//!
+//! Parses Cypher queries and transpiles them to SQL with recursive CTEs.
+//! This enables graph queries over the relational Lance storage.
+//!
+//! # Supported Cypher Features
+//!
+//! ```cypher
+//! // Simple pattern matching
+//! MATCH (a:Thought)-[:CAUSES]->(b:Thought)
+//! WHERE a.qidx > 100
+//! RETURN b
+//!
+//! // Variable-length paths (recursive CTE)
+//! MATCH (a)-[:CAUSES*1..5]->(b)
+//! WHERE a.id = 'start'
+//! RETURN b, path, amplification
+//!
+//! // Multiple relationships
+//! MATCH (a)-[:CAUSES|ENABLES]->(b)
+//! RETURN a, b
+//!
+//! // Create operations
+//! CREATE (a:Thought {content: 'Hello'})
+//! CREATE (a)-[:CAUSES {weight: 0.8}]->(b)
+//! ```
+
+use std::collections::HashMap;
+use crate::{Error, Result};
+
+// =============================================================================
+// AST TYPES
+// =============================================================================
+
+/// Parsed Cypher query
+#[derive(Debug, Clone)]
+pub struct CypherQuery {
+    pub query_type: QueryType,
+    pub match_clause: Option<MatchClause>,
+    pub where_clause: Option<WhereClause>,
+    pub return_clause: Option<ReturnClause>,
+    pub order_by: Option<OrderByClause>,
+    pub limit: Option<u64>,
+    pub skip: Option<u64>,
+    pub create_clause: Option<CreateClause>,
+    pub set_clause: Option<SetClause>,
+    pub delete_clause: Option<DeleteClause>,
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub enum QueryType {
+    Match,
+    Create,
+    Merge,
+    Delete,
+    Set,
+}
+
+/// MATCH clause: pattern to search for
+#[derive(Debug, Clone)]
+pub struct MatchClause {
+    pub patterns: Vec<Pattern>,
+}
+
+/// A graph pattern: (node)-[edge]->(node)...
+/// For example, `(a:Thought)-[:CAUSES*1..3]->(b)` parses into the element
+/// sequence `[Node(a), Edge(CAUSES, 1..3), Node(b)]`: nodes and edges
+/// always alternate, starting and ending with a node.
+#[derive(Debug, Clone)]
+pub struct Pattern {
+    pub elements: Vec<PatternElement>,
+}
+
+#[derive(Debug, Clone)]
+pub enum PatternElement {
+    Node(NodePattern),
+    Edge(EdgePattern),
+}
+
+/// Node pattern: (alias:Label {props})
+#[derive(Debug, Clone)]
+pub struct NodePattern {
+    pub alias: Option<String>,
+    pub labels: Vec<String>,
+    pub properties: HashMap<String, Value>,
+}
+
+/// Edge pattern: -[alias:TYPE*min..max {props}]->
+#[derive(Debug, Clone)]
+pub struct EdgePattern {
+    pub alias: Option<String>,
+    pub types: Vec<String>,
+    pub direction: EdgeDirection,
+    pub min_hops: u32,
+    pub max_hops: u32,
+    pub properties: HashMap<String, Value>,
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub enum EdgeDirection {
+    Outgoing,  // ->
+    Incoming,  // <-
+    Both,      // -
+}
+
+/// WHERE clause conditions
+#[derive(Debug, Clone)]
+pub struct WhereClause {
+    pub condition: Condition,
+}
+
+#[derive(Debug, Clone)]
+pub enum Condition {
+    Comparison {
+        left: Expr,
+        op: ComparisonOp,
+        right: Expr,
+    },
+    And(Box<Condition>, Box<Condition>),
+    Or(Box<Condition>, Box<Condition>),
+    Not(Box<Condition>),
+    IsNull(Expr),
+    IsNotNull(Expr),
+    In(Expr, Vec<Value>),
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub enum ComparisonOp {
+    Eq,  // =
+    Ne,  // <>
+    Lt,  // <
+    Le,  // <=
+    Gt,  // >
+    Ge,  // >=
+    Contains,
+    StartsWith,
+    EndsWith,
+}
+
+#[derive(Debug, Clone)]
+pub enum Expr {
+    Property { alias: String, property: String },
+    Literal(Value),
+    Function { name: String, args: Vec<Expr> },
+    Variable(String),
+}
+
+#[derive(Debug, Clone)]
+pub enum Value {
+    String(String),
+    Integer(i64),
+    Float(f64),
+    Boolean(bool),
+    Null,
+    List(Vec<Value>),
+}
+
+/// RETURN clause
+#[derive(Debug, Clone)]
+pub struct ReturnClause {
+    pub items: Vec<ReturnItem>,
+    pub distinct: bool,
+}
+
+#[derive(Debug, Clone)]
+pub struct ReturnItem {
+    pub expr: Expr,
+    pub alias: Option<String>,
+}
+
+/// ORDER BY clause
+#[derive(Debug, Clone)]
+pub struct OrderByClause {
+    pub items: Vec<OrderItem>,
+}
+
+#[derive(Debug, Clone)]
+pub struct OrderItem {
+    pub expr: Expr,
+    pub direction: SortDirection,
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub enum SortDirection {
+    Asc,
+    Desc,
+}
+
+/// CREATE clause
+#[derive(Debug, Clone)]
+pub struct CreateClause {
+    pub patterns: Vec<Pattern>,
+}
+
+/// SET clause
+#[derive(Debug, Clone)]
+pub struct SetClause {
+    pub items: Vec<SetItem>,
+}
+
+#[derive(Debug, Clone)]
+pub struct SetItem {
+    pub target: Expr,
+    pub value: Expr,
+}
+
+/// DELETE clause
+#[derive(Debug, Clone)]
+pub struct DeleteClause {
+    pub items: Vec<Expr>,
+    pub detach: bool,
+}
+
+// =============================================================================
+// PARSER
+// =============================================================================
+
+/// Cypher parser
+pub struct CypherParser {
+    tokens: Vec<Token>,
+    pos: usize,
+}
+
+#[derive(Debug, Clone, PartialEq)]
+enum Token {
+    // Keywords
+    Match,
+    Where,
+    Return,
+    Create,
+    Merge,
+    Delete,
+    Detach,
+    Set,
+    OrderBy,
+    Limit,
+    Skip,
+    And,
+    Or,
+    Not,
+    In,
+    Is,
+    Null,
+    Distinct,
+    As,
+    Asc,
+    Desc,
+    Contains,
+    StartsWith,
+    EndsWith,
+
+    // Symbols
+    LParen,
+    RParen,
+    LBracket,
+    RBracket,
+    LBrace,
+    RBrace,
+    Colon,
+    Comma,
+    Dot,
+    Pipe,
+    Star,
+    DotDot,
+    Arrow,      // ->
+    LeftArrow,  // <-
+    Dash,       // -
+
+    // Operators
+    Eq,
+    Ne,
+    Lt,
+    Le,
+    Gt,
+    Ge,
+
+    // Literals
+    Identifier(String),
+    StringLit(String),
+    IntLit(i64),
+    FloatLit(f64),
+    BoolLit(bool),
+
+    // End
+    Eof,
+}
+
+impl CypherParser {
+    /// Parse a Cypher query string
+    pub fn parse(input: &str) -> Result<CypherQuery> {
+        let tokens = Self::tokenize(input)?;
+        let mut parser = Self { tokens, pos: 0 };
+        parser.parse_query()
+    }
+
+    /// Tokenize input string
+    fn tokenize(input: &str) -> Result<Vec<Token>> {
+        let mut tokens = Vec::new();
+        let chars: Vec<char> = input.chars().collect();
+        let mut i = 0;
+
+        while i < chars.len() {
+            let c = chars[i];
+
+            // Skip whitespace
+            if c.is_whitespace() {
+                i += 1;
+                continue;
+            }
+
+            // Skip comments
+            if c == '/' && i + 1 < chars.len() && chars[i + 1] == '/' {
+                while i < chars.len() && chars[i] != '\n' {
+                    i += 1;
+                }
+                continue;
+            }
+
+            // Symbols
+            match c {
+                '(' => { tokens.push(Token::LParen); i += 1; continue; }
+                ')' => { tokens.push(Token::RParen); i += 1; continue; }
+                '[' => { tokens.push(Token::LBracket); i += 1; continue; }
+                ']' => { tokens.push(Token::RBracket); i += 1; continue; }
+                '{' => { tokens.push(Token::LBrace); i += 1; continue; }
+                '}' => { tokens.push(Token::RBrace); i += 1; continue; }
+                ':' => { tokens.push(Token::Colon); i += 1; continue; }
+                ',' => { tokens.push(Token::Comma); i += 1; continue; }
+                '|' => { tokens.push(Token::Pipe); i += 1; continue; }
+                '*' => { tokens.push(Token::Star); i += 1; continue; }
+                '=' => { tokens.push(Token::Eq); i += 1; continue; }
+                _ => {}
+            }
+
+            // Multi-char operators
+            // NOTE: '-' is always consumed here (as Arrow or Dash), so the
+            // negative-number branch below is unreachable; negative literals
+            // reach the parser as a Dash token followed by an IntLit.
+            if c == '-' {
+                if i + 1 < chars.len() && chars[i + 1] == '>' {
+                    tokens.push(Token::Arrow);
+                    i += 2;
+                    continue;
+                } else {
+                    tokens.push(Token::Dash);
+                    i += 1;
+                    continue;
+                }
+            }
+
+            if c == '<' {
+                if i + 1 < chars.len() {
+                    match chars[i + 1] {
+                        '-' => { tokens.push(Token::LeftArrow); i += 2; continue; }
+                        '=' => { tokens.push(Token::Le); i += 2; continue; }
+                        '>' => { tokens.push(Token::Ne); i += 2; continue; }
+                        _ => { tokens.push(Token::Lt); i += 1; continue; }
+                    }
+                } else {
+                    tokens.push(Token::Lt);
+                    i += 1;
+                    continue;
+                }
+            }
+
+            if c == '>' {
+                if i + 1 < chars.len() && chars[i + 1] == '=' {
+                    tokens.push(Token::Ge);
+                    i += 2;
+                    continue;
+                } else {
+                    tokens.push(Token::Gt);
+                    i += 1;
+                    continue;
+                }
+            }
+
+            if c == '.' {
+                if i + 1 < chars.len() && chars[i + 1] == '.'
{ + tokens.push(Token::DotDot); + i += 2; + continue; + } else { + tokens.push(Token::Dot); + i += 1; + continue; + } + } + + // String literals + if c == '\'' || c == '"' { + let quote = c; + i += 1; + let start = i; + while i < chars.len() && chars[i] != quote { + if chars[i] == '\\' && i + 1 < chars.len() { + i += 2; + } else { + i += 1; + } + } + let s: String = chars[start..i].iter().collect(); + tokens.push(Token::StringLit(s)); + i += 1; // skip closing quote + continue; + } + + // Numbers + if c.is_ascii_digit() || (c == '-' && i + 1 < chars.len() && chars[i + 1].is_ascii_digit()) { + let start = i; + if c == '-' { i += 1; } + while i < chars.len() && (chars[i].is_ascii_digit() || chars[i] == '.') { + i += 1; + } + let num_str: String = chars[start..i].iter().collect(); + if num_str.contains('.') { + tokens.push(Token::FloatLit(num_str.parse().unwrap())); + } else { + tokens.push(Token::IntLit(num_str.parse().unwrap())); + } + continue; + } + + // Identifiers and keywords + if c.is_alphabetic() || c == '_' { + let start = i; + while i < chars.len() && (chars[i].is_alphanumeric() || chars[i] == '_') { + i += 1; + } + let word: String = chars[start..i].iter().collect(); + let token = match word.to_uppercase().as_str() { + "MATCH" => Token::Match, + "WHERE" => Token::Where, + "RETURN" => Token::Return, + "CREATE" => Token::Create, + "MERGE" => Token::Merge, + "DELETE" => Token::Delete, + "DETACH" => Token::Detach, + "SET" => Token::Set, + "ORDER" => { + // Check for ORDER BY + while i < chars.len() && chars[i].is_whitespace() { i += 1; } + if i + 1 < chars.len() { + let by_start = i; + while i < chars.len() && chars[i].is_alphabetic() { i += 1; } + let by_word: String = chars[by_start..i].iter().collect(); + if by_word.to_uppercase() == "BY" { + Token::OrderBy + } else { + i = by_start; // reset + Token::Identifier(word) + } + } else { + Token::Identifier(word) + } + } + "BY" => Token::Identifier(word), // handled in ORDER + "LIMIT" => Token::Limit, + "SKIP" => Token::Skip, + "AND" => Token::And, + "OR" => Token::Or, + "NOT" => Token::Not, + "IN" => Token::In, + "IS" => Token::Is, + "NULL" => Token::Null, + "DISTINCT" => Token::Distinct, + "AS" => Token::As, + "ASC" => Token::Asc, + "DESC" => Token::Desc, + "CONTAINS" => Token::Contains, + "STARTS" => { + // STARTS WITH + while i < chars.len() && chars[i].is_whitespace() { i += 1; } + let with_start = i; + while i < chars.len() && chars[i].is_alphabetic() { i += 1; } + let with_word: String = chars[with_start..i].iter().collect(); + if with_word.to_uppercase() == "WITH" { + Token::StartsWith + } else { + i = with_start; + Token::Identifier(word) + } + } + "ENDS" => { + // ENDS WITH + while i < chars.len() && chars[i].is_whitespace() { i += 1; } + let with_start = i; + while i < chars.len() && chars[i].is_alphabetic() { i += 1; } + let with_word: String = chars[with_start..i].iter().collect(); + if with_word.to_uppercase() == "WITH" { + Token::EndsWith + } else { + i = with_start; + Token::Identifier(word) + } + } + "TRUE" => Token::BoolLit(true), + "FALSE" => Token::BoolLit(false), + _ => Token::Identifier(word), + }; + tokens.push(token); + continue; + } + + return Err(Error::Query(format!("Unexpected character: {}", c))); + } + + tokens.push(Token::Eof); + Ok(tokens) + } + + fn current(&self) -> &Token { + &self.tokens[self.pos] + } + + fn advance(&mut self) -> Token { + let t = self.tokens[self.pos].clone(); + if self.pos < self.tokens.len() - 1 { + self.pos += 1; + } + t + } + + fn expect(&mut self, expected: Token) -> Result<()> 
{ + if std::mem::discriminant(self.current()) == std::mem::discriminant(&expected) { + self.advance(); + Ok(()) + } else { + Err(Error::Query(format!( + "Expected {:?}, got {:?}", expected, self.current() + ))) + } + } + + fn parse_query(&mut self) -> Result { + let mut query = CypherQuery { + query_type: QueryType::Match, + match_clause: None, + where_clause: None, + return_clause: None, + order_by: None, + limit: None, + skip: None, + create_clause: None, + set_clause: None, + delete_clause: None, + }; + + match self.current() { + Token::Match => { + query.query_type = QueryType::Match; + self.advance(); + query.match_clause = Some(self.parse_match()?); + } + Token::Create => { + query.query_type = QueryType::Create; + self.advance(); + query.create_clause = Some(self.parse_create()?); + } + _ => return Err(Error::Query("Expected MATCH or CREATE".into())), + } + + // Optional WHERE + if matches!(self.current(), Token::Where) { + self.advance(); + query.where_clause = Some(self.parse_where()?); + } + + // Optional RETURN + if matches!(self.current(), Token::Return) { + self.advance(); + query.return_clause = Some(self.parse_return()?); + } + + // Optional ORDER BY + if matches!(self.current(), Token::OrderBy) { + self.advance(); + query.order_by = Some(self.parse_order_by()?); + } + + // Optional LIMIT + if matches!(self.current(), Token::Limit) { + self.advance(); + if let Token::IntLit(n) = self.advance() { + query.limit = Some(n as u64); + } + } + + // Optional SKIP + if matches!(self.current(), Token::Skip) { + self.advance(); + if let Token::IntLit(n) = self.advance() { + query.skip = Some(n as u64); + } + } + + Ok(query) + } + + fn parse_match(&mut self) -> Result { + let patterns = vec![self.parse_pattern()?]; + Ok(MatchClause { patterns }) + } + + fn parse_pattern(&mut self) -> Result { + let mut elements = Vec::new(); + + // First element must be a node + elements.push(PatternElement::Node(self.parse_node_pattern()?)); + + // Then alternating edges and nodes + loop { + if self.is_edge_start() { + elements.push(PatternElement::Edge(self.parse_edge_pattern()?)); + elements.push(PatternElement::Node(self.parse_node_pattern()?)); + } else { + break; + } + } + + Ok(Pattern { elements }) + } + + fn is_edge_start(&self) -> bool { + matches!(self.current(), Token::Dash | Token::LeftArrow) + } + + fn parse_node_pattern(&mut self) -> Result { + self.expect(Token::LParen)?; + + let mut node = NodePattern { + alias: None, + labels: Vec::new(), + properties: HashMap::new(), + }; + + // Optional alias + if let Token::Identifier(id) = self.current() { + node.alias = Some(id.clone()); + self.advance(); + } + + // Optional labels + while matches!(self.current(), Token::Colon) { + self.advance(); + if let Token::Identifier(label) = self.advance() { + node.labels.push(label); + } + } + + // Optional properties + if matches!(self.current(), Token::LBrace) { + node.properties = self.parse_properties()?; + } + + self.expect(Token::RParen)?; + Ok(node) + } + + fn parse_edge_pattern(&mut self) -> Result { + let mut edge = EdgePattern { + alias: None, + types: Vec::new(), + direction: EdgeDirection::Outgoing, + min_hops: 1, + max_hops: 1, + properties: HashMap::new(), + }; + + // Direction start + if matches!(self.current(), Token::LeftArrow) { + edge.direction = EdgeDirection::Incoming; + self.advance(); + } else { + self.expect(Token::Dash)?; + } + + // Edge details [...] 
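+        // e.g. `[r:CAUSES|ENABLES*1..5 {weight: 0.8}]`: an optional alias,
+        // then `|`-separated types, an optional `*min..max` hop range, then
+        // optional `{...}` properties, parsed in that order below.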
+ if matches!(self.current(), Token::LBracket) { + self.advance(); + + // Optional alias + if let Token::Identifier(id) = self.current() { + edge.alias = Some(id.clone()); + self.advance(); + } + + // Optional types + while matches!(self.current(), Token::Colon | Token::Pipe) { + if matches!(self.current(), Token::Pipe) { + self.advance(); + } else { + self.advance(); // colon + } + if let Token::Identifier(t) = self.advance() { + edge.types.push(t); + } + } + + // Optional variable length *min..max + if matches!(self.current(), Token::Star) { + self.advance(); + + // min + if let Token::IntLit(n) = self.current() { + edge.min_hops = *n as u32; + self.advance(); + } + + // ..max + if matches!(self.current(), Token::DotDot) { + self.advance(); + if let Token::IntLit(n) = self.current() { + edge.max_hops = *n as u32; + self.advance(); + } else { + edge.max_hops = 10; // default max + } + } else { + edge.max_hops = edge.min_hops; + } + } + + // Optional properties + if matches!(self.current(), Token::LBrace) { + edge.properties = self.parse_properties()?; + } + + self.expect(Token::RBracket)?; + } + + // Direction end + if edge.direction == EdgeDirection::Incoming { + self.expect(Token::Dash)?; + } else if matches!(self.current(), Token::Arrow) { + self.advance(); + } else { + self.expect(Token::Dash)?; + edge.direction = EdgeDirection::Both; + } + + Ok(edge) + } + + fn parse_properties(&mut self) -> Result> { + self.expect(Token::LBrace)?; + let mut props = HashMap::new(); + + loop { + if matches!(self.current(), Token::RBrace) { + break; + } + + // key: value + let key = if let Token::Identifier(k) = self.advance() { + k + } else { + return Err(Error::Query("Expected property key".into())); + }; + + self.expect(Token::Colon)?; + let value = self.parse_value()?; + props.insert(key, value); + + if matches!(self.current(), Token::Comma) { + self.advance(); + } else { + break; + } + } + + self.expect(Token::RBrace)?; + Ok(props) + } + + fn parse_value(&mut self) -> Result { + match self.advance() { + Token::StringLit(s) => Ok(Value::String(s)), + Token::IntLit(n) => Ok(Value::Integer(n)), + Token::FloatLit(f) => Ok(Value::Float(f)), + Token::BoolLit(b) => Ok(Value::Boolean(b)), + Token::Null => Ok(Value::Null), + t => Err(Error::Query(format!("Expected value, got {:?}", t))), + } + } + + fn parse_where(&mut self) -> Result { + let condition = self.parse_condition()?; + Ok(WhereClause { condition }) + } + + fn parse_condition(&mut self) -> Result { + let mut left = self.parse_comparison()?; + + loop { + match self.current() { + Token::And => { + self.advance(); + let right = self.parse_comparison()?; + left = Condition::And(Box::new(left), Box::new(right)); + } + Token::Or => { + self.advance(); + let right = self.parse_comparison()?; + left = Condition::Or(Box::new(left), Box::new(right)); + } + _ => break, + } + } + + Ok(left) + } + + fn parse_comparison(&mut self) -> Result { + let left = self.parse_expr()?; + + let op = match self.current() { + Token::Eq => ComparisonOp::Eq, + Token::Ne => ComparisonOp::Ne, + Token::Lt => ComparisonOp::Lt, + Token::Le => ComparisonOp::Le, + Token::Gt => ComparisonOp::Gt, + Token::Ge => ComparisonOp::Ge, + Token::Contains => ComparisonOp::Contains, + Token::StartsWith => ComparisonOp::StartsWith, + Token::EndsWith => ComparisonOp::EndsWith, + Token::Is => { + self.advance(); + if matches!(self.current(), Token::Not) { + self.advance(); + self.expect(Token::Null)?; + return Ok(Condition::IsNotNull(left)); + } else { + self.expect(Token::Null)?; + return 
Ok(Condition::IsNull(left)); + } + } + _ => return Ok(Condition::Comparison { + left: left.clone(), + op: ComparisonOp::Eq, + right: Expr::Literal(Value::Boolean(true)), + }), + }; + + self.advance(); + let right = self.parse_expr()?; + + Ok(Condition::Comparison { left, op, right }) + } + + fn parse_expr(&mut self) -> Result { + match self.current().clone() { + Token::Identifier(name) => { + self.advance(); + if matches!(self.current(), Token::Dot) { + self.advance(); + if let Token::Identifier(prop) = self.advance() { + Ok(Expr::Property { alias: name, property: prop }) + } else { + Err(Error::Query("Expected property name".into())) + } + } else if matches!(self.current(), Token::LParen) { + // Function call + self.advance(); + let mut args = Vec::new(); + while !matches!(self.current(), Token::RParen) { + args.push(self.parse_expr()?); + if matches!(self.current(), Token::Comma) { + self.advance(); + } + } + self.expect(Token::RParen)?; + Ok(Expr::Function { name, args }) + } else { + Ok(Expr::Variable(name)) + } + } + Token::StringLit(s) => { + self.advance(); + Ok(Expr::Literal(Value::String(s))) + } + Token::IntLit(n) => { + self.advance(); + Ok(Expr::Literal(Value::Integer(n))) + } + Token::FloatLit(f) => { + self.advance(); + Ok(Expr::Literal(Value::Float(f))) + } + Token::BoolLit(b) => { + self.advance(); + Ok(Expr::Literal(Value::Boolean(b))) + } + Token::Null => { + self.advance(); + Ok(Expr::Literal(Value::Null)) + } + _ => Err(Error::Query(format!("Unexpected token in expression: {:?}", self.current()))), + } + } + + fn parse_return(&mut self) -> Result { + let distinct = if matches!(self.current(), Token::Distinct) { + self.advance(); + true + } else { + false + }; + + let mut items = Vec::new(); + loop { + let expr = self.parse_expr()?; + let alias = if matches!(self.current(), Token::As) { + self.advance(); + if let Token::Identifier(a) = self.advance() { + Some(a) + } else { + None + } + } else { + None + }; + items.push(ReturnItem { expr, alias }); + + if matches!(self.current(), Token::Comma) { + self.advance(); + } else { + break; + } + } + + Ok(ReturnClause { items, distinct }) + } + + fn parse_order_by(&mut self) -> Result { + let mut items = Vec::new(); + + loop { + let expr = self.parse_expr()?; + let direction = match self.current() { + Token::Desc => { self.advance(); SortDirection::Desc } + Token::Asc => { self.advance(); SortDirection::Asc } + _ => SortDirection::Asc, + }; + items.push(OrderItem { expr, direction }); + + if matches!(self.current(), Token::Comma) { + self.advance(); + } else { + break; + } + } + + Ok(OrderByClause { items }) + } + + fn parse_create(&mut self) -> Result { + let patterns = vec![self.parse_pattern()?]; + Ok(CreateClause { patterns }) + } +} + +// ============================================================================= +// TRANSPILER (Cypher → SQL) +// ============================================================================= + +/// Transpile Cypher AST to SQL +pub struct CypherTranspiler; + +impl CypherTranspiler { + /// Transpile a Cypher query to SQL + pub fn transpile(query: &CypherQuery) -> Result { + match query.query_type { + QueryType::Match => Self::transpile_match(query), + QueryType::Create => Self::transpile_create(query), + _ => Err(Error::Query("Unsupported query type".into())), + } + } + + fn transpile_match(query: &CypherQuery) -> Result { + let match_clause = query.match_clause.as_ref() + .ok_or_else(|| Error::Query("Missing MATCH clause".into()))?; + + let pattern = &match_clause.patterns[0]; + + // 
Determine if we need recursive CTE + let needs_recursive = pattern.elements.iter().any(|e| { + if let PatternElement::Edge(edge) = e { + edge.max_hops > 1 + } else { + false + } + }); + + if needs_recursive { + Self::transpile_recursive_match(query, pattern) + } else { + Self::transpile_simple_match(query, pattern) + } + } + + fn transpile_simple_match(query: &CypherQuery, pattern: &Pattern) -> Result { + let mut sql = String::new(); + let mut tables = Vec::new(); + let mut joins = Vec::new(); + let mut where_parts = Vec::new(); + + let mut node_idx = 0; + let mut edge_idx = 0; + + for element in &pattern.elements { + match element { + PatternElement::Node(node) => { + let alias = node.alias.clone() + .unwrap_or_else(|| format!("n{}", node_idx)); + + if node_idx == 0 { + tables.push(format!("nodes AS {}", alias)); + } + + // Label filter + if !node.labels.is_empty() { + where_parts.push(format!( + "{}.label = '{}'", + alias, + node.labels[0] + )); + } + + // Property filters + for (key, value) in &node.properties { + where_parts.push(format!( + "{}.{} = {}", + alias, + key, + Self::value_to_sql(value) + )); + } + + node_idx += 1; + } + PatternElement::Edge(edge) => { + let edge_alias = edge.alias.clone() + .unwrap_or_else(|| format!("e{}", edge_idx)); + let prev_node_alias = pattern.elements.get(node_idx * 2 - 2) + .and_then(|e| if let PatternElement::Node(n) = e { n.alias.clone() } else { None }) + .unwrap_or_else(|| format!("n{}", node_idx - 1)); + let next_node_alias = format!("n{}", node_idx); + + // Join edge table + let (from_col, to_col) = match edge.direction { + EdgeDirection::Outgoing => ("from_id", "to_id"), + EdgeDirection::Incoming => ("to_id", "from_id"), + EdgeDirection::Both => ("from_id", "to_id"), // simplified + }; + + joins.push(format!( + "JOIN edges AS {} ON {}.id = {}.{}", + edge_alias, prev_node_alias, edge_alias, from_col + )); + joins.push(format!( + "JOIN nodes AS {} ON {}.{} = {}.id", + next_node_alias, edge_alias, to_col, next_node_alias + )); + + // Edge type filter + if !edge.types.is_empty() { + let types_sql = edge.types.iter() + .map(|t| format!("'{}'", t)) + .collect::>() + .join(", "); + where_parts.push(format!("{}.type IN ({})", edge_alias, types_sql)); + } + + edge_idx += 1; + } + } + } + + // Build SELECT clause + let select_cols = if let Some(ref ret) = query.return_clause { + ret.items.iter() + .map(|item| Self::expr_to_sql(&item.expr)) + .collect::>() + .join(", ") + } else { + "*".to_string() + }; + + sql.push_str(&format!("SELECT {}\n", select_cols)); + sql.push_str(&format!("FROM {}\n", tables.join(", "))); + + for join in joins { + sql.push_str(&join); + sql.push('\n'); + } + + // WHERE clause + if let Some(ref where_clause) = query.where_clause { + where_parts.push(Self::condition_to_sql(&where_clause.condition)); + } + + if !where_parts.is_empty() { + sql.push_str(&format!("WHERE {}\n", where_parts.join(" AND "))); + } + + // ORDER BY + if let Some(ref order) = query.order_by { + let order_sql = order.items.iter() + .map(|item| { + let dir = if item.direction == SortDirection::Desc { "DESC" } else { "ASC" }; + format!("{} {}", Self::expr_to_sql(&item.expr), dir) + }) + .collect::>() + .join(", "); + sql.push_str(&format!("ORDER BY {}\n", order_sql)); + } + + // LIMIT + if let Some(limit) = query.limit { + sql.push_str(&format!("LIMIT {}\n", limit)); + } + + // OFFSET (SKIP) + if let Some(skip) = query.skip { + sql.push_str(&format!("OFFSET {}\n", skip)); + } + + Ok(sql) + } + + fn transpile_recursive_match(query: &CypherQuery, pattern: 
&Pattern) -> Result { + // Extract start node, edge, and end node + let (start_node, edge, end_node) = Self::extract_path_pattern(pattern)?; + + let edge_type_filter = if !edge.types.is_empty() { + format!( + "AND e.type IN ({})", + edge.types.iter().map(|t| format!("'{}'", t)).collect::>().join(", ") + ) + } else { + String::new() + }; + + // Start condition + let start_where = if !start_node.labels.is_empty() { + format!("WHERE label = '{}'", start_node.labels[0]) + } else { + String::new() + }; + + // Build recursive CTE + let sql = format!(r#" +WITH RECURSIVE traverse AS ( + -- Base case: start nodes + SELECT + id, + ARRAY[id] as path, + 1.0 as amplification, + 0 as depth + FROM nodes + {start_where} + + UNION ALL + + -- Recursive case: follow edges + SELECT + n.id, + t.path || n.id, + t.amplification * COALESCE(e.amplification, e.weight, 1.0), + t.depth + 1 + FROM traverse t + JOIN edges e ON t.id = e.from_id {edge_type_filter} + JOIN nodes n ON e.to_id = n.id + WHERE t.depth < {max_depth} + AND n.id != ALL(t.path) -- Cycle detection +) +SELECT t.*, n.* +FROM traverse t +JOIN nodes n ON t.id = n.id +WHERE t.depth >= {min_depth} +{end_label_filter} +{user_where} +ORDER BY t.depth, t.amplification DESC +{limit} +"#, + start_where = start_where, + edge_type_filter = edge_type_filter, + max_depth = edge.max_hops, + min_depth = edge.min_hops, + end_label_filter = if let Some(ref end) = end_node { + if !end.labels.is_empty() { + format!(" AND n.label = '{}'", end.labels[0]) + } else { + String::new() + } + } else { + String::new() + }, + user_where = if let Some(ref w) = query.where_clause { + format!(" AND ({})", Self::condition_to_sql(&w.condition)) + } else { + String::new() + }, + limit = query.limit.map(|l| format!("LIMIT {}", l)).unwrap_or_default(), + ); + + Ok(sql) + } + + fn extract_path_pattern(pattern: &Pattern) -> Result<(NodePattern, EdgePattern, Option)> { + let start = match pattern.elements.first() { + Some(PatternElement::Node(n)) => n.clone(), + _ => return Err(Error::Query("Pattern must start with a node".into())), + }; + + let edge = match pattern.elements.get(1) { + Some(PatternElement::Edge(e)) => e.clone(), + _ => return Err(Error::Query("Pattern must have an edge".into())), + }; + + let end = match pattern.elements.get(2) { + Some(PatternElement::Node(n)) => Some(n.clone()), + _ => None, + }; + + Ok((start, edge, end)) + } + + fn transpile_create(query: &CypherQuery) -> Result { + let create_clause = query.create_clause.as_ref() + .ok_or_else(|| Error::Query("Missing CREATE clause".into()))?; + + let mut sql = String::new(); + + for pattern in &create_clause.patterns { + for element in &pattern.elements { + match element { + PatternElement::Node(node) => { + let id = node.properties.get("id") + .map(|v| Self::value_to_sql(v)) + .unwrap_or_else(|| format!("'{}'", uuid::Uuid::new_v4())); + + let label = node.labels.first() + .map(|l| format!("'{}'", l)) + .unwrap_or_else(|| "'Node'".to_string()); + + let props = serde_json::to_string(&node.properties) + .unwrap_or_else(|_| "{}".to_string()); + + sql.push_str(&format!( + "INSERT INTO nodes (id, label, properties) VALUES ({}, {}, '{}');\n", + id, label, props + )); + } + PatternElement::Edge(edge) => { + // Edge creation requires knowing the from/to node IDs + // This is simplified - real implementation needs alias resolution + } + } + } + } + + Ok(sql) + } + + fn value_to_sql(value: &Value) -> String { + match value { + Value::String(s) => format!("'{}'", s.replace('\'', "''")), + Value::Integer(n) => n.to_string(), + 
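+            // The arm below relies on Rust's f64 Display output (e.g. `0.8`)
+            // being valid SQL numeric syntax; non-finite values (NaN/inf)
+            // would not be, so finite floats are assumed here.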
Value::Float(f) => f.to_string(), + Value::Boolean(b) => if *b { "TRUE" } else { "FALSE" }.to_string(), + Value::Null => "NULL".to_string(), + Value::List(items) => { + let vals = items.iter().map(Self::value_to_sql).collect::>().join(", "); + format!("ARRAY[{}]", vals) + } + } + } + + fn expr_to_sql(expr: &Expr) -> String { + match expr { + Expr::Property { alias, property } => format!("{}.{}", alias, property), + Expr::Literal(v) => Self::value_to_sql(v), + Expr::Variable(v) => format!("{}.*", v), + Expr::Function { name, args } => { + let args_sql = args.iter().map(Self::expr_to_sql).collect::>().join(", "); + format!("{}({})", name, args_sql) + } + } + } + + fn condition_to_sql(cond: &Condition) -> String { + match cond { + Condition::Comparison { left, op, right } => { + let op_str = match op { + ComparisonOp::Eq => "=", + ComparisonOp::Ne => "<>", + ComparisonOp::Lt => "<", + ComparisonOp::Le => "<=", + ComparisonOp::Gt => ">", + ComparisonOp::Ge => ">=", + ComparisonOp::Contains => "LIKE", + ComparisonOp::StartsWith => "LIKE", + ComparisonOp::EndsWith => "LIKE", + }; + + let right_sql = match op { + ComparisonOp::Contains => { + if let Expr::Literal(Value::String(s)) = right { + format!("'%{}%'", s) + } else { + Self::expr_to_sql(right) + } + } + ComparisonOp::StartsWith => { + if let Expr::Literal(Value::String(s)) = right { + format!("'{}%'", s) + } else { + Self::expr_to_sql(right) + } + } + ComparisonOp::EndsWith => { + if let Expr::Literal(Value::String(s)) = right { + format!("'%{}'", s) + } else { + Self::expr_to_sql(right) + } + } + _ => Self::expr_to_sql(right), + }; + + format!("{} {} {}", Self::expr_to_sql(left), op_str, right_sql) + } + Condition::And(left, right) => { + format!("({}) AND ({})", Self::condition_to_sql(left), Self::condition_to_sql(right)) + } + Condition::Or(left, right) => { + format!("({}) OR ({})", Self::condition_to_sql(left), Self::condition_to_sql(right)) + } + Condition::Not(inner) => { + format!("NOT ({})", Self::condition_to_sql(inner)) + } + Condition::IsNull(expr) => { + format!("{} IS NULL", Self::expr_to_sql(expr)) + } + Condition::IsNotNull(expr) => { + format!("{} IS NOT NULL", Self::expr_to_sql(expr)) + } + Condition::In(expr, values) => { + let vals = values.iter().map(Self::value_to_sql).collect::>().join(", "); + format!("{} IN ({})", Self::expr_to_sql(expr), vals) + } + } + } +} + +// ============================================================================= +// PUBLIC API +// ============================================================================= + +/// Parse and transpile Cypher to SQL +pub fn cypher_to_sql(cypher: &str) -> Result { + let query = CypherParser::parse(cypher)?; + CypherTranspiler::transpile(&query) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_simple_match() { + let cypher = "MATCH (a:Thought)-[:CAUSES]->(b:Thought) RETURN b"; + let sql = cypher_to_sql(cypher).unwrap(); + assert!(sql.contains("SELECT")); + assert!(sql.contains("JOIN edges")); + assert!(sql.contains("type IN ('CAUSES')")); + } + + #[test] + fn test_variable_length() { + let cypher = "MATCH (a)-[:CAUSES*1..5]->(b) RETURN b"; + let sql = cypher_to_sql(cypher).unwrap(); + assert!(sql.contains("WITH RECURSIVE")); + assert!(sql.contains("depth < 5")); + } + + #[test] + fn test_where_clause() { + let cypher = "MATCH (a:Thought) WHERE a.qidx > 100 RETURN a"; + let sql = cypher_to_sql(cypher).unwrap(); + assert!(sql.contains("a.qidx > 100")); + } + + #[test] + fn test_multi_type_edge() { + let cypher = "MATCH 
(a)-[:CAUSES|ENABLES]->(b) RETURN b"; + let sql = cypher_to_sql(cypher).unwrap(); + assert!(sql.contains("'CAUSES'")); + assert!(sql.contains("'ENABLES'")); + } +} From f95b3247e4d236890af4abdfdf585795adb7b21c Mon Sep 17 00:00:00 2001 From: AdaWorldAPI Date: Fri, 30 Jan 2026 08:25:32 +0100 Subject: [PATCH 3/9] Add DataFusion SQL engine with Hamming UDFs --- src/query/datafusion.rs | 762 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 762 insertions(+) create mode 100644 src/query/datafusion.rs diff --git a/src/query/datafusion.rs b/src/query/datafusion.rs new file mode 100644 index 0000000..37e0616 --- /dev/null +++ b/src/query/datafusion.rs @@ -0,0 +1,762 @@ +//! DataFusion SQL Execution Engine +//! +//! Integrates Apache DataFusion for SQL query execution over Lance tables. +//! Registers custom UDFs for Hamming distance and similarity. +//! +//! # Architecture +//! +//! ```text +//! ┌─────────────────────────────────────────────────────────────────┐ +//! │ DATAFUSION EXECUTION │ +//! ├─────────────────────────────────────────────────────────────────┤ +//! │ │ +//! │ Query → Parser → Logical Plan → Physical Plan → Execution │ +//! │ │ +//! │ Custom UDFs: │ +//! │ - hamming(a, b) → distance (0-10000) │ +//! │ - similarity(a, b) → similarity (0.0-1.0) │ +//! │ - popcount(x) → count of set bits │ +//! │ - xor_bind(a, b) → XOR of two fingerprints │ +//! │ │ +//! │ Data Sources: │ +//! │ - Lance tables (nodes, edges, sessions) │ +//! │ - In-memory Arrow batches │ +//! │ │ +//! └─────────────────────────────────────────────────────────────────┘ +//! ``` + +use arrow::array::*; +use arrow::datatypes::DataType; +use arrow::record_batch::RecordBatch; +use datafusion::prelude::*; +use datafusion::execution::context::SessionContext; +use datafusion::logical_expr::{ScalarUDF, Signature, TypeSignature, Volatility}; +use std::sync::Arc; + +use crate::core::DIM; +use crate::{Error, Result}; + +// ============================================================================= +// HAMMING OPERATIONS (Pure functions for UDFs) +// ============================================================================= + +/// Compute Hamming distance between two byte slices +fn hamming_distance_bytes(a: &[u8], b: &[u8]) -> u32 { + let min_len = a.len().min(b.len()); + let mut dist: u32 = 0; + + // Process 8 bytes at a time for SIMD-friendly access + let chunks = min_len / 8; + for i in 0..chunks { + let offset = i * 8; + let a_u64 = u64::from_le_bytes(a[offset..offset + 8].try_into().unwrap()); + let b_u64 = u64::from_le_bytes(b[offset..offset + 8].try_into().unwrap()); + dist += (a_u64 ^ b_u64).count_ones(); + } + + // Handle remaining bytes + for i in (chunks * 8)..min_len { + dist += (a[i] ^ b[i]).count_ones(); + } + + dist +} + +/// Compute similarity from Hamming distance +fn hamming_similarity(dist: u32) -> f32 { + 1.0 - (dist as f32 / DIM as f32) +} + +/// XOR bind two fingerprints +fn xor_bind_bytes(a: &[u8], b: &[u8]) -> Vec { + a.iter().zip(b.iter()).map(|(x, y)| x ^ y).collect() +} + +// ============================================================================= +// EXECUTION CONTEXT +// ============================================================================= + +/// DataFusion-based SQL execution engine +pub struct SqlEngine { + /// DataFusion session context + ctx: SessionContext, + /// Path to database (for Lance table registration) + db_path: Option, +} + +impl SqlEngine { + /// Create a new SQL engine + pub async fn new() -> Self { + let ctx = SessionContext::new(); + let mut engine = Self 
+
+    /// Create engine with database path for Lance tables
+    pub async fn with_database(path: impl Into<String>) -> Result<Self> {
+        let mut engine = Self::new().await;
+        let db_path = path.into();
+        engine.db_path = Some(db_path.clone());
+
+        // Register Lance tables
+        engine.register_lance_tables(&db_path).await?;
+
+        Ok(engine)
+    }
+
+    /// Register all custom UDFs
+    fn register_udfs(&mut self) {
+        // hamming(a, b) -> distance
+        self.ctx.register_udf(create_hamming_udf());
+
+        // similarity(a, b) -> 0.0-1.0
+        self.ctx.register_udf(create_similarity_udf());
+
+        // popcount(x) -> count
+        self.ctx.register_udf(create_popcount_udf());
+
+        // xor_bind(a, b) -> fingerprint
+        self.ctx.register_udf(create_xor_bind_udf());
+    }
+
+    /// Register Lance tables as DataFusion tables
+    async fn register_lance_tables(&mut self, db_path: &str) -> Result<()> {
+        use lance::dataset::Dataset;
+        use datafusion::datasource::MemTable;
+
+        // Register nodes table
+        let nodes_path = format!("{}/nodes.lance", db_path);
+        if std::path::Path::new(&nodes_path).exists() {
+            let dataset = Dataset::open(&nodes_path).await?;
+            let schema = dataset.schema().clone();
+
+            // Read all data into memory (for now - TODO: use Lance TableProvider)
+            let batches = dataset
+                .scan()
+                .try_into_stream()
+                .await?;
+
+            use futures::StreamExt;
+            let mut all_batches = Vec::new();
+            let mut stream = batches;
+            while let Some(batch) = stream.next().await {
+                all_batches.push(batch?);
+            }
+
+            if !all_batches.is_empty() {
+                let table = MemTable::try_new(Arc::new(schema.into()), vec![all_batches])?;
+                self.ctx.register_table("nodes", Arc::new(table))?;
+            }
+        }
+
+        // Register edges table
+        let edges_path = format!("{}/edges.lance", db_path);
+        if std::path::Path::new(&edges_path).exists() {
+            let dataset = Dataset::open(&edges_path).await?;
+            let schema = dataset.schema().clone();
+
+            let batches = dataset
+                .scan()
+                .try_into_stream()
+                .await?;
+
+            use futures::StreamExt;
+            let mut all_batches = Vec::new();
+            let mut stream = batches;
+            while let Some(batch) = stream.next().await {
+                all_batches.push(batch?);
+            }
+
+            if !all_batches.is_empty() {
+                let table = MemTable::try_new(Arc::new(schema.into()), vec![all_batches])?;
+                self.ctx.register_table("edges", Arc::new(table))?;
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Register an in-memory table
+    pub fn register_table(&mut self, name: &str, batches: Vec<RecordBatch>) -> Result<()> {
+        if batches.is_empty() {
+            return Ok(());
+        }
+
+        let schema = batches[0].schema();
+        let table = datafusion::datasource::MemTable::try_new(schema, vec![batches])?;
+        self.ctx.register_table(name, Arc::new(table))?;
+        Ok(())
+    }
+
+    /// Execute a SQL query
+    pub async fn execute(&self, sql: &str) -> Result<Vec<RecordBatch>> {
+        let df = self.ctx.sql(sql).await?;
+        let batches = df.collect().await?;
+        Ok(batches)
+    }
+
+    /// Execute a SQL query and return a DataFrame
+    pub async fn query(&self, sql: &str) -> Result<DataFrame> {
+        let df = self.ctx.sql(sql).await?;
+        Ok(df)
+    }
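+
+    // NOTE (added commentary): the helper below substitutes `$name`
+    // placeholders textually (escaping quotes for strings) rather than
+    // binding true prepared-statement parameters, so values reach the SQL
+    // parser as literals.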
+    /// Execute with parameters (prepared statement style)
+    pub async fn execute_with_params(
+        &self,
+        sql: &str,
+        params: &[(&str, ScalarValue)],
+    ) -> Result<Vec<RecordBatch>> {
+        // Replace $param with actual values
+        let mut processed_sql = sql.to_string();
+        for (name, value) in params {
+            let placeholder = format!("${}", name);
+            let replacement = match value {
+                ScalarValue::Utf8(Some(s)) => format!("'{}'", s.replace('\'', "''")),
+                ScalarValue::Int64(Some(n)) => n.to_string(),
+                ScalarValue::Float64(Some(f)) => f.to_string(),
+                ScalarValue::Boolean(Some(b)) => if *b { "TRUE" } else { "FALSE" }.to_string(),
+                ScalarValue::Binary(Some(b)) => format!("X'{}'", hex::encode(b)),
+                _ => "NULL".to_string(),
+            };
+            processed_sql = processed_sql.replace(&placeholder, &replacement);
+        }
+
+        self.execute(&processed_sql).await
+    }
+
+    /// Get the underlying DataFusion context
+    pub fn context(&self) -> &SessionContext {
+        &self.ctx
+    }
+}
+
+impl Default for SqlEngine {
+    fn default() -> Self {
+        // Create synchronously for Default trait
+        let ctx = SessionContext::new();
+        let mut engine = Self {
+            ctx,
+            db_path: None,
+        };
+        engine.register_udfs();
+        engine
+    }
+}
+
+// =============================================================================
+// UDF IMPLEMENTATIONS
+// =============================================================================
+
+use datafusion::arrow::datatypes::Field;
+use datafusion::logical_expr::ColumnarValue;
+use datafusion::scalar::ScalarValue;
+
+/// Create hamming distance UDF
+fn create_hamming_udf() -> ScalarUDF {
+    let signature = Signature::new(
+        TypeSignature::Any(2),
+        Volatility::Immutable,
+    );
+
+    ScalarUDF::new(
+        "hamming",
+        &signature,
+        &(|_: &[DataType]| Ok(Arc::new(DataType::UInt32))),
+        &(|args: &[ColumnarValue]| {
+            match (&args[0], &args[1]) {
+                (ColumnarValue::Array(a), ColumnarValue::Array(b)) => {
+                    let result = hamming_array(a.clone(), b.clone())?;
+                    Ok(ColumnarValue::Array(result))
+                }
+                (ColumnarValue::Scalar(ScalarValue::Binary(Some(a))),
+                 ColumnarValue::Scalar(ScalarValue::Binary(Some(b)))) => {
+                    let dist = hamming_distance_bytes(a, b);
+                    Ok(ColumnarValue::Scalar(ScalarValue::UInt32(Some(dist))))
+                }
+                _ => Err(datafusion::error::DataFusionError::Execution(
+                    "hamming requires binary arguments".into()
+                ))
+            }
+        }),
+    )
+}
+
+/// Create similarity UDF
+fn create_similarity_udf() -> ScalarUDF {
+    let signature = Signature::new(
+        TypeSignature::Any(2),
+        Volatility::Immutable,
+    );
+
+    ScalarUDF::new(
+        "similarity",
+        &signature,
+        &(|_: &[DataType]| Ok(Arc::new(DataType::Float32))),
+        &(|args: &[ColumnarValue]| {
+            match (&args[0], &args[1]) {
+                (ColumnarValue::Array(a), ColumnarValue::Array(b)) => {
+                    let result = similarity_array(a.clone(), b.clone())?;
+                    Ok(ColumnarValue::Array(result))
+                }
+                (ColumnarValue::Scalar(ScalarValue::Binary(Some(a))),
+                 ColumnarValue::Scalar(ScalarValue::Binary(Some(b)))) => {
+                    let dist = hamming_distance_bytes(a, b);
+                    let sim = hamming_similarity(dist);
+                    Ok(ColumnarValue::Scalar(ScalarValue::Float32(Some(sim))))
+                }
+                _ => Err(datafusion::error::DataFusionError::Execution(
+                    "similarity requires binary arguments".into()
+                ))
+            }
+        }),
+    )
+}
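+
+// Worked example (illustrative): with DIM = 10_000, a Hamming distance of
+// 2_000 maps to similarity 1.0 - 2000/10000 = 0.8, and distance 0 maps to 1.0.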
+
+/// Create popcount UDF
+fn create_popcount_udf() -> ScalarUDF {
+    let signature = Signature::new(
+        TypeSignature::Any(1),
+        Volatility::Immutable,
+    );
+
+    ScalarUDF::new(
+        "popcount",
+        &signature,
+        &(|_: &[DataType]| Ok(Arc::new(DataType::UInt32))),
+        &(|args: &[ColumnarValue]| {
+            match &args[0] {
+                ColumnarValue::Array(arr) => {
+                    let result = popcount_array(arr.clone())?;
+                    Ok(ColumnarValue::Array(result))
+                }
+                ColumnarValue::Scalar(ScalarValue::UInt64(Some(n))) => {
+                    Ok(ColumnarValue::Scalar(ScalarValue::UInt32(Some(n.count_ones()))))
+                }
+                _ => Err(datafusion::error::DataFusionError::Execution(
+                    "popcount requires uint64 argument".into()
+                ))
+            }
+        }),
+    )
+}
+
+/// Create xor_bind UDF
+fn create_xor_bind_udf() -> ScalarUDF {
+    let signature = Signature::new(
+        TypeSignature::Any(2),
+        Volatility::Immutable,
+    );
+
+    ScalarUDF::new(
+        "xor_bind",
+        &signature,
+        &(|_: &[DataType]| Ok(Arc::new(DataType::Binary))),
+        &(|args: &[ColumnarValue]| {
+            match (&args[0], &args[1]) {
+                (ColumnarValue::Array(a), ColumnarValue::Array(b)) => {
+                    let result = xor_bind_array(a.clone(), b.clone())?;
+                    Ok(ColumnarValue::Array(result))
+                }
+                (ColumnarValue::Scalar(ScalarValue::Binary(Some(a))),
+                 ColumnarValue::Scalar(ScalarValue::Binary(Some(b)))) => {
+                    let result = xor_bind_bytes(a, b);
+                    Ok(ColumnarValue::Scalar(ScalarValue::Binary(Some(result))))
+                }
+                _ => Err(datafusion::error::DataFusionError::Execution(
+                    "xor_bind requires binary arguments".into()
+                ))
+            }
+        }),
+    )
+}
+
+// =============================================================================
+// ARRAY OPERATIONS
+// =============================================================================
+
+/// Compute Hamming distance for arrays
+fn hamming_array(a: ArrayRef, b: ArrayRef) -> datafusion::error::Result<ArrayRef> {
+    // Try variable-width binary first; the fallback arm below handles
+    // FixedSizeBinaryArray inputs.
+    let a_bin = a.as_any().downcast_ref::<BinaryArray>();
+    let b_bin = b.as_any().downcast_ref::<BinaryArray>();
+
+    match (a_bin, b_bin) {
+        (Some(a), Some(b)) => {
+            let mut builder = UInt32Builder::new();
+            for i in 0..a.len() {
+                if a.is_null(i) || b.is_null(i) {
+                    builder.append_null();
+                } else {
+                    let dist = hamming_distance_bytes(a.value(i), b.value(i));
+                    builder.append_value(dist);
+                }
+            }
+            Ok(Arc::new(builder.finish()))
+        }
+        _ => {
+            // Try FixedSizeBinaryArray
+            let a_fixed = a.as_any().downcast_ref::<FixedSizeBinaryArray>();
+            let b_fixed = b.as_any().downcast_ref::<FixedSizeBinaryArray>();
+
+            match (a_fixed, b_fixed) {
+                (Some(a), Some(b)) => {
+                    let mut builder = UInt32Builder::new();
+                    for i in 0..a.len() {
+                        if a.is_null(i) || b.is_null(i) {
+                            builder.append_null();
+                        } else {
+                            let dist = hamming_distance_bytes(a.value(i), b.value(i));
+                            builder.append_value(dist);
+                        }
+                    }
+                    Ok(Arc::new(builder.finish()))
+                }
+                _ => Err(datafusion::error::DataFusionError::Execution(
+                    "hamming_array requires binary arrays".into()
+                ))
+            }
+        }
+    }
+}
+
+/// Compute similarity for arrays
+fn similarity_array(a: ArrayRef, b: ArrayRef) -> datafusion::error::Result<ArrayRef> {
+    let a_fixed = a.as_any().downcast_ref::<FixedSizeBinaryArray>();
+    let b_fixed = b.as_any().downcast_ref::<FixedSizeBinaryArray>();
+
+    match (a_fixed, b_fixed) {
+        (Some(a), Some(b)) => {
+            let mut builder = Float32Builder::new();
+            for i in 0..a.len() {
+                if a.is_null(i) || b.is_null(i) {
+                    builder.append_null();
+                } else {
+                    let dist = hamming_distance_bytes(a.value(i), b.value(i));
+                    let sim = hamming_similarity(dist);
+                    builder.append_value(sim);
+                }
+            }
+            Ok(Arc::new(builder.finish()))
+        }
+        _ => {
+            // Try regular binary
+            let a_bin = a.as_any().downcast_ref::<BinaryArray>();
+            let b_bin = b.as_any().downcast_ref::<BinaryArray>();
+
+            match (a_bin, b_bin) {
+                (Some(a), Some(b)) => {
+                    let mut builder = Float32Builder::new();
+                    for i in 0..a.len() {
+                        if a.is_null(i) || b.is_null(i) {
+                            builder.append_null();
+                        } else {
+                            let dist = hamming_distance_bytes(a.value(i), b.value(i));
+                            let sim = hamming_similarity(dist);
+                            builder.append_value(sim);
+                        }
+                    }
+                    Ok(Arc::new(builder.finish()))
+                }
+                _ => Err(datafusion::error::DataFusionError::Execution(
+                    "similarity_array requires binary arrays".into()
+                ))
+            }
+        }
+    }
+}
+
+/// Compute popcount for array
+fn popcount_array(arr: ArrayRef) -> datafusion::error::Result<ArrayRef> {
+    let u64_arr = arr.as_any().downcast_ref::<UInt64Array>();
+
+    match u64_arr {
+        Some(arr) => {
+            let mut builder = UInt32Builder::new();
+            for i in 0..arr.len() {
+                if arr.is_null(i) {
+                    builder.append_null();
+                } else {
+                    builder.append_value(arr.value(i).count_ones());
+                }
+            }
+            Ok(Arc::new(builder.finish()))
+        }
+        None => {
+            // Try binary array - count all bits
+            let bin_arr = arr.as_any().downcast_ref::<BinaryArray>();
+            match bin_arr {
+                Some(arr) => {
+                    let mut builder = UInt32Builder::new();
+                    for i in 0..arr.len() {
+                        if arr.is_null(i) {
+                            builder.append_null();
+                        } else {
+                            let count: u32 = arr.value(i).iter().map(|b| b.count_ones()).sum();
+                            builder.append_value(count);
+                        }
+                    }
+                    Ok(Arc::new(builder.finish()))
+                }
+                None => Err(datafusion::error::DataFusionError::Execution(
+                    "popcount_array requires uint64 or binary array".into()
+                ))
+            }
+        }
+    }
+}
+
+/// XOR bind two arrays
+fn xor_bind_array(a: ArrayRef, b: ArrayRef) -> datafusion::error::Result<ArrayRef> {
+    let a_bin = a.as_any().downcast_ref::<BinaryArray>();
+    let b_bin = b.as_any().downcast_ref::<BinaryArray>();
+
+    match (a_bin, b_bin) {
+        (Some(a), Some(b)) => {
+            let mut builder = BinaryBuilder::new();
+            for i in 0..a.len() {
+                if a.is_null(i) || b.is_null(i) {
+                    builder.append_null();
+                } else {
+                    let result = xor_bind_bytes(a.value(i), b.value(i));
+                    builder.append_value(&result);
+                }
+            }
+            Ok(Arc::new(builder.finish()))
+        }
+        _ => {
+            // Try FixedSizeBinaryArray
+            let a_fixed = a.as_any().downcast_ref::<FixedSizeBinaryArray>();
+            let b_fixed = b.as_any().downcast_ref::<FixedSizeBinaryArray>();
+
+            match (a_fixed, b_fixed) {
+                (Some(a), Some(b)) => {
+                    let size = a.value_length() as usize;
+                    let mut builder = FixedSizeBinaryBuilder::new(size as i32);
+                    for i in 0..a.len() {
+                        if a.is_null(i) || b.is_null(i) {
+                            builder.append_null();
+                        } else {
+                            let result = xor_bind_bytes(a.value(i), b.value(i));
+                            builder.append_value(&result)?;
+                        }
+                    }
+                    Ok(Arc::new(builder.finish()))
+                }
+                _ => Err(datafusion::error::DataFusionError::Execution(
+                    "xor_bind_array requires binary arrays".into()
+                ))
+            }
+        }
+    }
+}
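+
+// Consistency note (illustrative): for equal-length inputs the primitives
+// above agree by construction. hamming_distance_bytes(a, b) equals the
+// popcount of xor_bind_bytes(a, b), so in SQL `hamming(a, b)` and
+// `popcount(xor_bind(a, b))` return the same value.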
+
+// =============================================================================
+// QUERY BUILDER
+// =============================================================================
+
+/// Helper for building SQL queries programmatically
+pub struct QueryBuilder {
+    select: Vec<String>,
+    from: String,
+    joins: Vec<String>,
+    where_clauses: Vec<String>,
+    order_by: Vec<String>,
+    limit: Option<u64>,
+    offset: Option<u64>,
+}
+
+impl QueryBuilder {
+    /// Start building a query from a table
+    pub fn from(table: &str) -> Self {
+        Self {
+            select: Vec::new(),
+            from: table.to_string(),
+            joins: Vec::new(),
+            where_clauses: Vec::new(),
+            order_by: Vec::new(),
+            limit: None,
+            offset: None,
+        }
+    }
+
+    /// Add SELECT columns
+    pub fn select(mut self, columns: &[&str]) -> Self {
+        self.select.extend(columns.iter().map(|s| s.to_string()));
+        self
+    }
+
+    /// Add a JOIN clause
+    pub fn join(mut self, join_type: &str, table: &str, on: &str) -> Self {
+        self.joins.push(format!("{} JOIN {} ON {}", join_type, table, on));
+        self
+    }
+
+    /// Add a WHERE condition
+    pub fn where_clause(mut self, condition: &str) -> Self {
+        self.where_clauses.push(condition.to_string());
+        self
+    }
+
+    /// Add Hamming distance filter
+    pub fn where_hamming_lt(mut self, col: &str, param: &str, max_dist: u32) -> Self {
+        self.where_clauses.push(format!("hamming({}, {}) < {}", col, param, max_dist));
+        self
+    }
+
+    /// Add similarity filter
+    pub fn where_similar(mut self, col: &str, param: &str, min_sim: f32) -> Self {
+        let max_dist = ((1.0 - min_sim) * DIM as f32) as u32;
+        self.where_clauses.push(format!("hamming({}, {}) < {}", col, param, max_dist));
+        self
+    }
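+
+    // Worked example (illustrative): where_similar("fp", "$fp", 0.8) computes
+    // (1.0 - 0.8) * 10_000 in f32 and truncates. Because 0.8f32 is slightly
+    // above 0.8, the emitted bound is `hamming(fp, $fp) < 1999`, one below
+    // the exact 2000.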
+
+    /// Add ORDER BY
+    pub fn order_by(mut self, expr: &str, desc: bool) -> Self {
+        let dir = if desc { "DESC" } else { "ASC" };
+        self.order_by.push(format!("{} {}", expr, dir));
+        self
+    }
+
+    /// Set LIMIT
+    pub fn limit(mut self, n: u64) -> Self {
+        self.limit = Some(n);
+        self
+    }
+
+    /// Set OFFSET
+    pub fn offset(mut self, n: u64) -> Self {
+        self.offset = Some(n);
+        self
+    }
+
+    /// Build the SQL query string
+    pub fn build(self) -> String {
+        let mut sql = String::new();
+
+        // SELECT
+        let cols = if self.select.is_empty() {
+            "*".to_string()
+        } else {
+            self.select.join(", ")
+        };
+        sql.push_str(&format!("SELECT {}\n", cols));
+
+        // FROM
+        sql.push_str(&format!("FROM {}\n", self.from));
+
+        // JOINs
+        for join in self.joins {
+            sql.push_str(&join);
+            sql.push('\n');
+        }
+
+        // WHERE
+        if !self.where_clauses.is_empty() {
+            sql.push_str(&format!("WHERE {}\n", self.where_clauses.join(" AND ")));
+        }
+
+        // ORDER BY
+        if !self.order_by.is_empty() {
+            sql.push_str(&format!("ORDER BY {}\n", self.order_by.join(", ")));
+        }
+
+        // LIMIT
+        if let Some(n) = self.limit {
+            sql.push_str(&format!("LIMIT {}\n", n));
+        }
+
+        // OFFSET
+        if let Some(n) = self.offset {
+            sql.push_str(&format!("OFFSET {}\n", n));
+        }
+
+        sql
+    }
+}
+
+// =============================================================================
+// TESTS
+// =============================================================================
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_hamming_distance() {
+        let a = vec![0xFF, 0x00, 0xFF, 0x00];
+        let b = vec![0x00, 0xFF, 0x00, 0xFF];
+        // All bits differ: 4 bytes * 8 bits = 32
+        assert_eq!(hamming_distance_bytes(&a, &b), 32);
+
+        let c = vec![0xFF, 0xFF, 0xFF, 0xFF];
+        let d = vec![0xFF, 0xFF, 0xFF, 0xFF];
+        assert_eq!(hamming_distance_bytes(&c, &d), 0);
+    }
+
+    #[test]
+    fn test_similarity() {
+        let sim = hamming_similarity(0);
+        assert_eq!(sim, 1.0);
+
+        let sim = hamming_similarity(DIM as u32);
+        assert_eq!(sim, 0.0);
+
+        let sim = hamming_similarity(DIM as u32 / 2);
+        assert!((sim - 0.5).abs() < 0.01);
+    }
+
+    #[test]
+    fn test_query_builder() {
+        let sql = QueryBuilder::from("nodes")
+            .select(&["id", "label", "content"])
+            .where_clause("label = 'Thought'")
+            .where_similar("fingerprint", "$fp", 0.8)
+            .order_by("created_at", true)
+            .limit(10)
+            .build();
+
+        assert!(sql.contains("SELECT id, label, content"));
+        assert!(sql.contains("FROM nodes"));
+        assert!(sql.contains("label = 'Thought'"));
+        assert!(sql.contains("hamming(fingerprint, $fp)"));
+        assert!(sql.contains("ORDER BY created_at DESC"));
+        assert!(sql.contains("LIMIT 10"));
+    }
+
+    #[tokio::test]
+    async fn test_sql_engine_basic() {
+        let mut engine = SqlEngine::new().await;
+
+        // Register a simple test table
+        let schema = arrow::datatypes::Schema::new(vec![
+            arrow::datatypes::Field::new("id", DataType::Int64, false),
+            arrow::datatypes::Field::new("name", DataType::Utf8, false),
+        ]);
+
+        let ids: Int64Array = vec![1, 2, 3].into_iter().map(Some).collect();
+        let names: StringArray = vec!["a", "b", "c"].into_iter().map(Some).collect();
+
+        let batch = RecordBatch::try_new(
+            Arc::new(schema),
+            vec![Arc::new(ids), Arc::new(names)],
+        ).unwrap();
+
+        engine.register_table("test", vec![batch]).unwrap();
+
+        let results = engine.execute("SELECT * FROM test WHERE id > 1").await.unwrap();
+        assert_eq!(results[0].num_rows(), 2);
+    }
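+
+    #[test]
+    fn test_xor_bind_self_inverse() {
+        // Added property sketch (not in the original patch): XOR binding is
+        // its own inverse, which is what lets xor_bind serve as both bind
+        // and unbind in VSA terms.
+        let a = vec![0xAB, 0xCD, 0x12, 0x34];
+        let b = vec![0x0F, 0xF0, 0x55, 0xAA];
+        let bound = xor_bind_bytes(&a, &b);
+        assert_eq!(xor_bind_bytes(&bound, &b), a);
+    }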
+}

From ee257f99468926bd33d2979b6ec2a8f916724618 Mon Sep 17 00:00:00 2001
From: AdaWorldAPI
Date: Fri, 30 Jan 2026 08:25:39 +0100
Subject: [PATCH 4/9] Update storage/mod.rs to export LanceStore

---
 src/storage/mod.rs | 22 ++++++++++++++++++++++
 1 file changed, 22 insertions(+)

diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index b5d909a..1381641 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -1,8 +1,26 @@
 //! Storage layer - LanceDB integration
+//!
+//! Provides persistent storage via LanceDB with:
+//! - Columnar Arrow format
+//! - Native vector ANN indices
+//! - Zero-copy operations
+//! - Versioned datasets
 
 mod database;
+mod lance;
 
 pub use database::Database;
+pub use lance::{
+    LanceStore,
+    NodeRecord,
+    EdgeRecord,
+    nodes_schema,
+    edges_schema,
+    sessions_schema,
+    FINGERPRINT_BYTES,
+    EMBEDDING_DIM,
+    THINKING_STYLE_DIM,
+};
 
 #[derive(thiserror::Error, Debug)]
 pub enum StorageError {
@@ -10,4 +28,8 @@ pub enum StorageError {
     Io(#[from] std::io::Error),
     #[error("Not found: {0}")]
     NotFound(String),
+    #[error("Lance error: {0}")]
+    Lance(String),
+    #[error("Arrow error: {0}")]
+    Arrow(String),
 }

From 7d46bdf6211886b19882b6a4e17d70c3c01f3f6d Mon Sep 17 00:00:00 2001
From: AdaWorldAPI
Date: Fri, 30 Jan 2026 08:25:45 +0100
Subject: [PATCH 5/9] Update query/mod.rs to export Cypher and DataFusion

---
 src/query/mod.rs | 21 ++++++++++++++++++++-
 1 file changed, 20 insertions(+), 1 deletion(-)

diff --git a/src/query/mod.rs b/src/query/mod.rs
index d265ec4..a7351ee 100644
--- a/src/query/mod.rs
+++ b/src/query/mod.rs
@@ -1,8 +1,25 @@
-//! Query types
+//! Query layer - SQL, Cypher, and execution
+//!
+//! Provides unified query interface:
+//! - SQL via DataFusion
+//! - Cypher via transpilation to recursive CTEs
+//! - Custom UDFs for Hamming/similarity operations
 
 mod builder;
+mod cypher;
+mod datafusion;
 
 pub use builder::{Query, QueryResult};
+pub use cypher::{
+    CypherParser,
+    CypherTranspiler,
+    CypherQuery,
+    cypher_to_sql,
+};
+pub use datafusion::{
+    SqlEngine,
+    QueryBuilder,
+};
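+
+// Example (illustrative): the two layers compose. Transpile Cypher first,
+// then hand the SQL to the engine (call shape hypothetical):
+//
+//   let sql = cypher_to_sql("MATCH (a)-[:CAUSES]->(b) RETURN b")?;
+//   let batches = engine.execute(&sql).await?;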
 
 #[derive(thiserror::Error, Debug)]
 pub enum QueryError {
@@ -10,4 +27,6 @@ pub enum QueryError {
     Parse(String),
     #[error("Execution error: {0}")]
     Execution(String),
+    #[error("Transpile error: {0}")]
+    Transpile(String),
 }

From 1f7e8e848bb433cf3d0373b779183c76f3a124be Mon Sep 17 00:00:00 2001
From: AdaWorldAPI
Date: Fri, 30 Jan 2026 08:25:52 +0100
Subject: [PATCH 6/9] Update Database to use LanceStore and SqlEngine

---
 src/storage/database.rs | 300 ++++++++++++++++++++++++++++++++--------
 1 file changed, 244 insertions(+), 56 deletions(-)

diff --git a/src/storage/database.rs b/src/storage/database.rs
index 1559c74..fbeaba9 100644
--- a/src/storage/database.rs
+++ b/src/storage/database.rs
@@ -1,12 +1,21 @@
 //! Main Database API - unified interface for all operations
+//!
+//! Provides a single entry point for all LadybugDB operations:
+//! - SQL queries (via DataFusion)
+//! - Cypher queries (via transpilation)
+//! - Vector search (via LanceDB ANN)
+//! - Hamming/resonance search (via SIMD engine)
+//! - Graph traversal and butterfly detection
 
 use crate::core::{Fingerprint, HammingEngine};
 use crate::cognitive::Thought;
 use crate::nars::TruthValue;
 use crate::graph::{Edge, Traversal};
-use crate::query::{Query, QueryResult};
+use crate::query::{Query, QueryResult, cypher_to_sql, SqlEngine, QueryBuilder};
+use crate::storage::{LanceStore, NodeRecord, EdgeRecord};
 use crate::{Result, Error};
+use arrow::record_batch::RecordBatch;
 
 use std::path::Path;
 use std::sync::Arc;
 use parking_lot::RwLock;
@@ -15,67 +24,124 @@ pub struct Database {
     /// Path to database
     path: String,
-    /// Hamming search engine (pre-indexed)
+    /// Lance storage backend
+    lance: Arc<tokio::sync::RwLock<LanceStore>>,
+    /// SQL execution engine
+    sql_engine: Arc<tokio::sync::RwLock<SqlEngine>>,
+    /// Hamming search engine (pre-indexed, in-memory)
     hamming: Arc<RwLock<HammingEngine>>,
     /// Current version (for copy-on-write)
     version: u64,
 }
 
 impl Database {
-    /// Open or create a database
-    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
+    /// Open or create a database (async)
+    pub async fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
         let path_str = path.as_ref().to_string_lossy().to_string();
 
         // Create directory if needed
         std::fs::create_dir_all(&path_str)?;
 
+        // Open Lance store
+        let lance = LanceStore::open(&path_str).await?;
+
+        // Create SQL engine with Lance tables
+        let sql_engine = SqlEngine::with_database(&path_str).await?;
+
         Ok(Self {
             path: path_str,
+            lance: Arc::new(tokio::sync::RwLock::new(lance)),
+            sql_engine: Arc::new(tokio::sync::RwLock::new(sql_engine)),
             hamming: Arc::new(RwLock::new(HammingEngine::new())),
             version: 0,
         })
     }
 
+    /// Open synchronously (blocks on a freshly created runtime; do not call
+    /// from inside an existing async context)
+    pub fn open_sync<P: AsRef<Path>>(path: P) -> Result<Self> {
+        let rt = tokio::runtime::Runtime::new()?;
+        rt.block_on(Self::open(path))
+    }
+
     /// Connect to in-memory database
     pub fn memory() -> Self {
         Self {
             path: ":memory:".to_string(),
+            lance: Arc::new(tokio::sync::RwLock::new(LanceStore::memory())),
+            sql_engine: Arc::new(tokio::sync::RwLock::new(SqlEngine::default())),
             hamming: Arc::new(RwLock::new(HammingEngine::new())),
             version: 0,
         }
     }
 
-    // === Conventional Operations ===
+    // =========================================================================
+    // SQL OPERATIONS
+    // =========================================================================
 
     /// Execute SQL query
-    pub fn sql(&self, query: &str) -> Result<QueryResult> {
-        // TODO: Integrate with DataFusion
-        let _ = query;
-        Ok(QueryResult {
-            rows: vec![],
-            columns: vec![],
-        })
+    pub async fn sql(&self, query: &str) -> Result<Vec<RecordBatch>> {
+        let engine = self.sql_engine.read().await;
+        engine.execute(query).await
+    }
+
+    /// Execute SQL query with parameters
+    pub async fn sql_params(
+        &self,
+        query: &str,
+        params: &[(&str, datafusion::scalar::ScalarValue)],
+    ) -> Result<Vec<RecordBatch>> {
+        let engine = self.sql_engine.read().await;
+        engine.execute_with_params(query, params).await
+    }
+
+    /// Start building a query over the nodes table
+    pub fn query(&self) -> QueryBuilder {
+        QueryBuilder::from("nodes")
     }
 
+    // =========================================================================
+    // CYPHER OPERATIONS
+    // =========================================================================
+
     /// Execute Cypher query (transpiled to SQL)
-    pub fn cypher(&self, query: &str) -> Result<QueryResult> {
-        // TODO: Cypher parser + transpiler
-        let _ = query;
-        Ok(QueryResult {
-            rows: vec![],
-            columns: vec![],
-        })
+    pub async fn cypher(&self, query: &str) -> Result<Vec<RecordBatch>> {
+        // Transpile Cypher to SQL
+        let sql = cypher_to_sql(query)?;
+
+        // Execute via SQL engine
+        self.sql(&sql).await
     }
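+
+    // Illustrative usage (queries hypothetical):
+    //
+    //   let rows = db.sql("SELECT id FROM nodes WHERE label = 'Thought'").await?;
+    //   let hits = db.cypher("MATCH (a)-[:CAUSES]->(b) RETURN b").await?;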
+
+    // =========================================================================
+    // VECTOR OPERATIONS
+    // =========================================================================
+
     /// Vector similarity search (ANN)
-    pub fn vector_search(&self, _embedding: &[f32], _k: usize) -> Result<Vec<Thought>> {
-        // TODO: Lance vector index
-        Ok(vec![])
+    pub async fn vector_search(
+        &self,
+        embedding: &[f32],
+        k: usize,
+    ) -> Result<Vec<RecordBatch>> {
+        let mut lance = self.lance.write().await;
+        lance.vector_search(embedding, k, None).await
+    }
+
+    /// Vector search with filter
+    pub async fn vector_search_filtered(
+        &self,
+        embedding: &[f32],
+        k: usize,
+        filter: &str,
+    ) -> Result<Vec<RecordBatch>> {
+        let mut lance = self.lance.write().await;
+        lance.vector_search(embedding, k, Some(filter)).await
     }
 
-    // === AGI Operations ===
+    // =========================================================================
+    // HAMMING/RESONANCE OPERATIONS
+    // =========================================================================
 
-    /// Resonance search (Hamming similarity)
+    /// Resonance search (Hamming similarity) - in-memory indexed
     pub fn resonate(
         &self,
         fingerprint: &Fingerprint,
@@ -89,6 +155,17 @@
             .collect()
     }
 
+    /// Resonance search over Lance storage
+    pub async fn resonate_lance(
+        &self,
+        fingerprint: &Fingerprint,
+        k: usize,
+        threshold: Option<f32>,
+    ) -> Result<Vec<RecordBatch>> {
+        let mut lance = self.lance.write().await;
+        lance.hamming_search(fingerprint, k, threshold).await
+    }
+
     /// Resonate by content (auto-generates fingerprint)
     pub fn resonate_content(
         &self,
@@ -100,61 +177,154 @@
         self.resonate(&fp, threshold, limit)
     }
 
-    /// Index fingerprints for resonance search
+    /// Index fingerprints for resonance search (in-memory)
     pub fn index_fingerprints(&self, fingerprints: Vec<Fingerprint>) {
         let mut engine = self.hamming.write();
         engine.index(fingerprints);
    }
 
+    // =========================================================================
+    // GRAPH OPERATIONS
+    // =========================================================================
+
     /// Start a graph traversal query
     pub fn traverse(&self, start_id: &str) -> Traversal {
         Traversal::from(start_id)
     }
 
     /// Detect butterfly effects (causal amplification chains)
-    pub fn detect_butterflies(
+    pub async fn detect_butterflies(
         &self,
         source_id: &str,
         threshold: f32,
         max_depth: usize,
-    ) -> Result<Vec<(Vec<String>, f32)>> {
-        // TODO: Recursive CTE query for amplification chains
-        let _ = (source_id, threshold, max_depth);
-        Ok(vec![])
+    ) -> Result<Vec<RecordBatch>> {
+        let cypher = format!(
+            "MATCH (source)-[:CAUSES|AMPLIFIES*1..{}]->(target) \
+             WHERE source.id = '{}' \
+             RETURN target, path, amplification",
+            max_depth, source_id
+        );
+
+        // NOTE: assumes the transpiled SQL ends with a WHERE clause over the
+        // target alias `t`, so the extra predicate can be appended with AND.
+        let mut sql = cypher_to_sql(&cypher)?;
+        sql.push_str(&format!("\n  AND t.amplification > {}", threshold));
+
+        self.sql(&sql).await
+    }
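+
+    // Illustrative call (id hypothetical): chains out of one change that
+    // amplify more than 5x within 10 hops:
+    //
+    //   let chains = db.detect_butterflies("config_change_42", 5.0, 10).await?;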
+
+    /// Impact analysis for a potential change
+    pub async fn impact_analysis(&self, change_id: &str) -> Result<ImpactReport> {
+        // Get all affected nodes
+        let affected = self.cypher(&format!(
+            "MATCH (source)-[:CAUSES|AMPLIFIES|ENABLES*1..10]->(affected) \
+             WHERE source.id = '{}' \
+             RETURN affected",
+            change_id
+        )).await?;
+
+        // Get butterfly effects
+        let butterflies = self.detect_butterflies(change_id, 5.0, 10).await?;
+
+        let total_affected = affected.iter().map(|b| b.num_rows()).sum();
+        let butterfly_count = butterflies.iter().map(|b| b.num_rows()).sum();
+
+        Ok(ImpactReport {
+            total_affected,
+            butterfly_count,
+            affected_batches: affected,
+            butterfly_batches: butterflies,
+        })
+    }
 
-    // === CRUD Operations ===
+    // =========================================================================
+    // CRUD OPERATIONS
+    // =========================================================================
+
+    /// Add a node
+    pub async fn add_node(&self, node: NodeRecord) -> Result<()> {
+        let mut lance = self.lance.write().await;
+        lance.insert_node(&node).await
+    }
+
+    /// Add multiple nodes
+    pub async fn add_nodes(&self, nodes: &[NodeRecord]) -> Result<()> {
+        let mut lance = self.lance.write().await;
+        lance.insert_nodes(nodes).await
+    }
 
-    /// Add a thought
-    pub fn add_thought(&self, thought: &Thought) -> Result<String> {
-        // TODO: Lance insert
-        Ok(thought.id.clone())
-    }
-
     /// Add an edge
-    pub fn add_edge(&self, edge: &Edge) -> Result<()> {
-        // TODO: Lance insert
-        let _ = edge;
-        Ok(())
+    pub async fn add_edge(&self, edge: EdgeRecord) -> Result<()> {
+        let mut lance = self.lance.write().await;
+        lance.insert_edge(&edge).await
+    }
+
+    /// Get a node by ID
+    pub async fn get_node(&self, id: &str) -> Result<Option<NodeRecord>> {
+        let mut lance = self.lance.write().await;
+        lance.get_node(id).await
     }
 
-    /// Get thought by ID
-    pub fn get_thought(&self, id: &str) -> Result<Option<Thought>> {
-        // TODO: Lance lookup
-        let _ = id;
-        Ok(None)
+    /// Get edges from a node
+    pub async fn get_edges_from(&self, from_id: &str) -> Result<Vec<EdgeRecord>> {
+        let mut lance = self.lance.write().await;
+        lance.get_edges_from(from_id).await
     }
 
-    // === Database Info ===
+    /// Add a thought (convenience method)
+    pub async fn add_thought(&self, thought: &Thought) -> Result<String> {
+        let node = NodeRecord::new(&thought.id, "Thought")
+            .with_qidx(thought.qidx)
+            .with_content(&thought.content);
+
+        // Add fingerprint if available
+        let node = if let Some(ref fp) = thought.fingerprint {
+            node.with_fingerprint(fp)
+        } else {
+            node
+        };
+
+        self.add_node(node).await?;
+        Ok(thought.id.clone())
+    }
+
+    /// Create a CAUSES edge
+    pub async fn causes(&self, from_id: &str, to_id: &str, amplification: f32) -> Result<()> {
+        let edge = EdgeRecord::new(from_id, to_id, "CAUSES")
+            .with_amplification(amplification);
+        self.add_edge(edge).await
+    }
+
+    /// Create an ENABLES edge
+    pub async fn enables(&self, from_id: &str, to_id: &str) -> Result<()> {
+        let edge = EdgeRecord::new(from_id, to_id, "ENABLES");
+        self.add_edge(edge).await
+    }
+
+    /// Create an AMPLIFIES edge
+    pub async fn amplifies(&self, from_id: &str, to_id: &str, factor: f32) -> Result<()> {
+        let edge = EdgeRecord::new(from_id, to_id, "AMPLIFIES")
+            .with_amplification(factor);
+        self.add_edge(edge).await
+    }
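+
+    // Illustrative flow (ids hypothetical): persist a thought, then link it
+    // into the causal graph:
+    //
+    //   let id = db.add_thought(&thought).await?;
+    //   db.causes(&id, "downstream_node", 2.5).await?;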
+
+    // =========================================================================
+    // COUNTERFACTUAL OPERATIONS
+    // =========================================================================
+
+    /// Fork database for counterfactual reasoning
+    pub fn fork(&self) -> Database {
+        Database {
+            path: self.path.clone(),
+            lance: Arc::clone(&self.lance),
+            sql_engine: Arc::clone(&self.sql_engine),
+            hamming: Arc::clone(&self.hamming),
+            version: self.version + 1,
+        }
+    }
+
+    // =========================================================================
+    // DATABASE INFO
+    // =========================================================================
 
     /// Database path
     pub fn path(&self) -> &str {
@@ -166,15 +336,24 @@
         self.version
     }
 
-    /// Number of indexed fingerprints
+    /// Number of indexed fingerprints (in-memory)
    pub fn fingerprint_count(&self) -> usize {
         self.hamming.read().len()
     }
 }
 
+/// Impact analysis report
+#[derive(Debug)]
+pub struct ImpactReport {
+    pub total_affected: usize,
+    pub butterfly_count: usize,
+    pub affected_batches: Vec<RecordBatch>,
+    pub butterfly_batches: Vec<RecordBatch>,
+}
+
 // Convenience function
 pub fn open<P: AsRef<Path>>(path: P) -> Result<Database> {
-    Database::open(path)
+    Database::open_sync(path)
 }
 
 #[cfg(test)]
@@ -213,4 +392,13 @@ mod tests {
         assert_eq!(forked.version(), db.version() + 1);
     }
+
+    #[tokio::test]
+    async fn test_cypher_transpile() {
+        let cypher = "MATCH (a:Thought)-[:CAUSES]->(b:Thought) RETURN b";
+        let sql = cypher_to_sql(cypher).unwrap();
+
+        assert!(sql.contains("SELECT"));
+        assert!(sql.contains("JOIN edges"));
+    }
 }

From 6a28172e1871c951e6da6f4c7560621415f6ebc5 Mon Sep 17 00:00:00 2001
From: AdaWorldAPI
Date: Fri, 30 Jan 2026 08:25:58 +0100
Subject: [PATCH 7/9] Add DIM constants to core module

---
 src/core/mod.rs | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/src/core/mod.rs b/src/core/mod.rs
index 435be79..0306a72 100644
--- a/src/core/mod.rs
+++ b/src/core/mod.rs
@@ -12,3 +12,12 @@ pub use buffer::BufferPool;
 
 /// Dense embedding vector
 pub type Embedding = Vec<f32>;
+
+/// Fingerprint dimension in bits (10K VSA standard)
+pub const DIM: usize = 10_000;
+
+/// Fingerprint dimension in u64 words
+pub const DIM_U64: usize = 157; // ceil(10000/64)
+
+/// Last word mask for 10K bits (only 16 bits used in last u64)
+pub const LAST_MASK: u64 = (1 << 16) - 1;
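+
+// Worked check (illustrative): 157 * 64 = 10_048, so the final u64 carries
+// 10_000 - 156 * 64 = 16 live bits; LAST_MASK = 0xFFFF keeps exactly those
+// and zeroes the 48 bits of padding.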

From f0b5efe06b2b36ce341fe585f78097c7d43075f8 Mon Sep 17 00:00:00 2001
From: AdaWorldAPI
Date: Fri, 30 Jan 2026 08:26:04 +0100
Subject: [PATCH 8/9] Update lib.rs with new exports and error types

---
 src/lib.rs | 94 ++++++++++++++++++++++++++++++++++++------------------
 1 file changed, 63 insertions(+), 31 deletions(-)

diff --git a/src/lib.rs b/src/lib.rs
index 804eabc..442e37a 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -6,37 +6,48 @@
 //!
 //! ## Quick Start
 //!
-//! ```rust
-//! use ladybug::{Database, Thought};
+//! ```rust,ignore
+//! use ladybug::{Database, Thought, NodeRecord, cypher_to_sql};
 //!
 //! // Open database
-//! let db = Database::open("./mydb")?;
+//! let db = Database::open("./mydb").await?;
 //!
-//! // Conventional: SQL
-//! let results = db.sql("SELECT * FROM thoughts WHERE confidence > 0.7")?;
+//! // SQL queries (via DataFusion)
+//! let results = db.sql("SELECT * FROM nodes WHERE label = 'Thought'").await?;
 //!
-//! // Conventional: Vector search
-//! let similar = db.vector_search(&embedding, 10)?;
+//! // Cypher queries (auto-transpiled to recursive CTEs)
+//! let paths = db.cypher("MATCH (a)-[:CAUSES*1..5]->(b) RETURN b").await?;
 //!
-//! // AGI: Resonance (Hamming similarity on 10K-bit fingerprints)
-//! let resonant = db.resonate(&fingerprint, 0.7)?;
+//! // Vector search (via LanceDB ANN)
+//! let similar = db.vector_search(&embedding, 10).await?;
 //!
-//! // AGI: Graph traversal with amplification
-//! let chains = db.query()
-//!     .from("config_change")
-//!     .causes()
-//!     .amplifies(2.0)
-//!     .depth(1..=5)
-//!     .execute()?;
+//! // Resonance search (Hamming similarity on 10K-bit fingerprints)
+//! let resonant = db.resonate(&fingerprint, 0.7, 10);
 //!
-//! // AGI: Counterfactual reasoning
-//! let what_if = db.fork()
-//!     .apply(|w| w.remove("feature_flag"))
-//!     .propagate()
-//!     .diff()?;
+//! // Butterfly detection (causal amplification chains)
+//! let butterflies = db.detect_butterflies("change_id", 5.0, 10).await?;
 //!
-//! // AGI: NARS inference
-//! let conclusion = premise1.infer::<Deduction>(&premise2)?;
+//! // Counterfactual reasoning
+//! let forked = db.fork();
+//! ```
+//!
+//! ## Architecture
+//!
+//! ```text
+//! ┌─────────────────────────────────────────────────────────────────┐
+//! │                            LADYBUGDB                            │
+//! ├─────────────────────────────────────────────────────────────────┤
+//! │                                                                 │
+//! │  SQL     → DataFusion + Custom UDFs (hamming, similarity)       │
+//! │  Cypher  → Parser + Transpiler → Recursive CTEs                 │
+//! │  Vector  → LanceDB native ANN indices                           │
+//! │  Hamming → AVX-512 SIMD (65M comparisons/sec)                   │
+//! │  NARS    → Non-Axiomatic Reasoning System                       │
+//! │                                                                 │
+//! │  Storage: Lance columnar format, zero-copy Arrow                │
+//! │  Indices: IVF-PQ (vector), scalar (labels), Hamming (custom)    │
+//! │                                                                 │
+//! └─────────────────────────────────────────────────────────────────┘
 //! ```
 
 #![cfg_attr(feature = "simd", feature(portable_simd))]
@@ -55,22 +66,22 @@ pub mod fabric;
 pub mod python;
 
 // Re-exports for convenience
-pub use crate::core::{Fingerprint, Embedding, VsaOps};
+pub use crate::core::{Fingerprint, Embedding, VsaOps, DIM, DIM_U64};
 pub use crate::cognitive::{Thought, Concept, Belief, ThinkingStyle};
 pub use crate::nars::{TruthValue, Evidence, Deduction, Induction, Abduction};
 pub use crate::graph::{Edge, EdgeType, Traversal};
 pub use crate::world::{World, Counterfactual, Change};
-pub use crate::query::{Query, QueryResult};
-pub use crate::storage::Database;
+pub use crate::query::{Query, QueryResult, cypher_to_sql, SqlEngine, QueryBuilder};
+pub use crate::storage::{Database, LanceStore, NodeRecord, EdgeRecord};
 
 /// Crate-level error type
 #[derive(thiserror::Error, Debug)]
 pub enum Error {
     #[error("Storage error: {0}")]
-    Storage(#[from] storage::StorageError),
+    Storage(String),
 
     #[error("Query error: {0}")]
-    Query(#[from] query::QueryError),
+    Query(String),
 
     #[error("Invalid fingerprint: expected {expected} bytes, got {got}")]
     InvalidFingerprint { expected: usize, got: usize },
@@ -83,6 +94,30 @@
 
     #[error("IO error: {0}")]
     Io(#[from] std::io::Error),
+
+    #[error("Lance error: {0}")]
+    Lance(#[from] lance::Error),
+
+    #[error("Arrow error: {0}")]
+    Arrow(#[from] arrow::error::ArrowError),
+
+    #[error("DataFusion error: {0}")]
+    DataFusion(#[from] datafusion::error::DataFusionError),
+    // NOTE: no separate variant for tokio::io::Error - it is a re-export of
+    // std::io::Error, so a second #[from] would conflict with Io above.
+}
+
+impl From<storage::StorageError> for Error {
+    fn from(e: storage::StorageError) -> Self {
+        Error::Storage(e.to_string())
+    }
+}
+
+impl From<query::QueryError> for Error {
+    fn from(e: query::QueryError) -> Self {
+        Error::Query(e.to_string())
+    }
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -94,6 +129,3 @@
 pub const VERSION: &str = env!("CARGO_PKG_VERSION");
 
 pub const FINGERPRINT_BITS: usize = 10_000;
 pub const FINGERPRINT_U64: usize = 157; // ceil(10000/64)
 pub const FINGERPRINT_BYTES: usize = FINGERPRINT_U64 * 8;
-
-/// Default embedding dimension (for dense vectors)
-pub const EMBEDDING_DIM: usize = 1024;

From 2ae0ee1a110886166874254a01c2e9b1ed492528 Mon Sep 17 00:00:00 2001
From: AdaWorldAPI
Date: Fri, 30 Jan 2026 08:26:11 +0100
Subject: [PATCH 9/9] Add tokio, futures, chrono, hex dependencies

---
 Cargo.toml | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/Cargo.toml b/Cargo.toml
index eb3c712..aebdd25 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -31,6 +31,10 @@ arrow-schema = "50"
 datafusion = "35"
 sqlparser = "0.43"
 
+# Async
+tokio = { version = "1.35", features = ["full"] }
+futures = "0.3"
+
 # Parallelism
 rayon = { version = "1.8", optional = true }
 parking_lot = "0.12"
@@ -44,6 +48,8 @@
 thiserror = "1.0"
 uuid = { version = "1.6", features = ["v4"] }
 hashbrown = "0.14"
 smallvec = "1.11"
+chrono = "0.4"
+hex = "0.4"
 
 # Python
 pyo3 = { version = "0.20", optional = true, features = ["extension-module"] }