/* * ===================================================================================== * * Filename: defs.cpp * * Description: * * Version: 1.0 * Created: 2021年04月06日 19时23分14秒 * Revision: none * Compiler: gcc * * Author: Li Chao (), * Organization: * * ===================================================================================== */ #include "defs.h" #include "msg.h" #include "shm_msg_queue.h" #include "shm_socket.h" #include #include #include #include namespace { struct LastError { int ec_ = 0; std::string msg_; }; LastError &LastErrorStore() { thread_local LastError le; return le; } constexpr int64_t AllocSizeIndex[] = { 16, 24, 32, 40, 48, 56, 64, 72, 80, 88, 96, 104, 120, 136, 152, 168, 184, 200, 224, 248, 272, 296, 328, 360, 392, 432, 472, 520, 568, 624, 680, 744, 816, 896, 984, 1080, 1184, 1296, 1416, 1544, 1688, 1848, 2016, 2200, 2400, 2624, 2864, 3128, 3416, 3728, 4072, 4448, 4856, 5304, 5792, 6320, 6896, 7528, 8216, 8968, 9784, 10680, 11656, 12720, 13880, 15144, 16520, 18024, 19664, 21456, 23408, 25536, 27864, 30400, 33168, 36184, 39480, 43072, 46992, 51264, 55928, 61016, 66568, 72624, 79232, 86440, 94304, 102880, 112232, 122440, 133576, 145720, 158968, 173424, 189192, 206392, 225160, 245632, 267968, 292328, 318904, 347896, 379528, 414032, 451672, 492736, 537536, 586408, 639720, 697880, 761328, 830544, 906048, 988416, 1078272, 1176296, 1283232, 1399896, 1527160, 1665992, 1817448, 1982672, 2162920, 2359552, 2574056, 2808064, 3063344, 3341832, 3645640, 3977064, 4338616, 4733040, 5163320, 5632712, 6144776, 6703392, 7312792, 7977592, 8702832, 9494000, 10357096, 11298656, 12325808, 13446336, 14668736, 16002264, 17457016, 19044024, 20775304, 22663968, 24724328, 26972000, 29424000, 32098912, 35017000, 38200368, 41673128, 45461600, 49594472, 54103064, 59021528, 64387128, 70240504, 76626008, 83592008, 91191288, 99481408, 108525176, 118391104, 129153936, 140895208, 153703864, 167676944, 182920304, 199549424, 217690280, 237480312, 259069432, 282621200, 308314040, 336342592, 366919192, 400275488, 436664168, 476360912, 519666456, 566908864, 618446040, 674668408, 736001904, 802911168, 875903096, 955530656, 1042397080, 1137160456, 1240538680, 1353314928, 1476343560, 1610556616, 1756970856, 1916695480, 2090940528, 2281026032, 2488392040, 2714609504, 2961392192, 3230609664, 3524301456, 3844692504, 4194210008, 4575501832, 4991456544, 5445225320, 5940245808, 6480268160, 7069383448, 7712054672, 8413150552, 9177982424, 10012344464, 10922557600, 11915517384, 12998746240, 14180450448, 15469582312, 16875907976, 18410081432, 20083725200, 21909518400, 23901292800, 26074137600, 28444513752, 31030378640, 33851322152, 36928715080, 40285871000, 43948222912, 47943515904, 52302017352, 57056746208, 62243723136, 67902243424, 74075174648, 80809281440, 88155579752, 96169723368, 104912425496, 114449918728, 124854456800, 136204861968, 148587122152, 162095042352, 176830955296, 192906496688, 210443450936, 229574673752}; const int kAllocIndexLen = sizeof(AllocSizeIndex) / sizeof(AllocSizeIndex[0]); static_assert(kAllocIndexLen == 256, "Make sure alloc 8 bit is enough."); static_assert(AllocSizeIndex[255] > uint32_t(-1), "Make sure alloc size correct."); const int64_t kCenterInfoFixedAddress = 1024 * 4; const int64_t kShmMetaInfoFixedAddress = 1024 * 16; const boost::uuids::uuid kMetaInfoTag = boost::uuids::string_generator()("fc5007bd-0e62-4d91-95dc-948cf1f02e5a"); struct BHomeMetaInfo { boost::uuids::uuid tag_; std::atomic shm_id_; std::atomic ssn_id_; }; struct CenterMetaInfo { boost::uuids::uuid tag_; CenterInfo info_; }; int64_t Addr(void *ptr) { return reinterpret_cast(ptr); } // void *Ptr(const int64_t offset) { return reinterpret_cast(offset); } template T *Ptr(const int64_t offset) { return reinterpret_cast(offset); } class FileLock { public: FileLock(const std::string &path) : fd_(Open(path)) { if (fd_ == -1) { throw std::runtime_error("error open file:" + path); } } ~FileLock() { Close(fd_); } bool try_lock() { return fd_ != -1 && (flock(fd_, LOCK_EX | LOCK_NB) == 0); } void unlock() { flock(fd_, LOCK_UN); } private: static int Open(const std::string &path) { return open(path.c_str(), O_RDONLY, 0666); } static int Close(int fd) { return close(fd); } int fd_; std::mutex mtx_; }; SharedMemory &BHomeMetaShm() { static std::string name("bhshmq_meta_v0"); static SharedMemory shm(name, 1024 * 128); return shm; } } // namespace CenterInfo *GetCenterInfo(SharedMemory &shm) { auto pmeta = Ptr(kCenterInfoFixedAddress + Addr(shm.get_address())); if (pmeta->tag_ == kMetaInfoTag) { return &pmeta->info_; } return nullptr; } ShmSocket &DefaultSender(SharedMemory &shm) { typedef std::pair> Pair; static std::vector store; static std::mutex s_mtx; thread_local Pair local_cache; if (local_cache.first == &shm) { return *local_cache.second; } std::lock_guard lk(s_mtx); for (auto &kv : store) { if (kv.first == &shm) { local_cache = kv; return *local_cache.second; } } auto &mq = GetCenterInfo(shm)->mq_sender_; store.emplace_back(&shm, new ShmSocket(mq.offset_, shm, mq.id_)); local_cache = store.back(); return *local_cache.second; } BHomeMetaInfo *GetBHomeMeta() { auto p = Ptr(kShmMetaInfoFixedAddress + Addr(BHomeMetaShm().get_address())); return (p->tag_ == kMetaInfoTag) ? p : nullptr; } bool ShmMetaInit() { SharedMemory &shm = BHomeMetaShm(); static FileLock fl("/dev/shm/" + shm.name()); if (!fl.try_lock()) { // single center instance only. return false; } auto pmeta = GetBHomeMeta(); if (pmeta && pmeta->tag_ == kMetaInfoTag) { ++pmeta->shm_id_; // inc shm id return true; // already exist. } else { Mutex *mutex = shm.FindOrCreate("bhshmq_meta_lock"); if (!mutex || !mutex->try_lock()) { return false; } DEFER1(mutex->unlock()); auto base = Addr(shm.get_address()); auto offset = kShmMetaInfoFixedAddress; void *p = shm.Alloc(offset * 2); if (Addr(p) - base <= offset) { pmeta = new (Ptr(offset + base)) BHomeMetaInfo; pmeta->tag_ = kMetaInfoTag; pmeta->shm_id_ = 100; pmeta->ssn_id_ = 10000; return true; } } return false; } // put center info at fixed memory position. // as boost shm find object (find socket/mq by id, etc...) also locks inside, // which node might crash inside and cause deadlock. bool CenterInit() { if (!ShmMetaInit()) { return false; } SharedMemory &shm = BHomeShm(); Mutex *mutex = shm.FindOrCreate("shm_center_lock"); if (!mutex || !mutex->try_lock()) { return false; } DEFER1(mutex->unlock()); auto pmeta = Ptr(kCenterInfoFixedAddress + Addr(shm.get_address())); if (pmeta->tag_ == kMetaInfoTag) { return true; } else { auto base = Addr(shm.get_address()); auto offset = kCenterInfoFixedAddress; void *p = shm.Alloc(offset * 2); if (Addr(p) - base <= offset) { pmeta = new (Ptr(offset + base)) CenterMetaInfo; auto &info = pmeta->info_; auto InitMQ = [&](auto &mq, auto &&id) { mq.id_ = id; ShmSocket tmp(shm, id, eOpenOrCreate); mq.offset_ = tmp.AbsAddr(); }; int id = 100; auto NextId = [&]() { return ++id; }; InitMQ(info.mq_sender_, NextId()); InitMQ(info.mq_center_, NextId()); InitMQ(info.mq_bus_, NextId()); pmeta->tag_ = kMetaInfoTag; return true; } } return false; } const MQInfo &BHTopicCenterAddress(SharedMemory &shm) { return GetCenterInfo(shm)->mq_center_; } const MQInfo &BHTopicBusAddress(SharedMemory &shm) { return GetCenterInfo(shm)->mq_bus_; } bool BHNodeInit(SharedMemory &shm, const int64_t request, int64_t &reply) { return GetCenterInfo(shm)->init_rr_.ClientRequest(request, reply); } void BHCenterHandleInit(SharedMemory &shm, std::function const &onReq) { GetCenterInfo(shm)->init_rr_.ServerProcess(onReq); } int64_t CalcAllocIndex(int64_t size) { auto pos = std::lower_bound(AllocSizeIndex, AllocSizeIndex + kAllocIndexLen, size); return (pos == AllocSizeIndex + kAllocIndexLen) ? -1 : pos - AllocSizeIndex; } int64_t GetAllocSize(int index) { return index < kAllocIndexLen ? AllocSizeIndex[index] : 0; } std::string BHomeShmName() { auto bhome_meta = Ptr(kShmMetaInfoFixedAddress + Addr(BHomeMetaShm().get_address())); return "bhome_shmq_id_" + std::to_string(bhome_meta->shm_id_.load()); } SharedMemory &BHomeShm() { static SharedMemory shm(BHomeShmName(), 1024 * 1024 * 512); return shm; } bool GlobalInit(SharedMemory &shm) { return GetCenterInfo(shm); } MQId NewSession() { return 10 * (++GetBHomeMeta()->ssn_id_); } void SetLastError(const int ec, const std::string &msg) { LastErrorStore().ec_ = ec; LastErrorStore().msg_ = msg; } void GetLastError(int &ec, std::string &msg) { ec = LastErrorStore().ec_; msg = LastErrorStore().msg_; } int NodeTimeoutSec() { return 60; } std::string BHLogDir() { return "/opt/vasystem/valog/"; }