From 02ba913dc7bb5d711471b27f2ea23a897d0f2e28 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 23 四月 2021 15:34:26 +0800 Subject: [PATCH] bind msgi to shm, change offset_ptr to abs offset. --- src/msg.h | 173 ++++++++++++++++++++++++++++++++++++++++----------------- 1 files changed, 122 insertions(+), 51 deletions(-) diff --git a/src/msg.h b/src/msg.h index 452567e..99b3a09 100644 --- a/src/msg.h +++ b/src/msg.h @@ -31,83 +31,150 @@ { using namespace bhome_shm; -// MsgI is safe to be stored in shared memory, so POD data or offset_ptr is required. -// message format: header(meta) + body(data). +// ShmMsg is safe to be stored in shared memory, so POD data or offset_ptr is required. +// message content layout: (meta) / header_size + header + data_size + data typedef boost::uuids::uuid MQId; -// store ref count, msgs shareing the same data should also hold a pointer of the same RefCount object. -class RefCount : private boost::noncopyable -{ - std::atomic<int> num_; - -public: - RefCount() : - num_(1) { static_assert(std::is_pod<decltype(num_)>::value); } - int Inc() { return ++num_; } - int Dec() { return --num_; } - int Get() { return num_.load(); } -}; - -// message content layout: header_size + header + data_size + data -class MsgI +class ShmMsg { private: + // store ref count, msgs shareing the same data should also hold a pointer of the same RefCount object. + class RefCount : private boost::noncopyable + { + std::atomic<int> num_; + + public: + RefCount() : + num_(1) { static_assert(std::is_pod<decltype(num_)>::value); } + int Inc() { return ++num_; } + int Dec() { return --num_; } + int Get() { return num_.load(); } + }; + typedef int64_t Offset; + static Offset Addr(void *ptr) { return reinterpret_cast<Offset>(ptr); } + static void *Ptr(const Offset offset) { return reinterpret_cast<void *>(offset); } + static inline Offset BaseAddr() + { + static const Offset base = Addr(shm().get_address()); // cache value. + return base; + } + static inline SharedMemory &shm() + { + if (!pshm()) { throw std::string("Must set ShmMsg shm before use!"); } + return *pshm(); + } + static inline SharedMemory *&pshm() + { + static SharedMemory *pshm = 0; + return pshm; + } + struct Meta { RefCount count_; }; - offset_ptr<void> ptr_; - void *Alloc(SharedMemory &shm, const size_t size); - void Free(SharedMemory &shm); + Offset offset_; + void *Alloc(const size_t size) + { + void *p = shm().Alloc(sizeof(Meta) + size); + if (p) { + auto pmeta = new (p) Meta; + p = pmeta + 1; + } + return p; + } + void Free() + { + assert(valid()); + shm().Dealloc(meta()); + offset_ = 0; + assert(!valid()); + } Meta *meta() const { return get<Meta>() - 1; } typedef std::function<void(void *p, int len)> ToArray; - void *Pack(SharedMemory &shm, - const uint32_t head_len, const ToArray &headToArray, - const uint32_t body_len, const ToArray &bodyToArray); + void *Pack(const uint32_t head_len, const ToArray &headToArray, + const uint32_t body_len, const ToArray &bodyToArray) + { + void *addr = Alloc(sizeof(head_len) + head_len + sizeof(body_len) + body_len); + if (addr) { + auto p = static_cast<char *>(addr); + auto Pack1 = [&p](auto len, auto &writer) { + Put32(p, len); + p += sizeof(len); + writer(p, len); + p += len; + }; + Pack1(head_len, headToArray); + Pack1(body_len, bodyToArray); + } + return addr; + } template <class Body> - void *Pack(SharedMemory &shm, const BHMsgHead &head, const Body &body) + void *Pack(const BHMsgHead &head, const Body &body) { return Pack( - shm, uint32_t(head.ByteSizeLong()), [&](void *p, int len) { head.SerializeToArray(p, len); }, uint32_t(body.ByteSizeLong()), [&](void *p, int len) { body.SerializeToArray(p, len); }); } - void *Pack(SharedMemory &shm, const std::string &content) + void *Pack(const std::string &content) { - void *addr = Alloc(shm, content.size()); + void *addr = Alloc(content.size()); if (addr) { memcpy(addr, content.data(), content.size()); } return addr; } - bool Make(SharedMemory &shm, void *addr); - MsgI(void *p) : - ptr_(p) {} + bool Make(void *addr) + { + if (!addr) { + return false; + } + ShmMsg(addr).swap(*this); + return true; + } + ShmMsg(void *p) : + offset_(p ? (Addr(p) - BaseAddr()) : 0) {} + + template <class T = void> + T *get() const { return static_cast<T *>(Ptr(offset_ + BaseAddr())); } public: - MsgI() : - MsgI(nullptr) {} - MsgI(SharedMemory &shm, const size_t size) : - MsgI(Alloc(shm, size)) {} - void swap(MsgI &a) { std::swap(ptr_, a.ptr_); } - bool valid() const { return static_cast<bool>(ptr_); } - template <class T = void> - T *get() const { return static_cast<T *>(ptr_.get()); } + static bool BindShm(SharedMemory &shm) + { + assert(!pshm()); + pshm() = &shm; + return true; + } + + ShmMsg() : + ShmMsg(nullptr) {} + explicit ShmMsg(const size_t size) : + ShmMsg(Alloc(size)) {} + void swap(ShmMsg &a) { std::swap(offset_, a.offset_); } + bool valid() const { return static_cast<bool>(offset_); } // AddRef and Release works for both counted and not counted msg. int AddRef() const { return valid() ? meta()->count_.Inc() : 1; } - int Release(SharedMemory &shm); + int Release() + { + if (!valid()) { + return 0; + } + auto n = meta()->count_.Dec(); + if (n == 0) { + Free(); + } + return n; + } int Count() const { return valid() ? meta()->count_.Get() : 1; } template <class Body> - inline bool Make(SharedMemory &shm, const BHMsgHead &head, const Body &body) - { - return Make(shm, Pack(shm, head, body)); - } + inline bool Make(const BHMsgHead &head, const Body &body) { return Make(Pack(head, body)); } + inline bool Make(const std::string &content) { return Make(Pack(content)); } template <class Body> static inline std::string Serialize(const BHMsgHead &head, const Body &body) { @@ -126,17 +193,19 @@ assert(pos == s.size()); return s; } - inline bool Make(SharedMemory &shm, const std::string &content) - { - void *p = Pack(shm, content); - return Make(shm, p); - } - bool ParseHead(BHMsgHead &head) const; + bool ParseHead(BHMsgHead &head) const + { + auto p = get<char>(); + assert(p); + uint32_t msg_size = Get32(p); + p += 4; + return head.ParseFromArray(p, msg_size); + } template <class Body> bool ParseBody(Body &body) const { - auto p = static_cast<char *>(ptr_.get()); + auto p = get<char>(); assert(p); uint32_t size = Get32(p); p += 4; @@ -147,7 +216,9 @@ } }; -inline void swap(MsgI &m1, MsgI &m2) { m1.swap(m2); } +inline void swap(ShmMsg &m1, ShmMsg &m2) { m1.swap(m2); } + +typedef ShmMsg MsgI; } // namespace bhome_msg -- Gitblit v1.8.0