From ae17d1439b35b55212c3a30712e0a60b1d6a99c0 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期三, 30 六月 2021 11:15:53 +0800 Subject: [PATCH] support tcp pub/sub. --- box/center.cpp | 810 +++++---------------------------------------------------- 1 files changed, 70 insertions(+), 740 deletions(-) diff --git a/box/center.cpp b/box/center.cpp index f140289..e0abbb3 100644 --- a/box/center.cpp +++ b/box/center.cpp @@ -16,12 +16,12 @@ * ===================================================================================== */ #include "center.h" -#include "bh_util.h" -#include "defs.h" -#include "log.h" -#include "shm.h" +#include "center_topic_node.h" +#include "io_service.h" +#include "node_center.h" +#include "tcp_proxy.h" +#include "tcp_server.h" #include <chrono> -#include <unordered_map> using namespace std::chrono; using namespace std::chrono_literals; @@ -33,690 +33,6 @@ namespace { -typedef std::string ProcId; -typedef size_t ProcIndex; // max local procs. -const int kMaxProcs = 65536; - -// record all procs ever registered, always grow, never remove. -// mainly for node to request msg allocation. -// use index instead of MQId to save some bits. -class ProcRecords -{ -public: - struct ProcRec { - ProcId proc_; - MQId ssn_ = 0; - }; - - ProcRecords() { procs_.reserve(kMaxProcs); } - - ProcIndex Put(const ProcId &proc_id, const MQId ssn) - { - if (procs_.size() >= kMaxProcs) { - return -1; - } - auto pos_isnew = proc_index_.emplace(proc_id, procs_.size()); - int index = pos_isnew.first->second; - if (pos_isnew.second) { - procs_.emplace_back(ProcRec{proc_id, ssn}); - } else { // update ssn - procs_[index].ssn_ = ssn; - } - return index; - } - const ProcRec &Get(const ProcIndex index) const - { - static ProcRec empty_rec; - return (index < procs_.size()) ? procs_[index] : empty_rec; - } - -private: - std::unordered_map<ProcId, size_t> proc_index_; - std::vector<ProcRec> procs_; -}; - -class MsgRecords -{ - typedef int64_t MsgId; - typedef int64_t Offset; - -public: - void RecordMsg(const MsgI &msg) { msgs_.emplace(msg.id(), msg.Offset()); } - void FreeMsg(MsgId id) - { - auto pos = msgs_.find(id); - if (pos != msgs_.end()) { - ShmMsg(pos->second).Free(); - msgs_.erase(pos); - } else { - LOG_TRACE() << "ignore late free request."; - } - } - void AutoRemove() - { - auto now = NowSec(); - if (now < time_to_clean_) { - return; - } - // LOG_FUNCTION; - time_to_clean_ = now + 1; - int64_t limit = std::max(10000ul, msgs_.size() / 10); - int64_t n = 0; - auto it = msgs_.begin(); - while (it != msgs_.end() && --limit > 0) { - ShmMsg msg(it->second); - auto Free = [&]() { - msg.Free(); - it = msgs_.erase(it); - ++n; - }; - int n = now - msg.timestamp(); - if (n < 10) { - ++it; - } else if (msg.Count() == 0) { - Free(); - } else if (n > 60) { - Free(); - } - } - if (n > 0) { - LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n; - } - } - size_t size() const { return msgs_.size(); } - void DebugPrint() const - { - LOG_DEBUG() << "msgs : " << size(); - int i = 0; - int total_count = 0; - for (auto &kv : msgs_) { - MsgI msg(kv.second); - total_count += msg.Count(); - LOG_TRACE() << " " << i++ << ": msg id: " << kv.first << ", offset: " << kv.second << ", count: " << msg.Count() << ", size: " << msg.Size(); - } - LOG_DEBUG() << "total count: " << total_count; - } - -private: - std::unordered_map<MsgId, Offset> msgs_; - int64_t time_to_clean_ = 0; -}; - -//TODO check proc_id -class NodeCenter -{ -public: - typedef MQId Address; - typedef bhome_msg::ProcInfo ProcInfo; - typedef std::function<void(Address const)> Cleaner; - -private: - enum { - kStateInvalid, - kStateNormal, - kStateOffline, - kStateKillme, - }; - - struct ProcState { - int64_t timestamp_ = 0; - uint32_t flag_ = 0; // reserved - void PutOffline(const int64_t offline_time) - { - timestamp_ = NowSec() - offline_time; - flag_ = kStateOffline; - } - void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time) - { - auto diff = now - timestamp_; - LOG_DEBUG() << "state " << this << " diff: " << diff; - if (diff < offline_time) { - flag_ = kStateNormal; - } else if (diff < kill_time) { - flag_ = kStateOffline; - } else { - flag_ = kStateKillme; - } - } - }; - typedef std::unordered_map<Address, std::set<Topic>> AddressTopics; - - struct NodeInfo { - ProcState state_; // state - std::map<MQId, int64_t> addrs_; // registered mqs - ProcInfo proc_; // - AddressTopics services_; // address: topics - AddressTopics subscriptions_; // address: topics - }; - typedef std::shared_ptr<NodeInfo> Node; - typedef std::weak_ptr<NodeInfo> WeakNode; - - struct TopicDest { - MQId mq_id_; - int64_t mq_abs_addr_; - WeakNode weak_node_; - bool operator<(const TopicDest &a) const { return mq_id_ < a.mq_id_; } - }; - inline MQId SrcAddr(const BHMsgHead &head) { return head.route(0).mq_id(); } - inline int64_t SrcAbsAddr(const BHMsgHead &head) { return head.route(0).abs_addr(); } - inline bool MatchAddr(std::map<Address, int64_t> const &addrs, const Address &addr) { return addrs.find(addr) != addrs.end(); } - - NodeCenter(const std::string &id, const Cleaner &cleaner, const int64_t offline_time, const int64_t kill_time) : - id_(id), cleaner_(cleaner), offline_time_(offline_time), kill_time_(kill_time), last_check_time_(0) {} - -public: - typedef std::set<TopicDest> Clients; - - NodeCenter(const std::string &id, const Cleaner &cleaner, const steady_clock::duration offline_time, const steady_clock::duration kill_time) : - NodeCenter(id, cleaner, duration_cast<seconds>(offline_time).count(), duration_cast<seconds>(kill_time).count()) {} - - // center name, no relative to shm. - const std::string &id() const { return id_; } - int64_t OnNodeInit(ShmSocket &socket, const int64_t val) - { - LOG_FUNCTION; - SharedMemory &shm = socket.shm(); - MQId ssn = (val >> 4) & MaskBits(56); - int reply = EncodeCmd(eCmdNodeInitReply); - - if (nodes_.find(ssn) != nodes_.end()) { - return reply; // ignore if exists. - } - - auto UpdateRegInfo = [&](Node &node) { - node->state_.timestamp_ = NowSec() - offline_time_; - node->state_.UpdateState(NowSec(), offline_time_, kill_time_); - - // create sockets. - try { - ShmSocket tmp(shm, true, ssn, 16); - node->addrs_.emplace(ssn, tmp.AbsAddr()); - return true; - } catch (...) { - return false; - } - }; - - auto PrepareProcInit = [&](Node &node) { - bool r = false; - ShmMsg init_msg; - DEFER1(init_msg.Release()); - MsgProcInit body; - auto head = InitMsgHead(GetType(body), id(), ssn); - return init_msg.Make(GetAllocSize(CalcAllocIndex(900))) && - init_msg.Fill(ShmMsg::Serialize(head, body)) && - SendAllocMsg(socket, {ssn, node->addrs_[ssn]}, init_msg); - }; - - Node node(new NodeInfo); - if (UpdateRegInfo(node) && PrepareProcInit(node)) { - reply |= (node->addrs_[ssn] << 4); - nodes_[ssn] = node; - LOG_INFO() << "new node ssn (" << ssn << ") init"; - } else { - ShmSocket::Remove(shm, ssn); - } - return reply; - } - void RecordMsg(const MsgI &msg) - { - msg.reset_managed(true); - msgs_.RecordMsg(msg); - } - - bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg) - { - RecordMsg(msg); - auto onExpireFree = [this, msg](const SendQ::Data &) { msgs_.FreeMsg(msg.id()); }; - return socket.Send(dest, reply, onExpireFree); - } - bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg) - { - RecordMsg(msg); - return socket.Send(dest, msg); - } - - void OnAlloc(ShmSocket &socket, const int64_t val) - { - // LOG_FUNCTION; - // 8bit size, 4bit socket index, 16bit proc index, 28bit id, ,4bit cmd+flag - int64_t msg_id = (val >> 4) & MaskBits(28); - int proc_index = (val >> 32) & MaskBits(16); - int socket_index = ((val) >> 48) & MaskBits(4); - auto proc_rec(procs_.Get(proc_index)); - if (proc_rec.proc_.empty()) { - return; - } - - MQInfo dest = {proc_rec.ssn_ + socket_index, 0}; - auto FindMq = [&]() { - auto pos = nodes_.find(proc_rec.ssn_); - if (pos != nodes_.end()) { - for (auto &&mq : pos->second->addrs_) { - if (mq.first == dest.id_) { - dest.offset_ = mq.second; - return true; - } - } - } - return false; - }; - if (!FindMq()) { return; } - - auto size = GetAllocSize((val >> 52) & MaskBits(8)); - MsgI new_msg; - if (new_msg.Make(size)) { - // 31bit proc index, 28bit id, ,4bit cmd+flag - int64_t reply = (new_msg.Offset() << 32) | (msg_id << 4) | EncodeCmd(eCmdAllocReply0); - SendAllocReply(socket, dest, reply, new_msg); - } else { - int64_t reply = (msg_id << 4) | EncodeCmd(eCmdAllocReply0); // send empty, ack failure. - socket.Send(dest, reply); - } - } - - void OnFree(ShmSocket &socket, const int64_t val) - { - int64_t msg_id = (val >> 4) & MaskBits(31); - msgs_.FreeMsg(msg_id); - } - - bool OnCommand(ShmSocket &socket, const int64_t val) - { - assert(IsCmd(val)); - int cmd = DecodeCmd(val); - switch (cmd) { - case eCmdAllocRequest0: OnAlloc(socket, val); break; - case eCmdFree: OnFree(socket, val); break; - default: return false; - } - return true; - } - - MsgProcInitReply ProcInit(const BHMsgHead &head, MsgProcInit &msg) - { - LOG_DEBUG() << "center got proc init."; - auto pos = nodes_.find(head.ssn_id()); - if (pos == nodes_.end()) { - return MakeReply<MsgProcInitReply>(eNotFound, "Node Not Initialised"); - } - auto index = procs_.Put(head.proc_id(), head.ssn_id()); - auto reply(MakeReply<MsgProcInitReply>(eSuccess)); - reply.set_proc_index(index); - - auto &node = pos->second; - try { - for (int i = 0; i < msg.extra_mq_num(); ++i) { - ShmSocket tmp(BHomeShm(), true, head.ssn_id() + i + 1, 16); - node->addrs_.emplace(tmp.id(), tmp.AbsAddr()); - auto addr = reply.add_extra_mqs(); - addr->set_mq_id(tmp.id()); - addr->set_abs_addr(tmp.AbsAddr()); - } - return reply; - } catch (...) { - LOG_ERROR() << "proc init create mq error"; - return MakeReply<MsgProcInitReply>(eError, "Create mq failed."); - } - } - - MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg) - { - if (msg.proc().proc_id() != head.proc_id()) { - return MakeReply(eInvalidInput, "invalid proc id."); - } - - try { - MQId ssn = head.ssn_id(); - // when node restart, ssn will change, - // and old node will be removed after timeout. - auto UpdateRegInfo = [&](Node &node) { - node->proc_.Swap(msg.mutable_proc()); - node->state_.timestamp_ = head.timestamp(); - node->state_.UpdateState(NowSec(), offline_time_, kill_time_); - }; - - auto pos = nodes_.find(ssn); - if (pos == nodes_.end()) { - return MakeReply(eInvalidInput, "invalid session."); - } - - // update proc info - Node &node = pos->second; - UpdateRegInfo(node); - LOG_DEBUG() << "node (" << head.proc_id() << ") ssn (" << ssn << ")"; - - auto old = online_node_addr_map_.find(head.proc_id()); - if (old != online_node_addr_map_.end()) { // old session - auto &old_ssn = old->second; - if (old_ssn != ssn) { - nodes_[old_ssn]->state_.PutOffline(offline_time_); - LOG_DEBUG() << "put node (" << nodes_[old_ssn]->proc_.proc_id() << ") ssn (" << old->second << ") offline"; - old_ssn = ssn; - } - } else { - online_node_addr_map_.emplace(head.proc_id(), ssn); - } - return MakeReply(eSuccess); - } catch (...) { - return MakeReply(eError, "register node error."); - } - } - - template <class Reply, class Func> - Reply HandleMsg(const BHMsgHead &head, Func const &op) - { - try { - auto pos = nodes_.find(head.ssn_id()); - if (pos == nodes_.end()) { - return MakeReply<Reply>(eNotRegistered, "Node is not registered."); - } else { - auto &node = pos->second; - if (!MatchAddr(node->addrs_, SrcAddr(head))) { - return MakeReply<Reply>(eAddressNotMatch, "Node address error."); - } else if (head.type() == kMsgTypeHeartbeat && CanHeartbeat(*node)) { - return op(node); - } else if (!Valid(*node)) { - return MakeReply<Reply>(eNoRespond, "Node is not alive."); - } else { - return op(node); - } - } - } catch (...) { - //TODO error log - return MakeReply<Reply>(eError, "internal error."); - } - } - template <class Func> - inline MsgCommonReply HandleMsg(const BHMsgHead &head, Func const &op) - { - return HandleMsg<MsgCommonReply, Func>(head, op); - } - - MsgCommonReply Unregister(const BHMsgHead &head, MsgUnregister &msg) - { - return HandleMsg( - head, [&](Node node) -> MsgCommonReply { - NodeInfo &ni = *node; - ni.state_.PutOffline(offline_time_); - return MakeReply(eSuccess); - }); - } - - MsgCommonReply RegisterRPC(const BHMsgHead &head, MsgRegisterRPC &msg) - { - return HandleMsg( - head, [&](Node node) -> MsgCommonReply { - auto src = SrcAddr(head); - auto &topics = msg.topics().topic_list(); - node->services_[src].insert(topics.begin(), topics.end()); - TopicDest dest = {src, SrcAbsAddr(head), node}; - for (auto &topic : topics) { - service_map_[topic].insert(dest); - } - LOG_DEBUG() << "node " << node->proc_.proc_id() << " ssn " << node->addrs_.begin()->first << " serve " << topics.size() << " topics:\n"; - for (auto &topic : topics) { - LOG_DEBUG() << "\t" << topic; - } - return MakeReply(eSuccess); - }); - } - - MsgCommonReply Heartbeat(const BHMsgHead &head, const MsgHeartbeat &msg) - { - return HandleMsg(head, [&](Node node) { - NodeInfo &ni = *node; - ni.state_.timestamp_ = head.timestamp(); - ni.state_.UpdateState(NowSec(), offline_time_, kill_time_); - - auto &info = msg.proc(); - if (!info.public_info().empty()) { - ni.proc_.set_public_info(info.public_info()); - } - if (!info.private_info().empty()) { - ni.proc_.set_private_info(info.private_info()); - } - return MakeReply(eSuccess); - }); - } - MsgQueryProcReply QueryProc(const BHMsgHead &head, const MsgQueryProc &req) - { - typedef MsgQueryProcReply Reply; - auto query = [&](Node self) -> Reply { - auto Add1 = [](Reply &reply, Node node) { - auto info = reply.add_proc_list(); - *info->mutable_proc() = node->proc_; - info->set_online(node->state_.flag_ == kStateNormal); - for (auto &addr_topics : node->services_) { - for (auto &topic : addr_topics.second) { - info->mutable_topics()->add_topic_list(topic); - } - } - }; - - if (!req.proc_id().empty()) { - auto pos = online_node_addr_map_.find(req.proc_id()); - if (pos == online_node_addr_map_.end()) { - return MakeReply<Reply>(eNotFound, "proc not found."); - } else { - auto node_pos = nodes_.find(pos->second); - if (node_pos == nodes_.end()) { - return MakeReply<Reply>(eNotFound, "proc node not found."); - } else { - auto reply = MakeReply<Reply>(eSuccess); - Add1(reply, node_pos->second); - return reply; - } - } - } else { - Reply reply(MakeReply<Reply>(eSuccess)); - for (auto &kv : nodes_) { - Add1(reply, kv.second); - } - return reply; - } - }; - - return HandleMsg<Reply>(head, query); - } - - MsgQueryTopicReply QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req) - { - typedef MsgQueryTopicReply Reply; - - auto query = [&](Node self) -> Reply { - auto pos = service_map_.find(req.topic()); - if (pos != service_map_.end() && !pos->second.empty()) { - auto &clients = pos->second; - Reply reply = MakeReply<Reply>(eSuccess); - for (auto &dest : clients) { - Node dest_node(dest.weak_node_.lock()); - if (dest_node && Valid(*dest_node)) { - auto node_addr = reply.add_node_address(); - node_addr->set_proc_id(dest_node->proc_.proc_id()); - node_addr->mutable_addr()->set_mq_id(dest.mq_id_); - node_addr->mutable_addr()->set_abs_addr(dest.mq_abs_addr_); - } - } - return reply; - } else { - return MakeReply<Reply>(eNotFound, "topic server not found."); - } - }; - - return HandleMsg<Reply>(head, query); - } - - MsgCommonReply Subscribe(const BHMsgHead &head, const MsgSubscribe &msg) - { - return HandleMsg(head, [&](Node node) { - auto src = SrcAddr(head); - auto &topics = msg.topics().topic_list(); - node->subscriptions_[src].insert(topics.begin(), topics.end()); - TopicDest dest = {src, SrcAbsAddr(head), node}; - for (auto &topic : topics) { - subscribe_map_[topic].insert(dest); - } - return MakeReply(eSuccess); - }); - } - MsgCommonReply Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg) - { - return HandleMsg(head, [&](Node node) { - auto src = SrcAddr(head); - auto pos = node->subscriptions_.find(src); - - auto RemoveSubTopicDestRecord = [this](const Topic &topic, const TopicDest &dest) { - auto pos = subscribe_map_.find(topic); - if (pos != subscribe_map_.end() && - pos->second.erase(dest) != 0 && - pos->second.empty()) { - subscribe_map_.erase(pos); - } - }; - - if (pos != node->subscriptions_.end()) { - const TopicDest &dest = {src, SrcAbsAddr(head), node}; - auto &topics = msg.topics().topic_list(); - // clear node sub records; - for (auto &topic : topics) { - pos->second.erase(topic); - RemoveSubTopicDestRecord(topic, dest); - } - if (pos->second.empty()) { - node->subscriptions_.erase(pos); - } - } - return MakeReply(eSuccess); - }); - } - - Clients DoFindClients(const std::string &topic) - { - Clients dests; - auto Find1 = [&](const std::string &t) { - auto pos = subscribe_map_.find(topic); - if (pos != subscribe_map_.end()) { - auto &clients = pos->second; - for (auto &cli : clients) { - if (Valid(cli.weak_node_)) { - dests.insert(cli); - } - } - } - }; - Find1(topic); - - size_t pos = 0; - while (true) { - pos = topic.find(kTopicSep, pos); - if (pos == topic.npos || ++pos == topic.size()) { - // Find1(std::string()); // sub all. - break; - } else { - Find1(topic.substr(0, pos)); - } - } - return dests; - } - bool FindClients(const BHMsgHead &head, const MsgPublish &msg, Clients &out, MsgCommonReply &reply) - { - bool ret = false; - HandleMsg(head, [&](Node node) { - DoFindClients(msg.topic()).swap(out); - ret = true; - return MakeReply(eSuccess); - }).Swap(&reply); - return ret; - } - - void OnTimer() - { - CheckNodes(); - msgs_.AutoRemove(); - } - -private: - void CheckNodes() - { - auto now = NowSec(); - if (now <= last_check_time_) { return; } - last_check_time_ = now; - - auto it = nodes_.begin(); - while (it != nodes_.end()) { - auto &cli = *it->second; - cli.state_.UpdateState(now, offline_time_, kill_time_); - if (cli.state_.flag_ == kStateKillme) { - RemoveNode(it->second); - it = nodes_.erase(it); - } else { - ++it; - } - } - msgs_.DebugPrint(); - } - bool CanHeartbeat(const NodeInfo &node) - { - return Valid(node) || node.state_.flag_ == kStateOffline; - } - bool Valid(const NodeInfo &node) - { - return node.state_.flag_ == kStateNormal; - } - bool Valid(const WeakNode &weak) - { - auto node = weak.lock(); - return node && Valid(*node); - } - void RemoveNode(Node &node) - { - auto EraseMapRec = [&node](auto &rec_map, auto &node_rec) { - for (auto &addr_topics : node_rec) { - TopicDest dest{addr_topics.first, 0, node}; // abs_addr is not used. - for (auto &topic : addr_topics.second) { - auto pos = rec_map.find(topic); - if (pos != rec_map.end()) { - pos->second.erase(dest); - if (pos->second.empty()) { - rec_map.erase(pos); - } - } - } - } - }; - EraseMapRec(service_map_, node->services_); - EraseMapRec(subscribe_map_, node->subscriptions_); - - // remove online record. - auto pos = online_node_addr_map_.find(node->proc_.proc_id()); - if (pos != online_node_addr_map_.end()) { - if (node->addrs_.find(pos->second) != node->addrs_.end()) { - online_node_addr_map_.erase(pos); - } - } - - for (auto &addr : node->addrs_) { - cleaner_(addr.first); - } - - node->addrs_.clear(); - } - std::string id_; // center proc id; - - std::unordered_map<Topic, Clients> service_map_; - std::unordered_map<Topic, Clients> subscribe_map_; - std::unordered_map<Address, Node> nodes_; - std::unordered_map<ProcId, Address> online_node_addr_map_; - ProcRecords procs_; // To get a short index for msg alloc. - MsgRecords msgs_; // record all msgs alloced. - - Cleaner cleaner_; // remove mqs. - int64_t offline_time_; - int64_t kill_time_; - int64_t last_check_time_; -}; - template <class Body, class OnMsg, class Replyer> inline void Dispatch(MsgI &msg, BHMsgHead &head, OnMsg const &onmsg, Replyer const &replyer) { @@ -725,18 +41,6 @@ if (msg.ParseBody(body)) { replyer(onmsg(body)); } -} - -Handler Combine(const Handler &h1, const Handler &h2) -{ - return [h1, h2](ShmSocket &socket, bhome_msg::MsgI &msg, bhome_msg::BHMsgHead &head) { - return h1(socket, msg, head) || h2(socket, msg, head); - }; -} -template <class... H> -Handler Combine(const Handler &h0, const Handler &h1, const Handler &h2, const H &...rest) -{ - return Combine(Combine(h0, h1), h2, rest...); } #define CASE_ON_MSG_TYPE(MsgTag) \ @@ -750,7 +54,7 @@ return [&](auto &&rep_body) { auto reply_head(InitMsgHead(GetType(rep_body), center->id(), head.ssn_id(), head.msg_id())); MQInfo remote = {head.route(0).mq_id(), head.route(0).abs_addr()}; - MsgI msg; + MsgI msg(socket.shm()); if (msg.Make(reply_head, rep_body)) { DEFER1(msg.Release();); center->SendAllocMsg(socket, remote, msg); @@ -758,7 +62,7 @@ }; } -bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr) +bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr, SharedMemory &shm, TcpProxy &tcp_proxy) { // command auto OnCommand = [center_ptr](ShmSocket &socket, ShmMsgQueue::RawData &cmd) -> bool { @@ -772,13 +76,49 @@ auto onInit = [&](const int64_t request) { return center->OnNodeInit(socket, request); }; - BHCenterHandleInit(onInit); + BHCenterHandleInit(socket.shm(), onInit); center->OnTimer(); }; - auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { + auto OnCenter = [=, &tcp_proxy](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { auto ¢er = *center_ptr; auto replyer = MakeReplyer(socket, head, center); + + if (!head.dest().ip().empty()) { // other host, proxy + auto valid = [&]() { return head.route_size() == 1; }; + if (!valid()) { return false; } + + if (head.type() == kMsgTypeRequestTopic) { + typedef MsgRequestTopicReply Reply; + Reply reply; + if (!center->CheckMsg(head, reply)) { + replyer(reply); + } else { + auto onResult = [¢er](BHMsgHead &head, std::string body_content) { + if (head.route_size() > 0) { + auto &back = head.route(head.route_size() - 1); + MQInfo dest = {back.mq_id(), back.abs_addr()}; + head.mutable_route()->RemoveLast(); + center->PassRemoteReplyToLocal(dest, head, std::move(body_content)); + } + }; + uint16_t port = head.dest().port(); + if (port == 0) { + port = kBHCenterPort; + } + if (!tcp_proxy.Request(head.dest().ip(), port, msg.content(), onResult)) { + replyer(MakeReply<Reply>(eError, "send request failed.")); + } else { + // success + } + } + return true; + } else { + // ignore other msgs for now. + } + return false; + } + switch (head.type()) { CASE_ON_MSG_TYPE(ProcInit); CASE_ON_MSG_TYPE(Register); @@ -791,38 +131,23 @@ default: return false; } }; - BHCenter::Install("#center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(), 1000); + BHCenter::Install("@center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(shm), 1000); auto OnBusIdle = [=](ShmSocket &socket) {}; auto OnBusCmd = [=](ShmSocket &socket, ShmMsgQueue::RawData &val) { return false; }; - auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { + auto OnPubSub = [=, &tcp_proxy](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { auto ¢er = *center_ptr; auto replyer = MakeReplyer(socket, head, center); auto OnPublish = [&]() { MsgPublish pub; - NodeCenter::Clients clients; - MsgCommonReply reply; - if (head.route_size() != 1 || !msg.ParseBody(pub)) { - return; - } else if (!center->FindClients(head, pub, clients, reply)) { + if (head.route_size() == 1 && msg.ParseBody(pub)) { + // replyer(center->Publish(head, pub.topic(), msg)); // dead lock? + auto reply(center->Publish(head, pub.topic(), msg)); replyer(reply); - } else { - replyer(MakeReply(eSuccess)); - if (clients.empty()) { return; } - - auto it = clients.begin(); - do { - auto &cli = *it; - auto node = cli.weak_node_.lock(); - if (node) { - // should also make sure that mq is not killed before msg expires. - // it would be ok if (kill_time - offline_time) is longer than expire time. - socket.Send({cli.mq_id_, cli.mq_abs_addr_}, msg); - ++it; - } else { - it = clients.erase(it); - } - } while (it != clients.end()); + auto hosts = center->FindRemoteSubClients(pub.topic()); + for (auto &host : hosts) { + tcp_proxy.Publish(host, kBHCenterPort, msg.content()); + } } }; switch (head.type()) { @@ -833,7 +158,7 @@ } }; - BHCenter::Install("#center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(), 1000); + BHCenter::Install("@center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(shm), 1000); return true; } @@ -856,22 +181,23 @@ BHCenter::BHCenter(Socket::Shm &shm) { - auto gc = [&](const MQId id) { - auto r = ShmSocket::Remove(shm, id); - if (r) { - LOG_DEBUG() << "remove mq " << id << " ok\n"; - } - }; + auto nsec = NodeTimeoutSec(); + auto center_ptr = std::make_shared<Synced<NodeCenter>>("@bhome_center", nsec, nsec * 3); // *3 to allow other clients to finish sending msgs. + io_service_.reset(new IoService); + tcp_proxy_.reset(new TcpProxy(io_service_->io())); - auto nsec = seconds(NodeTimeoutSec()); - auto center_ptr = std::make_shared<Synced<NodeCenter>>("#bhome_center", gc, nsec, nsec * 3); // *3 to allow other clients to finish sending msgs. - AddCenter(center_ptr); + AddCenter(center_ptr, shm, *tcp_proxy_); for (auto &kv : Centers()) { auto &info = kv.second; sockets_[info.name_] = std::make_shared<ShmSocket>(info.mq_.offset_, shm, info.mq_.id_); } + + topic_node_.reset(new CenterTopicNode(center_ptr, shm)); + tcp_server_.reset(new TcpServer(io_service_->io(), kBHCenterPort, center_ptr)); } + +BHCenter::~BHCenter() { Stop(); } bool BHCenter::Start() { @@ -879,12 +205,16 @@ auto &info = kv.second; sockets_[info.name_]->Start(1, info.handler_, info.raw_handler_, info.idle_); } - + topic_node_->Start(); return true; } bool BHCenter::Stop() { + tcp_proxy_.reset(); + tcp_server_.reset(); + io_service_.reset(); + topic_node_->Stop(); for (auto &kv : sockets_) { kv.second->Stop(); } -- Gitblit v1.8.0