Geant4-11
Public Types | Public Member Functions | Static Public Member Functions | Protected Member Functions | Static Protected Member Functions | Private Attributes | Static Private Attributes
PTL::ThreadPool Class Reference

#include <ThreadPool.hh>

Public Types

typedef std::function< intmax_t(intmax_t)> affinity_func_t
 
using atomic_bool_type = std::shared_ptr< std::atomic_bool >
 
using atomic_int_type = std::shared_ptr< std::atomic_uintmax_t >
 
typedef std::vector< bool > bool_list_t
 
using condition_t = std::shared_ptr< Condition >
 
typedef std::function< void()> initialize_func_t
 
using lock_t = std::shared_ptr< Mutex >
 
using pool_state_type = std::shared_ptr< std::atomic_short >
 
using size_type = size_t
 
using task_count_type = std::shared_ptr< std::atomic_uintmax_t >
 
using task_pointer = std::shared_ptr< task_type >
 
using task_queue_t = VUserTaskQueue
 
using task_type = VTask
 
using thread_data_t = std::vector< std::shared_ptr< ThreadData > >
 
typedef std::map< ThreadId, uintmax_t > thread_id_map_t
 
typedef std::map< uintmax_t, ThreadId > thread_index_map_t
 
typedef std::deque< ThreadId > thread_list_t
 
using thread_vec_t = std::vector< Thread >
 
template<typename KeyT , typename MappedT , typename HashT = KeyT>
using uomap = std::unordered_map< KeyT, MappedT, std::hash< HashT > >
 

Public Member Functions

size_type add_task (task_pointer &&task, int bin=-1)
 
template<typename ListT >
size_type add_tasks (ListT &)
 
size_type destroy_threadpool ()
 
template<typename FuncT >
void execute_on_all_threads (FuncT &&_func)
 
template<typename FuncT >
void execute_on_specific_threads (const std::set< std::thread::id > &_tid, FuncT &&_func)
 
int get_active_threads_count () const
 
task_queue_t * get_queue () const
 
tbb_task_arena_t * get_task_arena ()
 
Thread * get_thread (size_type _n) const
 
Thread * get_thread (std::thread::id id) const
 
task_queue_t *& get_valid_queue (task_queue_t *&) const
 
int get_verbose () const
 
size_type initialize_threadpool (size_type)
 
bool is_alive ()
 
bool is_initialized () const
 
bool is_main () const
 
bool is_tbb_threadpool () const
 
void notify ()
 
void notify (size_type)
 
void notify_all ()
 
ThreadPool & operator= (const ThreadPool &)=delete
 
ThreadPool & operator= (ThreadPool &&)=default
 
void reset_initialization ()
 
void resize (size_type _n)
 
void set_affinity (affinity_func_t f)
 
void set_affinity (intmax_t i, Thread &)
 
void set_initialization (initialize_func_t f)
 
void set_verbose (int n)
 
size_type size () const
 
const pool_state_type & state () const
 
size_type stop_thread ()
 
 ThreadPool (const size_type &pool_size, VUserTaskQueue *task_queue=nullptr, bool _use_affinity=GetEnv< bool >("PTL_CPU_AFFINITY", false), affinity_func_t=[](intmax_t) { static std::atomic< intmax_t > assigned;intmax_t _assign=assigned++;return _assign % Thread::hardware_concurrency();})
 
 ThreadPool (const ThreadPool &)=delete
 
 ThreadPool (ThreadPool &&)=default
 
bool using_affinity () const
 
virtual ~ThreadPool ()
 

Static Public Member Functions

static uintmax_t add_thread_id ()
 
static uintmax_t get_this_thread_id ()
 
static const thread_id_map_t & get_thread_ids ()
 
static void set_use_tbb (bool val)
 
static tbb_global_control_t *& tbb_global_control ()
 
static bool using_tbb ()
 

Protected Member Functions

void execute_thread (VUserTaskQueue *)
 
int insert (task_pointer &&, int=-1)
 
void record_entry ()
 
void record_exit ()
 
int run_on_this (task_pointer &&)
 

Static Protected Member Functions

static void start_thread (ThreadPool *, thread_data_t *, intmax_t=-1)
 

Private Attributes

affinity_func_t m_affinity_func
 
atomic_bool_type m_alive_flag = std::make_shared<std::atomic_bool>(false)
 
bool m_delete_task_queue = false
 
initialize_func_t m_init_func = []() {}
 
bool_list_t m_is_joined {}
 
bool_list_t m_is_stopped {}
 
thread_list_t m_main_threads {}
 
ThreadId m_main_tid = ThisThread::get_id()
 
size_type m_pool_size = 0
 
pool_state_type m_pool_state = std::make_shared<std::atomic_short>(0)
 
thread_list_t m_stop_threads {}
 
condition_t m_task_cond = std::make_shared<Condition>()
 
lock_t m_task_lock = std::make_shared<Mutex>()
 
task_queue_t * m_task_queue = nullptr
 
tbb_task_arena_t * m_tbb_task_arena = nullptr
 
tbb_task_group_t * m_tbb_task_group = nullptr
 
bool m_tbb_tp = false
 
atomic_int_type m_thread_active = std::make_shared<std::atomic_uintmax_t>()
 
atomic_int_type m_thread_awake = std::make_shared<std::atomic_uintmax_t>()
 
thread_data_t m_thread_data {}
 
thread_vec_t m_threads {}
 
bool m_use_affinity = false
 
int m_verbose = GetEnv<int>("PTL_VERBOSE", 0)
 

Static Private Attributes

static PTL_DLL thread_id_map_t f_thread_ids
 
static PTL_DLL bool f_use_tbb = false
 

Detailed Description

Definition at line 68 of file ThreadPool.hh.

Member Typedef Documentation

◆ affinity_func_t

typedef std::function<intmax_t(intmax_t)> PTL::ThreadPool::affinity_func_t

Definition at line 95 of file ThreadPool.hh.

◆ atomic_bool_type

using PTL::ThreadPool::atomic_bool_type = std::shared_ptr<std::atomic_bool>

Definition at line 79 of file ThreadPool.hh.

◆ atomic_int_type

using PTL::ThreadPool::atomic_int_type = std::shared_ptr<std::atomic_uintmax_t>

Definition at line 77 of file ThreadPool.hh.

◆ bool_list_t

typedef std::vector<bool> PTL::ThreadPool::bool_list_t

Definition at line 88 of file ThreadPool.hh.

◆ condition_t

using PTL::ThreadPool::condition_t = std::shared_ptr<Condition>

Definition at line 83 of file ThreadPool.hh.

◆ initialize_func_t

typedef std::function<void()> PTL::ThreadPool::initialize_func_t

Definition at line 94 of file ThreadPool.hh.

◆ lock_t

using PTL::ThreadPool::lock_t = std::shared_ptr<Mutex>

Definition at line 82 of file ThreadPool.hh.

◆ pool_state_type

using PTL::ThreadPool::pool_state_type = std::shared_ptr<std::atomic_short>

Definition at line 78 of file ThreadPool.hh.

◆ size_type

using PTL::ThreadPool::size_type = size_t

Definition at line 75 of file ThreadPool.hh.

◆ task_count_type

using PTL::ThreadPool::task_count_type = std::shared_ptr<std::atomic_uintmax_t>

Definition at line 76 of file ThreadPool.hh.

◆ task_pointer

using PTL::ThreadPool::task_pointer = std::shared_ptr<task_type>

Definition at line 84 of file ThreadPool.hh.

◆ task_queue_t

using PTL::ThreadPool::task_queue_t = VUserTaskQueue

Definition at line 85 of file ThreadPool.hh.

◆ task_type

using PTL::ThreadPool::task_type = VTask

Definition at line 81 of file ThreadPool.hh.

◆ thread_data_t

using PTL::ThreadPool::thread_data_t = std::vector<std::shared_ptr<ThreadData> >

Definition at line 92 of file ThreadPool.hh.

◆ thread_id_map_t

typedef std::map<ThreadId, uintmax_t> PTL::ThreadPool::thread_id_map_t

Definition at line 89 of file ThreadPool.hh.

◆ thread_index_map_t

typedef std::map<uintmax_t, ThreadId> PTL::ThreadPool::thread_index_map_t

Definition at line 90 of file ThreadPool.hh.

◆ thread_list_t

typedef std::deque<ThreadId> PTL::ThreadPool::thread_list_t

Definition at line 87 of file ThreadPool.hh.

◆ thread_vec_t

using PTL::ThreadPool::thread_vec_t = std::vector<Thread>

Definition at line 91 of file ThreadPool.hh.

◆ uomap

template<typename KeyT , typename MappedT , typename HashT = KeyT>
using PTL::ThreadPool::uomap = std::unordered_map<KeyT, MappedT, std::hash<HashT> >

Definition at line 72 of file ThreadPool.hh.

Constructor & Destructor Documentation

◆ ThreadPool() [1/3]

ThreadPool::ThreadPool ( const size_type & pool_size,
VUserTaskQueue * task_queue = nullptr,
bool  _use_affinity = GetEnv<bool>("PTL_CPU_AFFINITY", false),
affinity_func_t  _affinity_func = [](intmax_t) { static std::atomic<intmax_t> assigned; intmax_t _assign = assigned++; return _assign % Thread::hardware_concurrency(); } 
)

Definition at line 140 of file ThreadPool.cc.

142: m_use_affinity(_use_affinity)
143, m_pool_state(std::make_shared<std::atomic_short>(thread_pool::state::NONINIT))
144, m_task_queue(task_queue)
145, m_affinity_func(std::move(_affinity_func))
146{
147 auto master_id = get_this_thread_id();
148 if(master_id != 0 && m_verbose > 1)
149 std::cerr << "ThreadPool created on non-master slave" << std::endl;
150
151 thread_data() = new ThreadData(this);
152
153 // initialize after get_this_thread_id so master is zero
154 this->initialize_threadpool(pool_size);
155
156 if(!m_task_queue)
158}
task_queue_t * m_task_queue
Definition: ThreadPool.hh:255
static uintmax_t get_this_thread_id()
Definition: ThreadPool.cc:122
size_type m_pool_size
Definition: ThreadPool.hh:234
pool_state_type m_pool_state
Definition: ThreadPool.hh:237
affinity_func_t m_affinity_func
Definition: ThreadPool.hh:261
size_type initialize_threadpool(size_type)
Definition: ThreadPool.cc:213
static const short NONINIT
Definition: Globals.hh:215

References get_this_thread_id(), initialize_threadpool(), m_pool_size, m_task_queue, m_verbose, and anonymous_namespace{ThreadPool.cc}::thread_data().

◆ ~ThreadPool()

ThreadPool::~ThreadPool ( )
virtual

Definition at line 162 of file ThreadPool.cc.

163{
164 if(m_alive_flag->load())
165 {
166 std::cerr << "Warning! ThreadPool was not properly destroyed! Call "
167 "destroy_threadpool() before deleting the ThreadPool object to "
168 "eliminate this message."
169 << std::endl;
171 m_task_lock->lock();
172 m_task_cond->notify_all();
173 m_task_lock->unlock();
174 for(auto& itr : m_threads)
175 itr.join();
176 m_threads.clear();
177 }
178}
lock_t m_task_lock
Definition: ThreadPool.hh:242
condition_t m_task_cond
Definition: ThreadPool.hh:244
atomic_bool_type m_alive_flag
Definition: ThreadPool.hh:236
thread_vec_t m_threads
Definition: ThreadPool.hh:251
static const short STOPPED
Definition: Globals.hh:214

References m_alive_flag, m_pool_state, m_task_cond, m_task_lock, m_threads, and PTL::thread_pool::state::STOPPED.

◆ ThreadPool() [2/3]

PTL::ThreadPool::ThreadPool ( const ThreadPool & )
delete

◆ ThreadPool() [3/3]

PTL::ThreadPool::ThreadPool ( ThreadPool &&  )
default

Member Function Documentation

◆ add_task()

ThreadPool::size_type PTL::ThreadPool::add_task ( task_pointer &&  task,
int  bin = -1 
)
inline

Definition at line 380 of file ThreadPool.hh.

381{
382 // if not native (i.e. TBB) or we haven't built thread-pool, just execute
383 if(m_tbb_tp || !task->is_native_task() || !m_alive_flag->load())
384 return static_cast<size_type>(run_on_this(std::move(task)));
385
386 return static_cast<size_type>(insert(std::move(task), bin));
387}
int insert(task_pointer &&, int=-1)
Definition: ThreadPool.hh:369
int run_on_this(task_pointer &&)
Definition: ThreadPool.hh:351
size_t size_type
Definition: ThreadPool.hh:75

References insert(), m_alive_flag, m_tbb_tp, and run_on_this().

Referenced by PTL::TaskManager::async(), and PTL::TaskManager::exec().

◆ add_tasks()

template<typename ListT >
ThreadPool::size_type PTL::ThreadPool::add_tasks ( ListT &  c)
inline

Definition at line 391 of file ThreadPool.hh.

392{
393 if(!m_alive_flag) // if we haven't built thread-pool, just execute
394 {
395 for(auto& itr : c)
396 run(itr);
397 c.clear();
398 return 0;
399 }
400
401 // TODO: put a limit on how many tasks can be added at most
402 auto c_size = c.size();
403 for(auto& itr : c)
404 {
405 if(!itr->is_native_task())
406 --c_size;
407 else
408 {
409 //++(m_task_queue);
411 }
412 }
413 c.clear();
414
415 // notify sleeping threads
416 notify(c_size);
417
418 return c_size;
419}
task_queue_t *& get_valid_queue(task_queue_t *&) const
Definition: ThreadPool.cc:536
virtual intmax_t InsertTask(task_pointer &&, ThreadData *=nullptr, intmax_t subq=-1) PTL_NO_SANITIZE_THREAD=0
Definition: run.py:1

References get_valid_queue(), PTL::VUserTaskQueue::InsertTask(), m_alive_flag, m_task_queue, and notify().

◆ add_thread_id()

static uintmax_t PTL::ThreadPool::add_thread_id ( )
inline static

Definition at line 191 of file ThreadPool.hh.

192 {
193 AutoLock lock(TypeMutex<ThreadPool>(), std::defer_lock);
194 if(!lock.owns_lock())
195 lock.lock();
196 auto _tid = ThisThread::get_id();
197 if(f_thread_ids.find(_tid) == f_thread_ids.end())
198 {
199 auto _idx = f_thread_ids.size();
200 f_thread_ids[_tid] = _idx;
202 }
203 return f_thread_ids.at(_tid);
204 }
static PTL_DLL thread_id_map_t f_thread_ids
Definition: ThreadPool.hh:265
void SetThreadId(int aNewValue)
Definition: Threading.cc:68
TemplateAutoLock< Mutex > AutoLock
Definition: AutoLock.hh:483

References f_thread_ids, and PTL::Threading::SetThreadId().

Referenced by execute_on_all_threads(), and execute_on_specific_threads().

◆ destroy_threadpool()

ThreadPool::size_type ThreadPool::destroy_threadpool ( )

Definition at line 364 of file ThreadPool.cc.

365{
366 // Note: this is not for synchronization, its for thread communication!
367 // destroy_threadpool() will only be called from the main thread, yet
368 // the modified m_pool_state may not show up to other threads until its
369 // modified in a lock!
370 //------------------------------------------------------------------------//
372
373 //--------------------------------------------------------------------//
374 // handle tbb task scheduler
375#if defined(PTL_USE_TBB)
377 {
378 auto _func = [&]() { m_tbb_task_group->wait(); };
381 else
382 _func();
383 delete m_tbb_task_group;
384 m_tbb_task_group = nullptr;
385 }
387 {
388 delete m_tbb_task_arena;
389 m_tbb_task_arena = nullptr;
390 }
392 {
393 tbb_global_control_t*& _global_control = tbb_global_control();
394 delete _global_control;
395 _global_control = nullptr;
396 m_tbb_tp = false;
397 std::cout << "ThreadPool [TBB] destroyed" << std::endl;
398 }
399#endif
400
401 if(!m_alive_flag->load())
402 return 0;
403
404 //------------------------------------------------------------------------//
405 // notify all threads we are shutting down
406 m_task_lock->lock();
407 m_task_cond->notify_all();
408 m_task_lock->unlock();
409 //------------------------------------------------------------------------//
410
411 if(m_is_joined.size() != m_main_threads.size())
412 {
413 std::stringstream ss;
414 ss << " ThreadPool::destroy_thread_pool - boolean is_joined vector "
415 << "is a different size than threads vector: " << m_is_joined.size() << " vs. "
416 << m_main_threads.size() << " (tid: " << std::this_thread::get_id() << ")";
417
418 throw std::runtime_error(ss.str());
419 }
420
421 for(size_type i = 0; i < m_is_joined.size(); i++)
422 {
423 //--------------------------------------------------------------------//
424 //
425 if(i < m_threads.size())
426 m_threads.at(i).join();
427
428 //--------------------------------------------------------------------//
429 // if its joined already, nothing else needs to be done
430 if(m_is_joined.at(i))
431 continue;
432
433 //--------------------------------------------------------------------//
434 // join
435 if(std::this_thread::get_id() == m_main_threads[i])
436 continue;
437
438 //--------------------------------------------------------------------//
439 // thread id and index
440 auto _tid = m_main_threads[i];
441
442 //--------------------------------------------------------------------//
443 // erase thread from thread ID list
444 if(f_thread_ids.find(_tid) != f_thread_ids.end())
445 f_thread_ids.erase(f_thread_ids.find(_tid));
446
447 //--------------------------------------------------------------------//
448 // it's joined
449 m_is_joined.at(i) = true;
450 }
451
452 m_thread_data.clear();
453 m_threads.clear();
454 m_main_threads.clear();
455 m_is_joined.clear();
456 m_alive_flag->store(false);
457
458 auto start = std::chrono::steady_clock::now();
459 auto elapsed = std::chrono::duration<double>{};
460 // wait maximum of 30 seconds for threads to exit
461 while(m_thread_active->load() > 0 && elapsed.count() < 30)
462 {
463 std::this_thread::sleep_for(std::chrono::milliseconds(50));
464 elapsed = std::chrono::steady_clock::now() - start;
465 }
466
467 auto _active = m_thread_active->load();
468
469 if(get_verbose() >= 0)
470 {
471 if(_active == 0)
472 {
473 std::cout << "ThreadPool destroyed" << std::endl;
474 }
475 else
476 {
477 std::cout << "ThreadPool destroyed but " << _active
478 << " threads might still be active (and cause a termination error)"
479 << std::endl;
480 }
481 }
482
484 {
485 delete m_task_queue;
486 m_task_queue = nullptr;
487 }
488
489 return 0;
490}
thread_list_t m_main_threads
Definition: ThreadPool.hh:249
thread_data_t m_thread_data
Definition: ThreadPool.hh:252
tbb_task_group_t * m_tbb_task_group
Definition: ThreadPool.hh:257
bool_list_t m_is_joined
Definition: ThreadPool.hh:247
static tbb_global_control_t *& tbb_global_control()
Definition: ThreadPool.hh:313
int get_verbose() const
Definition: ThreadPool.hh:182
tbb_task_arena_t * m_tbb_task_arena
Definition: ThreadPool.hh:256
bool m_delete_task_queue
Definition: ThreadPool.hh:232
atomic_int_type m_thread_active
Definition: ThreadPool.hh:239
auto execute(FuncT &&_func) -> decltype(_func())
Definition: ThreadData.hh:111

References PTL::tbb::task_arena::execute(), f_thread_ids, get_verbose(), m_alive_flag, m_delete_task_queue, m_is_joined, m_main_threads, m_pool_state, m_task_cond, m_task_lock, m_task_queue, m_tbb_task_arena, m_tbb_task_group, m_tbb_tp, m_thread_active, m_thread_data, m_threads, PTL::thread_pool::state::STOPPED, tbb_global_control(), and PTL::tbb::task_group::wait().

Referenced by PTL::TaskManager::finalize(), PTL::TaskRunManager::Terminate(), and G4TaskRunManager::~G4TaskRunManager().

◆ execute_on_all_threads()

template<typename FuncT >
void PTL::ThreadPool::execute_on_all_threads ( FuncT &&  _func)
inline

Definition at line 423 of file ThreadPool.hh.

424{
426 {
427#if defined(PTL_USE_TBB)
428 // TBB lazily activates threads to process tasks and the main thread
429 // participates in processing the tasks so getting a specific
430 // function to execute only on the worker threads requires some trickery
431 //
432 std::set<std::thread::id> _first{};
433 Mutex _mutex{};
434 // init function which executes function and returns 1 only once
435 auto _init = [&]() {
436 int _once = 0;
437 _mutex.lock();
438 if(_first.find(std::this_thread::get_id()) == _first.end())
439 {
440 // we need to reset this thread-local static for multiple invocations
441 // of the same template instantiation
442 _once = 1;
443 _first.insert(std::this_thread::get_id());
444 }
445 _mutex.unlock();
446 if(_once != 0)
447 {
448 _func();
449 return 1;
450 }
451 return 0;
452 };
453 // this will collect the number of threads which have
454 // executed the _init function above
455 std::atomic<size_t> _total_init{ 0 };
456 // max parallelism by TBB
457 size_t _maxp = tbb_global_control()->active_value(
459 // create a task arean
460 auto _arena = get_task_arena();
461 // size of the thread-pool
462 size_t _sz = size();
463 // number of cores
464 size_t _ncore = Threading::GetNumberOfCores();
465 // maximum depth for recursion
466 size_t _dmax = std::max<size_t>(_ncore, 4);
467 // how many threads we need to initialize
468 size_t _num = std::min(_maxp, std::min(_sz, _ncore));
469 // this is the task passed to the task-group
470 std::function<void()> _init_task;
471 _init_task = [&]() {
473 static thread_local size_type _depth = 0;
474 int _ret = 0;
475 // don't let the main thread execute the function
476 if(!is_main())
477 {
478 // execute the function
479 _ret = _init();
480 // add the result
481 _total_init += _ret;
482 }
483 // if the function did not return anything, recursively execute
484 // two more tasks
485 ++_depth;
486 if(_ret == 0 && _depth < _dmax && _total_init.load() < _num)
487 {
488 tbb::task_group tg{};
489 tg.run([&]() { _init_task(); });
490 tg.run([&]() { _init_task(); });
491 ThisThread::sleep_for(std::chrono::milliseconds{ 1 });
492 tg.wait();
493 }
494 --_depth;
495 };
496
497 // TBB won't oversubscribe so we need to limit by ncores - 1
498 size_t nitr = 0;
499 auto _fname = __FUNCTION__;
500 auto _write_info = [&]() {
501 std::cout << "[" << _fname << "]> Total initalized: " << _total_init
502 << ", expected: " << _num << ", max-parallel: " << _maxp
503 << ", size: " << _sz << ", ncore: " << _ncore << std::endl;
504 };
505 while(_total_init < _num)
506 {
507 auto _n = 2 * _num;
508 while(--_n > 0)
509 {
510 _arena->execute(
511 [&]() { m_tbb_task_group->run([&]() { _init_task(); }); });
512 }
513 _arena->execute([&]() { m_tbb_task_group->wait(); });
514 // don't loop infinitely but use a strict condition
515 if(nitr++ > 2 * (_num + 1) && (_total_init - 1) == _num)
516 {
517 _write_info();
518 break;
519 }
520 // at this point we need to exit
521 if(nitr > 4 * (_ncore + 1))
522 {
523 _write_info();
524 break;
525 }
526 }
527 if(get_verbose() > 3)
528 _write_info();
529#endif
530 }
531 else if(get_queue())
532 {
533 get_queue()->ExecuteOnAllThreads(this, std::forward<FuncT>(_func));
534 }
535}
G4double(* function)(G4double)
bool is_main() const
Definition: ThreadPool.hh:183
task_queue_t * get_queue() const
Definition: ThreadPool.hh:127
tbb_task_arena_t * get_task_arena()
Definition: ThreadPool.hh:321
size_type size() const
Definition: ThreadPool.hh:163
static uintmax_t add_thread_id()
Definition: ThreadPool.hh:191
virtual void ExecuteOnAllThreads(ThreadPool *tp, function_type f)=0
static size_t active_value(parameter param)
void run(FuncT f)
Definition: ThreadData.hh:65
T min(const T t1, const T t2)
Return the smallest of the two arguments
unsigned GetNumberOfCores()
Definition: Threading.cc:60
std::mutex Mutex
Definition: Threading.hh:77

References PTL::tbb::global_control::active_value(), add_thread_id(), PTL::VUserTaskQueue::ExecuteOnAllThreads(), get_queue(), get_task_arena(), get_verbose(), PTL::Threading::GetNumberOfCores(), is_main(), m_tbb_task_group, m_tbb_tp, PTL::tbb::global_control::max_allowed_parallelism, G4INCL::Math::min(), PTL::tbb::task_group::run(), size(), tbb_global_control(), and PTL::tbb::task_group::wait().

Referenced by G4TaskRunManager::CreateAndStartWorkers(), G4TaskRunManager::RequestWorkersProcessCommandsStack(), G4TaskRunManager::TerminateWorkers(), and G4TaskRunManager::WaitForEndEventLoopWorkers().

◆ execute_on_specific_threads()

template<typename FuncT >
void PTL::ThreadPool::execute_on_specific_threads ( const std::set< std::thread::id > &  _tid,
FuncT &&  _func 
)
inline

Definition at line 541 of file ThreadPool.hh.

543{
545 {
546#if defined(PTL_USE_TBB)
547 // TBB lazily activates threads to process tasks and the main thread
548 // participates in processing the tasks so getting a specific
549 // function to execute only on the worker threads requires some trickery
550 //
551 std::set<std::thread::id> _first{};
552 Mutex _mutex{};
553 // init function which executes function and returns 1 only once
554 auto _exec = [&]() {
555 int _once = 0;
556 _mutex.lock();
557 if(_first.find(std::this_thread::get_id()) == _first.end())
558 {
559 // we need to reset this thread-local static for multiple invocations
560 // of the same template instantiation
561 _once = 1;
562 _first.insert(std::this_thread::get_id());
563 }
564 _mutex.unlock();
565 if(_once != 0)
566 {
567 _func();
568 return 1;
569 }
570 return 0;
571 };
572 // this will collect the number of threads which have
573 // executed the _exec function above
574 std::atomic<size_t> _total_exec{ 0 };
575 // number of cores
576 size_t _ncore = Threading::GetNumberOfCores();
577 // maximum depth for recursion
578 size_t _dmax = std::max<size_t>(_ncore, 4);
579 // how many threads we need to initialize
580 size_t _num = _tids.size();
581 // create a task arena
582 auto _arena = get_task_arena();
583 // this is the task passed to the task-group
584 std::function<void()> _exec_task;
585 _exec_task = [&]() {
587 static thread_local size_type _depth = 0;
588 int _ret = 0;
589 auto _this_tid = std::this_thread::get_id();
590 // don't let the main thread execute the function
591 if(_tids.count(_this_tid) > 0)
592 {
593 // execute the function
594 _ret = _exec();
595 // add the result
596 _total_exec += _ret;
597 }
598 // if the function did not return anything, recursively execute
599 // two more tasks
600 ++_depth;
601 if(_ret == 0 && _depth < _dmax && _total_exec.load() < _num)
602 {
603 tbb::task_group tg{};
604 tg.run([&]() { _exec_task(); });
605 tg.run([&]() { _exec_task(); });
606 ThisThread::sleep_for(std::chrono::milliseconds{ 1 });
607 tg.wait();
608 }
609 --_depth;
610 };
611
612 // TBB won't oversubscribe so we need to limit by ncores - 1
613 size_t nitr = 0;
614 auto _fname = __FUNCTION__;
615 auto _write_info = [&]() {
616 std::cout << "[" << _fname << "]> Total executed: " << _total_exec
617 << ", expected: " << _num << ", size: " << size() << std::endl;
618 };
619 while(_total_exec < _num)
620 {
621 auto _n = 2 * _num;
622 while(--_n > 0)
623 {
624 _arena->execute(
625 [&]() { m_tbb_task_group->run([&]() { _exec_task(); }); });
626 }
627 _arena->execute([&]() { m_tbb_task_group->wait(); });
628 // don't loop infinitely but use a strict condition
629 if(nitr++ > 2 * (_num + 1) && (_total_exec - 1) == _num)
630 {
631 _write_info();
632 break;
633 }
634 // at this point we need to exit
635 if(nitr > 8 * (_num + 1))
636 {
637 _write_info();
638 break;
639 }
640 }
641 if(get_verbose() > 3)
642 _write_info();
643#endif
644 }
645 else if(get_queue())
646 {
647 get_queue()->ExecuteOnSpecificThreads(_tids, this, std::forward<FuncT>(_func));
648 }
649}
virtual void ExecuteOnSpecificThreads(ThreadIdSet tid_set, ThreadPool *tp, function_type f)=0

References add_thread_id(), PTL::VUserTaskQueue::ExecuteOnSpecificThreads(), get_queue(), get_task_arena(), get_verbose(), PTL::Threading::GetNumberOfCores(), m_tbb_task_group, m_tbb_tp, PTL::tbb::task_group::run(), size(), and PTL::tbb::task_group::wait().

◆ execute_thread()

void ThreadPool::execute_thread ( VUserTaskQueue * _task_queue)
protected

Definition at line 546 of file ThreadPool.cc.

547{
548 // how long the thread waits on condition variable
549 // static int wait_time = GetEnv<int>("PTL_POOL_WAIT_TIME", 5);
550
551 ++(*m_thread_awake);
552
553 // initialization function
554 m_init_func();
555
556 ThreadId tid = ThisThread::get_id();
557 ThreadData* data = thread_data();
558 // auto thread_bin = _task_queue->GetThreadBin();
559 // auto workers = _task_queue->workers();
560
561 auto start = std::chrono::steady_clock::now();
562 auto elapsed = std::chrono::duration<double>{};
563 // check for updates for 60 seconds max
564 while(!_task_queue && elapsed.count() < 60)
565 {
566 elapsed = std::chrono::steady_clock::now() - start;
567 data->update();
568 _task_queue = data->current_queue;
569 }
570
571 if(!_task_queue)
572 {
573 --(*m_thread_awake);
574 throw std::runtime_error("No task queue was found after 60 seconds!");
575 }
576
577 assert(data->current_queue != nullptr);
578 assert(_task_queue == data->current_queue);
579
580 // essentially a dummy run
581 if(_task_queue)
582 {
583 data->within_task = true;
584 auto _task = _task_queue->GetTask();
585 if(_task)
586 {
587 (*_task)();
588 }
589 data->within_task = false;
590 }
591
592 // threads stay in this loop forever until thread-pool destroyed
593 while(true)
594 {
595 static thread_local auto p_task_lock = m_task_lock;
596
597 //--------------------------------------------------------------------//
598 // Try to pick a task
599 AutoLock _task_lock(*p_task_lock, std::defer_lock);
600 //--------------------------------------------------------------------//
601
602 auto leave_pool = [&]() {
603 auto _state = [&]() { return static_cast<int>(m_pool_state->load()); };
604 auto _pool_state = _state();
605 if(_pool_state > 0)
606 {
607 // stop whole pool
608 if(_pool_state == thread_pool::state::STOPPED)
609 {
610 if(_task_lock.owns_lock())
611 _task_lock.unlock();
612 return true;
613 }
614 // single thread stoppage
615 else if(_pool_state == thread_pool::state::PARTIAL) // NOLINT
616 {
617 if(!_task_lock.owns_lock())
618 _task_lock.lock();
619 if(!m_is_stopped.empty() && m_is_stopped.back())
620 {
621 m_stop_threads.push_back(tid);
622 m_is_stopped.pop_back();
623 if(_task_lock.owns_lock())
624 _task_lock.unlock();
625 // exit entire function
626 return true;
627 }
628 if(_task_lock.owns_lock())
629 _task_lock.unlock();
630 }
631 }
632 return false;
633 };
634
635 // We need to put condition.wait() in a loop for two reasons:
636 // 1. There can be spurious wake-ups (due to signal/ENITR)
637 // 2. When mutex is released for waiting, another thread can be woken up
638 // from a signal/broadcast and that thread can mess up the condition.
639 // So when the current thread wakes up the condition may no longer be
640 // actually true!
641 while(_task_queue->empty())
642 {
643 auto _state = [&]() { return static_cast<int>(m_pool_state->load()); };
644 auto _size = [&]() { return _task_queue->true_size(); };
645 auto _empty = [&]() { return _task_queue->empty(); };
646 auto _wake = [&]() { return (!_empty() || _size() > 0 || _state() > 0); };
647
648 if(leave_pool())
649 return;
650
651 if(_task_queue->true_size() == 0)
652 {
653 if(m_thread_awake && m_thread_awake->load() > 0)
654 --(*m_thread_awake);
655
656 // lock before sleeping on condition
657 if(!_task_lock.owns_lock())
658 _task_lock.lock();
659
660 // Wait until there is a task in the queue
661 // Unlocks mutex while waiting, then locks it back when signaled
662 // use lambda to control waking
663 m_task_cond->wait(_task_lock, _wake);
664
665 if(_state() == thread_pool::state::STOPPED)
666 return;
667
668 // unlock if owned
669 if(_task_lock.owns_lock())
670 _task_lock.unlock();
671
672 // notify that is awake
674 ++(*m_thread_awake);
675 }
676 else
677 break;
678 }
679
680 // release the lock
681 if(_task_lock.owns_lock())
682 _task_lock.unlock();
683
684 //----------------------------------------------------------------//
685
686 // leave pool if conditions dictate it
687 if(leave_pool())
688 return;
689
690 // activate guard against recursive deadlock
691 data->within_task = true;
692 //----------------------------------------------------------------//
693
694 // execute the task(s)
695 while(!_task_queue->empty())
696 {
697 auto _task = _task_queue->GetTask();
698 if(_task)
699 {
700 (*_task)();
701 }
702 }
703 //----------------------------------------------------------------//
704
705 // disable guard against recursive deadlock
706 data->within_task = false;
707 //----------------------------------------------------------------//
708 }
709}
VUserTaskQueue * current_queue
Definition: ThreadData.hh:148
initialize_func_t m_init_func
Definition: ThreadPool.hh:260
thread_list_t m_stop_threads
Definition: ThreadPool.hh:250
atomic_int_type m_thread_awake
Definition: ThreadPool.hh:238
bool_list_t m_is_stopped
Definition: ThreadPool.hh:248
virtual task_pointer GetTask(intmax_t subq=-1, intmax_t nitr=-1)=0
virtual size_type true_size() const
virtual bool empty() const =0
static const short PARTIAL
Definition: Globals.hh:213
Thread::id ThreadId
Definition: Threading.hh:139

References PTL::ThreadData::current_queue, PTL::VUserTaskQueue::empty(), PTL::VUserTaskQueue::GetTask(), m_init_func, m_is_stopped, m_pool_size, m_pool_state, m_stop_threads, m_task_cond, m_task_lock, m_thread_awake, PTL::thread_pool::state::PARTIAL, PTL::thread_pool::state::STOPPED, anonymous_namespace{ThreadPool.cc}::thread_data(), PTL::VUserTaskQueue::true_size(), PTL::ThreadData::update(), and PTL::ThreadData::within_task.

◆ get_active_threads_count()

int PTL::ThreadPool::get_active_threads_count ( ) const
inline

Definition at line 173 of file ThreadPool.hh.

174 {
175 return (m_thread_awake) ? m_thread_awake->load() : 0;
176 }

References m_thread_awake.

◆ get_queue()

task_queue_t * PTL::ThreadPool::get_queue ( ) const
inline

Definition at line 127 of file ThreadPool.hh.

127{ return m_task_queue; }

References m_task_queue.

Referenced by execute_on_all_threads(), execute_on_specific_threads(), and PTL::ThreadData::update().

◆ get_task_arena()

tbb_task_arena_t * PTL::ThreadPool::get_task_arena ( )
inline

Definition at line 321 of file ThreadPool.hh.

322{
323#if defined(PTL_USE_TBB)
324 // create a task arena
326 {
327 auto _sz = (tbb_global_control())
330 : size();
331 m_tbb_task_arena = new tbb_task_arena_t(::tbb::task_arena::attach{});
333 }
334#else
337#endif
338 return m_tbb_task_arena;
339}
void initialize(int max_concurrency=automatic, unsigned reserved_for_masters=1)
tbb::task_arena tbb_task_arena_t
Definition: ThreadData.hh:121

References PTL::tbb::global_control::active_value(), PTL::tbb::task_arena::initialize(), m_tbb_task_arena, PTL::tbb::global_control::max_allowed_parallelism, size(), and tbb_global_control().

Referenced by execute_on_all_threads(), execute_on_specific_threads(), and run_on_this().

◆ get_this_thread_id()

uintmax_t ThreadPool::get_this_thread_id ( )
static

Definition at line 122 of file ThreadPool.cc.

123{
124 auto _tid = ThisThread::get_id();
125 {
126 AutoLock lock(TypeMutex<ThreadPool>(), std::defer_lock);
127 if(!lock.owns_lock())
128 lock.lock();
129 if(f_thread_ids.find(_tid) == f_thread_ids.end())
130 {
131 auto _idx = f_thread_ids.size();
132 f_thread_ids[_tid] = _idx;
133 }
134 }
135 return f_thread_ids[_tid];
136}

References f_thread_ids.

Referenced by PTL::UserTaskQueue::GetThreadBin(), initialize_threadpool(), G4TaskRunManagerKernel::InitializeWorker(), ThreadPool(), and PTL::UserTaskQueue::UserTaskQueue().

◆ get_thread() [1/2]

Thread * PTL::ThreadPool::get_thread ( size_type  _n) const

◆ get_thread() [2/2]

Thread * PTL::ThreadPool::get_thread ( std::thread::id  id) const

◆ get_thread_ids()

const ThreadPool::thread_id_map_t & ThreadPool::get_thread_ids ( )
static

Definition at line 114 of file ThreadPool.cc.

115{
116 return f_thread_ids;
117}

References f_thread_ids.

◆ get_valid_queue()

ThreadPool::task_queue_t *& ThreadPool::get_valid_queue ( task_queue_t *&  _queue) const

Definition at line 536 of file ThreadPool.cc.

537{
538 if(!_queue)
539 _queue = new UserTaskQueue{ static_cast<intmax_t>(m_pool_size) };
540 return _queue;
541}

References m_pool_size.

Referenced by add_tasks(), and insert().

◆ get_verbose()

int PTL::ThreadPool::get_verbose ( ) const
inline

Definition at line 182 of file ThreadPool.hh.

182{ return m_verbose; }

References m_verbose.

Referenced by destroy_threadpool(), execute_on_all_threads(), and execute_on_specific_threads().

◆ initialize_threadpool()

ThreadPool::size_type ThreadPool::initialize_threadpool ( size_type  proposed_size)

Definition at line 213 of file ThreadPool.cc.

214{
215 //--------------------------------------------------------------------//
216 // return before initializing
217 if(proposed_size < 1)
218 return 0;
219
220 //--------------------------------------------------------------------//
221 // store that has been started
222 if(!m_alive_flag->load())
224
225#if defined(PTL_USE_TBB)
226 //--------------------------------------------------------------------//
227 // handle tbb task scheduler
228 if(f_use_tbb || m_tbb_tp)
229 {
230 m_tbb_tp = true;
231 m_pool_size = proposed_size;
232 tbb_global_control_t*& _global_control = tbb_global_control();
233 // delete if wrong size
234 if(m_pool_size != proposed_size)
235 {
236 delete _global_control;
237 _global_control = nullptr;
238 }
239
240 if(!_global_control)
241 {
242 _global_control = new tbb_global_control_t(
244 if(m_verbose > 0)
245 {
246 std::cout << "ThreadPool [TBB] initialized with " << m_pool_size
247 << " threads." << std::endl;
248 }
249 }
250
251 // create task group (used for async)
254 return m_pool_size;
255 }
256#endif
257
258 m_alive_flag->store(true);
259
260 //--------------------------------------------------------------------//
261 // if started, stop some thread if smaller or return if equal
263 {
264 if(m_pool_size > proposed_size)
265 {
266 while(stop_thread() > proposed_size)
267 ;
268 if(m_verbose > 0)
269 {
270 std::cout << "ThreadPool initialized with " << m_pool_size << " threads."
271 << std::endl;
272 }
273 if(!m_task_queue)
274 {
275 m_delete_task_queue = true;
277 }
278 return m_pool_size;
279 }
280 else if(m_pool_size == proposed_size) // NOLINT
281 {
282 if(m_verbose > 0)
283 {
284 std::cout << "ThreadPool initialized with " << m_pool_size << " threads."
285 << std::endl;
286 }
287 if(!m_task_queue)
288 {
289 m_delete_task_queue = true;
291 }
292 return m_pool_size;
293 }
294 }
295
296 //--------------------------------------------------------------------//
297 // reserve enough space to prevent realloc later
298 {
299 AutoLock _task_lock(*m_task_lock);
300 m_is_joined.reserve(proposed_size);
301 }
302
303 if(!m_task_queue)
304 m_task_queue = new UserTaskQueue(proposed_size);
305
306 auto this_tid = get_this_thread_id();
307 for(size_type i = m_pool_size; i < proposed_size; ++i)
308 {
309 // add the threads
310 try
311 {
312 // create thread
314 this_tid + i + 1 };
315 // only reaches here if successful creation of thread
316 ++m_pool_size;
317 // store thread
318 m_main_threads.push_back(thr.get_id());
319 // list of joined thread booleans
320 m_is_joined.push_back(false);
321 // set the affinity
323 set_affinity(i, thr);
324 // store
325 m_threads.emplace_back(std::move(thr));
326 } catch(std::runtime_error& e)
327 {
328 std::cerr << e.what() << std::endl; // issue creating thread
329 continue;
330 } catch(std::bad_alloc& e)
331 {
332 std::cerr << e.what() << std::endl;
333 continue;
334 }
335 }
336 //------------------------------------------------------------------------//
337
338 AutoLock _task_lock(*m_task_lock);
339
340 // thread pool size doesn't match with join vector
341 // this will screw up joining later
342 if(m_is_joined.size() != m_main_threads.size())
343 {
344 std::stringstream ss;
345 ss << "ThreadPool::initialize_threadpool - boolean is_joined vector "
346 << "is a different size than threads vector: " << m_is_joined.size() << " vs. "
347 << m_main_threads.size() << " (tid: " << std::this_thread::get_id() << ")";
348
349 throw std::runtime_error(ss.str());
350 }
351
352 if(m_verbose > 0)
353 {
354 std::cout << "ThreadPool initialized with " << m_pool_size << " threads."
355 << std::endl;
356 }
357
358 return m_main_threads.size();
359}
static void start_thread(ThreadPool *, thread_data_t *, intmax_t=-1)
Definition: ThreadPool.cc:72
void set_affinity(affinity_func_t f)
Definition: ThreadPool.hh:178
size_type stop_thread()
Definition: ThreadPool.cc:495
static PTL_DLL bool f_use_tbb
Definition: ThreadPool.hh:266
static const short STARTED
Definition: Globals.hh:212
tbb::task_group tbb_task_group_t
Definition: ThreadData.hh:120
tbb::global_control tbb_global_control_t
Definition: ThreadData.hh:119
std::thread Thread
Definition: Threading.hh:128

References f_use_tbb, get_this_thread_id(), m_alive_flag, m_delete_task_queue, m_is_joined, m_main_threads, m_pool_size, m_pool_state, m_task_lock, m_task_queue, m_tbb_task_group, m_tbb_tp, m_thread_data, m_threads, m_use_affinity, m_verbose, PTL::tbb::global_control::max_allowed_parallelism, set_affinity(), start_thread(), PTL::thread_pool::state::STARTED, stop_thread(), and tbb_global_control().

Referenced by resize(), and ThreadPool().

◆ insert()

int PTL::ThreadPool::insert ( task_pointer &&  task,
int  bin = -1 
)
inline protected

Definition at line 369 of file ThreadPool.hh.

370{
371 static thread_local ThreadData* _data = ThreadData::GetInstance();
372
373 // pass the task to the queue
374 auto ibin = get_valid_queue(m_task_queue)->InsertTask(std::move(task), _data, bin);
375 notify();
376 return ibin;
377}
static ThreadData *& GetInstance()
Definition: ThreadData.cc:35

References get_valid_queue(), PTL::ThreadData::GetInstance(), PTL::VUserTaskQueue::InsertTask(), m_task_queue, and notify().

Referenced by add_task().

◆ is_alive()

bool PTL::ThreadPool::is_alive ( )
inline

Definition at line 168 of file ThreadPool.hh.

168{ return m_alive_flag->load(); }

References m_alive_flag.

◆ is_initialized()

bool ThreadPool::is_initialized ( ) const

Definition at line 183 of file ThreadPool.cc.

184{
185 return !(m_pool_state->load() == thread_pool::state::NONINIT);
186}

References m_pool_state, and PTL::thread_pool::state::NONINIT.

◆ is_main()

bool PTL::ThreadPool::is_main ( ) const
inline

Definition at line 183 of file ThreadPool.hh.

183{ return ThisThread::get_id() == m_main_tid; }
ThreadId m_main_tid
Definition: ThreadPool.hh:235

References m_main_tid.

Referenced by execute_on_all_threads().

◆ is_tbb_threadpool()

bool PTL::ThreadPool::is_tbb_threadpool ( ) const
inline

Definition at line 130 of file ThreadPool.hh.

130{ return m_tbb_tp; }

References m_tbb_tp.

Referenced by G4TaskRunManager::InitializeThreadPool().

◆ notify() [1/2]

void PTL::ThreadPool::notify ( )
inline

Definition at line 271 of file ThreadPool.hh.

272{
273 // wake up one thread that is waiting for a task to be available
275 {
277 m_task_cond->notify_one();
278 }
279}

References m_pool_size, m_task_cond, m_task_lock, and m_thread_awake.

Referenced by add_tasks(), and insert().

◆ notify() [2/2]

void PTL::ThreadPool::notify ( size_type  ntasks)
inline

Definition at line 290 of file ThreadPool.hh.

291{
292 if(ntasks == 0)
293 return;
294
295 // wake up as many threads that tasks just added
297 {
299 if(ntasks < this->size())
300 {
301 for(size_type i = 0; i < ntasks; ++i)
302 m_task_cond->notify_one();
303 }
304 else
305 {
306 m_task_cond->notify_all();
307 }
308 }
309}

References m_pool_size, m_task_cond, m_task_lock, m_thread_awake, and size().

◆ notify_all()

void PTL::ThreadPool::notify_all ( )
inline

Definition at line 282 of file ThreadPool.hh.

283{
284 // wake all threads
286 m_task_cond->notify_all();
287}

References m_task_cond, and m_task_lock.

◆ operator=() [1/2]

ThreadPool & PTL::ThreadPool::operator= ( const ThreadPool & )
delete

◆ operator=() [2/2]

ThreadPool & PTL::ThreadPool::operator= ( ThreadPool &&  )
default

◆ record_entry()

void PTL::ThreadPool::record_entry ( )
inline protected

Definition at line 215 of file ThreadPool.hh.

216 {
218 ++(*m_thread_active);
219 }

References m_thread_active.

◆ record_exit()

void PTL::ThreadPool::record_exit ( )
inline protected

Definition at line 221 of file ThreadPool.hh.

222 {
224 --(*m_thread_active);
225 }

References m_thread_active.

◆ reset_initialization()

void PTL::ThreadPool::reset_initialization ( )
inline

Definition at line 153 of file ThreadPool.hh.

154 {
155 auto f = []() {};
156 m_init_func = f;
157 }

References m_init_func.

◆ resize()

void PTL::ThreadPool::resize ( size_type  _n)
inline

Definition at line 342 of file ThreadPool.hh.

343{
344 if(_n == m_pool_size)
345 return;
347 m_task_queue->resize(static_cast<intmax_t>(_n));
348}
virtual void resize(intmax_t)=0

References initialize_threadpool(), m_pool_size, m_task_queue, and PTL::VUserTaskQueue::resize().

Referenced by PTL::TaskRunManager::Initialize(), and G4TaskRunManager::SetNumberOfThreads().

◆ run_on_this()

int PTL::ThreadPool::run_on_this ( task_pointer &&  _task)
inline protected

Definition at line 351 of file ThreadPool.hh.

352{
353 auto&& _func = [_task]() { (*_task)(); };
354
356 {
357 auto _arena = get_task_arena();
358 _arena->execute([this, _func]() { this->m_tbb_task_group->run(_func); });
359 }
360 else
361 {
362 _func();
363 }
364 // return the number of tasks added to task-list
365 return 0;
366}

References get_task_arena(), m_tbb_task_group, m_tbb_tp, and PTL::tbb::task_group::run().

Referenced by add_task().

◆ set_affinity() [1/2]

void PTL::ThreadPool::set_affinity ( affinity_func_t  f)
inline

Definition at line 178 of file ThreadPool.hh.

178{ m_affinity_func = f; }

References m_affinity_func.

Referenced by initialize_threadpool().

◆ set_affinity() [2/2]

void ThreadPool::set_affinity ( intmax_t  i,
Thread & _thread
)

Definition at line 191 of file ThreadPool.cc.

192{
193 try
194 {
195 NativeThread native_thread = _thread.native_handle();
196 intmax_t _pin = m_affinity_func(i);
197 if(m_verbose > 0)
198 {
199 std::cout << "Setting pin affinity for thread " << _thread.get_id() << " to "
200 << _pin << std::endl;
201 }
202 Threading::SetPinAffinity(_pin, native_thread);
203 } catch(std::runtime_error& e)
204 {
205 std::cout << "Error setting pin affinity" << std::endl;
206 std::cerr << e.what() << std::endl; // issue assigning affinity
207 }
208}
bool SetPinAffinity(int idx, NativeThread &at)
Definition: Threading.cc:82
std::thread::native_handle_type NativeThread
Definition: Threading.hh:129

References m_affinity_func, m_verbose, and PTL::Threading::SetPinAffinity().

◆ set_initialization()

void PTL::ThreadPool::set_initialization ( initialize_func_t  f)
inline

Definition at line 152 of file ThreadPool.hh.

152{ m_init_func = f; }

References m_init_func.

◆ set_use_tbb()

void ThreadPool::set_use_tbb ( bool  val)
static

Definition at line 102 of file ThreadPool.cc.

103{
104#if defined(PTL_USE_TBB)
105 f_use_tbb = enable;
106#else
107 ConsumeParameters<bool>(enable);
108#endif
109}

References f_use_tbb.

Referenced by G4TaskRunManager::G4TaskRunManager(), and PTL::TaskRunManager::TaskRunManager().

◆ set_verbose()

void PTL::ThreadPool::set_verbose ( int  n)
inline

Definition at line 181 of file ThreadPool.hh.

References m_verbose, and CLHEP::detail::n.

◆ size()

size_type PTL::ThreadPool::size ( ) const
inline

◆ start_thread()

void ThreadPool::start_thread ( ThreadPool * tp,
thread_data_t * _data,
intmax_t  _idx = -1 
)
static protected

Definition at line 72 of file ThreadPool.cc.

73{
74 auto _thr_data = std::make_shared<ThreadData>(tp);
75 {
76 AutoLock lock(TypeMutex<ThreadPool>(), std::defer_lock);
77 if(!lock.owns_lock())
78 lock.lock();
79 if(_idx < 0)
80 _idx = f_thread_ids.size();
81 f_thread_ids[std::this_thread::get_id()] = _idx;
83 _data->emplace_back(_thr_data);
84 }
85 thread_data() = _thr_data.get();
86 tp->record_entry();
87 tp->execute_thread(thread_data()->current_queue);
88 tp->record_exit();
89}

References f_thread_ids, PTL::Threading::SetThreadId(), anonymous_namespace{ThreadPool.cc}::thread_data(), and G4InuclParticleNames::tp.

Referenced by initialize_threadpool().

◆ state()

const pool_state_type & PTL::ThreadPool::state ( ) const
inline

Definition at line 161 of file ThreadPool.hh.

161{ return m_pool_state; }

References m_pool_state.

◆ stop_thread()

ThreadPool::size_type ThreadPool::stop_thread ( )

Definition at line 495 of file ThreadPool.cc.

496{
497 if(!m_alive_flag->load() || m_pool_size == 0)
498 return 0;
499
500 //------------------------------------------------------------------------//
501 // notify all threads we are shutting down
502 m_task_lock->lock();
503 m_is_stopped.push_back(true);
504 m_task_cond->notify_one();
505 m_task_lock->unlock();
506 //------------------------------------------------------------------------//
507
508 // lock up the task queue
509 AutoLock _task_lock(*m_task_lock);
510
511 while(!m_stop_threads.empty())
512 {
513 auto tid = m_stop_threads.front();
514 // remove from stopped
515 m_stop_threads.pop_front();
516 // remove from main
517 for(auto itr = m_main_threads.begin(); itr != m_main_threads.end(); ++itr)
518 {
519 if(*itr == tid)
520 {
521 m_main_threads.erase(itr);
522 break;
523 }
524 }
525 // remove from join list
526 m_is_joined.pop_back();
527 }
528
530 return m_main_threads.size();
531}

References m_alive_flag, m_is_joined, m_is_stopped, m_main_threads, m_pool_size, m_stop_threads, m_task_cond, and m_task_lock.

Referenced by initialize_threadpool().

◆ tbb_global_control()

tbb_global_control_t *& PTL::ThreadPool::tbb_global_control ( )
inline static

Definition at line 313 of file ThreadPool.hh.

314{
315 static thread_local tbb_global_control_t* _instance = nullptr;
316 return _instance;
317}

Referenced by destroy_threadpool(), execute_on_all_threads(), get_task_arena(), and initialize_threadpool().

◆ using_affinity()

bool PTL::ThreadPool::using_affinity ( ) const
inline

Definition at line 167 of file ThreadPool.hh.

167{ return m_use_affinity; }

References m_use_affinity.

◆ using_tbb()

bool ThreadPool::using_tbb ( )
static

Definition at line 94 of file ThreadPool.cc.

95{
96 return f_use_tbb;
97}

References f_use_tbb.

Referenced by PTL::TaskRunManager::Initialize().

Field Documentation

◆ f_thread_ids

ThreadPool::thread_id_map_t ThreadPool::f_thread_ids
static private

◆ f_use_tbb

bool ThreadPool::f_use_tbb = false
static private

Definition at line 266 of file ThreadPool.hh.

Referenced by initialize_threadpool(), set_use_tbb(), and using_tbb().

◆ m_affinity_func

affinity_func_t PTL::ThreadPool::m_affinity_func
private

Definition at line 261 of file ThreadPool.hh.

Referenced by set_affinity().

◆ m_alive_flag

atomic_bool_type PTL::ThreadPool::m_alive_flag = std::make_shared<std::atomic_bool>(false)
private

◆ m_delete_task_queue

bool PTL::ThreadPool::m_delete_task_queue = false
private

Definition at line 232 of file ThreadPool.hh.

Referenced by destroy_threadpool(), and initialize_threadpool().

◆ m_init_func

initialize_func_t PTL::ThreadPool::m_init_func = []() {}
private

Definition at line 260 of file ThreadPool.hh.

Referenced by execute_thread(), reset_initialization(), and set_initialization().

◆ m_is_joined

bool_list_t PTL::ThreadPool::m_is_joined {}
private

Definition at line 247 of file ThreadPool.hh.

Referenced by destroy_threadpool(), initialize_threadpool(), and stop_thread().

◆ m_is_stopped

bool_list_t PTL::ThreadPool::m_is_stopped {}
private

Definition at line 248 of file ThreadPool.hh.

Referenced by execute_thread(), and stop_thread().

◆ m_main_threads

thread_list_t PTL::ThreadPool::m_main_threads {}
private

Definition at line 249 of file ThreadPool.hh.

Referenced by destroy_threadpool(), initialize_threadpool(), and stop_thread().

◆ m_main_tid

ThreadId PTL::ThreadPool::m_main_tid = ThisThread::get_id()
private

Definition at line 235 of file ThreadPool.hh.

Referenced by is_main().

◆ m_pool_size

size_type PTL::ThreadPool::m_pool_size = 0
private

◆ m_pool_state

pool_state_type PTL::ThreadPool::m_pool_state = std::make_shared<std::atomic_short>(0)
private

◆ m_stop_threads

thread_list_t PTL::ThreadPool::m_stop_threads {}
private

Definition at line 250 of file ThreadPool.hh.

Referenced by execute_thread(), and stop_thread().

◆ m_task_cond

condition_t PTL::ThreadPool::m_task_cond = std::make_shared<Condition>()
private

◆ m_task_lock

lock_t PTL::ThreadPool::m_task_lock = std::make_shared<Mutex>()
private

◆ m_task_queue

task_queue_t* PTL::ThreadPool::m_task_queue = nullptr
private

◆ m_tbb_task_arena

tbb_task_arena_t* PTL::ThreadPool::m_tbb_task_arena = nullptr
private

Definition at line 256 of file ThreadPool.hh.

Referenced by destroy_threadpool(), and get_task_arena().

◆ m_tbb_task_group

tbb_task_group_t* PTL::ThreadPool::m_tbb_task_group = nullptr
private

◆ m_tbb_tp

bool PTL::ThreadPool::m_tbb_tp = false
private

◆ m_thread_active

atomic_int_type PTL::ThreadPool::m_thread_active = std::make_shared<std::atomic_uintmax_t>()
private

Definition at line 239 of file ThreadPool.hh.

Referenced by destroy_threadpool(), record_entry(), and record_exit().

◆ m_thread_awake

atomic_int_type PTL::ThreadPool::m_thread_awake = std::make_shared<std::atomic_uintmax_t>()
private

Definition at line 238 of file ThreadPool.hh.

Referenced by execute_thread(), get_active_threads_count(), and notify().

◆ m_thread_data

thread_data_t PTL::ThreadPool::m_thread_data {}
private

Definition at line 252 of file ThreadPool.hh.

Referenced by destroy_threadpool(), and initialize_threadpool().

◆ m_threads

thread_vec_t PTL::ThreadPool::m_threads {}
private

Definition at line 251 of file ThreadPool.hh.

Referenced by destroy_threadpool(), initialize_threadpool(), and ~ThreadPool().

◆ m_use_affinity

bool PTL::ThreadPool::m_use_affinity = false
private

Definition at line 230 of file ThreadPool.hh.

Referenced by initialize_threadpool(), and using_affinity().

◆ m_verbose

int PTL::ThreadPool::m_verbose = GetEnv<int>("PTL_VERBOSE", 0)
private

The documentation for this class was generated from the following files: