From 377e395a5fdc6ad44bdd5a2d41d2930f45fc4384 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 30 四月 2021 18:25:33 +0800
Subject: [PATCH] add node init msg, alloc msgq on success.

---
 src/socket.h          |    2 
 utest/api_test.cpp    |    2 
 box/center.cpp        |   75 ++++++++++++++----
 src/socket.cpp        |   42 +++++++++
 src/defs.h            |    2 
 src/topic_node.h      |    7 +
 box/center.h          |    3 
 src/topic_node.cpp    |   63 ++++++++++++---
 src/shm_msg_queue.cpp |    2 
 9 files changed, 159 insertions(+), 39 deletions(-)

diff --git a/box/center.cpp b/box/center.cpp
index 0952ca7..aa6f285 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -103,7 +103,22 @@
 
 	// center name, no relative to shm.
 	const std::string &id() const { return id_; }
+	void OnNodeInit(const int64_t msg)
+	{
+		MQId ssn = msg;
+		auto UpdateRegInfo = [&](Node &node) {
+			for (int i = 0; i < 10; ++i) {
+				node->addrs_.insert(ssn + i);
+			}
+			node->state_.timestamp_ = NowSec() - offline_time_;
+			node->state_.UpdateState(NowSec(), offline_time_, kill_time_);
+		};
 
+		Node node(new NodeInfo);
+		UpdateRegInfo(node);
+		nodes_[ssn] = node;
+		printf("new node ssn (%ld) init\n", ssn);
+	}
 	MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg)
 	{
 		if (msg.proc().proc_id() != head.proc_id()) {
@@ -132,17 +147,19 @@
 				Node node(new NodeInfo);
 				UpdateRegInfo(node);
 				nodes_[ssn] = node;
+			}
+			printf("node (%s) ssn (%ld)\n", head.proc_id().c_str(), ssn);
 
-				printf("new node (%s) ssn (%ld)\n", head.proc_id().c_str(), ssn);
-				auto old = online_node_addr_map_.find(head.proc_id());
-				if (old != online_node_addr_map_.end()) { // old session
-					auto &old_ssn = old->second;
+			auto old = online_node_addr_map_.find(head.proc_id());
+			if (old != online_node_addr_map_.end()) { // old session
+				auto &old_ssn = old->second;
+				if (old_ssn != ssn) {
 					nodes_[old_ssn]->state_.PutOffline(offline_time_);
 					printf("put node (%s) ssn (%ld) offline\n", nodes_[old_ssn]->proc_.proc_id().c_str(), old->second);
 					old_ssn = ssn;
-				} else {
-					online_node_addr_map_.emplace(head.proc_id(), ssn);
 				}
+			} else {
+				online_node_addr_map_.emplace(head.proc_id(), ssn);
 			}
 			return MakeReply(eSuccess);
 		} catch (...) {
@@ -446,16 +463,24 @@
 		    msg, head, [&](auto &body) { return center->MsgTag(head, body); }, replyer); \
 		return true;
 
-bool AddCenter(const std::string &id, const NodeCenter::Cleaner &cleaner)
+auto MakeReplyer(ShmSocket &socket, BHMsgHead &head, const std::string &proc_id)
 {
-	auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 60s, 60s * 2);
-	auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) {
-		return [&](auto &&rep_body) {
-			auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.ssn_id(), head.msg_id()));
-			auto remote = head.route(0).mq_id();
-			socket.Send(remote, reply_head, rep_body);
-		};
+	return [&](auto &&rep_body) {
+		auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.ssn_id(), head.msg_id()));
+		auto remote = head.route(0).mq_id();
+		socket.Send(remote, reply_head, rep_body);
 	};
+}
+
+bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr)
+{
+	auto OnNodeInit = [center_ptr](ShmSocket &socket, MsgI &msg) {
+		auto &center = *center_ptr;
+		center->OnNodeInit(msg.Offset());
+	};
+	auto Nothing = [](ShmSocket &socket) {};
+
+	BHCenter::Install("#centetr.Init", OnNodeInit, Nothing, BHInitAddress(), 16);
 
 	auto OnCenterIdle = [center_ptr](ShmSocket &socket) {
 		auto &center = *center_ptr;
@@ -475,6 +500,7 @@
 		default: return false;
 		}
 	};
+	BHCenter::Install("#center.main", OnCenter, OnCenterIdle, BHTopicCenterAddress(), 1000);
 
 	auto OnBusIdle = [=](ShmSocket &socket) {};
 	auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
@@ -515,7 +541,6 @@
 		}
 	};
 
-	BHCenter::Install("#center.reg", OnCenter, OnCenterIdle, BHTopicCenterAddress(), 1000);
 	BHCenter::Install("#center.bus", OnPubSub, OnBusIdle, BHTopicBusAddress(), 1000);
 
 	return true;
@@ -533,7 +558,12 @@
 
 bool BHCenter::Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId mqid, const int mq_len)
 {
-	Centers()[name] = CenterInfo{name, handler, idle, mqid, mq_len};
+	Centers()[name] = CenterInfo{name, handler, MsgIHandler(), idle, mqid, mq_len};
+	return true;
+}
+bool BHCenter::Install(const std::string &name, MsgIHandler handler, IdleHandler idle, const MQId mqid, const int mq_len)
+{
+	Centers()[name] = CenterInfo{name, MsgHandler(), handler, idle, mqid, mq_len};
 	return true;
 }
 
@@ -541,10 +571,13 @@
 {
 	auto gc = [&](const MQId id) {
 		auto r = ShmSocket::Remove(shm, id);
-		printf("remove mq %ld : %s\n", id, (r ? "ok" : "failed"));
+		if (r) {
+			printf("remove mq %ld ok\n", id);
+		}
 	};
 
-	AddCenter("#bhome_center", gc);
+	auto center_ptr = std::make_shared<Synced<NodeCenter>>("#bhome_center", gc, 6s, 6s * 2);
+	AddCenter(center_ptr);
 
 	for (auto &kv : Centers()) {
 		auto &info = kv.second;
@@ -556,7 +589,11 @@
 {
 	for (auto &kv : Centers()) {
 		auto &info = kv.second;
-		sockets_[info.name_]->Start(info.handler_, info.idle_);
+		if (info.handler_) {
+			sockets_[info.name_]->Start(info.handler_, info.idle_);
+		} else {
+			sockets_[info.name_]->Start(info.raw_handler_, info.idle_);
+		}
 	}
 
 	return true;
diff --git a/box/center.h b/box/center.h
index ab8b15f..4d71bc9 100644
--- a/box/center.h
+++ b/box/center.h
@@ -29,8 +29,10 @@
 
 public:
 	typedef Socket::PartialRecvCB MsgHandler;
+	typedef Socket::RawRecvCB MsgIHandler;
 	typedef Socket::IdleCB IdleHandler;
 	static bool Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId mqid, const int mq_len);
+	static bool Install(const std::string &name, MsgIHandler handler, IdleHandler idle, const MQId mqid, const int mq_len);
 
 	BHCenter(Socket::Shm &shm);
 	~BHCenter() { Stop(); }
@@ -41,6 +43,7 @@
 	struct CenterInfo {
 		std::string name_;
 		MsgHandler handler_;
+		MsgIHandler raw_handler_;
 		IdleHandler idle_;
 		MQId mqid_;
 		int mq_len_ = 0;
diff --git a/src/defs.h b/src/defs.h
index 43375bf..a95c81f 100644
--- a/src/defs.h
+++ b/src/defs.h
@@ -23,9 +23,11 @@
 
 typedef uint64_t MQId;
 
+const MQId kBHNodeInit = 10;
 const MQId kBHTopicCenter = 100;
 const MQId kBHTopicBus = 101;
 const MQId kBHUniCenter = 102;
+inline const MQId BHInitAddress() { return kBHNodeInit; }
 inline const MQId BHTopicCenterAddress() { return kBHTopicCenter; }
 inline const MQId BHTopicBusAddress() { return kBHTopicBus; }
 inline const MQId BHUniCenterAddress() { return kBHUniCenter; }
diff --git a/src/shm_msg_queue.cpp b/src/shm_msg_queue.cpp
index cd8cd66..38c5f1c 100644
--- a/src/shm_msg_queue.cpp
+++ b/src/shm_msg_queue.cpp
@@ -34,7 +34,7 @@
 ShmMsgQueue::MQId ShmMsgQueue::NewId()
 {
 	static auto &id = GetData();
-	return ++id;
+	return (++id) * 10;
 }
 // ShmMsgQueue memory usage: (320 + 16*length) bytes, length >= 2
 ShmMsgQueue::ShmMsgQueue(const MQId id, ShmType &segment, const int len) :
diff --git a/src/socket.cpp b/src/socket.cpp
index 2127260..9580529 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -40,6 +40,44 @@
 	Stop();
 }
 
+bool ShmSocket::Start(const RawRecvCB &onData, const IdleCB &onIdle, int nworker)
+{
+	auto ioProc = [this, onData, onIdle]() {
+		auto DoSend = [this]() { return send_buffer_.TrySend(mq()); };
+		auto DoRecv = [=] {
+			// do not recv if no cb is set.
+			if (!onData) {
+				return false;
+			}
+			auto onMsg = [&](MsgI &imsg) {
+				DEFER1(imsg.Release());
+				onData(*this, imsg);
+			};
+			MsgI imsg;
+			return mq().TryRecv(imsg) ? (onMsg(imsg), true) : false;
+		};
+
+		try {
+			bool more_to_send = DoSend();
+			bool more_to_recv = DoRecv();
+			if (onIdle) { onIdle(*this); }
+			if (!more_to_send && !more_to_recv) {
+				robust::QuickSleep();
+			}
+		} catch (...) {
+		}
+	};
+
+	std::lock_guard<std::mutex> lock(mutex_);
+	StopNoLock();
+
+	run_.store(true);
+	for (int i = 0; i < nworker; ++i) {
+		workers_.emplace_back([this, ioProc]() { while (run_) { ioProc(); } });
+	}
+	return true;
+}
+
 bool ShmSocket::Start(int nworker, const RecvCB &onData, const IdleCB &onIdle)
 {
 	auto ioProc = [this, onData, onIdle]() {
@@ -74,9 +112,7 @@
 			bool more_to_recv = DoRecv();
 			if (onIdle) { onIdle(*this); }
 			if (!more_to_send && !more_to_recv) {
-				std::this_thread::yield();
-				using namespace std::chrono_literals;
-				std::this_thread::sleep_for(10000ns);
+				robust::QuickSleep();
 			}
 		} catch (...) {
 		}
diff --git a/src/socket.h b/src/socket.h
index a5dd72c..bd85fec 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -42,6 +42,7 @@
 public:
 	typedef ShmMsgQueue::MQId MQId;
 	typedef bhome_shm::SharedMemory Shm;
+	typedef std::function<void(ShmSocket &sock, MsgI &imsg)> RawRecvCB;
 	typedef std::function<void(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> RecvCB;
 	typedef std::function<bool(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> PartialRecvCB;
 	typedef std::function<void(ShmSocket &sock)> IdleCB;
@@ -53,6 +54,7 @@
 	bool Remove() { return Remove(shm(), id()); }
 	MQId id() const { return mq().Id(); }
 	// start recv.
+	bool Start(const RawRecvCB &onData, const IdleCB &onIdle, int nworker = 1);
 	bool Start(int nworker = 1, const RecvCB &onData = RecvCB(), const IdleCB &onIdle = IdleCB());
 	bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1) { return Start(nworker, onData, onIdle); }
 	bool Start(const RecvCB &onData, int nworker = 1) { return Start(nworker, onData); }
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index d274c4b..f629597 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -37,30 +37,52 @@
 } // namespace
 
 TopicNode::TopicNode(SharedMemory &shm) :
-    shm_(shm), sockets_(eSockEnd), state_(eStateUnregistered)
+    shm_(shm), state_(eStateUnregistered)
 {
-	for (int i = eSockStart; i < eSockEnd; ++i) {
-		sockets_[i].reset(new ShmSocket(shm_, kMqLen));
-	}
-	// recv msgs to avoid memory leak.
-	auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; };
-	SockNode().Start(default_ignore_msg);
-	// for (auto &p : sockets_) {
-	// 	p->Start(default_ignore_msg);
-	// }
+	Init();
 }
 
 TopicNode::~TopicNode()
 {
+	printf("~TopicNode()\n");
 	Stop();
-	SockNode().Stop();
-	if (state() == eStateUnregistered) {
-		for (auto &p : sockets_) { p->Remove(); }
+}
+
+bool TopicNode::Init()
+{
+	std::lock_guard<std::mutex> lk(mutex_);
+
+	if (Valid()) {
+		return true;
 	}
+
+	if (ssn_id_ == 0) {
+		ssn_id_ = ShmMsgQueue::NewId();
+	}
+	printf("Node Init, id %ld \n", ssn_id_);
+	MsgI msg;
+	msg.OffsetRef() = ssn_id_;
+	if (ShmMsgQueue::TrySend(shm(), BHInitAddress(), msg)) {
+		sockets_.resize(eSockEnd);
+		for (int i = eSockStart; i < eSockEnd; ++i) {
+			sockets_[i].reset(new ShmSocket(shm_, ssn_id_ + i, kMqLen));
+		}
+		// recv msgs to avoid memory leak.
+		auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; };
+		SockNode().Start(default_ignore_msg);
+		return true;
+	}
+	return false;
 }
 
 void TopicNode::Start(ServerAsyncCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker)
 {
+	std::lock_guard<std::mutex> lk(mutex_);
+
+	if (!Init()) {
+		SetLastError(eError, "BHome Node Not Inited.");
+		return;
+	}
 	if (nworker < 1) {
 		nworker = 1;
 	} else if (nworker > 16) {
@@ -73,11 +95,18 @@
 }
 void TopicNode::Stop()
 {
+	printf("Node Stopping\n");
 	for (auto &p : sockets_) { p->Stop(); }
+	printf("Node Stopped\n");
 }
 
 bool TopicNode::Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
 {
+	if (!Init()) {
+		SetLastError(eError, "BHome Node Not Inited.");
+		return false;
+	}
+
 	info_ = proc;
 
 	auto &sock = SockNode();
@@ -123,6 +152,11 @@
 }
 bool TopicNode::Unregister(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
 {
+	if (!IsOnline()) {
+		SetLastError(eNotRegistered, "Not Registered.");
+		return false;
+	}
+
 	info_.Clear();
 	state_cas(eStateOnline, eStateOffline);
 
@@ -404,8 +438,6 @@
 				reply_head.mutable_proc_id()->swap(out_proc_id);
 				return true;
 			}
-		} else {
-			SetLastError(eNotFound, "remote not found.");
 		}
 	} catch (...) {
 		SetLastError(eError, __func__ + std::string(" internal errer."));
@@ -448,6 +480,7 @@
 			return true;
 		}
 	}
+	SetLastError(eNotFound, "remote not found.");
 	return false;
 }
 
diff --git a/src/topic_node.h b/src/topic_node.h
index 3c90e5b..afce4fc 100644
--- a/src/topic_node.h
+++ b/src/topic_node.h
@@ -22,6 +22,7 @@
 #include "socket.h"
 #include <atomic>
 #include <memory>
+#include <mutex>
 #include <vector>
 
 using namespace bhome_shm;
@@ -137,7 +138,11 @@
 	void state(const State st) { state_.store(st); }
 	void state_cas(State expected, const State val) { state_.compare_exchange_strong(expected, val); }
 	State state() const { return state_.load(); }
-	bool IsOnline() const { return state() == eStateOnline; }
+	bool IsOnline() { return Init() && state() == eStateOnline; }
+	bool Init();
+	bool Valid() const { return !sockets_.empty(); }
+	std::mutex mutex_;
+	MQId ssn_id_ = 0;
 	std::atomic<State> state_;
 
 	TopicQueryCache topic_query_cache_;
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index cf7baf9..38483eb 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -118,6 +118,8 @@
 
 	printf("maxsec: %ld\n", CountSeconds(max_time));
 
+	// BHCleanup();
+	// return;
 	bool reg = false;
 	for (int i = 0; i < 3 && !reg; ++i) {
 		ProcInfo proc;

--
Gitblit v1.8.0