diff --git a/src/Common/ThreadStatus.h b/src/Common/ThreadStatus.h
index 03b99a1800b1..f89585d04f89 100644
--- a/src/Common/ThreadStatus.h
+++ b/src/Common/ThreadStatus.h
@@ -116,6 +116,16 @@ class ThreadGroup
     void attachQueryForLog(const String & query_, UInt64 normalized_hash = 0);
     void attachInternalProfileEventsQueue(const InternalProfileEventsQueuePtr & profile_queue);
 
+    /// Override the cancellation predicate. All threads that subsequently attach to this
+    /// group via ThreadGroupSwitcher inherit the predicate in their local_data, making
+    /// isQueryCanceled() reflect task-level cancellation without a process-list entry.
+    /// Required for part and partition export cancellation during S3 outage.
+    void setCancelPredicate(QueryIsCanceledPredicate predicate)
+    {
+        std::lock_guard lock(mutex);
+        shared_data.query_is_canceled_predicate = std::move(predicate);
+    }
+
     /// When new query starts, new thread group is created for it, current thread becomes master thread of the query
     static ThreadGroupPtr createForQuery(ContextPtr query_context_, FatalErrorCallback fatal_error_callback_ = {});
 
diff --git a/src/Storages/MergeTree/ExportPartTask.cpp b/src/Storages/MergeTree/ExportPartTask.cpp
index 20e8d52f03a1..83e8e28daeb1 100644
--- a/src/Storages/MergeTree/ExportPartTask.cpp
+++ b/src/Storages/MergeTree/ExportPartTask.cpp
@@ -175,6 +175,28 @@ bool ExportPartTask::executeStep()
         manifest.query_id,
         local_context);
 
+    /*
+    This is a hack to fix the issue where, when S3 is unavailable, ClickHouse keeps retrying S3 requests deep
+    in the AWS SDK and never checks the `isCancelled()` flag. That prevents the task from being killed / cancelled. It also prevents the table from being dropped.
+
+    Merges and mutations don't suffer from this problem because they don't make requests to S3 :). Select statements
+    do make requests to S3, but the cancel predicate is properly set up for regular queries.
+
+    I think this is the first time we have a background operation that makes requests to S3, so we need to connect the dots.
+
+    The simplest way is this one, and given the release timeline, I am opting for it.
+    */
+    (*exports_list_entry)->thread_group->setCancelPredicate(
+        [weak_this = weak_from_this()]() -> bool
+        {
+            if (auto shared_this = weak_this.lock())
+            {
+                return shared_this->isCancelled();
+            }
+
+            return true;
+        });
+
     SinkToStoragePtr sink;
 
     const auto new_file_path_callback = [&exports_list_entry](const std::string & file_path)
@@ -185,6 +207,8 @@ bool ExportPartTask::executeStep()
 
     try
     {
+        ThreadGroupSwitcher switcher((*exports_list_entry)->thread_group, ThreadName::EXPORT_PART);
+
         const auto filename = buildDestinationFilename(manifest, storage.getStorageID(), local_context);
 
         sink = destination_storage->import(
@@ -232,8 +256,6 @@ bool ExportPartTask::executeStep()
             local_context,
             getLogger("ExportPartition"));
 
-        ThreadGroupSwitcher switcher((*exports_list_entry)->thread_group, ThreadName::EXPORT_PART);
-
         /// We need to support exporting materialized and alias columns to object storage. For some reason, object storage engines don't support them.
         /// This is a hack that materializes the columns before the export so they can be exported to tables that have matching columns
         materializeSpecialColumns(plan_for_part.getCurrentHeader(), metadata_snapshot, local_context, plan_for_part);
diff --git a/src/Storages/MergeTree/ExportPartTask.h b/src/Storages/MergeTree/ExportPartTask.h
index a3f1635c4902..1596f2bf23c9 100644
--- a/src/Storages/MergeTree/ExportPartTask.h
+++ b/src/Storages/MergeTree/ExportPartTask.h
@@ -7,7 +7,7 @@ namespace DB
 {
 
-class ExportPartTask : public IExecutableTask
+class ExportPartTask : public IExecutableTask, public std::enable_shared_from_this<ExportPartTask>
 {
 public:
     explicit ExportPartTask(