MADNESS 0.10.1
Go to the documentation of this file.
2 This file is part of MADNESS.
4 Copyright (C) 2007,2010 Oak Ridge National Laboratory
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 For more information please contact:
22 Robert J. Harrison
23 Oak Ridge National Laboratory
24 One Bethel Valley Road
25 P.O. Box 2008, MS-6367
27 email:
28 tel: 865-241-3937
29 fax: 865-572-0680
30 */
35/// \file worldam.h
36/// \brief Implements active message layer for World on top of RMI layer
40#include <madness/world/world.h>
41#include <vector>
42#include <cstddef>
43#include <memory>
44#include <pthread.h>
46namespace madness {
48 /*
49 The RMI layer just does transport and does not know about World
50 or even necessarily about MPI. It also has no buffering or
51 virtualization of resources. In particular, we must be careful
52 about having too many outstanding sends and active message
53 handlers must be careful about what they do in order to avoid
54 deadlock --- especially problematic is a handler trying to send
55 a message.
57 The WorldAM class provides a World-aware RMI capability that
58 limits the number of outstanding sends and can optionally manage
59 buffers. The issue of what handlers can do safely is handled by
60 the integration of WorldAM and the task queues ... if you have
61 an operation that might
63 - send messages
65 - take a long time
67 - consume a lot of stack/heap (e.g., recursive algorithm)
69 then the right thing to do is to send a task rather than
70 an active message.
71 */
73 template <class Derived> class WorldObject;
class AmArg;

/// Signature of an active-message handler: a plain function that receives
/// the message as a const reference and returns nothing.
using am_handlerT = void (*)(const AmArg&);
79 /// World active message that extends an RMI message
// An AmArg is a fixed-size header record. The user payload is not a data
// member: buf() addresses it as the bytes that start sizeof(AmArg) past the
// object, and size() reports its length. Every data member and setter is
// private; only the friends named below may touch them.
80 class AmArg {
81 private:
82 friend class WorldAmInterface;
83 template <class Derived> friend class WorldObject;
85 friend AmArg* alloc_am_arg(std::size_t nbyte);
// Leading RMI::HEADER_LEN bytes of the record. Nothing in this class reads or
// writes them; presumably they are reserved for the RMI layer, which is handed
// the AmArg pointer directly -- confirm in worldrmi.h.
87 unsigned char header[RMI::HEADER_LEN]; // !!!!!!!!! MUST BE FIRST !!!!!!!!!!
88 std::size_t nbyte; // Size of user payload
89 std::uint64_t worldid; // Id of associated world
90 std::ptrdiff_t func; // User function to call, as a relative fn ptr (see archive::to_rel_fn_ptr)
91 ProcessID src; // Rank of process sending the message
92 unsigned int flags; // Misc. bit flags
// NOTE(review): the 32-bit figure below counts 4 bytes for worldid, but worldid
// is declared std::uint64_t above, so that line looks stale -- confirm.
94 // On 32 bit machine AmArg is HEADER_LEN+4+4+4+4+4=84 bytes
95 // On 64 bit machine AmArg is HEADER_LEN+8+8+8+4+4=96 bytes
97 // No copy constructor or assignment
98 AmArg(const AmArg&);
// NOTE(review): original lines 99-101 are absent from this listing. The symbol
// index at the end of the page lists `AmArg & operator=(const AmArg &)` and
// `void set_src(ProcessID source)` (worldam.h:101) for this class.
// Records the id of the world this message belongs to.
// NOTE(review): the parameter is unsigned long while the field is
// std::uint64_t; where unsigned long is 32 bits a larger id would not
// round-trip -- confirm ids stay in range.
103 void set_worldid(unsigned long id) { worldid = id; }
// Stores the handler as a relative function pointer; a null handler is
// rejected by the assertion.
105 void set_func(am_handlerT handler) {
106 MADNESS_ASSERT(handler);
107 func = archive::to_rel_fn_ptr(handler);
108 }
// Records the payload length in bytes; does not allocate or resize anything.
110 void set_size(std::size_t numbyte) { nbyte = numbyte; }
// Bit 0 of flags is the "pending" marker.
112 void set_pending() { flags |= 0x1ul; }
114 bool is_pending() const { return flags & 0x1ul; }
// Clears every flag bit, including "pending".
116 void clear_flags() { flags = 0; }
// Converts the stored relative function pointer back to a callable handler.
118 am_handlerT get_func() const { return archive::to_abs_fn_ptr<am_handlerT>(func); }
// NOTE(review): original lines 120-126 are absent from this listing. The symbol
// index lists make_input_arch() (worldam.h:120) and make_output_arch()
// (worldam.h:124), which the operator& bodies below call.
128 public:
// Empty constructor with no member initializers: header, nbyte, worldid, func,
// src and flags all start out uninitialized.
129 AmArg() {}
131 /// Returns a pointer to the user's payload (aligned in same way as AmArg)
132 unsigned char* buf() const { return (unsigned char*)(this) + sizeof(AmArg); }
134 /// Returns the size of the user's payload
135 std::size_t size() const { return nbyte; }
137 /// Used to deserialize arguments from incoming message
138 template <typename T>
// NOTE(review): the signature line (139) is absent from this listing; the
// symbol index gives it as `archive::BufferInputArchive operator&(T &t) const`.
140 return make_input_arch() & t;
141 }
143 /// Used to serialize arguments into outgoing message
144 template <typename T>
// NOTE(review): the signature line (145) is absent from this listing; the
// symbol index gives it as
// `archive::BufferOutputArchive operator&(const T &t) const`.
146 return make_output_arch() & t;
147 }
149 /// For incoming AM gives the source process
150 ProcessID get_src() const { return src; }
152 // This is not inline in order to keep World opaque.
153 /// For incoming AM gives the associated world
// NOTE(review): the declaration this comment documents (line 154) is absent
// from this listing; the symbol index gives it as `World * get_world() const`.
156 /// Return the world id
157 std::uint64_t get_worldid() const { return worldid; }
158 };
161 /// Allocates a new AmArg with nbytes of user data ... delete with free_am_arg
162 inline AmArg* alloc_am_arg(std::size_t nbyte) {
163 std::size_t narg = 1 + (nbyte+sizeof(AmArg)-1)/sizeof(AmArg);
164 AmArg *arg = new AmArg[narg];
165 arg->set_size(nbyte);
166 return arg;
167 }
170 inline AmArg* copy_am_arg(const AmArg& arg) {
171 AmArg* r = alloc_am_arg(arg.size());
172 memcpy(reinterpret_cast<void*>(r), &arg, arg.size()+sizeof(AmArg));
173 return r;
174 }
176 /// Frees an AmArg allocated with alloc_am_arg
177 inline void free_am_arg(AmArg* arg) {
178 //std::cout << " freeing amarg " << (void*)(arg) << " " << pthread_self() << std::endl;
179 delete [] arg;
180 }
/// End of the argument-serialization recursion: no arguments are left, so
/// nothing is done with the archive.
template <typename Archive>
inline void serialize_am_args(Archive&&) {}

/// Argument serialization: applies `archive & head`, then recurses on the
/// result of that expression with the remaining arguments.
template <typename Archive, typename Head, typename... Tail>
inline void serialize_am_args(Archive&& archive, Head&& head, Tail&&... tail) {
    serialize_am_args(archive & head, std::forward<Tail>(tail)...);
}
192 /// Convenience template for serializing arguments into a new AmArg
// Makes two passes over the same argument list: the first feeds the arguments
// to `count` to learn the payload size, the second writes them into an AmArg
// allocated with exactly that size. The result comes from alloc_am_arg, so it
// is released with free_am_arg.
193 template <typename... argT>
194 inline AmArg* new_am_arg(const argT&... args) {
195 // compute size
// NOTE(review): the declaration of `count` (original line 196) is absent from
// this listing. It is presumably an archive::BufferOutputArchive used only to
// count bytes (the symbol index references that type and its size()) -- confirm.
197 serialize_am_args(count, args...);
199 // Serialize arguments
200 AmArg* am_args = alloc_am_arg(count.size());
// Passing *am_args as the archive makes each step go through AmArg::operator&.
201 serialize_am_args(*am_args, args...);
202 return am_args;
203 }
206 /// Implements AM interface
208 friend class WorldGopInterface;
209 friend class World;
210 private:
212#ifdef HAVE_CRAYXT
213 static const int DEFAULT_NSEND = 512;
215 static const int DEFAULT_NSEND = 128;
// Pairs a message buffer with the RMI request that is transmitting it, so the
// buffer can be released once the request tests complete. Derives from
// SPINLOCK_TYPE, so each SendReq carries its own lock.
// NOTE(review): original lines 219-220 and 236 are absent from this listing.
// The symbol index lists the members `AmArg * buf` (worldam.h:219) and
// `RMI::Request req` (worldam.h:220); line 236 is unnamed there.
218 class SendReq : public SPINLOCK_TYPE, public RMISendReq {
// Releases the held buffer, if any, and forgets it; safe to call repeatedly.
221 void free() {if (buf) {free_am_arg(buf); buf=0;}}
222 public:
// Starts with no buffer attached.
223 SendReq() : buf(0) {}
224 SendReq(AmArg* b, const RMI::Request& r) : buf(b), req(r) {} // lock is NOT set
// Overwrites buf and req without freeing a previously held buffer, so callers
// are expected to have drained the slot (TestAndFree) first.
225 void set(AmArg* b, const RMI::Request& r) {buf=b; req=r;} // assumes lock held
// Returns true when the slot holds no buffer, or when req.Test() succeeds (in
// which case the buffer is freed). Returns false while the request has not
// yet tested complete.
226 bool TestAndFree() { // assumes lock held if necessary
227 if (buf) {
228 bool ok = req.Test();
229 if (ok) free();
230 return ok;
231 }
232 else {
233 return true;
234 }
235 }
237 };
239 // Multiple threads are making their way thru here ... must be careful
240 // to ensure updates are atomic and consistent
242 int nsend; ///< Max no. of pending sends
243 std::unique_ptr<SendReq []> send_req; ///< Send requests and managed buffers
244 unsigned long worldid; ///< The world which contains this instance of WorldAmInterface
246 const int nproc;
247 // Next 3 were volatile but no need since protected by spinlock with implied barriers/fence
248 int cur_msg; ///< Index of next buffer to attempt to use
249 unsigned long nsent; ///< Counts no. of AM sent for purpose of termination detection
250 unsigned long nrecv; ///< Counts no. of AM received for purpose of termination detection
252 std::vector<int> map_to_comm_world; ///< Maps rank in current MPI communicator to SafeMPI::COMM_WORLD
254 /// This handles all incoming RMI messages for all instances
// buf is reinterpreted as an AmArg (header followed by payload); nbyte is the
// total byte count and must equal sizeof(AmArg) plus the payload size recorded
// in the header.
255 static void handler(void *buf, std::size_t nbyte) {
256 // It will be single threaded since only the RMI receiver
257 // thread will invoke it ... however note that nrecv will
258 // be read by the main thread during fence operations.
259 AmArg* arg = static_cast<AmArg*>(buf);
// NOTE(review): original lines 260 and 263-264 are absent from this listing;
// `func`, called below, is presumably obtained there via arg->get_func() --
// confirm against the real header.
261 World* w = arg->get_world();
262 MADNESS_ASSERT(arg->size() + sizeof(AmArg) == nbyte);
// Run the user's handler on the message, then count it as received on the
// world the message names.
265 func(*arg);
266 w->am.nrecv++; // Must be AFTER execution of the function
267 }
269 public:
270 WorldAmInterface(World& world);
272 virtual ~WorldAmInterface();
274 /// Currently a noop
// The body is empty: calling this performs no work and waits on nothing.
275 void fence() {}
277 /// Sends a managed non-blocking active message
// dest: destination rank in this world's communicator (translated to a
//       comm_world rank below).
// op:   handler to run on the destination; must be non-null (set_func asserts).
// arg:  message created by alloc_am_arg/new_am_arg. Although declared const,
//       its header fields are overwritten here, and the pointer is stored in a
//       SendReq that later releases it with free_am_arg, so the caller must
//       not free or reuse it after this call.
// attr: RMI attribute bits, forwarded unchanged to RMI::isend.
278 void send(ProcessID dest, am_handlerT op, const AmArg* arg,
279 const int attr=RMI::ATTR_ORDERED)
280 {
281 // Setup the header
282 {
// The const is cast away so the header fields can be filled in.
283 AmArg* argx = const_cast<AmArg*>(arg);
285 argx->set_worldid(worldid);
286 argx->set_src(rank);
287 argx->set_func(op);
288 argx->clear_flags(); // Is this the right place for this?
289 }
291 // Sanity check
292 MADNESS_ASSERT(arg->get_world());
293 MADNESS_ASSERT(arg->get_func());
295 // Map dest from world's communicator to comm_world
296 dest = map_to_comm_world[dest];
298 // Remaining code refactored to avoid blocking with lock
299 // and to enable finer grained calls into MPI send
// NOTE(review): original lines 300-302 are absent from this listing. They
// presumably open an `if` on RMI::get_this_thread_is_server() (named in the
// symbol index) that guards the branch closed by the `}` after `return;` --
// confirm.
303 // The server thread may be executing a handler that
304 // is trying to send a message (e.g., assigning a
305 // remote future). However, the server must not block
306 // in here. It also needs to send messages in order,
307 // thus it puts all send_reqs onto a queue which it
308 // processes in its main loop using the RMI::send
309 // interface.
// lock()/unlock() are unqualified calls on this object; the mutex they belong
// to comes from a declaration that is not visible in this listing.
311 lock(); nsent++; unlock(); // This world must still keep track of messages
// Hand the buffer and its in-flight request to the RMI-level queue
// (RMI::send_req) instead of one of this object's send_req slots.
313 RMI::send_req.emplace_back(std::make_unique<SendReq>((AmArg*)(arg), RMI::isend(arg, arg->size()+sizeof(AmArg), dest, handler, attr)));
315 //std::cout << "sending message from server " << (void*)(arg) << " " << pthread_self() << " " << p << std::endl;
317 return;
318 }
321 // Find a free buffer oldest first (in order to assist
322 // with flow control). Exit loop with a lock on buffer.
324 // This design will need nsend >= nthreads
// Spins until the slot at cur_msg can be locked. cur_msg only advances on
// success, so every caller contends for the same slot until one of them
// claims it. nsent is incremented in the same critical section as the claim.
325 int i=-1;
326 while (i == -1) {
327 lock(); // << Protect cur_msg and nsent;
328 if (send_req[cur_msg].try_lock()) { // << matching unlock at end of routine
329 i = cur_msg;
330 cur_msg = (cur_msg + 1) % nsend;
331 nsent++;
332 }
333 unlock(); // << Protect cur_msg and nsent;
334 }
337 // If the buffer is still in-use wait for it to complete
338 while (!send_req[i].TestAndFree()) {
339 // If the oldest message has still not completed then
340 // there is likely severe network or end-point
341 // congestion, so pause for 100us in a rather
342 // arbitrary attempt to decrease the injection rate.
343 // Both the server thread and this call to Test()
344 // should ensure progress.
345 myusleep(100);
346 }
348 // Buffer is now free but still locked by me
// Start the transfer and record both the buffer and its request in the slot;
// a later TestAndFree() on this slot releases the buffer.
349 send_req[i].set((AmArg*)(arg), RMI::isend(arg, arg->size()+sizeof(AmArg), dest, handler, attr));
350 send_req[i].unlock(); // << matches try_lock above
351 }
353 /// Frees as many send buffers as possible, returning the number that are free
// NOTE(review): the signature line (354) is absent from this listing; the
// symbol index gives it as `int free_managed_buffers()`.
// Slots whose lock cannot be taken immediately are skipped and not counted, so
// the result is a lower bound on the number of free slots.
355 int nfree = 0;
356 for (int i=0; i<nsend; i++) {
357 if (send_req[i].try_lock()) { // Someone may be trying to put a message into this buffer
// TestAndFree() is true for an empty slot or for one whose request has
// completed (its buffer is released as a side effect).
358 if (send_req[i].TestAndFree()) nfree++;
359 send_req[i].unlock(); // matching unlock
360 }
361 }
362 return nfree;
363 }
365 };
double w(double t, double eps)
Definition DKops.h:22
Implements an archive wrapping a memory buffer.
Definition safempi.h:289
bool Test(MPI_Status &status)
Definition safempi.h:409
World active message that extends an RMI message.
Definition worldam.h:80
std::uint64_t worldid
Definition worldam.h:89
unsigned char header[RMI::HEADER_LEN]
Definition worldam.h:87
std::size_t nbyte
Definition worldam.h:88
archive::BufferOutputArchive make_output_arch() const
Definition worldam.h:124
archive::BufferOutputArchive operator&(const T &t) const
Used to serialize arguments into outgoing message.
Definition worldam.h:145
std::size_t size() const
Returns the size of the user's payload.
Definition worldam.h:135
void set_src(ProcessID source)
Definition worldam.h:101
bool is_pending() const
Definition worldam.h:114
AmArg & operator=(const AmArg &)
ProcessID src
Definition worldam.h:91
AmArg(const AmArg &)
friend AmArg * alloc_am_arg(std::size_t nbyte)
Allocates a new AmArg with nbytes of user data ... delete with free_am_arg.
Definition worldam.h:162
std::ptrdiff_t func
Definition worldam.h:90
Definition worldam.h:129
archive::BufferInputArchive operator&(T &t) const
Used to deserialize arguments from incoming message.
Definition worldam.h:139
unsigned char * buf() const
Returns a pointer to the user's payload (aligned in same way as AmArg)
Definition worldam.h:132
void set_worldid(unsigned long id)
Definition worldam.h:103
archive::BufferInputArchive make_input_arch() const
Definition worldam.h:120
am_handlerT get_func() const
Definition worldam.h:118
ProcessID get_src() const
For incoming AM gives the source process.
Definition worldam.h:150
unsigned int flags
Definition worldam.h:92
void set_size(std::size_t numbyte)
Definition worldam.h:110
void set_func(am_handlerT handler)
Definition worldam.h:105
void clear_flags()
Definition worldam.h:116
void set_pending()
Definition worldam.h:112
World * get_world() const
For incoming AM gives the associated world.
Definition worldam.h:154
std::uint64_t get_worldid() const
Return the world id.
Definition worldam.h:157
Mutex using pthread mutex operations.
Definition worldmutex.h:131
void unlock() const
Free a mutex owned by this thread.
Definition worldmutex.h:165
bool try_lock() const
Try to acquire the mutex ... return true on success, false on failure.
Definition worldmutex.h:150
void lock() const
Acquire the mutex waiting if necessary.
Definition worldmutex.h:155
static bool get_this_thread_is_server()
Definition worldrmi.h:186
static const size_t HEADER_LEN
Definition worldrmi.h:179
static const attrT ATTR_ORDERED
Definition worldrmi.h:181
static std::list< std::unique_ptr< RMISendReq > > send_req
Definition worldrmi.h:188
static Request isend(const void *buf, size_t nbyte, ProcessID dest, rmi_handlerT func, unsigned int attr=ATTR_UNORDERED)
Send a remote method invocation (again you should probably be looking at worldam.h instead)
Definition worldrmi.h:355
Definition worldam.h:218
void free()
Definition worldam.h:221
SendReq(AmArg *b, const RMI::Request &r)
Definition worldam.h:224
RMI::Request req
Definition worldam.h:220
Definition worldam.h:223
bool TestAndFree()
Definition worldam.h:226
void set(AmArg *b, const RMI::Request &r)
Definition worldam.h:225
Definition worldam.h:236
AmArg * buf
Definition worldam.h:219
Implements AM interface.
Definition worldam.h:207
const int nproc
Definition worldam.h:246
unsigned long worldid
The world which contains this instance of WorldAmInterface.
Definition worldam.h:244
int cur_msg
Index of next buffer to attempt to use.
Definition worldam.h:248
std::vector< int > map_to_comm_world
Maps rank in current MPI communicator to SafeMPI::COMM_WORLD.
Definition worldam.h:252
const ProcessID rank
Definition worldam.h:245
virtual ~WorldAmInterface()
unsigned long nsent
Counts no. of AM sent for purpose of termination detection.
Definition worldam.h:249
static const int DEFAULT_NSEND
Definition worldam.h:215
void fence()
Currently a noop.
Definition worldam.h:275
std::unique_ptr< SendReq[]> send_req
Send requests and managed buffers.
Definition worldam.h:243
int free_managed_buffers()
Frees as many send buffers as possible, returning the number that are free.
Definition worldam.h:354
void send(ProcessID dest, am_handlerT op, const AmArg *arg, const int attr=RMI::ATTR_ORDERED)
Sends a managed non-blocking active message.
Definition worldam.h:278
int nsend
Max no. of pending sends.
Definition worldam.h:242
unsigned long nrecv
Counts no. of AM received for purpose of termination detection.
Definition worldam.h:250
static void handler(void *buf, std::size_t nbyte)
This handles all incoming RMI messages for all instances.
Definition worldam.h:255
Provides collectives that interoperate with the AM and task interfaces.
Definition worldgop.h:145
Implements most parts of a globally addressable object (via unique ID).
Definition world_object.h:364
A parallel world class.
Definition world.h:132
static World * world_from_id(std::uint64_t id)
Convert a World ID to a World pointer.
Definition world.h:492
Wraps an archive around a memory buffer for input.
Definition buffer_archive.h:134
Wraps an archive around a memory buffer for output.
Definition buffer_archive.h:59
std::size_t size() const
Return the amount of data stored (counted) in the buffer.
Definition buffer_archive.h:123
auto T(World &world, response_space &f) -> response_space
std::ptrdiff_t to_rel_fn_ptr(const T &fn)
converts function or (free or static member) function pointer to the relative function pointer
Definition archive.h:237
Tensor< typename Tensor< T >::scalar_type > arg(const Tensor< T > &t)
Return a new tensor holding the argument of each element of t (complex types only)
Definition tensor.h:2503
Tensor< double > op(const Tensor< double > &x)
#define MADNESS_ASSERT(condition)
Assert a condition that should be free of side-effects since in release builds this might be a no-op.
Definition madness_exception.h:134
Namespace for all elements and tools of MADNESS.
Definition DFParameters.h:10
AmArg * copy_am_arg(const AmArg &arg)
Definition worldam.h:170
static void myusleep(unsigned int us)
Sleep or spin for specified number of microseconds.
Definition timers.h:164
std::shared_ptr< FunctionFunctorInterface< double, 3 > > func(new opT(g))
AmArg * new_am_arg(const argT &... args)
Convenience template for serializing arguments into a new AmArg.
Definition worldam.h:194
void free_am_arg(AmArg *arg)
Frees an AmArg allocated with alloc_am_arg.
Definition worldam.h:177
void(* am_handlerT)(const AmArg &)
Type of AM handler functions.
Definition worldam.h:77
void serialize_am_args(Archive &&)
Terminate argument serialization.
Definition worldam.h:184
AmArg * alloc_am_arg(std::size_t nbyte)
Allocates a new AmArg with nbytes of user data ... delete with free_am_arg.
Definition worldam.h:162
static const double b
This for RMI server thread to manage lifetime of WorldAM messages that it is sending.
Definition worldrmi.h:159
std::string ok(const bool b)
double source(const coordT &r)
Declares the World class for the parallel runtime environment.
Lowest level API for sending active messages — you should probably be looking at worldam....
int ProcessID
Used to clearly identify process number/rank.
Definition worldtypes.h:43