MADNESS  0.10.1
systolic.h
#ifndef MADNESS_SYSTOLIC_H
#define MADNESS_SYSTOLIC_H

/*
  This file is part of MADNESS.

  Copyright (C) 2007,2010 Oak Ridge National Laboratory

  This program is free software; you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation; either version 2 of the License, or
  (at your option) any later version.

  This program is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with this program; if not, write to the Free Software
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

  For more information please contact:

  Robert J. Harrison
  Oak Ridge National Laboratory
  One Bethel Valley Road
  P.O. Box 2008, MS-6367

  email: harrisonrj@ornl.gov
  tel:   865-241-3937
  fax:   865-572-0680
*/

#include <madness/world/MADworld.h>
#include <utility>
#include <madness/tensor/tensor.h>
#include <madness/tensor/distributed_matrix.h>

#ifdef HAVE_INTEL_TBB
# include <tbb/parallel_for.h>
#endif

namespace madness {

    /// Base class for parallel algorithms that employ a systolic loop to generate all row pairs in parallel
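    ///
    /// A subclass supplies the pairwise operation and the convergence test.
    /// The sketch below is illustrative only (the class name \c PairwiseOrthogonalize
    /// and its kernel are hypothetical, not part of MADNESS); it assumes a real,
    /// column-distributed matrix as described for the constructor.
    /// \code
    /// template <typename T>
    /// class PairwiseOrthogonalize : public SystolicMatrixAlgorithm<T> {
    /// public:
    ///     PairwiseOrthogonalize(DistributedMatrix<T>& A, int tag)
    ///         : SystolicMatrixAlgorithm<T>(A, tag) {}
    ///
    ///     // Remove the component of row j along row i; each pair (i,j) is
    ///     // visited exactly once per sweep.
    ///     void kernel(int i, int j, T* rowi, T* rowj) {
    ///         T sij = 0, sii = 0;
    ///         for (int64_t k=0; k<this->get_rowdim(); ++k) {
    ///             sij += rowi[k]*rowj[k];
    ///             sii += rowi[k]*rowi[k];
    ///         }
    ///         if (sii != 0)
    ///             for (int64_t k=0; k<this->get_rowdim(); ++k) rowj[k] -= (sij/sii)*rowi[k];
    ///     }
    ///
    ///     // A real algorithm would test a convergence criterion here;
    ///     // returning true stops after a single sweep.
    ///     bool converged(const TaskThreadEnv& env) const { return true; }
    /// };
    /// \endcode
    /// The task is run collectively, e.g. (assumed submission path) by every process
    /// adding it to the task queue via \c world.taskq.add(...), or with
    /// solve_sequential() for debugging.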
    template <typename T>
    class SystolicMatrixAlgorithm : public TaskInterface {
    private:
        DistributedMatrix<T>& A;    ///< The column-distributed matrix being processed in place
        const int64_t nproc;        ///< No. of processes with rows of the matrix (not size of world)
        const int64_t coldim;       ///< A(coldim,rowdim)
        const int64_t rowdim;       ///< A(coldim,rowdim)
        const int64_t nlocal;       ///< No. of local pairs
        const ProcessID rank;       ///< Rank of current process
        const int tag;              ///< MPI tag to be used for messages
        std::vector<T*> iptr, jptr; ///< Indirection for implementing cyclic buffer !! SHOULD BE VOLATILE ?????
        std::vector<int64_t> map;   ///< Used to keep track of actual row indices

#ifdef HAVE_INTEL_TBB
        void iteration(const int nthread) {

            // Parallel initialization hook
            tbb::parallel_for(0, nthread, [=](const int id) {
                this->start_iteration_hook(TaskThreadEnv(nthread, id));
            });

            if (nlocal > 0) {
                // int64_t ilo, ihi;
                // A.local_colrange(ilo, ihi);

                const int neven = coldim + (coldim&0x1);
                const int pairlo = rank*A.coltile()/2;

                for (int loop=0; loop<(neven-1); ++loop) {

                    // This loop is parallelized over threads
                    tbb::parallel_for(0, nthread,
                        [this,neven,pairlo,nthread,loop](const int id) {
                            for (int pair=id; pair<nlocal; pair+=nthread) {

                                int rp = neven/2-1-(pair+pairlo);
                                int iii = (rp+loop)%(neven-1);
                                int jjj = (2*neven-2-rp+loop)%(neven-1);
                                if (rp == 0) jjj = neven-1;

                                iii = map[iii];
                                jjj = map[jjj];

                                if (jptr[pair]) {
                                    this->kernel(iii, jjj, iptr[pair], jptr[pair]);
                                }
                            }
                        });

                    cycle();

                }
            }

            // Parallel finalization hook
            tbb::parallel_for(0, nthread, [=](const int id) {
                this->end_iteration_hook(TaskThreadEnv(nthread, id));
            });

        }
#else
        void iteration(const TaskThreadEnv& env) {

            env.barrier();
            start_iteration_hook(env);
            env.barrier();

            if (nlocal > 0) {
                int64_t ilo, ihi;
                A.local_colrange(ilo, ihi);

                int neven = coldim + (coldim&0x1);

                int pairlo = rank*A.coltile()/2;

                int threadid = env.id();
                int nthread = env.nthread();

                for (int loop=0; loop<(neven-1); ++loop) {

                    // This loop is parallelized over threads
                    for (int pair=env.id(); pair<nlocal; pair+=nthread) {

                        int rp = neven/2-1-(pair+pairlo);
                        int iii = (rp+loop)%(neven-1);
                        int jjj = (2*neven-2-rp+loop)%(neven-1);
                        if (rp == 0) jjj = neven-1;

                        iii = map[iii];
                        jjj = map[jjj];

                        if (jptr[pair]) {
                            kernel(iii, jjj, iptr[pair], jptr[pair]);
                        }
                    }
                    env.barrier();

                    if (threadid == 0) cycle();

                    env.barrier();
                }
            }

            end_iteration_hook(env);

            env.barrier();
        }
#endif // HAVE_INTEL_TBB

        /// Call this after iterating to restore correct order of rows in original matrix

        /// At the end of each iteration the matrix rows are logically back in
        /// their correct order. However, due to indirection to reduce data motion,
        /// if the local column dimension is not a factor of the number of cycles
        /// the underlying data may be in a different order. This restores sanity.
        ///
        /// Only one thread should invoke this routine.
        void unshuffle() {
            if (nlocal <= 0) return;
            Tensor<T>& t = A.data();
            Tensor<T> tmp(2L, t.dims(), false);
            T* tp = tmp.ptr();
            for (int64_t i=0; i<nlocal; ++i) {
                memcpy(tp+i*rowdim, iptr[i], rowdim*sizeof(T));
                if (jptr[i]) {
                    memcpy(tp+(i+nlocal)*rowdim, jptr[i], rowdim*sizeof(T));
                }
                iptr[i] = &t(i,0);
                jptr[i] = &t(i+nlocal,0);
            }
            memcpy(t.ptr(), tmp.ptr(), t.size()*sizeof(T));

            if (rank==(nproc-1) && (coldim&0x1)) jptr[nlocal-1] = 0;
        }

        /// Cycles data around the loop ... only one thread should invoke this
        void cycle() {
            if (coldim <= 2) return; // No cycling necessary
            if (nlocal <= 0) { // Nothing local
                return;
            }

            // Check assumption that the tiling puts the incomplete tile at the end
            MADNESS_ASSERT(A.local_coldim() == A.coltile() || rank == (nproc-1));

            const ProcessID left = rank-1;  // Invalid values are not used
            const ProcessID right = rank+1;

            /*
              Consider matrix (10,*) distributed with coltile=4 over
              three processors.

              .   0 1 2 3 4 5 6 7 8 9

              This is divided up as follows into this initial
              configuration for the loop

              .        P=0          P=1         P=2
              .               msg           msg
              .  i -->0-->1  -->  4-->5  -->  8  -->
              .       ^                           |  msg
              .       |                 <---------
              .  j <--2<--3  <--  6<--7  <--|  9
              .               msg           msg

              The first and last processes in the loop have to wrap ... others
              just pass left and right.  Note that 9 stays put.

              Note that the algorithm assumes the distribution puts an equal
              amount of data on all nodes except the last.

              The i data is considered as flowing to the right.
              The j data is considered as flowing to the left.


              Hence, we should explore the pairs in this order
              (n-1 sets of n/2 pairs)

              . P=0  P=1  P=2
              . 0 1  4 5  8
              . 2 3  6 7  9

              . 2 0  1 4  5
              . 3 6  7 8  9

              . 3 2  0 1  4
              . 6 7  8 5  9

              . 6 3  2 0  1
              . 7 8  5 4  9

              . 7 6  3 2  0
              . 8 5  4 1  9

              . 8 7  6 3  2
              . 5 4  1 0  9

              . 5 8  7 6  3
              . 4 1  0 2  9

              . 4 5  8 7  6
              . 1 0  2 3  9

              . 1 4  5 8  7
              . 0 2  3 6  9
            */

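            /* Illustrative note (added for exposition): the schedule above is the
               standard round-robin ("circle method") ordering that iteration()
               generates with the index arithmetic
                   rp  = neven/2 - 1 - (pair + pairlo)
                   iii = (rp + loop) % (neven - 1)
                   jjj = (2*neven - 2 - rp + loop) % (neven - 1),  jjj = neven - 1 if rp == 0
               For example, with neven = 10 and loop = 0, pairs 0..4 give the logical
               pairings (4,5), (3,6), (2,7), (1,8), (0,9) before the 'map' indirection
               translates logical slots into the actual row indices shown above. */
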
            // Copy end elements before they are overwritten
            T* ilast  = iptr[nlocal-1];
            T* jfirst = jptr[0];

            // Cycle local pointers
            for (int64_t i=0; i<nlocal-1; ++i) {
                iptr[nlocal-i-1] = iptr[nlocal-i-2];
                jptr[i] = jptr[i+1];
            }

            World& world = A.get_world();

            if (nproc == 1) {
                iptr[0] = jfirst;
                jptr[nlocal-2] = ilast;
            }
            else if (rank == 0) {
                iptr[0] = jfirst;
                world.mpi.Send(ilast, rowdim, right, tag);
                jptr[nlocal-1] = ilast;
                world.mpi.Recv(ilast, rowdim, right, tag);
            }
            else if (rank == (nproc-1)) {
                if (nlocal > 1) {
                    iptr[0] = jfirst;
                    jptr[nlocal-2] = ilast;
                }
                std::vector<T> buf(rowdim);
                SafeMPI::Request req = world.mpi.Irecv(&buf[0], rowdim, left, tag);
                world.mpi.Send(iptr[0], rowdim, left, tag);
                world.await(req,false);
                std::memcpy(iptr[0], &buf[0], rowdim*sizeof(T));
            }
            else {
                std::vector<T> buf1(rowdim);
                std::vector<T> buf2(rowdim);
                SafeMPI::Request req1 = world.mpi.Irecv(&buf1[0], rowdim, left, tag);
                SafeMPI::Request req2 = world.mpi.Irecv(&buf2[0], rowdim, right, tag);
                world.mpi.Send( ilast, rowdim, right, tag);
                world.mpi.Send(jfirst, rowdim, left, tag);
                world.await(req1,false);
                world.await(req2,false);
                std::memcpy( ilast, &buf2[0], rowdim*sizeof(T)); // world.mpi.Recv( ilast, rowdim, right, tag);
                std::memcpy(jfirst, &buf1[0], rowdim*sizeof(T)); // world.mpi.Recv(jfirst, rowdim, left, tag);

                iptr[0] = jfirst;
                jptr[nlocal-1] = ilast;
            }
        }

        /// Get the task id

        /// \param[out] id The id to set for this task
        virtual void get_id(std::pair<void*,unsigned short>& id) const {
            PoolTaskInterface::make_id(id, *this);
        }

    public:
        /// A must be a column-distributed matrix with an even column tile >= 2

        /// It is assumed that this is invoked by the main thread.
        /// @param[in,out] A The matrix on which the algorithm is performed and modified in-place
        /// @param[in] tag The MPI tag used for communication (obtain from \c world.mpi.comm().unique_tag() )
        /// @param[in] nthread The number of local threads to use (default is the main thread plus all threads in the pool)
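        ///
        /// For orientation, a hedged setup sketch (the factory name
        /// \c column_distributed_matrix is assumed to be the usual
        /// distributed_matrix.h helper; the sizes \c n and \c m are placeholders):
        /// \code
        /// DistributedMatrix<double> A = column_distributed_matrix<double>(world, n, m);
        /// int tag = world.mpi.comm().unique_tag();
        /// // coltile must be even for this algorithm (see the assertion in the constructor);
        /// // construct a concrete subclass with (A, tag) and submit it collectively.
        /// \endcode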
        SystolicMatrixAlgorithm(DistributedMatrix<T>& A, int tag, int nthread=ThreadPool::size()+1)
            : A(A)
            , nproc(A.process_coldim()*A.process_rowdim())
            , coldim(A.coldim())
            , rowdim(A.rowdim())
            , nlocal((A.local_coldim()+1)/2)
            , rank(A.get_world().rank())
            , tag(tag)
            , iptr(nlocal)
            , jptr(nlocal)
            , map(coldim+(coldim&0x1))
        {
            TaskInterface::set_nthread(nthread);

            MADNESS_ASSERT(A.is_column_distributed() && (nproc==1 || (A.coltile()&0x1)==0));

            // Initialize vectors of pointers to matrix rows
            Tensor<T>& t = A.data();

            //madness::print(nproc, coldim, rowdim, nlocal, rank, tag);

            for (int64_t i=0; i<nlocal; ++i) {
                iptr[i] = &t(i,0);
                jptr[i] = &t(i+nlocal,0);
            }

            // If no. of rows is odd, last process should have an empty last row
            if (rank==(nproc-1) && (coldim&0x1)) jptr[nlocal-1] = 0;

            // Initialize map from logical index order to actual index order

            int neven = (coldim+1)/2;
            int ii=0;
            for (ProcessID p=0; p<nproc; ++p) {
                int64_t lo, hi;
                A.get_colrange(p, lo, hi);
                int p_nlocal = (hi - lo + 2)/2;
                //print("I think process",p,"has",lo,hi,p_nlocal);
                for (int i=0; i<p_nlocal; ++i) {
                    map[ii+i] = lo+i;
                    //map[coldim-ii-nlocal+i] = lo+i+nlocal;
                    map[ii+i+neven] = lo+i+p_nlocal;
                }
                ii += p_nlocal;
            }

            std::reverse(map.begin(),map.begin()+neven);

            //print("MAP", map);
        }

        virtual ~SystolicMatrixAlgorithm() {}

        /// Threadsafe routine to apply the operation to rows i and j of the matrix

        /// @param[in] i First row index in the matrix
        /// @param[in] j Second row index in the matrix
        /// @param[in,out] rowi Pointer to row \c i of the matrix (to be modified by the kernel in-place)
        /// @param[in,out] rowj Pointer to row \c j of the matrix (to be modified by the kernel in-place)
        virtual void kernel(int i, int j, T* rowi, T* rowj) = 0;


        /// Invoked simultaneously by all threads after each sweep to test for convergence

        /// There is a thread barrier before and after the invocation of this routine.
        /// @param[in] env The madness thread environment in case synchronization between threads is needed during computation of the convergence condition.
        virtual bool converged(const TaskThreadEnv& env) const = 0;


        /// Invoked by all threads at the start of each iteration

        /// There is a thread barrier before and after the invocation of this routine.
        /// @param[in] env The madness thread environment in case synchronization between threads is needed during startup.
        virtual void start_iteration_hook(const TaskThreadEnv& env) {}


        /// Invoked by all threads at the end of each iteration before the convergence test

        /// There is a thread barrier before and after the invocation of this routine.
        /// Note that the \c converged() method is \c const whereas this can modify the class.
        /// @param[in] env The madness thread environment in case synchronization between threads is needed at the end of the iteration.
        virtual void end_iteration_hook(const TaskThreadEnv& env) {}


#ifdef HAVE_INTEL_TBB
        /// Invoked by the task queue to run the algorithm with multiple threads

        /// This is a collective call ... all processes in world should submit this task
        void run(World& world, const TaskThreadEnv& env) {
            const int nthread = env.nthread();
            bool done = false;
            do {
                iteration(nthread);
                done = tbb::parallel_reduce(tbb::blocked_range<int>(0,nthread), true,
                    [=] (const tbb::blocked_range<int>& range, bool init) -> bool {
                        for(int id = range.begin(); id < range.end(); ++id)
                            init = init &&
                                this->converged(TaskThreadEnv(nthread, id));
                        return init;
                    },
                    [] (const bool l, const bool r) { return l && r; });

            } while (!done);

            unshuffle();
        }
#else
        /// Invoked by the task queue to run the algorithm with multiple threads

        /// This is a collective call ... all processes in world should submit this task
        void run(World& world, const TaskThreadEnv& env) {
            do {
                iteration(env);
            } while (!converged(env));

            if (env.id() == 0) unshuffle();

            env.barrier();
        }
#endif // HAVE_INTEL_TBB

        /// Invoked by the user to run the algorithm with one thread, mostly for debugging

        /// This is a collective call ... all processes in world should call this routine.
        void solve_sequential() {
            run(A.get_world(), TaskThreadEnv(1,0,0));
        }

        /// Returns length of row
        int64_t get_rowdim() const {return rowdim;}


        /// Returns length of column
        int64_t get_coldim() const {return coldim;}

        /// Returns a reference to the world
        World& get_world() const {
            return A.get_world();
        }

        /// Returns rank of this process in the world
        ProcessID get_rank() const {
            return rank;
        }
    };
}

#endif