From 00532baef5eb55bfa16c54ca6f4fb8b84eb75599 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Wed, 25 Feb 2026 10:35:15 +0000 Subject: [PATCH 01/13] init Signed-off-by: wk989898 --- pkg/sink/mysql/ddl_index_rewrite.go | 185 ++++++++++++++++++ pkg/sink/mysql/mysql_writer_ddl.go | 15 ++ .../mysql_writer_ddl_index_rewrite_test.go | 83 ++++++++ tests/integration_tests/ddl_wait/run.sh | 21 +- 4 files changed, 293 insertions(+), 11 deletions(-) create mode 100644 pkg/sink/mysql/ddl_index_rewrite.go create mode 100644 pkg/sink/mysql/mysql_writer_ddl_index_rewrite_test.go diff --git a/pkg/sink/mysql/ddl_index_rewrite.go b/pkg/sink/mysql/ddl_index_rewrite.go new file mode 100644 index 0000000000..b5277e1317 --- /dev/null +++ b/pkg/sink/mysql/ddl_index_rewrite.go @@ -0,0 +1,185 @@ +package mysql + +import ( + "strings" + + "github.com/pingcap/errors" + "github.com/pingcap/ticdc/pkg/common" + timodel "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tidb/pkg/parser" + "github.com/pingcap/tidb/pkg/parser/ast" + "github.com/pingcap/tidb/pkg/parser/format" +) + +type indexKeyPart struct { + nameL string + length int +} + +func restoreAnonymousIndexToNamedIndex(query string, tableInfo *common.TableInfo) (string, bool, error) { + if query == "" || tableInfo == nil { + return query, false, nil + } + + p := parser.New() + stmt, err := p.ParseOneStmt(query, "", "") + if err != nil { + return query, false, errors.Trace(err) + } + + alterStmt, ok := stmt.(*ast.AlterTableStmt) + if !ok { + return query, false, nil + } + + changed := false + for _, spec := range alterStmt.Specs { + if spec == nil || spec.Tp != ast.AlterTableAddConstraint || spec.Constraint == nil { + continue + } + constraint := spec.Constraint + if constraint.Name != "" { + continue + } + if !isIndexConstraint(constraint) { + continue + } + + indexName, ok := findIndexNameForConstraint(tableInfo, constraint) + if !ok { + continue + } + constraint.Name = indexName + changed = true + } + + if !changed { + return query, false, nil + } + + restoredQuery, err := restoreDDLStmt(stmt) + if err != nil { + return query, false, err + } + return restoredQuery, true, nil +} + +func isIndexConstraint(constraint *ast.Constraint) bool { + if constraint == nil { + return false + } + switch constraint.Tp { + case ast.ConstraintKey, + ast.ConstraintIndex, + ast.ConstraintUniq, + ast.ConstraintUniqKey, + ast.ConstraintUniqIndex, + ast.ConstraintFulltext, + ast.ConstraintVector, + ast.ConstraintColumnar: + return true + default: + return false + } +} + +func isUniqueIndexConstraint(constraint *ast.Constraint) bool { + if constraint == nil { + return false + } + switch constraint.Tp { + case ast.ConstraintUniq, ast.ConstraintUniqKey, ast.ConstraintUniqIndex: + return true + default: + return false + } +} + +func findIndexNameForConstraint(tableInfo *common.TableInfo, constraint *ast.Constraint) (string, bool) { + keyParts, ok := getConstraintKeyParts(constraint) + if !ok { + return "", false + } + + wantUnique := isUniqueIndexConstraint(constraint) + indices := tableInfo.GetIndices() + if len(indices) == 0 { + return "", false + } + + matches := make([]*timodel.IndexInfo, 0, 1) + for _, index := range indices { + if index == nil || index.Primary { + continue + } + if index.Unique != wantUnique { + continue + } + // For `ADD INDEX` jobs, TableInfo may contain indices in non-public states. + // Only use public indices to avoid selecting transient metadata. + if index.State != timodel.StatePublic { + continue + } + if !indexColumnsMatchKeyParts(index.Columns, keyParts) { + continue + } + matches = append(matches, index) + } + if len(matches) != 1 { + return "", false + } + return matches[0].Name.O, true +} + +func getConstraintKeyParts(constraint *ast.Constraint) ([]indexKeyPart, bool) { + if constraint == nil || len(constraint.Keys) == 0 { + return nil, false + } + + parts := make([]indexKeyPart, 0, len(constraint.Keys)) + for _, key := range constraint.Keys { + if key == nil || key.Expr != nil || key.Column == nil { + return nil, false + } + parts = append(parts, indexKeyPart{ + nameL: key.Column.Name.L, + length: key.Length, + }) + } + return parts, true +} + +func indexColumnsMatchKeyParts(indexColumns []*timodel.IndexColumn, keyParts []indexKeyPart) bool { + if len(indexColumns) != len(keyParts) { + return false + } + for i, part := range keyParts { + col := indexColumns[i] + if col == nil { + return false + } + if col.Name.L != part.nameL { + return false + } + if part.length > 0 { + if col.Length != part.length { + return false + } + } + } + return true +} + +func restoreDDLStmt(stmt ast.StmtNode) (string, error) { + var sb strings.Builder + restoreFlags := format.RestoreTiDBSpecialComment | + format.RestoreNameBackQuotes | + format.RestoreKeyWordUppercase | + format.RestoreStringSingleQuotes | + format.SkipPlacementRuleForRestore | + format.RestoreWithTTLEnableOff + if err := stmt.Restore(format.NewRestoreCtx(restoreFlags, &sb)); err != nil { + return "", errors.Trace(err) + } + return sb.String(), nil +} diff --git a/pkg/sink/mysql/mysql_writer_ddl.go b/pkg/sink/mysql/mysql_writer_ddl.go index 67a45a4aeb..486af2f6a8 100644 --- a/pkg/sink/mysql/mysql_writer_ddl.go +++ b/pkg/sink/mysql/mysql_writer_ddl.go @@ -56,6 +56,21 @@ func (w *Writer) execDDL(event *commonEvent.DDLEvent) error { ctx := w.ctx shouldSwitchDB := needSwitchDB(event) + if event.GetDDLType() == timodel.ActionAddIndex { + newQuery, changed, err := restoreAnonymousIndexToNamedIndex(event.Query, event.TableInfo) + if err != nil { + log.Warn("failed to restore anonymous index name", + zap.String("changefeed", w.ChangefeedID.String()), + zap.String("query", event.Query), + zap.Error(err)) + } else if changed { + log.Info("restore anonymous index to named index", + zap.String("changefeed", w.ChangefeedID.String()), + zap.String("query", event.Query), + zap.String("newQuery", newQuery)) + event.Query = newQuery + } + } // Convert vector type to string type for unsupport database if w.cfg.HasVectorType { if newQuery := formatQuery(event.Query); newQuery != event.Query { diff --git a/pkg/sink/mysql/mysql_writer_ddl_index_rewrite_test.go b/pkg/sink/mysql/mysql_writer_ddl_index_rewrite_test.go new file mode 100644 index 0000000000..8e7985bf69 --- /dev/null +++ b/pkg/sink/mysql/mysql_writer_ddl_index_rewrite_test.go @@ -0,0 +1,83 @@ +package mysql + +import ( + "testing" + + "github.com/DATA-DOG/go-sqlmock" + commonEvent "github.com/pingcap/ticdc/pkg/common/event" + timodel "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tidb/pkg/parser" + "github.com/pingcap/tidb/pkg/parser/ast" + "github.com/stretchr/testify/require" +) + +func TestExecDDL_RestoreAnonymousIndexToNamedIndex(t *testing.T) { + writer, db, mock := newTestMysqlWriter(t) + defer db.Close() + + helper := commonEvent.NewEventTestHelper(t) + defer helper.Close() + + helper.Tk().MustExec("use test") + helper.DDL2Event("create table t (id int primary key, name varchar(32), index name(id))") + + job := helper.DDL2Job("alter table t add index (name)") + require.Equal(t, timodel.ActionAddIndex, job.Type) + + tableInfo := helper.GetTableInfo(job) + require.NotNil(t, tableInfo) + + expectedIndexName := "" + for _, index := range tableInfo.GetIndices() { + if index == nil || index.Primary || index.Unique { + continue + } + if len(index.Columns) != 1 { + continue + } + if index.Columns[0].Name.L == "name" { + expectedIndexName = index.Name.O + break + } + } + require.NotEmpty(t, expectedIndexName) + + anonymousQuery := "ALTER TABLE `t` ADD INDEX (`name`)" + + restoredQuery, changed, err := restoreAnonymousIndexToNamedIndex(anonymousQuery, tableInfo) + require.NoError(t, err) + require.True(t, changed) + + p := parser.New() + stmt, err := p.ParseOneStmt(restoredQuery, "", "") + require.NoError(t, err) + alterStmt, ok := stmt.(*ast.AlterTableStmt) + require.True(t, ok) + + restoredName := "" + for _, spec := range alterStmt.Specs { + if spec != nil && spec.Tp == ast.AlterTableAddConstraint && spec.Constraint != nil { + restoredName = spec.Constraint.Name + break + } + } + require.Equal(t, expectedIndexName, restoredName) + + ddlEvent := &commonEvent.DDLEvent{ + Type: byte(job.Type), + Query: anonymousQuery, + SchemaName: job.SchemaName, + TableName: job.TableName, + TableInfo: tableInfo, + } + + mock.ExpectBegin() + mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("SET TIMESTAMP = DEFAULT").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec(restoredQuery).WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + + err = writer.execDDL(ddlEvent) + require.NoError(t, err) + require.NoError(t, mock.ExpectationsWereMet()) +} diff --git a/tests/integration_tests/ddl_wait/run.sh b/tests/integration_tests/ddl_wait/run.sh index e566b97c27..977d7c26d3 100755 --- a/tests/integration_tests/ddl_wait/run.sh +++ b/tests/integration_tests/ddl_wait/run.sh @@ -62,17 +62,16 @@ function run() { # indexes should be the same when CDC retries happened # ref: https://github.com/pingcap/tiflow/issues/12128 - # FIXME: use named index to avoid duplicate index - # run_sql "update test.t set col = 55 where id = 5;" - # run_sql "alter table test.t add index (col);" - # run_sql "update test.t set col = 66 where id = 6;" - # run_sql "alter table test.t add index (col);" - # run_sql "update test.t set col = 77 where id = 7;" - # sleep 10 - # cleanup_process $CDC_BINARY - # run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY - # # make sure all tables are equal in upstream and downstream - # check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 300 + run_sql "update test.t set col = 55 where id = 5;" + run_sql "alter table test.t add index (col);" + run_sql "update test.t set col = 66 where id = 6;" + run_sql "alter table test.t add index (col);" + run_sql "update test.t set col = 77 where id = 7;" + sleep 10 + cleanup_process $CDC_BINARY + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + # make sure all tables are equal in upstream and downstream + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 300 cleanup_process $CDC_BINARY } From 9ee473123164ba8bf06a51134cf7a252582055ae Mon Sep 17 00:00:00 2001 From: wk989898 Date: Wed, 25 Feb 2026 10:38:34 +0000 Subject: [PATCH 02/13] chore Signed-off-by: wk989898 --- pkg/sink/mysql/ddl_index_rewrite.go | 13 +++++++++++++ .../mysql/mysql_writer_ddl_index_rewrite_test.go | 13 +++++++++++++ 2 files changed, 26 insertions(+) diff --git a/pkg/sink/mysql/ddl_index_rewrite.go b/pkg/sink/mysql/ddl_index_rewrite.go index b5277e1317..ba38e4c2c3 100644 --- a/pkg/sink/mysql/ddl_index_rewrite.go +++ b/pkg/sink/mysql/ddl_index_rewrite.go @@ -1,3 +1,16 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + package mysql import ( diff --git a/pkg/sink/mysql/mysql_writer_ddl_index_rewrite_test.go b/pkg/sink/mysql/mysql_writer_ddl_index_rewrite_test.go index 8e7985bf69..1526c9ccb4 100644 --- a/pkg/sink/mysql/mysql_writer_ddl_index_rewrite_test.go +++ b/pkg/sink/mysql/mysql_writer_ddl_index_rewrite_test.go @@ -1,3 +1,16 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + package mysql import ( From 4b9ea9416d7254cc86a2a9f46c3970156a4a8de9 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Wed, 4 Mar 2026 04:02:20 +0000 Subject: [PATCH 03/13] update Signed-off-by: wk989898 --- pkg/sink/mysql/ddl_index_rewrite.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/sink/mysql/ddl_index_rewrite.go b/pkg/sink/mysql/ddl_index_rewrite.go index ba38e4c2c3..61f683dcc2 100644 --- a/pkg/sink/mysql/ddl_index_rewrite.go +++ b/pkg/sink/mysql/ddl_index_rewrite.go @@ -128,9 +128,8 @@ func findIndexNameForConstraint(tableInfo *common.TableInfo, constraint *ast.Con if index.Unique != wantUnique { continue } - // For `ADD INDEX` jobs, TableInfo may contain indices in non-public states. - // Only use public indices to avoid selecting transient metadata. - if index.State != timodel.StatePublic { + // Only use non-public indices. + if index.State == timodel.StatePublic { continue } if !indexColumnsMatchKeyParts(index.Columns, keyParts) { From d3f3925e82d97888886612a19041f7914541d5a7 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Wed, 4 Mar 2026 04:13:57 +0000 Subject: [PATCH 04/13] update Signed-off-by: wk989898 --- logservice/schemastore/utils.go | 27 +++---------------- pkg/common/event/util.go | 42 ++++++++++++++++------------- pkg/sink/mysql/ddl_index_rewrite.go | 20 ++------------ pkg/sink/mysql/format_ddl.go | 11 +++----- 4 files changed, 33 insertions(+), 67 deletions(-) diff --git a/logservice/schemastore/utils.go b/logservice/schemastore/utils.go index 06d27ab785..43c3c20446 100644 --- a/logservice/schemastore/utils.go +++ b/logservice/schemastore/utils.go @@ -18,10 +18,9 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" + commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/parser" - "github.com/pingcap/tidb/pkg/parser/ast" - "github.com/pingcap/tidb/pkg/parser/format" "go.uber.org/zap" ) @@ -39,29 +38,11 @@ func transformDDLJobQuery(job *model.Job) (string, error) { return "", errors.Trace(err) } var result string - buildQuery := func(stmt ast.StmtNode) (string, error) { - var sb strings.Builder - // translate TiDB feature to special comment - restoreFlags := format.RestoreTiDBSpecialComment - // escape the keyword - restoreFlags |= format.RestoreNameBackQuotes - // upper case keyword - restoreFlags |= format.RestoreKeyWordUppercase - // wrap string with single quote - restoreFlags |= format.RestoreStringSingleQuotes - // remove placement rule - restoreFlags |= format.SkipPlacementRuleForRestore - // force disable ttl - restoreFlags |= format.RestoreWithTTLEnableOff - if err = stmt.Restore(format.NewRestoreCtx(restoreFlags, &sb)); err != nil { - return "", errors.Trace(err) - } - return sb.String(), nil - } + if len(stmts) > 1 { results := make([]string, 0, len(stmts)) for _, stmt := range stmts { - query, err := buildQuery(stmt) + query, err := commonEvent.Restore(stmt) if err != nil { return "", errors.Trace(err) } @@ -69,7 +50,7 @@ func transformDDLJobQuery(job *model.Job) (string, error) { } result = strings.Join(results, ";") } else { - result, err = buildQuery(stmts[0]) + result, err = commonEvent.Restore(stmts[0]) if err != nil { return "", errors.Trace(err) } diff --git a/pkg/common/event/util.go b/pkg/common/event/util.go index 5e2237cfa6..ee61099b60 100644 --- a/pkg/common/event/util.go +++ b/pkg/common/event/util.go @@ -436,6 +436,27 @@ func toTableInfosKey(schema, table string) string { return schema + "." + table } +func Restore(stmt ast.StmtNode) (string, error) { + var sb strings.Builder + // translate TiDB feature to special comment + restoreFlags := format.RestoreTiDBSpecialComment + // escape the keyword + restoreFlags |= format.RestoreNameBackQuotes + // upper case keyword + restoreFlags |= format.RestoreKeyWordUppercase + // wrap string with single quote + restoreFlags |= format.RestoreStringSingleQuotes + // remove placement rule + restoreFlags |= format.SkipPlacementRuleForRestore + // force disable ttl + restoreFlags |= format.RestoreWithTTLEnableOff + err := stmt.Restore(format.NewRestoreCtx(restoreFlags, &sb)) + if err != nil { + return "", errors.Trace(err) + } + return sb.String(), nil +} + // SplitQueries takes a string containing multiple SQL statements and splits them into individual SQL statements. // This function is designed for scenarios like batch creation of tables, where multiple `CREATE TABLE` statements // might be combined into a single query string. @@ -453,31 +474,14 @@ func SplitQueries(queries string) ([]string, error) { var res []string for _, stmt := range stmts { - var sb strings.Builder - // translate TiDB feature to special comment - restoreFlags := format.RestoreTiDBSpecialComment - // escape the keyword - restoreFlags |= format.RestoreNameBackQuotes - // upper case keyword - restoreFlags |= format.RestoreKeyWordUppercase - // wrap string with single quote - restoreFlags |= format.RestoreStringSingleQuotes - // remove placement rule - restoreFlags |= format.SkipPlacementRuleForRestore - // force disable ttl - restoreFlags |= format.RestoreWithTTLEnableOff - err := stmt.Restore(&format.RestoreCtx{ - Flags: restoreFlags, - In: &sb, - }) + query, err := Restore(stmt) if err != nil { return nil, errors.WrapError(errors.ErrTiDBUnexpectedJobMeta, err) } // The (ast.Node).Restore function generates a SQL string representation of the AST (Abstract Syntax Tree) node. // By default, the resulting SQL string does not include a trailing semicolon ";". // Therefore, we explicitly append a semicolon here to ensure the SQL statement is complete. - sb.WriteByte(';') - res = append(res, sb.String()) + res = append(res, fmt.Sprintf("%s;", query)) } return res, nil diff --git a/pkg/sink/mysql/ddl_index_rewrite.go b/pkg/sink/mysql/ddl_index_rewrite.go index 61f683dcc2..3e187d61c5 100644 --- a/pkg/sink/mysql/ddl_index_rewrite.go +++ b/pkg/sink/mysql/ddl_index_rewrite.go @@ -14,14 +14,12 @@ package mysql import ( - "strings" - "github.com/pingcap/errors" "github.com/pingcap/ticdc/pkg/common" + commonEvent "github.com/pingcap/ticdc/pkg/common/event" timodel "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/parser" "github.com/pingcap/tidb/pkg/parser/ast" - "github.com/pingcap/tidb/pkg/parser/format" ) type indexKeyPart struct { @@ -70,7 +68,7 @@ func restoreAnonymousIndexToNamedIndex(query string, tableInfo *common.TableInfo return query, false, nil } - restoredQuery, err := restoreDDLStmt(stmt) + restoredQuery, err := commonEvent.Restore(stmt) if err != nil { return query, false, err } @@ -181,17 +179,3 @@ func indexColumnsMatchKeyParts(indexColumns []*timodel.IndexColumn, keyParts []i } return true } - -func restoreDDLStmt(stmt ast.StmtNode) (string, error) { - var sb strings.Builder - restoreFlags := format.RestoreTiDBSpecialComment | - format.RestoreNameBackQuotes | - format.RestoreKeyWordUppercase | - format.RestoreStringSingleQuotes | - format.SkipPlacementRuleForRestore | - format.RestoreWithTTLEnableOff - if err := stmt.Restore(format.NewRestoreCtx(restoreFlags, &sb)); err != nil { - return "", errors.Trace(err) - } - return sb.String(), nil -} diff --git a/pkg/sink/mysql/format_ddl.go b/pkg/sink/mysql/format_ddl.go index 8332d18ff2..8f039cb05a 100644 --- a/pkg/sink/mysql/format_ddl.go +++ b/pkg/sink/mysql/format_ddl.go @@ -14,12 +14,10 @@ package mysql import ( - "bytes" - "github.com/pingcap/log" + commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/tidb/pkg/parser" "github.com/pingcap/tidb/pkg/parser/ast" - "github.com/pingcap/tidb/pkg/parser/format" "github.com/pingcap/tidb/pkg/parser/mysql" "go.uber.org/zap" ) @@ -55,10 +53,9 @@ func formatQuery(sql string) string { } stmt.Accept(&visiter{}) - buf := new(bytes.Buffer) - restoreCtx := format.NewRestoreCtx(format.DefaultRestoreFlags, buf) - if err = stmt.Restore(restoreCtx); err != nil { + query, err := commonEvent.Restore(stmt) + if err != nil { log.Error("format query restore failed", zap.Error(err)) } - return buf.String() + return query } From 8450f9887c8c575de03709cdf4bd2252a9b17635 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Mon, 9 Mar 2026 10:44:47 +0000 Subject: [PATCH 05/13] update Signed-off-by: wk989898 --- .../persist_storage_ddl_handlers.go | 14 +- logservice/schemastore/types.go | 4 + logservice/schemastore/utils.go | 21 +++ pkg/common/event/ddl_event.go | 4 + pkg/sink/mysql/ddl_index_rewrite.go | 123 +++----------- pkg/sink/mysql/mysql_writer_ddl.go | 2 +- .../mysql_writer_ddl_index_rewrite_test.go | 159 ++++++++++++++---- 7 files changed, 199 insertions(+), 128 deletions(-) diff --git a/logservice/schemastore/persist_storage_ddl_handlers.go b/logservice/schemastore/persist_storage_ddl_handlers.go index 7296c2d550..a0abdd0b27 100644 --- a/logservice/schemastore/persist_storage_ddl_handlers.go +++ b/logservice/schemastore/persist_storage_ddl_handlers.go @@ -173,7 +173,7 @@ var allDDLHandlers = map[model.ActionType]*persistStorageDDLHandler{ buildDDLEventFunc: buildDDLEventForNormalDDLOnSingleTable, }, model.ActionAddIndex: { - buildPersistedDDLEventFunc: buildPersistedDDLEventForNormalDDLOnSingleTable, + buildPersistedDDLEventFunc: buildPersistedDDLEventForAddIndex, updateDDLHistoryFunc: updateDDLHistoryForNormalDDLOnSingleTable, updateFullTableInfoFunc: updateFullTableInfoForSingleTableDDL, updateSchemaMetadataFunc: updateSchemaMetadataIgnore, @@ -673,6 +673,15 @@ func buildPersistedDDLEventForNormalDDLOnSingleTable(args buildPersistedDDLEvent return event } +func buildPersistedDDLEventForAddIndex(args buildPersistedDDLEventFuncArgs) PersistedDDLEvent { + event := buildPersistedDDLEventCommon(args) + event.SchemaName = getSchemaName(args.databaseMap, event.SchemaID) + event.TableName = getTableName(args.tableMap, event.TableID) + indexIDs := getIndexIDs(args.job) + event.IndexIDs = indexIDs + return event +} + func buildPersistedDDLEventForTruncateTable(args buildPersistedDDLEventFuncArgs) PersistedDDLEvent { event := buildPersistedDDLEventCommon(args) // only table id change after truncate @@ -1743,7 +1752,8 @@ func buildDDLEventCommon(rawEvent *PersistedDDLEvent, tableFilter filter.Filter, TiDBOnly: tiDBOnly, BDRMode: rawEvent.BDRRole, - NotSync: notSync, + NotSync: notSync, + IndexIDs: rawEvent.IndexIDs, }, !filtered, nil } diff --git a/logservice/schemastore/types.go b/logservice/schemastore/types.go index 5b16ed4eb3..6b776eb28e 100644 --- a/logservice/schemastore/types.go +++ b/logservice/schemastore/types.go @@ -88,6 +88,10 @@ type PersistedDDLEvent struct { // TODO: do we need the following two fields? BDRRole string `msg:"bdr_role"` CDCWriteSource uint64 `msg:"cdc_write_source"` + + // IndexIDs store the index ids that are related to the ddl job, only used for add index. + // use these id to recover the index name for the anonymous add index + IndexIDs []int64 `msg:"index_ids"` } // TODO: use msgp.Raw to do version management diff --git a/logservice/schemastore/utils.go b/logservice/schemastore/utils.go index 43c3c20446..de9bfe72f1 100644 --- a/logservice/schemastore/utils.go +++ b/logservice/schemastore/utils.go @@ -79,3 +79,24 @@ func isSplitable(tableInfo *model.TableInfo) bool { } return true } + +func getIndexIDs(job *model.Job) []int64 { + res := make([]int64, 0) + idxArgs, err := model.GetModifyIndexArgs(job) + if idxArgs == nil || err != nil { + for idx, subJob := range job.MultiSchemaInfo.SubJobs { + proxyJob := subJob.ToProxyJob(job, idx) + idxArgs, err := model.GetModifyIndexArgs(&proxyJob) + if idxArgs != nil && err == nil { + for _, indexArg := range idxArgs.IndexArgs { + res = append(res, indexArg.IndexID) + } + } + } + return res + } + for _, indexArg := range idxArgs.IndexArgs { + res = append(res, indexArg.IndexID) + } + return res +} diff --git a/pkg/common/event/ddl_event.go b/pkg/common/event/ddl_event.go index ccd96bc621..dec0b1dd47 100644 --- a/pkg/common/event/ddl_event.go +++ b/pkg/common/event/ddl_event.go @@ -105,6 +105,10 @@ type DDLEvent struct { // If the DDL involves multiple tables, this field is not effective. // The multiple table DDL event will be handled by filtering querys and table infos. NotSync bool `msg:"not_sync"` + + // IndexIDs store the index ids that are related to the ddl job, only used for add index. + // use these id to recover the index name for the anonymous add index + IndexIDs []int64 `msg:"index_ids"` } func (d *DDLEvent) String() string { diff --git a/pkg/sink/mysql/ddl_index_rewrite.go b/pkg/sink/mysql/ddl_index_rewrite.go index 3e187d61c5..24937012a6 100644 --- a/pkg/sink/mysql/ddl_index_rewrite.go +++ b/pkg/sink/mysql/ddl_index_rewrite.go @@ -17,18 +17,12 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" - timodel "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/parser" "github.com/pingcap/tidb/pkg/parser/ast" ) -type indexKeyPart struct { - nameL string - length int -} - -func restoreAnonymousIndexToNamedIndex(query string, tableInfo *common.TableInfo) (string, bool, error) { - if query == "" || tableInfo == nil { +func restoreAnonymousIndexToNamedIndex(query string, tableInfo *common.TableInfo, indexIDs []int64) (string, bool, error) { + if query == "" || tableInfo == nil || len(indexIDs) == 0 { return query, false, nil } @@ -43,7 +37,13 @@ func restoreAnonymousIndexToNamedIndex(query string, tableInfo *common.TableInfo return query, false, nil } + indexNameByID := getIndexNameByIDMap(tableInfo) + if len(indexNameByID) == 0 { + return query, false, nil + } + changed := false + indexIDPos := 0 for _, spec := range alterStmt.Specs { if spec == nil || spec.Tp != ast.AlterTableAddConstraint || spec.Constraint == nil { continue @@ -55,8 +55,12 @@ func restoreAnonymousIndexToNamedIndex(query string, tableInfo *common.TableInfo if !isIndexConstraint(constraint) { continue } + if indexIDPos >= len(indexIDs) { + continue + } - indexName, ok := findIndexNameForConstraint(tableInfo, constraint) + indexName, ok := indexNameByID[indexIDs[indexIDPos]] + indexIDPos++ if !ok { continue } @@ -75,6 +79,21 @@ func restoreAnonymousIndexToNamedIndex(query string, tableInfo *common.TableInfo return restoredQuery, true, nil } +func getIndexNameByIDMap(tableInfo *common.TableInfo) map[int64]string { + indices := tableInfo.GetIndices() + if len(indices) == 0 { + return nil + } + indexNameByID := make(map[int64]string, len(indices)) + for _, index := range indices { + if index == nil { + continue + } + indexNameByID[index.ID] = index.Name.O + } + return indexNameByID +} + func isIndexConstraint(constraint *ast.Constraint) bool { if constraint == nil { return false @@ -93,89 +112,3 @@ func isIndexConstraint(constraint *ast.Constraint) bool { return false } } - -func isUniqueIndexConstraint(constraint *ast.Constraint) bool { - if constraint == nil { - return false - } - switch constraint.Tp { - case ast.ConstraintUniq, ast.ConstraintUniqKey, ast.ConstraintUniqIndex: - return true - default: - return false - } -} - -func findIndexNameForConstraint(tableInfo *common.TableInfo, constraint *ast.Constraint) (string, bool) { - keyParts, ok := getConstraintKeyParts(constraint) - if !ok { - return "", false - } - - wantUnique := isUniqueIndexConstraint(constraint) - indices := tableInfo.GetIndices() - if len(indices) == 0 { - return "", false - } - - matches := make([]*timodel.IndexInfo, 0, 1) - for _, index := range indices { - if index == nil || index.Primary { - continue - } - if index.Unique != wantUnique { - continue - } - // Only use non-public indices. - if index.State == timodel.StatePublic { - continue - } - if !indexColumnsMatchKeyParts(index.Columns, keyParts) { - continue - } - matches = append(matches, index) - } - if len(matches) != 1 { - return "", false - } - return matches[0].Name.O, true -} - -func getConstraintKeyParts(constraint *ast.Constraint) ([]indexKeyPart, bool) { - if constraint == nil || len(constraint.Keys) == 0 { - return nil, false - } - - parts := make([]indexKeyPart, 0, len(constraint.Keys)) - for _, key := range constraint.Keys { - if key == nil || key.Expr != nil || key.Column == nil { - return nil, false - } - parts = append(parts, indexKeyPart{ - nameL: key.Column.Name.L, - length: key.Length, - }) - } - return parts, true -} - -func indexColumnsMatchKeyParts(indexColumns []*timodel.IndexColumn, keyParts []indexKeyPart) bool { - if len(indexColumns) != len(keyParts) { - return false - } - for i, part := range keyParts { - col := indexColumns[i] - if col == nil { - return false - } - if col.Name.L != part.nameL { - return false - } - if part.length > 0 { - if col.Length != part.length { - return false - } - } - } - return true -} diff --git a/pkg/sink/mysql/mysql_writer_ddl.go b/pkg/sink/mysql/mysql_writer_ddl.go index 486af2f6a8..e168039e98 100644 --- a/pkg/sink/mysql/mysql_writer_ddl.go +++ b/pkg/sink/mysql/mysql_writer_ddl.go @@ -57,7 +57,7 @@ func (w *Writer) execDDL(event *commonEvent.DDLEvent) error { shouldSwitchDB := needSwitchDB(event) if event.GetDDLType() == timodel.ActionAddIndex { - newQuery, changed, err := restoreAnonymousIndexToNamedIndex(event.Query, event.TableInfo) + newQuery, changed, err := restoreAnonymousIndexToNamedIndex(event.Query, event.TableInfo, event.IndexIDs) if err != nil { log.Warn("failed to restore anonymous index name", zap.String("changefeed", w.ChangefeedID.String()), diff --git a/pkg/sink/mysql/mysql_writer_ddl_index_rewrite_test.go b/pkg/sink/mysql/mysql_writer_ddl_index_rewrite_test.go index 1526c9ccb4..af54fd037d 100644 --- a/pkg/sink/mysql/mysql_writer_ddl_index_rewrite_test.go +++ b/pkg/sink/mysql/mysql_writer_ddl_index_rewrite_test.go @@ -17,6 +17,7 @@ import ( "testing" "github.com/DATA-DOG/go-sqlmock" + "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" timodel "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/parser" @@ -24,6 +25,62 @@ import ( "github.com/stretchr/testify/require" ) +func getIndexIDsFromJob(t *testing.T, job *timodel.Job) []int64 { + idxArgs, err := timodel.GetModifyIndexArgs(job) + if idxArgs != nil && err == nil { + indexIDs := make([]int64, 0, len(idxArgs.IndexArgs)) + for _, indexArg := range idxArgs.IndexArgs { + indexIDs = append(indexIDs, indexArg.IndexID) + } + return indexIDs + } + + indexIDs := make([]int64, 0) + require.NotNil(t, job.MultiSchemaInfo) + for idx, subJob := range job.MultiSchemaInfo.SubJobs { + proxyJob := subJob.ToProxyJob(job, idx) + subIdxArgs, subErr := timodel.GetModifyIndexArgs(&proxyJob) + if subIdxArgs == nil || subErr != nil { + continue + } + for _, indexArg := range subIdxArgs.IndexArgs { + indexIDs = append(indexIDs, indexArg.IndexID) + } + } + return indexIDs +} + +func getIndexNameByID(t *testing.T, tableInfo *common.TableInfo, indexID int64) string { + for _, index := range tableInfo.GetIndices() { + if index != nil && index.ID == indexID { + return index.Name.O + } + } + require.FailNow(t, "index id not found", "index id: %d", indexID) + return "" +} + +func parseAddIndexConstraintNames(t *testing.T, query string) []string { + p := parser.New() + stmt, err := p.ParseOneStmt(query, "", "") + require.NoError(t, err) + + alterStmt, ok := stmt.(*ast.AlterTableStmt) + require.True(t, ok) + + names := make([]string, 0) + for _, spec := range alterStmt.Specs { + if spec == nil || spec.Tp != ast.AlterTableAddConstraint || spec.Constraint == nil { + continue + } + if !isIndexConstraint(spec.Constraint) { + continue + } + names = append(names, spec.Constraint.Name) + } + return names +} + func TestExecDDL_RestoreAnonymousIndexToNamedIndex(t *testing.T) { writer, db, mock := newTestMysqlWriter(t) defer db.Close() @@ -40,41 +97,16 @@ func TestExecDDL_RestoreAnonymousIndexToNamedIndex(t *testing.T) { tableInfo := helper.GetTableInfo(job) require.NotNil(t, tableInfo) - expectedIndexName := "" - for _, index := range tableInfo.GetIndices() { - if index == nil || index.Primary || index.Unique { - continue - } - if len(index.Columns) != 1 { - continue - } - if index.Columns[0].Name.L == "name" { - expectedIndexName = index.Name.O - break - } - } - require.NotEmpty(t, expectedIndexName) + indexIDs := getIndexIDsFromJob(t, job) + require.Len(t, indexIDs, 1) + expectedIndexName := getIndexNameByID(t, tableInfo, indexIDs[0]) anonymousQuery := "ALTER TABLE `t` ADD INDEX (`name`)" - restoredQuery, changed, err := restoreAnonymousIndexToNamedIndex(anonymousQuery, tableInfo) + restoredQuery, changed, err := restoreAnonymousIndexToNamedIndex(anonymousQuery, tableInfo, indexIDs) require.NoError(t, err) require.True(t, changed) - - p := parser.New() - stmt, err := p.ParseOneStmt(restoredQuery, "", "") - require.NoError(t, err) - alterStmt, ok := stmt.(*ast.AlterTableStmt) - require.True(t, ok) - - restoredName := "" - for _, spec := range alterStmt.Specs { - if spec != nil && spec.Tp == ast.AlterTableAddConstraint && spec.Constraint != nil { - restoredName = spec.Constraint.Name - break - } - } - require.Equal(t, expectedIndexName, restoredName) + require.Equal(t, []string{expectedIndexName}, parseAddIndexConstraintNames(t, restoredQuery)) ddlEvent := &commonEvent.DDLEvent{ Type: byte(job.Type), @@ -82,6 +114,7 @@ func TestExecDDL_RestoreAnonymousIndexToNamedIndex(t *testing.T) { SchemaName: job.SchemaName, TableName: job.TableName, TableInfo: tableInfo, + IndexIDs: indexIDs, } mock.ExpectBegin() @@ -94,3 +127,69 @@ func TestExecDDL_RestoreAnonymousIndexToNamedIndex(t *testing.T) { require.NoError(t, err) require.NoError(t, mock.ExpectationsWereMet()) } + +func TestRestoreAnonymousIndexToNamedIndexMultipleAnonymousIndexes(t *testing.T) { + helper := commonEvent.NewEventTestHelper(t) + defer helper.Close() + + helper.Tk().MustExec("use test") + helper.DDL2Event("create table t (id int primary key, name varchar(32), age int)") + + job := helper.DDL2Job("alter table t add index (name), add unique (age)") + + tableInfo := helper.GetTableInfo(job) + require.NotNil(t, tableInfo) + + indexIDs := getIndexIDsFromJob(t, job) + require.Len(t, indexIDs, 2) + + expectedNames := []string{ + getIndexNameByID(t, tableInfo, indexIDs[0]), + getIndexNameByID(t, tableInfo, indexIDs[1]), + } + + anonymousQuery := "ALTER TABLE `t` ADD INDEX (`name`), ADD UNIQUE (`age`)" + restoredQuery, changed, err := restoreAnonymousIndexToNamedIndex(anonymousQuery, tableInfo, indexIDs) + require.NoError(t, err) + require.True(t, changed) + require.Equal(t, expectedNames, parseAddIndexConstraintNames(t, restoredQuery)) +} + +func TestRestoreAnonymousIndexToNamedIndexOnlyConsumesAnonymousIDs(t *testing.T) { + helper := commonEvent.NewEventTestHelper(t) + defer helper.Close() + + helper.Tk().MustExec("use test") + helper.DDL2Event("create table t (id int primary key, name varchar(32), age int)") + + job := helper.DDL2Job("alter table t add index idx_name(name), add index (age)") + + tableInfo := helper.GetTableInfo(job) + require.NotNil(t, tableInfo) + + anonymousIndexID := int64(0) + expectedAnonymousName := "" + for _, index := range tableInfo.GetIndices() { + if index == nil || len(index.Columns) != 1 { + continue + } + if index.Columns[0].Name.L == "age" { + anonymousIndexID = index.ID + expectedAnonymousName = index.Name.O + break + } + } + require.NotZero(t, anonymousIndexID) + require.NotEmpty(t, expectedAnonymousName) + + mixedQuery := "ALTER TABLE `t` ADD INDEX `idx_name` (`name`), ADD INDEX (`age`)" + restoredQuery, changed, err := restoreAnonymousIndexToNamedIndex(mixedQuery, tableInfo, []int64{anonymousIndexID}) + require.NoError(t, err) + require.True(t, changed) + require.Equal(t, []string{"idx_name", expectedAnonymousName}, parseAddIndexConstraintNames(t, restoredQuery)) + + unchangedQuery, unchanged, err := restoreAnonymousIndexToNamedIndex(mixedQuery, tableInfo, nil) + require.NoError(t, err) + require.False(t, unchanged) + require.Equal(t, mixedQuery, unchangedQuery) +} From e9c4ed77c9bb20b30dbc05a1dea8329683c51c4f Mon Sep 17 00:00:00 2001 From: wk989898 Date: Tue, 10 Mar 2026 04:07:55 +0000 Subject: [PATCH 06/13] update Signed-off-by: wk989898 --- logservice/schemastore/types_gen.go | 71 +++++++++++++++++++++++-- logservice/schemastore/utils.go | 3 ++ pkg/common/event/ddl_event.go | 27 ++++++++-- tests/integration_tests/ddl_wait/run.sh | 31 +++++++++++ 4 files changed, 123 insertions(+), 9 deletions(-) diff --git a/logservice/schemastore/types_gen.go b/logservice/schemastore/types_gen.go index d5f5b033f1..316b2e1fc7 100644 --- a/logservice/schemastore/types_gen.go +++ b/logservice/schemastore/types_gen.go @@ -284,6 +284,25 @@ func (z *PersistedDDLEvent) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "CDCWriteSource") return } + case "index_ids": + var zb0010 uint32 + zb0010, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "IndexIDs") + return + } + if cap(z.IndexIDs) >= int(zb0010) { + z.IndexIDs = (z.IndexIDs)[:zb0010] + } else { + z.IndexIDs = make([]int64, zb0010) + } + for za0009 := range z.IndexIDs { + z.IndexIDs[za0009], err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err, "IndexIDs", za0009) + return + } + } default: err = dc.Skip() if err != nil { @@ -297,9 +316,9 @@ func (z *PersistedDDLEvent) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *PersistedDDLEvent) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 26 + // map header, size 27 // write "id" - err = en.Append(0xde, 0x0, 0x1a, 0xa2, 0x69, 0x64) + err = en.Append(0xde, 0x0, 0x1b, 0xa2, 0x69, 0x64) if err != nil { return } @@ -614,15 +633,32 @@ func (z *PersistedDDLEvent) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "CDCWriteSource") return } + // write "index_ids" + err = en.Append(0xa9, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x5f, 0x69, 0x64, 0x73) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(len(z.IndexIDs))) + if err != nil { + err = msgp.WrapError(err, "IndexIDs") + return + } + for za0009 := range z.IndexIDs { + err = en.WriteInt64(z.IndexIDs[za0009]) + if err != nil { + err = msgp.WrapError(err, "IndexIDs", za0009) + return + } + } return } // MarshalMsg implements msgp.Marshaler func (z *PersistedDDLEvent) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 26 + // map header, size 27 // string "id" - o = append(o, 0xde, 0x0, 0x1a, 0xa2, 0x69, 0x64) + o = append(o, 0xde, 0x0, 0x1b, 0xa2, 0x69, 0x64) o = msgp.AppendInt64(o, z.ID) // string "type" o = append(o, 0xa4, 0x74, 0x79, 0x70, 0x65) @@ -723,6 +759,12 @@ func (z *PersistedDDLEvent) MarshalMsg(b []byte) (o []byte, err error) { // string "cdc_write_source" o = append(o, 0xb0, 0x63, 0x64, 0x63, 0x5f, 0x77, 0x72, 0x69, 0x74, 0x65, 0x5f, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65) o = msgp.AppendUint64(o, z.CDCWriteSource) + // string "index_ids" + o = append(o, 0xa9, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x5f, 0x69, 0x64, 0x73) + o = msgp.AppendArrayHeader(o, uint32(len(z.IndexIDs))) + for za0009 := range z.IndexIDs { + o = msgp.AppendInt64(o, z.IndexIDs[za0009]) + } return } @@ -1004,6 +1046,25 @@ func (z *PersistedDDLEvent) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "CDCWriteSource") return } + case "index_ids": + var zb0010 uint32 + zb0010, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "IndexIDs") + return + } + if cap(z.IndexIDs) >= int(zb0010) { + z.IndexIDs = (z.IndexIDs)[:zb0010] + } else { + z.IndexIDs = make([]int64, zb0010) + } + for za0009 := range z.IndexIDs { + z.IndexIDs[za0009], bts, err = msgp.ReadInt64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "IndexIDs", za0009) + return + } + } default: bts, err = msgp.Skip(bts) if err != nil { @@ -1034,7 +1095,7 @@ func (z *PersistedDDLEvent) Msgsize() (s int) { for za0008 := range z.MultipleTableInfosValue { s += msgp.BytesPrefixSize + len(z.MultipleTableInfosValue[za0008]) } - s += 9 + msgp.StringPrefixSize + len(z.BDRRole) + 17 + msgp.Uint64Size + s += 9 + msgp.StringPrefixSize + len(z.BDRRole) + 17 + msgp.Uint64Size + 10 + msgp.ArrayHeaderSize + (len(z.IndexIDs) * (msgp.Int64Size)) return } diff --git a/logservice/schemastore/utils.go b/logservice/schemastore/utils.go index de9bfe72f1..5dc1f6c644 100644 --- a/logservice/schemastore/utils.go +++ b/logservice/schemastore/utils.go @@ -84,6 +84,9 @@ func getIndexIDs(job *model.Job) []int64 { res := make([]int64, 0) idxArgs, err := model.GetModifyIndexArgs(job) if idxArgs == nil || err != nil { + if job.MultiSchemaInfo == nil { + return res + } for idx, subJob := range job.MultiSchemaInfo.SubJobs { proxyJob := subJob.ToProxyJob(job, idx) idxArgs, err := model.GetModifyIndexArgs(&proxyJob) diff --git a/pkg/common/event/ddl_event.go b/pkg/common/event/ddl_event.go index dec0b1dd47..00e55cf74e 100644 --- a/pkg/common/event/ddl_event.go +++ b/pkg/common/event/ddl_event.go @@ -92,7 +92,7 @@ type DDLEvent struct { eventSize int64 `json:"-"` // for simple protocol - IsBootstrap bool `msg:"-"` + IsBootstrap bool `json:"-"` // NotSync is used to indicate whether the event should be synced to downstream. // If it is true, sink should not sync this event to downstream. // It is used for some special DDL events that do not need to be synced, @@ -104,11 +104,11 @@ type DDLEvent struct { // to ensure the new truncated table can be handled correctly. // If the DDL involves multiple tables, this field is not effective. // The multiple table DDL event will be handled by filtering querys and table infos. - NotSync bool `msg:"not_sync"` + NotSync bool `json:"not_sync"` // IndexIDs store the index ids that are related to the ddl job, only used for add index. // use these id to recover the index name for the anonymous add index - IndexIDs []int64 `msg:"index_ids"` + IndexIDs []int64 `json:"index_ids"` } func (d *DDLEvent) String() string { @@ -343,6 +343,15 @@ func (t DDLEvent) encodeV1() ([]byte, error) { binary.BigEndian.PutUint64(multipleTableInfosDataSize, uint64(len(t.MultipleTableInfos))) data = append(data, multipleTableInfosDataSize...) + // append index ids for add index ddl, to recover the index name for the anonymous add index + for _, info := range t.IndexIDs { + indexIDData := make([]byte, 8) + binary.BigEndian.PutUint64(indexIDData, uint64(info)) + data = append(data, indexIDData...) + } + indexIDsDataSize := make([]byte, 8) + binary.BigEndian.PutUint64(indexIDsDataSize, uint64(len(t.IndexIDs))) + data = append(data, indexIDsDataSize...) return data, nil } @@ -351,7 +360,17 @@ func (t *DDLEvent) decodeV1(data []byte) error { t.eventSize = int64(len(data)) end := len(data) + indexIDsDataSize := binary.BigEndian.Uint64(data[end-8 : end]) + end -= 8 + t.IndexIDs = make([]int64, 0, indexIDsDataSize) + for i := 0; i < int(indexIDsDataSize); i++ { + indexID := int64(binary.BigEndian.Uint64(data[end-8 : end])) + t.IndexIDs = append(t.IndexIDs, indexID) + end -= 8 + } + multipleTableInfosDataSize := binary.BigEndian.Uint64(data[end-8 : end]) + end -= 8 for i := 0; i < int(multipleTableInfosDataSize); i++ { tableInfoDataSize := binary.BigEndian.Uint64(data[end-8 : end]) tableInfoData := data[end-8-int(tableInfoDataSize) : end-8] @@ -362,7 +381,7 @@ func (t *DDLEvent) decodeV1(data []byte) error { t.MultipleTableInfos = append(t.MultipleTableInfos, info) end -= 8 + int(tableInfoDataSize) } - end -= 8 + int(multipleTableInfosDataSize) + tableInfoDataSize := binary.BigEndian.Uint64(data[end-8 : end]) var err error if tableInfoDataSize > 0 { diff --git a/tests/integration_tests/ddl_wait/run.sh b/tests/integration_tests/ddl_wait/run.sh index 977d7c26d3..9bd5f438b0 100755 --- a/tests/integration_tests/ddl_wait/run.sh +++ b/tests/integration_tests/ddl_wait/run.sh @@ -8,6 +8,21 @@ WORK_DIR=$OUT_DIR/$TEST_NAME CDC_BINARY=cdc.test SINK_TYPE=$1 +function check_downstream_indexes_match_upstream() { + schema_name=$1 + table_name=$2 + index_names=$(mysql -uroot -h${UP_TIDB_HOST} -P${UP_TIDB_PORT} --default-character-set utf8mb4 -N \ + -e "SELECT DISTINCT INDEX_NAME FROM INFORMATION_SCHEMA.STATISTICS WHERE TABLE_SCHEMA='${schema_name}' AND TABLE_NAME='${table_name}' ORDER BY INDEX_NAME;") + + while IFS= read -r index_name; do + if [ -z "$index_name" ]; then + continue + fi + run_sql "SHOW INDEX FROM \`${schema_name}\`.\`${table_name}\` WHERE Key_name='${index_name}';" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_contains "Key_name: ${index_name}" + done <<<"${index_names}" +} + # This test simulates DDL operations that take a long time. # TiCDC blocks DDL operations until its state is not running, except for adding indexes. # TiCDC also checks add index ddl state before execute a new DDL. @@ -72,6 +87,22 @@ function run() { run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY # make sure all tables are equal in upstream and downstream check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 300 + + # anonymous add index related ddl + run_sql "create table test.t_anon_idx (id int primary key, a int, b int, c int);" + run_sql "insert into test.t_anon_idx values (1, 10, 20, 30), (2, 11, 21, 31), (3, 12, 22, 32);" + run_sql "alter table test.t_anon_idx add index (a);" + run_sql "alter table test.t_anon_idx add index idx_b (b), add index (a);" + run_sql "alter table test.t_anon_idx add index (a), add unique (b, c);" + run_sql "create table test.t_anon_idx_like like test.t_anon_idx;" + run_sql "insert into test.t_anon_idx values (4, 13, 23, 33);" + check_table_exists test.t_anon_idx ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300 + check_table_exists test.t_anon_idx_like ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300 + check_downstream_indexes_match_upstream test t_anon_idx + check_downstream_indexes_match_upstream test t_anon_idx_like + + # ensure both data and index schema are eventually consistent after anonymous index ddl + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 300 cleanup_process $CDC_BINARY } From d65a079ddcf6067f961e2374571398969c99634e Mon Sep 17 00:00:00 2001 From: wk989898 Date: Tue, 10 Mar 2026 04:09:38 +0000 Subject: [PATCH 07/13] chore Signed-off-by: wk989898 --- pkg/common/event/ddl_event.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/common/event/ddl_event.go b/pkg/common/event/ddl_event.go index 00e55cf74e..1873a0c2b5 100644 --- a/pkg/common/event/ddl_event.go +++ b/pkg/common/event/ddl_event.go @@ -301,7 +301,7 @@ func (t *DDLEvent) Unmarshal(data []byte) error { } func (t DDLEvent) encodeV1() ([]byte, error) { - // restData | dispatcherIDData | dispatcherIDDataSize | tableInfoData | tableInfoDataSize | multipleTableInfos | multipletableInfosDataSize + // restData | dispatcherIDData | dispatcherIDDataSize | tableInfoData | tableInfoDataSize | multipleTableInfos | multipletableInfosDataSize | indexIDsData | indexIDsDataSize // Note: version is now handled in the header by Marshal(), not here data, err := json.Marshal(t) if err != nil { @@ -356,7 +356,7 @@ func (t DDLEvent) encodeV1() ([]byte, error) { } func (t *DDLEvent) decodeV1(data []byte) error { - // restData | dispatcherIDData | dispatcherIDDataSize | tableInfoData | tableInfoDataSize | multipleTableInfos | multipleTableInfosDataSize + // restData | dispatcherIDData | dispatcherIDDataSize | tableInfoData | tableInfoDataSize | multipleTableInfos | multipleTableInfosDataSize | indexIDsDataSize | indexIDsData t.eventSize = int64(len(data)) end := len(data) From d2a908cd48aa78ca097b992df0cbf4a1b2a589b5 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Tue, 10 Mar 2026 05:17:57 +0000 Subject: [PATCH 08/13] fix Signed-off-by: wk989898 --- pkg/common/event/ddl_event.go | 21 +-------------------- 1 file changed, 1 insertion(+), 20 deletions(-) diff --git a/pkg/common/event/ddl_event.go b/pkg/common/event/ddl_event.go index 1873a0c2b5..62c789e605 100644 --- a/pkg/common/event/ddl_event.go +++ b/pkg/common/event/ddl_event.go @@ -342,33 +342,14 @@ func (t DDLEvent) encodeV1() ([]byte, error) { multipleTableInfosDataSize := make([]byte, 8) binary.BigEndian.PutUint64(multipleTableInfosDataSize, uint64(len(t.MultipleTableInfos))) data = append(data, multipleTableInfosDataSize...) - - // append index ids for add index ddl, to recover the index name for the anonymous add index - for _, info := range t.IndexIDs { - indexIDData := make([]byte, 8) - binary.BigEndian.PutUint64(indexIDData, uint64(info)) - data = append(data, indexIDData...) - } - indexIDsDataSize := make([]byte, 8) - binary.BigEndian.PutUint64(indexIDsDataSize, uint64(len(t.IndexIDs))) - data = append(data, indexIDsDataSize...) return data, nil } func (t *DDLEvent) decodeV1(data []byte) error { - // restData | dispatcherIDData | dispatcherIDDataSize | tableInfoData | tableInfoDataSize | multipleTableInfos | multipleTableInfosDataSize | indexIDsDataSize | indexIDsData + // restData | dispatcherIDData | dispatcherIDDataSize | tableInfoData | tableInfoDataSize | multipleTableInfos | multipleTableInfosDataSize t.eventSize = int64(len(data)) end := len(data) - indexIDsDataSize := binary.BigEndian.Uint64(data[end-8 : end]) - end -= 8 - t.IndexIDs = make([]int64, 0, indexIDsDataSize) - for i := 0; i < int(indexIDsDataSize); i++ { - indexID := int64(binary.BigEndian.Uint64(data[end-8 : end])) - t.IndexIDs = append(t.IndexIDs, indexID) - end -= 8 - } - multipleTableInfosDataSize := binary.BigEndian.Uint64(data[end-8 : end]) end -= 8 for i := 0; i < int(multipleTableInfosDataSize); i++ { From c3af506b2b4a9a504b3819bf8ecab7e714ef7b5a Mon Sep 17 00:00:00 2001 From: wk989898 Date: Tue, 10 Mar 2026 05:21:13 +0000 Subject: [PATCH 09/13] chore Signed-off-by: wk989898 --- pkg/common/event/ddl_event.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/common/event/ddl_event.go b/pkg/common/event/ddl_event.go index 62c789e605..261cd23b18 100644 --- a/pkg/common/event/ddl_event.go +++ b/pkg/common/event/ddl_event.go @@ -301,7 +301,7 @@ func (t *DDLEvent) Unmarshal(data []byte) error { } func (t DDLEvent) encodeV1() ([]byte, error) { - // restData | dispatcherIDData | dispatcherIDDataSize | tableInfoData | tableInfoDataSize | multipleTableInfos | multipletableInfosDataSize | indexIDsData | indexIDsDataSize + // restData | dispatcherIDData | dispatcherIDDataSize | tableInfoData | tableInfoDataSize | multipleTableInfos | multipletableInfosDataSize // Note: version is now handled in the header by Marshal(), not here data, err := json.Marshal(t) if err != nil { From 2081cf88d6248c419ac619b25745a85846a44591 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Tue, 10 Mar 2026 06:35:35 +0000 Subject: [PATCH 10/13] support multischema change Signed-off-by: wk989898 --- pkg/sink/mysql/mysql_writer_ddl.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/sink/mysql/mysql_writer_ddl.go b/pkg/sink/mysql/mysql_writer_ddl.go index e168039e98..06ca263d6a 100644 --- a/pkg/sink/mysql/mysql_writer_ddl.go +++ b/pkg/sink/mysql/mysql_writer_ddl.go @@ -56,7 +56,8 @@ func (w *Writer) execDDL(event *commonEvent.DDLEvent) error { ctx := w.ctx shouldSwitchDB := needSwitchDB(event) - if event.GetDDLType() == timodel.ActionAddIndex { + switch event.GetDDLType() { + case timodel.ActionMultiSchemaChange, timodel.ActionAddIndex: newQuery, changed, err := restoreAnonymousIndexToNamedIndex(event.Query, event.TableInfo, event.IndexIDs) if err != nil { log.Warn("failed to restore anonymous index name", From 1542d4ad1070422d1aad0c50df66606032046504 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Tue, 10 Mar 2026 07:45:06 +0000 Subject: [PATCH 11/13] fix Signed-off-by: wk989898 --- .../persist_storage_ddl_handlers.go | 11 ++- logservice/schemastore/types.go | 4 +- logservice/schemastore/utils_test.go | 98 +++++++++++++++++++ pkg/common/event/ddl_event.go | 4 +- pkg/sink/mysql/ddl_index_rewrite.go | 23 +++-- .../mysql_writer_ddl_index_rewrite_test.go | 55 ++++++++++- tests/integration_tests/ddl_wait/run.sh | 3 + 7 files changed, 177 insertions(+), 21 deletions(-) diff --git a/logservice/schemastore/persist_storage_ddl_handlers.go b/logservice/schemastore/persist_storage_ddl_handlers.go index a0abdd0b27..d0546981c0 100644 --- a/logservice/schemastore/persist_storage_ddl_handlers.go +++ b/logservice/schemastore/persist_storage_ddl_handlers.go @@ -407,7 +407,7 @@ var allDDLHandlers = map[model.ActionType]*persistStorageDDLHandler{ buildDDLEventFunc: buildDDLEventForCreateTables, }, model.ActionMultiSchemaChange: { - buildPersistedDDLEventFunc: buildPersistedDDLEventForNormalDDLOnSingleTable, + buildPersistedDDLEventFunc: buildPersistedDDLEventForMultiSchemaChange, updateDDLHistoryFunc: updateDDLHistoryForNormalDDLOnSingleTable, updateFullTableInfoFunc: updateFullTableInfoForSingleTableDDL, updateSchemaMetadataFunc: updateSchemaMetadataIgnore, @@ -673,12 +673,17 @@ func buildPersistedDDLEventForNormalDDLOnSingleTable(args buildPersistedDDLEvent return event } +func buildPersistedDDLEventForMultiSchemaChange(args buildPersistedDDLEventFuncArgs) PersistedDDLEvent { + event := buildPersistedDDLEventForNormalDDLOnSingleTable(args) + event.IndexIDs = getIndexIDs(args.job) + return event +} + func buildPersistedDDLEventForAddIndex(args buildPersistedDDLEventFuncArgs) PersistedDDLEvent { event := buildPersistedDDLEventCommon(args) event.SchemaName = getSchemaName(args.databaseMap, event.SchemaID) event.TableName = getTableName(args.tableMap, event.TableID) - indexIDs := getIndexIDs(args.job) - event.IndexIDs = indexIDs + event.IndexIDs = getIndexIDs(args.job) return event } diff --git a/logservice/schemastore/types.go b/logservice/schemastore/types.go index 6b776eb28e..1fbfce210b 100644 --- a/logservice/schemastore/types.go +++ b/logservice/schemastore/types.go @@ -89,8 +89,8 @@ type PersistedDDLEvent struct { BDRRole string `msg:"bdr_role"` CDCWriteSource uint64 `msg:"cdc_write_source"` - // IndexIDs store the index ids that are related to the ddl job, only used for add index. - // use these id to recover the index name for the anonymous add index + // IndexIDs store the add index ids in SQL order for add index and multi schema change DDLs. + // MySQL sink uses them to recover anonymous index names. IndexIDs []int64 `msg:"index_ids"` } diff --git a/logservice/schemastore/utils_test.go b/logservice/schemastore/utils_test.go index 98df7e11c0..8d7d9be4bd 100644 --- a/logservice/schemastore/utils_test.go +++ b/logservice/schemastore/utils_test.go @@ -17,6 +17,7 @@ import ( "testing" commonEvent "github.com/pingcap/ticdc/pkg/common/event" + "github.com/pingcap/tidb/pkg/meta/model" "github.com/stretchr/testify/require" ) @@ -60,3 +61,100 @@ func TestIsSplitable(t *testing.T) { tableInfo = helper.GetModelTableInfo(job) require.False(t, isSplitable(tableInfo)) } + +func TestBuildPersistedDDLEventForMultiSchemaChangeContainsIndexIDs(t *testing.T) { + helper := commonEvent.NewEventTestHelper(t) + defer helper.Close() + + helper.Tk().MustExec("use test") + helper.DDL2Event("create table t (id int primary key, c1 int)") + + job := helper.DDL2Job("alter table t add column c2 int, add index (c1)") + require.Equal(t, model.ActionMultiSchemaChange, job.Type) + + args := buildPersistedDDLEventFuncArgs{ + job: job, + databaseMap: map[int64]*BasicDatabaseInfo{ + job.SchemaID: { + Name: "test", + Tables: map[int64]bool{ + job.TableID: true, + }, + }, + }, + tableMap: map[int64]*BasicTableInfo{ + job.TableID: { + SchemaID: job.SchemaID, + Name: "t", + }, + }, + } + + event := buildPersistedDDLEventForMultiSchemaChange(args) + expectedIndexIDs := getIndexIDs(job) + require.Len(t, expectedIndexIDs, 1) + require.Equal(t, expectedIndexIDs, event.IndexIDs) + require.Equal(t, "test", event.SchemaName) + require.Equal(t, "t", event.TableName) +} + +func TestGetIndexIDsReturnsAllAddIndexIDsInOrder(t *testing.T) { + helper := commonEvent.NewEventTestHelper(t) + defer helper.Close() + + helper.Tk().MustExec("use test") + helper.DDL2Event("create table t (id int primary key, c1 int)") + + job := helper.DDL2Job("alter table t add index idx_c1(c1), add index (c1)") + tableInfo := helper.GetModelTableInfo(job) + require.NotNil(t, tableInfo) + + var namedIndexID int64 + var anonymousIndexID int64 + for _, index := range tableInfo.Indices { + if index == nil { + continue + } + if index.Name.O == "idx_c1" { + namedIndexID = index.ID + continue + } + if len(index.Columns) == 1 && index.Columns[0].Name.L == "c1" { + anonymousIndexID = index.ID + } + } + require.NotZero(t, namedIndexID) + require.NotZero(t, anonymousIndexID) + require.Equal(t, []int64{namedIndexID, anonymousIndexID}, getIndexIDs(job)) +} + +func TestGetIndexIDsReturnsAllAddIndexIDsInOrderForMultiSchemaChange(t *testing.T) { + helper := commonEvent.NewEventTestHelper(t) + defer helper.Close() + + helper.Tk().MustExec("use test") + helper.DDL2Event("create table t (id int primary key, c1 int)") + + job := helper.DDL2Job("alter table t add column c2 int, add index idx_c1(c1), add index (c1)") + require.Equal(t, model.ActionMultiSchemaChange, job.Type) + tableInfo := helper.GetModelTableInfo(job) + require.NotNil(t, tableInfo) + + var namedIndexID int64 + var anonymousIndexID int64 + for _, index := range tableInfo.Indices { + if index == nil { + continue + } + if index.Name.O == "idx_c1" { + namedIndexID = index.ID + continue + } + if len(index.Columns) == 1 && index.Columns[0].Name.L == "c1" { + anonymousIndexID = index.ID + } + } + require.NotZero(t, namedIndexID) + require.NotZero(t, anonymousIndexID) + require.Equal(t, []int64{namedIndexID, anonymousIndexID}, getIndexIDs(job)) +} diff --git a/pkg/common/event/ddl_event.go b/pkg/common/event/ddl_event.go index 261cd23b18..ffa4bc94b7 100644 --- a/pkg/common/event/ddl_event.go +++ b/pkg/common/event/ddl_event.go @@ -106,8 +106,8 @@ type DDLEvent struct { // The multiple table DDL event will be handled by filtering querys and table infos. NotSync bool `json:"not_sync"` - // IndexIDs store the index ids that are related to the ddl job, only used for add index. - // use these id to recover the index name for the anonymous add index + // IndexIDs store the add index ids in SQL order for add index and multi schema change DDLs. + // MySQL sink uses them to recover anonymous index names. IndexIDs []int64 `json:"index_ids"` } diff --git a/pkg/sink/mysql/ddl_index_rewrite.go b/pkg/sink/mysql/ddl_index_rewrite.go index 24937012a6..7223b8337c 100644 --- a/pkg/sink/mysql/ddl_index_rewrite.go +++ b/pkg/sink/mysql/ddl_index_rewrite.go @@ -42,25 +42,30 @@ func restoreAnonymousIndexToNamedIndex(query string, tableInfo *common.TableInfo return query, false, nil } - changed := false - indexIDPos := 0 + indexConstraints := make([]*ast.Constraint, 0) for _, spec := range alterStmt.Specs { if spec == nil || spec.Tp != ast.AlterTableAddConstraint || spec.Constraint == nil { continue } constraint := spec.Constraint - if constraint.Name != "" { - continue - } if !isIndexConstraint(constraint) { continue } - if indexIDPos >= len(indexIDs) { + indexConstraints = append(indexConstraints, constraint) + } + if len(indexConstraints) == 0 { + return query, false, nil + } + + changed := false + for i, constraint := range indexConstraints { + if i >= len(indexIDs) { + break + } + if constraint.Name != "" { continue } - - indexName, ok := indexNameByID[indexIDs[indexIDPos]] - indexIDPos++ + indexName, ok := indexNameByID[indexIDs[i]] if !ok { continue } diff --git a/pkg/sink/mysql/mysql_writer_ddl_index_rewrite_test.go b/pkg/sink/mysql/mysql_writer_ddl_index_rewrite_test.go index af54fd037d..48471ab3c0 100644 --- a/pkg/sink/mysql/mysql_writer_ddl_index_rewrite_test.go +++ b/pkg/sink/mysql/mysql_writer_ddl_index_rewrite_test.go @@ -155,7 +155,7 @@ func TestRestoreAnonymousIndexToNamedIndexMultipleAnonymousIndexes(t *testing.T) require.Equal(t, expectedNames, parseAddIndexConstraintNames(t, restoredQuery)) } -func TestRestoreAnonymousIndexToNamedIndexOnlyConsumesAnonymousIDs(t *testing.T) { +func TestRestoreAnonymousIndexToNamedIndexWithNamedAndAnonymousIndexes(t *testing.T) { helper := commonEvent.NewEventTestHelper(t) defer helper.Close() @@ -166,24 +166,23 @@ func TestRestoreAnonymousIndexToNamedIndexOnlyConsumesAnonymousIDs(t *testing.T) tableInfo := helper.GetTableInfo(job) require.NotNil(t, tableInfo) + indexIDs := getIndexIDsFromJob(t, job) + require.Len(t, indexIDs, 2) - anonymousIndexID := int64(0) expectedAnonymousName := "" for _, index := range tableInfo.GetIndices() { if index == nil || len(index.Columns) != 1 { continue } if index.Columns[0].Name.L == "age" { - anonymousIndexID = index.ID expectedAnonymousName = index.Name.O break } } - require.NotZero(t, anonymousIndexID) require.NotEmpty(t, expectedAnonymousName) mixedQuery := "ALTER TABLE `t` ADD INDEX `idx_name` (`name`), ADD INDEX (`age`)" - restoredQuery, changed, err := restoreAnonymousIndexToNamedIndex(mixedQuery, tableInfo, []int64{anonymousIndexID}) + restoredQuery, changed, err := restoreAnonymousIndexToNamedIndex(mixedQuery, tableInfo, indexIDs) require.NoError(t, err) require.True(t, changed) require.Equal(t, []string{"idx_name", expectedAnonymousName}, parseAddIndexConstraintNames(t, restoredQuery)) @@ -193,3 +192,49 @@ func TestRestoreAnonymousIndexToNamedIndexOnlyConsumesAnonymousIDs(t *testing.T) require.False(t, unchanged) require.Equal(t, mixedQuery, unchangedQuery) } + +func TestExecDDL_RestoreAnonymousIndexToNamedIndexForMultiSchemaChange(t *testing.T) { + writer, db, mock := newTestMysqlWriter(t) + defer db.Close() + + helper := commonEvent.NewEventTestHelper(t) + defer helper.Close() + + helper.Tk().MustExec("use test") + helper.DDL2Event("create table t (id int primary key, name varchar(32))") + + job := helper.DDL2Job("alter table t add column age int, add index (name)") + require.Equal(t, timodel.ActionMultiSchemaChange, job.Type) + + tableInfo := helper.GetTableInfo(job) + require.NotNil(t, tableInfo) + + indexIDs := getIndexIDsFromJob(t, job) + require.Len(t, indexIDs, 1) + expectedIndexName := getIndexNameByID(t, tableInfo, indexIDs[0]) + + anonymousQuery := "ALTER TABLE `t` ADD COLUMN `age` INT, ADD INDEX (`name`)" + restoredQuery, changed, err := restoreAnonymousIndexToNamedIndex(anonymousQuery, tableInfo, indexIDs) + require.NoError(t, err) + require.True(t, changed) + require.Equal(t, []string{expectedIndexName}, parseAddIndexConstraintNames(t, restoredQuery)) + + ddlEvent := &commonEvent.DDLEvent{ + Type: byte(job.Type), + Query: anonymousQuery, + SchemaName: job.SchemaName, + TableName: job.TableName, + TableInfo: tableInfo, + IndexIDs: indexIDs, + } + + mock.ExpectBegin() + mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("SET TIMESTAMP = DEFAULT").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec(restoredQuery).WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + + err = writer.execDDL(ddlEvent) + require.NoError(t, err) + require.NoError(t, mock.ExpectationsWereMet()) +} diff --git a/tests/integration_tests/ddl_wait/run.sh b/tests/integration_tests/ddl_wait/run.sh index 9bd5f438b0..a3453addb9 100755 --- a/tests/integration_tests/ddl_wait/run.sh +++ b/tests/integration_tests/ddl_wait/run.sh @@ -94,6 +94,9 @@ function run() { run_sql "alter table test.t_anon_idx add index (a);" run_sql "alter table test.t_anon_idx add index idx_b (b), add index (a);" run_sql "alter table test.t_anon_idx add index (a), add unique (b, c);" + run_sql "alter table test.t_anon_idx add index (a), add column d int;" + run_sql "alter table test.t_anon_idx add column e int, add index (a);" + run_sql "alter table test.t_anon_idx drop column d, drop column e;" run_sql "create table test.t_anon_idx_like like test.t_anon_idx;" run_sql "insert into test.t_anon_idx values (4, 13, 23, 33);" check_table_exists test.t_anon_idx ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300 From 37024aa405edf0fff0700003d28441b5848c31ef Mon Sep 17 00:00:00 2001 From: wk989898 Date: Tue, 10 Mar 2026 09:38:45 +0000 Subject: [PATCH 12/13] add more test Signed-off-by: wk989898 --- .../mysql_writer_ddl_index_rewrite_test.go | 250 ++++++++++++++++++ tests/integration_tests/ddl_wait/run.sh | 4 + 2 files changed, 254 insertions(+) diff --git a/pkg/sink/mysql/mysql_writer_ddl_index_rewrite_test.go b/pkg/sink/mysql/mysql_writer_ddl_index_rewrite_test.go index 48471ab3c0..ca0d7b5d79 100644 --- a/pkg/sink/mysql/mysql_writer_ddl_index_rewrite_test.go +++ b/pkg/sink/mysql/mysql_writer_ddl_index_rewrite_test.go @@ -14,6 +14,7 @@ package mysql import ( + "slices" "testing" "github.com/DATA-DOG/go-sqlmock" @@ -81,6 +82,74 @@ func parseAddIndexConstraintNames(t *testing.T, query string) []string { return names } +func expectedAddIndexConstraintNames(t *testing.T, query string, tableInfo *common.TableInfo, indexIDs []int64) []string { + p := parser.New() + stmt, err := p.ParseOneStmt(query, "", "") + require.NoError(t, err) + + alterStmt, ok := stmt.(*ast.AlterTableStmt) + require.True(t, ok) + + names := make([]string, 0) + indexPos := 0 + for _, spec := range alterStmt.Specs { + if spec == nil || spec.Tp != ast.AlterTableAddConstraint || spec.Constraint == nil { + continue + } + if !isIndexConstraint(spec.Constraint) { + continue + } + if spec.Constraint.Name != "" { + names = append(names, spec.Constraint.Name) + } else { + require.Less(t, indexPos, len(indexIDs)) + names = append(names, getIndexNameByID(t, tableInfo, indexIDs[indexPos])) + } + indexPos++ + } + require.Equal(t, indexPos, len(indexIDs)) + return names +} + +func assertRestoreAnonymousIndexToNamedIndex( + t *testing.T, + helper *commonEvent.EventTestHelper, + ddlSQL string, + anonymousQuery string, + expectedChanged bool, + expectedDDLType timodel.ActionType, +) *common.TableInfo { + job := helper.DDL2Job(ddlSQL) + require.Equal(t, expectedDDLType, job.Type) + + tableInfo := helper.GetTableInfo(job) + require.NotNil(t, tableInfo) + + indexIDs := getIndexIDsFromJob(t, job) + restoredQuery, changed, err := restoreAnonymousIndexToNamedIndex(anonymousQuery, tableInfo, indexIDs) + require.NoError(t, err) + require.Equal(t, expectedChanged, changed) + if !expectedChanged { + require.Equal(t, anonymousQuery, restoredQuery) + return tableInfo + } + + require.Equal(t, expectedAddIndexConstraintNames(t, anonymousQuery, tableInfo, indexIDs), parseAddIndexConstraintNames(t, restoredQuery)) + return tableInfo +} + +func getSecondaryIndexNames(tableInfo *common.TableInfo) []string { + indexNames := make([]string, 0) + for _, index := range tableInfo.GetIndices() { + if index == nil || index.Primary { + continue + } + indexNames = append(indexNames, index.Name.O) + } + slices.Sort(indexNames) + return indexNames +} + func TestExecDDL_RestoreAnonymousIndexToNamedIndex(t *testing.T) { writer, db, mock := newTestMysqlWriter(t) defer db.Close() @@ -238,3 +307,184 @@ func TestExecDDL_RestoreAnonymousIndexToNamedIndexForMultiSchemaChange(t *testin require.NoError(t, err) require.NoError(t, mock.ExpectationsWereMet()) } + +func TestRestoreAnonymousIndexToNamedIndexDDLWaitCases(t *testing.T) { + helper := commonEvent.NewEventTestHelper(t) + defer helper.Close() + + helper.Tk().MustExec("use test") + helper.DDL2Event("create table t_anon_idx (id int primary key, a int, b int, c int)") + + testCases := []struct { + name string + ddlSQL string + anonymousQuery string + expectedChanged bool + expectedDDLType timodel.ActionType + }{ + { + name: "single anonymous index", + ddlSQL: "alter table t_anon_idx add index (a)", + anonymousQuery: "ALTER TABLE `t_anon_idx` ADD INDEX (`a`)", + expectedChanged: true, + expectedDDLType: timodel.ActionAddIndex, + }, + { + name: "named and anonymous indexes", + ddlSQL: "alter table t_anon_idx add index idx_b(b), add index (a)", + anonymousQuery: "ALTER TABLE `t_anon_idx` ADD INDEX `idx_b`(`b`), ADD INDEX (`a`)", + expectedChanged: true, + expectedDDLType: timodel.ActionMultiSchemaChange, + }, + { + name: "anonymous index and anonymous unique", + ddlSQL: "alter table t_anon_idx add index (a), add unique (b, c)", + anonymousQuery: "ALTER TABLE `t_anon_idx` ADD INDEX (`a`), ADD UNIQUE (`b`, `c`)", + expectedChanged: true, + expectedDDLType: timodel.ActionMultiSchemaChange, + }, + { + name: "anonymous index before add column", + ddlSQL: "alter table t_anon_idx add index (a), add column d int", + anonymousQuery: "ALTER TABLE `t_anon_idx` ADD INDEX (`a`), ADD COLUMN `d` INT", + expectedChanged: true, + expectedDDLType: timodel.ActionMultiSchemaChange, + }, + { + name: "anonymous index after add column", + ddlSQL: "alter table t_anon_idx add column e int, add index (a)", + anonymousQuery: "ALTER TABLE `t_anon_idx` ADD COLUMN `e` INT, ADD INDEX (`a`)", + expectedChanged: true, + expectedDDLType: timodel.ActionMultiSchemaChange, + }, + { + name: "named index keeps original name", + ddlSQL: "alter table t_anon_idx add index a_7(a)", + anonymousQuery: "ALTER TABLE `t_anon_idx` ADD INDEX `a_7`(`a`)", + expectedChanged: false, + expectedDDLType: timodel.ActionAddIndex, + }, + { + name: "anonymous index after explicit suffix", + ddlSQL: "alter table t_anon_idx add index (a)", + anonymousQuery: "ALTER TABLE `t_anon_idx` ADD INDEX (`a`)", + expectedChanged: true, + expectedDDLType: timodel.ActionAddIndex, + }, + { + name: "repeated anonymous index second time", + ddlSQL: "alter table t_anon_idx add index (a)", + anonymousQuery: "ALTER TABLE `t_anon_idx` ADD INDEX (`a`)", + expectedChanged: true, + expectedDDLType: timodel.ActionAddIndex, + }, + { + name: "repeated anonymous index third time", + ddlSQL: "alter table t_anon_idx add index (a)", + anonymousQuery: "ALTER TABLE `t_anon_idx` ADD INDEX (`a`)", + expectedChanged: true, + expectedDDLType: timodel.ActionAddIndex, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + assertRestoreAnonymousIndexToNamedIndex( + t, + helper, + tc.ddlSQL, + tc.anonymousQuery, + tc.expectedChanged, + tc.expectedDDLType, + ) + }) + } +} + +func TestCreateTableLikeKeepsAnonymousIndexNamesAfterDDLWaitCases(t *testing.T) { + helper := commonEvent.NewEventTestHelper(t) + defer helper.Close() + + helper.Tk().MustExec("use test") + helper.DDL2Event("create table t_anon_idx (id int primary key, a int, b int, c int)") + var sourceTableInfo *common.TableInfo + + assertRestoreAnonymousIndexToNamedIndex( + t, + helper, + "alter table t_anon_idx add index (a)", + "ALTER TABLE `t_anon_idx` ADD INDEX (`a`)", + true, + timodel.ActionAddIndex, + ) + assertRestoreAnonymousIndexToNamedIndex( + t, + helper, + "alter table t_anon_idx add index idx_b(b), add index (a)", + "ALTER TABLE `t_anon_idx` ADD INDEX `idx_b`(`b`), ADD INDEX (`a`)", + true, + timodel.ActionMultiSchemaChange, + ) + assertRestoreAnonymousIndexToNamedIndex( + t, + helper, + "alter table t_anon_idx add index (a), add unique (b, c)", + "ALTER TABLE `t_anon_idx` ADD INDEX (`a`), ADD UNIQUE (`b`, `c`)", + true, + timodel.ActionMultiSchemaChange, + ) + assertRestoreAnonymousIndexToNamedIndex( + t, + helper, + "alter table t_anon_idx add index (a), add column d int", + "ALTER TABLE `t_anon_idx` ADD INDEX (`a`), ADD COLUMN `d` INT", + true, + timodel.ActionMultiSchemaChange, + ) + sourceTableInfo = assertRestoreAnonymousIndexToNamedIndex( + t, + helper, + "alter table t_anon_idx add column e int, add index (a)", + "ALTER TABLE `t_anon_idx` ADD COLUMN `e` INT, ADD INDEX (`a`)", + true, + timodel.ActionMultiSchemaChange, + ) + assertRestoreAnonymousIndexToNamedIndex( + t, + helper, + "alter table t_anon_idx add index a_7(a)", + "ALTER TABLE `t_anon_idx` ADD INDEX `a_7`(`a`)", + false, + timodel.ActionAddIndex, + ) + sourceTableInfo = assertRestoreAnonymousIndexToNamedIndex( + t, + helper, + "alter table t_anon_idx add index (a)", + "ALTER TABLE `t_anon_idx` ADD INDEX (`a`)", + true, + timodel.ActionAddIndex, + ) + sourceTableInfo = assertRestoreAnonymousIndexToNamedIndex( + t, + helper, + "alter table t_anon_idx add index (a)", + "ALTER TABLE `t_anon_idx` ADD INDEX (`a`)", + true, + timodel.ActionAddIndex, + ) + sourceTableInfo = assertRestoreAnonymousIndexToNamedIndex( + t, + helper, + "alter table t_anon_idx add index (a)", + "ALTER TABLE `t_anon_idx` ADD INDEX (`a`)", + true, + timodel.ActionAddIndex, + ) + + likeJob := helper.DDL2Job("create table t_anon_idx_like like t_anon_idx") + likeTableInfo := helper.GetTableInfo(likeJob) + require.NotNil(t, sourceTableInfo) + require.NotNil(t, likeTableInfo) + require.Equal(t, getSecondaryIndexNames(sourceTableInfo), getSecondaryIndexNames(likeTableInfo)) +} diff --git a/tests/integration_tests/ddl_wait/run.sh b/tests/integration_tests/ddl_wait/run.sh index a3453addb9..ef1f281196 100755 --- a/tests/integration_tests/ddl_wait/run.sh +++ b/tests/integration_tests/ddl_wait/run.sh @@ -97,6 +97,10 @@ function run() { run_sql "alter table test.t_anon_idx add index (a), add column d int;" run_sql "alter table test.t_anon_idx add column e int, add index (a);" run_sql "alter table test.t_anon_idx drop column d, drop column e;" + run_sql "alter table test.t_anon_idx add index a_7(a);" + run_sql "alter table test.t_anon_idx add index (a);" + run_sql "alter table test.t_anon_idx add index (a);" + run_sql "alter table test.t_anon_idx add index (a);" run_sql "create table test.t_anon_idx_like like test.t_anon_idx;" run_sql "insert into test.t_anon_idx values (4, 13, 23, 33);" check_table_exists test.t_anon_idx ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300 From a4d8d1ca72c497b251d79b584f4fe6856e312290 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Wed, 11 Mar 2026 03:47:04 +0000 Subject: [PATCH 13/13] fix ut Signed-off-by: wk989898 --- pkg/sink/mysql/mysql_writer_ddl_index_rewrite_test.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pkg/sink/mysql/mysql_writer_ddl_index_rewrite_test.go b/pkg/sink/mysql/mysql_writer_ddl_index_rewrite_test.go index ca0d7b5d79..0a2eacf008 100644 --- a/pkg/sink/mysql/mysql_writer_ddl_index_rewrite_test.go +++ b/pkg/sink/mysql/mysql_writer_ddl_index_rewrite_test.go @@ -26,6 +26,10 @@ import ( "github.com/stretchr/testify/require" ) +func disableDistTaskForTest(helper *commonEvent.EventTestHelper) { + helper.Tk().MustExec("set @@global.tidb_enable_dist_task = off") +} + func getIndexIDsFromJob(t *testing.T, job *timodel.Job) []int64 { idxArgs, err := timodel.GetModifyIndexArgs(job) if idxArgs != nil && err == nil { @@ -156,6 +160,7 @@ func TestExecDDL_RestoreAnonymousIndexToNamedIndex(t *testing.T) { helper := commonEvent.NewEventTestHelper(t) defer helper.Close() + disableDistTaskForTest(helper) helper.Tk().MustExec("use test") helper.DDL2Event("create table t (id int primary key, name varchar(32), index name(id))") @@ -200,6 +205,7 @@ func TestExecDDL_RestoreAnonymousIndexToNamedIndex(t *testing.T) { func TestRestoreAnonymousIndexToNamedIndexMultipleAnonymousIndexes(t *testing.T) { helper := commonEvent.NewEventTestHelper(t) defer helper.Close() + disableDistTaskForTest(helper) helper.Tk().MustExec("use test") helper.DDL2Event("create table t (id int primary key, name varchar(32), age int)") @@ -227,6 +233,7 @@ func TestRestoreAnonymousIndexToNamedIndexMultipleAnonymousIndexes(t *testing.T) func TestRestoreAnonymousIndexToNamedIndexWithNamedAndAnonymousIndexes(t *testing.T) { helper := commonEvent.NewEventTestHelper(t) defer helper.Close() + disableDistTaskForTest(helper) helper.Tk().MustExec("use test") helper.DDL2Event("create table t (id int primary key, name varchar(32), age int)") @@ -268,6 +275,7 @@ func TestExecDDL_RestoreAnonymousIndexToNamedIndexForMultiSchemaChange(t *testin helper := commonEvent.NewEventTestHelper(t) defer helper.Close() + disableDistTaskForTest(helper) helper.Tk().MustExec("use test") helper.DDL2Event("create table t (id int primary key, name varchar(32))") @@ -311,6 +319,7 @@ func TestExecDDL_RestoreAnonymousIndexToNamedIndexForMultiSchemaChange(t *testin func TestRestoreAnonymousIndexToNamedIndexDDLWaitCases(t *testing.T) { helper := commonEvent.NewEventTestHelper(t) defer helper.Close() + disableDistTaskForTest(helper) helper.Tk().MustExec("use test") helper.DDL2Event("create table t_anon_idx (id int primary key, a int, b int, c int)") @@ -404,6 +413,7 @@ func TestRestoreAnonymousIndexToNamedIndexDDLWaitCases(t *testing.T) { func TestCreateTableLikeKeepsAnonymousIndexNamesAfterDDLWaitCases(t *testing.T) { helper := commonEvent.NewEventTestHelper(t) defer helper.Close() + disableDistTaskForTest(helper) helper.Tk().MustExec("use test") helper.DDL2Event("create table t_anon_idx (id int primary key, a int, b int, c int)")