29 #ifndef SRC_MADNESS_MRA_MACROTASKQ_H_
30 #define SRC_MADNESS_MRA_MACROTASKQ_H_
42 template<
typename T=
double>
70 gaxpy(1.0, x, 1.0,
true);
75 void gaxpy(
const double a,
const T& right,
double b,
const bool fence=
true) {
82 template<
typename Archive>
107 auto v=std::vector<std::shared_ptr<ScalarResult<T>>>();
108 for (std::size_t i=0; i<n; ++i)
v.emplace_back(std::make_shared<
ScalarResult<T>>(world));
110 std::cout << std::flush;
113 MADNESS_EXCEPTION(
"ScalarResult: remote operation attempting to use a locally uninitialized object",0);
116 MADNESS_EXCEPTION(
"ScalarResult<T> operation attempting to use an unregistered object",0);
125 template <
typename T>
141 left.
gaxpy(
a, right,
b, fence);
144 template <
class Archive,
typename T>
147 bool exists=(ptr) ?
true :
false;
149 if (exists) ar & ptr->id();
154 template <
class Archive,
typename T>
166 MADNESS_EXCEPTION(
"ScalarResult: remote operation attempting to use a locally uninitialized object",0);
169 MADNESS_EXCEPTION(
"ScalarResult<T> operation attempting to use an unregistered object",0);
181 typedef std::vector<std::shared_ptr<MacroTaskBase> >
taskqT;
201 printf(
"this is task with priority %4.1f\n",
priority);
204 print(
"nothing to print");
207 std::stringstream ss;
216 if (s==MacroTaskBase::Status::Running) os <<
"Running";
217 if (s==MacroTaskBase::Status::Waiting) os <<
"Waiting";
218 if (s==MacroTaskBase::Status::Complete) os <<
"Complete";
219 if (s==MacroTaskBase::Status::Unknown) os <<
"Unknown";
225 template<
typename macrotaskT>
247 std::shared_ptr< WorldDCPmapInterface< Key<1> > >
pmap1;
248 std::shared_ptr< WorldDCPmapInterface< Key<2> > >
pmap2;
249 std::shared_ptr< WorldDCPmapInterface< Key<3> > >
pmap3;
250 std::shared_ptr< WorldDCPmapInterface< Key<4> > >
pmap4;
251 std::shared_ptr< WorldDCPmapInterface< Key<5> > >
pmap5;
252 std::shared_ptr< WorldDCPmapInterface< Key<6> > >
pmap6;
288 std::shared_ptr<World> all_worlds;
289 all_worlds.reset(
new World(comm));
298 for (
const auto& t : vtask)
if (
universe.
rank()==0) t->set_waiting();
302 print(
"number of tasks in taskq",
taskq.size());
303 print(
"redirecting output to files task.#####");
330 if (element<0)
break;
331 std::shared_ptr<MacroTaskBase>
task=
taskq[element];
338 tasktime+=(cpu1-cpu0);
339 if (
printdebug()) printf(
"completed task %3ld after %6.1fs at time %6.1fs\n",element,cpu1-cpu0,
wall_time());
342 const std::size_t ntask=
taskq.size();
344 auto in_percentile = [&ntask](
const long element) {
345 return std::floor(element/(0.1*(ntask+1)));
347 auto is_first_in_percentile = [&](
const long element) {
348 return (in_percentile(element)!=in_percentile(element-1));
351 std::cout << int(in_percentile(element)*10) <<
" " << std::flush;
361 printf(
"completed taskqueue after %4.1fs at time %4.1fs\n", cpu11 - cpu00,
wall_time());
362 printf(
" total cpu time / per world %4.1fs %4.1fs\n", tasktime, tasktime /
universe.
size());
382 for (
const auto& t : vtask) {
392 print(
"total number of tasks: ",
taskq.size());
393 print(
" task batch priority status");
394 for (
const auto& t :
taskq) t->print_me_as_table();
407 if (subworld.
rank()==0) {
421 auto is_Waiting = [](
const std::shared_ptr<MacroTaskBase>& mtb_ptr) {
return mtb_ptr->is_waiting();};
422 auto it=std::find_if(
taskq.begin(),
taskq.end(),is_Waiting);
423 if (it!=
taskq.end()) {
424 it->get()->set_running();
425 long element=it-
taskq.begin();
440 taskq[task_number]->set_complete();
462 template<
typename taskT>
478 std::string
name=
"unknown_task";
487 constexpr std::size_t
bufsize=256;
489 std::snprintf(cfilename,
bufsize,
"%s.%5.5ld",
filename.c_str(),task_number);
490 ofile=std::ofstream(cfilename);
491 if (
debug) std::cout <<
"redirecting to file " << cfilename << std::endl;
493 std::cout.sync_with_stdio(
true);
499 std::cout.sync_with_stdio(
true);
500 if (
debug) std::cout <<
"redirecting back to cout" << std::endl;
530 template<
typename ... Ts>
533 const bool immediate_execution = (not
taskq_ptr);
537 auto argtuple = std::tie(args...);
538 static_assert(std::is_same<decltype(argtuple),
argtupleT>::value,
"type or number of arguments incorrect");
541 auto partitioner=
task.partitioner;
543 partitioner->set_nsubworld(
world.
size());
544 partitionT partition = partitioner->partition_tasks(argtuple);
553 for (
const auto& batch_prio : partition) {
559 if (immediate_execution)
taskq_ptr->run_all();
576 "unknown result type in prepare_output_records");
579 outputrecords += cloud.
store(
world, result.get_impl().get());
589 return outputrecords;
601 std::string
name=
"unknown_task";
610 "unknown result type in MacroTaskInternal constructor");
611 this->task.batch=batch_prio.first;
617 print(
"this is task",
typeid(
task).
name(),
"with batch",
task.batch,
"priority",this->get_priority());
621 std::stringstream ss;
623 std::size_t namesize=std::min(std::size_t(28),
name.size());
624 name += std::string(28-namesize,
' ');
626 std::stringstream ssbatch;
627 ssbatch <<
task.batch;
628 std::string strbatch=ssbatch.str();
629 int nspaces=
std::max(
int(0),35-
int(ssbatch.str().size()));
630 strbatch+=std::string(nspaces,
' ');
633 << std::setw(10) << strbatch
642 const argtupleT batched_argtuple =
task.batch.template copy_input_batch(argtuple);
644 print(
"starting task no",element,
"in subworld",subworld.
id(),
"at time",
wall_time());
650 std::snprintf(buffer,
bufsize,
"completed task %3ld after %6.1fs at time %6.1fs\n",element,cpu1-cpu0,
wall_time());
651 print(std::string(buffer));
655 result_tmp.compress();
656 gaxpy(1.0,result,1.0, result_tmp);
660 tmp1=
task.batch.template insert_result_batch(tmp1,result_tmp);
661 gaxpy(1.0,result,1.0,tmp1,
false);
665 gaxpy(1.0, *result, 1.0, result_tmp->get_local(),
false);
668 tmp1=
task.batch.template insert_result_batch(tmp1,result_tmp);
670 std::size_t sz=result.size();
671 for (
int i=0; i<sz; ++i) {
672 gaxpy(1.0, *(result[i]), 1.0, tmp1[i]->get_local(),
false);
677 }
catch (std::exception&
e) {
678 print(
"failing task no",element,
"in subworld",subworld.
id(),
"at time",
wall_time());
693 typedef std::shared_ptr<typename resultT::implT> impl_ptrT;
696 typedef std::shared_ptr<typename resultT::value_type::implT> impl_ptrT;
697 std::vector<impl_ptrT> rimpl = cloud.
load<std::vector<impl_ptrT>>(subworld,
outputrecords);
698 result.resize(rimpl.size());
703 typedef typename resultT::value_type::element_type ScalarResultT;
704 result=cloud.
load<std::vector<std::shared_ptr<ScalarResultT>>>(subworld,
outputrecords);
719 std::string
name=
"unknown_task";
Wrapper around MPI_Comm. Has a shallow copy constructor; use Create(Get_group()) for deep copy.
Definition: safempi.h:490
Intracomm Split(int Color, int Key=0) const
Definition: safempi.h:635
a batch consists of a 2D-input batch and a 1D-output batch: K-batch <- (I-batch, J-batch)
Definition: macrotaskpartitioner.h:132
cloud class
Definition: cloud.h:147
void clear()
Definition: cloud.h:258
recordlistT store(madness::World &world, const T &source)
Definition: cloud.h:302
void replicate(const std::size_t chunk_size=INT_MAX)
Definition: cloud.h:325
void print_size(World &universe)
Definition: cloud.h:193
T load(madness::World &world, const recordlistT recordlist) const
Definition: cloud.h:274
void clear_cache(World &subworld)
Definition: cloud.h:252
void print_timings(World &universe) const
Definition: cloud.h:230
static std::shared_ptr< WorldDCPmapInterface< Key< NDIM > > > & get_pmap()
Returns the default process map.
Definition: funcdefaults.h:488
static void set_default_pmap(World &world)
Sets the default process map.
Definition: mraimpl.h:3550
static void set_pmap(const std::shared_ptr< WorldDCPmapInterface< Key< NDIM > > > &value)
Sets the default process map (does not redistribute existing functions)
Definition: funcdefaults.h:504
A future is a possibly yet unevaluated value.
Definition: future.h:373
T & get(bool dowork=true) &
Gets the value, waiting if necessary.
Definition: future.h:574
base class
Definition: macrotaskq.h:178
void set_running()
Definition: macrotaskq.h:190
virtual ~MacroTaskBase()
Definition: macrotaskq.h:184
void set_waiting()
Definition: macrotaskq.h:191
friend std::ostream & operator<<(std::ostream &os, const MacroTaskBase::Status s)
Definition: macrotaskq.h:215
MacroTaskBase()
Definition: macrotaskq.h:183
virtual void print_me(std::string s="") const
Definition: macrotaskq.h:200
std::string print_priority_and_status_to_string() const
Definition: macrotaskq.h:206
void set_complete()
Definition: macrotaskq.h:189
double priority
Definition: macrotaskq.h:186
bool is_complete() const
Definition: macrotaskq.h:193
Status
Definition: macrotaskq.h:187
@ Complete
Definition: macrotaskq.h:187
@ Running
Definition: macrotaskq.h:187
@ Unknown
Definition: macrotaskq.h:187
@ Waiting
Definition: macrotaskq.h:187
bool is_running() const
Definition: macrotaskq.h:194
enum madness::MacroTaskBase::Status stat
double get_priority() const
Definition: macrotaskq.h:212
void set_priority(const double p)
Definition: macrotaskq.h:213
virtual void print_me_as_table(std::string s="") const
Definition: macrotaskq.h:203
std::vector< std::shared_ptr< MacroTaskBase > > taskqT
Definition: macrotaskq.h:181
virtual void run(World &world, Cloud &cloud, taskqT &taskq, const long element, const bool debug)=0
bool is_waiting() const
Definition: macrotaskq.h:195
Definition: macrotaskq.h:716
MacroTaskOperationBase()
Definition: macrotaskq.h:721
Batch batch
Definition: macrotaskq.h:718
std::shared_ptr< MacroTaskPartitioner > partitioner
Definition: macrotaskq.h:720
std::string name
Definition: macrotaskq.h:719
partition one (two) vectors into 1D (2D) batches.
Definition: macrotaskpartitioner.h:190
std::list< std::pair< Batch, double > > partitionT
Definition: macrotaskpartitioner.h:194
Definition: macrotaskq.h:239
long nsubworld
Definition: macrotaskq.h:246
std::mutex taskq_mutex
Definition: macrotaskq.h:244
bool printdebug() const
Definition: macrotaskq.h:254
static void set_pmap(World &world)
Definition: macrotaskq.h:444
void set_complete(const long task_number) const
scheduler is located on rank==0
Definition: macrotaskq.h:433
World & universe
Definition: macrotaskq.h:241
std::shared_ptr< WorldDCPmapInterface< Key< 1 > > > pmap1
Definition: macrotaskq.h:247
std::shared_ptr< WorldDCPmapInterface< Key< 3 > > > pmap3
Definition: macrotaskq.h:249
MacroTaskBase::taskqT taskq
Definition: macrotaskq.h:243
void run_all(MacroTaskBase::taskqT vtask=MacroTaskBase::taskqT())
run all tasks, tasks may store the results in the cloud
Definition: macrotaskq.h:296
void add_tasks(MacroTaskBase::taskqT &vtask)
Definition: macrotaskq.h:381
long get_scheduled_task_number(World &subworld)
scheduler is located on universe.rank==0
Definition: macrotaskq.h:405
void set_complete_local(const long task_number) const
scheduler is located on rank==0
Definition: macrotaskq.h:438
static std::shared_ptr< World > create_worlds(World &universe, const std::size_t nsubworld)
Definition: macrotaskq.h:283
MacroTaskQ(World &universe, int nworld, const long printlevel=0)
create an empty taskq and initialize the subworlds
Definition: macrotaskq.h:266
std::shared_ptr< WorldDCPmapInterface< Key< 4 > > > pmap4
Definition: macrotaskq.h:250
std::size_t size() const
Definition: macrotaskq.h:453
std::shared_ptr< WorldDCPmapInterface< Key< 5 > > > pmap5
Definition: macrotaskq.h:251
void print_taskq() const
Definition: macrotaskq.h:388
std::shared_ptr< World > subworld_ptr
Definition: macrotaskq.h:242
bool printtimings() const
Definition: macrotaskq.h:256
std::shared_ptr< WorldDCPmapInterface< Key< 2 > > > pmap2
Definition: macrotaskq.h:248
bool printprogress() const
Definition: macrotaskq.h:255
World & get_subworld()
Definition: macrotaskq.h:261
std::shared_ptr< WorldDCPmapInterface< Key< 6 > > > pmap6
Definition: macrotaskq.h:252
void set_printlevel(const long p)
Definition: macrotaskq.h:263
long printlevel
Definition: macrotaskq.h:245
madness::Cloud cloud
Definition: macrotaskq.h:260
long get_nsubworld() const
Definition: macrotaskq.h:262
void add_replicated_task(const std::shared_ptr< MacroTaskBase > &task)
Definition: macrotaskq.h:400
~MacroTaskQ()
Definition: macrotaskq.h:279
long get_scheduled_task_number_local()
Definition: macrotaskq.h:417
Definition: macrotaskq.h:593
taskT task
Definition: macrotaskq.h:600
decay_tuple< typename taskT::argtupleT > argtupleT
Definition: macrotaskq.h:595
virtual void print_me_as_table(std::string s="") const
Definition: macrotaskq.h:620
std::string name
Definition: macrotaskq.h:601
taskT::resultT resultT
Definition: macrotaskq.h:596
void run(World &subworld, Cloud &cloud, MacroTaskBase::taskqT &taskq, const long element, const bool debug)
Definition: macrotaskq.h:638
recordlistT outputrecords
Definition: macrotaskq.h:598
resultT get_output(World &subworld, Cloud &cloud, const argtupleT &argtuple)
get the pointers to the output functions living in the universe
Definition: macrotaskq.h:690
virtual void print_me(std::string s="") const
Definition: macrotaskq.h:616
MacroTaskInternal(const taskT &task, const std::pair< Batch, double > &batch_prio, const recordlistT &inputrecords, const recordlistT &outputrecords, std::string name)
Definition: macrotaskq.h:603
recordlistT inputrecords
Definition: macrotaskq.h:597
Definition: macrotaskq.h:463
void set_name(const std::string name1)
set a name for this task for debugging and output naming
Definition: macrotaskq.h:522
taskT::resultT resultT
Definition: macrotaskq.h:473
std::shared_ptr< MacroTaskQ > taskq_ptr
Definition: macrotaskq.h:568
taskT::argtupleT argtupleT
Definition: macrotaskq.h:474
MacroTask(World &world, taskT &task, std::shared_ptr< MacroTaskQ > taskq_ptr=0)
constructor takes the actual task
Definition: macrotaskq.h:508
World & world
Definition: macrotaskq.h:567
resultT operator()(const Ts &... args)
this mimics the original call to the task functor, called from the universe
Definition: macrotaskq.h:531
bool debug
Definition: macrotaskq.h:477
taskT task
Definition: macrotaskq.h:476
MacroTaskPartitioner::partitionT partitionT
Definition: macrotaskq.h:464
std::string name
Definition: macrotaskq.h:478
Cloud::recordlistT recordlistT
Definition: macrotaskq.h:475
void set_debug(const bool value)
Definition: macrotaskq.h:517
recordlistT prepare_output_records(Cloud &cloud, resultT &result)
store the result WorldObject in the cloud and return the recordlist
Definition: macrotaskq.h:571
helper class for returning the result of a task, which is not a madness Function, but a simple scalar
Definition: macrotaskq.h:43
void serialize(Archive &ar)
Definition: macrotaskq.h:83
void gaxpy(const double a, const T &right, double b, const bool fence=true)
accumulate, optional fence
Definition: macrotaskq.h:75
T get()
after completion of the taskq get the final value
Definition: macrotaskq.h:88
ScalarResult(World &world)
Definition: macrotaskq.h:46
T value
the scalar value
Definition: macrotaskq.h:101
~ScalarResult()
Definition: macrotaskq.h:58
ScalarResult< T > & operator=(ScalarResult< T > &&)=default
ScalarResult< T > & operator+=(const T &x)
Definition: macrotaskq.h:69
T get_local() const
Definition: macrotaskq.h:95
T value_type
Definition: macrotaskq.h:45
ScalarResult< T > & operator=(const ScalarResult< T > &other)=delete
disable assignment operator
ScalarResult< T > & operator=(const T &x)
simple assignment of the scalar value
Definition: macrotaskq.h:64
void broadcast_serializable(objT &obj, ProcessID root)
Broadcast a serializable object.
Definition: worldgop.h:754
void fence(bool debug=false)
Synchronizes all processes in communicator AND globally ensures no pending AM or tasks.
Definition: worldgop.cc:161
bool set_forbid_fence(bool value)
Set forbid_fence flag to new value and return old value.
Definition: worldgop.h:674
void sum(T *buf, size_t nelem)
Inplace global sum while still processing AM & tasks.
Definition: worldgop.h:870
SafeMPI::Intracomm & comm()
Returns the associated SafeMPI communicator.
Definition: worldmpi.h:286
Implements most parts of a globally addressable object (via unique ID).
Definition: world_object.h:364
detail::task_result_type< memfnT >::futureT send(ProcessID dest, memfnT memfn) const
Definition: world_object.h:731
World & world
The World this object belongs to. (Think globally, act locally).
Definition: world_object.h:381
void process_pending()
To be called from derived constructor to process pending messages.
Definition: world_object.h:656
detail::task_result_type< memfnT >::futureT task(ProcessID dest, memfnT memfn, const TaskAttributes &attr=TaskAttributes()) const
Sends task to derived class method returnT (this->*memfn)().
Definition: world_object.h:1005
World & get_world() const
Returns a reference to the world.
Definition: world_object.h:717
A parallel world class.
Definition: world.h:132
static World * world_from_id(std::uint64_t id)
Convert a World ID to a World pointer.
Definition: world.h:474
ProcessID rank() const
Returns the process rank in this World (same as MPI_Comm_rank()).
Definition: world.h:318
WorldMpiInterface & mpi
MPI interface.
Definition: world.h:202
std::optional< T * > ptr_from_id(uniqueidT id) const
Look up a local pointer from a world-wide unique ID.
Definition: world.h:414
ProcessID size() const
Returns the number of processes in this World (same as MPI_Comm_size()).
Definition: world.h:328
unsigned long id() const
Definition: world.h:313
WorldGopInterface & gop
Global operations.
Definition: world.h:205
Class for unique global IDs.
Definition: uniqueid.h:53
Declares the Cloud class for storing data and transferring them between worlds.
const std::size_t bufsize
Definition: derivatives.cc:16
char * p(char *buf, const char *name, int k, int initial_level, double thresh, int order)
Definition: derivatives.cc:72
static bool debug
Definition: dirac-hatom.cc:16
Fcwf apply(World &world, real_convolution_3d &op, const Fcwf &psi)
Definition: fcwf.cc:281
auto T(World &world, response_space &f) -> response_space
Definition: global_functions.cc:34
static const double v
Definition: hatom_sf_dirac.cc:20
#define max(a, b)
Definition: lda.h:51
#define MADNESS_CHECK(condition)
Check a condition — even in a release build the condition is always evaluated so it can have side effects.
Definition: madness_exception.h:190
#define MADNESS_EXCEPTION(msg, value)
Macro for throwing a MADNESS exception.
Definition: madness_exception.h:119
#define MADNESS_ASSERT(condition)
Assert a condition that should be free of side-effects since in release builds this might be a no-op.
Definition: madness_exception.h:134
File holds all helper structures necessary for the CC_Operator and CC2 class.
Definition: DFParameters.h:10
static const char * filename
Definition: legendre.cc:96
static double cpu_time()
Returns the cpu time in seconds relative to an arbitrary origin.
Definition: timers.h:127
void set_impl(std::vector< Function< T, NDIM >> &v, const std::vector< std::shared_ptr< FunctionImpl< T, NDIM >>> vimpl)
Definition: vmra.h:670
void compress(World &world, const std::vector< Function< T, NDIM > > &v, bool fence=true)
Compress a vector of functions.
Definition: vmra.h:133
std::vector< std::shared_ptr< FunctionImpl< T, NDIM > > > get_impl(const std::vector< Function< T, NDIM >> &v)
Definition: vmra.h:663
static const Slice _(0,-1, 1)
void print(const T &t, const Ts &... ts)
Print items to std::cout (items separated by spaces) and terminate with a new line.
Definition: print.h:225
decltype(decay_types(std::declval< T >())) decay_tuple
Definition: macrotaskpartitioner.h:22
double wall_time()
Returns the wall time in seconds relative to an arbitrary origin.
Definition: timers.cc:48
std::vector< std::shared_ptr< ScalarResult< T > > > scalar_result_shared_ptr_vector(World &world, std::size_t n)
helper function to create a vector of ScalarResult, circumventing problems with the constructors
Definition: macrotaskq.h:106
void gaxpy(const double a, ScalarResult< T > &left, const double b, const T &right, const bool fence=true)
the result type of a macrotask must implement gaxpy
Definition: macrotaskq.h:140
static const double b
Definition: nonlinschro.cc:119
static const double a
Definition: nonlinschro.cc:118
RAII class to redirect cout to a file.
Definition: macrotaskq.h:481
io_redirect(const long task_number, std::string filename, bool debug=false)
Definition: macrotaskq.h:486
std::streambuf * stream_buffer_cout
Definition: macrotaskq.h:482
~io_redirect()
Definition: macrotaskq.h:496
bool debug
Definition: macrotaskq.h:484
std::ofstream ofile
Definition: macrotaskq.h:483
Definition: macrotaskq.h:467
Default load of an object via serialize(ar, t).
Definition: archive.h:666
Default store of an object via serialize(ar, t).
Definition: archive.h:611
Definition: macrotaskpartitioner.h:25
Definition: macrotaskq.h:129
Definition: macrotaskq.h:123
static void load(const Archive &ar, std::shared_ptr< ScalarResult< T >> &ptr)
Definition: macrotaskq.h:156
static void store(const Archive &ar, const std::shared_ptr< ScalarResult< T >> &ptr)
Definition: macrotaskq.h:146
void e()
Definition: test_sig.cc:75
Declares the World class for the parallel runtime environment.
int ProcessID
Used to clearly identify process number/rank.
Definition: worldtypes.h:43