lichao
2021-04-06 bb9a7e348892eb5c4fccb063380aa6fcd9612b71
src/msg.cpp
@@ -20,7 +20,10 @@
namespace bhome_msg
{
/*TODO change msg format, header has proc info;
reply has errer msg
    center accept request and route.;
//*/
const uint32_t kMsgTag = 0xf1e2d3c4;
const uint32_t kMsgPrefixLen = 4;
@@ -43,9 +46,9 @@
BHMsg MakeRequest(const MQId &src_id, const std::string &topic, const void *data, const size_t size)
{
   BHMsg msg(InitMsg(kMsgTypeRequest));
   BHMsg msg(InitMsg(kMsgTypeRequestTopic));
   AddRoute(msg, src_id);
   DataRequest req;
   MsgRequestTopic req;
   req.set_topic(topic);
   req.set_data(data, size);
   msg.set_body(req.SerializeAsString());
@@ -54,9 +57,9 @@
BHMsg MakeRegister(const MQId &src_id, ProcInfo info, const std::vector<std::string> &topics)
{
   BHMsg msg(InitMsg(kMsgTypeProcRegisterTopics));
   BHMsg msg(InitMsg(kMsgTypeRegister));
   AddRoute(msg, src_id);
   DataProcRegister reg;
   MsgRegister reg;
   reg.mutable_proc()->Swap(&info);
   for (auto &t : topics) {
      reg.add_topics(t);
@@ -67,9 +70,9 @@
BHMsg MakeHeartbeat(const MQId &src_id, ProcInfo info)
{
   BHMsg msg(InitMsg(kMsgTypeProcHeartbeat));
   BHMsg msg(InitMsg(kMsgTypeHeartbeat));
   AddRoute(msg, src_id);
   DataProcRegister reg;
   MsgHeartbeat reg;
   reg.mutable_proc()->Swap(&info);
   msg.set_body(reg.SerializeAsString());
   return msg;
@@ -78,8 +81,8 @@
BHMsg MakeReply(const std::string &src_msgid, const void *data, const size_t size)
{
   assert(data && size);
   BHMsg msg(InitMsg(kMsgTypeReply, src_msgid));
   DataReply reply;
   BHMsg msg(InitMsg(kMsgTypeRequestTopicReply, src_msgid));
   MsgRequestTopicReply reply;
   reply.set_data(data, size);
   msg.set_body(reply.SerializeAsString());
   return msg;
@@ -90,7 +93,7 @@
   assert(sub_unsub == kMsgTypeSubscribe || sub_unsub == kMsgTypeUnsubscribe);
   BHMsg msg(InitMsg(sub_unsub));
   AddRoute(msg, client);
   DataSub subs;
   MsgSub subs;
   for (auto &t : topics) {
      subs.add_topics(t);
   }
@@ -105,7 +108,7 @@
{
   assert(data && size);
   BHMsg msg(InitMsg(kMsgTypePublish));
   DataPub pub;
   MsgPub pub;
   pub.set_topic(topic);
   pub.set_data(data, size);
   msg.set_body(pub.SerializeAsString());
@@ -114,17 +117,17 @@
BHMsg MakeQueryTopic(const MQId &client, const std::string &topic)
{
   BHMsg msg(InitMsg(kMsgTypeProcQueryTopic));
   BHMsg msg(InitMsg(kMsgTypeQueryTopic));
   AddRoute(msg, client);
   DataProcQueryTopic query;
   MsgQueryTopic query;
   query.set_topic(topic);
   msg.set_body(query.SerializeAsString());
   return msg;
}
BHMsg MakeQueryTopicReply(const std::string &mqid, const std::string &msgid)
{
   BHMsg msg(InitMsg(kMsgTypeProcQueryTopicReply, msgid));
   DataProcQueryTopicReply reply;
   BHMsg msg(InitMsg(kMsgTypeQueryTopicReply, msgid));
   MsgQueryTopicReply reply;
   reply.mutable_address()->set_mq_id(mqid);
   msg.set_body(reply.SerializeAsString());
   return msg;