32 #ifndef MADNESS_WORLD_THREAD_H__INCLUDED
33 #define MADNESS_WORLD_THREAD_H__INCLUDED
48 #include <type_traits>
58 #ifdef MADNESS_TASK_PROFILING
64 extern char * cplus_demangle (
const char *mangled,
int options);
65 #define DMGL_NO_OPTS 0
73 #include <tbb/task_arena.h>
74 #ifndef TBB_PREVIEW_GLOBAL_CONTROL
75 # define TBB_PREVIEW_GLOBAL_CONTROL 1
77 # include <tbb/global_control.h>
81 #ifndef _SC_NPROCESSORS_CONF
84 #include <sys/types.h>
85 #include <sys/sysctl.h>
95 void error(
const char *msg);
112 sched_getaffinity(0,
sizeof(
mask), &
mask);
113 for (
size_t i=0; i<
maxncpu; i++) {
114 if (CPU_ISSET(
int(i),&
mask)) {
120 std::cout <<
"ncpu: " <<
get_ncpu() << std::endl;
121 for (
size_t i=0; i<
get_ncpu(); i++) {
124 std::cout << std::endl;
128 if (this->print) { };
145 sched_setaffinity(0,
sizeof(
mask), &
mask);
146 if (
print) std::cout <<
"bound thread to " << cpu << std::endl;
152 extern ThreadBinder
binder;
174 static void*
main(
void*
self);
181 const int rc = pthread_key_create(&
thread_key,
nullptr);
199 #if defined(HAVE_IBMBGQ) and defined(HPM)
200 static const int hpm_thread_id_all = -10;
201 static const int hpm_thread_id_main = -2;
202 static bool main_instrumented;
203 static bool all_instrumented;
204 static int hpm_thread_id;
244 return pthread_cancel(
get_id());
261 #if defined(HAVE_IBMBGQ) and defined(HPM)
266 static void set_hpm_thread_env(
int hpm_thread_id);
433 template <
typename Archive>
525 #ifdef MADNESS_TASK_PROFILING
527 namespace profiling {
536 std::pair<void*, unsigned short>
id_;
551 #ifndef USE_LIBIBERTY
552 const char*
name = abi::__cxa_demangle(symbol, 0, 0, &
status);
554 char*
name = cplus_demangle(symbol, DMGL_NO_OPTS);
561 os << symbol <<
"\t";
575 void*
const * func_ptr =
const_cast<void*
const *
>(&
id_.first);
576 char** bt_sym = backtrace_symbols(func_ptr, 1);
580 std::string mangled_name;
585 std::istringstream iss(bt_sym[0]);
587 std::string file, address;
588 iss >> frame >> file >> address >> mangled_name;
592 const char* first = strchr(bt_sym[0],
'(');
595 const char* last = strrchr(first,
'+');
597 mangled_name.assign(first, (last - first) - 1);
618 void start(
const std::pair<void*, unsigned short>&
id,
619 const unsigned short threads,
const double submit_time)
647 os << std::hex << std::showbase << te.
id_.first <<
648 std::dec << std::noshowbase <<
"\t";
651 switch(te.
id_.second) {
654 const std::string mangled_name = te.
get_name();
657 if(! mangled_name.empty())
673 const std::streamsize precision = os.precision();
675 os << std::fixed <<
"\t" << te.
times_[0]
677 os.precision(precision);
777 for(std::size_t i = 0; i <
n_; ++i)
778 os << thread_id <<
"\t" <<
events_[i] << std::endl;
817 while(
head_ !=
nullptr) {
834 if(
head_ !=
nullptr) {
872 #ifdef MADNESS_TASK_PROFILING
875 std::pair<void*, unsigned short>
id_;
898 template <
typename T>
913 template <
typename fnT>
914 static typename std::enable_if<detail::function_traits<fnT>::value ||
916 make_id(std::pair<void*,unsigned short>&
id, fnT fn) {
928 template <
typename fnobjT>
931 make_id(std::pair<void*,unsigned short>&
id,
const fnobjT&) {
932 id.first =
reinterpret_cast<void*
>(
const_cast<char*
>(
typeid(fnobjT).
name()));
942 virtual void get_id(std::pair<void*,unsigned short>&
id)
const {
947 #ifndef HAVE_INTEL_TBB
955 bool run_multi_threaded() {
962 #ifdef MADNESS_TASK_PROFILING
966 #ifdef MADNESS_TASK_PROFILING
973 volatile bool barrier_flag;
976 #ifdef MADNESS_TASK_PROFILING
981 run(TaskThreadEnv(nthread,
id, barrier));
983 #ifdef MADNESS_TASK_PROFILING
984 const bool cleanup = barrier->
enter(
id);
988 return barrier->
enter(
id);
1024 ParsecRuntime::delete_parsec_task(parsec_task);
1025 parsec_task =
nullptr;
1042 barrier =
new Barrier(nthread);
1049 parsec_task_t *parsec_task;
1125 virtual void get_id(std::pair<void*,unsigned short>&
id)
const {
1137 #ifdef MADNESS_TASK_PROFILING
1145 #ifdef MADNESS_TASK_PROFILING
1189 #if defined(HAVE_IBMBGQ) and defined(HPM)
1190 static unsigned int main_hpmctx;
1209 if (!wait &&
queue.empty())
return false;
1210 std::pair<PoolTaskInterface*,bool> t =
queue.pop_front(wait);
1211 #ifdef MADNESS_TASK_PROFILING
1216 if (t.second && t.first) {
1217 #ifdef MADNESS_TASK_PROFILING
1218 t.first->set_event(event_list->
event());
1220 if (t.first->run_multi_threaded())
1249 int ntask =
queue.pop_front(
nmax, taskbuf, wait);
1250 #ifdef MADNESS_TASK_PROFILING
1254 for (
int i=0; i<ntask; ++i) {
1256 #ifdef MADNESS_TASK_PROFILING
1259 if (taskbuf[i]->run_multi_threaded()) {
1267 ntask = parsec_runtime->test();
1293 #ifndef MADNESS_ASSERTIONS_DISABLE
1295 std::cerr <<
"!!! ERROR: The thread pool has not been initialized.\n"
1296 <<
"!!! ERROR: Call madness::initialize before submitting tasks to the task queue.\n";
1304 #if !(defined(HAVE_INTEL_TBB) || defined(HAVE_PARSEC))
1305 queue.lock_and_flush_prebuf();
1311 static ParsecRuntime *parsec_runtime;
1324 static void begin(
int nthread=-1);
1334 #ifdef MADNESS_TASK_PROFILING
1340 parsec_runtime->schedule(
task);
1342 #elif HAVE_INTEL_TBB
1350 [task_p = std::unique_ptr<PoolTaskInterface>(
task)] () noexcept {
1356 int task_threads =
task->get_nthread();
1359 if (
task->is_high_priority() && (task_threads == 1)) {
1373 template <
typename opT>
1381 static void add(
const std::vector<PoolTaskInterface*>& tasks) {
1383 MADNESS_EXCEPTION(
"Do not add tasks to the madness task queue when using Intel TBB.", 1);
1385 typedef std::vector<PoolTaskInterface*>::const_iterator iteratorT;
1386 for (iteratorT it=tasks.begin(); it!=tasks.end(); ++it) {
1396 #ifdef HAVE_INTEL_TBB
1400 #ifdef MADNESS_TASK_PROFILING
1443 template <
typename Probe>
1444 static void await(
const Probe& probe,
bool dowork =
true,
bool sleep =
false) {
1461 if(((
current_time - start) > timeout) && (timeout > 1.0)) {
1462 std::cerr <<
"!!MADNESS: Hung queue?" << std::endl;
1463 if (counter++ > 3) {
1466 snprintf(errstr,
bufsize,
"ThreadPool::await() timed out after %.1lf seconds", timeout);
1468 __LINE__, __FUNCTION__,
1502 delete parsec_runtime;
1504 #elif HAVE_INTEL_TBB
1513 int sleep_duration_in_microseconds = 0) {
1514 #if !HAVE_INTEL_TBB && !HAVE_PARSEC
1516 sleep_duration_in_microseconds);
1533 int sleep_duration_in_microseconds = 0) {
An integer with atomic set, get, read+increment, read+decrement, and decrement+test operations.
Definition: atomicint.h:126
Definition: worldmutex.h:700
bool enter(const int id)
Each thread calls this with its id (0,..,nthread-1) to enter the barrier.
Definition: worldmutex.h:729
void register_thread(int id, volatile bool *pflag)
Each thread calls this once before first use.
Definition: worldmutex.h:718
A thread-safe, fast but simple double-ended queue.
Definition: dqueue.h:80
Base class for exceptions thrown in MADNESS.
Definition: madness_exception.h:66
Definition: worldmutex.h:109
void wait()
Definition: worldmutex.cc:103
void reset()
Definition: worldmutex.h:124
Mutex using pthread mutex operations.
Definition: worldmutex.h:131
Lowest level task interface.
Definition: thread.h:867
double submit_time_
Definition: thread.h:874
void execute()
Definition: thread.h:1087
static std::enable_if< detail::function_traits< fnT >::value||detail::memfunc_traits< fnT >::value >::type make_id(std::pair< void *, unsigned short > &id, fnT fn)
Definition: thread.h:916
static std::enable_if<!(detail::function_traits< fnobjT >::value||detail::memfunc_traits< fnobjT >::value) >::type make_id(std::pair< void *, unsigned short > &id, const fnobjT &)
Definition: thread.h:931
virtual ~PoolTaskInterface()=default
Destructor.
profiling::TaskEvent * task_event_
Definition: thread.h:873
PoolTaskInterface()
Default constructor.
Definition: thread.h:1058
virtual void run(const TaskThreadEnv &info)=0
Override this method to implement a multi-threaded task.
PoolTaskInterface(const TaskAttributes &attr)
Definition: thread.h:1065
std::pair< void *, unsigned short > id_
Definition: thread.h:875
void set_event(profiling::TaskEvent *task_event)
Definition: thread.h:881
void submit()
Collect info on the task and record the submit time.
Definition: thread.h:886
virtual void get_id(std::pair< void *, unsigned short > &id) const
Definition: thread.h:942
void set_nthread(int nthread)
Call this to reset the number of threads before the task is submitted.
Definition: thread.h:1078
A no-operation task used for various purposes.
Definition: thread.h:1112
void run(const TaskThreadEnv &)
Execution function that does nothing.
Definition: thread.h:1115
virtual ~PoolTaskNull()
Destructor.
Definition: thread.h:1118
virtual void get_id(std::pair< void *, unsigned short > &id) const
Definition: thread.h:1125
Contains attributes of a task.
Definition: thread.h:323
TaskAttributes(const TaskAttributes &attr)
Copy constructor.
Definition: thread.h:343
bool is_generator() const
Test if the generator attribute is true.
Definition: thread.h:351
TaskAttributes & set_generator(bool generator_hint)
Sets the generator attribute.
Definition: thread.h:372
static TaskAttributes hipri()
Definition: thread.h:450
TaskAttributes & set_stealable(bool stealable)
Sets the stealable attribute.
Definition: thread.h:383
static const unsigned long GENERATOR
Mask for generator bit.
Definition: thread.h:328
static const unsigned long STEALABLE
Mask for stealable bit.
Definition: thread.h:329
bool is_stealable() const
Test if the stealable attribute is true.
Definition: thread.h:358
TaskAttributes(unsigned long flags=0)
Sets the attributes to the desired values.
Definition: thread.h:337
static const unsigned long NTHREAD
Mask for nthread byte.
Definition: thread.h:327
void set_nthread(int nthread)
Set the number of threads.
Definition: thread.h:414
static TaskAttributes generator()
Definition: thread.h:442
bool is_high_priority() const
Test if the high priority attribute is true.
Definition: thread.h:365
static TaskAttributes multi_threaded(int nthread)
Definition: thread.h:458
void serialize(Archive &ar)
Serializes the attributes for I/O.
Definition: thread.h:434
unsigned long flags
Byte-string storing the specified attributes.
Definition: thread.h:324
int get_nthread() const
Get the number of threads.
Definition: thread.h:422
static const unsigned long HIGHPRIORITY
Mask for priority bit.
Definition: thread.h:330
TaskAttributes & set_highpriority(bool hipri)
Sets the high priority attribute.
Definition: thread.h:392
virtual ~TaskAttributes()
Definition: thread.h:346
Used to pass information about the thread environment to a user's task.
Definition: thread.h:466
int id() const
Get the ID of this thread.
Definition: thread.h:506
TaskThreadEnv(int nthread, int id, Barrier *barrier)
Constructor collecting necessary environmental information.
Definition: thread.h:478
TaskThreadEnv(int nthread, int id)
Constructor collecting necessary environmental information.
Definition: thread.h:491
bool barrier() const
Definition: thread.h:514
Barrier * _barrier
Pointer to the shared barrier, null if there is only a single thread.
Definition: thread.h:469
const int _nthread
Number of threads collaborating on task.
Definition: thread.h:467
int nthread() const
Get the number of threads collaborating on this task.
Definition: thread.h:499
const int _id
ID of this thread (0,...,nthread-1).
Definition: thread.h:468
Simplified thread wrapper to hide pthread complexity.
Definition: thread.h:164
virtual void run()=0
Function to be executed by the thread.
int pool_num
Stores index of thread in pool or -1.
Definition: thread.h:176
void set_pool_thread_index(int i)
Sets the index of this thread within the pool.
Definition: thread.h:195
static void exit()
A thread can call this to terminate its execution.
Definition: thread.h:226
ThreadBase()
Default constructor.
Definition: thread.h:213
static pthread_key_t thread_key
Thread id key.
Definition: thread.h:167
int cancel() const
Cancel this thread.
Definition: thread.h:243
pthread_t id
Definition: thread.h:177
static void delete_thread_key()
Definition: thread.h:187
static int num_hw_processors()
Get number of actual hardware processors.
Definition: thread.cc:174
void start()
Start the thread running.
Definition: thread.cc:158
virtual ~ThreadBase()
Definition: thread.h:215
static void init_thread_key()
Definition: thread.h:180
const pthread_t & get_id() const
Get the pthread id of this thread (if running).
Definition: thread.h:231
static void * main(void *self)
Definition: thread.cc:92
static ThreadBase * this_thread()
Definition: thread.h:257
int get_pool_thread_index() const
Get index of this thread in ThreadPool.
Definition: thread.h:238
bool do_bind
Definition: thread.h:101
void bind()
Definition: thread.h:137
const size_t * get_cpus() const
Definition: thread.h:133
ThreadBinder(bool print=false)
Definition: thread.h:108
size_t cpus[maxncpu]
Definition: thread.h:102
std::atomic< size_t > nextcpu
Definition: thread.h:103
bool print
Definition: thread.h:99
static const size_t maxncpu
Definition: thread.h:98
static thread_local bool bound
Definition: thread.h:104
void set_do_bind(bool value)
Definition: thread.h:131
size_t ncpu
Definition: thread.h:100
const size_t get_ncpu() const
Definition: thread.h:135
ThreadPool thread object.
Definition: thread.h:1134
ThreadPoolThread()
Definition: thread.h:1142
profiling::TaskProfiler profiler_
Definition: thread.h:1138
virtual ~ThreadPoolThread()=default
profiling::TaskProfiler & profiler()
Task profiler accessor.
Definition: thread.h:1150
A singleton pool of threads for dynamic execution of tasks.
Definition: thread.h:1160
static void add(PoolTaskInterface *task)
Add a new task to the pool.
Definition: thread.h:1333
int nthreads
Number of threads.
Definition: thread.h:1180
static std::unique_ptr< tbb::task_arena > tbb_arena
Definition: thread.h:1317
static bool run_task()
An otherwise idle thread can call this to run a task.
Definition: thread.h:1395
static void add(const std::vector< PoolTaskInterface * > &tasks)
Add a vector of tasks to the pool.
Definition: thread.h:1381
ThreadPool(ThreadPool &&)=delete
void operator=(ThreadPool &&)=delete
volatile bool finish
Set to true when time to stop.
Definition: thread.h:1181
ThreadPool(const ThreadPool &)=delete
static void set_wait_policy(WaitPolicy policy, int sleep_duration_in_microseconds=0)
Definition: thread.h:1511
static int default_nthread()
Get the number of threads from the environment.
Definition: thread.cc:325
static const DQStats & get_stats()
Returns queue statistics.
Definition: thread.cc:466
static void end()
Definition: thread.cc:437
static ThreadPool * instance()
Return a pointer to the only instance, constructing as necessary.
Definition: thread.h:1292
AtomicInt nfinished
Thread pool exit counter.
Definition: thread.h:1182
void operator=(const ThreadPool &)=delete
static ThreadPool * instance_ptr
Singleton pointer.
Definition: thread.h:1185
ThreadPoolThread * threads
Array of threads.
Definition: thread.h:1177
void thread_main(ThreadPoolThread *const thread)
Definition: thread.cc:349
~ThreadPool()
Destructor.
Definition: thread.h:1499
void scan(opT &op)
Definition: thread.h:1374
void flush_prebuf()
Definition: thread.h:1303
static void begin(int nthread=-1)
Please invoke while in a single-threaded environment.
Definition: thread.cc:379
static double await_timeout
Waiter timeout.
Definition: thread.h:1187
DQueue< PoolTaskInterface * > queue
Queue of tasks.
Definition: thread.h:1179
bool run_tasks(bool wait, ThreadPoolThread *const this_thread)
Definition: thread.h:1233
static std::size_t queue_size()
Returns the number of tasks in the queue.
Definition: thread.h:1420
ThreadPoolThread main_thread
Placeholder for main thread tls.
Definition: thread.h:1178
static const int nmax
Number of tasks a worker thread will pop from the task queue.
Definition: thread.h:1186
bool run_task(bool wait, ThreadPoolThread *this_thread)
Run the next task.
Definition: thread.h:1204
static void * pool_thread_main(void *v)
Forwards the thread to bound member function.
Definition: thread.cc:374
static void await(const Probe &probe, bool dowork=true, bool sleep=false)
Gracefully wait for a condition to become true, executing any tasks in the queue.
Definition: thread.h:1444
static const ThreadPoolThread * get_threads()
Definition: thread.h:1431
static std::unique_ptr< tbb::global_control > tbb_control
Definition: thread.h:1316
static std::size_t size()
Returns the number of threads in the pool.
Definition: thread.h:1413
Simplified thread wrapper to hide pthread complexity.
Definition: thread.h:271
void *(* f)(void *)
The function called for executing this thread.
Definition: thread.h:272
void start(void *(*f)(void *), void *args=nullptr)
Start the thread by running f(args).
Definition: thread.h:299
virtual ~Thread()=default
Thread(void *(*f)(void *), void *args=nullptr)
Create a thread and start it running f(args).
Definition: thread.h:290
Thread()
Default constructor.
Definition: thread.h:284
void run()
Invokes the function for this thread.
Definition: thread.h:276
void * args
The arguments passed to this thread for execution.
Definition: thread.h:273
Multi-threaded queue to manage and run tasks.
Definition: world_task_queue.h:319
Task event list base class.
Definition: thread.h:686
virtual ~TaskEventListBase()=default
Virtual destructor.
TaskEventListBase * next_
The next task event in the list.
Definition: thread.h:688
friend std::ostream & operator<<(std::ostream &os, const TaskEventListBase &tel)
Output a task event list to an output stream.
Definition: thread.h:723
virtual std::ostream & print_events(std::ostream &) const =0
Print the events.
TaskEventListBase & operator=(const TaskEventListBase &)=delete
TaskEventListBase()
Default constructor.
Definition: thread.h:696
TaskEventListBase * next() const
Get the next event list in the linked list.
Definition: thread.h:705
void insert(TaskEventListBase *list)
Insert list after this list.
Definition: thread.h:712
TaskEventListBase(const TaskEventListBase &)=delete
A list of task events.
Definition: thread.h:737
TaskEvent * event()
Get a new event from this list.
Definition: thread.h:765
TaskEventList(const unsigned int nmax)
Default constructor.
Definition: thread.h:752
virtual std::ostream & print_events(std::ostream &os) const
Print events recorded in this list.
Definition: thread.h:775
TaskEventList & operator=(const TaskEventList &)=delete
unsigned int n_
The number of events recorded.
Definition: thread.h:739
TaskEventList(const TaskEventList &)=delete
virtual ~TaskEventList()=default
Virtual destructor.
std::unique_ptr< TaskEvent[]> events_
The event array.
Definition: thread.h:740
Task event class.
Definition: thread.h:533
std::string get_name() const
Get name of the function pointer.
Definition: thread.h:571
void start(const std::pair< void *, unsigned short > &id, const unsigned short threads, const double submit_time)
Record the start time of the task and collect task information.
Definition: thread.h:618
static void print_demangled(std::ostream &os, const char *symbol)
Print demangled symbol name.
Definition: thread.h:547
unsigned short threads_
Number of threads used by the task.
Definition: thread.h:537
friend std::ostream & operator<<(std::ostream &os, const TaskEvent &te)
Output the task data using a tab-separated list.
Definition: thread.h:645
void stop()
Record the stop time of the task.
Definition: thread.h:628
double times_[3]
Task trace times: { submit, start, stop }.
Definition: thread.h:535
std::pair< void *, unsigned short > id_
Task identification information.
Definition: thread.h:536
This class collects and prints task profiling data.
Definition: thread.h:789
TaskEventListBase * head_
The head of the linked list of data.
Definition: thread.h:791
TaskProfiler()
Default constructor.
Definition: thread.h:809
void write_to_file()
Write the profile data to file.
Definition: thread.cc:213
~TaskProfiler()
Destructor.
Definition: thread.h:814
TaskProfiler & operator=(const TaskProfiler &)=delete
TaskEventListBase * tail_
The tail of the linked list of data.
Definition: thread.h:792
TaskEventList * new_list(const std::size_t nmax)
Create a new task event list.
Definition: thread.h:829
TaskProfiler(const TaskProfiler &)=delete
static const char * output_file_name_
The output file name.
Definition: thread.h:805
static Mutex output_mutex_
Mutex used to lock the output file.
Definition: thread.h:794
const std::size_t bufsize
Definition: derivatives.cc:16
real_function_3d mask
Definition: dirac-hatom.cc:27
auto T(World &world, response_space &f) -> response_space
Definition: global_functions.cc:34
void threadpool_wait_policy(WaitPolicy policy, int sleep_duration_in_microseconds=0)
Definition: thread.h:1532
static const double v
Definition: hatom_sf_dirac.cc:20
Tensor< double > op(const Tensor< double > &x)
Definition: kain.cc:508
#define MADNESS_CHECK(condition)
Check a condition — even in a release build the condition is always evaluated so it can have side effects.
Definition: madness_exception.h:190
#define MADNESS_EXCEPTION(msg, value)
Macro for throwing a MADNESS exception.
Definition: madness_exception.h:119
#define MADNESS_ASSERT(condition)
Assert a condition that should be free of side-effects since in release builds this might be a no-op.
Definition: madness_exception.h:134
File holds all helper structures necessary for the CC_Operator and CC2 class.
Definition: DFParameters.h:10
WaitPolicy
Wait policies supported by ConditionVariable/DQueue/ThreadPool.
Definition: worldmutex.h:492
static double cpu_time()
Returns the cpu time in seconds relative to an arbitrary origin.
Definition: timers.h:127
ThreadBinder binder
Definition: thread.cc:71
static void myusleep(unsigned int us)
Sleep or spin for specified number of microseconds.
Definition: timers.h:164
void error(const char *msg)
Definition: world.cc:139
double wall_time()
Returns the wall time in seconds relative to an arbitrary origin.
Definition: timers.cc:48
std::string type(const PairType &n)
Definition: PNOParameters.h:18
std::string name(const FuncType &type, const int ex=-1)
Definition: ccpairfunction.h:28
Function traits in the spirit of boost function traits.
Definition: function_traits.h:13
Member function traits in the spirit of boost function traits.
Definition: function_traits.h:21
static double current_time
Definition: tdse1d.cc:160
int task(int i)
Definition: test_runtime.cpp:4
const char * status[2]
Definition: testperiodic.cc:43
Implements thread introspection.
Object that is used to convert function and member function pointers into void*.
Definition: thread.h:899
T in
Definition: thread.h:900
void * out
Definition: thread.h:901