diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index 22fe7abf915..93b7b925e8e 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -356,7 +356,7 @@ func (c *BlocksCleaner) cleanUpActiveUsers(ctx context.Context, users []string, return nil } errChan := make(chan error, 1) - go visitMarkerManager.HeartBeat(ctx, errChan, c.cleanerVisitMarkerFileUpdateInterval, true) + go visitMarkerManager.HeartBeat(ctx, func() {}, errChan, c.cleanerVisitMarkerFileUpdateInterval, true) defer func() { errChan <- nil }() @@ -391,7 +391,7 @@ func (c *BlocksCleaner) cleanDeletedUsers(ctx context.Context, users []string) e return nil } errChan := make(chan error, 1) - go visitMarkerManager.HeartBeat(ctx, errChan, c.cleanerVisitMarkerFileUpdateInterval, true) + go visitMarkerManager.HeartBeat(ctx, func() {}, errChan, c.cleanerVisitMarkerFileUpdateInterval, true) defer func() { errChan <- nil }() @@ -439,7 +439,7 @@ func (c *BlocksCleaner) scanUsers(ctx context.Context) ([]string, []string, erro func (c *BlocksCleaner) obtainVisitMarkerManager(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket) (visitMarkerManager *VisitMarkerManager, isVisited bool, err error) { cleanerVisitMarker := NewCleanerVisitMarker(c.ringLifecyclerID) - visitMarkerManager = NewVisitMarkerManager(userBucket, userLogger, c.ringLifecyclerID, cleanerVisitMarker) + visitMarkerManager = NewVisitMarkerManager(userBucket, userLogger, c.ringLifecyclerID, cleanerVisitMarker, nil) existingCleanerVisitMarker := &CleanerVisitMarker{} err = visitMarkerManager.ReadVisitMarker(ctx, existingCleanerVisitMarker) diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index d66797c6e13..aac98a07dc0 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -139,7 +139,7 @@ var ( return nil, nil, err } - plannerFactory := func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, _ string, _ prometheus.Counter, _ prometheus.Counter, _ *compactorMetrics) compact.Planner { + plannerFactory := func(ctx context.Context, cancel context.CancelFunc, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, _ string, _ prometheus.Counter, _ prometheus.Counter, _ *compactorMetrics) compact.Planner { return compact.NewPlanner(logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter) } @@ -152,10 +152,9 @@ var ( return nil, nil, err } - plannerFactory := func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, userID string, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, compactorMetrics *compactorMetrics) compact.Planner { - + plannerFactory := func(ctx context.Context, cancel context.CancelFunc, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, userID string, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, compactorMetrics *compactorMetrics) compact.Planner { if cfg.CompactionStrategy == util.CompactionStrategyPartitioning { - return NewPartitionCompactionPlanner(ctx, bkt, logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, userID, cfg.ShardingPlannerDelay, cfg.CompactionVisitMarkerTimeout, cfg.CompactionVisitMarkerFileUpdateInterval, compactorMetrics) + return NewPartitionCompactionPlanner(ctx, cancel, bkt, logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, userID, cfg.ShardingPlannerDelay, cfg.CompactionVisitMarkerTimeout, cfg.CompactionVisitMarkerFileUpdateInterval, compactorMetrics) } else { return NewShuffleShardingPlanner(ctx, bkt, logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, cfg.CompactionVisitMarkerTimeout, cfg.CompactionVisitMarkerFileUpdateInterval, blockVisitMarkerReadFailed, blockVisitMarkerWriteFailed) } @@ -217,6 +216,7 @@ type BlocksCompactorFactory func( type PlannerFactory func( ctx context.Context, + cancel context.CancelFunc, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, @@ -1091,7 +1091,7 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error { ulogger, syncer, c.blocksGrouperFactory(currentCtx, c.compactorCfg, bucket, ulogger, c.BlocksMarkedForNoCompaction, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed, syncerMetrics, c.compactorMetrics, c.ring, c.ringLifecycler, c.limits, userID, noCompactMarkerFilter, c.ingestionReplicationFactor), - c.blocksPlannerFactory(currentCtx, bucket, ulogger, c.compactorCfg, noCompactMarkerFilter, c.ringLifecycler, userID, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed, c.compactorMetrics), + c.blocksPlannerFactory(currentCtx, cancel, bucket, ulogger, c.compactorCfg, noCompactMarkerFilter, c.ringLifecycler, userID, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed, c.compactorMetrics), c.blocksCompactor, c.blockDeletableCheckerFactory(currentCtx, bucket, ulogger), c.compactionLifecycleCallbackFactory(currentCtx, bucket, ulogger, c.compactorCfg.MetaSyncConcurrency, c.compactDirForUser(userID), userID, c.compactorMetrics), @@ -1104,7 +1104,7 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error { return errors.Wrap(err, "failed to create bucket compactor") } - if err := compactor.Compact(ctx); err != nil { + if err := compactor.Compact(currentCtx); err != nil { level.Warn(ulogger).Log("msg", "compaction failed with error", "err", err) return errors.Wrap(err, "compaction") } diff --git a/pkg/compactor/partition_compaction_grouper.go b/pkg/compactor/partition_compaction_grouper.go index 4d9f6811cac..fff74bd3bb7 100644 --- a/pkg/compactor/partition_compaction_grouper.go +++ b/pkg/compactor/partition_compaction_grouper.go @@ -617,7 +617,7 @@ func (g *PartitionCompactionGrouper) handleEmptyPartition(partitionedGroupInfo * PartitionID: partition.PartitionID, Version: PartitionVisitMarkerVersion1, } - visitMarkerManager := NewVisitMarkerManager(g.bkt, g.logger, g.ringLifecyclerID, visitMarker) + visitMarkerManager := NewVisitMarkerManager(g.bkt, g.logger, g.ringLifecyclerID, visitMarker, nil) visitMarkerManager.MarkWithStatus(g.ctx, Completed) level.Info(g.logger).Log("msg", "handled empty block in partition", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partitioned_group_creation_time", partitionedGroupInfo.CreationTimeString(), "partition_count", partitionedGroupInfo.PartitionCount, "partition_id", partition.PartitionID) @@ -632,8 +632,8 @@ func (g *PartitionCompactionGrouper) pickPartitionCompactionJob(partitionCompact partitionCount := partitionedGroup.partitionedGroupInfo.PartitionCount partitionID := partitionedGroup.partition.PartitionID partitionedGroupLogger := log.With(g.logger, "rangeStart", partitionedGroup.rangeStartTime().String(), "rangeEnd", partitionedGroup.rangeEndTime().String(), "rangeDuration", partitionedGroup.rangeDuration().String(), "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "partition_count", partitionCount, "group_hash", groupHash) - visitMarker := newPartitionVisitMarker(g.ringLifecyclerID, partitionedGroupID, partitionID) - visitMarkerManager := NewVisitMarkerManager(g.bkt, g.logger, g.ringLifecyclerID, visitMarker) + visitMarker := newPartitionVisitMarker(g.ringLifecyclerID, partitionedGroupID, partitionedGroup.partitionedGroupInfo.CreationTime, partitionID) + visitMarkerManager := NewVisitMarkerManager(g.bkt, g.logger, g.ringLifecyclerID, visitMarker, nil) if isVisited, err := g.isGroupVisited(partitionID, visitMarkerManager); err != nil { level.Warn(partitionedGroupLogger).Log("msg", "unable to check if partition is visited", "err", err, "group", partitionedGroup.String()) continue diff --git a/pkg/compactor/partition_compaction_planner.go b/pkg/compactor/partition_compaction_planner.go index 566a62e87e3..cc3a7aef6ee 100644 --- a/pkg/compactor/partition_compaction_planner.go +++ b/pkg/compactor/partition_compaction_planner.go @@ -22,6 +22,7 @@ var ( type PartitionCompactionPlanner struct { ctx context.Context + ctxCancel context.CancelFunc bkt objstore.InstrumentedBucket logger log.Logger ranges []int64 @@ -36,6 +37,7 @@ type PartitionCompactionPlanner struct { func NewPartitionCompactionPlanner( ctx context.Context, + cancel context.CancelFunc, bkt objstore.InstrumentedBucket, logger log.Logger, ranges []int64, @@ -49,6 +51,7 @@ func NewPartitionCompactionPlanner( ) *PartitionCompactionPlanner { return &PartitionCompactionPlanner{ ctx: ctx, + ctxCancel: cancel, bkt: bkt, logger: logger, ranges: ranges, @@ -73,7 +76,7 @@ func (p *PartitionCompactionPlanner) Plan(ctx context.Context, metasByMinTime [] return p.PlanWithPartition(ctx, metasByMinTime, cortexMetaExtensions, errChan) } -func (p *PartitionCompactionPlanner) PlanWithPartition(_ context.Context, metasByMinTime []*metadata.Meta, cortexMetaExtensions *tsdb.CortexMetaExtensions, errChan chan error) ([]*metadata.Meta, error) { +func (p *PartitionCompactionPlanner) PlanWithPartition(ctx context.Context, metasByMinTime []*metadata.Meta, cortexMetaExtensions *tsdb.CortexMetaExtensions, errChan chan error) ([]*metadata.Meta, error) { partitionInfo := cortexMetaExtensions.PartitionInfo if partitionInfo == nil { return nil, fmt.Errorf("partitionInfo cannot be nil") @@ -85,8 +88,21 @@ func (p *PartitionCompactionPlanner) PlanWithPartition(_ context.Context, metasB // claimed same partition in grouper at same time. time.Sleep(p.plannerDelay) - visitMarker := newPartitionVisitMarker(p.ringLifecyclerID, partitionedGroupID, partitionID) - visitMarkerManager := NewVisitMarkerManager(p.bkt, p.logger, p.ringLifecyclerID, visitMarker) + visitMarker := newPartitionVisitMarker(p.ringLifecyclerID, partitionedGroupID, partitionInfo.PartitionedGroupCreationTime, partitionID) + visitMarkerManager := NewVisitMarkerManager(p.bkt, p.logger, p.ringLifecyclerID, visitMarker, func(v VisitMarker) bool { + partitionVisitMarker, ok := v.(*partitionVisitMarker) + if !ok { + level.Info(p.logger).Log("msg", "not a partition visit marker, must be consistent") + return true + } + partitionGroupInfo, err := ReadPartitionedGroupInfo(ctx, p.bkt, p.logger, partitionVisitMarker.PartitionedGroupID) + if err != nil { + level.Error(p.logger).Log("msg", "failed to read partition info file, assuming visit marker is inconsistent", "partition_group_id", partitionVisitMarker.PartitionedGroupID, "err", err) + return false + } + level.Info(p.logger).Log("msg", "checking partitiong group creation time", "visit_marker", partitionVisitMarker.PartitionedGroupCreationTime, "partition_group", partitionGroupInfo.CreationTime) + return (partitionVisitMarker.PartitionedGroupCreationTime == 0 || partitionVisitMarker.PartitionedGroupCreationTime == partitionGroupInfo.CreationTime) + }) existingPartitionVisitMarker := &partitionVisitMarker{} err := visitMarkerManager.ReadVisitMarker(p.ctx, existingPartitionVisitMarker) visitMarkerExists := true @@ -171,7 +187,7 @@ func (p *PartitionCompactionPlanner) PlanWithPartition(_ context.Context, metasB return nil, nil } - go visitMarkerManager.HeartBeat(p.ctx, errChan, p.partitionVisitMarkerFileUpdateInterval, false) + go visitMarkerManager.HeartBeat(p.ctx, p.ctxCancel, errChan, p.partitionVisitMarkerFileUpdateInterval, false) return resultMetas, nil } diff --git a/pkg/compactor/partition_visit_marker.go b/pkg/compactor/partition_visit_marker.go index 4a5d8fdc4ac..5fbea8e5e74 100644 --- a/pkg/compactor/partition_visit_marker.go +++ b/pkg/compactor/partition_visit_marker.go @@ -25,21 +25,23 @@ var ( ) type partitionVisitMarker struct { - CompactorID string `json:"compactorID"` - Status VisitStatus `json:"status"` - PartitionedGroupID uint32 `json:"partitionedGroupID"` - PartitionID int `json:"partitionID"` + CompactorID string `json:"compactorID"` + Status VisitStatus `json:"status"` + PartitionedGroupID uint32 `json:"partitionedGroupID"` + PartitionedGroupCreationTime int64 `json:"partitionedGroupCreationTime"` + PartitionID int `json:"partitionID"` // VisitTime is a unix timestamp of when the partition was visited (mark updated). VisitTime int64 `json:"visitTime"` // Version of the file. Version int `json:"version"` } -func newPartitionVisitMarker(compactorID string, partitionedGroupID uint32, partitionID int) *partitionVisitMarker { +func newPartitionVisitMarker(compactorID string, partitionedGroupID uint32, partitionedGroupCreationTime int64, partitionID int) *partitionVisitMarker { return &partitionVisitMarker{ - CompactorID: compactorID, - PartitionedGroupID: partitionedGroupID, - PartitionID: partitionID, + CompactorID: compactorID, + PartitionedGroupID: partitionedGroupID, + PartitionedGroupCreationTime: partitionedGroupCreationTime, + PartitionID: partitionID, } } diff --git a/pkg/compactor/partitioned_group_info.go b/pkg/compactor/partitioned_group_info.go index a43bb262301..89884e875d7 100644 --- a/pkg/compactor/partitioned_group_info.go +++ b/pkg/compactor/partitioned_group_info.go @@ -137,7 +137,7 @@ func (p *PartitionedGroupInfo) getPartitionedGroupStatus( PartitionedGroupID: p.PartitionedGroupID, PartitionID: partition.PartitionID, } - visitMarkerManager := NewVisitMarkerManager(userBucket, partitionedGroupLogger, "PartitionedGroupInfo.getPartitionedGroupStatus", visitMarker) + visitMarkerManager := NewVisitMarkerManager(userBucket, partitionedGroupLogger, "PartitionedGroupInfo.getPartitionedGroupStatus", visitMarker, nil) partitionVisitMarkerExists := true if err := visitMarkerManager.ReadVisitMarker(ctx, visitMarker); err != nil { if errors.Is(err, errorVisitMarkerNotFound) { diff --git a/pkg/compactor/visit_marker.go b/pkg/compactor/visit_marker.go index ebe675556d8..31ed884919d 100644 --- a/pkg/compactor/visit_marker.go +++ b/pkg/compactor/visit_marker.go @@ -40,10 +40,11 @@ type VisitMarker interface { } type VisitMarkerManager struct { - bkt objstore.InstrumentedBucket - logger log.Logger - ownerIdentifier string - visitMarker VisitMarker + bkt objstore.InstrumentedBucket + logger log.Logger + ownerIdentifier string + visitMarker VisitMarker + isConsistentWithCompactionFn func(v VisitMarker) bool } func NewVisitMarkerManager( @@ -51,23 +52,28 @@ func NewVisitMarkerManager( logger log.Logger, ownerIdentifier string, visitMarker VisitMarker, + isConsistentWithCompactionFn func(v VisitMarker) bool, ) *VisitMarkerManager { return &VisitMarkerManager{ - bkt: bkt, - logger: log.With(logger, "type", fmt.Sprintf("%T", visitMarker)), - ownerIdentifier: ownerIdentifier, - visitMarker: visitMarker, + bkt: bkt, + logger: log.With(logger, "type", fmt.Sprintf("%T", visitMarker)), + ownerIdentifier: ownerIdentifier, + visitMarker: visitMarker, + isConsistentWithCompactionFn: isConsistentWithCompactionFn, } } -func (v *VisitMarkerManager) HeartBeat(ctx context.Context, errChan <-chan error, visitMarkerFileUpdateInterval time.Duration, deleteOnExit bool) { +func (v *VisitMarkerManager) HeartBeat(ctx context.Context, cancel context.CancelFunc, errChan <-chan error, visitMarkerFileUpdateInterval time.Duration, deleteOnExit bool) { level.Info(v.getLogger()).Log("msg", "start visit marker heart beat") ticker := time.NewTicker(visitMarkerFileUpdateInterval) defer ticker.Stop() heartBeat: for { v.MarkWithStatus(ctx, InProgress) - + if !v.isConsistentWithCompaction() { + v.cancelContext(ctx, cancel) + break heartBeat + } select { case <-ctx.Done(): level.Warn(v.getLogger()).Log("msg", "visit marker heart beat got cancelled") @@ -92,6 +98,11 @@ heartBeat: break heartBeat } } + + if !v.isConsistentWithCompaction() { + v.cancelContext(ctx, cancel) + return + } level.Info(v.getLogger()).Log("msg", "stop visit marker heart beat") if deleteOnExit { level.Info(v.getLogger()).Log("msg", "delete visit marker when exiting heart beat") @@ -153,3 +164,13 @@ func (v *VisitMarkerManager) updateVisitMarker(ctx context.Context) error { func (v *VisitMarkerManager) getLogger() log.Logger { return log.With(v.logger, "visit_marker", v.visitMarker.String()) } + +func (v *VisitMarkerManager) isConsistentWithCompaction() bool { + return v.isConsistentWithCompactionFn == nil || v.isConsistentWithCompactionFn(v.visitMarker) +} + +func (v *VisitMarkerManager) cancelContext(ctx context.Context, cancel context.CancelFunc) { + level.Error(v.getLogger()).Log("msg", "visit marker is inconsistent with compaction data, deleting visit marker and cancelling the compaction", "visit_marker_file", v.visitMarker.GetVisitMarkerFilePath()) + v.DeleteVisitMarker(context.Background()) + cancel() +}