123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257 |
- #ifndef ECTHREADMANAGER_H
- #define ECTHREADMANAGER_H
#include <kopano/zcdefs.h>
#include <condition_variable>
#include <ctime>
#include <list>
#include <map>
#include <mutex>
#include <queue>
#include <set>
#include <pthread.h>
#include <kopano/ECConfig.h>
#include <kopano/kcodes.h>
#include "SOAPUtils.h"
#include "soapH.h"
/**
 * Element type of the dispatcher's work queues: a soap context plus the
 * timestamp recorded for it.
 */
struct WORKITEM {
    /// soap context this item refers to (non-owning as far as this header shows).
    struct soap *soap;
    /// Receive timestamp. NOTE(review): clock and unit are not visible here — confirm.
    double dblReceiveStamp;
};
- struct ACTIVESOCKET _kc_final {
- struct soap *soap;
- time_t ulLastActivity;
-
- bool operator < (const ACTIVESOCKET &a) const { return a.soap->socket < this->soap->socket; };
- };
- class FindSocket _kc_final {
- public:
- FindSocket(SOAP_SOCKET s) { this->s = s; };
- bool operator()(const ACTIVESOCKET &a) const { return a.soap->socket == s; }
- private:
- SOAP_SOCKET s;
- };
- class FindListenSocket _kc_final {
- public:
- FindListenSocket(SOAP_SOCKET s) { this->s = s; };
- bool operator()(struct soap *soap) const { return soap->socket == s; }
- private:
- SOAP_SOCKET s;
- };
- class ECThreadManager;
- class ECDispatcher;
- class ECWorkerThread {
- public:
- ECWorkerThread(ECThreadManager *, ECDispatcher *, bool nostart = false);
- protected:
-
- virtual ~ECWorkerThread(void) _kc_impdtor;
- static void *Work(void *param);
- pthread_t m_thread;
- ECThreadManager *m_lpManager;
- ECDispatcher *m_lpDispatcher;
- };
- class _kc_export_dycast ECPriorityWorkerThread _kc_final :
- public ECWorkerThread {
- public:
- _kc_hidden ECPriorityWorkerThread(ECThreadManager *, ECDispatcher *);
-
- _kc_hidden ~ECPriorityWorkerThread(void);
- };
- class ECThreadManager _kc_final {
- public:
-
- ECThreadManager(ECDispatcher *, unsigned int threads);
- ~ECThreadManager();
-
-
-
- ECRESULT ForceAddThread(int nThreads);
-
-
- ECRESULT GetThreadCount(unsigned int *lpulThreads);
-
-
- ECRESULT SetThreadCount(unsigned int ulThreads);
-
-
- ECRESULT NotifyIdle(ECWorkerThread *, bool *lpfStop);
-
- private:
- std::mutex m_mutexThreads;
- std::list<ECWorkerThread *> m_lstThreads;
- ECPriorityWorkerThread * m_lpPrioWorker;
- ECDispatcher * m_lpDispatcher;
- unsigned int m_ulThreads;
- };
- class ECWatchDog _kc_final {
- public:
- ECWatchDog(ECConfig *, ECDispatcher *, ECThreadManager *);
- ~ECWatchDog();
- private:
-
- static void *Watch(void *);
-
- ECConfig * m_lpConfig;
- ECDispatcher * m_lpDispatcher;
- ECThreadManager* m_lpThreadManager;
- pthread_t m_thread;
- bool m_bExit = false;
- std::mutex m_mutexExit;
- std::condition_variable m_condExit;
- };
- typedef SOAP_SOCKET (*CREATEPIPESOCKETCALLBACK)(void *lpParam);
- class ECDispatcher {
- public:
- ECDispatcher(ECConfig *, CREATEPIPESOCKETCALLBACK, void *cbparam);
- virtual ~ECDispatcher(void) _kc_impdtor;
-
-
- ECRESULT GetIdle(unsigned int *lpulIdle);
- ECRESULT GetThreadCount(unsigned int *lpulThreads, unsigned int *lpulIdleThreads);
- ECRESULT GetFrontItemAge(double *lpdblAge);
- ECRESULT GetQueueLength(unsigned int *lpulQueueLength);
- ECRESULT SetThreadCount(unsigned int ulThreads);
-
-
- ECRESULT AddListenSocket(struct soap *soap);
-
- ECRESULT QueueItem(struct soap *soap);
-
-
- ECRESULT GetNextWorkItem(WORKITEM **item, bool bWait, bool bPrio);
-
- ECRESULT DoHUP();
-
- virtual ECRESULT ShutDown();
-
-
-
- ECRESULT NotifyDone(struct soap *soap);
- virtual ECRESULT NotifyRestart(SOAP_SOCKET s) = 0;
-
-
- virtual ECRESULT MainLoop() = 0;
-
- protected:
- ECConfig * m_lpConfig;
- ECThreadManager *m_lpThreadManager = nullptr;
- std::mutex m_mutexItems;
- std::queue<WORKITEM *> m_queueItems;
- std::condition_variable m_condItems;
- std::queue<WORKITEM *> m_queuePrioItems;
- std::condition_variable m_condPrioItems;
- std::map<int, ACTIVESOCKET> m_setSockets;
- std::map<int, struct soap *> m_setListenSockets;
- std::mutex m_mutexSockets;
- bool m_bExit = false;
- std::mutex m_mutexIdle;
- unsigned int m_ulIdle = 0;
- CREATEPIPESOCKETCALLBACK m_lpCreatePipeSocketCallback;
- void * m_lpCreatePipeSocketParam;
-
- int m_nMaxKeepAlive;
- int m_nRecvTimeout;
- int m_nReadTimeout;
- int m_nSendTimeout;
- };
- class ECDispatcherSelect _kc_final : public ECDispatcher {
- private:
- int m_fdRescanRead;
- int m_fdRescanWrite;
- public:
- ECDispatcherSelect(ECConfig *, CREATEPIPESOCKETCALLBACK, void *cbparam);
- virtual ECRESULT MainLoop();
- virtual ECRESULT ShutDown();
- virtual ECRESULT NotifyRestart(SOAP_SOCKET s);
- };
- #ifdef HAVE_EPOLL_CREATE
- class ECDispatcherEPoll _kc_final : public ECDispatcher {
- private:
- int m_fdMax;
- int m_epFD;
- public:
- ECDispatcherEPoll(ECConfig *, CREATEPIPESOCKETCALLBACK, void *cbparam);
- virtual ~ECDispatcherEPoll();
- virtual ECRESULT MainLoop();
-
- virtual ECRESULT NotifyRestart(SOAP_SOCKET s);
- };
- #endif
- #endif
|