MADNESS 0.10.1
macrotaskq.h
Go to the documentation of this file.
1/**
2 \file macrotaskq.h
3 \brief Declares the \c macrotaskq and MacroTaskBase classes
4 \ingroup mra
5
6 A MacroTaskq executes tasks on World objects, e.g. differentiation of a function or other
7 arithmetic. Complex algorithms can be implemented.
8
9 The universe world is split into subworlds, each of them executing macrotasks of the task queue.
10 This improves locality and yields speedups for large numbers of compute nodes by reducing
11 communication within worlds.
12
13 The user defines a macrotask (an example is found in test_vectormacrotask.cc), the tasks are
14 lightweight and carry only bookkeeping information, actual input and output are stored in a
15 cloud (see cloud.h)
16
17 The user-defined macrotask is derived from MacroTaskIntermediate and must implement the run()
18 method. A heterogeneous task queue is possible.
19
20 The result of a macrotask is an object that lives in the universe and that is accessible from all
21 subworlds. The result is accumulated in the universe and must therefore be a WorldObject. Currently we
22 have implemented
23 - Function<T,NDIM>
24 - std::vector<Function<T,NDIM>> (a vector of Function<T,NDIM>)
25 - ScalarResultImpl<T> (a scalar value)
26 - std::vector<std::shared_ptr<ScalarResultImpl<T>>> (a vector of scalar values), shared_ptr for technical reasons
27
28 - std::tuple<std::vector<XXX>, std::vector<YYY>> (a tuple of n vectors of WorldObjects: XXX, YYY, .. = {Function, ScalarResultImpl, ...})
29
30
31 TODO: priority q
32 TODO: task submission from inside task (serialize task instead of replicate)
33 TODO: update documentation
34 TODO: consider serializing task member variables
35
36*/
37
38
39
40#ifndef SRC_MADNESS_MRA_MACROTASKQ_H_
41#define SRC_MADNESS_MRA_MACROTASKQ_H_
42
43#include <madness/world/cloud.h>
44#include <madness/world/world.h>
47
48namespace madness {
49
50/// helper class for returning the result of a task, which is not a madness Function, but a simple scalar
51
52/// the result value is accumulated via gaxpy on universe rank 0; after completion of the taskq the final
53/// value can be obtained via get(), which includes a broadcast of the final value to all processes
54template<typename T>
55class ScalarResultImpl : public WorldObject<ScalarResultImpl<T>> {
56public:
57 typedef T value_type;
61
62 /// Disable the default copy constructor
63 ScalarResultImpl(const ScalarResultImpl& other) = delete;
64 // ScalarResultImpl<T>(ScalarResultImpl<T>&& ) = default;
65
66 /// disable assignment operator
68
70 // print("calling destructor of ScalarResultImpl",this->id());
71 // std::cout << std::flush;
72 }
73
74 /// simple assignment of the scalar value
76 value = x;
77 return *this;
78 }
79
81 gaxpy(1.0, x, 1.0,true);
82 return *this;
83 }
84
85 /// accumulate, optional fence
86 void gaxpy(const double a, const T& right, double b, const bool fence=true) {
87 if (this->get_world().rank()==0) {
88 value =a*value + b * right;
89 }
90 else this->send(0, &ScalarResultImpl<T>::gaxpy, a, right, b, fence);
91 }
92
93 template<typename Archive>
94 void serialize(Archive &ar) {
95 ar & value;
96 }
97
98 /// after completion of the taskq get the final value
99 T get() {
100 this->get_world().gop.broadcast_serializable(*this, 0);
101 return value;
102 }
103
104 /// get the local value of this rank, which might differ for different ranks
105 /// for the final value use get()
106 T get_local() const {
107 return value;
108 }
109
110private:
111 /// the scalar value
113};
114
115template<typename T=double>
117public:
119 std::shared_ptr<implT> impl;
120
121 ScalarResult() = default;
122 ScalarResult(World &world) : impl(new implT(world)) {}
123 ScalarResult(const std::shared_ptr<implT>& impl) : impl(impl) {}
125 *(this->impl) = x;
126 return *this;
127 }
128
129 std::shared_ptr<implT> get_impl() const {
130 return impl;
131 }
132
133 void set_impl(const std::shared_ptr<implT>& newimpl) {
134 impl=newimpl;
135 }
136
137 uniqueidT id() const {
138 return impl->id();
139 }
140
141 /// accumulate, optional fence
142 void gaxpy(const double a, const T& right, double b, const bool fence=true) {
143 impl->gaxpy(a,right,b,fence);
144 }
145
146 template<typename Archive>
147 void serialize(Archive &ar) {
148 ar & impl;
149 }
150
151 /// after completion of the taskq get the final value
152 T get() {
153 return impl->get();
154 }
155
156 /// after completion of the taskq get the final value
157 T get_local() const {
158 return impl->get_local();
159 }
160
161};
162
163/// helper function to create a vector of ScalarResultImpl, circumventing problems with the constructors
164template<typename T>
165std::vector<ScalarResult<T>> scalar_result_vector(World& world, std::size_t n) {
166 std::vector<ScalarResult<T>> v;
167 for (std::size_t i=0; i<n; ++i) v.emplace_back(ScalarResult<T>(world));
168 return v;
169}
170
171
172// type traits to check if a template parameter is a WorldContainer
173template<typename>
174struct is_scalar_result_ptr : std::false_type {};
175
176template <typename T>
177struct is_scalar_result_ptr<std::shared_ptr<madness::ScalarResultImpl<T>>> : std::true_type {};
178
179template<typename>
180struct is_scalar_result_ptr_vector : std::false_type {
181};
182
183template<typename T>
184struct is_scalar_result_ptr_vector<std::vector<std::shared_ptr<typename madness::ScalarResultImpl<T>>>> : std::true_type {
185};
186
187// type traits to check if a template parameter is a WorldContainer
188template<typename>
189struct is_scalar_result : std::false_type {};
190
191template <typename T>
192struct is_scalar_result<madness::ScalarResult<T>> : std::true_type {};
193
194template<typename>
195struct is_scalar_result_impl : std::false_type {};
196
197template <typename T>
198struct is_scalar_result<madness::ScalarResultImpl<T>> : std::true_type {};
199
200template<typename>
201struct is_scalar_result_vector : std::false_type {
202};
203
204template<typename T>
205struct is_scalar_result_vector<std::vector<typename madness::ScalarResult<T>>> : std::true_type {
206};
207
208/// check if type is a valid task result: it must be a WorldObject and must implement gaxpy
209template <typename T>
210inline constexpr bool is_valid_task_result_v =
211 is_madness_function<T>::value // Function<T,NDIM>
212 || is_madness_function_vector<T>::value // std::vector<Function<T,NDIM>>
213 || is_scalar_result<T>::value // ScalarResultImpl<T>
214 || is_scalar_result_vector<T>::value // std::vector<std::shared_ptr<ScalarResultImpl<T>>>
215 || is_scalar_result_ptr<T>::value // ScalarResultImpl<T>
216 || is_scalar_result_ptr_vector<T>::value; // std::vector<std::shared_ptr<ScalarResultImpl<T>>>
217
218
/// Type trait: true for any std::tuple<...> instantiation, false otherwise.
template<class>
struct is_tuple : std::false_type {};

template<class... Elements>
struct is_tuple<std::tuple<Elements...>> : std::true_type {};
221
222/// given a tuple check recursively if all elements are valid task results
223template<typename tupleT, std::size_t I>
225
226 typedef decay_tuple <tupleT> argtupleT; // removes const, &, etc
227
228 if constexpr(I >= std::tuple_size_v<tupleT>) {
229 // Last case, if nothing is left to iterate, then exit the function
230 return true;
231 } else {
232 using typeT = typename std::tuple_element<I, argtupleT>::type;// use decay types for determining a vector
233 if constexpr (not is_valid_task_result_v<typeT>) {
234 return false;
235 } else {
236 // Going for next element.
237 return check_tuple_is_valid_task_result<tupleT,I+1>();
238 }
239 }
240}
241
242
243
244/// the result type of a macrotask must implement gaxpy
245template<typename T>
246void gaxpy(const double a, ScalarResult<T>& left, const double b, const T& right, const bool fence=true) {
247 left.gaxpy(a, right, b, fence);
248}
249
250template <class Archive, typename T>
251struct madness::archive::ArchiveStoreImpl<Archive, std::shared_ptr<ScalarResultImpl<T>>> {
252 static void store(const Archive& ar, const std::shared_ptr<ScalarResultImpl<T>>& ptr) {
253 bool exists=(ptr) ? true : false;
254 ar & exists;
255 if (exists) ar & ptr->id();
256 }
257};
258
259
260template <class Archive, typename T>
261struct madness::archive::ArchiveLoadImpl<Archive, std::shared_ptr<ScalarResultImpl<T>>> {
262 static void load(const Archive& ar, std::shared_ptr<ScalarResultImpl<T>>& ptr) {
263 bool exists=false;
264 ar & exists;
265 if (exists) {
266 uniqueidT id;
267 ar & id;
268 World* world = World::world_from_id(id.get_world_id());
269 MADNESS_ASSERT(world);
270 auto ptr_opt = (world->ptr_from_id< ScalarResultImpl<T> >(id));
271 if (!ptr_opt)
272 MADNESS_EXCEPTION("ScalarResultImpl: remote operation attempting to use a locally uninitialized object",0);
273 ptr.reset(ptr_opt.value(), [] (ScalarResultImpl<T> *p_) -> void {}); // disable destruction
274 if (!ptr)
275 MADNESS_EXCEPTION("ScalarResultImpl<T> operation attempting to use an unregistered object",0);
276 } else {
277 ptr=nullptr;
278 }
279 }
280};
281
284 StoreFunction, ///< store a madness function in the cloud -- can have a large memory impact
285 StorePointerToFunction, ///< store the pointer to the function in the cloud, the actual function lives in the universe and
286 ///< its coefficients can be copied to the subworlds (e.g. by macrotaskq) when needed.
287 ///< The task itself is responsible for handling data movement
288 StoreFunctionViaPointer ///< store a pointer to the function in the cloud, but macrotaskq will move the
289 ///< coefficients to the subworlds when the task is started. This is the default policy.
290 };
291 /// given the MacroTask's storage policy return the corresponding Cloud storage policy
300
304
308
309 /// declaration here, definition in mra1.cc
311
312};
313
314
315
316template<typename T=double>
317std::ostream& operator<<(std::ostream& os, const typename MacroTaskInfo::StoragePolicy sp) {
318 if (sp==MacroTaskInfo::StoreFunction) os << "StoreFunction";
319 if (sp==MacroTaskInfo::StorePointerToFunction) os << "StorePointerToFunction";
320 if (sp==MacroTaskInfo::StoreFunctionViaPointer) os << "StoreFunctionViaPointer";
321 return os;
322}
323
324/// base class
326public:
327
328 typedef std::vector<std::shared_ptr<MacroTaskBase> > taskqT;
329
331 virtual ~MacroTaskBase() {};
332
333 double priority=1.0;
335
339
340 bool is_complete() const {return stat==Complete;}
341 bool is_running() const {return stat==Running;}
342 bool is_waiting() const {return stat==Waiting;}
343
344 virtual void run(World& world, Cloud& cloud, taskqT& taskq, const long element,
345 const bool debug, const MacroTaskInfo::StoragePolicy storage_policy) = 0;
346 virtual void cleanup() = 0; // clear static data (presumably persistent input data)
347
348 virtual void print_me(std::string s="") const {
349 printf("this is task with priority %4.1f\n",priority);
350 }
351 virtual void print_me_as_table(std::string s="") const {
352 print("nothing to print");
353 }
355 std::stringstream ss;
356 ss << std::setw(5) << this->get_priority() << " " <<this->stat;
357 return ss.str();
358 }
359
360 double get_priority() const {return priority;}
361 void set_priority(const double p) {priority=p;}
362
363 friend std::ostream& operator<<(std::ostream& os, const MacroTaskBase::Status s) {
364 if (s==MacroTaskBase::Status::Running) os << "Running";
365 if (s==MacroTaskBase::Status::Waiting) os << "Waiting";
366 if (s==MacroTaskBase::Status::Complete) os << "Complete";
367 if (s==MacroTaskBase::Status::Unknown) os << "Unknown";
368 return os;
369 }
370};
371
372
373template<typename macrotaskT>
375
376public:
377
379
381
382 void cleanup() {};
383};
384
385
386
387class MacroTaskQ : public WorldObject< MacroTaskQ> {
388
390 std::shared_ptr<World> subworld_ptr;
392 std::mutex taskq_mutex;
394 long nsubworld=1;
395
396 /// storage policy for the taskq can be set only once at construction
398
399 /// set the process map for the subworld
400 std::shared_ptr< WorldDCPmapInterface< Key<1> > > pmap1;
401 std::shared_ptr< WorldDCPmapInterface< Key<2> > > pmap2;
402 std::shared_ptr< WorldDCPmapInterface< Key<3> > > pmap3;
403 std::shared_ptr< WorldDCPmapInterface< Key<4> > > pmap4;
404 std::shared_ptr< WorldDCPmapInterface< Key<5> > > pmap5;
405 std::shared_ptr< WorldDCPmapInterface< Key<6> > > pmap6;
406
407 bool printdebug() const {return printlevel>=10;}
408 bool printprogress() const {return (printlevel>=4) and (not (printdebug()));}
409 bool printtimings() const {return universe.rank()==0 and printlevel>=3;}
410 bool printtimings_detail() const {return universe.rank()==0 and printlevel>=3;}
411
412public:
413
416 long get_nsubworld() const {return nsubworld;}
417 void set_printlevel(const long p) {printlevel=p;}
418
422
423 /// create an empty taskq and initialize the subworlds
427 , taskq()
429 , nsubworld(nworld)
430 , storage_policy(sp)
431 , cloud(universe)
432 {
433
435 this->process_pending();
436 }
437
439
440 /// for each process create a world using a communicator shared with other processes by round-robin
441 /// copy-paste from test_world.cc
442 static std::shared_ptr<World> create_worlds(World& universe, const std::size_t nsubworld) {
443
444 int color = universe.rank() % nsubworld;
446
447 std::shared_ptr<World> all_worlds;
448 all_worlds.reset(new World(comm));
449
451 return all_worlds;
452 }
453
454 /// run all tasks
455 void run_all() {
456
457 if (printdebug()) print_taskq();
458 if (printtimings_detail()) {
459 if (universe.rank()==0) {
460 print("number of tasks in taskq",taskq.size());
461 print("redirecting output to files task.#####");
462 }
463 }
464
465 auto replication_policy = cloud.get_replication_policy();
466 if (replication_policy!=Cloud::Distributed) {
467 double cpu0=cpu_time();
468 if (replication_policy==Cloud::RankReplicated) cloud.replicate();
469 if (replication_policy==Cloud::NodeReplicated) cloud.replicate_per_node(); // replicate to all hosts
471 double cpu1=cpu_time();
472 if (printtimings_detail()) print("cloud replication wall time",cpu1-cpu0);
473 }
475 universe.gop.set_forbid_fence(true); // make sure there are no hidden universe fences
483
484 double cpu00=cpu_time();
485
486 World& subworld=get_subworld();
487// if (printdebug()) print("I am subworld",subworld.id());
488 double tasktime=0.0;
489 if (printprogress() and universe.rank()==0) std::cout << "progress in percent: " << std::flush;
490 while (true) {
491 long element=get_scheduled_task_number(subworld);
492 double cpu0=cpu_time();
493 if (element<0) break;
494 std::shared_ptr<MacroTaskBase> task=taskq[element];
495 if (printdebug()) print("starting task no",element, "in subworld",subworld.id(),"at time",wall_time());
496
497 task->run(subworld,cloud, taskq, element, printdebug(), storage_policy);
498
499 double cpu1=cpu_time();
500 set_complete(element);
501 tasktime+=(cpu1-cpu0);
502 if (printdebug()) printf("completed task %3ld after %6.1fs at time %6.1fs\n",element,cpu1-cpu0,wall_time());
503
504 // print progress
505 const std::size_t ntask=taskq.size();
506 // return percentile of ntask for element
507 auto in_percentile = [&ntask](const long element) {
508 return std::floor(element/(0.1*(ntask+1)));
509 };
510 auto is_first_in_percentile = [&](const long element) {
511 return (in_percentile(element)!=in_percentile(element-1));
512 };
513 if (printprogress() and is_first_in_percentile(element)) {
514 std::cout << int(in_percentile(element)*10) << " " << std::flush;
515 }
516 }
519 universe.gop.sum(tasktime);
520 if (printprogress() and universe.rank()==0) std::cout << std::endl;
521 double cpu11=cpu_time();
522 if (printlevel>=4) {
524 if (universe.rank()==0) print("all tasks complete");
526 }
527 if (printtimings_detail()) {
528 printf("completed taskqueue after %4.1fs at time %4.1fs\n", cpu11 - cpu00, wall_time());
529 printf(" total cpu time / per world %4.1fs %4.1fs\n", tasktime, tasktime / universe.size());
530 }
531
532 // cleanup task-persistent input data
533 for (auto& task : taskq) task->cleanup();
534 cloud.clear_cache(subworld);
535 cloud.clear();
536 subworld.gop.fence();
537 subworld.gop.fence();
546 }
547
549 for (const auto& t : vtask) {
550 if (universe.rank()==0) t->set_waiting();
552 }
553 }
554
555 void print_taskq() const {
557 if (universe.rank()==0) {
558 print("\ntaskq on universe rank",universe.rank());
559 print("total number of tasks: ",taskq.size());
560 print(" task batch priority status");
561 for (const auto& t : taskq) t->print_me_as_table();
562 }
564 }
565
566private:
567 void add_replicated_task(const std::shared_ptr<MacroTaskBase>& task) {
568 taskq.push_back(task);
569 }
570
571 /// scheduler is located on universe.rank==0
573 long number=0;
574 if (subworld.rank()==0) {
576 number=r.get();
577 }
578 subworld.gop.broadcast_serializable(number, 0);
579 subworld.gop.fence();
580 return number;
581
582 }
583
586 std::lock_guard<std::mutex> lock(taskq_mutex);
587
588 auto is_Waiting = [](const std::shared_ptr<MacroTaskBase>& mtb_ptr) {return mtb_ptr->is_waiting();};
589 auto it=std::find_if(taskq.begin(),taskq.end(),is_Waiting);
590 if (it!=taskq.end()) {
591 it->get()->set_running();
592 long element=it-taskq.begin();
593 return element;
594 }
595// print("could not find task to schedule");
596 return -1;
597 }
598
599 /// scheduler is located on rank==0
600 void set_complete(const long task_number) const {
601 this->task(ProcessID(0), &MacroTaskQ::set_complete_local, task_number);
602 }
603
604 /// scheduler is located on rank==0
605 void set_complete_local(const long task_number) const {
607 taskq[task_number]->set_complete();
608 }
609
610public:
619private:
620 std::size_t size() const {
621 return taskq.size();
622 }
623
624};
625
626
627template<typename taskT>
630
/// Type trait: true for any std::vector<Q>, false otherwise.
template<typename Q>
struct is_vector : std::false_type {};

template<typename ElementT>
struct is_vector<std::vector<ElementT>> : std::true_type {};
637
638 typedef typename taskT::resultT resultT;
639 typedef typename taskT::argtupleT argtupleT;
641 taskT task;
642 bool debug=false;
644
645
646public:
647
648 /// constructor takes the task, but no arguments to the task
649 explicit MacroTask(World &world, taskT &task)
650 : MacroTask(world,task, std::make_shared<MacroTaskQ>(world, world.size(), MacroTaskInfo::get_default())) {
652 }
653
654 /// constructor takes task and the storage policy, but no arguments to the task
655 explicit MacroTask(World &world, taskT& task, MacroTaskInfo::StoragePolicy storage_policy)
656 : MacroTask(world,task, std::make_shared<MacroTaskQ>(world, world.size(), storage_policy)) {
658 }
659
660 /// constructor takes the task,
661 explicit MacroTask(World &world, taskT &task, std::shared_ptr<MacroTaskQ> taskq_ptr)
663
664 // someone might pass in a taskq nullptr
665 immediate_execution=false; // will be reset by the forwarding constructors
666 if (this->taskq_ptr==0) {
667 this->taskq_ptr=std::make_shared<MacroTaskQ>(world, world.size(), MacroTaskInfo::get_default());
669 }
670
671 if (debug) this->taskq_ptr->set_printlevel(20);
672 this->taskq_ptr->cloud.set_storing_policy(MacroTaskInfo::to_cloud_storage_policy(this->taskq_ptr->get_storage_policy())); // how to store madness functions: deep or shallow
673
674 }
675
676 MacroTask& set_debug(const bool value) {
677 debug=value;
678 return *this;
679 }
680
681 /// this mimics the original call to the task functor, called from the universe
682
683 /// store all input to the cloud, create output Function<T,NDIM> in the universe,
684 /// create the batched task and shove it into the taskq. Possibly execute the taskq.
685 template<typename ... Ts>
686 resultT operator()(const Ts &... args) {
687
688 auto argtuple = std::tie(args...);
689 static_assert(std::is_same<decltype(argtuple), argtupleT>::value, "type or number of arguments incorrect");
690
691 // partition the argument vector into batches
692 auto partitioner=task.partitioner;
693 if (not partitioner) partitioner.reset(new MacroTaskPartitioner);
694 partitioner->set_nsubworld(world.size());
695 partitionT partition = partitioner->partition_tasks(argtuple);
696
697 if (debug and world.rank()==0) {
698 print("MacroTask storage policy: ",taskq_ptr->get_storage_policy());
699 print("Cloud storage policy: ",taskq_ptr->cloud.get_storing_policy());
700 }
701
702 recordlistT inputrecords = taskq_ptr->cloud.store(world, argtuple);
703 resultT result = task.allocator(world, argtuple);
704 auto outputrecords =prepare_output_records(taskq_ptr->cloud, result);
705
706 // create tasks and add them to the taskq
708 for (const auto& batch_prio : partition) {
709 vtask.push_back(
710 std::shared_ptr<MacroTaskBase>(new MacroTaskInternal(task, batch_prio, inputrecords, outputrecords)));
711 }
712 taskq_ptr->add_tasks(vtask);
713
714 if (immediate_execution) taskq_ptr->run_all();
715
716 return result;
717 }
718
719//private:
720
722 std::shared_ptr<MacroTaskQ> taskq_ptr;
723private:
724
725
726 /// store *pointers* to the result WorldObject in the cloud and return the recordlist
728 if constexpr (is_tuple<resultT>::value) {
729 static_assert(check_tuple_is_valid_task_result<resultT,0>(),
730 "tuple has invalid result type in prepare_output_records");
731 } else {
732 static_assert(is_valid_task_result_v<resultT>, "unknown result type in prepare_output_records");
733 }
734
735 if (debug) print("storing pointers to output in cloud");
736 // store an element of the tuple only
737 auto store_output_records = [&](const auto& result) {
738 recordlistT outputrecords;
739 typedef std::decay_t<decltype(result)> argT;
740 if constexpr (is_madness_function<argT>::value) {
741 outputrecords += cloud.store(world, result.get_impl().get()); // store pointer to FunctionImpl
742 } else if constexpr (is_madness_function_vector<argT>::value) {
743 outputrecords += cloud.store(world, get_impl(result));
744 } else if constexpr (is_scalar_result<argT>::value) {
745 outputrecords += cloud.store(world, result.get_impl()); // store pointer to ScalarResultImpl
746 } else if constexpr (is_vector<argT>::value) {
748 // argT = std::vector<ScalarResult<T>>
749 std::vector<std::shared_ptr<typename argT::value_type::implT>> v;
750 for (const auto& ptr : result) v.push_back(ptr.get_impl());
751 outputrecords+=cloud.store(world,v);
752 } else {
753 MADNESS_EXCEPTION("\n\n unknown vector result type in prepare_input ", 1);
754 }
755 } else {
756 MADNESS_EXCEPTION("should not be here",1);
757 }
758 return outputrecords;
759 };
760
761 recordlistT outputrecords;
762 if constexpr (is_tuple<resultT>::value) {
763 // loop over tuple elements -- args is the individual tuple element
764 std::apply([&](auto &&... args) {
765 (( outputrecords+=store_output_records(args) ), ...);
766 }, result);
767 } else {
768 outputrecords=store_output_records(result);
769 }
770 return outputrecords;
771 }
772
773
774 class MacroTaskInternal : public MacroTaskIntermediate<MacroTask> {
775
776 typedef decay_tuple<typename taskT::argtupleT> argtupleT; // removes const, &, etc
777 typedef typename taskT::resultT resultT;
780 public:
781 taskT task;
782 std::string get_name() const {
783 if (task.name=="unknown_task") return typeid(task).name();
784 return task.name;
785 }
786
787 MacroTaskInternal(const taskT &task, const std::pair<Batch,double> &batch_prio,
790 if constexpr (is_tuple<resultT>::value) {
791 static_assert(check_tuple_is_valid_task_result<resultT,0>(),
792 "tuple has invalid result type in prepare_output_records");
793 } else {
794 static_assert(is_valid_task_result_v<resultT>, "unknown result type in prepare_output_records");
795 }
796 this->task.batch=batch_prio.first;
797 this->priority=batch_prio.second;
798 }
799
800
801 void print_me(std::string s="") const override {
802 print("this is task",get_name(),"with batch", task.batch,"priority",this->get_priority());
803 }
804
805 void print_me_as_table(std::string s="") const override {
806 std::stringstream ss;
807 std::string name=get_name();
808 std::size_t namesize=std::min(std::size_t(28),name.size());
809 name += std::string(28-namesize,' ');
810
811 std::stringstream ssbatch;
812 ssbatch << task.batch;
813 std::string strbatch=ssbatch.str();
814 int nspaces=std::max(int(0),35-int(ssbatch.str().size()));
815 strbatch+=std::string(nspaces,' ');
816
817 ss << name
818 << std::setw(10) << strbatch
820 print(ss.str());
821 }
822
823 /// loop over the tuple elements of both tuples and execute the operation op on each element pair
824 template<typename tupleT, typename tupleR, typename opT, std::size_t I=0>
825 void binary_tuple_loop(tupleT& tuple1, tupleR& tuple2, opT& op) const {
826 if constexpr(I < std::tuple_size_v<tupleT>) {
827 auto& element1=std::get<I>(tuple1);
828 auto& element2=std::get<I>(tuple2);
829 op(element1,element2);
830 binary_tuple_loop<tupleT, tupleR, opT, I+1>(tuple1,tuple2,op);
831 }
832 }
833
834 template<typename tupleT, typename opT, std::size_t I=0>
835 void unary_tuple_loop(tupleT& tuple, opT& op) const {
836 if constexpr(I < std::tuple_size_v<tupleT>) {
837 auto& element1=std::get<I>(tuple);
838 op(element1);
839 unary_tuple_loop<tupleT,opT, I+1>(tuple,op);
840 }
841 }
842
843 /// accumulate the result of the task into the final result living in the universe
844 template<typename resultT1, std::size_t I=0>
845 typename std::enable_if<is_tuple<resultT1>::value, void>::type
846 accumulate_into_final_result(World &subworld, resultT1 &final_result, const resultT1 &tmp_result, const argtupleT& argtuple) {
847 if constexpr(I < std::tuple_size_v<resultT1>) {
848 using elementT = typename std::tuple_element<I, resultT>::type;// use decay types for determining a vector
849 auto element_final=std::get<I>(final_result);
850 auto element_tmp=std::get<I>(tmp_result);
851 accumulate_into_final_result<elementT>(subworld, element_final, element_tmp, argtuple);
852 accumulate_into_final_result<resultT1,I+1>(subworld, final_result, tmp_result, argtuple);
853 }
854 }
855
856 /// accumulate the result of the task into the final result living in the universe
857 template<typename resultT1>
858 typename std::enable_if<not is_tuple<resultT1>::value, void>::type
859 accumulate_into_final_result(World &subworld, resultT1 &result, const resultT1 &result_tmp, const argtupleT& argtuple) {
861 // gaxpy can be done in reconstructed or compressed mode
862 TreeState operating_state=result_tmp.get_impl()->get_tensor_type()==TT_FULL ? compressed : reconstructed;
863 result_tmp.change_tree_state(operating_state);
864 gaxpy(1.0,result,1.0, result_tmp);
866 TreeState operating_state=result_tmp[0].get_impl()->get_tensor_type()==TT_FULL ? compressed : reconstructed;
867 change_tree_state(result_tmp,operating_state);
868 // compress(subworld, result_tmp);
869 // resultT1 tmp1=task.allocator(subworld,argtuple);
870 // tmp1=task.batch.template insert_result_batch(tmp1,result_tmp);
871 gaxpy(1.0,result,1.0,result_tmp,false);
872 // was using operator+=, but this requires a fence, which is not allowed here..
873 // result += tmp1;
874 } else if constexpr (is_scalar_result<resultT1>::value) {
875 gaxpy(1.0, result, 1.0, result_tmp.get_local(), false);
876 } else if constexpr (is_scalar_result_vector<resultT1>::value) {
877 // resultT1 tmp1=task.allocator(subworld,argtuple);
878 // tmp1=task.batch.template insert_result_batch(tmp1,result_tmp);
879 std::size_t sz=result.size();
880 for (int i=0; i<sz; ++i) {
881 gaxpy(1.0, result[i], 1.0, result_tmp[i].get_local(), false);
882 }
883 }
884
885 }
886
887 /// called by the MacroTaskQ when the task is scheduled
888 void run(World &subworld, Cloud &cloud, MacroTaskBase::taskqT &taskq, const long element, const bool debug,
889 const MacroTaskInfo::StoragePolicy storage_policy) override {
890 io_redirect io(element,get_name()+"_task",debug);
891 const argtupleT argtuple = cloud.load<argtupleT>(subworld, inputrecords);
892 argtupleT batched_argtuple = task.batch.copy_input_batch(argtuple);
893
894 if (storage_policy==MacroTaskInfo::StoreFunctionViaPointer) {
895 double cpu0=wall_time();
896 // the functions loaded from the cloud are pointers to the universe functions,
897 // retrieve the function coefficients from the universe
898 // aka: turn the current shallow copy into a deep copy
899 if (debug) print("loading function coefficients from universe for task",get_name());
900
901 // loop over the tuple -- copy the functions from the universe to the subworld
902 auto copi = [&](auto& arg) {
903 typedef std::decay_t<decltype(arg)> argT;
904 if constexpr (is_madness_function<argT>::value) {
905 arg=copy(subworld, arg);
906 } else if constexpr (is_madness_function_vector<argT>::value) {
907 for (auto& f : arg) f=copy(subworld,f);
908 }
909 };
910
911 unary_tuple_loop(batched_argtuple,copi);
912 double cpu1=wall_time();
913 if (debug) {
915 print("copied coefficients for task",get_name(),"in",cpu1-cpu0,"seconds");
916 }
917 }
918
919 try {
920 print("starting task no",element, ", '",get_name(),"', in subworld",subworld.id(),"at time",wall_time());
921 double cpu0=cpu_time();
922 resultT result_batch = std::apply(task, batched_argtuple); // lives in the subworld, is a batch of the full vector (if applicable)
923 double cpu1=cpu_time();
924 constexpr std::size_t bufsize=256;
925 char buffer[bufsize];
926 std::snprintf(buffer,bufsize,"completed task %3ld after %6.1fs at time %6.1fs\n",element,cpu1-cpu0,wall_time());
927 print(std::string(buffer));
928
929 // move the result from the batch to the final result, all still in subworld
930 auto insert_batch = [&](auto& element1, auto& element2) {
931 typedef std::decay_t<decltype(element1)> decay_type;;
932 if constexpr (is_vector<decay_type>::value) {
933 element1=task.batch.insert_result_batch(element1,element2);
934 } else {
935 std::swap(element1,element2);
936 }
937 };
938 resultT result_subworld=task.allocator(subworld,argtuple);
939 if constexpr (is_tuple<resultT>::value) {
940 binary_tuple_loop(result_subworld, result_batch, insert_batch);
941 } else {
942 insert_batch(result_subworld,result_batch);
943 }
944
945 // accumulate the subworld-local results into the final, universe result
946 resultT result_universe=get_output(subworld, cloud); // lives in the universe
947
948 accumulate_into_final_result<resultT>(subworld, result_universe, result_subworld, argtuple);
949
950 } catch (std::exception& e) {
951 print("failing task no",element,"in subworld",subworld.id(),"at time",wall_time());
952 print(e.what());
954 print("\n\n");
955 MADNESS_EXCEPTION("failing task",1);
956 }
957
958 };
959
960 // this is called after all tasks have been executed and the taskq has ended
961 void cleanup() override {
962 // resultT result_universe=get_output(subworld, cloud); // lives in the universe
963
964 }
965
966 template<typename T, std::size_t NDIM>
967 static Function<T,NDIM> pointer2WorldObject(const std::shared_ptr<FunctionImpl<T,NDIM>> impl) {
968 Function<T,NDIM> result;
969 result.set_impl(impl);
970 return result;
971 }
972
973 template<typename T, std::size_t NDIM>
974 static std::vector<Function<T,NDIM>> pointer2WorldObject(const std::vector<std::shared_ptr<FunctionImpl<T,NDIM>>> v_impl) {
975 std::vector<Function<T,NDIM>> vresult;
976 vresult.resize(v_impl.size());
977 set_impl(vresult,v_impl);
978 return vresult;
979 }
980
981 template<typename T>
982 static ScalarResult<T> pointer2WorldObject(const std::shared_ptr<ScalarResultImpl<T>> sr_impl) {
983 return ScalarResult(sr_impl);
984 }
985
986 template<typename T>
987 static std::vector<ScalarResult<T>> pointer2WorldObject(const std::vector<std::shared_ptr<ScalarResultImpl<T>>> v_sr_impl) {
988 std::vector<ScalarResult<T>> vresult(v_sr_impl.size());
989 for (auto i=0; i<v_sr_impl.size(); ++i) {
990 vresult[i].set_impl(v_sr_impl[i]);
991 }
992 return vresult;
993 }
994
995 /// return the WorldObjects or the result functions living in the universe
996
997 /// read the pointers to the universe WorldObjects from the cloud,
998 /// convert them to actual WorldObjects and return them
999 resultT get_output(World &subworld, Cloud &cloud) const {
1000 resultT result;
1001
1002 // save outputrecords, because they will be consumed by the cloud
1003 auto outputrecords1 = this->outputrecords;
1004
1005 // turn an element of the tuple of pointers into an element of the tuple of WorldObjects
1006 auto doit = [&](auto& element) {
1007 typedef std::decay_t<decltype(element)> elementT;
1008
1009 // load the elements from the cloud -- they contain pointers to WorldObjects
1011 typedef typename elementT::value_type::implT implT;
1012 auto ptr_element = cloud.consuming_load<std::vector<std::shared_ptr<implT>>>(
1013 subworld, outputrecords1);
1014 element = pointer2WorldObject(ptr_element);
1015 }
1016 else if constexpr (is_madness_function<elementT>::value) {
1017 typedef typename elementT::implT implT;
1018 auto ptr_element = cloud.consuming_load<std::shared_ptr<implT>>(subworld, outputrecords1);
1019 element = pointer2WorldObject(ptr_element);
1020 }
1021 else if constexpr (is_scalar_result_vector<elementT>::value) { // std::vector<ScalarResult<T>>
1022 typedef typename elementT::value_type ScalarResultT;
1023 typedef typename ScalarResultT::implT implT;
1024 typedef std::vector<std::shared_ptr<implT>> vptrT;
1025 auto ptr_element = cloud.consuming_load<vptrT>(subworld, outputrecords1);
1026 element = pointer2WorldObject(ptr_element);
1027 }
1028 else if constexpr (is_scalar_result<elementT>::value) {
1029 // elementT is a ScalarResultImpl<T>
1030 // in cloud we store a std::shared_ptr<ScalarResultImpl<T>>
1031 auto ptr_element = cloud.consuming_load<std::shared_ptr<typename elementT::implT>>(subworld, outputrecords1);
1032 element = pointer2WorldObject(ptr_element);
1033 }
1034 else {
1035 MADNESS_EXCEPTION("confused about the type of the result", 1);
1036 }
1037 };
1038 if constexpr (is_tuple<resultT>::value) {
1039 static_assert(check_tuple_is_valid_task_result<resultT, 0>(),
1040 "invalid tuple task result -- must be vectors of functions");
1041 static_assert(is_tuple<resultT>::value, "is a tuple");
1042
1043 // loop over all tuple elements
1044 // 1. load the pointers to the WorldObjects living in the universe
1045 // 2. create WorldObjects from the pointers and copy them into the tuple of type resultT
1046
1047 // turn the tuple of pointers into a tuple of WorldObjects
1048 unary_tuple_loop(result,doit);
1049
1050
1051 } else {
1052 doit(result);
1053
1054 }
1055 return result;
1056 }
1057
1058 };
1059
1060};
1061
1063public:
1065 std::string name="unknown_task";
1066 std::shared_ptr<MacroTaskPartitioner> partitioner=0;
1068};
1069
1070
1071} /* namespace madness */
1072
1073#endif /* SRC_MADNESS_MRA_MACROTASKQ_H_ */
Wrapper around MPI_Comm. Has a shallow copy constructor; use Create(Get_group()) for deep copy.
Definition safempi.h:490
Intracomm Split(int Color, int Key=0) const
Definition safempi.h:635
a batch consists of a 2D-input batch and a 1D-output batch: K-batch <- (I-batch, J-batch)
Definition macrotaskpartitioner.h:124
cloud class
Definition cloud.h:178
void clear()
Definition cloud.h:352
void replicate_per_node(const std::size_t chunk_size=INT_MAX)
Definition cloud.h:433
recordlistT store(madness::World &world, const T &source)
Definition cloud.h:410
Recordlist< keyT > recordlistT
Definition cloud.h:230
std::tuple< size_t, double > print_size(World &universe)
Definition cloud.h:279
void replicate(const std::size_t chunk_size=INT_MAX)
Definition cloud.h:441
T load(madness::World &world, const recordlistT recordlist) const
load a single object from the cloud, recordlist is kept unchanged
Definition cloud.h:370
ReplicationPolicy get_replication_policy() const
Definition cloud.h:267
void clear_cache(World &subworld)
Definition cloud.h:346
@ NodeReplicated
Definition cloud.h:216
@ Distributed
Definition cloud.h:212
@ RankReplicated
Definition cloud.h:214
void print_timings(World &universe) const
Definition cloud.h:317
StoragePolicy
Definition cloud.h:204
@ StoreFunctionPointer
Definition cloud.h:207
@ StoreFunction
Definition cloud.h:205
T consuming_load(madness::World &world, recordlistT &recordlist) const
similar to load, but will consume the recordlist
Definition cloud.h:383
static void set_default_pmap(World &world)
Sets the default process map.
Definition mraimpl.h:3548
static std::shared_ptr< WorldDCPmapInterface< Key< NDIM > > > & get_pmap()
Returns the default process map.
Definition funcdefaults.h:389
static void set_pmap(const std::shared_ptr< WorldDCPmapInterface< Key< NDIM > > > &value)
Sets the default process map (does not redistribute existing functions)
Definition funcdefaults.h:405
FunctionImpl holds all Function state to facilitate shallow copy semantics.
Definition funcimpl.h:945
A multiresolution adaptive numerical function.
Definition mra.h:139
void set_impl(const std::shared_ptr< FunctionImpl< T, NDIM > > &impl)
Replace current FunctionImpl with provided new one.
Definition mra.h:661
A future is a possibly yet unevaluated value.
Definition future.h:373
T & get(bool dowork=true) &
Gets the value, waiting if necessary.
Definition future.h:574
base class
Definition macrotaskq.h:325
void set_running()
Definition macrotaskq.h:337
virtual ~MacroTaskBase()
Definition macrotaskq.h:331
void set_waiting()
Definition macrotaskq.h:338
MacroTaskBase()
Definition macrotaskq.h:330
virtual void cleanup()=0
virtual void print_me(std::string s="") const
Definition macrotaskq.h:348
std::string print_priority_and_status_to_string() const
Definition macrotaskq.h:354
void set_complete()
Definition macrotaskq.h:336
double priority
Definition macrotaskq.h:333
bool is_complete() const
Definition macrotaskq.h:340
Status
Definition macrotaskq.h:334
@ Complete
Definition macrotaskq.h:334
@ Running
Definition macrotaskq.h:334
@ Unknown
Definition macrotaskq.h:334
@ Waiting
Definition macrotaskq.h:334
bool is_running() const
Definition macrotaskq.h:341
enum madness::MacroTaskBase::Status stat
virtual void run(World &world, Cloud &cloud, taskqT &taskq, const long element, const bool debug, const MacroTaskInfo::StoragePolicy storage_policy)=0
double get_priority() const
Definition macrotaskq.h:360
void set_priority(const double p)
Definition macrotaskq.h:361
virtual void print_me_as_table(std::string s="") const
Definition macrotaskq.h:351
std::vector< std::shared_ptr< MacroTaskBase > > taskqT
Definition macrotaskq.h:328
friend std::ostream & operator<<(std::ostream &os, const MacroTaskBase::Status s)
Definition macrotaskq.h:363
bool is_waiting() const
Definition macrotaskq.h:342
Definition macrotaskq.h:374
MacroTaskIntermediate()
Definition macrotaskq.h:378
void cleanup()
Definition macrotaskq.h:382
~MacroTaskIntermediate()
Definition macrotaskq.h:380
Definition macrotaskq.h:1062
MacroTaskOperationBase()
Definition macrotaskq.h:1067
Batch batch
Definition macrotaskq.h:1064
std::shared_ptr< MacroTaskPartitioner > partitioner
Definition macrotaskq.h:1066
std::string name
Definition macrotaskq.h:1065
partition one (two) vectors into 1D (2D) batches.
Definition macrotaskpartitioner.h:182
std::list< std::pair< Batch, double > > partitionT
Definition macrotaskpartitioner.h:186
Definition macrotaskq.h:387
long nsubworld
Definition macrotaskq.h:394
std::mutex taskq_mutex
Definition macrotaskq.h:392
bool printdebug() const
Definition macrotaskq.h:407
static void set_pmap(World &world)
Definition macrotaskq.h:611
MacroTaskInfo::StoragePolicy get_storage_policy() const
Definition macrotaskq.h:419
bool printtimings_detail() const
Definition macrotaskq.h:410
void run_all()
run all tasks
Definition macrotaskq.h:455
void set_complete(const long task_number) const
scheduler is located on rank==0
Definition macrotaskq.h:600
MacroTaskQ(World &universe, int nworld, const MacroTaskInfo::StoragePolicy sp, const long printlevel=0)
create an empty taskq and initialize the subworlds
Definition macrotaskq.h:424
World & universe
Definition macrotaskq.h:389
std::shared_ptr< WorldDCPmapInterface< Key< 1 > > > pmap1
set the process map for the subworld
Definition macrotaskq.h:400
std::shared_ptr< WorldDCPmapInterface< Key< 3 > > > pmap3
Definition macrotaskq.h:402
MacroTaskBase::taskqT taskq
Definition macrotaskq.h:391
void add_tasks(MacroTaskBase::taskqT &vtask)
Definition macrotaskq.h:548
long get_scheduled_task_number(World &subworld)
scheduler is located on universe.rank==0
Definition macrotaskq.h:572
void set_complete_local(const long task_number) const
scheduler is located on rank==0
Definition macrotaskq.h:605
World & get_subworld()
Definition macrotaskq.h:415
std::shared_ptr< WorldDCPmapInterface< Key< 4 > > > pmap4
Definition macrotaskq.h:403
std::size_t size() const
Definition macrotaskq.h:620
std::shared_ptr< WorldDCPmapInterface< Key< 5 > > > pmap5
Definition macrotaskq.h:404
void print_taskq() const
Definition macrotaskq.h:555
std::shared_ptr< World > subworld_ptr
Definition macrotaskq.h:390
bool printtimings() const
Definition macrotaskq.h:409
std::shared_ptr< WorldDCPmapInterface< Key< 2 > > > pmap2
Definition macrotaskq.h:401
bool printprogress() const
Definition macrotaskq.h:408
std::shared_ptr< WorldDCPmapInterface< Key< 6 > > > pmap6
Definition macrotaskq.h:405
void set_printlevel(const long p)
Definition macrotaskq.h:417
long printlevel
Definition macrotaskq.h:393
madness::Cloud cloud
Definition macrotaskq.h:414
long get_nsubworld() const
Definition macrotaskq.h:416
void add_replicated_task(const std::shared_ptr< MacroTaskBase > &task)
Definition macrotaskq.h:567
~MacroTaskQ()
Definition macrotaskq.h:438
static std::shared_ptr< World > create_worlds(World &universe, const std::size_t nsubworld)
Definition macrotaskq.h:442
const MacroTaskInfo::StoragePolicy storage_policy
storage policy for the taskq can be set only once at construction
Definition macrotaskq.h:397
long get_scheduled_task_number_local()
Definition macrotaskq.h:584
Definition macrotaskq.h:774
taskT task
Definition macrotaskq.h:781
MacroTaskInternal(const taskT &task, const std::pair< Batch, double > &batch_prio, const recordlistT &inputrecords, const recordlistT &outputrecords)
Definition macrotaskq.h:787
std::enable_if< notis_tuple< resultT1 >::value, void >::type accumulate_into_final_result(World &subworld, resultT1 &result, const resultT1 &result_tmp, const argtupleT &argtuple)
accumulate the result of the task into the final result living in the universe
Definition macrotaskq.h:859
resultT get_output(World &subworld, Cloud &cloud) const
return the WorldObjects or the result functions living in the universe
Definition macrotaskq.h:999
static ScalarResult< T > pointer2WorldObject(const std::shared_ptr< ScalarResultImpl< T > > sr_impl)
Definition macrotaskq.h:982
decay_tuple< typename taskT::argtupleT > argtupleT
Definition macrotaskq.h:776
static std::vector< Function< T, NDIM > > pointer2WorldObject(const std::vector< std::shared_ptr< FunctionImpl< T, NDIM > > > v_impl)
Definition macrotaskq.h:974
void cleanup() override
Definition macrotaskq.h:961
taskT::resultT resultT
Definition macrotaskq.h:777
recordlistT outputrecords
Definition macrotaskq.h:779
std::string get_name() const
Definition macrotaskq.h:782
void binary_tuple_loop(tupleT &tuple1, tupleR &tuple2, opT &op) const
loop over the tuple elements of both tuples and execute the operation op on each element pair
Definition macrotaskq.h:825
static Function< T, NDIM > pointer2WorldObject(const std::shared_ptr< FunctionImpl< T, NDIM > > impl)
Definition macrotaskq.h:967
static std::vector< ScalarResult< T > > pointer2WorldObject(const std::vector< std::shared_ptr< ScalarResultImpl< T > > > v_sr_impl)
Definition macrotaskq.h:987
void print_me_as_table(std::string s="") const override
Definition macrotaskq.h:805
void print_me(std::string s="") const override
Definition macrotaskq.h:801
void run(World &subworld, Cloud &cloud, MacroTaskBase::taskqT &taskq, const long element, const bool debug, const MacroTaskInfo::StoragePolicy storage_policy) override
called by the MacroTaskQ when the task is scheduled
Definition macrotaskq.h:888
void unary_tuple_loop(tupleT &tuple, opT &op) const
Definition macrotaskq.h:835
std::enable_if< is_tuple< resultT1 >::value, void >::type accumulate_into_final_result(World &subworld, resultT1 &final_result, const resultT1 &tmp_result, const argtupleT &argtuple)
accumulate the result of the task into the final result living in the universe
Definition macrotaskq.h:846
recordlistT inputrecords
Definition macrotaskq.h:778
Definition macrotaskq.h:628
MacroTask & set_debug(const bool value)
Definition macrotaskq.h:676
MacroTask(World &world, taskT &task)
constructor takes the task, but no arguments to the task
Definition macrotaskq.h:649
taskT::resultT resultT
Definition macrotaskq.h:638
std::shared_ptr< MacroTaskQ > taskq_ptr
Definition macrotaskq.h:722
taskT::argtupleT argtupleT
Definition macrotaskq.h:639
bool immediate_execution
Definition macrotaskq.h:643
World & world
Definition macrotaskq.h:721
MacroTask(World &world, taskT &task, MacroTaskInfo::StoragePolicy storage_policy)
constructor takes task and the storage policy, but no arguments to the task
Definition macrotaskq.h:655
resultT operator()(const Ts &... args)
this mimics the original call to the task functor, called from the universe
Definition macrotaskq.h:686
bool debug
Definition macrotaskq.h:642
taskT task
Definition macrotaskq.h:641
MacroTaskPartitioner::partitionT partitionT
Definition macrotaskq.h:629
Cloud::recordlistT recordlistT
Definition macrotaskq.h:640
recordlistT prepare_output_records(Cloud &cloud, resultT &result)
store pointers to the result WorldObject in the cloud and return the recordlist
Definition macrotaskq.h:727
MacroTask(World &world, taskT &task, std::shared_ptr< MacroTaskQ > taskq_ptr)
constructor takes the task and a shared pointer to the task queue, but no arguments to the task
Definition macrotaskq.h:661
static void measure_and_print(World &world)
measure the memory usage of all objects of all worlds
Definition memory_measurement.h:21
helper class for returning the result of a task, which is not a madness Function, but a simple scalar
Definition macrotaskq.h:55
void gaxpy(const double a, const T &right, double b, const bool fence=true)
accumulate, optional fence
Definition macrotaskq.h:86
void serialize(Archive &ar)
Definition macrotaskq.h:94
T get()
after completion of the taskq get the final value
Definition macrotaskq.h:99
ScalarResultImpl< T > & operator=(const T &x)
simple assignment of the scalar value
Definition macrotaskq.h:75
ScalarResultImpl(const ScalarResultImpl &other)=delete
Disable the default copy constructor.
ScalarResultImpl< T > & operator=(const ScalarResultImpl< T > &other)=delete
disable assignment operator
ScalarResultImpl< T > & operator+=(const T &x)
Definition macrotaskq.h:80
~ScalarResultImpl()
Definition macrotaskq.h:69
T value
the scalar value
Definition macrotaskq.h:112
T get_local() const
Definition macrotaskq.h:106
T value_type
Definition macrotaskq.h:57
ScalarResultImpl(World &world)
Definition macrotaskq.h:58
Definition macrotaskq.h:116
void serialize(Archive &ar)
Definition macrotaskq.h:147
ScalarResult(const std::shared_ptr< implT > &impl)
Definition macrotaskq.h:123
void gaxpy(const double a, const T &right, double b, const bool fence=true)
accumulate, optional fence
Definition macrotaskq.h:142
ScalarResultImpl< T > implT
Definition macrotaskq.h:118
T get()
after completion of the taskq get the final value
Definition macrotaskq.h:152
ScalarResult(World &world)
Definition macrotaskq.h:122
std::shared_ptr< implT > impl
Definition macrotaskq.h:119
ScalarResult & operator=(const T &x)
Definition macrotaskq.h:124
std::shared_ptr< implT > get_impl() const
Definition macrotaskq.h:129
void set_impl(const std::shared_ptr< implT > &newimpl)
Definition macrotaskq.h:133
T get_local() const
after completion of the taskq get the final value
Definition macrotaskq.h:157
uniqueidT id() const
Definition macrotaskq.h:137
void broadcast_serializable(objT &obj, ProcessID root)
Broadcast a serializable object.
Definition worldgop.h:756
void fence(bool debug=false)
Synchronizes all processes in communicator AND globally ensures no pending AM or tasks.
Definition worldgop.cc:161
bool set_forbid_fence(bool value)
Set forbid_fence flag to new value and return old value.
Definition worldgop.h:676
void sum(T *buf, size_t nelem)
Inplace global sum while still processing AM & tasks.
Definition worldgop.h:872
SafeMPI::Intracomm & comm()
Returns the associated SafeMPI communicator.
Definition worldmpi.h:286
Implements most parts of a globally addressable object (via unique ID).
Definition world_object.h:364
World & get_world() const
Returns a reference to the world.
Definition world_object.h:717
World & world
The World this object belongs to. (Think globally, act locally).
Definition world_object.h:381
void process_pending()
To be called from derived constructor to process pending messages.
Definition world_object.h:656
detail::task_result_type< memfnT >::futureT send(ProcessID dest, memfnT memfn) const
Definition world_object.h:731
detail::task_result_type< memfnT >::futureT task(ProcessID dest, memfnT memfn, const TaskAttributes &attr=TaskAttributes()) const
Sends task to derived class method returnT (this->*memfn)().
Definition world_object.h:1005
A parallel world class.
Definition world.h:132
static World * world_from_id(std::uint64_t id)
Convert a World ID to a World pointer.
Definition world.h:492
ProcessID rank() const
Returns the process rank in this World (same as MPI_Comm_rank()).
Definition world.h:320
WorldMpiInterface & mpi
MPI interface.
Definition world.h:204
ProcessID size() const
Returns the number of processes in this World (same as MPI_Comm_size()).
Definition world.h:330
unsigned long id() const
Definition world.h:315
WorldGopInterface & gop
Global operations.
Definition world.h:207
std::optional< T * > ptr_from_id(uniqueidT id) const
Look up a local pointer from a world-wide unique ID.
Definition world.h:416
Class for unique global IDs.
Definition uniqueid.h:53
Declares the Cloud class for storing data and transferring them between worlds.
char * p(char *buf, const char *name, int k, int initial_level, double thresh, int order)
Definition derivatives.cc:72
const std::size_t bufsize
Definition derivatives.cc:16
static bool debug
Definition dirac-hatom.cc:16
auto T(World &world, response_space &f) -> response_space
Definition global_functions.cc:28
Tensor< typename Tensor< T >::scalar_type > arg(const Tensor< T > &t)
Return a new tensor holding the argument of each element of t (complex types only)
Definition tensor.h:2503
static const double v
Definition hatom_sf_dirac.cc:20
Tensor< double > op(const Tensor< double > &x)
Definition kain.cc:508
#define MADNESS_EXCEPTION(msg, value)
Macro for throwing a MADNESS exception.
Definition madness_exception.h:119
#define MADNESS_ASSERT(condition)
Assert a condition that should be free of side-effects since in release builds this might be a no-op.
Definition madness_exception.h:134
Namespace for all elements and tools of MADNESS.
Definition DFParameters.h:10
std::vector< ScalarResult< T > > scalar_result_vector(World &world, std::size_t n)
helper function to create a vector of ScalarResultImpl, circumventing problems with the constructors
Definition macrotaskq.h:165
std::ostream & operator<<(std::ostream &os, const particle< PDIM > &p)
Definition lowrankfunction.h:401
static double cpu_time()
Returns the cpu time in seconds relative to an arbitrary origin.
Definition timers.h:127
void set_impl(std::vector< Function< T, NDIM > > &v, const std::vector< std::shared_ptr< FunctionImpl< T, NDIM > > > vimpl)
Definition vmra.h:688
std::vector< std::shared_ptr< FunctionImpl< T, NDIM > > > get_impl(const std::vector< Function< T, NDIM > > &v)
Definition vmra.h:681
TreeState
Definition funcdefaults.h:59
@ reconstructed
s coeffs at the leaves only
Definition funcdefaults.h:60
@ compressed
d coeffs in internal nodes, s and d coeffs at the root
Definition funcdefaults.h:61
constexpr bool check_tuple_is_valid_task_result()
given a tuple check recursively if all elements are valid task results
Definition macrotaskq.h:224
static const Slice _(0,-1, 1)
void print(const T &t, const Ts &... ts)
Print items to std::cout (items separated by spaces) and terminate with a new line.
Definition print.h:226
@ TT_FULL
Definition gentensor.h:120
NDIM & f
Definition mra.h:2498
constexpr bool is_valid_task_result_v
check if type is a valid task result: it must be a WorldObject and must implement gaxpy
Definition macrotaskq.h:210
const Function< T, NDIM > & change_tree_state(const Function< T, NDIM > &f, const TreeState finalstate, bool fence=true)
change tree state of a function
Definition mra.h:2824
decltype(decay_types(std::declval< T >())) decay_tuple
Definition macrotaskpartitioner.h:22
double wall_time()
Returns the wall time in seconds relative to an arbitrary origin.
Definition timers.cc:48
std::string type(const PairType &n)
Definition PNOParameters.h:18
std::string name(const FuncType &type, const int ex=-1)
Definition ccpairfunction.h:28
Function< T, NDIM > copy(const Function< T, NDIM > &f, const std::shared_ptr< WorldDCPmapInterface< Key< NDIM > > > &pmap, bool fence=true)
Create a new copy of the function with different distribution and optional fence.
Definition mra.h:2066
void gaxpy(const double a, ScalarResult< T > &left, const double b, const T &right, const bool fence=true)
the result type of a macrotask must implement gaxpy
Definition macrotaskq.h:246
Definition mraimpl.h:50
static const double b
Definition nonlinschro.cc:119
static const double a
Definition nonlinschro.cc:118
Definition macrotaskq.h:282
StoragePolicy
Definition macrotaskq.h:283
@ StoreFunction
store a madness function in the cloud – can have a large memory impact
Definition macrotaskq.h:284
@ StorePointerToFunction
Definition macrotaskq.h:285
@ StoreFunctionViaPointer
coefficients to the subworlds when the task is started. This is the default policy.
Definition macrotaskq.h:288
static MacroTaskInfo::StoragePolicy default_storage_policy
declaration here, definition in mra1.cc
Definition macrotaskq.h:310
static Cloud::StoragePolicy to_cloud_storage_policy(MacroTaskInfo::StoragePolicy policy)
given the MacroTask's storage policy return the corresponding Cloud storage policy
Definition macrotaskq.h:292
static void set_default(MacroTaskInfo::StoragePolicy sp)
Definition macrotaskq.h:305
static MacroTaskInfo::StoragePolicy get_default()
Definition macrotaskq.h:301
Definition macrotaskq.h:632
Default load of an object via serialize(ar, t).
Definition archive.h:667
Default store of an object via serialize(ar, t).
Definition archive.h:612
class to temporarily redirect output to cout
Definition print.h:277
RAII class to redirect cout to a file.
Definition print.h:251
Definition mra.h:2868
Definition macrotaskq.h:195
Definition macrotaskq.h:180
Definition macrotaskq.h:174
Definition macrotaskq.h:201
Definition macrotaskq.h:189
Definition macrotaskq.h:219
static void load(const Archive &ar, std::shared_ptr< ScalarResultImpl< T > > &ptr)
Definition macrotaskq.h:262
static void store(const Archive &ar, const std::shared_ptr< ScalarResultImpl< T > > &ptr)
Definition macrotaskq.h:252
static const double_complex I
Definition tdse1d.cc:164
void doit(World &world)
Definition tdse.cc:921
void e()
Definition test_sig.cc:75
Declares the World class for the parallel runtime environment.
int ProcessID
Used to clearly identify process number/rank.
Definition worldtypes.h:43