From b9b05caf7dc9d8a47480053535b5a157304fe7a9 Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Mon, 20 Apr 2026 18:05:19 -0400 Subject: [PATCH 1/4] reorganize package - move internal packages to pkg --- api.go | 6 +++--- bench_test.go | 2 +- blocking.go | 2 +- compact/deletes_test.go | 5 +++-- compact/updates_test.go | 9 +++++---- delete_test.go | 3 ++- log.go | 6 +++--- log_test.go | 6 +++--- {index => pkg/index}/format.go | 0 {index => pkg/index}/format_test.go | 0 {index => pkg/index}/index.go | 2 +- {index => pkg/index}/keys.go | 3 ++- {index => pkg/index}/keys_test.go | 3 ++- {index => pkg/index}/offset.go | 2 +- {index => pkg/index}/offset_test.go | 3 ++- {index => pkg/index}/times.go | 2 +- {index => pkg/index}/times_test.go | 0 {message => pkg/message}/format.go | 0 {message => pkg/message}/format_test.go | 0 {message => pkg/message}/message.go | 0 {notify => pkg/notify}/notify.go | 0 {notify => pkg/notify}/notify_test.go | 0 {segment => pkg/segment}/index.go | 2 +- {segment => pkg/segment}/index_test.go | 0 {segment => pkg/segment}/segment.go | 4 ++-- {segment => pkg/segment}/segment_test.go | 4 ++-- {segment => pkg/segment}/segments.go | 4 ++-- {segment => pkg/segment}/segments_test.go | 2 +- {segment => pkg/segment}/utils.go | 0 reader.go | 6 +++--- trim/age_test.go | 5 +++-- trim/count_test.go | 5 +++-- trim/offset.go | 2 +- trim/offset_test.go | 13 +++++++------ trim/size_test.go | 7 ++++--- typed_blocking.go | 2 +- writer.go | 6 +++--- 37 files changed, 63 insertions(+), 53 deletions(-) rename {index => pkg/index}/format.go (100%) rename {index => pkg/index}/format_test.go (100%) rename {index => pkg/index}/index.go (93%) rename {index => pkg/index}/keys.go (95%) rename {index => pkg/index}/keys_test.go (96%) rename {index => pkg/index}/offset.go (98%) rename {index => pkg/index}/offset_test.go (99%) rename {index => pkg/index}/times.go (95%) rename {index => pkg/index}/times_test.go (100%) rename {message => pkg/message}/format.go (100%) rename {message => pkg/message}/format_test.go (100%) rename {message => pkg/message}/message.go (100%) rename {notify => pkg/notify}/notify.go (100%) rename {notify => pkg/notify}/notify_test.go (100%) rename {segment => pkg/segment}/index.go (98%) rename {segment => pkg/segment}/index_test.go (100%) rename {segment => pkg/segment}/segment.go (99%) rename {segment => pkg/segment}/segment_test.go (99%) rename {segment => pkg/segment}/segments.go (97%) rename {segment => pkg/segment}/segments_test.go (91%) rename {segment => pkg/segment}/utils.go (100%) diff --git a/api.go b/api.go index 257bddf..330a2f1 100644 --- a/api.go +++ b/api.go @@ -5,9 +5,9 @@ import ( "fmt" "time" - "github.com/klev-dev/klevdb/index" - "github.com/klev-dev/klevdb/message" - "github.com/klev-dev/klevdb/segment" + "github.com/klev-dev/klevdb/pkg/index" + "github.com/klev-dev/klevdb/pkg/message" + "github.com/klev-dev/klevdb/pkg/segment" ) const ( diff --git a/bench_test.go b/bench_test.go index 257fdb2..06fd01b 100644 --- a/bench_test.go +++ b/bench_test.go @@ -9,7 +9,7 @@ import ( "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" - "github.com/klev-dev/klevdb/message" + "github.com/klev-dev/klevdb/pkg/message" ) var v1opts = VersionOptions{NewSegmentsVersion: V1} diff --git a/blocking.go b/blocking.go index bf6ed8a..53374d9 100644 --- a/blocking.go +++ b/blocking.go @@ -3,7 +3,7 @@ package klevdb import ( "context" - "github.com/klev-dev/klevdb/notify" + "github.com/klev-dev/klevdb/pkg/notify" ) // BlockingLog enhances [Log] adding blocking consume diff --git a/compact/deletes_test.go b/compact/deletes_test.go index 0aad84c..893f0ee 100644 --- a/compact/deletes_test.go +++ b/compact/deletes_test.go @@ -6,9 +6,10 @@ import ( "testing" "time" - "github.com/klev-dev/klevdb" - "github.com/klev-dev/klevdb/message" "github.com/stretchr/testify/require" + + "github.com/klev-dev/klevdb" + "github.com/klev-dev/klevdb/pkg/message" ) func TestDeletes(t *testing.T) { diff --git a/compact/updates_test.go b/compact/updates_test.go index d8ac91b..9d437e7 100644 --- a/compact/updates_test.go +++ b/compact/updates_test.go @@ -6,9 +6,10 @@ import ( "testing" "time" - "github.com/klev-dev/klevdb" - "github.com/klev-dev/klevdb/message" "github.com/stretchr/testify/require" + + "github.com/klev-dev/klevdb" + "github.com/klev-dev/klevdb/pkg/message" ) func TestUpdates(t *testing.T) { @@ -51,7 +52,7 @@ func TestUpdates(t *testing.T) { _, err = l.Publish(msgs) require.NoError(t, err) - dmsgs := []message.Message{msgs[0]} + dmsgs := []klevdb.Message{msgs[0]} dmsgs[0].Value = []byte("abc") _, err = l.Publish(dmsgs) require.NoError(t, err) @@ -75,7 +76,7 @@ func TestUpdates(t *testing.T) { _, err = l.Publish(msgs) require.NoError(t, err) - dmsgs := []message.Message{msgs[4]} + dmsgs := []klevdb.Message{msgs[4]} dmsgs[0].Value = []byte("abc") _, err = l.Publish(dmsgs) require.NoError(t, err) diff --git a/delete_test.go b/delete_test.go index 3b35ed0..9295dd1 100644 --- a/delete_test.go +++ b/delete_test.go @@ -5,8 +5,9 @@ import ( "testing" "time" - "github.com/klev-dev/klevdb/message" "github.com/stretchr/testify/require" + + "github.com/klev-dev/klevdb/pkg/message" ) func TestDeleteMulti(t *testing.T) { diff --git a/log.go b/log.go index 1fb3248..345c26d 100644 --- a/log.go +++ b/log.go @@ -10,9 +10,9 @@ import ( "github.com/gofrs/flock" - "github.com/klev-dev/klevdb/index" - "github.com/klev-dev/klevdb/message" - "github.com/klev-dev/klevdb/segment" + "github.com/klev-dev/klevdb/pkg/index" + "github.com/klev-dev/klevdb/pkg/message" + "github.com/klev-dev/klevdb/pkg/segment" ) var errNoKeyIndex = fmt.Errorf("%w by key", ErrNoIndex) diff --git a/log_test.go b/log_test.go index 88eb491..10afbe6 100644 --- a/log_test.go +++ b/log_test.go @@ -15,9 +15,9 @@ import ( "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" - "github.com/klev-dev/klevdb/index" - "github.com/klev-dev/klevdb/message" - "github.com/klev-dev/klevdb/segment" + "github.com/klev-dev/klevdb/pkg/index" + "github.com/klev-dev/klevdb/pkg/message" + "github.com/klev-dev/klevdb/pkg/segment" ) func publishBatched(t *testing.T, l Log, msgs []Message, batchLen int) { diff --git a/index/format.go b/pkg/index/format.go similarity index 100% rename from index/format.go rename to pkg/index/format.go diff --git a/index/format_test.go b/pkg/index/format_test.go similarity index 100% rename from index/format_test.go rename to pkg/index/format_test.go diff --git a/index/index.go b/pkg/index/index.go similarity index 93% rename from index/index.go rename to pkg/index/index.go index e2d70fd..4aae0f5 100644 --- a/index/index.go +++ b/pkg/index/index.go @@ -1,7 +1,7 @@ package index import ( - "github.com/klev-dev/klevdb/message" + "github.com/klev-dev/klevdb/pkg/message" ) type Item struct { diff --git a/index/keys.go b/pkg/index/keys.go similarity index 95% rename from index/keys.go rename to pkg/index/keys.go index 9d3362e..f44efc3 100644 --- a/index/keys.go +++ b/pkg/index/keys.go @@ -5,8 +5,9 @@ import ( "fmt" "hash/fnv" - "github.com/klev-dev/klevdb/message" art "github.com/plar/go-adaptive-radix-tree/v2" + + "github.com/klev-dev/klevdb/pkg/message" ) var ErrKeyNotFound = fmt.Errorf("key: %w", message.ErrNotFound) diff --git a/index/keys_test.go b/pkg/index/keys_test.go similarity index 96% rename from index/keys_test.go rename to pkg/index/keys_test.go index e9872c3..c9e5034 100644 --- a/index/keys_test.go +++ b/pkg/index/keys_test.go @@ -3,9 +3,10 @@ package index import ( "testing" - "github.com/klev-dev/klevdb/message" art "github.com/plar/go-adaptive-radix-tree/v2" "github.com/stretchr/testify/require" + + "github.com/klev-dev/klevdb/pkg/message" ) func TestKeys(t *testing.T) { diff --git a/index/offset.go b/pkg/index/offset.go similarity index 98% rename from index/offset.go rename to pkg/index/offset.go index f4115ba..be87bc0 100644 --- a/index/offset.go +++ b/pkg/index/offset.go @@ -3,7 +3,7 @@ package index import ( "fmt" - "github.com/klev-dev/klevdb/message" + "github.com/klev-dev/klevdb/pkg/message" ) var ErrOffsetIndexEmpty = fmt.Errorf("%w: no offset items", message.ErrInvalidOffset) diff --git a/index/offset_test.go b/pkg/index/offset_test.go similarity index 99% rename from index/offset_test.go rename to pkg/index/offset_test.go index 62dc9d2..9464b0f 100644 --- a/index/offset_test.go +++ b/pkg/index/offset_test.go @@ -4,8 +4,9 @@ import ( "fmt" "testing" - "github.com/klev-dev/klevdb/message" "github.com/stretchr/testify/require" + + "github.com/klev-dev/klevdb/pkg/message" ) func genItems(offsets ...int64) []Item { diff --git a/index/times.go b/pkg/index/times.go similarity index 95% rename from index/times.go rename to pkg/index/times.go index 24438c1..0d454ab 100644 --- a/index/times.go +++ b/pkg/index/times.go @@ -5,7 +5,7 @@ import ( "fmt" "sort" - "github.com/klev-dev/klevdb/message" + "github.com/klev-dev/klevdb/pkg/message" ) var ErrTimeIndexEmpty = fmt.Errorf("%w: no time items", message.ErrInvalidOffset) diff --git a/index/times_test.go b/pkg/index/times_test.go similarity index 100% rename from index/times_test.go rename to pkg/index/times_test.go diff --git a/message/format.go b/pkg/message/format.go similarity index 100% rename from message/format.go rename to pkg/message/format.go diff --git a/message/format_test.go b/pkg/message/format_test.go similarity index 100% rename from message/format_test.go rename to pkg/message/format_test.go diff --git a/message/message.go b/pkg/message/message.go similarity index 100% rename from message/message.go rename to pkg/message/message.go diff --git a/notify/notify.go b/pkg/notify/notify.go similarity index 100% rename from notify/notify.go rename to pkg/notify/notify.go diff --git a/notify/notify_test.go b/pkg/notify/notify_test.go similarity index 100% rename from notify/notify_test.go rename to pkg/notify/notify_test.go diff --git a/segment/index.go b/pkg/segment/index.go similarity index 98% rename from segment/index.go rename to pkg/segment/index.go index 8180103..cbae738 100644 --- a/segment/index.go +++ b/pkg/segment/index.go @@ -3,7 +3,7 @@ package segment import ( "fmt" - "github.com/klev-dev/klevdb/message" + "github.com/klev-dev/klevdb/pkg/message" ) type Offsetter interface { diff --git a/segment/index_test.go b/pkg/segment/index_test.go similarity index 100% rename from segment/index_test.go rename to pkg/segment/index_test.go diff --git a/segment/segment.go b/pkg/segment/segment.go similarity index 99% rename from segment/segment.go rename to pkg/segment/segment.go index f9b6cb2..eace60f 100644 --- a/segment/segment.go +++ b/pkg/segment/segment.go @@ -8,9 +8,9 @@ import ( "path/filepath" "slices" - "github.com/klev-dev/klevdb/index" - "github.com/klev-dev/klevdb/message" + "github.com/klev-dev/klevdb/pkg/index" "github.com/klev-dev/klevdb/pkg/kdir" + "github.com/klev-dev/klevdb/pkg/message" ) type Segment struct { diff --git a/segment/segment_test.go b/pkg/segment/segment_test.go similarity index 99% rename from segment/segment_test.go rename to pkg/segment/segment_test.go index ef3bbc0..f72bc9c 100644 --- a/segment/segment_test.go +++ b/pkg/segment/segment_test.go @@ -8,8 +8,8 @@ import ( "github.com/stretchr/testify/require" - "github.com/klev-dev/klevdb/index" - "github.com/klev-dev/klevdb/message" + "github.com/klev-dev/klevdb/pkg/index" + "github.com/klev-dev/klevdb/pkg/message" ) func clearLastByte(fn string) error { diff --git a/segment/segments.go b/pkg/segment/segments.go similarity index 97% rename from segment/segments.go rename to pkg/segment/segments.go index 4519957..5230bba 100644 --- a/segment/segments.go +++ b/pkg/segment/segments.go @@ -7,9 +7,9 @@ import ( "strconv" "strings" - "github.com/klev-dev/klevdb/index" - "github.com/klev-dev/klevdb/message" + "github.com/klev-dev/klevdb/pkg/index" "github.com/klev-dev/klevdb/pkg/kdir" + "github.com/klev-dev/klevdb/pkg/message" ) func Find(dir string, autoSync bool) ([]Segment, error) { diff --git a/segment/segments_test.go b/pkg/segment/segments_test.go similarity index 91% rename from segment/segments_test.go rename to pkg/segment/segments_test.go index 1415d1f..8bcd1b5 100644 --- a/segment/segments_test.go +++ b/pkg/segment/segments_test.go @@ -6,7 +6,7 @@ import ( "github.com/stretchr/testify/require" - "github.com/klev-dev/klevdb/index" + "github.com/klev-dev/klevdb/pkg/index" ) func TestRecoverDir(t *testing.T) { diff --git a/segment/utils.go b/pkg/segment/utils.go similarity index 100% rename from segment/utils.go rename to pkg/segment/utils.go diff --git a/reader.go b/reader.go index cd6e920..fde53ca 100644 --- a/reader.go +++ b/reader.go @@ -9,9 +9,9 @@ import ( art "github.com/plar/go-adaptive-radix-tree/v2" - "github.com/klev-dev/klevdb/index" - "github.com/klev-dev/klevdb/message" - "github.com/klev-dev/klevdb/segment" + "github.com/klev-dev/klevdb/pkg/index" + "github.com/klev-dev/klevdb/pkg/message" + "github.com/klev-dev/klevdb/pkg/segment" ) type reader struct { diff --git a/trim/age_test.go b/trim/age_test.go index 05a641a..e85b1de 100644 --- a/trim/age_test.go +++ b/trim/age_test.go @@ -5,9 +5,10 @@ import ( "testing" "time" - "github.com/klev-dev/klevdb" - "github.com/klev-dev/klevdb/message" "github.com/stretchr/testify/require" + + "github.com/klev-dev/klevdb" + "github.com/klev-dev/klevdb/pkg/message" ) func TestByAge(t *testing.T) { diff --git a/trim/count_test.go b/trim/count_test.go index 9e2d24c..20b60f6 100644 --- a/trim/count_test.go +++ b/trim/count_test.go @@ -4,9 +4,10 @@ import ( "context" "testing" - "github.com/klev-dev/klevdb" - "github.com/klev-dev/klevdb/message" "github.com/stretchr/testify/require" + + "github.com/klev-dev/klevdb" + "github.com/klev-dev/klevdb/pkg/message" ) func TestByCount(t *testing.T) { diff --git a/trim/offset.go b/trim/offset.go index 7cb90a2..e35b8ec 100644 --- a/trim/offset.go +++ b/trim/offset.go @@ -4,7 +4,7 @@ import ( "context" "github.com/klev-dev/klevdb" - "github.com/klev-dev/klevdb/message" + "github.com/klev-dev/klevdb/pkg/message" ) // FindByOffset returns a set of offsets for messages whose diff --git a/trim/offset_test.go b/trim/offset_test.go index 230c4d0..31d5c88 100644 --- a/trim/offset_test.go +++ b/trim/offset_test.go @@ -4,9 +4,10 @@ import ( "context" "testing" - "github.com/klev-dev/klevdb" - "github.com/klev-dev/klevdb/message" "github.com/stretchr/testify/require" + + "github.com/klev-dev/klevdb" + "github.com/klev-dev/klevdb/pkg/message" ) func TestByOffset(t *testing.T) { @@ -68,7 +69,7 @@ func TestByOffset(t *testing.T) { require.Equal(t, 0, stat.Messages) msg, err = l.Get(klevdb.OffsetOldest) - require.ErrorIs(t, err, message.ErrInvalidOffset) + require.ErrorIs(t, err, klevdb.ErrInvalidOffset) }) } @@ -91,7 +92,7 @@ func TestByOffsetRelative(t *testing.T) { require.Equal(t, int64(0), msg.Offset) t.Run("Oldest", func(t *testing.T) { - off, sz, err := ByOffset(context.TODO(), l, message.OffsetOldest) + off, sz, err := ByOffset(context.TODO(), l, klevdb.OffsetOldest) require.Len(t, off, 0) require.NoError(t, err) require.Equal(t, int64(0), sz) @@ -106,7 +107,7 @@ func TestByOffsetRelative(t *testing.T) { }) t.Run("Newest", func(t *testing.T) { - off, sz, err := ByOffset(context.TODO(), l, message.OffsetNewest) + off, sz, err := ByOffset(context.TODO(), l, klevdb.OffsetNewest) require.Len(t, off, 20) require.NoError(t, err) require.Equal(t, l.Size(msgs[0])*20, sz) @@ -116,6 +117,6 @@ func TestByOffsetRelative(t *testing.T) { require.Equal(t, 0, stat.Messages) msg, err = l.Get(klevdb.OffsetOldest) - require.ErrorIs(t, err, message.ErrInvalidOffset) + require.ErrorIs(t, err, klevdb.ErrInvalidOffset) }) } diff --git a/trim/size_test.go b/trim/size_test.go index 416c4ba..b7959a3 100644 --- a/trim/size_test.go +++ b/trim/size_test.go @@ -4,10 +4,11 @@ import ( "context" "testing" - "github.com/klev-dev/klevdb" - "github.com/klev-dev/klevdb/index" - "github.com/klev-dev/klevdb/message" "github.com/stretchr/testify/require" + + "github.com/klev-dev/klevdb" + "github.com/klev-dev/klevdb/pkg/index" + "github.com/klev-dev/klevdb/pkg/message" ) func TestBySize(t *testing.T) { diff --git a/typed_blocking.go b/typed_blocking.go index 85314f2..c59f01a 100644 --- a/typed_blocking.go +++ b/typed_blocking.go @@ -3,7 +3,7 @@ package klevdb import ( "context" - "github.com/klev-dev/klevdb/notify" + "github.com/klev-dev/klevdb/pkg/notify" ) // TBlockingLog enhances [TLog] adding blocking consume diff --git a/writer.go b/writer.go index 7744d16..b279352 100644 --- a/writer.go +++ b/writer.go @@ -8,9 +8,9 @@ import ( art "github.com/plar/go-adaptive-radix-tree/v2" - "github.com/klev-dev/klevdb/index" - "github.com/klev-dev/klevdb/message" - "github.com/klev-dev/klevdb/segment" + "github.com/klev-dev/klevdb/pkg/index" + "github.com/klev-dev/klevdb/pkg/message" + "github.com/klev-dev/klevdb/pkg/segment" ) type writer struct { From 7eb97aab87182f233b220cedb458e2ecbbca8220 Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Mon, 20 Apr 2026 19:59:36 -0400 Subject: [PATCH 2/4] move trim to main package --- trim/age.go => trim_age.go | 26 +++++++--------- trim/age_test.go => trim_age_test.go | 27 ++++++++--------- trim/count.go => trim_count.go | 18 +++++------ trim/count_test.go => trim_count_test.go | 17 +++++------ trim/offset.go => trim_offset.go | 17 +++++------ trim/offset_test.go => trim_offset_test.go | 35 +++++++++++----------- trim/size.go => trim_size.go | 21 ++++++------- trim/size_test.go => trim_size_test.go | 17 +++++------ 8 files changed, 82 insertions(+), 96 deletions(-) rename trim/age.go => trim_age.go (62%) rename trim/age_test.go => trim_age_test.go (69%) rename trim/count.go => trim_count.go (62%) rename trim/count_test.go => trim_count_test.go (73%) rename trim/offset.go => trim_offset.go (62%) rename trim/offset_test.go => trim_offset_test.go (71%) rename trim/size.go => trim_size.go (55%) rename trim/size_test.go => trim_size_test.go (76%) diff --git a/trim/age.go b/trim_age.go similarity index 62% rename from trim/age.go rename to trim_age.go index 1e8decd..075a900 100644 --- a/trim/age.go +++ b/trim_age.go @@ -1,28 +1,26 @@ -package trim +package klevdb import ( "context" "errors" "time" - - "github.com/klev-dev/klevdb" ) // FindByAge returns a set of offsets for messages that are // at the start of the log and before given time. -func FindByAge(ctx context.Context, l klevdb.Log, before time.Time) (map[int64]struct{}, error) { +func FindByAge(ctx context.Context, l Log, before time.Time) (map[int64]struct{}, error) { maxOffset, _, err := l.OffsetByTime(before) switch { case err == nil: // we've found the max offset, start collecting offsets to delete break - case errors.Is(err, klevdb.ErrNoIndex): + case errors.Is(err, ErrNoIndex): // this log is not indexed by time, use the max as a bound maxOffset, err = l.NextOffset() if err != nil { return nil, err } - case errors.Is(err, klevdb.ErrNotFound): + case errors.Is(err, ErrNotFound): // all messages are before, again use the max as a bound maxOffset, err = l.NextOffset() if err != nil { @@ -30,15 +28,13 @@ func FindByAge(ctx context.Context, l klevdb.Log, before time.Time) (map[int64]s } default: // something else went wrong - if err != nil { - return nil, err - } + return nil, err } var offsets = map[int64]struct{}{} SEARCH: - for offset := klevdb.OffsetOldest; offset < maxOffset; { + for offset := OffsetOldest; offset < maxOffset; { nextOffset, msgs, err := l.Consume(offset, 32) if err != nil { return nil, err @@ -65,10 +61,10 @@ SEARCH: return offsets, nil } -// ByAge tries to remove the messages at the start of the log before given time. +// TrimByAge tries to remove the messages at the start of the log before given time. // // returns the offsets it deleted and the amount of storage freed -func ByAge(ctx context.Context, l klevdb.Log, before time.Time) (map[int64]struct{}, int64, error) { +func TrimByAge(ctx context.Context, l Log, before time.Time) (map[int64]struct{}, int64, error) { offsets, err := FindByAge(ctx, l, before) if err != nil { return nil, 0, err @@ -76,11 +72,11 @@ func ByAge(ctx context.Context, l klevdb.Log, before time.Time) (map[int64]struc return l.Delete(offsets) } -// ByAgeMulti is similar to ByAge, but will try to remove messages from multiple segments -func ByAgeMulti(ctx context.Context, l klevdb.Log, before time.Time, backoff klevdb.DeleteMultiBackoff) (map[int64]struct{}, int64, error) { +// TrimByAgeMulti is similar to ByAge, but will try to remove messages from multiple segments +func TrimByAgeMulti(ctx context.Context, l Log, before time.Time, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) { offsets, err := FindByAge(ctx, l, before) if err != nil { return nil, 0, err } - return klevdb.DeleteMulti(ctx, l, offsets, backoff) + return DeleteMulti(ctx, l, offsets, backoff) } diff --git a/trim/age_test.go b/trim_age_test.go similarity index 69% rename from trim/age_test.go rename to trim_age_test.go index e85b1de..f2e9735 100644 --- a/trim/age_test.go +++ b/trim_age_test.go @@ -1,4 +1,4 @@ -package trim +package klevdb import ( "context" @@ -7,11 +7,10 @@ import ( "github.com/stretchr/testify/require" - "github.com/klev-dev/klevdb" "github.com/klev-dev/klevdb/pkg/message" ) -func TestByAge(t *testing.T) { +func TestTrimByAge(t *testing.T) { t.Run("Partial", testByAgePartial) t.Run("NoIndex", testByAgeNoIndex) t.Run("All", testByAgeAll) @@ -20,23 +19,23 @@ func TestByAge(t *testing.T) { func testByAgePartial(t *testing.T) { msgs := message.Gen(20) - l, err := klevdb.Open(t.TempDir(), klevdb.Options{TimeIndex: true}) + l, err := Open(t.TempDir(), Options{TimeIndex: true}) require.NoError(t, err) defer l.Close() _, err = l.Publish(msgs) require.NoError(t, err) - msg, err := l.Get(klevdb.OffsetOldest) + msg, err := l.Get(OffsetOldest) require.NoError(t, err) require.Equal(t, int64(0), msg.Offset) trimTime := msgs[10].Time.Add(-time.Millisecond) - _, trim, err := ByAge(context.TODO(), l, trimTime) + _, trim, err := TrimByAge(context.TODO(), l, trimTime) require.NoError(t, err) require.Equal(t, l.Size(msgs[0])*10, trim) - msg, err = l.Get(klevdb.OffsetOldest) + msg, err = l.Get(OffsetOldest) require.NoError(t, err) require.Equal(t, int64(10), msg.Offset) } @@ -44,23 +43,23 @@ func testByAgePartial(t *testing.T) { func testByAgeNoIndex(t *testing.T) { msgs := message.Gen(20) - l, err := klevdb.Open(t.TempDir(), klevdb.Options{}) + l, err := Open(t.TempDir(), Options{}) require.NoError(t, err) defer l.Close() _, err = l.Publish(msgs) require.NoError(t, err) - msg, err := l.Get(klevdb.OffsetOldest) + msg, err := l.Get(OffsetOldest) require.NoError(t, err) require.Equal(t, int64(0), msg.Offset) trimTime := msgs[10].Time.Add(-time.Millisecond) - _, trim, err := ByAge(context.TODO(), l, trimTime) + _, trim, err := TrimByAge(context.TODO(), l, trimTime) require.NoError(t, err) require.Equal(t, l.Size(msgs[0])*10, trim) - msg, err = l.Get(klevdb.OffsetOldest) + msg, err = l.Get(OffsetOldest) require.NoError(t, err) require.Equal(t, int64(10), msg.Offset) } @@ -68,7 +67,7 @@ func testByAgeNoIndex(t *testing.T) { func testByAgeAll(t *testing.T) { msgs := message.Gen(20) - l, err := klevdb.Open(t.TempDir(), klevdb.Options{TimeIndex: true}) + l, err := Open(t.TempDir(), Options{TimeIndex: true}) require.NoError(t, err) defer l.Close() @@ -76,12 +75,12 @@ func testByAgeAll(t *testing.T) { require.NoError(t, err) trimTime := msgs[len(msgs)-1].Time.Add(time.Millisecond) - off, sz, err := ByAge(context.TODO(), l, trimTime) + off, sz, err := TrimByAge(context.TODO(), l, trimTime) require.NoError(t, err) require.Len(t, off, 20) require.Equal(t, l.Size(msgs[0])*20, sz) - coff, cmsgs, err := l.Consume(klevdb.OffsetOldest, 32) + coff, cmsgs, err := l.Consume(OffsetOldest, 32) require.NoError(t, err) require.Equal(t, int64(20), coff) require.Empty(t, cmsgs) diff --git a/trim/count.go b/trim_count.go similarity index 62% rename from trim/count.go rename to trim_count.go index b37d8dc..807eab4 100644 --- a/trim/count.go +++ b/trim_count.go @@ -1,14 +1,12 @@ -package trim +package klevdb import ( "context" - - "github.com/klev-dev/klevdb" ) // FindByCount returns a set of offsets for messages that when // removed will keep the number of messages in the log under max -func FindByCount(ctx context.Context, l klevdb.Log, max int) (map[int64]struct{}, error) { +func FindByCount(ctx context.Context, l Log, max int) (map[int64]struct{}, error) { stats, err := l.Stat() switch { case err != nil: @@ -25,7 +23,7 @@ func FindByCount(ctx context.Context, l klevdb.Log, max int) (map[int64]struct{} var offsets = map[int64]struct{}{} toRemove := stats.Messages - max - for offset := klevdb.OffsetOldest; offset < maxOffset && toRemove > 0; { + for offset := OffsetOldest; offset < maxOffset && toRemove > 0; { nextOffset, msgs, err := l.Consume(offset, 32) if err != nil { return nil, err @@ -53,11 +51,11 @@ func FindByCount(ctx context.Context, l klevdb.Log, max int) (map[int64]struct{} return offsets, nil } -// ByCount tries to remove messages to keep the number of messages +// TrimByCount tries to remove messages to keep the number of messages // in the log under max count. // // returns the offsets it deleted and the amount of storage freed -func ByCount(ctx context.Context, l klevdb.Log, max int) (map[int64]struct{}, int64, error) { +func TrimByCount(ctx context.Context, l Log, max int) (map[int64]struct{}, int64, error) { offsets, err := FindByCount(ctx, l, max) if err != nil { return nil, 0, err @@ -65,11 +63,11 @@ func ByCount(ctx context.Context, l klevdb.Log, max int) (map[int64]struct{}, in return l.Delete(offsets) } -// ByCountMulti is similar to ByCount, but will try to remove messages from multiple segments -func ByCountMulti(ctx context.Context, l klevdb.Log, max int, backoff klevdb.DeleteMultiBackoff) (map[int64]struct{}, int64, error) { +// TrimByCountMulti is similar to ByCount, but will try to remove messages from multiple segments +func TrimByCountMulti(ctx context.Context, l Log, max int, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) { offsets, err := FindByCount(ctx, l, max) if err != nil { return nil, 0, err } - return klevdb.DeleteMulti(ctx, l, offsets, backoff) + return DeleteMulti(ctx, l, offsets, backoff) } diff --git a/trim/count_test.go b/trim_count_test.go similarity index 73% rename from trim/count_test.go rename to trim_count_test.go index 20b60f6..ebea943 100644 --- a/trim/count_test.go +++ b/trim_count_test.go @@ -1,4 +1,4 @@ -package trim +package klevdb import ( "context" @@ -6,14 +6,13 @@ import ( "github.com/stretchr/testify/require" - "github.com/klev-dev/klevdb" "github.com/klev-dev/klevdb/pkg/message" ) -func TestByCount(t *testing.T) { +func TestTrimByCount(t *testing.T) { msgs := message.Gen(20) - l, err := klevdb.Open(t.TempDir(), klevdb.Options{}) + l, err := Open(t.TempDir(), Options{}) require.NoError(t, err) defer l.Close() @@ -24,12 +23,12 @@ func TestByCount(t *testing.T) { require.NoError(t, err) require.Equal(t, len(msgs), stat.Messages) - msg, err := l.Get(klevdb.OffsetOldest) + msg, err := l.Get(OffsetOldest) require.NoError(t, err) require.Equal(t, int64(0), msg.Offset) t.Run("None", func(t *testing.T) { - off, sz, err := ByCount(context.TODO(), l, 21) + off, sz, err := TrimByCount(context.TODO(), l, 21) require.Len(t, off, 0) require.NoError(t, err) require.Equal(t, int64(0), sz) @@ -38,13 +37,13 @@ func TestByCount(t *testing.T) { require.NoError(t, err) require.Equal(t, len(msgs), stat.Messages) - msg, err = l.Get(klevdb.OffsetOldest) + msg, err = l.Get(OffsetOldest) require.NoError(t, err) require.Equal(t, int64(0), msg.Offset) }) t.Run("Half", func(t *testing.T) { - off, sz, err := ByCount(context.TODO(), l, 10) + off, sz, err := TrimByCount(context.TODO(), l, 10) require.Len(t, off, 10) require.NoError(t, err) require.Equal(t, l.Size(msgs[0])*10, sz) @@ -53,7 +52,7 @@ func TestByCount(t *testing.T) { require.NoError(t, err) require.Equal(t, 10, stat.Messages) - msg, err = l.Get(klevdb.OffsetOldest) + msg, err = l.Get(OffsetOldest) require.NoError(t, err) require.Equal(t, int64(10), msg.Offset) }) diff --git a/trim/offset.go b/trim_offset.go similarity index 62% rename from trim/offset.go rename to trim_offset.go index e35b8ec..255a792 100644 --- a/trim/offset.go +++ b/trim_offset.go @@ -1,15 +1,14 @@ -package trim +package klevdb import ( "context" - "github.com/klev-dev/klevdb" "github.com/klev-dev/klevdb/pkg/message" ) // FindByOffset returns a set of offsets for messages whose // offset is before a given offset -func FindByOffset(ctx context.Context, l klevdb.Log, before int64) (map[int64]struct{}, error) { +func FindByOffset(ctx context.Context, l Log, before int64) (map[int64]struct{}, error) { if before == message.OffsetOldest { return map[int64]struct{}{}, nil } @@ -25,7 +24,7 @@ func FindByOffset(ctx context.Context, l klevdb.Log, before int64) (map[int64]st } var offsets = map[int64]struct{}{} - for offset := klevdb.OffsetOldest; offset < maxOffset; { + for offset := OffsetOldest; offset < maxOffset; { nextOffset, msgs, err := l.Consume(offset, 32) if err != nil { return nil, err @@ -51,10 +50,10 @@ func FindByOffset(ctx context.Context, l klevdb.Log, before int64) (map[int64]st return offsets, nil } -// ByOffset tries to remove the messages at the start of the log before offset +// TrimByOffset tries to remove the messages at the start of the log before offset // // returns the offsets it deleted and the amount of storage freed -func ByOffset(ctx context.Context, l klevdb.Log, before int64) (map[int64]struct{}, int64, error) { +func TrimByOffset(ctx context.Context, l Log, before int64) (map[int64]struct{}, int64, error) { offsets, err := FindByOffset(ctx, l, before) if err != nil { return nil, 0, err @@ -62,11 +61,11 @@ func ByOffset(ctx context.Context, l klevdb.Log, before int64) (map[int64]struct return l.Delete(offsets) } -// ByOffsetMulti is similar to ByOffset, but will try to remove messages from multiple segments -func ByOffsetMulti(ctx context.Context, l klevdb.Log, before int64, backoff klevdb.DeleteMultiBackoff) (map[int64]struct{}, int64, error) { +// TrimByOffsetMulti is similar to ByOffset, but will try to remove messages from multiple segments +func TrimByOffsetMulti(ctx context.Context, l Log, before int64, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) { offsets, err := FindByOffset(ctx, l, before) if err != nil { return nil, 0, err } - return klevdb.DeleteMulti(ctx, l, offsets, backoff) + return DeleteMulti(ctx, l, offsets, backoff) } diff --git a/trim/offset_test.go b/trim_offset_test.go similarity index 71% rename from trim/offset_test.go rename to trim_offset_test.go index 31d5c88..c8a5cd6 100644 --- a/trim/offset_test.go +++ b/trim_offset_test.go @@ -1,4 +1,4 @@ -package trim +package klevdb import ( "context" @@ -6,14 +6,13 @@ import ( "github.com/stretchr/testify/require" - "github.com/klev-dev/klevdb" "github.com/klev-dev/klevdb/pkg/message" ) func TestByOffset(t *testing.T) { msgs := message.Gen(20) - l, err := klevdb.Open(t.TempDir(), klevdb.Options{}) + l, err := Open(t.TempDir(), Options{}) require.NoError(t, err) defer l.Close() @@ -24,12 +23,12 @@ func TestByOffset(t *testing.T) { require.NoError(t, err) require.Equal(t, len(msgs), stat.Messages) - msg, err := l.Get(klevdb.OffsetOldest) + msg, err := l.Get(OffsetOldest) require.NoError(t, err) require.Equal(t, int64(0), msg.Offset) t.Run("None", func(t *testing.T) { - off, sz, err := ByOffset(context.TODO(), l, 0) + off, sz, err := TrimByOffset(context.TODO(), l, 0) require.Len(t, off, 0) require.NoError(t, err) require.Equal(t, int64(0), sz) @@ -38,13 +37,13 @@ func TestByOffset(t *testing.T) { require.NoError(t, err) require.Equal(t, len(msgs), stat.Messages) - msg, err = l.Get(klevdb.OffsetOldest) + msg, err = l.Get(OffsetOldest) require.NoError(t, err) require.Equal(t, int64(0), msg.Offset) }) t.Run("Half", func(t *testing.T) { - off, sz, err := ByOffset(context.TODO(), l, 10) + off, sz, err := TrimByOffset(context.TODO(), l, 10) require.Len(t, off, 10) require.NoError(t, err) require.Equal(t, l.Size(msgs[0])*10, sz) @@ -53,13 +52,13 @@ func TestByOffset(t *testing.T) { require.NoError(t, err) require.Equal(t, 10, stat.Messages) - msg, err = l.Get(klevdb.OffsetOldest) + msg, err = l.Get(OffsetOldest) require.NoError(t, err) require.Equal(t, int64(10), msg.Offset) }) t.Run("All", func(t *testing.T) { - off, sz, err := ByOffset(context.TODO(), l, 100) + off, sz, err := TrimByOffset(context.TODO(), l, 100) require.Len(t, off, 10) require.NoError(t, err) require.Equal(t, l.Size(msgs[0])*10, sz) @@ -68,15 +67,15 @@ func TestByOffset(t *testing.T) { require.NoError(t, err) require.Equal(t, 0, stat.Messages) - msg, err = l.Get(klevdb.OffsetOldest) - require.ErrorIs(t, err, klevdb.ErrInvalidOffset) + msg, err = l.Get(OffsetOldest) + require.ErrorIs(t, err, ErrInvalidOffset) }) } func TestByOffsetRelative(t *testing.T) { msgs := message.Gen(20) - l, err := klevdb.Open(t.TempDir(), klevdb.Options{}) + l, err := Open(t.TempDir(), Options{}) require.NoError(t, err) defer l.Close() @@ -87,12 +86,12 @@ func TestByOffsetRelative(t *testing.T) { require.NoError(t, err) require.Equal(t, len(msgs), stat.Messages) - msg, err := l.Get(klevdb.OffsetOldest) + msg, err := l.Get(OffsetOldest) require.NoError(t, err) require.Equal(t, int64(0), msg.Offset) t.Run("Oldest", func(t *testing.T) { - off, sz, err := ByOffset(context.TODO(), l, klevdb.OffsetOldest) + off, sz, err := TrimByOffset(context.TODO(), l, OffsetOldest) require.Len(t, off, 0) require.NoError(t, err) require.Equal(t, int64(0), sz) @@ -101,13 +100,13 @@ func TestByOffsetRelative(t *testing.T) { require.NoError(t, err) require.Equal(t, len(msgs), stat.Messages) - msg, err = l.Get(klevdb.OffsetOldest) + msg, err = l.Get(OffsetOldest) require.NoError(t, err) require.Equal(t, int64(0), msg.Offset) }) t.Run("Newest", func(t *testing.T) { - off, sz, err := ByOffset(context.TODO(), l, klevdb.OffsetNewest) + off, sz, err := TrimByOffset(context.TODO(), l, OffsetNewest) require.Len(t, off, 20) require.NoError(t, err) require.Equal(t, l.Size(msgs[0])*20, sz) @@ -116,7 +115,7 @@ func TestByOffsetRelative(t *testing.T) { require.NoError(t, err) require.Equal(t, 0, stat.Messages) - msg, err = l.Get(klevdb.OffsetOldest) - require.ErrorIs(t, err, klevdb.ErrInvalidOffset) + msg, err = l.Get(OffsetOldest) + require.ErrorIs(t, err, ErrInvalidOffset) }) } diff --git a/trim/size.go b/trim_size.go similarity index 55% rename from trim/size.go rename to trim_size.go index 8b70f88..24b41f6 100644 --- a/trim/size.go +++ b/trim_size.go @@ -1,14 +1,11 @@ -package trim +package klevdb import ( "context" - - "github.com/klev-dev/klevdb" ) -// FindBySize returns a set of offsets for messages that -// if deleted will decrease the log size to sz -func FindBySize(ctx context.Context, l klevdb.Log, sz int64) (map[int64]struct{}, error) { +// FindBySize returns a set of offsets for messages that if deleted will decrease the log size to sz +func FindBySize(ctx context.Context, l Log, sz int64) (map[int64]struct{}, error) { stats, err := l.Stat() switch { case err != nil: @@ -25,7 +22,7 @@ func FindBySize(ctx context.Context, l klevdb.Log, sz int64) (map[int64]struct{} var offsets = map[int64]struct{}{} total := stats.Size - for offset := klevdb.OffsetOldest; offset < maxOffset && total >= sz; { + for offset := OffsetOldest; offset < maxOffset && total >= sz; { nextOffset, msgs, err := l.Consume(offset, 32) if err != nil { return nil, err @@ -53,10 +50,10 @@ func FindBySize(ctx context.Context, l klevdb.Log, sz int64) (map[int64]struct{} return offsets, nil } -// BySize tries to remove messages until log size is less than sz +// TrimBySize tries to remove messages until log size is less than sz // // returns the offsets it deleted and the amount of storage freed -func BySize(ctx context.Context, l klevdb.Log, sz int64) (map[int64]struct{}, int64, error) { +func TrimBySize(ctx context.Context, l Log, sz int64) (map[int64]struct{}, int64, error) { offsets, err := FindBySize(ctx, l, sz) if err != nil { return nil, 0, err @@ -64,11 +61,11 @@ func BySize(ctx context.Context, l klevdb.Log, sz int64) (map[int64]struct{}, in return l.Delete(offsets) } -// BySizeMulti is similar to BySize, but will try to remove messages from multiple segments -func BySizeMulti(ctx context.Context, l klevdb.Log, sz int64, backoff klevdb.DeleteMultiBackoff) (map[int64]struct{}, int64, error) { +// TrimBySizeMulti is similar to BySize, but will try to remove messages from multiple segments +func TrimBySizeMulti(ctx context.Context, l Log, sz int64, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) { offsets, err := FindBySize(ctx, l, sz) if err != nil { return nil, 0, err } - return klevdb.DeleteMulti(ctx, l, offsets, backoff) + return DeleteMulti(ctx, l, offsets, backoff) } diff --git a/trim/size_test.go b/trim_size_test.go similarity index 76% rename from trim/size_test.go rename to trim_size_test.go index b7959a3..ce0734b 100644 --- a/trim/size_test.go +++ b/trim_size_test.go @@ -1,4 +1,4 @@ -package trim +package klevdb import ( "context" @@ -6,15 +6,14 @@ import ( "github.com/stretchr/testify/require" - "github.com/klev-dev/klevdb" "github.com/klev-dev/klevdb/pkg/index" "github.com/klev-dev/klevdb/pkg/message" ) -func TestBySize(t *testing.T) { +func TestTrimBySize(t *testing.T) { msgs := message.Gen(20) - l, err := klevdb.Open(t.TempDir(), klevdb.Options{}) + l, err := Open(t.TempDir(), Options{}) require.NoError(t, err) defer l.Close() @@ -25,12 +24,12 @@ func TestBySize(t *testing.T) { require.NoError(t, err) require.Equal(t, l.Size(msgs[0])*20+message.HeaderSize+index.HeaderSize, stat.Size) - msg, err := l.Get(klevdb.OffsetOldest) + msg, err := l.Get(OffsetOldest) require.NoError(t, err) require.Equal(t, int64(0), msg.Offset) t.Run("None", func(t *testing.T) { - off, sz, err := BySize(context.TODO(), l, l.Size(msgs[0])*21) + off, sz, err := TrimBySize(context.TODO(), l, l.Size(msgs[0])*21) require.Len(t, off, 0) require.NoError(t, err) require.Equal(t, int64(0), sz) @@ -39,14 +38,14 @@ func TestBySize(t *testing.T) { require.NoError(t, err) require.Equal(t, l.Size(msgs[0])*20+message.HeaderSize+index.HeaderSize, stat.Size) - msg, err = l.Get(klevdb.OffsetOldest) + msg, err = l.Get(OffsetOldest) require.NoError(t, err) require.Equal(t, int64(0), msg.Offset) }) t.Run("Half", func(t *testing.T) { toTrimSize := l.Size(msgs[0]) * 11 - off, sz, err := BySize(context.TODO(), l, toTrimSize) + off, sz, err := TrimBySize(context.TODO(), l, toTrimSize) require.Len(t, off, 10) require.NoError(t, err) require.Equal(t, l.Size(msgs[0])*10, sz) @@ -55,7 +54,7 @@ func TestBySize(t *testing.T) { require.NoError(t, err) require.Equal(t, l.Size(msgs[0])*10+message.HeaderSize+index.HeaderSize, stat.Size) - msg, err = l.Get(klevdb.OffsetOldest) + msg, err = l.Get(OffsetOldest) require.NoError(t, err) require.Equal(t, int64(10), msg.Offset) }) From 73934a95110078ab798f2d4786a6ee8b9380f6a3 Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Mon, 20 Apr 2026 20:05:12 -0400 Subject: [PATCH 3/4] move compact to main package --- compact/deletes.go => compact_deletes.go | 18 ++++----- ...deletes_test.go => compact_deletes_test.go | 15 ++++--- compact/updates.go => compact_updates.go | 14 +++---- ...updates_test.go => compact_updates_test.go | 39 +++++++++---------- 4 files changed, 40 insertions(+), 46 deletions(-) rename compact/deletes.go => compact_deletes.go (72%) rename compact/deletes_test.go => compact_deletes_test.go (71%) rename compact/updates.go => compact_updates.go (77%) rename compact/updates_test.go => compact_updates_test.go (72%) diff --git a/compact/deletes.go b/compact_deletes.go similarity index 72% rename from compact/deletes.go rename to compact_deletes.go index 7d7e734..0cee06b 100644 --- a/compact/deletes.go +++ b/compact_deletes.go @@ -1,12 +1,10 @@ -package compact +package klevdb import ( "context" "time" art "github.com/plar/go-adaptive-radix-tree/v2" - - "github.com/klev-dev/klevdb" ) // FindDeletes returns a set of offsets for messages with @@ -14,7 +12,7 @@ import ( // // Messages that have a nil value are considered deletes // for this key, and therefore eligible for deletion. -func FindDeletes(ctx context.Context, l klevdb.Log, before time.Time) (map[int64]struct{}, error) { +func FindDeletes(ctx context.Context, l Log, before time.Time) (map[int64]struct{}, error) { maxOffset, err := l.NextOffset() if err != nil { return nil, err @@ -24,7 +22,7 @@ func FindDeletes(ctx context.Context, l klevdb.Log, before time.Time) (map[int64 var offsets = map[int64]struct{}{} SEARCH: - for offset := klevdb.OffsetOldest; offset < maxOffset; { + for offset := OffsetOldest; offset < maxOffset; { nextOffset, msgs, err := l.Consume(offset, 32) if err != nil { return nil, err @@ -62,14 +60,14 @@ SEARCH: return offsets, nil } -// Deletes tries to remove messages with nil value before given time. +// CompactDeletes tries to remove messages with nil value before given time. // It will not remove messages for keys it sees before that offset. // // This is similar to removing keys, which were deleted (e.g. value set to nil) // and are therefore no longer relevant/active. // // returns the offsets it deleted and the amount of storage freed -func Deletes(ctx context.Context, l klevdb.Log, before time.Time) (map[int64]struct{}, int64, error) { +func CompactDeletes(ctx context.Context, l Log, before time.Time) (map[int64]struct{}, int64, error) { offsets, err := FindDeletes(ctx, l, before) if err != nil { return nil, 0, err @@ -77,11 +75,11 @@ func Deletes(ctx context.Context, l klevdb.Log, before time.Time) (map[int64]str return l.Delete(offsets) } -// DeletesMulti is similar to Deletes, but will try to remove messages from multiple segments -func DeletesMulti(ctx context.Context, l klevdb.Log, before time.Time, backoff klevdb.DeleteMultiBackoff) (map[int64]struct{}, int64, error) { +// CompactDeletesMulti is similar to Deletes, but will try to remove messages from multiple segments +func CompactDeletesMulti(ctx context.Context, l Log, before time.Time, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) { offsets, err := FindDeletes(ctx, l, before) if err != nil { return nil, 0, err } - return klevdb.DeleteMulti(ctx, l, offsets, backoff) + return DeleteMulti(ctx, l, offsets, backoff) } diff --git a/compact/deletes_test.go b/compact_deletes_test.go similarity index 71% rename from compact/deletes_test.go rename to compact_deletes_test.go index 893f0ee..f88b69b 100644 --- a/compact/deletes_test.go +++ b/compact_deletes_test.go @@ -1,4 +1,4 @@ -package compact +package klevdb import ( "context" @@ -8,7 +8,6 @@ import ( "github.com/stretchr/testify/require" - "github.com/klev-dev/klevdb" "github.com/klev-dev/klevdb/pkg/message" ) @@ -16,32 +15,32 @@ func TestDeletes(t *testing.T) { msgs := message.Gen(5) t.Run("Empty", func(t *testing.T) { - l, err := klevdb.Open(t.TempDir(), klevdb.Options{KeyIndex: true}) + l, err := Open(t.TempDir(), Options{KeyIndex: true}) require.NoError(t, err) defer l.Close() - off, cmp, err := Deletes(context.TODO(), l, time.Now()) + off, cmp, err := CompactDeletes(context.TODO(), l, time.Now()) require.NoError(t, err) require.Empty(t, off) require.Equal(t, int64(0), cmp) }) t.Run("None", func(t *testing.T) { - l, err := klevdb.Open(t.TempDir(), klevdb.Options{KeyIndex: true}) + l, err := Open(t.TempDir(), Options{KeyIndex: true}) require.NoError(t, err) defer l.Close() _, err = l.Publish(msgs) require.NoError(t, err) - off, cmp, err := Deletes(context.TODO(), l, time.Now()) + off, cmp, err := CompactDeletes(context.TODO(), l, time.Now()) require.NoError(t, err) require.Empty(t, off) require.Equal(t, int64(0), cmp) }) t.Run("Dups", func(t *testing.T) { - l, err := klevdb.Open(t.TempDir(), klevdb.Options{KeyIndex: true}) + l, err := Open(t.TempDir(), Options{KeyIndex: true}) require.NoError(t, err) defer l.Close() @@ -56,7 +55,7 @@ func TestDeletes(t *testing.T) { _, err = l.Publish(msgs) require.NoError(t, err) - off, cmp, err := Deletes(context.TODO(), l, time.Now()) + off, cmp, err := CompactDeletes(context.TODO(), l, time.Now()) require.NoError(t, err) require.Len(t, off, 5) for i := range nmsgs { diff --git a/compact/updates.go b/compact_updates.go similarity index 77% rename from compact/updates.go rename to compact_updates.go index bfda161..51c315e 100644 --- a/compact/updates.go +++ b/compact_updates.go @@ -1,12 +1,10 @@ -package compact +package klevdb import ( "context" "time" art "github.com/plar/go-adaptive-radix-tree/v2" - - "github.com/klev-dev/klevdb" ) // FindUpdates returns a set of offsets for messages that have @@ -14,7 +12,7 @@ import ( // // Messages before the last one for a given key are considered updates // that are no longer relevant, and therefore are eligible for deletion. -func FindUpdates(ctx context.Context, l klevdb.Log, before time.Time) (map[int64]struct{}, error) { +func FindUpdates(ctx context.Context, l Log, before time.Time) (map[int64]struct{}, error) { maxOffset, err := l.NextOffset() if err != nil { return nil, err @@ -24,7 +22,7 @@ func FindUpdates(ctx context.Context, l klevdb.Log, before time.Time) (map[int64 var offsets = map[int64]struct{}{} SEARCH: - for offset := klevdb.OffsetOldest; offset < maxOffset; { + for offset := OffsetOldest; offset < maxOffset; { nextOffset, msgs, err := l.Consume(offset, 32) if err != nil { return nil, err @@ -60,7 +58,7 @@ SEARCH: // leaving only the current value (last update) for a key. // // returns the offsets it deleted and the amount of storage freed -func Updates(ctx context.Context, l klevdb.Log, before time.Time) (map[int64]struct{}, int64, error) { +func CompactUpdates(ctx context.Context, l Log, before time.Time) (map[int64]struct{}, int64, error) { offsets, err := FindUpdates(ctx, l, before) if err != nil { return nil, 0, err @@ -69,10 +67,10 @@ func Updates(ctx context.Context, l klevdb.Log, before time.Time) (map[int64]str } // UpdatesMulti is similar to Updates, but will try to remove messages from multiple segments -func UpdatesMulti(ctx context.Context, l klevdb.Log, before time.Time, backoff klevdb.DeleteMultiBackoff) (map[int64]struct{}, int64, error) { +func CompactUpdatesMulti(ctx context.Context, l Log, before time.Time, backoff DeleteMultiBackoff) (map[int64]struct{}, int64, error) { offsets, err := FindUpdates(ctx, l, before) if err != nil { return nil, 0, err } - return klevdb.DeleteMulti(ctx, l, offsets, backoff) + return DeleteMulti(ctx, l, offsets, backoff) } diff --git a/compact/updates_test.go b/compact_updates_test.go similarity index 72% rename from compact/updates_test.go rename to compact_updates_test.go index 9d437e7..4b744f9 100644 --- a/compact/updates_test.go +++ b/compact_updates_test.go @@ -1,4 +1,4 @@ -package compact +package klevdb import ( "context" @@ -8,7 +8,6 @@ import ( "github.com/stretchr/testify/require" - "github.com/klev-dev/klevdb" "github.com/klev-dev/klevdb/pkg/message" ) @@ -16,25 +15,25 @@ func TestUpdates(t *testing.T) { msgs := message.Gen(5) t.Run("Empty", func(t *testing.T) { - l, err := klevdb.Open(t.TempDir(), klevdb.Options{KeyIndex: true}) + l, err := Open(t.TempDir(), Options{KeyIndex: true}) require.NoError(t, err) defer l.Close() - off, cmp, err := Updates(context.TODO(), l, time.Now()) + off, cmp, err := CompactUpdates(context.TODO(), l, time.Now()) require.NoError(t, err) require.Empty(t, off) require.Equal(t, int64(0), cmp) }) t.Run("None", func(t *testing.T) { - l, err := klevdb.Open(t.TempDir(), klevdb.Options{KeyIndex: true}) + l, err := Open(t.TempDir(), Options{KeyIndex: true}) require.NoError(t, err) defer l.Close() _, err = l.Publish(msgs) require.NoError(t, err) - off, cmp, err := Updates(context.TODO(), l, time.Now()) + off, cmp, err := CompactUpdates(context.TODO(), l, time.Now()) require.NoError(t, err) require.Empty(t, off) require.Equal(t, int64(0), cmp) @@ -45,19 +44,19 @@ func TestUpdates(t *testing.T) { }) t.Run("First", func(t *testing.T) { - l, err := klevdb.Open(t.TempDir(), klevdb.Options{KeyIndex: true}) + l, err := Open(t.TempDir(), Options{KeyIndex: true}) require.NoError(t, err) defer l.Close() _, err = l.Publish(msgs) require.NoError(t, err) - dmsgs := []klevdb.Message{msgs[0]} + dmsgs := []Message{msgs[0]} dmsgs[0].Value = []byte("abc") _, err = l.Publish(dmsgs) require.NoError(t, err) - off, cmp, err := Updates(context.TODO(), l, time.Now()) + off, cmp, err := CompactUpdates(context.TODO(), l, time.Now()) require.NoError(t, err) require.Len(t, off, 1) require.Contains(t, off, int64(0)) @@ -69,19 +68,19 @@ func TestUpdates(t *testing.T) { }) t.Run("Last", func(t *testing.T) { - l, err := klevdb.Open(t.TempDir(), klevdb.Options{KeyIndex: true}) + l, err := Open(t.TempDir(), Options{KeyIndex: true}) require.NoError(t, err) defer l.Close() _, err = l.Publish(msgs) require.NoError(t, err) - dmsgs := []klevdb.Message{msgs[4]} + dmsgs := []Message{msgs[4]} dmsgs[0].Value = []byte("abc") _, err = l.Publish(dmsgs) require.NoError(t, err) - off, cmp, err := Updates(context.TODO(), l, time.Now()) + off, cmp, err := CompactUpdates(context.TODO(), l, time.Now()) require.NoError(t, err) require.Len(t, off, 1) require.Contains(t, off, int64(4)) @@ -93,7 +92,7 @@ func TestUpdates(t *testing.T) { }) t.Run("Multi", func(t *testing.T) { - l, err := klevdb.Open(t.TempDir(), klevdb.Options{KeyIndex: true}) + l, err := Open(t.TempDir(), Options{KeyIndex: true}) require.NoError(t, err) defer l.Close() @@ -103,7 +102,7 @@ func TestUpdates(t *testing.T) { _, err = l.Publish(msgs) require.NoError(t, err) - off, cmp, err := Updates(context.TODO(), l, time.Now()) + off, cmp, err := CompactUpdates(context.TODO(), l, time.Now()) require.NoError(t, err) require.Len(t, off, len(msgs)) for i := range msgs { @@ -117,7 +116,7 @@ func TestUpdates(t *testing.T) { }) t.Run("Time", func(t *testing.T) { - l, err := klevdb.Open(t.TempDir(), klevdb.Options{KeyIndex: true}) + l, err := Open(t.TempDir(), Options{KeyIndex: true}) require.NoError(t, err) defer l.Close() @@ -131,7 +130,7 @@ func TestUpdates(t *testing.T) { _, err = l.Publish(nmsgs) require.NoError(t, err) - off, cmp, err := Updates(context.TODO(), l, nmsgs[2].Time) + off, cmp, err := CompactUpdates(context.TODO(), l, nmsgs[2].Time) require.NoError(t, err) require.Len(t, off, 3) for i := range 3 { @@ -145,21 +144,21 @@ func TestUpdates(t *testing.T) { }) t.Run("NilKey", func(t *testing.T) { - l, err := klevdb.Open(t.TempDir(), klevdb.Options{KeyIndex: true}) + l, err := Open(t.TempDir(), Options{KeyIndex: true}) require.NoError(t, err) defer l.Close() - _, err = l.Publish([]klevdb.Message{ + _, err = l.Publish([]Message{ {Key: []byte("x")}, {}, {}, }) require.NoError(t, err) - off, cmp, err := Updates(context.TODO(), l, time.Now()) + off, cmp, err := CompactUpdates(context.TODO(), l, time.Now()) require.NoError(t, err) require.Len(t, off, 1) require.Contains(t, off, int64(1)) - require.Equal(t, l.Size(klevdb.Message{}), cmp) + require.Equal(t, l.Size(Message{}), cmp) }) } From 772684d99e14c4f1b5fbdbf714b4b52b266002c7 Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Mon, 20 Apr 2026 20:38:16 -0400 Subject: [PATCH 4/4] log renaming --- blocking.go => log_blocking.go | 0 reader.go => log_reader.go | 0 writer.go => log_writer.go | 0 3 files changed, 0 insertions(+), 0 deletions(-) rename blocking.go => log_blocking.go (100%) rename reader.go => log_reader.go (100%) rename writer.go => log_writer.go (100%) diff --git a/blocking.go b/log_blocking.go similarity index 100% rename from blocking.go rename to log_blocking.go diff --git a/reader.go b/log_reader.go similarity index 100% rename from reader.go rename to log_reader.go diff --git a/writer.go b/log_writer.go similarity index 100% rename from writer.go rename to log_writer.go