From 3931f83205f153f2bc7fc36d1a894cdc3f14b4db Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期三, 21 四月 2021 16:52:51 +0800
Subject: [PATCH] change node socket to vector; try lock free queue.

---
 utest/api_test.cpp |  101 ++++++++++++++++++++++++++++++++++++++++++--------
 1 files changed, 85 insertions(+), 16 deletions(-)

diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index cff2cc5..766c0f8 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -19,18 +19,29 @@
 #include "util.h"
 #include <atomic>
 
-using namespace bhome::msg;
+using namespace bhome_msg;
 
 namespace
 {
 typedef std::atomic<uint64_t> Number;
 
+void Assign(Number &a, const Number &b) { a.store(b.load()); }
 struct MsgStatus {
+
 	Number nrequest_;
+	Number nfailed_;
 	Number nreply_;
 	Number nserved_;
 	MsgStatus() :
 	    nrequest_(0), nreply_(0), nserved_(0) {}
+	MsgStatus &operator=(const MsgStatus &a)
+	{
+		Assign(nrequest_, a.nrequest_);
+		Assign(nserved_, a.nserved_);
+		Assign(nreply_, a.nreply_);
+		Assign(nfailed_, a.nfailed_);
+		return *this;
+	}
 };
 
 MsgStatus &Status()
@@ -55,7 +66,7 @@
                 const int proc_id_len,
                 const void *data,
                 const int data_len,
-                BHServerCallbackTag *tag)
+                void *src)
 {
 	// printf("ServerProc: ");
 	// DEFER1(printf("\n"););
@@ -65,7 +76,7 @@
 		reply.set_data(" reply: " + request.data());
 		std::string s(reply.SerializeAsString());
 		// printf("%s", reply.data().c_str());
-		BHServerCallbackReply(tag, s.data(), s.size());
+		BHSendReply(src, s.data(), s.size());
 		++Status().nserved_;
 	}
 }
@@ -85,6 +96,48 @@
 	// printf("client Recv reply : %s\n", reply.data().c_str());
 }
 
+BOOST_AUTO_TEST_CASE(MutexTest)
+{
+	const std::string shm_name("ShmMutex");
+	// ShmRemover auto_remove(shm_name);
+	SharedMemory shm(shm_name, 1024 * 1024 * 10);
+
+	const std::string mtx_name("test_mutex");
+	const std::string int_name("test_int");
+	auto mtx = shm.find_or_construct<Mutex>(mtx_name.c_str())();
+	auto pi = shm.find_or_construct<int>(int_name.c_str())(100);
+	if (pi) {
+		auto old = *pi;
+		printf("int : %d, add1: %d\n", old, ++*pi);
+	}
+
+	auto TryLock = [&]() {
+		if (mtx->try_lock()) {
+			printf("try_lock ok\n");
+			return true;
+		} else {
+			printf("try_lock failed\n");
+			return false;
+		}
+	};
+	auto Unlock = [&]() {
+		mtx->unlock();
+		printf("unlocked\n");
+	};
+
+	if (mtx) {
+		printf("mtx exists\n");
+		if (TryLock()) {
+			if (TryLock()) {
+				Unlock();
+			}
+			// Unlock();
+		}
+	} else {
+		printf("mtx not exists\n");
+	}
+}
+
 BOOST_AUTO_TEST_CASE(ApiTest)
 {
 	auto max_time = std::chrono::steady_clock::time_point::max();
@@ -102,7 +155,7 @@
 	printf("maxsec: %ld\n", CountSeconds(max_time));
 
 	bool reg = false;
-	for (int i = 0; i < 10 && !reg; ++i) {
+	for (int i = 0; i < 3 && !reg; ++i) {
 		ProcInfo proc;
 		proc.set_proc_id("demo_client");
 		proc.set_public_info("public info of demo_client. etc...");
@@ -114,6 +167,9 @@
 
 		BHFree(reply, reply_len);
 		Sleep(1s);
+	}
+	if (!reg) {
+		return;
 	}
 
 	const std::string topic_ = "topic_";
@@ -151,7 +207,7 @@
 		for (int i = 0; i < 1; ++i) {
 			MsgPublish pub;
 			pub.set_topic(topic_ + std::to_string(i));
-			pub.set_data("pub_data_" + std::string(1024 * 1024, 'a'));
+			pub.set_data("pub_data_" + std::string(1024 * 1, 'a'));
 			std::string s(pub.SerializeAsString());
 			BHPublish(s.data(), s.size(), 0);
 			// Sleep(1s);
@@ -166,36 +222,49 @@
 			std::string s(req.SerializeAsString());
 			void *msg_id = 0;
 			int len = 0;
+			// Sleep(10ms, false);
 			bool r = BHAsyncRequest(s.data(), s.size(), 0, 0);
 			DEFER1(BHFree(msg_id, len););
 			if (r) {
 				++Status().nrequest_;
 			} else {
-				printf("request topic : %s\n", r ? "ok" : "failed");
+				++Status().nfailed_;
+				static std::atomic<int64_t> last(0);
+				auto now = NowSec();
+				if (last.exchange(now) < now) {
+					int ec = 0;
+					std::string msg;
+					GetLastError(ec, msg);
+					printf("request topic error --------- : %s\n", msg.c_str());
+				}
 			}
 		}
 	};
 	auto showStatus = [](std::atomic<bool> *run) {
-		int64_t last = 0;
+		MsgStatus last;
 		while (*run) {
 			auto &st = Status();
-			std::this_thread::sleep_for(1s);
-			int cur = st.nreply_.load();
-			printf("nreq: %8ld, nsrv: %8ld, nreply: %8ld, speed %8ld\n", st.nrequest_.load(), st.nserved_.load(), cur, cur - last);
-			last = cur;
+			Sleep(1s, false);
+			printf("nreq: %8ld, spd %8ld | failed: %8ld | nsrv: %8ld, spd %8ld | nreply: %8ld, spd %8ld\n",
+			       st.nrequest_.load(), st.nrequest_ - last.nrequest_,
+			       st.nfailed_.load(),
+			       st.nserved_.load(), st.nserved_ - last.nserved_,
+			       st.nreply_.load(), st.nreply_ - last.nreply_);
+			last = st;
 		}
 	};
 	auto hb = [](std::atomic<bool> *run) {
 		while (*run) {
-			BHHeartBeatEasy(0);
-			std::this_thread::sleep_for(1s);
+			Sleep(1s, false);
+			bool r = BHHeartbeatEasy(1000);
+			printf("heartbeat: %s\n", r ? "ok" : "failed");
 		}
 	};
 	std::atomic<bool> run(true);
 	ThreadManager threads;
 	boost::timer::auto_cpu_timer timer;
 	threads.Launch(hb, &run);
-	// threads.Launch(showStatus, &run);
+	threads.Launch(showStatus, &run);
 	int ncli = 10;
 	const uint64_t nreq = 1000 * 100;
 	for (int i = 0; i < ncli; ++i) {
@@ -204,8 +273,8 @@
 
 	int same = 0;
 	int64_t last = 0;
-	while (last < nreq * ncli && same < 3) {
-		Sleep(1s);
+	while (last < nreq * ncli && same < 2) {
+		Sleep(1s, false);
 		auto cur = Status().nreply_.load();
 		if (last == cur) {
 			++same;

--
Gitblit v1.8.0