From eacb3e16772de2ff12c892d85b2831d4f9c2a677 Mon Sep 17 00:00:00 2001
From: David Wein
Date: Wed, 3 Dec 2025 08:11:16 +0000
Subject: [PATCH] pgbench: Add --init-batch-size with automatic retry on
 disconnect

Add a new --init-batch-size option to pgbench initialization mode that
enables batched commits and automatic reconnection/retry on errors.

The --init-batch-size parameter accepts a value in scale units (the same
units as --scale). When it is specified, the initialization process
commits after each batch instead of using one large transaction. This
allows pgbench to recover from network disconnections or other transient
errors during long-running initializations, which can take hours for
multi-TB datasets.

Key features:

- Batched commits: Each batch commits independently, preserving the work
  already done
- Automatic retry: On error, pgbench reconnects and retries the current
  batch up to 5 times before giving up
- Progress tracking: Dozens of retries are allowed over the entire run,
  as long as progress is made between retries
- Error reporting: Total errors and retries are reported at completion
- COPY FREEZE disabled: When batching is enabled, COPY FREEZE is
  automatically disabled, since FREEZE requires the table to have been
  created or truncated in the same transaction

The implementation:

- Modifies initPopulateTable() to accept a connection pointer and handle
  the batching/retry logic
- Updates initGenerateDataClientSide() to support both batched and
  non-batched modes
- Maintains the cancel handler (SetCancelConn/ResetCancelConn) across
  reconnections
- Adds global counters num_init_errors and num_init_retries

Also adds TAP tests to verify that the retry behavior works correctly
when connections are terminated during initialization.

Example usage:

    pgbench -i --scale=1000 --init-batch-size=10 dbname

This commits every 10 scale units (1 million pgbench_accounts rows per
batch).
---
 src/bin/pgbench/pgbench.c                    | 414 ++++++++++++++-----
 src/bin/pgbench/t/001_pgbench_with_server.pl |  95 +++++
 2 files changed, 416 insertions(+), 93 deletions(-)

diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 7848d147153..225c54ec214 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -179,6 +179,9 @@ int64       end_time = 0;       /* when to stop in micro seconds, under -T */
  * pgbench_accounts table.
  */
 int         scale = 1;
+int         init_batch_size = 0;
+int         num_init_errors = 0;
+int         num_init_retries = 0;
 
 /*
  * fillfactor. for example, fillfactor = 90 will use only 90 percent
@@ -888,6 +891,7 @@ usage(void)
            "  -q, --quiet              quiet logging (one message each 5 seconds)\n"
            "  -s, --scale=NUM          scaling factor\n"
            "  --foreign-keys           create foreign key constraints between tables\n"
+           "  --init-batch-size=NUM    batch size for initialization in scale units (default: 0 = all)\n"
            "  --index-tablespace=TABLESPACE\n"
            "                           create indexes in the specified tablespace\n"
            "  --partition-method=(range|hash)\n"
@@ -4973,7 +4977,7 @@ initAccount(PQExpBufferData *sql, int64 curr)
 }
 
 static void
-initPopulateTable(PGconn *con, const char *table, int64 base,
+initPopulateTable(PGconn **con_p, const char *table, int64 base,
                   initRowMethod init_row)
 {
     int         n;
@@ -4985,6 +4989,8 @@ initPopulateTable(PGconn *con, const char *table, int64 base,
     char        copy_statement[256];
     const char *copy_statement_fmt = "copy %s from stdin";
     int64       total = base * scale;
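+    /* Rows per committed batch; init_batch_size is in scale units, like --scale. */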
+    int64       batch_rows = (init_batch_size > 0) ? (init_batch_size * base) : total;
 
     /* used to track elapsed time and estimate of the remaining time */
     pg_time_usec_t start;
@@ -4993,13 +4999,19 @@ initPopulateTable(PGconn *con, const char *table, int64 base,
     /* Stay on the same line if reporting to a terminal */
     char        eol = isatty(fileno(stderr)) ? '\r' : '\n';
 
+    PGconn     *con = *con_p;
+
     initPQExpBuffer(&sql);
 
     /*
      * Use COPY with FREEZE on v14 and later for all the tables except
      * pgbench_accounts when it is partitioned.
+     *
+     * When loading in batches (init_batch_size > 0), we cannot use FREEZE:
+     * batches after the first run in transactions that did not create or
+     * truncate the table.
      */
-    if (PQserverVersion(con) >= 140000)
+    if (PQserverVersion(con) >= 140000 && init_batch_size == 0)
     {
         if (strcmp(table, "pgbench_accounts") != 0 ||
             partitions == 0)
@@ -5012,87 +5024,175 @@ initPopulateTable(PGconn *con, const char *table, int64 base,
     else if (n == -1)
         pg_fatal("invalid format string");
 
-    res = PQexec(con, copy_statement);
-
-    if (PQresultStatus(res) != PGRES_COPY_IN)
-        pg_fatal("unexpected copy in result: %s", PQerrorMessage(con));
-    PQclear(res);
-
     start = pg_time_now();
 
-    for (k = 0; k < total; k++)
+    k = 0;
+    while (k < total)
     {
-        int64       j = k + 1;
+        int64       batch_end = k + batch_rows;
+        int         retries = 0;
+        bool        batch_success = false;
 
-        init_row(&sql, k);
-        if (PQputline(con, sql.data))
-            pg_fatal("PQputline failed");
+        if (batch_end > total)
+            batch_end = total;
 
-        if (CancelRequested)
-            break;
-
-        /*
-         * If we want to stick with the original logging, print a message each
-         * 100k inserted rows.
-         */
-        if ((!use_quiet) && (j % 100000 == 0))
+        while (!batch_success)
         {
-            double      elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start);
-            double      remaining_sec = ((double) total - j) * elapsed_sec / j;
+            bool        error = false;
 
-            chars = fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) of %s done (elapsed %.2f s, remaining %.2f s)",
-                            j, total,
-                            (int) ((j * 100) / total),
-                            table, elapsed_sec, remaining_sec);
+            if (init_batch_size > 0)
+            {
+                res = PQexec(con, "begin");
+                if (PQresultStatus(res) != PGRES_COMMAND_OK)
+                    error = true;
+                PQclear(res);
+            }
 
-            /*
-             * If the previous progress message is longer than the current
-             * one, add spaces to the current line to fully overwrite any
-             * remaining characters from the previous message.
-             */
-            if (prev_chars > chars)
-                fprintf(stderr, "%*c", prev_chars - chars, ' ');
-            fputc(eol, stderr);
-            prev_chars = chars;
-        }
-        /* let's not call the timing for each row, but only each 100 rows */
-        else if (use_quiet && (j % 100 == 0))
-        {
-            double      elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start);
-            double      remaining_sec = ((double) total - j) * elapsed_sec / j;
+            if (!error)
+            {
+                res = PQexec(con, copy_statement);
+                if (PQresultStatus(res) != PGRES_COPY_IN)
+                {
+                    /* the connection may be gone; the retry path below reconnects */
+                    error = true;
+                }
+                PQclear(res);
+            }
 
-            /* have we reached the next interval (or end)? */
-            if ((j == total) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS))
+            if (!error)
             {
-                chars = fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) of %s done (elapsed %.2f s, remaining %.2f s)",
-                                j, total,
-                                (int) ((j * 100) / total),
-                                table, elapsed_sec, remaining_sec);
+                for (int64 i = k; i < batch_end; i++)
+                {
+                    int64       j = i + 1;
 
-                /*
-                 * If the previous progress message is longer than the current
-                 * one, add spaces to the current line to fully overwrite any
-                 * remaining characters from the previous message.
-                 */
-                if (prev_chars > chars)
-                    fprintf(stderr, "%*c", prev_chars - chars, ' ');
-                fputc(eol, stderr);
-                prev_chars = chars;
+                    init_row(&sql, i);
+                    if (PQputline(con, sql.data))
+                    {
+                        error = true;
+                        break;
+                    }
+
+                    if (CancelRequested)
+                    {
+                        error = true;
+                        break;
+                    }
+
+                    /*
+                     * If we want to stick with the original logging, print a message each
+                     * 100k inserted rows.
+                     */
+                    if ((!use_quiet) && (j % 100000 == 0))
+                    {
+                        double      elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start);
+                        double      remaining_sec = ((double) total - j) * elapsed_sec / j;
+
+                        chars = fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) of %s done (elapsed %.2f s, remaining %.2f s)",
+                                        j, total,
+                                        (int) ((j * 100) / total),
+                                        table, elapsed_sec, remaining_sec);
+
+                        /*
+                         * If the previous progress message is longer than the current
+                         * one, add spaces to the current line to fully overwrite any
+                         * remaining characters from the previous message.
+                         */
+                        if (prev_chars > chars)
+                            fprintf(stderr, "%*c", prev_chars - chars, ' ');
+                        fputc(eol, stderr);
+                        prev_chars = chars;
+                    }
+                    /* let's not call the timing for each row, but only each 100 rows */
+                    else if (use_quiet && (j % 100 == 0))
+                    {
+                        double      elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start);
+                        double      remaining_sec = ((double) total - j) * elapsed_sec / j;
+
+                        /* have we reached the next interval (or end)? */
+                        if ((j == total) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS))
+                        {
+                            chars = fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) of %s done (elapsed %.2f s, remaining %.2f s)",
+                                            j, total,
+                                            (int) ((j * 100) / total),
+                                            table, elapsed_sec, remaining_sec);
+
+                            /*
+                             * If the previous progress message is longer than the current
+                             * one, add spaces to the current line to fully overwrite any
+                             * remaining characters from the previous message.
+                             */
+                            if (prev_chars > chars)
+                                fprintf(stderr, "%*c", prev_chars - chars, ' ');
+                            fputc(eol, stderr);
+                            prev_chars = chars;
+
+                            /* skip to the next interval */
+                            log_interval = (int) ceil(elapsed_sec / LOG_STEP_SECONDS);
+                        }
+                    }
+                }
+            }
+
+            if (!error)
+            {
+                if (PQputline(con, "\\.\n"))
+                    error = true;
+                if (!error && PQendcopy(con))
+                    error = true;
+            }
+
+            if (!error && init_batch_size > 0)
+            {
+                res = PQexec(con, "commit");
+                if (PQresultStatus(res) != PGRES_COMMAND_OK)
+                    error = true;
+                PQclear(res);
+            }
+
+            if (CancelRequested)
+                break;
+
+            if (!error)
+            {
+                batch_success = true;
+            }
+            else
+            {
+                if (init_batch_size == 0)
+                {
+                    pg_log_error("initialization failed for %s: %s", table, PQerrorMessage(con));
+                    exit(1);
+                }
+
+                num_init_errors++;
+                retries++;
+                if (retries > 5)
+                {
+                    pg_fatal("too many errors during initialization batch, giving up");
+                }
 
-                /* skip to the next interval */
-                log_interval = (int) ceil(elapsed_sec / LOG_STEP_SECONDS);
+                pg_log_info("initialization batch failed (retry %d/5), reconnecting...", retries);
+
+                ResetCancelConn();
+                PQfinish(con);
+                con = doConnect();
+                if (con == NULL)
+                    pg_fatal("could not reconnect");
+                SetCancelConn(con);
+                *con_p = con;
+                num_init_retries++;
             }
         }
+
+        if (CancelRequested)
+            break;
+
+        k = batch_end;
     }
 
     if (chars != 0 && eol != '\n')
         fprintf(stderr, "%*c\r", chars, ' ');   /* Clear the current line */
 
-    if (PQputline(con, "\\.\n"))
-        pg_fatal("very last PQputline failed");
-    if (PQendcopy(con))
-        pg_fatal("PQendcopy failed");
-
     termPQExpBuffer(&sql);
 }
 
@@ -5103,15 +5203,18 @@
  * a blank-padded string in pgbench_accounts.
  */
 static void
-initGenerateDataClientSide(PGconn *con)
+initGenerateDataClientSide(PGconn **con_p)
 {
+    PGconn     *con = *con_p;
+
     fprintf(stderr, "generating data (client-side)...\n");
 
     /*
      * we do all of this in one transaction to enable the backend's
      * data-loading optimizations
      */
-    executeStatement(con, "begin");
+    if (init_batch_size == 0)
+        executeStatement(con, "begin");
 
     /* truncate away any old data */
     initTruncateTables(con);
@@ -5120,11 +5223,12 @@
      * fill branches, tellers, accounts in that order in case foreign keys
      * already exist
      */
-    initPopulateTable(con, "pgbench_branches", nbranches, initBranch);
-    initPopulateTable(con, "pgbench_tellers", ntellers, initTeller);
-    initPopulateTable(con, "pgbench_accounts", naccounts, initAccount);
+    initPopulateTable(con_p, "pgbench_branches", nbranches, initBranch);
+    initPopulateTable(con_p, "pgbench_tellers", ntellers, initTeller);
+    initPopulateTable(con_p, "pgbench_accounts", naccounts, initAccount);
 
-    executeStatement(con, "commit");
+    if (init_batch_size == 0)
+        executeStatement(*con_p, "commit");
 }
 
 /*
@@ -5135,9 +5239,13 @@
  * and is a blank-padded string in pgbench_accounts.
  */
 static void
-initGenerateDataServerSide(PGconn *con)
+initGenerateDataServerSide(PGconn **con_p)
 {
+    PGconn     *con = *con_p;
     PQExpBufferData sql;
+    PGresult   *res;
+    int         batch_size = (init_batch_size > 0) ? init_batch_size : scale;
+    int64       scale_start;
 
     fprintf(stderr, "generating data (server-side)...\n");
 
@@ -5145,35 +5253,144 @@
     /*
      * we do all of this in one transaction to enable the backend's
      * data-loading optimizations
     */
-    executeStatement(con, "begin");
+    if (init_batch_size == 0)
+        executeStatement(con, "begin");
 
     /* truncate away any old data */
     initTruncateTables(con);
 
     initPQExpBuffer(&sql);
 
-    printfPQExpBuffer(&sql,
-                      "insert into pgbench_branches(bid,bbalance) "
-                      "select bid, 0 "
-                      "from generate_series(1, %d) as bid", nbranches * scale);
-    executeStatement(con, sql.data);
-
-    printfPQExpBuffer(&sql,
-                      "insert into pgbench_tellers(tid,bid,tbalance) "
-                      "select tid, (tid - 1) / %d + 1, 0 "
-                      "from generate_series(1, %d) as tid", ntellers, ntellers * scale);
-    executeStatement(con, sql.data);
-
-    printfPQExpBuffer(&sql,
-                      "insert into pgbench_accounts(aid,bid,abalance,filler) "
-                      "select aid, (aid - 1) / %d + 1, 0, '' "
-                      "from generate_series(1, " INT64_FORMAT ") as aid",
-                      naccounts, (int64) naccounts * scale);
-    executeStatement(con, sql.data);
+    for (scale_start = 0; scale_start < scale; scale_start += batch_size)
+    {
+        int64       scale_end = scale_start + batch_size;
+        bool        batch_success = false;
+        int         retries = 0;
+
+        if (scale_end > scale)
+            scale_end = scale;
+
+        while (!batch_success)
+        {
+            bool        error = false;
+
+            if (init_batch_size > 0)
+            {
+                res = PQexec(con, "begin");
+                if (PQresultStatus(res) != PGRES_COMMAND_OK)
+                    error = true;
+                PQclear(res);
+            }
+
+            if (!error)
+            {
+                int64       start_bid = scale_start * nbranches + 1;
+                int64       end_bid = scale_end * nbranches;
+
+                printfPQExpBuffer(&sql,
+                                  "insert into pgbench_branches(bid,bbalance) "
+                                  "select bid, 0 "
+                                  "from generate_series(" INT64_FORMAT ", " INT64_FORMAT ") as bid",
+                                  start_bid, end_bid);
+                if (!error)
+                {
+                    res = PQexec(con, sql.data);
+                    if (PQresultStatus(res) != PGRES_COMMAND_OK)
+                        error = true;
+                    PQclear(res);
+                }
+            }
+
+            if (!error)
+            {
+                int64       start_tid = scale_start * ntellers + 1;
+                int64       end_tid = scale_end * ntellers;
+
+                printfPQExpBuffer(&sql,
+                                  "insert into pgbench_tellers(tid,bid,tbalance) "
+                                  "select tid, (tid - 1) / %d + 1, 0 "
+                                  "from generate_series(" INT64_FORMAT ", " INT64_FORMAT ") as tid",
+                                  ntellers, start_tid, end_tid);
+                if (!error)
+                {
+                    res = PQexec(con, sql.data);
+                    if (PQresultStatus(res) != PGRES_COMMAND_OK)
+                        error = true;
+                    PQclear(res);
+                }
+            }
+
+            if (!error)
+            {
+                int64       start_aid = scale_start * naccounts + 1;
+                int64       end_aid = scale_end * naccounts;
+
+                printfPQExpBuffer(&sql,
+                                  "insert into pgbench_accounts(aid,bid,abalance,filler) "
+                                  "select aid, (aid - 1) / %d + 1, 0, '' "
+                                  "from generate_series(" INT64_FORMAT ", " INT64_FORMAT ") as aid",
+                                  naccounts, start_aid, end_aid);
+                if (!error)
+                {
+                    res = PQexec(con, sql.data);
+                    if (PQresultStatus(res) != PGRES_COMMAND_OK)
+                        error = true;
+                    PQclear(res);
+                }
+            }
+
+            if (!error && init_batch_size > 0)
+            {
+                res = PQexec(con, "commit");
+                if (PQresultStatus(res) != PGRES_COMMAND_OK)
+                    error = true;
+                PQclear(res);
+            }
+
+            if (CancelRequested)
+                break;
+
+            if (!error)
+            {
+                batch_success = true;
+            }
+            else
+            {
+                if (init_batch_size == 0)
+                {
+                    pg_log_error("server-side data generation failed: %s", PQerrorMessage(con));
+                    exit(1);
+                }
+
+                num_init_errors++;
+                retries++;
+                if (retries > 5)
+                {
+                    pg_fatal("too many errors during initialization batch, giving up");
+                }
+
+                pg_log_info("initialization batch failed (retry %d/5), reconnecting...", retries);
+
+                ResetCancelConn();
+                PQfinish(con);
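+                /* Reconnect and re-register the connection with the cancel handler. */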
+                con = doConnect();
+                if (con == NULL)
+                    pg_fatal("could not reconnect");
+                SetCancelConn(con);
+                *con_p = con;
+                num_init_retries++;
+            }
+        }
+
+        if (CancelRequested)
+            break;
+    }
 
     termPQExpBuffer(&sql);
 
-    executeStatement(con, "commit");
+    if (init_batch_size == 0)
+        executeStatement(con, "commit");
 }
 
 /*
@@ -5310,11 +5527,11 @@ runInitSteps(const char *initialize_steps)
                 break;
             case 'g':
                 op = "client-side generate";
-                initGenerateDataClientSide(con);
+                initGenerateDataClientSide(&con);
                 break;
             case 'G':
                 op = "server-side generate";
-                initGenerateDataServerSide(con);
+                initGenerateDataServerSide(&con);
                 break;
             case 'v':
                 op = "vacuum";
@@ -5352,6 +5569,10 @@
     }
 
     fprintf(stderr, "done in %.2f s (%s).\n", run_time, stats.data);
+
+    if (num_init_errors > 0 || num_init_retries > 0)
+        fprintf(stderr, "initialization errors: %d, retries: %d\n", num_init_errors, num_init_retries);
+
     ResetCancelConn();
     PQfinish(con);
     termPQExpBuffer(&stats);
@@ -6720,6 +6941,7 @@ main(int argc, char **argv)
         {"verbose-errors", no_argument, NULL, 15},
         {"exit-on-abort", no_argument, NULL, 16},
         {"debug", no_argument, NULL, 17},
+        {"init-batch-size", required_argument, NULL, 18},
         {NULL, 0, NULL, 0}
     };
@@ -7060,6 +7282,12 @@
             case 17:            /* debug */
                 pg_logging_increase_verbosity();
                 break;
+            case 18:            /* init-batch-size */
+                initialization_option_set = true;
+                if (!option_parse_int(optarg, "--init-batch-size", 1, INT_MAX,
+                                      &init_batch_size))
+                    exit(1);
+                break;
             default:
                 /* getopt_long already emitted a complaint */
                 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl
index ab0d587b358..307682ce67d 100644
--- a/src/bin/pgbench/t/001_pgbench_with_server.pl
+++ b/src/bin/pgbench/t/001_pgbench_with_server.pl
@@ -7,6 +7,7 @@
 use PostgreSQL::Test::Cluster;
 use PostgreSQL::Test::Utils;
 use Test::More;
+use Time::HiRes qw(usleep);
 
 # Check the initial state of the data generated. Tables for tellers and
 # branches use NULL for their filler attribute. The table accounts uses
@@ -1650,6 +1651,100 @@
 # Clean up
 $node->safe_psql('postgres', 'DROP TABLE counter;');
 
+# Test --init-batch-size retry on backend termination.
+# This test verifies that pgbench can recover from connection drops during
+# initialization when using the --init-batch-size option.
+#
+# We run pgbench in a separate process and monitor its activity.
+# When we detect that pgbench is actively loading data (COPY for client-side,
+# INSERT for server-side), we terminate the backend to simulate a connection
+# failure. We then verify that pgbench retries and succeeds.
+
+for my $method ('client', 'server')
+{
+	# Use a larger scale to ensure we have time to catch and kill the backend.
+	# Client-side COPY is faster than server-side INSERT, so we need a larger
+	# scale for client-side to reliably catch it.
+	# Scale 200 = 20 million rows for the accounts table.
+	my $scale = ($method eq 'client') ? 200 : 100;
+	my $init_steps = ($method eq 'client') ? 'dtg' : 'dtG';
+	my $target_query = ($method eq 'client') ? 'copy' : 'insert';
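+	# Each scale unit adds 100,000 rows to pgbench_accounts (naccounts).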
+	my $expected_rows = $scale * 100000;
+
+	my $pgbench_timeout = IPC::Run::timeout(600);
+	my ($pgbench_stdout, $pgbench_stderr) = ('', '');
+
+	# Start pgbench with --init-batch-size in the background
+	my @pgbench_cmd = (
+		'pgbench', '-i', '-I', $init_steps,
+		'-s', $scale, '--init-batch-size=5',
+		$node->connstr('postgres')
+	);
+
+	my $pgbench = IPC::Run::start(
+		\@pgbench_cmd,
+		'>', \$pgbench_stdout,
+		'2>', \$pgbench_stderr,
+		$pgbench_timeout
+	);
+
+	# Pump to ensure the process starts
+	$pgbench->pump_nb();
+
+	# Wait until pgbench is running COPY/INSERT on pgbench_accounts, then kill it.
+	my $killed = 0;
+	my $deadline = time() + 120;
+
+	while (time() < $deadline && !$killed && $pgbench->pumpable())
+	{
+		$pgbench->pump_nb();
+
+		# Look for a pgbench backend running data loading (COPY/INSERT) on
+		# pgbench_accounts. We need to match the specific data-loading query,
+		# not DROP/CREATE TABLE.
+		my $result = $node->safe_psql('postgres', qq{
+			SELECT pid FROM pg_stat_activity
+			WHERE pid != pg_backend_pid()
+			AND datname = 'postgres'
+			AND query ILIKE '%${target_query}%pgbench_accounts%'
+		});
+
+		if ($result ne '')
+		{
+			# Found the backend running data loading; terminate it
+			my ($pid) = split(/\n/, $result);
+			$node->safe_psql('postgres', "SELECT pg_terminate_backend($pid)");
+			$killed = 1;
+		}
+
+		usleep(10_000) unless $killed;    # 10ms
+	}
+
+	# Wait for pgbench to finish
+	$pgbench->finish;
+	my $pgbench_exit = $pgbench->result(0);
+
+	# Check results
+	is($pgbench_exit, 0,
+		"pgbench --init-batch-size $method-side succeeded after backend termination");
+
+	if ($killed)
+	{
+		like($pgbench_stderr, qr/initialization batch failed|initialization errors:/,
+			"pgbench --init-batch-size $method-side logged retry message");
+	}
+	else
+	{
+		diag("Warning: did not find a pgbench backend to kill for $method-side test");
+	}
+
+	# Verify the row count
+	my $count = $node->safe_psql('postgres',
+		'SELECT count(*) FROM pgbench_accounts');
+	is($count, $expected_rows,
+		"pgbench --init-batch-size $method-side correct row count");
+}
+
 # done
 $node->safe_psql('postgres', 'DROP TABLESPACE regress_pgbench_tap_1_ts');
 $node->stop;