MADNESS 0.10.1
thread.h
Go to the documentation of this file.
1/*
2 This file is part of MADNESS.
3
4 Copyright (C) 2007,2010 Oak Ridge National Laboratory
5
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
10
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19
20 For more information please contact:
21
22 Robert J. Harrison
23 Oak Ridge National Laboratory
24 One Bethel Valley Road
25 P.O. Box 2008, MS-6367
26
27 email: harrisonrj@ornl.gov
28 tel: 865-241-3937
29 fax: 865-572-0680
30*/
31
32#ifndef MADNESS_WORLD_THREAD_H__INCLUDED
33#define MADNESS_WORLD_THREAD_H__INCLUDED
34
35/**
36 \file thread.h
37 \brief Implements Dqueue, Thread, ThreadBase and ThreadPool.
38 \ingroup threads
39*/
40
44#include <vector>
45#include <cstddef>
46#include <cstdio>
47#include <pthread.h>
48
49#include <functional>
50#include <type_traits>
51#include <typeinfo>
52#include <new>
53
54//////////// Parsec Related Begin ////////////////////
55#ifdef HAVE_PARSEC
56#include "parsec.h"
57#endif
58//////////// Parsec Related End ////////////////////
59
60#ifdef MADNESS_TASK_PROFILING
61#include <execinfo.h> // for backtrace_symbols
62#ifndef USE_LIBIBERTY
63#include <cxxabi.h> // for abi::__cxa_demangle
64#else
65extern "C" {
66 extern char * cplus_demangle (const char *mangled, int options);
67#define DMGL_NO_OPTS 0 /* For readability... */
68}
69#endif
70#include <sstream> // for std::istringstream
71#include <cstring> // for strchr & strrchr
72#endif // MADNESS_TASK_PROFILING
73
74#ifdef HAVE_INTEL_TBB
75#include <tbb/task_arena.h>
76#ifndef TBB_PREVIEW_GLOBAL_CONTROL
77# define TBB_PREVIEW_GLOBAL_CONTROL 1
78#endif
79# include <tbb/global_control.h>
80#endif
81
82
83#ifndef _SC_NPROCESSORS_CONF
84// Old macs don't have necessary support thru sysconf to determine the
85// no. of processors so must use sysctl
86#include <sys/types.h>
87#include <sys/sysctl.h>
88#endif
89
90namespace madness {
91
// Forward declarations of MADNESS classes that are defined in other headers.
class AtomicInt;
class Barrier;
class ThreadPool;
class WorldTaskQueue;

/// Error-reporting entry point (presumably reports `msg`); defined elsewhere.
void error(const char* msg);

/// Purges tasks from the calling thread (if any), so that it is safe to make
/// blocking calls from it.
/// @note Only needed for the Pthreads backend.
inline void thread_purge();
102
/// Pins threads to the CPUs found in the process's affinity mask.
///
/// The constructor records the CPUs present in the calling thread's
/// affinity mask. Each call to \c bind() pins the calling thread to the next
/// recorded CPU (round-robin, starting half-way through the list). Each
/// thread is pinned at most once. When \c ON_A_MAC is defined no CPUs are
/// recorded and \c bind() does nothing.
class ThreadBinder {
    static const size_t maxncpu = 1024;  ///< Capacity of \c cpus.
    bool print;                          ///< Print diagnostics to std::cout.
    size_t ncpu = 0;                     ///< Number of valid entries in \c cpus.
    bool do_bind = true;                 ///< Master switch for \c bind().
    size_t cpus[maxncpu];                ///< CPU ids from the affinity mask.
    std::atomic<size_t> nextcpu = 0;     ///< Round-robin cursor into \c cpus.
    static thread_local bool bound;      ///< True once the calling thread is pinned.

public:

    /// Records the CPUs in the calling thread's affinity mask.

    /// \param[in] print If true, print the CPU list here and a message on
    ///     every successful \c bind().
    ThreadBinder(bool print = false) : print(print) {
#ifndef ON_A_MAC
        ncpu = 0;
        cpu_set_t mask;
        CPU_ZERO(&mask);
        // If the mask cannot be queried, keep ncpu == 0 (bind() then becomes
        // a no-op) rather than scanning an uninitialized mask.
        if (sched_getaffinity(0, sizeof(mask), &mask) == 0) {
            const size_t limit = static_cast<size_t>(CPU_SETSIZE);
            for (size_t i = 0; i < maxncpu && i < limit; i++) {
                if (CPU_ISSET(int(i), &mask)) {
                    cpus[ncpu++] = i;
                }
            }
        }
        if (print) {
            std::cout << "ncpu: " << get_ncpu() << std::endl;
            for (size_t i = 0; i < get_ncpu(); i++) {
                std::cout << get_cpus()[i] << " ";
            }
            std::cout << std::endl;
        }
        nextcpu = ncpu / 2;
#endif
        // Presumably here to keep clang's -Wunused-private-field quiet when
        // ON_A_MAC compiles out the other uses of `print`; it has no effect.
        static_cast<void>(this->print);
    }

    /// Enables or disables pinning in subsequent \c bind() calls.
    void set_do_bind(bool value) { do_bind = value; }

    /// \return Pointer to the first of \c get_ncpu() recorded CPU ids.
    const size_t* get_cpus() const { return cpus; }

    /// \return The number of recorded CPU ids.
    size_t get_ncpu() const { return ncpu; }

    /// Pins the calling thread to the next recorded CPU, once per thread.
    void bind() {
#ifndef ON_A_MAC
        // In TBB this is called by each task, so check do_bind first.
        // ncpu == 0 (no usable affinity mask) would divide by zero below.
        if (do_bind && ncpu > 0 && !bound) {
            bound = true;
            cpu_set_t mask;
            CPU_ZERO(&mask);
            const size_t cpu = cpus[nextcpu++ % ncpu];
            CPU_SET(cpu, &mask);
            sched_setaffinity(0, sizeof(mask), &mask);
            if (print) std::cout << "bound thread to " << cpu << std::endl;
        }
#endif
    }
};

extern ThreadBinder binder;
159
160 /// \addtogroup threads
161 /// @{
162
163 /// Simplified thread wrapper to hide pthread complexity.
164
165 /// If the thread is using any of the object state, you cannot
166 /// delete the object until the thread has terminated.
167 ///
168 /// The cleanest solution is to put the object on the heap and
169 /// have the run method `delete this` at its end.
171 friend class ThreadPool;
172
173 static pthread_key_t thread_key; ///< Thread id key.
174
175 /// \todo Brief description needed.
176
177 /// \todo Descriptions needed.
178 /// \param[in,out] self Description needed.
179 /// \return Description needed.
180 static void* main(void* self);
181
182 int pool_num; ///< Stores index of thread in pool or -1.
183 pthread_t id; ///< \todo Brief description needed.
184
185 /// \todo Brief description needed.
186 static void init_thread_key() {
187 const int rc = pthread_key_create(&thread_key, nullptr);
188 if(rc != 0)
189 MADNESS_EXCEPTION("pthread_key_create failed", rc);
190 }
191
192 /// \todo Brief description needed.
193 static void delete_thread_key() {
194 pthread_key_delete(thread_key);
195 }
196
197 /// Sets the index of this thread within the pool.
198
199 /// \todo Verify documentation.
200 /// \param[in] i The index of this thread.
202 pool_num = i;
203 }
204
205#if defined(HAVE_IBMBGQ) and defined(HPM)
206 static const int hpm_thread_id_all = -10; ///< \todo Brief description needed.
207 static const int hpm_thread_id_main = -2; ///< \todo Brief description needed.
208 static bool main_instrumented; ///< \todo Brief description needed.
209 static bool all_instrumented; ///< \todo Brief description needed.
210 static int hpm_thread_id; ///< \todo Brief description needed.
211#endif
212
213 public:
214
215 /// Default constructor.
216
217 /// Sets up the thread; however, \c start() must be invoked to
218 /// actually begin the thread.
220
221 virtual ~ThreadBase() { }
222
223 /// Function to be executed by the thread.
224
225 /// Override this to do work.
226 virtual void run() = 0;
227
228 /// Start the thread running.
229 void start();
230
231 /// A thread can call this to terminate its execution.
232 static void exit() {
233 pthread_exit(0);
234 }
235
236 /// Get the pthread id of this thread (if running).
237 const pthread_t& get_id() const {
238 return id;
239 }
240
241 /// Get index of this thread in \c ThreadPool.
242
243 /// \return (0,...,nthread-1) or -1 if not in the \c ThreadPool.
245 return pool_num;
246 }
247
248 /// Cancel this thread.
249 int cancel() const {
250 return pthread_cancel(get_id());
251 }
252
253
254 /// Get number of actual hardware processors.
255
256 /// \return The number of hardward processors.
257 static int num_hw_processors();
258
259 /// \todo Brief description needed.
260
261 /// \todo Descriptions needed.
262 /// \return Description needed.
264 return static_cast<ThreadBase*>(pthread_getspecific(thread_key));
265 }
266
267#if defined(HAVE_IBMBGQ) and defined(HPM)
268 /// \todo Brief description needed.
269
270 /// \todo Descriptions needed.
271 /// \param[in] hpm_thread_id Description needed.
272 static void set_hpm_thread_env(int hpm_thread_id);
273#endif
274 }; // class ThreadBase
275
276 /// Simplified thread wrapper to hide pthread complexity.
277 class Thread : public ThreadBase {
278 void* (*f)(void *); ///< The function called for executing this thread. \todo should we replace this by a std::function?
279 void* args; ///< The arguments passed to this thread for execution.
280
281 /// Invokes the function for this thread.
282 void run() {
283 f(args);
284 }
285
286 public:
287 /// Default constructor.
288
289 /// \c start() must be invoked to actually execute the thread.
290 Thread() : f(nullptr), args(nullptr) { }
291
292 /// Create a thread and start it running `f(args)`.
293
294 /// \param[in] f The function to be called.
295 /// \param[in,out] args The arguments to the function.
296 Thread(void* (*f)(void *), void* args=nullptr)
297 : f(f), args(args) {
299 }
300
301 /// Start the thread by running `f(args)`.
302
303 /// \param[in] f The function to be called.
304 /// \param[in,out] args The arguments to the function.
305 void start(void* (*f)(void *), void* args=nullptr) {
306 this->f = f;
307 this->args = args;
309 }
310
311 virtual ~Thread() = default;
312 }; // class Thread
313
314
315 /// Contains attributes of a task.
316
317 /// The current attributes are:
318 /// - \c generator : Setting this hints that a task will produce
319 /// additional tasks and is used by the scheduler to
320 /// increase/throttle parallelism. The default is false.
321 /// - \c stealable : Setting this indicates that a task may be
322 /// migrated to another process for dynamic load balancing. The
323 /// default value is false.
324 /// - \c highpriority : indicates a high priority task. The default
325 /// value is false.
326 /// - \c nthread : indicates number of threads. 0 threads is interpreted
327 /// as 1 thread for backward compatibility and ease of specifying
328 /// defaults. The default value is 0 (==1).
330 unsigned long flags; ///< Byte-string storing the specified attributes.
331
332 public:
333 static const unsigned long NTHREAD = 0xff; ///< Mask for nthread byte.
334 static const unsigned long GENERATOR = 1ul<<8; ///< Mask for generator bit.
335 static const unsigned long STEALABLE = GENERATOR<<1; ///< Mask for stealable bit.
336 static const unsigned long HIGHPRIORITY = GENERATOR<<2; ///< Mask for priority bit.
337
338 /// Sets the attributes to the desired values.
339
340 /// `flags`, if unspecified sets all attributes to their default
341 /// values.
342 /// \param[in] flags The attribute values.
343 explicit TaskAttributes(unsigned long flags = 0)
344 : flags(flags) {}
345
346 /// Copy constructor.
347
348 /// \param[in] attr The attributes to copy.
350 : flags(attr.flags) {}
351
352 virtual ~TaskAttributes() {}
353
354 /// Test if the generator attribute is true.
355
356 /// \return True if this task is a generator, false otherwise.
357 bool is_generator() const {
358 return flags&GENERATOR;
359 }
360
361 /// Test if the stealable attribute is true.
362
363 /// \return True if this task is stealable, false otherwise.
364 bool is_stealable() const {
365 return flags&STEALABLE;
366 }
367
368 /// Test if the high priority attribute is true.
369
370 /// \return True if this task is a high priority, false otherwise.
371 bool is_high_priority() const {
372 return flags&HIGHPRIORITY;
373 }
374
375 /// Sets the generator attribute.
376
377 /// \param[in] generator_hint The new value for the generator attribute.
378 TaskAttributes& set_generator(bool generator_hint) {
379 if (generator_hint)
380 flags |= GENERATOR;
381 else
382 flags &= ~GENERATOR;
383 return *this;
384 }
385
386 /// Sets the stealable attribute.
387
388 /// \param[in] stealable The new value for the stealable attribute.
389 TaskAttributes& set_stealable(bool stealable) {
390 if (stealable) flags |= STEALABLE;
391 else flags &= ~STEALABLE;
392 return *this;
393 }
394
395 /// Sets the high priority attribute.
396
397 /// \param[in] hipri The new value for the high priority attribute.
399 if (hipri)
401 else
402 flags &= ~HIGHPRIORITY;
403 return *this;
404 }
405
406 /// Set the number of threads.
407
408 /// \attention Are you sure this is what you want to call? Only call
409 /// this for a \c TaskAttributes that is \em not a base class of a task
410 /// object.
411 /// \p If you are trying to set the number of threads in an \em existing
412 /// task you should call \c TaskInterface::set_nthread() instead. No
413 /// doubt there is some virtual/protected/something voodoo to prevent
414 /// you from doing harm.
415 ///
416 /// \todo Perhaps investigate a way to make this function only accessible
417 /// from the intended functions (using the so-called voodoo)?
418 ///
419 /// \param[in] nthread The new number of threads.
420 void set_nthread(int nthread) {
421 MADNESS_ASSERT(nthread>=0 && nthread<256);
422 flags = (flags & (~NTHREAD)) | (nthread & NTHREAD);
423 }
424
425 /// Get the number of threads.
426
427 /// \return The number of threads.
428 int get_nthread() const {
429 int n = flags & NTHREAD;
430 if (n == 0)
431 n = 1;
432 return n;
433 }
434
435 /// Serializes the attributes for I/O.
436
437 /// tparam Archive The archive type.
438 /// \param[in,out] ar The archive.
439 template <typename Archive>
440 void serialize(Archive& ar) {
441 ar & flags;
442 }
443
444 /// \todo Brief description needed.
445
446 /// \todo Descriptions needed.
447 /// \return Description needed.
450 }
451
452 /// \todo Brief description needed.
453
454 /// \todo Descriptions needed.
455 /// \return Description needed.
458 }
459
460 /// \todo Brief description needed.
461
462 /// \todo Descriptions needed.
463 /// \return Description needed.
464 static TaskAttributes multi_threaded(int nthread) {
466 t.set_nthread(nthread);
467 return t;
468 }
469 };
470
471 /// Used to pass information about the thread environment to a user's task.
473 const int _nthread; ///< Number of threads collaborating on task.
474 const int _id; ///< ID of this thread (0,...,nthread-1).
475 Barrier* _barrier; ///< Pointer to the shared barrier, `null` if there is only a single thread.
476
477 public:
478 /// Constructor collecting necessary environmental information.
479
480 /// \todo Verify this documentation.
481 /// \param[in] nthread The number of threads collaborating on this task.
482 /// \param[in] id The ID of this thread.
483 /// \param[in] barrier Pointer to the shared barrier.
487
488#if HAVE_INTEL_TBB
489 /// Constructor collecting necessary environmental information.
490
491 /// \todo Verify this documentation.
492 /// \param[in] nthread The number of threads collaborating on this task.
493 /// \param[in] id The ID of this thread.
494 ///
495 /// \todo I cannot get the TaskThreadEnv to work with Barrier.
496 /// Need to figure out why.
498 : _nthread(nthread), _id(id), _barrier(nullptr)
500#endif
501
502 /// Get the number of threads collaborating on this task.
503
504 /// \return The number of threads.
505 int nthread() const {
506 return _nthread;
507 }
508
509 /// Get the ID of this thread.
510
511 /// \return The ID of this thread.
512 int id() const {
513 return _id;
514 }
515
516 /// \todo Brief description needed.
517
518 /// \todo Descriptions needed.
519 /// \return Description needed.
520 bool barrier() const {
521 if (_nthread == 1)
522 return true;
523 else {
525 return _barrier->enter(_id);
526 }
527 }
528 };
529
530
531#ifdef MADNESS_TASK_PROFILING
532
533 namespace profiling {
534
535 /// Task event class.
536
537 /// This class is used to record the task trace information, including
538 /// submit, start, and stop times, as well as identification information.
539 class TaskEvent {
540 private:
541 double times_[3]; ///< Task trace times: { submit, start, stop }.
542 std::pair<void*, unsigned short> id_; ///< Task identification information.
543 unsigned short threads_; ///< Number of threads used by the task.
544
545 /// Print demangled symbol name.
546
547 /// Add the demangled symbol name to \c os. If demangling fails,
548 /// the unmodified symbol name is used instead. If symbol is NULL,
549 /// "UNKNOWN" is used instead. A tab character is added after the
550 /// symbol name.
551 /// \param[in,out] os The output stream.
552 /// \param[in] symbol The symbol to add to the stream.
553 static void print_demangled(std::ostream& os, const char* symbol) {
554 // Get the demagled symbol name
555 if(symbol) {
556 int status = 0;
557#ifndef USE_LIBIBERTY
558 const char* name = abi::__cxa_demangle(symbol, 0, 0, &status);
559#else
560 char* name = cplus_demangle(symbol, DMGL_NO_OPTS);
561#endif
562 // Append the demangled symbol name to the output stream
563 if(status == 0) {
564 os << name << "\t";
565 free((void*)name);
566 } else {
567 os << symbol << "\t";
568 }
569 } else {
570 os << "UNKNOWN\t";
571 }
572 }
573
574 /// Get name of the function pointer.
575
576 /// \return The mangled function name.
577 std::string get_name() const {
578
579 // Get the backtrace symbol for the function address,
580 // which contains the function name.
581 void* const * func_ptr = const_cast<void* const *>(& id_.first);
582 char** bt_sym = backtrace_symbols(func_ptr, 1);
583
584 // Extract the mangled function name from the backtrace
585 // symbol.
586 std::string mangled_name;
587
588#ifdef ON_A_MAC
589 // Format of bt_sym is:
590 // <frame #> <file name> <address> <mangled name> + <function offset>
591 std::istringstream iss(bt_sym[0]);
592 long frame;
593 std::string file, address;
594 iss >> frame >> file >> address >> mangled_name;
595#else // Assume Linux
596 // Format of bt_sym is:
597 // <file>(<mangled name>+<function offset>) [<address>]
598 const char* first = strchr(bt_sym[0],'(');
599 if(first) {
600 ++first;
601 const char* last = strrchr(first,'+');
602 if(last)
603 mangled_name.assign(first, (last - first) - 1);
604 }
605#endif // ON_A_MAC
606
607 // Free the backtrace buffer
608 free(bt_sym);
609
610 return mangled_name;
611 }
612
613 public:
614
615 // Only default constructors are needed.
616
617 /// Record the start time of the task and collect task information.
618
619 /// \param[in,out] id The task identifier (a function pointer or const char*)
620 /// and an integer to differentiate the different types.
621 /// \param[in] threads The number of threads this task uses.
622 /// \param[in] submit_time The time that the task was submitted to the
623 /// task queue.
624 void start(const std::pair<void*, unsigned short>& id,
625 const unsigned short threads, const double submit_time)
626 {
627 id_ = id;
628 threads_ = threads;
629 times_[0] = submit_time;
630 times_[1] = wall_time();
631 }
632
633 /// Record the stop time of the task.
634 void stop() {
635 times_[2] = wall_time();
636 }
637
638 /// Output the task data using a tab-separated list.
639
640 /// Output information includes
641 /// - the ID pointer
642 /// - the function, member function, and object type name
643 /// - the number of threads used by the task
644 /// - the submit time
645 /// - the start time
646 /// - the stop time.
647 ///
648 /// \param[in,out] os The output stream.
649 /// \param[in] te The task event to be output.
650 /// \return The \c os reference.
651 friend std::ostream& operator<<(std::ostream& os, const TaskEvent& te) {
652 // Add address to output stream
653 os << std::hex << std::showbase << te.id_.first <<
654 std::dec << std::noshowbase << "\t";
655
656 // Print the name
657 switch(te.id_.second) {
658 case 1:
659 {
660 const std::string mangled_name = te.get_name();
661
662 // Print the demangled name
663 if(! mangled_name.empty())
664 print_demangled(os, mangled_name.c_str());
665 else
666 os << "UNKNOWN\t";
667 }
668 break;
669 case 2:
670 print_demangled(os, static_cast<const char*>(te.id_.first));
671 break;
672 default:
673 os << "UNKNOWN\t";
674 }
675
676 // Print:
677 // # of threads, submit time, start time, stop time
678 os << te.threads_;
679 const std::streamsize precision = os.precision();
680 os.precision(6);
681 os << std::fixed << "\t" << te.times_[0]
682 << "\t" << te.times_[1] << "\t" << te.times_[2];
683 os.precision(precision);
684 return os;
685 }
686
687 }; // class TaskEvent
688
/// Task event list base class.

/// This base class allows the data to be stored in a linked list.
class TaskEventListBase {
private:
    TaskEventListBase* next_; ///< The next task event in the list.

    TaskEventListBase(const TaskEventListBase&) = delete;
    TaskEventListBase& operator=(const TaskEventListBase&) = delete;

public:

    /// Default constructor.
    TaskEventListBase()
        : next_(nullptr) { }

    /// Virtual destructor.
    virtual ~TaskEventListBase() = default;

    /// Get the next event list in the linked list.

    /// \return The next event list, or nullptr at the end of the list.
    TaskEventListBase* next() const {
        return next_;
    }

    /// Insert \c list after this list.

    /// If this list already has a successor, that successor is linked after
    /// \c list; otherwise \c list keeps its own successor.
    /// \param[in] list The list to be inserted.
    void insert(TaskEventListBase* list) {
        if(next_)
            list->next_ = next_;
        next_ = list;
    }

    /// Output a task event list to an output stream.

    /// \param[in,out] os The output stream.
    /// \param[in] tel The task event list to be output.
    /// \return The modified output stream.
    friend inline std::ostream& operator<<(std::ostream& os, const TaskEventListBase& tel) {
        return tel.print_events(os);
    }

private:

    /// Print the events.
    virtual std::ostream& print_events(std::ostream&) const = 0;

}; // class TaskEventListBase
739
740 /// A list of task events.
741
742 /// This object is used by the thread pool to record task data.
744 private:
745 unsigned int n_; ///< The number of events recorded.
746 std::unique_ptr<TaskEvent[]> events_; ///< The event array.
747
748 TaskEventList(const TaskEventList&) = delete;
750
751 public:
752
753 /// Default constructor.
754
755 /// \param[in] nmax The maximum number of task events.
756 /// \todo Should nmax be stored? I think it used to be a template
757 /// parameter (N), which is no longer present.
758 TaskEventList(const unsigned int nmax) :
759 TaskEventListBase(), n_(0ul), events_(new TaskEvent[nmax])
760 { }
761
762 /// Virtual destructor.
763 virtual ~TaskEventList() = default;
764
765 /// Get a new event from this list.
766
767 /// \warning This function can only be called \c nmax times. It is
768 /// the caller's resonsibility to ensure that it is not called too
769 /// many times.
770 /// \return The new event from the list.
772 return events_.get() + (n_++);
773 }
774
775 private:
776
777 /// Print events recorded in this list.
778
779 /// \param[in,out] os The output stream.
780 /// \return The modified output stream.
781 virtual std::ostream& print_events(std::ostream& os) const {
782 const int thread_id = ThreadBase::this_thread()->get_pool_thread_index();
783 for(std::size_t i = 0; i < n_; ++i)
784 os << thread_id << "\t" << events_[i] << std::endl;
785 return os;
786 }
787
788 }; // class TaskEventList
789
790 /// This class collects and prints task profiling data.
791
792 /// \note Each thread has its own \c TaskProfiler object, so only one
793 /// thread will ever operate on this object at a time and all operations
794 /// are inheirently thread safe.
796 private:
797 TaskEventListBase* head_; ///< The head of the linked list of data.
798 TaskEventListBase* tail_; ///< The tail of the linked list of data.
799
800 static Mutex output_mutex_; ///< Mutex used to lock the output file.
801
802 TaskProfiler(const TaskProfiler&) = delete;
804
805 public:
806 /// The output file name.
807
808 /// This variable is initialized by \c ThreadPool::begin and is
809 /// assigned the value given by the environment variable
810 /// `MAD_TASKPROFILER_NAME`.
811 static const char* output_file_name_;
812
813 public:
814 /// Default constructor.
816 : head_(nullptr), tail_(nullptr)
817 { }
818
819 /// Destructor.
821 // Cleanup linked list
822 TaskEventListBase* next = nullptr;
823 while(head_ != nullptr) {
824 next = head_->next();
825 delete head_;
826 head_ = next;
827 }
828 }
829
830 /// Create a new task event list.
831
832 /// \param[in] nmax The maximum number of elements that the list
833 /// can contain.
834 /// \return A new task event list.
835 TaskEventList* new_list(const std::size_t nmax) {
836 // Create a new event list
837 TaskEventList* list = new TaskEventList(nmax);
838
839 // Append the list to the tail of the linked list
840 if(head_ != nullptr) {
841 tail_->insert(list);
842 tail_ = list;
843 } else {
844 head_ = list;
845 tail_ = list;
846 }
847 return list;
848 }
849
850 /// Write the profile data to file.
851
852 /// The data is cleared after it is written to the file, so this
853 /// function may be called more than once.
854 ///
855 /// \warning This function should only be called from the thread
856 /// that owns it, otherwise data will likely be corrupted.
857 ///
858 /// \note This function is thread safe, in that it may be called by
859 /// different objects in different threads simultaneously.
860 void write_to_file();
861 }; // class TaskProfiler
862
863 } // namespace profiling
864
865#endif // MADNESS_TASK_PROFILING
866
867
868 /// Lowest level task interface.
869
870 /// The pool invokes \c run_multi_threaded(), which does any necessary
871 /// setup for multiple threads, and then invokes the user's \c run() method.
873 {
874 friend class ThreadPool;
875
876 private:
877
878#ifdef MADNESS_TASK_PROFILING
879 profiling::TaskEvent* task_event_; ///< \todo Description needed.
880 double submit_time_; ///< \todo Description needed.
881 std::pair<void*, unsigned short> id_; ///< \todo Description needed.
882
883 /// \todo Brief description needed.
884
885 /// \todo Descriptions needed.
886 /// \param[in,out] task_event Description needed.
887 void set_event(profiling::TaskEvent* task_event) {
888 task_event_ = task_event;
889 }
890
891 /// Collect info on the task and record the submit time.
892 void submit() {
894 this->get_id(id_);
895 }
896#endif // MADNESS_TASK_PROFILING
897
898 /// Object that is used to convert function and member function pointers into `void*`.
899
900 /// \note This is technically not supported by the C++ standard but
901 /// it will likely not cause any issues here (famous last words?).
902 /// \todo Descriptions needed.
903 /// \tparam T Description needed.
904 template <typename T>
906 T in; ///< \todo Description needed.
907 void* out; ///< \todo Description needed.
908 };
909
910 protected:
911
912 /// \todo Brief description needed.
913
914 /// \todo Descriptions needed.
915 /// \tparam fnT Description needed.
916 /// \param[in,out] id Description needed.
917 /// \param[in] fn Description needed.
918 /// \return Description needed.
919 template <typename fnT>
920 static typename std::enable_if<detail::function_traits<fnT>::value ||
922 make_id(std::pair<void*,unsigned short>& id, fnT fn) {
924 poop.in = fn;
925 id.first = poop.out;
926 id.second = 1ul;
927 }
928
929 /// \todo Brief description needed.
930
931 /// \todo Descriptions needed. What is the purpose of the second argument?
932 /// \tparam fnobjT Description needed.
933 /// \param[in,out] id Description needed.
934 template <typename fnobjT>
935 static typename std::enable_if<!(detail::function_traits<fnobjT>::value ||
937 make_id(std::pair<void*,unsigned short>& id, const fnobjT&) {
938 id.first = reinterpret_cast<void*>(const_cast<char*>(typeid(fnobjT).name()));
939 id.second = 2ul;
940 }
941
942 private:
943
944 /// \todo Brief description needed.
945
946 /// \todo Descriptions needed.
947 /// \param[in,out] id Description needed.
948 virtual void get_id(std::pair<void*,unsigned short>& id) const {
949 id.first = nullptr;
950 id.second = 0ul;
951 }
952
953#ifndef HAVE_INTEL_TBB
954
955 Barrier* barrier; ///< Barrier, only allocated for multithreaded tasks.
956 AtomicInt count; ///< Used to count threads as they start.
957
958 /// Returns true for the one thread that should invoke the destructor.
959
960 /// \return True for the one thread that should invoke the destructor.
961 bool run_multi_threaded() {
962 // As a thread enters this routine it increments the shared counter
963 // to generate a unique id without needing any thread-local storage.
964 // A downside is this does not preserve any relationships between thread
965 // numbering and the architecture ... more work ahead.
966 int nthread = get_nthread();
967 if (nthread == 1) {
968#ifdef MADNESS_TASK_PROFILING
969 task_event_->start(id_, nthread, submit_time_);
970#endif // MADNESS_TASK_PROFILING
971 run(TaskThreadEnv(1,0,0));
972#ifdef MADNESS_TASK_PROFILING
973 task_event_->stop();
974#endif // MADNESS_TASK_PROFILING
975 return true;
976 }
977 else {
978 int id = count++;
979 volatile bool barrier_flag;
980 barrier->register_thread(id, &barrier_flag);
981
982#ifdef MADNESS_TASK_PROFILING
983 if(id == 0)
984 task_event_->start(id_, nthread, submit_time_);
985#endif // MADNESS_TASK_PROFILING
986
987 run(TaskThreadEnv(nthread, id, barrier));
988
989#ifdef MADNESS_TASK_PROFILING
990 const bool cleanup = barrier->enter(id);
991 if(cleanup) task_event_->stop();
992 return cleanup;
993#else
994 return barrier->enter(id);
995#endif // MADNESS_TASK_PROFILING
996 }
997 }
998
999 public:
1000
1001 /// Default constructor.
1003 : TaskAttributes()
1004 , barrier(nullptr)
1005#if HAVE_PARSEC
1006 , parsec_task(ParsecRuntime::task(is_high_priority(), this))
1007#endif
1008 {
1009 count = 0;
1010 }
1011
1012 /// Constructor setting the specified task attributes.
1013
1014 /// \param[in] attr The task attributes.
1015 explicit PoolTaskInterface(const TaskAttributes& attr)
1016 : TaskAttributes(attr)
1017 , barrier(attr.get_nthread()>1 ? new Barrier(attr.get_nthread()) : 0)
1018#if HAVE_PARSEC
1019 , parsec_task(ParsecRuntime::task(is_high_priority(), this))
1020#endif
1021 {
1022 count = 0;
1023 }
1024
1025 /// Destructor.
1026 /// \todo Should we either use a unique_ptr for barrier or check that barrier != nullptr here?
1027 virtual ~PoolTaskInterface() {
1028#if HAVE_PARSEC
1029 *(reinterpret_cast<PoolTaskInterface**>(&(parsec_task->locals[0]))) = nullptr;
1030 ParsecRuntime::delete_parsec_task(parsec_task);
1031 parsec_task = nullptr;
1032#endif
1033 delete barrier;
1034 }
1035
1036 /// Call this to reset the number of threads before the task is submitted.
1037
1038 /// Once a task has been constructed, /c TaskAttributes::set_nthread()
1039 /// is insufficient because a multithreaded task includes a barrier
1040 /// that needs to know the number of threads.
1041 ///
1042 /// \param[in] nthread The new number of threads.
1043 void set_nthread(int nthread) {
1044 if (nthread != get_nthread()) {
1046 delete barrier;
1047 if (nthread > 1)
1048 barrier = new Barrier(nthread);
1049 else
1050 barrier = 0;
1051 }
1052 }
1053#if HAVE_PARSEC
1054 //////////// Parsec Related Begin ////////////////////
1055 parsec_task_t *parsec_task;
1056 //////////// Parsec Related End ///////////////////
1057#endif
1058
1059#else
1060
1061 public:
1062
1063 /// Default constructor.
1066
1067 /// \todo Brief description needed.
1068
1069 /// \todo Descriptions needed.
1070 /// \param[in] attr Description needed.
1071 explicit PoolTaskInterface(const TaskAttributes& attr) :
1072 TaskAttributes(attr)
1073 {
1074 }
1075
1076 /// Destructor.
1077 virtual ~PoolTaskInterface() = default;
1078
1079 /// Call this to reset the number of threads before the task is submitted
1080
1081 /// Once a task has been constructed /c TaskAttributes::set_nthread()
1082 /// is insufficient because a multithreaded task includes a
1083 /// barrier that needs to know the number of threads.
1084 void set_nthread(int nthread) {
1085 if (nthread != get_nthread())
1087 }
1088
1089 /// \todo Brief description needed.
1090
1091 /// \todo Descriptions needed.
1092 /// \return Description needed.
1093 void execute() {
1094 const int nthread = get_nthread();
1095 run( TaskThreadEnv(nthread, 0) );
1096 }
1097
1098#endif // HAVE_INTEL_TBB
1099
1100 /// Override this method to implement a multi-threaded task.
1101
1102 /// \c info.nthread() will be the number of threads collaborating on this task.
1103 ///
1104 /// \c info.id() will be the index of the current thread \c id=0,...,nthread-1.
1105 ///
1106 /// \c info.barrier() will be a barrier for all of the threads, and returns
1107 /// true for the last thread to enter the barrier (other threads get false).
1108 ///
1109 /// \todo Description needed.
1110 /// \param[in] info Description needed.
1111
1112
1113 virtual void run(const TaskThreadEnv& info) = 0;
1114
1115 };
1116
1117 /// A no-operation task used for various purposes.
1119 public:
1120 /// Execution function that does nothing.
1121 void run(const TaskThreadEnv& /*info*/) {}
1122
1123 /// Destructor.
1124 virtual ~PoolTaskNull() {}
1125
1126 private:
1127 /// \todo Brief description needed.
1128
1129 /// \todo Description needed.
1130 /// \param[in,out] id Description needed.
1131 virtual void get_id(std::pair<void*,unsigned short>& id) const {
1133 }
1134 };
1135
1136 /// \c ThreadPool thread object.
1137
1138 /// This class holds thread local data for thread pool threads. It can be
1139 /// accessed via \c ThreadBase::this_thread().
1140 class ThreadPoolThread : public Thread {
1141 private:
1142 // Thread local data for thread pool
1143#ifdef MADNESS_TASK_PROFILING
1144 profiling::TaskProfiler profiler_; ///< \todo Description needed.
1145#endif // MADNESS_TASK_PROFILING
1146
1147 public:
1149 virtual ~ThreadPoolThread() = default;
1150
1151#ifdef MADNESS_TASK_PROFILING
1152 /// Task profiler accessor.
1153
1154 /// \todo Description needed.
1155 /// \return Description needed.
1159#endif // MADNESS_TASK_PROFILING
1160 };
1161
1162 /// A singleton pool of threads for dynamic execution of tasks.
1163
1164 /// \attention You must instantiate the pool while running with just one
1165 /// thread.
1167 public:
1168 // non-copyable and non-movable
1169 ThreadPool(const ThreadPool&) = delete;
1171 void operator=(const ThreadPool&) = delete;
1172 void operator=(ThreadPool&&) = delete;
1173
1174 /// Get the number of threads from the environment.
1175
1176 /// \return The number of threads.
1177 static int default_nthread();
1178
1179 private:
1180 friend class WorldTaskQueue;
1181
1182 // Thread pool data
1183 ThreadPoolThread *threads; ///< Array of threads.
1184 ThreadPoolThread main_thread; ///< Placeholder for main thread tls.
1185 DQueue<PoolTaskInterface*> queue; ///< Queue of tasks.
1186 int nthreads; ///< Number of threads.
1187 volatile bool finish; ///< Set to true when time to stop.
1188 AtomicInt nfinished; ///< Thread pool exit counter.
1189
1190 // Static data
1191 static ThreadPool* instance_ptr; ///< Singleton pointer.
1192 static const int nmax = 128; ///< Number of task a worker thread will pop from the task queue
1193 static double await_timeout; ///< Waiter timeout.
1194
1195#if defined(HAVE_IBMBGQ) and defined(HPM)
1196 static unsigned int main_hpmctx; ///< HPM context for main thread.
1197#endif
1198 /// The constructor is private to enforce the singleton model.
1199
1200 /// \todo Description needed.
1201 /// \param[in] nthread Description needed.
1202 ThreadPool(int nthread=-1);
1203
1204 /// Run the next task.
1205
1206 /// \todo Verify and complete this documentation.
1207 /// \param[in] wait Block of true.
1208 /// \param[in,out] this_thread Description needed.
1209 /// \return True if a task was run.
1210 bool run_task(bool wait, ThreadPoolThread* this_thread) {
1211#if HAVE_INTEL_TBB
1212 MADNESS_EXCEPTION("run_task should not be called when using Intel TBB", 1);
1213#else
1214
1215 if (!wait && queue.empty()) return false;
1216 std::pair<PoolTaskInterface*,bool> t = queue.pop_front(wait);
1217#ifdef MADNESS_TASK_PROFILING
1218 profiling::TaskEventList* event_list =
1219 this_thread->profiler().new_list(1);
1220#endif // MADNESS_TASK_PROFILING
1221 // Task pointer might be zero due to stealing
1222 if (t.second && t.first) {
1223#ifdef MADNESS_TASK_PROFILING
1224 t.first->set_event(event_list->event());
1225#endif // MADNESS_TASK_PROFILING
1226 if (t.first->run_multi_threaded()) // What we are here to do
1227 delete t.first;
1228 }
1229 return t.second;
1230#endif
1231 }
1232
1233 /// \todo Brief description needed.
1234
1235 /// \todo Descriptions needed.
1236 /// \param[in] wait Description needed.
1237 /// \param[in,out] this_thread Description needed.
1238 /// \return Description needed.
1239 bool run_tasks(bool wait, ThreadPoolThread* const this_thread) {
1240#if HAVE_INTEL_TBB
1241// if (!wait && tbb_task_list->empty()) return false;
1242// tbb::task* t = &tbb_task_list->pop_front();
1243// if (t) {
1244// tbb_parent_task->increment_ref_count();
1245// tbb_parent_task->enqueue(*t);
1246// }
1247
1248// wait = (tbb_parent_task->ref_count() >= 1) ? false : true;
1249// return wait;
1250
1251 MADNESS_EXCEPTION("run_tasks should not be called when using Intel TBB", 1);
1252#else
1253
1254 PoolTaskInterface* taskbuf[nmax];
1255 int ntask = queue.pop_front(nmax, taskbuf, wait);
1256#ifdef MADNESS_TASK_PROFILING
1257 profiling::TaskEventList* event_list =
1258 this_thread->profiler().new_list(ntask);
1259#endif // MADNESS_TASK_PROFILING
1260 for (int i=0; i<ntask; ++i) {
1261 if (taskbuf[i]) { // Task pointer might be zero due to stealing
1262#ifdef MADNESS_TASK_PROFILING
1263 taskbuf[i]->set_event(event_list->event());
1264#endif // MADNESS_TASK_PROFILING
1265 if (taskbuf[i]->run_multi_threaded()) {
1266 delete taskbuf[i];
1267 }
1268 }
1269 }
1270#if HAVE_PARSEC
1271 ////////////////// Parsec Related Begin //////////////////
1272 if(0 == ntask) {
1273 ntask = parsec_runtime->test();
1274 }
1275 ///////////////// Parsec Related End ////////////////////
1276#endif
1277 return (ntask>0);
1278#endif
1279 }
1280
1281 /// \todo Brief description needed.
1282
1283 /// \todo Description needed.
1284 /// \param[in,out] thread Description needed.
1285 void thread_main(ThreadPoolThread* const thread);
1286
1287 /// Forwards the thread to bound member function.
1288
1289 /// \todo Descriptions needed.
1290 /// \param[in] v Description needed.
1291 /// \return Description needed.
1292 static void* pool_thread_main(void *v);
1293
1294 public:
1295 /// Return a pointer to the only instance, constructing as necessary.
1296
1297 /// \return A pointer to the only instance.
1299#ifndef MADNESS_ASSERTIONS_DISABLE
1300 if(! instance_ptr) {
1301 std::cerr << "!!! ERROR: The thread pool has not been initialized.\n"
1302 << "!!! ERROR: Call madness::initialize before submitting tasks to the task queue.\n";
1303 MADNESS_EXCEPTION("ThreadPool::instance_ptr is NULL", 0);
1304 }
1305#endif
1306 return instance_ptr;
1307 }
1308
1310#if !(defined(HAVE_INTEL_TBB) || defined(HAVE_PARSEC))
1311 queue.lock_and_flush_prebuf();
1312#endif
1313 }
1314
1315#if HAVE_PARSEC
1316 ////////////////// Parsec Related Begin //////////////////
1317 static ParsecRuntime *parsec_runtime;
1318 ///////////////// Parsec Related End ////////////////////
1319#endif
1320
1321#if HAVE_INTEL_TBB
1322 static std::unique_ptr<tbb::global_control> tbb_control; ///< \todo Description needed.
1323 static std::unique_ptr<tbb::task_arena> tbb_arena;
1324#endif
1325
1326 /// Please invoke while in a single-threaded environment.
1327
1328 /// \todo Verify documentation.
1329 /// \param[in] nthread The number of threads.
1330 static void begin(int nthread=-1);
1331
1332 /// \todo Description needed.
1333 static void end();
1334
1335 /// Add a new task to the pool.
1336
1337 /// \todo Description needed.
1338 /// \param[in,out] task Description needed.
1340#ifdef MADNESS_TASK_PROFILING
1341 task->submit();
1342#endif // MADNESS_TASK_PROFILING
1343
1344#if HAVE_PARSEC
1345 //////////// Parsec Related Begin ////////////////////
1346 parsec_runtime->schedule(task);
1347 //////////// Parsec Related End ////////////////////
1348#elif HAVE_INTEL_TBB
1349//#ifdef MADNESS_CAN_USE_TBB_PRIORITY
1350// if(task->is_high_priority())
1351// tbb::task::enqueue(*task, tbb::priority_high);
1352// else
1353//#endif // MADNESS_CAN_USE_TBB_PRIORITY
1354 tbb_arena->enqueue(
1355 //use unique_ptr to automatically delete task ptr
1356 [task_p = std::unique_ptr<PoolTaskInterface>(task)] () noexcept {
1357 //exceptions are not expected here, as nobody will catch them for enqueued tasks
1358 task_p->execute();
1359 });
1360#else
1361 if (!task) MADNESS_EXCEPTION("ThreadPool: inserting a NULL task pointer", 1);
1362 int task_threads = task->get_nthread();
1363 // Currently multithreaded tasks must be shoved on the end of the q
1364 // to avoid a race condition as multithreaded task is starting up
1365 if (task->is_high_priority() && (task_threads == 1)) {
1366 instance()->queue.push_front(task);
1367 }
1368 else {
1369 instance()->queue.push_back(task, task_threads);
1370 }
1371#endif // HAVE_INTEL_TBB
1372 }
1373
1374 /// \todo Brief description needed.
1375
1376 /// \todo Descriptions needed.
1377 /// \tparam opT Description needed.
1378 /// \param[in,out] op Description needed.
1379 template <typename opT>
1380 void scan(opT& op) {
1381 queue.scan(op);
1382 }
1383
1384 /// Add a vector of tasks to the pool.
1385
1386 /// \param[in] tasks Vector of tasks to add to the pool.
1387 static void add(const std::vector<PoolTaskInterface*>& tasks) {
1388#if HAVE_INTEL_TBB
1389 MADNESS_EXCEPTION("Do not add tasks to the madness task queue when using Intel TBB.", 1);
1390#else
1391 typedef std::vector<PoolTaskInterface*>::const_iterator iteratorT;
1392 for (iteratorT it=tasks.begin(); it!=tasks.end(); ++it) {
1393 add(*it);
1394 }
1395#endif
1396 }
1397
1398 /// An otherwise idle thread can all this to run a task.
1399
1400 /// \return True if a task was run.
1401 static bool run_task() {
1402#ifdef HAVE_INTEL_TBB
1403 return false;
1404#else
1405
1406#ifdef MADNESS_TASK_PROFILING
1407 ThreadPoolThread* const thread = static_cast<ThreadPoolThread*>(ThreadBase::this_thread());
1408#else
1409 ThreadPoolThread* const thread = nullptr;
1410#endif // MADNESS_TASK_PROFILING
1411
1412 return instance()->run_tasks(false, thread);
1413#endif // HAVE_INTEL_TBB
1414 }
1415
1416 /// Returns the number of threads in the pool.
1417
1418 /// \return The number of threads in the pool.
1419 static std::size_t size() {
1420 return instance()->nthreads;
1421 }
1422
1423 /// Returns the number of tasks in the queue.
1424
1425 /// \return The number of tasks in the queue.
1426 static std::size_t queue_size() {
1427 return instance()->queue.size();
1428 }
1429
1430 /// Returns queue statistics.
1431
1432 /// \return Queue statistics.
1433 static const DQStats& get_stats();
1434
1435 /// Access the pool thread array
1436 /// \return ptr to the pool thread array, its size is given by \c size()
1438 return const_cast<const ThreadPoolThread*>(instance()->threads);
1439 }
1440
1441 /// Gracefully wait for a condition to become true, executing any tasks in the queue.
1442
1443 /// Probe should be an object that, when called, returns the status.
1444 /// \todo Descriptions needed/need verification.
1445 /// \tparam Probe Type of the probe.
1446 /// \param[in] probe The probe.
1447 /// \param[in] dowork Do work while waiting - default is true
1448 /// \param[in] sleep Sleep instead of spin while waiting (e.g., to avoid pounding on MPI) - default is false
1449 template <typename Probe>
1450 static void await(const Probe& probe, bool dowork = true, bool sleep = false) {
1451 if (!probe()) {
1452 double start = cpu_time();
1453 const double timeout = await_timeout;
1454 int counter = 0;
1455
1456 // if dowork=false must manually purge threal-local tasks to ensure progress
1457 if (!dowork) thread_purge();
1458
1459 MutexWaiter waiter;
1460 while (!probe()) {
1461
1462 const bool working = (dowork ? ThreadPool::run_task() : false);
1463 const double current_time = cpu_time();
1464
1465 if (working) { // Reset timeout logic
1466 waiter.reset();
1467 start = current_time;
1468 counter = 0;
1469 } else {
1470 if(((current_time - start) > timeout) && (timeout > 1.0)) { // Check for timeout
1471 std::cerr << "!!MADNESS: Hung queue?" << std::endl;
1472 if (counter++ > 3) {
1473 const long bufsize=256;
1474 char errstr[bufsize];
1475 snprintf(errstr,bufsize, "ThreadPool::await() timed out after %.1lf seconds", timeout);
1476 throw madness::MadnessException(errstr, 0, 1,
1477 __LINE__, __FUNCTION__,
1478 __FILE__);
1479 }
1480 }
1481 if (sleep) {
1482 // THIS NEEDS TO BECOME AN EXTERNAL PARAMETER
1483 // Problem is exacerbated when running with many
1484 // (e.g., 512 or more) send/recv buffers, and
1485 // also with many threads. More outstanding
1486 // requests means each call into MPI takes
1487 // longer and more threads means more calls in
1488 // spots where all threads are messaging. Old
1489 // code was OK on dancer.icl.utk.edu with just
1490 // 32 bufs and 20 threads, but 512 bufs caused
1491 // intermittent hangs I think due to something
1492 // not being able to make progress or general
1493 // confusion (this with MPICH) ... maybe using a
1494 // fair mutex somewhere would help.
1495 //
1496 // 100us is a long time ... will try 10us. mmm ... perhaps need 100 at least on dancer with 17 threads per node
1497 myusleep(100);
1498 }
1499 else {
1500 waiter.wait();
1501 }
1502 }
1503 }
1504 } // if !probe()
1505 }
1506
1507 /// Destructor.
1509#if HAVE_PARSEC
1510 ////////////////// Parsec related Begin /////////////////
1511 delete parsec_runtime;
1512 ////////////////// Parsec related End /////////////////
1513#elif HAVE_INTEL_TBB
1514#else
1515 delete[] threads;
1516#endif
1517 }
1518
1519 /// \sa madness::threadpool_wait_policy
1520 static void set_wait_policy(
1521 WaitPolicy policy,
1522 int sleep_duration_in_microseconds = 0) {
1523#if !HAVE_INTEL_TBB && !HAVE_PARSEC
1524 instance()->queue.set_wait_policy(policy,
1525 sleep_duration_in_microseconds);
1526#endif
1527 }
1528
1529 };
1530
1531 // clang-format off
1532 /// Controls how aggressively ThreadPool holds on to the OS threads
1533 /// while waiting for work. Currently useful only for Pthread pool when it's using spinlocks;
1534 /// NOT used for TBB or PaRSEC.
1535 /// \param policy specifies how to wait for work;
1536 /// - WaitPolicy::Busy -- threads are kept busy (default); recommended when intensive work is only performed by MADNESS threads
1537 /// - WaitPolicy::Yield -- thread yields; recommended when intensive work is performed primarily by non-MADNESS threads
1538 /// - WaitPolicy::Sleep -- thread sleeps for \p sleep_duration_in_microseconds; recommended when intensive work is performed by both MADNESS and non-MADNESS threads
1539 /// \param sleep_duration_in_microseconds if `policy==WaitPolicy::Sleep` this specifies the duration of sleep, in microseconds
1540 // clang-format on
1542 int sleep_duration_in_microseconds = 0) {
1543 ThreadPool::set_wait_policy(policy, sleep_duration_in_microseconds);
1544 }
1545
1546 /// @}
1547
1548 inline void thread_purge() {
1549#if !(defined(HAVE_PARSEC) || defined(HAVE_INTEL_TBB))
1552#endif
1553 }
1554
1555 template<class F, class... Args>
1556 constexpr decltype(auto) blocking_invoke(F&& f, Args&&... args)
1557 noexcept(std::is_nothrow_invocable_v<F, Args...>) {
1558 thread_purge();
1559 return std::invoke(std::forward<F>(f), std::forward<Args>(args)...);
1560 }
1561
1562 template<class R, class F, class... Args>
1563 constexpr R blocking_invoke_r(F&& f, Args&&... args)
1564 noexcept(std::is_nothrow_invocable_v<F, Args...>) {
1565 thread_purge();
1566#if __cplusplus < 202302L
1567 if constexpr (std::is_void_v<R>)
1568 std::invoke(std::forward<F>(f), std::forward<Args>(args)...);
1569 else
1570 return std::invoke(std::forward<F>(f), std::forward<Args>(args)...);
1571#else
1572 return std::invoke_r<R>(std::forward<F>(f), std::forward<Args>(args)...);
1573#endif
1574 }
1575
1576} // namespace madness
1577
1578#endif // MADNESS_WORLD_THREAD_H__INCLUDED
simple class for testing the solver
Definition derivatives.cc:60
An integer with atomic set, get, read+increment, read+decrement, and decrement+test operations.
Definition atomicint.h:126
Definition worldmutex.h:700
bool enter(const int id)
Each thread calls this with its id (0,..,nthread-1) to enter the barrier.
Definition worldmutex.h:729
void register_thread(int id, volatile bool *pflag)
Each thread calls this once before first use.
Definition worldmutex.h:718
A thread safe, fast but simple doubled-ended queue.
Definition dqueue.h:80
Base class for exceptions thrown in MADNESS.
Definition madness_exception.h:66
Definition worldmutex.h:109
void wait()
Definition worldmutex.cc:103
void reset()
Definition worldmutex.h:124
Mutex using pthread mutex operations.
Definition worldmutex.h:131
Lowest level task interface.
Definition thread.h:873
double submit_time_
Definition thread.h:880
void execute()
Definition thread.h:1093
virtual ~PoolTaskInterface()=default
Destructor.
static std::enable_if<!(detail::function_traits< fnobjT >::value||detail::memfunc_traits< fnobjT >::value)>::type make_id(std::pair< void *, unsigned short > &id, const fnobjT &)
Definition thread.h:937
profiling::TaskEvent * task_event_
Definition thread.h:879
PoolTaskInterface()
Default constructor.
Definition thread.h:1064
virtual void run(const TaskThreadEnv &info)=0
Override this method to implement a multi-threaded task.
PoolTaskInterface(const TaskAttributes &attr)
Definition thread.h:1071
std::pair< void *, unsigned short > id_
Definition thread.h:881
void set_event(profiling::TaskEvent *task_event)
Definition thread.h:887
void submit()
Collect info on the task and record the submit time.
Definition thread.h:892
static std::enable_if< detail::function_traits< fnT >::value||detail::memfunc_traits< fnT >::value >::type make_id(std::pair< void *, unsigned short > &id, fnT fn)
Definition thread.h:922
virtual void get_id(std::pair< void *, unsigned short > &id) const
Definition thread.h:948
void set_nthread(int nthread)
Call this to reset the number of threads before the task is submitted.
Definition thread.h:1084
A no-operation task used for various purposes.
Definition thread.h:1118
void run(const TaskThreadEnv &)
Execution function that does nothing.
Definition thread.h:1121
virtual ~PoolTaskNull()
Destructor.
Definition thread.h:1124
virtual void get_id(std::pair< void *, unsigned short > &id) const
Definition thread.h:1131
Contains attributes of a task.
Definition thread.h:329
TaskAttributes & set_highpriority(bool hipri)
Sets the high priority attribute.
Definition thread.h:398
TaskAttributes(const TaskAttributes &attr)
Copy constructor.
Definition thread.h:349
bool is_generator() const
Test if the generator attribute is true.
Definition thread.h:357
static TaskAttributes hipri()
Definition thread.h:456
TaskAttributes & set_generator(bool generator_hint)
Sets the generator attribute.
Definition thread.h:378
static const unsigned long GENERATOR
Mask for generator bit.
Definition thread.h:334
static const unsigned long STEALABLE
Mask for stealable bit.
Definition thread.h:335
bool is_stealable() const
Test if the stealable attribute is true.
Definition thread.h:364
TaskAttributes(unsigned long flags=0)
Sets the attributes to the desired values.
Definition thread.h:343
static const unsigned long NTHREAD
Mask for nthread byte.
Definition thread.h:333
void set_nthread(int nthread)
Set the number of threads.
Definition thread.h:420
static TaskAttributes generator()
Definition thread.h:448
bool is_high_priority() const
Test if the high priority attribute is true.
Definition thread.h:371
TaskAttributes & set_stealable(bool stealable)
Sets the stealable attribute.
Definition thread.h:389
static TaskAttributes multi_threaded(int nthread)
Definition thread.h:464
void serialize(Archive &ar)
Serializes the attributes for I/O.
Definition thread.h:440
unsigned long flags
Byte-string storing the specified attributes.
Definition thread.h:330
int get_nthread() const
Get the number of threads.
Definition thread.h:428
static const unsigned long HIGHPRIORITY
Mask for priority bit.
Definition thread.h:336
virtual ~TaskAttributes()
Definition thread.h:352
Used to pass information about the thread environment to a user's task.
Definition thread.h:472
int id() const
Get the ID of this thread.
Definition thread.h:512
TaskThreadEnv(int nthread, int id, Barrier *barrier)
Constructor collecting necessary environmental information.
Definition thread.h:484
TaskThreadEnv(int nthread, int id)
Constructor collecting necessary environmental information.
Definition thread.h:497
bool barrier() const
Definition thread.h:520
Barrier * _barrier
Pointer to the shared barrier, null if there is only a single thread.
Definition thread.h:475
const int _nthread
Number of threads collaborating on task.
Definition thread.h:473
int nthread() const
Get the number of threads collaborating on this task.
Definition thread.h:505
const int _id
ID of this thread (0,...,nthread-1).
Definition thread.h:474
Simplified thread wrapper to hide pthread complexity.
Definition thread.h:170
virtual void run()=0
Function to be executed by the thread.
int pool_num
Stores index of thread in pool or -1.
Definition thread.h:182
void set_pool_thread_index(int i)
Sets the index of this thread within the pool.
Definition thread.h:201
static void exit()
A thread can call this to terminate its execution.
Definition thread.h:232
ThreadBase()
Default constructor.
Definition thread.h:219
const pthread_t & get_id() const
Get the pthread id of this thread (if running).
Definition thread.h:237
static pthread_key_t thread_key
Thread id key.
Definition thread.h:173
static ThreadBase * this_thread()
Definition thread.h:263
int cancel() const
Cancel this thread.
Definition thread.h:249
pthread_t id
Definition thread.h:183
static void delete_thread_key()
Definition thread.h:193
static int num_hw_processors()
Get number of actual hardware processors.
Definition thread.cc:174
void start()
Start the thread running.
Definition thread.cc:158
virtual ~ThreadBase()
Definition thread.h:221
static void init_thread_key()
Definition thread.h:186
int get_pool_thread_index() const
Get index of this thread in ThreadPool.
Definition thread.h:244
Definition thread.h:103
bool do_bind
Definition thread.h:107
void bind()
Definition thread.h:143
ThreadBinder(bool print=false)
Definition thread.h:114
size_t cpus[maxncpu]
Definition thread.h:108
std::atomic< size_t > nextcpu
Definition thread.h:109
const size_t * get_cpus() const
Definition thread.h:139
bool print
Definition thread.h:105
static const size_t maxncpu
Definition thread.h:104
static thread_local bool bound
Definition thread.h:110
void set_do_bind(bool value)
Definition thread.h:137
size_t ncpu
Definition thread.h:106
const size_t get_ncpu() const
Definition thread.h:141
ThreadPool thread object.
Definition thread.h:1140
ThreadPoolThread()
Definition thread.h:1148
profiling::TaskProfiler profiler_
Definition thread.h:1144
profiling::TaskProfiler & profiler()
Task profiler accessor.
Definition thread.h:1156
virtual ~ThreadPoolThread()=default
A singleton pool of threads for dynamic execution of tasks.
Definition thread.h:1166
static void add(PoolTaskInterface *task)
Add a new task to the pool.
Definition thread.h:1339
int nthreads
Number of threads.
Definition thread.h:1186
static std::unique_ptr< tbb::task_arena > tbb_arena
Definition thread.h:1323
static bool run_task()
An otherwise idle thread can call this to run a task.
Definition thread.h:1401
static void add(const std::vector< PoolTaskInterface * > &tasks)
Add a vector of tasks to the pool.
Definition thread.h:1387
ThreadPool(ThreadPool &&)=delete
void operator=(ThreadPool &&)=delete
volatile bool finish
Set to true when time to stop.
Definition thread.h:1187
ThreadPool(const ThreadPool &)=delete
static void set_wait_policy(WaitPolicy policy, int sleep_duration_in_microseconds=0)
Definition thread.h:1520
static int default_nthread()
Get the number of threads from the environment.
Definition thread.cc:325
static const DQStats & get_stats()
Returns queue statistics.
Definition thread.cc:466
static void end()
Definition thread.cc:437
AtomicInt nfinished
Thread pool exit counter.
Definition thread.h:1188
void operator=(const ThreadPool &)=delete
static ThreadPool * instance_ptr
Singleton pointer.
Definition thread.h:1191
ThreadPoolThread * threads
Array of threads.
Definition thread.h:1183
void thread_main(ThreadPoolThread *const thread)
Definition thread.cc:349
~ThreadPool()
Destructor.
Definition thread.h:1508
void scan(opT &op)
Definition thread.h:1380
void flush_prebuf()
Definition thread.h:1309
static void begin(int nthread=-1)
Please invoke while in a single-threaded environment.
Definition thread.cc:379
static ThreadPool * instance()
Return a pointer to the only instance; the pool must already have been initialized (it is not constructed on demand).
Definition thread.h:1298
static double await_timeout
Waiter timeout.
Definition thread.h:1193
DQueue< PoolTaskInterface * > queue
Queue of tasks.
Definition thread.h:1185
bool run_tasks(bool wait, ThreadPoolThread *const this_thread)
Definition thread.h:1239
static std::size_t queue_size()
Returns the number of tasks in the queue.
Definition thread.h:1426
ThreadPoolThread main_thread
Placeholder for main thread tls.
Definition thread.h:1184
static const int nmax
Maximum number of tasks a worker thread will pop from the task queue at once.
Definition thread.h:1192
bool run_task(bool wait, ThreadPoolThread *this_thread)
Run the next task.
Definition thread.h:1210
static void * pool_thread_main(void *v)
Forwards the thread to bound member function.
Definition thread.cc:374
static void await(const Probe &probe, bool dowork=true, bool sleep=false)
Gracefully wait for a condition to become true, executing any tasks in the queue.
Definition thread.h:1450
static std::unique_ptr< tbb::global_control > tbb_control
Definition thread.h:1322
static const ThreadPoolThread * get_threads()
Definition thread.h:1437
static std::size_t size()
Returns the number of threads in the pool.
Definition thread.h:1419
Simplified thread wrapper to hide pthread complexity.
Definition thread.h:277
void start(void *(*f)(void *), void *args=nullptr)
Start the thread by running f(args).
Definition thread.h:305
virtual ~Thread()=default
Thread(void *(*f)(void *), void *args=nullptr)
Create a thread and start it running f(args).
Definition thread.h:296
Thread()
Default constructor.
Definition thread.h:290
void run()
Invokes the function for this thread.
Definition thread.h:282
void * args
The arguments passed to this thread for execution.
Definition thread.h:279
void *(* f)(void *)
The function called for executing this thread.
Definition thread.h:278
Multi-threaded queue to manage and run tasks.
Definition world_task_queue.h:319
Task event list base class.
Definition thread.h:692
virtual ~TaskEventListBase()=default
Virtual destructor.
TaskEventListBase * next_
The next task event in the list.
Definition thread.h:694
friend std::ostream & operator<<(std::ostream &os, const TaskEventListBase &tel)
Output a task event list to an output stream.
Definition thread.h:729
TaskEventListBase & operator=(const TaskEventListBase &)=delete
TaskEventListBase * next() const
Get the next event list in the linked list.
Definition thread.h:711
virtual std::ostream & print_events(std::ostream &) const =0
Print the events.
TaskEventListBase()
Default constructor.
Definition thread.h:702
void insert(TaskEventListBase *list)
Insert list after this list.
Definition thread.h:718
TaskEventListBase(const TaskEventListBase &)=delete
A list of task events.
Definition thread.h:743
TaskEvent * event()
Get a new event from this list.
Definition thread.h:771
TaskEventList & operator=(const TaskEventList &)=delete
TaskEventList(const unsigned int nmax)
Default constructor.
Definition thread.h:758
virtual std::ostream & print_events(std::ostream &os) const
Print events recorded in this list.
Definition thread.h:781
unsigned int n_
The number of events recorded.
Definition thread.h:745
TaskEventList(const TaskEventList &)=delete
virtual ~TaskEventList()=default
Virtual destructor.
std::unique_ptr< TaskEvent[]> events_
The event array.
Definition thread.h:746
Task event class.
Definition thread.h:539
std::string get_name() const
Get name of the function pointer.
Definition thread.h:577
friend std::ostream & operator<<(std::ostream &os, const TaskEvent &te)
Output the task data using a tab-separated list.
Definition thread.h:651
void start(const std::pair< void *, unsigned short > &id, const unsigned short threads, const double submit_time)
Record the start time of the task and collect task information.
Definition thread.h:624
static void print_demangled(std::ostream &os, const char *symbol)
Print demangled symbol name.
Definition thread.h:553
unsigned short threads_
Number of threads used by the task.
Definition thread.h:543
void stop()
Record the stop time of the task.
Definition thread.h:634
double times_[3]
Task trace times: { submit, start, stop }.
Definition thread.h:541
std::pair< void *, unsigned short > id_
Task identification information.
Definition thread.h:542
This class collects and prints task profiling data.
Definition thread.h:795
TaskEventListBase * head_
The head of the linked list of data.
Definition thread.h:797
TaskProfiler()
Default constructor.
Definition thread.h:815
void write_to_file()
Write the profile data to file.
Definition thread.cc:213
~TaskProfiler()
Destructor.
Definition thread.h:820
TaskEventList * new_list(const std::size_t nmax)
Create a new task event list.
Definition thread.h:835
TaskProfiler & operator=(const TaskProfiler &)=delete
TaskEventListBase * tail_
The tail of the linked list of data.
Definition thread.h:798
TaskProfiler(const TaskProfiler &)=delete
static const char * output_file_name_
The output file name.
Definition thread.h:811
static Mutex output_mutex_
Mutex used to lock the output file.
Definition thread.h:800
static const double R
Definition csqrt.cc:46
const std::size_t bufsize
Definition derivatives.cc:16
real_function_3d mask
Definition dirac-hatom.cc:27
Implements DQueue.
auto T(World &world, response_space &f) -> response_space
Definition global_functions.cc:34
void threadpool_wait_policy(WaitPolicy policy, int sleep_duration_in_microseconds=0)
Definition thread.h:1541
static const double v
Definition hatom_sf_dirac.cc:20
Tensor< double > op(const Tensor< double > &x)
Definition kain.cc:508
#define MADNESS_CHECK(condition)
Check a condition — even in a release build the condition is always evaluated so it can have side eff...
Definition madness_exception.h:182
#define MADNESS_EXCEPTION(msg, value)
Macro for throwing a MADNESS exception.
Definition madness_exception.h:119
#define MADNESS_ASSERT(condition)
Assert a condition that should be free of side-effects since in release builds this might be a no-op.
Definition madness_exception.h:134
Namespace for all elements and tools of MADNESS.
Definition DFParameters.h:10
WaitPolicy
wait policies supported by ConditionVariable/DQueue/ThreadPool
Definition worldmutex.h:492
static double cpu_time()
Returns the cpu time in seconds relative to an arbitrary origin.
Definition timers.h:127
ThreadBinder binder
Definition thread.cc:71
void thread_purge()
Definition thread.h:1548
static void myusleep(unsigned int us)
Sleep or spin for specified number of microseconds.
Definition timers.h:164
bool is_madness_thread()
Definition thread_info.h:70
NDIM & f
Definition mra.h:2481
void error(const char *msg)
Definition world.cc:139
double wall_time()
Returns the wall time in seconds relative to an arbitrary origin.
Definition timers.cc:48
std::string type(const PairType &n)
Definition PNOParameters.h:18
constexpr R blocking_invoke_r(F &&f, Args &&... args) noexcept(std::is_nothrow_invocable_v< F, Args... >)
Definition thread.h:1563
constexpr decltype(auto) blocking_invoke(F &&f, Args &&... args) noexcept(std::is_nothrow_invocable_v< F, Args... >)
Definition thread.h:1556
std::string name(const FuncType &type, const int ex=-1)
Definition ccpairfunction.h:28
Definition dqueue.h:59
Function traits in the spirit of boost function traits.
Definition function_traits.h:13
Member function traits in the spirit of boost function traits.
Definition function_traits.h:21
static double current_time
Definition tdse1d.cc:160
int task(int i)
Definition test_runtime.cpp:4
int main()
Definition testmolbas.cc:37
const char * status[2]
Definition testperiodic.cc:43
Implements thread introspection for Pthreads backend.
Object that is used to convert function and member function pointers into void*.
Definition thread.h:905