proto/source/bhome_msg.proto | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/bh_util.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/msg.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/reqrep.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/reqrep.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/reqrep_center.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/reqrep_center.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
utest/utest.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
proto/source/bhome_msg.proto
@@ -4,13 +4,31 @@ package bhome.msg; // message format : header(BHMsgHead) + body(variable types) message BHAddress { bytes mq_id = 1; // mqid, uuid bytes ip = 2; // int32 port = 3; } message BHMsg { message ProcInfo { bytes id = 1; bytes name = 2; bytes public_info = 3; bytes private_info = 4; } message BHMsgHead { bytes msg_id = 1; repeated BHAddress route = 2; // for reply and proxy. int64 timestamp = 3; int32 type = 4; ProcInfo proc = 5; bytes topic = 6; // for request route } message BHMsg { // deprecated bytes msg_id = 1; int64 timestamp = 2; int32 type = 3; @@ -50,12 +68,6 @@ bytes data = 1; } message ProcInfo { bytes name = 1; bytes info = 2; } message DataProcRegister { ProcInfo proc = 1; @@ -74,3 +86,7 @@ message DataProcQueryTopicReply { BHAddress address = 1; } service TopicRequestReplyService { rpc Request (DataRequest) returns (DataReply); } src/bh_util.h
@@ -19,6 +19,7 @@ #define BH_UTIL_SOXWOK67 #include <functional> #include <mutex> #include <stdint.h> inline uint16_t Get8(const void *p) @@ -104,6 +105,44 @@ } }; template <class D, class M, class G = std::unique_lock<M>> class SyncedPtr { G lock_; D *p_ = nullptr; public: SyncedPtr(M &mtx, D &data) : lock_(mtx), p_(&data) {} SyncedPtr(SyncedPtr &&a) { lock_.swap(a.lock_); std::swap(p_, a.p_); } D *operator->() const { return p_; } D &operator*() const { return *p_; } }; template <class T, class Mutex = std::mutex, class Lock = std::unique_lock<Mutex>> class Synced { typedef T Data; Mutex mutex_; Data data_; typedef SyncedPtr<Data, Mutex, Lock> Ptr; public: template <class... P> explicit Synced(const P &...p) : data_(p...) {} Ptr operator->() { return Ptr(mutex_, data_); } auto Apply(const auto &f) { Lock lk(mutex_); return f(data_); } }; // macro helper #define JOIN_IMPL(a, b) a##b #define JOIN(a, b) JOIN_IMPL(a, b) src/msg.h
@@ -69,6 +69,7 @@ BHMsg MakeUnsub(const MQId &client, const std::vector<std::string> &topics); BHMsg MakePub(const std::string &topic, const void *data, const size_t size); // message content layout: header_size + header + data_size + data class MsgI { private: src/reqrep.cpp
@@ -155,8 +155,7 @@ bool SocketRequest::QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms) { if (tmp_cache_.first == topic) { addr = tmp_cache_.second; if (topic_cache_.Find(topic, addr)) { return true; } @@ -167,9 +166,12 @@ DataProcQueryTopicReply reply; if (reply.ParseFromString(result.body())) { addr = reply.address(); tmp_cache_.first = topic; tmp_cache_.second = addr; return !addr.mq_id().empty(); if (addr.mq_id().empty()) { return false; } else { topic_cache_.Update(topic, addr); return true; } } } } else { src/reqrep.h
@@ -18,6 +18,7 @@ #ifndef REQREP_ACEH09NK #define REQREP_ACEH09NK #include "bh_util.h" #include "defs.h" #include "msg.h" #include "socket.h" @@ -58,7 +59,43 @@ bool QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms); std::unordered_map<std::string, RecvCB> async_cbs_; std::pair<std::string, bhome::msg::BHAddress> tmp_cache_; typedef bhome_msg::BHAddress Address; class TopicCache { class Impl { typedef std::unordered_map<std::string, Address> Store; Store store_; public: bool Find(const std::string &topic, Address &addr) { auto pos = store_.find(topic); if (pos != store_.end()) { addr = pos->second; return true; } else { return false; } } bool Update(const std::string &topic, const Address &addr) { store_[topic] = addr; return true; } }; Synced<Impl> impl_; // Impl &impl() // { // thread_local Impl impl; // return impl; // } public: bool Find(const std::string &topic, Address &addr) { return impl_->Find(topic, addr); } bool Update(const std::string &topic, const Address &addr) { return impl_->Update(topic, addr); } }; TopicCache topic_cache_; }; class SocketReply : private ShmSocket src/reqrep_center.cpp
@@ -17,24 +17,96 @@ */ #include "reqrep_center.h" #include "bh_util.h" using namespace bhome_shm; #include "msg.h" #include <chrono> #include <memory> #include <mutex> #include <unordered_map> struct A { void F(int){}; }; using namespace bhome_shm; namespace { inline uint64_t Now() { time_t t; return time(&t); } auto Now = []() { time_t t; return time(&t); }; class NodeCenter { public: typedef std::string ProcAddr; typedef bhome::msg::ProcInfo ProcInfo; template <class Iter> bool Register(ProcInfo &info, const ProcAddr &src_mq, Iter topics_begin, Iter topics_end) { try { Node node(new NodeInfo); node->addr_ = src_mq; node->proc_.Swap(&info); node->state_.timestamp_ = Now(); nodes_[node->proc_.id()] = node; for (auto it = topics_begin; it != topics_end; ++it) { topic_map_[*it] = node; } return true; } catch (...) { return false; } } void Heartbeat(ProcInfo &info, const ProcAddr &src_mq) { auto pos = nodes_.find(info.name()); if (pos != nodes_.end() && pos->second->addr_ == src_mq) { // both name and mq should be the same. NodeInfo &ni = *pos->second; ni.state_.timestamp_ = Now(); if (!info.public_info().empty()) { ni.proc_.set_public_info(info.public_info()); } if (!info.private_info().empty()) { ni.proc_.set_private_info(info.private_info()); } } } bool QueryTopic(const std::string &topic, ProcAddr &addr) { auto pos = topic_map_.find(topic); if (pos != topic_map_.end()) { Node node(pos->second.lock()); if (node) { addr = node->addr_; return true; } else { // dead, remove record. topic_map_.erase(pos); return false; } } else { return false; } } private: struct ProcState { time_t timestamp_ = 0; uint32_t flag_ = 0; // reserved }; typedef std::string ProcId; struct NodeInfo { ProcState state_; // state ProcAddr addr_; // registered_mqid. 
ProcInfo proc_; // }; typedef std::shared_ptr<NodeInfo> Node; typedef std::weak_ptr<NodeInfo> WeakNode; std::unordered_map<std::string, WeakNode> topic_map_; std::unordered_map<ProcId, Node> nodes_; }; } // namespace bool ReqRepCenter::Start(const int nworker) { auto onRecv = [&](BHMsg &msg) { auto center_ptr = std::make_shared<Synced<NodeCenter>>(); auto onRecv = [center_ptr, this](BHMsg &msg) { auto &center = *center_ptr; #ifndef NDEBUG static std::atomic<time_t> last(0); time_t now = 0; @@ -50,54 +122,22 @@ auto OnRegister = [&]() { DataProcRegister reg; if (!reg.ParseFromString(msg.body())) { return; if (reg.ParseFromString(msg.body()) && reg.has_proc()) { center->Register(*reg.mutable_proc(), src_mq, reg.topics().begin(), reg.topics().end()); } ProcInfo pi; pi.server_mqid_ = src_mq; pi.proc_id_ = reg.proc().name(); pi.ext_info_ = reg.proc().info(); pi.timestamp_ = Now(); std::lock_guard<std::mutex> lock(mutex_); for (auto &t : reg.topics()) { topic_mq_[t] = pi.server_mqid_; } procs_[pi.proc_id_] = pi; }; auto OnHeartbeat = [&]() { DataProcHeartbeat hb; if (!hb.ParseFromString(msg.body())) { return; } std::lock_guard<std::mutex> lock(mutex_); auto pos = procs_.find(hb.proc().name()); if (pos != procs_.end() && pos->second.server_mqid_ == src_mq) { // both name and mq should be the same. 
pos->second.timestamp_ = Now(); pos->second.ext_info_ = hb.proc().info(); if (hb.ParseFromString(msg.body()) && hb.has_proc()) { center->Heartbeat(*hb.mutable_proc(), src_mq); } }; auto OnQueryTopic = [&]() { DataProcQueryTopic query; if (!query.ParseFromString(msg.body())) { return; } std::string dest; auto FindDest = [&]() { std::lock_guard<std::mutex> lock(mutex_); auto pos = topic_mq_.find(query.topic()); if (pos != topic_mq_.end()) { dest = pos->second; return true; } else { return false; } }; if (FindDest()) { NodeCenter::ProcAddr dest; if (query.ParseFromString(msg.body()) && center->QueryTopic(query.topic(), dest)) { MQId remote; memcpy(&remote, msg.route().rbegin()->mq_id().data(), sizeof(remote)); MsgI imsg; src/reqrep_center.h
@@ -20,9 +20,6 @@ #include "defs.h" #include "socket.h" #include <chrono> #include <mutex> #include <set> class ReqRepCenter { @@ -35,18 +32,6 @@ }; Socket socket_; ShmSocket::Shm &shm() { return socket_.shm(); } struct ProcInfo { std::string proc_id_; // unique name std::string server_mqid_; std::string ext_info_; // maybe json. uint64_t timestamp_ = 0; }; typedef std::string Dests; std::mutex mutex_; std::unordered_map<std::string, Dests> topic_mq_; std::unordered_map<std::string, ProcInfo> procs_; public: ReqRepCenter(ShmSocket::Shm &shm) : utest/utest.cpp
@@ -151,6 +151,17 @@ bus.Stop(); } namespace { struct C { C() { printf("+C\n"); } C(const C &c) { printf("+C(const C&)\n"); } void F() { printf("C::F()\n"); } ~C() { printf("-C\n"); } char arr[100]; }; int F(C &c) { return printf(":::::::::::::F()\n"); } } // namespace BOOST_AUTO_TEST_CASE(ReqRepTest) { @@ -182,8 +193,8 @@ auto Server = [&](const std::string &name, const std::vector<std::string> &topics) { SocketReply server(shm); ProcInfo info; info.set_id(name); info.set_name(name); info.set_info(name); if (!server.Register(info, topics, 100)) { printf("register failed\n"); }