Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
4b52052
ore: add pager feature flag
antiguru May 4, 2026
13d3239
ore: skeleton mz_ore::pager module with Backend enum
antiguru May 4, 2026
8dea6b4
ore: pager scratch dir lifecycle and stale-subdir reaper
antiguru May 4, 2026
01d5359
ore: pager Handle type and inner storage scaffolding
antiguru May 4, 2026
027334b
ore: pager swap backend pageout with MADV_COLD
antiguru May 4, 2026
1c27263
ore: pager swap backend read_at_many
antiguru May 4, 2026
c1522ab
ore: pager swap backend take with zero-copy fast path
antiguru May 4, 2026
b9e1d10
ore: pager public dispatch surface (pageout/read_at/take)
antiguru May 4, 2026
70f035c
ore: pager file backend pageout with pwritev
antiguru May 4, 2026
8c5b92b
ore: pager file backend read_at_many with coalescing
antiguru May 4, 2026
8db7d6c
ore: pager file backend take and drop reclaim
antiguru May 4, 2026
83f8c34
ore: pager cross-backend integration tests
antiguru May 4, 2026
84189cd
ore: pager Criterion bench harness
antiguru May 4, 2026
e584512
ore: pager clippy + lint cleanups (write_vectored, cast_from, exhaust…
antiguru May 4, 2026
e469b6b
ore: pager copyright headers and test-attribute lint compliance
antiguru May 4, 2026
4cf0cd8
ore: pager bench round-trip with touch-every-page readback
antiguru May 4, 2026
3cb8968
ore: pager merge-batcher example with cache-line touch
antiguru May 4, 2026
7569984
ore: update Cargo.lock for pager tempfile dev-dep
antiguru May 4, 2026
98b4717
ore: pager prefetch and prefetch_at hints
antiguru May 5, 2026
2fd9d3a
ore: replace as_conversions with cast_from/cast_lossy/try_from
antiguru May 5, 2026
b65569e
ore: pager merge example takes --threads, partitions chain
antiguru May 5, 2026
2bbe256
doc: pager design — add operational characteristics with measured thr…
antiguru May 5, 2026
f6878e3
ore: gate pager_merge example on the pager feature
antiguru May 5, 2026
ea3a51e
ore: drop pager prefetch API and example usage
antiguru May 5, 2026
0f6602f
doc: pager design — add r8gd.16xlarge bench, retract swap-caps-regard…
antiguru May 5, 2026
8844e0f
ore: pageout_with helper for explicit-backend dispatch
antiguru May 14, 2026
1274f63
timely-util: column_pager with policy + lz4
antiguru May 14, 2026
aef6d53
timely-util: tiered paging policy + drop-based release
antiguru May 14, 2026
9054eba
timely-util: criterion bench for column_pager
antiguru May 14, 2026
e5f246d
timely-util: relabel swap-backend bench as swap-warm
antiguru May 14, 2026
898640b
cargo: add lz4_flex workspace dep
DAlperin May 19, 2026
199d91a
compute: column-paged merge batcher
DAlperin May 18, 2026
161d66a
timely-util,feature-benchmark: benches for the column-paged batcher
DAlperin May 19, 2026
ecd5a88
lint fixes
DAlperin May 20, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ launchdarkly-server-sdk = { version = "2.6.2", default-features = false }
lgalloc = "0.6.0"
libc = "0.2.184"
lru = "0.16.3"
lz4_flex = { version = "0.12.1", default-features = false, features = ["frame"] }
maplit = "1.0.2"
mappings = "0.7.2"
md-5 = "0.10.6"
Expand Down
411 changes: 411 additions & 0 deletions doc/developer/design/20260504_pager.md

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions misc/python/materialize/feature_benchmark/benchmark_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ def this_as_str(self) -> str:
return f"{self.this():>11.3f}"

def other(self) -> T:
# `_points` has length 1 when the runner ran only the THIS side
# (e.g. `--skip-other`); treat the absent baseline as `None` so
# report rendering falls through to its `None` formatting.
if len(self._points) < 2:
return None # type: ignore[return-value]
return self._points[1]

def other_as_str(self) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,13 @@ def get_threshold(self, metric: BenchmarkScenarioMetric) -> float:
return self.threshold_by_measurement_type[metric.measurement_type]

def ratio(self, metric: BenchmarkScenarioMetric) -> float | None:
if metric._points[0] is None or metric._points[1] is None:
# `_points` has length 1 when the runner ran only the THIS side
# (e.g. `--skip-other`); there's no baseline to compare against.
if (
len(metric._points) < 2
or metric._points[0] is None
or metric._points[1] is None
):
return None
else:
return metric._points[0] / metric._points[1]
Expand Down
134 changes: 134 additions & 0 deletions misc/python/materialize/feature_benchmark/scenarios/benchmark_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -947,6 +947,140 @@ def benchmark(self) -> MeasurementSource:
""")


class DifferentialJoinColumnPaged(Dataflow):
"""Same shape as `DifferentialJoin`, but with the column-paged merge
batcher enabled for the linear-join arrange stage.

Compare against `DifferentialJoin` to gauge the steady-state overhead of
the paged path (resident chunks plus byte-budget bookkeeping) when no
pressure forces spill. To measure spill cost, see
`DifferentialJoinHydrationFile`.
"""

def init(self) -> list[Action]:
return [
self.view_ten(),
TdAction(f"""
$ postgres-connect name=mz_system url=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}
$ postgres-execute connection=mz_system
ALTER SYSTEM SET enable_column_paged_batcher = true;

> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()}
"""),
]

def benchmark(self) -> MeasurementSource:
return Td(f"""
> SELECT 1;
/* A */
1


> SELECT COUNT(*) FROM v1 AS a1 JOIN v1 AS a2 USING (f1)
/* B */
{self.n()}
""")


class DifferentialJoinHydration(Dataflow):
"""Non-leaf parent for the linear-join hydration benchmark family.

Holds the shared `init` / `benchmark` (replica-toggle hydration loop) so
Baseline and File variants only need to override `shared()` with the
dyncfgs they want set. Has subclasses, so the feature-benchmark runner
treats it as non-leaf and never executes it directly — pick one of the
leaf classes via `--root-scenario`.

Run both leaves under a memory-capped Materialized (`--this-memory=2g`)
so the baseline has to swap and the paged-file variant has somewhere
predictable to spill.
"""

# SCALE=8 → 100M rows per side, ~1.6 GiB per side input. Two sides plus
# the join arrangement (typically 2–4× input) reliably exceeds a few
# GiB total; a 2g container cap forces real swap pressure on the
# baseline. File variant's 16 MiB per-worker + 128 MiB shared budget
# means almost everything spills under that cap.
SCALE = 8

def init(self) -> list[Action]:
# `v1` lives on the default cluster, not `join_cluster`, so the
# replication-factor toggle in `benchmark` only tears down `v2`'s
# dataflow. Keeps the measurement scoped to the join-arrangement
# rebuild we're trying to measure.
return [
self.view_ten(),
TdAction(f"""
> CREATE MATERIALIZED VIEW v1
AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()}
> SELECT COUNT(*) FROM v1
{self.n()}

> CREATE CLUSTER join_cluster SIZE 'scale=1,workers=16', REPLICATION FACTOR 1
"""),
]

def benchmark(self) -> MeasurementSource:
# Match HydrateIndex's pattern: take the cluster offline *before*
# defining the object so the dataflow doesn't pre-hydrate. The
# `REPLICATION FACTOR 1` flip after `/* A */` is the actual
# hydration trigger we want to time.
return Td(f"""
> DROP MATERIALIZED VIEW IF EXISTS v2

> ALTER CLUSTER join_cluster SET (REPLICATION FACTOR 0)

> CREATE MATERIALIZED VIEW v2
IN CLUSTER join_cluster
AS SELECT COUNT(*) FROM v1 AS a1 JOIN v1 AS a2 USING (f1)

> SELECT 1
/* A */
1
> ALTER CLUSTER join_cluster SET (REPLICATION FACTOR 1)
> SET CLUSTER = join_cluster
> SELECT * FROM v2
/* B */
{self.n()}
> SET CLUSTER = default
""")


class DifferentialJoinHydrationBaseline(DifferentialJoinHydration):
"""Hydration measurement with the paged batcher disabled (current
production path). Compare against `DifferentialJoinHydrationFile` to
see if user-space file-backed spill beats OS swap under pressure.
"""

def shared(self) -> Action:
return TdAction("""
$ postgres-connect name=mz_system url=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
$ postgres-execute connection=mz_system
ALTER SYSTEM SET enable_column_paged_batcher = false;
""")


class DifferentialJoinHydrationFile(DifferentialJoinHydration):
"""Hydration time with the column-paged batcher on, file backend, and
a tight budget fraction so the merge-batcher transient spills rather
than competing with the spine for RAM.

`budget_fraction = 0.01` (1% of announced memory limit) lands in the
clamp floors of the worker-init derivation (per-worker 16 MiB,
shared 128 MiB), giving us the same effective sizing we benchmarked
before the fraction-knob refactor.
"""

def shared(self) -> Action:
return TdAction("""
$ postgres-connect name=mz_system url=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
$ postgres-execute connection=mz_system
ALTER SYSTEM SET enable_column_paged_batcher = true;
ALTER SYSTEM SET column_paged_batcher_backend = 'file';
ALTER SYSTEM SET column_paged_batcher_budget_fraction = 0.01;
""")


class FullOuterJoin(Dataflow):
def benchmark(self) -> BenchmarkingSequence:
columns_select = ", ".join(
Expand Down
3 changes: 3 additions & 0 deletions misc/python/materialize/mzcompose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,9 @@ def get_default_system_parameters(
"enable_mcp_developer",
"mcp_max_response_size",
"user_id_pool_batch_size",
"enable_column_paged_batcher",
"column_paged_batcher_budget_fraction",
"column_paged_batcher_backend",
]


Expand Down
7 changes: 7 additions & 0 deletions misc/python/materialize/mzcompose/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,13 @@ class ServiceConfig(TypedDict, total=False):
TODO(benesch): this should use a nested TypedDict.
"""

memswap_limit: str | int
"""Total memory limit (memory + swap). Set greater than the memory limit to
enable host swap usage under memory pressure."""

mem_swappiness: int
"""Kernel swappiness for the container (0-100)."""

ulimits: dict[str, Any]
"""Override the default ulimits for a container."""

Expand Down
13 changes: 13 additions & 0 deletions misc/python/materialize/mzcompose/services/clusterd.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ def __init__(
environment_id: str | None = None,
environment_extra: list[str] = [],
memory: str | None = None,
memory_swap: str | None = None,
mem_swappiness: int | None = None,
cpu: str | None = None,
options: list[str] = [],
restart: str = "no",
Expand Down Expand Up @@ -93,6 +95,17 @@ def __init__(
limits["cpus"] = cpu
config["deploy"] = {"resources": {"limits": limits}}

# Swap controls aren't part of compose's `deploy.resources` schema; they
# live as top-level compose v2 service keys (`memswap_limit`,
# `mem_swappiness`). Setting `memswap_limit > mem_limit` enables the
# container to use host swap when RAM pressure builds, which lets the
# kernel page out anonymous memory rather than OOM-killing. Useful for
# benchmarking "OS swap" as a baseline vs application-managed spill.
if memory_swap is not None:
config["memswap_limit"] = memory_swap
if mem_swappiness is not None:
config["mem_swappiness"] = mem_swappiness

config.update(
{
"command": options,
Expand Down
11 changes: 11 additions & 0 deletions misc/python/materialize/mzcompose/services/materialized.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ def __init__(
volumes_extra: list[str] = [],
depends_on: list[str] = [],
memory: str | None = None,
memory_swap: str | None = None,
mem_swappiness: int | None = None,
cpu: str | None = None,
options: list[str] = [],
persist_blob_url: str | None = None,
Expand Down Expand Up @@ -332,6 +334,15 @@ def __init__(
limits["cpus"] = cpu
config["deploy"] = {"resources": {"limits": limits}}

# Swap controls live as top-level compose v2 service keys, not under
# `deploy.resources`. `memswap_limit > mem_limit` lets the container use
# host swap so the kernel can page out anonymous memory rather than OOM.
# `mem_swappiness=100` biases the kernel toward swapping aggressively.
if memory_swap is not None:
config["memswap_limit"] = memory_swap
if mem_swappiness is not None:
config["mem_swappiness"] = mem_swappiness

if sanity_restart:
# Workaround for https://github.com/docker/compose/issues/11133
config["labels"] = {"sanity_restart": True}
Expand Down
3 changes: 3 additions & 0 deletions misc/python/materialize/parallel_workload/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -1822,6 +1822,9 @@ def __init__(
"oidc_group_role_sync_strict",
"console_oidc_client_id",
"console_oidc_scopes",
"enable_column_paged_batcher",
"column_paged_batcher_budget_fraction",
"column_paged_batcher_backend",
]

def run(self, exe: Executor) -> bool:
Expand Down
48 changes: 48 additions & 0 deletions src/compute-types/src/dyncfgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,51 @@ pub const ENABLE_HALF_JOIN2: Config<bool> = Config::new(
"Whether compute should use `half_join2` rather than DD's `half_join` to render delta joins.",
);

/// Install the column-pageable merge batcher on each compute worker, so
/// arrangements that route through it can spill chunks under memory
/// pressure rather than holding them all resident. Disabled by default;
/// the budget/backend knobs below tune the behavior when enabled.
pub const ENABLE_COLUMN_PAGED_BATCHER: Config<bool> = Config::new(
"enable_column_paged_batcher",
false,
"Install the column-paged merge batcher on each compute worker so it can spill under memory \
pressure.",
);

/// Total resident-byte budget the column-paged batcher's
/// [`TieredPolicy`](mz_timely_util::column_pager::policy::TieredPolicy)
/// is allowed to hold across all workers in this process, expressed as
/// a fraction of the replica's announced memory limit. Workers split
/// this between a per-worker local pool and a process-wide shared pool;
/// values beyond either pool spill to the configured backend.
///
/// `0.05` (5%) is a reasonable starting point: large enough that the
/// per-call ColumnBuilder ship-threshold (~2 MiB) fits multiple chunks
/// per worker, small enough that the merge-batcher's transient state
/// doesn't crowd out the spine. Set lower to spill more aggressively
/// under pressure; set `0.0` to spill on every chunk (sanity check only).
/// Ignored when `enable_column_paged_batcher` is `false`.
pub const COLUMN_PAGED_BATCHER_BUDGET_FRACTION: Config<f64> = Config::new(
"column_paged_batcher_budget_fraction",
0.05,
"Fraction of replica memory the column-paged batcher's tiered policy may hold resident \
before spilling to the backend. Total budget = mem_limit * fraction; split 1/8 per-worker \
local (clamped 16-64 MiB) and 7/8 shared (clamped 128 MiB - 1 GiB).",
);

/// Backend to which the column-paged batcher spills chunks once both the
/// per-worker and shared budgets are exhausted. `"swap"` keeps the bytes
/// in process memory (the OS swap subsystem may page them out under
/// pressure); `"file"` writes them to compute's scratch directory under
/// our control. The file backend requires `--scratch-directory` to be
/// configured on clusterd; if absent, the worker falls back to swap and
/// logs a warning. Ignored when `enable_column_paged_batcher` is `false`.
pub const COLUMN_PAGED_BATCHER_BACKEND: Config<&str> = Config::new(
"column_paged_batcher_backend",
"swap",
"Backend for column-paged batcher spills: \"swap\" or \"file\".",
);

/// Whether rendering should use `mz_join_core` rather than DD's `JoinCore::join_core`.
pub const ENABLE_MZ_JOIN_CORE: Config<bool> = Config::new(
"enable_mz_join_core",
Expand Down Expand Up @@ -424,4 +469,7 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
.add(&COMPUTE_PROMETHEUS_INTROSPECTION_SCRAPE_INTERVAL)
.add(&SUBSCRIBE_SNAPSHOT_OPTIMIZATION)
.add(&MV_SINK_ADVANCE_PERSIST_FRONTIERS)
.add(&ENABLE_COLUMN_PAGED_BATCHER)
.add(&COLUMN_PAGED_BATCHER_BUDGET_FRACTION)
.add(&COLUMN_PAGED_BATCHER_BACKEND)
}
Loading
Loading