MADNESS 0.10.1
worldrmi.h
Go to the documentation of this file.
1/*
2 This file is part of MADNESS.
3
4 Copyright (C) 2007,2010 Oak Ridge National Laboratory
5
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
10
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19
20 For more information please contact:
21
22 Robert J. Harrison
23 Oak Ridge National Laboratory
24 One Bethel Valley Road
25 P.O. Box 2008, MS-6367
26
27 email: harrisonrj@ornl.gov
28 tel: 865-241-3937
29 fax: 865-572-0680
30*/
31
32#ifndef MADNESS_WORLD_WORLDRMI_H__INCLUDED
33#define MADNESS_WORLD_WORLDRMI_H__INCLUDED
34
39#include <sstream>
40#include <utility>
41#include <list>
42#include <memory>
43#include <tuple>
44#include <pthread.h>
45#include <madness/world/print.h>
46
47/*
48 There is just one server thread and it is the only one
49 messing with the recv buffers, so there is no need for
50 mutex on recv related data.
51
52 Multiple threads (including the server) may send hence
53 we need to be careful about send-related data.
54
55 When MPI is initialized we need to use MPI_Init_thread with
56 MPI_THREAD_MULTIPLE as the required thread level.
57
58 This RMI service operates only in (a clone of) COMM_WORLD. It is easy enough
59 to extend to other communicators but the point is to have
60 only one server thread for all possible uses. You just
61 have to translate rank_in_comm into rank_in_world by
62 getting the groups from both communicators using
63 MPI_Comm_group and then creating a map from ranks in
64 comm to ranks in world using MPI_Group_translate_ranks.
65
66 The class is a singleton ... i.e., there is only one server task (RMI::task_ptr);
67 it is started by RMI::begin() and torn down by RMI::end().
68
69 Handler routines should have this type
70
71 typedef void (*rmi_handlerT)(void* buf, size_t nbyte);
72
73 There are a few user-accessible routines:
74
75 RMI::Request RMI::isend(const void* buf, size_t nbyte, int dest,
76 rmi_handlerT func, unsigned int attr=0)
77 - to send an asynchronous message
78 - RMI::Request has the same interface as SafeMPI::Request
79 (right now it is a SafeMPI::Request but this is not guaranteed)
80
81 void RMI::begin()
82 - to start the server thread
83
84 void RMI::end()
85 - to terminate the server thread
86
87 bool RMI::get_debug()
88 - to get the debug flag
89
90 void RMI::set_debug(bool)
91 - to set the debug flag
92
93*/
94
95/**
96 \file worldrmi.h
97 \brief Lowest level API for sending active messages --- you should probably be looking at worldam.h instead.
98 \addtogroup parallel_runtime
99 */
100
101namespace madness {
102
/// Generic low-level message-handler signature: the handler is called with a
/// pointer to the message buffer and the size of that buffer in bytes.
using rmi_handlerT = void (*)(void* buf, size_t nbyte);

/// Signed pointer-difference type used to transmit handler addresses.
/// NOTE(review): presumably a function address expressed relative to some base,
/// which is why RMI::assert_aslr_off() exists -- confirm against worldrmi.cc.
using rel_fn_ptr_t = std::ptrdiff_t;
106
107 struct qmsg {
108 typedef uint16_t counterT; //!< counter for ordered messages
109 typedef uint32_t attrT; //!< attributes of the message; high 16 bits are the counter
110 size_t len;
112 int i; // buffer index
116
118 : len(len), func(func), i(i), src(src), attr(attr), count(count) {}
119
120 // N.B. since msg counters in same batch might wrap around 0, need to sort buckets defined by the 2 highest
121 // bits ... basically we want 11xxx < 00xxx < 01xxx < 10xxx < 11xxx ... assume we only have messages from
122 // at most 2 adjacent buckets, thus only when comparing counters from buckets 00 and 11 reverse the order
123 // P.S. we thus assume we won't have to deal with msg sequences > 2^14 (per rank)
124 friend inline bool operator<(const qmsg& a, const qmsg& b) {
125 const auto a_src = a.src;
126 const auto b_src = b.src;
127 if (a_src == b_src) {
128 const auto a_count_bk = a.count >> 14;
129 const auto b_count_bk = b.count >> 14;
130 if (a_count_bk == 0b00 && b_count_bk == 0b11) {
131 return false;
132 } else if (a_count_bk == 0b11 && b_count_bk == 0b00) {
133 return true;
134 } else {
135 return a.count < b.count;
136 }
137 } else {
138 return a_src < b_src;
139 }
140 }
141
142 qmsg() {}
143 }; // struct qmsg
144
145
146 // Holds message passing statistics
147 struct RMIStats {
148 uint64_t nmsg_sent;
149 uint64_t nbyte_sent;
150 uint64_t nmsg_recv;
151 uint64_t nbyte_recv;
153
156 };
157
/// This is for the RMI server thread, to manage the lifetime of the WorldAM messages that it is sending.
struct RMISendReq {
    /// Polls the outstanding send.
    /// @return true once the request is finished with; the owner may then discard (destroy) it
    virtual bool TestAndFree() = 0;

    /// ESSENTIAL!! Must be virtual: concrete requests are owned and destroyed
    /// through std::unique_ptr<RMISendReq>.
    virtual ~RMISendReq() = default;
};
163
164 /// This class implements the communications server thread and provides the only send interface
165 class RMI {
168
169 /// @return reference to the boolean variable indicating whether this thread is the server thread
170 static bool& is_server_thread_accessor();
171
172 public:
173
175
176 // Choose header length to hold at least sizeof(header) and
177 // also to ensure good alignment of the user payload.
178 static const size_t ALIGNMENT = 64;
179 static const size_t HEADER_LEN = ALIGNMENT;
180 static const attrT ATTR_UNORDERED=0x0;
181 static const attrT ATTR_ORDERED=0x1;
182
184
185 static void set_this_thread_is_server(bool flag = true) { is_server_thread_accessor() = flag;}
187
188 static std::list< std::unique_ptr<RMISendReq> > send_req; // List of outstanding world active messages sent by the server
189
190 private:
191
192 static void clear_send_req() {
193 //std::cout << "clearing server messages " << pthread_self() << std::endl;
194 stats.max_serv_send_q = std::max(stats.max_serv_send_q,uint64_t(send_req.size()));
195 auto it=send_req.begin();
196 while (it != send_req.end()) {
197 if ((*it)->TestAndFree())
198 it = send_req.erase(it);
199 else
200 ++it;
201 }
202 }
203
205#if HAVE_INTEL_TBB
206 : private madness::Mutex
207#else
208 : public madness::ThreadBase, private madness::Mutex
209#endif // HAVE_INTEL_TBB
210 {
211
212 public:
213
214 struct header {
217 }; // struct header
218
219 /// q of huge messages, each msg = {source,nbytes,tag}
220 std::list< std::tuple<int,size_t,int> > hugeq;
221
223 const int nproc; // No. of processes in comm world
224 const ProcessID rank; // Rank of this process
225 std::atomic<bool> finished; // True if finished ... atomic seems preferable to volatile
226 std::unique_ptr<counterT[]> send_counters; // used to be volatile but no need
227 std::unique_ptr<counterT[]> recv_counters;
228 std::size_t max_msg_len_;
229 std::size_t nrecv_;
231 std::size_t maxq_;
232 std::unique_ptr<void*[]> recv_buf; // Will be at least ALIGNMENT aligned ... +1 for huge messages
233 std::unique_ptr<SafeMPI::Request[]> recv_req;
234
235 std::unique_ptr<SafeMPI::Status[]> status;
236 std::unique_ptr<int[]> ind;
237 std::unique_ptr<qmsg[]> q;
239
240 static inline bool is_ordered(attrT attr) { return attr & ATTR_ORDERED; }
241
242 void process_some();
243
245 virtual ~RmiTask();
246
247 static void set_rmi_task_is_running(bool flag = true);
248
249#if HAVE_INTEL_TBB
250 void run() {
254
255 while (! finished) process_some();
256
259
260 finished = false; // to ensure that RmiTask::exit() that
261 // triggered the exit proceeds to completion
262 }
263#else
264 void run() {
267 try {
268 while (! finished) process_some();
269 finished = false;
270 } catch(...) {
271 delete this;
273 throw;
274 }
276 }
277#endif // HAVE_INTEL_TBB
278
279 void exit() {
280 if (debugging)
281 print_error(rank, ":RMI: sending exit request to server thread\n");
282
283 // Set finished flag
284 finished = true;
285 while(finished)
286 myusleep(1000);
287 }
288
289 static void huge_msg_handler(void *buf, size_t nbytein);
290
291 Request isend(const void* buf, size_t nbyte, ProcessID dest, rmi_handlerT func, attrT attr);
292
294
295 void post_recv_buf(int i);
296
297 private:
298
299 /// thread-safely round-robins through tags in [first_tag, first_tag+period) range
300 /// @returns new tag to be used in messaging
301 int unique_tag() const;
302 /// the period of tags returned by unique_tag()
303 /// @warning this bounds how many huge messages each RmiTask will be able to process
304 static constexpr int unique_tag_period() { return 2048; }
305
306 }; // class RmiTask
307
308
309 static std::unique_ptr<RmiTask> task_ptr; // Pointer to the singleton instance
311 static bool debugging; // True if debugging ... used to be volatile but no need
312
313 static const size_t DEFAULT_MAX_MSG_LEN = 3*512*1024; //!< the default size of recv buffers, in bytes; the actual size can be configured by the user via envvar MAD_BUFFER_SIZE
314 static const int DEFAULT_NRECV = 128; //!< the default # of recv buffers; the actual number can be configured by the user via envvar MAD_RECV_BUFFERS
315
316 // Not allowed
317 RMI(const RMI&);
318 RMI& operator=(const RMI&);
319
320 public:
321
322 /// Returns the size of recv buffers, in bytes
323
324 /// @return The size of recv buffers, in bytes
325 /// @note The default value is given by RMI::DEFAULT_MAX_MSG_LEN, can be overridden at runtime by the user via environment variable MAD_BUFFER_SIZE.
326 /// @warning Cannot be smaller than 1024 bytes.
327 static std::size_t max_msg_len() {
329 return task_ptr->max_msg_len_;
330 }
331 static std::size_t maxq() {
333 return task_ptr->maxq_;
334 }
335
336 /// Returns the number of recv buffers
337
338 /// @return The number of recv buffers
339 /// @note The default value is given by RMI::DEFAULT_NRECV, can be overridden at runtime by the user via environment variable MAD_RECV_BUFFERS
340 /// @warning Cannot be smaller than 32.
341 static std::size_t nrecv() {
343 return task_ptr->nrecv_;
344 }
345
346 /// Send a remote method invocation (again you should probably be looking at worldam.h instead)
347
348 /// @param[in] buf Pointer to the data buffer (do not modify until send is completed)
349 /// @param[in] nbyte Size of the data in bytes
350 /// @param[in] dest Process to receive the message
351 /// @param[in] func The function to handle the message on the remote end
352 /// @param[in] attr Attributes of the message (ATTR_UNORDERED or ATTR_ORDERED)
353 /// @return The status as an RMI::Request that presently is a SafeMPI::Request
354 static Request
355 isend(const void* buf, size_t nbyte, ProcessID dest, rmi_handlerT func, unsigned int attr=ATTR_UNORDERED) {
356 if(!task_ptr) {
358 "!! MADNESS RMI error: Attempting to send a message when the RMI thread is not running\n"
359 "!! MADNESS RMI error: This typically occurs when an active message is sent or a remote task is spawned after calling madness::finalize()\n");
360 MADNESS_EXCEPTION("!! MADNESS error: The RMI thread is not running", (task_ptr != nullptr));
361 }
362 return task_ptr->isend(buf, nbyte, dest, func, attr);
363 }
364
365 /// will complain to std::cerr and throw if ASLR is on by making
366 /// sure that address of this function matches across @p comm
367 /// @param[in] comm the communicator
368 static void assert_aslr_off(const SafeMPI::Intracomm& comm = SafeMPI::COMM_WORLD);
369
370 static void begin(const SafeMPI::Intracomm& comm = SafeMPI::COMM_WORLD);
371
372 static void end() {
373 if(task_ptr) {
374 task_ptr->exit();
375 //exit insures that RMI task is completed, therefore it is OK to delete it
376 task_ptr = nullptr;
377 }
378 }
379
380 static void set_debug(bool status) { debugging = status; }
381
382 static bool get_debug() { return debugging; }
383
384 static const RMIStats& get_stats() { return stats; }
385 }; // class RMI
386
387} // namespace madness
388
389#endif // MADNESS_WORLD_WORLDRMI_H__INCLUDED
Interface templates for the archives (serialization).
Wrapper around MPI_Comm. Has a shallow copy constructor; use Create(Get_group()) for deep copy.
Definition safempi.h:490
Definition safempi.h:289
Mutex using pthread mutex operations.
Definition worldmutex.h:131
Definition worldrmi.h:210
void post_pending_huge_msg()
Definition worldrmi.cc:180
const ProcessID rank
Definition worldrmi.h:224
std::size_t maxq_
Definition worldrmi.h:231
std::unique_ptr< counterT[]> recv_counters
Definition worldrmi.h:227
std::unique_ptr< void *[]> recv_buf
Definition worldrmi.h:232
std::unique_ptr< SafeMPI::Status[]> status
Definition worldrmi.h:235
std::atomic< bool > finished
Definition worldrmi.h:225
static bool is_ordered(attrT attr)
Definition worldrmi.h:240
int n_in_q
Definition worldrmi.h:238
void run()
Definition worldrmi.h:250
std::unique_ptr< counterT[]> send_counters
Definition worldrmi.h:226
std::unique_ptr< qmsg[]> q
Definition worldrmi.h:237
std::unique_ptr< int[]> ind
Definition worldrmi.h:236
static void huge_msg_handler(void *buf, size_t nbytein)
Definition worldrmi.cc:325
long nssend_
Definition worldrmi.h:230
static constexpr int unique_tag_period()
Definition worldrmi.h:304
void post_recv_buf(int i)
Definition worldrmi.cc:201
int unique_tag() const
Definition worldrmi.cc:488
std::size_t nrecv_
Definition worldrmi.h:229
Request isend(const void *buf, size_t nbyte, ProcessID dest, rmi_handlerT func, attrT attr)
SafeMPI::Intracomm comm
Definition worldrmi.h:222
std::unique_ptr< SafeMPI::Request[]> recv_req
Definition worldrmi.h:233
virtual ~RmiTask()
Definition worldrmi.cc:215
std::list< std::tuple< int, size_t, int > > hugeq
q of huge messages, each msg = {source,nbytes,tag}
Definition worldrmi.h:220
std::size_t max_msg_len_
Definition worldrmi.h:228
const int nproc
Definition worldrmi.h:223
void exit()
Definition worldrmi.h:279
void process_some()
Definition worldrmi.cc:61
static void set_rmi_task_is_running(bool flag=true)
Definition worldrmi.cc:411
This class implements the communications server thread and provides the only send interface.
Definition worldrmi.h:165
static const attrT ATTR_UNORDERED
Definition worldrmi.h:180
static void begin(const SafeMPI::Intracomm &comm=SafeMPI::COMM_WORLD)
Definition worldrmi.cc:374
static int testsome_backoff_us
Definition worldrmi.h:183
RMI(const RMI &)
qmsg::counterT counterT
Definition worldrmi.h:166
static bool & is_server_thread_accessor()
Definition worldrmi.cc:55
static void clear_send_req()
Definition worldrmi.h:192
static std::size_t nrecv()
Returns the number of recv buffers.
Definition worldrmi.h:341
static std::size_t maxq()
Definition worldrmi.h:331
static std::size_t max_msg_len()
Returns the size of recv buffers, in bytes.
Definition worldrmi.h:327
static bool debugging
Definition worldrmi.h:311
static bool get_this_thread_is_server()
Definition worldrmi.h:186
static void set_debug(bool status)
Definition worldrmi.h:380
static const size_t HEADER_LEN
Definition worldrmi.h:179
static const attrT ATTR_ORDERED
Definition worldrmi.h:181
static void assert_aslr_off(const SafeMPI::Intracomm &comm=SafeMPI::COMM_WORLD)
Definition worldrmi.cc:359
static std::list< std::unique_ptr< RMISendReq > > send_req
Definition worldrmi.h:188
static const size_t DEFAULT_MAX_MSG_LEN
the default size of recv buffers, in bytes; the actual size can be configured by the user via envvar ...
Definition worldrmi.h:313
static const int DEFAULT_NRECV
the default # of recv buffers; the actual number can be configured by the user via envvar MAD_RECV_BU...
Definition worldrmi.h:314
static bool get_debug()
Definition worldrmi.h:382
static void end()
Definition worldrmi.h:372
static void set_this_thread_is_server(bool flag=true)
Definition worldrmi.h:185
qmsg::attrT attrT
Definition worldrmi.h:167
static std::unique_ptr< RmiTask > task_ptr
Definition worldrmi.h:309
RMI & operator=(const RMI &)
static const size_t ALIGNMENT
Definition worldrmi.h:178
static const RMIStats & get_stats()
Definition worldrmi.h:384
static RMIStats stats
Definition worldrmi.h:310
SafeMPI::Request Request
Definition worldrmi.h:174
static Request isend(const void *buf, size_t nbyte, ProcessID dest, rmi_handlerT func, unsigned int attr=ATTR_UNORDERED)
Send a remote method invocation (again you should probably be looking at worldam.h instead)
Definition worldrmi.h:355
Simplified thread wrapper to hide pthread complexity.
Definition thread.h:164
void bind()
Definition thread.h:137
#define MADNESS_EXCEPTION(msg, value)
Macro for throwing a MADNESS exception.
Definition madness_exception.h:119
#define MADNESS_ASSERT(condition)
Assert a condition that should be free of side-effects since in release builds this might be a no-op.
Definition madness_exception.h:134
Intracomm COMM_WORLD
Definition safempi.cc:67
Namespace for all elements and tools of MADNESS.
Definition DFParameters.h:10
std::ptrdiff_t rel_fn_ptr_t
Definition worldrmi.h:105
ThreadBinder binder
Definition thread.cc:71
static void myusleep(unsigned int us)
Sleep or spin for specified number of microseconds.
Definition timers.h:164
std::shared_ptr< FunctionFunctorInterface< double, 3 > > func(new opT(g))
void(* rmi_handlerT)(void *buf, size_t nbyte)
This is the generic low-level interface for a message handler.
Definition worldrmi.h:104
void print_error(const T &t, const Ts &... ts)
Print items to std::cerr (items separated by spaces) and terminate with a new line.
Definition print.h:241
static const double b
Definition nonlinschro.cc:119
static const double a
Definition nonlinschro.cc:118
Defines simple templates for printing to std::cout "a la Python".
Serializes calls to MPI in case it does not support THREAD_MULTIPLE.
This for RMI server thread to manage lifetime of WorldAM messages that it is sending.
Definition worldrmi.h:159
virtual bool TestAndFree()=0
virtual ~RMISendReq()
Definition worldrmi.h:161
Definition worldrmi.h:147
uint64_t nbyte_recv
Definition worldrmi.h:151
uint64_t nbyte_sent
Definition worldrmi.h:149
uint64_t nmsg_recv
Definition worldrmi.h:150
RMIStats()
Definition worldrmi.h:154
uint64_t nmsg_sent
Definition worldrmi.h:148
uint64_t max_serv_send_q
Definition worldrmi.h:152
Definition worldrmi.h:214
attrT attr
Definition worldrmi.h:216
rel_fn_ptr_t func
Definition worldrmi.h:215
Definition worldrmi.h:107
rmi_handlerT func
Definition worldrmi.h:111
uint32_t attrT
attributes of the message; high 16 bits are the counter
Definition worldrmi.h:109
uint16_t counterT
counter for ordered messages
Definition worldrmi.h:108
friend bool operator<(const qmsg &a, const qmsg &b)
Definition worldrmi.h:124
qmsg()
Definition worldrmi.h:142
size_t len
Definition worldrmi.h:110
qmsg(size_t len, rmi_handlerT func, int i, int src, attrT attr, counterT count)
Definition worldrmi.h:117
attrT attr
Definition worldrmi.h:114
ProcessID src
Definition worldrmi.h:113
int i
Definition worldrmi.h:112
counterT count
Definition worldrmi.h:115
const char * status[2]
Definition testperiodic.cc:43
Implements Dqueue, Thread, ThreadBase and ThreadPool.
Defines types used by the parallel runtime.
int ProcessID
Used to clearly identify process number/rank.
Definition worldtypes.h:43