threadpool.cpp

Go to the documentation of this file.
00001 /*
00002  * threadpool.cpp
00003  *
00004  * Copyright (C) 2007  Thomas A. Vaughan
00005  * All rights reserved.
00006  *
00007  *
00008  * Redistribution and use in source and binary forms, with or without
00009  * modification, are permitted provided that the following conditions are met:
00010  *     * Redistributions of source code must retain the above copyright
00011  *       notice, this list of conditions and the following disclaimer.
00012  *     * Redistributions in binary form must reproduce the above copyright
00013  *       notice, this list of conditions and the following disclaimer in the
00014  *       documentation and/or other materials provided with the distribution.
00015  *     * Neither the name of the <organization> nor the
00016  *       names of its contributors may be used to endorse or promote products
00017  *       derived from this software without specific prior written permission.
00018  *
00019  * THIS SOFTWARE IS PROVIDED BY THOMAS A. VAUGHAN ''AS IS'' AND ANY
00020  * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
00021  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
00022  * DISCLAIMED. IN NO EVENT SHALL THOMAS A. VAUGHAN BE LIABLE FOR ANY
00023  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
00024  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00025  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
00026  * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00027  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00028  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00029  *
00030  *
00031  * Implementation of a basic threadpool object.
00032  *
00033  * NOTE: at the moment, the implementation is *VERY* basic!  There is no pool at
00034  * all, and instead threads are created for every incoming request.  But I
00035  * wanted to at least set up the abstraction, and this can later be filled in
00036  * with real threadpool behavior.
00037  */
00038 
00039 // includes --------------------------------------------------------------------
00040 #include "threadpool.h"         // always include our own header first!
00041 
00042 #include "common/wave_ex.h"
00043 #include "datahash/datahash_util.h"
00044 #include "perf/perf.h"
00045 #include "threadsafe/smart_mutex.h"
00046 
00047 
00048 namespace threadpool {
00049 
00050 
00051 // interface destructor implementation
00052 Pool::~Pool(void) throw() { }
00053 
00054 
00055 ////////////////////////////////////////////////////////////////////////////////
00056 //
00057 //      static helper methods
00058 //
00059 ////////////////////////////////////////////////////////////////////////////////
00060 
00061 
00062 ////////////////////////////////////////////////////////////////////////////////
00063 //
00064 //      TPool - object that implements the threadpool::Pool interface
00065 //
00066 ////////////////////////////////////////////////////////////////////////////////
00067 
00068 class TPool : public Pool {
00069 public:
00070         TPool(void) throw();
00071         ~TPool(void) throw() { }
00072 
00073         // public class methods ------------------------------------------------
00074         void initialize(IN const Datahash * params);
00075 
00076         // threadpool::Pool class interface methods ----------------------------
00077         bool addRequest(IN thread_work_fn work_fn,
00078                                 IN void * context);
00079 
00080 private:
00081         // private typedefs ----------------------------------------------------
00082         struct request_t {
00083                 request_t(void) { bzero(this, sizeof(request_t)); }
00084 
00085                 thread_work_fn          fn;
00086                 void *                  context;
00087                 TPool *                 pThis;
00088         };
00089 
00090         // private helper functions --------------------------------------------
00091         void updateThreadCount(IN int increment) throw();
00092         static void * poolThreadStart(IN void * ctx);
00093 
00094         // private data members ------------------------------------------------
00095         smart_mutex     m_mutex;
00096         int             m_maxThreadCount;
00097         int             m_threadCount;
00098 };
00099 
00100 
00101 
00102 TPool::TPool
00103 (
00104 void
00105 )
00106 throw()
00107 {
00108         m_maxThreadCount = 0;
00109         m_threadCount = 0;
00110 }
00111 
00112 
00113 
00114 void
00115 TPool::initialize
00116 (
00117 IN const Datahash * params
00118 )
00119 {
00120         ASSERT(params, "null");
00121 
00122         m_maxThreadCount = getInt(params, "maxThreads");
00123         ASSERT(m_maxThreadCount > 0, "Bad max thread count: %d",
00124             m_maxThreadCount);
00125 }
00126 
00127 
00128 
00129 bool
00130 TPool::addRequest
00131 (
00132 IN thread_work_fn fn,
00133 IN void * context
00134 )
00135 {
00136         perf::Timer timer("threadpool::addRequest");
00137         ASSERT(fn, "null");
00138         // ASSERT(context) -- we don't care!
00139 
00140         // NOTE: we aren't actually maintaining a thread pool!  But we at
00141         // least keep an eye on the thread count...
00142 
00143         smart_ptr<request_t> req = new request_t;
00144         ASSERT(req, "out of memory");
00145 
00146         req->fn = fn;
00147         req->context = context;
00148         req->pThis = this;
00149 
00150         pthread_t thread_id_unused;
00151         pthread_create(&thread_id_unused, NULL, poolThreadStart, req);
00152         req.disown();   // thread owns this now!
00153 
00154         // worked just fine
00155         return true;
00156 }
00157 
00158 
00159 
00160 void *
00161 TPool::poolThreadStart
00162 (
00163 IN void * ctx
00164 )
00165 {
00166         request_t * preq = (request_t *) ctx;
00167         ASSERT(preq, "null thread context?");
00168         smart_ptr<request_t> req = preq;
00169         ASSERT(req, "failed to assign smart pointer?");
00170         ASSERT(req->pThis, "null");
00171         ASSERT(req->fn, "null thread work function");
00172         // ASSERT(req->context) - we don't care
00173 
00174         // increment thread count
00175         req->pThis->updateThreadCount(+1);
00176 
00177         // invoke provided work function and provide context
00178         try {
00179                 req->fn(req->context);
00180         } catch (std::exception& e) {
00181                 DPRINTF("Exception in thread pool while handling request: %s",
00182                     e.what());
00183         } catch (...) {
00184                 DPRINTF("Unknown exception handling thread pool request!");
00185         }
00186 
00187         // decrement thread count
00188         req->pThis->updateThreadCount(-1);
00189 
00190         // all done
00191         return NULL;
00192 }
00193 
00194 
00195 
00196 void
00197 TPool::updateThreadCount
00198 (
00199 IN int increment
00200 )
00201 throw()
00202 {
00203         int count = 0;
00204         {
00205                 mlock lock(m_mutex);
00206                 m_threadCount += increment;
00207                 count = m_threadCount;
00208         }
00209 
00210         if (count < 0) {
00211                 DPRINTF("ERROR!  Negative thread count? %d", count);
00212         }
00213         if (count > m_maxThreadCount) {
00214                 DPRINTF("WARNING!  Thread count is higher than requested size");
00215                 DPRINTF("  Requested %d threads, have %d currently",
00216                     m_maxThreadCount, count);
00217         }
00218 }
00219 
00220 
00221 
00222 ////////////////////////////////////////////////////////////////////////////////
00223 //
00224 //      public API
00225 //
00226 ////////////////////////////////////////////////////////////////////////////////
00227 
00228 smart_ptr<Pool>
00229 Pool::create
00230 (
00231 IN const Datahash * params
00232 )
00233 {
00234         ASSERT(params, "null");
00235 
00236         smart_ptr<TPool> local = new TPool;
00237         ASSERT(local, "out of memory");
00238 
00239         local->initialize(params);
00240 
00241         return local;
00242 }
00243 
00244 
00245 
00246 };      // threadpool namespace
00247