diff --git a/backup/incremental.go b/backup/incremental.go index dcf70619..916ba182 100644 --- a/backup/incremental.go +++ b/backup/incremental.go @@ -13,17 +13,67 @@ import ( "github.com/pkg/errors" ) +// FilterTablesForIncremental decides which tables need data backed up. +// +// Independent toggles: +// - AO: default = modcount + DDL timestamp; with --ao-file-hash, use per-table aoseg content hash +// - Heap: default = always include (original gpbackup behavior); with --heap-file-hash, skip unchanged via file hash func FilterTablesForIncremental(lastBackupTOC, currentTOC *toc.TOC, tables []Table) []Table { + useAOHash := MustGetFlagBool(options.AO_FILE_HASH) + useHeapHash := MustGetFlagBool(options.HEAP_FILE_HASH) + var filteredTables []Table for _, table := range tables { - currentAOEntry, isAOTable := currentTOC.IncrementalMetadata.AO[table.FQN()] - if !isAOTable { + fqn := table.FQN() + + if currentAO, isAO := currentTOC.IncrementalMetadata.AO[fqn]; isAO { + if useAOHash && currentAO.FileHashMD5 != "" { + prevAO, hasPrev := lastBackupTOC.IncrementalMetadata.AO[fqn] + if !hasPrev || prevAO.FileHashMD5 == "" { + gplog.Debug("Filter: %s (AO/content) prev hash missing, including", fqn) + filteredTables = append(filteredTables, table) + continue + } + if prevAO.FileHashMD5 != currentAO.FileHashMD5 { + filteredTables = append(filteredTables, table) + } + } else { + prevAO := lastBackupTOC.IncrementalMetadata.AO[fqn] + if prevAO.Modcount != currentAO.Modcount || prevAO.LastDDLTimestamp != currentAO.LastDDLTimestamp { + filteredTables = append(filteredTables, table) + } + } + continue + } + + if !useHeapHash { + // Original behavior: heap tables always included in incremental filteredTables = append(filteredTables, table) continue } - previousAOEntry := lastBackupTOC.IncrementalMetadata.AO[table.FQN()] - if previousAOEntry.Modcount != currentAOEntry.Modcount || previousAOEntry.LastDDLTimestamp != currentAOEntry.LastDDLTimestamp { + // --heap-file-hash: skip heap tables whose file hash is unchanged + if currentTOC.IncrementalMetadata.Heap == nil { + filteredTables = append(filteredTables, table) + continue + } + currentHeap, isHeap := currentTOC.IncrementalMetadata.Heap[fqn] + if !isHeap || currentHeap.FileHashMD5 == "" { + gplog.Debug("Filter: %s (Heap) no current hash, including", fqn) + filteredTables = append(filteredTables, table) + continue + } + if lastBackupTOC.IncrementalMetadata.Heap == nil { + filteredTables = append(filteredTables, table) + continue + } + prevHeap, hasPrev := lastBackupTOC.IncrementalMetadata.Heap[fqn] + if !hasPrev || prevHeap.FileHashMD5 == "" { + gplog.Debug("Filter: %s (Heap) prev hash missing, including", fqn) + filteredTables = append(filteredTables, table) + continue + } + if prevHeap.FileHashMD5 != currentHeap.FileHashMD5 { filteredTables = append(filteredTables, table) } } diff --git a/backup/incremental_test.go b/backup/incremental_test.go index 425313d1..674ac03a 100644 --- a/backup/incremental_test.go +++ b/backup/incremental_test.go @@ -61,7 +61,12 @@ var _ = Describe("backup/incremental tests", func() { tblAOUnchanged, } - filteredTables := backup.FilterTablesForIncremental(&prevTOC, &currTOC, tables) + // FilterTablesForIncremental reads --ao-file-hash / --heap-file-hash flags via MustGetFlagBool, + // so the call must happen after BeforeSuite initializes cmdFlags — i.e. inside a leaf node. + var filteredTables []backup.Table + JustBeforeEach(func() { + filteredTables = backup.FilterTablesForIncremental(&prevTOC, &currTOC, tables) + }) It("Should include the heap table in the filtered list", func() { Expect(filteredTables).To(ContainElement(tblHeap)) diff --git a/backup/queries_incremental.go b/backup/queries_incremental.go index abc295da..bc8e0eb6 100644 --- a/backup/queries_incremental.go +++ b/backup/queries_incremental.go @@ -2,6 +2,7 @@ package backup import ( "fmt" + "strings" "github.com/apache/cloudberry-backup/toc" "github.com/apache/cloudberry-go-libs/dbconn" @@ -176,3 +177,237 @@ func getLastDDLTimestamps(connectionPool *dbconn.DBConn) map[string]string { } return resultMap } + +// heapTable carries the catalog identity of a heap table eligible for file-hash +// incremental backup. Oid is what we pass into the file-hash plpgsql function +// (pg_relation_filepath resolves it locally on each segment); FQN is for keying +// the result map and for log messages. +type heapTable struct { + Oid uint32 + FQN string +} + +// getFileHashesForTables collects file timestamp+size MD5 hashes for the given +// heap tables using the provided connection. The connection must be reused +// across all tables so gp_segment_id mapping stays consistent. +func getFileHashesForTables(hashConn *dbconn.DBConn, tables []heapTable) map[string]string { + result := make(map[string]string) + for _, t := range tables { + hash := getTableFileHash(hashConn, t.Oid, t.FQN) + if hash != "" { + result[t.FQN] = hash + } + } + return result +} + +// ensureFileStatFunction creates a plpgsql function (gp_toolkit.gpbackup_file_info) +// that uses pg_relation_filepath() + pg_stat_file() to read each segment's +// data-file mtime+size. Pure SQL, no plpython/shell. Uses a separate connection +// so a failed setup does not abort the backup transaction. +// +// pg_relation_filepath is used because manual path construction is fragile: +// custom tablespaces live under pg_tblspc//PG__//, +// and that PG__ segment varies by server version. Letting the +// server compute the path keeps us correct across versions and tablespace +// configurations. +func ensureFileStatFunction(connectionPool *dbconn.DBConn) bool { + gplog.Verbose("Setting up file hash detection function (plpgsql + pg_stat_file)") + + setupConn := dbconn.NewDBConnFromEnvironment(connectionPool.DBName) + setupConn.MustConnect(1) + defer setupConn.Close() + + checkSQL := `SELECT 1 AS val FROM pg_proc + WHERE proname = 'gpbackup_file_info' + AND pronargs = 1 + AND pronamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'gp_toolkit');` + var checkResult []struct{ Val int } + err := setupConn.Select(&checkResult, checkSQL) + + if err != nil || len(checkResult) == 0 { + gplog.Verbose("Creating gp_toolkit.gpbackup_file_info function") + + // Drop any older (text, text) signature from a previous gpbackup version + // before recreating with the new (oid) signature. + _, _ = setupConn.Exec("DROP FUNCTION IF EXISTS gp_toolkit.gpbackup_file_info(text, text);", 0) + + createSQL := ` +CREATE OR REPLACE FUNCTION gp_toolkit.gpbackup_file_info(p_oid oid) +RETURNS text AS $BODY$ +DECLARE + v_path text; + v_mod text; + v_size text; +BEGIN + v_path := pg_relation_filepath(p_oid); + IF v_path IS NULL THEN + RETURN ''; + END IF; + + BEGIN + SELECT (pg_stat_file(v_path)).modification::text, + (pg_stat_file(v_path)).size::text + INTO v_mod, v_size; + EXCEPTION WHEN OTHERS THEN + RETURN ''; + END; + + RETURN COALESCE(v_mod, '') || '|' || COALESCE(v_size, '0'); +END; +$BODY$ LANGUAGE plpgsql;` + + _, createErr := setupConn.Exec(createSQL, 0) + if createErr != nil { + gplog.Warn("Could not create gpbackup_file_info function: %v", createErr) + return false + } + gplog.Verbose("gpbackup_file_info function created successfully") + } + + return true +} + +// getHeapTables returns (oid, FQN) pairs of heap tables eligible for incremental backup. +func getHeapTables(connectionPool *dbconn.DBConn) []heapTable { + var query string + if connectionPool.Version.IsGPDB() && connectionPool.Version.Before("7") { + query = fmt.Sprintf(` + SELECT c.oid, + quote_ident(n.nspname) || '.' || quote_ident(c.relname) AS tablefqn + FROM pg_class c + JOIN pg_namespace n ON c.relnamespace = n.oid + WHERE c.relstorage = 'h' + AND c.relkind = 'r' + AND c.oid NOT IN (SELECT inhrelid FROM pg_inherits) + AND %s`, relationAndSchemaFilterClause()) + } else { + query = fmt.Sprintf(` + SELECT c.oid, + quote_ident(n.nspname) || '.' || quote_ident(c.relname) AS tablefqn + FROM pg_class c + JOIN pg_namespace n ON c.relnamespace = n.oid + JOIN pg_am a ON c.relam = a.oid + WHERE a.amname = 'heap' + AND c.relkind = 'r' + AND c.oid NOT IN (SELECT inhrelid FROM pg_inherits) + AND %s`, relationAndSchemaFilterClause()) + } + + var results []struct { + Oid uint32 + TableFQN string + } + err := connectionPool.Select(&results, query) + gplog.FatalOnError(err) + out := make([]heapTable, len(results)) + for i, r := range results { + out[i] = heapTable{Oid: r.Oid, FQN: r.TableFQN} + } + return out +} + +// getTableFileHash computes an MD5 hash of per-segment data-file mtime+size for +// a heap table, using gp_toolkit.gpbackup_file_info(oid). gp_dist_random('gp_id') +// runs the function locally on each segment; pg_relation_filepath on the segment +// resolves the local path, so each segment hashes its own copy of the table. +func getTableFileHash(hashConn *dbconn.DBConn, oid uint32, fqn string) string { + // oid is interpolated as an integer literal -- no string-escaping concerns. + query := fmt.Sprintf(` + SELECT COALESCE(md5(string_agg( + gp_segment_id::text || ',' || info, chr(10) ORDER BY gp_segment_id + )), '') AS filehash + FROM ( + SELECT gp_segment_id, + gp_toolkit.gpbackup_file_info(%d::oid) AS info + FROM gp_dist_random('gp_id') + ) x + WHERE info <> ''`, oid) + + var results []struct{ FileHash string } + err := hashConn.Select(&results, query) + if err != nil { + gplog.Warn("Could not get file hash for %s: %v", fqn, err) + return "" + } + if len(results) == 0 || results[0].FileHash == "" { + return "" + } + return results[0].FileHash +} + +// GetAOContentHashes returns a per-table aoseg content hash for every AO/AOCS table. +// In GP5, parent modcount changes when any child partition is modified, but each +// child's aoseg table only changes when that specific child receives data — so +// hashing the aoseg rows gives partition-level granularity. +// Uses a dedicated connection to avoid transaction abort propagation. +func GetAOContentHashes(connectionPool *dbconn.DBConn) map[string]string { + segTableFQNs := getAOSegTableFQNs(connectionPool) + + hashConn := dbconn.NewDBConnFromEnvironment(connectionPool.DBName) + hashConn.MustConnect(1) + defer hashConn.Close() + + result := make(map[string]string) + for aoTableFQN, aosegTableFQN := range segTableFQNs { + hash := getAOSegContentHash(hashConn, aosegTableFQN) + if hash != "" { + result[aoTableFQN] = hash + } + } + return result +} + +// getAOSegContentHash hashes content-bearing columns of the aoseg metadata table — +// deliberately excluding modcount, which in GP5 propagates across sibling partitions +// even when only one is modified. +// +// - AO row (pg_aoseg_*): segno + eof + tupcount. +// - AOCS column store (pg_aocsseg_*): layout differs by product / version. +// Cloudberry AOCS exposes vpinfo (vertical-partition info, a bytea containing +// per-column eofs); GPDB 6+ exposes column_num + physical_segno + +// eof_uncompressed; pre-GP6 only has segno + tupcount. If a column is missing +// on a given version, the query fails and getAOSegContentHash returns "", +// and FilterTablesForIncremental falls back to modcount comparison for that +// table. +func getAOSegContentHash(hashConn *dbconn.DBConn, aosegTableFQN string) string { + isColumnStore := strings.Contains(aosegTableFQN, "pg_aocsseg") + + var cols string + if isColumnStore { + switch { + case !hashConn.Version.IsGPDB(): + // Cloudberry AOCS: segno, tupcount, vpinfo (bytea with per-column EOFs) + cols = "segno::text || ',' || tupcount::text || ',' || encode(vpinfo, 'hex')" + case hashConn.Version.Before("6"): + cols = "segno::text || ',' || tupcount::text" + default: + // GPDB 6+ AOCS + cols = "segno::text || ',' || column_num::text || ',' || physical_segno::text || ',' || tupcount::text || ',' || eof_uncompressed::text" + } + } else { + cols = "segno::text || ',' || eof::text || ',' || tupcount::text" + } + + var query string + if hashConn.Version.IsGPDB() && hashConn.Version.Before("7") { + query = fmt.Sprintf(`SELECT COALESCE(md5(string_agg(%s, + chr(10) ORDER BY segno)), '') AS contenthash FROM %s`, cols, aosegTableFQN) + } else { + query = fmt.Sprintf(`SELECT COALESCE(md5(string_agg( + gp_segment_id::text || ',' || %s, + chr(10) ORDER BY gp_segment_id, segno)), '') AS contenthash FROM gp_dist_random('%s')`, + cols, aosegTableFQN) + } + + var results []struct{ ContentHash string } + err := hashConn.Select(&results, query) + if err != nil { + gplog.Warn("Could not get aoseg content hash for %s: %v", aosegTableFQN, err) + return "" + } + if len(results) == 0 { + return "" + } + return results[0].ContentHash +} diff --git a/backup/validate.go b/backup/validate.go index e3169d6f..7131a7fd 100644 --- a/backup/validate.go +++ b/backup/validate.go @@ -156,6 +156,12 @@ func validateFlagCombinations(flags *pflag.FlagSet) { if MustGetFlagBool(options.INCREMENTAL) && !MustGetFlagBool(options.LEAF_PARTITION_DATA) { gplog.Fatal(errors.Errorf("--leaf-partition-data must be specified with --incremental"), "") } + if MustGetFlagBool(options.HEAP_FILE_HASH) && !MustGetFlagBool(options.INCREMENTAL) && !MustGetFlagBool(options.LEAF_PARTITION_DATA) { + gplog.Fatal(errors.Errorf("--heap-file-hash requires --leaf-partition-data (for full backup baseline) or --incremental"), "") + } + if MustGetFlagBool(options.AO_FILE_HASH) && !MustGetFlagBool(options.INCREMENTAL) && !MustGetFlagBool(options.LEAF_PARTITION_DATA) { + gplog.Fatal(errors.Errorf("--ao-file-hash requires --leaf-partition-data (for full backup baseline) or --incremental"), "") + } if MustGetFlagBool(options.NO_INHERITS) && !(FlagChanged(options.INCLUDE_RELATION) || FlagChanged(options.INCLUDE_RELATION_FILE)) { gplog.Fatal(errors.Errorf("--no-inherits must be specified with either --include-table or --include-table-file"), "") } diff --git a/backup/wrappers.go b/backup/wrappers.go index 284348cd..ce643a33 100644 --- a/backup/wrappers.go +++ b/backup/wrappers.go @@ -760,8 +760,64 @@ func backupTableStatistics(statisticsFile *utils.FileWithByteCount, tables []Tab } func backupIncrementalMetadata() { + // Always collect AO metadata via modcount + DDL timestamp. aoTableEntries := GetAOIncrementalMetadata(connectionPool) globalTOC.IncrementalMetadata.AO = aoTableEntries + + useHeapHash := MustGetFlagBool(options.HEAP_FILE_HASH) + useAOHash := MustGetFlagBool(options.AO_FILE_HASH) + + if !useHeapHash && !useAOHash { + gplog.Info("Collected incremental metadata: %d AO tables", len(aoTableEntries)) + return + } + + aoContentHashCount := 0 + if useAOHash { + gplog.Info("Collecting AO aoseg content hashes (--ao-file-hash)") + aoContentHashes := GetAOContentHashes(connectionPool) + aoContentHashCount = len(aoContentHashes) + for fqn, hash := range aoContentHashes { + if existing, ok := aoTableEntries[fqn]; ok { + existing.FileHashMD5 = hash + aoTableEntries[fqn] = existing + } + } + globalTOC.IncrementalMetadata.AO = aoTableEntries + } + + heapHashCount := 0 + if useHeapHash { + if !ensureFileStatFunction(connectionPool) { + gplog.Warn("File hash setup incomplete, skipping heap file hash collection") + } else { + hashConn := dbconn.NewDBConnFromEnvironment(connectionPool.DBName) + hashConn.MustConnect(1) + defer hashConn.Close() + + // CHECKPOINT flushes dirty pages so pg_stat_file sees up-to-date mtime/size. + gplog.Verbose("Executing CHECKPOINT before heap file hash collection") + if _, cpErr := hashConn.Exec("CHECKPOINT;", 0); cpErr != nil { + gplog.Warn("CHECKPOINT failed (non-fatal, file hashes may be stale): %v", cpErr) + } + + heapTables := getHeapTables(connectionPool) + heapFileHashes := getFileHashesForTables(hashConn, heapTables) + heapEntries := make(map[string]toc.HeapEntry) + for fqn, hash := range heapFileHashes { + if hash != "" { + heapEntries[fqn] = toc.HeapEntry{FileHashMD5: hash} + } + } + if len(heapEntries) > 0 { + globalTOC.IncrementalMetadata.Heap = heapEntries + } + heapHashCount = len(heapEntries) + } + } + + gplog.Info("Collected incremental metadata: %d AO tables (%d with content hash), %d heap tables", + len(aoTableEntries), aoContentHashCount, heapHashCount) } func backupStorageServers(metadataFile *utils.FileWithByteCount) { diff --git a/options/flag.go b/options/flag.go index 3618a259..3a4399cc 100644 --- a/options/flag.go +++ b/options/flag.go @@ -54,6 +54,8 @@ const ( RESIZE_CLUSTER = "resize-cluster" NO_INHERITS = "no-inherits" REPORT_DIR = "report-dir" + HEAP_FILE_HASH = "heap-file-hash" + AO_FILE_HASH = "ao-file-hash" ) func SetBackupFlagDefaults(flagSet *pflag.FlagSet) { @@ -89,6 +91,8 @@ func SetBackupFlagDefaults(flagSet *pflag.FlagSet) { flagSet.Bool(WITH_STATS, false, "Back up query plan statistics") flagSet.Bool(WITHOUT_GLOBALS, false, "Skip backup of global metadata") flagSet.Bool(NO_INHERITS, false, "For a filtered backup, don't back up all tables that inherit included tables") + flagSet.Bool(HEAP_FILE_HASH, false, "Use file timestamp hash (CHECKPOINT + pg_stat_file) to detect heap table changes for incremental backup") + flagSet.Bool(AO_FILE_HASH, false, "Use aoseg content hash (eof+tupcount) for per-partition AO table change detection; avoids GP5 modcount cross-partition propagation") } func SetRestoreFlagDefaults(flagSet *pflag.FlagSet) { diff --git a/toc/toc.go b/toc/toc.go index 32cb913a..2e291d28 100644 --- a/toc/toc.go +++ b/toc/toc.go @@ -53,12 +53,18 @@ type SegmentDataEntry struct { } type IncrementalEntries struct { - AO map[string]AOEntry + AO map[string]AOEntry + Heap map[string]HeapEntry `yaml:"heap,omitempty"` } type AOEntry struct { Modcount int64 LastDDLTimestamp string + FileHashMD5 string `yaml:"fileHashMD5,omitempty"` +} + +type HeapEntry struct { + FileHashMD5 string `yaml:"fileHashMD5,omitempty"` } type UniqueID struct {