From 993c556000a414011626770540678948f16eaa9e Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期三, 02 六月 2021 17:40:50 +0800 Subject: [PATCH] center restart with new shm; set center node ssn. --- box/center_main.cc | 75 +----------------- src/robust.h | 4 utest/api_test.cpp | 5 box/center_topic_node.cpp | 2 src/defs.h | 5 - utest/tcp_test.cpp | 4 src/topic_node.h | 2 src/topic_node.cpp | 4 src/defs.cpp | 96 ++++++++++++++++++++++- 9 files changed, 107 insertions(+), 90 deletions(-) diff --git a/box/center_main.cc b/box/center_main.cc index e7715f8..c7d67e3 100644 --- a/box/center_main.cc +++ b/box/center_main.cc @@ -27,63 +27,6 @@ using namespace std::chrono_literals; using namespace bhome_shm; -namespace -{ -const std::string kCenterRunningFlag = "bh_center_single_flag_0"; - -class InstanceFlag -{ - -public: - InstanceFlag(SharedMemory &shm, const std::string &name) : - shm_(shm), name_(name), run_(false) {} - ~InstanceFlag() { Stop(); } - - bool TryStartAsFirstInstance() - { - if (run_) { - return true; - } - - auto mtx(shm_.FindOrCreate<Mutex>(name_ + "_mutex_0")); - auto time_stamp(shm_.FindOrCreate<int64_t>(name_ + "_timestamp_0", 0)); - - if (mtx && time_stamp) { - Guard lock(*mtx); - auto now = NowSec(); - LOG_DEBUG() << "old: " << *time_stamp << ", now: " << now; - if (now > *time_stamp + 10) { - *time_stamp = now; - auto UpdateTime = [this, time_stamp]() { - while (run_) { - std::this_thread::sleep_for(1s); - *time_stamp = NowSec(); - } - }; - run_.store(true); - std::thread(UpdateTime).swap(worker_); - return true; - } - } - return false; - } - -private: - void Stop() - { - run_.store(false); - if (worker_.joinable()) { - worker_.join(); - } - } - - std::thread worker_; - SharedMemory &shm_; - std::string name_; - std::atomic<bool> run_; -}; - -} // namespace int center_main(int argc, const char *argv[]) { AppArg args(argc, argv); @@ -101,22 +44,14 @@ if (strcasecmp(lvl.c_str(), "error") == 0) { ns_log::ResetLogLevel(ns_log::LogLevel::error); } if (strcasecmp(lvl.c_str(), "fatal") == 0) { ns_log::ResetLogLevel(ns_log::LogLevel::fatal); } - auto &shm = BHomeShm(); - if (!CenterInit(shm)) { - auto msg = "init memory error."; + if (!CenterInit()) { + auto msg = "init memory failed, or center is already running."; LOG_FATAL() << msg; printf("%s\n", msg); exit(0); } + auto &shm = BHomeShm(); GlobalInit(shm); - - InstanceFlag inst(shm, kCenterRunningFlag); - if (!inst.TryStartAsFirstInstance()) { - auto msg = "another instance is running, exit."; - LOG_INFO() << msg; - printf("%s\n", msg); - return 0; - } if (args.Has("daemon") || args.Has("d")) { int r = daemon(0, 0); // TODO center control msg to close itself. @@ -125,9 +60,9 @@ BHCenter center(shm); center.Start(); - auto msg = "center started ..."; + auto msg = "center (" + shm.name() + ") started ..."; LOG_INFO() << msg; - printf("%s\n", msg); + printf("%s\n", msg.c_str()); WaitForSignals({SIGINT, SIGTERM}); center.Stop(); LOG_INFO() << "center stopped."; diff --git a/box/center_topic_node.cpp b/box/center_topic_node.cpp index 749f4e6..8228992 100644 --- a/box/center_topic_node.cpp +++ b/box/center_topic_node.cpp @@ -56,7 +56,7 @@ } // namespace CenterTopicNode::CenterTopicNode(CenterPtr center, SharedMemory &shm) : - pscenter_(center), pnode_(new TopicNode(shm)), run_(false) {} + pscenter_(center), pnode_(new TopicNode(shm, 200)), run_(false) {} CenterTopicNode::~CenterTopicNode() { Stop(); } diff --git a/src/defs.cpp b/src/defs.cpp index 57df1bc..694e2c5 100644 --- a/src/defs.cpp +++ b/src/defs.cpp @@ -22,6 +22,7 @@ #include <boost/uuid/random_generator.hpp> #include <boost/uuid/string_generator.hpp> #include <boost/uuid/uuid.hpp> +#include <sys/file.h> namespace { @@ -76,8 +77,16 @@ static_assert(AllocSizeIndex[255] > uint32_t(-1), "Make sure alloc size correct."); const int64_t kCenterInfoFixedAddress = 1024 * 4; +const int64_t kShmMetaInfoFixedAddress = 1024 * 16; -const boost::uuids::uuid kCenterInfoTag = boost::uuids::string_generator()("fc5007bd-0e62-4d91-95dc-948cf1f02e5a"); +const boost::uuids::uuid kMetaInfoTag = boost::uuids::string_generator()("fc5007bd-0e62-4d91-95dc-948cf1f02e5a"); + +struct BHomeMetaInfo { + boost::uuids::uuid tag_; + std::atomic<uint64_t> shm_id_; + std::atomic<uint64_t> ssn_id_; +}; + struct CenterMetaInfo { boost::uuids::uuid tag_; CenterInfo info_; @@ -88,16 +97,43 @@ template <class T = void> T *Ptr(const int64_t offset) { return reinterpret_cast<T *>(offset); } +class FileLock +{ +public: + FileLock(const std::string &path) : + fd_(Open(path)) + { + if (fd_ == -1) { throw std::runtime_error("error open file:" + path); } + } + ~FileLock() { Close(fd_); } + bool try_lock() { return fd_ != -1 && (flock(fd_, LOCK_EX | LOCK_NB) == 0); } + void unlock() { flock(fd_, LOCK_UN); } + +private: + static int Open(const std::string &path) { return open(path.c_str(), O_RDONLY, 0666); } + static int Close(int fd) { return close(fd); } + int fd_; + std::mutex mtx_; +}; + +SharedMemory &BHomeMetaShm() +{ + static std::string name("bhshmq_meta_v0"); + static SharedMemory shm(name, 1024 * 128); + return shm; +} + } // namespace CenterInfo *GetCenterInfo(SharedMemory &shm) { auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address())); - if (pmeta->tag_ == kCenterInfoTag) { + if (pmeta->tag_ == kMetaInfoTag) { return &pmeta->info_; } return nullptr; } + ShmSocket &DefaultSender(SharedMemory &shm) { typedef std::pair<void *, std::shared_ptr<ShmSocket>> Pair; @@ -121,11 +157,55 @@ local_cache = store.back(); return *local_cache.second; } + +BHomeMetaInfo *GetBHomeMeta() +{ + auto p = Ptr<BHomeMetaInfo>(kShmMetaInfoFixedAddress + Addr(BHomeMetaShm().get_address())); + return (p->tag_ == kMetaInfoTag) ? p : nullptr; +} + +bool ShmMetaInit() +{ + SharedMemory &shm = BHomeMetaShm(); + + static FileLock fl("/dev/shm/" + shm.name()); + if (!fl.try_lock()) { // single center instance only. + return false; + } + + auto pmeta = GetBHomeMeta(); + if (pmeta && pmeta->tag_ == kMetaInfoTag) { + ++pmeta->shm_id_; // inc shm id + return true; // already exist. + } else { + Mutex *mutex = shm.FindOrCreate<Mutex>("bhshmq_meta_lock"); + if (!mutex || !mutex->try_lock()) { + return false; + } + DEFER1(mutex->unlock()); + + auto base = Addr(shm.get_address()); + auto offset = kShmMetaInfoFixedAddress; + void *p = shm.Alloc(offset * 2); + if (Addr(p) - base <= offset) { + pmeta = new (Ptr(offset + base)) BHomeMetaInfo; + pmeta->tag_ = kMetaInfoTag; + pmeta->shm_id_ = 100; + pmeta->ssn_id_ = 10000; + return true; + } + } + return false; +} + // put center info at fixed memory position. // as boost shm find object (find socket/mq by id, etc...) also locks inside, // which node might crash inside and cause deadlock. -bool CenterInit(SharedMemory &shm) +bool CenterInit() { + if (!ShmMetaInit()) { return false; } + + SharedMemory &shm = BHomeShm(); Mutex *mutex = shm.FindOrCreate<Mutex>("shm_center_lock"); if (!mutex || !mutex->try_lock()) { return false; @@ -133,7 +213,7 @@ DEFER1(mutex->unlock()); auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address())); - if (pmeta->tag_ == kCenterInfoTag) { + if (pmeta->tag_ == kMetaInfoTag) { return true; } else { auto base = Addr(shm.get_address()); @@ -155,7 +235,7 @@ InitMQ(info.mq_center_, NextId()); InitMQ(info.mq_bus_, NextId()); - pmeta->tag_ = kCenterInfoTag; + pmeta->tag_ = kMetaInfoTag; return true; } } @@ -183,8 +263,10 @@ std::string BHomeShmName() { - return "bhome_default_shm_v0"; + auto bhome_meta = Ptr<BHomeMetaInfo>(kShmMetaInfoFixedAddress + Addr(BHomeMetaShm().get_address())); + return "bhome_shmq_id_" + std::to_string(bhome_meta->shm_id_.load()); } + SharedMemory &BHomeShm() { static SharedMemory shm(BHomeShmName(), 1024 * 1024 * 512); @@ -193,7 +275,7 @@ bool GlobalInit(SharedMemory &shm) { return GetCenterInfo(shm); } -MQId NewSession() { return 10 * (++GetCenterInfo(BHomeShm())->mqid_); } +MQId NewSession() { return 10 * (++GetBHomeMeta()->ssn_id_); } void SetLastError(const int ec, const std::string &msg) { diff --git a/src/defs.h b/src/defs.h index 5f6fc16..56de8fa 100644 --- a/src/defs.h +++ b/src/defs.h @@ -39,9 +39,6 @@ MQInfo mq_bus_; MQInfo mq_sender_; robust::AtomicReqRep init_rr_; - std::atomic<MQId> mqid_; - CenterInfo() : - mqid_(100000) {} }; const int kBHCenterPort = 24287; @@ -59,7 +56,7 @@ ShmSocket &DefaultSender(SharedMemory &shm); MQId NewSession(); -bool CenterInit(SharedMemory &shm); +bool CenterInit(); bool GlobalInit(SharedMemory &shm); typedef std::string Topic; void SetLastError(const int ec, const std::string &msg); diff --git a/src/robust.h b/src/robust.h index c46010a..1a3f430 100644 --- a/src/robust.h +++ b/src/robust.h @@ -67,6 +67,8 @@ typedef std::function<Data(const Data)> Handler; bool ClientRequest(const Data request, Data &reply); bool ServerProcess(Handler onReq); + AtomicReqRep() : + data_(0), timestamp_(now()) {} private: enum State { @@ -79,7 +81,7 @@ static Data Decode(Data d) { return d >> 3; } typedef std::chrono::steady_clock steady_clock; typedef steady_clock::duration Duration; - Duration now() { return steady_clock::now().time_since_epoch(); } + static Duration now() { return steady_clock::now().time_since_epoch(); } bool DataCas(Data expected, Data val) { return data_.compare_exchange_strong(expected, val); } std::atomic<Data> data_; diff --git a/src/topic_node.cpp b/src/topic_node.cpp index b21f7ef..f592bff 100644 --- a/src/topic_node.cpp +++ b/src/topic_node.cpp @@ -50,8 +50,8 @@ } // namespace -TopicNode::TopicNode(SharedMemory &shm) : - shm_(shm), state_(eStateUninited) +TopicNode::TopicNode(SharedMemory &shm, MQId ssn_id) : + shm_(shm), state_(eStateUninited), ssn_id_(ssn_id) { } diff --git a/src/topic_node.h b/src/topic_node.h index 3d6767b..8c08fcf 100644 --- a/src/topic_node.h +++ b/src/topic_node.h @@ -39,7 +39,7 @@ const MQInfo &BusAddr() const { return BHTopicBusAddress(shm()); } public: - TopicNode(SharedMemory &shm); + TopicNode(SharedMemory &shm, MQId ssn_id = 0); ~TopicNode(); // topic node diff --git a/utest/api_test.cpp b/utest/api_test.cpp index bddcbf7..fc1ad08 100644 --- a/utest/api_test.cpp +++ b/utest/api_test.cpp @@ -235,8 +235,9 @@ printf("query with ip set\n"); host.set_ip("127.0.0.1"); host.set_port(kBHCenterPort); - host.set_mq_id(1000011); - host.set_abs_addr(10296); + // center topic node address. + host.set_mq_id(201); + host.set_abs_addr(10072); std::string dest(host.SerializeAsString()); void *proc_id = 0; diff --git a/utest/tcp_test.cpp b/utest/tcp_test.cpp index 86a0897..2aead7e 100644 --- a/utest/tcp_test.cpp +++ b/utest/tcp_test.cpp @@ -52,8 +52,8 @@ head.mutable_dest()->set_ip(connect_addr); head.mutable_dest()->set_port(port); - head.mutable_dest()->set_mq_id(1000011); - head.mutable_dest()->set_abs_addr(10296); + head.mutable_dest()->set_mq_id(201); + head.mutable_dest()->set_abs_addr(10072); return (MsgI::Serialize(head, req)); }; -- Gitblit v1.8.0