diff --git a/include/my_pthread.h b/include/my_pthread.h index 6fe76c40cba54..0d64090701a05 100644 --- a/include/my_pthread.h +++ b/include/my_pthread.h @@ -624,6 +624,20 @@ typedef uint64 my_thread_id; */ #define MY_THREAD_ID_MAX UINT32_MAX +#ifdef _WIN32 +#define MAX_THREAD_NAME 256 +#elif defined(__linux__) +#define MAX_THREAD_NAME 16 +#elif defined(__FreeBSD__) || defined(__OpenBSD__) +#define MAX_THREAD_NAME 19 +#include +#elif defined(__apple_build_version__) +#include +#define MAX_THREAD_NAME MAXTHREADNAMESIZE +#else +#define MAX_THREAD_NAME 16 +#endif + extern void my_threadattr_global_init(void); extern my_bool my_thread_global_init(void); extern void my_thread_set_name(const char *); diff --git a/libmysqld/CMakeLists.txt b/libmysqld/CMakeLists.txt index f59354625b345..14f7a12381db9 100644 --- a/libmysqld/CMakeLists.txt +++ b/libmysqld/CMakeLists.txt @@ -72,6 +72,7 @@ SET(SQL_EMBEDDED_SOURCES emb_qcache.cc libmysqld.c lib_sql.cc ../sql/mf_iocache.cc ../sql/my_decimal.cc ../sql/net_serv.cc ../sql/opt_range.cc ../sql/opt_group_by_cardinality.cc + ../sql/sql_parallel_workers.cc ../sql/opt_rewrite_date_cmp.cc ../sql/opt_rewrite_remove_casefold.cc ../sql/opt_sargable_left.cc diff --git a/mysql-test/main/mysqld--help.result b/mysql-test/main/mysqld--help.result index 17458f8cf56ba..c5314ead01525 100644 --- a/mysql-test/main/mysqld--help.result +++ b/mysql-test/main/mysqld--help.result @@ -1012,6 +1012,9 @@ The following specify which files/extra groups are read (specified before remain Cost of checking the row against the WHERE clause. Increasing this will have the optimizer to prefer plans with less row combinations + --parallel-worker-threads=# + Number of worker threads available for parallel query + execution. 0 means parallel execution is disabled --path=name Comma-separated list of schema names that defines the search order for stored routines --performance-schema @@ -2004,6 +2007,7 @@ optimizer-trace optimizer-trace-max-mem-size 1048576 optimizer-use-condition-selectivity 4 optimizer-where-cost 0.032 +parallel-worker-threads 0 path CURRENT_SCHEMA performance-schema FALSE performance-schema-accounts-size -1 diff --git a/mysql-test/main/parallel_query.result b/mysql-test/main/parallel_query.result new file mode 100644 index 0000000000000..43d557410be02 --- /dev/null +++ b/mysql-test/main/parallel_query.result @@ -0,0 +1,61 @@ +SET DEBUG_SYNC='RESET'; +# +# MDEV-39492 Parallel Query: Study how to create worker threads +# +set session parallel_worker_threads=3; +# we should currently see 3 warnings; +select seq from seq_1_to_2; +seq +1 +2 +Warnings: +Warning 1105 This is an example warning to show we can push a warning from a worker thread to its manager +Warning 1105 This is an example warning to show we can push a warning from a worker thread to its manager +Warning 1105 This is an example warning to show we can push a warning from a worker thread to its manager +set session parallel_worker_threads=10; +# the 10th workers throws out an error +select seq from seq_1_to_2; +ERROR HY000: Argument to the worker_busted_function() function does not belong to the range [0,1] +set session parallel_worker_threads=0; +connect killee, localhost, root, , ; +# check that kill query on a parallel worker is passed to the manager +connection killee; +set session parallel_worker_threads=3; +SET @save_dbug= @@global.debug_dbug; +SET GLOBAL debug_dbug = "+d,pwt_worker_pause_before_signal"; +select seq from seq_1_to_2;; +# now use the default connection to view/kill the thread group executing +# the parallel work +connection default; +SET DEBUG_SYNC='now WAIT_FOR pwt_worker_paused'; +# wait for all 3 workers to hit our debug sync point +# then have a look at what is in our process list without thread ID +select USER, HOST, DB, COMMAND, STATE, SUBSTRING(INFO, 1, 32) +from information_schema.processlist; +USER HOST DB COMMAND STATE SUBSTRING(INFO, 1, 32) +root localhost test Query debug sync point: now Parallel Worker 3 For Thread ID +root localhost test Query debug sync point: now Parallel Worker 2 For Thread ID +root localhost test Query debug sync point: now Parallel Worker 1 For Thread ID +root localhost test Query Reading data from parallel workers select seq from seq_1_to_2 +root localhost test Query Filling schema table select USER, HOST, DB, COMMAND, +kill query ID; +# signal our workers to continue execution +SET DEBUG_SYNC = "now SIGNAL pwt_worker_continue"; +# then wait for the manager thread to clean up and go back to sleep +# review error message on --reap +connection killee; +ERROR 70100: Query execution was interrupted +SET DEBUG_SYNC = "RESET"; +# save as above, but kill a worker with a simple kill and see the +# connection drop +select seq from seq_1_to_2;; +connection default; +SET DEBUG_SYNC='now WAIT_FOR pwt_worker_paused'; +kill ID; +SET DEBUG_SYNC = "now SIGNAL pwt_worker_continue"; +connection killee; +ERROR 70100: Query execution was interrupted +# killee connection now dead, confirmed below +connection default; +SET GLOBAL debug_dbug = @save_dbug; +SET DEBUG_SYNC = "RESET"; diff --git a/mysql-test/main/parallel_query.test b/mysql-test/main/parallel_query.test new file mode 100644 index 0000000000000..bf358d851049c --- /dev/null +++ b/mysql-test/main/parallel_query.test @@ -0,0 +1,104 @@ +# +# Test KILL and KILL QUERY statements. +# + +-- source include/count_sessions.inc +-- source include/not_embedded.inc +# this will be used in the future -- source include/have_innodb.inc +-- source include/have_sequence.inc +--source include/have_debug.inc +--source include/have_debug_sync.inc +SET DEBUG_SYNC='RESET'; + +--disable_service_connection + +--echo # +--echo # MDEV-39492 Parallel Query: Study how to create worker threads +--echo # + +set session parallel_worker_threads=3; +--echo # we should currently see 3 warnings; +select seq from seq_1_to_2; + +set session parallel_worker_threads=10; +--echo # the 10th workers throws out an error +--error ER_ARGUMENT_OUT_OF_RANGE +select seq from seq_1_to_2; + +set session parallel_worker_threads=0; +connect (killee, localhost, root, , ); + +--echo # check that kill query on a parallel worker is passed to the manager + +--connection killee +let $id= `select connection_id()`; +set session parallel_worker_threads=3; + +# worker THDs don't inherit session vars, we are using a debug_sync_set_action +# to pause execution in our worker threads just after they have done something + +SET @save_dbug= @@global.debug_dbug; +SET GLOBAL debug_dbug = "+d,pwt_worker_pause_before_signal"; + +--send select seq from seq_1_to_2; +--echo # now use the default connection to view/kill the thread group executing +--echo # the parallel work +--connection default +SET DEBUG_SYNC='now WAIT_FOR pwt_worker_paused'; +let $name= "Parallel Worker 1 For Thread ID $id"; +let $killID= `SELECT @kid:=ID from information_schema.processlist + where info like $name limit 1`; +--echo # wait for all 3 workers to hit our debug sync point +let $wait_condition= + select count(*) = 3 from information_schema.processlist + where state like 'debug sync%' and info like '%Thread ID $id'; +--source include/wait_condition.inc +--echo # then have a look at what is in our process list without thread ID +select USER, HOST, DB, COMMAND, STATE, SUBSTRING(INFO, 1, 32) + from information_schema.processlist; +--replace_result $killID ID +eval kill query $killID; +--echo # signal our workers to continue execution +SET DEBUG_SYNC = "now SIGNAL pwt_worker_continue"; +--echo # then wait for the manager thread to clean up and go back to sleep +let $wait_condition= + select count(*) = 1 from information_schema.processlist + where command = 'Sleep' and id = $id; +--source include/wait_condition.inc +#select * from information_schema.processlist; +--echo # review error message on --reap +--connection killee +--error ER_QUERY_INTERRUPTED +reap; + +SET DEBUG_SYNC = "RESET"; + +--echo # save as above, but kill a worker with a simple kill and see the +--echo # connection drop + +--send select seq from seq_1_to_2; +connection default; +SET DEBUG_SYNC='now WAIT_FOR pwt_worker_paused'; +let $name= "Parallel Worker 1 For Thread ID $id"; +let $killID= `SELECT @kid:=ID from information_schema.processlist + where info like $name limit 1`; +--replace_result $killID ID +eval kill $killID; +SET DEBUG_SYNC = "now SIGNAL pwt_worker_continue"; +let $wait_condition= + select count(*) = 0 from information_schema.processlist + where command = 'Sleep' and id = $id; +--source include/wait_condition.inc +connection killee; +--disable_warnings +--error ER_QUERY_INTERRUPTED +reap; + +--echo # killee connection now dead, confirmed below + +connection default; + +SET GLOBAL debug_dbug = @save_dbug; +SET DEBUG_SYNC = "RESET"; + +--source include/wait_until_count_sessions.inc diff --git a/mysql-test/main/parallel_query_oom.result b/mysql-test/main/parallel_query_oom.result new file mode 100644 index 0000000000000..c42bf2cf2abf7 --- /dev/null +++ b/mysql-test/main/parallel_query_oom.result @@ -0,0 +1,21 @@ +# +# MDEV-39492 Parallel Query: OOM in worker error_to_queue surfaces a +# single ER_OUTOFMEMORY warning so worker diagnostics aren't silently +# lost. +# +set @save_dbug=@@global.debug_dbug; +set global debug_dbug="+d,pwt_error_to_queue_oom"; +set session parallel_worker_threads=1; +# The prototype worker emits either a warning or my_error() depending on the +# parity of its thread id. Either path runs through error_to_queue; the +# DBUG injection forces both into the OOM branch, so neither the original +# warning nor the original error reaches the user. We expect just the +# manager-surfaced ER_OUTOFMEMORY warning. +select count(*) from seq_1_to_2; +count(*) +2 +show warnings; +Level Code Message +Warning 1037 Parallel worker diagnostics were dropped due to memory allocation failure +set global debug_dbug=@save_dbug; +set session parallel_worker_threads=default; diff --git a/mysql-test/main/parallel_query_oom.test b/mysql-test/main/parallel_query_oom.test new file mode 100644 index 0000000000000..6a8afd3266182 --- /dev/null +++ b/mysql-test/main/parallel_query_oom.test @@ -0,0 +1,33 @@ +# +# Test OOM handling in the parallel worker error/warning queue. +# + +-- source include/count_sessions.inc +-- source include/not_embedded.inc +-- source include/have_sequence.inc +-- source include/have_debug.inc + +--disable_service_connection + +--echo # +--echo # MDEV-39492 Parallel Query: OOM in worker error_to_queue surfaces a +--echo # single ER_OUTOFMEMORY warning so worker diagnostics aren't silently +--echo # lost. +--echo # + +set @save_dbug=@@global.debug_dbug; +set global debug_dbug="+d,pwt_error_to_queue_oom"; +set session parallel_worker_threads=1; + +--echo # The prototype worker emits either a warning or my_error() depending on the +--echo # parity of its thread id. Either path runs through error_to_queue; the +--echo # DBUG injection forces both into the OOM branch, so neither the original +--echo # warning nor the original error reaches the user. We expect just the +--echo # manager-surfaced ER_OUTOFMEMORY warning. +select count(*) from seq_1_to_2; +show warnings; + +set global debug_dbug=@save_dbug; +set session parallel_worker_threads=default; + +--source include/wait_until_count_sessions.inc diff --git a/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result b/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result index 268c06632b8b4..573cb113d9efe 100644 --- a/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result +++ b/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result @@ -2992,6 +2992,16 @@ NUMERIC_BLOCK_SIZE NULL ENUM_VALUE_LIST NULL READ_ONLY NO COMMAND_LINE_ARGUMENT REQUIRED +VARIABLE_NAME PARALLEL_WORKER_THREADS +VARIABLE_SCOPE SESSION +VARIABLE_TYPE BIGINT UNSIGNED +VARIABLE_COMMENT Number of worker threads available for parallel query execution. 0 means parallel execution is disabled +NUMERIC_MIN_VALUE 0 +NUMERIC_MAX_VALUE 100 +NUMERIC_BLOCK_SIZE 1 +ENUM_VALUE_LIST NULL +READ_ONLY NO +COMMAND_LINE_ARGUMENT REQUIRED VARIABLE_NAME PATH VARIABLE_SCOPE SESSION VARIABLE_TYPE VARCHAR diff --git a/mysys/my_thread_name.cc b/mysys/my_thread_name.cc index 9f32dae269ab0..90418c5cd8825 100644 --- a/mysys/my_thread_name.cc +++ b/mysys/my_thread_name.cc @@ -20,12 +20,7 @@ #include #ifdef _WIN32 -#define MAX_THREAD_NAME 256 typedef HRESULT (*func_SetThreadDescription)(HANDLE,PCWSTR); -#elif defined(__linux__) -#define MAX_THREAD_NAME 16 -#elif defined(__FreeBSD__) || defined(__OpenBSD__) -#include #endif #if defined(HAVE_PSI_THREAD_INTERFACE) && !defined DBUG_OFF diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt index e54e894e1d0fc..5ede64143d08a 100644 --- a/sql/CMakeLists.txt +++ b/sql/CMakeLists.txt @@ -115,6 +115,7 @@ SET (SQL_SOURCE ../sql-common/client_plugin.c opt_range.cc vector_mhnsw.cc opt_group_by_cardinality.cc + sql_parallel_workers.cc opt_rewrite_date_cmp.cc opt_rewrite_remove_casefold.cc opt_sargable_left.cc diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 1aa5e0cfe7ddf..67293cd2553e4 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -9911,6 +9911,7 @@ PSI_stage_info stage_starting= { 0, "starting", 0}; PSI_stage_info stage_waiting_for_flush= { 0, "Waiting for non trans tables to be flushed", 0}; PSI_stage_info stage_waiting_for_ddl= { 0, "Waiting for DDLs", 0}; PSI_stage_info stage_waiting_for_reset_master= { 0, "Waiting for a running RESET MASTER to complete", 0}; +PSI_stage_info stage_reading_data_from_parallel_worker= { 0, "Reading data from parallel workers", 0}; #ifdef WITH_WSREP // Additional Galera thread states @@ -10003,6 +10004,10 @@ PSI_memory_key key_memory_user_var_entry_value; PSI_memory_key key_memory_String_value; PSI_memory_key key_memory_WSREP; PSI_memory_key key_memory_trace_ddl_info; +PSI_memory_key key_memory_pwt_queued_event; +PSI_memory_key key_memory_pwt_error_message; +PSI_memory_key key_memory_pwt_workers; +PSI_memory_key key_memory_pwt_db; #ifdef HAVE_PSI_INTERFACE @@ -10144,7 +10149,8 @@ PSI_stage_info *all_server_stages[]= & stage_reading_semi_sync_ack, & stage_waiting_for_deadlock_kill, & stage_starting, - & stage_waiting_for_reset_master + & stage_waiting_for_reset_master, + & stage_reading_data_from_parallel_worker #ifdef WITH_WSREP , & stage_waiting_isolation, @@ -10256,6 +10262,9 @@ static PSI_memory_info all_server_memory[]= { &key_memory_trace_ddl_info, "TRACE_DDL_INFO", 0} }; + +extern void pwt_init_psi_keys(void); + /** Initialise all the performance schema instrumentation points used by the server. @@ -10342,6 +10351,7 @@ void init_server_psi_keys(void) stmt_info_rpl.m_flags= PSI_FLAG_MUTABLE; mysql_statement_register(category, &stmt_info_rpl, 1); #endif + pwt_init_psi_keys(); } #endif /* HAVE_PSI_INTERFACE */ diff --git a/sql/mysqld.h b/sql/mysqld.h index 9ee5bc33740e2..1264b0d4904a2 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -499,6 +499,10 @@ extern PSI_memory_key key_memory_Table_trigger_dispatcher; extern PSI_memory_key key_memory_native_functions; extern PSI_memory_key key_memory_WSREP; extern PSI_memory_key key_memory_trace_ddl_info; +extern PSI_memory_key key_memory_pwt_queued_event; +extern PSI_memory_key key_memory_pwt_error_message; +extern PSI_memory_key key_memory_pwt_workers; +extern PSI_memory_key key_memory_pwt_db; /* MAINTAINER: Please keep this list in order, to limit merge collisions. @@ -646,6 +650,7 @@ extern PSI_stage_info stage_slave_background_wait_request; extern PSI_stage_info stage_waiting_for_deadlock_kill; extern PSI_stage_info stage_starting; extern PSI_stage_info stage_waiting_for_reset_master; +extern PSI_stage_info stage_reading_data_from_parallel_worker; #ifdef WITH_WSREP // Additional Galera thread states extern PSI_stage_info stage_waiting_isolation; diff --git a/sql/privilege.h b/sql/privilege.h index b3791a8e3a2f5..80d9d3b80f254 100644 --- a/sql/privilege.h +++ b/sql/privilege.h @@ -452,6 +452,8 @@ constexpr privilege_t PRIV_SET_SYSTEM_GLOBAL_VAR_MAX_CONNECT_ERRORS= // Was SUPER_ACL prior to 10.5.2 constexpr privilege_t PRIV_SET_SYSTEM_GLOBAL_VAR_MAX_PASSWORD_ERRORS= CONNECTION_ADMIN_ACL; +constexpr privilege_t PRIV_SET_SYSTEM_GLOBAL_VAR_PARALLEL_WORKER_THREADS= + CONNECTION_ADMIN_ACL; // Was SUPER_ACL prior to 10.5.2 constexpr privilege_t PRIV_SET_SYSTEM_GLOBAL_VAR_PROXY_PROTOCOL_NETWORKS= CONNECTION_ADMIN_ACL; diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 5423eb393573d..fa6068a382d09 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -714,7 +714,7 @@ const char *thd_where(THD *thd) THD::THD(my_thread_id id, bool is_wsrep_applier) :Statement(&main_lex, &main_mem_root, STMT_CONVENTIONAL_EXECUTION, /* statement id */ 0), - rli_fake(0), rgi_fake(0), rgi_slave(NULL), + rli_fake(0), rgi_fake(0), rgi_slave(NULL), pwt_worker_info(NULL), protocol_text(this), protocol_binary(this), initial_status_var(0), m_current_stage_key(0), m_psi(0), start_time(0), start_time_sec_part(0), in_sub_stmt(0), log_all_errors(0), @@ -5462,8 +5462,7 @@ void destroy_thd(MYSQL_THD thd) /** Create a THD that only has auxiliary functions - It will never be added to the global connection list - server_threads. It does not represent any client connection. + It does not represent any client connection. It should never be counted, because it will stall the shutdown. It is solely for engine's internal use, diff --git a/sql/sql_class.h b/sql/sql_class.h index 9897df33766b9..d34ce97369622 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -872,6 +872,7 @@ typedef struct system_variables ulong server_id; ulong session_track_transaction_info; ulong threadpool_priority; + ulong parallel_worker_threads; ulong vers_alter_history; /* deadlock detection */ @@ -3154,7 +3155,7 @@ enum class THD_WHERE const char *thd_where(THD *thd); - +class pwt_worker; /** @class THD @@ -3199,6 +3200,8 @@ class THD: public THD_count, /* this must be first */ /* Slave applier execution context */ rpl_group_info* rgi_slave; + pwt_worker *pwt_worker_info; + union { rpl_io_thread_info *rpl_io_info; rpl_sql_thread_info *rpl_sql_info; diff --git a/sql/sql_parallel_workers.cc b/sql/sql_parallel_workers.cc new file mode 100644 index 0000000000000..26b6e68fa5d9d --- /dev/null +++ b/sql/sql_parallel_workers.cc @@ -0,0 +1,619 @@ +/* + Copyright (c) 2026, MariaDB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 + USA */ + +/** + @file + + Implementation of parallel worker threads (PWT) management and execution + logic. + + Contains + error_to_queue + push an error message onto our queue to send to the manager + PWT_error_handler + intercept error and warnings, queue them to the manager + parallel_worker_thread_func + Entry point for our worker threads + abort_worker + pwt_management::free_queue + helper for error conditions + pwt_management::init_parallel_workers + Initialise our parallel worker threads + pwt_init_psi_keys + initialize PSI keys + pwt_management::join_parallel_workers + process information from worker threads until they are finished +*/ + + +#include "sql_parallel_workers.h" +#include "debug_sync.h" + +#ifdef HAVE_PSI_INTERFACE +static PSI_thread_key key_thread_pwt; +static PSI_thread_info all_pwt_threads[]= +{ + { &key_thread_pwt, WORKER_NAME, PSI_FLAG_GLOBAL}, +}; + +static PSI_mutex_key key_mutex_pwt_LOCK_manager, + key_mutex_pwt_LOCK_thread, + key_mutex_pwt_LOCK_worker; +static PSI_mutex_info all_pwt_mutexes[]= +{ + { &key_mutex_pwt_LOCK_manager, "pwt_management::LOCK_pwt_manager", 0}, + { &key_mutex_pwt_LOCK_thread, "pwt_management::LOCK_pwt_thread", 0}, + { &key_mutex_pwt_LOCK_worker, "pwt_worker::LOCK_worker", 0}, +}; + +static PSI_cond_key key_COND_pwt_new_message, key_COND_pwt_worker; +static PSI_cond_info all_pwt_conds[]= +{ + { &key_COND_pwt_new_message, "pwt_management::COND_pwt_new_message", 0}, + { &key_COND_pwt_worker, "pwt_worker::COND_worker", 0}, +}; + +static PSI_memory_info all_pwt_memory[]= +{ + { &key_memory_pwt_queued_event, "pwt_queued_event", 0}, + { &key_memory_pwt_error_message, "pwt_error_message", 0}, + { &key_memory_pwt_workers, "pwt_management::workers", 0}, + { &key_memory_pwt_db, "pwt_worker::db", 0}, +}; +#endif /* HAVE_PSI_INTERFACE */ + + +/** + @brief + push an error message onto our queue to send to the manager + + @return + true an error occurred + false error or warning is queued +*/ + +bool error_to_queue(THD *thd, pwt_queued_event **event, uint error, + Sql_condition::enum_warning_level level, const char *msg) +{ + DBUG_EXECUTE_IF("pwt_error_to_queue_oom", + { *event= nullptr; return true; }); + *event= (pwt_queued_event*) my_malloc(key_memory_pwt_queued_event, + sizeof(pwt_queued_event), + MYF(0)); + if (!*event) + return true; + (*event)->error= (pwt_error_message*) my_malloc(key_memory_pwt_error_message, + sizeof(pwt_error_message), + MYF(0)); + if (!(*event)->error) + { + my_free(*event); + *event= nullptr; + return true; + } + (*event)->data= nullptr; + (*event)->error->level= level; + if (level == Sql_condition::enum_warning_level::WARN_LEVEL_ERROR) + (*event)->error->worker_errno= thd->killed_errno(); + else + (*event)->error->worker_errno= 0; + (*event)->error->code= error; + (*event)->error->message= (char *) my_malloc(key_memory_pwt_error_message, + strlen(msg)+1, + MYF(0)); + if (!(*event)->error->message) + { + my_free((*event)->error); + my_free(*event); + *event= nullptr; + return true; + } + strmake((*event)->error->message, msg, strlen(msg)); + return false; +} + + +class PWT_error_handler : public Internal_error_handler +{ +public: + bool handle_condition(THD *thd, + uint sql_errno, + const char* sql_state, + Sql_condition::enum_warning_level *level, + const char* msg, + Sql_condition ** cond_hdl) override + { + if (pwt_worker *worker= thd->pwt_worker_info) + { + pwt_queued_event *event; + if (error_to_queue(thd, &event, sql_errno, *level, msg)) + { + /* + Couldn't allocate the queued event. The worker THD's diagnostics + area is discarded when the worker exits, so flag the manager so it + can surface a single ER_OUTOFMEMORY warning to the user instead of + letting this condition vanish. + */ + mysql_mutex_lock(&worker->manager->LOCK_pwt_thread); + worker->manager->messages_dropped= true; + mysql_mutex_unlock(&worker->manager->LOCK_pwt_thread); + mysql_mutex_lock(&worker->manager->LOCK_pwt_manager); + mysql_cond_signal(&worker->manager->COND_pwt_new_message); + mysql_mutex_unlock(&worker->manager->LOCK_pwt_manager); + return true; + } + mysql_mutex_lock(&worker->manager->LOCK_pwt_thread); + worker->manager->parallel_messages.push_back(event); + mysql_mutex_unlock(&worker->manager->LOCK_pwt_thread); + } + return true; // no further processing in worker thread + } + +}; + + +/** + @brief + Entry point for our worker threads, arg supplied by manager details what + needs to be run +*/ + +static void *parallel_worker_thread_func(void *arg) +{ + pwt_worker *worker= (pwt_worker*) arg; + struct timespec abs_timeout; + PSI_stage_info old_stage; + + PWT_error_handler error_handler; + abs_timeout.tv_nsec= 0; + /* + Set current_thd and thread local storage (my_thread_var) for our new THD + to ensure they have their own local objects/errors/warnings etc + */ + void *save= thd_attach_thd(worker->thd); + my_thread_set_name(worker->thd->connection_name.str); + THD_STAGE_INFO(worker->thd, stage_sending_data); + worker->thd->push_internal_handler(&error_handler); + + /* + START: in lieu of work, wait 1 seconds, push out an error or a warning, + wait another 1 seconds then exit + */ + abs_timeout.tv_sec= time(0)+1; + mysql_mutex_lock(&worker->LOCK_worker); + worker->thd->ENTER_COND(&worker->COND_worker, &worker->LOCK_worker, + &stage_sending_data, &old_stage); + mysql_cond_timedwait(&worker->COND_worker, &worker->LOCK_worker, + &abs_timeout); + worker->thd->EXIT_COND(&old_stage); + +#ifdef ENABLED_DEBUG_SYNC + /* + we can't sync on the managers or our THD, spin the whole thing about + and use the global signal pool, NO_CLEAR_EVENT is needed because we have + multiple workers and the wrong one will likely consume the signal. + */ + DBUG_EXECUTE_IF("pwt_worker_pause_before_signal", + DBUG_ASSERT(!debug_sync_set_action(worker->thd, STRING_WITH_LEN( + "now SIGNAL pwt_worker_paused WAIT_FOR pwt_worker_continue NO_CLEAR_EVENT" + )));); +#endif + + mysql_mutex_lock(&worker->thd->LOCK_thd_kill); + if (worker->thd->killed) + { + mysql_mutex_unlock(&worker->thd->LOCK_thd_kill); + my_error(ER_QUERY_INTERRUPTED, MYF(0)); + goto worker_thread_exit; + } + mysql_mutex_unlock(&worker->thd->LOCK_thd_kill); + + if (worker->parallel_scan_job) + push_warning(current_thd, Sql_condition::WARN_LEVEL_WARN, ER_UNKNOWN_ERROR, + "This is an example warning to show we can push a " + "warning from a worker thread to its manager "); + else + my_error(ER_ARGUMENT_OUT_OF_RANGE, MYF(0), "worker_busted_function()"); + + // signal manager there is something in the queue, + mysql_mutex_lock(&worker->manager->LOCK_pwt_manager); + mysql_cond_signal(&worker->manager->COND_pwt_new_message); + mysql_mutex_unlock(&worker->manager->LOCK_pwt_manager); + + abs_timeout.tv_sec= time(0)+5; + mysql_mutex_lock(&worker->LOCK_worker); + worker->thd->ENTER_COND(&worker->COND_worker, &worker->LOCK_worker, + &stage_sending_data, &old_stage); + mysql_cond_timedwait(&worker->COND_worker, &worker->LOCK_worker, + &abs_timeout); + worker->thd->EXIT_COND(&old_stage); + + mysql_mutex_lock(&worker->thd->LOCK_thd_kill); + if (worker->thd->killed) + { + my_error(ER_QUERY_INTERRUPTED, MYF(0)); + } + mysql_mutex_unlock(&worker->thd->LOCK_thd_kill); + + // END: in lieu of work + +worker_thread_exit: + + // manager needs to see this as atomic + mysql_mutex_lock(&worker->LOCK_worker); + /* + LOCK_thd_kill is the canonical guard for thd->killed; a user-issued + KILL on this worker's thread_id goes through THD::awake() which holds + LOCK_thd_kill but not LOCK_worker, so we must nest both to get a + race-free snapshot for the manager. + Lock order matches join_parallel_workers(). + */ + mysql_mutex_lock(&worker->thd->LOCK_thd_kill); + worker->killed= worker->thd->killed; // save this flag, THD is destroyed + mysql_mutex_unlock(&worker->thd->LOCK_thd_kill); + worker->thd->pop_internal_handler(); // maybe not needed + worker->finished= true; + THD *thd= worker->thd; + worker->thd= nullptr; + mysql_mutex_unlock(&worker->LOCK_worker); + + // signal manager again to wake up and end this thread + mysql_mutex_lock(&worker->manager->LOCK_pwt_manager); + mysql_cond_signal(&worker->manager->COND_pwt_new_message); + mysql_mutex_unlock(&worker->manager->LOCK_pwt_manager); + + /* + executing this sets my_thread_var to null, stopping our ability use + the normal mutex mechanisms, so we operate this outside the locked region + on a copy of our THD pointer + */ + thd_detach_thd(save); + server_threads.erase(thd); + destroy_background_thd(thd); + + return nullptr; +} + +/** + @brief + Abort this worker, called as part of an error condition + + The worker may already be tearing itself down: parallel_worker_thread_func + nulls worker->thd and destroys the THD under LOCK_worker. Take that lock + and only awake() if the worker hasn't yet entered its exit section; if + it has, the worker is on its way out and pthread_join will reap it. +*/ + +void abort_worker(pwt_worker *worker) +{ + mysql_mutex_lock(&worker->LOCK_worker); + if (worker->thd) + worker->thd->awake(ABORT_QUERY); + mysql_mutex_unlock(&worker->LOCK_worker); + pthread_join(worker->pthread, nullptr); + mysql_mutex_destroy(&worker->LOCK_worker); + mysql_cond_destroy(&worker->COND_worker); +} + + +/** + @brief + Free our message queue, discard the messages +*/ + +void pwt_management::free_queue() +{ + // process queue + if (!parallel_messages.head()) + return; + + mysql_mutex_lock(&LOCK_pwt_thread); + pwt_queued_event *event; + while ((event= parallel_messages.get())) + { + if (pwt_error_message *err= event->error) + { + my_free(err->message); + my_free(err); + } + if (event->data) + { + // TODO: free associated + } + my_free(event); + } + mysql_mutex_unlock(&LOCK_pwt_thread); +} + + +/** + @brief + Initialise our parallel worker threads, setting their own new THD objects. + Set up our mutexs for synchronization. + Register our new threads in server_threads. + + Called from the management thread for applicable queries at the top level. +*/ + +bool pwt_management::init_parallel_workers(THD *thd) +{ + bool result= false; + uint i= 0; + + if (const uint n= thd->variables.parallel_worker_threads) + { + workers= (pwt_worker *) my_malloc(key_memory_pwt_workers, + n * sizeof(pwt_worker), + MYF(MY_WME | MY_ZEROFILL)); + if (!workers) + return true; + + nworkers= n; + + mysql_mutex_init(key_mutex_pwt_LOCK_manager, &LOCK_pwt_manager, + MY_MUTEX_INIT_SLOW); + mysql_mutex_init(key_mutex_pwt_LOCK_thread, &LOCK_pwt_thread, + MY_MUTEX_INIT_SLOW); + mysql_cond_init(key_COND_pwt_new_message, &COND_pwt_new_message, NULL); + for (i= 0; i < n; i++) + { + workers[i].thd= create_background_thd(); + if (!workers[i].thd) + { + result= true; + goto cleanup_old_workers; + } + + workers[i].manager= this; + mysql_mutex_init(key_mutex_pwt_LOCK_worker, &workers[i].LOCK_worker, + MY_MUTEX_INIT_FAST); + mysql_cond_init(key_COND_pwt_worker, &workers[i].COND_worker, nullptr); + workers[i].thd->system_thread= SYSTEM_THREAD_GENERIC; + size_t len= my_snprintf(workers[i].conn_name, MAX_THREAD_NAME, + WORKER_NAME); + workers[i].thd->connection_name.str= workers[i].conn_name; + workers[i].thd->connection_name.length= len; + workers[i].thd->security_ctx= thd->security_ctx; + workers[i].thd->set_command(thd->get_command()); + if (thd->db.str) + { + // explicit call in ~THD/THD::free_connection()/my_free, so we do this + workers[i].thd->db.str= (char*)my_malloc(key_memory_pwt_db, + thd->db.length+1, + MYF(0)); + if (!workers[i].thd->db.str) + { + result= true; + goto cleanup_db_string; + } + + strmake(const_cast(workers[i].thd->db.str), thd->db.str, + thd->db.length); + workers[i].thd->db.length= thd->db.length; + } + else + { + workers[i].thd->db.str= nullptr; + workers[i].thd->db.length= 0; + } + workers[i].thd->start_utime= thd->start_utime; + workers[i].thd->thread_id= next_thread_id(); + my_snprintf(workers[i].info, sizeof(workers[i].info), + WORKER_NAME " %u " CONNECTION_NAME_THREAD " %llu", + i+1, thd->thread_id); + workers[i].thd->query_string= CSET_STRING(workers[i].info, + strlen(workers[i].info), + workers[i].thd->query_charset()); + workers[i].thd->pwt_worker_info= workers+i; + workers[i].finished= workers[i].joined= false; + workers[i].killed= NOT_KILLED; + if ((i+1)%10) // determines error or warning in a deterministic way + workers[i].parallel_scan_job= (void*)0x1; + server_threads.insert(workers[i].thd); // +information_schema.processlist + + if (mysql_thread_create(key_thread_pwt, &workers[i].pthread, nullptr, + parallel_worker_thread_func, &workers[i])) + { + result= true; + goto cleanup_thread_create; + } + } + this->thd= thd; + return result; + } + else + return false; + +cleanup_thread_create: + server_threads.erase(workers[i].thd); + +cleanup_db_string: + /* + destroy_background_thd() requires current_thd to be NULL because it + re-attaches the background THD to this thread's TLS. We are running on + the user's query thread (current_thd == manager thd), so save/null/ + restore around the call. Mirrors the create_background_thd() pattern. + */ + { + THD *save_thd= current_thd; + set_current_thd(nullptr); + destroy_background_thd(workers[i].thd); + set_current_thd(save_thd); + } + mysql_mutex_destroy(&workers[i].LOCK_worker); + mysql_cond_destroy(&workers[i].COND_worker); + +cleanup_old_workers: + for (uint j= 0; j < i; j++) + abort_worker(workers+j); + free_queue(); + my_free(workers); + workers= nullptr; + nworkers= 0; + mysql_cond_destroy(&COND_pwt_new_message); + mysql_mutex_destroy(&LOCK_pwt_manager); + mysql_mutex_destroy(&LOCK_pwt_thread); + + return result; +} + +#ifdef HAVE_PSI_INTERFACE +void pwt_init_psi_keys(void) +{ + const char *category= "sql"; + int count; + count= array_elements(all_pwt_threads); + PSI_server->register_thread(category, all_pwt_threads, count); + count= array_elements(all_pwt_mutexes); + mysql_mutex_register(category, all_pwt_mutexes, count); + count= array_elements(all_pwt_conds); + mysql_cond_register(category, all_pwt_conds, count); + count= array_elements(all_pwt_memory); + mysql_memory_register(category, all_pwt_memory, count); +} +#endif + + +/** + @brief + Process data {errors, warnings, data, signals} from the workers. + + Currently this is called in the main thread after JOIN::exec_inner, but + this will need to be disassembled and integrated into the above (or vice + versa). +*/ + +void pwt_management::join_parallel_workers(THD *thd) +{ + bool all_done= false, workers_killed= false; + PSI_stage_info old_stage; + struct timespec wait_max; + wait_max.tv_nsec= 0; + int killed_from= -1; + + while (!all_done) + { + wait_max.tv_sec= time(0)+1; // wait 1s + mysql_mutex_lock(&LOCK_pwt_manager); + thd->ENTER_COND(&COND_pwt_new_message, &LOCK_pwt_manager, + &stage_reading_data_from_parallel_worker, &old_stage); + mysql_cond_timedwait(&COND_pwt_new_message, &LOCK_pwt_manager, &wait_max); + thd->EXIT_COND(&old_stage); + + all_done= true; + + // delete worker threads that are finished + for (uint i= 0; i < nworkers; i++) + { + if (workers[i].joined) // already done + continue; + mysql_mutex_lock(&workers[i].LOCK_worker); + if (workers[i].finished) + { + mysql_mutex_unlock(&workers[i].LOCK_worker); + if (workers[i].killed) + { + killed_from= i; + thd->awake(workers[i].killed); + } + pthread_join(workers[i].pthread, nullptr); + mysql_mutex_destroy(&workers[i].LOCK_worker); + mysql_cond_destroy(&workers[i].COND_worker); + workers[i].joined= true; + } + else + { + mysql_mutex_unlock(&workers[i].LOCK_worker); + all_done= false; + } + } + + if (thd->killed && !workers_killed) + { + // inform our workers that they are killed + for (uint i= 0; i < nworkers; i++) + { + if (workers[i].joined) + continue; + mysql_mutex_lock(&workers[i].LOCK_worker); + if (workers[i].finished) + { + mysql_mutex_unlock(&workers[i].LOCK_worker); + continue; + } + + if ((int)i != killed_from) + { + mysql_mutex_lock(&workers[i].thd->LOCK_thd_kill); + workers[i].thd->killed= thd->killed; + mysql_mutex_unlock(&workers[i].thd->LOCK_thd_kill); + mysql_cond_signal(&workers[i].COND_worker); + } + mysql_mutex_unlock(&workers[i].LOCK_worker); + } + workers_killed= true; + } + else + { + // process queue + bool surface_drop; + mysql_mutex_lock(&LOCK_pwt_thread); + surface_drop= messages_dropped; + messages_dropped= false; + pwt_queued_event *event; + while ((event= parallel_messages.get())) + { + if (pwt_error_message *err= event->error) + { + /* + set_overwrite_status to capture a message in our worker THD + TODO, look at getting rid of this if we can + */ + // thd->get_stmt_da()->set_overwrite_status(true); + if (err->level == Sql_condition::enum_warning_level::WARN_LEVEL_ERROR) + my_message_sql(err->code, err->message, MYF(0)); + else + push_warning(thd, err->level, err->code, err->message); + + my_free(err->message); + my_free(err); + } + if (event->data) + { + // process data from our worker thread + } + my_free(event); + } + mysql_mutex_unlock(&LOCK_pwt_thread); + + if (surface_drop) + push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, + ER_OUTOFMEMORY, + "Parallel worker diagnostics were dropped due " + "to memory allocation failure"); + } + } + + if (nworkers) + { + mysql_cond_destroy(&COND_pwt_new_message); + mysql_mutex_destroy(&LOCK_pwt_manager); + mysql_mutex_destroy(&LOCK_pwt_thread); + my_free(workers); + workers= nullptr; + } +} diff --git a/sql/sql_parallel_workers.h b/sql/sql_parallel_workers.h new file mode 100644 index 0000000000000..e7117e8155fc6 --- /dev/null +++ b/sql/sql_parallel_workers.h @@ -0,0 +1,117 @@ +#ifndef SQL_PARALLEL_WORKERS_H +#define SQL_PARALLEL_WORKERS_H + +#include "mariadb.h" +#include "sql_class.h" +#include "mysqld.h" +#include "sql_error.h" + +extern MYSQL_THD create_background_thd(); +extern void destroy_background_thd(MYSQL_THD thd); +extern void *thd_attach_thd(MYSQL_THD thd); +extern void thd_detach_thd(void *save); + +// PWT Parallel Worker Thread + + +/* + Message Types +*/ +class pwt_error_message +{ +public: + uint worker_errno; + uint code; + Sql_condition::enum_warning_level level; + char *message; +}; + +class pwt_data_message +{ +public: + TABLE *tmp_table; +}; + + +/* + Event type. Inherits ilink so it can live in an I_List. +*/ +class pwt_queued_event : public ilink +{ +public: + pwt_error_message *error; + pwt_data_message *data; +}; + +class pwt_management; + + +#define WORKER_NAME "Parallel Worker" +#define WORKER_ID_LENGTH 3 +#define WORKER_NAME_LENGTH 15 +#define CONNECTION_NAME_THREAD "For Thread ID" +#define CONNECTION_NAME_THREAD_LENGTH 13 +#define THREAD_ID_LENGTH 20 // ull can occupy 20 chars + +/* + Parallel Worker Thread specific attributes +*/ +class pwt_worker +{ +public: + THD *thd; + pwt_management *manager; + pthread_t pthread; + mysql_mutex_t LOCK_worker; + mysql_cond_t COND_worker; + char conn_name[MAX_THREAD_NAME+1]; + /* + This is displayed in information_schema.processlist.info + Currently "Parallel Worker {1..N} For Thread M" + */ + char info[WORKER_NAME_LENGTH+ + 1+WORKER_ID_LENGTH+1+ + CONNECTION_NAME_THREAD_LENGTH+ + 1+THREAD_ID_LENGTH+1]; + bool joined; + bool finished; + killed_state killed; + void *parallel_scan_job; +}; + + +/* + Class to create, manage and eventually destroy a "team" of worker threads. +*/ +class pwt_management : public Sql_alloc +{ +public: + pwt_worker *workers; + uint nworkers; + I_List parallel_messages; + mysql_cond_t COND_pwt_new_message; + mysql_mutex_t LOCK_pwt_manager; + mysql_mutex_t LOCK_pwt_thread; + THD *thd; + /* + Set under LOCK_pwt_thread when a worker fails to allocate a queued event. + The manager surfaces a single ER_OUTOFMEMORY warning so the user sees + that worker diagnostics were dropped instead of silently disappearing. + */ + bool messages_dropped; + pwt_management(): + workers(nullptr), + nworkers(0), + messages_dropped(false) + {} + ~pwt_management() + { + if (workers) + join_parallel_workers(current_thd); + } + bool init_parallel_workers(THD *thd); + void join_parallel_workers(THD *thd); + void free_queue(); +}; + +#endif diff --git a/sql/sql_select.cc b/sql/sql_select.cc index dbd3283c45c5c..376fde7b339b8 100644 --- a/sql/sql_select.cc +++ b/sql/sql_select.cc @@ -26,6 +26,7 @@ */ #include "mariadb.h" +#include "mysqld_error.h" #include "sql_priv.h" #include "unireg.h" #include "sql_select.h" @@ -70,6 +71,7 @@ #include "derived_handler.h" #include "opt_hints.h" #include "opt_group_by_cardinality.h" +#include "sql_parallel_workers.h" /* A key part number that means we're using a fulltext scan. @@ -4909,6 +4911,21 @@ int JOIN::exec() select_lex->select_number)) dbug_serve_apcs(thd, 1); ); + + // If we are a top level select statement + // TEMPORARY-PARALLEL-WORK-TEST: + if (!select_lex->outer_select() && thd->lex->sql_command == SQLCOM_SELECT && + !parallel_work_manager) + { + if (!(parallel_work_manager= new (thd->mem_root) pwt_management) || + parallel_work_manager->init_parallel_workers(thd)) + { + my_error(ER_INTERNAL_ERROR, MYF(0), + "Failed to initialize parallel work mgr"); + return 1; + } + } + ANALYZE_START_TRACKING(thd, &explain->time_tracker); res= exec_inner(); ANALYZE_STOP_TRACKING(thd, &explain->time_tracker); @@ -17316,6 +17333,16 @@ void JOIN::cleanup(bool full) if (full) have_query_plan= QEP_DELETED; + // TEMPORARY-PARALLEL-WORK-TEST: + if (parallel_work_manager) + { + // Finish work and delete the manager + if (parallel_work_manager->workers) + parallel_work_manager->join_parallel_workers(thd); + delete parallel_work_manager; + parallel_work_manager= NULL; + } + if (original_join_tab) { /* Free the original optimized join created for the group_by_handler */ diff --git a/sql/sql_select.h b/sql/sql_select.h index 2507a871a6005..0b6b0cfdb8156 100644 --- a/sql/sql_select.h +++ b/sql/sql_select.h @@ -33,6 +33,7 @@ #include "sql_update.h" #include "cset_narrowing.h" +#include "sql_parallel_workers.h" typedef struct st_join_table JOIN_TAB; /* Values in optimize */ @@ -1753,6 +1754,8 @@ class JOIN :public Sql_alloc */ Sql_cmd_dml *sql_cmd_dml; + pwt_management *parallel_work_manager{0}; + JOIN(THD *thd_arg, List &fields_arg, ulonglong select_options_arg, select_result *result_arg) :fields_list(fields_arg) diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index d6becc62d8632..c2580ad215f55 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -2496,6 +2496,16 @@ Sys_slave_parallel_workers( ON_UPDATE(fix_slave_parallel_threads)); +static Sys_var_on_access_global +Sys_parallel_worker_threads( + "parallel_worker_threads", + "Number of worker threads available for parallel query execution. " + "0 means parallel execution is disabled", + SESSION_VAR(parallel_worker_threads), CMD_LINE(REQUIRED_ARG), + VALID_RANGE(0,100), DEFAULT(0), BLOCK_SIZE(1)); + + static bool check_slave_domain_parallel_threads(sys_var *self, THD *thd, set_var *var) {