Skip to content

Commit 91c657e

Browse files
committed
chore(docsfilter): tombstone headers cache cleanup
1 parent 001a952 commit 91c657e

4 files changed

Lines changed: 43 additions & 9 deletions

File tree

cmd/seq-db/seq-db.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -314,8 +314,9 @@ func startStore(
314314
},
315315
},
316316
Filters: docsfilter.Config{
317-
DataDir: cfg.DocsFilter.DataDir,
318-
Workers: cfg.DocsFilter.Concurrency,
317+
DataDir: cfg.DocsFilter.DataDir,
318+
Workers: cfg.DocsFilter.Concurrency,
319+
CacheSizeLimit: uint64(cfg.DocsFilter.CacheSize),
319320
},
320321
}
321322

config/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,7 @@ type Config struct {
271271
DataDir string `config:"data_dir"`
272272
Concurrency int `config:"concurrency"`
273273
Filters []Filter `config:"filters"`
274+
CacheSize Bytes `config:"cache_size" default:"100MiB"`
274275
} `config:"docs_filter"`
275276

276277
// Experimental provides flags

docsfilter/docs_filter.go

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,20 @@ const (
3030
tmpExt = ".tmp"
3131
)
3232

33-
const defaultMaintenanceInterval = 30 * time.Second
33+
const (
34+
defaultMaintenanceInterval = 30 * time.Second
35+
defaultCacheCleanInterval = 10 * time.Millisecond
36+
defaultCacheGCDelay = 1 * time.Second
37+
)
3438

3539
type MappingProvider interface {
3640
GetMapping() seq.Mapping
3741
}
3842

3943
type Config struct {
40-
DataDir string
41-
Workers int
44+
DataDir string
45+
Workers int
46+
CacheSizeLimit uint64
4247
}
4348

4449
type DocsFilter struct {
@@ -56,8 +61,11 @@ type DocsFilter struct {
5661
createDirOnce *sync.Once
5762

5863
maintenanceInterval time.Duration
64+
cacheCleanInterval time.Duration
65+
cacheGCDelay time.Duration
5966

60-
headersCache *cache.Cache[[]lidsBlockHeader]
67+
headersCache *cache.Cache[[]lidsBlockHeader]
68+
headersCacheCleaner *cache.Cleaner
6169
}
6270

6371
func New(
@@ -78,6 +86,8 @@ func New(
7886
filtersMap[string(f.Hash())] = f
7987
}
8088

89+
cacheCleaner := cache.NewCleaner(cfg.CacheSizeLimit, nil)
90+
8191
return &DocsFilter{
8292
ctx: ctx,
8393
config: cfg,
@@ -88,8 +98,10 @@ func New(
8898
rateLimit: make(chan struct{}, workers),
8999
createDirOnce: &sync.Once{},
90100
maintenanceInterval: defaultMaintenanceInterval,
91-
// TODO: create cache properly (cleaner, metrics) (use cacheMaintainer ???)
92-
headersCache: cache.NewCache[[]lidsBlockHeader](nil, nil),
101+
cacheCleanInterval: defaultCacheCleanInterval,
102+
cacheGCDelay: defaultCacheGCDelay,
103+
headersCache: cache.NewCache[[]lidsBlockHeader](cacheCleaner, nil),
104+
headersCacheCleaner: cacheCleaner,
93105
}
94106
}
95107

@@ -107,6 +119,7 @@ func (df *DocsFilter) Start(fracs fracmanager.List) {
107119
}
108120

109121
go df.maintenance()
122+
go df.cacheCleanLoop()
110123

111124
mapping := df.mp.GetMapping()
112125

@@ -382,6 +395,25 @@ func (df *DocsFilter) maintenance() {
382395
}
383396
}
384397

398+
func (df *DocsFilter) cacheCleanLoop() {
399+
runs := 0
400+
gcRunsCount := int(df.cacheGCDelay / df.cacheCleanInterval)
401+
402+
for {
403+
runs++
404+
df.headersCacheCleaner.Cleanup(&cache.CleanStat{})
405+
df.headersCacheCleaner.Rotate()
406+
407+
if runs >= gcRunsCount {
408+
runs = 0
409+
df.headersCacheCleaner.CleanEmptyGenerations()
410+
df.headersCacheCleaner.ReleaseBuckets()
411+
}
412+
413+
time.Sleep(df.cacheCleanInterval)
414+
}
415+
}
416+
385417
func (df *DocsFilter) checkDiskUsage() {
386418
du := int64(0)
387419

frac/active.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ type Active struct {
4646
TokenList *TokenList
4747

4848
DocsPositions *DocsPositions
49-
IDsToLIDs *ActiveLIDs // TODO: (???)
49+
IDsToLIDs *ActiveLIDs
5050

5151
docsFile *os.File
5252
docsReader storage.DocsReader

0 commit comments

Comments
 (0)