box/center_main.cc | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
box/center_topic_node.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/defs.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/defs.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/robust.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/topic_node.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/topic_node.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
utest/api_test.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
utest/tcp_test.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
box/center_main.cc
@@ -27,63 +27,6 @@ using namespace std::chrono_literals; using namespace bhome_shm; namespace { const std::string kCenterRunningFlag = "bh_center_single_flag_0"; class InstanceFlag { public: InstanceFlag(SharedMemory &shm, const std::string &name) : shm_(shm), name_(name), run_(false) {} ~InstanceFlag() { Stop(); } bool TryStartAsFirstInstance() { if (run_) { return true; } auto mtx(shm_.FindOrCreate<Mutex>(name_ + "_mutex_0")); auto time_stamp(shm_.FindOrCreate<int64_t>(name_ + "_timestamp_0", 0)); if (mtx && time_stamp) { Guard lock(*mtx); auto now = NowSec(); LOG_DEBUG() << "old: " << *time_stamp << ", now: " << now; if (now > *time_stamp + 10) { *time_stamp = now; auto UpdateTime = [this, time_stamp]() { while (run_) { std::this_thread::sleep_for(1s); *time_stamp = NowSec(); } }; run_.store(true); std::thread(UpdateTime).swap(worker_); return true; } } return false; } private: void Stop() { run_.store(false); if (worker_.joinable()) { worker_.join(); } } std::thread worker_; SharedMemory &shm_; std::string name_; std::atomic<bool> run_; }; } // namespace int center_main(int argc, const char *argv[]) { AppArg args(argc, argv); @@ -101,22 +44,14 @@ if (strcasecmp(lvl.c_str(), "error") == 0) { ns_log::ResetLogLevel(ns_log::LogLevel::error); } if (strcasecmp(lvl.c_str(), "fatal") == 0) { ns_log::ResetLogLevel(ns_log::LogLevel::fatal); } auto &shm = BHomeShm(); if (!CenterInit(shm)) { auto msg = "init memory error."; if (!CenterInit()) { auto msg = "init memory failed, or center is already running."; LOG_FATAL() << msg; printf("%s\n", msg); exit(0); } auto &shm = BHomeShm(); GlobalInit(shm); InstanceFlag inst(shm, kCenterRunningFlag); if (!inst.TryStartAsFirstInstance()) { auto msg = "another instance is running, exit."; LOG_INFO() << msg; printf("%s\n", msg); return 0; } if (args.Has("daemon") || args.Has("d")) { int r = daemon(0, 0); // TODO center control msg to close itself. @@ -125,9 +60,9 @@ BHCenter center(shm); center.Start(); auto msg = "center started ..."; auto msg = "center (" + shm.name() + ") started ..."; LOG_INFO() << msg; printf("%s\n", msg); printf("%s\n", msg.c_str()); WaitForSignals({SIGINT, SIGTERM}); center.Stop(); LOG_INFO() << "center stopped."; box/center_topic_node.cpp
@@ -56,7 +56,7 @@ } // namespace CenterTopicNode::CenterTopicNode(CenterPtr center, SharedMemory &shm) : pscenter_(center), pnode_(new TopicNode(shm)), run_(false) {} pscenter_(center), pnode_(new TopicNode(shm, 200)), run_(false) {} CenterTopicNode::~CenterTopicNode() { Stop(); } src/defs.cpp
@@ -22,6 +22,7 @@ #include <boost/uuid/random_generator.hpp> #include <boost/uuid/string_generator.hpp> #include <boost/uuid/uuid.hpp> #include <sys/file.h> namespace { @@ -76,8 +77,16 @@ static_assert(AllocSizeIndex[255] > uint32_t(-1), "Make sure alloc size correct."); const int64_t kCenterInfoFixedAddress = 1024 * 4; const int64_t kShmMetaInfoFixedAddress = 1024 * 16; const boost::uuids::uuid kCenterInfoTag = boost::uuids::string_generator()("fc5007bd-0e62-4d91-95dc-948cf1f02e5a"); const boost::uuids::uuid kMetaInfoTag = boost::uuids::string_generator()("fc5007bd-0e62-4d91-95dc-948cf1f02e5a"); struct BHomeMetaInfo { boost::uuids::uuid tag_; std::atomic<uint64_t> shm_id_; std::atomic<uint64_t> ssn_id_; }; struct CenterMetaInfo { boost::uuids::uuid tag_; CenterInfo info_; @@ -88,16 +97,43 @@ template <class T = void> T *Ptr(const int64_t offset) { return reinterpret_cast<T *>(offset); } class FileLock { public: FileLock(const std::string &path) : fd_(Open(path)) { if (fd_ == -1) { throw std::runtime_error("error open file:" + path); } } ~FileLock() { Close(fd_); } bool try_lock() { return fd_ != -1 && (flock(fd_, LOCK_EX | LOCK_NB) == 0); } void unlock() { flock(fd_, LOCK_UN); } private: static int Open(const std::string &path) { return open(path.c_str(), O_RDONLY, 0666); } static int Close(int fd) { return close(fd); } int fd_; std::mutex mtx_; }; SharedMemory &BHomeMetaShm() { static std::string name("bhshmq_meta_v0"); static SharedMemory shm(name, 1024 * 128); return shm; } } // namespace CenterInfo *GetCenterInfo(SharedMemory &shm) { auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address())); if (pmeta->tag_ == kCenterInfoTag) { if (pmeta->tag_ == kMetaInfoTag) { return &pmeta->info_; } return nullptr; } ShmSocket &DefaultSender(SharedMemory &shm) { typedef std::pair<void *, std::shared_ptr<ShmSocket>> Pair; @@ -121,11 +157,55 @@ local_cache = store.back(); return *local_cache.second; } BHomeMetaInfo *GetBHomeMeta() { auto p = Ptr<BHomeMetaInfo>(kShmMetaInfoFixedAddress + Addr(BHomeMetaShm().get_address())); return (p->tag_ == kMetaInfoTag) ? p : nullptr; } bool ShmMetaInit() { SharedMemory &shm = BHomeMetaShm(); static FileLock fl("/dev/shm/" + shm.name()); if (!fl.try_lock()) { // single center instance only. return false; } auto pmeta = GetBHomeMeta(); if (pmeta && pmeta->tag_ == kMetaInfoTag) { ++pmeta->shm_id_; // inc shm id return true; // already exist. } else { Mutex *mutex = shm.FindOrCreate<Mutex>("bhshmq_meta_lock"); if (!mutex || !mutex->try_lock()) { return false; } DEFER1(mutex->unlock()); auto base = Addr(shm.get_address()); auto offset = kShmMetaInfoFixedAddress; void *p = shm.Alloc(offset * 2); if (Addr(p) - base <= offset) { pmeta = new (Ptr(offset + base)) BHomeMetaInfo; pmeta->tag_ = kMetaInfoTag; pmeta->shm_id_ = 100; pmeta->ssn_id_ = 10000; return true; } } return false; } // put center info at fixed memory position. // as boost shm find object (find socket/mq by id, etc...) also locks inside, // which node might crash inside and cause deadlock. bool CenterInit(SharedMemory &shm) bool CenterInit() { if (!ShmMetaInit()) { return false; } SharedMemory &shm = BHomeShm(); Mutex *mutex = shm.FindOrCreate<Mutex>("shm_center_lock"); if (!mutex || !mutex->try_lock()) { return false; @@ -133,7 +213,7 @@ DEFER1(mutex->unlock()); auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address())); if (pmeta->tag_ == kCenterInfoTag) { if (pmeta->tag_ == kMetaInfoTag) { return true; } else { auto base = Addr(shm.get_address()); @@ -155,7 +235,7 @@ InitMQ(info.mq_center_, NextId()); InitMQ(info.mq_bus_, NextId()); pmeta->tag_ = kCenterInfoTag; pmeta->tag_ = kMetaInfoTag; return true; } } @@ -183,8 +263,10 @@ std::string BHomeShmName() { return "bhome_default_shm_v0"; auto bhome_meta = Ptr<BHomeMetaInfo>(kShmMetaInfoFixedAddress + Addr(BHomeMetaShm().get_address())); return "bhome_shmq_id_" + std::to_string(bhome_meta->shm_id_.load()); } SharedMemory &BHomeShm() { static SharedMemory shm(BHomeShmName(), 1024 * 1024 * 512); @@ -193,7 +275,7 @@ bool GlobalInit(SharedMemory &shm) { return GetCenterInfo(shm); } MQId NewSession() { return 10 * (++GetCenterInfo(BHomeShm())->mqid_); } MQId NewSession() { return 10 * (++GetBHomeMeta()->ssn_id_); } void SetLastError(const int ec, const std::string &msg) { src/defs.h
@@ -39,9 +39,6 @@ MQInfo mq_bus_; MQInfo mq_sender_; robust::AtomicReqRep init_rr_; std::atomic<MQId> mqid_; CenterInfo() : mqid_(100000) {} }; const int kBHCenterPort = 24287; @@ -59,7 +56,7 @@ ShmSocket &DefaultSender(SharedMemory &shm); MQId NewSession(); bool CenterInit(SharedMemory &shm); bool CenterInit(); bool GlobalInit(SharedMemory &shm); typedef std::string Topic; void SetLastError(const int ec, const std::string &msg); src/robust.h
@@ -67,6 +67,8 @@ typedef std::function<Data(const Data)> Handler; bool ClientRequest(const Data request, Data &reply); bool ServerProcess(Handler onReq); AtomicReqRep() : data_(0), timestamp_(now()) {} private: enum State { @@ -79,7 +81,7 @@ static Data Decode(Data d) { return d >> 3; } typedef std::chrono::steady_clock steady_clock; typedef steady_clock::duration Duration; Duration now() { return steady_clock::now().time_since_epoch(); } static Duration now() { return steady_clock::now().time_since_epoch(); } bool DataCas(Data expected, Data val) { return data_.compare_exchange_strong(expected, val); } std::atomic<Data> data_; src/topic_node.cpp
@@ -50,8 +50,8 @@ } // namespace TopicNode::TopicNode(SharedMemory &shm) : shm_(shm), state_(eStateUninited) TopicNode::TopicNode(SharedMemory &shm, MQId ssn_id) : shm_(shm), state_(eStateUninited), ssn_id_(ssn_id) { } src/topic_node.h
@@ -39,7 +39,7 @@ const MQInfo &BusAddr() const { return BHTopicBusAddress(shm()); } public: TopicNode(SharedMemory &shm); TopicNode(SharedMemory &shm, MQId ssn_id = 0); ~TopicNode(); // topic node utest/api_test.cpp
@@ -235,8 +235,9 @@ printf("query with ip set\n"); host.set_ip("127.0.0.1"); host.set_port(kBHCenterPort); host.set_mq_id(1000011); host.set_abs_addr(10296); // center topic node address. host.set_mq_id(201); host.set_abs_addr(10072); std::string dest(host.SerializeAsString()); void *proc_id = 0; utest/tcp_test.cpp
@@ -52,8 +52,8 @@ head.mutable_dest()->set_ip(connect_addr); head.mutable_dest()->set_port(port); head.mutable_dest()->set_mq_id(1000011); head.mutable_dest()->set_abs_addr(10296); head.mutable_dest()->set_mq_id(201); head.mutable_dest()->set_abs_addr(10072); return (MsgI::Serialize(head, req)); };