Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions logservice/schemastore/persist_storage_ddl_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ var allDDLHandlers = map[model.ActionType]*persistStorageDDLHandler{
buildDDLEventFunc: buildDDLEventForNormalDDLOnSingleTable,
},
model.ActionAddIndex: {
buildPersistedDDLEventFunc: buildPersistedDDLEventForNormalDDLOnSingleTable,
buildPersistedDDLEventFunc: buildPersistedDDLEventForAddIndex,
updateDDLHistoryFunc: updateDDLHistoryForNormalDDLOnSingleTable,
updateFullTableInfoFunc: updateFullTableInfoForSingleTableDDL,
updateSchemaMetadataFunc: updateSchemaMetadataIgnore,
Expand Down Expand Up @@ -407,7 +407,7 @@ var allDDLHandlers = map[model.ActionType]*persistStorageDDLHandler{
buildDDLEventFunc: buildDDLEventForCreateTables,
},
model.ActionMultiSchemaChange: {
buildPersistedDDLEventFunc: buildPersistedDDLEventForNormalDDLOnSingleTable,
buildPersistedDDLEventFunc: buildPersistedDDLEventForMultiSchemaChange,
updateDDLHistoryFunc: updateDDLHistoryForNormalDDLOnSingleTable,
updateFullTableInfoFunc: updateFullTableInfoForSingleTableDDL,
updateSchemaMetadataFunc: updateSchemaMetadataIgnore,
Expand Down Expand Up @@ -673,6 +673,20 @@ func buildPersistedDDLEventForNormalDDLOnSingleTable(args buildPersistedDDLEvent
return event
}

func buildPersistedDDLEventForMultiSchemaChange(args buildPersistedDDLEventFuncArgs) PersistedDDLEvent {
event := buildPersistedDDLEventForNormalDDLOnSingleTable(args)
event.IndexIDs = getIndexIDs(args.job)
return event
}

func buildPersistedDDLEventForAddIndex(args buildPersistedDDLEventFuncArgs) PersistedDDLEvent {
event := buildPersistedDDLEventCommon(args)
event.SchemaName = getSchemaName(args.databaseMap, event.SchemaID)
event.TableName = getTableName(args.tableMap, event.TableID)
event.IndexIDs = getIndexIDs(args.job)
return event
}

func buildPersistedDDLEventForTruncateTable(args buildPersistedDDLEventFuncArgs) PersistedDDLEvent {
event := buildPersistedDDLEventCommon(args)
// only table id change after truncate
Expand Down Expand Up @@ -1831,7 +1845,8 @@ func buildDDLEventCommon(rawEvent *PersistedDDLEvent, tableFilter filter.Filter,
TiDBOnly: tiDBOnly,
BDRMode: rawEvent.BDRRole,

NotSync: notSync,
NotSync: notSync,
IndexIDs: rawEvent.IndexIDs,
}, !filtered, nil
}

Expand Down
4 changes: 4 additions & 0 deletions logservice/schemastore/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ type PersistedDDLEvent struct {
// TODO: do we need the following two fields?
BDRRole string `msg:"bdr_role"`
CDCWriteSource uint64 `msg:"cdc_write_source"`

// IndexIDs store the add index ids in SQL order for add index and multi schema change DDLs.
// MySQL sink uses them to recover anonymous index names.
IndexIDs []int64 `msg:"index_ids"`
}

// TODO: use msgp.Raw to do version management
Expand Down
71 changes: 66 additions & 5 deletions logservice/schemastore/types_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

51 changes: 28 additions & 23 deletions logservice/schemastore/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/log"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/format"
"go.uber.org/zap"
)

Expand All @@ -39,37 +38,19 @@ func transformDDLJobQuery(job *model.Job) (string, error) {
return "", errors.Trace(err)
}
var result string
buildQuery := func(stmt ast.StmtNode) (string, error) {
var sb strings.Builder
// translate TiDB feature to special comment
restoreFlags := format.RestoreTiDBSpecialComment
// escape the keyword
restoreFlags |= format.RestoreNameBackQuotes
// upper case keyword
restoreFlags |= format.RestoreKeyWordUppercase
// wrap string with single quote
restoreFlags |= format.RestoreStringSingleQuotes
// remove placement rule
restoreFlags |= format.SkipPlacementRuleForRestore
// force disable ttl
restoreFlags |= format.RestoreWithTTLEnableOff
if err = stmt.Restore(format.NewRestoreCtx(restoreFlags, &sb)); err != nil {
return "", errors.Trace(err)
}
return sb.String(), nil
}

if len(stmts) > 1 {
results := make([]string, 0, len(stmts))
for _, stmt := range stmts {
query, err := buildQuery(stmt)
query, err := commonEvent.Restore(stmt)
if err != nil {
return "", errors.Trace(err)
}
results = append(results, query)
}
result = strings.Join(results, ";")
} else {
result, err = buildQuery(stmts[0])
result, err = commonEvent.Restore(stmts[0])
if err != nil {
return "", errors.Trace(err)
}
Expand Down Expand Up @@ -98,3 +79,27 @@ func isSplitable(tableInfo *model.TableInfo) bool {
}
return true
}

func getIndexIDs(job *model.Job) []int64 {
res := make([]int64, 0)
idxArgs, err := model.GetModifyIndexArgs(job)
if idxArgs == nil || err != nil {
if job.MultiSchemaInfo == nil {
return res
}
for idx, subJob := range job.MultiSchemaInfo.SubJobs {
proxyJob := subJob.ToProxyJob(job, idx)
idxArgs, err := model.GetModifyIndexArgs(&proxyJob)
if idxArgs != nil && err == nil {
for _, indexArg := range idxArgs.IndexArgs {
res = append(res, indexArg.IndexID)
}
}
}
return res
}
for _, indexArg := range idxArgs.IndexArgs {
res = append(res, indexArg.IndexID)
}
return res
}
98 changes: 98 additions & 0 deletions logservice/schemastore/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"testing"

commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -60,3 +61,100 @@ func TestIsSplitable(t *testing.T) {
tableInfo = helper.GetModelTableInfo(job)
require.False(t, isSplitable(tableInfo))
}

func TestBuildPersistedDDLEventForMultiSchemaChangeContainsIndexIDs(t *testing.T) {
helper := commonEvent.NewEventTestHelper(t)
defer helper.Close()

helper.Tk().MustExec("use test")
helper.DDL2Event("create table t (id int primary key, c1 int)")

job := helper.DDL2Job("alter table t add column c2 int, add index (c1)")
require.Equal(t, model.ActionMultiSchemaChange, job.Type)

args := buildPersistedDDLEventFuncArgs{
job: job,
databaseMap: map[int64]*BasicDatabaseInfo{
job.SchemaID: {
Name: "test",
Tables: map[int64]bool{
job.TableID: true,
},
},
},
tableMap: map[int64]*BasicTableInfo{
job.TableID: {
SchemaID: job.SchemaID,
Name: "t",
},
},
}

event := buildPersistedDDLEventForMultiSchemaChange(args)
expectedIndexIDs := getIndexIDs(job)
require.Len(t, expectedIndexIDs, 1)
require.Equal(t, expectedIndexIDs, event.IndexIDs)
require.Equal(t, "test", event.SchemaName)
require.Equal(t, "t", event.TableName)
}

func TestGetIndexIDsReturnsAllAddIndexIDsInOrder(t *testing.T) {
helper := commonEvent.NewEventTestHelper(t)
defer helper.Close()

helper.Tk().MustExec("use test")
helper.DDL2Event("create table t (id int primary key, c1 int)")

job := helper.DDL2Job("alter table t add index idx_c1(c1), add index (c1)")
tableInfo := helper.GetModelTableInfo(job)
require.NotNil(t, tableInfo)

var namedIndexID int64
var anonymousIndexID int64
for _, index := range tableInfo.Indices {
if index == nil {
continue
}
if index.Name.O == "idx_c1" {
namedIndexID = index.ID
continue
}
if len(index.Columns) == 1 && index.Columns[0].Name.L == "c1" {
anonymousIndexID = index.ID
}
}
require.NotZero(t, namedIndexID)
require.NotZero(t, anonymousIndexID)
require.Equal(t, []int64{namedIndexID, anonymousIndexID}, getIndexIDs(job))
}

func TestGetIndexIDsReturnsAllAddIndexIDsInOrderForMultiSchemaChange(t *testing.T) {
helper := commonEvent.NewEventTestHelper(t)
defer helper.Close()

helper.Tk().MustExec("use test")
helper.DDL2Event("create table t (id int primary key, c1 int)")

job := helper.DDL2Job("alter table t add column c2 int, add index idx_c1(c1), add index (c1)")
require.Equal(t, model.ActionMultiSchemaChange, job.Type)
tableInfo := helper.GetModelTableInfo(job)
require.NotNil(t, tableInfo)

var namedIndexID int64
var anonymousIndexID int64
for _, index := range tableInfo.Indices {
if index == nil {
continue
}
if index.Name.O == "idx_c1" {
namedIndexID = index.ID
continue
}
if len(index.Columns) == 1 && index.Columns[0].Name.L == "c1" {
anonymousIndexID = index.ID
}
}
require.NotZero(t, namedIndexID)
require.NotZero(t, anonymousIndexID)
require.Equal(t, []int64{namedIndexID, anonymousIndexID}, getIndexIDs(job))
}
Loading
Loading