From 641f3fcba36d7e7186411423442ff3659e01efc7 Mon Sep 17 00:00:00 2001
From: Erik Darling <2136037+erikdarlingdata@users.noreply.github.com>
Date: Tue, 12 May 2026 06:25:21 +0200
Subject: [PATCH] =?UTF-8?q?Fix=20#933=20=E2=80=94=20raise=20compaction=20m?=
 =?UTF-8?q?emory=5Flimit=20to=204=20GB?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

#942 lowered the cap to 1 GB on the theory that a tight memory_limit plus
temp_directory would force DuckDB to spill earlier and keep peak working set
down. That validation ran against query_stats (narrow, ~1.7M rows) and showed
peak 1236 MB → 166 MB.

The reporter's actual failure is on query_snapshots, which carries query_text
+ query_plan + live_query_plan per row. With the 1 GB cap, the nightly logs
show OOM at "906/953 MiB used" before any merge progress.

The standalone reproducer (tools/CompactionRepro) confirms the cause: parquet
COPY in DuckDB v1.5.2 makes allocations that bypass the buffer manager and
can't be spilled. The cap acts as a hard ceiling for those, not a spill
trigger. Spill on disk = 0 MB across every configuration we tested
(memory_limit 1/2/4 GB, accumulator vs tournament merge, threads 1 vs 2,
:memory: vs file-backed DB). The same failure reproduces in standalone DuckDB
CLI v1.5.2, so it's an engine issue — see upstream issues duckdb#16482 and
discussion#10084.

DuckDB's own OOM guide explicitly warns about this case and recommends
memory_limit at 50-60% of system RAM, not a tight cap. 4 GB sits well inside
that range for typical workstation/server hosts and leaves real headroom on
top of the un-spillable allocations.

Reporter's actual file sizes (15-25 chunks of 2-6 MB plus a 35-45 MB monthly
file per group) are well below the level where 4 GB has any trouble. The
reproducer confirms 4 GB succeeds on a synthetic query_snapshots-shaped
dataset of ~1.5 GB with peak working set of ~400 MB; the reporter's data is
~143 MB at worst.

Also updates the stale comment about spilling — temp_directory was set per
#935 but the buffer-manager-bypassing allocations don't use it. The comment
now describes what actually happens.

The tools/CompactionRepro changes add --strategy {accumulator|tournament},
--db-mode {memory|file}, --merge-files, --synthetic data generation, and
--cycles for leak testing. These are kept so a future regression in this area
can be reproduced and diagnosed quickly.
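
For reference, the failing and passing configurations described above map onto
reproducer invocations roughly like the following (flag values are
illustrative rather than the recorded validation runs; --synthetic-rows and
--synthetic-plan-kb may need tuning to reach a comparable dataset size):

    # 1 GB cap on query_snapshots-shaped synthetic data (expected to OOM)
    dotnet run -- --synthetic --strategy accumulator --threads 2 \
        --memory-limit 1GB

    # same data shape with the new 4 GB cap (expected to succeed)
    dotnet run -- --synthetic --strategy accumulator --threads 2 \
        --memory-limit 4GB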

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 Lite/Services/ArchiveService.cs  |  32 ++-
 tools/CompactionRepro/Program.cs | 411 ++++++++++++++++++++++++++-----
 2 files changed, 376 insertions(+), 67 deletions(-)

diff --git a/Lite/Services/ArchiveService.cs b/Lite/Services/ArchiveService.cs
index f5f68e9..aa44a3c 100644
--- a/Lite/Services/ArchiveService.cs
+++ b/Lite/Services/ArchiveService.cs
@@ -332,10 +332,14 @@ Each group gets its own DuckDB connection so memory is fully released between gr
         var totalMerged = 0;
         var totalRemoved = 0;
 
-        /* Spill directory for the in-memory compaction connections. Without this,
-           the memory_limit pragma is a hard wall — DuckDB has nowhere to spill and
-           OOMs the moment the cap is hit. Co-locating with the archive keeps the
-           write on the same volume the parquet files already live on. */
+        /* Spill directory for the in-memory compaction connections. Set per #935
+           so DuckDB has somewhere to page if it chooses to. In practice (see #933)
+           the parquet COPY path uses allocations that bypass the buffer manager
+           and never actually spill — DuckDB's own OOM guide warns about this. We
+           keep the dir set for any code path that *can* spill, but memory_limit
+           below has to leave real headroom on top of those un-spillable allocs.
+           Co-locating with the archive keeps the write on the same volume the
+           parquet files already live on. */
         var spillDir = Path.Combine(_archivePath, "duckdb_tmp");
         Directory.CreateDirectory(spillDir);
         var spillDirSql = spillDir.Replace("\\", "/");
@@ -389,12 +393,26 @@ write on the same volume the parquet files already live on. */
 
             if (sourcePaths.Count <= 2)
             {
-                /* Small group — single-pass merge */
+                /* Small group — single-pass merge.
+
+                   Pragma tuning (history per #933):
+                   - memory_limit = 4GB: parquet COPY does allocations that
+                     bypass the buffer manager and can't be spilled. The cap
+                     is effectively a hard ceiling for those, not a spill
+                     trigger. At 1GB (the prior value) the reproducer dies
+                     at ~900/953 MiB used before any rows are read. 4GB
+                     leaves enough headroom for query_snapshots-shaped data
+                     (wide VARCHAR plan XML) and aligns with DuckDB's OOM
+                     guide recommendation of 50-60% of system RAM.
+                   - threads = 2: fewer per-thread row-group buffers in flight.
+                   - ROW_GROUP_SIZE 8192: smaller buffered batch per group.
+                   - preserve_insertion_order = false: lets DuckDB stream.
+                   See tools/CompactionRepro for the stress reproducer. */
                 using var con = new DuckDBConnection("DataSource=:memory:");
                 con.Open();
                 using (var pragma = con.CreateCommand())
                 {
-                    pragma.CommandText = $"SET memory_limit = '1GB'; SET threads = 2; SET preserve_insertion_order = false; SET temp_directory = '{EscapeSqlPath(spillDirSql)}';";
+                    pragma.CommandText = $"SET memory_limit = '4GB'; SET threads = 2; SET preserve_insertion_order = false; SET temp_directory = '{EscapeSqlPath(spillDirSql)}';";
                     pragma.ExecuteNonQuery();
                 }
 
@@ -425,7 +443,7 @@ Sort smallest-first so early merges are cheap. */
                 con.Open();
                 using (var pragma = con.CreateCommand())
                 {
-                    pragma.CommandText = $"SET memory_limit = '1GB'; SET threads = 2; SET preserve_insertion_order = false; SET temp_directory = '{EscapeSqlPath(spillDirSql)}';";
+                    pragma.CommandText = $"SET memory_limit = '4GB'; SET threads = 2; SET preserve_insertion_order = false; SET temp_directory = '{EscapeSqlPath(spillDirSql)}';";
                     pragma.ExecuteNonQuery();
                 }
 
diff --git a/tools/CompactionRepro/Program.cs b/tools/CompactionRepro/Program.cs
index 399ced8..8dc9d23 100644
--- a/tools/CompactionRepro/Program.cs
+++ b/tools/CompactionRepro/Program.cs
@@ -5,74 +5,163 @@
  * CompactionRepro — standalone reproducer for issue #933.
  *
  * Splits an existing monthly parquet file (like 202604_query_snapshots.parquet)
- * into N per-cycle-shaped chunks, then runs the same pair-merge compaction
- * logic ArchiveService.CompactParquetFiles uses, with knobs you can flip on
- * the command line. The split chunks have the exact row shape that caused
- * the user's OOM in #933.
+ * into N per-cycle-shaped chunks, then runs ArchiveService.CompactParquetFiles'
+ * merge logic with knobs you can flip on the command line. The split chunks
+ * have the exact row shape that caused the user's OOM in #933.
  *
- * Compare OLD vs NEW tuning by running the same data shape twice with
- * different --memory-limit / --threads / --row-group-size values.
+ * Two merge strategies:
+ *   accumulator (current prod) — sort smallest-first, fold each next file
+ *                                into a growing accumulator. Peak input size
+ *                                grows linearly with file count.
+ *   tournament (proposed fix)  — pair files in rounds; each round halves the
+ *                                file count. Each merge step's inputs stay
+ *                                balanced. Final round still merges ~all the
+ *                                data but every intermediate step is smaller.
  *
  * Usage:
  *   dotnet run -- --source-file <path> [options]
  *
- * Options (defaults match the proposed NEW tuning):
+ * Options:
  *   --source-file      Required. Path to a monthly parquet file to split & merge.
+ *   --strategy         accumulator | tournament. Default: tournament
  *   --memory-limit     DuckDB memory_limit (e.g. "1GB", "4GB"). Default: 1GB
- *   --threads          DuckDB threads. 0 = DuckDB default. Default: 2
+ *   --threads          DuckDB threads. 0 = DuckDB default. Default: 1
  *   --row-group-size   Output ROW_GROUP_SIZE. Default: 8192
  *   --num-files        Number of split chunks. Default: 15
  *   --keep             Don't delete temp dir after run (for inspection)
  *
  * Examples:
- *   # NEW tuning (the proposed fix) on real query_snapshots data
+ *   # Proposed fix: tournament merge, threads=1
  *   dotnet run -- --source-file "$LOCALAPPDATA/PerformanceMonitorLite/archive/202604_query_snapshots.parquet" \
- *       --memory-limit 1GB --threads 2 --row-group-size 8192
+ *       --strategy tournament --threads 1
  *
- *   # OLD tuning (current production) — should reproduce the OOM
+ *   # Reproduce the current OOM: accumulator + threads=2 (matches prod after #942)
  *   dotnet run -- --source-file "$LOCALAPPDATA/PerformanceMonitorLite/archive/202604_query_snapshots.parquet" \
- *       --memory-limit 4GB --threads 0 --row-group-size 122880
+ *       --strategy accumulator --threads 2
+ *
+ *   # Isolate the thread-count effect on the existing accumulator strategy
+ *   dotnet run -- --source-file "$LOCALAPPDATA/PerformanceMonitorLite/archive/202604_query_snapshots.parquet" \
+ *       --strategy accumulator --threads 1
  */
 
 var sourceFile = GetArg(args, "--source-file", "");
-if (string.IsNullOrEmpty(sourceFile))
+var mergeFilesArg = GetArg(args, "--merge-files", "");
+var synthetic = args.Contains("--synthetic");
+var syntheticRows = int.Parse(GetArg(args, "--synthetic-rows", "30000"));
+var syntheticPlanKb = int.Parse(GetArg(args, "--synthetic-plan-kb", "100"));
+if (string.IsNullOrEmpty(sourceFile) && string.IsNullOrEmpty(mergeFilesArg) && !synthetic)
 {
-    Console.Error.WriteLine("error: --source-file is required");
-    Console.Error.WriteLine("Try: --source-file \"$LOCALAPPDATA/PerformanceMonitorLite/archive/202604_query_snapshots.parquet\"");
+    Console.Error.WriteLine("error: --source-file OR --merge-files OR --synthetic required");
+    Console.Error.WriteLine("  --source-file: split the given monthly parquet into chunks, then compact (full repro)");
+    Console.Error.WriteLine("  --merge-files: merge the given comma-separated files directly (skip split)");
+    Console.Error.WriteLine("  --synthetic:   generate a query_snapshots-shaped source file (see --synthetic-rows, --synthetic-plan-kb)");
     return 2;
 }
-if (!File.Exists(sourceFile))
+if (!string.IsNullOrEmpty(sourceFile) && !File.Exists(sourceFile))
 {
     Console.Error.WriteLine($"error: source file not found: {sourceFile}");
     return 2;
 }
+var strategy = GetArg(args, "--strategy", "tournament").ToLowerInvariant();
+if (strategy != "accumulator" && strategy != "tournament")
+{
+    Console.Error.WriteLine($"error: --strategy must be 'accumulator' or 'tournament', got '{strategy}'");
+    return 2;
+}
+var dbMode = GetArg(args, "--db-mode", "memory").ToLowerInvariant();
+if (dbMode != "memory" && dbMode != "file")
+{
+    Console.Error.WriteLine($"error: --db-mode must be 'memory' or 'file', got '{dbMode}'");
+    return 2;
+}
 var memoryLimit = GetArg(args, "--memory-limit", "1GB");
-var threads = int.Parse(GetArg(args, "--threads", "2"));
+var threads = int.Parse(GetArg(args, "--threads", "1"));
 var rowGroupSize = int.Parse(GetArg(args, "--row-group-size", "8192"));
 var numFiles = int.Parse(GetArg(args, "--num-files", "15"));
+var cycles = int.Parse(GetArg(args, "--cycles", "1"));
 var keep = args.Contains("--keep");
 
 var tempDir = Path.Combine(Path.GetTempPath(), $"CompactionRepro_{Guid.NewGuid():N}");
 Directory.CreateDirectory(tempDir);
 
-Console.WriteLine($"Source: {sourceFile} ({new FileInfo(sourceFile).Length / 1024.0 / 1024.0:F1} MB)");
+var mergeFiles = string.IsNullOrEmpty(mergeFilesArg)
+    ? new List<string>()
+    : mergeFilesArg.Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries).ToList();
+foreach (var mf in mergeFiles)
+{
+    if (!File.Exists(mf))
+    {
+        Console.Error.WriteLine($"error: --merge-files entry not found: {mf}");
+        return 2;
+    }
+}
+if (synthetic)
+{
+    sourceFile = Path.Combine(tempDir, "synthetic_query_snapshots.parquet").Replace("\\", "/");
+    Console.WriteLine($"Mode: synthetic+split+merge");
+    Console.WriteLine($"Synthetic: {syntheticRows} rows, ~{syntheticPlanKb} KB plan XML per row");
+    Console.WriteLine($"Source: {sourceFile} (will be generated)");
+}
+else if (mergeFiles.Count > 0)
+{
+    Console.WriteLine($"Mode: merge-files (no split, isolate compaction)");
+    Console.WriteLine($"Inputs:");
+    foreach (var mf in mergeFiles)
+        Console.WriteLine($"  {mf} ({new FileInfo(mf).Length / 1024.0 / 1024.0:F1} MB)");
+}
+else
+{
+    Console.WriteLine($"Mode: split+merge (full repro)");
+    Console.WriteLine($"Source: {sourceFile} ({new FileInfo(sourceFile).Length / 1024.0 / 1024.0:F1} MB)");
+}
 Console.WriteLine($"Temp dir: {tempDir}");
+/* Print engine version so we can correlate with standalone DuckDB CLI tests */
+using (var versionCon = new DuckDBConnection("DataSource=:memory:"))
+{
+    versionCon.Open();
+    using var versionCmd = versionCon.CreateCommand();
+    versionCmd.CommandText = "SELECT version()";
+    Console.WriteLine($"Engine: DuckDB {versionCmd.ExecuteScalar()}");
+}
+Console.WriteLine($"Strategy: {strategy}");
+Console.WriteLine($"DB mode: {dbMode}");
 Console.WriteLine($"Settings: memory_limit={memoryLimit}, threads={threads}, ROW_GROUP_SIZE={rowGroupSize}");
-Console.WriteLine($"Splitting source into {numFiles} chunks");
+if (mergeFiles.Count == 0)
+    Console.WriteLine($"Splitting source into {numFiles} chunks");
 Console.WriteLine();
 
 try
 {
-    Console.WriteLine($"[1/3] Splitting source file into {numFiles} chunks...");
-    var sw = Stopwatch.StartNew();
-    var sourcePaths = SplitSourceFile(sourceFile, tempDir, numFiles);
-    sw.Stop();
-    var totalSourceBytes = sourcePaths.Sum(p => new FileInfo(p).Length);
-    Console.WriteLine($"  Wrote {sourcePaths.Count} files, {totalSourceBytes / 1024.0 / 1024.0:F1} MB total in {sw.ElapsedMilliseconds} ms");
+    if (synthetic)
+    {
+        Console.WriteLine($"[0/3] Generating synthetic source ({syntheticRows} rows, ~{syntheticPlanKb} KB plan/row)...");
+        var sw = Stopwatch.StartNew();
+        GenerateSyntheticSource(sourceFile, syntheticRows, syntheticPlanKb);
+        sw.Stop();
+        var size = new FileInfo(sourceFile).Length / 1024.0 / 1024.0;
+        Console.WriteLine($"  Generated {size:F1} MB in {sw.ElapsedMilliseconds} ms");
+        Console.WriteLine();
+    }
+
+    List<string> sourcePaths;
+    if (mergeFiles.Count > 0)
+    {
+        Console.WriteLine($"[1/3] Skipping split — using {mergeFiles.Count} provided files");
+        sourcePaths = mergeFiles;
+    }
+    else
+    {
+        Console.WriteLine($"[1/3] Splitting source file into {numFiles} chunks...");
+        var sw = Stopwatch.StartNew();
+        sourcePaths = SplitSourceFile(sourceFile, tempDir, numFiles);
+        sw.Stop();
+        var totalSourceBytes = sourcePaths.Sum(p => new FileInfo(p).Length);
+        Console.WriteLine($"  Wrote {sourcePaths.Count} files, {totalSourceBytes / 1024.0 / 1024.0:F1} MB total in {sw.ElapsedMilliseconds} ms");
+    }
     Console.WriteLine();
 
-    Console.WriteLine("[2/3] Running pair-merge compaction (mirrors ArchiveService.CompactParquetFiles)...");
+    Console.WriteLine($"[2/3] Running pair-merge compaction (mirrors ArchiveService.CompactParquetFiles), {cycles} cycle(s)...");
     var spillDir = Path.Combine(tempDir, "duckdb_tmp").Replace("\\", "/");
     Directory.CreateDirectory(spillDir);
 
@@ -80,30 +169,38 @@
     var process = Process.GetCurrentProcess();
     var startBytes = GC.GetTotalMemory(forceFullCollection: true);
     var startWorkingSet = process.WorkingSet64;
+    Console.WriteLine($"  baseline working set (after GC): {startWorkingSet / 1024.0 / 1024.0:F0} MB");
 
     var compactionSw = Stopwatch.StartNew();
     var peakWorkingSet = startWorkingSet;
     long compactedFileBytes = 0;
+    var perCycleWorkingSet = new List<(long peak, long postGc)>();
     var success = false;
     string? failureMessage = null;
 
-    try
-    {
-        /* Sort smallest-first like ArchiveService does */
-        var sorted = sourcePaths
-            .OrderBy(p => new FileInfo(p).Length)
-            .ToList();
+    var stepCounter = 0;
+    var intermediates = new List<string>();
 
-        var currentPath = sorted[0];
-        var intermediateFiles = new List<string>();
-
-        for (var i = 1; i < sorted.Count; i++)
+    var perStepDbCounter = 0;
+    void MergePair(string aPath, string bPath, string outPath, int stepIndex, int totalSteps)
+    {
+        string dataSource;
+        string? dbFile = null;
+        if (dbMode == "file")
         {
-            var stepOutput = i < sorted.Count - 1
-                ? targetPath + $".step{i}.tmp"
-                : targetPath;
+            /* Each merge gets its own fresh on-disk database so DuckDB has real
+               paging room. Deleted after the step to avoid disk bloat. */
+            dbFile = Path.Combine(tempDir, $"merge_{++perStepDbCounter}.duckdb").Replace("\\", "/");
+            dataSource = $"DataSource={dbFile}";
+        }
+        else
+        {
+            dataSource = "DataSource=:memory:";
+        }
 
-            using var con = new DuckDBConnection("DataSource=:memory:");
+        try
+        {
+            using var con = new DuckDBConnection(dataSource);
             con.Open();
             using (var pragma = con.CreateCommand())
             {
@@ -116,35 +213,140 @@
                 pragma.ExecuteNonQuery();
             }
 
-            var pairList = $"'{currentPath.Replace("'", "''")}', '{sorted[i].Replace("'", "''")}'";
+            var pairList = $"'{aPath.Replace("'", "''")}', '{bPath.Replace("'", "''")}'";
 
             using var cmd = con.CreateCommand();
             cmd.CommandText = $"COPY (SELECT * FROM read_parquet([{pairList}], union_by_name=true)) " +
-                              $"TO '{stepOutput.Replace("'", "''")}' " +
+                              $"TO '{outPath.Replace("'", "''")}' " +
                               $"(FORMAT PARQUET, COMPRESSION ZSTD, ROW_GROUP_SIZE {rowGroupSize})";
             cmd.ExecuteNonQuery();
+        }
+        finally
+        {
+            if (dbFile != null)
+            {
+                try { File.Delete(dbFile); } catch { }
+                try { File.Delete(dbFile + ".wal"); } catch { }
+            }
+        }
+
+        process.Refresh();
+        if (process.WorkingSet64 > peakWorkingSet) peakWorkingSet = process.WorkingSet64;
+
+        var aSize = new FileInfo(aPath).Length / 1024.0 / 1024.0;
+        var bSize = new FileInfo(bPath).Length / 1024.0 / 1024.0;
+        var outSize = new FileInfo(outPath).Length / 1024.0 / 1024.0;
+        Console.WriteLine($"  step {stepIndex}/{totalSteps}: {aSize:F1} + {bSize:F1} -> {outSize:F1} MB | peak WS {peakWorkingSet / 1024.0 / 1024.0:F0} MB");
+    }
+
+    string NewIntermediatePath()
+    {
+        var p = targetPath + $".step{++stepCounter}.tmp";
+        intermediates.Add(p);
+        return p;
+    }
+
+    for (var cycle = 1; cycle <= cycles; cycle++)
+    {
+        if (cycles > 1) Console.WriteLine($"  --- cycle {cycle}/{cycles} ---");
 
-            process.Refresh();
-            if (process.WorkingSet64 > peakWorkingSet) peakWorkingSet = process.WorkingSet64;
+        /* Reset per-cycle state */
+        stepCounter = 0;
+        intermediates.Clear();
+        perStepDbCounter = 0;
+        if (File.Exists(targetPath)) File.Delete(targetPath);
 
-            if (intermediateFiles.Count > 0)
+        var cycleStartPeak = peakWorkingSet;
+
+        try
+        {
+            if (strategy == "accumulator")
             {
-                var prev = intermediateFiles[^1];
-                try { File.Delete(prev); } catch { }
+                /* Sort smallest-first like current production ArchiveService does */
+                var sorted = sourcePaths
+                    .OrderBy(p => new FileInfo(p).Length)
+                    .ToList();
+
+                var currentPath = sorted[0];
+                var totalSteps = sorted.Count - 1;
+
+                for (var i = 1; i < sorted.Count; i++)
+                {
+                    var isFinal = i == sorted.Count - 1;
+                    var stepOutput = isFinal ? targetPath : NewIntermediatePath();
+
+                    MergePair(currentPath, sorted[i], stepOutput, i, totalSteps);
+
+                    if (i >= 2)
+                    {
+                        var prev = intermediates[^2];
+                        try { File.Delete(prev); } catch { }
+                    }
+
+                    currentPath = stepOutput;
+                }
             }
+            else /* tournament */
+            {
+                var current = new List<string>(sourcePaths);
+                var totalSteps = current.Count - 1;
+
+                while (current.Count > 1)
+                {
+                    var next = new List<string>();
+                    var pairs = current.Count / 2;
+
+                    for (var i = 0; i < pairs; i++)
+                    {
+                        var aPath = current[i * 2];
+                        var bPath = current[i * 2 + 1];
+                        var isLastMerge = current.Count == 2;
+                        var outPath = isLastMerge ? targetPath : NewIntermediatePath();
 
-            intermediateFiles.Add(stepOutput);
-            currentPath = stepOutput;
+                        MergePair(aPath, bPath, outPath, stepCounter, totalSteps);
+                        next.Add(outPath);
 
-            Console.WriteLine($"  step {i}/{sorted.Count - 1}: peak working set {peakWorkingSet / 1024.0 / 1024.0:F0} MB");
+                        if (intermediates.Contains(aPath))
+                        {
+                            try { File.Delete(aPath); } catch { }
+                        }
+                        if (intermediates.Contains(bPath))
+                        {
+                            try { File.Delete(bPath); } catch { }
+                        }
+                    }
+
+                    if (current.Count % 2 == 1)
+                    {
+                        next.Add(current[^1]);
+                    }
+
+                    current = next;
+                }
+            }
+
+            compactedFileBytes = new FileInfo(targetPath).Length;
+            success = true;
+        }
+        catch (Exception ex)
+        {
+            failureMessage = ex.Message;
+            break;
         }
-        compactedFileBytes = new FileInfo(targetPath).Length;
-        success = true;
-    }
-    catch (Exception ex)
-    {
-        failureMessage = ex.Message;
+
+        /* Post-cycle measurement: force GC and sample working set after a brief
+           pause to let native finalizers run. This is what the user cares about —
+           does memory release between archival cycles? */
+        var cyclePeak = peakWorkingSet;
+        GC.Collect();
+        GC.WaitForPendingFinalizers();
+        GC.Collect();
+        Thread.Sleep(500);
+        process.Refresh();
+        var postGc = process.WorkingSet64;
+        perCycleWorkingSet.Add((cyclePeak - cycleStartPeak, postGc));
+        if (cycles > 1)
+            Console.WriteLine($"  cycle {cycle} peak +{(cyclePeak - cycleStartPeak) / 1024.0 / 1024.0:F0} MB | post-GC WS {postGc / 1024.0 / 1024.0:F0} MB");
     }
 
     compactionSw.Stop();
@@ -155,18 +357,35 @@
     Console.WriteLine("[3/3] Result:");
     Console.WriteLine($"  Status: {(success ? "SUCCESS" : "FAILURE")}");
     Console.WriteLine($"  Wall time: {compactionSw.Elapsed.TotalSeconds:F2}s");
-    Console.WriteLine($"  Peak working set: {peakWorkingSet / 1024.0 / 1024.0:F0} MB");
+    Console.WriteLine($"  Baseline WS: {startWorkingSet / 1024.0 / 1024.0:F0} MB");
+    Console.WriteLine($"  Peak WS: {peakWorkingSet / 1024.0 / 1024.0:F0} MB (+{(peakWorkingSet - startWorkingSet) / 1024.0 / 1024.0:F0} MB)");
+    if (cycles > 1 && perCycleWorkingSet.Count > 0)
+    {
+        Console.WriteLine($"  Post-GC WS by cycle:");
+        for (var i = 0; i < perCycleWorkingSet.Count; i++)
+        {
+            var (peak, postGc) = perCycleWorkingSet[i];
+            Console.WriteLine($"    cycle {i + 1}: peak +{peak / 1024.0 / 1024.0:F0} MB, post-GC {postGc / 1024.0 / 1024.0:F0} MB");
+        }
+        var firstPostGc = perCycleWorkingSet[0].postGc;
+        var lastPostGc = perCycleWorkingSet[^1].postGc;
+        var drift = (lastPostGc - firstPostGc) / 1024.0 / 1024.0;
+        Console.WriteLine($"  WS drift (last - first post-GC): {drift:+0;-0;0} MB");
+    }
 
     if (success)
     {
         Console.WriteLine($"  Output size: {compactedFileBytes / 1024.0 / 1024.0:F1} MB");
 
-        /* Sanity check: row count round-trip — output must match source */
+        /* Sanity check: row count round-trip — output must match inputs */
+        var srcSqlList = mergeFiles.Count > 0
+            ? string.Join(", ", mergeFiles.Select(p => $"'{p.Replace("'", "''").Replace("\\", "/")}'"))
+            : $"'{sourceFile.Replace("'", "''").Replace("\\", "/")}'";
         using var verifyCon = new DuckDBConnection("DataSource=:memory:");
         verifyCon.Open();
         using var verifyCmd = verifyCon.CreateCommand();
         verifyCmd.CommandText = $"SELECT (SELECT COUNT(*) FROM read_parquet('{targetPath.Replace("'", "''")}')) AS out_rows, " +
-                                $"       (SELECT COUNT(*) FROM read_parquet('{sourceFile.Replace("'", "''").Replace("\\", "/")}')) AS src_rows";
+                                $"       (SELECT COUNT(*) FROM read_parquet([{srcSqlList}], union_by_name=true)) AS src_rows";
         using var verifyReader = verifyCmd.ExecuteReader();
         verifyReader.Read();
         var actualRows = verifyReader.GetInt64(0);
@@ -203,7 +422,12 @@ static List<string> SplitSourceFile(string sourceFile, string outDir, int numChu
 {
     /* Split a real monthly parquet into N chunks using row-number bucketing.
        Each chunk is written as ZSTD parquet (matching the production format).
-       Empty chunks are skipped. */
+       Empty chunks are skipped.
+       NOTE: This connection deliberately runs with DuckDB defaults (no memory_limit).
+       The merge connections set their own memory_limit. If you see the merge OOM
+       with "X MiB / 953.6 MiB used" where the high-water mark looks like leftover
+       state from this split, that's a clue DuckDB.NET shares buffer state across
+       :memory: connections in the same process. */
     var sourceSql = sourceFile.Replace("'", "''").Replace("\\", "/");
 
     using var con = new DuckDBConnection("DataSource=:memory:");
@@ -238,3 +462,70 @@ static string GetArg(string[] args, string key, string defaultValue)
         if (args[i] == key) return args[i + 1];
     return defaultValue;
 }
+
+static void GenerateSyntheticSource(string outputPath, int rows, int planKb)
+{
+    /* Generate query_snapshots-shaped parquet with high-entropy plan XML so
+       ZSTD can't collapse content to a single dictionary entry. We aggregate
+       a list of md5() hashes per row — each hash uses (collection_id, op_index)
+       as a unique seed, defeating both per-row and cross-row compression.
+       This matches the in-memory pain of real plan XML during merge. */
+    var sourceSql = outputPath.Replace("'", "''").Replace("\\", "/");
+    /* Each "op" tag is roughly '<op>{md5 hex}</op>' = ~46 chars.
+       Pick op count so the assembled XML hits the target byte size. */
+    const int opTagBytes = 46;
+    var opsPerPlan = Math.Max(4, (planKb * 1024) / opTagBytes);
+
+    using var con = new DuckDBConnection("DataSource=:memory:");
+    con.Open();
+
+    using var cmd = con.CreateCommand();
+    /* list_aggregate builds the plan as the concatenation of {opsPerPlan}
+       unique <op> tags. md5() is high-entropy and depends on the
+       row index + op index, so the per-row content is irreducible. */
+    cmd.CommandText = $@"
+COPY (
+    SELECT
+        i AS collection_id,
+        TIMESTAMP '2026-04-01 00:00:00' + INTERVAL (i) MINUTE AS collection_time,
+        ((i % 4) + 1)::INTEGER AS server_id,
+        ('Server' || ((i % 4) + 1)::VARCHAR) AS server_name,
+        ((i % 200) + 50)::INTEGER AS session_id,
+        ('db_' || ((i % 10) + 1)::VARCHAR) AS database_name,
+        '00:00:00' AS elapsed_time_formatted,
+        ('SELECT * FROM t_' || (i % 1000)::VARCHAR || ' WHERE c = ''' || md5(i::VARCHAR) || '''') AS query_text,
+        ('<plan>' ||
+         list_aggregate(
+             list_transform(generate_series(1, {opsPerPlan}),
+                 j -> '<op>' || md5(i::VARCHAR || '_' || j::VARCHAR) || '</op>'),
+             'string_agg', '') ||
+         '</plan>') AS query_plan,
+        ('<plan>' ||
+         list_aggregate(
+             list_transform(generate_series(1, {opsPerPlan}),
+                 j -> '<op>' || md5(i::VARCHAR || '_' || j::VARCHAR) || '</op>'),
+             'string_agg', '') ||
+         '</plan>') AS live_query_plan,
+        CASE (i % 5) WHEN 0 THEN 'running' WHEN 1 THEN 'suspended' WHEN 2 THEN 'sleeping' WHEN 3 THEN 'background' ELSE 'rollback' END AS status,
+        CASE WHEN i % 7 = 0 THEN ((i % 200) + 1)::INTEGER ELSE NULL END AS blocking_session_id,
+        CASE (i % 4) WHEN 0 THEN 'PAGEIOLATCH_SH' WHEN 1 THEN 'CXPACKET' WHEN 2 THEN 'LCK_M_S' ELSE NULL END AS wait_type,
+        ((i * 13) % 5000)::BIGINT AS wait_time_ms,
+        ('PAGE: 1:' || (i % 1000000)::VARCHAR) AS wait_resource,
+        ((i * 17) % 60000)::BIGINT AS cpu_time_ms,
+        ((i * 23) % 120000)::BIGINT AS total_elapsed_time_ms,
+        ((i * 31) % 1000000)::BIGINT AS reads,
+        ((i * 41) % 10000)::BIGINT AS writes,
+        ((i * 43) % 5000000)::BIGINT AS logical_reads,
+        ((i % 1000) / 100.0)::DECIMAL(18,2) AS granted_query_memory_gb,
+        'READ_COMMITTED' AS transaction_isolation_level,
+        ((i % 8) + 1)::INTEGER AS dop,
+        ((i % 16) + 1)::INTEGER AS parallel_worker_count,
+        ('login_' || (i % 50)::VARCHAR) AS login_name,
+        ('HOST-' || (i % 20)::VARCHAR) AS host_name,
+        ('Program_' || (i % 30)::VARCHAR) AS program_name,
+        (i % 5)::INTEGER AS open_transaction_count,
+        ((i % 100))::DECIMAL(5,2) AS percent_complete
+    FROM generate_series(1, {rows}) t(i)
+) TO '{sourceSql}' (FORMAT PARQUET, COMPRESSION ZSTD, ROW_GROUP_SIZE 122880)";
+    cmd.ExecuteNonQuery();
+}