From 5896ae4fc5389628439d9bd92e2e7d2cd7c57bb4 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Thu, 11 Dec 2025 15:14:27 +0000 Subject: [PATCH 01/10] Fs cp now upload in parallel --- NEXT_CHANGELOG.md | 3 ++ cmd/fs/cp.go | 81 +++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 70 insertions(+), 14 deletions(-) diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md index 7f34ce2a5e..7a9476e5a1 100644 --- a/NEXT_CHANGELOG.md +++ b/NEXT_CHANGELOG.md @@ -6,6 +6,9 @@ ### CLI +* Improve performance of `databricks fs cp` command by parallelizing file uploads when + copying directories with the `--recursive` flag. + ### Bundles ### Dependency updates diff --git a/cmd/fs/cp.go b/cmd/fs/cp.go index 275620d7c3..31b21031f7 100644 --- a/cmd/fs/cp.go +++ b/cmd/fs/cp.go @@ -9,13 +9,19 @@ import ( "path" "path/filepath" "strings" + "sync" "github.com/databricks/cli/cmd/root" "github.com/databricks/cli/libs/cmdio" "github.com/databricks/cli/libs/filer" "github.com/spf13/cobra" + "golang.org/x/sync/errgroup" ) +// Maximum number of concurrent file copy operations. The number was set +// conservatively and might be adjusted in the future. +const maxWorkers = 16 + type copy struct { overwrite bool recursive bool @@ -25,40 +31,83 @@ type copy struct { targetFiler filer.Filer sourceScheme string targetScheme string + + mu sync.Mutex // protect output from concurrent writes +} + +type copyTask struct { + sourcePath string + targetPath string } -func (c *copy) cpWriteCallback(sourceDir, targetDir string) fs.WalkDirFunc { +// cpDirToDir recursively copies the contents of a directory to another +// directory. +// +// There is no guarantee on the order in which the files are copied. +// +// The method does not take care of retrying on error; this is considered to +// be the responsibility of the Filer implementation. If a file copy fails, +// the error is returned and the other copies are cancelled. +func (c *copy) cpDirToDir(sourceDir, targetDir string) error { + if !c.recursive { + return fmt.Errorf("source path %s is a directory. Please specify the --recursive flag", sourceDir) + } + + // Walk the source directory and collect all files to copy. + var tasks []copyTask + sourceFs := filer.NewFS(c.ctx, c.sourceFiler) + err := fs.WalkDir(sourceFs, sourceDir, c.cpCollectCallback(sourceDir, targetDir, &tasks)) + if err != nil { + return err + } + + if len(tasks) == 0 { + return nil // no file to copy + } + + // Process each file copy in parallel using a fixed number of workers. + g, ctx := errgroup.WithContext(c.ctx) + g.SetLimit(min(maxWorkers, len(tasks))) + for _, task := range tasks { + g.Go(func() error { + if ctx.Err() != nil { + return ctx.Err() + } + return c.cpFileToFile(task.sourcePath, task.targetPath) + }) + } + + return g.Wait() +} + +func (c *copy) cpCollectCallback(sourceDir, targetDir string, tasks *[]copyTask) fs.WalkDirFunc { return func(sourcePath string, d fs.DirEntry, err error) error { if err != nil { return err } - // Compute path relative to the target directory + // Compute path relative to the source directory. relPath, err := filepath.Rel(sourceDir, sourcePath) if err != nil { return err } relPath = filepath.ToSlash(relPath) - // Compute target path for the file + // Compute target path for the file. targetPath := path.Join(targetDir, relPath) - // create directory and return early + // Create directory and return early. if d.IsDir() { return c.targetFiler.Mkdir(c.ctx, targetPath) } - return c.cpFileToFile(sourcePath, targetPath) - } -} - -func (c *copy) cpDirToDir(sourceDir, targetDir string) error { - if !c.recursive { - return fmt.Errorf("source path %s is a directory. Please specify the --recursive flag", sourceDir) + // Collect file copy tasks. + *tasks = append(*tasks, copyTask{ + sourcePath: sourcePath, + targetPath: targetPath, + }) + return nil } - - sourceFs := filer.NewFS(c.ctx, c.sourceFiler) - return fs.WalkDir(sourceFs, sourceDir, c.cpWriteCallback(sourceDir, targetDir)) } func (c *copy) cpFileToDir(sourcePath, targetDir string) error { @@ -109,6 +158,8 @@ func (c *copy) emitFileSkippedEvent(sourcePath, targetPath string) error { event := newFileSkippedEvent(fullSourcePath, fullTargetPath) template := "{{.SourcePath}} -> {{.TargetPath}} (skipped; already exists)\n" + c.mu.Lock() + defer c.mu.Unlock() return cmdio.RenderWithTemplate(c.ctx, event, "", template) } @@ -125,6 +176,8 @@ func (c *copy) emitFileCopiedEvent(sourcePath, targetPath string) error { event := newFileCopiedEvent(fullSourcePath, fullTargetPath) template := "{{.SourcePath}} -> {{.TargetPath}}\n" + c.mu.Lock() + defer c.mu.Unlock() return cmdio.RenderWithTemplate(c.ctx, event, "", template) } From b9333981ca5727817ceb3e31138c4233562c00f7 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Thu, 11 Dec 2025 21:00:22 +0000 Subject: [PATCH 02/10] Improve performance --- cmd/fs/cp.go | 76 ++++++++++++++++++++--------------------------- cmd/fs/cp_test.go | 27 +++++++++++++++++ 2 files changed, 59 insertions(+), 44 deletions(-) create mode 100644 cmd/fs/cp_test.go diff --git a/cmd/fs/cp.go b/cmd/fs/cp.go index 31b21031f7..a80ce53c74 100644 --- a/cmd/fs/cp.go +++ b/cmd/fs/cp.go @@ -18,13 +18,13 @@ import ( "golang.org/x/sync/errgroup" ) -// Maximum number of concurrent file copy operations. The number was set -// conservatively and might be adjusted in the future. -const maxWorkers = 16 +// Default number of concurrent file copy operations. +const defaultConcurrency = 16 type copy struct { - overwrite bool - recursive bool + overwrite bool + recursive bool + concurrency int ctx context.Context sourceFiler filer.Filer @@ -35,11 +35,6 @@ type copy struct { mu sync.Mutex // protect output from concurrent writes } -type copyTask struct { - sourcePath string - targetPath string -} - // cpDirToDir recursively copies the contents of a directory to another // directory. // @@ -53,35 +48,13 @@ func (c *copy) cpDirToDir(sourceDir, targetDir string) error { return fmt.Errorf("source path %s is a directory. Please specify the --recursive flag", sourceDir) } - // Walk the source directory and collect all files to copy. - var tasks []copyTask - sourceFs := filer.NewFS(c.ctx, c.sourceFiler) - err := fs.WalkDir(sourceFs, sourceDir, c.cpCollectCallback(sourceDir, targetDir, &tasks)) - if err != nil { - return err - } - - if len(tasks) == 0 { - return nil // no file to copy - } - - // Process each file copy in parallel using a fixed number of workers. + // Pool of workers to process copy operations in parallel. g, ctx := errgroup.WithContext(c.ctx) - g.SetLimit(min(maxWorkers, len(tasks))) - for _, task := range tasks { - g.Go(func() error { - if ctx.Err() != nil { - return ctx.Err() - } - return c.cpFileToFile(task.sourcePath, task.targetPath) - }) - } + g.SetLimit(c.concurrency) - return g.Wait() -} - -func (c *copy) cpCollectCallback(sourceDir, targetDir string, tasks *[]copyTask) fs.WalkDirFunc { - return func(sourcePath string, d fs.DirEntry, err error) error { + // Walk the source directory, queueing copy operations for processing. + sourceFs := filer.NewFS(c.ctx, c.sourceFiler) + err := fs.WalkDir(sourceFs, sourceDir, func(sourcePath string, d fs.DirEntry, err error) error { if err != nil { return err } @@ -96,18 +69,26 @@ func (c *copy) cpCollectCallback(sourceDir, targetDir string, tasks *[]copyTask) // Compute target path for the file. targetPath := path.Join(targetDir, relPath) - // Create directory and return early. + // Create the directory synchronously. This must happen before files + // are copied into it, and WalkDir guarantees directories are visited + // before their contents. if d.IsDir() { return c.targetFiler.Mkdir(c.ctx, targetPath) } - // Collect file copy tasks. - *tasks = append(*tasks, copyTask{ - sourcePath: sourcePath, - targetPath: targetPath, + // Queue file copy operation for processing. + g.Go(func() error { + if ctx.Err() != nil { + return ctx.Err() + } + return c.cpFileToFile(sourcePath, targetPath) }) return nil + }) + if err != nil { + return err } + return g.Wait() } func (c *copy) cpFileToDir(sourcePath, targetDir string) error { @@ -206,13 +187,20 @@ func newCpCommand() *cobra.Command { When copying a file, if TARGET_PATH is a directory, the file will be created inside the directory, otherwise the file is created at TARGET_PATH. `, - Args: root.ExactArgs(2), - PreRunE: root.MustWorkspaceClient, + Args: root.ExactArgs(2), } var c copy cmd.Flags().BoolVar(&c.overwrite, "overwrite", false, "overwrite existing files") cmd.Flags().BoolVarP(&c.recursive, "recursive", "r", false, "recursively copy files from directory") + cmd.Flags().IntVar(&c.concurrency, "concurrency", defaultConcurrency, "number of parallel copy operations") + + cmd.PreRunE = func(cmd *cobra.Command, args []string) error { + if c.concurrency <= 0 { + return fmt.Errorf("--concurrency must be at least 1") + } + return root.MustWorkspaceClient(cmd, args) + } cmd.RunE = func(cmd *cobra.Command, args []string) error { ctx := cmd.Context() diff --git a/cmd/fs/cp_test.go b/cmd/fs/cp_test.go new file mode 100644 index 0000000000..87c1170b7e --- /dev/null +++ b/cmd/fs/cp_test.go @@ -0,0 +1,27 @@ +package fs + +import ( + "context" + "strings" + "testing" +) + +func TestCpConcurrencyValidation(t *testing.T) { + ctx := context.Background() + cmd := newCpCommand() + cmd.SetContext(ctx) + + // Test concurrency = 0 + cmd.SetArgs([]string{"src", "dst", "--concurrency", "0"}) + err := cmd.Execute() + if err == nil || !strings.Contains(err.Error(), "--concurrency must be at least 1") { + t.Errorf("expected error containing '--concurrency must be at least 1', got %v", err) + } + + // Test concurrency = -1 + cmd.SetArgs([]string{"src", "dst", "--concurrency", "-1"}) + err = cmd.Execute() + if err == nil || !strings.Contains(err.Error(), "--concurrency must be at least 1") { + t.Errorf("expected error containing '--concurrency must be at least 1', got %v", err) + } +} From 2fa03e1965a3211b9de4a64a8d9e5200d0e7dc36 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Thu, 11 Dec 2025 21:11:29 +0000 Subject: [PATCH 03/10] Use error value --- cmd/fs/cp.go | 6 +++++- cmd/fs/cp_test.go | 34 ++++++++++++++++++---------------- 2 files changed, 23 insertions(+), 17 deletions(-) diff --git a/cmd/fs/cp.go b/cmd/fs/cp.go index a80ce53c74..24cb333aa7 100644 --- a/cmd/fs/cp.go +++ b/cmd/fs/cp.go @@ -21,6 +21,10 @@ import ( // Default number of concurrent file copy operations. const defaultConcurrency = 16 +// errInvalidConcurrency is returned when the value of the concurrency +// flag is invalid. +var errInvalidConcurrency = errors.New("--concurrency must be at least 1") + type copy struct { overwrite bool recursive bool @@ -197,7 +201,7 @@ func newCpCommand() *cobra.Command { cmd.PreRunE = func(cmd *cobra.Command, args []string) error { if c.concurrency <= 0 { - return fmt.Errorf("--concurrency must be at least 1") + return errInvalidConcurrency } return root.MustWorkspaceClient(cmd, args) } diff --git a/cmd/fs/cp_test.go b/cmd/fs/cp_test.go index 87c1170b7e..1d719368da 100644 --- a/cmd/fs/cp_test.go +++ b/cmd/fs/cp_test.go @@ -1,27 +1,29 @@ package fs import ( - "context" - "strings" + "errors" + "fmt" "testing" ) func TestCpConcurrencyValidation(t *testing.T) { - ctx := context.Background() - cmd := newCpCommand() - cmd.SetContext(ctx) - - // Test concurrency = 0 - cmd.SetArgs([]string{"src", "dst", "--concurrency", "0"}) - err := cmd.Execute() - if err == nil || !strings.Contains(err.Error(), "--concurrency must be at least 1") { - t.Errorf("expected error containing '--concurrency must be at least 1', got %v", err) + testCases := []struct { + concurrency int + wantError error + }{ + {-1337, errInvalidConcurrency}, + {-1, errInvalidConcurrency}, + {0, errInvalidConcurrency}, } - // Test concurrency = -1 - cmd.SetArgs([]string{"src", "dst", "--concurrency", "-1"}) - err = cmd.Execute() - if err == nil || !strings.Contains(err.Error(), "--concurrency must be at least 1") { - t.Errorf("expected error containing '--concurrency must be at least 1', got %v", err) + for _, tc := range testCases { + t.Run(fmt.Sprintf("concurrency=%d", tc.concurrency), func(t *testing.T) { + cmd := newCpCommand() + cmd.SetArgs([]string{"src", "dst", "--concurrency", fmt.Sprintf("%d", tc.concurrency)}) + err := cmd.Execute() + if !errors.Is(err, tc.wantError) { + t.Errorf("expected error %v, got %v", tc.wantError, err) + } + }) } } From d393388686c299298f588244c88dded8d202821c Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Thu, 11 Dec 2025 21:31:55 +0000 Subject: [PATCH 04/10] Cancel go routines --- cmd/fs/cp.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/cmd/fs/cp.go b/cmd/fs/cp.go index 24cb333aa7..0bbdc2c631 100644 --- a/cmd/fs/cp.go +++ b/cmd/fs/cp.go @@ -52,11 +52,16 @@ func (c *copy) cpDirToDir(sourceDir, targetDir string) error { return fmt.Errorf("source path %s is a directory. Please specify the --recursive flag", sourceDir) } + // Create cancellable context to ensure cleanup and that all goroutines + // are stopped when the function exits on any error path. + ctx, cancel := context.WithCancel(c.ctx) + defer cancel() + // Pool of workers to process copy operations in parallel. - g, ctx := errgroup.WithContext(c.ctx) + g, ctx := errgroup.WithContext(ctx) g.SetLimit(c.concurrency) - // Walk the source directory, queueing copy operations for processing. + // Walk the source directory, queueing file copy operations for processing. sourceFs := filer.NewFS(c.ctx, c.sourceFiler) err := fs.WalkDir(sourceFs, sourceDir, func(sourcePath string, d fs.DirEntry, err error) error { if err != nil { From 14cfad9121748b281284e33b14665aa4609a6f8f Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Thu, 11 Dec 2025 21:37:22 +0000 Subject: [PATCH 05/10] Properly propagate context --- cmd/fs/cp.go | 45 ++++++++++++++++++++++----------------------- 1 file changed, 22 insertions(+), 23 deletions(-) diff --git a/cmd/fs/cp.go b/cmd/fs/cp.go index 0bbdc2c631..6149b4c611 100644 --- a/cmd/fs/cp.go +++ b/cmd/fs/cp.go @@ -30,7 +30,6 @@ type copy struct { recursive bool concurrency int - ctx context.Context sourceFiler filer.Filer targetFiler filer.Filer sourceScheme string @@ -47,14 +46,15 @@ type copy struct { // The method does not take care of retrying on error; this is considered to // be the responsibility of the Filer implementation. If a file copy fails, // the error is returned and the other copies are cancelled. -func (c *copy) cpDirToDir(sourceDir, targetDir string) error { +func (c *copy) cpDirToDir(ctx context.Context, sourceDir, targetDir string) error { if !c.recursive { return fmt.Errorf("source path %s is a directory. Please specify the --recursive flag", sourceDir) } // Create cancellable context to ensure cleanup and that all goroutines - // are stopped when the function exits on any error path. - ctx, cancel := context.WithCancel(c.ctx) + // are stopped when the function exits on any error path (e.g. permission + // denied when walking the source directory). + ctx, cancel := context.WithCancel(ctx) defer cancel() // Pool of workers to process copy operations in parallel. @@ -62,7 +62,7 @@ func (c *copy) cpDirToDir(sourceDir, targetDir string) error { g.SetLimit(c.concurrency) // Walk the source directory, queueing file copy operations for processing. - sourceFs := filer.NewFS(c.ctx, c.sourceFiler) + sourceFs := filer.NewFS(ctx, c.sourceFiler) err := fs.WalkDir(sourceFs, sourceDir, func(sourcePath string, d fs.DirEntry, err error) error { if err != nil { return err @@ -82,7 +82,7 @@ func (c *copy) cpDirToDir(sourceDir, targetDir string) error { // are copied into it, and WalkDir guarantees directories are visited // before their contents. if d.IsDir() { - return c.targetFiler.Mkdir(c.ctx, targetPath) + return c.targetFiler.Mkdir(ctx, targetPath) } // Queue file copy operation for processing. @@ -90,7 +90,7 @@ func (c *copy) cpDirToDir(sourceDir, targetDir string) error { if ctx.Err() != nil { return ctx.Err() } - return c.cpFileToFile(sourcePath, targetPath) + return c.cpFileToFile(ctx, sourcePath, targetPath) }) return nil }) @@ -100,42 +100,42 @@ func (c *copy) cpDirToDir(sourceDir, targetDir string) error { return g.Wait() } -func (c *copy) cpFileToDir(sourcePath, targetDir string) error { +func (c *copy) cpFileToDir(ctx context.Context, sourcePath, targetDir string) error { fileName := filepath.Base(sourcePath) targetPath := path.Join(targetDir, fileName) - return c.cpFileToFile(sourcePath, targetPath) + return c.cpFileToFile(ctx, sourcePath, targetPath) } -func (c *copy) cpFileToFile(sourcePath, targetPath string) error { +func (c *copy) cpFileToFile(ctx context.Context, sourcePath, targetPath string) error { // Get reader for file at source path - r, err := c.sourceFiler.Read(c.ctx, sourcePath) + r, err := c.sourceFiler.Read(ctx, sourcePath) if err != nil { return err } defer r.Close() if c.overwrite { - err = c.targetFiler.Write(c.ctx, targetPath, r, filer.OverwriteIfExists) + err = c.targetFiler.Write(ctx, targetPath, r, filer.OverwriteIfExists) if err != nil { return err } } else { - err = c.targetFiler.Write(c.ctx, targetPath, r) + err = c.targetFiler.Write(ctx, targetPath, r) // skip if file already exists if err != nil && errors.Is(err, fs.ErrExist) { - return c.emitFileSkippedEvent(sourcePath, targetPath) + return c.emitFileSkippedEvent(ctx, sourcePath, targetPath) } if err != nil { return err } } - return c.emitFileCopiedEvent(sourcePath, targetPath) + return c.emitFileCopiedEvent(ctx, sourcePath, targetPath) } // TODO: emit these events on stderr // TODO: add integration tests for these events -func (c *copy) emitFileSkippedEvent(sourcePath, targetPath string) error { +func (c *copy) emitFileSkippedEvent(ctx context.Context, sourcePath, targetPath string) error { fullSourcePath := sourcePath if c.sourceScheme != "" { fullSourcePath = path.Join(c.sourceScheme+":", sourcePath) @@ -150,10 +150,10 @@ func (c *copy) emitFileSkippedEvent(sourcePath, targetPath string) error { c.mu.Lock() defer c.mu.Unlock() - return cmdio.RenderWithTemplate(c.ctx, event, "", template) + return cmdio.RenderWithTemplate(ctx, event, "", template) } -func (c *copy) emitFileCopiedEvent(sourcePath, targetPath string) error { +func (c *copy) emitFileCopiedEvent(ctx context.Context, sourcePath, targetPath string) error { fullSourcePath := sourcePath if c.sourceScheme != "" { fullSourcePath = path.Join(c.sourceScheme+":", sourcePath) @@ -168,7 +168,7 @@ func (c *copy) emitFileCopiedEvent(sourcePath, targetPath string) error { c.mu.Lock() defer c.mu.Unlock() - return cmdio.RenderWithTemplate(c.ctx, event, "", template) + return cmdio.RenderWithTemplate(ctx, event, "", template) } // hasTrailingDirSeparator checks if a path ends with a directory separator. @@ -237,7 +237,6 @@ func newCpCommand() *cobra.Command { c.targetScheme = "dbfs" } - c.ctx = ctx c.sourceFiler = sourceFiler c.targetFiler = targetFiler @@ -249,7 +248,7 @@ func newCpCommand() *cobra.Command { // case 1: source path is a directory, then recursively create files at target path if sourceInfo.IsDir() { - return c.cpDirToDir(sourcePath, targetPath) + return c.cpDirToDir(ctx, sourcePath, targetPath) } // If target path has a trailing separator, trim it and let case 2 handle it @@ -260,11 +259,11 @@ func newCpCommand() *cobra.Command { // case 2: source path is a file, and target path is a directory. In this case // we copy the file to inside the directory if targetInfo, err := targetFiler.Stat(ctx, targetPath); err == nil && targetInfo.IsDir() { - return c.cpFileToDir(sourcePath, targetPath) + return c.cpFileToDir(ctx, sourcePath, targetPath) } // case 3: source path is a file, and target path is a file - return c.cpFileToFile(sourcePath, targetPath) + return c.cpFileToFile(ctx, sourcePath, targetPath) } v := newValidArgs() From 2a9ec1bd406c9f84b2b15c74671b334116859c9d Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Thu, 11 Dec 2025 21:41:29 +0000 Subject: [PATCH 06/10] Linter --- cmd/fs/cp_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd/fs/cp_test.go b/cmd/fs/cp_test.go index 1d719368da..bf8930051c 100644 --- a/cmd/fs/cp_test.go +++ b/cmd/fs/cp_test.go @@ -3,6 +3,7 @@ package fs import ( "errors" "fmt" + "strconv" "testing" ) @@ -19,7 +20,7 @@ func TestCpConcurrencyValidation(t *testing.T) { for _, tc := range testCases { t.Run(fmt.Sprintf("concurrency=%d", tc.concurrency), func(t *testing.T) { cmd := newCpCommand() - cmd.SetArgs([]string{"src", "dst", "--concurrency", fmt.Sprintf("%d", tc.concurrency)}) + cmd.SetArgs([]string{"src", "dst", "--concurrency", strconv.Itoa(tc.concurrency)}) err := cmd.Execute() if !errors.Is(err, tc.wantError) { t.Errorf("expected error %v, got %v", tc.wantError, err) From 179a9443be3602c3e8e5708617fb834bf5d28733 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Fri, 12 Dec 2025 08:23:52 +0000 Subject: [PATCH 07/10] Test context cancellation --- cmd/fs/cp_test.go | 193 +++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 192 insertions(+), 1 deletion(-) diff --git a/cmd/fs/cp_test.go b/cmd/fs/cp_test.go index bf8930051c..98de33c74c 100644 --- a/cmd/fs/cp_test.go +++ b/cmd/fs/cp_test.go @@ -1,13 +1,20 @@ package fs import ( + "context" "errors" "fmt" + "io" + "io/fs" "strconv" + "strings" "testing" + "time" + + "github.com/databricks/cli/libs/filer" ) -func TestCpConcurrencyValidation(t *testing.T) { +func TestCp_concurrencyValidation(t *testing.T) { testCases := []struct { concurrency int wantError error @@ -28,3 +35,187 @@ func TestCpConcurrencyValidation(t *testing.T) { }) } } + +// mockFiler mocks filer.Filer. +type mockFiler struct { + write func(ctx context.Context, path string, r io.Reader, mode ...filer.WriteMode) error + read func(ctx context.Context, path string) (io.ReadCloser, error) + delete func(ctx context.Context, path string, mode ...filer.DeleteMode) error + readDir func(ctx context.Context, path string) ([]fs.DirEntry, error) + mkdir func(ctx context.Context, path string) error + stat func(ctx context.Context, path string) (fs.FileInfo, error) +} + +func (m *mockFiler) Write(ctx context.Context, path string, r io.Reader, mode ...filer.WriteMode) error { + if m.write == nil { + return nil + } + return m.write(ctx, path, r, mode...) +} + +func (m *mockFiler) Read(ctx context.Context, path string) (io.ReadCloser, error) { + if m.read == nil { + return nil, nil + } + return m.read(ctx, path) +} + +func (m *mockFiler) Delete(ctx context.Context, path string, mode ...filer.DeleteMode) error { + if m.delete == nil { + return nil + } + return m.delete(ctx, path, mode...) +} + +func (m *mockFiler) ReadDir(ctx context.Context, path string) ([]fs.DirEntry, error) { + if m.readDir == nil { + return nil, nil + } + return m.readDir(ctx, path) +} + +func (m *mockFiler) Mkdir(ctx context.Context, path string) error { + if m.mkdir == nil { + return nil + } + return m.mkdir(ctx, path) +} + +func (m *mockFiler) Stat(ctx context.Context, path string) (fs.FileInfo, error) { + if m.stat == nil { + return nil, nil + } + return m.stat(ctx, path) +} + +// mockFileInfo mocks fs.FileInfo. +type mockFileInfo struct { + name string + isDir bool +} + +func (m mockFileInfo) Name() string { return m.name } +func (m mockFileInfo) Size() int64 { return 0 } +func (m mockFileInfo) Mode() fs.FileMode { return 0o644 } +func (m mockFileInfo) ModTime() time.Time { return time.Time{} } +func (m mockFileInfo) IsDir() bool { return m.isDir } +func (m mockFileInfo) Sys() any { return nil } + +// mockDirEntry mocks fs.DirEntry. +type mockDirEntry struct { + name string + isDir bool +} + +func (m mockDirEntry) Name() string { return m.name } +func (m mockDirEntry) IsDir() bool { return m.isDir } +func (m mockDirEntry) Type() fs.FileMode { return 0 } +func (m mockDirEntry) Info() (fs.FileInfo, error) { + return mockFileInfo(m), nil +} + +func TestCp_cpDirToDir_contextCancellation(t *testing.T) { + testError := errors.New("test error") + + // Mock the stats and readDir methods for a Filer over a file system that + // has the following directory structure: + // + // src/ + // ├── subdir/ + // ├── file1.txt + // ├── file2.txt + // └── file3.txt + // + mockSourceStat := func(ctx context.Context, path string) (fs.FileInfo, error) { + isDir := path == "src" || path == "src/subdir" + return mockFileInfo{name: path, isDir: isDir}, nil + } + mockSourceReadDir := func(ctx context.Context, path string) ([]fs.DirEntry, error) { + if path == "src" { + return []fs.DirEntry{ + mockDirEntry{name: "subdir", isDir: true}, + mockDirEntry{name: "file1.txt", isDir: false}, + mockDirEntry{name: "file2.txt", isDir: false}, + mockDirEntry{name: "file3.txt", isDir: false}, + }, nil + } + return nil, nil + } + + testCases := []struct { + desc string + c *copy + wantErr error + }{ + { + // The source filer's Read method blocks until context is cancelled, + // simulating a slow file copy operation. The target filer's Mkdir + // method returns an error which should cancel the walk and all file + // copy goroutines. + desc: "cancel go routines on walk error", + c: ©{ + recursive: true, + concurrency: 5, + sourceFiler: &mockFiler{ + stat: mockSourceStat, + readDir: mockSourceReadDir, + read: func(ctx context.Context, path string) (io.ReadCloser, error) { + <-ctx.Done() // block until context is cancelled + return nil, ctx.Err() + }, + }, + targetFiler: &mockFiler{ + mkdir: func(ctx context.Context, path string) error { + return testError + }, + }, + }, + wantErr: testError, + }, + { + // The target filer's Write method returns an error when writing the + // file1.txt file. This error is expected to be returned by the file copy + // goroutine and all other file copy goroutines should be cancelled. + desc: "cancel go routines on file copy error", + c: ©{ + recursive: true, + concurrency: 5, + sourceFiler: &mockFiler{ + stat: mockSourceStat, + readDir: mockSourceReadDir, + read: func(ctx context.Context, path string) (io.ReadCloser, error) { + return io.NopCloser(strings.NewReader("content")), nil + }, + }, + targetFiler: &mockFiler{ + write: func(ctx context.Context, path string, r io.Reader, mode ...filer.WriteMode) error { + if path == "dst/file1.txt" { + return testError + } + <-ctx.Done() // block until context is cancelled + return ctx.Err() + }, + }, + }, + wantErr: testError, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + done := make(chan error, 1) + go func() { + done <- tc.c.cpDirToDir(t.Context(), "src", "dst") + }() + + select { + case gotErr := <-done: + if !errors.Is(gotErr, tc.wantErr) { + t.Errorf("want error %v, got %v", tc.wantErr, gotErr) + } + case <-time.After(3 * time.Second): // do not wait too long in case of test issues + t.Fatal("cpDirToDir blocked instead of returning error immediately") + } + }) + } +} From 79dd600cbdfed80c2b2b1a79b1c9c6cb08bfd739 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Fri, 12 Dec 2025 12:30:25 +0000 Subject: [PATCH 08/10] Clarified rationale --- cmd/fs/cp.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/cmd/fs/cp.go b/cmd/fs/cp.go index 6149b4c611..333c550d37 100644 --- a/cmd/fs/cp.go +++ b/cmd/fs/cp.go @@ -38,7 +38,7 @@ type copy struct { mu sync.Mutex // protect output from concurrent writes } -// cpDirToDir recursively copies the contents of a directory to another +// cpDirToDir recursively copies the content of a directory to another // directory. // // There is no guarantee on the order in which the files are copied. @@ -51,13 +51,15 @@ func (c *copy) cpDirToDir(ctx context.Context, sourceDir, targetDir string) erro return fmt.Errorf("source path %s is a directory. Please specify the --recursive flag", sourceDir) } - // Create cancellable context to ensure cleanup and that all goroutines - // are stopped when the function exits on any error path (e.g. permission - // denied when walking the source directory). + // Create a cancellable context purely for the purpose of having a way to + // cancel the goroutines in case of error walking the directory. ctx, cancel := context.WithCancel(ctx) defer cancel() - // Pool of workers to process copy operations in parallel. + // Pool of workers to process copy operations in parallel. The created + // context is the real context for this operation. It is shared by the + // walking function and the goroutines and can be cancelled manually + // by calling the cancel() function of its parent context. g, ctx := errgroup.WithContext(ctx) g.SetLimit(c.concurrency) @@ -85,8 +87,10 @@ func (c *copy) cpDirToDir(ctx context.Context, sourceDir, targetDir string) erro return c.targetFiler.Mkdir(ctx, targetPath) } - // Queue file copy operation for processing. g.Go(func() error { + // Goroutines are queued and may start after the context is already + // cancelled (e.g. a prior copy failed). This check aims to avoid + // starting work that will inevitably fail. if ctx.Err() != nil { return ctx.Err() } From 2d1f19da1533c741fa61fd323b7ce731f0ee348c Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Fri, 12 Dec 2025 12:54:48 +0000 Subject: [PATCH 09/10] Gracefully wait for cancellation to complete --- cmd/fs/cp.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cmd/fs/cp.go b/cmd/fs/cp.go index 333c550d37..f34cf2882e 100644 --- a/cmd/fs/cp.go +++ b/cmd/fs/cp.go @@ -99,7 +99,9 @@ func (c *copy) cpDirToDir(ctx context.Context, sourceDir, targetDir string) erro return nil }) if err != nil { - return err + cancel() // cancel the goroutines + _ = g.Wait() // wait for the goroutines to finish + return err // return the "real" error that led to cancellation } return g.Wait() } From 12dec5c902d9b074a3d976b82c258d39ad787726 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 15 Dec 2025 09:01:36 +0000 Subject: [PATCH 10/10] Add acceptance tests for fs cp --- acceptance/cmd/fs/cp/dir-to-dir/out.test.toml | 5 +++ acceptance/cmd/fs/cp/dir-to-dir/output.txt | 2 ++ acceptance/cmd/fs/cp/dir-to-dir/script | 6 ++++ acceptance/cmd/fs/cp/dir-to-dir/test.toml | 20 +++++++++++ .../cmd/fs/cp/file-to-dir/out.test.toml | 5 +++ acceptance/cmd/fs/cp/file-to-dir/output.txt | 3 ++ acceptance/cmd/fs/cp/file-to-dir/script | 4 +++ acceptance/cmd/fs/cp/file-to-dir/test.toml | 12 +++++++ .../cmd/fs/cp/file-to-file/out.test.toml | 5 +++ acceptance/cmd/fs/cp/file-to-file/output.txt | 7 ++++ acceptance/cmd/fs/cp/file-to-file/script | 9 +++++ acceptance/cmd/fs/cp/file-to-file/test.toml | 34 +++++++++++++++++++ .../cmd/fs/cp/input-validation/out.test.toml | 5 +++ .../cmd/fs/cp/input-validation/output.txt | 10 ++++++ acceptance/cmd/fs/cp/input-validation/script | 3 ++ .../cmd/fs/cp/input-validation/test.toml | 2 ++ cmd/fs/cp.go | 6 ++-- cmd/fs/cp_test.go | 24 ------------- 18 files changed, 136 insertions(+), 26 deletions(-) create mode 100644 acceptance/cmd/fs/cp/dir-to-dir/out.test.toml create mode 100644 acceptance/cmd/fs/cp/dir-to-dir/output.txt create mode 100644 acceptance/cmd/fs/cp/dir-to-dir/script create mode 100644 acceptance/cmd/fs/cp/dir-to-dir/test.toml create mode 100644 acceptance/cmd/fs/cp/file-to-dir/out.test.toml create mode 100644 acceptance/cmd/fs/cp/file-to-dir/output.txt create mode 100644 acceptance/cmd/fs/cp/file-to-dir/script create mode 100644 acceptance/cmd/fs/cp/file-to-dir/test.toml create mode 100644 acceptance/cmd/fs/cp/file-to-file/out.test.toml create mode 100644 acceptance/cmd/fs/cp/file-to-file/output.txt create mode 100644 acceptance/cmd/fs/cp/file-to-file/script create mode 100644 acceptance/cmd/fs/cp/file-to-file/test.toml create mode 100644 acceptance/cmd/fs/cp/input-validation/out.test.toml create mode 100644 acceptance/cmd/fs/cp/input-validation/output.txt create mode 100644 acceptance/cmd/fs/cp/input-validation/script create mode 100644 acceptance/cmd/fs/cp/input-validation/test.toml diff --git a/acceptance/cmd/fs/cp/dir-to-dir/out.test.toml b/acceptance/cmd/fs/cp/dir-to-dir/out.test.toml new file mode 100644 index 0000000000..d560f1de04 --- /dev/null +++ b/acceptance/cmd/fs/cp/dir-to-dir/out.test.toml @@ -0,0 +1,5 @@ +Local = true +Cloud = false + +[EnvMatrix] + DATABRICKS_BUNDLE_ENGINE = ["terraform", "direct"] diff --git a/acceptance/cmd/fs/cp/dir-to-dir/output.txt b/acceptance/cmd/fs/cp/dir-to-dir/output.txt new file mode 100644 index 0000000000..9b708fb9f0 --- /dev/null +++ b/acceptance/cmd/fs/cp/dir-to-dir/output.txt @@ -0,0 +1,2 @@ +localdir/file1.txt -> dbfs:/Volumes/main/default/data/uploaded-dir/file1.txt +localdir/file2.txt -> dbfs:/Volumes/main/default/data/uploaded-dir/file2.txt diff --git a/acceptance/cmd/fs/cp/dir-to-dir/script b/acceptance/cmd/fs/cp/dir-to-dir/script new file mode 100644 index 0000000000..6f0b8510b0 --- /dev/null +++ b/acceptance/cmd/fs/cp/dir-to-dir/script @@ -0,0 +1,6 @@ +mkdir -p localdir +echo -n "file1 content" > localdir/file1.txt +echo -n "file2 content" > localdir/file2.txt + +# Recursive directory copy (output sorted for deterministic ordering). +$CLI fs cp -r localdir dbfs:/Volumes/main/default/data/uploaded-dir 2>&1 | sort diff --git a/acceptance/cmd/fs/cp/dir-to-dir/test.toml b/acceptance/cmd/fs/cp/dir-to-dir/test.toml new file mode 100644 index 0000000000..10da309c5b --- /dev/null +++ b/acceptance/cmd/fs/cp/dir-to-dir/test.toml @@ -0,0 +1,20 @@ +Local = true +Cloud = false +Ignore = ["localdir"] + +# Recursive copy: localdir/ -> uploaded-dir/. +[[Server]] +Pattern = "PUT /api/2.0/fs/directories/Volumes/main/default/data/uploaded-dir" +Response.StatusCode = 200 + +[[Server]] +Pattern = "HEAD /api/2.0/fs/directories/Volumes/main/default/data/uploaded-dir" +Response.StatusCode = 200 + +[[Server]] +Pattern = "PUT /api/2.0/fs/files/Volumes/main/default/data/uploaded-dir/file1.txt" +Response.StatusCode = 200 + +[[Server]] +Pattern = "PUT /api/2.0/fs/files/Volumes/main/default/data/uploaded-dir/file2.txt" +Response.StatusCode = 200 diff --git a/acceptance/cmd/fs/cp/file-to-dir/out.test.toml b/acceptance/cmd/fs/cp/file-to-dir/out.test.toml new file mode 100644 index 0000000000..d560f1de04 --- /dev/null +++ b/acceptance/cmd/fs/cp/file-to-dir/out.test.toml @@ -0,0 +1,5 @@ +Local = true +Cloud = false + +[EnvMatrix] + DATABRICKS_BUNDLE_ENGINE = ["terraform", "direct"] diff --git a/acceptance/cmd/fs/cp/file-to-dir/output.txt b/acceptance/cmd/fs/cp/file-to-dir/output.txt new file mode 100644 index 0000000000..1bbd3d2cad --- /dev/null +++ b/acceptance/cmd/fs/cp/file-to-dir/output.txt @@ -0,0 +1,3 @@ + +>>> [CLI] fs cp local.txt dbfs:/Volumes/main/default/data/mydir/ +local.txt -> dbfs:/Volumes/main/default/data/mydir/local.txt diff --git a/acceptance/cmd/fs/cp/file-to-dir/script b/acceptance/cmd/fs/cp/file-to-dir/script new file mode 100644 index 0000000000..d21baf28bd --- /dev/null +++ b/acceptance/cmd/fs/cp/file-to-dir/script @@ -0,0 +1,4 @@ +echo -n "hello world!" > local.txt + +# Copy file into a directory (trailing slash indicates directory target). +trace $CLI fs cp local.txt dbfs:/Volumes/main/default/data/mydir/ diff --git a/acceptance/cmd/fs/cp/file-to-dir/test.toml b/acceptance/cmd/fs/cp/file-to-dir/test.toml new file mode 100644 index 0000000000..d8c7892808 --- /dev/null +++ b/acceptance/cmd/fs/cp/file-to-dir/test.toml @@ -0,0 +1,12 @@ +Local = true +Cloud = false +Ignore = ["local.txt"] + +# Copy file into existing directory: local.txt -> mydir/local.txt. +[[Server]] +Pattern = "HEAD /api/2.0/fs/directories/Volumes/main/default/data/mydir" +Response.StatusCode = 200 + +[[Server]] +Pattern = "PUT /api/2.0/fs/files/Volumes/main/default/data/mydir/local.txt" +Response.StatusCode = 200 diff --git a/acceptance/cmd/fs/cp/file-to-file/out.test.toml b/acceptance/cmd/fs/cp/file-to-file/out.test.toml new file mode 100644 index 0000000000..d560f1de04 --- /dev/null +++ b/acceptance/cmd/fs/cp/file-to-file/out.test.toml @@ -0,0 +1,5 @@ +Local = true +Cloud = false + +[EnvMatrix] + DATABRICKS_BUNDLE_ENGINE = ["terraform", "direct"] diff --git a/acceptance/cmd/fs/cp/file-to-file/output.txt b/acceptance/cmd/fs/cp/file-to-file/output.txt new file mode 100644 index 0000000000..93b9425293 --- /dev/null +++ b/acceptance/cmd/fs/cp/file-to-file/output.txt @@ -0,0 +1,7 @@ + +>>> [CLI] fs cp local.txt dbfs:/Volumes/main/default/data/uploaded.txt +local.txt -> dbfs:/Volumes/main/default/data/uploaded.txt + +>>> [CLI] fs cp dbfs:/Volumes/main/default/data/remote.txt downloaded.txt +dbfs:/Volumes/main/default/data/remote.txt -> downloaded.txt +content from volume \ No newline at end of file diff --git a/acceptance/cmd/fs/cp/file-to-file/script b/acceptance/cmd/fs/cp/file-to-file/script new file mode 100644 index 0000000000..510ec04618 --- /dev/null +++ b/acceptance/cmd/fs/cp/file-to-file/script @@ -0,0 +1,9 @@ +echo -n "hello world!" > local.txt + +# Upload local file to volume. +trace $CLI fs cp local.txt dbfs:/Volumes/main/default/data/uploaded.txt + +# Download file from volume to local. +trace $CLI fs cp dbfs:/Volumes/main/default/data/remote.txt downloaded.txt + +cat downloaded.txt diff --git a/acceptance/cmd/fs/cp/file-to-file/test.toml b/acceptance/cmd/fs/cp/file-to-file/test.toml new file mode 100644 index 0000000000..e083bbfdbc --- /dev/null +++ b/acceptance/cmd/fs/cp/file-to-file/test.toml @@ -0,0 +1,34 @@ +Local = true +Cloud = false +Ignore = ["local.txt", "downloaded.txt"] + +# Upload: local.txt -> dbfs:/Volumes/.../uploaded.txt. +[[Server]] +Pattern = "HEAD /api/2.0/fs/directories/Volumes/main/default/data/uploaded.txt" +Response.StatusCode = 404 + +[[Server]] +Pattern = "HEAD /api/2.0/fs/files/Volumes/main/default/data/uploaded.txt" +Response.StatusCode = 404 + +[[Server]] +Pattern = "HEAD /api/2.0/fs/directories/Volumes/main/default/data" +Response.StatusCode = 200 + +[[Server]] +Pattern = "PUT /api/2.0/fs/files/Volumes/main/default/data/uploaded.txt" +Response.StatusCode = 200 + +# Download: dbfs:/Volumes/.../remote.txt -> downloaded.txt. +[[Server]] +Pattern = "HEAD /api/2.0/fs/directories/Volumes/main/default/data/remote.txt" +Response.StatusCode = 404 + +[[Server]] +Pattern = "HEAD /api/2.0/fs/files/Volumes/main/default/data/remote.txt" +Response.StatusCode = 200 + +[[Server]] +Pattern = "GET /api/2.0/fs/files/Volumes/main/default/data/remote.txt" +Response.StatusCode = 200 +Response.Body = "content from volume" diff --git a/acceptance/cmd/fs/cp/input-validation/out.test.toml b/acceptance/cmd/fs/cp/input-validation/out.test.toml new file mode 100644 index 0000000000..d560f1de04 --- /dev/null +++ b/acceptance/cmd/fs/cp/input-validation/out.test.toml @@ -0,0 +1,5 @@ +Local = true +Cloud = false + +[EnvMatrix] + DATABRICKS_BUNDLE_ENGINE = ["terraform", "direct"] diff --git a/acceptance/cmd/fs/cp/input-validation/output.txt b/acceptance/cmd/fs/cp/input-validation/output.txt new file mode 100644 index 0000000000..febe55b74e --- /dev/null +++ b/acceptance/cmd/fs/cp/input-validation/output.txt @@ -0,0 +1,10 @@ + +>>> errcode [CLI] fs cp src dst --concurrency -1 +Error: --concurrency must be at least 1 + +Exit code: 1 + +>>> errcode [CLI] fs cp src dst --concurrency 0 +Error: --concurrency must be at least 1 + +Exit code: 1 diff --git a/acceptance/cmd/fs/cp/input-validation/script b/acceptance/cmd/fs/cp/input-validation/script new file mode 100644 index 0000000000..a5e8cec862 --- /dev/null +++ b/acceptance/cmd/fs/cp/input-validation/script @@ -0,0 +1,3 @@ +# Invalid concurrency values should fail. +trace errcode $CLI fs cp src dst --concurrency -1 +trace errcode $CLI fs cp src dst --concurrency 0 diff --git a/acceptance/cmd/fs/cp/input-validation/test.toml b/acceptance/cmd/fs/cp/input-validation/test.toml new file mode 100644 index 0000000000..7d36fb9dc1 --- /dev/null +++ b/acceptance/cmd/fs/cp/input-validation/test.toml @@ -0,0 +1,2 @@ +Local = true +Cloud = false diff --git a/cmd/fs/cp.go b/cmd/fs/cp.go index f34cf2882e..d7ae651753 100644 --- a/cmd/fs/cp.go +++ b/cmd/fs/cp.go @@ -18,8 +18,10 @@ import ( "golang.org/x/sync/errgroup" ) -// Default number of concurrent file copy operations. -const defaultConcurrency = 16 +// Default number of concurrent file copy operations. This is a conservative +// default that should be sufficient to fully utilize the available bandwidth +// in most cases. +const defaultConcurrency = 8 // errInvalidConcurrency is returned when the value of the concurrency // flag is invalid. diff --git a/cmd/fs/cp_test.go b/cmd/fs/cp_test.go index 98de33c74c..b50ee0658f 100644 --- a/cmd/fs/cp_test.go +++ b/cmd/fs/cp_test.go @@ -3,10 +3,8 @@ package fs import ( "context" "errors" - "fmt" "io" "io/fs" - "strconv" "strings" "testing" "time" @@ -14,28 +12,6 @@ import ( "github.com/databricks/cli/libs/filer" ) -func TestCp_concurrencyValidation(t *testing.T) { - testCases := []struct { - concurrency int - wantError error - }{ - {-1337, errInvalidConcurrency}, - {-1, errInvalidConcurrency}, - {0, errInvalidConcurrency}, - } - - for _, tc := range testCases { - t.Run(fmt.Sprintf("concurrency=%d", tc.concurrency), func(t *testing.T) { - cmd := newCpCommand() - cmd.SetArgs([]string{"src", "dst", "--concurrency", strconv.Itoa(tc.concurrency)}) - err := cmd.Execute() - if !errors.Is(err, tc.wantError) { - t.Errorf("expected error %v, got %v", tc.wantError, err) - } - }) - } -} - // mockFiler mocks filer.Filer. type mockFiler struct { write func(ctx context.Context, path string, r io.Reader, mode ...filer.WriteMode) error