lichao
2021-04-08 c338820e4db43ad32c20ff429a038b06bcb980f8
src/msg.h
@@ -18,10 +18,12 @@
#ifndef MSG_5BILLZET
#define MSG_5BILLZET
#include "bhome_msg.pb.h"
#include "bh_util.h"
#include "proto.h"
#include "shm.h"
#include <boost/interprocess/offset_ptr.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <functional>
#include <stdint.h>
namespace bhome_msg
@@ -59,16 +61,6 @@
   int num_ = 1;
};
BHMsg MakeQueryTopic(const MQId &client, const std::string &topic);
BHMsg MakeQueryTopicReply(const std::string &mqid, const std::string &msgid);
BHMsg MakeRequest(const MQId &src_id, const std::string &topic, const void *data, const size_t size);
BHMsg MakeReply(const std::string &src_msgid, const void *data, const size_t size);
BHMsg MakeRegister(const MQId &src_id, ProcInfo info, const std::vector<std::string> &topics);
BHMsg MakeHeartbeat(const MQId &src_id, ProcInfo info);
BHMsg MakeSub(const MQId &client, const std::vector<std::string> &topics);
BHMsg MakeUnsub(const MQId &client, const std::vector<std::string> &topics);
BHMsg MakePub(const std::string &topic, const void *data, const size_t size);
// message content layout: header_size + header + data_size + data
class MsgI
{
@@ -76,7 +68,22 @@
   offset_ptr<void> ptr_;
   offset_ptr<RefCount> count_;
   bool BuildSubOrUnsub(SharedMemory &shm, const std::vector<std::string> &topics, const MsgType sub_unsub);
   typedef std::function<void(void *p, int len)> ToArray;
   void *Pack(SharedMemory &shm,
              const uint32_t head_len, const ToArray &headToArray,
              const uint32_t body_len, const ToArray &bodyToArray);
   template <class Body>
   void *Pack(SharedMemory &shm, const BHMsgHead &head, const Body &body)
   {
      return Pack(
          shm,
          uint32_t(head.ByteSizeLong()), [&](void *p, int len) { head.SerializeToArray(p, len); },
          uint32_t(body.ByteSizeLong()), [&](void *p, int len) { body.SerializeToArray(p, len); });
   }
   bool MakeRC(SharedMemory &shm, void *addr);
   bool Make(SharedMemory &shm, void *addr);
public:
   MsgI(void *p = 0, RefCount *c = 0) :
@@ -97,9 +104,41 @@
   int Count() const { return IsCounted() ? count_->Get() : 1; }
   bool IsCounted() const { return static_cast<bool>(count_); }
   bool Make(SharedMemory &shm, const BHMsg &msg);
   bool MakeRC(SharedMemory &shm, const BHMsg &msg);
   bool Unpack(BHMsg &msg) const;
   template <class Body>
   bool Make(SharedMemory &shm, const BHMsgHead &head, const Body &body)
   {
      return Make(shm, Pack(shm, head, body));
   }
   template <class Body>
   bool MakeRC(SharedMemory &shm, const BHMsgHead &head, const Body &body)
   {
      return MakeRC(shm, Pack(shm, head, body));
   }
   bool MakeRC(SharedMemory &shm, MsgI &a)
   {
      if (a.IsCounted()) {
         *this = a;
         AddRef();
         return true;
      } else {
         void *p = a.ptr_.get();
         a.ptr_ = 0;
         return MakeRC(shm, p);
      }
   }
   bool ParseHead(BHMsgHead &head) const;
   template <class Body>
   bool ParseBody(Body &body) const
   {
      auto p = static_cast<char *>(ptr_.get());
      assert(p);
      uint32_t size = Get32(p);
      p += 4;
      p += size;
      size = Get32(p);
      p += 4;
      return body.ParseFromArray(p, size);
   }
};
inline void swap(MsgI &m1, MsgI &m2) { m1.swap(m2); }