From 7fb6c7eb2c8bf92c4bfbec1fd497154ac9a68f15 Mon Sep 17 00:00:00 2001 From: dev-lew Date: Wed, 18 Feb 2026 15:06:27 -0500 Subject: [PATCH] Implement comment removal and tests --- pgdog/src/frontend/router/parser/cache/ast.rs | 11 +- .../router/parser/cache/cache_impl.rs | 41 +++- .../src/frontend/router/parser/cache/test.rs | 2 +- pgdog/src/frontend/router/parser/comment.rs | 201 +++++++++++++++--- .../frontend/router/parser/query/explain.rs | 7 + .../src/frontend/router/parser/query/show.rs | 4 + .../frontend/router/parser/query/test/mod.rs | 40 +++- 7 files changed, 268 insertions(+), 38 deletions(-) diff --git a/pgdog/src/frontend/router/parser/cache/ast.rs b/pgdog/src/frontend/router/parser/cache/ast.rs index 2be0e0fbf..c80c560e3 100644 --- a/pgdog/src/frontend/router/parser/cache/ast.rs +++ b/pgdog/src/frontend/router/parser/cache/ast.rs @@ -7,9 +7,7 @@ use std::{collections::HashSet, ops::Deref}; use parking_lot::Mutex; use std::sync::Arc; -use super::super::{ - comment::comment, Error, Route, Shard, StatementRewrite, StatementRewriteContext, Table, -}; +use super::super::{Error, Route, Shard, StatementRewrite, StatementRewriteContext, Table}; use super::{Fingerprint, Stats}; use crate::backend::schema::Schema; use crate::frontend::router::parser::rewrite::statement::RewritePlan; @@ -72,6 +70,8 @@ impl Ast { schema: &ShardingSchema, db_schema: &Schema, prepared_statements: &mut PreparedStatements, + comment_shard: Option, + comment_role: Option, user: &str, search_path: Option<&ParameterValue>, ) -> Result { @@ -81,7 +81,6 @@ impl Ast { QueryParserEngine::PgQueryRaw => parse_raw(query), } .map_err(Error::PgQuery)?; - let (comment_shard, comment_role) = comment(query, schema)?; let fingerprint = Fingerprint::new(query, schema.query_parser_engine).map_err(Error::PgQuery)?; @@ -125,12 +124,16 @@ impl Ast { query: &BufferedQuery, ctx: &super::AstContext<'_>, prepared_statements: &mut PreparedStatements, + shard: Option, + role: Option, ) -> Result { Self::new( query, &ctx.sharding_schema, &ctx.db_schema, prepared_statements, + shard, + role, ctx.user, ctx.search_path, ) diff --git a/pgdog/src/frontend/router/parser/cache/cache_impl.rs b/pgdog/src/frontend/router/parser/cache/cache_impl.rs index 4c50e61f8..4b28343b3 100644 --- a/pgdog/src/frontend/router/parser/cache/cache_impl.rs +++ b/pgdog/src/frontend/router/parser/cache/cache_impl.rs @@ -2,6 +2,7 @@ use lru::LruCache; use once_cell::sync::Lazy; use pg_query::normalize; use pgdog_config::QueryParserEngine; +use std::borrow::Cow; use std::collections::HashMap; use std::time::Duration; @@ -11,6 +12,7 @@ use tracing::debug; use super::super::{Error, Route}; use super::{Ast, AstContext}; +use crate::frontend::router::parser::comment; use crate::frontend::{BufferedQuery, PreparedStatements}; static CACHE: Lazy = Lazy::new(Cache::new); @@ -97,6 +99,10 @@ impl Cache { /// Parse a statement by either getting it from cache /// or using pg_query parser. /// + /// In the event of cache miss, we retry after removing all comments except + /// for pgdog metadata. We retain it for correctness, since a query with + /// that metadata must not map to an identical query without it. + /// /// N.B. There is a race here that allows multiple threads to /// parse the same query. That's better imo than locking the data structure /// while we parse the query. @@ -118,12 +124,38 @@ impl Cache { } } + let (maybe_shard, maybe_role, maybe_filtered_query) = + comment::parse_comment(&query, &ctx.sharding_schema)?; + + let query_to_cache: Cow<'_, str>; + + if let Some(filtered_query) = maybe_filtered_query { + query_to_cache = Cow::Owned(filtered_query); + + // Check cache again after removing comments from query + let mut guard = self.inner.lock(); + + let ast = guard.queries.get_mut(&*query_to_cache).map(|entry| { + entry.stats.lock().hits += 1; + entry.clone() + }); + + if let Some(ast) = ast { + guard.stats.hits += 1; + return Ok(ast); + } + } else { + query_to_cache = Cow::Borrowed(query.query()); + } + // Parse query without holding lock. - let entry = Ast::with_context(query, ctx, prepared_statements)?; + let entry = Ast::with_context(query, ctx, prepared_statements, maybe_shard, maybe_role)?; let parse_time = entry.stats.lock().parse_time; let mut guard = self.inner.lock(); - guard.queries.put(query.query().to_string(), entry.clone()); + guard + .queries + .put(query_to_cache.into_owned(), entry.clone()); guard.stats.misses += 1; guard.stats.parse_time += parse_time; @@ -138,7 +170,10 @@ impl Cache { ctx: &AstContext<'_>, prepared_statements: &mut PreparedStatements, ) -> Result { - let mut entry = Ast::with_context(query, ctx, prepared_statements)?; + let (maybe_shard, maybe_role, _) = comment::parse_comment(&query, &ctx.sharding_schema)?; + + let mut entry = + Ast::with_context(query, ctx, prepared_statements, maybe_shard, maybe_role)?; entry.cached = false; let parse_time = entry.stats.lock().parse_time; diff --git a/pgdog/src/frontend/router/parser/cache/test.rs b/pgdog/src/frontend/router/parser/cache/test.rs index f996b9169..c15641c86 100644 --- a/pgdog/src/frontend/router/parser/cache/test.rs +++ b/pgdog/src/frontend/router/parser/cache/test.rs @@ -121,7 +121,7 @@ fn test_tables_list() { "DELETE FROM private_schema.test", "DROP TABLE private_schema.test", ] { - let ast = Ast::new(&BufferedQuery::Query(Query::new(q)), &ShardingSchema::default(), &db_schema, &mut prepared_statements, "", None).unwrap(); + let ast = Ast::new(&BufferedQuery::Query(Query::new(q)), &ShardingSchema::default(), &db_schema, &mut prepared_statements, None, None, "", None).unwrap(); let tables = ast.tables(); println!("{:?}", tables); } diff --git a/pgdog/src/frontend/router/parser/comment.rs b/pgdog/src/frontend/router/parser/comment.rs index a87883adb..93196a2b9 100644 --- a/pgdog/src/frontend/router/parser/comment.rs +++ b/pgdog/src/frontend/router/parser/comment.rs @@ -1,4 +1,5 @@ use once_cell::sync::Lazy; +use pg_query::protobuf::ScanToken; use pg_query::scan_raw; use pg_query::{protobuf::Token, scan}; use pgdog_config::QueryParserEngine; @@ -24,23 +25,26 @@ fn get_matched_value<'a>(caps: &'a regex::Captures<'a>) -> Option<&'a str> { .map(|m| m.as_str()) } -/// Extract shard number from a comment. +/// Extract shard number from a comment. Additionally returns the entire +/// comment string if it exists. /// -/// Comment style uses the C-style comments (not SQL comments!) +/// Comment style for the shard metadata uses the C-style comments (not SQL comments!) /// as to allow the comment to appear anywhere in the query. /// /// See [`SHARD`] and [`SHARDING_KEY`] for the style of comment we expect. /// -pub fn comment( +pub fn parse_comment( query: &str, schema: &ShardingSchema, -) -> Result<(Option, Option), Error> { +) -> Result<(Option, Option, Option), Error> { let tokens = match schema.query_parser_engine { QueryParserEngine::PgQueryProtobuf => scan(query), QueryParserEngine::PgQueryRaw => scan_raw(query), } .map_err(Error::PgQuery)?; + let mut shard = None; let mut role = None; + let mut filtered_query = None; for token in tokens.tokens.iter() { if token.token == Token::CComment as i32 { @@ -57,33 +61,138 @@ pub fn comment( if let Some(cap) = SHARDING_KEY.captures(comment) { if let Some(sharding_key) = get_matched_value(&cap) { if let Some(schema) = schema.schemas.get(Some(sharding_key.into())) { - return Ok((Some(schema.shard().into()), role)); + shard = Some(schema.shard().into()); + } else { + let ctx = ContextBuilder::infer_from_from_and_config(sharding_key, schema)? + .shards(schema.shards) + .build()?; + shard = Some(ctx.apply()?); } - let ctx = ContextBuilder::infer_from_from_and_config(sharding_key, schema)? - .shards(schema.shards) - .build()?; - return Ok((Some(ctx.apply()?), role)); } } if let Some(cap) = SHARD.captures(comment) { - if let Some(shard) = cap.get(1) { - return Ok(( - Some( - shard - .as_str() - .parse::() - .ok() - .map(Shard::Direct) - .unwrap_or(Shard::All), - ), - role, - )); + if let Some(s) = cap.get(1) { + shard = Some( + s.as_str() + .parse::() + .ok() + .map(Shard::Direct) + .unwrap_or(Shard::All), + ); } } } } - Ok((None, role)) + if has_comments(&tokens.tokens) { + filtered_query = Some(remove_comments( + query, + &tokens.tokens, + Some(&[&SHARD, &*SHARDING_KEY, &ROLE]), + )?); + } + + Ok((shard, role, filtered_query)) +} + +pub fn has_comments(tokenized_query: &Vec) -> bool { + tokenized_query + .iter() + .any(|st| st.token == Token::CComment as i32 || st.token == Token::SqlComment as i32) +} + +pub fn remove_comments( + query: &str, + tokenized_query: &Vec, + except: Option<&[&Regex]>, +) -> Result { + let mut cursor = 0; + let mut out = String::with_capacity(query.len()); + let mut metadata = Vec::with_capacity(3); + let mut has_comment = false; + + for st in tokenized_query { + let start = st.start as usize; + let end = st.end as usize; + + out.push_str(&query[cursor..start]); + + match st.token { + t if t == Token::CComment as i32 => { + has_comment = true; + + let comment = &query[start..end]; + + if let Some(except) = except { + let m = keep_only_matching(comment, except); + + if !m.is_empty() { + metadata.push(m.to_string()); + } + } + } + _ => { + out.push_str(&query[start..end]); + } + } + + cursor = end; + } + + if cursor < query.len() { + out.push_str(&query[cursor..]); + } + + if has_comment { + return Ok(normalize_query(&out, &mut metadata)); + } + + Ok(out) +} + +/// Prepends metadata comments and removes duplicate whitespace +/// that likely appear during comment removal +fn normalize_query(query: &str, metadata: &mut Vec) -> String { + let mut result = String::with_capacity(query.len()); + + query.split_whitespace().for_each(|s| { + if !result.is_empty() { + result.push(' '); + } + + result.push_str(s); + }); + + // Special case for when a comment is at the end of a query, + if result.ends_with(" ;") { + result.truncate(result.len() - 2); + result.push(';'); + } + + if !metadata.is_empty() { + metadata.sort_unstable(); + + let metadata_str = &mut metadata.join(" "); + + metadata_str.insert_str(0, "/* "); + metadata_str.push_str(" */ "); + + result.insert_str(0, metadata_str); + } + + result +} + +fn keep_only_matching(comment: &str, regs: &[&Regex]) -> String { + let mut out = String::new(); + + for reg in regs { + for m in reg.find_iter(comment) { + out.push_str(m.as_str()); + } + } + + out } #[cfg(test)] @@ -166,7 +275,7 @@ mod tests { }; let query = "SELECT * FROM users /* pgdog_role: primary */"; - let result = comment(query, &schema).unwrap(); + let result = parse_comment(query, &schema).unwrap(); assert_eq!(result.1, Some(Role::Primary)); } @@ -181,11 +290,47 @@ mod tests { }; let query = "SELECT * FROM users /* pgdog_role: replica pgdog_shard: 2 */"; - let result = comment(query, &schema).unwrap(); + let result = parse_comment(query, &schema).unwrap(); assert_eq!(result.0, Some(Shard::Direct(2))); assert_eq!(result.1, Some(Role::Replica)); } + #[test] + fn test_remove_comments_no_exceptions() { + let query = "SELECT * FROM table /* comment */ WHERE id = 1"; + let tokens = scan(query).unwrap().tokens; + + let result = remove_comments(query, &tokens, None).unwrap(); + + assert_eq!(result, "SELECT * FROM table WHERE id = 1"); + } + + #[test] + fn test_remove_comments_with_exceptions() { + let query = "SELECT /* comment */ * FROM table /* pgdog_shard: 4 comment */ WHERE id = 1"; + let tokens = scan(query).unwrap().tokens; + + let result = remove_comments(query, &tokens, Some(&[&SHARD])).unwrap(); + + assert_eq!( + result, + "/* pgdog_shard: 4 */ SELECT * FROM table WHERE id = 1" + ); + } + + #[test] + fn test_remove_comments_multiple_metadata() { + let query = "SELECT 1 /* pgdog_shard: 4 */ + 2 /* pgdog_role: primary */;"; + let tokens = scan(query).unwrap().tokens; + + let result = remove_comments(query, &tokens, Some(&[&ROLE, &SHARD])).unwrap(); + + assert_eq!( + result, + "/* pgdog_role: primary pgdog_shard: 4 */ SELECT 1 + 2;" + ); + } + #[test] fn test_replica_role_detection() { use crate::backend::ShardedTables; @@ -197,7 +342,7 @@ mod tests { }; let query = "SELECT * FROM users /* pgdog_role: replica */"; - let result = comment(query, &schema).unwrap(); + let result = parse_comment(query, &schema).unwrap(); assert_eq!(result.1, Some(Role::Replica)); } @@ -212,7 +357,7 @@ mod tests { }; let query = "SELECT * FROM users /* pgdog_role: invalid */"; - let result = comment(query, &schema).unwrap(); + let result = parse_comment(query, &schema).unwrap(); assert_eq!(result.1, None); } @@ -227,7 +372,7 @@ mod tests { }; let query = "SELECT * FROM users"; - let result = comment(query, &schema).unwrap(); + let result = parse_comment(query, &schema).unwrap(); assert_eq!(result.1, None); } @@ -252,7 +397,7 @@ mod tests { }; let query = "SELECT * FROM users /* pgdog_sharding_key: sales */"; - let result = comment(query, &schema).unwrap(); + let result = parse_comment(query, &schema).unwrap(); assert_eq!(result.0, Some(Shard::Direct(1))); } } diff --git a/pgdog/src/frontend/router/parser/query/explain.rs b/pgdog/src/frontend/router/parser/query/explain.rs index 6e7b382dc..8c831efa0 100644 --- a/pgdog/src/frontend/router/parser/query/explain.rs +++ b/pgdog/src/frontend/router/parser/query/explain.rs @@ -76,11 +76,16 @@ mod tests { let cluster = Cluster::new_test(&config()); let mut stmts = PreparedStatements::default(); + let (maybe_shard, maybe_role, _) = + comment::parse_comment(sql, &cluster.sharding_schema()).unwrap(); + let ast = Ast::new( &BufferedQuery::Query(Query::new(sql)), &cluster.sharding_schema(), &cluster.schema(), &mut stmts, + maybe_shard, + maybe_role, "", None, ) @@ -119,6 +124,8 @@ mod tests { &cluster.sharding_schema(), &cluster.schema(), &mut stmts, + None, + None, "", None, ) diff --git a/pgdog/src/frontend/router/parser/query/show.rs b/pgdog/src/frontend/router/parser/query/show.rs index f59f1c6be..7e7be98ae 100644 --- a/pgdog/src/frontend/router/parser/query/show.rs +++ b/pgdog/src/frontend/router/parser/query/show.rs @@ -51,6 +51,8 @@ mod test_show { &c.sharding_schema(), &c.schema(), &mut PreparedStatements::default(), + None, + None, "", None, ) @@ -72,6 +74,8 @@ mod test_show { &c.sharding_schema(), &c.schema(), &mut PreparedStatements::default(), + None, + None, "", None, ) diff --git a/pgdog/src/frontend/router/parser/query/test/mod.rs b/pgdog/src/frontend/router/parser/query/test/mod.rs index 8f84467e5..5299c08eb 100644 --- a/pgdog/src/frontend/router/parser/query/test/mod.rs +++ b/pgdog/src/frontend/router/parser/query/test/mod.rs @@ -12,6 +12,7 @@ use bytes::Bytes; use super::{super::Shard, *}; use crate::backend::Cluster; use crate::config::ReadWriteStrategy; +use crate::frontend::router::parser::comment; use crate::frontend::{ client::{Sticky, TransactionType}, router::Ast, @@ -42,11 +43,17 @@ pub mod test_transaction; fn parse_query(query: &str) -> Command { let mut query_parser = QueryParser::default(); let cluster = Cluster::new_test(&config()); + + let (maybe_role, maybe_shard, _) = + comment::parse_comment(&query, &cluster.sharding_schema()).unwrap(); + let ast = Ast::new( &BufferedQuery::Query(Query::new(query)), &cluster.sharding_schema(), &cluster.schema(), &mut PreparedStatements::default(), + maybe_role, + maybe_shard, "", None, ) @@ -70,11 +77,19 @@ macro_rules! command { let query = $query; let mut query_parser = QueryParser::default(); let cluster = Cluster::new_test(&crate::config::config()); + + let buffered_query = &BufferedQuery::Query(Query::new($query)); + + let (maybe_role, maybe_shard, _) = + comment::parse_comment(&buffered_query, &cluster.sharding_schema()).unwrap(); + let mut ast = Ast::new( - &BufferedQuery::Query(Query::new($query)), + buffered_query, &cluster.sharding_schema(), &cluster.schema(), &mut PreparedStatements::default(), + maybe_role, + maybe_shard, "", None, ) @@ -131,11 +146,16 @@ macro_rules! query_parser { let mut prep_stmts = PreparedStatements::default(); + let (maybe_role, maybe_shard, _) = + comment::parse_comment(&buffered_query, &cluster.sharding_schema()).unwrap(); + let mut ast = Ast::new( &buffered_query, &cluster.sharding_schema(), &cluster.schema(), &mut prep_stmts, + maybe_role, + maybe_shard, "", None, ) @@ -188,11 +208,19 @@ macro_rules! parse { .collect::>(); let bind = Bind::new_params_codes($name, ¶ms, $codes); let cluster = Cluster::new_test(&crate::config::config()); + + let buffered_query = &BufferedQuery::Prepared(Parse::new_anonymous($query)); + + let (maybe_role, maybe_shard, _) = + comment::parse_comment(&buffered_query, &cluster.sharding_schema()).unwrap(); + let ast = Ast::new( - &BufferedQuery::Prepared(Parse::new_anonymous($query)), + buffered_query, &cluster.sharding_schema(), &cluster.schema(), &mut PreparedStatements::default(), + maybe_role, + maybe_shard, "", None, ) @@ -430,11 +458,17 @@ fn test_set() { let cluster = Cluster::new_test(&config()); let mut prep_stmts = PreparedStatements::default(); let buffered_query = BufferedQuery::Query(Query::new(query_str)); + + let (maybe_role, maybe_shard, _) = + comment::parse_comment(&buffered_query, &cluster.sharding_schema()).unwrap(); + let mut ast = Ast::new( &buffered_query, &cluster.sharding_schema(), &cluster.schema(), &mut prep_stmts, + maybe_role, + maybe_shard, "", None, ) @@ -581,6 +615,8 @@ WHERE t2.account = ( &cluster.sharding_schema(), &cluster.schema(), &mut prep_stmts, + None, + None, "", None, )