Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 54 additions & 4 deletions backup/incremental.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,67 @@ import (
"github.com/pkg/errors"
)

// FilterTablesForIncremental decides which tables need data backed up.
//
// Independent toggles:
// - AO: default = modcount + DDL timestamp; with --ao-file-hash, use per-table aoseg content hash
// - Heap: default = always include (original gpbackup behavior); with --heap-file-hash, skip unchanged via file hash
func FilterTablesForIncremental(lastBackupTOC, currentTOC *toc.TOC, tables []Table) []Table {
useAOHash := MustGetFlagBool(options.AO_FILE_HASH)
useHeapHash := MustGetFlagBool(options.HEAP_FILE_HASH)

var filteredTables []Table
for _, table := range tables {
currentAOEntry, isAOTable := currentTOC.IncrementalMetadata.AO[table.FQN()]
if !isAOTable {
fqn := table.FQN()

if currentAO, isAO := currentTOC.IncrementalMetadata.AO[fqn]; isAO {
if useAOHash && currentAO.FileHashMD5 != "" {
prevAO, hasPrev := lastBackupTOC.IncrementalMetadata.AO[fqn]
if !hasPrev || prevAO.FileHashMD5 == "" {
gplog.Debug("Filter: %s (AO/content) prev hash missing, including", fqn)
filteredTables = append(filteredTables, table)
continue
}
if prevAO.FileHashMD5 != currentAO.FileHashMD5 {
filteredTables = append(filteredTables, table)
}
} else {
prevAO := lastBackupTOC.IncrementalMetadata.AO[fqn]
if prevAO.Modcount != currentAO.Modcount || prevAO.LastDDLTimestamp != currentAO.LastDDLTimestamp {
filteredTables = append(filteredTables, table)
}
}
continue
}

if !useHeapHash {
// Original behavior: heap tables always included in incremental
filteredTables = append(filteredTables, table)
continue
}
previousAOEntry := lastBackupTOC.IncrementalMetadata.AO[table.FQN()]

if previousAOEntry.Modcount != currentAOEntry.Modcount || previousAOEntry.LastDDLTimestamp != currentAOEntry.LastDDLTimestamp {
// --heap-file-hash: skip heap tables whose file hash is unchanged
if currentTOC.IncrementalMetadata.Heap == nil {
filteredTables = append(filteredTables, table)
continue
}
currentHeap, isHeap := currentTOC.IncrementalMetadata.Heap[fqn]
if !isHeap || currentHeap.FileHashMD5 == "" {
gplog.Debug("Filter: %s (Heap) no current hash, including", fqn)
filteredTables = append(filteredTables, table)
continue
}
if lastBackupTOC.IncrementalMetadata.Heap == nil {
filteredTables = append(filteredTables, table)
continue
}
prevHeap, hasPrev := lastBackupTOC.IncrementalMetadata.Heap[fqn]
if !hasPrev || prevHeap.FileHashMD5 == "" {
gplog.Debug("Filter: %s (Heap) prev hash missing, including", fqn)
filteredTables = append(filteredTables, table)
continue
}
if prevHeap.FileHashMD5 != currentHeap.FileHashMD5 {
filteredTables = append(filteredTables, table)
}
}
Expand Down
7 changes: 6 additions & 1 deletion backup/incremental_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,12 @@ var _ = Describe("backup/incremental tests", func() {
tblAOUnchanged,
}

filteredTables := backup.FilterTablesForIncremental(&prevTOC, &currTOC, tables)
// FilterTablesForIncremental reads --ao-file-hash / --heap-file-hash flags via MustGetFlagBool,
// so the call must happen after BeforeSuite initializes cmdFlags — i.e. inside a leaf node.
var filteredTables []backup.Table
JustBeforeEach(func() {
filteredTables = backup.FilterTablesForIncremental(&prevTOC, &currTOC, tables)
})

It("Should include the heap table in the filtered list", func() {
Expect(filteredTables).To(ContainElement(tblHeap))
Expand Down
235 changes: 235 additions & 0 deletions backup/queries_incremental.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package backup

import (
"fmt"
"strings"

"github.com/apache/cloudberry-backup/toc"
"github.com/apache/cloudberry-go-libs/dbconn"
Expand Down Expand Up @@ -176,3 +177,237 @@ func getLastDDLTimestamps(connectionPool *dbconn.DBConn) map[string]string {
}
return resultMap
}

// heapTable carries the catalog identity of a heap table eligible for file-hash
// incremental backup. Oid is what we pass into the file-hash plpgsql function
// (pg_relation_filepath resolves it locally on each segment); FQN is for keying
// the result map and for log messages.
type heapTable struct {
	// Oid is the pg_class OID of the table, passed to
	// gp_toolkit.gpbackup_file_info(oid) on each segment.
	Oid uint32
	// FQN is the quote_ident-ed schema.table name, used as the map key in
	// getFileHashesForTables and in warning/debug messages.
	FQN string
}

// getFileHashesForTables collects file timestamp+size MD5 hashes for the given
// heap tables using the provided connection. The connection must be reused
// across all tables so gp_segment_id mapping stays consistent.
//
// Tables whose hash cannot be computed (getTableFileHash returns "") are simply
// omitted from the result map; callers treat a missing entry as "no hash".
func getFileHashesForTables(hashConn *dbconn.DBConn, tables []heapTable) map[string]string {
	hashes := make(map[string]string)
	for _, tbl := range tables {
		if h := getTableFileHash(hashConn, tbl.Oid, tbl.FQN); h != "" {
			hashes[tbl.FQN] = h
		}
	}
	return hashes
}

// ensureFileStatFunction creates a plpgsql function (gp_toolkit.gpbackup_file_info)
// that uses pg_relation_filepath() + pg_stat_file() to read each segment's
// data-file mtime+size. Pure SQL, no plpython/shell. Uses a separate connection
// so a failed setup does not abort the backup transaction.
//
// pg_relation_filepath is used because manual path construction is fragile:
// custom tablespaces live under pg_tblspc/<tsp>/PG_<major>_<catver>/<db>/<rfn>,
// and that PG_<major>_<catver> segment varies by server version. Letting the
// server compute the path keeps us correct across versions and tablespace
// configurations.
//
// Returns false only when creating the function fails; an error on the
// existence check is treated the same as "function missing" and we attempt
// the create anyway.
func ensureFileStatFunction(connectionPool *dbconn.DBConn) bool {
	gplog.Verbose("Setting up file hash detection function (plpgsql + pg_stat_file)")

	setupConn := dbconn.NewDBConnFromEnvironment(connectionPool.DBName)
	setupConn.MustConnect(1)
	defer setupConn.Close()

	checkSQL := `SELECT 1 AS val FROM pg_proc
		WHERE proname = 'gpbackup_file_info'
		AND pronargs = 1
		AND pronamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'gp_toolkit');`
	var existing []struct{ Val int }
	if checkErr := setupConn.Select(&existing, checkSQL); checkErr == nil && len(existing) > 0 {
		// Function already present with the expected (1-arg) signature.
		return true
	}

	gplog.Verbose("Creating gp_toolkit.gpbackup_file_info function")

	// Drop any older (text, text) signature from a previous gpbackup version
	// before recreating with the new (oid) signature. Errors are ignored on
	// purpose: the old signature may simply not exist.
	_, _ = setupConn.Exec("DROP FUNCTION IF EXISTS gp_toolkit.gpbackup_file_info(text, text);", 0)

	createSQL := `
CREATE OR REPLACE FUNCTION gp_toolkit.gpbackup_file_info(p_oid oid)
RETURNS text AS $BODY$
DECLARE
	v_path text;
	v_mod text;
	v_size text;
BEGIN
	v_path := pg_relation_filepath(p_oid);
	IF v_path IS NULL THEN
		RETURN '';
	END IF;

	BEGIN
		SELECT (pg_stat_file(v_path)).modification::text,
		       (pg_stat_file(v_path)).size::text
		INTO v_mod, v_size;
	EXCEPTION WHEN OTHERS THEN
		RETURN '';
	END;

	RETURN COALESCE(v_mod, '') || '|' || COALESCE(v_size, '0');
END;
$BODY$ LANGUAGE plpgsql;`

	if _, createErr := setupConn.Exec(createSQL, 0); createErr != nil {
		gplog.Warn("Could not create gpbackup_file_info function: %v", createErr)
		return false
	}
	gplog.Verbose("gpbackup_file_info function created successfully")
	return true
}

// getHeapTables returns (oid, FQN) pairs of heap tables eligible for incremental backup.
//
// Pre-GPDB7 identifies heap storage via pg_class.relstorage = 'h'; newer
// servers (GPDB 7+ / Cloudberry) use the table access method (pg_am.amname =
// 'heap') instead, since relstorage was removed. Partition parents are
// excluded via pg_inherits so only leaf/plain tables are hashed, and
// relationAndSchemaFilterClause() applies the user's include/exclude filters.
func getHeapTables(connectionPool *dbconn.DBConn) []heapTable {
	pre7 := connectionPool.Version.IsGPDB() && connectionPool.Version.Before("7")

	var query string
	if pre7 {
		query = fmt.Sprintf(`
		SELECT c.oid,
			quote_ident(n.nspname) || '.' || quote_ident(c.relname) AS tablefqn
		FROM pg_class c
		JOIN pg_namespace n ON c.relnamespace = n.oid
		WHERE c.relstorage = 'h'
			AND c.relkind = 'r'
			AND c.oid NOT IN (SELECT inhrelid FROM pg_inherits)
			AND %s`, relationAndSchemaFilterClause())
	} else {
		query = fmt.Sprintf(`
		SELECT c.oid,
			quote_ident(n.nspname) || '.' || quote_ident(c.relname) AS tablefqn
		FROM pg_class c
		JOIN pg_namespace n ON c.relnamespace = n.oid
		JOIN pg_am a ON c.relam = a.oid
		WHERE a.amname = 'heap'
			AND c.relkind = 'r'
			AND c.oid NOT IN (SELECT inhrelid FROM pg_inherits)
			AND %s`, relationAndSchemaFilterClause())
	}

	var rows []struct {
		Oid      uint32
		TableFQN string
	}
	gplog.FatalOnError(connectionPool.Select(&rows, query))

	tables := make([]heapTable, 0, len(rows))
	for _, r := range rows {
		tables = append(tables, heapTable{Oid: r.Oid, FQN: r.TableFQN})
	}
	return tables
}

// getTableFileHash computes an MD5 hash of per-segment data-file mtime+size for
// a heap table, using gp_toolkit.gpbackup_file_info(oid). gp_dist_random('gp_id')
// runs the function locally on each segment; pg_relation_filepath on the segment
// resolves the local path, so each segment hashes its own copy of the table.
//
// Returns "" when the query fails or yields no hash; callers treat "" as
// "hash unavailable" and fall back to always including the table.
func getTableFileHash(hashConn *dbconn.DBConn, oid uint32, fqn string) string {
	// oid is interpolated as an integer literal -- no string-escaping concerns.
	query := fmt.Sprintf(`
	SELECT COALESCE(md5(string_agg(
		gp_segment_id::text || ',' || info, chr(10) ORDER BY gp_segment_id
	)), '') AS filehash
	FROM (
		SELECT gp_segment_id,
			gp_toolkit.gpbackup_file_info(%d::oid) AS info
		FROM gp_dist_random('gp_id')
	) x
	WHERE info <> ''`, oid)

	var rows []struct{ FileHash string }
	if err := hashConn.Select(&rows, query); err != nil {
		gplog.Warn("Could not get file hash for %s: %v", fqn, err)
		return ""
	}
	if len(rows) == 0 {
		return ""
	}
	// An empty FileHash falls through here and is returned as "", matching
	// the "no hash available" convention.
	return rows[0].FileHash
}

// GetAOContentHashes returns a per-table aoseg content hash for every AO/AOCS table.
// In GP5, parent modcount changes when any child partition is modified, but each
// child's aoseg table only changes when that specific child receives data — so
// hashing the aoseg rows gives partition-level granularity.
// Uses a dedicated connection to avoid transaction abort propagation.
//
// Tables whose aoseg hash cannot be computed are omitted from the map, which
// lets the incremental filter fall back to modcount comparison for them.
func GetAOContentHashes(connectionPool *dbconn.DBConn) map[string]string {
	segTableFQNs := getAOSegTableFQNs(connectionPool)

	hashConn := dbconn.NewDBConnFromEnvironment(connectionPool.DBName)
	hashConn.MustConnect(1)
	defer hashConn.Close()

	hashes := make(map[string]string, len(segTableFQNs))
	for tableFQN, segFQN := range segTableFQNs {
		if h := getAOSegContentHash(hashConn, segFQN); h != "" {
			hashes[tableFQN] = h
		}
	}
	return hashes
}

// getAOSegContentHash hashes content-bearing columns of the aoseg metadata table —
// deliberately excluding modcount, which in GP5 propagates across sibling partitions
// even when only one is modified.
//
// - AO row (pg_aoseg_*): segno + eof + tupcount.
// - AOCS column store (pg_aocsseg_*): layout differs by product / version.
// Cloudberry AOCS exposes vpinfo (vertical-partition info, a bytea containing
// per-column eofs); GPDB 6+ exposes column_num + physical_segno +
// eof_uncompressed; pre-GP6 only has segno + tupcount. If a column is missing
// on a given version, the query fails and getAOSegContentHash returns "",
// and FilterTablesForIncremental falls back to modcount comparison for that
// table.
func getAOSegContentHash(hashConn *dbconn.DBConn, aosegTableFQN string) string {
	// Default to the row-oriented (pg_aoseg_*) column list, then override for
	// column stores, whose catalog layout varies by product/version.
	colExpr := "segno::text || ',' || eof::text || ',' || tupcount::text"
	if strings.Contains(aosegTableFQN, "pg_aocsseg") {
		switch {
		case !hashConn.Version.IsGPDB():
			// Cloudberry AOCS: segno, tupcount, vpinfo (bytea with per-column EOFs)
			colExpr = "segno::text || ',' || tupcount::text || ',' || encode(vpinfo, 'hex')"
		case hashConn.Version.Before("6"):
			colExpr = "segno::text || ',' || tupcount::text"
		default:
			// GPDB 6+ AOCS
			colExpr = "segno::text || ',' || column_num::text || ',' || physical_segno::text || ',' || tupcount::text || ',' || eof_uncompressed::text"
		}
	}

	var query string
	if hashConn.Version.IsGPDB() && hashConn.Version.Before("7") {
		query = fmt.Sprintf(`SELECT COALESCE(md5(string_agg(%s,
		chr(10) ORDER BY segno)), '') AS contenthash FROM %s`, colExpr, aosegTableFQN)
	} else {
		query = fmt.Sprintf(`SELECT COALESCE(md5(string_agg(
		gp_segment_id::text || ',' || %s,
		chr(10) ORDER BY gp_segment_id, segno)), '') AS contenthash FROM gp_dist_random('%s')`,
			colExpr, aosegTableFQN)
	}

	var rows []struct{ ContentHash string }
	if err := hashConn.Select(&rows, query); err != nil {
		gplog.Warn("Could not get aoseg content hash for %s: %v", aosegTableFQN, err)
		return ""
	}
	if len(rows) == 0 {
		return ""
	}
	return rows[0].ContentHash
}
6 changes: 6 additions & 0 deletions backup/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,12 @@ func validateFlagCombinations(flags *pflag.FlagSet) {
if MustGetFlagBool(options.INCREMENTAL) && !MustGetFlagBool(options.LEAF_PARTITION_DATA) {
gplog.Fatal(errors.Errorf("--leaf-partition-data must be specified with --incremental"), "")
}
if MustGetFlagBool(options.HEAP_FILE_HASH) && !MustGetFlagBool(options.INCREMENTAL) && !MustGetFlagBool(options.LEAF_PARTITION_DATA) {
gplog.Fatal(errors.Errorf("--heap-file-hash requires --leaf-partition-data (for full backup baseline) or --incremental"), "")
}
if MustGetFlagBool(options.AO_FILE_HASH) && !MustGetFlagBool(options.INCREMENTAL) && !MustGetFlagBool(options.LEAF_PARTITION_DATA) {
gplog.Fatal(errors.Errorf("--ao-file-hash requires --leaf-partition-data (for full backup baseline) or --incremental"), "")
}
if MustGetFlagBool(options.NO_INHERITS) && !(FlagChanged(options.INCLUDE_RELATION) || FlagChanged(options.INCLUDE_RELATION_FILE)) {
gplog.Fatal(errors.Errorf("--no-inherits must be specified with either --include-table or --include-table-file"), "")
}
Expand Down
Loading