| | |
| | | |
| | | ShmMsgQueue::~ShmMsgQueue() {} |
| | | |
| | | #ifndef BH_USE_ATOMIC_Q |
| | | ShmMsgQueue::Mutex &ShmMsgQueue::GetMutex(const MQId id) |
| | | { |
| | | static std::unordered_map<MQId, std::shared_ptr<Mutex>> imm; |
| | |
| | | } |
| | | return *pos->second; |
| | | } |
| | | #endif |
| | | |
| | | bool ShmMsgQueue::Remove(SharedMemory &shm, const MQId id) |
| | | { |
| | | Queue *q = Find(shm, id); |
| | | if (q) { |
| | | MsgI msg; |
| | | while (q->TryRead(msg.OffsetRef())) { |
| | | msg.Release(); |
| | | RawData val = 0; |
| | | while (q->TryRead(val)) { |
| | | if (IsCmd(val)) { |
| | | LOG_DEBUG() << "clsing queue " << id << ", has a cmd" << DecodeCmd(val); |
| | | } else { |
| | | MsgI(val).Release(); |
| | | } |
| | | } |
| | | } |
| | | return Shmq::Remove(shm, MsgQIdToName(id)); |
| | |
| | | return Shmq::Find(shm, MsgQIdToName(remote_id)); |
| | | } |
| | | |
| | | bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, MsgI msg) |
| | | bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, int64_t val) |
| | | { |
| | | bool r = false; |
| | | try { |
| | | ShmMsgQueue dest(remote_id, false, shm, 1); |
| | | msg.AddRef(); |
| | | DEFER1(if (!r) { msg.Release(); }); |
| | | |
| | | #ifndef BH_USE_ATOMIC_Q |
| | | Guard lock(GetMutex(remote_id)); |
| | | r = dest.queue().TryWrite(msg.Offset()); |
| | | #endif |
| | | return dest.queue().TryWrite(val); |
| | | } catch (...) { |
| | | // SetLastError(eNotFound, "remote not found"); |
| | | return false; |
| | | } |
| | | return r; |
| | | } |
| | | |
| | | // Test shows that in the 2 cases: |