From e0a92b7bd8dd7434297e1223aab67fcb92a5f9b6 Mon Sep 17 00:00:00 2001 From: liang8283 Date: Mon, 11 May 2026 16:50:08 +0800 Subject: [PATCH 1/2] feat(backup): file-hash based incremental for heap and AO tables Adds two independent flags to gpbackup that let incremental backups detect table changes by file content rather than by AO modcount alone: --heap-file-hash Hashes each heap table's data-file mtime+size on every segment via pg_stat_file() (CHECKPOINT first), so unchanged heap tables can be skipped in incremental. Default behavior (always include heap) is preserved when the flag is absent. --ao-file-hash Hashes (segno, eof, tupcount) of each AO table's aoseg rows -- deliberately excluding modcount, which on GP5 propagates across sibling partitions when only one leaf is modified. AOCS tables use a Cloudberry-aware column set: segno + tupcount + vpinfo when !IsGPDB(), or GP7+ schema otherwise. With the flag, partition-level change detection works correctly even on GP5-style modcount leaks. Both flags require --leaf-partition-data or --incremental. Implementation: toc/toc.go Extends IncrementalEntries with optional Heap map[string]HeapEntry, and AOEntry with optional FileHashMD5. Both yaml-omitempty so TOCs written without the flags remain bytewise identical to the old format and are readable by older binaries. backup/queries_incremental.go Adds ensureFileStatFunction (installs a plpgsql wrapper around pg_stat_file in gp_toolkit; pure SQL, no plpython), getTableFileHash, getHeapTableFQNs, GetAOContentHashes, getAOSegContentHash. Uses a dedicated dbconn so per-table query failures do not abort the backup transaction. backup/wrappers.go backupIncrementalMetadata: when --ao-file-hash is set, merges aoseg content hashes into the existing AOEntry map; when --heap-file-hash is set, CHECKPOINTs, then collects per-segment file hashes for heap tables into a new HeapEntry map. backup/incremental.go FilterTablesForIncremental now branches independently on the two flags. When neither is set, behavior is identical to the prior version (AO compared by modcount+DDL, heap unconditionally included). backup/validate.go Rejects --*-file-hash without --leaf-partition-data or --incremental. backup/incremental_test.go Moves the FilterTablesForIncremental call into JustBeforeEach so the new MustGetFlagBool reads happen after BeforeSuite initializes cmdFlags. (Previously the call ran during spec-tree construction.) Verification: - make build / make lint / make unit: pass on the server (no new lint findings; 13 ginkgo unit suites green). - Targeted live-cluster e2e on Cloudberry 2.5.0: 4-binary install, full + incremental cycle with both flags, mixed schema (heap/ao_row/ aocs/partitioned AO). Confirmed unchanged tables skipped, partition- level granularity (one leaf included, sibling skipped), restore round-trip row counts match across all tables. - make end_to_end (1232s, 196 of 221 specs ran): no regressions attributable to this change. 
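
Example usage (illustrative -- the database name and backup directory
are placeholders; the flags shown either already exist in gpbackup or
are added by this patch):

    # full backup that records the baseline hashes alongside the
    # usual AO incremental metadata
    gpbackup --dbname demo --backup-dir /backups \
             --leaf-partition-data --heap-file-hash --ao-file-hash

    # subsequent incremental: tables whose hashes are unchanged since
    # the last backup are skipped
    gpbackup --dbname demo --backup-dir /backups --incremental \
             --leaf-partition-data --heap-file-hash --ao-file-hash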
Co-Authored-By: Claude Opus 4.7 (1M context) --- backup/incremental.go | 58 +++++++- backup/incremental_test.go | 7 +- backup/queries_incremental.go | 240 ++++++++++++++++++++++++++++++++++ backup/validate.go | 6 + backup/wrappers.go | 56 ++++++++ options/flag.go | 4 + toc/toc.go | 8 +- 7 files changed, 373 insertions(+), 6 deletions(-) diff --git a/backup/incremental.go b/backup/incremental.go index dcf70619..916ba182 100644 --- a/backup/incremental.go +++ b/backup/incremental.go @@ -13,17 +13,67 @@ import ( "github.com/pkg/errors" ) +// FilterTablesForIncremental decides which tables need data backed up. +// +// Independent toggles: +// - AO: default = modcount + DDL timestamp; with --ao-file-hash, use per-table aoseg content hash +// - Heap: default = always include (original gpbackup behavior); with --heap-file-hash, skip unchanged via file hash func FilterTablesForIncremental(lastBackupTOC, currentTOC *toc.TOC, tables []Table) []Table { + useAOHash := MustGetFlagBool(options.AO_FILE_HASH) + useHeapHash := MustGetFlagBool(options.HEAP_FILE_HASH) + var filteredTables []Table for _, table := range tables { - currentAOEntry, isAOTable := currentTOC.IncrementalMetadata.AO[table.FQN()] - if !isAOTable { + fqn := table.FQN() + + if currentAO, isAO := currentTOC.IncrementalMetadata.AO[fqn]; isAO { + if useAOHash && currentAO.FileHashMD5 != "" { + prevAO, hasPrev := lastBackupTOC.IncrementalMetadata.AO[fqn] + if !hasPrev || prevAO.FileHashMD5 == "" { + gplog.Debug("Filter: %s (AO/content) prev hash missing, including", fqn) + filteredTables = append(filteredTables, table) + continue + } + if prevAO.FileHashMD5 != currentAO.FileHashMD5 { + filteredTables = append(filteredTables, table) + } + } else { + prevAO := lastBackupTOC.IncrementalMetadata.AO[fqn] + if prevAO.Modcount != currentAO.Modcount || prevAO.LastDDLTimestamp != currentAO.LastDDLTimestamp { + filteredTables = append(filteredTables, table) + } + } + continue + } + + if !useHeapHash { + // Original behavior: heap tables always included in incremental filteredTables = append(filteredTables, table) continue } - previousAOEntry := lastBackupTOC.IncrementalMetadata.AO[table.FQN()] - if previousAOEntry.Modcount != currentAOEntry.Modcount || previousAOEntry.LastDDLTimestamp != currentAOEntry.LastDDLTimestamp { + // --heap-file-hash: skip heap tables whose file hash is unchanged + if currentTOC.IncrementalMetadata.Heap == nil { + filteredTables = append(filteredTables, table) + continue + } + currentHeap, isHeap := currentTOC.IncrementalMetadata.Heap[fqn] + if !isHeap || currentHeap.FileHashMD5 == "" { + gplog.Debug("Filter: %s (Heap) no current hash, including", fqn) + filteredTables = append(filteredTables, table) + continue + } + if lastBackupTOC.IncrementalMetadata.Heap == nil { + filteredTables = append(filteredTables, table) + continue + } + prevHeap, hasPrev := lastBackupTOC.IncrementalMetadata.Heap[fqn] + if !hasPrev || prevHeap.FileHashMD5 == "" { + gplog.Debug("Filter: %s (Heap) prev hash missing, including", fqn) + filteredTables = append(filteredTables, table) + continue + } + if prevHeap.FileHashMD5 != currentHeap.FileHashMD5 { filteredTables = append(filteredTables, table) } } diff --git a/backup/incremental_test.go b/backup/incremental_test.go index 425313d1..674ac03a 100644 --- a/backup/incremental_test.go +++ b/backup/incremental_test.go @@ -61,7 +61,12 @@ var _ = Describe("backup/incremental tests", func() { tblAOUnchanged, } - filteredTables := backup.FilterTablesForIncremental(&prevTOC, &currTOC, tables) + // 
FilterTablesForIncremental reads --ao-file-hash / --heap-file-hash flags via MustGetFlagBool, + // so the call must happen after BeforeSuite initializes cmdFlags — i.e. inside a leaf node. + var filteredTables []backup.Table + JustBeforeEach(func() { + filteredTables = backup.FilterTablesForIncremental(&prevTOC, &currTOC, tables) + }) It("Should include the heap table in the filtered list", func() { Expect(filteredTables).To(ContainElement(tblHeap)) diff --git a/backup/queries_incremental.go b/backup/queries_incremental.go index abc295da..1ebff78e 100644 --- a/backup/queries_incremental.go +++ b/backup/queries_incremental.go @@ -2,6 +2,7 @@ package backup import ( "fmt" + "strings" "github.com/apache/cloudberry-backup/toc" "github.com/apache/cloudberry-go-libs/dbconn" @@ -176,3 +177,242 @@ func getLastDDLTimestamps(connectionPool *dbconn.DBConn) map[string]string { } return resultMap } + +// getFileHashesForTables collects file timestamp+size MD5 hashes for a list of tables +// using the provided connection. The connection must be reused across all tables +// to ensure consistent gp_segment_id mapping. +func getFileHashesForTables(hashConn *dbconn.DBConn, tableFQNs []string) map[string]string { + result := make(map[string]string) + for _, fqn := range tableFQNs { + hash := getTableFileHash(hashConn, fqn) + if hash != "" { + result[fqn] = hash + } + } + return result +} + +// ensureFileStatFunction creates a plpgsql function (gp_toolkit.gpbackup_file_info) +// that uses pg_stat_file() to read each segment's data-file mtime+size. Pure SQL, +// no plpython/shell. Uses a separate connection so a failed setup does not abort +// the backup transaction. +func ensureFileStatFunction(connectionPool *dbconn.DBConn) bool { + gplog.Verbose("Setting up file hash detection function (plpgsql + pg_stat_file)") + + setupConn := dbconn.NewDBConnFromEnvironment(connectionPool.DBName) + setupConn.MustConnect(1) + defer setupConn.Close() + + checkSQL := `SELECT 1 AS val FROM pg_proc + WHERE proname = 'gpbackup_file_info' + AND pronamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'gp_toolkit');` + var checkResult []struct{ Val int } + err := setupConn.Select(&checkResult, checkSQL) + + if err != nil || len(checkResult) == 0 { + gplog.Verbose("Creating gp_toolkit.gpbackup_file_info function") + + createSQL := ` +CREATE OR REPLACE FUNCTION gp_toolkit.gpbackup_file_info(p_schema text, p_table text) +RETURNS text AS $BODY$ +DECLARE + v_tsp oid; + v_rfn oid; + v_dboid oid; + v_dbtsp oid; + v_path text; + v_mod text; + v_size text; +BEGIN + SELECT c.reltablespace, c.relfilenode INTO v_tsp, v_rfn + FROM pg_class c JOIN pg_namespace n ON c.relnamespace = n.oid + WHERE n.nspname = p_schema AND c.relname = p_table; + + IF v_rfn IS NULL THEN + RETURN ''; + END IF; + + SELECT oid, dattablespace INTO v_dboid, v_dbtsp + FROM pg_database WHERE datname = current_database(); + + IF v_tsp = 0 THEN + v_tsp := v_dbtsp; + END IF; + + IF v_tsp = 1663 THEN + v_path := 'base/' || v_dboid || '/' || v_rfn; + ELSE + v_path := 'pg_tblspc/' || v_tsp || '/' || v_dboid || '/' || v_rfn; + END IF; + + BEGIN + SELECT (pg_stat_file(v_path)).modification::text, + (pg_stat_file(v_path)).size::text + INTO v_mod, v_size; + EXCEPTION WHEN OTHERS THEN + v_mod := ''; + v_size := '0'; + END; + + RETURN COALESCE(v_mod, '') || '|' || COALESCE(v_size, '0'); +END; +$BODY$ LANGUAGE plpgsql;` + + _, createErr := setupConn.Exec(createSQL, 0) + if createErr != nil { + gplog.Warn("Could not create gpbackup_file_info function: %v", createErr) + return 
false + } + gplog.Verbose("gpbackup_file_info function created successfully") + } + + return true +} + +// getHeapTableFQNs returns FQNs of heap tables eligible for incremental backup. +func getHeapTableFQNs(connectionPool *dbconn.DBConn) []string { + var query string + if connectionPool.Version.IsGPDB() && connectionPool.Version.Before("7") { + query = fmt.Sprintf(` + SELECT quote_ident(n.nspname) || '.' || quote_ident(c.relname) AS tablefqn + FROM pg_class c + JOIN pg_namespace n ON c.relnamespace = n.oid + WHERE c.relstorage = 'h' + AND c.relkind = 'r' + AND c.oid NOT IN (SELECT inhrelid FROM pg_inherits) + AND %s`, relationAndSchemaFilterClause()) + } else { + query = fmt.Sprintf(` + SELECT quote_ident(n.nspname) || '.' || quote_ident(c.relname) AS tablefqn + FROM pg_class c + JOIN pg_namespace n ON c.relnamespace = n.oid + JOIN pg_am a ON c.relam = a.oid + WHERE a.amname = 'heap' + AND c.relkind = 'r' + AND c.oid NOT IN (SELECT inhrelid FROM pg_inherits) + AND %s`, relationAndSchemaFilterClause()) + } + + var results []struct{ TableFQN string } + err := connectionPool.Select(&results, query) + gplog.FatalOnError(err) + fqns := make([]string, len(results)) + for i, r := range results { + fqns[i] = r.TableFQN + } + return fqns +} + +// getTableFileHash computes an MD5 hash of per-segment data-file mtime+size for a +// heap table, using gp_toolkit.gpbackup_file_info (plpgsql + pg_stat_file). +// gp_dist_random('gp_id') runs the function locally on each segment. +func getTableFileHash(hashConn *dbconn.DBConn, tableFQN string) string { + parts := splitFQN(tableFQN) + if len(parts) != 2 { + return "" + } + schema, table := parts[0], parts[1] + + query := fmt.Sprintf(` + SELECT COALESCE(md5(string_agg( + gp_segment_id::text || ',' || info, chr(10) ORDER BY gp_segment_id + )), '') AS filehash + FROM ( + SELECT gp_segment_id, + gp_toolkit.gpbackup_file_info('%s', '%s') AS info + FROM gp_dist_random('gp_id') + ) x + WHERE info <> ''`, + schema, table) + + var results []struct{ FileHash string } + err := hashConn.Select(&results, query) + if err != nil { + gplog.Warn("Could not get file hash for %s: %v", tableFQN, err) + return "" + } + if len(results) == 0 || results[0].FileHash == "" { + return "" + } + return results[0].FileHash +} + +// splitFQN splits "schema.table" into [schema, table], stripping quote chars. +func splitFQN(fqn string) []string { + fqn = strings.ReplaceAll(fqn, "\"", "") + parts := strings.SplitN(fqn, ".", 2) + return parts +} + +// GetAOContentHashes returns a per-table aoseg content hash for every AO/AOCS table. +// In GP5, parent modcount changes when any child partition is modified, but each +// child's aoseg table only changes when that specific child receives data — so +// hashing the aoseg rows gives partition-level granularity. +// Uses a dedicated connection to avoid transaction abort propagation. 
+func GetAOContentHashes(connectionPool *dbconn.DBConn) map[string]string { + segTableFQNs := getAOSegTableFQNs(connectionPool) + + hashConn := dbconn.NewDBConnFromEnvironment(connectionPool.DBName) + hashConn.MustConnect(1) + defer hashConn.Close() + + result := make(map[string]string) + for aoTableFQN, aosegTableFQN := range segTableFQNs { + hash := getAOSegContentHash(hashConn, aosegTableFQN) + if hash != "" { + result[aoTableFQN] = hash + } + } + return result +} + +// getAOSegContentHash hashes content-bearing columns of the aoseg metadata table — +// deliberately excluding modcount, which in GP5 propagates across sibling partitions +// even when only one is modified. +// +// - AO row (pg_aoseg_*): segno + eof + tupcount. +// - AOCS column store (pg_aocsseg_*): layout differs by product / version. +// Cloudberry AOCS exposes vpinfo (vertical-partition info, a bytea containing +// per-column eofs); GP7+ exposes column_num + physical_segno + eof_uncompressed; +// pre-GP6 only has segno + tupcount. +func getAOSegContentHash(hashConn *dbconn.DBConn, aosegTableFQN string) string { + isColumnStore := strings.Contains(aosegTableFQN, "pg_aocsseg") + + var cols string + if isColumnStore { + switch { + case !hashConn.Version.IsGPDB(): + // Cloudberry AOCS: segno, tupcount, vpinfo (bytea with per-column EOFs) + cols = "segno::text || ',' || tupcount::text || ',' || encode(vpinfo, 'hex')" + case hashConn.Version.Before("6"): + cols = "segno::text || ',' || tupcount::text" + default: + // GP7+ AOCS + cols = "segno::text || ',' || column_num::text || ',' || physical_segno::text || ',' || tupcount::text || ',' || eof_uncompressed::text" + } + } else { + cols = "segno::text || ',' || eof::text || ',' || tupcount::text" + } + + var query string + if hashConn.Version.IsGPDB() && hashConn.Version.Before("7") { + query = fmt.Sprintf(`SELECT COALESCE(md5(string_agg(%s, + chr(10) ORDER BY segno)), '') AS contenthash FROM %s`, cols, aosegTableFQN) + } else { + query = fmt.Sprintf(`SELECT COALESCE(md5(string_agg( + gp_segment_id::text || ',' || %s, + chr(10) ORDER BY gp_segment_id, segno)), '') AS contenthash FROM gp_dist_random('%s')`, + cols, aosegTableFQN) + } + + var results []struct{ ContentHash string } + err := hashConn.Select(&results, query) + if err != nil { + gplog.Warn("Could not get aoseg content hash for %s: %v", aosegTableFQN, err) + return "" + } + if len(results) == 0 { + return "" + } + return results[0].ContentHash +} diff --git a/backup/validate.go b/backup/validate.go index e3169d6f..7131a7fd 100644 --- a/backup/validate.go +++ b/backup/validate.go @@ -156,6 +156,12 @@ func validateFlagCombinations(flags *pflag.FlagSet) { if MustGetFlagBool(options.INCREMENTAL) && !MustGetFlagBool(options.LEAF_PARTITION_DATA) { gplog.Fatal(errors.Errorf("--leaf-partition-data must be specified with --incremental"), "") } + if MustGetFlagBool(options.HEAP_FILE_HASH) && !MustGetFlagBool(options.INCREMENTAL) && !MustGetFlagBool(options.LEAF_PARTITION_DATA) { + gplog.Fatal(errors.Errorf("--heap-file-hash requires --leaf-partition-data (for full backup baseline) or --incremental"), "") + } + if MustGetFlagBool(options.AO_FILE_HASH) && !MustGetFlagBool(options.INCREMENTAL) && !MustGetFlagBool(options.LEAF_PARTITION_DATA) { + gplog.Fatal(errors.Errorf("--ao-file-hash requires --leaf-partition-data (for full backup baseline) or --incremental"), "") + } if MustGetFlagBool(options.NO_INHERITS) && !(FlagChanged(options.INCLUDE_RELATION) || FlagChanged(options.INCLUDE_RELATION_FILE)) { 
gplog.Fatal(errors.Errorf("--no-inherits must be specified with either --include-table or --include-table-file"), "") } diff --git a/backup/wrappers.go b/backup/wrappers.go index 284348cd..3f6c79d5 100644 --- a/backup/wrappers.go +++ b/backup/wrappers.go @@ -760,8 +760,64 @@ func backupTableStatistics(statisticsFile *utils.FileWithByteCount, tables []Tab } func backupIncrementalMetadata() { + // Always collect AO metadata via modcount + DDL timestamp. aoTableEntries := GetAOIncrementalMetadata(connectionPool) globalTOC.IncrementalMetadata.AO = aoTableEntries + + useHeapHash := MustGetFlagBool(options.HEAP_FILE_HASH) + useAOHash := MustGetFlagBool(options.AO_FILE_HASH) + + if !useHeapHash && !useAOHash { + gplog.Info("Collected incremental metadata: %d AO tables", len(aoTableEntries)) + return + } + + aoContentHashCount := 0 + if useAOHash { + gplog.Info("Collecting AO aoseg content hashes (--ao-file-hash)") + aoContentHashes := GetAOContentHashes(connectionPool) + aoContentHashCount = len(aoContentHashes) + for fqn, hash := range aoContentHashes { + if existing, ok := aoTableEntries[fqn]; ok { + existing.FileHashMD5 = hash + aoTableEntries[fqn] = existing + } + } + globalTOC.IncrementalMetadata.AO = aoTableEntries + } + + heapHashCount := 0 + if useHeapHash { + if !ensureFileStatFunction(connectionPool) { + gplog.Warn("File hash setup incomplete, skipping heap file hash collection") + } else { + hashConn := dbconn.NewDBConnFromEnvironment(connectionPool.DBName) + hashConn.MustConnect(1) + defer hashConn.Close() + + // CHECKPOINT flushes dirty pages so pg_stat_file sees up-to-date mtime/size. + gplog.Verbose("Executing CHECKPOINT before heap file hash collection") + if _, cpErr := hashConn.Exec("CHECKPOINT;", 0); cpErr != nil { + gplog.Warn("CHECKPOINT failed (non-fatal, file hashes may be stale): %v", cpErr) + } + + heapFQNs := getHeapTableFQNs(connectionPool) + heapFileHashes := getFileHashesForTables(hashConn, heapFQNs) + heapEntries := make(map[string]toc.HeapEntry) + for fqn, hash := range heapFileHashes { + if hash != "" { + heapEntries[fqn] = toc.HeapEntry{FileHashMD5: hash} + } + } + if len(heapEntries) > 0 { + globalTOC.IncrementalMetadata.Heap = heapEntries + } + heapHashCount = len(heapEntries) + } + } + + gplog.Info("Collected incremental metadata: %d AO tables (%d with content hash), %d heap tables", + len(aoTableEntries), aoContentHashCount, heapHashCount) } func backupStorageServers(metadataFile *utils.FileWithByteCount) { diff --git a/options/flag.go b/options/flag.go index 3618a259..3a4399cc 100644 --- a/options/flag.go +++ b/options/flag.go @@ -54,6 +54,8 @@ const ( RESIZE_CLUSTER = "resize-cluster" NO_INHERITS = "no-inherits" REPORT_DIR = "report-dir" + HEAP_FILE_HASH = "heap-file-hash" + AO_FILE_HASH = "ao-file-hash" ) func SetBackupFlagDefaults(flagSet *pflag.FlagSet) { @@ -89,6 +91,8 @@ func SetBackupFlagDefaults(flagSet *pflag.FlagSet) { flagSet.Bool(WITH_STATS, false, "Back up query plan statistics") flagSet.Bool(WITHOUT_GLOBALS, false, "Skip backup of global metadata") flagSet.Bool(NO_INHERITS, false, "For a filtered backup, don't back up all tables that inherit included tables") + flagSet.Bool(HEAP_FILE_HASH, false, "Use file timestamp hash (CHECKPOINT + pg_stat_file) to detect heap table changes for incremental backup") + flagSet.Bool(AO_FILE_HASH, false, "Use aoseg content hash (eof+tupcount) for per-partition AO table change detection; avoids GP5 modcount cross-partition propagation") } func SetRestoreFlagDefaults(flagSet *pflag.FlagSet) { diff --git 
a/toc/toc.go b/toc/toc.go
index 32cb913a..6b075cdd 100644
--- a/toc/toc.go
+++ b/toc/toc.go
@@ -53,12 +53,18 @@ type SegmentDataEntry struct {
 }
 
 type IncrementalEntries struct {
-	AO map[string]AOEntry
+	AO   map[string]AOEntry
+	Heap map[string]HeapEntry `yaml:"heap,omitempty"`
 }
 
 type AOEntry struct {
 	Modcount         int64
 	LastDDLTimestamp string
+	FileHashMD5      string `yaml:"fileHashMD5,omitempty"`
+}
+
+type HeapEntry struct {
+	FileHashMD5 string `yaml:"fileHashMD5"`
 }
 
 type UniqueID struct {

From 4f575d943756aa7481db24647dda5631d9d8759b Mon Sep 17 00:00:00 2001
From: liang8283
Date: Mon, 11 May 2026 17:52:26 +0800
Subject: [PATCH 2/2] fix(backup): make file-hash safe for custom tablespaces
 and quoted names

Follow-up to e24bc69a. Two real bugs and two minor cleanups found in
code review of that commit:

1. (data loss) gp_toolkit.gpbackup_file_info hand-built the tablespace
   path as 'pg_tblspc/<tsp_oid>/<db_oid>/<relfilenode>', missing the
   mandatory 'PG_<version>_<catversion>' directory that
   PostgreSQL/Cloudberry put in the middle. pg_stat_file would fail for
   every heap table living in a custom tablespace; the EXCEPTION branch
   swallowed the error and returned the literal string '|0' (not empty).
   Because '|0' is not filtered by "WHERE info <> ''", that constant
   became the table's hash on every backup, so any custom-tablespace
   heap table would appear unchanged forever and be silently skipped
   from every incremental backup -- losing any new rows.

2. (SQL injection / correctness) getTableFileHash interpolated FQN parts
   straight into the SQL via fmt.Sprintf('%s', '%s'). splitFQN stripped
   double quotes but left single quotes intact, so a table name with an
   embedded single quote (a legal pg identifier, e.g. "o'reilly") would
   break out of the string literal.

3. (clarity) getAOSegContentHash's default case said "GP7+ AOCS" but
   actually fires for GP6+. Comment fixed.

4. (consistency) HeapEntry.FileHashMD5 was missing the 'omitempty' that
   AOEntry.FileHashMD5 already had.

Fix for #1 and #2 is the same change: pass the table's OID through to
the plpgsql function, and let pg_relation_filepath() compute the on-disk
path. The built-in already handles all tablespace layouts correctly
across PG/Cloudberry versions, and OID interpolation is just an integer
literal, so the SQL-string-escaping concern disappears. The EXCEPTION
block now returns a true empty string, so a hash failure on a single
table falls through to "include in incremental" instead of poisoning
the hash with a constant.

Implementation notes:

backup/queries_incremental.go
- gp_toolkit.gpbackup_file_info now takes (p_oid oid) and uses
  pg_relation_filepath(p_oid). The setup path drops any older
  (text, text) signature first so an in-place upgrade from a previous
  gpbackup installation replaces it cleanly. The duplicate-check query
  gained 'pronargs = 1' to distinguish it from the old signature.
- getHeapTableFQNs -> getHeapTables, returning []heapTable{Oid,FQN}.
- getTableFileHash(hashConn, oid, fqn) -- oid interpolated as an
  integer literal, fqn used only for log messages.
- getFileHashesForTables takes []heapTable.
- splitFQN removed.

backup/wrappers.go
- One-line callsite update.

toc/toc.go
- HeapEntry.FileHashMD5 gets omitempty for symmetry with AOEntry.

Verification:
- make build / make unit on the server: green.
- Targeted live cluster e2e (heap + ao_row + ao_column + partitioned AO,
  full + mutate + incremental + restore): every expected
  inclusion/exclusion matches, restored row counts match source.
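
For illustration, pg_relation_filepath() already emits the version
directory that the old hand-built path omitted (hypothetical OIDs and
table name; the exact PG_* segment differs per server release):

    -- heap table living in a custom tablespace
    SELECT pg_relation_filepath('public.t_custom_tsp'::regclass);
    -- e.g. pg_tblspc/16391/PG_14_202107181/16384/16402

    -- the removed hand-built path was pg_tblspc/16391/16384/16402,
    -- which pg_stat_file() cannot stat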
Co-Authored-By: Claude Opus 4.7 (1M context)
---
 backup/queries_incremental.go | 129 ++++++++++++++++------------------
 backup/wrappers.go            |   4 +-
 toc/toc.go                    |   2 +-
 3 files changed, 65 insertions(+), 70 deletions(-)

diff --git a/backup/queries_incremental.go b/backup/queries_incremental.go
index 1ebff78e..bc8e0eb6 100644
--- a/backup/queries_incremental.go
+++ b/backup/queries_incremental.go
@@ -178,24 +178,39 @@ func getLastDDLTimestamps(connectionPool *dbconn.DBConn) map[string]string {
 	return resultMap
 }
 
-// getFileHashesForTables collects file timestamp+size MD5 hashes for a list of tables
-// using the provided connection. The connection must be reused across all tables
-// to ensure consistent gp_segment_id mapping.
-func getFileHashesForTables(hashConn *dbconn.DBConn, tableFQNs []string) map[string]string {
+// heapTable carries the catalog identity of a heap table eligible for file-hash
+// incremental backup. Oid is what we pass into the file-hash plpgsql function
+// (pg_relation_filepath resolves it locally on each segment); FQN is for keying
+// the result map and for log messages.
+type heapTable struct {
+	Oid uint32
+	FQN string
+}
+
+// getFileHashesForTables collects file timestamp+size MD5 hashes for the given
+// heap tables using the provided connection. The connection must be reused
+// across all tables so gp_segment_id mapping stays consistent.
+func getFileHashesForTables(hashConn *dbconn.DBConn, tables []heapTable) map[string]string {
 	result := make(map[string]string)
-	for _, fqn := range tableFQNs {
-		hash := getTableFileHash(hashConn, fqn)
+	for _, t := range tables {
+		hash := getTableFileHash(hashConn, t.Oid, t.FQN)
 		if hash != "" {
-			result[fqn] = hash
+			result[t.FQN] = hash
 		}
 	}
 	return result
 }
 
 // ensureFileStatFunction creates a plpgsql function (gp_toolkit.gpbackup_file_info)
-// that uses pg_stat_file() to read each segment's data-file mtime+size. Pure SQL,
-// no plpython/shell. Uses a separate connection so a failed setup does not abort
-// the backup transaction.
+// that uses pg_relation_filepath() + pg_stat_file() to read each segment's
+// data-file mtime+size. Pure SQL, no plpython/shell. Uses a separate connection
+// so a failed setup does not abort the backup transaction.
+//
+// pg_relation_filepath is used because manual path construction is fragile:
+// custom tablespaces live under pg_tblspc/<tsp_oid>/PG_<version>_<catversion>/<db_oid>/<relfilenode>,
+// and that PG_<version>_<catversion> segment varies by server version. Letting the
+// server compute the path keeps us correct across versions and tablespace
+// configurations.
 func ensureFileStatFunction(connectionPool *dbconn.DBConn) bool {
 	gplog.Verbose("Setting up file hash detection function (plpgsql + pg_stat_file)")
 
@@ -205,6 +220,7 @@ func ensureFileStatFunction(connectionPool *dbconn.DBConn) bool {
 
 	checkSQL := `SELECT 1 AS val FROM pg_proc
 		WHERE proname = 'gpbackup_file_info'
+		AND pronargs = 1
 		AND pronamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'gp_toolkit');`
 	var checkResult []struct{ Val int }
 	err := setupConn.Select(&checkResult, checkSQL)
@@ -212,46 +228,29 @@ func ensureFileStatFunction(connectionPool *dbconn.DBConn) bool {
 	if err != nil || len(checkResult) == 0 {
 		gplog.Verbose("Creating gp_toolkit.gpbackup_file_info function")
 
+		// Drop any older (text, text) signature from a previous gpbackup version
+		// before recreating with the new (oid) signature.
+ _, _ = setupConn.Exec("DROP FUNCTION IF EXISTS gp_toolkit.gpbackup_file_info(text, text);", 0) + createSQL := ` -CREATE OR REPLACE FUNCTION gp_toolkit.gpbackup_file_info(p_schema text, p_table text) +CREATE OR REPLACE FUNCTION gp_toolkit.gpbackup_file_info(p_oid oid) RETURNS text AS $BODY$ DECLARE - v_tsp oid; - v_rfn oid; - v_dboid oid; - v_dbtsp oid; v_path text; v_mod text; v_size text; BEGIN - SELECT c.reltablespace, c.relfilenode INTO v_tsp, v_rfn - FROM pg_class c JOIN pg_namespace n ON c.relnamespace = n.oid - WHERE n.nspname = p_schema AND c.relname = p_table; - - IF v_rfn IS NULL THEN + v_path := pg_relation_filepath(p_oid); + IF v_path IS NULL THEN RETURN ''; END IF; - SELECT oid, dattablespace INTO v_dboid, v_dbtsp - FROM pg_database WHERE datname = current_database(); - - IF v_tsp = 0 THEN - v_tsp := v_dbtsp; - END IF; - - IF v_tsp = 1663 THEN - v_path := 'base/' || v_dboid || '/' || v_rfn; - ELSE - v_path := 'pg_tblspc/' || v_tsp || '/' || v_dboid || '/' || v_rfn; - END IF; - BEGIN SELECT (pg_stat_file(v_path)).modification::text, (pg_stat_file(v_path)).size::text INTO v_mod, v_size; EXCEPTION WHEN OTHERS THEN - v_mod := ''; - v_size := '0'; + RETURN ''; END; RETURN COALESCE(v_mod, '') || '|' || COALESCE(v_size, '0'); @@ -269,12 +268,13 @@ $BODY$ LANGUAGE plpgsql;` return true } -// getHeapTableFQNs returns FQNs of heap tables eligible for incremental backup. -func getHeapTableFQNs(connectionPool *dbconn.DBConn) []string { +// getHeapTables returns (oid, FQN) pairs of heap tables eligible for incremental backup. +func getHeapTables(connectionPool *dbconn.DBConn) []heapTable { var query string if connectionPool.Version.IsGPDB() && connectionPool.Version.Before("7") { query = fmt.Sprintf(` - SELECT quote_ident(n.nspname) || '.' || quote_ident(c.relname) AS tablefqn + SELECT c.oid, + quote_ident(n.nspname) || '.' || quote_ident(c.relname) AS tablefqn FROM pg_class c JOIN pg_namespace n ON c.relnamespace = n.oid WHERE c.relstorage = 'h' @@ -283,7 +283,8 @@ func getHeapTableFQNs(connectionPool *dbconn.DBConn) []string { AND %s`, relationAndSchemaFilterClause()) } else { query = fmt.Sprintf(` - SELECT quote_ident(n.nspname) || '.' || quote_ident(c.relname) AS tablefqn + SELECT c.oid, + quote_ident(n.nspname) || '.' || quote_ident(c.relname) AS tablefqn FROM pg_class c JOIN pg_namespace n ON c.relnamespace = n.oid JOIN pg_am a ON c.relam = a.oid @@ -293,42 +294,40 @@ func getHeapTableFQNs(connectionPool *dbconn.DBConn) []string { AND %s`, relationAndSchemaFilterClause()) } - var results []struct{ TableFQN string } + var results []struct { + Oid uint32 + TableFQN string + } err := connectionPool.Select(&results, query) gplog.FatalOnError(err) - fqns := make([]string, len(results)) + out := make([]heapTable, len(results)) for i, r := range results { - fqns[i] = r.TableFQN + out[i] = heapTable{Oid: r.Oid, FQN: r.TableFQN} } - return fqns + return out } -// getTableFileHash computes an MD5 hash of per-segment data-file mtime+size for a -// heap table, using gp_toolkit.gpbackup_file_info (plpgsql + pg_stat_file). -// gp_dist_random('gp_id') runs the function locally on each segment. -func getTableFileHash(hashConn *dbconn.DBConn, tableFQN string) string { - parts := splitFQN(tableFQN) - if len(parts) != 2 { - return "" - } - schema, table := parts[0], parts[1] - +// getTableFileHash computes an MD5 hash of per-segment data-file mtime+size for +// a heap table, using gp_toolkit.gpbackup_file_info(oid). 
gp_dist_random('gp_id') +// runs the function locally on each segment; pg_relation_filepath on the segment +// resolves the local path, so each segment hashes its own copy of the table. +func getTableFileHash(hashConn *dbconn.DBConn, oid uint32, fqn string) string { + // oid is interpolated as an integer literal -- no string-escaping concerns. query := fmt.Sprintf(` SELECT COALESCE(md5(string_agg( gp_segment_id::text || ',' || info, chr(10) ORDER BY gp_segment_id )), '') AS filehash FROM ( SELECT gp_segment_id, - gp_toolkit.gpbackup_file_info('%s', '%s') AS info + gp_toolkit.gpbackup_file_info(%d::oid) AS info FROM gp_dist_random('gp_id') ) x - WHERE info <> ''`, - schema, table) + WHERE info <> ''`, oid) var results []struct{ FileHash string } err := hashConn.Select(&results, query) if err != nil { - gplog.Warn("Could not get file hash for %s: %v", tableFQN, err) + gplog.Warn("Could not get file hash for %s: %v", fqn, err) return "" } if len(results) == 0 || results[0].FileHash == "" { @@ -337,13 +336,6 @@ func getTableFileHash(hashConn *dbconn.DBConn, tableFQN string) string { return results[0].FileHash } -// splitFQN splits "schema.table" into [schema, table], stripping quote chars. -func splitFQN(fqn string) []string { - fqn = strings.ReplaceAll(fqn, "\"", "") - parts := strings.SplitN(fqn, ".", 2) - return parts -} - // GetAOContentHashes returns a per-table aoseg content hash for every AO/AOCS table. // In GP5, parent modcount changes when any child partition is modified, but each // child's aoseg table only changes when that specific child receives data — so @@ -373,8 +365,11 @@ func GetAOContentHashes(connectionPool *dbconn.DBConn) map[string]string { // - AO row (pg_aoseg_*): segno + eof + tupcount. // - AOCS column store (pg_aocsseg_*): layout differs by product / version. // Cloudberry AOCS exposes vpinfo (vertical-partition info, a bytea containing -// per-column eofs); GP7+ exposes column_num + physical_segno + eof_uncompressed; -// pre-GP6 only has segno + tupcount. +// per-column eofs); GPDB 6+ exposes column_num + physical_segno + +// eof_uncompressed; pre-GP6 only has segno + tupcount. If a column is missing +// on a given version, the query fails and getAOSegContentHash returns "", +// and FilterTablesForIncremental falls back to modcount comparison for that +// table. 
func getAOSegContentHash(hashConn *dbconn.DBConn, aosegTableFQN string) string { isColumnStore := strings.Contains(aosegTableFQN, "pg_aocsseg") @@ -387,7 +382,7 @@ func getAOSegContentHash(hashConn *dbconn.DBConn, aosegTableFQN string) string { case hashConn.Version.Before("6"): cols = "segno::text || ',' || tupcount::text" default: - // GP7+ AOCS + // GPDB 6+ AOCS cols = "segno::text || ',' || column_num::text || ',' || physical_segno::text || ',' || tupcount::text || ',' || eof_uncompressed::text" } } else { diff --git a/backup/wrappers.go b/backup/wrappers.go index 3f6c79d5..ce643a33 100644 --- a/backup/wrappers.go +++ b/backup/wrappers.go @@ -801,8 +801,8 @@ func backupIncrementalMetadata() { gplog.Warn("CHECKPOINT failed (non-fatal, file hashes may be stale): %v", cpErr) } - heapFQNs := getHeapTableFQNs(connectionPool) - heapFileHashes := getFileHashesForTables(hashConn, heapFQNs) + heapTables := getHeapTables(connectionPool) + heapFileHashes := getFileHashesForTables(hashConn, heapTables) heapEntries := make(map[string]toc.HeapEntry) for fqn, hash := range heapFileHashes { if hash != "" { diff --git a/toc/toc.go b/toc/toc.go index 6b075cdd..2e291d28 100644 --- a/toc/toc.go +++ b/toc/toc.go @@ -64,7 +64,7 @@ type AOEntry struct { } type HeapEntry struct { - FileHashMD5 string `yaml:"fileHashMD5"` + FileHashMD5 string `yaml:"fileHashMD5,omitempty"` } type UniqueID struct {