/*
 * =====================================================================================
 *
 *       Filename:  shm_msg_queue.h
 *
 *    Description:  Declares ShmMsgQueue, a message queue wrapper around a
 *                  queue object stored in a shared-memory segment.
 *
 *        Version:  1.0
 *        Created:  2021-04-26 16:25:21
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */

#ifndef SHM_MSG_QUEUE_D847TQXH
#define SHM_MSG_QUEUE_D847TQXH

// std::atomic and the fixed-width integer types are used directly by the
// declarations in this header, so include them explicitly instead of relying
// on transitive includes.
#include <atomic>
#include <cstdint>

#include "msg.h"
#include "shm_queue.h"

// NOTE(review): using-directives at header scope are visible to every file
// that includes this header. They are kept because the declarations below
// (and presumably existing includers) use the unqualified names.
using namespace bhome_shm;
using namespace bhome_msg;

// Compile-time switch for ShmMsgQueue's queue back end:
//  - defined:   the queue type is SharedQ63<0> and Recv/TryRecv take no lock;
//  - undefined: the queue type is SharedQueue<RawData> and Recv/TryRecv hold a
//               mutex guard while reading.
#define BH_USE_ATOMIC_Q

/**
 * Message queue handle backed by a queue object held in a shared-memory
 * segment (a ShmObject<...>), identified by a 64-bit MQId.
 *
 * The queue carries RawData (int64_t) values. The MsgI overloads forward the
 * message's offset value (msg.OffsetRef() when receiving, msg.Offset() when
 * sending) as that raw value.
 *
 * The queue back end is chosen at compile time by BH_USE_ATOMIC_Q:
 *  - defined:   SharedQ63<0>; Recv/TryRecv take no lock in this class.
 *  - undefined: SharedQueue<RawData>; Recv/TryRecv hold a Guard on the mutex
 *               returned by GetMutex(Id()) while reading.
 *
 * NOTE(review): the StaticDataRef<std::atomic<uint64_t>, ShmMsgQueue> base is
 * declared elsewhere; its role is not visible from this header.
 * NOTE(review): a destructor is declared while copy/move operations are left
 * implicit; confirm whether copying a ShmMsgQueue is intended.
 */
class ShmMsgQueue : public StaticDataRef<std::atomic<uint64_t>, ShmMsgQueue>
{
public:
    /// Raw value type transported through the queue.
    typedef int64_t RawData;

#ifdef BH_USE_ATOMIC_Q
    typedef ShmObject<SharedQ63<0>> Shmq;
#else
    typedef ShmObject<SharedQueue<RawData>> Shmq;
    // Alternative mutex implementations, currently disabled:
    // typedef robust::FMutex Mutex;
    // typedef robust::SemMutex Mutex;
    typedef robust::NullMutex Mutex;
    typedef robust::Guard<Mutex> Guard;
#endif

    typedef Shmq::Data Queue;      ///< Queue object type stored in shared memory.
    typedef Shmq::ShmType ShmType; ///< Shared-memory segment type.
    typedef uint64_t MQId;         ///< Queue identifier.

    /// Returns a queue id. Defined out of line; presumably yields a fresh,
    /// unused id -- confirm against the implementation.
    static MQId NewId();

    // Constructors are defined out of line. From the signatures: `id` is the
    // queue id, `segment` the shared-memory segment and `len` a queue length;
    // `create_or_else_find` presumably selects between creating the queue and
    // looking up an existing one -- confirm against the implementation.
    ShmMsgQueue(const MQId id, ShmType &segment, const int len);
    ShmMsgQueue(const MQId id, const bool create_or_else_find, ShmType &segment, const int len);
    ShmMsgQueue(ShmType &segment, const int len);
    ~ShmMsgQueue();

    /// Removes the queue identified by `id` from `shm` (defined out of line).
    /// @return bool result reported by the implementation.
    static bool Remove(ShmType &shm, const MQId id);

    /// @return the id of this queue.
    MQId Id() const { return id_; }

    /// @return the shared-memory segment reported by the underlying ShmObject.
    ShmType &shm() const { return queue_.shm(); }

    /// Reads one raw value from this queue via Queue::Read.
    /// @param val        receives the value read.
    /// @param timeout_ms timeout forwarded to Queue::Read.
    /// @return the result of Queue::Read.
    bool Recv(RawData &val, const int timeout_ms)
    {
#ifndef BH_USE_ATOMIC_Q
        // The non-atomic queue is read under the per-id mutex.
        Guard lock(GetMutex(Id()));
#endif
        return queue().Read(val, timeout_ms);
    }

    /// Reads one raw value from this queue via Queue::TryRead.
    /// @param val receives the value read.
    /// @return the result of Queue::TryRead.
    bool TryRecv(RawData &val)
    {
#ifndef BH_USE_ATOMIC_Q
        // The non-atomic queue is read under the per-id mutex.
        Guard lock(GetMutex(Id()));
#endif
        return queue().TryRead(val);
    }

    /// Receives into `msg` by reading the raw value into msg.OffsetRef().
    bool Recv(MsgI &msg, const int timeout_ms) { return Recv(msg.OffsetRef(), timeout_ms); }

    /// Non-waiting variant of Recv(MsgI &, int); reads into msg.OffsetRef().
    bool TryRecv(MsgI &msg) { return TryRecv(msg.OffsetRef()); }

    /// Looks up the queue identified by `remote_id` in `shm` (defined out of line).
    /// @return pointer to the queue; presumably null when absent -- confirm.
    static Queue *Find(ShmType &shm, const MQId remote_id);

    /// Tries to send the raw value `val` to queue `remote_id` in `shm`
    /// (defined out of line).
    static bool TrySend(ShmType &shm, const MQId remote_id, const RawData val);

    /// Tries to send `msg` to queue `remote_id` in `shm` by sending its offset.
    /// A reference is added before the send and released again if the send
    /// reports failure.
    /// @return true if the raw send succeeded.
    static bool TrySend(ShmType &shm, const MQId remote_id, MsgI msg)
    {
        bool sent = false;
        msg.AddRef(); // TODO check if we could avoid addref here.
        // Undo the AddRef above when the send did not succeed. DEFER1 is a
        // project macro; presumably it runs its statement at scope exit.
        DEFER1(if (!sent) { msg.Release(); });
        sent = TrySend(shm, remote_id, msg.Offset());
        return sent;
    }

    // Convenience overloads that send through this queue's own segment. They
    // only use the const accessor shm() and the static overloads above, so
    // they are const-qualified.
    bool TrySend(const MQId remote_id, const MsgI &msg) const { return TrySend(shm(), remote_id, msg); }
    bool TrySend(const MQId remote_id, const RawData val) const { return TrySend(shm(), remote_id, val); }

private:
#ifndef BH_USE_ATOMIC_Q
    /// Returns the mutex associated with queue `id` (defined out of line).
    static Mutex &GetMutex(const MQId id);
#endif
    MQId id_; ///< Id of this queue. Declared before queue_; keep this order.

    /// @return the queue object held by queue_.
    Queue &queue() { return *queue_.data(); }

    Shmq queue_; ///< Shared-memory object wrapping the queue.
};

#endif // end of include guard: SHM_MSG_QUEUE_D847TQXH