lichao
2021-05-21 1ff714838c03cba1a18884d5b48a20ee6c4275ac
src/defs.cpp
@@ -90,7 +90,7 @@
} // namespace
CenterInfo *GetCenterInfo(bhome_shm::SharedMemory &shm)
CenterInfo *GetCenterInfo(SharedMemory &shm)
{
   auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address()));
   if (pmeta->tag_ == kCenterInfoTag) {
@@ -98,11 +98,33 @@
   }
   return nullptr;
}
ShmSocket &DefaultSender(SharedMemory &shm)
{
   typedef std::pair<void *, std::shared_ptr<ShmSocket>> Pair;
   static std::vector<Pair> store;
   static std::mutex s_mtx;
   thread_local Pair local_cache;
   if (local_cache.first == &shm) {
      return *local_cache.second;
   }
   std::lock_guard<std::mutex> lk(s_mtx);
   for (auto &kv : store) {
      if (kv.first == &shm) {
         local_cache = kv;
         return *local_cache.second;
      }
   }
   auto &mq = GetCenterInfo(shm)->mq_sender_;
   store.emplace_back(&shm, new ShmSocket(mq.offset_, shm, mq.id_));
   local_cache = store.back();
   return *local_cache.second;
}
// put center info at fixed memory position.
// as boost shm find object (find socket/mq by id, etc...) also locks inside,
// which node might crash inside and cause deadlock.
bool CenterInit(bhome_shm::SharedMemory &shm)
bool CenterInit(SharedMemory &shm)
{
   Mutex *mutex = shm.FindOrCreate<Mutex>("shm_center_lock");
   if (!mutex || !mutex->try_lock()) {
@@ -140,16 +162,15 @@
   return false;
}
const MQInfo &BHGlobalSenderAddress() { return GetCenterInfo(BHomeShm())->mq_sender_; }
const MQInfo &BHTopicCenterAddress() { return GetCenterInfo(BHomeShm())->mq_center_; }
const MQInfo &BHTopicBusAddress() { return GetCenterInfo(BHomeShm())->mq_bus_; }
bool BHNodeInit(const int64_t request, int64_t &reply)
const MQInfo &BHTopicCenterAddress(SharedMemory &shm) { return GetCenterInfo(shm)->mq_center_; }
const MQInfo &BHTopicBusAddress(SharedMemory &shm) { return GetCenterInfo(shm)->mq_bus_; }
bool BHNodeInit(SharedMemory &shm, const int64_t request, int64_t &reply)
{
   return GetCenterInfo(BHomeShm())->init_rr_.ClientRequest(request, reply);
   return GetCenterInfo(shm)->init_rr_.ClientRequest(request, reply);
}
void BHCenterHandleInit(std::function<int64_t(const int64_t)> const &onReq)
void BHCenterHandleInit(SharedMemory &shm, std::function<int64_t(const int64_t)> const &onReq)
{
   GetCenterInfo(BHomeShm())->init_rr_.ServerProcess(onReq);
   GetCenterInfo(shm)->init_rr_.ServerProcess(onReq);
}
int64_t CalcAllocIndex(int64_t size)
@@ -164,18 +185,15 @@
{
   return "bhome_default_shm_v0";
}
bhome_shm::SharedMemory &BHomeShm()
SharedMemory &BHomeShm()
{
   static bhome_shm::SharedMemory shm(BHomeShmName(), 1024 * 1024 * 512);
   static SharedMemory shm(BHomeShmName(), 1024 * 1024 * 512);
   return shm;
}
bool GlobalInit(bhome_shm::SharedMemory &shm)
{
   MsgI::BindShm(shm);
   CenterInfo *pinfo = GetCenterInfo(shm);
   return pinfo && ShmMsgQueue::SetData(pinfo->mqid_);
}
bool GlobalInit(SharedMemory &shm) { return GetCenterInfo(shm); }
MQId NewSession() { return 10 * (++GetCenterInfo(BHomeShm())->mqid_); }
void SetLastError(const int ec, const std::string &msg)
{