Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,19 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getNextTask(size_t numb

saveLastNodeActivity(number_of_current_replica);

auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica);
if (processed_file_list_ptr == replica_to_files_to_be_processed.end())
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Replica number {} was marked as lost, can't set task for it anymore",
number_of_current_replica
);
{
std::lock_guard lock(mutex);
auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica);
// rescheduleTasksFromReplica can be called only when an error is caught in RemoteQueryExecutor::processPacket,
// so getNextTask can't be called after that.
// This check only guards against a logical error in the code.
if (processed_file_list_ptr == replica_to_files_to_be_processed.end())
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Replica number {} was marked as lost, can't set task for it anymore",
number_of_current_replica
);
}

// 1. Check pre-queued files first
auto file = getPreQueuedFile(number_of_current_replica);
Expand All @@ -63,7 +69,19 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getNextTask(size_t numb
file = getAnyUnprocessedFile(number_of_current_replica);

if (file)
{
std::lock_guard lock(mutex);
auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica);

if (processed_file_list_ptr == replica_to_files_to_be_processed.end())
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Replica number {} was marked as lost, can't set task for it anymore",
number_of_current_replica
);

processed_file_list_ptr->second.push_back(file);
}

return file;
}
Expand Down Expand Up @@ -121,7 +139,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getPreQueuedFile(size_t
auto next_file = files.back();
files.pop_back();

auto file_identifier = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : next_file->getIdentifier();
auto file_identifier = getFileIdentifier(next_file);
auto it = unprocessed_files.find(file_identifier);
if (it == unprocessed_files.end())
continue;
Expand Down Expand Up @@ -179,20 +197,15 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getMatchingFileFromIter
}
}

String file_identifier;
if (send_over_whole_archive && object_info->isArchive())
{
file_identifier = object_info->getPathOrPathToArchiveIfArchive();
LOG_TEST(log, "Will send over the whole archive {} to replicas. "
"This will be suboptimal, consider turning on "
"cluster_function_process_archive_on_multiple_nodes setting", file_identifier);
}
else
String file_identifier = getFileIdentifier(object_info, true);

size_t file_replica_idx;

{
file_identifier = object_info->getIdentifier();
std::lock_guard lock(mutex);
file_replica_idx = getReplicaForFile(file_identifier);
}

size_t file_replica_idx = getReplicaForFile(file_identifier);
if (file_replica_idx == number_of_current_replica)
{
LOG_TRACE(
Expand Down Expand Up @@ -248,7 +261,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(s
auto next_file = it->second.first;
unprocessed_files.erase(it);

auto file_path = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : next_file->getAbsolutePath().value_or(next_file->getPath());
auto file_path = getFileIdentifier(next_file);
LOG_TRACE(
log,
"Iterator exhausted. Assigning unprocessed file {} to replica {} from matched replica {}",
Expand Down Expand Up @@ -308,13 +321,31 @@ void StorageObjectStorageStableTaskDistributor::rescheduleTasksFromReplica(size_
"All replicas were marked as lost"
);

auto files = std::move(processed_file_list_ptr->second);
replica_to_files_to_be_processed.erase(number_of_current_replica);
for (const auto & file : processed_file_list_ptr->second)
for (const auto & file : files)
{
auto file_replica_idx = getReplicaForFile(file->getAbsolutePath().value_or(file->getPath()));
unprocessed_files.emplace(file->getAbsolutePath().value_or(file->getPath()), std::make_pair(file, file_replica_idx));
auto file_identifier = getFileIdentifier(file);
auto file_replica_idx = getReplicaForFile(file_identifier);
unprocessed_files.emplace(file_identifier, std::make_pair(file, file_replica_idx));
connection_to_files[file_replica_idx].push_back(file);
}
}

/// Returns the identifier string used for `file_object`.
///
/// When `send_over_whole_archive` is set and the object reports itself as an archive,
/// the result is `getPathOrPathToArchiveIfArchive()`; in every other case it is `getIdentifier()`.
///
/// @param file_object   Object whose identifier is requested.
/// @param write_to_log  If true, a LOG_TEST message is emitted in the whole-archive case.
/// @return The identifier chosen as described above.
String StorageObjectStorageStableTaskDistributor::getFileIdentifier(ObjectInfoPtr file_object, bool write_to_log) const
{
    /// Common case first: not sending whole archives, or the object is not an archive.
    const bool whole_archive = send_over_whole_archive && file_object->isArchive();
    if (!whole_archive)
        return file_object->getIdentifier();

    auto archive_path = file_object->getPathOrPathToArchiveIfArchive();

    /// The message is optional so that callers can look the identifier up without logging it each time.
    if (write_to_log)
    {
        LOG_TEST(log, "Will send over the whole archive {} to replicas. "
            "This will be suboptimal, consider turning on "
            "cluster_function_process_archive_on_multiple_nodes setting", archive_path);
    }

    return archive_path;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class StorageObjectStorageStableTaskDistributor

void saveLastNodeActivity(size_t number_of_current_replica);

String getFileIdentifier(ObjectInfoPtr file_object, bool write_to_log = false) const;

const std::shared_ptr<IObjectIterator> iterator;
const bool send_over_whole_archive;

Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_s3_cluster/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1195,7 +1195,7 @@ def test_joins(started_cluster):
assert len(res) == 25


def _test_graceful_shutdown(started_cluster):
def test_graceful_shutdown(started_cluster):
node = started_cluster.instances["s0_0_0"]
node_to_shutdown = started_cluster.instances["s0_1_0"]

Expand Down
Loading