Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions cmd/kafka-consumer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,8 @@ func (p *partitionProgress) updateWatermark(newWatermark uint64, offset kafka.Of
zap.Uint64("watermark", newWatermark))
return
}
readOldOffset := true
if offset > p.watermarkOffset {
readOldOffset = false
}
readOldOffset := offset <= p.watermarkOffset

log.Warn("partition resolved ts fall back, ignore it",
zap.Bool("readOldOffset", readOldOffset),
zap.Int32("partition", p.partition),
Expand Down
5 changes: 3 additions & 2 deletions cmd/oauth2-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,9 @@ func run(_ *cobra.Command, _ []string) {
}
})))
http.Handle("/.well-known/openid-configuration", logMiddleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(fmt.Sprintf(openIDConfiguration, serverConfig.port, serverConfig.port, serverConfig.port)))
w.WriteHeader(200)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, _ = fmt.Fprintf(w, openIDConfiguration, serverConfig.port, serverConfig.port, serverConfig.port)
})))
log.Info("starting auth2 server", zap.Int("port", serverConfig.port))
log.Panic("run auth2 server failed", zap.Error(http.ListenAndServe(fmt.Sprintf(":%d", serverConfig.port), nil)))
Expand Down
2 changes: 1 addition & 1 deletion cmd/pulsar-consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (c *consumer) readMessage(ctx context.Context) error {
if !needCommit {
continue
}
err := c.pulsarConsumer.AckID(consumerMsg.Message.ID())
err := c.pulsarConsumer.AckID(consumerMsg.ID())
if err != nil {
log.Panic("Error ack message", zap.Error(err))
}
Expand Down
3 changes: 1 addition & 2 deletions cmd/storage-consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,6 @@ func getRenameTableOldTableKey(tableDef cloudstorage.TableDefinition) (string, b
return "", false
}
schemaName := tableDef.Schema
tableName := tableDef.Table
stmt, err := parser.New().ParseOneStmt(tableDef.Query, "", "")
if err != nil {
log.Panic("parse statement failed", zap.Any("DDL", tableDef.Query), zap.Error(err))
Expand All @@ -576,7 +575,7 @@ func getRenameTableOldTableKey(tableDef cloudstorage.TableDefinition) (string, b
if oldTable.Schema.O != "" {
schemaName = oldTable.Schema.O
}
tableName = oldTable.Name.O
tableName := oldTable.Name.O
return commonType.QuoteSchema(schemaName, tableName), true
}

Expand Down
1 change: 0 additions & 1 deletion coordinator/changefeed/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ func TestTableRoutingErrorsFastFail(t *testing.T) {
}

for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
backoff := NewBackoff(common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName), time.Minute*30, 1)
require.True(t, backoff.ShouldRun())
Expand Down
2 changes: 1 addition & 1 deletion coordinator/changefeed/etcd_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestGetAllChangefeeds(t *testing.T) {
require.Nil(t, err)
require.Len(t, resp, 0)

// status unmarshal failed, changefeed will not be ignored, and the checkpiont ts will be the start ts
// status unmarshal failed, changefeed will not be ignored, and the checkpoint ts will be the start ts
// the old version of changefeed without gid
cdcClient.EXPECT().GetChangefeedInfoAndStatus(gomock.Any()).Return(
int64(0),
Expand Down
10 changes: 2 additions & 8 deletions coordinator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,10 +723,7 @@ func (c *Controller) RemoveChangefeed(ctx context.Context, id common.ChangeFeedI
count := 0
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
if op.IsFinished() {
break
}
for !op.IsFinished() {
select {
case <-ctx.Done():
return 0, errors.Trace(ctx.Err())
Expand Down Expand Up @@ -764,10 +761,7 @@ func (c *Controller) PauseChangefeed(ctx context.Context, id common.ChangeFeedID
count := 0
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
if op.IsFinished() {
break
}
for !op.IsFinished() {
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
Expand Down
8 changes: 4 additions & 4 deletions downstreamadapter/dispatcher/basic_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,9 +490,9 @@ func (d *BasicDispatcher) isFirstEvent(event commonEvent.Event) bool {
}

func (d *BasicDispatcher) GetHeartBeatInfo(h *HeartBeatInfo) {
h.Watermark.CheckpointTs = d.GetCheckpointTs()
h.Watermark.ResolvedTs = d.GetResolvedTs()
h.Watermark.LastSyncedTs = d.GetLastSyncedTs()
h.CheckpointTs = d.GetCheckpointTs()
h.ResolvedTs = d.GetResolvedTs()
h.LastSyncedTs = d.GetLastSyncedTs()
h.Id = d.GetId()
h.ComponentStatus = d.GetComponentStatus()
h.IsRemoving = d.GetRemovingStatus()
Expand Down Expand Up @@ -591,7 +591,7 @@ func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeC
if log.GetLevel() == zapcore.DebugLevel {
log.Debug("dispatcher receive all event",
zap.Stringer("dispatcher", d.id), zap.Int64("mode", d.mode),
zap.String("eventType", commonEvent.TypeToString(dispatcherEvent.Event.GetType())),
zap.String("eventType", commonEvent.TypeToString(dispatcherEvent.GetType())),
zap.Any("event", dispatcherEvent.Event))
}

Expand Down
6 changes: 3 additions & 3 deletions downstreamadapter/dispatcher/event_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ func (d *EventDispatcher) cache(dispatcherEvents []DispatcherEvent, wakeCallback
zap.Stringer("dispatcher", d.id),
zap.Uint64("dispatcherResolvedTs", d.GetResolvedTs()),
zap.Int("length", len(dispatcherEvents)),
zap.Int("eventType", dispatcherEvents[len(dispatcherEvents)-1].Event.GetType()),
zap.Uint64("commitTs", dispatcherEvents[len(dispatcherEvents)-1].Event.GetCommitTs()),
zap.Int("eventType", dispatcherEvents[len(dispatcherEvents)-1].GetType()),
zap.Uint64("commitTs", dispatcherEvents[len(dispatcherEvents)-1].GetCommitTs()),
zap.Uint64("redoGlobalTs", d.redoGlobalTs.Load()),
)
default:
Expand All @@ -137,7 +137,7 @@ func (d *EventDispatcher) cache(dispatcherEvents []DispatcherEvent, wakeCallback
func (d *EventDispatcher) HandleEvents(dispatcherEvents []DispatcherEvent, wakeCallback func()) bool {
// if the commit-ts of last event of dispatcherEvents is greater than redoGlobalTs,
// the dispatcherEvents will be cached util the redoGlobalTs is updated.
if d.redoEnable && len(dispatcherEvents) > 0 && d.redoGlobalTs.Load() < dispatcherEvents[len(dispatcherEvents)-1].Event.GetCommitTs() {
if d.redoEnable && len(dispatcherEvents) > 0 && d.redoGlobalTs.Load() < dispatcherEvents[len(dispatcherEvents)-1].GetCommitTs() {
d.cache(dispatcherEvents, wakeCallback)
return true
}
Expand Down
2 changes: 1 addition & 1 deletion downstreamadapter/dispatchermanager/dispatcher_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -831,7 +831,7 @@ func (e *DispatcherManager) MergeDispatcher(dispatcherIDs []common.DispatcherID,
return e.mergeEventDispatcher(dispatcherIDs, mergedDispatcherID)
}

// mergeEventDispatcher merges the mulitple event dispatchers belonging to the same table with consecutive ranges.
// mergeEventDispatcher merges the multiple event dispatchers belonging to the same table with consecutive ranges.
func (e *DispatcherManager) mergeEventDispatcher(dispatcherIDs []common.DispatcherID, mergedDispatcherID common.DispatcherID) *MergeCheckTask {
// Step 1: check the dispatcherIDs and mergedDispatcherID are valid:
// 1. whether the mergedDispatcherID is not exist in the dispatcherMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func getDispatcherStatus(id common.DispatcherID, dispatcherItem dispatcher.Dispa
return &heartbeatpb.TableSpanStatus{
ID: id.ToPB(),
ComponentStatus: heartBeatInfo.ComponentStatus,
CheckpointTs: heartBeatInfo.Watermark.CheckpointTs,
CheckpointTs: heartBeatInfo.CheckpointTs,
EventSizePerSecond: dispatcherItem.GetEventSizePerSecond(),
Mode: dispatcherItem.GetMode(),
}, nil, &heartBeatInfo.Watermark
Expand Down
3 changes: 0 additions & 3 deletions downstreamadapter/eventcollector/dispatcher_stat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1063,7 +1063,6 @@ func TestHandleBatchDataEvents(t *testing.T) {
}

for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
mockDisp := newMockDispatcher(common.NewDispatcherID(), 0)
Expand Down Expand Up @@ -1152,7 +1151,6 @@ func TestHandleSingleDataEvents(t *testing.T) {
}

for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
mockDisp := newMockDispatcher(common.NewDispatcherID(), 0)
Expand Down Expand Up @@ -1357,7 +1355,6 @@ func TestHandleBatchDMLEvent(t *testing.T) {
}

for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
mockDisp := newMockDispatcher(common.NewDispatcherID(), 0)
Expand Down
1 change: 0 additions & 1 deletion downstreamadapter/routing/ddl_query_rewriter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,6 @@ func TestRewriteParserBackedDDLQueryWithSemicolonsInLiteralsAndComments(t *testi
}

for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

Expand Down
3 changes: 0 additions & 3 deletions downstreamadapter/routing/router_apply_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,6 @@ func TestApplyToDDLEvent(t *testing.T) {
}

for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
routed, err := tc.router.ApplyToDDLEvent(tc.ddl)
require.NoError(t, err)
Expand Down Expand Up @@ -540,7 +539,6 @@ func TestRewriteDDLQueryWithRouting(t *testing.T) {
}

for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
newQuery, err := tc.router.rewriteParserBackedDDLQuery(tc.ddl)
require.NoError(t, err)
Expand Down Expand Up @@ -775,7 +773,6 @@ func TestApplyToDDLEventRejectsParserUnsupportedIndexDDL(t *testing.T) {
}

for _, tc := range cases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

Expand Down
1 change: 0 additions & 1 deletion downstreamadapter/routing/router_supported_ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -991,7 +991,6 @@ func TestRewriteDDLQueryWithRoutingSupportsParserBackedDDLTypes(t *testing.T) {
require.GreaterOrEqual(t, len(cases), 39)

for _, tc := range cases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
require.Equal(t, byte(tc.action), tc.ddl.Type)

Expand Down
8 changes: 3 additions & 5 deletions downstreamadapter/sink/redo/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,11 +397,9 @@ func (m *RedoMeta) prepareForFlushMeta() (bool, misc.LogMeta) {
unflushed.CheckpointTs = m.metaCheckpointTs.getUnflushed()
unflushed.ResolvedTs = m.metaResolvedTs.getUnflushed()

hasChange := false
if flushed.CheckpointTs < unflushed.CheckpointTs ||
flushed.ResolvedTs < unflushed.ResolvedTs {
hasChange = true
}
hasChange := flushed.CheckpointTs < unflushed.CheckpointTs ||
flushed.ResolvedTs < unflushed.ResolvedTs

return hasChange, unflushed
}

Expand Down
4 changes: 2 additions & 2 deletions downstreamadapter/syncpoint/sync_point.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ func CalculateStartSyncPointTs(startTs uint64, syncPointInterval time.Duration,
if syncPointInterval == time.Duration(0) {
return 0
}
k := oracle.GetTimeFromTS(startTs).Sub(time.Unix(0, 0)) / syncPointInterval
k := int64(oracle.GetTimeFromTS(startTs).Sub(time.Unix(0, 0)) / syncPointInterval)
if oracle.GetTimeFromTS(startTs).Sub(time.Unix(0, 0))%syncPointInterval != 0 || oracle.ExtractLogical(startTs) != 0 {
k += 1
} else if skipSyncpointAtStartTs {
k += 1
}
return oracle.GoTimeToTS(time.Unix(0, 0).Add(k * syncPointInterval))
return oracle.GoTimeToTS(time.Unix(0, 0).Add(time.Duration(int64(syncPointInterval) * k)))
Comment thread
3AceShowHand marked this conversation as resolved.
}
11 changes: 3 additions & 8 deletions logservice/eventstore/event_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -956,10 +956,7 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com

decoder := e.decoderPool.Get().(*zstd.Decoder)

needCheckSpan := true
if stat.tableSpan.Equal(subStat.tableSpan) {
needCheckSpan = false
}
needCheckSpan := !stat.tableSpan.Equal(subStat.tableSpan)

return &eventStoreIter{
tableSpan: stat.tableSpan,
Expand Down Expand Up @@ -1605,14 +1602,12 @@ func (iter *eventStoreIter) Next() (*common.RawKVEntry, bool) {
skippedBytesMetrics.Add(float64(len(value)))
iter.innerIter.Next()
}
isNewTxn := false
isNewTxn := iter.prevCommitTs == 0 || (rawKV.StartTs != iter.prevStartTs || rawKV.CRTs != iter.prevCommitTs)
// 2 PC transactions have different startTs and commitTs.
// async-commit transactions have different startTs and may have the same commitTs.
// at the moment, use commit-ts determine whether it is a new transaction, even though multiple
// different transactions may be grouped together, to satisfy the resolved-ts semantics.
if iter.prevCommitTs == 0 || (rawKV.StartTs != iter.prevStartTs || rawKV.CRTs != iter.prevCommitTs) {
isNewTxn = true
}

iter.prevCommitTs = rawKV.CRTs
iter.prevStartTs = rawKV.StartTs
iter.rowCount++
Expand Down
4 changes: 2 additions & 2 deletions logservice/schemastore/schema_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ func TestIgnoreDDLByCommitTs(t *testing.T) {
require.Len(t, tables, 2)
tableNames := make(map[string]struct{})
for _, tbl := range tables {
log.Info("found table", zap.String("name", tbl.SchemaTableName.TableName))
tableNames[tbl.SchemaTableName.TableName] = struct{}{}
log.Info("found table", zap.String("name", tbl.TableName))
tableNames[tbl.TableName] = struct{}{}
}
require.Contains(t, tableNames, "t1")
require.Contains(t, tableNames, "t3")
Expand Down
4 changes: 2 additions & 2 deletions maintainer/barrier.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (b *Barrier) handleBootstrapResponse(bootstrapRespMap map[node.ID]*heartbea
b.blockedEvents.Range(func(key eventKey, barrierEvent *BarrierEvent) bool {
if barrierEvent.allDispatcherReported() {
// it means the dispatchers involved in the block event are all in the cached resp, not restarted.
// so we don't do speical check for this event
// so we don't do special check for this event
// just use usual logic to handle it
// Besides, is the dispatchers are all reported waiting status, it means at least one dispatcher
// is not get acked, so it must be resent by dispatcher later.
Expand Down Expand Up @@ -467,7 +467,7 @@ func (b *Barrier) checkEventFinish(be *BarrierEvent) {
if be.selected.Load() {
log.Info("all dispatchers reported event done, remove event",
zap.String("changefeed", be.cfID.Name()),
zap.Uint64("committs", be.commitTs),
zap.Uint64("commitTs", be.commitTs),
zap.Int64("mode", b.mode))
// already selected a dispatcher to write, now all dispatchers reported the block event
b.blockedEvents.Delete(getEventKey(be.commitTs, be.isSyncPoint))
Expand Down
4 changes: 2 additions & 2 deletions maintainer/operator/operator_merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ type MergeDispatcherOperator struct {
func buildMergedSpanInfo(toMergedSpans []*heartbeatpb.TableSpan) string {
var spansInfo strings.Builder
for _, span := range toMergedSpans {
spansInfo.WriteString(fmt.Sprintf("[%s,%s,%d]",
hex.EncodeToString(span.StartKey), hex.EncodeToString(span.EndKey), span.TableID))
fmt.Fprintf(&spansInfo, "[%s,%s,%d]",
hex.EncodeToString(span.StartKey), hex.EncodeToString(span.EndKey), span.TableID)
}
return spansInfo.String()
}
Expand Down
4 changes: 2 additions & 2 deletions maintainer/operator/operator_split.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ func NewSplitDispatcherOperator(
) *SplitDispatcherOperator {
var spansInfo strings.Builder
for _, span := range splitSpans {
spansInfo.WriteString(fmt.Sprintf("[%s,%s]",
hex.EncodeToString(span.StartKey), hex.EncodeToString(span.EndKey)))
fmt.Fprintf(&spansInfo, "[%s,%s]",
hex.EncodeToString(span.StartKey), hex.EncodeToString(span.EndKey))
}
op := &SplitDispatcherOperator{
replicaSet: replicaSet,
Expand Down
2 changes: 1 addition & 1 deletion maintainer/range_checker/table_span_range_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (rc *TableSpanRangeChecker) Detail() string {
buf.WriteString("uncovered tables: ")
for id, span := range rc.tableSpans {
if !span.IsFullyCovered() {
buf.WriteString(fmt.Sprintf("%d,\n", id))
fmt.Fprintf(buf, "%d,\n", id)
}
}
return buf.String()
Expand Down
Loading
Loading