/* * ===================================================================================== * * Filename: shm_queue.h * * Description: * * Version: 1.0 * Created: 2021年03月25日 10时35分09秒 * Revision: none * Compiler: gcc * * Author: Li Chao (), * Organization: * * ===================================================================================== */ #ifndef SHM_QUEUE_JE0OEUP3 #define SHM_QUEUE_JE0OEUP3 #include "shm.h" #include "msg.h" #include #include namespace bhome_shm { template using Circular = boost::circular_buffer >; typedef boost::uuids::uuid MQId; template class SyncedQueue : private Circular { typedef Circular Super; Mutex mutex_; Cond cond_read_; Cond cond_write_; Mutex & mutex() { return mutex_; } const MQId id_; static boost::posix_time::ptime MSFromNow(const int ms) { using namespace boost::posix_time; ptime cur = boost::posix_time::microsec_clock::universal_time(); return cur + millisec(ms); } public: // template SyncedQueue(const MQId &id, T&&...t):Super(t...), id_(id) {} SyncedQueue(const MQId &id, const uint32_t len, Allocator const& alloc):Super(len, alloc), id_(id) {} using Super::size; using Super::capacity; const MQId &Id() const { return id_; } bool Write(const D &buf, const int timeout_ms) { Guard lock(mutex()); if (cond_write_.timed_wait(lock, MSFromNow(timeout_ms), [&]() { return !this->full(); })) { this->push_back(buf); cond_read_.notify_one(); return true; } else { return false; } } bool Read(D &buf, const int timeout_ms){ Guard lock(mutex()); if (cond_read_.timed_wait(lock, MSFromNow(timeout_ms), [&]() { return !this->empty(); })) { using std::swap; swap(buf, this->front()); this->pop_front(); cond_write_.notify_one(); return true; } else { return false; } } }; class ShmMsgQueue : private ShmObject > { typedef ShmObject > SharedQueue; typedef SharedQueue::Data Queue; bool Write(const Msg &buf, const int timeout_ms) { return data()->Write(buf, timeout_ms); } bool Read(Msg &buf, const int timeout_ms) { return data()->Read(buf, timeout_ms); } public: ShmMsgQueue(const MQId &id, ShmType &segment, const uint32_t len); ShmMsgQueue(ShmType &segment, const uint32_t len); ~ShmMsgQueue(); bool Send(const MQId &remote_id, const void *data, const size_t size, const int timeout_ms); bool Recv(MQId &source_id, void *&data, size_t &size, const int timeout_ms); const MQId &Id() const { return data()->Id(); } bool Send(const MQId &remote_id, const Msg &msg, const int timeout_ms); }; } // namespace bhome_shm #endif // end of include guard: SHM_QUEUE_JE0OEUP3