From 7c07dc4edce5d12033188c73d6112b2c1e0e4033 Mon Sep 17 00:00:00 2001 From: Kyle Brown Date: Tue, 17 Mar 2026 14:57:42 -0700 Subject: [PATCH] fix serverless backup resume bug --- internal/client/client.go | 5 +- internal/client/client_test.go | 15 +++ internal/cmd/backup.go | 67 ++++++++++- internal/cmd/backup_test.go | 196 +++++++++++++++++++++++++++++++++ 4 files changed, 276 insertions(+), 7 deletions(-) diff --git a/internal/client/client.go b/internal/client/client.go index e11a3b62..0b93561b 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -44,6 +44,7 @@ const ( defaultRetryJitterFraction = 0.5 importBulkRoute = "/authzed.api.v1.PermissionsService/ImportBulkRelationships" exportBulkRoute = "/authzed.api.v1.PermissionsService/ExportBulkRelationships" + readRelationshipsRoute = "/authzed.api.v1.PermissionsService/ReadRelationships" watchRoute = "/authzed.api.v1.WatchService/Watch" ) @@ -236,7 +237,9 @@ func DialOptsFromFlags(cmd *cobra.Command, token storage.Token) ([]grpc.DialOpti // retrying the bulk import in backup/restore logic is handled manually. // retrying bulk export is also handled manually, because the default behavior is // to start at the beginning of the stream, which produces duplicate values. - selector.StreamClientInterceptor(retry.StreamClientInterceptor(retryOpts...), selector.MatchFunc(isNoneOf(importBulkRoute, exportBulkRoute, watchRoute))), + // read relationships is also excluded, because replaying that stream silently + // duplicates results for callers that persist or print the stream incrementally. + selector.StreamClientInterceptor(retry.StreamClientInterceptor(retryOpts...), selector.MatchFunc(isNoneOf(importBulkRoute, exportBulkRoute, readRelationshipsRoute, watchRoute))), } if !cobrautil.MustGetBool(cmd, "skip-version-check") { diff --git a/internal/client/client_test.go b/internal/client/client_test.go index 7ca833e8..9c6dd2e7 100644 --- a/internal/client/client_test.go +++ b/internal/client/client_test.go @@ -155,6 +155,11 @@ func (fss *fakeServer) Watch(*v1.WatchRequest, grpc.ServerStreamingServer[v1.Wat return status.Errorf(codes.Unavailable, "") } +func (fss *fakeServer) ReadRelationships(*v1.ReadRelationshipsRequest, grpc.ServerStreamingServer[v1.ReadRelationshipsResponse]) error { + fss.testFunc() + return status.Errorf(codes.Unavailable, "") +} + func TestRetries(t *testing.T) { ctx := t.Context() var callCount uint @@ -254,4 +259,14 @@ func TestDoesNotRetry(t *testing.T) { grpcutil.RequireStatus(t, codes.Unavailable, err) require.Equal(t, uint(1), callCount) }) + + t.Run("read_relationships", func(t *testing.T) { + callCount = 0 + stream, err := c.ReadRelationships(ctx, &v1.ReadRelationshipsRequest{}) + require.NoError(t, err) + resp, err := stream.Recv() + require.Nil(t, resp) + grpcutil.RequireStatus(t, codes.Unavailable, err) + require.Equal(t, uint(1), callCount) + }) } diff --git a/internal/cmd/backup.go b/internal/cmd/backup.go index 71347d3d..039efd1c 100644 --- a/internal/cmd/backup.go +++ b/internal/cmd/backup.go @@ -2,6 +2,7 @@ package cmd import ( "context" + "encoding/json" "errors" "fmt" "io" @@ -35,6 +36,13 @@ import ( const doNotReturnIfExists = false +const serverlessProgressPrefix = "serverless:" + +type serverlessProgress struct { + ResourceType string `json:"resource_type"` + Cursor string `json:"cursor"` +} + // cobraRunEFunc is the signature of a cobra.Command.RunE function. type cobraRunEFunc = func(cmd *cobra.Command, args []string) (err error) @@ -236,6 +244,35 @@ func CloseAndJoin(e *error, maybeCloser any) { } } +func encodeServerlessProgress(resourceType, cursor string) (string, error) { + progressBytes, err := json.Marshal(serverlessProgress{ + ResourceType: resourceType, + Cursor: cursor, + }) + if err != nil { + return "", fmt.Errorf("failed to encode backup progress: %w", err) + } + + return serverlessProgressPrefix + string(progressBytes), nil +} + +func decodeServerlessProgress(progress string) (string, string, error) { + if !strings.HasPrefix(progress, serverlessProgressPrefix) { + return "", progress, nil + } + + var state serverlessProgress + if err := json.Unmarshal([]byte(strings.TrimPrefix(progress, serverlessProgressPrefix)), &state); err != nil { + return "", "", fmt.Errorf("failed to decode backup progress: %w", err) + } + + if state.ResourceType == "" || state.Cursor == "" { + return "", "", errors.New("backup progress marker is missing required fields") + } + + return state.ResourceType, state.Cursor, nil +} + func backupCreateCmdFunc(cmd *cobra.Command, args []string) (err error) { backupFileName, err := computeBackupFileName(cmd, args) if err != nil { @@ -281,7 +318,8 @@ func takeBackup(ctx context.Context, spiceClient client.Client, encoder backupfo return err } - var cursor string + var resumeCursor string + var resumeCursorObj string if encoder == nil { fencoder, backupExisted, err := backupformat.NewFileEncoder(backupFileName) if err != nil { @@ -290,10 +328,17 @@ func takeBackup(ctx context.Context, spiceClient client.Client, encoder backupfo encoder = backupformat.WithProgress(backupformat.WithRewriter(rw, fencoder)) defer CloseAndJoin(&err, encoder) if backupExisted { - cursor, err = fencoder.Cursor() + progress, err := fencoder.Cursor() + if err != nil { + return err + } + resumeCursorObj, resumeCursor, err = decodeServerlessProgress(progress) if err != nil { return err } + if resumeCursor != "" && resumeCursorObj == "" { + return errors.New("cannot resume this backup: the saved serverless progress marker was created by an older zed version without resource type metadata; remove the incomplete backup and rerun the backup") + } } else { if err := encoder.WriteSchema(schemaResp.SchemaText, revision.Token); err != nil { return err @@ -306,14 +351,21 @@ func takeBackup(ctx context.Context, spiceClient client.Client, encoder backupfo return def.Name })).Msg("parsed object definitions") - var cursorObj string // Tracks the definition the cursor was on for _, def := range compiledSchema.ObjectDefinitions { + if resumeCursor != "" && def.Name != resumeCursorObj { + continue + } + req := &v1.ReadRelationshipsRequest{ RelationshipFilter: &v1.RelationshipFilter{ResourceType: def.Name}, OptionalLimit: pageLimit, } - if cursor != "" && cursorObj == def.Name { + cursor := "" + if resumeCursor != "" { + cursor = resumeCursor req.OptionalCursor = &v1.Cursor{Token: cursor} + resumeCursor = "" + resumeCursorObj = "" } else { req.Consistency = &v1.Consistency{ Requirement: &v1.Consistency_AtExactSnapshot{ @@ -345,9 +397,12 @@ func takeBackup(ctx context.Context, spiceClient client.Client, encoder backupfo return fmt.Errorf("aborted backup: %w", err) default: cursor = msg.AfterResultCursor.Token - cursorObj = def.Name log.Trace().Str("cursor", cursor).Stringer("relationship", msg.Relationship).Msg("appending relationship") - if err := encoder.Append(msg.Relationship, cursor); err != nil { + progress, err := encodeServerlessProgress(def.Name, cursor) + if err != nil { + return err + } + if err := encoder.Append(msg.Relationship, progress); err != nil { return err } } diff --git a/internal/cmd/backup_test.go b/internal/cmd/backup_test.go index 3fdf1562..3bd5d67a 100644 --- a/internal/cmd/backup_test.go +++ b/internal/cmd/backup_test.go @@ -698,6 +698,151 @@ func TestTakeBackupRecoversFromRetryableErrors(t *testing.T) { client.assertAllRecvCalls() } +func TestTakeBackupResumesServerlessBackupAtSavedDefinition(t *testing.T) { + schema := `definition doc { + relation view: user +} + +definition photo { + relation view: user +} + +definition user {}` + revision := &v1.ZedToken{Token: "serverless-revision"} + docRel := tuple.MustParseV1Rel("doc:first#view@user:tom") + photoRel1 := tuple.MustParseV1Rel("photo:first#view@user:tom") + photoRel2 := tuple.MustParseV1Rel("photo:second#view@user:tom") + + firstRun := &mockClientForServerlessBackup{ + t: t, + readSchemaCalls: []func() (*v1.ReadSchemaResponse, error){ + func() (*v1.ReadSchemaResponse, error) { + return &v1.ReadSchemaResponse{SchemaText: schema}, nil + }, + }, + readRelationshipCalls: []func(t *testing.T, req *v1.ReadRelationshipsRequest) (grpc.ServerStreamingClient[v1.ReadRelationshipsResponse], error){ + func(t *testing.T, req *v1.ReadRelationshipsRequest) (grpc.ServerStreamingClient[v1.ReadRelationshipsResponse], error) { + require.Equal(t, "doc", req.RelationshipFilter.ResourceType) + require.Equal(t, uint32(1), req.OptionalLimit) + require.NotNil(t, req.Consistency) + return &mockReadRelationshipsStream{ + recvCalls: []func() (*v1.ReadRelationshipsResponse, error){ + func() (*v1.ReadRelationshipsResponse, error) { + return &v1.ReadRelationshipsResponse{ + ReadAt: revision, + Relationship: docRel, + AfterResultCursor: &v1.Cursor{Token: "doc-revision-cursor"}, + }, nil + }, + }, + }, nil + }, + func(t *testing.T, req *v1.ReadRelationshipsRequest) (grpc.ServerStreamingClient[v1.ReadRelationshipsResponse], error) { + require.Equal(t, "doc", req.RelationshipFilter.ResourceType) + require.Nil(t, req.OptionalCursor) + require.NotNil(t, req.Consistency) + return &mockReadRelationshipsStream{ + recvCalls: []func() (*v1.ReadRelationshipsResponse, error){ + func() (*v1.ReadRelationshipsResponse, error) { + return &v1.ReadRelationshipsResponse{ + ReadAt: revision, + Relationship: docRel, + AfterResultCursor: &v1.Cursor{Token: "doc-cursor"}, + }, nil + }, + }, + }, nil + }, + func(t *testing.T, req *v1.ReadRelationshipsRequest) (grpc.ServerStreamingClient[v1.ReadRelationshipsResponse], error) { + require.Equal(t, "photo", req.RelationshipFilter.ResourceType) + require.Nil(t, req.OptionalCursor) + require.NotNil(t, req.Consistency) + return &mockReadRelationshipsStream{ + recvCalls: []func() (*v1.ReadRelationshipsResponse, error){ + func() (*v1.ReadRelationshipsResponse, error) { + return &v1.ReadRelationshipsResponse{ + ReadAt: revision, + Relationship: photoRel1, + AfterResultCursor: &v1.Cursor{Token: "photo-cursor-1"}, + }, nil + }, + func() (*v1.ReadRelationshipsResponse, error) { + return nil, status.Error(codes.Internal, "interrupted backup") + }, + }, + }, nil + }, + }, + } + + backupFileName := filepath.Join(t.TempDir(), "serverless.zedbackup") + err := takeBackup(t.Context(), firstRun, nil, backupFileName, &backupformat.NoopRewriter{}, 0) + require.ErrorContains(t, err, "interrupted backup") + require.FileExists(t, backupFileName+".lock") + + secondRun := &mockClientForServerlessBackup{ + t: t, + readSchemaCalls: []func() (*v1.ReadSchemaResponse, error){ + func() (*v1.ReadSchemaResponse, error) { + return &v1.ReadSchemaResponse{SchemaText: schema}, nil + }, + }, + readRelationshipCalls: []func(t *testing.T, req *v1.ReadRelationshipsRequest) (grpc.ServerStreamingClient[v1.ReadRelationshipsResponse], error){ + func(t *testing.T, req *v1.ReadRelationshipsRequest) (grpc.ServerStreamingClient[v1.ReadRelationshipsResponse], error) { + require.Equal(t, "doc", req.RelationshipFilter.ResourceType) + require.Equal(t, uint32(1), req.OptionalLimit) + require.NotNil(t, req.Consistency) + return &mockReadRelationshipsStream{ + recvCalls: []func() (*v1.ReadRelationshipsResponse, error){ + func() (*v1.ReadRelationshipsResponse, error) { + return &v1.ReadRelationshipsResponse{ + ReadAt: revision, + Relationship: docRel, + AfterResultCursor: &v1.Cursor{Token: "doc-revision-cursor"}, + }, nil + }, + }, + }, nil + }, + func(t *testing.T, req *v1.ReadRelationshipsRequest) (grpc.ServerStreamingClient[v1.ReadRelationshipsResponse], error) { + require.Equal(t, "photo", req.RelationshipFilter.ResourceType) + require.NotNil(t, req.OptionalCursor) + require.Equal(t, "photo-cursor-1", req.OptionalCursor.Token) + require.Nil(t, req.Consistency) + return &mockReadRelationshipsStream{ + recvCalls: []func() (*v1.ReadRelationshipsResponse, error){ + func() (*v1.ReadRelationshipsResponse, error) { + return &v1.ReadRelationshipsResponse{ + ReadAt: revision, + Relationship: photoRel2, + AfterResultCursor: &v1.Cursor{Token: "photo-cursor-2"}, + }, nil + }, + }, + }, nil + }, + func(t *testing.T, req *v1.ReadRelationshipsRequest) (grpc.ServerStreamingClient[v1.ReadRelationshipsResponse], error) { + require.Equal(t, "user", req.RelationshipFilter.ResourceType) + require.Nil(t, req.OptionalCursor) + require.NotNil(t, req.Consistency) + return &mockReadRelationshipsStream{}, nil + }, + }, + } + + err = takeBackup(t.Context(), secondRun, nil, backupFileName, &backupformat.NoopRewriter{}, 0) + require.NoError(t, err) + require.NoFileExists(t, backupFileName+".lock") + + validateBackup(t, backupFileName, schema, revision, []string{ + "doc:first#view@user:tom", + "photo:first#view@user:tom", + "photo:second#view@user:tom", + }) + firstRun.assertAllReadRelationshipCalls() + secondRun.assertAllReadRelationshipCalls() +} + func TestRevisionForServerless(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -817,3 +962,54 @@ func (m *mockClientForBackup) ExportBulkRelationships(_ context.Context, req *v1 func (m *mockClientForBackup) assertAllRecvCalls() { require.Equal(m.t, len(m.recvCalls), m.recvCallIndex, "the number of provided recvCalls should match the number of invocations") } + +type mockClientForServerlessBackup struct { + client.Client + t *testing.T + readSchemaCalls []func() (*v1.ReadSchemaResponse, error) + readSchemaCallsIndex int + readRelationshipCalls []func(t *testing.T, req *v1.ReadRelationshipsRequest) (grpc.ServerStreamingClient[v1.ReadRelationshipsResponse], error) + readCallsIndex int +} + +func (m *mockClientForServerlessBackup) ReadSchema(_ context.Context, _ *v1.ReadSchemaRequest, _ ...grpc.CallOption) (*v1.ReadSchemaResponse, error) { + if m.readSchemaCallsIndex == len(m.readSchemaCalls) { + m.t.FailNow() + return nil, nil + } + + readSchemaCall := m.readSchemaCalls[m.readSchemaCallsIndex] + m.readSchemaCallsIndex++ + return readSchemaCall() +} + +func (m *mockClientForServerlessBackup) ReadRelationships(_ context.Context, req *v1.ReadRelationshipsRequest, _ ...grpc.CallOption) (grpc.ServerStreamingClient[v1.ReadRelationshipsResponse], error) { + if m.readCallsIndex == len(m.readRelationshipCalls) { + m.t.FailNow() + return nil, nil + } + + readRelationshipsCall := m.readRelationshipCalls[m.readCallsIndex] + m.readCallsIndex++ + return readRelationshipsCall(m.t, req) +} + +func (m *mockClientForServerlessBackup) assertAllReadRelationshipCalls() { + require.Equal(m.t, len(m.readRelationshipCalls), m.readCallsIndex, "the number of provided read relationship calls should match the number of invocations") +} + +type mockReadRelationshipsStream struct { + grpc.ServerStreamingClient[v1.ReadRelationshipsResponse] + recvCalls []func() (*v1.ReadRelationshipsResponse, error) + recvCallIndex int +} + +func (m *mockReadRelationshipsStream) Recv() (*v1.ReadRelationshipsResponse, error) { + if m.recvCallIndex == len(m.recvCalls) { + return nil, io.EOF + } + + recvCall := m.recvCalls[m.recvCallIndex] + m.recvCallIndex++ + return recvCall() +}