From e066bb86481db855545dfd818df17a7d25fb37b6 Mon Sep 17 00:00:00 2001 From: songlq <31207055+songlonqi-java@users.noreply.github.com> Date: Wed, 27 May 2026 15:55:47 +0800 Subject: [PATCH 1/2] fix(diskcache): restore writer after rotate failure --- diskcache/diskcache.go | 2 +- diskcache/get.go | 16 +++++------ diskcache/put.go | 21 ++++++++++---- diskcache/rotate.go | 27 +++++++++++++++-- diskcache/rotate_test.go | 62 ++++++++++++++++++++++++++++++++++++++++ diskcache/write_file.go | 45 +++++++++++++++++++++++++++++ 6 files changed, 155 insertions(+), 18 deletions(-) create mode 100644 diskcache/write_file.go diff --git a/diskcache/diskcache.go b/diskcache/diskcache.go index 3fb4c9da..dfc56dd5 100644 --- a/diskcache/diskcache.go +++ b/diskcache/diskcache.go @@ -164,7 +164,7 @@ func (c *DiskCache) Pretty() string { } if c.rfd != nil { - arr = append(arr, "cur-read: "+c.rfd.Name()) + arr = append(arr, "cur-read: "+c.readFileName()) } else { arr = append(arr, "no-Get()") } diff --git a/diskcache/get.go b/diskcache/get.go index 61b8d01e..fe40bcb9 100644 --- a/diskcache/get.go +++ b/diskcache/get.go @@ -113,13 +113,13 @@ retry: if n, err = c.rfd.Read(c.batchHeader); err != nil || n != dataHeaderLen { if err != nil && !errors.Is(err, io.EOF) { l.Errorf("read %d bytes header error: %s", dataHeaderLen, err.Error()) - err = WrapFileOperationError(OpRead, err, c.path, c.rfd.Name()). + err = WrapFileOperationError(OpRead, err, c.path, c.readFileName()). WithDetails(fmt.Sprintf("header_read: expected=%d, actual=%d", dataHeaderLen, n)) } else if n > 0 && n != dataHeaderLen { l.Errorf("invalid header length: %d, expect %d", n, dataHeaderLen) err = NewCacheError(OpRead, ErrUnexpectedReadSize, fmt.Sprintf("header_size_mismatch: expected=%d, actual=%d", dataHeaderLen, n)). - WithPath(c.path).WithFile(c.rfd.Name()) + WithPath(c.path).WithFile(c.readFileName()) } // On bad datafile, just ignore and delete the file. @@ -135,7 +135,7 @@ retry: if uint32(nbytes) == EOFHint { // EOF if err := c.switchNextFile(); err != nil { - return WrapGetError(err, c.path, c.rfd.Name()). + return WrapGetError(err, c.path, c.readFileName()). WithDetails("eof_encountered_during_get") } @@ -156,23 +156,23 @@ retry: if len(readbuf) < nbytes { // seek to next read position if x, err := c.rfd.Seek(int64(nbytes), io.SeekCurrent); err != nil { - return WrapFileOperationError(OpSeek, err, c.path, c.rfd.Name()). + return WrapFileOperationError(OpSeek, err, c.path, c.readFileName()). WithDetails(fmt.Sprintf("failed_to_seek_past_data: data_size=%d", nbytes)) } else { l.Warnf("got %d bytes to buffer with len %d, seek to new read position %d, drop %d bytes within file %s", nbytes, len(readbuf), x, nbytes, c.curReadfile) droppedDataVec.WithLabelValues(c.path, reasonTooSmallReadBuffer).Observe(float64(nbytes)) - return WrapGetError(ErrTooSmallReadBuf, c.path, c.rfd.Name()). + return WrapGetError(ErrTooSmallReadBuf, c.path, c.readFileName()). WithDetails(fmt.Sprintf("buffer_too_small: required=%d, provided=%d", nbytes, len(readbuf))) } } if n, err := c.rfd.Read(readbuf[:nbytes]); err != nil { - return WrapFileOperationError(OpRead, err, c.path, c.rfd.Name()). + return WrapFileOperationError(OpRead, err, c.path, c.readFileName()). WithDetails(fmt.Sprintf("data_read: expected=%d, actual=%d", nbytes, n)) } else if n != nbytes { - return WrapGetError(ErrUnexpectedReadSize, c.path, c.rfd.Name()). + return WrapGetError(ErrUnexpectedReadSize, c.path, c.readFileName()). WithDetails(fmt.Sprintf("partial_read: expected=%d, actual=%d", nbytes, n)) } @@ -184,7 +184,7 @@ retry: // seek back if !c.noFallbackOnError { if _, serr := c.rfd.Seek(-int64(dataHeaderLen+nbytes), io.SeekCurrent); serr != nil { - return WrapFileOperationError(OpSeek, serr, c.path, c.rfd.Name()). + return WrapFileOperationError(OpSeek, serr, c.path, c.readFileName()). WithDetails(fmt.Sprintf("fallback_seek_failed: offset=%d", -int64(dataHeaderLen+nbytes))) } diff --git a/diskcache/put.go b/diskcache/put.go index 4c5d370e..bfd47be3 100644 --- a/diskcache/put.go +++ b/diskcache/put.go @@ -51,22 +51,26 @@ func (c *DiskCache) Put(data []byte) error { fmt.Sprintf("max_size=%d, actual_size=%d", c.maxDataSize, len(data))) } + if err := c.ensureWriteFile(); err != nil { + return WrapPutError(err, c.path, len(data)).WithDetails("failed_to_open_write_file") + } + hdr := make([]byte, dataHeaderLen) binary.LittleEndian.PutUint32(hdr, uint32(len(data))) if _, err := c.wfd.Write(hdr); err != nil { - return WrapFileOperationError(OpWrite, err, c.path, c.wfd.Name()). + return WrapFileOperationError(OpWrite, err, c.path, c.writeFileName()). WithDetails("failed_to_write_header") } if _, err := c.wfd.Write(data); err != nil { - return WrapFileOperationError(OpWrite, err, c.path, c.wfd.Name()). + return WrapFileOperationError(OpWrite, err, c.path, c.writeFileName()). WithDetails("failed_to_write_data") } if !c.noSync { if err := c.wfd.Sync(); err != nil { - return WrapFileOperationError(OpSync, err, c.path, c.wfd.Name()). + return WrapFileOperationError(OpSync, err, c.path, c.writeFileName()). WithDetails("failed_to_sync_write") } } @@ -116,15 +120,20 @@ func (c *DiskCache) StreamPut(r io.Reader, size int) error { fmt.Sprintf("size_exceeded: max=%d, actual=%d", c.maxDataSize, size)).WithPath(c.path) } + if err := c.ensureWriteFile(); err != nil { + return NewCacheError(OpStreamPut, err, "failed_to_open_write_file"). + WithPath(c.path) + } + if startOffset, err = c.wfd.Seek(0, io.SeekCurrent); err != nil { - return WrapFileOperationError(OpSeek, err, c.path, c.wfd.Name()). + return WrapFileOperationError(OpSeek, err, c.path, c.writeFileName()). WithDetails("failed_to_get_current_position") } defer func() { if total > 0 && err != nil { // fallback to origin position if _, serr := c.wfd.Seek(startOffset, io.SeekStart); serr != nil { - c.LastErr = WrapFileOperationError(OpSeek, serr, c.path, c.wfd.Name()). + c.LastErr = WrapFileOperationError(OpSeek, serr, c.path, c.writeFileName()). WithDetails(fmt.Sprintf("failed_to_fallback_to_position_%d", startOffset)) } } @@ -135,7 +144,7 @@ func (c *DiskCache) StreamPut(r io.Reader, size int) error { if size > 0 { binary.LittleEndian.PutUint32(c.batchHeader, uint32(size)) if _, err := c.wfd.Write(c.batchHeader); err != nil { - return WrapFileOperationError(OpWrite, err, c.path, c.wfd.Name()). + return WrapFileOperationError(OpWrite, err, c.path, c.writeFileName()). WithDetails("failed_to_write_stream_header") } } diff --git a/diskcache/rotate.go b/diskcache/rotate.go index 6637e46a..f14076fe 100644 --- a/diskcache/rotate.go +++ b/diskcache/rotate.go @@ -34,10 +34,16 @@ func (c *DiskCache) rotate() error { datafilesVec.WithLabelValues(c.path).Set(float64(len(c.dataFiles))) }() + if err := c.ensureWriteFile(); err != nil { + return NewCacheError(OpRotate, err, "failed_to_open_write_file_before_rotate"). + WithPath(c.path) + } + + batchSizeBeforeEOF := c.curBatchSize eof := make([]byte, dataHeaderLen) binary.LittleEndian.PutUint32(eof, EOFHint) if _, err := c.wfd.Write(eof); err != nil { // append EOF to file end - return WrapFileOperationError(OpWrite, err, c.path, c.wfd.Name()). + return WrapFileOperationError(OpWrite, err, c.path, c.writeFileName()). WithDetails("failed_to_write_eof_marker_during_rotate") } @@ -69,13 +75,28 @@ func (c *DiskCache) rotate() error { // close current writing file if err := c.wfd.Close(); err != nil { - return WrapFileOperationError(OpClose, err, c.path, c.wfd.Name()). + return WrapFileOperationError(OpClose, err, c.path, c.writeFileName()). WithDetails("failed_to_close_write_file_during_rotate") } c.wfd = nil // rename data -> data.0004 if err := os.Rename(c.curWriteFile, newfile); err != nil { + if fi, statErr := os.Stat(c.curWriteFile); statErr == nil && !fi.IsDir() { + if truncErr := os.Truncate(c.curWriteFile, batchSizeBeforeEOF); truncErr != nil { + return NewCacheError(OpRotate, WrapRotateError(err, c.path, c.curWriteFile, newfile), + fmt.Sprintf("failed_to_restore_write_file_size_after_rename_failure: size=%d, error=%v", + batchSizeBeforeEOF, truncErr)). + WithPath(c.path) + } + } + + if openErr := c.openWriteFile(); openErr != nil { + return NewCacheError(OpRotate, WrapRotateError(err, c.path, c.curWriteFile, newfile), + fmt.Sprintf("failed_to_restore_write_file_after_rename_failure: %v", openErr)). + WithPath(c.path) + } + return WrapRotateError(err, c.path, c.curWriteFile, newfile). WithDetails("failed_to_rename_file_during_rotate") } @@ -116,7 +137,7 @@ func (c *DiskCache) removeCurrentReadingFile() error { if c.rfd != nil { if err := c.rfd.Close(); err != nil { - return WrapFileOperationError(OpClose, err, c.path, c.rfd.Name()). + return WrapFileOperationError(OpClose, err, c.path, c.readFileName()). WithDetails("failed_to_close_read_file_during_removal") } c.rfd = nil diff --git a/diskcache/rotate_test.go b/diskcache/rotate_test.go index e852ef88..f1ac22db 100644 --- a/diskcache/rotate_test.go +++ b/diskcache/rotate_test.go @@ -8,6 +8,9 @@ package diskcache import ( "bytes" "errors" + "os" + "path/filepath" + "runtime" T "testing" "github.com/stretchr/testify/assert" @@ -122,3 +125,62 @@ func TestRotate(t *T.T) { }) }) } + +func TestRotateRecoverWriteFileOnRenameFailure(t *T.T) { + if runtime.GOOS == "windows" { + t.Skip("removing an open file is not supported on windows") + } + + p := t.TempDir() + c, err := Open(WithPath(p), WithBatchSize(1024*1024)) + require.NoError(t, err) + t.Cleanup(func() { + assert.NoError(t, c.Close()) + ResetMetrics() + }) + + require.NoError(t, c.Put([]byte("before"))) + require.NoError(t, os.Remove(c.curWriteFile)) + + require.Error(t, c.Rotate()) + require.NotNil(t, c.wfd) + require.NoError(t, c.Put([]byte("after"))) +} + +func TestRotateTruncateEOFOnRenameFailure(t *T.T) { + p := t.TempDir() + c, err := Open(WithPath(p), WithBatchSize(1024*1024)) + require.NoError(t, err) + t.Cleanup(func() { + assert.NoError(t, c.Close()) + ResetMetrics() + }) + + newfile := filepath.Join(p, "data.00000000000000000000000000000000") + require.NoError(t, os.Mkdir(newfile, 0o755)) + + before := []byte("before") + after := []byte("after") + + require.NoError(t, c.Put(before)) + require.Error(t, c.Rotate()) + require.NotNil(t, c.wfd) + + require.NoError(t, os.Remove(newfile)) + require.NoError(t, c.Put(after)) + require.NoError(t, c.Rotate()) + + var got [][]byte + for { + err := c.Get(func(data []byte) error { + got = append(got, append([]byte(nil), data...)) + return nil + }) + if errors.Is(err, ErrNoData) { + break + } + require.NoError(t, err) + } + + require.Equal(t, [][]byte{before, after}, got) +} diff --git a/diskcache/write_file.go b/diskcache/write_file.go new file mode 100644 index 00000000..378c4081 --- /dev/null +++ b/diskcache/write_file.go @@ -0,0 +1,45 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the MIT License. +// This product includes software developed at Guance Cloud (https://www.guance.com/). +// Copyright 2021-present Guance, Inc. + +package diskcache + +import "os" + +func (c *DiskCache) writeFileName() string { + if c.wfd != nil { + return c.wfd.Name() + } + + if c.curWriteFile != "" { + return c.curWriteFile + } + + return c.path +} + +func (c *DiskCache) readFileName() string { + if c.rfd != nil { + return c.rfd.Name() + } + + if c.curReadfile != "" { + return c.curReadfile + } + + return c.path +} + +func (c *DiskCache) ensureWriteFile() error { + if c.wfd != nil { + return nil + } + + if c.curWriteFile == "" { + return WrapFileOperationError(OpCreate, os.ErrInvalid, c.path, c.writeFileName()). + WithDetails("write_file_path_not_set") + } + + return c.openWriteFile() +} From de5375eb78303f2c3031536ca69c23ea18eae0a8 Mon Sep 17 00:00:00 2001 From: songlq <31207055+songlonqi-java@users.noreply.github.com> Date: Wed, 27 May 2026 15:55:47 +0800 Subject: [PATCH 2/2] fix(diskcache): restore writer after rotate failure --- diskcache/diskcache.go | 2 +- diskcache/get.go | 16 +++++------ diskcache/put.go | 21 ++++++++++---- diskcache/rotate.go | 58 +++++++++++++++++++++++++++---------- diskcache/rotate_test.go | 62 ++++++++++++++++++++++++++++++++++++++++ diskcache/write_file.go | 45 +++++++++++++++++++++++++++++ 6 files changed, 174 insertions(+), 30 deletions(-) create mode 100644 diskcache/write_file.go diff --git a/diskcache/diskcache.go b/diskcache/diskcache.go index 3fb4c9da..dfc56dd5 100644 --- a/diskcache/diskcache.go +++ b/diskcache/diskcache.go @@ -164,7 +164,7 @@ func (c *DiskCache) Pretty() string { } if c.rfd != nil { - arr = append(arr, "cur-read: "+c.rfd.Name()) + arr = append(arr, "cur-read: "+c.readFileName()) } else { arr = append(arr, "no-Get()") } diff --git a/diskcache/get.go b/diskcache/get.go index 61b8d01e..fe40bcb9 100644 --- a/diskcache/get.go +++ b/diskcache/get.go @@ -113,13 +113,13 @@ retry: if n, err = c.rfd.Read(c.batchHeader); err != nil || n != dataHeaderLen { if err != nil && !errors.Is(err, io.EOF) { l.Errorf("read %d bytes header error: %s", dataHeaderLen, err.Error()) - err = WrapFileOperationError(OpRead, err, c.path, c.rfd.Name()). + err = WrapFileOperationError(OpRead, err, c.path, c.readFileName()). WithDetails(fmt.Sprintf("header_read: expected=%d, actual=%d", dataHeaderLen, n)) } else if n > 0 && n != dataHeaderLen { l.Errorf("invalid header length: %d, expect %d", n, dataHeaderLen) err = NewCacheError(OpRead, ErrUnexpectedReadSize, fmt.Sprintf("header_size_mismatch: expected=%d, actual=%d", dataHeaderLen, n)). - WithPath(c.path).WithFile(c.rfd.Name()) + WithPath(c.path).WithFile(c.readFileName()) } // On bad datafile, just ignore and delete the file. @@ -135,7 +135,7 @@ retry: if uint32(nbytes) == EOFHint { // EOF if err := c.switchNextFile(); err != nil { - return WrapGetError(err, c.path, c.rfd.Name()). + return WrapGetError(err, c.path, c.readFileName()). WithDetails("eof_encountered_during_get") } @@ -156,23 +156,23 @@ retry: if len(readbuf) < nbytes { // seek to next read position if x, err := c.rfd.Seek(int64(nbytes), io.SeekCurrent); err != nil { - return WrapFileOperationError(OpSeek, err, c.path, c.rfd.Name()). + return WrapFileOperationError(OpSeek, err, c.path, c.readFileName()). WithDetails(fmt.Sprintf("failed_to_seek_past_data: data_size=%d", nbytes)) } else { l.Warnf("got %d bytes to buffer with len %d, seek to new read position %d, drop %d bytes within file %s", nbytes, len(readbuf), x, nbytes, c.curReadfile) droppedDataVec.WithLabelValues(c.path, reasonTooSmallReadBuffer).Observe(float64(nbytes)) - return WrapGetError(ErrTooSmallReadBuf, c.path, c.rfd.Name()). + return WrapGetError(ErrTooSmallReadBuf, c.path, c.readFileName()). WithDetails(fmt.Sprintf("buffer_too_small: required=%d, provided=%d", nbytes, len(readbuf))) } } if n, err := c.rfd.Read(readbuf[:nbytes]); err != nil { - return WrapFileOperationError(OpRead, err, c.path, c.rfd.Name()). + return WrapFileOperationError(OpRead, err, c.path, c.readFileName()). WithDetails(fmt.Sprintf("data_read: expected=%d, actual=%d", nbytes, n)) } else if n != nbytes { - return WrapGetError(ErrUnexpectedReadSize, c.path, c.rfd.Name()). + return WrapGetError(ErrUnexpectedReadSize, c.path, c.readFileName()). WithDetails(fmt.Sprintf("partial_read: expected=%d, actual=%d", nbytes, n)) } @@ -184,7 +184,7 @@ retry: // seek back if !c.noFallbackOnError { if _, serr := c.rfd.Seek(-int64(dataHeaderLen+nbytes), io.SeekCurrent); serr != nil { - return WrapFileOperationError(OpSeek, serr, c.path, c.rfd.Name()). + return WrapFileOperationError(OpSeek, serr, c.path, c.readFileName()). WithDetails(fmt.Sprintf("fallback_seek_failed: offset=%d", -int64(dataHeaderLen+nbytes))) } diff --git a/diskcache/put.go b/diskcache/put.go index 4c5d370e..bfd47be3 100644 --- a/diskcache/put.go +++ b/diskcache/put.go @@ -51,22 +51,26 @@ func (c *DiskCache) Put(data []byte) error { fmt.Sprintf("max_size=%d, actual_size=%d", c.maxDataSize, len(data))) } + if err := c.ensureWriteFile(); err != nil { + return WrapPutError(err, c.path, len(data)).WithDetails("failed_to_open_write_file") + } + hdr := make([]byte, dataHeaderLen) binary.LittleEndian.PutUint32(hdr, uint32(len(data))) if _, err := c.wfd.Write(hdr); err != nil { - return WrapFileOperationError(OpWrite, err, c.path, c.wfd.Name()). + return WrapFileOperationError(OpWrite, err, c.path, c.writeFileName()). WithDetails("failed_to_write_header") } if _, err := c.wfd.Write(data); err != nil { - return WrapFileOperationError(OpWrite, err, c.path, c.wfd.Name()). + return WrapFileOperationError(OpWrite, err, c.path, c.writeFileName()). WithDetails("failed_to_write_data") } if !c.noSync { if err := c.wfd.Sync(); err != nil { - return WrapFileOperationError(OpSync, err, c.path, c.wfd.Name()). + return WrapFileOperationError(OpSync, err, c.path, c.writeFileName()). WithDetails("failed_to_sync_write") } } @@ -116,15 +120,20 @@ func (c *DiskCache) StreamPut(r io.Reader, size int) error { fmt.Sprintf("size_exceeded: max=%d, actual=%d", c.maxDataSize, size)).WithPath(c.path) } + if err := c.ensureWriteFile(); err != nil { + return NewCacheError(OpStreamPut, err, "failed_to_open_write_file"). + WithPath(c.path) + } + if startOffset, err = c.wfd.Seek(0, io.SeekCurrent); err != nil { - return WrapFileOperationError(OpSeek, err, c.path, c.wfd.Name()). + return WrapFileOperationError(OpSeek, err, c.path, c.writeFileName()). WithDetails("failed_to_get_current_position") } defer func() { if total > 0 && err != nil { // fallback to origin position if _, serr := c.wfd.Seek(startOffset, io.SeekStart); serr != nil { - c.LastErr = WrapFileOperationError(OpSeek, serr, c.path, c.wfd.Name()). + c.LastErr = WrapFileOperationError(OpSeek, serr, c.path, c.writeFileName()). WithDetails(fmt.Sprintf("failed_to_fallback_to_position_%d", startOffset)) } } @@ -135,7 +144,7 @@ func (c *DiskCache) StreamPut(r io.Reader, size int) error { if size > 0 { binary.LittleEndian.PutUint32(c.batchHeader, uint32(size)) if _, err := c.wfd.Write(c.batchHeader); err != nil { - return WrapFileOperationError(OpWrite, err, c.path, c.wfd.Name()). + return WrapFileOperationError(OpWrite, err, c.path, c.writeFileName()). WithDetails("failed_to_write_stream_header") } } diff --git a/diskcache/rotate.go b/diskcache/rotate.go index 6637e46a..1473525e 100644 --- a/diskcache/rotate.go +++ b/diskcache/rotate.go @@ -34,10 +34,16 @@ func (c *DiskCache) rotate() error { datafilesVec.WithLabelValues(c.path).Set(float64(len(c.dataFiles))) }() + if err := c.ensureWriteFile(); err != nil { + return NewCacheError(OpRotate, err, "failed_to_open_write_file_before_rotate"). + WithPath(c.path) + } + + batchSizeBeforeEOF := c.curBatchSize eof := make([]byte, dataHeaderLen) binary.LittleEndian.PutUint32(eof, EOFHint) if _, err := c.wfd.Write(eof); err != nil { // append EOF to file end - return WrapFileOperationError(OpWrite, err, c.path, c.wfd.Name()). + return WrapFileOperationError(OpWrite, err, c.path, c.writeFileName()). WithDetails("failed_to_write_eof_marker_during_rotate") } @@ -69,39 +75,61 @@ func (c *DiskCache) rotate() error { // close current writing file if err := c.wfd.Close(); err != nil { - return WrapFileOperationError(OpClose, err, c.path, c.wfd.Name()). + return WrapFileOperationError(OpClose, err, c.path, c.writeFileName()). WithDetails("failed_to_close_write_file_during_rotate") } c.wfd = nil // rename data -> data.0004 + renamed := true + var rotateErr error if err := os.Rename(c.curWriteFile, newfile); err != nil { - return WrapRotateError(err, c.path, c.curWriteFile, newfile). + renamed = false + rotateErr = WrapRotateError(err, c.path, c.curWriteFile, newfile). WithDetails("failed_to_rename_file_during_rotate") + + if fi, statErr := os.Stat(c.curWriteFile); statErr == nil && !fi.IsDir() { + if truncErr := os.Truncate(c.curWriteFile, batchSizeBeforeEOF); truncErr != nil { + rotateErr = NewCacheError(OpRotate, rotateErr, + fmt.Sprintf("failed_to_restore_write_file_size_after_rename_failure: size=%d, error=%v", + batchSizeBeforeEOF, truncErr)). + WithPath(c.path) + } + } } // new file added, add it's size to cache size - if fi, err := os.Stat(newfile); err == nil { - if fi.Size() > dataHeaderLen { - c.size.Add(fi.Size()) - sizeVec.WithLabelValues(c.path).Add(float64(fi.Size())) - putBytesVec.WithLabelValues(c.path).Observe(float64(fi.Size())) + if renamed { + if fi, err := os.Stat(newfile); err == nil { + if fi.Size() > dataHeaderLen { + c.size.Add(fi.Size()) + sizeVec.WithLabelValues(c.path).Add(float64(fi.Size())) + putBytesVec.WithLabelValues(c.path).Observe(float64(fi.Size())) + } + } else { + // Non-critical error: log but don't fail rotation + l.Warnf("failed to stat rotated file %s: %v", newfile, err) } + + c.dataFiles = append(c.dataFiles, newfile) + sort.Strings(c.dataFiles) } else { - // Non-critical error: log but don't fail rotation - l.Warnf("failed to stat rotated file %s: %v", newfile, err) + l.Errorf("%s", rotateErr) } - c.dataFiles = append(c.dataFiles, newfile) - sort.Strings(c.dataFiles) - // reopen new write file if err := c.openWriteFile(); err != nil { + if rotateErr != nil { + return NewCacheError(OpRotate, rotateErr, + fmt.Sprintf("failed_to_open_write_file_after_rotate_error: %v", err)). + WithPath(c.path) + } + return NewCacheError(OpRotate, err, "failed_to_open_new_write_file"). WithPath(c.path) } - return nil + return rotateErr } // after file read on EOF, remove the file. @@ -116,7 +144,7 @@ func (c *DiskCache) removeCurrentReadingFile() error { if c.rfd != nil { if err := c.rfd.Close(); err != nil { - return WrapFileOperationError(OpClose, err, c.path, c.rfd.Name()). + return WrapFileOperationError(OpClose, err, c.path, c.readFileName()). WithDetails("failed_to_close_read_file_during_removal") } c.rfd = nil diff --git a/diskcache/rotate_test.go b/diskcache/rotate_test.go index e852ef88..f1ac22db 100644 --- a/diskcache/rotate_test.go +++ b/diskcache/rotate_test.go @@ -8,6 +8,9 @@ package diskcache import ( "bytes" "errors" + "os" + "path/filepath" + "runtime" T "testing" "github.com/stretchr/testify/assert" @@ -122,3 +125,62 @@ func TestRotate(t *T.T) { }) }) } + +func TestRotateRecoverWriteFileOnRenameFailure(t *T.T) { + if runtime.GOOS == "windows" { + t.Skip("removing an open file is not supported on windows") + } + + p := t.TempDir() + c, err := Open(WithPath(p), WithBatchSize(1024*1024)) + require.NoError(t, err) + t.Cleanup(func() { + assert.NoError(t, c.Close()) + ResetMetrics() + }) + + require.NoError(t, c.Put([]byte("before"))) + require.NoError(t, os.Remove(c.curWriteFile)) + + require.Error(t, c.Rotate()) + require.NotNil(t, c.wfd) + require.NoError(t, c.Put([]byte("after"))) +} + +func TestRotateTruncateEOFOnRenameFailure(t *T.T) { + p := t.TempDir() + c, err := Open(WithPath(p), WithBatchSize(1024*1024)) + require.NoError(t, err) + t.Cleanup(func() { + assert.NoError(t, c.Close()) + ResetMetrics() + }) + + newfile := filepath.Join(p, "data.00000000000000000000000000000000") + require.NoError(t, os.Mkdir(newfile, 0o755)) + + before := []byte("before") + after := []byte("after") + + require.NoError(t, c.Put(before)) + require.Error(t, c.Rotate()) + require.NotNil(t, c.wfd) + + require.NoError(t, os.Remove(newfile)) + require.NoError(t, c.Put(after)) + require.NoError(t, c.Rotate()) + + var got [][]byte + for { + err := c.Get(func(data []byte) error { + got = append(got, append([]byte(nil), data...)) + return nil + }) + if errors.Is(err, ErrNoData) { + break + } + require.NoError(t, err) + } + + require.Equal(t, [][]byte{before, after}, got) +} diff --git a/diskcache/write_file.go b/diskcache/write_file.go new file mode 100644 index 00000000..378c4081 --- /dev/null +++ b/diskcache/write_file.go @@ -0,0 +1,45 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the MIT License. +// This product includes software developed at Guance Cloud (https://www.guance.com/). +// Copyright 2021-present Guance, Inc. + +package diskcache + +import "os" + +func (c *DiskCache) writeFileName() string { + if c.wfd != nil { + return c.wfd.Name() + } + + if c.curWriteFile != "" { + return c.curWriteFile + } + + return c.path +} + +func (c *DiskCache) readFileName() string { + if c.rfd != nil { + return c.rfd.Name() + } + + if c.curReadfile != "" { + return c.curReadfile + } + + return c.path +} + +func (c *DiskCache) ensureWriteFile() error { + if c.wfd != nil { + return nil + } + + if c.curWriteFile == "" { + return WrapFileOperationError(OpCreate, os.ErrInvalid, c.path, c.writeFileName()). + WithDetails("write_file_path_not_set") + } + + return c.openWriteFile() +}