threadsafe_queue.h

Go to the documentation of this file.
00001 /*
00002  * threadsafe_queue.h
00003  * 
00004  * Copyright (C) 2007,2009  Thomas A. Vaughan
00005  * All rights reserved.
00006  *
00007  *
00008  * Redistribution and use in source and binary forms, with or without
00009  * modification, are permitted provided that the following conditions are met:
00010  *     * Redistributions of source code must retain the above copyright
00011  *       notice, this list of conditions and the following disclaimer.
00012  *     * Redistributions in binary form must reproduce the above copyright
00013  *       notice, this list of conditions and the following disclaimer in the
00014  *       documentation and/or other materials provided with the distribution.
00015  *     * Neither the name of the <organization> nor the
00016  *       names of its contributors may be used to endorse or promote products
00017  *       derived from this software without specific prior written permission.
00018  *
00019  * THIS SOFTWARE IS PROVIDED BY THOMAS A. VAUGHAN ''AS IS'' AND ANY
00020  * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
00021  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
00022  * DISCLAIMED. IN NO EVENT SHALL THOMAS A. VAUGHAN BE LIABLE FOR ANY
00023  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
00024  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00025  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
00026  * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00027  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00028  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00029  *
00030  *
00031  * Implementation of a simple threadsafe queue.
00032  */
00033 
00034 #ifndef WAVEPACKET_THREADSAFE_QUEUE_H__
00035 #define WAVEPACKET_THREADSAFE_QUEUE_H__
00036 
00037 // includes --------------------------------------------------------------------
00038 #include "smart_mutex.h"
00039 
00040 #include <deque>
00041 
00042 
00043 ///\ingroup threadsafe
00044 /*@{*/
00045 
00046 
00047 /// A threadsafe FIFO queue.  Supports basic push/pop operations, and iteration.
00048 template <class T>
00049 class threadsafe_queue {
00050 private:
00051         // private typedefs ----------------------------------------------------
00052         typedef typename std::deque<T>::iterator        iterator;
00053         typedef threadsafe_queue<T>                     queue_t;
00054         struct real_iter_t {
00055                 iterator                iter;
00056                 dword_t                 rvn;
00057         };
00058 
00059 public:
00060         // public typedefs -----------------------------------------------------
00061         struct iterator_t {
00062         protected:
00063                 byte_t          opaque[sizeof(real_iter_t)];
00064         };
00065 
00066         /// push an element onto the back of the queue
00067         void push_back(IN T& t) {
00068                         mlock l(m_mutex);
00069                         ++m_rvn;        // queue has changed!  increment version
00070                         m_queue.push_back(t);
00071                 }
00072 
00073         /// pop an element from the front of the queue.
00074         /// Returns false if the queue is empty
00075         bool pop_front(OUT T& t) {
00076                         bool retval = false;
00077                         mlock l(m_mutex);
00078                         if (m_queue.size()) {
00079                                 ++m_rvn;        // queue has changed!
00080                                 retval = true;
00081                                 t = m_queue.front();
00082                                 m_queue.pop_front();
00083                         }
00084                         return retval;
00085                 }
00086 
00087         /// returns the number of elements in the queue
00088         int size(void) const throw() {
00089                         queue_t * pThis = this->getNonConst();
00090                         mlock l(pThis->m_mutex);
00091                         return pThis->m_queue.size();
00092                 }
00093 
00094         /// resets the given iterator to point to the beginning of the queue
00095         void getIterator(OUT iterator_t& i) const throw() {
00096                         queue_t * pThis = this->getNonConst();
00097                         real_iter_t * ri = getRealIter(i);
00098                         mlock l(pThis->m_mutex);
00099                         ri->rvn = m_rvn;
00100                         ri->iter = pThis->m_queue.begin();
00101                 }
00102 
00103         /// gets element pointed to by iterator, and increments iterator
00104         bool getNextElement(IO iterator_t& i, OUT T& t) const {
00105                         queue_t * pThis = this->getNonConst();
00106                         real_iter_t * ri = getRealIter(i);
00107                         mlock l(pThis->m_mutex);
00108                         if (ri->rvn != m_rvn) {
00109                                 return false;   // iterator has been invalidated
00110                         }
00111                         if (pThis->m_queue.end() == ri->iter) {
00112                                 return false;   // already at end of queue
00113                         }
00114 
00115                         // iterator is valid!  get element and increment
00116                         t = *ri->iter;
00117                         ++ri->iter;
00118                         return true;
00119                 }
00120 
00121         /// for debugging/validation only
00122         static int getRealIteratorSize(void) throw() {
00123                         return sizeof(real_iter_t);
00124                 }
00125 
00126 private:
00127         // private helper methods ----------------------------------------------
00128         static real_iter_t * getRealIter(IN iterator_t& i) throw() {
00129                         return (real_iter_t *) &i;
00130                 }
00131 
00132         // really ugly method to retrieve a non-const pointer.  This is because
00133         //      read-only APIs need to take the mutex, even though they
00134         //      otherwise won't modify the collection
00135         queue_t * getNonConst(void) const throw() {
00136                         return (queue_t *) this;
00137                 }
00138 
00139         // private member data -------------------------------------------------
00140         smart_mutex             m_mutex;
00141         std::deque<T>           m_queue;
00142         dword_t                 m_rvn;          // record version number
00143 };
00144 
00145 
00146 #endif  // WAVEPACKET_THREADSAFE_QUEUE_H__
00147