/*
 * =====================================================================================
 *
 *       Filename:  shm_queue.cpp
 *
 *    Description:  ShmMsgQueue implementation: queue naming, construction,
 *                  sending to a remote queue and receiving from the local one.
 *
 *        Version:  1.0
 *        Created:  2021-03-25 10:34:42
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (),
 *   Organization:
 *
 * =====================================================================================
 */
#include "shm_queue.h"
// NOTE(review): the targets of the next two includes were missing from the
// reviewed text; they are restored from the boost::uuids names used below
// (nil_uuid / random_generator / to_string) -- confirm against the repository.
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <cstdlib>
#include <cstring>
#include <string>
#include "bh_util.h"

namespace bhome_shm
{
using namespace bhome_msg;
using namespace boost::interprocess;
using namespace boost::uuids;

namespace
{
// Name under which the queue with the given id is stored: "shmq" + textual id.
std::string MsgQIdToName(const MQId &id)
{
	return "shmq" + to_string(id);
}

// Id value written to Recv()'s source_id when nothing was received.
MQId EmptyId()
{
	return nil_uuid();
}

// Freshly generated id for a queue constructed without an explicit one.
MQId NewId()
{
	return random_generator()();
}

// Sanitise a requested queue length:
//   len <= 0          -> kDefaultLen
//   len >= kMaxLength -> kMaxLength
//   otherwise         -> len
int AdjustMQLength(const int len)
{
	constexpr int kMaxLength = 10000;
	constexpr int kDefaultLen = 12;
	if (len <= 0) {
		return kDefaultLen;
	}
	return (len < kMaxLength) ? len : kMaxLength;
}
} // namespace

// ShmMsgQueue memory usage: (320 + 16*length) bytes, length >= 2

// Construct the queue for `id` inside `segment`.
// `len` is passed through AdjustMQLength() before being handed to the base class.
ShmMsgQueue::ShmMsgQueue(const MQId &id, ShmType &segment, const int len) :
    Super(segment, MsgQIdToName(id), AdjustMQLength(len), segment.get_segment_manager()),
    id_(id)
{
}

// Construct a queue with a newly generated id.
ShmMsgQueue::ShmMsgQueue(ShmType &segment, const int len) :
    ShmMsgQueue(NewId(), segment, len)
{
}

// Calls Remove() when this handle is destroyed.
ShmMsgQueue::~ShmMsgQueue()
{
	Remove();
}

// Send an already built message to the queue identified by `remote_id`.
// Returns false when no queue with that name is found or when Write() fails.
// The callback handed to Write() calls AddRef() on the message
// (presumably invoked when the message is actually enqueued -- confirm in Write()).
bool ShmMsgQueue::Send(const MQId &remote_id, const Msg &msg, const int timeout_ms)
{
	Queue *remote = find(MsgQIdToName(remote_id));
	return remote && remote->Write(msg, timeout_ms, [](const Msg &m) { m.AddRef(); });
}

// Build a message from `data`/`size` (with this queue's Id() as source) and send
// it to `remote_id`. If the send fails the freshly built message is released
// again. Returns true only when both building and sending succeeded.
bool ShmMsgQueue::Send(const MQId &remote_id, const void *data, const size_t size, const int timeout_ms)
{
	// Test shows that in the 2 cases:
	// 1) build msg first, then find remote queue;
	// 2) find remote queue first, then build msg;
	// 1 is about 50% faster than 2, maybe cache related.
	Msg msg;
	if (!msg.Build(shm(), Id(), data, size, false)) {
		return false;
	}
	if (Send(remote_id, msg, timeout_ms)) {
		return true;
	}
	msg.Release(shm());
	return false;
}

// Read one message from this queue and copy its payload into a malloc'ed buffer.
// On success: `source_id` is the sender id from the message meta, `size` is the
// payload size, `data` points to a buffer the caller must free(); returns true.
// On failure: `source_id` is EmptyId(), `data` is nullptr, `size` is 0; returns false.
bool ShmMsgQueue::Recv(MQId &source_id, void *&data, size_t &size, const int timeout_ms)
{
	Msg msg;
	if (Read(msg, timeout_ms)) {
		// Release the message on every path once it has been read.
		DEFER1(msg.Release(shm()););
		auto ptr = msg.get();
		if (ptr) {
			MsgMetaV1 meta;
			meta.Parse(ptr);
			source_id = meta.src_id_;
			size = meta.data_size_;
			// malloc(0) may legally return null, which would turn a valid empty
			// message into a reported failure; always request at least one byte.
			data = std::malloc(size > 0 ? size : 1);
			if (data) {
				// The payload follows the meta header inside the message.
				std::memcpy(data, ptr + meta.self_size_, size);
				return true;
			}
		}
	}
	source_id = EmptyId();
	data = nullptr;
	size = 0;
	return false;
}

} // namespace bhome_shm