From c026f1ddbf414eb8508b8417dd720b3897fe45e1 Mon Sep 17 00:00:00 2001 From: Jeff Swenson Date: Thu, 2 Apr 2026 15:23:37 +0000 Subject: [PATCH] changefeedccl: improve webhook sink error reporting MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, the entire error returned by the webhook sink was redacted, which gave us the following unhelpful log line: ``` job 1161794369145176075 encountered transient error: ‹×› (attempt 19) ``` Now, we will see the status code and the fact the error was reported by the webhook sink. Release note: none Epic: none --- pkg/ccl/changefeedccl/sink_webhook_test.go | 48 ++++++++++++++++------ pkg/ccl/changefeedccl/sink_webhook_v2.go | 6 ++- 2 files changed, 40 insertions(+), 14 deletions(-) diff --git a/pkg/ccl/changefeedccl/sink_webhook_test.go b/pkg/ccl/changefeedccl/sink_webhook_test.go index 552b4100225e..a6bd5a0a5c74 100644 --- a/pkg/ccl/changefeedccl/sink_webhook_test.go +++ b/pkg/ccl/changefeedccl/sink_webhook_test.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" + "github.com/cockroachdb/redact" "github.com/stretchr/testify/require" ) @@ -206,9 +207,9 @@ func TestWebhookSink(t *testing.T) { // sink's client should not accept the endpoint's use of HTTP (expects HTTPS) require.NoError(t, sinkSrcWrongProtocol.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte(`{"after":{"col1":"val1","rowid":1000},"key":[1001],"topic:":"foo"}`), zeroTS, zeroTS, zeroAlloc, nil)) - require.EqualError(t, sinkSrcWrongProtocol.Flush(context.Background()), - fmt.Sprintf(`Post "%s": http: server gave HTTP response to HTTPS client`, fmt.Sprintf("https://%s", strings.TrimPrefix(sinkDestHTTP.URL(), - "http://")))) + flushErr := sinkSrcWrongProtocol.Flush(context.Background()) + require.True(t, testutils.IsError(flushErr, "webhook sink request failed"), flushErr) + require.True(t, testutils.IsError(flushErr, "http: server gave HTTP response to HTTPS client"), flushErr) sinkDestSecure, err := cdctest.StartMockWebhookSinkSecure(cert) require.NoError(t, err) @@ -305,7 +306,9 @@ func TestWebhookSinkWithAuthOptions(t *testing.T) { require.NoError(t, err) require.NoError(t, sinkSrcNoCreds.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte(`{"after":{"col1":"val1","rowid":1000},"key":[1001],"topic:":"foo"}`), zeroTS, zeroTS, zeroAlloc, nil)) - require.EqualError(t, sinkSrcNoCreds.Flush(context.Background()), "401 Unauthorized: ") + flushErrNoCreds := sinkSrcNoCreds.Flush(context.Background()) + require.True(t, testutils.IsError(flushErrNoCreds, "webhook sink HTTP error"), flushErrNoCreds) + require.True(t, testutils.IsError(flushErrNoCreds, "401 Unauthorized"), flushErrNoCreds) // wrong credentials should result in a 401 as well var wrongAuthHeader string @@ -316,7 +319,9 @@ func TestWebhookSinkWithAuthOptions(t *testing.T) { require.NoError(t, sinkSrcWrongCreds.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte(`{"after":{"col1":"val1","rowid":1000},"key":[1001],"topic:":"foo"}`), zeroTS, zeroTS, zeroAlloc, nil)) - require.EqualError(t, sinkSrcWrongCreds.Flush(context.Background()), "401 Unauthorized: ") + flushErrWrongCreds := sinkSrcWrongCreds.Flush(context.Background()) + require.True(t, testutils.IsError(flushErrWrongCreds, "webhook sink HTTP error"), flushErrWrongCreds) + require.True(t, testutils.IsError(flushErrWrongCreds, "401 Unauthorized"), flushErrWrongCreds) require.NoError(t, sinkSrc.Close()) require.NoError(t, sinkSrcNoCreds.Close()) @@ -403,7 +408,9 @@ func TestWebhookSinkConfig(t *testing.T) { require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte(`{"after":{"col1":"val1","rowid":1000},"key":[1001],"topic:":"foo"}`), zeroTS, zeroTS, zeroAlloc, nil)) - require.EqualError(t, sinkSrc.Flush(context.Background()), "500 Internal Server Error: ") + flushErr := sinkSrc.Flush(context.Background()) + require.True(t, testutils.IsError(flushErr, "webhook sink HTTP error"), flushErr) + require.True(t, testutils.IsError(flushErr, "500 Internal Server Error"), flushErr) // ensure that failures are retried the expected maximum number of times // before returning error @@ -445,7 +452,9 @@ func TestWebhookSinkConfig(t *testing.T) { require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte(`{"after":{"col1":"val1","rowid":1000},"key":[1001],"topic:":"foo"}`), zeroTS, zeroTS, pool.alloc(), nil)) - require.EqualError(t, sinkSrc.Flush(context.Background()), "500 Internal Server Error: ") + flushErr := sinkSrc.Flush(context.Background()) + require.True(t, testutils.IsError(flushErr, "webhook sink HTTP error"), flushErr) + require.True(t, testutils.IsError(flushErr, "500 Internal Server Error"), flushErr) // ensure that failures are retried the expected maximum number of times // before returning error @@ -678,7 +687,14 @@ func TestWebhookSinkShutsDownOnError(t *testing.T) { require.NoError(t, sinkSrc.EmitRow(ctx, noTopic{}, []byte("[1001]"), []byte(`{"after":{"col1":"val1","rowid":1000},"key":[1001],"topic:":"foo"}`), zeroTS, zeroTS, zeroAlloc, nil)) // error should be propagated immediately in the next call - require.EqualError(t, sinkSrc.Flush(ctx), "500 Internal Server Error: ") + flushErr := sinkSrc.Flush(ctx) + require.True(t, testutils.IsError(flushErr, "webhook sink HTTP error"), flushErr) + require.True(t, testutils.IsError(flushErr, "500 Internal Server Error"), flushErr) + + // The HTTP status must survive redaction so operators can debug + // from anonymized logs. + redacted := string(redact.Sprint(flushErr).Redact()) + require.Contains(t, redacted, "500 Internal Server Error") // check that no messages have been delivered require.Equal(t, "", sinkDest.Pop()) @@ -1040,10 +1056,18 @@ func TestWebhookSinkErrorCompressedResponse(t *testing.T) { zeroTS, zeroAlloc, nil)) - err = sinkSrc.Flush(ctx) - require.Error(t, err) - // Verify error body is decompressed - require.Equal(t, fmt.Sprintf(`500 Internal Server Error: %s`, responseBody), err.Error()) + flushErr := sinkSrc.Flush(ctx) + require.Error(t, flushErr) + // Verify error contains the HTTP status and the decompressed body. + require.True(t, testutils.IsError(flushErr, "webhook sink HTTP error"), flushErr) + require.True(t, testutils.IsError(flushErr, "500 Internal Server Error"), flushErr) + require.True(t, testutils.IsError(flushErr, responseBody), flushErr) + + // The HTTP status must survive redaction, but the response body + // must be redacted. + redacted := string(redact.Sprint(flushErr).Redact()) + require.Contains(t, redacted, "500 Internal Server Error") + require.NotContains(t, redacted, responseBody) // Verify no messages delivered require.Equal(t, "", sinkDest.Pop()) diff --git a/pkg/ccl/changefeedccl/sink_webhook_v2.go b/pkg/ccl/changefeedccl/sink_webhook_v2.go index 6fc1306091c1..5d7d5482daf5 100644 --- a/pkg/ccl/changefeedccl/sink_webhook_v2.go +++ b/pkg/ccl/changefeedccl/sink_webhook_v2.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" + "github.com/cockroachdb/redact" ) const ( @@ -272,7 +273,7 @@ func (sc *webhookSinkClient) Flush(ctx context.Context, batch SinkPayload) error req.Body = b res, err := sc.client.Do(req) if err != nil { - return err + return errors.Wrap(err, "webhook sink request failed") } defer res.Body.Close() @@ -282,7 +283,8 @@ func (sc *webhookSinkClient) Flush(ctx context.Context, batch SinkPayload) error if err != nil { return errors.Wrapf(err, "failed to read body for HTTP response with status: %d", res.StatusCode) } - return fmt.Errorf("%s: %s", res.Status, string(resBody)) + return errors.Newf("webhook sink HTTP error %s: %s", + redact.Safe(res.Status), resBody) } return nil }