From c3372e523dd6ef16f5969eeccbf2c2d6c8985436 Mon Sep 17 00:00:00 2001
From: SungJin1212
Date: Mon, 12 Jan 2026 16:15:25 +0900
Subject: [PATCH 1/7] querier: support series batching from Store Gateways

Signed-off-by: SungJin1212
---
 CHANGELOG.md                                 |   1 +
 docs/blocks-storage/querier.md               |   5 +
 docs/configuration/config-file-reference.md  |   5 +
 pkg/querier/blocks_store_queryable.go        |  73 ++++++---
 pkg/querier/blocks_store_queryable_test.go   |  45 ++++++
 pkg/querier/querier.go                       |   9 ++
 pkg/querier/querier_test.go                  |  12 ++
 pkg/storegateway/bucket_stores_bench_test.go | 162 +++++++++++++++++++
 pkg/storegateway/bucket_stores_test.go       |   2 +-
 schemas/cortex-config-schema.json            |   6 +
 10 files changed, 294 insertions(+), 26 deletions(-)
 create mode 100644 pkg/storegateway/bucket_stores_bench_test.go

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6a48e74a931..ec095dd5ec3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,6 +13,7 @@
 * [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` that enables adding `__unit__` and `__type__` labels for remote write v2 and OTLP requests. This is a breaking change; the `-distributor.otlp.enable-type-and-unit-labels` flag is now deprecated, operates as a no-op, and has been consolidated into this new flag. #7077
 * [FEATURE] Querier: Add experimental projection pushdown support in Parquet Queryable. #7152
 * [FEATURE] Ingester: Add experimental active series queried metric. #7173
+* [FEATURE] Querier: Add `-querier.store-gateway-series-batch-size` flag to configure the maximum number of series to be batched in a single gRPC response message from Store Gateways. #7203
 * [ENHANCEMENT] Ingester: Add support for ingesting Native Histogram with Custom Buckets. #7191
 * [ENHANCEMENT] Ingester: Optimize labels out-of-order (ooo) check by allowing the iteration to terminate immediately upon finding the first unsorted label. #7186
 * [ENHANCEMENT] Distributor: Skip attaching `__unit__` and `__type__` labels when `-distributor.enable-type-and-unit-labels` is enabled, as these are appended from metadata. #7145
diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md
index 94f8767814c..7879646ff32 100644
--- a/docs/blocks-storage/querier.md
+++ b/docs/blocks-storage/querier.md
@@ -237,6 +237,11 @@ querier:
   # CLI flag: -querier.store-gateway-consistency-check-max-attempts
   [store_gateway_consistency_check_max_attempts: <int> | default = 3]
 
+  # [Experimental] The maximum number of series to be batched in a single gRPC
+  # response message from Store Gateways. A value of 0 or 1 disables batching.
+  # CLI flag: -querier.store-gateway-series-batch-size
+  [store_gateway_series_batch_size: <int> | default = 1]
+
   # The maximum number of times we attempt fetching data from ingesters for
   # retryable errors (ex. partial data returned).
   # CLI flag: -querier.ingester-query-max-attempts
diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md
index 5f4abf77f31..41de9d6e294 100644
--- a/docs/configuration/config-file-reference.md
+++ b/docs/configuration/config-file-reference.md
@@ -4829,6 +4829,11 @@ store_gateway_client:
 # CLI flag: -querier.store-gateway-consistency-check-max-attempts
 [store_gateway_consistency_check_max_attempts: <int> | default = 3]
 
+# [Experimental] The maximum number of series to be batched in a single gRPC
+# response message from Store Gateways. A value of 0 or 1 disables batching.
+# CLI flag: -querier.store-gateway-series-batch-size
+[store_gateway_series_batch_size: <int> | default = 1]
+
 # The maximum number of times we attempt fetching data from ingesters for
 # retryable errors (ex. partial data returned).
 # CLI flag: -querier.ingester-query-max-attempts
diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go
index b888888c628..b3e336a940a 100644
--- a/pkg/querier/blocks_store_queryable.go
+++ b/pkg/querier/blocks_store_queryable.go
@@ -143,6 +143,7 @@ type BlocksStoreQueryable struct {
 	storeGatewayQueryStatsEnabled           bool
 	storeGatewayConsistencyCheckMaxAttempts int
+	storeGatewaySeriesBatchSize             int64
 
 	// Subservices manager.
 	subservices *services.Manager
@@ -175,6 +176,7 @@ func NewBlocksStoreQueryable(
 		limits:                                  limits,
 		storeGatewayQueryStatsEnabled:           config.StoreGatewayQueryStatsEnabled,
 		storeGatewayConsistencyCheckMaxAttempts: config.StoreGatewayConsistencyCheckMaxAttempts,
+		storeGatewaySeriesBatchSize:             config.StoreGatewaySeriesBatchSize,
 	}
 
 	q.Service = services.NewBasicService(q.starting, q.running, q.stopping)
@@ -306,6 +308,7 @@ func (q *BlocksStoreQueryable) Querier(mint, maxt int64) (storage.Querier, error
 		queryStoreAfter:                         q.queryStoreAfter,
 		storeGatewayQueryStatsEnabled:           q.storeGatewayQueryStatsEnabled,
 		storeGatewayConsistencyCheckMaxAttempts: q.storeGatewayConsistencyCheckMaxAttempts,
+		storeGatewaySeriesBatchSize:             q.storeGatewaySeriesBatchSize,
 	}, nil
 }
 
@@ -328,6 +331,9 @@ type blocksStoreQuerier struct {
 
 	// The maximum number of times we attempt fetching missing blocks from different Store Gateways.
 	storeGatewayConsistencyCheckMaxAttempts int
+
+	// The maximum number of series to be batched in a single gRPC response message from Store Gateways.
+	storeGatewaySeriesBatchSize int64
 }
 
 // Select implements storage.Querier interface.
@@ -648,7 +654,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
 			seriesQueryStats := &hintspb.QueryStats{}
 			skipChunks := sp != nil && sp.Func == "series"
 
-			req, err := createSeriesRequest(minT, maxT, limit, convertedMatchers, sp, shardingInfo, skipChunks, blockIDs, defaultAggrs)
+			req, err := createSeriesRequest(minT, maxT, limit, convertedMatchers, sp, shardingInfo, skipChunks, blockIDs, defaultAggrs, q.storeGatewaySeriesBatchSize)
 			if err != nil {
 				return errors.Wrapf(err, "failed to create series request")
 			}
@@ -670,6 +676,37 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
 			myWarnings := annotations.Annotations(nil)
 			myQueriedBlocks := []ulid.ULID(nil)
 
+			processSeries := func(s *storepb.Series) error {
+				mySeries = append(mySeries, s)
+
+				// Add series fingerprint to query limiter; will return error if we are over the limit
+				limitErr := queryLimiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(s.PromLabels()))
+				if limitErr != nil {
+					return validation.LimitError(limitErr.Error())
+				}
+
+				// Ensure the max number of chunks limit hasn't been reached (max == 0 means disabled).
+ if maxChunksLimit > 0 { + actual := numChunks.Add(int32(len(s.Chunks))) + if actual > int32(leftChunksLimit) { + return validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, util.LabelMatchersToString(matchers), maxChunksLimit)) + } + } + chunksSize := countChunkBytes(s) + dataSize := countDataBytes(s) + if chunkBytesLimitErr := queryLimiter.AddChunkBytes(chunksSize); chunkBytesLimitErr != nil { + return validation.LimitError(chunkBytesLimitErr.Error()) + } + if chunkLimitErr := queryLimiter.AddChunks(len(s.Chunks)); chunkLimitErr != nil { + return validation.LimitError(chunkLimitErr.Error()) + } + if dataBytesLimitErr := queryLimiter.AddDataBytes(dataSize); dataBytesLimitErr != nil { + return validation.LimitError(dataBytesLimitErr.Error()) + } + + return nil + } + for { // Ensure the context hasn't been canceled in the meanwhile (eg. an error occurred // in another goroutine). @@ -708,34 +745,19 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( return errors.Wrapf(err, "failed to receive series from %s", c.RemoteAddress()) } - // Response may either contain series, warning or hints. + // Response may either contain series, batch, warning or hints. if s := resp.GetSeries(); s != nil { - mySeries = append(mySeries, s) - - // Add series fingerprint to query limiter; will return error if we are over the limit - limitErr := queryLimiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(s.PromLabels())) - if limitErr != nil { - return validation.LimitError(limitErr.Error()) + if err := processSeries(s); err != nil { + return err } + } - // Ensure the max number of chunks limit hasn't been reached (max == 0 means disabled). - if maxChunksLimit > 0 { - actual := numChunks.Add(int32(len(s.Chunks))) - if actual > int32(leftChunksLimit) { - return validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, util.LabelMatchersToString(matchers), maxChunksLimit)) + if b := resp.GetBatch(); b != nil { + for _, s := range b.Series { + if err := processSeries(s); err != nil { + return err } } - chunksSize := countChunkBytes(s) - dataSize := countDataBytes(s) - if chunkBytesLimitErr := queryLimiter.AddChunkBytes(chunksSize); chunkBytesLimitErr != nil { - return validation.LimitError(chunkBytesLimitErr.Error()) - } - if chunkLimitErr := queryLimiter.AddChunks(len(s.Chunks)); chunkLimitErr != nil { - return validation.LimitError(chunkLimitErr.Error()) - } - if dataBytesLimitErr := queryLimiter.AddDataBytes(dataSize); dataBytesLimitErr != nil { - return validation.LimitError(dataBytesLimitErr.Error()) - } } if w := resp.GetWarning(); w != "" { @@ -1044,7 +1066,7 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore( return valueSets, warnings, queriedBlocks, nil, merr.Err() } -func createSeriesRequest(minT, maxT, limit int64, matchers []storepb.LabelMatcher, selectHints *storage.SelectHints, shardingInfo *storepb.ShardInfo, skipChunks bool, blockIDs []ulid.ULID, aggrs []storepb.Aggr) (*storepb.SeriesRequest, error) { +func createSeriesRequest(minT, maxT, limit int64, matchers []storepb.LabelMatcher, selectHints *storage.SelectHints, shardingInfo *storepb.ShardInfo, skipChunks bool, blockIDs []ulid.ULID, aggrs []storepb.Aggr, batchSize int64) (*storepb.SeriesRequest, error) { // Selectively query only specific blocks. hints := &hintspb.SeriesRequestHints{ BlockMatchers: []storepb.LabelMatcher{ @@ -1074,6 +1096,7 @@ func createSeriesRequest(minT, maxT, limit int64, matchers []storepb.LabelMatche // TODO: support more downsample levels when downsampling is supported. 
Aggregates:          aggrs,
 		MaxResolutionWindow: downsample.ResLevel0,
+		ResponseBatchSize:   batchSize,
 	}
 
 	if selectHints != nil {
diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go
index 09bae58cee7..26a2c2fb4ac 100644
--- a/pkg/querier/blocks_store_queryable_test.go
+++ b/pkg/querier/blocks_store_queryable_test.go
@@ -89,6 +89,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) {
 	}
 
 	tests := map[string]struct {
+		seriesBatchSize   int64
 		finderResult      bucketindex.Blocks
 		finderErr         error
 		storeSetResponses []any
@@ -1581,6 +1582,33 @@ func TestBlocksStoreQuerier_Select(t *testing.T) {
 				},
 			},
 		},
+		"a single store-gateway instance returns a batch of series": {
+			seriesBatchSize: 2,
+			finderResult: bucketindex.Blocks{
+				&bucketindex.Block{ID: block1},
+			},
+			storeSetResponses: []any{
+				map[BlocksStoreClient][]ulid.ULID{
+					&storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{
+						mockSeriesResponse(labels.FromStrings(metricNameLabel.Name, metricNameLabel.Value, series1Label.Name, series1Label.Value), []cortexpb.Sample{{Value: 1, TimestampMs: minT}}, nil, nil),
+						mockSeriesResponse(labels.FromStrings(metricNameLabel.Name, metricNameLabel.Value, series2Label.Name, series2Label.Value), []cortexpb.Sample{{Value: 2, TimestampMs: minT}}, nil, nil),
+						mockHintsResponse(block1),
+					}}: {block1},
+				},
+			},
+			limits:       &blocksStoreLimitsMock{},
+			queryLimiter: noOpQueryLimiter,
+			expectedSeries: []seriesResult{
+				{
+					lbls:   labels.New(metricNameLabel, series1Label),
+					values: []valueResult{{t: minT, v: 1}},
+				},
+				{
+					lbls:   labels.New(metricNameLabel, series2Label),
+					values: []valueResult{{t: minT, v: 2}},
+				},
+			},
+		},
 	}
 
 	for testName, testData := range tests {
@@ -1622,6 +1650,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) {
 				limits: testData.limits,
 
 				storeGatewayConsistencyCheckMaxAttempts: 3,
+				storeGatewaySeriesBatchSize:             testData.seriesBatchSize,
 			}
 
 			matchers := []*labels.Matcher{
@@ -1697,6 +1726,22 @@ func TestBlocksStoreQuerier_Select(t *testing.T) {
 				require.True(t, found)
 			}
 
+			if testData.seriesBatchSize > 0 {
+				found := false
+				for _, resp := range testData.storeSetResponses {
+					if clientsMap, ok := resp.(map[BlocksStoreClient][]ulid.ULID); ok {
+						for client := range clientsMap {
+							if mockClient, ok := client.(*storeGatewayClientMock); ok {
+								// verify that the configured seriesBatchSize was passed through to the store-gateway request
+								assert.Equal(t, testData.seriesBatchSize, mockClient.lastSeriesRequest.ResponseBatchSize)
+								found = true
+							}
+						}
+					}
+				}
+				require.True(t, found)
+			}
+
 			// Assert on metrics (optional, only for test cases defining it).
 			if testData.expectedMetrics != "" {
 				assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(testData.expectedMetrics)))
diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go
index 922501d9310..720e304dfcb 100644
--- a/pkg/querier/querier.go
+++ b/pkg/querier/querier.go
@@ -81,6 +81,9 @@ type Config struct {
 	// The maximum number of times we attempt fetching missing blocks from different Store Gateways.
 	StoreGatewayConsistencyCheckMaxAttempts int `yaml:"store_gateway_consistency_check_max_attempts"`
 
+	// The maximum number of series to be batched in a single gRPC response message from Store Gateways.
+	StoreGatewaySeriesBatchSize int64 `yaml:"store_gateway_series_batch_size"`
+
 	// The maximum number of times we attempt fetching data from Ingesters.
IngesterQueryMaxAttempts int `yaml:"ingester_query_max_attempts"`
 
@@ -109,6 +112,7 @@ var (
 	errEmptyTimeRange                           = errors.New("empty time range")
 	errUnsupportedResponseCompression           = errors.New("unsupported response compression. Supported compression 'gzip', 'snappy', 'zstd' and '' (disable compression)")
 	errInvalidConsistencyCheckAttempts          = errors.New("store gateway consistency check max attempts should be greater or equal than 1")
+	errInvalidSeriesBatchSize                   = errors.New("store gateway series batch size should be greater or equal than 0")
 	errInvalidIngesterQueryMaxAttempts          = errors.New("ingester query max attempts should be greater or equal than 1")
 	errInvalidParquetQueryableDefaultBlockStore = errors.New("unsupported parquet queryable default block store. Supported options are tsdb and parquet")
 )
@@ -142,6 +146,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.StringVar(&cfg.StoreGatewayAddresses, "querier.store-gateway-addresses", "", "Comma separated list of store-gateway addresses in DNS Service Discovery format. This option should be set when using the blocks storage and the store-gateway sharding is disabled (when enabled, the store-gateway instances form a ring and addresses are picked from the ring).")
 	f.BoolVar(&cfg.StoreGatewayQueryStatsEnabled, "querier.store-gateway-query-stats-enabled", true, "If enabled, store gateway query stats will be logged using `info` log level.")
 	f.IntVar(&cfg.StoreGatewayConsistencyCheckMaxAttempts, "querier.store-gateway-consistency-check-max-attempts", maxFetchSeriesAttempts, "The maximum number of times we attempt fetching missing blocks from different store-gateways. If no more store-gateways are left (ie. due to lower replication factor) than we'll end the retries earlier")
+	f.Int64Var(&cfg.StoreGatewaySeriesBatchSize, "querier.store-gateway-series-batch-size", 1, "[Experimental] The maximum number of series to be batched in a single gRPC response message from Store Gateways. A value of 0 or 1 disables batching.")
 	f.IntVar(&cfg.IngesterQueryMaxAttempts, "querier.ingester-query-max-attempts", 1, "The maximum number of times we attempt fetching data from ingesters for retryable errors (ex. partial data returned).")
 	f.DurationVar(&cfg.LookbackDelta, "querier.lookback-delta", 5*time.Minute, "Time since the last sample after which a time series is considered stale and ignored by expression evaluations.")
 	f.DurationVar(&cfg.ShuffleShardingIngestersLookbackPeriod, "querier.shuffle-sharding-ingesters-lookback-period", 0, "When distributor's sharding strategy is shuffle-sharding and this setting is > 0, queriers fetch in-memory series from the minimum set of required ingesters, selecting only ingesters which may have received series since 'now - lookback period'. The lookback period should be greater or equal than the configured 'query store after' and 'query ingesters within'. If this setting is 0, queriers always query all ingesters (ingesters shuffle sharding on read path is disabled).")
@@ -179,6 +184,10 @@ func (cfg *Config) Validate() error {
 		return errInvalidConsistencyCheckAttempts
 	}
 
+	if cfg.StoreGatewaySeriesBatchSize < 0 {
+		return errInvalidSeriesBatchSize
+	}
+
 	if cfg.IngesterQueryMaxAttempts < 1 {
 		return errInvalidIngesterQueryMaxAttempts
 	}
diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go
index bf3ff326213..ec08fee2ed2 100644
--- a/pkg/querier/querier_test.go
+++ b/pkg/querier/querier_test.go
@@ -1666,6 +1666,18 @@ func TestConfig_Validate(t *testing.T) {
 			},
 			expected: errInvalidParquetQueryableDefaultBlockStore,
 		},
+		"should fail if series batch size is invalid": {
+			setup: func(cfg *Config) {
+				cfg.StoreGatewaySeriesBatchSize = -1
+			},
+			expected: errInvalidSeriesBatchSize,
+		},
+		"should pass when series batch size is 0": {
+			setup: func(cfg *Config) {
+				cfg.StoreGatewaySeriesBatchSize = 0
+			},
+			expected: nil,
+		},
 	}
 
 	for testName, testData := range tests {
diff --git a/pkg/storegateway/bucket_stores_bench_test.go b/pkg/storegateway/bucket_stores_bench_test.go
new file mode 100644
index 00000000000..50f7b0ec1ba
--- /dev/null
+++ b/pkg/storegateway/bucket_stores_bench_test.go
@@ -0,0 +1,162 @@
+package storegateway
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"math"
+	"net"
+	"os"
+	"path/filepath"
+	"testing"
+
+	"github.com/go-kit/log"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/common/promslog"
+	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/tsdb"
+	"github.com/stretchr/testify/require"
+	"github.com/thanos-io/objstore"
+	"github.com/thanos-io/thanos/pkg/store/storepb"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+	"google.golang.org/grpc/metadata"
+
+	"github.com/cortexproject/cortex/pkg/storage/bucket/filesystem"
+	cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
+)
+
+func BenchmarkThanosBucketStores_SeriesBatch(b *testing.B) {
+	seriesNum := []int{100, 1000, 10000, 100000}
+	samplePerSeries := 100
+	batchSizes := []int{1, 10, 100, 1000, 10000}
+
+	for _, series := range seriesNum {
+		b.Run(fmt.Sprintf("series_%d", series), func(b *testing.B) {
+			tmpDir := b.TempDir()
+			storageDir := filepath.Join(tmpDir, "storage")
+			userID := "user-1"
+
+			// generate block for benchmark
+			generateBenchmarkBlock(b, storageDir, userID, series, samplePerSeries)
+
+			// Initialize the BucketStore
+			cfg := prepareStorageConfig(b)
+			cfg.BucketStore.SyncDir = filepath.Join(tmpDir, "sync")
+
+			bucketClient, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
+			require.NoError(b, err)
+
+			reg := prometheus.NewPedanticRegistry()
+			stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucketClient), defaultLimitsOverrides(nil), mockLoggingLevel(), log.NewNopLogger(), reg)
+			require.NoError(b, err)
+
+			// Perform Initial Sync to load blocks
+			require.NoError(b, stores.InitialSync(context.Background()))
+
+			// Start gRPC Server
+			listener, err := net.Listen("tcp", "localhost:0")
+			require.NoError(b, err)
+
+			gRPCServer := grpc.NewServer()
+			storepb.RegisterStoreServer(gRPCServer, stores)
+
+			// start gRPC server
+			go func() {
+				if err := gRPCServer.Serve(listener); err != nil && err != grpc.ErrServerStopped {
+					b.Error(err)
+				}
+			}()
+			defer gRPCServer.Stop()
+
+			// Initialize gRPC Client
+			conn, err := grpc.NewClient(listener.Addr().String(),
+				grpc.WithTransportCredentials(insecure.NewCredentials()),
+				grpc.WithDefaultCallOptions(
+					grpc.MaxCallRecvMsgSize(math.MaxInt32)),
+			)
+			require.NoError(b, err)
+			defer conn.Close()
+
+			gRPCClient := storepb.NewStoreClient(conn)
+			for _, batchSize := range batchSizes {
+				b.Run(fmt.Sprintf("batchSize=%d", batchSize), func(b *testing.B) {
+					b.ReportAllocs()
+					for b.Loop() {
+						benchmarkBatching(b, gRPCClient, userID, batchSize, series)
+					}
+				})
+			}
+		})
+	}
+}
+
+func benchmarkBatching(b *testing.B, client storepb.StoreClient, userID string, batchSize int, expectedSeries int) {
+	// Inject Tenant ID into context
+	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(cortex_tsdb.TenantIDExternalLabel, userID))
+
+	req := &storepb.SeriesRequest{
+		MinTime: math.MinInt64,
+		MaxTime: math.MaxInt64,
+		Matchers: []storepb.LabelMatcher{
+			{Type: storepb.LabelMatcher_RE, Name: labels.MetricName, Value: ".*"},
+		},
+		ResponseBatchSize: int64(batchSize), // This triggers batching in Thanos
+	}
+
+	stream, err := client.Series(ctx, req)
+	require.NoError(b, err)
+
+	got := 0
+	for {
+		resp, err := stream.Recv()
+		if err == io.EOF {
+			break
+		}
+		require.NoError(b, err)
+
+		if series := resp.GetSeries(); series != nil {
+			got++
+		} else if batch := resp.GetBatch(); batch != nil {
+			got += len(batch.Series)
+		}
+	}
+
+	if got != expectedSeries {
+		b.Fatalf("expected %d series, got %d", expectedSeries, got)
+	}
+}
+
+func generateBenchmarkBlock(b *testing.B, storageDir, userID string, numSeries, numSamples int) {
+	userDir := filepath.Join(storageDir, userID)
+	if err := os.MkdirAll(userDir, os.ModePerm); err != nil {
+		b.Fatal(err)
+	}
+
+	tmpDir := b.TempDir()
+	db, err := tsdb.Open(tmpDir, promslog.NewNopLogger(), nil, tsdb.DefaultOptions(), nil)
+	require.NoError(b, err)
+	defer db.Close()
+
+	app := db.Appender(context.Background())
+
+	for i := 0; i < numSeries; i++ {
+		lbls := labels.FromStrings(
+			labels.MetricName, "test_metric",
+			"idx", fmt.Sprintf("%d", i),
+			"job", "test_job",
+			"instance", "localhost:9090",
+		)
+
+		for t := 0; t < numSamples; t++ {
+			// Time in milliseconds, step 15s
+			ts := int64(t * 15000)
+			_, err := app.Append(0, lbls, ts, float64(i))
+			require.NoError(b, err)
+		}
+	}
+
+	require.NoError(b, app.Commit())
+	require.NoError(b, db.Snapshot(userDir, true))
+}
diff --git a/pkg/storegateway/bucket_stores_test.go b/pkg/storegateway/bucket_stores_test.go
index e6d785d17ad..333658d4890 100644
--- a/pkg/storegateway/bucket_stores_test.go
+++ b/pkg/storegateway/bucket_stores_test.go
@@ -690,7 +690,7 @@ func TestBucketStores_SyncBlocksWithIgnoreBlocksBefore(t *testing.T) {
 	`), "cortex_bucket_store_block_loads_total", "cortex_bucket_store_blocks_loaded", "cortex_blocks_meta_synced"))
 }
 
-func prepareStorageConfig(t *testing.T) cortex_tsdb.BlocksStorageConfig {
+func prepareStorageConfig(t testing.TB) cortex_tsdb.BlocksStorageConfig {
 	cfg := cortex_tsdb.BlocksStorageConfig{}
 	flagext.DefaultValues(&cfg)
 	cfg.BucketStore.SyncDir = t.TempDir()
diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json
index 62c9d0b9e21..8e1e48a8f87 100644
--- a/schemas/cortex-config-schema.json
+++ b/schemas/cortex-config-schema.json
@@ -6076,6 +6076,12 @@
       "type": "boolean",
       "x-cli-flag": "querier.store-gateway-query-stats-enabled"
     },
+    "store_gateway_series_batch_size": {
+      "default": 1,
+      "description": "[Experimental] The maximum number of series to be batched in a single gRPC response message from Store Gateways. A value of 0 or 1 disables batching.",
+      "type": "number",
+      "x-cli-flag": "querier.store-gateway-series-batch-size"
+    },
     "thanos_engine": {
       "properties": {
         "decoding_concurrency": {

From 7331d3c60c3b159931c9bc31e031af20e1af8e8c Mon Sep 17 00:00:00 2001
From: SungJin1212
Date: Mon, 12 Jan 2026 16:48:04 +0900
Subject: [PATCH 2/7] fix lint

Signed-off-by: SungJin1212
---
 pkg/storegateway/bucket_stores_bench_test.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pkg/storegateway/bucket_stores_bench_test.go b/pkg/storegateway/bucket_stores_bench_test.go
index 50f7b0ec1ba..2dd9c2207f7 100644
--- a/pkg/storegateway/bucket_stores_bench_test.go
+++ b/pkg/storegateway/bucket_stores_bench_test.go
@@ -141,7 +141,7 @@ func generateBenchmarkBlock(b *testing.B, storageDir, userID string, numSeries,
 
 	app := db.Appender(context.Background())
 
-	for i := 0; i < numSeries; i++ {
+	for i := range numSeries {
 		lbls := labels.FromStrings(
 			labels.MetricName, "test_metric",
 			"idx", fmt.Sprintf("%d", i),
@@ -149,7 +149,7 @@ func generateBenchmarkBlock(b *testing.B, storageDir, userID string, numSeries,
 		)
 
-		for t := 0; t < numSamples; t++ {
+		for t := range numSamples {
 			// Time in milliseconds, step 15s
 			ts := int64(t * 15000)
 			_, err := app.Append(0, lbls, ts, float64(i))
 			require.NoError(b, err)

From a75379543e77f2ce6a64bca666bd3e4a77c82aea Mon Sep 17 00:00:00 2001
From: SungJin1212
Date: Mon, 12 Jan 2026 17:00:07 +0900
Subject: [PATCH 3/7] Add flag to experimental features

Signed-off-by: SungJin1212
---
 docs/configuration/v1-guarantees.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/docs/configuration/v1-guarantees.md b/docs/configuration/v1-guarantees.md
index 4356ee4a60b..2a35b853688 100644
--- a/docs/configuration/v1-guarantees.md
+++ b/docs/configuration/v1-guarantees.md
@@ -124,6 +124,7 @@ Currently experimental features are:
 - Enable string interning for metrics labels by setting `-ingester.labels-string-interning-enabled` on Ingester.
- Query-frontend: query rejection (`-frontend.query-rejection.enabled`)
 - Querier: protobuf codec (`-api.querier-default-codec`)
+- Querier: Series batch size (`-querier.store-gateway-series-batch-size`)
 - Query-frontend: dynamic query splits
   - `querier.max-shards-per-query` (int) CLI flag
   - `querier.max-fetched-data-duration-per-query` (duration) CLI flag

From d4ba39ee9fca90462e76c31793cc4ee12546743e Mon Sep 17 00:00:00 2001
From: SungJin1212
Date: Tue, 13 Jan 2026 11:00:56 +0900
Subject: [PATCH 4/7] bump thanos to 49dde505913b6b838ca0cd77bb63dd50b8b6fdba

Signed-off-by: SungJin1212
---
 go.mod                                                    | 2 +-
 go.sum                                                    | 4 ++--
 vendor/github.com/thanos-io/thanos/pkg/store/batchable.go | 6 +++---
 vendor/modules.txt                                        | 2 +-
 4 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/go.mod b/go.mod
index 47a06e91f78..4bf338d533c 100644
--- a/go.mod
+++ b/go.mod
@@ -51,7 +51,7 @@ require (
 	github.com/stretchr/testify v1.11.1
 	github.com/thanos-io/objstore v0.0.0-20250804093838-71d60dfee488
 	github.com/thanos-io/promql-engine v0.0.0-20251224085502-3988aa4704b5
-	github.com/thanos-io/thanos v0.40.1-0.20260109174305-38129bbb6008
+	github.com/thanos-io/thanos v0.40.1-0.20260112164636-49dde505913b
 	github.com/uber/jaeger-client-go v2.30.0+incompatible
 	github.com/weaveworks/common v0.0.0-20230728070032-dd9e68f319d5
 	go.etcd.io/etcd/api/v3 v3.5.17
diff --git a/go.sum b/go.sum
index f3490d37f7f..0b1b2630e2e 100644
--- a/go.sum
+++ b/go.sum
@@ -1789,8 +1789,8 @@ github.com/thanos-io/objstore v0.0.0-20250804093838-71d60dfee488 h1:khBsQLLRoF1K
 github.com/thanos-io/objstore v0.0.0-20250804093838-71d60dfee488/go.mod h1:uDHLkMKOGDAnlN75EAz8VrRzob1+VbgYSuUleatWuF0=
 github.com/thanos-io/promql-engine v0.0.0-20251224085502-3988aa4704b5 h1:hIg9M9TRha/qaLDdtwsTWsTDkewGHleVZaV2JsLY1vA=
 github.com/thanos-io/promql-engine v0.0.0-20251224085502-3988aa4704b5/go.mod h1:MOFN0M1nDMcWZg1t4iF39sOard/K4SWgO/HHSODeDIc=
-github.com/thanos-io/thanos v0.40.1-0.20260109174305-38129bbb6008 h1:+msnhVSAWx7AewLJllGEELJmhGciT5BmLoq6rUt4Aq4=
-github.com/thanos-io/thanos v0.40.1-0.20260109174305-38129bbb6008/go.mod h1:B9TgiYdhZdVxB1jXi4hRV+XDhiMmhHFykb8cxsZyWG8=
+github.com/thanos-io/thanos v0.40.1-0.20260112164636-49dde505913b h1:KIQzAcxtdxi3PhrOpGP5t/TP7NBZqYvvcUvlu0q8fEQ=
+github.com/thanos-io/thanos v0.40.1-0.20260112164636-49dde505913b/go.mod h1:B9TgiYdhZdVxB1jXi4hRV+XDhiMmhHFykb8cxsZyWG8=
 github.com/tinylib/msgp v1.3.0 h1:ULuf7GPooDaIlbyvgAxBV/FI7ynli6LZ1/nVUNu+0ww=
 github.com/tinylib/msgp v1.3.0/go.mod h1:ykjzy2wzgrlvpDCRc4LA8UXy6D8bzMSuAF3WD57Gok0=
 github.com/tjhop/slog-gokit v0.1.4 h1:uj/vbDt3HaF0Py8bHPV4ti/s0utnO0miRbO277FLBKM=
diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/batchable.go b/vendor/github.com/thanos-io/thanos/pkg/store/batchable.go
index 477cb06750b..9a399a51f83 100644
--- a/vendor/github.com/thanos-io/thanos/pkg/store/batchable.go
+++ b/vendor/github.com/thanos-io/thanos/pkg/store/batchable.go
@@ -37,7 +37,7 @@ func (b *batchableServer) Flush() error {
 		if err := b.Store_SeriesServer.Send(storepb.NewBatchResponse(b.series)); err != nil {
 			return err
 		}
-		b.series = b.series[:0]
+		b.series = make([]*storepb.Series, 0, b.batchSize)
 	}
 
 	return nil
@@ -50,7 +50,7 @@ func (b *batchableServer) Send(response *storepb.SeriesResponse) error {
 			if err := b.Store_SeriesServer.Send(storepb.NewBatchResponse(b.series)); err != nil {
 				return err
 			}
-			b.series = b.series[:0]
+			b.series = make([]*storepb.Series, 0, b.batchSize)
 		}
 		return b.Store_SeriesServer.Send(response)
 	}
@@ -61,7 +61,7 @@ func (b *batchableServer) Send(response *storepb.SeriesResponse) error {
 		if err := b.Store_SeriesServer.Send(storepb.NewBatchResponse(b.series)); err != nil {
 			return err
 		}
-		b.series = b.series[:0]
+		b.series = make([]*storepb.Series, 0, b.batchSize)
 	}
 
 	return nil
diff --git a/vendor/modules.txt b/vendor/modules.txt
index d218123d681..06408e6184e 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -1241,7 +1241,7 @@ github.com/thanos-io/promql-engine/ringbuffer
 github.com/thanos-io/promql-engine/storage
 github.com/thanos-io/promql-engine/storage/prometheus
 github.com/thanos-io/promql-engine/warnings
-# github.com/thanos-io/thanos v0.40.1-0.20260109174305-38129bbb6008
+# github.com/thanos-io/thanos v0.40.1-0.20260112164636-49dde505913b
 ## explicit; go 1.25.0
 github.com/thanos-io/thanos/pkg/api/query/querypb
 github.com/thanos-io/thanos/pkg/block

From b69775883fd1680151df0f06d1e34e916db4e3c0 Mon Sep 17 00:00:00 2001
From: SungJin1212
Date: Tue, 13 Jan 2026 11:01:10 +0900
Subject: [PATCH 5/7] changelog: reclassify series batching entry as
 ENHANCEMENT

Signed-off-by: SungJin1212
---
 CHANGELOG.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index ec095dd5ec3..6172372684e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,7 +13,7 @@
 * [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` that enables adding `__unit__` and `__type__` labels for remote write v2 and OTLP requests. This is a breaking change; the `-distributor.otlp.enable-type-and-unit-labels` flag is now deprecated, operates as a no-op, and has been consolidated into this new flag. #7077
 * [FEATURE] Querier: Add experimental projection pushdown support in Parquet Queryable. #7152
 * [FEATURE] Ingester: Add experimental active series queried metric. #7173
-* [FEATURE] Querier: Add `-querier.store-gateway-series-batch-size` flag to configure the maximum number of series to be batched in a single gRPC response message from Store Gateways. #7203
+* [ENHANCEMENT] Querier: Add `-querier.store-gateway-series-batch-size` flag to configure the maximum number of series to be batched in a single gRPC response message from Store Gateways. #7203
 * [ENHANCEMENT] Ingester: Add support for ingesting Native Histogram with Custom Buckets. #7191
 * [ENHANCEMENT] Ingester: Optimize labels out-of-order (ooo) check by allowing the iteration to terminate immediately upon finding the first unsorted label. #7186
 * [ENHANCEMENT] Distributor: Skip attaching `__unit__` and `__type__` labels when `-distributor.enable-type-and-unit-labels` is enabled, as these are appended from metadata. #7145

From bf66b3e9fa926a8c5d8e6a1f631519aef2c2250f Mon Sep 17 00:00:00 2001
From: SungJin1212
Date: Tue, 13 Jan 2026 13:31:49 +0900
Subject: [PATCH 6/7] port batchable server to parquet store-gateway

Signed-off-by: SungJin1212
---
 pkg/storegateway/batchable.go            | 91 ++++++++++++++++++++++
 pkg/storegateway/parquet_bucket_store.go |  8 ++-
 2 files changed, 96 insertions(+), 3 deletions(-)
 create mode 100644 pkg/storegateway/batchable.go

diff --git a/pkg/storegateway/batchable.go b/pkg/storegateway/batchable.go
new file mode 100644
index 00000000000..51eb5299f4a
--- /dev/null
+++ b/pkg/storegateway/batchable.go
@@ -0,0 +1,91 @@
+package storegateway
+
+import (
+	"github.com/thanos-io/thanos/pkg/store/storepb"
+)
+
+type flushableServer interface {
+	storepb.Store_SeriesServer
+
+	Flush() error
+}
+
+// copied from thanos pkg/store/flushable.go
+func newFlushableServer(
+	upstream storepb.Store_SeriesServer,
+) flushableServer {
+	return &passthroughServer{Store_SeriesServer: upstream}
+}
+
+type passthroughServer struct {
+	storepb.Store_SeriesServer
+}
+
+func (p *passthroughServer) Flush() error {
+	// If the underlying server is also flushable, flush it
+	if f, ok := p.Store_SeriesServer.(flushableServer); ok {
+		return f.Flush()
+	}
+	return nil
+}
+
+// copied from thanos pkg/store/batchable.go
+func newBatchableServer(
+	upstream storepb.Store_SeriesServer,
+	batchSize int,
+) storepb.Store_SeriesServer {
+	switch batchSize {
+	case 0:
+		return &passthroughServer{Store_SeriesServer: upstream}
+	case 1:
+		return &passthroughServer{Store_SeriesServer: upstream}
+	default:
+		return &batchableServer{
+			Store_SeriesServer: upstream,
+			batchSize:          batchSize,
+			series:             make([]*storepb.Series, 0, batchSize),
+		}
+	}
+}
+
+// batchableServer is a flushableServer that allows sending a batch of Series per message.
+type batchableServer struct { + storepb.Store_SeriesServer + batchSize int + series []*storepb.Series +} + +func (b *batchableServer) Flush() error { + if len(b.series) != 0 { + if err := b.Store_SeriesServer.Send(storepb.NewBatchResponse(b.series)); err != nil { + return err + } + b.series = make([]*storepb.Series, 0, b.batchSize) + } + + return nil +} + +func (b *batchableServer) Send(response *storepb.SeriesResponse) error { + series := response.GetSeries() + if series == nil { + if len(b.series) > 0 { + if err := b.Store_SeriesServer.Send(storepb.NewBatchResponse(b.series)); err != nil { + return err + } + b.series = make([]*storepb.Series, 0, b.batchSize) + } + return b.Store_SeriesServer.Send(response) + } + + b.series = append(b.series, series) + + if len(b.series) >= b.batchSize { + if err := b.Store_SeriesServer.Send(storepb.NewBatchResponse(b.series)); err != nil { + return err + } + b.series = make([]*storepb.Series, 0, b.batchSize) + } + + return nil +} diff --git a/pkg/storegateway/parquet_bucket_store.go b/pkg/storegateway/parquet_bucket_store.go index 510b74b781c..e6bded1e2f6 100644 --- a/pkg/storegateway/parquet_bucket_store.go +++ b/pkg/storegateway/parquet_bucket_store.go @@ -77,10 +77,12 @@ func (p *parquetBucketStore) findParquetBlocks(ctx context.Context, blockMatcher } // Series implements the store interface for a single parquet bucket store -func (p *parquetBucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) (err error) { - spanLog, ctx := spanlogger.New(srv.Context(), "ParquetBucketStore.Series") +func (p *parquetBucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) (err error) { + spanLog, ctx := spanlogger.New(seriesSrv.Context(), "ParquetBucketStore.Series") defer spanLog.Finish() + srv := newFlushableServer(newBatchableServer(seriesSrv, int(req.ResponseBatchSize))) + matchers, err := storecache.MatchersToPromMatchersCached(p.matcherCache, req.Matchers...) 
if err != nil {
 		return status.Error(codes.InvalidArgument, err.Error())
 	}
@@ -160,7 +162,7 @@ func (p *parquetBucketStore) Series(req *storepb.SeriesRequest, srv storepb.Stor
 		return
 	}
 
-	return nil
+	return srv.Flush()
 }
 
 // LabelNames implements the store interface for a single parquet bucket store

From f213eb053ef39f2f8172fd1f30e77d8a06ca3ef3 Mon Sep 17 00:00:00 2001
From: SungJin1212
Date: Tue, 13 Jan 2026 18:01:22 +0900
Subject: [PATCH 7/7] Add BenchmarkParquetBucketStore_SeriesBatch

Signed-off-by: SungJin1212
---
 .../parquet_bucket_store_bench_test.go | 270 ++++++++++++++++++
 1 file changed, 270 insertions(+)
 create mode 100644 pkg/storegateway/parquet_bucket_store_bench_test.go

diff --git a/pkg/storegateway/parquet_bucket_store_bench_test.go b/pkg/storegateway/parquet_bucket_store_bench_test.go
new file mode 100644
index 00000000000..37b389104a4
--- /dev/null
+++ b/pkg/storegateway/parquet_bucket_store_bench_test.go
@@ -0,0 +1,270 @@
+package storegateway
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"io"
+	"math"
+	"net"
+	"os"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"github.com/go-kit/log"
+	"github.com/gogo/protobuf/types"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"github.com/thanos-io/objstore"
+	"github.com/thanos-io/thanos/pkg/block"
+	"github.com/thanos-io/thanos/pkg/block/metadata"
+	"github.com/thanos-io/thanos/pkg/store/hintspb"
+	"github.com/thanos-io/thanos/pkg/store/storepb"
+	"github.com/weaveworks/common/middleware"
+	"github.com/weaveworks/common/user"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+	grpcMetadata "google.golang.org/grpc/metadata"
+
+	"github.com/cortexproject/cortex/pkg/parquetconverter"
+	"github.com/cortexproject/cortex/pkg/ring"
+	"github.com/cortexproject/cortex/pkg/ring/kv/consul"
+	"github.com/cortexproject/cortex/pkg/storage/bucket"
+	"github.com/cortexproject/cortex/pkg/storage/bucket/filesystem"
+	cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
+	"github.com/cortexproject/cortex/pkg/util/flagext"
+	"github.com/cortexproject/cortex/pkg/util/services"
+	"github.com/cortexproject/cortex/pkg/util/users"
+	"github.com/cortexproject/cortex/pkg/util/validation"
+)
+
+func BenchmarkParquetBucketStore_SeriesBatch(b *testing.B) {
+	seriesNum := []int{100, 1000, 10000, 100000}
+	samplePerSeries := 100
+	batchSizes := []int{1, 10, 100, 1000, 10000}
+
+	for _, series := range seriesNum {
+		b.Run(fmt.Sprintf("series_%d", series), func(b *testing.B) {
+			ctx := context.Background()
+			tmpDir := b.TempDir()
+			storageDir := filepath.Join(tmpDir, "storage")
+			dataDir := filepath.Join(tmpDir, "data")
+			userID := "user-1"
+
+			// Initialize the BucketStore
+			storageCfg := cortex_tsdb.BlocksStorageConfig{
+				UsersScanner: users.UsersScannerConfig{
+					Strategy:       users.UserScanStrategyList,
+					UpdateInterval: time.Second,
+				},
+				Bucket: bucket.Config{
+					Backend: "filesystem",
+					Filesystem: filesystem.Config{
+						Directory: storageDir,
+					},
+				},
+				BucketStore: cortex_tsdb.BucketStoreConfig{
+					SyncDir:                filepath.Join(tmpDir, "sync"),
+					BucketStoreType:        "parquet",
+					BlockDiscoveryStrategy: string(cortex_tsdb.RecursiveDiscovery),
+				},
+			}
+			bucketClient, err := bucket.NewClient(context.Background(), storageCfg.Bucket, nil, "test", log.NewNopLogger(), prometheus.NewRegistry())
+			require.NoError(b, err)
+
+			blockID := prepareParquetBlock(b, ctx, storageCfg, bucketClient, dataDir, userID, series, samplePerSeries)
+
+			reg := prometheus.NewPedanticRegistry()
+			stores, err := NewBucketStores(storageCfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucketClient), defaultLimitsOverrides(nil), mockLoggingLevel(), log.NewNopLogger(), reg)
+			require.NoError(b, err)
+
+			// Start gRPC Server
+			listener, err := net.Listen("tcp", "localhost:0")
+			require.NoError(b, err)
+
+			gRPCServer := grpc.NewServer(
+				grpc.StreamInterceptor(middleware.StreamServerUserHeaderInterceptor),
+			)
+			storepb.RegisterStoreServer(gRPCServer, stores)
+
+			// start gRPC server
+			go func() {
+				if err := gRPCServer.Serve(listener); err != nil && err != grpc.ErrServerStopped {
+					b.Error(err)
+				}
+			}()
+			defer gRPCServer.Stop()
+
+			// Initialize gRPC Client
+			conn, err := grpc.NewClient(listener.Addr().String(),
+				grpc.WithTransportCredentials(insecure.NewCredentials()),
+				grpc.WithDefaultCallOptions(
+					grpc.MaxCallRecvMsgSize(math.MaxInt32),
+				),
+			)
+			require.NoError(b, err)
+			defer conn.Close()
+
+			gRPCClient := storepb.NewStoreClient(conn)
+			for _, batchSize := range batchSizes {
+				b.Run(fmt.Sprintf("batchSize=%d", batchSize), func(b *testing.B) {
+					b.ReportAllocs()
+					for b.Loop() {
+						benchmarkBatchingForParquetBucketStore(b, gRPCClient, userID, batchSize, series, blockID)
+					}
+				})
+			}
+		})
+	}
+}
+
+func prepareParquetBlock(b *testing.B, ctx context.Context, storageCfg cortex_tsdb.BlocksStorageConfig, bkt objstore.InstrumentedBucket, dataDir, userID string, numSeries, numSamples int) string {
+	logger := log.NewNopLogger()
+	reg := prometheus.NewRegistry()
+
+	// Generate TSDB block
+	generateBenchmarkBlock(b, dataDir, userID, numSeries, numSamples)
+
+	userBucket := bucket.NewUserBucketClient("user-1", bkt, nil)
+	srcBlockDir := filepath.Join(dataDir, userID)
+
+	// Find blockID
+	dirs, err := os.ReadDir(srcBlockDir)
+	require.NoError(b, err)
+	var blockID string
+	for _, d := range dirs {
+		if d.IsDir() {
+			blockID = d.Name()
+			break
+		}
+	}
+	require.NotEmpty(b, blockID)
+
+	blockPath := filepath.Join(srcBlockDir, blockID)
+	metaFilePath := filepath.Join(blockPath, "meta.json")
+	metaBytes, err := os.ReadFile(metaFilePath)
+	require.NoError(b, err)
+
+	var meta metadata.Meta
+	err = json.Unmarshal(metaBytes, &meta)
+	require.NoError(b, err)
+
+	if meta.Thanos.Labels == nil {
+		meta.Thanos.Labels = make(map[string]string)
+	}
+	meta.Thanos.Labels["replica"] = "0" // add a dummy label so that block.Upload succeeds
+
+	// Write back meta.json with the Thanos label appended
+	newMetaBytes, err := json.Marshal(meta)
+	require.NoError(b, err)
+	err = os.WriteFile(metaFilePath, newMetaBytes, 0666)
+	require.NoError(b, err)
+
+	// Upload generated block to Storage
+	err = block.Upload(ctx, logger, userBucket, blockPath, metadata.NoneFunc)
+	require.NoError(b, err)
+
+	convCfg := parquetconverter.Config{}
+	flagext.DefaultValues(&convCfg)
+	convCfg.ConversionInterval = time.Second // to convert quickly
+	convCfg.DataDir = filepath.Join(dataDir, "converter-data")
+
+	ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
+	b.Cleanup(func() { assert.NoError(b, closer.Close()) })
+
+	convCfg.Ring.InstanceID = "parquet-converter-1"
+	convCfg.Ring.InstanceAddr = "1.2.3.4"
+	convCfg.Ring.KVStore.Mock = ringStore
+
+	limits := &validation.Limits{}
+	flagext.DefaultValues(limits)
+	limits.ParquetConverterEnabled = true
+	overrides := validation.NewOverrides(*limits, nil)
+
+	// Create parquet converter
+	converter, err := parquetconverter.NewConverter(convCfg, storageCfg, []int64{1, 2 * 3600 * 1000}, logger, reg, overrides)
+	require.NoError(b, err)
+
+	err = services.StartAndAwaitRunning(context.Background(), converter)
+	require.NoError(b, err)
+	defer services.StopAndAwaitTerminated(ctx, converter) // nolint:errcheck
+
+	// check that the parquet converter marker file exists
+	markerFile := filepath.Join(blockID, "parquet-converter-mark.json")
+	require.Eventually(b, func() bool {
+		exists, err := userBucket.Exists(ctx, markerFile)
+		return err == nil && exists
+	}, 10*time.Second, 100*time.Millisecond, "failed to wait for parquet conversion (marker file not found)")
+
+	// check that the chunks parquet file exists
+	existsChunks, err := userBucket.Exists(ctx, filepath.Join(blockID, "0.chunks.parquet"))
+	require.NoError(b, err)
+	require.True(b, existsChunks, "chunks.parquet file should exist")
+
+	// check that the labels parquet file exists
+	existsLabels, err := userBucket.Exists(ctx, filepath.Join(blockID, "0.labels.parquet"))
+	require.NoError(b, err)
+	require.True(b, existsLabels, "labels.parquet file should exist")
+
+	return blockID
+}
+
+func benchmarkBatchingForParquetBucketStore(b *testing.B, client storepb.StoreClient, userID string, batchSize int, expectedSeries int, blockID string) {
+	ctx := grpcMetadata.NewOutgoingContext(context.Background(), grpcMetadata.Pairs(cortex_tsdb.TenantIDExternalLabel, userID))
+	ctx, err := user.InjectIntoGRPCRequest(user.InjectOrgID(ctx, userID))
+	require.NoError(b, err)
+
+	hintMatchers := []storepb.LabelMatcher{
+		{
+			Type:  storepb.LabelMatcher_RE,
+			Name:  block.BlockIDLabel,
+			Value: blockID,
+		},
+	}
+
+	dataMatchers := []storepb.LabelMatcher{
+		{
+			Type:  storepb.LabelMatcher_RE,
+			Name:  "__name__",
+			Value: ".+",
+		},
+	}
+
+	hints := &hintspb.SeriesRequestHints{
+		BlockMatchers: hintMatchers,
+	}
+	hintsAny, err := types.MarshalAny(hints)
+	require.NoError(b, err)
+
+	req := &storepb.SeriesRequest{
+		MinTime:           0,
+		MaxTime:           math.MaxInt64,
+		Matchers:          dataMatchers,
+		ResponseBatchSize: int64(batchSize),
+		Hints:             hintsAny,
+	}
+
+	stream, err := client.Series(ctx, req)
+	require.NoError(b, err)
+
+	got := 0
+	for {
+		resp, err := stream.Recv()
+		if err == io.EOF {
+			break
+		}
+		require.NoError(b, err)
+
+		if series := resp.GetSeries(); series != nil {
+			got++
+		} else if batch := resp.GetBatch(); batch != nil {
+			got += len(batch.Series)
+		}
+	}
+
+	if got != expectedSeries {
+		b.Fatalf("expected %d series, got %d", expectedSeries, got)
+	}
+}
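
---

For reference, enabling the new setting looks like this in the querier config block (a minimal sketch; the value 128 is illustrative only, not a recommendation from this series):

    querier:
      # Batch up to 128 series per gRPC response message from store-gateways.
      # A value of 0 or 1 keeps the one-series-per-message behavior.
      store_gateway_series_batch_size: 128

The same setting is exposed as the -querier.store-gateway-series-batch-size CLI flag.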