lichao
2021-04-06 3e9f5b869dd32441fdd3d77091cb33ef4301f244
src/msg.cpp
@@ -20,41 +20,69 @@
namespace bhome_msg
{
/*TODO change msg format, header has proc info;
reply has errer msg
    center accept request and route.;
//*/
const uint32_t kMsgTag = 0xf1e2d3c4;
const uint32_t kMsgPrefixLen = 4;
BHMsg InitMsg(MsgType type)
inline void AddRoute(BHMsg &msg, const MQId &id) { msg.add_route()->set_mq_id(&id, sizeof(id)); }
std::string RandId()
{
   boost::uuids::uuid id = boost::uuids::random_generator()();
   return std::string((char *) &id, sizeof(id));
}
BHMsg InitMsg(MsgType type, const std::string &msgid = RandId())
{
   BHMsg msg;
   msg.set_msg_id(msgid);
   msg.set_type(type);
   time_t tm = 0;
   msg.set_timestamp(time(&tm));
   return msg;
}
BHMsg MakeRequest(const MQId &src_id, const void *data, const size_t size)
{
   assert(data && size);
   BHMsg msg(InitMsg(kMsgTypeRequest));
   msg.set_body(data, size);
   msg.add_route()->set_mq_id(&src_id, sizeof(src_id));
   return msg;
}
BHMsg MakeRequest(const MQId &src_id, const std::string &topic, const void *data, const size_t size)
{
   DataRequest req;
   BHMsg msg(InitMsg(kMsgTypeRequestTopic));
   AddRoute(msg, src_id);
   MsgRequestTopic req;
   req.set_topic(topic);
   req.set_data(data, size);
   const std::string &body(req.SerializeAsString());
   return MakeRequest(src_id, body.data(), body.size());
   msg.set_body(req.SerializeAsString());
   return msg;
}
BHMsg MakeReply(const void *data, const size_t size)
BHMsg MakeRegister(const MQId &src_id, ProcInfo info, const std::vector<std::string> &topics)
{
   BHMsg msg(InitMsg(kMsgTypeRegister));
   AddRoute(msg, src_id);
   MsgRegister reg;
   reg.mutable_proc()->Swap(&info);
   for (auto &t : topics) {
      reg.add_topics(t);
   }
   msg.set_body(reg.SerializeAsString());
   return msg;
}
BHMsg MakeHeartbeat(const MQId &src_id, ProcInfo info)
{
   BHMsg msg(InitMsg(kMsgTypeHeartbeat));
   AddRoute(msg, src_id);
   MsgHeartbeat reg;
   reg.mutable_proc()->Swap(&info);
   msg.set_body(reg.SerializeAsString());
   return msg;
}
BHMsg MakeReply(const std::string &src_msgid, const void *data, const size_t size)
{
   assert(data && size);
   BHMsg msg(InitMsg(kMsgTypeReply));
   DataReply reply;
   BHMsg msg(InitMsg(kMsgTypeRequestTopicReply, src_msgid));
   MsgRequestTopicReply reply;
   reply.set_data(data, size);
   msg.set_body(reply.SerializeAsString());
   return msg;
@@ -64,8 +92,8 @@
{
   assert(sub_unsub == kMsgTypeSubscribe || sub_unsub == kMsgTypeUnsubscribe);
   BHMsg msg(InitMsg(sub_unsub));
   msg.add_route()->set_mq_id(&client, sizeof(client));
   DataSub subs;
   AddRoute(msg, client);
   MsgSub subs;
   for (auto &t : topics) {
      subs.add_topics(t);
   }
@@ -80,21 +108,30 @@
{
   assert(data && size);
   BHMsg msg(InitMsg(kMsgTypePublish));
   DataPub pub;
   MsgPub pub;
   pub.set_topic(topic);
   pub.set_data(data, size);
   msg.set_body(pub.SerializeAsString());
   return msg;
}
BHMsg MakeQueryTopic(const std::string &topic)
BHMsg MakeQueryTopic(const MQId &client, const std::string &topic)
{
   BHMsg msg(InitMsg(kMsgTypeQueryTopic));
   DataQueryTopic query;
   AddRoute(msg, client);
   MsgQueryTopic query;
   query.set_topic(topic);
   msg.set_body(query.SerializeAsString());
   return msg;
}
BHMsg MakeQueryTopicReply(const std::string &mqid, const std::string &msgid)
{
   BHMsg msg(InitMsg(kMsgTypeQueryTopicReply, msgid));
   MsgQueryTopicReply reply;
   reply.mutable_address()->set_mq_id(mqid);
   msg.set_body(reply.SerializeAsString());
   return msg;
}
void *Pack(SharedMemory &shm, const BHMsg &msg)
{