MADNESS 0.10.1
systolic.h
1#ifndef MADNESS_SYSTOLIC_H
2#define MADNESS_SYSTOLIC_H
3
4/*
5 This file is part of MADNESS.
6
7 Copyright (C) 2007,2010 Oak Ridge National Laboratory
8
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 2 of the License, or
12 (at your option) any later version.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22
23 For more information please contact:
24
25 Robert J. Harrison
26 Oak Ridge National Laboratory
27 One Bethel Valley Road
28 P.O. Box 2008, MS-6367
29
30 email: harrisonrj@ornl.gov
31 tel: 865-241-3937
32 fax: 865-572-0680
33*/
34
35#include <madness/world/MADworld.h>
36#include <utility>
37#include <madness/tensor/tensor.h>
38#include <madness/tensor/distributed_matrix.h>
39
40#ifdef HAVE_INTEL_TBB
41# include <tbb/parallel_for.h>
42#endif
43
44namespace madness {
45
46 /// Base class for parallel algorithms that employ a systolic loop to generate all row pairs in parallel
47 template <typename T>
48 class SystolicMatrixAlgorithm : public TaskInterface {
49 private:
50 DistributedMatrix<T>& A;
51 const int64_t nproc; ///< No. of processes with rows of the matrix (not size of world)
52 const int64_t coldim; ///< A(coldim,rowdim)
53 const int64_t rowdim; ///< A(coldim,rowdim)
54 const int64_t nlocal; ///< No. of local pairs
55 const ProcessID rank; ///< Rank of current process
56 const int tag; ///< MPI tag to be used for messages
57 std::vector<T*> iptr, jptr; ///< Indirection for implementing cyclic buffer !! SHOULD BE VOLATILE ?????
58 std::vector<int64_t> map; ///< Used to keep track of actual row indices
59
60#ifdef HAVE_INTEL_TBB
61 void iteration(const int nthread) {
62
63 // Parallel initialization hook
64 tbb::parallel_for(0, nthread, [=](const int id) {
65 this->start_iteration_hook(TaskThreadEnv(nthread, id));
66 });
67
68 if (nlocal > 0) {
69// int64_t ilo, ihi;
70// A.local_colrange(ilo, ihi);
71
72 const int neven = coldim + (coldim&0x1);
73 const int pairlo = rank*A.coltile()/2;
74
75 for (int loop=0; loop<(neven-1); ++loop) {
76
77 // This loop is parallelized over threads
78 tbb::parallel_for(0, nthread,
79 [this,neven,pairlo,nthread,loop](const int id) {
80 for (int pair=id; pair<nlocal; pair+=nthread) {
81
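// Round-robin ("circle method") schedule: the logical row with index neven-1
// stays fixed while the other neven-1 rows rotate one slot per loop iteration,
// so every pair of rows meets exactly once over the neven-1 passes of a sweep.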
82 int rp = neven/2-1-(pair+pairlo);
83 int iii = (rp+loop)%(neven-1);
84 int jjj = (2*neven-2-rp+loop)%(neven-1);
85 if (rp == 0) jjj = neven-1;
86
87 iii = map[iii];
88 jjj = map[jjj];
89
90 if (jptr[pair]) {
91 this->kernel(iii, jjj, iptr[pair], jptr[pair]);
92 }
93 }
94 });
95
96 cycle();
97
98 }
99 }
100
101 // Parallel finalization hook
102 tbb::parallel_for(0, nthread, [=](const int id) {
103 this->end_iteration_hook(TaskThreadEnv(nthread, id));
104 });
105
106 }
107#else
108 void iteration(const TaskThreadEnv& env) {
109
110 env.barrier();
111 start_iteration_hook(env);
112 env.barrier();
113
114 if (nlocal > 0) {
115 int64_t ilo, ihi;
116 A.local_colrange(ilo, ihi);
117
118 int neven = coldim + (coldim&0x1);
119
120 int pairlo = rank*A.coltile()/2;
121
122 int threadid = env.id();
123 int nthread = env.nthread();
124
125 for (int loop=0; loop<(neven-1); ++loop) {
126
127 // This loop is parallelized over threads
128 for (int pair=env.id(); pair<nlocal; pair+=nthread) {
129
130 int rp = neven/2-1-(pair+pairlo);
131 int iii = (rp+loop)%(neven-1);
132 int jjj = (2*neven-2-rp+loop)%(neven-1);
133 if (rp == 0) jjj = neven-1;
134
135 iii = map[iii];
136 jjj = map[jjj];
137
138 if (jptr[pair]) {
139 kernel(iii, jjj, iptr[pair], jptr[pair]);
140 }
141 }
142 env.barrier();
143
144 if (threadid == 0) cycle();
145
146 env.barrier();
147 }
148 }
149
150 end_iteration_hook(env);
151
152 env.barrier();
153 }
154#endif // HAVE_INTEL_TBB
155
156 /// Call this after iterating to restore the correct order of rows in the original matrix
157
158 /// At the end of each iteration the matrix rows are logically back in
159 /// their correct order. However, due to the indirection used to reduce data motion,
160 /// if the local column dimension is not a factor of the number of cycles,
161 /// the underlying data may be in a different order. This routine restores that order.
162 ///
163 /// Only one thread should invoke this routine
164 void unshuffle() {
165 if (nlocal <= 0) return;
166 Tensor<T>& t = A.data();
167 Tensor<T> tmp(2L, t.dims(), false);
168 T* tp = tmp.ptr();
169 for (int64_t i=0; i<nlocal; ++i) {
170 memcpy(tp+i*rowdim, iptr[i], rowdim*sizeof(T));
171 if (jptr[i]) {
172 memcpy(tp+(i+nlocal)*rowdim, jptr[i], rowdim*sizeof(T));
173 }
174 iptr[i] = &t(i,0);
175 jptr[i] = &t(i+nlocal,0);
176 }
177 memcpy(t.ptr(), tmp.ptr(), t.size()*sizeof(T));
178
179 if (rank==(nproc-1) && (coldim&0x1)) jptr[nlocal-1] = 0;
180 }
181
182 /// Cycles data around the loop ... only one thread should invoke this
183 void cycle() {
184 if (coldim <= 2) return; // No cycling necessary
185 if (nlocal <= 0) { // Nothing local
186 MADNESS_ASSERT(rank >= nproc);
187 return;
188 }
189
190 // Check the assumption that the tiling puts the incomplete tile at the end
191 MADNESS_ASSERT(A.local_coldim() == A.coltile() || rank == (nproc-1));
192
193 const ProcessID left = rank-1; //Invalid values are not used
194 const ProcessID right = rank+1;
195
196 /*
197 Consider matrix (10,*) distributed with coltile=4 over
198 three processors.
199
200 . 0 1 2 3 4 5 6 7 8 9
201
202 This is divided up as follows into this initial
203 configuration for the loop
204
205 . P=0 P=1 P=2
206 . msg msg
207 . i -->0-->1 --> 4-->5 --> 8 -->
208 . ^ | msg
209 . | <---------
210 . j <--2<--3 <-- 6<--7 <--| 9
211 . msg msg
212
213 The first and last processes in the loop have to wrap ... others
214 just pass left and right. Note that 9 stays put.
215
216 Note that the algorithm assumes the distribution puts an equal
217 amount of data on all nodes except the last.
218
219 The i data is considered as flowing to the right.
220 The j data is considered as flowing to the left.
221
222
223 Hence, we should explore the pairs in this order
224 (n-1 sets of n/2 pairs)
225
226 . P=0 P=1 P=2
227 . 0 1 4 5 8
228 . 2 3 6 7 9
229
230 . 2 0 1 4 5
231 . 3 6 7 8 9
232
233 . 3 2 0 1 4
234 . 6 7 8 5 9
235
236 . 6 3 2 0 1
237 . 7 8 5 4 9
238
239 . 7 6 3 2 0
240 . 8 5 4 1 9
241
242 . 8 7 6 3 2
243 . 5 4 1 0 9
244
245 . 5 8 7 6 3
246 . 4 1 0 2 9
247
248 . 4 5 8 7 6
249 . 1 0 2 3 9
250
251 . 1 4 5 8 7
252 . 0 2 3 6 9
253 */
254
255 // Copy end elements before they are overwritten
256 T* ilast = iptr[nlocal-1];
257 T* jfirst = jptr[0];
258
259 // Cycle local pointers
260 for (int64_t i=0; i<nlocal-1; ++i) {
261 iptr[nlocal-i-1] = iptr[nlocal-i-2];
262 jptr[i] = jptr[i+1];
263 }
264
265 World& world = A.get_world();
266
267 if (nproc == 1) {
268 iptr[0] = jfirst;
269 jptr[nlocal-2] = ilast;
270 }
271 else if (rank == 0) {
272 iptr[0] = jfirst;
273 world.mpi.Send(ilast, rowdim, right, tag);
274 jptr[nlocal-1] = ilast;
275 world.mpi.Recv(ilast, rowdim, right, tag);
276 }
277 else if (rank == (nproc-1)) {
278 if (nlocal > 1) {
279 iptr[0] = jfirst;
280 jptr[nlocal-2] = ilast;
281 }
282 std::vector<T> buf(rowdim);
283 SafeMPI::Request req = world.mpi.Irecv(&buf[0], rowdim, left, tag);
284 world.mpi.Send(iptr[0], rowdim, left, tag);
285 world.await(req,false);
286 std::memcpy(iptr[0], &buf[0], rowdim*sizeof(T));
287 }
288 else {
289 std::vector<T> buf1(rowdim);
290 std::vector<T> buf2(rowdim);
291 SafeMPI::Request req1 = world.mpi.Irecv(&buf1[0], rowdim, left, tag);
292 SafeMPI::Request req2 = world.mpi.Irecv(&buf2[0], rowdim, right, tag);
293 world.mpi.Send( ilast, rowdim, right, tag);
294 world.mpi.Send(jfirst, rowdim, left, tag);
295 world.await(req1,false);
296 world.await(req2,false);
297 std::memcpy(ilast, &buf2[0], rowdim*sizeof(T)); //world.mpi.Recv( ilast, rowdim, right, tag);
298 std::memcpy(jfirst, &buf1[0], rowdim*sizeof(T)); //world.mpi.Recv(jfirst, rowdim, left, tag);
299
300 iptr[0] = jfirst;
301 jptr[nlocal-1] = ilast;
302 }
303 }
304
305 /// Get the task id
306
307 /// \param id The id to set for this task
308 virtual void get_id(std::pair<void*,unsigned short>& id) const {
309 PoolTaskInterface::make_id(id, *this);
310 }
311
312 public:
313 /// A must be a column distributed matrix with an even column tile >= 2
314
315 /// It is assumed that it is the main thread invoking this.
316 /// @param[in,out] A The matrix on which the algorithm is performed and modified in-place
317 /// @param[in] tag The MPI tag used for communication (obtain from \c world.mpi.comm().unique_tag() )
318 /// @param[in] nthread The number of local threads to use (default is the main thread plus all threads in the pool)
319 SystolicMatrixAlgorithm(DistributedMatrix<T>& A, int tag, int nthread=ThreadPool::size()+1)
320 : A(A)
321 , nproc(A.process_coldim()*A.process_rowdim())
322 , coldim(A.coldim())
323 , rowdim(A.rowdim())
324 , nlocal((A.local_coldim()+1)/2)
325 , rank(A.get_world().rank())
326 , tag(tag)
327 , iptr(nlocal)
328 , jptr(nlocal)
329 , map(coldim+(coldim&0x1))
330 {
331 TaskInterface::set_nthread(nthread);
332
333 MADNESS_ASSERT(A.is_column_distributed() && (nproc==1 || (A.coltile()&0x1)==0));
334
335 // Initialize vectors of pointers to matrix rows
336 Tensor<T>& t = A.data();
337
338 //madness::print(nproc, coldim, rowdim, nlocal, rank, tag);
339
340 for (int64_t i=0; i<nlocal; ++i) {
341 iptr[i] = &t(i,0);
342 jptr[i] = &t(i+nlocal,0);
343 }
344
345 // If no. of rows is odd, last process should have an empty last row
346 if (rank==(nproc-1) && (coldim&0x1)) jptr[nlocal-1] = 0;
347
348 // Initialize map from logical index order to actual index order
349
350 int neven = (coldim+1)/2;
351 int ii=0;
352 for (ProcessID p=0; p<nproc; ++p) {
353 int64_t lo, hi;
354 A.get_colrange(p, lo, hi);
355 int p_nlocal = (hi - lo + 2)/2;
356 //print("I think process",p,"has",lo,hi,p_nlocal);
357 for (int i=0; i<p_nlocal; ++i) {
358 map[ii+i] = lo+i;
359 //map[coldim-ii-nlocal+i] = lo+i+nlocal;
360 map[ii+i+neven] = lo+i+p_nlocal;
361 }
362 ii += p_nlocal;
363 }
364
365 std::reverse(map.begin(),map.begin()+neven);
366
367 //print("MAP", map);
368 }
369
370 virtual ~SystolicMatrixAlgorithm() {}
371
372 /// Thread-safe routine to apply the operation to rows i and j of the matrix
373
374 /// @param[in] i First row index in the matrix
375 /// @param[in] j Second row index in the matrix
376 /// @param[in,out] rowi Pointer to row \c i of the matrix (modified in place by the kernel)
377 /// @param[in,out] rowj Pointer to row \c j of the matrix (modified in place by the kernel)
378 virtual void kernel(int i, int j, T* rowi, T* rowj) = 0;
379
380
381 /// Invoked simultaneously by all threads after each sweep to test for convergence
382
383 /// There is a thread barrier before and after the invocation of this routine
384 /// @param[in] env The madness thread environment in case synchronization between threads is needed during computation of the convergence condition.
385 virtual bool converged(const TaskThreadEnv& env) const = 0;
386
387
388 /// Invoked by all threads at the start of each iteration
389
390 /// There is a thread barrier before and after the invocation of this routine
391 /// @param[in] env The madness thread environment in case synchronization between threads is needed during startup.
392 virtual void start_iteration_hook(const TaskThreadEnv& env) {}
393
394
395 /// Invoked by all threads at the end of each iteration before convergence test
396
397 /// There is a thread barrier before and after the invocation of this routine.
398 /// Note that the \c converged() method is \c const whereas this can modify the class.
399 /// @param[in] env The madness thread environment in case synchronization between threads is needed at the end of the iteration.
400 virtual void end_iteration_hook(const TaskThreadEnv& env) {}
401
402
403#ifdef HAVE_INTEL_TBB
404 /// Invoked by the task queue to run the algorithm with multiple threads
405
406 /// This is a collective call ... all processes in world should submit this task
407 void run(World& world, const TaskThreadEnv& env) {
408 const int nthread = env.nthread();
409 bool done = false;
410 do {
411 iteration(nthread);
412 done = tbb::parallel_reduce(tbb::blocked_range<int>(0,nthread), true,
413 [=] (const tbb::blocked_range<int>& range, bool init) -> bool {
414 for(int id = range.begin(); id < range.end(); ++id)
415 init = init &&
416 this->converged(TaskThreadEnv(nthread, id));
417 return init;
418 },
419 [] (const bool l, const bool r) { return l && r; });
420
421 } while (!done);
422
423 unshuffle();
424 }
425#else
426 /// Invoked by the task queue to run the algorithm with multiple threads
427
428 /// This is a collective call ... all processes in world should submit this task
429 void run(World& world, const TaskThreadEnv& env) {
430 do {
431 iteration(env);
432 } while (!converged(env));
433
434 if (env.id() == 0) unshuffle();
435
436 env.barrier();
437 }
438#endif // HAVE_INTEL_TBB
439
440 /// Invoked by the user to run the algorithm with one thread, mostly for debugging
441
442 /// This is a collective call ... all processes in world should call this routine.
443 void solve_sequential() {
444 run(A.get_world(), TaskThreadEnv(1,0,0));
445 }
446
447 /// Returns length of row
448 int64_t get_rowdim() const {return rowdim;}
449
450
451 /// Returns length of column
452 int64_t get_coldim() const {return coldim;}
453
454 /// Returns a reference to the world
455 World& get_world() const {
456 return A.get_world();
457 }
458
459 /// Returns rank of this process in the world
460 ProcessID get_rank() const {
461 return rank;
462 }
463 };
464}
465
466#endif
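
Usage sketch (not part of systolic.h): the fragment below shows one way a concrete algorithm might derive from SystolicMatrixAlgorithm<double>, supplying the required kernel() and converged() overrides plus an end_iteration_hook() that counts sweeps. The class name PairRotation, the fixed rotation angle, the two-sweep stopping rule, and the driver lines in the trailing comment are illustrative assumptions, not code taken from the MADNESS sources.

#include <cmath>
#include <cstdint>
#include <madness/tensor/systolic.h>

using namespace madness;

// Applies the same 2x2 rotation to every pair of matrix rows, for two sweeps.
class PairRotation : public SystolicMatrixAlgorithm<double> {
    const double c, s;  // fixed rotation coefficients
    int nsweep;         // completed sweeps; updated by thread 0 only
public:
    PairRotation(DistributedMatrix<double>& A, int tag, double theta)
        : SystolicMatrixAlgorithm<double>(A, tag)
        , c(std::cos(theta))
        , s(std::sin(theta))
        , nsweep(0)
    {}

    // Each unordered pair (i,j) is presented exactly once per sweep; the kernel
    // touches only the two rows it is handed, which keeps it thread safe.
    void kernel(int i, int j, double* rowi, double* rowj) {
        for (int64_t k=0; k<get_rowdim(); ++k) {
            const double ri = rowi[k], rj = rowj[k];
            rowi[k] = c*ri + s*rj;
            rowj[k] = c*rj - s*ri;
        }
    }

    // Thread barriers surround this hook, so thread 0 may update shared state.
    void end_iteration_hook(const TaskThreadEnv& env) {
        if (env.id() == 0) ++nsweep;
    }

    // Invoked by every thread after each sweep; stop after two full sweeps.
    bool converged(const TaskThreadEnv& env) const {
        return nsweep >= 2;
    }
};

// Possible driver, executed identically on every process (A must be column
// distributed with an even column tile >= 2):
//   world.taskq.add(new PairRotation(A, world.mpi.comm().unique_tag(), 0.01));
//   world.taskq.fence();
// or, single threaded for debugging:
//   PairRotation(A, world.mpi.comm().unique_tag(), 0.01).solve_sequential();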