diff --git a/fracmanager/fracmanager.go b/fracmanager/fracmanager.go index bd5b4f19..b8c6dd70 100644 --- a/fracmanager/fracmanager.go +++ b/fracmanager/fracmanager.go @@ -76,18 +76,19 @@ func New(ctx context.Context, cfg *Config, s3cli *s3.Client) (*FracManager, func cancel() wg.Wait() - // freeze active fraction to prevent new writes - active := lc.registry.Active() - if err := active.Finalize(); err != nil { + // finalize appender to prevent new writes + appender := lc.registry.Appender() + if err := appender.Finalize(); err != nil { logger.Fatal("shutdown fraction freezing error", zap.Error(err)) } - active.WaitWriteIdle() + appender.WaitWriteIdle() stopIdx() lc.SyncInfoCache() - sealOnShutdown(active.instance, provider, cfg.MinSealFracSize) + // Seal active fraction + sealOnShutdown(appender.active.frac, provider, cfg.MinSealFracSize) logger.Info("fracmanager's workers are stopped", zap.Int64("took_ms", time.Since(n).Milliseconds())) } @@ -95,8 +96,8 @@ func New(ctx context.Context, cfg *Config, s3cli *s3.Client) (*FracManager, func return &fm, stop, nil } -func (fm *FracManager) Fractions() List { - return fm.lc.registry.AllFractions() +func (fm *FracManager) FractionsSnapshot() (List, ReleaseSnapshot) { + return fm.lc.registry.FractionsSnapshot() } func (fm *FracManager) Oldest() uint64 { @@ -116,7 +117,7 @@ func (fm *FracManager) Append(ctx context.Context, docs storage.DocBlock, metas return ctx.Err() default: // Try to append data to the currently active fraction - err := fm.lc.registry.Active().Append(docs, metas) + err := fm.lc.registry.Appender().Append(docs, metas) if err != nil { logger.Info("append fail", zap.Error(err)) if err == ErrFractionNotWritable { diff --git a/fracmanager/fracmanager_for_tests.go b/fracmanager/fracmanager_for_tests.go index ab7cd851..c4ec1cad 100644 --- a/fracmanager/fracmanager_for_tests.go +++ b/fracmanager/fracmanager_for_tests.go @@ -3,7 +3,7 @@ package fracmanager import "sync" func (fm *FracManager) WaitIdleForTests() { 
- fm.lc.registry.Active().WaitWriteIdle() + fm.lc.registry.Appender().WaitWriteIdle() } func (fm *FracManager) SealForcedForTests() { diff --git a/fracmanager/fracmanager_test.go b/fracmanager/fracmanager_test.go index 89437904..43afe89e 100644 --- a/fracmanager/fracmanager_test.go +++ b/fracmanager/fracmanager_test.go @@ -52,30 +52,33 @@ func TestSealingOnShutdown(t *testing.T) { cfg.MinSealFracSize = 0 // to ensure that the frac will not be sealed on shutdown cfg, fm, stop := setupFracManager(t, cfg) appendDocsToFracManager(t, fm, 10) - activeName := fm.Fractions()[0].Info().Name() + + fractions := fm.lc.registry.all.fractions + activeName := fractions[0].Info().Name() + stop() // second start cfg.MinSealFracSize = 1 // to ensure that the frac will be sealed on shutdown cfg, fm, stop = setupFracManager(t, cfg) - assert.Equal(t, 1, len(fm.Fractions()), "should have one fraction") - assert.Equal(t, activeName, fm.Fractions()[0].Info().Name(), "fraction should have the same name") - _, ok := fm.Fractions()[0].(*fractionProxy).impl.(*frac.Active) + fractions = fm.lc.registry.all.fractions + assert.Equal(t, 1, len(fractions), "should have one fraction") + assert.Equal(t, activeName, fractions[0].Info().Name(), "fraction should have the same name") + _, ok := fractions[0].(*frac.Active) assert.True(t, ok, "fraction should be active") - stop() // third start _, fm, stop = setupFracManager(t, cfg) - assert.Equal(t, 2, len(fm.Fractions()), "should have 2 fraction: new active and old sealed") - _, ok = fm.Fractions()[0].(*fractionProxy).impl.(*frac.Sealed) + fractions = fm.lc.registry.all.fractions + assert.Equal(t, 2, len(fractions), "should have 2 fraction: new active and old sealed") + _, ok = fractions[0].(*frac.Sealed) assert.True(t, ok, "first fraction should be sealed") - assert.Equal(t, activeName, fm.Fractions()[0].Info().Name(), "sealed fraction should have the same name") - assert.Equal(t, uint32(0), fm.Fractions()[1].Info().DocsTotal, "active fraction should be 
empty") - _, ok = fm.Fractions()[1].(*fractionProxy).impl.(*frac.Active) + assert.Equal(t, activeName, fractions[0].Info().Name(), "sealed fraction should have the same name") + assert.Equal(t, uint32(0), fractions[1].Info().DocsTotal, "active fraction should be empty") + _, ok = fractions[1].(*frac.Active) assert.True(t, ok, "new fraction should be active") - stop() } diff --git a/fracmanager/fracs_stats.go b/fracmanager/fracs_stats.go index 968b8b41..c70bbd37 100644 --- a/fracmanager/fracs_stats.go +++ b/fracmanager/fracs_stats.go @@ -95,3 +95,7 @@ func (s *registryStats) SetMetrics() { s.offloading.SetMetrics(dataSizeTotal, "offloading") s.remotes.SetMetrics(dataSizeTotal, "remotes") } + +func (s registryStats) TotalSizeOnDiskLocal() uint64 { + return s.sealing.totalSizeOnDisk + s.sealed.totalSizeOnDisk +} diff --git a/fracmanager/fraction_provider.go b/fracmanager/fraction_provider.go index e2915598..cb4a6eec 100644 --- a/fracmanager/fraction_provider.go +++ b/fracmanager/fraction_provider.go @@ -8,13 +8,16 @@ import ( "time" "github.com/oklog/ulid/v2" + "go.uber.org/zap" "github.com/ozontech/seq-db/frac" "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/sealed" "github.com/ozontech/seq-db/frac/sealed/sealing" + "github.com/ozontech/seq-db/logger" "github.com/ozontech/seq-db/storage" "github.com/ozontech/seq-db/storage/s3" + "github.com/ozontech/seq-db/util" ) const fileBasePattern = "seq-db-" @@ -107,8 +110,11 @@ func (fp *fractionProvider) CreateActive() *frac.Active { // Seal converts an active fraction to a sealed one // Process includes sorting, indexing, and data optimization for reading -func (fp *fractionProvider) Seal(active *frac.Active) (*frac.Sealed, error) { - src, err := frac.NewActiveSealingSource(active, fp.config.SealParams) +func (fp *fractionProvider) Seal(a *frac.Active) (*frac.Sealed, error) { + sealsTotal.Inc() + now := time.Now() + + src, err := frac.NewActiveSealingSource(a, fp.config.SealParams) if err != nil { 
return nil, err } @@ -117,7 +123,18 @@ func (fp *fractionProvider) Seal(active *frac.Active) (*frac.Sealed, error) { return nil, err } - return fp.NewSealedPreloaded(active.BaseFileName, preloaded), nil + s := fp.NewSealedPreloaded(a.BaseFileName, preloaded) + + sealingTime := time.Since(now) + sealsDoneSeconds.Observe(sealingTime.Seconds()) + + logger.Info( + "fraction sealed", + zap.String("fraction", filepath.Base(s.BaseFileName)), + zap.Float64("time_spent_s", util.DurationToUnit(sealingTime, "s")), + ) + + return s, nil } // Offload uploads fraction to S3 storage and returns a remote fraction diff --git a/fracmanager/fraction_registry.go b/fracmanager/fraction_registry.go index 4f804930..6d31bce2 100644 --- a/fracmanager/fraction_registry.go +++ b/fracmanager/fraction_registry.go @@ -3,6 +3,7 @@ package fracmanager import ( "errors" "fmt" + "slices" "sync" "time" @@ -13,6 +14,24 @@ import ( "github.com/ozontech/seq-db/util" ) +// activeToSealed represents a fraction in transition from active to sealed state. +// This is a transient state where the fraction is no longer accepting writes +// but hasn't yet completed the sealing process. +// Fractions in this state are stored in the sealing queue and processed in FIFO order. +type activeToSealed struct { + active *refCountedActive + sealed *refCountedSealed +} + +// sealedToRemote represents a fraction in transition from sealed to remote state. +// This is a transient state where a local sealed fraction is being offloaded +// to remote storage. The remote field may be nil if the fraction doesn't require +// offloading or offloading hasn't completed yet. +type sealedToRemote struct { + sealed *refCountedSealed + remote *refCountedRemote +} + // fractionRegistry manages fraction queues at different lifecycle stages. // Tracks fractions through different stages: active → sealing → sealed → offloading → remote // Ensures correct state transitions while maintaining chronological order. 
@@ -22,18 +41,18 @@ type fractionRegistry struct { mu sync.RWMutex // main mutex for protecting registry state // lifecycle queues (FIFO order, oldest at lower indexes) - sealing []*activeProxy // fractions being sealed (0-5 typical) - sealed []*sealedProxy // local sealed fractions (can be thousands) - offloading []*sealedProxy // fractions being offloaded (0-5 typical) - remotes []*remoteProxy // offloaded fractions (can be thousands) + sealing []*activeToSealed // fractions being sealed (0-5 typical) + sealed []*refCountedSealed // local sealed fractions (can be thousands) + offloading []*sealedToRemote // fractions being offloaded (0-5 typical) + remotes []*refCountedRemote // offloaded fractions (can be thousands) stats registryStats // size statistics for monitoring oldestTotal uint64 // creation time of oldest fraction in all list including remote oldestLocal uint64 // creation time of oldest fraction in local or offloading queues - muAll sync.RWMutex // protects active, all, and oldestTotal fields - active *activeProxy // currently active writable fraction - all []frac.Fraction // all fractions in creation order (read-only view) + muAll sync.RWMutex // protects active, all, and oldestTotal fields + appender *syncAppender // currently active writable fraction + all *fractionsSnapshot // all fractions in creation order (read-only view) } // NewFractionRegistry creates and initializes a new fraction registry instance. 
@@ -44,49 +63,38 @@ func NewFractionRegistry(active *frac.Active, sealed []*frac.Sealed, remotes []* return nil, errors.New("active fraction must be specified") } - r := fractionRegistry{ - active: &activeProxy{ - proxy: &fractionProxy{impl: active}, - instance: active, - }, - } + r := fractionRegistry{appender: &syncAppender{active: &refCountedActive{frac: active}}} // initialize local sealed fractions for _, sealed := range sealed { r.stats.sealed.Add(sealed.Info()) - r.sealed = append(r.sealed, &sealedProxy{ - proxy: &fractionProxy{impl: sealed}, - instance: sealed, - }) + r.sealed = append(r.sealed, &refCountedSealed{frac: sealed}) } // initialize remote fractions for _, remote := range remotes { r.stats.remotes.Add(remote.Info()) - r.remotes = append(r.remotes, &remoteProxy{ - proxy: &fractionProxy{impl: remote}, - instance: remote, - }) + r.remotes = append(r.remotes, &refCountedRemote{frac: remote}) } r.updateOldestLocal() - r.rebuildAllFractions() + r.rebuildSnapshot() return &r, nil } -// Active returns the currently active writable fraction. -func (r *fractionRegistry) Active() *activeProxy { +// Appender returns the currently active writable fraction. +func (r *fractionRegistry) Appender() *syncAppender { r.muAll.RLock() defer r.muAll.RUnlock() - return r.active + return r.appender } -// AllFractions returns a read-only view of all fractions in creation order. -func (r *fractionRegistry) AllFractions() []frac.Fraction { +// FractionsSnapshot returns a read-only view of all fractions in creation order. +func (r *fractionRegistry) FractionsSnapshot() ([]frac.Fraction, ReleaseSnapshot) { r.muAll.RLock() defer r.muAll.RUnlock() - return r.all + return r.all.Retain() } // Stats returns current size statistics of the registry. 
@@ -94,7 +102,7 @@ func (r *fractionRegistry) Stats() registryStats { r.mu.RLock() defer r.mu.RUnlock() - r.stats.active.Set(r.active.instance.Info()) + r.stats.active.Set(r.appender.active.frac.Info()) return r.stats } @@ -112,30 +120,41 @@ func (r *fractionRegistry) OldestLocal() uint64 { return r.oldestLocal } +type ActiveProvider interface { + CreateActive() *frac.Active +} + // RotateIfFull completes the current active fraction and starts a new one. // Moves previous active fraction to sealing queue. // Updates statistics and maintains chronological order. -// Should be called when creating a new fraction. -func (r *fractionRegistry) RotateIfFull(maxSize uint64, newActive func() *activeProxy) (*activeProxy, func(), error) { +// Should be called when the current active fraction reaches size limit and needs to be rotated +func (r *fractionRegistry) RotateIfFull(maxSize uint64, ap ActiveProvider) (*activeToSealed, func(), error) { r.mu.Lock() defer r.mu.Unlock() - if r.active.instance.Info().DocsOnDisk <= maxSize { + if r.appender.active.frac.Info().DocsOnDisk <= maxSize { return nil, nil, nil } - old := r.active - r.sealing = append(r.sealing, old) - r.addActive(newActive()) + old := r.appender + + sealing := &activeToSealed{active: old.active} + r.sealing = append(r.sealing, sealing) + + r.muAll.Lock() + r.appender = &syncAppender{active: &refCountedActive{frac: ap.CreateActive()}} + r.muAll.Unlock() + + r.rebuildSnapshot() if err := old.Finalize(); err != nil { - return old, nil, err + return nil, nil, err } - curInfo := old.instance.Info() + curInfo := old.active.frac.Info() r.stats.sealing.Add(curInfo) - r.active.Suspend(old.Suspended()) + r.appender.Suspend(old.Suspended()) wg := sync.WaitGroup{} wg.Add(1) @@ -145,7 +164,7 @@ func (r *fractionRegistry) RotateIfFull(maxSize uint64, newActive func() *active defer wg.Done() old.WaitWriteIdle() // can be long enough - finalInfo := old.instance.Info() + finalInfo := old.active.frac.Info() r.mu.Lock() defer 
r.mu.Unlock() @@ -156,14 +175,14 @@ func (r *fractionRegistry) RotateIfFull(maxSize uint64, newActive func() *active r.stats.sealing.Add(finalInfo) }() - return old, wg.Wait, nil + return sealing, wg.Wait, nil } func (r *fractionRegistry) SuspendIfOverCapacity(maxQueue, maxSize uint64) { r.mu.Lock() defer r.mu.Unlock() - suspended := r.active.Suspended() + suspended := r.appender.Suspended() if maxQueue > 0 && r.stats.sealing.count >= int(maxQueue) { if !suspended { @@ -171,7 +190,7 @@ func (r *fractionRegistry) SuspendIfOverCapacity(maxQueue, maxSize uint64) { zap.String("reason", "sealing queue size exceeded"), zap.Uint64("limit", maxQueue), zap.Int("queue_size", r.stats.sealing.count)) - r.active.Suspend(true) + r.appender.Suspend(true) } return } @@ -184,7 +203,7 @@ func (r *fractionRegistry) SuspendIfOverCapacity(maxQueue, maxSize uint64) { zap.String("reason", "occupied space limit exceeded"), zap.Float64("queue_size_limit_gb", util.Float64ToPrec(util.SizeToUnit(maxSize, "gb"), 2)), zap.Float64("occupied_space_gb", util.Float64ToPrec(util.SizeToUnit(du, "gb"), 2))) - r.active.Suspend(true) + r.appender.Suspend(true) } return } @@ -195,59 +214,69 @@ func (r *fractionRegistry) SuspendIfOverCapacity(maxQueue, maxSize uint64) { zap.Float64("occupied_space_gb", util.Float64ToPrec(util.SizeToUnit(du, "gb"), 2)), zap.Uint64("sealing_queue_size_limit", maxQueue), zap.Int("queue_size", r.stats.sealing.count)) - r.active.Suspend(false) + r.appender.Suspend(false) } } func (r *fractionRegistry) diskUsage() uint64 { - return r.active.instance.Info().FullSize() + + return r.appender.active.frac.Info().FullSize() + r.stats.sealed.totalSizeOnDisk + r.stats.sealing.totalSizeOnDisk + r.stats.offloading.totalSizeOnDisk } -// addActive sets a new active fraction and updates the complete fractions list. -func (r *fractionRegistry) addActive(a *activeProxy) { - r.muAll.Lock() - defer r.muAll.Unlock() +// EvictLocalForDelete removes oldest local fractions to free disk space. 
+// Returns evicted fractions or error if insufficient space is released. +func (r *fractionRegistry) EvictLocalForDelete(sizeLimit uint64) (evicted []*refCountedSealed, err error) { + r.mu.Lock() + defer r.mu.Unlock() - r.active = a - r.all = append(r.all, a.proxy) -} + if evicted, err = r.evictLocal(sizeLimit); err != nil { + return nil, err + } -// trimAll removes the oldest fractions from the complete fractions list. -// Used when fractions are evicted or deleted from the system. -func (r *fractionRegistry) trimAll(count int) { - r.muAll.Lock() - defer r.muAll.Unlock() + r.rebuildSnapshot() + r.updateOldestLocal() // oldest local can be changed here - r.all = r.all[count:] - r.updateOldestTotal() + return evicted, nil } -// EvictLocal removes oldest local fractions to free disk space. -// If shouldOffload is true, moves fractions to offloading queue instead of deleting. +// EvictLocalForOffload removes oldest local fractions and moves them to the offloading queue. // Returns evicted fractions or error if insufficient space is released.
-func (r *fractionRegistry) EvictLocal(shouldOffload bool, sizeLimit uint64) ([]*sealedProxy, error) { +func (r *fractionRegistry) EvictLocalForOffload(sizeLimit uint64) ([]*sealedToRemote, error) { r.mu.Lock() defer r.mu.Unlock() + evicted, err := r.evictLocal(sizeLimit) + if err != nil { + return nil, err + } + + pos := len(r.offloading) + r.offloading = slices.Grow(r.offloading, len(evicted)) + for _, sealed := range evicted { + r.offloading = append(r.offloading, &sealedToRemote{sealed: sealed}) + r.stats.offloading.Add(sealed.frac.Info()) + } + + return r.offloading[pos:], nil +} + +func (r *fractionRegistry) evictLocal(sizeLimit uint64) ([]*refCountedSealed, error) { var ( count int releasingSize uint64 ) // calculate total used disk space - totalUsedSize := r.stats.sealed.totalSizeOnDisk + - r.stats.sealing.totalSizeOnDisk + - r.active.instance.Info().FullSize() + totalUsedSize := r.stats.TotalSizeOnDiskLocal() + r.appender.active.frac.Info().FullSize() // determine how many oldest fractions need to be removed to meet size limit for _, item := range r.sealed { if totalUsedSize-releasingSize <= sizeLimit { break } - info := item.instance.Info() + info := item.frac.Info() releasingSize += info.FullSize() r.stats.sealed.Sub(info) count++ @@ -264,24 +293,13 @@ func (r *fractionRegistry) EvictLocal(shouldOffload bool, sizeLimit uint64) ([]* evicted := r.sealed[:count] r.sealed = r.sealed[count:] - // either offload or completely remove the fractions - if shouldOffload { - for _, item := range evicted { - r.offloading = append(r.offloading, item) - r.stats.offloading.Add(item.instance.Info()) - } - } else { - r.trimAll(count) // permanently remove - r.updateOldestLocal() // oldest local can be changed here - } - return evicted, nil } // EvictRemote removes oldest remote fractions based on retention policy. // Fractions older than retention period are permanently deleted. // Returns removed fractions or empty slice if nothing to remove. 
-func (r *fractionRegistry) EvictRemote(retention time.Duration) []*remoteProxy { +func (r *fractionRegistry) EvictRemote(retention time.Duration) []*refCountedRemote { if retention == 0 { return nil } @@ -292,7 +310,7 @@ func (r *fractionRegistry) EvictRemote(retention time.Duration) []*remoteProxy { count := 0 // find fractions older than retention period for _, item := range r.remotes { - info := item.instance.Info() + info := item.frac.Info() if time.Since(time.UnixMilli(int64(info.CreationTime))) <= retention { break // stop at first fraction within retention } @@ -302,7 +320,7 @@ func (r *fractionRegistry) EvictRemote(retention time.Duration) []*remoteProxy { evicted := r.remotes[:count] r.remotes = r.remotes[count:] - r.trimAll(count) // remove from complete list + r.rebuildSnapshot() return evicted } @@ -310,7 +328,7 @@ func (r *fractionRegistry) EvictRemote(retention time.Duration) []*remoteProxy { // EvictOverflowed removes oldest fractions from offloading queue when it exceeds size limit. // Selects fractions that haven't finished offloading yet to minimize data loss. // Used when offloading queue grows too large due to slow remote storage performance. 
-func (r *fractionRegistry) EvictOverflowed(sizeLimit uint64) []*sealedProxy { +func (r *fractionRegistry) EvictOverflowed(sizeLimit uint64) (evicted []*refCountedSealed) { if sizeLimit == 0 { return nil } @@ -324,7 +342,6 @@ func (r *fractionRegistry) EvictOverflowed(sizeLimit uint64) []*sealedProxy { } count := 0 - evicted := []*sealedProxy{} // filter fractions for _, item := range r.offloading { // keep items that are within limits or already offloaded @@ -333,23 +350,23 @@ func (r *fractionRegistry) EvictOverflowed(sizeLimit uint64) []*sealedProxy { count++ continue } - evicted = append(evicted, item) - r.stats.offloading.Sub(item.instance.Info()) + evicted = append(evicted, item.sealed) + r.stats.offloading.Sub(item.sealed.frac.Info()) } r.offloading = r.offloading[:count] - r.rebuildAllFractions() + r.rebuildSnapshot() return evicted } // PromoteToSealed moves fractions from sealing to local queue when sealing completes. // Maintains strict ordering - younger fractions wait for older ones to seal first. 
-func (r *fractionRegistry) PromoteToSealed(active *activeProxy, sealed *frac.Sealed) { +func (r *fractionRegistry) PromoteToSealed(active *activeToSealed, sealed *frac.Sealed) { r.mu.Lock() defer r.mu.Unlock() - active.sealed = sealed + active.sealed = &refCountedSealed{frac: sealed} promotedCount := 0 // process sealing queue in order, promoting completed fractions @@ -358,30 +375,31 @@ func (r *fractionRegistry) PromoteToSealed(active *activeProxy, sealed *frac.Sea break // maintain order - wait for previous fractions to complete } promotedCount++ - r.sealed = append(r.sealed, &sealedProxy{ - proxy: item.proxy, - instance: item.sealed, - }) - r.stats.sealed.Add(item.sealed.Info()) - r.stats.sealing.Sub(item.instance.Info()) + r.sealed = append(r.sealed, item.sealed) + r.stats.sealed.Add(item.sealed.frac.Info()) + r.stats.sealing.Sub(item.active.frac.Info()) + } + + if promotedCount > 0 { + // remove promoted fractions from sealing queue and rebuild snapshot + r.sealing = r.sealing[promotedCount:] + r.rebuildSnapshot() } - // remove promoted fractions from sealing queue - r.sealing = r.sealing[promotedCount:] } // PromoteToRemote moves fractions from offloading to remote queue when offloading completes. // Special case: handles fractions that don't require offloading (remote == nil). // Maintains strict ordering - younger fractions wait for older ones to offload. 
-func (r *fractionRegistry) PromoteToRemote(sealed *sealedProxy, remote *frac.Remote) { +func (r *fractionRegistry) PromoteToRemote(sealed *sealedToRemote, remote *frac.Remote) { r.mu.Lock() defer r.mu.Unlock() - sealed.remote = remote - - // special case: remote == nil means fraction doesn't require offloading if remote == nil { + // special case: remote == nil means fraction doesn't require offloading r.removeFromOffloading(sealed) + } else { + sealed.remote = &refCountedRemote{frac: remote} } promotedCount := 0 @@ -391,28 +409,26 @@ func (r *fractionRegistry) PromoteToRemote(sealed *sealedProxy, remote *frac.Rem break // maintain order - wait for previous fractions to complete } promotedCount++ - r.remotes = append(r.remotes, &remoteProxy{ - proxy: item.proxy, - instance: item.remote, - }) + r.remotes = append(r.remotes, item.remote) - r.stats.remotes.Add(item.remote.Info()) - r.stats.offloading.Sub(item.instance.Info()) + r.stats.remotes.Add(item.remote.frac.Info()) + r.stats.offloading.Sub(item.sealed.frac.Info()) } if promotedCount > 0 { // remove promoted fractions from offloading queue r.offloading = r.offloading[promotedCount:] r.updateOldestLocal() + r.rebuildSnapshot() } } // removeFromOffloading removes a specific fraction from offloading queue. // O(n) operation that rebuilds the all fractions list. 
-func (r *fractionRegistry) removeFromOffloading(sealed *sealedProxy) { +func (r *fractionRegistry) removeFromOffloading(sealed *sealedToRemote) { count := 0 // filter out the target fraction for _, item := range r.offloading { - if sealed != item { + if sealed.sealed != item.sealed { r.offloading[count] = item count++ } @@ -423,35 +439,46 @@ func (r *fractionRegistry) removeFromOffloading(sealed *sealedProxy) { } r.offloading = r.offloading[:count] - r.stats.offloading.Sub(sealed.instance.Info()) + r.stats.offloading.Sub(sealed.sealed.frac.Info()) - // oldest local can be changed here r.updateOldestLocal() - - // rebuild complete list since we modified the middle of the queue - r.rebuildAllFractions() + r.rebuildSnapshot() } -// rebuildAllFractions reconstructs the all fractions list in correct chronological order. +// rebuildSnapshot reconstructs the all fractions list in correct chronological order. // Order: remote (oldest) → offloading → sealed → sealing → active (newest) // Expensive O(n) operation used when direct list modification is insufficient. 
-func (r *fractionRegistry) rebuildAllFractions() { - all := make([]frac.Fraction, 0, len(r.all)) +func (r *fractionRegistry) rebuildSnapshot() { + capacity := 1 + len(r.remotes) + len(r.offloading) + len(r.sealed) + len(r.sealing) + + all := &fractionsSnapshot{ + counters: make([]RefCounter, 0, capacity), + fractions: make([]frac.Fraction, 0, capacity), + } - // collect fractions in correct chronological order: from oldest (remote) to newest (active) - for _, remote := range r.remotes { - all = append(all, remote.proxy) + // Remote fractions (oldest) + for _, r := range r.remotes { + all.counters = append(all.counters, r) + all.fractions = append(all.fractions, r.frac) } - for _, offloaded := range r.offloading { - all = append(all, offloaded.proxy) + // Offloading fractions + for _, o := range r.offloading { + all.counters = append(all.counters, o.sealed) + all.fractions = append(all.fractions, o.sealed.frac) } - for _, sealed := range r.sealed { - all = append(all, sealed.proxy) + // Local fractions + for _, l := range r.sealed { + all.counters = append(all.counters, l) + all.fractions = append(all.fractions, l.frac) } - for _, active := range r.sealing { - all = append(all, active.proxy) + // Sealing fractions + for _, s := range r.sealing { + all.counters = append(all.counters, s.active) + all.fractions = append(all.fractions, s.active.frac) } - all = append(all, r.active.proxy) + // Active fraction (newest) + all.counters = append(all.counters, r.appender.active) + all.fractions = append(all.fractions, r.appender.active.frac) r.muAll.Lock() defer r.muAll.Unlock() @@ -463,7 +490,7 @@ func (r *fractionRegistry) rebuildAllFractions() { // updateOldestTotal recalculates the creation time of the oldest fraction. // Called after modifications of the complete fractions list. 
func (r *fractionRegistry) updateOldestTotal() { - r.oldestTotal = r.all[0].Info().CreationTime + r.oldestTotal = r.all.fractions[0].Info().CreationTime } // updateOldestLocal recalculates the creation time of the oldest local fraction. @@ -471,12 +498,12 @@ func (r *fractionRegistry) updateOldestTotal() { // Called after modifications func (r *fractionRegistry) updateOldestLocal() { if len(r.offloading) > 0 { - r.oldestLocal = r.offloading[0].proxy.Info().CreationTime + r.oldestLocal = r.offloading[0].sealed.frac.Info().CreationTime } else if len(r.sealed) > 0 { - r.oldestLocal = r.sealed[0].proxy.Info().CreationTime + r.oldestLocal = r.sealed[0].frac.Info().CreationTime } else if len(r.sealing) > 0 { - r.oldestLocal = r.sealing[0].proxy.Info().CreationTime + r.oldestLocal = r.sealing[0].active.frac.Info().CreationTime } else { - r.oldestLocal = r.active.proxy.Info().CreationTime + r.oldestLocal = r.appender.active.frac.Info().CreationTime } } diff --git a/fracmanager/fractions_snapshot.go b/fracmanager/fractions_snapshot.go new file mode 100644 index 00000000..764191be --- /dev/null +++ b/fracmanager/fractions_snapshot.go @@ -0,0 +1,84 @@ +package fracmanager + +import ( + "sync" + + "github.com/ozontech/seq-db/frac" +) + +// ReleaseSnapshot is a cleanup function that releases the acquired snapshot. +type ReleaseSnapshot func() + +// RefCounter provides reference counting capability. +type RefCounter interface { + Inc() + Dec() +} + +// fractionsSnapshot represents a point-in-time view of multiple fractions +// with associated reference counters to keep them alive. +type fractionsSnapshot struct { + counters []RefCounter // Reference counters to keep fractions alive + fractions []frac.Fraction // The actual fractions in chronological order +} + +// Retain returns the fractions and a release function. +// Caller must call the release function when done to decrement reference counts. 
+func (fs *fractionsSnapshot) Retain() ([]frac.Fraction, ReleaseSnapshot) { + for _, c := range fs.counters { + c.Inc() + } + return fs.fractions, fs.release +} + +// release decrements all reference counters. +func (fs *fractionsSnapshot) release() { + for _, c := range fs.counters { + c.Dec() + } +} + +type refCounterWg struct { + wg sync.WaitGroup +} + +func (p *refCounterWg) Inc() { p.wg.Add(1) } + +func (p *refCounterWg) Dec() { p.wg.Done() } + +// refCountedActive wraps frac.Active with reference counting. +// Destroy releases the underlying Active after all references are gone. +type refCountedActive struct { + refCounterWg + frac *frac.Active +} + +// Destroy waits for all references to be released and then releases the Active. +func (p *refCountedActive) Destroy() { + p.wg.Wait() + p.frac.Release() +} + +// refCountedSealed wraps frac.Sealed with reference counting. +type refCountedSealed struct { + refCounterWg + frac *frac.Sealed +} + +// Destroy waits for all references to be released and then destroys the Sealed. +func (p *refCountedSealed) Destroy() { + p.wg.Wait() + p.frac.Suicide() +} + +// refCountedRemote wraps frac.Remote with reference counting. +type refCountedRemote struct { + refCounterWg + frac *frac.Remote +} + +// Destroy waits for all references to be released and then destroys the Remote. +func (p *refCountedRemote) Destroy() { + p.wg.Wait() + p.frac.Suicide() +} diff --git a/fracmanager/lifecycle_manager.go b/fracmanager/lifecycle_manager.go index cd1c4bd3..799967cd 100644 --- a/fracmanager/lifecycle_manager.go +++ b/fracmanager/lifecycle_manager.go @@ -2,7 +2,6 @@ package fracmanager import ( "context" - "path/filepath" "sync" "time" @@ -10,7 +9,6 @@ import ( "github.com/ozontech/seq-db/frac" "github.com/ozontech/seq-db/logger" - "github.com/ozontech/seq-db/util" ) // lifecycleManager manages the complete lifecycle of fractions. 
@@ -23,7 +21,7 @@ type lifecycleManager struct { registry *fractionRegistry // fraction state registry tasks *TaskManager // Background offloading tasks - sealingWg sync.WaitGroup + sealingWg sync.WaitGroup // todo: get rid after removing SealAll in tests } func newLifecycleManager( @@ -67,37 +65,10 @@ func (lc *lifecycleManager) SyncInfoCache() { } } -// seal converts an active fraction to sealed state. -// It freezes writes, waits for pending operations, then seals the fraction. -func (lc *lifecycleManager) seal(active *activeProxy) error { - sealsTotal.Inc() - now := time.Now() - sealed, err := lc.provider.Seal(active.instance) - if err != nil { - return err - } - sealingTime := time.Since(now) - sealsDoneSeconds.Observe(sealingTime.Seconds()) - - logger.Info( - "fraction sealed", - zap.String("fraction", filepath.Base(sealed.BaseFileName)), - zap.Float64("time_spent_s", util.DurationToUnit(sealingTime, "s")), - ) - - lc.infoCache.Add(sealed.Info()) - lc.registry.PromoteToSealed(active, sealed) - active.proxy.Redirect(sealed) - active.instance.Release() - return nil -} - // rotate checks if active fraction needs rotation based on size limit. // Creates new active fraction and starts sealing the previous one. 
func (lc *lifecycleManager) rotate(maxSize uint64, wg *sync.WaitGroup) { - activeToSeal, waitBeforeSealing, err := lc.registry.RotateIfFull(maxSize, func() *activeProxy { - return newActiveProxy(lc.provider.CreateActive()) - }) + activeToSeal, waitBeforeSealing, err := lc.registry.RotateIfFull(maxSize, lc.provider) if err != nil { logger.Fatal("active fraction rotation error", zap.Error(err)) } @@ -112,37 +83,39 @@ func (lc *lifecycleManager) rotate(maxSize uint64, wg *sync.WaitGroup) { defer lc.sealingWg.Done() waitBeforeSealing() - if err := lc.seal(activeToSeal); err != nil { + sealed, err := lc.provider.Seal(activeToSeal.active.frac) + if err != nil { logger.Fatal("sealing error", zap.Error(err)) } + + lc.infoCache.Add(sealed.Info()) + lc.registry.PromoteToSealed(activeToSeal, sealed) + activeToSeal.active.Destroy() }() } // offloadLocal starts offloading of local fractions to remote storage. // Selects fractions based on disk space usage and retention policy. func (lc *lifecycleManager) offloadLocal(ctx context.Context, sizeLimit uint64, retryDelay time.Duration, wg *sync.WaitGroup) { - toOffload, err := lc.registry.EvictLocal(true, sizeLimit) + toOffload, err := lc.registry.EvictLocalForOffload(sizeLimit) if err != nil { logger.Fatal("error releasing old fractions:", zap.Error(err)) } - for _, sealed := range toOffload { + for _, sealedToRemote := range toOffload { wg.Add(1) - _, err := lc.tasks.Run(sealed.instance.BaseFileName, ctx, func(ctx context.Context) { + _, err := lc.tasks.Run(sealedToRemote.sealed.frac.BaseFileName, ctx, func(ctx context.Context) { defer wg.Done() - remote := lc.offloadWithRetry(ctx, sealed.instance, retryDelay) + remote := lc.offloadWithRetry(ctx, sealedToRemote.sealed.frac, retryDelay) - lc.registry.PromoteToRemote(sealed, remote) + lc.registry.PromoteToRemote(sealedToRemote, remote) if remote == nil { - sealed.proxy.Redirect(emptyFraction{}) - lc.infoCache.Remove(sealed.instance.Info().Name()) - } else { - 
sealed.proxy.Redirect(remote) + lc.infoCache.Remove(sealedToRemote.sealed.frac.Info().Name()) } // free up local resources - sealed.instance.Suicide() + sealedToRemote.sealed.Destroy() maintenanceTruncateTotal.Add(1) }) if err != nil { @@ -209,20 +182,19 @@ func (lc *lifecycleManager) tryOffload(ctx context.Context, sealed *frac.Sealed) // cleanRemote deletes outdated remote fractions based on retention policy. func (lc *lifecycleManager) cleanRemote(retention time.Duration, wg *sync.WaitGroup) { toDelete := lc.registry.EvictRemote(retention) - wg.Add(1) - go func() { - defer wg.Done() - for _, remote := range toDelete { - remote.proxy.Redirect(emptyFraction{}) - lc.infoCache.Remove(remote.instance.Info().Name()) - remote.instance.Suicide() - } - }() + wg.Add(len(toDelete)) + for _, remote := range toDelete { + go func() { + defer wg.Done() + lc.infoCache.Remove(remote.frac.Info().Name()) + remote.Destroy() + }() + } } // cleanLocal deletes outdated local fractions when offloading is disabled. func (lc *lifecycleManager) cleanLocal(sizeLimit uint64, wg *sync.WaitGroup) { - toDelete, err := lc.registry.EvictLocal(false, sizeLimit) + toDelete, err := lc.registry.EvictLocalForDelete(sizeLimit) if err != nil { logger.Fatal("error releasing old fractions:", zap.Error(err)) } @@ -232,16 +204,15 @@ func (lc *lifecycleManager) cleanLocal(sizeLimit uint64, wg *sync.WaitGroup) { } } - wg.Add(1) - go func() { - defer wg.Done() - for _, sealed := range toDelete { - sealed.proxy.Redirect(emptyFraction{}) - lc.infoCache.Remove(sealed.instance.Info().Name()) - sealed.instance.Suicide() + wg.Add(len(toDelete)) + for _, sealed := range toDelete { + go func() { + defer wg.Done() + lc.infoCache.Remove(sealed.frac.Info().Name()) + sealed.Destroy() maintenanceTruncateTotal.Add(1) - } - }() + }() + } } // updateOldestMetric updates the prometheus metric with oldest fraction timestamp. 
@@ -254,13 +225,13 @@ func (lc *lifecycleManager) updateOldestMetric() { // Stops ongoing offloading tasks and cleans up both local and remote resources. func (lc *lifecycleManager) removeOverflowed(sizeLimit uint64, wg *sync.WaitGroup) { evicted := lc.registry.EvictOverflowed(sizeLimit) - for _, item := range evicted { + for _, sealed := range evicted { wg.Add(1) go func() { defer wg.Done() // Cancel the offloading task - this operation may take significant time // hence executed in a separate goroutine to avoid blocking - lc.tasks.Cancel(item.instance.BaseFileName) + lc.tasks.Cancel(sealed.frac.BaseFileName) }() } } diff --git a/fracmanager/lifecycle_manager_test.go b/fracmanager/lifecycle_manager_test.go index abd180e2..3510e346 100644 --- a/fracmanager/lifecycle_manager_test.go +++ b/fracmanager/lifecycle_manager_test.go @@ -1,14 +1,20 @@ package fracmanager import ( + "math" "math/rand" "path/filepath" "sync" "testing" + "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/ozontech/seq-db/consts" + "github.com/ozontech/seq-db/frac/processor" + "github.com/ozontech/seq-db/parser" + "github.com/ozontech/seq-db/seq" ) func setupLifecycle(t testing.TB, cfg *Config) (*lifecycleManager, func()) { @@ -31,21 +37,18 @@ func TestFracInfoCache(t *testing.T) { lc, tearDown := setupLifecycle(t, nil) defer tearDown() - var total uint64 - fillRotateAndCheck := func(names map[string]struct{}) { - active := lc.registry.Active() - appendDocsToActive(t, active.instance, 10+rand.Intn(10)) + appender := lc.registry.Appender() + appendDocsToActive(t, appender.active.frac, 10+rand.Intn(10)) wg := sync.WaitGroup{} lc.rotate(0, &wg) wg.Wait() - info := active.proxy.Info() + info := appender.active.frac.Info() _, ok := lc.infoCache.Get(info.Name()) assert.True(t, ok) - total += info.FullSize() names[info.Name()] = struct{}{} } @@ -53,12 +56,13 @@ func TestFracInfoCache(t *testing.T) { for range 10 { fillRotateAndCheck(first) } - halfSize := 
total + halfSize := lc.registry.Stats().TotalSizeOnDiskLocal() second := map[string]struct{}{} for range 10 { fillRotateAndCheck(second) } + total := lc.registry.Stats().TotalSizeOnDiskLocal() wg := sync.WaitGroup{} lc.cleanLocal(total-halfSize, &wg) @@ -80,18 +84,14 @@ func TestCapacityExceeded(t *testing.T) { defer tearDown() const fracsCount = 10 - var total uint64 fillAndRotate := func() { - active := lc.registry.Active() - appendDocsToActive(t, active.instance, 10+rand.Intn(10)) + appender := lc.registry.Appender() + appendDocsToActive(t, appender.active.frac, 10+rand.Intn(10)) wg := sync.WaitGroup{} lc.rotate(0, &wg) wg.Wait() - - info := active.proxy.Info() - total += info.FullSize() } assert.False(t, lc.flags.IsCapacityExceeded(), "expect data dir is empty") @@ -102,6 +102,8 @@ func TestCapacityExceeded(t *testing.T) { } assert.False(t, lc.flags.IsCapacityExceeded(), "there should be no deletions and the flag is false") + total := lc.registry.Stats().TotalSizeOnDiskLocal() + wg := sync.WaitGroup{} lc.cleanLocal(total, &wg) wg.Wait() @@ -121,20 +123,15 @@ func TestOldestMetrics(t *testing.T) { defer tearDown() const fracsCount = 10 - var total uint64 - fillAndRotate := func() { - active := lc.registry.Active() - appendDocsToActive(t, active.instance, 10+rand.Intn(10)) + appender := lc.registry.Appender() + appendDocsToActive(t, appender.active.frac, 10+rand.Intn(10)) wg := sync.WaitGroup{} lc.rotate(0, &wg) wg.Wait() - - info := active.proxy.Info() - total += info.FullSize() } - firstFracTime := lc.registry.Active().proxy.Info().CreationTime + firstFracTime := lc.registry.Appender().active.frac.Info().CreationTime for range fracsCount { fillAndRotate() } @@ -143,12 +140,15 @@ func TestOldestMetrics(t *testing.T) { assert.Equal(t, firstFracTime, lc.registry.OldestTotal(), "should point to the very first fraction when all data is local") assert.Equal(t, firstFracTime, lc.registry.OldestLocal(), "should point to the first fraction when nothing is offloaded") - 
halfSize := total - halfwayFracTime := lc.registry.Active().proxy.Info().CreationTime + halfSize := lc.registry.Stats().TotalSizeOnDiskLocal() + + halfwayFracTime := lc.registry.Appender().active.frac.Info().CreationTime for range fracsCount { fillAndRotate() } + total := lc.registry.Stats().TotalSizeOnDiskLocal() + wg := sync.WaitGroup{} lc.offloadLocal(t.Context(), total-halfSize, 0, &wg) wg.Wait() @@ -158,3 +158,80 @@ func TestOldestMetrics(t *testing.T) { assert.Equal(t, firstFracTime, lc.registry.OldestTotal(), "should still reference the first fraction after offload") assert.Equal(t, halfwayFracTime, lc.registry.OldestLocal(), "should point to the oldest remaining local fraction after offload") } + +func TestPendingDestroy(t *testing.T) { + lc, tearDown := setupLifecycle(t, nil) + defer tearDown() + + const ( + fracsCount = 10 + docsPerFrac = 10 + ) + // appending docs to `fracsCount` fractions where the last is active and the rest are sealed + wg := sync.WaitGroup{} + for range fracsCount - 1 { + appendDocsToActive(t, lc.registry.Appender().active.frac, docsPerFrac) + lc.rotate(0, &wg) + } + appendDocsToActive(t, lc.registry.Appender().active.frac, docsPerFrac) + + // wait sealing complete + wg.Wait() + + // take all fracs to search + fractions1, release1 := lc.registry.FractionsSnapshot() + + // delete all sealing fracs + lc.cleanLocal(lc.registry.Appender().active.frac.Info().FullSize(), &wg) + + var ( + beforeRelease time.Time + afterCleanup time.Time + ) + + cleanup := sync.WaitGroup{} + cleanup.Add(1) + go func() { + // cleanup is pending, so run it in a goroutine + // waiting for cleanup to finish + defer cleanup.Done() + wg.Wait() + afterCleanup = time.Now() + }() + + queryAst, err := parser.ParseSeqQL("*", seq.Mapping{}) + require.NoError(t, err, "failed to parse query") + params := processor.SearchParams{ + AST: queryAst.Root, + From: seq.MID(0), + To: seq.MID(math.MaxUint64), + Limit: math.MaxInt32, + } + + for _, f := range fractions1 { + qpr, err 
:= f.Search(t.Context(), params) + assert.NoError(t, err, "failed to search") + assert.Equal(t, docsPerFrac, len(qpr.IDs)) + } + + beforeRelease = time.Now() + release1() + + cleanup.Wait() + assert.Less(t, beforeRelease, afterCleanup, "we expect cleanup to happen after release") + + fractions2, release2 := lc.registry.FractionsSnapshot() + + assert.Len(t, fractions2, 1, "only one active fraction should remain") + singleName := fractions2[0].Info().Name() + + for _, f := range fractions1 { + if f.Info().Name() == singleName { + continue + } + assert.Panics(t, func() { + _, _ = f.Search(t.Context(), params) + }, "searching by destroyed faction is expected to trigger a panic") + } + release2() +} diff --git a/fracmanager/proxy_frac.go b/fracmanager/proxy_frac.go deleted file mode 100644 index ffc31854..00000000 --- a/fracmanager/proxy_frac.go +++ /dev/null @@ -1,193 +0,0 @@ -package fracmanager - -import ( - "context" - "errors" - "math" - "sync" - "time" - - "go.uber.org/zap" - - "github.com/ozontech/seq-db/frac" - "github.com/ozontech/seq-db/frac/common" - "github.com/ozontech/seq-db/frac/processor" - "github.com/ozontech/seq-db/logger" - "github.com/ozontech/seq-db/metric" - "github.com/ozontech/seq-db/seq" - "github.com/ozontech/seq-db/util" -) - -var ( - _ frac.Fraction = (*fractionProxy)(nil) - _ frac.Fraction = (*emptyFraction)(nil) - - ErrFractionNotWritable = errors.New("fraction is not writable") - ErrFractionSuspended = errors.New("write operations temporarily suspended - database capacity exceeded") -) - -// fractionProxy provides thread-safe access to a fraction with atomic replacement -// Used to switch fraction implementations (active → sealed → remote) without blocking readers. -// Lifecycle: Created for each fraction, persists through state transitions. 
-type fractionProxy struct { - mu sync.RWMutex - impl frac.Fraction // Current fraction implementation -} - -func (p *fractionProxy) Redirect(f frac.Fraction) { - p.mu.Lock() - defer p.mu.Unlock() - p.impl = f -} - -func (p *fractionProxy) Info() *common.Info { - p.mu.RLock() - defer p.mu.RUnlock() - return p.impl.Info() -} - -func (p *fractionProxy) IsIntersecting(from, to seq.MID) bool { - p.mu.RLock() - defer p.mu.RUnlock() - return p.impl.IsIntersecting(from, to) -} - -func (p *fractionProxy) Contains(mid seq.MID) bool { - p.mu.RLock() - defer p.mu.RUnlock() - return p.impl.Contains(mid) -} - -func (p *fractionProxy) Fetch(ctx context.Context, ids []seq.ID) ([][]byte, error) { - p.mu.RLock() - defer p.mu.RUnlock() - return p.impl.Fetch(ctx, ids) -} - -func (p *fractionProxy) Search(ctx context.Context, params processor.SearchParams) (*seq.QPR, error) { - p.mu.RLock() - defer p.mu.RUnlock() - return p.impl.Search(ctx, params) -} - -// activeProxy manages an active (writable) fraction -// Tracks pending write operations and provides freeze capability. -// Lifecycle: Created when fraction becomes active, destroyed after sealing. 
-type activeProxy struct { - proxy *fractionProxy // Thread-safe fraction access - instance *frac.Active // Actual active fraction instance - sealed *frac.Sealed // Sealed version (set after sealing) - - mu sync.RWMutex // Protects readonly state - wg sync.WaitGroup // Tracks pending write operations - - finalized bool // Whether fraction is frozen for writes - suspended bool // Temporarily suspended for writes -} - -func newActiveProxy(active *frac.Active) *activeProxy { - return &activeProxy{ - proxy: &fractionProxy{impl: active}, - instance: active, - } -} - -// Append adds documents to the active fraction -func (p *activeProxy) Append(docs, meta []byte) error { - p.mu.RLock() - if p.finalized { - p.mu.RUnlock() - return ErrFractionNotWritable - } - if p.suspended { - p.mu.RUnlock() - return ErrFractionSuspended - } - p.wg.Add(1) // Important: wg.Add() inside lock to prevent race with WaitWriteIdle() - p.mu.RUnlock() - - return p.instance.Append(docs, meta, &p.wg) -} - -// WaitWriteIdle waits for all pending write operations to complete -// Used before sealing to ensure data consistency. -func (p *activeProxy) WaitWriteIdle() { - start := time.Now() - logger.Info("waiting fraction to stop write...", zap.String("name", p.instance.BaseFileName)) - p.wg.Wait() - waitTime := util.DurationToUnit(time.Since(start), "s") - logger.Info("write is stopped", - zap.String("name", p.instance.BaseFileName), - zap.Float64("time_wait_s", waitTime)) -} - -func (p *activeProxy) Suspended() bool { - p.mu.Lock() - defer p.mu.Unlock() - - return p.suspended -} - -func (p *activeProxy) Suspend(value bool) { - p.mu.Lock() - p.suspended = value - p.mu.Unlock() -} - -// Finalize marks the fraction as read-only and prevents new writes from starting after finalize. 
-func (p *activeProxy) Finalize() error { - p.mu.Lock() - defer p.mu.Unlock() - - if p.finalized { - return errors.New("fraction is already finalized") - } - p.finalized = true - - return nil -} - -// sealedProxy represents a sealed fraction that may be offloaded -// Tracks both local sealed instance and remote version if offloaded. -type sealedProxy struct { - proxy *fractionProxy // Thread-safe fraction access - instance *frac.Sealed // Local sealed fraction - remote *frac.Remote // Remote version (if offloaded) -} - -// remoteProxy represents an offloaded fraction -type remoteProxy struct { - proxy *fractionProxy // Thread-safe fraction access - instance *frac.Remote // Remote fraction instance -} - -// emptyFraction represents a missing or deleted fraction -// Returns empty results for all operations. -// Used as placeholder when fraction is removed but references still exist. -type emptyFraction struct { -} - -func (emptyFraction) Info() *common.Info { - return &common.Info{ - Path: "empty", - From: math.MaxUint64, - To: 0, - } -} - -func (emptyFraction) IsIntersecting(_, _ seq.MID) bool { - return false -} - -func (emptyFraction) Contains(mid seq.MID) bool { - return false -} - -func (emptyFraction) Fetch(ctx context.Context, ids []seq.ID) ([][]byte, error) { - return nil, nil -} - -func (emptyFraction) Search(_ context.Context, params processor.SearchParams) (*seq.QPR, error) { - metric.CountersTotal.WithLabelValues("empty_data_provider").Inc() - return &seq.QPR{Aggs: make([]seq.AggregatableSamples, len(params.AggQ))}, nil -} diff --git a/fracmanager/sync_appender.go b/fracmanager/sync_appender.go new file mode 100644 index 00000000..5933a15d --- /dev/null +++ b/fracmanager/sync_appender.go @@ -0,0 +1,81 @@ +package fracmanager + +import ( + "errors" + "sync" + "time" + + "github.com/ozontech/seq-db/logger" + "github.com/ozontech/seq-db/util" + "go.uber.org/zap" +) + +var ( + ErrFractionNotWritable = errors.New("fraction is not writable") + 
ErrFractionSuspended = errors.New("write operations temporarily suspended - database capacity exceeded") +) + +type syncAppender struct { + active *refCountedActive // Actual active fraction instance + + mu sync.RWMutex // Protects readonly state + wg sync.WaitGroup // Tracks pending write operations + + finalized bool // Whether fraction is frozen for writes + suspended bool // Temporarily suspended for writes +} + +// Append adds documents to the active fraction +func (a *syncAppender) Append(docs, meta []byte) error { + a.mu.RLock() + if a.finalized { + a.mu.RUnlock() + return ErrFractionNotWritable + } + if a.suspended { + a.mu.RUnlock() + return ErrFractionSuspended + } + a.wg.Add(1) // Important: wg.Add() inside lock to prevent race with WaitWriteIdle() + a.mu.RUnlock() + + return a.active.frac.Append(docs, meta, &a.wg) +} + +func (a *syncAppender) Suspended() bool { + a.mu.Lock() + defer a.mu.Unlock() + + return a.suspended +} + +func (a *syncAppender) Suspend(value bool) { + a.mu.Lock() + a.suspended = value + a.mu.Unlock() +} + +// WaitWriteIdle waits for all pending write operations to complete +// Used before sealing to ensure data consistency. +func (a *syncAppender) WaitWriteIdle() { + start := time.Now() + logger.Info("waiting fraction to stop write...", zap.String("name", a.active.frac.BaseFileName)) + a.wg.Wait() + waitTime := util.DurationToUnit(time.Since(start), "s") + logger.Info("write is stopped", + zap.String("name", a.active.frac.BaseFileName), + zap.Float64("time_wait_s", waitTime)) +} + +// Finalize marks the fraction as read-only and prevents new writes from starting after finalize. 
+func (a *syncAppender) Finalize() error { + a.mu.Lock() + if a.finalized { + a.mu.Unlock() + return errors.New("fraction is already finalized") + } + a.finalized = true + a.mu.Unlock() + + return nil +} diff --git a/storeapi/grpc_async_search.go b/storeapi/grpc_async_search.go index 518ed19f..d44df1b2 100644 --- a/storeapi/grpc_async_search.go +++ b/storeapi/grpc_async_search.go @@ -46,8 +46,12 @@ func (g *GrpcV1) StartAsyncSearch( Retention: r.Retention.AsDuration(), WithDocs: r.WithDocs, } - fracs := g.fracManager.Fractions().FilterInRange(seq.MillisToMID(uint64(r.From)), seq.MillisToMID(uint64(r.To))) - if err := g.asyncSearcher.StartSearch(req, fracs); err != nil { + + fracs, release := g.fracManager.FractionsSnapshot() + defer release() + + filtered := fracs.FilterInRange(seq.MillisToMID(uint64(r.From)), seq.MillisToMID(uint64(r.To))) + if err := g.asyncSearcher.StartSearch(req, filtered); err != nil { return nil, err } diff --git a/storeapi/grpc_fetch.go b/storeapi/grpc_fetch.go index d640618c..7ddaa5ab 100644 --- a/storeapi/grpc_fetch.go +++ b/storeapi/grpc_fetch.go @@ -68,7 +68,10 @@ func (g *GrpcV1) doFetch(ctx context.Context, req *storeapi.FetchRequest, stream dp := acquireDocFieldsFilter(req.FieldsFilter) defer releaseDocFieldsFilter(dp) - docsStream := newDocsStream(ctx, ids, g.fetchData.docFetcher, g.fracManager.Fractions()) + fractions, release := g.fracManager.FractionsSnapshot() + defer release() + + docsStream := newDocsStream(ctx, ids, g.fetchData.docFetcher, fractions) for _, id := range ids { workTime := time.Now() doc, err := docsStream.Next() diff --git a/storeapi/grpc_search.go b/storeapi/grpc_search.go index 2057ea39..63718ef5 100644 --- a/storeapi/grpc_search.go +++ b/storeapi/grpc_search.go @@ -189,13 +189,16 @@ func (g *GrpcV1) doSearch( } searchTr := tr.NewChild("search iteratively") + fractions, release := g.fracManager.FractionsSnapshot() qpr, err := g.searchData.searcher.SearchDocs( ctx, - g.fracManager.Fractions(), + fractions, 
searchParams, tr, ) + release() searchTr.Done() + if err != nil { if code, ok := parseStoreError(err); ok { return &storeapi.SearchResponse{Code: code}, nil diff --git a/storeapi/grpc_v1.go b/storeapi/grpc_v1.go index a242b172..0d4f0f9c 100644 --- a/storeapi/grpc_v1.go +++ b/storeapi/grpc_v1.go @@ -98,6 +98,9 @@ type GrpcV1 struct { } func NewGrpcV1(cfg APIConfig, fracManager *fracmanager.FracManager, mappingProvider MappingProvider) *GrpcV1 { + fractions, release := fracManager.FractionsSnapshot() + defer release() + g := &GrpcV1{ config: cfg, fracManager: fracManager, @@ -114,7 +117,7 @@ func NewGrpcV1(cfg APIConfig, fracManager *fracmanager.FracManager, mappingProvi }, asyncSearcher: asyncsearcher.MustStartAsync( cfg.Search.Async, mappingProvider, - fracManager.Fractions(), + fractions, ), }