lichao
2021-05-06 7ecd6323ffedbfef92c87c02b2a8680dd53b772c
src/msg.h
@@ -23,7 +23,6 @@
#include "shm.h"
#include <atomic>
#include <boost/interprocess/offset_ptr.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <functional>
#include <stdint.h>
@@ -34,11 +33,10 @@
// ShmMsg is safe to be stored in shared memory, so POD data or offset_ptr is required.
// message content layout: (meta) / header_size + header + data_size + data
typedef boost::uuids::uuid MQId;
class ShmMsg
class ShmMsg : private StaticDataRef<SharedMemory, ShmMsg>
{
private:
   static inline SharedMemory &shm() { return GetData(); }
   // store ref count, msgs shareing the same data should also hold a pointer of the same RefCount object.
   class RefCount : private boost::noncopyable
   {
@@ -51,34 +49,29 @@
      int Dec() { return --num_; }
      int Get() { return num_.load(); }
   };
   typedef int64_t Offset;
   static Offset Addr(void *ptr) { return reinterpret_cast<Offset>(ptr); }
   static void *Ptr(const Offset offset) { return reinterpret_cast<void *>(offset); }
   static inline Offset BaseAddr()
   typedef int64_t OffsetType;
   static OffsetType Addr(void *ptr) { return reinterpret_cast<OffsetType>(ptr); }
   static void *Ptr(const OffsetType offset) { return reinterpret_cast<void *>(offset); }
   static inline OffsetType BaseAddr()
   {
      static const Offset base = Addr(shm().get_address()); // cache value.
      static const OffsetType base = Addr(shm().get_address()); // cache value.
      return base;
   }
   static inline SharedMemory &shm()
   {
      if (!pshm()) { throw std::string("Must set ShmMsg shm before use!"); }
      return *pshm();
   }
   static inline SharedMemory *&pshm()
   {
      static SharedMemory *pshm = 0;
      return pshm;
   }
   static const uint32_t kMsgTag = 0xf1e2d3c4;
   struct Meta {
      RefCount count_;
      const uint32_t tag_ = kMsgTag;
      const uint32_t size_ = 0;
      Meta(uint32_t size) :
          size_(size) {}
   };
   Offset offset_;
   OffsetType offset_;
   void *Alloc(const size_t size)
   {
      void *p = shm().Alloc(sizeof(Meta) + size);
      if (p) {
         auto pmeta = new (p) Meta;
         auto pmeta = new (p) Meta(size);
         p = pmeta + 1;
      }
      return p;
@@ -143,21 +136,16 @@
   T *get() const { return static_cast<T *>(Ptr(offset_ + BaseAddr())); }
public:
   static bool BindShm(SharedMemory &shm)
   {
      assert(!pshm());
      pshm() = &shm;
      return true;
   }
   static bool BindShm(SharedMemory &shm) { return SetData(shm); }
   ShmMsg() :
       ShmMsg(nullptr) {}
   explicit ShmMsg(const size_t size) :
       ShmMsg(Alloc(size)) {}
   explicit ShmMsg(const OffsetType offset) :
       offset_(offset) {}
   OffsetType Offset() const { return offset_; }
   OffsetType &OffsetRef() { return offset_; }
   void swap(ShmMsg &a) { std::swap(offset_, a.offset_); }
   bool valid() const { return static_cast<bool>(offset_); }
   bool valid() const { return static_cast<bool>(offset_) && meta()->tag_ == kMsgTag; }
   // AddRef and Release works for both counted and not counted msg.
   int AddRef() const { return valid() ? meta()->count_.Inc() : 1; }
   int Release()
   {