Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` that enables adding `__unit__` and `__type__` labels for remote write v2 and OTLP requests. This is a breaking change; the `-distributor.otlp.enable-type-and-unit-labels` flag is now deprecated, operates as a no-op, and has been consolidated into this new flag. #7077
* [FEATURE] Querier: Add experimental projection pushdown support in Parquet Queryable. #7152
* [FEATURE] Ingester: Add experimental active series queried metric. #7173
* [ENHANCEMENT] Querier: Add `-querier.store-gateway-series-batch-size` flag to configure the maximum number of series to be batched in a single gRPC response message from Store Gateways. #7203
* [ENHANCEMENT] Ingester: Add support for ingesting Native Histogram with Custom Buckets. #7191
* [ENHANCEMENT] Ingester: Optimize labels out-of-order (ooo) check by allowing the iteration to terminate immediately upon finding the first unsorted label. #7186
* [ENHANCEMENT] Distributor: Skip attaching `__unit__` and `__type__` labels when `-distributor.enable-type-and-unit-labels` is enabled, as these are appended from metadata. #7145
Expand Down
5 changes: 5 additions & 0 deletions docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,11 @@ querier:
# CLI flag: -querier.store-gateway-consistency-check-max-attempts
[store_gateway_consistency_check_max_attempts: <int> | default = 3]

# [Experimental] The maximum number of series to be batched in a single gRPC
# response message from Store Gateways. A value of 0 or 1 disables batching.
# CLI flag: -querier.store-gateway-series-batch-size
[store_gateway_series_batch_size: <int> | default = 1]

# The maximum number of times we attempt fetching data from ingesters for
# retryable errors (ex. partial data returned).
# CLI flag: -querier.ingester-query-max-attempts
Expand Down
5 changes: 5 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -4829,6 +4829,11 @@ store_gateway_client:
# CLI flag: -querier.store-gateway-consistency-check-max-attempts
[store_gateway_consistency_check_max_attempts: <int> | default = 3]

# [Experimental] The maximum number of series to be batched in a single gRPC
# response message from Store Gateways. A value of 0 or 1 disables batching.
# CLI flag: -querier.store-gateway-series-batch-size
[store_gateway_series_batch_size: <int> | default = 1]

# The maximum number of times we attempt fetching data from ingesters for
# retryable errors (ex. partial data returned).
# CLI flag: -querier.ingester-query-max-attempts
Expand Down
1 change: 1 addition & 0 deletions docs/configuration/v1-guarantees.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ Currently experimental features are:
- Enable string interning for metrics labels by setting `-ingester.labels-string-interning-enabled` on Ingester.
- Query-frontend: query rejection (`-frontend.query-rejection.enabled`)
- Querier: protobuf codec (`-api.querier-default-codec`)
- Querier: Series batch size (`-querier.store-gateway-series-batch-size`)
- Query-frontend: dynamic query splits
- `querier.max-shards-per-query` (int) CLI flag
- `querier.max-fetched-data-duration-per-query` (duration) CLI flag
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ require (
github.com/stretchr/testify v1.11.1
github.com/thanos-io/objstore v0.0.0-20250804093838-71d60dfee488
github.com/thanos-io/promql-engine v0.0.0-20251224085502-3988aa4704b5
github.com/thanos-io/thanos v0.40.1-0.20260109174305-38129bbb6008
github.com/thanos-io/thanos v0.40.1-0.20260112164636-49dde505913b
github.com/uber/jaeger-client-go v2.30.0+incompatible
github.com/weaveworks/common v0.0.0-20230728070032-dd9e68f319d5
go.etcd.io/etcd/api/v3 v3.5.17
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1789,8 +1789,8 @@ github.com/thanos-io/objstore v0.0.0-20250804093838-71d60dfee488 h1:khBsQLLRoF1K
github.com/thanos-io/objstore v0.0.0-20250804093838-71d60dfee488/go.mod h1:uDHLkMKOGDAnlN75EAz8VrRzob1+VbgYSuUleatWuF0=
github.com/thanos-io/promql-engine v0.0.0-20251224085502-3988aa4704b5 h1:hIg9M9TRha/qaLDdtwsTWsTDkewGHleVZaV2JsLY1vA=
github.com/thanos-io/promql-engine v0.0.0-20251224085502-3988aa4704b5/go.mod h1:MOFN0M1nDMcWZg1t4iF39sOard/K4SWgO/HHSODeDIc=
github.com/thanos-io/thanos v0.40.1-0.20260109174305-38129bbb6008 h1:+msnhVSAWx7AewLJllGEELJmhGciT5BmLoq6rUt4Aq4=
github.com/thanos-io/thanos v0.40.1-0.20260109174305-38129bbb6008/go.mod h1:B9TgiYdhZdVxB1jXi4hRV+XDhiMmhHFykb8cxsZyWG8=
github.com/thanos-io/thanos v0.40.1-0.20260112164636-49dde505913b h1:KIQzAcxtdxi3PhrOpGP5t/TP7NBZqYvvcUvlu0q8fEQ=
github.com/thanos-io/thanos v0.40.1-0.20260112164636-49dde505913b/go.mod h1:B9TgiYdhZdVxB1jXi4hRV+XDhiMmhHFykb8cxsZyWG8=
github.com/tinylib/msgp v1.3.0 h1:ULuf7GPooDaIlbyvgAxBV/FI7ynli6LZ1/nVUNu+0ww=
github.com/tinylib/msgp v1.3.0/go.mod h1:ykjzy2wzgrlvpDCRc4LA8UXy6D8bzMSuAF3WD57Gok0=
github.com/tjhop/slog-gokit v0.1.4 h1:uj/vbDt3HaF0Py8bHPV4ti/s0utnO0miRbO277FLBKM=
Expand Down
73 changes: 48 additions & 25 deletions pkg/querier/blocks_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ type BlocksStoreQueryable struct {

storeGatewayQueryStatsEnabled bool
storeGatewayConsistencyCheckMaxAttempts int
storeGatewaySeriesBatchSize int64

// Subservices manager.
subservices *services.Manager
Expand Down Expand Up @@ -175,6 +176,7 @@ func NewBlocksStoreQueryable(
limits: limits,
storeGatewayQueryStatsEnabled: config.StoreGatewayQueryStatsEnabled,
storeGatewayConsistencyCheckMaxAttempts: config.StoreGatewayConsistencyCheckMaxAttempts,
storeGatewaySeriesBatchSize: config.StoreGatewaySeriesBatchSize,
}

q.Service = services.NewBasicService(q.starting, q.running, q.stopping)
Expand Down Expand Up @@ -306,6 +308,7 @@ func (q *BlocksStoreQueryable) Querier(mint, maxt int64) (storage.Querier, error
queryStoreAfter: q.queryStoreAfter,
storeGatewayQueryStatsEnabled: q.storeGatewayQueryStatsEnabled,
storeGatewayConsistencyCheckMaxAttempts: q.storeGatewayConsistencyCheckMaxAttempts,
storeGatewaySeriesBatchSize: q.storeGatewaySeriesBatchSize,
}, nil
}

Expand All @@ -328,6 +331,9 @@ type blocksStoreQuerier struct {

// The maximum number of times we attempt fetching missing blocks from different Store Gateways.
storeGatewayConsistencyCheckMaxAttempts int

// The maximum number of series to be batched in a single gRPC response message from Store Gateways.
storeGatewaySeriesBatchSize int64
}

// Select implements storage.Querier interface.
Expand Down Expand Up @@ -648,7 +654,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
seriesQueryStats := &hintspb.QueryStats{}
skipChunks := sp != nil && sp.Func == "series"

req, err := createSeriesRequest(minT, maxT, limit, convertedMatchers, sp, shardingInfo, skipChunks, blockIDs, defaultAggrs)
req, err := createSeriesRequest(minT, maxT, limit, convertedMatchers, sp, shardingInfo, skipChunks, blockIDs, defaultAggrs, q.storeGatewaySeriesBatchSize)
if err != nil {
return errors.Wrapf(err, "failed to create series request")
}
Expand All @@ -670,6 +676,37 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
myWarnings := annotations.Annotations(nil)
myQueriedBlocks := []ulid.ULID(nil)

// processSeries accumulates one series received from a store-gateway and
// enforces the per-query limits (series count, chunk count, chunk bytes and
// data bytes). It is shared by the one-series-per-message path and the
// batched-response path, so every series goes through identical limit
// checks regardless of how it arrived. Returns a validation.LimitError as
// soon as any limit is exceeded.
processSeries := func(s *storepb.Series) error {
	// NOTE(review): the series is appended before the limit checks, so the
	// last over-limit series is still retained in mySeries when an error is
	// returned — this mirrors the pre-refactor behavior.
	mySeries = append(mySeries, s)

	// Add series fingerprint to query limiter; will return error if we are over the limit
	limitErr := queryLimiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(s.PromLabels()))
	if limitErr != nil {
		return validation.LimitError(limitErr.Error())
	}

	// Ensure the max number of chunks limit hasn't been reached (max == 0 means disabled).
	if maxChunksLimit > 0 {
		actual := numChunks.Add(int32(len(s.Chunks)))
		if actual > int32(leftChunksLimit) {
			return validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, util.LabelMatchersToString(matchers), maxChunksLimit))
		}
	}
	// Account for the compressed chunk bytes and the overall (labels + chunks)
	// data bytes against the per-query limiter.
	chunksSize := countChunkBytes(s)
	dataSize := countDataBytes(s)
	if chunkBytesLimitErr := queryLimiter.AddChunkBytes(chunksSize); chunkBytesLimitErr != nil {
		return validation.LimitError(chunkBytesLimitErr.Error())
	}
	if chunkLimitErr := queryLimiter.AddChunks(len(s.Chunks)); chunkLimitErr != nil {
		return validation.LimitError(chunkLimitErr.Error())
	}
	if dataBytesLimitErr := queryLimiter.AddDataBytes(dataSize); dataBytesLimitErr != nil {
		return validation.LimitError(dataBytesLimitErr.Error())
	}

	return nil
}

for {
// Ensure the context hasn't been canceled in the meanwhile (eg. an error occurred
// in another goroutine).
Expand Down Expand Up @@ -708,34 +745,19 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
return errors.Wrapf(err, "failed to receive series from %s", c.RemoteAddress())
}

// Response may either contain series, warning or hints.
// Response may either contain series, batch, warning or hints.
if s := resp.GetSeries(); s != nil {
mySeries = append(mySeries, s)

// Add series fingerprint to query limiter; will return error if we are over the limit
limitErr := queryLimiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(s.PromLabels()))
if limitErr != nil {
return validation.LimitError(limitErr.Error())
if err := processSeries(s); err != nil {
return err
}
}

// Ensure the max number of chunks limit hasn't been reached (max == 0 means disabled).
if maxChunksLimit > 0 {
actual := numChunks.Add(int32(len(s.Chunks)))
if actual > int32(leftChunksLimit) {
return validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, util.LabelMatchersToString(matchers), maxChunksLimit))
if b := resp.GetBatch(); b != nil {
for _, s := range b.Series {
if err := processSeries(s); err != nil {
return err
}
}
chunksSize := countChunkBytes(s)
dataSize := countDataBytes(s)
if chunkBytesLimitErr := queryLimiter.AddChunkBytes(chunksSize); chunkBytesLimitErr != nil {
return validation.LimitError(chunkBytesLimitErr.Error())
}
if chunkLimitErr := queryLimiter.AddChunks(len(s.Chunks)); chunkLimitErr != nil {
return validation.LimitError(chunkLimitErr.Error())
}
if dataBytesLimitErr := queryLimiter.AddDataBytes(dataSize); dataBytesLimitErr != nil {
return validation.LimitError(dataBytesLimitErr.Error())
}
}

if w := resp.GetWarning(); w != "" {
Expand Down Expand Up @@ -1044,7 +1066,7 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore(
return valueSets, warnings, queriedBlocks, nil, merr.Err()
}

func createSeriesRequest(minT, maxT, limit int64, matchers []storepb.LabelMatcher, selectHints *storage.SelectHints, shardingInfo *storepb.ShardInfo, skipChunks bool, blockIDs []ulid.ULID, aggrs []storepb.Aggr) (*storepb.SeriesRequest, error) {
func createSeriesRequest(minT, maxT, limit int64, matchers []storepb.LabelMatcher, selectHints *storage.SelectHints, shardingInfo *storepb.ShardInfo, skipChunks bool, blockIDs []ulid.ULID, aggrs []storepb.Aggr, batchSize int64) (*storepb.SeriesRequest, error) {
// Selectively query only specific blocks.
hints := &hintspb.SeriesRequestHints{
BlockMatchers: []storepb.LabelMatcher{
Expand Down Expand Up @@ -1074,6 +1096,7 @@ func createSeriesRequest(minT, maxT, limit int64, matchers []storepb.LabelMatche
// TODO: support more downsample levels when downsampling is supported.
Aggregates: aggrs,
MaxResolutionWindow: downsample.ResLevel0,
ResponseBatchSize: batchSize,
}

if selectHints != nil {
Expand Down
45 changes: 45 additions & 0 deletions pkg/querier/blocks_store_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) {
}

tests := map[string]struct {
seriesBatchSize int64
finderResult bucketindex.Blocks
finderErr error
storeSetResponses []any
Expand Down Expand Up @@ -1581,6 +1582,33 @@ func TestBlocksStoreQuerier_Select(t *testing.T) {
},
},
},
"a single store-gateway instance returns a batch of series": {
seriesBatchSize: 2,
finderResult: bucketindex.Blocks{
&bucketindex.Block{ID: block1},
},
storeSetResponses: []any{
map[BlocksStoreClient][]ulid.ULID{
&storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{
mockSeriesResponse(labels.FromStrings(metricNameLabel.Name, metricNameLabel.Value, series1Label.Name, series1Label.Value), []cortexpb.Sample{{Value: 1, TimestampMs: minT}}, nil, nil),
mockSeriesResponse(labels.FromStrings(metricNameLabel.Name, metricNameLabel.Value, series2Label.Name, series2Label.Value), []cortexpb.Sample{{Value: 2, TimestampMs: minT}}, nil, nil),
mockHintsResponse(block1),
}}: {block1},
},
},
limits: &blocksStoreLimitsMock{},
queryLimiter: noOpQueryLimiter,
expectedSeries: []seriesResult{
{
lbls: labels.New(metricNameLabel, series1Label),
values: []valueResult{{t: minT, v: 1}},
},
{
lbls: labels.New(metricNameLabel, series2Label),
values: []valueResult{{t: minT, v: 2}},
},
},
},
}

for testName, testData := range tests {
Expand Down Expand Up @@ -1622,6 +1650,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) {
limits: testData.limits,

storeGatewayConsistencyCheckMaxAttempts: 3,
storeGatewaySeriesBatchSize: testData.seriesBatchSize,
}

matchers := []*labels.Matcher{
Expand Down Expand Up @@ -1697,6 +1726,22 @@ func TestBlocksStoreQuerier_Select(t *testing.T) {
require.True(t, found)
}

if testData.seriesBatchSize > 0 {
found := false
for _, resp := range testData.storeSetResponses {
if clientsMap, ok := resp.(map[BlocksStoreClient][]ulid.ULID); ok {
for client := range clientsMap {
if mockClient, ok := client.(*storeGatewayClientMock); ok {
// Verify that the store-gateway client received the configured seriesBatchSize in its SeriesRequest.
assert.Equal(t, testData.seriesBatchSize, mockClient.lastSeriesRequest.ResponseBatchSize)
found = true
}
}
}
}
require.True(t, found)
}

// Assert on metrics (optional, only for test cases defining it).
if testData.expectedMetrics != "" {
assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(testData.expectedMetrics)))
Expand Down
9 changes: 9 additions & 0 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ type Config struct {
// The maximum number of times we attempt fetching missing blocks from different Store Gateways.
StoreGatewayConsistencyCheckMaxAttempts int `yaml:"store_gateway_consistency_check_max_attempts"`

// The maximum number of series to be batched in a single gRPC response message from Store Gateways.
StoreGatewaySeriesBatchSize int64 `yaml:"store_gateway_series_batch_size"`

// The maximum number of times we attempt fetching data from Ingesters.
IngesterQueryMaxAttempts int `yaml:"ingester_query_max_attempts"`

Expand Down Expand Up @@ -109,6 +112,7 @@ var (
errEmptyTimeRange = errors.New("empty time range")
errUnsupportedResponseCompression = errors.New("unsupported response compression. Supported compression 'gzip', 'snappy', 'zstd' and '' (disable compression)")
errInvalidConsistencyCheckAttempts = errors.New("store gateway consistency check max attempts should be greater or equal than 1")
errInvalidSeriesBatchSize = errors.New("store gateway series batch size should be greater or equal than 0")
errInvalidIngesterQueryMaxAttempts = errors.New("ingester query max attempts should be greater or equal than 1")
errInvalidParquetQueryableDefaultBlockStore = errors.New("unsupported parquet queryable default block store. Supported options are tsdb and parquet")
)
Expand Down Expand Up @@ -142,6 +146,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.StoreGatewayAddresses, "querier.store-gateway-addresses", "", "Comma separated list of store-gateway addresses in DNS Service Discovery format. This option should be set when using the blocks storage and the store-gateway sharding is disabled (when enabled, the store-gateway instances form a ring and addresses are picked from the ring).")
f.BoolVar(&cfg.StoreGatewayQueryStatsEnabled, "querier.store-gateway-query-stats-enabled", true, "If enabled, store gateway query stats will be logged using `info` log level.")
f.IntVar(&cfg.StoreGatewayConsistencyCheckMaxAttempts, "querier.store-gateway-consistency-check-max-attempts", maxFetchSeriesAttempts, "The maximum number of times we attempt fetching missing blocks from different store-gateways. If no more store-gateways are left (ie. due to lower replication factor) than we'll end the retries earlier")
f.Int64Var(&cfg.StoreGatewaySeriesBatchSize, "querier.store-gateway-series-batch-size", 1, "[Experimental] The maximum number of series to be batched in a single gRPC response message from Store Gateways. A value of 0 or 1 disables batching.")
f.IntVar(&cfg.IngesterQueryMaxAttempts, "querier.ingester-query-max-attempts", 1, "The maximum number of times we attempt fetching data from ingesters for retryable errors (ex. partial data returned).")
f.DurationVar(&cfg.LookbackDelta, "querier.lookback-delta", 5*time.Minute, "Time since the last sample after which a time series is considered stale and ignored by expression evaluations.")
f.DurationVar(&cfg.ShuffleShardingIngestersLookbackPeriod, "querier.shuffle-sharding-ingesters-lookback-period", 0, "When distributor's sharding strategy is shuffle-sharding and this setting is > 0, queriers fetch in-memory series from the minimum set of required ingesters, selecting only ingesters which may have received series since 'now - lookback period'. The lookback period should be greater or equal than the configured 'query store after' and 'query ingesters within'. If this setting is 0, queriers always query all ingesters (ingesters shuffle sharding on read path is disabled).")
Expand Down Expand Up @@ -179,6 +184,10 @@ func (cfg *Config) Validate() error {
return errInvalidConsistencyCheckAttempts
}

if cfg.StoreGatewaySeriesBatchSize < 0 {
return errInvalidSeriesBatchSize
}

if cfg.IngesterQueryMaxAttempts < 1 {
return errInvalidIngesterQueryMaxAttempts
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1666,6 +1666,18 @@ func TestConfig_Validate(t *testing.T) {
},
expected: errInvalidParquetQueryableDefaultBlockStore,
},
"should fail if invalid series batch size": {
setup: func(cfg *Config) {
cfg.StoreGatewaySeriesBatchSize = -1
},
expected: errInvalidSeriesBatchSize,
},
"should pass when 0 series batch size": {
setup: func(cfg *Config) {
cfg.StoreGatewaySeriesBatchSize = 0
},
expected: nil,
},
}

for testName, testData := range tests {
Expand Down
Loading
Loading