/*
|
* =====================================================================================
|
*
|
* Filename: shm_msg_queue.h
|
*
|
* Description:
|
*
|
* Version: 1.0
|
* Created: 2021-04-26 16:25:21
|
* Revision: none
|
* Compiler: gcc
|
*
|
* Author: Li Chao (), lichao@aiotlink.com
|
* Organization:
|
*
|
* =====================================================================================
|
*/
|
#ifndef SHM_MSG_QUEUE_D847TQXH
|
#define SHM_MSG_QUEUE_D847TQXH
|
|
#include "msg.h"
|
#include "shm_queue.h"
|
|
using namespace bhome_shm;
|
using namespace bhome_msg;
|
|
// A queue of int64_t values held through a ShmObject and identified by an MQId.
// The receive operations read one value from this object's own queue into the
// offset slot of a MsgI, holding the mutex that GetMutex() returns for this
// queue's id for the duration of the read.
// NOTE(review): constructors, destructor, Find/Remove/TrySend(static), NewId and
// GetMutex are only declared here; their exact semantics live in the
// implementation file and should be confirmed there.
class ShmMsgQueue : public StaticDataRef<std::atomic<uint64_t>, ShmMsgQueue>
{
	// Alternative backing type kept for reference: ShmObject<SharedQ63<4>>.
	using Shmq = ShmObject<SharedQueue<int64_t>>;
	using ShmType = Shmq::ShmType;
	using Queue = Shmq::Data;
	using OnSend = std::function<void()>;
	using Mutex = robust::FMutex;
	using Guard = robust::Guard<Mutex>;

public:
	// Identifier type used to address a queue.
	using MQId = uint64_t;

	// Produces a queue id (presumably a fresh, unused one; see implementation).
	static MQId NewId();

	// Construct a queue with an explicit id inside `segment`; `len` is the
	// requested queue length.
	ShmMsgQueue(const MQId id, ShmType &segment, const int len);
	// Same as above, with `create_or_else_find` selecting (as the name suggests)
	// between creating the queue and looking up an existing one; confirm in the
	// implementation.
	ShmMsgQueue(const MQId id, const bool create_or_else_find, ShmType &segment, const int len);
	// Construct a queue in `segment` without supplying an id.
	ShmMsgQueue(ShmType &segment, const int len);
	~ShmMsgQueue();

	// Removes the queue with the given id from `shm`; returns success.
	static bool Remove(SharedMemory &shm, const MQId id);

	// Id of this queue.
	MQId Id() const
	{
		return id_;
	}

	// Shared-memory segment the underlying queue object refers to.
	ShmType &shm() const
	{
		return queue_.shm();
	}

	// Reads one entry into msg's offset, passing `timeout_ms` to the queue's
	// Read(). The per-id mutex is held for the whole call.
	// Returns whatever the underlying Read() reports.
	bool Recv(MsgI &msg, const int timeout_ms)
	{
		Guard guard(GetMutex(id_));
		Queue &q = queue();
		return q.Read(msg.OffsetRef(), timeout_ms);
	}

	// Like Recv(), but forwards to the queue's TryRead() and takes no timeout.
	bool TryRecv(MsgI &msg)
	{
		Guard guard(GetMutex(id_));
		Queue &q = queue();
		return q.TryRead(msg.OffsetRef());
	}

	// Looks up the queue data for `remote_id` in `shm`.
	static Queue *Find(SharedMemory &shm, const MQId remote_id);

	// Attempts to send `msg` to the queue identified by `remote_id` in `shm`.
	static bool TrySend(SharedMemory &shm, const MQId remote_id, MsgI msg);

	// Convenience overload: same as the static TrySend, using this queue's own
	// shared-memory segment.
	bool TrySend(const MQId remote_id, const MsgI &msg)
	{
		return TrySend(shm(), remote_id, msg);
	}

private:
	// Returns the mutex associated with queue id `id`.
	static Mutex &GetMutex(const MQId id);

	// Dereferences the shared-memory object to reach the queue data.
	Queue &queue()
	{
		return *queue_.data();
	}

	// Declaration order of the data members is kept: id_ first, then queue_.
	MQId id_;
	Shmq queue_;
};
|
|
#endif // end of include guard: SHM_MSG_QUEUE_D847TQXH
|