| | |
| | | return std::string(buf, n + 4); |
| | | } |
| | | |
| | | const int AdjustMQLength(const int len) |
| | | { |
| | | const int kMaxLength = 10000; |
| | | const int kDefaultLen = 12; |
| | | if (len <= 0) { |
| | | return kDefaultLen; |
| | | } else if (len < kMaxLength) { |
| | | return len; |
| | | } else { |
| | | return kMaxLength; |
| | | } |
| | | } |
| | | |
| | | } // namespace |
| | | |
| | | ShmMsgQueue::MQId ShmMsgQueue::NewId() |
| | | { |
| | | static auto &id = GetData(); |
| | | return ++id; |
| | | static auto &id = GetData("Must init shared memory before use! Please make sure center is running."); |
| | | return (++id) * 10; |
| | | } |
| | | // ShmMsgQueue memory usage: (320 + 16*length) bytes, length >= 2 |
| | | ShmMsgQueue::ShmMsgQueue(const MQId id, ShmType &segment, const int len) : |
| | | |
| | | ShmMsgQueue::ShmMsgQueue(ShmType &segment, const MQId id, const int len) : |
| | | id_(id), |
| | | queue_(segment, MsgQIdToName(id_), AdjustMQLength(len), segment.get_segment_manager()) |
| | | queue_(segment, MsgQIdToName(id_), len, segment.get_segment_manager()) |
| | | { |
| | | } |
| | | |
| | | ShmMsgQueue::ShmMsgQueue(ShmType &segment, const int len) : |
| | | id_(NewId()), |
| | | queue_(segment, true, MsgQIdToName(id_), AdjustMQLength(len), segment.get_segment_manager()) |
| | | ShmMsgQueue::ShmMsgQueue(ShmType &segment, const bool create_or_else_find, const MQId id, const int len) : |
| | | id_(id), |
| | | queue_(segment, create_or_else_find, MsgQIdToName(id_), len, segment.get_segment_manager()) |
| | | { |
| | | if (!queue_.IsOk()) { |
| | | throw("error create msgq " + std::to_string(id_)); |
| | | throw("error create/find msgq " + std::to_string(id_)); |
| | | } |
| | | } |
| | | ShmMsgQueue::ShmMsgQueue(const int64_t abs_addr, ShmType &segment, const MQId id) : |
| | | id_(id), queue_(abs_addr, segment, MsgQIdToName(id_)) |
| | | { |
| | | //TODO check some tag. |
| | | } |
| | | |
| | | ShmMsgQueue::~ShmMsgQueue() {} |
| | | |
| | | #ifndef BH_USE_ATOMIC_Q |
| | | ShmMsgQueue::Mutex &ShmMsgQueue::GetMutex(const MQId id) |
| | | { |
| | | static std::unordered_map<MQId, std::shared_ptr<Mutex>> imm; |
| | | |
| | | static std::mutex mtx; |
| | | std::lock_guard<std::mutex> lock(mtx); |
| | | auto pos = imm.find(id); |
| | | if (pos == imm.end()) { |
| | | pos = imm.emplace(id, new Mutex(id)).first; |
| | | // pos = imm.emplace(id, new Mutex()).first; |
| | | } |
| | | return *pos->second; |
| | | } |
| | | #endif |
| | | |
| | | bool ShmMsgQueue::Remove(SharedMemory &shm, const MQId id) |
| | | { |
| | | Queue *q = Find(shm, id); |
| | | if (q) { |
| | | MsgI msg; |
| | | while (q->TryRead(msg)) { |
| | | msg.Release(); |
| | | RawData val = 0; |
| | | while (q->TryRead(val)) { |
| | | if (IsCmd(val)) { |
| | | LOG_DEBUG() << "clsing queue " << id << ", has a cmd" << DecodeCmd(val); |
| | | } else { |
| | | MsgI(val).Release(); |
| | | } |
| | | } |
| | | } |
| | | return Shmq::Remove(shm, MsgQIdToName(id)); |
| | |
| | | return Shmq::Find(shm, MsgQIdToName(remote_id)); |
| | | } |
| | | |
| | | bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, const MsgI &msg, OnSend const &onsend) |
| | | bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQInfo &remote, const RawData val) |
| | | { |
| | | Queue *remote = Find(shm, remote_id); |
| | | if (remote) { |
| | | if (onsend) { |
| | | return remote->TryWrite(msg, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); }); |
| | | } else { |
| | | return remote->TryWrite(msg, [](const MsgI &msg) { msg.AddRef(); }); |
| | | } |
| | | } else { |
| | | // SetLestError(eNotFound); |
| | | try { |
| | | //TODO find from center, or use offset. |
| | | ShmMsgQueue dest(remote.offset_, shm, remote.id_); |
| | | #ifndef BH_USE_ATOMIC_Q |
| | | Guard lock(GetMutex(remote_id)); |
| | | #endif |
| | | return dest.queue().TryWrite(val); |
| | | } catch (...) { |
| | | // SetLastError(eNotFound, "remote not found"); |
| | | return false; |
| | | } |
| | | } |