SafetyQueue.hpp
1 
7 #ifndef EMX_SAFETYQUEUE_HPP
8 #define EMX_SAFETYQUEUE_HPP
9 
#include <atomic>
#include <condition_variable>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <utility>
15 
namespace Emx {

/**
 * @brief Bounded FIFO queue guarded by a mutex, with a blocking Get().
 *
 * Every public member function takes the internal mutex. When the number of
 * stored items exceeds the configured limit, the oldest items are dropped
 * (after being handed to the release callback, if one was supplied).
 *
 * @tparam T Element type. It must be comparable with and constructible from
 *           `nullptr` (a raw or smart pointer, for example): a null value
 *           is what Put() refuses to store and what Get() returns once the
 *           queue has been stopped and is empty.
 */
template<typename T>
class SafetyQueue {
 public:
  /// Data clone callback: returns the value that is actually stored for `data`.
  using OnCloned = std::function<T(const T& data)>;

  /// Data release callback: invoked on an item that is dropped from the queue.
  using OnFreed = std::function<void(T& data)>;

  /**
   * @brief Construct a queue.
   * @param total    Maximum number of items kept in the queue (default 3).
   * @param clonedCb Optional clone callback applied to every item passed to Put().
   * @param freedCb  Optional release callback applied to every dropped item.
   */
  SafetyQueue(int total = 3,
              OnCloned clonedCb = nullptr,
              OnFreed freedCb = nullptr)
      : m_total(total),
        m_clonedCb(std::move(clonedCb)),
        m_freedCb(std::move(freedCb)),
        m_exit(false) {
  }

  /// Releases every item still stored in the queue.
  ~SafetyQueue() {
    Clear();
  }

  /**
   * @brief Put data into the queue.
   *
   * The item is cloned through the clone callback when one is set, otherwise
   * copied. A null result is silently discarded. If the queue then holds more
   * than the configured limit, the oldest items are released and removed.
   * Finally the longest-waiting Get() caller, if any, is woken up.
   *
   * @param data Item to enqueue.
   */
  void Put(const T& data) {
    std::unique_lock<std::mutex> locker(m_mtx);
    T clonedData = m_clonedCb ? m_clonedCb(data) : data;
    if (clonedData == nullptr) {
      return;
    }
    m_queue.emplace_back(std::move(clonedData));

    // Enforce the capacity limit by dropping the oldest items first.
    while (static_cast<int>(m_queue.size()) > m_total) {
      if (m_freedCb != nullptr) {
        m_freedCb(m_queue.front());
      }
      m_queue.pop_front();
    }

    // If a consumer is waiting, wake exactly one (the one that registered first).
    if (!m_waitQueue.empty()) {
      auto wait = m_waitQueue.front();
      m_waitQueue.pop_front();
      *(wait.first) = false;
      wait.second->notify_one();
    }
  }

  /**
   * @brief Get data from the queue, blocking while it is empty.
   *
   * @return The oldest item; a null value if the queue is empty and Stop()
   *         has been called.
   */
  T Get() {
    std::unique_lock<std::mutex> locker(m_mtx);
    while (m_queue.empty()) {
      if (m_exit) {
        return nullptr;
      }
      // Register a private flag/condition-variable pair and sleep until Put()
      // or Stop() resets the flag. If another consumer took the item first,
      // the loop registers again.
      std::shared_ptr<bool> is_empty = std::make_shared<bool>(true);
      std::shared_ptr<std::condition_variable> cv =
          std::make_shared<std::condition_variable>();
      m_waitQueue.emplace_back(std::make_pair(is_empty, cv));
      cv->wait(locker, [is_empty, this]() { return !(*is_empty) || m_exit; });
    }
    // The loop only exits with a non-empty queue, so front() is valid here.
    T data = std::move(m_queue.front());
    m_queue.pop_front();
    return data;
  }

  /**
   * @brief Get the queue length.
   * @return Number of items currently stored.
   */
  int Size() {
    std::unique_lock<std::mutex> locker(m_mtx);
    return static_cast<int>(m_queue.size());
  }

  /**
   * @brief Clear the queue.
   *
   * Every non-null stored item is passed to the release callback (when set)
   * and then removed. Consumers blocked in Get() stay registered: dropping
   * their registrations would make them unreachable for Put() and Stop() and
   * leave them blocked forever.
   */
  void Clear() {
    std::unique_lock<std::mutex> locker(m_mtx);
    if (m_freedCb != nullptr) {
      for (auto& data : m_queue) {
        if (data != nullptr) {
          m_freedCb(data);
        }
      }
    }
    m_queue.clear();
  }

  /**
   * @brief Release every thread blocked in Get().
   *
   * Sets the exit flag and wakes all registered waiters. From then on Get()
   * returns a null value whenever the queue is empty instead of blocking.
   */
  void Stop() {
    std::unique_lock<std::mutex> locker(m_mtx);
    m_exit = true;

    for (auto& wait : m_waitQueue) {
      *(wait.first) = false;
      wait.second->notify_one();
    }
    m_waitQueue.clear();
  }

  /**
   * @brief Set the upper limit of the queue length (3 by default).
   *
   * The new limit is applied by the next Put(); items already stored are not
   * trimmed here.
   *
   * @param total New maximum number of items.
   */
  void SetTotal(int total) {
    std::unique_lock<std::mutex> locker(m_mtx);
    m_total = total;
  }

 private:
  int m_total;            ///< Maximum number of items kept in the queue.
  std::mutex m_mtx;       ///< Guards every member below.
  std::list<T> m_queue;   ///< Stored items, oldest first.
  /// Blocked Get() callers in arrival order: (still-waiting flag, private condition variable).
  std::list<std::pair<std::shared_ptr<bool>, std::shared_ptr<std::condition_variable>>> m_waitQueue;
  OnCloned m_clonedCb;    ///< Optional clone callback used by Put().
  OnFreed m_freedCb;      ///< Optional release callback for dropped items.
  std::atomic<bool> m_exit;  ///< Set by Stop(); makes Get() return instead of blocking.
};

}  // namespace Emx
162 
163 #endif // EMX_SAFETYQUEUE_HPP
Definition: SafetyQueue.hpp:21
int Size()
获取队列长度
Definition: SafetyQueue.hpp:107
void Clear()
清除队列
Definition: SafetyQueue.hpp:115
void Stop()
优雅安全退出Get操作线程等待阻塞
Definition: SafetyQueue.hpp:131
void SetTotal(int total)
设置队列长度上限,默认为3
Definition: SafetyQueue.hpp:146
std::function< T(const T &data)> OnCloned
数据克隆
Definition: SafetyQueue.hpp:28
void Put(const T &data)
put数据到队列
Definition: SafetyQueue.hpp:52
std::function< void(T &data)> OnFreed
数据释放
Definition: SafetyQueue.hpp:35
T Get()
从队列中get数据
Definition: SafetyQueue.hpp:82
~SafetyQueue()
Definition: SafetyQueue.hpp:43
SafetyQueue(int total=3, OnCloned clonedCb=nullptr, OnFreed freedCb=nullptr)
Definition: SafetyQueue.hpp:37
Definition: EmxGpio.hpp:10