/* * ===================================================================================== * * Filename: shm_msg_queue.h * * Description: * * Version: 1.0 * Created: 2021年04月26日 16时25分21秒 * Revision: none * Compiler: gcc * * Author: Li Chao (), lichao@aiotlink.com * Organization: * * ===================================================================================== */ #ifndef SHM_MSG_QUEUE_D847TQXH #define SHM_MSG_QUEUE_D847TQXH #include "msg.h" #include "shm_queue.h" using namespace bhome_shm; using namespace bhome_msg; class ShmMsgQueue : public StaticDataRef, ShmMsgQueue> { typedef ShmObject> Shmq; typedef Shmq::ShmType ShmType; typedef Shmq::Data Queue; typedef std::function OnSend; public: typedef uint64_t MQId; static MQId NewId(); ShmMsgQueue(const MQId id, ShmType &segment, const int len); ShmMsgQueue(ShmType &segment, const int len); ~ShmMsgQueue(); static bool Remove(SharedMemory &shm, const MQId id); MQId Id() const { return id_; } ShmType &shm() const { return queue_.shm(); } bool Recv(MsgI &msg, const int timeout_ms) { return queue_.data()->Read(msg, timeout_ms); } bool TryRecv(MsgI &msg) { return queue_.data()->TryRead(msg); } static Queue *Find(SharedMemory &shm, const MQId remote_id); static bool TrySend(SharedMemory &shm, const MQId remote_id, const MsgI &msg, OnSend const &onsend = OnSend()); template static int TrySendAll(SharedMemory &shm, const MQId remote_id, const Iter begin, const Iter end, OnSend const &onsend = OnSend()) { Queue *remote = Find(shm, remote_id); if (remote) { if (onsend) { return remote->TryWrite(begin, end, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); }); } else { return remote->TryWrite(begin, end, [](const MsgI &msg) { msg.AddRef(); }); } } else { // SetLestError(eNotFound); return 0; } } template bool TrySend(const MQId remote_id, Rest const &...rest) { return TrySend(shm(), remote_id, rest...); } template int TrySendAll(const MQId remote_id, Rest const &...rest) { return TrySendAll(shm(), remote_id, rest...); } private: MQId id_; Shmq &queue() { return queue_; } Shmq queue_; }; #endif // end of include guard: SHM_MSG_QUEUE_D847TQXH