use node mqid ssn id to index online nodes.
| | |
| | | struct ProcState { |
| | | int64_t timestamp_ = 0; |
| | | uint32_t flag_ = 0; // reserved |
| | | void PutOffline(const int64_t offline_time) |
| | | { |
| | | timestamp_ = NowSec() - offline_time; |
| | | flag_ = kStateOffline; |
| | | } |
| | | void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time) |
| | | { |
| | | auto diff = now - timestamp_; |
| | |
| | | } |
| | | |
| | | try { |
| | | MQId ssn = head.ssn_id(); |
| | | // use src_addr as session id. |
| | | // when node restart, src_addr will change, |
| | | // and old node will be removed after timeout. |
| | | auto UpdateRegInfo = [&](Node &node) { |
| | | node->addrs_.insert(SrcAddr(head)); |
| | | for (auto &addr : msg.addrs()) { |
| | |
| | | node->state_.UpdateState(NowSec(), offline_time_, kill_time_); |
| | | }; |
| | | |
| | | auto pos = nodes_.find(head.proc_id()); |
| | | if (pos != nodes_.end()) { // new client |
| | | auto pos = nodes_.find(ssn); |
| | | if (pos != nodes_.end()) { // update |
| | | Node &node = pos->second; |
| | | if (node->addrs_.find(SrcAddr(head)) == node->addrs_.end()) { |
| | | // node restarted, release old mq. |
| | | RemoveNode(node); |
| | | node.reset(new NodeInfo); |
| | | } |
| | | UpdateRegInfo(node); |
| | | } else { |
| | | Node node(new NodeInfo); |
| | | UpdateRegInfo(node); |
| | | nodes_[node->proc_.proc_id()] = node; |
| | | nodes_[ssn] = node; |
| | | |
| | | auto old = node_addr_map_.find(head.proc_id()); |
| | | if (old != node_addr_map_.end()) { // old session |
| | | auto &old_ssn = old->second; |
| | | nodes_[old_ssn]->state_.PutOffline(offline_time_); |
| | | printf("put %s %ld offline\n", nodes_[old_ssn]->proc_.proc_id().c_str(), old->second); |
| | | old_ssn = ssn; |
| | | } else { |
| | | node_addr_map_.emplace(head.proc_id(), ssn); |
| | | } |
| | | } |
| | | return MakeReply(eSuccess); |
| | | } catch (...) { |
| | |
| | | Reply HandleMsg(const BHMsgHead &head, Func const &op) |
| | | { |
| | | try { |
| | | auto pos = nodes_.find(head.proc_id()); |
| | | auto pos = nodes_.find(head.ssn_id()); |
| | | if (pos == nodes_.end()) { |
| | | return MakeReply<Reply>(eNotRegistered, "Node is not registered."); |
| | | } else { |
| | |
| | | return HandleMsg( |
| | | head, [&](Node node) -> MsgCommonReply { |
| | | NodeInfo &ni = *node; |
| | | auto now = NowSec(); // just set to offline. |
| | | ni.state_.timestamp_ = now - offline_time_; |
| | | ni.state_.UpdateState(now, offline_time_, kill_time_); |
| | | ni.state_.PutOffline(offline_time_); |
| | | return MakeReply(eSuccess); |
| | | }); |
| | | } |
| | |
| | | }; |
| | | EraseMapRec(service_map_, node->services_); |
| | | EraseMapRec(subscribe_map_, node->subscriptions_); |
| | | node_addr_map_.erase(node->proc_.proc_id()); |
| | | |
| | | for (auto &addr : node->addrs_) { |
| | | cleaner_(addr); |
| | |
| | | |
| | | std::unordered_map<Topic, Clients> service_map_; |
| | | std::unordered_map<Topic, Clients> subscribe_map_; |
| | | std::unordered_map<ProcId, Node> nodes_; |
| | | std::unordered_map<Address, Node> nodes_; |
| | | std::unordered_map<std::string, Address> node_addr_map_; |
| | | Cleaner cleaner_; // remove mqs. |
| | | int64_t offline_time_; |
| | | int64_t kill_time_; |
| | |
| | | auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 60s, 60s * 2); |
| | | auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) { |
| | | return [&](auto &&rep_body) { |
| | | auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id())); |
| | | auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.ssn_id(), head.msg_id())); |
| | | auto remote = head.route(0).mq_id(); |
| | | socket.Send(remote, reply_head, rep_body); |
| | | }; |
| | |
| | | repeated BHAddress route = 2; // for reply and proxy. |
| | | int64 timestamp = 3; |
| | | int32 type = 4; |
| | | bytes proc_id = 5; |
| | | bytes topic = 6; // for request route |
| | | uint64 ssn_id = 5; // node mq id |
| | | bytes proc_id = 6; |
| | | bytes topic = 7; // for request route |
| | | } |
| | | |
| | | |
| | |
| | | |
| | | std::string NewMsgId() { return RandId(); } |
| | | |
| | | BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id) |
| | | BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const uint64_t ssn_id) |
| | | { |
| | | return InitMsgHead(type, proc_id, RandId()); |
| | | return InitMsgHead(type, proc_id, ssn_id, RandId()); |
| | | } |
| | | |
| | | BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const std::string &msgid) |
| | | BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const uint64_t ssn_id, const std::string &msgid) |
| | | { |
| | | BHMsgHead msg; |
| | | msg.set_msg_id(msgid); |
| | | msg.set_type(type); |
| | | msg.set_proc_id(proc_id); |
| | | msg.set_ssn_id(ssn_id); |
| | | msg.set_timestamp(NowSec()); |
| | | return msg; |
| | | } |
| | |
| | | return msg; |
| | | } |
| | | std::string NewMsgId(); |
| | | BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const std::string &msgid); |
| | | BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id); |
| | | BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const uint64_t ssn_id, const std::string &msgid); |
| | | BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const uint64_t ssn_id); |
| | | // inline void AddRoute(BHMsgHead &head, const MQId &id) { head.add_route()->set_mq_id(&id, sizeof(id)); } |
| | | inline bool IsSuccess(const ErrorCode ec) { return ec == eSuccess; } |
| | | bool IsMsgExpired(const BHMsgHead &head); |
| | |
| | | AddId(SockSub().id()); |
| | | AddId(SockPub().id()); |
| | | |
| | | auto head(InitMsgHead(GetType(body), body.proc().proc_id())); |
| | | auto head(InitMsgHead(GetType(body), body.proc().proc_id(), ssn())); |
| | | AddRoute(head, sock.id()); |
| | | |
| | | auto CheckResult = [this](MsgI &msg, BHMsgHead &head, MsgCommonReply &rbody) { |
| | |
| | | MsgUnregister body; |
| | | body.mutable_proc()->Swap(&proc); |
| | | |
| | | auto head(InitMsgHead(GetType(body), body.proc().proc_id())); |
| | | auto head(InitMsgHead(GetType(body), body.proc().proc_id(), ssn())); |
| | | AddRoute(head, sock.id()); |
| | | |
| | | auto CheckResult = [this](MsgI &msg, BHMsgHead &head, MsgCommonReply &rbody) { |
| | |
| | | MsgHeartbeat body; |
| | | body.mutable_proc()->Swap(&proc); |
| | | |
| | | auto head(InitMsgHead(GetType(body), body.proc().proc_id())); |
| | | auto head(InitMsgHead(GetType(body), body.proc().proc_id(), ssn())); |
| | | AddRoute(head, sock.id()); |
| | | |
| | | if (timeout_ms == 0) { |
| | |
| | | } |
| | | auto &sock = SockNode(); |
| | | |
| | | BHMsgHead head(InitMsgHead(GetType(query), proc_id())); |
| | | BHMsgHead head(InitMsgHead(GetType(query), proc_id(), ssn())); |
| | | AddRoute(head, sock.id()); |
| | | |
| | | MsgI reply; |
| | |
| | | MsgRegisterRPC body; |
| | | body.mutable_topics()->Swap(&topics); |
| | | |
| | | auto head(InitMsgHead(GetType(body), proc_id())); |
| | | auto head(InitMsgHead(GetType(body), proc_id(), ssn())); |
| | | AddRoute(head, sock.id()); |
| | | |
| | | if (timeout_ms == 0) { |
| | |
| | | |
| | | MsgRequestTopicReply reply_body; |
| | | if (rcb(head.proc_id(), req, reply_body)) { |
| | | BHMsgHead reply_head(InitMsgHead(GetType(reply_body), proc_id(), head.msg_id())); |
| | | BHMsgHead reply_head(InitMsgHead(GetType(reply_body), proc_id(), ssn(), head.msg_id())); |
| | | |
| | | for (int i = 0; i < head.route_size() - 1; ++i) { |
| | | reply_head.add_route()->Swap(head.mutable_route(i)); |
| | |
| | | if (!p || p->route.empty()) { |
| | | return false; |
| | | } |
| | | BHMsgHead head(InitMsgHead(GetType(body), proc_id(), p->msg_id)); |
| | | BHMsgHead head(InitMsgHead(GetType(body), proc_id(), ssn(), p->msg_id)); |
| | | for (unsigned i = 0; i < p->route.size() - 1; ++i) { |
| | | head.add_route()->Swap(&p->route[i]); |
| | | } |
| | |
| | | |
| | | auto SendTo = [this, msg_id](const BHAddress &addr, const MsgRequestTopic &req, const RequestResultCB &cb) { |
| | | auto &sock = SockClient(); |
| | | BHMsgHead head(InitMsgHead(GetType(req), proc_id(), msg_id)); |
| | | BHMsgHead head(InitMsgHead(GetType(req), proc_id(), ssn(), msg_id)); |
| | | AddRoute(head, sock.id()); |
| | | head.set_topic(req.topic()); |
| | | |
| | |
| | | |
| | | BHAddress addr; |
| | | if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) { |
| | | BHMsgHead head(InitMsgHead(GetType(request), proc_id())); |
| | | BHMsgHead head(InitMsgHead(GetType(request), proc_id(), ssn())); |
| | | AddRoute(head, sock.id()); |
| | | head.set_topic(request.topic()); |
| | | |
| | |
| | | |
| | | try { |
| | | auto &sock = SockPub(); |
| | | BHMsgHead head(InitMsgHead(GetType(pub), proc_id())); |
| | | BHMsgHead head(InitMsgHead(GetType(pub), proc_id(), ssn())); |
| | | AddRoute(head, sock.id()); |
| | | |
| | | if (timeout_ms == 0) { |
| | |
| | | MsgSubscribe sub; |
| | | sub.mutable_topics()->Swap(&topics); |
| | | |
| | | BHMsgHead head(InitMsgHead(GetType(sub), proc_id())); |
| | | BHMsgHead head(InitMsgHead(GetType(sub), proc_id(), ssn())); |
| | | AddRoute(head, sock.id()); |
| | | if (timeout_ms == 0) { |
| | | return sock.Send(BHTopicBusAddress(), head, sub); |
| | |
| | | void Stop(); |
| | | |
| | | private: |
| | | MQId ssn() { return SockNode().id(); } |
| | | bool ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms); |
| | | typedef MsgQueryTopicReply::BHNodeAddress NodeAddress; |
| | | int QueryRPCTopics(const Topic &topic, std::vector<NodeAddress> &addr, const int timeout_ms); |
| | |
| | | MsgRequestTopic body; |
| | | body.set_topic("topic"); |
| | | body.set_data(str); |
| | | auto head(InitMsgHead(GetType(body), proc_id)); |
| | | auto head(InitMsgHead(GetType(body), proc_id, mq.Id())); |
| | | msg.Make(head, body); |
| | | assert(msg.valid()); |
| | | DEFER1(msg.Release();); |
| | |
| | | MsgRequestTopic req_body; |
| | | req_body.set_topic("topic"); |
| | | req_body.set_data(msg_content); |
| | | auto req_head(InitMsgHead(GetType(req_body), client_proc_id)); |
| | | auto req_head(InitMsgHead(GetType(req_body), client_proc_id, cli.id())); |
| | | req_head.add_route()->set_mq_id(cli.id()); |
| | | return cli.Send(srv.id(), req_head, req_body); |
| | | }; |
| | |
| | | MsgRequestTopic reply_body; |
| | | reply_body.set_topic("topic"); |
| | | reply_body.set_data(msg_content); |
| | | auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, req_head.msg_id())); |
| | | auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, srv.id(), req_head.msg_id())); |
| | | return srv.Send(src_id, reply_head, reply_body); |
| | | }; |
| | | Reply(); |