Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions encoder/basisu_astc_hdr_6x6_enc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6717,6 +6717,8 @@ bool compress_photo(const basisu::imagef &orig_src_img, const astc_hdr_6x6_globa
interval_timer tm;
tm.start();

job_pool::token token{0};

std::atomic_bool any_failed_flag;
any_failed_flag.store(false);

Expand Down Expand Up @@ -6745,14 +6747,14 @@ bool compress_photo(const basisu::imagef &orig_src_img, const astc_hdr_6x6_globa
any_failed_flag.store(true, std::memory_order_relaxed);
}
}
} );
}, &token);

if (any_failed_flag)
break;

} // strip_index

pJob_pool->wait_for_all();
pJob_pool->wait_for_all(&token);

if (any_failed_flag)
{
Expand Down
12 changes: 4 additions & 8 deletions encoder/basisu_astc_ldr_encode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5603,6 +5603,8 @@ bool ldr_astc_block_encode_image(
if (enc_cfg.m_debug_output)
fmt_debug_printf("ASTC packing superpass: {}\n", 1 + superpass_index);

job_pool::token token{0};

uint32_t total_blocks_done = 0;
float last_printed_progress_val = -100.0f;

Expand Down Expand Up @@ -6650,7 +6652,7 @@ bool ldr_astc_block_encode_image(

} // if (superpass_index == ...)

});
}, &token);

if (encoder_failed_flag)
break;
Expand All @@ -6662,13 +6664,7 @@ bool ldr_astc_block_encode_image(

} // by

if (encoder_failed_flag)
{
fmt_error_printf("Main compressor block loop failed!\n");
return false;
}

job_pool.wait_for_all();
job_pool.wait_for_all(&token);

if (encoder_failed_flag)
{
Expand Down
18 changes: 10 additions & 8 deletions encoder/basisu_comp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1083,6 +1083,8 @@ namespace basisu
const uint32_t num_blocks_y = tex.get_blocks_y();
const uint32_t total_blocks = tex.get_total_blocks();
const imagef& source_image = m_slice_images_hdr[slice_index];

job_pool::token token{0};

const uint32_t N = 256;
for (uint32_t block_index_iter = 0; block_index_iter < total_blocks; block_index_iter += N)
Expand Down Expand Up @@ -1232,11 +1234,11 @@ namespace basisu
debug_printf("basis_compressor::encode_slices_to_uastc_4x4_hdr: %3.1f%% done\n", percent_done);
}

});
}, &token);

} // block_index_iter

m_params.m_pJob_pool->wait_for_all();
m_params.m_pJob_pool->wait_for_all(&token);

if (any_failures)
return cECFailedEncodeUASTC;
Expand Down Expand Up @@ -1500,6 +1502,8 @@ namespace basisu
uint32_t total_blocks_processed = 0;
float last_percentage_printed = 0;

job_pool::token token{0};

const uint32_t N = 256;
for (uint32_t block_index_iter = 0; block_index_iter < total_blocks; block_index_iter += N)
{
Expand Down Expand Up @@ -1555,11 +1559,11 @@ namespace basisu
debug_printf("basis_compressor::encode_slices_to_uastc_4x4_ldr: %3.1f%% done\n", percent_done);
}

});
}, &token);

} // block_index_iter

m_params.m_pJob_pool->wait_for_all();
m_params.m_pJob_pool->wait_for_all(&token);

if (m_params.m_rdo_uastc_ldr_4x4)
{
Expand Down Expand Up @@ -4845,7 +4849,7 @@ namespace basisu

for (uint32_t pindex = 0; pindex < params_vec.size(); pindex++)
{
jpool.add_job([pindex, &params_vec, &results_vec, &result, &opencl_failed] {
jpool.add_job([pindex, &params_vec, &results_vec, &result, &opencl_failed, &jpool] {

basis_compressor_params params = params_vec[pindex];
parallel_results& results = results_vec[pindex];
Expand All @@ -4855,9 +4859,7 @@ namespace basisu

basis_compressor c;

// Dummy job pool
job_pool task_jpool(1);
params.m_pJob_pool = &task_jpool;
params.m_pJob_pool = &jpool;
// TODO: Remove this flag entirely
params.m_multithreading = true;

Expand Down
111 changes: 58 additions & 53 deletions encoder/basisu_enc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2238,7 +2238,7 @@ namespace basisu
}

job_pool::job_pool(uint32_t num_threads) :
m_num_active_jobs(0)
m_num_pending_jobs(0)
{
m_kill_flag.store(false);
m_num_active_workers.store(0);
Expand Down Expand Up @@ -2285,68 +2285,89 @@ namespace basisu
m_threads[i].join();
}

void job_pool::add_job(const std::function<void()>& job)
void job_pool::add_job(std::function<void()> job, token* tok)
{
std::unique_lock<std::mutex> lock(m_mutex);
{
std::unique_lock<std::mutex> lock(m_mutex);

m_queue.emplace_back(job);
m_queue.push_back(item{ std::move(job), tok });

const size_t queue_size = m_queue.size();
if (tok)
(*tok)++;

lock.unlock();
m_num_pending_jobs++;
}

if (queue_size > 1)
m_has_work.notify_one();
m_has_work.notify_one();
}

void job_pool::add_job(std::function<void()>&& job)
void job_pool::wait_for_all(token* tok)
{
std::unique_lock<std::mutex> lock(m_mutex);

m_queue.emplace_back(std::move(job));

const size_t queue_size = m_queue.size();

lock.unlock();

if (queue_size > 1)
{
m_has_work.notify_one();
}
}
token* wait_token = tok ? tok : &m_num_pending_jobs;

void job_pool::wait_for_all()
{
std::unique_lock<std::mutex> lock(m_mutex);

// Drain the job queue on the calling thread.
while (!m_queue.empty())
while (true)
{
std::function<void()> job(m_queue.back());
m_queue.pop_back();

lock.unlock();
if (*wait_token == 0)
return;

job();
item job;
if (!job_steal(job, tok, lock))
break;

lock.lock();
job_run(job, lock);
}

// The queue is empty, now wait for all active jobs to finish up.
#ifndef __EMSCRIPTEN__
m_no_more_jobs.wait(lock, [this]{ return !m_num_active_jobs; } );
m_job_done.wait(lock, [wait_token] { return *wait_token == 0; });
#else
// Avoid infinite blocking
for (; ; )
{
if (m_no_more_jobs.wait_for(lock, std::chrono::milliseconds(50), [this] { return !m_num_active_jobs; }))
if (m_job_done.wait_for(lock, std::chrono::milliseconds(50), [wait_token] { return *wait_token == 0; }))
{
break;
}
}
#endif
}

bool job_pool::job_steal(item& job, token* tok, std::unique_lock<std::mutex>&)
{
for (size_t i = m_queue.size(); i > 0; --i)
{
item& victim = m_queue[i - 1];

if (tok == nullptr || victim.tok == tok)
{
job = std::move(victim);
victim = std::move(m_queue.back());
m_queue.pop_back();

return true;
}
}

return false;
}

void job_pool::job_run(item& job, std::unique_lock<std::mutex>& lock)
{
lock.unlock();

job.fn();

lock.lock();

if (job.tok)
(*job.tok)--;

m_num_pending_jobs--;

m_job_done.notify_all();
}

void job_pool::job_thread(uint32_t index)
{
BASISU_NOTE_UNUSED(index);
Expand Down Expand Up @@ -2376,26 +2397,10 @@ namespace basisu
continue;

// Get the job and execute it.
std::function<void()> job(m_queue.back());
item job = std::move(m_queue.back());
m_queue.pop_back();

++m_num_active_jobs;

lock.unlock();

job();

lock.lock();

--m_num_active_jobs;

// Now check if there are no more jobs remaining.
const bool all_done = m_queue.empty() && !m_num_active_jobs;

lock.unlock();

if (all_done)
m_no_more_jobs.notify_all();
job_run(job, lock);
}

m_num_active_workers.fetch_add(-1);
Expand Down
34 changes: 22 additions & 12 deletions encoder/basisu_enc.h
Original file line number Diff line number Diff line change
Expand Up @@ -804,31 +804,39 @@ namespace basisu
BASISU_NO_EQUALS_OR_COPY_CONSTRUCT(job_pool);

public:
using token = uint32_t;

// num_threads is the TOTAL number of job pool threads, including the calling thread! So 2=1 new thread, 3=2 new threads, etc.
job_pool(uint32_t num_threads);
~job_pool();

void add_job(const std::function<void()>& job);
void add_job(std::function<void()>&& job);

void wait_for_all();
void add_job(std::function<void()> job, token* tok = nullptr);
void wait_for_all(token* tok = nullptr);

size_t get_total_threads() const { return 1 + m_threads.size(); }

private:
struct item
{
std::function<void()> fn;
token* tok;
};

std::vector<std::thread> m_threads;
std::vector<std::function<void()> > m_queue;
std::vector<item> m_queue;

std::mutex m_mutex;
std::condition_variable m_has_work;
std::condition_variable m_no_more_jobs;

uint32_t m_num_active_jobs;

std::atomic<bool> m_kill_flag;
std::condition_variable m_job_done;

uint32_t m_num_pending_jobs;

std::atomic<bool> m_kill_flag;
std::atomic<int> m_num_active_workers;

bool job_steal(item& job, token* tok, std::unique_lock<std::mutex>& lock);
void job_run(item& job, std::unique_lock<std::mutex>& lock);

void job_thread(uint32_t index);
};

Expand Down Expand Up @@ -2076,6 +2084,8 @@ namespace basisu
basisu::vector<uint_vec> local_clusters[cMaxThreads];
basisu::vector<uint_vec> local_parent_clusters[cMaxThreads];

job_pool::token token{0};

for (uint32_t thread_iter = 0; thread_iter < max_threads; thread_iter++)
{
pJob_pool->add_job( [thread_iter, &local_clusters, &local_parent_clusters, &success_flags, &quantizers, &initial_codebook, &q, &limit_clusterizers, &max_codebook_size, &max_threads, &max_parent_codebook_size] {
Expand Down Expand Up @@ -2119,11 +2129,11 @@ namespace basisu
}
}

} );
}, &token);

} // thread_iter

pJob_pool->wait_for_all();
pJob_pool->wait_for_all(&token);

uint32_t total_clusters = 0, total_parent_clusters = 0;

Expand Down
Loading