/*
 * =====================================================================================
 *
 *       Filename:  topic_node.cpp
 *
 *    Description:  TopicNode implementation: node init/registration, heartbeat,
 *                  topic queries, topic RPC (server and client side) and
 *                  publish/subscribe, all carried over ShmSocket.
 *
 *        Version:  1.0
 *        Created:  2021-04-07 09:01:48
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (),
 *   Organization:
 *
 * =====================================================================================
 */
#include "topic_node.h"
#include "bh_util.h"
#include "sleeper.h"
// NOTE(review): the two system include targets were missing in the reviewed
// text; the headers below are the standard ones this file visibly relies on.
#include <chrono>
#include <exception>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>

using namespace std::chrono;
using namespace std::chrono_literals;

const char *const kErrMsgNotInit = "BHome node NOT initialized.";
const char *const kErrMsgNotRegistered = "BHome node NOT registered.";

namespace
{

// A user-supplied symbol (proc id / topic) must be non-empty and must not
// start with '#' or '@'.
bool ValidUserSymbol(const std::string &s)
{
	return !s.empty() && s[0] != '#' && s[0] != '@';
}

// Append the id and absolute address of `sock` as a new route entry of `head`.
inline void AddRoute(BHMsgHead &head, const ShmSocket &sock)
{
	auto route = head.add_route();
	route->set_mq_id(sock.id());
	route->set_abs_addr(sock.AbsAddr());
}

// Element type of BHMsgHead's repeated `route` field.
// NOTE(review): the template argument was missing in the reviewed text; it is
// derived from usage here. Replace with the plain message type name if preferred.
using RouteNode = std::remove_pointer_t<decltype(std::declval<BHMsgHead &>().add_route())>;

// Element type of MsgQueryTopicReply's repeated `node_address` field.
// NOTE(review): derived from usage for the same reason as RouteNode; confirm it
// matches the vector element type declared for QueryTopicServers in topic_node.h.
using TopicServerAddr =
    std::decay_t<decltype(std::declval<const MsgQueryTopicReply &>().node_address())>::value_type;

// Everything needed to route a reply back to the requester. Handed to user
// code as an opaque `void *` and consumed by ServerSendReply().
struct SrcInfo {
	std::vector<RouteNode> route;
	std::string msg_id;
};

const int kMqLen = 700;

} // namespace

TopicNode::TopicNode(SharedMemory &shm, MQId ssn_id) :
    shm_(shm), state_(eStateUninited), ssn_id_(ssn_id)
{
}

TopicNode::~TopicNode()
{
	Stop();
}

// Create the node socket (if not yet created) and start its receive worker.
// Returns true when already valid or when the node socket exists and was
// started; false when no proc id has been set or node init failed.
bool TopicNode::Init()
{
	std::lock_guard<decltype(mutex_)> lk(mutex_);

	if (Valid()) {
		return true;
	} else if (info_.proc_id().empty()) {
		return false;
	}

	if (ssn_id_ == 0) {
		ssn_id_ = NewSession();
	}
	LOG_DEBUG() << "Node Init, id " << ssn_id_;

	// The request/reply words carry the command in the low 4 bits and the
	// payload (session id / absolute address) in the remaining bits.
	auto NodeInit = [&]() {
		int64_t init_request = ssn_id_ << 4 | EncodeCmd(eCmdNodeInit);
		int64_t reply = 0;
		if (BHNodeInit(shm(), init_request, reply) && DecodeCmd(reply) == eCmdNodeInitReply) {
			int64_t abs_addr = reply >> 4;
			sockets_.emplace_back(new ShmSocket(abs_addr, shm_, ssn_id_));
			LOG_DEBUG() << "node init ok";
		} else {
			LOG_ERROR() << "Node Init Error";
		}
	};

	if (sockets_.empty()) {
		NodeInit();
	}
	if (sockets_.empty()) {
		return false;
	}

	auto onMsg = [this](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
		LOG_DEBUG() << "node recv type: " << head.type();
		switch (head.type()) {
		case kMsgTypeProcInit: {
			// reuse msg to send proc init.
			MsgProcInit body;
			body.set_extra_mq_num(eSockEnd - eSockStart - 1);
			auto init_head = InitMsgHead(GetType(body), info_.proc_id(), ssn_id_);
			AddRoute(init_head, socket);
			if (imsg.Fill(init_head, body)) {
				socket.Send(CenterAddr(), imsg);
			}
		} break;
		case kMsgTypeProcInitReply: {
			LOG_DEBUG() << "got proc init reply";
			MsgProcInitReply reply;
			if (imsg.ParseBody(reply) && IsSuccess(reply.errmsg().errcode())) {
				// One extra socket per queue granted by the reply.
				for (auto &addr : reply.extra_mqs()) {
					LOG_DEBUG() << "add socket " << addr.abs_addr() << ", id:" << addr.mq_id();
					sockets_.emplace_back(new ShmSocket(addr.abs_addr(), shm(), addr.mq_id()));
				}
				SetProcIndex(reply.proc_index());
				this->state_ = eStateUnregistered;
				// Start server/subscribe sockets with empty callbacks, i.e. in
				// buffered mode (see ServerStart / SubscribeStartWorker).
				ServerStart(ServerAsyncCB(), 1);
				SubscribeStartWorker(SubDataCB(), 1);
			}
		} break;
		default: break;
		}
		return true;
	};
	SockNode().Start(1, onMsg);
	return true;
}

// Initialize the node and start server, subscribe and client workers.
// `nworker` is clamped to [1, 16]. Sets last error and returns if Init() fails.
void TopicNode::Start(ServerAsyncCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker)
{
	if (!Init()) {
		SetLastError(eError, kErrMsgNotInit);
		return;
	}
	if (nworker < 1) {
		nworker = 1;
	} else if (nworker > 16) {
		nworker = 16;
	}
	// SockNode().Start();
	ServerStart(server_cb, nworker);
	SubscribeStartWorker(sub_cb, nworker);
	ClientStartWorker(client_cb, nworker);
}

// Stop every socket owned by this node.
void TopicNode::Stop()
{
	for (auto &p : sockets_) {
		p->Stop();
	}
}

// Register `proc` with the center.
// - `internal` == false: the proc id must pass ValidUserSymbol().
// - `timeout_ms` == 0: send asynchronously, return the send result; the state
//   is updated later when the reply arrives.
// - otherwise: wait for the reply, fill `reply_body`, return IsOnline().
// `proc` is swapped into the outgoing message and is left unspecified.
bool TopicNode::DoRegister(const bool internal, ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
{
	if (!internal && !ValidUserSymbol(proc.proc_id())) {
		SetLastError(eInvalidInput, "invalid proc id :'" + proc.proc_id() + "'");
		return false;
	}

	{
		std::lock_guard<decltype(mutex_)> lk(mutex_);
		info_ = proc;
	}

	if (!Init()) {
		SetLastError(eError, kErrMsgNotInit);
		return false;
	}

	// Init() only starts the handshake; spin until the node becomes valid or
	// the deadline passes.
	auto end_time = steady_clock::now() + milliseconds(timeout_ms);
	while (!Valid() && steady_clock::now() < end_time) {
		std::this_thread::yield();
	}
	if (!Valid()) {
		SetLastError(eError, kErrMsgNotInit);
		return false;
	}

	auto &sock = SockNode();
	MsgRegister body;
	body.mutable_proc()->Swap(&proc);
	auto head(InitMsgHead(GetType(body), body.proc().proc_id(), ssn()));
	AddRoute(head, sock);

	// Success -> online; failure only demotes an already-online node.
	auto CheckResult = [this](MsgI &msg, BHMsgHead &head, MsgCommonReply &rbody) {
		bool ok = head.type() == kMsgTypeCommonReply &&
		          msg.ParseBody(rbody) &&
		          IsSuccess(rbody.errmsg().errcode());
		if (ok) {
			state(eStateOnline);
		} else {
			state_cas(eStateOnline, eStateOffline);
		}
	};

	if (timeout_ms == 0) {
		auto onResult = [this, CheckResult](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
			MsgCommonReply body;
			CheckResult(imsg, head, body);
		};
		return sock.Send(CenterAddr(), head, body, onResult);
	} else {
		MsgI reply(shm());
		DEFER1(reply.Release(););
		BHMsgHead reply_head;
		bool r = sock.SendAndRecv(CenterAddr(), head, body, reply, reply_head, timeout_ms);
		if (r) {
			CheckResult(reply, reply_head, reply_body);
		}
		return IsOnline();
	}
}

// Unregister `proc` from the center. The local state goes offline and the
// stored proc info is cleared before the request is sent.
// `timeout_ms` == 0 sends asynchronously and returns the send result; otherwise
// returns true only if a successful common reply was parsed into `reply_body`.
bool TopicNode::Unregister(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
{
	if (!IsOnline()) {
		SetLastError(eNotRegistered, kErrMsgNotRegistered);
		return false;
	}

	info_.Clear();
	state_cas(eStateOnline, eStateOffline);

	auto &sock = SockNode();
	MsgUnregister body;
	body.mutable_proc()->Swap(&proc);
	auto head(InitMsgHead(GetType(body), body.proc().proc_id(), ssn()));
	AddRoute(head, sock);

	auto CheckResult = [this](MsgI &msg, BHMsgHead &head, MsgCommonReply &rbody) {
		bool r = head.type() == kMsgTypeCommonReply &&
		         msg.ParseBody(rbody) &&
		         IsSuccess(rbody.errmsg().errcode());
		return r;
	};

	if (timeout_ms == 0) {
		auto onResult = [this, CheckResult](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
			MsgCommonReply body;
			CheckResult(imsg, head, body);
		};
		return sock.Send(CenterAddr(), head, body, onResult);
	} else {
		MsgI reply(shm());
		DEFER1(reply.Release(););
		BHMsgHead reply_head;
		bool r = sock.SendAndRecv(CenterAddr(), head, body, reply, reply_head, timeout_ms);
		return r && CheckResult(reply, reply_head, reply_body);
	}
}

// Send a heartbeat carrying `proc` to the center.
// `timeout_ms` == 0 sends without waiting; otherwise waits for a common reply
// and returns true only if its error code indicates success.
bool TopicNode::Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
{
	if (!IsOnline()) {
		SetLastError(eNotRegistered, kErrMsgNotRegistered);
		return false;
	}

	auto &sock = SockNode();
	MsgHeartbeat body;
	body.mutable_proc()->Swap(&proc);
	auto head(InitMsgHead(GetType(body), body.proc().proc_id(), ssn()));
	AddRoute(head, sock);

	if (timeout_ms == 0) {
		return sock.Send(CenterAddr(), head, body);
	} else {
		MsgI reply(shm());
		DEFER1(reply.Release(););
		BHMsgHead reply_head;
		bool r = sock.SendAndRecv(CenterAddr(), head, body, reply, reply_head, timeout_ms);
		r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body);
		return (r && IsSuccess(reply_body.errmsg().errcode()));
	}
}

// Convenience overload: heartbeat with only this node's proc id filled in.
bool TopicNode::Heartbeat(const int timeout_ms)
{
	ProcInfo proc;
	proc.set_proc_id(proc_id());
	MsgCommonReply reply_body;
	return Heartbeat(proc, reply_body, timeout_ms);
}

// Ask the center which nodes serve the topic in `query`.
// Returns true when a query-topic reply was received and parsed into
// `reply_body`. `dest` is currently not used by the implementation.
bool TopicNode::QueryTopicAddress(BHAddress &dest, MsgQueryTopic &query, MsgQueryTopicReply &reply_body, const int timeout_ms)
{
	if (!IsOnline()) {
		SetLastError(eNotRegistered, kErrMsgNotRegistered);
		return false;
	}

	auto &sock = SockNode();
	BHMsgHead head(InitMsgHead(GetType(query), proc_id(), ssn()));
	AddRoute(head, sock);

	MsgI reply(shm());
	DEFER1(reply.Release());
	BHMsgHead reply_head;
	return (sock.SendAndRecv(CenterAddr(), head, query, reply, reply_head, timeout_ms) &&
	        reply_head.type() == kMsgTypeQueryTopicReply &&
	        reply.ParseBody(reply_body));
}

// Ask the center for process information.
// Returns true when a query-proc reply was received and parsed into
// `reply_body`. `dest` is currently not used by the implementation.
bool TopicNode::QueryProcs(BHAddress &dest, MsgQueryProc &query, MsgQueryProcReply &reply_body, const int timeout_ms)
{
	if (!IsOnline()) {
		SetLastError(eNotRegistered, kErrMsgNotRegistered);
		return false;
	}

	auto &sock = SockNode();
	BHMsgHead head(InitMsgHead(GetType(query), proc_id(), ssn()));
	AddRoute(head, sock);

	MsgI reply(shm());
	DEFER1(reply.Release());
	BHMsgHead reply_head;
	return (sock.SendAndRecv(CenterAddr(), head, query, reply, reply_head, timeout_ms) &&
	        reply_head.type() == kMsgTypeQueryProcReply &&
	        reply.ParseBody(reply_body));
}

// Register the RPC topics this node serves.
// - `internal` == false: every topic must pass ValidUserSymbol().
// - `timeout_ms` == 0: send without waiting.
// - otherwise: returns true when a common reply was parsed into `reply_body`;
//   the reply's error code is NOT inspected here, callers must check it.
// `topics` is swapped into the outgoing message.
bool TopicNode::DoServerRegisterRPC(const bool internal, MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
{
	if (!internal) {
		for (auto &&topic : topics.topic_list()) {
			if (!ValidUserSymbol(topic)) {
				SetLastError(eInvalidInput, "invalid user topic :'" + topic + "'");
				return false;
			}
		}
	}
	if (!IsOnline()) {
		SetLastError(eNotRegistered, kErrMsgNotRegistered);
		return false;
	}

	auto &sock = SockServer();
	MsgRegisterRPC body;
	body.mutable_topics()->Swap(&topics);
	auto head(InitMsgHead(GetType(body), proc_id(), ssn()));
	AddRoute(head, sock);

	if (timeout_ms == 0) {
		return sock.Send(CenterAddr(), head, body);
	} else {
		MsgI reply(shm());
		DEFER1(reply.Release(););
		BHMsgHead reply_head;
		bool r = sock.SendAndRecv(CenterAddr(), head, body, reply, reply_head, timeout_ms);
		r = r && reply_head.type() == kMsgTypeCommonReply;
		r = r && reply.ParseBody(reply_body);
		return r;
	}
}

// Start the server socket with a synchronous handler: `rcb` fills the reply,
// which is sent back immediately when `rcb` returns true.
// Returns false when `rcb` is empty or the socket fails to start.
bool TopicNode::ServerStart(const ServerSyncCB &rcb, int nworker)
{
	auto onRecv = [this, rcb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
		if (head.type() != kMsgTypeRequestTopic || head.route_size() == 0) {
			return;
		}
		MsgRequestTopic req;
		if (!imsg.ParseBody(req)) {
			return;
		}

		MsgRequestTopicReply reply_body;
		if (rcb(head.proc_id(), req, reply_body)) {
			BHMsgHead reply_head(InitMsgHead(GetType(reply_body), proc_id(), ssn(), head.msg_id()));
			// The last route entry is the hop to send to; the earlier ones
			// travel with the reply.
			for (int i = 0; i < head.route_size() - 1; ++i) {
				reply_head.add_route()->Swap(head.mutable_route(i));
			}
			MQInfo remote = {head.route().rbegin()->mq_id(), head.route().rbegin()->abs_addr()};
			sock.Send(remote, reply_head, reply_body);
		}
	};

	auto &sock = SockServer();
	return rcb && sock.Start(onRecv, nworker);
}

// Start the server socket in asynchronous mode.
// - non-empty `acb`: each request is handed to `acb` together with a
//   heap-allocated SrcInfo (as `void *`); ownership of that pointer passes to
//   the callback, which is expected to give it to ServerSendReply().
// - empty `acb`: requests are written to server_buffer_ for ServerRecvRequest().
bool TopicNode::ServerStart(const ServerAsyncCB &acb, int nworker)
{
	if (acb) {
		auto onRecv = [this, acb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
			if (head.type() != kMsgTypeRequestTopic || head.route_size() == 0) {
				return;
			}
			MsgRequestTopic req;
			if (!imsg.ParseBody(req)) {
				return;
			}

			try {
				// Owned until fully built so a throwing copy cannot leak it;
				// allocation failure throws std::bad_alloc, handled below.
				auto src = std::make_unique<SrcInfo>();
				src->route.assign(head.route().begin(), head.route().end());
				src->msg_id = head.msg_id();
				acb(src.release(), *head.mutable_proc_id(), req);
			} catch (std::exception &e) {
				LOG_ERROR() << "error server handle msg:" << e.what();
			}
		};
		return SockServer().Start(onRecv, nworker);
	} else {
		auto onRequest = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) {
			server_buffer_->Write(std::move(head), msg.body());
		};
		return SockServer().Start(onRequest, nworker);
	}
}

// Poll server_buffer_ for a buffered request until `timeout_ms` elapses.
// On success fills `proc_id` and `request`, and stores a heap-allocated SrcInfo
// in `src_info`; the caller must pass it to ServerSendReply(), which frees it.
bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms)
{
	if (!IsOnline()) {
		SetLastError(eNotRegistered, kErrMsgNotRegistered);
		return false;
	}

	BHMsgHead head;
	std::string body;
	FibUSleeper sleeper(1000 * 10);
	auto end_time = steady_clock::now() + milliseconds(timeout_ms);
	while (!server_buffer_->Read(head, body)) {
		if (steady_clock::now() < end_time) {
			sleeper.Sleep();
		} else {
			return false;
		}
	}

	if (head.type() != kMsgTypeRequestTopic || !request.ParseFromString(body)) {
		return false;
	}

	head.mutable_proc_id()->swap(proc_id);
	try {
		// Owned until fully built so a throwing copy cannot leak it.
		auto src = std::make_unique<SrcInfo>();
		src->route.assign(head.route().begin(), head.route().end());
		src->msg_id = head.msg_id();
		src_info = src.release();
		return true;
	} catch (std::exception &e) {
		LOG_ERROR() << "error recv request: " << e.what();
		return false;
	}
}

// Send `body` back along the route recorded in `src_info`.
// `src_info` is ALWAYS consumed (deleted) by this call, including on failure
// and when the node is offline; it must not be reused afterwards.
bool TopicNode::ServerSendReply(void *src_info, const MsgRequestTopicReply &body)
{
	// Take ownership first so that no early return can leak the SrcInfo.
	std::unique_ptr<SrcInfo> p(static_cast<SrcInfo *>(src_info));

	if (!IsOnline()) {
		SetLastError(eNotRegistered, kErrMsgNotRegistered);
		return false;
	}
	if (!p || p->route.empty()) {
		return false;
	}

	auto &sock = SockServer();
	BHMsgHead head(InitMsgHead(GetType(body), proc_id(), ssn(), p->msg_id));
	// The last route entry is the destination; the earlier ones go in the head.
	for (unsigned i = 0; i < p->route.size() - 1; ++i) {
		head.add_route()->Swap(&p->route[i]);
	}
	MQInfo dest = {p->route.back().mq_id(), p->route.back().abs_addr()};
	return sock.Send(dest, head, body);
}

// Start the client socket; every parsed request-topic reply is passed to `cb`.
// Returns false when `cb` is empty.
bool TopicNode::ClientStartWorker(RequestResultCB const &cb, const int nworker)
{
	if (!cb) {
		return false;
	}

	auto onData = [this, cb](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
		if (head.type() == kMsgTypeRequestTopicReply) {
			MsgRequestTopicReply reply;
			if (imsg.ParseBody(reply)) {
				cb(head, reply);
			}
		}
	};
	return SockClient().Start(onData, nworker);
}

// Send a topic request without waiting for the reply.
// `out_msg_id` receives the id of the request. When `cb` is non-empty it is
// invoked with the parsed reply. If `remote_addr.ip()` is empty the topic's
// server is looked up (3000 ms timeout) and the request is sent to it directly;
// otherwise the request is sent to the center.
bool TopicNode::ClientAsyncRequest(const BHAddress &remote_addr, const MsgRequestTopic &req, std::string &out_msg_id, const RequestResultCB &cb)
{
	if (!IsOnline()) {
		SetLastError(eNotRegistered, kErrMsgNotRegistered);
		return false;
	}

	const std::string &msg_id(NewMsgId());
	out_msg_id = msg_id;

	auto SendTo = [this, remote_addr, msg_id](const MQInfo &remote, const MsgRequestTopic &req, const RequestResultCB &cb) {
		auto &sock = SockClient();
		BHMsgHead head(InitMsgHead(GetType(req), proc_id(), ssn(), msg_id));
		*head.mutable_dest() = remote_addr;
		AddRoute(head, sock);
		head.set_topic(req.topic());

		if (cb) {
			auto onRecv = [cb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
				if (head.type() == kMsgTypeRequestTopicReply) {
					MsgRequestTopicReply reply;
					if (imsg.ParseBody(reply)) {
						cb(head, reply);
					}
				}
			};
			return sock.Send(remote, head, req, onRecv);
		} else {
			return sock.Send(remote, head, req);
		}
	};

	try {
		if (remote_addr.ip().empty()) {
			BHAddress addr;
			return (ClientQueryRPCTopic(req.topic(), addr, 3000)) &&
			       SendTo(MQInfo{addr.mq_id(), addr.abs_addr()}, req, cb);
		} else {
			return SendTo(CenterAddr(), req, cb);
		}
	} catch (...) {
		SetLastError(eError, "internal error.");
		return false;
	}
}

// Send a topic request and wait up to `timeout_ms` for the reply.
// On success fills `out_reply` and `out_proc_id` (the replier's proc id).
// Destination selection mirrors ClientAsyncRequest(): empty `remote_addr.ip()`
// means look the server up and send directly, otherwise go through the center.
bool TopicNode::ClientSyncRequest(const BHAddress &remote_addr, const MsgRequestTopic &request, std::string &out_proc_id, MsgRequestTopicReply &out_reply, const int timeout_ms)
{
	if (!IsOnline()) {
		SetLastError(eNotRegistered, kErrMsgNotRegistered);
		return false;
	}

	try {
		auto &sock = SockClient();

		MQInfo dest;
		if (!remote_addr.ip().empty()) {
			dest = CenterAddr();
		} else {
			BHAddress addr;
			if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) {
				dest.offset_ = addr.abs_addr();
				dest.id_ = addr.mq_id();
			} else {
				return false;
			}
		}

		BHMsgHead head(InitMsgHead(GetType(request), proc_id(), ssn()));
		*head.mutable_dest() = remote_addr;
		AddRoute(head, sock);
		head.set_topic(request.topic());

		MsgI reply_msg(shm());
		DEFER1(reply_msg.Release(););
		BHMsgHead reply_head;

		if (sock.SendAndRecv(dest, head, request, reply_msg, reply_head, timeout_ms) &&
		    reply_head.type() == kMsgTypeRequestTopicReply &&
		    reply_msg.ParseBody(out_reply)) {
			reply_head.mutable_proc_id()->swap(out_proc_id);
			return true;
		}
	} catch (std::exception &e) {
		LOG_ERROR() << __func__ << " exception: " << e.what();
		SetLastError(eError, __func__ + std::string(" internal errer."));
	}
	return false;
}

// Query the servers of `topic` and append them to `addr`.
// Returns the number of addresses in the reply (0 when the query failed).
int TopicNode::QueryTopicServers(const Topic &topic, std::vector<TopicServerAddr> &addr, const int timeout_ms)
{
	int n = 0;
	MsgQueryTopic query;
	query.set_topic(topic);
	MsgQueryTopicReply rep;
	BHAddress dest; // empty means local.
	if (QueryTopicAddress(dest, query, rep, timeout_ms)) {
		auto &ls = rep.node_address();
		n = ls.size();
		for (auto &na : ls) {
			addr.push_back(na);
		}
	}
	return n;
}

// Resolve the address of a server for `topic`, consulting topic_query_cache_
// first. A freshly queried address is cached only when its mq id is non-zero.
bool TopicNode::ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms)
{
	if (!IsOnline()) {
		SetLastError(eNotRegistered, kErrMsgNotRegistered);
		return false;
	}

	if (topic_query_cache_.Find(topic, addr)) {
		return true;
	}

	std::vector<TopicServerAddr> lst;
	if (QueryTopicServers(topic, lst, timeout_ms)) {
		// A non-zero count guarantees `lst` is non-empty: it started empty and
		// QueryTopicServers appended exactly that many entries.
		addr = lst.front().addr();
		if (addr.mq_id() != 0) {
			topic_query_cache_.Store(topic, addr);
			return true;
		}
	}
	SetLastError(eNotFound, "remote not found.");
	return false;
}

// publish
// Publish `pub` to the bus. `timeout_ms` == 0 sends without waiting; otherwise
// waits for a common reply and returns true only if it reports success.
bool TopicNode::Publish(const MsgPublish &pub, const int timeout_ms)
{
	if (!IsOnline()) {
		SetLastError(eNotRegistered, kErrMsgNotRegistered);
		return false;
	}

	try {
		auto &sock = SockPub();
		BHMsgHead head(InitMsgHead(GetType(pub), proc_id(), ssn()));
		AddRoute(head, sock);
		head.set_topic(pub.topic());

		if (timeout_ms == 0) {
			return sock.Send(BusAddr(), head, pub);
		} else {
			MsgI reply(shm());
			DEFER1(reply.Release(););
			BHMsgHead reply_head;
			MsgCommonReply reply_body;
			return sock.SendAndRecv(BusAddr(), head, pub, reply, reply_head, timeout_ms) &&
			       reply_head.type() == kMsgTypeCommonReply &&
			       reply.ParseBody(reply_body) &&
			       IsSuccess(reply_body.errmsg().errcode());
		}
	} catch (...) {
		// Record the failure instead of swallowing it silently, as
		// ClientAsyncRequest() does.
		SetLastError(eError, "internal error.");
	}
	return false;
}

// subscribe
// Subscribe to `topics` on the bus (`net` selects network subscription).
// `timeout_ms` == 0 sends without waiting; otherwise waits for a common reply,
// fills `reply_body` and returns true only if it reports success.
// `topics` is swapped into the outgoing message.
bool TopicNode::DoSubscribe(MsgTopicList &topics, const bool net, MsgCommonReply &reply_body, const int timeout_ms)
{
	if (!IsOnline()) {
		SetLastError(eNotRegistered, kErrMsgNotRegistered);
		return false;
	}

	try {
		auto &sock = SockSub();
		MsgSubscribe sub;
		sub.set_network(net);
		sub.mutable_topics()->Swap(&topics);

		BHMsgHead head(InitMsgHead(GetType(sub), proc_id(), ssn()));
		AddRoute(head, sock);

		if (timeout_ms == 0) {
			return sock.Send(BusAddr(), head, sub);
		} else {
			MsgI reply(shm());
			DEFER1(reply.Release(););
			BHMsgHead reply_head;
			return sock.SendAndRecv(BusAddr(), head, sub, reply, reply_head, timeout_ms) &&
			       reply_head.type() == kMsgTypeCommonReply &&
			       reply.ParseBody(reply_body) &&
			       IsSuccess(reply_body.errmsg().errcode());
		}
	} catch (...) {
		// Record the failure instead of swallowing it silently, as
		// ClientAsyncRequest() does.
		SetLastError(eError, "internal error.");
		return false;
	}
}

// Start the subscribe socket.
// - non-empty `tdcb`: each parsed publish message is passed to `tdcb`.
// - empty `tdcb`: messages are written to sub_buffer_ for RecvSub().
bool TopicNode::SubscribeStartWorker(const SubDataCB &tdcb, int nworker)
{
	if (tdcb) {
		auto AsyncRecvProc = [this, tdcb](ShmSocket &, MsgI &imsg, BHMsgHead &head) {
			if (head.type() == kMsgTypePublish) {
				MsgPublish pub;
				if (imsg.ParseBody(pub)) {
					tdcb(head.proc_id(), pub);
				}
			} else {
				// ignored, or dropped
			}
		};
		return SockSub().Start(AsyncRecvProc, nworker);
	} else {
		auto onSub = [this](ShmSocket &socket, MsgI &msg, BHMsgHead &head) {
			sub_buffer_->Write(std::move(head), msg.body());
		};
		return SockSub().Start(onSub, nworker);
	}
}

// Poll sub_buffer_ for a buffered publish message until `timeout_ms` elapses.
// On success fills `proc_id` (the publisher) and `pub`. A timeout returns false
// without setting an error; a malformed message sets eError.
bool TopicNode::RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms)
{
	if (!IsOnline()) {
		SetLastError(eNotRegistered, kErrMsgNotRegistered);
		return false;
	}

	BHMsgHead head;
	std::string body;
	FibUSleeper sleeper(1000 * 10);
	auto end_time = steady_clock::now() + milliseconds(timeout_ms);
	while (!sub_buffer_->Read(head, body)) {
		if (steady_clock::now() < end_time) {
			sleeper.Sleep();
		} else {
			return false;
		}
	}

	if (head.type() == kMsgTypePublish) {
		if (pub.ParseFromString(body)) {
			head.mutable_proc_id()->swap(proc_id);
			return true;
		}
	}
	SetLastError(eError, "invalid subcribe msg received.");
	return false;
}