Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions pkg/diskmonitor/diskmonitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package diskmonitor

import (
"context"
"fmt"
"io/fs"
"path/filepath"
"time"

"go.opentelemetry.io/otel/metric"

"github.com/smartcontractkit/chainlink-common/pkg/beholder"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
)

type int64Gauge interface {
Record(ctx context.Context, value int64, options ...metric.RecordOption)
}

// totalRegularFileSizeBytes sums on-disk file sizes for every non-directory under dirPath (recursive), using filepath.WalkDir.
func totalRegularFileSizeBytes(dirPath string) (int64, error) {
var totalSize int64
walkErr := filepath.WalkDir(dirPath, func(_ string, d fs.DirEntry, ierr error) error {
if ierr != nil || d.IsDir() {
return nil
}
fi, err := d.Info()
if err != nil {
return nil
}
totalSize += fi.Size()
return nil
})
return totalSize, walkErr
}

// DiskMonitor measures dirPath on a fixed interval and records total file bytes to gaugeName.
type DiskMonitor struct {
services.Service

eng *services.Engine
tickInterval time.Duration
lggr logger.Logger
sizeOfDir func() (int64, error)
gauge int64Gauge
}

// NewDiskMonitor returns a [DiskMonitor] for dirPath using gaugeName at tickInterval.
func NewDiskMonitor(lggr logger.Logger, dirPath string, gaugeName string, tickInterval time.Duration) (*DiskMonitor, error) {
g, err := beholder.GetMeter().Int64Gauge(gaugeName)
if err != nil {
return nil, fmt.Errorf("int64 gauge %q: %w", gaugeName, err)
}

dm := &DiskMonitor{
gauge: g,
tickInterval: tickInterval,
sizeOfDir: func() (int64, error) {
return totalRegularFileSizeBytes(dirPath)
},
}

dm.Service, dm.eng = services.Config{
Name: "DiskMonitor",
Start: dm.start,
}.NewServiceEngine(logger.With(
lggr,
"dirPath", dirPath,
"gaugeName", gaugeName,
))
dm.lggr = dm.eng.SugaredLogger
return dm, nil
}

func (dm *DiskMonitor) start(ctx context.Context) error {
ticker := services.TickerConfig{}.NewTicker(dm.tickInterval)
dm.eng.GoTick(ticker, dm.emitDirSizeMetric)
return nil
}

func (dm *DiskMonitor) emitDirSizeMetric(ctx context.Context) {
totalSize, err := dm.sizeOfDir()
if err != nil {
dm.lggr.Errorw("Failed to measure directory size", "error", err)
return
}

dm.lggr.Debugw("Emitting directory size metric", "sizeBytes", totalSize)
dm.gauge.Record(ctx, totalSize)
}
86 changes: 86 additions & 0 deletions pkg/diskmonitor/diskmonitor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package diskmonitor

import (
"context"
"errors"
"os"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap/zapcore"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
)

func TestNewDiskMonitor(t *testing.T) {
dm, err := NewDiskMonitor(logger.Test(t), t.TempDir(), "tmp_disk_usage_bytes", time.Second)
require.NoError(t, err)
assert.NotNil(t, dm)
assert.Equal(t, time.Second, dm.tickInterval)
}

func TestDiskMonitor_emitDirSizeMetric_realDir(t *testing.T) {
root := t.TempDir()
require.NoError(t, os.WriteFile(filepath.Join(root, "a.txt"), []byte("hello"), 0o644))
require.NoError(t, os.WriteFile(filepath.Join(root, "b.txt"), []byte("xy"), 0o644))
sub := filepath.Join(root, "sub")
require.NoError(t, os.Mkdir(sub, 0o755))
require.NoError(t, os.WriteFile(filepath.Join(sub, "c.txt"), []byte("z"), 0o644))

dm := &DiskMonitor{
sizeOfDir: func() (int64, error) {
return totalRegularFileSizeBytes(root)
},
gauge: &mockGauge{},
lggr: logger.Test(t),
}

dm.emitDirSizeMetric(t.Context())
assert.Equal(t, int64(8), dm.gauge.(*mockGauge).gotValue)
}

type mockGauge struct {
gotValue int64
}

func (m *mockGauge) Record(ctx context.Context, value int64, options ...metric.RecordOption) {
m.gotValue = value
}

func TestDiskMonitor_emitDirSizeMetric(t *testing.T) {
lggr, observed := logger.TestObserved(t, zapcore.DebugLevel)

dm := &DiskMonitor{
sizeOfDir: func() (int64, error) {
return 42, nil
},
gauge: &mockGauge{},
lggr: lggr,
}

dm.emitDirSizeMetric(t.Context())
assert.Equal(t, int64(42), dm.gauge.(*mockGauge).gotValue)

assert.Len(t, observed.FilterMessage("Emitting directory size metric").All(), 1)
}

func TestDiskMonitor_emitDirSizeMetric_error(t *testing.T) {
lggr, observed := logger.TestObserved(t, zapcore.DebugLevel)

dm := &DiskMonitor{
sizeOfDir: func() (int64, error) {
return 0, errors.New("disk read error")
},
gauge: &mockGauge{},
lggr: lggr,
}

dm.emitDirSizeMetric(t.Context())
assert.Equal(t, int64(0), dm.gauge.(*mockGauge).gotValue)

assert.Len(t, observed.FilterMessage("Failed to measure directory size").All(), 1)
}
Loading