MADNESS 0.10.1
worldgop.h
Go to the documentation of this file.
1/*
2 This file is part of MADNESS.
3
4 Copyright (C) 2007,2010 Oak Ridge National Laboratory
5
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
10
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19
20 For more information please contact:
21
22 Robert J. Harrison
23 Oak Ridge National Laboratory
24 One Bethel Valley Road
25 P.O. Box 2008, MS-6367
26
27 email: harrisonrj@ornl.gov
28 tel: 865-241-3937
29 fax: 865-572-0680
30 */
31
32#ifndef MADNESS_WORLD_WORLDGOP_H__INCLUDED
33#define MADNESS_WORLD_WORLDGOP_H__INCLUDED
34
35/// \file worldgop.h
36/// \brief Implements global operations
37
38/// If you can recall the Intel hypercubes, their comm lib used GOP as
39/// the abbreviation.
40
41#include <functional>
42#include <type_traits>
45#include <madness/world/world.h>
48#include <madness/world/group.h>
50#include <madness/world/units.h>
51
52namespace madness {
53
54 // Forward declarations
55 class World;
56 class WorldAmInterface;
57 class WorldTaskQueue;
58 namespace detail {
59
60 class DeferredCleanup;
61
62 } // namespace detail
63
/// Binary functor used by the global reductions: combines two values by addition.
template <typename T>
struct WorldSumOp {
    /// \return \c lhs + \c rhs
    inline T operator()(const T& lhs, const T& rhs) const {
        const T total = lhs + rhs;
        return total;
    }
};
70
/// Binary functor used by the global reductions: combines two values by multiplication.
template <typename T>
struct WorldMultOp {
    /// \return \c lhs * \c rhs
    inline T operator()(const T& lhs, const T& rhs) const {
        const T product = lhs * rhs;
        return product;
    }
};
77
/// Binary functor used by the global reductions: keeps the larger value.
/// Uses only \c operator> on \c T; when the operands compare equal (or are
/// unordered) the second operand is returned.
template <typename T>
struct WorldMaxOp {
    inline T operator()(const T& lhs, const T& rhs) const {
        if (lhs > rhs)
            return lhs;
        return rhs;
    }
};
84
/// Binary functor used by the global reductions: returns the larger of the
/// two absolute values (converted back to \c T).
template <typename T>
struct WorldAbsMaxOp {
    inline T operator()(const T& a, const T& b) const {
        // Two-step so argument-dependent lookup can find abs for user types.
        using std::abs;
        // Evaluate each abs only once; for e.g. std::complex the result type is real.
        const auto abs_a = abs(a);
        const auto abs_b = abs(b);
        return abs_a > abs_b ? abs_a : abs_b;
    }
};
92
/// Binary functor used by the global reductions: keeps the smaller value.
/// Uses only \c operator< on \c T; when the operands compare equal (or are
/// unordered) the second operand is returned.
template <typename T>
struct WorldMinOp {
    inline T operator()(const T& lhs, const T& rhs) const {
        if (lhs < rhs)
            return lhs;
        return rhs;
    }
};
99
/// Binary functor used by the global reductions: returns the smaller of the
/// two absolute values (converted back to \c T).
template <typename T>
struct WorldAbsMinOp {
    inline T operator()(const T& a, const T& b) const {
        // Two-step so argument-dependent lookup can find abs for user types.
        using std::abs;
        // Evaluate each abs only once; for e.g. std::complex the result type is real.
        const auto abs_a = abs(a);
        const auto abs_b = abs(b);
        return abs_a < abs_b ? abs_a : abs_b;
    }
};
107
/// Binary functor used by the global reductions: bitwise AND of the operands.
template <typename T>
struct WorldBitAndOp {
    inline T operator()(const T& a, const T& b) const {
        return a & b;
    }
};
114
/// Binary functor used by the global reductions: bitwise OR of the operands.
template <typename T>
struct WorldBitOrOp {
    inline T operator()(const T& a, const T& b) const {
        return a | b;
    }
};
121
/// Binary functor used by the global reductions: bitwise XOR of the operands.
template <typename T>
struct WorldBitXorOp {
    inline T operator()(const T& a, const T& b) const {
        return a ^ b;
    }
};
128
/// Binary functor used by the global reductions: logical AND of the operands,
/// with the boolean result converted back to \c T.
template <typename T>
struct WorldLogicAndOp {
    inline T operator()(const T& a, const T& b) const {
        return a && b;
    }
};
135
/// Binary functor used by the global reductions: logical OR of the operands,
/// with the boolean result converted back to \c T.
template <typename T>
struct WorldLogicOrOp {
    inline T operator()(const T& a, const T& b) const {
        return a || b;
    }
};
142
143
144 /// Provides collectives that interoperate with the AM and task interfaces
145
146 /// If native AM interoperates with MPI we probably should map these to MPI.
148 private:
149 World& world_; ///< World object that this is a part of
150 std::shared_ptr<detail::DeferredCleanup> deferred_; ///< Deferred cleanup object.
151 bool debug_; ///< Debug mode
152 bool forbid_fence_=false; ///< forbid calling fence() in case of several active worlds
153 int max_reducebcast_msg_size_ = std::numeric_limits<int>::max(); ///< maximum size of messages (in bytes) sent by reduce and broadcast
154
156
157 // Message tags
158 struct PointToPointTag { };
159 struct LazySyncTag { };
161 struct BcastTag { };
162 struct GroupBcastTag { };
163 struct ReduceTag { };
164 struct GroupReduceTag { };
165 struct AllReduceTag { };
167
168
169 /// Delayed send callback object
170
171 /// This callback object is used to send local data to a remote process
172 /// once it has been set.
173 /// \tparam keyT The data key
174 /// \tparam valueT The type of data to be sent
175 template <typename keyT, typename valueT>
177 private:
178 World& world_; ///< The communication world
179 const ProcessID dest_; ///< The destination process id
180 const keyT key_; ///< The distributed id associated with \c value_
181 Future<valueT> value_; ///< The data to be sent
182
183 // Not allowed
186
187 public:
188
189 /// Constructor
190 DelayedSend(World& world, const ProcessID dest,
191 const keyT& key, const Future<valueT>& value) :
192 world_(world), dest_(dest), key_(key), value_(value)
193 { }
194
195 virtual ~DelayedSend() { }
196
197 /// Notify this object that the future has been set.
198
199 /// This will set the value of the future on the remote node and delete
200 /// this callback object.
201 virtual void notify() {
204 delete this;
205 }
206 }; // class DelayedSend
207
208 /// Receive data from remote node
209
210 /// \tparam valueT The data type stored in cache
211 /// \param key The distributed ID
212 /// \return A future to the data
213 template <typename valueT, typename keyT>
214 static Future<valueT> recv_internal(const keyT& key) {
215 return detail::DistCache<keyT>::template get_cache_value<valueT>(key);
216 }
217
218 /// Send \c value to \c dest
219
220 /// Send non-future data to \c dest.
221 /// \tparam keyT The key type
222 /// \tparam valueT The value type
223 /// \param dest The node where the data will be sent
224 /// \param key The key that is associated with the data
225 /// \param value The data to be sent to \c dest
226 template <typename keyT, typename valueT>
227 typename std::enable_if<!is_future<valueT>::value >::type
228 send_internal(const ProcessID dest, const keyT& key, const valueT& value) const {
229 typedef detail::DistCache<keyT> dist_cache;
230
231 if(world_.rank() == dest) {
232 // When dest is this process, skip the task and set the future immediately.
233 dist_cache::set_cache_value(key, value);
234 } else {
235 // Spawn a remote task to set the value
236 world_.taskq.add(dest, dist_cache::template set_cache_value<valueT>, key,
237 value, TaskAttributes::hipri());
238 }
239 }
240
241 /// Send \c value to \c dest
242
243 /// Send data that is stored in a future to \c dest. The data in
244 /// \c value is only sent to the remote process once it has been set.
245 /// \tparam keyT The key type
246 /// \tparam valueT The value type
247 /// \param dest The node where the data will be sent
248 /// \param key The key that is associated with the data
249 /// \param value The data to be sent to \c dest
250 template <typename keyT, typename valueT>
251 void send_internal(ProcessID dest, const keyT& key, const Future<valueT>& value) const {
252 typedef detail::DistCache<keyT> dist_cache;
253
254 if(world_.rank() == dest) {
255 dist_cache::set_cache_value(key, value);
256 } else {
257 // The destination is not this node, so send it to the destination.
258 if(value.probe()) {
259 // Spawn a remote task to set the value
260 world_.taskq.add(dest, dist_cache::template set_cache_value<valueT>, key,
261 value.get(), TaskAttributes::hipri());
262 } else {
263 // The future is not ready, so create a callback object that will
264 // send value to the destination node when it is ready.
265 DelayedSend<keyT, valueT>* delayed_send_callback =
266 new DelayedSend<keyT, valueT>(world_, dest, key, value);
267 const_cast<Future<valueT>&>(value).register_callback(delayed_send_callback);
268
269 }
270 }
271 }
272
273 /// Lazy sync parent task
274
275 /// Send signal to the parent process in the binary tree for a lazy sync
276 /// operation.
277 /// \tparam keyT The key type
278 /// \param parent The parent process of this process in the binary tree
279 /// \param key The lazy sync key
280 template <typename keyT>
281 void lazy_sync_parent(const ProcessID parent, const keyT& key,
282 const ProcessID, const ProcessID) const
283 {
284 send_internal(parent, key, key.proc());
285 }
286
287 /// Lazy sync parent task
288
289 /// Send signal to the child processes in the binary tree for a lazy
290 /// sync operation. After the signal has been sent to the children, the
291 /// sync operation, \c op, will be run.
292 /// \tparam keyT The key type
293 /// \tparam opT The sync operation type
294 /// \param child0 The first child process of this process in the binary tree
295 /// \param child1 The second child process of this process in the binary tree
296 /// \param key The key associated with the sync operation
297 /// \param op The sync operation that will be run
298 template <typename keyT, typename opT>
299 void lazy_sync_children(const ProcessID child0, const ProcessID child1,
300 const keyT& key, opT& op, const ProcessID) const
301 {
302 // Signal children to execute the operation.
303 if(child0 != -1)
304 send_internal(child0, key, 1);
305 if(child1 != -1)
306 send_internal(child1, key, 1);
307
308 // Execute the operation on this process.
309 op();
310 }
311
312 /// Start a distributed lazy sync operation
313
314 /// \param key The sync key
315 /// \param op The sync operation to be executed on this process
316 template <typename tagT, typename keyT, typename opT>
317 void lazy_sync_internal(const ProcessID parent, const ProcessID child0,
318 const ProcessID child1, const keyT& key, const opT& op) const {
319 typedef ProcessKey<keyT, tagT> key_type;
320
321 // Get signals from parent and children.
322 madness::Future<ProcessID> child0_signal = (child0 != -1 ?
323 recv_internal<ProcessID>(key_type(key, child0)) :
325 madness::Future<ProcessID> child1_signal = (child1 != -1 ?
326 recv_internal<ProcessID>(key_type(key, child1)) :
328 madness::Future<ProcessID> parent_signal = (parent != -1 ?
329 recv_internal<ProcessID>(key_type(key, parent)) :
331
332 // Construct the task that notifies children to run the operation
333 key_type my_key(key, world_.rank());
334 auto lazy_sync_children_fn = & WorldGopInterface::template lazy_sync_children<key_type, opT>;
335 world_.taskq.add(*this, lazy_sync_children_fn,
336 child0_signal, child1_signal, my_key, op, parent_signal,
338
339 // Send signal to parent
340 if(parent != -1) {
341 if(child0_signal.probe() && child1_signal.probe())
342 send_internal(parent, my_key, world_.rank());
343 else {
344 auto lazy_sync_parent_fn = & WorldGopInterface::template lazy_sync_parent<key_type>;
345 world_.taskq.add(*this, lazy_sync_parent_fn,
346 parent, my_key, child0_signal, child1_signal,
348 }
349 }
350 }
351
352
353 template <typename keyT, typename valueT, typename taskfnT>
354 static void bcast_handler(const AmArg& arg) {
355 // Deserialize message arguments
356 taskfnT taskfn;
357 keyT key;
358 valueT value;
359 ProcessID root;
360
361 arg & taskfn & key & value & root;
362
363 // Add task to queue
364 arg.get_world()->taskq.add(arg.get_world()->gop, taskfn, key,
365 value, root, TaskAttributes::hipri());
366 }
367
368 template <typename keyT, typename valueT, typename taskfnT>
369 static void group_bcast_handler(const AmArg& arg) {
370 // Deserialize message arguments
371 taskfnT taskfn;
372 keyT key;
373 valueT value;
374 ProcessID group_root;
375 DistributedID group_key;
376
377 arg & taskfn & key & value & group_root & group_key;
378
379 // Get the local group
380 const Future<Group> group = Group::get_group(group_key);
381
382 // Add task to queue
383 arg.get_world()->taskq.add(arg.get_world()->gop, taskfn, key, value,
384 group_root, group, TaskAttributes::hipri());
385 }
386
387
388 /// Broadcast task
389
390 /// This task will set the local cache with the broadcast data and send
391 /// it to child processes in the binary tree.
392 template <typename keyT, typename valueT>
393 void bcast_task(const keyT& key, const valueT& value, const ProcessID root) const {
394 typedef void (WorldGopInterface::*taskfnT)(const keyT&, const valueT&,
395 const ProcessID) const;
396
397 // Compute binary tree data
398 ProcessID parent = -1, child0 = -1, child1 = -1;
399 world_.mpi.binary_tree_info(root, parent, child0, child1);
400
401 // Set the local data, except on the root process
402 if(parent != -1)
404
405 if(child0 != -1) { // Check that this process has children in the binary tree
406
407 // Get handler function and arguments
408 void (*handler)(const AmArg&) =
409 & WorldGopInterface::template bcast_handler<keyT, valueT, taskfnT>;
410 AmArg* const args0 = new_am_arg(
411 & WorldGopInterface::template bcast_task<keyT, valueT>,
412 key, value, root);
413
414 // Send active message to children
415 if(child1 != -1) {
416 AmArg* const args1 = copy_am_arg(*args0);
417 world_.am.send(child1, handler, args1, RMI::ATTR_UNORDERED);
418 }
419 world_.am.send(child0, handler, args0, RMI::ATTR_UNORDERED);
420 }
421 }
422
423 template <typename keyT, typename valueT>
424 void group_bcast_task(const keyT& key, const valueT& value,
425 const ProcessID group_root, const Group& group) const
426 {
427 typedef void (WorldGopInterface::*taskfnT)(const keyT&, const valueT&,
428 const ProcessID, const Group&) const;
429
430 // Set the local data, except on the root process
431 ProcessID parent = -1, child0 = -1, child1 = -1;
432 group.make_tree(group_root, parent, child0, child1);
433
434 // Set the local data
435 if(parent != -1) {
437 group.remote_update();
438 }
439
440 if(child0 != -1) { // Check that this process has children in the binary tree
441
442 // Get handler function and arguments
443 void (*handler)(const AmArg&) =
444 & WorldGopInterface::template group_bcast_handler<keyT, valueT, taskfnT>;
445 AmArg* const args0 = new_am_arg(
446 & WorldGopInterface::template group_bcast_task<keyT, valueT>,
447 key, value, group_root, group.id());
448
449 // Send active message to children
450 if(child1 != -1) {
451 AmArg* const args1 = copy_am_arg(*args0);
452 world_.am.send(child1, handler, args1, RMI::ATTR_UNORDERED);
453 }
454 world_.am.send(child0, handler, args0, RMI::ATTR_UNORDERED);
455 }
456 }
457
458 /// Broadcast
459
460 /// Broadcast data from the \c root process to all processes in \c world.
461 /// The input/output data is held by \c value.
462 /// \tparam tagT The tag type that is attached to \c keyT
463 /// \tparam keyT The base key type
464 /// \tparam valueT The value type that will be broadcast
465 /// \param[in] key The key associated with this broadcast
466 /// \param[in,out] value On the \c root process, this is used as the input
467 /// data that will be broadcast to all other processes in the group.
468 /// On other processes it is used as the output to the broadcast
469 /// \param root The process that owns the data to be broadcast
470 /// \throw madness::Exception When \c value has been set, except on the
471 /// \c root process.
472 template <typename tagT, typename keyT, typename valueT>
473 void bcast_internal(const keyT& key, Future<valueT>& value, const ProcessID root) const {
474 MADNESS_ASSERT((root >= 0) && (root < world_.size()));
475 MADNESS_ASSERT((world_.rank() == root) || (! value.probe()));
476
477 // Add operation tag to key
478 typedef TaggedKey<keyT, tagT> key_type;
479 const key_type tagged_key(key);
480
481 if(world_.size() > 1) { // Do nothing for the trivial case
482 if(world_.rank() == root) {
483 // This process owns the data to be broadcast.
484
485 // Spawn remote tasks that will set the local cache for this
486 // broadcast on other nodes.
487 if(value.probe())
488 // The value is ready so send it now
489 bcast_task(tagged_key, value.get(), root);
490 else {
491 // The value is not ready so spawn a task to send the
492 // data when it is ready.
493 auto bcast_task_fn = & WorldGopInterface::template bcast_task<key_type, valueT>;
494 world_.taskq.add(*this, bcast_task_fn,
495 tagged_key, value, root, TaskAttributes::hipri());
496 }
497 } else {
498 MADNESS_ASSERT(! value.probe());
499
500 // Get the broadcast value from local cache
502 }
503 }
504 }
505
506 /// Group broadcast
507
508 /// Broadcast data from the \c group_root process to all processes in
509 /// \c group. The input/output data is held by \c value.
510 /// \tparam tagT The tag type that is attached to \c keyT
511 /// \tparam keyT The base key type
512 /// \tparam valueT The value type that will be broadcast
513 /// \param[in] key The key associated with this broadcast
514 /// \param[in,out] value On the \c group_root process, this is used as the
515 /// input data that will be broadcast to all other processes in the group.
516 /// On other processes it is used as the output to the broadcast
517 /// \param group_root The process in \c group that owns the data to be
518 /// broadcast
519 /// \param group The process group where value will be broadcast
520 /// \throw madness::Exception When \c value has been set, except on the
521 /// \c group_root process.
522 template <typename tagT, typename keyT, typename valueT>
523 void bcast_internal(const keyT& key, Future<valueT>& value,
524 const ProcessID group_root, const Group& group) const
525 {
526 // Construct the internal broadcast key
527 typedef TaggedKey<keyT, tagT> key_type;
528 const key_type tagged_key(key);
529
530 if(group.rank() == group_root) {
531 // This process owns the data to be broadcast.
532 if(value.probe())
533 group_bcast_task(tagged_key, value.get(), group_root, group);
534 else {
535 auto group_bcast_task_fn = & WorldGopInterface::template group_bcast_task<key_type, valueT>;
536 world_.taskq.add(this, group_bcast_task_fn,
537 tagged_key, value, group_root, group,
539 }
540 } else {
541 MADNESS_ASSERT(! value.probe());
542
543 // This is not the root process, so retrieve the broadcast data
545
546 // Increment local use counter for group
547 group.local_update();
548 }
549 }
550
551 template <typename valueT, typename opT>
552 static typename detail::result_of<opT>::type
553 reduce_task(const valueT& value, const opT& op) {
554 typename detail::result_of<opT>::type result = op();
555 op(result, value);
556 return result;
557 }
558
559 template <typename opT>
560 static typename detail::result_of<opT>::type
561 reduce_result_task(const std::vector<Future<typename detail::result_of<opT>::type> >& results,
562 const opT& op)
563 {
564 MADNESS_ASSERT(results.size() != 0ul);
565 Future<typename detail::result_of<opT>::type> result = results.front();
566 for(std::size_t i = 1ul; i < results.size(); ++i)
567 op(result.get(), results[i].get());
568 return result.get();
569 }
570
571 /// Distributed reduce
572
573 /// \tparam tagT The tag type to be added to the key type
574 /// \tparam keyT The key type
575 /// \tparam valueT The data type to be reduced
576 /// \tparam opT The reduction operation type
577 /// \param key The key associated with this reduction
578 /// \param value The local value to be reduced
579 /// \param op The reduction operation to be applied to local and remote data
580 /// \param root The process that will receive the result of the reduction
581 /// \return A future to the reduced value on the root process, otherwise an
582 /// uninitialized future that may be ignored.
583 template <typename tagT, typename keyT, typename valueT, typename opT>
585 reduce_internal(const ProcessID parent, const ProcessID child0,
586 const ProcessID child1, const ProcessID root, const keyT& key,
587 const valueT& value, const opT& op)
588 {
589 // Create tagged key
590 typedef ProcessKey<keyT, tagT> key_type;
591 typedef typename detail::result_of<opT>::type result_type;
592 typedef typename remove_future<valueT>::type value_type;
593 std::vector<Future<result_type> > results;
594 results.reserve(3);
595
596 // Add local data to vector of values to reduce
597 results.push_back(world_.taskq.add(WorldGopInterface::template reduce_task<value_type, opT>,
598 value, op, TaskAttributes::hipri()));
599
600 // Reduce child data
601 if(child0 != -1)
602 results.push_back(recv_internal<result_type>(key_type(key, child0)));
603 if(child1 != -1)
604 results.push_back(recv_internal<result_type>(key_type(key, child1)));
605
606 // Submit the local reduction task
607 Future<result_type> local_result =
608 world_.taskq.add(WorldGopInterface::template reduce_result_task<opT>,
609 results, op, TaskAttributes::hipri());
610
611 // Send reduced value to parent or, if this is the root process, set the
612 // result future.
613 if(parent == -1)
614 return local_result;
615 else
616 send_internal(parent, key_type(key, world_.rank()), local_result);
617
619 }
620
621 /// Implementation of fence
622
623 /// \param[in] epilogue the action to execute (by the calling thread) immediately after the fence
624 /// \param[in] pause_during_epilogue whether to suspend work while executing epilogue
625 /// \param[in] debug set to true to print progress statistics using madness::print(); the default is false.
626 /// \warning currently only \c pause_during_epilogue=false is supported
627 void fence_impl(std::function<void()> epilogue = []{},
628 bool pause_during_epilogue = false,
629 bool debug = false);
630
632 int result = std::numeric_limits<int>::max();
633 const auto* initial_max_reducebcast_msg_size_cstr = std::getenv("MAD_MAX_REDUCEBCAST_MSG_SIZE");
634 if (initial_max_reducebcast_msg_size_cstr) {
635 auto result_u64 = cstr_to_memory_size(initial_max_reducebcast_msg_size_cstr);
636 const auto do_print = SafeMPI::COMM_WORLD.Get_rank() == 0 && !madness::quiet();
637 if (result_u64>std::numeric_limits<int>::max()) {
638 if (do_print)
639 std::cout
640 << "!!MADNESS WARNING: Invalid value for environment variable MAD_MAX_REDUCEBCAST_MSG_SIZE.\n"
641 << "!!MADNESS WARNING: MAD_MAX_REDUCEBCAST_MSG_SIZE = "
642 << result_u64 << "\n";
643 result = std::numeric_limits<int>::max();
644 }
645 result = static_cast<int>(result_u64);
646 if(do_print) {
647 std::cout
648 << "MADNESS max msg size for GOP reduce/broadcast set to "
649 << result << " bytes.\n";
650 }
651 }
652 return result;
653 }
654
655 public:
656
657 // In the World constructor can ONLY rely on MPI and MPI being initialized
661
663 deferred_->destroy(true);
664 deferred_->do_cleanup();
665 }
666
667
668 /// Set debug flag to new value and return old value
669 bool set_debug(bool value) {
670 bool status = debug_;
671 debug_ = value;
672 return status;
673 }
674
675 /// Set forbid_fence flag to new value and return old value
676 bool set_forbid_fence(bool value) {
677 bool status = forbid_fence_;
678 forbid_fence_ = value;
679 return status;
680 }
681
682 /// Set the maximum size of messages (in bytes) sent by reduce and broadcast
683
684 /// \param sz the maximum size of messages (in bytes) sent by reduce and broadcast
685 /// \return the previous maximum size of messages (in bytes) sent by reduce and broadcast
686 /// \pre `sz>0`
688 MADNESS_ASSERT(sz>0);
689 std::swap(max_reducebcast_msg_size_,sz);
691 }
692
693
694 /// Returns the maximum size of messages (in bytes) sent by reduce and broadcast
695
696 /// \return the maximum size of messages (in bytes) sent by reduce and broadcast
700
701 /// Synchronizes all processes in communicator ... does NOT fence pending AM or tasks
702 void barrier() {
703 long i = world_.rank();
704 sum(i);
705 if (i != world_.size()*(world_.size()-1)/2) error("bad value after sum in barrier");
706 }
707
708
709 /// Synchronizes all processes in communicator AND globally ensures no pending AM or tasks
710
711 /// \internal Runs Dijkstra-like termination algorithm on binary tree by
712 /// locally ensuring ntask=0 and all am sent and processed,
713 /// and then participating in a global sum of nsent and nrecv.
714 /// Then globally checks that nsent=nrecv and that both are
715 /// constant over two traversals. We are then sure
716 /// that all tasks and AM are processed and there are no AM in
717 /// flight.
718 /// \param[in] debug set to true to print progress statistics using madness::print(); the default is false.
719 void fence(bool debug = false);
720
721 /// Executes an action on single (this) thread after ensuring all other work is done
722
723 /// \param[in] action the action to execute (by the calling thread)
724 void serial_invoke(std::function<void()> action);
725
726 /// Broadcasts bytes from process root while still processing AM & tasks
727
728 /// Optimizations can be added for long messages
729 void broadcast(void* buf, size_t nbyte, ProcessID root, bool dowork = true, Tag bcast_tag = -1);
730
731
732 /// Broadcasts typed contiguous data from process root while still processing AM & tasks
733
734 /// Optimizations can be added for long messages
735 template <typename T, typename = std::enable_if_t<madness::is_trivially_copyable_v<T>>>
736 inline void broadcast(T* buf, size_t nelem, ProcessID root) {
737 broadcast((void *) buf, nelem*sizeof(T), root);
738 }
739
740 /// Broadcast of a scalar from node 0 to all other nodes
741 template <typename T, typename = std::enable_if_t<madness::is_trivially_copyable_v<T>>>
742 void broadcast(T& t) {
743 broadcast(&t, 1, 0);
744 }
745
746 /// Broadcast of a scalar from node root to all other nodes
747 template <typename T, typename = std::enable_if_t<madness::is_trivially_copyable_v<T>>>
748 void broadcast(T& t, ProcessID root) {
749 broadcast(&t, 1, root);
750 }
751
752 /// Broadcast a serializable object
753 template <typename objT,
754 typename = std::void_t<decltype(std::declval<archive::BufferInputArchive&>()&std::declval<objT&>())>,
755 typename = std::void_t<decltype(std::declval<archive::BufferOutputArchive&>()&std::declval<const objT&>())>>
756 void broadcast_serializable(objT& obj, ProcessID root) {
757 MADNESS_ASSERT(root < world_.size());
758 if (world_.size() == 1) return;
759
760 size_t BUFLEN;
761 if (world_.rank() == root) {
763 count & obj;
764 BUFLEN = count.size();
765 }
766 broadcast(BUFLEN, root);
767
768 unsigned char* buf = new unsigned char[BUFLEN];
769 if (world_.rank() == root) {
770 archive::BufferOutputArchive ar(buf,BUFLEN);
771 ar & obj;
772 }
773 broadcast(buf, BUFLEN, root);
774 if (world_.rank() != root) {
775 archive::BufferInputArchive ar(buf,BUFLEN);
776 ar & obj;
777 }
778 delete [] buf;
779 }
780
781 /// Inplace global reduction (like MPI all_reduce) while still processing AM & tasks
782
783 /// Optimizations can be added for long messages and to reduce the memory footprint
784 template <typename T, class opT>
785 void reduce(T* buf, std::size_t nelem, opT op) {
786 static_assert(madness::is_trivially_copyable_v<T>, "T must be trivially copyable");
787
788 ProcessID parent, child0, child1;
789 world_.mpi.binary_tree_info(0, parent, child0, child1);
790 const std::size_t nelem_per_maxmsg =
791 max_reducebcast_msg_size() / sizeof(T);
792
793 const auto buf_size = ((sizeof(T) * std::min(nelem_per_maxmsg, nelem) +
794 std::alignment_of_v<T> - 1) /
795 std::alignment_of_v<T>) * std::alignment_of_v<T>;
796 struct free_dtor {
797 void operator()(T *ptr) {
798 if (ptr != nullptr)
799 std::free(ptr);
800 };
801 };
802 using sptr_t = std::unique_ptr<T[], free_dtor>;
803
804 auto aligned_buf_alloc = [&]() -> T* {
805 // posix_memalign requires alignment to be an integer multiple of sizeof(void*)!! so ensure that
806 const std::size_t alignment =
807 ((std::alignment_of_v<T> + sizeof(void *) - 1) /
808 sizeof(void *)) *
809 sizeof(void *);
810#ifdef HAVE_POSIX_MEMALIGN
811 void *ptr;
812 if (posix_memalign(&ptr, alignment, buf_size) != 0) {
813 throw std::bad_alloc();
814 }
815 return static_cast<T *>(ptr);
816#else
817 return static_cast<T *>(std::aligned_alloc(alignment, buf_size));
818#endif
819 };
820
821 sptr_t buf0;
822 if (child0 != -1)
823 buf0 = sptr_t(aligned_buf_alloc(),
824 free_dtor{});
825 sptr_t buf1(nullptr);
826 if (child1 != -1)
827 buf1 = sptr_t(aligned_buf_alloc(),
828 free_dtor{});
829
830 auto reduce_impl = [&,this](T* buf, size_t nelem) {
831 MADNESS_ASSERT(nelem <= nelem_per_maxmsg);
832 SafeMPI::Request req0, req1;
833 Tag gsum_tag = world_.mpi.unique_tag();
834
835 if (child0 != -1)
836 req0 = world_.mpi.Irecv(buf0.get(), nelem * sizeof(T), MPI_BYTE,
837 child0, gsum_tag);
838 if (child1 != -1)
839 req1 = world_.mpi.Irecv(buf1.get(), nelem * sizeof(T), MPI_BYTE,
840 child1, gsum_tag);
841
842 if (child0 != -1) {
843 World::await(req0);
844 for (long i = 0; i < (long)nelem; ++i)
845 buf[i] = op(buf[i], buf0[i]);
846 }
847 if (child1 != -1) {
848 World::await(req1);
849 for (long i = 0; i < (long)nelem; ++i)
850 buf[i] = op(buf[i], buf1[i]);
851 }
852
853 if (parent != -1) {
854 req0 = world_.mpi.Isend(buf, nelem * sizeof(T), MPI_BYTE, parent,
855 gsum_tag);
856 World::await(req0);
857 }
858
859 broadcast(buf, nelem, 0);
860 };
861
862 while (nelem) {
863 const int n = std::min(nelem_per_maxmsg, nelem);
864 reduce_impl(buf, n);
865 nelem -= n;
866 buf += n;
867 }
868 }
869
870 /// Inplace global sum while still processing AM & tasks
871 template <typename T>
872 inline void sum(T* buf, size_t nelem) {
873 reduce< T, WorldSumOp<T> >(buf, nelem, WorldSumOp<T>());
874 }
875
876 /// Inplace global min while still processing AM & tasks
877 template <typename T>
878 inline void min(T* buf, size_t nelem) {
879 reduce< T, WorldMinOp<T> >(buf, nelem, WorldMinOp<T>());
880 }
881
882 /// Inplace global max while still processing AM & tasks
883 template <typename T>
884 inline void max(T* buf, size_t nelem) {
885 reduce< T, WorldMaxOp<T> >(buf, nelem, WorldMaxOp<T>());
886 }
887
888 /// Inplace global absmin while still processing AM & tasks
889 template <typename T>
890 inline void absmin(T* buf, size_t nelem) {
891 reduce< T, WorldAbsMinOp<T> >(buf, nelem, WorldAbsMinOp<T>());
892 }
893
894 /// Inplace global absmax while still processing AM & tasks
895 template <typename T>
896 inline void absmax(T* buf, size_t nelem) {
897 reduce< T, WorldAbsMaxOp<T> >(buf, nelem, WorldAbsMaxOp<T>());
898 }
899
900 /// Inplace global product while still processing AM & tasks
901 template <typename T>
902 inline void product(T* buf, size_t nelem) {
903 reduce< T, WorldMultOp<T> >(buf, nelem, WorldMultOp<T>());
904 }
905
906 template <typename T>
907 inline void bit_and(T* buf, size_t nelem) {
908 reduce< T, WorldBitAndOp<T> >(buf, nelem, WorldBitAndOp<T>());
909 }
910
911 template <typename T>
912 inline void bit_or(T* buf, size_t nelem) {
913 reduce< T, WorldBitOrOp<T> >(buf, nelem, WorldBitOrOp<T>());
914 }
915
916 template <typename T>
917 inline void bit_xor(T* buf, size_t nelem) {
918 reduce< T, WorldBitXorOp<T> >(buf, nelem, WorldBitXorOp<T>());
919 }
920
921 template <typename T>
922 inline void logic_and(T* buf, size_t nelem) {
923 reduce< T, WorldLogicAndOp<T> >(buf, nelem, WorldLogicAndOp<T>());
924 }
925
926 template <typename T>
927 inline void logic_or(T* buf, size_t nelem) {
928 reduce< T, WorldLogicOrOp<T> >(buf, nelem, WorldLogicOrOp<T>());
929 }
930
931 /// Global sum of a scalar while still processing AM & tasks
932 template <typename T>
933 void sum(T& a) {
934 sum(&a, 1);
935 }
936
937 /// Global max of a scalar while still processing AM & tasks
938 template <typename T>
939 void max(T& a) {
940 max(&a, 1);
941 }
942
943 /// Global min of a scalar while still processing AM & tasks
944 template <typename T>
945 void min(T& a) {
946 min(&a, 1);
947 }
948
949 /// Concatenate an STL vector of serializable stuff onto node 0
950
951 /// \param[in] v input vector
952 /// \param[in] bufsz the max number of bytes in the result; must be less than std::numeric_limits<int>::max()
953 /// \return on rank 0 returns the concatenated vector, elsewhere returns an empty vector
954 template <typename T>
955 std::vector<T> concat0(const std::vector<T>& v, size_t bufsz=1024*1024) {
956 MADNESS_ASSERT(bufsz <= std::numeric_limits<int>::max());
957 // bufsz must be multiple of alignment!!! so ensure that
958 bufsz = ((bufsz + sizeof(void*) - 1) / sizeof(void*)) * sizeof(void*);
959
960 ProcessID parent, child0, child1;
961 world_.mpi.binary_tree_info(0, parent, child0, child1);
962 int child0_nbatch = 0, child1_nbatch = 0;
963
964 struct free_dtor {
965 void operator()(std::byte *ptr) {
966 if (ptr != nullptr)
967 std::free(ptr);
968 };
969 };
970 using sptr_t = std::unique_ptr<std::byte[], free_dtor>;
971
972 auto aligned_buf_alloc = [&]() -> std::byte* {
973#ifdef HAVE_POSIX_MEMALIGN
974 void *ptr;
975 if (posix_memalign(&ptr, sizeof(void *), bufsz) != 0) {
976 throw std::bad_alloc();
977 }
978 return static_cast<std::byte *>(ptr);
979#else
980 return static_cast<std::byte *>(
981 std::aligned_alloc(sizeof(void *), bufsz));
982#endif
983 };
984
985 auto buf0 = sptr_t(aligned_buf_alloc(),
986 free_dtor{});
987 auto buf1 = sptr_t(aligned_buf_alloc(),
988 free_dtor{});
989
990 // transfer data in chunks at most this large
991 const int batch_size = static_cast<int>(
992 std::min(static_cast<size_t>(max_reducebcast_msg_size()), bufsz));
993
994 // precompute max # of tags any node ... will need, and allocate them on every node to avoid tag counter divergence
995 const int max_nbatch = bufsz / batch_size;
996 // one tag is reserved for sending the number of messages to expect and the size of the last message
997 const int max_ntags = max_nbatch + 1;
999 std::vector<Tag> tags; // stores tags used to send each batch
1000 tags.reserve(max_nbatch);
1001 for(int t=0; t<max_ntags; ++t) tags.push_back(world_.mpi.unique_tag());
1002
1003 if (child0 != -1 || child1 != -1) {
1004 // receive # of batches
1005
1006 auto receive_nbatch = [&,this]() {
1007 if (child0 != -1) {
1008 world_.mpi.Recv(&child0_nbatch, 1, MPI_INT, child0,
1009 tags[0]);
1010 }
1011 if (child1 != -1) {
1012 world_.mpi.Recv(&child1_nbatch, 1, MPI_INT, child1,
1013 tags[0]);
1014 }
1015 };
1016
1017 receive_nbatch();
1018
1019 // receive data in batches
1020
1021 auto receive_batch = [&,this](const int batch, const size_t buf_offset) {
1022 SafeMPI::Request req0, req1;
1023 if (child0 != -1 && batch < child0_nbatch) {
1024 int msg_size = batch_size;
1025 // if last batch, receive # of bytes to expect
1026 if (batch + 1 == child0_nbatch) {
1027 auto req = world_.mpi.Irecv(
1028 &msg_size, 1, MPI_INT, child0, tags[0]);
1029 World::await(req);
1030 }
1031
1032 req0 = world_.mpi.Irecv(buf0.get() + buf_offset,
1033 msg_size, MPI_BYTE, child0,
1034 tags[batch + 1]);
1035 }
1036 if (child1 != -1 && batch < child1_nbatch) {
1037 int msg_size = batch_size;
1038 // if last batch, receive # of bytes to expect
1039 if (batch + 1 == child1_nbatch) {
1040 auto req = world_.mpi.Irecv(
1041 &msg_size, 1, MPI_INT, child1, tags[0]);
1042 World::await(req);
1043 }
1044 req1 = world_.mpi.Irecv(buf1.get() + buf_offset,
1045 msg_size, MPI_BYTE, child1,
1046 tags[batch + 1]);
1047 }
1048
1049 if (child0 != -1 && batch < child0_nbatch) {
1050 World::await(req0);
1051 }
1052 if (child1 != -1 && batch < child1_nbatch) {
1053 World::await(req1);
1054 }
1055 };
1056
1057 size_t buf_offset = 0;
1058 int batch = 0;
1059 while (buf_offset < bufsz) {
1060 receive_batch(batch, buf_offset);
1061 buf_offset += batch_size;
1062 buf_offset = std::min(buf_offset, bufsz);
1063 ++batch;
1064 }
1065 }
1066
1067
1068 std::vector<T> left, right;
1069 if (child0 != -1) {
1070 archive::BufferInputArchive ar(buf0.get(), bufsz);
1071 ar & left;
1072 }
1073 if (child1 != -1) {
1074 archive::BufferInputArchive ar(buf1.get(), bufsz);
1075 ar & right;
1076 for (unsigned int i = 0; i < right.size(); ++i)
1077 left.push_back(right[i]);
1078 }
1079 for (unsigned int i=0; i<v.size(); ++i) left.push_back(v[i]);
1080
1081 // send data in batches
1082 if (parent != -1) {
1083 archive::BufferOutputArchive ar(buf0.get(), bufsz);
1084 ar & left;
1085 const auto total_nbytes_to_send = ar.size();
1086
1087 // send nbatches to expect
1088 const int nbatch = (total_nbytes_to_send + batch_size - 1) / batch_size;
1089 world_.mpi.Send(&nbatch, 1, MPI_INT, parent,
1090 tags[0]);
1091
1092 size_t buf_offset = 0;
1093 int batch = 0;
1094 while (buf_offset < bufsz) {
1095
1096 // send data in batches
1097 auto send_batch = [&,this](const int batch, const size_t buf_offset) {
1098 const int nbytes_to_send = static_cast<int>(
1099 std::min(static_cast<size_t>(batch_size),
1100 total_nbytes_to_send - buf_offset));
1101 // if last batch, send # of bytes to expect
1102 if (batch + 1 == nbatch) {
1103 auto req = world_.mpi.Isend(
1104 &nbytes_to_send, 1, MPI_INT, parent, tags[0]);
1105 World::await(req);
1106 }
1107 auto req0 =
1108 world_.mpi.Isend(buf0.get() + buf_offset, nbytes_to_send,
1109 MPI_BYTE, parent, tags[batch + 1]);
1110 World::await(req0);
1111 };
1112
1113 send_batch(batch, buf_offset);
1114 buf_offset += batch_size;
1115 buf_offset = std::min(buf_offset, bufsz);
1116 ++batch;
1117 }
1118 }
1119
1120 if (parent == -1) return left;
1121 else return std::vector<T>();
1122 }
1123
1124 /// Receive data from \c source
1125
1126 /// \tparam valueT The data type stored in cache
1127 /// \tparam keyT The key type
1128 /// \param source The process that is sending the data to this process
1129 /// \param key The key associated with the received data
1130 /// \return A future that will be set with the received data
1131 /// \note It is the user's responsibility to ensure that \c key does not
1132 /// conflict with other calls to \c recv. Keys may be reused after the
1133 /// associated operation has finished.
1134 template <typename valueT, typename keyT>
1135 static Future<valueT> recv(const ProcessID source, const keyT& key) {
1136 return recv_internal<valueT>(ProcessKey<keyT, PointToPointTag>(key, source));
1137 }
1138
1139 /// Send value to \c dest
1140
1141 /// \tparam keyT The key type
1142 /// \tparam valueT The value type (this may be a \c Future type)
1143 /// \param dest The process where the data will be sent
1144 /// \param key The key that is associated with the data
1145 /// \param value The data to be sent to \c dest
1146 /// \note It is the user's responsibility to ensure that \c key does not
1147 /// conflict with other calls to \c send. Keys may be reused after the
1148 /// associated operation has finished.
1149 template <typename keyT, typename valueT>
1150 void send(const ProcessID dest, const keyT& key, const valueT& value) const {
1152 }
1153
1154 /// Lazy sync
1155
1156 /// Lazy sync functions are asynchronous barriers with a nullary functor
1157 /// that is called after all processes have called it with the same
1158 /// key. You can think of lazy_sync as an asynchronous barrier. The
1159 /// lazy_sync functor must have the following signature:
1160 /// \code
1161 /// class SyncFunc {
1162 /// public:
1163 /// // typedefs
1164 /// typedef void result_type;
1165 ///
1166 /// // Constructors
1167 /// SyncFunc(const SyncFunc&);
1168 ///
1169 /// // The function that performs the sync operation
1170 /// void operator()();
1171 ///
1172 /// }; // class SyncFunc
1173 /// \endcode
1174 /// \tparam keyT The key type
1175 /// \tparam opT The operation type
1176 /// \param key The sync key
1177 /// \param op The sync operation to be executed on this process
1178 /// \note It is the user's responsibility to ensure that \c key does not
1179 /// conflict with other calls to \c lazy_sync. Keys may be reused after
1180 /// the associated operation has finished.
1181 template <typename keyT, typename opT>
1182 void lazy_sync(const keyT& key, const opT& op) const {
1183 if(world_.size() > 1) { // Do nothing for the trivial case
1184 // Get the binary tree data
1185 Hash<keyT> hasher;
1186 const ProcessID root = hasher(key) % world_.size();
1187 ProcessID parent = -1, child0 = -1, child1 = -1;
1188 world_.mpi.binary_tree_info(root, parent, child0, child1);
1189
1190 lazy_sync_internal<LazySyncTag>(parent, child0, child1, key, op);
1191 } else {
1192 auto lazy_sync_children_fn = & WorldGopInterface::template lazy_sync_children<keyT, opT>;
1193 // There is only one process, so run the sync operation now.
1194 world_.taskq.add(*this, lazy_sync_children_fn,
1195 -1, -1, key, op, -1, TaskAttributes::hipri());
1196 }
1197 }
1198
1199
1200 /// Group lazy sync
1201
1202 /// Lazy sync functions are asynchronous barriers with a nullary functor
1203 /// that is called after all processes in the group have called it with
1204 /// the same key. You can think of lazy_sync as an asynchronous barrier.
1205 /// The \c op functor must have the following signature:
1206 /// \code
1207 /// class SyncFunc {
1208 /// public:
1209 /// // typedefs
1210 /// typedef void result_type;
1211 ///
1212 /// // Constructors
1213 /// SyncFunc(const SyncFunc&);
1214 ///
1215 /// // The function that performs the sync operation
1216 /// void operator()();
1217 ///
1218 /// }; // class SyncFunc
1219 /// \endcode
1220 /// \tparam keyT The key type
1221 /// \tparam opT The operation type
1222 /// \param key The sync key
1223 /// \param op The sync operation to be executed on this process
1224 /// \note It is the user's responsibility to ensure that \c key does not
1225 /// conflict with other calls to \c lazy_sync. Keys may be reused after
1226 /// the associated operation has finished.
1227 template <typename keyT, typename opT>
1228 void lazy_sync(const keyT& key, const opT& op, const Group& group) const {
1229 MADNESS_ASSERT(! group.empty());
1230 MADNESS_ASSERT(group.get_world().id() == world_.id());
1231
1232 if(group.size() > 1) { // Do nothing for the trivial case
1233 // Get the binary tree data
1234 Hash<keyT> hasher;
1235 const ProcessID group_root = hasher(key) % group.size();
1236 ProcessID parent = -1, child0 = -1, child1 = -1;
1237 group.make_tree(group_root, parent, child0, child1);
1238
1239 lazy_sync_internal<GroupLazySyncTag>(parent, child0, child1, key, op);
1240 } else {
1241 auto lazy_sync_children_fn = & WorldGopInterface::template lazy_sync_children<keyT, opT>;
1242 world_.taskq.add(*this, lazy_sync_children_fn,
1243 -1, -1, key, op, -1, TaskAttributes::hipri());
1244 }
1245 }
1246
1247 /// Broadcast
1248
1249 /// Broadcast data from the \c root process to all processes. The input/
1250 /// output data is held by \c value.
1251 /// \param[in] key The key associated with this broadcast
1252 /// \param[in,out] value On the \c root process, this is used as the input
1253 /// data that will be broadcast to all other processes. On other
1254 /// processes it is used as the output to the broadcast.
1255 /// \param root The process that owns the data to be broadcast
1256 /// \throw madness::Exception When \c root is less than 0 or greater
1257 /// than or equal to the world size.
1258 /// \throw madness::Exception When \c value has been set, except on the
1259 /// \c root process.
1260 /// \note It is the user's responsibility to ensure that \c key does not
1261 /// conflict with other calls to \c bcast. Keys may be reused after
1262 /// the associated operation has finished.
1263 template <typename keyT, typename valueT>
1264 void bcast(const keyT& key, Future<valueT>& value, const ProcessID root) const {
1265 MADNESS_ASSERT((root >= 0) && (root < world_.size()));
1266 MADNESS_ASSERT((world_.rank() == root) || (! value.probe()));
1267
1268 if(world_.size() > 1) // Do nothing for the trivial case
1269 bcast_internal<BcastTag>(key, value, root);
1270 }
1271
1272 /// Group broadcast
1273
1274 /// Broadcast data from the \c group_root process to all processes in
1275 /// \c group. The input/output data is held by \c value.
1276 /// \param[in] key The key associated with this broadcast
1277 /// \param[in,out] value On the \c group_root process, this is used as the
1278 /// input data that will be broadcast to all other processes in the group.
1279 /// On other processes it is used as the output to the broadcast
1280 /// \param group_root The process in \c group that owns the data to be
1281 /// broadcast
1282 /// \param group The process group where value will be broadcast
1283 /// \throw madness::Exception When \c group is empty
1284 /// \throw madness::Exception When \c group is not registered
1285 /// \throw madness::Exception When the world id of \c group is not
1286 /// equal to that of the world used to construct this object
1287 /// \throw madness::Exception When this process is not in the group
1288 /// \throw madness::Exception When \c group_root is less than 0 or
1289 /// greater than or equal to \c group size
1290 /// \throw madness::Exception When \c data has been set except on the
1291 /// \c root process
1292 /// \note It is the user's responsibility to ensure that \c key does not
1293 /// conflict with other calls to \c bcast. Keys may be reused after
1294 /// the associated operation has finished.
1295 template <typename keyT, typename valueT>
1296 void bcast(const keyT& key, Future<valueT>& value,
1297 const ProcessID group_root, const Group& group) const
1298 {
1299 MADNESS_ASSERT(! group.empty());
1300 MADNESS_ASSERT(group.get_world().id() == world_.id());
1301 MADNESS_ASSERT((group_root >= 0) && (group_root < group.size()));
1302 MADNESS_ASSERT((group.rank() == group_root) || (! value.probe()));
1303
1304 if(group.size() > 1) // Do nothing for the trivial case
1305 bcast_internal<GroupBcastTag>(key, value, group_root, group);
1306 }
1307
1308 /// Distributed reduce
1309
1310 /// The reduce functor must have the following signature:
1311 /// \code
1312 /// class ReduceFunc {
1313 /// public:
1314 /// // Typedefs
1315 /// typedef ... result_type;
1316 /// typedef ... argument_type;
1317 ///
1318 /// // Constructors
1319 /// ReduceFunc(const ReduceFunc&);
1320 ///
1321 /// // Initialization operation, which returns a default result object
1322 /// result_type operator()() const;
1323 ///
1324 /// // Reduce two result objects
1325 /// void operator()(result_type&, const result_type&) const;
1326 ///
1327 /// // Reduce a result object and an argument object
1328 /// void operator()(result_type&, const argument_type&) const;
1329 /// }; // class ReduceFunc
1330 /// \endcode
1331 /// \tparam keyT The key type
1332 /// \tparam valueT The data type to be reduced
1333 /// \tparam opT The reduction operation type
1334 /// \param key The key associated with this reduction
1335 /// \param value The local value to be reduced
1336 /// \param op The reduction operation to be applied to local and remote data
1337 /// \param root The process that will receive the result of the reduction
1338 /// \return A future to the reduce value on the root process, otherwise an
1339 /// uninitialized future that may be ignored.
1340 /// \note It is the user's responsibility to ensure that \c key does not
1341 /// conflict with other calls to \c reduce. Keys may be reused after
1342 /// the associated operation has finished.
1343 template <typename keyT, typename valueT, typename opT>
1345 reduce(const keyT& key, const valueT& value, const opT& op, const ProcessID root) {
1346 MADNESS_ASSERT((root >= 0) && (root < world_.size()));
1347
1348 // Get the binary tree data
1349 ProcessID parent = -1, child0 = -1, child1 = -1;
1350 world_.mpi.binary_tree_info(root, parent, child0, child1);
1351
1352 return reduce_internal<ReduceTag>(parent, child0, child1, root, key,
1353 value, op);
1354 }
1355
1356 /// Distributed group reduce
1357
1358 /// The reduce functor must have the following signature:
1359 /// \code
1360 /// class ReduceFunc {
1361 /// public:
1362 /// // Typedefs
1363 /// typedef ... result_type;
1364 /// typedef ... argument_type;
1365 ///
1366 /// // Constructors
1367 /// ReduceFunc(const ReduceFunc&);
1368 ///
1369 /// // Initialization operation, which returns a default result object
1370 /// result_type operator()() const;
1371 ///
1372 /// // Reduce two result objects
1373 /// void operator()(result_type&, const result_type&) const;
1374 ///
1375 /// // Reduce a result object and an argument object
1376 /// void operator()(result_type&, const argument_type&) const;
1377 /// }; // class ReduceFunc
1378 /// \endcode
1379 /// \tparam keyT The key type
1380 /// \tparam valueT The data type to be reduced
1381 /// \tparam opT The reduction operation type
1382 /// \param key The key associated with this reduction
1383 /// \param value The local value to be reduced
1384 /// \param op The reduction operation to be applied to local and remote data
1385 /// \param group_root The group process that will receive the result of the reduction
1386 /// \param group The group that will preform the reduction
1387 /// \return A future to the reduce value on the root process, otherwise an
1388 /// uninitialized future that may be ignored.
1389 /// \throw madness::Exception When \c group is empty
1390 /// \throw madness::Exception When \c group is not registered
1391 /// \throw madness::Exception When the world id of \c group is not
1392 /// equal to that of the world used to construct this object
1393 /// \throw madness::Exception When this process is not in the group
1394 /// \throw madness::Exception When \c group_root is less than zero or
1395 /// greater than or equal to \c group size.
1396 /// \note It is the user's responsibility to ensure that \c key does not
1397 /// conflict with other calls to \c reduce. Keys may be reused after
1398 /// the associated operation has finished.
1399 template <typename keyT, typename valueT, typename opT>
1401 reduce(const keyT& key, const valueT& value, const opT& op,
1402 const ProcessID group_root, const Group& group)
1403 {
1404 MADNESS_ASSERT(! group.empty());
1405 MADNESS_ASSERT(group.get_world().id() == world_.id());
1406 MADNESS_ASSERT((group_root >= 0) && (group_root < group.size()));
1407
1408 // Get the binary tree data
1409 ProcessID parent = -1, child0 = -1, child1 = -1;
1410 group.make_tree(group_root, parent, child0, child1);
1411
1412 return reduce_internal<ReduceTag>(parent, child0, child1, group_root,
1413 key, value, op);
1414 }
1415
1416 /// Distributed all reduce
1417
1418 /// The reduce functor must have the following signature:
1419 /// \code
1420 /// class ReduceFunc {
1421 /// public:
1422 /// // Typedefs
1423 /// typedef ... result_type;
1424 /// typedef ... argument_type;
1425 ///
1426 /// // Constructors
1427 /// ReduceFunc(const ReduceFunc&);
1428 ///
1429 /// // Initialization operation, which returns a default result object
1430 /// result_type operator()() const;
1431 ///
1432 /// // Reduce two result objects
1433 /// void operator()(result_type&, const result_type&) const;
1434 ///
1435 /// // Reduce a result object and an argument object
1436 /// void operator()(result_type&, const argument_type&) const;
1437 /// }; // class ReduceFunc
1438 /// \endcode
1439 /// \tparam keyT The key type
1440 /// \tparam valueT The data type to be reduced
1441 /// \tparam opT The reduction operation type
1442 /// \param key The key associated with this reduction
1443 /// \param value The local value to be reduced
1444 /// \param op The reduction operation to be applied to local and remote data
1445 /// \return A future to the reduce value on the root process, otherwise an
1446 /// uninitialized future that may be ignored.
1447 /// \note It is the user's responsibility to ensure that \c key does not
1448 /// conflict with other calls to \c all_reduce. Keys may be reused after
1449 /// the associated operation has finished.
1450 template <typename keyT, typename valueT, typename opT>
1452 all_reduce(const keyT& key, const valueT& value, const opT& op) {
1453 // Compute the parent and child processes of this process in a binary tree.
1454 Hash<keyT> hasher;
1455 const ProcessID root = hasher(key) % world_.size();
1456 ProcessID parent = -1, child0 = -1, child1 = -1;
1457 world_.mpi.binary_tree_info(root, parent, child0, child1);
1458
1459 // Reduce the data
1461 reduce_internal<AllReduceTag>(parent, child0, child1, root,
1462 key, value, op);
1463
1464 if(world_.rank() != root)
1466
1467 // Broadcast the result of the reduction to all processes
1468 bcast_internal<AllReduceTag>(key, reduce_result, root);
1469
1470 return reduce_result;
1471 }
1472
1473 /// Distributed, group all reduce
1474
1475 /// The reduce functor must have the following signature:
1476 /// \code
1477 /// class ReduceFunc {
1478 /// public:
1479 /// // Typedefs
1480 /// typedef ... result_type;
1481 /// typedef ... argument_type;
1482 ///
1483 /// // Constructors
1484 /// ReduceFunc(const ReduceFunc&);
1485 ///
1486 /// // Initialization operation, which returns a default result object
1487 /// result_type operator()() const;
1488 ///
1489 /// // Reduce two result objects
1490 /// void operator()(result_type&, const result_type&) const;
1491 ///
1492 /// // Reduce a result object and an argument object
1493 /// void operator()(result_type&, const argument_type&) const;
1494 /// }; // class ReduceFunc
1495 /// \endcode
1496 /// \tparam keyT The key type
1497 /// \tparam valueT The data type to be reduced
1498 /// \tparam opT The reduction operation type
1499 /// \param key The key associated with this reduction
1500 /// \param value The local value to be reduced
1501 /// \param op The reduction operation to be applied to local and remote data
1502 /// \param group The group that will preform the reduction
1503 /// \return A future to the reduce value on the root process, otherwise an
1504 /// uninitialized future that may be ignored
1505 /// \throw madness::Exception When \c group is empty
1506 /// \throw madness::Exception When \c group is not registered
1507 /// \throw madness::Exception When the world id of \c group is not
1508 /// equal to that of the world used to construct this object
1509 /// \throw madness::Exception When this process is not in the group
1510 /// \note It is the user's responsibility to ensure that \c key does not
1511 /// conflict with other calls to \c reduce. Keys may be reused after
1512 /// the associated operation has finished.
1513 template <typename keyT, typename valueT, typename opT>
1515 all_reduce(const keyT& key, const valueT& value, const opT& op, const Group& group) {
1516 MADNESS_ASSERT(! group.empty());
1517 MADNESS_ASSERT(group.get_world().id() == world_.id());
1518
1519 // Compute the parent and child processes of this process in a binary tree.
1520 Hash<keyT> hasher;
1521 const ProcessID group_root = hasher(key) % group.size();
1522 ProcessID parent = -1, child0 = -1, child1 = -1;
1523 group.make_tree(group_root, parent, child0, child1);
1524
1525 // Reduce the data
1527 reduce_internal<GroupAllReduceTag>(parent, child0, child1,
1528 group_root, key, value, op);
1529
1530
1531 if(group.rank() != group_root)
1533
1534 // Broadcast the result of the reduction to all processes in the group
1535 bcast_internal<GroupAllReduceTag>(key, reduce_result, 0, group);
1536
1537 return reduce_result;
1538 }
1539 }; // class WorldGopInterface
1540
1541} // namespace madness
1542
1543#endif // MADNESS_WORLD_WORLDGOP_H__INCLUDED
Implements an archive wrapping a memory buffer.
void binary_tree_info(int root, int &parent, int &child0, int &child1)
Construct info about a binary tree with given root.
Definition safempi.cc:39
int Get_rank() const
Definition safempi.h:714
static int unique_tag_period()
Definition safempi.h:836
int unique_tag()
Returns a unique tag for temporary use (1023<tag<4095)
Definition safempi.h:830
Definition safempi.h:289
World active message that extends an RMI message.
Definition worldam.h:80
The class used for callbacks (e.g., dependency tracking).
Definition dependency_interface.h:61
A future is a possibly yet unevaluated value.
Definition future.h:373
T & get(bool dowork=true) &
Gets the value, waiting if necessary.
Definition future.h:574
static const Future< T > default_initializer()
See "Gotchas" on Futures about why this exists and how to use it.
Definition future.h:462
bool probe() const
Check whether this future has been assigned.
Definition future.h:631
A collection of processes.
Definition group.h:50
void remote_update() const
Update remote usage count.
Definition group.h:383
void local_update() const
Update local usage count.
Definition group.h:369
ProcessID size() const
Group size accessor.
Definition group.h:429
const DistributedID & id() const
Group id accessor.
Definition group.h:396
bool empty() const
Query empty group.
Definition group.h:391
ProcessID rank() const
Group rank accessor.
Definition group.h:412
static madness::Future< Group > get_group(const DistributedID &did)
Get group from the registry.
Definition group.cc:90
void make_tree(const ProcessID group_root, ProcessID &parent, ProcessID &child1, ProcessID &child2) const
Compute the binary tree parents and children.
Definition group.h:449
World & get_world() const
Parent world accessor.
Definition group.h:404
Key object that includes the process information.
Definition distributed_id.h:80
static const attrT ATTR_UNORDERED
Definition worldrmi.h:180
Key object that uses a tag to differentiate keys.
Definition distributed_id.h:177
static TaskAttributes hipri()
Definition thread.h:456
void send(ProcessID dest, am_handlerT op, const AmArg *arg, const int attr=RMI::ATTR_ORDERED)
Sends a managed non-blocking active message.
Definition worldam.h:278
Delayed send callback object.
Definition worldgop.h:176
DelayedSend(World &world, const ProcessID dest, const keyT &key, const Future< valueT > &value)
Constructor.
Definition worldgop.h:190
virtual ~DelayedSend()
Definition worldgop.h:195
World & world_
The communication world.
Definition worldgop.h:178
const ProcessID dest_
The destination process id.
Definition worldgop.h:179
Future< valueT > value_
The data to be sent.
Definition worldgop.h:181
const keyT key_
The distributed id associated with value_.
Definition worldgop.h:180
DelayedSend< keyT, valueT > & operator=(const DelayedSend< keyT, valueT > &)
DelayedSend(const DelayedSend< keyT, valueT > &)
virtual void notify()
Notify this object that the future has been set.
Definition worldgop.h:201
Provides collectives that interoperate with the AM and task interfaces.
Definition worldgop.h:147
int max_reducebcast_msg_size() const
Returns the maximum size of messages (in bytes) sent by reduce and broadcast.
Definition worldgop.h:697
void lazy_sync(const keyT &key, const opT &op, const Group &group) const
Group lazy sync.
Definition worldgop.h:1228
void send_internal(ProcessID dest, const keyT &key, const Future< valueT > &value) const
Send value to dest.
Definition worldgop.h:251
void max(T *buf, size_t nelem)
Inplace global max while still processing AM & tasks.
Definition worldgop.h:884
static void bcast_handler(const AmArg &arg)
Definition worldgop.h:354
void lazy_sync(const keyT &key, const opT &op) const
Lazy sync.
Definition worldgop.h:1182
World & world_
World object that this is a part of.
Definition worldgop.h:149
int set_max_reducebcast_msg_size(int sz)
Set the maximum size of messages (in bytes) sent by reduce and broadcast.
Definition worldgop.h:687
std::shared_ptr< detail::DeferredCleanup > deferred_
Deferred cleanup object.
Definition worldgop.h:150
void reduce(T *buf, std::size_t nelem, opT op)
Inplace global reduction (like MPI all_reduce) while still processing AM & tasks.
Definition worldgop.h:785
void broadcast(T &t)
Broadcast of a scalar from node 0 to all other nodes.
Definition worldgop.h:742
~WorldGopInterface()
Definition worldgop.h:662
void broadcast_serializable(objT &obj, ProcessID root)
Broadcast a serializable object.
Definition worldgop.h:756
void lazy_sync_internal(const ProcessID parent, const ProcessID child0, const ProcessID child1, const keyT &key, const opT &op) const
Start a distributed lazy sync operation.
Definition worldgop.h:317
void sum(T &a)
Global sum of a scalar while still processing AM & tasks.
Definition worldgop.h:933
int max_reducebcast_msg_size_
maximum size of messages (in bytes) sent by reduce and broadcast
Definition worldgop.h:153
void fence(bool debug=false)
Synchronizes all processes in communicator AND globally ensures no pending AM or tasks.
Definition worldgop.cc:161
void broadcast(void *buf, size_t nbyte, ProcessID root, bool dowork=true, Tag bcast_tag=-1)
Broadcasts bytes from process root while still processing AM & tasks.
Definition worldgop.cc:173
void bit_and(T *buf, size_t nelem)
Definition worldgop.h:907
void bcast_internal(const keyT &key, Future< valueT > &value, const ProcessID root) const
Broadcast.
Definition worldgop.h:473
void lazy_sync_parent(const ProcessID parent, const keyT &key, const ProcessID, const ProcessID) const
Lazy sync parent task.
Definition worldgop.h:281
static Future< valueT > recv_internal(const keyT &key)
Receive data from remote node.
Definition worldgop.h:214
void absmin(T *buf, size_t nelem)
Inplace global absmin while still processing AM & tasks.
Definition worldgop.h:890
void bit_or(T *buf, size_t nelem)
Definition worldgop.h:912
std::enable_if<!is_future< valueT >::value >::type send_internal(const ProcessID dest, const keyT &key, const valueT &value) const
Send value to dest.
Definition worldgop.h:228
WorldGopInterface(World &world)
Definition worldgop.h:658
bool set_forbid_fence(bool value)
Set forbid_fence flag to new value and return old value.
Definition worldgop.h:676
void bcast(const keyT &key, Future< valueT > &value, const ProcessID group_root, const Group &group) const
Group broadcast.
Definition worldgop.h:1296
void bcast_task(const keyT &key, const valueT &value, const ProcessID root) const
Broadcast task.
Definition worldgop.h:393
void group_bcast_task(const keyT &key, const valueT &value, const ProcessID group_root, const Group &group) const
Definition worldgop.h:424
void send(const ProcessID dest, const keyT &key, const valueT &value) const
Send value to dest.
Definition worldgop.h:1150
void logic_or(T *buf, size_t nelem)
Definition worldgop.h:927
void bcast(const keyT &key, Future< valueT > &value, const ProcessID root) const
Broadcast.
Definition worldgop.h:1264
int initial_max_reducebcast_msg_size()
Definition worldgop.h:631
void serial_invoke(std::function< void()> action)
Executes an action on single (this) thread after ensuring all other work is done.
Definition worldgop.cc:165
static void group_bcast_handler(const AmArg &arg)
Definition worldgop.h:369
Future< typename detail::result_of< opT >::type > all_reduce(const keyT &key, const valueT &value, const opT &op)
Distributed all reduce.
Definition worldgop.h:1452
Future< typename detail::result_of< opT >::type > reduce_internal(const ProcessID parent, const ProcessID child0, const ProcessID child1, const ProcessID root, const keyT &key, const valueT &value, const opT &op)
Distributed reduce.
Definition worldgop.h:585
bool forbid_fence_
forbid calling fence() in case of several active worlds
Definition worldgop.h:152
static detail::result_of< opT >::type reduce_result_task(const std::vector< Future< typename detail::result_of< opT >::type > > &results, const opT &op)
Definition worldgop.h:561
void absmax(T *buf, size_t nelem)
Inplace global absmax while still processing AM & tasks.
Definition worldgop.h:896
void broadcast(T *buf, size_t nelem, ProcessID root)
Broadcasts typed contiguous data from process root while still processing AM & tasks.
Definition worldgop.h:736
void fence_impl(std::function< void()> epilogue=[]{}, bool pause_during_epilogue=false, bool debug=false)
Implementation of fence.
Definition worldgop.cc:50
static detail::result_of< opT >::type reduce_task(const valueT &value, const opT &op)
Definition worldgop.h:553
void product(T *buf, size_t nelem)
Inplace global product while still processing AM & tasks.
Definition worldgop.h:902
void min(T *buf, size_t nelem)
Inplace global min while still processing AM & tasks.
Definition worldgop.h:878
void logic_and(T *buf, size_t nelem)
Definition worldgop.h:922
static Future< valueT > recv(const ProcessID source, const keyT &key)
Receive data from source.
Definition worldgop.h:1135
Future< typename detail::result_of< opT >::type > reduce(const keyT &key, const valueT &value, const opT &op, const ProcessID group_root, const Group &group)
Distributed group reduce.
Definition worldgop.h:1401
bool debug_
Debug mode.
Definition worldgop.h:151
void min(T &a)
Global min of a scalar while still processing AM & tasks.
Definition worldgop.h:945
void max(T &a)
Global max of a scalar while still processing AM & tasks.
Definition worldgop.h:939
std::vector< T > concat0(const std::vector< T > &v, size_t bufsz=1024 *1024)
Concatenate an STL vector of serializable stuff onto node 0.
Definition worldgop.h:955
Future< typename detail::result_of< opT >::type > reduce(const keyT &key, const valueT &value, const opT &op, const ProcessID root)
Distributed reduce.
Definition worldgop.h:1345
void barrier()
Synchronizes all processes in communicator ... does NOT fence pending AM or tasks.
Definition worldgop.h:702
Future< typename detail::result_of< opT >::type > all_reduce(const keyT &key, const valueT &value, const opT &op, const Group &group)
Distributed, group all reduce.
Definition worldgop.h:1515
void lazy_sync_children(const ProcessID child0, const ProcessID child1, const keyT &key, opT &op, const ProcessID) const
Lazy sync parent task.
Definition worldgop.h:299
void bcast_internal(const keyT &key, Future< valueT > &value, const ProcessID group_root, const Group &group) const
Group broadcast.
Definition worldgop.h:523
void sum(T *buf, size_t nelem)
Inplace global sum while still processing AM & tasks.
Definition worldgop.h:872
void bit_xor(T *buf, size_t nelem)
Definition worldgop.h:917
friend class detail::DeferredCleanup
Definition worldgop.h:155
bool set_debug(bool value)
Set debug flag to new value and return old value.
Definition worldgop.h:669
void broadcast(T &t, ProcessID root)
Broadcast of a scalar from node root to all other nodes.
Definition worldgop.h:748
std::enable_if<!std::is_pointer< T >::value, SafeMPI::Request >::type Isend(const T &datum, int dest, int tag=SafeMPI::DEFAULT_SEND_RECV_TAG) const
Isend one element.
Definition worldmpi.h:308
SafeMPI::Request Irecv(T *buf, int count, int source, int tag=SafeMPI::DEFAULT_SEND_RECV_TAG) const
Async receive data of up to count elements from process source.
Definition worldmpi.h:321
void Send(const T *buf, long lenbuf, int dest, int tag=SafeMPI::DEFAULT_SEND_RECV_TAG) const
Send array of lenbuf elements to process dest.
Definition worldmpi.h:347
void Recv(T *buf, long lenbuf, int src, int tag) const
Receive data of up to lenbuf elements from process src.
Definition worldmpi.h:374
void add(TaskInterface *t)
Add a new local task, taking ownership of the pointer.
Definition world_task_queue.h:466
A parallel world class.
Definition world.h:132
WorldTaskQueue & taskq
Task queue.
Definition world.h:206
ProcessID rank() const
Returns the process rank in this World (same as MPI_Comm_rank()).
Definition world.h:320
static void await(SafeMPI::Request &request, bool dowork=true)
Wait for a MPI request to complete.
Definition world.h:534
WorldMpiInterface & mpi
MPI interface.
Definition world.h:204
ProcessID size() const
Returns the number of processes in this World (same as MPI_Comm_size()).
Definition world.h:330
unsigned long id() const
Definition world.h:315
WorldGopInterface & gop
Global operations.
Definition world.h:207
WorldAmInterface & am
AM interface.
Definition world.h:205
Wraps an archive around a memory buffer for input.
Definition buffer_archive.h:134
Wraps an archive around a memory buffer for output.
Definition buffer_archive.h:59
std::size_t size() const
Return the amount of data stored (counted) in the buffer.
Definition buffer_archive.h:123
Deferred cleanup of shared_ptr's.
Definition deferred_cleanup.h:60
Distributed caching utility.
Definition dist_cache.h:54
static void get_cache_value(const keyT &key, madness::Future< valueT > &value)
Get the cache value associated with key.
Definition dist_cache.h:185
static void set_cache_value(const keyT &key, const valueT &value)
Set the cache value associated with key.
Definition dist_cache.h:146
static bool debug
Definition dirac-hatom.cc:16
auto T(World &world, response_space &f) -> response_space
Definition global_functions.cc:34
Tensor< typename Tensor< T >::scalar_type > arg(const Tensor< T > &t)
Return a new tensor holding the argument of each element of t (complex types only)
Definition tensor.h:2503
static const double v
Definition hatom_sf_dirac.cc:20
Tensor< double > op(const Tensor< double > &x)
Definition kain.cc:508
#define max(a, b)
Definition lda.h:51
#define MADNESS_ASSERT(condition)
Assert a condition that should be free of side-effects since in release builds this might be a no-op.
Definition madness_exception.h:134
Intracomm COMM_WORLD
Definition safempi.cc:67
Definition potentialmanager.cc:41
Namespace for all elements and tools of MADNESS.
Definition DFParameters.h:10
std::pair< uniqueidT, std::size_t > DistributedID
Distributed ID which is used to identify objects.
Definition distributed_id.h:48
double abs(double x)
Definition complexfun.h:48
AmArg * copy_am_arg(const AmArg &arg)
Definition worldam.h:170
AmArg * new_am_arg(const argT &... args)
Convenience template for serializing arguments into a new AmArg.
Definition worldam.h:194
bool quiet()
Check if the MADNESS runtime was initialized for quiet operation.
Definition world.cc:77
void error(const char *msg)
Definition world.cc:139
std::string type(const PairType &n)
Definition PNOParameters.h:18
std::uint64_t cstr_to_memory_size(const char *str)
Unit-aware conversion of a C string to a memory size in bytes (returned as std::uint64_t).
Definition units.cc:14
static long abs(long a)
Definition tensor.h:218
static const double b
Definition nonlinschro.cc:119
static const double a
Definition nonlinschro.cc:118
int posix_memalign(void **memptr, std::size_t alignment, std::size_t size)
Definition posixmem.h:44
Hash functor.
Definition worldhash.h:233
Definition worldgop.h:86
T operator()(const T &a, const T &b) const
Definition worldgop.h:87
Definition worldgop.h:101
T operator()(const T &a, const T &b) const
Definition worldgop.h:102
Definition worldgop.h:109
T operator()(const T &a, const T &b) const
Definition worldgop.h:110
Definition worldgop.h:116
T operator()(const T &a, const T &b) const
Definition worldgop.h:117
Definition worldgop.h:123
T operator()(const T &a, const T &b) const
Definition worldgop.h:124
Definition worldgop.h:161
Definition worldgop.h:163
Definition worldgop.h:130
T operator()(const T &a, const T &b) const
Definition worldgop.h:131
Definition worldgop.h:137
T operator()(const T &a, const T &b) const
Definition worldgop.h:138
Definition worldgop.h:79
T operator()(const T &a, const T &b) const
Definition worldgop.h:80
Definition worldgop.h:94
T operator()(const T &a, const T &b) const
Definition worldgop.h:95
Definition worldgop.h:72
T operator()(const T &a, const T &b) const
Definition worldgop.h:73
Definition worldgop.h:65
T operator()(const T &a, const T &b) const
Definition worldgop.h:66
fnT::result_type type
Definition function_traits.h:97
T type
Type with Future removed.
Definition type_traits.h:110
#define MPI_INT
Definition stubmpi.h:81
#define MPI_BYTE
Definition stubmpi.h:77
AtomicInt sum
Definition test_atomicint.cc:46
std::pair< int, double > valueT
Definition test_binsorter.cc:6
double source(const coordT &r)
Definition testperiodic.cc:48
const char * status[2]
Definition testperiodic.cc:43
Declares the World class for the parallel runtime environment.
Defines TaskInterface and implements WorldTaskQueue and associated stuff.
Defines types used by the parallel runtime.
int ProcessID
Used to clearly identify process number/rank.
Definition worldtypes.h:43
int Tag
Used to clearly identify message tag/type.
Definition worldtypes.h:44