lichao
2021-04-02 c28cdf2fbf1565709b359c9cca6c5e29d9592dce
src/msg.cpp
@@ -24,36 +24,61 @@
const uint32_t kMsgTag = 0xf1e2d3c4;
const uint32_t kMsgPrefixLen = 4;
BHMsg InitMsg(MsgType type)
inline void AddRoute(BHMsg &msg, const MQId &id) { msg.add_route()->set_mq_id(&id, sizeof(id)); }
std::string RandId()
{
   boost::uuids::uuid id = boost::uuids::random_generator()();
   return std::string((char *) &id, sizeof(id));
}
BHMsg InitMsg(MsgType type, const std::string &msgid = RandId())
{
   BHMsg msg;
   msg.set_msg_id(msgid);
   msg.set_type(type);
   time_t tm = 0;
   msg.set_timestamp(time(&tm));
   return msg;
}
BHMsg MakeRequest(const MQId &src_id, const void *data, const size_t size)
{
   assert(data && size);
   BHMsg msg(InitMsg(kMsgTypeRequest));
   msg.set_body(data, size);
   msg.add_route()->set_mq_id(&src_id, sizeof(src_id));
   return msg;
}
BHMsg MakeRequest(const MQId &src_id, const std::string &topic, const void *data, const size_t size)
{
   BHMsg msg(InitMsg(kMsgTypeRequest));
   AddRoute(msg, src_id);
   DataRequest req;
   req.set_topic(topic);
   req.set_data(data, size);
   const std::string &body(req.SerializeAsString());
   return MakeRequest(src_id, body.data(), body.size());
   msg.set_body(req.SerializeAsString());
   return msg;
}
BHMsg MakeReply(const void *data, const size_t size)
BHMsg MakeRegister(const MQId &src_id, ProcInfo info, const std::vector<std::string> &topics)
{
   BHMsg msg(InitMsg(kMsgTypeProcRegisterTopics));
   AddRoute(msg, src_id);
   DataProcRegister reg;
   reg.mutable_proc()->Swap(&info);
   for (auto &t : topics) {
      reg.add_topics(t);
   }
   msg.set_body(reg.SerializeAsString());
   return msg;
}
BHMsg MakeHeartbeat(const MQId &src_id, ProcInfo info)
{
   BHMsg msg(InitMsg(kMsgTypeProcHeartbeat));
   AddRoute(msg, src_id);
   DataProcRegister reg;
   reg.mutable_proc()->Swap(&info);
   msg.set_body(reg.SerializeAsString());
   return msg;
}
BHMsg MakeReply(const std::string &src_msgid, const void *data, const size_t size)
{
   assert(data && size);
   BHMsg msg(InitMsg(kMsgTypeReply));
   BHMsg msg(InitMsg(kMsgTypeReply, src_msgid));
   DataReply reply;
   reply.set_data(data, size);
   msg.set_body(reply.SerializeAsString());
@@ -64,7 +89,7 @@
{
   assert(sub_unsub == kMsgTypeSubscribe || sub_unsub == kMsgTypeUnsubscribe);
   BHMsg msg(InitMsg(sub_unsub));
   msg.add_route()->set_mq_id(&client, sizeof(client));
   AddRoute(msg, client);
   DataSub subs;
   for (auto &t : topics) {
      subs.add_topics(t);
@@ -87,14 +112,23 @@
   return msg;
}
BHMsg MakeQueryTopic(const std::string &topic)
BHMsg MakeQueryTopic(const MQId &client, const std::string &topic)
{
   BHMsg msg(InitMsg(kMsgTypeQueryTopic));
   DataQueryTopic query;
   BHMsg msg(InitMsg(kMsgTypeProcQueryTopic));
   AddRoute(msg, client);
   DataProcQueryTopic query;
   query.set_topic(topic);
   msg.set_body(query.SerializeAsString());
   return msg;
}
BHMsg MakeQueryTopicReply(const std::string &mqid, const std::string &msgid)
{
   BHMsg msg(InitMsg(kMsgTypeProcQueryTopicReply, msgid));
   DataProcQueryTopicReply reply;
   reply.mutable_address()->set_mq_id(mqid);
   msg.set_body(reply.SerializeAsString());
   return msg;
}
void *Pack(SharedMemory &shm, const BHMsg &msg)
{