MADNESS 0.10.1
macrotaskq.h
Go to the documentation of this file.
1/**
2 \file macrotaskq.h
3 \brief Declares the \c macrotaskq and MacroTaskBase classes
4 \ingroup mra
5
6 A MacroTaskq executes tasks on World objects, e.g. differentiation of a function or other
7 arithmetic. Complex algorithms can be implemented.
8
9 The universe world is split into subworlds, each of which executes macrotasks from the task queue.
10 This improves locality and gives better speedups on large numbers of compute nodes by reducing
11 communication within worlds.
12
13 The user defines a macrotask (an example is found in test_vectormacrotask.cc). The tasks are
14 lightweight and carry only bookkeeping information; the actual input and output are stored in a
15 cloud (see cloud.h).
16
17 The user-defined macrotask is derived from MacroTaskIntermediate and must implement the run()
18 method. A heterogeneous task queue is possible.
19
20 The result of a macrotask is an object that lives in the universe and that is accessible from all
21 subworlds. The result is accumulated in the universe and must therefore be a WorldObject. Currently we
22 have implemented
23 - Function<T,NDIM>
24 - std::vector<Function<T,NDIM>> (a vector of Function<T,NDIM>)
25 - ScalarResultImpl<T> (a scalar value)
26 - std::vector<std::shared_ptr<ScalarResultImpl<T>>> (a vector of scalar values), shared_ptr for technical reasons
27
28 - std::tuple<std::vector<XXX>, std::vector<YYY>> (a tuple of n vectors of WorldObjects: XXX, YYY, .. = {Function, ScalarResultImpl, ...})
29
30
31 TODO: priority q
32 TODO: task submission from inside task (serialize task instead of replicate)
33 TODO: update documentation
34 TODO: consider serializing task member variables
35
36*/
37
38
39
40#ifndef SRC_MADNESS_MRA_MACROTASKQ_H_
41#define SRC_MADNESS_MRA_MACROTASKQ_H_
42
43#include <madness/world/cloud.h>
44#include <madness/world/world.h>
47
48namespace madness {
49
50/// helper class for returning the result of a task, which is not a madness Function, but a simple scalar
51
52/// the result value is accumulated via gaxpy in universe rank=0, after completion of the taskq the final
53/// value can be obtained via get(), which includes a broadcast of the final value to all processes
54template<typename T>
55class ScalarResultImpl : public WorldObject<ScalarResultImpl<T>> {
56public:
57 typedef T value_type;
61
62 /// Disable the default copy constructor
63 ScalarResultImpl(const ScalarResultImpl& other) = delete;
64 // ScalarResultImpl<T>(ScalarResultImpl<T>&& ) = default;
65
66 /// disable assignment operator
68
71
72 /// simple assignment of the scalar value
74 value = x;
75 return *this;
76 }
77
79 gaxpy(1.0, x, 1.0,true);
80 return *this;
81 }
82
83 /// accumulate, optional fence
84 void gaxpy(const double a, const T& right, double b, const bool fence=true) {
85 if (this->get_world().rank()==0) {
86 value =a*value + b * right;
87 }
88 else this->send(0, &ScalarResultImpl<T>::gaxpy, a, right, b, fence);
89 }
90
91 template<typename Archive>
92 void serialize(Archive &ar) {
93 ar & value;
94 }
95
96 /// after completion of the taskq get the final value
97 T get() {
98 this->get_world().gop.broadcast_serializable(*this, 0);
99 return value;
100 }
101
102 /// get the local value of this rank, which might differ for different ranks
103 /// for the final value use get()
104 T get_local() const {
105 return value;
106 }
107
108private:
109 /// the scalar value
111};
112
113template<typename T=double>
115public:
117 std::shared_ptr<implT> impl;
118
119 ScalarResult() = default;
120 ScalarResult(World &world) : impl(new implT(world)) {}
121 ScalarResult(const std::shared_ptr<implT>& impl) : impl(impl) {}
123 *(this->impl) = x;
124 return *this;
125 }
126
127 std::shared_ptr<implT> get_impl() const {
128 return impl;
129 }
130
131 void set_impl(const std::shared_ptr<implT>& newimpl) {
132 impl=newimpl;
133 }
134
135 uniqueidT id() const {
136 return impl->id();
137 }
138
139 /// accumulate, optional fence
140 void gaxpy(const double a, const T& right, double b, const bool fence=true) {
141 impl->gaxpy(a,right,b,fence);
142 }
143
144 template<typename Archive>
145 void serialize(Archive &ar) {
146 ar & impl;
147 }
148
149 /// after completion of the taskq get the final value
150 T get() {
151 return impl->get();
152 }
153
154 /// after completion of the taskq get the final value
155 T get_local() const {
156 return impl->get_local();
157 }
158
159};
160
161/// helper function to create a vector of ScalarResultImpl, circumventing problems with the constructors
162template<typename T>
163std::vector<ScalarResult<T>> scalar_result_vector(World& world, std::size_t n) {
164 std::vector<ScalarResult<T>> v;
165 for (std::size_t i=0; i<n; ++i) v.emplace_back(ScalarResult<T>(world));
166 return v;
167}
168
169
170// type traits to check if a template parameter is a WorldContainer
171template<typename>
172struct is_scalar_result_ptr : std::false_type {};
173
174template <typename T>
175struct is_scalar_result_ptr<std::shared_ptr<madness::ScalarResultImpl<T>>> : std::true_type {};
176
177template<typename>
178struct is_scalar_result_ptr_vector : std::false_type {
179};
180
181template<typename T>
182struct is_scalar_result_ptr_vector<std::vector<std::shared_ptr<typename madness::ScalarResultImpl<T>>>> : std::true_type {
183};
184
185// type traits to check if a template parameter is a WorldContainer
186template<typename>
187struct is_scalar_result : std::false_type {};
188
189template <typename T>
190struct is_scalar_result<madness::ScalarResult<T>> : std::true_type {};
191
192template<typename>
193struct is_scalar_result_impl : std::false_type {};
194
195template <typename T>
196struct is_scalar_result<madness::ScalarResultImpl<T>> : std::true_type {};
197
198template<typename>
199struct is_scalar_result_vector : std::false_type {
200};
201
202template<typename T>
203struct is_scalar_result_vector<std::vector<typename madness::ScalarResult<T>>> : std::true_type {
204};
205
206/// check if type is a valid task result: it must be a WorldObject and must implement gaxpy
207template <typename T>
208inline constexpr bool is_valid_task_result_v =
209 is_madness_function<T>::value // Function<T,NDIM>
210 || is_madness_function_vector<T>::value // std::vector<Function<T,NDIM>>
211 || is_scalar_result<T>::value // ScalarResultImpl<T>
212 || is_scalar_result_vector<T>::value // std::vector<std::shared_ptr<ScalarResultImpl<T>>>
213 || is_scalar_result_ptr<T>::value // ScalarResultImpl<T>
214 || is_scalar_result_ptr_vector<T>::value; // std::vector<std::shared_ptr<ScalarResultImpl<T>>>
215
216
/// type trait: true if and only if the template parameter is a std::tuple
template<typename>
struct is_tuple : std::false_type {
};

template<typename... elementTs>
struct is_tuple<std::tuple<elementTs...>> : std::true_type {
};
219
220/// given a tuple check recursively if all elements are valid task results
221template<typename tupleT, std::size_t I>
223
224 typedef decay_tuple <tupleT> argtupleT; // removes const, &, etc
225
226 if constexpr(I >= std::tuple_size_v<tupleT>) {
227 // Last case, if nothing is left to iterate, then exit the function
228 return true;
229 } else {
230 using typeT = typename std::tuple_element<I, argtupleT>::type;// use decay types for determining a vector
231 if constexpr (not is_valid_task_result_v<typeT>) {
232 return false;
233 } else {
234 // Going for next element.
235 return check_tuple_is_valid_task_result<tupleT,I+1>();
236 }
237 }
238}
239
240
241
242/// the result type of a macrotask must implement gaxpy
243template<typename T>
244void gaxpy(const double a, ScalarResult<T>& left, const double b, const T& right, const bool fence=true) {
245 left.gaxpy(a, right, b, fence);
246}
247
248template <class Archive, typename T>
249struct madness::archive::ArchiveStoreImpl<Archive, std::shared_ptr<ScalarResultImpl<T>>> {
250 static void store(const Archive& ar, const std::shared_ptr<ScalarResultImpl<T>>& ptr) {
251 bool exists=(ptr) ? true : false;
252 ar & exists;
253 if (exists) ar & ptr->id();
254 }
255};
256
257
258template <class Archive, typename T>
259struct madness::archive::ArchiveLoadImpl<Archive, std::shared_ptr<ScalarResultImpl<T>>> {
260 static void load(const Archive& ar, std::shared_ptr<ScalarResultImpl<T>>& ptr) {
261 bool exists=false;
262 ar & exists;
263 if (exists) {
264 uniqueidT id;
265 ar & id;
266 World* world = World::world_from_id(id.get_world_id());
267 MADNESS_ASSERT(world);
268 auto ptr_opt = (world->ptr_from_id< ScalarResultImpl<T> >(id));
269 if (!ptr_opt)
270 MADNESS_EXCEPTION("ScalarResultImpl: remote operation attempting to use a locally uninitialized object",0);
271 ptr.reset(ptr_opt.value(), [] (ScalarResultImpl<T> *p_) -> void {}); // disable destruction
272 if (!ptr)
273 MADNESS_EXCEPTION("ScalarResultImpl<T> operation attempting to use an unregistered object",0);
274 } else {
275 ptr=nullptr;
276 }
277 }
278};
279
282 StoreFunction, ///< store a madness function in the cloud -- can have a large memory impact
283 StorePointerToFunction, ///< store the pointer to the function in the cloud, the actual function lives in the universe and
284 ///< its coefficients can be copied to the subworlds (e.g. by macrotaskq) when needed.
285 ///< The task itself is responsible for handling data movement
286 StoreFunctionViaPointer ///< store a pointer to the function in the cloud, but macrotaskq will move the
287 ///< coefficients to the subworlds when the task is started. This is the default policy.
288 };
289
290 friend std::ostream& operator<<(std::ostream& os, const StoragePolicy sp) {
291 if (sp==StoreFunction) os << "StoreFunction";
292 if (sp==StorePointerToFunction) os << "StorePointerToFunction";
293 if (sp==StoreFunctionViaPointer) os << "StoreFunctionViaPointer";
294 return os;
295 }
296
297 /// given the MacroTask's storage policy return the corresponding Cloud storage policy
306
307 /// return some preset policies
308 /// - "default": StoreFunctionViaPointer, cloud rank-replicated, initial functions node-replicated
309 /// - "small_memory": StoreFunctionViaPointer, cloud rank-replicated, initial functions distributed
310 /// - "large_memory": StoreFunction, cloud rank-replicated, initial functions distributed
311 /// the user can also set the policies manually
312 /// note: the policies are checked for consistency when the MacroTaskQ is created
337
/// \return the names of all presets, in a fixed order
static std::vector<std::string> get_all_preset_names() {
    std::vector<std::string> names{"default","node_replicated_target","small_memory","large_memory"};
    return names;
}
341
342 /// helper function to return all presets
343 static std::vector<MacroTaskInfo> get_all_presets() {
344 std::vector<MacroTaskInfo> result;
345 for (const auto& name : get_all_preset_names()) {
346 result.push_back(preset(name));
347 }
348 return result;
349 }
350
351 /// make sure the policies are consistent
352 bool check_consistency() const {
353 bool store_pointer_in_cloud = (storage_policy==MacroTaskInfo::StorePointerToFunction
355 bool good=true;
356
358 // if functions are stored in the cloud, the initial functions should be distributed
360
361 } else if (store_pointer_in_cloud) {
362 // if pointers are stored in the cloud, the initial functions can be distributed or replicated,
363 // the cloud should be rank-replicated
365
366 }
367 if (not good) std::cout << *this ;
368
369 return good;
370 }
371
372 /// set policy from a vector of strings, assuming the order is storage policy, cloud distribution policy, ptr target distribution policy
373 void from_vector_of_strings(const std::vector<std::string>& vec) {
374 if (vec.size()!=3) {
375 std::string msg="expected 3 policies, got "+std::to_string(vec.size());
376 MADNESS_EXCEPTION(msg.c_str(),0);
377 }
378 auto remove_quotes = [](const std::string& s) {
379 std::string result=s;
380 if (s.size()>=2 and s.front()=='"' and s.back()=='"') {
381 result=s.substr(1,s.size()-2);
382 }
383 return result;
384 };
385
386 std::string sstorage=remove_quotes(vec[0]);
387 if (sstorage=="storefunction") storage_policy=MacroTaskInfo::StoreFunction;
388 else if (sstorage=="storepointertofunction") storage_policy=MacroTaskInfo::StorePointerToFunction;
389 else if (sstorage=="storefunctionviapointer") storage_policy=MacroTaskInfo::StoreFunctionViaPointer;
390 else {
391 std::string msg="unknown storage policy: "+sstorage;
392 print("msg",msg);
393 MADNESS_CHECK_THROW(0, "Oh no1");
394 }
395
396 std::string scloud=remove_quotes(vec[1]);
398 else if (scloud=="nodereplicated") cloud_distribution_policy=DistributionType::NodeReplicated;
399 else if (scloud=="distributed") cloud_distribution_policy=DistributionType::Distributed;
400 else {
401 std::string msg="unknown cloud distribution policy: "+scloud;
402 print("msg",msg);
403 MADNESS_CHECK_THROW(0, "Oh no2");
404 }
405
406 std::string sptrtarget=remove_quotes(vec[2]);
408 else if (sptrtarget=="nodereplicated") ptr_target_distribution_policy=DistributionType::NodeReplicated;
409 else if (sptrtarget=="distributed") ptr_target_distribution_policy=DistributionType::Distributed;
410 else {
411 std::string msg="unknown ptr target distribution policy: "+sptrtarget;
412 print("msg",msg);
413 MADNESS_CHECK_THROW(0, "Oh no3");
414 }
415 print("set macrotaskinfo to");
416 print(*this);
417
418 }
419
423
424 friend std::ostream& operator<<(std::ostream& os, const MacroTaskInfo policy) {
425 os << "StoragePolicy: " << policy.storage_policy << std::endl;
426 os << "cloud_storage_policy: " << to_cloud_storage_policy(policy.storage_policy) << std::endl;
427 os << "cloud_distribution_policy: " << policy.cloud_distribution_policy << std::endl;
428 os << "ptr_target_distribution_policy: " << policy.ptr_target_distribution_policy << std::endl;
429 return os;
430 }
431
432 friend std::string to_string(const MacroTaskInfo::StoragePolicy sp) {
433 std::ostringstream os;
434 os << sp;
435 return os.str();
436 }
437
438 static StoragePolicy policy_to_string(const std::string policy) {
439 std::string policy_lc=commandlineparser::tolower(policy);
440 if (policy=="function") return MacroTaskInfo::StoreFunction;
441 if (policy=="pointer") return MacroTaskInfo::StorePointerToFunction;
442 if (policy=="functionviapointer") return MacroTaskInfo::StoreFunctionViaPointer;
443 std::string msg="unknown policy: "+policy;
444 MADNESS_EXCEPTION(msg.c_str(),1);
446 }
447
448 nlohmann::json to_json() const {
449 nlohmann::json j;
450 j["storage_policy"]=to_string(storage_policy);
451 j["cloud_distribution_policy"]=to_string(cloud_distribution_policy);
452 j["ptr_target_distribution_policy"]=to_string(ptr_target_distribution_policy);
453 return j;
454 }
455
456
457};
458
459template<typename T=double>
460std::ostream& operator<<(std::ostream& os, const typename MacroTaskInfo::StoragePolicy sp) {
461 if (sp==MacroTaskInfo::StoreFunction) os << "Function";
462 if (sp==MacroTaskInfo::StorePointerToFunction) os << "PointerToFunction";
463 if (sp==MacroTaskInfo::StoreFunctionViaPointer) os << "FunctionViaPointer";
464 return os;
465}
466
467/// base class
469public:
470
471 typedef std::vector<std::shared_ptr<MacroTaskBase> > taskqT;
472
474 virtual ~MacroTaskBase() {};
475
476 double priority=1.0;
478
482
483 bool is_complete() const {return stat==Complete;}
484 bool is_running() const {return stat==Running;}
485 bool is_waiting() const {return stat==Waiting;}
486
487 virtual void run(World& world, Cloud& cloud, taskqT& taskq, const long element,
488 const bool debug, const MacroTaskInfo policy) = 0;
489 virtual void cleanup() = 0; // clear static data (presumably persistent input data)
490
491 virtual void print_me(std::string s="") const {
492 printf("this is task with priority %4.1f\n",priority);
493 }
494 virtual void print_me_as_table(std::string s="") const {
495 print("nothing to print");
496 }
498 std::stringstream ss;
499 ss << std::setw(5) << this->get_priority() << " " <<this->stat;
500 return ss.str();
501 }
502
503 double get_priority() const {return priority;}
504 void set_priority(const double p) {priority=p;}
505
506 friend std::ostream& operator<<(std::ostream& os, const MacroTaskBase::Status s) {
507 if (s==MacroTaskBase::Status::Running) os << "Running";
508 if (s==MacroTaskBase::Status::Waiting) os << "Waiting";
509 if (s==MacroTaskBase::Status::Complete) os << "Complete";
510 if (s==MacroTaskBase::Status::Unknown) os << "Unknown";
511 return os;
512 }
513};
514
515
516template<typename macrotaskT>
518
519public:
520
522
524
525 void cleanup() {};
526};
527
528
529 /// Factory for the MacroTaskQ
531 public:
534 long nworld=1;
535
537
538 MacroTaskQFactory(World& universe) : world(universe), nworld(universe.size()) {}
539
541 nworld = n;
542 return *this;
543 }
544
546 printlevel = p;
547 return *this;
548 }
549
550 MacroTaskQFactory& preset(const std::string name) {
551 return *this;
552 }
553
555 policy = p;
556 return *this;
557 }
558
563
568
573
574 };
575
576
577
578class MacroTaskQ : public WorldObject< MacroTaskQ> {
579
581 std::shared_ptr<World> subworld_ptr;
583 std::mutex taskq_mutex;
585 long nsubworld=1;
586 nlohmann::json cloud_statistics; ///< save cloud statistics after run_all()
587 nlohmann::json taskq_statistics; ///< save taskq statistics after run_all()
588
589 const MacroTaskInfo policy; ///< storage and distribution policy
590
591 /// set the process map for the subworld
592 std::shared_ptr< WorldDCPmapInterface< Key<1> > > pmap1;
593 std::shared_ptr< WorldDCPmapInterface< Key<2> > > pmap2;
594 std::shared_ptr< WorldDCPmapInterface< Key<3> > > pmap3;
595 std::shared_ptr< WorldDCPmapInterface< Key<4> > > pmap4;
596 std::shared_ptr< WorldDCPmapInterface< Key<5> > > pmap5;
597 std::shared_ptr< WorldDCPmapInterface< Key<6> > > pmap6;
598
599 bool printdebug() const {return printlevel>=10;}
600 bool printprogress() const {return (printlevel>=4) and (not (printdebug()));}
601 bool printtimings() const {return universe.rank()==0 and printlevel>=3;}
602 bool printtimings_detail() const {return universe.rank()==0 and printlevel>=5;}
603
604public:
605
608 long get_nsubworld() const {return nsubworld;}
609 void set_printlevel(const long p) {printlevel=p;}
610
612 return policy;
613 }
614
615 nlohmann::json get_cloud_statistics() const {
616 return cloud_statistics;
617 }
618
619 nlohmann::json get_taskq_statistics() const {
620 return taskq_statistics;
621 }
622
623 /// create an empty taskq and initialize the subworlds
624 explicit MacroTaskQ(const MacroTaskQFactory factory)
625 : WorldObject<MacroTaskQ>(factory.world)
626 , universe(factory.world)
627 , taskq()
628 , printlevel(factory.printlevel)
629 , nsubworld(factory.nworld)
630 , policy(factory.policy)
631 , cloud(factory.world)
632 {
634 MADNESS_CHECK_THROW(policy.check_consistency(),"MacroTaskQ: inconsistent storage policy");
637
638 if (printdebug()) print(policy);
639 this->process_pending();
640 }
641
643
644 /// for each process create a world using a communicator shared with other processes by round-robin
645 /// copy-paste from test_world.cc
646 static std::shared_ptr<World> create_worlds(World& universe, const std::size_t nsubworld) {
647
648 int color = universe.rank() % nsubworld;
650
651 std::shared_ptr<World> all_worlds;
652 all_worlds.reset(new World(comm));
653
655 return all_worlds;
656 }
657
658 /// run all tasks
659 void run_all() {
660
661 if (printdebug()) print_taskq();
662 if (printtimings_detail()) {
663 if (universe.rank()==0) {
664 print("number of tasks in taskq",taskq.size());
665 print("redirecting output to files task.#####");
666 }
667 }
668 taskq_statistics["number_tasks"]=taskq.size();
669
670 // replicate the cloud (not necessarily the target if pointers are stored)
671 auto replication_policy = cloud.get_replication_policy();
672 if (replication_policy!=DistributionType::Distributed) {
673 double cpu0=cpu_time();
674 if (replication_policy==DistributionType::RankReplicated) cloud.replicate();
675 if (replication_policy==DistributionType::NodeReplicated) cloud.replicate_per_node(); // replicate to all hosts
677 double cpu1=cpu_time();
678 if (printtimings_detail()) print("cloud replication wall time",cpu1-cpu0);
679 }
680
681 // replicate the targets (not the cloud) if needed
682 {
683 double cpu0=cpu_time();
684 const bool need_replication_of_target=(policy.ptr_target_distribution_policy!=DistributionType::Distributed)
687
688
689 if (need_replication_of_target) {
691 for (auto wo : cloud.world_object_base_list) {
692 loop_types<Cloud::DistributeFunctor, double, float, double_complex, float_complex>(std::tuple<DistributionType>(dt),wo);
693 }
694 }
695
696 // if (need_replication_of_target) cloud.distribute_targets(policy.ptr_target_distribution_policy);
697 double cpu1=cpu_time();
698 if (printtimings_detail()) print("target replication wall time to ",policy.ptr_target_distribution_policy,cpu1-cpu0);
699 }
700
702 cloud_statistics=cloud.get_statistics(universe); // get stats before clearing the cloud
704 universe.gop.set_forbid_fence(true); // make sure there are no hidden universe fences
712
713 double cpu00=cpu_time();
714
715 World& subworld=get_subworld();
716// if (printdebug()) print("I am subworld",subworld.id());
717 double tasktime=0.0;
718 if (printprogress() and universe.rank()==0) std::cout << "progress in percent: " << std::flush;
719 while (true) {
720 long element=get_scheduled_task_number(subworld);
721 double cpu0=cpu_time();
722 if (element<0) break;
723 std::shared_ptr<MacroTaskBase> task=taskq[element];
724 if (printdebug()) print("starting task no",element, "in subworld",subworld.id(),"at time",wall_time());
725
726 task->run(subworld,cloud, taskq, element, printdebug(), policy);
727
728 double cpu1=cpu_time();
729 set_complete(element);
730 tasktime+=(cpu1-cpu0);
731 if (printdebug()) printf("completed task %3ld after %6.1fs at time %6.1fs\n",element,cpu1-cpu0,wall_time());
732
733 // print progress
734 const std::size_t ntask=taskq.size();
735 // return percentile of ntask for element
736 auto in_percentile = [&ntask](const long element) {
737 return std::floor(element/(0.1*(ntask+1)));
738 };
739 auto is_first_in_percentile = [&](const long element) {
740 return (in_percentile(element)!=in_percentile(element-1));
741 };
742 if (printprogress() and is_first_in_percentile(element)) {
743 std::cout << int(in_percentile(element)*10) << " " << std::flush;
744 }
745 }
748 universe.gop.sum(tasktime);
749 if (printprogress() and universe.rank()==0) std::cout << std::endl;
750 cloud_statistics.update(cloud.gather_timings(universe)); // get stats before clearing the cloud
751 double cpu11=cpu_time();
752 if (printlevel>=4) {
753 if (universe.rank()==0) {
756 print("all tasks complete");
757 }
759 }
760 if (printtimings_detail()) {
761 printf("completed taskqueue after %4.1fs at time %4.1fs\n", cpu11 - cpu00, wall_time());
762 printf(" total cpu time / per world %4.1fs %4.1fs\n", tasktime, tasktime / universe.size());
763 }
764 taskq_statistics["elapsed_time"]=cpu11-cpu00;
765 taskq_statistics["cpu_time_per_world"]=tasktime/universe.size();
766 taskq_statistics["total_cpu_time"]=tasktime;
767
768 // cleanup task-persistent input data
769 for (auto& task : taskq) task->cleanup();
770 cloud.clear_cache(subworld);
771 subworld.gop.fence();
772 subworld.gop.fence();
781 // restore targets to their original state
784 subworld.gop.fence();
785 cloud.clear();
787 subworld.gop.fence();
789 }
790
792 for (const auto& t : vtask) {
793 if (universe.rank()==0) t->set_waiting();
795 }
796 }
797
798 void print_taskq() const {
800 if (universe.rank()==0) {
801 print("\ntaskq on universe rank",universe.rank());
802 print("total number of tasks: ",taskq.size());
803 print(" task batch priority status");
804 for (const auto& t : taskq) t->print_me_as_table();
805 }
807 }
808
809private:
810 void add_replicated_task(const std::shared_ptr<MacroTaskBase>& task) {
811 taskq.push_back(task);
812 }
813
814 /// scheduler is located on universe.rank==0
816 long number=0;
817 if (subworld.rank()==0) {
819 number=r.get();
820 }
821 subworld.gop.broadcast_serializable(number, 0);
822 subworld.gop.fence();
823 return number;
824
825 }
826
829 std::lock_guard<std::mutex> lock(taskq_mutex);
830
831 auto is_Waiting = [](const std::shared_ptr<MacroTaskBase>& mtb_ptr) {return mtb_ptr->is_waiting();};
832 auto it=std::find_if(taskq.begin(),taskq.end(),is_Waiting);
833 if (it!=taskq.end()) {
834 it->get()->set_running();
835 long element=it-taskq.begin();
836 return element;
837 }
838// print("could not find task to schedule");
839 return -1;
840 }
841
842 /// scheduler is located on rank==0
843 void set_complete(const long task_number) const {
844 this->task(ProcessID(0), &MacroTaskQ::set_complete_local, task_number);
845 }
846
847 /// scheduler is located on rank==0
848 void set_complete_local(const long task_number) const {
850 taskq[task_number]->set_complete();
851 }
852
853public:
862private:
863 std::size_t size() const {
864 return taskq.size();
865 }
866
867};
868
869
870template<typename taskT>
873
/// type trait: true if and only if the template parameter is a std::vector (default allocator)
template<typename Q>
struct is_vector : std::false_type {};

template<typename elementT>
struct is_vector<std::vector<elementT>> : std::true_type {};
880
881 typedef typename taskT::resultT resultT;
882 typedef typename taskT::argtupleT argtupleT;
884
885 taskT task;
886 bool debug=false;
889 std::shared_ptr<MacroTaskQ> taskq_ptr;
890
891public:
892
893 /// constructor takes the task, but no arguments to the task
894 explicit MacroTask(World &world, taskT &task)
897 }
898
899 /// constructor takes task and a taskq factory for customization, immediate execution
900 explicit MacroTask(World &world, taskT& task, const MacroTaskQFactory factory)
901 : MacroTask(world,task, std::shared_ptr<MacroTaskQ>(new MacroTaskQ(factory))) {
903 }
904
905 /// constructor takes the task, and a taskq, execution is not immediate
906 explicit MacroTask(World &world, taskT &task, std::shared_ptr<MacroTaskQ> taskq_ptr)
908
909 // someone might pass in a taskq nullptr
910 immediate_execution=false; // will be reset by the forwarding constructors
911 if (this->taskq_ptr==0) {
912 this->taskq_ptr=std::make_shared<MacroTaskQ>(MacroTaskQFactory(world));
914 }
915
916 if (debug) this->taskq_ptr->set_printlevel(20);
917 // set the cloud policies
918 auto cloud_storage_policy = MacroTaskInfo::to_cloud_storage_policy(this->taskq_ptr->get_policy().storage_policy);
919 this->taskq_ptr->cloud.set_storing_policy(cloud_storage_policy);
920 this->taskq_ptr->cloud.set_replication_policy(this->taskq_ptr->get_policy().cloud_distribution_policy);
921
922 }
923
924 MacroTask& set_debug(const bool value) {
925 debug=value;
926 return *this;
927 }
928
929 std::shared_ptr<MacroTaskQ> get_taskq() const {
930 return taskq_ptr;
931 }
932
933
934 /// this mimicks the original call to the task functor, called from the universe
935
936 /// store all input to the cloud, create output Function<T,NDIM> in the universe,
937 /// create the batched task and shove it into the taskq. Possibly execute the taskq.
938 template<typename ... Ts>
939 resultT operator()(const Ts &... args) {
940
941 auto argtuple = std::tie(args...);
942 static_assert(std::is_same<decltype(argtuple), argtupleT>::value, "type or number of arguments incorrect");
943
944 // partition the argument vector into batches
945 auto partitioner=task.partitioner;
946 if (not partitioner) partitioner.reset(new MacroTaskPartitioner);
947 partitioner->set_nsubworld(world.size());
948 partitionT partition = partitioner->partition_tasks(argtuple);
949
950 if (debug and world.rank()==0) print(taskq_ptr->get_policy());
951
952 recordlistT inputrecords = taskq_ptr->cloud.store(world, argtuple);
953 resultT result = task.allocator(world, argtuple);
954 auto outputrecords =prepare_output_records(taskq_ptr->cloud, result);
955
956 // create tasks and add them to the taskq
958 for (const auto& batch_prio : partition) {
959 vtask.push_back(
960 std::shared_ptr<MacroTaskBase>(new MacroTaskInternal(task, batch_prio, inputrecords, outputrecords)));
961 }
962 taskq_ptr->add_tasks(vtask);
963 if (immediate_execution) taskq_ptr->run_all();
964
965 return result;
966 }
967private:
968
969
970 /// store *pointers* to the result WorldObject in the cloud and return the recordlist
972 if constexpr (is_tuple<resultT>::value) {
973 static_assert(check_tuple_is_valid_task_result<resultT,0>(),
974 "tuple has invalid result type in prepare_output_records");
975 } else {
976 static_assert(is_valid_task_result_v<resultT>, "unknown result type in prepare_output_records");
977 }
978
979 if (debug) print("storing pointers to output in cloud");
980 // store an element of the tuple only
981 auto store_output_records = [&](const auto& result) {
982 recordlistT outputrecords;
983 typedef std::decay_t<decltype(result)> argT;
984 if constexpr (is_madness_function<argT>::value) {
985 outputrecords += cloud.store(world, result.get_impl().get()); // store pointer to FunctionImpl
986 } else if constexpr (is_madness_function_vector<argT>::value) {
987 outputrecords += cloud.store(world, get_impl(result));
988 } else if constexpr (is_scalar_result<argT>::value) {
989 outputrecords += cloud.store(world, result.get_impl()); // store pointer to ScalarResultImpl
990 } else if constexpr (is_vector<argT>::value) {
992 // argT = std::vector<ScalarResult<T>>
993 std::vector<std::shared_ptr<typename argT::value_type::implT>> v;
994 for (const auto& ptr : result) v.push_back(ptr.get_impl());
995 outputrecords+=cloud.store(world,v);
996 } else {
997 MADNESS_EXCEPTION("\n\n unknown vector result type in prepare_input ", 1);
998 }
999 } else {
1000 MADNESS_EXCEPTION("should not be here",1);
1001 }
1002 return outputrecords;
1003 };
1004
1005 recordlistT outputrecords;
1006 if constexpr (is_tuple<resultT>::value) {
1007 // loop over tuple elements -- args is the individual tuple element
1008 std::apply([&](auto &&... args) {
1009 (( outputrecords+=store_output_records(args) ), ...);
1010 }, result);
1011 } else {
1012 outputrecords=store_output_records(result);
1013 }
1014 return outputrecords;
1015 }
1016
1017
1018 class MacroTaskInternal : public MacroTaskIntermediate<MacroTask> {
1019
1020 typedef decay_tuple<typename taskT::argtupleT> argtupleT; // removes const, &, etc
1021 typedef typename taskT::resultT resultT;
1024 public:
1025 taskT task;
1026 std::string get_name() const {
1027 if (task.name=="unknown_task") return typeid(task).name();
1028 return task.name;
1029 }
1030
1031 MacroTaskInternal(const taskT &task, const std::pair<Batch,double> &batch_prio,
1034 if constexpr (is_tuple<resultT>::value) {
1035 static_assert(check_tuple_is_valid_task_result<resultT,0>(),
1036 "tuple has invalid result type in prepare_output_records");
1037 } else {
1038 static_assert(is_valid_task_result_v<resultT>, "unknown result type in prepare_output_records");
1039 }
1040 this->task.batch=batch_prio.first;
1041 this->priority=batch_prio.second;
1042 }
1043
1044
1045 void print_me(std::string s="") const override {
1046 print("this is task",get_name(),"with batch", task.batch,"priority",this->get_priority());
1047 }
1048
1049 void print_me_as_table(std::string s="") const override {
1050 std::stringstream ss;
1051 std::string name=get_name();
1052 std::size_t namesize=std::min(std::size_t(28),name.size());
1053 name += std::string(28-namesize,' ');
1054
1055 std::stringstream ssbatch;
1056 ssbatch << task.batch;
1057 std::string strbatch=ssbatch.str();
1058 int nspaces=std::max(int(0),35-int(ssbatch.str().size()));
1059 strbatch+=std::string(nspaces,' ');
1060
1061 ss << name
1062 << std::setw(10) << strbatch
1064 print(ss.str());
1065 }
1066
1067 /// accumulate the result of the task into the final result living in the universe
1068 template<typename resultT1, std::size_t I=0>
1069 typename std::enable_if<is_tuple<resultT1>::value, void>::type
1070 accumulate_into_final_result(World &subworld, resultT1 &final_result, const resultT1 &tmp_result, const argtupleT& argtuple) {
1071 if constexpr(I < std::tuple_size_v<resultT1>) {
1072 using elementT = typename std::tuple_element<I, resultT>::type;// use decay types for determining a vector
1073 auto element_final=std::get<I>(final_result);
1074 auto element_tmp=std::get<I>(tmp_result);
1075 accumulate_into_final_result<elementT>(subworld, element_final, element_tmp, argtuple);
1076 accumulate_into_final_result<resultT1,I+1>(subworld, final_result, tmp_result, argtuple);
1077 }
1078 }
1079
// Non-tuple overload: adds result_tmp to result via gaxpy(1.0, result, 1.0, ...); the branch is
// selected at compile time from the type resultT1. subworld and argtuple are only referenced
// in commented-out code in the visible lines.
// NOTE(review): listing line 1084 (the opening `if constexpr` of the first branch, presumably
// the single-Function case) is not visible here.
1080 /// accumulate the result of the task into the final result living in the universe
1081 template<typename resultT1>
1082 typename std::enable_if<not is_tuple<resultT1>::value, void>::type
1083 accumulate_into_final_result(World &subworld, resultT1 &result, const resultT1 &result_tmp, const argtupleT& argtuple) {
1085 // gaxpy can be done in reconstructed or compressed mode
// pick the tree state from the tensor type: compressed for TT_FULL, reconstructed otherwise
1086 TreeState operating_state=result_tmp.get_impl()->get_tensor_type()==TT_FULL ? compressed : reconstructed;
1087 result_tmp.change_tree_state(operating_state);
1088 gaxpy(1.0,result,1.0, result_tmp);
1089 } else if constexpr(is_madness_function_vector<resultT1>::value) {
// NOTE(review): result_tmp[0] is accessed without a size check -- confirm the vector is never empty
1090 TreeState operating_state=result_tmp[0].get_impl()->get_tensor_type()==TT_FULL ? compressed : reconstructed;
1091 change_tree_state(result_tmp,operating_state);
1092 // compress(subworld, result_tmp);
1093 // resultT1 tmp1=task.allocator(subworld,argtuple);
1094 // tmp1=task.batch.template insert_result_batch(tmp1,result_tmp);
// last argument false: the vector gaxpy is called without a fence
1095 gaxpy(1.0,result,1.0,result_tmp,false);
1096 // was using operator+=, but this requires a fence, which is not allowed here..
1097 // result += tmp1;
1098 } else if constexpr (is_scalar_result<resultT1>::value) {
// scalar result: add the local value of result_tmp, without a fence
1099 gaxpy(1.0, result, 1.0, result_tmp.get_local(), false);
1100 } else if constexpr (is_scalar_result_vector<resultT1>::value) {
1101 // resultT1 tmp1=task.allocator(subworld,argtuple);
1102 // tmp1=task.batch.template insert_result_batch(tmp1,result_tmp);
// element-wise accumulation; the loop bound is result.size()
// NOTE(review): assumes result_tmp has at least result.size() elements -- confirm
1103 std::size_t sz=result.size();
1104 for (size_t i=0; i<sz; ++i) {
1105 gaxpy(1.0, result[i], 1.0, result_tmp[i].get_local(), false);
1106 }
1107 }
1108
1109 }
1110
// Execute this task in a subworld:
//  1. load the argument tuple from the cloud and restrict it to the batch of this task,
//  2. copy the function arguments into the subworld,
//  3. call the task on the batched arguments,
//  4. insert the batch result into a result allocated by task.allocator,
//  5. accumulate that result into the output objects obtained from get_output().
// Parameters: subworld -- world the task is executed in; cloud -- source of the input and
// output records; element -- task number, used in the printed messages and passed to io_redirect;
// debug -- enables additional printing and is passed to io_redirect.
// taskq and policy are not referenced in the visible lines.
// NOTE(review): listing lines 1120 and 1181 are not visible here; line 1120 presumably opens
// the block that is closed at listing line 1144 -- confirm against the full header.
1111 /// called by the MacroTaskQ when the task is scheduled
1112 void run(World &subworld, Cloud &cloud, MacroTaskBase::taskqT &taskq, const long element, const bool debug,
1113 const MacroTaskInfo policy) override {
1114 io_redirect io(element,get_name()+"_task",debug);
// full argument tuple from the cloud, then the part of it that belongs to the batch of this task
1115 const argtupleT argtuple = cloud.load<argtupleT>(subworld, inputrecords);
1116 argtupleT batched_argtuple = task.batch.copy_input_batch(argtuple);
1117
// msg is not used in the visible lines
1118 std::string msg="";
1119 // maybe move this block to the cloud?
// wall-clock timing of the copy step below; the timer object is constructed with cloud.copy_time
1121 double cpu0=wall_time();
1122 Cloud::cloudtimer timer(subworld,cloud.copy_time);
1123 // the functions loaded from the cloud are pointers to the universe functions,
1124 // retrieve the function coefficients from the universe
1125 // aka: turn the current shallow copy into a deep copy
1126 if (debug) print("loading function coefficients from universe for task",get_name());
1127
1128 // loop over the tuple -- copy the functions from the universe to the subworld
// copi replaces Function and vector-of-Function arguments by copy(subworld, ...); other types are left untouched
1129 auto copi = [&](auto& arg) {
1130 typedef std::decay_t<decltype(arg)> argT;
1131 if constexpr (is_madness_function<argT>::value) {
1132 arg=copy(subworld, arg);
1133 } else if constexpr (is_madness_function_vector<argT>::value) {
1134 for (auto& f : arg) f=copy(subworld,f);
1135 }
1136 };
1137
1138 unary_tuple_loop(batched_argtuple,copi);
1139 double cpu1=wall_time();
1140 if (debug) {
1141 io_redirect_cout io2;
1142 print("copied coefficients for task",get_name(),"in",cpu1-cpu0,"seconds");
1143 }
1144 }
1145
1146 try {
1147 print("starting task no",element, ", '",get_name(),"', in subworld",subworld.id(),"at time",wall_time());
// the task itself is timed with cpu_time(); the message below additionally reports wall_time()
1148 double cpu0=cpu_time();
1149 task.subworld_ptr=&subworld; // give the task access to the subworld
1150 resultT result_batch = std::apply(task, batched_argtuple); // lives in the subworld, is a batch of the full vector (if applicable)
1151 double cpu1=cpu_time();
1152 constexpr std::size_t bufsize=256;
1153 char buffer[bufsize];
1154 std::snprintf(buffer,bufsize,"completed task %3ld after %6.1fs at time %6.1fs\n",element,cpu1-cpu0,wall_time());
1155 print(std::string(buffer));
1156
1157 // move the result from the batch to the final result, all still in subworld
// vectors go through task.batch.insert_result_batch, every other type is swapped into place
1158 auto insert_batch = [&](auto& element1, auto& element2) {
1159 typedef std::decay_t<decltype(element1)> decay_type;;
1160 if constexpr (is_vector<decay_type>::value) {
1161 element1=task.batch.insert_result_batch(element1,element2);
1162 } else {
1163 std::swap(element1,element2);
1164 }
1165 };
// the allocator receives the full (unbatched) argument tuple
1166 resultT result_subworld=task.allocator(subworld,argtuple);
1167 if constexpr (is_tuple<resultT>::value) {
1168 binary_tuple_loop(result_subworld, result_batch, insert_batch);
1169 } else {
1170 insert_batch(result_subworld,result_batch);
1171 }
1172
1173 // accumulate the subworld-local results into the final, universe result
1174 resultT result_universe=get_output(subworld, cloud); // lives in the universe
1175
1176 accumulate_into_final_result<resultT>(subworld, result_universe, result_subworld, argtuple);
1177
// any std::exception from the block above is reported and re-raised as a MADNESS_EXCEPTION
1178 } catch (std::exception& e) {
1179 print("failing task no",element,"in subworld",subworld.id(),"at time",wall_time());
1180 print(e.what());
1182 print("\n\n");
1183 MADNESS_EXCEPTION("failing task",1);
1184 }
1185
1186 };
1187
1188 // this is called after all tasks have been executed and the taskq has ended
1189 void cleanup() override {
1190 }
1191
1192 template<typename T, std::size_t NDIM>
1193 static Function<T,NDIM> pointer2WorldObject(const std::shared_ptr<FunctionImpl<T,NDIM>> impl) {
1194 Function<T,NDIM> result;
1195 result.set_impl(impl);
1196 return result;
1197 }
1198
1199 template<typename T, std::size_t NDIM>
1200 static std::vector<Function<T,NDIM>> pointer2WorldObject(const std::vector<std::shared_ptr<FunctionImpl<T,NDIM>>> v_impl) {
1201 std::vector<Function<T,NDIM>> vresult;
1202 vresult.resize(v_impl.size());
1203 set_impl(vresult,v_impl);
1204 return vresult;
1205 }
1206
1207 template<typename T>
1208 static ScalarResult<T> pointer2WorldObject(const std::shared_ptr<ScalarResultImpl<T>> sr_impl) {
1209 return ScalarResult(sr_impl);
1210 }
1211
1212 template<typename T>
1213 static std::vector<ScalarResult<T>> pointer2WorldObject(const std::vector<std::shared_ptr<ScalarResultImpl<T>>> v_sr_impl) {
1214 std::vector<ScalarResult<T>> vresult(v_sr_impl.size());
1215 for (size_t i=0; i<v_sr_impl.size(); ++i) {
1216 vresult[i].set_impl(v_sr_impl[i]);
1217 }
1218 return vresult;
1219 }
1220
// Build the result object of type resultT from the output records of this task. For every
// element (or for the single result) the matching pointer type is loaded from the cloud with
// consuming_load and converted with pointer2WorldObject.
// NOTE(review): listing line 1236 (the first `if constexpr` of the lambda, presumably the
// vector-of-Function case) is not visible here.
1221 /// return the WorldObjects or the result functions living in the universe
1222
1223 /// read the pointers to the universe WorldObjects from the cloud,
1224 /// convert them to actual WorldObjects and return them
1225 resultT get_output(World &subworld, Cloud &cloud) const {
1226 resultT result;
1227
1228 // save outputrecords, because they will be consumed by the cloud
// working on a copy keeps the member outputrecords unchanged (this method is const)
1229 auto outputrecords1 = this->outputrecords;
1230
1231 // turn an element of the tuple of pointers into an element of the tuple of WorldObjects
// all branches consume from the same outputrecords1, so the elements are read in call order
1232 auto doit = [&](auto& element) {
1233 typedef std::decay_t<decltype(element)> elementT;
1234
1235 // load the elements from the cloud -- they contain pointers to WorldObjects
1237 typedef typename elementT::value_type::implT implT;
1238 auto ptr_element = cloud.consuming_load<std::vector<std::shared_ptr<implT>>>(
1239 subworld, outputrecords1);
1240 element = pointer2WorldObject(ptr_element);
1241 }
1242 else if constexpr (is_madness_function<elementT>::value) {
1243 typedef typename elementT::implT implT;
1244 auto ptr_element = cloud.consuming_load<std::shared_ptr<implT>>(subworld, outputrecords1);
1245 element = pointer2WorldObject(ptr_element);
1246 }
1247 else if constexpr (is_scalar_result_vector<elementT>::value) { // std::vector<ScalarResult<T>>
1248 typedef typename elementT::value_type ScalarResultT;
1249 typedef typename ScalarResultT::implT implT;
1250 typedef std::vector<std::shared_ptr<implT>> vptrT;
1251 auto ptr_element = cloud.consuming_load<vptrT>(subworld, outputrecords1);
1252 element = pointer2WorldObject(ptr_element);
1253 }
1254 else if constexpr (is_scalar_result<elementT>::value) {
1255 // elementT is a ScalarResultImpl<T>
1256 // in cloud we store a std::shared_ptr<ScalarResultImpl<T>>
1257 auto ptr_element = cloud.consuming_load<std::shared_ptr<typename elementT::implT>>(subworld, outputrecords1);
1258 element = pointer2WorldObject(ptr_element);
1259 }
// any other element type is rejected when this branch is executed
1260 else {
1261 MADNESS_EXCEPTION("confused about the type of the result", 1);
1262 }
1263 };
1264 if constexpr (is_tuple<resultT>::value) {
1265 static_assert(check_tuple_is_valid_task_result<resultT, 0>(),
1266 "invalid tuple task result -- must be vectors of functions");
1267 static_assert(is_tuple<resultT>::value, "is a tuple");
1268
1269 // loop over all tuple elements
1270 // 1. load the pointers to the WorldObjects living in the universe
1271 // 2. create WorldObjects from the pointers and copy them into the tuple of type resultT
1272
1273 // turn the tuple of pointers into a tuple of WorldObjects
1274 unary_tuple_loop(result,doit);
1275
1276
1277 } else {
// single (non-tuple) result: convert it directly
1278 doit(result);
1279
1280 }
1281 return result;
1282 }
1283
1284 };
1285
1286};
1287
// Fragment of a class body; the class header and further members (listing lines 1288,
// 1290-1291 and 1294) are not visible here.
1289 public:
// task name; "unknown_task" is the default value
1292 std::string name="unknown_task";
// partitioner pointer, initialized empty
1293 std::shared_ptr<MacroTaskPartitioner> partitioner=0;
1295 };
1296
1297
1298} /* namespace madness */
1299
1300#endif /* SRC_MADNESS_MRA_MACROTASKQ_H_ */
Wrapper around MPI_Comm. Has a shallow copy constructor; use Create(Get_group()) for deep copy.
Definition safempi.h:497
Intracomm Split(int Color, int Key=0) const
Definition safempi.h:642
a batch consists of a 2D-input batch and a 1D-output batch: K-batch <- (I-batch, J-batch)
Definition macrotaskpartitioner.h:124
cloud class
Definition cloud.h:178
void clear()
Definition cloud.h:476
void replicate_per_node(const std::size_t chunk_size=INT_MAX)
Definition cloud.h:614
nlohmann::json get_statistics(World &world) const
return a json object with the cloud settings and statistics
Definition cloud.h:326
recordlistT store(madness::World &world, const T &source)
Definition cloud.h:574
Recordlist< keyT > recordlistT
Definition cloud.h:192
std::atomic< long > target_replication_time
Definition cloud.h:701
nlohmann::json gather_timings(World &universe) const
Definition cloud.h:381
void replicate(const std::size_t chunk_size=INT_MAX)
Definition cloud.h:634
void print_size(World &universe)
Definition cloud.h:300
T load(madness::World &world, const recordlistT recordlist) const
load a single object from the cloud, recordlist is kept unchanged
Definition cloud.h:534
std::list< WorldObjectBase * > world_object_base_list
Definition cloud.h:230
static void print_memory_statistics(const nlohmann::json stats)
Definition cloud.h:450
DistributionType get_replication_policy() const
is the cloud container replicated: per rank, per node, or distributed
Definition cloud.h:275
void clear_cache(World &subworld)
Definition cloud.h:470
std::atomic< long > copy_time
Definition cloud.h:700
void set_replication_policy(const DistributionType value)
is the cloud container replicated: per rank, per node, or distributed
Definition cloud.h:268
void distribute_targets(const DistributionType dt=Distributed)
distribute/node/rank replicate the targets of all world objects stored in the cloud
Definition cloud.h:517
void print_timings(World &universe) const
backwards compatibility
Definition cloud.h:420
void set_storing_policy(const StoragePolicy value)
storing policy refers to storing functions or pointers to functions
Definition cloud.h:291
StoragePolicy
Definition cloud.h:194
@ StoreFunctionPointer
Definition cloud.h:197
@ StoreFunction
Definition cloud.h:195
T consuming_load(madness::World &world, recordlistT &recordlist) const
similar to load, but will consume the recordlist
Definition cloud.h:547
static void set_default_pmap(World &world)
Definition mraimpl.h:3568
static std::shared_ptr< WorldDCPmapInterface< Key< NDIM > > > & get_pmap()
Returns the default process map that was last initialized via set_default_pmap()
Definition funcdefaults.h:401
static void set_pmap(const std::shared_ptr< WorldDCPmapInterface< Key< NDIM > > > &value)
Sets the default process map (does not redistribute existing functions)
Definition funcdefaults.h:432
FunctionImpl holds all Function state to facilitate shallow copy semantics.
Definition funcimpl.h:945
A multiresolution adaptive numerical function.
Definition mra.h:139
void set_impl(const std::shared_ptr< FunctionImpl< T, NDIM > > &impl)
Replace current FunctionImpl with provided new one.
Definition mra.h:666
A future is a possibly yet unevaluated value.
Definition future.h:369
T & get(bool dowork=true) &
Gets the value, waiting if necessary.
Definition future.h:570
base class
Definition macrotaskq.h:468
void set_running()
Definition macrotaskq.h:480
virtual ~MacroTaskBase()
Definition macrotaskq.h:474
void set_waiting()
Definition macrotaskq.h:481
MacroTaskBase()
Definition macrotaskq.h:473
virtual void cleanup()=0
virtual void print_me(std::string s="") const
Definition macrotaskq.h:491
std::string print_priority_and_status_to_string() const
Definition macrotaskq.h:497
void set_complete()
Definition macrotaskq.h:479
double priority
Definition macrotaskq.h:476
bool is_complete() const
Definition macrotaskq.h:483
Status
Definition macrotaskq.h:477
@ Complete
Definition macrotaskq.h:477
@ Running
Definition macrotaskq.h:477
@ Unknown
Definition macrotaskq.h:477
@ Waiting
Definition macrotaskq.h:477
bool is_running() const
Definition macrotaskq.h:484
enum madness::MacroTaskBase::Status stat
double get_priority() const
Definition macrotaskq.h:503
void set_priority(const double p)
Definition macrotaskq.h:504
virtual void print_me_as_table(std::string s="") const
Definition macrotaskq.h:494
virtual void run(World &world, Cloud &cloud, taskqT &taskq, const long element, const bool debug, const MacroTaskInfo policy)=0
std::vector< std::shared_ptr< MacroTaskBase > > taskqT
Definition macrotaskq.h:471
friend std::ostream & operator<<(std::ostream &os, const MacroTaskBase::Status s)
Definition macrotaskq.h:506
bool is_waiting() const
Definition macrotaskq.h:485
Definition macrotaskq.h:517
MacroTaskIntermediate()
Definition macrotaskq.h:521
void cleanup()
Definition macrotaskq.h:525
~MacroTaskIntermediate()
Definition macrotaskq.h:523
Definition macrotaskq.h:1288
MacroTaskOperationBase()
Definition macrotaskq.h:1294
Batch batch
Definition macrotaskq.h:1290
World * subworld_ptr
Definition macrotaskq.h:1291
std::shared_ptr< MacroTaskPartitioner > partitioner
Definition macrotaskq.h:1293
std::string name
Definition macrotaskq.h:1292
partition one (two) vectors into 1D (2D) batches.
Definition macrotaskpartitioner.h:182
std::list< std::pair< Batch, double > > partitionT
Definition macrotaskpartitioner.h:186
Factory for the MacroTaskQ.
Definition macrotaskq.h:530
MacroTaskQFactory & set_storage_policy(const MacroTaskInfo::StoragePolicy sp)
Definition macrotaskq.h:559
World & world
Definition macrotaskq.h:533
MacroTaskQFactory & set_policy(const MacroTaskInfo p)
Definition macrotaskq.h:554
MacroTaskQFactory & set_nworld(const long n)
Definition macrotaskq.h:540
MacroTaskQFactory & preset(const std::string name)
Definition macrotaskq.h:550
MacroTaskQFactory & set_cloud_distribution_policy(const DistributionType dp)
Definition macrotaskq.h:564
MacroTaskQFactory & set_printlevel(const long p)
Definition macrotaskq.h:545
long printlevel
Definition macrotaskq.h:532
MacroTaskQFactory & set_ptr_target_distribution_policy(const DistributionType dp)
Definition macrotaskq.h:569
long nworld
Definition macrotaskq.h:534
MacroTaskInfo policy
Definition macrotaskq.h:536
MacroTaskQFactory(World &universe)
Definition macrotaskq.h:538
Definition macrotaskq.h:578
long nsubworld
Definition macrotaskq.h:585
std::mutex taskq_mutex
Definition macrotaskq.h:583
bool printdebug() const
Definition macrotaskq.h:599
static void set_pmap(World &world)
Definition macrotaskq.h:854
bool printtimings_detail() const
Definition macrotaskq.h:602
void run_all()
run all tasks
Definition macrotaskq.h:659
void set_complete(const long task_number) const
scheduler is located on rank==0
Definition macrotaskq.h:843
World & universe
Definition macrotaskq.h:580
std::shared_ptr< WorldDCPmapInterface< Key< 1 > > > pmap1
set the process map for the subworld
Definition macrotaskq.h:592
std::shared_ptr< WorldDCPmapInterface< Key< 3 > > > pmap3
Definition macrotaskq.h:594
MacroTaskBase::taskqT taskq
Definition macrotaskq.h:582
MacroTaskInfo get_policy() const
Definition macrotaskq.h:611
nlohmann::json cloud_statistics
save cloud statistics after run_all()
Definition macrotaskq.h:586
void add_tasks(MacroTaskBase::taskqT &vtask)
Definition macrotaskq.h:791
long get_scheduled_task_number(World &subworld)
scheduler is located on universe.rank==0
Definition macrotaskq.h:815
void set_complete_local(const long task_number) const
scheduler is located on rank==0
Definition macrotaskq.h:848
World & get_subworld()
Definition macrotaskq.h:607
MacroTaskQ(const MacroTaskQFactory factory)
create an empty taskq and initialize the subworlds
Definition macrotaskq.h:624
std::shared_ptr< WorldDCPmapInterface< Key< 4 > > > pmap4
Definition macrotaskq.h:595
nlohmann::json get_taskq_statistics() const
Definition macrotaskq.h:619
std::size_t size() const
Definition macrotaskq.h:863
std::shared_ptr< WorldDCPmapInterface< Key< 5 > > > pmap5
Definition macrotaskq.h:596
void print_taskq() const
Definition macrotaskq.h:798
std::shared_ptr< World > subworld_ptr
Definition macrotaskq.h:581
bool printtimings() const
Definition macrotaskq.h:601
std::shared_ptr< WorldDCPmapInterface< Key< 2 > > > pmap2
Definition macrotaskq.h:593
bool printprogress() const
Definition macrotaskq.h:600
std::shared_ptr< WorldDCPmapInterface< Key< 6 > > > pmap6
Definition macrotaskq.h:597
nlohmann::json get_cloud_statistics() const
Definition macrotaskq.h:615
void set_printlevel(const long p)
Definition macrotaskq.h:609
long printlevel
Definition macrotaskq.h:584
madness::Cloud cloud
Definition macrotaskq.h:606
long get_nsubworld() const
Definition macrotaskq.h:608
void add_replicated_task(const std::shared_ptr< MacroTaskBase > &task)
Definition macrotaskq.h:810
~MacroTaskQ()
Definition macrotaskq.h:642
static std::shared_ptr< World > create_worlds(World &universe, const std::size_t nsubworld)
Definition macrotaskq.h:646
nlohmann::json taskq_statistics
save taskq statistics after run_all()
Definition macrotaskq.h:587
long get_scheduled_task_number_local()
Definition macrotaskq.h:827
const MacroTaskInfo policy
storage and distribution policy
Definition macrotaskq.h:589
Definition macrotaskq.h:1018
void run(World &subworld, Cloud &cloud, MacroTaskBase::taskqT &taskq, const long element, const bool debug, const MacroTaskInfo policy) override
called by the MacroTaskQ when the task is scheduled
Definition macrotaskq.h:1112
taskT task
Definition macrotaskq.h:1025
MacroTaskInternal(const taskT &task, const std::pair< Batch, double > &batch_prio, const recordlistT &inputrecords, const recordlistT &outputrecords)
Definition macrotaskq.h:1031
std::enable_if< not is_tuple< resultT1 >::value, void >::type accumulate_into_final_result(World &subworld, resultT1 &result, const resultT1 &result_tmp, const argtupleT &argtuple)
accumulate the result of the task into the final result living in the universe
Definition macrotaskq.h:1083
resultT get_output(World &subworld, Cloud &cloud) const
return the WorldObjects or the result functions living in the universe
Definition macrotaskq.h:1225
static ScalarResult< T > pointer2WorldObject(const std::shared_ptr< ScalarResultImpl< T > > sr_impl)
Definition macrotaskq.h:1208
decay_tuple< typename taskT::argtupleT > argtupleT
Definition macrotaskq.h:1020
static std::vector< Function< T, NDIM > > pointer2WorldObject(const std::vector< std::shared_ptr< FunctionImpl< T, NDIM > > > v_impl)
Definition macrotaskq.h:1200
void cleanup() override
Definition macrotaskq.h:1189
taskT::resultT resultT
Definition macrotaskq.h:1021
recordlistT outputrecords
Definition macrotaskq.h:1023
std::string get_name() const
Definition macrotaskq.h:1026
static Function< T, NDIM > pointer2WorldObject(const std::shared_ptr< FunctionImpl< T, NDIM > > impl)
Definition macrotaskq.h:1193
static std::vector< ScalarResult< T > > pointer2WorldObject(const std::vector< std::shared_ptr< ScalarResultImpl< T > > > v_sr_impl)
Definition macrotaskq.h:1213
void print_me_as_table(std::string s="") const override
Definition macrotaskq.h:1049
void print_me(std::string s="") const override
Definition macrotaskq.h:1045
std::enable_if< is_tuple< resultT1 >::value, void >::type accumulate_into_final_result(World &subworld, resultT1 &final_result, const resultT1 &tmp_result, const argtupleT &argtuple)
accumulate the result of the task into the final result living in the universe
Definition macrotaskq.h:1070
recordlistT inputrecords
Definition macrotaskq.h:1022
Definition macrotaskq.h:871
MacroTask & set_debug(const bool value)
Definition macrotaskq.h:924
MacroTask(World &world, taskT &task)
constructor takes the task, but no arguments to the task
Definition macrotaskq.h:894
MacroTask(World &world, taskT &task, const MacroTaskQFactory factory)
constructor takes task and a taskq factory for customization, immediate execution
Definition macrotaskq.h:900
taskT::resultT resultT
Definition macrotaskq.h:881
std::shared_ptr< MacroTaskQ > taskq_ptr
Definition macrotaskq.h:889
taskT::argtupleT argtupleT
Definition macrotaskq.h:882
bool immediate_execution
Definition macrotaskq.h:887
World & world
Definition macrotaskq.h:888
std::shared_ptr< MacroTaskQ > get_taskq() const
Definition macrotaskq.h:929
resultT operator()(const Ts &... args)
this mimics the original call to the task functor, called from the universe
Definition macrotaskq.h:939
bool debug
Definition macrotaskq.h:886
taskT task
Definition macrotaskq.h:885
MacroTaskPartitioner::partitionT partitionT
Definition macrotaskq.h:872
Cloud::recordlistT recordlistT
Definition macrotaskq.h:883
recordlistT prepare_output_records(Cloud &cloud, resultT &result)
store pointers to the result WorldObject in the cloud and return the recordlist
Definition macrotaskq.h:971
MacroTask(World &world, taskT &task, std::shared_ptr< MacroTaskQ > taskq_ptr)
constructor takes the task, and a taskq, execution is not immediate
Definition macrotaskq.h:906
static std::map< MemKey, MemInfo > measure_and_print(World &world)
measure the memory usage of all objects of all worlds
Definition memory_measurement.h:24
helper class for returning the result of a task, which is not a madness Function, but a simple scalar
Definition macrotaskq.h:55
void gaxpy(const double a, const T &right, double b, const bool fence=true)
accumulate, optional fence
Definition macrotaskq.h:84
void serialize(Archive &ar)
Definition macrotaskq.h:92
T get()
after completion of the taskq get the final value
Definition macrotaskq.h:97
ScalarResultImpl< T > & operator=(const T &x)
simple assignment of the scalar value
Definition macrotaskq.h:73
ScalarResultImpl(const ScalarResultImpl &other)=delete
Disable the default copy constructor.
ScalarResultImpl< T > & operator=(const ScalarResultImpl< T > &other)=delete
disable assignment operator
ScalarResultImpl< T > & operator+=(const T &x)
Definition macrotaskq.h:78
~ScalarResultImpl()
Definition macrotaskq.h:69
T value
the scalar value
Definition macrotaskq.h:110
T get_local() const
Definition macrotaskq.h:104
T value_type
Definition macrotaskq.h:57
ScalarResultImpl(World &world)
Definition macrotaskq.h:58
Definition macrotaskq.h:114
void serialize(Archive &ar)
Definition macrotaskq.h:145
ScalarResult(const std::shared_ptr< implT > &impl)
Definition macrotaskq.h:121
void gaxpy(const double a, const T &right, double b, const bool fence=true)
accumulate, optional fence
Definition macrotaskq.h:140
ScalarResultImpl< T > implT
Definition macrotaskq.h:116
T get()
after completion of the taskq get the final value
Definition macrotaskq.h:150
ScalarResult(World &world)
Definition macrotaskq.h:120
std::shared_ptr< implT > impl
Definition macrotaskq.h:117
ScalarResult & operator=(const T &x)
Definition macrotaskq.h:122
std::shared_ptr< implT > get_impl() const
Definition macrotaskq.h:127
void set_impl(const std::shared_ptr< implT > &newimpl)
Definition macrotaskq.h:131
T get_local() const
after completion of the taskq get the final value
Definition macrotaskq.h:155
uniqueidT id() const
Definition macrotaskq.h:135
void broadcast_serializable(objT &obj, ProcessID root)
Broadcast a serializable object.
Definition worldgop.h:756
void fence(bool debug=false)
Synchronizes all processes in communicator AND globally ensures no pending AM or tasks.
Definition worldgop.cc:161
bool set_forbid_fence(bool value)
Set forbid_fence flag to new value and return old value.
Definition worldgop.h:676
void sum(T *buf, size_t nelem)
Inplace global sum while still processing AM & tasks.
Definition worldgop.h:872
SafeMPI::Intracomm & comm()
Returns the associated SafeMPI communicator.
Definition worldmpi.h:286
Implements most parts of a globally addressable object (via unique ID).
Definition world_object.h:366
World & get_world() const
Returns a reference to the world.
Definition world_object.h:719
World & world
The World this object belongs to. (Think globally, act locally).
Definition world_object.h:383
void process_pending()
To be called from derived constructor to process pending messages.
Definition world_object.h:658
detail::task_result_type< memfnT >::futureT send(ProcessID dest, memfnT memfn) const
Definition world_object.h:733
detail::task_result_type< memfnT >::futureT task(ProcessID dest, memfnT memfn, const TaskAttributes &attr=TaskAttributes()) const
Sends task to derived class method returnT (this->*memfn)().
Definition world_object.h:1007
A parallel world class.
Definition world.h:132
static World * world_from_id(std::uint64_t id)
Convert a World ID to a World pointer.
Definition world.h:492
ProcessID rank() const
Returns the process rank in this World (same as MPI_Comm_rank()).
Definition world.h:320
WorldMpiInterface & mpi
MPI interface.
Definition world.h:204
ProcessID size() const
Returns the number of processes in this World (same as MPI_Comm_size()).
Definition world.h:330
unsigned long id() const
Definition world.h:315
WorldGopInterface & gop
Global operations.
Definition world.h:207
std::optional< T * > ptr_from_id(uniqueidT id) const
Look up a local pointer from a world-wide unique ID.
Definition world.h:416
Class for unique global IDs.
Definition uniqueid.h:53
Declares the Cloud class for storing data and transferring them between worlds.
char * p(char *buf, const char *name, int k, int initial_level, double thresh, int order)
Definition derivatives.cc:72
const std::size_t bufsize
Definition derivatives.cc:16
static bool debug
Definition dirac-hatom.cc:16
auto T(World &world, response_space &f) -> response_space
Definition global_functions.cc:28
Tensor< typename Tensor< T >::scalar_type > arg(const Tensor< T > &t)
Return a new tensor holding the argument of each element of t (complex types only)
Definition tensor.h:2518
static const double v
Definition hatom_sf_dirac.cc:20
#define MADNESS_EXCEPTION(msg, value)
Macro for throwing a MADNESS exception.
Definition madness_exception.h:119
#define MADNESS_ASSERT(condition)
Assert a condition that should be free of side-effects since in release builds this might be a no-op.
Definition madness_exception.h:134
#define MADNESS_CHECK_THROW(condition, msg)
Check a condition — even in a release build the condition is always evaluated so it can have side effects.
Definition madness_exception.h:207
Namespace for all elements and tools of MADNESS.
Definition DFParameters.h:10
std::vector< ScalarResult< T > > scalar_result_vector(World &world, std::size_t n)
helper function to create a vector of ScalarResultImpl, circumventing problems with the constructors
Definition macrotaskq.h:163
std::ostream & operator<<(std::ostream &os, const particle< PDIM > &p)
Definition lowrankfunction.h:401
static double cpu_time()
Returns the cpu time in seconds relative to an arbitrary origin.
Definition timers.h:127
DistributionType
some introspection of how data is distributed
Definition worlddc.h:81
@ NodeReplicated
even if there are several ranks per node
Definition worlddc.h:84
@ Distributed
no replication of the container, the container is distributed over the world
Definition worlddc.h:82
@ RankReplicated
replicate the container over all world ranks
Definition worlddc.h:83
void set_impl(std::vector< Function< T, NDIM > > &v, const std::vector< std::shared_ptr< FunctionImpl< T, NDIM > > > vimpl)
Definition vmra.h:712
std::vector< std::shared_ptr< FunctionImpl< T, NDIM > > > get_impl(const std::vector< Function< T, NDIM > > &v)
Definition vmra.h:705
TreeState
Definition funcdefaults.h:59
@ reconstructed
s coeffs at the leaves only
Definition funcdefaults.h:60
@ compressed
d coeffs in internal nodes, s and d coeffs at the root
Definition funcdefaults.h:61
constexpr bool check_tuple_is_valid_task_result()
given a tuple check recursively if all elements are valid task results
Definition macrotaskq.h:222
static const Slice _(0,-1, 1)
static void binary_tuple_loop(tupleT &tuple1, tupleR &tuple2, opT &op)
loop over the tuple elements of both tuples and execute the operation op on each element pair
Definition type_traits.h:742
void print(const T &t, const Ts &... ts)
Print items to std::cout (items separated by spaces) and terminate with a new line.
Definition print.h:226
static void unary_tuple_loop(tupleT &tuple, opT &op)
loop over a tuple and apply unary operator op to each element
Definition type_traits.h:732
@ TT_FULL
Definition gentensor.h:120
NDIM & f
Definition mra.h:2543
constexpr bool is_valid_task_result_v
check if type is a valid task result: it must be a WorldObject and must implement gaxpy
Definition macrotaskq.h:208
const Function< T, NDIM > & change_tree_state(const Function< T, NDIM > &f, const TreeState finalstate, bool fence=true)
change tree state of a function
Definition mra.h:2869
decltype(decay_types(std::declval< T >())) decay_tuple
Definition macrotaskpartitioner.h:22
double wall_time()
Returns the wall time in seconds relative to an arbitrary origin.
Definition timers.cc:48
std::string type(const PairType &n)
Definition PNOParameters.h:18
constexpr Vector< T, sizeof...(Ts)+1 > vec(T t, Ts... ts)
Factory function for creating a madness::Vector.
Definition vector.h:750
std::string name(const FuncType &type, const int ex=-1)
Definition ccpairfunction.h:28
Function< T, NDIM > copy(const Function< T, NDIM > &f, const std::shared_ptr< WorldDCPmapInterface< Key< NDIM > > > &pmap, bool fence=true)
Create a new copy of the function with different distribution and optional fence.
Definition mra.h:2111
void gaxpy(const double a, ScalarResult< T > &left, const double b, const T &right, const bool fence=true)
the result type of a macrotask must implement gaxpy
Definition macrotaskq.h:244
Definition mraimpl.h:51
static const double b
Definition nonlinschro.cc:119
static const double a
Definition nonlinschro.cc:118
Definition cloud.h:720
Definition macrotaskq.h:280
friend std::string to_string(const MacroTaskInfo::StoragePolicy sp)
Definition macrotaskq.h:432
StoragePolicy
Definition macrotaskq.h:281
@ StoreFunction
store a madness function in the cloud – can have a large memory impact
Definition macrotaskq.h:282
@ StorePointerToFunction
Definition macrotaskq.h:283
@ StoreFunctionViaPointer
coefficients to the subworlds when the task is started. This is the default policy.
Definition macrotaskq.h:286
static std::vector< MacroTaskInfo > get_all_presets()
helper function to return all presets
Definition macrotaskq.h:343
StoragePolicy storage_policy
Definition macrotaskq.h:420
DistributionType ptr_target_distribution_policy
Definition macrotaskq.h:422
static MacroTaskInfo preset(const std::string name)
Definition macrotaskq.h:313
static std::vector< std::string > get_all_preset_names()
Definition macrotaskq.h:338
static Cloud::StoragePolicy to_cloud_storage_policy(MacroTaskInfo::StoragePolicy policy)
given the MacroTask's storage policy return the corresponding Cloud storage policy
Definition macrotaskq.h:298
DistributionType cloud_distribution_policy
Definition macrotaskq.h:421
friend std::ostream & operator<<(std::ostream &os, const MacroTaskInfo policy)
Definition macrotaskq.h:424
bool check_consistency() const
make sure the policies are consistent
Definition macrotaskq.h:352
friend std::ostream & operator<<(std::ostream &os, const StoragePolicy sp)
Definition macrotaskq.h:290
static StoragePolicy policy_to_string(const std::string policy)
Definition macrotaskq.h:438
nlohmann::json to_json() const
Definition macrotaskq.h:448
void from_vector_of_strings(const std::vector< std::string > &vec)
set policy from a vector of strings, assuming the order is: storage policy, cloud distribution policy, ...
Definition macrotaskq.h:373
Definition macrotaskq.h:875
Default load of an object via serialize(ar, t).
Definition archive.h:667
Default store of an object via serialize(ar, t).
Definition archive.h:612
static std::string tolower(std::string s)
make lower case
Definition commandlineparser.h:86
class to temporarily redirect output to cout
Definition print.h:277
RAII class to redirect cout to a file.
Definition print.h:251
Definition type_traits.h:756
Definition mra.h:2917
Definition macrotaskq.h:193
Definition macrotaskq.h:178
Definition macrotaskq.h:172
Definition macrotaskq.h:199
Definition macrotaskq.h:187
Definition macrotaskq.h:217
static void load(const Archive &ar, std::shared_ptr< ScalarResultImpl< T > > &ptr)
Definition macrotaskq.h:260
static void store(const Archive &ar, const std::shared_ptr< ScalarResultImpl< T > > &ptr)
Definition macrotaskq.h:250
Definition timing_utilities.h:9
static const double_complex I
Definition tdse1d.cc:164
void doit(World &world)
Definition tdse.cc:921
void e()
Definition test_sig.cc:75
Declares the World class for the parallel runtime environment.
int ProcessID
Used to clearly identify process number/rank.
Definition worldtypes.h:43