From ab898268c8bc493ca9862b2d64f2e1e7d20e5a4c Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期三, 02 六月 2021 13:39:00 +0800
Subject: [PATCH] refactor.

---
 src/defs.cpp |  121 +++++++++++++++++++++++++++++++++++++---
 1 files changed, 111 insertions(+), 10 deletions(-)

diff --git a/src/defs.cpp b/src/defs.cpp
index 450349e..57df1bc 100644
--- a/src/defs.cpp
+++ b/src/defs.cpp
@@ -18,6 +18,10 @@
 #include "defs.h"
 #include "msg.h"
 #include "shm_msg_queue.h"
+#include "shm_socket.h"
+#include <boost/uuid/random_generator.hpp>
+#include <boost/uuid/string_generator.hpp>
+#include <boost/uuid/uuid.hpp>
 
 namespace
 {
@@ -70,7 +74,104 @@
 const int kAllocIndexLen = sizeof(AllocSizeIndex) / sizeof(AllocSizeIndex[0]);
 static_assert(kAllocIndexLen == 256, "Make sure alloc 8 bit is enough.");
 static_assert(AllocSizeIndex[255] > uint32_t(-1), "Make sure alloc size correct.");
+
+const int64_t kCenterInfoFixedAddress = 1024 * 4;
+
+const boost::uuids::uuid kCenterInfoTag = boost::uuids::string_generator()("fc5007bd-0e62-4d91-95dc-948cf1f02e5a");
+struct CenterMetaInfo {
+	boost::uuids::uuid tag_;
+	CenterInfo info_;
+};
+
+int64_t Addr(void *ptr) { return reinterpret_cast<int64_t>(ptr); }
+// void *Ptr(const int64_t offset) { return reinterpret_cast<void *>(offset); }
+template <class T = void>
+T *Ptr(const int64_t offset) { return reinterpret_cast<T *>(offset); }
+
 } // namespace
+
+CenterInfo *GetCenterInfo(SharedMemory &shm)
+{
+	auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address()));
+	if (pmeta->tag_ == kCenterInfoTag) {
+		return &pmeta->info_;
+	}
+	return nullptr;
+}
+ShmSocket &DefaultSender(SharedMemory &shm)
+{
+	typedef std::pair<void *, std::shared_ptr<ShmSocket>> Pair;
+	static std::vector<Pair> store;
+	static std::mutex s_mtx;
+
+	thread_local Pair local_cache;
+	if (local_cache.first == &shm) {
+		return *local_cache.second;
+	}
+
+	std::lock_guard<std::mutex> lk(s_mtx);
+	for (auto &kv : store) {
+		if (kv.first == &shm) {
+			local_cache = kv;
+			return *local_cache.second;
+		}
+	}
+	auto &mq = GetCenterInfo(shm)->mq_sender_;
+	store.emplace_back(&shm, new ShmSocket(mq.offset_, shm, mq.id_));
+	local_cache = store.back();
+	return *local_cache.second;
+}
+// put center info at fixed memory position.
+// as boost shm find object (find socket/mq by id, etc...) also locks inside,
+// which node might crash inside and cause deadlock.
+bool CenterInit(SharedMemory &shm)
+{
+	Mutex *mutex = shm.FindOrCreate<Mutex>("shm_center_lock");
+	if (!mutex || !mutex->try_lock()) {
+		return false;
+	}
+	DEFER1(mutex->unlock());
+
+	auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address()));
+	if (pmeta->tag_ == kCenterInfoTag) {
+		return true;
+	} else {
+		auto base = Addr(shm.get_address());
+		auto offset = kCenterInfoFixedAddress;
+		void *p = shm.Alloc(offset * 2);
+		if (Addr(p) - base <= offset) {
+			pmeta = new (Ptr(offset + base)) CenterMetaInfo;
+			auto &info = pmeta->info_;
+
+			auto InitMQ = [&](auto &mq, auto &&id) {
+				mq.id_ = id;
+				ShmSocket tmp(shm, id, eOpenOrCreate);
+				mq.offset_ = tmp.AbsAddr();
+			};
+
+			int id = 100;
+			auto NextId = [&]() { return ++id; };
+			InitMQ(info.mq_sender_, NextId());
+			InitMQ(info.mq_center_, NextId());
+			InitMQ(info.mq_bus_, NextId());
+
+			pmeta->tag_ = kCenterInfoTag;
+			return true;
+		}
+	}
+	return false;
+}
+
+const MQInfo &BHTopicCenterAddress(SharedMemory &shm) { return GetCenterInfo(shm)->mq_center_; }
+const MQInfo &BHTopicBusAddress(SharedMemory &shm) { return GetCenterInfo(shm)->mq_bus_; }
+bool BHNodeInit(SharedMemory &shm, const int64_t request, int64_t &reply)
+{
+	return GetCenterInfo(shm)->init_rr_.ClientRequest(request, reply);
+}
+void BHCenterHandleInit(SharedMemory &shm, std::function<int64_t(const int64_t)> const &onReq)
+{
+	GetCenterInfo(shm)->init_rr_.ServerProcess(onReq);
+}
 
 int64_t CalcAllocIndex(int64_t size)
 {
@@ -84,19 +185,15 @@
 {
 	return "bhome_default_shm_v0";
 }
-bhome_shm::SharedMemory &BHomeShm()
+SharedMemory &BHomeShm()
 {
-	static bhome_shm::SharedMemory shm(BHomeShmName(), 1024 * 1024 * 512);
+	static SharedMemory shm(BHomeShmName(), 1024 * 1024 * 512);
 	return shm;
 }
 
-bool GlobalInit(bhome_shm::SharedMemory &shm)
-{
-	MsgI::BindShm(shm);
-	typedef std::atomic<MQId> IdSrc;
-	IdSrc *psrc = shm.FindOrCreate<IdSrc>("shmqIdSrc0", 100000);
-	return psrc && ShmMsgQueue::SetData(*psrc);
-}
+bool GlobalInit(SharedMemory &shm) { return GetCenterInfo(shm); }
+
+MQId NewSession() { return 10 * (++GetCenterInfo(BHomeShm())->mqid_); }
 
 void SetLastError(const int ec, const std::string &msg)
 {
@@ -108,4 +205,8 @@
 {
 	ec = LastErrorStore().ec_;
 	msg = LastErrorStore().msg_;
-}
\ No newline at end of file
+}
+
+int NodeTimeoutSec() { return 60; }
+
+std::string BHLogDir() { return "/opt/vasystem/valog/"; }

--
Gitblit v1.8.0