diff --git a/Lite/Database/DuckDbInitializer.cs b/Lite/Database/DuckDbInitializer.cs index e767c6d..a839b1a 100644 --- a/Lite/Database/DuckDbInitializer.cs +++ b/Lite/Database/DuckDbInitializer.cs @@ -124,10 +124,17 @@ public DuckDbInitializer(string databasePath, ILogger? logger /// <summary> /// Gets the connection string for the DuckDB database. - /// Disables automatic WAL checkpoints to prevent 2-3s stop-the-world stalls - /// during collector writes. Manual CHECKPOINT runs between collection cycles instead. + /// - checkpoint_threshold=1GB: disables automatic WAL checkpoints to prevent + /// 2-3s stop-the-world stalls during collector writes. Manual CHECKPOINT + /// runs between collection cycles instead. + /// - memory_limit=1GB: caps the resting buffer pool so it doesn't grow + /// unbounded as the archive directory fills with parquet files (the + /// ".tmp dir caching" path is the actual driver of #933's titled + /// complaint — uncapped, buffer pool grows toward 80% of system RAM). + /// ArchiveService raises this temporarily for parquet COPY operations, + /// which need more headroom due to a DuckDB pre-reservation behavior. /// </summary> - public string ConnectionString => $"Data Source={_databasePath};checkpoint_threshold=1GB"; + public string ConnectionString => $"Data Source={_databasePath};memory_limit=1GB;checkpoint_threshold=1GB"; /// <summary> /// Ensures the database exists and all tables are created. 
diff --git a/Lite/Services/ArchiveService.cs b/Lite/Services/ArchiveService.cs index aa44a3c..ed1100e 100644 --- a/Lite/Services/ArchiveService.cs +++ b/Lite/Services/ArchiveService.cs @@ -187,23 +187,100 @@ private static async Task<long> GetRowCountBeforeCutoff(DuckDBConnection connect private static async Task ExportToParquet(DuckDBConnection connection, string table, string timeColumn, DateTime cutoff, string filePath) { - using var cmd = connection.CreateCommand(); - cmd.CommandText = $@" + await WithRaisedCopyMemoryLimit(connection, async () => + { + using var cmd = connection.CreateCommand(); + cmd.CommandText = $@" COPY ( SELECT * FROM {table} WHERE {timeColumn} < $1 ) TO '{EscapeSqlPath(filePath)}' (FORMAT PARQUET, COMPRESSION ZSTD)"; - cmd.Parameters.Add(new DuckDBParameter { Value = cutoff }); - await cmd.ExecuteNonQueryAsync(); + cmd.Parameters.Add(new DuckDBParameter { Value = cutoff }); + await cmd.ExecuteNonQueryAsync(); + }); } private static string EscapeSqlPath(string path) => DuckDbInitializer.EscapeSqlPath(path); + /* Resting and COPY memory_limit values for the main DuckDB connection. The resting value is also set in DuckDbInitializer.ConnectionString so newly-opened connections start at the resting cap; the COPY value is applied transiently around parquet COPY operations and restored after. See WithRaisedCopyMemoryLimit and the comment block on ConnectionString. */ + private const string MainConnectionRestingMemoryLimit = "1GB"; + private const string MainConnectionCopyMemoryLimit = "4GB"; + + /// <summary> + /// Runs <paramref name="action"/> with the connection's memory_limit raised + /// to <see cref="MainConnectionCopyMemoryLimit"/>, restoring to + /// <see cref="MainConnectionRestingMemoryLimit"/> after. Use around parquet + /// COPY operations on the main connection — those hit a DuckDB + /// pre-reservation behavior that needs more headroom than the resting cap + /// (#933). memory_limit is instance-level; concurrent operations briefly + /// see the raised cap. 
+ /// </summary> + private static async Task WithRaisedCopyMemoryLimit(DuckDBConnection connection, Func<Task> action) + { + using (var raiseCmd = connection.CreateCommand()) + { + raiseCmd.CommandText = $"SET memory_limit = '{MainConnectionCopyMemoryLimit}'"; + await raiseCmd.ExecuteNonQueryAsync(); + } + + try + { + await action(); + } + finally + { + try + { + using var restoreCmd = connection.CreateCommand(); + restoreCmd.CommandText = $"SET memory_limit = '{MainConnectionRestingMemoryLimit}'"; + await restoreCmd.ExecuteNonQueryAsync(); + } + catch + { + /* Best-effort restore. If this fails the connection is in a bad + state and will be disposed by the caller's `using` shortly. */ + } + } + } + + /* Columns to exclude during compaction — dead weight from legacy archives */ private static readonly Dictionary<string, string[]> CompactionExcludeColumns = new() { ["query_store_stats"] = ["query_plan_text"] }; + /* Build the SELECT clause for a compaction COPY, excluding only the + CompactionExcludeColumns actually present in THIS set of files. + Detection must be per-merge-set, not global: archive files predating a + schema change lack the column, so a globally-computed "* EXCLUDE (col)" + fails the binder on a pair where neither file has it. query_plan_text + was added to query_store_stats in migration v13 (2026-02-23), so a + reporter's pre-v13 archives don't carry it. 
(#933) */ + private static string BuildSelectClause(string table, IReadOnlyList<string> paths) + { + if (!CompactionExcludeColumns.TryGetValue(table, out var excludeCols)) + { + return "*"; + } + + using var schemaCon = new DuckDBConnection("DataSource=:memory:"); + schemaCon.Open(); + var pathList = string.Join(", ", paths.Select(p => $"'{EscapeSqlPath(p)}'")); + using var schemaCmd = schemaCon.CreateCommand(); + schemaCmd.CommandText = $"SELECT column_name FROM (DESCRIBE SELECT * FROM read_parquet([{pathList}], union_by_name=true))"; + using var reader = schemaCmd.ExecuteReader(); + var existingCols = new HashSet<string>(StringComparer.OrdinalIgnoreCase); + while (reader.Read()) existingCols.Add(reader.GetString(0)); + + var colsToExclude = excludeCols.Where(c => existingCols.Contains(c)).ToArray(); + return colsToExclude.Length > 0 + ? $"* EXCLUDE ({string.Join(", ", colsToExclude)})" + : "*"; + } + + /// <summary> + /// Compacts all per-cycle parquet files into monthly files (YYYYMM_tablename.parquet). + /// This keeps the archive directory small (~75 files for 3 months of 25 tables) @@ -371,26 +448,6 @@ parquet files already live on. 
*/ .Select(f => Path.Combine(_archivePath, f).Replace("\\", "/")) .ToList(); - /* Determine column exclusions up front using all source files */ - var selectClause = "*"; - if (CompactionExcludeColumns.TryGetValue(table, out var excludeCols)) - { - using var schemaCon = new DuckDBConnection("DataSource=:memory:"); - schemaCon.Open(); - var allPathList = string.Join(", ", sourcePaths.Select(p => $"'{EscapeSqlPath(p)}'")); - using var schemaCmd = schemaCon.CreateCommand(); - schemaCmd.CommandText = $"SELECT column_name FROM (DESCRIBE SELECT * FROM read_parquet([{allPathList}], union_by_name=true))"; - using var reader = schemaCmd.ExecuteReader(); - var existingCols = new HashSet<string>(StringComparer.OrdinalIgnoreCase); - while (reader.Read()) existingCols.Add(reader.GetString(0)); - - var colsToExclude = excludeCols.Where(c => existingCols.Contains(c)).ToArray(); - if (colsToExclude.Length > 0) - { - selectClause = $"* EXCLUDE ({string.Join(", ", colsToExclude)})"; - } - } - if (sourcePaths.Count <= 2) { /* Small group — single-pass merge. @@ -416,6 +473,7 @@ guide recommendation of 50-60% of system RAM. pragma.ExecuteNonQuery(); } + var selectClause = BuildSelectClause(table, sourcePaths); var pathList = string.Join(", ", sourcePaths.Select(p => $"'{EscapeSqlPath(p)}'")); using var cmd = con.CreateCommand(); cmd.CommandText = $"COPY (SELECT {selectClause} FROM read_parquet([{pathList}], union_by_name=true)) " + @@ -447,6 +505,7 @@ Sort smallest-first so early merges are cheap. */ pragma.ExecuteNonQuery(); } + var selectClause = BuildSelectClause(table, new[] { currentPath, sorted[i] }); var pairList = $"'{EscapeSqlPath(currentPath)}', '{EscapeSqlPath(sorted[i])}'"; using var cmd = con.CreateCommand(); cmd.CommandText = $"COPY (SELECT {selectClause} FROM read_parquet([{pairList}], union_by_name=true)) " + @@ -562,9 +621,12 @@ Archive views use glob (*_table.parquet) to pick up all files. 
*/ var parquetPath = Path.Combine(_archivePath, $"{timestamp}_{table}.parquet") .Replace("\\", "/"); - using var exportCmd = connection.CreateCommand(); - exportCmd.CommandText = $"COPY (SELECT * FROM {table}) TO '{EscapeSqlPath(parquetPath)}' (FORMAT PARQUET, COMPRESSION ZSTD)"; - await exportCmd.ExecuteNonQueryAsync(); + await WithRaisedCopyMemoryLimit(connection, async () => + { + using var exportCmd = connection.CreateCommand(); + exportCmd.CommandText = $"COPY (SELECT * FROM {table}) TO '{EscapeSqlPath(parquetPath)}' (FORMAT PARQUET, COMPRESSION ZSTD)"; + await exportCmd.ExecuteNonQueryAsync(); + }); _logger?.LogInformation("Archived {Count} rows from {Table}", rowCount, table); } @@ -587,9 +649,12 @@ Archive views use glob (*_table.parquet) to pick up all files. */ if (rowCount == 0) continue; var preservePath = Path.Combine(preserveDir, $"{table}.parquet").Replace("\\", "/"); - using var exportCmd = connection.CreateCommand(); - exportCmd.CommandText = $"COPY (SELECT * FROM {table}) TO '{EscapeSqlPath(preservePath)}' (FORMAT PARQUET)"; - await exportCmd.ExecuteNonQueryAsync(); + await WithRaisedCopyMemoryLimit(connection, async () => + { + using var exportCmd = connection.CreateCommand(); + exportCmd.CommandText = $"COPY (SELECT * FROM {table}) TO '{EscapeSqlPath(preservePath)}' (FORMAT PARQUET)"; + await exportCmd.ExecuteNonQueryAsync(); + }); preservedFiles[table] = preservePath; _logger?.LogInformation("Preserved {Count} rows from {Table} for restoration after reset", rowCount, table);