liuxiaolong
2021-07-20 58d904a328c0d849769b483e901a0be9426b8209
src/defs.cpp
@@ -18,10 +18,11 @@
#include "defs.h"
#include "msg.h"
#include "shm_msg_queue.h"
#include "socket.h"
#include "shm_socket.h"
#include <boost/uuid/random_generator.hpp>
#include <boost/uuid/string_generator.hpp>
#include <boost/uuid/uuid.hpp>
#include <sys/file.h>
namespace
{
@@ -76,8 +77,16 @@
static_assert(AllocSizeIndex[255] > uint32_t(-1), "Make sure alloc size correct.");
const int64_t kCenterInfoFixedAddress = 1024 * 4;
const int64_t kShmMetaInfoFixedAddress = 1024 * 16;
const boost::uuids::uuid kCenterInfoTag = boost::uuids::string_generator()("fc5007bd-0e62-4d91-95dc-948cf1f02e5a");
const boost::uuids::uuid kMetaInfoTag = boost::uuids::string_generator()("fc5007bd-0e62-4d91-95dc-948cf1f02e5a");
struct BHomeMetaInfo {
   boost::uuids::uuid tag_;
   std::atomic<uint64_t> shm_id_;
   std::atomic<uint64_t> ssn_id_;
};
struct CenterMetaInfo {
   boost::uuids::uuid tag_;
   CenterInfo info_;
@@ -88,30 +97,137 @@
template <class T = void>
T *Ptr(const int64_t offset) { return reinterpret_cast<T *>(offset); }
class FileLock
{
public:
   FileLock(const std::string &path) :
       fd_(Open(path))
   {
      if (fd_ == -1) { throw std::runtime_error("error open file:" + path); }
   }
   ~FileLock() { Close(fd_); }
   bool try_lock() { return fd_ != -1 && (flock(fd_, LOCK_EX | LOCK_NB) == 0); }
   void unlock() { flock(fd_, LOCK_UN); }
private:
   static int Open(const std::string &path) { return open(path.c_str(), O_RDONLY, 0666); }
   static int Close(int fd) { return close(fd); }
   int fd_;
   std::mutex mtx_;
};
SharedMemory &BHomeMetaShm()
{
   static std::string name("bhshmq_meta_v0");
   static SharedMemory shm(name, 1024 * 128);
   return shm;
}
ShmSocket &ShmSender(SharedMemory &shm, const bool reset)
{
   typedef std::pair<void *, std::shared_ptr<ShmSocket>> Pair;
   static std::vector<Pair> store;
   static std::mutex s_mtx;
   thread_local Pair local_cache;
   std::lock_guard<std::mutex> lk(s_mtx);
   if (reset) {
      for (auto &kv : store) {
         if (kv.first == &shm) {
            auto &mq = GetCenterInfo(shm)->mq_sender_;
            kv.second.reset(new ShmSocket(mq.offset_, shm, mq.id_));
            local_cache = kv;
            return *local_cache.second;
         }
      }
   } else if (local_cache.first == &shm) {
      return *local_cache.second;
   }
   for (auto &kv : store) {
      if (kv.first == &shm) {
         local_cache = kv;
         return *local_cache.second;
      }
   }
   auto &mq = GetCenterInfo(shm)->mq_sender_;
   store.emplace_back(&shm, std::make_shared<ShmSocket>(mq.offset_, shm, mq.id_));
   // store.emplace_back(&shm, new ShmSocket(mq.offset_, shm, mq.id_));
   local_cache = store.back();
   return *local_cache.second;
}
} // namespace
CenterInfo *GetCenterInfo(bhome_shm::SharedMemory &shm)
CenterInfo *GetCenterInfo(SharedMemory &shm)
{
   auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address()));
   if (pmeta->tag_ == kCenterInfoTag) {
   if (pmeta->tag_ == kMetaInfoTag) {
      return &pmeta->info_;
   }
   return nullptr;
}
ShmSocket &DefaultSender(SharedMemory &shm) { return ShmSender(shm, false); }
BHomeMetaInfo *GetBHomeMeta()
{
   auto p = Ptr<BHomeMetaInfo>(kShmMetaInfoFixedAddress + Addr(BHomeMetaShm().get_address()));
   return (p->tag_ == kMetaInfoTag) ? p : nullptr;
}
bool ShmMetaInit()
{
   SharedMemory &shm = BHomeMetaShm();
   static FileLock fl("/dev/shm/" + shm.name());
   if (!fl.try_lock()) { // single center instance only.
      return false;
   }
   auto pmeta = GetBHomeMeta();
   if (pmeta && pmeta->tag_ == kMetaInfoTag) {
      // remove old shm
      SharedMemory::Remove(BHomeShmName());
      ++pmeta->shm_id_; // inc shm id
      return true;      // already exist.
   } else {
      Mutex *mutex = shm.FindOrCreate<Mutex>("bhshmq_meta_lock");
      if (!mutex || !mutex->try_lock()) {
         return false;
      }
      DEFER1(mutex->unlock());
      auto base = Addr(shm.get_address());
      auto offset = kShmMetaInfoFixedAddress;
      void *p = shm.Alloc(offset * 2);
      if (Addr(p) - base <= offset) {
         pmeta = new (Ptr(offset + base)) BHomeMetaInfo;
         pmeta->tag_ = kMetaInfoTag;
         pmeta->shm_id_ = 100;
         pmeta->ssn_id_ = 10000;
         return true;
      }
   }
   return false;
}
// put center info at fixed memory position.
// as boost shm find object (find socket/mq by id, etc...) also locks inside,
// which node might crash inside and cause deadlock.
bool CenterInit(bhome_shm::SharedMemory &shm)
bool CenterInit()
{
   Mutex *mutex = shm.Create<Mutex>("shm_center_lock");
   if (!ShmMetaInit()) { return false; }
   SharedMemory &shm = BHomeShm();
   Mutex *mutex = shm.FindOrCreate<Mutex>("shm_center_lock");
   if (!mutex || !mutex->try_lock()) {
      return false;
   }
   DEFER1(mutex->unlock());
   auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address()));
   if (pmeta->tag_ == kCenterInfoTag) {
   if (pmeta->tag_ == kMetaInfoTag) {
      return true;
   } else {
      auto base = Addr(shm.get_address());
@@ -123,7 +239,7 @@
         auto InitMQ = [&](auto &mq, auto &&id) {
            mq.id_ = id;
            ShmSocket tmp(shm, id, 16);
            ShmSocket tmp(shm, id, eOpenOrCreate);
            mq.offset_ = tmp.AbsAddr();
         };
@@ -132,19 +248,24 @@
         InitMQ(info.mq_sender_, NextId());
         InitMQ(info.mq_center_, NextId());
         InitMQ(info.mq_bus_, NextId());
         InitMQ(info.mq_init_, NextId());
         pmeta->tag_ = kCenterInfoTag;
         pmeta->tag_ = kMetaInfoTag;
         return true;
      }
   }
   return false;
}
uint64_t BHGlobalSenderAddress() { return GetCenterInfo(BHomeShm())->mq_sender_.id_; }
uint64_t BHTopicCenterAddress() { return GetCenterInfo(BHomeShm())->mq_center_.id_; }
uint64_t BHTopicBusAddress() { return GetCenterInfo(BHomeShm())->mq_bus_.id_; }
uint64_t BHCenterReplyAddress() { return GetCenterInfo(BHomeShm())->mq_init_.id_; }
const MQInfo &BHTopicCenterAddress(SharedMemory &shm) { return GetCenterInfo(shm)->mq_center_; }
const MQInfo &BHTopicBusAddress(SharedMemory &shm) { return GetCenterInfo(shm)->mq_bus_; }
bool BHNodeInit(SharedMemory &shm, const int64_t request, int64_t &reply)
{
   return GetCenterInfo(shm)->init_rr_.ClientRequest(request, reply);
}
void BHCenterHandleInit(SharedMemory &shm, std::function<int64_t(const int64_t)> const &onReq)
{
   GetCenterInfo(shm)->init_rr_.ServerProcess(onReq);
}
int64_t CalcAllocIndex(int64_t size)
{
@@ -156,20 +277,27 @@
std::string BHomeShmName()
{
   return "bhome_default_shm_v0";
}
bhome_shm::SharedMemory &BHomeShm()
{
   static bhome_shm::SharedMemory shm(BHomeShmName(), 1024 * 1024 * 512);
   return shm;
   auto bhome_meta = Ptr<BHomeMetaInfo>(kShmMetaInfoFixedAddress + Addr(BHomeMetaShm().get_address()));
   return "bhshmq_sid_" + std::to_string(bhome_meta->shm_id_.load());
}
bool GlobalInit(bhome_shm::SharedMemory &shm)
SharedMemory &BHomeShm()
{
   MsgI::BindShm(shm);
   CenterInfo *pinfo = GetCenterInfo(shm);
   return pinfo && ShmMsgQueue::SetData(pinfo->mqid_);
   static std::unique_ptr<SharedMemory> shm_ptr;
   static std::string shm_name;
   if (!shm_ptr || shm_name != BHomeShmName()) {
      shm_name = BHomeShmName();
      if (shm_ptr) {
         ShmSender(*shm_ptr, true); // reset sender.
      }
      shm_ptr.reset(new SharedMemory(shm_name, 1024 * 1024 * 512));
   }
   return *shm_ptr;
}
bool GlobalInit(SharedMemory &shm) { return GetCenterInfo(shm); }
MQId NewSession() { return 10 * (++GetBHomeMeta()->ssn_id_); }
void SetLastError(const int ec, const std::string &msg)
{
@@ -181,4 +309,8 @@
{
   ec = LastErrorStore().ec_;
   msg = LastErrorStore().msg_;
}
}
int NodeTimeoutSec() { return 60; }
std::string BHLogDir() { return "/opt/vasystem/valog/"; }