Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ const (
defaultRetryJitterFraction = 0.5
importBulkRoute = "/authzed.api.v1.PermissionsService/ImportBulkRelationships"
exportBulkRoute = "/authzed.api.v1.PermissionsService/ExportBulkRelationships"
readRelationshipsRoute = "/authzed.api.v1.PermissionsService/ReadRelationships"
watchRoute = "/authzed.api.v1.WatchService/Watch"
)

Expand Down Expand Up @@ -236,7 +237,9 @@ func DialOptsFromFlags(cmd *cobra.Command, token storage.Token) ([]grpc.DialOpti
// retrying the bulk import in backup/restore logic is handled manually.
// retrying bulk export is also handled manually, because the default behavior is
// to start at the beginning of the stream, which produces duplicate values.
selector.StreamClientInterceptor(retry.StreamClientInterceptor(retryOpts...), selector.MatchFunc(isNoneOf(importBulkRoute, exportBulkRoute, watchRoute))),
// read relationships is also excluded, because replaying that stream silently
// duplicates results for callers that persist or print the stream incrementally.
selector.StreamClientInterceptor(retry.StreamClientInterceptor(retryOpts...), selector.MatchFunc(isNoneOf(importBulkRoute, exportBulkRoute, readRelationshipsRoute, watchRoute))),
}

if !cobrautil.MustGetBool(cmd, "skip-version-check") {
Expand Down
15 changes: 15 additions & 0 deletions internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ func (fss *fakeServer) Watch(*v1.WatchRequest, grpc.ServerStreamingServer[v1.Wat
return status.Errorf(codes.Unavailable, "")
}

func (fss *fakeServer) ReadRelationships(*v1.ReadRelationshipsRequest, grpc.ServerStreamingServer[v1.ReadRelationshipsResponse]) error {
fss.testFunc()
return status.Errorf(codes.Unavailable, "")
}

func TestRetries(t *testing.T) {
ctx := t.Context()
var callCount uint
Expand Down Expand Up @@ -254,4 +259,14 @@ func TestDoesNotRetry(t *testing.T) {
grpcutil.RequireStatus(t, codes.Unavailable, err)
require.Equal(t, uint(1), callCount)
})

t.Run("read_relationships", func(t *testing.T) {
callCount = 0
stream, err := c.ReadRelationships(ctx, &v1.ReadRelationshipsRequest{})
require.NoError(t, err)
resp, err := stream.Recv()
require.Nil(t, resp)
grpcutil.RequireStatus(t, codes.Unavailable, err)
require.Equal(t, uint(1), callCount)
})
}
67 changes: 61 additions & 6 deletions internal/cmd/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cmd

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -35,6 +36,13 @@ import (

const doNotReturnIfExists = false

const serverlessProgressPrefix = "serverless:"

type serverlessProgress struct {
ResourceType string `json:"resource_type"`
Cursor string `json:"cursor"`
}

// cobraRunEFunc is the signature of a cobra.Command.RunE function.
type cobraRunEFunc = func(cmd *cobra.Command, args []string) (err error)

Expand Down Expand Up @@ -236,6 +244,35 @@ func CloseAndJoin(e *error, maybeCloser any) {
}
}

func encodeServerlessProgress(resourceType, cursor string) (string, error) {
progressBytes, err := json.Marshal(serverlessProgress{
ResourceType: resourceType,
Cursor: cursor,
})
if err != nil {
return "", fmt.Errorf("failed to encode backup progress: %w", err)
}

return serverlessProgressPrefix + string(progressBytes), nil
}

func decodeServerlessProgress(progress string) (string, string, error) {
if !strings.HasPrefix(progress, serverlessProgressPrefix) {
return "", progress, nil
}

var state serverlessProgress
if err := json.Unmarshal([]byte(strings.TrimPrefix(progress, serverlessProgressPrefix)), &state); err != nil {
return "", "", fmt.Errorf("failed to decode backup progress: %w", err)
}

if state.ResourceType == "" || state.Cursor == "" {
return "", "", errors.New("backup progress marker is missing required fields")
}

return state.ResourceType, state.Cursor, nil
}

func backupCreateCmdFunc(cmd *cobra.Command, args []string) (err error) {
backupFileName, err := computeBackupFileName(cmd, args)
if err != nil {
Expand Down Expand Up @@ -281,7 +318,8 @@ func takeBackup(ctx context.Context, spiceClient client.Client, encoder backupfo
return err
}

var cursor string
var resumeCursor string
var resumeCursorObj string
if encoder == nil {
fencoder, backupExisted, err := backupformat.NewFileEncoder(backupFileName)
if err != nil {
Expand All @@ -290,10 +328,17 @@ func takeBackup(ctx context.Context, spiceClient client.Client, encoder backupfo
encoder = backupformat.WithProgress(backupformat.WithRewriter(rw, fencoder))
defer CloseAndJoin(&err, encoder)
if backupExisted {
cursor, err = fencoder.Cursor()
progress, err := fencoder.Cursor()
if err != nil {
return err
}
resumeCursorObj, resumeCursor, err = decodeServerlessProgress(progress)
if err != nil {
return err
}
if resumeCursor != "" && resumeCursorObj == "" {
return errors.New("cannot resume this backup: the saved serverless progress marker was created by an older zed version without resource type metadata; remove the incomplete backup and rerun the backup")
}
} else {
if err := encoder.WriteSchema(schemaResp.SchemaText, revision.Token); err != nil {
return err
Expand All @@ -306,14 +351,21 @@ func takeBackup(ctx context.Context, spiceClient client.Client, encoder backupfo
return def.Name
})).Msg("parsed object definitions")

var cursorObj string // Tracks the definition the cursor was on
for _, def := range compiledSchema.ObjectDefinitions {
if resumeCursor != "" && def.Name != resumeCursorObj {
continue
}

req := &v1.ReadRelationshipsRequest{
RelationshipFilter: &v1.RelationshipFilter{ResourceType: def.Name},
OptionalLimit: pageLimit,
}
if cursor != "" && cursorObj == def.Name {
cursor := ""
if resumeCursor != "" {
cursor = resumeCursor
req.OptionalCursor = &v1.Cursor{Token: cursor}
resumeCursor = ""
resumeCursorObj = ""
} else {
req.Consistency = &v1.Consistency{
Requirement: &v1.Consistency_AtExactSnapshot{
Expand Down Expand Up @@ -345,9 +397,12 @@ func takeBackup(ctx context.Context, spiceClient client.Client, encoder backupfo
return fmt.Errorf("aborted backup: %w", err)
default:
cursor = msg.AfterResultCursor.Token
cursorObj = def.Name
log.Trace().Str("cursor", cursor).Stringer("relationship", msg.Relationship).Msg("appending relationship")
if err := encoder.Append(msg.Relationship, cursor); err != nil {
progress, err := encodeServerlessProgress(def.Name, cursor)
if err != nil {
return err
}
if err := encoder.Append(msg.Relationship, progress); err != nil {
return err
}
}
Expand Down
196 changes: 196 additions & 0 deletions internal/cmd/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,151 @@
client.assertAllRecvCalls()
}

func TestTakeBackupResumesServerlessBackupAtSavedDefinition(t *testing.T) {
schema := `definition doc {
relation view: user
}

definition photo {
relation view: user
}

definition user {}`
revision := &v1.ZedToken{Token: "serverless-revision"}
docRel := tuple.MustParseV1Rel("doc:first#view@user:tom")
photoRel1 := tuple.MustParseV1Rel("photo:first#view@user:tom")
photoRel2 := tuple.MustParseV1Rel("photo:second#view@user:tom")

firstRun := &mockClientForServerlessBackup{
t: t,
readSchemaCalls: []func() (*v1.ReadSchemaResponse, error){
func() (*v1.ReadSchemaResponse, error) {
return &v1.ReadSchemaResponse{SchemaText: schema}, nil
},
},
readRelationshipCalls: []func(t *testing.T, req *v1.ReadRelationshipsRequest) (grpc.ServerStreamingClient[v1.ReadRelationshipsResponse], error){
func(t *testing.T, req *v1.ReadRelationshipsRequest) (grpc.ServerStreamingClient[v1.ReadRelationshipsResponse], error) {
require.Equal(t, "doc", req.RelationshipFilter.ResourceType)
require.Equal(t, uint32(1), req.OptionalLimit)
require.NotNil(t, req.Consistency)
return &mockReadRelationshipsStream{
recvCalls: []func() (*v1.ReadRelationshipsResponse, error){
func() (*v1.ReadRelationshipsResponse, error) {
return &v1.ReadRelationshipsResponse{
ReadAt: revision,
Relationship: docRel,
AfterResultCursor: &v1.Cursor{Token: "doc-revision-cursor"},

Check failure on line 734 in internal/cmd/backup_test.go

View workflow job for this annotation

GitHub Actions / Lint Go

G101: Potential hardcoded credentials (gosec)
}, nil
},
},
}, nil
},
func(t *testing.T, req *v1.ReadRelationshipsRequest) (grpc.ServerStreamingClient[v1.ReadRelationshipsResponse], error) {
require.Equal(t, "doc", req.RelationshipFilter.ResourceType)
require.Nil(t, req.OptionalCursor)
require.NotNil(t, req.Consistency)
return &mockReadRelationshipsStream{
recvCalls: []func() (*v1.ReadRelationshipsResponse, error){
func() (*v1.ReadRelationshipsResponse, error) {
return &v1.ReadRelationshipsResponse{
ReadAt: revision,
Relationship: docRel,
AfterResultCursor: &v1.Cursor{Token: "doc-cursor"},

Check failure on line 750 in internal/cmd/backup_test.go

View workflow job for this annotation

GitHub Actions / Lint Go

G101: Potential hardcoded credentials (gosec)
}, nil
},
},
}, nil
},
func(t *testing.T, req *v1.ReadRelationshipsRequest) (grpc.ServerStreamingClient[v1.ReadRelationshipsResponse], error) {
require.Equal(t, "photo", req.RelationshipFilter.ResourceType)
require.Nil(t, req.OptionalCursor)
require.NotNil(t, req.Consistency)
return &mockReadRelationshipsStream{
recvCalls: []func() (*v1.ReadRelationshipsResponse, error){
func() (*v1.ReadRelationshipsResponse, error) {
return &v1.ReadRelationshipsResponse{
ReadAt: revision,
Relationship: photoRel1,
AfterResultCursor: &v1.Cursor{Token: "photo-cursor-1"},

Check failure on line 766 in internal/cmd/backup_test.go

View workflow job for this annotation

GitHub Actions / Lint Go

G101: Potential hardcoded credentials (gosec)
}, nil
},
func() (*v1.ReadRelationshipsResponse, error) {
return nil, status.Error(codes.Internal, "interrupted backup")
},
},
}, nil
},
},
}

backupFileName := filepath.Join(t.TempDir(), "serverless.zedbackup")
err := takeBackup(t.Context(), firstRun, nil, backupFileName, &backupformat.NoopRewriter{}, 0)
require.ErrorContains(t, err, "interrupted backup")
require.FileExists(t, backupFileName+".lock")

secondRun := &mockClientForServerlessBackup{
t: t,
readSchemaCalls: []func() (*v1.ReadSchemaResponse, error){
func() (*v1.ReadSchemaResponse, error) {
return &v1.ReadSchemaResponse{SchemaText: schema}, nil
},
},
readRelationshipCalls: []func(t *testing.T, req *v1.ReadRelationshipsRequest) (grpc.ServerStreamingClient[v1.ReadRelationshipsResponse], error){
func(t *testing.T, req *v1.ReadRelationshipsRequest) (grpc.ServerStreamingClient[v1.ReadRelationshipsResponse], error) {
require.Equal(t, "doc", req.RelationshipFilter.ResourceType)
require.Equal(t, uint32(1), req.OptionalLimit)
require.NotNil(t, req.Consistency)
return &mockReadRelationshipsStream{
recvCalls: []func() (*v1.ReadRelationshipsResponse, error){
func() (*v1.ReadRelationshipsResponse, error) {
return &v1.ReadRelationshipsResponse{
ReadAt: revision,
Relationship: docRel,
AfterResultCursor: &v1.Cursor{Token: "doc-revision-cursor"},
}, nil
},
},
}, nil
},
func(t *testing.T, req *v1.ReadRelationshipsRequest) (grpc.ServerStreamingClient[v1.ReadRelationshipsResponse], error) {
require.Equal(t, "photo", req.RelationshipFilter.ResourceType)
require.NotNil(t, req.OptionalCursor)
require.Equal(t, "photo-cursor-1", req.OptionalCursor.Token)
require.Nil(t, req.Consistency)
return &mockReadRelationshipsStream{
recvCalls: []func() (*v1.ReadRelationshipsResponse, error){
func() (*v1.ReadRelationshipsResponse, error) {
return &v1.ReadRelationshipsResponse{
ReadAt: revision,
Relationship: photoRel2,
AfterResultCursor: &v1.Cursor{Token: "photo-cursor-2"},
}, nil
},
},
}, nil
},
func(t *testing.T, req *v1.ReadRelationshipsRequest) (grpc.ServerStreamingClient[v1.ReadRelationshipsResponse], error) {
require.Equal(t, "user", req.RelationshipFilter.ResourceType)
require.Nil(t, req.OptionalCursor)
require.NotNil(t, req.Consistency)
return &mockReadRelationshipsStream{}, nil
},
},
}

err = takeBackup(t.Context(), secondRun, nil, backupFileName, &backupformat.NoopRewriter{}, 0)
require.NoError(t, err)
require.NoFileExists(t, backupFileName+".lock")

validateBackup(t, backupFileName, schema, revision, []string{
"doc:first#view@user:tom",
"photo:first#view@user:tom",
"photo:second#view@user:tom",
})
firstRun.assertAllReadRelationshipCalls()
secondRun.assertAllReadRelationshipCalls()
}

func TestRevisionForServerless(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -817,3 +962,54 @@
func (m *mockClientForBackup) assertAllRecvCalls() {
require.Equal(m.t, len(m.recvCalls), m.recvCallIndex, "the number of provided recvCalls should match the number of invocations")
}

type mockClientForServerlessBackup struct {
client.Client
t *testing.T
readSchemaCalls []func() (*v1.ReadSchemaResponse, error)
readSchemaCallsIndex int
readRelationshipCalls []func(t *testing.T, req *v1.ReadRelationshipsRequest) (grpc.ServerStreamingClient[v1.ReadRelationshipsResponse], error)
readCallsIndex int
}

func (m *mockClientForServerlessBackup) ReadSchema(_ context.Context, _ *v1.ReadSchemaRequest, _ ...grpc.CallOption) (*v1.ReadSchemaResponse, error) {
if m.readSchemaCallsIndex == len(m.readSchemaCalls) {
m.t.FailNow()
return nil, nil
}

readSchemaCall := m.readSchemaCalls[m.readSchemaCallsIndex]
m.readSchemaCallsIndex++
return readSchemaCall()
}

func (m *mockClientForServerlessBackup) ReadRelationships(_ context.Context, req *v1.ReadRelationshipsRequest, _ ...grpc.CallOption) (grpc.ServerStreamingClient[v1.ReadRelationshipsResponse], error) {
if m.readCallsIndex == len(m.readRelationshipCalls) {
m.t.FailNow()
return nil, nil
}

readRelationshipsCall := m.readRelationshipCalls[m.readCallsIndex]
m.readCallsIndex++
return readRelationshipsCall(m.t, req)
}

func (m *mockClientForServerlessBackup) assertAllReadRelationshipCalls() {
require.Equal(m.t, len(m.readRelationshipCalls), m.readCallsIndex, "the number of provided read relationship calls should match the number of invocations")
}

type mockReadRelationshipsStream struct {
grpc.ServerStreamingClient[v1.ReadRelationshipsResponse]
recvCalls []func() (*v1.ReadRelationshipsResponse, error)
recvCallIndex int
}

func (m *mockReadRelationshipsStream) Recv() (*v1.ReadRelationshipsResponse, error) {
if m.recvCallIndex == len(m.recvCalls) {
return nil, io.EOF
}

recvCall := m.recvCalls[m.recvCallIndex]
m.recvCallIndex++
return recvCall()
}
Loading