From ab898268c8bc493ca9862b2d64f2e1e7d20e5a4c Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期三, 02 六月 2021 13:39:00 +0800 Subject: [PATCH] refactor. --- src/defs.cpp | 58 +++++++++++++++++++++++++++++++++++++++------------------- 1 files changed, 39 insertions(+), 19 deletions(-) diff --git a/src/defs.cpp b/src/defs.cpp index 2715911..57df1bc 100644 --- a/src/defs.cpp +++ b/src/defs.cpp @@ -90,7 +90,7 @@ } // namespace -CenterInfo *GetCenterInfo(bhome_shm::SharedMemory &shm) +CenterInfo *GetCenterInfo(SharedMemory &shm) { auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address())); if (pmeta->tag_ == kCenterInfoTag) { @@ -98,11 +98,33 @@ } return nullptr; } +ShmSocket &DefaultSender(SharedMemory &shm) +{ + typedef std::pair<void *, std::shared_ptr<ShmSocket>> Pair; + static std::vector<Pair> store; + static std::mutex s_mtx; + thread_local Pair local_cache; + if (local_cache.first == &shm) { + return *local_cache.second; + } + + std::lock_guard<std::mutex> lk(s_mtx); + for (auto &kv : store) { + if (kv.first == &shm) { + local_cache = kv; + return *local_cache.second; + } + } + auto &mq = GetCenterInfo(shm)->mq_sender_; + store.emplace_back(&shm, new ShmSocket(mq.offset_, shm, mq.id_)); + local_cache = store.back(); + return *local_cache.second; +} // put center info at fixed memory position. // as boost shm find object (find socket/mq by id, etc...) also locks inside, // which node might crash inside and cause deadlock. -bool CenterInit(bhome_shm::SharedMemory &shm) +bool CenterInit(SharedMemory &shm) { Mutex *mutex = shm.FindOrCreate<Mutex>("shm_center_lock"); if (!mutex || !mutex->try_lock()) { @@ -123,7 +145,7 @@ auto InitMQ = [&](auto &mq, auto &&id) { mq.id_ = id; - ShmSocket tmp(shm, id, 16); + ShmSocket tmp(shm, id, eOpenOrCreate); mq.offset_ = tmp.AbsAddr(); }; @@ -140,16 +162,15 @@ return false; } -const MQInfo &BHGlobalSenderAddress() { return GetCenterInfo(BHomeShm())->mq_sender_; } -const MQInfo &BHTopicCenterAddress() { return GetCenterInfo(BHomeShm())->mq_center_; } -const MQInfo &BHTopicBusAddress() { return GetCenterInfo(BHomeShm())->mq_bus_; } -bool BHNodeInit(const int64_t request, int64_t &reply) +const MQInfo &BHTopicCenterAddress(SharedMemory &shm) { return GetCenterInfo(shm)->mq_center_; } +const MQInfo &BHTopicBusAddress(SharedMemory &shm) { return GetCenterInfo(shm)->mq_bus_; } +bool BHNodeInit(SharedMemory &shm, const int64_t request, int64_t &reply) { - return GetCenterInfo(BHomeShm())->init_rr_.ClientRequest(request, reply); + return GetCenterInfo(shm)->init_rr_.ClientRequest(request, reply); } -void BHCenterHandleInit(std::function<int64_t(const int64_t)> const &onReq) +void BHCenterHandleInit(SharedMemory &shm, std::function<int64_t(const int64_t)> const &onReq) { - GetCenterInfo(BHomeShm())->init_rr_.ServerProcess(onReq); + GetCenterInfo(shm)->init_rr_.ServerProcess(onReq); } int64_t CalcAllocIndex(int64_t size) @@ -164,18 +185,15 @@ { return "bhome_default_shm_v0"; } -bhome_shm::SharedMemory &BHomeShm() +SharedMemory &BHomeShm() { - static bhome_shm::SharedMemory shm(BHomeShmName(), 1024 * 1024 * 512); + static SharedMemory shm(BHomeShmName(), 1024 * 1024 * 512); return shm; } -bool GlobalInit(bhome_shm::SharedMemory &shm) -{ - MsgI::BindShm(shm); - CenterInfo *pinfo = GetCenterInfo(shm); - return pinfo && ShmMsgQueue::SetData(pinfo->mqid_); -} +bool GlobalInit(SharedMemory &shm) { return GetCenterInfo(shm); } + +MQId NewSession() { return 10 * (++GetCenterInfo(BHomeShm())->mqid_); } void SetLastError(const int ec, const std::string &msg) { @@ -189,4 +207,6 @@ msg = LastErrorStore().msg_; } -int NodeTimeoutSec() { return 60; } \ No newline at end of file +int NodeTimeoutSec() { return 60; } + +std::string BHLogDir() { return "/opt/vasystem/valog/"; } -- Gitblit v1.8.0