| | |
| | | { |
| | | using namespace bhome_shm; |
| | | |
| | | // MsgI is safe to be stored in shared memory, so POD data or offset_ptr is required. |
| | | // message format: header(meta) + body(data). |
| | | // ShmMsg is safe to be stored in shared memory, so POD data or offset_ptr is required. |
| | | // message content layout: (meta) / header_size + header + data_size + data |
| | | |
| | | // MQId is an alias for a Boost UUID. |
| | | using MQId = boost::uuids::uuid; |
| | | |
| | | class ShmMsg |
| | | { |
| | | private: |
| | | // Stores the ref count; msgs sharing the same data should also hold a pointer to the same RefCount object. |
| | | class RefCount : private boost::noncopyable |
| | | { |
| | |
| | | int Dec() { return --num_; } |
| | | int Get() { return num_.load(); } |
| | | }; |
| | | |
| | | // message content layout: header_size + header + data_size + data |
| | | class MsgI |
| | | // Integer type used both for raw addresses (Addr/Ptr) and for the stored offset_. |
| | | using Offset = int64_t; |
| | | // Reinterprets a pointer as an integer address of type Offset. |
| | | static Offset Addr(void *ptr) |
| | | { |
| | | return reinterpret_cast<Offset>(ptr); |
| | | } |
| | | // Reinterprets an integer address of type Offset as an untyped pointer. |
| | | static void *Ptr(const Offset offset) |
| | | { |
| | | return reinterpret_cast<void *>(offset); |
| | | } |
| | | static inline Offset BaseAddr() |
| | | { |
| | | private: |
| | | static const Offset base = Addr(shm().get_address()); // Computed once on the first call and cached; every later call returns that same value. |
| | | return base; |
| | | } |
| | | // Returns the SharedMemory that pshm() points at; throws std::string when none has been set. |
| | | static inline SharedMemory &shm() |
| | | { |
| | | SharedMemory *const bound = pshm(); |
| | | if (bound == nullptr) { |
| | | throw std::string("Must set ShmMsg shm before use!"); |
| | | } |
| | | return *bound; |
| | | } |
| | | // Accessor for the function-local static SharedMemory pointer (initially null). |
| | | // The pointer is returned by reference so that callers can assign to it. |
| | | static inline SharedMemory *&pshm() |
| | | { |
| | | static SharedMemory *bound_shm = nullptr; |
| | | return bound_shm; |
| | | } |
| | | |
| | | // Header constructed at the start of every allocation, directly before the payload; holds the reference count. |
| | | struct Meta |
| | | { |
| | | RefCount count_; |
| | | }; |
| | | offset_ptr<void> ptr_; |
| | | void *Alloc(SharedMemory &shm, const size_t size); |
| | | void Free(SharedMemory &shm); |
| | | Offset offset_; |
| | | // Allocates sizeof(Meta) + size bytes from shm(), constructs a Meta at the front, |
| | | // and returns the address just past that header. Returns null when the allocation fails. |
| | | void *Alloc(const size_t size) |
| | | { |
| | | void *const raw = shm().Alloc(sizeof(Meta) + size); |
| | | if (!raw) { |
| | | return raw; |
| | | } |
| | | Meta *const header = new (raw) Meta; |
| | | return header + 1; // payload starts right after the header |
| | | } |
| | | // Returns the whole allocation (Meta header included) to shm() and resets this handle to the invalid state. |
| | | // Must only be called on a valid handle. |
| | | void Free() |
| | | { |
| | | assert(valid()); |
| | | Meta *const header = meta(); // the allocation begins at the header, not at the payload |
| | | shm().Dealloc(header); |
| | | offset_ = 0; |
| | | assert(!valid()); |
| | | } |
| | | // Locates the Meta header, which sits directly before the address returned by get(). |
| | | Meta *meta() const |
| | | { |
| | | Meta *const past_header = get<Meta>(); |
| | | return past_header - 1; |
| | | } |
| | | |
| | | // Callback that Pack invokes with a destination pointer p and a length len. |
| | | using ToArray = std::function<void(void *p, int len)>; |
| | | void *Pack(SharedMemory &shm, |
| | | const uint32_t head_len, const ToArray &headToArray, |
| | | const uint32_t body_len, const ToArray &bodyToArray); |
| | | // Allocates one buffer and fills it as: head_len (32 bit) + head bytes + body_len (32 bit) + body bytes. |
| | | // headToArray / bodyToArray are called with the destination pointer and the matching length. |
| | | // Returns the payload address, or null when the allocation fails (nothing is written in that case). |
| | | void *Pack(const uint32_t head_len, const ToArray &headToArray, |
| | | const uint32_t body_len, const ToArray &bodyToArray) |
| | | { |
| | | void *addr = Alloc(sizeof(head_len) + head_len + sizeof(body_len) + body_len); |
| | | if (!addr) { |
| | | return addr; |
| | | } |
| | | char *cursor = static_cast<char *>(addr); |
| | | // head section: length prefix, then the serialized head |
| | | Put32(cursor, head_len); |
| | | cursor += sizeof(head_len); |
| | | headToArray(cursor, head_len); |
| | | cursor += head_len; |
| | | // body section: length prefix, then the serialized body |
| | | Put32(cursor, body_len); |
| | | cursor += sizeof(body_len); |
| | | bodyToArray(cursor, body_len); |
| | | return addr; |
| | | } |
| | | |
| | | template <class Body> |
| | | void *Pack(SharedMemory &shm, const BHMsgHead &head, const Body &body) |
| | | void *Pack(const BHMsgHead &head, const Body &body) |
| | | { |
| | | return Pack( |
| | | shm, |
| | | uint32_t(head.ByteSizeLong()), [&](void *p, int len) { head.SerializeToArray(p, len); }, |
| | | uint32_t(body.ByteSizeLong()), [&](void *p, int len) { body.SerializeToArray(p, len); }); |
| | | } |
| | | |
| | | void *Pack(SharedMemory &shm, const std::string &content) |
| | | void *Pack(const std::string &content) |
| | | { |
| | | void *addr = Alloc(shm, content.size()); |
| | | void *addr = Alloc(content.size()); |
| | | if (addr) { |
| | | memcpy(addr, content.data(), content.size()); |
| | | } |
| | | return addr; |
| | | } |
| | | |
| | | bool Make(SharedMemory &shm, void *addr); |
| | | MsgI(void *p) : |
| | | ptr_(p) {} |
| | | // Makes this handle refer to addr by swapping with a handle built from it. |
| | | // A null addr leaves this handle untouched and yields false; otherwise returns true. |
| | | bool Make(void *addr) |
| | | { |
| | | const bool usable = (addr != nullptr); |
| | | if (usable) { |
| | | ShmMsg fresh(addr); |
| | | fresh.swap(*this); |
| | | } |
| | | return usable; |
| | | } |
| | | // Wraps a payload address: stores it as an offset from BaseAddr(), or 0 when p is null. |
| | | ShmMsg(void *p) : |
| | | offset_(0) |
| | | { |
| | | if (p) { |
| | | offset_ = Addr(p) - BaseAddr(); |
| | | } |
| | | } |
| | | |
| | | // Resolves the stored offset against BaseAddr() and returns the result as T*. |
| | | // No validity check is made: with offset_ == 0 the result is the base address, not nullptr. |
| | | template <class T = void> |
| | | T *get() const |
| | | { |
| | | void *const raw = Ptr(offset_ + BaseAddr()); |
| | | return static_cast<T *>(raw); |
| | | } |
| | | |
| | | public: |
| | | MsgI() : |
| | | MsgI(nullptr) {} |
| | | MsgI(SharedMemory &shm, const size_t size) : |
| | | MsgI(Alloc(shm, size)) {} |
| | | void swap(MsgI &a) { std::swap(ptr_, a.ptr_); } |
| | | bool valid() const { return static_cast<bool>(ptr_); } |
| | | template <class T = void> |
| | | T *get() const { return static_cast<T *>(ptr_.get()); } |
| | | // Stores the address of shm in the pointer exposed by pshm(). |
| | | // Asserts that nothing was stored before; always returns true. |
| | | static bool BindShm(SharedMemory &shm) |
| | | { |
| | | SharedMemory *&slot = pshm(); |
| | | assert(!slot); |
| | | slot = &shm; |
| | | return true; |
| | | } |
| | | |
| | | // Default-constructs an invalid handle by delegating with a null address. |
| | | ShmMsg() |
| | | : ShmMsg(nullptr) |
| | | { |
| | | } |
| | | // Allocates a payload of size bytes via Alloc() and wraps the returned address. |
| | | explicit ShmMsg(const size_t size) |
| | | : ShmMsg(Alloc(size)) |
| | | { |
| | | } |
| | | // Exchanges the stored offsets of the two handles. Declared noexcept because swapping two integers cannot throw. |
| | | void swap(ShmMsg &a) noexcept { std::swap(offset_, a.offset_); } |
| | | // A handle is valid exactly when its stored offset is non-zero. |
| | | bool valid() const |
| | | { |
| | | return offset_ != 0; |
| | | } |
| | | |
| | | // AddRef and Release work for both counted and uncounted msgs. |
| | | int AddRef() const |
| | | { |
| | | // An invalid handle has no Meta to update; report 1 in that case. |
| | | if (!valid()) { |
| | | return 1; |
| | | } |
| | | return meta()->count_.Inc(); |
| | | } |
| | | int Release(SharedMemory &shm); |
| | | // Drops one reference and returns the count after the decrement. |
| | | // Calls Free() when that count reaches 0. An invalid handle is left alone and yields 0. |
| | | int Release() |
| | | { |
| | | int remaining = 0; |
| | | if (valid()) { |
| | | remaining = meta()->count_.Dec(); |
| | | if (remaining == 0) { |
| | | Free(); |
| | | } |
| | | } |
| | | return remaining; |
| | | } |
| | | // Returns the current reference count, or 1 for an invalid handle. |
| | | int Count() const |
| | | { |
| | | if (!valid()) { |
| | | return 1; |
| | | } |
| | | return meta()->count_.Get(); |
| | | } |
| | | |
| | | template <class Body> |
| | | inline bool Make(SharedMemory &shm, const BHMsgHead &head, const Body &body) |
| | | { |
| | | return Make(shm, Pack(shm, head, body)); |
| | | } |
| | | inline bool Make(const BHMsgHead &head, const Body &body) { return Make(Pack(head, body)); } |
| | | // Packs content into newly allocated memory and adopts the result. |
| | | // Returns false when Pack yields a null address. |
| | | inline bool Make(const std::string &content) |
| | | { |
| | | void *const packed = Pack(content); |
| | | return Make(packed); |
| | | } |
| | | template <class Body> |
| | | static inline std::string Serialize(const BHMsgHead &head, const Body &body) |
| | | { |
| | |
| | | assert(pos == s.size()); |
| | | return s; |
| | | } |
| | | inline bool Make(SharedMemory &shm, const std::string &content) |
| | | { |
| | | void *p = Pack(shm, content); |
| | | return Make(shm, p); |
| | | } |
| | | |
| | | bool ParseHead(BHMsgHead &head) const; |
| | | // Reads the 32-bit length at the start of the payload and parses that many following bytes into head. |
| | | // Returns whatever head.ParseFromArray reports. |
| | | bool ParseHead(BHMsgHead &head) const |
| | | { |
| | | char *start = get<char>(); |
| | | assert(start); |
| | | const uint32_t head_len = Get32(start); |
| | | // the head bytes begin right after the 4-byte length prefix |
| | | return head.ParseFromArray(start + sizeof(uint32_t), head_len); |
| | | } |
| | | template <class Body> |
| | | bool ParseBody(Body &body) const |
| | | { |
| | | auto p = static_cast<char *>(ptr_.get()); |
| | | auto p = get<char>(); |
| | | assert(p); |
| | | uint32_t size = Get32(p); |
| | | p += 4; |
| | |
| | | } |
| | | }; |
| | | |
| | | inline void swap(MsgI &m1, MsgI &m2) { m1.swap(m2); } |
| | | // Non-member swap for ShmMsg, forwarding to the member swap. Declared noexcept because only stored offsets are exchanged. |
| | | inline void swap(ShmMsg &m1, ShmMsg &m2) noexcept { m1.swap(m2); } |
| | | |
| | | // MsgI names the same type as ShmMsg (presumably kept so code written against MsgI still compiles — confirm). |
| | | using MsgI = ShmMsg; |
| | | |
| | | } // namespace bhome_msg |
| | | |