| | |
| | | |
| | | /// Constructs the queue wrapper for `id`. |
| | | /// |
| | | /// @param id      Queue identifier; stored in `id_` and turned into the object |
| | | ///                name via MsgQIdToName(). |
| | | /// @param segment Shared-memory segment handed to `queue_`; its segment manager |
| | | ///                is also forwarded. |
| | | /// @param len     Queue length forwarded to `queue_`. |
| | | ShmMsgQueue::ShmMsgQueue(const MQId id, ShmType &segment, const int len) : |
| | |     id_(id), |
| | |     // Exactly one initializer for queue_. A second, stale initializer that had |
| | |     // the length and segment-manager arguments commented out used to precede |
| | |     // this one; two initializers for the same member are ill-formed. |
| | |     queue_(segment, MsgQIdToName(id_), len, segment.get_segment_manager()) |
| | | { |
| | | } |
| | | |
| | | ShmMsgQueue::ShmMsgQueue(const MQId id, const bool create_or_else_find, ShmType &segment, const int len) : |
| | | id_(id), |
| | | queue_(segment, create_or_else_find, MsgQIdToName(id_)) |
| | | queue_(segment, create_or_else_find, MsgQIdToName(id_), len, segment.get_segment_manager()) |
| | | { |
| | | if (!queue_.IsOk()) { |
| | | throw("error create/find msgq " + std::to_string(id_)); |
| | |
| | | |
| | | /// Destructor: does nothing beyond destroying the members. |
| | | ShmMsgQueue::~ShmMsgQueue() = default; |
| | | |
| | | /// Returns the mutex associated with queue `id`, creating it on first use. |
| | | /// |
| | | /// Mutexes are cached in a function-local static map keyed by `id`, and every |
| | | /// lookup/insert is serialised by a function-local static std::mutex. The map |
| | | /// is reachable only from this function and nothing here erases entries, so |
| | | /// the returned reference stays valid after the call returns. |
| | | /// |
| | | /// @param id Queue identifier; also passed to the Mutex constructor. |
| | | /// @return Reference to the cached Mutex for `id`. |
| | | ShmMsgQueue::Mutex &ShmMsgQueue::GetMutex(const MQId id) |
| | | { |
| | |     static std::unordered_map<MQId, std::shared_ptr<Mutex>> imm; |
| | |     static std::mutex mtx; |
| | | |
| | |     std::lock_guard<std::mutex> lock(mtx); |
| | |     auto pos = imm.find(id); |
| | |     if (pos == imm.end()) { |
| | |         // make_shared gives the Mutex an owner before the map allocates its |
| | |         // node, so the object cannot leak if the insertion throws. |
| | |         pos = imm.emplace(id, std::make_shared<Mutex>(id)).first; |
| | |     } |
| | |     return *pos->second; |
| | | } |
| | | bool ShmMsgQueue::Remove(SharedMemory &shm, const MQId id) |
| | | { |
| | | Queue *q = Find(shm, id); |
| | |
| | | |
| | | bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, MsgI msg) |
| | | { |
| | | Queue *remote = Find(shm, remote_id); |
| | | bool r = false; |
| | | if (remote) { |
| | | try { |
| | | ShmMsgQueue dest(remote_id, false, shm, 1); |
| | | msg.AddRef(); |
| | | r = remote->TryWrite(msg.Offset()); |
| | | if (!r) { |
| | | msg.Release(); |
| | | } |
| | | DEFER1(if (!r) { msg.Release(); }); |
| | | |
| | | Guard lock(GetMutex(remote_id)); |
| | | r = dest.queue().TryWrite(msg.Offset()); |
| | | } catch (...) { |
| | | } |
| | | return r; |
| | | } |