MADNESS 0.10.1
worldgop.h
Go to the documentation of this file.
1/*
2 This file is part of MADNESS.
3
4 Copyright (C) 2007,2010 Oak Ridge National Laboratory
5
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
10
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19
20 For more information please contact:
21
22 Robert J. Harrison
23 Oak Ridge National Laboratory
24 One Bethel Valley Road
25 P.O. Box 2008, MS-6367
26
27 email: harrisonrj@ornl.gov
28 tel: 865-241-3937
29 fax: 865-572-0680
30 */
31
32#ifndef MADNESS_WORLD_WORLDGOP_H__INCLUDED
33#define MADNESS_WORLD_WORLDGOP_H__INCLUDED
34
35/// \file worldgop.h
36/// \brief Implements global operations
37
38/// If you can recall the Intel hypercubes, their comm lib used GOP as
39/// the abbreviation.
40
41#include <functional>
42#include <type_traits>
45#include <madness/world/world.h>
48#include <madness/world/group.h>
50#include <madness/world/units.h>
51
52namespace madness {
53
54 // Forward declarations
55 class World;
56 class WorldAmInterface;
57 class WorldTaskQueue;
58 namespace detail {
59
60 class DeferredCleanup;
61
62 } // namespace detail
63
/// Binary functor that adds two values; the reduction operator behind \c sum().

/// \tparam T The value type; must support \c operator+
template <typename T>
struct WorldSumOp {
    /// \return <tt>a + b</tt>
    constexpr T operator()(const T& a, const T& b) const {
        return a + b;
    }
};
70
/// Binary functor that multiplies two values; the reduction operator behind \c product().

/// \tparam T The value type; must support \c operator*
template <typename T>
struct WorldMultOp {
    /// \return <tt>a * b</tt>
    constexpr T operator()(const T& a, const T& b) const {
        return a * b;
    }
};
77
/// Binary functor that selects the larger of two values; the reduction operator behind \c max().

/// \tparam T The value type; must support \c operator>
template <typename T>
struct WorldMaxOp {
    /// \return \c a when <tt>a > b</tt>, otherwise \c b
    constexpr T operator()(const T& a, const T& b) const {
        return a > b ? a : b;
    }
};
84
/// Binary functor that returns the larger of the absolute values of its arguments;
/// the reduction operator behind \c absmax().

/// \tparam T The value type; an \c abs overload accepting \c T must be visible
template <typename T>
struct WorldAbsMaxOp {
    /// \return <tt>abs(a)</tt> when <tt>abs(a) > abs(b)</tt>, otherwise <tt>abs(b)</tt>
    T operator()(const T& a, const T& b) const {
        // abs is deliberately called unqualified so that user-defined overloads
        // can be found by argument-dependent lookup. Each magnitude is computed
        // once and reused for both the comparison and the result.
        // NOTE(review): for built-in floating-point T this relies on the
        // <cmath>/<cstdlib> overloads being visible here -- confirm.
        const auto abs_a = abs(a);
        const auto abs_b = abs(b);
        return abs_a > abs_b ? abs_a : abs_b;
    }
};
91
/// Binary functor that selects the smaller of two values; the reduction operator behind \c min().

/// \tparam T The value type; must support \c operator<
template <typename T>
struct WorldMinOp {
    /// \return \c a when <tt>a < b</tt>, otherwise \c b
    constexpr T operator()(const T& a, const T& b) const {
        return a < b ? a : b;
    }
};
98
/// Binary functor that returns the smaller of the absolute values of its arguments;
/// the reduction operator behind \c absmin().

/// \tparam T The value type; an \c abs overload accepting \c T must be visible
template <typename T>
struct WorldAbsMinOp {
    /// \return <tt>abs(a)</tt> when <tt>abs(a) < abs(b)</tt>, otherwise <tt>abs(b)</tt>
    T operator()(const T& a, const T& b) const {
        // abs is deliberately called unqualified so that user-defined overloads
        // can be found by argument-dependent lookup. Each magnitude is computed
        // once and reused for both the comparison and the result.
        // NOTE(review): for built-in floating-point T this relies on the
        // <cmath>/<cstdlib> overloads being visible here -- confirm.
        const auto abs_a = abs(a);
        const auto abs_b = abs(b);
        return abs_a < abs_b ? abs_a : abs_b;
    }
};
105
/// Binary functor computing the bitwise AND of two values; the reduction operator behind \c bit_and().

/// \tparam T The value type; must support \c operator&
template <typename T>
struct WorldBitAndOp {
    /// \return <tt>a & b</tt>
    constexpr T operator()(const T& a, const T& b) const {
        return a & b;
    }
};
112
/// Binary functor computing the bitwise OR of two values; the reduction operator behind \c bit_or().

/// \tparam T The value type; must support \c operator|
template <typename T>
struct WorldBitOrOp {
    /// \return <tt>a | b</tt>
    constexpr T operator()(const T& a, const T& b) const {
        return a | b;
    }
};
119
/// Binary functor computing the bitwise XOR of two values; the reduction operator behind \c bit_xor().

/// \tparam T The value type; must support \c operator^
template <typename T>
struct WorldBitXorOp {
    /// \return <tt>a ^ b</tt>
    constexpr T operator()(const T& a, const T& b) const {
        return a ^ b;
    }
};
126
/// Binary functor computing the logical AND of two values; the reduction operator behind \c logic_and().

/// \tparam T The value type; must be usable with \c operator&& and constructible from its result
template <typename T>
struct WorldLogicAndOp {
    /// \return <tt>a && b</tt> converted to \c T
    constexpr T operator()(const T& a, const T& b) const {
        return a && b;
    }
};
133
/// Binary functor computing the logical OR of two values; the reduction operator behind \c logic_or().

/// \tparam T The value type; must be usable with \c operator|| and constructible from its result
template <typename T>
struct WorldLogicOrOp {
    /// \return <tt>a || b</tt> converted to \c T
    constexpr T operator()(const T& a, const T& b) const {
        return a || b;
    }
};
140
141
142 /// Provides collectives that interoperate with the AM and task interfaces
143
144 /// If native AM interoperates with MPI we probably should map these to MPI.
146 private:
147 World& world_; ///< World object that this is a part of
148 std::shared_ptr<detail::DeferredCleanup> deferred_; ///< Deferred cleanup object.
149 bool debug_; ///< Debug mode
150 bool forbid_fence_=false; ///< forbid calling fence() in case of several active worlds
151 int max_reducebcast_msg_size_ = std::numeric_limits<int>::max(); ///< maximum size of messages (in bytes) sent by reduce and broadcast
152
154
155 // Message tags
156 struct PointToPointTag { };
157 struct LazySyncTag { };
159 struct BcastTag { };
160 struct GroupBcastTag { };
161 struct ReduceTag { };
162 struct GroupReduceTag { };
163 struct AllReduceTag { };
165
166
167 /// Delayed send callback object
168
169 /// This callback object is used to send local data to a remote process
170 /// once it has been set.
171 /// \tparam keyT The data key
172 /// \tparam valueT The type of data to be sent
173 template <typename keyT, typename valueT>
175 private:
176 World& world_; ///< The communication world
177 const ProcessID dest_; ///< The destination process id
178 const keyT key_; ///< The distributed id associated with \c value_
179 Future<valueT> value_; ///< The data to be sent
180
181 // Not allowed
184
185 public:
186
187 /// Constructor
188 DelayedSend(World& world, const ProcessID dest,
189 const keyT& key, const Future<valueT>& value) :
190 world_(world), dest_(dest), key_(key), value_(value)
191 { }
192
193 virtual ~DelayedSend() { }
194
195 /// Notify this object that the future has been set.
196
197 /// This will set the value of the future on the remote node and delete
198 /// this callback object.
199 virtual void notify() {
202 delete this;
203 }
204 }; // class DelayedSend
205
206 /// Receive data from remote node
207
208 /// \tparam valueT The data type stored in cache
209 /// \param key The distributed ID
210 /// \return A future to the data
211 template <typename valueT, typename keyT>
212 static Future<valueT> recv_internal(const keyT& key) {
213 return detail::DistCache<keyT>::template get_cache_value<valueT>(key);
214 }
215
216 /// Send \c value to \c dest
217
218 /// Send non-future data to \c dest.
219 /// \tparam keyT The key type
220 /// \tparam valueT The value type
221 /// \param dest The node where the data will be sent
222 /// \param key The key that is associated with the data
223 /// \param value The data to be sent to \c dest
224 template <typename keyT, typename valueT>
225 typename std::enable_if<!is_future<valueT>::value >::type
226 send_internal(const ProcessID dest, const keyT& key, const valueT& value) const {
227 typedef detail::DistCache<keyT> dist_cache;
228
229 if(world_.rank() == dest) {
230 // When dest is this process, skip the task and set the future immediately.
231 dist_cache::set_cache_value(key, value);
232 } else {
233 // Spawn a remote task to set the value
234 world_.taskq.add(dest, dist_cache::template set_cache_value<valueT>, key,
235 value, TaskAttributes::hipri());
236 }
237 }
238
239 /// Send \c value to \c dest
240
241 /// Send data that is stored in a future to \c dest. The data in
242 /// \c value is only sent to the remote process once it has been set.
243 /// \tparam keyT The key type
244 /// \tparam valueT The value type
245 /// \param dest The node where the data will be sent
246 /// \param key The key that is associated with the data
247 /// \param value The data to be sent to \c dest
248 template <typename keyT, typename valueT>
249 void send_internal(ProcessID dest, const keyT& key, const Future<valueT>& value) const {
250 typedef detail::DistCache<keyT> dist_cache;
251
252 if(world_.rank() == dest) {
253 dist_cache::set_cache_value(key, value);
254 } else {
255 // The destination is not this node, so send it to the destination.
256 if(value.probe()) {
257 // Spawn a remote task to set the value
258 world_.taskq.add(dest, dist_cache::template set_cache_value<valueT>, key,
259 value.get(), TaskAttributes::hipri());
260 } else {
261 // The future is not ready, so create a callback object that will
262 // send value to the destination node when it is ready.
263 DelayedSend<keyT, valueT>* delayed_send_callback =
264 new DelayedSend<keyT, valueT>(world_, dest, key, value);
265 const_cast<Future<valueT>&>(value).register_callback(delayed_send_callback);
266
267 }
268 }
269 }
270
271 /// Lazy sync parent task
272
273 /// Send signal to the parent process in the binary tree for a lazy sync
274 /// operation.
275 /// \tparam keyT The key type
276 /// \param parent The parent process of this process in the binary tree
277 /// \param key The lazy sync key
278 template <typename keyT>
279 void lazy_sync_parent(const ProcessID parent, const keyT& key,
280 const ProcessID, const ProcessID) const
281 {
282 send_internal(parent, key, key.proc());
283 }
284
285 /// Lazy sync children task
286
287 /// Send signal to the child processes in the binary tree for a lazy
288 /// sync operation. After the signal has been sent to the children, the
289 /// sync operation, \c op, will be run.
290 /// \tparam keyT The key type
291 /// \tparam opT The sync operation type
292 /// \param child0 The first child process of this process in the binary tree
293 /// \param child1 The second child process of this process in the binary tree
294 /// \param key The key associated with the sync operation
295 /// \param op The sync operation that will be run
296 template <typename keyT, typename opT>
297 void lazy_sync_children(const ProcessID child0, const ProcessID child1,
298 const keyT& key, opT& op, const ProcessID) const
299 {
300 // Signal children to execute the operation.
301 if(child0 != -1)
302 send_internal(child0, key, 1);
303 if(child1 != -1)
304 send_internal(child1, key, 1);
305
306 // Execute the operation on this process.
307 op();
308 }
309
310 /// Start a distributed lazy sync operation
311
312 /// \param key The sync key
313 /// \param op The sync operation to be executed on this process
314 template <typename tagT, typename keyT, typename opT>
315 void lazy_sync_internal(const ProcessID parent, const ProcessID child0,
316 const ProcessID child1, const keyT& key, const opT& op) const {
317 typedef ProcessKey<keyT, tagT> key_type;
318
319 // Get signals from parent and children.
320 madness::Future<ProcessID> child0_signal = (child0 != -1 ?
321 recv_internal<ProcessID>(key_type(key, child0)) :
323 madness::Future<ProcessID> child1_signal = (child1 != -1 ?
324 recv_internal<ProcessID>(key_type(key, child1)) :
326 madness::Future<ProcessID> parent_signal = (parent != -1 ?
327 recv_internal<ProcessID>(key_type(key, parent)) :
329
330 // Construct the task that notifies children to run the operation
331 key_type my_key(key, world_.rank());
332 auto lazy_sync_children_fn = & WorldGopInterface::template lazy_sync_children<key_type, opT>;
333 world_.taskq.add(*this, lazy_sync_children_fn,
334 child0_signal, child1_signal, my_key, op, parent_signal,
336
337 // Send signal to parent
338 if(parent != -1) {
339 if(child0_signal.probe() && child1_signal.probe())
340 send_internal(parent, my_key, world_.rank());
341 else {
342 auto lazy_sync_parent_fn = & WorldGopInterface::template lazy_sync_parent<key_type>;
343 world_.taskq.add(*this, lazy_sync_parent_fn,
344 parent, my_key, child0_signal, child1_signal,
346 }
347 }
348 }
349
350
351 template <typename keyT, typename valueT, typename taskfnT>
352 static void bcast_handler(const AmArg& arg) {
353 // Deserialize message arguments
354 taskfnT taskfn;
355 keyT key;
356 valueT value;
357 ProcessID root;
358
359 arg & taskfn & key & value & root;
360
361 // Add task to queue
362 arg.get_world()->taskq.add(arg.get_world()->gop, taskfn, key,
363 value, root, TaskAttributes::hipri());
364 }
365
366 template <typename keyT, typename valueT, typename taskfnT>
367 static void group_bcast_handler(const AmArg& arg) {
368 // Deserialize message arguments
369 taskfnT taskfn;
370 keyT key;
371 valueT value;
372 ProcessID group_root;
373 DistributedID group_key;
374
375 arg & taskfn & key & value & group_root & group_key;
376
377 // Get the local group
378 const Future<Group> group = Group::get_group(group_key);
379
380 // Add task to queue
381 arg.get_world()->taskq.add(arg.get_world()->gop, taskfn, key, value,
382 group_root, group, TaskAttributes::hipri());
383 }
384
385
386 /// Broadcast task
387
388 /// This task will set the local cache with the broadcast data and send
389 /// it to child processes in the binary tree.
390 template <typename keyT, typename valueT>
391 void bcast_task(const keyT& key, const valueT& value, const ProcessID root) const {
392 typedef void (WorldGopInterface::*taskfnT)(const keyT&, const valueT&,
393 const ProcessID) const;
394
395 // Compute binary tree data
396 ProcessID parent = -1, child0 = -1, child1 = -1;
397 world_.mpi.binary_tree_info(root, parent, child0, child1);
398
399 // Set the local data, except on the root process
400 if(parent != -1)
402
403 if(child0 != -1) { // Check that this process has children in the binary tree
404
405 // Get handler function and arguments
406 void (*handler)(const AmArg&) =
407 & WorldGopInterface::template bcast_handler<keyT, valueT, taskfnT>;
408 AmArg* const args0 = new_am_arg(
409 & WorldGopInterface::template bcast_task<keyT, valueT>,
410 key, value, root);
411
412 // Send active message to children
413 if(child1 != -1) {
414 AmArg* const args1 = copy_am_arg(*args0);
415 world_.am.send(child1, handler, args1, RMI::ATTR_UNORDERED);
416 }
417 world_.am.send(child0, handler, args0, RMI::ATTR_UNORDERED);
418 }
419 }
420
421 template <typename keyT, typename valueT>
422 void group_bcast_task(const keyT& key, const valueT& value,
423 const ProcessID group_root, const Group& group) const
424 {
425 typedef void (WorldGopInterface::*taskfnT)(const keyT&, const valueT&,
426 const ProcessID, const Group&) const;
427
428 // Set the local data, except on the root process
429 ProcessID parent = -1, child0 = -1, child1 = -1;
430 group.make_tree(group_root, parent, child0, child1);
431
432 // Set the local data
433 if(parent != -1) {
435 group.remote_update();
436 }
437
438 if(child0 != -1) { // Check that this process has children in the binary tree
439
440 // Get handler function and arguments
441 void (*handler)(const AmArg&) =
442 & WorldGopInterface::template group_bcast_handler<keyT, valueT, taskfnT>;
443 AmArg* const args0 = new_am_arg(
444 & WorldGopInterface::template group_bcast_task<keyT, valueT>,
445 key, value, group_root, group.id());
446
447 // Send active message to children
448 if(child1 != -1) {
449 AmArg* const args1 = copy_am_arg(*args0);
450 world_.am.send(child1, handler, args1, RMI::ATTR_UNORDERED);
451 }
452 world_.am.send(child0, handler, args0, RMI::ATTR_UNORDERED);
453 }
454 }
455
456 /// Broadcast
457
458 /// Broadcast data from the \c root process to all processes in \c world.
459 /// The input/output data is held by \c value.
460 /// \tparam tagT The tag type that is attached to \c keyT
461 /// \tparam keyT The base key type
462 /// \tparam valueT The value type that will be broadcast
463 /// \param[in] key The key associated with this broadcast
464 /// \param[in,out] value On the \c root process, this is used as the input
465 /// data that will be broadcast to all other processes in the group.
466 /// On other processes it is used as the output to the broadcast
467 /// \param root The process that owns the data to be broadcast
468 /// \throw madness::Exception When \c value has been set, except on the
469 /// \c root process.
470 template <typename tagT, typename keyT, typename valueT>
471 void bcast_internal(const keyT& key, Future<valueT>& value, const ProcessID root) const {
472 MADNESS_ASSERT((root >= 0) && (root < world_.size()));
473 MADNESS_ASSERT((world_.rank() == root) || (! value.probe()));
474
475 // Add operation tag to key
476 typedef TaggedKey<keyT, tagT> key_type;
477 const key_type tagged_key(key);
478
479 if(world_.size() > 1) { // Do nothing for the trivial case
480 if(world_.rank() == root) {
481 // This process owns the data to be broadcast.
482
483 // Spawn remote tasks that will set the local cache for this
484 // broadcast on other nodes.
485 if(value.probe())
486 // The value is ready so send it now
487 bcast_task(tagged_key, value.get(), root);
488 else {
489 // The value is not ready so spawn a task to send the
490 // data when it is ready.
491 auto bcast_task_fn = & WorldGopInterface::template bcast_task<key_type, valueT>;
492 world_.taskq.add(*this, bcast_task_fn,
493 tagged_key, value, root, TaskAttributes::hipri());
494 }
495 } else {
496 MADNESS_ASSERT(! value.probe());
497
498 // Get the broadcast value from local cache
500 }
501 }
502 }
503
504 /// Group broadcast
505
506 /// Broadcast data from the \c group_root process to all processes in
507 /// \c group. The input/output data is held by \c value.
508 /// \tparam tagT The tag type that is attached to \c keyT
509 /// \tparam keyT The base key type
510 /// \tparam valueT The value type that will be broadcast
511 /// \param[in] key The key associated with this broadcast
512 /// \param[in,out] value On the \c group_root process, this is used as the
513 /// input data that will be broadcast to all other processes in the group.
514 /// On other processes it is used as the output to the broadcast
515 /// \param group_root The process in \c group that owns the data to be
516 /// broadcast
517 /// \param group The process group where value will be broadcast
518 /// \throw madness::Exception When \c value has been set, except on the
519 /// \c group_root process.
520 template <typename tagT, typename keyT, typename valueT>
521 void bcast_internal(const keyT& key, Future<valueT>& value,
522 const ProcessID group_root, const Group& group) const
523 {
524 // Construct the internal broadcast key
525 typedef TaggedKey<keyT, tagT> key_type;
526 const key_type tagged_key(key);
527
528 if(group.rank() == group_root) {
529 // This process owns the data to be broadcast.
530 if(value.probe())
531 group_bcast_task(tagged_key, value.get(), group_root, group);
532 else {
533 auto group_bcast_task_fn = & WorldGopInterface::template group_bcast_task<key_type, valueT>;
534 world_.taskq.add(this, group_bcast_task_fn,
535 tagged_key, value, group_root, group,
537 }
538 } else {
539 MADNESS_ASSERT(! value.probe());
540
541 // This is not the root process, so retrieve the broadcast data
543
544 // Increment local use counter for group
545 group.local_update();
546 }
547 }
548
549 template <typename valueT, typename opT>
550 static typename detail::result_of<opT>::type
551 reduce_task(const valueT& value, const opT& op) {
552 typename detail::result_of<opT>::type result = op();
553 op(result, value);
554 return result;
555 }
556
557 template <typename opT>
558 static typename detail::result_of<opT>::type
559 reduce_result_task(const std::vector<Future<typename detail::result_of<opT>::type> >& results,
560 const opT& op)
561 {
562 MADNESS_ASSERT(results.size() != 0ul);
563 Future<typename detail::result_of<opT>::type> result = results.front();
564 for(std::size_t i = 1ul; i < results.size(); ++i)
565 op(result.get(), results[i].get());
566 return result.get();
567 }
568
569 /// Distributed reduce
570
571 /// \tparam tagT The tag type to be added to the key type
572 /// \tparam keyT The key type
573 /// \tparam valueT The data type to be reduced
574 /// \tparam opT The reduction operation type
575 /// \param key The key associated with this reduction
576 /// \param value The local value to be reduced
577 /// \param op The reduction operation to be applied to local and remote data
578 /// \param root The process that will receive the result of the reduction
579 /// \return A future to the reduce value on the root process, otherwise an
580 /// uninitialized future that may be ignored.
581 template <typename tagT, typename keyT, typename valueT, typename opT>
583 reduce_internal(const ProcessID parent, const ProcessID child0,
584 const ProcessID child1, const ProcessID root, const keyT& key,
585 const valueT& value, const opT& op)
586 {
587 // Create tagged key
588 typedef ProcessKey<keyT, tagT> key_type;
589 typedef typename detail::result_of<opT>::type result_type;
590 typedef typename remove_future<valueT>::type value_type;
591 std::vector<Future<result_type> > results;
592 results.reserve(3);
593
594 // Add local data to vector of values to reduce
595 results.push_back(world_.taskq.add(WorldGopInterface::template reduce_task<value_type, opT>,
596 value, op, TaskAttributes::hipri()));
597
598 // Reduce child data
599 if(child0 != -1)
600 results.push_back(recv_internal<result_type>(key_type(key, child0)));
601 if(child1 != -1)
602 results.push_back(recv_internal<result_type>(key_type(key, child1)));
603
604 // Submit the local reduction task
605 Future<result_type> local_result =
606 world_.taskq.add(WorldGopInterface::template reduce_result_task<opT>,
607 results, op, TaskAttributes::hipri());
608
609 // Send reduced value to parent or, if this is the root process, set the
610 // result future.
611 if(parent == -1)
612 return local_result;
613 else
614 send_internal(parent, key_type(key, world_.rank()), local_result);
615
617 }
618
619 /// Implementation of fence
620
621 /// \param[in] epilogue the action to execute (by the calling thread) immediately after the fence
622 /// \param[in] pause_during_epilogue whether to suspend work while executing epilogue
623 /// \param[in] debug set to true to print progress statistics using madness::print(); the default is false.
624 /// \warning currently only \c pause_during_epilogue=false is supported
625 void fence_impl(std::function<void()> epilogue = []{},
626 bool pause_during_epilogue = false,
627 bool debug = false);
628
630 int result = std::numeric_limits<int>::max();
631 const auto* initial_max_reducebcast_msg_size_cstr = std::getenv("MAD_MAX_REDUCEBCAST_MSG_SIZE");
632 if (initial_max_reducebcast_msg_size_cstr) {
633 auto result_u64 = cstr_to_memory_size(initial_max_reducebcast_msg_size_cstr);
634 const auto do_print = SafeMPI::COMM_WORLD.Get_rank() == 0 && !madness::quiet();
635 if (result_u64>std::numeric_limits<int>::max()) {
636 if (do_print)
637 std::cout
638 << "!!MADNESS WARNING: Invalid value for environment variable MAD_MAX_REDUCEBCAST_MSG_SIZE.\n"
639 << "!!MADNESS WARNING: MAD_MAX_REDUCEBCAST_MSG_SIZE = "
640 << result_u64 << "\n";
641 result = std::numeric_limits<int>::max();
642 }
643 result = static_cast<int>(result_u64);
644 if(do_print) {
645 std::cout
646 << "MADNESS max msg size for GOP reduce/broadcast set to "
647 << result << " bytes.\n";
648 }
649 }
650 return result;
651 }
652
653 public:
654
655 // In the World constructor can ONLY rely on MPI and MPI being initialized
659
661 deferred_->destroy(true);
662 deferred_->do_cleanup();
663 }
664
665
666 /// Set debug flag to new value and return old value
667 bool set_debug(bool value) {
668 bool status = debug_;
669 debug_ = value;
670 return status;
671 }
672
673 /// Set forbid_fence flag to new value and return old value
674 bool set_forbid_fence(bool value) {
675 bool status = forbid_fence_;
676 forbid_fence_ = value;
677 return status;
678 }
679
680 /// Set the maximum size of messages (in bytes) sent by reduce and broadcast
681
682 /// \param sz the maximum size of messages (in bytes) sent by reduce and broadcast
683 /// \return the previous maximum size of messages (in bytes) sent by reduce and broadcast
684 /// \pre `sz>0`
686 MADNESS_ASSERT(sz>0);
687 std::swap(max_reducebcast_msg_size_,sz);
689 }
690
691
692 /// Returns the maximum size of messages (in bytes) sent by reduce and broadcast
693
694 /// \return the maximum size of messages (in bytes) sent by reduce and broadcast
698
699 /// Synchronizes all processes in communicator ... does NOT fence pending AM or tasks
700 void barrier() {
701 long i = world_.rank();
702 sum(i);
703 if (i != world_.size()*(world_.size()-1)/2) error("bad value after sum in barrier");
704 }
705
706
707 /// Synchronizes all processes in communicator AND globally ensures no pending AM or tasks
708
709 /// \internal Runs Dijkstra-like termination algorithm on binary tree by
710 /// locally ensuring ntask=0 and all am sent and processed,
711 /// and then participating in a global sum of nsent and nrecv.
712 /// Then globally checks that nsent=nrecv and that both are
713 /// constant over two traversals. We are then sure
714 /// that all tasks and AM are processed and there are no AM in
715 /// flight.
716 /// \param[in] debug set to true to print progress statistics using madness::print(); the default is false.
717 void fence(bool debug = false);
718
719 /// Executes an action on single (this) thread after ensuring all other work is done
720
721 /// \param[in] action the action to execute (by the calling thread)
722 void serial_invoke(std::function<void()> action);
723
724 /// Broadcasts bytes from process root while still processing AM & tasks
725
726 /// Optimizations can be added for long messages
727 void broadcast(void* buf, size_t nbyte, ProcessID root, bool dowork = true, Tag bcast_tag = -1);
728
729
730 /// Broadcasts typed contiguous data from process root while still processing AM & tasks
731
732 /// Optimizations can be added for long messages
733 template <typename T, typename = std::enable_if_t<madness::is_trivially_copyable_v<T>>>
734 inline void broadcast(T* buf, size_t nelem, ProcessID root) {
735 broadcast((void *) buf, nelem*sizeof(T), root);
736 }
737
738 /// Broadcast of a scalar from node 0 to all other nodes
739 template <typename T, typename = std::enable_if_t<madness::is_trivially_copyable_v<T>>>
740 void broadcast(T& t) {
741 broadcast(&t, 1, 0);
742 }
743
744 /// Broadcast of a scalar from node root to all other nodes
745 template <typename T, typename = std::enable_if_t<madness::is_trivially_copyable_v<T>>>
746 void broadcast(T& t, ProcessID root) {
747 broadcast(&t, 1, root);
748 }
749
750 /// Broadcast a serializable object
751 template <typename objT,
752 typename = std::void_t<decltype(std::declval<archive::BufferInputArchive&>()&std::declval<objT&>())>,
753 typename = std::void_t<decltype(std::declval<archive::BufferOutputArchive&>()&std::declval<const objT&>())>>
754 void broadcast_serializable(objT& obj, ProcessID root) {
755 MADNESS_ASSERT(root < world_.size());
756 if (world_.size() == 1) return;
757
758 size_t BUFLEN;
759 if (world_.rank() == root) {
761 count & obj;
762 BUFLEN = count.size();
763 }
764 broadcast(BUFLEN, root);
765
766 unsigned char* buf = new unsigned char[BUFLEN];
767 if (world_.rank() == root) {
768 archive::BufferOutputArchive ar(buf,BUFLEN);
769 ar & obj;
770 }
771 broadcast(buf, BUFLEN, root);
772 if (world_.rank() != root) {
773 archive::BufferInputArchive ar(buf,BUFLEN);
774 ar & obj;
775 }
776 delete [] buf;
777 }
778
779 /// Inplace global reduction (like MPI all_reduce) while still processing AM & tasks
780
781 /// Optimizations can be added for long messages and to reduce the memory footprint
782 template <typename T, class opT>
783 void reduce(T* buf, std::size_t nelem, opT op) {
784 static_assert(madness::is_trivially_copyable_v<T>, "T must be trivially copyable");
785
786 ProcessID parent, child0, child1;
787 world_.mpi.binary_tree_info(0, parent, child0, child1);
788 const std::size_t nelem_per_maxmsg =
789 max_reducebcast_msg_size() / sizeof(T);
790
791 const auto buf_size = ((sizeof(T) * std::min(nelem_per_maxmsg, nelem) +
792 std::alignment_of_v<T> - 1) /
793 std::alignment_of_v<T>) * std::alignment_of_v<T>;
794 struct free_dtor {
795 void operator()(T *ptr) {
796 if (ptr != nullptr)
797 std::free(ptr);
798 };
799 };
800 using sptr_t = std::unique_ptr<T[], free_dtor>;
801
802 auto aligned_buf_alloc = [&]() -> T* {
803 // posix_memalign requires alignment to be an integer multiple of sizeof(void*)!! so ensure that
804 const std::size_t alignment =
805 ((std::alignment_of_v<T> + sizeof(void *) - 1) /
806 sizeof(void *)) *
807 sizeof(void *);
808#ifdef HAVE_POSIX_MEMALIGN
809 void *ptr;
810 if (posix_memalign(&ptr, alignment, buf_size) != 0) {
811 throw std::bad_alloc();
812 }
813 return static_cast<T *>(ptr);
814#else
815 return static_cast<T *>(std::aligned_alloc(alignment, buf_size));
816#endif
817 };
818
819 sptr_t buf0;
820 if (child0 != -1)
821 buf0 = sptr_t(aligned_buf_alloc(),
822 free_dtor{});
823 sptr_t buf1(nullptr);
824 if (child1 != -1)
825 buf1 = sptr_t(aligned_buf_alloc(),
826 free_dtor{});
827
828 auto reduce_impl = [&,this](T* buf, size_t nelem) {
829 MADNESS_ASSERT(nelem <= nelem_per_maxmsg);
830 SafeMPI::Request req0, req1;
831 Tag gsum_tag = world_.mpi.unique_tag();
832
833 if (child0 != -1)
834 req0 = world_.mpi.Irecv(buf0.get(), nelem * sizeof(T), MPI_BYTE,
835 child0, gsum_tag);
836 if (child1 != -1)
837 req1 = world_.mpi.Irecv(buf1.get(), nelem * sizeof(T), MPI_BYTE,
838 child1, gsum_tag);
839
840 if (child0 != -1) {
841 World::await(req0);
842 for (long i = 0; i < (long)nelem; ++i)
843 buf[i] = op(buf[i], buf0[i]);
844 }
845 if (child1 != -1) {
846 World::await(req1);
847 for (long i = 0; i < (long)nelem; ++i)
848 buf[i] = op(buf[i], buf1[i]);
849 }
850
851 if (parent != -1) {
852 req0 = world_.mpi.Isend(buf, nelem * sizeof(T), MPI_BYTE, parent,
853 gsum_tag);
854 World::await(req0);
855 }
856
857 broadcast(buf, nelem, 0);
858 };
859
860 while (nelem) {
861 const int n = std::min(nelem_per_maxmsg, nelem);
862 reduce_impl(buf, n);
863 nelem -= n;
864 buf += n;
865 }
866 }
867
868 /// Inplace global sum while still processing AM & tasks
869 template <typename T>
870 inline void sum(T* buf, size_t nelem) {
871 reduce< T, WorldSumOp<T> >(buf, nelem, WorldSumOp<T>());
872 }
873
874 /// Inplace global min while still processing AM & tasks
875 template <typename T>
876 inline void min(T* buf, size_t nelem) {
877 reduce< T, WorldMinOp<T> >(buf, nelem, WorldMinOp<T>());
878 }
879
880 /// Inplace global max while still processing AM & tasks
881 template <typename T>
882 inline void max(T* buf, size_t nelem) {
883 reduce< T, WorldMaxOp<T> >(buf, nelem, WorldMaxOp<T>());
884 }
885
886 /// Inplace global absmin while still processing AM & tasks
887 template <typename T>
888 inline void absmin(T* buf, size_t nelem) {
889 reduce< T, WorldAbsMinOp<T> >(buf, nelem, WorldAbsMinOp<T>());
890 }
891
892 /// Inplace global absmax while still processing AM & tasks
893 template <typename T>
894 inline void absmax(T* buf, size_t nelem) {
895 reduce< T, WorldAbsMaxOp<T> >(buf, nelem, WorldAbsMaxOp<T>());
896 }
897
898 /// Inplace global product while still processing AM & tasks
899 template <typename T>
900 inline void product(T* buf, size_t nelem) {
901 reduce< T, WorldMultOp<T> >(buf, nelem, WorldMultOp<T>());
902 }
903
904 template <typename T>
905 inline void bit_and(T* buf, size_t nelem) {
906 reduce< T, WorldBitAndOp<T> >(buf, nelem, WorldBitAndOp<T>());
907 }
908
909 template <typename T>
910 inline void bit_or(T* buf, size_t nelem) {
911 reduce< T, WorldBitOrOp<T> >(buf, nelem, WorldBitOrOp<T>());
912 }
913
914 template <typename T>
915 inline void bit_xor(T* buf, size_t nelem) {
916 reduce< T, WorldBitXorOp<T> >(buf, nelem, WorldBitXorOp<T>());
917 }
918
919 template <typename T>
920 inline void logic_and(T* buf, size_t nelem) {
921 reduce< T, WorldLogicAndOp<T> >(buf, nelem, WorldLogicAndOp<T>());
922 }
923
924 template <typename T>
925 inline void logic_or(T* buf, size_t nelem) {
926 reduce< T, WorldLogicOrOp<T> >(buf, nelem, WorldLogicOrOp<T>());
927 }
928
929 /// Global sum of a scalar while still processing AM & tasks
template <typename T>
void sum(T& a) {
    // A scalar is reduced as a one-element array.
    T* const single = &a;
    sum(single, 1);
}
934
935 /// Global max of a scalar while still processing AM & tasks
template <typename T>
void max(T& a) {
    // A scalar is reduced as a one-element array.
    T* const single = &a;
    max(single, 1);
}
940
941 /// Global min of a scalar while still processing AM & tasks
template <typename T>
void min(T& a) {
    // A scalar is reduced as a one-element array.
    T* const single = &a;
    min(single, 1);
}
946
947 /// Concatenate an STL vector of serializable stuff onto node 0
948
949 /// \param[in] v input vector
950 /// \param[in] bufsz the max number of bytes in the result; must be less than std::numeric_limits<int>::max()
951 /// \return on rank 0 returns the concatenated vector, elsewhere returns an empty vector
952 template <typename T>
953 std::vector<T> concat0(const std::vector<T>& v, size_t bufsz=1024*1024) {
954 MADNESS_ASSERT(bufsz <= std::numeric_limits<int>::max());
955 // bufsz must be multiple of alignment!!! so ensure that
956 bufsz = ((bufsz + sizeof(void*) - 1) / sizeof(void*)) * sizeof(void*);
957
958 ProcessID parent, child0, child1;
959 world_.mpi.binary_tree_info(0, parent, child0, child1);
960 int child0_nbatch = 0, child1_nbatch = 0;
961
962 struct free_dtor {
963 void operator()(std::byte *ptr) {
964 if (ptr != nullptr)
965 std::free(ptr);
966 };
967 };
968 using sptr_t = std::unique_ptr<std::byte[], free_dtor>;
969
970 auto aligned_buf_alloc = [&]() -> std::byte* {
971#ifdef HAVE_POSIX_MEMALIGN
972 void *ptr;
973 if (posix_memalign(&ptr, sizeof(void *), bufsz) != 0) {
974 throw std::bad_alloc();
975 }
976 return static_cast<std::byte *>(ptr);
977#else
978 return static_cast<std::byte *>(
979 std::aligned_alloc(sizeof(void *), bufsz));
980#endif
981 };
982
983 auto buf0 = sptr_t(aligned_buf_alloc(),
984 free_dtor{});
985 auto buf1 = sptr_t(aligned_buf_alloc(),
986 free_dtor{});
987
988 // transfer data in chunks at most this large
989 const int batch_size = static_cast<int>(
990 std::min(static_cast<size_t>(max_reducebcast_msg_size()), bufsz));
991
992 // precompute max # of tags any node ... will need, and allocate them on every node to avoid tag counter divergence
993 const int max_nbatch = bufsz / batch_size;
994 // one tag is reserved for sending the number of messages to expect and the size of the last message
995 const int max_ntags = max_nbatch + 1;
997 std::vector<Tag> tags; // stores tags used to send each batch
998 tags.reserve(max_nbatch);
999 for(int t=0; t<max_ntags; ++t) tags.push_back(world_.mpi.unique_tag());
1000
1001 if (child0 != -1 || child1 != -1) {
1002 // receive # of batches
1003
1004 auto receive_nbatch = [&,this]() {
1005 if (child0 != -1) {
1006 world_.mpi.Recv(&child0_nbatch, 1, MPI_INT, child0,
1007 tags[0]);
1008 }
1009 if (child1 != -1) {
1010 world_.mpi.Recv(&child1_nbatch, 1, MPI_INT, child1,
1011 tags[0]);
1012 }
1013 };
1014
1015 receive_nbatch();
1016
1017 // receive data in batches
1018
1019 auto receive_batch = [&,this](const int batch, const size_t buf_offset) {
1020 SafeMPI::Request req0, req1;
1021 if (child0 != -1 && batch < child0_nbatch) {
1022 int msg_size = batch_size;
1023 // if last batch, receive # of bytes to expect
1024 if (batch + 1 == child0_nbatch) {
1025 auto req = world_.mpi.Irecv(
1026 &msg_size, 1, MPI_INT, child0, tags[0]);
1027 World::await(req);
1028 }
1029
1030 req0 = world_.mpi.Irecv(buf0.get() + buf_offset,
1031 msg_size, MPI_BYTE, child0,
1032 tags[batch + 1]);
1033 }
1034 if (child1 != -1 && batch < child1_nbatch) {
1035 int msg_size = batch_size;
1036 // if last batch, receive # of bytes to expect
1037 if (batch + 1 == child1_nbatch) {
1038 auto req = world_.mpi.Irecv(
1039 &msg_size, 1, MPI_INT, child1, tags[0]);
1040 World::await(req);
1041 }
1042 req1 = world_.mpi.Irecv(buf1.get() + buf_offset,
1043 msg_size, MPI_BYTE, child1,
1044 tags[batch + 1]);
1045 }
1046
1047 if (child0 != -1 && batch < child0_nbatch) {
1048 World::await(req0);
1049 }
1050 if (child1 != -1 && batch < child1_nbatch) {
1051 World::await(req1);
1052 }
1053 };
1054
1055 size_t buf_offset = 0;
1056 int batch = 0;
1057 while (buf_offset < bufsz) {
1058 receive_batch(batch, buf_offset);
1059 buf_offset += batch_size;
1060 buf_offset = std::min(buf_offset, bufsz);
1061 ++batch;
1062 }
1063 }
1064
1065
1066 std::vector<T> left, right;
1067 if (child0 != -1) {
1068 archive::BufferInputArchive ar(buf0.get(), bufsz);
1069 ar & left;
1070 }
1071 if (child1 != -1) {
1072 archive::BufferInputArchive ar(buf1.get(), bufsz);
1073 ar & right;
1074 for (unsigned int i = 0; i < right.size(); ++i)
1075 left.push_back(right[i]);
1076 }
1077 for (unsigned int i=0; i<v.size(); ++i) left.push_back(v[i]);
1078
1079 // send data in batches
1080 if (parent != -1) {
1081 archive::BufferOutputArchive ar(buf0.get(), bufsz);
1082 ar & left;
1083 const auto total_nbytes_to_send = ar.size();
1084
1085 // send nbatches to expect
1086 const int nbatch = (total_nbytes_to_send + batch_size - 1) / batch_size;
1087 world_.mpi.Send(&nbatch, 1, MPI_INT, parent,
1088 tags[0]);
1089
1090 size_t buf_offset = 0;
1091 int batch = 0;
1092 while (buf_offset < bufsz) {
1093
1094 // send data in batches
1095 auto send_batch = [&,this](const int batch, const size_t buf_offset) {
1096 const int nbytes_to_send = static_cast<int>(
1097 std::min(static_cast<size_t>(batch_size),
1098 total_nbytes_to_send - buf_offset));
1099 // if last batch, send # of bytes to expect
1100 if (batch + 1 == nbatch) {
1101 auto req = world_.mpi.Isend(
1102 &nbytes_to_send, 1, MPI_INT, parent, tags[0]);
1103 World::await(req);
1104 }
1105 auto req0 =
1106 world_.mpi.Isend(buf0.get() + buf_offset, nbytes_to_send,
1107 MPI_BYTE, parent, tags[batch + 1]);
1108 World::await(req0);
1109 };
1110
1111 send_batch(batch, buf_offset);
1112 buf_offset += batch_size;
1113 buf_offset = std::min(buf_offset, bufsz);
1114 ++batch;
1115 }
1116 }
1117
1118 if (parent == -1) return left;
1119 else return std::vector<T>();
1120 }
1121
1122 /// Receive data from \c source
1123
1124 /// \tparam valueT The data type stored in cache
1125 /// \tparam keyT The key type
1126 /// \param source The process that is sending the data to this process
1127 /// \param key The key associated with the received data
1128 /// \return A future that will be set with the received data
1129 /// \note It is the user's responsibility to ensure that \c key does not
1130 /// conflict with other calls to \c recv. Keys may be reused after the
1131 /// associated operation has finished.
1132 template <typename valueT, typename keyT>
1133 static Future<valueT> recv(const ProcessID source, const keyT& key) {
1134 return recv_internal<valueT>(ProcessKey<keyT, PointToPointTag>(key, source));
1135 }
1136
1137 /// Send value to \c dest
1138
1139 /// \tparam keyT The key type
1140 /// \tparam valueT The value type (this may be a \c Future type)
1141 /// \param dest The process where the data will be sent
1142 /// \param key The key that is associated with the data
1143 /// \param value The data to be sent to \c dest
1144 /// \note It is the user's responsibility to ensure that \c key does not
1145 /// conflict with other calls to \c send. Keys may be reused after the
1146 /// associated operation has finished.
1147 template <typename keyT, typename valueT>
1148 void send(const ProcessID dest, const keyT& key, const valueT& value) const {
1150 }
1151
1152 /// Lazy sync
1153
1154 /// Lazy sync functions are asynchronous barriers with a nullary functor
1155 /// that is called after all processes have called it with the same
1156 /// key. You can think of lazy_sync as an asynchronous barrier. The
1157 /// lazy_sync functor must have the following signature:
1158 /// \code
1159 /// class SyncFunc {
1160 /// public:
1161 /// // typedefs
1162 /// typedef void result_type;
1163 ///
1164 /// // Constructors
1165 /// SyncFunc(const SyncFunc&);
1166 ///
1167 /// // The function that performs the sync operation
1168 /// void operator()();
1169 ///
1170 /// }; // class SyncFunc
1171 /// \endcode
1172 /// \tparam keyT The key type
1173 /// \tparam opT The operation type
1174 /// \param key The sync key
1175 /// \param op The sync operation to be executed on this process
1176 /// \note It is the user's responsibility to ensure that \c key does not
1177 /// conflict with other calls to \c lazy_sync. Keys may be reused after
1178 /// the associated operation has finished.
1179 template <typename keyT, typename opT>
1180 void lazy_sync(const keyT& key, const opT& op) const {
1181 if(world_.size() > 1) { // Do nothing for the trivial case
1182 // Get the binary tree data
1183 Hash<keyT> hasher;
1184 const ProcessID root = hasher(key) % world_.size();
1185 ProcessID parent = -1, child0 = -1, child1 = -1;
1186 world_.mpi.binary_tree_info(root, parent, child0, child1);
1187
1188 lazy_sync_internal<LazySyncTag>(parent, child0, child1, key, op);
1189 } else {
1190 auto lazy_sync_children_fn = & WorldGopInterface::template lazy_sync_children<keyT, opT>;
1191 // There is only one process, so run the sync operation now.
1192 world_.taskq.add(*this, lazy_sync_children_fn,
1193 -1, -1, key, op, -1, TaskAttributes::hipri());
1194 }
1195 }
1196
1197
1198 /// Group lazy sync
1199
1200 /// Lazy sync functions are asynchronous barriers with a nullary functor
1201 /// that is called after all processes in the group have called it with
1202 /// the same key. You can think of lazy_sync as an asynchronous barrier.
1203 /// The \c op functor must have the following signature:
1204 /// \code
1205 /// class SyncFunc {
1206 /// public:
1207 /// // typedefs
1208 /// typedef void result_type;
1209 ///
1210 /// // Constructors
1211 /// SyncFunc(const SyncFunc&);
1212 ///
1213 /// // The function that performs the sync operation
1214 /// void operator()();
1215 ///
1216 /// }; // class SyncFunc
1217 /// \endcode
1218 /// \tparam keyT The key type
1219 /// \tparam opT The operation type
1220 /// \param key The sync key
1221 /// \param op The sync operation to be executed on this process
1222 /// \note It is the user's responsibility to ensure that \c key does not
1223 /// conflict with other calls to \c lazy_sync. Keys may be reused after
1224 /// the associated operation has finished.
1225 template <typename keyT, typename opT>
1226 void lazy_sync(const keyT& key, const opT& op, const Group& group) const {
1227 MADNESS_ASSERT(! group.empty());
1228 MADNESS_ASSERT(group.get_world().id() == world_.id());
1229
1230 if(group.size() > 1) { // Do nothing for the trivial case
1231 // Get the binary tree data
1232 Hash<keyT> hasher;
1233 const ProcessID group_root = hasher(key) % group.size();
1234 ProcessID parent = -1, child0 = -1, child1 = -1;
1235 group.make_tree(group_root, parent, child0, child1);
1236
1237 lazy_sync_internal<GroupLazySyncTag>(parent, child0, child1, key, op);
1238 } else {
1239 auto lazy_sync_children_fn = & WorldGopInterface::template lazy_sync_children<keyT, opT>;
1240 world_.taskq.add(*this, lazy_sync_children_fn,
1241 -1, -1, key, op, -1, TaskAttributes::hipri());
1242 }
1243 }
1244
1245 /// Broadcast
1246
1247 /// Broadcast data from the \c root process to all processes. The input/
1248 /// output data is held by \c value.
1249 /// \param[in] key The key associated with this broadcast
1250 /// \param[in,out] value On the \c root process, this is used as the input
1251 /// data that will be broadcast to all other processes. On other
1252 /// processes it is used as the output to the broadcast.
1253 /// \param root The process that owns the data to be broadcast
1254 /// \throw madness::Exception When \c root is less than 0 or greater
1255 /// than or equal to the world size.
1256 /// \throw madness::Exception When \c value has been set, except on the
1257 /// \c root process.
1258 /// \note It is the user's responsibility to ensure that \c key does not
1259 /// conflict with other calls to \c bcast. Keys may be reused after
1260 /// the associated operation has finished.
1261 template <typename keyT, typename valueT>
1262 void bcast(const keyT& key, Future<valueT>& value, const ProcessID root) const {
1263 MADNESS_ASSERT((root >= 0) && (root < world_.size()));
1264 MADNESS_ASSERT((world_.rank() == root) || (! value.probe()));
1265
1266 if(world_.size() > 1) // Do nothing for the trivial case
1267 bcast_internal<BcastTag>(key, value, root);
1268 }
1269
1270 /// Group broadcast
1271
1272 /// Broadcast data from the \c group_root process to all processes in
1273 /// \c group. The input/output data is held by \c value.
1274 /// \param[in] key The key associated with this broadcast
1275 /// \param[in,out] value On the \c group_root process, this is used as the
1276 /// input data that will be broadcast to all other processes in the group.
1277 /// On other processes it is used as the output to the broadcast
1278 /// \param group_root The process in \c group that owns the data to be
1279 /// broadcast
1280 /// \param group The process group where value will be broadcast
1281 /// \throw madness::Exception When \c group is empty
1282 /// \throw madness::Exception When \c group is not registered
1283 /// \throw madness::Exception When the world id of \c group is not
1284 /// equal to that of the world used to construct this object
1285 /// \throw madness::Exception When this process is not in the group
1286 /// \throw madness::Exception When \c group_root is less than 0 or
1287 /// greater than or equal to \c group size
1288 /// \throw madness::Exception When \c data has been set except on the
1289 /// \c root process
1290 /// \note It is the user's responsibility to ensure that \c key does not
1291 /// conflict with other calls to \c bcast. Keys may be reused after
1292 /// the associated operation has finished.
1293 template <typename keyT, typename valueT>
1294 void bcast(const keyT& key, Future<valueT>& value,
1295 const ProcessID group_root, const Group& group) const
1296 {
1297 MADNESS_ASSERT(! group.empty());
1298 MADNESS_ASSERT(group.get_world().id() == world_.id());
1299 MADNESS_ASSERT((group_root >= 0) && (group_root < group.size()));
1300 MADNESS_ASSERT((group.rank() == group_root) || (! value.probe()));
1301
1302 if(group.size() > 1) // Do nothing for the trivial case
1303 bcast_internal<GroupBcastTag>(key, value, group_root, group);
1304 }
1305
1306 /// Distributed reduce
1307
1308 /// The reduce functor must have the following signature:
1309 /// \code
1310 /// class ReduceFunc {
1311 /// public:
1312 /// // Typedefs
1313 /// typedef ... result_type;
1314 /// typedef ... argument_type;
1315 ///
1316 /// // Constructors
1317 /// ReduceFunc(const ReduceFunc&);
1318 ///
1319 /// // Initialization operation, which returns a default result object
1320 /// result_type operator()() const;
1321 ///
1322 /// // Reduce two result objects
1323 /// void operator()(result_type&, const result_type&) const;
1324 ///
1325 /// // Reduce a result object and an argument object
1326 /// void operator()(result_type&, const argument_type&) const;
1327 /// }; // class ReduceFunc
1328 /// \endcode
1329 /// \tparam keyT The key type
1330 /// \tparam valueT The data type to be reduced
1331 /// \tparam opT The reduction operation type
1332 /// \param key The key associated with this reduction
1333 /// \param value The local value to be reduced
1334 /// \param op The reduction operation to be applied to local and remote data
1335 /// \param root The process that will receive the result of the reduction
1336 /// \return A future to the reduce value on the root process, otherwise an
1337 /// uninitialized future that may be ignored.
1338 /// \note It is the user's responsibility to ensure that \c key does not
1339 /// conflict with other calls to \c reduce. Keys may be reused after
1340 /// the associated operation has finished.
1341 template <typename keyT, typename valueT, typename opT>
1343 reduce(const keyT& key, const valueT& value, const opT& op, const ProcessID root) {
1344 MADNESS_ASSERT((root >= 0) && (root < world_.size()));
1345
1346 // Get the binary tree data
1347 ProcessID parent = -1, child0 = -1, child1 = -1;
1348 world_.mpi.binary_tree_info(root, parent, child0, child1);
1349
1350 return reduce_internal<ReduceTag>(parent, child0, child1, root, key,
1351 value, op);
1352 }
1353
1354 /// Distributed group reduce
1355
1356 /// The reduce functor must have the following signature:
1357 /// \code
1358 /// class ReduceFunc {
1359 /// public:
1360 /// // Typedefs
1361 /// typedef ... result_type;
1362 /// typedef ... argument_type;
1363 ///
1364 /// // Constructors
1365 /// ReduceFunc(const ReduceFunc&);
1366 ///
1367 /// // Initialization operation, which returns a default result object
1368 /// result_type operator()() const;
1369 ///
1370 /// // Reduce two result objects
1371 /// void operator()(result_type&, const result_type&) const;
1372 ///
1373 /// // Reduce a result object and an argument object
1374 /// void operator()(result_type&, const argument_type&) const;
1375 /// }; // class ReduceFunc
1376 /// \endcode
1377 /// \tparam keyT The key type
1378 /// \tparam valueT The data type to be reduced
1379 /// \tparam opT The reduction operation type
1380 /// \param key The key associated with this reduction
1381 /// \param value The local value to be reduced
1382 /// \param op The reduction operation to be applied to local and remote data
1383 /// \param group_root The group process that will receive the result of the reduction
1384 /// \param group The group that will preform the reduction
1385 /// \return A future to the reduce value on the root process, otherwise an
1386 /// uninitialized future that may be ignored.
1387 /// \throw madness::Exception When \c group is empty
1388 /// \throw madness::Exception When \c group is not registered
1389 /// \throw madness::Exception When the world id of \c group is not
1390 /// equal to that of the world used to construct this object
1391 /// \throw madness::Exception When this process is not in the group
1392 /// \throw madness::Exception When \c group_root is less than zero or
1393 /// greater than or equal to \c group size.
1394 /// \note It is the user's responsibility to ensure that \c key does not
1395 /// conflict with other calls to \c reduce. Keys may be reused after
1396 /// the associated operation has finished.
1397 template <typename keyT, typename valueT, typename opT>
1399 reduce(const keyT& key, const valueT& value, const opT& op,
1400 const ProcessID group_root, const Group& group)
1401 {
1402 MADNESS_ASSERT(! group.empty());
1403 MADNESS_ASSERT(group.get_world().id() == world_.id());
1404 MADNESS_ASSERT((group_root >= 0) && (group_root < group.size()));
1405
1406 // Get the binary tree data
1407 ProcessID parent = -1, child0 = -1, child1 = -1;
1408 group.make_tree(group_root, parent, child0, child1);
1409
1410 return reduce_internal<ReduceTag>(parent, child0, child1, group_root,
1411 key, value, op);
1412 }
1413
1414 /// Distributed all reduce
1415
1416 /// The reduce functor must have the following signature:
1417 /// \code
1418 /// class ReduceFunc {
1419 /// public:
1420 /// // Typedefs
1421 /// typedef ... result_type;
1422 /// typedef ... argument_type;
1423 ///
1424 /// // Constructors
1425 /// ReduceFunc(const ReduceFunc&);
1426 ///
1427 /// // Initialization operation, which returns a default result object
1428 /// result_type operator()() const;
1429 ///
1430 /// // Reduce two result objects
1431 /// void operator()(result_type&, const result_type&) const;
1432 ///
1433 /// // Reduce a result object and an argument object
1434 /// void operator()(result_type&, const argument_type&) const;
1435 /// }; // class ReduceFunc
1436 /// \endcode
1437 /// \tparam keyT The key type
1438 /// \tparam valueT The data type to be reduced
1439 /// \tparam opT The reduction operation type
1440 /// \param key The key associated with this reduction
1441 /// \param value The local value to be reduced
1442 /// \param op The reduction operation to be applied to local and remote data
1443 /// \return A future to the reduce value on the root process, otherwise an
1444 /// uninitialized future that may be ignored.
1445 /// \note It is the user's responsibility to ensure that \c key does not
1446 /// conflict with other calls to \c all_reduce. Keys may be reused after
1447 /// the associated operation has finished.
1448 template <typename keyT, typename valueT, typename opT>
1450 all_reduce(const keyT& key, const valueT& value, const opT& op) {
1451 // Compute the parent and child processes of this process in a binary tree.
1452 Hash<keyT> hasher;
1453 const ProcessID root = hasher(key) % world_.size();
1454 ProcessID parent = -1, child0 = -1, child1 = -1;
1455 world_.mpi.binary_tree_info(root, parent, child0, child1);
1456
1457 // Reduce the data
1459 reduce_internal<AllReduceTag>(parent, child0, child1, root,
1460 key, value, op);
1461
1462 if(world_.rank() != root)
1464
1465 // Broadcast the result of the reduction to all processes
1466 bcast_internal<AllReduceTag>(key, reduce_result, root);
1467
1468 return reduce_result;
1469 }
1470
1471 /// Distributed, group all reduce
1472
1473 /// The reduce functor must have the following signature:
1474 /// \code
1475 /// class ReduceFunc {
1476 /// public:
1477 /// // Typedefs
1478 /// typedef ... result_type;
1479 /// typedef ... argument_type;
1480 ///
1481 /// // Constructors
1482 /// ReduceFunc(const ReduceFunc&);
1483 ///
1484 /// // Initialization operation, which returns a default result object
1485 /// result_type operator()() const;
1486 ///
1487 /// // Reduce two result objects
1488 /// void operator()(result_type&, const result_type&) const;
1489 ///
1490 /// // Reduce a result object and an argument object
1491 /// void operator()(result_type&, const argument_type&) const;
1492 /// }; // class ReduceFunc
1493 /// \endcode
1494 /// \tparam keyT The key type
1495 /// \tparam valueT The data type to be reduced
1496 /// \tparam opT The reduction operation type
1497 /// \param key The key associated with this reduction
1498 /// \param value The local value to be reduced
1499 /// \param op The reduction operation to be applied to local and remote data
1500 /// \param group The group that will preform the reduction
1501 /// \return A future to the reduce value on the root process, otherwise an
1502 /// uninitialized future that may be ignored
1503 /// \throw madness::Exception When \c group is empty
1504 /// \throw madness::Exception When \c group is not registered
1505 /// \throw madness::Exception When the world id of \c group is not
1506 /// equal to that of the world used to construct this object
1507 /// \throw madness::Exception When this process is not in the group
1508 /// \note It is the user's responsibility to ensure that \c key does not
1509 /// conflict with other calls to \c reduce. Keys may be reused after
1510 /// the associated operation has finished.
1511 template <typename keyT, typename valueT, typename opT>
1513 all_reduce(const keyT& key, const valueT& value, const opT& op, const Group& group) {
1514 MADNESS_ASSERT(! group.empty());
1515 MADNESS_ASSERT(group.get_world().id() == world_.id());
1516
1517 // Compute the parent and child processes of this process in a binary tree.
1518 Hash<keyT> hasher;
1519 const ProcessID group_root = hasher(key) % group.size();
1520 ProcessID parent = -1, child0 = -1, child1 = -1;
1521 group.make_tree(group_root, parent, child0, child1);
1522
1523 // Reduce the data
1525 reduce_internal<GroupAllReduceTag>(parent, child0, child1,
1526 group_root, key, value, op);
1527
1528
1529 if(group.rank() != group_root)
1531
1532 // Broadcast the result of the reduction to all processes in the group
1533 bcast_internal<GroupAllReduceTag>(key, reduce_result, 0, group);
1534
1535 return reduce_result;
1536 }
1537 }; // class WorldGopInterface
1538
1539} // namespace madness
1540
1541#endif // MADNESS_WORLD_WORLDGOP_H__INCLUDED
Implements an archive wrapping a memory buffer.
void binary_tree_info(int root, int &parent, int &child0, int &child1)
Construct info about a binary tree with given root.
Definition safempi.cc:39
int Get_rank() const
Definition safempi.h:714
static int unique_tag_period()
Definition safempi.h:836
int unique_tag()
Returns a unique tag for temporary use (1023<tag<4095)
Definition safempi.h:830
Definition safempi.h:289
World active message that extends an RMI message.
Definition worldam.h:80
The class used for callbacks (e.g., dependency tracking).
Definition dependency_interface.h:61
A future is a possibly yet unevaluated value.
Definition future.h:373
T & get(bool dowork=true) &
Gets the value, waiting if necessary.
Definition future.h:574
static const Future< T > default_initializer()
See "Gotchas" on Futures about why this exists and how to use it.
Definition future.h:462
bool probe() const
Check whether this future has been assigned.
Definition future.h:631
A collection of processes.
Definition group.h:50
void remote_update() const
Update remote usage count.
Definition group.h:383
void local_update() const
Update local usage count.
Definition group.h:369
ProcessID size() const
Group size accessor.
Definition group.h:429
const DistributedID & id() const
Group id accessor.
Definition group.h:396
bool empty() const
Quary empty group.
Definition group.h:391
ProcessID rank() const
Group rank accessor.
Definition group.h:412
static madness::Future< Group > get_group(const DistributedID &did)
Get group from the registry.
Definition group.cc:90
void make_tree(const ProcessID group_root, ProcessID &parent, ProcessID &child1, ProcessID &child2) const
Compute the binary tree parents and children.
Definition group.h:449
World & get_world() const
Parent world accessor.
Definition group.h:404
Key object that includes the process information.
Definition distributed_id.h:80
static const attrT ATTR_UNORDERED
Definition worldrmi.h:180
Key object that uses a tag to differentiate keys.
Definition distributed_id.h:177
static TaskAttributes hipri()
Definition thread.h:450
void send(ProcessID dest, am_handlerT op, const AmArg *arg, const int attr=RMI::ATTR_ORDERED)
Sends a managed non-blocking active message.
Definition worldam.h:278
Delayed send callback object.
Definition worldgop.h:174
DelayedSend(World &world, const ProcessID dest, const keyT &key, const Future< valueT > &value)
Constructor.
Definition worldgop.h:188
virtual ~DelayedSend()
Definition worldgop.h:193
World & world_
The communication world.
Definition worldgop.h:176
const ProcessID dest_
The destination process id.
Definition worldgop.h:177
Future< valueT > value_
The data to be sent.
Definition worldgop.h:179
const keyT key_
The distributed id associated with value_.
Definition worldgop.h:178
DelayedSend< keyT, valueT > & operator=(const DelayedSend< keyT, valueT > &)
DelayedSend(const DelayedSend< keyT, valueT > &)
virtual void notify()
Notify this object that the future has been set.
Definition worldgop.h:199
Provides collectives that interoperate with the AM and task interfaces.
Definition worldgop.h:145
int max_reducebcast_msg_size() const
Returns the maximum size of messages (in bytes) sent by reduce and broadcast.
Definition worldgop.h:695
void lazy_sync(const keyT &key, const opT &op, const Group &group) const
Group lazy sync.
Definition worldgop.h:1226
void send_internal(ProcessID dest, const keyT &key, const Future< valueT > &value) const
Send value to dest.
Definition worldgop.h:249
void max(T *buf, size_t nelem)
Inplace global max while still processing AM & tasks.
Definition worldgop.h:882
static void bcast_handler(const AmArg &arg)
Definition worldgop.h:352
void lazy_sync(const keyT &key, const opT &op) const
Lazy sync.
Definition worldgop.h:1180
World & world_
World object that this is a part of.
Definition worldgop.h:147
int set_max_reducebcast_msg_size(int sz)
Set the maximum size of messages (in bytes) sent by reduce and broadcast.
Definition worldgop.h:685
std::shared_ptr< detail::DeferredCleanup > deferred_
Deferred cleanup object.
Definition worldgop.h:148
void reduce(T *buf, std::size_t nelem, opT op)
Inplace global reduction (like MPI all_reduce) while still processing AM & tasks.
Definition worldgop.h:783
void broadcast(T &t)
Broadcast of a scalar from node 0 to all other nodes.
Definition worldgop.h:740
~WorldGopInterface()
Definition worldgop.h:660
void broadcast_serializable(objT &obj, ProcessID root)
Broadcast a serializable object.
Definition worldgop.h:754
void lazy_sync_internal(const ProcessID parent, const ProcessID child0, const ProcessID child1, const keyT &key, const opT &op) const
Start a distributed lazy sync operation.
Definition worldgop.h:315
void sum(T &a)
Global sum of a scalar while still processing AM & tasks.
Definition worldgop.h:931
int max_reducebcast_msg_size_
maximum size of messages (in bytes) sent by reduce and broadcast
Definition worldgop.h:151
void fence(bool debug=false)
Synchronizes all processes in communicator AND globally ensures no pending AM or tasks.
Definition worldgop.cc:161
void broadcast(void *buf, size_t nbyte, ProcessID root, bool dowork=true, Tag bcast_tag=-1)
Broadcasts bytes from process root while still processing AM & tasks.
Definition worldgop.cc:173
void bit_and(T *buf, size_t nelem)
Definition worldgop.h:905
void bcast_internal(const keyT &key, Future< valueT > &value, const ProcessID root) const
Broadcast.
Definition worldgop.h:471
void lazy_sync_parent(const ProcessID parent, const keyT &key, const ProcessID, const ProcessID) const
Lazy sync parent task.
Definition worldgop.h:279
static Future< valueT > recv_internal(const keyT &key)
Receive data from remote node.
Definition worldgop.h:212
void absmin(T *buf, size_t nelem)
Inplace global absmin while still processing AM & tasks.
Definition worldgop.h:888
void bit_or(T *buf, size_t nelem)
Definition worldgop.h:910
std::enable_if<!is_future< valueT >::value >::type send_internal(const ProcessID dest, const keyT &key, const valueT &value) const
Send value to dest.
Definition worldgop.h:226
WorldGopInterface(World &world)
Definition worldgop.h:656
bool set_forbid_fence(bool value)
Set forbid_fence flag to new value and return old value.
Definition worldgop.h:674
void bcast(const keyT &key, Future< valueT > &value, const ProcessID group_root, const Group &group) const
Group broadcast.
Definition worldgop.h:1294
void bcast_task(const keyT &key, const valueT &value, const ProcessID root) const
Broadcast task.
Definition worldgop.h:391
void group_bcast_task(const keyT &key, const valueT &value, const ProcessID group_root, const Group &group) const
Definition worldgop.h:422
void send(const ProcessID dest, const keyT &key, const valueT &value) const
Send value to dest.
Definition worldgop.h:1148
void logic_or(T *buf, size_t nelem)
Definition worldgop.h:925
void bcast(const keyT &key, Future< valueT > &value, const ProcessID root) const
Broadcast.
Definition worldgop.h:1262
int initial_max_reducebcast_msg_size()
Definition worldgop.h:629
void serial_invoke(std::function< void()> action)
Executes an action on single (this) thread after ensuring all other work is done.
Definition worldgop.cc:165
static void group_bcast_handler(const AmArg &arg)
Definition worldgop.h:367
Future< typename detail::result_of< opT >::type > all_reduce(const keyT &key, const valueT &value, const opT &op)
Distributed all reduce.
Definition worldgop.h:1450
Future< typename detail::result_of< opT >::type > reduce_internal(const ProcessID parent, const ProcessID child0, const ProcessID child1, const ProcessID root, const keyT &key, const valueT &value, const opT &op)
Distributed reduce.
Definition worldgop.h:583
bool forbid_fence_
forbid calling fence() in case of several active worlds
Definition worldgop.h:150
static detail::result_of< opT >::type reduce_result_task(const std::vector< Future< typename detail::result_of< opT >::type > > &results, const opT &op)
Definition worldgop.h:559
void absmax(T *buf, size_t nelem)
Inplace global absmax while still processing AM & tasks.
Definition worldgop.h:894
void broadcast(T *buf, size_t nelem, ProcessID root)
Broadcasts typed contiguous data from process root while still processing AM & tasks.
Definition worldgop.h:734
void fence_impl(std::function< void()> epilogue=[]{}, bool pause_during_epilogue=false, bool debug=false)
Implementation of fence.
Definition worldgop.cc:50
static detail::result_of< opT >::type reduce_task(const valueT &value, const opT &op)
Definition worldgop.h:551
void product(T *buf, size_t nelem)
Inplace global product while still processing AM & tasks.
Definition worldgop.h:900
void min(T *buf, size_t nelem)
Inplace global min while still processing AM & tasks.
Definition worldgop.h:876
void logic_and(T *buf, size_t nelem)
Definition worldgop.h:920
static Future< valueT > recv(const ProcessID source, const keyT &key)
Receive data from source.
Definition worldgop.h:1133
Future< typename detail::result_of< opT >::type > reduce(const keyT &key, const valueT &value, const opT &op, const ProcessID group_root, const Group &group)
Distributed group reduce.
Definition worldgop.h:1399
bool debug_
Debug mode.
Definition worldgop.h:149
void min(T &a)
Global min of a scalar while still processing AM & tasks.
Definition worldgop.h:943
void max(T &a)
Global max of a scalar while still processing AM & tasks.
Definition worldgop.h:937
std::vector< T > concat0(const std::vector< T > &v, size_t bufsz=1024 *1024)
Concatenate an STL vector of serializable stuff onto node 0.
Definition worldgop.h:953
Future< typename detail::result_of< opT >::type > reduce(const keyT &key, const valueT &value, const opT &op, const ProcessID root)
Distributed reduce.
Definition worldgop.h:1343
void barrier()
Synchronizes all processes in the communicator; does NOT fence pending AM or tasks.
Definition worldgop.h:700
Future< typename detail::result_of< opT >::type > all_reduce(const keyT &key, const valueT &value, const opT &op, const Group &group)
Distributed, group all reduce.
Definition worldgop.h:1513
void lazy_sync_children(const ProcessID child0, const ProcessID child1, const keyT &key, opT &op, const ProcessID) const
Lazy sync parent task.
Definition worldgop.h:297
void bcast_internal(const keyT &key, Future< valueT > &value, const ProcessID group_root, const Group &group) const
Group broadcast.
Definition worldgop.h:521
void sum(T *buf, size_t nelem)
Inplace global sum while still processing AM & tasks.
Definition worldgop.h:870
void bit_xor(T *buf, size_t nelem)
Definition worldgop.h:915
friend class detail::DeferredCleanup
Definition worldgop.h:153
bool set_debug(bool value)
Set debug flag to new value and return old value.
Definition worldgop.h:667
void broadcast(T &t, ProcessID root)
Broadcast of a scalar from node root to all other nodes.
Definition worldgop.h:746
std::enable_if<!std::is_pointer< T >::value, SafeMPI::Request >::type Isend(const T &datum, int dest, int tag=SafeMPI::DEFAULT_SEND_RECV_TAG) const
Isend one element.
Definition worldmpi.h:308
SafeMPI::Request Irecv(T *buf, int count, int source, int tag=SafeMPI::DEFAULT_SEND_RECV_TAG) const
Async receive data of up to count elements from process source.
Definition worldmpi.h:321
void Send(const T *buf, long lenbuf, int dest, int tag=SafeMPI::DEFAULT_SEND_RECV_TAG) const
Send array of lenbuf elements to process dest.
Definition worldmpi.h:347
void Recv(T *buf, long lenbuf, int src, int tag) const
Receive data of up to lenbuf elements from process src.
Definition worldmpi.h:374
void add(TaskInterface *t)
Add a new local task, taking ownership of the pointer.
Definition world_task_queue.h:466
A parallel world class.
Definition world.h:132
WorldTaskQueue & taskq
Task queue.
Definition world.h:204
ProcessID rank() const
Returns the process rank in this World (same as MPI_Comm_rank()).
Definition world.h:318
static void await(SafeMPI::Request &request, bool dowork=true)
Wait for an MPI request to complete.
Definition world.h:516
WorldMpiInterface & mpi
MPI interface.
Definition world.h:202
ProcessID size() const
Returns the number of processes in this World (same as MPI_Comm_size()).
Definition world.h:328
unsigned long id() const
Definition world.h:313
WorldGopInterface & gop
Global operations.
Definition world.h:205
WorldAmInterface & am
AM interface.
Definition world.h:203
Wraps an archive around a memory buffer for input.
Definition buffer_archive.h:134
Wraps an archive around a memory buffer for output.
Definition buffer_archive.h:59
std::size_t size() const
Return the amount of data stored (counted) in the buffer.
Definition buffer_archive.h:123
Deferred cleanup of shared_ptrs.
Definition deferred_cleanup.h:60
Distributed caching utility.
Definition dist_cache.h:54
static void get_cache_value(const keyT &key, madness::Future< valueT > &value)
Get the cache value associated with key.
Definition dist_cache.h:185
static void set_cache_value(const keyT &key, const valueT &value)
Set the cache value associated with key.
Definition dist_cache.h:146
static bool debug
Definition dirac-hatom.cc:16
auto T(World &world, response_space &f) -> response_space
Definition global_functions.cc:34
Tensor< typename Tensor< T >::scalar_type > arg(const Tensor< T > &t)
Return a new tensor holding the argument of each element of t (complex types only)
Definition tensor.h:2502
static const double v
Definition hatom_sf_dirac.cc:20
Tensor< double > op(const Tensor< double > &x)
Definition kain.cc:508
#define max(a, b)
Definition lda.h:51
#define MADNESS_ASSERT(condition)
Assert a condition that should be free of side-effects since in release builds this might be a no-op.
Definition madness_exception.h:134
Intracomm COMM_WORLD
Definition safempi.cc:67
Namespace for all elements and tools of MADNESS.
Definition DFParameters.h:10
std::pair< uniqueidT, std::size_t > DistributedID
Distributed ID which is used to identify objects.
Definition distributed_id.h:48
double abs(double x)
Definition complexfun.h:48
AmArg * copy_am_arg(const AmArg &arg)
Definition worldam.h:170
AmArg * new_am_arg(const argT &... args)
Convenience template for serializing arguments into a new AmArg.
Definition worldam.h:194
bool quiet()
Check if the MADNESS runtime was initialized for quiet operation.
Definition world.cc:77
void error(const char *msg)
Definition world.cc:139
std::string type(const PairType &n)
Definition PNOParameters.h:18
std::uint64_t cstr_to_memory_size(const char *str)
Unit-aware conversion of a C string to a size_t.
Definition units.cc:14
static const double b
Definition nonlinschro.cc:119
static const double a
Definition nonlinschro.cc:118
int posix_memalign(void **memptr, std::size_t alignment, std::size_t size)
Definition posixmem.h:44
Hash functor.
Definition worldhash.h:233
Definition worldgop.h:86
T operator()(const T &a, const T &b) const
Definition worldgop.h:87
Definition worldgop.h:100
T operator()(const T &a, const T &b) const
Definition worldgop.h:101
Definition worldgop.h:107
T operator()(const T &a, const T &b) const
Definition worldgop.h:108
Definition worldgop.h:114
T operator()(const T &a, const T &b) const
Definition worldgop.h:115
Definition worldgop.h:121
T operator()(const T &a, const T &b) const
Definition worldgop.h:122
Definition worldgop.h:159
Definition worldgop.h:161
Definition worldgop.h:128
T operator()(const T &a, const T &b) const
Definition worldgop.h:129
Definition worldgop.h:135
T operator()(const T &a, const T &b) const
Definition worldgop.h:136
Definition worldgop.h:79
T operator()(const T &a, const T &b) const
Definition worldgop.h:80
Definition worldgop.h:93
T operator()(const T &a, const T &b) const
Definition worldgop.h:94
Definition worldgop.h:72
T operator()(const T &a, const T &b) const
Definition worldgop.h:73
Definition worldgop.h:65
T operator()(const T &a, const T &b) const
Definition worldgop.h:66
fnT::result_type type
Definition function_traits.h:97
T type
Type with Future removed.
Definition type_traits.h:110
#define MPI_INT
Definition stubmpi.h:81
#define MPI_BYTE
Definition stubmpi.h:77
AtomicInt sum
Definition test_atomicint.cc:46
std::pair< int, double > valueT
Definition test_binsorter.cc:6
double source(const coordT &r)
Definition testperiodic.cc:48
const char * status[2]
Definition testperiodic.cc:43
Declares the World class for the parallel runtime environment.
Defines TaskInterface and implements WorldTaskQueue and associated stuff.
Defines types used by the parallel runtime.
int ProcessID
Used to clearly identify process number/rank.
Definition worldtypes.h:43
int Tag
Used to clearly identify message tag/type.
Definition worldtypes.h:44