MADNESS 0.10.1
dqueue.h
Go to the documentation of this file.
1/*
2 This file is part of MADNESS.
3
4 Copyright (C) 2007,2010 Oak Ridge National Laboratory
5
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
10
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19
20 For more information please contact:
21
22 Robert J. Harrison
23 Oak Ridge National Laboratory
24 One Bethel Valley Road
25 P.O. Box 2008, MS-6367
26
27 email: harrisonrj@ornl.gov
28 tel: 865-241-3937
29 fax: 865-572-0680
30
31
32 $Id$
33*/
34
35
36#ifndef MADNESS_WORLD_DQUEUE_H__INCLUDED
37#define MADNESS_WORLD_DQUEUE_H__INCLUDED
38
39// If defined, aggregate queue insertions in thread-local buffers to reduce contention on the queue
40//#define MADNESS_DQ_USE_PREBUF // now in config file
41
42// If defined, capture statistics on the DQueue class (this seems to have a small overhead)
43#define MADNESS_DQ_STATS
44
45#include <algorithm>
46#include <cstddef>
47#include <iostream>
48#include <madness/config.h>
51#include <stdint.h>
52#include <utility>
53
54/// \file dqueue.h
55/// \brief Implements DQueue
56
57namespace madness {
58
59 struct DQStats { // Dilly bar, blizzard, ...
60 uint64_t npush_back; ///< #calls to push_back
61 uint64_t npush_front; ///< #calls to push_front
62 uint64_t npop_front; ///< #calls to pop_front
63 uint64_t ngrow; ///< #calls to grow
64 uint64_t nmax; ///< Lifetime max. entries in the queue
65
67 : npush_back(0), npush_front(0), npop_front(0), ngrow(0), nmax(0) {}
68 };
69
70
71 /// A thread-safe, fast but simple double-ended queue.
72
73 /// Since the point is speed, the implementation is a circular
74 /// buffer rather than a linked list so as to avoid the new/del
75 /// overhead. It will grow as needed, but presently will not
76 /// shrink. Had to modify STL API to make things thread safe.
77 ///
78 /// It is now rather heavily specialized to its only use.
79 template <typename T>
81 char pad[64]; ///< To put the lock and the data in separate cache lines
82
83 // n, sz, buf, _front, _back used to be volatile, but not actually needed since every access
84 // happens with the mutex and its implied barriers.
85 size_t n __attribute__((aligned(64))); ///< Number of elements in the buffer
86 size_t sz; ///< Current capacity
87 T* buf; ///< Actual buffer
88 int _front; ///< Index of element at front of buffer
89 int _back; ///< Index of element at back of buffer
90
92
93#ifdef MADNESS_DQ_USE_PREBUF
94 static const size_t NPREBUF=MADNESS_DQ_PREBUF_SIZE;
95 inline static thread_local T prebuf[NPREBUF] = {T{}}; // relies on this being a singleton class!!!!!!!!!!!!!!!!!!
96 inline static thread_local T prebufhi[NPREBUF] = {T{}}; // relies on this being a singleton class!!!!!!!!!!!!!!!!!!
97 inline static thread_local size_t ninprebuf = 0, ninprebufhi = 0;
98#endif
99
100 void grow() {
101 // ASSUME WE ALREADY HAVE THE MUTEX WHEN IN HERE
102#ifdef MADNESS_DQ_STATS
103 ++(stats.ngrow);
104#endif
105 if (sz != n) MADNESS_EXCEPTION("assertion failure in dqueue::grow", static_cast<int>(sz));
106 size_t oldsz = sz;
107 if (sz < 32768)
108 sz = 65536;
109 else if (sz <= 1048576)
110 sz *= 2;
111 else
112 sz += 1048576;
113 //volatile T* volatile nbuf = new T[sz];
114 T* nbuf = new T[sz];
115 int lo = sz/2 - oldsz/2;
116 for (int i=_front; i<int(oldsz); ++i,++lo) {
117 nbuf[lo] = buf[i];
118 }
119 if (_front > 0) {
120 for (int i=0; i<=_back; ++i,++lo) {
121 nbuf[lo] = buf[i];
122 }
123 }
124 _front = sz/2 - oldsz/2;
125 _back = _front + n - 1;
126 delete [] buf;
127 buf = nbuf;
128 //sanity_check();
129 }
130
131 void sanity_check() const {
132 // ASSUME WE ALREADY HAVE THE MUTEX WHEN IN HERE
133 int num = _back - _front + 1;
134 if (num < 0) num += sz;
135 if (num==int(sz) && n==0) num=0;
136 if (num==0 && n==sz) num=sz;
137 //if (long(n) != num) print("size",sz,"front",_front,"back",_back,"n",n,"num",num);
138 MADNESS_ASSERT(long(n) == num);
139 }
140
141 void push_back_with_lock(const T& value) {
142 size_t nn = n;
143 size_t ss = sz;
144 if (nn == ss) {
145 grow();
146 ss = sz;
147 }
148 ++nn;
149#ifdef MADNESS_DQ_STATS
150 if (nn > stats.nmax) stats.nmax = nn;
151#endif
152 n = nn;
153
154 int b = _back + 1;
155 if (b >= int(ss)) b = 0;
156 buf[b] = value;
157 _back = b;
158#ifdef MADNESS_DQ_STATS
159 ++(stats.npush_back);
160#endif
161
162 signal();
163 }
164
165 void push_front_with_lock(const T& value) {
166 //sanity_check();
167
168 size_t nn = n;
169 size_t ss = sz;
170 if (nn == ss) {
171 grow();
172 ss = sz;
173 }
174 ++nn;
175#ifdef MADNESS_DQ_STATS
176 if (nn > stats.nmax) stats.nmax = nn;
177#endif
178 n = nn;
179
180 int f = _front - 1;
181 if (f < 0) f = ss - 1;
182 buf[f] = value;
183 _front = f;
184#ifdef MADNESS_DQ_STATS
185 ++(stats.npush_front);
186#endif
187
188 //sanity_check();
189 signal();
190 //broadcast();
191 }
192
194#ifdef MADNESS_DQ_USE_PREBUF
195 if (ninprebuf) {
196 for (size_t i=0; i<ninprebuf; i++) push_back_with_lock(prebuf[i]);
197 ninprebuf = 0;
198 }
199 if (ninprebufhi) {
200 for (size_t i=0; i<ninprebufhi; i++) push_front_with_lock(prebufhi[i]);
201 ninprebufhi = 0;
202 }
203#endif
204 }
205
206 public:
207
208 void set_wait_policy(WaitPolicy p, int us = 0) {
209#ifdef USE_SPINLOCKS
211#endif
212 }
213
214 DQueue(size_t hint=200000) // was 32768
215 : n(0)
216 , sz(hint>2 ? hint : 2)
217 , buf(new T[sz])
218 , _front(sz/2)
219 , _back(_front-1) {}
220
221 virtual ~DQueue() {
222 delete [] buf;
223 }
224
226
227 /// Insert value at front of queue
228 void push_front(const T& value);
229
230 /// Insert element at back of queue (default is just one copy)
231 void push_back(const T& value, int ncopy=1);
232
233 template <typename opT>
234 void scan(opT& op) {
236
237 int f = _front;
238 size_t nn = n;
239 int size = int(sz);
240 std::cout << "IN Q " << nn << std::endl;
241
242 while (nn--) {
243 T* p = const_cast<T*>(buf + f);
244 if (!op(p)) break;
245 ++f;
246 if (f >= size) f = 0;
247 }
248 }
249
250 /// Pop multiple values off the front of queue ... returns number popped ... might be zero
251
252 /// r must refer to an array of dimension at least nmax ... you are presently
253 /// given no more than max(size()/64,1) values ... arbitrary choice.
254 ///
255 /// multi-threaded tasks might cause fewer tasks to be taken
256 int pop_front(int nmax, T* r, bool wait) {
258 flush_prebuf();
259
260 size_t nn = n;
261
262 if (nn==0 && wait) {
263 while (n == 0) // !!! Must be n (memory) not nn (local copy)
264 CONDITION_VARIABLE_TYPE::wait(); // might release then reacquire the lock, acts as barrier
265
266 nn = n;
267 }
268
269#ifdef MADNESS_DQ_STATS
270 ++(stats.npop_front);
271#endif
272 if (nn) {
273 size_t thesize = sz;
274 //sanity_check();
275
276 nmax = std::min(nmax,std::max(int(nn>>6),1));
277 int retval; // Will return the number of items taken
278
279
280 int f = _front;
281
282 // Original loop was this
283 //retval = nmax;
284 //while (nmax--) {
285 // *r++ = buf[f++];
286 // if (f >= int(sz)) f = 0;
287 //}
288
289 // New loop includes checking for replicated multi-threaded task
290 // ... take one task and then check that subsequent tasks differ
291 nmax--;
292 *r++ = buf[f++];
293 if (f >= int(thesize)) f = 0;
294 retval=1;
295 while (nmax--) {
296 T ptr = buf[f];
297 if (ptr == *(r-1)) {
298 break;
299 }
300 else if (ptr) { // Null pointer indicates stolen task
301 *r++ = ptr;
302 ++f;
303 if (f >= int(thesize)) f = 0;
304 ++retval;
305 }
306 }
307
308 n = nn - retval;
309 _front = f;
310
311 //sanity_check();
312 return retval;
313 }
314 else {
315 return 0;
316 }
317 }
318
319 /// Pop value off the front of queue
320 std::pair<T,bool> pop_front(bool wait) {
321 T r;
322 int ngot = pop_front(1, &r, wait);
323 return std::pair<T,bool>(r,ngot==1);
324 }
325
326 size_t size() const {
327 return n;
328 }
329
330 bool empty() const;
331
332 const DQStats& get_stats() const {
333 return stats;
334 }
335 };
336
337 template <typename T>
339#ifdef MADNESS_DQ_USE_PREBUF
340 if (ninprebuf+ninprebufhi) {
342 flush_prebuf();
343 }
344#endif
345 }
346
347 template <typename T>
348 void DQueue<T>::push_front(const T& value) {
349#ifdef MADNESS_DQ_USE_PREBUF
350 if (is_madness_thread() && ninprebufhi < NPREBUF) {
351 prebufhi[ninprebufhi++] = value;
352 return;
353 }
354#endif
355 {
357 push_front_with_lock(value);
358 }
359 }
360
361 template <typename T>
362 void DQueue<T>::push_back(const T& value, int ncopy) {
363#ifdef MADNESS_DQ_USE_PREBUF
364 if (is_madness_thread() && ncopy==1 && ninprebuf < NPREBUF) {
365 prebuf[ninprebuf++] = value;
366 return;
367 }
368#endif
369 {
371 flush_prebuf();
372 //sanity_check();
373 while (ncopy--)
374 push_back_with_lock(value);
375 //sanity_check();
376 //broadcast();
377 }
378 }
379
380 template <typename T>
381 bool DQueue<T>::empty() const {
382#ifdef MADNESS_DQ_USE_PREBUF
383 return (ninprebuf+ninprebufhi+n)==0; // this is just from the perspective of this thread!!!!!
384#else
385 return (n==0);
386#endif
387 }
388
389} // namespace madness
390
391#endif // MADNESS_WORLD_DQUEUE_H__INCLUDED
void set_wait_policy(WaitPolicy p, int us=0)
Definition worldmutex.h:504
A thread-safe, fast but simple double-ended queue.
Definition dqueue.h:80
int pop_front(int nmax, T *r, bool wait)
Pop multiple values off the front of queue ... returns number popped ... might be zero.
Definition dqueue.h:256
void flush_prebuf()
Definition dqueue.h:193
const DQStats & get_stats() const
Definition dqueue.h:332
void push_front(const T &value)
Insert value at front of queue.
Definition dqueue.h:348
virtual ~DQueue()
Definition dqueue.h:221
size_t sz
Current capacity.
Definition dqueue.h:86
std::pair< T, bool > pop_front(bool wait)
Pop value off the front of queue.
Definition dqueue.h:320
DQStats stats
Definition dqueue.h:91
size_t size() const
Definition dqueue.h:326
void push_back(const T &value, int ncopy=1)
Insert element at back of queue (default is just one copy)
Definition dqueue.h:362
void scan(opT &op)
Definition dqueue.h:234
int _front
Index of element at front of buffer.
Definition dqueue.h:88
T * buf
Actual buffer.
Definition dqueue.h:87
size_t n __attribute__((aligned(64)))
Number of elements in the buffer.
bool empty() const
Definition dqueue.h:381
void push_back_with_lock(const T &value)
Definition dqueue.h:141
DQueue(size_t hint=200000)
Definition dqueue.h:214
void push_front_with_lock(const T &value)
Definition dqueue.h:165
int _back
Index of element at back of buffer.
Definition dqueue.h:89
void grow()
Definition dqueue.h:100
void sanity_check() const
Definition dqueue.h:131
char pad[64]
To put the lock and the data in separate cache lines.
Definition dqueue.h:81
void lock_and_flush_prebuf()
Definition dqueue.h:338
void set_wait_policy(WaitPolicy p, int us=0)
Definition dqueue.h:208
Simple wrapper for Pthread condition variable with its own mutex.
Definition worldmutex.h:635
void signal() const
Definition worldmutex.h:673
void wait() const
You should have acquired the mutex before entering here.
Definition worldmutex.h:669
Mutex that is applied/released at start/end of a scope.
Definition worldmutex.h:239
char * p(char *buf, const char *name, int k, int initial_level, double thresh, int order)
Definition derivatives.cc:72
static double lo
Definition dirac-hatom.cc:23
auto T(World &world, response_space &f) -> response_space
Definition global_functions.cc:34
Tensor< double > op(const Tensor< double > &x)
Definition kain.cc:508
#define MADNESS_EXCEPTION(msg, value)
Macro for throwing a MADNESS exception.
Definition madness_exception.h:119
#define MADNESS_ASSERT(condition)
Assert a condition that should be free of side-effects since in release builds this might be a no-op.
Definition madness_exception.h:134
Namespace for all elements and tools of MADNESS.
Definition DFParameters.h:10
WaitPolicy
wait policies supported by ConditionVariable/DQueue/ThreadPool
Definition worldmutex.h:492
bool is_madness_thread()
Definition thread_info.h:70
NDIM & f
Definition mra.h:2416
static const double b
Definition nonlinschro.cc:119
Definition dqueue.h:59
uint64_t ngrow
#calls to grow
Definition dqueue.h:63
uint64_t npush_front
#calls to push_front
Definition dqueue.h:61
uint64_t npush_back
#calls to push_back
Definition dqueue.h:60
DQStats()
Definition dqueue.h:66
uint64_t nmax
Lifetime max. entries in the queue.
Definition dqueue.h:64
uint64_t npop_front
#calls to pop_front
Definition dqueue.h:62
Implements thread introspection.
Implements Mutex, MutexFair, Spinlock, ConditionVariable.