Skip to content

Commit a0a8bc2

Browse files
committed
feat(fracmanager): implement fraction snapshots with wait group reference counting
1 parent fd721bd commit a0a8bc2

15 files changed

Lines changed: 439 additions & 417 deletions

fracmanager/fracmanager.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -76,27 +76,28 @@ func New(ctx context.Context, cfg *Config, s3cli *s3.Client) (*FracManager, func
7676
cancel()
7777
wg.Wait()
7878

79-
// freeze active fraction to prevent new writes
80-
active := lc.registry.Active()
81-
if err := active.Finalize(); err != nil {
79+
// finalize appender fraction to prevent new writes
80+
appender := lc.registry.Appender()
81+
if err := appender.Finalize(); err != nil {
8282
logger.Fatal("shutdown fraction freezing error", zap.Error(err))
8383
}
84-
active.WaitWriteIdle()
84+
appender.WaitWriteIdle()
8585

8686
stopIdx()
8787

8888
lc.SyncInfoCache()
8989

90-
sealOnShutdown(active.instance, provider, cfg.MinSealFracSize)
90+
// Seal active fraction
91+
sealOnShutdown(appender.frac, provider, cfg.MinSealFracSize)
9192

9293
logger.Info("fracmanager's workers are stopped", zap.Int64("took_ms", time.Since(n).Milliseconds()))
9394
}
9495

9596
return &fm, stop, nil
9697
}
9798

98-
func (fm *FracManager) Fractions() List {
99-
return fm.lc.registry.AllFractions()
99+
func (fm *FracManager) FractionsSnapshot() (List, ReleaseSnapshot) {
100+
return fm.lc.registry.FractionsSnapshot()
100101
}
101102

102103
func (fm *FracManager) Oldest() uint64 {
@@ -116,7 +117,7 @@ func (fm *FracManager) Append(ctx context.Context, docs storage.DocBlock, metas
116117
return ctx.Err()
117118
default:
118119
// Try to append data to the currently active fraction
119-
err := fm.lc.registry.Active().Append(docs, metas)
120+
err := fm.lc.registry.Appender().Append(docs, metas)
120121
if err != nil {
121122
logger.Info("append fail", zap.Error(err))
122123
if err == ErrFractionNotWritable {

fracmanager/fracmanager_for_tests.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package fracmanager
33
import "sync"
44

55
func (fm *FracManager) WaitIdleForTests() {
6-
fm.lc.registry.Active().WaitWriteIdle()
6+
fm.lc.registry.Appender().WaitWriteIdle()
77
}
88

99
func (fm *FracManager) SealForcedForTests() {

fracmanager/fracmanager_test.go

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -52,30 +52,33 @@ func TestSealingOnShutdown(t *testing.T) {
5252
cfg.MinSealFracSize = 0 // to ensure that the frac will not be sealed on shutdown
5353
cfg, fm, stop := setupFracManager(t, cfg)
5454
appendDocsToFracManager(t, fm, 10)
55-
activeName := fm.Fractions()[0].Info().Name()
55+
56+
fractions := fm.lc.registry.all.fractions
57+
activeName := fractions[0].Info().Name()
58+
5659
stop()
5760

5861
// second start
5962
cfg.MinSealFracSize = 1 // to ensure that the frac will be sealed on shutdown
6063
cfg, fm, stop = setupFracManager(t, cfg)
6164

62-
assert.Equal(t, 1, len(fm.Fractions()), "should have one fraction")
63-
assert.Equal(t, activeName, fm.Fractions()[0].Info().Name(), "fraction should have the same name")
64-
_, ok := fm.Fractions()[0].(*fractionProxy).impl.(*frac.Active)
65+
fractions = fm.lc.registry.all.fractions
66+
assert.Equal(t, 1, len(fractions), "should have one fraction")
67+
assert.Equal(t, activeName, fractions[0].Info().Name(), "fraction should have the same name")
68+
_, ok := fractions[0].(*frac.Active)
6569
assert.True(t, ok, "fraction should be active")
66-
6770
stop()
6871

6972
// third start
7073
_, fm, stop = setupFracManager(t, cfg)
7174

72-
assert.Equal(t, 2, len(fm.Fractions()), "should have 2 fraction: new active and old sealed")
73-
_, ok = fm.Fractions()[0].(*fractionProxy).impl.(*frac.Sealed)
75+
fractions = fm.lc.registry.all.fractions
76+
assert.Equal(t, 2, len(fractions), "should have 2 fraction: new active and old sealed")
77+
_, ok = fractions[0].(*frac.Sealed)
7478
assert.True(t, ok, "first fraction should be sealed")
75-
assert.Equal(t, activeName, fm.Fractions()[0].Info().Name(), "sealed fraction should have the same name")
76-
assert.Equal(t, uint32(0), fm.Fractions()[1].Info().DocsTotal, "active fraction should be empty")
77-
_, ok = fm.Fractions()[1].(*fractionProxy).impl.(*frac.Active)
79+
assert.Equal(t, activeName, fractions[0].Info().Name(), "sealed fraction should have the same name")
80+
assert.Equal(t, uint32(0), fractions[1].Info().DocsTotal, "active fraction should be empty")
81+
_, ok = fractions[1].(*frac.Active)
7882
assert.True(t, ok, "new fraction should be active")
79-
8083
stop()
8184
}

fracmanager/fracs_stats.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,3 +95,7 @@ func (s *registryStats) SetMetrics() {
9595
s.offloading.SetMetrics(dataSizeTotal, "offloading")
9696
s.remotes.SetMetrics(dataSizeTotal, "remotes")
9797
}
98+
99+
func (s registryStats) TotalSizeOnDiskLocal() uint64 {
100+
return s.sealing.totalSizeOnDisk + s.sealed.totalSizeOnDisk
101+
}

fracmanager/fraction_provider.go

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,16 @@ import (
88
"time"
99

1010
"github.com/oklog/ulid/v2"
11+
"go.uber.org/zap"
1112

1213
"github.com/ozontech/seq-db/frac"
1314
"github.com/ozontech/seq-db/frac/common"
1415
"github.com/ozontech/seq-db/frac/sealed"
1516
"github.com/ozontech/seq-db/frac/sealed/sealing"
17+
"github.com/ozontech/seq-db/logger"
1618
"github.com/ozontech/seq-db/storage"
1719
"github.com/ozontech/seq-db/storage/s3"
20+
"github.com/ozontech/seq-db/util"
1821
)
1922

2023
const fileBasePattern = "seq-db-"
@@ -107,8 +110,11 @@ func (fp *fractionProvider) CreateActive() *frac.Active {
107110

108111
// Seal converts an active fraction to a sealed one
109112
// Process includes sorting, indexing, and data optimization for reading
110-
func (fp *fractionProvider) Seal(active *frac.Active) (*frac.Sealed, error) {
111-
src, err := frac.NewActiveSealingSource(active, fp.config.SealParams)
113+
func (fp *fractionProvider) Seal(a *frac.Active) (*frac.Sealed, error) {
114+
sealsTotal.Inc()
115+
now := time.Now()
116+
117+
src, err := frac.NewActiveSealingSource(a, fp.config.SealParams)
112118
if err != nil {
113119
return nil, err
114120
}
@@ -117,7 +123,18 @@ func (fp *fractionProvider) Seal(active *frac.Active) (*frac.Sealed, error) {
117123
return nil, err
118124
}
119125

120-
return fp.NewSealedPreloaded(active.BaseFileName, preloaded), nil
126+
s := fp.NewSealedPreloaded(a.BaseFileName, preloaded)
127+
128+
sealingTime := time.Since(now)
129+
sealsDoneSeconds.Observe(sealingTime.Seconds())
130+
131+
logger.Info(
132+
"fraction sealed",
133+
zap.String("fraction", filepath.Base(s.BaseFileName)),
134+
zap.Float64("time_spent_s", util.DurationToUnit(sealingTime, "s")),
135+
)
136+
137+
return s, nil
121138
}
122139

123140
// Offload uploads fraction to S3 storage and returns a remote fraction

0 commit comments

Comments
 (0)