MADNESS  0.10.1
thread.h
Go to the documentation of this file.
1 /*
2  This file is part of MADNESS.
3 
4  Copyright (C) 2007,2010 Oak Ridge National Laboratory
5 
6  This program is free software; you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation; either version 2 of the License, or
9  (at your option) any later version.
10 
11  This program is distributed in the hope that it will be useful,
12  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  GNU General Public License for more details.
15 
16  You should have received a copy of the GNU General Public License
17  along with this program; if not, write to the Free Software
18  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 
20  For more information please contact:
21 
22  Robert J. Harrison
23  Oak Ridge National Laboratory
24  One Bethel Valley Road
25  P.O. Box 2008, MS-6367
26 
27  email: harrisonrj@ornl.gov
28  tel: 865-241-3937
29  fax: 865-572-0680
30 */
31 
32 #ifndef MADNESS_WORLD_THREAD_H__INCLUDED
33 #define MADNESS_WORLD_THREAD_H__INCLUDED
34 
35 /**
36  \file thread.h
37  \brief Implements Dqueue, Thread, ThreadBase and ThreadPool.
38  \ingroup threads
39 */
40 
42 #include <madness/world/dqueue.h>
44 #include <vector>
45 #include <cstddef>
46 #include <cstdio>
47 #include <pthread.h>
48 #include <type_traits>
49 #include <typeinfo>
50 #include <new>
51 
52 //////////// Parsec Related Begin ////////////////////
53 #ifdef HAVE_PARSEC
54 #include "parsec.h"
55 #endif
56 //////////// Parsec Related End ////////////////////
57 
58 #ifdef MADNESS_TASK_PROFILING
59 #include <execinfo.h> // for backtrace_symbols
60 #ifndef USE_LIBIBERTY
61 #include <cxxabi.h> // for abi::__cxa_demangle
62 #else
63 extern "C" {
64  extern char * cplus_demangle (const char *mangled, int options);
65 #define DMGL_NO_OPTS 0 /* For readability... */
66 }
67 #endif
68 #include <sstream> // for std::istringstream
69 #include <cstring> // for strchr & strrchr
70 #endif // MADNESS_TASK_PROFILING
71 
72 #ifdef HAVE_INTEL_TBB
73 #include <tbb/task_arena.h>
74 #ifndef TBB_PREVIEW_GLOBAL_CONTROL
75 # define TBB_PREVIEW_GLOBAL_CONTROL 1
76 #endif
77 # include <tbb/global_control.h>
78 #endif
79 
80 
81 #ifndef _SC_NPROCESSORS_CONF
82 // Old macs don't have necessary support thru sysconf to determine the
83 // no. of processors so must use sysctl
84 #include <sys/types.h>
85 #include <sys/sysctl.h>
86 #endif
87 
88 namespace madness {
89 
90  // Forward decls.
91  class Barrier;
92  class ThreadPool;
93  class WorldTaskQueue;
94  class AtomicInt;
95  void error(const char *msg);
96 
97  class ThreadBinder {
98  static const size_t maxncpu = 1024;
99  bool print;
100  size_t ncpu = 0;
101  bool do_bind = true;
102  size_t cpus[maxncpu];
103  std::atomic<size_t> nextcpu = 0;
104  static thread_local bool bound;
105 
106  public:
107 
108  ThreadBinder(bool print = false) : print(print) {
109 #ifndef ON_A_MAC
110  ncpu = 0;
111  cpu_set_t mask;
112  sched_getaffinity(0, sizeof(mask), &mask);
113  for (size_t i=0; i<maxncpu; i++) {
114  if (CPU_ISSET(int(i),&mask)) {
116  cpus[ncpu++] = i;
117  }
118  }
119  if (print) {
120  std::cout << "ncpu: " << get_ncpu() << std::endl;
121  for (size_t i=0; i<get_ncpu(); i++) {
122  std::cout << get_cpus()[i] << " " ;
123  }
124  std::cout << std::endl;
125  }
126  nextcpu = ncpu/2;
127 #endif
128  if (this->print) { };
129  }
130 
131  void set_do_bind(bool value) {do_bind = value;}
132 
133  const size_t* get_cpus() const { return cpus; }
134 
135  const size_t get_ncpu() const { return ncpu; }
136 
137  void bind() {
138 #ifndef ON_A_MAC
139  if (do_bind && !bound) { // In TBB this is called by each task, so check do_bind first
140  bound = true;
141  cpu_set_t mask;
142  CPU_ZERO(&mask);
143  size_t cpu = cpus[nextcpu++ % ncpu];
144  CPU_SET(cpu, &mask);
145  sched_setaffinity(0, sizeof(mask), &mask);
146  if (print) std::cout << "bound thread to " << cpu << std::endl;
147  }
148 #endif
149  }
150  };
151 
152  extern ThreadBinder binder;
153 
154  /// \addtogroup threads
155  /// @{
156 
157  /// Simplified thread wrapper to hide pthread complexity.
158 
159  /// If the thread is using any of the object state, you cannot
160  /// delete the object until the thread has terminated.
161  ///
162  /// The cleanest solution is to put the object on the heap and
163  /// have the run method `delete this` at its end.
164  class ThreadBase {
165  friend class ThreadPool;
166 
167  static pthread_key_t thread_key; ///< Thread id key.
168 
169  /// \todo Brief description needed.
170 
171  /// \todo Descriptions needed.
172  /// \param[in,out] self Description needed.
173  /// \return Description needed.
174  static void* main(void* self);
175 
176  int pool_num; ///< Stores index of thread in pool or -1.
177  pthread_t id; ///< \todo Brief description needed.
178 
179  /// \todo Brief description needed.
180  static void init_thread_key() {
181  const int rc = pthread_key_create(&thread_key, nullptr);
182  if(rc != 0)
183  MADNESS_EXCEPTION("pthread_key_create failed", rc);
184  }
185 
186  /// \todo Brief description needed.
187  static void delete_thread_key() {
188  pthread_key_delete(thread_key);
189  }
190 
191  /// Sets the index of this thread within the pool.
192 
193  /// \todo Verify documentation.
194  /// \param[in] i The index of this thread.
195  void set_pool_thread_index(int i) {
196  pool_num = i;
197  }
198 
199 #if defined(HAVE_IBMBGQ) and defined(HPM)
200  static const int hpm_thread_id_all = -10; ///< \todo Brief description needed.
201  static const int hpm_thread_id_main = -2; ///< \todo Brief description needed.
202  static bool main_instrumented; ///< \todo Brief description needed.
203  static bool all_instrumented; ///< \todo Brief description needed.
204  static int hpm_thread_id; ///< \todo Brief description needed.
205 #endif
206 
207  public:
208 
209  /// Default constructor.
210 
211  /// Sets up the thread; however, \c start() must be invoked to
212  /// actually begin the thread.
213  ThreadBase() : pool_num(-1) { }
214 
215  virtual ~ThreadBase() { }
216 
217  /// Function to be executed by the thread.
218 
219  /// Override this to do work.
220  virtual void run() = 0;
221 
222  /// Start the thread running.
223  void start();
224 
225  /// A thread can call this to terminate its execution.
/// Terminates the calling thread via \c pthread_exit with a null return value.
static void exit() {
    pthread_exit(nullptr);
}
229 
230  /// Get the pthread id of this thread (if running).
231  const pthread_t& get_id() const {
232  return id;
233  }
234 
235  /// Get index of this thread in \c ThreadPool.
236 
237  /// \return (0,...,nthread-1) or -1 if not in the \c ThreadPool.
238  int get_pool_thread_index() const {
239  return pool_num;
240  }
241 
242  /// Cancel this thread.
243  int cancel() const {
244  return pthread_cancel(get_id());
245  }
246 
247 
248  /// Get number of actual hardware processors.
249 
250  /// \return The number of hardware processors.
251  static int num_hw_processors();
252 
253  /// \todo Brief description needed.
254 
255  /// \todo Descriptions needed.
256  /// \return Description needed.
258  return static_cast<ThreadBase*>(pthread_getspecific(thread_key));
259  }
260 
261 #if defined(HAVE_IBMBGQ) and defined(HPM)
262  /// \todo Brief description needed.
263 
264  /// \todo Descriptions needed.
265  /// \param[in] hpm_thread_id Description needed.
266  static void set_hpm_thread_env(int hpm_thread_id);
267 #endif
268  }; // class ThreadBase
269 
270  /// Simplified thread wrapper to hide pthread complexity.
271  class Thread : public ThreadBase {
272  void* (*f)(void *); ///< The function called for executing this thread. \todo should we replace this by a std::function?
273  void* args; ///< The arguments passed to this thread for execution.
274 
275  /// Invokes the function for this thread.
276  void run() {
277  f(args);
278  }
279 
280  public:
281  /// Default constructor.
282 
283  /// \c start() must be invoked to actually execute the thread.
284  Thread() : f(nullptr), args(nullptr) { }
285 
286  /// Create a thread and start it running `f(args)`.
287 
288  /// \param[in] f The function to be called.
289  /// \param[in,out] args The arguments to the function.
290  Thread(void* (*f)(void *), void* args=nullptr)
291  : f(f), args(args) {
293  }
294 
295  /// Start the thread by running `f(args)`.
296 
297  /// \param[in] f The function to be called.
298  /// \param[in,out] args The arguments to the function.
299  void start(void* (*f)(void *), void* args=nullptr) {
300  this->f = f;
301  this->args = args;
303  }
304 
305  virtual ~Thread() = default;
306  }; // class Thread
307 
308 
309  /// Contains attributes of a task.
310 
311  /// The current attributes are:
312  /// - \c generator : Setting this hints that a task will produce
313  /// additional tasks and is used by the scheduler to
314  /// increase/throttle parallelism. The default is false.
315  /// - \c stealable : Setting this indicates that a task may be
316  /// migrated to another process for dynamic load balancing. The
317  /// default value is false.
318  /// - \c highpriority : indicates a high priority task. The default
319  /// value is false.
320  /// - \c nthread : indicates number of threads. 0 threads is interpreted
321  /// as 1 thread for backward compatibility and ease of specifying
322  /// defaults. The default value is 0 (==1).
324  unsigned long flags; ///< Byte-string storing the specified attributes.
325 
326  public:
327  static const unsigned long NTHREAD = 0xff; ///< Mask for nthread byte.
328  static const unsigned long GENERATOR = 1ul<<8; ///< Mask for generator bit.
329  static const unsigned long STEALABLE = GENERATOR<<1; ///< Mask for stealable bit.
330  static const unsigned long HIGHPRIORITY = GENERATOR<<2; ///< Mask for priority bit.
331 
332  /// Sets the attributes to the desired values.
333 
334  /// `flags`, if unspecified sets all attributes to their default
335  /// values.
336  /// \param[in] flags The attribute values.
337  explicit TaskAttributes(unsigned long flags = 0)
338  : flags(flags) {}
339 
340  /// Copy constructor.
341 
342  /// \param[in] attr The attributes to copy.
344  : flags(attr.flags) {}
345 
346  virtual ~TaskAttributes() {}
347 
348  /// Test if the generator attribute is true.
349 
350  /// \return True if this task is a generator, false otherwise.
351  bool is_generator() const {
352  return flags&GENERATOR;
353  }
354 
355  /// Test if the stealable attribute is true.
356 
357  /// \return True if this task is stealable, false otherwise.
358  bool is_stealable() const {
359  return flags&STEALABLE;
360  }
361 
362  /// Test if the high priority attribute is true.
363 
364  /// \return True if this task is a high priority, false otherwise.
365  bool is_high_priority() const {
366  return flags&HIGHPRIORITY;
367  }
368 
369  /// Sets the generator attribute.
370 
371  /// \param[in] generator_hint The new value for the generator attribute.
372  TaskAttributes& set_generator(bool generator_hint) {
373  if (generator_hint)
374  flags |= GENERATOR;
375  else
376  flags &= ~GENERATOR;
377  return *this;
378  }
379 
380  /// Sets the stealable attribute.
381 
382  /// \param[in] stealable The new value for the stealable attribute.
383  TaskAttributes& set_stealable(bool stealable) {
384  if (stealable) flags |= STEALABLE;
385  else flags &= ~STEALABLE;
386  return *this;
387  }
388 
389  /// Sets the high priority attribute.
390 
391  /// \param[in] hipri The new value for the high priority attribute.
393  if (hipri)
394  flags |= HIGHPRIORITY;
395  else
396  flags &= ~HIGHPRIORITY;
397  return *this;
398  }
399 
400  /// Set the number of threads.
401 
402  /// \attention Are you sure this is what you want to call? Only call
403  /// this for a \c TaskAttributes that is \em not a base class of a task
404  /// object.
405  /// \p If you are trying to set the number of threads in an \em existing
406  /// task you should call \c TaskInterface::set_nthread() instead. No
407  /// doubt there is some virtual/protected/something voodoo to prevent
408  /// you from doing harm.
409  ///
410  /// \todo Perhaps investigate a way to make this function only accessible
411  /// from the intended functions (using the so-called voodoo)?
412  ///
413  /// \param[in] nthread The new number of threads.
414  void set_nthread(int nthread) {
415  MADNESS_ASSERT(nthread>=0 && nthread<256);
416  flags = (flags & (~NTHREAD)) | (nthread & NTHREAD);
417  }
418 
419  /// Get the number of threads.
420 
421  /// \return The number of threads.
422  int get_nthread() const {
423  int n = flags & NTHREAD;
424  if (n == 0)
425  n = 1;
426  return n;
427  }
428 
429  /// Serializes the attributes for I/O.
430 
431  /// \tparam Archive The archive type.
432  /// \param[in,out] ar The archive.
433  template <typename Archive>
434  void serialize(Archive& ar) {
435  ar & flags;
436  }
437 
438  /// \todo Brief description needed.
439 
440  /// \todo Descriptions needed.
441  /// \return Description needed.
443  return TaskAttributes(GENERATOR);
444  }
445 
446  /// \todo Brief description needed.
447 
448  /// \todo Descriptions needed.
449  /// \return Description needed.
452  }
453 
454  /// \todo Brief description needed.
455 
456  /// \todo Descriptions needed.
457  /// \return Description needed.
458  static TaskAttributes multi_threaded(int nthread) {
459  TaskAttributes t;
460  t.set_nthread(nthread);
461  return t;
462  }
463  };
464 
465  /// Used to pass information about the thread environment to a user's task.
467  const int _nthread; ///< Number of threads collaborating on task.
468  const int _id; ///< ID of this thread (0,...,nthread-1).
469  Barrier* _barrier; ///< Pointer to the shared barrier, `null` if there is only a single thread.
470 
471  public:
472  /// Constructor collecting necessary environmental information.
473 
474  /// \todo Verify this documentation.
475  /// \param[in] nthread The number of threads collaborating on this task.
476  /// \param[in] id The ID of this thread.
477  /// \param[in] barrier Pointer to the shared barrier.
481 
482 #if HAVE_INTEL_TBB
483  /// Constructor collecting necessary environmental information.
484 
485  /// \todo Verify this documentation.
486  /// \param[in] nthread The number of threads collaborating on this task.
487  /// \param[in] id The ID of this thread.
488  ///
489  /// \todo I cannot get the TaskThreadEnv to work with Barrier.
490  /// Need to figure out why.
491  TaskThreadEnv(int nthread, int id)
492  : _nthread(nthread), _id(id), _barrier(nullptr)
493  {::madness::binder.bind();};
494 #endif
495 
496  /// Get the number of threads collaborating on this task.
497 
498  /// \return The number of threads.
499  int nthread() const {
500  return _nthread;
501  }
502 
503  /// Get the ID of this thread.
504 
505  /// \return The ID of this thread.
506  int id() const {
507  return _id;
508  }
509 
510  /// \todo Brief description needed.
511 
512  /// \todo Descriptions needed.
513  /// \return Description needed.
514  bool barrier() const {
515  if (_nthread == 1)
516  return true;
517  else {
519  return _barrier->enter(_id);
520  }
521  }
522  };
523 
524 
525 #ifdef MADNESS_TASK_PROFILING
526 
527  namespace profiling {
528 
529  /// Task event class.
530 
531  /// This class is used to record the task trace information, including
532  /// submit, start, and stop times, as well as identification information.
533  class TaskEvent {
534  private:
535  double times_[3]; ///< Task trace times: { submit, start, stop }.
536  std::pair<void*, unsigned short> id_; ///< Task identification information.
537  unsigned short threads_; ///< Number of threads used by the task.
538 
539  /// Print demangled symbol name.
540 
541  /// Add the demangled symbol name to \c os. If demangling fails,
542  /// the unmodified symbol name is used instead. If symbol is NULL,
543  /// "UNKNOWN" is used instead. A tab character is added after the
544  /// symbol name.
545  /// \param[in,out] os The output stream.
546  /// \param[in] symbol The symbol to add to the stream.
547  static void print_demangled(std::ostream& os, const char* symbol) {
548  // Get the demagled symbol name
549  if(symbol) {
550  int status = 0;
551 #ifndef USE_LIBIBERTY
552  const char* name = abi::__cxa_demangle(symbol, 0, 0, &status);
553 #else
554  char* name = cplus_demangle(symbol, DMGL_NO_OPTS);
555 #endif
556  // Append the demangled symbol name to the output stream
557  if(status == 0) {
558  os << name << "\t";
559  free((void*)name);
560  } else {
561  os << symbol << "\t";
562  }
563  } else {
564  os << "UNKNOWN\t";
565  }
566  }
567 
568  /// Get name of the function pointer.
569 
570  /// \return The mangled function name.
571  std::string get_name() const {
572 
573  // Get the backtrace symbol for the function address,
574  // which contains the function name.
575  void* const * func_ptr = const_cast<void* const *>(& id_.first);
576  char** bt_sym = backtrace_symbols(func_ptr, 1);
577 
578  // Extract the mangled function name from the backtrace
579  // symbol.
580  std::string mangled_name;
581 
582 #ifdef ON_A_MAC
583  // Format of bt_sym is:
584  // <frame #> <file name> <address> <mangled name> + <function offset>
585  std::istringstream iss(bt_sym[0]);
586  long frame;
587  std::string file, address;
588  iss >> frame >> file >> address >> mangled_name;
589 #else // Assume Linux
590  // Format of bt_sym is:
591  // <file>(<mangled name>+<function offset>) [<address>]
592  const char* first = strchr(bt_sym[0],'(');
593  if(first) {
594  ++first;
595  const char* last = strrchr(first,'+');
596  if(last)
597  mangled_name.assign(first, (last - first) - 1);
598  }
599 #endif // ON_A_MAC
600 
601  // Free the backtrace buffer
602  free(bt_sym);
603 
604  return mangled_name;
605  }
606 
607  public:
608 
609  // Only default constructors are needed.
610 
611  /// Record the start time of the task and collect task information.
612 
613  /// \param[in,out] id The task identifier (a function pointer or const char*)
614  /// and an integer to differentiate the different types.
615  /// \param[in] threads The number of threads this task uses.
616  /// \param[in] submit_time The time that the task was submitted to the
617  /// task queue.
618  void start(const std::pair<void*, unsigned short>& id,
619  const unsigned short threads, const double submit_time)
620  {
621  id_ = id;
622  threads_ = threads;
623  times_[0] = submit_time;
624  times_[1] = wall_time();
625  }
626 
627  /// Record the stop time of the task.
628  void stop() {
629  times_[2] = wall_time();
630  }
631 
632  /// Output the task data using a tab-separated list.
633 
634  /// Output information includes
635  /// - the ID pointer
636  /// - the function, member function, and object type name
637  /// - the number of threads used by the task
638  /// - the submit time
639  /// - the start time
640  /// - the stop time.
641  ///
642  /// \param[in,out] os The output stream.
643  /// \param[in] te The task event to be output.
644  /// \return The \c os reference.
645  friend std::ostream& operator<<(std::ostream& os, const TaskEvent& te) {
646  // Add address to output stream
647  os << std::hex << std::showbase << te.id_.first <<
648  std::dec << std::noshowbase << "\t";
649 
650  // Print the name
651  switch(te.id_.second) {
652  case 1:
653  {
654  const std::string mangled_name = te.get_name();
655 
656  // Print the demangled name
657  if(! mangled_name.empty())
658  print_demangled(os, mangled_name.c_str());
659  else
660  os << "UNKNOWN\t";
661  }
662  break;
663  case 2:
664  print_demangled(os, static_cast<const char*>(te.id_.first));
665  break;
666  default:
667  os << "UNKNOWN\t";
668  }
669 
670  // Print:
671  // # of threads, submit time, start time, stop time
672  os << te.threads_;
673  const std::streamsize precision = os.precision();
674  os.precision(6);
675  os << std::fixed << "\t" << te.times_[0]
676  << "\t" << te.times_[1] << "\t" << te.times_[2];
677  os.precision(precision);
678  return os;
679  }
680 
681  }; // class TaskEvent
682 
683  /// Task event list base class.
684 
685  /// This base class allows the data to be stored in a linked list.
687  private:
688  TaskEventListBase* next_; ///< The next task event in the list.
689 
692 
693  public:
694 
695  /// Default constructor.
697  : next_(nullptr) { }
698 
699  /// Virtual destructor.
700  virtual ~TaskEventListBase() = default;
701 
702  /// Get the next event list in the linked list.
703 
704  /// \return The next event list.
706  return next_;
707  }
708 
709  /// Insert \c list after this list.
710 
711  /// \param[in] list The list to be inserted.
712  void insert(TaskEventListBase* list) {
713  if(next_)
714  list->next_ = next_;
715  next_ = list;
716  }
717 
718  /// Output a task event list to an output stream.
719 
720  /// \param[in,out] os The output stream.
721  /// \param[in] tel The task event list to be output.
722  /// \return The modified output stream.
723  friend inline std::ostream& operator<<(std::ostream& os, const TaskEventListBase& tel) {
724  return tel.print_events(os);
725  }
726 
727  private:
728 
729  /// Print the events.
730  virtual std::ostream& print_events(std::ostream&) const = 0;
731 
732  }; // class TaskEventList
733 
734  /// A list of task events.
735 
736  /// This object is used by the thread pool to record task data.
738  private:
739  unsigned int n_; ///< The number of events recorded.
740  std::unique_ptr<TaskEvent[]> events_; ///< The event array.
741 
742  TaskEventList(const TaskEventList&) = delete;
744 
745  public:
746 
747  /// Default constructor.
748 
749  /// \param[in] nmax The maximum number of task events.
750  /// \todo Should nmax be stored? I think it used to be a template
751  /// parameter (N), which is no longer present.
752  TaskEventList(const unsigned int nmax) :
753  TaskEventListBase(), n_(0ul), events_(new TaskEvent[nmax])
754  { }
755 
756  /// Virtual destructor.
757  virtual ~TaskEventList() = default;
758 
759  /// Get a new event from this list.
760 
761  /// \warning This function can only be called \c nmax times. It is
762  /// the caller's responsibility to ensure that it is not called too
763  /// many times.
764  /// \return The new event from the list.
766  return events_.get() + (n_++);
767  }
768 
769  private:
770 
771  /// Print events recorded in this list.
772 
773  /// \param[in,out] os The output stream.
774  /// \return The modified output stream.
775  virtual std::ostream& print_events(std::ostream& os) const {
776  const int thread_id = ThreadBase::this_thread()->get_pool_thread_index();
777  for(std::size_t i = 0; i < n_; ++i)
778  os << thread_id << "\t" << events_[i] << std::endl;
779  return os;
780  }
781 
782  }; // class TaskEventList
783 
784  /// This class collects and prints task profiling data.
785 
786  /// \note Each thread has its own \c TaskProfiler object, so only one
787  /// thread will ever operate on this object at a time and all operations
788  /// are inherently thread safe.
789  class TaskProfiler {
790  private:
791  TaskEventListBase* head_; ///< The head of the linked list of data.
792  TaskEventListBase* tail_; ///< The tail of the linked list of data.
793 
794  static Mutex output_mutex_; ///< Mutex used to lock the output file.
795 
796  TaskProfiler(const TaskProfiler&) = delete;
798 
799  public:
800  /// The output file name.
801 
802  /// This variable is initialized by \c ThreadPool::begin and is
803  /// assigned the value given by the environment variable
804  /// `MAD_TASKPROFILER_NAME`.
805  static const char* output_file_name_;
806 
807  public:
808  /// Default constructor.
810  : head_(nullptr), tail_(nullptr)
811  { }
812 
813  /// Destructor.
815  // Cleanup linked list
816  TaskEventListBase* next = nullptr;
817  while(head_ != nullptr) {
818  next = head_->next();
819  delete head_;
820  head_ = next;
821  }
822  }
823 
824  /// Create a new task event list.
825 
826  /// \param[in] nmax The maximum number of elements that the list
827  /// can contain.
828  /// \return A new task event list.
829  TaskEventList* new_list(const std::size_t nmax) {
830  // Create a new event list
831  TaskEventList* list = new TaskEventList(nmax);
832 
833  // Append the list to the tail of the linked list
834  if(head_ != nullptr) {
835  tail_->insert(list);
836  tail_ = list;
837  } else {
838  head_ = list;
839  tail_ = list;
840  }
841  return list;
842  }
843 
844  /// Write the profile data to file.
845 
846  /// The data is cleared after it is written to the file, so this
847  /// function may be called more than once.
848  ///
849  /// \warning This function should only be called from the thread
850  /// that owns it, otherwise data will likely be corrupted.
851  ///
852  /// \note This function is thread safe, in that it may be called by
853  /// different objects in different threads simultaneously.
854  void write_to_file();
855  }; // class TaskProfiler
856 
857  } // namespace profiling
858 
859 #endif // MADNESS_TASK_PROFILING
860 
861 
862  /// Lowest level task interface.
863 
864  /// The pool invokes \c run_multi_threaded(), which does any necessary
865  /// setup for multiple threads, and then invokes the user's \c run() method.
867  {
868  friend class ThreadPool;
869 
870  private:
871 
872 #ifdef MADNESS_TASK_PROFILING
873  profiling::TaskEvent* task_event_; ///< \todo Description needed.
874  double submit_time_; ///< \todo Description needed.
875  std::pair<void*, unsigned short> id_; ///< \todo Description needed.
876 
877  /// \todo Brief description needed.
878 
879  /// \todo Descriptions needed.
880  /// \param[in,out] task_event Description needed.
881  void set_event(profiling::TaskEvent* task_event) {
882  task_event_ = task_event;
883  }
884 
885  /// Collect info on the task and record the submit time.
886  void submit() {
888  this->get_id(id_);
889  }
890 #endif // MADNESS_TASK_PROFILING
891 
892  /// Object that is used to convert function and member function pointers into `void*`.
893 
894  /// \note This is technically not supported by the C++ standard but
895  /// it will likely not cause any issues here (famous last words?).
896  /// \todo Descriptions needed.
897  /// \tparam T Description needed.
898  template <typename T>
900  T in; ///< \todo Description needed.
901  void* out; ///< \todo Description needed.
902  };
903 
904  protected:
905 
906  /// \todo Brief description needed.
907 
908  /// \todo Descriptions needed.
909  /// \tparam fnT Description needed.
910  /// \param[in,out] id Description needed.
911  /// \param[in] fn Description needed.
912  /// \return Description needed.
913  template <typename fnT>
914  static typename std::enable_if<detail::function_traits<fnT>::value ||
916  make_id(std::pair<void*,unsigned short>& id, fnT fn) {
918  poop.in = fn;
919  id.first = poop.out;
920  id.second = 1ul;
921  }
922 
923  /// \todo Brief description needed.
924 
925  /// \todo Descriptions needed. What is the purpose of the second argument?
926  /// \tparam fnobjT Description needed.
927  /// \param[in,out] id Description needed.
928  template <typename fnobjT>
929  static typename std::enable_if<!(detail::function_traits<fnobjT>::value ||
931  make_id(std::pair<void*,unsigned short>& id, const fnobjT&) {
932  id.first = reinterpret_cast<void*>(const_cast<char*>(typeid(fnobjT).name()));
933  id.second = 2ul;
934  }
935 
936  private:
937 
938  /// \todo Brief description needed.
939 
940  /// \todo Descriptions needed.
941  /// \param[in,out] id Description needed.
942  virtual void get_id(std::pair<void*,unsigned short>& id) const {
943  id.first = nullptr;
944  id.second = 0ul;
945  }
946 
947 #ifndef HAVE_INTEL_TBB
948 
949  Barrier* barrier; ///< Barrier, only allocated for multithreaded tasks.
950  AtomicInt count; ///< Used to count threads as they start.
951 
952  /// Returns true for the one thread that should invoke the destructor.
953 
954  /// \return True for the one thread that should invoke the destructor.
// Runs the user's run() on the calling thread.
// - nthread == 1: run() is called with TaskThreadEnv(1,0,0) and true is returned.
// - otherwise: the thread takes an id from `count`, registers with the
//   barrier, calls run(), and returns the result of barrier->enter(id).
955  bool run_multi_threaded() {
956  // As a thread enters this routine it increments the shared counter
957  // to generate a unique id without needing any thread-local storage.
958  // A downside is this does not preserve any relationships between thread
959  // numbering and the architecture ... more work ahead.
960  int nthread = get_nthread();
961  if (nthread == 1) {
962 #ifdef MADNESS_TASK_PROFILING
963  task_event_->start(id_, nthread, submit_time_);
964 #endif // MADNESS_TASK_PROFILING
// Single-threaded case: the barrier argument passed to the environment is 0.
965  run(TaskThreadEnv(1,0,0));
966 #ifdef MADNESS_TASK_PROFILING
967  task_event_->stop();
968 #endif // MADNESS_TASK_PROFILING
969  return true;
970  }
971  else {
972  int id = count++;
// NOTE(review): barrier_flag lives on this thread's stack and its address is
// handed to the barrier; presumably it must stay alive until enter() below
// has returned -- confirm against Barrier.
973  volatile bool barrier_flag;
974  barrier->register_thread(id, &barrier_flag);
975 
976 #ifdef MADNESS_TASK_PROFILING
// Only the thread that drew id 0 records the start of the task.
977  if(id == 0)
978  task_event_->start(id_, nthread, submit_time_);
979 #endif // MADNESS_TASK_PROFILING
980 
981  run(TaskThreadEnv(nthread, id, barrier));
982 
983 #ifdef MADNESS_TASK_PROFILING
// Only the thread for which enter() returns true records the stop time.
984  const bool cleanup = barrier->enter(id);
985  if(cleanup) task_event_->stop();
986  return cleanup;
987 #else
988  return barrier->enter(id);
989 #endif // MADNESS_TASK_PROFILING
990  }
991  }
992 
993  public:
994 
995  /// Default constructor.
997  : TaskAttributes()
998  , barrier(nullptr)
999 #if HAVE_PARSEC
1000  , parsec_task(ParsecRuntime::task(is_high_priority(), this))
1001 #endif
1002  {
1003  count = 0;
1004  }
1005 
1006  /// Constructor setting the specified task attributes.
1007 
1008  /// \param[in] attr The task attributes.
1009  explicit PoolTaskInterface(const TaskAttributes& attr)
1010  : TaskAttributes(attr)
1011  , barrier(attr.get_nthread()>1 ? new Barrier(attr.get_nthread()) : 0)
1012 #if HAVE_PARSEC
1013  , parsec_task(ParsecRuntime::task(is_high_priority(), this))
1014 #endif
1015  {
1016  count = 0;
1017  }
1018 
1019  /// Destructor.
1020  /// \todo Should we either use a unique_ptr for barrier or check that barrier != nullptr here?
1021  virtual ~PoolTaskInterface() {
1022 #if HAVE_PARSEC
1023  *(reinterpret_cast<PoolTaskInterface**>(&(parsec_task->locals[0]))) = nullptr;
1024  ParsecRuntime::delete_parsec_task(parsec_task);
1025  parsec_task = nullptr;
1026 #endif
1027  delete barrier;
1028  }
1029 
1030  /// Call this to reset the number of threads before the task is submitted.
1031 
1032  /// Once a task has been constructed, \c TaskAttributes::set_nthread()
1033  /// is insufficient because a multithreaded task includes a barrier
1034  /// that needs to know the number of threads.
1035  ///
1036  /// \param[in] nthread The new number of threads.
1037  void set_nthread(int nthread) {
1038  if (nthread != get_nthread()) {
1039  TaskAttributes::set_nthread(nthread);
1040  delete barrier;
1041  if (nthread > 1)
1042  barrier = new Barrier(nthread);
1043  else
1044  barrier = 0;
1045  }
1046  }
1047 #if HAVE_PARSEC
1048  //////////// Parsec Related Begin ////////////////////
1049  parsec_task_t *parsec_task;
1050  //////////// Parsec Related End ///////////////////
1051 #endif
1052 
1053 #else
1054 
1055  public:
1056 
1057  /// Default constructor.
1059  }
1060 
1061  /// \todo Brief description needed.
1062 
1063  /// \todo Descriptions needed.
1064  /// \param[in] attr Description needed.
1065  explicit PoolTaskInterface(const TaskAttributes& attr) :
1066  TaskAttributes(attr)
1067  {
1068  }
1069 
1070  /// Destructor.
1071  virtual ~PoolTaskInterface() = default;
1072 
1073  /// Call this to reset the number of threads before the task is submitted
1074 
1075  /// Once a task has been constructed, \c TaskAttributes::set_nthread()
1076  /// is insufficient because a multithreaded task includes a
1077  /// barrier that needs to know the number of threads.
1078  void set_nthread(int nthread) {
1079  if (nthread != get_nthread())
1080  TaskAttributes::set_nthread(nthread);
1081  }
1082 
1083  /// Run the task in the calling thread.
1084 
1085  /// Calls \c run() with a \c TaskThreadEnv built from \c get_nthread()
1086  /// and thread index 0. This function returns nothing.
1087  void execute() {
1088  const int nthread = get_nthread();
1089  run( TaskThreadEnv(nthread, 0) );
1090  }
1091 
1092 #endif // HAVE_INTEL_TBB
1093 
1094  /// Override this method to implement a multi-threaded task.
1095 
1096  /// \c info.nthread() will be the number of threads collaborating on this task.
1097  ///
1098  /// \c info.id() will be the index of the current thread \c id=0,...,nthread-1.
1099  ///
1100  /// \c info.barrier() will be a barrier for all of the threads, and returns
1101  /// true for the last thread to enter the barrier (other threads get false).
1102  ///
1103  /// Pure virtual: every concrete task must provide an implementation.
1104  /// \param[in] info The thread environment (thread count, thread index, barrier) for this invocation.
1105 
1106 
1107  virtual void run(const TaskThreadEnv& info) = 0;
1108 
1109  };
1110 
1111  /// A no-operation task used for various purposes.
1113  public:
1114  /// Execution function that does nothing.
1115  void run(const TaskThreadEnv& /*info*/) {}
1116 
1117  /// Destructor.
1118  virtual ~PoolTaskNull() {}
1119 
1120  private:
1121  /// \todo Brief description needed.
1122 
1123  /// \todo Description needed.
1124  /// \param[in,out] id Description needed.
1125  virtual void get_id(std::pair<void*,unsigned short>& id) const {
1127  }
1128  };
1129 
1130  /// \c ThreadPool thread object.
1131 
1132  /// This class holds thread local data for thread pool threads. It can be
1133  /// accessed via \c ThreadBase::this_thread().
1134  class ThreadPoolThread : public Thread {
1135  private:
1136  // Thread local data for thread pool
1137 #ifdef MADNESS_TASK_PROFILING
1138  profiling::TaskProfiler profiler_; ///< Task profiler held by this thread object (profiling builds only).
1139 #endif // MADNESS_TASK_PROFILING
1140 
1141  public:
1143  virtual ~ThreadPoolThread() = default;
1144 
1145 #ifdef MADNESS_TASK_PROFILING
1146  /// Task profiler accessor.
1147 
1148  /// Only available when MADNESS_TASK_PROFILING is defined.
1149  /// \return The profiler member of this thread object.
1151  return profiler_;
1152  }
1153 #endif // MADNESS_TASK_PROFILING
1154  };
1155 
1156  /// A singleton pool of threads for dynamic execution of tasks.
1157 
1158  /// \attention You must instantiate the pool while running with just one
1159  /// thread.
1160  class ThreadPool {
1161  public:
1162  // non-copyable and non-movable
1163  ThreadPool(const ThreadPool&) = delete;
1164  ThreadPool(ThreadPool&&) = delete;
1165  void operator=(const ThreadPool&) = delete;
1166  void operator=(ThreadPool&&) = delete;
1167 
1168  /// Get the number of threads from the environment.
1169 
1170  /// \return The number of threads.
1171  static int default_nthread();
1172 
1173  private:
1174  friend class WorldTaskQueue;
1175 
1176  // Thread pool data
1177  ThreadPoolThread *threads; ///< Array of threads (released with delete[] in the destructor).
1178  ThreadPoolThread main_thread; ///< Placeholder for main thread tls.
1179  DQueue<PoolTaskInterface*> queue; ///< Queue of tasks.
1180  int nthreads; ///< Number of threads.
1181  volatile bool finish; ///< Set to true when time to stop.
1182  AtomicInt nfinished; ///< Thread pool exit counter.
1183 
1184  // Static data
1185  static ThreadPool* instance_ptr; ///< Singleton pointer returned by instance().
1186  static const int nmax = 128; ///< Maximum number of tasks a worker thread will pop from the task queue in one run_tasks() call.
1187  static double await_timeout; ///< Timeout, in seconds, used by await() to detect a hung queue.
1188 
1189 #if defined(HAVE_IBMBGQ) and defined(HPM)
1190  static unsigned int main_hpmctx; ///< HPM context for main thread.
1191 #endif
1192  /// The constructor is private to enforce the singleton model.
1193 
1194  /// \todo Verify against the definition in thread.cc, which is not part of this header.
1195  /// \param[in] nthread Requested number of threads; the default -1 presumably selects \c default_nthread() - TODO confirm.
1196  ThreadPool(int nthread=-1);
1197 
1198  /// Run the next task.
1199 
1200  /// \todo Verify and complete this documentation.
1201  /// \param[in] wait Block if true (forwarded to \c queue.pop_front()); if false, return at once when the queue is empty.
1202  /// \param[in,out] this_thread The calling thread's object; only dereferenced (for its profiler) when MADNESS_TASK_PROFILING is defined.
1203  /// \return The flag returned by \c queue.pop_front(); it can be true even when the popped task pointer was null (stolen), in which case nothing was run.
1204  bool run_task(bool wait, ThreadPoolThread* this_thread) {
1205 #if HAVE_INTEL_TBB
1206  MADNESS_EXCEPTION("run_task should not be called when using Intel TBB", 1);
1207 #else
1208 
1209  if (!wait && queue.empty()) return false;
1210  std::pair<PoolTaskInterface*,bool> t = queue.pop_front(wait);
1211 #ifdef MADNESS_TASK_PROFILING
1212  profiling::TaskEventList* event_list =
1213  this_thread->profiler().new_list(1);
1214 #endif // MADNESS_TASK_PROFILING
1215  // Task pointer might be zero due to stealing
1216  if (t.second && t.first) {
1217 #ifdef MADNESS_TASK_PROFILING
1218  t.first->set_event(event_list->event());
1219 #endif // MADNESS_TASK_PROFILING
1220  if (t.first->run_multi_threaded()) // What we are here to do
1221  delete t.first;
1222  }
1223  return t.second;
1224 #endif
1225  }
1226 
1227  /// Pop up to \c nmax tasks from the queue and run them.
1228 
1229  /// Must not be called when built with Intel TBB (raises MADNESS_EXCEPTION).
1230  /// \param[in] wait Forwarded to \c queue.pop_front() (presumably: block until work is available - confirm in DQueue).
1231  /// \param[in,out] this_thread The calling thread's object; only dereferenced (for its profiler) when MADNESS_TASK_PROFILING is defined.
1232  /// \return True if at least one queue entry was popped or, with PaRSEC and nothing popped, if \c parsec_runtime->test() returned a positive value.
1233  bool run_tasks(bool wait, ThreadPoolThread* const this_thread) {
1234 #if HAVE_INTEL_TBB
1235 // if (!wait && tbb_task_list->empty()) return false;
1236 // tbb::task* t = &tbb_task_list->pop_front();
1237 // if (t) {
1238 // tbb_parent_task->increment_ref_count();
1239 // tbb_parent_task->enqueue(*t);
1240 // }
1241 
1242 // wait = (tbb_parent_task->ref_count() >= 1) ? false : true;
1243 // return wait;
1244 
1245  MADNESS_EXCEPTION("run_tasks should not be called when using Intel TBB", 1);
1246 #else
1247 
1248  PoolTaskInterface* taskbuf[nmax];
1249  int ntask = queue.pop_front(nmax, taskbuf, wait);
1250 #ifdef MADNESS_TASK_PROFILING
1251  profiling::TaskEventList* event_list =
1252  this_thread->profiler().new_list(ntask);
1253 #endif // MADNESS_TASK_PROFILING
1254  for (int i=0; i<ntask; ++i) {
1255  if (taskbuf[i]) { // Task pointer might be zero due to stealing
1256 #ifdef MADNESS_TASK_PROFILING
1257  taskbuf[i]->set_event(event_list->event());
1258 #endif // MADNESS_TASK_PROFILING
1259  if (taskbuf[i]->run_multi_threaded()) {
1260  delete taskbuf[i];
1261  }
1262  }
1263  }
1264 #if HAVE_PARSEC
1265  ////////////////// Parsec Related Begin //////////////////
1266  if(0 == ntask) {
1267  ntask = parsec_runtime->test();
1268  }
1269  ///////////////// Parsec Related End ////////////////////
1270 #endif
1271  return (ntask>0);
1272 #endif
1273  }
1274 
1275  /// \todo Brief description needed.
1276 
1277  /// \todo Description needed.
1278  /// \param[in,out] thread Description needed.
1279  void thread_main(ThreadPoolThread* const thread);
1280 
1281  /// Forwards the thread to bound member function.
1282 
1283  /// \todo Descriptions needed.
1284  /// \param[in] v Description needed.
1285  /// \return Description needed.
1286  static void* pool_thread_main(void *v);
1287 
1288  public:
1289  /// Return a pointer to the only instance; this function never constructs it.
1290 
1291  /// \return A pointer to the only instance. If the pool has not been created, an error is printed and MADNESS_EXCEPTION is raised, unless MADNESS_ASSERTIONS_DISABLE is defined, in which case the null pointer is returned.
1292  static ThreadPool* instance() {
1293 #ifndef MADNESS_ASSERTIONS_DISABLE
1294  if(! instance_ptr) {
1295  std::cerr << "!!! ERROR: The thread pool has not been initialized.\n"
1296  << "!!! ERROR: Call madness::initialize before submitting tasks to the task queue.\n";
1297  MADNESS_EXCEPTION("ThreadPool::instance_ptr is NULL", 0);
1298  }
1299 #endif
1300  return instance_ptr;
1301  }
1302 
1303  void flush_prebuf() { // Calls queue.lock_and_flush_prebuf(); compiled to a no-op when HAVE_INTEL_TBB or HAVE_PARSEC is defined.
1304 #if !(defined(HAVE_INTEL_TBB) || defined(HAVE_PARSEC))
1305  queue.lock_and_flush_prebuf();
1306 #endif
1307  }
1308 
1309 #if HAVE_PARSEC
1310  ////////////////// Parsec Related Begin //////////////////
1311  static ParsecRuntime *parsec_runtime;
1312  ///////////////// Parsec Related End ////////////////////
1313 #endif
1314 
1315 #if HAVE_INTEL_TBB
1316  static std::unique_ptr<tbb::global_control> tbb_control; ///< TBB global control object (presumably caps TBB parallelism - TODO confirm in thread.cc).
1317  static std::unique_ptr<tbb::task_arena> tbb_arena; ///< TBB task arena into which add() enqueues tasks.
1318 #endif
1319 
1320  /// Please invoke while in a single-threaded environment.
1321 
1322  /// \todo Verify documentation.
1323  /// \param[in] nthread The number of threads.
1324  static void begin(int nthread=-1);
1325 
1326  /// \todo Description needed.
1327  static void end();
1328 
1329  /// Add a new task to the pool.
1330 
1331  /// With PaRSEC the task is passed to \c parsec_runtime->schedule(); with Intel TBB it is enqueued in \c tbb_arena, owned by a unique_ptr inside the enqueued lambda; otherwise it is pushed onto the pool queue.
1332  /// \param[in,out] task The task to schedule; must not be null (only the queue branch checks this).
1333  static void add(PoolTaskInterface* task) {
1334 #ifdef MADNESS_TASK_PROFILING
1335  task->submit(); // NOTE(review): dereferences task before the null check in the queue branch below
1336 #endif // MADNESS_TASK_PROFILING
1337 
1338 #if HAVE_PARSEC
1339  //////////// Parsec Related Begin ////////////////////
1340  parsec_runtime->schedule(task);
1341  //////////// Parsec Related End ////////////////////
1342 #elif HAVE_INTEL_TBB
1343 //#ifdef MADNESS_CAN_USE_TBB_PRIORITY
1344 // if(task->is_high_priority())
1345 // tbb::task::enqueue(*task, tbb::priority_high);
1346 // else
1347 //#endif // MADNESS_CAN_USE_TBB_PRIORITY
1348  tbb_arena->enqueue(
1349  //use unique_ptr to automatically delete task ptr
1350  [task_p = std::unique_ptr<PoolTaskInterface>(task)] () noexcept {
1351  //exceptions are not expected here, as nobody will catch them for enqueued tasks
1352  task_p->execute();
1353  });
1354 #else
1355  if (!task) MADNESS_EXCEPTION("ThreadPool: inserting a NULL task pointer", 1);
1356  int task_threads = task->get_nthread();
1357  // Currently multithreaded tasks must be shoved on the end of the q
1358  // to avoid a race condition as multithreaded task is starting up
1359  if (task->is_high_priority() && (task_threads == 1)) {
1360  instance()->queue.push_front(task);
1361  }
1362  else {
1363  instance()->queue.push_back(task, task_threads);
1364  }
1365 #endif // HAVE_INTEL_TBB
1366  }
1367 
1368  /// Apply an operation to the task queue.
1369 
1370  /// Forwards \p op to \c queue.scan(); what the queue does with it is defined by DQueue.
1371  /// \tparam opT Type of the operation passed to \c queue.scan().
1372  /// \param[in,out] op The operation object, passed by reference to \c queue.scan().
1373  template <typename opT>
1374  void scan(opT& op) {
1375  queue.scan(op);
1376  }
1377 
1378  /// Add a vector of tasks to the pool.
1379 
1380  /// \param[in] tasks Vector of tasks to add to the pool; each element is passed in order to \c add(PoolTaskInterface*). Raises MADNESS_EXCEPTION when built with Intel TBB.
1381  static void add(const std::vector<PoolTaskInterface*>& tasks) {
1382 #if HAVE_INTEL_TBB
1383  MADNESS_EXCEPTION("Do not add tasks to the madness task queue when using Intel TBB.", 1);
1384 #else
1385  typedef std::vector<PoolTaskInterface*>::const_iterator iteratorT;
1386  for (iteratorT it=tasks.begin(); it!=tasks.end(); ++it) {
1387  add(*it);
1388  }
1389 #endif
1390  }
1391 
1392  /// An otherwise idle thread can call this to run tasks.
1393 
1394  /// \return The result of \c run_tasks(false, ...) on the singleton (a batch of tasks, not just one); always false when HAVE_INTEL_TBB is defined.
1395  static bool run_task() {
1396 #ifdef HAVE_INTEL_TBB
1397  return false;
1398 #else
1399 
1400 #ifdef MADNESS_TASK_PROFILING
1401  ThreadPoolThread* const thread = static_cast<ThreadPoolThread*>(ThreadBase::this_thread());
1402 #else
1403  ThreadPoolThread* const thread = nullptr;
1404 #endif // MADNESS_TASK_PROFILING
1405 
1406  return instance()->run_tasks(false, thread);
1407 #endif // HAVE_INTEL_TBB
1408  }
1409 
1410  /// Returns the number of threads in the pool.
1411 
1412  /// \return The number of threads in the pool.
1413  static std::size_t size() {
1414  return instance()->nthreads;
1415  }
1416 
1417  /// Returns the number of tasks in the queue.
1418 
1419  /// \return The number of tasks in the queue.
1420  static std::size_t queue_size() {
1421  return instance()->queue.size();
1422  }
1423 
1424  /// Returns queue statistics.
1425 
1426  /// \return Queue statistics.
1427  static const DQStats& get_stats();
1428 
1429  /// Access the pool thread array
1430  /// \return ptr to the pool thread array, its size is given by \c size()
1431  static const ThreadPoolThread* get_threads() {
1432  return const_cast<const ThreadPoolThread*>(instance()->threads);
1433  }
1434 
1435  /// Gracefully wait for a condition to become true, executing any tasks in the queue.
1436 
1437  /// Probe should be an object that, when called, returns the status.
1438  /// If no task has been run for more than \c await_timeout seconds (and that timeout exceeds 1 second), a "Hung queue?" warning is printed on each further idle pass and, once the warning counter exceeds 3, a \c MadnessException is thrown.
1439  /// \tparam Probe Type of the probe.
1440  /// \param[in] probe The probe; it is called with no arguments and its result is tested as a boolean.
1441  /// \param[in] dowork Do work (call \c ThreadPool::run_task()) while waiting - default is true
1442  /// \param[in] sleep Sleep (\c myusleep(100)) instead of calling \c waiter.wait() while waiting (e.g., to avoid pounding on MPI) - default is false
1443  template <typename Probe>
1444  static void await(const Probe& probe, bool dowork = true, bool sleep = false) {
1445  if (!probe()) {
1446  double start = cpu_time();
1447  const double timeout = await_timeout;
1448  int counter = 0;
1449 
1450  MutexWaiter waiter;
1451  while (!probe()) {
1452 
1453  const bool working = (dowork ? ThreadPool::run_task() : false);
1454  const double current_time = cpu_time();
1455 
1456  if (working) { // Reset timeout logic
1457  waiter.reset();
1458  start = current_time;
1459  counter = 0;
1460  } else {
1461  if(((current_time - start) > timeout) && (timeout > 1.0)) { // Check for timeout
1462  std::cerr << "!!MADNESS: Hung queue?" << std::endl;
1463  if (counter++ > 3) {
1464  const long bufsize=256;
1465  char errstr[bufsize];
1466  snprintf(errstr,bufsize, "ThreadPool::await() timed out after %.1lf seconds", timeout);
1467  throw madness::MadnessException(errstr, 0, 1,
1468  __LINE__, __FUNCTION__,
1469  __FILE__);
1470  }
1471  }
1472  if (sleep) {
1473  // THIS NEEDS TO BECOME AN EXTERNAL PARAMETER
1474  // Problem is exacerbated when running with many
1475  // (e.g., 512 or more) send/recv buffers, and
1476  // also with many threads. More outstanding
1477  // requests means each call into MPI takes
1478  // longer and more threads means more calls in
1479  // spots where all threads are messaging. Old
1480  // code was OK on dancer.icl.utk.edu with just
1481  // 32 bufs and 20 threads, but 512 bufs caused
1482  // intermittent hangs I think due to something
1483  // not being able to make progress or general
1484  // confusion (this with MPICH) ... maybe using a
1485  // fair mutex somewhere would help.
1486  //
1487  // 100us is a long time ... will try 10us. mmm ... perhaps need 100 at least on dancer with 17 threads per node
1488  myusleep(100);
1489  }
1490  else {
1491  waiter.wait();
1492  }
1493  }
1494  }
1495  } // if !probe()
1496  }
1497 
1498  /// Destructor.
1500 #if HAVE_PARSEC
1501  ////////////////// Parsec related Begin /////////////////
1502  delete parsec_runtime;
1503  ////////////////// Parsec related End /////////////////
1504 #elif HAVE_INTEL_TBB
1505 #else
1506  delete[] threads;
1507 #endif
1508  }
1509 
1510  /// \sa madness::threadpool_wait_policy
1511  static void set_wait_policy(
1512  WaitPolicy policy,
1513  int sleep_duration_in_microseconds = 0) {
1514 #if !HAVE_INTEL_TBB && !HAVE_PARSEC
1515  instance()->queue.set_wait_policy(policy,
1516  sleep_duration_in_microseconds);
1517 #endif
1518  }
1519 
1520  };
1521 
1522  // clang-format off
1523  /// Controls how aggressively ThreadPool holds on to the OS threads
1524  /// while waiting for work. Currently useful only for Pthread pool when it's using spinlocks;
1525  /// NOT used for TBB or PaRSEC.
1526  /// \param policy specifies how to wait for work;
1527  /// - WaitPolicy::Busy -- threads are kept busy (default); recommended when intensive work is only performed by MADNESS threads
1528  /// - WaitPolicy::Yield -- thread yields; recommended when intensive work is performed primarily by non-MADNESS threads
1529  /// - WaitPolicy::Sleep -- thread sleeps for \p sleep_duration_in_microseconds ; recommended when intensive work is performed by MADNESS and non-MADNESS threads
1530  /// \param sleep_duration_in_microseconds if `policy==WaitPolicy::Sleep` this specifies the duration of sleep, in microseconds
1531  // clang-format on
1533  int sleep_duration_in_microseconds = 0) {
1534  ThreadPool::set_wait_policy(policy, sleep_duration_in_microseconds);
1535  }
1536 
1537  /// @}
1538 }
1539 
1540 #endif // MADNESS_WORLD_THREAD_H__INCLUDED
An integer with atomic set, get, read+increment, read+decrement, and decrement+test operations.
Definition: atomicint.h:126
Definition: worldmutex.h:700
bool enter(const int id)
Each thread calls this with its id (0,..,nthread-1) to enter the barrier.
Definition: worldmutex.h:729
void register_thread(int id, volatile bool *pflag)
Each thread calls this once before first use.
Definition: worldmutex.h:718
A thread safe, fast but simple doubled-ended queue.
Definition: dqueue.h:80
Base class for exceptions thrown in MADNESS.
Definition: madness_exception.h:66
Definition: worldmutex.h:109
void wait()
Definition: worldmutex.cc:103
void reset()
Definition: worldmutex.h:124
Mutex using pthread mutex operations.
Definition: worldmutex.h:131
Lowest level task interface.
Definition: thread.h:867
double submit_time_
Definition: thread.h:874
void execute()
Definition: thread.h:1087
static std::enable_if< detail::function_traits< fnT >::value||detail::memfunc_traits< fnT >::value >::type make_id(std::pair< void *, unsigned short > &id, fnT fn)
Definition: thread.h:916
static std::enable_if<!(detail::function_traits< fnobjT >::value||detail::memfunc_traits< fnobjT >::value) >::type make_id(std::pair< void *, unsigned short > &id, const fnobjT &)
Definition: thread.h:931
virtual ~PoolTaskInterface()=default
Destructor.
profiling::TaskEvent * task_event_
Definition: thread.h:873
PoolTaskInterface()
Default constructor.
Definition: thread.h:1058
virtual void run(const TaskThreadEnv &info)=0
Override this method to implement a multi-threaded task.
PoolTaskInterface(const TaskAttributes &attr)
Definition: thread.h:1065
std::pair< void *, unsigned short > id_
Definition: thread.h:875
void set_event(profiling::TaskEvent *task_event)
Definition: thread.h:881
void submit()
Collect info on the task and record the submit time.
Definition: thread.h:886
virtual void get_id(std::pair< void *, unsigned short > &id) const
Definition: thread.h:942
void set_nthread(int nthread)
Call this to reset the number of threads before the task is submitted.
Definition: thread.h:1078
A no-operation task used for various purposes.
Definition: thread.h:1112
void run(const TaskThreadEnv &)
Execution function that does nothing.
Definition: thread.h:1115
virtual ~PoolTaskNull()
Destructor.
Definition: thread.h:1118
virtual void get_id(std::pair< void *, unsigned short > &id) const
Definition: thread.h:1125
Contains attributes of a task.
Definition: thread.h:323
TaskAttributes(const TaskAttributes &attr)
Copy constructor.
Definition: thread.h:343
bool is_generator() const
Test if the generator attribute is true.
Definition: thread.h:351
TaskAttributes & set_generator(bool generator_hint)
Sets the generator attribute.
Definition: thread.h:372
static TaskAttributes hipri()
Definition: thread.h:450
TaskAttributes & set_stealable(bool stealable)
Sets the stealable attribute.
Definition: thread.h:383
static const unsigned long GENERATOR
Mask for generator bit.
Definition: thread.h:328
static const unsigned long STEALABLE
Mask for stealable bit.
Definition: thread.h:329
bool is_stealable() const
Test if the stealable attribute is true.
Definition: thread.h:358
TaskAttributes(unsigned long flags=0)
Sets the attributes to the desired values.
Definition: thread.h:337
static const unsigned long NTHREAD
Mask for nthread byte.
Definition: thread.h:327
void set_nthread(int nthread)
Set the number of threads.
Definition: thread.h:414
static TaskAttributes generator()
Definition: thread.h:442
bool is_high_priority() const
Test if the high priority attribute is true.
Definition: thread.h:365
static TaskAttributes multi_threaded(int nthread)
Definition: thread.h:458
void serialize(Archive &ar)
Serializes the attributes for I/O.
Definition: thread.h:434
unsigned long flags
Byte-string storing the specified attributes.
Definition: thread.h:324
int get_nthread() const
Get the number of threads.
Definition: thread.h:422
static const unsigned long HIGHPRIORITY
Mask for priority bit.
Definition: thread.h:330
TaskAttributes & set_highpriority(bool hipri)
Sets the high priority attribute.
Definition: thread.h:392
virtual ~TaskAttributes()
Definition: thread.h:346
Used to pass information about the thread environment to a user's task.
Definition: thread.h:466
int id() const
Get the ID of this thread.
Definition: thread.h:506
TaskThreadEnv(int nthread, int id, Barrier *barrier)
Constructor collecting necessary environmental information.
Definition: thread.h:478
TaskThreadEnv(int nthread, int id)
Constructor collecting necessary environmental information.
Definition: thread.h:491
bool barrier() const
Definition: thread.h:514
Barrier * _barrier
Pointer to the shared barrier, null if there is only a single thread.
Definition: thread.h:469
const int _nthread
Number of threads collaborating on task.
Definition: thread.h:467
int nthread() const
Get the number of threads collaborating on this task.
Definition: thread.h:499
const int _id
ID of this thread (0,...,nthread-1).
Definition: thread.h:468
Simplified thread wrapper to hide pthread complexity.
Definition: thread.h:164
virtual void run()=0
Function to be executed by the thread.
int pool_num
Stores index of thread in pool or -1.
Definition: thread.h:176
void set_pool_thread_index(int i)
Sets the index of this thread within the pool.
Definition: thread.h:195
static void exit()
A thread can call this to terminate its execution.
Definition: thread.h:226
ThreadBase()
Default constructor.
Definition: thread.h:213
static pthread_key_t thread_key
Thread id key.
Definition: thread.h:167
int cancel() const
Cancel this thread.
Definition: thread.h:243
pthread_t id
Definition: thread.h:177
static void delete_thread_key()
Definition: thread.h:187
static int num_hw_processors()
Get number of actual hardware processors.
Definition: thread.cc:174
void start()
Start the thread running.
Definition: thread.cc:158
virtual ~ThreadBase()
Definition: thread.h:215
static void init_thread_key()
Definition: thread.h:180
const pthread_t & get_id() const
Get the pthread id of this thread (if running).
Definition: thread.h:231
static void * main(void *self)
Definition: thread.cc:92
static ThreadBase * this_thread()
Definition: thread.h:257
int get_pool_thread_index() const
Get index of this thread in ThreadPool.
Definition: thread.h:238
Definition: thread.h:97
bool do_bind
Definition: thread.h:101
void bind()
Definition: thread.h:137
const size_t * get_cpus() const
Definition: thread.h:133
ThreadBinder(bool print=false)
Definition: thread.h:108
size_t cpus[maxncpu]
Definition: thread.h:102
std::atomic< size_t > nextcpu
Definition: thread.h:103
bool print
Definition: thread.h:99
static const size_t maxncpu
Definition: thread.h:98
static thread_local bool bound
Definition: thread.h:104
void set_do_bind(bool value)
Definition: thread.h:131
size_t ncpu
Definition: thread.h:100
const size_t get_ncpu() const
Definition: thread.h:135
ThreadPool thread object.
Definition: thread.h:1134
ThreadPoolThread()
Definition: thread.h:1142
profiling::TaskProfiler profiler_
Definition: thread.h:1138
virtual ~ThreadPoolThread()=default
profiling::TaskProfiler & profiler()
Task profiler accessor.
Definition: thread.h:1150
A singleton pool of threads for dynamic execution of tasks.
Definition: thread.h:1160
static void add(PoolTaskInterface *task)
Add a new task to the pool.
Definition: thread.h:1333
int nthreads
Number of threads.
Definition: thread.h:1180
static std::unique_ptr< tbb::task_arena > tbb_arena
Definition: thread.h:1317
static bool run_task()
An otherwise idle thread can call this to run a task.
Definition: thread.h:1395
static void add(const std::vector< PoolTaskInterface * > &tasks)
Add a vector of tasks to the pool.
Definition: thread.h:1381
ThreadPool(ThreadPool &&)=delete
void operator=(ThreadPool &&)=delete
volatile bool finish
Set to true when time to stop.
Definition: thread.h:1181
ThreadPool(const ThreadPool &)=delete
static void set_wait_policy(WaitPolicy policy, int sleep_duration_in_microseconds=0)
Definition: thread.h:1511
static int default_nthread()
Get the number of threads from the environment.
Definition: thread.cc:325
static const DQStats & get_stats()
Returns queue statistics.
Definition: thread.cc:466
static void end()
Definition: thread.cc:437
static ThreadPool * instance()
Return a pointer to the only instance (it must already have been created; this function does not construct it).
Definition: thread.h:1292
AtomicInt nfinished
Thread pool exit counter.
Definition: thread.h:1182
void operator=(const ThreadPool &)=delete
static ThreadPool * instance_ptr
Singleton pointer.
Definition: thread.h:1185
ThreadPoolThread * threads
Array of threads.
Definition: thread.h:1177
void thread_main(ThreadPoolThread *const thread)
Definition: thread.cc:349
~ThreadPool()
Destructor.
Definition: thread.h:1499
void scan(opT &op)
Definition: thread.h:1374
void flush_prebuf()
Definition: thread.h:1303
static void begin(int nthread=-1)
Please invoke while in a single-threaded environment.
Definition: thread.cc:379
static double await_timeout
Waiter timeout.
Definition: thread.h:1187
DQueue< PoolTaskInterface * > queue
Queue of tasks.
Definition: thread.h:1179
bool run_tasks(bool wait, ThreadPoolThread *const this_thread)
Definition: thread.h:1233
static std::size_t queue_size()
Returns the number of tasks in the queue.
Definition: thread.h:1420
ThreadPoolThread main_thread
Placeholder for main thread tls.
Definition: thread.h:1178
static const int nmax
Number of tasks a worker thread will pop from the task queue.
Definition: thread.h:1186
bool run_task(bool wait, ThreadPoolThread *this_thread)
Run the next task.
Definition: thread.h:1204
static void * pool_thread_main(void *v)
Forwards the thread to bound member function.
Definition: thread.cc:374
static void await(const Probe &probe, bool dowork=true, bool sleep=false)
Gracefully wait for a condition to become true, executing any tasks in the queue.
Definition: thread.h:1444
static const ThreadPoolThread * get_threads()
Definition: thread.h:1431
static std::unique_ptr< tbb::global_control > tbb_control
Definition: thread.h:1316
static std::size_t size()
Returns the number of threads in the pool.
Definition: thread.h:1413
Simplified thread wrapper to hide pthread complexity.
Definition: thread.h:271
void *(* f)(void *)
The function called for executing this thread.
Definition: thread.h:272
void start(void *(*f)(void *), void *args=nullptr)
Start the thread by running f(args).
Definition: thread.h:299
virtual ~Thread()=default
Thread(void *(*f)(void *), void *args=nullptr)
Create a thread and start it running f(args).
Definition: thread.h:290
Thread()
Default constructor.
Definition: thread.h:284
void run()
Invokes the function for this thread.
Definition: thread.h:276
void * args
The arguments passed to this thread for execution.
Definition: thread.h:273
Multi-threaded queue to manage and run tasks.
Definition: world_task_queue.h:319
Task event list base class.
Definition: thread.h:686
virtual ~TaskEventListBase()=default
Virtual destructor.
TaskEventListBase * next_
The next task event in the list.
Definition: thread.h:688
friend std::ostream & operator<<(std::ostream &os, const TaskEventListBase &tel)
Output a task event list to an output stream.
Definition: thread.h:723
virtual std::ostream & print_events(std::ostream &) const =0
Print the events.
TaskEventListBase & operator=(const TaskEventListBase &)=delete
TaskEventListBase()
Default constructor.
Definition: thread.h:696
TaskEventListBase * next() const
Get the next event list in the linked list.
Definition: thread.h:705
void insert(TaskEventListBase *list)
Insert list after this list.
Definition: thread.h:712
TaskEventListBase(const TaskEventListBase &)=delete
A list of task events.
Definition: thread.h:737
TaskEvent * event()
Get a new event from this list.
Definition: thread.h:765
TaskEventList(const unsigned int nmax)
Default constructor.
Definition: thread.h:752
virtual std::ostream & print_events(std::ostream &os) const
Print events recorded in this list.
Definition: thread.h:775
TaskEventList & operator=(const TaskEventList &)=delete
unsigned int n_
The number of events recorded.
Definition: thread.h:739
TaskEventList(const TaskEventList &)=delete
virtual ~TaskEventList()=default
Virtual destructor.
std::unique_ptr< TaskEvent[]> events_
The event array.
Definition: thread.h:740
Task event class.
Definition: thread.h:533
std::string get_name() const
Get name of the function pointer.
Definition: thread.h:571
void start(const std::pair< void *, unsigned short > &id, const unsigned short threads, const double submit_time)
Record the start time of the task and collect task information.
Definition: thread.h:618
static void print_demangled(std::ostream &os, const char *symbol)
Print demangled symbol name.
Definition: thread.h:547
unsigned short threads_
Number of threads used by the task.
Definition: thread.h:537
friend std::ostream & operator<<(std::ostream &os, const TaskEvent &te)
Output the task data using a tab-separated list.
Definition: thread.h:645
void stop()
Record the stop time of the task.
Definition: thread.h:628
double times_[3]
Task trace times: { submit, start, stop }.
Definition: thread.h:535
std::pair< void *, unsigned short > id_
Task identification information.
Definition: thread.h:536
This class collects and prints task profiling data.
Definition: thread.h:789
TaskEventListBase * head_
The head of the linked list of data.
Definition: thread.h:791
TaskProfiler()
Default constructor.
Definition: thread.h:809
void write_to_file()
Write the profile data to file.
Definition: thread.cc:213
~TaskProfiler()
Destructor.
Definition: thread.h:814
TaskProfiler & operator=(const TaskProfiler &)=delete
TaskEventListBase * tail_
The tail of the linked list of data.
Definition: thread.h:792
TaskEventList * new_list(const std::size_t nmax)
Create a new task event list.
Definition: thread.h:829
TaskProfiler(const TaskProfiler &)=delete
static const char * output_file_name_
The output file name.
Definition: thread.h:805
static Mutex output_mutex_
Mutex used to lock the output file.
Definition: thread.h:794
const std::size_t bufsize
Definition: derivatives.cc:16
real_function_3d mask
Definition: dirac-hatom.cc:27
Implements DQueue.
auto T(World &world, response_space &f) -> response_space
Definition: global_functions.cc:34
void threadpool_wait_policy(WaitPolicy policy, int sleep_duration_in_microseconds=0)
Definition: thread.h:1532
static const double v
Definition: hatom_sf_dirac.cc:20
Tensor< double > op(const Tensor< double > &x)
Definition: kain.cc:508
#define MADNESS_CHECK(condition)
Check a condition — even in a release build the condition is always evaluated so it can have side eff...
Definition: madness_exception.h:190
#define MADNESS_EXCEPTION(msg, value)
Macro for throwing a MADNESS exception.
Definition: madness_exception.h:119
#define MADNESS_ASSERT(condition)
Assert a condition that should be free of side-effects since in release builds this might be a no-op.
Definition: madness_exception.h:134
File holds all helper structures necessary for the CC_Operator and CC2 class.
Definition: DFParameters.h:10
WaitPolicy
wait policies supported by ConditionVariable/DQueue/ThreadPool
Definition: worldmutex.h:492
static double cpu_time()
Returns the cpu time in seconds relative to an arbitrary origin.
Definition: timers.h:127
ThreadBinder binder
Definition: thread.cc:71
static void myusleep(unsigned int us)
Sleep or spin for specified number of microseconds.
Definition: timers.h:164
void error(const char *msg)
Definition: world.cc:139
double wall_time()
Returns the wall time in seconds relative to an arbitrary origin.
Definition: timers.cc:48
std::string type(const PairType &n)
Definition: PNOParameters.h:18
std::string name(const FuncType &type, const int ex=-1)
Definition: ccpairfunction.h:28
Definition: dqueue.h:59
Function traits in the spirit of boost function traits.
Definition: function_traits.h:13
Member function traits in the spirit of boost function traits.
Definition: function_traits.h:21
static double current_time
Definition: tdse1d.cc:160
int task(int i)
Definition: test_runtime.cpp:4
const char * status[2]
Definition: testperiodic.cc:43
Implements thread introspection.
Object that is used to convert function and member function pointers into void*.
Definition: thread.h:899