From 4e5cb7960ce4e7e66d5190be67426aeca8b55c3d Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 09 四月 2021 18:45:08 +0800
Subject: [PATCH] add heartbeat, not tested yet.
---
src/shm.h | 3
src/socket.h | 1
src/proto.h | 1
proto/source/bhome_msg.proto | 10 ++
utest/utest.cpp | 14 ++-
src/shm_queue.cpp | 12 +-
src/topic_node.cpp | 21 +++-
src/center.cpp | 137 ++++++++++++++++++++++-----------
proto/source/bhome_msg_api.proto | 18 +---
utest/util.h | 6
src/shm_queue.h | 1
src/center.h | 6 +
src/topic_node.h | 6
proto/cpp/CMakeLists.txt | 1
14 files changed, 151 insertions(+), 86 deletions(-)
diff --git a/proto/cpp/CMakeLists.txt b/proto/cpp/CMakeLists.txt
index f8d07f7..abf2098 100644
--- a/proto/cpp/CMakeLists.txt
+++ b/proto/cpp/CMakeLists.txt
@@ -9,4 +9,3 @@
add_library(${Target} STATIC ${PROTO_SRCS})
target_link_libraries(${Target} libprotobuf-lite.a)
-
diff --git a/proto/source/bhome_msg.proto b/proto/source/bhome_msg.proto
index 5056a26..11ff5a2 100644
--- a/proto/source/bhome_msg.proto
+++ b/proto/source/bhome_msg.proto
@@ -62,6 +62,16 @@
}
+message MsgSubscribe {
+ MsgTopicList topics = 1;
+}
+message MsgUnsubscribe {
+ MsgTopicList topics = 1;
+}
+message MsgRegisterRPC {
+ MsgTopicList topics = 1;
+}
+
service TopicRPC {
rpc Query (MsgQueryTopic) returns (MsgQueryTopicReply);
rpc Request (MsgRequestTopic) returns (MsgQueryTopicReply);
diff --git a/proto/source/bhome_msg_api.proto b/proto/source/bhome_msg_api.proto
index 82b8115..a8e8545 100644
--- a/proto/source/bhome_msg_api.proto
+++ b/proto/source/bhome_msg_api.proto
@@ -20,16 +20,14 @@
bytes private_info = 4;
}
+message MsgTopicList {
+ repeated bytes topic_list = 1;
+}
+
+
message MsgPublish {
bytes topic = 1;
bytes data = 2;
-}
-
-message MsgSubscribe {
- repeated bytes topics = 1;
-}
-message MsgUnsubscribe {
- repeated bytes topics = 1;
}
message MsgCommonReply {
@@ -49,11 +47,7 @@
message MsgRegister
{
ProcInfo proc = 1;
-}
-
-message MsgRegisterRPC
-{
- repeated bytes topics = 1;
+ repeated BHAddress addrs = 2;
}
message MsgHeartbeat
diff --git a/src/center.cpp b/src/center.cpp
index 71c85c3..d2aad0a 100644
--- a/src/center.cpp
+++ b/src/center.cpp
@@ -19,7 +19,11 @@
#include "bh_util.h"
#include "defs.h"
#include "shm.h"
+#include <chrono>
#include <set>
+
+using namespace std::chrono;
+using namespace std::chrono_literals;
using namespace bhome_shm;
using namespace bhome_msg;
@@ -28,7 +32,8 @@
namespace
{
-auto Now = []() { time_t t; return time(&t); };
+typedef steady_clock::time_point TimePoint;
+inline TimePoint Now() { return steady_clock::now(); };
//TODO check proc_id
class NodeCenter
@@ -37,24 +42,39 @@
typedef std::string ProcId;
typedef std::string Address;
typedef bhome::msg::ProcInfo ProcInfo;
+ typedef std::function<void(Address const &)> Cleaner;
private:
enum {
- kStateInvalid = 0,
- kStateNormal = 1,
- kStateNoRespond = 2,
- kStateOffline = 3,
+ kStateInvalid,
+ kStateNormal,
+ kStateOffline,
+ kStateKillme,
};
struct ProcState {
- time_t timestamp_ = 0;
+ TimePoint timestamp_;
uint32_t flag_ = 0; // reserved
+ void UpdateState(TimePoint now)
+ {
+ const auto kOfflineTime = 60 * 10s;
+ const auto kKillTime = 60 * 20s;
+
+ auto diff = now - timestamp_;
+ if (diff < kOfflineTime) {
+ flag_ = kStateNormal;
+ } else if (diff < kKillTime) {
+ flag_ = kStateOffline;
+ } else {
+ flag_ = kStateKillme;
+ }
+ }
};
typedef std::unordered_map<Address, std::set<Topic>> AddressTopics;
struct NodeInfo {
ProcState state_; // state
- Address addr_; // registered_mqid.
+ std::set<Address> addrs_; // registered mqs
ProcInfo proc_; //
AddressTopics services_; // address: topics
AddressTopics subscriptions_; // address: topics
@@ -67,13 +87,14 @@
WeakNode weak_node_;
bool operator<(const TopicDest &a) const { return mq_ < a.mq_; }
};
- const std::string &SrcAddr(const BHMsgHead &head) { return head.route(0).mq_id(); }
+ inline const std::string &SrcAddr(const BHMsgHead &head) { return head.route(0).mq_id(); }
+ inline bool MatchAddr(std::set<Address> const &addrs, const Address &addr) { return addrs.find(addr) != addrs.end(); }
public:
typedef std::set<TopicDest> Clients;
- NodeCenter(const std::string &id = "#Center") :
- id_(id) {}
+ NodeCenter(const std::string &id, const Cleaner &cleaner) :
+ id_(id), cleaner_(cleaner) {}
const std::string &id() const { return id_; } // no need to lock.
//TODO maybe just return serialized string.
@@ -85,7 +106,10 @@
try {
Node node(new NodeInfo);
- node->addr_ = SrcAddr(head);
+ node->addrs_.insert(SrcAddr(head));
+ for (auto &addr : msg.addrs()) {
+ node->addrs_.insert(addr.mq_id());
+ }
node->proc_.Swap(msg.mutable_proc());
node->state_.timestamp_ = Now();
node->state_.flag_ = kStateNormal;
@@ -95,37 +119,17 @@
return MakeReply(eError, "register node error.");
}
}
- template <class OnSuccess, class OnError>
- auto HandleMsg(const BHMsgHead &head, OnSuccess onOk, OnError onErr)
- {
- auto pos = nodes_.find(head.proc_id());
- if (pos == nodes_.end()) {
- return onErr(eNotRegistered, "Node is not registered.");
- } else {
- auto node = pos->second;
- if (head.type() == kMsgTypeHeartbeat && node->addr_ != SrcAddr(head)) {
- return onErr(eAddressNotMatch, "Node address error.");
- } else if (!Valid(*node)) {
- return onErr(eNoRespond, "Node is not alive.");
- } else {
- return onOk(node);
- }
- }
- }
template <class Reply, class Func>
Reply HandleMsg(const BHMsgHead &head, Func const &op)
{
try {
- auto onErr = [](const ErrorCode ec, const std::string &str) { return MakeReply<Reply>(ec, str); };
- return HandleMsg(head, op, onErr);
-
auto pos = nodes_.find(head.proc_id());
if (pos == nodes_.end()) {
return MakeReply<Reply>(eNotRegistered, "Node is not registered.");
} else {
auto node = pos->second;
- if (node->addr_ != SrcAddr(head)) {
+ if (!MatchAddr(node->addrs_, SrcAddr(head))) {
return MakeReply<Reply>(eAddressNotMatch, "Node address error.");
} else if (!Valid(*node)) {
return MakeReply<Reply>(eNoRespond, "Node is not alive.");
@@ -149,9 +153,10 @@
return HandleMsg(
head, [&](Node node) -> MsgCommonReply {
auto &src = SrcAddr(head);
- node->services_[src].insert(msg.topics().begin(), msg.topics().end());
+ auto &topics = msg.topics().topic_list();
+ node->services_[src].insert(topics.begin(), topics.end());
TopicDest dest = {src, node};
- for (auto &topic : msg.topics()) {
+ for (auto &topic : topics) {
service_map_[topic].insert(dest);
}
return MakeReply(eSuccess);
@@ -163,6 +168,7 @@
return HandleMsg(head, [&](Node node) {
NodeInfo &ni = *node;
ni.state_.timestamp_ = Now();
+
auto &info = msg.proc();
if (!info.public_info().empty()) {
ni.proc_.set_public_info(info.public_info());
@@ -207,9 +213,10 @@
{
return HandleMsg(head, [&](Node node) {
auto &src = SrcAddr(head);
- node->subscriptions_[src].insert(msg.topics().begin(), msg.topics().end());
+ auto &topics = msg.topics().topic_list();
+ node->subscriptions_[src].insert(topics.begin(), topics.end());
TopicDest dest = {src, node};
- for (auto &topic : msg.topics()) {
+ for (auto &topic : topics) {
subscribe_map_[topic].insert(dest);
}
return MakeReply(eSuccess);
@@ -232,8 +239,9 @@
if (pos != node->subscriptions_.end()) {
const TopicDest &dest = {src, node};
+ auto &topics = msg.topics().topic_list();
// clear node sub records;
- for (auto &topic : msg.topics()) {
+ for (auto &topic : topics) {
pos->second.erase(topic);
RemoveSubTopicDestRecord(topic, dest);
}
@@ -284,7 +292,30 @@
return ret;
}
+ void OnTimer()
+ {
+ CheckNodes();
+ }
+
private:
+ void CheckNodes()
+ {
+ auto it = nodes_.begin();
+ while (it != nodes_.end()) {
+ auto &cli = *it->second;
+ cli.state_.UpdateState(Now());
+ if (cli.state_.flag_ == kStateKillme) {
+ if (cleaner_) {
+ for (auto &addr : cli.addrs_) {
+ cleaner_(addr);
+ }
+ }
+ it = nodes_.erase(it);
+ } else {
+ ++it;
+ }
+ }
+ }
bool Valid(const NodeInfo &node)
{
return node.state_.flag_ == kStateNormal;
@@ -300,6 +331,7 @@
std::unordered_map<Topic, Clients> service_map_;
std::unordered_map<Topic, Clients> subscribe_map_;
std::unordered_map<ProcId, Node> nodes_;
+ Cleaner cleaner_; // remove mqs.
};
template <class Body, class OnMsg, class Replyer>
@@ -330,9 +362,11 @@
msg, head, [&](auto &body) { return center->MsgTag(head, body); }, replyer); \
return true;
-bool InstallCenter()
+bool AddCenter(const std::string &id, const NodeCenter::Cleaner &cleaner)
{
- auto center_ptr = std::make_shared<Synced<NodeCenter>>();
+
+ auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner);
+
auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) {
return [&](auto &&rep_body) {
auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id()));
@@ -342,6 +376,11 @@
}
//TODO resend failed.
};
+ };
+
+ auto OnCenterIdle = [center_ptr](ShmSocket &socket) {
+ auto ¢er = *center_ptr;
+ center->OnTimer();
};
auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
@@ -357,6 +396,7 @@
}
};
+ auto OnBusIdle = [](ShmSocket &socket) {};
auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
auto ¢er = *center_ptr;
auto replyer = MakeReplyer(socket, head, center->id());
@@ -390,8 +430,8 @@
}
};
- BHCenter::Install("#center.reg", OnCenter, BHTopicCenterAddress(), 1000);
- BHCenter::Install("#center.bus", OnPubSub, BHTopicBusAddress(), 1000);
+ BHCenter::Install("#center.reg", OnCenter, OnCenterIdle, BHTopicCenterAddress(), 1000);
+ BHCenter::Install("#center.bus", OnPubSub, OnBusIdle, BHTopicBusAddress(), 1000);
return true;
}
@@ -412,19 +452,24 @@
return rec;
}
-bool BHCenter::Install(const std::string &name, MsgHandler handler, const std::string &mqid, const int mq_len)
+bool BHCenter::Install(const std::string &name, MsgHandler handler, IdleHandler idle, const std::string &mqid, const int mq_len)
{
- Centers()[name] = CenterInfo{name, handler, mqid, mq_len};
+ Centers()[name] = CenterInfo{name, handler, idle, mqid, mq_len};
return true;
}
-bool BHCenter::Install(const std::string &name, MsgHandler handler, const MQId &mqid, const int mq_len)
+bool BHCenter::Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId &mqid, const int mq_len)
{
- return Install(name, handler, std::string((const char *) &mqid, sizeof(mqid)), mq_len);
+ return Install(name, handler, idle, std::string((const char *) &mqid, sizeof(mqid)), mq_len);
}
BHCenter::BHCenter(Socket::Shm &shm)
{
- InstallCenter();
+ auto gc = [&](const std::string &id) {
+ auto r = ShmSocket::Remove(shm, *(MQId *) id.data());
+ printf("remove mq : %s\n", r ? "ok" : "failed");
+ };
+
+ AddCenter("#center", gc);
for (auto &kv : Centers()) {
auto &info = kv.second;
diff --git a/src/center.h b/src/center.h
index 920addd..aea0897 100644
--- a/src/center.h
+++ b/src/center.h
@@ -29,8 +29,9 @@
public:
typedef Socket::PartialRecvCB MsgHandler;
- static bool Install(const std::string &name, MsgHandler handler, const std::string &mqid, const int mq_len);
- static bool Install(const std::string &name, MsgHandler handler, const MQId &mqid, const int mq_len);
+ typedef Socket::IdleCB IdleHandler;
+ static bool Install(const std::string &name, MsgHandler handler, IdleHandler idle, const std::string &mqid, const int mq_len);
+ static bool Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId &mqid, const int mq_len);
BHCenter(Socket::Shm &shm);
BHCenter();
@@ -42,6 +43,7 @@
struct CenterInfo {
std::string name_;
MsgHandler handler_;
+ IdleHandler idle_;
std::string mqid_;
int mq_len_ = 0;
};
diff --git a/src/proto.h b/src/proto.h
index da3bde6..fff19ac 100644
--- a/src/proto.h
+++ b/src/proto.h
@@ -19,6 +19,7 @@
#define PROTO_UA9UWKL1
#include "bhome_msg.pb.h"
+#include "bhome_msg_api.pb.h"
using namespace bhome::msg;
diff --git a/src/shm.h b/src/shm.h
index 22a975b..28745dc 100644
--- a/src/shm.h
+++ b/src/shm.h
@@ -114,6 +114,7 @@
throw("Error: Not enough memory, can not allocate \"" + name_ + "\"");
}
}
+ static bool Remove(SharedMemory &shm, const std::string &name) { return shm.destroy<Data>(ObjName(name).c_str()); }
static Data *Find(SharedMemory &shm, const std::string &name) { return shm.Find<Data>(ObjName(name)); }
Data *Find(const std::string &name) { return Find(shm_, ObjName(name)); }
virtual ~ShmObject() {}
@@ -122,7 +123,7 @@
const Data *data() const { return pdata_; }
Data *operator->() { return data(); }
const Data *operator->() const { return data(); }
- bool Remove() { return shm_.destroy<Data>(ObjName(name_).c_str()); }
+ bool Remove() { return Remove(shm_, name_); }
};
} // namespace bhome_shm
diff --git a/src/shm_queue.cpp b/src/shm_queue.cpp
index 521f773..652ed5b 100644
--- a/src/shm_queue.cpp
+++ b/src/shm_queue.cpp
@@ -60,15 +60,13 @@
}
ShmMsgQueue::ShmMsgQueue(ShmType &segment, const int len) :
- ShmMsgQueue(NewId(), segment, len)
-{
-}
+ ShmMsgQueue(NewId(), segment, len) {}
-ShmMsgQueue::~ShmMsgQueue()
+ShmMsgQueue::~ShmMsgQueue() {}
+
+bool ShmMsgQueue::Remove(SharedMemory &shm, const MQId &id)
{
- // It's not safe to remove, others may still holder pointers and write to it.
- // TODO use smart_ptr or garbage collection.
- //Remove();
+ return Super::Remove(shm, MsgQIdToName(id));
}
bool ShmMsgQueue::Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend)
diff --git a/src/shm_queue.h b/src/shm_queue.h
index 88c13ec..20ff3dc 100644
--- a/src/shm_queue.h
+++ b/src/shm_queue.h
@@ -129,6 +129,7 @@
ShmMsgQueue(const MQId &id, ShmType &segment, const int len);
ShmMsgQueue(ShmType &segment, const int len);
~ShmMsgQueue();
+ static bool Remove(SharedMemory &shm, const MQId &id);
const MQId &Id() const { return id_; }
// bool Recv(MsgI &msg, BHMsgHead &head, const int timeout_ms);
diff --git a/src/socket.h b/src/socket.h
index f73bee5..ee25d81 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -56,6 +56,7 @@
ShmSocket(Shm &shm, const MQId &id, const int len);
ShmSocket(Shm &shm, const int len = 12);
~ShmSocket();
+ static bool Remove(SharedMemory &shm, const MQId &id) { return Queue::Remove(shm, id); }
const MQId &id() const { return mq().Id(); }
Shm &shm() { return shm_; }
// start recv.
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index d76c03a..5afec3f 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -92,9 +92,17 @@
SockNode().Stop();
}
-bool TopicNode::Register(const MsgRegister &body, MsgCommonReply &reply_body, const int timeout_ms)
+bool TopicNode::Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
{
auto &sock = SockNode();
+ MsgRegister body;
+ *body.mutable_proc() = proc;
+ auto AddId = [&](const MQId &id) { body.add_addrs()->set_mq_id(&id, sizeof(id)); };
+ AddId(SockNode().id());
+ AddId(SockServer().id());
+ AddId(SockClient().id());
+ AddId(SockSub().id());
+ AddId(SockPub().id());
auto head(InitMsgHead(GetType(body), body.proc().proc_id()));
AddRoute(head, sock.id());
@@ -110,10 +118,12 @@
return r;
}
-bool TopicNode::RegisterRPC(const MsgRegisterRPC &body, MsgCommonReply &reply_body, const int timeout_ms)
+bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
{
//TODO check registered
auto &sock = SockServer();
+ MsgRegisterRPC body;
+ body.mutable_topics()->Swap(&topics);
auto head(InitMsgHead(GetType(body), proc_id()));
AddRoute(head, sock.id());
@@ -361,14 +371,13 @@
// subscribe
-bool TopicNode::Subscribe(const std::vector<Topic> &topics, const int timeout_ms)
+bool TopicNode::Subscribe(MsgTopicList &topics, const int timeout_ms)
{
try {
auto &sock = SockSub();
MsgSubscribe sub;
- for (auto &topic : topics) {
- sub.add_topics(topic);
- }
+ sub.mutable_topics()->Swap(&topics);
+
BHMsgHead head(InitMsgHead(GetType(sub), proc_id()));
AddRoute(head, sock.id());
diff --git a/src/topic_node.h b/src/topic_node.h
index 34fe2ee..60497ad 100644
--- a/src/topic_node.h
+++ b/src/topic_node.h
@@ -39,12 +39,12 @@
void StopAll();
// topic node
- bool Register(const MsgRegister &body, MsgCommonReply &reply, const int timeout_ms);
- bool RegisterRPC(const MsgRegisterRPC &body, MsgCommonReply &reply, const int timeout_ms);
+ bool Register(ProcInfo &body, MsgCommonReply &reply, const int timeout_ms);
// topic rpc server
typedef std::function<bool(const std::string &topic, const std::string &data, std::string &reply)> OnRequest;
bool ServerStart(OnRequest const &cb, const int nworker = 2);
+ bool ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply, const int timeout_ms);
bool ServerRecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms);
bool ServerSendReply(void *src_info, const std::string &data, const int timeout_ms);
@@ -68,7 +68,7 @@
// subscribe
typedef std::function<void(const std::string &proc_id, const Topic &topic, const std::string &data)> TopicDataCB;
bool SubscribeStartWorker(const TopicDataCB &tdcb, int nworker = 2);
- bool Subscribe(const std::vector<Topic> &topics, const int timeout_ms);
+ bool Subscribe(MsgTopicList &topics, const int timeout_ms);
bool RecvSub(std::string &proc_id, Topic &topic, std::string &data, const int timeout_ms);
private:
diff --git a/utest/utest.cpp b/utest/utest.cpp
index f88eab9..a178fab 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -9,6 +9,7 @@
#include <string>
#include <thread>
#include <vector>
+
using namespace bhome_msg;
template <class A, class B>
@@ -90,8 +91,11 @@
const int timeout = 1000;
auto Sub = [&](int id, const std::vector<std::string> &topics) {
DemoNode client("client_" + std::to_string(id), shm);
-
- bool r = client.Subscribe(topics, timeout);
+ MsgTopicList tlist;
+ for (auto &t : topics) {
+ tlist.add_topic_list(t);
+ }
+ bool r = client.Subscribe(tlist, timeout);
if (!r) {
printf("client subscribe failed.\n");
}
@@ -227,12 +231,12 @@
};
server.ServerStart(onData);
- MsgRegisterRPC rpc;
+ MsgTopicList rpc;
for (auto &topic : topics) {
- rpc.add_topics(topic);
+ rpc.add_topic_list(topic);
}
MsgCommonReply reply_body;
- if (!server.RegisterRPC(rpc, reply_body, 100)) {
+ if (!server.ServerRegisterRPC(rpc, reply_body, 100)) {
printf("server register topic failed\n");
return;
}
diff --git a/utest/util.h b/utest/util.h
index 28b636e..aaa5189 100644
--- a/utest/util.h
+++ b/utest/util.h
@@ -114,11 +114,11 @@
TopicNode(shm), id_(id) { Init(); }
void Init()
{
- MsgRegister reg;
- reg.mutable_proc()->set_proc_id(id_);
+ ProcInfo proc;
+ proc.set_proc_id(id_);
MsgCommonReply reply_body;
- if (!Register(reg, reply_body, 1000)) {
+ if (!Register(proc, reply_body, 1000)) {
printf("node %s register failed\n", id_.c_str());
}
}
--
Gitblit v1.8.0