From 58d904a328c0d849769b483e901a0be9426b8209 Mon Sep 17 00:00:00 2001 From: liuxiaolong <liuxiaolong@aiotlink.com> Date: 星期二, 20 七月 2021 20:20:44 +0800 Subject: [PATCH] 调整Request C.BHFree的位置 --- src/defs.cpp | 274 ++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 files changed, 264 insertions(+), 10 deletions(-) diff --git a/src/defs.cpp b/src/defs.cpp index 0ca82bf..8305d66 100644 --- a/src/defs.cpp +++ b/src/defs.cpp @@ -17,7 +17,12 @@ */ #include "defs.h" #include "msg.h" -#include "shm_queue.h" +#include "shm_msg_queue.h" +#include "shm_socket.h" +#include <boost/uuid/random_generator.hpp> +#include <boost/uuid/string_generator.hpp> +#include <boost/uuid/uuid.hpp> +#include <sys/file.h> namespace { @@ -33,21 +38,266 @@ return le; } -} // namespace +constexpr int64_t AllocSizeIndex[] = { + 16, 24, 32, 40, 48, 56, 64, 72, + 80, 88, 96, 104, 120, 136, 152, 168, + 184, 200, 224, 248, 272, 296, 328, 360, + 392, 432, 472, 520, 568, 624, 680, 744, + 816, 896, 984, 1080, 1184, 1296, 1416, 1544, + 1688, 1848, 2016, 2200, 2400, 2624, 2864, 3128, + 3416, 3728, 4072, 4448, 4856, 5304, 5792, 6320, + 6896, 7528, 8216, 8968, 9784, 10680, 11656, 12720, + 13880, 15144, 16520, 18024, 19664, 21456, 23408, 25536, + 27864, 30400, 33168, 36184, 39480, 43072, 46992, 51264, + 55928, 61016, 66568, 72624, 79232, 86440, 94304, 102880, + 112232, 122440, 133576, 145720, 158968, 173424, 189192, 206392, + 225160, 245632, 267968, 292328, 318904, 347896, 379528, 414032, + 451672, 492736, 537536, 586408, 639720, 697880, 761328, 830544, + 906048, 988416, 1078272, 1176296, 1283232, 1399896, 1527160, 1665992, + 1817448, 1982672, 2162920, 2359552, 2574056, 2808064, 3063344, 3341832, + 3645640, 3977064, 4338616, 4733040, 5163320, 5632712, 6144776, 6703392, + 7312792, 7977592, 8702832, 9494000, 10357096, 11298656, 12325808, 13446336, + 14668736, 16002264, 17457016, 19044024, 20775304, 22663968, 24724328, 26972000, + 29424000, 32098912, 35017000, 38200368, 41673128, 45461600, 49594472, 54103064, + 59021528, 64387128, 70240504, 76626008, 83592008, 91191288, 99481408, 108525176, + 118391104, 129153936, 140895208, 153703864, 167676944, 182920304, 199549424, 217690280, + 237480312, 259069432, 282621200, 308314040, 336342592, 366919192, 400275488, 436664168, + 476360912, 519666456, 566908864, 618446040, 674668408, 736001904, 802911168, 875903096, + 955530656, 1042397080, 1137160456, 1240538680, 1353314928, 1476343560, 1610556616, 1756970856, + 1916695480, 2090940528, 2281026032, 2488392040, 2714609504, 2961392192, 3230609664, 3524301456, + 3844692504, 4194210008, 4575501832, 4991456544, 5445225320, 5940245808, 6480268160, 7069383448, + 7712054672, 8413150552, 9177982424, 10012344464, 10922557600, 11915517384, 12998746240, 14180450448, + 15469582312, 16875907976, 18410081432, 20083725200, 21909518400, 23901292800, 26074137600, 28444513752, + 31030378640, 33851322152, 36928715080, 40285871000, 43948222912, 47943515904, 52302017352, 57056746208, + 62243723136, 67902243424, 74075174648, 80809281440, 88155579752, 96169723368, 104912425496, 114449918728, + 124854456800, 136204861968, 148587122152, 162095042352, 176830955296, 192906496688, 210443450936, 229574673752}; -bhome_shm::SharedMemory &BHomeShm() +const int kAllocIndexLen = sizeof(AllocSizeIndex) / sizeof(AllocSizeIndex[0]); +static_assert(kAllocIndexLen == 256, "Make sure alloc 8 bit is enough."); +static_assert(AllocSizeIndex[255] > uint32_t(-1), "Make sure alloc size correct."); + +const int64_t kCenterInfoFixedAddress = 1024 * 4; +const int64_t kShmMetaInfoFixedAddress = 1024 * 16; + +const boost::uuids::uuid kMetaInfoTag = boost::uuids::string_generator()("fc5007bd-0e62-4d91-95dc-948cf1f02e5a"); + +struct BHomeMetaInfo { + boost::uuids::uuid tag_; + std::atomic<uint64_t> shm_id_; + std::atomic<uint64_t> ssn_id_; +}; + +struct CenterMetaInfo { + boost::uuids::uuid tag_; + CenterInfo info_; +}; + +int64_t Addr(void *ptr) { return reinterpret_cast<int64_t>(ptr); } +// void *Ptr(const int64_t offset) { return reinterpret_cast<void *>(offset); } +template <class T = void> +T *Ptr(const int64_t offset) { return reinterpret_cast<T *>(offset); } + +class FileLock { - static bhome_shm::SharedMemory shm("bhome_default_shm_v0", 1024 * 1024 * 512); +public: + FileLock(const std::string &path) : + fd_(Open(path)) + { + if (fd_ == -1) { throw std::runtime_error("error open file:" + path); } + } + ~FileLock() { Close(fd_); } + bool try_lock() { return fd_ != -1 && (flock(fd_, LOCK_EX | LOCK_NB) == 0); } + void unlock() { flock(fd_, LOCK_UN); } + +private: + static int Open(const std::string &path) { return open(path.c_str(), O_RDONLY, 0666); } + static int Close(int fd) { return close(fd); } + int fd_; + std::mutex mtx_; +}; + +SharedMemory &BHomeMetaShm() +{ + static std::string name("bhshmq_meta_v0"); + static SharedMemory shm(name, 1024 * 128); return shm; } -bool GlobalInit(bhome_shm::SharedMemory &shm) +ShmSocket &ShmSender(SharedMemory &shm, const bool reset) { - MsgI::BindShm(shm); - typedef std::atomic<MQId> IdSrc; - IdSrc *psrc = shm.FindOrCreate<IdSrc>("shmqIdSrc0", 100000); - return ShmMsgQueue::SetData(*psrc); + typedef std::pair<void *, std::shared_ptr<ShmSocket>> Pair; + static std::vector<Pair> store; + static std::mutex s_mtx; + thread_local Pair local_cache; + + std::lock_guard<std::mutex> lk(s_mtx); + + if (reset) { + for (auto &kv : store) { + if (kv.first == &shm) { + auto &mq = GetCenterInfo(shm)->mq_sender_; + kv.second.reset(new ShmSocket(mq.offset_, shm, mq.id_)); + local_cache = kv; + return *local_cache.second; + } + } + } else if (local_cache.first == &shm) { + return *local_cache.second; + } + + for (auto &kv : store) { + if (kv.first == &shm) { + local_cache = kv; + return *local_cache.second; + } + } + auto &mq = GetCenterInfo(shm)->mq_sender_; + store.emplace_back(&shm, std::make_shared<ShmSocket>(mq.offset_, shm, mq.id_)); + // store.emplace_back(&shm, new ShmSocket(mq.offset_, shm, mq.id_)); + local_cache = store.back(); + return *local_cache.second; } +} // namespace + +CenterInfo *GetCenterInfo(SharedMemory &shm) +{ + auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address())); + if (pmeta->tag_ == kMetaInfoTag) { + return &pmeta->info_; + } + return nullptr; +} + +ShmSocket &DefaultSender(SharedMemory &shm) { return ShmSender(shm, false); } + +BHomeMetaInfo *GetBHomeMeta() +{ + auto p = Ptr<BHomeMetaInfo>(kShmMetaInfoFixedAddress + Addr(BHomeMetaShm().get_address())); + return (p->tag_ == kMetaInfoTag) ? p : nullptr; +} + +bool ShmMetaInit() +{ + SharedMemory &shm = BHomeMetaShm(); + + static FileLock fl("/dev/shm/" + shm.name()); + if (!fl.try_lock()) { // single center instance only. + return false; + } + + auto pmeta = GetBHomeMeta(); + if (pmeta && pmeta->tag_ == kMetaInfoTag) { + // remove old shm + SharedMemory::Remove(BHomeShmName()); + ++pmeta->shm_id_; // inc shm id + return true; // already exist. + } else { + Mutex *mutex = shm.FindOrCreate<Mutex>("bhshmq_meta_lock"); + if (!mutex || !mutex->try_lock()) { + return false; + } + DEFER1(mutex->unlock()); + + auto base = Addr(shm.get_address()); + auto offset = kShmMetaInfoFixedAddress; + void *p = shm.Alloc(offset * 2); + if (Addr(p) - base <= offset) { + pmeta = new (Ptr(offset + base)) BHomeMetaInfo; + pmeta->tag_ = kMetaInfoTag; + pmeta->shm_id_ = 100; + pmeta->ssn_id_ = 10000; + return true; + } + } + return false; +} + +// put center info at fixed memory position. +// as boost shm find object (find socket/mq by id, etc...) also locks inside, +// which node might crash inside and cause deadlock. +bool CenterInit() +{ + if (!ShmMetaInit()) { return false; } + + SharedMemory &shm = BHomeShm(); + Mutex *mutex = shm.FindOrCreate<Mutex>("shm_center_lock"); + if (!mutex || !mutex->try_lock()) { + return false; + } + DEFER1(mutex->unlock()); + + auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address())); + if (pmeta->tag_ == kMetaInfoTag) { + return true; + } else { + auto base = Addr(shm.get_address()); + auto offset = kCenterInfoFixedAddress; + void *p = shm.Alloc(offset * 2); + if (Addr(p) - base <= offset) { + pmeta = new (Ptr(offset + base)) CenterMetaInfo; + auto &info = pmeta->info_; + + auto InitMQ = [&](auto &mq, auto &&id) { + mq.id_ = id; + ShmSocket tmp(shm, id, eOpenOrCreate); + mq.offset_ = tmp.AbsAddr(); + }; + + int id = 100; + auto NextId = [&]() { return ++id; }; + InitMQ(info.mq_sender_, NextId()); + InitMQ(info.mq_center_, NextId()); + InitMQ(info.mq_bus_, NextId()); + + pmeta->tag_ = kMetaInfoTag; + return true; + } + } + return false; +} + +const MQInfo &BHTopicCenterAddress(SharedMemory &shm) { return GetCenterInfo(shm)->mq_center_; } +const MQInfo &BHTopicBusAddress(SharedMemory &shm) { return GetCenterInfo(shm)->mq_bus_; } +bool BHNodeInit(SharedMemory &shm, const int64_t request, int64_t &reply) +{ + return GetCenterInfo(shm)->init_rr_.ClientRequest(request, reply); +} +void BHCenterHandleInit(SharedMemory &shm, std::function<int64_t(const int64_t)> const &onReq) +{ + GetCenterInfo(shm)->init_rr_.ServerProcess(onReq); +} + +int64_t CalcAllocIndex(int64_t size) +{ + auto pos = std::lower_bound(AllocSizeIndex, AllocSizeIndex + kAllocIndexLen, size); + return (pos == AllocSizeIndex + kAllocIndexLen) ? -1 : pos - AllocSizeIndex; +} + +int64_t GetAllocSize(int index) { return index < kAllocIndexLen ? AllocSizeIndex[index] : 0; } + +std::string BHomeShmName() +{ + auto bhome_meta = Ptr<BHomeMetaInfo>(kShmMetaInfoFixedAddress + Addr(BHomeMetaShm().get_address())); + return "bhshmq_sid_" + std::to_string(bhome_meta->shm_id_.load()); +} + +SharedMemory &BHomeShm() +{ + static std::unique_ptr<SharedMemory> shm_ptr; + static std::string shm_name; + if (!shm_ptr || shm_name != BHomeShmName()) { + shm_name = BHomeShmName(); + if (shm_ptr) { + ShmSender(*shm_ptr, true); // reset sender. + } + shm_ptr.reset(new SharedMemory(shm_name, 1024 * 1024 * 512)); + } + return *shm_ptr; +} + +bool GlobalInit(SharedMemory &shm) { return GetCenterInfo(shm); } + +MQId NewSession() { return 10 * (++GetBHomeMeta()->ssn_id_); } void SetLastError(const int ec, const std::string &msg) { @@ -59,4 +309,8 @@ { ec = LastErrorStore().ec_; msg = LastErrorStore().msg_; -} \ No newline at end of file +} + +int NodeTimeoutSec() { return 60; } + +std::string BHLogDir() { return "/opt/vasystem/valog/"; } -- Gitblit v1.8.0