diff --git a/integration/python/test_psycopg.py b/integration/python/test_psycopg.py index f8b838fa6..2d90d28f9 100644 --- a/integration/python/test_psycopg.py +++ b/integration/python/test_psycopg.py @@ -306,3 +306,25 @@ def test_pipeline_error_recovery(): conn.close() no_out_of_sync() + + +def test_pipeline_multiple_errors(): + """Pipeline with multiple queries to a non-existent table. + + Tests that pipeline mode properly handles errors when multiple + queries are sent without waiting for server responses. + """ + conn = normal_sync() + conn.autocommit = True + + # Pipeline with multiple errors should return the first error + # and not timeout or get stuck in "cannot exit pipeline mode" + with pytest.raises(psycopg.errors.UndefinedTable): + with conn.pipeline(): + cur = conn.cursor() + for i in range(5): + cur.execute("SELECT * FROM no_existing_table") + cur.fetchone() + + conn.close() + no_out_of_sync() diff --git a/pgdog/src/backend/pool/connection/binding.rs b/pgdog/src/backend/pool/connection/binding.rs index 913926c3f..677cd3cf3 100644 --- a/pgdog/src/backend/pool/connection/binding.rs +++ b/pgdog/src/backend/pool/connection/binding.rs @@ -240,6 +240,15 @@ impl Binding { } } + /// Protocol is out of sync due to an error in extended protocol. + pub fn out_of_sync(&self) -> bool { + match self { + Binding::Direct(Some(server)) => server.out_of_sync(), + Binding::MultiShard(servers, _state) => servers.iter().any(|s| s.out_of_sync()), + _ => false, + } + } + pub(super) fn state_check(&self, state: State) -> bool { match self { Binding::Direct(Some(server)) => { diff --git a/pgdog/src/backend/prepared_statements.rs b/pgdog/src/backend/prepared_statements.rs index 8c218526a..76ad8480a 100644 --- a/pgdog/src/backend/prepared_statements.rs +++ b/pgdog/src/backend/prepared_statements.rs @@ -260,6 +260,11 @@ impl PreparedStatements { self.state.in_copy_mode() } + /// The protocol is out of sync due to an error in extended protocol. + pub(crate) fn out_of_sync(&self) -> bool { + self.state.out_of_sync() + } + fn check_prepared(&mut self, name: &str) -> Result, Error> { if !self.contains(name) && !self.parses.iter().any(|s| s == name) { let parse = self.parse(name); diff --git a/pgdog/src/backend/protocol/state.rs b/pgdog/src/backend/protocol/state.rs index b8505e5d6..ca86915d2 100644 --- a/pgdog/src/backend/protocol/state.rs +++ b/pgdog/src/backend/protocol/state.rs @@ -225,6 +225,11 @@ impl ProtocolState { pub(crate) fn in_sync(&self) -> bool { !self.out_of_sync } + + /// Check if the protocol is out of sync due to an error in extended protocol. + pub(crate) fn out_of_sync(&self) -> bool { + self.out_of_sync + } } #[cfg(test)] diff --git a/pgdog/src/backend/server.rs b/pgdog/src/backend/server.rs index 3ea59adeb..666ae4f24 100644 --- a/pgdog/src/backend/server.rs +++ b/pgdog/src/backend/server.rs @@ -605,6 +605,11 @@ impl Server { self.prepared_statements.in_copy_mode() } + /// Protocol is out of sync due to an error in extended protocol. + pub fn out_of_sync(&self) -> bool { + self.prepared_statements.out_of_sync() + } + /// Server is still inside a transaction. #[inline] pub fn in_transaction(&self) -> bool { diff --git a/pgdog/src/frontend/client/mod.rs b/pgdog/src/frontend/client/mod.rs index 3a07451b0..32e17220d 100644 --- a/pgdog/src/frontend/client/mod.rs +++ b/pgdog/src/frontend/client/mod.rs @@ -487,6 +487,22 @@ impl Client { let mut context = QueryEngineContext::new(self).spliced(&mut req, reqs.len()); query_engine.handle(&mut context).await?; self.transaction = context.transaction(); + + // If pipeline is aborted due to error, skip to Sync to complete the pipeline. + // Postgres ignores all commands after an error until it receives Sync. + if query_engine.out_of_sync() && !req.is_sync_only() { + debug!("pipeline aborted, skipping to Sync"); + while let Some((_, mut next_req)) = reqs.next() { + if next_req.is_sync_only() { + debug!("processing Sync to complete aborted pipeline"); + let mut ctx = QueryEngineContext::new(self).spliced(&mut next_req, 0); + query_engine.handle(&mut ctx).await?; + self.transaction = ctx.transaction(); + break; + } + } + break; + } } } diff --git a/pgdog/src/frontend/client/query_engine/mod.rs b/pgdog/src/frontend/client/query_engine/mod.rs index 61ee4c9ff..7762a278a 100644 --- a/pgdog/src/frontend/client/query_engine/mod.rs +++ b/pgdog/src/frontend/client/query_engine/mod.rs @@ -295,4 +295,9 @@ impl QueryEngine { pub fn get_state(&self) -> State { self.stats.state } + + /// Check if the backend protocol is out of sync due to an error in extended protocol. + pub fn out_of_sync(&self) -> bool { + self.backend.out_of_sync() + } } diff --git a/pgdog/src/frontend/client/query_engine/test/spliced.rs b/pgdog/src/frontend/client/query_engine/test/spliced.rs index 703a5088d..9987a2e41 100644 --- a/pgdog/src/frontend/client/query_engine/test/spliced.rs +++ b/pgdog/src/frontend/client/query_engine/test/spliced.rs @@ -2,7 +2,8 @@ use super::{test_client, test_sharded_client}; use crate::{ expect_message, net::{ - BindComplete, CommandComplete, DataRow, Describe, Parameters, ParseComplete, ReadyForQuery, + BindComplete, CommandComplete, DataRow, Describe, ErrorResponse, Parameters, ParseComplete, + ReadyForQuery, }, }; @@ -413,3 +414,60 @@ async fn test_simple_query_protocol_sharded_transaction() { client.try_process().await.unwrap(); client.read_until('Z').await.unwrap(); } + +/// Test that pipeline errors skip remaining queries and jump to Sync. +/// When Postgres receives an error in pipeline mode, it enters "aborted" state +/// and ignores all commands until Sync. pgdog must detect this and skip +/// remaining spliced requests to avoid timeout. +#[tokio::test] +async fn test_spliced_pipeline_error_skips_to_sync() { + let mut client = TestClient::new_sharded(Parameters::default()).await; + + assert!(!client.backend_connected()); + + // Send 3 queries in pipeline mode, first one will error (non-existent table) + client + .send(Parse::named("", "SELECT * FROM nonexistent_table_12345")) + .await; + client.send(Bind::new_statement("")).await; + client.send(Execute::new()).await; + + client.send(Parse::named("", "SELECT 2")).await; + client.send(Bind::new_statement("")).await; + client.send(Execute::new()).await; + + client.send(Parse::named("", "SELECT 3")).await; + client.send(Bind::new_statement("")).await; + client.send(Execute::new()).await; + + client.send(Sync).await; + + // Process should complete without timeout + client.try_process().await.unwrap(); + + // Read messages manually since read_until returns Err on ErrorResponse + let mut messages = vec![]; + loop { + let message = client.read().await; + let code = message.code(); + messages.push(message); + if code == 'Z' { + break; + } + } + + // Must have at least ErrorResponse and ReadyForQuery + assert!(messages.len() >= 2, "Expected at least 2 messages"); + + // First message should be ErrorResponse (42P01 = undefined table) + let error = + ErrorResponse::try_from(messages[0].clone()).expect("first message should be error"); + assert_eq!(error.code, "42P01", "Expected undefined table error"); + + // Last message should be ReadyForQuery + let rfq = expect_message!(messages.last().unwrap().clone(), ReadyForQuery); + assert_eq!(rfq.status, 'I', "Should be idle after pipeline error"); + + // Connection should be released back to pool + assert!(!client.backend_connected()); +}