From dfd4b7bf7b7ae88212f1a7c41027a99fe65eec8e Mon Sep 17 00:00:00 2001 From: "Andrei V. Lepikhov" Date: Fri, 8 May 2026 10:57:30 +0200 Subject: [PATCH 1/5] Cover the build_exclude_extension_string with conditional compilation It is just the fact that --exclude-extension has been introduced in Postgres 18. --- src/spock_failover_slots.c | 5 +++-- src/spock_sync.c | 3 ++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/spock_failover_slots.c b/src/spock_failover_slots.c index 552e3745..73a8e385 100644 --- a/src/spock_failover_slots.c +++ b/src/spock_failover_slots.c @@ -1584,6 +1584,9 @@ attach_to_walsender(Port *port, int status) void spock_init_failover_slot(void) { +#if PG_VERSION_NUM < 180000 + BackgroundWorker bgw; +#endif DefineCustomStringVariable( "spock.pg_standby_slot_names", "list of names of slot that must confirm changes before they're sent by the decoding plugin", @@ -1648,8 +1651,6 @@ spock_init_failover_slot(void) elog(LOG, "spock: skipping failover slot worker on PostgreSQL 18+ " "(use sync_replication_slots = on instead)"); #else - /* Run the worker. */ - BackgroundWorker bgw; memset(&bgw, 0, sizeof(bgw)); bgw.bgw_flags = diff --git a/src/spock_sync.c b/src/spock_sync.c index 50d16971..7f7ed86b 100644 --- a/src/spock_sync.c +++ b/src/spock_sync.c @@ -166,7 +166,7 @@ get_pg_executable(char *cmdname, char *cmdbuf) * The sub->skip_schema typically doesn't include our globally-skipped objects. * So, don't care about duplicates. */ - +#if PG_VERSION_NUM >= 180000 static List * build_exclude_extension_string(void) { @@ -184,6 +184,7 @@ build_exclude_extension_string(void) } return lst; } +#endif static List * build_exclude_schema_string(SpockSubscription *sub) From 145c1193e2bb88da86c2e10b885a29c99bb50185 Mon Sep 17 00:00:00 2001 From: "Andrei V. Lepikhov" Date: Wed, 18 Feb 2026 11:10:57 +0100 Subject: [PATCH 2/5] Remove Windows platform support code Drop Win32-specific code paths including exec_cmd_win32(), GetTempPath() temp directory resolution, Windows argument quoting, and related type definitions. Spock targets POSIX-only environments. --- include/spock_sync.h | 4 - src/spock.c | 16 -- src/spock_sync.c | 336 +---------------------------------- utils/pgindent/typedefs.list | 7 - 4 files changed, 1 insertion(+), 362 deletions(-) diff --git a/include/spock_sync.h b/include/spock_sync.h index 2169c07a..b1cf40e2 100644 --- a/include/spock_sync.h +++ b/include/spock_sync.h @@ -102,8 +102,4 @@ extern bool wait_for_sync_status_change(Oid subid, const char *nspname, extern void truncate_table(char *nspname, char *relname); extern List *get_subscription_tables(Oid subid); -#ifdef WIN32 -extern void QuoteWindowsArgv(StringInfo cmdline, const char *argv[]); -#endif - #endif /* SPOCK_SYNC_H */ diff --git a/src/spock.c b/src/spock.c index dd3dcb66..2d9680ad 100644 --- a/src/spock.c +++ b/src/spock.c @@ -814,28 +814,12 @@ spock_temp_directory_assing_hook(const char *newval, void *extra) } else { -#ifndef WIN32 const char *tmpdir = getenv("TMPDIR"); if (!tmpdir) tmpdir = "/tmp"; -#else - char tmpdir[MAXPGPATH]; - int ret; - - ret = GetTempPath(MAXPGPATH, tmpdir); - if (ret == 0 || ret > MAXPGPATH) - { - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("could not locate temporary directory: %s\n", - !ret ? strerror(errno) : ""))); - return false; - } -#endif spock_temp_directory = strdup(tmpdir); - } if (spock_temp_directory == NULL) diff --git a/src/spock_sync.c b/src/spock_sync.c index 7f7ed86b..2b2450aa 100644 --- a/src/spock_sync.c +++ b/src/spock_sync.c @@ -13,12 +13,7 @@ #include "postgres.h" #include - -#ifdef WIN32 -#include -#else #include -#endif #include "libpq-fe.h" @@ -89,14 +84,10 @@ PGDLLEXPORT void spock_sync_main(Datum main_arg); static SpockSyncWorker *MySyncWorker = NULL; -#ifdef WIN32 -static int exec_cmd_win32(const char *cmd, char *cmdargv[]); -#endif - /* * Run a command and wait for it to exit, then return its exit code - * in the same format as waitpid() including on Windows. + * in the same format as waitpid(). * * Does not elog(ERROR). * @@ -122,7 +113,6 @@ exec_cmd(const char *cmd, char *cmdargv[]) fflush(stdout); fflush(stderr); -#ifndef WIN32 if ((pid = fork()) == 0) { if (execv(cmd, cmdargv) < 0) @@ -136,9 +126,6 @@ exec_cmd(const char *cmd, char *cmdargv[]) if (waitpid(pid, &stat, 0) != pid) stat = -1; -#else - stat = exec_cmd_win32(cmd, cmdargv); -#endif return stat; } @@ -2588,324 +2575,3 @@ truncate_table(char *nspname, char *relname) CommandCounterIncrement(); } - - -/* - * exec_cmd support for win32 - */ -#ifdef WIN32 -/* - * Return formatted message from GetLastError() in a palloc'd string in the - * current memory context, or a copy of a constant generic error string if - * there's no recorded error state. - */ -static char * -PglGetLastWin32Error(void) -{ - LPVOID lpMsgBuf; - DWORD dw = GetLastError(); - char *pgstr = NULL; - - if (dw != ERROR_SUCCESS) - { - FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, - NULL, dw, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), (LPTSTR) &lpMsgBuf, 0, NULL); - pgstr = pstrdup((LPTSTR) lpMsgBuf); - LocalFree(lpMsgBuf); - } - else - { - pgstr = pstrdup("Unknown error or no recent error"); - } - - return pgstr; -} - - -/* - * See https://docs.microsoft.com/en-us/archive/blogs/twistylittlepassagesallalike/everyone-quotes-command-line-arguments-the-wrong-way - * for the utterly putrid way Windows handles command line arguments, and the insane lack of any inverse - * form of the CommandLineToArgvW function in the win32 API. - */ -static void -QuoteWindowsArgvElement(StringInfo cmdline, const char *arg, bool force) -{ - if (!force && *arg != '\0' - && strchr(arg, ' ') == NULL - && strchr(arg, '\t') == NULL - && strchr(arg, '\n') == NULL - && strchr(arg, '\v') == NULL - && strchr(arg, '"') == NULL) - { - appendStringInfoString(cmdline, arg); - } - else - { - const char *it; - - /* Begin quoted argument */ - appendStringInfoChar(cmdline, '"'); - - /* - * In terms of the algorithm described in CommandLineToArgvW's - * documentation we are now "in quotes". - */ - - for (it = arg; *it != '\0'; it++) - { - unsigned int NumberBackslashes = 0; - - /* - * Accumulate runs of backslashes. They may or may not have - * special meaning depending on what follows them. - */ - while (*it != '\0' && *it == '\\') - { - ++it; - ++NumberBackslashes; - } - - if (*it == '\0') - { - /* - * Handle command line arguments ending with or consisting - * only of backslashes. Particularly important for Windows, - * given its backslash paths. - * - * We want NumberBackSlashes * 2 backslashes here to prevent - * the final backslash from escaping the quote we'll append at - * the end of the argument. - */ - for (; NumberBackslashes > 0; NumberBackslashes--) - appendStringInfoString(cmdline, "\\\\"); - break; - } - else if (*it == '"') - { - /* - * Escape all accumulated backslashes, then append escaped - * quotation mark. - * - * We want NumberBackSlashes * 2 + 1 backslashes to prevent - * the backslashes from escaping the backslash we have to - * append to escape the quote char that's part of the argument - * itself. - */ - for (; NumberBackslashes > 0; NumberBackslashes--) - appendStringInfoString(cmdline, "\\\\"); - appendStringInfoString(cmdline, "\\\""); - } - else - { - /* - * A series of backslashes followed by something other than a - * double quote is not special to the CommandLineToArgvW - * parser in MSVCRT and must be appended literally. - */ - for (; NumberBackslashes > 0; NumberBackslashes--) - appendStringInfoChar(cmdline, '\\'); - /* Finally any normal char */ - appendStringInfoChar(cmdline, *it); - } - } - - /* End quoted argument */ - appendStringInfoChar(cmdline, '"'); - - /* - * In terms of the algorithm described in CommandLineToArgvW's - * documentation we are now "not in quotes". - */ - } -} - -/* - * Turn an execv-style argument vector into something that Win32's - * CommandLineToArgvW will parse back into the original argument - * vector. - * - * You'd think this would be part of the win32 API. But no... - * - * (This should arguably be part of libpq_fe.c, but I didn't want to expand our - * abuse of PqExpBuffer.) - */ -static void -QuoteWindowsArgv(StringInfo cmdline, const char *argv[]) -{ - /* argv0 is required */ - Assert(*argv != NULL && **argv != '\0'); - QuoteWindowsArgvElement(cmdline, *argv, false); - ++argv; - - for (; *argv != NULL; ++argv) - { - appendStringInfoChar(cmdline, ' '); - QuoteWindowsArgvElement(cmdline, *argv, false); - } -} - -/* - * Run a process on Windows and wait for it to exit, then return its exit code. - * Preserve argument quoting. See exec_cmd() for the function contract details. - * This is only split out to keep all the win32 horror separate for reability. - * - * Don't be tempted to use Win32's _spawnv. It is not like execv. It does *not* - * preserve the individual arguments in the vector, it concatenates them - * without any escaping or quoting. Thus any arguments with spaces, double - * quotes, etc will be mangled by the child process's MSVC runtime when it - * tries to turn the argument string back into an argument vector for the main - * function by calling CommandLineToArgv() from the C library entrypoint. - * _spawnv is also limited to 1024 characters not the 32767 characters permited - * by the underlying Win32 APIs, and that could matter for pg_dump. - * - * This provides something more like we'e expect from execv and waitpid() - * including a waitpid()-style return code with the exit code in the high - * 8 bits of a 16 bit value. Use WEXITSTATUS() for the exit status. The - * special value -1 is returned for a failure to launch the process, - * wait for it, or get its exit code. - */ -static int -exec_cmd_win32(const char *cmd, char *cmdargv[]) -{ - BOOL ret; - int exitcode = -1; - PROCESS_INFORMATION pi; - - elog(DEBUG1, "trying to launch \"%s\"", cmd); - - /* Launch the process */ - { - STARTUPINFO si; - StringInfoData cmdline; - char *cmd_tmp; - - /* Deal with insane windows command line quoting */ - initStringInfo(&cmdline); - QuoteWindowsArgv(&cmdline, cmdargv); - - /* CreateProcess may scribble on the cmd string */ - cmd_tmp = pstrdup(cmd); - - /* - * STARTUPINFO contains various extra options for the process that are - * not passed as CreateProcess flags, and is required. - */ - ZeroMemory(&si, sizeof(si)); - si.cb = sizeof(si); - - /* - * PROCESS_INFORMATION accepts the returned process handle. - */ - ZeroMemory(&pi, sizeof(pi)); - ret = CreateProcess(cmd_tmp, cmdline.data, - NULL /* default process attributes */ , - NULL /* default thread attributes */ , - TRUE /* handles (fds) are inherited, to match - * execv */ , - CREATE_NO_WINDOW /* process creation flags */ , - NULL /* inherit environment variables */ , - NULL /* inherit working directory */ , - &si, - &pi); - - pfree(cmd_tmp); - pfree(cmdline.data); - } - - if (!ret) - { - char *winerr = PglGetLastWin32Error(); - - ereport(LOG, - (errcode_for_file_access(), - errmsg("failed to launch \"%s\": %s", - cmd, winerr))); - pfree(winerr); - } - else - { - /* - * Process created. It can still fail due to DLL linkage errors, - * startup problems etc, but the handle exists. - * - * Wait for it to exit, while responding to interrupts. Ideally we - * should be able to use WaitEventSetWait here since Windows sees a - * process handle much like a socket, but the Pg API for it won't let - * us, so we have to DIY. - */ - - elog(DEBUG1, "process launched, waiting"); - - do - { - ret = WaitForSingleObject(pi.hProcess, 500 /* timeout in ms */ ); - - /* - * Note that if we elog(ERROR) or elog(FATAL) as a result of a - * signal here we won't kill the child proc. - */ - CHECK_FOR_INTERRUPTS(); - - if (ret == WAIT_TIMEOUT) - continue; - - if (ret != WAIT_OBJECT_0) - { - char *winerr = PglGetLastWin32Error(); - - ereport(DEBUG1, - (errcode_for_file_access(), - errmsg("unexpected WaitForSingleObject() return code %d while waiting for child process \"%s\": %s", - ret, cmd, winerr))); - pfree(winerr); - /* Try to get the exit code anyway */ - } - - if (!GetExitCodeProcess(pi.hProcess, &exitcode)) - { - char *winerr = PglGetLastWin32Error(); - - ereport(DEBUG1, - (errcode_for_file_access(), - errmsg("failed to get exit code from process \"%s\": %s", - cmd, winerr))); - pfree(winerr); - /* Give up on learning about the process's outcome */ - exitcode = -1; - break; - } - else - { - /* Woken up for a reason other than child process termination */ - if (exitcode == STILL_ACTIVE) - continue; - - /* - * Process must've exited, so code is a value from - * ExitProcess, TerminateProcess, main or WinMain. - */ - ereport(DEBUG1, - (errmsg("process \"%s\" exited with code %d", - cmd, exitcode))); - - /* - * Adapt exit code to WEXITSTATUS form to behave like - * waitpid(). - * - * The lower 8 bits are the terminating signal, with 0 for no - * signal. - */ - exitcode = exitcode << 8; - - break; - } - } while (true); - - CloseHandle(pi.hProcess); - CloseHandle(pi.hThread); - } - - elog(DEBUG1, "exec_cmd_win32 for \"%s\" exiting with %d", cmd, exitcode); - return exitcode; -} -#endif diff --git a/utils/pgindent/typedefs.list b/utils/pgindent/typedefs.list index fd3797cc..adfc133f 100644 --- a/utils/pgindent/typedefs.list +++ b/utils/pgindent/typedefs.list @@ -234,7 +234,6 @@ BTVacuumPostingData BTWriteState BUF_MEM BYTE -BY_HANDLE_FILE_INFORMATION BackendParameters BackendStartupData BackendState @@ -605,7 +604,6 @@ DR_transientrel DSMREntryType DSMRegistryCtxStruct DSMRegistryEntry -DWORD DataDirSyncMethod DataDumperPtr DataPageDeleteStack @@ -1137,7 +1135,6 @@ GucStackState GucStringAssignHook GucStringCheckHook GzipCompressorState -HANDLE HASHACTION HASHBUCKET HASHCTL @@ -1525,7 +1522,6 @@ LPSERVICE_STATUS LPSTR LPTHREAD_START_ROUTINE LPTSTR -LPVOID LPWSTR LSEG LUID @@ -2620,7 +2616,6 @@ RunMode RunningTransactions RunningTransactionsData SASLStatus -SC_HANDLE SECURITY_ATTRIBUTES SECURITY_STATUS SEG @@ -2628,7 +2623,6 @@ SERIALIZABLEXACT SERIALIZABLEXID SERIALIZABLEXIDTAG SERVICE_STATUS -SERVICE_STATUS_HANDLE SERVICE_TABLE_ENTRY SID_AND_ATTRIBUTES SID_IDENTIFIER_AUTHORITY @@ -4245,7 +4239,6 @@ walrcv_server_version_fn walrcv_startstreaming_fn wchar2mb_with_len_converter wchar_t -win32_deadchild_waitinfo wint_t worker_state worktable From 02bc4e2c7cda1ab6bbc24dd3e0a157f6dae033dc Mon Sep 17 00:00:00 2001 From: "Andrei V. Lepikhov" Date: Tue, 31 Mar 2026 14:01:09 +0200 Subject: [PATCH 3/5] Add clarification comment for spock_disable_subscription It may be unclear to detect if someone forget to commit after the call of this function wiping out the result. So, add a comment to reduce number of coding errors. --- src/spock_exception_handler.c | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/spock_exception_handler.c b/src/spock_exception_handler.c index 2efe532e..d5ff5105 100644 --- a/src/spock_exception_handler.c +++ b/src/spock_exception_handler.c @@ -217,6 +217,12 @@ add_entry_to_exception_log(Oid remote_origin, TimestampTz remote_commit_ts, * This function is invoked when the configured exception handling behavior is * SUB_DISABLE, meaning the subscription must be suspended instead of skipping * or retrying the failing transaction. + * + * May be called with or without an active transaction. If no transaction is + * in progress, one is started and committed internally. If the caller already + * holds an open transaction, it is the caller's responsibility to ensure that + * transaction is either committed or terminates with a FATAL error; otherwise + * the subscription state change and exception_log entry will be rolled back. */ void spock_disable_subscription(SpockSubscription *sub, From 081ce0d704f335c5f623a703856292393d0c7d55 Mon Sep 17 00:00:00 2001 From: "Andrei V. Lepikhov" Date: Fri, 8 May 2026 11:06:05 +0200 Subject: [PATCH 4/5] sql: tidy spock--6.0.0-devel.sql to read as a uniform UI Group the head install script by feature area and normalise function signatures to a single style, so the script reads as a coherent user-facing interface rather than the accumulated insertion-order history it had become. No functional changes; upgrade scripts untouched. --- sql/spock--6.0.0-devel.sql | 1281 +++++++++++++++++++++--------------- 1 file changed, 762 insertions(+), 519 deletions(-) diff --git a/sql/spock--6.0.0-devel.sql b/sql/spock--6.0.0-devel.sql index 35cbebe1..fb580294 100644 --- a/sql/spock--6.0.0-devel.sql +++ b/sql/spock--6.0.0-devel.sql @@ -1,5 +1,9 @@ \echo Use "CREATE EXTENSION spock" to load this file. \quit +-- ---------------------------------------------------------------------- +-- Catalog tables +-- ---------------------------------------------------------------------- + CREATE TABLE spock.node ( node_id oid NOT NULL PRIMARY KEY, node_name name NOT NULL UNIQUE, @@ -51,6 +55,91 @@ CREATE TABLE spock.local_sync_status ( UNIQUE (sync_subid, sync_nspname, sync_relname) ); +CREATE TABLE spock.replication_set ( + set_id oid NOT NULL PRIMARY KEY, + set_nodeid oid NOT NULL REFERENCES node(node_id) ON UPDATE CASCADE, + set_name name NOT NULL, + replicate_insert boolean NOT NULL DEFAULT true, + replicate_update boolean NOT NULL DEFAULT true, + replicate_delete boolean NOT NULL DEFAULT true, + replicate_truncate boolean NOT NULL DEFAULT true, + UNIQUE (set_nodeid, set_name) +) WITH (user_catalog_table=true); + +CREATE TABLE spock.replication_set_table ( + set_id oid NOT NULL, + set_reloid regclass NOT NULL, + set_att_list text[], + set_row_filter pg_node_tree, + PRIMARY KEY(set_id, set_reloid) +) WITH (user_catalog_table=true); + +CREATE TABLE spock.replication_set_seq ( + set_id oid NOT NULL, + set_seqoid regclass NOT NULL, + PRIMARY KEY(set_id, set_seqoid) +) WITH (user_catalog_table=true); + +CREATE TABLE spock.sequence_state ( + seqoid oid NOT NULL PRIMARY KEY, + cache_size integer NOT NULL, + last_value bigint NOT NULL +) WITH (user_catalog_table=true); + +CREATE TABLE spock.depend ( + classid oid NOT NULL, + objid oid NOT NULL, + objsubid integer NOT NULL, + + refclassid oid NOT NULL, + refobjid oid NOT NULL, + refobjsubid integer NOT NULL, + + deptype "char" NOT NULL +) WITH (user_catalog_table=true); + +CREATE TABLE spock.queue ( + queued_at timestamp with time zone NOT NULL, + role name NOT NULL, + replication_sets text[], + message_type "char" NOT NULL, + message json NOT NULL +); + +CREATE TABLE spock.pii ( + id int generated always as identity, + pii_schema text NOT NULL, + pii_table text NOT NULL, + pii_column text NOT NULL, + PRIMARY KEY(id) +) WITH (user_catalog_table=true); + +CREATE TABLE spock.resolutions ( + id int generated always as identity, + node_name name NOT NULL, + log_time timestamptz NOT NULL, + relname text, + idxname text, + conflict_type text, + conflict_resolution text, + + -- columns for local changes + local_origin int, + local_tuple text, + local_xid xid, + local_timestamp timestamptz, + + -- columns for remote changes + remote_origin int, + remote_tuple text, + remote_xid xid, + remote_timestamp timestamptz, + remote_lsn pg_lsn, + + PRIMARY KEY(id, node_name) +) WITH (user_catalog_table=true); +CREATE INDEX ON spock.resolutions (log_time); + CREATE TABLE spock.exception_log ( remote_origin oid NOT NULL, remote_commit_ts timestamptz NOT NULL, @@ -98,169 +187,149 @@ CREATE TABLE spock.exception_status_detail ( REFERENCES spock.exception_status ) WITH (user_catalog_table=true); -CREATE FUNCTION spock.apply_group_progress ( - OUT dbid oid, - OUT node_id oid, - OUT remote_node_id oid, - OUT remote_commit_ts timestamptz, - OUT prev_remote_ts timestamptz, - OUT remote_commit_lsn pg_lsn, - OUT remote_insert_lsn pg_lsn, - OUT received_lsn pg_lsn, - OUT last_updated_ts timestamptz, - OUT updated_by_decode bool -) RETURNS SETOF record -LANGUAGE c AS 'MODULE_PATHNAME', 'get_apply_group_progress'; - --- Show the Spock apply progress for the current database --- Columns prev_remote_ts, last_updated_ts, and updated_by_decode is dedicated --- for internal use only. -CREATE VIEW spock.progress AS - SELECT * FROM spock.apply_group_progress() - WHERE dbid = ( - SELECT oid FROM pg_database WHERE datname = current_database() - ); - --- Read peer progress (ros.remote_lsn) for all peer subscriptions. --- Called while apply workers are paused and the slot's snapshot is imported. --- Row 0: header (lsn + snapshot placeholder). Rows 1+: one progress entry per peer. -CREATE FUNCTION spock.read_peer_progress( - p_slot_name text, - p_provider_node_id oid, - p_subscriber_node_id oid -) RETURNS TABLE( - lsn pg_lsn, - snapshot text, - dbid oid, - node_id oid, - remote_node_id oid, - remote_commit_ts timestamptz, - prev_remote_ts timestamptz, - remote_commit_lsn pg_lsn, - remote_insert_lsn pg_lsn, - received_lsn pg_lsn, - last_updated_ts timestamptz, - updated_by_decode boolean -) VOLATILE STRICT LANGUAGE plpgsql AS $$ -DECLARE - v_lsn pg_lsn; - v_snap text; - rec record; - v_n_peers int := 0; -BEGIN - /* - * The slot and snapshot are created by the C caller via the replication - * protocol. The slot's snapshot is imported into this transaction. - * This function just reads peer progress (ros.remote_lsn) while apply - * workers are paused. - */ - - -- Get the slot's LSN and the imported snapshot for the header row. - SELECT restart_lsn INTO v_lsn - FROM pg_replication_slots WHERE slot_name = p_slot_name; - v_snap := ''; -- snapshot managed by C caller - - RAISE NOTICE 'SPOCK cswp slot=% v_lsn=%', p_slot_name, v_lsn; +-- ---------------------------------------------------------------------- +-- Node management +-- ---------------------------------------------------------------------- - -- Header row: lsn only (snapshot managed by C caller). - lsn := v_lsn; - snapshot := v_snap; - RETURN NEXT; +CREATE FUNCTION spock.node_create( + node_name name, + dsn text, + location text DEFAULT NULL, + country text DEFAULT NULL, + info jsonb DEFAULT NULL +) +RETURNS oid CALLED ON NULL INPUT +AS 'MODULE_PATHNAME', 'spock_create_node' +LANGUAGE C VOLATILE; - /* - * Emit one progress row per peer. With apply workers paused, - * ros.remote_lsn is exact: it reflects only committed transactions - * whose effects are visible in the slot snapshot. - */ - FOR rec IN ( - SELECT p.dbid, p.node_id, p.remote_node_id, - p.remote_commit_ts, p.prev_remote_ts, - p.remote_commit_lsn AS grp_remote_commit_lsn, - p.remote_insert_lsn, - p.received_lsn, p.last_updated_ts, p.updated_by_decode, - ros.remote_lsn AS ros_remote_lsn, - sub.sub_slot_name AS sub_slot_name - FROM spock.subscription sub - JOIN spock.progress p - ON p.remote_node_id = sub.sub_origin - AND p.node_id = sub.sub_target - JOIN pg_replication_origin o - ON o.roname = sub.sub_slot_name - LEFT JOIN pg_replication_origin_status ros - ON ros.local_id = o.roident - WHERE sub.sub_target = p_provider_node_id - AND sub.sub_origin <> p_subscriber_node_id - ) LOOP - v_n_peers := v_n_peers + 1; +CREATE FUNCTION spock.node_drop( + node_name name, + ifexists boolean DEFAULT false +) +RETURNS boolean +AS 'MODULE_PATHNAME', 'spock_drop_node' +LANGUAGE C STRICT VOLATILE; - lsn := v_lsn; - snapshot := v_snap; - dbid := rec.dbid; - node_id := rec.node_id; - remote_node_id := rec.remote_node_id; - remote_commit_ts := rec.remote_commit_ts; - prev_remote_ts := rec.prev_remote_ts; - remote_commit_lsn := COALESCE(rec.ros_remote_lsn, '0/0'::pg_lsn); - remote_insert_lsn := rec.remote_insert_lsn; - received_lsn := rec.received_lsn; - last_updated_ts := rec.last_updated_ts; - updated_by_decode := rec.updated_by_decode; +CREATE FUNCTION spock.node_add_interface( + node_name name, + interface_name name, + dsn text +) +RETURNS oid +AS 'MODULE_PATHNAME', 'spock_alter_node_add_interface' +LANGUAGE C STRICT VOLATILE; - RAISE NOTICE 'SPOCK cswp peer=% resume_lsn=%', - rec.remote_node_id, remote_commit_lsn; +CREATE FUNCTION spock.node_drop_interface( + node_name name, + interface_name name +) +RETURNS boolean +AS 'MODULE_PATHNAME', 'spock_alter_node_drop_interface' +LANGUAGE C STRICT VOLATILE; - RETURN NEXT; - END LOOP; +CREATE FUNCTION spock.node_info( + OUT node_id oid, + OUT node_name text, + OUT sysid text, + OUT dbname text, + OUT replication_sets text, + OUT location text, + OUT country text, + OUT info jsonb +) +RETURNS record +AS 'MODULE_PATHNAME', 'spock_node_info' +LANGUAGE C STRICT STABLE; - RAISE NOTICE 'SPOCK cswp slot=% done peers=%', p_slot_name, v_n_peers; -END; -$$; +CREATE FUNCTION spock.get_country() +RETURNS text +AS $$ SELECT current_setting('spock.country') $$ +LANGUAGE SQL; -CREATE FUNCTION spock.node_create(node_name name, dsn text, - location text DEFAULT NULL, country text DEFAULT NULL, - info jsonb DEFAULT NULL) -RETURNS oid CALLED ON NULL INPUT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_create_node'; -CREATE FUNCTION spock.node_drop(node_name name, ifexists boolean DEFAULT false) -RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_drop_node'; +-- ---------------------------------------------------------------------- +-- Subscription management +-- ---------------------------------------------------------------------- -CREATE FUNCTION spock.node_add_interface(node_name name, interface_name name, dsn text) -RETURNS oid STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_alter_node_add_interface'; -CREATE FUNCTION spock.node_drop_interface(node_name name, interface_name name) -RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_alter_node_drop_interface'; +CREATE FUNCTION spock.spock_gen_slot_name( + dbname name, + provider_node name, + subscription name +) +RETURNS name +AS 'MODULE_PATHNAME' +LANGUAGE C STRICT IMMUTABLE PARALLEL SAFE; CREATE FUNCTION spock.sub_create( subscription_name name, provider_dsn text, - replication_sets text[] = '{default,default_insert_only,ddl_sql}', - synchronize_structure boolean = false, - synchronize_data boolean = false, - forward_origins text[] = '{}', + replication_sets text[] DEFAULT '{default,default_insert_only,ddl_sql}', + synchronize_structure boolean DEFAULT false, + synchronize_data boolean DEFAULT false, + forward_origins text[] DEFAULT '{}', apply_delay interval DEFAULT '0', - force_text_transfer boolean = false, - enabled boolean = true, - skip_schema text[] = '{}' + force_text_transfer boolean DEFAULT false, + enabled boolean DEFAULT true, + skip_schema text[] DEFAULT '{}' ) RETURNS oid AS 'MODULE_PATHNAME', 'spock_create_subscription' LANGUAGE C STRICT VOLATILE; -CREATE FUNCTION spock.sub_drop(subscription_name name, ifexists boolean DEFAULT false) -RETURNS oid STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_drop_subscription'; +CREATE FUNCTION spock.sub_drop( + subscription_name name, + ifexists boolean DEFAULT false +) +RETURNS oid +AS 'MODULE_PATHNAME', 'spock_drop_subscription' +LANGUAGE C STRICT VOLATILE; + +CREATE FUNCTION spock.sub_alter_interface( + subscription_name name, + interface_name name +) +RETURNS boolean +AS 'MODULE_PATHNAME', 'spock_alter_subscription_interface' +LANGUAGE C STRICT VOLATILE; + +CREATE FUNCTION spock.sub_disable( + subscription_name name, + immediate boolean DEFAULT false +) +RETURNS boolean +AS 'MODULE_PATHNAME', 'spock_alter_subscription_disable' +LANGUAGE C STRICT VOLATILE; + +CREATE FUNCTION spock.sub_enable( + subscription_name name, + immediate boolean DEFAULT false +) +RETURNS boolean +AS 'MODULE_PATHNAME', 'spock_alter_subscription_enable' +LANGUAGE C STRICT VOLATILE; -CREATE FUNCTION spock.sub_alter_interface(subscription_name name, interface_name name) -RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_alter_subscription_interface'; +CREATE FUNCTION spock.sub_add_repset( + subscription_name name, + replication_set name +) +RETURNS boolean +AS 'MODULE_PATHNAME', 'spock_alter_subscription_add_replication_set' +LANGUAGE C STRICT VOLATILE; -CREATE FUNCTION spock.sub_disable(subscription_name name, immediate boolean DEFAULT false) -RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_alter_subscription_disable'; -CREATE FUNCTION spock.sub_enable(subscription_name name, immediate boolean DEFAULT false) -RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_alter_subscription_enable'; +CREATE FUNCTION spock.sub_remove_repset( + subscription_name name, + replication_set name +) +RETURNS boolean +AS 'MODULE_PATHNAME', 'spock_alter_subscription_remove_replication_set' +LANGUAGE C STRICT VOLATILE; -CREATE FUNCTION spock.sub_add_repset(subscription_name name, replication_set name) -RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_alter_subscription_add_replication_set'; -CREATE FUNCTION spock.sub_remove_repset(subscription_name name, replication_set name) -RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_alter_subscription_remove_replication_set'; -CREATE FUNCTION spock.sub_alter_skiplsn(subscription_name name, lsn pg_lsn) - RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_alter_subscription_skip_lsn'; +CREATE FUNCTION spock.sub_alter_skiplsn( + subscription_name name, + lsn pg_lsn +) +RETURNS boolean +AS 'MODULE_PATHNAME', 'spock_alter_subscription_skip_lsn' +LANGUAGE C STRICT VOLATILE; CREATE FUNCTION spock.sub_alter_options( subscription_name name, @@ -270,6 +339,23 @@ RETURNS boolean AS 'MODULE_PATHNAME', 'spock_alter_subscription_options' LANGUAGE C STRICT VOLATILE; +CREATE FUNCTION spock.sub_alter_sync( + subscription_name name, + truncate boolean DEFAULT false +) +RETURNS boolean +AS 'MODULE_PATHNAME', 'spock_alter_subscription_synchronize' +LANGUAGE C STRICT VOLATILE; + +CREATE FUNCTION spock.sub_resync_table( + subscription_name name, + relation regclass, + truncate boolean DEFAULT true +) +RETURNS boolean +AS 'MODULE_PATHNAME', 'spock_alter_subscription_resynchronize_table' +LANGUAGE C STRICT VOLATILE; + CREATE FUNCTION spock.sub_show_status( subscription_name name DEFAULT NULL, OUT subscription_name text, @@ -284,87 +370,169 @@ RETURNS SETOF record AS 'MODULE_PATHNAME', 'spock_show_subscription_status' LANGUAGE C STABLE; -CREATE TABLE spock.replication_set ( - set_id oid NOT NULL PRIMARY KEY, - set_nodeid oid NOT NULL REFERENCES node(node_id) ON UPDATE CASCADE, - set_name name NOT NULL, - replicate_insert boolean NOT NULL DEFAULT true, - replicate_update boolean NOT NULL DEFAULT true, - replicate_delete boolean NOT NULL DEFAULT true, - replicate_truncate boolean NOT NULL DEFAULT true, - UNIQUE (set_nodeid, set_name) -) WITH (user_catalog_table=true); - -CREATE TABLE spock.replication_set_table ( - set_id oid NOT NULL, - set_reloid regclass NOT NULL, - set_att_list text[], - set_row_filter pg_node_tree, - PRIMARY KEY(set_id, set_reloid) -) WITH (user_catalog_table=true); +CREATE FUNCTION spock.sub_show_table( + subscription_name name, + relation regclass, + OUT nspname text, + OUT relname text, + OUT status text +) +RETURNS record +AS 'MODULE_PATHNAME', 'spock_show_subscription_table' +LANGUAGE C STRICT STABLE; -CREATE TABLE spock.replication_set_seq ( - set_id oid NOT NULL, - set_seqoid regclass NOT NULL, - PRIMARY KEY(set_id, set_seqoid) -) WITH (user_catalog_table=true); +CREATE FUNCTION spock.sub_wait_for_sync( + subscription_name name +) +RETURNS void +AS 'MODULE_PATHNAME', 'spock_wait_for_subscription_sync_complete' +LANGUAGE C STRICT VOLATILE; -CREATE TABLE spock.sequence_state ( - seqoid oid NOT NULL PRIMARY KEY, - cache_size integer NOT NULL, - last_value bigint NOT NULL -) WITH (user_catalog_table=true); +CREATE FUNCTION spock.table_wait_for_sync( + subscription_name name, + relation regclass +) +RETURNS void +AS 'MODULE_PATHNAME', 'spock_wait_for_table_sync_complete' +LANGUAGE C STRICT VOLATILE; -CREATE TABLE spock.depend ( - classid oid NOT NULL, - objid oid NOT NULL, - objsubid integer NOT NULL, +CREATE FUNCTION spock.sync_seq( + relation regclass +) +RETURNS boolean +AS 'MODULE_PATHNAME', 'spock_synchronize_sequence' +LANGUAGE C STRICT VOLATILE; - refclassid oid NOT NULL, - refobjid oid NOT NULL, - refobjsubid integer NOT NULL, +-- ---------------------------------------------------------------------- +-- Replication sets +-- ---------------------------------------------------------------------- - deptype "char" NOT NULL -) WITH (user_catalog_table=true); +CREATE FUNCTION spock.repset_create( + set_name name, + replicate_insert boolean DEFAULT true, + replicate_update boolean DEFAULT true, + replicate_delete boolean DEFAULT true, + replicate_truncate boolean DEFAULT true +) +RETURNS oid +AS 'MODULE_PATHNAME', 'spock_create_replication_set' +LANGUAGE C STRICT VOLATILE; -CREATE TABLE spock.pii ( - id int generated always as identity, - pii_schema text NOT NULL, - pii_table text NOT NULL, - pii_column text NOT NULL, - PRIMARY KEY(id) -) WITH (user_catalog_table=true); +CREATE FUNCTION spock.repset_alter( + set_name name, + replicate_insert boolean DEFAULT NULL, + replicate_update boolean DEFAULT NULL, + replicate_delete boolean DEFAULT NULL, + replicate_truncate boolean DEFAULT NULL +) +RETURNS oid CALLED ON NULL INPUT +AS 'MODULE_PATHNAME', 'spock_alter_replication_set' +LANGUAGE C VOLATILE; -CREATE TABLE spock.resolutions ( - id int generated always as identity, - node_name name NOT NULL, - log_time timestamptz NOT NULL, - relname text, - idxname text, - conflict_type text, - conflict_resolution text, +CREATE FUNCTION spock.repset_drop( + set_name name, + ifexists boolean DEFAULT false +) +RETURNS boolean +AS 'MODULE_PATHNAME', 'spock_drop_replication_set' +LANGUAGE C STRICT VOLATILE; - -- columns for local changes - local_origin int, - local_tuple text, - local_xid xid, - local_timestamp timestamptz, +CREATE FUNCTION spock.repset_add_table( + set_name name, + relation regclass, + synchronize_data boolean DEFAULT false, + columns text[] DEFAULT NULL, + row_filter text DEFAULT NULL, + include_partitions boolean DEFAULT true +) +RETURNS boolean CALLED ON NULL INPUT +AS 'MODULE_PATHNAME', 'spock_replication_set_add_table' +LANGUAGE C VOLATILE; - -- columns for remote changes - remote_origin int, - remote_tuple text, - remote_xid xid, - remote_timestamp timestamptz, - remote_lsn pg_lsn, +CREATE FUNCTION spock.repset_add_all_tables( + set_name name, + schema_names text[], + synchronize_data boolean DEFAULT false +) +RETURNS boolean +AS 'MODULE_PATHNAME', 'spock_replication_set_add_all_tables' +LANGUAGE C STRICT VOLATILE; - PRIMARY KEY(id, node_name) -) WITH (user_catalog_table=true); -CREATE INDEX ON spock.resolutions (log_time); +CREATE FUNCTION spock.repset_remove_table( + set_name name, + relation regclass, + include_partitions boolean DEFAULT true +) +RETURNS boolean +AS 'MODULE_PATHNAME', 'spock_replication_set_remove_table' +LANGUAGE C STRICT VOLATILE; -CREATE FUNCTION spock.cleanup_resolutions(days integer DEFAULT NULL) -RETURNS bigint VOLATILE -LANGUAGE c AS 'MODULE_PATHNAME', 'spock_cleanup_resolutions_sql'; -REVOKE ALL ON FUNCTION spock.cleanup_resolutions(integer) FROM PUBLIC; +CREATE FUNCTION spock.repset_add_seq( + set_name name, + relation regclass, + synchronize_data boolean DEFAULT false +) +RETURNS boolean +AS 'MODULE_PATHNAME', 'spock_replication_set_add_sequence' +LANGUAGE C STRICT VOLATILE; + +CREATE FUNCTION spock.repset_add_all_seqs( + set_name name, + schema_names text[], + synchronize_data boolean DEFAULT false +) +RETURNS boolean +AS 'MODULE_PATHNAME', 'spock_replication_set_add_all_sequences' +LANGUAGE C STRICT VOLATILE; + +CREATE FUNCTION spock.repset_remove_seq( + set_name name, + relation regclass +) +RETURNS boolean +AS 'MODULE_PATHNAME', 'spock_replication_set_remove_sequence' +LANGUAGE C STRICT VOLATILE; + +CREATE FUNCTION spock.repset_add_partition( + parent regclass, + partition regclass DEFAULT NULL, + row_filter text DEFAULT NULL +) +RETURNS int CALLED ON NULL INPUT +AS 'MODULE_PATHNAME', 'spock_replication_set_add_partition' +LANGUAGE C VOLATILE; + +CREATE FUNCTION spock.repset_remove_partition( + parent regclass, + partition regclass DEFAULT NULL +) +RETURNS int CALLED ON NULL INPUT +AS 'MODULE_PATHNAME', 'spock_replication_set_remove_partition' +LANGUAGE C VOLATILE; + +CREATE FUNCTION spock.repset_show_table( + relation regclass, + repsets text[], + OUT relid oid, + OUT nspname text, + OUT relname text, + OUT att_list text[], + OUT has_row_filter boolean, + OUT relkind "char", + OUT relispartition boolean +) +RETURNS record +AS 'MODULE_PATHNAME', 'spock_show_repset_table_info' +LANGUAGE C STRICT STABLE; + +CREATE FUNCTION spock.table_data_filtered( + reltyp anyelement, + relation regclass, + repsets text[] +) +RETURNS SETOF anyelement CALLED ON NULL INPUT +AS 'MODULE_PATHNAME', 'spock_table_data_filtered' +LANGUAGE C STABLE; CREATE VIEW spock.TABLES AS WITH set_relations AS ( @@ -398,180 +566,292 @@ CREATE VIEW spock.TABLES AS FROM user_tables t WHERE t.oid NOT IN (SELECT set_reloid FROM set_relations); -CREATE FUNCTION spock.repset_create( - set_name name, - replicate_insert boolean = true, - replicate_update boolean = true, - replicate_delete boolean = true, - replicate_truncate boolean = true -) -RETURNS oid -AS 'MODULE_PATHNAME', 'spock_create_replication_set' -LANGUAGE C STRICT VOLATILE; - -CREATE FUNCTION spock.repset_alter(set_name name, - replicate_insert boolean DEFAULT NULL, replicate_update boolean DEFAULT NULL, - replicate_delete boolean DEFAULT NULL, replicate_truncate boolean DEFAULT NULL) -RETURNS oid CALLED ON NULL INPUT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_alter_replication_set'; +-- ---------------------------------------------------------------------- +-- DDL replication +-- ---------------------------------------------------------------------- -CREATE FUNCTION spock.repset_drop( - set_name name, - ifexists boolean DEFAULT false +CREATE FUNCTION spock.replicate_ddl( + command text, + replication_sets text[] DEFAULT '{ddl_sql}', + search_path text DEFAULT current_setting('search_path'), + role text DEFAULT CURRENT_USER ) RETURNS boolean -AS 'MODULE_PATHNAME', 'spock_drop_replication_set' +AS 'MODULE_PATHNAME', 'spock_replicate_ddl_command' LANGUAGE C STRICT VOLATILE; -CREATE FUNCTION spock.repset_add_table( - set_name name, - relation regclass, - synchronize_data boolean DEFAULT false, - columns text[] DEFAULT NULL, - row_filter text DEFAULT NULL, - include_partitions boolean default true +CREATE FUNCTION spock.replicate_ddl( + command text[], + replication_sets text[] DEFAULT '{ddl_sql}', + search_path text DEFAULT current_setting('search_path'), + role text DEFAULT CURRENT_USER ) -RETURNS boolean -AS 'MODULE_PATHNAME', 'spock_replication_set_add_table' -LANGUAGE C CALLED ON NULL INPUT VOLATILE; +RETURNS SETOF boolean +AS 'SELECT spock.replicate_ddl(cmd, $2, $3, $4) FROM (SELECT unnest(command) cmd)' +LANGUAGE SQL STRICT VOLATILE; + +-- ---------------------------------------------------------------------- +-- Apply progress and lag +-- ---------------------------------------------------------------------- -CREATE FUNCTION spock.repset_add_all_tables(set_name name, schema_names text[], synchronize_data boolean DEFAULT false) -RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_replication_set_add_all_tables'; -CREATE FUNCTION spock.repset_remove_table(set_name name, relation regclass, include_partitions boolean default true) -RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_replication_set_remove_table'; +CREATE FUNCTION spock.apply_group_progress( + OUT dbid oid, + OUT node_id oid, + OUT remote_node_id oid, + OUT remote_commit_ts timestamptz, + OUT prev_remote_ts timestamptz, + OUT remote_commit_lsn pg_lsn, + OUT remote_insert_lsn pg_lsn, + OUT received_lsn pg_lsn, + OUT last_updated_ts timestamptz, + OUT updated_by_decode bool +) +RETURNS SETOF record +AS 'MODULE_PATHNAME', 'get_apply_group_progress' +LANGUAGE C; -CREATE FUNCTION spock.repset_add_seq(set_name name, relation regclass, synchronize_data boolean DEFAULT false) -RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_replication_set_add_sequence'; -CREATE FUNCTION spock.repset_add_all_seqs(set_name name, schema_names text[], synchronize_data boolean DEFAULT false) -RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_replication_set_add_all_sequences'; -CREATE FUNCTION spock.repset_remove_seq(set_name name, relation regclass) -RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_replication_set_remove_sequence'; +-- Show the Spock apply progress for the current database +-- Columns prev_remote_ts, last_updated_ts, and updated_by_decode is dedicated +-- for internal use only. +CREATE VIEW spock.progress AS + SELECT * FROM spock.apply_group_progress() + WHERE dbid = ( + SELECT oid FROM pg_database WHERE datname = current_database() + ); -CREATE FUNCTION spock.repset_add_partition(parent regclass, partition regclass default NULL, - row_filter text default NULL) -RETURNS int CALLED ON NULL INPUT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_replication_set_add_partition'; +-- Read peer progress (ros.remote_lsn) for all peer subscriptions. +-- Called while apply workers are paused and the slot's snapshot is imported. +-- Row 0: header (lsn + snapshot placeholder). Rows 1+: one progress entry per peer. +CREATE FUNCTION spock.read_peer_progress( + p_slot_name text, + p_provider_node_id oid, + p_subscriber_node_id oid +) +RETURNS TABLE( + lsn pg_lsn, + snapshot text, + dbid oid, + node_id oid, + remote_node_id oid, + remote_commit_ts timestamptz, + prev_remote_ts timestamptz, + remote_commit_lsn pg_lsn, + remote_insert_lsn pg_lsn, + received_lsn pg_lsn, + last_updated_ts timestamptz, + updated_by_decode boolean +) +AS $$ +DECLARE + v_lsn pg_lsn; + v_snap text; + rec record; + v_n_peers int := 0; +BEGIN + /* + * The slot and snapshot are created by the C caller via the replication + * protocol. The slot's snapshot is imported into this transaction. + * This function just reads peer progress (ros.remote_lsn) while apply + * workers are paused. + */ -CREATE FUNCTION spock.repset_remove_partition(parent regclass, partition regclass default NULL) -RETURNS int CALLED ON NULL INPUT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_replication_set_remove_partition'; + -- Get the slot's LSN and the imported snapshot for the header row. + SELECT restart_lsn INTO v_lsn + FROM pg_replication_slots WHERE slot_name = p_slot_name; + v_snap := ''; -- snapshot managed by C caller -CREATE FUNCTION spock.sub_alter_sync(subscription_name name, truncate boolean DEFAULT false) -RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_alter_subscription_synchronize'; + RAISE NOTICE 'SPOCK cswp slot=% v_lsn=%', p_slot_name, v_lsn; -CREATE FUNCTION spock.sub_resync_table( - subscription_name name, - relation regclass, - truncate boolean DEFAULT true -) -RETURNS boolean -AS 'MODULE_PATHNAME', 'spock_alter_subscription_resynchronize_table' -LANGUAGE C STRICT VOLATILE; + -- Header row: lsn only (snapshot managed by C caller). + lsn := v_lsn; + snapshot := v_snap; + RETURN NEXT; -CREATE FUNCTION spock.sync_seq(relation regclass) -RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_synchronize_sequence'; + /* + * Emit one progress row per peer. With apply workers paused, + * ros.remote_lsn is exact: it reflects only committed transactions + * whose effects are visible in the slot snapshot. + */ + FOR rec IN ( + SELECT p.dbid, p.node_id, p.remote_node_id, + p.remote_commit_ts, p.prev_remote_ts, + p.remote_commit_lsn AS grp_remote_commit_lsn, + p.remote_insert_lsn, + p.received_lsn, p.last_updated_ts, p.updated_by_decode, + ros.remote_lsn AS ros_remote_lsn, + sub.sub_slot_name AS sub_slot_name + FROM spock.subscription sub + JOIN spock.progress p + ON p.remote_node_id = sub.sub_origin + AND p.node_id = sub.sub_target + JOIN pg_replication_origin o + ON o.roname = sub.sub_slot_name + LEFT JOIN pg_replication_origin_status ros + ON ros.local_id = o.roident + WHERE sub.sub_target = p_provider_node_id + AND sub.sub_origin <> p_subscriber_node_id + ) LOOP + v_n_peers := v_n_peers + 1; -CREATE FUNCTION spock.table_data_filtered(reltyp anyelement, relation regclass, repsets text[]) -RETURNS SETOF anyelement CALLED ON NULL INPUT STABLE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_table_data_filtered'; + lsn := v_lsn; + snapshot := v_snap; + dbid := rec.dbid; + node_id := rec.node_id; + remote_node_id := rec.remote_node_id; + remote_commit_ts := rec.remote_commit_ts; + prev_remote_ts := rec.prev_remote_ts; + remote_commit_lsn := COALESCE(rec.ros_remote_lsn, '0/0'::pg_lsn); + remote_insert_lsn := rec.remote_insert_lsn; + received_lsn := rec.received_lsn; + last_updated_ts := rec.last_updated_ts; + updated_by_decode := rec.updated_by_decode; -CREATE FUNCTION spock.repset_show_table(relation regclass, repsets text[], OUT relid oid, OUT nspname text, - OUT relname text, OUT att_list text[], OUT has_row_filter boolean, OUT relkind "char", OUT relispartition boolean) -RETURNS record STRICT STABLE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_show_repset_table_info'; + RAISE NOTICE 'SPOCK cswp peer=% resume_lsn=%', + rec.remote_node_id, remote_commit_lsn; -CREATE FUNCTION spock.sub_show_table(subscription_name name, relation regclass, OUT nspname text, OUT relname text, OUT status text) -RETURNS record STRICT STABLE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_show_subscription_table'; + RETURN NEXT; + END LOOP; -CREATE TABLE spock.queue ( - queued_at timestamp with time zone NOT NULL, - role name NOT NULL, - replication_sets text[], - message_type "char" NOT NULL, - message json NOT NULL -); + RAISE NOTICE 'SPOCK cswp slot=% done peers=%', p_slot_name, v_n_peers; +END; +$$ LANGUAGE plpgsql STRICT VOLATILE; -CREATE FUNCTION spock.replicate_ddl(command text, - replication_sets text[] DEFAULT '{ddl_sql}', - search_path text DEFAULT current_setting('search_path'), - role text DEFAULT CURRENT_USER) -RETURNS boolean STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_replicate_ddl_command'; - -CREATE FUNCTION spock.replicate_ddl(command text[], - replication_sets text[] DEFAULT '{ddl_sql}', - search_path text DEFAULT current_setting('search_path'), - role text DEFAULT CURRENT_USER) -RETURNS SETOF boolean STRICT VOLATILE LANGUAGE sql AS - 'SELECT spock.replicate_ddl(cmd, $2, $3, $4) FROM (SELECT unnest(command) cmd)'; - -CREATE FUNCTION spock.node_info(OUT node_id oid, OUT node_name text, - OUT sysid text, OUT dbname text, OUT replication_sets text, - OUT location text, OUT country text, OUT info jsonb) -RETURNS record -STABLE STRICT LANGUAGE c AS 'MODULE_PATHNAME', 'spock_node_info'; +CREATE VIEW spock.lag_tracker AS + SELECT + origin.node_name AS origin_name, + n.node_name AS receiver_name, + MAX(p.remote_commit_ts) AS commit_timestamp, + MAX(p.remote_commit_lsn) AS commit_lsn, + MAX(p.remote_insert_lsn) AS remote_insert_lsn, + MAX(p.received_lsn) AS received_lsn, + CASE + WHEN MAX(p.remote_insert_lsn) IS NOT NULL AND MAX(p.remote_commit_lsn) IS NOT NULL + THEN MAX(pg_wal_lsn_diff(p.remote_insert_lsn, p.remote_commit_lsn)) + ELSE NULL + END AS replication_lag_bytes, + CASE + WHEN MAX(p.remote_commit_ts) IS NOT NULL AND MAX(p.last_updated_ts) IS NOT NULL + THEN MAX(p.last_updated_ts - p.remote_commit_ts) + ELSE NULL + END AS replication_lag + FROM spock.progress p + LEFT JOIN spock.subscription sub ON (p.node_id = sub.sub_target and p.remote_node_id = sub.sub_origin) + LEFT JOIN spock.node origin ON sub.sub_origin = origin.node_id + LEFT JOIN spock.node n ON n.node_id = p.node_id + GROUP BY origin.node_name, n.node_name; -CREATE FUNCTION spock.spock_gen_slot_name( - dbname name, - provider_node name, - subscription name -) RETURNS name -AS 'MODULE_PATHNAME' -LANGUAGE C IMMUTABLE STRICT PARALLEL SAFE; +CREATE OR REPLACE FUNCTION spock.get_apply_worker_status( + OUT worker_pid bigint, -- Changed from int to bigint + OUT worker_dboid int, + OUT worker_subid bigint, + OUT worker_status text +) +RETURNS SETOF record +AS 'MODULE_PATHNAME', 'get_apply_worker_status' +LANGUAGE C STABLE; + +CREATE FUNCTION spock.wait_for_apply_worker( + p_subbid bigint, + timeout int DEFAULT 0 +) +RETURNS boolean +AS $$ +DECLARE + start_time timestamptz := clock_timestamp(); + elapsed_time int := 0; + current_status text; +BEGIN + -- Loop until the timeout is reached or the worker is no longer running + WHILE true LOOP + -- Call spock.get_apply_worker_status to check the worker's status + SELECT worker_status + INTO current_status + FROM spock.get_apply_worker_status() + WHERE worker_subid = p_subbid; -CREATE FUNCTION spock_version() RETURNS text -LANGUAGE c AS 'MODULE_PATHNAME'; + -- If no row is found, return -1 + IF NOT FOUND THEN + RETURN false; + END IF; -CREATE FUNCTION spock_version_num() RETURNS integer -LANGUAGE c AS 'MODULE_PATHNAME'; + -- If the worker is no longer running, return 0 + IF current_status IS DISTINCT FROM 'running' THEN + RETURN false; + END IF; -CREATE FUNCTION spock_max_proto_version() RETURNS integer -LANGUAGE c AS 'MODULE_PATHNAME'; + -- Check if the timeout has been reached + elapsed_time := EXTRACT(EPOCH FROM clock_timestamp() - start_time) * 1000; + IF timeout > 0 AND elapsed_time >= timeout THEN + RETURN true; + END IF; -CREATE FUNCTION spock_min_proto_version() RETURNS integer -LANGUAGE c AS 'MODULE_PATHNAME'; + -- Sleep for a short interval before checking again + PERFORM pg_sleep(0.2); + END LOOP; +END; +$$ LANGUAGE plpgsql; -CREATE FUNCTION spock.get_country() RETURNS text -LANGUAGE sql AS -$$ SELECT current_setting('spock.country') $$; +CREATE FUNCTION spock.get_channel_stats( + OUT subid oid, + OUT relid oid, + OUT n_tup_ins bigint, + OUT n_tup_upd bigint, + OUT n_tup_del bigint, + OUT n_conflict bigint, + OUT n_dca bigint +) +RETURNS SETOF record +AS 'MODULE_PATHNAME', 'get_channel_stats' +LANGUAGE C; -CREATE FUNCTION spock.wait_slot_confirm_lsn(slotname name, target pg_lsn) +CREATE FUNCTION spock.reset_channel_stats() RETURNS void -AS 'spock','spock_wait_slot_confirm_lsn' +AS 'MODULE_PATHNAME', 'reset_channel_stats' LANGUAGE C; -CREATE FUNCTION spock.sub_wait_for_sync(subscription_name name) -RETURNS void RETURNS NULL ON NULL INPUT -AS 'MODULE_PATHNAME', 'spock_wait_for_subscription_sync_complete' -LANGUAGE C VOLATILE; - -CREATE FUNCTION spock.table_wait_for_sync( - subscription_name name, - relation regclass -) RETURNS void RETURNS NULL ON NULL INPUT -AS 'MODULE_PATHNAME', 'spock_wait_for_table_sync_complete' -LANGUAGE C VOLATILE; - -CREATE FUNCTION spock.sync_event(transactional boolean DEFAULT false) -RETURNS pg_lsn RETURNS NULL ON NULL INPUT -AS 'MODULE_PATHNAME', 'spock_create_sync_event' -LANGUAGE C VOLATILE; - -CREATE FUNCTION spock.pause_apply_workers() -RETURNS void -AS 'MODULE_PATHNAME', 'spock_pause_apply_workers' -LANGUAGE C VOLATILE; +CREATE VIEW spock.channel_table_stats AS + SELECT H.subid, H.relid, + CASE H.subid + WHEN 0 THEN '' + ELSE S.sub_name + END AS sub_name, + pg_catalog.quote_ident(N.nspname) || '.' || pg_catalog.quote_ident(C.relname) AS table_name, + H.n_tup_ins, H.n_tup_upd, H.n_tup_del, + H.n_conflict, H.n_dca + FROM spock.get_channel_stats() AS H + LEFT JOIN spock.subscription AS S ON S.sub_id = H.subid + LEFT JOIN pg_catalog.pg_class AS C ON C.oid = H.relid + LEFT JOIN pg_catalog.pg_namespace AS N ON N.oid = C.relnamespace; -REVOKE ALL ON FUNCTION spock.pause_apply_workers() FROM PUBLIC; +CREATE VIEW spock.channel_summary_stats AS + SELECT subid, sub_name, + sum(n_tup_ins) AS n_tup_ins, + sum(n_tup_upd) AS n_tup_upd, + sum(n_tup_del) AS n_tup_del, + sum(n_conflict) AS n_conflict, + sum(n_dca) AS n_dca + FROM spock.channel_table_stats + GROUP BY subid, sub_name; -CREATE FUNCTION spock.resume_apply_workers() -RETURNS void -AS 'MODULE_PATHNAME', 'spock_resume_apply_workers' -LANGUAGE C VOLATILE; +-- ---------------------------------------------------------------------- +-- Sync events +-- ---------------------------------------------------------------------- -REVOKE ALL ON FUNCTION spock.resume_apply_workers() FROM PUBLIC; +CREATE FUNCTION spock.sync_event( + transactional boolean DEFAULT false +) +RETURNS pg_lsn +AS 'MODULE_PATHNAME', 'spock_create_sync_event' +LANGUAGE C STRICT VOLATILE; CREATE PROCEDURE spock.wait_for_sync_event( - OUT result bool, - origin_id oid, - lsn pg_lsn, - timeout int DEFAULT 0, - wait_if_disabled bool DEFAULT false -) AS $$ + OUT result bool, + origin_id oid, + lsn pg_lsn, + timeout int DEFAULT 0, + wait_if_disabled bool DEFAULT false +) +AS $$ DECLARE target_id oid; start_time timestamptz := clock_timestamp(); @@ -645,12 +925,13 @@ END; $$ LANGUAGE plpgsql; CREATE PROCEDURE spock.wait_for_sync_event( - OUT result bool, - origin name, - lsn pg_lsn, - timeout int DEFAULT 0, - wait_if_disabled bool DEFAULT false -) AS $$ + OUT result bool, + origin name, + lsn pg_lsn, + timeout int DEFAULT 0, + wait_if_disabled bool DEFAULT false +) +AS $$ DECLARE origin_id oid; BEGIN @@ -662,93 +943,90 @@ BEGIN END; $$ LANGUAGE plpgsql; -CREATE FUNCTION spock.xact_commit_timestamp_origin("xid" xid, OUT "timestamp" timestamptz, OUT "roident" oid) -RETURNS record RETURNS NULL ON NULL INPUT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_xact_commit_timestamp_origin'; +CREATE FUNCTION spock.pause_apply_workers() +RETURNS void +AS 'MODULE_PATHNAME', 'spock_pause_apply_workers' +LANGUAGE C VOLATILE; +REVOKE ALL ON FUNCTION spock.pause_apply_workers() FROM PUBLIC; -CREATE FUNCTION spock.get_channel_stats( - OUT subid oid, - OUT relid oid, - OUT n_tup_ins bigint, - OUT n_tup_upd bigint, - OUT n_tup_del bigint, - OUT n_conflict bigint, - OUT n_dca bigint) -RETURNS SETOF record -LANGUAGE c AS 'MODULE_PATHNAME', 'get_channel_stats'; +CREATE FUNCTION spock.resume_apply_workers() +RETURNS void +AS 'MODULE_PATHNAME', 'spock_resume_apply_workers' +LANGUAGE C VOLATILE; +REVOKE ALL ON FUNCTION spock.resume_apply_workers() FROM PUBLIC; -CREATE FUNCTION spock.reset_channel_stats() RETURNS void -LANGUAGE c AS 'MODULE_PATHNAME', 'reset_channel_stats'; +CREATE FUNCTION spock.wait_slot_confirm_lsn( + slotname name, + target pg_lsn +) +RETURNS void +AS 'MODULE_PATHNAME', 'spock_wait_slot_confirm_lsn' +LANGUAGE C; -CREATE VIEW spock.channel_table_stats AS - SELECT H.subid, H.relid, - CASE H.subid - WHEN 0 THEN '' - ELSE S.sub_name - END AS sub_name, - pg_catalog.quote_ident(N.nspname) || '.' || pg_catalog.quote_ident(C.relname) AS table_name, - H.n_tup_ins, H.n_tup_upd, H.n_tup_del, - H.n_conflict, H.n_dca - FROM spock.get_channel_stats() AS H - LEFT JOIN spock.subscription AS S ON S.sub_id = H.subid - LEFT JOIN pg_catalog.pg_class AS C ON C.oid = H.relid - LEFT JOIN pg_catalog.pg_namespace AS N ON N.oid = C.relnamespace; +-- ---------------------------------------------------------------------- +-- Conflict tracking and exceptions +-- ---------------------------------------------------------------------- -CREATE VIEW spock.channel_summary_stats AS - SELECT subid, sub_name, - sum(n_tup_ins) AS n_tup_ins, - sum(n_tup_upd) AS n_tup_upd, - sum(n_tup_del) AS n_tup_del, - sum(n_conflict) AS n_conflict, - sum(n_dca) AS n_dca - FROM spock.channel_table_stats - GROUP BY subid, sub_name; +CREATE FUNCTION spock.cleanup_resolutions( + days integer DEFAULT NULL +) +RETURNS bigint +AS 'MODULE_PATHNAME', 'spock_cleanup_resolutions_sql' +LANGUAGE C VOLATILE; +REVOKE ALL ON FUNCTION spock.cleanup_resolutions(integer) FROM PUBLIC; -CREATE VIEW spock.lag_tracker AS - SELECT - origin.node_name AS origin_name, - n.node_name AS receiver_name, - MAX(p.remote_commit_ts) AS commit_timestamp, - MAX(p.remote_commit_lsn) AS commit_lsn, - MAX(p.remote_insert_lsn) AS remote_insert_lsn, - MAX(p.received_lsn) AS received_lsn, - CASE - WHEN MAX(p.remote_insert_lsn) IS NOT NULL AND MAX(p.remote_commit_lsn) IS NOT NULL - THEN MAX(pg_wal_lsn_diff(p.remote_insert_lsn, p.remote_commit_lsn)) - ELSE NULL - END AS replication_lag_bytes, - CASE - WHEN MAX(p.remote_commit_ts) IS NOT NULL AND MAX(p.last_updated_ts) IS NOT NULL - THEN MAX(p.last_updated_ts - p.remote_commit_ts) - ELSE NULL - END AS replication_lag - FROM spock.progress p - LEFT JOIN spock.subscription sub ON (p.node_id = sub.sub_target and p.remote_node_id = sub.sub_origin) - LEFT JOIN spock.node origin ON sub.sub_origin = origin.node_id - LEFT JOIN spock.node n ON n.node_id = p.node_id - GROUP BY origin.node_name, n.node_name; +CREATE FUNCTION spock.get_subscription_stats( + subid oid, + OUT subid oid, + OUT confl_insert_exists bigint, + OUT confl_update_origin_differs bigint, + OUT confl_update_exists bigint, + OUT confl_update_missing bigint, + OUT confl_delete_origin_differs bigint, + OUT confl_delete_missing bigint, + OUT confl_delete_exists bigint, + OUT stats_reset timestamptz +) +RETURNS record +AS 'MODULE_PATHNAME', 'spock_get_subscription_stats' +LANGUAGE C STABLE; -CREATE FUNCTION spock.md5_agg_sfunc(text, anyelement) - RETURNS text -AS $$ SELECT md5($1 || $2::text) $$ -LANGUAGE sql IMMUTABLE PARALLEL SAFE; -CREATE AGGREGATE spock.md5_agg (ORDER BY anyelement) -( - STYPE = text, - SFUNC = spock.md5_agg_sfunc, - INITCOND = '', - PARALLEL = SAFE -); +CREATE FUNCTION spock.reset_subscription_stats( + subid oid DEFAULT NULL +) +RETURNS void CALLED ON NULL INPUT +AS 'MODULE_PATHNAME', 'spock_reset_subscription_stats' +LANGUAGE C VOLATILE; + +CREATE FUNCTION spock.xact_commit_timestamp_origin( + "xid" xid, + OUT "timestamp" timestamptz, + OUT "roident" oid +) +RETURNS record +AS 'MODULE_PATHNAME', 'spock_xact_commit_timestamp_origin' +LANGUAGE C STRICT VOLATILE; + +CREATE FUNCTION spock.get_lsn_from_commit_ts( + slot_name name, + commit_ts timestamptz +) +RETURNS pg_lsn +AS 'MODULE_PATHNAME', 'spock_get_lsn_from_commit_ts' +LANGUAGE C STRICT VOLATILE; + +CREATE FUNCTION spock.repair_mode( + enabled bool +) +RETURNS pg_catalog.pg_lsn +AS 'MODULE_PATHNAME', 'spock_repair_mode' +LANGUAGE C; -- ---------------------------------------------------------------------- --- Spock Read Only +-- Conflict resolution: delta_apply -- ---------------------------------------------------------------------- -CREATE FUNCTION spock.terminate_active_transactions() RETURNS bool - AS 'MODULE_PATHNAME', 'spockro_terminate_active_transactions' - LANGUAGE C STRICT; --- ---- -- Generic delta apply functions for all numeric data types --- ---- CREATE FUNCTION spock.delta_apply(int2, int2, int2) RETURNS int2 AS 'MODULE_PATHNAME', 'delta_apply_int2' @@ -778,95 +1056,14 @@ RETURNS money AS 'MODULE_PATHNAME', 'delta_apply_money' LANGUAGE C; --- ---- --- Function to control REPAIR mode --- ---- -CREATE FUNCTION spock.repair_mode(enabled bool) -RETURNS pg_catalog.pg_lsn LANGUAGE c -AS 'MODULE_PATHNAME', 'spock_repair_mode'; - --- ---- --- Function to determine LSN from commit timestamp --- ---- -CREATE FUNCTION spock.get_lsn_from_commit_ts(slot_name name, commit_ts timestamptz) -RETURNS pg_lsn STRICT VOLATILE LANGUAGE c AS 'MODULE_PATHNAME', 'spock_get_lsn_from_commit_ts'; - -CREATE OR REPLACE FUNCTION spock.get_apply_worker_status( - OUT worker_pid bigint, -- Changed from int to bigint - OUT worker_dboid int, - OUT worker_subid bigint, - OUT worker_status text -) -RETURNS SETOF record STABLE LANGUAGE c AS 'MODULE_PATHNAME', 'get_apply_worker_status'; - -CREATE FUNCTION spock.wait_for_apply_worker(p_subbid bigint, timeout int DEFAULT 0) -RETURNS boolean -AS $$ -DECLARE - start_time timestamptz := clock_timestamp(); - elapsed_time int := 0; - current_status text; -BEGIN - -- Loop until the timeout is reached or the worker is no longer running - WHILE true LOOP - -- Call spock.get_apply_worker_status to check the worker's status - SELECT worker_status - INTO current_status - FROM spock.get_apply_worker_status() - WHERE worker_subid = p_subbid; - - -- If no row is found, return -1 - IF NOT FOUND THEN - RETURN false; - END IF; - - -- If the worker is no longer running, return 0 - IF current_status IS DISTINCT FROM 'running' THEN - RETURN false; - END IF; - - -- Check if the timeout has been reached - elapsed_time := EXTRACT(EPOCH FROM clock_timestamp() - start_time) * 1000; - IF timeout > 0 AND elapsed_time >= timeout THEN - RETURN true; - END IF; - - -- Sleep for a short interval before checking again - PERFORM pg_sleep(0.2); - END LOOP; -END; -$$ LANGUAGE plpgsql; - --- ---- --- Subscription conflict statistics --- ---- -CREATE FUNCTION spock.get_subscription_stats( - subid oid, - OUT subid oid, - OUT confl_insert_exists bigint, - OUT confl_update_origin_differs bigint, - OUT confl_update_exists bigint, - OUT confl_update_missing bigint, - OUT confl_delete_origin_differs bigint, - OUT confl_delete_missing bigint, - OUT confl_delete_exists bigint, - OUT stats_reset timestamptz -) -RETURNS record -AS 'MODULE_PATHNAME', 'spock_get_subscription_stats' -LANGUAGE C STABLE; - -CREATE FUNCTION spock.reset_subscription_stats(subid oid DEFAULT NULL) -RETURNS void -AS 'MODULE_PATHNAME', 'spock_reset_subscription_stats' -LANGUAGE C CALLED ON NULL INPUT VOLATILE; - -- Set delta_apply security label on specific column CREATE FUNCTION spock.delta_apply( rel regclass, att_name name, to_drop boolean DEFAULT false -) RETURNS boolean AS $$ +) +RETURNS boolean +AS $$ DECLARE label text; atttype name; @@ -972,3 +1169,49 @@ BEGIN RETURN true; END; $$ LANGUAGE plpgsql STRICT VOLATILE; + +CREATE FUNCTION spock.md5_agg_sfunc(text, anyelement) +RETURNS text +AS $$ SELECT md5($1 || $2::text) $$ +LANGUAGE SQL IMMUTABLE PARALLEL SAFE; + +CREATE AGGREGATE spock.md5_agg (ORDER BY anyelement) +( + STYPE = text, + SFUNC = spock.md5_agg_sfunc, + INITCOND = '', + PARALLEL = SAFE +); + +-- ---------------------------------------------------------------------- +-- Spock Read Only +-- ---------------------------------------------------------------------- + +CREATE FUNCTION spock.terminate_active_transactions() +RETURNS bool +AS 'MODULE_PATHNAME', 'spockro_terminate_active_transactions' +LANGUAGE C STRICT; + +-- ---------------------------------------------------------------------- +-- Version helpers +-- ---------------------------------------------------------------------- + +CREATE FUNCTION spock_version() +RETURNS text +AS 'MODULE_PATHNAME' +LANGUAGE C; + +CREATE FUNCTION spock_version_num() +RETURNS integer +AS 'MODULE_PATHNAME' +LANGUAGE C; + +CREATE FUNCTION spock_max_proto_version() +RETURNS integer +AS 'MODULE_PATHNAME' +LANGUAGE C; + +CREATE FUNCTION spock_min_proto_version() +RETURNS integer +AS 'MODULE_PATHNAME' +LANGUAGE C; From e44418a3f4acb399e806937133885f4010038c6a Mon Sep 17 00:00:00 2001 From: "Andrei V. Lepikhov" Date: Tue, 12 May 2026 09:06:24 +0200 Subject: [PATCH 5/5] Fix: extend usage of the '--exclude-extension' option As mentioned in the review it is available since PostgreSQL 17. --- src/spock_sync.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/spock_sync.c b/src/spock_sync.c index 2b2450aa..990bb5a1 100644 --- a/src/spock_sync.c +++ b/src/spock_sync.c @@ -153,7 +153,7 @@ get_pg_executable(char *cmdname, char *cmdbuf) * The sub->skip_schema typically doesn't include our globally-skipped objects. * So, don't care about duplicates. */ -#if PG_VERSION_NUM >= 180000 +#if PG_VERSION_NUM >= 170000 static List * build_exclude_extension_string(void) { @@ -236,7 +236,7 @@ dump_structure(SpockSubscription *sub, const char *destfile, /* Filter out schemas and extensions, skipped globally. */ args = list_concat(args, build_exclude_schema_string(sub)); -#if PG_VERSION_NUM >= 180000 +#if PG_VERSION_NUM >= 170000 args = list_concat(args, build_exclude_extension_string()); #endif