40#ifndef SRC_MADNESS_MRA_MACROTASKQ_H_
41#define SRC_MADNESS_MRA_MACROTASKQ_H_
79 gaxpy(1.0, x, 1.0,
true);
84 void gaxpy(
const double a,
const T& right,
double b,
const bool fence=
true) {
91 template<
typename Archive>
113template<
typename T=
double>
131 void set_impl(
const std::shared_ptr<implT>& newimpl) {
140 void gaxpy(
const double a,
const T& right,
double b,
const bool fence=
true) {
141 impl->gaxpy(
a,right,
b,fence);
144 template<
typename Archive>
156 return impl->get_local();
164 std::vector<ScalarResult<T>>
v;
217template<
typename>
struct is_tuple : std::false_type { };
218template<
typename ...T>
struct is_tuple<
std::tuple<T...>> : std::true_type { };
221template<
typename tupleT, std::
size_t I>
224 typedef decay_tuple <tupleT> argtupleT;
226 if constexpr(
I >= std::tuple_size_v<tupleT>) {
230 using typeT =
typename std::tuple_element<I, argtupleT>::type;
231 if constexpr (not is_valid_task_result_v<typeT>) {
235 return check_tuple_is_valid_task_result<tupleT,I+1>();
245 left.
gaxpy(
a, right,
b, fence);
248template <
class Archive,
typename T>
251 bool exists=(ptr) ?
true :
false;
253 if (exists) ar & ptr->id();
258template <
class Archive,
typename T>
270 MADNESS_EXCEPTION(
"ScalarResultImpl: remote operation attempting to use a locally uninitialized object",0);
273 MADNESS_EXCEPTION(
"ScalarResultImpl<T> operation attempting to use an unregistered object",0);
315 if (
name==
"default") {
319 }
else if (
name==
"node_replicated_target") {
323 }
else if (
name==
"small_memory") {
327 }
else if (
name==
"large_memory") {
332 std::string msg=
"MacroTaskQFactory::preset: unknown preset "+
name;
339 return {
"default",
"node_replicated_target",
"small_memory",
"large_memory"};
344 std::vector<MacroTaskInfo> result;
361 }
else if (store_pointer_in_cloud) {
367 if (not good) std::cout << *this ;
375 std::string msg=
"expected 3 policies, got "+std::to_string(
vec.size());
378 auto remove_quotes = [](
const std::string& s) {
379 std::string result=s;
380 if (s.size()>=2 and s.front()==
'"' and s.back()==
'"') {
381 result=s.substr(1,s.size()-2);
386 std::string sstorage=remove_quotes(
vec[0]);
391 std::string msg=
"unknown storage policy: "+sstorage;
396 std::string scloud=remove_quotes(
vec[1]);
401 std::string msg=
"unknown cloud distribution policy: "+scloud;
406 std::string sptrtarget=remove_quotes(
vec[2]);
411 std::string msg=
"unknown ptr target distribution policy: "+sptrtarget;
415 print(
"set macrotaskinfo to");
433 std::ostringstream os;
443 std::string msg=
"unknown policy: "+policy;
459template<
typename T=
double>
471 typedef std::vector<std::shared_ptr<MacroTaskBase> >
taskqT;
492 printf(
"this is task with priority %4.1f\n",
priority);
495 print(
"nothing to print");
498 std::stringstream ss;
516template<
typename macrotaskT>
592 std::shared_ptr< WorldDCPmapInterface< Key<1> > >
pmap1;
593 std::shared_ptr< WorldDCPmapInterface< Key<2> > >
pmap2;
594 std::shared_ptr< WorldDCPmapInterface< Key<3> > >
pmap3;
595 std::shared_ptr< WorldDCPmapInterface< Key<4> > >
pmap4;
596 std::shared_ptr< WorldDCPmapInterface< Key<5> > >
pmap5;
597 std::shared_ptr< WorldDCPmapInterface< Key<6> > >
pmap6;
651 std::shared_ptr<World> all_worlds;
652 all_worlds.reset(
new World(comm));
664 print(
"number of tasks in taskq",
taskq.size());
665 print(
"redirecting output to files task.#####");
689 if (need_replication_of_target) {
692 loop_types<Cloud::DistributeFunctor, double, float, double_complex, float_complex>(std::tuple<DistributionType>(dt),wo);
722 if (element<0)
break;
723 std::shared_ptr<MacroTaskBase>
task=
taskq[element];
730 tasktime+=(cpu1-cpu0);
731 if (
printdebug()) printf(
"completed task %3ld after %6.1fs at time %6.1fs\n",element,cpu1-cpu0,
wall_time());
734 const std::size_t ntask=
taskq.size();
736 auto in_percentile = [&ntask](
const long element) {
737 return std::floor(element/(0.1*(ntask+1)));
739 auto is_first_in_percentile = [&](
const long element) {
740 return (in_percentile(element)!=in_percentile(element-1));
743 std::cout << int(in_percentile(element)*10) <<
" " << std::flush;
756 print(
"all tasks complete");
761 printf(
"completed taskqueue after %4.1fs at time %4.1fs\n", cpu11 - cpu00,
wall_time());
762 printf(
" total cpu time / per world %4.1fs %4.1fs\n", tasktime, tasktime /
universe.
size());
792 for (
const auto& t : vtask) {
802 print(
"total number of tasks: ",
taskq.size());
803 print(
" task batch priority status");
804 for (
const auto& t :
taskq) t->print_me_as_table();
817 if (subworld.
rank()==0) {
831 auto is_Waiting = [](
const std::shared_ptr<MacroTaskBase>& mtb_ptr) {
return mtb_ptr->is_waiting();};
832 auto it=std::find_if(
taskq.begin(),
taskq.end(),is_Waiting);
833 if (it!=
taskq.end()) {
834 it->get()->set_running();
835 long element=it-
taskq.begin();
850 taskq[task_number]->set_complete();
870template<
typename taskT>
911 if (this->taskq_ptr==0) {
916 if (
debug) this->taskq_ptr->set_printlevel(20);
919 this->taskq_ptr->cloud.set_storing_policy(cloud_storage_policy);
920 this->taskq_ptr->cloud.set_replication_policy(this->taskq_ptr->get_policy().cloud_distribution_policy);
938 template<
typename ... Ts>
941 auto argtuple = std::tie(args...);
942 static_assert(std::is_same<
decltype(argtuple),
argtupleT>::value,
"type or number of arguments incorrect");
945 auto partitioner=
task.partitioner;
947 partitioner->set_nsubworld(
world.
size());
948 partitionT partition = partitioner->partition_tasks(argtuple);
958 for (
const auto& batch_prio : partition) {
960 std::shared_ptr<MacroTaskBase>(
new MacroTaskInternal(
task, batch_prio, inputrecords, outputrecords)));
973 static_assert(check_tuple_is_valid_task_result<resultT,0>(),
974 "tuple has invalid result type in prepare_output_records");
976 static_assert(is_valid_task_result_v<resultT>,
"unknown result type in prepare_output_records");
979 if (
debug)
print(
"storing pointers to output in cloud");
981 auto store_output_records = [&](
const auto& result) {
983 typedef std::decay_t<
decltype(result)> argT;
985 outputrecords += cloud.
store(
world, result.get_impl().get());
989 outputrecords += cloud.
store(
world, result.get_impl());
993 std::vector<std::shared_ptr<typename argT::value_type::implT>>
v;
994 for (
const auto& ptr : result)
v.push_back(ptr.get_impl());
1002 return outputrecords;
1008 std::apply([&](
auto &&... args) {
1009 (( outputrecords+=store_output_records(args) ), ...);
1012 outputrecords=store_output_records(result);
1014 return outputrecords;
1027 if (
task.name==
"unknown_task")
return typeid(
task).
name();
1035 static_assert(check_tuple_is_valid_task_result<resultT,0>(),
1036 "tuple has invalid result type in prepare_output_records");
1038 static_assert(is_valid_task_result_v<resultT>,
"unknown result type in prepare_output_records");
1040 this->task.batch=batch_prio.first;
1046 print(
"this is task",
get_name(),
"with batch",
task.batch,
"priority",this->get_priority());
1050 std::stringstream ss;
1052 std::size_t namesize=std::min(std::size_t(28),
name.size());
1053 name += std::string(28-namesize,
' ');
1055 std::stringstream ssbatch;
1056 ssbatch <<
task.batch;
1057 std::string strbatch=ssbatch.str();
1058 int nspaces=std::max(
int(0),35-
int(ssbatch.str().size()));
1059 strbatch+=std::string(nspaces,
' ');
1062 << std::setw(10) << strbatch
1068 template<
typename resultT1, std::
size_t I=0>
1069 typename std::enable_if<is_tuple<resultT1>::value,
void>
::type
1071 if constexpr(I < std::tuple_size_v<resultT1>) {
1072 using elementT =
typename std::tuple_element<I, resultT>::type;
1073 auto element_final=std::get<I>(final_result);
1074 auto element_tmp=std::get<I>(tmp_result);
1075 accumulate_into_final_result<elementT>(subworld, element_final, element_tmp, argtuple);
1076 accumulate_into_final_result<resultT1,I+1>(subworld, final_result, tmp_result, argtuple);
1081 template<
typename resultT1>
1082 typename std::enable_if<not is_tuple<resultT1>::value,
void>
::type
1087 result_tmp.change_tree_state(operating_state);
1088 gaxpy(1.0,result,1.0, result_tmp);
1095 gaxpy(1.0,result,1.0,result_tmp,
false);
1099 gaxpy(1.0, result, 1.0, result_tmp.get_local(),
false);
1103 std::size_t sz=result.size();
1104 for (
size_t i=0; i<sz; ++i) {
1105 gaxpy(1.0, result[i], 1.0, result_tmp[i].get_local(),
false);
1116 argtupleT batched_argtuple =
task.batch.copy_input_batch(argtuple);
1129 auto copi = [&](
auto&
arg) {
1130 typedef std::decay_t<
decltype(
arg)> argT;
1142 print(
"copied coefficients for task",
get_name(),
"in",cpu1-cpu0,
"seconds");
1149 task.subworld_ptr=&subworld;
1150 resultT result_batch = std::apply(
task, batched_argtuple);
1152 constexpr std::size_t
bufsize=256;
1154 std::snprintf(buffer,
bufsize,
"completed task %3ld after %6.1fs at time %6.1fs\n",element,cpu1-cpu0,
wall_time());
1155 print(std::string(buffer));
1158 auto insert_batch = [&](
auto& element1,
auto& element2) {
1159 typedef std::decay_t<
decltype(element1)> decay_type;;
1161 element1=
task.batch.insert_result_batch(element1,element2);
1163 std::swap(element1,element2);
1166 resultT result_subworld=
task.allocator(subworld,argtuple);
1170 insert_batch(result_subworld,result_batch);
1176 accumulate_into_final_result<resultT>(subworld, result_universe, result_subworld, argtuple);
1178 }
catch (std::exception&
e) {
1179 print(
"failing task no",element,
"in subworld",subworld.
id(),
"at time",
wall_time());
1192 template<
typename T, std::
size_t NDIM>
1199 template<
typename T, std::
size_t NDIM>
1201 std::vector<Function<T,NDIM>> vresult;
1202 vresult.resize(v_impl.size());
1207 template<
typename T>
1212 template<
typename T>
1214 std::vector<ScalarResult<T>> vresult(v_sr_impl.size());
1215 for (
size_t i=0; i<v_sr_impl.size(); ++i) {
1216 vresult[i].set_impl(v_sr_impl[i]);
1232 auto doit = [&](
auto& element) {
1233 typedef std::decay_t<
decltype(element)> elementT;
1237 typedef typename elementT::value_type::implT implT;
1238 auto ptr_element = cloud.
consuming_load<std::vector<std::shared_ptr<implT>>>(
1239 subworld, outputrecords1);
1243 typedef typename elementT::implT implT;
1244 auto ptr_element = cloud.
consuming_load<std::shared_ptr<implT>>(subworld, outputrecords1);
1248 typedef typename elementT::value_type ScalarResultT;
1249 typedef typename ScalarResultT::implT implT;
1250 typedef std::vector<std::shared_ptr<implT>> vptrT;
1251 auto ptr_element = cloud.
consuming_load<vptrT>(subworld, outputrecords1);
1257 auto ptr_element = cloud.
consuming_load<std::shared_ptr<typename elementT::implT>>(subworld, outputrecords1);
1265 static_assert(check_tuple_is_valid_task_result<resultT, 0>(),
1266 "invalid tuple task result -- must be vectors of functions");
Wrapper around MPI_Comm. Has a shallow copy constructor; use Create(Get_group()) for deep copy.
Definition safempi.h:497
Intracomm Split(int Color, int Key=0) const
Definition safempi.h:642
a batch consists of a 2D-input batch and a 1D-output batch: K-batch <- (I-batch, J-batch)
Definition macrotaskpartitioner.h:124
cloud class
Definition cloud.h:178
void clear()
Definition cloud.h:476
void replicate_per_node(const std::size_t chunk_size=INT_MAX)
Definition cloud.h:614
nlohmann::json get_statistics(World &world) const
return a json object with the cloud settings and statistics
Definition cloud.h:326
recordlistT store(madness::World &world, const T &source)
Definition cloud.h:574
Recordlist< keyT > recordlistT
Definition cloud.h:192
std::atomic< long > target_replication_time
Definition cloud.h:701
nlohmann::json gather_timings(World &universe) const
Definition cloud.h:381
void replicate(const std::size_t chunk_size=INT_MAX)
Definition cloud.h:634
void print_size(World &universe)
Definition cloud.h:300
T load(madness::World &world, const recordlistT recordlist) const
load a single object from the cloud, recordlist is kept unchanged
Definition cloud.h:534
std::list< WorldObjectBase * > world_object_base_list
Definition cloud.h:230
static void print_memory_statistics(const nlohmann::json stats)
Definition cloud.h:450
DistributionType get_replication_policy() const
is the cloud container replicated: per rank, per node, or distributed
Definition cloud.h:275
void clear_cache(World &subworld)
Definition cloud.h:470
std::atomic< long > copy_time
Definition cloud.h:700
void set_replication_policy(const DistributionType value)
is the cloud container replicated: per rank, per node, or distributed
Definition cloud.h:268
void distribute_targets(const DistributionType dt=Distributed)
distribute/node/rank replicate the targets of all world objects stored in the cloud
Definition cloud.h:517
void print_timings(World &universe) const
backwards compatibility
Definition cloud.h:420
void set_storing_policy(const StoragePolicy value)
storing policy refers to storing functions or pointers to functions
Definition cloud.h:291
StoragePolicy
Definition cloud.h:194
@ StoreFunctionPointer
Definition cloud.h:197
@ StoreFunction
Definition cloud.h:195
T consuming_load(madness::World &world, recordlistT &recordlist) const
similar to load, but will consume the recordlist
Definition cloud.h:547
static void set_default_pmap(World &world)
Definition mraimpl.h:3568
static std::shared_ptr< WorldDCPmapInterface< Key< NDIM > > > & get_pmap()
Returns the default process map that was last initialized via set_default_pmap()
Definition funcdefaults.h:401
static void set_pmap(const std::shared_ptr< WorldDCPmapInterface< Key< NDIM > > > &value)
Sets the default process map (does not redistribute existing functions)
Definition funcdefaults.h:432
FunctionImpl holds all Function state to facilitate shallow copy semantics.
Definition funcimpl.h:945
A multiresolution adaptive numerical function.
Definition mra.h:139
void set_impl(const std::shared_ptr< FunctionImpl< T, NDIM > > &impl)
Replace current FunctionImpl with provided new one.
Definition mra.h:666
A future is a possibly yet unevaluated value.
Definition future.h:369
T & get(bool dowork=true) &
Gets the value, waiting if necessary.
Definition future.h:570
base class
Definition macrotaskq.h:468
void set_running()
Definition macrotaskq.h:480
virtual ~MacroTaskBase()
Definition macrotaskq.h:474
void set_waiting()
Definition macrotaskq.h:481
MacroTaskBase()
Definition macrotaskq.h:473
virtual void print_me(std::string s="") const
Definition macrotaskq.h:491
std::string print_priority_and_status_to_string() const
Definition macrotaskq.h:497
void set_complete()
Definition macrotaskq.h:479
double priority
Definition macrotaskq.h:476
bool is_complete() const
Definition macrotaskq.h:483
Status
Definition macrotaskq.h:477
@ Complete
Definition macrotaskq.h:477
@ Running
Definition macrotaskq.h:477
@ Unknown
Definition macrotaskq.h:477
@ Waiting
Definition macrotaskq.h:477
bool is_running() const
Definition macrotaskq.h:484
enum madness::MacroTaskBase::Status stat
double get_priority() const
Definition macrotaskq.h:503
void set_priority(const double p)
Definition macrotaskq.h:504
virtual void print_me_as_table(std::string s="") const
Definition macrotaskq.h:494
virtual void run(World &world, Cloud &cloud, taskqT &taskq, const long element, const bool debug, const MacroTaskInfo policy)=0
std::vector< std::shared_ptr< MacroTaskBase > > taskqT
Definition macrotaskq.h:471
friend std::ostream & operator<<(std::ostream &os, const MacroTaskBase::Status s)
Definition macrotaskq.h:506
bool is_waiting() const
Definition macrotaskq.h:485
Definition macrotaskq.h:1288
MacroTaskOperationBase()
Definition macrotaskq.h:1294
Batch batch
Definition macrotaskq.h:1290
World * subworld_ptr
Definition macrotaskq.h:1291
std::shared_ptr< MacroTaskPartitioner > partitioner
Definition macrotaskq.h:1293
std::string name
Definition macrotaskq.h:1292
partition one (two) vectors into 1D (2D) batches.
Definition macrotaskpartitioner.h:182
std::list< std::pair< Batch, double > > partitionT
Definition macrotaskpartitioner.h:186
Factory for the MacroTaskQ.
Definition macrotaskq.h:530
MacroTaskQFactory & set_storage_policy(const MacroTaskInfo::StoragePolicy sp)
Definition macrotaskq.h:559
World & world
Definition macrotaskq.h:533
MacroTaskQFactory & set_policy(const MacroTaskInfo p)
Definition macrotaskq.h:554
MacroTaskQFactory & set_nworld(const long n)
Definition macrotaskq.h:540
MacroTaskQFactory & preset(const std::string name)
Definition macrotaskq.h:550
MacroTaskQFactory & set_cloud_distribution_policy(const DistributionType dp)
Definition macrotaskq.h:564
MacroTaskQFactory & set_printlevel(const long p)
Definition macrotaskq.h:545
long printlevel
Definition macrotaskq.h:532
MacroTaskQFactory & set_ptr_target_distribution_policy(const DistributionType dp)
Definition macrotaskq.h:569
long nworld
Definition macrotaskq.h:534
MacroTaskInfo policy
Definition macrotaskq.h:536
MacroTaskQFactory(World &universe)
Definition macrotaskq.h:538
Definition macrotaskq.h:578
long nsubworld
Definition macrotaskq.h:585
std::mutex taskq_mutex
Definition macrotaskq.h:583
bool printdebug() const
Definition macrotaskq.h:599
static void set_pmap(World &world)
Definition macrotaskq.h:854
bool printtimings_detail() const
Definition macrotaskq.h:602
void run_all()
run all tasks
Definition macrotaskq.h:659
void set_complete(const long task_number) const
scheduler is located on rank==0
Definition macrotaskq.h:843
World & universe
Definition macrotaskq.h:580
std::shared_ptr< WorldDCPmapInterface< Key< 1 > > > pmap1
set the process map for the subworld
Definition macrotaskq.h:592
std::shared_ptr< WorldDCPmapInterface< Key< 3 > > > pmap3
Definition macrotaskq.h:594
MacroTaskBase::taskqT taskq
Definition macrotaskq.h:582
MacroTaskInfo get_policy() const
Definition macrotaskq.h:611
nlohmann::json cloud_statistics
save cloud statistics after run_all()
Definition macrotaskq.h:586
void add_tasks(MacroTaskBase::taskqT &vtask)
Definition macrotaskq.h:791
long get_scheduled_task_number(World &subworld)
scheduler is located on universe.rank==0
Definition macrotaskq.h:815
void set_complete_local(const long task_number) const
scheduler is located on rank==0
Definition macrotaskq.h:848
World & get_subworld()
Definition macrotaskq.h:607
MacroTaskQ(const MacroTaskQFactory factory)
create an empty taskq and initialize the subworlds
Definition macrotaskq.h:624
std::shared_ptr< WorldDCPmapInterface< Key< 4 > > > pmap4
Definition macrotaskq.h:595
nlohmann::json get_taskq_statistics() const
Definition macrotaskq.h:619
std::size_t size() const
Definition macrotaskq.h:863
std::shared_ptr< WorldDCPmapInterface< Key< 5 > > > pmap5
Definition macrotaskq.h:596
void print_taskq() const
Definition macrotaskq.h:798
std::shared_ptr< World > subworld_ptr
Definition macrotaskq.h:581
bool printtimings() const
Definition macrotaskq.h:601
std::shared_ptr< WorldDCPmapInterface< Key< 2 > > > pmap2
Definition macrotaskq.h:593
bool printprogress() const
Definition macrotaskq.h:600
std::shared_ptr< WorldDCPmapInterface< Key< 6 > > > pmap6
Definition macrotaskq.h:597
nlohmann::json get_cloud_statistics() const
Definition macrotaskq.h:615
void set_printlevel(const long p)
Definition macrotaskq.h:609
long printlevel
Definition macrotaskq.h:584
madness::Cloud cloud
Definition macrotaskq.h:606
long get_nsubworld() const
Definition macrotaskq.h:608
void add_replicated_task(const std::shared_ptr< MacroTaskBase > &task)
Definition macrotaskq.h:810
~MacroTaskQ()
Definition macrotaskq.h:642
static std::shared_ptr< World > create_worlds(World &universe, const std::size_t nsubworld)
Definition macrotaskq.h:646
nlohmann::json taskq_statistics
save taskq statistics after run_all()
Definition macrotaskq.h:587
long get_scheduled_task_number_local()
Definition macrotaskq.h:827
const MacroTaskInfo policy
storage and distribution policy
Definition macrotaskq.h:589
Definition macrotaskq.h:1018
void run(World &subworld, Cloud &cloud, MacroTaskBase::taskqT &taskq, const long element, const bool debug, const MacroTaskInfo policy) override
called by the MacroTaskQ when the task is scheduled
Definition macrotaskq.h:1112
taskT task
Definition macrotaskq.h:1025
MacroTaskInternal(const taskT &task, const std::pair< Batch, double > &batch_prio, const recordlistT &inputrecords, const recordlistT &outputrecords)
Definition macrotaskq.h:1031
std::enable_if< notis_tuple< resultT1 >::value, void >::type accumulate_into_final_result(World &subworld, resultT1 &result, const resultT1 &result_tmp, const argtupleT &argtuple)
accumulate the result of the task into the final result living in the universe
Definition macrotaskq.h:1083
resultT get_output(World &subworld, Cloud &cloud) const
return the WorldObjects or the result functions living in the universe
Definition macrotaskq.h:1225
static ScalarResult< T > pointer2WorldObject(const std::shared_ptr< ScalarResultImpl< T > > sr_impl)
Definition macrotaskq.h:1208
decay_tuple< typename taskT::argtupleT > argtupleT
Definition macrotaskq.h:1020
static std::vector< Function< T, NDIM > > pointer2WorldObject(const std::vector< std::shared_ptr< FunctionImpl< T, NDIM > > > v_impl)
Definition macrotaskq.h:1200
void cleanup() override
Definition macrotaskq.h:1189
taskT::resultT resultT
Definition macrotaskq.h:1021
recordlistT outputrecords
Definition macrotaskq.h:1023
std::string get_name() const
Definition macrotaskq.h:1026
static Function< T, NDIM > pointer2WorldObject(const std::shared_ptr< FunctionImpl< T, NDIM > > impl)
Definition macrotaskq.h:1193
static std::vector< ScalarResult< T > > pointer2WorldObject(const std::vector< std::shared_ptr< ScalarResultImpl< T > > > v_sr_impl)
Definition macrotaskq.h:1213
void print_me_as_table(std::string s="") const override
Definition macrotaskq.h:1049
void print_me(std::string s="") const override
Definition macrotaskq.h:1045
std::enable_if< is_tuple< resultT1 >::value, void >::type accumulate_into_final_result(World &subworld, resultT1 &final_result, const resultT1 &tmp_result, const argtupleT &argtuple)
accumulate the result of the task into the final result living in the universe
Definition macrotaskq.h:1070
recordlistT inputrecords
Definition macrotaskq.h:1022
Definition macrotaskq.h:871
MacroTask & set_debug(const bool value)
Definition macrotaskq.h:924
MacroTask(World &world, taskT &task)
constructor takes the task, but no arguments to the task
Definition macrotaskq.h:894
MacroTask(World &world, taskT &task, const MacroTaskQFactory factory)
constructor takes task and a taskq factory for customization, immediate execution
Definition macrotaskq.h:900
taskT::resultT resultT
Definition macrotaskq.h:881
std::shared_ptr< MacroTaskQ > taskq_ptr
Definition macrotaskq.h:889
taskT::argtupleT argtupleT
Definition macrotaskq.h:882
bool immediate_execution
Definition macrotaskq.h:887
World & world
Definition macrotaskq.h:888
std::shared_ptr< MacroTaskQ > get_taskq() const
Definition macrotaskq.h:929
resultT operator()(const Ts &... args)
this mimicks the original call to the task functor, called from the universe
Definition macrotaskq.h:939
bool debug
Definition macrotaskq.h:886
taskT task
Definition macrotaskq.h:885
MacroTaskPartitioner::partitionT partitionT
Definition macrotaskq.h:872
Cloud::recordlistT recordlistT
Definition macrotaskq.h:883
recordlistT prepare_output_records(Cloud &cloud, resultT &result)
store pointers to the result WorldObject in the cloud and return the recordlist
Definition macrotaskq.h:971
MacroTask(World &world, taskT &task, std::shared_ptr< MacroTaskQ > taskq_ptr)
constructor takes the task, and a taskq, execution is not immediate
Definition macrotaskq.h:906
static std::map< MemKey, MemInfo > measure_and_print(World &world)
measure the memory usage of all objects of all worlds
Definition memory_measurement.h:24
helper class for returning the result of a task, which is not a madness Function, but a simple scalar
Definition macrotaskq.h:55
void gaxpy(const double a, const T &right, double b, const bool fence=true)
accumulate, optional fence
Definition macrotaskq.h:84
void serialize(Archive &ar)
Definition macrotaskq.h:92
T get()
after completion of the taskq get the final value
Definition macrotaskq.h:97
ScalarResultImpl< T > & operator=(const T &x)
simple assignment of the scalar value
Definition macrotaskq.h:73
ScalarResultImpl(const ScalarResultImpl &other)=delete
Disable the default copy constructor.
ScalarResultImpl< T > & operator=(const ScalarResultImpl< T > &other)=delete
disable assignment operator
ScalarResultImpl< T > & operator+=(const T &x)
Definition macrotaskq.h:78
~ScalarResultImpl()
Definition macrotaskq.h:69
T value
the scalar value
Definition macrotaskq.h:110
T get_local() const
Definition macrotaskq.h:104
T value_type
Definition macrotaskq.h:57
ScalarResultImpl(World &world)
Definition macrotaskq.h:58
Definition macrotaskq.h:114
void serialize(Archive &ar)
Definition macrotaskq.h:145
ScalarResult(const std::shared_ptr< implT > &impl)
Definition macrotaskq.h:121
void gaxpy(const double a, const T &right, double b, const bool fence=true)
accumulate, optional fence
Definition macrotaskq.h:140
ScalarResultImpl< T > implT
Definition macrotaskq.h:116
T get()
after completion of the taskq get the final value
Definition macrotaskq.h:150
ScalarResult(World &world)
Definition macrotaskq.h:120
std::shared_ptr< implT > impl
Definition macrotaskq.h:117
ScalarResult & operator=(const T &x)
Definition macrotaskq.h:122
std::shared_ptr< implT > get_impl() const
Definition macrotaskq.h:127
void set_impl(const std::shared_ptr< implT > &newimpl)
Definition macrotaskq.h:131
T get_local() const
after completion of the taskq get the final value
Definition macrotaskq.h:155
uniqueidT id() const
Definition macrotaskq.h:135
void broadcast_serializable(objT &obj, ProcessID root)
Broadcast a serializable object.
Definition worldgop.h:756
void fence(bool debug=false)
Synchronizes all processes in communicator AND globally ensures no pending AM or tasks.
Definition worldgop.cc:161
bool set_forbid_fence(bool value)
Set forbid_fence flag to new value and return old value.
Definition worldgop.h:676
void sum(T *buf, size_t nelem)
Inplace global sum while still processing AM & tasks.
Definition worldgop.h:872
SafeMPI::Intracomm & comm()
Returns the associated SafeMPI communicator.
Definition worldmpi.h:286
Implements most parts of a globally addressable object (via unique ID).
Definition world_object.h:366
World & get_world() const
Returns a reference to the world.
Definition world_object.h:719
World & world
The World this object belongs to. (Think globally, act locally).
Definition world_object.h:383
void process_pending()
To be called from derived constructor to process pending messages.
Definition world_object.h:658
detail::task_result_type< memfnT >::futureT send(ProcessID dest, memfnT memfn) const
Definition world_object.h:733
detail::task_result_type< memfnT >::futureT task(ProcessID dest, memfnT memfn, const TaskAttributes &attr=TaskAttributes()) const
Sends task to derived class method returnT (this->*memfn)().
Definition world_object.h:1007
A parallel world class.
Definition world.h:132
static World * world_from_id(std::uint64_t id)
Convert a World ID to a World pointer.
Definition world.h:492
ProcessID rank() const
Returns the process rank in this World (same as MPI_Comm_rank()).
Definition world.h:320
WorldMpiInterface & mpi
MPI interface.
Definition world.h:204
ProcessID size() const
Returns the number of processes in this World (same as MPI_Comm_size()).
Definition world.h:330
unsigned long id() const
Definition world.h:315
WorldGopInterface & gop
Global operations.
Definition world.h:207
std::optional< T * > ptr_from_id(uniqueidT id) const
Look up a local pointer from a world-wide unique ID.
Definition world.h:416
Class for unique global IDs.
Definition uniqueid.h:53
Declares the Cloud class for storing data and transfering them between worlds.
char * p(char *buf, const char *name, int k, int initial_level, double thresh, int order)
Definition derivatives.cc:72
const std::size_t bufsize
Definition derivatives.cc:16
static bool debug
Definition dirac-hatom.cc:16
auto T(World &world, response_space &f) -> response_space
Definition global_functions.cc:28
Tensor< typename Tensor< T >::scalar_type > arg(const Tensor< T > &t)
Return a new tensor holding the argument of each element of t (complex types only)
Definition tensor.h:2518
static const double v
Definition hatom_sf_dirac.cc:20
#define MADNESS_EXCEPTION(msg, value)
Macro for throwing a MADNESS exception.
Definition madness_exception.h:119
#define MADNESS_ASSERT(condition)
Assert a condition that should be free of side-effects since in release builds this might be a no-op.
Definition madness_exception.h:134
#define MADNESS_CHECK_THROW(condition, msg)
Check a condition — even in a release build the condition is always evaluated so it can have side eff...
Definition madness_exception.h:207
Namespace for all elements and tools of MADNESS.
Definition DFParameters.h:10
std::vector< ScalarResult< T > > scalar_result_vector(World &world, std::size_t n)
helper function to create a vector of ScalarResultImpl, circumventing problems with the constructors
Definition macrotaskq.h:163
std::ostream & operator<<(std::ostream &os, const particle< PDIM > &p)
Definition lowrankfunction.h:401
static double cpu_time()
Returns the cpu time in seconds relative to an arbitrary origin.
Definition timers.h:127
DistributionType
some introspection of how data is distributed
Definition worlddc.h:81
@ NodeReplicated
even if there are several ranks per node
Definition worlddc.h:84
@ Distributed
no replication of the container, the container is distributed over the world
Definition worlddc.h:82
@ RankReplicated
replicate the container over all world ranks
Definition worlddc.h:83
void set_impl(std::vector< Function< T, NDIM > > &v, const std::vector< std::shared_ptr< FunctionImpl< T, NDIM > > > vimpl)
Definition vmra.h:712
std::vector< std::shared_ptr< FunctionImpl< T, NDIM > > > get_impl(const std::vector< Function< T, NDIM > > &v)
Definition vmra.h:705
TreeState
Definition funcdefaults.h:59
@ reconstructed
s coeffs at the leaves only
Definition funcdefaults.h:60
@ compressed
d coeffs in internal nodes, s and d coeffs at the root
Definition funcdefaults.h:61
constexpr bool check_tuple_is_valid_task_result()
given a tuple check recursively if all elements are valid task results
Definition macrotaskq.h:222
static const Slice _(0,-1, 1)
static void binary_tuple_loop(tupleT &tuple1, tupleR &tuple2, opT &op)
loop over the tuple elements of both tuples and execute the operation op on each element pair
Definition type_traits.h:742
void print(const T &t, const Ts &... ts)
Print items to std::cout (items separated by spaces) and terminate with a new line.
Definition print.h:226
static void unary_tuple_loop(tupleT &tuple, opT &op)
loop over a tuple and apply unary operator op to each element
Definition type_traits.h:732
@ TT_FULL
Definition gentensor.h:120
NDIM & f
Definition mra.h:2543
constexpr bool is_valid_task_result_v
check if type is a valid task result: it must be a WorldObject and must implement gaxpy
Definition macrotaskq.h:208
const Function< T, NDIM > & change_tree_state(const Function< T, NDIM > &f, const TreeState finalstate, bool fence=true)
change tree state of a function
Definition mra.h:2869
decltype(decay_types(std::declval< T >())) decay_tuple
Definition macrotaskpartitioner.h:22
double wall_time()
Returns the wall time in seconds relative to an arbitrary origin.
Definition timers.cc:48
std::string type(const PairType &n)
Definition PNOParameters.h:18
constexpr Vector< T, sizeof...(Ts)+1 > vec(T t, Ts... ts)
Factory function for creating a madness::Vector.
Definition vector.h:750
std::string name(const FuncType &type, const int ex=-1)
Definition ccpairfunction.h:28
Function< T, NDIM > copy(const Function< T, NDIM > &f, const std::shared_ptr< WorldDCPmapInterface< Key< NDIM > > > &pmap, bool fence=true)
Create a new copy of the function with different distribution and optional fence.
Definition mra.h:2111
void gaxpy(const double a, ScalarResult< T > &left, const double b, const T &right, const bool fence=true)
the result type of a macrotask must implement gaxpy
Definition macrotaskq.h:244
static const double b
Definition nonlinschro.cc:119
static const double a
Definition nonlinschro.cc:118
Definition macrotaskq.h:280
friend std::string to_string(const MacroTaskInfo::StoragePolicy sp)
Definition macrotaskq.h:432
StoragePolicy
Definition macrotaskq.h:281
@ StoreFunction
store a madness function in the cloud – can have a large memory impact
Definition macrotaskq.h:282
@ StorePointerToFunction
Definition macrotaskq.h:283
@ StoreFunctionViaPointer
coefficients to the subworlds when the task is started. This is the default policy.
Definition macrotaskq.h:286
static std::vector< MacroTaskInfo > get_all_presets()
helper function to return all presets
Definition macrotaskq.h:343
StoragePolicy storage_policy
Definition macrotaskq.h:420
DistributionType ptr_target_distribution_policy
Definition macrotaskq.h:422
static MacroTaskInfo preset(const std::string name)
Definition macrotaskq.h:313
static std::vector< std::string > get_all_preset_names()
Definition macrotaskq.h:338
static Cloud::StoragePolicy to_cloud_storage_policy(MacroTaskInfo::StoragePolicy policy)
given the MacroTask's storage policy return the corresponding Cloud storage policy
Definition macrotaskq.h:298
DistributionType cloud_distribution_policy
Definition macrotaskq.h:421
friend std::ostream & operator<<(std::ostream &os, const MacroTaskInfo policy)
Definition macrotaskq.h:424
bool check_consistency() const
make sure the policies are consistent
Definition macrotaskq.h:352
friend std::ostream & operator<<(std::ostream &os, const StoragePolicy sp)
Definition macrotaskq.h:290
static StoragePolicy policy_to_string(const std::string policy)
Definition macrotaskq.h:438
nlohmann::json to_json() const
Definition macrotaskq.h:448
void from_vector_of_strings(const std::vector< std::string > &vec)
set policy from a vector of strings, assuming the order is storage policy, cloud distribution policy,...
Definition macrotaskq.h:373
Definition macrotaskq.h:875
Default load of an object via serialize(ar, t).
Definition archive.h:667
Default store of an object via serialize(ar, t).
Definition archive.h:612
static std::string tolower(std::string s)
make lower case
Definition commandlineparser.h:86
class to temporarily redirect output to cout
Definition print.h:277
RAII class to redirect cout to a file.
Definition print.h:251
Definition type_traits.h:756
Definition macrotaskq.h:193
Definition macrotaskq.h:178
Definition macrotaskq.h:172
Definition macrotaskq.h:199
Definition macrotaskq.h:187
Definition macrotaskq.h:217
static void load(const Archive &ar, std::shared_ptr< ScalarResultImpl< T > > &ptr)
Definition macrotaskq.h:260
static void store(const Archive &ar, const std::shared_ptr< ScalarResultImpl< T > > &ptr)
Definition macrotaskq.h:250
Definition timing_utilities.h:9
static const double_complex I
Definition tdse1d.cc:164
void doit(World &world)
Definition tdse.cc:921
void e()
Definition test_sig.cc:75
Declares the World class for the parallel runtime environment.
int ProcessID
Used to clearly identify process number/rank.
Definition worldtypes.h:43