Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040 #include "threadpool.h"
00041
00042 #include "common/wave_ex.h"
00043 #include "datahash/datahash_util.h"
00044 #include "perf/perf.h"
00045 #include "threadsafe/smart_mutex.h"
00046
00047
00048 namespace threadpool {
00049
00050
00051
00052 Pool::~Pool(void) throw() { }
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068 class TPool : public Pool {
00069 public:
00070 TPool(void) throw();
00071 ~TPool(void) throw() { }
00072
00073
00074 void initialize(IN const Datahash * params);
00075
00076
00077 bool addRequest(IN thread_work_fn work_fn,
00078 IN void * context);
00079
00080 private:
00081
00082 struct request_t {
00083 request_t(void) { bzero(this, sizeof(request_t)); }
00084
00085 thread_work_fn fn;
00086 void * context;
00087 TPool * pThis;
00088 };
00089
00090
00091 void updateThreadCount(IN int increment) throw();
00092 static void * poolThreadStart(IN void * ctx);
00093
00094
00095 smart_mutex m_mutex;
00096 int m_maxThreadCount;
00097 int m_threadCount;
00098 };
00099
00100
00101
00102 TPool::TPool
00103 (
00104 void
00105 )
00106 throw()
00107 {
00108 m_maxThreadCount = 0;
00109 m_threadCount = 0;
00110 }
00111
00112
00113
00114 void
00115 TPool::initialize
00116 (
00117 IN const Datahash * params
00118 )
00119 {
00120 ASSERT(params, "null");
00121
00122 m_maxThreadCount = getInt(params, "maxThreads");
00123 ASSERT(m_maxThreadCount > 0, "Bad max thread count: %d",
00124 m_maxThreadCount);
00125 }
00126
00127
00128
00129 bool
00130 TPool::addRequest
00131 (
00132 IN thread_work_fn fn,
00133 IN void * context
00134 )
00135 {
00136 perf::Timer timer("threadpool::addRequest");
00137 ASSERT(fn, "null");
00138
00139
00140
00141
00142
00143 smart_ptr<request_t> req = new request_t;
00144 ASSERT(req, "out of memory");
00145
00146 req->fn = fn;
00147 req->context = context;
00148 req->pThis = this;
00149
00150 pthread_t thread_id_unused;
00151 pthread_create(&thread_id_unused, NULL, poolThreadStart, req);
00152 req.disown();
00153
00154
00155 return true;
00156 }
00157
00158
00159
00160 void *
00161 TPool::poolThreadStart
00162 (
00163 IN void * ctx
00164 )
00165 {
00166 request_t * preq = (request_t *) ctx;
00167 ASSERT(preq, "null thread context?");
00168 smart_ptr<request_t> req = preq;
00169 ASSERT(req, "failed to assign smart pointer?");
00170 ASSERT(req->pThis, "null");
00171 ASSERT(req->fn, "null thread work function");
00172
00173
00174
00175 req->pThis->updateThreadCount(+1);
00176
00177
00178 try {
00179 req->fn(req->context);
00180 } catch (std::exception& e) {
00181 DPRINTF("Exception in thread pool while handling request: %s",
00182 e.what());
00183 } catch (...) {
00184 DPRINTF("Unknown exception handling thread pool request!");
00185 }
00186
00187
00188 req->pThis->updateThreadCount(-1);
00189
00190
00191 return NULL;
00192 }
00193
00194
00195
00196 void
00197 TPool::updateThreadCount
00198 (
00199 IN int increment
00200 )
00201 throw()
00202 {
00203 int count = 0;
00204 {
00205 mlock lock(m_mutex);
00206 m_threadCount += increment;
00207 count = m_threadCount;
00208 }
00209
00210 if (count < 0) {
00211 DPRINTF("ERROR! Negative thread count? %d", count);
00212 }
00213 if (count > m_maxThreadCount) {
00214 DPRINTF("WARNING! Thread count is higher than requested size");
00215 DPRINTF(" Requested %d threads, have %d currently",
00216 m_maxThreadCount, count);
00217 }
00218 }
00219
00220
00221
00222
00223
00224
00225
00226
00227
00228 smart_ptr<Pool>
00229 Pool::create
00230 (
00231 IN const Datahash * params
00232 )
00233 {
00234 ASSERT(params, "null");
00235
00236 smart_ptr<TPool> local = new TPool;
00237 ASSERT(local, "out of memory");
00238
00239 local->initialize(params);
00240
00241 return local;
00242 }
00243
00244
00245
00246 };
00247