| | |
| | | |
| | | Json json; |
| | | json.put("proc_id", proc_.proc_id()); |
| | | center_.Publish(kTopicNodeOffline, json.dump()); |
| | | center_.Publish(shm_, kTopicNodeOffline, json.dump()); |
| | | } |
| | | |
| | | void NodeCenter::NodeInfo::UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time) |
| | |
| | | if (proc_.proc_id().empty()) { return; } // node init, ignore. |
| | | Json json; |
| | | json.put("proc_id", proc_.proc_id()); |
| | | center_.Publish(topic, json.dump()); |
| | | center_.Publish(shm_, topic, json.dump()); |
| | | }; |
| | | |
| | | LOG_TRACE() << "node " << proc_.proc_id() << " timeout count: " << diff; |
| | |
| | | SendAllocMsg(socket, {ssn, node->addrs_[ssn]}, init_msg); |
| | | }; |
| | | |
| | | Node node(new NodeInfo(*this)); |
| | | Node node(new NodeInfo(*this, shm)); |
| | | if (UpdateRegInfo(node) && PrepareProcInit(node)) { |
| | | reply |= (node->addrs_[ssn] << 4); |
| | | nodes_[ssn] = node; |
| | |
| | | auto &node = pos->second; |
| | | try { |
| | | for (int i = 0; i < msg.extra_mq_num(); ++i) { |
| | | ShmSocket tmp(BHomeShm(), true, head.ssn_id() + i + 1, 16); |
| | | ShmSocket tmp(node->shm_, true, head.ssn_id() + i + 1, 16); |
| | | node->addrs_.emplace(tmp.id(), tmp.AbsAddr()); |
| | | auto addr = reply.add_extra_mqs(); |
| | | addr->set_mq_id(tmp.id()); |
| | |
| | | } |
| | | |
| | | for (auto &addr : node->addrs_) { |
| | | cleaner_(addr.first); |
| | | auto &id = addr.first; |
| | | auto r = ShmSocket::Remove(node->shm_, id); |
| | | LOG_DEBUG() << "remove mq " << id << (r ? " ok" : " failed"); |
| | | } |
| | | |
| | | node->addrs_.clear(); |
| | | } |
| | | |
| | | void NodeCenter::Publish(const Topic &topic, const std::string &content) |
| | | void NodeCenter::Publish(SharedMemory &shm, const Topic &topic, const std::string &content) |
| | | { |
| | | try { |
| | | // LOG_DEBUG() << "center publish: " << topic << ": " << content; |
| | |
| | | DEFER1(msg.Release()); |
| | | RecordMsg(msg); |
| | | |
| | | auto &mq = GetCenterInfo(BHomeShm())->mq_sender_; |
| | | ShmSocket sender(mq.offset_, BHomeShm(), mq.id_); |
| | | auto &mq = GetCenterInfo(shm)->mq_sender_; |
| | | ShmSocket sender(mq.offset_, shm, mq.id_); |
| | | |
| | | for (auto &cli : clients) { |
| | | auto node = cli.weak_node_.lock(); |