Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/compactor/blocks_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ func (c *BlocksCleaner) cleanUpActiveUsers(ctx context.Context, users []string,
return nil
}
errChan := make(chan error, 1)
go visitMarkerManager.HeartBeat(ctx, errChan, c.cleanerVisitMarkerFileUpdateInterval, true)
go visitMarkerManager.HeartBeat(ctx, func() {}, errChan, c.cleanerVisitMarkerFileUpdateInterval, true)
defer func() {
errChan <- nil
}()
Expand Down Expand Up @@ -391,7 +391,7 @@ func (c *BlocksCleaner) cleanDeletedUsers(ctx context.Context, users []string) e
return nil
}
errChan := make(chan error, 1)
go visitMarkerManager.HeartBeat(ctx, errChan, c.cleanerVisitMarkerFileUpdateInterval, true)
go visitMarkerManager.HeartBeat(ctx, func() {}, errChan, c.cleanerVisitMarkerFileUpdateInterval, true)
defer func() {
errChan <- nil
}()
Expand Down Expand Up @@ -439,7 +439,7 @@ func (c *BlocksCleaner) scanUsers(ctx context.Context) ([]string, []string, erro

func (c *BlocksCleaner) obtainVisitMarkerManager(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket) (visitMarkerManager *VisitMarkerManager, isVisited bool, err error) {
cleanerVisitMarker := NewCleanerVisitMarker(c.ringLifecyclerID)
visitMarkerManager = NewVisitMarkerManager(userBucket, userLogger, c.ringLifecyclerID, cleanerVisitMarker)
visitMarkerManager = NewVisitMarkerManager(userBucket, userLogger, c.ringLifecyclerID, cleanerVisitMarker, nil)

existingCleanerVisitMarker := &CleanerVisitMarker{}
err = visitMarkerManager.ReadVisitMarker(ctx, existingCleanerVisitMarker)
Expand Down
12 changes: 6 additions & 6 deletions pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ var (
return nil, nil, err
}

plannerFactory := func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, _ string, _ prometheus.Counter, _ prometheus.Counter, _ *compactorMetrics) compact.Planner {
plannerFactory := func(ctx context.Context, cancel context.CancelFunc, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, _ string, _ prometheus.Counter, _ prometheus.Counter, _ *compactorMetrics) compact.Planner {
return compact.NewPlanner(logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter)
}

Expand All @@ -152,10 +152,9 @@ var (
return nil, nil, err
}

plannerFactory := func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, userID string, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, compactorMetrics *compactorMetrics) compact.Planner {

plannerFactory := func(ctx context.Context, cancel context.CancelFunc, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, userID string, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, compactorMetrics *compactorMetrics) compact.Planner {
if cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
return NewPartitionCompactionPlanner(ctx, bkt, logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, userID, cfg.ShardingPlannerDelay, cfg.CompactionVisitMarkerTimeout, cfg.CompactionVisitMarkerFileUpdateInterval, compactorMetrics)
return NewPartitionCompactionPlanner(ctx, cancel, bkt, logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, userID, cfg.ShardingPlannerDelay, cfg.CompactionVisitMarkerTimeout, cfg.CompactionVisitMarkerFileUpdateInterval, compactorMetrics)
} else {
return NewShuffleShardingPlanner(ctx, bkt, logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, cfg.CompactionVisitMarkerTimeout, cfg.CompactionVisitMarkerFileUpdateInterval, blockVisitMarkerReadFailed, blockVisitMarkerWriteFailed)
}
Expand Down Expand Up @@ -217,6 +216,7 @@ type BlocksCompactorFactory func(

type PlannerFactory func(
ctx context.Context,
cancel context.CancelFunc,
bkt objstore.InstrumentedBucket,
logger log.Logger,
cfg Config,
Expand Down Expand Up @@ -1091,7 +1091,7 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
ulogger,
syncer,
c.blocksGrouperFactory(currentCtx, c.compactorCfg, bucket, ulogger, c.BlocksMarkedForNoCompaction, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed, syncerMetrics, c.compactorMetrics, c.ring, c.ringLifecycler, c.limits, userID, noCompactMarkerFilter, c.ingestionReplicationFactor),
c.blocksPlannerFactory(currentCtx, bucket, ulogger, c.compactorCfg, noCompactMarkerFilter, c.ringLifecycler, userID, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed, c.compactorMetrics),
c.blocksPlannerFactory(currentCtx, cancel, bucket, ulogger, c.compactorCfg, noCompactMarkerFilter, c.ringLifecycler, userID, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed, c.compactorMetrics),
c.blocksCompactor,
c.blockDeletableCheckerFactory(currentCtx, bucket, ulogger),
c.compactionLifecycleCallbackFactory(currentCtx, bucket, ulogger, c.compactorCfg.MetaSyncConcurrency, c.compactDirForUser(userID), userID, c.compactorMetrics),
Expand All @@ -1104,7 +1104,7 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
return errors.Wrap(err, "failed to create bucket compactor")
}

if err := compactor.Compact(ctx); err != nil {
if err := compactor.Compact(currentCtx); err != nil {
level.Warn(ulogger).Log("msg", "compaction failed with error", "err", err)
return errors.Wrap(err, "compaction")
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/compactor/partition_compaction_grouper.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,7 @@ func (g *PartitionCompactionGrouper) handleEmptyPartition(partitionedGroupInfo *
PartitionID: partition.PartitionID,
Version: PartitionVisitMarkerVersion1,
}
visitMarkerManager := NewVisitMarkerManager(g.bkt, g.logger, g.ringLifecyclerID, visitMarker)
visitMarkerManager := NewVisitMarkerManager(g.bkt, g.logger, g.ringLifecyclerID, visitMarker, nil)
visitMarkerManager.MarkWithStatus(g.ctx, Completed)

level.Info(g.logger).Log("msg", "handled empty block in partition", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "partitioned_group_creation_time", partitionedGroupInfo.CreationTimeString(), "partition_count", partitionedGroupInfo.PartitionCount, "partition_id", partition.PartitionID)
Expand All @@ -632,8 +632,8 @@ func (g *PartitionCompactionGrouper) pickPartitionCompactionJob(partitionCompact
partitionCount := partitionedGroup.partitionedGroupInfo.PartitionCount
partitionID := partitionedGroup.partition.PartitionID
partitionedGroupLogger := log.With(g.logger, "rangeStart", partitionedGroup.rangeStartTime().String(), "rangeEnd", partitionedGroup.rangeEndTime().String(), "rangeDuration", partitionedGroup.rangeDuration().String(), "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "partition_count", partitionCount, "group_hash", groupHash)
visitMarker := newPartitionVisitMarker(g.ringLifecyclerID, partitionedGroupID, partitionID)
visitMarkerManager := NewVisitMarkerManager(g.bkt, g.logger, g.ringLifecyclerID, visitMarker)
visitMarker := newPartitionVisitMarker(g.ringLifecyclerID, partitionedGroupID, partitionedGroup.partitionedGroupInfo.CreationTime, partitionID)
visitMarkerManager := NewVisitMarkerManager(g.bkt, g.logger, g.ringLifecyclerID, visitMarker, nil)
if isVisited, err := g.isGroupVisited(partitionID, visitMarkerManager); err != nil {
level.Warn(partitionedGroupLogger).Log("msg", "unable to check if partition is visited", "err", err, "group", partitionedGroup.String())
continue
Expand Down
24 changes: 20 additions & 4 deletions pkg/compactor/partition_compaction_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ var (

type PartitionCompactionPlanner struct {
ctx context.Context
ctxCancel context.CancelFunc
bkt objstore.InstrumentedBucket
logger log.Logger
ranges []int64
Expand All @@ -36,6 +37,7 @@ type PartitionCompactionPlanner struct {

func NewPartitionCompactionPlanner(
ctx context.Context,
cancel context.CancelFunc,
bkt objstore.InstrumentedBucket,
logger log.Logger,
ranges []int64,
Expand All @@ -49,6 +51,7 @@ func NewPartitionCompactionPlanner(
) *PartitionCompactionPlanner {
return &PartitionCompactionPlanner{
ctx: ctx,
ctxCancel: cancel,
bkt: bkt,
logger: logger,
ranges: ranges,
Expand All @@ -73,7 +76,7 @@ func (p *PartitionCompactionPlanner) Plan(ctx context.Context, metasByMinTime []
return p.PlanWithPartition(ctx, metasByMinTime, cortexMetaExtensions, errChan)
}

func (p *PartitionCompactionPlanner) PlanWithPartition(_ context.Context, metasByMinTime []*metadata.Meta, cortexMetaExtensions *tsdb.CortexMetaExtensions, errChan chan error) ([]*metadata.Meta, error) {
func (p *PartitionCompactionPlanner) PlanWithPartition(ctx context.Context, metasByMinTime []*metadata.Meta, cortexMetaExtensions *tsdb.CortexMetaExtensions, errChan chan error) ([]*metadata.Meta, error) {
partitionInfo := cortexMetaExtensions.PartitionInfo
if partitionInfo == nil {
return nil, fmt.Errorf("partitionInfo cannot be nil")
Expand All @@ -85,8 +88,21 @@ func (p *PartitionCompactionPlanner) PlanWithPartition(_ context.Context, metasB
// claimed same partition in grouper at same time.
time.Sleep(p.plannerDelay)

visitMarker := newPartitionVisitMarker(p.ringLifecyclerID, partitionedGroupID, partitionID)
visitMarkerManager := NewVisitMarkerManager(p.bkt, p.logger, p.ringLifecyclerID, visitMarker)
visitMarker := newPartitionVisitMarker(p.ringLifecyclerID, partitionedGroupID, partitionInfo.PartitionedGroupCreationTime, partitionID)
visitMarkerManager := NewVisitMarkerManager(p.bkt, p.logger, p.ringLifecyclerID, visitMarker, func(v VisitMarker) bool {
partitionVisitMarker, ok := v.(*partitionVisitMarker)
if !ok {
level.Info(p.logger).Log("msg", "not a partition visit marker, must be consistent")
return true
}
partitionGroupInfo, err := ReadPartitionedGroupInfo(ctx, p.bkt, p.logger, partitionVisitMarker.PartitionedGroupID)
if err != nil {
level.Error(p.logger).Log("msg", "failed to read partition info file, assuming visit marker is inconsistent", "partition_group_id", partitionVisitMarker.PartitionedGroupID, "err", err)
return false
}
level.Info(p.logger).Log("msg", "checking partitiong group creation time", "visit_marker", partitionVisitMarker.PartitionedGroupCreationTime, "partition_group", partitionGroupInfo.CreationTime)
return (partitionVisitMarker.PartitionedGroupCreationTime == 0 || partitionVisitMarker.PartitionedGroupCreationTime == partitionGroupInfo.CreationTime)
})
existingPartitionVisitMarker := &partitionVisitMarker{}
err := visitMarkerManager.ReadVisitMarker(p.ctx, existingPartitionVisitMarker)
visitMarkerExists := true
Expand Down Expand Up @@ -171,7 +187,7 @@ func (p *PartitionCompactionPlanner) PlanWithPartition(_ context.Context, metasB
return nil, nil
}

go visitMarkerManager.HeartBeat(p.ctx, errChan, p.partitionVisitMarkerFileUpdateInterval, false)
go visitMarkerManager.HeartBeat(p.ctx, p.ctxCancel, errChan, p.partitionVisitMarkerFileUpdateInterval, false)

return resultMetas, nil
}
18 changes: 10 additions & 8 deletions pkg/compactor/partition_visit_marker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,23 @@ var (
)

type partitionVisitMarker struct {
CompactorID string `json:"compactorID"`
Status VisitStatus `json:"status"`
PartitionedGroupID uint32 `json:"partitionedGroupID"`
PartitionID int `json:"partitionID"`
CompactorID string `json:"compactorID"`
Status VisitStatus `json:"status"`
PartitionedGroupID uint32 `json:"partitionedGroupID"`
PartitionedGroupCreationTime int64 `json:"partitionedGroupCreationTime"`
PartitionID int `json:"partitionID"`
// VisitTime is a unix timestamp of when the partition was visited (mark updated).
VisitTime int64 `json:"visitTime"`
// Version of the file.
Version int `json:"version"`
}

func newPartitionVisitMarker(compactorID string, partitionedGroupID uint32, partitionID int) *partitionVisitMarker {
func newPartitionVisitMarker(compactorID string, partitionedGroupID uint32, partitionedGroupCreationTime int64, partitionID int) *partitionVisitMarker {
return &partitionVisitMarker{
CompactorID: compactorID,
PartitionedGroupID: partitionedGroupID,
PartitionID: partitionID,
CompactorID: compactorID,
PartitionedGroupID: partitionedGroupID,
PartitionedGroupCreationTime: partitionedGroupCreationTime,
PartitionID: partitionID,
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/compactor/partitioned_group_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (p *PartitionedGroupInfo) getPartitionedGroupStatus(
PartitionedGroupID: p.PartitionedGroupID,
PartitionID: partition.PartitionID,
}
visitMarkerManager := NewVisitMarkerManager(userBucket, partitionedGroupLogger, "PartitionedGroupInfo.getPartitionedGroupStatus", visitMarker)
visitMarkerManager := NewVisitMarkerManager(userBucket, partitionedGroupLogger, "PartitionedGroupInfo.getPartitionedGroupStatus", visitMarker, nil)
partitionVisitMarkerExists := true
if err := visitMarkerManager.ReadVisitMarker(ctx, visitMarker); err != nil {
if errors.Is(err, errorVisitMarkerNotFound) {
Expand Down
41 changes: 31 additions & 10 deletions pkg/compactor/visit_marker.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,34 +40,40 @@ type VisitMarker interface {
}

type VisitMarkerManager struct {
bkt objstore.InstrumentedBucket
logger log.Logger
ownerIdentifier string
visitMarker VisitMarker
bkt objstore.InstrumentedBucket
logger log.Logger
ownerIdentifier string
visitMarker VisitMarker
isConsistentWithCompactionFn func(v VisitMarker) bool
}

func NewVisitMarkerManager(
bkt objstore.InstrumentedBucket,
logger log.Logger,
ownerIdentifier string,
visitMarker VisitMarker,
isConsistentWithCompactionFn func(v VisitMarker) bool,
) *VisitMarkerManager {
return &VisitMarkerManager{
bkt: bkt,
logger: log.With(logger, "type", fmt.Sprintf("%T", visitMarker)),
ownerIdentifier: ownerIdentifier,
visitMarker: visitMarker,
bkt: bkt,
logger: log.With(logger, "type", fmt.Sprintf("%T", visitMarker)),
ownerIdentifier: ownerIdentifier,
visitMarker: visitMarker,
isConsistentWithCompactionFn: isConsistentWithCompactionFn,
}
}

func (v *VisitMarkerManager) HeartBeat(ctx context.Context, errChan <-chan error, visitMarkerFileUpdateInterval time.Duration, deleteOnExit bool) {
func (v *VisitMarkerManager) HeartBeat(ctx context.Context, cancel context.CancelFunc, errChan <-chan error, visitMarkerFileUpdateInterval time.Duration, deleteOnExit bool) {
level.Info(v.getLogger()).Log("msg", "start visit marker heart beat")
ticker := time.NewTicker(visitMarkerFileUpdateInterval)
defer ticker.Stop()
heartBeat:
for {
v.MarkWithStatus(ctx, InProgress)

if !v.isConsistentWithCompaction() {
v.cancelContext(ctx, cancel)
break heartBeat
}
select {
case <-ctx.Done():
level.Warn(v.getLogger()).Log("msg", "visit marker heart beat got cancelled")
Expand All @@ -92,6 +98,11 @@ heartBeat:
break heartBeat
}
}

if !v.isConsistentWithCompaction() {
v.cancelContext(ctx, cancel)
return
}
level.Info(v.getLogger()).Log("msg", "stop visit marker heart beat")
if deleteOnExit {
level.Info(v.getLogger()).Log("msg", "delete visit marker when exiting heart beat")
Expand Down Expand Up @@ -153,3 +164,13 @@ func (v *VisitMarkerManager) updateVisitMarker(ctx context.Context) error {
func (v *VisitMarkerManager) getLogger() log.Logger {
return log.With(v.logger, "visit_marker", v.visitMarker.String())
}

func (v *VisitMarkerManager) isConsistentWithCompaction() bool {
return v.isConsistentWithCompactionFn == nil || v.isConsistentWithCompactionFn(v.visitMarker)
}

func (v *VisitMarkerManager) cancelContext(ctx context.Context, cancel context.CancelFunc) {
level.Error(v.getLogger()).Log("msg", "visit marker is inconsistent with compaction data, deleting visit marker and cancelling the compaction", "visit_marker_file", v.visitMarker.GetVisitMarkerFilePath())
v.DeleteVisitMarker(context.Background())
cancel()
}
Loading