| | |
| | | |
| | | } // namespace |
| | | |
// Looks for the center record stored at a fixed offset inside `shm`.
// Interprets the memory at kCenterInfoFixedAddress past shm.get_address() as a
// CenterMetaInfo and compares its tag_ with kCenterInfoTag; every path visible
// here ends in `return nullptr`.
// NOTE(review): this view shows two signature rows (bhome_shm::SharedMemory and
// SharedMemory) and an empty tag-match branch - this looks like diff residue
// with the success-path return dropped; confirm against the real file.
| | | CenterInfo *GetCenterInfo(bhome_shm::SharedMemory &shm) |
| | | CenterInfo *GetCenterInfo(SharedMemory &shm) |
| | | { |
| | | auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address())); |
| | | if (pmeta->tag_ == kCenterInfoTag) { |
| | |
| | | } |
| | | return nullptr; |
| | | } |
| | | ShmSocket &DefaultSender(SharedMemory &shm) |
| | | { |
| | | typedef std::pair<void *, std::shared_ptr<ShmSocket>> Pair; |
| | | static std::vector<Pair> store; |
| | | static std::mutex s_mtx; |
| | | |
| | | thread_local Pair local_cache; |
| | | if (local_cache.first == &shm) { |
| | | return *local_cache.second; |
| | | } |
| | | |
| | | std::lock_guard<std::mutex> lk(s_mtx); |
| | | for (auto &kv : store) { |
| | | if (kv.first == &shm) { |
| | | local_cache = kv; |
| | | return *local_cache.second; |
| | | } |
| | | } |
| | | auto &mq = GetCenterInfo(shm)->mq_sender_; |
| | | store.emplace_back(&shm, new ShmSocket(mq.offset_, shm, mq.id_)); |
| | | local_cache = store.back(); |
| | | return *local_cache.second; |
| | | } |
| | | // Put the center info at a fixed memory position. |
| | | // Boost shm object lookup (finding a socket/mq by id, etc.) also takes a lock internally; |
| | | // a node that crashes while holding that lock would cause a deadlock. |
| | | bool CenterInit(bhome_shm::SharedMemory &shm) |
| | | bool CenterInit(SharedMemory &shm) |
| | | { |
| | | Mutex *mutex = shm.FindOrCreate<Mutex>("shm_center_lock"); |
| | | if (!mutex || !mutex->try_lock()) { |
| | |
| | | return false; |
| | | } |
| | | |
// Accessors returning the MQInfo members (mq_sender_, mq_center_, mq_bus_) of
// the CenterInfo obtained from GetCenterInfo(). Each one dereferences that
// result directly, so a nullptr from GetCenterInfo() is not handled here.
// NOTE(review): both BHomeShm()-based and explicit-`shm` variants appear in
// this view (looks like old/new diff rows interleaved) - confirm which set is
// live in the real file.
| | | const MQInfo &BHGlobalSenderAddress() { return GetCenterInfo(BHomeShm())->mq_sender_; } |
| | | const MQInfo &BHTopicCenterAddress() { return GetCenterInfo(BHomeShm())->mq_center_; } |
| | | const MQInfo &BHTopicBusAddress() { return GetCenterInfo(BHomeShm())->mq_bus_; } |
// BHNodeInit: passes `request` and `reply` to init_rr_.ClientRequest() on the
// center info and returns that call's result.
| | | bool BHNodeInit(const int64_t request, int64_t &reply) |
| | | const MQInfo &BHTopicCenterAddress(SharedMemory &shm) { return GetCenterInfo(shm)->mq_center_; } |
| | | const MQInfo &BHTopicBusAddress(SharedMemory &shm) { return GetCenterInfo(shm)->mq_bus_; } |
| | | bool BHNodeInit(SharedMemory &shm, const int64_t request, int64_t &reply) |
| | | { |
| | | return GetCenterInfo(BHomeShm())->init_rr_.ClientRequest(request, reply); |
| | | return GetCenterInfo(shm)->init_rr_.ClientRequest(request, reply); |
| | | } |
// Hands the callback `onReq` to init_rr_.ServerProcess() on the center info.
// The GetCenterInfo() result is dereferenced without a null check.
// NOTE(review): two signatures and two calls are visible (one using BHomeShm(),
// one using an explicit `shm`) - looks like interleaved old/new diff rows;
// confirm which variant is live.
| | | void BHCenterHandleInit(std::function<int64_t(const int64_t)> const &onReq) |
| | | void BHCenterHandleInit(SharedMemory &shm, std::function<int64_t(const int64_t)> const &onReq) |
| | | { |
| | | GetCenterInfo(BHomeShm())->init_rr_.ServerProcess(onReq); |
| | | GetCenterInfo(shm)->init_rr_.ServerProcess(onReq); |
| | | } |
| | | |
| | | int64_t CalcAllocIndex(int64_t size) |
| | |
| | | { |
| | | return "bhome_default_shm_v0"; |
| | | } |
// Returns a reference to a function-local static SharedMemory constructed from
// BHomeShmName() and 1024 * 1024 * 512 (presumably the segment size in bytes -
// confirm against the SharedMemory constructor).
// NOTE(review): the signature and the static declaration each appear twice,
// with and without the bhome_shm:: qualifier - looks like interleaved old/new
// diff rows; confirm which one is live.
| | | bhome_shm::SharedMemory &BHomeShm() |
| | | SharedMemory &BHomeShm() |
| | | { |
| | | static bhome_shm::SharedMemory shm(BHomeShmName(), 1024 * 1024 * 512); |
| | | static SharedMemory shm(BHomeShmName(), 1024 * 1024 * 512); |
| | | return shm; |
| | | } |
| | | |
// Reports whether the center info can be found in `shm`.
// The multi-line variant also calls MsgI::BindShm(shm) and requires
// ShmMsgQueue::SetData(pinfo->mqid_) to succeed; the one-line variant only
// converts the GetCenterInfo(shm) pointer to bool.
// NOTE(review): both variants are visible here (looks like old/new diff rows) -
// confirm which definition is live.
| | | bool GlobalInit(bhome_shm::SharedMemory &shm) |
| | | { |
| | | MsgI::BindShm(shm); |
| | | CenterInfo *pinfo = GetCenterInfo(shm); |
| | | return pinfo && ShmMsgQueue::SetData(pinfo->mqid_); |
| | | } |
| | | bool GlobalInit(SharedMemory &shm) { return GetCenterInfo(shm); } |
| | | |
| | | MQId NewSession() { return 10 * (++GetCenterInfo(BHomeShm())->mqid_); } |
| | | |
| | | void SetLastError(const int ec, const std::string &msg) |
| | | { |