The center stores the shm on each node; it is no longer bound to a single shm.
| | |
| | | |
| | | BHCenter::BHCenter(Socket::Shm &shm) |
| | | { |
| | | auto gc = [&](const MQId id) { |
| | | auto r = ShmSocket::Remove(shm, id); |
| | | if (r) { |
| | | LOG_DEBUG() << "remove mq " << id << " ok\n"; |
| | | } |
| | | }; |
| | | |
| | | auto nsec = NodeTimeoutSec(); |
| | | auto center_ptr = std::make_shared<Synced<NodeCenter>>("#bhome_center", gc, nsec, nsec * 3); // *3 to allow other clients to finish sending msgs. |
| | | auto center_ptr = std::make_shared<Synced<NodeCenter>>("#bhome_center", nsec, nsec * 3); // *3 to allow other clients to finish sending msgs. |
| | | AddCenter(center_ptr); |
| | | |
| | | for (auto &kv : Centers()) { |
| | |
| | | |
| | | Json json; |
| | | json.put("proc_id", proc_.proc_id()); |
| | | center_.Publish(kTopicNodeOffline, json.dump()); |
| | | center_.Publish(shm_, kTopicNodeOffline, json.dump()); |
| | | } |
| | | |
| | | void NodeCenter::NodeInfo::UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time) |
| | |
| | | if (proc_.proc_id().empty()) { return; } // node init, ignore. |
| | | Json json; |
| | | json.put("proc_id", proc_.proc_id()); |
| | | center_.Publish(topic, json.dump()); |
| | | center_.Publish(shm_, topic, json.dump()); |
| | | }; |
| | | |
| | | LOG_TRACE() << "node " << proc_.proc_id() << " timeout count: " << diff; |
| | |
| | | SendAllocMsg(socket, {ssn, node->addrs_[ssn]}, init_msg); |
| | | }; |
| | | |
| | | Node node(new NodeInfo(*this)); |
| | | Node node(new NodeInfo(*this, shm)); |
| | | if (UpdateRegInfo(node) && PrepareProcInit(node)) { |
| | | reply |= (node->addrs_[ssn] << 4); |
| | | nodes_[ssn] = node; |
| | |
| | | auto &node = pos->second; |
| | | try { |
| | | for (int i = 0; i < msg.extra_mq_num(); ++i) { |
| | | ShmSocket tmp(BHomeShm(), true, head.ssn_id() + i + 1, 16); |
| | | ShmSocket tmp(node->shm_, true, head.ssn_id() + i + 1, 16); |
| | | node->addrs_.emplace(tmp.id(), tmp.AbsAddr()); |
| | | auto addr = reply.add_extra_mqs(); |
| | | addr->set_mq_id(tmp.id()); |
| | |
| | | } |
| | | |
| | | for (auto &addr : node->addrs_) { |
| | | cleaner_(addr.first); |
| | | auto &id = addr.first; |
| | | auto r = ShmSocket::Remove(node->shm_, id); |
| | | LOG_DEBUG() << "remove mq " << id << (r ? " ok" : " failed"); |
| | | } |
| | | |
| | | node->addrs_.clear(); |
| | | } |
| | | |
| | | void NodeCenter::Publish(const Topic &topic, const std::string &content) |
| | | void NodeCenter::Publish(SharedMemory &shm, const Topic &topic, const std::string &content) |
| | | { |
| | | try { |
| | | // LOG_DEBUG() << "center publish: " << topic << ": " << content; |
| | |
| | | DEFER1(msg.Release()); |
| | | RecordMsg(msg); |
| | | |
| | | auto &mq = GetCenterInfo(BHomeShm())->mq_sender_; |
| | | ShmSocket sender(mq.offset_, BHomeShm(), mq.id_); |
| | | auto &mq = GetCenterInfo(shm)->mq_sender_; |
| | | ShmSocket sender(mq.offset_, shm, mq.id_); |
| | | |
| | | for (auto &cli : clients) { |
| | | auto node = cli.weak_node_.lock(); |
| | |
| | | |
| | | struct NodeInfo { |
| | | NodeCenter &center_; |
| | | SharedMemory &shm_; |
| | | ProcState state_; // state |
| | | std::map<MQId, int64_t> addrs_; // registered mqs |
| | | ProcInfo proc_; // |
| | | AddressTopics services_; // address: topics |
| | | AddressTopics subscriptions_; // address: topics |
| | | NodeInfo(NodeCenter &center) : |
| | | center_(center) {} |
| | | NodeInfo(NodeCenter &center, SharedMemory &shm) : |
| | | center_(center), shm_(shm) {} |
| | | void PutOffline(const int64_t offline_time); |
| | | void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time); |
| | | }; |
| | |
| | | public: |
| | | typedef std::set<TopicDest> Clients; |
| | | |
| | | NodeCenter(const std::string &id, const Cleaner &cleaner, const int64_t offline_time_sec, const int64_t kill_time_sec) : |
| | | id_(id), cleaner_(cleaner), offline_time_(offline_time_sec), kill_time_(kill_time_sec), last_check_time_(0) {} |
| | | NodeCenter(const std::string &id, const int64_t offline_time_sec, const int64_t kill_time_sec) : |
| | | id_(id), offline_time_(offline_time_sec), kill_time_(kill_time_sec), last_check_time_(0) {} |
| | | |
| | | // center name, not related to shm. |
| | | const std::string &id() const { return id_; } |
| | |
| | | private: |
| | | void CheckNodes(); |
| | | bool CanHeartbeat(const NodeInfo &node) { return Valid(node) || node.state_.flag_ == kStateOffline; } |
| | | void Publish(const Topic &topic, const std::string &content); |
| | | void Publish(SharedMemory &shm, const Topic &topic, const std::string &content); |
| | | bool Valid(const NodeInfo &node) { return node.state_.flag_ == kStateNormal; } |
| | | bool Valid(const WeakNode &weak) |
| | | { |
| | |
| | | ProcRecords procs_; // To get a short index for msg alloc. |
| | | MsgRecords msgs_; // record all msgs alloced. |
| | | |
| | | Cleaner cleaner_; // remove mqs. |
| | | int64_t offline_time_; |
| | | int64_t kill_time_; |
| | | int64_t last_check_time_; |
| | |
| | | threads.Launch(hb, &run); |
| | | threads.Launch(showStatus, &run); |
| | | int ncli = 10; |
| | | const int64_t nreq = 10; //00 * 100; |
| | | const int64_t nreq = 1000 * 100; |
| | | |
| | | for (int i = 0; i < 10; ++i) { |
| | | SyncRequest(i); |
| | |
| | | } |
| | | } |
| | | |
| | | Sleep(1s); |
| | | |
| | | run = false; |
| | | threads.WaitAll(); |
| | | auto &st = Status(); |
| | | Sleep(1s); |
| | | printf("nreq: %8ld, nsrv: %8ld, nreply: %8ld\n", st.nrequest_.load(), st.nserved_.load(), st.nreply_.load()); |
| | | BHCleanup(); |
| | | printf("after cleanup\n"); |