MADNESS 0.10.1
thread.h
Go to the documentation of this file.
1/*
2 This file is part of MADNESS.
3
4 Copyright (C) 2007,2010 Oak Ridge National Laboratory
5
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
10
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19
20 For more information please contact:
21
22 Robert J. Harrison
23 Oak Ridge National Laboratory
24 One Bethel Valley Road
25 P.O. Box 2008, MS-6367
26
27 email: harrisonrj@ornl.gov
28 tel: 865-241-3937
29 fax: 865-572-0680
30*/
31
32#ifndef MADNESS_WORLD_THREAD_H__INCLUDED
33#define MADNESS_WORLD_THREAD_H__INCLUDED
34
35/**
36 \file thread.h
37 \brief Implements Dqueue, Thread, ThreadBase and ThreadPool.
38 \ingroup threads
39*/
40
44#include <vector>
45#include <cstddef>
46#include <cstdio>
47#include <pthread.h>
48#include <type_traits>
49#include <typeinfo>
50#include <new>
51
52//////////// Parsec Related Begin ////////////////////
53#ifdef HAVE_PARSEC
54#include "parsec.h"
55#endif
56//////////// Parsec Related End ////////////////////
57
58#ifdef MADNESS_TASK_PROFILING
59#include <execinfo.h> // for backtrace_symbols
60#ifndef USE_LIBIBERTY
61#include <cxxabi.h> // for abi::__cxa_demangle
62#else
63extern "C" {
64 extern char * cplus_demangle (const char *mangled, int options);
65#define DMGL_NO_OPTS 0 /* For readability... */
66}
67#endif
68#include <sstream> // for std::istringstream
69#include <cstring> // for strchr & strrchr
70#endif // MADNESS_TASK_PROFILING
71
72#ifdef HAVE_INTEL_TBB
73#include <tbb/task_arena.h>
74#ifndef TBB_PREVIEW_GLOBAL_CONTROL
75# define TBB_PREVIEW_GLOBAL_CONTROL 1
76#endif
77# include <tbb/global_control.h>
78#endif
79
80
81#ifndef _SC_NPROCESSORS_CONF
82// Old macs don't have necessary support thru sysconf to determine the
83// no. of processors so must use sysctl
84#include <sys/types.h>
85#include <sys/sysctl.h>
86#endif
87
88namespace madness {
89
90 // Forward decls.
91 class Barrier;
92 class ThreadPool;
93 class WorldTaskQueue;
94 class AtomicInt;
95 void error(const char *msg);
96
98 static const size_t maxncpu = 1024;
99 bool print;
100 size_t ncpu = 0;
101 bool do_bind = true;
102 size_t cpus[maxncpu];
103 std::atomic<size_t> nextcpu = 0;
104 static thread_local bool bound;
105
106 public:
107
108 ThreadBinder(bool print = false) : print(print) {
109#ifndef ON_A_MAC
110 ncpu = 0;
111 cpu_set_t mask;
112 sched_getaffinity(0, sizeof(mask), &mask);
113 for (size_t i=0; i<maxncpu; i++) {
114 if (CPU_ISSET(int(i),&mask)) {
116 cpus[ncpu++] = i;
117 }
118 }
119 if (print) {
120 std::cout << "ncpu: " << get_ncpu() << std::endl;
121 for (size_t i=0; i<get_ncpu(); i++) {
122 std::cout << get_cpus()[i] << " " ;
123 }
124 std::cout << std::endl;
125 }
126 nextcpu = ncpu/2;
127#endif
128 if (this->print) { };
129 }
130
131 void set_do_bind(bool value) {do_bind = value;}
132
133 const size_t* get_cpus() const { return cpus; }
134
135 const size_t get_ncpu() const { return ncpu; }
136
137 void bind() {
138#ifndef ON_A_MAC
139 if (do_bind && !bound) { // In TBB this is called by each task, so check do_bind first
140 bound = true;
141 cpu_set_t mask;
142 CPU_ZERO(&mask);
143 size_t cpu = cpus[nextcpu++ % ncpu];
144 CPU_SET(cpu, &mask);
145 sched_setaffinity(0, sizeof(mask), &mask);
146 if (print) std::cout << "bound thread to " << cpu << std::endl;
147 }
148#endif
149 }
150 };
151
152 extern ThreadBinder binder;
153
154 /// \addtogroup threads
155 /// @{
156
157 /// Simplified thread wrapper to hide pthread complexity.
158
159 /// If the thread is using any of the object state, you cannot
160 /// delete the object until the thread has terminated.
161 ///
162 /// The cleanest solution is to put the object on the heap and
163 /// have the run method `delete this` at its end.
165 friend class ThreadPool;
166
167 static pthread_key_t thread_key; ///< Thread id key.
168
169 /// \todo Brief description needed.
170
171 /// \todo Descriptions needed.
172 /// \param[in,out] self Description needed.
173 /// \return Description needed.
174 static void* main(void* self);
175
176 int pool_num; ///< Stores index of thread in pool or -1.
177 pthread_t id; ///< \todo Brief description needed.
178
179 /// \todo Brief description needed.
180 static void init_thread_key() {
181 const int rc = pthread_key_create(&thread_key, nullptr);
182 if(rc != 0)
183 MADNESS_EXCEPTION("pthread_key_create failed", rc);
184 }
185
186 /// \todo Brief description needed.
187 static void delete_thread_key() {
188 pthread_key_delete(thread_key);
189 }
190
191 /// Sets the index of this thread within the pool.
192
193 /// \todo Verify documentation.
194 /// \param[in] i The index of this thread.
196 pool_num = i;
197 }
198
199#if defined(HAVE_IBMBGQ) and defined(HPM)
200 static const int hpm_thread_id_all = -10; ///< \todo Brief description needed.
201 static const int hpm_thread_id_main = -2; ///< \todo Brief description needed.
202 static bool main_instrumented; ///< \todo Brief description needed.
203 static bool all_instrumented; ///< \todo Brief description needed.
204 static int hpm_thread_id; ///< \todo Brief description needed.
205#endif
206
207 public:
208
209 /// Default constructor.
210
211 /// Sets up the thread; however, \c start() must be invoked to
212 /// actually begin the thread.
214
215 virtual ~ThreadBase() { }
216
217 /// Function to be executed by the thread.
218
219 /// Override this to do work.
220 virtual void run() = 0;
221
222 /// Start the thread running.
223 void start();
224
225 /// A thread can call this to terminate its execution.
226 static void exit() {
227 pthread_exit(0);
228 }
229
230 /// Get the pthread id of this thread (if running).
231 const pthread_t& get_id() const {
232 return id;
233 }
234
235 /// Get index of this thread in \c ThreadPool.
236
237 /// \return (0,...,nthread-1) or -1 if not in the \c ThreadPool.
239 return pool_num;
240 }
241
242 /// Cancel this thread.
243 int cancel() const {
244 return pthread_cancel(get_id());
245 }
246
247
248 /// Get number of actual hardware processors.
249
250 /// \return The number of hardware processors.
251 static int num_hw_processors();
252
253 /// \todo Brief description needed.
254
255 /// \todo Descriptions needed.
256 /// \return Description needed.
258 return static_cast<ThreadBase*>(pthread_getspecific(thread_key));
259 }
260
261#if defined(HAVE_IBMBGQ) and defined(HPM)
262 /// \todo Brief description needed.
263
264 /// \todo Descriptions needed.
265 /// \param[in] hpm_thread_id Description needed.
266 static void set_hpm_thread_env(int hpm_thread_id);
267#endif
268 }; // class ThreadBase
269
270 /// Simplified thread wrapper to hide pthread complexity.
271 class Thread : public ThreadBase {
272 void* (*f)(void *); ///< The function called for executing this thread. \todo should we replace this by a std::function?
273 void* args; ///< The arguments passed to this thread for execution.
274
275 /// Invokes the function for this thread.
276 void run() {
277 f(args);
278 }
279
280 public:
281 /// Default constructor.
282
283 /// \c start() must be invoked to actually execute the thread.
284 Thread() : f(nullptr), args(nullptr) { }
285
286 /// Create a thread and start it running `f(args)`.
287
288 /// \param[in] f The function to be called.
289 /// \param[in,out] args The arguments to the function.
290 Thread(void* (*f)(void *), void* args=nullptr)
291 : f(f), args(args) {
293 }
294
295 /// Start the thread by running `f(args)`.
296
297 /// \param[in] f The function to be called.
298 /// \param[in,out] args The arguments to the function.
299 void start(void* (*f)(void *), void* args=nullptr) {
300 this->f = f;
301 this->args = args;
303 }
304
305 virtual ~Thread() = default;
306 }; // class Thread
307
308
309 /// Contains attributes of a task.
310
311 /// The current attributes are:
312 /// - \c generator : Setting this hints that a task will produce
313 /// additional tasks and is used by the scheduler to
314 /// increase/throttle parallelism. The default is false.
315 /// - \c stealable : Setting this indicates that a task may be
316 /// migrated to another process for dynamic load balancing. The
317 /// default value is false.
318 /// - \c highpriority : indicates a high priority task. The default
319 /// value is false.
320 /// - \c nthread : indicates number of threads. 0 threads is interpreted
321 /// as 1 thread for backward compatibility and ease of specifying
322 /// defaults. The default value is 0 (==1).
324 unsigned long flags; ///< Byte-string storing the specified attributes.
325
326 public:
327 static const unsigned long NTHREAD = 0xff; ///< Mask for nthread byte.
328 static const unsigned long GENERATOR = 1ul<<8; ///< Mask for generator bit.
329 static const unsigned long STEALABLE = GENERATOR<<1; ///< Mask for stealable bit.
330 static const unsigned long HIGHPRIORITY = GENERATOR<<2; ///< Mask for priority bit.
331
332 /// Sets the attributes to the desired values.
333
334 /// `flags`, if unspecified sets all attributes to their default
335 /// values.
336 /// \param[in] flags The attribute values.
337 explicit TaskAttributes(unsigned long flags = 0)
338 : flags(flags) {}
339
340 /// Copy constructor.
341
342 /// \param[in] attr The attributes to copy.
344 : flags(attr.flags) {}
345
346 virtual ~TaskAttributes() {}
347
348 /// Test if the generator attribute is true.
349
350 /// \return True if this task is a generator, false otherwise.
351 bool is_generator() const {
352 return flags&GENERATOR;
353 }
354
355 /// Test if the stealable attribute is true.
356
357 /// \return True if this task is stealable, false otherwise.
358 bool is_stealable() const {
359 return flags&STEALABLE;
360 }
361
362 /// Test if the high priority attribute is true.
363
364 /// \return True if this task is a high priority, false otherwise.
365 bool is_high_priority() const {
366 return flags&HIGHPRIORITY;
367 }
368
369 /// Sets the generator attribute.
370
371 /// \param[in] generator_hint The new value for the generator attribute.
372 TaskAttributes& set_generator(bool generator_hint) {
373 if (generator_hint)
374 flags |= GENERATOR;
375 else
376 flags &= ~GENERATOR;
377 return *this;
378 }
379
380 /// Sets the stealable attribute.
381
382 /// \param[in] stealable The new value for the stealable attribute.
383 TaskAttributes& set_stealable(bool stealable) {
384 if (stealable) flags |= STEALABLE;
385 else flags &= ~STEALABLE;
386 return *this;
387 }
388
389 /// Sets the high priority attribute.
390
391 /// \param[in] hipri The new value for the high priority attribute.
393 if (hipri)
395 else
396 flags &= ~HIGHPRIORITY;
397 return *this;
398 }
399
400 /// Set the number of threads.
401
402 /// \attention Are you sure this is what you want to call? Only call
403 /// this for a \c TaskAttributes that is \em not a base class of a task
404 /// object.
405 /// \p If you are trying to set the number of threads in an \em existing
406 /// task you should call \c TaskInterface::set_nthread() instead. No
407 /// doubt there is some virtual/protected/something voodoo to prevent
408 /// you from doing harm.
409 ///
410 /// \todo Perhaps investigate a way to make this function only accessible
411 /// from the intended functions (using the so-called voodoo)?
412 ///
413 /// \param[in] nthread The new number of threads.
414 void set_nthread(int nthread) {
415 MADNESS_ASSERT(nthread>=0 && nthread<256);
416 flags = (flags & (~NTHREAD)) | (nthread & NTHREAD);
417 }
418
419 /// Get the number of threads.
420
421 /// \return The number of threads.
422 int get_nthread() const {
423 int n = flags & NTHREAD;
424 if (n == 0)
425 n = 1;
426 return n;
427 }
428
429 /// Serializes the attributes for I/O.
430
431 /// tparam Archive The archive type.
432 /// \param[in,out] ar The archive.
433 template <typename Archive>
434 void serialize(Archive& ar) {
435 ar & flags;
436 }
437
438 /// \todo Brief description needed.
439
440 /// \todo Descriptions needed.
441 /// \return Description needed.
444 }
445
446 /// \todo Brief description needed.
447
448 /// \todo Descriptions needed.
449 /// \return Description needed.
452 }
453
454 /// \todo Brief description needed.
455
456 /// \todo Descriptions needed.
457 /// \return Description needed.
458 static TaskAttributes multi_threaded(int nthread) {
460 t.set_nthread(nthread);
461 return t;
462 }
463 };
464
465 /// Used to pass information about the thread environment to a user's task.
467 const int _nthread; ///< Number of threads collaborating on task.
468 const int _id; ///< ID of this thread (0,...,nthread-1).
469 Barrier* _barrier; ///< Pointer to the shared barrier, `null` if there is only a single thread.
470
471 public:
472 /// Constructor collecting necessary environmental information.
473
474 /// \todo Verify this documentation.
475 /// \param[in] nthread The number of threads collaborating on this task.
476 /// \param[in] id The ID of this thread.
477 /// \param[in] barrier Pointer to the shared barrier.
481
482#if HAVE_INTEL_TBB
483 /// Constructor collecting necessary environmental information.
484
485 /// \todo Verify this documentation.
486 /// \param[in] nthread The number of threads collaborating on this task.
487 /// \param[in] id The ID of this thread.
488 ///
489 /// \todo I cannot get the TaskThreadEnv to work with Barrier.
490 /// Need to figure out why.
492 : _nthread(nthread), _id(id), _barrier(nullptr)
494#endif
495
496 /// Get the number of threads collaborating on this task.
497
498 /// \return The number of threads.
499 int nthread() const {
500 return _nthread;
501 }
502
503 /// Get the ID of this thread.
504
505 /// \return The ID of this thread.
506 int id() const {
507 return _id;
508 }
509
510 /// \todo Brief description needed.
511
512 /// \todo Descriptions needed.
513 /// \return Description needed.
514 bool barrier() const {
515 if (_nthread == 1)
516 return true;
517 else {
519 return _barrier->enter(_id);
520 }
521 }
522 };
523
524
525#ifdef MADNESS_TASK_PROFILING
526
527 namespace profiling {
528
529 /// Task event class.
530
531 /// This class is used to record the task trace information, including
532 /// submit, start, and stop times, as well as identification information.
533 class TaskEvent {
534 private:
535 double times_[3]; ///< Task trace times: { submit, start, stop }.
536 std::pair<void*, unsigned short> id_; ///< Task identification information.
537 unsigned short threads_; ///< Number of threads used by the task.
538
539 /// Print demangled symbol name.
540
541 /// Add the demangled symbol name to \c os. If demangling fails,
542 /// the unmodified symbol name is used instead. If symbol is NULL,
543 /// "UNKNOWN" is used instead. A tab character is added after the
544 /// symbol name.
545 /// \param[in,out] os The output stream.
546 /// \param[in] symbol The symbol to add to the stream.
547 static void print_demangled(std::ostream& os, const char* symbol) {
548 // Get the demagled symbol name
549 if(symbol) {
550 int status = 0;
551#ifndef USE_LIBIBERTY
552 const char* name = abi::__cxa_demangle(symbol, 0, 0, &status);
553#else
554 char* name = cplus_demangle(symbol, DMGL_NO_OPTS);
555#endif
556 // Append the demangled symbol name to the output stream
557 if(status == 0) {
558 os << name << "\t";
559 free((void*)name);
560 } else {
561 os << symbol << "\t";
562 }
563 } else {
564 os << "UNKNOWN\t";
565 }
566 }
567
568 /// Get name of the function pointer.
569
570 /// \return The mangled function name.
571 std::string get_name() const {
572
573 // Get the backtrace symbol for the function address,
574 // which contains the function name.
575 void* const * func_ptr = const_cast<void* const *>(& id_.first);
576 char** bt_sym = backtrace_symbols(func_ptr, 1);
577
578 // Extract the mangled function name from the backtrace
579 // symbol.
580 std::string mangled_name;
581
582#ifdef ON_A_MAC
583 // Format of bt_sym is:
584 // <frame #> <file name> <address> <mangled name> + <function offset>
585 std::istringstream iss(bt_sym[0]);
586 long frame;
587 std::string file, address;
588 iss >> frame >> file >> address >> mangled_name;
589#else // Assume Linux
590 // Format of bt_sym is:
591 // <file>(<mangled name>+<function offset>) [<address>]
592 const char* first = strchr(bt_sym[0],'(');
593 if(first) {
594 ++first;
595 const char* last = strrchr(first,'+');
596 if(last)
597 mangled_name.assign(first, (last - first) - 1);
598 }
599#endif // ON_A_MAC
600
601 // Free the backtrace buffer
602 free(bt_sym);
603
604 return mangled_name;
605 }
606
607 public:
608
609 // Only default constructors are needed.
610
611 /// Record the start time of the task and collect task information.
612
613 /// \param[in,out] id The task identifier (a function pointer or const char*)
614 /// and an integer to differentiate the different types.
615 /// \param[in] threads The number of threads this task uses.
616 /// \param[in] submit_time The time that the task was submitted to the
617 /// task queue.
618 void start(const std::pair<void*, unsigned short>& id,
619 const unsigned short threads, const double submit_time)
620 {
621 id_ = id;
622 threads_ = threads;
623 times_[0] = submit_time;
624 times_[1] = wall_time();
625 }
626
627 /// Record the stop time of the task.
628 void stop() {
629 times_[2] = wall_time();
630 }
631
632 /// Output the task data using a tab-separated list.
633
634 /// Output information includes
635 /// - the ID pointer
636 /// - the function, member function, and object type name
637 /// - the number of threads used by the task
638 /// - the submit time
639 /// - the start time
640 /// - the stop time.
641 ///
642 /// \param[in,out] os The output stream.
643 /// \param[in] te The task event to be output.
644 /// \return The \c os reference.
645 friend std::ostream& operator<<(std::ostream& os, const TaskEvent& te) {
646 // Add address to output stream
647 os << std::hex << std::showbase << te.id_.first <<
648 std::dec << std::noshowbase << "\t";
649
650 // Print the name
651 switch(te.id_.second) {
652 case 1:
653 {
654 const std::string mangled_name = te.get_name();
655
656 // Print the demangled name
657 if(! mangled_name.empty())
658 print_demangled(os, mangled_name.c_str());
659 else
660 os << "UNKNOWN\t";
661 }
662 break;
663 case 2:
664 print_demangled(os, static_cast<const char*>(te.id_.first));
665 break;
666 default:
667 os << "UNKNOWN\t";
668 }
669
670 // Print:
671 // # of threads, submit time, start time, stop time
672 os << te.threads_;
673 const std::streamsize precision = os.precision();
674 os.precision(6);
675 os << std::fixed << "\t" << te.times_[0]
676 << "\t" << te.times_[1] << "\t" << te.times_[2];
677 os.precision(precision);
678 return os;
679 }
680
681 }; // class TaskEvent
682
683 /// Task event list base class.
684
685 /// This base class allows the data to be stored in a linked list.
687 private:
688 TaskEventListBase* next_; ///< The next task event in the list.
689
692
693 public:
694
695 /// Default constructor.
697 : next_(nullptr) { }
698
699 /// Virtual destructor.
700 virtual ~TaskEventListBase() = default;
701
702 /// Get the next event list in the linked list.
703
704 /// \return The next event list.
706 return next_;
707 }
708
709 /// Insert \c list after this list.
710
711 /// \param[in] list The list to be inserted.
713 if(next_)
714 list->next_ = next_;
715 next_ = list;
716 }
717
718 /// Output a task event list to an output stream.
719
720 /// \param[in,out] os The output stream.
721 /// \param[in] tel The task event list to be output.
722 /// \return The modified output stream.
723 friend inline std::ostream& operator<<(std::ostream& os, const TaskEventListBase& tel) {
724 return tel.print_events(os);
725 }
726
727 private:
728
729 /// Print the events.
730 virtual std::ostream& print_events(std::ostream&) const = 0;
731
732 }; // class TaskEventList
733
734 /// A list of task events.
735
736 /// This object is used by the thread pool to record task data.
738 private:
739 unsigned int n_; ///< The number of events recorded.
740 std::unique_ptr<TaskEvent[]> events_; ///< The event array.
741
742 TaskEventList(const TaskEventList&) = delete;
744
745 public:
746
747 /// Default constructor.
748
749 /// \param[in] nmax The maximum number of task events.
750 /// \todo Should nmax be stored? I think it used to be a template
751 /// parameter (N), which is no longer present.
752 TaskEventList(const unsigned int nmax) :
753 TaskEventListBase(), n_(0ul), events_(new TaskEvent[nmax])
754 { }
755
756 /// Virtual destructor.
757 virtual ~TaskEventList() = default;
758
759 /// Get a new event from this list.
760
761 /// \warning This function can only be called \c nmax times. It is
762 /// the caller's responsibility to ensure that it is not called too
763 /// many times.
764 /// \return The new event from the list.
766 return events_.get() + (n_++);
767 }
768
769 private:
770
771 /// Print events recorded in this list.
772
773 /// \param[in,out] os The output stream.
774 /// \return The modified output stream.
775 virtual std::ostream& print_events(std::ostream& os) const {
776 const int thread_id = ThreadBase::this_thread()->get_pool_thread_index();
777 for(std::size_t i = 0; i < n_; ++i)
778 os << thread_id << "\t" << events_[i] << std::endl;
779 return os;
780 }
781
782 }; // class TaskEventList
783
784 /// This class collects and prints task profiling data.
785
786 /// \note Each thread has its own \c TaskProfiler object, so only one
787 /// thread will ever operate on this object at a time and all operations
788 /// are inherently thread safe.
790 private:
791 TaskEventListBase* head_; ///< The head of the linked list of data.
792 TaskEventListBase* tail_; ///< The tail of the linked list of data.
793
794 static Mutex output_mutex_; ///< Mutex used to lock the output file.
795
796 TaskProfiler(const TaskProfiler&) = delete;
798
799 public:
800 /// The output file name.
801
802 /// This variable is initialized by \c ThreadPool::begin and is
803 /// assigned the value given by the environment variable
804 /// `MAD_TASKPROFILER_NAME`.
805 static const char* output_file_name_;
806
807 public:
808 /// Default constructor.
810 : head_(nullptr), tail_(nullptr)
811 { }
812
813 /// Destructor.
815 // Cleanup linked list
816 TaskEventListBase* next = nullptr;
817 while(head_ != nullptr) {
818 next = head_->next();
819 delete head_;
820 head_ = next;
821 }
822 }
823
824 /// Create a new task event list.
825
826 /// \param[in] nmax The maximum number of elements that the list
827 /// can contain.
828 /// \return A new task event list.
829 TaskEventList* new_list(const std::size_t nmax) {
830 // Create a new event list
831 TaskEventList* list = new TaskEventList(nmax);
832
833 // Append the list to the tail of the linked list
834 if(head_ != nullptr) {
835 tail_->insert(list);
836 tail_ = list;
837 } else {
838 head_ = list;
839 tail_ = list;
840 }
841 return list;
842 }
843
844 /// Write the profile data to file.
845
846 /// The data is cleared after it is written to the file, so this
847 /// function may be called more than once.
848 ///
849 /// \warning This function should only be called from the thread
850 /// that owns it, otherwise data will likely be corrupted.
851 ///
852 /// \note This function is thread safe, in that it may be called by
853 /// different objects in different threads simultaneously.
854 void write_to_file();
855 }; // class TaskProfiler
856
857 } // namespace profiling
858
859#endif // MADNESS_TASK_PROFILING
860
861
862 /// Lowest level task interface.
863
864 /// The pool invokes \c run_multi_threaded(), which does any necessary
865 /// setup for multiple threads, and then invokes the user's \c run() method.
867 {
868 friend class ThreadPool;
869
870 private:
871
872#ifdef MADNESS_TASK_PROFILING
873 profiling::TaskEvent* task_event_; ///< \todo Description needed.
874 double submit_time_; ///< \todo Description needed.
875 std::pair<void*, unsigned short> id_; ///< \todo Description needed.
876
877 /// \todo Brief description needed.
878
879 /// \todo Descriptions needed.
880 /// \param[in,out] task_event Description needed.
881 void set_event(profiling::TaskEvent* task_event) {
882 task_event_ = task_event;
883 }
884
885 /// Collect info on the task and record the submit time.
886 void submit() {
888 this->get_id(id_);
889 }
890#endif // MADNESS_TASK_PROFILING
891
892 /// Object that is used to convert function and member function pointers into `void*`.
893
894 /// \note This is technically not supported by the C++ standard but
895 /// it will likely not cause any issues here (famous last words?).
896 /// \todo Descriptions needed.
897 /// \tparam T Description needed.
898 template <typename T>
900 T in; ///< \todo Description needed.
901 void* out; ///< \todo Description needed.
902 };
903
904 protected:
905
906 /// \todo Brief description needed.
907
908 /// \todo Descriptions needed.
909 /// \tparam fnT Description needed.
910 /// \param[in,out] id Description needed.
911 /// \param[in] fn Description needed.
912 /// \return Description needed.
913 template <typename fnT>
914 static typename std::enable_if<detail::function_traits<fnT>::value ||
916 make_id(std::pair<void*,unsigned short>& id, fnT fn) {
918 poop.in = fn;
919 id.first = poop.out;
920 id.second = 1ul;
921 }
922
923 /// \todo Brief description needed.
924
925 /// \todo Descriptions needed. What is the purpose of the second argument?
926 /// \tparam fnobjT Description needed.
927 /// \param[in,out] id Description needed.
928 template <typename fnobjT>
929 static typename std::enable_if<!(detail::function_traits<fnobjT>::value ||
931 make_id(std::pair<void*,unsigned short>& id, const fnobjT&) {
932 id.first = reinterpret_cast<void*>(const_cast<char*>(typeid(fnobjT).name()));
933 id.second = 2ul;
934 }
935
936 private:
937
938 /// \todo Brief description needed.
939
940 /// \todo Descriptions needed.
941 /// \param[in,out] id Description needed.
942 virtual void get_id(std::pair<void*,unsigned short>& id) const {
943 id.first = nullptr;
944 id.second = 0ul;
945 }
946
947#ifndef HAVE_INTEL_TBB
948
949 Barrier* barrier; ///< Barrier, only allocated for multithreaded tasks.
950 AtomicInt count; ///< Used to count threads as they start.
951
952 /// Returns true for the one thread that should invoke the destructor.
953
954 /// \return True for the one thread that should invoke the destructor.
955 bool run_multi_threaded() {
956 // As a thread enters this routine it increments the shared counter
957 // to generate a unique id without needing any thread-local storage.
958 // A downside is this does not preserve any relationships between thread
959 // numbering and the architecture ... more work ahead.
960 int nthread = get_nthread();
961 if (nthread == 1) {
962#ifdef MADNESS_TASK_PROFILING
963 task_event_->start(id_, nthread, submit_time_);
964#endif // MADNESS_TASK_PROFILING
965 run(TaskThreadEnv(1,0,0));
966#ifdef MADNESS_TASK_PROFILING
967 task_event_->stop();
968#endif // MADNESS_TASK_PROFILING
969 return true;
970 }
971 else {
972 int id = count++;
973 volatile bool barrier_flag;
974 barrier->register_thread(id, &barrier_flag);
975
976#ifdef MADNESS_TASK_PROFILING
977 if(id == 0)
978 task_event_->start(id_, nthread, submit_time_);
979#endif // MADNESS_TASK_PROFILING
980
981 run(TaskThreadEnv(nthread, id, barrier));
982
983#ifdef MADNESS_TASK_PROFILING
984 const bool cleanup = barrier->enter(id);
985 if(cleanup) task_event_->stop();
986 return cleanup;
987#else
988 return barrier->enter(id);
989#endif // MADNESS_TASK_PROFILING
990 }
991 }
992
993 public:
994
995 /// Default constructor.
998 , barrier(nullptr)
999#if HAVE_PARSEC
1000 , parsec_task(ParsecRuntime::task(is_high_priority(), this))
1001#endif
1002 {
1003 count = 0;
1004 }
1005
1006 /// Constructor setting the specified task attributes.
1007
1008 /// \param[in] attr The task attributes.
1009 explicit PoolTaskInterface(const TaskAttributes& attr)
1010 : TaskAttributes(attr)
1011 , barrier(attr.get_nthread()>1 ? new Barrier(attr.get_nthread()) : 0)
1012#if HAVE_PARSEC
1013 , parsec_task(ParsecRuntime::task(is_high_priority(), this))
1014#endif
1015 {
1016 count = 0;
1017 }
1018
1019 /// Destructor.
1020 /// \todo Should we either use a unique_ptr for barrier or check that barrier != nullptr here?
1021 virtual ~PoolTaskInterface() {
1022#if HAVE_PARSEC
1023 *(reinterpret_cast<PoolTaskInterface**>(&(parsec_task->locals[0]))) = nullptr;
1024 ParsecRuntime::delete_parsec_task(parsec_task);
1025 parsec_task = nullptr;
1026#endif
1027 delete barrier;
1028 }
1029
1030 /// Call this to reset the number of threads before the task is submitted.
1031
1032 /// Once a task has been constructed, \c TaskAttributes::set_nthread()
1033 /// is insufficient because a multithreaded task includes a barrier
1034 /// that needs to know the number of threads.
1035 ///
1036 /// \param[in] nthread The new number of threads.
1037 void set_nthread(int nthread) {
1038 if (nthread != get_nthread()) {
1040 delete barrier;
1041 if (nthread > 1)
1042 barrier = new Barrier(nthread);
1043 else
1044 barrier = 0;
1045 }
1046 }
1047#if HAVE_PARSEC
1048 //////////// Parsec Related Begin ////////////////////
1049 parsec_task_t *parsec_task;
1050 //////////// Parsec Related End ///////////////////
1051#endif
1052
1053#else
1054
1055 public:
1056
1057 /// Default constructor.
1060
1061 /// \todo Brief description needed.
1062
1063 /// \todo Descriptions needed.
1064 /// \param[in] attr Description needed.
1065 explicit PoolTaskInterface(const TaskAttributes& attr) :
1066 TaskAttributes(attr)
1067 {
1068 }
1069
1070 /// Destructor.
1071 virtual ~PoolTaskInterface() = default;
1072
1073 /// Call this to reset the number of threads before the task is submitted
1074
1075 /// Once a task has been constructed, \c TaskAttributes::set_nthread()
1076 /// is insufficient because a multithreaded task includes a
1077 /// barrier that needs to know the number of threads.
1078 void set_nthread(int nthread) {
1079 if (nthread != get_nthread())
1081 }
1082
1083 /// \todo Brief description needed.
1084
1085 /// \todo Descriptions needed.
1086 /// \return Description needed.
1087 void execute() {
1088 const int nthread = get_nthread();
1089 run( TaskThreadEnv(nthread, 0) );
1090 }
1091
1092#endif // HAVE_INTEL_TBB
1093
1094 /// Override this method to implement a multi-threaded task.
1095
1096 /// \c info.nthread() will be the number of threads collaborating on this task.
1097 ///
1098 /// \c info.id() will be the index of the current thread \c id=0,...,nthread-1.
1099 ///
1100 /// \c info.barrier() will be a barrier for all of the threads, and returns
1101 /// true for the last thread to enter the barrier (other threads get false).
1102 ///
1103 /// \todo Description needed.
1104 /// \param[in] info Description needed.
1105
1106
1107 virtual void run(const TaskThreadEnv& info) = 0;
1108
1109 };
1110
1111 /// A no-operation task used for various purposes.
1113 public:
1114 /// Execution function that does nothing.
1115 void run(const TaskThreadEnv& /*info*/) {}
1116
1117 /// Destructor.
1118 virtual ~PoolTaskNull() {}
1119
1120 private:
1121 /// \todo Brief description needed.
1122
1123 /// \todo Description needed.
1124 /// \param[in,out] id Description needed.
1125 virtual void get_id(std::pair<void*,unsigned short>& id) const {
1127 }
1128 };
1129
1130 /// \c ThreadPool thread object.
1131
1132 /// This class holds thread local data for thread pool threads. It can be
1133 /// accessed via \c ThreadBase::this_thread().
1134 class ThreadPoolThread : public Thread {
1135 private:
1136 // Thread local data for thread pool
1137#ifdef MADNESS_TASK_PROFILING
1138 profiling::TaskProfiler profiler_; ///< \todo Description needed.
1139#endif // MADNESS_TASK_PROFILING
1140
1141 public:
1143 virtual ~ThreadPoolThread() = default;
1144
1145#ifdef MADNESS_TASK_PROFILING
1146 /// Task profiler accessor.
1147
1148 /// \todo Description needed.
1149 /// \return Description needed.
1153#endif // MADNESS_TASK_PROFILING
1154 };
1155
1156 /// A singleton pool of threads for dynamic execution of tasks.
1157
1158 /// \attention You must instantiate the pool while running with just one
1159 /// thread.
1161 public:
1162 // non-copyable and non-movable
1163 ThreadPool(const ThreadPool&) = delete;
1165 void operator=(const ThreadPool&) = delete;
1166 void operator=(ThreadPool&&) = delete;
1167
1168 /// Get the number of threads from the environment.
1169
1170 /// \return The number of threads.
1171 static int default_nthread();
1172
1173 private:
1174 friend class WorldTaskQueue;
1175
1176 // Thread pool data
1177 ThreadPoolThread *threads; ///< Array of threads.
1178 ThreadPoolThread main_thread; ///< Placeholder for main thread tls.
1179 DQueue<PoolTaskInterface*> queue; ///< Queue of tasks.
1180 int nthreads; ///< Number of threads.
1181 volatile bool finish; ///< Set to true when time to stop.
1182 AtomicInt nfinished; ///< Thread pool exit counter.
1183
1184 // Static data
1185 static ThreadPool* instance_ptr; ///< Singleton pointer.
1186 static const int nmax = 128; ///< Number of task a worker thread will pop from the task queue
1187 static double await_timeout; ///< Waiter timeout.
1188
1189#if defined(HAVE_IBMBGQ) and defined(HPM)
1190 static unsigned int main_hpmctx; ///< HPM context for main thread.
1191#endif
1192 /// The constructor is private to enforce the singleton model.
1193
1194 /// \todo Description needed.
1195 /// \param[in] nthread Description needed.
1196 ThreadPool(int nthread=-1);
1197
1198 /// Run the next task.
1199
1200 /// \todo Verify and complete this documentation.
1201 /// \param[in] wait Block of true.
1202 /// \param[in,out] this_thread Description needed.
1203 /// \return True if a task was run.
1204 bool run_task(bool wait, ThreadPoolThread* this_thread) {
1205#if HAVE_INTEL_TBB
1206 MADNESS_EXCEPTION("run_task should not be called when using Intel TBB", 1);
1207#else
1208
1209 if (!wait && queue.empty()) return false;
1210 std::pair<PoolTaskInterface*,bool> t = queue.pop_front(wait);
1211#ifdef MADNESS_TASK_PROFILING
1212 profiling::TaskEventList* event_list =
1213 this_thread->profiler().new_list(1);
1214#endif // MADNESS_TASK_PROFILING
1215 // Task pointer might be zero due to stealing
1216 if (t.second && t.first) {
1217#ifdef MADNESS_TASK_PROFILING
1218 t.first->set_event(event_list->event());
1219#endif // MADNESS_TASK_PROFILING
1220 if (t.first->run_multi_threaded()) // What we are here to do
1221 delete t.first;
1222 }
1223 return t.second;
1224#endif
1225 }
1226
1227 /// \todo Brief description needed.
1228
1229 /// \todo Descriptions needed.
1230 /// \param[in] wait Description needed.
1231 /// \param[in,out] this_thread Description needed.
1232 /// \return Description needed.
1233 bool run_tasks(bool wait, ThreadPoolThread* const this_thread) {
1234#if HAVE_INTEL_TBB
1235// if (!wait && tbb_task_list->empty()) return false;
1236// tbb::task* t = &tbb_task_list->pop_front();
1237// if (t) {
1238// tbb_parent_task->increment_ref_count();
1239// tbb_parent_task->enqueue(*t);
1240// }
1241
1242// wait = (tbb_parent_task->ref_count() >= 1) ? false : true;
1243// return wait;
1244
1245 MADNESS_EXCEPTION("run_tasks should not be called when using Intel TBB", 1);
1246#else
1247
1248 PoolTaskInterface* taskbuf[nmax];
1249 int ntask = queue.pop_front(nmax, taskbuf, wait);
1250#ifdef MADNESS_TASK_PROFILING
1251 profiling::TaskEventList* event_list =
1252 this_thread->profiler().new_list(ntask);
1253#endif // MADNESS_TASK_PROFILING
1254 for (int i=0; i<ntask; ++i) {
1255 if (taskbuf[i]) { // Task pointer might be zero due to stealing
1256#ifdef MADNESS_TASK_PROFILING
1257 taskbuf[i]->set_event(event_list->event());
1258#endif // MADNESS_TASK_PROFILING
1259 if (taskbuf[i]->run_multi_threaded()) {
1260 delete taskbuf[i];
1261 }
1262 }
1263 }
1264#if HAVE_PARSEC
1265 ////////////////// Parsec Related Begin //////////////////
1266 if(0 == ntask) {
1267 ntask = parsec_runtime->test();
1268 }
1269 ///////////////// Parsec Related End ////////////////////
1270#endif
1271 return (ntask>0);
1272#endif
1273 }
1274
1275 /// \todo Brief description needed.
1276
1277 /// \todo Description needed.
1278 /// \param[in,out] thread Description needed.
1279 void thread_main(ThreadPoolThread* const thread);
1280
1281 /// Forwards the thread to bound member function.
1282
1283 /// \todo Descriptions needed.
1284 /// \param[in] v Description needed.
1285 /// \return Description needed.
1286 static void* pool_thread_main(void *v);
1287
1288 public:
1289 /// Return a pointer to the only instance, constructing as necessary.
1290
1291 /// \return A pointer to the only instance.
1293#ifndef MADNESS_ASSERTIONS_DISABLE
1294 if(! instance_ptr) {
1295 std::cerr << "!!! ERROR: The thread pool has not been initialized.\n"
1296 << "!!! ERROR: Call madness::initialize before submitting tasks to the task queue.\n";
1297 MADNESS_EXCEPTION("ThreadPool::instance_ptr is NULL", 0);
1298 }
1299#endif
1300 return instance_ptr;
1301 }
1302
1304#if !(defined(HAVE_INTEL_TBB) || defined(HAVE_PARSEC))
1305 queue.lock_and_flush_prebuf();
1306#endif
1307 }
1308
1309#if HAVE_PARSEC
1310 ////////////////// Parsec Related Begin //////////////////
1311 static ParsecRuntime *parsec_runtime;
1312 ///////////////// Parsec Related End ////////////////////
1313#endif
1314
1315#if HAVE_INTEL_TBB
1316 static std::unique_ptr<tbb::global_control> tbb_control; ///< \todo Description needed.
1317 static std::unique_ptr<tbb::task_arena> tbb_arena;
1318#endif
1319
1320 /// Please invoke while in a single-threaded environment.
1321
1322 /// \todo Verify documentation.
1323 /// \param[in] nthread The number of threads.
1324 static void begin(int nthread=-1);
1325
1326 /// \todo Description needed.
1327 static void end();
1328
1329 /// Add a new task to the pool.
1330
1331 /// \todo Description needed.
1332 /// \param[in,out] task Description needed.
1334#ifdef MADNESS_TASK_PROFILING
1335 task->submit();
1336#endif // MADNESS_TASK_PROFILING
1337
1338#if HAVE_PARSEC
1339 //////////// Parsec Related Begin ////////////////////
1340 parsec_runtime->schedule(task);
1341 //////////// Parsec Related End ////////////////////
1342#elif HAVE_INTEL_TBB
1343//#ifdef MADNESS_CAN_USE_TBB_PRIORITY
1344// if(task->is_high_priority())
1345// tbb::task::enqueue(*task, tbb::priority_high);
1346// else
1347//#endif // MADNESS_CAN_USE_TBB_PRIORITY
1348 tbb_arena->enqueue(
1349 //use unique_ptr to automatically delete task ptr
1350 [task_p = std::unique_ptr<PoolTaskInterface>(task)] () noexcept {
1351 //exceptions are not expected here, as nobody will catch them for enqueued tasks
1352 task_p->execute();
1353 });
1354#else
1355 if (!task) MADNESS_EXCEPTION("ThreadPool: inserting a NULL task pointer", 1);
1356 int task_threads = task->get_nthread();
1357 // Currently multithreaded tasks must be shoved on the end of the q
1358 // to avoid a race condition as multithreaded task is starting up
1359 if (task->is_high_priority() && (task_threads == 1)) {
1360 instance()->queue.push_front(task);
1361 }
1362 else {
1363 instance()->queue.push_back(task, task_threads);
1364 }
1365#endif // HAVE_INTEL_TBB
1366 }
1367
1368 /// \todo Brief description needed.
1369
1370 /// \todo Descriptions needed.
1371 /// \tparam opT Description needed.
1372 /// \param[in,out] op Description needed.
1373 template <typename opT>
1374 void scan(opT& op) {
1375 queue.scan(op);
1376 }
1377
1378 /// Add a vector of tasks to the pool.
1379
1380 /// \param[in] tasks Vector of tasks to add to the pool.
1381 static void add(const std::vector<PoolTaskInterface*>& tasks) {
1382#if HAVE_INTEL_TBB
1383 MADNESS_EXCEPTION("Do not add tasks to the madness task queue when using Intel TBB.", 1);
1384#else
1385 typedef std::vector<PoolTaskInterface*>::const_iterator iteratorT;
1386 for (iteratorT it=tasks.begin(); it!=tasks.end(); ++it) {
1387 add(*it);
1388 }
1389#endif
1390 }
1391
1392 /// An otherwise idle thread can all this to run a task.
1393
1394 /// \return True if a task was run.
1395 static bool run_task() {
1396#ifdef HAVE_INTEL_TBB
1397 return false;
1398#else
1399
1400#ifdef MADNESS_TASK_PROFILING
1401 ThreadPoolThread* const thread = static_cast<ThreadPoolThread*>(ThreadBase::this_thread());
1402#else
1403 ThreadPoolThread* const thread = nullptr;
1404#endif // MADNESS_TASK_PROFILING
1405
1406 return instance()->run_tasks(false, thread);
1407#endif // HAVE_INTEL_TBB
1408 }
1409
1410 /// Returns the number of threads in the pool.
1411
1412 /// \return The number of threads in the pool.
1413 static std::size_t size() {
1414 return instance()->nthreads;
1415 }
1416
1417 /// Returns the number of tasks in the queue.
1418
1419 /// \return The number of tasks in the queue.
1420 static std::size_t queue_size() {
1421 return instance()->queue.size();
1422 }
1423
1424 /// Returns queue statistics.
1425
1426 /// \return Queue statistics.
1427 static const DQStats& get_stats();
1428
1429 /// Access the pool thread array
1430 /// \return ptr to the pool thread array, its size is given by \c size()
1432 return const_cast<const ThreadPoolThread*>(instance()->threads);
1433 }
1434
1435 /// Gracefully wait for a condition to become true, executing any tasks in the queue.
1436
1437 /// Probe should be an object that, when called, returns the status.
1438 /// \todo Descriptions needed/need verification.
1439 /// \tparam Probe Type of the probe.
1440 /// \param[in] probe The probe.
1441 /// \param[in] dowork Do work while waiting - default is true
1442 /// \param[in] sleep Sleep instead of spin while waiting (e.g., to avoid pounding on MPI) - default is false
1443 template <typename Probe>
1444 static void await(const Probe& probe, bool dowork = true, bool sleep = false) {
1445 if (!probe()) {
1446 double start = cpu_time();
1447 const double timeout = await_timeout;
1448 int counter = 0;
1449
1450 MutexWaiter waiter;
1451 while (!probe()) {
1452
1453 const bool working = (dowork ? ThreadPool::run_task() : false);
1454 const double current_time = cpu_time();
1455
1456 if (working) { // Reset timeout logic
1457 waiter.reset();
1458 start = current_time;
1459 counter = 0;
1460 } else {
1461 if(((current_time - start) > timeout) && (timeout > 1.0)) { // Check for timeout
1462 std::cerr << "!!MADNESS: Hung queue?" << std::endl;
1463 if (counter++ > 3) {
1464 const long bufsize=256;
1465 char errstr[bufsize];
1466 snprintf(errstr,bufsize, "ThreadPool::await() timed out after %.1lf seconds", timeout);
1467 throw madness::MadnessException(errstr, 0, 1,
1468 __LINE__, __FUNCTION__,
1469 __FILE__);
1470 }
1471 }
1472 if (sleep) {
1473 // THIS NEEDS TO BECOME AN EXTERNAL PARAMETER
1474 // Problem is exacerbated when running with many
1475 // (e.g., 512 or more) send/recv buffers, and
1476 // also with many threads. More outstanding
1477 // requests means each call into MPI takes
1478 // longer and more threads means more calls in
1479 // spots where all threads are messaging. Old
1480 // code was OK on dancer.icl.utk.edu with just
1481 // 32 bufs and 20 threads, but 512 bufs caused
1482 // intermittent hangs I think due to something
1483 // not being able to make progress or general
1484 // confusion (this with MPICH) ... maybe using a
1485 // fair mutex somewhere would help.
1486 //
1487 // 100us is a long time ... will try 10us. mmm ... perhaps need 100 at least on dancer with 17 threads per node
1488 myusleep(100);
1489 }
1490 else {
1491 waiter.wait();
1492 }
1493 }
1494 }
1495 } // if !probe()
1496 }
1497
1498 /// Destructor.
1500#if HAVE_PARSEC
1501 ////////////////// Parsec related Begin /////////////////
1502 delete parsec_runtime;
1503 ////////////////// Parsec related End /////////////////
1504#elif HAVE_INTEL_TBB
1505#else
1506 delete[] threads;
1507#endif
1508 }
1509
1510 /// \sa madness::threadpool_wait_policy
1511 static void set_wait_policy(
1512 WaitPolicy policy,
1513 int sleep_duration_in_microseconds = 0) {
1514#if !HAVE_INTEL_TBB && !HAVE_PARSEC
1515 instance()->queue.set_wait_policy(policy,
1516 sleep_duration_in_microseconds);
1517#endif
1518 }
1519
1520 };
1521
1522 // clang-format off
1523 /// Controls how aggressively ThreadPool holds on to the OS threads
1524 /// while waiting for work. Currently useful only for Pthread pool when it's using spinlocks;
1525 /// NOT used for TBB or PaRSEC.
1526 /// \param policy specifies how to wait for work;
1527 /// - WaitPolicy::Busy -- threads are kept busy (default); recommended when intensive work is only performed by MADNESS threads
1528 /// - WaitPolicy::Yield -- thread yields; recommended when intensive work is performed primarily by non-MADNESS threads
1529 /// - WaitPolicy::Sleep -- thread sleeps for \p sleep_duration_in_microseconds ; recommended when intensive work is performed by MADNESS and non-MADNESS threads
1530 /// \param sleep_duration_in_microseconds if `policy==WaitPolicy::Sleep` this specifies the duration of sleep, in microseconds
1531 // clang-format on
1533 int sleep_duration_in_microseconds = 0) {
1534 ThreadPool::set_wait_policy(policy, sleep_duration_in_microseconds);
1535 }
1536
1537 /// @}
1538}
1539
1540#endif // MADNESS_WORLD_THREAD_H__INCLUDED
An integer with atomic set, get, read+increment, read+decrement, and decrement+test operations.
Definition atomicint.h:126
Definition worldmutex.h:700
bool enter(const int id)
Each thread calls this with its id (0,..,nthread-1) to enter the barrier.
Definition worldmutex.h:729
void register_thread(int id, volatile bool *pflag)
Each thread calls this once before first use.
Definition worldmutex.h:718
A thread safe, fast but simple doubled-ended queue.
Definition dqueue.h:80
Base class for exceptions thrown in MADNESS.
Definition madness_exception.h:66
Definition worldmutex.h:109
void wait()
Definition worldmutex.cc:103
void reset()
Definition worldmutex.h:124
Mutex using pthread mutex operations.
Definition worldmutex.h:131
Lowest level task interface.
Definition thread.h:867
double submit_time_
Definition thread.h:874
void execute()
Definition thread.h:1087
virtual ~PoolTaskInterface()=default
Destructor.
static std::enable_if<!(detail::function_traits< fnobjT >::value||detail::memfunc_traits< fnobjT >::value)>::type make_id(std::pair< void *, unsigned short > &id, const fnobjT &)
Definition thread.h:931
profiling::TaskEvent * task_event_
Definition thread.h:873
PoolTaskInterface()
Default constructor.
Definition thread.h:1058
virtual void run(const TaskThreadEnv &info)=0
Override this method to implement a multi-threaded task.
PoolTaskInterface(const TaskAttributes &attr)
Definition thread.h:1065
std::pair< void *, unsigned short > id_
Definition thread.h:875
void set_event(profiling::TaskEvent *task_event)
Definition thread.h:881
void submit()
Collect info on the task and record the submit time.
Definition thread.h:886
static std::enable_if< detail::function_traits< fnT >::value||detail::memfunc_traits< fnT >::value >::type make_id(std::pair< void *, unsigned short > &id, fnT fn)
Definition thread.h:916
virtual void get_id(std::pair< void *, unsigned short > &id) const
Definition thread.h:942
void set_nthread(int nthread)
Call this to reset the number of threads before the task is submitted.
Definition thread.h:1078
A no-operation task used for various purposes.
Definition thread.h:1112
void run(const TaskThreadEnv &)
Execution function that does nothing.
Definition thread.h:1115
virtual ~PoolTaskNull()
Destructor.
Definition thread.h:1118
virtual void get_id(std::pair< void *, unsigned short > &id) const
Definition thread.h:1125
Contains attributes of a task.
Definition thread.h:323
TaskAttributes & set_highpriority(bool hipri)
Sets the high priority attribute.
Definition thread.h:392
TaskAttributes(const TaskAttributes &attr)
Copy constructor.
Definition thread.h:343
bool is_generator() const
Test if the generator attribute is true.
Definition thread.h:351
static TaskAttributes hipri()
Definition thread.h:450
TaskAttributes & set_generator(bool generator_hint)
Sets the generator attribute.
Definition thread.h:372
static const unsigned long GENERATOR
Mask for generator bit.
Definition thread.h:328
static const unsigned long STEALABLE
Mask for stealable bit.
Definition thread.h:329
bool is_stealable() const
Test if the stealable attribute is true.
Definition thread.h:358
TaskAttributes(unsigned long flags=0)
Sets the attributes to the desired values.
Definition thread.h:337
static const unsigned long NTHREAD
Mask for nthread byte.
Definition thread.h:327
void set_nthread(int nthread)
Set the number of threads.
Definition thread.h:414
static TaskAttributes generator()
Definition thread.h:442
bool is_high_priority() const
Test if the high priority attribute is true.
Definition thread.h:365
TaskAttributes & set_stealable(bool stealable)
Sets the stealable attribute.
Definition thread.h:383
static TaskAttributes multi_threaded(int nthread)
Definition thread.h:458
void serialize(Archive &ar)
Serializes the attributes for I/O.
Definition thread.h:434
unsigned long flags
Byte-string storing the specified attributes.
Definition thread.h:324
int get_nthread() const
Get the number of threads.
Definition thread.h:422
static const unsigned long HIGHPRIORITY
Mask for priority bit.
Definition thread.h:330
virtual ~TaskAttributes()
Definition thread.h:346
Used to pass information about the thread environment to a user's task.
Definition thread.h:466
int id() const
Get the ID of this thread.
Definition thread.h:506
TaskThreadEnv(int nthread, int id, Barrier *barrier)
Constructor collecting necessary environmental information.
Definition thread.h:478
TaskThreadEnv(int nthread, int id)
Constructor collecting necessary environmental information.
Definition thread.h:491
bool barrier() const
Definition thread.h:514
Barrier * _barrier
Pointer to the shared barrier, null if there is only a single thread.
Definition thread.h:469
const int _nthread
Number of threads collaborating on task.
Definition thread.h:467
int nthread() const
Get the number of threads collaborating on this task.
Definition thread.h:499
const int _id
ID of this thread (0,...,nthread-1).
Definition thread.h:468
Simplified thread wrapper to hide pthread complexity.
Definition thread.h:164
virtual void run()=0
Function to be executed by the thread.
int pool_num
Stores index of thread in pool or -1.
Definition thread.h:176
void set_pool_thread_index(int i)
Sets the index of this thread within the pool.
Definition thread.h:195
static void exit()
A thread can call this to terminate its execution.
Definition thread.h:226
ThreadBase()
Default constructor.
Definition thread.h:213
const pthread_t & get_id() const
Get the pthread id of this thread (if running).
Definition thread.h:231
static pthread_key_t thread_key
Thread id key.
Definition thread.h:167
static ThreadBase * this_thread()
Definition thread.h:257
int cancel() const
Cancel this thread.
Definition thread.h:243
pthread_t id
Definition thread.h:177
static void delete_thread_key()
Definition thread.h:187
static int num_hw_processors()
Get number of actual hardware processors.
Definition thread.cc:174
void start()
Start the thread running.
Definition thread.cc:158
virtual ~ThreadBase()
Definition thread.h:215
static void init_thread_key()
Definition thread.h:180
int get_pool_thread_index() const
Get index of this thread in ThreadPool.
Definition thread.h:238
Definition thread.h:97
bool do_bind
Definition thread.h:101
void bind()
Definition thread.h:137
ThreadBinder(bool print=false)
Definition thread.h:108
size_t cpus[maxncpu]
Definition thread.h:102
std::atomic< size_t > nextcpu
Definition thread.h:103
const size_t * get_cpus() const
Definition thread.h:133
bool print
Definition thread.h:99
static const size_t maxncpu
Definition thread.h:98
static thread_local bool bound
Definition thread.h:104
void set_do_bind(bool value)
Definition thread.h:131
size_t ncpu
Definition thread.h:100
const size_t get_ncpu() const
Definition thread.h:135
ThreadPool thread object.
Definition thread.h:1134
ThreadPoolThread()
Definition thread.h:1142
profiling::TaskProfiler profiler_
Definition thread.h:1138
profiling::TaskProfiler & profiler()
Task profiler accessor.
Definition thread.h:1150
virtual ~ThreadPoolThread()=default
A singleton pool of threads for dynamic execution of tasks.
Definition thread.h:1160
static void add(PoolTaskInterface *task)
Add a new task to the pool.
Definition thread.h:1333
int nthreads
Number of threads.
Definition thread.h:1180
static std::unique_ptr< tbb::task_arena > tbb_arena
Definition thread.h:1317
static bool run_task()
An otherwise idle thread can call this to run a task.
Definition thread.h:1395
static void add(const std::vector< PoolTaskInterface * > &tasks)
Add a vector of tasks to the pool.
Definition thread.h:1381
ThreadPool(ThreadPool &&)=delete
void operator=(ThreadPool &&)=delete
volatile bool finish
Set to true when time to stop.
Definition thread.h:1181
ThreadPool(const ThreadPool &)=delete
static void set_wait_policy(WaitPolicy policy, int sleep_duration_in_microseconds=0)
Definition thread.h:1511
static int default_nthread()
Get the number of threads from the environment.
Definition thread.cc:325
static const DQStats & get_stats()
Returns queue statistics.
Definition thread.cc:466
static void end()
Definition thread.cc:437
AtomicInt nfinished
Thread pool exit counter.
Definition thread.h:1182
void operator=(const ThreadPool &)=delete
static ThreadPool * instance_ptr
Singleton pointer.
Definition thread.h:1185
ThreadPoolThread * threads
Array of threads.
Definition thread.h:1177
void thread_main(ThreadPoolThread *const thread)
Definition thread.cc:349
~ThreadPool()
Destructor.
Definition thread.h:1499
void scan(opT &op)
Definition thread.h:1374
void flush_prebuf()
Definition thread.h:1303
static void begin(int nthread=-1)
Please invoke while in a single-threaded environment.
Definition thread.cc:379
static ThreadPool * instance()
Return a pointer to the only instance, constructing as necessary.
Definition thread.h:1292
static double await_timeout
Waiter timeout.
Definition thread.h:1187
DQueue< PoolTaskInterface * > queue
Queue of tasks.
Definition thread.h:1179
bool run_tasks(bool wait, ThreadPoolThread *const this_thread)
Definition thread.h:1233
static std::size_t queue_size()
Returns the number of tasks in the queue.
Definition thread.h:1420
ThreadPoolThread main_thread
Placeholder for main thread tls.
Definition thread.h:1178
static const int nmax
Number of tasks a worker thread will pop from the task queue.
Definition thread.h:1186
bool run_task(bool wait, ThreadPoolThread *this_thread)
Run the next task.
Definition thread.h:1204
static void * pool_thread_main(void *v)
Forwards the thread to bound member function.
Definition thread.cc:374
static void await(const Probe &probe, bool dowork=true, bool sleep=false)
Gracefully wait for a condition to become true, executing any tasks in the queue.
Definition thread.h:1444
static std::unique_ptr< tbb::global_control > tbb_control
Definition thread.h:1316
static const ThreadPoolThread * get_threads()
Definition thread.h:1431
static std::size_t size()
Returns the number of threads in the pool.
Definition thread.h:1413
Simplified thread wrapper to hide pthread complexity.
Definition thread.h:271
void start(void *(*f)(void *), void *args=nullptr)
Start the thread by running f(args).
Definition thread.h:299
virtual ~Thread()=default
Thread(void *(*f)(void *), void *args=nullptr)
Create a thread and start it running f(args).
Definition thread.h:290
Thread()
Default constructor.
Definition thread.h:284
void run()
Invokes the function for this thread.
Definition thread.h:276
void * args
The arguments passed to this thread for execution.
Definition thread.h:273
void *(* f)(void *)
The function called for executing this thread.
Definition thread.h:272
Multi-threaded queue to manage and run tasks.
Definition world_task_queue.h:319
Task event list base class.
Definition thread.h:686
virtual ~TaskEventListBase()=default
Virtual destructor.
TaskEventListBase * next_
The next task event in the list.
Definition thread.h:688
friend std::ostream & operator<<(std::ostream &os, const TaskEventListBase &tel)
Output a task event list to an output stream.
Definition thread.h:723
TaskEventListBase & operator=(const TaskEventListBase &)=delete
TaskEventListBase * next() const
Get the next event list in the linked list.
Definition thread.h:705
virtual std::ostream & print_events(std::ostream &) const =0
Print the events.
TaskEventListBase()
Default constructor.
Definition thread.h:696
void insert(TaskEventListBase *list)
Insert list after this list.
Definition thread.h:712
TaskEventListBase(const TaskEventListBase &)=delete
A list of task events.
Definition thread.h:737
TaskEvent * event()
Get a new event from this list.
Definition thread.h:765
TaskEventList & operator=(const TaskEventList &)=delete
TaskEventList(const unsigned int nmax)
Default constructor.
Definition thread.h:752
virtual std::ostream & print_events(std::ostream &os) const
Print events recorded in this list.
Definition thread.h:775
unsigned int n_
The number of events recorded.
Definition thread.h:739
TaskEventList(const TaskEventList &)=delete
virtual ~TaskEventList()=default
Virtual destructor.
std::unique_ptr< TaskEvent[]> events_
The event array.
Definition thread.h:740
Task event class.
Definition thread.h:533
std::string get_name() const
Get name of the function pointer.
Definition thread.h:571
friend std::ostream & operator<<(std::ostream &os, const TaskEvent &te)
Output the task data using a tab-separated list.
Definition thread.h:645
void start(const std::pair< void *, unsigned short > &id, const unsigned short threads, const double submit_time)
Record the start time of the task and collect task information.
Definition thread.h:618
static void print_demangled(std::ostream &os, const char *symbol)
Print demangled symbol name.
Definition thread.h:547
unsigned short threads_
Number of threads used by the task.
Definition thread.h:537
void stop()
Record the stop time of the task.
Definition thread.h:628
double times_[3]
Task trace times: { submit, start, stop }.
Definition thread.h:535
std::pair< void *, unsigned short > id_
Task identification information.
Definition thread.h:536
This class collects and prints task profiling data.
Definition thread.h:789
TaskEventListBase * head_
The head of the linked list of data.
Definition thread.h:791
TaskProfiler()
Default constructor.
Definition thread.h:809
void write_to_file()
Write the profile data to file.
Definition thread.cc:213
~TaskProfiler()
Destructor.
Definition thread.h:814
TaskEventList * new_list(const std::size_t nmax)
Create a new task event list.
Definition thread.h:829
TaskProfiler & operator=(const TaskProfiler &)=delete
TaskEventListBase * tail_
The tail of the linked list of data.
Definition thread.h:792
TaskProfiler(const TaskProfiler &)=delete
static const char * output_file_name_
The output file name.
Definition thread.h:805
static Mutex output_mutex_
Mutex used to lock the output file.
Definition thread.h:794
const std::size_t bufsize
Definition derivatives.cc:16
real_function_3d mask
Definition dirac-hatom.cc:27
Implements DQueue.
auto T(World &world, response_space &f) -> response_space
Definition global_functions.cc:34
void threadpool_wait_policy(WaitPolicy policy, int sleep_duration_in_microseconds=0)
Definition thread.h:1532
static const double v
Definition hatom_sf_dirac.cc:20
Tensor< double > op(const Tensor< double > &x)
Definition kain.cc:508
#define MADNESS_CHECK(condition)
Check a condition — even in a release build the condition is always evaluated so it can have side eff...
Definition madness_exception.h:182
#define MADNESS_EXCEPTION(msg, value)
Macro for throwing a MADNESS exception.
Definition madness_exception.h:119
#define MADNESS_ASSERT(condition)
Assert a condition that should be free of side-effects since in release builds this might be a no-op.
Definition madness_exception.h:134
Namespace for all elements and tools of MADNESS.
Definition DFParameters.h:10
WaitPolicy
wait policies supported by ConditionVariable/DQueue/ThreadPool
Definition worldmutex.h:492
static double cpu_time()
Returns the cpu time in seconds relative to an arbitrary origin.
Definition timers.h:127
ThreadBinder binder
Definition thread.cc:71
static void myusleep(unsigned int us)
Sleep or spin for specified number of microseconds.
Definition timers.h:164
void error(const char *msg)
Definition world.cc:139
double wall_time()
Returns the wall time in seconds relative to an arbitrary origin.
Definition timers.cc:48
std::string type(const PairType &n)
Definition PNOParameters.h:18
std::string name(const FuncType &type, const int ex=-1)
Definition ccpairfunction.h:28
Definition dqueue.h:59
Function traits in the spirit of boost function traits.
Definition function_traits.h:13
Member function traits in the spirit of boost function traits.
Definition function_traits.h:21
static double current_time
Definition tdse1d.cc:160
int task(int i)
Definition test_runtime.cpp:4
int main()
Definition testmolbas.cc:37
const char * status[2]
Definition testperiodic.cc:43
Implements thread introspection.
Object that is used to convert function and member function pointers into void*.
Definition thread.h:899