/* * ===================================================================================== * * Filename: msg.h * * Description: * * Version: 1.0 * Created: 2021年03月24日 16时49分20秒 * Revision: none * Compiler: gcc * * Author: Li Chao (), * Organization: * * ===================================================================================== */ #ifndef MSG_5BILLZET #define MSG_5BILLZET #include "bhome_msg.pb.h" #include "shm.h" #include #include #include namespace bhome_msg { using namespace bhome_shm; using namespace bhome::msg; // for serialized data in MsgI // MsgI is safe to be stored in shared memory, so POD data or offset_ptr is required. // message format: header(meta) + body(data). typedef boost::uuids::uuid MQId; // store ref count, msgs shareing the same data should also hold a pointer of the same RefCount object. class RefCount : private boost::noncopyable { public: int Inc() { Guard lk(mutex_); return ++num_; } int Dec() { Guard lk(mutex_); return --num_; } int Get() { Guard lk(mutex_); return num_; } private: Mutex mutex_; int num_ = 1; }; BHMsg MakeQueryTopic(const std::string &topic); BHMsg MakeRequest(const MQId &src_id, const void *data, const size_t size); BHMsg MakeRequest(const MQId &src_id, const std::string &topic, const void *data, const size_t size); BHMsg MakeReply(const void *data, const size_t size); BHMsg MakeSub(const MQId &client, const std::vector &topics); BHMsg MakeUnsub(const MQId &client, const std::vector &topics); BHMsg MakePub(const std::string &topic, const void *data, const size_t size); class MsgI { private: offset_ptr ptr_; offset_ptr count_; bool BuildSubOrUnsub(SharedMemory &shm, const std::vector &topics, const MsgType sub_unsub); public: MsgI(void *p = 0, RefCount *c = 0) : ptr_(p), count_(c) {} void swap(MsgI &a) { std::swap(ptr_, a.ptr_); std::swap(count_, a.count_); } template T *get() { return static_cast(ptr_.get()); } // AddRef and Release works for both counted and not counted msg. int AddRef() const { return IsCounted() ? count_->Inc() : 1; } int Release(SharedMemory &shm); int Count() const { return IsCounted() ? count_->Get() : 1; } bool IsCounted() const { return static_cast(count_); } bool Make(SharedMemory &shm, const BHMsg &msg); bool MakeRC(SharedMemory &shm, const BHMsg &msg); bool Unpack(BHMsg &msg) const; }; inline void swap(MsgI &m1, MsgI &m2) { m1.swap(m2); } } // namespace bhome_msg #endif // end of include guard: MSG_5BILLZET