From aaea1faaa191e13641cdc3a52ea083ae6f95217b Mon Sep 17 00:00:00 2001 From: Rex Johnston Date: Wed, 6 May 2026 11:18:47 +1200 Subject: [PATCH] MDEV-39492 Parallel Query: Study how to create worker threads Introduces the parallel_worker_threads variable to control the number of worker threads created by a parallel execution query. Adds two new files, sql_parallel_workers.h and sql_parallel_workers.cc, which contain structures for the creation, management and deletion of parallel worker threads (pwt_ in the name). Main management class created on the stack in JOIN::exec, implemented for the top level select. The current parallel_worker_thread_func waits for up to 1 second, generates a warning (or, for every 10th worker, an error), signals the main thread, waits for up to 1 second again, signals the main thread again, sets its finished flag and cleans up its THD. The main thread loops through the worker threads, looking for finished threads and cleaning them up. It then waits for a signal, then processes its message queue. Threads are registered in server_threads, so are visible in information_schema.processlist and the show processlist command. We check that a kill query on a parallel worker is passed on to its manager and the query is properly aborted, and that a kill connection is handled properly in parallel_query.test. Review input 1: cleanup earlier Do cleanup before we've finished sending the result to the client. This way, one can see the errors (and eventually warnings) marshalled back to the main thread and returned to the user: MariaDB [test]> set parallel_worker_threads=10; Query OK, 0 rows affected (0.001 sec) MariaDB [test]> select seq from seq_1_to_10; ERROR 4103 (HY000): Argument to the worker_busted_function() function does not belong to the range [0,1] Assisted by Sergei Petrunia and Claude Code. 
--- include/my_pthread.h | 14 + libmysqld/CMakeLists.txt | 1 + mysql-test/main/mysqld--help.result | 4 + mysql-test/main/parallel_query.result | 55 ++ mysql-test/main/parallel_query.test | 90 +++ mysql-test/main/parallel_query_oom.result | 21 + mysql-test/main/parallel_query_oom.test | 33 + .../r/sysvars_server_notembedded.result | 10 + mysys/my_thread_name.cc | 5 - sql/CMakeLists.txt | 1 + sql/mysqld.cc | 12 +- sql/mysqld.h | 5 + sql/privilege.h | 2 + sql/sql_class.cc | 5 +- sql/sql_class.h | 5 +- sql/sql_parallel_workers.cc | 619 ++++++++++++++++++ sql/sql_parallel_workers.h | 117 ++++ sql/sql_select.cc | 27 + sql/sql_select.h | 3 + sql/sys_vars.cc | 10 + 20 files changed, 1029 insertions(+), 10 deletions(-) create mode 100644 mysql-test/main/parallel_query.result create mode 100644 mysql-test/main/parallel_query.test create mode 100644 mysql-test/main/parallel_query_oom.result create mode 100644 mysql-test/main/parallel_query_oom.test create mode 100644 sql/sql_parallel_workers.cc create mode 100644 sql/sql_parallel_workers.h diff --git a/include/my_pthread.h b/include/my_pthread.h index 6fe76c40cba54..0d64090701a05 100644 --- a/include/my_pthread.h +++ b/include/my_pthread.h @@ -624,6 +624,20 @@ typedef uint64 my_thread_id; */ #define MY_THREAD_ID_MAX UINT32_MAX +#ifdef _WIN32 +#define MAX_THREAD_NAME 256 +#elif defined(__linux__) +#define MAX_THREAD_NAME 16 +#elif defined(__FreeBSD__) || defined(__OpenBSD__) +#define MAX_THREAD_NAME 19 +#include +#elif defined(__apple_build_version__) +#include +#define MAX_THREAD_NAME MAXTHREADNAMESIZE +#else +#define MAX_THREAD_NAME 16 +#endif + extern void my_threadattr_global_init(void); extern my_bool my_thread_global_init(void); extern void my_thread_set_name(const char *); diff --git a/libmysqld/CMakeLists.txt b/libmysqld/CMakeLists.txt index f59354625b345..14f7a12381db9 100644 --- a/libmysqld/CMakeLists.txt +++ b/libmysqld/CMakeLists.txt @@ -72,6 +72,7 @@ SET(SQL_EMBEDDED_SOURCES emb_qcache.cc libmysqld.c lib_sql.cc 
../sql/mf_iocache.cc ../sql/my_decimal.cc ../sql/net_serv.cc ../sql/opt_range.cc ../sql/opt_group_by_cardinality.cc + ../sql/sql_parallel_workers.cc ../sql/opt_rewrite_date_cmp.cc ../sql/opt_rewrite_remove_casefold.cc ../sql/opt_sargable_left.cc diff --git a/mysql-test/main/mysqld--help.result b/mysql-test/main/mysqld--help.result index 17458f8cf56ba..c5314ead01525 100644 --- a/mysql-test/main/mysqld--help.result +++ b/mysql-test/main/mysqld--help.result @@ -1012,6 +1012,9 @@ The following specify which files/extra groups are read (specified before remain Cost of checking the row against the WHERE clause. Increasing this will have the optimizer to prefer plans with less row combinations + --parallel-worker-threads=# + Number of worker threads available for parallel query + execution. 0 means parallel execution is disabled --path=name Comma-separated list of schema names that defines the search order for stored routines --performance-schema @@ -2004,6 +2007,7 @@ optimizer-trace optimizer-trace-max-mem-size 1048576 optimizer-use-condition-selectivity 4 optimizer-where-cost 0.032 +parallel-worker-threads 0 path CURRENT_SCHEMA performance-schema FALSE performance-schema-accounts-size -1 diff --git a/mysql-test/main/parallel_query.result b/mysql-test/main/parallel_query.result new file mode 100644 index 0000000000000..ad5fac674c534 --- /dev/null +++ b/mysql-test/main/parallel_query.result @@ -0,0 +1,55 @@ +SET DEBUG_SYNC='RESET'; +# +# MDEV-39492 Parallel Query: Study how to create worker threads +# +set session parallel_worker_threads=3; +# we should currently see 3 warnings; +select seq from seq_1_to_2; +seq +1 +2 +Warnings: +Warning 1105 This is an example warning to show we can push a warning from a worker thread to its manager +Warning 1105 This is an example warning to show we can push a warning from a worker thread to its manager +Warning 1105 This is an example warning to show we can push a warning from a worker thread to its manager +set session 
parallel_worker_threads=10; +# the 10th workers throws out an error +select seq from seq_1_to_2; +ERROR HY000: Argument to the worker_busted_function() function does not belong to the range [0,1] +set session parallel_worker_threads=0; +connect killee, localhost, root, , ; +# check that kill query on a parallel worker is passed to the manager +connection killee; +set session parallel_worker_threads=3; +SET @save_dbug= @@global.debug_dbug; +SET GLOBAL debug_dbug = "+d,pwt_worker_pause_before_signal"; +select seq from seq_1_to_2;; +connection default; +SET DEBUG_SYNC='now WAIT_FOR pwt_worker_paused'; +# Process list without thread ID +select USER, HOST, DB, COMMAND, STATE, SUBSTRING(INFO, 1, 32) +from information_schema.processlist; +USER HOST DB COMMAND STATE SUBSTRING(INFO, 1, 32) +root localhost test Query debug sync point: now Parallel Worker 3 For Thread ID +root localhost test Query debug sync point: now Parallel Worker 2 For Thread ID +root localhost test Query debug sync point: now Parallel Worker 1 For Thread ID +root localhost test Query Reading data from parallel workers select seq from seq_1_to_2 +root localhost test Query Filling schema table select USER, HOST, DB, COMMAND, +kill query ID; +SET DEBUG_SYNC = "now SIGNAL pwt_worker_continue"; +connection killee; +Got one of the listed errors +SET DEBUG_SYNC = "RESET"; +# save as above, but kill a worker with a simple kill and see the +# connection drop +select seq from seq_1_to_2;; +connection default; +SET DEBUG_SYNC='now WAIT_FOR pwt_worker_paused'; +kill ID; +SET DEBUG_SYNC = "now SIGNAL pwt_worker_continue"; +connection killee; +ERROR 70100: Query execution was interrupted +# killee connection now dead, confirmed below +connection default; +SET GLOBAL debug_dbug = @save_dbug; +SET DEBUG_SYNC = "RESET"; diff --git a/mysql-test/main/parallel_query.test b/mysql-test/main/parallel_query.test new file mode 100644 index 0000000000000..aaac9c399b2e5 --- /dev/null +++ b/mysql-test/main/parallel_query.test @@ 
-0,0 +1,90 @@ +# +# Test KILL and KILL QUERY statements. +# + +-- source include/count_sessions.inc +-- source include/not_embedded.inc +# this will be used in the future -- source include/have_innodb.inc +-- source include/have_sequence.inc +--source include/have_debug.inc +--source include/have_debug_sync.inc +SET DEBUG_SYNC='RESET'; + +--disable_service_connection + +--echo # +--echo # MDEV-39492 Parallel Query: Study how to create worker threads +--echo # + +set session parallel_worker_threads=3; +--echo # we should currently see 3 warnings; +select seq from seq_1_to_2; + +set session parallel_worker_threads=10; +--echo # the 10th workers throws out an error +--error ER_ARGUMENT_OUT_OF_RANGE +select seq from seq_1_to_2; + +set session parallel_worker_threads=0; +connect (killee, localhost, root, , ); + +--echo # check that kill query on a parallel worker is passed to the manager + +connection killee; +let $id= `select connection_id()`; +set session parallel_worker_threads=3; +# worker THDs don't inherit session vars, so flip dbug globally +SET @save_dbug= @@global.debug_dbug; +SET GLOBAL debug_dbug = "+d,pwt_worker_pause_before_signal"; + +--send select seq from seq_1_to_2; +connection default; +SET DEBUG_SYNC='now WAIT_FOR pwt_worker_paused'; +let $name= "Parallel Worker 1 For Thread ID $id"; +let $killID= `SELECT @kid:=ID from information_schema.processlist + where info like $name limit 1`; +--echo # Process list without thread ID +select USER, HOST, DB, COMMAND, STATE, SUBSTRING(INFO, 1, 32) + from information_schema.processlist; +--replace_result $killID ID +eval kill query $killID; +SET DEBUG_SYNC = "now SIGNAL pwt_worker_continue"; +let $wait_condition= + select count(*) = 1 from information_schema.processlist + where command = 'Sleep' and id = $id; +--source include/wait_condition.inc +connection killee; +--error ER_QUERY_INTERRUPTED, ER_ARGUMENT_OUT_OF_RANGE +--reap + +SET DEBUG_SYNC = "RESET"; + +--echo # save as above, but kill a worker with a simple 
kill and see the +--echo # connection drop + +--send select seq from seq_1_to_2; +connection default; +SET DEBUG_SYNC='now WAIT_FOR pwt_worker_paused'; +let $name= "Parallel Worker 1 For Thread ID $id"; +let $killID= `SELECT @kid:=ID from information_schema.processlist + where info like $name limit 1`; +--replace_result $killID ID +eval kill $killID; +SET DEBUG_SYNC = "now SIGNAL pwt_worker_continue"; +let $wait_condition= + select count(*) = 0 from information_schema.processlist + where command = 'Sleep' and id = $id; +--source include/wait_condition.inc +connection killee; +--disable_warnings +--error ER_QUERY_INTERRUPTED +--reap + +--echo # killee connection now dead, confirmed below + +connection default; + +SET GLOBAL debug_dbug = @save_dbug; +SET DEBUG_SYNC = "RESET"; + +--source include/wait_until_count_sessions.inc diff --git a/mysql-test/main/parallel_query_oom.result b/mysql-test/main/parallel_query_oom.result new file mode 100644 index 0000000000000..c42bf2cf2abf7 --- /dev/null +++ b/mysql-test/main/parallel_query_oom.result @@ -0,0 +1,21 @@ +# +# MDEV-39492 Parallel Query: OOM in worker error_to_queue surfaces a +# single ER_OUTOFMEMORY warning so worker diagnostics aren't silently +# lost. +# +set @save_dbug=@@global.debug_dbug; +set global debug_dbug="+d,pwt_error_to_queue_oom"; +set session parallel_worker_threads=1; +# The prototype worker emits either a warning or my_error() depending on the +# parity of its thread id. Either path runs through error_to_queue; the +# DBUG injection forces both into the OOM branch, so neither the original +# warning nor the original error reaches the user. We expect just the +# manager-surfaced ER_OUTOFMEMORY warning. 
+select count(*) from seq_1_to_2; +count(*) +2 +show warnings; +Level Code Message +Warning 1037 Parallel worker diagnostics were dropped due to memory allocation failure +set global debug_dbug=@save_dbug; +set session parallel_worker_threads=default; diff --git a/mysql-test/main/parallel_query_oom.test b/mysql-test/main/parallel_query_oom.test new file mode 100644 index 0000000000000..6a8afd3266182 --- /dev/null +++ b/mysql-test/main/parallel_query_oom.test @@ -0,0 +1,33 @@ +# +# Test OOM handling in the parallel worker error/warning queue. +# + +-- source include/count_sessions.inc +-- source include/not_embedded.inc +-- source include/have_sequence.inc +-- source include/have_debug.inc + +--disable_service_connection + +--echo # +--echo # MDEV-39492 Parallel Query: OOM in worker error_to_queue surfaces a +--echo # single ER_OUTOFMEMORY warning so worker diagnostics aren't silently +--echo # lost. +--echo # + +set @save_dbug=@@global.debug_dbug; +set global debug_dbug="+d,pwt_error_to_queue_oom"; +set session parallel_worker_threads=1; + +--echo # The prototype worker emits either a warning or my_error() depending on the +--echo # parity of its thread id. Either path runs through error_to_queue; the +--echo # DBUG injection forces both into the OOM branch, so neither the original +--echo # warning nor the original error reaches the user. We expect just the +--echo # manager-surfaced ER_OUTOFMEMORY warning. 
+select count(*) from seq_1_to_2; +show warnings; + +set global debug_dbug=@save_dbug; +set session parallel_worker_threads=default; + +--source include/wait_until_count_sessions.inc diff --git a/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result b/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result index 268c06632b8b4..573cb113d9efe 100644 --- a/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result +++ b/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result @@ -2992,6 +2992,16 @@ NUMERIC_BLOCK_SIZE NULL ENUM_VALUE_LIST NULL READ_ONLY NO COMMAND_LINE_ARGUMENT REQUIRED +VARIABLE_NAME PARALLEL_WORKER_THREADS +VARIABLE_SCOPE SESSION +VARIABLE_TYPE BIGINT UNSIGNED +VARIABLE_COMMENT Number of worker threads available for parallel query execution. 0 means parallel execution is disabled +NUMERIC_MIN_VALUE 0 +NUMERIC_MAX_VALUE 100 +NUMERIC_BLOCK_SIZE 1 +ENUM_VALUE_LIST NULL +READ_ONLY NO +COMMAND_LINE_ARGUMENT REQUIRED VARIABLE_NAME PATH VARIABLE_SCOPE SESSION VARIABLE_TYPE VARCHAR diff --git a/mysys/my_thread_name.cc b/mysys/my_thread_name.cc index 9f32dae269ab0..90418c5cd8825 100644 --- a/mysys/my_thread_name.cc +++ b/mysys/my_thread_name.cc @@ -20,12 +20,7 @@ #include #ifdef _WIN32 -#define MAX_THREAD_NAME 256 typedef HRESULT (*func_SetThreadDescription)(HANDLE,PCWSTR); -#elif defined(__linux__) -#define MAX_THREAD_NAME 16 -#elif defined(__FreeBSD__) || defined(__OpenBSD__) -#include #endif #if defined(HAVE_PSI_THREAD_INTERFACE) && !defined DBUG_OFF diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt index e54e894e1d0fc..5ede64143d08a 100644 --- a/sql/CMakeLists.txt +++ b/sql/CMakeLists.txt @@ -115,6 +115,7 @@ SET (SQL_SOURCE ../sql-common/client_plugin.c opt_range.cc vector_mhnsw.cc opt_group_by_cardinality.cc + sql_parallel_workers.cc opt_rewrite_date_cmp.cc opt_rewrite_remove_casefold.cc opt_sargable_left.cc diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 1aa5e0cfe7ddf..67293cd2553e4 100644 --- a/sql/mysqld.cc +++ 
b/sql/mysqld.cc @@ -9911,6 +9911,7 @@ PSI_stage_info stage_starting= { 0, "starting", 0}; PSI_stage_info stage_waiting_for_flush= { 0, "Waiting for non trans tables to be flushed", 0}; PSI_stage_info stage_waiting_for_ddl= { 0, "Waiting for DDLs", 0}; PSI_stage_info stage_waiting_for_reset_master= { 0, "Waiting for a running RESET MASTER to complete", 0}; +PSI_stage_info stage_reading_data_from_parallel_worker= { 0, "Reading data from parallel workers", 0}; #ifdef WITH_WSREP // Additional Galera thread states @@ -10003,6 +10004,10 @@ PSI_memory_key key_memory_user_var_entry_value; PSI_memory_key key_memory_String_value; PSI_memory_key key_memory_WSREP; PSI_memory_key key_memory_trace_ddl_info; +PSI_memory_key key_memory_pwt_queued_event; +PSI_memory_key key_memory_pwt_error_message; +PSI_memory_key key_memory_pwt_workers; +PSI_memory_key key_memory_pwt_db; #ifdef HAVE_PSI_INTERFACE @@ -10144,7 +10149,8 @@ PSI_stage_info *all_server_stages[]= & stage_reading_semi_sync_ack, & stage_waiting_for_deadlock_kill, & stage_starting, - & stage_waiting_for_reset_master + & stage_waiting_for_reset_master, + & stage_reading_data_from_parallel_worker #ifdef WITH_WSREP , & stage_waiting_isolation, @@ -10256,6 +10262,9 @@ static PSI_memory_info all_server_memory[]= { &key_memory_trace_ddl_info, "TRACE_DDL_INFO", 0} }; + +extern void pwt_init_psi_keys(void); + /** Initialise all the performance schema instrumentation points used by the server. 
@@ -10342,6 +10351,7 @@ void init_server_psi_keys(void) stmt_info_rpl.m_flags= PSI_FLAG_MUTABLE; mysql_statement_register(category, &stmt_info_rpl, 1); #endif + pwt_init_psi_keys(); } #endif /* HAVE_PSI_INTERFACE */ diff --git a/sql/mysqld.h b/sql/mysqld.h index 9ee5bc33740e2..1264b0d4904a2 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -499,6 +499,10 @@ extern PSI_memory_key key_memory_Table_trigger_dispatcher; extern PSI_memory_key key_memory_native_functions; extern PSI_memory_key key_memory_WSREP; extern PSI_memory_key key_memory_trace_ddl_info; +extern PSI_memory_key key_memory_pwt_queued_event; +extern PSI_memory_key key_memory_pwt_error_message; +extern PSI_memory_key key_memory_pwt_workers; +extern PSI_memory_key key_memory_pwt_db; /* MAINTAINER: Please keep this list in order, to limit merge collisions. @@ -646,6 +650,7 @@ extern PSI_stage_info stage_slave_background_wait_request; extern PSI_stage_info stage_waiting_for_deadlock_kill; extern PSI_stage_info stage_starting; extern PSI_stage_info stage_waiting_for_reset_master; +extern PSI_stage_info stage_reading_data_from_parallel_worker; #ifdef WITH_WSREP // Additional Galera thread states extern PSI_stage_info stage_waiting_isolation; diff --git a/sql/privilege.h b/sql/privilege.h index b3791a8e3a2f5..80d9d3b80f254 100644 --- a/sql/privilege.h +++ b/sql/privilege.h @@ -452,6 +452,8 @@ constexpr privilege_t PRIV_SET_SYSTEM_GLOBAL_VAR_MAX_CONNECT_ERRORS= // Was SUPER_ACL prior to 10.5.2 constexpr privilege_t PRIV_SET_SYSTEM_GLOBAL_VAR_MAX_PASSWORD_ERRORS= CONNECTION_ADMIN_ACL; +constexpr privilege_t PRIV_SET_SYSTEM_GLOBAL_VAR_PARALLEL_WORKER_THREADS= + CONNECTION_ADMIN_ACL; // Was SUPER_ACL prior to 10.5.2 constexpr privilege_t PRIV_SET_SYSTEM_GLOBAL_VAR_PROXY_PROTOCOL_NETWORKS= CONNECTION_ADMIN_ACL; diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 5423eb393573d..fa6068a382d09 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -714,7 +714,7 @@ const char *thd_where(THD *thd) 
THD::THD(my_thread_id id, bool is_wsrep_applier) :Statement(&main_lex, &main_mem_root, STMT_CONVENTIONAL_EXECUTION, /* statement id */ 0), - rli_fake(0), rgi_fake(0), rgi_slave(NULL), + rli_fake(0), rgi_fake(0), rgi_slave(NULL), pwt_worker_info(NULL), protocol_text(this), protocol_binary(this), initial_status_var(0), m_current_stage_key(0), m_psi(0), start_time(0), start_time_sec_part(0), in_sub_stmt(0), log_all_errors(0), @@ -5462,8 +5462,7 @@ void destroy_thd(MYSQL_THD thd) /** Create a THD that only has auxiliary functions - It will never be added to the global connection list - server_threads. It does not represent any client connection. + It does not represent any client connection. It should never be counted, because it will stall the shutdown. It is solely for engine's internal use, diff --git a/sql/sql_class.h b/sql/sql_class.h index 9897df33766b9..d34ce97369622 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -872,6 +872,7 @@ typedef struct system_variables ulong server_id; ulong session_track_transaction_info; ulong threadpool_priority; + ulong parallel_worker_threads; ulong vers_alter_history; /* deadlock detection */ @@ -3154,7 +3155,7 @@ enum class THD_WHERE const char *thd_where(THD *thd); - +class pwt_worker; /** @class THD @@ -3199,6 +3200,8 @@ class THD: public THD_count, /* this must be first */ /* Slave applier execution context */ rpl_group_info* rgi_slave; + pwt_worker *pwt_worker_info; + union { rpl_io_thread_info *rpl_io_info; rpl_sql_thread_info *rpl_sql_info; diff --git a/sql/sql_parallel_workers.cc b/sql/sql_parallel_workers.cc new file mode 100644 index 0000000000000..439124098e66c --- /dev/null +++ b/sql/sql_parallel_workers.cc @@ -0,0 +1,619 @@ +/* + Copyright (c) 2026, MariaDB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. 
+ + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 + USA */ + +/** + @file + + Implementation of parallel worker threads (PWT) management and execution + logic. + + Contains + error_to_queue + push an error message onto our queue to send to the manager + PWT_error_handler + intercept error and warnings, queue them to the manager + parallel_worker_thread_func + Entry point for our worker threads + abort_worker + pwt_management::free_queue + helper for error conditions + pwt_management::init_parallel_workers + Initialise our parallel worker threads + pwt_init_psi_keys + initialize PSI keys + pwt_management::join_parallel_workers + process information from worker threads until they are finished +*/ + + +#include "sql_parallel_workers.h" +#include "debug_sync.h" + +#ifdef HAVE_PSI_INTERFACE +static PSI_thread_key key_thread_pwt; +static PSI_thread_info all_pwt_threads[]= +{ + { &key_thread_pwt, WORKER_NAME, PSI_FLAG_GLOBAL}, +}; + +static PSI_mutex_key key_mutex_pwt_LOCK_manager, + key_mutex_pwt_LOCK_thread, + key_mutex_pwt_LOCK_worker; +static PSI_mutex_info all_pwt_mutexes[]= +{ + { &key_mutex_pwt_LOCK_manager, "pwt_management::LOCK_pwt_manager", 0}, + { &key_mutex_pwt_LOCK_thread, "pwt_management::LOCK_pwt_thread", 0}, + { &key_mutex_pwt_LOCK_worker, "pwt_worker::LOCK_worker", 0}, +}; + +static PSI_cond_key key_COND_pwt_new_message, key_COND_pwt_worker; +static PSI_cond_info all_pwt_conds[]= +{ + { &key_COND_pwt_new_message, "pwt_management::COND_pwt_new_message", 0}, + { &key_COND_pwt_worker, "pwt_worker::COND_worker", 0}, +}; + +static PSI_memory_info all_pwt_memory[]= +{ 
+ { &key_memory_pwt_queued_event, "pwt_queued_event", 0}, + { &key_memory_pwt_error_message, "pwt_error_message", 0}, + { &key_memory_pwt_workers, "pwt_management::workers", 0}, + { &key_memory_pwt_db, "pwt_worker::db", 0}, +}; +#endif /* HAVE_PSI_INTERFACE */ + + +/** + @brief + push an error message onto our queue to send to the manager + + @return + true an error occurred + false error or warning is queued +*/ + +bool error_to_queue(THD *thd, pwt_queued_event **event, uint error, + Sql_condition::enum_warning_level level, const char *msg) +{ + DBUG_EXECUTE_IF("pwt_error_to_queue_oom", + { *event= nullptr; return true; }); + *event= (pwt_queued_event*) my_malloc(key_memory_pwt_queued_event, + sizeof(pwt_queued_event), + MYF(0)); + if (!*event) + return true; + (*event)->error= (pwt_error_message*) my_malloc(key_memory_pwt_error_message, + sizeof(pwt_error_message), + MYF(0)); + if (!(*event)->error) + { + my_free(*event); + *event= nullptr; + return true; + } + (*event)->data= nullptr; + (*event)->error->level= level; + if (level == Sql_condition::enum_warning_level::WARN_LEVEL_ERROR) + (*event)->error->worker_errno= thd->killed_errno(); + else + (*event)->error->worker_errno= 0; + (*event)->error->code= error; + (*event)->error->message= (char *) my_malloc(key_memory_pwt_error_message, + strlen(msg)+1, + MYF(0)); + if (!(*event)->error->message) + { + my_free((*event)->error); + my_free(*event); + *event= nullptr; + return true; + } + strmake((*event)->error->message, msg, strlen(msg)); + return false; +} + + +class PWT_error_handler : public Internal_error_handler +{ +public: + bool handle_condition(THD *thd, + uint sql_errno, + const char* sql_state, + Sql_condition::enum_warning_level *level, + const char* msg, + Sql_condition ** cond_hdl) override + { + if (pwt_worker *worker= thd->pwt_worker_info) + { + pwt_queued_event *event; + if (error_to_queue(thd, &event, sql_errno, *level, msg)) + { + /* + Couldn't allocate the queued event. 
The worker THD's diagnostics + area is discarded when the worker exits, so flag the manager so it + can surface a single ER_OUTOFMEMORY warning to the user instead of + letting this condition vanish. + */ + mysql_mutex_lock(&worker->manager->LOCK_pwt_thread); + worker->manager->messages_dropped= true; + mysql_mutex_unlock(&worker->manager->LOCK_pwt_thread); + mysql_mutex_lock(&worker->manager->LOCK_pwt_manager); + mysql_cond_signal(&worker->manager->COND_pwt_new_message); + mysql_mutex_unlock(&worker->manager->LOCK_pwt_manager); + return true; + } + mysql_mutex_lock(&worker->manager->LOCK_pwt_thread); + worker->manager->parallel_messages.push_back(event); + mysql_mutex_unlock(&worker->manager->LOCK_pwt_thread); + } + return true; // no further processing in worker thread + } + +}; + + +/** + @brief + Entry point for our worker threads, arg supplied by manager details what + needs to be run +*/ + +static void *parallel_worker_thread_func(void *arg) +{ + pwt_worker *worker= (pwt_worker*) arg; + struct timespec abs_timeout; + PSI_stage_info old_stage; + + PWT_error_handler error_handler; + abs_timeout.tv_nsec= 0; + /* + Set current_thd and thread local storage (my_thread_var) for our new THD + to ensure they have their own local objects/errors/warnings etc + */ + void *save= thd_attach_thd(worker->thd); + my_thread_set_name(worker->thd->connection_name.str); + THD_STAGE_INFO(worker->thd, stage_sending_data); + worker->thd->push_internal_handler(&error_handler); + + /* + START: in lieu of work, wait 1 seconds, push out an error or a warning, + wait another 1 seconds then exit + */ + abs_timeout.tv_sec= time(0)+1; + mysql_mutex_lock(&worker->LOCK_worker); + worker->thd->ENTER_COND(&worker->COND_worker, &worker->LOCK_worker, + &stage_sending_data, &old_stage); + mysql_cond_timedwait(&worker->COND_worker, &worker->LOCK_worker, + &abs_timeout); + worker->thd->EXIT_COND(&old_stage); + + mysql_mutex_lock(&worker->thd->LOCK_thd_kill); + if (worker->thd->killed) + { + 
mysql_mutex_unlock(&worker->thd->LOCK_thd_kill); + my_error(ER_QUERY_INTERRUPTED, MYF(0)); + goto worker_thread_exit; + } + mysql_mutex_unlock(&worker->thd->LOCK_thd_kill); + + if (worker->parallel_scan_job) + push_warning(current_thd, Sql_condition::WARN_LEVEL_WARN, ER_UNKNOWN_ERROR, + "This is an example warning to show we can push a " + "warning from a worker thread to its manager "); + else + my_error(ER_ARGUMENT_OUT_OF_RANGE, MYF(0), "worker_busted_function()"); + +#ifdef ENABLED_DEBUG_SYNC + /* + we can't sync on the managers or our THD, spin the whole thing about + and use the global signal pool, NO_CLEAR_EVENT is needed because we have + multiple workers and the wrong one will likely consume the signal. + */ + DBUG_EXECUTE_IF("pwt_worker_pause_before_signal", + DBUG_ASSERT(!debug_sync_set_action(worker->thd, STRING_WITH_LEN( + "now SIGNAL pwt_worker_paused WAIT_FOR pwt_worker_continue NO_CLEAR_EVENT" + )));); +#endif + + // signal manager there is something in the queue, + mysql_mutex_lock(&worker->manager->LOCK_pwt_manager); + mysql_cond_signal(&worker->manager->COND_pwt_new_message); + mysql_mutex_unlock(&worker->manager->LOCK_pwt_manager); + + abs_timeout.tv_sec= time(0)+1; + mysql_mutex_lock(&worker->LOCK_worker); + worker->thd->ENTER_COND(&worker->COND_worker, &worker->LOCK_worker, + &stage_sending_data, &old_stage); + mysql_cond_timedwait(&worker->COND_worker, &worker->LOCK_worker, + &abs_timeout); + worker->thd->EXIT_COND(&old_stage); + + mysql_mutex_lock(&worker->thd->LOCK_thd_kill); + if (worker->thd->killed) + { + my_error(ER_QUERY_INTERRUPTED, MYF(0)); + } + mysql_mutex_unlock(&worker->thd->LOCK_thd_kill); + + // END: in lieu of work + +worker_thread_exit: + + // manager needs to see this as atomic + mysql_mutex_lock(&worker->LOCK_worker); + /* + LOCK_thd_kill is the canonical guard for thd->killed; a user-issued + KILL on this worker's thread_id goes through THD::awake() which holds + LOCK_thd_kill but not LOCK_worker, so we must nest both to 
get a + race-free snapshot for the manager. + Lock order matches join_parallel_workers(). + */ + mysql_mutex_lock(&worker->thd->LOCK_thd_kill); + worker->killed= worker->thd->killed; // save this flag, THD is destroyed + mysql_mutex_unlock(&worker->thd->LOCK_thd_kill); + worker->thd->pop_internal_handler(); // maybe not needed + worker->finished= true; + THD *thd= worker->thd; + worker->thd= nullptr; + mysql_mutex_unlock(&worker->LOCK_worker); + + // signal manager again to wake up and end this thread + mysql_mutex_lock(&worker->manager->LOCK_pwt_manager); + mysql_cond_signal(&worker->manager->COND_pwt_new_message); + mysql_mutex_unlock(&worker->manager->LOCK_pwt_manager); + + /* + executing this sets my_thread_var to null, stopping our ability to use + the normal mutex mechanisms, so we operate this outside the locked region + on a copy of our THD pointer + */ + thd_detach_thd(save); + server_threads.erase(thd); + destroy_background_thd(thd); + + return nullptr; +} + +/** + @brief + Abort this worker, called as part of an error condition + + The worker may already be tearing itself down: parallel_worker_thread_func + nulls worker->thd under LOCK_worker, then destroys the THD. Take that lock + and only awake() if the worker hasn't yet entered its exit section; if + it has, the worker is on its way out and pthread_join will reap it. 
+*/ + +void abort_worker(pwt_worker *worker) +{ + mysql_mutex_lock(&worker->LOCK_worker); + if (worker->thd) + worker->thd->awake(ABORT_QUERY); + mysql_mutex_unlock(&worker->LOCK_worker); + pthread_join(worker->pthread, nullptr); + mysql_mutex_destroy(&worker->LOCK_worker); + mysql_cond_destroy(&worker->COND_worker); +} + + +/** + @brief + Free our message queue, discard the messages +*/ + +void pwt_management::free_queue() +{ + // process queue + if (!parallel_messages.head()) + return; + + mysql_mutex_lock(&LOCK_pwt_thread); + pwt_queued_event *event; + while ((event= parallel_messages.get())) + { + if (pwt_error_message *err= event->error) + { + my_free(err->message); + my_free(err); + } + if (event->data) + { + // TODO: free associated + } + my_free(event); + } + mysql_mutex_unlock(&LOCK_pwt_thread); +} + + +/** + @brief + Initialise our parallel worker threads, setting their own new THD objects. + Set up our mutexs for synchronization. + Register our new threads in server_threads. + + Called from the management thread for applicable queries at the top level. 
+
+  @param thd  THD of the query that owns the team. Its
+              variables.parallel_worker_threads value is the number of
+              workers to start.
+
+  @retval false  Success, including the case where the variable is 0 and
+                 nothing is allocated or initialised.
+  @retval true   A memory allocation, THD creation or thread creation
+                 failed; everything set up so far has been torn down.
+*/
+
+bool pwt_management::init_parallel_workers(THD *thd)
+{
+  bool result= false;
+  uint i= 0;
+
+  if (const uint n= thd->variables.parallel_worker_threads)
+  {
+    workers= (pwt_worker *) my_malloc(key_memory_pwt_workers,
+                                      n * sizeof(pwt_worker),
+                                      MYF(MY_WME | MY_ZEROFILL));
+    if (!workers)
+      return true;
+
+    nworkers= n;
+
+    mysql_mutex_init(key_mutex_pwt_LOCK_manager, &LOCK_pwt_manager,
+                     MY_MUTEX_INIT_SLOW);
+    mysql_mutex_init(key_mutex_pwt_LOCK_thread, &LOCK_pwt_thread,
+                     MY_MUTEX_INIT_SLOW);
+    mysql_cond_init(key_COND_pwt_new_message, &COND_pwt_new_message, NULL);
+
+    /*
+      Record the owning THD before the first worker is started: each worker
+      receives a pointer to this manager and runs concurrently with the rest
+      of this loop, so the member must not be assigned after thread creation.
+    */
+    this->thd= thd;
+
+    for (i= 0; i < n; i++)
+    {
+      workers[i].thd= create_background_thd();
+      if (!workers[i].thd)
+      {
+        result= true;
+        goto cleanup_old_workers;       // nothing of worker i to undo yet
+      }
+
+      workers[i].manager= this;
+      mysql_mutex_init(key_mutex_pwt_LOCK_worker, &workers[i].LOCK_worker,
+                       MY_MUTEX_INIT_FAST);
+      mysql_cond_init(key_COND_pwt_worker, &workers[i].COND_worker, nullptr);
+      workers[i].thd->system_thread= SYSTEM_THREAD_GENERIC;
+      size_t len= my_snprintf(workers[i].conn_name, MAX_THREAD_NAME,
+                              WORKER_NAME);
+      workers[i].thd->connection_name.str= workers[i].conn_name;
+      workers[i].thd->connection_name.length= len;
+      workers[i].thd->security_ctx= thd->security_ctx;
+      workers[i].thd->set_command(thd->get_command());
+      if (thd->db.str)
+      {
+        // explicit call in ~THD/THD::free_connection()/my_free, so we do this
+        workers[i].thd->db.str= (char*)my_malloc(key_memory_pwt_db,
+                                                 thd->db.length+1,
+                                                 MYF(0));
+        if (!workers[i].thd->db.str)
+        {
+          result= true;
+          goto cleanup_db_string;
+        }
+
+        strmake(const_cast<char*>(workers[i].thd->db.str), thd->db.str,
+                thd->db.length);
+        workers[i].thd->db.length= thd->db.length;
+      }
+      else
+      {
+        workers[i].thd->db.str= nullptr;
+        workers[i].thd->db.length= 0;
+      }
+      workers[i].thd->start_utime= thd->start_utime;
+      workers[i].thd->thread_id= next_thread_id();
+      /* The cast makes the argument type match the %llu conversion */
+      my_snprintf(workers[i].info, sizeof(workers[i].info),
+                  WORKER_NAME " %u " CONNECTION_NAME_THREAD " %llu",
+                  i+1, (ulonglong) thd->thread_id);
+      workers[i].thd->query_string= CSET_STRING(workers[i].info,
+                                                strlen(workers[i].info),
+                                                workers[i].thd->query_charset());
+      workers[i].thd->pwt_worker_info= workers+i;
+      workers[i].finished= workers[i].joined= false;
+      workers[i].killed= NOT_KILLED;
+      if ((i+1)%10) // determines error or warning in a deterministic way
+        workers[i].parallel_scan_job= (void*)0x1;
+      server_threads.insert(workers[i].thd); // +information_schema.processlist
+
+      if (mysql_thread_create(key_thread_pwt, &workers[i].pthread, nullptr,
+                              parallel_worker_thread_func, &workers[i]))
+      {
+        result= true;
+        goto cleanup_thread_create;
+      }
+    }
+    return result;
+  }
+  else
+    return false;
+
+  /*
+    Failure unwinding. The labels fall through into each other: each one
+    undoes one more step for worker i, and the last one stops the workers
+    [0, i) that are already running and releases the shared state.
+  */
+cleanup_thread_create:
+  server_threads.erase(workers[i].thd);
+
+cleanup_db_string:
+  /*
+    destroy_background_thd() requires current_thd to be NULL because it
+    re-attaches the background THD to this thread's TLS. We are running on
+    the user's query thread (current_thd == manager thd), so save/null/
+    restore around the call. Mirrors the create_background_thd() pattern.
+  */
+  {
+    THD *save_thd= current_thd;
+    set_current_thd(nullptr);
+    destroy_background_thd(workers[i].thd);
+    set_current_thd(save_thd);
+  }
+  mysql_mutex_destroy(&workers[i].LOCK_worker);
+  mysql_cond_destroy(&workers[i].COND_worker);
+
+cleanup_old_workers:
+  for (uint j= 0; j < i; j++)
+    abort_worker(workers+j);
+  free_queue();
+  my_free(workers);
+  workers= nullptr;
+  nworkers= 0;
+  mysql_cond_destroy(&COND_pwt_new_message);
+  mysql_mutex_destroy(&LOCK_pwt_manager);
+  mysql_mutex_destroy(&LOCK_pwt_thread);
+
+  return result;
+}
+
+#ifdef HAVE_PSI_INTERFACE
+/**
+  @brief
+  Register the parallel worker thread, mutex, condition and memory
+  instrumentation keys under the "sql" category.
+*/
+void pwt_init_psi_keys(void)
+{
+  const char *category= "sql";
+  int count;
+  count= array_elements(all_pwt_threads);
+  mysql_thread_register(category, all_pwt_threads, count);
+  count= array_elements(all_pwt_mutexes);
+  mysql_mutex_register(category, all_pwt_mutexes, count);
+  count= array_elements(all_pwt_conds);
+  mysql_cond_register(category, all_pwt_conds, count);
+  count= array_elements(all_pwt_memory);
+  mysql_memory_register(category, all_pwt_memory, count);
+}
+#endif
+
+
+/**
+  @brief
+  Process data {errors, warnings, data, signals} from the workers.
+
+  Currently this is called in the main thread after JOIN::exec_inner, but
+  this will need to be disassembled and integrated into the above (or vice
+  versa).
+
+  Loops until every worker has been joined. Each pass waits up to about one
+  second on COND_pwt_new_message, joins the workers that report finished,
+  then either propagates a kill of the managing THD to the remaining
+  workers (once) or reports the queued messages on the managing THD.
+  Afterwards the shared mutexes, the condition and the worker array are
+  released. Does nothing when there is no team.
+
+  @param thd  THD of the managing (query) thread; receives the errors and
+              warnings, and its killed state is what is propagated.
+*/
+
+void pwt_management::join_parallel_workers(THD *thd)
+{
+  bool all_done= false, workers_killed= false;
+  PSI_stage_info old_stage;
+  struct timespec wait_max;
+  wait_max.tv_nsec= 0;
+  int killed_from= -1;
+
+  /*
+    No team exists (never started, start-up failed, or already joined). The
+    mutexes and the condition are not initialised in that state, so they
+    must not be touched.
+  */
+  if (!workers)
+    return;
+
+  while (!all_done)
+  {
+    wait_max.tv_sec= time(0)+1;                 // wait 1s
+    mysql_mutex_lock(&LOCK_pwt_manager);
+    thd->ENTER_COND(&COND_pwt_new_message, &LOCK_pwt_manager,
+                    &stage_reading_data_from_parallel_worker, &old_stage);
+    mysql_cond_timedwait(&COND_pwt_new_message, &LOCK_pwt_manager, &wait_max);
+    thd->EXIT_COND(&old_stage);
+
+    all_done= true;
+
+    // delete worker threads that are finished
+    for (uint i= 0; i < nworkers; i++)
+    {
+      if (workers[i].joined)                    // already done
+        continue;
+      mysql_mutex_lock(&workers[i].LOCK_worker);
+      if (workers[i].finished)
+      {
+        mysql_mutex_unlock(&workers[i].LOCK_worker);
+        if (workers[i].killed)
+        {
+          // the worker itself was killed: pass that on to the manager THD
+          killed_from= i;
+          thd->awake(workers[i].killed);
+        }
+        pthread_join(workers[i].pthread, nullptr);
+        mysql_mutex_destroy(&workers[i].LOCK_worker);
+        mysql_cond_destroy(&workers[i].COND_worker);
+        workers[i].joined= true;
+      }
+      else
+      {
+        mysql_mutex_unlock(&workers[i].LOCK_worker);
+        all_done= false;
+      }
+    }
+
+    if (thd->killed && !workers_killed)
+    {
+      // inform our workers that they are killed
+      for (uint i= 0; i < nworkers; i++)
+      {
+        if (workers[i].joined)
+          continue;
+        mysql_mutex_lock(&workers[i].LOCK_worker);
+        if (workers[i].finished)
+        {
+          mysql_mutex_unlock(&workers[i].LOCK_worker);
+          continue;
+        }
+
+        if ((int)i != killed_from)
+        {
+          // same NULL check as abort_worker() makes before using the THD
+          if (THD *worker_thd= workers[i].thd)
+          {
+            mysql_mutex_lock(&worker_thd->LOCK_thd_kill);
+            worker_thd->killed= thd->killed;
+            mysql_mutex_unlock(&worker_thd->LOCK_thd_kill);
+          }
+          mysql_cond_signal(&workers[i].COND_worker);
+        }
+        mysql_mutex_unlock(&workers[i].LOCK_worker);
+      }
+      workers_killed= true;
+    }
+    else
+    {
+      // process queue
+      bool surface_drop;
+      mysql_mutex_lock(&LOCK_pwt_thread);
+      surface_drop= messages_dropped;
+      messages_dropped= false;
+      pwt_queued_event *event;
+      while ((event= parallel_messages.get()))
+      {
+        if (pwt_error_message *err= event->error)
+        {
+          /*
+            set_overwrite_status to capture a message in our worker THD
+            TODO, look at getting rid of this if we can
+          */
+          // thd->get_stmt_da()->set_overwrite_status(true);
+          if (err->level == Sql_condition::enum_warning_level::WARN_LEVEL_ERROR)
+            my_message_sql(err->code, err->message, MYF(0));
+          else
+            push_warning(thd, err->level, err->code, err->message);
+
+          my_free(err->message);
+          my_free(err);
+        }
+        if (event->data)
+        {
+          // process data from our worker thread
+        }
+        my_free(event);
+      }
+      mysql_mutex_unlock(&LOCK_pwt_thread);
+
+      if (surface_drop)
+        push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
+                            ER_OUTOFMEMORY,
+                            "Parallel worker diagnostics were dropped due "
+                            "to memory allocation failure");
+    }
+  }
+
+  /*
+    The last pass may have taken the kill branch above, which does not
+    drain the queue. Release whatever is still queued before the mutex that
+    protects the queue is destroyed.
+  */
+  free_queue();
+
+  mysql_cond_destroy(&COND_pwt_new_message);
+  mysql_mutex_destroy(&LOCK_pwt_manager);
+  mysql_mutex_destroy(&LOCK_pwt_thread);
+  my_free(workers);
+  workers= nullptr;
+  nworkers= 0;
+}
diff --git a/sql/sql_parallel_workers.h b/sql/sql_parallel_workers.h
new file mode 100644
index 0000000000000..e7117e8155fc6
--- /dev/null
+++ b/sql/sql_parallel_workers.h
@@ -0,0 +1,117 @@
+#ifndef SQL_PARALLEL_WORKERS_H
+#define SQL_PARALLEL_WORKERS_H
+
+#include "mariadb.h"
+#include "sql_class.h"
+#include "mysqld.h"
+#include "sql_error.h"
+
+extern MYSQL_THD create_background_thd();
+extern void destroy_background_thd(MYSQL_THD thd);
+extern void *thd_attach_thd(MYSQL_THD thd);
+extern void thd_detach_thd(void *save);
+
+// PWT Parallel Worker Thread
+
+
+/*
+  Message Types
+*/
+class pwt_error_message
+{
+public:
+  uint worker_errno;
+  uint code;                                // error/warning number reported
+  Sql_condition::enum_warning_level level;  // WARN_LEVEL_ERROR => an error
+  char *message;                            // my_free()d by the consumer
+};
+
+class pwt_data_message
+{
+public:
+  TABLE *tmp_table;
+};
+
+
+/*
+  Event type. Inherits ilink so it can live in an I_List.
+*/
+class pwt_queued_event : public ilink
+{
+public:
+  pwt_error_message *error;     // optional diagnostic, owned by the event
+  pwt_data_message *data;       // optional data payload
+};
+
+class pwt_management;
+
+
+#define WORKER_NAME "Parallel Worker"
+#define WORKER_ID_LENGTH 3
+#define WORKER_NAME_LENGTH 15
+#define CONNECTION_NAME_THREAD "For Thread ID"
+#define CONNECTION_NAME_THREAD_LENGTH 13
+#define THREAD_ID_LENGTH 20 // ull can occupy 20 chars
+
+/*
+  Parallel Worker Thread specific attributes
+*/
+class pwt_worker
+{
+public:
+  THD *thd;
+  pwt_management *manager;
+  pthread_t pthread;
+  mysql_mutex_t LOCK_worker;
+  mysql_cond_t COND_worker;
+  char conn_name[MAX_THREAD_NAME+1];
+  /*
+    This is displayed in information_schema.processlist.info
+    Currently "Parallel Worker {1..N} For Thread ID M"
+  */
+  char info[WORKER_NAME_LENGTH+
+            1+WORKER_ID_LENGTH+1+
+            CONNECTION_NAME_THREAD_LENGTH+
+            1+THREAD_ID_LENGTH+1];
+  bool joined;                  // set by the manager once pthread_join()ed
+  bool finished;                // tested by the manager under LOCK_worker
+  killed_state killed;
+  void *parallel_scan_job;
+};
+
+
+/*
+  Class to create, manage and eventually destroy a "team" of worker threads.
+*/
+class pwt_management : public Sql_alloc
+{
+public:
+  pwt_worker *workers;
+  uint nworkers;
+  I_List<pwt_queued_event> parallel_messages;
+  mysql_cond_t COND_pwt_new_message;
+  mysql_mutex_t LOCK_pwt_manager;
+  mysql_mutex_t LOCK_pwt_thread;
+  THD *thd;
+  /*
+    Set under LOCK_pwt_thread when a worker fails to allocate a queued event.
+    The manager surfaces a single ER_OUTOFMEMORY warning so the user sees
+    that worker diagnostics were dropped instead of silently disappearing.
+  */
+  bool messages_dropped;
+  pwt_management():
+    workers(nullptr),
+    nworkers(0),
+    thd(nullptr),
+    messages_dropped(false)
+  {}
+  /* Holds mutexes, a condition and a worker array: must not be copied */
+  pwt_management(const pwt_management &)= delete;
+  pwt_management &operator=(const pwt_management &)= delete;
+  ~pwt_management()
+  {
+    if (workers)
+      join_parallel_workers(current_thd);
+  }
+  bool init_parallel_workers(THD *thd);
+  void join_parallel_workers(THD *thd);
+  void free_queue();
+};
+
+#endif
diff --git a/sql/sql_select.cc b/sql/sql_select.cc
index dbd3283c45c5c..376fde7b339b8 100644
--- a/sql/sql_select.cc
+++ b/sql/sql_select.cc
@@ -26,6 +26,7 @@
 */
 
 #include "mariadb.h"
+#include "mysqld_error.h"
 #include "sql_priv.h"
 #include "unireg.h"
 #include "sql_select.h"
@@ -70,6 +71,7 @@
 #include "derived_handler.h"
 #include "opt_hints.h"
 #include "opt_group_by_cardinality.h"
+#include "sql_parallel_workers.h"
 
 /*
   A key part number that means we're using a fulltext scan.
@@ -4909,6 +4911,21 @@ int JOIN::exec()
                                 select_lex->select_number))
       dbug_serve_apcs(thd, 1);
   );
+
+  // If we are a top level select statement
+  // TEMPORARY-PARALLEL-WORK-TEST:
+  if (!select_lex->outer_select() && thd->lex->sql_command == SQLCOM_SELECT &&
+      !parallel_work_manager)
+  {
+    if (!(parallel_work_manager= new (thd->mem_root) pwt_management) ||
+        parallel_work_manager->init_parallel_workers(thd))
+    {
+      my_error(ER_INTERNAL_ERROR, MYF(0),
+               "Failed to initialize parallel work mgr");
+      return 1;
+    }
+  }
+
   ANALYZE_START_TRACKING(thd, &explain->time_tracker);
   res= exec_inner();
   ANALYZE_STOP_TRACKING(thd, &explain->time_tracker);
@@ -17316,6 +17333,16 @@ void JOIN::cleanup(bool full)
   if (full)
     have_query_plan= QEP_DELETED;
 
+  // TEMPORARY-PARALLEL-WORK-TEST:
+  if (parallel_work_manager)
+  {
+    // Finish work and delete the manager
+    if (parallel_work_manager->workers)
+      parallel_work_manager->join_parallel_workers(thd);
+    delete parallel_work_manager;
+    parallel_work_manager= NULL;
+  }
+
   if (original_join_tab)
   {
     /* Free the original optimized join created for the group_by_handler */
diff --git a/sql/sql_select.h b/sql/sql_select.h
index 2507a871a6005..0b6b0cfdb8156 100644
--- 
a/sql/sql_select.h +++ b/sql/sql_select.h @@ -33,6 +33,7 @@ #include "sql_update.h" #include "cset_narrowing.h" +#include "sql_parallel_workers.h" typedef struct st_join_table JOIN_TAB; /* Values in optimize */ @@ -1753,6 +1754,8 @@ class JOIN :public Sql_alloc */ Sql_cmd_dml *sql_cmd_dml; + pwt_management *parallel_work_manager{0}; + JOIN(THD *thd_arg, List &fields_arg, ulonglong select_options_arg, select_result *result_arg) :fields_list(fields_arg) diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index d6becc62d8632..c2580ad215f55 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -2496,6 +2496,16 @@ Sys_slave_parallel_workers( ON_UPDATE(fix_slave_parallel_threads)); +static Sys_var_on_access_global +Sys_parallel_worker_threads( + "parallel_worker_threads", + "Number of worker threads available for parallel query execution. " + "0 means parallel execution is disabled", + SESSION_VAR(parallel_worker_threads), CMD_LINE(REQUIRED_ARG), + VALID_RANGE(0,100), DEFAULT(0), BLOCK_SIZE(1)); + + static bool check_slave_domain_parallel_threads(sys_var *self, THD *thd, set_var *var) {