MADNESS 0.10.1
cloud.h
Go to the documentation of this file.
1
2/**
3 \file cloud.h
 \brief Declares the \c Cloud class for storing data and transferring it between worlds
5 \ingroup world
6
7*/
8
9/**
10 * TODO: - delete container record upon caching if container is replicated
11 */
12
13#ifndef SRC_MADNESS_WORLD_CLOUD_H_
14#define SRC_MADNESS_WORLD_CLOUD_H_
15
16
18#include<any>
19#include<iomanip>
20
21
22/*!
23 \file cloud.h
24 \brief Defines and implements most of madness cloud storage
25
26 TODO: check use of preprocessor directives
 TODO: clearing the cache in the destructor won't work because no subworld is available there; clear_cache() must be called explicitly, which is error-prone.
28*/
29
30namespace madness {
31
/// \brief Human-readable name of a type, used in debug output.
///
/// The generic version returns the implementation-defined string from
/// typeid(T).name(); selected types get explicit specializations with
/// readable names.
template<typename T>
struct type_name {
    static const char* value() {
        const std::type_info& info = typeid(T);
        return info.name();
    }
};
37
38 template<>
39 struct type_name<Function<double,1>> { static const char* value() { return "Function<double,1>"; } };
40 template<>
41 struct type_name<Function<double,2>> { static const char* value() { return "Function<double,2>"; } };
42 template<>
43 struct type_name<Function<double,3>> { static const char* value() { return "Function<double,3>"; } };
44 template<>
45 struct type_name<Function<double,4>> { static const char* value() { return "Function<double,4>"; } };
46 template<>
47 struct type_name<Function<double,5>> { static const char* value() { return "Function<double,5>"; } };
48 template<>
49 struct type_name<Function<double,6>> { static const char* value() { return "Function<double,6>"; } };
50
51 template<>
52 struct type_name<std::vector<Function<double,1>>> { static const char* value() { return "std::vector<Function<double,1>>"; } };
53 template<>
54 struct type_name<std::vector<Function<double,2>>> { static const char* value() { return "std::vector<Function<double,2>>"; } };
55 template<>
56 struct type_name<std::vector<Function<double,3>>> { static const char* value() { return "std::vector<Function<double,3>>"; } };
57 template<>
58 struct type_name<std::vector<Function<double,4>>> { static const char* value() { return "std::vector<Function<double,4>>"; } };
59 template<>
60 struct type_name<std::vector<Function<double,5>>> { static const char* value() { return "std::vector<Function<double,5>>"; } };
61 template<>
62 struct type_name<std::vector<Function<double,6>>> { static const char* value() { return "std::vector<Function<double,6>>"; } };
63
64template<typename keyT>
65struct Recordlist {
66 std::list<keyT> list;
67
68 Recordlist() : list() {};
69
70 explicit Recordlist(const keyT &key) : list{key} {};
71
72 Recordlist(const Recordlist &other) : list(other.list) {};
73
75 for (auto &l2 : list2.list) list.push_back(l2);
76 return *this;
77 }
78
79 Recordlist &operator+=(const keyT &key) {
80 list.push_back(key);
81 return *this;
82 }
83
85 keyT key = list.front();
86 list.pop_front();
87 return key;
88 }
89
90 std::size_t size() const {
91 return list.size();
92 }
93
// Detection helper: the return type of T::id(), if T has such a member.
// Types with an id() member (e.g. a WorldObject) are hashed through that id;
// other (fundamental) types fall back to hash_value(), see worldhash.h.
template <typename T>
using member_id_t = decltype(std::declval<T>().id());
98
99 template <typename T>
101
// Detection helper for intrusive hashing: the return type of T::hash(),
// if T provides such a member (see worldhash.h).
template <typename T>
using member_hash_t = decltype(std::declval<T>().hash());
105
106 template <typename T>
108
109 template<typename T, std::size_t NDIM>
110 static keyT compute_record(const Function<T,NDIM>& arg) {return hash_value(arg.get_impl()->id());}
111
112 template<typename T, std::size_t NDIM>
114
115 template<typename keyQ, typename valueT>
117
118 template<typename keyQ, typename valueT>
119 static keyT compute_record(const std::shared_ptr<WorldContainer<keyQ,valueT>>& arg) {return hash_value(arg->id());}
120
121 template<typename T, std::size_t NDIM>
122 static keyT compute_record(const std::shared_ptr<madness::FunctionImpl<T, NDIM>>& arg) {return hash_value(arg->id());}
123
124 template<typename T>
125 static keyT compute_record(const std::vector<T>& arg) {return hash_range(arg.begin(), arg.end());}
126
127 template<typename T>
128 static keyT compute_record(const Tensor<T>& arg) {return hash_value(arg.normf());}
129
130 template<typename T>
131 static keyT compute_record(const std::shared_ptr<T>& arg) {return compute_record(*arg);}
132
133 template<typename T>
134 static keyT compute_record(const T& arg) {
135 if constexpr (has_member_id<T>::value) {
136 return hash_value(arg.id());
137 } else if constexpr (std::is_pointer_v<T> && has_member_id<std::remove_pointer_t<T>>::value) {
138 return hash_value(arg->id());
139 } else {
140 // compute hash_code for fundamental types
141 std::size_t hashtype = typeid(T).hash_code();
142 hash_combine(hashtype,hash_value(arg));
143 return hashtype;
144 }
145 }
146
147
148 friend std::ostream &operator<<(std::ostream &os, const Recordlist &arg) {
149 using namespace madness::operators;
150 os << arg.list;
151 return os;
152 }
153
154};
155
156/// cloud class
157
158/// store and load data to/from the cloud into arbitrary worlds
159///
160/// Distributed data is always bound to a certain world. If it needs to be
161/// present in another world it can be serialized to the cloud and deserialized
162/// from there again. For an example see test_cloud.cc
163///
164/// Data is stored into a distributed container living in the universe.
165/// During storing a (replicated) list of records is returned that can be used to find the data
166/// in the container. If a combined object (a vector, tuple, etc) is stored a list of records
167/// will be generated. When loading the data from the world the record list will be used to
168/// deserialize all stored objects.
169///
170/// Note that there must be a fence after the destruction of subworld containers, as in:
171///
172/// create subworlds
173/// {
174/// dcT(subworld)
175/// do work
176/// }
177/// subworld.gop.fence();
178class Cloud {
179
bool debug{false};                  ///< prints debug output
bool is_replicated{false};          ///< if contents of the container are replicated
bool dofence{true};                 ///< fences after load/store
bool force_load_from_cache{false};  ///< forces load from cache (mainly for debugging)
bool use_cache{true};               ///< keep loaded objects in the local cache; set_replication_policy() enables it only for RankReplicated
185
186public:
187
188 typedef std::any cached_objT;
190 using valueT = std::vector<unsigned char>;
191 typedef std::map<keyT, cached_objT> cacheT;
193
195 StoreFunction, ///< store a madness function in the cloud -- can have a large memory impact
196 ///< equivalent to a deep copy
197 StoreFunctionPointer, ///< store the pointer to the function in the cloud.
198 ///< Return type still is a Function<T,NDIM> with a pointer to the universe function impl.
199 ///< equivalent to a shallow copy
200 };
201
202
203 friend std::ostream& operator<<(std::ostream& os, const StoragePolicy& sp) {
204 switch(sp) {
205 case StoreFunction: os << "Function"; break;
206 case StoreFunctionPointer: os << "FunctionPointer"; break;
207 default: os << "UnknownStoragePolicy"; break;
208 }
209 return os;
210 }
211
212 friend std::string to_string(const StoragePolicy sp) {
213 std::ostringstream os;
214 os << sp;
215 return os.str();
216 }
217
218private:
219 /// are the functions (WorldObjects) stored in the cloud or only pointers to them
221
222 /// cloud is a container: replication policy for the cloud container: distributed, node-replicated, rank-replicated
224
227 recordlistT local_list_of_container_keys; // a world-local list of keys occupied in container
228
229public:
230 std::list<WorldObjectBase*> world_object_base_list; // list of world objects stored in the cloud
231
232 template <typename T>
233 using member_cloud_serialize_t = decltype(std::declval<T>().cloud_store(std::declval<World&>(), std::declval<Cloud&>()));
234
235 template <typename T>
237
238public:
239
240 /// @param[in] universe the universe world
241 Cloud(madness::World &universe) : container(universe), reading_time(0l), copy_time(0l), writing_time(0l),
242 cache_reads(0l), cache_stores(0l) {
243 }
244
246 if ((not cached_objects.empty()) or (not local_list_of_container_keys.list.empty())) {
247 print("\nCloud::~Cloud(): cached_objects not empty, size=", cached_objects.size());
248 print("You need to call clear_cache(subworld) before destroying the cloud");
249 print("\n------------------------------\n");
250 std::string msg="deferred destruction of cloud with non-empty cache";
251 std::cerr << msg << std::endl;
252 }
253 }
254
255 void set_debug(bool value) {
256 debug = value;
257 }
258
259 void set_fence(bool value) {
260 dofence = value;
261 }
262
263 void set_force_load_from_cache(bool value) {
264 force_load_from_cache = value;
265 }
266
267 /// is the cloud container replicated: per rank, per node, or distributed
270 use_cache=false;
271 if (value == RankReplicated) use_cache=true;
272 }
273
274 /// is the cloud container replicated: per rank, per node, or distributed
278
281 if (disttype!=cloud_replication_policy) {
282 std::cout << "Cloud::validate_distribution(): distribution type mismatch, container is " << disttype
283 << " but cloud_replication_policy is " << cloud_replication_policy << std::endl;
284 return false;
285 }
286 return true;
287 }
288
289
290 /// storing policy refers to storing functions or pointers to functions
292 storage_policy = value;
293 }
294
295 /// storing policy refers to storing functions or pointers to functions
299
300 void print_size(World& universe) {
301 nlohmann::json stats=gather_memory_statistics(universe);
302 double byte2gbyte=1.0/(1024*1024*1024);
303 double global_memsize=stats["memory_global"].template get<double>();
304 double max_record_size=stats["max_record_size"].template get<double>();
305 double min_memsize=stats["memory_min"].template get<double>();
306 double max_memsize=stats["memory_max"].template get<double>();
307 double global_size=stats["container_size_global"].template get<double>();
308
309 if (universe.rank()==0) {
310 print("Cloud memory:");
311 print(" replicated:",is_replicated);
312 print("size of cloud (total)");
313 print(" number of records: ",global_size);
314 print(" memory in GBytes: ",global_memsize*byte2gbyte);
315 print("size of cloud (average per node)");
316 print(" number of records: ",double(global_size)/universe.size());
317 print(" memory in GBytes: ",global_memsize*byte2gbyte/universe.size());
318 print("min/max of node");
319 print(" memory in GBytes: ",min_memsize*byte2gbyte,max_memsize*byte2gbyte);
320 print(" max record size in GBytes:",max_record_size*byte2gbyte);
321
322 }
323 }
324
325 /// return a json object with the cloud settings and statistics
326 nlohmann::json get_statistics(World& world) const {
327 nlohmann::json j;
328 { // settings
329 j["storage_policy"]=to_string(storage_policy);
330 j["cloud_replication_policy"]=to_string(cloud_replication_policy);
331 j["is_replicated"]=is_replicated;
332 j["local_cached_objects_size"]=cached_objects.size();
333 }
334 // timings
335 j.update(gather_timings(world));
336 j.update(gather_memory_statistics(world));
337 return j;
338
339 }
340
341 /// get size of the cloud container
342 nlohmann::json gather_memory_statistics(World &universe) const {
343
344 std::size_t memsize=0;
345 std::size_t max_record_size=0;
346 for (auto& item : container) {
347 memsize+=item.second.size();
348 max_record_size=std::max(max_record_size,item.second.size());
349 }
350 std::size_t global_memsize=memsize;
351 std::size_t max_memsize=memsize;
352 std::size_t min_memsize=memsize;
353 double rss=madness::get_rss_usage_in_GB();
354 universe.gop.sum(global_memsize);
355 universe.gop.max(max_memsize);
356 universe.gop.max(max_record_size);
357 universe.gop.min(min_memsize);
358 universe.gop.max(rss);
359
360 auto local_size=container.size();
361 auto global_size=local_size;
362 universe.gop.sum(global_size);
363 nlohmann::json j;
364 j["container_size_global"] = global_size;
365 j["memory_global"] = global_memsize;
366 j["memory_min"] = min_memsize;
367 j["memory_max"] = max_memsize;
368 j["memory_rss_GB_max"] = rss;
369 j["max_record_size"] = max_record_size;
370 return j;
371 }
372
373 nlohmann::json gather_timings(World &universe) const {
374 double rtime_max = double(reading_time)*1.e-6;
375 double rtime_acc = double(reading_time)*1.e-6;
376 double rtime_av = double(reading_time)*1.e-6;
377 double ctime_max = double(copy_time)*1.e-6;
378 double ctime_acc = double(copy_time)*1.e-6;
379 double ctime_av = double(copy_time)*1.e-6;
380 double wtime = double(writing_time)*1.e-6;
381 double ptime = double(replication_time)*1.e-6;
382 double tptime = double(target_replication_time)*1.e-6;
383 universe.gop.max(rtime_max);
384 universe.gop.sum(rtime_acc);
385 rtime_av = rtime_acc/universe.size();
386 universe.gop.max(ctime_max);
387 universe.gop.sum(ctime_acc);
388 ctime_av = ctime_acc/universe.size();
389 universe.gop.max(wtime);
390 universe.gop.max(ptime);
391 universe.gop.max(tptime);
392 long creads = long(cache_reads);
393 long cstores = long(cache_stores);
394 universe.gop.sum(creads);
395 universe.gop.sum(cstores);
396 nlohmann::json j;
397 j["reading_time_max_s"] = rtime_max;
398 j["reading_time_acc_s"] = rtime_acc;
399 j["reading_time_av_s"] = rtime_av;
400 j["copy_time_max_s"] = ctime_max;
401 j["copy_time_acc_s"] = ctime_acc;
402 j["copy_time_av_s"] = ctime_av;
403 j["writing_time_s"] = wtime;
404 j["replication_time_s"] = ptime;
405 j["target_replication_time_s"] = tptime;
406 j["cache_reads"] = creads;
407 j["cache_stores"] = cstores;
408 return j;
409 }
410
411 /// backwards compatibility
412 void print_timings(World& universe) const {
413 print_timings(gather_timings(universe));
414 }
415
416 static void print_timings(const nlohmann::json timings) {
417 double rtime_max=timings["reading_time_max_s"].template get<double>();
418 double rtime_av=timings["reading_time_av_s"].template get<double>();
419 double rtime_acc=timings["reading_time_acc_s"].template get<double>();
420 // double ctime_max=timings["copy_time_max_s"].template get<double>();
421 // double ctime_av=timings["copy_time_av_s"].template get<double>();
422 // double ctime_acc=timings["copy_time_acc_s"].template get<double>();
423 double wtime=timings["writing_time_s"].template get<double>();
424 double ptime=timings["replication_time_s"].template get<double>();
425 double tptime=timings["target_replication_time_s"].template get<double>();
426 long creads=timings["cache_reads"].template get<long>();
427 long cstores=timings["cache_stores"].template get<long>();
428
429 auto precision = std::cout.precision();
430 std::cout << std::fixed << std::setprecision(1);
431 print("cloud storing wall time ", wtime);
432 print("cloud replication wall time ", ptime);
433 print("target replication wall time ", tptime);
434 print("cloud max reading time (all procs) ", rtime_max, std::defaultfloat);
435 print("cloud average reading cpu time (all procs) ", rtime_av, std::defaultfloat);
436 print("cloud accumulated reading cpu time (all procs) ", rtime_acc, std::defaultfloat);
437 std::cout << std::setprecision(precision) << std::scientific;
438 print("cloud cache stores ", long(cstores));
439 print("cloud cache loads ", long(creads));
440 }
441
442 static void print_memory_statistics(const nlohmann::json stats) {
443 double byte2gbyte=1.0/(1024*1024*1024);
444 double global_memsize=stats["memory_global"].template get<double>();
445 double max_record_size=stats["max_record_size"].template get<double>();
446 double min_memsize=stats["memory_min"].template get<double>();
447 double max_memsize=stats["memory_max"].template get<double>();
448 double global_size=stats["container_size_global"].template get<double>();
449
450 print("Cloud memory:");
451 print(" size of cloud (total)");
452 print(" number of records: ",global_size);
453 print(" memory in GBytes: ",global_memsize*byte2gbyte);
454 // print(" size of cloud (average per node)");
455 // print(" number of records: ",double(global_size)/madness::world().size());
456 // print(" memory in GBytes: ",global_memsize*byte2gbyte/madness::world().size());
457 print(" min/max of node");
458 print(" memory in GBytes: ",min_memsize*byte2gbyte,max_memsize*byte2gbyte);
459 print(" max record size in GBytes:",max_record_size*byte2gbyte);
460 }
461
462 void clear_cache(World &subworld) {
463 cached_objects.clear();
464 local_list_of_container_keys.list.clear();
465 subworld.gop.fence();
466 }
467
468 void clear() {
469 container.clear();
470 }
471
473 reading_time=0l;
474 copy_time=0l;
475 writing_time=0l;
476 writing_time1=0l;
479 cache_stores=0l;
480 cache_reads=0l;
481 }
482
483 /// functor to distribute/rank/node-replicate a function, passed in as a pointer to WorldObjectBase
484 template<typename T, std::size_t NDIM>
489 // figure out if wo is a FunctionImpl and do the distribution
490 if (auto fimpl=dynamic_cast<FunctionImpl<T, NDIM>*>(wo)) {
491 // fimpl->get_pmap()->print_data_sizes(world,"before distribution of function in cloud");
492 if (dt==RankReplicated) {
493 fimpl->replicate(false);
494 } else if (dt==NodeReplicated) {
495 // print("replicating function per node",fimpl);;
496 fimpl->replicate_on_hosts(true);
497 } else if (dt==Distributed) {
498 fimpl->undo_replicate(false);
499 } else {
500 MADNESS_EXCEPTION("unknown distribution type",1);
501 }
502 // fimpl->get_pmap()->print_data_sizes(world,"after distribution of function in cloud");
503 }
504 return 0;
505 }
506 };
507
508 /// distribute/node/rank replicate the targets of all world objects stored in the cloud
510 if (world_object_base_list.empty()) return;
511 World& world=world_object_base_list.front()->get_world();
512
513 for (auto wo : world_object_base_list) {
514 loop_types<DistributeFunctor, double, float, double_complex, float_complex>(std::tuple<DistributionType>(dt),wo);
515 // world.gop.fence();
516 }
517 world.gop.fence();
518
519 }
520
521 /// @param[in] world the subworld the objects are loaded to
522 /// @param[in] recordlist the list of records where the objects are stored
523
524 /// load a single object from the cloud, recordlist is kept unchanged
525 template<typename T>
526 T load(madness::World &world, const recordlistT recordlist) const {
527 recordlistT rlist = recordlist;
528 cloudtimer t(world, reading_time);
529
530 // forward_load will consume the recordlist while loading elements
531 return forward_load<T>(world, rlist);
532 }
533
534 /// similar to load, but will consume the recordlist
535
536 /// @param[in] world the subworld the objects are loaded to
537 /// @param[in] recordlist the list of records where the objects are stored
538 template<typename T>
539 T consuming_load(madness::World &world, recordlistT& recordlist) const {
540 cloudtimer t(world, reading_time);
541
542 // forward_load will consume the recordlist while loading elements
543 return forward_load<T>(world, recordlist);
544 }
545
546 /// load a single object from the cloud, recordlist is consumed while loading elements
547 template<typename T>
548 T forward_load(madness::World &world, recordlistT& recordlist) const {
549 // different objects are stored in different ways
550 // - tuples are split up into their components
551 // - classes with their own cloud serialization are stored using that
552 // - everything else is stored using their usual serialization
553 if constexpr (is_tuple<T>::value) {
554 return load_tuple<T>(world, recordlist);
555 } else if constexpr (has_cloud_serialize<T>::value) {
556 T target = allocator<T>(world);
557 target.cloud_load(world, *this, recordlist);
558 return target;
559 } else {
560 return do_load<T>(world, recordlist);
561 }
562 }
563
564 /// @param[in] world presumably the universe
565 template<typename T>
567 if (is_replicated) {
568 print("Cloud contents are replicated and read-only!");
569 MADNESS_EXCEPTION("cloud error",1);
570 }
571 cloudtimer t(world,writing_time);
572
573 // different objects are stored in different ways
574 // - tuples are split up into their components
575 // - classes with their own cloud serialization are stored using that
576 // - everything else is stored using their usual serialization
577 recordlistT recordlist;
578 if constexpr (is_tuple<T>::value) {
579 recordlist+=store_tuple(world,source);
580 } else if constexpr (has_cloud_serialize<T>::value) {
581 recordlist+=source.cloud_store(world,*this);
582 } else {
583 recordlist+=store_other(world,source);
584 }
585 if (dofence) world.gop.fence();
586 return recordlist;
587 }
588
589 void replicate_according_to_policy(const std::size_t chunk_size=INT_MAX) {
591 // if (debug and (container.size() > 0)) print("no replication of container");
592 return;
593 }
595 replicate(chunk_size);
596 }
598 replicate_per_node(chunk_size);
599 }
600 else {
601 MADNESS_EXCEPTION("unknown replication policy",1);
602 }
603 container.get_world().gop.fence();
604 }
605
606 void replicate_per_node(const std::size_t chunk_size=INT_MAX) {
607 // this will fail if the container values are larger that 2GB
608 // need to reimplement that at some point
609 try {
610 double cpu0=cpu_time();
611 World& world=container.get_world();
612 world.gop.fence();
614 MADNESS_CHECK_THROW(not is_replicated,"cloud::replicate_per_node: container is already replicated");
615 container.replicate_on_hosts(true);
616 is_replicated=true;
617 world.gop.fence();
618 double cpu1=cpu_time();
619 if (debug and (world.rank()==0)) print("replication_per_node ended after ",cpu1-cpu0," seconds");
620 } catch (...) {
621 MADNESS_EXCEPTION("cloud replication_per_node failed, presumably because some data is larger than 2GB",1);
622 }
623 }
624
625 // replicates the contents of the container
626 void replicate(const std::size_t chunk_size=INT_MAX) {
627 MADNESS_CHECK_THROW(not is_replicated,"cloud::replicate_per_node: container is already replicated");
628
629 double cpu0=cpu_time();
630 World& world=container.get_world();
631 world.gop.fence();
633 container.reset_pmap_to_local();
634 is_replicated=true;
635
636 std::list<keyT> keylist;
637 for (auto it=container.begin(); it!=container.end(); ++it) {
638 keylist.push_back(it->first);
639 }
640
641 for (ProcessID rank=0; rank<world.size(); rank++) {
642 if (rank == world.rank()) {
643 std::size_t keylistsize = keylist.size();
644 world.mpi.Bcast(&keylistsize,sizeof(keylistsize),MPI_BYTE,rank);
645
646 for (auto key : keylist) {
648 bool found=container.find(acc,key);
649 MADNESS_CHECK(found);
650 auto data = acc->second;
651 std::size_t sz=data.size();
652
653 world.mpi.Bcast(&key,sizeof(key),MPI_BYTE,rank);
654 world.mpi.Bcast(&sz,sizeof(sz),MPI_BYTE,rank);
655
656 // if data is too large for MPI_INT break it into pieces to avoid integer overflow
657 for (std::size_t start=0; start<sz; start+=chunk_size) {
658 std::size_t remainder = std::min(sz - start, chunk_size);
659 world.mpi.Bcast(&data[start], remainder, MPI_BYTE, rank);
660 }
661
662 }
663 }
664 else {
665 std::size_t keylistsize;
666 world.mpi.Bcast(&keylistsize,sizeof(keylistsize),MPI_BYTE,rank);
667 for (size_t i=0; i<keylistsize; i++) {
668 keyT key;
669 world.mpi.Bcast(&key,sizeof(key),MPI_BYTE,rank);
670 std::size_t sz;
671 world.mpi.Bcast(&sz,sizeof(sz),MPI_BYTE,rank);
672 valueT data(sz);
673// world.mpi.Bcast(&data[0],sz,MPI_BYTE,rank);
674 for (std::size_t start=0; start<sz; start+=chunk_size) {
675 std::size_t remainder=std::min(sz-start,chunk_size);
676 world.mpi.Bcast(&data[start],remainder,MPI_BYTE,rank);
677 }
678
679 container.replace(key,data);
680 }
681 }
682 }
683 world.gop.fence();
684 double cpu1=cpu_time();
685 if (debug and (world.rank()==0)) print("replication ended after ",cpu1-cpu0," seconds");
686 }
687
688private:
689
690 mutable std::atomic<long> reading_time=0l; // in microseconds
691public:
692 mutable std::atomic<long> copy_time=0l; // if pointers are stored in cloud, time to copy from universe to subworld
693 mutable std::atomic<long> target_replication_time=0l; // if pointers are stored in cloud, time to replicate targets
694private:
695 mutable std::atomic<long> writing_time=0l; // in microseconds
696 mutable std::atomic<long> writing_time1=0l; // in microseconds
697 mutable std::atomic<long> replication_time=0l; // in microseconds
698 mutable std::atomic<long> cache_reads=0l;
699 mutable std::atomic<long> cache_stores=0l;
700
701 template<typename> struct is_tuple : std::false_type { };
702 template<typename ...T> struct is_tuple<std::tuple<T...>> : std::true_type { };
703
704 template<typename Q> struct is_vector : std::false_type { };
705 template<typename Q> struct is_vector<std::vector<Q>> : std::true_type { };
706
707 template<typename T> using is_parallel_serializable_object = std::is_base_of<archive::ParallelSerializableObject,T>;
708
709 template<typename T> using is_world_constructible = std::is_constructible<T, World &>;
710
711public:
712 struct cloudtimer {
714 double wall0;
715 std::atomic<long> &rtime;
716
717 cloudtimer(World& world, std::atomic<long> &readtime) : world(world), wall0(wall_time()), rtime(readtime) {}
718
720 long deltatime=long((wall_time() - wall0) * 1000000l);
721 rtime += deltatime;
722 }
723 };
724private:
725
726 template<typename T>
727 void cache(madness::World &world, const T &obj, const keyT &record) const {
728 const_cast<cacheT &>(cached_objects).insert({record,std::make_any<T>(obj)});
729 }
730
731 /// load an object from the cache, record is unchanged
732 template<typename T>
733 T load_from_cache(madness::World &world, const keyT &record) const {
734 if (world.rank()==0) cache_reads++;
735 if (debug) print("loading", type_name<T>::value(), "from cache record", record, "to world", world.id());
736 if (auto obj = std::any_cast<T>(&cached_objects.find(record)->second)) return *obj;
737 MADNESS_EXCEPTION("failed to load from cloud-cache", 1);
738 T target = allocator<T>(world);
739 return target;
740 }
741
742 bool is_cached(const keyT &key) const {
743 return (cached_objects.count(key) == 1);
744 }
745
746 /// checks if a (universe) container record is used
747
748 /// currently implemented with a local copy of the recordlist, might be
749 /// reimplemented with container.find(), which would include blocking communication.
750 bool is_in_container(const keyT &key) const {
751 auto it = std::find(local_list_of_container_keys.list.begin(),
752 local_list_of_container_keys.list.end(), key);
753 return it!=local_list_of_container_keys.list.end();
754 }
755
756 template<typename T>
757 T allocator(World &world) const {
758 if constexpr (is_world_constructible<T>::value) {
759 return T(world);
760 } else {
761 return T();
762 }
763 }
764
765 template<typename T>
768 bool is_already_present= is_in_container(record);
769 if (debug and world.rank()==0) {
770 if (is_already_present) std::cout << "skipping ";
771 if constexpr (Recordlist<keyT>::has_member_id<T>::value) {
772 std::cout << "storing world object of " << type_name<T>::value() << "id " << source.id()
773 << " to record " << record << std::endl;
774 }
775 std::cout << "storing object of " << type_name<T>::value() << " to record " << record << std::endl;
776 }
777 if constexpr (is_madness_function<T>::value) {
778 if (source.is_compressed() and T::dimT>3) print("WARNING: storing compressed hi-dim `function");
779 }
780
781 // scope is important because of destruction ordering of world objects and fence
782 if (is_already_present) {
783 if (world.rank()==0) cache_stores++;
784 } else {
785 cloudtimer t(world,writing_time1);
789 if constexpr (is_madness_function<T>::value) {
790 // store the pointer to the function, not the function itself
791 par & source.get_impl();
792 // store the pointer to the WorldObject in a list for later reference (replication/redistribution)
793 WorldObjectBase* wobj=source.get_impl().get();
794 world_object_base_list.push_back(wobj);
795 } else {
796 // store everything else
797 par & source;
798 }
799 } else {
800 // store everything else
801 par & source;
802 }
804 }
805 if (dofence) world.gop.fence();
806 return recordlistT{record};
807 }
808
809public:
810 /// load a vector from the cloud, pop records from recordlist
811 ///
812 /// @param[inout] world destination world
813 /// @param[inout] recordlist list of records to load from (reduced by the first few elements)
814 template<typename T>
815 typename std::enable_if<is_vector<T>::value, T>::type
816 do_load(World &world, recordlistT &recordlist) const {
817 std::size_t sz = do_load<std::size_t>(world, recordlist);
818 T target(sz);
819 for (std::size_t i = 0; i < sz; ++i) {
820 target[i] = do_load<typename T::value_type>(world, recordlist);
821 }
822 return target;
823 }
824
825 /// load a single object from the cloud, pop record from recordlist
826 ///
827 /// @param[inout] world destination world
828 /// @param[inout] recordlist list of records to load from (reduced by the first element)
829 template<typename T>
830 typename std::enable_if<!is_vector<T>::value, T>::type
831 do_load(World &world, recordlistT &recordlist) const {
832 keyT record = recordlist.pop_front_and_return();
834
835 if (is_cached(record)) return load_from_cache<T>(world, record);
836 if (debug) print("loading", type_name<T>::value(), "from container record", record, "to world", world.id());
837 T target = allocator<T>(world);
840 if constexpr (is_madness_function<T>::value) {
842 // load the pointer to the function, not the function itself
843 // this is important for large functions, as they are not replicated
844 // and only copied to subworlds when needed
845 try {
847 std::shared_ptr<implT> impl;
848 par & impl;
849 target.set_impl(impl); // target now points to a universe function impl
850 } catch (...) {
851 {
852 io_redirect_cout redirect;
853 print("failed to load function pointer from cloud, maybe the target is out of scope?");
854 print("record:", record, "world:", world.id());
855 print("function type:", type_name<T>::value());
856 print("\n");
857 }
858 MADNESS_EXCEPTION("load/store error of pointers in cloud", 1);
859 }
860 } else {
861 // load everything else
862 par & target;
863 }
864 } else {
865 // load everything else
866 par & target;
867 }
868
869 if (use_cache) {
870 cache(world, target, record);
871 if (is_replicated) container.erase(record);
872 }
873
874 return target;
875 }
876
877public:
878
879 // overloaded
880 template<typename T>
881 recordlistT store_other(madness::World& world, const std::vector<T>& source) {
882 if (debug and world.rank()==0)
883 std::cout << "storing vector of " << type_name<T>::value() << " of size " << source.size() << std::endl;
884 recordlistT l = store_other(world, source.size());
885 for (const auto& s : source) l += store_other(world, s);
886 if (dofence) world.gop.fence();
887 if (debug and world.rank()==0) std::cout << "done with vector storing; container size "
888 << container.size() << std::endl;
889 return l;
890 }
891
892 /// store a tuple in multiple records
893 template<typename... Ts>
894 recordlistT store_tuple(World &world, const std::tuple<Ts...> &input) {
896 auto storeaway = [&](const auto &arg) {
897 v += this->store(world, arg);
898 };
899 auto l = [&](Ts const &... arg) {
900 ((storeaway(arg)), ...);
901 };
902 std::apply(l, input);
903 return v;
904 }
905
906 /// load a tuple from the cloud, pop records from recordlist
907 ///
908 /// @param[inout] world destination world
909 /// @param[inout] recordlist list of records to load from (reduced by the first few elements)
910 template<typename T>
911 T load_tuple(madness::World &world, recordlistT &recordlist) const {
912 if (debug) std::cout << "loading tuple of type " << typeid(T).name() << " to world " << world.id() << std::endl;
913 T target;
914 std::apply([&](auto &&... args) {
915 ((args = forward_load<typename std::remove_reference<decltype(args)>::type>(world, recordlist)), ...);
916 }, target);
917 return target;
918 }
919};
920
921} /* namespace madness */
922
923#endif /* SRC_MADNESS_WORLD_CLOUD_H_ */
cloud class
Definition cloud.h:178
bool is_cached(const keyT &key) const
Definition cloud.h:742
bool use_cache
Definition cloud.h:184
void clear()
Definition cloud.h:468
void replicate_per_node(const std::size_t chunk_size=INT_MAX)
Definition cloud.h:606
bool is_in_container(const keyT &key) const
checks if a (universe) container record is used
Definition cloud.h:750
bool force_load_from_cache
forces load from cache (mainly for debugging)
Definition cloud.h:183
bool debug
prints debug output
Definition cloud.h:180
std::atomic< long > writing_time1
Definition cloud.h:696
nlohmann::json get_statistics(World &world) const
return a json object with the cloud settings and statistics
Definition cloud.h:326
std::enable_if< is_vector< T >::value, T >::type do_load(World &world, recordlistT &recordlist) const
Definition cloud.h:816
recordlistT store_other(madness::World &world, const std::vector< T > &source)
Definition cloud.h:881
std::any cached_objT
Definition cloud.h:188
recordlistT store(madness::World &world, const T &source)
Definition cloud.h:566
T load_tuple(madness::World &world, recordlistT &recordlist) const
Definition cloud.h:911
std::is_base_of< archive::ParallelSerializableObject, T > is_parallel_serializable_object
Definition cloud.h:707
~Cloud()
Definition cloud.h:245
madness::archive::ContainerRecordOutputArchive::keyT keyT
Definition cloud.h:189
std::atomic< long > replication_time
Definition cloud.h:697
Recordlist< keyT > recordlistT
Definition cloud.h:192
StoragePolicy storage_policy
are the functions (WorldObjects) stored in the cloud or only pointers to them
Definition cloud.h:220
std::is_constructible< T, World & > is_world_constructible
Definition cloud.h:709
std::atomic< long > target_replication_time
Definition cloud.h:693
nlohmann::json gather_timings(World &universe) const
Definition cloud.h:373
friend std::string to_string(const StoragePolicy sp)
Definition cloud.h:212
recordlistT store_other(madness::World &world, const T &source)
Definition cloud.h:766
bool is_replicated
if contents of the container are replicated
Definition cloud.h:181
void set_force_load_from_cache(bool value)
Definition cloud.h:263
decltype(std::declval< T >().cloud_store(std::declval< World & >(), std::declval< Cloud & >())) member_cloud_serialize_t
Definition cloud.h:233
void replicate(const std::size_t chunk_size=INT_MAX)
Definition cloud.h:626
recordlistT local_list_of_container_keys
Definition cloud.h:227
DistributionType cloud_replication_policy
cloud is a container: replication policy for the cloud container: distributed, node-replicated,...
Definition cloud.h:223
void print_size(World &universe)
Definition cloud.h:300
std::atomic< long > reading_time
Definition cloud.h:690
void clear_timings()
Definition cloud.h:472
recordlistT store_tuple(World &world, const std::tuple< Ts... > &input)
store a tuple in multiple records
Definition cloud.h:894
void set_debug(bool value)
Definition cloud.h:255
nlohmann::json gather_memory_statistics(World &universe) const
get size of the cloud container
Definition cloud.h:342
std::enable_if<!is_vector< T >::value, T >::type do_load(World &world, recordlistT &recordlist) const
Definition cloud.h:831
T load(madness::World &world, const recordlistT recordlist) const
load a single object from the cloud, recordlist is kept unchanged
Definition cloud.h:526
void cache(madness::World &world, const T &obj, const keyT &record) const
Definition cloud.h:727
void set_fence(bool value)
Definition cloud.h:259
std::atomic< long > cache_reads
Definition cloud.h:698
friend std::ostream & operator<<(std::ostream &os, const StoragePolicy &sp)
Definition cloud.h:203
static void print_timings(const nlohmann::json timings)
Definition cloud.h:416
std::list< WorldObjectBase * > world_object_base_list
Definition cloud.h:230
static void print_memory_statistics(const nlohmann::json stats)
Definition cloud.h:442
DistributionType get_replication_policy() const
is the cloud container replicated: per rank, per node, or distributed
Definition cloud.h:275
Cloud(madness::World &universe)
Definition cloud.h:241
cacheT cached_objects
Definition cloud.h:226
void clear_cache(World &subworld)
Definition cloud.h:462
std::atomic< long > copy_time
Definition cloud.h:692
StoragePolicy get_storing_policy() const
storing policy refers to storing functions or pointers to functions
Definition cloud.h:296
void replicate_according_to_policy(const std::size_t chunk_size=INT_MAX)
Definition cloud.h:589
T load_from_cache(madness::World &world, const keyT &record) const
load an object from the cache, record is unchanged
Definition cloud.h:733
madness::meta::is_detected< member_cloud_serialize_t, T > has_cloud_serialize
Definition cloud.h:236
std::vector< unsigned char > valueT
Definition cloud.h:190
void set_replication_policy(const DistributionType value)
is the cloud container replicated: per rank, per node, or distributed
Definition cloud.h:268
void distribute_targets(const DistributionType dt=Distributed)
distribute/node/rank replicate the targets of all world objects stored in the cloud
Definition cloud.h:509
bool dofence
fences after load/store
Definition cloud.h:182
void print_timings(World &universe) const
backwards compatibility
Definition cloud.h:412
bool validate_replication_policy() const
Definition cloud.h:279
T allocator(World &world) const
Definition cloud.h:757
std::atomic< long > writing_time
Definition cloud.h:695
void set_storing_policy(const StoragePolicy value)
storing policy refers to storing functions or pointers to functions
Definition cloud.h:291
madness::WorldContainer< keyT, valueT > container
Definition cloud.h:225
T forward_load(madness::World &world, recordlistT &recordlist) const
load a single object from the cloud, recordlist is consumed while loading elements
Definition cloud.h:548
StoragePolicy
Definition cloud.h:194
@ StoreFunctionPointer
Definition cloud.h:197
@ StoreFunction
Definition cloud.h:195
std::map< keyT, cached_objT > cacheT
Definition cloud.h:191
T consuming_load(madness::World &world, recordlistT &recordlist) const
similar to load, but will consume the recordlist
Definition cloud.h:539
std::atomic< long > cache_stores
Definition cloud.h:699
FunctionImpl holds all Function state to facilitate shallow copy semantics.
Definition funcimpl.h:945
A multiresolution adaptive numerical function.
Definition mra.h:139
A tensor is a multidimensional array.
Definition tensor.h:317
Makes a distributed container with specified attributes.
Definition worlddc.h:1127
bool find(accessor &acc, const keyT &key)
Write access to LOCAL value by key. Returns true if found, false otherwise (always false for remote).
Definition worlddc.h:1274
void max(T *buf, size_t nelem)
Inplace global max while still processing AM & tasks.
Definition worldgop.h:884
void fence(bool debug=false)
Synchronizes all processes in communicator AND globally ensures no pending AM or tasks.
Definition worldgop.cc:161
void min(T *buf, size_t nelem)
Inplace global min while still processing AM & tasks.
Definition worldgop.h:878
void sum(T *buf, size_t nelem)
Inplace global sum while still processing AM & tasks.
Definition worldgop.h:872
void Bcast(T *buffer, int count, int root) const
MPI broadcast an array of count elements.
Definition worldmpi.h:416
A parallel world class.
Definition world.h:132
ProcessID rank() const
Returns the process rank in this World (same as MPI_Comm_rank()).
Definition world.h:320
WorldMpiInterface & mpi
MPI interface.
Definition world.h:204
ProcessID size() const
Returns the number of processes in this World (same as MPI_Comm_size()).
Definition world.h:330
unsigned long id() const
Definition world.h:315
WorldGopInterface & gop
Global operations.
Definition world.h:207
Definition parallel_dc_archive.h:60
Definition parallel_dc_archive.h:14
long keyT
Definition parallel_dc_archive.h:16
An archive for storing local or parallel data, wrapping a BinaryFstreamInputArchive.
Definition parallel_archive.h:366
An archive for storing local or parallel data wrapping a BinaryFstreamOutputArchive.
Definition parallel_archive.h:321
auto T(World &world, response_space &f) -> response_space
Definition global_functions.cc:28
Tensor< typename Tensor< T >::scalar_type > arg(const Tensor< T > &t)
Return a new tensor holding the argument of each element of t (complex types only)
Definition tensor.h:2503
static const double v
Definition hatom_sf_dirac.cc:20
#define MADNESS_CHECK(condition)
Check a condition — even in a release build the condition is always evaluated so it can have side effects.
Definition madness_exception.h:182
#define MADNESS_EXCEPTION(msg, value)
Macro for throwing a MADNESS exception.
Definition madness_exception.h:119
#define MADNESS_CHECK_THROW(condition, msg)
Check a condition — even in a release build the condition is always evaluated so it can have side effects.
Definition madness_exception.h:207
typename detail::detector< nonesuch, void, Op, Args... >::value_t is_detected
Definition meta.h:169
Definition array_addons.h:50
Namespace for all elements and tools of MADNESS.
Definition DFParameters.h:10
void hash_range(hashT &seed, It first, It last)
Definition worldhash.h:280
double get_rss_usage_in_GB()
Definition ranks_and_hosts.cpp:10
static double cpu_time()
Returns the cpu time in seconds relative to an arbitrary origin.
Definition timers.h:127
DistributionType
some introspection of how data is distributed
Definition worlddc.h:81
@ NodeReplicated
even if there are several ranks per node
Definition worlddc.h:84
@ Distributed
no replication of the container, the container is distributed over the world
Definition worlddc.h:82
@ RankReplicated
replicate the container over all world ranks
Definition worlddc.h:83
void hash_combine(hashT &seed, const T &v)
Combine hash values.
Definition worldhash.h:260
static class madness::twoscale_cache_class cache[kmax+1]
DistributionType validate_distribution_type(const dcT &dc)
check distribution type of WorldContainer – global communication
Definition worlddc.h:319
void print(const T &t, const Ts &... ts)
Print items to std::cout (items separated by spaces) and terminate with a new line.
Definition print.h:226
double wall_time()
Returns the wall time in seconds relative to an arbitrary origin.
Definition timers.cc:48
std::string type(const PairType &n)
Definition PNOParameters.h:18
static bool print_timings
Definition SCF.cc:106
std::string name(const FuncType &type, const int ex=-1)
Definition ccpairfunction.h:28
madness::hashT hash_value(const std::array< T, N > &a)
Hash std::array with madness hash.
Definition array_addons.h:78
Definition mraimpl.h:51
Definition test_dc.cc:47
Definition hatom_sf_dirac.cc:91
Definition test_ccpairfunction.cc:22
functor to distribute/rank/node-replicate a function, passed in as a pointer to WorldObjectBase
Definition cloud.h:485
DistributionType dt
Definition cloud.h:486
int operator()(WorldObjectBase *wo) const
Definition cloud.h:488
DistributeFunctor(const DistributionType dt)
Definition cloud.h:487
Definition cloud.h:712
std::atomic< long > & rtime
Definition cloud.h:715
World & world
Definition cloud.h:713
double wall0
Definition cloud.h:714
~cloudtimer()
Definition cloud.h:719
cloudtimer(World &world, std::atomic< long > &readtime)
Definition cloud.h:717
Definition cloud.h:701
Definition cloud.h:704
Definition cloud.h:65
static keyT compute_record(const std::vector< T > &arg)
Definition cloud.h:125
keyT pop_front_and_return()
Definition cloud.h:84
static keyT compute_record(const Function< T, NDIM > &arg)
Definition cloud.h:110
Recordlist(const Recordlist &other)
Definition cloud.h:72
Recordlist(const keyT &key)
Definition cloud.h:70
static keyT compute_record(const std::shared_ptr< T > &arg)
Definition cloud.h:131
static keyT compute_record(const T &arg)
Definition cloud.h:134
std::size_t size() const
Definition cloud.h:90
madness::meta::is_detected< member_id_t, T > has_member_id
Definition cloud.h:100
decltype(std::declval< T >().id()) member_id_t
Definition cloud.h:97
friend std::ostream & operator<<(std::ostream &os, const Recordlist &arg)
Definition cloud.h:148
Recordlist & operator+=(const Recordlist &list2)
Definition cloud.h:74
static keyT compute_record(const WorldContainer< keyQ, valueT > &arg)
Definition cloud.h:116
decltype(std::declval< T >().hash()) member_hash_t
Definition cloud.h:104
static keyT compute_record(const std::shared_ptr< WorldContainer< keyQ, valueT > > &arg)
Definition cloud.h:119
std::list< keyT > list
Definition cloud.h:66
Recordlist & operator+=(const keyT &key)
Definition cloud.h:79
static keyT compute_record(const FunctionImpl< T, NDIM > *arg)
Definition cloud.h:113
static keyT compute_record(const std::shared_ptr< madness::FunctionImpl< T, NDIM > > &arg)
Definition cloud.h:122
static keyT compute_record(const Tensor< T > &arg)
Definition cloud.h:128
madness::meta::is_detected< member_hash_t, T > has_member_hash
Definition cloud.h:107
Recordlist()
Definition cloud.h:68
Base class for WorldObject, useful for introspection.
Definition world_object.h:340
class to temporarily redirect output to cout
Definition print.h:277
Definition mra.h:2902
static const char * value()
Definition cloud.h:39
static const char * value()
Definition cloud.h:41
static const char * value()
Definition cloud.h:43
static const char * value()
Definition cloud.h:45
static const char * value()
Definition cloud.h:47
static const char * value()
Definition cloud.h:49
static const char * value()
Definition cloud.h:52
static const char * value()
Definition cloud.h:54
static const char * value()
Definition cloud.h:56
static const char * value()
Definition cloud.h:58
static const char * value()
Definition cloud.h:60
static const char * value()
Definition cloud.h:62
A utility (from ChatGPT) to get the name of a type as a string.
Definition cloud.h:34
static const char * value()
Definition cloud.h:35
#define MPI_BYTE
Definition stubmpi.h:77
double source(const coordT &r)
Definition testperiodic.cc:48
static madness::WorldMemInfo stats
Definition worldmem.cc:64
int ProcessID
Used to clearly identify process number/rank.
Definition worldtypes.h:43
FLOAT target(const FLOAT &x)
Definition y.cc:295