diff --git a/pkg/eventservice/dispatcher_stat.go b/pkg/eventservice/dispatcher_stat.go index 552a684ea2..4fbf45f9a6 100644 --- a/pkg/eventservice/dispatcher_stat.go +++ b/pkg/eventservice/dispatcher_stat.go @@ -430,22 +430,21 @@ type changefeedStatus struct { availableMemoryQuota sync.Map // nodeID -> atomic.Uint64 (memory quota in bytes) minSentTs atomic.Uint64 scanInterval atomic.Int64 + reportBandState atomic.Int32 + fastBandState atomic.Int32 + slowBandState atomic.Int32 - lastAdjustTime atomic.Time - lastTrendAdjustTime atomic.Time - usageWindow *memoryUsageWindow - syncPointInterval time.Duration + scanWindowController *adaptiveScanWindowController + syncPointInterval time.Duration } func newChangefeedStatus(changefeedID common.ChangeFeedID, syncPointInterval time.Duration) *changefeedStatus { status := &changefeedStatus{ - changefeedID: changefeedID, - usageWindow: newMemoryUsageWindow(memoryUsageWindowDuration), - syncPointInterval: syncPointInterval, + changefeedID: changefeedID, + scanWindowController: newAdaptiveScanWindowController(time.Now()), + syncPointInterval: syncPointInterval, } status.scanInterval.Store(int64(defaultScanInterval)) - status.lastAdjustTime.Store(time.Now()) - status.lastTrendAdjustTime.Store(time.Now()) return status } diff --git a/pkg/eventservice/event_broker.go b/pkg/eventservice/event_broker.go index 21bdb93641..3e74a55a28 100644 --- a/pkg/eventservice/event_broker.go +++ b/pkg/eventservice/event_broker.go @@ -1142,9 +1142,7 @@ func (c *eventBroker) removeChangefeedStatus(status *changefeedStatus) { } filter.GetSharedFilterStorage().RemoveFilter(changefeedID) - metrics.EventServiceAvailableMemoryQuotaGaugeVec.DeleteLabelValues(changefeedID.String()) - metrics.EventServiceScanWindowBaseTsGaugeVec.DeleteLabelValues(changefeedID.String()) - metrics.EventServiceScanWindowIntervalGaugeVec.DeleteLabelValues(changefeedID.String()) + deleteScanWindowMetrics(changefeedID.String()) } func (c *eventBroker) resetDispatcher(dispatcherInfo DispatcherInfo) error { @@ -1263,8 +1261,7 @@ func (c *eventBroker) getOrSetChangefeedStatus(info DispatcherInfo) *changefeedS return actual.(*changefeedStatus) } log.Info("new changefeed status", zap.Stringer("changefeedID", changefeedID)) - metrics.EventServiceScanWindowBaseTsGaugeVec.WithLabelValues(changefeedID.String()).Set(0) - metrics.EventServiceScanWindowIntervalGaugeVec.WithLabelValues(changefeedID.String()).Set(defaultScanInterval.Seconds()) + initializeScanWindowMetrics(changefeedID.String()) return status } diff --git a/pkg/eventservice/event_broker_test.go b/pkg/eventservice/event_broker_test.go index 4a274e0c12..1b14728515 100644 --- a/pkg/eventservice/event_broker_test.go +++ b/pkg/eventservice/event_broker_test.go @@ -365,24 +365,7 @@ func TestGetScanTaskDataRangeRingWaitWithThreeDispatchersCanAdvancePendingDDL(t require.Equal(t, ts103, dataRange.CommitTsEnd) } -func TestHandleCongestionControlV2AdjustsScanInterval(t *testing.T) { - broker, _, _, _ := newEventBrokerForTest() - defer broker.close() - - changefeedID := common.NewChangefeedID4Test("default", "test") - status := addChangefeedStatusToBrokerForTest(t, broker, changefeedID, time.Second*10) - - status.scanInterval.Store(int64(40 * time.Second)) - status.lastAdjustTime.Store(time.Now()) - - control := event.NewCongestionControlWithVersion(event.CongestionControlVersion2) - control.AddAvailableMemoryWithDispatchersAndUsage(changefeedID.ID(), 0, 1, nil) - broker.handleCongestionControl(node.ID("event-collector-1"), control) - - require.Equal(t, int64(10*time.Second), status.scanInterval.Load()) -} - -func TestHandleCongestionControlV2ResetsScanIntervalOnMemoryRelease(t *testing.T) { +func TestHandleCongestionControlV2DoesNotResetScanIntervalOnMemoryRelease(t *testing.T) { broker, _, _, _ := newEventBrokerForTest() defer broker.close() @@ -395,7 +378,7 @@ func TestHandleCongestionControlV2ResetsScanIntervalOnMemoryRelease(t *testing.T control.AddAvailableMemoryWithDispatchersAndUsageAndReleaseCount(changefeedID.ID(), 0, 0.5, nil, 1) broker.handleCongestionControl(node.ID("event-collector-1"), control) - require.Equal(t, int64(defaultScanInterval), status.scanInterval.Load()) + require.Equal(t, int64(40*time.Second), status.scanInterval.Load()) } func TestHandleCongestionControlV1DoesNotAdjustScanInterval(t *testing.T) { @@ -406,7 +389,6 @@ func TestHandleCongestionControlV1DoesNotAdjustScanInterval(t *testing.T) { status := addChangefeedStatusToBrokerForTest(t, broker, changefeedID, time.Second*10) status.scanInterval.Store(int64(40 * time.Second)) - status.lastAdjustTime.Store(time.Now()) control := event.NewCongestionControl() control.AddAvailableMemoryWithDispatchers(changefeedID.ID(), 0, nil) diff --git a/pkg/eventservice/scan_window.go b/pkg/eventservice/scan_window.go index 46cb9cffb3..5cf3f5ccba 100644 --- a/pkg/eventservice/scan_window.go +++ b/pkg/eventservice/scan_window.go @@ -43,11 +43,6 @@ const ( // this cooldown and are applied immediately. scanIntervalAdjustCooldown = 30 * time.Second - // scanTrendAdjustCooldown is the minimum time between trend-based interval - // adjustments. This is shorter than the general cooldown because trend - // adjustments need to be more responsive to rising memory pressure. - scanTrendAdjustCooldown = 5 * time.Second - // memoryUsageWindowDuration is the duration of the sliding window for // collecting memory usage samples. Samples older than this duration are // pruned from the window. @@ -58,19 +53,94 @@ const ( memoryUsageHighThreshold = 0.7 // memoryUsageCriticalThreshold (90%) triggers an aggressive reduction of - // the scan interval to 1/4 of its current value when memory usage exceeds - // this level. + // the scan interval once memory usage exceeds this level. memoryUsageCriticalThreshold = 0.9 + // memoryUsageEmergencyThreshold (98%) triggers the strongest emergency brake. + memoryUsageEmergencyThreshold = 0.98 + // memoryUsageLowThreshold (20%) allows the scan interval to be increased // by 25% when both max and average memory usage are below this level. memoryUsageLowThreshold = 0.2 + // scanWindowLowPressureFastEMAOffset widens the fast EMA threshold slightly + // for low-pressure recovery decisions. + scanWindowLowPressureFastEMAOffset = 0.03 + + // scanWindowLowPressureSlowEMAOffset widens the slow EMA threshold slightly + // for low-pressure recovery decisions. + scanWindowLowPressureSlowEMAOffset = 0.02 + // memoryUsageVeryLowThreshold (10%) allows the scan interval to be increased // by 50% when both max and average memory usage are below this level. This // increase may exceed the normal sync point interval cap. memoryUsageVeryLowThreshold = 0.1 + // scanWindowModeratePressureThreshold is the smoothed usage threshold that + // starts accumulating pressure score for gradual interval reductions. + scanWindowModeratePressureThreshold = 0.55 + + // scanWindowHighPressureThreshold triggers a stronger but still bounded + // interval reduction when sustained high pressure is observed. + scanWindowHighPressureThreshold = 0.75 + + // scanWindowPressureAdjustCooldown is the minimum time between non-critical + // downward adjustments. It prevents the controller from overreacting before + // previous interval changes have time to take effect. + scanWindowPressureAdjustCooldown = 10 * time.Second + + // scanWindowCriticalBrakeCooldown deduplicates repeated critical brakes + // caused by the same short burst. Without this cooldown, one peak retained + // in the usage window can repeatedly trigger critical_brake on every report. + scanWindowCriticalBrakeCooldown = 10 * time.Second + + // scanWindowReleaseRecoveryCooldown is the minimum time after a downward + // adjustment before the controller is allowed to recover upward again. + scanWindowReleaseRecoveryCooldown = 15 * time.Second + + // scanWindowVeryLowRecoveryCooldown is the minimum time after a recent + // instability event before the controller can re-enter the aggressive + // very_low_recovery path. + scanWindowVeryLowRecoveryCooldown = 90 * time.Second + + // scanWindowFloorRecoveryCooldown allows the controller to escape from the + // default floor faster once the observed pressure has clearly fallen. This + // specifically mitigates feedback-lag cases where a late critical report + // pushes the interval to the floor after the real pressure has already eased. + scanWindowFloorRecoveryCooldown = 5 * time.Second + + // scanWindowEmergencyBrakePlateauInterval keeps the emergency brake + // continuous when transitioning from the small-window moderate brake path to + // the large-window strong brake path. + scanWindowEmergencyBrakePlateauInterval = 3 * defaultScanInterval + + // scanWindowEmergencyMinIntervalUnlockSamples is the minimum number of + // observed samples before emergency pressure is allowed to drive the scan + // window below the default floor toward the minimum interval. + scanWindowEmergencyMinIntervalUnlockSamples = 3 + + // scanWindowFastUsageAlpha controls the responsiveness of the short-term EMA. + scanWindowFastUsageAlpha = 0.4 + + // scanWindowSlowUsageAlpha controls the responsiveness of the long-term EMA. + scanWindowSlowUsageAlpha = 0.2 + + // scanWindowPressureTriggerScore is the score required to trigger a gradual + // downward adjustment under sustained but non-critical pressure. + scanWindowPressureTriggerScore = 3.0 + + // scanWindowPressureScoreCeiling bounds the pressure accumulator. + scanWindowPressureScoreCeiling = 8.0 + + // scanWindowPressureReliefPerRelease is the amount of accumulated pressure + // cleared by one downstream release pulse. + scanWindowPressureReliefPerRelease = 2.0 + + // scanWindowTargetBandLower and scanWindowTargetBandUpper define the desired + // operating region for observed pressure related signals. + scanWindowTargetBandLower = 0.30 + scanWindowTargetBandUpper = 0.50 + // scanWindowStaleDispatcherHeartbeatThreshold is the duration after which a // dispatcher is treated as stale for scan window base ts calculation if it // hasn't sent heartbeat updates. This prevents stale dispatchers (for example, @@ -102,12 +172,76 @@ type memoryUsageStats struct { cnt int } +type scanWindowReport struct { + usageRatio float64 + memoryReleaseCount uint32 +} + +type scanWindowDecisionReason string + +const ( + scanWindowDecisionNone scanWindowDecisionReason = "none" + scanWindowDecisionCriticalBrake scanWindowDecisionReason = "critical_brake" + scanWindowDecisionHighPressure scanWindowDecisionReason = "high_pressure" + scanWindowDecisionSustainedPressure scanWindowDecisionReason = "sustained_pressure" + scanWindowDecisionLowRecovery scanWindowDecisionReason = "low_recovery" + scanWindowDecisionVeryLowRecovery scanWindowDecisionReason = "very_low_recovery" +) + +type scanWindowDecision struct { + newInterval time.Duration + maxInterval time.Duration + reason scanWindowDecisionReason + usage memoryUsageStats + fastUsageEMA float64 + slowUsageEMA float64 + pressureScore float64 +} + +type scanWindowBandState int32 + +const ( + scanWindowBandUnknown scanWindowBandState = iota + scanWindowBandBelow + scanWindowBandIn + scanWindowBandAbove +) + +type scanWindowController interface { + OnCongestionReport(now time.Time, currentInterval time.Duration, maxInterval time.Duration, report scanWindowReport) scanWindowDecision +} + +type adaptiveScanWindowController struct { + mu sync.Mutex + + usageWindow *memoryUsageWindow + + lastAdjustTime time.Time + lastDownAdjustTime time.Time + lastCriticalTime time.Time + lastInstabilityTime time.Time + + fastUsageEMA float64 + slowUsageEMA float64 + emaInitialized bool + + pressureScore float64 +} + func newMemoryUsageWindow(window time.Duration) *memoryUsageWindow { return &memoryUsageWindow{ window: window, } } +func newAdaptiveScanWindowController(now time.Time) *adaptiveScanWindowController { + return &adaptiveScanWindowController{ + usageWindow: newMemoryUsageWindow(memoryUsageWindowDuration), + lastAdjustTime: now, + lastDownAdjustTime: now, + } +} + func (w *memoryUsageWindow) addSample(now time.Time, ratio float64) { if ratio < 0 { ratio = 0 @@ -166,150 +300,531 @@ func (w *memoryUsageWindow) pruneLocked(now time.Time) { } func (c *changefeedStatus) updateMemoryUsage(now time.Time, usageRatio float64, memoryReleaseCount uint32) { - if c.usageWindow == nil { + if c.scanWindowController == nil { return } - if usageRatio != usageRatio || usageRatio < 0 { - usageRatio = 0 + normalizedUsageRatio := normalizeUsageRatio(usageRatio) + current := time.Duration(c.scanInterval.Load()) + decision := c.scanWindowController.OnCongestionReport(now, current, c.maxScanInterval(), scanWindowReport{ + usageRatio: normalizedUsageRatio, + memoryReleaseCount: memoryReleaseCount, + }) + c.observeScanWindowControllerMetrics(normalizedUsageRatio, memoryReleaseCount, current, decision) + if decision.newInterval == current { + return } - if usageRatio > 1 { - usageRatio = 1 + + c.scanInterval.Store(int64(decision.newInterval)) + metrics.EventServiceScanWindowIntervalGaugeVec.WithLabelValues(c.changefeedID.String()).Set(decision.newInterval.Seconds()) + + log.Info("scan interval adjusted", + zap.Stringer("changefeedID", c.changefeedID), + zap.String("reason", string(decision.reason)), + zap.Duration("oldInterval", current), + zap.Duration("newInterval", decision.newInterval), + zap.Duration("maxInterval", decision.maxInterval), + zap.Float64("avgUsage", decision.usage.avg), + zap.Float64("maxUsage", decision.usage.max), + zap.Float64("firstUsage", decision.usage.first), + zap.Float64("lastUsage", decision.usage.last), + zap.Float64("fastUsageEMA", decision.fastUsageEMA), + zap.Float64("slowUsageEMA", decision.slowUsageEMA), + zap.Float64("pressureScore", decision.pressureScore), + zap.Uint32("memoryReleaseCount", memoryReleaseCount), + zap.Bool("syncPointEnabled", c.isSyncpointEnabled()), + zap.Duration("syncPointInterval", c.syncPointInterval)) +} + +func initializeScanWindowMetrics(changefeed string) { + metrics.EventServiceScanWindowBaseTsGaugeVec.WithLabelValues(changefeed).Set(0) + metrics.EventServiceScanWindowIntervalGaugeVec.WithLabelValues(changefeed).Set(defaultScanInterval.Seconds()) + metrics.EventServiceScanWindowUsageRatioGaugeVec.WithLabelValues(changefeed, "report").Set(0) + metrics.EventServiceScanWindowUsageRatioGaugeVec.WithLabelValues(changefeed, "avg").Set(0) + metrics.EventServiceScanWindowUsageRatioGaugeVec.WithLabelValues(changefeed, "max").Set(0) + metrics.EventServiceScanWindowUsageEMAGaugeVec.WithLabelValues(changefeed, "fast").Set(0) + metrics.EventServiceScanWindowUsageEMAGaugeVec.WithLabelValues(changefeed, "slow").Set(0) + metrics.EventServiceScanWindowTargetBandGaugeVec.WithLabelValues(changefeed, "report").Set(0) + metrics.EventServiceScanWindowTargetBandGaugeVec.WithLabelValues(changefeed, "fast").Set(0) + metrics.EventServiceScanWindowTargetBandGaugeVec.WithLabelValues(changefeed, "slow").Set(0) + metrics.EventServiceScanWindowPressureScoreGaugeVec.WithLabelValues(changefeed).Set(0) +} + +func deleteScanWindowMetrics(changefeed string) { + metrics.EventServiceAvailableMemoryQuotaGaugeVec.DeleteLabelValues(changefeed) + metrics.EventServiceScanWindowBaseTsGaugeVec.DeleteLabelValues(changefeed) + metrics.EventServiceScanWindowIntervalGaugeVec.DeleteLabelValues(changefeed) + metrics.EventServiceScanWindowUsageRatioGaugeVec.DeleteLabelValues(changefeed, "report") + metrics.EventServiceScanWindowUsageRatioGaugeVec.DeleteLabelValues(changefeed, "avg") + metrics.EventServiceScanWindowUsageRatioGaugeVec.DeleteLabelValues(changefeed, "max") + metrics.EventServiceScanWindowUsageEMAGaugeVec.DeleteLabelValues(changefeed, "fast") + metrics.EventServiceScanWindowUsageEMAGaugeVec.DeleteLabelValues(changefeed, "slow") + metrics.EventServiceScanWindowTargetBandGaugeVec.DeleteLabelValues(changefeed, "report") + metrics.EventServiceScanWindowTargetBandGaugeVec.DeleteLabelValues(changefeed, "fast") + metrics.EventServiceScanWindowTargetBandGaugeVec.DeleteLabelValues(changefeed, "slow") + metrics.EventServiceScanWindowTargetBandCrossCount.DeleteLabelValues(changefeed, "report") + metrics.EventServiceScanWindowTargetBandCrossCount.DeleteLabelValues(changefeed, "fast") + metrics.EventServiceScanWindowTargetBandCrossCount.DeleteLabelValues(changefeed, "slow") + metrics.EventServiceScanWindowPressureScoreGaugeVec.DeleteLabelValues(changefeed) + metrics.EventServiceScanWindowMemoryReleaseCount.DeleteLabelValues(changefeed) + for _, reason := range []scanWindowDecisionReason{ + scanWindowDecisionNone, + scanWindowDecisionCriticalBrake, + scanWindowDecisionHighPressure, + scanWindowDecisionSustainedPressure, + scanWindowDecisionLowRecovery, + scanWindowDecisionVeryLowRecovery, + } { + metrics.EventServiceScanWindowAdjustCount.DeleteLabelValues(changefeed, string(reason)) } +} +func (c *changefeedStatus) observeScanWindowControllerMetrics( + usageRatio float64, + memoryReleaseCount uint32, + current time.Duration, + decision scanWindowDecision, +) { + changefeed := c.changefeedID.String() + metrics.EventServiceScanWindowUsageRatioGaugeVec.WithLabelValues(changefeed, "report").Set(usageRatio) + metrics.EventServiceScanWindowUsageRatioGaugeVec.WithLabelValues(changefeed, "avg").Set(decision.usage.avg) + metrics.EventServiceScanWindowUsageRatioGaugeVec.WithLabelValues(changefeed, "max").Set(decision.usage.max) + metrics.EventServiceScanWindowUsageEMAGaugeVec.WithLabelValues(changefeed, "fast").Set(decision.fastUsageEMA) + metrics.EventServiceScanWindowUsageEMAGaugeVec.WithLabelValues(changefeed, "slow").Set(decision.slowUsageEMA) + c.observeScanWindowTargetBandMetrics(changefeed, "report", usageRatio, &c.reportBandState) + c.observeScanWindowTargetBandMetrics(changefeed, "fast", decision.fastUsageEMA, &c.fastBandState) + c.observeScanWindowTargetBandMetrics(changefeed, "slow", decision.slowUsageEMA, &c.slowBandState) + metrics.EventServiceScanWindowPressureScoreGaugeVec.WithLabelValues(changefeed).Set(decision.pressureScore) if memoryReleaseCount > 0 { - c.resetScanIntervalToDefault(now) - c.usageWindow.reset() - c.usageWindow.addSample(now, usageRatio) - return + metrics.EventServiceScanWindowMemoryReleaseCount.WithLabelValues(changefeed).Add(float64(memoryReleaseCount)) + } + if decision.newInterval != current { + metrics.EventServiceScanWindowAdjustCount.WithLabelValues(changefeed, string(decision.reason)).Inc() } - - c.usageWindow.addSample(now, usageRatio) - stats := c.usageWindow.stats(now) - c.adjustScanInterval(now, stats) } -func (c *changefeedStatus) resetScanIntervalToDefault(now time.Time) { - current := time.Duration(c.scanInterval.Load()) - if current != defaultScanInterval { - c.scanInterval.Store(int64(defaultScanInterval)) - metrics.EventServiceScanWindowIntervalGaugeVec.WithLabelValues(c.changefeedID.String()).Set(defaultScanInterval.Seconds()) +func (c *changefeedStatus) observeScanWindowTargetBandMetrics( + changefeed string, + metricType string, + value float64, + state *atomic.Int32, +) { + currentState := classifyScanWindowBandState(value) + if currentState == scanWindowBandIn { + metrics.EventServiceScanWindowTargetBandGaugeVec.WithLabelValues(changefeed, metricType).Set(1) + } else { + metrics.EventServiceScanWindowTargetBandGaugeVec.WithLabelValues(changefeed, metricType).Set(0) + } - log.Info("scan interval reset to default", - zap.Stringer("changefeedID", c.changefeedID), - zap.Duration("oldInterval", current), - zap.Duration("newInterval", defaultScanInterval)) + previousState := scanWindowBandState(state.Load()) + if previousState != scanWindowBandUnknown && previousState != currentState { + metrics.EventServiceScanWindowTargetBandCrossCount.WithLabelValues(changefeed, metricType).Inc() } + state.Store(int32(currentState)) +} - c.lastAdjustTime.Store(now) - c.lastTrendAdjustTime.Store(now) +func classifyScanWindowBandState(value float64) scanWindowBandState { + switch { + case value < scanWindowTargetBandLower: + return scanWindowBandBelow + case value > scanWindowTargetBandUpper: + return scanWindowBandAbove + default: + return scanWindowBandIn + } } -// Constants for trend detection and increase eligibility. const ( - minTrendSamples = 4 // Minimum samples needed to detect a valid trend - increasingTrendEpsilon = 0.02 // Minimum delta to consider as "increasing" - increasingTrendStartRatio = 0.3 // Threshold (30%) above which trend damping kicks in - minIncreaseSamples = 10 // Minimum samples needed before allowing increase minIncreaseSpanNumerator = 4 // Observation span must be at least 4/5 of window minIncreaseSpanDenominator = 5 ) -// adjustScanInterval dynamically adjusts the scan interval based on memory pressure. -// -// Algorithm overview: -// - "Fast brake, slow accelerate": Decreases are applied immediately when memory -// pressure is high, while increases require cooldown periods and stable conditions. -// - Tiered response: Different thresholds trigger different adjustment magnitudes. -// - Trend prediction: Detects rising memory pressure early and proactively reduces -// the interval before hitting critical thresholds. -// -// Thresholds and actions: -// - Critical (>90%): Reduce interval to 1/4 (aggressive) -// - High (>70%): Reduce interval to 1/2 -// - Trend damping (>30% AND rising): Reduce interval by 10% -// - Low (<30% max AND avg): Increase interval by 25% -// - Very low (<10% max AND avg): Increase interval by 50%, may exceed normal cap -func (c *changefeedStatus) adjustScanInterval(now time.Time, usage memoryUsageStats) { - current := time.Duration(c.scanInterval.Load()) +func (c *adaptiveScanWindowController) OnCongestionReport(now time.Time, current time.Duration, maxInterval time.Duration, report scanWindowReport) scanWindowDecision { + c.mu.Lock() + defer c.mu.Unlock() + if current <= 0 { current = defaultScanInterval } - maxInterval := c.maxScanInterval() if maxInterval < minScanInterval { maxInterval = minScanInterval } - // Trend detection: check if memory usage is rising over the observation window. - // This enables proactive intervention before hitting high thresholds. - trendDelta := usage.last - usage.first - isIncreasing := usage.cnt >= minTrendSamples && trendDelta > increasingTrendEpsilon - isAboveTrendStart := usage.last > increasingTrendStartRatio - canAdjustOnTrend := now.Sub(c.lastTrendAdjustTime.Load()) >= scanTrendAdjustCooldown - shouldDampOnTrend := isAboveTrendStart && isIncreasing && canAdjustOnTrend - - // Increase eligibility: conservative conditions to prevent oscillation. - // Requires: cooldown passed, enough samples, sufficient observation span, - // and NOT in an increasing trend situation (to avoid fighting against pressure). + + c.usageWindow.addSample(now, report.usageRatio) + usage := c.usageWindow.stats(now) + c.updateUsageEMALocked(report.usageRatio) + + if decision, ok := c.tryCriticalBrakeLocked(now, current, maxInterval, usage); ok { + return decision + } + + c.updatePressureScoreLocked(usage) + if report.memoryReleaseCount > 0 { + c.relievePressureLocked(report.memoryReleaseCount) + } + + if c.shouldReduceForHighPressureLocked(now, usage) { + newInterval := scanWindowPressureInterval(current, max(scaleDuration(current, 3, 4), defaultScanInterval)) + c.noteAdjustmentLocked(now, true) + return scanWindowDecision{ + newInterval: newInterval, + maxInterval: maxInterval, + reason: scanWindowDecisionHighPressure, + usage: usage, + fastUsageEMA: c.fastUsageEMA, + slowUsageEMA: c.slowUsageEMA, + pressureScore: c.pressureScore, + } + } + + if c.shouldReduceForSustainedPressureLocked(now, usage) { + newInterval := scanWindowPressureInterval(current, max(scaleDuration(current, 9, 10), defaultScanInterval)) + c.noteAdjustmentLocked(now, true) + return scanWindowDecision{ + newInterval: newInterval, + maxInterval: maxInterval, + reason: scanWindowDecisionSustainedPressure, + usage: usage, + fastUsageEMA: c.fastUsageEMA, + slowUsageEMA: c.slowUsageEMA, + pressureScore: c.pressureScore, + } + } + + if c.shouldRecoverFromFloorLocked(now, current, usage) { + newInterval := min(scaleDuration(current, 5, 4), maxInterval) + if newInterval > current { + c.noteAdjustmentLocked(now, false) + return scanWindowDecision{ + newInterval: newInterval, + maxInterval: maxInterval, + reason: scanWindowDecisionLowRecovery, + usage: usage, + fastUsageEMA: c.fastUsageEMA, + slowUsageEMA: c.slowUsageEMA, + pressureScore: c.pressureScore, + } + } + } + + if !c.allowedToIncreaseLocked(now, usage) { + return scanWindowDecision{ + newInterval: current, + maxInterval: maxInterval, + reason: scanWindowDecisionNone, + usage: usage, + fastUsageEMA: c.fastUsageEMA, + slowUsageEMA: c.slowUsageEMA, + pressureScore: c.pressureScore, + } + } + + if c.isVeryLowPressureLocked(usage) && c.allowedToVeryLowRecoverLocked(now) { + effectiveMaxInterval := maxScanInterval + numerator, denominator := scanWindowVeryLowRecoveryScale(current) + newInterval := min(scaleDuration(current, numerator, denominator), effectiveMaxInterval) + if newInterval > current { + c.noteAdjustmentLocked(now, false) + return scanWindowDecision{ + newInterval: newInterval, + maxInterval: effectiveMaxInterval, + reason: scanWindowDecisionVeryLowRecovery, + usage: usage, + fastUsageEMA: c.fastUsageEMA, + slowUsageEMA: c.slowUsageEMA, + pressureScore: c.pressureScore, + } + } + } + + if current < maxInterval && c.isLowPressureLocked(usage) { + numerator, denominator := scanWindowLowRecoveryScale(current) + newInterval := min(scaleDuration(current, numerator, denominator), maxInterval) + if newInterval > current { + c.noteAdjustmentLocked(now, false) + return scanWindowDecision{ + newInterval: newInterval, + maxInterval: maxInterval, + reason: scanWindowDecisionLowRecovery, + usage: usage, + fastUsageEMA: c.fastUsageEMA, + slowUsageEMA: c.slowUsageEMA, + pressureScore: c.pressureScore, + } + } + } + + return scanWindowDecision{ + newInterval: current, + maxInterval: maxInterval, + reason: scanWindowDecisionNone, + usage: usage, + fastUsageEMA: c.fastUsageEMA, + slowUsageEMA: c.slowUsageEMA, + pressureScore: c.pressureScore, + } +} + +func scanWindowVeryLowRecoveryScale(current time.Duration) (numerator int64, denominator int64) { + switch { + case current >= 120*time.Second: + return 11, 10 + case current >= 60*time.Second: + return 6, 5 + default: + return 3, 2 + } +} + +func scanWindowEmergencyBrakeInterval(current time.Duration, allowMinInterval bool) time.Duration { + if current <= defaultScanInterval && allowMinInterval { + return max(current/2, minScanInterval) + } + if current <= 6*defaultScanInterval { + return scanWindowPressureInterval(current, max(current/2, defaultScanInterval)) + } + return max(current/4, scanWindowEmergencyBrakePlateauInterval) +} + +func scanWindowPressureInterval(current time.Duration, next time.Duration) time.Duration { + return min(next, current) +} + +func scanWindowLowRecoveryScale(current time.Duration) (numerator int64, denominator int64) { + switch { + case current >= 120*time.Second: + return 21, 20 + case current >= 60*time.Second: + return 11, 10 + default: + return 5, 4 + } +} + +func (c *adaptiveScanWindowController) tryCriticalBrakeLocked( + now time.Time, + current time.Duration, + maxInterval time.Duration, + usage memoryUsageStats, +) (scanWindowDecision, bool) { + if now.Sub(c.lastCriticalTime) < scanWindowCriticalBrakeCooldown { + return scanWindowDecision{}, false + } + + switch { + case usage.last > memoryUsageEmergencyThreshold: + newInterval := scanWindowEmergencyBrakeInterval(current, c.shouldAllowEmergencyMinIntervalLocked(current, usage)) + c.lastCriticalTime = now + c.noteAdjustmentLocked(now, true) + return scanWindowDecision{ + newInterval: newInterval, + maxInterval: maxInterval, + reason: scanWindowDecisionCriticalBrake, + usage: usage, + fastUsageEMA: c.fastUsageEMA, + slowUsageEMA: c.slowUsageEMA, + pressureScore: c.pressureScore, + }, true + case usage.last > memoryUsageCriticalThreshold: + if c.shouldGraceCriticalBrakeLocked(usage) { + return scanWindowDecision{}, false + } + newInterval := scanWindowPressureInterval(current, max(current/2, defaultScanInterval)) + c.lastCriticalTime = now + c.noteAdjustmentLocked(now, true) + return scanWindowDecision{ + newInterval: newInterval, + maxInterval: maxInterval, + reason: scanWindowDecisionCriticalBrake, + usage: usage, + fastUsageEMA: c.fastUsageEMA, + slowUsageEMA: c.slowUsageEMA, + pressureScore: c.pressureScore, + }, true + default: + return scanWindowDecision{}, false + } +} + +func (c *adaptiveScanWindowController) shouldGraceCriticalBrakeLocked(usage memoryUsageStats) bool { + return usage.last < memoryUsageEmergencyThreshold && + c.fastUsageEMA < 0.85 && + c.slowUsageEMA < scanWindowHighPressureThreshold && + usage.max < memoryUsageEmergencyThreshold +} + +func (c *adaptiveScanWindowController) shouldAllowEmergencyMinIntervalLocked( + current time.Duration, + usage memoryUsageStats, +) bool { + return current <= defaultScanInterval && + usage.cnt >= scanWindowEmergencyMinIntervalUnlockSamples && + c.fastUsageEMA >= memoryUsageCriticalThreshold +} + +func (c *adaptiveScanWindowController) shouldRecoverFromFloorLocked( + now time.Time, + current time.Duration, + usage memoryUsageStats, +) bool { + if current > defaultScanInterval { + return false + } + if now.Sub(c.lastAdjustTime) < scanWindowFloorRecoveryCooldown { + return false + } + if now.Sub(c.lastDownAdjustTime) < scanWindowFloorRecoveryCooldown { + return false + } + if usage.cnt < 3 { + return false + } + + return usage.last < 0.35 && + usage.avg < scanWindowModeratePressureThreshold && + c.fastUsageEMA < 0.45 && + c.slowUsageEMA < 0.40 && + c.pressureScore < 1.5 +} + +func (c *adaptiveScanWindowController) updateUsageEMALocked(value float64) { + if !c.emaInitialized { + c.fastUsageEMA = value + c.slowUsageEMA = value + c.emaInitialized = true + return + } + c.fastUsageEMA = ema(c.fastUsageEMA, value, scanWindowFastUsageAlpha) + c.slowUsageEMA = ema(c.slowUsageEMA, value, scanWindowSlowUsageAlpha) +} + +func (c *adaptiveScanWindowController) updatePressureScoreLocked(usage memoryUsageStats) { + switch { + case c.fastUsageEMA >= scanWindowHighPressureThreshold || + c.slowUsageEMA >= scanWindowHighPressureThreshold || + usage.max >= memoryUsageHighThreshold: + c.pressureScore = min(c.pressureScore+2, scanWindowPressureScoreCeiling) + case c.fastUsageEMA >= scanWindowModeratePressureThreshold || + c.slowUsageEMA >= scanWindowModeratePressureThreshold || + usage.avg >= scanWindowModeratePressureThreshold: + c.pressureScore = min(c.pressureScore+1, scanWindowPressureScoreCeiling) + case c.fastUsageEMA < 0.30 && c.slowUsageEMA < 0.25 && usage.last < 0.30: + c.pressureScore = max(0.0, c.pressureScore-1.5) + default: + c.pressureScore = max(0.0, c.pressureScore-0.5) + } +} + +func (c *adaptiveScanWindowController) relievePressureLocked(memoryReleaseCount uint32) { + relief := min(float64(memoryReleaseCount)*scanWindowPressureReliefPerRelease, scanWindowPressureScoreCeiling) + c.pressureScore = max(0.0, c.pressureScore-relief) +} + +func (c *adaptiveScanWindowController) shouldReduceForHighPressureLocked(now time.Time, usage memoryUsageStats) bool { + if now.Sub(c.lastDownAdjustTime) < scanWindowPressureAdjustCooldown { + return false + } + + return c.fastUsageEMA >= scanWindowHighPressureThreshold || + c.slowUsageEMA >= scanWindowHighPressureThreshold || + usage.max >= memoryUsageHighThreshold +} + +func (c *adaptiveScanWindowController) shouldReduceForSustainedPressureLocked(now time.Time, usage memoryUsageStats) bool { + if now.Sub(c.lastDownAdjustTime) < scanWindowPressureAdjustCooldown { + return false + } + if c.pressureScore < scanWindowPressureTriggerScore { + return false + } + return c.fastUsageEMA >= scanWindowModeratePressureThreshold || + c.slowUsageEMA >= scanWindowModeratePressureThreshold || + usage.avg >= scanWindowModeratePressureThreshold +} + +func (c *adaptiveScanWindowController) allowedToIncreaseLocked(now time.Time, usage memoryUsageStats) bool { minIncreaseSpan := memoryUsageWindowDuration * minIncreaseSpanNumerator / minIncreaseSpanDenominator - allowedToIncrease := now.Sub(c.lastAdjustTime.Load()) >= scanIntervalAdjustCooldown && + return now.Sub(c.lastAdjustTime) >= scanIntervalAdjustCooldown && + now.Sub(c.lastDownAdjustTime) >= scanWindowReleaseRecoveryCooldown && usage.cnt >= minIncreaseSamples && usage.span >= minIncreaseSpan && - !(isAboveTrendStart && isIncreasing) + c.pressureScore < 1 +} - // Determine the new interval based on memory pressure levels. - // Priority order: critical > high > trend damping > very low > low - adjustedOnTrend := false - newInterval := current - switch { - case usage.last > memoryUsageCriticalThreshold || usage.max > memoryUsageCriticalThreshold: - // Critical pressure: aggressive reduction to 1/4 - newInterval = max(current/4, minScanInterval) - case usage.last > memoryUsageHighThreshold || usage.max > memoryUsageHighThreshold: - // High pressure: reduce to 1/2 - newInterval = max(current/2, minScanInterval) - case shouldDampOnTrend: - // Trend damping: pressure is moderate (>30%) but rising. Reduce by 10% to - // preemptively slow down before downstream gets overwhelmed. - newInterval = max(scaleDuration(current, 9, 10), minScanInterval) - adjustedOnTrend = true - case allowedToIncrease && usage.max < memoryUsageVeryLowThreshold && usage.avg < memoryUsageVeryLowThreshold: - // Very low pressure (<20%): increase by 50%, allowed to exceed sync point cap. - maxInterval = maxScanInterval - newInterval = min(scaleDuration(current, 3, 2), maxInterval) - case allowedToIncrease && usage.max < memoryUsageLowThreshold && usage.avg < memoryUsageLowThreshold: - // Low pressure (<40%): increase by 25%, capped by sync point interval. - newInterval = min(scaleDuration(current, 5, 4), maxInterval) - } - - // Anti-oscillation guard: decreases are always applied immediately, - // but increases are blocked if cooldown conditions aren't met. - if newInterval > current && !allowedToIncrease { - return +func (c *adaptiveScanWindowController) allowedToVeryLowRecoverLocked(now time.Time) bool { + if c.lastInstabilityTime.IsZero() { + return true + } + return now.Sub(c.lastInstabilityTime) >= scanWindowVeryLowRecoveryCooldown +} + +func (c *adaptiveScanWindowController) isVeryLowPressureLocked(usage memoryUsageStats) bool { + return usage.max < memoryUsageVeryLowThreshold && + usage.avg < memoryUsageVeryLowThreshold && + c.fastUsageEMA < memoryUsageVeryLowThreshold && + c.slowUsageEMA < memoryUsageVeryLowThreshold +} + +func (c *adaptiveScanWindowController) isLowPressureLocked(usage memoryUsageStats) bool { + return usage.max < memoryUsageLowThreshold && + usage.avg < memoryUsageLowThreshold && + c.fastUsageEMA < memoryUsageLowThreshold+scanWindowLowPressureFastEMAOffset && + c.slowUsageEMA < memoryUsageLowThreshold+scanWindowLowPressureSlowEMAOffset +} + +func (c *adaptiveScanWindowController) noteAdjustmentLocked(now time.Time, downward bool) { + c.lastAdjustTime = now + if downward { + c.lastDownAdjustTime = now + c.lastInstabilityTime = now } +} - if newInterval != current { - c.scanInterval.Store(int64(newInterval)) - metrics.EventServiceScanWindowIntervalGaugeVec.WithLabelValues(c.changefeedID.String()).Set(newInterval.Seconds()) - c.lastAdjustTime.Store(now) - if adjustedOnTrend { - c.lastTrendAdjustTime.Store(now) - } +func (c *adaptiveScanWindowController) setLastAdjustTimeForTest(now time.Time) { + c.mu.Lock() + defer c.mu.Unlock() + c.lastAdjustTime = now +} + +func (c *adaptiveScanWindowController) setLastDownAdjustTimeForTest(now time.Time) { + c.mu.Lock() + defer c.mu.Unlock() + c.lastDownAdjustTime = now +} - log.Info("scan interval adjusted", - zap.Stringer("changefeedID", c.changefeedID), - zap.Duration("oldInterval", current), - zap.Duration("newInterval", newInterval), - zap.Duration("maxInterval", maxInterval), - zap.Float64("avgUsage", usage.avg), - zap.Float64("maxUsage", usage.max), - zap.Float64("firstUsage", usage.first), - zap.Float64("lastUsage", usage.last), - zap.Float64("trendDelta", trendDelta), - zap.Int("usageSamples", usage.cnt), - zap.Bool("syncPointEnabled", c.isSyncpointEnabled()), - zap.Duration("syncPointInterval", c.syncPointInterval)) +func (c *adaptiveScanWindowController) setPressureScoreForTest(score float64) { + c.mu.Lock() + defer c.mu.Unlock() + c.pressureScore = score +} + +func (c *adaptiveScanWindowController) resetForTest(now time.Time) { + c.mu.Lock() + defer c.mu.Unlock() + c.usageWindow.reset() + c.lastAdjustTime = now + c.lastDownAdjustTime = now + c.lastCriticalTime = time.Time{} + c.lastInstabilityTime = time.Time{} + c.fastUsageEMA = 0 + c.slowUsageEMA = 0 + c.emaInitialized = false + c.pressureScore = 0 +} + +func normalizeUsageRatio(usageRatio float64) float64 { + if usageRatio != usageRatio || usageRatio < 0 { + return 0 + } + if usageRatio > 1 { + return 1 } + return usageRatio +} + +func ema(previous float64, value float64, alpha float64) float64 { + return previous + alpha*(value-previous) } func (c *changefeedStatus) maxScanInterval() time.Duration { diff --git a/pkg/eventservice/scan_window_test.go b/pkg/eventservice/scan_window_test.go index 01ee458e70..d40b1606bc 100644 --- a/pkg/eventservice/scan_window_test.go +++ b/pkg/eventservice/scan_window_test.go @@ -18,71 +18,183 @@ import ( "time" "github.com/pingcap/ticdc/pkg/common" + "github.com/pingcap/ticdc/pkg/metrics" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/oracle" "go.uber.org/atomic" ) -func TestAdjustScanIntervalVeryLowBypassesSyncPointCap(t *testing.T) { +func markScanWindowReadyForIncrease(status *changefeedStatus, now time.Time) { + status.scanWindowController.setLastAdjustTimeForTest(now.Add(-scanIntervalAdjustCooldown - time.Second)) + status.scanWindowController.setLastDownAdjustTimeForTest(now.Add(-scanWindowReleaseRecoveryCooldown - time.Second)) +} + +func markScanWindowReadyForDecrease(status *changefeedStatus, now time.Time) { + status.scanWindowController.setLastDownAdjustTimeForTest(now.Add(-scanWindowPressureAdjustCooldown - time.Second)) +} + +func TestAdjustScanIntervalLowPressureSlowsRecoveryForLargeWindow(t *testing.T) { t.Parallel() - status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 1*time.Minute) + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 10*time.Minute) now := time.Now() - status.lastAdjustTime.Store(now.Add(-scanIntervalAdjustCooldown - time.Second)) - - // Start from the sync point capped max interval, then allow it to grow slowly. - status.scanInterval.Store(int64(1 * time.Minute)) + markScanWindowReadyForIncrease(status, now) + status.scanInterval.Store(int64(80 * time.Second)) - // Maintain a very low pressure for a full window to allow bypassing the sync point cap. for i := 0; i <= int(memoryUsageWindowDuration/time.Second); i++ { - status.updateMemoryUsage(now.Add(time.Duration(i)*time.Second), 0, 0) + status.updateMemoryUsage(now.Add(time.Duration(i)*time.Second), 0.15, 0) } - require.Equal(t, int64(90*time.Second), status.scanInterval.Load()) + require.Equal(t, int64(88*time.Second), status.scanInterval.Load()) } -func TestAdjustScanIntervalLowRespectsSyncPointCap(t *testing.T) { +func TestAdjustScanIntervalVeryLowPressureSlowsRecoveryForVeryLargeWindow(t *testing.T) { t.Parallel() - status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 1*time.Minute) + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 10*time.Minute) now := time.Now() - status.lastAdjustTime.Store(now.Add(-scanIntervalAdjustCooldown - time.Second)) - - status.scanInterval.Store(int64(40 * time.Second)) + markScanWindowReadyForIncrease(status, now) + status.scanInterval.Store(int64(150 * time.Second)) for i := 0; i <= int(memoryUsageWindowDuration/time.Second); i++ { - status.updateMemoryUsage(now.Add(time.Duration(i)*time.Second), 0.15, 0) + status.updateMemoryUsage(now.Add(time.Duration(i)*time.Second), 0, 0) } - require.Equal(t, int64(50*time.Second), status.scanInterval.Load()) + require.Equal(t, int64(165*time.Second), status.scanInterval.Load()) } -func TestAdjustScanIntervalDecreaseIgnoresCooldown(t *testing.T) { +func TestAdjustScanIntervalHighPressureUsesBoundedReduction(t *testing.T) { t.Parallel() status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 1*time.Minute) now := time.Now() - status.lastAdjustTime.Store(now) + markScanWindowReadyForDecrease(status, now) status.scanInterval.Store(int64(40 * time.Second)) status.updateMemoryUsage(now.Add(memoryUsageWindowDuration), 0.8, 0) - require.Equal(t, int64(20*time.Second), status.scanInterval.Load()) + require.Equal(t, int64(30*time.Second), status.scanInterval.Load()) } func TestAdjustScanIntervalCriticalPressure(t *testing.T) { t.Parallel() + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 1*time.Minute) + status.scanInterval.Store(int64(40 * time.Second)) + status.updateMemoryUsage(time.Now().Add(memoryUsageWindowDuration), 1, 0) + require.Equal(t, int64(scanWindowEmergencyBrakePlateauInterval), status.scanInterval.Load()) +} + +func TestAdjustScanIntervalCriticalPressureUsesDefaultFloor(t *testing.T) { + t.Parallel() + + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 10*time.Minute) + status.scanInterval.Store(int64(8 * time.Second)) + status.updateMemoryUsage(time.Now().Add(memoryUsageWindowDuration), 0.95, 0) + require.Equal(t, int64(defaultScanInterval), status.scanInterval.Load()) +} + +func TestAdjustScanIntervalHighPressureDoesNotIncreaseBelowDefaultFloor(t *testing.T) { + t.Parallel() + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 1*time.Minute) now := time.Now() - status.lastAdjustTime.Store(now) + markScanWindowReadyForDecrease(status, now) - status.scanInterval.Store(int64(40 * time.Second)) + status.scanInterval.Store(int64(2 * time.Second)) + status.updateMemoryUsage(now.Add(memoryUsageWindowDuration), 0.8, 0) + require.Equal(t, int64(2*time.Second), status.scanInterval.Load()) +} + +func TestAdjustScanIntervalCriticalPressureDoesNotIncreaseBelowDefaultFloor(t *testing.T) { + t.Parallel() - status.updateMemoryUsage(now.Add(memoryUsageWindowDuration), 1, 0) + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 10*time.Minute) + status.scanInterval.Store(int64(2 * time.Second)) + status.updateMemoryUsage(time.Now().Add(memoryUsageWindowDuration), 0.95, 0) + require.Equal(t, int64(2*time.Second), status.scanInterval.Load()) +} + +func TestAdjustScanIntervalEmergencyPressureUsesModerateBrakeForSmallWindow(t *testing.T) { + t.Parallel() + + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 10*time.Minute) + status.scanInterval.Store(int64(20 * time.Second)) + status.updateMemoryUsage(time.Now().Add(memoryUsageWindowDuration), 1, 0) require.Equal(t, int64(10*time.Second), status.scanInterval.Load()) } -func TestUpdateMemoryUsageResetsScanIntervalOnMemoryRelease(t *testing.T) { +func TestScanWindowEmergencyBrakeIntervalIsContinuousAtThirtySeconds(t *testing.T) { + t.Parallel() + + require.Equal(t, 15*time.Second, scanWindowEmergencyBrakeInterval(30*time.Second, false)) + require.Equal(t, 15*time.Second, scanWindowEmergencyBrakeInterval(31*time.Second, false)) + require.Equal(t, 15*time.Second, scanWindowEmergencyBrakeInterval(60*time.Second, false)) +} + +func TestScanWindowEmergencyBrakeIntervalUsesStrongBrakeForLargeWindow(t *testing.T) { + t.Parallel() + + require.Equal(t, 20*time.Second, scanWindowEmergencyBrakeInterval(80*time.Second, false)) +} + +func TestAdjustScanIntervalEmergencyPressureUsesDefaultFloorForVerySmallWindow(t *testing.T) { + t.Parallel() + + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 10*time.Minute) + status.scanInterval.Store(int64(8 * time.Second)) + status.updateMemoryUsage(time.Now().Add(memoryUsageWindowDuration), 1, 0) + require.Equal(t, int64(defaultScanInterval), status.scanInterval.Load()) +} + +func TestAdjustScanIntervalEmergencyPressureDoesNotImmediatelyDropBelowDefaultFloor(t *testing.T) { + t.Parallel() + + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 10*time.Minute) + status.scanInterval.Store(int64(defaultScanInterval)) + status.updateMemoryUsage(time.Now().Add(memoryUsageWindowDuration), 1, 0) + require.Equal(t, int64(defaultScanInterval), status.scanInterval.Load()) +} + +func TestAdjustScanIntervalEmergencyPressureDoesNotIncreaseBelowDefaultFloor(t *testing.T) { + t.Parallel() + + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 10*time.Minute) + status.scanInterval.Store(int64(2 * time.Second)) + status.updateMemoryUsage(time.Now().Add(memoryUsageWindowDuration), 1, 0) + require.Equal(t, int64(2*time.Second), status.scanInterval.Load()) +} + +func TestAdjustScanIntervalEmergencyPressureCanReachMinFloorWhenSustained(t *testing.T) { + t.Parallel() + + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 10*time.Minute) + status.scanInterval.Store(int64(defaultScanInterval)) + start := time.Now() + + for i := 0; i <= 30; i++ { + status.updateMemoryUsage(start.Add(time.Duration(i)*time.Second), 1, 0) + } + + require.Equal(t, int64(minScanInterval), status.scanInterval.Load()) +} + +func TestAdjustScanIntervalRecoversFromFloorBeforeNormalIncreaseCooldown(t *testing.T) { + t.Parallel() + + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 10*time.Minute) + now := time.Now() + status.scanInterval.Store(int64(defaultScanInterval)) + status.scanWindowController.setLastAdjustTimeForTest(now.Add(-scanWindowFloorRecoveryCooldown - time.Second)) + status.scanWindowController.setLastDownAdjustTimeForTest(now.Add(-scanWindowFloorRecoveryCooldown - time.Second)) + + for i, usage := range []float64{0.30, 0.25, 0.20, 0.18, 0.15} { + status.updateMemoryUsage(now.Add(time.Duration(i)*time.Second), usage, 0) + } + require.Greater(t, status.scanInterval.Load(), int64(defaultScanInterval)) +} + +func TestUpdateMemoryUsageDoesNotResetScanIntervalOnMemoryRelease(t *testing.T) { t.Parallel() status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 1*time.Minute) @@ -90,7 +202,68 @@ func TestUpdateMemoryUsageResetsScanIntervalOnMemoryRelease(t *testing.T) { status.scanInterval.Store(int64(40 * time.Second)) status.updateMemoryUsage(now, 0.5, 1) - require.Equal(t, int64(defaultScanInterval), status.scanInterval.Load()) + require.Equal(t, int64(40*time.Second), status.scanInterval.Load()) +} + +func TestUpdateMemoryUsageRecordsScanWindowObservationMetrics(t *testing.T) { + status := newChangefeedStatus(common.NewChangefeedID4Test("default", t.Name()), 1*time.Minute) + changefeed := status.changefeedID.String() + t.Cleanup(func() { + deleteScanWindowMetrics(changefeed) + }) + + now := time.Now() + status.scanInterval.Store(int64(40 * time.Second)) + + status.updateMemoryUsage(now, 0.6, 1) + + require.InDelta(t, 0.6, testutil.ToFloat64(metrics.EventServiceScanWindowUsageRatioGaugeVec.WithLabelValues(changefeed, "report")), 1e-9) + require.InDelta(t, 0.6, testutil.ToFloat64(metrics.EventServiceScanWindowUsageRatioGaugeVec.WithLabelValues(changefeed, "avg")), 1e-9) + require.InDelta(t, 0.6, testutil.ToFloat64(metrics.EventServiceScanWindowUsageRatioGaugeVec.WithLabelValues(changefeed, "max")), 1e-9) + require.InDelta(t, 0.6, testutil.ToFloat64(metrics.EventServiceScanWindowUsageEMAGaugeVec.WithLabelValues(changefeed, "fast")), 1e-9) + require.InDelta(t, 0.6, testutil.ToFloat64(metrics.EventServiceScanWindowUsageEMAGaugeVec.WithLabelValues(changefeed, "slow")), 1e-9) + require.InDelta(t, 0, testutil.ToFloat64(metrics.EventServiceScanWindowTargetBandGaugeVec.WithLabelValues(changefeed, "report")), 1e-9) + require.InDelta(t, 0, testutil.ToFloat64(metrics.EventServiceScanWindowTargetBandGaugeVec.WithLabelValues(changefeed, "fast")), 1e-9) + require.InDelta(t, 0, testutil.ToFloat64(metrics.EventServiceScanWindowTargetBandGaugeVec.WithLabelValues(changefeed, "slow")), 1e-9) + require.InDelta(t, 0, testutil.ToFloat64(metrics.EventServiceScanWindowPressureScoreGaugeVec.WithLabelValues(changefeed)), 1e-9) + require.InDelta(t, 1, testutil.ToFloat64(metrics.EventServiceScanWindowMemoryReleaseCount.WithLabelValues(changefeed)), 1e-9) +} + +func TestUpdateMemoryUsageRecordsScanWindowAdjustCount(t *testing.T) { + status := newChangefeedStatus(common.NewChangefeedID4Test("default", t.Name()), 1*time.Minute) + changefeed := status.changefeedID.String() + t.Cleanup(func() { + deleteScanWindowMetrics(changefeed) + }) + + now := time.Now() + markScanWindowReadyForDecrease(status, now) + status.scanInterval.Store(int64(40 * time.Second)) + + status.updateMemoryUsage(now.Add(memoryUsageWindowDuration), 0.8, 0) + + require.Equal(t, int64(30*time.Second), status.scanInterval.Load()) + require.InDelta(t, 1, testutil.ToFloat64(metrics.EventServiceScanWindowAdjustCount.WithLabelValues(changefeed, string(scanWindowDecisionHighPressure))), 1e-9) +} + +func TestUpdateMemoryUsageRecordsScanWindowTargetBandMetrics(t *testing.T) { + status := newChangefeedStatus(common.NewChangefeedID4Test("default", t.Name()), 10*time.Minute) + changefeed := status.changefeedID.String() + t.Cleanup(func() { + deleteScanWindowMetrics(changefeed) + }) + + start := time.Now() + status.updateMemoryUsage(start, 0.20, 0) + status.updateMemoryUsage(start.Add(time.Second), 0.40, 0) + status.updateMemoryUsage(start.Add(2*time.Second), 0.60, 0) + + require.InDelta(t, 0, testutil.ToFloat64(metrics.EventServiceScanWindowTargetBandGaugeVec.WithLabelValues(changefeed, "report")), 1e-9) + require.InDelta(t, 1, testutil.ToFloat64(metrics.EventServiceScanWindowTargetBandGaugeVec.WithLabelValues(changefeed, "fast")), 1e-9) + require.InDelta(t, 1, testutil.ToFloat64(metrics.EventServiceScanWindowTargetBandGaugeVec.WithLabelValues(changefeed, "slow")), 1e-9) + require.InDelta(t, 2, testutil.ToFloat64(metrics.EventServiceScanWindowTargetBandCrossCount.WithLabelValues(changefeed, "report")), 1e-9) + require.InDelta(t, 1, testutil.ToFloat64(metrics.EventServiceScanWindowTargetBandCrossCount.WithLabelValues(changefeed, "fast")), 1e-9) + require.InDelta(t, 1, testutil.ToFloat64(metrics.EventServiceScanWindowTargetBandCrossCount.WithLabelValues(changefeed, "slow")), 1e-9) } func TestAdjustScanIntervalIncreaseWithJitteredSamples(t *testing.T) { @@ -99,7 +272,7 @@ func TestAdjustScanIntervalIncreaseWithJitteredSamples(t *testing.T) { status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 1*time.Minute) start := time.Now() - status.lastAdjustTime.Store(start.Add(-scanIntervalAdjustCooldown - time.Second)) + markScanWindowReadyForIncrease(status, start) status.scanInterval.Store(int64(40 * time.Second)) @@ -112,37 +285,47 @@ func TestAdjustScanIntervalIncreaseWithJitteredSamples(t *testing.T) { require.Equal(t, int64(50*time.Second), status.scanInterval.Load()) } -func TestAdjustScanIntervalDecreasesWhenUsageIncreasing(t *testing.T) { +func TestAdjustScanIntervalReducesOnSustainedPressure(t *testing.T) { t.Parallel() status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 1*time.Minute) now := time.Now() - status.lastAdjustTime.Store(now) + markScanWindowReadyForDecrease(status, now) status.scanInterval.Store(int64(40 * time.Second)) - status.updateMemoryUsage(now, 0.10, 0) - status.updateMemoryUsage(now.Add(1*time.Second), 0.11, 0) - status.updateMemoryUsage(now.Add(2*time.Second), 0.12, 0) - status.updateMemoryUsage(now.Add(3*time.Second), 0.13, 0) - require.Equal(t, int64(40*time.Second), status.scanInterval.Load()) + status.updateMemoryUsage(now, 0.60, 0) + status.updateMemoryUsage(now.Add(1*time.Second), 0.60, 0) + status.updateMemoryUsage(now.Add(2*time.Second), 0.60, 0) + require.Equal(t, int64(36*time.Second), status.scanInterval.Load()) } -func TestAdjustScanIntervalDecreasesWhenUsageIncreasingAboveThirtyPercent(t *testing.T) { +func TestAdjustScanIntervalSustainedPressureDoesNotIncreaseBelowDefaultFloor(t *testing.T) { t.Parallel() status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 1*time.Minute) now := time.Now() - status.lastAdjustTime.Store(now) - status.lastTrendAdjustTime.Store(now.Add(-scanTrendAdjustCooldown - time.Second)) + markScanWindowReadyForDecrease(status, now) + + status.scanInterval.Store(int64(2 * time.Second)) + + status.updateMemoryUsage(now, 0.60, 0) + status.updateMemoryUsage(now.Add(1*time.Second), 0.60, 0) + status.updateMemoryUsage(now.Add(2*time.Second), 0.60, 0) + require.Equal(t, int64(2*time.Second), status.scanInterval.Load()) +} + +func TestAdjustScanIntervalDoesNotIncreaseBeforeCooldown(t *testing.T) { + t.Parallel() + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 1*time.Minute) + now := time.Now() status.scanInterval.Store(int64(40 * time.Second)) - status.updateMemoryUsage(now, 0.31, 0) - status.updateMemoryUsage(now.Add(1*time.Second), 0.32, 0) - status.updateMemoryUsage(now.Add(2*time.Second), 0.33, 0) - status.updateMemoryUsage(now.Add(3*time.Second), 0.34, 0) - require.Equal(t, int64(36*time.Second), status.scanInterval.Load()) + for i := 0; i < 10; i++ { + status.updateMemoryUsage(now.Add(time.Duration(i)*time.Second), 0.05, 0) + } + require.Equal(t, int64(40*time.Second), status.scanInterval.Load()) } func TestRefreshMinSentResolvedTsMinAndSkipRules(t *testing.T) { diff --git a/pkg/metrics/event_service.go b/pkg/metrics/event_service.go index e6ed10cc90..a8d96a2bbd 100644 --- a/pkg/metrics/event_service.go +++ b/pkg/metrics/event_service.go @@ -69,6 +69,55 @@ var ( Name: "scan_window_interval", Help: "The scan window interval in seconds for each changefeed", }, []string{"changefeed"}) + EventServiceScanWindowUsageRatioGaugeVec = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "event_service", + Name: "scan_window_usage_ratio", + Help: "The usage ratio observed by the scan window controller for each changefeed", + }, []string{"changefeed", "type"}) + EventServiceScanWindowUsageEMAGaugeVec = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "event_service", + Name: "scan_window_usage_ema", + Help: "The usage EMA values used by the scan window controller for each changefeed", + }, []string{"changefeed", "type"}) + EventServiceScanWindowTargetBandGaugeVec = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "event_service", + Name: "scan_window_target_band", + Help: "Whether the observed scan window value is currently inside the target band for each changefeed", + }, []string{"changefeed", "type"}) + EventServiceScanWindowTargetBandCrossCount = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "ticdc", + Subsystem: "event_service", + Name: "scan_window_target_band_cross_count", + Help: "The number of target band state changes observed by the scan window controller for each changefeed", + }, []string{"changefeed", "type"}) + EventServiceScanWindowPressureScoreGaugeVec = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "event_service", + Name: "scan_window_pressure_score", + Help: "The pressure score maintained by the scan window controller for each changefeed", + }, []string{"changefeed"}) + EventServiceScanWindowMemoryReleaseCount = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "ticdc", + Subsystem: "event_service", + Name: "scan_window_memory_release_count", + Help: "The number of memory release events reported to the scan window controller for each changefeed", + }, []string{"changefeed"}) + EventServiceScanWindowAdjustCount = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "ticdc", + Subsystem: "event_service", + Name: "scan_window_adjust_count", + Help: "The number of scan window adjustments made by the controller for each changefeed", + }, []string{"changefeed", "reason"}) EventServiceScanDuration = prometheus.NewHistogram( prometheus.HistogramOpts{ Namespace: "ticdc", @@ -201,6 +250,13 @@ func initEventServiceMetrics(registry *prometheus.Registry) { registry.MustRegister(EventServiceResolvedTsLagGauge) registry.MustRegister(EventServiceScanWindowBaseTsGaugeVec) registry.MustRegister(EventServiceScanWindowIntervalGaugeVec) + registry.MustRegister(EventServiceScanWindowUsageRatioGaugeVec) + registry.MustRegister(EventServiceScanWindowUsageEMAGaugeVec) + registry.MustRegister(EventServiceScanWindowTargetBandGaugeVec) + registry.MustRegister(EventServiceScanWindowTargetBandCrossCount) + registry.MustRegister(EventServiceScanWindowPressureScoreGaugeVec) + registry.MustRegister(EventServiceScanWindowMemoryReleaseCount) + registry.MustRegister(EventServiceScanWindowAdjustCount) registry.MustRegister(EventServiceScanDuration) registry.MustRegister(EventServiceScannedCount) registry.MustRegister(EventServiceDispatcherGauge)