From dc12826dd61ce18fac3a9561c5843d30a0cf9660 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 02 四月 2021 15:48:53 +0800 Subject: [PATCH] add request topic cache; refactor req/rep center. --- src/reqrep_center.cpp | 136 +++++++++++++++++--------- src/reqrep.h | 39 +++++++ src/bh_util.h | 39 +++++++ src/msg.h | 1 src/reqrep_center.h | 15 --- proto/source/bhome_msg.proto | 30 ++++- utest/utest.cpp | 13 ++ src/reqrep.cpp | 12 +- 8 files changed, 208 insertions(+), 77 deletions(-) diff --git a/proto/source/bhome_msg.proto b/proto/source/bhome_msg.proto index 149d8ee..a8e5073 100644 --- a/proto/source/bhome_msg.proto +++ b/proto/source/bhome_msg.proto @@ -4,13 +4,31 @@ package bhome.msg; +// message format : header(BHMsgHead) + body(variable types) message BHAddress { bytes mq_id = 1; // mqid, uuid bytes ip = 2; // int32 port = 3; } -message BHMsg { +message ProcInfo +{ + bytes id = 1; + bytes name = 2; + bytes public_info = 3; + bytes private_info = 4; +} + +message BHMsgHead { + bytes msg_id = 1; + repeated BHAddress route = 2; // for reply and proxy. + int64 timestamp = 3; + int32 type = 4; + ProcInfo proc = 5; + bytes topic = 6; // for request route +} + +message BHMsg { // deprecated bytes msg_id = 1; int64 timestamp = 2; int32 type = 3; @@ -50,12 +68,6 @@ bytes data = 1; } -message ProcInfo -{ - bytes name = 1; - bytes info = 2; -} - message DataProcRegister { ProcInfo proc = 1; @@ -74,3 +86,7 @@ message DataProcQueryTopicReply { BHAddress address = 1; } + +service TopicRequestReplyService { + rpc Request (DataRequest) returns (DataReply); +} \ No newline at end of file diff --git a/src/bh_util.h b/src/bh_util.h index b5dc45e..bc48578 100644 --- a/src/bh_util.h +++ b/src/bh_util.h @@ -19,6 +19,7 @@ #define BH_UTIL_SOXWOK67 #include <functional> +#include <mutex> #include <stdint.h> inline uint16_t Get8(const void *p) @@ -104,6 +105,44 @@ } }; +template <class D, class M, class G = std::unique_lock<M>> +class SyncedPtr +{ + G lock_; + D *p_ = nullptr; + +public: + SyncedPtr(M &mtx, D &data) : + lock_(mtx), p_(&data) {} + SyncedPtr(SyncedPtr &&a) + { + lock_.swap(a.lock_); + std::swap(p_, a.p_); + } + D *operator->() const { return p_; } + D &operator*() const { return *p_; } +}; + +template <class T, class Mutex = std::mutex, class Lock = std::unique_lock<Mutex>> +class Synced +{ + typedef T Data; + Mutex mutex_; + Data data_; + typedef SyncedPtr<Data, Mutex, Lock> Ptr; + +public: + template <class... P> + explicit Synced(const P &...p) : + data_(p...) {} + Ptr operator->() { return Ptr(mutex_, data_); } + auto Apply(const auto &f) + { + Lock lk(mutex_); + return f(data_); + } +}; + // macro helper #define JOIN_IMPL(a, b) a##b #define JOIN(a, b) JOIN_IMPL(a, b) diff --git a/src/msg.h b/src/msg.h index 8c345fd..30b3208 100644 --- a/src/msg.h +++ b/src/msg.h @@ -69,6 +69,7 @@ BHMsg MakeUnsub(const MQId &client, const std::vector<std::string> &topics); BHMsg MakePub(const std::string &topic, const void *data, const size_t size); +// message content layout: header_size + header + data_size + data class MsgI { private: diff --git a/src/reqrep.cpp b/src/reqrep.cpp index bed6496..79ff892 100644 --- a/src/reqrep.cpp +++ b/src/reqrep.cpp @@ -155,8 +155,7 @@ bool SocketRequest::QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms) { - if (tmp_cache_.first == topic) { - addr = tmp_cache_.second; + if (topic_cache_.Find(topic, addr)) { return true; } @@ -167,9 +166,12 @@ DataProcQueryTopicReply reply; if (reply.ParseFromString(result.body())) { addr = reply.address(); - tmp_cache_.first = topic; - tmp_cache_.second = addr; - return !addr.mq_id().empty(); + if (addr.mq_id().empty()) { + return false; + } else { + topic_cache_.Update(topic, addr); + return true; + } } } } else { diff --git a/src/reqrep.h b/src/reqrep.h index 2971403..e8a38f7 100644 --- a/src/reqrep.h +++ b/src/reqrep.h @@ -18,6 +18,7 @@ #ifndef REQREP_ACEH09NK #define REQREP_ACEH09NK +#include "bh_util.h" #include "defs.h" #include "msg.h" #include "socket.h" @@ -58,7 +59,43 @@ bool QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms); std::unordered_map<std::string, RecvCB> async_cbs_; - std::pair<std::string, bhome::msg::BHAddress> tmp_cache_; + typedef bhome_msg::BHAddress Address; + class TopicCache + { + class Impl + { + typedef std::unordered_map<std::string, Address> Store; + Store store_; + + public: + bool Find(const std::string &topic, Address &addr) + { + auto pos = store_.find(topic); + if (pos != store_.end()) { + addr = pos->second; + return true; + } else { + return false; + } + } + bool Update(const std::string &topic, const Address &addr) + { + store_[topic] = addr; + return true; + } + }; + Synced<Impl> impl_; + // Impl &impl() + // { + // thread_local Impl impl; + // return impl; + // } + + public: + bool Find(const std::string &topic, Address &addr) { return impl_->Find(topic, addr); } + bool Update(const std::string &topic, const Address &addr) { return impl_->Update(topic, addr); } + }; + TopicCache topic_cache_; }; class SocketReply : private ShmSocket diff --git a/src/reqrep_center.cpp b/src/reqrep_center.cpp index 5f1873e..0b6ddea 100644 --- a/src/reqrep_center.cpp +++ b/src/reqrep_center.cpp @@ -17,24 +17,96 @@ */ #include "reqrep_center.h" #include "bh_util.h" -using namespace bhome_shm; +#include "msg.h" +#include <chrono> +#include <memory> +#include <mutex> +#include <unordered_map> -struct A { - void F(int){}; -}; +using namespace bhome_shm; namespace { -inline uint64_t Now() -{ - time_t t; - return time(&t); -} +auto Now = []() { time_t t; return time(&t); }; +class NodeCenter +{ +public: + typedef std::string ProcAddr; + typedef bhome::msg::ProcInfo ProcInfo; + + template <class Iter> + bool Register(ProcInfo &info, const ProcAddr &src_mq, Iter topics_begin, Iter topics_end) + { + try { + Node node(new NodeInfo); + node->addr_ = src_mq; + node->proc_.Swap(&info); + node->state_.timestamp_ = Now(); + nodes_[node->proc_.id()] = node; + for (auto it = topics_begin; it != topics_end; ++it) { + topic_map_[*it] = node; + } + return true; + } catch (...) { + return false; + } + } + void Heartbeat(ProcInfo &info, const ProcAddr &src_mq) + { + auto pos = nodes_.find(info.name()); + if (pos != nodes_.end() && pos->second->addr_ == src_mq) { // both name and mq should be the same. + NodeInfo &ni = *pos->second; + ni.state_.timestamp_ = Now(); + if (!info.public_info().empty()) { + ni.proc_.set_public_info(info.public_info()); + } + if (!info.private_info().empty()) { + ni.proc_.set_private_info(info.private_info()); + } + } + } + bool QueryTopic(const std::string &topic, ProcAddr &addr) + { + auto pos = topic_map_.find(topic); + if (pos != topic_map_.end()) { + Node node(pos->second.lock()); + if (node) { + addr = node->addr_; + return true; + } else { // dead, remove record. + topic_map_.erase(pos); + return false; + } + } else { + return false; + } + } + +private: + struct ProcState { + time_t timestamp_ = 0; + uint32_t flag_ = 0; // reserved + }; + typedef std::string ProcId; + struct NodeInfo { + ProcState state_; // state + ProcAddr addr_; // registered_mqid. + ProcInfo proc_; // + }; + typedef std::shared_ptr<NodeInfo> Node; + typedef std::weak_ptr<NodeInfo> WeakNode; + std::unordered_map<std::string, WeakNode> topic_map_; + std::unordered_map<ProcId, Node> nodes_; +}; } // namespace + bool ReqRepCenter::Start(const int nworker) { - auto onRecv = [&](BHMsg &msg) { + auto center_ptr = std::make_shared<Synced<NodeCenter>>(); + auto onRecv = [center_ptr, this](BHMsg &msg) { + auto ¢er = *center_ptr; + #ifndef NDEBUG static std::atomic<time_t> last(0); time_t now = 0; @@ -50,54 +122,22 @@ auto OnRegister = [&]() { DataProcRegister reg; - if (!reg.ParseFromString(msg.body())) { - return; + if (reg.ParseFromString(msg.body()) && reg.has_proc()) { + center->Register(*reg.mutable_proc(), src_mq, reg.topics().begin(), reg.topics().end()); } - ProcInfo pi; - pi.server_mqid_ = src_mq; - pi.proc_id_ = reg.proc().name(); - pi.ext_info_ = reg.proc().info(); - pi.timestamp_ = Now(); - - std::lock_guard<std::mutex> lock(mutex_); - for (auto &t : reg.topics()) { - topic_mq_[t] = pi.server_mqid_; - } - procs_[pi.proc_id_] = pi; }; auto OnHeartbeat = [&]() { DataProcHeartbeat hb; - if (!hb.ParseFromString(msg.body())) { - return; - } - - std::lock_guard<std::mutex> lock(mutex_); - auto pos = procs_.find(hb.proc().name()); - if (pos != procs_.end() && pos->second.server_mqid_ == src_mq) { // both name and mq should be the same. - pos->second.timestamp_ = Now(); - pos->second.ext_info_ = hb.proc().info(); + if (hb.ParseFromString(msg.body()) && hb.has_proc()) { + center->Heartbeat(*hb.mutable_proc(), src_mq); } }; auto OnQueryTopic = [&]() { DataProcQueryTopic query; - if (!query.ParseFromString(msg.body())) { - return; - } - - std::string dest; - auto FindDest = [&]() { - std::lock_guard<std::mutex> lock(mutex_); - auto pos = topic_mq_.find(query.topic()); - if (pos != topic_mq_.end()) { - dest = pos->second; - return true; - } else { - return false; - } - }; - if (FindDest()) { + NodeCenter::ProcAddr dest; + if (query.ParseFromString(msg.body()) && center->QueryTopic(query.topic(), dest)) { MQId remote; memcpy(&remote, msg.route().rbegin()->mq_id().data(), sizeof(remote)); MsgI imsg; diff --git a/src/reqrep_center.h b/src/reqrep_center.h index 2ca7295..6473841 100644 --- a/src/reqrep_center.h +++ b/src/reqrep_center.h @@ -20,9 +20,6 @@ #include "defs.h" #include "socket.h" -#include <chrono> -#include <mutex> -#include <set> class ReqRepCenter { @@ -35,18 +32,6 @@ }; Socket socket_; ShmSocket::Shm &shm() { return socket_.shm(); } - struct ProcInfo { - std::string proc_id_; // unique name - std::string server_mqid_; - std::string ext_info_; // maybe json. - uint64_t timestamp_ = 0; - }; - - typedef std::string Dests; - - std::mutex mutex_; - std::unordered_map<std::string, Dests> topic_mq_; - std::unordered_map<std::string, ProcInfo> procs_; public: ReqRepCenter(ShmSocket::Shm &shm) : diff --git a/utest/utest.cpp b/utest/utest.cpp index 54c6d6f..b4aa760 100644 --- a/utest/utest.cpp +++ b/utest/utest.cpp @@ -151,6 +151,17 @@ bus.Stop(); } +namespace +{ +struct C { + C() { printf("+C\n"); } + C(const C &c) { printf("+C(const C&)\n"); } + void F() { printf("C::F()\n"); } + ~C() { printf("-C\n"); } + char arr[100]; +}; +int F(C &c) { return printf(":::::::::::::F()\n"); } +} // namespace BOOST_AUTO_TEST_CASE(ReqRepTest) { @@ -182,8 +193,8 @@ auto Server = [&](const std::string &name, const std::vector<std::string> &topics) { SocketReply server(shm); ProcInfo info; + info.set_id(name); info.set_name(name); - info.set_info(name); if (!server.Register(info, topics, 100)) { printf("register failed\n"); } -- Gitblit v1.8.0