From 36e6a35a886252516f168b90f7a9a7c1c5177312 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期六, 08 五月 2021 15:57:01 +0800
Subject: [PATCH] center alloc node queue; node just find them.
---
src/socket.h | 15 +++--
box/center.cpp | 31 ++++++++-
src/shm_msg_queue.h | 1
src/socket.cpp | 5 +
src/topic_node.h | 2
src/topic_node.cpp | 56 +++++++++++-------
src/shm_msg_queue.cpp | 12 ++-
src/defs.cpp | 2
8 files changed, 85 insertions(+), 39 deletions(-)
diff --git a/box/center.cpp b/box/center.cpp
index c57d34d..d6ac804 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -102,22 +102,43 @@
// center name, no relative to shm.
const std::string &id() const { return id_; }
- void OnNodeInit(const int64_t msg)
+ void OnNodeInit(SharedMemory &shm, const int64_t msg)
{
MQId ssn = msg;
+ if (nodes_.find(ssn) != nodes_.end()) {
+ return; // ignore in exists.
+ }
+
auto UpdateRegInfo = [&](Node &node) {
for (int i = 0; i < 10; ++i) {
node->addrs_.insert(ssn + i);
}
node->state_.timestamp_ = NowSec() - offline_time_;
node->state_.UpdateState(NowSec(), offline_time_, kill_time_);
+
+ // create sockets.
+ try {
+ auto CreateSocket = [](SharedMemory &shm, const MQId id) {
+ ShmSocket tmp(shm, true, id, 16);
+ };
+ // alloc(-1), node, server, sub, request,
+ for (int i = -1; i < 4; ++i) {
+ CreateSocket(shm, ssn + i);
+ node->addrs_.insert(ssn + i);
+ }
+ return true;
+ } catch (...) {
+ return false;
+ }
};
Node node(new NodeInfo);
- UpdateRegInfo(node);
- nodes_[ssn] = node;
- LOG_INFO() << "new node ssn (" << ssn << ") init";
+ if (UpdateRegInfo(node)) {
+ nodes_[ssn] = node;
+ LOG_INFO() << "new node ssn (" << ssn << ") init";
+ }
}
+
MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg)
{
if (msg.proc().proc_id() != head.proc_id()) {
@@ -475,7 +496,7 @@
{
auto OnNodeInit = [center_ptr](ShmSocket &socket, MsgI &msg) {
auto ¢er = *center_ptr;
- center->OnNodeInit(msg.Offset());
+ center->OnNodeInit(socket.shm(), msg.Offset());
};
auto Nothing = [](ShmSocket &socket) {};
diff --git a/src/defs.cpp b/src/defs.cpp
index cc6f23b..6d688b2 100644
--- a/src/defs.cpp
+++ b/src/defs.cpp
@@ -50,7 +50,7 @@
MsgI::BindShm(shm);
typedef std::atomic<MQId> IdSrc;
IdSrc *psrc = shm.FindOrCreate<IdSrc>("shmqIdSrc0", 100000);
- return ShmMsgQueue::SetData(*psrc);
+ return psrc && ShmMsgQueue::SetData(*psrc);
}
void SetLastError(const int ec, const std::string &msg)
diff --git a/src/shm_msg_queue.cpp b/src/shm_msg_queue.cpp
index 38c5f1c..17558de 100644
--- a/src/shm_msg_queue.cpp
+++ b/src/shm_msg_queue.cpp
@@ -36,21 +36,23 @@
static auto &id = GetData();
return (++id) * 10;
}
-// ShmMsgQueue memory usage: (320 + 16*length) bytes, length >= 2
+
ShmMsgQueue::ShmMsgQueue(const MQId id, ShmType &segment, const int len) :
id_(id),
queue_(segment, MsgQIdToName(id_)) //, AdjustMQLength(len), segment.get_segment_manager())
{
}
-ShmMsgQueue::ShmMsgQueue(ShmType &segment, const int len) :
- id_(NewId()),
- queue_(segment, true, MsgQIdToName(id_)) //, AdjustMQLength(len), segment.get_segment_manager())
+ShmMsgQueue::ShmMsgQueue(const MQId id, const bool create_or_else_find, ShmType &segment, const int len) :
+ id_(id),
+ queue_(segment, create_or_else_find, MsgQIdToName(id_))
{
if (!queue_.IsOk()) {
- throw("error create msgq " + std::to_string(id_));
+ throw("error create/find msgq " + std::to_string(id_));
}
}
+ShmMsgQueue::ShmMsgQueue(ShmType &segment, const int len) :
+ ShmMsgQueue(NewId(), true, segment, len) {}
ShmMsgQueue::~ShmMsgQueue() {}
diff --git a/src/shm_msg_queue.h b/src/shm_msg_queue.h
index aff931c..f496f0f 100644
--- a/src/shm_msg_queue.h
+++ b/src/shm_msg_queue.h
@@ -38,6 +38,7 @@
static MQId NewId();
ShmMsgQueue(const MQId id, ShmType &segment, const int len);
+ ShmMsgQueue(const MQId id, const bool create_or_else_find, ShmType &segment, const int len);
ShmMsgQueue(ShmType &segment, const int len);
~ShmMsgQueue();
static bool Remove(SharedMemory &shm, const MQId id);
diff --git a/src/socket.cpp b/src/socket.cpp
index 9580529..6231579 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -29,6 +29,11 @@
{
Start();
}
+ShmSocket::ShmSocket(Shm &shm, const bool create_or_else_find, const MQId id, const int len) :
+ run_(false), mq_(id, create_or_else_find, shm, len)
+{
+ Start();
+}
ShmSocket::ShmSocket(bhome_shm::SharedMemory &shm, const int len) :
run_(false), mq_(shm, len)
{
diff --git a/src/socket.h b/src/socket.h
index 08a4b0a..981677f 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -48,6 +48,7 @@
typedef std::function<void(ShmSocket &sock)> IdleCB;
ShmSocket(Shm &shm, const MQId id, const int len);
+ ShmSocket(Shm &shm, const bool create_or_else_find, const MQId id, const int len);
ShmSocket(Shm &shm, const int len = 12);
~ShmSocket();
static bool Remove(SharedMemory &shm, const MQId id) { return Queue::Remove(shm, id); }
@@ -142,6 +143,7 @@
template <class... Rest>
bool SendImpl(const MQId remote, Rest &&...rest)
{
+ // TODO send alloc request, and pack later, higher bit means alloc?
send_buffer_.Append(remote, std::forward<decltype(rest)>(rest)...);
return true;
}
@@ -151,14 +153,15 @@
std::atomic<bool> run_;
Queue mq_;
- class AsyncCBs
+ template <class Key>
+ class CallbackRecords
{
- std::unordered_map<std::string, RecvCB> store_;
+ std::unordered_map<Key, RecvCB> store_;
public:
bool empty() const { return store_.empty(); }
- bool Store(const std::string &id, RecvCB &&cb) { return store_.emplace(id, std::move(cb)).second; }
- bool Pick(const std::string &id, RecvCB &cb)
+ bool Store(const Key &id, RecvCB &&cb) { return store_.emplace(id, std::move(cb)).second; }
+ bool Pick(const Key &id, RecvCB &cb)
{
auto pos = store_.find(id);
if (pos != store_.end()) {
@@ -171,9 +174,9 @@
}
};
- Synced<AsyncCBs> per_msg_cbs_;
+ Synced<CallbackRecords<std::string>> per_msg_cbs_;
+
SendQ send_buffer_;
- // Synced<SendQ> send_buffer_;
};
#endif // end of include guard: SOCKET_GWTJHBPO
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index 3883062..d8d6a42 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -23,6 +23,9 @@
using namespace std::chrono;
using namespace std::chrono_literals;
+const char *const kErrMsgNotInit = "BHome node NOT initialized.";
+const char *const kErrMsgNotRegistered = "BHome node NOT registered.";
+
namespace
{
inline void AddRoute(BHMsgHead &head, const MQId id) { head.add_route()->set_mq_id(id); }
@@ -63,14 +66,25 @@
MsgI msg;
msg.OffsetRef() = ssn_id_;
if (ShmMsgQueue::TrySend(shm(), BHInitAddress(), msg)) {
- sockets_.resize(eSockEnd);
- for (int i = eSockStart; i < eSockEnd; ++i) {
- sockets_[i].reset(new ShmSocket(shm_, ssn_id_ + i, kMqLen));
+
+ auto end_time = steady_clock::now() + 3s;
+ do {
+ try {
+ for (int i = eSockStart; i < eSockEnd; ++i) {
+ sockets_.emplace_back(new ShmSocket(shm_, false, ssn_id_ + i, kMqLen));
+ }
+ break;
+ } catch (...) {
+ sockets_.clear();
+ std::this_thread::sleep_for(100ms);
+ }
+ } while (steady_clock::now() < end_time);
+ if (!sockets_.empty()) {
+ // recv msgs to avoid memory leak.
+ auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; };
+ SockNode().Start(default_ignore_msg);
+ return true;
}
- // recv msgs to avoid memory leak.
- auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; };
- SockNode().Start(default_ignore_msg);
- return true;
}
return false;
}
@@ -78,7 +92,7 @@
void TopicNode::Start(ServerAsyncCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker)
{
if (!Init()) {
- SetLastError(eError, "BHome Node Not Inited.");
+ SetLastError(eError, kErrMsgNotInit);
return;
}
if (nworker < 1) {
@@ -101,7 +115,7 @@
bool TopicNode::Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
{
if (!Init()) {
- SetLastError(eError, "BHome Node Not Inited.");
+ SetLastError(eError, kErrMsgNotInit);
return false;
}
@@ -151,7 +165,7 @@
bool TopicNode::Unregister(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
{
if (!IsOnline()) {
- SetLastError(eNotRegistered, "Not Registered.");
+ SetLastError(eNotRegistered, kErrMsgNotRegistered);
return false;
}
@@ -190,7 +204,7 @@
bool TopicNode::Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
{
if (!IsOnline()) {
- SetLastError(eNotRegistered, "Not Registered.");
+ SetLastError(eNotRegistered, kErrMsgNotRegistered);
return false;
}
@@ -223,7 +237,7 @@
bool TopicNode::QueryTopicAddress(BHAddress &dest, MsgQueryTopic &query, MsgQueryTopicReply &reply_body, const int timeout_ms)
{
if (!IsOnline()) {
- SetLastError(eNotRegistered, "Not Registered.");
+ SetLastError(eNotRegistered, kErrMsgNotRegistered);
return false;
}
auto &sock = SockNode();
@@ -242,7 +256,7 @@
bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
{
if (!IsOnline()) {
- SetLastError(eNotRegistered, "Not Registered.");
+ SetLastError(eNotRegistered, kErrMsgNotRegistered);
return false;
}
@@ -309,7 +323,7 @@
bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms)
{
if (!IsOnline()) {
- SetLastError(eNotRegistered, "Not Registered.");
+ SetLastError(eNotRegistered, kErrMsgNotRegistered);
return false;
}
@@ -333,7 +347,7 @@
bool TopicNode::ServerSendReply(void *src_info, const MsgRequestTopicReply &body)
{
if (!IsOnline()) {
- SetLastError(eNotRegistered, "Not Registered.");
+ SetLastError(eNotRegistered, kErrMsgNotRegistered);
return false;
}
@@ -371,7 +385,7 @@
bool TopicNode::ClientAsyncRequest(const BHAddress &remote_addr, const MsgRequestTopic &req, std::string &out_msg_id, const RequestResultCB &cb)
{
if (!IsOnline()) {
- SetLastError(eNotRegistered, "Not Registered.");
+ SetLastError(eNotRegistered, kErrMsgNotRegistered);
return false;
}
@@ -412,7 +426,7 @@
bool TopicNode::ClientSyncRequest(const BHAddress &remote_addr, const MsgRequestTopic &request, std::string &out_proc_id, MsgRequestTopicReply &out_reply, const int timeout_ms)
{
if (!IsOnline()) {
- SetLastError(eNotRegistered, "Not Registered.");
+ SetLastError(eNotRegistered, kErrMsgNotRegistered);
return false;
}
@@ -463,7 +477,7 @@
bool TopicNode::ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms)
{
if (!IsOnline()) {
- SetLastError(eNotRegistered, "Not Registered.");
+ SetLastError(eNotRegistered, kErrMsgNotRegistered);
return false;
}
@@ -487,7 +501,7 @@
bool TopicNode::Publish(const MsgPublish &pub, const int timeout_ms)
{
if (!IsOnline()) {
- SetLastError(eNotRegistered, "Not Registered.");
+ SetLastError(eNotRegistered, kErrMsgNotRegistered);
return false;
}
@@ -518,7 +532,7 @@
bool TopicNode::Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
{
if (!IsOnline()) {
- SetLastError(eNotRegistered, "Not Registered.");
+ SetLastError(eNotRegistered, kErrMsgNotRegistered);
return false;
}
@@ -567,7 +581,7 @@
bool TopicNode::RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms)
{
if (!IsOnline()) {
- SetLastError(eNotRegistered, "Not Registered.");
+ SetLastError(eNotRegistered, kErrMsgNotRegistered);
return false;
}
diff --git a/src/topic_node.h b/src/topic_node.h
index afce4fc..338a6e3 100644
--- a/src/topic_node.h
+++ b/src/topic_node.h
@@ -122,7 +122,7 @@
eSockSub,
eSockEnd,
};
- std::vector<std::unique_ptr<ShmSocket>> sockets_;
+ std::vector<std::shared_ptr<ShmSocket>> sockets_;
ShmSocket &SockNode() { return *sockets_[eSockNode]; }
ShmSocket &SockPub() { return *sockets_[eSockPub]; }
--
Gitblit v1.8.0