ECThreadPool.h 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. /*
  2. * Copyright 2005 - 2016 Zarafa and its licensors
  3. *
  4. * This program is free software: you can redistribute it and/or modify
  5. * it under the terms of the GNU Affero General Public License, version 3,
  6. * as published by the Free Software Foundation.
  7. *
  8. * This program is distributed in the hope that it will be useful,
  9. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. * GNU Affero General Public License for more details.
  12. *
  13. * You should have received a copy of the GNU Affero General Public License
  14. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  15. *
  16. */
  17. #ifndef ECThreadPool_INCLUDED
  18. #define ECThreadPool_INCLUDED
  19. #include <condition_variable>
  20. #include <mutex>
  21. #include <pthread.h>
  22. #include <set>
  23. #include <list>
  24. #include <kopano/zcdefs.h>
  25. namespace KC {
  26. class ECTask;
  27. /**
  28. * This class represents a thread pool with a fixed amount of worker threads.
  29. * The amount of workers can be modified at run time, but is not automatically
  30. * adjusted based on the task queue length or age.
  31. */
  32. class _kc_export ECThreadPool _kc_final {
  33. private: // types
  34. struct STaskInfo {
  35. ECTask *lpTask;
  36. bool bDelete;
  37. struct timeval tvQueueTime;
  38. };
  39. typedef std::set<pthread_t> ThreadSet;
  40. typedef std::list<STaskInfo> TaskList;
  41. public:
  42. ECThreadPool(unsigned ulThreadCount);
  43. virtual ~ECThreadPool(void);
  44. virtual bool dispatch(ECTask *lpTask, bool bTakeOwnership = false);
  45. _kc_hidden unsigned int threadCount(void) const;
  46. _kc_hidden void setThreadCount(unsigned int cuont, bool wait = false);
  47. private: // methods
  48. _kc_hidden virtual bool getNextTask(STaskInfo *, std::unique_lock<std::mutex> &);
  49. _kc_hidden void joinTerminated(std::unique_lock<std::mutex> &);
  50. _kc_hidden static void *threadFunc(void *);
  51. _kc_hidden static bool isCurrentThread(const pthread_t &);
  52. ThreadSet m_setThreads;
  53. ThreadSet m_setTerminated;
  54. TaskList m_listTasks;
  55. mutable std::mutex m_hMutex;
  56. std::condition_variable m_hCondition;
  57. std::condition_variable m_hCondTerminated;
  58. mutable std::condition_variable m_hCondTaskDone;
  59. ECThreadPool(const ECThreadPool &) = delete;
  60. ECThreadPool &operator=(const ECThreadPool &) = delete;
  61. unsigned int m_ulTermReq = 0;
  62. };
  63. /**
  64. * Get the number of worker threads.
  65. * @retval The number of available worker threads.
  66. */
  67. inline unsigned ECThreadPool::threadCount() const {
  68. return m_setThreads.size() - m_ulTermReq;
  69. }
  70. /**
  71. * This class represents a task that can be dispatched on an ECThreadPool or
  72. * derived object.
  73. * Once dispatched, the objects run method will be executed once the threadpool
  74. * has a free worker and all previously queued tasks have been processed. There's
  75. * no way of knowing when the task is done.
  76. */
  77. class _kc_export ECTask {
  78. public:
  79. _kc_hidden virtual ~ECTask(void) _kc_impdtor;
  80. _kc_hidden virtual void execute(void);
  81. _kc_hidden bool dispatchOn(ECThreadPool *, bool transfer_ownership = false);
  82. protected:
  83. _kc_hidden virtual void run(void) = 0;
  84. _kc_hidden ECTask(void) {};
  85. private:
  86. // Make the object non-copyable
  87. ECTask(const ECTask &) = delete;
  88. ECTask &operator=(const ECTask &) = delete;
  89. };
  90. /**
  91. * Dispatch a task object on a particular threadpool.
  92. *
  93. * @param[in] lpThreadPool The threadpool on which to dispatch the task.
  94. * @param[in] bTransferOwnership Boolean parameter specifying wether the threadpool
  95. * should take ownership of the task object, and thus
  96. * is responsible for deleting the object when done.
  97. * @retval true if the task was successfully queued, false otherwise.
  98. */
  99. inline bool ECTask::dispatchOn(ECThreadPool *lpThreadPool, bool bTransferOwnership) {
  100. return lpThreadPool ? lpThreadPool->dispatch(this, bTransferOwnership) : false;
  101. }
  102. /**
  103. * This class represents a task that can be executed on an ECThreadPool or
  104. * derived object. It's similar to an ECTask, but one can wait for the task
  105. * to be finished.
  106. */
  107. class _kc_export ECWaitableTask : public ECTask {
  108. public:
  109. static const unsigned WAIT_INFINITE = (unsigned)-1;
  110. enum State {
  111. Idle = 1,
  112. Running = 2,
  113. Done = 4
  114. };
  115. virtual ~ECWaitableTask();
  116. virtual void execute(void) _kc_override;
  117. _kc_hidden bool done(void) const;
  118. bool wait(unsigned timeout = WAIT_INFINITE, unsigned waitMask = Done) const;
  119. protected:
  120. ECWaitableTask();
  121. private:
  122. mutable std::mutex m_hMutex;
  123. mutable std::condition_variable m_hCondition;
  124. State m_state;
  125. };
  126. /**
  127. * Check if the task has been executed.
  128. * @retval true when executed, false otherwise.
  129. */
  130. inline bool ECWaitableTask::done() const {
  131. return m_state == Done;
  132. }
  133. /**
  134. * This class can be used to run a function with one argument asynchronously on
  135. * an ECThreadPool or derived class.
  136. * To call a function with more than one argument boost::bind can be used.
  137. */
  138. template<typename _Rt, typename _Fn, typename _At>
  139. class ECDeferredFunc _kc_final : public ECWaitableTask {
  140. public:
  141. /**
  142. * Construct an ECDeferredFunc instance.
  143. * @param[in] fn The function to execute
  144. * @param[in] arg The argument to pass to fn.
  145. */
  146. ECDeferredFunc(_Fn fn, const _At &arg) : m_fn(fn), m_arg(arg)
  147. { }
  148. virtual void run(void) _kc_override
  149. {
  150. m_result = m_fn(m_arg);
  151. }
  152. /**
  153. * Get the result of the asynchronous function. This method will
  154. * block until the method has been executed.
  155. */
  156. _Rt result() const {
  157. wait();
  158. return m_result;
  159. }
  160. private:
  161. _Rt m_result = 0;
  162. _Fn m_fn;
  163. _At m_arg;
  164. };
  165. } /* namespace */
  166. #endif // ndef ECThreadPool_INCLUDED