From c621377e7a271d2531e0edf5dbacafdb1a190998 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Wed, 18 Feb 2026 11:46:16 -0800 Subject: [PATCH 1/7] feat: support OFFSET in cross-shard queries --- pgdog-plugin/src/bindings.rs | 470 ++++++++++------- .../src/frontend/client/query_engine/query.rs | 2 +- .../client/query_engine/route_query.rs | 9 + .../router/parser/rewrite/statement/error.rs | 3 + .../router/parser/rewrite/statement/mod.rs | 2 + .../router/parser/rewrite/statement/offset.rs | 473 ++++++++++++++++++ .../router/parser/rewrite/statement/plan.rs | 11 +- pgdog/src/frontend/router/parser/route.rs | 4 + pgdog/src/net/messages/bind.rs | 26 + 9 files changed, 825 insertions(+), 175 deletions(-) create mode 100644 pgdog/src/frontend/router/parser/rewrite/statement/offset.rs diff --git a/pgdog-plugin/src/bindings.rs b/pgdog-plugin/src/bindings.rs index 561d24e5b..6f47703df 100644 --- a/pgdog-plugin/src/bindings.rs +++ b/pgdog-plugin/src/bindings.rs @@ -1,213 +1,338 @@ /* automatically generated by rust-bindgen 0.71.1 */ -pub const _STDINT_H: u32 = 1; -pub const _FEATURES_H: u32 = 1; -pub const _DEFAULT_SOURCE: u32 = 1; -pub const __GLIBC_USE_ISOC2Y: u32 = 0; -pub const __GLIBC_USE_ISOC23: u32 = 0; -pub const __USE_ISOC11: u32 = 1; -pub const __USE_ISOC99: u32 = 1; -pub const __USE_ISOC95: u32 = 1; -pub const __USE_POSIX_IMPLICITLY: u32 = 1; -pub const _POSIX_SOURCE: u32 = 1; -pub const _POSIX_C_SOURCE: u32 = 200809; -pub const __USE_POSIX: u32 = 1; -pub const __USE_POSIX2: u32 = 1; -pub const __USE_POSIX199309: u32 = 1; -pub const __USE_POSIX199506: u32 = 1; -pub const __USE_XOPEN2K: u32 = 1; -pub const __USE_XOPEN2K8: u32 = 1; -pub const _ATFILE_SOURCE: u32 = 1; pub const __WORDSIZE: u32 = 64; -pub const __WORDSIZE_TIME64_COMPAT32: u32 = 1; -pub const __SYSCALL_WORDSIZE: u32 = 64; -pub const __TIMESIZE: u32 = 64; -pub const __USE_TIME_BITS64: u32 = 1; -pub const __USE_MISC: u32 = 1; -pub const __USE_ATFILE: u32 = 1; -pub const __USE_FORTIFY_LEVEL: u32 = 0; -pub const __GLIBC_USE_DEPRECATED_GETS: u32 = 0; -pub const __GLIBC_USE_DEPRECATED_SCANF: u32 = 0; -pub const __GLIBC_USE_C23_STRTOL: u32 = 0; -pub const _STDC_PREDEF_H: u32 = 1; -pub const __STDC_IEC_559__: u32 = 1; -pub const __STDC_IEC_60559_BFP__: u32 = 201404; -pub const __STDC_IEC_559_COMPLEX__: u32 = 1; -pub const __STDC_IEC_60559_COMPLEX__: u32 = 201404; -pub const __STDC_ISO_10646__: u32 = 201706; -pub const __GNU_LIBRARY__: u32 = 6; -pub const __GLIBC__: u32 = 2; -pub const __GLIBC_MINOR__: u32 = 42; -pub const _SYS_CDEFS_H: u32 = 1; -pub const __glibc_c99_flexarr_available: u32 = 1; -pub const __LDOUBLE_REDIRECTS_TO_FLOAT128_ABI: u32 = 0; -pub const __HAVE_GENERIC_SELECTION: u32 = 1; -pub const __GLIBC_USE_LIB_EXT2: u32 = 0; -pub const __GLIBC_USE_IEC_60559_BFP_EXT: u32 = 0; -pub const __GLIBC_USE_IEC_60559_BFP_EXT_C23: u32 = 0; -pub const __GLIBC_USE_IEC_60559_EXT: u32 = 0; -pub const __GLIBC_USE_IEC_60559_FUNCS_EXT: u32 = 0; -pub const __GLIBC_USE_IEC_60559_FUNCS_EXT_C23: u32 = 0; -pub const __GLIBC_USE_IEC_60559_TYPES_EXT: u32 = 0; -pub const _BITS_TYPES_H: u32 = 1; -pub const _BITS_TYPESIZES_H: u32 = 1; -pub const __OFF_T_MATCHES_OFF64_T: u32 = 1; -pub const __INO_T_MATCHES_INO64_T: u32 = 1; -pub const __RLIM_T_MATCHES_RLIM64_T: u32 = 1; -pub const __STATFS_MATCHES_STATFS64: u32 = 1; -pub const __KERNEL_OLD_TIMEVAL_MATCHES_TIMEVAL64: u32 = 1; -pub const __FD_SETSIZE: u32 = 1024; -pub const _BITS_TIME64_H: u32 = 1; -pub const _BITS_WCHAR_H: u32 = 1; -pub const _BITS_STDINT_INTN_H: u32 = 1; -pub const 
_BITS_STDINT_UINTN_H: u32 = 1; -pub const _BITS_STDINT_LEAST_H: u32 = 1; -pub const INT8_MIN: i32 = -128; -pub const INT16_MIN: i32 = -32768; -pub const INT32_MIN: i32 = -2147483648; +pub const __has_safe_buffers: u32 = 1; +pub const __DARWIN_ONLY_64_BIT_INO_T: u32 = 1; +pub const __DARWIN_ONLY_UNIX_CONFORMANCE: u32 = 1; +pub const __DARWIN_ONLY_VERS_1050: u32 = 1; +pub const __DARWIN_UNIX03: u32 = 1; +pub const __DARWIN_64_BIT_INO_T: u32 = 1; +pub const __DARWIN_VERS_1050: u32 = 1; +pub const __DARWIN_NON_CANCELABLE: u32 = 0; +pub const __DARWIN_SUF_EXTSN: &[u8; 14] = b"$DARWIN_EXTSN\0"; +pub const __DARWIN_C_ANSI: u32 = 4096; +pub const __DARWIN_C_FULL: u32 = 900000; +pub const __DARWIN_C_LEVEL: u32 = 900000; +pub const __STDC_WANT_LIB_EXT1__: u32 = 1; +pub const __DARWIN_NO_LONG_LONG: u32 = 0; +pub const _DARWIN_FEATURE_64_BIT_INODE: u32 = 1; +pub const _DARWIN_FEATURE_ONLY_64_BIT_INODE: u32 = 1; +pub const _DARWIN_FEATURE_ONLY_VERS_1050: u32 = 1; +pub const _DARWIN_FEATURE_ONLY_UNIX_CONFORMANCE: u32 = 1; +pub const _DARWIN_FEATURE_UNIX_CONFORMANCE: u32 = 3; +pub const __has_ptrcheck: u32 = 0; +pub const USE_CLANG_TYPES: u32 = 0; +pub const __PTHREAD_SIZE__: u32 = 8176; +pub const __PTHREAD_ATTR_SIZE__: u32 = 56; +pub const __PTHREAD_MUTEXATTR_SIZE__: u32 = 8; +pub const __PTHREAD_MUTEX_SIZE__: u32 = 56; +pub const __PTHREAD_CONDATTR_SIZE__: u32 = 8; +pub const __PTHREAD_COND_SIZE__: u32 = 40; +pub const __PTHREAD_ONCE_SIZE__: u32 = 8; +pub const __PTHREAD_RWLOCK_SIZE__: u32 = 192; +pub const __PTHREAD_RWLOCKATTR_SIZE__: u32 = 16; pub const INT8_MAX: u32 = 127; pub const INT16_MAX: u32 = 32767; pub const INT32_MAX: u32 = 2147483647; +pub const INT64_MAX: u64 = 9223372036854775807; +pub const INT8_MIN: i32 = -128; +pub const INT16_MIN: i32 = -32768; +pub const INT32_MIN: i32 = -2147483648; +pub const INT64_MIN: i64 = -9223372036854775808; pub const UINT8_MAX: u32 = 255; pub const UINT16_MAX: u32 = 65535; pub const UINT32_MAX: u32 = 4294967295; +pub const UINT64_MAX: i32 = -1; pub const INT_LEAST8_MIN: i32 = -128; pub const INT_LEAST16_MIN: i32 = -32768; pub const INT_LEAST32_MIN: i32 = -2147483648; +pub const INT_LEAST64_MIN: i64 = -9223372036854775808; pub const INT_LEAST8_MAX: u32 = 127; pub const INT_LEAST16_MAX: u32 = 32767; pub const INT_LEAST32_MAX: u32 = 2147483647; +pub const INT_LEAST64_MAX: u64 = 9223372036854775807; pub const UINT_LEAST8_MAX: u32 = 255; pub const UINT_LEAST16_MAX: u32 = 65535; pub const UINT_LEAST32_MAX: u32 = 4294967295; +pub const UINT_LEAST64_MAX: i32 = -1; pub const INT_FAST8_MIN: i32 = -128; -pub const INT_FAST16_MIN: i64 = -9223372036854775808; -pub const INT_FAST32_MIN: i64 = -9223372036854775808; +pub const INT_FAST16_MIN: i32 = -32768; +pub const INT_FAST32_MIN: i32 = -2147483648; +pub const INT_FAST64_MIN: i64 = -9223372036854775808; pub const INT_FAST8_MAX: u32 = 127; -pub const INT_FAST16_MAX: u64 = 9223372036854775807; -pub const INT_FAST32_MAX: u64 = 9223372036854775807; +pub const INT_FAST16_MAX: u32 = 32767; +pub const INT_FAST32_MAX: u32 = 2147483647; +pub const INT_FAST64_MAX: u64 = 9223372036854775807; pub const UINT_FAST8_MAX: u32 = 255; -pub const UINT_FAST16_MAX: i32 = -1; -pub const UINT_FAST32_MAX: i32 = -1; -pub const INTPTR_MIN: i64 = -9223372036854775808; +pub const UINT_FAST16_MAX: u32 = 65535; +pub const UINT_FAST32_MAX: u32 = 4294967295; +pub const UINT_FAST64_MAX: i32 = -1; pub const INTPTR_MAX: u64 = 9223372036854775807; +pub const INTPTR_MIN: i64 = -9223372036854775808; pub const UINTPTR_MAX: i32 = -1; -pub const PTRDIFF_MIN: 
i64 = -9223372036854775808; -pub const PTRDIFF_MAX: u64 = 9223372036854775807; +pub const SIZE_MAX: i32 = -1; +pub const RSIZE_MAX: i32 = -1; +pub const WINT_MIN: i32 = -2147483648; +pub const WINT_MAX: u32 = 2147483647; pub const SIG_ATOMIC_MIN: i32 = -2147483648; pub const SIG_ATOMIC_MAX: u32 = 2147483647; -pub const SIZE_MAX: i32 = -1; -pub const WINT_MIN: u32 = 0; -pub const WINT_MAX: u32 = 4294967295; pub type wchar_t = ::std::os::raw::c_int; -#[repr(C)] -#[repr(align(16))] -#[derive(Debug, Copy, Clone)] -pub struct max_align_t { - pub __clang_max_align_nonce1: ::std::os::raw::c_longlong, - pub __bindgen_padding_0: u64, - pub __clang_max_align_nonce2: u128, -} -#[allow(clippy::unnecessary_operation, clippy::identity_op)] -const _: () = { - ["Size of max_align_t"][::std::mem::size_of::() - 32usize]; - ["Alignment of max_align_t"][::std::mem::align_of::() - 16usize]; - ["Offset of field: max_align_t::__clang_max_align_nonce1"] - [::std::mem::offset_of!(max_align_t, __clang_max_align_nonce1) - 0usize]; - ["Offset of field: max_align_t::__clang_max_align_nonce2"] - [::std::mem::offset_of!(max_align_t, __clang_max_align_nonce2) - 16usize]; -}; -pub type __u_char = ::std::os::raw::c_uchar; -pub type __u_short = ::std::os::raw::c_ushort; -pub type __u_int = ::std::os::raw::c_uint; -pub type __u_long = ::std::os::raw::c_ulong; +pub type max_align_t = f64; +pub type int_least8_t = i8; +pub type int_least16_t = i16; +pub type int_least32_t = i32; +pub type int_least64_t = i64; +pub type uint_least8_t = u8; +pub type uint_least16_t = u16; +pub type uint_least32_t = u32; +pub type uint_least64_t = u64; +pub type int_fast8_t = i8; +pub type int_fast16_t = i16; +pub type int_fast32_t = i32; +pub type int_fast64_t = i64; +pub type uint_fast8_t = u8; +pub type uint_fast16_t = u16; +pub type uint_fast32_t = u32; +pub type uint_fast64_t = u64; pub type __int8_t = ::std::os::raw::c_schar; pub type __uint8_t = ::std::os::raw::c_uchar; pub type __int16_t = ::std::os::raw::c_short; pub type __uint16_t = ::std::os::raw::c_ushort; pub type __int32_t = ::std::os::raw::c_int; pub type __uint32_t = ::std::os::raw::c_uint; -pub type __int64_t = ::std::os::raw::c_long; -pub type __uint64_t = ::std::os::raw::c_ulong; -pub type __int_least8_t = __int8_t; -pub type __uint_least8_t = __uint8_t; -pub type __int_least16_t = __int16_t; -pub type __uint_least16_t = __uint16_t; -pub type __int_least32_t = __int32_t; -pub type __uint_least32_t = __uint32_t; -pub type __int_least64_t = __int64_t; -pub type __uint_least64_t = __uint64_t; -pub type __quad_t = ::std::os::raw::c_long; -pub type __u_quad_t = ::std::os::raw::c_ulong; -pub type __intmax_t = ::std::os::raw::c_long; -pub type __uintmax_t = ::std::os::raw::c_ulong; -pub type __dev_t = ::std::os::raw::c_ulong; -pub type __uid_t = ::std::os::raw::c_uint; -pub type __gid_t = ::std::os::raw::c_uint; -pub type __ino_t = ::std::os::raw::c_ulong; -pub type __ino64_t = ::std::os::raw::c_ulong; -pub type __mode_t = ::std::os::raw::c_uint; -pub type __nlink_t = ::std::os::raw::c_ulong; -pub type __off_t = ::std::os::raw::c_long; -pub type __off64_t = ::std::os::raw::c_long; -pub type __pid_t = ::std::os::raw::c_int; +pub type __int64_t = ::std::os::raw::c_longlong; +pub type __uint64_t = ::std::os::raw::c_ulonglong; +pub type __darwin_intptr_t = ::std::os::raw::c_long; +pub type __darwin_natural_t = ::std::os::raw::c_uint; +pub type __darwin_ct_rune_t = ::std::os::raw::c_int; +#[repr(C)] +#[derive(Copy, Clone)] +pub union __mbstate_t { + pub __mbstate8: 
[::std::os::raw::c_char; 128usize], + pub _mbstateL: ::std::os::raw::c_longlong, +} +#[allow(clippy::unnecessary_operation, clippy::identity_op)] +const _: () = { + ["Size of __mbstate_t"][::std::mem::size_of::<__mbstate_t>() - 128usize]; + ["Alignment of __mbstate_t"][::std::mem::align_of::<__mbstate_t>() - 8usize]; + ["Offset of field: __mbstate_t::__mbstate8"] + [::std::mem::offset_of!(__mbstate_t, __mbstate8) - 0usize]; + ["Offset of field: __mbstate_t::_mbstateL"] + [::std::mem::offset_of!(__mbstate_t, _mbstateL) - 0usize]; +}; +pub type __darwin_mbstate_t = __mbstate_t; +pub type __darwin_ptrdiff_t = ::std::os::raw::c_long; +pub type __darwin_size_t = ::std::os::raw::c_ulong; +pub type __darwin_va_list = __builtin_va_list; +pub type __darwin_wchar_t = ::std::os::raw::c_int; +pub type __darwin_rune_t = __darwin_wchar_t; +pub type __darwin_wint_t = ::std::os::raw::c_int; +pub type __darwin_clock_t = ::std::os::raw::c_ulong; +pub type __darwin_socklen_t = __uint32_t; +pub type __darwin_ssize_t = ::std::os::raw::c_long; +pub type __darwin_time_t = ::std::os::raw::c_long; +pub type __darwin_blkcnt_t = __int64_t; +pub type __darwin_blksize_t = __int32_t; +pub type __darwin_dev_t = __int32_t; +pub type __darwin_fsblkcnt_t = ::std::os::raw::c_uint; +pub type __darwin_fsfilcnt_t = ::std::os::raw::c_uint; +pub type __darwin_gid_t = __uint32_t; +pub type __darwin_id_t = __uint32_t; +pub type __darwin_ino64_t = __uint64_t; +pub type __darwin_ino_t = __darwin_ino64_t; +pub type __darwin_mach_port_name_t = __darwin_natural_t; +pub type __darwin_mach_port_t = __darwin_mach_port_name_t; +pub type __darwin_mode_t = __uint16_t; +pub type __darwin_off_t = __int64_t; +pub type __darwin_pid_t = __int32_t; +pub type __darwin_sigset_t = __uint32_t; +pub type __darwin_suseconds_t = __int32_t; +pub type __darwin_uid_t = __uint32_t; +pub type __darwin_useconds_t = __uint32_t; +pub type __darwin_uuid_t = [::std::os::raw::c_uchar; 16usize]; +pub type __darwin_uuid_string_t = [::std::os::raw::c_char; 37usize]; +#[repr(C)] +#[derive(Debug, Copy, Clone)] +pub struct __darwin_pthread_handler_rec { + pub __routine: ::std::option::Option, + pub __arg: *mut ::std::os::raw::c_void, + pub __next: *mut __darwin_pthread_handler_rec, +} +#[allow(clippy::unnecessary_operation, clippy::identity_op)] +const _: () = { + ["Size of __darwin_pthread_handler_rec"] + [::std::mem::size_of::<__darwin_pthread_handler_rec>() - 24usize]; + ["Alignment of __darwin_pthread_handler_rec"] + [::std::mem::align_of::<__darwin_pthread_handler_rec>() - 8usize]; + ["Offset of field: __darwin_pthread_handler_rec::__routine"] + [::std::mem::offset_of!(__darwin_pthread_handler_rec, __routine) - 0usize]; + ["Offset of field: __darwin_pthread_handler_rec::__arg"] + [::std::mem::offset_of!(__darwin_pthread_handler_rec, __arg) - 8usize]; + ["Offset of field: __darwin_pthread_handler_rec::__next"] + [::std::mem::offset_of!(__darwin_pthread_handler_rec, __next) - 16usize]; +}; +#[repr(C)] +#[derive(Debug, Copy, Clone)] +pub struct _opaque_pthread_attr_t { + pub __sig: ::std::os::raw::c_long, + pub __opaque: [::std::os::raw::c_char; 56usize], +} +#[allow(clippy::unnecessary_operation, clippy::identity_op)] +const _: () = { + ["Size of _opaque_pthread_attr_t"][::std::mem::size_of::<_opaque_pthread_attr_t>() - 64usize]; + ["Alignment of _opaque_pthread_attr_t"] + [::std::mem::align_of::<_opaque_pthread_attr_t>() - 8usize]; + ["Offset of field: _opaque_pthread_attr_t::__sig"] + [::std::mem::offset_of!(_opaque_pthread_attr_t, __sig) - 0usize]; + ["Offset of 
field: _opaque_pthread_attr_t::__opaque"] + [::std::mem::offset_of!(_opaque_pthread_attr_t, __opaque) - 8usize]; +}; +#[repr(C)] +#[derive(Debug, Copy, Clone)] +pub struct _opaque_pthread_cond_t { + pub __sig: ::std::os::raw::c_long, + pub __opaque: [::std::os::raw::c_char; 40usize], +} +#[allow(clippy::unnecessary_operation, clippy::identity_op)] +const _: () = { + ["Size of _opaque_pthread_cond_t"][::std::mem::size_of::<_opaque_pthread_cond_t>() - 48usize]; + ["Alignment of _opaque_pthread_cond_t"] + [::std::mem::align_of::<_opaque_pthread_cond_t>() - 8usize]; + ["Offset of field: _opaque_pthread_cond_t::__sig"] + [::std::mem::offset_of!(_opaque_pthread_cond_t, __sig) - 0usize]; + ["Offset of field: _opaque_pthread_cond_t::__opaque"] + [::std::mem::offset_of!(_opaque_pthread_cond_t, __opaque) - 8usize]; +}; +#[repr(C)] +#[derive(Debug, Copy, Clone)] +pub struct _opaque_pthread_condattr_t { + pub __sig: ::std::os::raw::c_long, + pub __opaque: [::std::os::raw::c_char; 8usize], +} +#[allow(clippy::unnecessary_operation, clippy::identity_op)] +const _: () = { + ["Size of _opaque_pthread_condattr_t"] + [::std::mem::size_of::<_opaque_pthread_condattr_t>() - 16usize]; + ["Alignment of _opaque_pthread_condattr_t"] + [::std::mem::align_of::<_opaque_pthread_condattr_t>() - 8usize]; + ["Offset of field: _opaque_pthread_condattr_t::__sig"] + [::std::mem::offset_of!(_opaque_pthread_condattr_t, __sig) - 0usize]; + ["Offset of field: _opaque_pthread_condattr_t::__opaque"] + [::std::mem::offset_of!(_opaque_pthread_condattr_t, __opaque) - 8usize]; +}; +#[repr(C)] +#[derive(Debug, Copy, Clone)] +pub struct _opaque_pthread_mutex_t { + pub __sig: ::std::os::raw::c_long, + pub __opaque: [::std::os::raw::c_char; 56usize], +} +#[allow(clippy::unnecessary_operation, clippy::identity_op)] +const _: () = { + ["Size of _opaque_pthread_mutex_t"][::std::mem::size_of::<_opaque_pthread_mutex_t>() - 64usize]; + ["Alignment of _opaque_pthread_mutex_t"] + [::std::mem::align_of::<_opaque_pthread_mutex_t>() - 8usize]; + ["Offset of field: _opaque_pthread_mutex_t::__sig"] + [::std::mem::offset_of!(_opaque_pthread_mutex_t, __sig) - 0usize]; + ["Offset of field: _opaque_pthread_mutex_t::__opaque"] + [::std::mem::offset_of!(_opaque_pthread_mutex_t, __opaque) - 8usize]; +}; +#[repr(C)] +#[derive(Debug, Copy, Clone)] +pub struct _opaque_pthread_mutexattr_t { + pub __sig: ::std::os::raw::c_long, + pub __opaque: [::std::os::raw::c_char; 8usize], +} +#[allow(clippy::unnecessary_operation, clippy::identity_op)] +const _: () = { + ["Size of _opaque_pthread_mutexattr_t"] + [::std::mem::size_of::<_opaque_pthread_mutexattr_t>() - 16usize]; + ["Alignment of _opaque_pthread_mutexattr_t"] + [::std::mem::align_of::<_opaque_pthread_mutexattr_t>() - 8usize]; + ["Offset of field: _opaque_pthread_mutexattr_t::__sig"] + [::std::mem::offset_of!(_opaque_pthread_mutexattr_t, __sig) - 0usize]; + ["Offset of field: _opaque_pthread_mutexattr_t::__opaque"] + [::std::mem::offset_of!(_opaque_pthread_mutexattr_t, __opaque) - 8usize]; +}; +#[repr(C)] +#[derive(Debug, Copy, Clone)] +pub struct _opaque_pthread_once_t { + pub __sig: ::std::os::raw::c_long, + pub __opaque: [::std::os::raw::c_char; 8usize], +} +#[allow(clippy::unnecessary_operation, clippy::identity_op)] +const _: () = { + ["Size of _opaque_pthread_once_t"][::std::mem::size_of::<_opaque_pthread_once_t>() - 16usize]; + ["Alignment of _opaque_pthread_once_t"] + [::std::mem::align_of::<_opaque_pthread_once_t>() - 8usize]; + ["Offset of field: _opaque_pthread_once_t::__sig"] + 
[::std::mem::offset_of!(_opaque_pthread_once_t, __sig) - 0usize]; + ["Offset of field: _opaque_pthread_once_t::__opaque"] + [::std::mem::offset_of!(_opaque_pthread_once_t, __opaque) - 8usize]; +}; +#[repr(C)] +#[derive(Debug, Copy, Clone)] +pub struct _opaque_pthread_rwlock_t { + pub __sig: ::std::os::raw::c_long, + pub __opaque: [::std::os::raw::c_char; 192usize], +} +#[allow(clippy::unnecessary_operation, clippy::identity_op)] +const _: () = { + ["Size of _opaque_pthread_rwlock_t"] + [::std::mem::size_of::<_opaque_pthread_rwlock_t>() - 200usize]; + ["Alignment of _opaque_pthread_rwlock_t"] + [::std::mem::align_of::<_opaque_pthread_rwlock_t>() - 8usize]; + ["Offset of field: _opaque_pthread_rwlock_t::__sig"] + [::std::mem::offset_of!(_opaque_pthread_rwlock_t, __sig) - 0usize]; + ["Offset of field: _opaque_pthread_rwlock_t::__opaque"] + [::std::mem::offset_of!(_opaque_pthread_rwlock_t, __opaque) - 8usize]; +}; +#[repr(C)] +#[derive(Debug, Copy, Clone)] +pub struct _opaque_pthread_rwlockattr_t { + pub __sig: ::std::os::raw::c_long, + pub __opaque: [::std::os::raw::c_char; 16usize], +} +#[allow(clippy::unnecessary_operation, clippy::identity_op)] +const _: () = { + ["Size of _opaque_pthread_rwlockattr_t"] + [::std::mem::size_of::<_opaque_pthread_rwlockattr_t>() - 24usize]; + ["Alignment of _opaque_pthread_rwlockattr_t"] + [::std::mem::align_of::<_opaque_pthread_rwlockattr_t>() - 8usize]; + ["Offset of field: _opaque_pthread_rwlockattr_t::__sig"] + [::std::mem::offset_of!(_opaque_pthread_rwlockattr_t, __sig) - 0usize]; + ["Offset of field: _opaque_pthread_rwlockattr_t::__opaque"] + [::std::mem::offset_of!(_opaque_pthread_rwlockattr_t, __opaque) - 8usize]; +}; #[repr(C)] #[derive(Debug, Copy, Clone)] -pub struct __fsid_t { - pub __val: [::std::os::raw::c_int; 2usize], +pub struct _opaque_pthread_t { + pub __sig: ::std::os::raw::c_long, + pub __cleanup_stack: *mut __darwin_pthread_handler_rec, + pub __opaque: [::std::os::raw::c_char; 8176usize], } #[allow(clippy::unnecessary_operation, clippy::identity_op)] const _: () = { - ["Size of __fsid_t"][::std::mem::size_of::<__fsid_t>() - 8usize]; - ["Alignment of __fsid_t"][::std::mem::align_of::<__fsid_t>() - 4usize]; - ["Offset of field: __fsid_t::__val"][::std::mem::offset_of!(__fsid_t, __val) - 0usize]; + ["Size of _opaque_pthread_t"][::std::mem::size_of::<_opaque_pthread_t>() - 8192usize]; + ["Alignment of _opaque_pthread_t"][::std::mem::align_of::<_opaque_pthread_t>() - 8usize]; + ["Offset of field: _opaque_pthread_t::__sig"] + [::std::mem::offset_of!(_opaque_pthread_t, __sig) - 0usize]; + ["Offset of field: _opaque_pthread_t::__cleanup_stack"] + [::std::mem::offset_of!(_opaque_pthread_t, __cleanup_stack) - 8usize]; + ["Offset of field: _opaque_pthread_t::__opaque"] + [::std::mem::offset_of!(_opaque_pthread_t, __opaque) - 16usize]; }; -pub type __clock_t = ::std::os::raw::c_long; -pub type __rlim_t = ::std::os::raw::c_ulong; -pub type __rlim64_t = ::std::os::raw::c_ulong; -pub type __id_t = ::std::os::raw::c_uint; -pub type __time_t = ::std::os::raw::c_long; -pub type __useconds_t = ::std::os::raw::c_uint; -pub type __suseconds_t = ::std::os::raw::c_long; -pub type __suseconds64_t = ::std::os::raw::c_long; -pub type __daddr_t = ::std::os::raw::c_int; -pub type __key_t = ::std::os::raw::c_int; -pub type __clockid_t = ::std::os::raw::c_int; -pub type __timer_t = *mut ::std::os::raw::c_void; -pub type __blksize_t = ::std::os::raw::c_long; -pub type __blkcnt_t = ::std::os::raw::c_long; -pub type __blkcnt64_t = ::std::os::raw::c_long; -pub type 
__fsblkcnt_t = ::std::os::raw::c_ulong; -pub type __fsblkcnt64_t = ::std::os::raw::c_ulong; -pub type __fsfilcnt_t = ::std::os::raw::c_ulong; -pub type __fsfilcnt64_t = ::std::os::raw::c_ulong; -pub type __fsword_t = ::std::os::raw::c_long; -pub type __ssize_t = ::std::os::raw::c_long; -pub type __syscall_slong_t = ::std::os::raw::c_long; -pub type __syscall_ulong_t = ::std::os::raw::c_ulong; -pub type __loff_t = __off64_t; -pub type __caddr_t = *mut ::std::os::raw::c_char; -pub type __intptr_t = ::std::os::raw::c_long; -pub type __socklen_t = ::std::os::raw::c_uint; -pub type __sig_atomic_t = ::std::os::raw::c_int; -pub type int_least8_t = __int_least8_t; -pub type int_least16_t = __int_least16_t; -pub type int_least32_t = __int_least32_t; -pub type int_least64_t = __int_least64_t; -pub type uint_least8_t = __uint_least8_t; -pub type uint_least16_t = __uint_least16_t; -pub type uint_least32_t = __uint_least32_t; -pub type uint_least64_t = __uint_least64_t; -pub type int_fast8_t = ::std::os::raw::c_schar; -pub type int_fast16_t = ::std::os::raw::c_long; -pub type int_fast32_t = ::std::os::raw::c_long; -pub type int_fast64_t = ::std::os::raw::c_long; -pub type uint_fast8_t = ::std::os::raw::c_uchar; -pub type uint_fast16_t = ::std::os::raw::c_ulong; -pub type uint_fast32_t = ::std::os::raw::c_ulong; -pub type uint_fast64_t = ::std::os::raw::c_ulong; -pub type intmax_t = __intmax_t; -pub type uintmax_t = __uintmax_t; +pub type __darwin_pthread_attr_t = _opaque_pthread_attr_t; +pub type __darwin_pthread_cond_t = _opaque_pthread_cond_t; +pub type __darwin_pthread_condattr_t = _opaque_pthread_condattr_t; +pub type __darwin_pthread_key_t = ::std::os::raw::c_ulong; +pub type __darwin_pthread_mutex_t = _opaque_pthread_mutex_t; +pub type __darwin_pthread_mutexattr_t = _opaque_pthread_mutexattr_t; +pub type __darwin_pthread_once_t = _opaque_pthread_once_t; +pub type __darwin_pthread_rwlock_t = _opaque_pthread_rwlock_t; +pub type __darwin_pthread_rwlockattr_t = _opaque_pthread_rwlockattr_t; +pub type __darwin_pthread_t = *mut _opaque_pthread_t; +pub type intmax_t = ::std::os::raw::c_long; +pub type uintmax_t = ::std::os::raw::c_ulong; #[doc = " Wrapper around Rust's [`&str`], without allocating memory, unlike [`std::ffi::CString`].\n The caller must use it as a Rust string. This is not a C-string."] #[repr(C)] #[derive(Debug, Copy, Clone)] @@ -324,3 +449,4 @@ const _: () = { ["Offset of field: PdRoute::shard"][::std::mem::offset_of!(PdRoute, shard) - 0usize]; ["Offset of field: PdRoute::read_write"][::std::mem::offset_of!(PdRoute, read_write) - 8usize]; }; +pub type __builtin_va_list = *mut ::std::os::raw::c_char; diff --git a/pgdog/src/frontend/client/query_engine/query.rs b/pgdog/src/frontend/client/query_engine/query.rs index 7b2fe6f23..5e87f7a2f 100644 --- a/pgdog/src/frontend/client/query_engine/query.rs +++ b/pgdog/src/frontend/client/query_engine/query.rs @@ -87,7 +87,7 @@ impl QueryEngine { .await?; } - Some(RewriteResult::InPlace) | None => { + Some(RewriteResult::InPlace { .. 
}) | None => { self.backend .handle_client_request(context.client_request, &mut self.router, self.streaming) .await?; diff --git a/pgdog/src/frontend/client/query_engine/route_query.rs b/pgdog/src/frontend/client/query_engine/route_query.rs index 38875dcfa..25bd823d2 100644 --- a/pgdog/src/frontend/client/query_engine/route_query.rs +++ b/pgdog/src/frontend/client/query_engine/route_query.rs @@ -2,6 +2,8 @@ use pgdog_config::PoolerMode; use tokio::time::timeout; use tracing::trace; +use crate::frontend::router::parser::rewrite::statement::plan::RewriteResult; + use super::*; #[derive(Debug, Clone)] @@ -96,6 +98,13 @@ impl QueryEngine { context.client_request.messages, command, ); + + // Apply post-parser rewrites, e.g. offset/limit. + if let Some(RewriteResult::InPlace { offset }) = &context.rewrite_result { + if let Some(offset) = offset { + offset.apply_after_parser(context.client_request)?; + } + } } Err(err) => { self.error_response(context, ErrorResponse::syntax(err.to_string().as_str())) diff --git a/pgdog/src/frontend/router/parser/rewrite/statement/error.rs b/pgdog/src/frontend/router/parser/rewrite/statement/error.rs index f745feab3..1e0858159 100644 --- a/pgdog/src/frontend/router/parser/rewrite/statement/error.rs +++ b/pgdog/src/frontend/router/parser/rewrite/statement/error.rs @@ -34,4 +34,7 @@ pub enum Error { #[error("primary key is missing")] MissingPrimaryKey, + + #[error("missing AST on request")] + MissingAst, } diff --git a/pgdog/src/frontend/router/parser/rewrite/statement/mod.rs b/pgdog/src/frontend/router/parser/rewrite/statement/mod.rs index 13f66b50d..b03fab315 100644 --- a/pgdog/src/frontend/router/parser/rewrite/statement/mod.rs +++ b/pgdog/src/frontend/router/parser/rewrite/statement/mod.rs @@ -14,6 +14,7 @@ pub mod aggregate; pub mod auto_id; pub mod error; pub mod insert; +pub mod offset; pub mod plan; pub mod simple_prepared; pub mod unique_id; @@ -140,6 +141,7 @@ impl<'a> StatementRewrite<'a> { })?; self.rewrite_aggregates(&mut plan)?; + self.limit_offset(&mut plan)?; if self.rewritten { plan.stmt = Some(match self.schema.query_parser_engine { diff --git a/pgdog/src/frontend/router/parser/rewrite/statement/offset.rs b/pgdog/src/frontend/router/parser/rewrite/statement/offset.rs new file mode 100644 index 000000000..afeb375a3 --- /dev/null +++ b/pgdog/src/frontend/router/parser/rewrite/statement/offset.rs @@ -0,0 +1,473 @@ +use pg_query::protobuf::{a_const::Val, AConst, Integer, ParamRef, ParseResult}; +use pg_query::NodeEnum; + +use crate::frontend::router::parser::Limit; +use crate::frontend::ClientRequest; +use crate::net::messages::bind::{Format, Parameter}; +use crate::net::ProtocolMessage; + +use super::*; + +#[derive(Debug, Clone, PartialEq)] +pub(crate) struct OffsetPlan { + pub(crate) limit: Limit, + pub(crate) limit_param: usize, + pub(crate) offset_param: usize, +} + +impl OffsetPlan { + pub(crate) fn apply_after_parser(&self, request: &mut ClientRequest) -> Result<(), Error> { + let route = match request.route.as_mut() { + Some(route) => route, + None => return Ok(()), + }; + + if !route.is_cross_shard() { + return Ok(()); + } + + // Resolve actual values: use literal if known, otherwise read from Bind. + let mut limit_val = self.limit.limit; + let mut offset_val = self.limit.offset; + + for message in request.messages.iter_mut() { + if let ProtocolMessage::Bind(bind) = message { + if limit_val.is_none() { + let idx = self.limit_param - 1; + limit_val = Some( + bind.parameter(idx)? + .ok_or(Error::MissingParameter(self.limit_param as u16))? 
+ .bigint() + .ok_or(Error::MissingParameter(self.limit_param as u16))? + as usize, + ); + } + if offset_val.is_none() { + let idx = self.offset_param - 1; + offset_val = Some( + bind.parameter(idx)? + .ok_or(Error::MissingParameter(self.offset_param as u16))? + .bigint() + .ok_or(Error::MissingParameter(self.offset_param as u16))? + as usize, + ); + } + + let new_limit = limit_val.unwrap_or(0) + offset_val.unwrap_or(0); + + // Overwrite parameterized limit. + if self.limit.limit.is_none() { + let idx = self.limit_param - 1; + let fmt = bind.parameter_format(idx)?; + let param = match fmt { + Format::Binary => Parameter::new(&(new_limit as i64).to_be_bytes()), + Format::Text => Parameter::new(new_limit.to_string().as_bytes()), + }; + bind.set_param(idx, param); + } + + // Overwrite parameterized offset. + if self.limit.offset.is_none() { + let idx = self.offset_param - 1; + let fmt = bind.parameter_format(idx)?; + let param = match fmt { + Format::Binary => Parameter::new(&0i64.to_be_bytes()), + Format::Text => Parameter::new(b"0"), + }; + bind.set_param(idx, param); + } + break; + } + } + + // Rewrite SQL if any value was a literal. + if self.limit.limit.is_some() || self.limit.offset.is_some() { + let new_limit = (limit_val.unwrap_or(0) + offset_val.unwrap_or(0)) as i32; + let ast = request.ast.as_ref().ok_or(Error::MissingAst)?; + let mut protobuf = ast.ast.protobuf.clone(); + if rewrite_ast_limit_offset(&mut protobuf, new_limit) { + let result = pg_query::ParseResult::new(protobuf, "".into()); + let new_sql = result.deparse()?; + for message in request.messages.iter_mut() { + match message { + ProtocolMessage::Query(q) => q.set_query(&new_sql), + ProtocolMessage::Parse(p) => p.set_query(&new_sql), + _ => {} + } + } + } + } + + route.set_limit(Limit { + limit: limit_val, + offset: offset_val, + }); + + Ok(()) + } +} + +enum LimitValueInfo { + Literal(usize), + Param(usize), +} + +impl LimitValueInfo { + fn literal(&self) -> Option { + match self { + LimitValueInfo::Literal(v) => Some(*v), + LimitValueInfo::Param(_) => None, + } + } + + fn param_index(&self) -> usize { + match self { + LimitValueInfo::Param(i) => *i, + LimitValueInfo::Literal(_) => 0, + } + } +} + +fn extract_limit_value(node: &Option) -> Option { + match node { + Some(NodeEnum::AConst(AConst { + val: Some(Val::Ival(Integer { ival })), + .. + })) => Some(LimitValueInfo::Literal(*ival as usize)), + Some(NodeEnum::ParamRef(ParamRef { number, .. 
})) => { + Some(LimitValueInfo::Param(*number as usize)) + } + _ => None, + } +} + +fn rewrite_ast_limit_offset(ast: &mut ParseResult, new_limit: i32) -> bool { + let raw_stmt = match ast.stmts.first_mut() { + Some(s) => s, + None => return false, + }; + let stmt = match raw_stmt.stmt.as_mut() { + Some(s) => s, + None => return false, + }; + let select = match &mut stmt.node { + Some(NodeEnum::SelectStmt(s)) => s, + _ => return false, + }; + + select.limit_count = Some(Box::new(pg_query::Node { + node: Some(NodeEnum::AConst(AConst { + val: Some(Val::Ival(Integer { ival: new_limit })), + isnull: false, + location: -1i32, + })), + })); + + select.limit_offset = Some(Box::new(pg_query::Node { + node: Some(NodeEnum::AConst(AConst { + val: Some(Val::Ival(Integer { ival: 0 })), + isnull: false, + location: -1i32, + })), + })); + + true +} + +impl StatementRewrite<'_> { + pub(super) fn limit_offset(&mut self, plan: &mut RewritePlan) -> Result<(), Error> { + if self.schema.shards <= 1 { + return Ok(()); + } + + let raw_stmt = match self.stmt.stmts.first() { + Some(s) => s, + None => return Ok(()), + }; + let stmt = match raw_stmt.stmt.as_ref() { + Some(s) => s, + None => return Ok(()), + }; + let select = match &stmt.node { + Some(NodeEnum::SelectStmt(s)) => s, + _ => return Ok(()), + }; + + let offset_node = match &select.limit_offset { + Some(node) => node, + None => return Ok(()), + }; + let limit_node = match &select.limit_count { + Some(node) => node, + None => return Ok(()), + }; + + let limit_info = extract_limit_value(&limit_node.node); + let offset_info = extract_limit_value(&offset_node.node); + + let (limit_info, offset_info) = match (limit_info, offset_info) { + (Some(l), Some(o)) => (l, o), + _ => return Ok(()), + }; + + plan.offset = Some(OffsetPlan { + limit: Limit { + limit: limit_info.literal(), + offset: offset_info.literal(), + }, + limit_param: limit_info.param_index(), + offset_param: offset_info.param_index(), + }); + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::backend::schema::Schema; + use crate::backend::ShardingSchema; + use crate::frontend::router::parser::cache::ast::Ast; + use crate::frontend::router::parser::route::{Route, Shard, ShardWithPriority}; + use crate::frontend::router::parser::StatementRewriteContext; + use crate::frontend::PreparedStatements; + use crate::net::messages::bind::{Bind, Parameter}; + use crate::net::messages::Query; + use crate::net::Parse; + use pgdog_config::{QueryParserEngine, Rewrite}; + + fn sharded_schema() -> ShardingSchema { + ShardingSchema { + shards: 2, + rewrite: Rewrite { + enabled: true, + ..Default::default() + }, + ..Default::default() + } + } + + fn single_shard_schema() -> ShardingSchema { + ShardingSchema { + shards: 1, + ..Default::default() + } + } + + fn cross_shard_route() -> Route { + Route::select( + ShardWithPriority::new_table(Shard::All), + vec![], + Default::default(), + Limit::default(), + None, + ) + } + + fn single_shard_route() -> Route { + Route::select( + ShardWithPriority::new_table(Shard::Direct(0)), + vec![], + Default::default(), + Limit::default(), + None, + ) + } + + fn make_ast(sql: &str) -> Ast { + Ast::new_record(sql, QueryParserEngine::PgQueryProtobuf).unwrap() + } + + fn run_limit_offset(sql: &str, schema: &ShardingSchema) -> RewritePlan { + let mut ast = pg_query::parse(sql).unwrap(); + let db_schema = Schema::default(); + let mut ps = PreparedStatements::default(); + let mut rewrite = StatementRewrite::new(StatementRewriteContext { + stmt: &mut ast.protobuf, + 
extended: false, + prepared: false, + prepared_statements: &mut ps, + schema, + db_schema: &db_schema, + user: "test", + search_path: None, + }); + let mut plan = RewritePlan::default(); + rewrite.limit_offset(&mut plan).unwrap(); + plan + } + + #[test] + fn test_limit_offset_detection_literals() { + let plan = run_limit_offset("SELECT * FROM t LIMIT 10 OFFSET 5", &sharded_schema()); + let offset = plan.offset.unwrap(); + assert_eq!(offset.limit.limit, Some(10)); + assert_eq!(offset.limit.offset, Some(5)); + } + + #[test] + fn test_limit_offset_detection_params() { + let plan = run_limit_offset("SELECT * FROM t LIMIT $1 OFFSET $2", &sharded_schema()); + let offset = plan.offset.unwrap(); + assert_eq!(offset.limit.limit, None); + assert_eq!(offset.limit.offset, None); + assert_eq!(offset.limit_param, 1); + assert_eq!(offset.offset_param, 2); + } + + #[test] + fn test_limit_offset_detection_mixed_limit_literal_offset_param() { + let plan = run_limit_offset("SELECT * FROM t LIMIT 10 OFFSET $1", &sharded_schema()); + let offset = plan.offset.unwrap(); + assert_eq!(offset.limit.limit, Some(10)); + assert_eq!(offset.limit.offset, None); + assert_eq!(offset.offset_param, 1); + } + + #[test] + fn test_limit_offset_detection_mixed_limit_param_offset_literal() { + let plan = run_limit_offset("SELECT * FROM t LIMIT $1 OFFSET 5", &sharded_schema()); + let offset = plan.offset.unwrap(); + assert_eq!(offset.limit.limit, None); + assert_eq!(offset.limit.offset, Some(5)); + assert_eq!(offset.limit_param, 1); + } + + #[test] + fn test_limit_offset_skipped_single_shard() { + let plan = run_limit_offset("SELECT * FROM t LIMIT 10 OFFSET 5", &single_shard_schema()); + assert!(plan.offset.is_none()); + } + + #[test] + fn test_limit_offset_skipped_no_offset() { + let plan = run_limit_offset("SELECT * FROM t LIMIT 10", &sharded_schema()); + assert!(plan.offset.is_none()); + } + + #[test] + fn test_limit_offset_skipped_no_limit() { + let plan = run_limit_offset("SELECT * FROM t OFFSET 5", &sharded_schema()); + assert!(plan.offset.is_none()); + } + + #[test] + fn test_apply_after_parser_literals_cross_shard() { + let plan = OffsetPlan { + limit: Limit { + limit: Some(10), + offset: Some(5), + }, + limit_param: 0, + offset_param: 0, + }; + let mut request = ClientRequest::from(vec![ProtocolMessage::Query(Query::new( + "SELECT * FROM t LIMIT 10 OFFSET 5", + ))]); + request.route = Some(cross_shard_route()); + request.ast = Some(make_ast("SELECT * FROM t LIMIT 10 OFFSET 5")); + + plan.apply_after_parser(&mut request).unwrap(); + + let query = match &request.messages[0] { + ProtocolMessage::Query(q) => q.query().to_owned(), + _ => panic!("expected Query"), + }; + assert_eq!(query, "SELECT * FROM t LIMIT 15 OFFSET 0"); + + let route = request.route.unwrap(); + assert_eq!(route.limit().limit, Some(10)); + assert_eq!(route.limit().offset, Some(5)); + } + + #[test] + fn test_apply_after_parser_params_cross_shard() { + let plan = OffsetPlan { + limit: Limit { + limit: None, + offset: None, + }, + limit_param: 1, + offset_param: 2, + }; + let mut request = ClientRequest::from(vec![ProtocolMessage::Bind(Bind::new_params( + "", + &[Parameter::new(b"10"), Parameter::new(b"5")], + ))]); + request.route = Some(cross_shard_route()); + + plan.apply_after_parser(&mut request).unwrap(); + + if let ProtocolMessage::Bind(bind) = &request.messages[0] { + assert_eq!(bind.params_raw()[0].data.as_ref(), b"15"); + assert_eq!(bind.params_raw()[1].data.as_ref(), b"0"); + } else { + panic!("expected Bind"); + } + + let route = 
request.route.unwrap(); + assert_eq!(route.limit().limit, Some(10)); + assert_eq!(route.limit().offset, Some(5)); + } + + #[test] + fn test_apply_after_parser_single_shard_noop() { + let plan = OffsetPlan { + limit: Limit { + limit: Some(10), + offset: Some(5), + }, + limit_param: 0, + offset_param: 0, + }; + let mut request = ClientRequest::from(vec![ProtocolMessage::Query(Query::new( + "SELECT * FROM t LIMIT 10 OFFSET 5", + ))]); + request.route = Some(single_shard_route()); + + plan.apply_after_parser(&mut request).unwrap(); + + let query = match &request.messages[0] { + ProtocolMessage::Query(q) => q.query().to_owned(), + _ => panic!("expected Query"), + }; + assert_eq!(query, "SELECT * FROM t LIMIT 10 OFFSET 5"); + } + + #[test] + fn test_apply_after_parser_mixed_limit_literal_offset_param() { + let plan = OffsetPlan { + limit: Limit { + limit: Some(10), + offset: None, + }, + limit_param: 0, + offset_param: 1, + }; + let mut request = ClientRequest::from(vec![ + ProtocolMessage::Parse(Parse::named("s", "SELECT * FROM t LIMIT 10 OFFSET $1")), + ProtocolMessage::Bind(Bind::new_params("s", &[Parameter::new(b"5")])), + ]); + request.route = Some(cross_shard_route()); + request.ast = Some(make_ast("SELECT * FROM t LIMIT 10 OFFSET $1")); + + plan.apply_after_parser(&mut request).unwrap(); + + if let ProtocolMessage::Bind(bind) = &request.messages[1] { + assert_eq!(bind.params_raw()[0].data.as_ref(), b"0"); + } else { + panic!("expected Bind"); + } + + let sql = match &request.messages[0] { + ProtocolMessage::Parse(p) => p.query().to_owned(), + _ => panic!("expected Parse"), + }; + assert_eq!(sql, "SELECT * FROM t LIMIT 15 OFFSET 0"); + + let route = request.route.unwrap(); + assert_eq!(route.limit().limit, Some(10)); + assert_eq!(route.limit().offset, Some(5)); + } +} diff --git a/pgdog/src/frontend/router/parser/rewrite/statement/plan.rs b/pgdog/src/frontend/router/parser/rewrite/statement/plan.rs index dde580f31..30e2b4ad3 100644 --- a/pgdog/src/frontend/router/parser/rewrite/statement/plan.rs +++ b/pgdog/src/frontend/router/parser/rewrite/statement/plan.rs @@ -4,6 +4,7 @@ use crate::net::{Bind, Parse, ProtocolMessage, Query}; use crate::unique_id::UniqueId; use super::insert::build_split_requests; +use super::offset::OffsetPlan; use super::{aggregate::AggregateRewritePlan, Error, InsertSplit, ShardingKeyUpdate}; /// Statement rewrite plan. @@ -42,11 +43,14 @@ pub struct RewritePlan { /// Sharding key is being updated, we need to execute /// a multi-step plan. pub(crate) sharding_key_update: Option, + + /// Limit/offset pagination. 
+ pub(crate) offset: Option, } #[derive(Debug, Clone)] pub(crate) enum RewriteResult { - InPlace, + InPlace { offset: Option }, InsertSplit(Vec), ShardingKeyUpdate(ShardingKeyUpdate), } @@ -65,6 +69,7 @@ impl RewritePlan { }; bind.push_param(param, format); } + Ok(()) } @@ -122,7 +127,9 @@ impl RewritePlan { } } - Ok(RewriteResult::InPlace) + Ok(RewriteResult::InPlace { + offset: self.offset.clone(), + }) } } diff --git a/pgdog/src/frontend/router/parser/route.rs b/pgdog/src/frontend/router/parser/route.rs index 4505c0533..6aa8c919a 100644 --- a/pgdog/src/frontend/router/parser/route.rs +++ b/pgdog/src/frontend/router/parser/route.rs @@ -236,6 +236,10 @@ impl Route { &self.limit } + pub fn set_limit(&mut self, limit: Limit) { + self.limit = limit; + } + pub fn with_read(mut self, read: bool) -> Self { self.set_read(read); self diff --git a/pgdog/src/net/messages/bind.rs b/pgdog/src/net/messages/bind.rs index d1462702f..59a6f9862 100644 --- a/pgdog/src/net/messages/bind.rs +++ b/pgdog/src/net/messages/bind.rs @@ -278,6 +278,18 @@ impl Bind { self.original = None; } + /// Overwrite an existing parameter at the given index. + /// Returns `false` if the index is out of bounds. + pub fn set_param(&mut self, index: usize, param: Parameter) -> bool { + if let Some(slot) = self.params.get_mut(index) { + *slot = param; + self.original = None; + true + } else { + false + } + } + /// Get the effective format for new parameters. pub fn default_param_format(&self) -> Format { if self.codes.len() == 1 { @@ -479,6 +491,20 @@ mod test { } } + #[test] + fn test_set_param() { + let mut bind = Bind::new_params("test", &[Parameter::new(b"10"), Parameter::new(b"5")]); + assert_eq!(bind.params_raw()[0].data.as_ref(), b"10"); + assert_eq!(bind.params_raw()[1].data.as_ref(), b"5"); + + bind.set_param(0, Parameter::new(b"15")); + bind.set_param(1, Parameter::new(b"0")); + + assert_eq!(bind.params_raw()[0].data.as_ref(), b"15"); + assert_eq!(bind.params_raw()[1].data.as_ref(), b"0"); + assert_eq!(bind.params_raw().len(), 2); + } + #[test] fn test_large_parameter_count_round_trip() { let count = 35_000; From b27edfddab3cc1387560aaaf1afdbdfed8a0c9d6 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Wed, 18 Feb 2026 11:54:50 -0800 Subject: [PATCH 2/7] add integration tests --- integration/rust/tests/integration/mod.rs | 1 + integration/rust/tests/integration/offset.rs | 199 ++++++++++++++++++ .../frontend/client/query_engine/test/mod.rs | 1 + .../query_engine/test/rewrite_offset.rs | 106 ++++++++++ 4 files changed, 307 insertions(+) create mode 100644 integration/rust/tests/integration/offset.rs create mode 100644 pgdog/src/frontend/client/query_engine/test/rewrite_offset.rs diff --git a/integration/rust/tests/integration/mod.rs b/integration/rust/tests/integration/mod.rs index dda14b903..a25d94950 100644 --- a/integration/rust/tests/integration/mod.rs +++ b/integration/rust/tests/integration/mod.rs @@ -15,6 +15,7 @@ pub mod maintenance_mode; pub mod max; pub mod multi_set; pub mod notify; +pub mod offset; pub mod per_stmt_routing; pub mod prepared; pub mod reload; diff --git a/integration/rust/tests/integration/offset.rs b/integration/rust/tests/integration/offset.rs new file mode 100644 index 000000000..ec7de4769 --- /dev/null +++ b/integration/rust/tests/integration/offset.rs @@ -0,0 +1,199 @@ +use rust::setup::{admin_sqlx, connections_sqlx}; +use sqlx::{Executor, Row, postgres::PgPool}; + +const TABLE: &str = "offset_test"; + +async fn reset(pool: &PgPool) -> Result<(), sqlx::Error> { + for shard in [0, 1] { + 
pool.execute(format!("/* pgdog_shard: {shard} */ DROP TABLE IF EXISTS {TABLE}").as_str()) + .await + .ok(); + } + for shard in [0, 1] { + pool.execute( + format!( + "/* pgdog_shard: {shard} */ CREATE TABLE {TABLE} (id SERIAL, value INTEGER, customer_id BIGINT)" + ) + .as_str(), + ) + .await?; + } + admin_sqlx().await.execute("RELOAD").await?; + Ok(()) +} + +async fn seed(pool: &PgPool) -> Result<(), sqlx::Error> { + // Shard 0: values 1..=5 + pool.execute( + format!("/* pgdog_shard: 0 */ INSERT INTO {TABLE}(value) VALUES (1), (2), (3), (4), (5)") + .as_str(), + ) + .await?; + // Shard 1: values 6..=10 + pool.execute( + format!("/* pgdog_shard: 1 */ INSERT INTO {TABLE}(value) VALUES (6), (7), (8), (9), (10)") + .as_str(), + ) + .await?; + Ok(()) +} + +async fn cleanup(pool: &PgPool) { + for shard in [0, 1] { + pool.execute(format!("/* pgdog_shard: {shard} */ DROP TABLE {TABLE}").as_str()) + .await + .ok(); + } +} + +fn values(rows: &[sqlx::postgres::PgRow]) -> Vec { + rows.iter().map(|r| r.get::("value")).collect() +} + +#[tokio::test] +async fn offset_pagination_literals() -> Result<(), Box> { + let sharded = connections_sqlx().await.get(1).cloned().unwrap(); + reset(&sharded).await?; + seed(&sharded).await?; + + // Page 1: OFFSET 0 LIMIT 3 + let rows = sharded + .fetch_all(format!("SELECT value FROM {TABLE} ORDER BY value LIMIT 3 OFFSET 0").as_str()) + .await?; + assert_eq!(values(&rows), vec![1, 2, 3]); + + // Page 2: OFFSET 3 LIMIT 3 + let rows = sharded + .fetch_all(format!("SELECT value FROM {TABLE} ORDER BY value LIMIT 3 OFFSET 3").as_str()) + .await?; + assert_eq!(values(&rows), vec![4, 5, 6]); + + // Page 3: OFFSET 6 LIMIT 3 + let rows = sharded + .fetch_all(format!("SELECT value FROM {TABLE} ORDER BY value LIMIT 3 OFFSET 6").as_str()) + .await?; + assert_eq!(values(&rows), vec![7, 8, 9]); + + // Page 4: OFFSET 9 LIMIT 3 (only 1 row left) + let rows = sharded + .fetch_all(format!("SELECT value FROM {TABLE} ORDER BY value LIMIT 3 OFFSET 9").as_str()) + .await?; + assert_eq!(values(&rows), vec![10]); + + // Past the end + let rows = sharded + .fetch_all(format!("SELECT value FROM {TABLE} ORDER BY value LIMIT 3 OFFSET 10").as_str()) + .await?; + assert!(rows.is_empty()); + + cleanup(&sharded).await; + Ok(()) +} + +#[tokio::test] +async fn offset_pagination_prepared() -> Result<(), Box> { + let sharded = connections_sqlx().await.get(1).cloned().unwrap(); + reset(&sharded).await?; + seed(&sharded).await?; + + let sql = format!("SELECT value FROM {TABLE} ORDER BY value LIMIT $1 OFFSET $2"); + + // Page 1 + let rows = sqlx::query(&sql) + .bind(3i64) + .bind(0i64) + .fetch_all(&sharded) + .await?; + assert_eq!(values(&rows), vec![1, 2, 3]); + + // Page 2 + let rows = sqlx::query(&sql) + .bind(3i64) + .bind(3i64) + .fetch_all(&sharded) + .await?; + assert_eq!(values(&rows), vec![4, 5, 6]); + + // Page 3 + let rows = sqlx::query(&sql) + .bind(3i64) + .bind(6i64) + .fetch_all(&sharded) + .await?; + assert_eq!(values(&rows), vec![7, 8, 9]); + + // Partial last page + let rows = sqlx::query(&sql) + .bind(3i64) + .bind(9i64) + .fetch_all(&sharded) + .await?; + assert_eq!(values(&rows), vec![10]); + + // Past the end + let rows = sqlx::query(&sql) + .bind(3i64) + .bind(10i64) + .fetch_all(&sharded) + .await?; + assert!(rows.is_empty()); + + cleanup(&sharded).await; + Ok(()) +} + +#[tokio::test] +async fn offset_full_scan() -> Result<(), Box> { + let sharded = connections_sqlx().await.get(1).cloned().unwrap(); + reset(&sharded).await?; + seed(&sharded).await?; + + // OFFSET 0 with LIMIT covering 
all rows returns everything in order. + let rows = sharded + .fetch_all(format!("SELECT value FROM {TABLE} ORDER BY value LIMIT 10 OFFSET 0").as_str()) + .await?; + assert_eq!(values(&rows), vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + + cleanup(&sharded).await; + Ok(()) +} + +#[tokio::test] +async fn offset_large_offset() -> Result<(), Box> { + let sharded = connections_sqlx().await.get(1).cloned().unwrap(); + reset(&sharded).await?; + seed(&sharded).await?; + + // OFFSET larger than total row count. + let rows = sharded + .fetch_all(format!("SELECT value FROM {TABLE} ORDER BY value LIMIT 5 OFFSET 100").as_str()) + .await?; + assert!(rows.is_empty()); + + cleanup(&sharded).await; + Ok(()) +} + +#[tokio::test] +async fn offset_descending_order() -> Result<(), Box> { + let sharded = connections_sqlx().await.get(1).cloned().unwrap(); + reset(&sharded).await?; + seed(&sharded).await?; + + let rows = sharded + .fetch_all( + format!("SELECT value FROM {TABLE} ORDER BY value DESC LIMIT 3 OFFSET 2").as_str(), + ) + .await?; + assert_eq!(values(&rows), vec![8, 7, 6]); + + let rows = sharded + .fetch_all( + format!("SELECT value FROM {TABLE} ORDER BY value DESC LIMIT 3 OFFSET 5").as_str(), + ) + .await?; + assert_eq!(values(&rows), vec![5, 4, 3]); + + cleanup(&sharded).await; + Ok(()) +} diff --git a/pgdog/src/frontend/client/query_engine/test/mod.rs b/pgdog/src/frontend/client/query_engine/test/mod.rs index 31d552474..de93b592b 100644 --- a/pgdog/src/frontend/client/query_engine/test/mod.rs +++ b/pgdog/src/frontend/client/query_engine/test/mod.rs @@ -11,6 +11,7 @@ mod omni; pub mod prelude; mod rewrite_extended; mod rewrite_insert_split; +mod rewrite_offset; mod rewrite_simple_prepared; mod schema_changed; mod set; diff --git a/pgdog/src/frontend/client/query_engine/test/rewrite_offset.rs b/pgdog/src/frontend/client/query_engine/test/rewrite_offset.rs new file mode 100644 index 000000000..5c7b84a1d --- /dev/null +++ b/pgdog/src/frontend/client/query_engine/test/rewrite_offset.rs @@ -0,0 +1,106 @@ +use crate::frontend::router::parser::rewrite::statement::{ + offset::OffsetPlan, plan::RewriteResult, +}; +use crate::frontend::router::parser::Limit; + +use super::prelude::*; +use super::{test_client, test_sharded_client}; + +async fn run_test(messages: Vec) -> Option { + let mut client = test_sharded_client(); + client.client_request = ClientRequest::from(messages); + + let mut engine = QueryEngine::from_client(&client).unwrap(); + let mut context = QueryEngineContext::new(&mut client); + + engine.parse_and_rewrite(&mut context).await.unwrap(); + + match context.rewrite_result { + Some(RewriteResult::InPlace { offset }) => offset, + other => panic!("expected InPlace, got {:?}", other), + } +} + +#[tokio::test] +async fn test_offset_limit_literals() { + let offset = run_test(vec![ProtocolMessage::Query(Query::new( + "SELECT * FROM test LIMIT 10 OFFSET 5", + ))]) + .await; + + let offset = offset.expect("expected OffsetPlan"); + assert_eq!( + offset.limit, + Limit { + limit: Some(10), + offset: Some(5) + } + ); +} + +#[tokio::test] +async fn test_offset_limit_params() { + let offset = run_test(vec![ + ProtocolMessage::Parse(Parse::new_anonymous( + "SELECT * FROM test LIMIT $1 OFFSET $2", + )), + ProtocolMessage::Bind(Bind::new_params( + "", + &[Parameter::new(b"10"), Parameter::new(b"5")], + )), + ProtocolMessage::Execute(Execute::new()), + ProtocolMessage::Sync(Sync), + ]) + .await; + + let offset = offset.expect("expected OffsetPlan"); + assert_eq!(offset.limit.limit, None); + 
assert_eq!(offset.limit.offset, None); + assert_eq!(offset.limit_param, 1); + assert_eq!(offset.offset_param, 2); +} + +#[tokio::test] +async fn test_offset_limit_no_offset_no_plan() { + let offset = run_test(vec![ProtocolMessage::Query(Query::new( + "SELECT * FROM test LIMIT 10", + ))]) + .await; + + assert!(offset.is_none()); +} + +#[tokio::test] +async fn test_offset_limit_no_limit_no_plan() { + let offset = run_test(vec![ProtocolMessage::Query(Query::new( + "SELECT * FROM test OFFSET 5", + ))]) + .await; + + assert!(offset.is_none()); +} + +#[tokio::test] +async fn test_offset_limit_not_sharded() { + let mut client = test_client(); + client.client_request = ClientRequest::from(vec![ProtocolMessage::Query(Query::new( + "SELECT * FROM test LIMIT 10 OFFSET 5", + ))]); + + let mut engine = QueryEngine::from_client(&client).unwrap(); + let mut context = QueryEngineContext::new(&mut client); + + engine.parse_and_rewrite(&mut context).await.unwrap(); + + assert!(context.rewrite_result.is_none()); +} + +#[tokio::test] +async fn test_offset_limit_no_select() { + let offset = run_test(vec![ProtocolMessage::Query(Query::new( + "INSERT INTO test (id) VALUES (1)", + ))]) + .await; + + assert!(offset.is_none()); +} From 7d50e4d046a28397fe292308507f3b5f32e7a9d7 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Wed, 18 Feb 2026 12:09:25 -0800 Subject: [PATCH 3/7] check that we didnt break single-shard queries --- integration/rust/tests/integration/offset.rs | 121 +++++++++++++++++++ 1 file changed, 121 insertions(+) diff --git a/integration/rust/tests/integration/offset.rs b/integration/rust/tests/integration/offset.rs index ec7de4769..5529b2dbe 100644 --- a/integration/rust/tests/integration/offset.rs +++ b/integration/rust/tests/integration/offset.rs @@ -46,6 +46,26 @@ async fn cleanup(pool: &PgPool) { } } +async fn seed_with_customer_id(pool: &PgPool) -> Result<(), sqlx::Error> { + // Shard 0: values 1..=5 with customer_id=1 + pool.execute( + format!( + "/* pgdog_shard: 0 */ INSERT INTO {TABLE}(value, customer_id) VALUES (1,1), (2,1), (3,1), (4,1), (5,1)" + ) + .as_str(), + ) + .await?; + // Shard 1: values 6..=10 with customer_id=1 + pool.execute( + format!( + "/* pgdog_shard: 1 */ INSERT INTO {TABLE}(value, customer_id) VALUES (6,1), (7,1), (8,1), (9,1), (10,1)" + ) + .as_str(), + ) + .await?; + Ok(()) +} + fn values(rows: &[sqlx::postgres::PgRow]) -> Vec { rows.iter().map(|r| r.get::("value")).collect() } @@ -197,3 +217,104 @@ async fn offset_descending_order() -> Result<(), Box> { cleanup(&sharded).await; Ok(()) } + +#[tokio::test] +async fn offset_single_shard_not_rewritten() -> Result<(), Box> { + let sharded = connections_sqlx().await.get(1).cloned().unwrap(); + reset(&sharded).await?; + seed_with_customer_id(&sharded).await?; + + // Query with shard comment forces single-shard routing. + // LIMIT/OFFSET should be passed through unchanged to Postgres. 
+ let rows = sharded + .fetch_all( + format!( + "/* pgdog_shard: 0 */ SELECT value FROM {TABLE} ORDER BY value LIMIT 3 OFFSET 2" + ) + .as_str(), + ) + .await?; + // Shard 0 has values 1..=5, so OFFSET 2 LIMIT 3 → [3, 4, 5] + assert_eq!(values(&rows), vec![3, 4, 5]); + + let rows = sharded + .fetch_all( + format!( + "/* pgdog_shard: 0 */ SELECT value FROM {TABLE} ORDER BY value LIMIT 2 OFFSET 4" + ) + .as_str(), + ) + .await?; + // Only 1 row left at offset 4 on shard 0 + assert_eq!(values(&rows), vec![5]); + + let rows = sharded + .fetch_all( + format!( + "/* pgdog_shard: 1 */ SELECT value FROM {TABLE} ORDER BY value LIMIT 3 OFFSET 0" + ) + .as_str(), + ) + .await?; + // Shard 1 has values 6..=10 + assert_eq!(values(&rows), vec![6, 7, 8]); + + let rows = sharded + .fetch_all( + format!( + "/* pgdog_shard: 1 */ SELECT value FROM {TABLE} ORDER BY value LIMIT 10 OFFSET 3" + ) + .as_str(), + ) + .await?; + assert_eq!(values(&rows), vec![9, 10]); + + cleanup(&sharded).await; + Ok(()) +} + +#[tokio::test] +async fn offset_single_shard_prepared() -> Result<(), Box> { + let sharded = connections_sqlx().await.get(1).cloned().unwrap(); + reset(&sharded).await?; + seed_with_customer_id(&sharded).await?; + + // Parameterized query targeting a single shard via WHERE customer_id. + let sql = format!( + "SELECT value FROM {TABLE} WHERE customer_id = $1 ORDER BY value LIMIT $2 OFFSET $3" + ); + + // customer_id=1 hashes to some shard; both shards have data with customer_id=1, + // but the router sends the query to only one shard. Regardless of which shard + // it picks, LIMIT/OFFSET should work correctly on the shard's local data. + let rows = sqlx::query(&sql) + .bind(1i64) + .bind(3i64) + .bind(0i64) + .fetch_all(&sharded) + .await?; + assert_eq!(rows.len(), 3); + // Values should be consecutive and ordered. + let vals = values(&rows); + assert_eq!(vals[1] - vals[0], 1); + assert_eq!(vals[2] - vals[1], 1); + + let first_page = vals.clone(); + + // Next page should continue where we left off. + let rows = sqlx::query(&sql) + .bind(1i64) + .bind(3i64) + .bind(3i64) + .fetch_all(&sharded) + .await?; + let second_page = values(&rows); + + // No overlap between pages. + for v in &second_page { + assert!(!first_page.contains(v)); + } + + cleanup(&sharded).await; + Ok(()) +} From c8d1ed6afafc06c5574eae65b2403d0d22db5a55 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Wed, 18 Feb 2026 12:12:20 -0800 Subject: [PATCH 4/7] cleaner --- pgdog/src/frontend/client/query_engine/route_query.rs | 8 ++------ .../router/parser/rewrite/statement/offset.rs | 2 +- .../frontend/router/parser/rewrite/statement/plan.rs | 11 +++++++++++ 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/pgdog/src/frontend/client/query_engine/route_query.rs b/pgdog/src/frontend/client/query_engine/route_query.rs index 25bd823d2..f8ad25479 100644 --- a/pgdog/src/frontend/client/query_engine/route_query.rs +++ b/pgdog/src/frontend/client/query_engine/route_query.rs @@ -2,8 +2,6 @@ use pgdog_config::PoolerMode; use tokio::time::timeout; use tracing::trace; -use crate::frontend::router::parser::rewrite::statement::plan::RewriteResult; - use super::*; #[derive(Debug, Clone)] @@ -100,10 +98,8 @@ impl QueryEngine { ); // Apply post-parser rewrites, e.g. offset/limit. 
- if let Some(RewriteResult::InPlace { offset }) = &context.rewrite_result { - if let Some(offset) = offset { - offset.apply_after_parser(context.client_request)?; - } + if let Some(ref rewrite_result) = &context.rewrite_result { + rewrite_result.apply_after_parser(context.client_request)?; } } Err(err) => { diff --git a/pgdog/src/frontend/router/parser/rewrite/statement/offset.rs b/pgdog/src/frontend/router/parser/rewrite/statement/offset.rs index afeb375a3..7e54b7ad2 100644 --- a/pgdog/src/frontend/router/parser/rewrite/statement/offset.rs +++ b/pgdog/src/frontend/router/parser/rewrite/statement/offset.rs @@ -16,7 +16,7 @@ pub(crate) struct OffsetPlan { } impl OffsetPlan { - pub(crate) fn apply_after_parser(&self, request: &mut ClientRequest) -> Result<(), Error> { + pub(super) fn apply_after_parser(&self, request: &mut ClientRequest) -> Result<(), Error> { let route = match request.route.as_mut() { Some(route) => route, None => return Ok(()), diff --git a/pgdog/src/frontend/router/parser/rewrite/statement/plan.rs b/pgdog/src/frontend/router/parser/rewrite/statement/plan.rs index 30e2b4ad3..2ef79d65a 100644 --- a/pgdog/src/frontend/router/parser/rewrite/statement/plan.rs +++ b/pgdog/src/frontend/router/parser/rewrite/statement/plan.rs @@ -55,6 +55,17 @@ pub(crate) enum RewriteResult { ShardingKeyUpdate(ShardingKeyUpdate), } +impl RewriteResult { + pub(crate) fn apply_after_parser(&self, request: &mut ClientRequest) -> Result<(), Error> { + match self { + Self::InPlace { + offset: Some(ref offset), + } => offset.apply_after_parser(request), + _ => Ok(()), + } + } +} + impl RewritePlan { /// Apply the rewrite plan to a Bind message by appending generated unique IDs. pub(crate) fn apply_bind(&self, bind: &mut Bind) -> Result<(), Error> { From ee825835748125f5273323c80336b2081bcffbda Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Wed, 18 Feb 2026 12:28:19 -0800 Subject: [PATCH 5/7] more tests --- .../query_engine/test/rewrite_offset.rs | 138 ++++++++++++++++++ 1 file changed, 138 insertions(+) diff --git a/pgdog/src/frontend/client/query_engine/test/rewrite_offset.rs b/pgdog/src/frontend/client/query_engine/test/rewrite_offset.rs index 5c7b84a1d..f2e996d57 100644 --- a/pgdog/src/frontend/client/query_engine/test/rewrite_offset.rs +++ b/pgdog/src/frontend/client/query_engine/test/rewrite_offset.rs @@ -1,6 +1,7 @@ use crate::frontend::router::parser::rewrite::statement::{ offset::OffsetPlan, plan::RewriteResult, }; +use crate::frontend::router::parser::route::{Route, Shard, ShardWithPriority}; use crate::frontend::router::parser::Limit; use super::prelude::*; @@ -21,6 +22,16 @@ async fn run_test(messages: Vec) -> Option { } } +fn cross_shard_route() -> Route { + Route::select( + ShardWithPriority::new_table(Shard::All), + vec![], + Default::default(), + Limit::default(), + None, + ) +} + #[tokio::test] async fn test_offset_limit_literals() { let offset = run_test(vec![ProtocolMessage::Query(Query::new( @@ -104,3 +115,130 @@ async fn test_offset_limit_no_select() { assert!(offset.is_none()); } + +#[tokio::test] +async fn test_offset_with_unique_id_simple() { + unsafe { + std::env::set_var("NODE_ID", "pgdog-1"); + } + let sql = "SELECT pgdog.unique_id() FROM test LIMIT 10 OFFSET 5"; + let mut client = test_sharded_client(); + client.client_request = ClientRequest::from(vec![ProtocolMessage::Query(Query::new(sql))]); + + let mut engine = QueryEngine::from_client(&client).unwrap(); + let mut context = QueryEngineContext::new(&mut client); + + engine.parse_and_rewrite(&mut 
context).await.unwrap(); + + // After parse_and_rewrite, the Query message should have unique_id replaced. + let rewritten_sql = match &context.client_request.messages[0] { + ProtocolMessage::Query(q) => q.query().to_owned(), + _ => panic!("expected Query"), + }; + assert!( + !rewritten_sql.contains("pgdog.unique_id"), + "unique_id should be replaced: {rewritten_sql}" + ); + assert!( + rewritten_sql.contains("::bigint"), + "should have bigint cast: {rewritten_sql}" + ); + + // apply_after_parser with a cross-shard route. + context.client_request.route = Some(cross_shard_route()); + context + .rewrite_result + .as_ref() + .unwrap() + .apply_after_parser(context.client_request) + .unwrap(); + + let final_sql = match &context.client_request.messages[0] { + ProtocolMessage::Query(q) => q.query().to_owned(), + _ => panic!("expected Query"), + }; + + // unique_id rewrite must survive. + assert!( + !final_sql.contains("pgdog.unique_id"), + "unique_id rewrite must survive apply_after_parser: {final_sql}" + ); + assert!( + final_sql.contains("::bigint"), + "bigint cast must survive: {final_sql}" + ); + // LIMIT/OFFSET must be rewritten for cross-shard. + assert!( + final_sql.contains("LIMIT 15"), + "LIMIT should be 10+5=15: {final_sql}" + ); + assert!( + final_sql.contains("OFFSET 0"), + "OFFSET should be 0: {final_sql}" + ); +} + +#[tokio::test] +async fn test_offset_with_unique_id_extended() { + unsafe { + std::env::set_var("NODE_ID", "pgdog-1"); + } + let sql = "SELECT pgdog.unique_id(), $1 FROM test LIMIT $2 OFFSET $3"; + let mut client = test_sharded_client(); + client.client_request = ClientRequest::from(vec![ + ProtocolMessage::Parse(Parse::new_anonymous(sql)), + ProtocolMessage::Bind(Bind::new_params( + "", + &[ + Parameter::new(b"hello"), + Parameter::new(b"10"), + Parameter::new(b"5"), + ], + )), + ProtocolMessage::Execute(Execute::new()), + ProtocolMessage::Sync(Sync), + ]); + + let mut engine = QueryEngine::from_client(&client).unwrap(); + let mut context = QueryEngineContext::new(&mut client); + + engine.parse_and_rewrite(&mut context).await.unwrap(); + + // After parse_and_rewrite, Parse should have unique_id rewritten to $4::bigint. + let rewritten_sql = match &context.client_request.messages[0] { + ProtocolMessage::Parse(p) => p.query().to_owned(), + _ => panic!("expected Parse"), + }; + assert_eq!( + rewritten_sql, + "SELECT $4::bigint, $1 FROM test LIMIT $2 OFFSET $3" + ); + + // apply_after_parser with cross-shard route should only rewrite Bind params. + context.client_request.route = Some(cross_shard_route()); + context + .rewrite_result + .as_ref() + .unwrap() + .apply_after_parser(context.client_request) + .unwrap(); + + // SQL unchanged (all limit/offset are params). + let final_sql = match &context.client_request.messages[0] { + ProtocolMessage::Parse(p) => p.query().to_owned(), + _ => panic!("expected Parse"), + }; + assert_eq!( + final_sql, "SELECT $4::bigint, $1 FROM test LIMIT $2 OFFSET $3", + "SQL must be unchanged for all-param case" + ); + + // Bind params: $1=hello unchanged, $2=limit rewritten to 15, $3=offset rewritten to 0. 
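// The same arithmetic for the extended protocol, sketched with plain byte
// buffers rather than PgDog's Bind type (assumes text-format parameters):
// only the Bind values change, the prepared SQL keeps its placeholders.
fn rewrite_bind_values(
    limit: &mut Vec<u8>,
    offset: &mut Vec<u8>,
) -> Result<(), std::num::ParseIntError> {
    let l: i64 = std::str::from_utf8(limit.as_slice()).unwrap_or("0").parse()?;
    let o: i64 = std::str::from_utf8(offset.as_slice()).unwrap_or("0").parse()?;
    *limit = (l + o).to_string().into_bytes(); // "10" and "5" become "15"
    *offset = b"0".to_vec();                   // real offset applied after merging
    Ok(())
}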
+ if let ProtocolMessage::Bind(bind) = &context.client_request.messages[1] { + assert_eq!(bind.params_raw()[0].data.as_ref(), b"hello"); + assert_eq!(bind.params_raw()[1].data.as_ref(), b"15"); + assert_eq!(bind.params_raw()[2].data.as_ref(), b"0"); + } else { + panic!("expected Bind"); + } +} From 4104af149540591ea218260f8fe9cc7fd01067ee Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Wed, 18 Feb 2026 12:33:46 -0800 Subject: [PATCH 6/7] update comment --- pgdog/src/backend/pool/connection/buffer.rs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/pgdog/src/backend/pool/connection/buffer.rs b/pgdog/src/backend/pool/connection/buffer.rs index 4b9f028c2..c5f6c3527 100644 --- a/pgdog/src/backend/pool/connection/buffer.rs +++ b/pgdog/src/backend/pool/connection/buffer.rs @@ -220,13 +220,6 @@ impl Buffer { } /// Execute LIMIT ... OFFSET ... - /// - /// N.B.: offset is incorrectly calculated at the moment - /// because we send it to Postgres as-is. - /// - /// What we need to do is rewrite LIMIT to be LIMIT + OFFSET, - /// overfetch those rows, and then apply the OFFSET to the entire - /// result set from all shards. pub(super) fn limit(&mut self, limit: &Limit) { let offset = limit.offset.unwrap_or(0); self.buffer.drain(..offset.min(self.buffer.len())); From 050d0c8b65fd931fb8dcaf44de87572d00588cf7 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Wed, 18 Feb 2026 12:48:22 -0800 Subject: [PATCH 7/7] moar tests --- pgdog/src/frontend/router/parser/route.rs | 80 ++++++++++++++++++++++- 1 file changed, 79 insertions(+), 1 deletion(-) diff --git a/pgdog/src/frontend/router/parser/route.rs b/pgdog/src/frontend/router/parser/route.rs index 6aa8c919a..ed717bf8d 100644 --- a/pgdog/src/frontend/router/parser/route.rs +++ b/pgdog/src/frontend/router/parser/route.rs @@ -229,7 +229,10 @@ impl Route { } pub fn should_buffer(&self) -> bool { - !self.order_by().is_empty() || !self.aggregate().is_empty() || self.distinct().is_some() + !self.order_by().is_empty() + || !self.aggregate().is_empty() + || self.distinct().is_some() + || self.limit().offset.is_some() } pub fn limit(&self) -> &Limit { @@ -590,6 +593,81 @@ mod test { ); } + #[test] + fn test_should_buffer_empty_route() { + let route = Route::default(); + assert!(!route.should_buffer()); + } + + #[test] + fn test_should_buffer_order_by() { + let route = Route::select( + ShardWithPriority::new_table(Shard::All), + vec![OrderBy::Asc(0)], + Default::default(), + Limit::default(), + None, + ); + assert!(route.should_buffer()); + } + + #[test] + fn test_should_buffer_limit_only() { + let route = Route::select( + ShardWithPriority::new_table(Shard::All), + vec![], + Default::default(), + Limit { + limit: Some(10), + offset: None, + }, + None, + ); + assert!(!route.should_buffer()); + } + + #[test] + fn test_should_buffer_offset_only() { + let route = Route::select( + ShardWithPriority::new_table(Shard::All), + vec![], + Default::default(), + Limit { + limit: None, + offset: Some(5), + }, + None, + ); + assert!(route.should_buffer()); + } + + #[test] + fn test_should_buffer_limit_and_offset() { + let route = Route::select( + ShardWithPriority::new_table(Shard::All), + vec![], + Default::default(), + Limit { + limit: Some(10), + offset: Some(5), + }, + None, + ); + assert!(route.should_buffer()); + } + + #[test] + fn test_should_buffer_no_limit_no_offset() { + let route = Route::select( + ShardWithPriority::new_table(Shard::All), + vec![], + Default::default(), + Limit::default(), + None, + ); + assert!(!route.should_buffer()); + } + #[test] fn 
test_comment_override_set() { let mut shards = ShardsWithPriority::default();