Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions integration/python/test_psycopg.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,3 +306,25 @@ def test_pipeline_error_recovery():

conn.close()
no_out_of_sync()


def test_pipeline_multiple_errors():
"""Pipeline with multiple queries to a non-existent table.

Tests that pipeline mode properly handles errors when multiple
queries are sent without waiting for server responses.
"""
conn = normal_sync()
conn.autocommit = True

# Pipeline with multiple errors should return the first error
# and not timeout or get stuck in "cannot exit pipeline mode"
with pytest.raises(psycopg.errors.UndefinedTable):
with conn.pipeline():
cur = conn.cursor()
for i in range(5):
cur.execute("SELECT * FROM no_existing_table")
cur.fetchone()

conn.close()
no_out_of_sync()
9 changes: 9 additions & 0 deletions pgdog/src/backend/pool/connection/binding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,15 @@ impl Binding {
}
}

/// Protocol is out of sync due to an error in extended protocol.
pub fn out_of_sync(&self) -> bool {
match self {
Binding::Direct(Some(server)) => server.out_of_sync(),
Binding::MultiShard(servers, _state) => servers.iter().any(|s| s.out_of_sync()),
_ => false,
}
}

pub(super) fn state_check(&self, state: State) -> bool {
match self {
Binding::Direct(Some(server)) => {
Expand Down
5 changes: 5 additions & 0 deletions pgdog/src/backend/prepared_statements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,11 @@ impl PreparedStatements {
self.state.in_copy_mode()
}

/// The protocol is out of sync due to an error in extended protocol.
pub(crate) fn out_of_sync(&self) -> bool {
self.state.out_of_sync()
}

fn check_prepared(&mut self, name: &str) -> Result<Option<ProtocolMessage>, Error> {
if !self.contains(name) && !self.parses.iter().any(|s| s == name) {
let parse = self.parse(name);
Expand Down
5 changes: 5 additions & 0 deletions pgdog/src/backend/protocol/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,11 @@ impl ProtocolState {
pub(crate) fn in_sync(&self) -> bool {
!self.out_of_sync
}

/// Check if the protocol is out of sync due to an error in extended protocol.
pub(crate) fn out_of_sync(&self) -> bool {
self.out_of_sync
}
}

#[cfg(test)]
Expand Down
5 changes: 5 additions & 0 deletions pgdog/src/backend/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,11 @@ impl Server {
self.prepared_statements.in_copy_mode()
}

/// Protocol is out of sync due to an error in extended protocol.
pub fn out_of_sync(&self) -> bool {
self.prepared_statements.out_of_sync()
}

/// Server is still inside a transaction.
#[inline]
pub fn in_transaction(&self) -> bool {
Expand Down
16 changes: 16 additions & 0 deletions pgdog/src/frontend/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,22 @@ impl Client {
let mut context = QueryEngineContext::new(self).spliced(&mut req, reqs.len());
query_engine.handle(&mut context).await?;
self.transaction = context.transaction();

// If pipeline is aborted due to error, skip to Sync to complete the pipeline.
// Postgres ignores all commands after an error until it receives Sync.
if query_engine.out_of_sync() && !req.is_sync_only() {
debug!("pipeline aborted, skipping to Sync");
while let Some((_, mut next_req)) = reqs.next() {
if next_req.is_sync_only() {
debug!("processing Sync to complete aborted pipeline");
let mut ctx = QueryEngineContext::new(self).spliced(&mut next_req, 0);
query_engine.handle(&mut ctx).await?;
self.transaction = ctx.transaction();
break;
}
}
break;
}
}
}

Expand Down
5 changes: 5 additions & 0 deletions pgdog/src/frontend/client/query_engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,4 +295,9 @@ impl QueryEngine {
pub fn get_state(&self) -> State {
self.stats.state
}

/// Check if the backend protocol is out of sync due to an error in extended protocol.
pub fn out_of_sync(&self) -> bool {
self.backend.out_of_sync()
}
}
60 changes: 59 additions & 1 deletion pgdog/src/frontend/client/query_engine/test/spliced.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use super::{test_client, test_sharded_client};
use crate::{
expect_message,
net::{
BindComplete, CommandComplete, DataRow, Describe, Parameters, ParseComplete, ReadyForQuery,
BindComplete, CommandComplete, DataRow, Describe, ErrorResponse, Parameters, ParseComplete,
ReadyForQuery,
},
};

Expand Down Expand Up @@ -413,3 +414,60 @@ async fn test_simple_query_protocol_sharded_transaction() {
client.try_process().await.unwrap();
client.read_until('Z').await.unwrap();
}

/// Test that pipeline errors skip remaining queries and jump to Sync.
/// When Postgres receives an error in pipeline mode, it enters "aborted" state
/// and ignores all commands until Sync. pgdog must detect this and skip
/// remaining spliced requests to avoid timeout.
#[tokio::test]
async fn test_spliced_pipeline_error_skips_to_sync() {
let mut client = TestClient::new_sharded(Parameters::default()).await;

assert!(!client.backend_connected());

// Send 3 queries in pipeline mode, first one will error (non-existent table)
client
.send(Parse::named("", "SELECT * FROM nonexistent_table_12345"))
.await;
client.send(Bind::new_statement("")).await;
client.send(Execute::new()).await;

client.send(Parse::named("", "SELECT 2")).await;
client.send(Bind::new_statement("")).await;
client.send(Execute::new()).await;

client.send(Parse::named("", "SELECT 3")).await;
client.send(Bind::new_statement("")).await;
client.send(Execute::new()).await;

client.send(Sync).await;

// Process should complete without timeout
client.try_process().await.unwrap();

// Read messages manually since read_until returns Err on ErrorResponse
let mut messages = vec![];
loop {
let message = client.read().await;
let code = message.code();
messages.push(message);
if code == 'Z' {
break;
}
}

// Must have at least ErrorResponse and ReadyForQuery
assert!(messages.len() >= 2, "Expected at least 2 messages");

// First message should be ErrorResponse (42P01 = undefined table)
let error =
ErrorResponse::try_from(messages[0].clone()).expect("first message should be error");
assert_eq!(error.code, "42P01", "Expected undefined table error");

// Last message should be ReadyForQuery
let rfq = expect_message!(messages.last().unwrap().clone(), ReadyForQuery);
assert_eq!(rfq.status, 'I', "Should be idle after pipeline error");

// Connection should be released back to pool
assert!(!client.backend_connected());
}
Loading