MADNESS  0.10.1
worldrmi.h
Go to the documentation of this file.
1 /*
2  This file is part of MADNESS.
3 
4  Copyright (C) 2007,2010 Oak Ridge National Laboratory
5 
6  This program is free software; you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation; either version 2 of the License, or
9  (at your option) any later version.
10 
11  This program is distributed in the hope that it will be useful,
12  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  GNU General Public License for more details.
15 
16  You should have received a copy of the GNU General Public License
17  along with this program; if not, write to the Free Software
18  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 
20  For more information please contact:
21 
22  Robert J. Harrison
23  Oak Ridge National Laboratory
24  One Bethel Valley Road
25  P.O. Box 2008, MS-6367
26 
27  email: harrisonrj@ornl.gov
28  tel: 865-241-3937
29  fax: 865-572-0680
30 */
31 
32 #ifndef MADNESS_WORLD_WORLDRMI_H__INCLUDED
33 #define MADNESS_WORLD_WORLDRMI_H__INCLUDED
34 
35 #include <madness/world/safempi.h>
36 #include <madness/world/thread.h>
38 #include <madness/world/archive.h>
39 #include <sstream>
40 #include <utility>
41 #include <list>
42 #include <memory>
43 #include <tuple>
44 #include <pthread.h>
45 #include <madness/world/print.h>
46 
47 /*
48  There is just one server thread and it is the only one
49  messing with the recv buffers, so there is no need for
50  mutex on recv related data.
51 
52  Multiple threads (including the server) may send hence
53  we need to be careful about send-related data.
54 
55  When MPI is initialized we need to use init_thread with
56  multiple required.
57 
58  This RMI service operates only in (a clone of) COMM_WORLD. It is easy enough
59  to extend to other communicators but the point is to have
60  only one server thread for all possible uses. You just
61  have to translate rank_in_comm into rank_in_world by
62  getting the groups from both communicators using
63  MPI_Comm_group and then creating a map from ranks in
64  comm to ranks in world using MPI_Group_translate_ranks.
65 
66  The class is a singleton ... i.e., there is only one instance of it
67  that is made the first time that you call RMI::instance().
68 
69  Handler routines should have this type
70 
71  typedef void (*rmi_handlerT)(void* buf, size_t nbyte);
72 
73  There are a few user-accessible routines.
74 
75  RMI::Request RMI::isend(const void* buf, size_t nbyte, int dest,
76  rmi_handlerT func, unsigned int attr=0)
77  - to send an asynchronous message
78  - RMI::Request has the same interface as SafeMPI::Request
79  (right now it is a SafeMPI::Request but this is not guaranteed)
80 
81  void RMI::begin()
82  - to start the server thread
83 
84  void RMI::end()
85  - to terminate the server thread
86 
87  bool RMI::get_debug()
88  - to get the debug flag
89 
90  void RMI::set_debug(bool)
91  - to set the debug flag
92 
93 */
94 
95 /**
96  \file worldrmi.h
97  \brief Lowest level API for sending active messages --- you should probably be looking at worldam.h instead.
98  \addtogroup parallel_runtime
99  */
100 
101 namespace madness {
102 
103  /// This is the generic low-level interface for a message handler
104  typedef void (*rmi_handlerT)(void* buf, size_t nbyte);
105  typedef std::ptrdiff_t rel_fn_ptr_t;
106 
107  struct qmsg {
     // Message record; RmiTask keeps an array of these in its member `q`.
     // NOTE(review): this listing elides lines 111, 113-115 and 117. Per the
     // cross-reference index at the end of this page they declare
     // `rmi_handlerT func`, `ProcessID src`, `attrT attr`, `counterT count`
     // and the signature
     // `qmsg(size_t len, rmi_handlerT func, int i, int src, attrT attr, counterT count)`.
108  typedef uint16_t counterT; //!< counter for ordered messages
109  typedef uint32_t attrT; //!< attributes of the message; high 16 bits are the counter
110  size_t len; // message length (presumably in bytes -- TODO confirm in worldrmi.cc)
112  int i; // buffer index
116 
118  : len(len), func(func), i(i), src(src), attr(attr), count(count) {} // member-wise copy of the constructor arguments
119 
120  // N.B. since msg counters in same batch might wrap around 0, need to sort buckets defined by the 2 highest
121  // bits ... basically we want 11xxx < 00xxx < 01xxx < 10xxx < 11xxx ... assume we only have messages from
122  // at most 2 adjacent buckets, thus only when comparing counters from buckets 00 and 11 reverse the order
123  // P.S. we thus assume we won't have to deal with msg sequences > 2^14 (per rank)
     // Ordering: first by source rank (src); within one source by counter, with the
     // wrap-around correction described above.
124  friend inline bool operator<(const qmsg& a, const qmsg& b) {
125  const auto a_src = a.src;
126  const auto b_src = b.src;
127  if (a_src == b_src) {
128  const auto a_count_bk = a.count >> 14; // top 2 bits of the 16-bit counter select one of 4 buckets
129  const auto b_count_bk = b.count >> 14;
130  if (a_count_bk == 0b00 && b_count_bk == 0b11) {
131  return false; // a has wrapped past 0 while b has not, so a is the later message
132  } else if (a_count_bk == 0b11 && b_count_bk == 0b00) {
133  return true; // mirror case: b has wrapped, so a is the earlier message
134  } else {
135  return a.count < b.count; // same or non-wrapping buckets: plain numeric order
136  }
137  } else {
138  return a_src < b_src; // different senders: order by source rank
139  }
140  }
141 
142  qmsg() {} // empty default ctor: no member initializers, so the visible members (len, i) are left uninitialized
143  }; // struct qmsg
144 
145 
146  // Holds message passing statistics
147  struct RMIStats {
     // NOTE(review): the meanings below are inferred from the member names only;
     // the code that updates them is not visible here -- confirm in worldrmi.cc.
148  uint64_t nmsg_sent; // presumably: number of messages sent
149  uint64_t nbyte_sent; // presumably: number of bytes sent
150  uint64_t nmsg_recv; // presumably: number of messages received
151  uint64_t nbyte_recv; // presumably: number of bytes received
152  uint64_t max_serv_send_q; // presumably: largest observed length of the server's send queue -- TODO confirm
153 
     // NOTE(review): lines 154-155 are elided in this listing; the cross-reference
     // index at the end of this page lists a constructor RMIStats() defined at line 154.
156  };
157 
158  /// This is for the RMI server thread to manage the lifetime of WorldAM messages that it is sending
159  struct RMISendReq {
     // Abstract handle for an outstanding send; RMI::send_req holds these by std::unique_ptr.
160  virtual bool TestAndFree() = 0; // RMI::clear_send_req() erases the entry from RMI::send_req when this returns true
161  virtual ~RMISendReq() {} // ESSENTIAL!! entries are owned as std::unique_ptr<RMISendReq>, i.e. destroyed through a base-class pointer
162  };
163 
164  /// This class implements the communications server thread and provides the only send interface
165  class RMI {
168 
169  /// @return reference to the boolean variable indicating whether this thread is the server thread
170  static bool& is_server_thread_accessor();
171 
172  public:
173 
175 
176  // Choose header length to hold at least sizeof(header) and
177  // also to ensure good alignment of the user payload.
178  static const size_t ALIGNMENT = 64;
179  static const size_t HEADER_LEN = ALIGNMENT;
180  static const attrT ATTR_UNORDERED=0x0;
181  static const attrT ATTR_ORDERED=0x1;
182 
184 
185  static void set_this_thread_is_server(bool flag = true) { is_server_thread_accessor() = flag;} // stores flag through the bool& returned by is_server_thread_accessor()
187 
188  static std::list< std::unique_ptr<RMISendReq> > send_req; // List of outstanding world active messages sent by the server
189 
190  private:
191 
192  static void clear_send_req() {
     // Walks send_req once and drops every request whose TestAndFree() returns true;
     // the remaining requests stay in the list, in their original order.
     // NOTE(review): line 194 is elided in this listing.
193  //std::cout << "clearing server messages " << pthread_self() << std::endl;
195  auto it=send_req.begin();
196  while (it != send_req.end()) {
197  if ((*it)->TestAndFree())
198  it = send_req.erase(it); // erase() yields the iterator following the removed element, so no extra ++it
199  else
200  ++it;
201  }
202  }
203 
204  class RmiTask
205 #if HAVE_INTEL_TBB
206  : private madness::Mutex
207 #else
208  : public madness::ThreadBase, private madness::Mutex
209 #endif // HAVE_INTEL_TBB
210  {
     // Server-side state and loop of the RMI service. With HAVE_INTEL_TBB the class
     // derives only (privately) from madness::Mutex; otherwise it additionally derives
     // publicly from madness::ThreadBase.
211 
212  public:
213 
214  struct header {
     // NOTE(review): lines 215-216 are elided in this listing; the cross-reference
     // index at the end of this page lists `rel_fn_ptr_t func` (215) and `attrT attr` (216).
217  }; // struct header
218 
219  /// q of huge messages, each msg = {source,nbytes,tag}
220  std::list< std::tuple<int,size_t,int> > hugeq;
221 
     // NOTE(review): line 222 is elided; the index lists `SafeMPI::Intracomm comm` there.
223  const int nproc; // No. of processes in comm world
224  const ProcessID rank; // Rank of this process
225  std::atomic<bool> finished; // True if finished ... atomic seems preferable to volatile; also used as a two-way handshake between exit() and run()
226  std::unique_ptr<counterT[]> send_counters; // used to be volatile but no need
227  std::unique_ptr<counterT[]> recv_counters;
228  std::size_t max_msg_len_; // returned by RMI::max_msg_len()
229  std::size_t nrecv_; // returned by RMI::nrecv()
230  long nssend_;
231  std::size_t maxq_; // returned by RMI::maxq()
232  std::unique_ptr<void*[]> recv_buf; // Will be at least ALIGNMENT aligned ... +1 for huge messages
233  std::unique_ptr<SafeMPI::Request[]> recv_req;
234 
235  std::unique_ptr<SafeMPI::Status[]> status;
236  std::unique_ptr<int[]> ind;
237  std::unique_ptr<qmsg[]> q;
238  int n_in_q; // presumably the number of valid entries in q -- TODO confirm in worldrmi.cc
239 
240  static inline bool is_ordered(attrT attr) { return attr & ATTR_ORDERED; } // true iff the ATTR_ORDERED bit (0x1) is set
241 
242  void process_some();
243 
     // NOTE(review): line 244 is elided; the index lists the constructor
     // `RmiTask(const SafeMPI::Intracomm &comm=SafeMPI::COMM_WORLD)`.
245  virtual ~RmiTask();
246 
247  static void set_rmi_task_is_running(bool flag = true);
248 
249 #if HAVE_INTEL_TBB
250  void run() {
     // NOTE(review): lines 251-253 and 256-258 are elided in this listing.
254 
255  while (! finished) process_some(); // serve until exit() raises the finished flag
256 
259 
260  finished = false; // to ensure that RmiTask::exit() that
261  // triggered the exit proceeds to completion
262  }
263 #else
264  void run() {
     // NOTE(review): lines 265-266, 272 and 275 are elided in this listing.
267  try {
268  while (! finished) process_some(); // serve until exit() raises the finished flag
269  finished = false; // releases the thread spinning in exit()
270  } catch(...) {
271  delete this; // NOTE(review): RMI::task_ptr is a std::unique_ptr<RmiTask>; if it owns this object a later reset would delete it again -- confirm
273  throw;
274  }
276  }
277 #endif // HAVE_INTEL_TBB
278 
279  void exit() {
     // Asks the server loop to stop and blocks until run() acknowledges by
     // resetting `finished` to false.
280  if (debugging)
281  print_error(rank, ":RMI: sending exit request to server thread\n");
282 
283  // Set finished flag
284  finished = true;
285  while(finished)
286  myusleep(1000); // poll at 1000 microsecond intervals
287  }
288 
289  static void huge_msg_handler(void *buf, size_t nbytein);
290 
291  Request isend(const void* buf, size_t nbyte, ProcessID dest, rmi_handlerT func, attrT attr);
292 
293  void post_pending_huge_msg();
294 
295  void post_recv_buf(int i);
296 
297  private:
298 
299  /// thread-safely round-robins through tags in [first_tag, first_tag+period) range
300  /// @returns new tag to be used in messaging
301  int unique_tag() const;
302  /// the period of tags returned by unique_tag()
303  /// @warning this bounds how many huge messages each RmiTask will be able to process
304  static constexpr int unique_tag_period() { return 2048; }
305 
306  }; // class RmiTask
307 
308 
309  static std::unique_ptr<RmiTask> task_ptr; // Pointer to the singleton instance
310  static RMIStats stats;
311  static bool debugging; // True if debugging ... used to be volatile but no need
312 
313  static const size_t DEFAULT_MAX_MSG_LEN = 3*512*1024; //!< the default size of recv buffers, in bytes; the actual size can be configured by the user via envvar MAD_BUFFER_SIZE
314  static const int DEFAULT_NRECV = 128; //!< the default # of recv buffers; the actual number can be configured by the user via envvar MAD_RECV_BUFFERS
315 
316  // Not allowed
317  RMI(const RMI&);
318  RMI& operator=(const RMI&);
319 
320  public:
321 
322  /// Returns the size of recv buffers, in bytes
323 
324  /// @return The size of recv buffers, in bytes
325  /// @note The default value is given by RMI::DEFAULT_MAX_MSG_LEN, can be overridden at runtime by the user via environment variable MAD_BUFFER_SIZE.
326  /// @warning Cannot be smaller than 1024 bytes.
327  static std::size_t max_msg_len() {
     // NOTE(review): line 328 is elided in this listing; task_ptr is dereferenced below,
     // so the elided line presumably checks it (MADNESS_ASSERT appears in this page's index) -- confirm.
329  return task_ptr->max_msg_len_;
330  }
331  static std::size_t maxq() {
     // Returns the maxq_ member of the RmiTask held by task_ptr.
     // NOTE(review): line 332 is elided in this listing; task_ptr is dereferenced below,
     // so the elided line presumably checks it -- confirm.
333  return task_ptr->maxq_;
334  }
335 
336  /// Returns the number of recv buffers
337 
338  /// @return The number of recv buffers
339  /// @note The default value is given by RMI::DEFAULT_NRECV, can be overridden at runtime by the user via environment variable MAD_RECV_BUFFERS
340  /// @warning Cannot be smaller than 32.
341  static std::size_t nrecv() {
     // NOTE(review): line 342 is elided in this listing; task_ptr is dereferenced below,
     // so the elided line presumably checks it -- confirm.
343  return task_ptr->nrecv_;
344  }
345 
346  /// Send a remote method invocation (again you should probably be looking at worldam.h instead)
347 
348  /// @param[in] buf Pointer to the data buffer (do not modify until send is completed)
349  /// @param[in] nbyte Size of the data in bytes
350  /// @param[in] dest Process to receive the message
351  /// @param[in] func The function to handle the message on the remote end
352  /// @param[in] attr Attributes of the message (ATTR_UNORDERED or ATTR_ORDERED)
353  /// @return The status as an RMI::Request that presently is a SafeMPI::Request
354  static Request
355  isend(const void* buf, size_t nbyte, ProcessID dest, rmi_handlerT func, unsigned int attr=ATTR_UNORDERED) {
     // Forwards to RmiTask::isend() on the task held by task_ptr. If task_ptr is null,
     // prints a diagnostic via print_error() and then invokes MADNESS_EXCEPTION
     // (documented in this page's index as the macro for throwing a MADNESS exception).
356  if(!task_ptr) {
357  print_error(
358  "!! MADNESS RMI error: Attempting to send a message when the RMI thread is not running\n"
359  "!! MADNESS RMI error: This typically occurs when an active message is sent or a remote task is spawned after calling madness::finalize()\n");
360  MADNESS_EXCEPTION("!! MADNESS error: The RMI thread is not running", (task_ptr != nullptr));
361  }
362  return task_ptr->isend(buf, nbyte, dest, func, attr);
363  }
364 
365  /// will complain to std::cerr and throw if ASLR is on by making
366  /// sure that address of this function matches across @p comm
367  /// @param[in] comm the communicator
368  static void assert_aslr_off(const SafeMPI::Intracomm& comm = SafeMPI::COMM_WORLD);
369 
370  static void begin(const SafeMPI::Intracomm& comm = SafeMPI::COMM_WORLD);
371 
372  static void end() {
     // Stops the server task, if there is one, and releases it. Does nothing when
     // task_ptr is already null.
373  if(task_ptr) {
374  task_ptr->exit(); // blocks until the server loop acknowledges the exit request
375  // exit() ensures that the RMI task has completed, therefore it is OK to delete it
376  task_ptr = nullptr; // destroys the RmiTask owned by the unique_ptr
377  }
378  }
379 
380  static void set_debug(bool status) { debugging = status; } // sets the static `debugging` flag (read by RmiTask::exit() before printing)
381 
382  static bool get_debug() { return debugging; } // returns the current value of the static `debugging` flag
383 
384  static const RMIStats& get_stats() { return stats; } // read-only reference to the static `stats` object (no copy is made)
385  }; // class RMI
386 
387 } // namespace madness
388 
389 #endif // MADNESS_WORLD_WORLDRMI_H__INCLUDED
Interface templates for the archives (serialization).
Wrapper around MPI_Comm. Has a shallow copy constructor; use Create(Get_group()) for deep copy.
Definition: safempi.h:490
Definition: safempi.h:289
Mutex using pthread mutex operations.
Definition: worldmutex.h:131
Definition: worldrmi.h:210
void post_pending_huge_msg()
Definition: worldrmi.cc:180
RmiTask(const SafeMPI::Intracomm &comm=SafeMPI::COMM_WORLD)
Definition: worldrmi.cc:227
const ProcessID rank
Definition: worldrmi.h:224
std::size_t maxq_
Definition: worldrmi.h:231
std::unique_ptr< counterT[]> recv_counters
Definition: worldrmi.h:227
std::unique_ptr< void *[]> recv_buf
Definition: worldrmi.h:232
std::unique_ptr< SafeMPI::Status[]> status
Definition: worldrmi.h:235
std::atomic< bool > finished
Definition: worldrmi.h:225
static bool is_ordered(attrT attr)
Definition: worldrmi.h:240
int n_in_q
Definition: worldrmi.h:238
void run()
Definition: worldrmi.h:250
std::unique_ptr< counterT[]> send_counters
Definition: worldrmi.h:226
std::unique_ptr< qmsg[]> q
Definition: worldrmi.h:237
std::unique_ptr< int[]> ind
Definition: worldrmi.h:236
static void huge_msg_handler(void *buf, size_t nbytein)
Definition: worldrmi.cc:325
long nssend_
Definition: worldrmi.h:230
static constexpr int unique_tag_period()
Definition: worldrmi.h:304
void post_recv_buf(int i)
Definition: worldrmi.cc:201
int unique_tag() const
Definition: worldrmi.cc:488
std::size_t nrecv_
Definition: worldrmi.h:229
Request isend(const void *buf, size_t nbyte, ProcessID dest, rmi_handlerT func, attrT attr)
SafeMPI::Intracomm comm
Definition: worldrmi.h:222
std::unique_ptr< SafeMPI::Request[]> recv_req
Definition: worldrmi.h:233
virtual ~RmiTask()
Definition: worldrmi.cc:215
std::list< std::tuple< int, size_t, int > > hugeq
q of huge messages, each msg = {source,nbytes,tag}
Definition: worldrmi.h:220
std::size_t max_msg_len_
Definition: worldrmi.h:228
const int nproc
Definition: worldrmi.h:223
void exit()
Definition: worldrmi.h:279
void process_some()
Definition: worldrmi.cc:61
static void set_rmi_task_is_running(bool flag=true)
Definition: worldrmi.cc:411
This class implements the communications server thread and provides the only send interface.
Definition: worldrmi.h:165
static const attrT ATTR_UNORDERED
Definition: worldrmi.h:180
static void begin(const SafeMPI::Intracomm &comm=SafeMPI::COMM_WORLD)
Definition: worldrmi.cc:374
static int testsome_backoff_us
Definition: worldrmi.h:183
RMI(const RMI &)
qmsg::counterT counterT
Definition: worldrmi.h:166
static bool & is_server_thread_accessor()
Definition: worldrmi.cc:55
static void clear_send_req()
Definition: worldrmi.h:192
static const RMIStats & get_stats()
Definition: worldrmi.h:384
static std::size_t nrecv()
Returns the number of recv buffers.
Definition: worldrmi.h:341
static std::size_t maxq()
Definition: worldrmi.h:331
static std::size_t max_msg_len()
Returns the size of recv buffers, in bytes.
Definition: worldrmi.h:327
static bool debugging
Definition: worldrmi.h:311
static bool get_this_thread_is_server()
Definition: worldrmi.h:186
static void set_debug(bool status)
Definition: worldrmi.h:380
static const size_t HEADER_LEN
Definition: worldrmi.h:179
static const attrT ATTR_ORDERED
Definition: worldrmi.h:181
static void assert_aslr_off(const SafeMPI::Intracomm &comm=SafeMPI::COMM_WORLD)
Definition: worldrmi.cc:359
RMI & operator=(const RMI &)
static std::list< std::unique_ptr< RMISendReq > > send_req
Definition: worldrmi.h:188
static const size_t DEFAULT_MAX_MSG_LEN
the default size of recv buffers, in bytes; the actual size can be configured by the user via envvar ...
Definition: worldrmi.h:313
static const int DEFAULT_NRECV
the default # of recv buffers; the actual number can be configured by the user via envvar MAD_RECV_BU...
Definition: worldrmi.h:314
static bool get_debug()
Definition: worldrmi.h:382
static void end()
Definition: worldrmi.h:372
static void set_this_thread_is_server(bool flag=true)
Definition: worldrmi.h:185
qmsg::attrT attrT
Definition: worldrmi.h:167
static std::unique_ptr< RmiTask > task_ptr
Definition: worldrmi.h:309
static const size_t ALIGNMENT
Definition: worldrmi.h:178
static RMIStats stats
Definition: worldrmi.h:310
SafeMPI::Request Request
Definition: worldrmi.h:174
static Request isend(const void *buf, size_t nbyte, ProcessID dest, rmi_handlerT func, unsigned int attr=ATTR_UNORDERED)
Send a remote method invocation (again you should probably be looking at worldam.h instead)
Definition: worldrmi.h:355
Simplified thread wrapper to hide pthread complexity.
Definition: thread.h:164
void bind()
Definition: thread.h:137
#define max(a, b)
Definition: lda.h:51
#define MADNESS_EXCEPTION(msg, value)
Macro for throwing a MADNESS exception.
Definition: madness_exception.h:119
#define MADNESS_ASSERT(condition)
Assert a condition that should be free of side-effects since in release builds this might be a no-op.
Definition: madness_exception.h:134
Intracomm COMM_WORLD
Definition: safempi.cc:67
File holds all helper structures necessary for the CC_Operator and CC2 class.
Definition: DFParameters.h:10
std::ptrdiff_t rel_fn_ptr_t
Definition: worldrmi.h:105
ThreadBinder binder
Definition: thread.cc:71
static void myusleep(unsigned int us)
Sleep or spin for specified number of microseconds.
Definition: timers.h:164
void(* rmi_handlerT)(void *buf, size_t nbyte)
This is the generic low-level interface for a message handler.
Definition: worldrmi.h:104
std::shared_ptr< FunctionFunctorInterface< double, 3 > > func(new opT(g))
void print_error(const T &t, const Ts &... ts)
Print items to std::cerr (items separated by spaces) and terminate with a new line.
Definition: print.h:241
static const double b
Definition: nonlinschro.cc:119
static const double a
Definition: nonlinschro.cc:118
Defines simple templates for printing to std::cout "a la Python".
Serializes calls to MPI in case it does not support THREAD_MULTIPLE.
This for RMI server thread to manage lifetime of WorldAM messages that it is sending.
Definition: worldrmi.h:159
virtual bool TestAndFree()=0
virtual ~RMISendReq()
Definition: worldrmi.h:161
Definition: worldrmi.h:147
uint64_t nbyte_recv
Definition: worldrmi.h:151
uint64_t nbyte_sent
Definition: worldrmi.h:149
uint64_t nmsg_recv
Definition: worldrmi.h:150
RMIStats()
Definition: worldrmi.h:154
uint64_t nmsg_sent
Definition: worldrmi.h:148
uint64_t max_serv_send_q
Definition: worldrmi.h:152
Definition: worldrmi.h:214
attrT attr
Definition: worldrmi.h:216
rel_fn_ptr_t func
Definition: worldrmi.h:215
Definition: worldrmi.h:107
rmi_handlerT func
Definition: worldrmi.h:111
uint32_t attrT
attributes of the message; high 16 bits are the counter
Definition: worldrmi.h:109
uint16_t counterT
counter for ordered messages
Definition: worldrmi.h:108
friend bool operator<(const qmsg &a, const qmsg &b)
Definition: worldrmi.h:124
qmsg()
Definition: worldrmi.h:142
size_t len
Definition: worldrmi.h:110
qmsg(size_t len, rmi_handlerT func, int i, int src, attrT attr, counterT count)
Definition: worldrmi.h:117
attrT attr
Definition: worldrmi.h:114
ProcessID src
Definition: worldrmi.h:113
int i
Definition: worldrmi.h:112
counterT count
Definition: worldrmi.h:115
const char * status[2]
Definition: testperiodic.cc:43
Implements Dqueue, Thread, ThreadBase and ThreadPool.
Defines types used by the parallel runtime.
int ProcessID
Used to clearly identify process number/rank.
Definition: worldtypes.h:43