| | |
| | | */ |
| | | #include "topic_node.h" |
| | | #include "bh_util.h" |
| | | #include "sleeper.h" |
| | | #include <chrono> |
| | | #include <list> |
| | | |
| | |
| | | |
| | | TopicNode::~TopicNode() |
| | | { |
| | | LOG_DEBUG() << "~TopicNode()"; |
| | | Stop(); |
| | | } |
| | | |
| | |
| | | } |
| | | SetProcIndex(reply.proc_index()); |
| | | this->state_ = eStateUnregistered; |
| | | auto onRequest = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) { |
| | | server_buffer_->Write(std::move(head), msg.body()); |
| | | }; |
| | | SockServer().Start(onRequest); |
| | | auto onSub = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) { |
| | | sub_buffer_->Write(std::move(head), msg.body()); |
| | | }; |
| | | SockSub().Start(onSub); |
| | | |
| | | ServerStart(ServerAsyncCB(), 1); |
| | | SubscribeStartWorker(SubDataCB(), 1); |
| | | } |
| | | } break; |
| | | default: break; |
| | |
| | | } |
| | | void TopicNode::Stop() |
| | | { |
| | | LOG_DEBUG() << "Node Stopping"; |
| | | for (auto &p : sockets_) { p->Stop(); } |
| | | LOG_INFO() << "Node Stopped"; |
| | | } |
| | | |
| | | bool TopicNode::Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) |
| | |
| | | reply.ParseBody(reply_body)); |
| | | } |
| | | |
| | | bool TopicNode::QueryProcs(BHAddress &dest, MsgQueryProc &query, MsgQueryProcReply &reply_body, const int timeout_ms) |
| | | { |
| | | if (!IsOnline()) { |
| | | SetLastError(eNotRegistered, kErrMsgNotRegistered); |
| | | return false; |
| | | } |
| | | auto &sock = SockNode(); |
| | | |
| | | BHMsgHead head(InitMsgHead(GetType(query), proc_id(), ssn())); |
| | | AddRoute(head, sock); |
| | | |
| | | MsgI reply; |
| | | DEFER1(reply.Release()); |
| | | BHMsgHead reply_head; |
| | | return (sock.SendAndRecv(BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms) && |
| | | reply_head.type() == kMsgTypeQueryProcReply && |
| | | reply.ParseBody(reply_body)); |
| | | } |
| | | |
| | | bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms) |
| | | { |
| | | if (!IsOnline()) { |
| | |
| | | } |
| | | BHMsgHead head; |
| | | std::string body; |
| | | FibUSleeper sleeper(1000 * 10); |
| | | auto end_time = steady_clock::now() + milliseconds(timeout_ms); |
| | | while (!server_buffer_->Read(head, body)) { |
| | | if (steady_clock::now() < end_time) { |
| | | robust::QuickSleep(); |
| | | sleeper.Sleep(); |
| | | } else { |
| | | return false; |
| | | } |
| | |
| | | |
| | | BHMsgHead head; |
| | | std::string body; |
| | | FibUSleeper sleeper(1000 * 10); |
| | | auto end_time = steady_clock::now() + milliseconds(timeout_ms); |
| | | while (!sub_buffer_->Read(head, body)) { |
| | | if (steady_clock::now() < end_time) { |
| | | robust::QuickSleep(); |
| | | sleeper.Sleep(); |
| | | } else { |
| | | return false; |
| | | } |