lichao
2021-06-30 ae17d1439b35b55212c3a30712e0a60b1d6a99c0
box/center.cpp
@@ -17,7 +17,10 @@
 */
#include "center.h"
#include "center_topic_node.h"
#include "io_service.h"
#include "node_center.h"
#include "tcp_proxy.h"
#include "tcp_server.h"
#include <chrono>
using namespace std::chrono;
@@ -30,8 +33,6 @@
namespace
{
//TODO check proc_id
template <class Body, class OnMsg, class Replyer>
inline void Dispatch(MsgI &msg, BHMsgHead &head, OnMsg const &onmsg, Replyer const &replyer)
{
@@ -40,18 +41,6 @@
   if (msg.ParseBody(body)) {
      replyer(onmsg(body));
   }
}
Handler Combine(const Handler &h1, const Handler &h2)
{
   return [h1, h2](ShmSocket &socket, bhome_msg::MsgI &msg, bhome_msg::BHMsgHead &head) {
      return h1(socket, msg, head) || h2(socket, msg, head);
   };
}
template <class... H>
Handler Combine(const Handler &h0, const Handler &h1, const Handler &h2, const H &...rest)
{
   return Combine(Combine(h0, h1), h2, rest...);
}
#define CASE_ON_MSG_TYPE(MsgTag)                                                         \
@@ -65,7 +54,7 @@
   return [&](auto &&rep_body) {
      auto reply_head(InitMsgHead(GetType(rep_body), center->id(), head.ssn_id(), head.msg_id()));
      MQInfo remote = {head.route(0).mq_id(), head.route(0).abs_addr()};
      MsgI msg;
      MsgI msg(socket.shm());
      if (msg.Make(reply_head, rep_body)) {
         DEFER1(msg.Release(););
         center->SendAllocMsg(socket, remote, msg);
@@ -73,7 +62,7 @@
   };
}
bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr)
bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr, SharedMemory &shm, TcpProxy &tcp_proxy)
{
   // command
   auto OnCommand = [center_ptr](ShmSocket &socket, ShmMsgQueue::RawData &cmd) -> bool {
@@ -87,13 +76,49 @@
      auto onInit = [&](const int64_t request) {
         return center->OnNodeInit(socket, request);
      };
      BHCenterHandleInit(onInit);
      BHCenterHandleInit(socket.shm(), onInit);
      center->OnTimer();
   };
   auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
   auto OnCenter = [=, &tcp_proxy](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
      auto &center = *center_ptr;
      auto replyer = MakeReplyer(socket, head, center);
      if (!head.dest().ip().empty()) { // other host, proxy
         auto valid = [&]() { return head.route_size() == 1; };
         if (!valid()) { return false; }
         if (head.type() == kMsgTypeRequestTopic) {
            typedef MsgRequestTopicReply Reply;
            Reply reply;
            if (!center->CheckMsg(head, reply)) {
               replyer(reply);
            } else {
               auto onResult = [&center](BHMsgHead &head, std::string body_content) {
                  if (head.route_size() > 0) {
                     auto &back = head.route(head.route_size() - 1);
                     MQInfo dest = {back.mq_id(), back.abs_addr()};
                     head.mutable_route()->RemoveLast();
                     center->PassRemoteReplyToLocal(dest, head, std::move(body_content));
                  }
               };
               uint16_t port = head.dest().port();
               if (port == 0) {
                  port = kBHCenterPort;
               }
               if (!tcp_proxy.Request(head.dest().ip(), port, msg.content(), onResult)) {
                  replyer(MakeReply<Reply>(eError, "send request failed."));
               } else {
                  // success
               }
            }
            return true;
         } else {
            // ignore other msgs for now.
         }
         return false;
      }
      switch (head.type()) {
         CASE_ON_MSG_TYPE(ProcInit);
         CASE_ON_MSG_TYPE(Register);
@@ -106,31 +131,22 @@
      default: return false;
      }
   };
   BHCenter::Install("#center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(), 1000);
   BHCenter::Install("@center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(shm), 1000);
   auto OnBusIdle = [=](ShmSocket &socket) {};
   auto OnBusCmd = [=](ShmSocket &socket, ShmMsgQueue::RawData &val) { return false; };
   auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
   auto OnPubSub = [=, &tcp_proxy](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
      auto &center = *center_ptr;
      auto replyer = MakeReplyer(socket, head, center);
      auto OnPublish = [&]() {
         MsgPublish pub;
         NodeCenter::Clients clients;
         MsgCommonReply reply;
         if (head.route_size() != 1 || !msg.ParseBody(pub)) {
            return;
         } else if (!center->FindClients(head, pub, clients, reply)) {
         if (head.route_size() == 1 && msg.ParseBody(pub)) {
            // replyer(center->Publish(head, pub.topic(), msg)); // dead lock?
            auto reply(center->Publish(head, pub.topic(), msg));
            replyer(reply);
         } else {
            replyer(MakeReply(eSuccess));
            if (clients.empty()) { return; }
            for (auto &cli : clients) {
               auto node = cli.weak_node_.lock();
               if (node) {
                  // should also make sure that mq is not killed before msg expires.
                  // it would be ok if (kill_time - offline_time) is longer than expire time.
                  socket.Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
               }
            auto hosts = center->FindRemoteSubClients(pub.topic());
            for (auto &host : hosts) {
               tcp_proxy.Publish(host, kBHCenterPort, msg.content());
            }
         }
      };
@@ -142,7 +158,7 @@
      }
   };
   BHCenter::Install("#center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(), 1000);
   BHCenter::Install("@center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(shm), 1000);
   return true;
}
@@ -165,16 +181,12 @@
BHCenter::BHCenter(Socket::Shm &shm)
{
   auto gc = [&](const MQId id) {
      auto r = ShmSocket::Remove(shm, id);
      if (r) {
         LOG_DEBUG() << "remove mq " << id << " ok\n";
      }
   };
   auto nsec = NodeTimeoutSec();
   auto center_ptr = std::make_shared<Synced<NodeCenter>>("#bhome_center", gc, nsec, nsec * 3); // *3 to allow other clients to finish sending msgs.
   AddCenter(center_ptr);
   auto center_ptr = std::make_shared<Synced<NodeCenter>>("@bhome_center", nsec, nsec * 3); // *3 to allow other clients to finish sending msgs.
   io_service_.reset(new IoService);
   tcp_proxy_.reset(new TcpProxy(io_service_->io()));
   AddCenter(center_ptr, shm, *tcp_proxy_);
   for (auto &kv : Centers()) {
      auto &info = kv.second;
@@ -182,7 +194,9 @@
   }
   topic_node_.reset(new CenterTopicNode(center_ptr, shm));
   tcp_server_.reset(new TcpServer(io_service_->io(), kBHCenterPort, center_ptr));
}
BHCenter::~BHCenter() { Stop(); }
bool BHCenter::Start()
@@ -197,6 +211,9 @@
bool BHCenter::Stop()
{
   tcp_proxy_.reset();
   tcp_server_.reset();
   io_service_.reset();
   topic_node_->Stop();
   for (auto &kv : sockets_) {
      kv.second->Stop();