13 changes: 10 additions & 3 deletions Lite/Database/DuckDbInitializer.cs
@@ -124,10 +124,17 @@ public DuckDbInitializer(string databasePath, ILogger<DuckDbInitializer>? logger

/// <summary>
/// Gets the connection string for the DuckDB database.
-/// Disables automatic WAL checkpoints to prevent 2-3s stop-the-world stalls
-/// during collector writes. Manual CHECKPOINT runs between collection cycles instead.
+/// - checkpoint_threshold=1GB: disables automatic WAL checkpoints to prevent
+/// 2-3s stop-the-world stalls during collector writes. Manual CHECKPOINT
+/// runs between collection cycles instead.
+/// - memory_limit=1GB: caps the resting buffer pool so it doesn't grow
+/// unbounded as the archive directory fills with parquet files (the
+/// ".tmp dir caching" path is the actual driver of the complaint in
+/// #933's title — uncapped, the buffer pool grows toward 80% of system RAM).
+/// ArchiveService raises this temporarily for parquet COPY operations,
+/// which need more headroom due to a DuckDB pre-reservation behavior.
/// </summary>
-public string ConnectionString => $"Data Source={_databasePath};checkpoint_threshold=1GB";
+public string ConnectionString => $"Data Source={_databasePath};memory_limit=1GB;checkpoint_threshold=1GB";

/// <summary>
/// Ensures the database exists and all tables are created.
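A minimal sketch, not part of this PR, showing how the connection-string settings above can be confirmed at runtime. It assumes the DuckDB.NET.Data provider the project already uses; current_setting() is a built-in DuckDB function, and the in-memory data source is illustrative only.

using System;
using DuckDB.NET.Data;

// Open with the same options the new ConnectionString embeds; the provider
// applies connection-string settings when the database is opened.
using var con = new DuckDBConnection("DataSource=:memory:;memory_limit=1GB;checkpoint_threshold=1GB");
con.Open();

using var cmd = con.CreateCommand();
cmd.CommandText = "SELECT current_setting('memory_limit')";
// DuckDB normalizes the unit (e.g. GiB), so log the value rather than
// asserting an exact string.
Console.WriteLine(cmd.ExecuteScalar());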
125 changes: 95 additions & 30 deletions Lite/Services/ArchiveService.cs
@@ -187,23 +187,100 @@ private static async Task<long> GetRowCountBeforeCutoff(DuckDBConnection connect

private static async Task ExportToParquet(DuckDBConnection connection, string table, string timeColumn, DateTime cutoff, string filePath)
{
-using var cmd = connection.CreateCommand();
-cmd.CommandText = $@"
+await WithRaisedCopyMemoryLimit(connection, async () =>
+{
+using var cmd = connection.CreateCommand();
+cmd.CommandText = $@"
COPY (
SELECT * FROM {table} WHERE {timeColumn} < $1
) TO '{EscapeSqlPath(filePath)}' (FORMAT PARQUET, COMPRESSION ZSTD)";
-cmd.Parameters.Add(new DuckDBParameter { Value = cutoff });
-await cmd.ExecuteNonQueryAsync();
+cmd.Parameters.Add(new DuckDBParameter { Value = cutoff });
+await cmd.ExecuteNonQueryAsync();
+});
}

private static string EscapeSqlPath(string path) => DuckDbInitializer.EscapeSqlPath(path);

+/* Resting and COPY memory_limit values for the main DuckDB connection.
+The resting value is also set in DuckDbInitializer.ConnectionString so
+newly-opened connections start at the resting cap; the COPY value is
+applied transiently around parquet COPY operations and restored after.
+See WithRaisedCopyMemoryLimit and the comment block on ConnectionString. */
+private const string MainConnectionRestingMemoryLimit = "1GB";
+private const string MainConnectionCopyMemoryLimit = "4GB";
+
+/// <summary>
+/// Runs <paramref name="action"/> with the connection's memory_limit raised
+/// to <see cref="MainConnectionCopyMemoryLimit"/>, restoring to
+/// <see cref="MainConnectionRestingMemoryLimit"/> after. Use around parquet
+/// COPY operations on the main connection — those hit a DuckDB
+/// pre-reservation behavior that needs more headroom than the resting cap
+/// (#933). memory_limit is instance-level; concurrent operations briefly
+/// see the raised cap.
+/// </summary>
+private static async Task WithRaisedCopyMemoryLimit(DuckDBConnection connection, Func<Task> action)
+{
+using (var raiseCmd = connection.CreateCommand())
+{
+raiseCmd.CommandText = $"SET memory_limit = '{MainConnectionCopyMemoryLimit}'";
+await raiseCmd.ExecuteNonQueryAsync();
+}
+
+try
+{
+await action();
+}
+finally
+{
+try
+{
+using var restoreCmd = connection.CreateCommand();
+restoreCmd.CommandText = $"SET memory_limit = '{MainConnectionRestingMemoryLimit}'";
+await restoreCmd.ExecuteNonQueryAsync();
+}
+catch
+{
+/* Best-effort restore. If this fails the connection is in a bad
+state and will be disposed by the caller's `using` shortly. */
+}
+}
+}
+
/* Columns to exclude during compaction — dead weight from legacy archives */
private static readonly Dictionary<string, string[]> CompactionExcludeColumns = new()
{
["query_store_stats"] = ["query_plan_text"]
};

+/* Build the SELECT clause for a compaction COPY, excluding only the
+CompactionExcludeColumns actually present in THIS set of files.
+Detection must be per-merge-set, not global: archive files predating a
+schema change lack the column, so a globally-computed "* EXCLUDE (col)"
+fails the binder on a pair where neither file has it. query_plan_text
+was added to query_store_stats in migration v13 (2026-02-23), so a
+reporter's pre-v13 archives don't carry it. (#933) */
+private static string BuildSelectClause(string table, IReadOnlyList<string> paths)
+{
+if (!CompactionExcludeColumns.TryGetValue(table, out var excludeCols))
+{
+return "*";
+}
+
+using var schemaCon = new DuckDBConnection("DataSource=:memory:");
+schemaCon.Open();
+var pathList = string.Join(", ", paths.Select(p => $"'{EscapeSqlPath(p)}'"));
+using var schemaCmd = schemaCon.CreateCommand();
+schemaCmd.CommandText = $"SELECT column_name FROM (DESCRIBE SELECT * FROM read_parquet([{pathList}], union_by_name=true))";
+using var reader = schemaCmd.ExecuteReader();
+var existingCols = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
+while (reader.Read()) existingCols.Add(reader.GetString(0));
+
+var colsToExclude = excludeCols.Where(c => existingCols.Contains(c)).ToArray();
+return colsToExclude.Length > 0
+? $"* EXCLUDE ({string.Join(", ", colsToExclude)})"
+: "*";
+}

/// <summary>
/// Compacts all per-cycle parquet files into monthly files (YYYYMM_tablename.parquet).
/// This keeps the archive directory small (~75 files for 3 months of 25 tables)
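A runnable sketch, not part of this PR, that makes the per-merge-set detection in BuildSelectClause concrete. The file names and temp directory are invented for the example: one parquet file is written in the pre-v13 shape and one with query_plan_text. Because union_by_name surfaces the column for this pair, the merge would use "* EXCLUDE (query_plan_text)", while a pair of two pre-v13 files would fall back to "*".

using System;
using System.IO;
using DuckDB.NET.Data;

var dir = Path.Combine(Path.GetTempPath(), Guid.NewGuid().ToString("N")).Replace("\\", "/");
Directory.CreateDirectory(dir);

using var con = new DuckDBConnection("DataSource=:memory:");
con.Open();
using var cmd = con.CreateCommand();

// Pre-v13 shape: no query_plan_text column.
cmd.CommandText = $"COPY (SELECT 1 AS query_id) TO '{dir}/old.parquet' (FORMAT PARQUET)";
cmd.ExecuteNonQuery();

// Post-v13 shape: the column is present.
cmd.CommandText = $"COPY (SELECT 1 AS query_id, 'plan' AS query_plan_text) TO '{dir}/new.parquet' (FORMAT PARQUET)";
cmd.ExecuteNonQuery();

// The same DESCRIBE query the PR runs: the union schema for this pair
// includes query_plan_text, so excluding it is safe here.
cmd.CommandText = "SELECT column_name FROM (DESCRIBE SELECT * FROM " +
                  $"read_parquet(['{dir}/old.parquet', '{dir}/new.parquet'], union_by_name=true))";
using var reader = cmd.ExecuteReader();
while (reader.Read()) Console.WriteLine(reader.GetString(0)); // query_id, query_plan_text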
@@ -371,26 +448,6 @@ parquet files already live on. */
.Select(f => Path.Combine(_archivePath, f).Replace("\\", "/"))
.ToList();

-/* Determine column exclusions up front using all source files */
-var selectClause = "*";
-if (CompactionExcludeColumns.TryGetValue(table, out var excludeCols))
-{
-using var schemaCon = new DuckDBConnection("DataSource=:memory:");
-schemaCon.Open();
-var allPathList = string.Join(", ", sourcePaths.Select(p => $"'{EscapeSqlPath(p)}'"));
-using var schemaCmd = schemaCon.CreateCommand();
-schemaCmd.CommandText = $"SELECT column_name FROM (DESCRIBE SELECT * FROM read_parquet([{allPathList}], union_by_name=true))";
-using var reader = schemaCmd.ExecuteReader();
-var existingCols = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
-while (reader.Read()) existingCols.Add(reader.GetString(0));
-
-var colsToExclude = excludeCols.Where(c => existingCols.Contains(c)).ToArray();
-if (colsToExclude.Length > 0)
-{
-selectClause = $"* EXCLUDE ({string.Join(", ", colsToExclude)})";
-}
-}
-
if (sourcePaths.Count <= 2)
{
/* Small group — single-pass merge.
@@ -416,6 +473,7 @@ guide recommendation of 50-60% of system RAM.
pragma.ExecuteNonQuery();
}

+var selectClause = BuildSelectClause(table, sourcePaths);
var pathList = string.Join(", ", sourcePaths.Select(p => $"'{EscapeSqlPath(p)}'"));
using var cmd = con.CreateCommand();
cmd.CommandText = $"COPY (SELECT {selectClause} FROM read_parquet([{pathList}], union_by_name=true)) " +
@@ -447,6 +505,7 @@ Sort smallest-first so early merges are cheap. */
pragma.ExecuteNonQuery();
}

+var selectClause = BuildSelectClause(table, new[] { currentPath, sorted[i] });
var pairList = $"'{EscapeSqlPath(currentPath)}', '{EscapeSqlPath(sorted[i])}'";
using var cmd = con.CreateCommand();
cmd.CommandText = $"COPY (SELECT {selectClause} FROM read_parquet([{pairList}], union_by_name=true)) " +
@@ -562,9 +621,12 @@ Archive views use glob (*_table.parquet) to pick up all files. */
var parquetPath = Path.Combine(_archivePath, $"{timestamp}_{table}.parquet")
.Replace("\\", "/");

-using var exportCmd = connection.CreateCommand();
-exportCmd.CommandText = $"COPY (SELECT * FROM {table}) TO '{EscapeSqlPath(parquetPath)}' (FORMAT PARQUET, COMPRESSION ZSTD)";
-await exportCmd.ExecuteNonQueryAsync();
+await WithRaisedCopyMemoryLimit(connection, async () =>
+{
+using var exportCmd = connection.CreateCommand();
+exportCmd.CommandText = $"COPY (SELECT * FROM {table}) TO '{EscapeSqlPath(parquetPath)}' (FORMAT PARQUET, COMPRESSION ZSTD)";
+await exportCmd.ExecuteNonQueryAsync();
+});

_logger?.LogInformation("Archived {Count} rows from {Table}", rowCount, table);
}
@@ -587,9 +649,12 @@
if (rowCount == 0) continue;

var preservePath = Path.Combine(preserveDir, $"{table}.parquet").Replace("\\", "/");
-using var exportCmd = connection.CreateCommand();
-exportCmd.CommandText = $"COPY (SELECT * FROM {table}) TO '{EscapeSqlPath(preservePath)}' (FORMAT PARQUET)";
-await exportCmd.ExecuteNonQueryAsync();
+await WithRaisedCopyMemoryLimit(connection, async () =>
+{
+using var exportCmd = connection.CreateCommand();
+exportCmd.CommandText = $"COPY (SELECT * FROM {table}) TO '{EscapeSqlPath(preservePath)}' (FORMAT PARQUET)";
+await exportCmd.ExecuteNonQueryAsync();
+});
preservedFiles[table] = preservePath;

_logger?.LogInformation("Preserved {Count} rows from {Table} for restoration after reset", rowCount, table);
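A closing sketch, not part of this PR, of the raise/restore contract WithRaisedCopyMemoryLimit implements, reduced to synchronous calls against an in-memory database. The simulated failure shows why the restore sits in a finally block: the resting cap must come back even when the wrapped COPY throws.

using System;
using DuckDB.NET.Data;

using var con = new DuckDBConnection("DataSource=:memory:;memory_limit=1GB");
con.Open();

void Set(string limit)
{
    using var cmd = con.CreateCommand();
    cmd.CommandText = $"SET memory_limit = '{limit}'";
    cmd.ExecuteNonQuery();
}

string Limit()
{
    using var cmd = con.CreateCommand();
    cmd.CommandText = "SELECT current_setting('memory_limit')";
    return cmd.ExecuteScalar()?.ToString() ?? "?";
}

Console.WriteLine(Limit()); // resting cap (DuckDB normalizes the unit)
Set("4GB");                 // raise, as WithRaisedCopyMemoryLimit does
try
{
    throw new InvalidOperationException("simulated COPY failure");
}
catch (InvalidOperationException)
{
    // swallowed for the demo
}
finally
{
    Set("1GB");             // best-effort restore, mirroring the PR's finally
}
Console.WriteLine(Limit()); // back at the resting cap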