/*
 * =====================================================================================
 *
 *       Filename:  msg.cpp
 *
 *    Description:  Builders for the various BHMsg message kinds, plus the MsgI
 *                  helpers that serialize a BHMsg into a SharedMemory buffer
 *                  (32-bit length prefix followed by the serialized bytes).
 *
 *        Version:  1.0
 *        Created:  2021-03-24 16:48:42
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (),
 *   Organization:
 *
 * =====================================================================================
 */
#include "msg.h"
#include "bh_util.h"

namespace bhome_msg
{

// NOTE(review): this file had lost its line structure and every template
// argument list. The arguments below (std::vector<std::string>,
// static_cast<char *>, shm.New<RefCount>()) were reconstructed from how the
// values are used in this file -- confirm them against the declarations in
// msg.h before merging.

// Not referenced anywhere in this file.
const uint32_t kMsgTag = 0xf1e2d3c4;
// Bytes reserved in front of every packed message for its 32-bit length.
const uint32_t kMsgPrefixLen = 4;

// Appends a route entry to `msg` whose mq_id is the raw bytes of `id`.
inline void AddRoute(BHMsg &msg, const MQId &id)
{
    msg.add_route()->set_mq_id(&id, sizeof(id));
}

// Returns the raw bytes of a newly generated random boost UUID.
// The string is binary (sizeof(uuid) bytes), not a printable representation.
std::string RandId()
{
    const boost::uuids::uuid id = boost::uuids::random_generator()();
    return std::string(reinterpret_cast<const char *>(&id), sizeof(id));
}

// Creates a BHMsg with the given type and message id, stamped with the
// current time(). When `msgid` is omitted a random id from RandId() is used.
BHMsg InitMsg(MsgType type, const std::string &msgid = RandId())
{
    BHMsg msg;
    msg.set_msg_id(msgid);
    msg.set_type(type);
    msg.set_timestamp(time(nullptr));
    return msg;
}

// Builds a kMsgTypeRequest message routed from `src_id`; the body is a
// serialized DataRequest carrying `topic` and `size` bytes copied from `data`.
BHMsg MakeRequest(const MQId &src_id, const std::string &topic, const void *data, const size_t size)
{
    BHMsg msg(InitMsg(kMsgTypeRequest));
    AddRoute(msg, src_id);

    DataRequest req;
    req.set_topic(topic);
    req.set_data(data, size);
    msg.set_body(req.SerializeAsString());
    return msg;
}

// Builds a kMsgTypeProcRegisterTopics message routed from `src_id`; the body
// is a serialized DataProcRegister holding `info` and every entry of `topics`.
// `info` is taken by value so its contents can be swapped into the body.
BHMsg MakeRegister(const MQId &src_id, ProcInfo info, const std::vector<std::string> &topics)
{
    BHMsg msg(InitMsg(kMsgTypeProcRegisterTopics));
    AddRoute(msg, src_id);

    DataProcRegister reg;
    reg.mutable_proc()->Swap(&info);
    for (const auto &topic : topics) {
        reg.add_topics(topic);
    }
    msg.set_body(reg.SerializeAsString());
    return msg;
}

// Builds a kMsgTypeProcHeartbeat message routed from `src_id`; the body is a
// serialized DataProcRegister holding only `info` (no topics).
BHMsg MakeHeartbeat(const MQId &src_id, ProcInfo info)
{
    BHMsg msg(InitMsg(kMsgTypeProcHeartbeat));
    AddRoute(msg, src_id);

    DataProcRegister reg;
    reg.mutable_proc()->Swap(&info);
    msg.set_body(reg.SerializeAsString());
    return msg;
}

// Builds a kMsgTypeReply message that reuses `src_msgid` as its message id;
// the body is a serialized DataReply carrying `size` bytes copied from `data`.
// Asserts that `data` is non-null and `size` is non-zero. No route is added.
BHMsg MakeReply(const std::string &src_msgid, const void *data, const size_t size)
{
    assert(data && size);
    BHMsg msg(InitMsg(kMsgTypeReply, src_msgid));

    DataReply reply;
    reply.set_data(data, size);
    msg.set_body(reply.SerializeAsString());
    return msg;
}

// Shared implementation of MakeSub / MakeUnsub: builds a message of type
// `sub_unsub` routed from `client`, with a serialized DataSub listing `topics`.
// Asserts that `sub_unsub` is kMsgTypeSubscribe or kMsgTypeUnsubscribe.
BHMsg MakeSubUnsub(const MQId &client, const std::vector<std::string> &topics, const MsgType sub_unsub)
{
    assert(sub_unsub == kMsgTypeSubscribe || sub_unsub == kMsgTypeUnsubscribe);
    BHMsg msg(InitMsg(sub_unsub));
    AddRoute(msg, client);

    DataSub subs;
    for (const auto &topic : topics) {
        subs.add_topics(topic);
    }
    msg.set_body(subs.SerializeAsString());
    return msg;
}

// Builds a kMsgTypeSubscribe message for `topics`, routed from `client`.
BHMsg MakeSub(const MQId &client, const std::vector<std::string> &topics)
{
    return MakeSubUnsub(client, topics, kMsgTypeSubscribe);
}

// Builds a kMsgTypeUnsubscribe message for `topics`, routed from `client`.
BHMsg MakeUnsub(const MQId &client, const std::vector<std::string> &topics)
{
    return MakeSubUnsub(client, topics, kMsgTypeUnsubscribe);
}

// Builds a kMsgTypePublish message; the body is a serialized DataPub carrying
// `topic` and `size` bytes copied from `data`.
// Asserts that `data` is non-null and `size` is non-zero. No route is added.
BHMsg MakePub(const std::string &topic, const void *data, const size_t size)
{
    assert(data && size);
    BHMsg msg(InitMsg(kMsgTypePublish));

    DataPub pub;
    pub.set_topic(topic);
    pub.set_data(data, size);
    msg.set_body(pub.SerializeAsString());
    return msg;
}

// Builds a kMsgTypeProcQueryTopic message routed from `client`; the body is a
// serialized DataProcQueryTopic for `topic`.
BHMsg MakeQueryTopic(const MQId &client, const std::string &topic)
{
    BHMsg msg(InitMsg(kMsgTypeProcQueryTopic));
    AddRoute(msg, client);

    DataProcQueryTopic query;
    query.set_topic(topic);
    msg.set_body(query.SerializeAsString());
    return msg;
}

// Builds a kMsgTypeProcQueryTopicReply message that reuses `msgid` as its
// message id; the body is a serialized DataProcQueryTopicReply whose address
// carries `mqid`. No route is added.
BHMsg MakeQueryTopicReply(const std::string &mqid, const std::string &msgid)
{
    BHMsg msg(InitMsg(kMsgTypeProcQueryTopicReply, msgid));

    DataProcQueryTopicReply reply;
    reply.mutable_address()->set_mq_id(mqid);
    msg.set_body(reply.SerializeAsString());
    return msg;
}

// Serializes `msg` into a buffer obtained from `shm`.
// Buffer layout: kMsgPrefixLen bytes holding the serialized size (written with
// Put32), followed by the serialized message.
// Returns the buffer, or null when the size does not fit in 32 bits, the
// allocation fails, or serialization fails (the buffer is released in that
// last case).
void *Pack(SharedMemory &shm, const BHMsg &msg)
{
    const size_t full_size = msg.ByteSizeLong();
    const uint32_t msg_size = static_cast<uint32_t>(full_size);
    if (msg_size != full_size) {
        // The length prefix is 32 bits wide; refuse instead of truncating.
        return nullptr;
    }

    void *p = shm.Alloc(kMsgPrefixLen + msg_size);
    if (!p) {
        return nullptr;
    }

    Put32(p, msg_size);
    if (!msg.SerializeToArray(static_cast<char *>(p) + kMsgPrefixLen, msg_size)) {
        shm.Dealloc(p);
        return nullptr;
    }
    return p;
}

// Parses the buffer held by this MsgI into `msg`: reads the size with Get32
// from the start of the buffer and parses that many bytes located
// kMsgPrefixLen bytes in. Asserts that the buffer pointer is non-null.
// Returns the result of ParseFromArray.
bool MsgI::Unpack(BHMsg &msg) const
{
    void *p = ptr_.get();
    assert(p);
    const uint32_t msg_size = Get32(p);
    return msg.ParseFromArray(static_cast<char *>(p) + kMsgPrefixLen, msg_size);
}

// Packs `msg` into `shm` together with a RefCount object (reference-counted
// variant of Make). On success this MsgI takes over both via swap and true is
// returned; on any failure nothing stays allocated and false is returned.
bool MsgI::MakeRC(SharedMemory &shm, const BHMsg &msg)
{
    void *p = Pack(shm, msg);
    if (!p) {
        return false;
    }

    RefCount *rc = shm.New<RefCount>();
    if (!rc) {
        // Do not leak the packed buffer when the counter cannot be created.
        shm.Dealloc(p);
        return false;
    }

    MsgI(p, rc).swap(*this);
    return true;
}

// Packs `msg` into `shm` without a reference counter. On success this MsgI
// takes over the buffer via swap and true is returned; otherwise false.
bool MsgI::Make(SharedMemory &shm, const BHMsg &msg)
{
    void *p = Pack(shm, msg);
    if (!p) {
        return false;
    }
    MsgI(p, 0).swap(*this);
    return true;
}

// Drops one reference to the message.
// When IsCounted() is true the counter is decremented first, and a non-zero
// result is returned without freeing anything. Otherwise (not counted, or the
// decrement yielded 0) the data buffer and the counter are released through
// `shm`, both members are reset to 0, and 0 is returned.
int MsgI::Release(SharedMemory &shm)
{
    if (IsCounted()) {
        const int n = count_->Dec();
        if (n != 0) {
            return n;
        }
    }
    // free data
    shm.Dealloc(ptr_);
    ptr_ = 0;
    shm.Delete(count_);
    count_ = 0;
    return 0;
}

} // namespace bhome_msg