From 36e6a35a886252516f168b90f7a9a7c1c5177312 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期六, 08 五月 2021 15:57:01 +0800
Subject: [PATCH] center alloc node queue; node just find them.

---
 src/socket.h          |   15 +++--
 box/center.cpp        |   31 ++++++++-
 src/shm_msg_queue.h   |    1 
 src/socket.cpp        |    5 +
 src/topic_node.h      |    2 
 src/topic_node.cpp    |   56 +++++++++++-------
 src/shm_msg_queue.cpp |   12 ++-
 src/defs.cpp          |    2 
 8 files changed, 85 insertions(+), 39 deletions(-)

diff --git a/box/center.cpp b/box/center.cpp
index c57d34d..d6ac804 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -102,22 +102,43 @@
 
 	// center name, no relative to shm.
 	const std::string &id() const { return id_; }
-	void OnNodeInit(const int64_t msg)
+	void OnNodeInit(SharedMemory &shm, const int64_t msg)
 	{
 		MQId ssn = msg;
+		if (nodes_.find(ssn) != nodes_.end()) {
+			return; // ignore in exists.
+		}
+
 		auto UpdateRegInfo = [&](Node &node) {
 			for (int i = 0; i < 10; ++i) {
 				node->addrs_.insert(ssn + i);
 			}
 			node->state_.timestamp_ = NowSec() - offline_time_;
 			node->state_.UpdateState(NowSec(), offline_time_, kill_time_);
+
+			// create sockets.
+			try {
+				auto CreateSocket = [](SharedMemory &shm, const MQId id) {
+					ShmSocket tmp(shm, true, id, 16);
+				};
+				// alloc(-1), node, server, sub, request,
+				for (int i = -1; i < 4; ++i) {
+					CreateSocket(shm, ssn + i);
+					node->addrs_.insert(ssn + i);
+				}
+				return true;
+			} catch (...) {
+				return false;
+			}
 		};
 
 		Node node(new NodeInfo);
-		UpdateRegInfo(node);
-		nodes_[ssn] = node;
-		LOG_INFO() << "new node ssn (" << ssn << ") init";
+		if (UpdateRegInfo(node)) {
+			nodes_[ssn] = node;
+			LOG_INFO() << "new node ssn (" << ssn << ") init";
+		}
 	}
+
 	MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg)
 	{
 		if (msg.proc().proc_id() != head.proc_id()) {
@@ -475,7 +496,7 @@
 {
 	auto OnNodeInit = [center_ptr](ShmSocket &socket, MsgI &msg) {
 		auto &center = *center_ptr;
-		center->OnNodeInit(msg.Offset());
+		center->OnNodeInit(socket.shm(), msg.Offset());
 	};
 	auto Nothing = [](ShmSocket &socket) {};
 
diff --git a/src/defs.cpp b/src/defs.cpp
index cc6f23b..6d688b2 100644
--- a/src/defs.cpp
+++ b/src/defs.cpp
@@ -50,7 +50,7 @@
 	MsgI::BindShm(shm);
 	typedef std::atomic<MQId> IdSrc;
 	IdSrc *psrc = shm.FindOrCreate<IdSrc>("shmqIdSrc0", 100000);
-	return ShmMsgQueue::SetData(*psrc);
+	return psrc && ShmMsgQueue::SetData(*psrc);
 }
 
 void SetLastError(const int ec, const std::string &msg)
diff --git a/src/shm_msg_queue.cpp b/src/shm_msg_queue.cpp
index 38c5f1c..17558de 100644
--- a/src/shm_msg_queue.cpp
+++ b/src/shm_msg_queue.cpp
@@ -36,21 +36,23 @@
 	static auto &id = GetData();
 	return (++id) * 10;
 }
-// ShmMsgQueue memory usage: (320 + 16*length) bytes, length >= 2
+
 ShmMsgQueue::ShmMsgQueue(const MQId id, ShmType &segment, const int len) :
     id_(id),
     queue_(segment, MsgQIdToName(id_)) //, AdjustMQLength(len), segment.get_segment_manager())
 {
 }
 
-ShmMsgQueue::ShmMsgQueue(ShmType &segment, const int len) :
-    id_(NewId()),
-    queue_(segment, true, MsgQIdToName(id_)) //, AdjustMQLength(len), segment.get_segment_manager())
+ShmMsgQueue::ShmMsgQueue(const MQId id, const bool create_or_else_find, ShmType &segment, const int len) :
+    id_(id),
+    queue_(segment, create_or_else_find, MsgQIdToName(id_))
 {
 	if (!queue_.IsOk()) {
-		throw("error create msgq " + std::to_string(id_));
+		throw("error create/find msgq " + std::to_string(id_));
 	}
 }
+ShmMsgQueue::ShmMsgQueue(ShmType &segment, const int len) :
+    ShmMsgQueue(NewId(), true, segment, len) {}
 
 ShmMsgQueue::~ShmMsgQueue() {}
 
diff --git a/src/shm_msg_queue.h b/src/shm_msg_queue.h
index aff931c..f496f0f 100644
--- a/src/shm_msg_queue.h
+++ b/src/shm_msg_queue.h
@@ -38,6 +38,7 @@
 	static MQId NewId();
 
 	ShmMsgQueue(const MQId id, ShmType &segment, const int len);
+	ShmMsgQueue(const MQId id, const bool create_or_else_find, ShmType &segment, const int len);
 	ShmMsgQueue(ShmType &segment, const int len);
 	~ShmMsgQueue();
 	static bool Remove(SharedMemory &shm, const MQId id);
diff --git a/src/socket.cpp b/src/socket.cpp
index 9580529..6231579 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -29,6 +29,11 @@
 {
 	Start();
 }
+ShmSocket::ShmSocket(Shm &shm, const bool create_or_else_find, const MQId id, const int len) :
+    run_(false), mq_(id, create_or_else_find, shm, len)
+{
+	Start();
+}
 ShmSocket::ShmSocket(bhome_shm::SharedMemory &shm, const int len) :
     run_(false), mq_(shm, len)
 {
diff --git a/src/socket.h b/src/socket.h
index 08a4b0a..981677f 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -48,6 +48,7 @@
 	typedef std::function<void(ShmSocket &sock)> IdleCB;
 
 	ShmSocket(Shm &shm, const MQId id, const int len);
+	ShmSocket(Shm &shm, const bool create_or_else_find, const MQId id, const int len);
 	ShmSocket(Shm &shm, const int len = 12);
 	~ShmSocket();
 	static bool Remove(SharedMemory &shm, const MQId id) { return Queue::Remove(shm, id); }
@@ -142,6 +143,7 @@
 	template <class... Rest>
 	bool SendImpl(const MQId remote, Rest &&...rest)
 	{
+		// TODO send alloc request, and pack later, higher bit means alloc?
 		send_buffer_.Append(remote, std::forward<decltype(rest)>(rest)...);
 		return true;
 	}
@@ -151,14 +153,15 @@
 	std::atomic<bool> run_;
 
 	Queue mq_;
-	class AsyncCBs
+	template <class Key>
+	class CallbackRecords
 	{
-		std::unordered_map<std::string, RecvCB> store_;
+		std::unordered_map<Key, RecvCB> store_;
 
 	public:
 		bool empty() const { return store_.empty(); }
-		bool Store(const std::string &id, RecvCB &&cb) { return store_.emplace(id, std::move(cb)).second; }
-		bool Pick(const std::string &id, RecvCB &cb)
+		bool Store(const Key &id, RecvCB &&cb) { return store_.emplace(id, std::move(cb)).second; }
+		bool Pick(const Key &id, RecvCB &cb)
 		{
 			auto pos = store_.find(id);
 			if (pos != store_.end()) {
@@ -171,9 +174,9 @@
 		}
 	};
 
-	Synced<AsyncCBs> per_msg_cbs_;
+	Synced<CallbackRecords<std::string>> per_msg_cbs_;
+
 	SendQ send_buffer_;
-	// Synced<SendQ> send_buffer_;
 };
 
 #endif // end of include guard: SOCKET_GWTJHBPO
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index 3883062..d8d6a42 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -23,6 +23,9 @@
 using namespace std::chrono;
 using namespace std::chrono_literals;
 
+const char *const kErrMsgNotInit = "BHome node NOT initialized.";
+const char *const kErrMsgNotRegistered = "BHome node NOT registered.";
+
 namespace
 {
 inline void AddRoute(BHMsgHead &head, const MQId id) { head.add_route()->set_mq_id(id); }
@@ -63,14 +66,25 @@
 	MsgI msg;
 	msg.OffsetRef() = ssn_id_;
 	if (ShmMsgQueue::TrySend(shm(), BHInitAddress(), msg)) {
-		sockets_.resize(eSockEnd);
-		for (int i = eSockStart; i < eSockEnd; ++i) {
-			sockets_[i].reset(new ShmSocket(shm_, ssn_id_ + i, kMqLen));
+
+		auto end_time = steady_clock::now() + 3s;
+		do {
+			try {
+				for (int i = eSockStart; i < eSockEnd; ++i) {
+					sockets_.emplace_back(new ShmSocket(shm_, false, ssn_id_ + i, kMqLen));
+				}
+				break;
+			} catch (...) {
+				sockets_.clear();
+				std::this_thread::sleep_for(100ms);
+			}
+		} while (steady_clock::now() < end_time);
+		if (!sockets_.empty()) {
+			// recv msgs to avoid memory leak.
+			auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; };
+			SockNode().Start(default_ignore_msg);
+			return true;
 		}
-		// recv msgs to avoid memory leak.
-		auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; };
-		SockNode().Start(default_ignore_msg);
-		return true;
 	}
 	return false;
 }
@@ -78,7 +92,7 @@
 void TopicNode::Start(ServerAsyncCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker)
 {
 	if (!Init()) {
-		SetLastError(eError, "BHome Node Not Inited.");
+		SetLastError(eError, kErrMsgNotInit);
 		return;
 	}
 	if (nworker < 1) {
@@ -101,7 +115,7 @@
 bool TopicNode::Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
 {
 	if (!Init()) {
-		SetLastError(eError, "BHome Node Not Inited.");
+		SetLastError(eError, kErrMsgNotInit);
 		return false;
 	}
 
@@ -151,7 +165,7 @@
 bool TopicNode::Unregister(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
 {
 	if (!IsOnline()) {
-		SetLastError(eNotRegistered, "Not Registered.");
+		SetLastError(eNotRegistered, kErrMsgNotRegistered);
 		return false;
 	}
 
@@ -190,7 +204,7 @@
 bool TopicNode::Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
 {
 	if (!IsOnline()) {
-		SetLastError(eNotRegistered, "Not Registered.");
+		SetLastError(eNotRegistered, kErrMsgNotRegistered);
 		return false;
 	}
 
@@ -223,7 +237,7 @@
 bool TopicNode::QueryTopicAddress(BHAddress &dest, MsgQueryTopic &query, MsgQueryTopicReply &reply_body, const int timeout_ms)
 {
 	if (!IsOnline()) {
-		SetLastError(eNotRegistered, "Not Registered.");
+		SetLastError(eNotRegistered, kErrMsgNotRegistered);
 		return false;
 	}
 	auto &sock = SockNode();
@@ -242,7 +256,7 @@
 bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
 {
 	if (!IsOnline()) {
-		SetLastError(eNotRegistered, "Not Registered.");
+		SetLastError(eNotRegistered, kErrMsgNotRegistered);
 		return false;
 	}
 
@@ -309,7 +323,7 @@
 bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms)
 {
 	if (!IsOnline()) {
-		SetLastError(eNotRegistered, "Not Registered.");
+		SetLastError(eNotRegistered, kErrMsgNotRegistered);
 		return false;
 	}
 
@@ -333,7 +347,7 @@
 bool TopicNode::ServerSendReply(void *src_info, const MsgRequestTopicReply &body)
 {
 	if (!IsOnline()) {
-		SetLastError(eNotRegistered, "Not Registered.");
+		SetLastError(eNotRegistered, kErrMsgNotRegistered);
 		return false;
 	}
 
@@ -371,7 +385,7 @@
 bool TopicNode::ClientAsyncRequest(const BHAddress &remote_addr, const MsgRequestTopic &req, std::string &out_msg_id, const RequestResultCB &cb)
 {
 	if (!IsOnline()) {
-		SetLastError(eNotRegistered, "Not Registered.");
+		SetLastError(eNotRegistered, kErrMsgNotRegistered);
 		return false;
 	}
 
@@ -412,7 +426,7 @@
 bool TopicNode::ClientSyncRequest(const BHAddress &remote_addr, const MsgRequestTopic &request, std::string &out_proc_id, MsgRequestTopicReply &out_reply, const int timeout_ms)
 {
 	if (!IsOnline()) {
-		SetLastError(eNotRegistered, "Not Registered.");
+		SetLastError(eNotRegistered, kErrMsgNotRegistered);
 		return false;
 	}
 
@@ -463,7 +477,7 @@
 bool TopicNode::ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms)
 {
 	if (!IsOnline()) {
-		SetLastError(eNotRegistered, "Not Registered.");
+		SetLastError(eNotRegistered, kErrMsgNotRegistered);
 		return false;
 	}
 
@@ -487,7 +501,7 @@
 bool TopicNode::Publish(const MsgPublish &pub, const int timeout_ms)
 {
 	if (!IsOnline()) {
-		SetLastError(eNotRegistered, "Not Registered.");
+		SetLastError(eNotRegistered, kErrMsgNotRegistered);
 		return false;
 	}
 
@@ -518,7 +532,7 @@
 bool TopicNode::Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
 {
 	if (!IsOnline()) {
-		SetLastError(eNotRegistered, "Not Registered.");
+		SetLastError(eNotRegistered, kErrMsgNotRegistered);
 		return false;
 	}
 
@@ -567,7 +581,7 @@
 bool TopicNode::RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms)
 {
 	if (!IsOnline()) {
-		SetLastError(eNotRegistered, "Not Registered.");
+		SetLastError(eNotRegistered, kErrMsgNotRegistered);
 		return false;
 	}
 
diff --git a/src/topic_node.h b/src/topic_node.h
index afce4fc..338a6e3 100644
--- a/src/topic_node.h
+++ b/src/topic_node.h
@@ -122,7 +122,7 @@
 		   eSockSub,
 		   eSockEnd,
 	};
-	std::vector<std::unique_ptr<ShmSocket>> sockets_;
+	std::vector<std::shared_ptr<ShmSocket>> sockets_;
 
 	ShmSocket &SockNode() { return *sockets_[eSockNode]; }
 	ShmSocket &SockPub() { return *sockets_[eSockPub]; }

--
Gitblit v1.8.0