/*
 Thread Pool implementation for unix / linux environments
 Copyright (C) 2008 Shobhit Gupta

 This program is free software: you can redistribute it and/or modify
 it under the terms of the GNU General Public License as published by
 the Free Software Foundation, either version 3 of the License, or
@@ -33,38 +33,38 @@ ThreadPool::ThreadPool()
3333
3434ThreadPool::ThreadPool (int maxThreads)
3535{
36- if (maxThreads < 1 ) maxThreads=1 ;
37-
38- // mutexSync = PTHREAD_MUTEX_INITIALIZER;
39- // mutexWorkCompletion = PTHREAD_MUTEX_INITIALIZER;
40-
41- pthread_mutex_lock (&mutexSync);
42- this ->maxThreads = maxThreads;
43- this ->queueSize = maxThreads;
44- // workerQueue = new WorkerThread *[maxThreads];
45- workerQueue.resize (maxThreads, NULL );
46- topIndex = 0 ;
47- bottomIndex = 0 ;
48- incompleteWork = 0 ;
49- sem_init (&availableWork, 0 , 0 );
50- sem_init (&availableThreads, 0 , queueSize);
51- pthread_mutex_unlock (&mutexSync);
36+ if (maxThreads < 1 ) maxThreads=1 ;
37+
38+ // mutexSync = PTHREAD_MUTEX_INITIALIZER;
39+ // mutexWorkCompletion = PTHREAD_MUTEX_INITIALIZER;
40+
41+ pthread_mutex_lock (&mutexSync);
42+ this ->maxThreads = maxThreads;
43+ this ->queueSize = maxThreads;
44+ // workerQueue = new WorkerThread *[maxThreads];
45+ workerQueue.resize (maxThreads, NULL );
46+ topIndex = 0 ;
47+ bottomIndex = 0 ;
48+ incompleteWork = 0 ;
49+ sem_init (&availableWork, 0 , 0 );
50+ sem_init (&availableThreads, 0 , queueSize);
51+ pthread_mutex_unlock (&mutexSync);
5252}
5353
5454void ThreadPool::initializeThreads ()
5555{
56- for (int i = 0 ; i<maxThreads; ++i)
56+ for (int i = 0 ; i<maxThreads; ++i)
5757 {
5858 pthread_t tempThread;
59- pthread_create (&tempThread, NULL , &ThreadPool::threadExecute, (void *) this );
60- // threadIdVec[i] = tempThread;
61- }
59+ pthread_create (&tempThread, NULL , &ThreadPool::threadExecute, (void *) this );
60+ // threadIdVec[i] = tempThread;
61+ }
6262
6363}
6464
6565ThreadPool::~ThreadPool ()
6666{
67- workerQueue.clear ();
67+ workerQueue.clear ();
6868}
6969
7070
@@ -73,34 +73,34 @@ void ThreadPool::destroyPool(int maxPollSecs = 2)
7373{
7474 while ( incompleteWork>0 )
7575 {
76- // cout << "Work is still incomplete=" << incompleteWork << endl;
76+ // cout << "Work is still incomplete=" << incompleteWork << endl;
7777 sleep (maxPollSecs);
7878 }
7979 cout << " All Done!! Wow! That was a lot of work!" << endl;
8080 sem_destroy (&availableWork);
8181 sem_destroy (&availableThreads);
82- pthread_mutex_destroy (&mutexSync);
83- pthread_mutex_destroy (&mutexWorkCompletion);
82+ pthread_mutex_destroy (&mutexSync);
83+ pthread_mutex_destroy (&mutexWorkCompletion);
8484
8585}
8686
8787
8888bool ThreadPool::assignWork (WorkerThread *workerThread)
8989{
90- pthread_mutex_lock (&mutexWorkCompletion);
91- incompleteWork++;
92- // cout << "assignWork...incomapleteWork=" << incompleteWork << endl;
90+ pthread_mutex_lock (&mutexWorkCompletion);
91+ incompleteWork++;
92+ // cout << "assignWork...incomapleteWork=" << incompleteWork << endl;
9393 pthread_mutex_unlock (&mutexWorkCompletion);
94-
94+
9595 sem_wait (&availableThreads);
96-
96+
9797 pthread_mutex_lock (&mutexSync);
98- // workerVec[topIndex] = workerThread;
99- workerQueue[topIndex] = workerThread;
100- // cout << "Assigning Worker[" << workerThread->id << "] Address:[" << workerThread << "] to Queue index [" << topIndex << "]" << endl;
101- if (queueSize !=1 )
102- topIndex = (topIndex+1 ) % (queueSize-1 );
103- sem_post (&availableWork);
98+ // workerVec[topIndex] = workerThread;
99+ workerQueue[topIndex] = workerThread;
100+ // cout << "Assigning Worker[" << workerThread->id << "] Address:[" << workerThread << "] to Queue index [" << topIndex << "]" << endl;
101+ if (queueSize !=1 )
102+ topIndex = (topIndex+1 ) % (queueSize-1 );
103+ sem_post (&availableWork);
104104 pthread_mutex_unlock (&mutexSync);
105105 return true ;
106106}
@@ -110,33 +110,33 @@ bool ThreadPool::fetchWork(WorkerThread **workerArg)
110110 sem_wait (&availableWork);
111111
112112 pthread_mutex_lock (&mutexSync);
113- WorkerThread * workerThread = workerQueue[bottomIndex];
114- workerQueue[bottomIndex] = NULL ;
115- *workerArg = workerThread;
116- if (queueSize !=1 )
117- bottomIndex = (bottomIndex+1 ) % (queueSize-1 );
118- sem_post (&availableThreads);
113+ WorkerThread * workerThread = workerQueue[bottomIndex];
114+ workerQueue[bottomIndex] = NULL ;
115+ *workerArg = workerThread;
116+ if (queueSize !=1 )
117+ bottomIndex = (bottomIndex+1 ) % (queueSize-1 );
118+ sem_post (&availableThreads);
119119 pthread_mutex_unlock (&mutexSync);
120- return true ;
120+ return true ;
121121}
122122
123123void *ThreadPool::threadExecute (void *param)
124124{
125125 WorkerThread *worker = NULL ;
126-
126+
127127 while (((ThreadPool *)param)->fetchWork (&worker))
128128 {
129129 if (worker)
130- {
130+ {
131131 worker->executeThis ();
132- // cout << "worker[" << worker->id << "]\tdelete address: [" << worker << "]" << endl;
133- delete worker;
134- worker = NULL ;
135- }
132+ // cout << "worker[" << worker->id << "]\tdelete address: [" << worker << "]" << endl;
133+ delete worker;
134+ worker = NULL ;
135+ }
136136
137137 pthread_mutex_lock ( &(((ThreadPool *)param)->mutexWorkCompletion ) );
138- // cout << "Thread " << pthread_self() << " has completed a Job !" << endl;
139- ((ThreadPool *)param)->incompleteWork --;
138+ // cout << "Thread " << pthread_self() << " has completed a Job !" << endl;
139+ ((ThreadPool *)param)->incompleteWork --;
140140 pthread_mutex_unlock ( &(((ThreadPool *)param)->mutexWorkCompletion ) );
141141 }
142142 return 0 ;
0 commit comments