From 34cd75f77d0ca94dbdba4e6cc9451fe4d33e78b3 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期三, 19 五月 2021 19:14:13 +0800 Subject: [PATCH] add api BHQueryProcs. --- src/defs.cpp | 89 ++++++++++++++++++++++++++++++++++++++++++-- 1 files changed, 85 insertions(+), 4 deletions(-) diff --git a/src/defs.cpp b/src/defs.cpp index 450349e..0d9efc1 100644 --- a/src/defs.cpp +++ b/src/defs.cpp @@ -18,6 +18,10 @@ #include "defs.h" #include "msg.h" #include "shm_msg_queue.h" +#include "socket.h" +#include <boost/uuid/random_generator.hpp> +#include <boost/uuid/string_generator.hpp> +#include <boost/uuid/uuid.hpp> namespace { @@ -70,7 +74,83 @@ const int kAllocIndexLen = sizeof(AllocSizeIndex) / sizeof(AllocSizeIndex[0]); static_assert(kAllocIndexLen == 256, "Make sure alloc 8 bit is enough."); static_assert(AllocSizeIndex[255] > uint32_t(-1), "Make sure alloc size correct."); + +const int64_t kCenterInfoFixedAddress = 1024 * 4; + +const boost::uuids::uuid kCenterInfoTag = boost::uuids::string_generator()("fc5007bd-0e62-4d91-95dc-948cf1f02e5a"); +struct CenterMetaInfo { + boost::uuids::uuid tag_; + CenterInfo info_; +}; + +int64_t Addr(void *ptr) { return reinterpret_cast<int64_t>(ptr); } +// void *Ptr(const int64_t offset) { return reinterpret_cast<void *>(offset); } +template <class T = void> +T *Ptr(const int64_t offset) { return reinterpret_cast<T *>(offset); } + } // namespace + +CenterInfo *GetCenterInfo(bhome_shm::SharedMemory &shm) +{ + auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address())); + if (pmeta->tag_ == kCenterInfoTag) { + return &pmeta->info_; + } + return nullptr; +} + +// put center info at fixed memory position. +// as boost shm find object (find socket/mq by id, etc...) also locks inside, +// which node might crash inside and cause deadlock. +bool CenterInit(bhome_shm::SharedMemory &shm) +{ + Mutex *mutex = shm.FindOrCreate<Mutex>("shm_center_lock"); + if (!mutex || !mutex->try_lock()) { + return false; + } + DEFER1(mutex->unlock()); + + auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address())); + if (pmeta->tag_ == kCenterInfoTag) { + return true; + } else { + auto base = Addr(shm.get_address()); + auto offset = kCenterInfoFixedAddress; + void *p = shm.Alloc(offset * 2); + if (Addr(p) - base <= offset) { + pmeta = new (Ptr(offset + base)) CenterMetaInfo; + auto &info = pmeta->info_; + + auto InitMQ = [&](auto &mq, auto &&id) { + mq.id_ = id; + ShmSocket tmp(shm, id, 16); + mq.offset_ = tmp.AbsAddr(); + }; + + int id = 100; + auto NextId = [&]() { return ++id; }; + InitMQ(info.mq_sender_, NextId()); + InitMQ(info.mq_center_, NextId()); + InitMQ(info.mq_bus_, NextId()); + + pmeta->tag_ = kCenterInfoTag; + return true; + } + } + return false; +} + +const MQInfo &BHGlobalSenderAddress() { return GetCenterInfo(BHomeShm())->mq_sender_; } +const MQInfo &BHTopicCenterAddress() { return GetCenterInfo(BHomeShm())->mq_center_; } +const MQInfo &BHTopicBusAddress() { return GetCenterInfo(BHomeShm())->mq_bus_; } +bool BHNodeInit(const int64_t request, int64_t &reply) +{ + return GetCenterInfo(BHomeShm())->init_rr_.ClientRequest(request, reply); +} +void BHCenterHandleInit(std::function<int64_t(const int64_t)> const &onReq) +{ + GetCenterInfo(BHomeShm())->init_rr_.ServerProcess(onReq); +} int64_t CalcAllocIndex(int64_t size) { @@ -93,9 +173,8 @@ bool GlobalInit(bhome_shm::SharedMemory &shm) { MsgI::BindShm(shm); - typedef std::atomic<MQId> IdSrc; - IdSrc *psrc = shm.FindOrCreate<IdSrc>("shmqIdSrc0", 100000); - return psrc && ShmMsgQueue::SetData(*psrc); + CenterInfo *pinfo = GetCenterInfo(shm); + return pinfo && ShmMsgQueue::SetData(pinfo->mqid_); } void SetLastError(const int ec, const std::string &msg) @@ -108,4 +187,6 @@ { ec = LastErrorStore().ec_; msg = LastErrorStore().msg_; -} \ No newline at end of file +} + +int NodeTimeoutSec() { return 60; } \ No newline at end of file -- Gitblit v1.8.0