From 982333cf5d559cdfc77af41b8a5cf0522daec1df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20=C5=9Awi=C4=99cki?= Date: Wed, 28 May 2025 11:22:24 +0200 Subject: [PATCH 1/3] Add detailed flush strategy better suited for long-running upload processes * Add max flush interval - flush the FS once the interval is reached * Allow configuring flushing on finished column and z layer --- cmd/maps-tile-uploader/defaultConfig.yaml | 5 ++ cmd/maps-tile-uploader/flush_strategy.go | 87 +++++++++++++++++++ cmd/maps-tile-uploader/main.go | 53 +++++++---- .../cinode-upload-config-configmap.yaml | 5 ++ 4 files changed, 133 insertions(+), 17 deletions(-) create mode 100644 cmd/maps-tile-uploader/flush_strategy.go diff --git a/cmd/maps-tile-uploader/defaultConfig.yaml b/cmd/maps-tile-uploader/defaultConfig.yaml index da5be8d..8a41597 100644 --- a/cmd/maps-tile-uploader/defaultConfig.yaml +++ b/cmd/maps-tile-uploader/defaultConfig.yaml @@ -3,6 +3,11 @@ minZoom: 0 planetMaxZoom: 9 maxTileDownloadRetries: 100 maxTileDownloadRetryDelaySec: 10 +flushStrategy: + # maxFlushInterval: 1h + flushOnDetailedColumnFinished: false + flushOnColumnFinished: false + flushOnZLayerFinished: true detailedRegions: - name: europe-luxembourg geoBBox: diff --git a/cmd/maps-tile-uploader/flush_strategy.go b/cmd/maps-tile-uploader/flush_strategy.go new file mode 100644 index 0000000..54b341d --- /dev/null +++ b/cmd/maps-tile-uploader/flush_strategy.go @@ -0,0 +1,87 @@ +package main + +import ( + "context" + "time" + + "github.com/cinode/go/pkg/cinodefs" +) + +type FlushStrategy interface { + FlushOpportunity(ctx context.Context) error + ColumnFinished(ctx context.Context, isDetailedRegion bool) error + ZLayerFinished(ctx context.Context) error + ProcessFinished(ctx context.Context) error +} + +func NewFlushStrategy( + fs cinodefs.FS, + cfg FlushStrategyConfig, + timeSource func() time.Time, +) FlushStrategy { + return &flushStrategy{ + fs: fs, + cfg: cfg, + timeSource: timeSource, + } +} + +type FlushStrategyConfig struct { + MaxFlushInterval *time.Duration `yaml:"maxFlushInterval,omitempty"` + FlushOnDetailedColumnFinished bool `yaml:"flushOnDetailedColumnFinished"` + FlushOnColumnFinished bool `yaml:"flushOnColumnFinished"` + FlushOnZLayerFinished bool `yaml:"flushOnZLayerFinished"` +} + +type flushStrategy struct { + fs cinodefs.FS + lastFlushTime time.Time + cfg FlushStrategyConfig + timeSource func() time.Time +} + +func (f *flushStrategy) flush(ctx context.Context) error { + if err := f.fs.Flush(ctx); err != nil { + return err + } + + f.lastFlushTime = f.timeSource() + + return nil +} + +func (f *flushStrategy) FlushOpportunity(ctx context.Context) error { + if f.cfg.MaxFlushInterval == nil { + return nil + } + + if f.timeSource().Sub(f.lastFlushTime) < *f.cfg.MaxFlushInterval { + return nil + } + + return f.flush(ctx) +} + +func (f *flushStrategy) ColumnFinished(ctx context.Context, isDetailedRegion bool) error { + if isDetailedRegion && f.cfg.FlushOnDetailedColumnFinished { + return f.flush(ctx) + } + + if f.cfg.FlushOnColumnFinished { + return f.flush(ctx) + } + + return nil +} + +func (f *flushStrategy) ZLayerFinished(ctx context.Context) error { + if f.cfg.FlushOnZLayerFinished { + return f.flush(ctx) + } + + return nil +} + +func (f *flushStrategy) ProcessFinished(ctx context.Context) error { + return f.flush(ctx) +} diff --git a/cmd/maps-tile-uploader/main.go b/cmd/maps-tile-uploader/main.go index b19dfb1..0823456 100644 --- a/cmd/maps-tile-uploader/main.go +++ b/cmd/maps-tile-uploader/main.go @@ -99,9 +99,10 @@ func main() { } gen := tilesGenerator{ - cfg: cfg, - fs: fs, - log: slog.Default(), + cfg: cfg, + fs: fs, + log: slog.Default(), + flush: NewFlushStrategy(fs, cfg.FlushStrategy, time.Now), } err = gen.Process(ctx) @@ -123,12 +124,14 @@ type Config struct { MaxTileDownloadRetries int `yaml:"maxTileDownloadRetries"` MaxTileDownloadRetryDelaySec int `yaml:"maxTileDownloadRetryDelaySec"` DetailedRegions []DetailedRegionConfig `yaml:"detailedRegions"` + FlushStrategy FlushStrategyConfig `yaml:"flushStrategy"` } type tilesGenerator struct { - cfg Config - fs cinodefs.FS - log *slog.Logger + cfg Config + fs cinodefs.FS + log *slog.Logger + flush FlushStrategy } func (t *tilesGenerator) fetchTile( @@ -162,6 +165,10 @@ func (t *tilesGenerator) fetchTile( for retry := 0; ctx.Err() == nil; retry++ { log := log.With("url", url, "retry", retry) + if err := t.flush.FlushOpportunity(ctx); err != nil { + return fmt.Errorf("failed to flush the filesystem: %w", err) + } + log.InfoContext(ctx, "Fetching tile started") resp, err := http.Get(url) @@ -211,6 +218,10 @@ func (t *tilesGenerator) fetchTile( log.InfoContext(ctx, "Tile uploaded to cinode", "bn", ep.BlobName().String()) + if err := t.flush.FlushOpportunity(ctx); err != nil { + return fmt.Errorf("failed to flush the filesystem: %w", err) + } + return nil } @@ -250,6 +261,11 @@ func (t *tilesGenerator) genXLayer( return err } } + + if err := t.flush.ColumnFinished(ctx, true); err != nil { + return fmt.Errorf("failed to flush the filesystem: %w", err) + } + return nil } @@ -288,13 +304,12 @@ func (t *tilesGenerator) genZLayer( if err != nil { return err } + } - // For region-based z layer, flush once every column for better persistency and faster results - err = t.fs.Flush(ctx) - if err != nil { - return fmt.Errorf("failed to flush the filesystem: %w", err) - } + if err := t.flush.ZLayerFinished(ctx); err != nil { + return fmt.Errorf("failed to flush the filesystem: %w", err) } + return nil } @@ -310,6 +325,7 @@ func (t *tilesGenerator) genZLayerNoConstraints( return err } } + return nil } @@ -329,6 +345,11 @@ func (t *tilesGenerator) genXLayerNoConstraints( return err } } + + if err := t.flush.ColumnFinished(ctx, false); err != nil { + return fmt.Errorf("failed to flush the filesystem: %w", err) + } + return nil } @@ -344,13 +365,11 @@ func (t *tilesGenerator) Process(ctx context.Context) error { if err != nil { return err } - err = t.fs.Flush(ctx) - if err != nil { - return fmt.Errorf("failed to flush the filesystem: %w", err) - } + } + + if err := t.flush.ProcessFinished(ctx); err != nil { + return fmt.Errorf("failed to flush the filesystem: %w", err) } return nil } - -// https://github.com/openstreetmap/mod_tile/pull/263#issuecomment-1006034286 diff --git a/helm/osm-machinery/templates/cinode-upload-config-configmap.yaml b/helm/osm-machinery/templates/cinode-upload-config-configmap.yaml index 9685c26..3f491c7 100644 --- a/helm/osm-machinery/templates/cinode-upload-config-configmap.yaml +++ b/helm/osm-machinery/templates/cinode-upload-config-configmap.yaml @@ -9,6 +9,11 @@ data: planetMaxZoom: {{ $.Values.cinodeUpload.planetMaxZoom | int }} maxTileDownloadRetries: {{ $.Values.cinodeUpload.maxTileDownloadRetries | int }} maxTileDownloadRetryDelaySec: {{ $.Values.cinodeUpload.maxTileDownloadRetryDelaySec | int }} + flushStrategy: + maxFlushInterval: 1h + flushOnDetailedColumnFinished: true + flushOnColumnFinished: true + flushOnZLayerFinished: true detailedRegions: {{- range $.Values.regions }} {{- if .enabled }} From ec85393907d36220f0897bcfe9832bdfa239a80a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20=C5=9Awi=C4=99cki?= Date: Wed, 28 May 2025 11:31:15 +0200 Subject: [PATCH 2/3] Add logging to flushing --- cmd/maps-tile-uploader/flush_strategy.go | 17 +++++++++++------ cmd/maps-tile-uploader/main.go | 9 ++++++--- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/cmd/maps-tile-uploader/flush_strategy.go b/cmd/maps-tile-uploader/flush_strategy.go index 54b341d..ec9fc45 100644 --- a/cmd/maps-tile-uploader/flush_strategy.go +++ b/cmd/maps-tile-uploader/flush_strategy.go @@ -2,6 +2,7 @@ package main import ( "context" + "log/slog" "time" "github.com/cinode/go/pkg/cinodefs" @@ -18,11 +19,13 @@ func NewFlushStrategy( fs cinodefs.FS, cfg FlushStrategyConfig, timeSource func() time.Time, + log *slog.Logger, ) FlushStrategy { return &flushStrategy{ fs: fs, cfg: cfg, timeSource: timeSource, + log: log, } } @@ -38,9 +41,11 @@ type flushStrategy struct { lastFlushTime time.Time cfg FlushStrategyConfig timeSource func() time.Time + log *slog.Logger } -func (f *flushStrategy) flush(ctx context.Context) error { +func (f *flushStrategy) flush(ctx context.Context, reason string) error { + f.log.InfoContext(ctx, "Flushing filesystem", "reason", reason) if err := f.fs.Flush(ctx); err != nil { return err } @@ -59,16 +64,16 @@ func (f *flushStrategy) FlushOpportunity(ctx context.Context) error { return nil } - return f.flush(ctx) + return f.flush(ctx, "maxFlushInterval reached") } func (f *flushStrategy) ColumnFinished(ctx context.Context, isDetailedRegion bool) error { if isDetailedRegion && f.cfg.FlushOnDetailedColumnFinished { - return f.flush(ctx) + return f.flush(ctx, "detailed column finished") } if f.cfg.FlushOnColumnFinished { - return f.flush(ctx) + return f.flush(ctx, "column finished") } return nil @@ -76,12 +81,12 @@ func (f *flushStrategy) ColumnFinished(ctx context.Context, isDetailedRegion boo func (f *flushStrategy) ZLayerFinished(ctx context.Context) error { if f.cfg.FlushOnZLayerFinished { - return f.flush(ctx) + return f.flush(ctx, "zoom layer finished") } return nil } func (f *flushStrategy) ProcessFinished(ctx context.Context) error { - return f.flush(ctx) + return f.flush(ctx, "process finished") } diff --git a/cmd/maps-tile-uploader/main.go b/cmd/maps-tile-uploader/main.go index 0823456..559482f 100644 --- a/cmd/maps-tile-uploader/main.go +++ b/cmd/maps-tile-uploader/main.go @@ -98,16 +98,19 @@ func main() { fmt.Printf(" WriterInfo: %s\n", golang.Must(fs.RootWriterInfo(ctx))) } + log := slog.Default() + gen := tilesGenerator{ cfg: cfg, fs: fs, - log: slog.Default(), - flush: NewFlushStrategy(fs, cfg.FlushStrategy, time.Now), + log: log, + flush: NewFlushStrategy(fs, cfg.FlushStrategy, time.Now, log), } err = gen.Process(ctx) if err != nil { - log.Fatal(err) + log.ErrorContext(ctx, "Failed to process tiles", "err", err) + os.Exit(1) } } From 6cd4a815ef587fae58bb33f706127c5d25d08cf1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20=C5=9Awi=C4=99cki?= Date: Wed, 28 May 2025 11:51:47 +0200 Subject: [PATCH 3/3] Add script for quick rebuild and redeployment on local minikube --- .dockerignore | 1 + toolbox/minikube-test.sh | 48 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 49 insertions(+) create mode 100755 toolbox/minikube-test.sh diff --git a/.dockerignore b/.dockerignore index 56aff21..86a05f8 100644 --- a/.dockerignore +++ b/.dockerignore @@ -1,6 +1,7 @@ deploy/ build/ helm/ +toolbox/ .git/ .github/ .vscode/ diff --git a/toolbox/minikube-test.sh b/toolbox/minikube-test.sh new file mode 100755 index 0000000..81c5dc9 --- /dev/null +++ b/toolbox/minikube-test.sh @@ -0,0 +1,48 @@ +#!/bin/bash + +set -euo pipefail + +cd "$(dirname "$0")/.." + +if ! minikube status | grep -q "Running"; then + echo "Minikube is not running" + exit 1 +fi + +eval "$(minikube docker-env)" + +if git diff --quiet; then + TAGNAME="devel-$(git rev-parse --short HEAD)" +else + TAGNAME="devel-$(git rev-parse --short HEAD)-dirty-$(date +%s)" +fi + +for image in \ + maps-tile-uploader \ + maps-tile-server \ +; do + docker build -t cinode/$image:$TAGNAME -f build/docker/Dockerfile.${image} . +done + +VALUES_CONTENT="--- +cinodeUpload: + image: + tag: $TAGNAME + registry: docker.io + repository: cinode/maps-tile-uploader + pullPolicy: Never +tileServer: + image: + tag: $TAGNAME + registry: docker.io + repository: cinode/maps-tile-server + pullPolicy: Never +" + +helm \ + upgrade --install \ + cinode-maps-tile-server \ + ./helm/osm-machinery \ + --kube-context minikube \ + --values ./helm/osm-machinery/values.yaml \ + --values <( echo "$VALUES_CONTENT" )