lichao
2021-04-20 1f3729698a131b3f701f67adb6a1258aa1235dce
src/topic_node.h
@@ -34,7 +34,6 @@
   SharedMemory &shm() { return shm_; }
public:
   typedef std::function<void(std::string &proc_id, const void *data, const int len)> DataCB;
   TopicNode(SharedMemory &shm);
   ~TopicNode();
@@ -44,8 +43,10 @@
   bool Heartbeat(const int timeout_ms);
   // topic rpc server
   typedef std::function<bool(const std::string &client_proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply)> ServerCB;
   bool ServerStart(ServerCB const &cb, const int nworker = 2);
   typedef std::function<bool(const std::string &client_proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply)> ServerSyncCB;
   typedef std::function<void(void *src_info, std::string &client_proc_id, MsgRequestTopic &request)> ServerAsyncCB;
   bool ServerStart(ServerSyncCB const &cb, const int nworker = 2);
   bool ServerStart(ServerAsyncCB const &cb, const int nworker = 2);
   bool ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply, const int timeout_ms);
   bool ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms);
   bool ServerSendReply(void *src_info, const MsgRequestTopicReply &reply);
@@ -65,7 +66,7 @@
   bool Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms);
   bool RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms);
   void Start(ServerCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker = 2);
   void Start(ServerAsyncCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker = 2);
   void Stop();
private:
@@ -77,45 +78,40 @@
   {
      class Impl
      {
         typedef std::unordered_map<Topic, Address> Store;
         Store store_;
         typedef std::unordered_map<Topic, Address> Records;
         Records records_;
      public:
         bool Find(const Topic &topic, Address &addr)
         {
            auto pos = store_.find(topic);
            if (pos != store_.end()) {
            auto pos = records_.find(topic);
            if (pos != records_.end()) {
               addr = pos->second;
               return true;
            } else {
               return false;
            }
         }
         bool Update(const Topic &topic, const Address &addr)
         bool Store(const Topic &topic, const Address &addr)
         {
            store_[topic] = addr;
            records_[topic] = addr;
            return true;
         }
      };
      Synced<Impl> impl_;
      // Impl &impl()
      // {
      //    thread_local Impl impl;
      //    return impl;
      // }
   public:
      bool Find(const Topic &topic, Address &addr) { return impl_->Find(topic, addr); }
      bool Update(const Topic &topic, const Address &addr) { return impl_->Update(topic, addr); }
      bool Store(const Topic &topic, const Address &addr) { return impl_->Store(topic, addr); }
   };
   // some sockets may be the same one, using functions make it easy to change.
   auto &SockNode() { return sock_node_; }
   auto &SockPub() { return SockNode(); }
   auto &SockSub() { return sock_sub_; }
   auto &SockClient() { return sock_client_; }
   auto &SockServer() { return sock_server_; }
   ShmSocket &SockNode() { return sock_node_; }
   ShmSocket &SockPub() { return SockNode(); }
   ShmSocket &SockSub() { return sock_sub_; }
   ShmSocket &SockClient() { return sock_client_; }
   ShmSocket &SockServer() { return sock_server_; }
   bool IsRegistered() const { return registered_.load(); }
   ShmSocket sock_node_;