Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` that enables adding `__unit__` and `__type__` labels for remote write v2 and OTLP requests. This is a breaking change; the `-distributor.otlp.enable-type-and-unit-labels` flag is now deprecated, operates as a no-op, and has been consolidated into this new flag. #7077
* [FEATURE] Querier: Add experimental projection pushdown support in Parquet Queryable. #7152
* [FEATURE] Ingester: Add experimental active series queried metric. #7173
* [ENHANCEMENT] Distributor: Add validation to ensure remote write v2 requests contain at least one sample or histogram. #7201
* [ENHANCEMENT] Ingester: Add support for ingesting Native Histogram with Custom Buckets. #7191
* [ENHANCEMENT] Ingester: Optimize labels out-of-order (ooo) check by allowing the iteration to terminate immediately upon finding the first unsorted label. #7186
* [ENHANCEMENT] Distributor: Skip attaching `__unit__` and `__type__` labels when `-distributor.enable-type-and-unit-labels` is enabled, as these are appended from metadata. #7145
Expand Down
4 changes: 4 additions & 0 deletions pkg/util/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,10 @@ func convertV2RequestToV1(req *cortexpb.PreallocWriteRequestV2, enableTypeAndUni
return v1Req, err
}

if len(v2Ts.Samples) == 0 && len(v2Ts.Histograms) == 0 {
return v1Req, fmt.Errorf("TimeSeries must contain at least one sample or histogram for series %v", lbs.String())
}

unit := symbols[v2Ts.Metadata.UnitRef]
metricType := v2Ts.Metadata.Type
shouldAttachTypeAndUnitLabels := enableTypeAndUnitLabels && (metricType != cortexpb.METRIC_TYPE_UNSPECIFIED || unit != "")
Expand Down
99 changes: 77 additions & 22 deletions pkg/util/push/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,30 +452,85 @@ func TestHandler_remoteWrite(t *testing.T) {
flagext.DefaultValues(&limits)
overrides := validation.NewOverrides(limits, nil)

t.Run("remote write v1", func(t *testing.T) {
handler := Handler(true, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.API))
req := createRequest(t, createPrometheusRemoteWriteProtobuf(t), false)
resp := httptest.NewRecorder()
handler.ServeHTTP(resp, req)
assert.Equal(t, http.StatusOK, resp.Code)
})
t.Run("remote write v2", func(t *testing.T) {
ctx := context.Background()
ctx = user.InjectOrgID(ctx, "user-1")
tests := []struct {
name string
createBody func() ([]byte, bool) // returns (bodyBytes, isV2)
expectedStatus int
expectedBody string
verifyResponse func(resp *httptest.ResponseRecorder)
}{
{
name: "remote write v1",
createBody: func() ([]byte, bool) {
return createPrometheusRemoteWriteProtobuf(t), false
},
expectedStatus: http.StatusOK,
},
{
name: "remote write v2",
createBody: func() ([]byte, bool) {
return createPrometheusRemoteWriteV2Protobuf(t), true
},
expectedStatus: http.StatusNoContent,
verifyResponse: func(resp *httptest.ResponseRecorder) {
respHeader := resp.Header()
assert.Equal(t, "1", respHeader[rw20WrittenSamplesHeader][0])
assert.Equal(t, "1", respHeader[rw20WrittenHistogramsHeader][0])
assert.Equal(t, "1", respHeader[rw20WrittenExemplarsHeader][0])
},
},
{
name: "remote write v2 with empty samples and histograms should return 400",
createBody: func() ([]byte, bool) {
// Create a request with a TimeSeries that has no samples and no histograms
reqProto := writev2.Request{
Symbols: []string{"", "__name__", "foo"},
Timeseries: []writev2.TimeSeries{
{
LabelsRefs: []uint32{1, 2},
Exemplars: []writev2.Exemplar{
{
LabelsRefs: []uint32{},
Value: 1.0,
Timestamp: time.Now().UnixMilli(),
},
},
},
},
}
reqBytes, err := reqProto.Marshal()
require.NoError(t, err)
return reqBytes, true
},
expectedStatus: http.StatusBadRequest,
expectedBody: "TimeSeries must contain at least one sample or histogram for series {__name__=\"foo\"}",
},
}

handler := Handler(true, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.API))
req := createRequest(t, createPrometheusRemoteWriteV2Protobuf(t), true)
req = req.WithContext(ctx)
resp := httptest.NewRecorder()
handler.ServeHTTP(resp, req)
assert.Equal(t, http.StatusNoContent, resp.Code)
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx := context.Background()
ctx = user.InjectOrgID(ctx, "user-1")
handler := Handler(true, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.API))

// test header value
respHeader := resp.Header()
assert.Equal(t, "1", respHeader[rw20WrittenSamplesHeader][0])
assert.Equal(t, "1", respHeader[rw20WrittenHistogramsHeader][0])
assert.Equal(t, "1", respHeader[rw20WrittenExemplarsHeader][0])
})
body, isV2 := test.createBody()
req := createRequest(t, body, isV2)
req = req.WithContext(ctx)

resp := httptest.NewRecorder()
handler.ServeHTTP(resp, req)

assert.Equal(t, test.expectedStatus, resp.Code)

if test.expectedBody != "" {
assert.Contains(t, resp.Body.String(), test.expectedBody)
}

if test.verifyResponse != nil {
test.verifyResponse(resp)
}
})
}
}

func TestHandler_ContentTypeAndEncoding(t *testing.T) {
Expand Down
Loading