diff --git a/logservice/schemastore/persist_storage_ddl_handlers.go b/logservice/schemastore/persist_storage_ddl_handlers.go index 2b917737d9..591ae4b3e5 100644 --- a/logservice/schemastore/persist_storage_ddl_handlers.go +++ b/logservice/schemastore/persist_storage_ddl_handlers.go @@ -173,7 +173,7 @@ var allDDLHandlers = map[model.ActionType]*persistStorageDDLHandler{ buildDDLEventFunc: buildDDLEventForNormalDDLOnSingleTable, }, model.ActionAddIndex: { - buildPersistedDDLEventFunc: buildPersistedDDLEventForNormalDDLOnSingleTable, + buildPersistedDDLEventFunc: buildPersistedDDLEventForAddIndex, updateDDLHistoryFunc: updateDDLHistoryForNormalDDLOnSingleTable, updateFullTableInfoFunc: updateFullTableInfoForSingleTableDDL, updateSchemaMetadataFunc: updateSchemaMetadataIgnore, @@ -407,7 +407,7 @@ var allDDLHandlers = map[model.ActionType]*persistStorageDDLHandler{ buildDDLEventFunc: buildDDLEventForCreateTables, }, model.ActionMultiSchemaChange: { - buildPersistedDDLEventFunc: buildPersistedDDLEventForNormalDDLOnSingleTable, + buildPersistedDDLEventFunc: buildPersistedDDLEventForMultiSchemaChange, updateDDLHistoryFunc: updateDDLHistoryForNormalDDLOnSingleTable, updateFullTableInfoFunc: updateFullTableInfoForSingleTableDDL, updateSchemaMetadataFunc: updateSchemaMetadataIgnore, @@ -673,6 +673,20 @@ func buildPersistedDDLEventForNormalDDLOnSingleTable(args buildPersistedDDLEvent return event } +func buildPersistedDDLEventForMultiSchemaChange(args buildPersistedDDLEventFuncArgs) PersistedDDLEvent { + event := buildPersistedDDLEventForNormalDDLOnSingleTable(args) + event.IndexIDs = getIndexIDs(args.job) + return event +} + +func buildPersistedDDLEventForAddIndex(args buildPersistedDDLEventFuncArgs) PersistedDDLEvent { + event := buildPersistedDDLEventCommon(args) + event.SchemaName = getSchemaName(args.databaseMap, event.SchemaID) + event.TableName = getTableName(args.tableMap, event.TableID) + event.IndexIDs = getIndexIDs(args.job) + return event +} + func 
buildPersistedDDLEventForTruncateTable(args buildPersistedDDLEventFuncArgs) PersistedDDLEvent { event := buildPersistedDDLEventCommon(args) // only table id change after truncate @@ -1831,7 +1845,8 @@ func buildDDLEventCommon(rawEvent *PersistedDDLEvent, tableFilter filter.Filter, TiDBOnly: tiDBOnly, BDRMode: rawEvent.BDRRole, - NotSync: notSync, + NotSync: notSync, + IndexIDs: rawEvent.IndexIDs, }, !filtered, nil } diff --git a/logservice/schemastore/types.go b/logservice/schemastore/types.go index 5b16ed4eb3..1fbfce210b 100644 --- a/logservice/schemastore/types.go +++ b/logservice/schemastore/types.go @@ -88,6 +88,10 @@ type PersistedDDLEvent struct { // TODO: do we need the following two fields? BDRRole string `msg:"bdr_role"` CDCWriteSource uint64 `msg:"cdc_write_source"` + + // IndexIDs store the add index ids in SQL order for add index and multi schema change DDLs. + // MySQL sink uses them to recover anonymous index names. + IndexIDs []int64 `msg:"index_ids"` } // TODO: use msgp.Raw to do version management diff --git a/logservice/schemastore/types_gen.go b/logservice/schemastore/types_gen.go index d5f5b033f1..316b2e1fc7 100644 --- a/logservice/schemastore/types_gen.go +++ b/logservice/schemastore/types_gen.go @@ -284,6 +284,25 @@ func (z *PersistedDDLEvent) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "CDCWriteSource") return } + case "index_ids": + var zb0010 uint32 + zb0010, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "IndexIDs") + return + } + if cap(z.IndexIDs) >= int(zb0010) { + z.IndexIDs = (z.IndexIDs)[:zb0010] + } else { + z.IndexIDs = make([]int64, zb0010) + } + for za0009 := range z.IndexIDs { + z.IndexIDs[za0009], err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err, "IndexIDs", za0009) + return + } + } default: err = dc.Skip() if err != nil { @@ -297,9 +316,9 @@ func (z *PersistedDDLEvent) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z 
*PersistedDDLEvent) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 26 + // map header, size 27 // write "id" - err = en.Append(0xde, 0x0, 0x1a, 0xa2, 0x69, 0x64) + err = en.Append(0xde, 0x0, 0x1b, 0xa2, 0x69, 0x64) if err != nil { return } @@ -614,15 +633,32 @@ func (z *PersistedDDLEvent) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "CDCWriteSource") return } + // write "index_ids" + err = en.Append(0xa9, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x5f, 0x69, 0x64, 0x73) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(len(z.IndexIDs))) + if err != nil { + err = msgp.WrapError(err, "IndexIDs") + return + } + for za0009 := range z.IndexIDs { + err = en.WriteInt64(z.IndexIDs[za0009]) + if err != nil { + err = msgp.WrapError(err, "IndexIDs", za0009) + return + } + } return } // MarshalMsg implements msgp.Marshaler func (z *PersistedDDLEvent) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 26 + // map header, size 27 // string "id" - o = append(o, 0xde, 0x0, 0x1a, 0xa2, 0x69, 0x64) + o = append(o, 0xde, 0x0, 0x1b, 0xa2, 0x69, 0x64) o = msgp.AppendInt64(o, z.ID) // string "type" o = append(o, 0xa4, 0x74, 0x79, 0x70, 0x65) @@ -723,6 +759,12 @@ func (z *PersistedDDLEvent) MarshalMsg(b []byte) (o []byte, err error) { // string "cdc_write_source" o = append(o, 0xb0, 0x63, 0x64, 0x63, 0x5f, 0x77, 0x72, 0x69, 0x74, 0x65, 0x5f, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65) o = msgp.AppendUint64(o, z.CDCWriteSource) + // string "index_ids" + o = append(o, 0xa9, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x5f, 0x69, 0x64, 0x73) + o = msgp.AppendArrayHeader(o, uint32(len(z.IndexIDs))) + for za0009 := range z.IndexIDs { + o = msgp.AppendInt64(o, z.IndexIDs[za0009]) + } return } @@ -1004,6 +1046,25 @@ func (z *PersistedDDLEvent) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "CDCWriteSource") return } + case "index_ids": + var zb0010 uint32 + zb0010, bts, err = 
msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "IndexIDs") + return + } + if cap(z.IndexIDs) >= int(zb0010) { + z.IndexIDs = (z.IndexIDs)[:zb0010] + } else { + z.IndexIDs = make([]int64, zb0010) + } + for za0009 := range z.IndexIDs { + z.IndexIDs[za0009], bts, err = msgp.ReadInt64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "IndexIDs", za0009) + return + } + } default: bts, err = msgp.Skip(bts) if err != nil { @@ -1034,7 +1095,7 @@ func (z *PersistedDDLEvent) Msgsize() (s int) { for za0008 := range z.MultipleTableInfosValue { s += msgp.BytesPrefixSize + len(z.MultipleTableInfosValue[za0008]) } - s += 9 + msgp.StringPrefixSize + len(z.BDRRole) + 17 + msgp.Uint64Size + s += 9 + msgp.StringPrefixSize + len(z.BDRRole) + 17 + msgp.Uint64Size + 10 + msgp.ArrayHeaderSize + (len(z.IndexIDs) * (msgp.Int64Size)) return } diff --git a/logservice/schemastore/utils.go b/logservice/schemastore/utils.go index 06d27ab785..5dc1f6c644 100644 --- a/logservice/schemastore/utils.go +++ b/logservice/schemastore/utils.go @@ -18,10 +18,9 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" + commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/parser" - "github.com/pingcap/tidb/pkg/parser/ast" - "github.com/pingcap/tidb/pkg/parser/format" "go.uber.org/zap" ) @@ -39,29 +38,11 @@ func transformDDLJobQuery(job *model.Job) (string, error) { return "", errors.Trace(err) } var result string - buildQuery := func(stmt ast.StmtNode) (string, error) { - var sb strings.Builder - // translate TiDB feature to special comment - restoreFlags := format.RestoreTiDBSpecialComment - // escape the keyword - restoreFlags |= format.RestoreNameBackQuotes - // upper case keyword - restoreFlags |= format.RestoreKeyWordUppercase - // wrap string with single quote - restoreFlags |= format.RestoreStringSingleQuotes - // remove placement rule - restoreFlags |= 
format.SkipPlacementRuleForRestore - // force disable ttl - restoreFlags |= format.RestoreWithTTLEnableOff - if err = stmt.Restore(format.NewRestoreCtx(restoreFlags, &sb)); err != nil { - return "", errors.Trace(err) - } - return sb.String(), nil - } + if len(stmts) > 1 { results := make([]string, 0, len(stmts)) for _, stmt := range stmts { - query, err := buildQuery(stmt) + query, err := commonEvent.Restore(stmt) if err != nil { return "", errors.Trace(err) } @@ -69,7 +50,7 @@ func transformDDLJobQuery(job *model.Job) (string, error) { } result = strings.Join(results, ";") } else { - result, err = buildQuery(stmts[0]) + result, err = commonEvent.Restore(stmts[0]) if err != nil { return "", errors.Trace(err) } @@ -98,3 +79,27 @@ func isSplitable(tableInfo *model.TableInfo) bool { } return true } + +func getIndexIDs(job *model.Job) []int64 { + res := make([]int64, 0) + idxArgs, err := model.GetModifyIndexArgs(job) + if idxArgs == nil || err != nil { + if job.MultiSchemaInfo == nil { + return res + } + for idx, subJob := range job.MultiSchemaInfo.SubJobs { + proxyJob := subJob.ToProxyJob(job, idx) + idxArgs, err := model.GetModifyIndexArgs(&proxyJob) + if idxArgs != nil && err == nil { + for _, indexArg := range idxArgs.IndexArgs { + res = append(res, indexArg.IndexID) + } + } + } + return res + } + for _, indexArg := range idxArgs.IndexArgs { + res = append(res, indexArg.IndexID) + } + return res +} diff --git a/logservice/schemastore/utils_test.go b/logservice/schemastore/utils_test.go index 98df7e11c0..8d7d9be4bd 100644 --- a/logservice/schemastore/utils_test.go +++ b/logservice/schemastore/utils_test.go @@ -17,6 +17,7 @@ import ( "testing" commonEvent "github.com/pingcap/ticdc/pkg/common/event" + "github.com/pingcap/tidb/pkg/meta/model" "github.com/stretchr/testify/require" ) @@ -60,3 +61,100 @@ func TestIsSplitable(t *testing.T) { tableInfo = helper.GetModelTableInfo(job) require.False(t, isSplitable(tableInfo)) } + +func 
TestBuildPersistedDDLEventForMultiSchemaChangeContainsIndexIDs(t *testing.T) { + helper := commonEvent.NewEventTestHelper(t) + defer helper.Close() + + helper.Tk().MustExec("use test") + helper.DDL2Event("create table t (id int primary key, c1 int)") + + job := helper.DDL2Job("alter table t add column c2 int, add index (c1)") + require.Equal(t, model.ActionMultiSchemaChange, job.Type) + + args := buildPersistedDDLEventFuncArgs{ + job: job, + databaseMap: map[int64]*BasicDatabaseInfo{ + job.SchemaID: { + Name: "test", + Tables: map[int64]bool{ + job.TableID: true, + }, + }, + }, + tableMap: map[int64]*BasicTableInfo{ + job.TableID: { + SchemaID: job.SchemaID, + Name: "t", + }, + }, + } + + event := buildPersistedDDLEventForMultiSchemaChange(args) + expectedIndexIDs := getIndexIDs(job) + require.Len(t, expectedIndexIDs, 1) + require.Equal(t, expectedIndexIDs, event.IndexIDs) + require.Equal(t, "test", event.SchemaName) + require.Equal(t, "t", event.TableName) +} + +func TestGetIndexIDsReturnsAllAddIndexIDsInOrder(t *testing.T) { + helper := commonEvent.NewEventTestHelper(t) + defer helper.Close() + + helper.Tk().MustExec("use test") + helper.DDL2Event("create table t (id int primary key, c1 int)") + + job := helper.DDL2Job("alter table t add index idx_c1(c1), add index (c1)") + tableInfo := helper.GetModelTableInfo(job) + require.NotNil(t, tableInfo) + + var namedIndexID int64 + var anonymousIndexID int64 + for _, index := range tableInfo.Indices { + if index == nil { + continue + } + if index.Name.O == "idx_c1" { + namedIndexID = index.ID + continue + } + if len(index.Columns) == 1 && index.Columns[0].Name.L == "c1" { + anonymousIndexID = index.ID + } + } + require.NotZero(t, namedIndexID) + require.NotZero(t, anonymousIndexID) + require.Equal(t, []int64{namedIndexID, anonymousIndexID}, getIndexIDs(job)) +} + +func TestGetIndexIDsReturnsAllAddIndexIDsInOrderForMultiSchemaChange(t *testing.T) { + helper := commonEvent.NewEventTestHelper(t) + defer helper.Close() + + 
helper.Tk().MustExec("use test") + helper.DDL2Event("create table t (id int primary key, c1 int)") + + job := helper.DDL2Job("alter table t add column c2 int, add index idx_c1(c1), add index (c1)") + require.Equal(t, model.ActionMultiSchemaChange, job.Type) + tableInfo := helper.GetModelTableInfo(job) + require.NotNil(t, tableInfo) + + var namedIndexID int64 + var anonymousIndexID int64 + for _, index := range tableInfo.Indices { + if index == nil { + continue + } + if index.Name.O == "idx_c1" { + namedIndexID = index.ID + continue + } + if len(index.Columns) == 1 && index.Columns[0].Name.L == "c1" { + anonymousIndexID = index.ID + } + } + require.NotZero(t, namedIndexID) + require.NotZero(t, anonymousIndexID) + require.Equal(t, []int64{namedIndexID, anonymousIndexID}, getIndexIDs(job)) +} diff --git a/pkg/common/event/ddl_event.go b/pkg/common/event/ddl_event.go index ccd96bc621..ffa4bc94b7 100644 --- a/pkg/common/event/ddl_event.go +++ b/pkg/common/event/ddl_event.go @@ -92,7 +92,7 @@ type DDLEvent struct { eventSize int64 `json:"-"` // for simple protocol - IsBootstrap bool `msg:"-"` + IsBootstrap bool `json:"-"` // NotSync is used to indicate whether the event should be synced to downstream. // If it is true, sink should not sync this event to downstream. // It is used for some special DDL events that do not need to be synced, @@ -104,7 +104,11 @@ type DDLEvent struct { // to ensure the new truncated table can be handled correctly. // If the DDL involves multiple tables, this field is not effective. // The multiple table DDL event will be handled by filtering querys and table infos. - NotSync bool `msg:"not_sync"` + NotSync bool `json:"not_sync"` + + // IndexIDs store the add index ids in SQL order for add index and multi schema change DDLs. + // MySQL sink uses them to recover anonymous index names. 
+ IndexIDs []int64 `json:"index_ids"` } func (d *DDLEvent) String() string { @@ -338,7 +342,6 @@ func (t DDLEvent) encodeV1() ([]byte, error) { multipleTableInfosDataSize := make([]byte, 8) binary.BigEndian.PutUint64(multipleTableInfosDataSize, uint64(len(t.MultipleTableInfos))) data = append(data, multipleTableInfosDataSize...) - return data, nil } @@ -348,6 +351,7 @@ func (t *DDLEvent) decodeV1(data []byte) error { end := len(data) multipleTableInfosDataSize := binary.BigEndian.Uint64(data[end-8 : end]) + end -= 8 for i := 0; i < int(multipleTableInfosDataSize); i++ { tableInfoDataSize := binary.BigEndian.Uint64(data[end-8 : end]) tableInfoData := data[end-8-int(tableInfoDataSize) : end-8] @@ -358,7 +362,7 @@ func (t *DDLEvent) decodeV1(data []byte) error { t.MultipleTableInfos = append(t.MultipleTableInfos, info) end -= 8 + int(tableInfoDataSize) } - end -= 8 + int(multipleTableInfosDataSize) + tableInfoDataSize := binary.BigEndian.Uint64(data[end-8 : end]) var err error if tableInfoDataSize > 0 { diff --git a/pkg/common/event/util.go b/pkg/common/event/util.go index 5e2237cfa6..ee61099b60 100644 --- a/pkg/common/event/util.go +++ b/pkg/common/event/util.go @@ -436,6 +436,27 @@ func toTableInfosKey(schema, table string) string { return schema + "." 
+ table } +func Restore(stmt ast.StmtNode) (string, error) { + var sb strings.Builder + // translate TiDB feature to special comment + restoreFlags := format.RestoreTiDBSpecialComment + // escape the keyword + restoreFlags |= format.RestoreNameBackQuotes + // upper case keyword + restoreFlags |= format.RestoreKeyWordUppercase + // wrap string with single quote + restoreFlags |= format.RestoreStringSingleQuotes + // remove placement rule + restoreFlags |= format.SkipPlacementRuleForRestore + // force disable ttl + restoreFlags |= format.RestoreWithTTLEnableOff + err := stmt.Restore(format.NewRestoreCtx(restoreFlags, &sb)) + if err != nil { + return "", errors.Trace(err) + } + return sb.String(), nil +} + // SplitQueries takes a string containing multiple SQL statements and splits them into individual SQL statements. // This function is designed for scenarios like batch creation of tables, where multiple `CREATE TABLE` statements // might be combined into a single query string. @@ -453,31 +474,14 @@ func SplitQueries(queries string) ([]string, error) { var res []string for _, stmt := range stmts { - var sb strings.Builder - // translate TiDB feature to special comment - restoreFlags := format.RestoreTiDBSpecialComment - // escape the keyword - restoreFlags |= format.RestoreNameBackQuotes - // upper case keyword - restoreFlags |= format.RestoreKeyWordUppercase - // wrap string with single quote - restoreFlags |= format.RestoreStringSingleQuotes - // remove placement rule - restoreFlags |= format.SkipPlacementRuleForRestore - // force disable ttl - restoreFlags |= format.RestoreWithTTLEnableOff - err := stmt.Restore(&format.RestoreCtx{ - Flags: restoreFlags, - In: &sb, - }) + query, err := Restore(stmt) if err != nil { return nil, errors.WrapError(errors.ErrTiDBUnexpectedJobMeta, err) } // The (ast.Node).Restore function generates a SQL string representation of the AST (Abstract Syntax Tree) node. 
// By default, the resulting SQL string does not include a trailing semicolon ";". + // Therefore, we explicitly append a semicolon here to ensure the SQL statement is complete. - sb.WriteByte(';') - res = append(res, sb.String()) + res = append(res, fmt.Sprintf("%s;", query)) } return res, nil diff --git a/pkg/sink/mysql/ddl_index_rewrite.go b/pkg/sink/mysql/ddl_index_rewrite.go new file mode 100644 index 0000000000..7223b8337c --- /dev/null +++ b/pkg/sink/mysql/ddl_index_rewrite.go @@ -0,0 +1,120 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +package mysql + +import ( + "github.com/pingcap/errors" + "github.com/pingcap/ticdc/pkg/common" + commonEvent "github.com/pingcap/ticdc/pkg/common/event" + "github.com/pingcap/tidb/pkg/parser" + "github.com/pingcap/tidb/pkg/parser/ast" +) + +func restoreAnonymousIndexToNamedIndex(query string, tableInfo *common.TableInfo, indexIDs []int64) (string, bool, error) { + if query == "" || tableInfo == nil || len(indexIDs) == 0 { + return query, false, nil + } + + p := parser.New() + stmt, err := p.ParseOneStmt(query, "", "") + if err != nil { + return query, false, errors.Trace(err) + } + + alterStmt, ok := stmt.(*ast.AlterTableStmt) + if !ok { + return query, false, nil + } + + indexNameByID := getIndexNameByIDMap(tableInfo) + if len(indexNameByID) == 0 { + return query, false, nil + } + + indexConstraints := make([]*ast.Constraint, 0) + for _, spec := range alterStmt.Specs { + if spec == nil || spec.Tp != ast.AlterTableAddConstraint || spec.Constraint == nil { + continue + } + constraint := spec.Constraint + if !isIndexConstraint(constraint) { + continue + } + indexConstraints = append(indexConstraints, constraint) + } + if len(indexConstraints) == 0 { + return query, false, nil + } + + changed := false + for i, constraint := range indexConstraints { + if i >= len(indexIDs) { + break + } + if constraint.Name != "" { + continue + } + indexName, ok := indexNameByID[indexIDs[i]] + if !ok { + continue + } + constraint.Name = indexName + changed = true + } + + if !changed { + return query, false, nil + } + + restoredQuery, err := commonEvent.Restore(stmt) + if err != nil { + return query, false, err + } + return restoredQuery, true, nil +} + +func getIndexNameByIDMap(tableInfo *common.TableInfo) map[int64]string { + indices := tableInfo.GetIndices() + if len(indices) == 0 { + return nil + } + indexNameByID := make(map[int64]string, len(indices)) + for _, index := range indices { + if index == nil { + continue + } + indexNameByID[index.ID] = index.Name.O + } + return 
indexNameByID +} + +func isIndexConstraint(constraint *ast.Constraint) bool { + if constraint == nil { + return false + } + switch constraint.Tp { + case ast.ConstraintKey, + ast.ConstraintIndex, + ast.ConstraintUniq, + ast.ConstraintUniqKey, + ast.ConstraintUniqIndex, + ast.ConstraintFulltext, + ast.ConstraintVector, + ast.ConstraintColumnar: + return true + default: + return false + } +} diff --git a/pkg/sink/mysql/format_ddl.go b/pkg/sink/mysql/format_ddl.go index 8332d18ff2..8f039cb05a 100644 --- a/pkg/sink/mysql/format_ddl.go +++ b/pkg/sink/mysql/format_ddl.go @@ -14,12 +14,10 @@ package mysql import ( - "bytes" - "github.com/pingcap/log" + commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/tidb/pkg/parser" "github.com/pingcap/tidb/pkg/parser/ast" - "github.com/pingcap/tidb/pkg/parser/format" "github.com/pingcap/tidb/pkg/parser/mysql" "go.uber.org/zap" ) @@ -55,10 +53,9 @@ func formatQuery(sql string) string { } stmt.Accept(&visiter{}) - buf := new(bytes.Buffer) - restoreCtx := format.NewRestoreCtx(format.DefaultRestoreFlags, buf) - if err = stmt.Restore(restoreCtx); err != nil { + query, err := commonEvent.Restore(stmt) + if err != nil { log.Error("format query restore failed", zap.Error(err)) } - return buf.String() + return query } diff --git a/pkg/sink/mysql/mysql_writer_ddl.go b/pkg/sink/mysql/mysql_writer_ddl.go index 67a45a4aeb..06ca263d6a 100644 --- a/pkg/sink/mysql/mysql_writer_ddl.go +++ b/pkg/sink/mysql/mysql_writer_ddl.go @@ -56,6 +56,22 @@ func (w *Writer) execDDL(event *commonEvent.DDLEvent) error { ctx := w.ctx shouldSwitchDB := needSwitchDB(event) + switch event.GetDDLType() { + case timodel.ActionMultiSchemaChange, timodel.ActionAddIndex: + newQuery, changed, err := restoreAnonymousIndexToNamedIndex(event.Query, event.TableInfo, event.IndexIDs) + if err != nil { + log.Warn("failed to restore anonymous index name", + zap.String("changefeed", w.ChangefeedID.String()), + zap.String("query", event.Query), + zap.Error(err)) 
+ } else if changed { + log.Info("restore anonymous index to named index", + zap.String("changefeed", w.ChangefeedID.String()), + zap.String("query", event.Query), + zap.String("newQuery", newQuery)) + event.Query = newQuery + } + } // Convert vector type to string type for unsupport database if w.cfg.HasVectorType { if newQuery := formatQuery(event.Query); newQuery != event.Query { diff --git a/pkg/sink/mysql/mysql_writer_ddl_index_rewrite_test.go b/pkg/sink/mysql/mysql_writer_ddl_index_rewrite_test.go new file mode 100644 index 0000000000..0a2eacf008 --- /dev/null +++ b/pkg/sink/mysql/mysql_writer_ddl_index_rewrite_test.go @@ -0,0 +1,501 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +package mysql + +import ( + "slices" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/pingcap/ticdc/pkg/common" + commonEvent "github.com/pingcap/ticdc/pkg/common/event" + timodel "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tidb/pkg/parser" + "github.com/pingcap/tidb/pkg/parser/ast" + "github.com/stretchr/testify/require" +) + +func disableDistTaskForTest(helper *commonEvent.EventTestHelper) { + helper.Tk().MustExec("set @@global.tidb_enable_dist_task = off") +} + +func getIndexIDsFromJob(t *testing.T, job *timodel.Job) []int64 { + idxArgs, err := timodel.GetModifyIndexArgs(job) + if idxArgs != nil && err == nil { + indexIDs := make([]int64, 0, len(idxArgs.IndexArgs)) + for _, indexArg := range idxArgs.IndexArgs { + indexIDs = append(indexIDs, indexArg.IndexID) + } + return indexIDs + } + + indexIDs := make([]int64, 0) + require.NotNil(t, job.MultiSchemaInfo) + for idx, subJob := range job.MultiSchemaInfo.SubJobs { + proxyJob := subJob.ToProxyJob(job, idx) + subIdxArgs, subErr := timodel.GetModifyIndexArgs(&proxyJob) + if subIdxArgs == nil || subErr != nil { + continue + } + for _, indexArg := range subIdxArgs.IndexArgs { + indexIDs = append(indexIDs, indexArg.IndexID) + } + } + return indexIDs +} + +func getIndexNameByID(t *testing.T, tableInfo *common.TableInfo, indexID int64) string { + for _, index := range tableInfo.GetIndices() { + if index != nil && index.ID == indexID { + return index.Name.O + } + } + require.FailNow(t, "index id not found", "index id: %d", indexID) + return "" +} + +func parseAddIndexConstraintNames(t *testing.T, query string) []string { + p := parser.New() + stmt, err := p.ParseOneStmt(query, "", "") + require.NoError(t, err) + + alterStmt, ok := stmt.(*ast.AlterTableStmt) + require.True(t, ok) + + names := make([]string, 0) + for _, spec := range alterStmt.Specs { + if spec == nil || spec.Tp != ast.AlterTableAddConstraint || spec.Constraint == nil { + continue + } + if 
!isIndexConstraint(spec.Constraint) { + continue + } + names = append(names, spec.Constraint.Name) + } + return names +} + +func expectedAddIndexConstraintNames(t *testing.T, query string, tableInfo *common.TableInfo, indexIDs []int64) []string { + p := parser.New() + stmt, err := p.ParseOneStmt(query, "", "") + require.NoError(t, err) + + alterStmt, ok := stmt.(*ast.AlterTableStmt) + require.True(t, ok) + + names := make([]string, 0) + indexPos := 0 + for _, spec := range alterStmt.Specs { + if spec == nil || spec.Tp != ast.AlterTableAddConstraint || spec.Constraint == nil { + continue + } + if !isIndexConstraint(spec.Constraint) { + continue + } + if spec.Constraint.Name != "" { + names = append(names, spec.Constraint.Name) + } else { + require.Less(t, indexPos, len(indexIDs)) + names = append(names, getIndexNameByID(t, tableInfo, indexIDs[indexPos])) + } + indexPos++ + } + require.Equal(t, indexPos, len(indexIDs)) + return names +} + +func assertRestoreAnonymousIndexToNamedIndex( + t *testing.T, + helper *commonEvent.EventTestHelper, + ddlSQL string, + anonymousQuery string, + expectedChanged bool, + expectedDDLType timodel.ActionType, +) *common.TableInfo { + job := helper.DDL2Job(ddlSQL) + require.Equal(t, expectedDDLType, job.Type) + + tableInfo := helper.GetTableInfo(job) + require.NotNil(t, tableInfo) + + indexIDs := getIndexIDsFromJob(t, job) + restoredQuery, changed, err := restoreAnonymousIndexToNamedIndex(anonymousQuery, tableInfo, indexIDs) + require.NoError(t, err) + require.Equal(t, expectedChanged, changed) + if !expectedChanged { + require.Equal(t, anonymousQuery, restoredQuery) + return tableInfo + } + + require.Equal(t, expectedAddIndexConstraintNames(t, anonymousQuery, tableInfo, indexIDs), parseAddIndexConstraintNames(t, restoredQuery)) + return tableInfo +} + +func getSecondaryIndexNames(tableInfo *common.TableInfo) []string { + indexNames := make([]string, 0) + for _, index := range tableInfo.GetIndices() { + if index == nil || index.Primary 
{ + continue + } + indexNames = append(indexNames, index.Name.O) + } + slices.Sort(indexNames) + return indexNames +} + +func TestExecDDL_RestoreAnonymousIndexToNamedIndex(t *testing.T) { + writer, db, mock := newTestMysqlWriter(t) + defer db.Close() + + helper := commonEvent.NewEventTestHelper(t) + defer helper.Close() + disableDistTaskForTest(helper) + + helper.Tk().MustExec("use test") + helper.DDL2Event("create table t (id int primary key, name varchar(32), index name(id))") + + job := helper.DDL2Job("alter table t add index (name)") + require.Equal(t, timodel.ActionAddIndex, job.Type) + + tableInfo := helper.GetTableInfo(job) + require.NotNil(t, tableInfo) + + indexIDs := getIndexIDsFromJob(t, job) + require.Len(t, indexIDs, 1) + expectedIndexName := getIndexNameByID(t, tableInfo, indexIDs[0]) + + anonymousQuery := "ALTER TABLE `t` ADD INDEX (`name`)" + + restoredQuery, changed, err := restoreAnonymousIndexToNamedIndex(anonymousQuery, tableInfo, indexIDs) + require.NoError(t, err) + require.True(t, changed) + require.Equal(t, []string{expectedIndexName}, parseAddIndexConstraintNames(t, restoredQuery)) + + ddlEvent := &commonEvent.DDLEvent{ + Type: byte(job.Type), + Query: anonymousQuery, + SchemaName: job.SchemaName, + TableName: job.TableName, + TableInfo: tableInfo, + IndexIDs: indexIDs, + } + + mock.ExpectBegin() + mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("SET TIMESTAMP = DEFAULT").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec(restoredQuery).WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + + err = writer.execDDL(ddlEvent) + require.NoError(t, err) + require.NoError(t, mock.ExpectationsWereMet()) +} + +func TestRestoreAnonymousIndexToNamedIndexMultipleAnonymousIndexes(t *testing.T) { + helper := commonEvent.NewEventTestHelper(t) + defer helper.Close() + disableDistTaskForTest(helper) + + helper.Tk().MustExec("use test") + helper.DDL2Event("create table t (id int primary key, 
name varchar(32), age int)") + + job := helper.DDL2Job("alter table t add index (name), add unique (age)") + + tableInfo := helper.GetTableInfo(job) + require.NotNil(t, tableInfo) + + indexIDs := getIndexIDsFromJob(t, job) + require.Len(t, indexIDs, 2) + + expectedNames := []string{ + getIndexNameByID(t, tableInfo, indexIDs[0]), + getIndexNameByID(t, tableInfo, indexIDs[1]), + } + + anonymousQuery := "ALTER TABLE `t` ADD INDEX (`name`), ADD UNIQUE (`age`)" + restoredQuery, changed, err := restoreAnonymousIndexToNamedIndex(anonymousQuery, tableInfo, indexIDs) + require.NoError(t, err) + require.True(t, changed) + require.Equal(t, expectedNames, parseAddIndexConstraintNames(t, restoredQuery)) +} + +func TestRestoreAnonymousIndexToNamedIndexWithNamedAndAnonymousIndexes(t *testing.T) { + helper := commonEvent.NewEventTestHelper(t) + defer helper.Close() + disableDistTaskForTest(helper) + + helper.Tk().MustExec("use test") + helper.DDL2Event("create table t (id int primary key, name varchar(32), age int)") + + job := helper.DDL2Job("alter table t add index idx_name(name), add index (age)") + + tableInfo := helper.GetTableInfo(job) + require.NotNil(t, tableInfo) + indexIDs := getIndexIDsFromJob(t, job) + require.Len(t, indexIDs, 2) + + expectedAnonymousName := "" + for _, index := range tableInfo.GetIndices() { + if index == nil || len(index.Columns) != 1 { + continue + } + if index.Columns[0].Name.L == "age" { + expectedAnonymousName = index.Name.O + break + } + } + require.NotEmpty(t, expectedAnonymousName) + + mixedQuery := "ALTER TABLE `t` ADD INDEX `idx_name` (`name`), ADD INDEX (`age`)" + restoredQuery, changed, err := restoreAnonymousIndexToNamedIndex(mixedQuery, tableInfo, indexIDs) + require.NoError(t, err) + require.True(t, changed) + require.Equal(t, []string{"idx_name", expectedAnonymousName}, parseAddIndexConstraintNames(t, restoredQuery)) + + unchangedQuery, unchanged, err := restoreAnonymousIndexToNamedIndex(mixedQuery, tableInfo, nil) + require.NoError(t, 
err) + require.False(t, unchanged) + require.Equal(t, mixedQuery, unchangedQuery) +} + +func TestExecDDL_RestoreAnonymousIndexToNamedIndexForMultiSchemaChange(t *testing.T) { + writer, db, mock := newTestMysqlWriter(t) + defer db.Close() + + helper := commonEvent.NewEventTestHelper(t) + defer helper.Close() + disableDistTaskForTest(helper) + + helper.Tk().MustExec("use test") + helper.DDL2Event("create table t (id int primary key, name varchar(32))") + + job := helper.DDL2Job("alter table t add column age int, add index (name)") + require.Equal(t, timodel.ActionMultiSchemaChange, job.Type) + + tableInfo := helper.GetTableInfo(job) + require.NotNil(t, tableInfo) + + indexIDs := getIndexIDsFromJob(t, job) + require.Len(t, indexIDs, 1) + expectedIndexName := getIndexNameByID(t, tableInfo, indexIDs[0]) + + anonymousQuery := "ALTER TABLE `t` ADD COLUMN `age` INT, ADD INDEX (`name`)" + restoredQuery, changed, err := restoreAnonymousIndexToNamedIndex(anonymousQuery, tableInfo, indexIDs) + require.NoError(t, err) + require.True(t, changed) + require.Equal(t, []string{expectedIndexName}, parseAddIndexConstraintNames(t, restoredQuery)) + + ddlEvent := &commonEvent.DDLEvent{ + Type: byte(job.Type), + Query: anonymousQuery, + SchemaName: job.SchemaName, + TableName: job.TableName, + TableInfo: tableInfo, + IndexIDs: indexIDs, + } + + mock.ExpectBegin() + mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("SET TIMESTAMP = DEFAULT").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec(restoredQuery).WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + + err = writer.execDDL(ddlEvent) + require.NoError(t, err) + require.NoError(t, mock.ExpectationsWereMet()) +} + +func TestRestoreAnonymousIndexToNamedIndexDDLWaitCases(t *testing.T) { + helper := commonEvent.NewEventTestHelper(t) + defer helper.Close() + disableDistTaskForTest(helper) + + helper.Tk().MustExec("use test") + helper.DDL2Event("create table t_anon_idx (id int 
primary key, a int, b int, c int)") + + testCases := []struct { + name string + ddlSQL string + anonymousQuery string + expectedChanged bool + expectedDDLType timodel.ActionType + }{ + { + name: "single anonymous index", + ddlSQL: "alter table t_anon_idx add index (a)", + anonymousQuery: "ALTER TABLE `t_anon_idx` ADD INDEX (`a`)", + expectedChanged: true, + expectedDDLType: timodel.ActionAddIndex, + }, + { + name: "named and anonymous indexes", + ddlSQL: "alter table t_anon_idx add index idx_b(b), add index (a)", + anonymousQuery: "ALTER TABLE `t_anon_idx` ADD INDEX `idx_b`(`b`), ADD INDEX (`a`)", + expectedChanged: true, + expectedDDLType: timodel.ActionMultiSchemaChange, + }, + { + name: "anonymous index and anonymous unique", + ddlSQL: "alter table t_anon_idx add index (a), add unique (b, c)", + anonymousQuery: "ALTER TABLE `t_anon_idx` ADD INDEX (`a`), ADD UNIQUE (`b`, `c`)", + expectedChanged: true, + expectedDDLType: timodel.ActionMultiSchemaChange, + }, + { + name: "anonymous index before add column", + ddlSQL: "alter table t_anon_idx add index (a), add column d int", + anonymousQuery: "ALTER TABLE `t_anon_idx` ADD INDEX (`a`), ADD COLUMN `d` INT", + expectedChanged: true, + expectedDDLType: timodel.ActionMultiSchemaChange, + }, + { + name: "anonymous index after add column", + ddlSQL: "alter table t_anon_idx add column e int, add index (a)", + anonymousQuery: "ALTER TABLE `t_anon_idx` ADD COLUMN `e` INT, ADD INDEX (`a`)", + expectedChanged: true, + expectedDDLType: timodel.ActionMultiSchemaChange, + }, + { + name: "named index keeps original name", + ddlSQL: "alter table t_anon_idx add index a_7(a)", + anonymousQuery: "ALTER TABLE `t_anon_idx` ADD INDEX `a_7`(`a`)", + expectedChanged: false, + expectedDDLType: timodel.ActionAddIndex, + }, + { + name: "anonymous index after explicit suffix", + ddlSQL: "alter table t_anon_idx add index (a)", + anonymousQuery: "ALTER TABLE `t_anon_idx` ADD INDEX (`a`)", + expectedChanged: true, + expectedDDLType: 
timodel.ActionAddIndex, + }, + { + name: "repeated anonymous index second time", + ddlSQL: "alter table t_anon_idx add index (a)", + anonymousQuery: "ALTER TABLE `t_anon_idx` ADD INDEX (`a`)", + expectedChanged: true, + expectedDDLType: timodel.ActionAddIndex, + }, + { + name: "repeated anonymous index third time", + ddlSQL: "alter table t_anon_idx add index (a)", + anonymousQuery: "ALTER TABLE `t_anon_idx` ADD INDEX (`a`)", + expectedChanged: true, + expectedDDLType: timodel.ActionAddIndex, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + assertRestoreAnonymousIndexToNamedIndex( + t, + helper, + tc.ddlSQL, + tc.anonymousQuery, + tc.expectedChanged, + tc.expectedDDLType, + ) + }) + } +} + +func TestCreateTableLikeKeepsAnonymousIndexNamesAfterDDLWaitCases(t *testing.T) { + helper := commonEvent.NewEventTestHelper(t) + defer helper.Close() + disableDistTaskForTest(helper) + + helper.Tk().MustExec("use test") + helper.DDL2Event("create table t_anon_idx (id int primary key, a int, b int, c int)") + var sourceTableInfo *common.TableInfo + + assertRestoreAnonymousIndexToNamedIndex( + t, + helper, + "alter table t_anon_idx add index (a)", + "ALTER TABLE `t_anon_idx` ADD INDEX (`a`)", + true, + timodel.ActionAddIndex, + ) + assertRestoreAnonymousIndexToNamedIndex( + t, + helper, + "alter table t_anon_idx add index idx_b(b), add index (a)", + "ALTER TABLE `t_anon_idx` ADD INDEX `idx_b`(`b`), ADD INDEX (`a`)", + true, + timodel.ActionMultiSchemaChange, + ) + assertRestoreAnonymousIndexToNamedIndex( + t, + helper, + "alter table t_anon_idx add index (a), add unique (b, c)", + "ALTER TABLE `t_anon_idx` ADD INDEX (`a`), ADD UNIQUE (`b`, `c`)", + true, + timodel.ActionMultiSchemaChange, + ) + assertRestoreAnonymousIndexToNamedIndex( + t, + helper, + "alter table t_anon_idx add index (a), add column d int", + "ALTER TABLE `t_anon_idx` ADD INDEX (`a`), ADD COLUMN `d` INT", + true, + timodel.ActionMultiSchemaChange, + ) + sourceTableInfo = 
assertRestoreAnonymousIndexToNamedIndex( + t, + helper, + "alter table t_anon_idx add column e int, add index (a)", + "ALTER TABLE `t_anon_idx` ADD COLUMN `e` INT, ADD INDEX (`a`)", + true, + timodel.ActionMultiSchemaChange, + ) + assertRestoreAnonymousIndexToNamedIndex( + t, + helper, + "alter table t_anon_idx add index a_7(a)", + "ALTER TABLE `t_anon_idx` ADD INDEX `a_7`(`a`)", + false, + timodel.ActionAddIndex, + ) + sourceTableInfo = assertRestoreAnonymousIndexToNamedIndex( + t, + helper, + "alter table t_anon_idx add index (a)", + "ALTER TABLE `t_anon_idx` ADD INDEX (`a`)", + true, + timodel.ActionAddIndex, + ) + sourceTableInfo = assertRestoreAnonymousIndexToNamedIndex( + t, + helper, + "alter table t_anon_idx add index (a)", + "ALTER TABLE `t_anon_idx` ADD INDEX (`a`)", + true, + timodel.ActionAddIndex, + ) + sourceTableInfo = assertRestoreAnonymousIndexToNamedIndex( + t, + helper, + "alter table t_anon_idx add index (a)", + "ALTER TABLE `t_anon_idx` ADD INDEX (`a`)", + true, + timodel.ActionAddIndex, + ) + + likeJob := helper.DDL2Job("create table t_anon_idx_like like t_anon_idx") + likeTableInfo := helper.GetTableInfo(likeJob) + require.NotNil(t, sourceTableInfo) + require.NotNil(t, likeTableInfo) + require.Equal(t, getSecondaryIndexNames(sourceTableInfo), getSecondaryIndexNames(likeTableInfo)) +} diff --git a/tests/integration_tests/ddl_wait/run.sh b/tests/integration_tests/ddl_wait/run.sh index e566b97c27..ef1f281196 100755 --- a/tests/integration_tests/ddl_wait/run.sh +++ b/tests/integration_tests/ddl_wait/run.sh @@ -8,6 +8,21 @@ WORK_DIR=$OUT_DIR/$TEST_NAME CDC_BINARY=cdc.test SINK_TYPE=$1 +function check_downstream_indexes_match_upstream() { + schema_name=$1 + table_name=$2 + index_names=$(mysql -uroot -h${UP_TIDB_HOST} -P${UP_TIDB_PORT} --default-character-set utf8mb4 -N \ + -e "SELECT DISTINCT INDEX_NAME FROM INFORMATION_SCHEMA.STATISTICS WHERE TABLE_SCHEMA='${schema_name}' AND TABLE_NAME='${table_name}' ORDER BY INDEX_NAME;") + + while IFS= read -r 
index_name; do + if [ -z "$index_name" ]; then + continue + fi + run_sql "SHOW INDEX FROM \`${schema_name}\`.\`${table_name}\` WHERE Key_name='${index_name}';" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_contains "Key_name: ${index_name}" + done <<<"${index_names}" +} + # This test simulates DDL operations that take a long time. # TiCDC blocks DDL operations until its state is not running, except for adding indexes. # TiCDC also checks add index ddl state before execute a new DDL. @@ -62,17 +77,39 @@ function run() { # indexes should be the same when CDC retries happened # ref: https://github.com/pingcap/tiflow/issues/12128 - # FIXME: use named index to avoid duplicate index - # run_sql "update test.t set col = 55 where id = 5;" - # run_sql "alter table test.t add index (col);" - # run_sql "update test.t set col = 66 where id = 6;" - # run_sql "alter table test.t add index (col);" - # run_sql "update test.t set col = 77 where id = 7;" - # sleep 10 - # cleanup_process $CDC_BINARY - # run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY - # # make sure all tables are equal in upstream and downstream - # check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 300 + run_sql "update test.t set col = 55 where id = 5;" + run_sql "alter table test.t add index (col);" + run_sql "update test.t set col = 66 where id = 6;" + run_sql "alter table test.t add index (col);" + run_sql "update test.t set col = 77 where id = 7;" + sleep 10 + cleanup_process $CDC_BINARY + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + # make sure all tables are equal in upstream and downstream + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 300 + + # anonymous add index related ddl + run_sql "create table test.t_anon_idx (id int primary key, a int, b int, c int);" + run_sql "insert into test.t_anon_idx values (1, 10, 20, 30), (2, 11, 21, 31), (3, 12, 22, 32);" + run_sql "alter table test.t_anon_idx add index (a);" + run_sql "alter table test.t_anon_idx add index idx_b (b), add index 
(a);" + run_sql "alter table test.t_anon_idx add index (a), add unique (b, c);" + run_sql "alter table test.t_anon_idx add index (a), add column d int;" + run_sql "alter table test.t_anon_idx add column e int, add index (a);" + run_sql "alter table test.t_anon_idx drop column d, drop column e;" + run_sql "alter table test.t_anon_idx add index a_7(a);" + run_sql "alter table test.t_anon_idx add index (a);" + run_sql "alter table test.t_anon_idx add index (a);" + run_sql "alter table test.t_anon_idx add index (a);" + run_sql "create table test.t_anon_idx_like like test.t_anon_idx;" + run_sql "insert into test.t_anon_idx values (4, 13, 23, 33);" + check_table_exists test.t_anon_idx ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300 + check_table_exists test.t_anon_idx_like ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300 + check_downstream_indexes_match_upstream test t_anon_idx + check_downstream_indexes_match_upstream test t_anon_idx_like + + # ensure both data and index schema are eventually consistent after anonymous index ddl + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 300 cleanup_process $CDC_BINARY }