123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218 |
- #include <chrono>
- #include <kopano/platform.h>
- #include <kopano/lockhelper.hpp>
- #include "ECFifoBuffer.h"
- namespace KC {
- ECFifoBuffer::ECFifoBuffer(size_type ulMaxSize)
- : m_ulMaxSize(ulMaxSize)
- {
- }
- ECRESULT ECFifoBuffer::Write(const void *lpBuf, size_type cbBuf, unsigned int ulTimeoutMs, size_type *lpcbWritten)
- {
- ECRESULT er = erSuccess;
- size_type cbWritten = 0;
- struct timespec deadline = {0};
- auto lpData = reinterpret_cast<const unsigned char *>(lpBuf);
- if (lpBuf == NULL)
- return KCERR_INVALID_PARAMETER;
- if (IsClosed(cfWrite))
- return KCERR_NETWORK_ERROR;
- if (cbBuf == 0) {
- if (lpcbWritten)
- *lpcbWritten = 0;
- return erSuccess;
- }
- if (ulTimeoutMs > 0)
- deadline = GetDeadline(ulTimeoutMs);
- ulock_normal locker(m_hMutex);
- while (cbWritten < cbBuf) {
- while (IsFull()) {
- if (IsClosed(cfRead)) {
- er = KCERR_NETWORK_ERROR;
- goto exit;
- }
- if (ulTimeoutMs > 0) {
- if (m_hCondNotFull.wait_for(locker,
- std::chrono::milliseconds(ulTimeoutMs)) ==
- std::cv_status::timeout) {
- er = KCERR_TIMEOUT;
- goto exit;
- }
- } else
- m_hCondNotFull.wait(locker);
- }
- const size_type cbNow = std::min(cbBuf - cbWritten, m_ulMaxSize - m_storage.size());
- try {
- m_storage.insert(m_storage.end(), lpData + cbWritten, lpData + cbWritten + cbNow);
- } catch (const std::bad_alloc &) {
- er = KCERR_NOT_ENOUGH_MEMORY;
- goto exit;
- }
- m_hCondNotEmpty.notify_one();
- cbWritten += cbNow;
- }
- exit:
- locker.unlock();
- if (lpcbWritten && (er == erSuccess || er == KCERR_TIMEOUT))
- *lpcbWritten = cbWritten;
- return er;
- }
- ECRESULT ECFifoBuffer::Read(void *lpBuf, size_type cbBuf, unsigned int ulTimeoutMs, size_type *lpcbRead)
- {
- ECRESULT er = erSuccess;
- size_type cbRead = 0;
- struct timespec deadline = {0};
- auto lpData = reinterpret_cast<unsigned char *>(lpBuf);
- if (lpBuf == NULL)
- return KCERR_INVALID_PARAMETER;
- if (IsClosed(cfRead))
- return KCERR_NETWORK_ERROR;
- if (cbBuf == 0) {
- if (lpcbRead)
- *lpcbRead = 0;
- return erSuccess;
- }
- if (ulTimeoutMs > 0)
- deadline = GetDeadline(ulTimeoutMs);
-
- ulock_normal locker(m_hMutex);
- while (cbRead < cbBuf) {
- while (IsEmpty()) {
- if (IsClosed(cfWrite))
- goto exit;
- if (ulTimeoutMs > 0) {
- if (m_hCondNotEmpty.wait_for(locker,
- std::chrono::milliseconds(ulTimeoutMs)) ==
- std::cv_status::timeout) {
- er = KCERR_TIMEOUT;
- goto exit;
- }
- } else
- m_hCondNotEmpty.wait(locker);
- }
- const size_type cbNow = std::min(cbBuf - cbRead, m_storage.size());
- auto iEndNow = m_storage.begin() + cbNow;
- std::copy(m_storage.begin(), iEndNow, lpData + cbRead);
- m_storage.erase(m_storage.begin(), iEndNow);
- m_hCondNotFull.notify_one();
- cbRead += cbNow;
- }
-
- if (IsEmpty() && IsClosed(cfWrite))
- m_hCondFlushed.notify_one();
- exit:
- locker.unlock();
- if (lpcbRead && (er == erSuccess || er == KCERR_TIMEOUT))
- *lpcbRead = cbRead;
- return er;
- }
- ECRESULT ECFifoBuffer::Close(close_flags flags)
- {
- scoped_lock locker(m_hMutex);
- if (flags & cfRead) {
- m_bReaderClosed = true;
- m_hCondNotFull.notify_one();
- if(IsEmpty())
- m_hCondFlushed.notify_one();
- }
- if (flags & cfWrite) {
- m_bWriterClosed = true;
- m_hCondNotEmpty.notify_one();
- }
- return erSuccess;
- }
- ECRESULT ECFifoBuffer::Flush()
- {
- if (!IsClosed(cfWrite))
- return KCERR_NETWORK_ERROR;
- ulock_normal locker(m_hMutex);
- m_hCondFlushed.wait(locker,
- [this](void) { return IsClosed(cfWrite) || IsEmpty(); });
- return erSuccess;
- }
- }
|