12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485 |
- #pragma once
- #include <deque>
- #include <mutex>
- #include <condition_variable>
- #include <optional>
- #include <functional>
- namespace QuickMedia {
- template <typename T>
- class MessageQueue {
- public:
- MessageQueue() : running(true) {
- }
-
- void push(T data) {
- std::unique_lock<std::mutex> lock(mutex);
- data_queue.push_back(std::move(data));
- cv.notify_one();
- }
- std::optional<T> pop_wait() {
- std::unique_lock<std::mutex> lock(mutex);
- if(!running)
- return std::nullopt;
- while(data_queue.empty() && running) cv.wait(lock);
- if(!running)
- return std::nullopt;
- T data = std::move(data_queue.front());
- data_queue.pop_front();
- return data;
- }
- std::optional<T> pop_if_available() {
- std::unique_lock<std::mutex> lock(mutex);
- if(data_queue.empty())
- return std::nullopt;
- T data = std::move(data_queue.front());
- data_queue.pop_front();
- return data;
- }
- void close() {
- std::unique_lock<std::mutex> lock(mutex);
- running = false;
- data_queue.clear();
- cv.notify_one();
- }
- void clear() {
- std::unique_lock<std::mutex> lock(mutex);
- data_queue.clear();
- }
- void restart() {
- std::unique_lock<std::mutex> lock(mutex);
- running = true;
- }
- // Return true from |callback| to remove the element
- int erase_if(std::function<bool(T&)> callback) {
- std::unique_lock<std::mutex> lock(mutex);
- int removed = 0;
- for(auto it = data_queue.begin(); it != data_queue.end();) {
- if(callback(*it)) {
- it = data_queue.erase(it);
- ++removed;
- } else {
- ++it;
- }
- }
- return removed;
- }
- bool is_running() const {
- return running;
- }
- private:
- std::deque<T> data_queue;
- std::mutex mutex;
- std::condition_variable cv;
- bool running;
- };
- }
|