MADNESS 0.10.1
binsorter.h
Go to the documentation of this file.
1
2#ifndef MADNESS_WORLD_BINSORTER_H__INCLUDED
3#define MADNESS_WORLD_BINSORTER_H__INCLUDED
4
8#include <vector>
9
10namespace madness {
11
12 /// A parallel bin sort across MPI processes
13 template <typename T, typename inserterT>
14 class BinSorter : public WorldObject< BinSorter<T,inserterT> > {
16 inserterT inserter;
17 std::size_t bufsize;
18 std::vector<T>* bins;
19
20 void flush(int owner) {
21 if (bins[owner].size())
22 this->send(owner, &BinSorter<T,inserterT>::sorter, bins[owner]);
23 bins[owner].clear();
24 }
25
26 void sorter(const std::vector<T>& v) {
27 for(typename std::vector<T>::const_iterator it = v.begin(); it != v.end(); ++it) {
28 inserter(*it);
29 }
30 }
31
32 public:
33 /// Constructs the sorter object
34
35 /// @param[in] world The world object that must persist during the existence of this object
36 /// @param[in] inserter User provides this routine to process an item of data on remote end
37 /// @param[in] bufsize Size of bin (in units of T) ... default value is as large as possible
38 BinSorter(World& world, inserterT inserter, int bufsize=0)
40 , pworld(&world)
43 , bins(new std::vector<T>[world.size()])
44 {
45 // bufsize ... max from AM buffer size is about 512K/sizeof(T)
46 // bufsize ... max from total buffer use is about 1GB/sizeof(T)/P
47 if (bufsize <= 0) this->bufsize = std::min((RMI::max_msg_len()-1024)/sizeof(T),(1<<30)/(world.size()*sizeof(T)));
48
49 // for (int i=0; i<world.size(); i++) {
50 // bins[i].reserve(bufsize); // Not a good idea on large process counts unless truly all to all?
51 // }
52
53 //print("binsorter bufsize is", this->bufsize, this->bufsize*sizeof(T));
55 }
56
57 virtual ~BinSorter() {
58 for (int i=0; i<pworld->size(); i++) {
59 MADNESS_ASSERT(bins[i].size() == 0);
60 }
61 delete [] bins;
62 }
63
64 /// Invoke to complete the sort, flush all buffers, and ensure communication/processing is complete
65 void finish() {
66 for (int i=0; i<pworld->size(); i++) {
67 flush(i);
68 }
69 pworld->gop.fence();
70 }
71
72 /// Application calls this to add a value to the bin for process p
73 void insert(ProcessID p, const T& value) {
74 bins[p].push_back(value);
75
76 // More intelligent buffer management would look at total use and flush
77 // largest buffers rather than using a fixed buffersize per process
78 if (bins[p].size() >= bufsize) flush(p);
79 }
80 };
81}
82
83#endif // MADNESS_WORLD_BINSORTER_H__INCLUDED
This header should include pretty much everything needed for the parallel runtime.
A parallel bin sort across MPI processes.
Definition binsorter.h:14
virtual ~BinSorter()
Definition binsorter.h:57
void finish()
Invoke to complete the sort, flush all buffers, and ensure communication/processing is complete.
Definition binsorter.h:65
BinSorter(World &world, inserterT inserter, int bufsize=0)
Constructs the sorter object.
Definition binsorter.h:38
void insert(ProcessID p, const T &value)
Application calls this to add a value to the bin for process p.
Definition binsorter.h:73
void flush(int owner)
Definition binsorter.h:20
inserterT inserter
Definition binsorter.h:16
std::vector< T > * bins
Definition binsorter.h:18
std::size_t bufsize
Definition binsorter.h:17
World * pworld
Definition binsorter.h:15
void sorter(const std::vector< T > &v)
Definition binsorter.h:26
static std::size_t max_msg_len()
Returns the size of recv buffers, in bytes.
Definition worldrmi.h:327
void fence(bool debug=false)
Synchronizes all processes in communicator AND globally ensures no pending AM or tasks.
Definition worldgop.cc:161
Implements most parts of a globally addressable object (via unique ID).
Definition world_object.h:364
World & world
The World this object belongs to. (Think globally, act locally).
Definition world_object.h:381
void process_pending()
To be called from derived constructor to process pending messages.
Definition world_object.h:656
detail::task_result_type< memfnT >::futureT send(ProcessID dest, memfnT memfn) const
Definition world_object.h:731
A parallel world class.
Definition world.h:132
ProcessID size() const
Returns the number of processes in this World (same as MPI_Comm_size()).
Definition world.h:328
WorldGopInterface & gop
Global operations.
Definition world.h:205
char * p(char *buf, const char *name, int k, int initial_level, double thresh, int order)
Definition derivatives.cc:72
auto T(World &world, response_space &f) -> response_space
Definition global_functions.cc:34
static const double v
Definition hatom_sf_dirac.cc:20
Defines madness::MadnessException for exception handling.
#define MADNESS_ASSERT(condition)
Assert a condition that should be free of side-effects since in release builds this might be a no-op.
Definition madness_exception.h:134
Namespace for all elements and tools of MADNESS.
Definition DFParameters.h:10
Definition mraimpl.h:50
Defines and implements WorldObject.
int ProcessID
Used to clearly identify process number/rank.
Definition worldtypes.h:43