lichao
2021-04-02 dc12826dd61ce18fac3a9561c5843d30a0cf9660
src/reqrep_center.cpp
@@ -17,24 +17,96 @@
 */
#include "reqrep_center.h"
#include "bh_util.h"
using namespace bhome_shm;
#include "msg.h"
#include <chrono>
#include <memory>
#include <mutex>
#include <unordered_map>
struct A {
   void F(int){};
};
using namespace bhome_shm;
namespace
{
inline uint64_t Now()
{
   time_t t;
   return time(&t);
}
auto Now = []() { time_t t; return time(&t); };
class NodeCenter
{
public:
   typedef std::string ProcAddr;
   typedef bhome::msg::ProcInfo ProcInfo;
   template <class Iter>
   bool Register(ProcInfo &info, const ProcAddr &src_mq, Iter topics_begin, Iter topics_end)
   {
      try {
         Node node(new NodeInfo);
         node->addr_ = src_mq;
         node->proc_.Swap(&info);
         node->state_.timestamp_ = Now();
         nodes_[node->proc_.id()] = node;
         for (auto it = topics_begin; it != topics_end; ++it) {
            topic_map_[*it] = node;
         }
         return true;
      } catch (...) {
         return false;
      }
   }
   void Heartbeat(ProcInfo &info, const ProcAddr &src_mq)
   {
      auto pos = nodes_.find(info.name());
      if (pos != nodes_.end() && pos->second->addr_ == src_mq) { // both name and mq should be the same.
         NodeInfo &ni = *pos->second;
         ni.state_.timestamp_ = Now();
         if (!info.public_info().empty()) {
            ni.proc_.set_public_info(info.public_info());
         }
         if (!info.private_info().empty()) {
            ni.proc_.set_private_info(info.private_info());
         }
      }
   }
   bool QueryTopic(const std::string &topic, ProcAddr &addr)
   {
      auto pos = topic_map_.find(topic);
      if (pos != topic_map_.end()) {
         Node node(pos->second.lock());
         if (node) {
            addr = node->addr_;
            return true;
         } else { // dead, remove record.
            topic_map_.erase(pos);
            return false;
         }
      } else {
         return false;
      }
   }
private:
   struct ProcState {
      time_t timestamp_ = 0;
      uint32_t flag_ = 0; // reserved
   };
   typedef std::string ProcId;
   struct NodeInfo {
      ProcState state_; // state
      ProcAddr addr_;   // registered_mqid.
      ProcInfo proc_;   //
   };
   typedef std::shared_ptr<NodeInfo> Node;
   typedef std::weak_ptr<NodeInfo> WeakNode;
   std::unordered_map<std::string, WeakNode> topic_map_;
   std::unordered_map<ProcId, Node> nodes_;
};
} // namespace
bool ReqRepCenter::Start(const int nworker)
{
   auto onRecv = [&](BHMsg &msg) {
   auto center_ptr = std::make_shared<Synced<NodeCenter>>();
   auto onRecv = [center_ptr, this](BHMsg &msg) {
      auto &center = *center_ptr;
#ifndef NDEBUG
      static std::atomic<time_t> last(0);
      time_t now = 0;
@@ -50,54 +122,22 @@
      auto OnRegister = [&]() {
         DataProcRegister reg;
         if (!reg.ParseFromString(msg.body())) {
            return;
         if (reg.ParseFromString(msg.body()) && reg.has_proc()) {
            center->Register(*reg.mutable_proc(), src_mq, reg.topics().begin(), reg.topics().end());
         }
         ProcInfo pi;
         pi.server_mqid_ = src_mq;
         pi.proc_id_ = reg.proc().name();
         pi.ext_info_ = reg.proc().info();
         pi.timestamp_ = Now();
         std::lock_guard<std::mutex> lock(mutex_);
         for (auto &t : reg.topics()) {
            topic_mq_[t] = pi.server_mqid_;
         }
         procs_[pi.proc_id_] = pi;
      };
      auto OnHeartbeat = [&]() {
         DataProcHeartbeat hb;
         if (!hb.ParseFromString(msg.body())) {
            return;
         }
         std::lock_guard<std::mutex> lock(mutex_);
         auto pos = procs_.find(hb.proc().name());
         if (pos != procs_.end() && pos->second.server_mqid_ == src_mq) { // both name and mq should be the same.
            pos->second.timestamp_ = Now();
            pos->second.ext_info_ = hb.proc().info();
         if (hb.ParseFromString(msg.body()) && hb.has_proc()) {
            center->Heartbeat(*hb.mutable_proc(), src_mq);
         }
      };
      auto OnQueryTopic = [&]() {
         DataProcQueryTopic query;
         if (!query.ParseFromString(msg.body())) {
            return;
         }
         std::string dest;
         auto FindDest = [&]() {
            std::lock_guard<std::mutex> lock(mutex_);
            auto pos = topic_mq_.find(query.topic());
            if (pos != topic_mq_.end()) {
               dest = pos->second;
               return true;
            } else {
               return false;
            }
         };
         if (FindDest()) {
         NodeCenter::ProcAddr dest;
         if (query.ParseFromString(msg.body()) && center->QueryTopic(query.topic(), dest)) {
            MQId remote;
            memcpy(&remote, msg.route().rbegin()->mq_id().data(), sizeof(remote));
            MsgI imsg;