Skip to content
Merged
146 changes: 100 additions & 46 deletions logservice/schemastore/persist_storage_ddl_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,56 +699,77 @@ func buildPersistedDDLEventForRenameTable(args buildPersistedDDLEventFuncArgs) P
event.SchemaName = getSchemaName(args.databaseMap, event.SchemaID)
// get the table's current table name from the ddl job
event.TableName = event.TableInfo.Name.O

// The old schema/table names cannot rely on ExtraSchemaName/ExtraTableName,
// because the snapshot used by schema store may already reflect the post-rename state.
// Example (after https://github.com/pingcap/tidb/pull/43341):
// table `test.t`, DDL `rename table t to test2.t;`, commit ts = 100
// snapshot at ts = 99 already shows `t` under `test2`
// => event.ExtraSchemaName becomes `test2`, which is wrong for the old name
// SchemaStore can still use ExtraSchemaID to update internal state,
// but the emitted event.Query must carry the correct old names.
// Rebuild them with the following precedence:
// 1. InvolvingSchemaInfo provides a fallback old schema/table pair, but names may be normalized.
// 2. RenameTableArgs.OldSchemaName overrides the fallback when available.
// It is reliable in TiDB >= v8.5, but can be missing in older versions.
// 3. The original query (if it specifies old schema) has the highest priority for identifier case.
// 4. If the query omits old schema and ExtraSchemaID differs from SchemaID, use ExtraSchemaID to
// recover the old schema name from the schema store.
oldSchemaName := ""
oldTableName := ""
oldSchemaSource := "unknown"
if len(args.job.InvolvingSchemaInfo) > 0 {
log.Info("buildPersistedDDLEvent for rename table",
oldSchemaName = args.job.InvolvingSchemaInfo[0].Database
oldTableName = args.job.InvolvingSchemaInfo[0].Table
if oldSchemaName != "" {
oldSchemaSource = "involving_schema_info"
}
}
if args.job.Version == model.JobVersion1 || args.job.Version == model.JobVersion2 {
if renameArgs, err := model.GetRenameTableArgs(args.job); err == nil {
if renameArgs.OldSchemaName.O != "" {
oldSchemaName = renameArgs.OldSchemaName.O
oldSchemaSource = "rename_table_args"
}
} else {
log.Warn("failed to get rename table args from ddl job",
zap.Int64("jobID", args.job.ID),
zap.String("query", event.Query),
zap.Error(err))
}
}
queryProvidedOldSchema := false
if queryInfo, parsed := parseRenameTableQueryInfo(args.job.Query); parsed {
if queryInfo.oldTableName != "" {
oldTableName = queryInfo.oldTableName
}
if queryInfo.oldSchemaName != "" {
oldSchemaName = queryInfo.oldSchemaName
queryProvidedOldSchema = true
oldSchemaSource = "query"
}
}
// ExtraSchemaID can be incorrect due to snapshot timing, so only use it if the query
// does not specify the old schema.
if !queryProvidedOldSchema && event.ExtraSchemaID != 0 && event.ExtraSchemaID != event.SchemaID {
if extraName := getSchemaName(args.databaseMap, event.ExtraSchemaID); extraName != "" {
oldSchemaName = extraName
oldSchemaSource = "extra_schema_id"
}
}
if oldSchemaName != "" && oldTableName != "" {
log.Info("rebuild rename table query",
zap.Int64("jobID", event.ID),
zap.String("query", event.Query),
zap.Int64("schemaID", event.SchemaID),
zap.String("SchemaName", event.SchemaName),
zap.String("schemaName", event.SchemaName),
zap.String("tableName", event.TableName),
zap.Int64("ExtraSchemaID", event.ExtraSchemaID),
zap.String("ExtraSchemaName", event.ExtraSchemaName),
zap.String("ExtraTableName", event.ExtraTableName),
zap.Any("involvingSchemaInfo", args.job.InvolvingSchemaInfo))
// The query in the job may be "RENAME TABLE table1 to test2.table2", so we need to rebuild it here.
//
// Note: Why use args.job.InvolvingSchemaInfo to build query?
// because event.ExtraSchemaID may not be accurate for rename table in some case.
// after pr: https://github.com/pingcap/tidb/pull/43341,
// assume there is a table `test.t` and a ddl: `rename table t to test2.t;`, and its commit ts is `100`.
// if you get a ddl snapshot at ts `99`, table `t` is already in `test2`.
// so event.ExtraSchemaName will also be `test2`.
// Because SchemaStore is the source of truth inside cdc,
// we can use event.ExtraSchemaID (even if it is wrong) to update the internal state of cdc.
// But event.Query will be emitted downstream (outside of cdc), so we must make it correct.
//
// InvolvingSchemaInfo returns the schema info involved in the job.
// The value should be stored in lower case.
//
// InvolvingSchemaInfo may store normalized lower-case names,
// while the original query can keep user-provided identifier case.
// Prefer names parsed from the original query whenever possible.
// See https://github.com/pingcap/ticdc/pull/2218 for background.
oldSchemaName := args.job.InvolvingSchemaInfo[0].Database
oldTableName := args.job.InvolvingSchemaInfo[0].Table
stmt, err := parser.New().ParseOneStmt(args.job.Query, "", "")
if err != nil {
log.Error("parse statement failed for build persisted DDL event", zap.Any("DDL", args.job.Query), zap.Error(err))
} else {
switch s := stmt.(type) {
case *ast.AlterTableStmt:
oldTableName = s.Table.Name.O
if schemaName := s.Table.Schema.O; schemaName != "" {
oldSchemaName = schemaName
}
case *ast.RenameTableStmt:
oldTableName = s.TableToTables[0].OldTable.Name.O
if schemaName := s.TableToTables[0].OldTable.Schema.O; schemaName != "" {
oldSchemaName = schemaName
}
default:
log.Error("unknown stmt type", zap.String("query", args.job.Query), zap.Any("stmt", stmt))
}
}
zap.Int64("extraSchemaID", event.ExtraSchemaID),
zap.String("extraSchemaName", event.ExtraSchemaName),
zap.String("extraTableName", event.ExtraTableName),
zap.String("oldSchemaName", oldSchemaName),
zap.String("oldTableName", oldTableName),
zap.String("oldSchemaSource", oldSchemaSource))
event.Query = fmt.Sprintf("RENAME TABLE %s TO %s",
common.QuoteSchema(oldSchemaName, oldTableName),
common.QuoteSchema(event.SchemaName, event.TableName))
Expand Down Expand Up @@ -814,6 +835,39 @@ type renameTableQueryInfo struct {
newTableName string
}

// parseRenameTableQueryInfo parses query as a single statement and extracts
// the schema/table names it refers to.
//
// For an ALTER TABLE statement only the old (source) schema and table names
// are filled in; the new names are left empty. For a RENAME TABLE statement
// the old and new names of the first table pair are returned; any further
// pairs are ignored.
//
// The boolean result is false when query is empty, cannot be parsed (a
// warning is logged in that case), is some other statement type, or is a
// RENAME TABLE statement without any table pair. The returned struct is the
// zero value whenever the boolean is false.
func parseRenameTableQueryInfo(query string) (renameTableQueryInfo, bool) {
	var info renameTableQueryInfo
	if len(query) == 0 {
		return info, false
	}

	parsed, parseErr := parser.New().ParseOneStmt(query, "", "")
	if parseErr != nil {
		log.Warn("parse rename table query failed",
			zap.String("query", query),
			zap.Error(parseErr))
		return info, false
	}

	// ALTER TABLE <old> ...: only the source table is taken from the statement.
	if alter, isAlter := parsed.(*ast.AlterTableStmt); isAlter {
		info.oldSchemaName = alter.Table.Schema.O
		info.oldTableName = alter.Table.Name.O
		return info, true
	}

	// RENAME TABLE <old> TO <new>[, ...]: only the first pair is used.
	rename, isRename := parsed.(*ast.RenameTableStmt)
	if !isRename || len(rename.TableToTables) == 0 {
		return info, false
	}
	firstPair := rename.TableToTables[0]
	info.oldSchemaName = firstPair.OldTable.Schema.O
	info.oldTableName = firstPair.OldTable.Name.O
	info.newSchemaName = firstPair.NewTable.Schema.O
	info.newTableName = firstPair.NewTable.Name.O
	return info, true
}

func parseRenameTablesQueryInfos(query string) ([]renameTableQueryInfo, bool) {
if query == "" {
return nil, false
Expand Down
67 changes: 67 additions & 0 deletions logservice/schemastore/persist_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3069,6 +3069,7 @@ func TestGCPersistStorage(t *testing.T) {
}

func TestRenameTable(t *testing.T) {
// Case: query specifies both old and new schema; use query for old schema/table.
// use t;
job := buildRenameTableJobForTest(100, 101, "t1", 101, &model.InvolvingSchemaInfo{
Database: "t",
Expand All @@ -3089,6 +3090,7 @@ func TestRenameTable(t *testing.T) {
})
assert.Equal(t, "RENAME TABLE `t`.`t3` TO `test`.`t1`", ddl.Query)

// Case: same-schema rename, query omits schema; fallback to InvolvingSchemaInfo.
// use test;
job = buildRenameTableJobForTest(100, 101, "t2", 100, &model.InvolvingSchemaInfo{
Database: "test",
Expand All @@ -3109,6 +3111,7 @@ func TestRenameTable(t *testing.T) {
})
assert.Equal(t, "RENAME TABLE `test`.`t1` TO `test`.`t2`", ddl.Query)

// Case: ALTER TABLE ... RENAME TO, same-schema rename; fallback to InvolvingSchemaInfo.
// use test;
job = buildRenameTableJobForTest(100, 101, "t2", 100, &model.InvolvingSchemaInfo{
Database: "test",
Expand All @@ -3128,6 +3131,70 @@ func TestRenameTable(t *testing.T) {
},
})
assert.Equal(t, "RENAME TABLE `test`.`t1` TO `test`.`t2`", ddl.Query)

// Case: args provide old schema name, should override InvolvingSchemaInfo (ExtraSchemaID == SchemaID).
// use SalesDB;
job = buildRenameTableJobForTest(100, 101, "t1", 100, &model.InvolvingSchemaInfo{
Database: "salesdb",
Table: "t1",
})
job.Version = model.JobVersion2
job.FillArgs(&model.RenameTableArgs{
OldSchemaID: 200,
OldSchemaName: ast.NewCIStr("SalesDB"),
NewTableName: ast.NewCIStr("t1"),
})
job.Query = "RENAME TABLE t1 TO ArchiveDB.t1"
ddl = buildPersistedDDLEventForRenameTable(buildPersistedDDLEventFuncArgs{
job: job,
databaseMap: map[int64]*BasicDatabaseInfo{
100: {Name: "ArchiveDB", Tables: map[int64]bool{101: true}},
200: {Name: "SalesDB", Tables: map[int64]bool{101: true}},
},
tableMap: map[int64]*BasicTableInfo{
101: {SchemaID: 100, Name: "t1"},
},
})
assert.Equal(t, "RENAME TABLE `SalesDB`.`t1` TO `ArchiveDB`.`t1`", ddl.Query)

// Case: query omits old schema; ExtraSchemaID overwrites normalized InvolvingSchemaInfo to recover case.
job = buildRenameTableJobForTest(100, 101, "t1", 100, &model.InvolvingSchemaInfo{
Database: "salesdb",
Table: "t1",
})
job.Query = "RENAME TABLE t1 TO ArchiveDB.t1"
ddl = buildPersistedDDLEventForRenameTable(buildPersistedDDLEventFuncArgs{
job: job,
databaseMap: map[int64]*BasicDatabaseInfo{
100: {Name: "ArchiveDB", Tables: map[int64]bool{101: true}},
200: {Name: "SalesDB", Tables: map[int64]bool{101: true}},
},
tableMap: map[int64]*BasicTableInfo{
101: {SchemaID: 200, Name: "t1"},
},
})
assert.Equal(t, "RENAME TABLE `SalesDB`.`t1` TO `ArchiveDB`.`t1`", ddl.Query)

// Case: args provide old schema name, no InvolvingSchemaInfo.
job = buildRenameTableJobForTest(100, 101, "t1", 100, nil)
job.Version = model.JobVersion2
job.FillArgs(&model.RenameTableArgs{
OldSchemaID: 200,
OldSchemaName: ast.NewCIStr("SalesDB"),
NewTableName: ast.NewCIStr("t1"),
})
job.Query = "RENAME TABLE t1 TO ArchiveDB.t1"
ddl = buildPersistedDDLEventForRenameTable(buildPersistedDDLEventFuncArgs{
job: job,
databaseMap: map[int64]*BasicDatabaseInfo{
100: {Name: "ArchiveDB", Tables: map[int64]bool{101: true}},
200: {Name: "SalesDB", Tables: map[int64]bool{101: true}},
},
tableMap: map[int64]*BasicTableInfo{
101: {SchemaID: 200, Name: "t1"},
},
})
assert.Equal(t, "RENAME TABLE `SalesDB`.`t1` TO `ArchiveDB`.`t1`", ddl.Query)
}

func TestBuildPersistedDDLEventForRenameTablesFallbackOldTableName(t *testing.T) {
Expand Down