ECFifoBuffer.cpp 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. /*
  2. * Copyright 2005 - 2016 Zarafa and its licensors
  3. *
  4. * This program is free software: you can redistribute it and/or modify
  5. * it under the terms of the GNU Affero General Public License, version 3,
  6. * as published by the Free Software Foundation.
  7. *
  8. * This program is distributed in the hope that it will be useful,
  9. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. * GNU Affero General Public License for more details.
  12. *
  13. * You should have received a copy of the GNU Affero General Public License
  14. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  15. *
  16. */
  17. #include <chrono>
  18. #include <kopano/platform.h>
  19. #include <kopano/lockhelper.hpp>
  20. #include "ECFifoBuffer.h"
  21. namespace KC {
  22. ECFifoBuffer::ECFifoBuffer(size_type ulMaxSize)
  23. : m_ulMaxSize(ulMaxSize)
  24. {
  25. }
  26. /**
  27. * Write data into the FIFO.
  28. *
  29. * @param[in] lpBuf Pointer to the data being written.
  30. * @param[in] cbBuf The amount of data to write (in bytes).
  31. * @param[out] lpcbWritten The amount of data actually written.
  32. * @param[in] ulTimeoutMs The maximum amount that this function may block.
  33. *
  34. * @retval erSuccess The data was successfully written.
  35. * @retval KCERR_INVALID_PARAMETER lpBuf is NULL.
  36. * @retval KCERR_NOT_ENOUGH_MEMORY There was not enough memory available to store the data.
  37. * @retval KCERR_TIMEOUT Not all data was writting within the specified time limit.
  38. * The amount of data that was written is returned in lpcbWritten.
  39. * @retval KCERR_NETWORK_ERROR The buffer was closed prior to this call.
  40. */
  41. ECRESULT ECFifoBuffer::Write(const void *lpBuf, size_type cbBuf, unsigned int ulTimeoutMs, size_type *lpcbWritten)
  42. {
  43. ECRESULT er = erSuccess;
  44. size_type cbWritten = 0;
  45. struct timespec deadline = {0};
  46. auto lpData = reinterpret_cast<const unsigned char *>(lpBuf);
  47. if (lpBuf == NULL)
  48. return KCERR_INVALID_PARAMETER;
  49. if (IsClosed(cfWrite))
  50. return KCERR_NETWORK_ERROR;
  51. if (cbBuf == 0) {
  52. if (lpcbWritten)
  53. *lpcbWritten = 0;
  54. return erSuccess;
  55. }
  56. if (ulTimeoutMs > 0)
  57. deadline = GetDeadline(ulTimeoutMs);
  58. ulock_normal locker(m_hMutex);
  59. while (cbWritten < cbBuf) {
  60. while (IsFull()) {
  61. if (IsClosed(cfRead)) {
  62. er = KCERR_NETWORK_ERROR;
  63. goto exit;
  64. }
  65. if (ulTimeoutMs > 0) {
  66. if (m_hCondNotFull.wait_for(locker,
  67. std::chrono::milliseconds(ulTimeoutMs)) ==
  68. std::cv_status::timeout) {
  69. er = KCERR_TIMEOUT;
  70. goto exit;
  71. }
  72. } else
  73. m_hCondNotFull.wait(locker);
  74. }
  75. const size_type cbNow = std::min(cbBuf - cbWritten, m_ulMaxSize - m_storage.size());
  76. try {
  77. m_storage.insert(m_storage.end(), lpData + cbWritten, lpData + cbWritten + cbNow);
  78. } catch (const std::bad_alloc &) {
  79. er = KCERR_NOT_ENOUGH_MEMORY;
  80. goto exit;
  81. }
  82. m_hCondNotEmpty.notify_one();
  83. cbWritten += cbNow;
  84. }
  85. exit:
  86. locker.unlock();
  87. if (lpcbWritten && (er == erSuccess || er == KCERR_TIMEOUT))
  88. *lpcbWritten = cbWritten;
  89. return er;
  90. }
  91. /**
  92. * Read data from the FIFO.
  93. *
  94. * @param[in,out] lpBuf Pointer to where the data should be stored.
  95. * @param[in] cbBuf The amount of data to read (in bytes).
  96. * @param[out] lpcbWritten The amount of data actually read.
  97. * @param[in] ulTimeoutMs The maximum amount that this function may block.
  98. *
  99. * @retval erSuccess The data was successfully written.
  100. * @retval KCERR_INVALID_PARAMETER lpBuf is NULL.
  101. * @retval KCERR_TIMEOUT Not all data was writting within the specified time limit.
  102. * The amount of data that was written is returned in lpcbWritten.
  103. */
  104. ECRESULT ECFifoBuffer::Read(void *lpBuf, size_type cbBuf, unsigned int ulTimeoutMs, size_type *lpcbRead)
  105. {
  106. ECRESULT er = erSuccess;
  107. size_type cbRead = 0;
  108. struct timespec deadline = {0};
  109. auto lpData = reinterpret_cast<unsigned char *>(lpBuf);
  110. if (lpBuf == NULL)
  111. return KCERR_INVALID_PARAMETER;
  112. if (IsClosed(cfRead))
  113. return KCERR_NETWORK_ERROR;
  114. if (cbBuf == 0) {
  115. if (lpcbRead)
  116. *lpcbRead = 0;
  117. return erSuccess;
  118. }
  119. if (ulTimeoutMs > 0)
  120. deadline = GetDeadline(ulTimeoutMs);
  121. ulock_normal locker(m_hMutex);
  122. while (cbRead < cbBuf) {
  123. while (IsEmpty()) {
  124. if (IsClosed(cfWrite))
  125. goto exit;
  126. if (ulTimeoutMs > 0) {
  127. if (m_hCondNotEmpty.wait_for(locker,
  128. std::chrono::milliseconds(ulTimeoutMs)) ==
  129. std::cv_status::timeout) {
  130. er = KCERR_TIMEOUT;
  131. goto exit;
  132. }
  133. } else
  134. m_hCondNotEmpty.wait(locker);
  135. }
  136. const size_type cbNow = std::min(cbBuf - cbRead, m_storage.size());
  137. auto iEndNow = m_storage.begin() + cbNow;
  138. std::copy(m_storage.begin(), iEndNow, lpData + cbRead);
  139. m_storage.erase(m_storage.begin(), iEndNow);
  140. m_hCondNotFull.notify_one();
  141. cbRead += cbNow;
  142. }
  143. if (IsEmpty() && IsClosed(cfWrite))
  144. m_hCondFlushed.notify_one();
  145. exit:
  146. locker.unlock();
  147. if (lpcbRead && (er == erSuccess || er == KCERR_TIMEOUT))
  148. *lpcbRead = cbRead;
  149. return er;
  150. }
  151. /**
  152. * Close a buffer.
  153. * This causes new writes to the buffer to fail with KCERR_NETWORK_ERROR and all
  154. * (pending) reads on the buffer to return immediately.
  155. *
  156. * @retval erSucces (never fails)
  157. */
  158. ECRESULT ECFifoBuffer::Close(close_flags flags)
  159. {
  160. scoped_lock locker(m_hMutex);
  161. if (flags & cfRead) {
  162. m_bReaderClosed = true;
  163. m_hCondNotFull.notify_one();
  164. if(IsEmpty())
  165. m_hCondFlushed.notify_one();
  166. }
  167. if (flags & cfWrite) {
  168. m_bWriterClosed = true;
  169. m_hCondNotEmpty.notify_one();
  170. }
  171. return erSuccess;
  172. }
  173. /**
  174. * Wait for the stream to be flushed
  175. *
  176. * This guarantees that the reader has read all the data from the fifo or
  177. * the reader endpoint is closed.
  178. *
  179. * The writer endpoint must be closed before calling this method.
  180. */
  181. ECRESULT ECFifoBuffer::Flush()
  182. {
  183. if (!IsClosed(cfWrite))
  184. return KCERR_NETWORK_ERROR;
  185. ulock_normal locker(m_hMutex);
  186. m_hCondFlushed.wait(locker,
  187. [this](void) { return IsClosed(cfWrite) || IsEmpty(); });
  188. return erSuccess;
  189. }
  190. } /* namespace */