From ae17d1439b35b55212c3a30712e0a60b1d6a99c0 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期三, 30 六月 2021 11:15:53 +0800 Subject: [PATCH] support tcp pub/sub. --- box/center.cpp | 572 +++++++++----------------------------------------------- 1 files changed, 93 insertions(+), 479 deletions(-) diff --git a/box/center.cpp b/box/center.cpp index aa6f285..e0abbb3 100644 --- a/box/center.cpp +++ b/box/center.cpp @@ -16,11 +16,12 @@ * ===================================================================================== */ #include "center.h" -#include "bh_util.h" -#include "defs.h" -#include "shm.h" +#include "center_topic_node.h" +#include "io_service.h" +#include "node_center.h" +#include "tcp_proxy.h" +#include "tcp_server.h" #include <chrono> -#include <set> using namespace std::chrono; using namespace std::chrono_literals; @@ -32,409 +33,6 @@ namespace { -//TODO check proc_id -class NodeCenter -{ -public: - typedef std::string ProcId; - typedef MQId Address; - typedef bhome_msg::ProcInfo ProcInfo; - typedef std::function<void(Address const)> Cleaner; - -private: - enum { - kStateInvalid, - kStateNormal, - kStateOffline, - kStateKillme, - }; - - struct ProcState { - int64_t timestamp_ = 0; - uint32_t flag_ = 0; // reserved - void PutOffline(const int64_t offline_time) - { - timestamp_ = NowSec() - offline_time; - flag_ = kStateOffline; - } - void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time) - { - auto diff = now - timestamp_; -#ifndef NDEBUG - printf("state %p diff: %ld\n", this, diff); -#endif - if (diff < offline_time) { - flag_ = kStateNormal; - } else if (diff < kill_time) { - flag_ = kStateOffline; - } else { - flag_ = kStateKillme; - } - } - }; - typedef std::unordered_map<Address, std::set<Topic>> AddressTopics; - - struct NodeInfo { - ProcState state_; // state - std::set<Address> addrs_; // registered mqs - ProcInfo proc_; // - AddressTopics services_; // address: topics - AddressTopics subscriptions_; // address: topics - }; - typedef std::shared_ptr<NodeInfo> Node; - typedef std::weak_ptr<NodeInfo> WeakNode; - - struct TopicDest { - Address mq_; - WeakNode weak_node_; - bool operator<(const TopicDest &a) const { return mq_ < a.mq_; } - }; - inline MQId SrcAddr(const BHMsgHead &head) { return head.route(0).mq_id(); } - inline bool MatchAddr(std::set<Address> const &addrs, const Address &addr) { return addrs.find(addr) != addrs.end(); } - - NodeCenter(const std::string &id, const Cleaner &cleaner, const int64_t offline_time, const int64_t kill_time) : - id_(id), cleaner_(cleaner), offline_time_(offline_time), kill_time_(kill_time), last_check_time_(0) {} - -public: - typedef std::set<TopicDest> Clients; - - NodeCenter(const std::string &id, const Cleaner &cleaner, const steady_clock::duration offline_time, const steady_clock::duration kill_time) : - NodeCenter(id, cleaner, duration_cast<seconds>(offline_time).count(), duration_cast<seconds>(kill_time).count()) {} - - // center name, no relative to shm. - const std::string &id() const { return id_; } - void OnNodeInit(const int64_t msg) - { - MQId ssn = msg; - auto UpdateRegInfo = [&](Node &node) { - for (int i = 0; i < 10; ++i) { - node->addrs_.insert(ssn + i); - } - node->state_.timestamp_ = NowSec() - offline_time_; - node->state_.UpdateState(NowSec(), offline_time_, kill_time_); - }; - - Node node(new NodeInfo); - UpdateRegInfo(node); - nodes_[ssn] = node; - printf("new node ssn (%ld) init\n", ssn); - } - MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg) - { - if (msg.proc().proc_id() != head.proc_id()) { - return MakeReply(eInvalidInput, "invalid proc id."); - } - - try { - MQId ssn = head.ssn_id(); - // when node restart, ssn will change, - // and old node will be removed after timeout. - auto UpdateRegInfo = [&](Node &node) { - node->addrs_.insert(SrcAddr(head)); - for (auto &addr : msg.addrs()) { - node->addrs_.insert(addr.mq_id()); - } - node->proc_.Swap(msg.mutable_proc()); - node->state_.timestamp_ = head.timestamp(); - node->state_.UpdateState(NowSec(), offline_time_, kill_time_); - }; - - auto pos = nodes_.find(ssn); - if (pos != nodes_.end()) { // update - Node &node = pos->second; - UpdateRegInfo(node); - } else { - Node node(new NodeInfo); - UpdateRegInfo(node); - nodes_[ssn] = node; - } - printf("node (%s) ssn (%ld)\n", head.proc_id().c_str(), ssn); - - auto old = online_node_addr_map_.find(head.proc_id()); - if (old != online_node_addr_map_.end()) { // old session - auto &old_ssn = old->second; - if (old_ssn != ssn) { - nodes_[old_ssn]->state_.PutOffline(offline_time_); - printf("put node (%s) ssn (%ld) offline\n", nodes_[old_ssn]->proc_.proc_id().c_str(), old->second); - old_ssn = ssn; - } - } else { - online_node_addr_map_.emplace(head.proc_id(), ssn); - } - return MakeReply(eSuccess); - } catch (...) { - return MakeReply(eError, "register node error."); - } - } - - template <class Reply, class Func> - Reply HandleMsg(const BHMsgHead &head, Func const &op) - { - try { - auto pos = nodes_.find(head.ssn_id()); - if (pos == nodes_.end()) { - return MakeReply<Reply>(eNotRegistered, "Node is not registered."); - } else { - auto &node = pos->second; - if (!MatchAddr(node->addrs_, SrcAddr(head))) { - return MakeReply<Reply>(eAddressNotMatch, "Node address error."); - } else if (head.type() == kMsgTypeHeartbeat && CanHeartbeat(*node)) { - return op(node); - } else if (!Valid(*node)) { - return MakeReply<Reply>(eNoRespond, "Node is not alive."); - } else { - return op(node); - } - } - } catch (...) { - //TODO error log - return MakeReply<Reply>(eError, "internal error."); - } - } - template <class Func> - inline MsgCommonReply HandleMsg(const BHMsgHead &head, Func const &op) - { - return HandleMsg<MsgCommonReply, Func>(head, op); - } - - MsgCommonReply Unregister(const BHMsgHead &head, MsgUnregister &msg) - { - return HandleMsg( - head, [&](Node node) -> MsgCommonReply { - NodeInfo &ni = *node; - ni.state_.PutOffline(offline_time_); - return MakeReply(eSuccess); - }); - } - - MsgCommonReply RegisterRPC(const BHMsgHead &head, MsgRegisterRPC &msg) - { - return HandleMsg( - head, [&](Node node) -> MsgCommonReply { - auto src = SrcAddr(head); - auto &topics = msg.topics().topic_list(); - node->services_[src].insert(topics.begin(), topics.end()); - TopicDest dest = {src, node}; - for (auto &topic : topics) { - service_map_[topic].insert(dest); - } - printf("node %s ssn %ld serve %d topics:\n", node->proc_.proc_id().c_str(), *node->addrs_.begin(), topics.size()); - for (auto &topic : topics) { - printf("\t %s\n", topic.c_str()); - } - return MakeReply(eSuccess); - }); - } - - MsgCommonReply Heartbeat(const BHMsgHead &head, const MsgHeartbeat &msg) - { - return HandleMsg(head, [&](Node node) { - NodeInfo &ni = *node; - ni.state_.timestamp_ = head.timestamp(); - ni.state_.UpdateState(NowSec(), offline_time_, kill_time_); - - auto &info = msg.proc(); - if (!info.public_info().empty()) { - ni.proc_.set_public_info(info.public_info()); - } - if (!info.private_info().empty()) { - ni.proc_.set_private_info(info.private_info()); - } - return MakeReply(eSuccess); - }); - } - - MsgQueryTopicReply QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req) - { - typedef MsgQueryTopicReply Reply; - - auto query = [&](Node self) -> MsgQueryTopicReply { - auto pos = service_map_.find(req.topic()); - if (pos != service_map_.end() && !pos->second.empty()) { - auto &clients = pos->second; - Reply reply = MakeReply<Reply>(eSuccess); - for (auto &dest : clients) { - Node dest_node(dest.weak_node_.lock()); - if (dest_node && Valid(*dest_node)) { - auto node_addr = reply.add_node_address(); - node_addr->set_proc_id(dest_node->proc_.proc_id()); - node_addr->mutable_addr()->set_mq_id(dest.mq_); - } - } - return reply; - } else { - return MakeReply<Reply>(eNotFound, "topic server not found."); - } - }; - - return HandleMsg<Reply>(head, query); - } - - MsgCommonReply Subscribe(const BHMsgHead &head, const MsgSubscribe &msg) - { - return HandleMsg(head, [&](Node node) { - auto src = SrcAddr(head); - auto &topics = msg.topics().topic_list(); - node->subscriptions_[src].insert(topics.begin(), topics.end()); - TopicDest dest = {src, node}; - for (auto &topic : topics) { - subscribe_map_[topic].insert(dest); - } - return MakeReply(eSuccess); - }); - } - MsgCommonReply Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg) - { - return HandleMsg(head, [&](Node node) { - auto src = SrcAddr(head); - auto pos = node->subscriptions_.find(src); - - auto RemoveSubTopicDestRecord = [this](const Topic &topic, const TopicDest &dest) { - auto pos = subscribe_map_.find(topic); - if (pos != subscribe_map_.end() && - pos->second.erase(dest) != 0 && - pos->second.empty()) { - subscribe_map_.erase(pos); - } - }; - - if (pos != node->subscriptions_.end()) { - const TopicDest &dest = {src, node}; - auto &topics = msg.topics().topic_list(); - // clear node sub records; - for (auto &topic : topics) { - pos->second.erase(topic); - RemoveSubTopicDestRecord(topic, dest); - } - if (pos->second.empty()) { - node->subscriptions_.erase(pos); - } - } - return MakeReply(eSuccess); - }); - } - - Clients DoFindClients(const std::string &topic) - { - Clients dests; - auto Find1 = [&](const std::string &t) { - auto pos = subscribe_map_.find(topic); - if (pos != subscribe_map_.end()) { - auto &clients = pos->second; - for (auto &cli : clients) { - if (Valid(cli.weak_node_)) { - dests.insert(cli); - } - } - } - }; - Find1(topic); - - size_t pos = 0; - while (true) { - pos = topic.find(kTopicSep, pos); - if (pos == topic.npos || ++pos == topic.size()) { - // Find1(std::string()); // sub all. - break; - } else { - Find1(topic.substr(0, pos)); - } - } - return dests; - } - bool FindClients(const BHMsgHead &head, const MsgPublish &msg, Clients &out, MsgCommonReply &reply) - { - bool ret = false; - HandleMsg(head, [&](Node node) { - DoFindClients(msg.topic()).swap(out); - ret = true; - return MakeReply(eSuccess); - }).Swap(&reply); - return ret; - } - - void OnTimer() - { - CheckNodes(); - } - -private: - void CheckNodes() - { - auto now = NowSec(); - if (now - last_check_time_ < 1) { return; } - last_check_time_ = now; - - auto it = nodes_.begin(); - while (it != nodes_.end()) { - auto &cli = *it->second; - cli.state_.UpdateState(now, offline_time_, kill_time_); - if (cli.state_.flag_ == kStateKillme) { - RemoveNode(it->second); - it = nodes_.erase(it); - } else { - ++it; - } - } - } - bool CanHeartbeat(const NodeInfo &node) - { - return Valid(node) || node.state_.flag_ == kStateOffline; - } - bool Valid(const NodeInfo &node) - { - return node.state_.flag_ == kStateNormal; - } - bool Valid(const WeakNode &weak) - { - auto node = weak.lock(); - return node && Valid(*node); - } - void RemoveNode(Node &node) - { - auto EraseMapRec = [&node](auto &rec_map, auto &node_rec) { - for (auto &addr_topics : node_rec) { - TopicDest dest{addr_topics.first, node}; - for (auto &topic : addr_topics.second) { - auto pos = rec_map.find(topic); - if (pos != rec_map.end()) { - pos->second.erase(dest); - if (pos->second.empty()) { - rec_map.erase(pos); - } - } - } - } - }; - EraseMapRec(service_map_, node->services_); - EraseMapRec(subscribe_map_, node->subscriptions_); - - // remove online record. - auto pos = online_node_addr_map_.find(node->proc_.proc_id()); - if (pos != online_node_addr_map_.end()) { - if (node->addrs_.find(pos->second) != node->addrs_.end()) { - online_node_addr_map_.erase(pos); - } - } - - for (auto &addr : node->addrs_) { - cleaner_(addr); - } - - node->addrs_.clear(); - } - std::string id_; // center proc id; - - std::unordered_map<Topic, Clients> service_map_; - std::unordered_map<Topic, Clients> subscribe_map_; - std::unordered_map<Address, Node> nodes_; - std::unordered_map<std::string, Address> online_node_addr_map_; - Cleaner cleaner_; // remove mqs. - int64_t offline_time_; - int64_t kill_time_; - int64_t last_check_time_; -}; - template <class Body, class OnMsg, class Replyer> inline void Dispatch(MsgI &msg, BHMsgHead &head, OnMsg const &onmsg, Replyer const &replyer) { @@ -445,92 +43,111 @@ } } -Handler Combine(const Handler &h1, const Handler &h2) -{ - return [h1, h2](ShmSocket &socket, bhome_msg::MsgI &msg, bhome_msg::BHMsgHead &head) { - return h1(socket, msg, head) || h2(socket, msg, head); - }; -} -template <class... H> -Handler Combine(const Handler &h0, const Handler &h1, const Handler &h2, const H &...rest) -{ - return Combine(Combine(h0, h1), h2, rest...); -} - #define CASE_ON_MSG_TYPE(MsgTag) \ case kMsgType##MsgTag: \ Dispatch<Msg##MsgTag>( \ msg, head, [&](auto &body) { return center->MsgTag(head, body); }, replyer); \ return true; -auto MakeReplyer(ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) +auto MakeReplyer(ShmSocket &socket, BHMsgHead &head, Synced<NodeCenter> ¢er) { return [&](auto &&rep_body) { - auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.ssn_id(), head.msg_id())); - auto remote = head.route(0).mq_id(); - socket.Send(remote, reply_head, rep_body); + auto reply_head(InitMsgHead(GetType(rep_body), center->id(), head.ssn_id(), head.msg_id())); + MQInfo remote = {head.route(0).mq_id(), head.route(0).abs_addr()}; + MsgI msg(socket.shm()); + if (msg.Make(reply_head, rep_body)) { + DEFER1(msg.Release();); + center->SendAllocMsg(socket, remote, msg); + } }; } -bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr) +bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr, SharedMemory &shm, TcpProxy &tcp_proxy) { - auto OnNodeInit = [center_ptr](ShmSocket &socket, MsgI &msg) { + // command + auto OnCommand = [center_ptr](ShmSocket &socket, ShmMsgQueue::RawData &cmd) -> bool { auto ¢er = *center_ptr; - center->OnNodeInit(msg.Offset()); + return IsCmd(cmd) && center->OnCommand(socket, cmd); }; - auto Nothing = [](ShmSocket &socket) {}; - BHCenter::Install("#centetr.Init", OnNodeInit, Nothing, BHInitAddress(), 16); - + // now we can talk. auto OnCenterIdle = [center_ptr](ShmSocket &socket) { auto ¢er = *center_ptr; + auto onInit = [&](const int64_t request) { + return center->OnNodeInit(socket, request); + }; + BHCenterHandleInit(socket.shm(), onInit); center->OnTimer(); }; - auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { + auto OnCenter = [=, &tcp_proxy](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { auto ¢er = *center_ptr; - auto replyer = MakeReplyer(socket, head, center->id()); + auto replyer = MakeReplyer(socket, head, center); + + if (!head.dest().ip().empty()) { // other host, proxy + auto valid = [&]() { return head.route_size() == 1; }; + if (!valid()) { return false; } + + if (head.type() == kMsgTypeRequestTopic) { + typedef MsgRequestTopicReply Reply; + Reply reply; + if (!center->CheckMsg(head, reply)) { + replyer(reply); + } else { + auto onResult = [¢er](BHMsgHead &head, std::string body_content) { + if (head.route_size() > 0) { + auto &back = head.route(head.route_size() - 1); + MQInfo dest = {back.mq_id(), back.abs_addr()}; + head.mutable_route()->RemoveLast(); + center->PassRemoteReplyToLocal(dest, head, std::move(body_content)); + } + }; + uint16_t port = head.dest().port(); + if (port == 0) { + port = kBHCenterPort; + } + if (!tcp_proxy.Request(head.dest().ip(), port, msg.content(), onResult)) { + replyer(MakeReply<Reply>(eError, "send request failed.")); + } else { + // success + } + } + return true; + } else { + // ignore other msgs for now. + } + return false; + } + switch (head.type()) { + CASE_ON_MSG_TYPE(ProcInit); CASE_ON_MSG_TYPE(Register); CASE_ON_MSG_TYPE(Heartbeat); CASE_ON_MSG_TYPE(Unregister); CASE_ON_MSG_TYPE(RegisterRPC); CASE_ON_MSG_TYPE(QueryTopic); + CASE_ON_MSG_TYPE(QueryProc); default: return false; } }; - BHCenter::Install("#center.main", OnCenter, OnCenterIdle, BHTopicCenterAddress(), 1000); + BHCenter::Install("@center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(shm), 1000); auto OnBusIdle = [=](ShmSocket &socket) {}; - auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { + auto OnBusCmd = [=](ShmSocket &socket, ShmMsgQueue::RawData &val) { return false; }; + auto OnPubSub = [=, &tcp_proxy](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { auto ¢er = *center_ptr; - auto replyer = MakeReplyer(socket, head, center->id()); + auto replyer = MakeReplyer(socket, head, center); auto OnPublish = [&]() { MsgPublish pub; - NodeCenter::Clients clients; - MsgCommonReply reply; - if (head.route_size() != 1 || !msg.ParseBody(pub)) { - return; - } else if (!center->FindClients(head, pub, clients, reply)) { + if (head.route_size() == 1 && msg.ParseBody(pub)) { + // replyer(center->Publish(head, pub.topic(), msg)); // dead lock? + auto reply(center->Publish(head, pub.topic(), msg)); replyer(reply); - } else { - replyer(MakeReply(eSuccess)); - if (clients.empty()) { return; } - - auto it = clients.begin(); - do { - auto &cli = *it; - auto node = cli.weak_node_.lock(); - if (node) { - // should also make sure that mq is not killed before msg expires. - // it would be ok if (kill_time - offline_time) is longer than expire time. - socket.Send(cli.mq_, msg); - ++it; - } else { - it = clients.erase(it); - } - } while (it != clients.end()); + auto hosts = center->FindRemoteSubClients(pub.topic()); + for (auto &host : hosts) { + tcp_proxy.Publish(host, kBHCenterPort, msg.content()); + } } }; switch (head.type()) { @@ -541,7 +158,7 @@ } }; - BHCenter::Install("#center.bus", OnPubSub, OnBusIdle, BHTopicBusAddress(), 1000); + BHCenter::Install("@center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(shm), 1000); return true; } @@ -556,51 +173,48 @@ return rec; } -bool BHCenter::Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId mqid, const int mq_len) +bool BHCenter::Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQInfo &mq, const int mq_len) { - Centers()[name] = CenterInfo{name, handler, MsgIHandler(), idle, mqid, mq_len}; - return true; -} -bool BHCenter::Install(const std::string &name, MsgIHandler handler, IdleHandler idle, const MQId mqid, const int mq_len) -{ - Centers()[name] = CenterInfo{name, MsgHandler(), handler, idle, mqid, mq_len}; + Centers()[name] = CenterInfo{name, handler, raw_handler, idle, mq, mq_len}; return true; } BHCenter::BHCenter(Socket::Shm &shm) { - auto gc = [&](const MQId id) { - auto r = ShmSocket::Remove(shm, id); - if (r) { - printf("remove mq %ld ok\n", id); - } - }; + auto nsec = NodeTimeoutSec(); + auto center_ptr = std::make_shared<Synced<NodeCenter>>("@bhome_center", nsec, nsec * 3); // *3 to allow other clients to finish sending msgs. + io_service_.reset(new IoService); + tcp_proxy_.reset(new TcpProxy(io_service_->io())); - auto center_ptr = std::make_shared<Synced<NodeCenter>>("#bhome_center", gc, 6s, 6s * 2); - AddCenter(center_ptr); + AddCenter(center_ptr, shm, *tcp_proxy_); for (auto &kv : Centers()) { auto &info = kv.second; - sockets_[info.name_] = std::make_shared<ShmSocket>(shm, info.mqid_, info.mq_len_); + sockets_[info.name_] = std::make_shared<ShmSocket>(info.mq_.offset_, shm, info.mq_.id_); } + + topic_node_.reset(new CenterTopicNode(center_ptr, shm)); + tcp_server_.reset(new TcpServer(io_service_->io(), kBHCenterPort, center_ptr)); } + +BHCenter::~BHCenter() { Stop(); } bool BHCenter::Start() { for (auto &kv : Centers()) { auto &info = kv.second; - if (info.handler_) { - sockets_[info.name_]->Start(info.handler_, info.idle_); - } else { - sockets_[info.name_]->Start(info.raw_handler_, info.idle_); - } + sockets_[info.name_]->Start(1, info.handler_, info.raw_handler_, info.idle_); } - + topic_node_->Start(); return true; } bool BHCenter::Stop() { + tcp_proxy_.reset(); + tcp_server_.reset(); + io_service_.reset(); + topic_node_->Stop(); for (auto &kv : sockets_) { kv.second->Stop(); } -- Gitblit v1.8.0