ECThreadPool.cpp 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. /*
  2. * Copyright 2005 - 2016 Zarafa and its licensors
  3. *
  4. * This program is free software: you can redistribute it and/or modify
  5. * it under the terms of the GNU Affero General Public License, version 3,
  6. * as published by the Free Software Foundation.
  7. *
  8. * This program is distributed in the hope that it will be useful,
  9. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. * GNU Affero General Public License for more details.
  12. *
  13. * You should have received a copy of the GNU Affero General Public License
  14. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  15. *
  16. */
  17. #include <chrono>
  18. #include <condition_variable>
  19. #include <utility>
  20. #include <kopano/platform.h>
  21. #include <kopano/ECThreadPool.h>
  22. #include <kopano/lockhelper.hpp>
  23. #include <algorithm>
  24. #include <sys/time.h> /* gettimeofday */
  25. namespace KC {
  26. /**
  27. * Construct an ECThreadPool instance.
  28. * @param[in] ulThreadCount The amount of worker hreads to create.
  29. */
  30. ECThreadPool::ECThreadPool(unsigned ulThreadCount)
  31. {
  32. setThreadCount(ulThreadCount);
  33. }
  34. /**
  35. * Destruct an ECThreadPool instance. This blocks until all worker
  36. * threads have exited.
  37. */
  38. ECThreadPool::~ECThreadPool()
  39. {
  40. setThreadCount(0, true);
  41. }
  42. /**
  43. * Dispatch a task object on the threadpool instance.
  44. * @param[in] lpTask The task object to dispatch.
  45. * @param[in] bTakeOwnership Boolean parameter specifying wether the threadpool
  46. * should take ownership of the task object, and thus
  47. * is responsible for deleting the object when done.
  48. * @returns true if the task was successfully queued, false otherwise.
  49. */
  50. bool ECThreadPool::dispatch(ECTask *lpTask, bool bTakeOwnership)
  51. {
  52. STaskInfo sTaskInfo = {lpTask, bTakeOwnership, {0, 0}};
  53. gettimeofday(&sTaskInfo.tvQueueTime, NULL);
  54. ulock_normal locker(m_hMutex);
  55. m_listTasks.push_back(std::move(sTaskInfo));
  56. m_hCondition.notify_one();
  57. joinTerminated(locker);
  58. return true;
  59. }
  60. /**
  61. * Set the amount of worker threads for the threadpool.
  62. * @param[in] ulThreadCount The amount of required worker threads.
  63. * @param[in] bWait If the requested amount of worker threads is less
  64. * than the current amount, this method will wait until
  65. * the extra threads have exited if this argument is true.
  66. */
  67. void ECThreadPool::setThreadCount(unsigned ulThreadCount, bool bWait)
  68. {
  69. ulock_normal locker(m_hMutex);
  70. if (ulThreadCount == threadCount() - 1) {
  71. ++m_ulTermReq;
  72. m_hCondition.notify_one();
  73. }
  74. else if (ulThreadCount < threadCount()) {
  75. m_ulTermReq += (threadCount() - ulThreadCount);
  76. m_hCondition.notify_all();
  77. }
  78. else {
  79. unsigned ulThreadsToAdd = ulThreadCount - threadCount();
  80. if (ulThreadsToAdd <= m_ulTermReq)
  81. m_ulTermReq -= ulThreadsToAdd;
  82. else {
  83. ulThreadsToAdd -= m_ulTermReq;
  84. m_ulTermReq = 0;
  85. for (unsigned i = 0; i < ulThreadsToAdd; ++i) {
  86. pthread_t hThread;
  87. pthread_create(&hThread, NULL, &threadFunc, this);
  88. set_thread_name(hThread, "ECThreadPool");
  89. m_setThreads.insert(hThread);
  90. }
  91. }
  92. }
  93. while (bWait && m_setThreads.size() > ulThreadCount) {
  94. m_hCondTerminated.wait(locker);
  95. joinTerminated(locker);
  96. }
  97. assert(threadCount() == ulThreadCount);
  98. joinTerminated(locker);
  99. }
  100. /**
  101. * Get the next task from the queue (or terminate thread).
  102. * This method normally pops the next task object from the queue. However when
  103. * the number of worker threads needs to be decreased this method will remove the
  104. * calling thread from the set of available worker threads and return false to
  105. * inform the caller that it should exit.
  106. *
  107. * @param[out] lpsTaskInfo A STaskInfo struct containing the task to be executed.
  108. * @retval true The next task was successfully obtained.
  109. * @retval false The thread was requested to exit.
  110. */
  111. bool ECThreadPool::getNextTask(STaskInfo *lpsTaskInfo, ulock_normal &locker)
  112. {
  113. assert(locker.owns_lock());
  114. assert(lpsTaskInfo != NULL);
  115. bool bTerminate = false;
  116. while ((bTerminate = (m_ulTermReq > 0)) == false && m_listTasks.empty())
  117. m_hCondition.wait(locker);
  118. if (bTerminate) {
  119. auto iThread = std::find_if(m_setThreads.cbegin(), m_setThreads.cend(), &isCurrentThread);
  120. assert(iThread != m_setThreads.cend());
  121. m_setTerminated.insert(*iThread);
  122. m_setThreads.erase(iThread);
  123. --m_ulTermReq;
  124. m_hCondTerminated.notify_one();
  125. return false;
  126. }
  127. *lpsTaskInfo = m_listTasks.front();
  128. m_listTasks.pop_front();
  129. return true;
  130. }
  131. /**
  132. * Call pthread_join on all terminated threads for cleanup.
  133. */
  134. void ECThreadPool::joinTerminated(ulock_normal &locker)
  135. {
  136. assert(locker.owns_lock());
  137. for (auto thr : m_setTerminated)
  138. pthread_join(thr, NULL);
  139. m_setTerminated.clear();
  140. }
  141. /**
  142. * Check if the calling thread equals the passed thread handle.
  143. * @param[in] hThread The thread handle to compare with.
  144. * @retval true when matched, false otherwise.
  145. */
  146. inline bool ECThreadPool::isCurrentThread(const pthread_t &hThread)
  147. {
  148. return pthread_equal(hThread, pthread_self()) != 0;
  149. }
  150. /**
  151. * The main loop of the worker threads.
  152. * @param[in] lpVoid Pointer to the owning ECThreadPool object cast to a void pointer.
  153. * @returns NULL
  154. */
  155. void* ECThreadPool::threadFunc(void *lpVoid)
  156. {
  157. auto lpPool = static_cast<ECThreadPool *>(lpVoid);
  158. while (true) {
  159. STaskInfo sTaskInfo = {NULL, false};
  160. bool bResult = false;
  161. ulock_normal locker(lpPool->m_hMutex);
  162. bResult = lpPool->getNextTask(&sTaskInfo, locker);
  163. locker.unlock();
  164. if (!bResult)
  165. break;
  166. assert(sTaskInfo.lpTask != NULL);
  167. sTaskInfo.lpTask->execute();
  168. if (sTaskInfo.bDelete)
  169. delete sTaskInfo.lpTask;
  170. lpPool->m_hCondTaskDone.notify_one();
  171. }
  172. return NULL;
  173. }
  174. /**
  175. * Execute an ECTask instance, just calls the run() method of the derived class.
  176. */
  177. void ECTask::execute()
  178. {
  179. run();
  180. }
  181. /**
  182. * Construct an ECWaitableTask object.
  183. */
  184. ECWaitableTask::ECWaitableTask()
  185. : m_state(Idle)
  186. {
  187. }
  188. /**
  189. * Destruct an ECWaitableTask object.
  190. */
  191. ECWaitableTask::~ECWaitableTask()
  192. {
  193. wait(WAIT_INFINITE, Idle|Done);
  194. }
  195. /**
  196. * Execute an ECWaitableTask object.
  197. * This calls ECTask::execute and makes sure any blocking threads will be notified when done.
  198. */
  199. void ECWaitableTask::execute()
  200. {
  201. ulock_normal big(m_hMutex);
  202. m_state = Running;
  203. m_hCondition.notify_all();
  204. big.unlock();
  205. ECTask::execute();
  206. big.lock();
  207. m_state = Done;
  208. m_hCondition.notify_all();
  209. big.unlock();
  210. }
  211. /**
  212. * Wait for an ECWaitableTask instance to finish.
  213. * @param[in] timeout Timeout in ms to wait for the task to finish. Pass 0 don't block at all or WAIT_INFINITE to block indefinitely.
  214. * @param[in] waitMask Mask of the combined states for which this function will wait. Default is Done, causing this function to wait
  215. * until the task is executed. The destructor for instance used Idle|Done, causing this function to only wait when
  216. * the task is currently running.
  217. * @retval true if the task state matches any state in waitMask, false otherwise.
  218. */
  219. bool ECWaitableTask::wait(unsigned timeout, unsigned waitMask) const
  220. {
  221. bool bResult = false;
  222. ulock_normal locker(m_hMutex);
  223. switch (timeout) {
  224. case 0:
  225. bResult = ((m_state & waitMask) != 0);
  226. break;
  227. case WAIT_INFINITE:
  228. m_hCondition.wait(locker, [&](void) { return m_state & waitMask; });
  229. bResult = true;
  230. break;
  231. default:
  232. while (!(m_state & waitMask))
  233. if (m_hCondition.wait_for(locker, std::chrono::milliseconds(timeout)) ==
  234. std::cv_status::timeout)
  235. break;
  236. bResult = ((m_state & waitMask) != 0);
  237. break;
  238. }
  239. return bResult;
  240. }
  241. } /* namespace */