32#ifndef MADNESS_WORLD_THREAD_H__INCLUDED
33#define MADNESS_WORLD_THREAD_H__INCLUDED
60#ifdef MADNESS_TASK_PROFILING
66 extern char * cplus_demangle (
const char *mangled,
int options);
75#include <tbb/task_arena.h>
76#ifndef TBB_PREVIEW_GLOBAL_CONTROL
77# define TBB_PREVIEW_GLOBAL_CONTROL 1
79# include <tbb/global_control.h>
83#ifndef _SC_NPROCESSORS_CONF
87#include <sys/sysctl.h>
97 void error(
const char *msg);
118 sched_getaffinity(0,
sizeof(
mask), &
mask);
119 for (
size_t i=0; i<
maxncpu; i++) {
120 if (CPU_ISSET(
int(i),&
mask)) {
126 std::cout <<
"ncpu: " <<
get_ncpu() << std::endl;
127 for (
size_t i=0; i<
get_ncpu(); i++) {
130 std::cout << std::endl;
134 if (this->print) { };
151 sched_setaffinity(0,
sizeof(
mask), &
mask);
152 if (
print) std::cout <<
"bound thread to " << cpu << std::endl;
158 extern ThreadBinder
binder;
180 static void*
main(
void* self);
187 const int rc = pthread_key_create(&
thread_key,
nullptr);
205#if defined(HAVE_IBMBGQ) and defined(HPM)
206 static const int hpm_thread_id_all = -10;
207 static const int hpm_thread_id_main = -2;
208 static bool main_instrumented;
209 static bool all_instrumented;
210 static int hpm_thread_id;
250 return pthread_cancel(
get_id());
267#if defined(HAVE_IBMBGQ) and defined(HPM)
272 static void set_hpm_thread_env(
int hpm_thread_id);
391 else flags &= ~STEALABLE;
402 flags &= ~HIGHPRIORITY;
439 template <
typename Archive>
531#ifdef MADNESS_TASK_PROFILING
533 namespace profiling {
542 std::pair<void*, unsigned short>
id_;
558 const char*
name = abi::__cxa_demangle(symbol, 0, 0, &
status);
560 char*
name = cplus_demangle(symbol, DMGL_NO_OPTS);
567 os << symbol <<
"\t";
581 void*
const * func_ptr =
const_cast<void*
const *
>(&
id_.first);
582 char** bt_sym = backtrace_symbols(func_ptr, 1);
586 std::string mangled_name;
591 std::istringstream iss(bt_sym[0]);
593 std::string file, address;
594 iss >> frame >> file >> address >> mangled_name;
598 const char* first = strchr(bt_sym[0],
'(');
601 const char* last = strrchr(first,
'+');
603 mangled_name.assign(first, (last - first) - 1);
624 void start(
const std::pair<void*, unsigned short>&
id,
625 const unsigned short threads,
const double submit_time)
653 os << std::hex << std::showbase << te.
id_.first <<
654 std::dec << std::noshowbase <<
"\t";
657 switch(te.
id_.second) {
660 const std::string mangled_name = te.
get_name();
663 if(! mangled_name.empty())
679 const std::streamsize precision = os.precision();
681 os << std::fixed <<
"\t" << te.
times_[0]
683 os.precision(precision);
783 for(std::size_t i = 0; i <
n_; ++i)
784 os << thread_id <<
"\t" <<
events_[i] << std::endl;
823 while(
head_ !=
nullptr) {
840 if(
head_ !=
nullptr) {
878#ifdef MADNESS_TASK_PROFILING
881 std::pair<void*, unsigned short>
id_;
904 template <
typename T>
919 template <
typename fnT>
920 static typename std::enable_if<detail::function_traits<fnT>::value ||
922 make_id(std::pair<void*,unsigned short>&
id, fnT fn) {
934 template <
typename fnobjT>
937 make_id(std::pair<void*,unsigned short>&
id,
const fnobjT&) {
938 id.first =
reinterpret_cast<void*
>(
const_cast<char*
>(
typeid(fnobjT).
name()));
948 virtual void get_id(std::pair<void*,unsigned short>&
id)
const {
953#ifndef HAVE_INTEL_TBB
961 bool run_multi_threaded() {
968#ifdef MADNESS_TASK_PROFILING
972#ifdef MADNESS_TASK_PROFILING
979 volatile bool barrier_flag;
982#ifdef MADNESS_TASK_PROFILING
987 run(TaskThreadEnv(nthread,
id, barrier));
989#ifdef MADNESS_TASK_PROFILING
990 const bool cleanup = barrier->
enter(
id);
994 return barrier->
enter(
id);
1030 ParsecRuntime::delete_parsec_task(parsec_task);
1031 parsec_task =
nullptr;
1048 barrier =
new Barrier(nthread);
1055 parsec_task_t *parsec_task;
1131 virtual void get_id(std::pair<void*,unsigned short>&
id)
const {
1143#ifdef MADNESS_TASK_PROFILING
1151#ifdef MADNESS_TASK_PROFILING
1195#if defined(HAVE_IBMBGQ) and defined(HPM)
1196 static unsigned int main_hpmctx;
1215 if (!wait &&
queue.empty())
return false;
1216 std::pair<PoolTaskInterface*,bool> t =
queue.pop_front(wait);
1217#ifdef MADNESS_TASK_PROFILING
1222 if (t.second && t.first) {
1223#ifdef MADNESS_TASK_PROFILING
1224 t.first->set_event(event_list->
event());
1226 if (t.first->run_multi_threaded())
1255 int ntask =
queue.pop_front(
nmax, taskbuf, wait);
1256#ifdef MADNESS_TASK_PROFILING
1260 for (
int i=0; i<ntask; ++i) {
1262#ifdef MADNESS_TASK_PROFILING
1265 if (taskbuf[i]->run_multi_threaded()) {
1273 ntask = parsec_runtime->test();
1299#ifndef MADNESS_ASSERTIONS_DISABLE
1301 std::cerr <<
"!!! ERROR: The thread pool has not been initialized.\n"
1302 <<
"!!! ERROR: Call madness::initialize before submitting tasks to the task queue.\n";
1310#if !(defined(HAVE_INTEL_TBB) || defined(HAVE_PARSEC))
1311 queue.lock_and_flush_prebuf();
1317 static ParsecRuntime *parsec_runtime;
1330 static void begin(
int nthread=-1);
1340#ifdef MADNESS_TASK_PROFILING
1346 parsec_runtime->schedule(
task);
1356 [task_p = std::unique_ptr<PoolTaskInterface>(
task)] ()
noexcept {
1362 int task_threads =
task->get_nthread();
1365 if (
task->is_high_priority() && (task_threads == 1)) {
1379 template <
typename opT>
1387 static void add(
const std::vector<PoolTaskInterface*>& tasks) {
1389 MADNESS_EXCEPTION(
"Do not add tasks to the madness task queue when using Intel TBB.", 1);
1391 typedef std::vector<PoolTaskInterface*>::const_iterator iteratorT;
1392 for (iteratorT it=tasks.begin(); it!=tasks.end(); ++it) {
1402#ifdef HAVE_INTEL_TBB
1406#ifdef MADNESS_TASK_PROFILING
1449 template <
typename Probe>
1450 static void await(
const Probe& probe,
bool dowork =
true,
bool sleep =
false) {
1470 if(((
current_time - start) > timeout) && (timeout > 1.0)) {
1471 std::cerr <<
"!!MADNESS: Hung queue?" << std::endl;
1472 if (counter++ > 3) {
1475 snprintf(errstr,
bufsize,
"ThreadPool::await() timed out after %.1lf seconds", timeout);
1477 __LINE__, __FUNCTION__,
1511 delete parsec_runtime;
1522 int sleep_duration_in_microseconds = 0) {
1523#if !HAVE_INTEL_TBB && !HAVE_PARSEC
1525 sleep_duration_in_microseconds);
1542 int sleep_duration_in_microseconds = 0) {
1549#if !(defined(HAVE_PARSEC) || defined(HAVE_INTEL_TBB))
1555 template<
class F,
class... Args>
1557 noexcept(std::is_nothrow_invocable_v<
F, Args...>) {
1559 return std::invoke(std::forward<F>(
f), std::forward<Args>(args)...);
1562 template<
class R,
class F,
class... Args>
1564 noexcept(std::is_nothrow_invocable_v<
F, Args...>) {
1566#if __cplusplus < 202302L
1567 if constexpr (std::is_void_v<R>)
1568 std::invoke(std::forward<F>(
f), std::forward<Args>(args)...);
1570 return std::invoke(std::forward<F>(
f), std::forward<Args>(args)...);
1572 return std::invoke_r<R>(std::forward<F>(
f), std::forward<Args>(args)...);
simple class for testing the solver
Definition derivatives.cc:60
An integer with atomic set, get, read+increment, read+decrement, and decrement+test operations.
Definition atomicint.h:126
Definition worldmutex.h:700
bool enter(const int id)
Each thread calls this with its id (0,..,nthread-1) to enter the barrier.
Definition worldmutex.h:729
void register_thread(int id, volatile bool *pflag)
Each thread calls this once before first use.
Definition worldmutex.h:718
A thread safe, fast but simple doubled-ended queue.
Definition dqueue.h:80
Base class for exceptions thrown in MADNESS.
Definition madness_exception.h:66
Definition worldmutex.h:109
void wait()
Definition worldmutex.cc:103
void reset()
Definition worldmutex.h:124
Mutex using pthread mutex operations.
Definition worldmutex.h:131
Lowest level task interface.
Definition thread.h:873
double submit_time_
Definition thread.h:880
void execute()
Definition thread.h:1093
virtual ~PoolTaskInterface()=default
Destructor.
static std::enable_if<!(detail::function_traits< fnobjT >::value||detail::memfunc_traits< fnobjT >::value)>::type make_id(std::pair< void *, unsigned short > &id, const fnobjT &)
Definition thread.h:937
profiling::TaskEvent * task_event_
Definition thread.h:879
PoolTaskInterface()
Default constructor.
Definition thread.h:1064
virtual void run(const TaskThreadEnv &info)=0
Override this method to implement a multi-threaded task.
PoolTaskInterface(const TaskAttributes &attr)
Definition thread.h:1071
std::pair< void *, unsigned short > id_
Definition thread.h:881
void set_event(profiling::TaskEvent *task_event)
Definition thread.h:887
void submit()
Collect info on the task and record the submit time.
Definition thread.h:892
static std::enable_if< detail::function_traits< fnT >::value||detail::memfunc_traits< fnT >::value >::type make_id(std::pair< void *, unsigned short > &id, fnT fn)
Definition thread.h:922
virtual void get_id(std::pair< void *, unsigned short > &id) const
Definition thread.h:948
void set_nthread(int nthread)
Call this to reset the number of threads before the task is submitted.
Definition thread.h:1084
A no-operation task used for various purposes.
Definition thread.h:1118
void run(const TaskThreadEnv &)
Execution function that does nothing.
Definition thread.h:1121
virtual ~PoolTaskNull()
Destructor.
Definition thread.h:1124
virtual void get_id(std::pair< void *, unsigned short > &id) const
Definition thread.h:1131
Contains attributes of a task.
Definition thread.h:329
TaskAttributes & set_highpriority(bool hipri)
Sets the high priority attribute.
Definition thread.h:398
TaskAttributes(const TaskAttributes &attr)
Copy constructor.
Definition thread.h:349
bool is_generator() const
Test if the generator attribute is true.
Definition thread.h:357
static TaskAttributes hipri()
Definition thread.h:456
TaskAttributes & set_generator(bool generator_hint)
Sets the generator attribute.
Definition thread.h:378
static const unsigned long GENERATOR
Mask for generator bit.
Definition thread.h:334
static const unsigned long STEALABLE
Mask for stealable bit.
Definition thread.h:335
bool is_stealable() const
Test if the stealable attribute is true.
Definition thread.h:364
TaskAttributes(unsigned long flags=0)
Sets the attributes to the desired values.
Definition thread.h:343
static const unsigned long NTHREAD
Mask for nthread byte.
Definition thread.h:333
void set_nthread(int nthread)
Set the number of threads.
Definition thread.h:420
static TaskAttributes generator()
Definition thread.h:448
bool is_high_priority() const
Test if the high priority attribute is true.
Definition thread.h:371
TaskAttributes & set_stealable(bool stealable)
Sets the stealable attribute.
Definition thread.h:389
static TaskAttributes multi_threaded(int nthread)
Definition thread.h:464
void serialize(Archive &ar)
Serializes the attributes for I/O.
Definition thread.h:440
unsigned long flags
Byte-string storing the specified attributes.
Definition thread.h:330
int get_nthread() const
Get the number of threads.
Definition thread.h:428
static const unsigned long HIGHPRIORITY
Mask for priority bit.
Definition thread.h:336
virtual ~TaskAttributes()
Definition thread.h:352
Used to pass information about the thread environment to a user's task.
Definition thread.h:472
int id() const
Get the ID of this thread.
Definition thread.h:512
TaskThreadEnv(int nthread, int id, Barrier *barrier)
Constructor collecting necessary environmental information.
Definition thread.h:484
TaskThreadEnv(int nthread, int id)
Constructor collecting necessary environmental information.
Definition thread.h:497
bool barrier() const
Definition thread.h:520
Barrier * _barrier
Pointer to the shared barrier, null if there is only a single thread.
Definition thread.h:475
const int _nthread
Number of threads collaborating on task.
Definition thread.h:473
int nthread() const
Get the number of threads collaborating on this task.
Definition thread.h:505
const int _id
ID of this thread (0,...,nthread-1).
Definition thread.h:474
Simplified thread wrapper to hide pthread complexity.
Definition thread.h:170
virtual void run()=0
Function to be executed by the thread.
int pool_num
Stores index of thread in pool or -1.
Definition thread.h:182
void set_pool_thread_index(int i)
Sets the index of this thread within the pool.
Definition thread.h:201
static void exit()
A thread can call this to terminate its execution.
Definition thread.h:232
ThreadBase()
Default constructor.
Definition thread.h:219
const pthread_t & get_id() const
Get the pthread id of this thread (if running).
Definition thread.h:237
static pthread_key_t thread_key
Thread id key.
Definition thread.h:173
static ThreadBase * this_thread()
Definition thread.h:263
int cancel() const
Cancel this thread.
Definition thread.h:249
pthread_t id
Definition thread.h:183
static void delete_thread_key()
Definition thread.h:193
static int num_hw_processors()
Get number of actual hardware processors.
Definition thread.cc:174
void start()
Start the thread running.
Definition thread.cc:158
virtual ~ThreadBase()
Definition thread.h:221
static void init_thread_key()
Definition thread.h:186
int get_pool_thread_index() const
Get index of this thread in ThreadPool.
Definition thread.h:244
bool do_bind
Definition thread.h:107
void bind()
Definition thread.h:143
ThreadBinder(bool print=false)
Definition thread.h:114
size_t cpus[maxncpu]
Definition thread.h:108
std::atomic< size_t > nextcpu
Definition thread.h:109
const size_t * get_cpus() const
Definition thread.h:139
bool print
Definition thread.h:105
static const size_t maxncpu
Definition thread.h:104
static thread_local bool bound
Definition thread.h:110
void set_do_bind(bool value)
Definition thread.h:137
size_t ncpu
Definition thread.h:106
const size_t get_ncpu() const
Definition thread.h:141
ThreadPool thread object.
Definition thread.h:1140
ThreadPoolThread()
Definition thread.h:1148
profiling::TaskProfiler profiler_
Definition thread.h:1144
profiling::TaskProfiler & profiler()
Task profiler accessor.
Definition thread.h:1156
virtual ~ThreadPoolThread()=default
A singleton pool of threads for dynamic execution of tasks.
Definition thread.h:1166
static void add(PoolTaskInterface *task)
Add a new task to the pool.
Definition thread.h:1339
int nthreads
Number of threads.
Definition thread.h:1186
static std::unique_ptr< tbb::task_arena > tbb_arena
Definition thread.h:1323
static bool run_task()
An otherwise idle thread can all this to run a task.
Definition thread.h:1401
static void add(const std::vector< PoolTaskInterface * > &tasks)
Add a vector of tasks to the pool.
Definition thread.h:1387
ThreadPool(ThreadPool &&)=delete
void operator=(ThreadPool &&)=delete
volatile bool finish
Set to true when time to stop.
Definition thread.h:1187
ThreadPool(const ThreadPool &)=delete
static void set_wait_policy(WaitPolicy policy, int sleep_duration_in_microseconds=0)
Definition thread.h:1520
static int default_nthread()
Get the number of threads from the environment.
Definition thread.cc:325
static const DQStats & get_stats()
Returns queue statistics.
Definition thread.cc:466
static void end()
Definition thread.cc:437
AtomicInt nfinished
Thread pool exit counter.
Definition thread.h:1188
void operator=(const ThreadPool &)=delete
static ThreadPool * instance_ptr
Singleton pointer.
Definition thread.h:1191
ThreadPoolThread * threads
Array of threads.
Definition thread.h:1183
void thread_main(ThreadPoolThread *const thread)
Definition thread.cc:349
~ThreadPool()
Destructor.
Definition thread.h:1508
void scan(opT &op)
Definition thread.h:1380
void flush_prebuf()
Definition thread.h:1309
static void begin(int nthread=-1)
Please invoke while in a single-threaded environment.
Definition thread.cc:379
static ThreadPool * instance()
Return a pointer to the only instance, constructing as necessary.
Definition thread.h:1298
static double await_timeout
Waiter timeout.
Definition thread.h:1193
DQueue< PoolTaskInterface * > queue
Queue of tasks.
Definition thread.h:1185
bool run_tasks(bool wait, ThreadPoolThread *const this_thread)
Definition thread.h:1239
static std::size_t queue_size()
Returns the number of tasks in the queue.
Definition thread.h:1426
ThreadPoolThread main_thread
Placeholder for main thread tls.
Definition thread.h:1184
static const int nmax
Number of task a worker thread will pop from the task queue.
Definition thread.h:1192
bool run_task(bool wait, ThreadPoolThread *this_thread)
Run the next task.
Definition thread.h:1210
static void * pool_thread_main(void *v)
Forwards the thread to bound member function.
Definition thread.cc:374
static void await(const Probe &probe, bool dowork=true, bool sleep=false)
Gracefully wait for a condition to become true, executing any tasks in the queue.
Definition thread.h:1450
static std::unique_ptr< tbb::global_control > tbb_control
Definition thread.h:1322
static const ThreadPoolThread * get_threads()
Definition thread.h:1437
static std::size_t size()
Returns the number of threads in the pool.
Definition thread.h:1419
Simplified thread wrapper to hide pthread complexity.
Definition thread.h:277
void start(void *(*f)(void *), void *args=nullptr)
Start the thread by running f(args).
Definition thread.h:305
virtual ~Thread()=default
Thread(void *(*f)(void *), void *args=nullptr)
Create a thread and start it running f(args).
Definition thread.h:296
Thread()
Default constructor.
Definition thread.h:290
void run()
Invokes the function for this thread.
Definition thread.h:282
void * args
The arguments passed to this thread for execution.
Definition thread.h:279
void *(* f)(void *)
The function called for executing this thread.
Definition thread.h:278
Multi-threaded queue to manage and run tasks.
Definition world_task_queue.h:319
Task event list base class.
Definition thread.h:692
virtual ~TaskEventListBase()=default
Virtual destructor.
TaskEventListBase * next_
The next task event in the list.
Definition thread.h:694
friend std::ostream & operator<<(std::ostream &os, const TaskEventListBase &tel)
Output a task event list to an output stream.
Definition thread.h:729
TaskEventListBase & operator=(const TaskEventListBase &)=delete
TaskEventListBase * next() const
Get the next event list in the linked list.
Definition thread.h:711
virtual std::ostream & print_events(std::ostream &) const =0
Print the events.
TaskEventListBase()
Default constructor.
Definition thread.h:702
void insert(TaskEventListBase *list)
Insert list after this list.
Definition thread.h:718
TaskEventListBase(const TaskEventListBase &)=delete
A list of task events.
Definition thread.h:743
TaskEvent * event()
Get a new event from this list.
Definition thread.h:771
TaskEventList & operator=(const TaskEventList &)=delete
TaskEventList(const unsigned int nmax)
Default constructor.
Definition thread.h:758
virtual std::ostream & print_events(std::ostream &os) const
Print events recorded in this list.
Definition thread.h:781
unsigned int n_
The number of events recorded.
Definition thread.h:745
TaskEventList(const TaskEventList &)=delete
virtual ~TaskEventList()=default
Virtual destructor.
std::unique_ptr< TaskEvent[]> events_
The event array.
Definition thread.h:746
Task event class.
Definition thread.h:539
std::string get_name() const
Get name of the function pointer.
Definition thread.h:577
friend std::ostream & operator<<(std::ostream &os, const TaskEvent &te)
Output the task data using a tab-separated list.
Definition thread.h:651
void start(const std::pair< void *, unsigned short > &id, const unsigned short threads, const double submit_time)
Record the start time of the task and collect task information.
Definition thread.h:624
static void print_demangled(std::ostream &os, const char *symbol)
Print demangled symbol name.
Definition thread.h:553
unsigned short threads_
Number of threads used by the task.
Definition thread.h:543
void stop()
Record the stop time of the task.
Definition thread.h:634
double times_[3]
Task trace times: { submit, start, stop }.
Definition thread.h:541
std::pair< void *, unsigned short > id_
Task identification information.
Definition thread.h:542
This class collects and prints task profiling data.
Definition thread.h:795
TaskEventListBase * head_
The head of the linked list of data.
Definition thread.h:797
TaskProfiler()
Default constructor.
Definition thread.h:815
void write_to_file()
Write the profile data to file.
Definition thread.cc:213
~TaskProfiler()
Destructor.
Definition thread.h:820
TaskEventList * new_list(const std::size_t nmax)
Create a new task event list.
Definition thread.h:835
TaskProfiler & operator=(const TaskProfiler &)=delete
TaskEventListBase * tail_
The tail of the linked list of data.
Definition thread.h:798
TaskProfiler(const TaskProfiler &)=delete
static const char * output_file_name_
The output file name.
Definition thread.h:811
static Mutex output_mutex_
Mutex used to lock the output file.
Definition thread.h:800
static const double R
Definition csqrt.cc:46
const std::size_t bufsize
Definition derivatives.cc:16
real_function_3d mask
Definition dirac-hatom.cc:27
auto T(World &world, response_space &f) -> response_space
Definition global_functions.cc:34
void threadpool_wait_policy(WaitPolicy policy, int sleep_duration_in_microseconds=0)
Definition thread.h:1541
static const double v
Definition hatom_sf_dirac.cc:20
Tensor< double > op(const Tensor< double > &x)
Definition kain.cc:508
#define MADNESS_CHECK(condition)
Check a condition — even in a release build the condition is always evaluated so it can have side eff...
Definition madness_exception.h:182
#define MADNESS_EXCEPTION(msg, value)
Macro for throwing a MADNESS exception.
Definition madness_exception.h:119
#define MADNESS_ASSERT(condition)
Assert a condition that should be free of side-effects since in release builds this might be a no-op.
Definition madness_exception.h:134
Namespace for all elements and tools of MADNESS.
Definition DFParameters.h:10
WaitPolicy
wait policies supported by ConditionVariable/DQueue/ThreadPool
Definition worldmutex.h:492
static double cpu_time()
Returns the cpu time in seconds relative to an arbitrary origin.
Definition timers.h:127
ThreadBinder binder
Definition thread.cc:71
void thread_purge()
Definition thread.h:1548
static void myusleep(unsigned int us)
Sleep or spin for specified number of microseconds.
Definition timers.h:164
bool is_madness_thread()
Definition thread_info.h:70
NDIM & f
Definition mra.h:2481
void error(const char *msg)
Definition world.cc:139
double wall_time()
Returns the wall time in seconds relative to an arbitrary origin.
Definition timers.cc:48
std::string type(const PairType &n)
Definition PNOParameters.h:18
constexpr R blocking_invoke_r(F &&f, Args &&... args) noexcept(std::is_nothrow_invocable_v< F, Args... >)
Definition thread.h:1563
constexpr decltype(auto) blocking_invoke(F &&f, Args &&... args) noexcept(std::is_nothrow_invocable_v< F, Args... >)
Definition thread.h:1556
std::string name(const FuncType &type, const int ex=-1)
Definition ccpairfunction.h:28
Function traits in the spirit of boost function traits.
Definition function_traits.h:13
Member function traits in the spirit of boost function traits.
Definition function_traits.h:21
static double current_time
Definition tdse1d.cc:160
int task(int i)
Definition test_runtime.cpp:4
int main()
Definition testmolbas.cc:37
const char * status[2]
Definition testperiodic.cc:43
Implements thread introspection for Pthreads backend.
Object that is used to convert function and member function pointers into void*.
Definition thread.h:905
T in
Definition thread.h:906
void * out
Definition thread.h:907