diff --git a/pkg/local_object_storage/engine/bench_test.go b/pkg/local_object_storage/engine/bench_test.go
new file mode 100644
index 0000000000..3d5077fa63
--- /dev/null
+++ b/pkg/local_object_storage/engine/bench_test.go
@@ -0,0 +1,394 @@
+package engine
+
+import (
+	"errors"
+	"fmt"
+	"os"
+	"os/exec"
+	"sync"
+	"testing"
+	"time"
+
+	objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
+	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
+	cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
+	cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
+	"github.com/nspcc-dev/neofs-sdk-go/object"
+	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
+	oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
+)
+
+const (
+	benchmarkObjectsPerShard    = 1000
+	benchmarkRawMatchesPerShard = 16
+	benchmarkExistsMissCount    = 256
+)
+
+var benchmarkShardCounts = []int{1, 2, 4, 8, 16, 32}
+
+type benchmarkEngineFixture struct {
+	engine     *StorageEngine
+	shards     []shardWrapper
+	shardCount int
+}
+
+type searchBenchmarkScenario struct {
+	container  cid.ID
+	hitFS      []objectcore.SearchFilter
+	hitCursor  *objectcore.SearchCursor
+	hitAttrs   []string
+	missFS     []objectcore.SearchFilter
+	missCursor *objectcore.SearchCursor
+	missAttrs  []string
+}
+
+type collectRawBenchmarkScenario struct {
+	container cid.ID
+	attr      string
+	hitValue  []byte
+	missValue []byte
+}
+
+type existsBenchmarkScenario struct {
+	hitFirst oid.Address
+	hitLast  oid.Address
+	misses   []oid.Address
+}
+
+func BenchmarkSearch(b *testing.B) {
+	forEachBenchmarkFixture(b, func(b *testing.B, _ int, fx *benchmarkEngineFixture) {
+		sc := prepareSearchBenchmarkScenario(b, fx)
+		b.Run("hit", func(b *testing.B) {
+			benchmarkSearch(b, fx, sc.container, sc.hitFS, sc.hitAttrs, sc.hitCursor)
+		})
+		b.Run("miss", func(b *testing.B) {
+			benchmarkSearch(b, fx, sc.container, sc.missFS, sc.missAttrs, sc.missCursor)
+		})
+	})
+}
+
+func BenchmarkListWithCursor(b *testing.B) {
+	forEachBenchmarkFixture(b, func(b *testing.B, _ int, fx *benchmarkEngineFixture) {
+		prepareListBenchmarkScenario(b, fx)
+		for _, batchSize := range []uint32{1, 10, 100} {
+			b.Run(fmt.Sprintf("batch=%d", batchSize), func(b *testing.B) {
+				benchmarkListWithCursor(b, fx, batchSize)
+			})
+		}
+	})
+}
+
+func BenchmarkCollectRawWithAttribute(b *testing.B) {
+	forEachBenchmarkFixture(b, func(b *testing.B, shardCount int, fx *benchmarkEngineFixture) {
+		sc := prepareCollectRawBenchmarkScenario(b, fx)
+		b.Run("hit", func(b *testing.B) {
+			b.ReportAllocs()
+			for b.Loop() {
+				dropCaches(b)
+				ids, err := fx.engine.collectRawWithAttribute(sc.container, sc.attr, sc.hitValue)
+				if err != nil {
+					b.Fatal(err)
+				}
+				if len(ids) != shardCount*benchmarkRawMatchesPerShard {
+					b.Fatalf("unexpected hit result len %d", len(ids))
+				}
+			}
+		})
+		b.Run("miss", func(b *testing.B) {
+			b.ReportAllocs()
+			for b.Loop() {
+				dropCaches(b)
+				ids, err := fx.engine.collectRawWithAttribute(sc.container, sc.attr, sc.missValue)
+				if err != nil {
+					b.Fatal(err)
+				}
+				if len(ids) != 0 {
+					b.Fatalf("unexpected miss result len %d", len(ids))
+				}
+			}
+		})
+	})
}

func BenchmarkExistsPhysical(b *testing.B) {
+	forEachBenchmarkFixture(b, func(b *testing.B, _ int, fx *benchmarkEngineFixture) {
+		sc := prepareExistsBenchmarkScenario(b, fx)
+		b.Run("hit-first", func(b *testing.B) {
+			benchmarkExists(b, fx, []oid.Address{sc.hitFirst}, true)
+		})
+		b.Run("hit-last", func(b *testing.B) {
+			benchmarkExists(b, fx, []oid.Address{sc.hitLast}, true)
+		})
+		b.Run("miss-rotating", func(b *testing.B) {
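+			// Rotate through pre-generated random addresses so every iteration misses on a different key.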
+			benchmarkExists(b, fx, sc.misses, false)
+		})
+	})
+}
+
+func benchmarkSearch(b *testing.B, fx *benchmarkEngineFixture, container cid.ID, fs []objectcore.SearchFilter, attrs []string, cursor *objectcore.SearchCursor) {
+	b.ReportAllocs()
+	for b.Loop() {
+		dropCaches(b)
+		_, nextCursor, err := fx.engine.Search(container, fs, attrs, cursor, 1000)
+		if err != nil {
+			b.Fatal(err)
+		}
+		if len(nextCursor) != 0 {
+			b.Fatalf("unexpected cursor len %d", len(nextCursor))
+		}
+	}
+}
+
+func benchmarkListWithCursor(b *testing.B, fx *benchmarkEngineFixture, batchSize uint32) {
+	b.ReportAllocs()
+	for b.Loop() {
+		dropCaches(b)
+		var cursor *Cursor
+		var err error
+		for {
+			_, cursor, err = fx.engine.ListWithCursor(batchSize, cursor)
+			if errors.Is(err, ErrEndOfListing) {
+				break
+			}
+			if err != nil {
+				b.Fatal(err)
+			}
+		}
+	}
+}
+
+func benchmarkExists(b *testing.B, fx *benchmarkEngineFixture, addrs []oid.Address, expected bool) {
+	var i int
+	b.ReportAllocs()
+	for b.Loop() {
+		ok, err := fx.engine.existsPhysical(addrs[i%len(addrs)])
+		if err != nil {
+			b.Fatal(err)
+		}
+		if ok != expected {
+			b.Fatalf("unexpected exists=%t expected=%t", ok, expected)
+		}
+		i++
+	}
+}
+
+func newBenchmarkEngineFixture(tb testing.TB, shardCount int) *benchmarkEngineFixture {
+	engine := benchmarkNewEngineWithShardNum(tb, shardCount)
+	return &benchmarkEngineFixture{
+		engine:     engine,
+		shards:     engine.unsortedShards(),
+		shardCount: shardCount,
+	}
+}
+
+func forEachBenchmarkFixture(b *testing.B, bench func(*testing.B, int, *benchmarkEngineFixture)) {
+	for _, shardCount := range benchmarkShardCounts {
+		b.Run(fmt.Sprintf("shards=%d", shardCount), func(b *testing.B) {
+			fx := newBenchmarkEngineFixture(b, shardCount)
+			b.Cleanup(func() {
+				_ = fx.engine.Close()
+			})
+			bench(b, shardCount, fx)
+		})
+	}
+}
+
+func prepareSearchBenchmarkScenario(tb testing.TB, fx *benchmarkEngineFixture) searchBenchmarkScenario {
+	started := time.Now()
+	defer func() {
+		tb.Logf("search setup wall=%s shards=%d objects_per_shard=%d", time.Since(started), fx.shardCount, benchmarkObjectsPerShard)
+	}()
+
+	const searchAttr = "bench_search_key"
+	const hitValue = "value-hit"
+	const missValue = "value-miss"
+
+	sc := searchBenchmarkScenario{
+		container: cidtest.ID(),
+	}
+
+	var wg sync.WaitGroup
+	errCh := make(chan error, len(fx.shards))
+
+	for shardIdx, sh := range fx.shards {
+		wg.Add(1)
+		go func(shardIdx int, sh shardWrapper) {
+			defer wg.Done()
+			for objIdx := range benchmarkObjectsPerShard {
+				obj := generateObjectWithCID(sc.container)
+				obj.SetAttributes(
+					object.NewAttribute(searchAttr, fmt.Sprintf("value-%02d-%03d", shardIdx, objIdx)),
+				)
+
+				if shardIdx == 0 && objIdx == 0 {
+					obj.SetAttributes(
+						object.NewAttribute(searchAttr, hitValue),
+					)
+				}
+
+				if err := sh.Put(obj, nil); err != nil {
+					errCh <- err
+					return
+				}
+			}
+		}(shardIdx, sh)
+	}
+
+	wg.Wait()
+	close(errCh)
+	for err := range errCh {
+		if err != nil {
+			tb.Fatal(err)
+		}
+	}
+
+	sc.hitFS, sc.hitAttrs, sc.hitCursor = benchmarkSearchQuery(tb, searchAttr, hitValue)
+	sc.missFS, sc.missAttrs, sc.missCursor = benchmarkSearchQuery(tb, searchAttr, missValue)
+	return sc
+}
+
+func prepareListBenchmarkScenario(tb testing.TB, fx *benchmarkEngineFixture) {
+	started := time.Now()
+	defer func() {
+		tb.Logf("list setup wall=%s shards=%d objects_per_shard=%d", time.Since(started), fx.shardCount, benchmarkObjectsPerShard)
+	}()
+
+	var wg sync.WaitGroup
+	errCh := make(chan error, len(fx.shards))
+
+	for _, sh := range fx.shards {
+		wg.Add(1)
+		go func(sh shardWrapper) {
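+			// Populate shards concurrently; errCh has one buffered slot per shard, so a failing goroutine never blocks.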
+			defer wg.Done()
+			for range benchmarkObjectsPerShard {
+				obj := generateObjectWithCID(cidtest.ID())
+				if err := sh.Put(obj, nil); err != nil {
+					errCh <- err
+					return
+				}
+			}
+		}(sh)
+	}
+
+	wg.Wait()
+	close(errCh)
+	for err := range errCh {
+		if err != nil {
+			tb.Fatal(err)
+		}
+	}
+}
+
+func prepareCollectRawBenchmarkScenario(tb testing.TB, fx *benchmarkEngineFixture) collectRawBenchmarkScenario {
+	started := time.Now()
+	defer func() {
+		tb.Logf("collectRaw setup wall=%s shards=%d matches_per_shard=%d", time.Since(started), fx.shardCount, benchmarkRawMatchesPerShard)
+	}()
+
+	sc := collectRawBenchmarkScenario{
+		container: cidtest.ID(),
+		attr:      "bench_raw_group",
+		hitValue:  []byte("target"),
+		missValue: []byte("missing"),
+	}
+
+	var wg sync.WaitGroup
+	errCh := make(chan error, len(fx.shards))
+
+	for _, sh := range fx.shards {
+		wg.Add(1)
+		go func(sh shardWrapper) {
+			defer wg.Done()
+			for range benchmarkRawMatchesPerShard {
+				obj := generateObjectWithCID(sc.container)
+				obj.SetAttributes(
+					object.NewAttribute(sc.attr, string(sc.hitValue)),
+				)
+
+				if err := sh.Put(obj, nil); err != nil {
+					errCh <- err
+					return
+				}
+			}
+		}(sh)
+	}
+
+	wg.Wait()
+	close(errCh)
+	for err := range errCh {
+		if err != nil {
+			tb.Fatal(err)
+		}
+	}
+
+	return sc
+}
+
+func prepareExistsBenchmarkScenario(tb testing.TB, fx *benchmarkEngineFixture) existsBenchmarkScenario {
+	started := time.Now()
+	defer func() {
+		tb.Logf("exists setup wall=%s shards=%d miss_count=%d", time.Since(started), fx.shardCount, benchmarkExistsMissCount)
+	}()
+
+	sc := existsBenchmarkScenario{
+		misses: make([]oid.Address, benchmarkExistsMissCount),
+	}
+
+	for i := range sc.misses {
+		sc.misses[i] = oidtest.Address()
+	}
+
+	sc.hitFirst = benchmarkExistsObject(tb, fx.engine, true)
+	sc.hitLast = benchmarkExistsObject(tb, fx.engine, false)
+	return sc
+}
+
+func benchmarkNewEngineWithShardNum(tb testing.TB, shardCount int) *StorageEngine {
+	shards := make([]*shard.Shard, 0, shardCount)
+	for i := range shardCount {
+		shards = append(shards, testNewShard(tb, i))
+	}
+	return testNewEngineWithShards(shards...)
+}
+
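+// benchmarkSearchQuery builds a single-attribute equality filter and runs it
+// through the same preprocessing the engine applies to real search requests.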
+func benchmarkSearchQuery(tb testing.TB, attr, val string) ([]objectcore.SearchFilter, []string, *objectcore.SearchCursor) {
+	fs := object.SearchFilters{}
+	fs.AddFilter(attr, val, object.MatchStringEqual)
+
+	attrs := []string{attr}
+	resFS, cursor, err := objectcore.PreprocessSearchQuery(fs, attrs, "")
+	if err != nil {
+		tb.Fatal(err)
+	}
+	return resFS, attrs, cursor
+}
+
+func dropCaches(b *testing.B) {
+	b.StopTimer()
+
+	_ = exec.Command("sync").Run()
+	err := os.WriteFile("/proc/sys/vm/drop_caches", []byte("3"), 0200)
+	if err != nil {
+		b.Skipf("can't drop caches: %v", err)
+	}
+
+	b.StartTimer()
+}
+
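+// benchmarkExistsObject stores one object in either the first or the last shard
+// of the probe order, so hit-first and hit-last bound the best and worst lookup paths.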
+func benchmarkExistsObject(tb testing.TB, engine *StorageEngine, first bool) oid.Address {
+	obj := generateObjectWithCID(cidtest.ID())
+	sorted := engine.sortedShards(obj.GetID())
+	if len(sorted) == 0 {
+		tb.Fatal("no shards")
+	}
+
+	target := sorted[len(sorted)-1]
+	if first {
+		target = sorted[0]
+	}
+
+	if err := target.Put(obj, nil); err != nil {
+		tb.Fatal(err)
+	}
+	return obj.Address()
+}
diff --git a/pkg/local_object_storage/engine/engine_test.go b/pkg/local_object_storage/engine/engine_test.go
index e9c7f18493..c3ce9fa8a5 100644
--- a/pkg/local_object_storage/engine/engine_test.go
+++ b/pkg/local_object_storage/engine/engine_test.go
@@ -20,7 +20,6 @@ import (
 	"github.com/nspcc-dev/neofs-sdk-go/checksum"
 	apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
 	cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
-	cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
 	"github.com/nspcc-dev/neofs-sdk-go/object"
 	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
 	oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
@@ -39,47 +38,6 @@ func (s epochState) CurrentEpoch() uint64 {
 	return s.e
 }
 
-func BenchmarkExists(b *testing.B) {
-	b.Run("2 shards", func(b *testing.B) {
-		benchmarkExists(b, 2)
-	})
-	b.Run("4 shards", func(b *testing.B) {
-		benchmarkExists(b, 4)
-	})
-	b.Run("8 shards", func(b *testing.B) {
-		benchmarkExists(b, 8)
-	})
-}
-
-func benchmarkExists(b *testing.B, shardNum int) {
-	shards := make([]*shard.Shard, shardNum)
-	for i := range shardNum {
-		shards[i] = testNewShard(b, i)
-	}
-
-	e := testNewEngineWithShards(shards...)
-	b.Cleanup(func() {
-		_ = e.Close()
-	})
-
-	addr := oidtest.Address()
-	for range 100 {
-		obj := generateObjectWithCID(cidtest.ID())
-		err := e.Put(obj, nil)
-		if err != nil {
-			b.Fatal(err)
-		}
-	}
-
-	b.ReportAllocs()
-	for b.Loop() {
-		ok, err := e.existsPhysical(addr)
-		if err != nil || ok {
-			b.Fatalf("%t %v", ok, err)
-		}
-	}
-}
-
 func testNewEngineWithShards(shards ...*shard.Shard) *StorageEngine {
 	engine := New(WithObjectPutRetryTimeout(100 * time.Millisecond))
 
diff --git a/pkg/local_object_storage/engine/exists.go b/pkg/local_object_storage/engine/exists.go
index 98804759f6..01f741b1c6 100644
--- a/pkg/local_object_storage/engine/exists.go
+++ b/pkg/local_object_storage/engine/exists.go
@@ -2,6 +2,7 @@ package engine
 
 import (
 	"errors"
+	"sync"
 
 	ierrors "github.com/nspcc-dev/neofs-node/internal/errors"
 	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
@@ -9,21 +10,36 @@ import (
 	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
 )
 
+const (
+	existsParallelThreshold = 4
+)
+
+type shardExistsResult struct {
+	exists bool
+	err    error
+}
+
 func (e *StorageEngine) existsPhysical(addr oid.Address) (bool, error) {
 	if e.metrics != nil {
 		defer elapsed(e.metrics.AddExistsDuration)()
 	}
 
-	for _, sh := range e.sortedShards(addr.Object()) {
+	shs := e.sortedShards(addr.Object())
+	if len(shs) < existsParallelThreshold {
+		return e.existsPhysicalSequential(addr, shs)
+	}
+	return e.existsPhysicalParallel(addr, shs)
+}
+
+func (e *StorageEngine) existsPhysicalSequential(addr oid.Address, shs []shardWrapper) (bool, error) {
+	for _, sh := range shs {
 		exists, err := sh.Exists(addr, false)
 		if err != nil {
 			if shard.IsErrObjectExpired(err) {
 				return true, nil
 			}
-			if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) ||
-				errors.Is(err, ierrors.ErrParentObject) ||
-				errors.Is(err, apistatus.ErrObjectNotFound) {
+			if isObjectPresenceStatus(err) {
 				return false, err
 			}
@@ -38,3 +54,48 @@ func (e *StorageEngine) existsPhysical(addr oid.Address) (bool, error) {
 
 	return false, nil
 }
+
+func (e *StorageEngine) existsPhysicalParallel(addr oid.Address, shs []shardWrapper) (bool, error) {
+	results := make([]shardExistsResult, len(shs))
+	workers := (len(shs) + existsParallelThreshold - 1) / existsParallelThreshold
+	chunkSize := (len(shs) + workers - 1) / workers
+
+	var wg sync.WaitGroup
+	wg.Add(workers)
+	for w := range workers {
+		start := w * chunkSize
+		end := start + chunkSize
+		end = min(end, len(shs))
+
+		go func(start, end int) {
+			defer wg.Done()
+			for i := start; i < end; i++ {
+				results[i].exists, results[i].err = shs[i].Exists(addr, false)
+			}
+		}(start, end)
+	}
+	wg.Wait()
+
+	for i, res := range results {
+		if res.err != nil {
+			if shard.IsErrObjectExpired(res.err) {
+				return true, nil
+			}
+			if isObjectPresenceStatus(res.err) {
+				return false, res.err
+			}
+			e.reportShardError(shs[i], "could not check existence of object in shard", res.err)
+			continue
+		}
+		if res.exists {
+			return true, nil
+		}
+	}
+	return false, nil
+}
+
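+// isObjectPresenceStatus reports whether err is a status error that already
+// settles the object's presence, so the remaining shards need not be consulted.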
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" @@ -12,6 +14,13 @@ import ( // cursor. Use nil cursor object to start listing again. var ErrEndOfListing = shard.ErrEndOfListing +const listParallelThreshold = 4 + +type shardListResult struct { + items []objectcore.AddressWithAttributes + err error +} + // Cursor is a type for continuous object listing. It's returned from // [StorageEngine.ListWithCursor] and can be reused as a parameter for it for // subsequent requests. @@ -63,14 +72,19 @@ func (e *StorageEngine) ListWithCursor(count uint32, cursor *Cursor, attrs ...st var result, buf []objectcore.AddressWithAttributes cnr, obj := cursor.shardCursor.ContainerID(), cursor.shardCursor.LastObjectID() - for _, sh := range shards { - cursor.shardCursor.Reset(cnr, obj) - res, _, err := sh.ListWithCursor(int(count), cursor.shardCursor, attrs...) - if err != nil || len(res) == 0 { + var shardResults []shardListResult + if len(shards) <= listParallelThreshold { + shardResults = listSequential(shards, int(count), cnr, obj, attrs...) + } else { + shardResults = listParallel(shards, int(count), cnr, obj, attrs...) + } + + for i := range shards { + if shardResults[i].err != nil || len(shardResults[i].items) == 0 { continue } prev := result - result = mergeListResults(buf, result, res, sh.ID().String(), int(count)) + result = mergeListResults(buf, result, shardResults[i].items, shards[i].ID().String(), int(count)) if prev != nil { buf = prev[:0] } @@ -85,6 +99,41 @@ func (e *StorageEngine) ListWithCursor(count uint32, cursor *Cursor, attrs ...st return result, cursor, nil } +func listSequential(shards []shardWrapper, count int, cnr cid.ID, obj oid.ID, attrs ...string) []shardListResult { + res := make([]shardListResult, len(shards)) + crs := shard.NewCursor(cnr, obj) + for i := range shards { + crs.Reset(cnr, obj) + res[i].items, _, res[i].err = shards[i].ListWithCursor(count, crs, attrs...) + } + return res +} + +func listParallel(shards []shardWrapper, count int, cnr cid.ID, obj oid.ID, attrs ...string) []shardListResult { + res := make([]shardListResult, len(shards)) + workers := (len(shards) + listParallelThreshold - 1) / listParallelThreshold + chunkSize := (len(shards) + workers - 1) / workers + + var wg sync.WaitGroup + wg.Add(workers) + for w := range workers { + start := w * chunkSize + end := start + chunkSize + end = min(end, len(shards)) + + go func(start, end int) { + defer wg.Done() + for i := start; i < end; i++ { + crs := shard.NewCursor(cnr, obj) + res[i].items, _, res[i].err = shards[i].ListWithCursor(count, crs, attrs...) + } + }(start, end) + } + + wg.Wait() + return res +} + // mergeListResults merges a sorted accumulated result with a new sorted slice // of items from a single shard into a single sorted deduplicated slice of at // most count items. Objects present on multiple shards have their ShardIDs merged. 
+func listParallel(shards []shardWrapper, count int, cnr cid.ID, obj oid.ID, attrs ...string) []shardListResult {
+	res := make([]shardListResult, len(shards))
+	workers := (len(shards) + listParallelThreshold - 1) / listParallelThreshold
+	chunkSize := (len(shards) + workers - 1) / workers
+
+	var wg sync.WaitGroup
+	wg.Add(workers)
+	for w := range workers {
+		start := w * chunkSize
+		end := start + chunkSize
+		end = min(end, len(shards))
+
+		go func(start, end int) {
+			defer wg.Done()
+			for i := start; i < end; i++ {
+				crs := shard.NewCursor(cnr, obj)
+				res[i].items, _, res[i].err = shards[i].ListWithCursor(count, crs, attrs...)
+			}
+		}(start, end)
+	}
+
+	wg.Wait()
+	return res
+}
+
 // mergeListResults merges a sorted accumulated result with a new sorted slice
 // of items from a single shard into a single sorted deduplicated slice of at
 // most count items. Objects present on multiple shards have their ShardIDs merged.
diff --git a/pkg/local_object_storage/engine/select.go b/pkg/local_object_storage/engine/select.go
index 25e910b9e7..56a7fd2ca6 100644
--- a/pkg/local_object_storage/engine/select.go
+++ b/pkg/local_object_storage/engine/select.go
@@ -4,6 +4,8 @@ import (
 	"bytes"
 	"errors"
 	"fmt"
+	"slices"
+	"sync"
 
 	objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
 	"github.com/nspcc-dev/neofs-sdk-go/client"
@@ -12,6 +14,22 @@ import (
 	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
 )
 
+const (
+	searchParallelThreshold     = 4
+	collectRawParallelThreshold = 4
+)
+
+type shardSearchResult struct {
+	items []client.SearchResultItem
+	more  bool
+	err   error
+}
+
+type shardCollectRawResult struct {
+	ids []oid.ID
+	err error
+}
+
 // selectOld selects the objects from local storage that match select parameters.
 //
 // Returns any error encountered that did not allow to completely select the objects.
@@ -69,6 +87,14 @@ func (e *StorageEngine) Search(cnr cid.ID, fs []objectcore.SearchFilter, attrs [
 	if len(shs) == 0 {
 		return nil, nil, nil
 	}
+	if len(shs) <= searchParallelThreshold {
+		return e.searchSequential(cnr, fs, attrs, cursor, count, shs)
+	}
+
+	return e.searchParallel(cnr, fs, attrs, cursor, count, shs)
+}
+
+func (e *StorageEngine) searchSequential(cnr cid.ID, fs []objectcore.SearchFilter, attrs []string, cursor *objectcore.SearchCursor, count uint16, shs []shardWrapper) ([]client.SearchResultItem, []byte, error) {
 	items, nextCursor, err := shs[0].Search(cnr, fs, attrs, cursor, count)
 	if err != nil {
 		e.reportShardError(shs[0], "could not select objects from shard", err)
@@ -86,6 +112,67 @@ func (e *StorageEngine) Search(cnr cid.ID, fs []objectcore.SearchFilter, attrs [
 		}
 		sets, mores = append(sets, items), append(mores, nextCursor != nil)
 	}
+
+	return finalizeSearch(fs, attrs, count, sets, mores)
+}
+
+func (e *StorageEngine) searchParallel(cnr cid.ID, fs []objectcore.SearchFilter, attrs []string, cursor *objectcore.SearchCursor, count uint16, shs []shardWrapper) ([]client.SearchResultItem, []byte, error) {
+	results := make([]shardSearchResult, len(shs))
+	workers := (len(shs) + searchParallelThreshold - 1) / searchParallelThreshold
+	chunkSize := (len(shs) + workers - 1) / workers
+
+	var wg sync.WaitGroup
+	wg.Add(workers)
+
+	for w := range workers {
+		start := w * chunkSize
+		end := start + chunkSize
+		end = min(end, len(shs))
+
+		go func(start, end int) {
+			defer wg.Done()
+
+			for idx := start; idx < end; idx++ {
+				var cClone *objectcore.SearchCursor
+				if cursor != nil {
+					cClone = &objectcore.SearchCursor{
+						PrimaryKeysPrefix: slices.Clone(cursor.PrimaryKeysPrefix),
+						PrimarySeekKey:    slices.Clone(cursor.PrimarySeekKey),
+					}
+				}
+
+				items, nextCursor, err := shs[idx].Search(cnr, fs, attrs, cClone, count)
+				results[idx] = shardSearchResult{
+					items: items,
+					more:  nextCursor != nil,
+					err:   err,
+				}
+			}
+		}(start, end)
+	}
+
+	wg.Wait()
+
+	sets := make([][]client.SearchResultItem, 0, len(shs))
+	mores := make([]bool, 0, len(shs))
+
+	for i := range shs {
+		if results[i].err != nil {
+			e.reportShardError(shs[i], "could not select objects from shard", results[i].err)
+			continue
+		}
+		sets = append(sets, results[i].items)
+		mores = append(mores, results[i].more)
+	}
+
+	if len(sets) == 0 {
+		return nil, nil, nil
+	}
+
+	return finalizeSearch(fs, attrs, count, sets, mores)
+}
+
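+// finalizeSearch carries the tail of the original Search implementation: it
+// merges the per-shard result sets into a single page of at most count items
+// and derives the continuation cursor.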
+func finalizeSearch(fs []objectcore.SearchFilter, attrs []string, count uint16, sets [][]client.SearchResultItem, mores []bool) ([]client.SearchResultItem, []byte, error) {
 	var (
 		firstAttr   string
 		firstFilter *object.SearchFilter
@@ -111,21 +198,60 @@
 }
 
 func (e *StorageEngine) collectRawWithAttribute(cnr cid.ID, attr string, val []byte) ([]oid.ID, error) {
-	var (
-		err    error
-		shards = e.unsortedShards()
-		ids    = make([][]oid.ID, len(shards))
-	)
+	shards := e.unsortedShards()
+	if len(shards) == 0 {
+		return nil, nil
+	}
 
-	for i, sh := range shards {
-		ids[i], err = sh.CollectRawWithAttribute(cnr, attr, val)
-		if err != nil {
-			return nil, fmt.Errorf("shard %s: %w", sh.ID(), err)
+	var results []shardCollectRawResult
+	if len(shards) <= collectRawParallelThreshold {
+		results = collectRawWithAttributeSequential(cnr, attr, val, shards)
+	} else {
+		results = collectRawWithAttributeParallel(cnr, attr, val, shards)
+	}
+
+	ids := make([][]oid.ID, len(shards))
+	for i := range shards {
+		if results[i].err != nil {
+			return nil, fmt.Errorf("shard %s: %w", shards[i].ID(), results[i].err)
 		}
+		ids[i] = results[i].ids
 	}
 	return mergeOIDs(ids), nil
 }
+
+func collectRawWithAttributeSequential(cnr cid.ID, attr string, val []byte, shards []shardWrapper) []shardCollectRawResult {
+	res := make([]shardCollectRawResult, len(shards))
+	for i := range shards {
+		res[i].ids, res[i].err = shards[i].CollectRawWithAttribute(cnr, attr, val)
+	}
+	return res
+}
+
+func collectRawWithAttributeParallel(cnr cid.ID, attr string, val []byte, shards []shardWrapper) []shardCollectRawResult {
+	res := make([]shardCollectRawResult, len(shards))
+	workers := (len(shards) + collectRawParallelThreshold - 1) / collectRawParallelThreshold
+	chunkSize := (len(shards) + workers - 1) / workers
+
+	var wg sync.WaitGroup
+	wg.Add(workers)
+	for w := range workers {
+		start := w * chunkSize
+		end := start + chunkSize
+		end = min(end, len(shards))
+
+		go func(start, end int) {
+			defer wg.Done()
+			for i := start; i < end; i++ {
+				res[i].ids, res[i].err = shards[i].CollectRawWithAttribute(cnr, attr, val)
+			}
+		}(start, end)
+	}
+
+	wg.Wait()
+	return res
+}
+
 // mergeOIDs merges given set of lists of object IDs into a single flat list.
 // ids are expected to be sorted and the result contains no duplicates from
 // different original lists (slices are expected to not contain any inner