/*
 * =====================================================================================
 *
 *       Filename:  msg.cpp
 *
 *    Description:  Builders for BHMsg messages, plus packing/unpacking of
 *                  messages into shared-memory buffers and reference-counted
 *                  release of those buffers.
 *
 *        Version:  1.0
 *        Created:  2021-03-24 16:48:42
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (),
 *   Organization:
 *
 * =====================================================================================
 */
#include "msg.h"
#include "bh_util.h"

namespace bhome_msg
{

// Message tag constant. It is not referenced anywhere in this file.
const uint32_t kMsgTag = 0xf1e2d3c4;

// Number of bytes reserved in front of a packed message for its length field
// (written with Put32 / read with Get32).
const uint32_t kMsgPrefixLen = 4;

// Largest serialized body that Pack/Unpack accept. The protobuf array
// (de)serialization calls take the size as an `int`, so anything above this
// value cannot be represented and is rejected instead of being truncated.
const uint32_t kMaxPackedBodySize = 0x7fffffff;

// Creates a message of the given type, stamped with the current time.
BHMsg InitMsg(MsgType type)
{
    BHMsg msg;
    msg.set_type(type);
    msg.set_timestamp(time(nullptr));
    return msg;
}

// Builds a request message.
//   src_id - id of the sender's queue; its raw bytes become the first route entry.
//   data   - request payload, must be non-null (checked by assert only).
//   size   - payload size in bytes, must be non-zero (checked by assert only).
BHMsg MakeRequest(const MQId &src_id, const void *data, const size_t size)
{
    assert(data && size);
    BHMsg msg(InitMsg(kMsgTypeRequest));
    msg.set_body(data, size);
    msg.add_route()->set_mq_id(&src_id, sizeof(src_id));
    return msg;
}

// Builds a reply message carrying `size` bytes from `data` as its body.
// `data` must be non-null and `size` non-zero (checked by assert only).
BHMsg MakeReply(const void *data, const size_t size)
{
    assert(data && size);
    BHMsg msg(InitMsg(kMsgTypeReply));
    msg.set_body(data, size);
    return msg;
}

// Builds a subscribe or unsubscribe message for `client`.
//   client    - id of the client's queue; its raw bytes become the first route entry.
//   topics    - topics to (un)subscribe; serialized into the body as a DataSub.
//   sub_unsub - must be kMsgTypeSubscribe or kMsgTypeUnsubscribe (assert only).
// NOTE(review): the element type of `topics` is assumed to be std::string
// (each element is passed to DataSub::add_topics) -- confirm that it matches
// the declaration in msg.h.
BHMsg MakeSubUnsub(const MQId &client, const std::vector<std::string> &topics, const MsgType sub_unsub)
{
    assert(sub_unsub == kMsgTypeSubscribe || sub_unsub == kMsgTypeUnsubscribe);
    BHMsg msg(InitMsg(sub_unsub));
    msg.add_route()->set_mq_id(&client, sizeof(client));

    DataSub subs;
    for (const auto &topic : topics) {
        subs.add_topics(topic);
    }
    msg.set_body(subs.SerializeAsString());
    return msg;
}

// Builds a subscribe message for `client` covering `topics`.
BHMsg MakeSub(const MQId &client, const std::vector<std::string> &topics)
{
    return MakeSubUnsub(client, topics, kMsgTypeSubscribe);
}

// Builds an unsubscribe message for `client` covering `topics`.
BHMsg MakeUnsub(const MQId &client, const std::vector<std::string> &topics)
{
    return MakeSubUnsub(client, topics, kMsgTypeUnsubscribe);
}

// Builds a publish message: `topic` and the `size` bytes at `data` are wrapped
// in a DataPub, which is serialized into the message body.
// `data` must be non-null and `size` non-zero (checked by assert only).
BHMsg MakePub(const std::string &topic, const void *data, const size_t size)
{
    assert(data && size);
    BHMsg msg(InitMsg(kMsgTypePublish));

    DataPub pub;
    pub.set_topic(topic);
    pub.set_data(data, size);
    msg.set_body(pub.SerializeAsString());
    return msg;
}

// Serializes `msg` into a buffer allocated from `shm`.
// Buffer layout: [kMsgPrefixLen-byte length written by Put32][serialized message].
// Returns the buffer, or a null pointer when the message is too large, the
// allocation fails, or serialization fails (the buffer is freed in that case).
void *Pack(SharedMemory &shm, const BHMsg &msg)
{
    const size_t body_size = msg.ByteSizeLong();
    if (body_size > kMaxPackedBodySize) {
        // Would not fit the 32-bit length prefix / int-sized serialize call.
        return nullptr;
    }
    const uint32_t msg_size = static_cast<uint32_t>(body_size);

    void *p = shm.Alloc(kMsgPrefixLen + msg_size);
    if (p) {
        Put32(p, msg_size);
        if (!msg.SerializeToArray(static_cast<char *>(p) + kMsgPrefixLen, msg_size)) {
            shm.Dealloc(p);
            p = nullptr;
        }
    }
    return p;
}

// Parses the packed buffer held by this MsgI into `msg`.
// Returns false when there is no buffer, when the stored length is larger
// than the parser can accept, or when parsing fails.
bool MsgI::Unpack(BHMsg &msg) const
{
    void *p = ptr_.get();
    assert(p);
    if (!p) {
        // Keep release builds (where assert is compiled out) from reading
        // through a null pointer.
        return false;
    }

    const uint32_t msg_size = Get32(p);
    if (msg_size > kMaxPackedBodySize) {
        // The length prefix comes from the buffer itself; refuse values that
        // would turn negative when converted to the parser's int size.
        return false;
    }
    return msg.ParseFromArray(static_cast<char *>(p) + kMsgPrefixLen, msg_size);
}

// Packs `msg` into `shm` together with a RefCount object created in `shm`,
// then replaces the contents of *this with the new buffer/counter pair.
// Returns false (leaving *this untouched) if either allocation fails.
bool MsgI::MakeRC(SharedMemory &shm, const BHMsg &msg)
{
    void *p = Pack(shm, msg);
    if (!p) {
        return false;
    }

    RefCount *rc = shm.New<RefCount>();
    if (!rc) {
        // Do not leak the packed buffer when the counter cannot be created.
        shm.Dealloc(p);
        return false;
    }

    MsgI(p, rc).swap(*this);
    return true;
}

// Packs `msg` into `shm` without a reference counter and replaces the
// contents of *this with the new buffer.
// Returns false (leaving *this untouched) if packing fails.
bool MsgI::Make(SharedMemory &shm, const BHMsg &msg)
{
    void *p = Pack(shm, msg);
    if (!p) {
        return false;
    }
    MsgI(p, 0).swap(*this);
    return true;
}

// Drops one reference to the message.
// When the message is counted and the decremented count is non-zero, that
// count is returned and nothing is freed. Otherwise the buffer and the
// counter are handed back to `shm`, both members are reset, and 0 is returned.
int MsgI::Release(SharedMemory &shm)
{
    if (IsCounted()) {
        const int n = count_->Dec();
        if (n != 0) {
            return n;
        }
    }

    // Free the data and the counter.
    // NOTE(review): Delete is also reached for messages that are not counted;
    // this presumably relies on SharedMemory::Delete accepting a null counter
    // -- confirm.
    shm.Dealloc(ptr_);
    ptr_ = 0;
    shm.Delete(count_);
    count_ = 0;
    return 0;
}

} // namespace bhome_msg