MADNESS  0.10.1
worldgop.h
Go to the documentation of this file.
1 /*
2  This file is part of MADNESS.
3 
4  Copyright (C) 2007,2010 Oak Ridge National Laboratory
5 
6  This program is free software; you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation; either version 2 of the License, or
9  (at your option) any later version.
10 
11  This program is distributed in the hope that it will be useful,
12  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  GNU General Public License for more details.
15 
16  You should have received a copy of the GNU General Public License
17  along with this program; if not, write to the Free Software
18  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 
20  For more information please contact:
21 
22  Robert J. Harrison
23  Oak Ridge National Laboratory
24  One Bethel Valley Road
25  P.O. Box 2008, MS-6367
26 
27  email: harrisonrj@ornl.gov
28  tel: 865-241-3937
29  fax: 865-572-0680
30  */
31 
32 #ifndef MADNESS_WORLD_WORLDGOP_H__INCLUDED
33 #define MADNESS_WORLD_WORLDGOP_H__INCLUDED
34 
35 /// \file worldgop.h
36 /// \brief Implements global operations
37 
38 /// If you can recall the Intel hypercubes, their comm lib used GOP as
39 /// the abbreviation.
40 
#include <algorithm>
#include <cmath>
#include <cstdlib>
#include <functional>
#include <new>
#include <type_traits>
#include <madness/world/world.h>
#include <madness/world/group.h>
#include <madness/world/units.h>
51 
52 namespace madness {
53 
54  // Forward declarations
55  class World;
56  class WorldAmInterface;
57  class WorldTaskQueue;
58  namespace detail {
59 
60  class DeferredCleanup;
61 
62  } // namespace detail
63 
/// Binary functor returning the sum of its two arguments (used by WorldGopInterface::sum).
template <typename T>
struct WorldSumOp {
    inline T operator()(const T& lhs, const T& rhs) const {
        return lhs + rhs;
    }
};
70 
/// Binary functor returning the product of its two arguments (used by WorldGopInterface::product).
template <typename T>
struct WorldMultOp {
    inline T operator()(const T& lhs, const T& rhs) const {
        return lhs * rhs;
    }
};
77 
/// Binary functor returning the larger of its two arguments; \c rhs is
/// returned unless \c lhs > \c rhs (used by WorldGopInterface::max).
template <typename T>
struct WorldMaxOp {
    inline T operator()(const T& lhs, const T& rhs) const {
        if (lhs > rhs)
            return lhs;
        return rhs;
    }
};
84 
/// Binary functor returning the larger absolute value of its two arguments
/// (used by WorldGopInterface::absmax).
///
/// \c abs is looked up as \c std::abs plus ADL. An unqualified call may only
/// see the C function <tt>int abs(int)</tt>, which would silently truncate
/// floating-point and \c long arguments. Unsigned types are their own
/// absolute value and are compared directly.
template <typename T>
struct WorldAbsMaxOp {
    inline T operator()(const T& a, const T& b) const {
        if constexpr (std::is_unsigned_v<T>) {
            return a > b ? a : b;
        } else {
            using std::abs;  // std overloads for builtin types; ADL still finds user-defined abs
            const auto abs_a = abs(a);
            const auto abs_b = abs(b);
            return abs_a > abs_b ? abs_a : abs_b;
        }
    }
};
91 
/// Binary functor returning the smaller of its two arguments; \c rhs is
/// returned unless \c lhs < \c rhs (used by WorldGopInterface::min).
template <typename T>
struct WorldMinOp {
    inline T operator()(const T& lhs, const T& rhs) const {
        if (lhs < rhs)
            return lhs;
        return rhs;
    }
};
98 
/// Binary functor returning the smaller absolute value of its two arguments
/// (used by WorldGopInterface::absmin).
///
/// \c abs is looked up as \c std::abs plus ADL. An unqualified call may only
/// see the C function <tt>int abs(int)</tt>, which would silently truncate
/// floating-point and \c long arguments. Unsigned types are their own
/// absolute value and are compared directly.
template <typename T>
struct WorldAbsMinOp {
    inline T operator()(const T& a, const T& b) const {
        if constexpr (std::is_unsigned_v<T>) {
            return a < b ? a : b;
        } else {
            using std::abs;  // std overloads for builtin types; ADL still finds user-defined abs
            const auto abs_a = abs(a);
            const auto abs_b = abs(b);
            return abs_a < abs_b ? abs_a : abs_b;
        }
    }
};
105 
/// Binary functor returning the bitwise AND of its two arguments (used by WorldGopInterface::bit_and).
template <typename T>
struct WorldBitAndOp {
    inline T operator()(const T& lhs, const T& rhs) const {
        return (lhs & rhs);
    }
};
112 
/// Binary functor returning the bitwise OR of its two arguments (used by WorldGopInterface::bit_or).
template <typename T>
struct WorldBitOrOp {
    inline T operator()(const T& lhs, const T& rhs) const {
        return (lhs | rhs);
    }
};
119 
/// Binary functor returning the bitwise XOR of its two arguments (used by WorldGopInterface::bit_xor).
template <typename T>
struct WorldBitXorOp {
    inline T operator()(const T& lhs, const T& rhs) const {
        return (lhs ^ rhs);
    }
};
126 
/// Binary functor returning the logical AND of its two arguments, converted
/// back to \c T (used by WorldGopInterface::logic_and).
template <typename T>
struct WorldLogicAndOp {
    inline T operator()(const T& a, const T& b) const {
        return a && b;
    }
};
133 
/// Binary functor returning the logical OR of its two arguments, converted
/// back to \c T (used by WorldGopInterface::logic_or).
template <typename T>
struct WorldLogicOrOp {
    inline T operator()(const T& lhs, const T& rhs) const {
        return (lhs || rhs);
    }
};
140 
141 
142  /// Provides collectives that interoperate with the AM and task interfaces
143 
144  /// If native AM interoperates with MPI we probably should map these to MPI.
146  private:
147  World& world_; ///< World object that this is a part of
148  std::shared_ptr<detail::DeferredCleanup> deferred_; ///< Deferred cleanup object.
149  bool debug_; ///< Debug mode
150  bool forbid_fence_=false; ///< forbid calling fence() in case of several active worlds
151  int max_reducebcast_msg_size_ = std::numeric_limits<int>::max(); ///< maximum size of messages (in bytes) sent by reduce and broadcast
152 
154 
// Message tags
//
// Empty tag types distinguishing the internal messages of each collective.
// NOTE(review): presumably each collective passes one of these as the `tagT`
// template argument of the *_internal helpers, which attach it to the user key
// (TaggedKey / ProcessKey). That would keep different collectives that use the
// same key from colliding in the distributed cache. Confirm at the call sites.
struct PointToPointTag { };
struct LazySyncTag { };
struct GroupLazySyncTag { };
struct BcastTag { };
struct GroupBcastTag { };
struct ReduceTag { };
struct GroupReduceTag { };
struct AllReduceTag { };
struct GroupAllReduceTag { };
165 
166 
167  /// Delayed send callback object
168 
/// This callback object is used to send local data to a remote process
170  /// once it has been set.
171  /// \tparam keyT The data key
172  /// \tparam valueT The type of data to be sent
173  template <typename keyT, typename valueT>
175  private:
176  World& world_; ///< The communication world
177  const ProcessID dest_; ///< The destination process id
178  const keyT key_; ///< The distributed id associated with \c value_
179  Future<valueT> value_; ///< The data to be sent
180 
181  // Not allowed
184 
185  public:
186 
187  /// Constructor
188  DelayedSend(World& world, const ProcessID dest,
189  const keyT& key, const Future<valueT>& value) :
190  world_(world), dest_(dest), key_(key), value_(value)
191  { }
192 
193  virtual ~DelayedSend() { }
194 
195  /// Notify this object that the future has been set.
196 
197  /// This will set the value of the future on the remote node and delete
198  /// this callback object.
199  virtual void notify() {
202  delete this;
203  }
204  }; // class DelayedSend
205 
206  /// Receive data from remote node
207 
208  /// \tparam valueT The data type stored in cache
209  /// \param key The distributed ID
210  /// \return A future to the data
211  template <typename valueT, typename keyT>
212  static Future<valueT> recv_internal(const keyT& key) {
213  return detail::DistCache<keyT>::template get_cache_value<valueT>(key);
214  }
215 
216  /// Send \c value to \c dest
217 
218  /// Send non-future data to \c dest.
219  /// \tparam keyT The key type
220  /// \tparam valueT The value type
221  /// \param dest The node where the data will be sent
222  /// \param key The key that is associated with the data
223  /// \param value The data to be sent to \c dest
224  template <typename keyT, typename valueT>
225  typename std::enable_if<!is_future<valueT>::value >::type
226  send_internal(const ProcessID dest, const keyT& key, const valueT& value) const {
227  typedef detail::DistCache<keyT> dist_cache;
228 
229  if(world_.rank() == dest) {
230  // When dest is this process, skip the task and set the future immediately.
231  dist_cache::set_cache_value(key, value);
232  } else {
233  // Spawn a remote task to set the value
234  world_.taskq.add(dest, dist_cache::template set_cache_value<valueT>, key,
235  value, TaskAttributes::hipri());
236  }
237  }
238 
239  /// Send \c value to \c dest
240 
241  /// Send data that is stored in a future to \c dest. The data in
242  /// \c value is only sent to the remote process once it has been set.
243  /// \tparam keyT The key type
244  /// \tparam valueT The value type
245  /// \param dest The node where the data will be sent
246  /// \param key The key that is associated with the data
247  /// \param value The data to be sent to \c dest
248  template <typename keyT, typename valueT>
249  void send_internal(ProcessID dest, const keyT& key, const Future<valueT>& value) const {
250  typedef detail::DistCache<keyT> dist_cache;
251 
252  if(world_.rank() == dest) {
253  dist_cache::set_cache_value(key, value);
254  } else {
255  // The destination is not this node, so send it to the destination.
256  if(value.probe()) {
257  // Spawn a remote task to set the value
258  world_.taskq.add(dest, dist_cache::template set_cache_value<valueT>, key,
259  value.get(), TaskAttributes::hipri());
260  } else {
261  // The future is not ready, so create a callback object that will
262  // send value to the destination node when it is ready.
263  DelayedSend<keyT, valueT>* delayed_send_callback =
264  new DelayedSend<keyT, valueT>(world_, dest, key, value);
265  const_cast<Future<valueT>&>(value).register_callback(delayed_send_callback);
266 
267  }
268  }
269  }
270 
271  /// Lazy sync parent task
272 
273  /// Send signal to the parent process in the binary tree for a lazy sync
274  /// operation.
275  /// \tparam keyT The key type
276  /// \param parent The parent process of this process in the binary tree
277  /// \param key The lazy sync key
278  template <typename keyT>
279  void lazy_sync_parent(const ProcessID parent, const keyT& key,
280  const ProcessID, const ProcessID) const
281  {
282  send_internal(parent, key, key.proc());
283  }
284 
285  /// Lazy sync parent task
286 
287  /// Send signal to the child processes in the binary tree for a lazy
288  /// sync operation. After the signal has been sent to the children, the
289  /// sync operation, \c op, will be run.
290  /// \tparam keyT The key type
291  /// \tparam opT The sync operation type
292  /// \param child0 The first child process of this process in the binary tree
293  /// \param child1 The second child process of this process in the binary tree
294  /// \param key The key associated with the sync operation
295  /// \param op The sync operation that will be run
296  template <typename keyT, typename opT>
297  void lazy_sync_children(const ProcessID child0, const ProcessID child1,
298  const keyT& key, opT& op, const ProcessID) const
299  {
300  // Signal children to execute the operation.
301  if(child0 != -1)
302  send_internal(child0, key, 1);
303  if(child1 != -1)
304  send_internal(child1, key, 1);
305 
306  // Execute the operation on this process.
307  op();
308  }
309 
310  /// Start a distributed lazy sync operation
311 
312  /// \param key The sync key
313  /// \param op The sync operation to be executed on this process
314  template <typename tagT, typename keyT, typename opT>
315  void lazy_sync_internal(const ProcessID parent, const ProcessID child0,
316  const ProcessID child1, const keyT& key, const opT& op) const {
317  typedef ProcessKey<keyT, tagT> key_type;
318 
319  // Get signals from parent and children.
320  madness::Future<ProcessID> child0_signal = (child0 != -1 ?
321  recv_internal<ProcessID>(key_type(key, child0)) :
323  madness::Future<ProcessID> child1_signal = (child1 != -1 ?
324  recv_internal<ProcessID>(key_type(key, child1)) :
326  madness::Future<ProcessID> parent_signal = (parent != -1 ?
327  recv_internal<ProcessID>(key_type(key, parent)) :
329 
330  // Construct the task that notifies children to run the operation
331  key_type my_key(key, world_.rank());
332  auto lazy_sync_children_fn = & WorldGopInterface::template lazy_sync_children<key_type, opT>;
333  world_.taskq.add(*this, lazy_sync_children_fn,
334  child0_signal, child1_signal, my_key, op, parent_signal,
336 
337  // Send signal to parent
338  if(parent != -1) {
339  if(child0_signal.probe() && child1_signal.probe())
340  send_internal(parent, my_key, world_.rank());
341  else {
342  auto lazy_sync_parent_fn = & WorldGopInterface::template lazy_sync_parent<key_type>;
343  world_.taskq.add(*this, lazy_sync_parent_fn,
344  parent, my_key, child0_signal, child1_signal,
346  }
347  }
348  }
349 
350 
351  template <typename keyT, typename valueT, typename taskfnT>
352  static void bcast_handler(const AmArg& arg) {
353  // Deserialize message arguments
354  taskfnT taskfn;
355  keyT key;
356  valueT value;
357  ProcessID root;
358 
359  arg & taskfn & key & value & root;
360 
361  // Add task to queue
362  arg.get_world()->taskq.add(arg.get_world()->gop, taskfn, key,
363  value, root, TaskAttributes::hipri());
364  }
365 
366  template <typename keyT, typename valueT, typename taskfnT>
367  static void group_bcast_handler(const AmArg& arg) {
368  // Deserialize message arguments
369  taskfnT taskfn;
370  keyT key;
371  valueT value;
372  ProcessID group_root;
373  DistributedID group_key;
374 
375  arg & taskfn & key & value & group_root & group_key;
376 
377  // Get the local group
378  const Future<Group> group = Group::get_group(group_key);
379 
380  // Add task to queue
381  arg.get_world()->taskq.add(arg.get_world()->gop, taskfn, key, value,
382  group_root, group, TaskAttributes::hipri());
383  }
384 
385 
386  /// Broadcast task
387 
388  /// This task will set the local cache with the broadcast data and send
389  /// it to child processes in the binary tree.
390  template <typename keyT, typename valueT>
391  void bcast_task(const keyT& key, const valueT& value, const ProcessID root) const {
392  typedef void (WorldGopInterface::*taskfnT)(const keyT&, const valueT&,
393  const ProcessID) const;
394 
395  // Compute binary tree data
396  ProcessID parent = -1, child0 = -1, child1 = -1;
397  world_.mpi.binary_tree_info(root, parent, child0, child1);
398 
399  // Set the local data, except on the root process
400  if(parent != -1)
402 
403  if(child0 != -1) { // Check that this process has children in the binary tree
404 
405  // Get handler function and arguments
406  void (*handler)(const AmArg&) =
407  & WorldGopInterface::template bcast_handler<keyT, valueT, taskfnT>;
408  AmArg* const args0 = new_am_arg(
409  & WorldGopInterface::template bcast_task<keyT, valueT>,
410  key, value, root);
411 
412  // Send active message to children
413  if(child1 != -1) {
414  AmArg* const args1 = copy_am_arg(*args0);
415  world_.am.send(child1, handler, args1, RMI::ATTR_UNORDERED);
416  }
417  world_.am.send(child0, handler, args0, RMI::ATTR_UNORDERED);
418  }
419  }
420 
421  template <typename keyT, typename valueT>
422  void group_bcast_task(const keyT& key, const valueT& value,
423  const ProcessID group_root, const Group& group) const
424  {
425  typedef void (WorldGopInterface::*taskfnT)(const keyT&, const valueT&,
426  const ProcessID, const Group&) const;
427 
428  // Set the local data, except on the root process
429  ProcessID parent = -1, child0 = -1, child1 = -1;
430  group.make_tree(group_root, parent, child0, child1);
431 
432  // Set the local data
433  if(parent != -1) {
435  group.remote_update();
436  }
437 
438  if(child0 != -1) { // Check that this process has children in the binary tree
439 
440  // Get handler function and arguments
441  void (*handler)(const AmArg&) =
442  & WorldGopInterface::template group_bcast_handler<keyT, valueT, taskfnT>;
443  AmArg* const args0 = new_am_arg(
444  & WorldGopInterface::template group_bcast_task<keyT, valueT>,
445  key, value, group_root, group.id());
446 
447  // Send active message to children
448  if(child1 != -1) {
449  AmArg* const args1 = copy_am_arg(*args0);
450  world_.am.send(child1, handler, args1, RMI::ATTR_UNORDERED);
451  }
452  world_.am.send(child0, handler, args0, RMI::ATTR_UNORDERED);
453  }
454  }
455 
456  /// Broadcast
457 
458  /// Broadcast data from the \c root process to all processes in \c world.
459  /// The input/output data is held by \c value.
460  /// \tparam tagT The tag type that is attached to \c keyT
461  /// \tparam keyT The base key type
462  /// \tparam valueT The value type that will be broadcast
463  /// \param[in] key The key associated with this broadcast
464  /// \param[in,out] value On the \c root process, this is used as the input
465  /// data that will be broadcast to all other processes in the group.
466  /// On other processes it is used as the output to the broadcast
467  /// \param root The process that owns the data to be broadcast
468  /// \throw madness::Exception When \c value has been set, except on the
469  /// \c root process.
470  template <typename tagT, typename keyT, typename valueT>
471  void bcast_internal(const keyT& key, Future<valueT>& value, const ProcessID root) const {
472  MADNESS_ASSERT((root >= 0) && (root < world_.size()));
473  MADNESS_ASSERT((world_.rank() == root) || (! value.probe()));
474 
475  // Add operation tag to key
476  typedef TaggedKey<keyT, tagT> key_type;
477  const key_type tagged_key(key);
478 
479  if(world_.size() > 1) { // Do nothing for the trivial case
480  if(world_.rank() == root) {
481  // This process owns the data to be broadcast.
482 
483  // Spawn remote tasks that will set the local cache for this
484  // broadcast on other nodes.
485  if(value.probe())
486  // The value is ready so send it now
487  bcast_task(tagged_key, value.get(), root);
488  else {
489  // The value is not ready so spawn a task to send the
490  // data when it is ready.
491  auto bcast_task_fn = & WorldGopInterface::template bcast_task<key_type, valueT>;
492  world_.taskq.add(*this, bcast_task_fn,
493  tagged_key, value, root, TaskAttributes::hipri());
494  }
495  } else {
496  MADNESS_ASSERT(! value.probe());
497 
498  // Get the broadcast value from local cache
500  }
501  }
502  }
503 
504  /// Group broadcast
505 
506  /// Broadcast data from the \c group_root process to all processes in
507  /// \c group. The input/output data is held by \c value.
508  /// \tparam tagT The tag type that is attached to \c keyT
509  /// \tparam keyT The base key type
510  /// \tparam valueT The value type that will be broadcast
511  /// \param[in] key The key associated with this broadcast
512  /// \param[in,out] value On the \c group_root process, this is used as the
513  /// input data that will be broadcast to all other processes in the group.
514  /// On other processes it is used as the output to the broadcast
515  /// \param group_root The process in \c group that owns the data to be
516  /// broadcast
517  /// \param group The process group where value will be broadcast
518  /// \throw madness::Exception When \c value has been set, except on the
519  /// \c group_root process.
520  template <typename tagT, typename keyT, typename valueT>
521  void bcast_internal(const keyT& key, Future<valueT>& value,
522  const ProcessID group_root, const Group& group) const
523  {
524  // Construct the internal broadcast key
525  typedef TaggedKey<keyT, tagT> key_type;
526  const key_type tagged_key(key);
527 
528  if(group.rank() == group_root) {
529  // This process owns the data to be broadcast.
530  if(value.probe())
531  group_bcast_task(tagged_key, value.get(), group_root, group);
532  else {
533  auto group_bcast_task_fn = & WorldGopInterface::template group_bcast_task<key_type, valueT>;
534  world_.taskq.add(this, group_bcast_task_fn,
535  tagged_key, value, group_root, group,
537  }
538  } else {
539  MADNESS_ASSERT(! value.probe());
540 
541  // This is not the root process, so retrieve the broadcast data
543 
544  // Increment local use counter for group
545  group.local_update();
546  }
547  }
548 
549  template <typename valueT, typename opT>
550  static typename detail::result_of<opT>::type
551  reduce_task(const valueT& value, const opT& op) {
552  typename detail::result_of<opT>::type result = op();
553  op(result, value);
554  return result;
555  }
556 
557  template <typename opT>
558  static typename detail::result_of<opT>::type
559  reduce_result_task(const std::vector<Future<typename detail::result_of<opT>::type> >& results,
560  const opT& op)
561  {
562  MADNESS_ASSERT(results.size() != 0ul);
563  Future<typename detail::result_of<opT>::type> result = results.front();
564  for(std::size_t i = 1ul; i < results.size(); ++i)
565  op(result.get(), results[i].get());
566  return result.get();
567  }
568 
569  /// Distributed reduce
570 
571  /// \tparam tagT The tag type to be added to the key type
572  /// \tparam keyT The key type
573  /// \tparam valueT The data type to be reduced
574  /// \tparam opT The reduction operation type
575  /// \param key The key associated with this reduction
576  /// \param value The local value to be reduced
577  /// \param op The reduction operation to be applied to local and remote data
578  /// \param root The process that will receive the result of the reduction
579  /// \return A future to the reduce value on the root process, otherwise an
580  /// uninitialized future that may be ignored.
581  template <typename tagT, typename keyT, typename valueT, typename opT>
583  reduce_internal(const ProcessID parent, const ProcessID child0,
584  const ProcessID child1, const ProcessID root, const keyT& key,
585  const valueT& value, const opT& op)
586  {
587  // Create tagged key
588  typedef ProcessKey<keyT, tagT> key_type;
589  typedef typename detail::result_of<opT>::type result_type;
590  typedef typename remove_future<valueT>::type value_type;
591  std::vector<Future<result_type> > results;
592  results.reserve(3);
593 
594  // Add local data to vector of values to reduce
595  results.push_back(world_.taskq.add(WorldGopInterface::template reduce_task<value_type, opT>,
596  value, op, TaskAttributes::hipri()));
597 
598  // Reduce child data
599  if(child0 != -1)
600  results.push_back(recv_internal<result_type>(key_type(key, child0)));
601  if(child1 != -1)
602  results.push_back(recv_internal<result_type>(key_type(key, child1)));
603 
604  // Submit the local reduction task
605  Future<result_type> local_result =
606  world_.taskq.add(WorldGopInterface::template reduce_result_task<opT>,
607  results, op, TaskAttributes::hipri());
608 
609  // Send reduced value to parent or, if this is the root process, set the
610  // result future.
611  if(parent == -1)
612  return local_result;
613  else
614  send_internal(parent, key_type(key, world_.rank()), local_result);
615 
617  }
618 
619  /// Implementation of fence
620 
621  /// \param[in] epilogue the action to execute (by the calling thread) immediately after the fence
622  /// \param[in] pause_during_epilogue whether to suspend work while executing epilogue
623  /// \param[in] debug set to true to print progress statistics using madness::print(); the default is false.
624  /// \warning currently only \c pause_during_epilogue=false is supported
625  void fence_impl(std::function<void()> epilogue = []{},
626  bool pause_during_epilogue = false,
627  bool debug = false);
628 
630  int result = std::numeric_limits<int>::max();
631  const auto* initial_max_reducebcast_msg_size_cstr = std::getenv("MAD_MAX_REDUCEBCAST_MSG_SIZE");
632  if (initial_max_reducebcast_msg_size_cstr) {
633  auto result_u64 = cstr_to_memory_size(initial_max_reducebcast_msg_size_cstr);
634  const auto do_print = SafeMPI::COMM_WORLD.Get_rank() == 0 && !madness::quiet();
635  if (result_u64>std::numeric_limits<int>::max()) {
636  if (do_print)
637  std::cout
638  << "!!MADNESS WARNING: Invalid value for environment variable MAD_MAX_REDUCEBCAST_MSG_SIZE.\n"
639  << "!!MADNESS WARNING: MAD_MAX_REDUCEBCAST_MSG_SIZE = "
640  << result_u64 << "\n";
642  }
643  result = static_cast<int>(result_u64);
644  if(do_print) {
645  std::cout
646  << "MADNESS max msg size for GOP reduce/broadcast set to "
647  << result << " bytes.\n";
648  }
649  }
650  return result;
651  }
652 
653  public:
654 
655  // In the World constructor can ONLY rely on MPI and MPI being initialized
657  world_(world), deferred_(new detail::DeferredCleanup()), debug_(false), max_reducebcast_msg_size_(initial_max_reducebcast_msg_size())
658  { }
659 
661  deferred_->destroy(true);
662  deferred_->do_cleanup();
663  }
664 
665 
666  /// Set debug flag to new value and return old value
667  bool set_debug(bool value) {
668  bool status = debug_;
669  debug_ = value;
670  return status;
671  }
672 
673  /// Set forbid_fence flag to new value and return old value
674  bool set_forbid_fence(bool value) {
675  bool status = forbid_fence_;
676  forbid_fence_ = value;
677  return status;
678  }
679 
680  /// Set the maximum size of messages (in bytes) sent by reduce and broadcast
681 
682  /// \param sz the maximum size of messages (in bytes) sent by reduce and broadcast
683  /// \return the previous maximum size of messages (in bytes) sent by reduce and broadcast
684  /// \pre `sz>0`
686  MADNESS_ASSERT(sz>0);
689  }
690 
691 
692  /// Returns the maximum size of messages (in bytes) sent by reduce and broadcast
693 
694  /// \return the maximum size of messages (in bytes) sent by reduce and broadcast
697  }
698 
699  /// Synchronizes all processes in communicator ... does NOT fence pending AM or tasks
700  void barrier() {
701  long i = world_.rank();
702  sum(i);
703  if (i != world_.size()*(world_.size()-1)/2) error("bad value after sum in barrier");
704  }
705 
706 
707  /// Synchronizes all processes in communicator AND globally ensures no pending AM or tasks
708 
709  /// \internal Runs Dykstra-like termination algorithm on binary tree by
710  /// locally ensuring ntask=0 and all am sent and processed,
711  /// and then participating in a global sum of nsent and nrecv.
712  /// Then globally checks that nsent=nrecv and that both are
713  /// constant over two traversals. We are then sure
714  /// that all tasks and AM are processed and there no AM in
715  /// flight.
716  /// \param[in] debug set to true to print progress statistics using madness::print(); the default is false.
717  void fence(bool debug = false);
718 
719  /// Executes an action on single (this) thread after ensuring all other work is done
720 
721  /// \param[in] action the action to execute (by the calling thread)
722  void serial_invoke(std::function<void()> action);
723 
724  /// Broadcasts bytes from process root while still processing AM & tasks
725 
726  /// Optimizations can be added for long messages
727  void broadcast(void* buf, size_t nbyte, ProcessID root, bool dowork = true, Tag bcast_tag = -1);
728 
729 
730  /// Broadcasts typed contiguous data from process root while still processing AM & tasks
731 
732  /// Optimizations can be added for long messages
733  template <typename T, typename = std::enable_if_t<std::is_trivially_copyable_v<T>>>
734  inline void broadcast(T* buf, size_t nelem, ProcessID root) {
735  broadcast((void *) buf, nelem*sizeof(T), root);
736  }
737 
738  /// Broadcast of a scalar from node 0 to all other nodes
739  template <typename T, typename = std::enable_if_t<std::is_trivially_copyable_v<T>>>
740  void broadcast(T& t) {
741  broadcast(&t, 1, 0);
742  }
743 
744  /// Broadcast of a scalar from node root to all other nodes
745  template <typename T, typename = std::enable_if_t<std::is_trivially_copyable_v<T>>>
746  void broadcast(T& t, ProcessID root) {
747  broadcast(&t, 1, root);
748  }
749 
750  /// Broadcast a serializable object
751  template <typename objT,
752  typename = std::void_t<decltype(std::declval<archive::BufferInputArchive&>()&std::declval<objT&>())>,
753  typename = std::void_t<decltype(std::declval<archive::BufferOutputArchive&>()&std::declval<const objT&>())>>
754  void broadcast_serializable(objT& obj, ProcessID root) {
755  MADNESS_ASSERT(root < world_.size());
756  if (world_.size() == 1) return;
757 
758  size_t BUFLEN;
759  if (world_.rank() == root) {
761  count & obj;
762  BUFLEN = count.size();
763  }
764  broadcast(BUFLEN, root);
765 
766  unsigned char* buf = new unsigned char[BUFLEN];
767  if (world_.rank() == root) {
768  archive::BufferOutputArchive ar(buf,BUFLEN);
769  ar & obj;
770  }
771  broadcast(buf, BUFLEN, root);
772  if (world_.rank() != root) {
773  archive::BufferInputArchive ar(buf,BUFLEN);
774  ar & obj;
775  }
776  delete [] buf;
777  }
778 
779  /// Inplace global reduction (like MPI all_reduce) while still processing AM & tasks
780 
781  /// Optimizations can be added for long messages and to reduce the memory footprint
782  template <typename T, class opT>
783  void reduce(T* buf, std::size_t nelem, opT op) {
784  static_assert(std::is_trivially_copyable_v<T>, "T must be trivially copyable");
785 
786  ProcessID parent, child0, child1;
787  world_.mpi.binary_tree_info(0, parent, child0, child1);
788  const std::size_t nelem_per_maxmsg =
789  max_reducebcast_msg_size() / sizeof(T);
790 
791  const auto buf_size = ((sizeof(T) * std::min(nelem_per_maxmsg, nelem) +
792  std::alignment_of_v<T> - 1) /
793  std::alignment_of_v<T>) * std::alignment_of_v<T>;
794  struct free_dtor {
795  void operator()(T *ptr) {
796  if (ptr != nullptr)
797  std::free(ptr);
798  };
799  };
800  using sptr_t = std::unique_ptr<T[], free_dtor>;
801 
802  auto aligned_buf_alloc = [&]() -> T* {
803  // posix_memalign requires alignment to be an integer multiple of sizeof(void*)!! so ensure that
804  const std::size_t alignment =
805  ((std::alignment_of_v<T> + sizeof(void *) - 1) /
806  sizeof(void *)) *
807  sizeof(void *);
808 #ifdef HAVE_POSIX_MEMALIGN
809  void *ptr;
810  if (posix_memalign(&ptr, alignment, buf_size) != 0) {
811  throw std::bad_alloc();
812  }
813  return static_cast<T *>(ptr);
814 #else
815  return static_cast<T *>(std::aligned_alloc(alignment, buf_size));
816 #endif
817  };
818 
819  sptr_t buf0;
820  if (child0 != -1)
821  buf0 = sptr_t(aligned_buf_alloc(),
822  free_dtor{});
823  sptr_t buf1(nullptr);
824  if (child1 != -1)
825  buf1 = sptr_t(aligned_buf_alloc(),
826  free_dtor{});
827 
828  auto reduce_impl = [&,this](T* buf, size_t nelem) {
829  MADNESS_ASSERT(nelem <= nelem_per_maxmsg);
830  SafeMPI::Request req0, req1;
831  Tag gsum_tag = world_.mpi.unique_tag();
832 
833  if (child0 != -1)
834  req0 = world_.mpi.Irecv(buf0.get(), nelem * sizeof(T), MPI_BYTE,
835  child0, gsum_tag);
836  if (child1 != -1)
837  req1 = world_.mpi.Irecv(buf1.get(), nelem * sizeof(T), MPI_BYTE,
838  child1, gsum_tag);
839 
840  if (child0 != -1) {
841  World::await(req0);
842  for (long i = 0; i < (long)nelem; ++i)
843  buf[i] = op(buf[i], buf0[i]);
844  }
845  if (child1 != -1) {
846  World::await(req1);
847  for (long i = 0; i < (long)nelem; ++i)
848  buf[i] = op(buf[i], buf1[i]);
849  }
850 
851  if (parent != -1) {
852  req0 = world_.mpi.Isend(buf, nelem * sizeof(T), MPI_BYTE, parent,
853  gsum_tag);
854  World::await(req0);
855  }
856 
857  broadcast(buf, nelem, 0);
858  };
859 
860  while (nelem) {
861  const int n = std::min(nelem_per_maxmsg, nelem);
862  reduce_impl(buf, n);
863  nelem -= n;
864  buf += n;
865  }
866  }
867 
868  /// Inplace global sum while still processing AM & tasks
869  template <typename T>
870  inline void sum(T* buf, size_t nelem) {
871  reduce< T, WorldSumOp<T> >(buf, nelem, WorldSumOp<T>());
872  }
873 
874  /// Inplace global min while still processing AM & tasks
875  template <typename T>
876  inline void min(T* buf, size_t nelem) {
877  reduce< T, WorldMinOp<T> >(buf, nelem, WorldMinOp<T>());
878  }
879 
880  /// Inplace global max while still processing AM & tasks
881  template <typename T>
882  inline void max(T* buf, size_t nelem) {
883  reduce< T, WorldMaxOp<T> >(buf, nelem, WorldMaxOp<T>());
884  }
885 
886  /// Inplace global absmin while still processing AM & tasks
887  template <typename T>
888  inline void absmin(T* buf, size_t nelem) {
889  reduce< T, WorldAbsMinOp<T> >(buf, nelem, WorldAbsMinOp<T>());
890  }
891 
892  /// Inplace global absmax while still processing AM & tasks
893  template <typename T>
894  inline void absmax(T* buf, size_t nelem) {
895  reduce< T, WorldAbsMaxOp<T> >(buf, nelem, WorldAbsMaxOp<T>());
896  }
897 
898  /// Inplace global product while still processing AM & tasks
899  template <typename T>
900  inline void product(T* buf, size_t nelem) {
901  reduce< T, WorldMultOp<T> >(buf, nelem, WorldMultOp<T>());
902  }
903 
904  template <typename T>
905  inline void bit_and(T* buf, size_t nelem) {
906  reduce< T, WorldBitAndOp<T> >(buf, nelem, WorldBitAndOp<T>());
907  }
908 
909  template <typename T>
910  inline void bit_or(T* buf, size_t nelem) {
911  reduce< T, WorldBitOrOp<T> >(buf, nelem, WorldBitOrOp<T>());
912  }
913 
914  template <typename T>
915  inline void bit_xor(T* buf, size_t nelem) {
916  reduce< T, WorldBitXorOp<T> >(buf, nelem, WorldBitXorOp<T>());
917  }
918 
919  template <typename T>
920  inline void logic_and(T* buf, size_t nelem) {
921  reduce< T, WorldLogicAndOp<T> >(buf, nelem, WorldLogicAndOp<T>());
922  }
923 
924  template <typename T>
925  inline void logic_or(T* buf, size_t nelem) {
926  reduce< T, WorldLogicOrOp<T> >(buf, nelem, WorldLogicOrOp<T>());
927  }
928 
/// Global sum of a scalar while still processing AM & tasks

/// \param[in,out] a local value on input, global sum on output
template <typename T>
void sum(T& a) {
    const size_t one_element = 1;
    sum(&a, one_element);
}
934 
935  /// Global max of a scalar while still processing AM & tasks
936  template <typename T>
937  void max(T& a) {
938  max(&a, 1);
939  }
940 
941  /// Global min of a scalar while still processing AM & tasks
942  template <typename T>
943  void min(T& a) {
944  min(&a, 1);
945  }
946 
947  /// Concatenate an STL vector of serializable stuff onto node 0
948 
949  /// \param[in] v input vector
950  /// \param[in] bufsz the max number of bytes in the result; must be less than std::numeric_limits<int>::max()
951  /// \return on rank 0 returns the concatenated vector, elsewhere returns an empty vector
952  template <typename T>
953  std::vector<T> concat0(const std::vector<T>& v, size_t bufsz=1024*1024) {
955  // bufsz must be multiple of alignment!!! so ensure that
956  bufsz = ((bufsz + sizeof(void*) - 1) / sizeof(void*)) * sizeof(void*);
957 
958  ProcessID parent, child0, child1;
959  world_.mpi.binary_tree_info(0, parent, child0, child1);
960  int child0_nbatch = 0, child1_nbatch = 0;
961 
962  struct free_dtor {
963  void operator()(std::byte *ptr) {
964  if (ptr != nullptr)
965  std::free(ptr);
966  };
967  };
968  using sptr_t = std::unique_ptr<std::byte[], free_dtor>;
969 
970  auto buf0 = sptr_t(static_cast<std::byte *>(
971  std::aligned_alloc(sizeof(void *), bufsz)),
972  free_dtor{});
973  auto buf1 = sptr_t(static_cast<std::byte *>(
974  std::aligned_alloc(sizeof(void *), bufsz)),
975  free_dtor{});
976 
977  // transfer data in chunks at most this large
978  const int batch_size = static_cast<int>(
979  std::min(static_cast<size_t>(max_reducebcast_msg_size()), bufsz));
980 
981  // precompute max # of tags any node ... will need, and allocate them on every node to avoid tag counter divergence
982  const int max_nbatch = bufsz / batch_size;
983  // one tag is reserved for sending the number of messages to expect and the size of the last message
984  const int max_ntags = max_nbatch + 1;
985  MADNESS_ASSERT(max_nbatch < world_.mpi.unique_tag_period());
986  std::vector<Tag> tags; // stores tags used to send each batch
987  tags.reserve(max_nbatch);
988  for(int t=0; t<max_ntags; ++t) tags.push_back(world_.mpi.unique_tag());
989 
990  if (child0 != -1 || child1 != -1) {
991  // receive # of batches
992 
993  auto receive_nbatch = [&,this]() {
994  if (child0 != -1) {
995  world_.mpi.Recv(&child0_nbatch, 1, MPI_INT, child0,
996  tags[0]);
997  }
998  if (child1 != -1) {
999  world_.mpi.Recv(&child1_nbatch, 1, MPI_INT, child1,
1000  tags[0]);
1001  }
1002  };
1003 
1004  receive_nbatch();
1005 
1006  // receive data in batches
1007 
1008  auto receive_batch = [&,this](const int batch, const size_t buf_offset) {
1009  SafeMPI::Request req0, req1;
1010  if (child0 != -1 && batch < child0_nbatch) {
1011  int msg_size = batch_size;
1012  // if last batch, receive # of bytes to expect
1013  if (batch + 1 == child0_nbatch) {
1014  auto req = world_.mpi.Irecv(
1015  &msg_size, 1, MPI_INT, child0, tags[0]);
1016  World::await(req);
1017  }
1018 
1019  req0 = world_.mpi.Irecv(buf0.get() + buf_offset,
1020  msg_size, MPI_BYTE, child0,
1021  tags[batch + 1]);
1022  }
1023  if (child1 != -1 && batch < child1_nbatch) {
1024  int msg_size = batch_size;
1025  // if last batch, receive # of bytes to expect
1026  if (batch + 1 == child1_nbatch) {
1027  auto req = world_.mpi.Irecv(
1028  &msg_size, 1, MPI_INT, child1, tags[0]);
1029  World::await(req);
1030  }
1031  req1 = world_.mpi.Irecv(buf1.get() + buf_offset,
1032  msg_size, MPI_BYTE, child1,
1033  tags[batch + 1]);
1034  }
1035 
1036  if (child0 != -1 && batch < child0_nbatch) {
1037  World::await(req0);
1038  }
1039  if (child1 != -1 && batch < child1_nbatch) {
1040  World::await(req1);
1041  }
1042  };
1043 
1044  size_t buf_offset = 0;
1045  int batch = 0;
1046  while (buf_offset < bufsz) {
1047  receive_batch(batch, buf_offset);
1048  buf_offset += batch_size;
1049  buf_offset = std::min(buf_offset, bufsz);
1050  ++batch;
1051  }
1052  }
1053 
1054 
1055  std::vector<T> left, right;
1056  if (child0 != -1) {
1057  archive::BufferInputArchive ar(buf0.get(), bufsz);
1058  ar & left;
1059  }
1060  if (child1 != -1) {
1061  archive::BufferInputArchive ar(buf1.get(), bufsz);
1062  ar & right;
1063  for (unsigned int i = 0; i < right.size(); ++i)
1064  left.push_back(right[i]);
1065  }
1066  for (unsigned int i=0; i<v.size(); ++i) left.push_back(v[i]);
1067 
1068  // send data in batches
1069  if (parent != -1) {
1070  archive::BufferOutputArchive ar(buf0.get(), bufsz);
1071  ar & left;
1072  const auto total_nbytes_to_send = ar.size();
1073 
1074  // send nbatches to expect
1075  const int nbatch = (total_nbytes_to_send + batch_size - 1) / batch_size;
1076  world_.mpi.Send(&nbatch, 1, MPI_INT, parent,
1077  tags[0]);
1078 
1079  size_t buf_offset = 0;
1080  int batch = 0;
1081  while (buf_offset < bufsz) {
1082 
1083  // send data in batches
1084  auto send_batch = [&,this](const int batch, const size_t buf_offset) {
1085  const int nbytes_to_send = static_cast<int>(
1086  std::min(static_cast<size_t>(batch_size),
1087  total_nbytes_to_send - buf_offset));
1088  // if last batch, send # of bytes to expect
1089  if (batch + 1 == nbatch) {
1090  auto req = world_.mpi.Isend(
1091  &nbytes_to_send, 1, MPI_INT, parent, tags[0]);
1092  World::await(req);
1093  }
1094  auto req0 =
1095  world_.mpi.Isend(buf0.get() + buf_offset, nbytes_to_send,
1096  MPI_BYTE, parent, tags[batch + 1]);
1097  World::await(req0);
1098  };
1099 
1100  send_batch(batch, buf_offset);
1101  buf_offset += batch_size;
1102  buf_offset = std::min(buf_offset, bufsz);
1103  ++batch;
1104  }
1105  }
1106 
1107  if (parent == -1) return left;
1108  else return std::vector<T>();
1109  }
1110 
1111  /// Receive data from \c source
1112 
1113  /// \tparam valueT The data type stored in cache
1114  /// \tparam keyT The key type
1115  /// \param source The process that is sending the data to this process
1116  /// \param key The key associated with the received data
1117  /// \return A future that will be set with the received data
1118  /// \note It is the user's responsibility to ensure that \c key does not
1119  /// conflict with other calls to \c recv. Keys may be reused after the
1120  /// associated operation has finished.
1121  template <typename valueT, typename keyT>
1122  static Future<valueT> recv(const ProcessID source, const keyT& key) {
1123  return recv_internal<valueT>(ProcessKey<keyT, PointToPointTag>(key, source));
1124  }
1125 
1126  /// Send value to \c dest
1127 
1128  /// \tparam keyT The key type
1129  /// \tparam valueT The value type (this may be a \c Future type)
1130  /// \param dest The process where the data will be sent
1131  /// \param key The key that is associated with the data
1132  /// \param value The data to be sent to \c dest
1133  /// \note It is the user's responsibility to ensure that \c key does not
1134  /// conflict with other calls to \c send. Keys may be reused after the
1135  /// associated operation has finished.
1136  template <typename keyT, typename valueT>
1137  void send(const ProcessID dest, const keyT& key, const valueT& value) const {
1139  }
1140 
1141  /// Lazy sync
1142 
1143  /// Lazy sync functions are asynchronous barriers with a nullary functor
1144  /// that is called after all processes have called it with the same
1145  /// key. You can think of lazy_sync as an asynchronous barrier. The
1146  /// lazy_sync functor must have the following signature:
1147  /// \code
1148  /// class SyncFunc {
1149  /// public:
1150  /// // typedefs
1151  /// typedef void result_type;
1152  ///
1153  /// // Constructors
1154  /// SyncFunc(const SyncFunc&);
1155  ///
1156  /// // The function that performs the sync operation
1157  /// void operator()();
1158  ///
1159  /// }; // class SyncFunc
1160  /// \endcode
1161  /// \tparam keyT The key type
1162  /// \tparam opT The operation type
1163  /// \param key The sync key
1164  /// \param op The sync operation to be executed on this process
1165  /// \note It is the user's responsibility to ensure that \c key does not
1166  /// conflict with other calls to \c lazy_sync. Keys may be reused after
1167  /// the associated operation has finished.
1168  template <typename keyT, typename opT>
1169  void lazy_sync(const keyT& key, const opT& op) const {
1170  if(world_.size() > 1) { // Do nothing for the trivial case
1171  // Get the binary tree data
1172  Hash<keyT> hasher;
1173  const ProcessID root = hasher(key) % world_.size();
1174  ProcessID parent = -1, child0 = -1, child1 = -1;
1175  world_.mpi.binary_tree_info(root, parent, child0, child1);
1176 
1177  lazy_sync_internal<LazySyncTag>(parent, child0, child1, key, op);
1178  } else {
1179  auto lazy_sync_children_fn = & WorldGopInterface::template lazy_sync_children<keyT, opT>;
1180  // There is only one process, so run the sync operation now.
1181  world_.taskq.add(*this, lazy_sync_children_fn,
1182  -1, -1, key, op, -1, TaskAttributes::hipri());
1183  }
1184  }
1185 
1186 
1187  /// Group lazy sync
1188 
1189  /// Lazy sync functions are asynchronous barriers with a nullary functor
1190  /// that is called after all processes in the group have called it with
1191  /// the same key. You can think of lazy_sync as an asynchronous barrier.
1192  /// The \c op functor must have the following signature:
1193  /// \code
1194  /// class SyncFunc {
1195  /// public:
1196  /// // typedefs
1197  /// typedef void result_type;
1198  ///
1199  /// // Constructors
1200  /// SyncFunc(const SyncFunc&);
1201  ///
1202  /// // The function that performs the sync operation
1203  /// void operator()();
1204  ///
1205  /// }; // class SyncFunc
1206  /// \endcode
1207  /// \tparam keyT The key type
1208  /// \tparam opT The operation type
1209  /// \param key The sync key
1210  /// \param op The sync operation to be executed on this process
1211  /// \note It is the user's responsibility to ensure that \c key does not
1212  /// conflict with other calls to \c lazy_sync. Keys may be reused after
1213  /// the associated operation has finished.
1214  template <typename keyT, typename opT>
1215  void lazy_sync(const keyT& key, const opT& op, const Group& group) const {
1216  MADNESS_ASSERT(! group.empty());
1217  MADNESS_ASSERT(group.get_world().id() == world_.id());
1218 
1219  if(group.size() > 1) { // Do nothing for the trivial case
1220  // Get the binary tree data
1221  Hash<keyT> hasher;
1222  const ProcessID group_root = hasher(key) % group.size();
1223  ProcessID parent = -1, child0 = -1, child1 = -1;
1224  group.make_tree(group_root, parent, child0, child1);
1225 
1226  lazy_sync_internal<GroupLazySyncTag>(parent, child0, child1, key, op);
1227  } else {
1228  auto lazy_sync_children_fn = & WorldGopInterface::template lazy_sync_children<keyT, opT>;
1229  world_.taskq.add(*this, lazy_sync_children_fn,
1230  -1, -1, key, op, -1, TaskAttributes::hipri());
1231  }
1232  }
1233 
1234  /// Broadcast
1235 
1236  /// Broadcast data from the \c root process to all processes. The input/
1237  /// output data is held by \c value.
1238  /// \param[in] key The key associated with this broadcast
1239  /// \param[in,out] value On the \c root process, this is used as the input
1240  /// data that will be broadcast to all other processes. On other
1241  /// processes it is used as the output to the broadcast.
1242  /// \param root The process that owns the data to be broadcast
1243  /// \throw madness::Exception When \c root is less than 0 or greater
1244  /// than or equal to the world size.
1245  /// \throw madness::Exception When \c value has been set, except on the
1246  /// \c root process.
1247  /// \note It is the user's responsibility to ensure that \c key does not
1248  /// conflict with other calls to \c bcast. Keys may be reused after
1249  /// the associated operation has finished.
1250  template <typename keyT, typename valueT>
1251  void bcast(const keyT& key, Future<valueT>& value, const ProcessID root) const {
1252  MADNESS_ASSERT((root >= 0) && (root < world_.size()));
1253  MADNESS_ASSERT((world_.rank() == root) || (! value.probe()));
1254 
1255  if(world_.size() > 1) // Do nothing for the trivial case
1256  bcast_internal<BcastTag>(key, value, root);
1257  }
1258 
1259  /// Group broadcast
1260 
1261  /// Broadcast data from the \c group_root process to all processes in
1262  /// \c group. The input/output data is held by \c value.
1263  /// \param[in] key The key associated with this broadcast
1264  /// \param[in,out] value On the \c group_root process, this is used as the
1265  /// input data that will be broadcast to all other processes in the group.
1266  /// On other processes it is used as the output to the broadcast
1267  /// \param group_root The process in \c group that owns the data to be
1268  /// broadcast
1269  /// \param group The process group where value will be broadcast
1270  /// \throw madness::Exception When \c group is empty
1271  /// \throw madness::Exception When \c group is not registered
1272  /// \throw madness::Exception When the world id of \c group is not
1273  /// equal to that of the world used to construct this object
1274  /// \throw madness::Exception When this process is not in the group
1275  /// \throw madness::Exception When \c group_root is less than 0 or
1276  /// greater than or equal to \c group size
1277  /// \throw madness::Exception When \c data has been set except on the
1278  /// \c root process
1279  /// \note It is the user's responsibility to ensure that \c key does not
1280  /// conflict with other calls to \c bcast. Keys may be reused after
1281  /// the associated operation has finished.
1282  template <typename keyT, typename valueT>
1283  void bcast(const keyT& key, Future<valueT>& value,
1284  const ProcessID group_root, const Group& group) const
1285  {
1286  MADNESS_ASSERT(! group.empty());
1287  MADNESS_ASSERT(group.get_world().id() == world_.id());
1288  MADNESS_ASSERT((group_root >= 0) && (group_root < group.size()));
1289  MADNESS_ASSERT((group.rank() == group_root) || (! value.probe()));
1290 
1291  if(group.size() > 1) // Do nothing for the trivial case
1292  bcast_internal<GroupBcastTag>(key, value, group_root, group);
1293  }
1294 
1295  /// Distributed reduce
1296 
1297  /// The reduce functor must have the following signature:
1298  /// \code
1299  /// class ReduceFunc {
1300  /// public:
1301  /// // Typedefs
1302  /// typedef ... result_type;
1303  /// typedef ... argument_type;
1304  ///
1305  /// // Constructors
1306  /// ReduceFunc(const ReduceFunc&);
1307  ///
1308  /// // Initialization operation, which returns a default result object
1309  /// result_type operator()() const;
1310  ///
1311  /// // Reduce two result objects
1312  /// void operator()(result_type&, const result_type&) const;
1313  ///
1314  /// // Reduce a result object and an argument object
1315  /// void operator()(result_type&, const argument_type&) const;
1316  /// }; // class ReduceFunc
1317  /// \endcode
1318  /// \tparam keyT The key type
1319  /// \tparam valueT The data type to be reduced
1320  /// \tparam opT The reduction operation type
1321  /// \param key The key associated with this reduction
1322  /// \param value The local value to be reduced
1323  /// \param op The reduction operation to be applied to local and remote data
1324  /// \param root The process that will receive the result of the reduction
1325  /// \return A future to the reduce value on the root process, otherwise an
1326  /// uninitialized future that may be ignored.
1327  /// \note It is the user's responsibility to ensure that \c key does not
1328  /// conflict with other calls to \c reduce. Keys may be reused after
1329  /// the associated operation has finished.
1330  template <typename keyT, typename valueT, typename opT>
1332  reduce(const keyT& key, const valueT& value, const opT& op, const ProcessID root) {
1333  MADNESS_ASSERT((root >= 0) && (root < world_.size()));
1334 
1335  // Get the binary tree data
1336  ProcessID parent = -1, child0 = -1, child1 = -1;
1337  world_.mpi.binary_tree_info(root, parent, child0, child1);
1338 
1339  return reduce_internal<ReduceTag>(parent, child0, child1, root, key,
1340  value, op);
1341  }
1342 
1343  /// Distributed group reduce
1344 
1345  /// The reduce functor must have the following signature:
1346  /// \code
1347  /// class ReduceFunc {
1348  /// public:
1349  /// // Typedefs
1350  /// typedef ... result_type;
1351  /// typedef ... argument_type;
1352  ///
1353  /// // Constructors
1354  /// ReduceFunc(const ReduceFunc&);
1355  ///
1356  /// // Initialization operation, which returns a default result object
1357  /// result_type operator()() const;
1358  ///
1359  /// // Reduce two result objects
1360  /// void operator()(result_type&, const result_type&) const;
1361  ///
1362  /// // Reduce a result object and an argument object
1363  /// void operator()(result_type&, const argument_type&) const;
1364  /// }; // class ReduceFunc
1365  /// \endcode
1366  /// \tparam keyT The key type
1367  /// \tparam valueT The data type to be reduced
1368  /// \tparam opT The reduction operation type
1369  /// \param key The key associated with this reduction
1370  /// \param value The local value to be reduced
1371  /// \param op The reduction operation to be applied to local and remote data
1372  /// \param group_root The group process that will receive the result of the reduction
1373  /// \param group The group that will preform the reduction
1374  /// \return A future to the reduce value on the root process, otherwise an
1375  /// uninitialized future that may be ignored.
1376  /// \throw madness::Exception When \c group is empty
1377  /// \throw madness::Exception When \c group is not registered
1378  /// \throw madness::Exception When the world id of \c group is not
1379  /// equal to that of the world used to construct this object
1380  /// \throw madness::Exception When this process is not in the group
1381  /// \throw madness::Exception When \c group_root is less than zero or
1382  /// greater than or equal to \c group size.
1383  /// \note It is the user's responsibility to ensure that \c key does not
1384  /// conflict with other calls to \c reduce. Keys may be reused after
1385  /// the associated operation has finished.
1386  template <typename keyT, typename valueT, typename opT>
1388  reduce(const keyT& key, const valueT& value, const opT& op,
1389  const ProcessID group_root, const Group& group)
1390  {
1391  MADNESS_ASSERT(! group.empty());
1392  MADNESS_ASSERT(group.get_world().id() == world_.id());
1393  MADNESS_ASSERT((group_root >= 0) && (group_root < group.size()));
1394 
1395  // Get the binary tree data
1396  ProcessID parent = -1, child0 = -1, child1 = -1;
1397  group.make_tree(group_root, parent, child0, child1);
1398 
1399  return reduce_internal<ReduceTag>(parent, child0, child1, group_root,
1400  key, value, op);
1401  }
1402 
1403  /// Distributed all reduce
1404 
1405  /// The reduce functor must have the following signature:
1406  /// \code
1407  /// class ReduceFunc {
1408  /// public:
1409  /// // Typedefs
1410  /// typedef ... result_type;
1411  /// typedef ... argument_type;
1412  ///
1413  /// // Constructors
1414  /// ReduceFunc(const ReduceFunc&);
1415  ///
1416  /// // Initialization operation, which returns a default result object
1417  /// result_type operator()() const;
1418  ///
1419  /// // Reduce two result objects
1420  /// void operator()(result_type&, const result_type&) const;
1421  ///
1422  /// // Reduce a result object and an argument object
1423  /// void operator()(result_type&, const argument_type&) const;
1424  /// }; // class ReduceFunc
1425  /// \endcode
1426  /// \tparam keyT The key type
1427  /// \tparam valueT The data type to be reduced
1428  /// \tparam opT The reduction operation type
1429  /// \param key The key associated with this reduction
1430  /// \param value The local value to be reduced
1431  /// \param op The reduction operation to be applied to local and remote data
1432  /// \return A future to the reduce value on the root process, otherwise an
1433  /// uninitialized future that may be ignored.
1434  /// \note It is the user's responsibility to ensure that \c key does not
1435  /// conflict with other calls to \c all_reduce. Keys may be reused after
1436  /// the associated operation has finished.
1437  template <typename keyT, typename valueT, typename opT>
1439  all_reduce(const keyT& key, const valueT& value, const opT& op) {
1440  // Compute the parent and child processes of this process in a binary tree.
1441  Hash<keyT> hasher;
1442  const ProcessID root = hasher(key) % world_.size();
1443  ProcessID parent = -1, child0 = -1, child1 = -1;
1444  world_.mpi.binary_tree_info(root, parent, child0, child1);
1445 
1446  // Reduce the data
1448  reduce_internal<AllReduceTag>(parent, child0, child1, root,
1449  key, value, op);
1450 
1451  if(world_.rank() != root)
1453 
1454  // Broadcast the result of the reduction to all processes
1455  bcast_internal<AllReduceTag>(key, reduce_result, root);
1456 
1457  return reduce_result;
1458  }
1459 
1460  /// Distributed, group all reduce
1461 
1462  /// The reduce functor must have the following signature:
1463  /// \code
1464  /// class ReduceFunc {
1465  /// public:
1466  /// // Typedefs
1467  /// typedef ... result_type;
1468  /// typedef ... argument_type;
1469  ///
1470  /// // Constructors
1471  /// ReduceFunc(const ReduceFunc&);
1472  ///
1473  /// // Initialization operation, which returns a default result object
1474  /// result_type operator()() const;
1475  ///
1476  /// // Reduce two result objects
1477  /// void operator()(result_type&, const result_type&) const;
1478  ///
1479  /// // Reduce a result object and an argument object
1480  /// void operator()(result_type&, const argument_type&) const;
1481  /// }; // class ReduceFunc
1482  /// \endcode
1483  /// \tparam keyT The key type
1484  /// \tparam valueT The data type to be reduced
1485  /// \tparam opT The reduction operation type
1486  /// \param key The key associated with this reduction
1487  /// \param value The local value to be reduced
1488  /// \param op The reduction operation to be applied to local and remote data
1489  /// \param group The group that will preform the reduction
1490  /// \return A future to the reduce value on the root process, otherwise an
1491  /// uninitialized future that may be ignored
1492  /// \throw madness::Exception When \c group is empty
1493  /// \throw madness::Exception When \c group is not registered
1494  /// \throw madness::Exception When the world id of \c group is not
1495  /// equal to that of the world used to construct this object
1496  /// \throw madness::Exception When this process is not in the group
1497  /// \note It is the user's responsibility to ensure that \c key does not
1498  /// conflict with other calls to \c reduce. Keys may be reused after
1499  /// the associated operation has finished.
1500  template <typename keyT, typename valueT, typename opT>
1502  all_reduce(const keyT& key, const valueT& value, const opT& op, const Group& group) {
1503  MADNESS_ASSERT(! group.empty());
1504  MADNESS_ASSERT(group.get_world().id() == world_.id());
1505 
1506  // Compute the parent and child processes of this process in a binary tree.
1507  Hash<keyT> hasher;
1508  const ProcessID group_root = hasher(key) % group.size();
1509  ProcessID parent = -1, child0 = -1, child1 = -1;
1510  group.make_tree(group_root, parent, child0, child1);
1511 
1512  // Reduce the data
1514  reduce_internal<GroupAllReduceTag>(parent, child0, child1,
1515  group_root, key, value, op);
1516 
1517 
1518  if(group.rank() != group_root)
1520 
1521  // Broadcast the result of the reduction to all processes in the group
1522  bcast_internal<GroupAllReduceTag>(key, reduce_result, 0, group);
1523 
1524  return reduce_result;
1525  }
1526  }; // class WorldGopInterface
1527 
1528 } // namespace madness
1529 
1530 #endif // MADNESS_WORLD_WORLDGOP_H__INCLUDED
Implements an archive wrapping a memory buffer.
void binary_tree_info(int root, int &parent, int &child0, int &child1)
Construct info about a binary tree with given root.
Definition: safempi.cc:39
int Get_rank() const
Definition: safempi.h:714
static int unique_tag_period()
Definition: safempi.h:836
int unique_tag()
Returns a unique tag for temporary use (1023<tag<4095)
Definition: safempi.h:830
Definition: safempi.h:289
World active message that extends an RMI message.
Definition: worldam.h:80
The class used for callbacks (e.g., dependency tracking).
Definition: dependency_interface.h:61
T & get(bool dowork=true) &
Gets the value, waiting if necessary.
Definition: future.h:574
static const Future< T > default_initializer()
See "Gotchas" on Futures about why this exists and how to use it.
Definition: future.h:462
bool probe() const
Check whether this future has been assigned.
Definition: future.h:631
A collection of processes.
Definition: group.h:50
void remote_update() const
Update remote usage count.
Definition: group.h:383
void local_update() const
Update local usage count.
Definition: group.h:369
ProcessID size() const
Group size accessor.
Definition: group.h:429
const DistributedID & id() const
Group id accessor.
Definition: group.h:396
World & get_world() const
Parent world accessor.
Definition: group.h:404
bool empty() const
Query empty group.
Definition: group.h:391
ProcessID rank() const
Group rank accessor.
Definition: group.h:412
static madness::Future< Group > get_group(const DistributedID &did)
Get group from the registry.
Definition: group.cc:90
void make_tree(const ProcessID group_root, ProcessID &parent, ProcessID &child1, ProcessID &child2) const
Compute the binary tree parents and children.
Definition: group.h:449
Key object that includes the process information.
Definition: distributed_id.h:80
static const attrT ATTR_UNORDERED
Definition: worldrmi.h:180
Key object that uses a tag to differentiate keys.
Definition: distributed_id.h:177
static TaskAttributes hipri()
Definition: thread.h:450
void send(ProcessID dest, am_handlerT op, const AmArg *arg, const int attr=RMI::ATTR_ORDERED)
Sends a managed non-blocking active message.
Definition: worldam.h:278
Delayed send callback object.
Definition: worldgop.h:174
DelayedSend(World &world, const ProcessID dest, const keyT &key, const Future< valueT > &value)
Constructor.
Definition: worldgop.h:188
virtual ~DelayedSend()
Definition: worldgop.h:193
World & world_
The communication world.
Definition: worldgop.h:176
const ProcessID dest_
The destination process id.
Definition: worldgop.h:177
Future< valueT > value_
The data to be sent.
Definition: worldgop.h:179
const keyT key_
The distributed id associated with value_.
Definition: worldgop.h:178
DelayedSend< keyT, valueT > & operator=(const DelayedSend< keyT, valueT > &)
DelayedSend(const DelayedSend< keyT, valueT > &)
virtual void notify()
Notify this object that the future has been set.
Definition: worldgop.h:199
Provides collectives that interoperate with the AM and task interfaces.
Definition: worldgop.h:145
int max_reducebcast_msg_size() const
Returns the maximum size of messages (in bytes) sent by reduce and broadcast.
Definition: worldgop.h:695
void lazy_sync(const keyT &key, const opT &op, const Group &group) const
Group lazy sync.
Definition: worldgop.h:1215
void send_internal(ProcessID dest, const keyT &key, const Future< valueT > &value) const
Send value to dest.
Definition: worldgop.h:249
void max(T *buf, size_t nelem)
Inplace global max while still processing AM & tasks.
Definition: worldgop.h:882
static void bcast_handler(const AmArg &arg)
Definition: worldgop.h:352
void lazy_sync(const keyT &key, const opT &op) const
Lazy sync.
Definition: worldgop.h:1169
static Future< valueT > recv(const ProcessID source, const keyT &key)
Receive data from source.
Definition: worldgop.h:1122
World & world_
World object that this is a part of.
Definition: worldgop.h:147
int set_max_reducebcast_msg_size(int sz)
Set the maximum size of messages (in bytes) sent by reduce and broadcast.
Definition: worldgop.h:685
Future< typename detail::result_of< opT >::type > reduce(const keyT &key, const valueT &value, const opT &op, const ProcessID root)
Distributed reduce.
Definition: worldgop.h:1332
std::shared_ptr< detail::DeferredCleanup > deferred_
Deferred cleanup object.
Definition: worldgop.h:148
void reduce(T *buf, std::size_t nelem, opT op)
Inplace global reduction (like MPI all_reduce) while still processing AM & tasks.
Definition: worldgop.h:783
void broadcast(T &t)
Broadcast of a scalar from node 0 to all other nodes.
Definition: worldgop.h:740
~WorldGopInterface()
Definition: worldgop.h:660
void broadcast_serializable(objT &obj, ProcessID root)
Broadcast a serializable object.
Definition: worldgop.h:754
void lazy_sync_internal(const ProcessID parent, const ProcessID child0, const ProcessID child1, const keyT &key, const opT &op) const
Start a distributed lazy sync operation.
Definition: worldgop.h:315
Future< typename detail::result_of< opT >::type > all_reduce(const keyT &key, const valueT &value, const opT &op, const Group &group)
Distributed, group all reduce.
Definition: worldgop.h:1502
void sum(T &a)
Global sum of a scalar while still processing AM & tasks.
Definition: worldgop.h:931
Future< typename detail::result_of< opT >::type > reduce(const keyT &key, const valueT &value, const opT &op, const ProcessID group_root, const Group &group)
Distributed group reduce.
Definition: worldgop.h:1388
int max_reducebcast_msg_size_
maximum size of messages (in bytes) sent by reduce and broadcast
Definition: worldgop.h:151
void fence(bool debug=false)
Synchronizes all processes in communicator AND globally ensures no pending AM or tasks.
Definition: worldgop.cc:161
void broadcast(void *buf, size_t nbyte, ProcessID root, bool dowork=true, Tag bcast_tag=-1)
Broadcasts bytes from process root while still processing AM & tasks.
Definition: worldgop.cc:173
static detail::result_of< opT >::type reduce_result_task(const std::vector< Future< typename detail::result_of< opT >::type > > &results, const opT &op)
Definition: worldgop.h:559
void bit_and(T *buf, size_t nelem)
Definition: worldgop.h:905
void bcast_internal(const keyT &key, Future< valueT > &value, const ProcessID root) const
Broadcast.
Definition: worldgop.h:471
void lazy_sync_parent(const ProcessID parent, const keyT &key, const ProcessID, const ProcessID) const
Lazy sync parent task.
Definition: worldgop.h:279
void absmin(T *buf, size_t nelem)
Inplace global absmin while still processing AM & tasks.
Definition: worldgop.h:888
void bit_or(T *buf, size_t nelem)
Definition: worldgop.h:910
WorldGopInterface(World &world)
Definition: worldgop.h:656
bool set_forbid_fence(bool value)
Set forbid_fence flag to new value and return old value.
Definition: worldgop.h:674
void bcast(const keyT &key, Future< valueT > &value, const ProcessID group_root, const Group &group) const
Group broadcast.
Definition: worldgop.h:1283
void bcast_task(const keyT &key, const valueT &value, const ProcessID root) const
Broadcast task.
Definition: worldgop.h:391
void group_bcast_task(const keyT &key, const valueT &value, const ProcessID group_root, const Group &group) const
Definition: worldgop.h:422
void send(const ProcessID dest, const keyT &key, const valueT &value) const
Send value to dest.
Definition: worldgop.h:1137
void logic_or(T *buf, size_t nelem)
Definition: worldgop.h:925
void bcast(const keyT &key, Future< valueT > &value, const ProcessID root) const
Broadcast.
Definition: worldgop.h:1251
int initial_max_reducebcast_msg_size()
Definition: worldgop.h:629
void serial_invoke(std::function< void()> action)
Executes an action on single (this) thread after ensuring all other work is done.
Definition: worldgop.cc:165
static void group_bcast_handler(const AmArg &arg)
Definition: worldgop.h:367
bool forbid_fence_
Forbid calling fence() when several worlds are active.
Definition: worldgop.h:150
void absmax(T *buf, size_t nelem)
Inplace global absmax while still processing AM & tasks.
Definition: worldgop.h:894
Future< typename detail::result_of< opT >::type > all_reduce(const keyT &key, const valueT &value, const opT &op)
Distributed all reduce.
Definition: worldgop.h:1439
void broadcast(T *buf, size_t nelem, ProcessID root)
Broadcasts typed contiguous data from process root while still processing AM & tasks.
Definition: worldgop.h:734
std::vector< T > concat0(const std::vector< T > &v, size_t bufsz=1024 *1024)
Concatenate an STL vector of serializable elements onto node 0.
Definition: worldgop.h:953
void fence_impl(std::function< void()> epilogue=[]{}, bool pause_during_epilogue=false, bool debug=false)
Implementation of fence.
Definition: worldgop.cc:50
void product(T *buf, size_t nelem)
Inplace global product while still processing AM & tasks.
Definition: worldgop.h:900
void min(T *buf, size_t nelem)
Inplace global min while still processing AM & tasks.
Definition: worldgop.h:876
void logic_and(T *buf, size_t nelem)
Definition: worldgop.h:920
static detail::result_of< opT >::type reduce_task(const valueT &value, const opT &op)
Definition: worldgop.h:551
static Future< valueT > recv_internal(const keyT &key)
Receive data from remote node.
Definition: worldgop.h:212
std::enable_if<!is_future< valueT >::value >::type send_internal(const ProcessID dest, const keyT &key, const valueT &value) const
Send value to dest.
Definition: worldgop.h:226
Future< typename detail::result_of< opT >::type > reduce_internal(const ProcessID parent, const ProcessID child0, const ProcessID child1, const ProcessID root, const keyT &key, const valueT &value, const opT &op)
Distributed reduce.
Definition: worldgop.h:583
bool debug_
Debug mode.
Definition: worldgop.h:149
void min(T &a)
Global min of a scalar while still processing AM & tasks.
Definition: worldgop.h:943
void max(T &a)
Global max of a scalar while still processing AM & tasks.
Definition: worldgop.h:937
void barrier()
Synchronizes all processes in the communicator; does NOT fence pending AM or tasks.
Definition: worldgop.h:700
void lazy_sync_children(const ProcessID child0, const ProcessID child1, const keyT &key, opT &op, const ProcessID) const
Lazy sync children task.
Definition: worldgop.h:297
void bcast_internal(const keyT &key, Future< valueT > &value, const ProcessID group_root, const Group &group) const
Group broadcast.
Definition: worldgop.h:521
void sum(T *buf, size_t nelem)
Inplace global sum while still processing AM & tasks.
Definition: worldgop.h:870
void bit_xor(T *buf, size_t nelem)
Definition: worldgop.h:915
bool set_debug(bool value)
Set debug flag to new value and return old value.
Definition: worldgop.h:667
void broadcast(T &t, ProcessID root)
Broadcast of a scalar from node root to all other nodes.
Definition: worldgop.h:746
std::enable_if<!std::is_pointer< T >::value, SafeMPI::Request >::type Isend(const T &datum, int dest, int tag=SafeMPI::DEFAULT_SEND_RECV_TAG) const
Isend one element.
Definition: worldmpi.h:308
SafeMPI::Request Irecv(T *buf, int count, int source, int tag=SafeMPI::DEFAULT_SEND_RECV_TAG) const
Async receive data of up to count elements from process source.
Definition: worldmpi.h:321
void Send(const T *buf, long lenbuf, int dest, int tag=SafeMPI::DEFAULT_SEND_RECV_TAG) const
Send array of lenbuf elements to process dest.
Definition: worldmpi.h:347
void Recv(T *buf, long lenbuf, int src, int tag) const
Receive data of up to lenbuf elements from process src.
Definition: worldmpi.h:374
void add(TaskInterface *t)
Add a new local task, taking ownership of the pointer.
Definition: world_task_queue.h:466
A parallel world class.
Definition: world.h:132
WorldTaskQueue & taskq
Task queue.
Definition: world.h:204
ProcessID rank() const
Returns the process rank in this World (same as MPI_Comm_rank()).
Definition: world.h:318
static void await(SafeMPI::Request &request, bool dowork=true)
Wait for an MPI request to complete.
Definition: world.h:516
WorldMpiInterface & mpi
MPI interface.
Definition: world.h:202
ProcessID size() const
Returns the number of processes in this World (same as MPI_Comm_size()).
Definition: world.h:328
unsigned long id() const
Definition: world.h:313
WorldGopInterface & gop
Global operations.
Definition: world.h:205
WorldAmInterface & am
AM interface.
Definition: world.h:203
Wraps an archive around a memory buffer for input.
Definition: buffer_archive.h:134
Wraps an archive around a memory buffer for output.
Definition: buffer_archive.h:59
std::size_t size() const
Return the amount of data stored (counted) in the buffer.
Definition: buffer_archive.h:123
Deferred cleanup of shared_ptr's.
Definition: deferred_cleanup.h:60
Distributed caching utility.
Definition: dist_cache.h:54
static void get_cache_value(const keyT &key, madness::Future< valueT > &value)
Get the cache value associated with key.
Definition: dist_cache.h:185
static void set_cache_value(const keyT &key, const valueT &value)
Set the cache value associated with key.
Definition: dist_cache.h:146
static bool debug
Definition: dirac-hatom.cc:16
static double function(const coord_3d &r)
Normalized gaussian.
Definition: functionio.cc:100
auto T(World &world, response_space &f) -> response_space
Definition: global_functions.cc:34
Tensor< typename Tensor< T >::scalar_type > arg(const Tensor< T > &t)
Return a new tensor holding the argument of each element of t (complex types only)
Definition: tensor.h:2502
static const double v
Definition: hatom_sf_dirac.cc:20
Tensor< double > op(const Tensor< double > &x)
Definition: kain.cc:508
#define max(a, b)
Definition: lda.h:51
#define MADNESS_ASSERT(condition)
Assert a condition that should be free of side-effects since in release builds this might be a no-op.
Definition: madness_exception.h:134
Intracomm COMM_WORLD
Definition: safempi.cc:67
typename make_void< Ts... >::type void_t
Definition: meta.h:27
File holds all helper structures necessary for the CC_Operator and CC2 class.
Definition: DFParameters.h:10
std::pair< uniqueidT, std::size_t > DistributedID
Distributed ID which is used to identify objects.
Definition: distributed_id.h:48
double abs(double x)
Definition: complexfun.h:48
AmArg * new_am_arg(const argT &... args)
Convenience template for serializing arguments into a new AmArg.
Definition: worldam.h:194
bool quiet()
Check if the MADNESS runtime was initialized for quiet operation.
Definition: world.cc:77
void error(const char *msg)
Definition: world.cc:139
std::string type(const PairType &n)
Definition: PNOParameters.h:18
std::uint64_t cstr_to_memory_size(const char *str)
Unit-aware conversion of a C string to a std::uint64_t.
Definition: units.cc:14
void swap(Vector< T, N > &l, Vector< T, N > &r)
Swap the contents of two Vectors.
Definition: vector.h:497
AmArg * copy_am_arg(const AmArg &arg)
Definition: worldam.h:170
static const double b
Definition: nonlinschro.cc:119
static const double a
Definition: nonlinschro.cc:118
int posix_memalign(void **memptr, std::size_t alignment, std::size_t size)
Definition: posixmem.h:44
Definition: worldgop.h:86
T operator()(const T &a, const T &b) const
Definition: worldgop.h:87
Definition: worldgop.h:100
T operator()(const T &a, const T &b) const
Definition: worldgop.h:101
Definition: worldgop.h:107
T operator()(const T &a, const T &b) const
Definition: worldgop.h:108
Definition: worldgop.h:114
T operator()(const T &a, const T &b) const
Definition: worldgop.h:115
Definition: worldgop.h:121
T operator()(const T &a, const T &b) const
Definition: worldgop.h:122
Definition: worldgop.h:159
Definition: worldgop.h:157
Definition: worldgop.h:161
Definition: worldgop.h:128
T operator()(const T &a, const T &b) const
Definition: worldgop.h:129
Definition: worldgop.h:135
T operator()(const T &a, const T &b) const
Definition: worldgop.h:136
Definition: worldgop.h:79
T operator()(const T &a, const T &b) const
Definition: worldgop.h:80
Definition: worldgop.h:93
T operator()(const T &a, const T &b) const
Definition: worldgop.h:94
Definition: worldgop.h:72
T operator()(const T &a, const T &b) const
Definition: worldgop.h:73
Definition: worldgop.h:65
T operator()(const T &a, const T &b) const
Definition: worldgop.h:66
fnT::result_type type
Definition: function_traits.h:97
T type
Type with Future removed.
Definition: type_traits.h:110
#define MPI_INT
Definition: stubmpi.h:78
#define MPI_BYTE
Definition: stubmpi.h:74
std::pair< int, double > valueT
Definition: test_binsorter.cc:6
double source(const coordT &r)
Definition: testperiodic.cc:48
const char * status[2]
Definition: testperiodic.cc:43
Declares the World class for the parallel runtime environment.
Defines TaskInterface and implements WorldTaskQueue and associated stuff.
Defines types used by the parallel runtime.
int ProcessID
Used to clearly identify process number/rank.
Definition: worldtypes.h:43
int Tag
Used to clearly identify message tag/type.
Definition: worldtypes.h:44