MessageQueue.hpp 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. #pragma once
  2. #include <deque>
  3. #include <mutex>
  4. #include <condition_variable>
  5. #include <optional>
  6. #include <functional>
  7. namespace QuickMedia {
  // Thread-safe FIFO queue for handing values of type T from producers to consumers.
  //
  // Every member function locks the internal mutex, so none of them may be called
  // from inside the |callback| passed to erase_if (the mutex is not recursive).
  // The queue starts in the running state. close() stops it and wakes all blocked
  // consumers; restart() puts it back into the running state.
  template <typename T>
  class MessageQueue {
  public:
      MessageQueue() : running(true) {
      }

      // Appends |data| to the back of the queue and wakes one thread blocked in pop_wait.
      // Elements are accepted even while the queue is closed, but pop_wait does not
      // hand them out until restart() has been called.
      void push(T data) {
          std::lock_guard<std::mutex> lock(mutex);
          data_queue.push_back(std::move(data));
          cv.notify_one();
      }

      // Blocks until an element is available or the queue is closed.
      // Returns the front element, or std::nullopt if the queue is not running
      // (either on entry or because close() was called while waiting).
      std::optional<T> pop_wait() {
          std::unique_lock<std::mutex> lock(mutex);
          // The predicate form re-checks the condition after every wakeup, so spurious
          // wakeups are handled and a closed queue returns immediately without waiting.
          cv.wait(lock, [this] { return !running || !data_queue.empty(); });
          if(!running)
              return std::nullopt;
          return take_front();
      }

      // Non-blocking pop. Returns the front element, or std::nullopt if the queue is empty.
      // Does not check whether the queue is running.
      std::optional<T> pop_if_available() {
          std::lock_guard<std::mutex> lock(mutex);
          if(data_queue.empty())
              return std::nullopt;
          return take_front();
      }

      // Stops the queue: discards all queued elements and wakes every thread blocked
      // in pop_wait, each of which then returns std::nullopt.
      void close() {
          std::lock_guard<std::mutex> lock(mutex);
          running = false;
          data_queue.clear();
          // notify_all rather than notify_one: with several consumers blocked in pop_wait,
          // waking only one would leave the others waiting forever on a closed queue.
          cv.notify_all();
      }

      // Discards all queued elements without changing the running state.
      void clear() {
          std::lock_guard<std::mutex> lock(mutex);
          data_queue.clear();
      }

      // Puts the queue back into the running state after close().
      void restart() {
          std::lock_guard<std::mutex> lock(mutex);
          running = true;
      }

      // Calls |callback| on each queued element in order, front to back.
      // Return true from |callback| to remove the element.
      // Returns the number of removed elements.
      int erase_if(std::function<bool(T&)> callback) {
          std::lock_guard<std::mutex> lock(mutex);
          int removed = 0;
          for(auto it = data_queue.begin(); it != data_queue.end();) {
              if(callback(*it)) {
                  it = data_queue.erase(it);
                  ++removed;
              } else {
                  ++it;
              }
          }
          return removed;
      }

      // Returns true unless the queue has been closed and not restarted.
      bool is_running() const {
          // |running| is written under the mutex by close()/restart(), so it has to be
          // read under the mutex as well to avoid a data race.
          std::lock_guard<std::mutex> lock(mutex);
          return running;
      }
  private:
      // Moves the front element out of the queue and removes it.
      // The caller must hold |mutex| and must have checked that the queue is not empty.
      std::optional<T> take_front() {
          std::optional<T> front(std::move(data_queue.front()));
          data_queue.pop_front();
          return front;
      }
  private:
      std::deque<T> data_queue;
      mutable std::mutex mutex; // mutable so that the const is_running() can lock it
      std::condition_variable cv;
      bool running;
  };
  74. }