#ifndef _c_fixed_q_h_
|
#define _c_fixed_q_h_
|
|
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>
|
|
// Wake-up call used by fixed_q: `cv.notify()` below expands to `cv.notify_one()`.
// Switch to the commented-out definition to wake every waiter instead.
// NOTE(review): this macro is not scoped to this header -- it rewrites every
// later use of the identifier `notify` in any translation unit that includes it.
#define notify notify_one
// #define notify notify_all

// Interval, in milliseconds, at which the blocking push/emplace/pop overloads
// stop waiting in order to consult the stop predicate.
constexpr int du = 6;

/// Bounded FIFO queue for passing values of type T between threads.
///
/// Every operation that touches the underlying std::deque takes the internal
/// mutex. Two condition variables are used: one signalled when an element is
/// added (wakes consumers) and one signalled when an element is removed (wakes
/// producers).
///
/// The overloads without an explicit timeout wait in slices of `du`
/// milliseconds; each time a slice expires without the awaited condition
/// becoming true they call the stop predicate given to the constructor and give
/// up if it returns true. The predicate is invoked while the internal mutex is
/// held, so it must not call back into the same queue. It must be callable
/// (a non-empty std::function).
template <class T>
class fixed_q {
public:
    /// @param qsize maximum number of elements held at once.
    /// @param fn    stop predicate; returning true aborts a blocked
    ///              push/emplace/pop (see class comment).
    fixed_q(const size_t qsize, std::function<bool()> fn)
        : qsize(qsize), pred(std::move(fn)) {}

    fixed_q(const fixed_q&) = delete;
    fixed_q& operator=(const fixed_q&) = delete;

    /// @return the capacity passed to the constructor.
    size_t capacity() const { return qsize; }

    /// @return the number of elements currently queued (taken under the lock).
    size_t size() const {
        std::lock_guard<std::mutex> guard{mtx_q};
        return q.size();
    }

    /// Appends a copy of `m`, blocking while the queue is full.
    /// @return the queue size after insertion, or -1 if the stop predicate
    ///         returned true while the queue was still full.
    int push(const T& m) {
        std::unique_lock<std::mutex> lock{mtx_q};
        if (!wait_or_stop(lock, cond_spare, [this] { return q.size() < qsize; })) {
            return -1;
        }
        q.push_back(m);
        wake(cond_node);
        return static_cast<int>(q.size());
    }

    /// Appends `m` by moving from it, blocking while the queue is full.
    /// `m` is left untouched when -1 is returned.
    /// @return the queue size after insertion, or -1 if the stop predicate
    ///         returned true while the queue was still full.
    int emplace(T&& m) {
        std::unique_lock<std::mutex> lock{mtx_q};
        if (!wait_or_stop(lock, cond_spare, [this] { return q.size() < qsize; })) {
            return -1;
        }
        // Qualified std::move: an unqualified call only resolves through ADL
        // when T lives in namespace std.
        q.emplace_back(std::move(m));
        wake(cond_node);
        return static_cast<int>(q.size());
    }

    /// Removes the oldest element into `m`, waiting at most `ms` milliseconds.
    /// The stop predicate is not consulted by this overload.
    /// @return the queue size after removal, or -1 on timeout (`m` untouched).
    int pop(T& m, const size_t ms) {
        std::unique_lock<std::mutex> lock{mtx_q};
        const auto limit = std::chrono::milliseconds{ms};
        if (!cond_node.wait_for(lock, limit, [this] { return !q.empty(); })) {
            return -1;
        }
        take_front(m);
        return static_cast<int>(q.size());
    }

    /// Removes the oldest element into `m`, blocking while the queue is empty.
    /// @return the queue size after removal, or -1 if the stop predicate
    ///         returned true while the queue was still empty (`m` untouched).
    int pop(T& m) {
        std::unique_lock<std::mutex> lock{mtx_q};
        if (!wait_or_stop(lock, cond_node, [this] { return !q.empty(); })) {
            return -1;
        }
        take_front(m);
        return static_cast<int>(q.size());
    }

    /// Removes and returns the oldest element, waiting at most `ms` milliseconds.
    /// The stop predicate is not consulted by this overload.
    /// @return the element, or a value-initialized T on timeout. A timeout is
    ///         therefore indistinguishable from a queued value-initialized T.
    T pop(const size_t ms) {
        std::unique_lock<std::mutex> lock{mtx_q};
        const auto limit = std::chrono::milliseconds{ms};
        T out{};
        if (cond_node.wait_for(lock, limit, [this] { return !q.empty(); })) {
            take_front(out);
        }
        return out;
    }

    /// Removes and returns the oldest element, blocking while the queue is empty.
    /// @return the element, or a value-initialized T if the stop predicate
    ///         returned true while the queue was still empty.
    T pop() {
        std::unique_lock<std::mutex> lock{mtx_q};
        T out{};
        if (wait_or_stop(lock, cond_node, [this] { return !q.empty(); })) {
            take_front(out);
        }
        return out;
    }

    /// Drains the queue front to back, passing each element (as T&) to `f`
    /// before discarding it. `f` runs with the internal mutex held, so it must
    /// not call back into the same queue.
    template <typename F>
    void clear(F&& f) {
        std::lock_guard<std::mutex> guard{mtx_q};
        while (!q.empty()) {
            f(q.front());
            q.pop_front();
        }
        // Potentially many slots were freed at once; wake every blocked
        // producer instead of leaving them to their next poll timeout.
        cond_spare.notify_all();
    }

private:
    /// Waits on `cv` in `du`-millisecond slices until `ready()` holds.
    /// After each slice that expires with `ready()` still false the stop
    /// predicate is consulted.
    /// @return true once `ready()` holds, false if the stop predicate fired.
    template <class Ready>
    bool wait_or_stop(std::unique_lock<std::mutex>& lock,
                      std::condition_variable& cv, Ready ready) {
        const auto slice = std::chrono::milliseconds{du};
        while (!cv.wait_for(lock, slice, ready)) {
            if (pred()) {
                return false;
            }
        }
        return true;
    }

    /// Moves the front element into `out`, removes it and signals a producer.
    /// Caller must hold the mutex and have checked that the queue is non-empty.
    void take_front(T& out) {
        // The front element is destroyed right after, so move instead of copy.
        out = std::move(q.front());
        q.pop_front();
        wake(cond_spare);
    }

    /// Single place where the `notify` macro is expanded.
    static void wake(std::condition_variable& cv) { cv.notify(); }

    const size_t qsize;                  // capacity limit enforced by push/emplace
    std::function<bool()> pred;          // stop predicate polled by blocking calls
    std::deque<T> q;                     // queued elements, oldest at the front
    mutable std::mutex mtx_q;            // guards q; mutable so size() can be const
    std::condition_variable cond_node;   // signalled when an element is added
    std::condition_variable cond_spare;  // signalled when a slot is freed
};
|
|
#endif
|