MADNESS  0.10.1
macrotaskq.h
Go to the documentation of this file.
1 /**
2  \file macrotaskq.h
3  \brief Declares the \c macrotaskq and MacroTaskBase classes
4  \ingroup mra
5 
6  A MacroTaskq executes tasks on World objects, e.g. differentiation of a function or other
7  arithmetic. Complex algorithms can be implemented.
8 
9  The universe world is split into subworlds, each of them executing macrotasks of the task queue.
10  This improves locality and speedup for large numbers of compute nodes by reducing communication
11  within worlds.
12 
13  The user defines a macrotask (an example is found in test_vectormacrotask.cc). The tasks are
14  lightweight and carry only bookkeeping information; the actual input and output are stored in a
15  cloud (see cloud.h).
16 
17  The user-defined macrotask is derived from MacroTaskIntermediate and must implement the run()
18  method. A heterogeneous task queue is possible.
19 
20  TODO: priority q
21  TODO: task submission from inside task (serialize task instead of replicate)
22  TODO: update documentation
23  TODO: consider serializing task member variables
24 
25 */
26 
27 
28 
29 #ifndef SRC_MADNESS_MRA_MACROTASKQ_H_
30 #define SRC_MADNESS_MRA_MACROTASKQ_H_
31 
32 #include <madness/world/cloud.h>
33 #include <madness/world/world.h>
35 
36 namespace madness {
37 
38 /// helper class for returning the result of a task, which is not a madness Function, but a simple scalar
39 
40 /// the result value is accumulated via gaxpy in universe rank=0, after completion of the taskq the final
41 /// value can be obtained via get(), which includes a broadcast of the final value to all processes
42 template<typename T=double>
43 class ScalarResult : public WorldObject<ScalarResult<T>> {
44 public:
45  typedef T value_type;
47  this->process_pending();
48  }
49 
50  /// Disable the default copy constructor
51  ScalarResult<T>(const ScalarResult<T>& other) = delete;
52  ScalarResult<T>(ScalarResult<T>&& ) = default;
54 
55  /// disable assignment operator
56  ScalarResult<T>& operator=(const ScalarResult<T>& other) = delete;
57 
59 // print("calling destructor of ScalarResult",this->id());
60 // std::cout << std::flush;
61  }
62 
63  /// simple assignment of the scalar value
65  value = x;
66  return *this;
67  }
68 
70  gaxpy(1.0, x, 1.0,true);
71  return *this;
72  }
73 
74  /// accumulate, optional fence
75  void gaxpy(const double a, const T& right, double b, const bool fence=true) {
76  if (this->get_world().rank()==0) {
77  value =a*value + b * right;
78  }
79  else this->send(0, &ScalarResult<T>::gaxpy, a, right, b, fence);
80  }
81 
82  template<typename Archive>
83  void serialize(Archive &ar) {
84  ar & value;
85  }
86 
87  /// after completion of the taskq get the final value
88  T get() {
89  this->get_world().gop.broadcast_serializable(*this, 0);
90  return value;
91  }
92 
93  /// get the local value of this rank, which might differ for different ranks
94  /// for the final value use get()
95  T get_local() const {
96  return value;
97  }
98 
99 private:
100  /// the scalar value
101  T value=T();
102 };
103 
104 /// helper function to create a vector of ScalarResult, circumventing problems with the constructors
105 template<typename T>
106 std::vector<std::shared_ptr<ScalarResult<T>>> scalar_result_shared_ptr_vector(World& world, std::size_t n) {
107  auto v=std::vector<std::shared_ptr<ScalarResult<T>>>();
108  for (std::size_t i=0; i<n; ++i) v.emplace_back(std::make_shared<ScalarResult<T>>(world));
109 // for (int i=0; i<n; ++i) print("creating vector of ScalarResult",v[i]->id());
110  std::cout << std::flush;
111  auto ptr_opt = world.ptr_from_id< WorldObject< ScalarResult<T> > >(v[0]->id());
112  if (!ptr_opt)
113  MADNESS_EXCEPTION("ScalarResult: remote operation attempting to use a locally uninitialized object",0);
114  auto ptr = static_cast< ScalarResult<T>*>(*ptr_opt);
115  if (!ptr)
116  MADNESS_EXCEPTION("ScalarResult<T> operation attempting to use an unregistered object",0);
117  return v;
118 }
119 
120 
121 // type traits to check if a template parameter is a WorldContainer
122 template<typename>
123 struct is_scalar_result_ptr : std::false_type {};
124 
125 template <typename T>
126 struct is_scalar_result_ptr<std::shared_ptr<madness::ScalarResult<T>>> : std::true_type {};
127 
128 template<typename>
129 struct is_scalar_result_ptr_vector : std::false_type {
130 };
131 
132 template<typename T>
133 struct is_scalar_result_ptr_vector<std::vector<std::shared_ptr<typename madness::ScalarResult<T>>>> : std::true_type {
134 };
135 
136 
137 
138 /// the result type of a macrotask must implement gaxpy
139 template<typename T>
140 void gaxpy(const double a, ScalarResult<T>& left, const double b, const T& right, const bool fence=true) {
141  left.gaxpy(a, right, b, fence);
142 }
143 
144 template <class Archive, typename T>
145 struct madness::archive::ArchiveStoreImpl<Archive, std::shared_ptr<ScalarResult<T>>> {
146  static void store(const Archive& ar, const std::shared_ptr<ScalarResult<T>>& ptr) {
147  bool exists=(ptr) ? true : false;
148  ar & exists;
149  if (exists) ar & ptr->id();
150  }
151 };
152 
153 
154 template <class Archive, typename T>
155 struct madness::archive::ArchiveLoadImpl<Archive, std::shared_ptr<ScalarResult<T>>> {
156  static void load(const Archive& ar, std::shared_ptr<ScalarResult<T>>& ptr) {
157  bool exists=false;
158  ar & exists;
159  if (exists) {
160  uniqueidT id;
161  ar & id;
162  World* world = World::world_from_id(id.get_world_id());
163  MADNESS_ASSERT(world);
164  auto ptr_opt = (world->ptr_from_id< ScalarResult<T> >(id));
165  if (!ptr_opt)
166  MADNESS_EXCEPTION("ScalarResult: remote operation attempting to use a locally uninitialized object",0);
167  ptr.reset(ptr_opt.value(), [] (ScalarResult<T> *p_) -> void {}); // disable destruction
168  if (!ptr)
169  MADNESS_EXCEPTION("ScalarResult<T> operation attempting to use an unregistered object",0);
170  } else {
171  ptr=nullptr;
172  }
173  }
174 };
175 
176 
177 /// base class
179 public:
180 
181  typedef std::vector<std::shared_ptr<MacroTaskBase> > taskqT;
182 
184  virtual ~MacroTaskBase() {};
185 
186  double priority=1.0;
188 
192 
193  bool is_complete() const {return stat==Complete;}
194  bool is_running() const {return stat==Running;}
195  bool is_waiting() const {return stat==Waiting;}
196 
197  virtual void run(World& world, Cloud& cloud, taskqT& taskq, const long element, const bool debug) = 0;
198  virtual void cleanup() = 0; // clear static data (presumably persistent input data)
199 
200  virtual void print_me(std::string s="") const {
201  printf("this is task with priority %4.1f\n",priority);
202  }
203  virtual void print_me_as_table(std::string s="") const {
204  print("nothing to print");
205  }
207  std::stringstream ss;
208  ss << std::setw(5) << this->get_priority() << " " <<this->stat;
209  return ss.str();
210  }
211 
212  double get_priority() const {return priority;}
213  void set_priority(const double p) {priority=p;}
214 
215  friend std::ostream& operator<<(std::ostream& os, const MacroTaskBase::Status s) {
216  if (s==MacroTaskBase::Status::Running) os << "Running";
217  if (s==MacroTaskBase::Status::Waiting) os << "Waiting";
218  if (s==MacroTaskBase::Status::Complete) os << "Complete";
219  if (s==MacroTaskBase::Status::Unknown) os << "Unknown";
220  return os;
221  }
222 };
223 
224 
225 template<typename macrotaskT>
227 
228 public:
229 
231 
233 
234  void cleanup() {};
235 };
236 
237 
238 
239 class MacroTaskQ : public WorldObject< MacroTaskQ> {
240 
242  std::shared_ptr<World> subworld_ptr;
244  std::mutex taskq_mutex;
245  long printlevel=0;
246  long nsubworld=1;
247  std::shared_ptr< WorldDCPmapInterface< Key<1> > > pmap1;
248  std::shared_ptr< WorldDCPmapInterface< Key<2> > > pmap2;
249  std::shared_ptr< WorldDCPmapInterface< Key<3> > > pmap3;
250  std::shared_ptr< WorldDCPmapInterface< Key<4> > > pmap4;
251  std::shared_ptr< WorldDCPmapInterface< Key<5> > > pmap5;
252  std::shared_ptr< WorldDCPmapInterface< Key<6> > > pmap6;
253 
254  bool printdebug() const {return printlevel>=10;}
255  bool printprogress() const {return (printlevel>=3) and (not (printdebug()));}
256  bool printtimings() const {return universe.rank()==0 and printlevel>=3;}
257 
258 public:
259 
262  long get_nsubworld() const {return nsubworld;}
263  void set_printlevel(const long p) {printlevel=p;}
264 
265  /// create an empty taskq and initialize the subworlds
266  MacroTaskQ(World& universe, int nworld, const long printlevel=0)
268  , universe(universe)
269  , taskq()
271  , nsubworld(nworld)
272  , cloud(universe)
273  {
274 
276  this->process_pending();
277  }
278 
280 
281  /// for each process create a world using a communicator shared with other processes by round-robin
282  /// copy-paste from test_world.cc
283  static std::shared_ptr<World> create_worlds(World& universe, const std::size_t nsubworld) {
284 
285  int color = universe.rank() % nsubworld;
287 
288  std::shared_ptr<World> all_worlds;
289  all_worlds.reset(new World(comm));
290 
291  universe.gop.fence();
292  return all_worlds;
293  }
294 
295  /// run all tasks, tasks may store the results in the cloud
297 
298  for (const auto& t : vtask) if (universe.rank()==0) t->set_waiting();
299  for (size_t i=0; i<vtask.size(); ++i) add_replicated_task(vtask[i]);
300  if (printdebug()) print_taskq();
301  if (printtimings()) {
302  print("number of tasks in taskq",taskq.size());
303  print("redirecting output to files task.#####");
304  }
305 
306  double cpu0=cpu_time();
307  cloud.replicate();
308  universe.gop.fence();
309  double cpu1=cpu_time();
310  if (printtimings()) print("cloud replication wall time",cpu1-cpu0);
312  universe.gop.set_forbid_fence(true); // make sure there are no hidden universe fences
320 
321  double cpu00=cpu_time();
322 
323  World& subworld=get_subworld();
324 // if (printdebug()) print("I am subworld",subworld.id());
325  double tasktime=0.0;
326  if (printprogress() and universe.rank()==0) std::cout << "progress in percent: " << std::flush;
327  while (true) {
328  long element=get_scheduled_task_number(subworld);
329  double cpu0=cpu_time();
330  if (element<0) break;
331  std::shared_ptr<MacroTaskBase> task=taskq[element];
332  if (printdebug()) print("starting task no",element, "in subworld",subworld.id(),"at time",wall_time());
333 
334  task->run(subworld,cloud, taskq, element, printdebug());
335 
336  double cpu1=cpu_time();
337  set_complete(element);
338  tasktime+=(cpu1-cpu0);
339  if (printdebug()) printf("completed task %3ld after %6.1fs at time %6.1fs\n",element,cpu1-cpu0,wall_time());
340 
341  // print progress
342  const std::size_t ntask=taskq.size();
343  // return percentile of ntask for element
344  auto in_percentile = [&ntask](const long element) {
345  return std::floor(element/(0.1*(ntask+1)));
346  };
347  auto is_first_in_percentile = [&](const long element) {
348  return (in_percentile(element)!=in_percentile(element-1));
349  };
350  if (printprogress() and is_first_in_percentile(element)) {
351  std::cout << int(in_percentile(element)*10) << " " << std::flush;
352  }
353  }
355  universe.gop.fence();
356  universe.gop.sum(tasktime);
357  if (printprogress() and universe.rank()==0) std::cout << std::endl;
358  double cpu11=cpu_time();
360  if (printtimings()) {
361  printf("completed taskqueue after %4.1fs at time %4.1fs\n", cpu11 - cpu00, wall_time());
362  printf(" total cpu time / per world %4.1fs %4.1fs\n", tasktime, tasktime / universe.size());
363  }
364 
365  // cleanup task-persistent input data
366  for (auto& task : taskq) task->cleanup();
367  cloud.clear_cache(subworld);
368  cloud.clear();
369  subworld.gop.fence();
370  subworld.gop.fence();
371  universe.gop.fence();
378  universe.gop.fence();
379  }
380 
382  for (const auto& t : vtask) {
383  if (universe.rank()==0) t->set_waiting();
385  }
386  }
387 
388  void print_taskq() const {
389  universe.gop.fence();
390  if (universe.rank()==0) {
391  print("\ntaskq on universe rank",universe.rank());
392  print("total number of tasks: ",taskq.size());
393  print(" task batch priority status");
394  for (const auto& t : taskq) t->print_me_as_table();
395  }
396  universe.gop.fence();
397  }
398 
399 private:
400  void add_replicated_task(const std::shared_ptr<MacroTaskBase>& task) {
401  taskq.push_back(task);
402  }
403 
404  /// scheduler is located on universe.rank==0
406  long number=0;
407  if (subworld.rank()==0) {
409  number=r.get();
410  }
411  subworld.gop.broadcast_serializable(number, 0);
412  subworld.gop.fence();
413  return number;
414 
415  }
416 
419  std::lock_guard<std::mutex> lock(taskq_mutex);
420 
421  auto is_Waiting = [](const std::shared_ptr<MacroTaskBase>& mtb_ptr) {return mtb_ptr->is_waiting();};
422  auto it=std::find_if(taskq.begin(),taskq.end(),is_Waiting);
423  if (it!=taskq.end()) {
424  it->get()->set_running();
425  long element=it-taskq.begin();
426  return element;
427  }
428 // print("could not find task to schedule");
429  return -1;
430  }
431 
432  /// scheduler is located on rank==0
433  void set_complete(const long task_number) const {
434  this->task(ProcessID(0), &MacroTaskQ::set_complete_local, task_number);
435  }
436 
437  /// scheduler is located on rank==0
438  void set_complete_local(const long task_number) const {
440  taskq[task_number]->set_complete();
441  }
442 
443 public:
444  void static set_pmap(World& world) {
451  }
452 private:
453  std::size_t size() const {
454  return taskq.size();
455  }
456 
457 };
458 
459 
460 
461 
462 template<typename taskT>
463 class MacroTask {
465 
466  template<typename Q>
467  struct is_vector : std::false_type {
468  };
469  template<typename Q>
470  struct is_vector<std::vector<Q>> : std::true_type {
471  };
472 
473  typedef typename taskT::resultT resultT;
474  typedef typename taskT::argtupleT argtupleT;
476  taskT task;
477  bool debug=false;
478  std::string name="unknown_task";
479 
480  /// RAII class to redirect cout to a file
481  struct io_redirect {
482  std::streambuf* stream_buffer_cout;
483  std::ofstream ofile;
484  bool debug=false;
485 
486  io_redirect(const long task_number, std::string filename, bool debug=false) : debug(debug) {
487  constexpr std::size_t bufsize=256;
488  char cfilename[bufsize];
489  std::snprintf(cfilename,bufsize,"%s.%5.5ld",filename.c_str(),task_number);
490  ofile=std::ofstream(cfilename);
491  if (debug) std::cout << "redirecting to file " << cfilename << std::endl;
492  stream_buffer_cout = std::cout.rdbuf(ofile.rdbuf());
493  std::cout.sync_with_stdio(true);
494  }
495 
497  std::cout.rdbuf(stream_buffer_cout);
498  ofile.close();
499  std::cout.sync_with_stdio(true);
500  if (debug) std::cout << "redirecting back to cout" << std::endl;
501  }
502  };
503 
504 
505 public:
506 
507  /// constructor takes the actual task
508  MacroTask(World &world, taskT &task, std::shared_ptr<MacroTaskQ> taskq_ptr = 0)
510  if (taskq_ptr) {
511  // for the time being this condition must hold because tasks are
512  // constructed as replicated objects and are not broadcast to other processes
513  MADNESS_CHECK(world.id()==taskq_ptr->get_world().id());
514  }
515  }
516 
517  void set_debug(const bool value) {
518  debug=value;
519  }
520 
521  /// set a name for this task for debugging and output naming
522  void set_name(const std::string name1) {
523  name=name1;
524  }
525 
526  /// this mimicks the original call to the task functor, called from the universe
527 
528  /// store all input to the cloud, create output Function<T,NDIM> in the universe,
529  /// create the batched task and shove it into the taskq. Possibly execute the taskq.
530  template<typename ... Ts>
531  resultT operator()(const Ts &... args) {
532 
533  const bool immediate_execution = (not taskq_ptr);
534  if (not taskq_ptr) taskq_ptr.reset(new MacroTaskQ(world, world.size()));
535  if (debug) taskq_ptr->set_printlevel(20);
536 
537  auto argtuple = std::tie(args...);
538  static_assert(std::is_same<decltype(argtuple), argtupleT>::value, "type or number of arguments incorrect");
539 
540  // partition the argument vector into batches
541  auto partitioner=task.partitioner;
542  if (not partitioner) partitioner.reset(new MacroTaskPartitioner);
543  partitioner->set_nsubworld(world.size());
544  partitionT partition = partitioner->partition_tasks(argtuple);
545 
546  // store input and output: output being a pointer to a universe function (vector)
547  recordlistT inputrecords = taskq_ptr->cloud.store(world, argtuple);
548  resultT result = task.allocator(world, argtuple);
549  auto outputrecords =prepare_output_records(taskq_ptr->cloud, result);
550 
551  // create tasks and add them to the taskq
552  MacroTaskBase::taskqT vtask;
553  for (const auto& batch_prio : partition) {
554  vtask.push_back(
555  std::shared_ptr<MacroTaskBase>(new MacroTaskInternal(task, batch_prio, inputrecords, outputrecords, name)));
556  }
557  taskq_ptr->add_tasks(vtask);
558 
559  if (immediate_execution) taskq_ptr->run_all();
560 
561  // return std::move(result);
562  return result;
563  }
564 
565 private:
566 
568  std::shared_ptr<MacroTaskQ> taskq_ptr;
569 
570  /// store the result WorldObject in the cloud and return the recordlist
576  "unknown result type in prepare_output_records");
577  recordlistT outputrecords;
578  if constexpr (is_madness_function<resultT>::value) {
579  outputrecords += cloud.store(world, result.get_impl().get()); // store pointer to FunctionImpl
580  } else if constexpr (is_madness_function_vector<resultT>::value) {
581  outputrecords += cloud.store(world, get_impl(result));
582  } else if constexpr (is_scalar_result_ptr<resultT>::value) {
583  outputrecords += cloud.store(world, result); // store pointer to ScalarResult
585  outputrecords+=cloud.store(world,result);
586  } else {
587  MADNESS_EXCEPTION("\n\n unknown result type in prepare_input ", 1);
588  }
589  return outputrecords;
590  }
591 
592 
593  class MacroTaskInternal : public MacroTaskIntermediate<MacroTask> {
594 
595  typedef decay_tuple<typename taskT::argtupleT> argtupleT; // removes const, &, etc
596  typedef typename taskT::resultT resultT;
599  public:
600  taskT task;
601  std::string name="unknown_task"; // for identification in debug output
602 
603  MacroTaskInternal(const taskT &task, const std::pair<Batch,double> &batch_prio,
604  const recordlistT &inputrecords, const recordlistT &outputrecords, std::string name)
610  "unknown result type in MacroTaskInternal constructor");
611  this->task.batch=batch_prio.first;
612  this->priority=batch_prio.second;
613  }
614 
615 
616  virtual void print_me(std::string s="") const {
617  print("this is task",typeid(task).name(),"with batch", task.batch,"priority",this->get_priority());
618  }
619 
620  virtual void print_me_as_table(std::string s="") const {
621  std::stringstream ss;
622  std::string name=typeid(task).name();
623  std::size_t namesize=std::min(std::size_t(28),name.size());
624  name += std::string(28-namesize,' ');
625 
626  std::stringstream ssbatch;
627  ssbatch << task.batch;
628  std::string strbatch=ssbatch.str();
629  int nspaces=std::max(int(0),35-int(ssbatch.str().size()));
630  strbatch+=std::string(nspaces,' ');
631 
632  ss << name
633  << std::setw(10) << strbatch
635  print(ss.str());
636  }
637 
638  void run(World &subworld, Cloud &cloud, MacroTaskBase::taskqT &taskq, const long element, const bool debug) {
639 
640  io_redirect io(element,name+"_task",debug);
641  const argtupleT argtuple = cloud.load<argtupleT>(subworld, inputrecords);
642  const argtupleT batched_argtuple = task.batch.template copy_input_batch(argtuple);
643  try {
644  print("starting task no",element, "in subworld",subworld.id(),"at time",wall_time());
645  double cpu0=cpu_time();
646  resultT result_tmp = std::apply(task, batched_argtuple);
647  double cpu1=cpu_time();
648  std::size_t bufsize=256;
649  char buffer[bufsize];
650  std::snprintf(buffer,bufsize,"completed task %3ld after %6.1fs at time %6.1fs\n",element,cpu1-cpu0,wall_time());
651  print(std::string(buffer));
652 
653  resultT result = get_output(subworld, cloud, argtuple); // lives in the universe
654  if constexpr (is_madness_function<resultT>::value) {
655  result_tmp.compress();
656  gaxpy(1.0,result,1.0, result_tmp);
657  } else if constexpr(is_madness_function_vector<resultT>::value) {
658  compress(subworld, result_tmp);
659  resultT tmp1=task.allocator(subworld,argtuple);
660  tmp1=task.batch.template insert_result_batch(tmp1,result_tmp);
661  gaxpy(1.0,result,1.0,tmp1,false);
662  // was using operator+=, but this requires a fence, which is not allowed here..
663  // result += tmp1;
664  } else if constexpr (is_scalar_result_ptr<resultT>::value) {
665  gaxpy(1.0, *result, 1.0, result_tmp->get_local(), false);
666  } else if constexpr (is_scalar_result_ptr_vector<resultT>::value) {
667  resultT tmp1=task.allocator(subworld,argtuple);
668  tmp1=task.batch.template insert_result_batch(tmp1,result_tmp);
669 
670  std::size_t sz=result.size();
671  for (int i=0; i<sz; ++i) {
672  gaxpy(1.0, *(result[i]), 1.0, tmp1[i]->get_local(), false);
673  }
674  } else {
675  MADNESS_EXCEPTION("failing result",1);
676  }
677  } catch (std::exception& e) {
678  print("failing task no",element,"in subworld",subworld.id(),"at time",wall_time());
679  print(e.what());
680  print("\n\n");
681  MADNESS_EXCEPTION("failing task",1);
682  }
683 
684  };
685 
686  /// get the pointers to the output functions living in the universe
687 
688  /// the result of a task living in subworld will be accumulated into result
689  /// living in the universe
690  resultT get_output(World &subworld, Cloud &cloud, const argtupleT &argtuple) {
691  resultT result;
692  if constexpr (is_madness_function<resultT>::value) {
693  typedef std::shared_ptr<typename resultT::implT> impl_ptrT;
694  result.set_impl(cloud.load<impl_ptrT>(subworld, outputrecords));
695  } else if constexpr (is_madness_function_vector<resultT>::value) {
696  typedef std::shared_ptr<typename resultT::value_type::implT> impl_ptrT;
697  std::vector<impl_ptrT> rimpl = cloud.load<std::vector<impl_ptrT>>(subworld, outputrecords);
698  result.resize(rimpl.size());
699  set_impl(result, rimpl);
700  } else if constexpr (is_scalar_result_ptr<resultT>::value) {
701  result = cloud.load<resultT>(subworld, outputrecords);
702  } else if constexpr (is_scalar_result_ptr_vector<resultT>::value) {
703  typedef typename resultT::value_type::element_type ScalarResultT;
704  result=cloud.load<std::vector<std::shared_ptr<ScalarResultT>>>(subworld, outputrecords);
705 
706  } else {
707  MADNESS_EXCEPTION("unknown result type in get_output", 1);
708  }
709  return result;
710  }
711 
712  };
713 
714 };
715 
717 public:
719  std::string name="unknown_task";
720  std::shared_ptr<MacroTaskPartitioner> partitioner=0;
722 };
723 
724 
725 } /* namespace madness */
726 
727 #endif /* SRC_MADNESS_MRA_MACROTASKQ_H_ */
Wrapper around MPI_Comm. Has a shallow copy constructor; use Create(Get_group()) for deep copy.
Definition: safempi.h:490
Intracomm Split(int Color, int Key=0) const
Definition: safempi.h:635
a batch consists of a 2D-input batch and a 1D-output batch: K-batch <- (I-batch, J-batch)
Definition: macrotaskpartitioner.h:132
cloud class
Definition: cloud.h:147
void clear()
Definition: cloud.h:258
recordlistT store(madness::World &world, const T &source)
Definition: cloud.h:302
void replicate(const std::size_t chunk_size=INT_MAX)
Definition: cloud.h:325
void print_size(World &universe)
Definition: cloud.h:193
T load(madness::World &world, const recordlistT recordlist) const
Definition: cloud.h:274
void clear_cache(World &subworld)
Definition: cloud.h:252
void print_timings(World &universe) const
Definition: cloud.h:230
static std::shared_ptr< WorldDCPmapInterface< Key< NDIM > > > & get_pmap()
Returns the default process map.
Definition: funcdefaults.h:488
static void set_default_pmap(World &world)
Sets the default process map.
Definition: mraimpl.h:3550
static void set_pmap(const std::shared_ptr< WorldDCPmapInterface< Key< NDIM > > > &value)
Sets the default process map (does not redistribute existing functions)
Definition: funcdefaults.h:504
A future is a possibly yet unevaluated value.
Definition: future.h:373
T & get(bool dowork=true) &
Gets the value, waiting if necessary.
Definition: future.h:574
base class
Definition: macrotaskq.h:178
void set_running()
Definition: macrotaskq.h:190
virtual ~MacroTaskBase()
Definition: macrotaskq.h:184
void set_waiting()
Definition: macrotaskq.h:191
friend std::ostream & operator<<(std::ostream &os, const MacroTaskBase::Status s)
Definition: macrotaskq.h:215
MacroTaskBase()
Definition: macrotaskq.h:183
virtual void cleanup()=0
virtual void print_me(std::string s="") const
Definition: macrotaskq.h:200
std::string print_priority_and_status_to_string() const
Definition: macrotaskq.h:206
void set_complete()
Definition: macrotaskq.h:189
double priority
Definition: macrotaskq.h:186
bool is_complete() const
Definition: macrotaskq.h:193
Status
Definition: macrotaskq.h:187
@ Complete
Definition: macrotaskq.h:187
@ Running
Definition: macrotaskq.h:187
@ Unknown
Definition: macrotaskq.h:187
@ Waiting
Definition: macrotaskq.h:187
bool is_running() const
Definition: macrotaskq.h:194
enum madness::MacroTaskBase::Status stat
double get_priority() const
Definition: macrotaskq.h:212
void set_priority(const double p)
Definition: macrotaskq.h:213
virtual void print_me_as_table(std::string s="") const
Definition: macrotaskq.h:203
std::vector< std::shared_ptr< MacroTaskBase > > taskqT
Definition: macrotaskq.h:181
virtual void run(World &world, Cloud &cloud, taskqT &taskq, const long element, const bool debug)=0
bool is_waiting() const
Definition: macrotaskq.h:195
Definition: macrotaskq.h:226
MacroTaskIntermediate()
Definition: macrotaskq.h:230
void cleanup()
Definition: macrotaskq.h:234
~MacroTaskIntermediate()
Definition: macrotaskq.h:232
Definition: macrotaskq.h:716
MacroTaskOperationBase()
Definition: macrotaskq.h:721
Batch batch
Definition: macrotaskq.h:718
std::shared_ptr< MacroTaskPartitioner > partitioner
Definition: macrotaskq.h:720
std::string name
Definition: macrotaskq.h:719
partition one (two) vectors into 1D (2D) batches.
Definition: macrotaskpartitioner.h:190
std::list< std::pair< Batch, double > > partitionT
Definition: macrotaskpartitioner.h:194
Definition: macrotaskq.h:239
long nsubworld
Definition: macrotaskq.h:246
std::mutex taskq_mutex
Definition: macrotaskq.h:244
bool printdebug() const
Definition: macrotaskq.h:254
static void set_pmap(World &world)
Definition: macrotaskq.h:444
void set_complete(const long task_number) const
scheduler is located on rank==0
Definition: macrotaskq.h:433
World & universe
Definition: macrotaskq.h:241
std::shared_ptr< WorldDCPmapInterface< Key< 1 > > > pmap1
Definition: macrotaskq.h:247
std::shared_ptr< WorldDCPmapInterface< Key< 3 > > > pmap3
Definition: macrotaskq.h:249
MacroTaskBase::taskqT taskq
Definition: macrotaskq.h:243
void run_all(MacroTaskBase::taskqT vtask=MacroTaskBase::taskqT())
run all tasks, tasks may store the results in the cloud
Definition: macrotaskq.h:296
void add_tasks(MacroTaskBase::taskqT &vtask)
Definition: macrotaskq.h:381
long get_scheduled_task_number(World &subworld)
scheduler is located on universe.rank==0
Definition: macrotaskq.h:405
void set_complete_local(const long task_number) const
scheduler is located on rank==0
Definition: macrotaskq.h:438
static std::shared_ptr< World > create_worlds(World &universe, const std::size_t nsubworld)
Definition: macrotaskq.h:283
MacroTaskQ(World &universe, int nworld, const long printlevel=0)
create an empty taskq and initialize the subworlds
Definition: macrotaskq.h:266
std::shared_ptr< WorldDCPmapInterface< Key< 4 > > > pmap4
Definition: macrotaskq.h:250
std::size_t size() const
Definition: macrotaskq.h:453
std::shared_ptr< WorldDCPmapInterface< Key< 5 > > > pmap5
Definition: macrotaskq.h:251
void print_taskq() const
Definition: macrotaskq.h:388
std::shared_ptr< World > subworld_ptr
Definition: macrotaskq.h:242
bool printtimings() const
Definition: macrotaskq.h:256
std::shared_ptr< WorldDCPmapInterface< Key< 2 > > > pmap2
Definition: macrotaskq.h:248
bool printprogress() const
Definition: macrotaskq.h:255
World & get_subworld()
Definition: macrotaskq.h:261
std::shared_ptr< WorldDCPmapInterface< Key< 6 > > > pmap6
Definition: macrotaskq.h:252
void set_printlevel(const long p)
Definition: macrotaskq.h:263
long printlevel
Definition: macrotaskq.h:245
madness::Cloud cloud
Definition: macrotaskq.h:260
long get_nsubworld() const
Definition: macrotaskq.h:262
void add_replicated_task(const std::shared_ptr< MacroTaskBase > &task)
Definition: macrotaskq.h:400
~MacroTaskQ()
Definition: macrotaskq.h:279
long get_scheduled_task_number_local()
Definition: macrotaskq.h:417
Definition: macrotaskq.h:593
taskT task
Definition: macrotaskq.h:600
decay_tuple< typename taskT::argtupleT > argtupleT
Definition: macrotaskq.h:595
virtual void print_me_as_table(std::string s="") const
Definition: macrotaskq.h:620
std::string name
Definition: macrotaskq.h:601
taskT::resultT resultT
Definition: macrotaskq.h:596
void run(World &subworld, Cloud &cloud, MacroTaskBase::taskqT &taskq, const long element, const bool debug)
Definition: macrotaskq.h:638
recordlistT outputrecords
Definition: macrotaskq.h:598
resultT get_output(World &subworld, Cloud &cloud, const argtupleT &argtuple)
get the pointers to the output functions living in the universe
Definition: macrotaskq.h:690
virtual void print_me(std::string s="") const
Definition: macrotaskq.h:616
MacroTaskInternal(const taskT &task, const std::pair< Batch, double > &batch_prio, const recordlistT &inputrecords, const recordlistT &outputrecords, std::string name)
Definition: macrotaskq.h:603
recordlistT inputrecords
Definition: macrotaskq.h:597
Definition: macrotaskq.h:463
void set_name(const std::string name1)
set a name for this task for debugging and output naming
Definition: macrotaskq.h:522
taskT::resultT resultT
Definition: macrotaskq.h:473
std::shared_ptr< MacroTaskQ > taskq_ptr
Definition: macrotaskq.h:568
taskT::argtupleT argtupleT
Definition: macrotaskq.h:474
MacroTask(World &world, taskT &task, std::shared_ptr< MacroTaskQ > taskq_ptr=0)
constructor takes the actual task
Definition: macrotaskq.h:508
World & world
Definition: macrotaskq.h:567
resultT operator()(const Ts &... args)
this mimics the original call to the task functor, called from the universe
Definition: macrotaskq.h:531
bool debug
Definition: macrotaskq.h:477
taskT task
Definition: macrotaskq.h:476
MacroTaskPartitioner::partitionT partitionT
Definition: macrotaskq.h:464
std::string name
Definition: macrotaskq.h:478
Cloud::recordlistT recordlistT
Definition: macrotaskq.h:475
void set_debug(const bool value)
Definition: macrotaskq.h:517
recordlistT prepare_output_records(Cloud &cloud, resultT &result)
store the result WorldObject in the cloud and return the recordlist
Definition: macrotaskq.h:571
helper class for returning the result of a task, which is not a madness Function, but a simple scalar
Definition: macrotaskq.h:43
void serialize(Archive &ar)
Definition: macrotaskq.h:83
void gaxpy(const double a, const T &right, double b, const bool fence=true)
accumulate, optional fence
Definition: macrotaskq.h:75
T get()
after completion of the taskq get the final value
Definition: macrotaskq.h:88
ScalarResult(World &world)
Definition: macrotaskq.h:46
T value
the scalar value
Definition: macrotaskq.h:101
~ScalarResult()
Definition: macrotaskq.h:58
ScalarResult< T > & operator=(ScalarResult< T > &&)=default
ScalarResult< T > & operator+=(const T &x)
Definition: macrotaskq.h:69
T get_local() const
Definition: macrotaskq.h:95
T value_type
Definition: macrotaskq.h:45
ScalarResult< T > & operator=(const ScalarResult< T > &other)=delete
disable assignment operator
ScalarResult< T > & operator=(const T &x)
simple assignment of the scalar value
Definition: macrotaskq.h:64
void broadcast_serializable(objT &obj, ProcessID root)
Broadcast a serializable object.
Definition: worldgop.h:754
void fence(bool debug=false)
Synchronizes all processes in communicator AND globally ensures no pending AM or tasks.
Definition: worldgop.cc:161
bool set_forbid_fence(bool value)
Set forbid_fence flag to new value and return old value.
Definition: worldgop.h:674
void sum(T *buf, size_t nelem)
Inplace global sum while still processing AM & tasks.
Definition: worldgop.h:870
SafeMPI::Intracomm & comm()
Returns the associated SafeMPI communicator.
Definition: worldmpi.h:286
Implements most parts of a globally addressable object (via unique ID).
Definition: world_object.h:364
detail::task_result_type< memfnT >::futureT send(ProcessID dest, memfnT memfn) const
Definition: world_object.h:731
World & world
The World this object belongs to. (Think globally, act locally).
Definition: world_object.h:381
void process_pending()
To be called from derived constructor to process pending messages.
Definition: world_object.h:656
detail::task_result_type< memfnT >::futureT task(ProcessID dest, memfnT memfn, const TaskAttributes &attr=TaskAttributes()) const
Sends task to derived class method returnT (this->*memfn)().
Definition: world_object.h:1005
World & get_world() const
Returns a reference to the world.
Definition: world_object.h:717
A parallel world class.
Definition: world.h:132
static World * world_from_id(std::uint64_t id)
Convert a World ID to a World pointer.
Definition: world.h:474
ProcessID rank() const
Returns the process rank in this World (same as MPI_Comm_rank()).
Definition: world.h:318
WorldMpiInterface & mpi
MPI interface.
Definition: world.h:202
std::optional< T * > ptr_from_id(uniqueidT id) const
Look up a local pointer from a world-wide unique ID.
Definition: world.h:414
ProcessID size() const
Returns the number of processes in this World (same as MPI_Comm_size()).
Definition: world.h:328
unsigned long id() const
Definition: world.h:313
WorldGopInterface & gop
Global operations.
Definition: world.h:205
Class for unique global IDs.
Definition: uniqueid.h:53
Declares the Cloud class for storing data and transfering them between worlds.
const std::size_t bufsize
Definition: derivatives.cc:16
char * p(char *buf, const char *name, int k, int initial_level, double thresh, int order)
Definition: derivatives.cc:72
static bool debug
Definition: dirac-hatom.cc:16
Fcwf apply(World &world, real_convolution_3d &op, const Fcwf &psi)
Definition: fcwf.cc:281
auto T(World &world, response_space &f) -> response_space
Definition: global_functions.cc:34
static const double v
Definition: hatom_sf_dirac.cc:20
#define max(a, b)
Definition: lda.h:51
#define MADNESS_CHECK(condition)
Check a condition — even in a release build the condition is always evaluated so it can have side eff...
Definition: madness_exception.h:190
#define MADNESS_EXCEPTION(msg, value)
Macro for throwing a MADNESS exception.
Definition: madness_exception.h:119
#define MADNESS_ASSERT(condition)
Assert a condition that should be free of side-effects since in release builds this might be a no-op.
Definition: madness_exception.h:134
File holds all helper structures necessary for the CC_Operator and CC2 class.
Definition: DFParameters.h:10
static const char * filename
Definition: legendre.cc:96
static double cpu_time()
Returns the cpu time in seconds relative to an arbitrary origin.
Definition: timers.h:127
void set_impl(std::vector< Function< T, NDIM >> &v, const std::vector< std::shared_ptr< FunctionImpl< T, NDIM >>> vimpl)
Definition: vmra.h:670
void compress(World &world, const std::vector< Function< T, NDIM > > &v, bool fence=true)
Compress a vector of functions.
Definition: vmra.h:133
std::vector< std::shared_ptr< FunctionImpl< T, NDIM > > > get_impl(const std::vector< Function< T, NDIM >> &v)
Definition: vmra.h:663
static const Slice _(0,-1, 1)
void print(const T &t, const Ts &... ts)
Print items to std::cout (items separated by spaces) and terminate with a new line.
Definition: print.h:225
decltype(decay_types(std::declval< T >())) decay_tuple
Definition: macrotaskpartitioner.h:22
double wall_time()
Returns the wall time in seconds relative to an arbitrary origin.
Definition: timers.cc:48
std::vector< std::shared_ptr< ScalarResult< T > > > scalar_result_shared_ptr_vector(World &world, std::size_t n)
helper function to create a vector of ScalarResult, circumventing problems with the constructors
Definition: macrotaskq.h:106
void gaxpy(const double a, ScalarResult< T > &left, const double b, const T &right, const bool fence=true)
the result type of a macrotask must implement gaxpy
Definition: macrotaskq.h:140
Definition: mraimpl.h:50
static const double b
Definition: nonlinschro.cc:119
static const double a
Definition: nonlinschro.cc:118
RAII class to redirect cout to a file.
Definition: macrotaskq.h:481
io_redirect(const long task_number, std::string filename, bool debug=false)
Definition: macrotaskq.h:486
std::streambuf * stream_buffer_cout
Definition: macrotaskq.h:482
~io_redirect()
Definition: macrotaskq.h:496
bool debug
Definition: macrotaskq.h:484
std::ofstream ofile
Definition: macrotaskq.h:483
Definition: macrotaskq.h:467
Definition: cloud.h:34
Default load of an object via serialize(ar, t).
Definition: archive.h:666
Default store of an object via serialize(ar, t).
Definition: archive.h:611
Definition: macrotaskpartitioner.h:25
Definition: mra.h:2761
Definition: macrotaskq.h:129
Definition: macrotaskq.h:123
static void load(const Archive &ar, std::shared_ptr< ScalarResult< T >> &ptr)
Definition: macrotaskq.h:156
static void store(const Archive &ar, const std::shared_ptr< ScalarResult< T >> &ptr)
Definition: macrotaskq.h:146
void e()
Definition: test_sig.cc:75
Declares the World class for the parallel runtime environment.
int ProcessID
Used to clearly identify process number/rank.
Definition: worldtypes.h:43