lichao
2021-05-19 34cd75f77d0ca94dbdba4e6cc9451fe4d33e78b3
src/defs.cpp
@@ -18,6 +18,10 @@
#include "defs.h"
#include "msg.h"
#include "shm_msg_queue.h"
#include "socket.h"
#include <boost/uuid/random_generator.hpp>
#include <boost/uuid/string_generator.hpp>
#include <boost/uuid/uuid.hpp>
namespace
{
@@ -70,7 +74,83 @@
const int kAllocIndexLen = sizeof(AllocSizeIndex) / sizeof(AllocSizeIndex[0]);
static_assert(kAllocIndexLen == 256, "Make sure alloc 8 bit is enough.");
static_assert(AllocSizeIndex[255] > uint32_t(-1), "Make sure alloc size correct.");
const int64_t kCenterInfoFixedAddress = 1024 * 4;
const boost::uuids::uuid kCenterInfoTag = boost::uuids::string_generator()("fc5007bd-0e62-4d91-95dc-948cf1f02e5a");
struct CenterMetaInfo {
   boost::uuids::uuid tag_;
   CenterInfo info_;
};
int64_t Addr(void *ptr) { return reinterpret_cast<int64_t>(ptr); }
// void *Ptr(const int64_t offset) { return reinterpret_cast<void *>(offset); }
template <class T = void>
T *Ptr(const int64_t offset) { return reinterpret_cast<T *>(offset); }
} // namespace
CenterInfo *GetCenterInfo(bhome_shm::SharedMemory &shm)
{
   auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address()));
   if (pmeta->tag_ == kCenterInfoTag) {
      return &pmeta->info_;
   }
   return nullptr;
}
// put center info at fixed memory position.
// as boost shm find object (find socket/mq by id, etc...) also locks inside,
// which node might crash inside and cause deadlock.
bool CenterInit(bhome_shm::SharedMemory &shm)
{
   Mutex *mutex = shm.FindOrCreate<Mutex>("shm_center_lock");
   if (!mutex || !mutex->try_lock()) {
      return false;
   }
   DEFER1(mutex->unlock());
   auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address()));
   if (pmeta->tag_ == kCenterInfoTag) {
      return true;
   } else {
      auto base = Addr(shm.get_address());
      auto offset = kCenterInfoFixedAddress;
      void *p = shm.Alloc(offset * 2);
      if (Addr(p) - base <= offset) {
         pmeta = new (Ptr(offset + base)) CenterMetaInfo;
         auto &info = pmeta->info_;
         auto InitMQ = [&](auto &mq, auto &&id) {
            mq.id_ = id;
            ShmSocket tmp(shm, id, 16);
            mq.offset_ = tmp.AbsAddr();
         };
         int id = 100;
         auto NextId = [&]() { return ++id; };
         InitMQ(info.mq_sender_, NextId());
         InitMQ(info.mq_center_, NextId());
         InitMQ(info.mq_bus_, NextId());
         pmeta->tag_ = kCenterInfoTag;
         return true;
      }
   }
   return false;
}
const MQInfo &BHGlobalSenderAddress() { return GetCenterInfo(BHomeShm())->mq_sender_; }
const MQInfo &BHTopicCenterAddress() { return GetCenterInfo(BHomeShm())->mq_center_; }
const MQInfo &BHTopicBusAddress() { return GetCenterInfo(BHomeShm())->mq_bus_; }
bool BHNodeInit(const int64_t request, int64_t &reply)
{
   return GetCenterInfo(BHomeShm())->init_rr_.ClientRequest(request, reply);
}
void BHCenterHandleInit(std::function<int64_t(const int64_t)> const &onReq)
{
   GetCenterInfo(BHomeShm())->init_rr_.ServerProcess(onReq);
}
int64_t CalcAllocIndex(int64_t size)
{
@@ -93,9 +173,8 @@
bool GlobalInit(bhome_shm::SharedMemory &shm)
{
   MsgI::BindShm(shm);
   typedef std::atomic<MQId> IdSrc;
   IdSrc *psrc = shm.FindOrCreate<IdSrc>("shmqIdSrc0", 100000);
   return psrc && ShmMsgQueue::SetData(*psrc);
   CenterInfo *pinfo = GetCenterInfo(shm);
   return pinfo && ShmMsgQueue::SetData(pinfo->mqid_);
}
void SetLastError(const int ec, const std::string &msg)
@@ -108,4 +187,6 @@
{
   ec = LastErrorStore().ec_;
   msg = LastErrorStore().msg_;
}
}
int NodeTimeoutSec() { return 60; }