Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 36 additions & 12 deletions pkg/ccl/changefeedccl/sink_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -206,9 +207,9 @@ func TestWebhookSink(t *testing.T) {
// sink's client should not accept the endpoint's use of HTTP (expects HTTPS)
require.NoError(t, sinkSrcWrongProtocol.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte(`{"after":{"col1":"val1","rowid":1000},"key":[1001],"topic:":"foo"}`), zeroTS, zeroTS, zeroAlloc, nil))

require.EqualError(t, sinkSrcWrongProtocol.Flush(context.Background()),
fmt.Sprintf(`Post "%s": http: server gave HTTP response to HTTPS client`, fmt.Sprintf("https://%s", strings.TrimPrefix(sinkDestHTTP.URL(),
"http://"))))
flushErr := sinkSrcWrongProtocol.Flush(context.Background())
require.True(t, testutils.IsError(flushErr, "webhook sink request failed"), flushErr)
require.True(t, testutils.IsError(flushErr, "http: server gave HTTP response to HTTPS client"), flushErr)

sinkDestSecure, err := cdctest.StartMockWebhookSinkSecure(cert)
require.NoError(t, err)
Expand Down Expand Up @@ -305,7 +306,9 @@ func TestWebhookSinkWithAuthOptions(t *testing.T) {
require.NoError(t, err)
require.NoError(t, sinkSrcNoCreds.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte(`{"after":{"col1":"val1","rowid":1000},"key":[1001],"topic:":"foo"}`), zeroTS, zeroTS, zeroAlloc, nil))

require.EqualError(t, sinkSrcNoCreds.Flush(context.Background()), "401 Unauthorized: ")
flushErrNoCreds := sinkSrcNoCreds.Flush(context.Background())
require.True(t, testutils.IsError(flushErrNoCreds, "webhook sink HTTP error"), flushErrNoCreds)
require.True(t, testutils.IsError(flushErrNoCreds, "401 Unauthorized"), flushErrNoCreds)

// wrong credentials should result in a 401 as well
var wrongAuthHeader string
Expand All @@ -316,7 +319,9 @@ func TestWebhookSinkWithAuthOptions(t *testing.T) {

require.NoError(t, sinkSrcWrongCreds.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte(`{"after":{"col1":"val1","rowid":1000},"key":[1001],"topic:":"foo"}`), zeroTS, zeroTS, zeroAlloc, nil))

require.EqualError(t, sinkSrcWrongCreds.Flush(context.Background()), "401 Unauthorized: ")
flushErrWrongCreds := sinkSrcWrongCreds.Flush(context.Background())
require.True(t, testutils.IsError(flushErrWrongCreds, "webhook sink HTTP error"), flushErrWrongCreds)
require.True(t, testutils.IsError(flushErrWrongCreds, "401 Unauthorized"), flushErrWrongCreds)

require.NoError(t, sinkSrc.Close())
require.NoError(t, sinkSrcNoCreds.Close())
Expand Down Expand Up @@ -403,7 +408,9 @@ func TestWebhookSinkConfig(t *testing.T) {

require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte(`{"after":{"col1":"val1","rowid":1000},"key":[1001],"topic:":"foo"}`), zeroTS, zeroTS, zeroAlloc, nil))

require.EqualError(t, sinkSrc.Flush(context.Background()), "500 Internal Server Error: ")
flushErr := sinkSrc.Flush(context.Background())
require.True(t, testutils.IsError(flushErr, "webhook sink HTTP error"), flushErr)
require.True(t, testutils.IsError(flushErr, "500 Internal Server Error"), flushErr)

// ensure that failures are retried the expected maximum number of times
// before returning error
Expand Down Expand Up @@ -445,7 +452,9 @@ func TestWebhookSinkConfig(t *testing.T) {

require.NoError(t, sinkSrc.EmitRow(context.Background(), noTopic{}, []byte("[1001]"), []byte(`{"after":{"col1":"val1","rowid":1000},"key":[1001],"topic:":"foo"}`), zeroTS, zeroTS, pool.alloc(), nil))

require.EqualError(t, sinkSrc.Flush(context.Background()), "500 Internal Server Error: ")
flushErr := sinkSrc.Flush(context.Background())
require.True(t, testutils.IsError(flushErr, "webhook sink HTTP error"), flushErr)
require.True(t, testutils.IsError(flushErr, "500 Internal Server Error"), flushErr)

// ensure that failures are retried the expected maximum number of times
// before returning error
Expand Down Expand Up @@ -678,7 +687,14 @@ func TestWebhookSinkShutsDownOnError(t *testing.T) {

require.NoError(t, sinkSrc.EmitRow(ctx, noTopic{}, []byte("[1001]"), []byte(`{"after":{"col1":"val1","rowid":1000},"key":[1001],"topic:":"foo"}`), zeroTS, zeroTS, zeroAlloc, nil))
// error should be propagated immediately in the next call
require.EqualError(t, sinkSrc.Flush(ctx), "500 Internal Server Error: ")
flushErr := sinkSrc.Flush(ctx)
require.True(t, testutils.IsError(flushErr, "webhook sink HTTP error"), flushErr)
require.True(t, testutils.IsError(flushErr, "500 Internal Server Error"), flushErr)

// The HTTP status must survive redaction so operators can debug
// from anonymized logs.
redacted := string(redact.Sprint(flushErr).Redact())
require.Contains(t, redacted, "500 Internal Server Error")

// check that no messages have been delivered
require.Equal(t, "", sinkDest.Pop())
Expand Down Expand Up @@ -1040,10 +1056,18 @@ func TestWebhookSinkErrorCompressedResponse(t *testing.T) {
zeroTS,
zeroAlloc, nil))

err = sinkSrc.Flush(ctx)
require.Error(t, err)
// Verify error body is decompressed
require.Equal(t, fmt.Sprintf(`500 Internal Server Error: %s`, responseBody), err.Error())
flushErr := sinkSrc.Flush(ctx)
require.Error(t, flushErr)
// Verify error contains the HTTP status and the decompressed body.
require.True(t, testutils.IsError(flushErr, "webhook sink HTTP error"), flushErr)
require.True(t, testutils.IsError(flushErr, "500 Internal Server Error"), flushErr)
require.True(t, testutils.IsError(flushErr, responseBody), flushErr)

// The HTTP status must survive redaction, but the response body
// must be redacted.
redacted := string(redact.Sprint(flushErr).Redact())
require.Contains(t, redacted, "500 Internal Server Error")
require.NotContains(t, redacted, responseBody)

// Verify no messages delivered
require.Equal(t, "", sinkDest.Pop())
Expand Down
6 changes: 4 additions & 2 deletions pkg/ccl/changefeedccl/sink_webhook_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)

const (
Expand Down Expand Up @@ -272,7 +273,7 @@ func (sc *webhookSinkClient) Flush(ctx context.Context, batch SinkPayload) error
req.Body = b
res, err := sc.client.Do(req)
if err != nil {
return err
return errors.Wrap(err, "webhook sink request failed")
}
defer res.Body.Close()

Expand All @@ -282,7 +283,8 @@ func (sc *webhookSinkClient) Flush(ctx context.Context, batch SinkPayload) error
if err != nil {
return errors.Wrapf(err, "failed to read body for HTTP response with status: %d", res.StatusCode)
}
return fmt.Errorf("%s: %s", res.Status, string(resBody))
return errors.Newf("webhook sink HTTP error %s: %s",
redact.Safe(res.Status), resBody)
}
return nil
}
Expand Down
Loading