Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
8729a9d
mysql,sqlmodel: support table route in mysql sink
3AceShowHand May 7, 2026
41025ce
adjust tests
3AceShowHand May 7, 2026
ea7ed99
adjust tests
3AceShowHand May 7, 2026
9573010
adjust more code
3AceShowHand May 7, 2026
0ca3112
fix test, the ddl may not have a default schema name
3AceShowHand May 7, 2026
d641c7f
fix integration test
3AceShowHand May 7, 2026
3a85d40
add cross db tests
3AceShowHand May 7, 2026
6fd9d54
ignore unrelated DDL to the dispatcher
3AceShowHand May 8, 2026
2115c06
update tests
3AceShowHand May 8, 2026
bf3fc9c
Merge branch 'master' into table-route-pr6-mysql-sink
3AceShowHand May 8, 2026
3f2737e
update comments
3AceShowHand May 8, 2026
b90a0aa
update code
3AceShowHand May 8, 2026
a1b3e84
adjust InitPrivateFields method call
3AceShowHand May 8, 2026
69d3b9c
remove panic on local row
3AceShowHand May 9, 2026
da77713
do not set table info to local row if not routed
3AceShowHand May 9, 2026
ab495f5
rename some fields
3AceShowHand May 9, 2026
1f50909
rename methods
3AceShowHand May 9, 2026
4463961
Merge branch 'master' into table-route-pr6-mysql-sink
3AceShowHand May 11, 2026
7c949d6
Merge branch 'master' into table-route-pr6-mysql-sink
3AceShowHand May 12, 2026
6383df8
add cloned index ids
3AceShowHand May 12, 2026
5abc74b
add more code
3AceShowHand May 12, 2026
bd1a988
lazy init the sql
3AceShowHand May 12, 2026
084b1f8
revert init sql related code
3AceShowHand May 12, 2026
e14b66e
address review comments
3AceShowHand May 12, 2026
e9fadd0
address more review comments
3AceShowHand May 12, 2026
3e35abe
Merge branch 'master' into table-route-pr6-mysql-sink
3AceShowHand May 12, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 44 additions & 6 deletions downstreamadapter/eventcollector/dispatcher_stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ type dispatcherStat struct {
gotSyncpointOnTS atomic.Bool
// tableInfo is the latest table info of the dispatcher's corresponding table.
tableInfo atomic.Value
// tableInfoVersion is the latest table info version of the dispatcher's corresponding table.
// It is updated by ddl event
// tableInfoVersion is the latest schema version delivered to this dispatcher.
// It may advance even when tableInfo is not replaced.
tableInfoVersion atomic.Uint64
}

Expand Down Expand Up @@ -453,15 +453,53 @@ func (d *dispatcherStat) handleSingleDataEvents(events []dispatcher.DispatcherEv
return false
}
events[0].Event = ddl
d.tableInfoVersion.Store(ddl.FinishedTs)
if ddl.TableInfo != nil {
d.tableInfo.Store(ddl.TableInfo)
}
d.updateTableInfoByDDL(ddl)
}
d.updateCommitTsStateByEvents(state, events)
return d.target.HandleEvents(events, func() { d.wake() })
}

// updateTableInfoByDDL bumps the dispatcher's schema version for every DDL it
// receives and, when the DDL carries a TableInfo that matches this
// dispatcher's table, replaces the cached TableInfo used to assemble DML rows.
//
// Must only be invoked from the per-dispatcher event loop
// (handleSingleDataEvents), which serializes access to dispatcherStat fields
// for a given table.
func (d *dispatcherStat) updateTableInfoByDDL(ddl *commonEvent.DDLEvent) {
	span := d.target.GetTableSpan()
	if span == nil || span.TableID == common.DDLSpanTableID {
		return
	}

	// EXCHANGE PARTITION may bump the schema version of a physical table
	// dispatcher while ddl.TableInfo describes a different logical table. The
	// storage sink compares tableInfoVersion against schema files to decide
	// whether a DML belongs to a stale schema, so the version must advance for
	// every DDL delivered here, regardless of whether the cache is refreshed.
	// TODO: Revisit whether the storage sink should discard DML solely by comparing
	// tableInfoVersion with existing schema files.
	d.tableInfoVersion.Store(ddl.FinishedTs)

	newInfo := ddl.TableInfo
	if newInfo == nil {
		return
	}

	// Barrier coordination can deliver DDLs unrelated to this table (e.g.
	// CREATE VIEW is recorded in every table's DDL history). Only a matching
	// TableInfo may be cached, since it drives DML row assembly. For partition
	// tables the span holds a physical partition ID while TableInfo holds the
	// logical table ID, so prefer comparing against the cached info when present.
	wantTableID := span.TableID
	if cached, ok := d.tableInfo.Load().(*common.TableInfo); ok && cached != nil {
		wantTableID = cached.TableName.TableID
	}
	if newInfo.TableName.TableID != wantTableID {
		return
	}

	d.tableInfo.Store(newInfo)
}

func (d *dispatcherStat) handleDataEvents(events ...dispatcher.DispatcherEvent) bool {
switch events[0].GetType() {
case commonEvent.TypeDMLEvent,
Expand Down
85 changes: 40 additions & 45 deletions downstreamadapter/eventcollector/dispatcher_stat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type mockDispatcher struct {
handleError func(err error)
events []dispatcher.DispatcherEvent
checkPointTs uint64
tableSpan *heartbeatpb.TableSpan

skipSyncpointAtStartTs bool
router routing.Router
Expand Down Expand Up @@ -88,6 +89,9 @@ func (m *mockDispatcher) GetChangefeedID() common.ChangeFeedID {
}

func (m *mockDispatcher) GetTableSpan() *heartbeatpb.TableSpan {
if m.tableSpan != nil {
return m.tableSpan
}
return &heartbeatpb.TableSpan{
TableID: 1,
}
Expand Down Expand Up @@ -1550,57 +1554,48 @@ func TestRegisterTo(t *testing.T) {
}

func TestHandleDDLEventTableInfoUpdate(t *testing.T) {
t.Parallel()
helper := commonEvent.NewEventTestHelper(t)
defer helper.Close()
helper.Tk().MustExec("use test")

tableDDL := helper.DDL2Event("CREATE TABLE `products` (`id` INT PRIMARY KEY)")
viewDDL := helper.DDL2Event("CREATE VIEW `transient_view` AS SELECT 1 AS `id`")

localServerID := node.ID("local")
remoteServerID := node.ID("remote")

t.Run("stores ddl table info", func(t *testing.T) {
var capturedEvent *commonEvent.DDLEvent
mockDisp := newMockDispatcher(common.NewDispatcherID(), 0)
mockDisp.handleEvents = func(events []dispatcher.DispatcherEvent, wakeCallback func()) bool {
if len(events) > 0 {
capturedEvent = events[0].Event.(*commonEvent.DDLEvent)
}
return false
}

stat := newDispatcherStat(mockDisp, newTestEventCollector(localServerID), nil)
stat.session.connState.setEventServiceID(remoteServerID)
stat.currentEpoch.Store(newDispatcherEpochState(10, 1, stat.target.GetStartTs()))
stat.lastEventCommitTs.Store(50)
mockDisp := newMockDispatcher(common.NewDispatcherID(), 0)
mockDisp.tableSpan = &heartbeatpb.TableSpan{TableID: tableDDL.TableInfo.TableName.TableID}
mockDisp.handleEvents = func(events []dispatcher.DispatcherEvent, wakeCallback func()) bool {
return false
}

tableInfo := &common.TableInfo{
TableName: common.TableName{
Schema: "source_db",
Table: "users",
TableID: 1,
},
}
stat := newDispatcherStat(mockDisp, newTestEventCollector(localServerID), nil)
stat.session.connState.setEventServiceID(remoteServerID)
stat.currentEpoch.Store(newDispatcherEpochState(10, 1, stat.target.GetStartTs()))
stat.lastEventCommitTs.Store(50)

ddlEvent := &commonEvent.DDLEvent{
Version: commonEvent.DDLEventVersion1,
Query: "ALTER TABLE `source_db`.`users` ADD COLUMN `c1` INT",
FinishedTs: 100,
Epoch: 10,
Seq: 2,
TableInfo: tableInfo,
}
tableDDL.Epoch = 10
tableDDL.Seq = 2
stat.handleDataEvents(dispatcher.DispatcherEvent{From: &remoteServerID, Event: tableDDL})

storedTableInfo := stat.tableInfo.Load().(*common.TableInfo)
require.NotNil(t, storedTableInfo)
require.Same(t, tableDDL.TableInfo, storedTableInfo)
require.Equal(t, "test", storedTableInfo.TableName.Schema)
require.Equal(t, "products", storedTableInfo.TableName.Table)
require.Equal(t, tableDDL.TableInfo.TableName.TableID, storedTableInfo.TableName.TableID)
require.Equal(t, tableDDL.FinishedTs, stat.tableInfoVersion.Load())
require.Len(t, mockDisp.events, 1)
require.Same(t, tableDDL, mockDisp.events[0].Event)

events := []dispatcher.DispatcherEvent{
{From: &remoteServerID, Event: ddlEvent},
}
viewDDL.Epoch = 10
viewDDL.Seq = 3
stat.handleDataEvents(dispatcher.DispatcherEvent{From: &remoteServerID, Event: viewDDL})

stat.handleDataEvents(events...)

storedTableInfo := stat.tableInfo.Load().(*common.TableInfo)
require.NotNil(t, storedTableInfo)
require.Same(t, tableInfo, storedTableInfo)
require.Equal(t, "source_db", storedTableInfo.TableName.Schema)
require.Equal(t, "users", storedTableInfo.TableName.Table)
require.Equal(t, int64(1), storedTableInfo.TableName.TableID)
require.Equal(t, uint64(100), stat.tableInfoVersion.Load())
require.NotNil(t, capturedEvent)
require.Same(t, ddlEvent, capturedEvent)
})
storedTableInfo = stat.tableInfo.Load().(*common.TableInfo)
require.Same(t, tableDDL.TableInfo, storedTableInfo)
require.Equal(t, viewDDL.FinishedTs, stat.tableInfoVersion.Load())
require.Len(t, mockDisp.events, 2)
require.Same(t, viewDDL, mockDisp.events[1].Event)
}
119 changes: 117 additions & 2 deletions downstreamadapter/routing/ddl_query_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (r Router) rewriteParserBackedDDLQuery(ddl *commonEvent.DDLEvent) (string,
)
for i := range queries {
query := queries[i]
newQuery, err := r.rewriteSingleDDLQuery(query)
newQuery, err := r.rewriteSingleDDLQuery(query, ddl.GetSchemaName(), ddl.BlockedTableNames)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -78,7 +78,11 @@ func splitMultiStmtDDLQuery(query string) ([]string, error) {
return queries, nil
}

func (r Router) rewriteSingleDDLQuery(query string) (string, error) {
func (r Router) rewriteSingleDDLQuery(
query string,
defaultSchema string,
blockedTableNames []commonEvent.SchemaTableName,
) (string, error) {
p := parser.New()
stmt, err := p.ParseOneStmt(query, "", "")
if err != nil {
Expand All @@ -89,6 +93,10 @@ func (r Router) rewriteSingleDDLQuery(query string) (string, error) {
if len(sourceTables) == 0 {
return query, nil
}
if err := resolveUnqualifiedReferences(stmt, sourceTables, blockedTableNames); err != nil {
return "", err
}
fillDefaultSchema(sourceTables, defaultSchema)

var (
routed bool
Expand Down Expand Up @@ -119,6 +127,113 @@ func (r Router) rewriteSingleDDLQuery(query string) (string, error) {
return newQuery, nil
}

// fillDefaultSchema assigns defaultSchema, in place, to every entry that names
// a table but lacks a schema qualifier. A no-op when defaultSchema is empty.
func fillDefaultSchema(tables []commonEvent.SchemaTableName, defaultSchema string) {
	if defaultSchema == "" {
		return
	}
	for i, t := range tables {
		if t.SchemaName == "" && t.TableName != "" {
			tables[i].SchemaName = defaultSchema
		}
	}
}

// resolveUnqualifiedReferences ensures every table reference extracted from
// the DDL AST carries a non-empty schema name before the router applies
// routing rules. Without this, fillDefaultSchema would later assign the DDL
// event's own schema to every unqualified table, which is wrong when a table
// belongs to a different schema than the DDL target.
//
// It handles two DDL patterns where the parser sees multiple tables and at
// least one reference may lack a schema qualifier:
//
// CREATE TABLE LIKE (ast.CreateTableStmt with non-nil ReferTable):
//
//	The ReferTable is the LIKE source. If it has no schema, the router needs
//	to know which schema the source table lives in. The blockedTableNames
//	parameter carries upstream metadata (e.g. from job.InvolvingSchemaInfo)
//	that maps the unqualified table name to its real schema.
//
//	Example:
//	  -- Session is in source_extra_db, DDL creates a table in source_extra_db
//	  -- but the LIKE source "users" belongs to source_db.
//	  CREATE TABLE `source_extra_db`.`external_users` LIKE `users`
//	  -> extractTableNames -> [{source_extra_db, external_users}, {"", users}]
//	  -> fillDefaultSchema would wrongly set users' schema to source_extra_db
//	  -> blockedTableNames carries {source_db, users} from upstream metadata
//	  -> this function patches sourceTables[1].SchemaName = "source_db"
//
// CREATE TABLE AS SELECT / CREATE VIEW (ast.CreateTableStmt with non-nil
//
//	Select, or ast.CreateViewStmt):
//	  The SELECT body may reference tables from other schemas without
//	  qualifiers. Without upstream schema metadata for every referenced table,
//	  the router cannot safely determine the correct schema and must reject
//	  the rewrite.
//
//	Example:
//	  CREATE VIEW `target_db`.`v` AS SELECT `id` FROM `users`
//	  -> extractTableNames -> [{target_db, v}, {"", users}]
//	  -> fillDefaultSchema would set users' schema to "target_db" (wrong)
//	  -> no metadata available to resolve, return ErrTableRoutingFailed
//
// For DDLs where the target table itself has no schema qualifier (e.g. USE
// db + CREATE TABLE t LIKE u), fillDefaultSchema handles both the target
// and source correctly because they all belong to the same session schema.
func resolveUnqualifiedReferences(
	stmt ast.StmtNode,
	sourceTables []commonEvent.SchemaTableName,
	blockedTableNames []commonEvent.SchemaTableName,
) error {
	switch s := stmt.(type) {
	case *ast.CreateTableStmt:
		if s.Table == nil || s.Table.Schema.O == "" {
			// Unqualified target: every reference shares the session schema and
			// fillDefaultSchema resolves them all consistently.
			return nil
		}

		// Only an UNQUALIFIED LIKE source needs resolution. A qualified source
		// already carries its schema and must not be overwritten by upstream
		// metadata. (The previous check of Schema.O != "" inverted this: it
		// skipped the documented unqualified case entirely and could clobber an
		// explicit schema on a qualified reference.)
		if s.ReferTable != nil && s.ReferTable.Schema.O == "" && s.ReferTable.Name.O != "" {
			for _, blockedTableName := range blockedTableNames {
				if blockedTableName.SchemaName != "" &&
					strings.EqualFold(blockedTableName.TableName, s.ReferTable.Name.O) &&
					len(sourceTables) > 1 {
					sourceTables[1].SchemaName = blockedTableName.SchemaName
					break
				}
			}
			// No metadata resolved the source's schema: reject rather than let
			// fillDefaultSchema guess the wrong one.
			if len(sourceTables) <= 1 || sourceTables[1].SchemaName == "" {
				return errors.ErrTableRoutingFailed.GenWithStack(
					"table routing cannot rewrite ddl with unqualified referenced table because upstream default schema is unavailable: %T",
					stmt)
			}
		}
		if s.Select != nil && hasUnqualifiedTableName(sourceTables[1:]) {
			return errors.ErrTableRoutingFailed.GenWithStack(
				"table routing cannot rewrite ddl with unqualified referenced table because upstream default schema is unavailable: %T",
				stmt)
		}
	case *ast.CreateViewStmt:
		if s.ViewName == nil || s.ViewName.Schema.O == "" {
			return nil
		}
		if hasUnqualifiedTableName(sourceTables[1:]) {
			return errors.ErrTableRoutingFailed.GenWithStack(
				"table routing cannot rewrite ddl with unqualified referenced table because upstream default schema is unavailable: %T",
				stmt)
		}
	}
	return nil
}

// hasUnqualifiedTableName reports whether any entry names a table without a
// schema qualifier. Entries with an empty table name are ignored.
func hasUnqualifiedTableName(tables []commonEvent.SchemaTableName) bool {
	for i := range tables {
		if tables[i].TableName == "" {
			continue
		}
		if tables[i].SchemaName == "" {
			return true
		}
	}
	return false
}

// tableNameExtractor extracts table names from DDL AST nodes.
// ref: https://github.com/pingcap/tidb/blob/09feccb529be2830944e11f5fed474020f50370f/server/sql_info_fetcher.go#L46
type tableNameExtractor struct {
Expand Down
Loading
Loading