From dc12826dd61ce18fac3a9561c5843d30a0cf9660 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 02 四月 2021 15:48:53 +0800
Subject: [PATCH] add request topic cache; refactor req/rep center.
---
src/reqrep_center.cpp | 136 +++++++++++++++++---------
src/reqrep.h | 39 +++++++
src/bh_util.h | 39 +++++++
src/msg.h | 1
src/reqrep_center.h | 15 ---
proto/source/bhome_msg.proto | 30 ++++-
utest/utest.cpp | 13 ++
src/reqrep.cpp | 12 +-
8 files changed, 208 insertions(+), 77 deletions(-)
diff --git a/proto/source/bhome_msg.proto b/proto/source/bhome_msg.proto
index 149d8ee..a8e5073 100644
--- a/proto/source/bhome_msg.proto
+++ b/proto/source/bhome_msg.proto
@@ -4,13 +4,31 @@
package bhome.msg;
+// message format : header(BHMsgHead) + body(variable types)
message BHAddress {
bytes mq_id = 1; // mqid, uuid
bytes ip = 2; //
int32 port = 3;
}
-message BHMsg {
+message ProcInfo
+{
+ bytes id = 1;
+ bytes name = 2;
+ bytes public_info = 3;
+ bytes private_info = 4;
+}
+
+message BHMsgHead {
+ bytes msg_id = 1;
+ repeated BHAddress route = 2; // for reply and proxy.
+ int64 timestamp = 3;
+ int32 type = 4;
+ ProcInfo proc = 5;
+ bytes topic = 6; // for request route
+}
+
+message BHMsg { // deprecated
bytes msg_id = 1;
int64 timestamp = 2;
int32 type = 3;
@@ -50,12 +68,6 @@
bytes data = 1;
}
-message ProcInfo
-{
- bytes name = 1;
- bytes info = 2;
-}
-
message DataProcRegister
{
ProcInfo proc = 1;
@@ -74,3 +86,7 @@
message DataProcQueryTopicReply {
BHAddress address = 1;
}
+
+service TopicRequestReplyService {
+ rpc Request (DataRequest) returns (DataReply);
+}
\ No newline at end of file
diff --git a/src/bh_util.h b/src/bh_util.h
index b5dc45e..bc48578 100644
--- a/src/bh_util.h
+++ b/src/bh_util.h
@@ -19,6 +19,7 @@
#define BH_UTIL_SOXWOK67
#include <functional>
+#include <mutex>
#include <stdint.h>
inline uint16_t Get8(const void *p)
@@ -104,6 +105,44 @@
}
};
+template <class D, class M, class G = std::unique_lock<M>>
+class SyncedPtr
+{
+ G lock_;
+ D *p_ = nullptr;
+
+public:
+ SyncedPtr(M &mtx, D &data) :
+ lock_(mtx), p_(&data) {}
+ SyncedPtr(SyncedPtr &&a)
+ {
+ lock_.swap(a.lock_);
+ std::swap(p_, a.p_);
+ }
+ D *operator->() const { return p_; }
+ D &operator*() const { return *p_; }
+};
+
+template <class T, class Mutex = std::mutex, class Lock = std::unique_lock<Mutex>>
+class Synced
+{
+ typedef T Data;
+ Mutex mutex_;
+ Data data_;
+ typedef SyncedPtr<Data, Mutex, Lock> Ptr;
+
+public:
+ template <class... P>
+ explicit Synced(const P &...p) :
+ data_(p...) {}
+ Ptr operator->() { return Ptr(mutex_, data_); }
+ auto Apply(const auto &f)
+ {
+ Lock lk(mutex_);
+ return f(data_);
+ }
+};
+
// macro helper
#define JOIN_IMPL(a, b) a##b
#define JOIN(a, b) JOIN_IMPL(a, b)
diff --git a/src/msg.h b/src/msg.h
index 8c345fd..30b3208 100644
--- a/src/msg.h
+++ b/src/msg.h
@@ -69,6 +69,7 @@
BHMsg MakeUnsub(const MQId &client, const std::vector<std::string> &topics);
BHMsg MakePub(const std::string &topic, const void *data, const size_t size);
+// message content layout: header_size + header + data_size + data
class MsgI
{
private:
diff --git a/src/reqrep.cpp b/src/reqrep.cpp
index bed6496..79ff892 100644
--- a/src/reqrep.cpp
+++ b/src/reqrep.cpp
@@ -155,8 +155,7 @@
bool SocketRequest::QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms)
{
- if (tmp_cache_.first == topic) {
- addr = tmp_cache_.second;
+ if (topic_cache_.Find(topic, addr)) {
return true;
}
@@ -167,9 +166,12 @@
DataProcQueryTopicReply reply;
if (reply.ParseFromString(result.body())) {
addr = reply.address();
- tmp_cache_.first = topic;
- tmp_cache_.second = addr;
- return !addr.mq_id().empty();
+ if (addr.mq_id().empty()) {
+ return false;
+ } else {
+ topic_cache_.Update(topic, addr);
+ return true;
+ }
}
}
} else {
diff --git a/src/reqrep.h b/src/reqrep.h
index 2971403..e8a38f7 100644
--- a/src/reqrep.h
+++ b/src/reqrep.h
@@ -18,6 +18,7 @@
#ifndef REQREP_ACEH09NK
#define REQREP_ACEH09NK
+#include "bh_util.h"
#include "defs.h"
#include "msg.h"
#include "socket.h"
@@ -58,7 +59,43 @@
bool QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
std::unordered_map<std::string, RecvCB> async_cbs_;
- std::pair<std::string, bhome::msg::BHAddress> tmp_cache_;
+ typedef bhome_msg::BHAddress Address;
+ class TopicCache
+ {
+ class Impl
+ {
+ typedef std::unordered_map<std::string, Address> Store;
+ Store store_;
+
+ public:
+ bool Find(const std::string &topic, Address &addr)
+ {
+ auto pos = store_.find(topic);
+ if (pos != store_.end()) {
+ addr = pos->second;
+ return true;
+ } else {
+ return false;
+ }
+ }
+ bool Update(const std::string &topic, const Address &addr)
+ {
+ store_[topic] = addr;
+ return true;
+ }
+ };
+ Synced<Impl> impl_;
+ // Impl &impl()
+ // {
+ // thread_local Impl impl;
+ // return impl;
+ // }
+
+ public:
+ bool Find(const std::string &topic, Address &addr) { return impl_->Find(topic, addr); }
+ bool Update(const std::string &topic, const Address &addr) { return impl_->Update(topic, addr); }
+ };
+ TopicCache topic_cache_;
};
class SocketReply : private ShmSocket
diff --git a/src/reqrep_center.cpp b/src/reqrep_center.cpp
index 5f1873e..0b6ddea 100644
--- a/src/reqrep_center.cpp
+++ b/src/reqrep_center.cpp
@@ -17,24 +17,96 @@
*/
#include "reqrep_center.h"
#include "bh_util.h"
-using namespace bhome_shm;
+#include "msg.h"
+#include <chrono>
+#include <memory>
+#include <mutex>
+#include <unordered_map>
-struct A {
- void F(int){};
-};
+using namespace bhome_shm;
namespace
{
-inline uint64_t Now()
-{
- time_t t;
- return time(&t);
-}
+auto Now = []() { time_t t; return time(&t); };
+class NodeCenter
+{
+public:
+ typedef std::string ProcAddr;
+ typedef bhome::msg::ProcInfo ProcInfo;
+
+ template <class Iter>
+ bool Register(ProcInfo &info, const ProcAddr &src_mq, Iter topics_begin, Iter topics_end)
+ {
+ try {
+ Node node(new NodeInfo);
+ node->addr_ = src_mq;
+ node->proc_.Swap(&info);
+ node->state_.timestamp_ = Now();
+ nodes_[node->proc_.id()] = node;
+ for (auto it = topics_begin; it != topics_end; ++it) {
+ topic_map_[*it] = node;
+ }
+ return true;
+ } catch (...) {
+ return false;
+ }
+ }
+ void Heartbeat(ProcInfo &info, const ProcAddr &src_mq)
+ {
+ auto pos = nodes_.find(info.name());
+ if (pos != nodes_.end() && pos->second->addr_ == src_mq) { // both name and mq should be the same.
+ NodeInfo &ni = *pos->second;
+ ni.state_.timestamp_ = Now();
+ if (!info.public_info().empty()) {
+ ni.proc_.set_public_info(info.public_info());
+ }
+ if (!info.private_info().empty()) {
+ ni.proc_.set_private_info(info.private_info());
+ }
+ }
+ }
+ bool QueryTopic(const std::string &topic, ProcAddr &addr)
+ {
+ auto pos = topic_map_.find(topic);
+ if (pos != topic_map_.end()) {
+ Node node(pos->second.lock());
+ if (node) {
+ addr = node->addr_;
+ return true;
+ } else { // dead, remove record.
+ topic_map_.erase(pos);
+ return false;
+ }
+ } else {
+ return false;
+ }
+ }
+
+private:
+ struct ProcState {
+ time_t timestamp_ = 0;
+ uint32_t flag_ = 0; // reserved
+ };
+ typedef std::string ProcId;
+ struct NodeInfo {
+ ProcState state_; // state
+ ProcAddr addr_; // registered_mqid.
+ ProcInfo proc_; //
+ };
+ typedef std::shared_ptr<NodeInfo> Node;
+ typedef std::weak_ptr<NodeInfo> WeakNode;
+ std::unordered_map<std::string, WeakNode> topic_map_;
+ std::unordered_map<ProcId, Node> nodes_;
+};
} // namespace
+
bool ReqRepCenter::Start(const int nworker)
{
- auto onRecv = [&](BHMsg &msg) {
+ auto center_ptr = std::make_shared<Synced<NodeCenter>>();
+ auto onRecv = [center_ptr, this](BHMsg &msg) {
+ auto ¢er = *center_ptr;
+
#ifndef NDEBUG
static std::atomic<time_t> last(0);
time_t now = 0;
@@ -50,54 +122,22 @@
auto OnRegister = [&]() {
DataProcRegister reg;
- if (!reg.ParseFromString(msg.body())) {
- return;
+ if (reg.ParseFromString(msg.body()) && reg.has_proc()) {
+ center->Register(*reg.mutable_proc(), src_mq, reg.topics().begin(), reg.topics().end());
}
- ProcInfo pi;
- pi.server_mqid_ = src_mq;
- pi.proc_id_ = reg.proc().name();
- pi.ext_info_ = reg.proc().info();
- pi.timestamp_ = Now();
-
- std::lock_guard<std::mutex> lock(mutex_);
- for (auto &t : reg.topics()) {
- topic_mq_[t] = pi.server_mqid_;
- }
- procs_[pi.proc_id_] = pi;
};
auto OnHeartbeat = [&]() {
DataProcHeartbeat hb;
- if (!hb.ParseFromString(msg.body())) {
- return;
- }
-
- std::lock_guard<std::mutex> lock(mutex_);
- auto pos = procs_.find(hb.proc().name());
- if (pos != procs_.end() && pos->second.server_mqid_ == src_mq) { // both name and mq should be the same.
- pos->second.timestamp_ = Now();
- pos->second.ext_info_ = hb.proc().info();
+ if (hb.ParseFromString(msg.body()) && hb.has_proc()) {
+ center->Heartbeat(*hb.mutable_proc(), src_mq);
}
};
auto OnQueryTopic = [&]() {
DataProcQueryTopic query;
- if (!query.ParseFromString(msg.body())) {
- return;
- }
-
- std::string dest;
- auto FindDest = [&]() {
- std::lock_guard<std::mutex> lock(mutex_);
- auto pos = topic_mq_.find(query.topic());
- if (pos != topic_mq_.end()) {
- dest = pos->second;
- return true;
- } else {
- return false;
- }
- };
- if (FindDest()) {
+ NodeCenter::ProcAddr dest;
+ if (query.ParseFromString(msg.body()) && center->QueryTopic(query.topic(), dest)) {
MQId remote;
memcpy(&remote, msg.route().rbegin()->mq_id().data(), sizeof(remote));
MsgI imsg;
diff --git a/src/reqrep_center.h b/src/reqrep_center.h
index 2ca7295..6473841 100644
--- a/src/reqrep_center.h
+++ b/src/reqrep_center.h
@@ -20,9 +20,6 @@
#include "defs.h"
#include "socket.h"
-#include <chrono>
-#include <mutex>
-#include <set>
class ReqRepCenter
{
@@ -35,18 +32,6 @@
};
Socket socket_;
ShmSocket::Shm &shm() { return socket_.shm(); }
- struct ProcInfo {
- std::string proc_id_; // unique name
- std::string server_mqid_;
- std::string ext_info_; // maybe json.
- uint64_t timestamp_ = 0;
- };
-
- typedef std::string Dests;
-
- std::mutex mutex_;
- std::unordered_map<std::string, Dests> topic_mq_;
- std::unordered_map<std::string, ProcInfo> procs_;
public:
ReqRepCenter(ShmSocket::Shm &shm) :
diff --git a/utest/utest.cpp b/utest/utest.cpp
index 54c6d6f..b4aa760 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -151,6 +151,17 @@
bus.Stop();
}
+namespace
+{
+struct C {
+ C() { printf("+C\n"); }
+ C(const C &c) { printf("+C(const C&)\n"); }
+ void F() { printf("C::F()\n"); }
+ ~C() { printf("-C\n"); }
+ char arr[100];
+};
+int F(C &c) { return printf(":::::::::::::F()\n"); }
+} // namespace
BOOST_AUTO_TEST_CASE(ReqRepTest)
{
@@ -182,8 +193,8 @@
auto Server = [&](const std::string &name, const std::vector<std::string> &topics) {
SocketReply server(shm);
ProcInfo info;
+ info.set_id(name);
info.set_name(name);
- info.set_info(name);
if (!server.Register(info, topics, 100)) {
printf("register failed\n");
}
--
Gitblit v1.8.0