Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion diskcache/diskcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (c *DiskCache) Pretty() string {
}

if c.rfd != nil {
arr = append(arr, "cur-read: "+c.rfd.Name())
arr = append(arr, "cur-read: "+c.readFileName())
} else {
arr = append(arr, "no-Get()")
}
Expand Down
16 changes: 8 additions & 8 deletions diskcache/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,13 @@ retry:
if n, err = c.rfd.Read(c.batchHeader); err != nil || n != dataHeaderLen {
if err != nil && !errors.Is(err, io.EOF) {
l.Errorf("read %d bytes header error: %s", dataHeaderLen, err.Error())
err = WrapFileOperationError(OpRead, err, c.path, c.rfd.Name()).
err = WrapFileOperationError(OpRead, err, c.path, c.readFileName()).
WithDetails(fmt.Sprintf("header_read: expected=%d, actual=%d", dataHeaderLen, n))
} else if n > 0 && n != dataHeaderLen {
l.Errorf("invalid header length: %d, expect %d", n, dataHeaderLen)
err = NewCacheError(OpRead, ErrUnexpectedReadSize,
fmt.Sprintf("header_size_mismatch: expected=%d, actual=%d", dataHeaderLen, n)).
WithPath(c.path).WithFile(c.rfd.Name())
WithPath(c.path).WithFile(c.readFileName())
}

// On bad datafile, just ignore and delete the file.
Expand All @@ -135,7 +135,7 @@ retry:

if uint32(nbytes) == EOFHint { // EOF
if err := c.switchNextFile(); err != nil {
return WrapGetError(err, c.path, c.rfd.Name()).
return WrapGetError(err, c.path, c.readFileName()).
WithDetails("eof_encountered_during_get")
}

Expand All @@ -156,23 +156,23 @@ retry:
if len(readbuf) < nbytes {
// seek to next read position
if x, err := c.rfd.Seek(int64(nbytes), io.SeekCurrent); err != nil {
return WrapFileOperationError(OpSeek, err, c.path, c.rfd.Name()).
return WrapFileOperationError(OpSeek, err, c.path, c.readFileName()).
WithDetails(fmt.Sprintf("failed_to_seek_past_data: data_size=%d", nbytes))
} else {
l.Warnf("got %d bytes to buffer with len %d, seek to new read position %d, drop %d bytes within file %s",
nbytes, len(readbuf), x, nbytes, c.curReadfile)

droppedDataVec.WithLabelValues(c.path, reasonTooSmallReadBuffer).Observe(float64(nbytes))
return WrapGetError(ErrTooSmallReadBuf, c.path, c.rfd.Name()).
return WrapGetError(ErrTooSmallReadBuf, c.path, c.readFileName()).
WithDetails(fmt.Sprintf("buffer_too_small: required=%d, provided=%d", nbytes, len(readbuf)))
}
}

if n, err := c.rfd.Read(readbuf[:nbytes]); err != nil {
return WrapFileOperationError(OpRead, err, c.path, c.rfd.Name()).
return WrapFileOperationError(OpRead, err, c.path, c.readFileName()).
WithDetails(fmt.Sprintf("data_read: expected=%d, actual=%d", nbytes, n))
} else if n != nbytes {
return WrapGetError(ErrUnexpectedReadSize, c.path, c.rfd.Name()).
return WrapGetError(ErrUnexpectedReadSize, c.path, c.readFileName()).
WithDetails(fmt.Sprintf("partial_read: expected=%d, actual=%d", nbytes, n))
}

Expand All @@ -184,7 +184,7 @@ retry:
// seek back
if !c.noFallbackOnError {
if _, serr := c.rfd.Seek(-int64(dataHeaderLen+nbytes), io.SeekCurrent); serr != nil {
return WrapFileOperationError(OpSeek, serr, c.path, c.rfd.Name()).
return WrapFileOperationError(OpSeek, serr, c.path, c.readFileName()).
WithDetails(fmt.Sprintf("fallback_seek_failed: offset=%d", -int64(dataHeaderLen+nbytes)))
}

Expand Down
21 changes: 15 additions & 6 deletions diskcache/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,22 +51,26 @@ func (c *DiskCache) Put(data []byte) error {
fmt.Sprintf("max_size=%d, actual_size=%d", c.maxDataSize, len(data)))
}

if err := c.ensureWriteFile(); err != nil {
return WrapPutError(err, c.path, len(data)).WithDetails("failed_to_open_write_file")
}

hdr := make([]byte, dataHeaderLen)

binary.LittleEndian.PutUint32(hdr, uint32(len(data)))
if _, err := c.wfd.Write(hdr); err != nil {
return WrapFileOperationError(OpWrite, err, c.path, c.wfd.Name()).
return WrapFileOperationError(OpWrite, err, c.path, c.writeFileName()).
WithDetails("failed_to_write_header")
}

if _, err := c.wfd.Write(data); err != nil {
return WrapFileOperationError(OpWrite, err, c.path, c.wfd.Name()).
return WrapFileOperationError(OpWrite, err, c.path, c.writeFileName()).
WithDetails("failed_to_write_data")
}

if !c.noSync {
if err := c.wfd.Sync(); err != nil {
return WrapFileOperationError(OpSync, err, c.path, c.wfd.Name()).
return WrapFileOperationError(OpSync, err, c.path, c.writeFileName()).
WithDetails("failed_to_sync_write")
}
}
Expand Down Expand Up @@ -116,15 +120,20 @@ func (c *DiskCache) StreamPut(r io.Reader, size int) error {
fmt.Sprintf("size_exceeded: max=%d, actual=%d", c.maxDataSize, size)).WithPath(c.path)
}

if err := c.ensureWriteFile(); err != nil {
return NewCacheError(OpStreamPut, err, "failed_to_open_write_file").
WithPath(c.path)
}

if startOffset, err = c.wfd.Seek(0, io.SeekCurrent); err != nil {
return WrapFileOperationError(OpSeek, err, c.path, c.wfd.Name()).
return WrapFileOperationError(OpSeek, err, c.path, c.writeFileName()).
WithDetails("failed_to_get_current_position")
}

defer func() {
if total > 0 && err != nil { // fallback to origin position
if _, serr := c.wfd.Seek(startOffset, io.SeekStart); serr != nil {
c.LastErr = WrapFileOperationError(OpSeek, serr, c.path, c.wfd.Name()).
c.LastErr = WrapFileOperationError(OpSeek, serr, c.path, c.writeFileName()).
WithDetails(fmt.Sprintf("failed_to_fallback_to_position_%d", startOffset))
}
}
Expand All @@ -135,7 +144,7 @@ func (c *DiskCache) StreamPut(r io.Reader, size int) error {
if size > 0 {
binary.LittleEndian.PutUint32(c.batchHeader, uint32(size))
if _, err := c.wfd.Write(c.batchHeader); err != nil {
return WrapFileOperationError(OpWrite, err, c.path, c.wfd.Name()).
return WrapFileOperationError(OpWrite, err, c.path, c.writeFileName()).
WithDetails("failed_to_write_stream_header")
}
}
Expand Down
58 changes: 43 additions & 15 deletions diskcache/rotate.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,16 @@ func (c *DiskCache) rotate() error {
datafilesVec.WithLabelValues(c.path).Set(float64(len(c.dataFiles)))
}()

if err := c.ensureWriteFile(); err != nil {
return NewCacheError(OpRotate, err, "failed_to_open_write_file_before_rotate").
WithPath(c.path)
}

batchSizeBeforeEOF := c.curBatchSize
eof := make([]byte, dataHeaderLen)
binary.LittleEndian.PutUint32(eof, EOFHint)
if _, err := c.wfd.Write(eof); err != nil { // append EOF to file end
return WrapFileOperationError(OpWrite, err, c.path, c.wfd.Name()).
return WrapFileOperationError(OpWrite, err, c.path, c.writeFileName()).
WithDetails("failed_to_write_eof_marker_during_rotate")
}

Expand Down Expand Up @@ -69,39 +75,61 @@ func (c *DiskCache) rotate() error {

// close current writing file
if err := c.wfd.Close(); err != nil {
return WrapFileOperationError(OpClose, err, c.path, c.wfd.Name()).
return WrapFileOperationError(OpClose, err, c.path, c.writeFileName()).
WithDetails("failed_to_close_write_file_during_rotate")
}
c.wfd = nil

// rename data -> data.0004
renamed := true
var rotateErr error
if err := os.Rename(c.curWriteFile, newfile); err != nil {
return WrapRotateError(err, c.path, c.curWriteFile, newfile).
renamed = false
rotateErr = WrapRotateError(err, c.path, c.curWriteFile, newfile).
WithDetails("failed_to_rename_file_during_rotate")

if fi, statErr := os.Stat(c.curWriteFile); statErr == nil && !fi.IsDir() {
if truncErr := os.Truncate(c.curWriteFile, batchSizeBeforeEOF); truncErr != nil {
rotateErr = NewCacheError(OpRotate, rotateErr,
fmt.Sprintf("failed_to_restore_write_file_size_after_rename_failure: size=%d, error=%v",
batchSizeBeforeEOF, truncErr)).
WithPath(c.path)
}
}
}

// new file added, add it's size to cache size
if fi, err := os.Stat(newfile); err == nil {
if fi.Size() > dataHeaderLen {
c.size.Add(fi.Size())
sizeVec.WithLabelValues(c.path).Add(float64(fi.Size()))
putBytesVec.WithLabelValues(c.path).Observe(float64(fi.Size()))
if renamed {
if fi, err := os.Stat(newfile); err == nil {
if fi.Size() > dataHeaderLen {
c.size.Add(fi.Size())
sizeVec.WithLabelValues(c.path).Add(float64(fi.Size()))
putBytesVec.WithLabelValues(c.path).Observe(float64(fi.Size()))
}
} else {
// Non-critical error: log but don't fail rotation
l.Warnf("failed to stat rotated file %s: %v", newfile, err)
}

c.dataFiles = append(c.dataFiles, newfile)
sort.Strings(c.dataFiles)
} else {
// Non-critical error: log but don't fail rotation
l.Warnf("failed to stat rotated file %s: %v", newfile, err)
l.Errorf("%s", rotateErr)
}

c.dataFiles = append(c.dataFiles, newfile)
sort.Strings(c.dataFiles)

// reopen new write file
if err := c.openWriteFile(); err != nil {
if rotateErr != nil {
return NewCacheError(OpRotate, rotateErr,
fmt.Sprintf("failed_to_open_write_file_after_rotate_error: %v", err)).
WithPath(c.path)
}

return NewCacheError(OpRotate, err, "failed_to_open_new_write_file").
WithPath(c.path)
}

return nil
return rotateErr
}

// after file read on EOF, remove the file.
Expand All @@ -116,7 +144,7 @@ func (c *DiskCache) removeCurrentReadingFile() error {

if c.rfd != nil {
if err := c.rfd.Close(); err != nil {
return WrapFileOperationError(OpClose, err, c.path, c.rfd.Name()).
return WrapFileOperationError(OpClose, err, c.path, c.readFileName()).
WithDetails("failed_to_close_read_file_during_removal")
}
c.rfd = nil
Expand Down
62 changes: 62 additions & 0 deletions diskcache/rotate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ package diskcache
import (
"bytes"
"errors"
"os"
"path/filepath"
"runtime"
T "testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -122,3 +125,62 @@ func TestRotate(t *T.T) {
})
})
}

func TestRotateRecoverWriteFileOnRenameFailure(t *T.T) {
if runtime.GOOS == "windows" {
t.Skip("removing an open file is not supported on windows")
}

p := t.TempDir()
c, err := Open(WithPath(p), WithBatchSize(1024*1024))
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, c.Close())
ResetMetrics()
})

require.NoError(t, c.Put([]byte("before")))
require.NoError(t, os.Remove(c.curWriteFile))

require.Error(t, c.Rotate())
require.NotNil(t, c.wfd)
require.NoError(t, c.Put([]byte("after")))
}

func TestRotateTruncateEOFOnRenameFailure(t *T.T) {
p := t.TempDir()
c, err := Open(WithPath(p), WithBatchSize(1024*1024))
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, c.Close())
ResetMetrics()
})

newfile := filepath.Join(p, "data.00000000000000000000000000000000")
require.NoError(t, os.Mkdir(newfile, 0o755))

before := []byte("before")
after := []byte("after")

require.NoError(t, c.Put(before))
require.Error(t, c.Rotate())
require.NotNil(t, c.wfd)

require.NoError(t, os.Remove(newfile))
require.NoError(t, c.Put(after))
require.NoError(t, c.Rotate())

var got [][]byte
for {
err := c.Get(func(data []byte) error {
got = append(got, append([]byte(nil), data...))
return nil
})
if errors.Is(err, ErrNoData) {
break
}
require.NoError(t, err)
}

require.Equal(t, [][]byte{before, after}, got)
}
45 changes: 45 additions & 0 deletions diskcache/write_file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the MIT License.
// This product includes software developed at Guance Cloud (https://www.guance.com/).
// Copyright 2021-present Guance, Inc.

package diskcache

import "os"

func (c *DiskCache) writeFileName() string {
if c.wfd != nil {
return c.wfd.Name()
}

if c.curWriteFile != "" {
return c.curWriteFile
}

return c.path
}

func (c *DiskCache) readFileName() string {
if c.rfd != nil {
return c.rfd.Name()
}

if c.curReadfile != "" {
return c.curReadfile
}

return c.path
}

func (c *DiskCache) ensureWriteFile() error {
if c.wfd != nil {
return nil
}

if c.curWriteFile == "" {
return WrapFileOperationError(OpCreate, os.ErrInvalid, c.path, c.writeFileName()).
WithDetails("write_file_path_not_set")
}

return c.openWriteFile()
}
Loading