lichao
2021-04-06 4deeafbd502dc3c57dab8ad6ca601a38a9e7f074
src/reqrep_center.cpp
@@ -17,105 +17,157 @@
 */
#include "reqrep_center.h"
#include "bh_util.h"
using namespace bhome_shm;
#include "msg.h"
#include <chrono>
#include <memory>
#include <mutex>
#include <unordered_map>
struct A {
   void F(int){};
};
using namespace bhome_shm;
namespace
{
inline uint64_t Now()
auto Now = []() { time_t t; return time(&t); };
class NodeCenter
{
   time_t t;
   return time(&t);
}
public:
   typedef std::string ProcAddr;
   typedef bhome::msg::ProcInfo ProcInfo;
   template <class Iter>
   bool Register(ProcInfo &info, const ProcAddr &src_mq, Iter topics_begin, Iter topics_end)
   {
      try {
         Node node(new NodeInfo);
         node->addr_ = src_mq;
         node->proc_.Swap(&info);
         node->state_.timestamp_ = Now();
         nodes_[node->proc_.id()] = node;
         for (auto it = topics_begin; it != topics_end; ++it) {
            topic_map_[*it] = node;
         }
         return true;
      } catch (...) {
         return false;
      }
   }
   void Heartbeat(ProcInfo &info, const ProcAddr &src_mq)
   {
      auto pos = nodes_.find(info.name());
      if (pos != nodes_.end() && pos->second->addr_ == src_mq) { // both name and mq should be the same.
         NodeInfo &ni = *pos->second;
         ni.state_.timestamp_ = Now();
         if (!info.public_info().empty()) {
            ni.proc_.set_public_info(info.public_info());
         }
         if (!info.private_info().empty()) {
            ni.proc_.set_private_info(info.private_info());
         }
      }
   }
   bool QueryTopic(const Topic &topic, ProcAddr &addr)
   {
      auto pos = topic_map_.find(topic);
      if (pos != topic_map_.end()) {
         Node node(pos->second.lock());
         if (node) {
            addr = node->addr_;
            return true;
         } else { // dead, remove record.
            topic_map_.erase(pos);
            return false;
         }
      } else {
         return false;
      }
   }
private:
   struct ProcState {
      time_t timestamp_ = 0;
      uint32_t flag_ = 0; // reserved
   };
   typedef std::string ProcId;
   struct NodeInfo {
      ProcState state_; // state
      ProcAddr addr_;   // registered_mqid.
      ProcInfo proc_;   //
   };
   typedef std::shared_ptr<NodeInfo> Node;
   typedef std::weak_ptr<NodeInfo> WeakNode;
   std::unordered_map<Topic, WeakNode> topic_map_;
   std::unordered_map<ProcId, Node> nodes_;
};
} // namespace
bool ReqRepCenter::Start(const int nworker)
BHCenter::MsgHandler MakeReqRepCenter()
{
   auto onRecv = [&](BHMsg &msg) {
   auto center_ptr = std::make_shared<Synced<NodeCenter>>();
   return [center_ptr](ShmSocket &socket, MsgI &imsg, BHMsg &msg) {
      auto &center = *center_ptr;
      auto &shm = socket.shm();
#ifndef NDEBUG
      static std::atomic<time_t> last(0);
      time_t now = 0;
      time(&now);
      if (last.exchange(now) < now) {
         printf("bus queue size: %ld\n", socket_.Pending());
         printf("center queue size: %ld\n", socket.Pending());
      }
#endif
      if (msg.route_size() == 0) {
         return;
      }
      auto &src_mq = msg.route(0).mq_id();
      auto SrcMQ = [&]() { return msg.route(0).mq_id(); };
      auto OnRegister = [&]() {
         DataProcRegister reg;
         if (!reg.ParseFromString(msg.body())) {
            return;
         }
         ProcInfo pi;
         pi.server_mqid_ = src_mq;
         pi.proc_id_ = reg.proc().name();
         pi.ext_info_ = reg.proc().info();
         pi.timestamp_ = Now();
         if (msg.route_size() != 1) { return; }
         std::lock_guard<std::mutex> lock(mutex_);
         for (auto &t : reg.topics()) {
            topic_mq_[t] = pi.server_mqid_;
         MsgRegister reg;
         if (reg.ParseFromString(msg.body()) && reg.has_proc()) {
            center->Register(*reg.mutable_proc(), SrcMQ(), reg.topics().begin(), reg.topics().end());
         }
         procs_[pi.proc_id_] = pi;
      };
      auto OnHeartbeat = [&]() {
         DataProcHeartbeat hb;
         if (!hb.ParseFromString(msg.body())) {
            return;
         }
         if (msg.route_size() != 1) { return; }
         auto &src_mq = msg.route(0).mq_id();
         std::lock_guard<std::mutex> lock(mutex_);
         auto pos = procs_.find(hb.proc().name());
         if (pos != procs_.end() && pos->second.server_mqid_ == src_mq) { // both name and mq should be the same.
            pos->second.timestamp_ = Now();
            pos->second.ext_info_ = hb.proc().info();
         MsgHeartbeat hb;
         if (hb.ParseFromString(msg.body()) && hb.has_proc()) {
            center->Heartbeat(*hb.mutable_proc(), SrcMQ());
         }
      };
      auto OnQueryTopic = [&]() {
         DataProcQueryTopic query;
         if (!query.ParseFromString(msg.body())) {
            return;
         }
         if (msg.route_size() != 1) { return; }
         std::string dest;
         auto FindDest = [&]() {
            std::lock_guard<std::mutex> lock(mutex_);
            auto pos = topic_mq_.find(query.topic());
            if (pos != topic_mq_.end()) {
               dest = pos->second;
               return true;
            } else {
               return false;
            }
         };
         if (FindDest()) {
         MsgQueryTopic query;
         NodeCenter::ProcAddr dest;
         if (query.ParseFromString(msg.body()) && center->QueryTopic(query.topic(), dest)) {
            MQId remote;
            memcpy(&remote, msg.route().rbegin()->mq_id().data(), sizeof(remote));
            memcpy(&remote, SrcMQ().data(), sizeof(MQId));
            MsgI imsg;
            if (!imsg.Make(shm(), MakeQueryTopicReply(dest, msg.msg_id()))) { return; }
            if (!ShmMsgQueue::Send(shm(), remote, imsg, 100)) {
               imsg.Release(shm());
            if (!imsg.Make(shm, MakeQueryTopicReply(dest, msg.msg_id()))) { return; }
            if (!ShmMsgQueue::Send(shm, remote, imsg, 100)) {
               imsg.Release(shm);
            }
         }
      };
      switch (msg.type()) {
      case kMsgTypeProcRegisterTopics: OnRegister(); break;
      case kMsgTypeProcHeartbeat: OnHeartbeat(); break;
      case kMsgTypeProcQueryTopic: OnQueryTopic(); break;
      default: break;
      case kMsgTypeRegister: OnRegister(); return true;
      case kMsgTypeHeartbeat: OnHeartbeat(); return true;
      case kMsgTypeQueryTopic: OnQueryTopic(); return true;
      default: return false;
      }
   };
}
bool ReqRepCenter::Start(const int nworker)
{
   auto handler = MakeReqRepCenter();
   printf("sizeof(rep/req handler) = %ld\n", sizeof(handler));
   const int kMaxWorker = 16;
   return socket_.Start(onRecv, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
}
   return socket_.Start(handler, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
}