/* * ===================================================================================== * * Filename: msg.h * * Description: * * Version: 1.0 * Created: 2021年03月24日 16时49分20秒 * Revision: none * Compiler: gcc * * Author: Li Chao (), * Organization: * * ===================================================================================== */ #ifndef MSG_5BILLZET #define MSG_5BILLZET #include "bh_util.h" #include "proto.h" #include "shm.h" #include #include #include #include namespace bhome_msg { using namespace bhome_shm; // ShmMsg is safe to be stored in shared memory, so POD data or offset_ptr is required. // message content layout: (meta) / header_size + header + data_size + data class ShmMsg : private StaticDataRef { private: static inline SharedMemory &shm() { return GetData(); } // store ref count, msgs shareing the same data should also hold a pointer of the same RefCount object. class RefCount : private boost::noncopyable { std::atomic num_; public: RefCount() : num_(1) { static_assert(std::is_pod::value); } int Inc() { return ++num_; } int Dec() { return --num_; } int Get() { return num_.load(); } }; typedef int64_t OffsetType; static OffsetType Addr(void *ptr) { return reinterpret_cast(ptr); } static void *Ptr(const OffsetType offset) { return reinterpret_cast(offset); } static inline OffsetType BaseAddr() { static const OffsetType base = Addr(shm().get_address()); // cache value. return base; } static const uint32_t kMsgTag = 0xf1e2d3c4; struct Meta { RefCount count_; const uint32_t tag_ = kMsgTag; const uint32_t size_ = 0; Meta(uint32_t size) : size_(size) {} }; OffsetType offset_; void *Alloc(const size_t size) { void *p = shm().Alloc(sizeof(Meta) + size); if (p) { auto pmeta = new (p) Meta(size); p = pmeta + 1; } return p; } void Free() { assert(valid()); shm().Dealloc(meta()); offset_ = 0; assert(!valid()); } Meta *meta() const { return get() - 1; } typedef std::function ToArray; void *Pack(const uint32_t head_len, const ToArray &headToArray, const uint32_t body_len, const ToArray &bodyToArray) { void *addr = Alloc(sizeof(head_len) + head_len + sizeof(body_len) + body_len); if (addr) { auto p = static_cast(addr); auto Pack1 = [&p](auto len, auto &writer) { Put32(p, len); p += sizeof(len); writer(p, len); p += len; }; Pack1(head_len, headToArray); Pack1(body_len, bodyToArray); } return addr; } template void *Pack(const BHMsgHead &head, const Body &body) { return Pack( uint32_t(head.ByteSizeLong()), [&](void *p, int len) { head.SerializeToArray(p, len); }, uint32_t(body.ByteSizeLong()), [&](void *p, int len) { body.SerializeToArray(p, len); }); } void *Pack(const std::string &content) { void *addr = Alloc(content.size()); if (addr) { memcpy(addr, content.data(), content.size()); } return addr; } bool Make(void *addr) { if (!addr) { return false; } ShmMsg(addr).swap(*this); return true; } ShmMsg(void *p) : offset_(p ? (Addr(p) - BaseAddr()) : 0) {} template T *get() const { return static_cast(Ptr(offset_ + BaseAddr())); } public: static bool BindShm(SharedMemory &shm) { return SetData(shm); } ShmMsg() : ShmMsg(nullptr) {} explicit ShmMsg(const OffsetType offset) : offset_(offset) {} OffsetType Offset() const { return offset_; } OffsetType &OffsetRef() { return offset_; } void swap(ShmMsg &a) { std::swap(offset_, a.offset_); } bool valid() const { return static_cast(offset_) && meta()->tag_ == kMsgTag; } int AddRef() const { return valid() ? meta()->count_.Inc() : 1; } int Release() { if (!valid()) { return 0; } auto n = meta()->count_.Dec(); if (n == 0) { Free(); } return n; } int Count() const { return valid() ? meta()->count_.Get() : 1; } template inline bool Make(const BHMsgHead &head, const Body &body) { return Make(Pack(head, body)); } inline bool Make(const std::string &content) { return Make(Pack(content)); } template static inline std::string Serialize(const BHMsgHead &head, const Body &body) { uint32_t head_len = head.ByteSizeLong(); uint32_t body_len = body.ByteSizeLong(); std::string s(4 + head_len + 4 + body_len, '\0'); size_t pos = 0; auto add1 = [&](auto &&msg, auto &&size) { Put32(&s[pos], size); pos += 4; msg.SerializeToArray(&s[pos], size); pos += size; }; add1(head, head_len); add1(body, body_len); assert(pos == s.size()); return s; } bool ParseHead(BHMsgHead &head) const { auto p = get(); assert(p); uint32_t msg_size = Get32(p); p += 4; return head.ParseFromArray(p, msg_size); } template bool ParseBody(Body &body) const { auto p = get(); assert(p); uint32_t size = Get32(p); p += 4; p += size; size = Get32(p); p += 4; return body.ParseFromArray(p, size); } }; inline void swap(ShmMsg &m1, ShmMsg &m2) { m1.swap(m2); } typedef ShmMsg MsgI; } // namespace bhome_msg #endif // end of include guard: MSG_5BILLZET