/* * ===================================================================================== * * Filename: msg.h * * Description: * * Version: 1.0 * Created: 2021年03月24日 16时49分20秒 * Revision: none * Compiler: gcc * * Author: Li Chao (), * Organization: * * ===================================================================================== */ #ifndef MSG_5BILLZET #define MSG_5BILLZET #include "bh_util.h" #include "proto.h" #include "shm.h" #include #include #include #include #include namespace bhome_msg { using namespace bhome_shm; // MsgI is safe to be stored in shared memory, so POD data or offset_ptr is required. // message format: header(meta) + body(data). typedef boost::uuids::uuid MQId; // store ref count, msgs shareing the same data should also hold a pointer of the same RefCount object. class RefCount : private boost::noncopyable { std::atomic num_; public: RefCount() : num_(1) { static_assert(std::is_pod::value); } int Inc() { return ++num_; } int Dec() { return --num_; } int Get() { return num_.load(); } }; // message content layout: header_size + header + data_size + data class MsgI { private: struct Meta { RefCount count_; }; offset_ptr ptr_; void *Alloc(SharedMemory &shm, const size_t size); void Free(SharedMemory &shm); Meta *meta() const { return get() - 1; } typedef std::function ToArray; void *Pack(SharedMemory &shm, const uint32_t head_len, const ToArray &headToArray, const uint32_t body_len, const ToArray &bodyToArray); template void *Pack(SharedMemory &shm, const BHMsgHead &head, const Body &body) { return Pack( shm, uint32_t(head.ByteSizeLong()), [&](void *p, int len) { head.SerializeToArray(p, len); }, uint32_t(body.ByteSizeLong()), [&](void *p, int len) { body.SerializeToArray(p, len); }); } void *Pack(SharedMemory &shm, const std::string &content) { void *addr = Alloc(shm, content.size()); if (addr) { memcpy(addr, content.data(), content.size()); } return addr; } bool Make(SharedMemory &shm, void *addr); MsgI(void *p) : ptr_(p) {} public: MsgI() : MsgI(nullptr) {} MsgI(SharedMemory &shm, const size_t size) : MsgI(Alloc(shm, size)) {} void swap(MsgI &a) { std::swap(ptr_, a.ptr_); } bool valid() const { return static_cast(ptr_); } template T *get() const { return static_cast(ptr_.get()); } // AddRef and Release works for both counted and not counted msg. int AddRef() const { return valid() ? meta()->count_.Inc() : 1; } int Release(SharedMemory &shm); int Count() const { return valid() ? meta()->count_.Get() : 1; } template inline bool Make(SharedMemory &shm, const BHMsgHead &head, const Body &body) { return Make(shm, Pack(shm, head, body)); } template static inline std::string Serialize(const BHMsgHead &head, const Body &body) { uint32_t head_len = head.ByteSizeLong(); uint32_t body_len = body.ByteSizeLong(); std::string s(4 + head_len + 4 + body_len, '\0'); size_t pos = 0; auto add1 = [&](auto &&msg, auto &&size) { Put32(&s[pos], size); pos += 4; msg.SerializeToArray(&s[pos], size); pos += size; }; add1(head, head_len); add1(body, body_len); assert(pos == s.size()); return s; } inline bool Make(SharedMemory &shm, const std::string &content) { void *p = Pack(shm, content); return Make(shm, p); } bool ParseHead(BHMsgHead &head) const; template bool ParseBody(Body &body) const { auto p = static_cast(ptr_.get()); assert(p); uint32_t size = Get32(p); p += 4; p += size; size = Get32(p); p += 4; return body.ParseFromArray(p, size); } }; inline void swap(MsgI &m1, MsgI &m2) { m1.swap(m2); } } // namespace bhome_msg #endif // end of include guard: MSG_5BILLZET