MADNESS 0.10.1
macrotaskq.h
Go to the documentation of this file.
1/**
2 \file macrotaskq.h
3 \brief Declares the \c macrotaskq and MacroTaskBase classes
4 \ingroup mra
5
6 A MacroTaskq executes tasks on World objects, e.g. differentiation of a function or other
7 arithmetic. Complex algorithms can be implemented.
8
9 The universe world is split into subworlds, each of them executing macrotasks of the task queue.
10 This improves locality and yields speedups for large numbers of compute nodes by reducing communication
11 within worlds.
12
13 The user defines a macrotask (an example is found in test_vectormacrotask.cc), the tasks are
14 lightweight and carry only bookkeeping information, actual input and output are stored in a
15 cloud (see cloud.h)
16
17 The user-defined macrotask is derived from MacroTaskIntermediate and must implement the run()
18 method. A heterogeneous task queue is possible.
19
20 The result of a macrotask is an object that lives in the universe and that is accessible from all
21 subworlds. The result is accumulated in the universe and must therefore be a WorldObject. Currently we
22 have implemented
23 - Function<T,NDIM>
24 - std::vector<Function<T,NDIM>> (a vector of Function<T,NDIM>)
25 - ScalarResultImpl<T> (a scalar value)
26 - std::vector<std::shared_ptr<ScalarResultImpl<T>>> (a vector of scalar values), shared_ptr for technical reasons
27
28 - std::tuple<std::vector<XXX>, std::vector<YYY>> (a tuple of n vectors of WorldObjects: XXX, YYY, .. = {Function, ScalarResultImpl, ...})
29
30
31 TODO: priority q
32 TODO: task submission from inside task (serialize task instead of replicate)
33 TODO: update documentation
34 TODO: consider serializing task member variables
35
36*/
37
38
39
40#ifndef SRC_MADNESS_MRA_MACROTASKQ_H_
41#define SRC_MADNESS_MRA_MACROTASKQ_H_
42
43#include <madness/world/cloud.h>
44#include <madness/world/world.h>
47
48namespace madness {
49
50/// helper class for returning the result of a task, which is not a madness Function, but a simple scalar
51
52/// the result value is accumulated via gaxpy in universe rank=0, after completion of the taskq the final
53/// value can be obtained via get(), which includes a broadcast of the final value to all processes
54template<typename T>
55class ScalarResultImpl : public WorldObject<ScalarResultImpl<T>> {
56public:
57 typedef T value_type;
61
62 /// Disable the default copy constructor
63 ScalarResultImpl(const ScalarResultImpl& other) = delete;
64 // ScalarResultImpl<T>(ScalarResultImpl<T>&& ) = default;
65
66 /// disable assignment operator
68
71
72 /// simple assignment of the scalar value
74 value = x;
75 return *this;
76 }
77
79 gaxpy(1.0, x, 1.0,true);
80 return *this;
81 }
82
83 /// accumulate, optional fence
84 void gaxpy(const double a, const T& right, double b, const bool fence=true) {
85 if (this->get_world().rank()==0) {
86 value =a*value + b * right;
87 }
88 else this->send(0, &ScalarResultImpl<T>::gaxpy, a, right, b, fence);
89 }
90
91 template<typename Archive>
92 void serialize(Archive &ar) {
93 ar & value;
94 }
95
96 /// after completion of the taskq get the final value
97 T get() {
98 this->get_world().gop.broadcast_serializable(*this, 0);
99 return value;
100 }
101
102 /// get the local value of this rank, which might differ for different ranks
103 /// for the final value use get()
104 T get_local() const {
105 return value;
106 }
107
108private:
109 /// the scalar value
111};
112
113template<typename T=double>
115public:
117 std::shared_ptr<implT> impl;
118
119 ScalarResult() = default;
120 ScalarResult(World &world) : impl(new implT(world)) {}
121 ScalarResult(const std::shared_ptr<implT>& impl) : impl(impl) {}
123 *(this->impl) = x;
124 return *this;
125 }
126
127 std::shared_ptr<implT> get_impl() const {
128 return impl;
129 }
130
131 void set_impl(const std::shared_ptr<implT>& newimpl) {
133 }
134
135 uniqueidT id() const {
136 return impl->id();
137 }
138
139 /// accumulate, optional fence
140 void gaxpy(const double a, const T& right, double b, const bool fence=true) {
141 impl->gaxpy(a,right,b,fence);
142 }
143
144 template<typename Archive>
145 void serialize(Archive &ar) {
146 ar & impl;
147 }
148
149 /// after completion of the taskq get the final value
150 T get() {
151 return impl->get();
152 }
153
154 /// after completion of the taskq get the final value
155 T get_local() const {
156 return impl->get_local();
157 }
158
159};
160
161/// helper function to create a vector of ScalarResultImpl, circumventing problems with the constructors
162template<typename T>
163std::vector<ScalarResult<T>> scalar_result_vector(World& world, std::size_t n) {
164 std::vector<ScalarResult<T>> v;
165 for (std::size_t i=0; i<n; ++i) v.emplace_back(ScalarResult<T>(world));
166 return v;
167}
168
169
170// type traits to check if a template parameter is a WorldContainer
171template<typename>
172struct is_scalar_result_ptr : std::false_type {};
173
174template <typename T>
175struct is_scalar_result_ptr<std::shared_ptr<madness::ScalarResultImpl<T>>> : std::true_type {};
176
177template<typename>
178struct is_scalar_result_ptr_vector : std::false_type {
179};
180
181template<typename T>
182struct is_scalar_result_ptr_vector<std::vector<std::shared_ptr<typename madness::ScalarResultImpl<T>>>> : std::true_type {
183};
184
185// type traits to check if a template parameter is a WorldContainer
186template<typename>
187struct is_scalar_result : std::false_type {};
188
189template <typename T>
190struct is_scalar_result<madness::ScalarResult<T>> : std::true_type {};
191
192template<typename>
193struct is_scalar_result_impl : std::false_type {};
194
195template <typename T>
196struct is_scalar_result<madness::ScalarResultImpl<T>> : std::true_type {};
197
198template<typename>
199struct is_scalar_result_vector : std::false_type {
200};
201
202template<typename T>
203struct is_scalar_result_vector<std::vector<typename madness::ScalarResult<T>>> : std::true_type {
204};
205
206/// check if type is a valid task result: it must be a WorldObject and must implement gaxpy
/// true if any of the type traits below matches T; the trailing comments name the
/// types the respective trait is specialized for
207template <typename T>
208inline constexpr bool is_valid_task_result_v =
209 is_madness_function<T>::value // Function<T,NDIM>
210 || is_madness_function_vector<T>::value // std::vector<Function<T,NDIM>>
211 || is_scalar_result<T>::value // ScalarResult<T> (also specialized for ScalarResultImpl<T>)
212 || is_scalar_result_vector<T>::value // std::vector<ScalarResult<T>>
213 || is_scalar_result_ptr<T>::value // std::shared_ptr<ScalarResultImpl<T>>
214 || is_scalar_result_ptr_vector<T>::value; // std::vector<std::shared_ptr<ScalarResultImpl<T>>>
215
216
/// type trait: true only for (any instantiation of) std::tuple
template<typename>
struct is_tuple : std::false_type {
};

template<typename... Elements>
struct is_tuple<std::tuple<Elements...>> : std::true_type {
};
219
220/// given a tuple check recursively if all elements are valid task results
221template<typename tupleT, std::size_t I>
223
224 typedef decay_tuple <tupleT> argtupleT; // removes const, &, etc
225
226 if constexpr(I >= std::tuple_size_v<tupleT>) {
227 // Last case, if nothing is left to iterate, then exit the function
228 return true;
229 } else {
230 using typeT = typename std::tuple_element<I, argtupleT>::type;// use decay types for determining a vector
231 if constexpr (not is_valid_task_result_v<typeT>) {
232 return false;
233 } else {
234 // Going for next element.
236 }
237 }
238}
239
240
241
242/// the result type of a macrotask must implement gaxpy
/// free-function form for ScalarResult: forwards to the member function
/// left.gaxpy(a, right, b, fence), i.e. a scales left and b scales right
/// (note the different argument order of the member function)
243template<typename T>
244void gaxpy(const double a, ScalarResult<T>& left, const double b, const T& right, const bool fence=true) {
245 left.gaxpy(a, right, b, fence);
246}
247
248template <class Archive, typename T>
250 static void store(const Archive& ar, const std::shared_ptr<ScalarResultImpl<T>>& ptr) {
251 bool exists=(ptr) ? true : false;
252 ar & exists;
253 if (exists) ar & ptr->id();
254 }
255};
256
257
258template <class Archive, typename T>
260 static void load(const Archive& ar, std::shared_ptr<ScalarResultImpl<T>>& ptr) {
261 bool exists=false;
262 ar & exists;
263 if (exists) {
264 uniqueidT id;
265 ar & id;
266 World* world = World::world_from_id(id.get_world_id());
267 MADNESS_ASSERT(world);
268 auto ptr_opt = (world->ptr_from_id< ScalarResultImpl<T> >(id));
269 if (!ptr_opt)
270 MADNESS_EXCEPTION("ScalarResultImpl: remote operation attempting to use a locally uninitialized object",0);
271 ptr.reset(ptr_opt.value(), [] (ScalarResultImpl<T> *p_) -> void {}); // disable destruction
272 if (!ptr)
273 MADNESS_EXCEPTION("ScalarResultImpl<T> operation attempting to use an unregistered object",0);
274 } else {
275 ptr=nullptr;
276 }
277 }
278};
279
282 StoreFunction, ///< store a madness function in the cloud -- can have a large memory impact
283 StorePointerToFunction, ///< store the pointer to the function in the cloud, the actual function lives in the universe and
284 ///< its coefficients can be copied to the subworlds (e.g. by macrotaskq) when needed.
285 ///< The task itself is responsible for handling data movement
286 StoreFunctionViaPointer ///< store a pointer to the function in the cloud, but macrotaskq will move the
287 ///< coefficients to the subworlds when the task is started. This is the default policy.
288 };
289
290 friend std::ostream& operator<<(std::ostream& os, const StoragePolicy sp) {
291 if (sp==StoreFunction) os << "StoreFunction";
292 if (sp==StorePointerToFunction) os << "StorePointerToFunction";
293 if (sp==StoreFunctionViaPointer) os << "StoreFunctionViaPointer";
294 return os;
295 }
296
297 /// given the MacroTask's storage policy return the corresponding Cloud storage policy
306
307 /// return some preset policies
308 /// - "default": StoreFunctionViaPointer, cloud rank-replicated, initial functions node-replicated
309 /// - "small_memory": StoreFunctionViaPointer, cloud rank-replicated, initial functions distributed
310 /// - "large_memory": StoreFunction, cloud rank-replicated, initial functions distributed
311 /// the user can also set the policies manually
312 /// note: the policies are checked for consistency when the MacroTaskQ is created
337
338 static std::vector<std::string> get_all_preset_names() {
339 return {"default","node_replicated_target","small_memory","large_memory"};
340 }
341
342 /// helper function to return all presets
343 static std::vector<MacroTaskInfo> get_all_presets() {
344 std::vector<MacroTaskInfo> result;
345 for (const auto& name : get_all_preset_names()) {
346 result.push_back(preset(name));
347 }
348 return result;
349 }
350
351 /// make sure the policies are consistent
352 bool check_consistency() const {
355 bool good=true;
356
358 // if functions are stored in the cloud, the initial functions should be distributed
360
361 } else if (store_pointer_in_cloud) {
362 // if pointers are stored in the cloud, the initial functions can be distributed or replicated,
363 // the cloud should be rank-replicated
365
366 }
367 if (not good) std::cout << *this ;
368
369 return good;
370 }
371
375
376 friend std::ostream& operator<<(std::ostream& os, const MacroTaskInfo policy) {
377 os << "StoragePolicy: " << policy.storage_policy << std::endl;
378 os << "cloud_storage_policy: " << to_cloud_storage_policy(policy.storage_policy) << std::endl;
379 os << "cloud_distribution_policy: " << policy.cloud_distribution_policy << std::endl;
380 os << "ptr_target_distribution_policy: " << policy.ptr_target_distribution_policy << std::endl;
381 return os;
382 }
383
384 friend std::string to_string(const MacroTaskInfo::StoragePolicy sp) {
385 std::ostringstream os;
386 os << sp;
387 return os.str();
388 }
389
390 static StoragePolicy policy_to_string(const std::string policy) {
391 std::string policy_lc=commandlineparser::tolower(policy);
392 if (policy=="function") return MacroTaskInfo::StoreFunction;
393 if (policy=="pointer") return MacroTaskInfo::StorePointerToFunction;
394 if (policy=="functionviapointer") return MacroTaskInfo::StoreFunctionViaPointer;
395 std::string msg="unknown policy: "+policy;
396 MADNESS_EXCEPTION(msg.c_str(),1);
398 }
399
400 nlohmann::json to_json() const {
401 nlohmann::json j;
402 j["storage_policy"]=to_string(storage_policy);
403 j["cloud_distribution_policy"]=to_string(cloud_distribution_policy);
404 j["ptr_target_distribution_policy"]=to_string(ptr_target_distribution_policy);
405 return j;
406 }
407
408
409};
410
411template<typename T=double>
412std::ostream& operator<<(std::ostream& os, const typename MacroTaskInfo::StoragePolicy sp) {
413 if (sp==MacroTaskInfo::StoreFunction) os << "Function";
414 if (sp==MacroTaskInfo::StorePointerToFunction) os << "PointerToFunction";
415 if (sp==MacroTaskInfo::StoreFunctionViaPointer) os << "FunctionViaPointer";
416 return os;
417}
418
419/// base class
421public:
422
423 typedef std::vector<std::shared_ptr<MacroTaskBase> > taskqT;
424
426 virtual ~MacroTaskBase() {};
427
428 double priority=1.0;
430
434
435 bool is_complete() const {return stat==Complete;}
436 bool is_running() const {return stat==Running;}
437 bool is_waiting() const {return stat==Waiting;}
438
439 virtual void run(World& world, Cloud& cloud, taskqT& taskq, const long element,
440 const bool debug, const MacroTaskInfo policy) = 0;
441 virtual void cleanup() = 0; // clear static data (presumably persistent input data)
442
443 virtual void print_me(std::string s="") const {
444 printf("this is task with priority %4.1f\n",priority);
445 }
446 virtual void print_me_as_table(std::string s="") const {
447 print("nothing to print");
448 }
450 std::stringstream ss;
451 ss << std::setw(5) << this->get_priority() << " " <<this->stat;
452 return ss.str();
453 }
454
455 double get_priority() const {return priority;}
456 void set_priority(const double p) {priority=p;}
457
458 friend std::ostream& operator<<(std::ostream& os, const MacroTaskBase::Status s) {
459 if (s==MacroTaskBase::Status::Running) os << "Running";
460 if (s==MacroTaskBase::Status::Waiting) os << "Waiting";
461 if (s==MacroTaskBase::Status::Complete) os << "Complete";
462 if (s==MacroTaskBase::Status::Unknown) os << "Unknown";
463 return os;
464 }
465};
466
467
468template<typename macrotaskT>
470
471public:
472
474
476
477 void cleanup() {};
478};
479
480
481 /// Factory for the MacroTaskQ
483 public:
486 long nworld=1;
487
489
490 MacroTaskQFactory(World& universe) : world(universe), nworld(universe.size()) {}
491
493 nworld = n;
494 return *this;
495 }
496
498 printlevel = p;
499 return *this;
500 }
501
502 MacroTaskQFactory& preset(const std::string name) {
503 return *this;
504 }
505
507 policy = p;
508 return *this;
509 }
510
515
520
525
526 };
527
528
529
530class MacroTaskQ : public WorldObject< MacroTaskQ> {
531
533 std::shared_ptr<World> subworld_ptr;
535 std::mutex taskq_mutex;
537 long nsubworld=1;
538 nlohmann::json cloud_statistics; ///< save cloud statistics after run_all()
539 nlohmann::json taskq_statistics; ///< save taskq statistics after run_all()
540
541 const MacroTaskInfo policy; ///< storage and distribution policy
542
543 /// set the process map for the subworld
544 std::shared_ptr< WorldDCPmapInterface< Key<1> > > pmap1;
545 std::shared_ptr< WorldDCPmapInterface< Key<2> > > pmap2;
546 std::shared_ptr< WorldDCPmapInterface< Key<3> > > pmap3;
547 std::shared_ptr< WorldDCPmapInterface< Key<4> > > pmap4;
548 std::shared_ptr< WorldDCPmapInterface< Key<5> > > pmap5;
549 std::shared_ptr< WorldDCPmapInterface< Key<6> > > pmap6;
550
551 bool printdebug() const {return printlevel>=10;}
552 bool printprogress() const {return (printlevel>=4) and (not (printdebug()));}
553 bool printtimings() const {return universe.rank()==0 and printlevel>=3;}
554 bool printtimings_detail() const {return universe.rank()==0 and printlevel>=5;}
555
556public:
557
560 long get_nsubworld() const {return nsubworld;}
561 void set_printlevel(const long p) {printlevel=p;}
562
564 return policy;
565 }
566
567 nlohmann::json get_cloud_statistics() const {
568 return cloud_statistics;
569 }
570
571 nlohmann::json get_taskq_statistics() const {
572 return taskq_statistics;
573 }
574
575 /// create an empty taskq and initialize the subworlds
593
595
596 /// for each process create a world using a communicator shared with other processes by round-robin
597 /// copy-paste from test_world.cc
598 static std::shared_ptr<World> create_worlds(World& universe, const std::size_t nsubworld) {
599
600 int color = universe.rank() % nsubworld;
602
603 std::shared_ptr<World> all_worlds;
604 all_worlds.reset(new World(comm));
605
607 return all_worlds;
608 }
609
610 /// run all tasks
611 void run_all() {
612
613 if (printdebug()) print_taskq();
614 if (printtimings_detail()) {
615 if (universe.rank()==0) {
616 print("number of tasks in taskq",taskq.size());
617 print("redirecting output to files task.#####");
618 }
619 }
620 taskq_statistics["number_tasks"]=taskq.size();
621
622 // replicate the cloud (not necessarily the target if pointers are stored)
625 double cpu0=cpu_time();
629 double cpu1=cpu_time();
630 if (printtimings_detail()) print("cloud replication wall time",cpu1-cpu0);
631 }
632
633 // replicate the targets (not the cloud) if needed
634 {
635 double cpu0=cpu_time();
639
640
643 for (auto wo : cloud.world_object_base_list) {
645 }
646 }
647
648 // if (need_replication_of_target) cloud.distribute_targets(policy.ptr_target_distribution_policy);
649 double cpu1=cpu_time();
650 if (printtimings_detail()) print("target replication wall time to ",policy.ptr_target_distribution_policy,cpu1-cpu0);
651 }
652
654 cloud_statistics=cloud.get_statistics(universe); // get stats before clearing the cloud
656 universe.gop.set_forbid_fence(true); // make sure there are no hidden universe fences
664
665 double cpu00=cpu_time();
666
667 World& subworld=get_subworld();
668// if (printdebug()) print("I am subworld",subworld.id());
669 double tasktime=0.0;
670 if (printprogress() and universe.rank()==0) std::cout << "progress in percent: " << std::flush;
671 while (true) {
672 long element=get_scheduled_task_number(subworld);
673 double cpu0=cpu_time();
674 if (element<0) break;
675 std::shared_ptr<MacroTaskBase> task=taskq[element];
676 if (printdebug()) print("starting task no",element, "in subworld",subworld.id(),"at time",wall_time());
677
678 task->run(subworld,cloud, taskq, element, printdebug(), policy);
679
680 double cpu1=cpu_time();
682 tasktime+=(cpu1-cpu0);
683 if (printdebug()) printf("completed task %3ld after %6.1fs at time %6.1fs\n",element,cpu1-cpu0,wall_time());
684
685 // print progress
686 const std::size_t ntask=taskq.size();
687 // return percentile of ntask for element
688 auto in_percentile = [&ntask](const long element) {
689 return std::floor(element/(0.1*(ntask+1)));
690 };
691 auto is_first_in_percentile = [&](const long element) {
693 };
695 std::cout << int(in_percentile(element)*10) << " " << std::flush;
696 }
697 }
701 if (printprogress() and universe.rank()==0) std::cout << std::endl;
702 cloud_statistics.update(cloud.gather_timings(universe)); // get stats before clearing the cloud
703 double cpu11=cpu_time();
704 if (printlevel>=4) {
705 if (universe.rank()==0) {
708 print("all tasks complete");
709 }
711 }
712 if (printtimings_detail()) {
713 printf("completed taskqueue after %4.1fs at time %4.1fs\n", cpu11 - cpu00, wall_time());
714 printf(" total cpu time / per world %4.1fs %4.1fs\n", tasktime, tasktime / universe.size());
715 }
716 taskq_statistics["elapsed_time"]=cpu11-cpu00;
717 taskq_statistics["cpu_time_per_world"]=tasktime/universe.size();
718 taskq_statistics["total_cpu_time"]=tasktime;
719
720 // cleanup task-persistent input data
721 for (auto& task : taskq) task->cleanup();
722 cloud.clear_cache(subworld);
723 subworld.gop.fence();
724 subworld.gop.fence();
733 // restore targets to their original state
736 subworld.gop.fence();
737 cloud.clear();
739 subworld.gop.fence();
741 }
742
744 for (const auto& t : vtask) {
745 if (universe.rank()==0) t->set_waiting();
747 }
748 }
749
750 void print_taskq() const {
752 if (universe.rank()==0) {
753 print("\ntaskq on universe rank",universe.rank());
754 print("total number of tasks: ",taskq.size());
755 print(" task batch priority status");
756 for (const auto& t : taskq) t->print_me_as_table();
757 }
759 }
760
761private:
762 void add_replicated_task(const std::shared_ptr<MacroTaskBase>& task) {
763 taskq.push_back(task);
764 }
765
766 /// scheduler is located on universe.rank==0
768 long number=0;
769 if (subworld.rank()==0) {
771 number=r.get();
772 }
773 subworld.gop.broadcast_serializable(number, 0);
774 subworld.gop.fence();
775 return number;
776
777 }
778
781 std::lock_guard<std::mutex> lock(taskq_mutex);
782
783 auto is_Waiting = [](const std::shared_ptr<MacroTaskBase>& mtb_ptr) {return mtb_ptr->is_waiting();};
784 auto it=std::find_if(taskq.begin(),taskq.end(),is_Waiting);
785 if (it!=taskq.end()) {
786 it->get()->set_running();
787 long element=it-taskq.begin();
788 return element;
789 }
790// print("could not find task to schedule");
791 return -1;
792 }
793
794 /// scheduler is located on rank==0
795 void set_complete(const long task_number) const {
796 this->task(ProcessID(0), &MacroTaskQ::set_complete_local, task_number);
797 }
798
799 /// scheduler is located on rank==0
800 void set_complete_local(const long task_number) const {
802 taskq[task_number]->set_complete();
803 }
804
805public:
814private:
815 std::size_t size() const {
816 return taskq.size();
817 }
818
819};
820
821
822template<typename taskT>
825
/// type trait: true only for (any instantiation of) std::vector with the default allocator
template<typename>
struct is_vector : std::false_type {};

template<typename ElementT>
struct is_vector<std::vector<ElementT>> : std::true_type {};
832
833 typedef typename taskT::resultT resultT;
834 typedef typename taskT::argtupleT argtupleT;
836
838 bool debug=false;
841 std::shared_ptr<MacroTaskQ> taskq_ptr;
842
843public:
844
845 /// constructor takes the task, but no arguments to the task
850
851 /// constructor takes task and a taskq factory for customization, immediate execution
856
857 /// constructor takes the task, and a taskq, execution is not immediate
858 explicit MacroTask(World &world, taskT &task, std::shared_ptr<MacroTaskQ> taskq_ptr)
860
861 // someone might pass in a taskq nullptr
862 immediate_execution=false; // will be reset by the forwarding constructors
863 if (this->taskq_ptr==0) {
864 this->taskq_ptr=std::make_shared<MacroTaskQ>(MacroTaskQFactory(world));
866 }
867
868 if (debug) this->taskq_ptr->set_printlevel(20);
869 // set the cloud policies
870 auto cloud_storage_policy = MacroTaskInfo::to_cloud_storage_policy(this->taskq_ptr->get_policy().storage_policy);
871 this->taskq_ptr->cloud.set_storing_policy(cloud_storage_policy);
872 this->taskq_ptr->cloud.set_replication_policy(this->taskq_ptr->get_policy().cloud_distribution_policy);
873
874 }
875
876 MacroTask& set_debug(const bool value) {
877 debug=value;
878 return *this;
879 }
880
881 std::shared_ptr<MacroTaskQ> get_taskq() const {
882 return taskq_ptr;
883 }
884
885
886 /// this mimics the original call to the task functor, called from the universe
887
888 /// store all input to the cloud, create output Function<T,NDIM> in the universe,
889 /// create the batched task and shove it into the taskq. Possibly execute the taskq.
890 template<typename ... Ts>
891 resultT operator()(const Ts &... args) {
892
893 auto argtuple = std::tie(args...);
894 static_assert(std::is_same<decltype(argtuple), argtupleT>::value, "type or number of arguments incorrect");
895
896 // partition the argument vector into batches
897 auto partitioner=task.partitioner;
898 if (not partitioner) partitioner.reset(new MacroTaskPartitioner);
899 partitioner->set_nsubworld(world.size());
900 partitionT partition = partitioner->partition_tasks(argtuple);
901
902 if (debug and world.rank()==0) print(taskq_ptr->get_policy());
903
904 recordlistT inputrecords = taskq_ptr->cloud.store(world, argtuple);
905 resultT result = task.allocator(world, argtuple);
906 auto outputrecords =prepare_output_records(taskq_ptr->cloud, result);
907
908 // create tasks and add them to the taskq
910 for (const auto& batch_prio : partition) {
911 vtask.push_back(
912 std::shared_ptr<MacroTaskBase>(new MacroTaskInternal(task, batch_prio, inputrecords, outputrecords)));
913 }
914 taskq_ptr->add_tasks(vtask);
915 if (immediate_execution) taskq_ptr->run_all();
916
917 return result;
918 }
919private:
920
921
922 /// store *pointers* to the result WorldObject in the cloud and return the recordlist
924 if constexpr (is_tuple<resultT>::value) {
926 "tuple has invalid result type in prepare_output_records");
927 } else {
928 static_assert(is_valid_task_result_v<resultT>, "unknown result type in prepare_output_records");
929 }
930
931 if (debug) print("storing pointers to output in cloud");
932 // store an element of the tuple only
933 auto store_output_records = [&](const auto& result) {
934 recordlistT outputrecords;
935 typedef std::decay_t<decltype(result)> argT;
936 if constexpr (is_madness_function<argT>::value) {
937 outputrecords += cloud.store(world, result.get_impl().get()); // store pointer to FunctionImpl
938 } else if constexpr (is_madness_function_vector<argT>::value) {
939 outputrecords += cloud.store(world, get_impl(result));
940 } else if constexpr (is_scalar_result<argT>::value) {
941 outputrecords += cloud.store(world, result.get_impl()); // store pointer to ScalarResultImpl
942 } else if constexpr (is_vector<argT>::value) {
944 // argT = std::vector<ScalarResult<T>>
945 std::vector<std::shared_ptr<typename argT::value_type::implT>> v;
946 for (const auto& ptr : result) v.push_back(ptr.get_impl());
947 outputrecords+=cloud.store(world,v);
948 } else {
949 MADNESS_EXCEPTION("\n\n unknown vector result type in prepare_input ", 1);
950 }
951 } else {
952 MADNESS_EXCEPTION("should not be here",1);
953 }
954 return outputrecords;
955 };
956
957 recordlistT outputrecords;
958 if constexpr (is_tuple<resultT>::value) {
959 // loop over tuple elements -- args is the individual tuple element
960 std::apply([&](auto &&... args) {
961 (( outputrecords+=store_output_records(args) ), ...);
962 }, result);
963 } else {
964 outputrecords=store_output_records(result);
965 }
966 return outputrecords;
967 }
968
969
970 class MacroTaskInternal : public MacroTaskIntermediate<MacroTask> {
971
972 typedef decay_tuple<typename taskT::argtupleT> argtupleT; // removes const, &, etc
973 typedef typename taskT::resultT resultT;
976 public:
978 std::string get_name() const {
979 if (task.name=="unknown_task") return typeid(task).name();
980 return task.name;
981 }
982
983 MacroTaskInternal(const taskT &task, const std::pair<Batch,double> &batch_prio,
986 if constexpr (is_tuple<resultT>::value) {
988 "tuple has invalid result type in prepare_output_records");
989 } else {
990 static_assert(is_valid_task_result_v<resultT>, "unknown result type in prepare_output_records");
991 }
992 this->task.batch=batch_prio.first;
993 this->priority=batch_prio.second;
994 }
995
996
997 void print_me(std::string s="") const override {
998 print("this is task",get_name(),"with batch", task.batch,"priority",this->get_priority());
999 }
1000
1001 void print_me_as_table(std::string s="") const override {
1002 std::stringstream ss;
1003 std::string name=get_name();
1004 std::size_t namesize=std::min(std::size_t(28),name.size());
1005 name += std::string(28-namesize,' ');
1006
1007 std::stringstream ssbatch;
1008 ssbatch << task.batch;
1009 std::string strbatch=ssbatch.str();
1010 int nspaces=std::max(int(0),35-int(ssbatch.str().size()));
1011 strbatch+=std::string(nspaces,' ');
1012
1013 ss << name
1014 << std::setw(10) << strbatch
1016 print(ss.str());
1017 }
1018
1019 /// accumulate the result of the task into the final result living in the universe
1020 template<typename resultT1, std::size_t I=0>
1021 typename std::enable_if<is_tuple<resultT1>::value, void>::type
1023 if constexpr(I < std::tuple_size_v<resultT1>) {
1024 using elementT = typename std::tuple_element<I, resultT>::type;// use decay types for determining a vector
1025 auto element_final=std::get<I>(final_result);
1026 auto element_tmp=std::get<I>(tmp_result);
1029 }
1030 }
1031
1032 /// accumulate the result of the task into the final result living in the universe
1033 template<typename resultT1>
1034 typename std::enable_if<not is_tuple<resultT1>::value, void>::type
1037 // gaxpy can be done in reconstructed or compressed mode
1038 TreeState operating_state=result_tmp.get_impl()->get_tensor_type()==TT_FULL ? compressed : reconstructed;
1039 result_tmp.change_tree_state(operating_state);
1040 gaxpy(1.0,result,1.0, result_tmp);
1041 } else if constexpr(is_madness_function_vector<resultT1>::value) {
1042 TreeState operating_state=result_tmp[0].get_impl()->get_tensor_type()==TT_FULL ? compressed : reconstructed;
1043 change_tree_state(result_tmp,operating_state);
1044 // compress(subworld, result_tmp);
1045 // resultT1 tmp1=task.allocator(subworld,argtuple);
1046 // tmp1=task.batch.template insert_result_batch(tmp1,result_tmp);
1047 gaxpy(1.0,result,1.0,result_tmp,false);
1048 // was using operator+=, but this requires a fence, which is not allowed here..
1049 // result += tmp1;
1050 } else if constexpr (is_scalar_result<resultT1>::value) {
1051 gaxpy(1.0, result, 1.0, result_tmp.get_local(), false);
1052 } else if constexpr (is_scalar_result_vector<resultT1>::value) {
1053 // resultT1 tmp1=task.allocator(subworld,argtuple);
1054 // tmp1=task.batch.template insert_result_batch(tmp1,result_tmp);
1055 std::size_t sz=result.size();
1056 for (size_t i=0; i<sz; ++i) {
1057 gaxpy(1.0, result[i], 1.0, result_tmp[i].get_local(), false);
1058 }
1059 }
1060
1061 }
1062
1063 /// called by the MacroTaskQ when the task is scheduled
1064 void run(World &subworld, Cloud &cloud, MacroTaskBase::taskqT &taskq, const long element, const bool debug,
1065 const MacroTaskInfo policy) override {
1066 io_redirect io(element,get_name()+"_task",debug);
1067 const argtupleT argtuple = cloud.load<argtupleT>(subworld, inputrecords);
1068 argtupleT batched_argtuple = task.batch.copy_input_batch(argtuple);
1069
1070 std::string msg="";
1071 // maybe move this block to the cloud?
1073 double cpu0=wall_time();
1074 Cloud::cloudtimer timer(subworld,cloud.copy_time);
1075 // the functions loaded from the cloud are pointers to the universe functions,
1076 // retrieve the function coefficients from the universe
1077 // aka: turn the current shallow copy into a deep copy
1078 if (debug) print("loading function coefficients from universe for task",get_name());
1079
1080 // loop over the tuple -- copy the functions from the universe to the subworld
1081 auto copi = [&](auto& arg) {
1082 typedef std::decay_t<decltype(arg)> argT;
1083 if constexpr (is_madness_function<argT>::value) {
1084 arg=copy(subworld, arg);
1085 } else if constexpr (is_madness_function_vector<argT>::value) {
1086 for (auto& f : arg) f=copy(subworld,f);
1087 }
1088 };
1089
1091 double cpu1=wall_time();
1092 if (debug) {
1094 print("copied coefficients for task",get_name(),"in",cpu1-cpu0,"seconds");
1095 }
1096 }
1097
1098 try {
1099 print("starting task no",element, ", '",get_name(),"', in subworld",subworld.id(),"at time",wall_time());
1100 double cpu0=cpu_time();
1101 task.subworld_ptr=&subworld; // give the task access to the subworld
1102 resultT result_batch = std::apply(task, batched_argtuple); // lives in the subworld, is a batch of the full vector (if applicable)
1103 double cpu1=cpu_time();
1104 constexpr std::size_t bufsize=256;
1105 char buffer[bufsize];
1106 std::snprintf(buffer,bufsize,"completed task %3ld after %6.1fs at time %6.1fs\n",element,cpu1-cpu0,wall_time());
1107 print(std::string(buffer));
1108
1109 // move the result from the batch to the final result, all still in subworld
1110 auto insert_batch = [&](auto& element1, auto& element2) {
1111 typedef std::decay_t<decltype(element1)> decay_type;;
1112 if constexpr (is_vector<decay_type>::value) {
1113 element1=task.batch.insert_result_batch(element1,element2);
1114 } else {
1115 std::swap(element1,element2);
1116 }
1117 };
1118 resultT result_subworld=task.allocator(subworld,argtuple);
1119 if constexpr (is_tuple<resultT>::value) {
1121 } else {
1122 insert_batch(result_subworld,result_batch);
1123 }
1124
1125 // accumulate the subworld-local results into the final, universe result
1126 resultT result_universe=get_output(subworld, cloud); // lives in the universe
1127
1129
1130 } catch (std::exception& e) {
1131 print("failing task no",element,"in subworld",subworld.id(),"at time",wall_time());
1132 print(e.what());
1134 print("\n\n");
1135 MADNESS_EXCEPTION("failing task",1);
1136 }
1137
1138 };
1139
1140 // this is called after all tasks have been executed and the taskq has ended
1141 void cleanup() override {
1142 }
1143
1144 template<typename T, std::size_t NDIM>
1145 static Function<T,NDIM> pointer2WorldObject(const std::shared_ptr<FunctionImpl<T,NDIM>> impl) {
1146 Function<T,NDIM> result;
1147 result.set_impl(impl);
1148 return result;
1149 }
1150
1151 template<typename T, std::size_t NDIM>
1152 static std::vector<Function<T,NDIM>> pointer2WorldObject(const std::vector<std::shared_ptr<FunctionImpl<T,NDIM>>> v_impl) {
1153 std::vector<Function<T,NDIM>> vresult;
1154 vresult.resize(v_impl.size());
1156 return vresult;
1157 }
1158
1159 template<typename T>
1161 return ScalarResult(sr_impl);
1162 }
1163
1164 template<typename T>
1165 static std::vector<ScalarResult<T>> pointer2WorldObject(const std::vector<std::shared_ptr<ScalarResultImpl<T>>> v_sr_impl) {
1166 std::vector<ScalarResult<T>> vresult(v_sr_impl.size());
1167 for (size_t i=0; i<v_sr_impl.size(); ++i) {
1168 vresult[i].set_impl(v_sr_impl[i]);
1169 }
1170 return vresult;
1171 }
1172
1173 /// return the WorldObjects or the result functions living in the universe
1174
1175 /// read the pointers to the universe WorldObjects from the cloud,
1176 /// convert them to actual WorldObjects and return them
1177 resultT get_output(World &subworld, Cloud &cloud) const {
1178 resultT result;
1179
1180 // save outputrecords, because they will be consumed by the cloud
1181 auto outputrecords1 = this->outputrecords;
1182
1183 // turn an element of the tuple of pointers into an element of the tuple of WorldObjects
1184 auto doit = [&](auto& element) {
1185 typedef std::decay_t<decltype(element)> elementT;
1186
1187 // load the elements from the cloud -- they contain pointers to WorldObjects
1189 typedef typename elementT::value_type::implT implT;
1190 auto ptr_element = cloud.consuming_load<std::vector<std::shared_ptr<implT>>>(
1191 subworld, outputrecords1);
1193 }
1194 else if constexpr (is_madness_function<elementT>::value) {
1195 typedef typename elementT::implT implT;
1196 auto ptr_element = cloud.consuming_load<std::shared_ptr<implT>>(subworld, outputrecords1);
1198 }
1199 else if constexpr (is_scalar_result_vector<elementT>::value) { // std::vector<ScalarResult<T>>
1200 typedef typename elementT::value_type ScalarResultT;
1201 typedef typename ScalarResultT::implT implT;
1202 typedef std::vector<std::shared_ptr<implT>> vptrT;
1203 auto ptr_element = cloud.consuming_load<vptrT>(subworld, outputrecords1);
1205 }
1206 else if constexpr (is_scalar_result<elementT>::value) {
1207 // elementT is a ScalarResultImpl<T>
1208 // in cloud we store a std::shared_ptr<ScalarResultImpl<T>>
1209 auto ptr_element = cloud.consuming_load<std::shared_ptr<typename elementT::implT>>(subworld, outputrecords1);
1211 }
1212 else {
1213 MADNESS_EXCEPTION("confused about the type of the result", 1);
1214 }
1215 };
1216 if constexpr (is_tuple<resultT>::value) {
1218 "invalid tuple task result -- must be vectors of functions");
1219 static_assert(is_tuple<resultT>::value, "is a tuple");
1220
1221 // loop over all tuple elements
1222 // 1. load the pointers to the WorldObjects living in the universe
1223 // 2. create WorldObjects from the pointers and copy them into the tuple of type resultT
1224
1225 // turn the tuple of pointers into a tuple of WorldObjects
1226 unary_tuple_loop(result,doit);
1227
1228
1229 } else {
1230 doit(result);
1231
1232 }
1233 return result;
1234 }
1235
1236 };
1237
1238};
1239
1241public:
1244 std::string name="unknown_task";
1245 std::shared_ptr<MacroTaskPartitioner> partitioner=0;
1247};
1248
1249
1250} /* namespace madness */
1251
1252#endif /* SRC_MADNESS_MRA_MACROTASKQ_H_ */
Wrapper around MPI_Comm. Has a shallow copy constructor; use Create(Get_group()) for deep copy.
Definition safempi.h:497
Intracomm Split(int Color, int Key=0) const
Definition safempi.h:642
a batch consists of a 2D-input batch and a 1D-output batch: K-batch <- (I-batch, J-batch)
Definition macrotaskpartitioner.h:124
cloud class
Definition cloud.h:178
void clear()
Definition cloud.h:468
void replicate_per_node(const std::size_t chunk_size=INT_MAX)
Definition cloud.h:606
nlohmann::json get_statistics(World &world) const
return a json object with the cloud settings and statistics
Definition cloud.h:326
recordlistT store(madness::World &world, const T &source)
Definition cloud.h:566
std::atomic< long > target_replication_time
Definition cloud.h:693
nlohmann::json gather_timings(World &universe) const
Definition cloud.h:373
void replicate(const std::size_t chunk_size=INT_MAX)
Definition cloud.h:626
void print_size(World &universe)
Definition cloud.h:300
T load(madness::World &world, const recordlistT recordlist) const
load a single object from the cloud, recordlist is kept unchanged
Definition cloud.h:526
std::list< WorldObjectBase * > world_object_base_list
Definition cloud.h:230
static void print_memory_statistics(const nlohmann::json stats)
Definition cloud.h:442
DistributionType get_replication_policy() const
is the cloud container replicated: per rank, per node, or distributed
Definition cloud.h:275
void clear_cache(World &subworld)
Definition cloud.h:462
std::atomic< long > copy_time
Definition cloud.h:692
void set_replication_policy(const DistributionType value)
is the cloud container replicated: per rank, per node, or distributed
Definition cloud.h:268
void distribute_targets(const DistributionType dt=Distributed)
distribute/node/rank replicate the targets of all world objects stored in the cloud
Definition cloud.h:509
void print_timings(World &universe) const
backwards compatibility
Definition cloud.h:412
void set_storing_policy(const StoragePolicy value)
storing policy refers to storing functions or pointers to functions
Definition cloud.h:291
StoragePolicy
Definition cloud.h:194
@ StoreFunctionPointer
Definition cloud.h:197
@ StoreFunction
Definition cloud.h:195
T consuming_load(madness::World &world, recordlistT &recordlist) const
similar to load, but will consume the recordlist
Definition cloud.h:539
static void set_default_pmap(World &world)
Definition mraimpl.h:3562
static std::shared_ptr< WorldDCPmapInterface< Key< NDIM > > > & get_pmap()
Returns the default process map that was last initialized via set_default_pmap()
Definition funcdefaults.h:390
static void set_pmap(const std::shared_ptr< WorldDCPmapInterface< Key< NDIM > > > &value)
Sets the default process map (does not redistribute existing functions)
Definition funcdefaults.h:421
FunctionImpl holds all Function state to facilitate shallow copy semantics.
Definition funcimpl.h:945
A multiresolution adaptive numerical function.
Definition mra.h:139
void set_impl(const std::shared_ptr< FunctionImpl< T, NDIM > > &impl)
Replace current FunctionImpl with provided new one.
Definition mra.h:666
A future is a possibly yet unevaluated value.
Definition future.h:369
T & get(bool dowork=true) &
Gets the value, waiting if necessary.
Definition future.h:570
base class
Definition macrotaskq.h:420
void set_running()
Definition macrotaskq.h:432
virtual ~MacroTaskBase()
Definition macrotaskq.h:426
void set_waiting()
Definition macrotaskq.h:433
MacroTaskBase()
Definition macrotaskq.h:425
virtual void cleanup()=0
virtual void print_me(std::string s="") const
Definition macrotaskq.h:443
std::string print_priority_and_status_to_string() const
Definition macrotaskq.h:449
void set_complete()
Definition macrotaskq.h:431
double priority
Definition macrotaskq.h:428
bool is_complete() const
Definition macrotaskq.h:435
Status
Definition macrotaskq.h:429
@ Complete
Definition macrotaskq.h:429
@ Running
Definition macrotaskq.h:429
@ Unknown
Definition macrotaskq.h:429
@ Waiting
Definition macrotaskq.h:429
bool is_running() const
Definition macrotaskq.h:436
enum madness::MacroTaskBase::Status stat
double get_priority() const
Definition macrotaskq.h:455
void set_priority(const double p)
Definition macrotaskq.h:456
virtual void print_me_as_table(std::string s="") const
Definition macrotaskq.h:446
virtual void run(World &world, Cloud &cloud, taskqT &taskq, const long element, const bool debug, const MacroTaskInfo policy)=0
std::vector< std::shared_ptr< MacroTaskBase > > taskqT
Definition macrotaskq.h:423
friend std::ostream & operator<<(std::ostream &os, const MacroTaskBase::Status s)
Definition macrotaskq.h:458
bool is_waiting() const
Definition macrotaskq.h:437
Definition macrotaskq.h:469
MacroTaskIntermediate()
Definition macrotaskq.h:473
void cleanup()
Definition macrotaskq.h:477
~MacroTaskIntermediate()
Definition macrotaskq.h:475
Definition macrotaskq.h:1240
MacroTaskOperationBase()
Definition macrotaskq.h:1246
Batch batch
Definition macrotaskq.h:1242
World * subworld_ptr
Definition macrotaskq.h:1243
std::shared_ptr< MacroTaskPartitioner > partitioner
Definition macrotaskq.h:1245
std::string name
Definition macrotaskq.h:1244
partition one (two) vectors into 1D (2D) batches.
Definition macrotaskpartitioner.h:182
std::list< std::pair< Batch, double > > partitionT
Definition macrotaskpartitioner.h:186
Factory for the MacroTaskQ.
Definition macrotaskq.h:482
MacroTaskQFactory & set_storage_policy(const MacroTaskInfo::StoragePolicy sp)
Definition macrotaskq.h:511
World & world
Definition macrotaskq.h:485
MacroTaskQFactory & set_policy(const MacroTaskInfo p)
Definition macrotaskq.h:506
MacroTaskQFactory & set_nworld(const long n)
Definition macrotaskq.h:492
MacroTaskQFactory & preset(const std::string name)
Definition macrotaskq.h:502
MacroTaskQFactory & set_cloud_distribution_policy(const DistributionType dp)
Definition macrotaskq.h:516
MacroTaskQFactory & set_printlevel(const long p)
Definition macrotaskq.h:497
long printlevel
Definition macrotaskq.h:484
MacroTaskQFactory & set_ptr_target_distribution_policy(const DistributionType dp)
Definition macrotaskq.h:521
long nworld
Definition macrotaskq.h:486
MacroTaskInfo policy
Definition macrotaskq.h:488
MacroTaskQFactory(World &universe)
Definition macrotaskq.h:490
Definition macrotaskq.h:530
long nsubworld
Definition macrotaskq.h:537
std::mutex taskq_mutex
Definition macrotaskq.h:535
bool printdebug() const
Definition macrotaskq.h:551
static void set_pmap(World &world)
Definition macrotaskq.h:806
bool printtimings_detail() const
Definition macrotaskq.h:554
void run_all()
run all tasks
Definition macrotaskq.h:611
void set_complete(const long task_number) const
scheduler is located on rank==0
Definition macrotaskq.h:795
World & universe
Definition macrotaskq.h:532
std::shared_ptr< WorldDCPmapInterface< Key< 1 > > > pmap1
set the process map for the subworld
Definition macrotaskq.h:544
std::shared_ptr< WorldDCPmapInterface< Key< 3 > > > pmap3
Definition macrotaskq.h:546
MacroTaskBase::taskqT taskq
Definition macrotaskq.h:534
MacroTaskInfo get_policy() const
Definition macrotaskq.h:563
nlohmann::json cloud_statistics
save cloud statistics after run_all()
Definition macrotaskq.h:538
void add_tasks(MacroTaskBase::taskqT &vtask)
Definition macrotaskq.h:743
long get_scheduled_task_number(World &subworld)
scheduler is located on universe.rank==0
Definition macrotaskq.h:767
void set_complete_local(const long task_number) const
scheduler is located on rank==0
Definition macrotaskq.h:800
World & get_subworld()
Definition macrotaskq.h:559
MacroTaskQ(const MacroTaskQFactory factory)
create an empty taskq and initialize the subworlds
Definition macrotaskq.h:576
std::shared_ptr< WorldDCPmapInterface< Key< 4 > > > pmap4
Definition macrotaskq.h:547
nlohmann::json get_taskq_statistics() const
Definition macrotaskq.h:571
std::size_t size() const
Definition macrotaskq.h:815
std::shared_ptr< WorldDCPmapInterface< Key< 5 > > > pmap5
Definition macrotaskq.h:548
void print_taskq() const
Definition macrotaskq.h:750
std::shared_ptr< World > subworld_ptr
Definition macrotaskq.h:533
bool printtimings() const
Definition macrotaskq.h:553
std::shared_ptr< WorldDCPmapInterface< Key< 2 > > > pmap2
Definition macrotaskq.h:545
bool printprogress() const
Definition macrotaskq.h:552
std::shared_ptr< WorldDCPmapInterface< Key< 6 > > > pmap6
Definition macrotaskq.h:549
nlohmann::json get_cloud_statistics() const
Definition macrotaskq.h:567
void set_printlevel(const long p)
Definition macrotaskq.h:561
long printlevel
Definition macrotaskq.h:536
madness::Cloud cloud
Definition macrotaskq.h:558
long get_nsubworld() const
Definition macrotaskq.h:560
void add_replicated_task(const std::shared_ptr< MacroTaskBase > &task)
Definition macrotaskq.h:762
~MacroTaskQ()
Definition macrotaskq.h:594
static std::shared_ptr< World > create_worlds(World &universe, const std::size_t nsubworld)
Definition macrotaskq.h:598
nlohmann::json taskq_statistics
save taskq statistics after run_all()
Definition macrotaskq.h:539
long get_scheduled_task_number_local()
Definition macrotaskq.h:779
const MacroTaskInfo policy
storage and distribution policy
Definition macrotaskq.h:541
Definition macrotaskq.h:970
void run(World &subworld, Cloud &cloud, MacroTaskBase::taskqT &taskq, const long element, const bool debug, const MacroTaskInfo policy) override
called by the MacroTaskQ when the task is scheduled
Definition macrotaskq.h:1064
taskT task
Definition macrotaskq.h:977
MacroTaskInternal(const taskT &task, const std::pair< Batch, double > &batch_prio, const recordlistT &inputrecords, const recordlistT &outputrecords)
Definition macrotaskq.h:983
std::enable_if< notis_tuple< resultT1 >::value, void >::type accumulate_into_final_result(World &subworld, resultT1 &result, const resultT1 &result_tmp, const argtupleT &argtuple)
accumulate the result of the task into the final result living in the universe
Definition macrotaskq.h:1035
resultT get_output(World &subworld, Cloud &cloud) const
return the WorldObjects or the result functions living in the universe
Definition macrotaskq.h:1177
static ScalarResult< T > pointer2WorldObject(const std::shared_ptr< ScalarResultImpl< T > > sr_impl)
Definition macrotaskq.h:1160
decay_tuple< typename taskT::argtupleT > argtupleT
Definition macrotaskq.h:972
static std::vector< Function< T, NDIM > > pointer2WorldObject(const std::vector< std::shared_ptr< FunctionImpl< T, NDIM > > > v_impl)
Definition macrotaskq.h:1152
void cleanup() override
Definition macrotaskq.h:1141
taskT::resultT resultT
Definition macrotaskq.h:973
recordlistT outputrecords
Definition macrotaskq.h:975
std::string get_name() const
Definition macrotaskq.h:978
static Function< T, NDIM > pointer2WorldObject(const std::shared_ptr< FunctionImpl< T, NDIM > > impl)
Definition macrotaskq.h:1145
static std::vector< ScalarResult< T > > pointer2WorldObject(const std::vector< std::shared_ptr< ScalarResultImpl< T > > > v_sr_impl)
Definition macrotaskq.h:1165
void print_me_as_table(std::string s="") const override
Definition macrotaskq.h:1001
void print_me(std::string s="") const override
Definition macrotaskq.h:997
std::enable_if< is_tuple< resultT1 >::value, void >::type accumulate_into_final_result(World &subworld, resultT1 &final_result, const resultT1 &tmp_result, const argtupleT &argtuple)
accumulate the result of the task into the final result living in the universe
Definition macrotaskq.h:1022
recordlistT inputrecords
Definition macrotaskq.h:974
Definition macrotaskq.h:823
MacroTask & set_debug(const bool value)
Definition macrotaskq.h:876
MacroTask(World &world, taskT &task)
constructor takes the task, but no arguments to the task
Definition macrotaskq.h:846
MacroTask(World &world, taskT &task, const MacroTaskQFactory factory)
constructor takes task and a taskq factory for customization, immediate execution
Definition macrotaskq.h:852
taskT::resultT resultT
Definition macrotaskq.h:833
std::shared_ptr< MacroTaskQ > taskq_ptr
Definition macrotaskq.h:841
taskT::argtupleT argtupleT
Definition macrotaskq.h:834
bool immediate_execution
Definition macrotaskq.h:839
World & world
Definition macrotaskq.h:840
std::shared_ptr< MacroTaskQ > get_taskq() const
Definition macrotaskq.h:881
resultT operator()(const Ts &... args)
this mimics the original call to the task functor, called from the universe
Definition macrotaskq.h:891
bool debug
Definition macrotaskq.h:838
taskT task
Definition macrotaskq.h:837
MacroTaskPartitioner::partitionT partitionT
Definition macrotaskq.h:824
Cloud::recordlistT recordlistT
Definition macrotaskq.h:835
recordlistT prepare_output_records(Cloud &cloud, resultT &result)
store pointers to the result WorldObject in the cloud and return the recordlist
Definition macrotaskq.h:923
MacroTask(World &world, taskT &task, std::shared_ptr< MacroTaskQ > taskq_ptr)
constructor takes the task, and a taskq, execution is not immediate
Definition macrotaskq.h:858
static void measure_and_print(World &world)
measure the memory usage of all objects of all worlds
Definition memory_measurement.h:21
helper class for returning the result of a task, which is not a madness Function, but a simple scalar
Definition macrotaskq.h:55
void gaxpy(const double a, const T &right, double b, const bool fence=true)
accumulate, optional fence
Definition macrotaskq.h:84
void serialize(Archive &ar)
Definition macrotaskq.h:92
T get()
after completion of the taskq get the final value
Definition macrotaskq.h:97
ScalarResultImpl< T > & operator=(const T &x)
simple assignment of the scalar value
Definition macrotaskq.h:73
ScalarResultImpl(const ScalarResultImpl &other)=delete
Disable the default copy constructor.
ScalarResultImpl< T > & operator=(const ScalarResultImpl< T > &other)=delete
disable assignment operator
ScalarResultImpl< T > & operator+=(const T &x)
Definition macrotaskq.h:78
~ScalarResultImpl()
Definition macrotaskq.h:69
T value
the scalar value
Definition macrotaskq.h:110
T get_local() const
Definition macrotaskq.h:104
T value_type
Definition macrotaskq.h:57
ScalarResultImpl(World &world)
Definition macrotaskq.h:58
Definition macrotaskq.h:114
void serialize(Archive &ar)
Definition macrotaskq.h:145
ScalarResult(const std::shared_ptr< implT > &impl)
Definition macrotaskq.h:121
void gaxpy(const double a, const T &right, double b, const bool fence=true)
accumulate, optional fence
Definition macrotaskq.h:140
ScalarResultImpl< T > implT
Definition macrotaskq.h:116
T get()
after completion of the taskq get the final value
Definition macrotaskq.h:150
ScalarResult(World &world)
Definition macrotaskq.h:120
std::shared_ptr< implT > impl
Definition macrotaskq.h:117
ScalarResult & operator=(const T &x)
Definition macrotaskq.h:122
std::shared_ptr< implT > get_impl() const
Definition macrotaskq.h:127
void set_impl(const std::shared_ptr< implT > &newimpl)
Definition macrotaskq.h:131
T get_local() const
after completion of the taskq get the final value
Definition macrotaskq.h:155
uniqueidT id() const
Definition macrotaskq.h:135
void broadcast_serializable(objT &obj, ProcessID root)
Broadcast a serializable object.
Definition worldgop.h:756
void fence(bool debug=false)
Synchronizes all processes in communicator AND globally ensures no pending AM or tasks.
Definition worldgop.cc:161
bool set_forbid_fence(bool value)
Set forbid_fence flag to new value and return old value.
Definition worldgop.h:676
void sum(T *buf, size_t nelem)
Inplace global sum while still processing AM & tasks.
Definition worldgop.h:872
SafeMPI::Intracomm & comm()
Returns the associated SafeMPI communicator.
Definition worldmpi.h:286
Implements most parts of a globally addressable object (via unique ID).
Definition world_object.h:366
World & get_world() const
Returns a reference to the world.
Definition world_object.h:719
World & world
The World this object belongs to. (Think globally, act locally).
Definition world_object.h:383
void process_pending()
To be called from derived constructor to process pending messages.
Definition world_object.h:658
detail::task_result_type< memfnT >::futureT send(ProcessID dest, memfnT memfn) const
Definition world_object.h:733
detail::task_result_type< memfnT >::futureT task(ProcessID dest, memfnT memfn, const TaskAttributes &attr=TaskAttributes()) const
Sends task to derived class method returnT (this->*memfn)().
Definition world_object.h:1007
A parallel world class.
Definition world.h:132
static World * world_from_id(std::uint64_t id)
Convert a World ID to a World pointer.
Definition world.h:492
ProcessID rank() const
Returns the process rank in this World (same as MPI_Comm_rank()).
Definition world.h:320
WorldMpiInterface & mpi
MPI interface.
Definition world.h:204
ProcessID size() const
Returns the number of processes in this World (same as MPI_Comm_size()).
Definition world.h:330
unsigned long id() const
Definition world.h:315
WorldGopInterface & gop
Global operations.
Definition world.h:207
std::optional< T * > ptr_from_id(uniqueidT id) const
Look up a local pointer from a world-wide unique ID.
Definition world.h:416
Class for unique global IDs.
Definition uniqueid.h:53
Declares the Cloud class for storing data and transferring it between worlds.
char * p(char *buf, const char *name, int k, int initial_level, double thresh, int order)
Definition derivatives.cc:72
const std::size_t bufsize
Definition derivatives.cc:16
static bool debug
Definition dirac-hatom.cc:16
auto T(World &world, response_space &f) -> response_space
Definition global_functions.cc:28
Tensor< typename Tensor< T >::scalar_type > arg(const Tensor< T > &t)
Return a new tensor holding the argument of each element of t (complex types only)
Definition tensor.h:2518
static const double v
Definition hatom_sf_dirac.cc:20
#define MADNESS_EXCEPTION(msg, value)
Macro for throwing a MADNESS exception.
Definition madness_exception.h:119
#define MADNESS_ASSERT(condition)
Assert a condition that should be free of side-effects since in release builds this might be a no-op.
Definition madness_exception.h:134
#define MADNESS_CHECK_THROW(condition, msg)
Check a condition — even in a release build the condition is always evaluated, so it can have side effects.
Definition madness_exception.h:207
Namespace for all elements and tools of MADNESS.
Definition DFParameters.h:10
std::vector< ScalarResult< T > > scalar_result_vector(World &world, std::size_t n)
helper function to create a vector of ScalarResultImpl, circumventing problems with the constructors
Definition macrotaskq.h:163
std::ostream & operator<<(std::ostream &os, const particle< PDIM > &p)
Definition lowrankfunction.h:401
static double cpu_time()
Returns the cpu time in seconds relative to an arbitrary origin.
Definition timers.h:127
DistributionType
some introspection of how data is distributed
Definition worlddc.h:81
@ NodeReplicated
even if there are several ranks per node
Definition worlddc.h:84
@ Distributed
no replication of the container, the container is distributed over the world
Definition worlddc.h:82
@ RankReplicated
replicate the container over all world ranks
Definition worlddc.h:83
void set_impl(std::vector< Function< T, NDIM > > &v, const std::vector< std::shared_ptr< FunctionImpl< T, NDIM > > > vimpl)
Definition vmra.h:688
std::vector< std::shared_ptr< FunctionImpl< T, NDIM > > > get_impl(const std::vector< Function< T, NDIM > > &v)
Definition vmra.h:681
TreeState
Definition funcdefaults.h:59
@ reconstructed
s coeffs at the leaves only
Definition funcdefaults.h:60
@ compressed
d coeffs in internal nodes, s and d coeffs at the root
Definition funcdefaults.h:61
constexpr bool check_tuple_is_valid_task_result()
given a tuple check recursively if all elements are valid task results
Definition macrotaskq.h:222
static const Slice _(0,-1, 1)
static void binary_tuple_loop(tupleT &tuple1, tupleR &tuple2, opT &op)
loop over the tuple elements of both tuples and execute the operation op on each element pair
Definition type_traits.h:742
void print(const T &t, const Ts &... ts)
Print items to std::cout (items separated by spaces) and terminate with a new line.
Definition print.h:226
static void unary_tuple_loop(tupleT &tuple, opT &op)
loop over a tuple and apply unary operator op to each element
Definition type_traits.h:732
@ TT_FULL
Definition gentensor.h:120
NDIM & f
Definition mra.h:2528
constexpr bool is_valid_task_result_v
check if type is a valid task result: it must be a WorldObject and must implement gaxpy
Definition macrotaskq.h:208
const Function< T, NDIM > & change_tree_state(const Function< T, NDIM > &f, const TreeState finalstate, bool fence=true)
change tree state of a function
Definition mra.h:2854
decltype(decay_types(std::declval< T >())) decay_tuple
Definition macrotaskpartitioner.h:22
double wall_time()
Returns the wall time in seconds relative to an arbitrary origin.
Definition timers.cc:48
std::string type(const PairType &n)
Definition PNOParameters.h:18
static XNonlinearSolver< std::vector< Function< T, NDIM > >, T, vector_function_allocator< T, NDIM > > nonlinear_vector_solver(World &world, const long nvec)
Definition nonlinsol.h:371
std::string name(const FuncType &type, const int ex=-1)
Definition ccpairfunction.h:28
Function< T, NDIM > copy(const Function< T, NDIM > &f, const std::shared_ptr< WorldDCPmapInterface< Key< NDIM > > > &pmap, bool fence=true)
Create a new copy of the function with different distribution and optional fence.
Definition mra.h:2096
void gaxpy(const double a, ScalarResult< T > &left, const double b, const T &right, const bool fence=true)
the result type of a macrotask must implement gaxpy
Definition macrotaskq.h:244
Definition mraimpl.h:51
static const double b
Definition nonlinschro.cc:119
static const double a
Definition nonlinschro.cc:118
Definition cloud.h:712
Definition macrotaskq.h:280
friend std::string to_string(const MacroTaskInfo::StoragePolicy sp)
Definition macrotaskq.h:384
StoragePolicy
Definition macrotaskq.h:281
@ StoreFunction
store a madness function in the cloud – can have a large memory impact
Definition macrotaskq.h:282
@ StorePointerToFunction
Definition macrotaskq.h:283
@ StoreFunctionViaPointer
coefficients to the subworlds when the task is started. This is the default policy.
Definition macrotaskq.h:286
static std::vector< MacroTaskInfo > get_all_presets()
helper function to return all presets
Definition macrotaskq.h:343
StoragePolicy storage_policy
Definition macrotaskq.h:372
DistributionType ptr_target_distribution_policy
Definition macrotaskq.h:374
static MacroTaskInfo preset(const std::string name)
Definition macrotaskq.h:313
static std::vector< std::string > get_all_preset_names()
Definition macrotaskq.h:338
static Cloud::StoragePolicy to_cloud_storage_policy(MacroTaskInfo::StoragePolicy policy)
given the MacroTask's storage policy return the corresponding Cloud storage policy
Definition macrotaskq.h:298
DistributionType cloud_distribution_policy
Definition macrotaskq.h:373
friend std::ostream & operator<<(std::ostream &os, const MacroTaskInfo policy)
Definition macrotaskq.h:376
bool check_consistency() const
make sure the policies are consistent
Definition macrotaskq.h:352
friend std::ostream & operator<<(std::ostream &os, const StoragePolicy sp)
Definition macrotaskq.h:290
static StoragePolicy policy_to_string(const std::string policy)
Definition macrotaskq.h:390
nlohmann::json to_json() const
Definition macrotaskq.h:400
Definition macrotaskq.h:827
Definition cloud.h:65
Default load of an object via serialize(ar, t).
Definition archive.h:667
Default store of an object via serialize(ar, t).
Definition archive.h:612
static std::string tolower(std::string s)
make lower case
Definition commandlineparser.h:86
class to temporarily redirect output to cout
Definition print.h:277
RAII class to redirect cout to a file.
Definition print.h:251
Definition type_traits.h:756
Definition mra.h:2902
Definition macrotaskq.h:193
Definition macrotaskq.h:178
Definition macrotaskq.h:172
Definition macrotaskq.h:199
Definition macrotaskq.h:187
Definition macrotaskq.h:217
static void load(const Archive &ar, std::shared_ptr< ScalarResultImpl< T > > &ptr)
Definition macrotaskq.h:260
static void store(const Archive &ar, const std::shared_ptr< ScalarResultImpl< T > > &ptr)
Definition macrotaskq.h:250
Definition timing_utilities.h:9
static const double_complex I
Definition tdse1d.cc:164
void doit(World &world)
Definition tdse.cc:921
void e()
Definition test_sig.cc:75
Declares the World class for the parallel runtime environment.
int ProcessID
Used to clearly identify process number/rank.
Definition worldtypes.h:43