MADNESS 0.10.1
worldam.h
Go to the documentation of this file.
1/*
2 This file is part of MADNESS.
3
4 Copyright (C) 2007,2010 Oak Ridge National Laboratory
5
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
10
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19
20 For more information please contact:
21
22 Robert J. Harrison
23 Oak Ridge National Laboratory
24 One Bethel Valley Road
25 P.O. Box 2008, MS-6367
26
27 email: harrisonrj@ornl.gov
28 tel: 865-241-3937
29 fax: 865-572-0680
30 */
31
32#ifndef MADNESS_WORLD_WORLDAM_H__INCLUDED
33#define MADNESS_WORLD_WORLDAM_H__INCLUDED
34
35/// \file worldam.h
36/// \brief Implements active message layer for World on top of RMI layer
37
#include <madness/world/world.h>
#include <cstddef>
#include <cstring>
#include <memory>
#include <vector>
#include <pthread.h>
45
46namespace madness {
47
48 /*
49 The RMI layer just does transport and does not know about World
50 or even necessarily about MPI. It also has no buffering or
51 virtualization of resources. In particular, we must be careful
52 about having too many outstanding sends and active message
53 handlers must be careful about what they do in order to avoid
54 deadlock --- especially problematic is a handler trying to send
55 a message.
56
57 The WorldAM class provides a World-aware RMI capability that
58 limits the number of outstanding sends and can optionally manage
59 buffers. The issue of what handlers can do safely is handled by
60 the integration of WorldAM and the task queues ... if you have
61 an operation that might
62
63 - send messages
64
65 - take a long time
66
67 - consume a lot of stack/heap (e.g., recursive algorithm)
68
69 then the right thing to do is to send a task rather than
70 an active message.
71 */
72
// Forward declarations used by the active-message types below.
template <class Derived> class WorldObject;
class AmArg;

/// Type of AM handler functions: a plain function receiving the incoming
/// message by const reference.
using am_handlerT = void (*)(const AmArg&);
78
79 /// World active message that extends an RMI message
80 class AmArg {
81 private:
82 friend class WorldAmInterface;
83 template <class Derived> friend class WorldObject;
84
85 friend AmArg* alloc_am_arg(std::size_t nbyte);
86
87 unsigned char header[RMI::HEADER_LEN]; // !!!!!!!!! MUST BE FIRST !!!!!!!!!!
88 std::size_t nbyte; // Size of user payload
89 std::uint64_t worldid; // Id of associated world
90 std::ptrdiff_t func; // User function to call, as a relative fn ptr (see archive::to_rel_fn_ptr)
91 ProcessID src; // Rank of process sending the message
92 unsigned int flags; // Misc. bit flags
93
94 // On 32 bit machine AmArg is HEADER_LEN+4+4+4+4+4=84 bytes
95 // On 64 bit machine AmArg is HEADER_LEN+8+8+8+4+4=96 bytes
96
97 // No copy constructor or assignment
98 AmArg(const AmArg&);
100
102
103 void set_worldid(unsigned long id) { worldid = id; }
104
105 void set_func(am_handlerT handler) {
106 MADNESS_ASSERT(handler);
107 func = archive::to_rel_fn_ptr(handler);
108 }
109
110 void set_size(std::size_t numbyte) { nbyte = numbyte; }
111
112 void set_pending() { flags |= 0x1ul; }
113
114 bool is_pending() const { return flags & 0x1ul; }
115
116 void clear_flags() { flags = 0; }
117
118 am_handlerT get_func() const { return archive::to_abs_fn_ptr<am_handlerT>(func); }
119
123
127
128 public:
129 AmArg() {}
130
131 /// Returns a pointer to the user's payload (aligned in same way as AmArg)
132 unsigned char* buf() const { return (unsigned char*)(this) + sizeof(AmArg); }
133
134 /// Returns the size of the user's payload
135 std::size_t size() const { return nbyte; }
136
137 /// Used to deserialize arguments from incoming message
138 template <typename T>
140 return make_input_arch() & t;
141 }
142
143 /// Used to serialize arguments into outgoing message
144 template <typename T>
146 return make_output_arch() & t;
147 }
148
149 /// For incoming AM gives the source process
150 ProcessID get_src() const { return src; }
151
152 // This is not inline in order to keep World opaque.
153 /// For incoming AM gives the associated world
155
156 /// Return the world id
157 std::uint64_t get_worldid() const { return worldid; }
158 };
159
160
161 /// Allocates a new AmArg with nbytes of user data ... delete with free_am_arg
162 inline AmArg* alloc_am_arg(std::size_t nbyte) {
163 std::size_t narg = 1 + (nbyte+sizeof(AmArg)-1)/sizeof(AmArg);
164 AmArg *arg = new AmArg[narg];
165 arg->set_size(nbyte);
166 return arg;
167 }
168
169
170 inline AmArg* copy_am_arg(const AmArg& arg) {
171 AmArg* r = alloc_am_arg(arg.size());
172 memcpy(reinterpret_cast<void*>(r), &arg, arg.size()+sizeof(AmArg));
173 return r;
174 }
175
176 /// Frees an AmArg allocated with alloc_am_arg
177 inline void free_am_arg(AmArg* arg) {
178 //std::cout << " freeing amarg " << (void*)(arg) << " " << pthread_self() << std::endl;
179 delete [] arg;
180 }
181
/// Terminate argument serialization: no arguments remain.
template <typename Archive>
inline void serialize_am_args(Archive&&) {}

/// Argument serialization: applies `ar & t` for the first argument and
/// recurses on the rest through the archive that `operator&` returned, so
/// each subsequent argument goes through the previous step's result.
template <typename Archive, typename T, typename... argT>
inline void serialize_am_args(Archive&& ar, T&& t, argT&&... args) {
    auto&& next = ar & t;
    serialize_am_args(std::forward<decltype(next)>(next), std::forward<argT>(args)...);
}
191
192 /// Convenience template for serializing arguments into a new AmArg
193 template <typename... argT>
194 inline AmArg* new_am_arg(const argT&... args) {
195 // compute size
197 serialize_am_args(count, args...);
198
199 // Serialize arguments
200 AmArg* am_args = alloc_am_arg(count.size());
201 serialize_am_args(*am_args, args...);
202 return am_args;
203 }
204
205
206 /// Implements AM interface
208 friend class WorldGopInterface;
209 friend class World;
210 private:
211
212#ifdef HAVE_CRAYXT
213 static const int DEFAULT_NSEND = 512;
214#else
215 static const int DEFAULT_NSEND = 128;
216#endif
217
/// One managed send slot. It holds an AmArg buffer (`buf`) together with the
/// RMI request (`req`) sending it, and releases the buffer with free_am_arg()
/// once the request has completed.
/// Inherits SPINLOCK_TYPE so send() and free_managed_buffers() can lock each
/// slot individually (try_lock/unlock). Inherits RMISendReq so the RMI server
/// thread can own instances through RMI::send_req.
/// NOTE(review): the declarations `AmArg* buf;` (line 219) and
/// `RMI::Request req;` (line 220), and the destructor ~SendReq() (line 236),
/// are missing from this listing; the destructor body is not visible here.
218 class SendReq : public SPINLOCK_TYPE, public RMISendReq {
// Release the owned buffer (if any) and mark the slot empty.
221 void free() {if (buf) {free_am_arg(buf); buf=0;}}
222 public:
// Empty slot: owns no buffer.
223 SendReq() : buf(0) {}
224 SendReq(AmArg* b, const RMI::Request& r) : buf(b), req(r) {} // lock is NOT set
// Takes ownership of `b`; the previous buffer must already have been freed.
225 void set(AmArg* b, const RMI::Request& r) {buf=b; req=r;} // assumes lock held
// Returns true if the slot is now free: either it owns no buffer, or its
// request has completed (in which case the buffer is released here).
// Returns false while the send is still in flight.
226 bool TestAndFree() { // assumes lock held if necessary
227 if (buf) {
228 bool ok = req.Test();
229 if (ok) free();
230 return ok;
231 }
232 else {
233 return true;
234 }
235 }
237 };
238
239 // Multiple threads are making their way thru here ... must be careful
240 // to ensure updates are atomic and consistent
241
242 int nsend; ///< Max no. of pending sends
243 std::unique_ptr<SendReq []> send_req; ///< Send requests and managed buffers
244 unsigned long worldid; ///< The world which contains this instance of WorldAmInterface
246 const int nproc;
247 // Next 3 were volatile but no need since protected by spinlock with implied barriers/fence
248 int cur_msg; ///< Index of next buffer to attempt to use
249 unsigned long nsent; ///< Counts no. of AM sent for purpose of termination detection
250 unsigned long nrecv; ///< Counts no. of AM received for purpose of termination detection
251
252 std::vector<int> map_to_comm_world; ///< Maps rank in current MPI communicator to SafeMPI::COMM_WORLD
253
254 /// This handles all incoming RMI messages for all instances
255 static void handler(void *buf, std::size_t nbyte) {
256 // It will be singled threaded since only the RMI receiver
257 // thread will invoke it ... however note that nrecv will
258 // be read by the main thread during fence operations.
259 AmArg* arg = static_cast<AmArg*>(buf);
261 World* w = arg->get_world();
262 MADNESS_ASSERT(arg->size() + sizeof(AmArg) == nbyte);
265 func(*arg);
266 w->am.nrecv++; // Must be AFTER execution of the function
267 }
268
269 public:
270 WorldAmInterface(World& world);
271
272 virtual ~WorldAmInterface();
273
274 /// Currently a noop
275 void fence() {}
276
277 /// Sends a managed non-blocking active message
278 void send(ProcessID dest, am_handlerT op, const AmArg* arg,
279 const int attr=RMI::ATTR_ORDERED)
280 {
281 // Setup the header
282 {
283 AmArg* argx = const_cast<AmArg*>(arg);
284
285 argx->set_worldid(worldid);
286 argx->set_src(rank);
287 argx->set_func(op);
288 argx->clear_flags(); // Is this the right place for this?
289 }
290
291 // Sanity check
292 MADNESS_ASSERT(arg->get_world());
293 MADNESS_ASSERT(arg->get_func());
294
295 // Map dest from world's communicator to comm_world
296 dest = map_to_comm_world[dest];
297
298 // Remaining code refactored to avoid blocking with lock
299 // and to enable finer grained calls into MPI send
300
302
303 // The server thread may be executing a handler that
304 // is trying to send a message (e.g., assigning a
305 // remote future). However, the server must not block
306 // in here. It also needs to send messages in order,
307 // thus it puts all send_reqs onto a queue which it
308 // processes in its main loop using the RMI::send
309 // interface.
310
311 lock(); nsent++; unlock(); // This world must still keep track of messages
312
313 RMI::send_req.emplace_back(std::make_unique<SendReq>((AmArg*)(arg), RMI::isend(arg, arg->size()+sizeof(AmArg), dest, handler, attr)));
314
315 //std::cout << "sending message from server " << (void*)(arg) << " " << pthread_self() << " " << p << std::endl;
316
317 return;
318 }
319
320
321 // Find a free buffer oldest first (in order to assist
322 // with flow control). Exit loop with a lock on buffer.
323
324 // This design will need nsend >= nthreads
325 int i=-1;
326 while (i == -1) {
327 lock(); // << Protect cur_msg and nsent;
328 if (send_req[cur_msg].try_lock()) { // << matching unlock at end of routine
329 i = cur_msg;
330 cur_msg = (cur_msg + 1) % nsend;
331 nsent++;
332 }
333 unlock(); // << Protect cur_msg and nsent;
334 }
335
336
337 // If the buffer is still in-use wait for it to complete
338 while (!send_req[i].TestAndFree()) {
339 // If the oldest message has still not completed then
340 // there is likely severe network or end-point
341 // congestion, so pause for 100us in a rather
342 // arbitrary attempt to decrease the injection rate.
343 // Both the server thread and this call to Test()
344 // should ensure progress.
345 myusleep(100);
346 }
347
348 // Buffer is now free but still locked by me
349 send_req[i].set((AmArg*)(arg), RMI::isend(arg, arg->size()+sizeof(AmArg), dest, handler, attr));
350 send_req[i].unlock(); // << matches try_lock above
351 }
352
353 /// Frees as many send buffers as possible, returning the number that are free
355 int nfree = 0;
356 for (int i=0; i<nsend; i++) {
357 if (send_req[i].try_lock()) { // Someone may be trying to put a message into this buffer
358 if (send_req[i].TestAndFree()) nfree++;
359 send_req[i].unlock(); // matching unlock
360 }
361 }
362 return nfree;
363 }
364
365 };
366}
367
368#endif // MADNESS_WORLD_WORLDAM_H__INCLUDED
double w(double t, double eps)
Definition DKops.h:22
Implements an archive wrapping a memory buffer.
Definition safempi.h:289
bool Test(MPI_Status &status)
Definition safempi.h:409
World active message that extends an RMI message.
Definition worldam.h:80
std::uint64_t worldid
Definition worldam.h:89
unsigned char header[RMI::HEADER_LEN]
Definition worldam.h:87
std::size_t nbyte
Definition worldam.h:88
archive::BufferOutputArchive make_output_arch() const
Definition worldam.h:124
archive::BufferOutputArchive operator&(const T &t) const
Used to serialize arguments into outgoing message.
Definition worldam.h:145
std::size_t size() const
Returns the size of the user's payload.
Definition worldam.h:135
void set_src(ProcessID source)
Definition worldam.h:101
bool is_pending() const
Definition worldam.h:114
AmArg & operator=(const AmArg &)
ProcessID src
Definition worldam.h:91
AmArg(const AmArg &)
friend AmArg * alloc_am_arg(std::size_t nbyte)
Allocates a new AmArg with nbytes of user data ... delete with free_am_arg.
Definition worldam.h:162
std::ptrdiff_t func
Definition worldam.h:90
AmArg()
Definition worldam.h:129
archive::BufferInputArchive operator&(T &t) const
Used to deserialize arguments from incoming message.
Definition worldam.h:139
unsigned char * buf() const
Returns a pointer to the user's payload (aligned in same way as AmArg)
Definition worldam.h:132
void set_worldid(unsigned long id)
Definition worldam.h:103
archive::BufferInputArchive make_input_arch() const
Definition worldam.h:120
am_handlerT get_func() const
Definition worldam.h:118
ProcessID get_src() const
For incoming AM gives the source process.
Definition worldam.h:150
unsigned int flags
Definition worldam.h:92
void set_size(std::size_t numbyte)
Definition worldam.h:110
void set_func(am_handlerT handler)
Definition worldam.h:105
void clear_flags()
Definition worldam.h:116
void set_pending()
Definition worldam.h:112
World * get_world() const
For incoming AM gives the associated world.
Definition worldam.h:154
std::uint64_t get_worldid() const
Return the world id.
Definition worldam.h:157
Mutex using pthread mutex operations.
Definition worldmutex.h:131
void unlock() const
Free a mutex owned by this thread.
Definition worldmutex.h:165
bool try_lock() const
Try to acquire the mutex ... return true on success, false on failure.
Definition worldmutex.h:150
void lock() const
Acquire the mutex waiting if necessary.
Definition worldmutex.h:155
static bool get_this_thread_is_server()
Definition worldrmi.h:186
static const size_t HEADER_LEN
Definition worldrmi.h:179
static const attrT ATTR_ORDERED
Definition worldrmi.h:181
static std::list< std::unique_ptr< RMISendReq > > send_req
Definition worldrmi.h:188
static Request isend(const void *buf, size_t nbyte, ProcessID dest, rmi_handlerT func, unsigned int attr=ATTR_UNORDERED)
Send a remote method invocation (again you should probably be looking at worldam.h instead)
Definition worldrmi.h:355
Definition worldam.h:218
void free()
Definition worldam.h:221
SendReq(AmArg *b, const RMI::Request &r)
Definition worldam.h:224
RMI::Request req
Definition worldam.h:220
SendReq()
Definition worldam.h:223
bool TestAndFree()
Definition worldam.h:226
void set(AmArg *b, const RMI::Request &r)
Definition worldam.h:225
~SendReq()
Definition worldam.h:236
AmArg * buf
Definition worldam.h:219
Implements AM interface.
Definition worldam.h:207
const int nproc
Definition worldam.h:246
unsigned long worldid
The world which contains this instance of WorldAmInterface.
Definition worldam.h:244
int cur_msg
Index of next buffer to attempt to use.
Definition worldam.h:248
std::vector< int > map_to_comm_world
Maps rank in current MPI communicator to SafeMPI::COMM_WORLD.
Definition worldam.h:252
const ProcessID rank
Definition worldam.h:245
virtual ~WorldAmInterface()
Definition worldam.cc:85
unsigned long nsent
Counts no. of AM sent for purpose of termination detection.
Definition worldam.h:249
static const int DEFAULT_NSEND
Definition worldam.h:215
void fence()
Currently a noop.
Definition worldam.h:275
std::unique_ptr< SendReq[]> send_req
Send requests and managed buffers.
Definition worldam.h:243
int free_managed_buffers()
Frees as many send buffers as possible, returning the number that are free.
Definition worldam.h:354
void send(ProcessID dest, am_handlerT op, const AmArg *arg, const int attr=RMI::ATTR_ORDERED)
Sends a managed non-blocking active message.
Definition worldam.h:278
int nsend
Max no. of pending sends.
Definition worldam.h:242
unsigned long nrecv
Counts no. of AM received for purpose of termination detection.
Definition worldam.h:250
static void handler(void *buf, std::size_t nbyte)
This handles all incoming RMI messages for all instances.
Definition worldam.h:255
Provides collectives that interoperate with the AM and task interfaces.
Definition worldgop.h:145
Implements most parts of a globally addressable object (via unique ID).
Definition world_object.h:364
A parallel world class.
Definition world.h:132
static World * world_from_id(std::uint64_t id)
Convert a World ID to a World pointer.
Definition world.h:474
Wraps an archive around a memory buffer for input.
Definition buffer_archive.h:134
Wraps an archive around a memory buffer for output.
Definition buffer_archive.h:59
std::size_t size() const
Return the amount of data stored (counted) in the buffer.
Definition buffer_archive.h:123
auto T(World &world, response_space &f) -> response_space
Definition global_functions.cc:34
std::ptrdiff_t to_rel_fn_ptr(const T &fn)
converts function or (free or static member) function pointer to the relative function pointer
Definition archive.h:237
Tensor< typename Tensor< T >::scalar_type > arg(const Tensor< T > &t)
Return a new tensor holding the argument of each element of t (complex types only)
Definition tensor.h:2502
Tensor< double > op(const Tensor< double > &x)
Definition kain.cc:508
#define MADNESS_ASSERT(condition)
Assert a condition that should be free of side-effects since in release builds this might be a no-op.
Definition madness_exception.h:134
Namespace for all elements and tools of MADNESS.
Definition DFParameters.h:10
AmArg * copy_am_arg(const AmArg &arg)
Definition worldam.h:170
static void myusleep(unsigned int us)
Sleep or spin for specified number of microseconds.
Definition timers.h:164
std::shared_ptr< FunctionFunctorInterface< double, 3 > > func(new opT(g))
AmArg * new_am_arg(const argT &... args)
Convenience template for serializing arguments into a new AmArg.
Definition worldam.h:194
void free_am_arg(AmArg *arg)
Frees an AmArg allocated with alloc_am_arg.
Definition worldam.h:177
void(* am_handlerT)(const AmArg &)
Type of AM handler functions.
Definition worldam.h:77
void serialize_am_args(Archive &&)
Terminate argument serialization.
Definition worldam.h:184
AmArg * alloc_am_arg(std::size_t nbyte)
Allocates a new AmArg with nbytes of user data ... delete with free_am_arg.
Definition worldam.h:162
static const double b
Definition nonlinschro.cc:119
This for RMI server thread to manage lifetime of WorldAM messages that it is sending.
Definition worldrmi.h:159
std::string ok(const bool b)
Definition test6.cc:43
double source(const coordT &r)
Definition testperiodic.cc:48
Declares the World class for the parallel runtime environment.
Lowest level API for sending active messages — you should probably be looking at worldam....
int ProcessID
Used to clearly identify process number/rank.
Definition worldtypes.h:43