| | |
| | | * ===================================================================================== |
| | | */ |
| | | #include "node_center.h" |
| | | #include "json.h" |
| | | #include "log.h" |
| | | |
| | | using ssjson::Json; |
| | | |
| | | namespace |
| | | { |
| | | // Builds a hierarchical topic name by concatenating parent, kTopicSep and child. |
| | | std::string Join(const std::string &parent, const std::string &child) |
| | | { |
| | |     std::string joined(parent); |
| | |     joined += kTopicSep; |
| | |     joined += child; |
| | |     return joined; |
| | | } |
| | | // Topics published by the center itself. Each constant is derived from the one |
| | | // declared before it, so the declaration order below must be kept. |
| | | const std::string kTopicCenterRoot = "#center"; |
| | | const std::string kTopicNode = Join(kTopicCenterRoot, "node"); |
| | | const std::string kTopicNodeOnline = Join(kTopicNode, "online"); |
| | | const std::string kTopicNodeOffline = Join(kTopicNode, "offline"); |
| | | } // namespace |
| | | |
| | | ProcIndex ProcRecords::Put(const ProcId &proc_id, const MQId ssn) |
| | | { |
| | |
| | | { |
| | | auto pos = msgs_.find(id); |
| | | if (pos != msgs_.end()) { |
| | | ShmMsg(pos->second).Free(); |
| | | pos->second.Free(); |
| | | msgs_.erase(pos); |
| | | } else { |
| | | LOG_TRACE() << "ignore late free request."; |
| | |
| | | return; |
| | | } |
| | | // LOG_FUNCTION; |
| | | const size_t total = msgs_.size(); |
| | | time_to_clean_ = now + 1; |
| | | int64_t limit = std::max(10000ul, msgs_.size() / 10); |
| | | int64_t limit = std::max(10000ul, total / 10); |
| | | int64_t n = 0; |
| | | auto it = msgs_.begin(); |
| | | while (it != msgs_.end() && --limit > 0) { |
| | |
| | | ++n; |
| | | }; |
| | | int n = now - msg.timestamp(); |
| | | if (n < 10) { |
| | | if (msg.Count() == 0) { |
| | | Free(); |
| | | } else if (n > NodeTimeoutSec()) { |
| | | Free(); |
| | | } else { |
| | | ++it; |
| | | } else if (msg.Count() == 0) { |
| | | Free(); |
| | | } else if (n > 60) { |
| | | Free(); |
| | | } |
| | | } |
| | | if (n > 0) { |
| | | LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n; |
| | | LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n << '/' << total; |
| | | } |
| | | } |
| | | |
| | | // Dumps the recorded messages at trace level: the number of records, one line |
| | | // per record (id, offset, reference count, size) and the sum of all counts. |
| | | void MsgRecords::DebugPrint() const |
| | | { |
| | |     LOG_TRACE() << "msgs : " << size(); |
| | |     int i = 0; |
| | |     int total_count = 0; |
| | |     for (auto &kv : msgs_) { |
| | |         // kv.first is the message id, kv.second the recorded message itself. |
| | |         auto &msg = kv.second; |
| | |         total_count += msg.Count(); |
| | |         LOG_TRACE() << " " << i++ << ": msg id: " << kv.first << ", offset: " << kv.second.Offset() << ", count: " << msg.Count() << ", size: " << msg.Size(); |
| | |     } |
| | |     LOG_TRACE() << "total count: " << total_count; |
| | | } |
| | | |
| | | // NodeCenter::NodeInfo |
| | | // Forces this node into the offline state and publishes an offline notice. |
| | | // offline_time: the offline threshold in seconds; the stored timestamp is moved |
| | | // back by this amount, so the age computed in UpdateState (now - timestamp) |
| | | // is already at least offline_time. |
| | | void NodeCenter::NodeInfo::PutOffline(const int64_t offline_time) |
| | | { |
| | |     state_.timestamp_ = NowSec() - offline_time; |
| | |     state_.flag_ = kStateOffline; |
| | |  |
| | |     // Notify subscribers of the offline topic; the payload carries only the proc id. |
| | |     Json json; |
| | |     json.put("proc_id", proc_.proc_id()); |
| | |     center_.Publish(shm_, kTopicNodeOffline, json.dump()); |
| | | } |
| | | |
| | | // Re-evaluates the node state from the age of its last timestamp. |
| | | //   age < offline_time            -> kStateNormal |
| | | //   offline_time <= age < kill_time -> kStateOffline |
| | | //   otherwise                     -> kStateKillme |
| | | // now: current time in seconds; offline_time / kill_time: thresholds in seconds. |
| | | // An online/offline notice is published only when the flag actually changes; |
| | | // the transition to kStateKillme publishes nothing. |
| | | void NodeCenter::NodeInfo::UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time) |
| | | { |
| | |     auto old = state_.flag_; |
| | |     auto diff = now - state_.timestamp_; |
| | |     auto publish = [this](const std::string &topic) { |
| | |         if (proc_.proc_id().empty()) { return; } // node init, ignore. |
| | |         Json json; |
| | |         json.put("proc_id", proc_.proc_id()); |
| | |         center_.Publish(shm_, topic, json.dump()); |
| | |     }; |
| | |  |
| | |     LOG_TRACE() << "node " << proc_.proc_id() << " timeout count: " << diff; |
| | |     if (diff < offline_time) { |
| | |         state_.flag_ = kStateNormal; |
| | |         if (old != state_.flag_) { |
| | |             publish(kTopicNodeOnline); |
| | |         } |
| | |     } else if (diff < kill_time) { |
| | |         state_.flag_ = kStateOffline; |
| | |         if (old != state_.flag_) { |
| | |             publish(kTopicNodeOffline); |
| | |         } |
| | |     } else { |
| | |         state_.flag_ = kStateKillme; |
| | |     } |
| | | } |
| | | |
| | |
| | | |
| | | auto UpdateRegInfo = [&](Node &node) { |
| | | node->state_.timestamp_ = NowSec() - offline_time_; |
| | | node->state_.UpdateState(NowSec(), offline_time_, kill_time_); |
| | | node->UpdateState(NowSec(), offline_time_, kill_time_); |
| | | |
| | | // create sockets. |
| | | try { |
| | | ShmSocket tmp(shm, true, ssn, 16); |
| | | ShmSocket tmp(shm, ssn, eCreate); |
| | | node->addrs_.emplace(ssn, tmp.AbsAddr()); |
| | | return true; |
| | | } catch (...) { |
| | |
| | | |
| | | auto PrepareProcInit = [&](Node &node) { |
| | | bool r = false; |
| | | ShmMsg init_msg; |
| | | ShmMsg init_msg(shm); |
| | | DEFER1(init_msg.Release()); |
| | | MsgProcInit body; |
| | | auto head = InitMsgHead(GetType(body), id(), ssn); |
| | |
| | | SendAllocMsg(socket, {ssn, node->addrs_[ssn]}, init_msg); |
| | | }; |
| | | |
| | | Node node(new NodeInfo); |
| | | Node node(new NodeInfo(*this, shm)); |
| | | if (UpdateRegInfo(node) && PrepareProcInit(node)) { |
| | | reply |= (node->addrs_[ssn] << 4); |
| | | nodes_[ssn] = node; |
| | |
| | | { |
| | | RecordMsg(msg); |
| | | return socket.Send(dest, msg); |
| | | } |
| | | |
| | | // Looks up the node that owns the given queue id. |
| | | // The id is rounded down to a multiple of 10 and that value is used as the key |
| | | // into nodes_. Returns a default-constructed (empty) Node when nothing matches. |
| | | NodeCenter::Node NodeCenter::GetNode(const MQId mq_id) |
| | | { |
| | |     const auto session_key = mq_id - (mq_id % 10); |
| | |     const auto found = nodes_.find(session_key); |
| | |     if (found == nodes_.end()) { |
| | |         return Node(); |
| | |     } |
| | |     return found->second; |
| | | } |
| | | |
| | | // Delivers a request that arrived from a remote peer to a local queue. |
| | | // dest: target queue; head: message head, modified in place (a route entry for |
| | | // the center's sender is appended); body_content: serialized body; |
| | | // cb: callback handed to Send() together with the message id. |
| | | // Returns false when the destination node is missing/invalid, when the message |
| | | // cannot be built, or when Send() fails. |
| | | bool NodeCenter::PassRemoteRequestToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content, ShmSocket::RecvCB &&cb) |
| | | { |
| | |     const Node target = GetNode(dest.id_); |
| | |     if (!(target && Valid(*target))) { |
| | |         LOG_ERROR() << id() << " pass remote request, dest not found."; |
| | |         return false; |
| | |     } |
| | |  |
| | |     // The route entry must be added before Make() so the built message carries it. |
| | |     ShmSocket &local_sender = DefaultSender(target->shm_); |
| | |     auto hop = head.add_route(); |
| | |     hop->set_mq_id(local_sender.id()); |
| | |     hop->set_abs_addr(local_sender.AbsAddr()); |
| | |  |
| | |     ShmMsg request(target->shm_); |
| | |     if (!request.Make(head, body_content)) { |
| | |         return false; |
| | |     } |
| | |     DEFER1(request.Release();); |
| | |     RecordMsg(request); |
| | |     return local_sender.Send(dest, request, head.msg_id(), std::move(cb)); |
| | | } |
| | | |
| | | // Delivers a reply that arrived from a remote peer to a local queue. |
| | | // dest: target queue (id and absolute address); head / body_content: the reply. |
| | | // Returns false when no node owns dest.id_, when the queue id is not registered |
| | | // for that node or its recorded address differs from dest.offset_, when the |
| | | // message cannot be built, or when Send() fails. |
| | | bool NodeCenter::PassRemoteReplyToLocal(const MQInfo &dest, BHMsgHead &head, const std::string &body_content) |
| | | { |
| | |     Node node(GetNode(dest.id_)); |
| | |     if (!node) { |
| | |         LOG_ERROR() << id() << " pass remote reply , ssn not found."; |
| | |         return false; |
| | |     } |
| | |     // Use find() rather than operator[]: operator[] would insert a default entry |
| | |     // for an unknown queue id, polluting addrs_ and letting an unknown id match |
| | |     // a zero offset. |
| | |     const auto addr_pos = node->addrs_.find(dest.id_); |
| | |     if (addr_pos == node->addrs_.end() || addr_pos->second != dest.offset_) { |
| | |         LOG_ERROR() << id() << " pass remote reply, dest address not match"; |
| | |         return false; |
| | |     } |
| | |  |
| | |     ShmMsg msg(node->shm_); |
| | |     if (!msg.Make(head, body_content)) { return false; } |
| | |     DEFER1(msg.Release();); |
| | |     RecordMsg(msg); |
| | |     return DefaultSender(node->shm_).Send(dest, msg); |
| | | } |
| | | |
| | | void NodeCenter::OnAlloc(ShmSocket &socket, const int64_t val) |
| | |
| | | if (!FindMq()) { return; } |
| | | |
| | | auto size = GetAllocSize((val >> 52) & MaskBits(8)); |
| | | MsgI new_msg; |
| | | MsgI new_msg(socket.shm()); |
| | | if (new_msg.Make(size)) { |
| | | // 31bit proc index, 28bit id, ,4bit cmd+flag |
| | | int64_t reply = (new_msg.Offset() << 32) | (msg_id << 4) | EncodeCmd(eCmdAllocReply0); |
| | |
| | | auto &node = pos->second; |
| | | try { |
| | | for (int i = 0; i < msg.extra_mq_num(); ++i) { |
| | | ShmSocket tmp(BHomeShm(), true, head.ssn_id() + i + 1, 16); |
| | | ShmSocket tmp(node->shm_, head.ssn_id() + i + 1, eCreate); |
| | | node->addrs_.emplace(tmp.id(), tmp.AbsAddr()); |
| | | auto addr = reply.add_extra_mqs(); |
| | | addr->set_mq_id(tmp.id()); |
| | |
| | | MQId ssn = head.ssn_id(); |
| | | // when node restart, ssn will change, |
| | | // and old node will be removed after timeout. |
| | | auto UpdateRegInfo = [&](Node &node) { |
| | | node->proc_.Swap(msg.mutable_proc()); |
| | | node->state_.timestamp_ = head.timestamp(); |
| | | node->state_.UpdateState(NowSec(), offline_time_, kill_time_); |
| | | }; |
| | | |
| | | auto pos = nodes_.find(ssn); |
| | | if (pos == nodes_.end()) { |
| | | return MakeReply(eInvalidInput, "invalid session."); |
| | | } |
| | | |
| | | // update proc info |
| | | Node &node = pos->second; |
| | | UpdateRegInfo(node); |
| | | LOG_DEBUG() << "node (" << head.proc_id() << ") ssn (" << ssn << ")"; |
| | | |
| | | // try to remove old session |
| | | auto old = online_node_addr_map_.find(head.proc_id()); |
| | | if (old != online_node_addr_map_.end()) { // old session |
| | | auto &old_ssn = old->second; |
| | | if (old_ssn != ssn) { |
| | | nodes_[old_ssn]->state_.PutOffline(offline_time_); |
| | | nodes_[old_ssn]->PutOffline(offline_time_); |
| | | |
| | | LOG_DEBUG() << "put node (" << nodes_[old_ssn]->proc_.proc_id() << ") ssn (" << old->second << ") offline"; |
| | | old_ssn = ssn; |
| | | } |
| | | } else { |
| | | online_node_addr_map_.emplace(head.proc_id(), ssn); |
| | | } |
| | | |
| | | // update proc info |
| | | Node &node = pos->second; |
| | | node->proc_.Swap(msg.mutable_proc()); |
| | | node->state_.timestamp_ = head.timestamp(); |
| | | node->UpdateState(NowSec(), offline_time_, kill_time_); |
| | | |
| | | LOG_DEBUG() << "node (" << head.proc_id() << ") ssn (" << ssn << ")"; |
| | | |
| | | return MakeReply(eSuccess); |
| | | } catch (...) { |
| | | return MakeReply(eError, "register node error."); |
| | |
| | | return HandleMsg( |
| | | head, [&](Node node) -> MsgCommonReply { |
| | | NodeInfo &ni = *node; |
| | | ni.state_.PutOffline(offline_time_); |
| | | ni.PutOffline(offline_time_); |
| | | return MakeReply(eSuccess); |
| | | }); |
| | | } |
| | |
| | | return HandleMsg(head, [&](Node node) { |
| | | NodeInfo &ni = *node; |
| | | ni.state_.timestamp_ = head.timestamp(); |
| | | ni.state_.UpdateState(NowSec(), offline_time_, kill_time_); |
| | | ni.UpdateState(NowSec(), offline_time_, kill_time_); |
| | | |
| | | auto &info = msg.proc(); |
| | | if (!info.public_info().empty()) { |
| | |
| | | return MakeReply(eSuccess); |
| | | }); |
| | | } |
| | |  |
| | | // Builds a process-list reply. |
| | | // proc_id: when non-empty, only that process is reported (eNotFound if it is not |
| | | // in online_node_addr_map_ or its node is missing); when empty, every node in |
| | | // nodes_ is reported. |
| | | // Each entry carries the process info with its private info cleared, an online |
| | | // flag (state is kStateNormal) and all topics found in the node's services_. |
| | | MsgQueryProcReply NodeCenter::QueryProc(const std::string &proc_id) |
| | | { |
| | |     typedef MsgQueryProcReply Reply; |
| | |     auto Add1 = [](Reply &reply, Node node) { |
| | |         auto info = reply.add_proc_list(); |
| | |         *info->mutable_proc() = node->proc_; |
| | |         info->mutable_proc()->clear_private_info(); |
| | |         info->set_online(node->state_.flag_ == kStateNormal); |
| | |         for (auto &addr_topics : node->services_) { |
| | |             for (auto &topic : addr_topics.second) { |
| | |                 info->mutable_topics()->add_topic_list(topic); |
| | |             } |
| | |         } |
| | |     }; |
| | |  |
| | |     if (!proc_id.empty()) { |
| | |         auto pos = online_node_addr_map_.find(proc_id); |
| | |         if (pos == online_node_addr_map_.end()) { |
| | |             return MakeReply<Reply>(eNotFound, "proc not found."); |
| | |         } else { |
| | |             auto node_pos = nodes_.find(pos->second); |
| | |             if (node_pos == nodes_.end()) { |
| | |                 return MakeReply<Reply>(eNotFound, "proc node not found."); |
| | |             } else { |
| | |                 auto reply = MakeReply<Reply>(eSuccess); |
| | |                 Add1(reply, node_pos->second); |
| | |                 return reply; |
| | |             } |
| | |         } |
| | |     } else { |
| | |         Reply reply(MakeReply<Reply>(eSuccess)); |
| | |         for (auto &kv : nodes_) { |
| | |             Add1(reply, kv.second); |
| | |         } |
| | |         return reply; |
| | |     } |
| | | } |
| | |  |
| | | // Message-driven entry point: passes the query through HandleMsg, which is given |
| | | // the head and a handler that forwards req.proc_id() to the string overload. |
| | | MsgQueryProcReply NodeCenter::QueryProc(const BHMsgHead &head, const MsgQueryProc &req) |
| | | { |
| | |     using Reply = MsgQueryProcReply; |
| | |     // The node argument supplied by HandleMsg is not needed for the lookup. |
| | |     auto handler = [&](Node self) -> Reply { |
| | |         return this->QueryProc(req.proc_id()); |
| | |     }; |
| | |     return HandleMsg<Reply>(head, handler); |
| | | } |
| | | |
| | |
| | | NodeCenter::Clients NodeCenter::DoFindClients(const std::string &topic) |
| | | { |
| | | Clients dests; |
| | | auto Find1 = [&](const std::string &t) { |
| | | auto pos = subscribe_map_.find(topic); |
| | | auto Find1 = [&](const std::string &exact) { |
| | | auto pos = subscribe_map_.find(exact); |
| | | if (pos != subscribe_map_.end()) { |
| | | auto &clients = pos->second; |
| | | for (auto &cli : clients) { |
| | |
| | | // Find1(std::string()); // sub all. |
| | | break; |
| | | } else { |
| | | Find1(topic.substr(0, pos)); |
| | | Find1(topic.substr(0, pos - 1)); |
| | | } |
| | | } |
| | | return dests; |
| | |
| | | auto it = nodes_.begin(); |
| | | while (it != nodes_.end()) { |
| | | auto &cli = *it->second; |
| | | cli.state_.UpdateState(now, offline_time_, kill_time_); |
| | | cli.UpdateState(now, offline_time_, kill_time_); |
| | | if (cli.state_.flag_ == kStateKillme) { |
| | | RemoveNode(it->second); |
| | | it = nodes_.erase(it); |
| | |
| | | } |
| | | |
| | | for (auto &addr : node->addrs_) { |
| | | cleaner_(addr.first); |
| | | auto &id = addr.first; |
| | | auto r = ShmSocket::Remove(node->shm_, id); |
| | | LOG_DEBUG() << "remove mq " << id << (r ? " ok" : " failed"); |
| | | } |
| | | |
| | | node->addrs_.clear(); |
| | | } |
| | | |
| | | // Publishes content under topic on behalf of the center. |
| | | // shm: shared memory used to build the message and to pick the sender; |
| | | // topic / content: copied into a MsgPublish body. |
| | | // The message is sent only to matching clients whose node is still alive and in |
| | | // kStateNormal. Any exception is caught and logged; nothing is reported to the caller. |
| | | void NodeCenter::Publish(SharedMemory &shm, const Topic &topic, const std::string &content) |
| | | { |
| | |     try { |
| | |         // LOG_DEBUG() << "center publish: " << topic << ": " << content; |
| | |         Clients receivers(DoFindClients(topic)); |
| | |         if (receivers.empty()) { |
| | |             return; |
| | |         } |
| | |  |
| | |         MsgPublish pub; |
| | |         pub.set_topic(topic); |
| | |         pub.set_data(content); |
| | |         BHMsgHead head(InitMsgHead(GetType(pub), id(), 0)); |
| | |         MsgI msg(shm); |
| | |         if (!msg.Make(head, pub)) { |
| | |             return; |
| | |         } |
| | |         DEFER1(msg.Release()); |
| | |         RecordMsg(msg); |
| | |  |
| | |         for (auto &receiver : receivers) { |
| | |             // Skip clients whose node has been destroyed or is not online. |
| | |             auto owner = receiver.weak_node_.lock(); |
| | |             if (!owner || owner->state_.flag_ != kStateNormal) { |
| | |                 continue; |
| | |             } |
| | |             DefaultSender(shm).Send({receiver.mq_id_, receiver.mq_abs_addr_}, msg); |
| | |         } |
| | |     } catch (...) { |
| | |         LOG_ERROR() << "center publish error."; |
| | |     } |
| | | } |