/* * ===================================================================================== * * Filename: shm_queue.h * * Description: * * Version: 1.0 * Created: 2021年03月25日 10时35分09秒 * Revision: none * Compiler: gcc * * Author: Li Chao (), * Organization: * * ===================================================================================== */ #ifndef SHM_QUEUE_JE0OEUP3 #define SHM_QUEUE_JE0OEUP3 #include "msg.h" #include "shm.h" #include #include namespace bhome_shm { template using Circular = boost::circular_buffer>; typedef boost::uuids::uuid MQId; template class SharedQueue : private Circular { typedef Circular Super; Mutex mutex_; Cond cond_read_; Cond cond_write_; Mutex &mutex() { return mutex_; } static boost::posix_time::ptime MSFromNow(const int ms) { using namespace boost::posix_time; ptime cur = boost::posix_time::microsec_clock::universal_time(); return cur + millisec(ms); } public: SharedQueue(const uint32_t len, Allocator const &alloc) : Super(len, alloc) {} using Super::capacity; using Super::size; template int Write(Iter begin, Iter end, const int timeout_ms, const OnWrite &onWrite) { int n = 0; if (begin != end) { auto endtime = MSFromNow(timeout_ms); Guard lock(mutex()); while (cond_write_.timed_wait(lock, endtime, [&]() { return !this->full(); })) { onWrite(*begin); this->push_back(*begin); ++n; cond_read_.notify_one(); if (++begin == end) { break; } } } return n; } template bool Write(const D &buf, const int timeout_ms, const OnWrite &onWrite) { return Write(&buf, (&buf) + 1, timeout_ms, onWrite); } bool Write(const D &buf, const int timeout_ms) { return Write(buf, timeout_ms, [](const D &buf) {}); } template bool Read(const int timeout_ms, OnData onData) { int n = 0; auto endtime = MSFromNow(timeout_ms); Guard lock(mutex()); while (cond_read_.timed_wait(lock, endtime, [&]() { return !this->empty(); })) { const bool more = onData(this->front()); this->pop_front(); cond_write_.notify_one(); ++n; if (!more) { break; } } return n; } bool Read(D &buf, const int timeout_ms) { auto read1 = [&](D &d) { using std::swap; swap(buf, d); return false; }; return Read(timeout_ms, read1) == 1; } }; using namespace bhome_msg; class ShmMsgQueue : private ShmObject> { typedef ShmObject> Super; typedef Super::Data Queue; typedef std::function OnSend; bool Write(const MsgI &buf, const int timeout_ms) { return data()->Write(buf, timeout_ms); } bool Read(MsgI &buf, const int timeout_ms) { return data()->Read(buf, timeout_ms); } MQId id_; protected: ShmMsgQueue(const std::string &raw_name, ShmType &segment, const int len); // internal use. public: ShmMsgQueue(const MQId &id, ShmType &segment, const int len); ShmMsgQueue(ShmType &segment, const int len); ~ShmMsgQueue(); const MQId &Id() const { return id_; } bool Recv(BHMsg &msg, const int timeout_ms); bool Recv(MsgI &msg, const int timeout_ms) { return Read(msg, timeout_ms); } static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend); static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms); template bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms, Extra const &...extra) { return Send(shm(), remote_id, msg, timeout_ms, extra...); } template bool Send(const MQId &remote_id, const BHMsg &data, const int timeout_ms, Extra const &...extra) { MsgI msg; if (msg.Make(shm(), data)) { if (Send(shm(), remote_id, msg, timeout_ms, extra...)) { return true; } else { msg.Release(shm()); } } return false; } size_t Pending() const { return data()->size(); } }; } // namespace bhome_shm #endif // end of include guard: SHM_QUEUE_JE0OEUP3