lichao
2021-05-28 9243710ca372de26823c2225c7b46b072458c671
box/center.cpp
@@ -17,7 +17,9 @@
 */
#include "center.h"
#include "center_topic_node.h"
#include "io_service.h"
#include "node_center.h"
#include "tcp_proxy.h"
#include "tcp_server.h"
#include <chrono>
@@ -74,7 +76,7 @@
   };
}
bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr, SharedMemory &shm)
bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr, SharedMemory &shm, TcpProxy &tcp_proxy)
{
   // command
   auto OnCommand = [center_ptr](ShmSocket &socket, ShmMsgQueue::RawData &cmd) -> bool {
@@ -92,9 +94,41 @@
      center->OnTimer();
   };
   auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
   auto OnCenter = [=, &tcp_proxy](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
      auto &center = *center_ptr;
      auto replyer = MakeReplyer(socket, head, center);
      if (!head.dest().ip().empty()) { // other host, proxy
         auto valid = [&]() { return head.route_size() == 1; };
         if (!valid()) { return false; }
         if (head.type() == kMsgTypeRequestTopic) {
            typedef MsgRequestTopicReply Reply;
            Reply reply;
            if (!center->CheckMsg(head, reply)) {
               replyer(reply);
            } else {
               auto onResult = [&center](BHMsgHead &head, std::string body_content) {
                  if (head.route_size() > 0) {
                     auto &back = head.route(head.route_size() - 1);
                     MQInfo dest = {back.mq_id(), back.abs_addr()};
                     head.mutable_route()->RemoveLast();
                     center->PassRemoteReplyToLocal(dest, head, std::move(body_content));
                  }
               };
               if (!tcp_proxy.Request(head.dest().ip(), head.dest().port(), msg.content(), onResult)) {
                  replyer(MakeReply<Reply>(eError, "send request failed."));
               } else {
                  // success
               }
            }
            return true;
         } else {
            // ignore other msgs for now.
         }
         return false;
      }
      switch (head.type()) {
         CASE_ON_MSG_TYPE(ProcInit);
         CASE_ON_MSG_TYPE(Register);
@@ -168,7 +202,10 @@
{
   auto nsec = NodeTimeoutSec();
   auto center_ptr = std::make_shared<Synced<NodeCenter>>("@bhome_center", nsec, nsec * 3); // *3 to allow other clients to finish sending msgs.
   AddCenter(center_ptr, shm);
   io_service_.reset(new IoService);
   tcp_proxy_.reset(new TcpProxy(io_service_->io()));
   AddCenter(center_ptr, shm, *tcp_proxy_);
   for (auto &kv : Centers()) {
      auto &info = kv.second;
@@ -176,7 +213,7 @@
   }
   topic_node_.reset(new CenterTopicNode(center_ptr, shm));
   tcp_server_.reset(new TcpServer(kBHCenterPort, center_ptr));
   tcp_server_.reset(new TcpServer(io_service_->io(), kBHCenterPort, center_ptr));
}
BHCenter::~BHCenter() { Stop(); }
@@ -188,13 +225,14 @@
      sockets_[info.name_]->Start(1, info.handler_, info.raw_handler_, info.idle_);
   }
   topic_node_->Start();
   tcp_server_->Start();
   return true;
}
bool BHCenter::Stop()
{
   tcp_server_->Stop();
   tcp_proxy_.reset();
   tcp_server_.reset();
   io_service_.reset();
   topic_node_->Stop();
   for (auto &kv : sockets_) {
      kv.second->Stop();