diff --git a/pkg/diskmonitor/diskmonitor.go b/pkg/diskmonitor/diskmonitor.go new file mode 100644 index 0000000000..ac78de6345 --- /dev/null +++ b/pkg/diskmonitor/diskmonitor.go @@ -0,0 +1,91 @@ +package diskmonitor + +import ( + "context" + "fmt" + "io/fs" + "path/filepath" + "time" + + "go.opentelemetry.io/otel/metric" + + "github.com/smartcontractkit/chainlink-common/pkg/beholder" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" +) + +type int64Gauge interface { + Record(ctx context.Context, value int64, options ...metric.RecordOption) +} + +// totalRegularFileSizeBytes sums on-disk file sizes for every non-directory under dirPath (recursive), using filepath.WalkDir. +func totalRegularFileSizeBytes(dirPath string) (int64, error) { + var totalSize int64 + walkErr := filepath.WalkDir(dirPath, func(_ string, d fs.DirEntry, ierr error) error { + if ierr != nil || d.IsDir() { + return nil + } + fi, err := d.Info() + if err != nil { + return nil + } + totalSize += fi.Size() + return nil + }) + return totalSize, walkErr +} + +// DiskMonitor measures dirPath on a fixed interval and records total file bytes to gaugeName. +type DiskMonitor struct { + services.Service + + eng *services.Engine + tickInterval time.Duration + lggr logger.Logger + sizeOfDir func() (int64, error) + gauge int64Gauge +} + +// NewDiskMonitor returns a [DiskMonitor] for dirPath using gaugeName at tickInterval. +func NewDiskMonitor(lggr logger.Logger, dirPath string, gaugeName string, tickInterval time.Duration) (*DiskMonitor, error) { + g, err := beholder.GetMeter().Int64Gauge(gaugeName) + if err != nil { + return nil, fmt.Errorf("int64 gauge %q: %w", gaugeName, err) + } + + dm := &DiskMonitor{ + gauge: g, + tickInterval: tickInterval, + sizeOfDir: func() (int64, error) { + return totalRegularFileSizeBytes(dirPath) + }, + } + + dm.Service, dm.eng = services.Config{ + Name: "DiskMonitor", + Start: dm.start, + }.NewServiceEngine(logger.With( + lggr, + "dirPath", dirPath, + "gaugeName", gaugeName, + )) + dm.lggr = dm.eng.SugaredLogger + return dm, nil +} + +func (dm *DiskMonitor) start(ctx context.Context) error { + ticker := services.TickerConfig{}.NewTicker(dm.tickInterval) + dm.eng.GoTick(ticker, dm.emitDirSizeMetric) + return nil +} + +func (dm *DiskMonitor) emitDirSizeMetric(ctx context.Context) { + totalSize, err := dm.sizeOfDir() + if err != nil { + dm.lggr.Errorw("Failed to measure directory size", "error", err) + return + } + + dm.lggr.Debugw("Emitting directory size metric", "sizeBytes", totalSize) + dm.gauge.Record(ctx, totalSize) +} diff --git a/pkg/diskmonitor/diskmonitor_test.go b/pkg/diskmonitor/diskmonitor_test.go new file mode 100644 index 0000000000..7325267823 --- /dev/null +++ b/pkg/diskmonitor/diskmonitor_test.go @@ -0,0 +1,86 @@ +package diskmonitor + +import ( + "context" + "errors" + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/metric" + "go.uber.org/zap/zapcore" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" +) + +func TestNewDiskMonitor(t *testing.T) { + dm, err := NewDiskMonitor(logger.Test(t), t.TempDir(), "tmp_disk_usage_bytes", time.Second) + require.NoError(t, err) + assert.NotNil(t, dm) + assert.Equal(t, time.Second, dm.tickInterval) +} + +func TestDiskMonitor_emitDirSizeMetric_realDir(t *testing.T) { + root := t.TempDir() + require.NoError(t, os.WriteFile(filepath.Join(root, "a.txt"), []byte("hello"), 0o644)) + require.NoError(t, os.WriteFile(filepath.Join(root, "b.txt"), []byte("xy"), 0o644)) + sub := filepath.Join(root, "sub") + require.NoError(t, os.Mkdir(sub, 0o755)) + require.NoError(t, os.WriteFile(filepath.Join(sub, "c.txt"), []byte("z"), 0o644)) + + dm := &DiskMonitor{ + sizeOfDir: func() (int64, error) { + return totalRegularFileSizeBytes(root) + }, + gauge: &mockGauge{}, + lggr: logger.Test(t), + } + + dm.emitDirSizeMetric(t.Context()) + assert.Equal(t, int64(8), dm.gauge.(*mockGauge).gotValue) +} + +type mockGauge struct { + gotValue int64 +} + +func (m *mockGauge) Record(ctx context.Context, value int64, options ...metric.RecordOption) { + m.gotValue = value +} + +func TestDiskMonitor_emitDirSizeMetric(t *testing.T) { + lggr, observed := logger.TestObserved(t, zapcore.DebugLevel) + + dm := &DiskMonitor{ + sizeOfDir: func() (int64, error) { + return 42, nil + }, + gauge: &mockGauge{}, + lggr: lggr, + } + + dm.emitDirSizeMetric(t.Context()) + assert.Equal(t, int64(42), dm.gauge.(*mockGauge).gotValue) + + assert.Len(t, observed.FilterMessage("Emitting directory size metric").All(), 1) +} + +func TestDiskMonitor_emitDirSizeMetric_error(t *testing.T) { + lggr, observed := logger.TestObserved(t, zapcore.DebugLevel) + + dm := &DiskMonitor{ + sizeOfDir: func() (int64, error) { + return 0, errors.New("disk read error") + }, + gauge: &mockGauge{}, + lggr: lggr, + } + + dm.emitDirSizeMetric(t.Context()) + assert.Equal(t, int64(0), dm.gauge.(*mockGauge).gotValue) + + assert.Len(t, observed.FilterMessage("Failed to measure directory size").All(), 1) +}