lichao
2021-04-16 c6964d5af25d4ec7ed9dbe7674dc4e3896b36ead
src/msg.h
@@ -18,10 +18,12 @@
#ifndef MSG_5BILLZET
#define MSG_5BILLZET
#include "bhome_msg.pb.h"
#include "bh_util.h"
#include "proto.h"
#include "shm.h"
#include <boost/interprocess/offset_ptr.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <functional>
#include <stdint.h>
namespace bhome_msg
@@ -59,19 +61,29 @@
   int num_ = 1;
};
BHMsg MakeRequest(const MQId &src_id, const void *data, const size_t size);
BHMsg MakeReply(const void *data, const size_t size);
BHMsg MakeSub(const MQId &client, const std::vector<std::string> &topics);
BHMsg MakeUnsub(const MQId &client, const std::vector<std::string> &topics);
BHMsg MakePub(const std::string &topic, const void *data, const size_t size);
// message content layout: header_size + header + data_size + data
class MsgI
{
private:
   offset_ptr<void> ptr_;
   offset_ptr<RefCount> count_;
   bool BuildSubOrUnsub(SharedMemory &shm, const std::vector<std::string> &topics, const MsgType sub_unsub);
   typedef std::function<void(void *p, int len)> ToArray;
   void *Pack(SharedMemory &shm,
              const uint32_t head_len, const ToArray &headToArray,
              const uint32_t body_len, const ToArray &bodyToArray);
   template <class Body>
   void *Pack(SharedMemory &shm, const BHMsgHead &head, const Body &body)
   {
      return Pack(
          shm,
          uint32_t(head.ByteSizeLong()), [&](void *p, int len) { head.SerializeToArray(p, len); },
          uint32_t(body.ByteSizeLong()), [&](void *p, int len) { body.SerializeToArray(p, len); });
   }
   bool MakeRC(SharedMemory &shm, void *addr);
   bool Make(SharedMemory &shm, void *addr);
public:
   MsgI(void *p = 0, RefCount *c = 0) :
@@ -92,9 +104,35 @@
   int Count() const { return IsCounted() ? count_->Get() : 1; }
   bool IsCounted() const { return static_cast<bool>(count_); }
   bool Make(SharedMemory &shm, const BHMsg &msg);
   bool MakeRC(SharedMemory &shm, const BHMsg &msg);
   bool Unpack(BHMsg &msg) const;
   template <class Body>
   inline bool MakeRC(SharedMemory &shm, const BHMsgHead &head, const Body &body)
   {
      return MakeRC(shm, Pack(shm, head, body));
   }
   bool EnableRefCount(SharedMemory &shm);
   template <class Body>
   inline bool Make(SharedMemory &shm, const BHMsgHead &head, const Body &body)
   {
      void *p = Pack(shm, head, body);
      auto NeedRefCount = [&]() { return head.type() == kMsgTypePublish; };
      return NeedRefCount() ? MakeRC(shm, p) : Make(shm, p);
   }
   bool ParseHead(BHMsgHead &head) const;
   template <class Body>
   bool ParseBody(Body &body) const
   {
      auto p = static_cast<char *>(ptr_.get());
      assert(p);
      uint32_t size = Get32(p);
      p += 4;
      p += size;
      size = Get32(p);
      p += 4;
      return body.ParseFromArray(p, size);
   }
};
inline void swap(MsgI &m1, MsgI &m2) { m1.swap(m2); }