MADNESS  0.10.1
binsorter.h
Go to the documentation of this file.
1 
2 #ifndef MADNESS_WORLD_BINSORTER_H__INCLUDED
3 #define MADNESS_WORLD_BINSORTER_H__INCLUDED
4 
8 #include <vector>
9 
10 namespace madness {
11 
12  /// A parallel bin sort across MPI processes
13  template <typename T, typename inserterT>
14  class BinSorter : public WorldObject< BinSorter<T,inserterT> > {
16  inserterT inserter;
17  std::size_t bufsize;
18  std::vector<T>* bins;
19 
20  void flush(int owner) {
21  if (bins[owner].size())
22  this->send(owner, &BinSorter<T,inserterT>::sorter, bins[owner]);
23  bins[owner].clear();
24  }
25 
26  void sorter(const std::vector<T>& v) {
27  for(typename std::vector<T>::const_iterator it = v.begin(); it != v.end(); ++it) {
28  inserter(*it);
29  }
30  }
31 
32  public:
33  /// Constructs the sorter object
34 
35  /// @param[in] world The world object that must persist during the existence of this object
36  /// @param[in] inserter User provides this routine to process an item of data on remote end
37  /// @param[in] bufsize Size of bin (in units of T) ... default value is as large as possible
38  BinSorter(World& world, inserterT inserter, int bufsize=0)
40  , pworld(&world)
42  , bufsize(bufsize)
43  , bins(new std::vector<T>[world.size()])
44  {
45  // bufsize ... max from AM buffer size is about 512K/sizeof(T)
46  // bufsize ... max from total buffer use is about 1GB/sizeof(T)/P
47  if (bufsize <= 0) this->bufsize = std::min((RMI::max_msg_len()-1024)/sizeof(T),(1<<30)/(world.size()*sizeof(T)));
48 
49  // for (int i=0; i<world.size(); i++) {
50  // bins[i].reserve(bufsize); // Not a good idea on large process counts unless truly all to all?
51  // }
52 
53  //print("binsorter bufsize is", this->bufsize, this->bufsize*sizeof(T));
55  }
56 
57  virtual ~BinSorter() {
58  for (int i=0; i<pworld->size(); i++) {
59  MADNESS_ASSERT(bins[i].size() == 0);
60  }
61  delete [] bins;
62  }
63 
64  /// Invoke to complete the sort, flush all buffers, and ensure communication/processing is complete
65  void finish() {
66  for (int i=0; i<pworld->size(); i++) {
67  flush(i);
68  }
69  pworld->gop.fence();
70  }
71 
72  /// Application calls this to add a value to the bin for process p
73  void insert(ProcessID p, const T& value) {
74  bins[p].push_back(value);
75 
76  // More intelligent buffer management would look at total use and flush
77  // largest buffers rather than using a fixed buffersize per process
78  if (bins[p].size() >= bufsize) flush(p);
79  }
80  };
81 }
82 
83 #endif // MADNESS_WORLD_BINSORTER_H__INCLUDED
This header should include pretty much everything needed for the parallel runtime.
A parallel bin sort across MPI processes.
Definition: binsorter.h:14
virtual ~BinSorter()
Definition: binsorter.h:57
void finish()
Invoke to complete the sort, flush all buffers, and ensure communication/processing is complete.
Definition: binsorter.h:65
BinSorter(World &world, inserterT inserter, int bufsize=0)
Constructs the sorter object.
Definition: binsorter.h:38
void insert(ProcessID p, const T &value)
Application calls this to add a value to the bin for process p.
Definition: binsorter.h:73
void flush(int owner)
Definition: binsorter.h:20
inserterT inserter
Definition: binsorter.h:16
std::vector< T > * bins
Definition: binsorter.h:18
std::size_t bufsize
Definition: binsorter.h:17
World * pworld
Definition: binsorter.h:15
void sorter(const std::vector< T > &v)
Definition: binsorter.h:26
static std::size_t max_msg_len()
Returns the size of recv buffers, in bytes.
Definition: worldrmi.h:327
void fence(bool debug=false)
Synchronizes all processes in communicator AND globally ensures no pending AM or tasks.
Definition: worldgop.cc:161
Implements most parts of a globally addressable object (via unique ID).
Definition: world_object.h:364
detail::task_result_type< memfnT >::futureT send(ProcessID dest, memfnT memfn) const
Definition: world_object.h:731
World & world
The World this object belongs to. (Think globally, act locally).
Definition: world_object.h:381
void process_pending()
To be called from derived constructor to process pending messages.
Definition: world_object.h:656
A parallel world class.
Definition: world.h:132
ProcessID size() const
Returns the number of processes in this World (same as MPI_Comm_size()).
Definition: world.h:328
WorldGopInterface & gop
Global operations.
Definition: world.h:205
char * p(char *buf, const char *name, int k, int initial_level, double thresh, int order)
Definition: derivatives.cc:72
auto T(World &world, response_space &f) -> response_space
Definition: global_functions.cc:34
static const double v
Definition: hatom_sf_dirac.cc:20
Defines madness::MadnessException for exception handling.
#define MADNESS_ASSERT(condition)
Assert a condition that should be free of side-effects since in release builds this might be a no-op.
Definition: madness_exception.h:134
File holds all helper structures necessary for the CC_Operator and CC2 class.
Definition: DFParameters.h:10
Definition: mraimpl.h:50
Defines and implements WorldObject.
int ProcessID
Used to clearly identify process number/rank.
Definition: worldtypes.h:43