MADNESS 0.10.1
dqueue.h
Go to the documentation of this file.
1/*
2 This file is part of MADNESS.
3
4 Copyright (C) 2007,2010 Oak Ridge National Laboratory
5
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
10
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19
20 For more information please contact:
21
22 Robert J. Harrison
23 Oak Ridge National Laboratory
24 One Bethel Valley Road
25 P.O. Box 2008, MS-6367
26
27 email: harrisonrj@ornl.gov
28 tel: 865-241-3937
29 fax: 865-572-0680
30
31
32 $Id$
33*/
34
35
36#ifndef MADNESS_WORLD_DQUEUE_H__INCLUDED
37#define MADNESS_WORLD_DQUEUE_H__INCLUDED
38
39// If defined, queue insertions are first aggregated in a per-thread pre-buffer to reduce contention on accessing the queue
40//#define MADNESS_DQ_USE_PREBUF // now in config file
41
42// If defined capture stats on dqueue class --- seems to have small overhead
43#define MADNESS_DQ_STATS
44
45#include <algorithm>
46#include <cstddef>
47#include <iostream>
48#include <madness/config.h>
51#include <stdint.h>
52#include <utility>
53
54/// \file dqueue.h
55/// \brief Implements DQueue
56
57namespace madness {
58
/// Usage counters reported by DQueue::get_stats(); every counter starts at zero.
struct DQStats { // Dilly bar, blizzard, ...
    uint64_t npush_back;  ///< #calls to push_back
    uint64_t npush_front; ///< #calls to push_front
    uint64_t npop_front;  ///< #calls to pop_front
    uint64_t ngrow;       ///< #calls to grow
    uint64_t nmax;        ///< Lifetime max. entries in the queue

    /// Construct with all counters set to zero.
    DQStats()
        : npush_back(0), npush_front(0), npop_front(0), ngrow(0), nmax(0) {}
};
69
70
71 /// A thread-safe, fast but simple double-ended queue.
72
73 /// Since the point is speed, the implementation is a circular
74 /// buffer rather than a linked list so as to avoid the new/del
75 /// overhead. It will grow as needed, but presently will not
76 /// shrink. Had to modify STL API to make things thread safe.
77 ///
78 /// It is now rather heavily specialized to its only use.
79 template <typename T>
81 char pad[64]; ///< To put the lock and the data in separate cache lines
82
83 // n, sz, buf, _front, _back used to be volatile, but not actually needed since every access
84 // happens with the mutex and its implied barriers.
85 size_t n __attribute__((aligned(64))); ///< Number of elements in the buffer
86 size_t sz; ///< Current capacity
87 T* buf; ///< Actual buffer
88 int _front; ///< Index of element at front of buffer
89 int _back; ///< Index of element at back of buffer
90
92
93#ifdef MADNESS_DQ_USE_PREBUF
94 static const size_t NPREBUF=MADNESS_DQ_PREBUF_SIZE;
95 inline static thread_local T prebuf[NPREBUF] = {T{}}; // relies on this being a singleton class!!!!!!!!!!!!!!!!!!
96 inline static thread_local T prebufhi[NPREBUF] = {T{}}; // relies on this being a singleton class!!!!!!!!!!!!!!!!!!
97 inline static thread_local size_t ninprebuf = 0, ninprebufhi = 0;
// Returns the calling thread's pre-buffer state as a tuple
// (ninprebuf, prebuf, ninprebufhi, prebufhi). std::make_tuple decays the two
// arrays, so the tuple holds pointers to their first elements, not copies.
// NOTE(review): the header providing std::make_tuple is not visible in this extract — confirm <tuple> is included.
98 static auto prebuf_info() { return std::make_tuple(ninprebuf, prebuf, ninprebufhi, prebufhi); }
99#endif
100
101 void grow() {
102 // ASSUME WE ALREADY HAVE THE MUTEX WHEN IN HERE
103#ifdef MADNESS_DQ_STATS
104 ++(stats.ngrow);
105#endif
106 if (sz != n) MADNESS_EXCEPTION("assertion failure in dqueue::grow", static_cast<int>(sz));
107 size_t oldsz = sz;
108 if (sz < 32768)
109 sz = 65536;
110 else if (sz <= 1048576)
111 sz *= 2;
112 else
113 sz += 1048576;
114 //volatile T* volatile nbuf = new T[sz];
115 T* nbuf = new T[sz];
116 int lo = sz/2 - oldsz/2;
117 for (int i=_front; i<int(oldsz); ++i,++lo) {
118 nbuf[lo] = buf[i];
119 }
120 if (_front > 0) {
121 for (int i=0; i<=_back; ++i,++lo) {
122 nbuf[lo] = buf[i];
123 }
124 }
125 _front = sz/2 - oldsz/2;
126 _back = _front + n - 1;
127 delete [] buf;
128 buf = nbuf;
129 //sanity_check();
130 }
131
132 void sanity_check() const {
133 // ASSUME WE ALREADY HAVE THE MUTEX WHEN IN HERE
134 int num = _back - _front + 1;
135 if (num < 0) num += sz;
136 if (num==int(sz) && n==0) num=0;
137 if (num==0 && n==sz) num=sz;
138 //if (long(n) != num) print("size",sz,"front",_front,"back",_back,"n",n,"num",num);
139 MADNESS_ASSERT(long(n) == num);
140 }
141
142 void push_back_with_lock(const T& value) {
143 size_t nn = n;
144 size_t ss = sz;
145 if (nn == ss) {
146 grow();
147 ss = sz;
148 }
149 ++nn;
150#ifdef MADNESS_DQ_STATS
151 if (nn > stats.nmax) stats.nmax = nn;
152#endif
153 n = nn;
154
155 int b = _back + 1;
156 if (b >= int(ss)) b = 0;
157 buf[b] = value;
158 _back = b;
159#ifdef MADNESS_DQ_STATS
160 ++(stats.npush_back);
161#endif
162
163 signal();
164 }
165
166 void push_front_with_lock(const T& value) {
167 //sanity_check();
168
169 size_t nn = n;
170 size_t ss = sz;
171 if (nn == ss) {
172 grow();
173 ss = sz;
174 }
175 ++nn;
176#ifdef MADNESS_DQ_STATS
177 if (nn > stats.nmax) stats.nmax = nn;
178#endif
179 n = nn;
180
181 int f = _front - 1;
182 if (f < 0) f = ss - 1;
183 buf[f] = value;
184 _front = f;
185#ifdef MADNESS_DQ_STATS
186 ++(stats.npush_front);
187#endif
188
189 //sanity_check();
190 signal();
191 //broadcast();
192 }
193
/// Move the calling thread's entries buffered in prebufhi onto the front of the queue.
///
/// Entries are pushed in reverse order of insertion, so the entry buffered
/// first ends up closest to the front. No lock is taken here; the caller must
/// hold the mutex because push_front_with_lock() requires it. Does nothing
/// unless MADNESS_DQ_USE_PREBUF is defined.
void flush_prebufhi() {
#ifdef MADNESS_DQ_USE_PREBUF
    if (ninprebufhi) {
        // in reverse order of insertion
        for (size_t i=ninprebufhi; i!=0;) {
            push_front_with_lock(prebufhi[--i]);
        }
        ninprebufhi = 0;
    }
#endif
}
205
/// Move the calling thread's entries buffered in prebuf onto the back of the queue.
///
/// Entries are appended in insertion order. No lock is taken here; the caller
/// must hold the mutex because push_back_with_lock() requires it. Does nothing
/// unless MADNESS_DQ_USE_PREBUF is defined.
void flush_prebuflo() {
#ifdef MADNESS_DQ_USE_PREBUF
    if (ninprebuf) {
        for (size_t i=0; i<ninprebuf; i++) push_back_with_lock(prebuf[i]);
        ninprebuf = 0;
    }
#endif
}
214
218 }
219
220 public:
221
// Set the wait policy p (with its parameter us) for this queue; has an effect
// only when USE_SPINLOCKS is defined.
// NOTE(review): listing line 224, the only statement inside the USE_SPINLOCKS
// branch, is absent from this extract — presumably it forwards (p, us) to the
// base class's set_wait_policy; restore it from the upstream header.
222 void set_wait_policy(WaitPolicy p, int us = 0) {
223#ifdef USE_SPINLOCKS
225#endif
226 }
227
228 DQueue(size_t hint=200000) // was 32768
229 : n(0)
230 , sz(hint>2 ? hint : 2)
231 , buf(new T[sz])
232 , _front(sz/2)
233 , _back(_front-1) {}
234
235 virtual ~DQueue() {
236 delete [] buf;
237 }
238
240
241 /// Insert value at front of queue
242 void push_front(const T& value);
243
244 /// Insert element at back of queue (default is just one copy)
245 void push_back(const T& value, int ncopy=1);
246
/// Visit the queued elements from front to back.
///
/// Prints "IN Q <count>" to std::cout, then calls op with a pointer to each
/// element in turn and stops early as soon as op returns false.
/// NOTE(review): listing line 249 is absent from this extract — confirm
/// against the upstream header whether the mutex is acquired there.
247 template <typename opT>
248 void scan(opT& op) {
250
251 int f = _front;
252 size_t nn = n;
253 int size = int(sz);
254 std::cout << "IN Q " << nn << std::endl;
255
256 while (nn--) {
257 T* p = const_cast<T*>(buf + f);
258 if (!op(p)) break;
259 ++f;
// wrap around at the end of the circular buffer
260 if (f >= size) f = 0;
261 }
262 }
263
264 /// Pop multiple values off the front of queue ... returns number popped ... might be zero
265
266 /// r must refer to an array of dimension at least nmax ... you are presently
267 /// given no more than max(size()/64,1) values ... arbitrary choice.
268 ///
269 /// multi-threaded tasks might cause fewer tasks to be taken
/// If wait is true and the queue is empty, blocks on the condition variable
/// until n becomes non-zero; otherwise an empty queue returns 0 at once.
/// NOTE(review): listing line 271 is absent from this extract — confirm
/// against the upstream header whether the mutex is acquired there.
270 int pop_front(int nmax, T* r, bool wait) {
272 flush_prebuf();
273
274 size_t nn = n;
275
276 if (nn==0 && wait) {
277 while (n == 0) // !!! Must be n (memory) not nn (local copy)
278 CONDITION_VARIABLE_TYPE::wait(); // might release then reacquire the lock, acts as barrier
279
280 nn = n;
281 }
282
283#ifdef MADNESS_DQ_STATS
284 ++(stats.npop_front);
285#endif
286 if (nn) {
287 size_t thesize = sz;
288 //sanity_check();
289
// cap the batch at 1/64 of the current element count, but at least one
290 nmax = std::min(nmax,std::max(int(nn>>6),1));
291 int retval; // Will return the number of items taken
292
293
294 int f = _front;
295
296 // Original loop was this
297 //retval = nmax;
298 //while (nmax--) {
299 // *r++ = buf[f++];
300 // if (f >= int(sz)) f = 0;
301 //}
302
303 // New loop includes checking for replicated multi-threaded task
304 // ... take one task and then check that subsequent tasks differ
// the first element is always taken, without inspecting its value
305 nmax--;
306 *r++ = buf[f++];
307 if (f >= int(thesize)) f = 0;
308 retval=1;
309 while (nmax--) {
310 T ptr = buf[f];
// stop the batch when the next entry equals the one just taken
311 if (ptr == *(r-1)) {
312 break;
313 }
314 else if (ptr) { // Null pointer indicates stolen task
315 *r++ = ptr;
316 ++f;
317 if (f >= int(thesize)) f = 0;
318 ++retval;
319 }
// a null entry is not consumed: f is not advanced, so the remaining
// iterations re-read the same slot and nothing more is taken
320 }
321
322 n = nn - retval;
323 _front = f;
324
325 //sanity_check();
326 return retval;
327 }
328 else {
329 return 0;
330 }
331 }
332
333 /// Pop value off the front of queue
334 std::pair<T,bool> pop_front(bool wait) {
335 T r;
336 int ngot = pop_front(1, &r, wait);
337 return std::pair<T,bool>(r,ngot==1);
338 }
339
340 size_t size() const {
341 return n;
342 }
343
344 bool empty() const;
345
346 const DQStats& get_stats() const {
347 return stats;
348 }
349 };
350
// When MADNESS_DQ_USE_PREBUF is defined: asserts that neither pre-buffer count
// exceeds NPREBUF and, if anything is buffered, calls flush_prebuf().
// NOTE(review): listing lines 352 (the function declarator) and 356 (first
// statement inside the if) are absent from this extract; the index names this
// function lock_and_flush_prebuf(), so line 356 presumably acquires the mutex
// — restore both lines from the upstream header.
351 template <typename T>
353#ifdef MADNESS_DQ_USE_PREBUF
354 MADNESS_ASSERT(ninprebufhi <= NPREBUF && ninprebuf <= NPREBUF);
355 if (ninprebuf+ninprebufhi) {
357 flush_prebuf();
358 }
359#endif
360 }
361
// Insert value at the front of the queue.
// With MADNESS_DQ_USE_PREBUF, a MADNESS thread whose prebufhi still has room
// only stores the value there and returns. Otherwise the value is pushed onto
// the queue and prebufhi is then flushed.
// NOTE(review): listing line 372 is absent from this extract — confirm
// against the upstream header whether the mutex is acquired there.
362 template <typename T>
363 void DQueue<T>::push_front(const T& value) {
364#ifdef MADNESS_DQ_USE_PREBUF
365 if (is_madness_thread() && ninprebufhi < NPREBUF) {
366 prebufhi[ninprebufhi++] = value;
367 return;
368 }
369 MADNESS_ASSERT(ninprebufhi <= NPREBUF && ninprebuf <= NPREBUF);
370#endif
371 {
373 push_front_with_lock(value);
374 flush_prebufhi();
375 }
376 }
377
// Insert ncopy copies of value at the back of the queue.
// With MADNESS_DQ_USE_PREBUF, a MADNESS thread pushing a single copy while
// prebuf still has room only stores the value there and returns. Otherwise the
// pre-buffers are flushed first and the copies are then appended.
// NOTE(review): listing line 388 is absent from this extract — confirm
// against the upstream header whether the mutex is acquired there.
378 template <typename T>
379 void DQueue<T>::push_back(const T& value, int ncopy) {
380#ifdef MADNESS_DQ_USE_PREBUF
381 if (is_madness_thread() && ncopy==1 && ninprebuf < NPREBUF) {
382 prebuf[ninprebuf++] = value;
383 return;
384 }
385 MADNESS_ASSERT(ninprebufhi <= NPREBUF && ninprebuf <= NPREBUF);
386#endif
387 {
389 flush_prebuf();
390 //sanity_check();
// NOTE(review): a negative ncopy never reaches zero by decrementing — callers must pass ncopy >= 0
391 while (ncopy--)
392 push_back_with_lock(value);
393 //sanity_check();
394 //broadcast();
395 }
396 }
397
398 template <typename T>
399 bool DQueue<T>::empty() const {
400#ifdef MADNESS_DQ_USE_PREBUF
401 return (ninprebuf+ninprebufhi+n)==0; // this is just from the perspective of this thread!!!!!
402#else
403 return (n==0);
404#endif
405 }
406
407} // namespace madness
408
409#endif // MADNESS_WORLD_DQUEUE_H__INCLUDED
void set_wait_policy(WaitPolicy p, int us=0)
Definition worldmutex.h:504
A thread-safe, fast but simple double-ended queue.
Definition dqueue.h:80
int pop_front(int nmax, T *r, bool wait)
Pop multiple values off the front of queue ... returns number popped ... might be zero.
Definition dqueue.h:270
void flush_prebuf()
Definition dqueue.h:215
const DQStats & get_stats() const
Definition dqueue.h:346
void push_front(const T &value)
Insert value at front of queue.
Definition dqueue.h:363
virtual ~DQueue()
Definition dqueue.h:235
size_t sz
Current capacity.
Definition dqueue.h:86
void flush_prebufhi()
Definition dqueue.h:194
void flush_prebuflo()
Definition dqueue.h:206
std::pair< T, bool > pop_front(bool wait)
Pop value off the front of queue.
Definition dqueue.h:334
DQStats stats
Definition dqueue.h:91
size_t size() const
Definition dqueue.h:340
void push_back(const T &value, int ncopy=1)
Insert element at back of queue (default is just one copy)
Definition dqueue.h:379
void scan(opT &op)
Definition dqueue.h:248
int _front
Index of element at front of buffer.
Definition dqueue.h:88
T * buf
Actual buffer.
Definition dqueue.h:87
size_t n __attribute__((aligned(64)))
Number of elements in the buffer.
bool empty() const
Definition dqueue.h:399
void push_back_with_lock(const T &value)
Definition dqueue.h:142
DQueue(size_t hint=200000)
Definition dqueue.h:228
void push_front_with_lock(const T &value)
Definition dqueue.h:166
int _back
Index of element at back of buffer.
Definition dqueue.h:89
void grow()
Definition dqueue.h:101
void sanity_check() const
Definition dqueue.h:132
char pad[64]
To put the lock and the data in separate cache lines.
Definition dqueue.h:81
void lock_and_flush_prebuf()
Definition dqueue.h:352
void set_wait_policy(WaitPolicy p, int us=0)
Definition dqueue.h:222
Simple wrapper for Pthread condition variable with its own mutex.
Definition worldmutex.h:635
void signal() const
Definition worldmutex.h:673
void wait() const
You should have acquired the mutex before entering here.
Definition worldmutex.h:669
Mutex that is applied/released at start/end of a scope.
Definition worldmutex.h:239
char * p(char *buf, const char *name, int k, int initial_level, double thresh, int order)
Definition derivatives.cc:72
static double lo
Definition dirac-hatom.cc:23
auto T(World &world, response_space &f) -> response_space
Definition global_functions.cc:34
Tensor< double > op(const Tensor< double > &x)
Definition kain.cc:508
#define MADNESS_EXCEPTION(msg, value)
Macro for throwing a MADNESS exception.
Definition madness_exception.h:119
#define MADNESS_ASSERT(condition)
Assert a condition that should be free of side-effects since in release builds this might be a no-op.
Definition madness_exception.h:134
Namespace for all elements and tools of MADNESS.
Definition DFParameters.h:10
WaitPolicy
wait policies supported by ConditionVariable/DQueue/ThreadPool
Definition worldmutex.h:492
bool is_madness_thread()
Definition thread_info.h:70
NDIM & f
Definition mra.h:2481
static const double b
Definition nonlinschro.cc:119
Definition dqueue.h:59
uint64_t ngrow
#calls to grow
Definition dqueue.h:63
uint64_t npush_front
#calls to push_front
Definition dqueue.h:61
uint64_t npush_back
#calls to push_back
Definition dqueue.h:60
DQStats()
Definition dqueue.h:66
uint64_t nmax
Lifetime max. entries in the queue.
Definition dqueue.h:64
uint64_t npop_front
#calls to pop_front
Definition dqueue.h:62
Implements thread introspection for Pthreads backend.
Implements Mutex, MutexFair, Spinlock, ConditionVariable.