diff --git a/config/local.yaml b/config/local.yaml index 9391a4a6..28357f1f 100644 --- a/config/local.yaml +++ b/config/local.yaml @@ -301,6 +301,7 @@ checks: download-timeout: 1m iteration-wait: 5m duration: 10m + r-levels: [0, 2, 4] timeout: 11m type: smoke ci-load: diff --git a/pkg/bee/api/api.go b/pkg/bee/api/api.go index 60d864fa..b4e44474 100644 --- a/pkg/bee/api/api.go +++ b/pkg/bee/api/api.go @@ -11,6 +11,8 @@ import ( "net/url" "strconv" "strings" + + "github.com/ethersphere/bee/v2/pkg/file/redundancy" ) const ( @@ -32,6 +34,7 @@ const ( swarmFeedIndexNextHeader = "Swarm-Feed-Index-Next" swarmIndexDocumentHeader = "Swarm-Index-Document" swarmErrorDocumentHeader = "Swarm-Error-Document" + redundancyLevelHeader = "Swarm-Redundancy-Level" ) // Client manages communication with the Bee API. @@ -223,6 +226,9 @@ func (c *Client) requestDataGetHeader(ctx context.Context, method, path string, if opts != nil && opts.Cache != nil { req.Header.Set(swarmCacheDownloadHeader, strconv.FormatBool(*opts.Cache)) } + if opts != nil && opts.RLevel != redundancy.NONE { + req.Header.Set(redundancyLevelHeader, strconv.Itoa(int(opts.RLevel))) + } if opts != nil && opts.RedundancyFallbackMode != nil { req.Header.Set(swarmRedundancyFallbackMode, strconv.FormatBool(*opts.RedundancyFallbackMode)) } diff --git a/pkg/bee/api/bytes.go b/pkg/bee/api/bytes.go index 2544a302..6c7ff868 100644 --- a/pkg/bee/api/bytes.go +++ b/pkg/bee/api/bytes.go @@ -6,6 +6,7 @@ import ( "net/http" "strconv" + "github.com/ethersphere/bee/v2/pkg/file/redundancy" "github.com/ethersphere/bee/v2/pkg/swarm" ) @@ -34,6 +35,10 @@ func (b *BytesService) Upload(ctx context.Context, data io.Reader, o UploadOptio } h.Add(deferredUploadHeader, strconv.FormatBool(!o.Direct)) h.Add(postageStampBatchHeader, o.BatchID) + if o.RLevel != redundancy.NONE { + h.Add(redundancyLevelHeader, strconv.Itoa(int(o.RLevel))) + } + err := b.client.requestWithHeader(ctx, http.MethodPost, "/"+apiVersion+"/bytes", h, data, &resp) return resp, err } diff --git a/pkg/bee/api/options.go b/pkg/bee/api/options.go index 17331ec4..cbf82e9b 100644 --- a/pkg/bee/api/options.go +++ b/pkg/bee/api/options.go @@ -1,6 +1,9 @@ package api -import "github.com/ethersphere/bee/v2/pkg/swarm" +import ( + "github.com/ethersphere/bee/v2/pkg/file/redundancy" + "github.com/ethersphere/bee/v2/pkg/swarm" +) type UploadOptions struct { Act bool @@ -9,6 +12,7 @@ type UploadOptions struct { BatchID string Direct bool ActHistoryAddress swarm.Address + RLevel redundancy.Level // Dirs IndexDocument string @@ -21,6 +25,7 @@ type DownloadOptions struct { ActPublicKey *swarm.Address ActTimestamp *uint64 Cache *bool + RLevel redundancy.Level RedundancyFallbackMode *bool OnlyRootChunk *bool } diff --git a/pkg/check/load/load.go b/pkg/check/load/load.go index 059a3a48..7da43b60 100644 --- a/pkg/check/load/load.go +++ b/pkg/check/load/load.go @@ -10,6 +10,7 @@ import ( "sync" "time" + "github.com/ethersphere/bee/v2/pkg/file/redundancy" "github.com/ethersphere/bee/v2/pkg/swarm" "github.com/ethersphere/beekeeper/pkg/bee" "github.com/ethersphere/beekeeper/pkg/beekeeper" @@ -203,7 +204,7 @@ func (c *Check) run(ctx context.Context, cluster orchestration.Cluster, o Option c.logger.WithField("batch_id", batchID).Infof("node %s: using batch", uploader.Name()) - address, duration, err = test.Upload(ctx, uploader, txData, batchID) + address, duration, err = test.Upload(ctx, uploader, txData, batchID, redundancy.NONE) if err != nil { c.metrics.UploadErrors.WithLabelValues(sizeLabel).Inc() c.logger.Errorf("upload failed: %v", err) @@ -246,7 +247,7 @@ func (c *Check) run(ctx context.Context, cluster orchestration.Cluster, o Option c.metrics.DownloadAttempts.WithLabelValues(sizeLabel).Inc() - rxData, rxDuration, err = test.Download(ctx, downloader, address) + rxData, rxDuration, err = test.Download(ctx, downloader, address, redundancy.NONE) if err != nil { c.metrics.DownloadErrors.WithLabelValues(sizeLabel).Inc() c.logger.Errorf("download failed for size %d: %v", contentSize, err) diff --git a/pkg/check/smoke/smoke.go b/pkg/check/smoke/smoke.go index 8d2eb5f0..c425ec13 100644 --- a/pkg/check/smoke/smoke.go +++ b/pkg/check/smoke/smoke.go @@ -8,7 +8,9 @@ import ( "fmt" "time" + "github.com/ethersphere/bee/v2/pkg/file/redundancy" "github.com/ethersphere/bee/v2/pkg/swarm" + "github.com/ethersphere/beekeeper/pkg/beekeeper" "github.com/ethersphere/beekeeper/pkg/logging" "github.com/ethersphere/beekeeper/pkg/orchestration" @@ -33,6 +35,7 @@ type Options struct { UploadTimeout time.Duration DownloadTimeout time.Duration IterationWait time.Duration + RLevels []redundancy.Level } // NewDefaultOptions returns new default options @@ -51,6 +54,7 @@ func NewDefaultOptions() Options { UploadTimeout: 60 * time.Minute, DownloadTimeout: 60 * time.Minute, IterationWait: 5 * time.Minute, + RLevels: []redundancy.Level{redundancy.PARANOID}, } } @@ -90,6 +94,7 @@ func (c *Check) run(ctx context.Context, cluster orchestration.Cluster, o Option c.logger.Infof("upload timeout: %s", o.UploadTimeout.String()) c.logger.Infof("download timeout: %s", o.DownloadTimeout.String()) c.logger.Infof("total duration: %s", o.Duration.String()) + c.logger.Infof("redundancy levels: %v", o.RLevels) rnd := random.PseudoGenerator(o.RndSeed) @@ -133,146 +138,145 @@ func (c *Check) run(ctx context.Context, cluster orchestration.Cluster, o Option c.logger.WithField("batch_id", batchID).Infof("node %s: using batch", uploader.Name()) - for _, contentSize := range fileSizes { - select { - case <-ctx.Done(): - return nil - default: - c.logger.Infof("testing file size: %d bytes (%.2f KB)", contentSize, float64(contentSize)/1024) - } - - sizeLabel := fmt.Sprintf("%d", contentSize) - - var ( - txDuration time.Duration - rxDuration time.Duration - txData []byte - rxData []byte - address swarm.Address - uploaded bool - ) - - txData = make([]byte, contentSize) - if _, err := rand.Read(txData); err != nil { - c.logger.Errorf("unable to create random content for size %d: %v", contentSize, err) - continue - } - - var ( - txCtx context.Context - txCancel context.CancelFunc = func() {} - ) - - for range 3 { - txCancel() - - uploaded = false - + for _, rLevel := range o.RLevels { + for _, contentSize := range fileSizes { select { case <-ctx.Done(): return nil - case <-time.After(o.TxOnErrWait): + default: + c.logger.Infof("testing file size: %d bytes (%.2f KB), redundancy level: %d", contentSize, float64(contentSize)/1024, rLevel) } - txCtx, txCancel = context.WithTimeout(ctx, o.UploadTimeout) - - c.metrics.UploadAttempts.WithLabelValues(sizeLabel, uploader.Name()).Inc() - address, txDuration, err = test.Upload(txCtx, uploader, txData, batchID) - if err != nil { - c.metrics.UploadErrors.WithLabelValues(sizeLabel, uploader.Name()).Inc() - c.logger.Errorf("upload failed for size %d: %v", contentSize, err) - c.logger.Infof("retrying in: %v", o.TxOnErrWait) - } else { - uploaded = true - break + sizeLabel := fmt.Sprintf("%d", contentSize) + + var ( + txDuration time.Duration + rxDuration time.Duration + txData []byte + rxData []byte + address swarm.Address + uploaded bool + downloaded bool + ) + + txData = make([]byte, contentSize) + if _, err := rand.Read(txData); err != nil { + c.logger.Errorf("unable to create random content for size %d: %v", contentSize, err) + continue } - } - txCancel() - if !uploaded { - c.logger.Infof("skipping download for size %d due to upload failure", contentSize) - continue - } - - c.metrics.UploadDuration.WithLabelValues(sizeLabel, uploader.Name()).Observe(txDuration.Seconds()) - // Calculate and record upload throughput in bytes per second - if txDuration.Seconds() > 0 { - uploadThroughput := float64(contentSize) / txDuration.Seconds() - c.metrics.UploadThroughput.WithLabelValues(sizeLabel, uploader.Name()).Set(uploadThroughput) - } + var ( + txCtx context.Context + txCancel context.CancelFunc = func() {} + ) - time.Sleep(o.NodesSyncWait) + for range 3 { + txCancel() - var ( - rxCtx context.Context - rxCancel context.CancelFunc = func() {} - downloaded bool - ) + uploaded = false - for range 3 { - rxCancel() + select { + case <-ctx.Done(): + return nil + case <-time.After(o.TxOnErrWait): + } - select { - case <-ctx.Done(): - return nil - case <-time.After(o.RxOnErrWait): + txCtx, txCancel = context.WithTimeout(ctx, o.UploadTimeout) + + c.metrics.UploadAttempts.WithLabelValues(sizeLabel, uploader.Name()).Inc() + address, txDuration, err = test.Upload(txCtx, uploader, txData, batchID, rLevel) + if err != nil { + c.metrics.UploadErrors.WithLabelValues(sizeLabel, uploader.Name()).Inc() + c.logger.Errorf("upload failed for size %d: %v", contentSize, err) + c.logger.Infof("retrying in: %v", o.TxOnErrWait) + } else { + uploaded = true + break + } + } + txCancel() + if !uploaded { + c.logger.Infof("skipping download for size %d due to upload failure", contentSize) + continue } - c.metrics.DownloadAttempts.WithLabelValues(sizeLabel, downloader.Name()).Inc() + c.metrics.UploadDuration.WithLabelValues(sizeLabel, uploader.Name()).Observe(txDuration.Seconds()) - rxCtx, rxCancel = context.WithTimeout(ctx, o.DownloadTimeout) - rxData, rxDuration, err = test.Download(rxCtx, downloader, address) - if err != nil { - c.metrics.DownloadErrors.WithLabelValues(sizeLabel, downloader.Name()).Inc() - c.logger.Errorf("download failed for size %d: %v", contentSize, err) - c.logger.Infof("retrying in: %v", o.RxOnErrWait) - continue + if txDuration.Seconds() > 0 { + uploadThroughput := float64(contentSize) / txDuration.Seconds() + c.metrics.UploadThroughput.WithLabelValues(sizeLabel, uploader.Name()).Set(uploadThroughput) } - // good download - if bytes.Equal(rxData, txData) { - c.metrics.DownloadDuration.WithLabelValues(sizeLabel, downloader.Name()).Observe(rxDuration.Seconds()) + time.Sleep(o.NodesSyncWait) - if rxDuration.Seconds() > 0 { - downloadThroughput := float64(contentSize) / rxDuration.Seconds() - c.metrics.DownloadThroughput.WithLabelValues(sizeLabel, downloader.Name()).Set(downloadThroughput) + var ( + rxCtx context.Context + rxCancel context.CancelFunc = func() {} + ) + + for range 3 { + rxCancel() + + select { + case <-ctx.Done(): + return nil + case <-time.After(o.RxOnErrWait): } - downloaded = true - break - } - // bad download - c.logger.Infof("data mismatch for size %d: uploaded and downloaded data differ", contentSize) - c.metrics.DownloadMismatch.WithLabelValues(sizeLabel, downloader.Name()).Inc() + c.metrics.DownloadAttempts.WithLabelValues(sizeLabel, downloader.Name()).Inc() - rxLen, txLen := len(rxData), len(txData) - if rxLen != txLen { - c.logger.Errorf("length mismatch for size %d: downloaded %d bytes, uploaded %d bytes", contentSize, rxLen, txLen) - continue - } + rxCtx, rxCancel = context.WithTimeout(ctx, o.DownloadTimeout) + rxData, rxDuration, err = test.Download(rxCtx, downloader, address, rLevel) + if err != nil { + c.metrics.DownloadErrors.WithLabelValues(sizeLabel, downloader.Name()).Inc() + c.logger.Errorf("download failed for size %d: %v", contentSize, err) + c.logger.Infof("retrying in: %v", o.RxOnErrWait) + continue + } - var diff int - for i := range txData { - if txData[i] != rxData[i] { - diff++ + if bytes.Equal(rxData, txData) { + c.metrics.DownloadDuration.WithLabelValues(sizeLabel, downloader.Name()).Observe(rxDuration.Seconds()) + + if rxDuration.Seconds() > 0 { + downloadThroughput := float64(contentSize) / rxDuration.Seconds() + c.metrics.DownloadThroughput.WithLabelValues(sizeLabel, downloader.Name()).Set(downloadThroughput) + } + downloaded = true + break } + + c.logger.Infof("data mismatch for size %d: uploaded and downloaded data differ", contentSize) + c.metrics.DownloadMismatch.WithLabelValues(sizeLabel, downloader.Name()).Inc() + + rxLen, txLen := len(rxData), len(txData) + if rxLen != txLen { + c.logger.Errorf("length mismatch for size %d: downloaded %d bytes, uploaded %d bytes", contentSize, rxLen, txLen) + continue + } + + var diff int + for i := range txData { + if txData[i] != rxData[i] { + diff++ + } + } + c.logger.Infof("data mismatch for size %d: found %d different bytes, ~%.2f%%", contentSize, diff, float64(diff)/float64(txLen)*100) } - c.logger.Infof("data mismatch for size %d: found %d different bytes, ~%.2f%%", contentSize, diff, float64(diff)/float64(txLen)*100) - } - rxCancel() - - if !downloaded { - c.logger.Errorf("all download attempts failed for size %d, fetching downloader topology", contentSize) - top, topErr := downloader.Topology(ctx) - if topErr != nil { - c.logger.Errorf("failed to get downloader topology: %v", topErr) - } else { - c.logger.Infof("downloader %s topology: depth=%d, connected=%d, population=%d, reachability=%s, bins=%s", - downloader.Name(), top.Depth, top.Connected, top.Population, top.Reachability, top.Bins.String()) + rxCancel() + + if !downloaded { + c.logger.Errorf("all download attempts failed for size %d, fetching downloader topology", contentSize) + top, topErr := downloader.Topology(ctx) + if topErr != nil { + c.logger.Errorf("failed to get downloader topology: %v", topErr) + } else { + c.logger.Infof("downloader %s topology: depth=%d, connected=%d, population=%d, reachability=%s, bins=%s", + downloader.Name(), top.Depth, top.Connected, top.Population, top.Reachability, top.Bins.String()) + } } - } - c.logger.Infof("completed testing file size: %d bytes", contentSize) + c.logger.Infof("completed testing file size: %d bytes", contentSize) + } } time.Sleep(o.IterationWait) diff --git a/pkg/config/check.go b/pkg/config/check.go index b130c6e4..5b3344c5 100644 --- a/pkg/config/check.go +++ b/pkg/config/check.go @@ -6,6 +6,7 @@ import ( "reflect" "time" + beeRedundancy "github.com/ethersphere/bee/v2/pkg/file/redundancy" "github.com/ethersphere/beekeeper/pkg/beekeeper" "github.com/ethersphere/beekeeper/pkg/check/act" "github.com/ethersphere/beekeeper/pkg/check/balances" @@ -418,6 +419,7 @@ var Checks = map[string]CheckType{ RxOnErrWait *time.Duration `yaml:"rx-on-err-wait"` NodesSyncWait *time.Duration `yaml:"nodes-sync-wait"` Duration *time.Duration `yaml:"duration"` + RLevels *[]uint8 `yaml:"r-levels"` }) if err := check.Options.Decode(checkOpts); err != nil { @@ -745,6 +747,22 @@ func applyCheckConfig(global CheckGlobalConfig, local, opts any) (err error) { ov.FieldByName(fieldName).Set(fieldValue) } } + case "RLevels": + if !lv.Field(i).IsNil() { + fieldValue := lv.FieldByName(fieldName).Elem() + n := fieldValue.Len() + levels := make([]beeRedundancy.Level, n) + for j := 0; j < n; j++ { + levels[j] = beeRedundancy.Level(uint8(fieldValue.Index(j).Uint())) + } + ft, ok := ot.FieldByName(fieldName) + if ok { + v := reflect.ValueOf(levels) + if v.Type().AssignableTo(ft.Type) { + ov.FieldByName(fieldName).Set(v) + } + } + } default: if lv.Field(i).IsNil() { fmt.Printf("field %s not set, using default value\n", fieldName) diff --git a/pkg/test/test.go b/pkg/test/test.go index 5d603ae1..7d3c132c 100644 --- a/pkg/test/test.go +++ b/pkg/test/test.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "github.com/ethersphere/bee/v2/pkg/file/redundancy" "github.com/ethersphere/bee/v2/pkg/swarm" "github.com/ethersphere/beekeeper/pkg/bee" "github.com/ethersphere/beekeeper/pkg/bee/api" @@ -21,10 +22,10 @@ type test struct { logger logging.Logger } -func (t *test) Upload(ctx context.Context, bee *bee.Client, data []byte, batchID string) (swarm.Address, time.Duration, error) { +func (t *test) Upload(ctx context.Context, bee *bee.Client, data []byte, batchID string, rLevel redundancy.Level) (swarm.Address, time.Duration, error) { t.logger.Infof("node %s: uploading %d bytes, batch id %s", bee.Name(), len(data), batchID) start := time.Now() - addr, err := bee.UploadBytes(ctx, data, api.UploadOptions{Pin: false, BatchID: batchID, Direct: true}) + addr, err := bee.UploadBytes(ctx, data, api.UploadOptions{Pin: false, BatchID: batchID, Direct: true, RLevel: rLevel}) if err != nil { return swarm.ZeroAddress, 0, fmt.Errorf("upload to node %s: %w", bee.Name(), err) } @@ -35,11 +36,21 @@ func (t *test) Upload(ctx context.Context, bee *bee.Client, data []byte, batchID return addr, txDuration, nil } -func (t *test) Download(ctx context.Context, bee *bee.Client, addr swarm.Address) ([]byte, time.Duration, error) { +func (t *test) Download(ctx context.Context, bee *bee.Client, addr swarm.Address, rLevel redundancy.Level) ([]byte, time.Duration, error) { t.logger.Infof("node %s: downloading address %s", bee.Name(), addr) start := time.Now() - data, err := bee.DownloadBytes(ctx, addr, nil) + + var downloadOpts *api.DownloadOptions + if rLevel != redundancy.NONE { + fallbackMode := true + downloadOpts = &api.DownloadOptions{ + RLevel: rLevel, + RedundancyFallbackMode: &fallbackMode, + } + } + + data, err := bee.DownloadBytes(ctx, addr, downloadOpts) if err != nil { return nil, 0, fmt.Errorf("download from node %s: %w", bee.Name(), err) }