diff --git a/CMakeLists.txt b/CMakeLists.txt index 9111281..d8c3b9b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -125,6 +125,9 @@ set(util_source_files src/util/mixin_exception_adapter.hpp + src/util/native_file_operations_helpers.hpp + src/util/native_file_operations_helpers_posix.cpp + src/util/nv_tuple_fwd.hpp src/util/nv_tuple.hpp src/util/nv_tuple_json_support.hpp diff --git a/mtr/binlog_streaming/r/list.result b/mtr/binlog_streaming/r/list.result new file mode 100644 index 0000000..47fd111 --- /dev/null +++ b/mtr/binlog_streaming/r/list.result @@ -0,0 +1,51 @@ +*** Resetting replication at the very beginning of the test. + +*** Determining the first binary log name. + +*** Generating a configuration file in JSON format for the Binlog +*** Server utility. + +*** Determining binlog file directory from the server. + +*** Creating a temporary directory for storing +*** binlog files downloaded via the Binlog Server utility. + +*** 1. Executing the Binlog Server utility in the 'list' mode on an +*** empty storage and expecting an empty result array +include/read_file_to_var.inc + +*** Creating a simple table. +CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB; + +*** Filling the table with some data. +INSERT INTO t1 VALUES(); + +*** Flushing the first binary log and switching to the second one. +FLUSH BINARY LOGS; + +*** Determining the second binary log name. + +*** Filling the table with more data. +INSERT INTO t1 VALUES(); + +*** Executing the Binlog Server utility and fetching all events. + +*** 2. Executing the Binlog Server utility in the 'list' mode on a +*** non-empty storage and expecting both binlog files to be returned +*** in chronological order +include/read_file_to_var.inc + +*** 3. Executing the Binlog Server utility in the 'list' mode with a +*** nonexistent configuration file path +include/read_file_to_var.inc + +*** Removing the list result file. + +*** Dropping the table. 
+DROP TABLE t1; + +*** Removing the Binlog Server utility storage directory. + +*** Removing the Binlog Server utility log file. + +*** Removing the Binlog Server utility configuration file. diff --git a/mtr/binlog_streaming/r/purge_binlogs.result b/mtr/binlog_streaming/r/purge_binlogs.result new file mode 100644 index 0000000..dfb84b5 --- /dev/null +++ b/mtr/binlog_streaming/r/purge_binlogs.result @@ -0,0 +1,95 @@ +*** Resetting replication at the very beginning of the test. + +*** Generating a configuration file in JSON format for the Binlog +*** Server utility. + +*** Determining binlog file directory from the server. + +*** Creating a temporary directory for storing +*** binlog files downloaded via the Binlog Server utility. + +*** 1. Executing 'purge_binlogs' on an empty storage and expecting +*** an error. +include/read_file_to_var.inc + +*** Determining the first binary log name. + +*** Creating a simple table. +CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB; + +*** Filling the table with some data. +INSERT INTO t1 VALUES(); + +*** Flushing the first binary log and switching to the second one. +FLUSH BINARY LOGS; + +*** Determining the second binary log name. + +*** Filling the table with more data. +INSERT INTO t1 VALUES(); + +*** Flushing the second binary log and switching to the third one. +FLUSH BINARY LOGS; + +*** Determining the third binary log name. + +*** Filling the table with even more data. +INSERT INTO t1 VALUES(); + +*** Executing the Binlog Server utility and fetching all events. + +*** Sanity check: 'list' returns all three binlog files in chronological +*** order. +include/read_file_to_var.inc + +*** 2. 'purge_binlogs' with an unparseable target name fails with error. +include/read_file_to_var.inc + +*** 3. 'purge_binlogs' with a target whose base name differs from the +*** one in storage fails with error. +include/read_file_to_var.inc + +*** 4. 
'purge_binlogs' with a target not present in the storage fails +*** with error. +include/read_file_to_var.inc + +*** 5. 'purge_binlogs' with a nonexistent configuration file path fails +*** with error. +include/read_file_to_var.inc + +*** 6. 'purge_binlogs' the first binary log removes only that one file; +*** the response lists the removed record and 'list' shows the two +*** surviving files. +include/read_file_to_var.inc +include/read_file_to_var.inc + +*** 6a. Re-opening the purged storage with another 'fetch' must keep the +*** surviving GTID metadata stable. +include/read_file_to_var.inc + +*** 7. Re-running 'purge_binlogs' with the just-purged target now fails +*** with the "not present" error - the operation is not idempotent +*** by design. +include/read_file_to_var.inc + +*** 8. 'purge_binlogs' the second binary log removes only that one (the +*** first one is already gone); 'list' shows just the third file. +include/read_file_to_var.inc +include/read_file_to_var.inc + +*** 9. 'purge_binlogs' the last remaining (current tail) file is +*** refused so that the resume position is preserved for the next +*** 'fetch' / 'pull'; 'list' still shows the third file unchanged. +include/read_file_to_var.inc +include/read_file_to_var.inc + +*** Removing the purge_binlogs result file. + +*** Dropping the table. +DROP TABLE t1; + +*** Removing the Binlog Server utility storage directory. + +*** Removing the Binlog Server utility log file. + +*** Removing the Binlog Server utility configuration file. 
diff --git a/mtr/binlog_streaming/t/list.combinations b/mtr/binlog_streaming/t/list.combinations new file mode 100644 index 0000000..411ce97 --- /dev/null +++ b/mtr/binlog_streaming/t/list.combinations @@ -0,0 +1,5 @@ +[position] + +[gtid] +gtid-mode=on +enforce-gtid-consistency diff --git a/mtr/binlog_streaming/t/list.test b/mtr/binlog_streaming/t/list.test new file mode 100644 index 0000000..d563a60 --- /dev/null +++ b/mtr/binlog_streaming/t/list.test @@ -0,0 +1,94 @@ +--source ../include/have_binsrv.inc + +--source ../include/v80_v84_compatibility_defines.inc + +# in case of --repeat=N, we need to start from a fresh binary log to make +# this test deterministic +--echo *** Resetting replication at the very beginning of the test. +--disable_query_log +eval $stmt_reset_binary_logs_and_gtids; +--enable_query_log + +--echo +--echo *** Determining the first binary log name. +--let $first_binlog = query_get_value($stmt_show_binary_log_status, File, 1) + +# identifying backend storage type ('file' or 's3') +--source ../include/identify_storage_backend.inc + +# creating data directory, configuration file, etc. +--let $binsrv_connect_timeout = 20 +--let $binsrv_read_timeout = 60 +--let $binsrv_idle_time = 10 +--let $binsrv_verify_checksum = TRUE +--let $binsrv_replication_mode = `SELECT IF(@@global.gtid_mode = 'ON', 'gtid', 'position')` +--let $binsrv_checkpoint_size = 1 +--source ../include/set_up_binsrv_environment.inc + +--let $read_from_file = $MYSQL_TMP_DIR/list_result.json + +--echo +--echo *** 1. Executing the Binlog Server utility in the 'list' mode on an +--echo *** empty storage and expecting an empty result array +--exec $BINSRV list $binsrv_config_file_path > $read_from_file +--source include/read_file_to_var.inc +--assert(`SELECT JSON_EXTRACT('$result', '$.status') = 'success'`) +--assert(`SELECT JSON_LENGTH(JSON_EXTRACT('$result', '$.result')) = 0`) + +--echo +--echo *** Creating a simple table. 
+CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB; + +--echo +--echo *** Filling the table with some data. +INSERT INTO t1 VALUES(); + +--echo +--echo *** Flushing the first binary log and switching to the second one. +FLUSH BINARY LOGS; + +--echo +--echo *** Determining the second binary log name. +--let $second_binlog = query_get_value($stmt_show_binary_log_status, File, 1) + +--echo +--echo *** Filling the table with more data. +INSERT INTO t1 VALUES(); + +--echo +--echo *** Executing the Binlog Server utility and fetching all events. +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** 2. Executing the Binlog Server utility in the 'list' mode on a +--echo *** non-empty storage and expecting both binlog files to be returned +--echo *** in chronological order +--exec $BINSRV list $binsrv_config_file_path > $read_from_file +--source include/read_file_to_var.inc +--assert(`SELECT JSON_EXTRACT('$result', '$.status') = 'success'`) +--assert(`SELECT JSON_LENGTH(JSON_EXTRACT('$result', '$.result')) = 2`) +--assert(`SELECT JSON_EXTRACT('$result', '$.result[0].name') = '$first_binlog'`) +--assert(`SELECT JSON_EXTRACT('$result', '$.result[1].name') = '$second_binlog'`) +--assert(`SELECT JSON_EXTRACT('$result', '$.result[0].size') > 0`) +--assert(`SELECT JSON_EXTRACT('$result', '$.result[1].size') > 0`) +--assert(`SELECT JSON_EXTRACT('$result', '$.result[0].uri') IS NOT NULL`) +--assert(`SELECT JSON_EXTRACT('$result', '$.result[1].uri') IS NOT NULL`) + +--echo +--echo *** 3. Executing the Binlog Server utility in the 'list' mode with a +--echo *** nonexistent configuration file path +--error 1 +--exec $BINSRV list $MYSQL_TMP_DIR/no_such_config.json > $read_from_file +--source include/read_file_to_var.inc +--assert(`SELECT JSON_EXTRACT('$result', '$.status') = 'error'`) + +--echo +--echo *** Removing the list result file. +--remove_file $read_from_file + +--echo +--echo *** Dropping the table. 
+DROP TABLE t1; + +# cleaning up +--source ../include/tear_down_binsrv_environment.inc diff --git a/mtr/binlog_streaming/t/purge_binlogs.combinations b/mtr/binlog_streaming/t/purge_binlogs.combinations new file mode 100644 index 0000000..411ce97 --- /dev/null +++ b/mtr/binlog_streaming/t/purge_binlogs.combinations @@ -0,0 +1,5 @@ +[position] + +[gtid] +gtid-mode=on +enforce-gtid-consistency diff --git a/mtr/binlog_streaming/t/purge_binlogs.test b/mtr/binlog_streaming/t/purge_binlogs.test new file mode 100644 index 0000000..a07a26d --- /dev/null +++ b/mtr/binlog_streaming/t/purge_binlogs.test @@ -0,0 +1,227 @@ +--source ../include/have_binsrv.inc + +--source ../include/v80_v84_compatibility_defines.inc + +# identifying backend storage type ('file' or 's3') early - 'purge_binlogs' +# is supported only on the local filesystem backend +--source ../include/identify_storage_backend.inc + +if ($storage_backend == s3) +{ + --skip purge_binlogs is supported only on the local filesystem backend +} + +# in case of --repeat=N, we need to start from a fresh binary log to make +# this test deterministic +--echo *** Resetting replication at the very beginning of the test. +--disable_query_log +eval $stmt_reset_binary_logs_and_gtids; +--enable_query_log + +# creating data directory, configuration file, etc. +--let $binsrv_connect_timeout = 20 +--let $binsrv_read_timeout = 60 +--let $binsrv_idle_time = 10 +--let $binsrv_verify_checksum = TRUE +--let $binsrv_replication_mode = `SELECT IF(@@global.gtid_mode = 'ON', 'gtid', 'position')` +--let $binsrv_checkpoint_size = 1 +--source ../include/set_up_binsrv_environment.inc + +--let $read_from_file = $MYSQL_TMP_DIR/purge_binlogs_result.json + +--echo +--echo *** 1. Executing 'purge_binlogs' on an empty storage and expecting +--echo *** an error. 
+--error 1 +--exec $BINSRV purge_binlogs $binsrv_config_file_path binlog.000001 > $read_from_file +--source include/read_file_to_var.inc +--assert(`SELECT JSON_EXTRACT('$result', '$.status') = 'error'`) +--assert(`SELECT JSON_EXTRACT('$result', '$.message') LIKE '%binlog storage is empty%'`) + +--echo +--echo *** Determining the first binary log name. +--let $first_binlog = query_get_value($stmt_show_binary_log_status, File, 1) + +--echo +--echo *** Creating a simple table. +CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB; + +--echo +--echo *** Filling the table with some data. +INSERT INTO t1 VALUES(); + +--echo +--echo *** Flushing the first binary log and switching to the second one. +FLUSH BINARY LOGS; + +if ($binsrv_replication_mode == gtid) +{ + --let $gtids_before_second_binlog = `SELECT @@global.gtid_executed` +} + +--echo +--echo *** Determining the second binary log name. +--let $second_binlog = query_get_value($stmt_show_binary_log_status, File, 1) + +--echo +--echo *** Filling the table with more data. +INSERT INTO t1 VALUES(); + +--echo +--echo *** Flushing the second binary log and switching to the third one. +FLUSH BINARY LOGS; + +if ($binsrv_replication_mode == gtid) +{ + --let $gtids_before_third_binlog = `SELECT @@global.gtid_executed` + --let $second_binlog_gtids = `SELECT GTID_SUBTRACT('$gtids_before_third_binlog', '$gtids_before_second_binlog')` +} + +--echo +--echo *** Determining the third binary log name. +--let $third_binlog = query_get_value($stmt_show_binary_log_status, File, 1) + +--echo +--echo *** Filling the table with even more data. +INSERT INTO t1 VALUES(); + +--echo +--echo *** Executing the Binlog Server utility and fetching all events. +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** Sanity check: 'list' returns all three binlog files in chronological +--echo *** order. 
+--exec $BINSRV list $binsrv_config_file_path > $read_from_file +--source include/read_file_to_var.inc +--assert(`SELECT JSON_EXTRACT('$result', '$.status') = 'success'`) +--assert(`SELECT JSON_LENGTH(JSON_EXTRACT('$result', '$.result')) = 3`) +--assert(`SELECT JSON_EXTRACT('$result', '$.result[0].name') = '$first_binlog'`) +--assert(`SELECT JSON_EXTRACT('$result', '$.result[1].name') = '$second_binlog'`) +--assert(`SELECT JSON_EXTRACT('$result', '$.result[2].name') = '$third_binlog'`) + +--echo +--echo *** 2. 'purge_binlogs' with an unparseable target name fails with error. +--error 1 +--exec $BINSRV purge_binlogs $binsrv_config_file_path notabinlogname > $read_from_file +--source include/read_file_to_var.inc +--assert(`SELECT JSON_EXTRACT('$result', '$.status') = 'error'`) + +--echo +--echo *** 3. 'purge_binlogs' with a target whose base name differs from the +--echo *** one in storage fails with error. +--error 1 +--exec $BINSRV purge_binlogs $binsrv_config_file_path other.000001 > $read_from_file +--source include/read_file_to_var.inc +--assert(`SELECT JSON_EXTRACT('$result', '$.status') = 'error'`) +--assert(`SELECT JSON_EXTRACT('$result', '$.message') LIKE '%different base name%'`) + +--echo +--echo *** 4. 'purge_binlogs' with a target not present in the storage fails +--echo *** with error. +--error 1 +--exec $BINSRV purge_binlogs $binsrv_config_file_path binlog.999999 > $read_from_file +--source include/read_file_to_var.inc +--assert(`SELECT JSON_EXTRACT('$result', '$.status') = 'error'`) +--assert(`SELECT JSON_EXTRACT('$result', '$.message') LIKE '%not present%'`) + +--echo +--echo *** 5. 'purge_binlogs' with a nonexistent configuration file path fails +--echo *** with error. +--error 1 +--exec $BINSRV purge_binlogs $MYSQL_TMP_DIR/no_such_config.json $first_binlog > $read_from_file +--source include/read_file_to_var.inc +--assert(`SELECT JSON_EXTRACT('$result', '$.status') = 'error'`) + +--echo +--echo *** 6. 
'purge_binlogs' the first binary log removes only that one file; +--echo *** the response lists the removed record and 'list' shows the two +--echo *** surviving files. +--exec $BINSRV purge_binlogs $binsrv_config_file_path $first_binlog > $read_from_file +--source include/read_file_to_var.inc +--assert(`SELECT JSON_EXTRACT('$result', '$.status') = 'success'`) +--assert(`SELECT JSON_LENGTH(JSON_EXTRACT('$result', '$.result')) = 1`) +--assert(`SELECT JSON_EXTRACT('$result', '$.result[0].name') = '$first_binlog'`) +--assert(`SELECT JSON_EXTRACT('$result', '$.result[0].size') > 0`) +--assert(`SELECT JSON_EXTRACT('$result', '$.result[0].uri') IS NOT NULL`) + +--exec $BINSRV list $binsrv_config_file_path > $read_from_file +--source include/read_file_to_var.inc +--assert(`SELECT JSON_EXTRACT('$result', '$.status') = 'success'`) +--assert(`SELECT JSON_LENGTH(JSON_EXTRACT('$result', '$.result')) = 2`) +--assert(`SELECT JSON_EXTRACT('$result', '$.result[0].name') = '$second_binlog'`) +--assert(`SELECT JSON_EXTRACT('$result', '$.result[1].name') = '$third_binlog'`) +if ($binsrv_replication_mode == gtid) +{ + --assert(`SELECT JSON_EXTRACT('$result', '$.result[0].previous_gtids') = '$gtids_before_second_binlog'`) + --assert(`SELECT JSON_EXTRACT('$result', '$.result[0].added_gtids') = '$second_binlog_gtids'`) +} + +--echo +--echo *** 6a. Re-opening the purged storage with another 'fetch' must keep the +--echo *** surviving GTID metadata stable. 
+--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--exec $BINSRV list $binsrv_config_file_path > $read_from_file +--source include/read_file_to_var.inc +--assert(`SELECT JSON_EXTRACT('$result', '$.status') = 'success'`) +--assert(`SELECT JSON_LENGTH(JSON_EXTRACT('$result', '$.result')) = 2`) +--assert(`SELECT JSON_EXTRACT('$result', '$.result[0].name') = '$second_binlog'`) +--assert(`SELECT JSON_EXTRACT('$result', '$.result[1].name') = '$third_binlog'`) +if ($binsrv_replication_mode == gtid) +{ + --assert(`SELECT JSON_EXTRACT('$result', '$.result[0].previous_gtids') = '$gtids_before_second_binlog'`) + --assert(`SELECT JSON_EXTRACT('$result', '$.result[0].added_gtids') = '$second_binlog_gtids'`) +} + +--echo +--echo *** 7. Re-running 'purge_binlogs' with the just-purged target now fails +--echo *** with the "not present" error - the operation is not idempotent +--echo *** by design. +--error 1 +--exec $BINSRV purge_binlogs $binsrv_config_file_path $first_binlog > $read_from_file +--source include/read_file_to_var.inc +--assert(`SELECT JSON_EXTRACT('$result', '$.status') = 'error'`) +--assert(`SELECT JSON_EXTRACT('$result', '$.message') LIKE '%not present%'`) + +--echo +--echo *** 8. 'purge_binlogs' the second binary log removes only that one (the +--echo *** first one is already gone); 'list' shows just the third file. 
+--exec $BINSRV purge_binlogs $binsrv_config_file_path $second_binlog > $read_from_file +--source include/read_file_to_var.inc +--assert(`SELECT JSON_EXTRACT('$result', '$.status') = 'success'`) +--assert(`SELECT JSON_LENGTH(JSON_EXTRACT('$result', '$.result')) = 1`) +--assert(`SELECT JSON_EXTRACT('$result', '$.result[0].name') = '$second_binlog'`) + +--exec $BINSRV list $binsrv_config_file_path > $read_from_file +--source include/read_file_to_var.inc +--assert(`SELECT JSON_EXTRACT('$result', '$.status') = 'success'`) +--assert(`SELECT JSON_LENGTH(JSON_EXTRACT('$result', '$.result')) = 1`) +--assert(`SELECT JSON_EXTRACT('$result', '$.result[0].name') = '$third_binlog'`) + +--echo +--echo *** 9. 'purge_binlogs' the last remaining (current tail) file is +--echo *** refused so that the resume position is preserved for the next +--echo *** 'fetch' / 'pull'; 'list' still shows the third file unchanged. +--error 1 +--exec $BINSRV purge_binlogs $binsrv_config_file_path $third_binlog > $read_from_file +--source include/read_file_to_var.inc +--assert(`SELECT JSON_EXTRACT('$result', '$.status') = 'error'`) +--assert(`SELECT JSON_EXTRACT('$result', '$.message') LIKE '%current tail binlog file%'`) + +--exec $BINSRV list $binsrv_config_file_path > $read_from_file +--source include/read_file_to_var.inc +--assert(`SELECT JSON_EXTRACT('$result', '$.status') = 'success'`) +--assert(`SELECT JSON_LENGTH(JSON_EXTRACT('$result', '$.result')) = 1`) +--assert(`SELECT JSON_EXTRACT('$result', '$.result[0].name') = '$third_binlog'`) + +--echo +--echo *** Removing the purge_binlogs result file. +--remove_file $read_from_file + +--echo +--echo *** Dropping the table. 
+DROP TABLE t1; + +# cleaning up +--source ../include/tear_down_binsrv_environment.inc diff --git a/src/app.cpp b/src/app.cpp index 60eccba..c9d6f7c 100644 --- a/src/app.cpp +++ b/src/app.cpp @@ -56,6 +56,7 @@ #include "binsrv/gtids/gtid_set.hpp" #include "binsrv/models/error_response.hpp" +#include "binsrv/models/response_status_type.hpp" #include "binsrv/models/search_response.hpp" #include "binsrv/events/checksum_algorithm_type.hpp" @@ -115,6 +116,7 @@ check_cmd_args(const util::command_line_arg_view &cmd_args, switch (operation_mode) { case binsrv::operation_mode_type::fetch: case binsrv::operation_mode_type::pull: + case binsrv::operation_mode_type::list: if (number_of_cmd_args != expected_number_of_cmd_args_with_config) { return false; } @@ -122,6 +124,7 @@ check_cmd_args(const util::command_line_arg_view &cmd_args, return true; case binsrv::operation_mode_type::search_by_timestamp: case binsrv::operation_mode_type::search_by_gtid_set: + case binsrv::operation_mode_type::purge_binlogs: if (number_of_cmd_args != expected_number_of_cmd_args_with_config_and_value) { return false; @@ -952,6 +955,47 @@ bool handle_version() { return true; } +// shared by all 'handle_*' subcommands that build a 'search_response': +// translates a binlog record kept inside 'binsrv::storage' into +// a record of the response model +void append_record_to_response(binsrv::models::search_response &response, + const binsrv::storage &storage, + const auto &record) { + response.add_record(record.name.str(), record.size, + storage.get_binlog_uri(record.name), + record.previous_gtids, record.added_gtids, + record.timestamps.get_min_timestamp().get_value(), + record.timestamps.get_max_timestamp().get_value()); +} + +bool handle_list(std::string_view config_file_path) { + bool operation_successful{false}; + std::string result; + + try { + const binsrv::main_config config{config_file_path}; + const auto &storage_config = config.root().get<"storage">(); + const auto &replication_config = 
config.root().get<"replication">(); + const auto replication_mode{replication_config.get<"mode">()}; + + const binsrv::storage storage{ + storage_config, binsrv::storage_construction_mode_type::querying_only, + replication_mode}; + + binsrv::models::search_response response; + for (const auto &record : storage.get_binlog_records()) { + append_record_to_response(response, storage, record); + } + result = response.str(); + operation_successful = true; + } catch (const std::exception &e) { + const binsrv::models::error_response response{e.what()}; + result = response.str(); + } + std::cout << result << '\n'; + return operation_successful; +} + // NOLINTNEXTLINE(bugprone-easily-swappable-parameters) bool handle_search_by_timestamp(std::string_view config_file_path, std::string_view subcommand_value) { @@ -984,11 +1028,7 @@ bool handle_search_by_timestamp(std::string_view config_file_path, if (record.timestamps.get_min_timestamp() > timestamp) { break; } - response.add_record(record.name.str(), record.size, - storage.get_binlog_uri(record.name), - record.previous_gtids, record.added_gtids, - record.timestamps.get_min_timestamp().get_value(), - record.timestamps.get_max_timestamp().get_value()); + append_record_to_response(response, storage, record); } if (response.root().get<"result">().empty()) { throw std::runtime_error("Timestamp is too old"); @@ -1003,6 +1043,62 @@ bool handle_search_by_timestamp(std::string_view config_file_path, return operation_successful; } +// NOLINTNEXTLINE(bugprone-easily-swappable-parameters) +bool handle_purge_binlogs(std::string_view config_file_path, + std::string_view subcommand_value) { + bool operation_successful{false}; + std::string result; + + try { + const auto target_name{ + binsrv::events::composite_binlog_name::parse(subcommand_value)}; + + const binsrv::main_config config{config_file_path}; + const auto &storage_config = config.root().get<"storage">(); + const auto &replication_config = config.root().get<"replication">(); + 
const auto replication_mode{replication_config.get<"mode">()}; + + // for now, only file backend supported + if (storage_config.get<"backend">() != binsrv::storage_backend_type::file) { + throw std::runtime_error( + "purge_binlogs is only supported on the local filesystem storage " + "backend"); + } + + binsrv::storage storage{storage_config, + binsrv::storage_construction_mode_type::purging, + replication_mode}; + + const auto [removed_records, cleanup_warning_message] = + storage.purge_binlogs(target_name); + + // The step-2 index rewrite has already committed by the time + // 'purge_binlogs' returns normally; if its best-effort step-3 + // cleanup left orphan payload/metadata objects on disk, the + // call reports it via a non-empty 'cleanup_warning_message' and + // we surface that as a 'warning' status (instead of plain + // 'success'), with the underlying error message attached, so the + // operator knows the storage will need attention before the next + // 'fetch' / 'pull' run. + binsrv::models::search_response response; + if (!cleanup_warning_message.empty()) { + response = binsrv::models::search_response{ + binsrv::models::response_status_type::warning, + cleanup_warning_message}; + } + for (const auto &record : removed_records) { + append_record_to_response(response, storage, record); + } + result = response.str(); + operation_successful = true; + } catch (const std::exception &e) { + const binsrv::models::error_response response{e.what()}; + result = response.str(); + } + std::cout << result << '\n'; + return operation_successful; +} + // NOLINTNEXTLINE(bugprone-easily-swappable-parameters) bool handle_search_by_gtid_set(std::string_view config_file_path, std::string_view subcommand_value) { @@ -1044,11 +1140,7 @@ bool handle_search_by_gtid_set(std::string_view config_file_path, } remaining_gtids.subtract(*record.added_gtids); - response.add_record(record.name.str(), record.size, - storage.get_binlog_uri(record.name), - record.previous_gtids, 
record.added_gtids, - record.timestamps.get_min_timestamp().get_value(), - record.timestamps.get_max_timestamp().get_value()); + append_record_to_response(response, storage, record); } if (!remaining_gtids.is_empty()) { throw std::runtime_error("The specified GTID set cannot be covered"); @@ -1063,6 +1155,29 @@ bool handle_search_by_gtid_set(std::string_view config_file_path, return operation_successful; } +// dispatcher for the one-shot subcommands that do not need logger / signal +// handler / replication setup; returns std::nullopt for streaming modes +// ('fetch' and 'pull') and the handler's success flag otherwise +std::optional +dispatch_stateless_command(binsrv::operation_mode_type operation_mode, + std::string_view config_file_path, + std::string_view subcommand_value) { + switch (operation_mode) { + case binsrv::operation_mode_type::version: + return handle_version(); + case binsrv::operation_mode_type::list: + return handle_list(config_file_path); + case binsrv::operation_mode_type::search_by_timestamp: + return handle_search_by_timestamp(config_file_path, subcommand_value); + case binsrv::operation_mode_type::search_by_gtid_set: + return handle_search_by_gtid_set(config_file_path, subcommand_value); + case binsrv::operation_mode_type::purge_binlogs: + return handle_purge_binlogs(config_file_path, subcommand_value); + default: + return std::nullopt; + } +} + // since c++20 it is no longer needed to initialize std::atomic_flag with // ATOMIC_FLAG_INIT as this flag is modified from a signal handler it is marked // as volatile to make sure optimizer do optimizations which will be unsafe for @@ -1090,31 +1205,22 @@ int main(int argc, char *argv[]) { if (!cmd_args_checked) { std::cerr << "usage: " << executable_name << " (fetch|pull)) \n" + << " " << executable_name << " list \n" << " " << executable_name << " search_by_timestamp \n" << " " << executable_name << " search_by_gtid_set \n" + << " " << executable_name + << " purge_binlogs \n" << " " << 
executable_name << " version\n"; return EXIT_FAILURE; } - // handling the 'version' command - if (operation_mode == binsrv::operation_mode_type::version) { - return handle_version() ? EXIT_SUCCESS : EXIT_FAILURE; - } - - // handling the 'search_by_timestamp' command - if (operation_mode == binsrv::operation_mode_type::search_by_timestamp) { - return handle_search_by_timestamp(config_file_path, subcommand_value) - ? EXIT_SUCCESS - : EXIT_FAILURE; - } - - // handling the 'search_by_gtid_set' command - if (operation_mode == binsrv::operation_mode_type::search_by_gtid_set) { - return handle_search_by_gtid_set(config_file_path, subcommand_value) - ? EXIT_SUCCESS - : EXIT_FAILURE; + // handling the non-streaming subcommands (everything but 'fetch' / 'pull') + if (const auto stateless_result{dispatch_stateless_command( + operation_mode, config_file_path, subcommand_value)}; + stateless_result.has_value()) { + return *stateless_result ? EXIT_SUCCESS : EXIT_FAILURE; } int exit_code = EXIT_FAILURE; diff --git a/src/binsrv/basic_storage_backend.cpp b/src/binsrv/basic_storage_backend.cpp index 28e80a3..df953b3 100644 --- a/src/binsrv/basic_storage_backend.cpp +++ b/src/binsrv/basic_storage_backend.cpp @@ -16,6 +16,8 @@ #include "binsrv/basic_storage_backend.hpp" #include +#include +#include #include #include #include @@ -40,6 +42,33 @@ void basic_storage_backend::put_object(std::string_view name, do_put_object(name, content); } +void basic_storage_backend::remove_object(std::string_view name) { + do_remove_object(name); + do_fsync(); +} + +void basic_storage_backend::remove_objects(std::span names) { + // Best-effort batch with delayed failure reporting: try every + // 'do_remove_object', remember the first failure (if any), always + // run the single 'do_fsync' barrier so the partial cleanup we did + // achieve is durable, and finally re-raise the captured failure + // so the caller can decide what to do. 
+ std::exception_ptr first_error; + for (const auto &name : names) { + try { + do_remove_object(name); + } catch (...) { + if (!first_error) { + first_error = std::current_exception(); + } + } + } + do_fsync(); + if (first_error) { + std::rethrow_exception(first_error); + } +} + [[nodiscard]] std::uint64_t basic_storage_backend::open_stream(std::string_view name, storage_backend_open_stream_mode mode) { diff --git a/src/binsrv/basic_storage_backend.hpp b/src/binsrv/basic_storage_backend.hpp index 04d4f2f..653a3fa 100644 --- a/src/binsrv/basic_storage_backend.hpp +++ b/src/binsrv/basic_storage_backend.hpp @@ -18,6 +18,7 @@ #include "binsrv/basic_storage_backend_fwd.hpp" // IWYU pragma: export +#include #include #include @@ -37,7 +38,22 @@ class basic_storage_backend { [[nodiscard]] storage_object_name_container list_objects(); [[nodiscard]] std::string get_object(std::string_view name); + // 'put_object' is an atomic overwrite: a concurrent / post-crash + // reader either sees the previous bytes in full or the new bytes in + // full, never a partial mix. void put_object(std::string_view name, util::const_byte_span content); + // Single-object remove followed by a durability barrier. On + // return the unlink is durable against a power-loss / hard crash. + void remove_object(std::string_view name); + // Batch remove. Each name is dropped on a best-effort basis - + // failures do not abort the loop - and a single durability + // barrier runs once for the whole batch. If at least one + // per-name removal failed, the very first failure is re-raised + // out of this call after the barrier has run, so the caller + // learns that the batch was not 100% clean while still benefiting + // from the partial cleanup and the durability of whatever did + // succeed. 
+ void remove_objects(std::span names); [[nodiscard]] bool is_stream_open() const noexcept { return stream_open_; } [[nodiscard]] std::uint64_t @@ -55,6 +71,9 @@ class basic_storage_backend { [[nodiscard]] virtual std::string do_get_object(std::string_view name) = 0; virtual void do_put_object(std::string_view name, util::const_byte_span content) = 0; + virtual void do_remove_object(std::string_view name) = 0; + // Backend-specific durability barrier. + virtual void do_fsync() = 0; [[nodiscard]] virtual std::uint64_t do_open_stream(std::string_view name, diff --git a/src/binsrv/filesystem_storage_backend.cpp b/src/binsrv/filesystem_storage_backend.cpp index 631ea7f..cba4ace 100644 --- a/src/binsrv/filesystem_storage_backend.cpp +++ b/src/binsrv/filesystem_storage_backend.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include @@ -34,9 +35,19 @@ #include "util/byte_span.hpp" #include "util/exception_location_helpers.hpp" +#include "util/native_file_operations_helpers.hpp" namespace binsrv { +namespace { +// suffix appended to the object name when writing the temporary file +// used by the atomic-overwrite implementation of 'do_put_object'; a +// deterministic name keeps cleanup-on-startup trivial - any stale +// '<name>.tmp' left by a crashed put is simply overwritten by the +// next legitimate put for the same name +constexpr std::string_view tmp_object_suffix{".tmp"}; +} // namespace + filesystem_storage_backend::filesystem_storage_backend( const storage_config &config) : root_path_{}, ofs_{} { @@ -138,22 +149,78 @@ filesystem_storage_backend::do_get_object(std::string_view name) { void filesystem_storage_backend::do_put_object(std::string_view name, util::const_byte_span content) { + // atomic-overwrite is implemented via the standard POSIX + // write-temp-then-rename idiom: a crash mid-write leaves only the + // (deterministically-named) '<name>.tmp', never a truncated '<name>'; + // a subsequent legitimate put for the same name simply truncates and + // 
overwrites the stale tmp before the rename. + // The two fsync(2) calls below are both required for full crash + // durability: the first flushes the tmp file's data + metadata to + // stable storage *before* the rename commits, so a post-crash + // observer never sees the final name pointing at a partially-written + // inode; the second flushes the parent directory entry so the + // rename itself survives a hard crash. const auto object_path = get_object_path(name); + auto tmp_object_path = object_path; + tmp_object_path += tmp_object_suffix; + // opening in binary mode with truncating std::ofstream object_ofs{}; object_ofs.rdbuf()->pubsetbuf(nullptr, 0U); - object_ofs.open(object_path, std::ios_base::out | std::ios_base::binary | - std::ios_base::trunc); + object_ofs.open(tmp_object_path, std::ios_base::out | std::ios_base::binary | + std::ios_base::trunc); if (!object_ofs.is_open()) { util::exception_location().raise( - "cannot open underlying object file for writing"); + "cannot open underlying tmp object file for writing"); } const auto content_sv = util::as_string_view(content); if (!object_ofs.write(std::data(content_sv), static_cast(std::size(content_sv)))) { util::exception_location().raise( - "cannot write date to underlying object file"); + "cannot write data to underlying tmp object file"); + } + // explicit close so a failed flush surfaces before the rename + object_ofs.close(); + if (object_ofs.fail()) { + util::exception_location().raise( + "cannot close underlying tmp object file"); } + // make the tmp file's content durable before the rename swaps it + util::fsync(tmp_object_path); + + std::error_code rename_ec; + // std::filesystem::rename() with std::error_code overload is noexcept; + // on POSIX it maps to rename(2), which is atomic within a single + // directory - readers see either the previous '<name>' content or the + // new content, never a partial state + std::filesystem::rename(tmp_object_path, object_path, rename_ec); + if (rename_ec) { + 
util::exception_location().raise( + "cannot rename underlying tmp object file: " + rename_ec.message()); + } + util::fsync(object_path.parent_path()); +} + +void filesystem_storage_backend::do_remove_object(std::string_view name) { + const auto object_path = get_object_path(name); + std::error_code remove_ec; + // intentionally using non-throwing overload to handle the "file does not + // exist" case explicitly below + if (!std::filesystem::remove(object_path, remove_ec)) { + if (remove_ec) { + util::exception_location().raise( + "cannot remove underlying object file: " + remove_ec.message()); + } + util::exception_location().raise( + "cannot remove underlying object file: file does not exist"); + } + // durability barrier is the responsibility of 'do_fsync()', + // invoked by the caller +} + +void filesystem_storage_backend::do_fsync() { + // all objects in this backend live directly under 'root_path_' + util::fsync(root_path_); } [[nodiscard]] std::uint64_t filesystem_storage_backend::do_open_stream( diff --git a/src/binsrv/filesystem_storage_backend.hpp b/src/binsrv/filesystem_storage_backend.hpp index dd2dbaf..a305eb2 100644 --- a/src/binsrv/filesystem_storage_backend.hpp +++ b/src/binsrv/filesystem_storage_backend.hpp @@ -47,6 +47,8 @@ class [[nodiscard]] filesystem_storage_backend final [[nodiscard]] std::string do_get_object(std::string_view name) override; void do_put_object(std::string_view name, util::const_byte_span content) override; + void do_remove_object(std::string_view name) override; + void do_fsync() override; [[nodiscard]] std::uint64_t do_open_stream(std::string_view name, diff --git a/src/binsrv/models/response_status_type.hpp b/src/binsrv/models/response_status_type.hpp index e42e69d..66c995f 100644 --- a/src/binsrv/models/response_status_type.hpp +++ b/src/binsrv/models/response_status_type.hpp @@ -34,6 +34,7 @@ namespace binsrv::models { // clang-format off #define BINSRV_MODELS_RESPONSE_STATUS_TYPE_X_SEQUENCE() \ 
BINSRV_MODELS_RESPONSE_STATUS_TYPE_X_MACRO(error ), \ + BINSRV_MODELS_RESPONSE_STATUS_TYPE_X_MACRO(warning), \ BINSRV_MODELS_RESPONSE_STATUS_TYPE_X_MACRO(success) // clang-format on diff --git a/src/binsrv/models/search_response.cpp b/src/binsrv/models/search_response.cpp index 0a6b02d..9b5304e 100644 --- a/src/binsrv/models/search_response.cpp +++ b/src/binsrv/models/search_response.cpp @@ -36,8 +36,16 @@ namespace binsrv::models { search_response::search_response() : impl_{{expected_search_response_version}, {response_status_type::success}, + {}, {}} {} +search_response::search_response(response_status_type status, + std::string_view message) + : impl_{{expected_search_response_version}, + {status}, + {}, + {std::string{message}}} {} + search_response::search_response(const search_response &) = default; search_response::search_response(search_response &&) noexcept = default; search_response &search_response::operator=(const search_response &) = default; diff --git a/src/binsrv/models/search_response.hpp b/src/binsrv/models/search_response.hpp index 68ecd86..729dcb1 100644 --- a/src/binsrv/models/search_response.hpp +++ b/src/binsrv/models/search_response.hpp @@ -29,6 +29,7 @@ #include "binsrv/models/binlog_file_record_fwd.hpp" #include "binsrv/models/response_status_type_fwd.hpp" +#include "util/common_optional_types.hpp" #include "util/nv_tuple.hpp" namespace binsrv::models { @@ -40,13 +41,17 @@ class [[nodiscard]] search_response { using impl_type = util::nv_tuple< // clang-format off util::nv<"version", std::uint32_t>, - util::nv<"status", response_status_type>, - util::nv<"result", binlog_file_record_container> + util::nv<"status" , response_status_type>, + util::nv<"result" , binlog_file_record_container>, + util::nv<"message", util::optional_string> // clang-format on >; public: explicit search_response(); + // Constructs a response with the given 'status' and a non-empty + // diagnostic 'message' attached to it. 
+ search_response(response_status_type status, std::string_view message); search_response(const search_response &); search_response(search_response &&) noexcept; search_response &operator=(const search_response &); diff --git a/src/binsrv/operation_mode_type.hpp b/src/binsrv/operation_mode_type.hpp index 700f9f5..aa1827e 100644 --- a/src/binsrv/operation_mode_type.hpp +++ b/src/binsrv/operation_mode_type.hpp @@ -34,8 +34,10 @@ namespace binsrv { #define BINSRV_OPERATION_MODE_TYPE_X_SEQUENCE() \ BINSRV_OPERATION_MODE_TYPE_X_MACRO(fetch ), \ BINSRV_OPERATION_MODE_TYPE_X_MACRO(pull ), \ + BINSRV_OPERATION_MODE_TYPE_X_MACRO(list ), \ BINSRV_OPERATION_MODE_TYPE_X_MACRO(search_by_timestamp), \ BINSRV_OPERATION_MODE_TYPE_X_MACRO(search_by_gtid_set ), \ + BINSRV_OPERATION_MODE_TYPE_X_MACRO(purge_binlogs ), \ BINSRV_OPERATION_MODE_TYPE_X_MACRO(version ) // clang-format on diff --git a/src/binsrv/s3_storage_backend.cpp b/src/binsrv/s3_storage_backend.cpp index 5a3ebe8..87a6977 100644 --- a/src/binsrv/s3_storage_backend.cpp +++ b/src/binsrv/s3_storage_backend.cpp @@ -679,6 +679,18 @@ void s3_storage_backend::do_put_object(std::string_view name, {.bucket = bucket_, .object_path = get_object_path(name)}, content); } +void s3_storage_backend::do_remove_object( + [[maybe_unused]] std::string_view name) { + util::exception_location().raise( + "remove_object is not supported on the S3 storage backend"); +} + +void s3_storage_backend::do_fsync() { + // intentional no-op on S3: every PutObject / DeleteObject response + // is itself the durability point (S3 provides strong + // read-after-write consistency) +} + [[nodiscard]] std::uint64_t s3_storage_backend::do_open_stream(std::string_view name, storage_backend_open_stream_mode mode) { diff --git a/src/binsrv/s3_storage_backend.hpp b/src/binsrv/s3_storage_backend.hpp index e4bf039..dad1aab 100644 --- a/src/binsrv/s3_storage_backend.hpp +++ b/src/binsrv/s3_storage_backend.hpp @@ -72,6 +72,8 @@ class [[nodiscard]] s3_storage_backend 
final : public basic_storage_backend { [[nodiscard]] std::string do_get_object(std::string_view name) override; void do_put_object(std::string_view name, util::const_byte_span content) override; + void do_remove_object(std::string_view name) override; + void do_fsync() override; [[nodiscard]] std::uint64_t do_open_stream(std::string_view name, diff --git a/src/binsrv/storage.cpp b/src/binsrv/storage.cpp index 90fd655..6f03a37 100644 --- a/src/binsrv/storage.cpp +++ b/src/binsrv/storage.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -26,11 +27,13 @@ #include #include #include +#include #include "binsrv/basic_storage_backend.hpp" #include "binsrv/binlog_file_metadata.hpp" #include "binsrv/replication_mode_type.hpp" #include "binsrv/storage_backend_factory.hpp" +#include "binsrv/storage_backend_type.hpp" #include "binsrv/storage_config.hpp" #include "binsrv/storage_metadata.hpp" @@ -51,6 +54,13 @@ storage::storage(const storage_config &config, replication_mode_type replication_mode) : construction_mode_{construction_mode}, backend_{}, replication_mode_{replication_mode} { + if (construction_mode_ == storage_construction_mode_type::purging && + config.get<"backend">() != storage_backend_type::file) { + util::exception_location().raise( + "purge_binlogs is only supported on the local filesystem storage " + "backend"); + } + const auto &checkpoint_size_opt{config.get<"checkpoint_size">()}; if (checkpoint_size_opt.has_value()) { checkpoint_size_bytes_ = checkpoint_size_opt->get_value(); @@ -313,6 +323,101 @@ void storage::flush_event_buffer() { } } +[[nodiscard]] std::pair +storage::purge_binlogs(const events::composite_binlog_name &target) { + ensure_purging_mode(); + + if (is_empty()) { + util::exception_location().raise( + "cannot purge: binlog storage is empty"); + } + const auto &front_base_name{binlog_records_.front().name.get_base_name()}; + if (target.get_base_name() != front_base_name) { + 
util::exception_location().raise( + "cannot purge: target binlog name has a different base name than " + "the binlog records in the storage"); + } + const auto target_it{std::ranges::find(std::as_const(binlog_records_), target, + &binlog_record::name)}; + if (target_it == std::cend(binlog_records_)) { + util::exception_location().raise( + "cannot purge: target binlog name is not present in the storage"); + } + // refuse to purge the current tail: emptying the storage would lose + // the resume position (current binlog name / position in position + // mode, executed GTID set in GTID mode) and force the next 'fetch' / + // 'pull' to re-stream from the very beginning of the source's + // retained binlog history. + if (target_it == std::prev(std::cend(binlog_records_))) { + util::exception_location().raise( + "cannot purge: target is the current tail binlog file; at least " + "one binlog file must remain in the storage to preserve the " + "resume position"); + } + + // step 1: extract the prefix [begin, target_it + 1) - this + // becomes the set of records we are going to drop on disk; the + // returned vector preserves the original order so the caller can + // use it directly to produce a response + const auto victim_count{static_cast( + std::distance(std::cbegin(binlog_records_), target_it) + 1)}; + binlog_record_container removed_records; + removed_records.reserve(victim_count); + std::move(std::begin(binlog_records_), + std::begin(binlog_records_) + + static_cast(victim_count), + std::back_inserter(removed_records)); + binlog_records_.erase(std::begin(binlog_records_), + std::begin(binlog_records_) + + static_cast(victim_count)); + + // step 2: rewrite the binlog index from the surviving records left + // in 'binlog_records_' after step 1 (always non-empty thanks to the + // tail-refusal guard above). 
'save_binlog_index' goes through the + // backend's atomic-overwrite 'put_object', so from this point on + // the purge is considered committed - any subsequent failure + // leaves the storage in an inconsistent state (leftover payload / + // metadata files no longer referenced by the index) that the + // constructor's existing validators will refuse to open on next + // startup. + save_binlog_index(); + + // step 3: best-effort removal of the victim payload + metadata + // objects; any failure here is intentionally swallowed - the index + // has already been committed and reporting a "file could not be + // removed" error to the caller would falsely suggest that the + // purge itself failed; the resulting leftovers will trip the + // constructor's validators on next startup. + // We materialise the (metadata + payload) names for every victim + // into a single batch and hand it to 'basic_storage_backend:: + // remove_objects', which runs the backend's durability barrier + // exactly once at the end of the batch - so the whole batch + // amortises to a single fsync(2) on the local filesystem backend + // (and a no-op on S3) instead of O(N) syncs. + std::vector victim_object_names; + victim_object_names.reserve(std::size(removed_records) * 2U); + for (const auto &victim : removed_records) { + victim_object_names.emplace_back( + generate_binlog_metadata_name(victim.name)); + victim_object_names.emplace_back(victim.name.str()); + } + std::string cleanup_warning_message; + try { + backend_->remove_objects(victim_object_names); + } catch (const std::exception &e) { + // 'remove_objects' re-raises the first per-name failure (if any) + // after running the durability barrier; we do not propagate it + // to the caller because the index has already been committed + // and any leftover payload/metadata files will be picked up by + // the constructor's validators on the next startup. 
We just + // capture the underlying message so the caller can surface it + // under a 'warning' status in the JSON response. + cleanup_warning_message = e.what(); + } + + return {std::move(removed_records), std::move(cleanup_warning_message)}; +} + [[nodiscard]] std::string storage::get_binlog_uri( const events::composite_binlog_name &binlog_name) const { return backend_->get_object_uri(binlog_name.str()); @@ -325,6 +430,13 @@ void storage::ensure_streaming_mode() const { } } +void storage::ensure_purging_mode() const { + if (construction_mode_ != storage_construction_mode_type::purging) { + util::exception_location().raise( + "operation requires storage to be constructed in purging mode"); + } +} + void storage::update_last_checkpoint_info() { if (size_checkpointing_enabled()) { last_checkpoint_position_ = get_current_position(); diff --git a/src/binsrv/storage.hpp b/src/binsrv/storage.hpp index ecdbbcc..5ab5fea 100644 --- a/src/binsrv/storage.hpp +++ b/src/binsrv/storage.hpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include "binsrv/basic_storage_backend_fwd.hpp" @@ -133,6 +134,23 @@ class [[nodiscard]] storage { void discard_incomplete_transaction_events(); void flush_event_buffer(); + // Removes the contiguous prefix of binlog records [front, target] + // (inclusive) from the storage and returns a pair: + // .first - the dropped records in chronological order (oldest + // first), suitable for direct iteration by the caller + // to build a response; + // .second - empty on full success; non-empty when the best-effort + // step-3 cleanup (removal of victim payload + metadata + // objects) failed for at least one object after the + // step-2 index rewrite had already committed. 
The purge + // itself is considered successful in this case, but the + // storage on disk now contains orphan files that the + // constructor's validators will refuse to open on next + // startup The string carries the underlying cleanup + // error message so the caller. + [[nodiscard]] std::pair + purge_binlogs(const events::composite_binlog_name &target); + [[nodiscard]] std::string get_binlog_uri(const events::composite_binlog_name &binlog_name) const; @@ -159,6 +177,7 @@ class [[nodiscard]] storage { util::ctime_timestamp_range incomplete_transaction_timestamps_{}; void ensure_streaming_mode() const; + void ensure_purging_mode() const; [[nodiscard]] const binlog_record & get_current_binlog_record() const noexcept { diff --git a/src/binsrv/storage_fwd.hpp b/src/binsrv/storage_fwd.hpp index f1a243f..91a0310 100644 --- a/src/binsrv/storage_fwd.hpp +++ b/src/binsrv/storage_fwd.hpp @@ -22,7 +22,8 @@ namespace binsrv { enum class storage_construction_mode_type : std::uint8_t { querying_only, - streaming + streaming, + purging }; enum class open_binlog_status : std::uint8_t { diff --git a/src/util/native_file_operations_helpers.hpp b/src/util/native_file_operations_helpers.hpp new file mode 100644 index 0000000..289f2d5 --- /dev/null +++ b/src/util/native_file_operations_helpers.hpp @@ -0,0 +1,33 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. 
+// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#ifndef UTIL_NATIVE_FILE_OPERATIONS_HELPERS_HPP +#define UTIL_NATIVE_FILE_OPERATIONS_HELPERS_HPP + +#include + +namespace util { + +// Forces a previously-completed change to the contents of the +// regular file or directory at 'path' to stable storage so it +// survives a power-loss / hard crash on return. +// +// Raises 'std::runtime_error' if the path cannot be opened or +// fsync'd; a failure to close the descriptor afterwards is ignored. +void fsync(const std::filesystem::path &path); + +} // namespace util + +#endif // UTIL_NATIVE_FILE_OPERATIONS_HELPERS_HPP diff --git a/src/util/native_file_operations_helpers_posix.cpp b/src/util/native_file_operations_helpers_posix.cpp new file mode 100644 index 0000000..1b7d65d --- /dev/null +++ b/src/util/native_file_operations_helpers_posix.cpp @@ -0,0 +1,56 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. 
+// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#include "util/native_file_operations_helpers.hpp" + +#include +#include +#include +#include +#include + +#include +#include + +#include + +#include "util/exception_location_helpers.hpp" + +namespace util { + +void fsync(const std::filesystem::path &path) { + // O_RDONLY is sufficient for 'fsync(2)' on both a regular file + // and a directory's entry list; the kernel does + // not require write access to flush. + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-vararg,hicpp-vararg) + const int file_descriptor = ::open(path.c_str(), O_RDONLY); + if (file_descriptor < 0) { + const auto saved_errno = errno; + exception_location().raise( + "cannot open path for fsync: " + + std::error_code{saved_errno, std::generic_category()}.message()); + } + + const boost::scope::scope_exit close_guard{ + [&file_descriptor]() noexcept { ::close(file_descriptor); }}; + if (::fsync(file_descriptor) != 0) { + const auto saved_errno = errno; + exception_location().raise( + "cannot fsync path: " + + std::error_code{saved_errno, std::generic_category()}.message()); + } +} + +} // namespace util