/* * ===================================================================================== * * Filename: msg.h * * Description: * * Version: 1.0 * Created: 2021年03月24日 16时49分20秒 * Revision: none * Compiler: gcc * * Author: Li Chao (), * Organization: * * ===================================================================================== */ #ifndef MSG_5BILLZET #define MSG_5BILLZET #include "bh_util.h" #include "proto.h" #include "shm.h" #include #include #include #include namespace bhome_msg { using namespace bhome_shm; using namespace bhome::msg; // for serialized data in MsgI // MsgI is safe to be stored in shared memory, so POD data or offset_ptr is required. // message format: header(meta) + body(data). typedef boost::uuids::uuid MQId; // store ref count, msgs shareing the same data should also hold a pointer of the same RefCount object. class RefCount : private boost::noncopyable { public: int Inc() { Guard lk(mutex_); return ++num_; } int Dec() { Guard lk(mutex_); return --num_; } int Get() { Guard lk(mutex_); return num_; } private: Mutex mutex_; int num_ = 1; }; // message content layout: header_size + header + data_size + data class MsgI { private: offset_ptr ptr_; offset_ptr count_; typedef std::function ToArray; void *Pack(SharedMemory &shm, const uint32_t head_len, const ToArray &headToArray, const uint32_t body_len, const ToArray &bodyToArray); template void *Pack(SharedMemory &shm, const BHMsgHead &head, const Body &body) { return Pack( shm, uint32_t(head.ByteSizeLong()), [&](void *p, int len) { head.SerializeToArray(p, len); }, uint32_t(body.ByteSizeLong()), [&](void *p, int len) { body.SerializeToArray(p, len); }); } bool MakeRC(SharedMemory &shm, void *addr); bool Make(SharedMemory &shm, void *addr); public: MsgI(void *p = 0, RefCount *c = 0) : ptr_(p), count_(c) {} void swap(MsgI &a) { std::swap(ptr_, a.ptr_); std::swap(count_, a.count_); } template T *get() { return static_cast(ptr_.get()); } // AddRef and Release works for both counted and not counted msg. int AddRef() const { return IsCounted() ? count_->Inc() : 1; } int Release(SharedMemory &shm); int Count() const { return IsCounted() ? count_->Get() : 1; } bool IsCounted() const { return static_cast(count_); } template inline bool MakeRC(SharedMemory &shm, const BHMsgHead &head, const Body &body) { return MakeRC(shm, Pack(shm, head, body)); } bool EnableRefCount(SharedMemory &shm); template inline bool Make(SharedMemory &shm, const BHMsgHead &head, const Body &body) { void *p = Pack(shm, head, body); auto NeedRefCount = [&]() { return head.type() == kMsgTypePublish; }; return NeedRefCount() ? MakeRC(shm, p) : Make(shm, p); } bool ParseHead(BHMsgHead &head) const; template bool ParseBody(Body &body) const { auto p = static_cast(ptr_.get()); assert(p); uint32_t size = Get32(p); p += 4; p += size; size = Get32(p); p += 4; return body.ParseFromArray(p, size); } }; inline void swap(MsgI &m1, MsgI &m2) { m1.swap(m2); } } // namespace bhome_msg #endif // end of include guard: MSG_5BILLZET