From 94a455aba299af5ffe476560d859dfd007cd5467 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 16 四月 2021 12:13:43 +0800
Subject: [PATCH] fix crash by using normal timeout; add sendq todo.

---
 src/socket.h       |   32 ++++------
 utest/api_test.cpp |   82 ++++++++++++++++++++++++--
 .gitignore         |    1 
 box/center.cpp     |    7 -
 utest/util.h       |    6 +
 box/center.h       |    1 
 src/sendq.cpp      |    5 +
 src/sendq.h        |    2 
 src/topic_node.cpp |    6 +
 src/defs.cpp       |    4 
 10 files changed, 106 insertions(+), 40 deletions(-)

diff --git a/.gitignore b/.gitignore
index d6ac3de..7cf0ce4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,3 +9,4 @@
 box/bhshmqbox
 box/bhshmq_center
 box/help
+utest/bhshmq_center
diff --git a/box/center.cpp b/box/center.cpp
index 8625f7f..a95e82d 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -417,7 +417,7 @@
 
 bool AddCenter(const std::string &id, const NodeCenter::Cleaner &cleaner)
 {
-	auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 5s, 10s);
+	auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 60s, 60s * 2);
 	auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) {
 		return [&](auto &&rep_body) {
 			auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id()));
@@ -496,7 +496,7 @@
 
 SharedMemory &BHomeShm()
 {
-	static SharedMemory shm("bhome_default_shm_v0", 1024 * 1024 * 64);
+	static SharedMemory shm("bhome_default_shm_v0", 1024 * 1024 * 512);
 	return shm;
 }
 
@@ -530,9 +530,6 @@
 		sockets_[info.name_] = std::make_shared<ShmSocket>(shm, *(MQId *) info.mqid_.data(), info.mq_len_);
 	}
 }
-
-BHCenter::BHCenter() :
-    BHCenter(BHomeShm()) {}
 
 bool BHCenter::Start()
 {
diff --git a/box/center.h b/box/center.h
index aea0897..60639d5 100644
--- a/box/center.h
+++ b/box/center.h
@@ -34,7 +34,6 @@
 	static bool Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId &mqid, const int mq_len);
 
 	BHCenter(Socket::Shm &shm);
-	BHCenter();
 	~BHCenter() { Stop(); }
 	bool Start();
 	bool Stop();
diff --git a/src/defs.cpp b/src/defs.cpp
index bab2e53..77b0722 100644
--- a/src/defs.cpp
+++ b/src/defs.cpp
@@ -20,7 +20,7 @@
 {
 
 const MQId kBHTopicBus = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff");
-const MQId kBHTopicReqRepCenter = boost::uuids::string_generator()("12345670-89ab-cdef-8349-1234567890ff");
+const MQId kBHTopicCenter = boost::uuids::string_generator()("12345670-89ab-cdef-8349-1234567890ff");
 const MQId kBHUniCenter = boost::uuids::string_generator()("87654321-89ab-cdef-8349-1234567890ff");
 
 struct LastError {
@@ -37,7 +37,7 @@
 } // namespace
 
 const MQId &BHTopicBusAddress() { return kBHTopicBus; }
-const MQId &BHTopicCenterAddress() { return kBHTopicReqRepCenter; }
+const MQId &BHTopicCenterAddress() { return kBHTopicCenter; }
 const MQId &BHUniCenterAddress() { return kBHUniCenter; }
 
 void SetLastError(const int ec, const std::string &msg)
diff --git a/src/sendq.cpp b/src/sendq.cpp
index 242f8de..ad293c3 100644
--- a/src/sendq.cpp
+++ b/src/sendq.cpp
@@ -19,6 +19,11 @@
 #include "shm_queue.h"
 #include <chrono>
 
+//TODO change to save head, body, instead of MsgI.
+// as MsgI which is in shm, but head, body are in current process.
+// Then if node crashes, shm will not be affected by msgs in sendq.
+// but pulishing ref-counted msg need some work.
+
 int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, Array &arr)
 {
 	auto FirstNotExpired = [](Array &l) {
diff --git a/src/sendq.h b/src/sendq.h
index aa8923d..b4f3821 100644
--- a/src/sendq.h
+++ b/src/sendq.h
@@ -55,7 +55,7 @@
 	void Append(const Remote &addr, const MsgI &msg, OnMsgEvent onExpire = OnMsgEvent())
 	{
 		using namespace std::chrono_literals;
-		Append(addr, msg, Now() + 3s, onExpire);
+		Append(addr, msg, Now() + 60s, onExpire);
 	}
 	bool TrySend(bhome_shm::ShmMsgQueue &mq);
 	// bool empty() const { return store_.empty(); }
diff --git a/src/socket.h b/src/socket.h
index 96af6e7..66f716e 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -66,32 +66,24 @@
 	size_t Pending() const { return mq().Pending(); }
 
 	template <class Body>
-	bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body)
+	bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const RecvCB &cb = RecvCB())
 	{
-		MsgI msg;
-		if (msg.Make(shm(), head, body)) {
-			DEFER1(if (msg.IsCounted()) { msg.Release(shm()); });
-			return SendImpl(valid_remote, msg);
-		}
-		return false;
-	}
-
-	template <class Body>
-	bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const RecvCB &cb)
-	{
-		//TODO send_buffer_ need flag, and remove callback on expire.
 		MsgI msg;
 		if (msg.Make(shm(), head, body)) {
 			DEFER1(if (msg.IsCounted()) { msg.Release(shm()); });
 			std::string msg_id(head.msg_id());
-			per_msg_cbs_->Add(msg_id, cb);
-			auto onExpireRemoveCB = [this, msg_id](MsgI const &msg) {
-				RecvCB cb_no_use;
-				per_msg_cbs_->Find(msg_id, cb_no_use);
-			};
-			return SendImpl(valid_remote, msg, onExpireRemoveCB);
+			if (!cb) {
+				return SendImpl(valid_remote, msg);
+			} else {
+				per_msg_cbs_->Add(msg_id, cb);
+				auto onExpireRemoveCB = [this, msg_id](MsgI const &msg) {
+					RecvCB cb_no_use;
+					per_msg_cbs_->Find(msg_id, cb_no_use);
+				};
+				return SendImpl(valid_remote, msg, onExpireRemoveCB);
+			}
 		} else {
-			printf("out of mem?, avail: %ld\n", shm().get_free_memory());
+			SetLastError(ENOMEM, "Out of mem");
 		}
 		return false;
 	}
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index e9e627f..f947f98 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -245,7 +245,10 @@
 
 bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, std::string &out_msg_id, const RequestResultCB &cb)
 {
-	if (!IsRegistered()) { return false; }
+	if (!IsRegistered()) {
+		SetLastError(eNotRegistered, "Not Registered.");
+		return false;
+	}
 
 	const std::string &msg_id(NewMsgId());
 
@@ -298,6 +301,7 @@
 		return sock.Send(&BHTopicCenterAddress(), head, query, onQueryResult);
 
 	} catch (...) {
+		SetLastError(eError, "internal error.");
 		return false;
 	}
 }
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index cff2cc5..58c73c6 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -25,12 +25,23 @@
 {
 typedef std::atomic<uint64_t> Number;
 
+void Assign(Number &a, const Number &b) { a.store(b.load()); }
 struct MsgStatus {
+
 	Number nrequest_;
+	Number nfailed_;
 	Number nreply_;
 	Number nserved_;
 	MsgStatus() :
 	    nrequest_(0), nreply_(0), nserved_(0) {}
+	MsgStatus &operator=(const MsgStatus &a)
+	{
+		Assign(nrequest_, a.nrequest_);
+		Assign(nserved_, a.nserved_);
+		Assign(nreply_, a.nreply_);
+		Assign(nfailed_, a.nfailed_);
+		return *this;
+	}
 };
 
 MsgStatus &Status()
@@ -83,6 +94,48 @@
 		++Status().nreply_;
 	}
 	// printf("client Recv reply : %s\n", reply.data().c_str());
+}
+
+BOOST_AUTO_TEST_CASE(MutexTest)
+{
+	const std::string shm_name("ShmMutex");
+	// ShmRemover auto_remove(shm_name);
+	SharedMemory shm(shm_name, 1024 * 1024 * 10);
+
+	const std::string mtx_name("test_mutex");
+	const std::string int_name("test_int");
+	auto mtx = shm.find_or_construct<Mutex>(mtx_name.c_str())();
+	auto pi = shm.find_or_construct<int>(int_name.c_str())(100);
+	if (pi) {
+		auto old = *pi;
+		printf("int : %d, add1: %d\n", old, ++*pi);
+	}
+
+	auto TryLock = [&]() {
+		if (mtx->try_lock()) {
+			printf("try_lock ok\n");
+			return true;
+		} else {
+			printf("try_lock failed\n");
+			return false;
+		}
+	};
+	auto Unlock = [&]() {
+		mtx->unlock();
+		printf("unlocked\n");
+	};
+
+	if (mtx) {
+		printf("mtx exists\n");
+		if (TryLock()) {
+			if (TryLock()) {
+				Unlock();
+			}
+			// Unlock();
+		}
+	} else {
+		printf("mtx not exists\n");
+	}
 }
 
 BOOST_AUTO_TEST_CASE(ApiTest)
@@ -166,36 +219,49 @@
 			std::string s(req.SerializeAsString());
 			void *msg_id = 0;
 			int len = 0;
+			// Sleep(10ms, false);
 			bool r = BHAsyncRequest(s.data(), s.size(), 0, 0);
 			DEFER1(BHFree(msg_id, len););
 			if (r) {
 				++Status().nrequest_;
 			} else {
-				printf("request topic : %s\n", r ? "ok" : "failed");
+				++Status().nfailed_;
+				static std::atomic<int64_t> last(0);
+				auto now = NowSec();
+				if (last.exchange(now) < now) {
+					int ec = 0;
+					std::string msg;
+					GetLastError(ec, msg);
+					printf("request topic error --------- : %s\n", msg.c_str());
+				}
 			}
 		}
 	};
 	auto showStatus = [](std::atomic<bool> *run) {
-		int64_t last = 0;
+		MsgStatus last;
 		while (*run) {
 			auto &st = Status();
 			std::this_thread::sleep_for(1s);
-			int cur = st.nreply_.load();
-			printf("nreq: %8ld, nsrv: %8ld, nreply: %8ld, speed %8ld\n", st.nrequest_.load(), st.nserved_.load(), cur, cur - last);
-			last = cur;
+			printf("nreq: %8ld, spd %8ld | failed: %8ld | nsrv: %8ld, spd %8ld | nreply: %8ld, spd %8ld\n",
+			       st.nrequest_.load(), st.nrequest_ - last.nrequest_,
+			       st.nfailed_.load(),
+			       st.nserved_.load(), st.nserved_ - last.nserved_,
+			       st.nreply_.load(), st.nreply_ - last.nreply_);
+			last = st;
 		}
 	};
 	auto hb = [](std::atomic<bool> *run) {
 		while (*run) {
-			BHHeartBeatEasy(0);
-			std::this_thread::sleep_for(1s);
+			Sleep(1s, false);
+			bool r = BHHeartBeatEasy(1000);
+			printf("heartbeat: %s\n", r ? "ok" : "failed");
 		}
 	};
 	std::atomic<bool> run(true);
 	ThreadManager threads;
 	boost::timer::auto_cpu_timer timer;
 	threads.Launch(hb, &run);
-	// threads.Launch(showStatus, &run);
+	threads.Launch(showStatus, &run);
 	int ncli = 10;
 	const uint64_t nreq = 1000 * 100;
 	for (int i = 0; i < ncli; ++i) {
diff --git a/utest/util.h b/utest/util.h
index 7f41da9..f31d63f 100644
--- a/utest/util.h
+++ b/utest/util.h
@@ -39,9 +39,11 @@
 using namespace std::chrono_literals;
 
 template <class D>
-inline void Sleep(D d)
+inline void Sleep(D d, bool print = true)
 {
-	printf("sleep for %ld ms\n", std::chrono::duration_cast<std::chrono::milliseconds>(d).count());
+	if (print) {
+		printf("sleep for %ld ms\n", std::chrono::duration_cast<std::chrono::milliseconds>(d).count());
+	}
 	std::this_thread::sleep_for(d);
 }
 

--
Gitblit v1.8.0