MADNESS  0.10.1
dqueue.h
Go to the documentation of this file.
1 /*
2  This file is part of MADNESS.
3 
4  Copyright (C) 2007,2010 Oak Ridge National Laboratory
5 
6  This program is free software; you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation; either version 2 of the License, or
9  (at your option) any later version.
10 
11  This program is distributed in the hope that it will be useful,
12  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  GNU General Public License for more details.
15 
16  You should have received a copy of the GNU General Public License
17  along with this program; if not, write to the Free Software
18  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 
20  For more information please contact:
21 
22  Robert J. Harrison
23  Oak Ridge National Laboratory
24  One Bethel Valley Road
25  P.O. Box 2008, MS-6367
26 
27  email: harrisonrj@ornl.gov
28  tel: 865-241-3937
29  fax: 865-572-0680
30 
31 
32  $Id$
33 */
34 
35 
36 #ifndef MADNESS_WORLD_DQUEUE_H__INCLUDED
37 #define MADNESS_WORLD_DQUEUE_H__INCLUDED
38 
39 // If defined, aggregate queue insertions to reduce contention on accessing the queue
40 //#define MADNESS_DQ_USE_PREBUF // now in config file
41 
42 // If defined capture stats on dqueue class --- seems to have small overhead
43 #define MADNESS_DQ_STATS
44 
45 #include <algorithm>
46 #include <cstddef>
47 #include <iostream>
48 #include <madness/config.h>
49 #include <madness/world/thread_info.h>
50 #include <madness/world/worldmutex.h>
51 #include <stdint.h>
52 #include <utility>
53 
54 /// \file dqueue.h
55 /// \brief Implements DQueue
56 
57 namespace madness {
58 
/// Usage counters for a DQueue.
///
/// Every counter starts at zero. The fields are plain integers; this struct
/// performs no synchronization of its own.
struct DQStats { // Dilly bar, blizzard, ...
    uint64_t npush_back;  ///< #calls to push_back
    uint64_t npush_front; ///< #calls to push_front
    uint64_t npop_front;  ///< #calls to pop_front
    uint64_t ngrow;       ///< #calls to grow
    uint64_t nmax;        ///< Lifetime max. entries in the queue

    /// Zero-initializes all counters.
    DQStats()
        : npush_back(0), npush_front(0), npop_front(0), ngrow(0), nmax(0) {}
};
69 
70 
71  /// A thread safe, fast but simple double-ended queue.
72 
73  /// Since the point is speed, the implementation is a circular
74  /// buffer rather than a linked list so as to avoid the new/del
75  /// overhead. It will grow as needed, but presently will not
76  /// shrink. Had to modify STL API to make things thread safe.
77  ///
78  /// It is now rather heavily specialized to its only use.
79  template <typename T>
80  class DQueue : private CONDITION_VARIABLE_TYPE {
81  char pad[64]; ///< To put the lock and the data in separate cache lines
82 
83  // n, sz, buf, _front, _back used to be volatile, but not actually needed since every access
84  // happens with the mutex and its implied barriers.
85  size_t n __attribute__((aligned(64))); ///< Number of elements in the buffer
86  size_t sz; ///< Current capacity
87  T* buf; ///< Actual buffer
88  int _front; ///< Index of element at front of buffer
89  int _back; ///< Index of element at back of buffer
90 
92 
93 #ifdef MADNESS_DQ_USE_PREBUF
94  static const size_t NPREBUF=MADNESS_DQ_PREBUF_SIZE;
95  inline static thread_local T prebuf[NPREBUF] = {T{}}; // relies on this being a singleton class!!!!!!!!!!!!!!!!!!
96  inline static thread_local T prebufhi[NPREBUF] = {T{}}; // relies on this being a singleton class!!!!!!!!!!!!!!!!!!
97  inline static thread_local size_t ninprebuf = 0, ninprebufhi = 0;
98 #endif
99 
100  void grow() {
101  // ASSUME WE ALREADY HAVE THE MUTEX WHEN IN HERE
102 #ifdef MADNESS_DQ_STATS
103  ++(stats.ngrow);
104 #endif
105  if (sz != n) MADNESS_EXCEPTION("assertion failure in dqueue::grow", static_cast<int>(sz));
106  size_t oldsz = sz;
107  if (sz < 32768)
108  sz = 65536;
109  else if (sz <= 1048576)
110  sz *= 2;
111  else
112  sz += 1048576;
113  //volatile T* volatile nbuf = new T[sz];
114  T* nbuf = new T[sz];
115  int lo = sz/2 - oldsz/2;
116  for (int i=_front; i<int(oldsz); ++i,++lo) {
117  nbuf[lo] = buf[i];
118  }
119  if (_front > 0) {
120  for (int i=0; i<=_back; ++i,++lo) {
121  nbuf[lo] = buf[i];
122  }
123  }
124  _front = sz/2 - oldsz/2;
125  _back = _front + n - 1;
126  delete [] buf;
127  buf = nbuf;
128  //sanity_check();
129  }
130 
131  void sanity_check() const {
132  // ASSUME WE ALREADY HAVE THE MUTEX WHEN IN HERE
133  int num = _back - _front + 1;
134  if (num < 0) num += sz;
135  if (num==int(sz) && n==0) num=0;
136  if (num==0 && n==sz) num=sz;
137  //if (long(n) != num) print("size",sz,"front",_front,"back",_back,"n",n,"num",num);
138  MADNESS_ASSERT(long(n) == num);
139  }
140 
141  void push_back_with_lock(const T& value) {
142  size_t nn = n;
143  size_t ss = sz;
144  if (nn == ss) {
145  grow();
146  ss = sz;
147  }
148  ++nn;
149 #ifdef MADNESS_DQ_STATS
150  if (nn > stats.nmax) stats.nmax = nn;
151 #endif
152  n = nn;
153 
154  int b = _back + 1;
155  if (b >= int(ss)) b = 0;
156  buf[b] = value;
157  _back = b;
158 #ifdef MADNESS_DQ_STATS
159  ++(stats.npush_back);
160 #endif
161 
162  signal();
163  }
164 
165  void push_front_with_lock(const T& value) {
166  //sanity_check();
167 
168  size_t nn = n;
169  size_t ss = sz;
170  if (nn == ss) {
171  grow();
172  ss = sz;
173  }
174  ++nn;
175 #ifdef MADNESS_DQ_STATS
176  if (nn > stats.nmax) stats.nmax = nn;
177 #endif
178  n = nn;
179 
180  int f = _front - 1;
181  if (f < 0) f = ss - 1;
182  buf[f] = value;
183  _front = f;
184 #ifdef MADNESS_DQ_STATS
185  ++(stats.npush_front);
186 #endif
187 
188  //sanity_check();
189  signal();
190  //broadcast();
191  }
192 
193  void flush_prebuf() {
194 #ifdef MADNESS_DQ_USE_PREBUF
195  if (ninprebuf) {
196  for (size_t i=0; i<ninprebuf; i++) push_back_with_lock(prebuf[i]);
197  ninprebuf = 0;
198  }
199  if (ninprebufhi) {
200  for (size_t i=0; i<ninprebufhi; i++) push_front_with_lock(prebufhi[i]);
201  ninprebufhi = 0;
202  }
203 #endif
204  }
205 
206  public:
207 
208  void set_wait_policy(WaitPolicy p, int us = 0) {
209 #ifdef USE_SPINLOCKS
211 #endif
212  }
213 
214  DQueue(size_t hint=200000) // was 32768
215  : n(0)
216  , sz(hint>2 ? hint : 2)
217  , buf(new T[sz])
218  , _front(sz/2)
219  , _back(_front-1) {}
220 
221  virtual ~DQueue() {
222  delete [] buf;
223  }
224 
226 
227  /// Insert value at front of queue
228  void push_front(const T& value);
229 
230  /// Insert element at back of queue (default is just one copy)
231  void push_back(const T& value, int ncopy=1);
232 
233  template <typename opT>
234  void scan(opT& op) {
236 
237  int f = _front;
238  size_t nn = n;
239  int size = int(sz);
240  std::cout << "IN Q " << nn << std::endl;
241 
242  while (nn--) {
243  T* p = const_cast<T*>(buf + f);
244  if (!op(p)) break;
245  ++f;
246  if (f >= size) f = 0;
247  }
248  }
249 
250  /// Pop multiple values off the front of queue ... returns number popped ... might be zero
251 
252  /// r must refer to an array of dimension at least nmax ... you are presently
253  /// given no more than max(size()/64,1) values ... arbitrary choice.
254  ///
255  /// multi-threaded tasks might cause fewer tasks to be taken
256  int pop_front(int nmax, T* r, bool wait) {
258  flush_prebuf();
259 
260  size_t nn = n;
261 
262  if (nn==0 && wait) {
263  while (n == 0) // !!! Must be n (memory) not nn (local copy)
264  CONDITION_VARIABLE_TYPE::wait(); // might release then reacquire the lock, acts as barrier
265 
266  nn = n;
267  }
268 
269 #ifdef MADNESS_DQ_STATS
270  ++(stats.npop_front);
271 #endif
272  if (nn) {
273  size_t thesize = sz;
274  //sanity_check();
275 
276  nmax = std::min(nmax,std::max(int(nn>>6),1));
277  int retval; // Will return the number of items taken
278 
279 
280  int f = _front;
281 
282  // Original loop was this
283  //retval = nmax;
284  //while (nmax--) {
285  // *r++ = buf[f++];
286  // if (f >= int(sz)) f = 0;
287  //}
288 
289  // New loop includes checking for replicated multi-threaded task
290  // ... take one task and then check that subsequent tasks differ
291  nmax--;
292  *r++ = buf[f++];
293  if (f >= int(thesize)) f = 0;
294  retval=1;
295  while (nmax--) {
296  T ptr = buf[f];
297  if (ptr == *(r-1)) {
298  break;
299  }
300  else if (ptr) { // Null pointer indicates stolen task
301  *r++ = ptr;
302  ++f;
303  if (f >= int(thesize)) f = 0;
304  ++retval;
305  }
306  }
307 
308  n = nn - retval;
309  _front = f;
310 
311  //sanity_check();
312  return retval;
313  }
314  else {
315  return 0;
316  }
317  }
318 
319  /// Pop value off the front of queue
320  std::pair<T,bool> pop_front(bool wait) {
321  T r;
322  int ngot = pop_front(1, &r, wait);
323  return std::pair<T,bool>(r,ngot==1);
324  }
325 
326  size_t size() const {
327  return n;
328  }
329 
330  bool empty() const;
331 
332  const DQStats& get_stats() const {
333  return stats;
334  }
335  };
336 
337  template <typename T>
339 #ifdef MADNESS_DQ_USE_PREBUF
340  if (ninprebuf+ninprebufhi) {
342  flush_prebuf();
343  }
344 #endif
345  }
346 
347  template <typename T>
348  void DQueue<T>::push_front(const T& value) {
349 #ifdef MADNESS_DQ_USE_PREBUF
350  if (is_madness_thread() && ninprebufhi < NPREBUF) {
351  prebufhi[ninprebufhi++] = value;
352  return;
353  }
354 #endif
355  {
357  push_front_with_lock(value);
358  }
359  }
360 
361  template <typename T>
362  void DQueue<T>::push_back(const T& value, int ncopy) {
363 #ifdef MADNESS_DQ_USE_PREBUF
364  if (is_madness_thread() && ncopy==1 && ninprebuf < NPREBUF) {
365  prebuf[ninprebuf++] = value;
366  return;
367  }
368 #endif
369  {
371  flush_prebuf();
372  //sanity_check();
373  while (ncopy--)
374  push_back_with_lock(value);
375  //sanity_check();
376  //broadcast();
377  }
378  }
379 
380  template <typename T>
381  bool DQueue<T>::empty() const {
382 #ifdef MADNESS_DQ_USE_PREBUF
383  return (ninprebuf+ninprebufhi+n)==0; // this is just from the perspective of this thread!!!!!
384 #else
385  return (n==0);
386 #endif
387  }
388 
389 } // namespace madness
390 
391 #endif // MADNESS_WORLD_DQUEUE_H__INCLUDED
void set_wait_policy(WaitPolicy p, int us=0)
Definition: worldmutex.h:504
A thread safe, fast but simple doubled-ended queue.
Definition: dqueue.h:80
int pop_front(int nmax, T *r, bool wait)
Pop multiple values off the front of queue ... returns number popped ... might be zero.
Definition: dqueue.h:256
const DQStats & get_stats() const
Definition: dqueue.h:332
void flush_prebuf()
Definition: dqueue.h:193
void push_front(const T &value)
Insert value at front of queue.
Definition: dqueue.h:348
virtual ~DQueue()
Definition: dqueue.h:221
size_t sz
Current capacity.
Definition: dqueue.h:86
DQStats stats
Definition: dqueue.h:91
size_t size() const
Definition: dqueue.h:326
std::pair< T, bool > pop_front(bool wait)
Pop value off the front of queue.
Definition: dqueue.h:320
void push_back(const T &value, int ncopy=1)
Insert element at back of queue (default is just one copy)
Definition: dqueue.h:362
void scan(opT &op)
Definition: dqueue.h:234
int _front
Index of element at front of buffer.
Definition: dqueue.h:88
T * buf
Actual buffer.
Definition: dqueue.h:87
size_t n __attribute__((aligned(64)))
Number of elements in the buffer.
bool empty() const
Definition: dqueue.h:381
void push_back_with_lock(const T &value)
Definition: dqueue.h:141
DQueue(size_t hint=200000)
Definition: dqueue.h:214
void push_front_with_lock(const T &value)
Definition: dqueue.h:165
int _back
Index of element at back of buffer.
Definition: dqueue.h:89
void grow()
Definition: dqueue.h:100
void sanity_check() const
Definition: dqueue.h:131
char pad[64]
To put the lock and the data in separate cache lines.
Definition: dqueue.h:81
void lock_and_flush_prebuf()
Definition: dqueue.h:338
void set_wait_policy(WaitPolicy p, int us=0)
Definition: dqueue.h:208
Simple wrapper for Pthread condition variable with its own mutex.
Definition: worldmutex.h:635
void signal() const
Definition: worldmutex.h:673
void wait() const
You should have acquired the mutex before entering here.
Definition: worldmutex.h:669
Mutex that is applied/released at start/end of a scope.
Definition: worldmutex.h:239
char * p(char *buf, const char *name, int k, int initial_level, double thresh, int order)
Definition: derivatives.cc:72
static double lo
Definition: dirac-hatom.cc:23
auto T(World &world, response_space &f) -> response_space
Definition: global_functions.cc:34
Tensor< double > op(const Tensor< double > &x)
Definition: kain.cc:508
#define max(a, b)
Definition: lda.h:51
#define MADNESS_EXCEPTION(msg, value)
Macro for throwing a MADNESS exception.
Definition: madness_exception.h:119
#define MADNESS_ASSERT(condition)
Assert a condition that should be free of side-effects since in release builds this might be a no-op.
Definition: madness_exception.h:134
File holds all helper structures necessary for the CC_Operator and CC2 class.
Definition: DFParameters.h:10
WaitPolicy
wait policies supported by ConditionVariable/DQueue/ThreadPool
Definition: worldmutex.h:492
bool is_madness_thread()
Definition: thread_info.h:70
NDIM & f
Definition: mra.h:2416
static const double b
Definition: nonlinschro.cc:119
Definition: dqueue.h:59
uint64_t ngrow
#calls to grow
Definition: dqueue.h:63
uint64_t npush_front
#calls to push_front
Definition: dqueue.h:61
uint64_t npush_back
#calls to push_back
Definition: dqueue.h:60
DQStats()
Definition: dqueue.h:66
uint64_t nmax
Lifetime max. entries in the queue.
Definition: dqueue.h:64
uint64_t npop_front
#calls to pop_front
Definition: dqueue.h:62
Implements thread introspection.
Implements Mutex, MutexFair, Spinlock, ConditionVariable.