From 58d904a328c0d849769b483e901a0be9426b8209 Mon Sep 17 00:00:00 2001 From: liuxiaolong <liuxiaolong@aiotlink.com> Date: 星期二, 20 七月 2021 20:20:44 +0800 Subject: [PATCH] 调整Request C.BHFree的位置 --- src/defs.cpp | 174 +++++++++++++++++++++++++++++++++++++++++++++++++-------- 1 files changed, 149 insertions(+), 25 deletions(-) diff --git a/src/defs.cpp b/src/defs.cpp index 2715911..8305d66 100644 --- a/src/defs.cpp +++ b/src/defs.cpp @@ -22,6 +22,7 @@ #include <boost/uuid/random_generator.hpp> #include <boost/uuid/string_generator.hpp> #include <boost/uuid/uuid.hpp> +#include <sys/file.h> namespace { @@ -76,8 +77,16 @@ static_assert(AllocSizeIndex[255] > uint32_t(-1), "Make sure alloc size correct."); const int64_t kCenterInfoFixedAddress = 1024 * 4; +const int64_t kShmMetaInfoFixedAddress = 1024 * 16; -const boost::uuids::uuid kCenterInfoTag = boost::uuids::string_generator()("fc5007bd-0e62-4d91-95dc-948cf1f02e5a"); +const boost::uuids::uuid kMetaInfoTag = boost::uuids::string_generator()("fc5007bd-0e62-4d91-95dc-948cf1f02e5a"); + +struct BHomeMetaInfo { + boost::uuids::uuid tag_; + std::atomic<uint64_t> shm_id_; + std::atomic<uint64_t> ssn_id_; +}; + struct CenterMetaInfo { boost::uuids::uuid tag_; CenterInfo info_; @@ -88,22 +97,129 @@ template <class T = void> T *Ptr(const int64_t offset) { return reinterpret_cast<T *>(offset); } +class FileLock +{ +public: + FileLock(const std::string &path) : + fd_(Open(path)) + { + if (fd_ == -1) { throw std::runtime_error("error open file:" + path); } + } + ~FileLock() { Close(fd_); } + bool try_lock() { return fd_ != -1 && (flock(fd_, LOCK_EX | LOCK_NB) == 0); } + void unlock() { flock(fd_, LOCK_UN); } + +private: + static int Open(const std::string &path) { return open(path.c_str(), O_RDONLY, 0666); } + static int Close(int fd) { return close(fd); } + int fd_; + std::mutex mtx_; +}; + +SharedMemory &BHomeMetaShm() +{ + static std::string name("bhshmq_meta_v0"); + static SharedMemory shm(name, 1024 * 128); + return shm; +} + +ShmSocket &ShmSender(SharedMemory &shm, const bool reset) +{ + typedef std::pair<void *, std::shared_ptr<ShmSocket>> Pair; + static std::vector<Pair> store; + static std::mutex s_mtx; + thread_local Pair local_cache; + + std::lock_guard<std::mutex> lk(s_mtx); + + if (reset) { + for (auto &kv : store) { + if (kv.first == &shm) { + auto &mq = GetCenterInfo(shm)->mq_sender_; + kv.second.reset(new ShmSocket(mq.offset_, shm, mq.id_)); + local_cache = kv; + return *local_cache.second; + } + } + } else if (local_cache.first == &shm) { + return *local_cache.second; + } + + for (auto &kv : store) { + if (kv.first == &shm) { + local_cache = kv; + return *local_cache.second; + } + } + auto &mq = GetCenterInfo(shm)->mq_sender_; + store.emplace_back(&shm, std::make_shared<ShmSocket>(mq.offset_, shm, mq.id_)); + // store.emplace_back(&shm, new ShmSocket(mq.offset_, shm, mq.id_)); + local_cache = store.back(); + return *local_cache.second; +} } // namespace -CenterInfo *GetCenterInfo(bhome_shm::SharedMemory &shm) +CenterInfo *GetCenterInfo(SharedMemory &shm) { auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address())); - if (pmeta->tag_ == kCenterInfoTag) { + if (pmeta->tag_ == kMetaInfoTag) { return &pmeta->info_; } return nullptr; } +ShmSocket &DefaultSender(SharedMemory &shm) { return ShmSender(shm, false); } + +BHomeMetaInfo *GetBHomeMeta() +{ + auto p = Ptr<BHomeMetaInfo>(kShmMetaInfoFixedAddress + Addr(BHomeMetaShm().get_address())); + return (p->tag_ == kMetaInfoTag) ? p : nullptr; +} + +bool ShmMetaInit() +{ + SharedMemory &shm = BHomeMetaShm(); + + static FileLock fl("/dev/shm/" + shm.name()); + if (!fl.try_lock()) { // single center instance only. + return false; + } + + auto pmeta = GetBHomeMeta(); + if (pmeta && pmeta->tag_ == kMetaInfoTag) { + // remove old shm + SharedMemory::Remove(BHomeShmName()); + ++pmeta->shm_id_; // inc shm id + return true; // already exist. + } else { + Mutex *mutex = shm.FindOrCreate<Mutex>("bhshmq_meta_lock"); + if (!mutex || !mutex->try_lock()) { + return false; + } + DEFER1(mutex->unlock()); + + auto base = Addr(shm.get_address()); + auto offset = kShmMetaInfoFixedAddress; + void *p = shm.Alloc(offset * 2); + if (Addr(p) - base <= offset) { + pmeta = new (Ptr(offset + base)) BHomeMetaInfo; + pmeta->tag_ = kMetaInfoTag; + pmeta->shm_id_ = 100; + pmeta->ssn_id_ = 10000; + return true; + } + } + return false; +} + // put center info at fixed memory position. // as boost shm find object (find socket/mq by id, etc...) also locks inside, // which node might crash inside and cause deadlock. -bool CenterInit(bhome_shm::SharedMemory &shm) +bool CenterInit() { + if (!ShmMetaInit()) { return false; } + + SharedMemory &shm = BHomeShm(); Mutex *mutex = shm.FindOrCreate<Mutex>("shm_center_lock"); if (!mutex || !mutex->try_lock()) { return false; @@ -111,7 +227,7 @@ DEFER1(mutex->unlock()); auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address())); - if (pmeta->tag_ == kCenterInfoTag) { + if (pmeta->tag_ == kMetaInfoTag) { return true; } else { auto base = Addr(shm.get_address()); @@ -123,7 +239,7 @@ auto InitMQ = [&](auto &mq, auto &&id) { mq.id_ = id; - ShmSocket tmp(shm, id, 16); + ShmSocket tmp(shm, id, eOpenOrCreate); mq.offset_ = tmp.AbsAddr(); }; @@ -133,23 +249,22 @@ InitMQ(info.mq_center_, NextId()); InitMQ(info.mq_bus_, NextId()); - pmeta->tag_ = kCenterInfoTag; + pmeta->tag_ = kMetaInfoTag; return true; } } return false; } -const MQInfo &BHGlobalSenderAddress() { return GetCenterInfo(BHomeShm())->mq_sender_; } -const MQInfo &BHTopicCenterAddress() { return GetCenterInfo(BHomeShm())->mq_center_; } -const MQInfo &BHTopicBusAddress() { return GetCenterInfo(BHomeShm())->mq_bus_; } -bool BHNodeInit(const int64_t request, int64_t &reply) +const MQInfo &BHTopicCenterAddress(SharedMemory &shm) { return GetCenterInfo(shm)->mq_center_; } +const MQInfo &BHTopicBusAddress(SharedMemory &shm) { return GetCenterInfo(shm)->mq_bus_; } +bool BHNodeInit(SharedMemory &shm, const int64_t request, int64_t &reply) { - return GetCenterInfo(BHomeShm())->init_rr_.ClientRequest(request, reply); + return GetCenterInfo(shm)->init_rr_.ClientRequest(request, reply); } -void BHCenterHandleInit(std::function<int64_t(const int64_t)> const &onReq) +void BHCenterHandleInit(SharedMemory &shm, std::function<int64_t(const int64_t)> const &onReq) { - GetCenterInfo(BHomeShm())->init_rr_.ServerProcess(onReq); + GetCenterInfo(shm)->init_rr_.ServerProcess(onReq); } int64_t CalcAllocIndex(int64_t size) @@ -162,20 +277,27 @@ std::string BHomeShmName() { - return "bhome_default_shm_v0"; -} -bhome_shm::SharedMemory &BHomeShm() -{ - static bhome_shm::SharedMemory shm(BHomeShmName(), 1024 * 1024 * 512); - return shm; + auto bhome_meta = Ptr<BHomeMetaInfo>(kShmMetaInfoFixedAddress + Addr(BHomeMetaShm().get_address())); + return "bhshmq_sid_" + std::to_string(bhome_meta->shm_id_.load()); } -bool GlobalInit(bhome_shm::SharedMemory &shm) +SharedMemory &BHomeShm() { - MsgI::BindShm(shm); - CenterInfo *pinfo = GetCenterInfo(shm); - return pinfo && ShmMsgQueue::SetData(pinfo->mqid_); + static std::unique_ptr<SharedMemory> shm_ptr; + static std::string shm_name; + if (!shm_ptr || shm_name != BHomeShmName()) { + shm_name = BHomeShmName(); + if (shm_ptr) { + ShmSender(*shm_ptr, true); // reset sender. + } + shm_ptr.reset(new SharedMemory(shm_name, 1024 * 1024 * 512)); + } + return *shm_ptr; } + +bool GlobalInit(SharedMemory &shm) { return GetCenterInfo(shm); } + +MQId NewSession() { return 10 * (++GetBHomeMeta()->ssn_id_); } void SetLastError(const int ec, const std::string &msg) { @@ -189,4 +311,6 @@ msg = LastErrorStore().msg_; } -int NodeTimeoutSec() { return 60; } \ No newline at end of file +int NodeTimeoutSec() { return 60; } + +std::string BHLogDir() { return "/opt/vasystem/valog/"; } -- Gitblit v1.8.0