From 993c556000a414011626770540678948f16eaa9e Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期三, 02 六月 2021 17:40:50 +0800
Subject: [PATCH] center restart with new shm; set center node ssn.

---
 box/center_main.cc        |   75 +-----------------
 src/robust.h              |    4 
 utest/api_test.cpp        |    5 
 box/center_topic_node.cpp |    2 
 src/defs.h                |    5 -
 utest/tcp_test.cpp        |    4 
 src/topic_node.h          |    2 
 src/topic_node.cpp        |    4 
 src/defs.cpp              |   96 ++++++++++++++++++++++-
 9 files changed, 107 insertions(+), 90 deletions(-)

diff --git a/box/center_main.cc b/box/center_main.cc
index e7715f8..c7d67e3 100644
--- a/box/center_main.cc
+++ b/box/center_main.cc
@@ -27,63 +27,6 @@
 using namespace std::chrono_literals;
 using namespace bhome_shm;
 
-namespace
-{
-const std::string kCenterRunningFlag = "bh_center_single_flag_0";
-
-class InstanceFlag
-{
-
-public:
-	InstanceFlag(SharedMemory &shm, const std::string &name) :
-	    shm_(shm), name_(name), run_(false) {}
-	~InstanceFlag() { Stop(); }
-
-	bool TryStartAsFirstInstance()
-	{
-		if (run_) {
-			return true;
-		}
-
-		auto mtx(shm_.FindOrCreate<Mutex>(name_ + "_mutex_0"));
-		auto time_stamp(shm_.FindOrCreate<int64_t>(name_ + "_timestamp_0", 0));
-
-		if (mtx && time_stamp) {
-			Guard lock(*mtx);
-			auto now = NowSec();
-			LOG_DEBUG() << "old: " << *time_stamp << ", now: " << now;
-			if (now > *time_stamp + 10) {
-				*time_stamp = now;
-				auto UpdateTime = [this, time_stamp]() {
-					while (run_) {
-						std::this_thread::sleep_for(1s);
-						*time_stamp = NowSec();
-					}
-				};
-				run_.store(true);
-				std::thread(UpdateTime).swap(worker_);
-				return true;
-			}
-		}
-		return false;
-	}
-
-private:
-	void Stop()
-	{
-		run_.store(false);
-		if (worker_.joinable()) {
-			worker_.join();
-		}
-	}
-
-	std::thread worker_;
-	SharedMemory &shm_;
-	std::string name_;
-	std::atomic<bool> run_;
-};
-
-} // namespace
 int center_main(int argc, const char *argv[])
 {
 	AppArg args(argc, argv);
@@ -101,22 +44,14 @@
 	if (strcasecmp(lvl.c_str(), "error") == 0) { ns_log::ResetLogLevel(ns_log::LogLevel::error); }
 	if (strcasecmp(lvl.c_str(), "fatal") == 0) { ns_log::ResetLogLevel(ns_log::LogLevel::fatal); }
 
-	auto &shm = BHomeShm();
-	if (!CenterInit(shm)) {
-		auto msg = "init memory error.";
+	if (!CenterInit()) {
+		auto msg = "init memory failed, or center is already running.";
 		LOG_FATAL() << msg;
 		printf("%s\n", msg);
 		exit(0);
 	}
+	auto &shm = BHomeShm();
 	GlobalInit(shm);
-
-	InstanceFlag inst(shm, kCenterRunningFlag);
-	if (!inst.TryStartAsFirstInstance()) {
-		auto msg = "another instance is running, exit.";
-		LOG_INFO() << msg;
-		printf("%s\n", msg);
-		return 0;
-	}
 
 	if (args.Has("daemon") || args.Has("d")) {
 		int r = daemon(0, 0); // TODO center control msg to close itself.
@@ -125,9 +60,9 @@
 	BHCenter center(shm);
 	center.Start();
 
-	auto msg = "center started ...";
+	auto msg = "center (" + shm.name() + ") started ...";
 	LOG_INFO() << msg;
-	printf("%s\n", msg);
+	printf("%s\n", msg.c_str());
 	WaitForSignals({SIGINT, SIGTERM});
 	center.Stop();
 	LOG_INFO() << "center stopped.";
diff --git a/box/center_topic_node.cpp b/box/center_topic_node.cpp
index 749f4e6..8228992 100644
--- a/box/center_topic_node.cpp
+++ b/box/center_topic_node.cpp
@@ -56,7 +56,7 @@
 } // namespace
 
 CenterTopicNode::CenterTopicNode(CenterPtr center, SharedMemory &shm) :
-    pscenter_(center), pnode_(new TopicNode(shm)), run_(false) {}
+    pscenter_(center), pnode_(new TopicNode(shm, 200)), run_(false) {}
 
 CenterTopicNode::~CenterTopicNode() { Stop(); }
 
diff --git a/src/defs.cpp b/src/defs.cpp
index 57df1bc..694e2c5 100644
--- a/src/defs.cpp
+++ b/src/defs.cpp
@@ -22,6 +22,7 @@
 #include <boost/uuid/random_generator.hpp>
 #include <boost/uuid/string_generator.hpp>
 #include <boost/uuid/uuid.hpp>
+#include <sys/file.h>
 
 namespace
 {
@@ -76,8 +77,16 @@
 static_assert(AllocSizeIndex[255] > uint32_t(-1), "Make sure alloc size correct.");
 
 const int64_t kCenterInfoFixedAddress = 1024 * 4;
+const int64_t kShmMetaInfoFixedAddress = 1024 * 16;
 
-const boost::uuids::uuid kCenterInfoTag = boost::uuids::string_generator()("fc5007bd-0e62-4d91-95dc-948cf1f02e5a");
+const boost::uuids::uuid kMetaInfoTag = boost::uuids::string_generator()("fc5007bd-0e62-4d91-95dc-948cf1f02e5a");
+
+struct BHomeMetaInfo {
+	boost::uuids::uuid tag_;
+	std::atomic<uint64_t> shm_id_;
+	std::atomic<uint64_t> ssn_id_;
+};
+
 struct CenterMetaInfo {
 	boost::uuids::uuid tag_;
 	CenterInfo info_;
@@ -88,16 +97,43 @@
 template <class T = void>
 T *Ptr(const int64_t offset) { return reinterpret_cast<T *>(offset); }
 
+class FileLock
+{
+public:
+	FileLock(const std::string &path) :
+	    fd_(Open(path))
+	{
+		if (fd_ == -1) { throw std::runtime_error("error open file:" + path); }
+	}
+	~FileLock() { Close(fd_); }
+	bool try_lock() { return fd_ != -1 && (flock(fd_, LOCK_EX | LOCK_NB) == 0); }
+	void unlock() { flock(fd_, LOCK_UN); }
+
+private:
+	static int Open(const std::string &path) { return open(path.c_str(), O_RDONLY, 0666); }
+	static int Close(int fd) { return close(fd); }
+	int fd_;
+	std::mutex mtx_;
+};
+
+SharedMemory &BHomeMetaShm()
+{
+	static std::string name("bhshmq_meta_v0");
+	static SharedMemory shm(name, 1024 * 128);
+	return shm;
+}
+
 } // namespace
 
 CenterInfo *GetCenterInfo(SharedMemory &shm)
 {
 	auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address()));
-	if (pmeta->tag_ == kCenterInfoTag) {
+	if (pmeta->tag_ == kMetaInfoTag) {
 		return &pmeta->info_;
 	}
 	return nullptr;
 }
+
 ShmSocket &DefaultSender(SharedMemory &shm)
 {
 	typedef std::pair<void *, std::shared_ptr<ShmSocket>> Pair;
@@ -121,11 +157,55 @@
 	local_cache = store.back();
 	return *local_cache.second;
 }
+
+BHomeMetaInfo *GetBHomeMeta()
+{
+	auto p = Ptr<BHomeMetaInfo>(kShmMetaInfoFixedAddress + Addr(BHomeMetaShm().get_address()));
+	return (p->tag_ == kMetaInfoTag) ? p : nullptr;
+}
+
+bool ShmMetaInit()
+{
+	SharedMemory &shm = BHomeMetaShm();
+
+	static FileLock fl("/dev/shm/" + shm.name());
+	if (!fl.try_lock()) { // single center instance only.
+		return false;
+	}
+
+	auto pmeta = GetBHomeMeta();
+	if (pmeta && pmeta->tag_ == kMetaInfoTag) {
+		++pmeta->shm_id_; // inc shm id
+		return true;      // already exist.
+	} else {
+		Mutex *mutex = shm.FindOrCreate<Mutex>("bhshmq_meta_lock");
+		if (!mutex || !mutex->try_lock()) {
+			return false;
+		}
+		DEFER1(mutex->unlock());
+
+		auto base = Addr(shm.get_address());
+		auto offset = kShmMetaInfoFixedAddress;
+		void *p = shm.Alloc(offset * 2);
+		if (Addr(p) - base <= offset) {
+			pmeta = new (Ptr(offset + base)) BHomeMetaInfo;
+			pmeta->tag_ = kMetaInfoTag;
+			pmeta->shm_id_ = 100;
+			pmeta->ssn_id_ = 10000;
+			return true;
+		}
+	}
+	return false;
+}
+
 // put center info at fixed memory position.
 // as boost shm find object (find socket/mq by id, etc...) also locks inside,
 // which node might crash inside and cause deadlock.
-bool CenterInit(SharedMemory &shm)
+bool CenterInit()
 {
+	if (!ShmMetaInit()) { return false; }
+
+	SharedMemory &shm = BHomeShm();
 	Mutex *mutex = shm.FindOrCreate<Mutex>("shm_center_lock");
 	if (!mutex || !mutex->try_lock()) {
 		return false;
@@ -133,7 +213,7 @@
 	DEFER1(mutex->unlock());
 
 	auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address()));
-	if (pmeta->tag_ == kCenterInfoTag) {
+	if (pmeta->tag_ == kMetaInfoTag) {
 		return true;
 	} else {
 		auto base = Addr(shm.get_address());
@@ -155,7 +235,7 @@
 			InitMQ(info.mq_center_, NextId());
 			InitMQ(info.mq_bus_, NextId());
 
-			pmeta->tag_ = kCenterInfoTag;
+			pmeta->tag_ = kMetaInfoTag;
 			return true;
 		}
 	}
@@ -183,8 +263,10 @@
 
 std::string BHomeShmName()
 {
-	return "bhome_default_shm_v0";
+	auto bhome_meta = Ptr<BHomeMetaInfo>(kShmMetaInfoFixedAddress + Addr(BHomeMetaShm().get_address()));
+	return "bhome_shmq_id_" + std::to_string(bhome_meta->shm_id_.load());
 }
+
 SharedMemory &BHomeShm()
 {
 	static SharedMemory shm(BHomeShmName(), 1024 * 1024 * 512);
@@ -193,7 +275,7 @@
 
 bool GlobalInit(SharedMemory &shm) { return GetCenterInfo(shm); }
 
-MQId NewSession() { return 10 * (++GetCenterInfo(BHomeShm())->mqid_); }
+MQId NewSession() { return 10 * (++GetBHomeMeta()->ssn_id_); }
 
 void SetLastError(const int ec, const std::string &msg)
 {
diff --git a/src/defs.h b/src/defs.h
index 5f6fc16..56de8fa 100644
--- a/src/defs.h
+++ b/src/defs.h
@@ -39,9 +39,6 @@
 	MQInfo mq_bus_;
 	MQInfo mq_sender_;
 	robust::AtomicReqRep init_rr_;
-	std::atomic<MQId> mqid_;
-	CenterInfo() :
-	    mqid_(100000) {}
 };
 
 const int kBHCenterPort = 24287;
@@ -59,7 +56,7 @@
 ShmSocket &DefaultSender(SharedMemory &shm);
 
 MQId NewSession();
-bool CenterInit(SharedMemory &shm);
+bool CenterInit();
 bool GlobalInit(SharedMemory &shm);
 typedef std::string Topic;
 void SetLastError(const int ec, const std::string &msg);
diff --git a/src/robust.h b/src/robust.h
index c46010a..1a3f430 100644
--- a/src/robust.h
+++ b/src/robust.h
@@ -67,6 +67,8 @@
 	typedef std::function<Data(const Data)> Handler;
 	bool ClientRequest(const Data request, Data &reply);
 	bool ServerProcess(Handler onReq);
+	AtomicReqRep() :
+	    data_(0), timestamp_(now()) {}
 
 private:
 	enum State {
@@ -79,7 +81,7 @@
 	static Data Decode(Data d) { return d >> 3; }
 	typedef std::chrono::steady_clock steady_clock;
 	typedef steady_clock::duration Duration;
-	Duration now() { return steady_clock::now().time_since_epoch(); }
+	static Duration now() { return steady_clock::now().time_since_epoch(); }
 
 	bool DataCas(Data expected, Data val) { return data_.compare_exchange_strong(expected, val); }
 	std::atomic<Data> data_;
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index b21f7ef..f592bff 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -50,8 +50,8 @@
 
 } // namespace
 
-TopicNode::TopicNode(SharedMemory &shm) :
-    shm_(shm), state_(eStateUninited)
+TopicNode::TopicNode(SharedMemory &shm, MQId ssn_id) :
+    shm_(shm), state_(eStateUninited), ssn_id_(ssn_id)
 {
 }
 
diff --git a/src/topic_node.h b/src/topic_node.h
index 3d6767b..8c08fcf 100644
--- a/src/topic_node.h
+++ b/src/topic_node.h
@@ -39,7 +39,7 @@
 	const MQInfo &BusAddr() const { return BHTopicBusAddress(shm()); }
 
 public:
-	TopicNode(SharedMemory &shm);
+	TopicNode(SharedMemory &shm, MQId ssn_id = 0);
 	~TopicNode();
 
 	// topic node
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index bddcbf7..fc1ad08 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -235,8 +235,9 @@
 		printf("query with ip set\n");
 		host.set_ip("127.0.0.1");
 		host.set_port(kBHCenterPort);
-		host.set_mq_id(1000011);
-		host.set_abs_addr(10296);
+		// center topic node address.
+		host.set_mq_id(201);
+		host.set_abs_addr(10072);
 
 		std::string dest(host.SerializeAsString());
 		void *proc_id = 0;
diff --git a/utest/tcp_test.cpp b/utest/tcp_test.cpp
index 86a0897..2aead7e 100644
--- a/utest/tcp_test.cpp
+++ b/utest/tcp_test.cpp
@@ -52,8 +52,8 @@
 
 		head.mutable_dest()->set_ip(connect_addr);
 		head.mutable_dest()->set_port(port);
-		head.mutable_dest()->set_mq_id(1000011);
-		head.mutable_dest()->set_abs_addr(10296);
+		head.mutable_dest()->set_mq_id(201);
+		head.mutable_dest()->set_abs_addr(10072);
 
 		return (MsgI::Serialize(head, req));
 	};

--
Gitblit v1.8.0