MADNESS 0.10.1
macrotaskq.h
Go to the documentation of this file.
1/**
2 \file macrotaskq.h
3 \brief Declares the \c macrotaskq and MacroTaskBase classes
4 \ingroup mra
5
6 A MacroTaskq executes tasks on World objects, e.g. differentiation of a function or other
7 arithmetic. Complex algorithms can be implemented.
8
9 The universe world is split into subworlds, each of which executes macrotasks from the task queue.
10 This improves locality and speeds up calculations on large numbers of compute nodes by reducing
11 communication within worlds.
12
13 The user defines a macrotask (an example is found in test_vectormacrotask.cc). The tasks are
14 lightweight and carry only bookkeeping information; the actual input and output are stored in a
15 cloud (see cloud.h).
16
17 The user-defined macrotask is derived from MacroTaskIntermediate and must implement the run()
18 method. A heterogeneous task queue is possible.
19
20 TODO: priority q
21 TODO: task submission from inside task (serialize task instead of replicate)
22 TODO: update documentation
23 TODO: consider serializing task member variables
24
25*/
26
27
28
29#ifndef SRC_MADNESS_MRA_MACROTASKQ_H_
30#define SRC_MADNESS_MRA_MACROTASKQ_H_
31
32#include <madness/world/cloud.h>
33#include <madness/world/world.h>
35
36namespace madness {
37
38/// helper class for returning the result of a task, which is not a madness Function, but a simple scalar
39
40/// the result value is accumulated via gaxpy in universe rank=0, after completion of the taskq the final
41/// value can be obtained via get(), which includes a broadcast of the final value to all processes
42template<typename T=double>
43class ScalarResult : public WorldObject<ScalarResult<T>> {
44public:
45 typedef T value_type;
49
50 /// Disable the default copy constructor
51 ScalarResult<T>(const ScalarResult<T>& other) = delete;
52 ScalarResult<T>(ScalarResult<T>&& ) = default;
54
55 /// disable assignment operator
56 ScalarResult<T>& operator=(const ScalarResult<T>& other) = delete;
57
59// print("calling destructor of ScalarResult",this->id());
60// std::cout << std::flush;
61 }
62
63 /// simple assignment of the scalar value
65 value = x;
66 return *this;
67 }
68
70 gaxpy(1.0, x, 1.0,true);
71 return *this;
72 }
73
74 /// accumulate, optional fence
75 void gaxpy(const double a, const T& right, double b, const bool fence=true) {
76 if (this->get_world().rank()==0) {
77 value =a*value + b * right;
78 }
79 else this->send(0, &ScalarResult<T>::gaxpy, a, right, b, fence);
80 }
81
82 template<typename Archive>
83 void serialize(Archive &ar) {
84 ar & value;
85 }
86
87 /// after completion of the taskq get the final value
88 T get() {
89 this->get_world().gop.broadcast_serializable(*this, 0);
90 return value;
91 }
92
93 /// get the local value of this rank, which might differ for different ranks
94 /// for the final value use get()
95 T get_local() const {
96 return value;
97 }
98
99private:
100 /// the scalar value
102};
103
104/// helper function to create a vector of ScalarResult, circumventing problems with the constructors
105template<typename T>
106std::vector<std::shared_ptr<ScalarResult<T>>> scalar_result_shared_ptr_vector(World& world, std::size_t n) {
107 auto v=std::vector<std::shared_ptr<ScalarResult<T>>>();
108 for (std::size_t i=0; i<n; ++i) v.emplace_back(std::make_shared<ScalarResult<T>>(world));
109// for (int i=0; i<n; ++i) print("creating vector of ScalarResult",v[i]->id());
110 std::cout << std::flush;
111 auto ptr_opt = world.ptr_from_id< WorldObject< ScalarResult<T> > >(v[0]->id());
112 if (!ptr_opt)
113 MADNESS_EXCEPTION("ScalarResult: remote operation attempting to use a locally uninitialized object",0);
114 auto ptr = static_cast< ScalarResult<T>*>(*ptr_opt);
115 if (!ptr)
116 MADNESS_EXCEPTION("ScalarResult<T> operation attempting to use an unregistered object",0);
117 return v;
118}
119
120
121// type traits to check if a template parameter is a WorldContainer
122template<typename>
123struct is_scalar_result_ptr : std::false_type {};
124
125template <typename T>
126struct is_scalar_result_ptr<std::shared_ptr<madness::ScalarResult<T>>> : std::true_type {};
127
128template<typename>
129struct is_scalar_result_ptr_vector : std::false_type {
130};
131
132template<typename T>
133struct is_scalar_result_ptr_vector<std::vector<std::shared_ptr<typename madness::ScalarResult<T>>>> : std::true_type {
134};
135
136
137
138/// the result type of a macrotask must implement gaxpy
139template<typename T>
140void gaxpy(const double a, ScalarResult<T>& left, const double b, const T& right, const bool fence=true) {
141 left.gaxpy(a, right, b, fence);
142}
143
144template <class Archive, typename T>
145struct madness::archive::ArchiveStoreImpl<Archive, std::shared_ptr<ScalarResult<T>>> {
146 static void store(const Archive& ar, const std::shared_ptr<ScalarResult<T>>& ptr) {
147 bool exists=(ptr) ? true : false;
148 ar & exists;
149 if (exists) ar & ptr->id();
150 }
151};
152
153
154template <class Archive, typename T>
155struct madness::archive::ArchiveLoadImpl<Archive, std::shared_ptr<ScalarResult<T>>> {
156 static void load(const Archive& ar, std::shared_ptr<ScalarResult<T>>& ptr) {
157 bool exists=false;
158 ar & exists;
159 if (exists) {
160 uniqueidT id;
161 ar & id;
162 World* world = World::world_from_id(id.get_world_id());
163 MADNESS_ASSERT(world);
164 auto ptr_opt = (world->ptr_from_id< ScalarResult<T> >(id));
165 if (!ptr_opt)
166 MADNESS_EXCEPTION("ScalarResult: remote operation attempting to use a locally uninitialized object",0);
167 ptr.reset(ptr_opt.value(), [] (ScalarResult<T> *p_) -> void {}); // disable destruction
168 if (!ptr)
169 MADNESS_EXCEPTION("ScalarResult<T> operation attempting to use an unregistered object",0);
170 } else {
171 ptr=nullptr;
172 }
173 }
174};
175
176
177/// base class
179public:
180
181 typedef std::vector<std::shared_ptr<MacroTaskBase> > taskqT;
182
184 virtual ~MacroTaskBase() {};
185
186 double priority=1.0;
188
192
193 bool is_complete() const {return stat==Complete;}
194 bool is_running() const {return stat==Running;}
195 bool is_waiting() const {return stat==Waiting;}
196
197 virtual void run(World& world, Cloud& cloud, taskqT& taskq, const long element, const bool debug) = 0;
198 virtual void cleanup() = 0; // clear static data (presumably persistent input data)
199
200 virtual void print_me(std::string s="") const {
201 printf("this is task with priority %4.1f\n",priority);
202 }
203 virtual void print_me_as_table(std::string s="") const {
204 print("nothing to print");
205 }
207 std::stringstream ss;
208 ss << std::setw(5) << this->get_priority() << " " <<this->stat;
209 return ss.str();
210 }
211
212 double get_priority() const {return priority;}
213 void set_priority(const double p) {priority=p;}
214
215 friend std::ostream& operator<<(std::ostream& os, const MacroTaskBase::Status s) {
216 if (s==MacroTaskBase::Status::Running) os << "Running";
217 if (s==MacroTaskBase::Status::Waiting) os << "Waiting";
218 if (s==MacroTaskBase::Status::Complete) os << "Complete";
219 if (s==MacroTaskBase::Status::Unknown) os << "Unknown";
220 return os;
221 }
222};
223
224
225template<typename macrotaskT>
227
228public:
229
231
233
234 void cleanup() {};
235};
236
237
238
239class MacroTaskQ : public WorldObject< MacroTaskQ> {
240
242 std::shared_ptr<World> subworld_ptr;
244 std::mutex taskq_mutex;
246 long nsubworld=1;
247 std::shared_ptr< WorldDCPmapInterface< Key<1> > > pmap1;
248 std::shared_ptr< WorldDCPmapInterface< Key<2> > > pmap2;
249 std::shared_ptr< WorldDCPmapInterface< Key<3> > > pmap3;
250 std::shared_ptr< WorldDCPmapInterface< Key<4> > > pmap4;
251 std::shared_ptr< WorldDCPmapInterface< Key<5> > > pmap5;
252 std::shared_ptr< WorldDCPmapInterface< Key<6> > > pmap6;
253
254 bool printdebug() const {return printlevel>=10;}
255 bool printprogress() const {return (printlevel>=3) and (not (printdebug()));}
256 bool printtimings() const {return universe.rank()==0 and printlevel>=3;}
257
258public:
259
262 long get_nsubworld() const {return nsubworld;}
263 void set_printlevel(const long p) {printlevel=p;}
264
265 /// create an empty taskq and initialize the subworlds
266 MacroTaskQ(World& universe, int nworld, const long printlevel=0)
269 , taskq()
271 , nsubworld(nworld)
272 , cloud(universe)
273 {
274
276 this->process_pending();
277 }
278
280
281 /// for each process create a world using a communicator shared with other processes by round-robin
282 /// copy-paste from test_world.cc
283 static std::shared_ptr<World> create_worlds(World& universe, const std::size_t nsubworld) {
284
285 int color = universe.rank() % nsubworld;
287
288 std::shared_ptr<World> all_worlds;
289 all_worlds.reset(new World(comm));
290
292 return all_worlds;
293 }
294
295 /// run all tasks, tasks may store the results in the cloud
297
298 for (const auto& t : vtask) if (universe.rank()==0) t->set_waiting();
299 for (size_t i=0; i<vtask.size(); ++i) add_replicated_task(vtask[i]);
300 if (printdebug()) print_taskq();
301 if (printtimings()) {
302 print("number of tasks in taskq",taskq.size());
303 print("redirecting output to files task.#####");
304 }
305
306 double cpu0=cpu_time();
309 double cpu1=cpu_time();
310 if (printtimings()) print("cloud replication wall time",cpu1-cpu0);
312 universe.gop.set_forbid_fence(true); // make sure there are no hidden universe fences
320
321 double cpu00=cpu_time();
322
323 World& subworld=get_subworld();
324// if (printdebug()) print("I am subworld",subworld.id());
325 double tasktime=0.0;
326 if (printprogress() and universe.rank()==0) std::cout << "progress in percent: " << std::flush;
327 while (true) {
328 long element=get_scheduled_task_number(subworld);
329 double cpu0=cpu_time();
330 if (element<0) break;
331 std::shared_ptr<MacroTaskBase> task=taskq[element];
332 if (printdebug()) print("starting task no",element, "in subworld",subworld.id(),"at time",wall_time());
333
334 task->run(subworld,cloud, taskq, element, printdebug());
335
336 double cpu1=cpu_time();
337 set_complete(element);
338 tasktime+=(cpu1-cpu0);
339 if (printdebug()) printf("completed task %3ld after %6.1fs at time %6.1fs\n",element,cpu1-cpu0,wall_time());
340
341 // print progress
342 const std::size_t ntask=taskq.size();
343 // return percentile of ntask for element
344 auto in_percentile = [&ntask](const long element) {
345 return std::floor(element/(0.1*(ntask+1)));
346 };
347 auto is_first_in_percentile = [&](const long element) {
348 return (in_percentile(element)!=in_percentile(element-1));
349 };
350 if (printprogress() and is_first_in_percentile(element)) {
351 std::cout << int(in_percentile(element)*10) << " " << std::flush;
352 }
353 }
356 universe.gop.sum(tasktime);
357 if (printprogress() and universe.rank()==0) std::cout << std::endl;
358 double cpu11=cpu_time();
360 if (printtimings()) {
361 printf("completed taskqueue after %4.1fs at time %4.1fs\n", cpu11 - cpu00, wall_time());
362 printf(" total cpu time / per world %4.1fs %4.1fs\n", tasktime, tasktime / universe.size());
363 }
364
365 // cleanup task-persistent input data
366 for (auto& task : taskq) task->cleanup();
367 cloud.clear_cache(subworld);
368 cloud.clear();
369 subworld.gop.fence();
370 subworld.gop.fence();
379 }
380
382 for (const auto& t : vtask) {
383 if (universe.rank()==0) t->set_waiting();
385 }
386 }
387
388 void print_taskq() const {
390 if (universe.rank()==0) {
391 print("\ntaskq on universe rank",universe.rank());
392 print("total number of tasks: ",taskq.size());
393 print(" task batch priority status");
394 for (const auto& t : taskq) t->print_me_as_table();
395 }
397 }
398
399private:
400 void add_replicated_task(const std::shared_ptr<MacroTaskBase>& task) {
401 taskq.push_back(task);
402 }
403
404 /// scheduler is located on universe.rank==0
406 long number=0;
407 if (subworld.rank()==0) {
409 number=r.get();
410 }
411 subworld.gop.broadcast_serializable(number, 0);
412 subworld.gop.fence();
413 return number;
414
415 }
416
419 std::lock_guard<std::mutex> lock(taskq_mutex);
420
421 auto is_Waiting = [](const std::shared_ptr<MacroTaskBase>& mtb_ptr) {return mtb_ptr->is_waiting();};
422 auto it=std::find_if(taskq.begin(),taskq.end(),is_Waiting);
423 if (it!=taskq.end()) {
424 it->get()->set_running();
425 long element=it-taskq.begin();
426 return element;
427 }
428// print("could not find task to schedule");
429 return -1;
430 }
431
432 /// scheduler is located on rank==0
433 void set_complete(const long task_number) const {
434 this->task(ProcessID(0), &MacroTaskQ::set_complete_local, task_number);
435 }
436
437 /// scheduler is located on rank==0
438 void set_complete_local(const long task_number) const {
440 taskq[task_number]->set_complete();
441 }
442
443public:
452private:
453 std::size_t size() const {
454 return taskq.size();
455 }
456
457};
458
459
460
461
462template<typename taskT>
465
466 template<typename Q>
467 struct is_vector : std::false_type {
468 };
469 template<typename Q>
470 struct is_vector<std::vector<Q>> : std::true_type {
471 };
472
473 typedef typename taskT::resultT resultT;
474 typedef typename taskT::argtupleT argtupleT;
476 taskT task;
477 bool debug=false;
478 std::string name="unknown_task";
479
480 /// RAII class to redirect cout to a file
481 struct io_redirect {
482 std::streambuf* stream_buffer_cout;
483 std::ofstream ofile;
484 bool debug=false;
485
486 io_redirect(const long task_number, std::string filename, bool debug=false) : debug(debug) {
487 constexpr std::size_t bufsize=256;
488 char cfilename[bufsize];
489 std::snprintf(cfilename,bufsize,"%s.%5.5ld",filename.c_str(),task_number);
490 ofile=std::ofstream(cfilename);
491 if (debug) std::cout << "redirecting to file " << cfilename << std::endl;
492 stream_buffer_cout = std::cout.rdbuf(ofile.rdbuf());
493 std::cout.sync_with_stdio(true);
494 }
495
497 std::cout.rdbuf(stream_buffer_cout);
498 ofile.close();
499 std::cout.sync_with_stdio(true);
500 if (debug) std::cout << "redirecting back to cout" << std::endl;
501 }
502 };
503
504
505public:
506
507 /// constructor takes the actual task
508 MacroTask(World &world, taskT &task, std::shared_ptr<MacroTaskQ> taskq_ptr = 0)
510 if (taskq_ptr) {
511 // for the time being this condition must hold because tasks are
512 // constructed as replicated objects and are not broadcast to other processes
513 MADNESS_CHECK(world.id()==taskq_ptr->get_world().id());
514 }
515 }
516
517 void set_debug(const bool value) {
518 debug=value;
519 }
520
521 /// set a name for this task for debugging and output naming
522 void set_name(const std::string name1) {
523 name=name1;
524 }
525
526 /// this mimics the original call to the task functor, called from the universe
527
528 /// store all input to the cloud, create output Function<T,NDIM> in the universe,
529 /// create the batched task and shove it into the taskq. Possibly execute the taskq.
530 template<typename ... Ts>
531 resultT operator()(const Ts &... args) {
532
533 const bool immediate_execution = (not taskq_ptr);
534 if (not taskq_ptr) taskq_ptr.reset(new MacroTaskQ(world, world.size()));
535 if (debug) taskq_ptr->set_printlevel(20);
536
537 auto argtuple = std::tie(args...);
538 static_assert(std::is_same<decltype(argtuple), argtupleT>::value, "type or number of arguments incorrect");
539
540 // partition the argument vector into batches
541 auto partitioner=task.partitioner;
542 if (not partitioner) partitioner.reset(new MacroTaskPartitioner);
543 partitioner->set_nsubworld(world.size());
544 partitionT partition = partitioner->partition_tasks(argtuple);
545
546 // store input and output: output being a pointer to a universe function (vector)
547 recordlistT inputrecords = taskq_ptr->cloud.store(world, argtuple);
548 resultT result = task.allocator(world, argtuple);
549 auto outputrecords =prepare_output_records(taskq_ptr->cloud, result);
550
551 // create tasks and add them to the taskq
553 for (const auto& batch_prio : partition) {
554 vtask.push_back(
555 std::shared_ptr<MacroTaskBase>(new MacroTaskInternal(task, batch_prio, inputrecords, outputrecords, name)));
556 }
557 taskq_ptr->add_tasks(vtask);
558
559 if (immediate_execution) taskq_ptr->run_all();
560
561 // return std::move(result);
562 return result;
563 }
564
565private:
566
568 std::shared_ptr<MacroTaskQ> taskq_ptr;
569
570 /// store the result WorldObject in the cloud and return the recordlist
576 "unknown result type in prepare_output_records");
577 recordlistT outputrecords;
579 outputrecords += cloud.store(world, result.get_impl().get()); // store pointer to FunctionImpl
580 } else if constexpr (is_madness_function_vector<resultT>::value) {
581 outputrecords += cloud.store(world, get_impl(result));
582 } else if constexpr (is_scalar_result_ptr<resultT>::value) {
583 outputrecords += cloud.store(world, result); // store pointer to ScalarResult
585 outputrecords+=cloud.store(world,result);
586 } else {
587 MADNESS_EXCEPTION("\n\n unknown result type in prepare_input ", 1);
588 }
589 return outputrecords;
590 }
591
592
593 class MacroTaskInternal : public MacroTaskIntermediate<MacroTask> {
594
595 typedef decay_tuple<typename taskT::argtupleT> argtupleT; // removes const, &, etc
596 typedef typename taskT::resultT resultT;
599 public:
600 taskT task;
601 std::string name="unknown_task"; // for identification in debug output
602
603 MacroTaskInternal(const taskT &task, const std::pair<Batch,double> &batch_prio,
604 const recordlistT &inputrecords, const recordlistT &outputrecords, std::string name)
610 "unknown result type in MacroTaskInternal constructor");
611 this->task.batch=batch_prio.first;
612 this->priority=batch_prio.second;
613 }
614
615
616 virtual void print_me(std::string s="") const {
617 print("this is task",typeid(task).name(),"with batch", task.batch,"priority",this->get_priority());
618 }
619
620 virtual void print_me_as_table(std::string s="") const {
621 std::stringstream ss;
622 std::string name=typeid(task).name();
623 std::size_t namesize=std::min(std::size_t(28),name.size());
624 name += std::string(28-namesize,' ');
625
626 std::stringstream ssbatch;
627 ssbatch << task.batch;
628 std::string strbatch=ssbatch.str();
629 int nspaces=std::max(int(0),35-int(ssbatch.str().size()));
630 strbatch+=std::string(nspaces,' ');
631
632 ss << name
633 << std::setw(10) << strbatch
635 print(ss.str());
636 }
637
638 void run(World &subworld, Cloud &cloud, MacroTaskBase::taskqT &taskq, const long element, const bool debug) {
639
640 io_redirect io(element,name+"_task",debug);
641 const argtupleT argtuple = cloud.load<argtupleT>(subworld, inputrecords);
642 const argtupleT batched_argtuple = task.batch.template copy_input_batch(argtuple);
643 try {
644 print("starting task no",element, "in subworld",subworld.id(),"at time",wall_time());
645 double cpu0=cpu_time();
646 resultT result_tmp = std::apply(task, batched_argtuple);
647 double cpu1=cpu_time();
648 std::size_t bufsize=256;
649 char buffer[bufsize];
650 std::snprintf(buffer,bufsize,"completed task %3ld after %6.1fs at time %6.1fs\n",element,cpu1-cpu0,wall_time());
651 print(std::string(buffer));
652
653 resultT result = get_output(subworld, cloud, argtuple); // lives in the universe
655 result_tmp.compress();
656 gaxpy(1.0,result,1.0, result_tmp);
657 } else if constexpr(is_madness_function_vector<resultT>::value) {
658 compress(subworld, result_tmp);
659 resultT tmp1=task.allocator(subworld,argtuple);
660 tmp1=task.batch.template insert_result_batch(tmp1,result_tmp);
661 gaxpy(1.0,result,1.0,tmp1,false);
662 // was using operator+=, but this requires a fence, which is not allowed here..
663 // result += tmp1;
664 } else if constexpr (is_scalar_result_ptr<resultT>::value) {
665 gaxpy(1.0, *result, 1.0, result_tmp->get_local(), false);
666 } else if constexpr (is_scalar_result_ptr_vector<resultT>::value) {
667 resultT tmp1=task.allocator(subworld,argtuple);
668 tmp1=task.batch.template insert_result_batch(tmp1,result_tmp);
669
670 std::size_t sz=result.size();
671 for (int i=0; i<sz; ++i) {
672 gaxpy(1.0, *(result[i]), 1.0, tmp1[i]->get_local(), false);
673 }
674 } else {
675 MADNESS_EXCEPTION("failing result",1);
676 }
677 } catch (std::exception& e) {
678 print("failing task no",element,"in subworld",subworld.id(),"at time",wall_time());
679 print(e.what());
680 print("\n\n");
681 MADNESS_EXCEPTION("failing task",1);
682 }
683
684 };
685
686 /// get the pointers to the output functions living in the universe
687
688 /// the result of a task living in subworld will be accumulated into result
689 /// living in the universe
690 resultT get_output(World &subworld, Cloud &cloud, const argtupleT &argtuple) {
691 resultT result;
693 typedef std::shared_ptr<typename resultT::implT> impl_ptrT;
694 result.set_impl(cloud.load<impl_ptrT>(subworld, outputrecords));
695 } else if constexpr (is_madness_function_vector<resultT>::value) {
696 typedef std::shared_ptr<typename resultT::value_type::implT> impl_ptrT;
697 std::vector<impl_ptrT> rimpl = cloud.load<std::vector<impl_ptrT>>(subworld, outputrecords);
698 result.resize(rimpl.size());
699 set_impl(result, rimpl);
700 } else if constexpr (is_scalar_result_ptr<resultT>::value) {
701 result = cloud.load<resultT>(subworld, outputrecords);
702 } else if constexpr (is_scalar_result_ptr_vector<resultT>::value) {
703 typedef typename resultT::value_type::element_type ScalarResultT;
704 result=cloud.load<std::vector<std::shared_ptr<ScalarResultT>>>(subworld, outputrecords);
705
706 } else {
707 MADNESS_EXCEPTION("unknown result type in get_output", 1);
708 }
709 return result;
710 }
711
712 };
713
714};
715
717public:
719 std::string name="unknown_task";
720 std::shared_ptr<MacroTaskPartitioner> partitioner=0;
722};
723
724
725} /* namespace madness */
726
727#endif /* SRC_MADNESS_MRA_MACROTASKQ_H_ */
Wrapper around MPI_Comm. Has a shallow copy constructor; use Create(Get_group()) for deep copy.
Definition safempi.h:490
Intracomm Split(int Color, int Key=0) const
Definition safempi.h:635
a batch consists of a 2D-input batch and a 1D-output batch: K-batch <- (I-batch, J-batch)
Definition macrotaskpartitioner.h:132
cloud class
Definition cloud.h:147
void clear()
Definition cloud.h:258
recordlistT store(madness::World &world, const T &source)
Definition cloud.h:302
Recordlist< keyT > recordlistT
Definition cloud.h:160
void replicate(const std::size_t chunk_size=INT_MAX)
Definition cloud.h:325
void print_size(World &universe)
Definition cloud.h:193
T load(madness::World &world, const recordlistT recordlist) const
Definition cloud.h:274
void clear_cache(World &subworld)
Definition cloud.h:252
void print_timings(World &universe) const
Definition cloud.h:230
static void set_default_pmap(World &world)
Sets the default process map.
Definition mraimpl.h:3550
static std::shared_ptr< WorldDCPmapInterface< Key< NDIM > > > & get_pmap()
Returns the default process map.
Definition funcdefaults.h:488
static void set_pmap(const std::shared_ptr< WorldDCPmapInterface< Key< NDIM > > > &value)
Sets the default process map (does not redistribute existing functions)
Definition funcdefaults.h:504
A future is a possibly yet unevaluated value.
Definition future.h:373
T & get(bool dowork=true) &
Gets the value, waiting if necessary.
Definition future.h:574
base class
Definition macrotaskq.h:178
void set_running()
Definition macrotaskq.h:190
virtual ~MacroTaskBase()
Definition macrotaskq.h:184
void set_waiting()
Definition macrotaskq.h:191
MacroTaskBase()
Definition macrotaskq.h:183
virtual void cleanup()=0
virtual void print_me(std::string s="") const
Definition macrotaskq.h:200
std::string print_priority_and_status_to_string() const
Definition macrotaskq.h:206
void set_complete()
Definition macrotaskq.h:189
double priority
Definition macrotaskq.h:186
bool is_complete() const
Definition macrotaskq.h:193
Status
Definition macrotaskq.h:187
@ Complete
Definition macrotaskq.h:187
@ Running
Definition macrotaskq.h:187
@ Unknown
Definition macrotaskq.h:187
@ Waiting
Definition macrotaskq.h:187
bool is_running() const
Definition macrotaskq.h:194
enum madness::MacroTaskBase::Status stat
double get_priority() const
Definition macrotaskq.h:212
void set_priority(const double p)
Definition macrotaskq.h:213
virtual void print_me_as_table(std::string s="") const
Definition macrotaskq.h:203
std::vector< std::shared_ptr< MacroTaskBase > > taskqT
Definition macrotaskq.h:181
virtual void run(World &world, Cloud &cloud, taskqT &taskq, const long element, const bool debug)=0
friend std::ostream & operator<<(std::ostream &os, const MacroTaskBase::Status s)
Definition macrotaskq.h:215
bool is_waiting() const
Definition macrotaskq.h:195
Definition macrotaskq.h:226
MacroTaskIntermediate()
Definition macrotaskq.h:230
void cleanup()
Definition macrotaskq.h:234
~MacroTaskIntermediate()
Definition macrotaskq.h:232
Definition macrotaskq.h:716
MacroTaskOperationBase()
Definition macrotaskq.h:721
Batch batch
Definition macrotaskq.h:718
std::shared_ptr< MacroTaskPartitioner > partitioner
Definition macrotaskq.h:720
std::string name
Definition macrotaskq.h:719
partition one (two) vectors into 1D (2D) batches.
Definition macrotaskpartitioner.h:190
std::list< std::pair< Batch, double > > partitionT
Definition macrotaskpartitioner.h:194
Definition macrotaskq.h:239
long nsubworld
Definition macrotaskq.h:246
std::mutex taskq_mutex
Definition macrotaskq.h:244
bool printdebug() const
Definition macrotaskq.h:254
static void set_pmap(World &world)
Definition macrotaskq.h:444
void set_complete(const long task_number) const
scheduler is located on rank==0
Definition macrotaskq.h:433
World & universe
Definition macrotaskq.h:241
std::shared_ptr< WorldDCPmapInterface< Key< 1 > > > pmap1
Definition macrotaskq.h:247
std::shared_ptr< WorldDCPmapInterface< Key< 3 > > > pmap3
Definition macrotaskq.h:249
MacroTaskBase::taskqT taskq
Definition macrotaskq.h:243
void run_all(MacroTaskBase::taskqT vtask=MacroTaskBase::taskqT())
run all tasks, tasks may store the results in the cloud
Definition macrotaskq.h:296
void add_tasks(MacroTaskBase::taskqT &vtask)
Definition macrotaskq.h:381
long get_scheduled_task_number(World &subworld)
scheduler is located on universe.rank==0
Definition macrotaskq.h:405
void set_complete_local(const long task_number) const
scheduler is located on rank==0
Definition macrotaskq.h:438
World & get_subworld()
Definition macrotaskq.h:261
MacroTaskQ(World &universe, int nworld, const long printlevel=0)
create an empty taskq and initialize the subworlds
Definition macrotaskq.h:266
std::shared_ptr< WorldDCPmapInterface< Key< 4 > > > pmap4
Definition macrotaskq.h:250
std::size_t size() const
Definition macrotaskq.h:453
std::shared_ptr< WorldDCPmapInterface< Key< 5 > > > pmap5
Definition macrotaskq.h:251
void print_taskq() const
Definition macrotaskq.h:388
std::shared_ptr< World > subworld_ptr
Definition macrotaskq.h:242
bool printtimings() const
Definition macrotaskq.h:256
std::shared_ptr< WorldDCPmapInterface< Key< 2 > > > pmap2
Definition macrotaskq.h:248
bool printprogress() const
Definition macrotaskq.h:255
std::shared_ptr< WorldDCPmapInterface< Key< 6 > > > pmap6
Definition macrotaskq.h:252
void set_printlevel(const long p)
Definition macrotaskq.h:263
long printlevel
Definition macrotaskq.h:245
madness::Cloud cloud
Definition macrotaskq.h:260
long get_nsubworld() const
Definition macrotaskq.h:262
void add_replicated_task(const std::shared_ptr< MacroTaskBase > &task)
Definition macrotaskq.h:400
~MacroTaskQ()
Definition macrotaskq.h:279
static std::shared_ptr< World > create_worlds(World &universe, const std::size_t nsubworld)
Definition macrotaskq.h:283
long get_scheduled_task_number_local()
Definition macrotaskq.h:417
Definition macrotaskq.h:593
taskT task
Definition macrotaskq.h:600
decay_tuple< typename taskT::argtupleT > argtupleT
Definition macrotaskq.h:595
virtual void print_me_as_table(std::string s="") const
Definition macrotaskq.h:620
std::string name
Definition macrotaskq.h:601
taskT::resultT resultT
Definition macrotaskq.h:596
void run(World &subworld, Cloud &cloud, MacroTaskBase::taskqT &taskq, const long element, const bool debug)
Definition macrotaskq.h:638
recordlistT outputrecords
Definition macrotaskq.h:598
resultT get_output(World &subworld, Cloud &cloud, const argtupleT &argtuple)
get the pointers to the output functions living in the universe
Definition macrotaskq.h:690
virtual void print_me(std::string s="") const
Definition macrotaskq.h:616
MacroTaskInternal(const taskT &task, const std::pair< Batch, double > &batch_prio, const recordlistT &inputrecords, const recordlistT &outputrecords, std::string name)
Definition macrotaskq.h:603
recordlistT inputrecords
Definition macrotaskq.h:597
Definition macrotaskq.h:463
void set_name(const std::string name1)
set a name for this task for debugging and output naming
Definition macrotaskq.h:522
taskT::resultT resultT
Definition macrotaskq.h:473
std::shared_ptr< MacroTaskQ > taskq_ptr
Definition macrotaskq.h:568
taskT::argtupleT argtupleT
Definition macrotaskq.h:474
MacroTask(World &world, taskT &task, std::shared_ptr< MacroTaskQ > taskq_ptr=0)
constructor takes the actual task
Definition macrotaskq.h:508
World & world
Definition macrotaskq.h:567
resultT operator()(const Ts &... args)
this mimicks the original call to the task functor, called from the universe
Definition macrotaskq.h:531
bool debug
Definition macrotaskq.h:477
taskT task
Definition macrotaskq.h:476
MacroTaskPartitioner::partitionT partitionT
Definition macrotaskq.h:464
std::string name
Definition macrotaskq.h:478
Cloud::recordlistT recordlistT
Definition macrotaskq.h:475
void set_debug(const bool value)
Definition macrotaskq.h:517
recordlistT prepare_output_records(Cloud &cloud, resultT &result)
store the result WorldObject in the cloud and return the recordlist
Definition macrotaskq.h:571
helper class for returning the result of a task, which is not a madness Function, but a simple scalar
Definition macrotaskq.h:43
void serialize(Archive &ar)
Definition macrotaskq.h:83
void gaxpy(const double a, const T &right, double b, const bool fence=true)
accumulate, optional fence
Definition macrotaskq.h:75
ScalarResult< T > & operator+=(const T &x)
Definition macrotaskq.h:69
ScalarResult< T > & operator=(const ScalarResult< T > &other)=delete
disable assignment operator
T get()
after completion of the taskq get the final value
Definition macrotaskq.h:88
ScalarResult(World &world)
Definition macrotaskq.h:46
T value
the scalar value
Definition macrotaskq.h:101
~ScalarResult()
Definition macrotaskq.h:58
ScalarResult< T > & operator=(const T &x)
simple assignment of the scalar value
Definition macrotaskq.h:64
ScalarResult< T > & operator=(ScalarResult< T > &&)=default
T get_local() const
Definition macrotaskq.h:95
T value_type
Definition macrotaskq.h:45
void broadcast_serializable(objT &obj, ProcessID root)
Broadcast a serializable object.
Definition worldgop.h:754
void fence(bool debug=false)
Synchronizes all processes in communicator AND globally ensures no pending AM or tasks.
Definition worldgop.cc:161
bool set_forbid_fence(bool value)
Set forbid_fence flag to new value and return old value.
Definition worldgop.h:674
void sum(T *buf, size_t nelem)
Inplace global sum while still processing AM & tasks.
Definition worldgop.h:870
SafeMPI::Intracomm & comm()
Returns the associated SafeMPI communicator.
Definition worldmpi.h:286
Implements most parts of a globally addressable object (via unique ID).
Definition world_object.h:364
World & get_world() const
Returns a reference to the world.
Definition world_object.h:717
World & world
The World this object belongs to. (Think globally, act locally).
Definition world_object.h:381
void process_pending()
To be called from derived constructor to process pending messages.
Definition world_object.h:656
detail::task_result_type< memfnT >::futureT send(ProcessID dest, memfnT memfn) const
Definition world_object.h:731
detail::task_result_type< memfnT >::futureT task(ProcessID dest, memfnT memfn, const TaskAttributes &attr=TaskAttributes()) const
Sends task to derived class method returnT (this->*memfn)().
Definition world_object.h:1005
A parallel world class.
Definition world.h:132
static World * world_from_id(std::uint64_t id)
Convert a World ID to a World pointer.
Definition world.h:474
ProcessID rank() const
Returns the process rank in this World (same as MPI_Comm_rank()).
Definition world.h:318
WorldMpiInterface & mpi
MPI interface.
Definition world.h:202
ProcessID size() const
Returns the number of processes in this World (same as MPI_Comm_size()).
Definition world.h:328
unsigned long id() const
Definition world.h:313
WorldGopInterface & gop
Global operations.
Definition world.h:205
std::optional< T * > ptr_from_id(uniqueidT id) const
Look up a local pointer from a world-wide unique ID.
Definition world.h:414
Class for unique global IDs.
Definition uniqueid.h:53
Declares the Cloud class for storing data and transferring them between worlds.
char * p(char *buf, const char *name, int k, int initial_level, double thresh, int order)
Definition derivatives.cc:72
const std::size_t bufsize
Definition derivatives.cc:16
static bool debug
Definition dirac-hatom.cc:16
auto T(World &world, response_space &f) -> response_space
Definition global_functions.cc:34
static const double v
Definition hatom_sf_dirac.cc:20
#define MADNESS_CHECK(condition)
Check a condition — even in a release build the condition is always evaluated so it can have side effects.
Definition madness_exception.h:182
#define MADNESS_EXCEPTION(msg, value)
Macro for throwing a MADNESS exception.
Definition madness_exception.h:119
#define MADNESS_ASSERT(condition)
Assert a condition that should be free of side-effects since in release builds this might be a no-op.
Definition madness_exception.h:134
Namespace for all elements and tools of MADNESS.
Definition DFParameters.h:10
static const char * filename
Definition legendre.cc:96
static double cpu_time()
Returns the cpu time in seconds relative to an arbitrary origin.
Definition timers.h:127
void set_impl(std::vector< Function< T, NDIM > > &v, const std::vector< std::shared_ptr< FunctionImpl< T, NDIM > > > vimpl)
Definition vmra.h:670
std::vector< std::shared_ptr< FunctionImpl< T, NDIM > > > get_impl(const std::vector< Function< T, NDIM > > &v)
Definition vmra.h:663
std::vector< std::shared_ptr< ScalarResult< T > > > scalar_result_shared_ptr_vector(World &world, std::size_t n)
helper function to create a vector of ScalarResult, circumventing problems with the constructors
Definition macrotaskq.h:106
void compress(World &world, const std::vector< Function< T, NDIM > > &v, bool fence=true)
Compress a vector of functions.
Definition vmra.h:133
static const Slice _(0,-1, 1)
void print(const T &t, const Ts &... ts)
Print items to std::cout (items separated by spaces) and terminate with a new line.
Definition print.h:225
decltype(decay_types(std::declval< T >())) decay_tuple
Definition macrotaskpartitioner.h:22
double wall_time()
Returns the wall time in seconds relative to an arbitrary origin.
Definition timers.cc:48
void gaxpy(const double a, ScalarResult< T > &left, const double b, const T &right, const bool fence=true)
the result type of a macrotask must implement gaxpy
Definition macrotaskq.h:140
Definition mraimpl.h:50
static const double b
Definition nonlinschro.cc:119
static const double a
Definition nonlinschro.cc:118
RAII class to redirect cout to a file.
Definition macrotaskq.h:481
io_redirect(const long task_number, std::string filename, bool debug=false)
Definition macrotaskq.h:486
std::streambuf * stream_buffer_cout
Definition macrotaskq.h:482
~io_redirect()
Definition macrotaskq.h:496
bool debug
Definition macrotaskq.h:484
std::ofstream ofile
Definition macrotaskq.h:483
Definition macrotaskq.h:467
Default load of an object via serialize(ar, t).
Definition archive.h:666
Default store of an object via serialize(ar, t).
Definition archive.h:611
Definition macrotaskpartitioner.h:25
Definition mra.h:2761
Definition macrotaskq.h:129
Definition macrotaskq.h:123
static void load(const Archive &ar, std::shared_ptr< ScalarResult< T > > &ptr)
Definition macrotaskq.h:156
static void store(const Archive &ar, const std::shared_ptr< ScalarResult< T > > &ptr)
Definition macrotaskq.h:146
void e()
Definition test_sig.cc:75
Declares the World class for the parallel runtime environment.
int ProcessID
Used to clearly identify process number/rank.
Definition worldtypes.h:43