lichao
2021-04-12 1b52f1cb8c47dd2c0195d2fd65d7b6a4c2f10704
src/center.cpp
@@ -18,6 +18,7 @@
#include "center.h"
#include "bh_util.h"
#include "defs.h"
#include "failed_msg.h"
#include "shm.h"
#include <chrono>
#include <set>
@@ -364,28 +365,31 @@
bool AddCenter(const std::string &id, const NodeCenter::Cleaner &cleaner)
{
   auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner);
   auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) {
   auto center_failed_q = std::make_shared<FailedMsgQ>();
   auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id, FailedMsgQ &failq, const int timeout_ms = 0) {
      return [&](auto &&rep_body) {
         auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id()));
         bool r = socket.Send(head.route(0).mq_id().data(), reply_head, rep_body, 100);
         MsgI msg;
         if (msg.Make(socket.shm(), reply_head, rep_body)) {
            auto &remote = head.route(0).mq_id();
            bool r = socket.Send(remote.data(), msg, timeout_ms);
         if (!r) {
            printf("send reply failed.\n");
               failq.Push(remote, msg, 60s); // for later retry.
         }
         //TODO resend failed.
         }
      };
   };
   auto OnCenterIdle = [center_ptr](ShmSocket &socket) {
   auto OnCenterIdle = [center_ptr, center_failed_q](ShmSocket &socket) {
      auto &center = *center_ptr;
      center_failed_q->TrySend(socket);
      center->OnTimer();
   };
   auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
      auto &center = *center_ptr;
      auto replyer = MakeReplyer(socket, head, center->id());
      auto replyer = MakeReplyer(socket, head, center->id(), *center_failed_q);
      switch (head.type()) {
         CASE_ON_MSG_TYPE(Register);
         CASE_ON_MSG_TYPE(Heartbeat);
@@ -396,10 +400,11 @@
      }
   };
   auto OnBusIdle = [](ShmSocket &socket) {};
   auto bus_failed_q = std::make_shared<FailedMsgQ>();
   auto OnBusIdle = [=](ShmSocket &socket) { bus_failed_q->TrySend(socket); };
   auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
      auto &center = *center_ptr;
      auto replyer = MakeReplyer(socket, head, center->id());
      auto replyer = MakeReplyer(socket, head, center->id(), *bus_failed_q);
      auto OnPublish = [&]() {
         MsgPublish pub;
         NodeCenter::Clients clients;
@@ -407,19 +412,25 @@
         if (head.route_size() != 1 || !msg.ParseBody(pub)) {
            return;
         } else if (!center->FindClients(head, pub, clients, reply)) {
            MakeReplyer(socket, head, center->id())(reply);
            replyer(reply);
         } else {
            MakeReplyer(socket, head, center->id())(MakeReply(eSuccess));
            replyer(MakeReply(eSuccess));
            if (!msg.EnableRefCount(socket.shm())) { return; } // no memory?
            if (clients.empty()) { return; }
            for (auto &cli : clients) {
            auto it = clients.begin();
            do {
               auto &cli = *it;
               auto node = cli.weak_node_.lock();
               if (node) {
                  if (!socket.Send(cli.mq_.data(), msg, 100)) {
                     printf("center route publish failed. need resend.\n");
                  if (!socket.Send(cli.mq_.data(), msg, 0)) {
                     bus_failed_q->Push(cli.mq_, msg, 60s);
                  }
                  ++it;
               } else {
                  it = clients.erase(it);
               }
            }
            } while (it != clients.end());
         }
      };
      switch (head.type()) {
@@ -484,7 +495,7 @@
{
   for (auto &kv : Centers()) {
      auto &info = kv.second;
      sockets_[info.name_]->Start(info.handler_);
      sockets_[info.name_]->Start(info.handler_, info.idle_);
   }
   return true;