MADNESS  0.10.1
worldam.h
Go to the documentation of this file.
1 /*
2  This file is part of MADNESS.
3 
4  Copyright (C) 2007,2010 Oak Ridge National Laboratory
5 
6  This program is free software; you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation; either version 2 of the License, or
9  (at your option) any later version.
10 
11  This program is distributed in the hope that it will be useful,
12  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  GNU General Public License for more details.
15 
16  You should have received a copy of the GNU General Public License
17  along with this program; if not, write to the Free Software
18  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 
20  For more information please contact:
21 
22  Robert J. Harrison
23  Oak Ridge National Laboratory
24  One Bethel Valley Road
25  P.O. Box 2008, MS-6367
26 
27  email: harrisonrj@ornl.gov
28  tel: 865-241-3937
29  fax: 865-572-0680
30  */
31 
32 #ifndef MADNESS_WORLD_WORLDAM_H__INCLUDED
33 #define MADNESS_WORLD_WORLDAM_H__INCLUDED
34 
35 /// \file worldam.h
36 /// \brief Implements active message layer for World on top of RMI layer
37 
39 #include <madness/world/worldrmi.h>
40 #include <madness/world/world.h>
41 #include <vector>
42 #include <cstddef>
43 #include <memory>
44 #include <pthread.h>
45 
46 namespace madness {
47 
48  /*
49  The RMI layer just does transport and does not know about World
50  or even necessarily about MPI. It also has no buffering or
51  virtualization of resources. In particular, we must be careful
52  about having too many outstanding sends and active message
53  handlers must be careful about what they do in order to avoid
54  deadlock --- especially problematic is a handler trying to send
55  a message.
56 
57  The WorldAM class provides a World-aware RMI capability that
58  limits the number of outstanding sends and can optionally manage
59  buffers. The issue of what handlers can do safely is handled by
60  the integration of WorldAM and the task queues ... if you have
61  an operation that might
62 
63  - send messages
64 
65  - take a long time
66 
67  - consume a lot of stack/heap (e.g., recursive algorithm)
68 
69  then the right thing to do is to send a task rather than
70  an active message.
71  */
72 
73  template <class Derived> class WorldObject;
74 
75  class AmArg;
76  /// Type of AM handler functions
77  typedef void (*am_handlerT)(const AmArg&);
78 
79  /// World active message that extends an RMI message
80  class AmArg {
81  private:
82  friend class WorldAmInterface;
83  template <class Derived> friend class WorldObject;
84 
85  friend AmArg* alloc_am_arg(std::size_t nbyte);
86 
87  unsigned char header[RMI::HEADER_LEN]; // !!!!!!!!! MUST BE FIRST !!!!!!!!!!
88  std::size_t nbyte; // Size of user payload
89  std::uint64_t worldid; // Id of associated world
90  std::ptrdiff_t func; // User function to call, as a relative fn ptr (see archive::to_rel_fn_ptr)
91  ProcessID src; // Rank of process sending the message
92  unsigned int flags; // Misc. bit flags
93 
94  // On 32 bit machine AmArg is HEADER_LEN+4+4+4+4+4=84 bytes
95  // On 64 bit machine AmArg is HEADER_LEN+8+8+8+4+4=96 bytes
96 
97  // No copy constructor or assignment
98  AmArg(const AmArg&);
99  AmArg& operator=(const AmArg&);
100 
102 
103  void set_worldid(unsigned long id) { worldid = id; }
104 
105  void set_func(am_handlerT handler) {
106  MADNESS_ASSERT(handler);
107  func = archive::to_rel_fn_ptr(handler);
108  }
109 
110  void set_size(std::size_t numbyte) { nbyte = numbyte; }
111 
112  void set_pending() { flags |= 0x1ul; }
113 
114  bool is_pending() const { return flags & 0x1ul; }
115 
116  void clear_flags() { flags = 0; }
117 
118  am_handlerT get_func() const { return archive::to_abs_fn_ptr<am_handlerT>(func); }
119 
122  }
123 
126  }
127 
128  public:
129  AmArg() {}
130 
131  /// Returns a pointer to the user's payload (aligned in same way as AmArg)
132  unsigned char* buf() const { return (unsigned char*)(this) + sizeof(AmArg); }
133 
134  /// Returns the size of the user's payload
135  std::size_t size() const { return nbyte; }
136 
137  /// Used to deserialize arguments from incoming message
138  template <typename T>
140  return make_input_arch() & t;
141  }
142 
143  /// Used to serialize arguments into outgoing message
144  template <typename T>
146  return make_output_arch() & t;
147  }
148 
149  /// For incoming AM gives the source process
150  ProcessID get_src() const { return src; }
151 
152  // This is not inline in order to keep World opaque.
153  /// For incoming AM gives the associated world
155 
156  /// Return the world id
157  std::uint64_t get_worldid() const { return worldid; }
158  };
159 
160 
161  /// Allocates a new AmArg with nbytes of user data ... delete with free_am_arg
162  inline AmArg* alloc_am_arg(std::size_t nbyte) {
163  std::size_t narg = 1 + (nbyte+sizeof(AmArg)-1)/sizeof(AmArg);
164  AmArg *arg = new AmArg[narg];
165  arg->set_size(nbyte);
166  return arg;
167  }
168 
169 
170  inline AmArg* copy_am_arg(const AmArg& arg) {
171  AmArg* r = alloc_am_arg(arg.size());
172  memcpy(reinterpret_cast<void*>(r), &arg, arg.size()+sizeof(AmArg));
173  return r;
174  }
175 
176  /// Frees an AmArg allocated with alloc_am_arg
177  inline void free_am_arg(AmArg* arg) {
178  //std::cout << " freeing amarg " << (void*)(arg) << " " << pthread_self() << std::endl;
179  delete [] arg;
180  }
181 
/// Terminate argument serialization
///
/// Base case of the recursion: no arguments remain, so nothing is done.
template <typename Archive>
inline void serialize_am_args(Archive&&) {}

/// Argument serialization
///
/// Applies \c operator& of \c archive to the first argument, then recurses on
/// whatever that expression yields together with the remaining arguments, so
/// the arguments are processed left to right.
template <typename Archive, typename T, typename... argT>
inline void serialize_am_args(Archive&& archive, T&& t, argT&&... args) {
    serialize_am_args((archive & t), std::forward<argT>(args)...);
}
191 
192  /// Convenience template for serializing arguments into a new AmArg
193  template <typename... argT>
194  inline AmArg* new_am_arg(const argT&... args) {
195  // compute size
197  serialize_am_args(count, args...);
198 
199  // Serialize arguments
200  AmArg* am_args = alloc_am_arg(count.size());
201  serialize_am_args(*am_args, args...);
202  return am_args;
203  }
204 
205 
206  /// Implements AM interface
208  friend class WorldGopInterface;
209  friend class World;
210  private:
211 
212 #ifdef HAVE_CRAYXT
213  static const int DEFAULT_NSEND = 512;
214 #else
215  static const int DEFAULT_NSEND = 128;
216 #endif
217 
218  class SendReq : public SPINLOCK_TYPE, public RMISendReq {
221  void free() {if (buf) {free_am_arg(buf); buf=0;}}
222  public:
223  SendReq() : buf(0) {}
224  SendReq(AmArg* b, const RMI::Request& r) : buf(b), req(r) {} // lock is NOT set
225  void set(AmArg* b, const RMI::Request& r) {buf=b; req=r;} // assumes lock held
226  bool TestAndFree() { // assumes lock held if necessary
227  if (buf) {
228  bool ok = req.Test();
229  if (ok) free();
230  return ok;
231  }
232  else {
233  return true;
234  }
235  }
236  ~SendReq() {free();}
237  };
238 
239  // Multiple threads are making their way thru here ... must be careful
240  // to ensure updates are atomic and consistent
241 
242  int nsend; ///< Max no. of pending sends
243  std::unique_ptr<SendReq []> send_req; ///< Send requests and managed buffers
244  unsigned long worldid; ///< The world which contains this instance of WorldAmInterface
246  const int nproc;
247  // Next 3 were volatile but no need since protected by spinlock with implied barriers/fence
248  int cur_msg; ///< Index of next buffer to attempt to use
249  unsigned long nsent; ///< Counts no. of AM sent for purpose of termination detection
250  unsigned long nrecv; ///< Counts no. of AM received for purpose of termination detection
251 
252  std::vector<int> map_to_comm_world; ///< Maps rank in current MPI communicator to SafeMPI::COMM_WORLD
253 
254  /// This handles all incoming RMI messages for all instances
255  static void handler(void *buf, std::size_t nbyte) {
256  // It will be singled threaded since only the RMI receiver
257  // thread will invoke it ... however note that nrecv will
258  // be read by the main thread during fence operations.
259  AmArg* arg = static_cast<AmArg*>(buf);
260  am_handlerT func = arg->get_func();
261  World* w = arg->get_world();
262  MADNESS_ASSERT(arg->size() + sizeof(AmArg) == nbyte);
263  MADNESS_ASSERT(w);
265  func(*arg);
266  w->am.nrecv++; // Must be AFTER execution of the function
267  }
268 
269  public:
270  WorldAmInterface(World& world);
271 
272  virtual ~WorldAmInterface();
273 
/// Currently a noop
///
/// The body is empty.
void fence() { }
276 
277  /// Sends a managed non-blocking active message
278  void send(ProcessID dest, am_handlerT op, const AmArg* arg,
279  const int attr=RMI::ATTR_ORDERED)
280  {
281  // Setup the header
282  {
283  AmArg* argx = const_cast<AmArg*>(arg);
284 
285  argx->set_worldid(worldid);
286  argx->set_src(rank);
287  argx->set_func(op);
288  argx->clear_flags(); // Is this the right place for this?
289  }
290 
291  // Sanity check
292  MADNESS_ASSERT(arg->get_world());
293  MADNESS_ASSERT(arg->get_func());
294 
295  // Map dest from world's communicator to comm_world
296  dest = map_to_comm_world[dest];
297 
298  // Remaining code refactored to avoid blocking with lock
299  // and to enable finer grained calls into MPI send
300 
302 
303  // The server thread may be executing a handler that
304  // is trying to send a message (e.g., assigning a
305  // remote future). However, the server must not block
306  // in here. It also needs to send messages in order,
307  // thus it puts all send_reqs onto a queue which it
308  // processes in its main loop using the RMI::send
309  // interface.
310 
311  lock(); nsent++; unlock(); // This world must still keep track of messages
312 
313  RMI::send_req.emplace_back(std::make_unique<SendReq>((AmArg*)(arg), RMI::isend(arg, arg->size()+sizeof(AmArg), dest, handler, attr)));
314 
315  //std::cout << "sending message from server " << (void*)(arg) << " " << pthread_self() << " " << p << std::endl;
316 
317  return;
318  }
319 
320 
321  // Find a free buffer oldest first (in order to assist
322  // with flow control). Exit loop with a lock on buffer.
323 
324  // This design will need nsend >= nthreads
325  int i=-1;
326  while (i == -1) {
327  lock(); // << Protect cur_msg and nsent;
328  if (send_req[cur_msg].try_lock()) { // << matching unlock at end of routine
329  i = cur_msg;
330  cur_msg = (cur_msg + 1) % nsend;
331  nsent++;
332  }
333  unlock(); // << Protect cur_msg and nsent;
334  }
335 
336 
337  // If the buffer is still in-use wait for it to complete
338  while (!send_req[i].TestAndFree()) {
339  // If the oldest message has still not completed then
340  // there is likely severe network or end-point
341  // congestion, so pause for 100us in a rather
342  // arbitrary attempt to decrease the injection rate.
343  // Both the server thread and this call to Test()
344  // should ensure progress.
345  myusleep(100);
346  }
347 
348  // Buffer is now free but still locked by me
349  send_req[i].set((AmArg*)(arg), RMI::isend(arg, arg->size()+sizeof(AmArg), dest, handler, attr));
350  send_req[i].unlock(); // << matches try_lock above
351  }
352 
353  /// Frees as many send buffers as possible, returning the number that are free
355  int nfree = 0;
356  for (int i=0; i<nsend; i++) {
357  if (send_req[i].try_lock()) { // Someone may be trying to put a message into this buffer
358  if (send_req[i].TestAndFree()) nfree++;
359  send_req[i].unlock(); // matching unlock
360  }
361  }
362  return nfree;
363  }
364 
365  };
366 }
367 
368 #endif // MADNESS_WORLD_WORLDAM_H__INCLUDED
double w(double t, double eps)
Definition: DKops.h:22
Implements an archive wrapping a memory buffer.
Definition: safempi.h:289
bool Test(MPI_Status &status)
Definition: safempi.h:409
World active message that extends an RMI message.
Definition: worldam.h:80
std::uint64_t worldid
Definition: worldam.h:89
unsigned char header[RMI::HEADER_LEN]
Definition: worldam.h:87
std::size_t nbyte
Definition: worldam.h:88
archive::BufferOutputArchive make_output_arch() const
Definition: worldam.h:124
archive::BufferOutputArchive operator&(const T &t) const
Used to serialize arguments into outgoing message.
Definition: worldam.h:145
std::size_t size() const
Returns the size of the user's payload.
Definition: worldam.h:135
void set_src(ProcessID source)
Definition: worldam.h:101
bool is_pending() const
Definition: worldam.h:114
friend AmArg * alloc_am_arg(std::size_t nbyte)
Allocates a new AmArg with nbytes of user data ... delete with free_am_arg.
Definition: worldam.h:162
ProcessID src
Definition: worldam.h:91
AmArg(const AmArg &)
unsigned char * buf() const
Returns a pointer to the user's payload (aligned in same way as AmArg)
Definition: worldam.h:132
std::ptrdiff_t func
Definition: worldam.h:90
AmArg()
Definition: worldam.h:129
archive::BufferInputArchive operator&(T &t) const
Used to deserialize arguments from incoming message.
Definition: worldam.h:139
void set_worldid(unsigned long id)
Definition: worldam.h:103
AmArg & operator=(const AmArg &)
archive::BufferInputArchive make_input_arch() const
Definition: worldam.h:120
am_handlerT get_func() const
Definition: worldam.h:118
ProcessID get_src() const
For incoming AM gives the source process.
Definition: worldam.h:150
unsigned int flags
Definition: worldam.h:92
void set_size(std::size_t numbyte)
Definition: worldam.h:110
World * get_world() const
For incoming AM gives the associated world.
Definition: worldam.h:154
void set_func(am_handlerT handler)
Definition: worldam.h:105
void clear_flags()
Definition: worldam.h:116
void set_pending()
Definition: worldam.h:112
std::uint64_t get_worldid() const
Return the world id.
Definition: worldam.h:157
Mutex using pthread mutex operations.
Definition: worldmutex.h:131
void unlock() const
Free a mutex owned by this thread.
Definition: worldmutex.h:165
bool try_lock() const
Try to acquire the mutex ... return true on success, false on failure.
Definition: worldmutex.h:150
void lock() const
Acquire the mutex waiting if necessary.
Definition: worldmutex.h:155
static bool get_this_thread_is_server()
Definition: worldrmi.h:186
static const size_t HEADER_LEN
Definition: worldrmi.h:179
static const attrT ATTR_ORDERED
Definition: worldrmi.h:181
static std::list< std::unique_ptr< RMISendReq > > send_req
Definition: worldrmi.h:188
static Request isend(const void *buf, size_t nbyte, ProcessID dest, rmi_handlerT func, unsigned int attr=ATTR_UNORDERED)
Send a remote method invocation (again you should probably be looking at worldam.h instead)
Definition: worldrmi.h:355
Definition: worldam.h:218
void free()
Definition: worldam.h:221
SendReq(AmArg *b, const RMI::Request &r)
Definition: worldam.h:224
RMI::Request req
Definition: worldam.h:220
SendReq()
Definition: worldam.h:223
bool TestAndFree()
Definition: worldam.h:226
void set(AmArg *b, const RMI::Request &r)
Definition: worldam.h:225
~SendReq()
Definition: worldam.h:236
AmArg * buf
Definition: worldam.h:219
Implements AM interface.
Definition: worldam.h:207
const int nproc
Definition: worldam.h:246
unsigned long worldid
The world which contains this instance of WorldAmInterface.
Definition: worldam.h:244
int cur_msg
Index of next buffer to attempt to use.
Definition: worldam.h:248
std::vector< int > map_to_comm_world
Maps rank in current MPI communicator to SafeMPI::COMM_WORLD.
Definition: worldam.h:252
const ProcessID rank
Definition: worldam.h:245
virtual ~WorldAmInterface()
Definition: worldam.cc:85
WorldAmInterface(World &world)
Definition: worldam.cc:41
unsigned long nsent
Counts no. of AM sent for purpose of termination detection.
Definition: worldam.h:249
static const int DEFAULT_NSEND
Definition: worldam.h:215
void fence()
Currently a noop.
Definition: worldam.h:275
std::unique_ptr< SendReq[]> send_req
Send requests and managed buffers.
Definition: worldam.h:243
int free_managed_buffers()
Frees as many send buffers as possible, returning the number that are free.
Definition: worldam.h:354
void send(ProcessID dest, am_handlerT op, const AmArg *arg, const int attr=RMI::ATTR_ORDERED)
Sends a managed non-blocking active message.
Definition: worldam.h:278
int nsend
Max no. of pending sends.
Definition: worldam.h:242
unsigned long nrecv
Counts no. of AM received for purpose of termination detection.
Definition: worldam.h:250
static void handler(void *buf, std::size_t nbyte)
This handles all incoming RMI messages for all instances.
Definition: worldam.h:255
Provides collectives that interoperate with the AM and task interfaces.
Definition: worldgop.h:145
Implements most parts of a globally addressable object (via unique ID).
Definition: world_object.h:364
A parallel world class.
Definition: world.h:132
static World * world_from_id(std::uint64_t id)
Convert a World ID to a World pointer.
Definition: world.h:474
Wraps an archive around a memory buffer for input.
Definition: buffer_archive.h:134
Wraps an archive around a memory buffer for output.
Definition: buffer_archive.h:59
std::size_t size() const
Return the amount of data stored (counted) in the buffer.
Definition: buffer_archive.h:123
auto T(World &world, response_space &f) -> response_space
Definition: global_functions.cc:34
std::ptrdiff_t to_rel_fn_ptr(const T &fn)
converts function or (free or static member) function pointer to the relative function pointer
Definition: archive.h:237
Tensor< typename Tensor< T >::scalar_type > arg(const Tensor< T > &t)
Return a new tensor holding the argument of each element of t (complex types only)
Definition: tensor.h:2502
Tensor< double > op(const Tensor< double > &x)
Definition: kain.cc:508
#define MADNESS_ASSERT(condition)
Assert a condition that should be free of side-effects since in release builds this might be a no-op.
Definition: madness_exception.h:134
File holds all helper structures necessary for the CC_Operator and CC2 class.
Definition: DFParameters.h:10
AmArg * new_am_arg(const argT &... args)
Convenience template for serializing arguments into a new AmArg.
Definition: worldam.h:194
static void myusleep(unsigned int us)
Sleep or spin for specified number of microseconds.
Definition: timers.h:164
std::shared_ptr< FunctionFunctorInterface< double, 3 > > func(new opT(g))
void free_am_arg(AmArg *arg)
Frees an AmArg allocated with alloc_am_arg.
Definition: worldam.h:177
void(* am_handlerT)(const AmArg &)
Type of AM handler functions.
Definition: worldam.h:77
void serialize_am_args(Archive &&)
Terminate argument serialization.
Definition: worldam.h:184
AmArg * alloc_am_arg(std::size_t nbyte)
Allocates a new AmArg with nbytes of user data ... delete with free_am_arg.
Definition: worldam.h:162
AmArg * copy_am_arg(const AmArg &arg)
Definition: worldam.h:170
static const double b
Definition: nonlinschro.cc:119
This for RMI server thread to manage lifetime of WorldAM messages that it is sending.
Definition: worldrmi.h:159
std::string ok(const bool b)
Definition: test6.cc:43
double source(const coordT &r)
Definition: testperiodic.cc:48
Declares the World class for the parallel runtime environment.
Lowest level API for sending active messages — you should probably be looking at worldam....
int ProcessID
Used to clearly identify process number/rank.
Definition: worldtypes.h:43