From 58d904a328c0d849769b483e901a0be9426b8209 Mon Sep 17 00:00:00 2001
From: liuxiaolong <liuxiaolong@aiotlink.com>
Date: 星期二, 20 七月 2021 20:20:44 +0800
Subject: [PATCH] 调整Request C.BHFree的位置

---
 src/defs.cpp |  274 ++++++++++++++++++++++++++++++++++++++++++++++++++++--
 1 files changed, 264 insertions(+), 10 deletions(-)

diff --git a/src/defs.cpp b/src/defs.cpp
index 0ca82bf..8305d66 100644
--- a/src/defs.cpp
+++ b/src/defs.cpp
@@ -17,7 +17,12 @@
  */
 #include "defs.h"
 #include "msg.h"
-#include "shm_queue.h"
+#include "shm_msg_queue.h"
+#include "shm_socket.h"
+#include <boost/uuid/random_generator.hpp>
+#include <boost/uuid/string_generator.hpp>
+#include <boost/uuid/uuid.hpp>
+#include <sys/file.h>
 
 namespace
 {
@@ -33,21 +38,266 @@
 	return le;
 }
 
-} // namespace
+constexpr int64_t AllocSizeIndex[] = {
+    16, 24, 32, 40, 48, 56, 64, 72,
+    80, 88, 96, 104, 120, 136, 152, 168,
+    184, 200, 224, 248, 272, 296, 328, 360,
+    392, 432, 472, 520, 568, 624, 680, 744,
+    816, 896, 984, 1080, 1184, 1296, 1416, 1544,
+    1688, 1848, 2016, 2200, 2400, 2624, 2864, 3128,
+    3416, 3728, 4072, 4448, 4856, 5304, 5792, 6320,
+    6896, 7528, 8216, 8968, 9784, 10680, 11656, 12720,
+    13880, 15144, 16520, 18024, 19664, 21456, 23408, 25536,
+    27864, 30400, 33168, 36184, 39480, 43072, 46992, 51264,
+    55928, 61016, 66568, 72624, 79232, 86440, 94304, 102880,
+    112232, 122440, 133576, 145720, 158968, 173424, 189192, 206392,
+    225160, 245632, 267968, 292328, 318904, 347896, 379528, 414032,
+    451672, 492736, 537536, 586408, 639720, 697880, 761328, 830544,
+    906048, 988416, 1078272, 1176296, 1283232, 1399896, 1527160, 1665992,
+    1817448, 1982672, 2162920, 2359552, 2574056, 2808064, 3063344, 3341832,
+    3645640, 3977064, 4338616, 4733040, 5163320, 5632712, 6144776, 6703392,
+    7312792, 7977592, 8702832, 9494000, 10357096, 11298656, 12325808, 13446336,
+    14668736, 16002264, 17457016, 19044024, 20775304, 22663968, 24724328, 26972000,
+    29424000, 32098912, 35017000, 38200368, 41673128, 45461600, 49594472, 54103064,
+    59021528, 64387128, 70240504, 76626008, 83592008, 91191288, 99481408, 108525176,
+    118391104, 129153936, 140895208, 153703864, 167676944, 182920304, 199549424, 217690280,
+    237480312, 259069432, 282621200, 308314040, 336342592, 366919192, 400275488, 436664168,
+    476360912, 519666456, 566908864, 618446040, 674668408, 736001904, 802911168, 875903096,
+    955530656, 1042397080, 1137160456, 1240538680, 1353314928, 1476343560, 1610556616, 1756970856,
+    1916695480, 2090940528, 2281026032, 2488392040, 2714609504, 2961392192, 3230609664, 3524301456,
+    3844692504, 4194210008, 4575501832, 4991456544, 5445225320, 5940245808, 6480268160, 7069383448,
+    7712054672, 8413150552, 9177982424, 10012344464, 10922557600, 11915517384, 12998746240, 14180450448,
+    15469582312, 16875907976, 18410081432, 20083725200, 21909518400, 23901292800, 26074137600, 28444513752,
+    31030378640, 33851322152, 36928715080, 40285871000, 43948222912, 47943515904, 52302017352, 57056746208,
+    62243723136, 67902243424, 74075174648, 80809281440, 88155579752, 96169723368, 104912425496, 114449918728,
+    124854456800, 136204861968, 148587122152, 162095042352, 176830955296, 192906496688, 210443450936, 229574673752};
 
-bhome_shm::SharedMemory &BHomeShm()
+const int kAllocIndexLen = sizeof(AllocSizeIndex) / sizeof(AllocSizeIndex[0]);
+static_assert(kAllocIndexLen == 256, "Make sure alloc 8 bit is enough.");
+static_assert(AllocSizeIndex[255] > uint32_t(-1), "Make sure alloc size correct.");
+
+const int64_t kCenterInfoFixedAddress = 1024 * 4;
+const int64_t kShmMetaInfoFixedAddress = 1024 * 16;
+
+const boost::uuids::uuid kMetaInfoTag = boost::uuids::string_generator()("fc5007bd-0e62-4d91-95dc-948cf1f02e5a");
+
+struct BHomeMetaInfo {
+	boost::uuids::uuid tag_;
+	std::atomic<uint64_t> shm_id_;
+	std::atomic<uint64_t> ssn_id_;
+};
+
+struct CenterMetaInfo {
+	boost::uuids::uuid tag_;
+	CenterInfo info_;
+};
+
+int64_t Addr(void *ptr) { return reinterpret_cast<int64_t>(ptr); }
+// void *Ptr(const int64_t offset) { return reinterpret_cast<void *>(offset); }
+template <class T = void>
+T *Ptr(const int64_t offset) { return reinterpret_cast<T *>(offset); }
+
+class FileLock
 {
-	static bhome_shm::SharedMemory shm("bhome_default_shm_v0", 1024 * 1024 * 512);
+public:
+	FileLock(const std::string &path) :
+	    fd_(Open(path))
+	{
+		if (fd_ == -1) { throw std::runtime_error("error open file:" + path); }
+	}
+	~FileLock() { Close(fd_); }
+	bool try_lock() { return fd_ != -1 && (flock(fd_, LOCK_EX | LOCK_NB) == 0); }
+	void unlock() { flock(fd_, LOCK_UN); }
+
+private:
+	static int Open(const std::string &path) { return open(path.c_str(), O_RDONLY, 0666); }
+	static int Close(int fd) { return close(fd); }
+	int fd_;
+	std::mutex mtx_;
+};
+
+SharedMemory &BHomeMetaShm()
+{
+	static std::string name("bhshmq_meta_v0");
+	static SharedMemory shm(name, 1024 * 128);
 	return shm;
 }
 
-bool GlobalInit(bhome_shm::SharedMemory &shm)
+ShmSocket &ShmSender(SharedMemory &shm, const bool reset)
 {
-	MsgI::BindShm(shm);
-	typedef std::atomic<MQId> IdSrc;
-	IdSrc *psrc = shm.FindOrCreate<IdSrc>("shmqIdSrc0", 100000);
-	return ShmMsgQueue::SetData(*psrc);
+	typedef std::pair<void *, std::shared_ptr<ShmSocket>> Pair;
+	static std::vector<Pair> store;
+	static std::mutex s_mtx;
+	thread_local Pair local_cache;
+
+	std::lock_guard<std::mutex> lk(s_mtx);
+
+	if (reset) {
+		for (auto &kv : store) {
+			if (kv.first == &shm) {
+				auto &mq = GetCenterInfo(shm)->mq_sender_;
+				kv.second.reset(new ShmSocket(mq.offset_, shm, mq.id_));
+				local_cache = kv;
+				return *local_cache.second;
+			}
+		}
+	} else if (local_cache.first == &shm) {
+		return *local_cache.second;
+	}
+
+	for (auto &kv : store) {
+		if (kv.first == &shm) {
+			local_cache = kv;
+			return *local_cache.second;
+		}
+	}
+	auto &mq = GetCenterInfo(shm)->mq_sender_;
+	store.emplace_back(&shm, std::make_shared<ShmSocket>(mq.offset_, shm, mq.id_));
+	// store.emplace_back(&shm, new ShmSocket(mq.offset_, shm, mq.id_));
+	local_cache = store.back();
+	return *local_cache.second;
 }
+} // namespace
+
+CenterInfo *GetCenterInfo(SharedMemory &shm)
+{
+	auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address()));
+	if (pmeta->tag_ == kMetaInfoTag) {
+		return &pmeta->info_;
+	}
+	return nullptr;
+}
+
+ShmSocket &DefaultSender(SharedMemory &shm) { return ShmSender(shm, false); }
+
+BHomeMetaInfo *GetBHomeMeta()
+{
+	auto p = Ptr<BHomeMetaInfo>(kShmMetaInfoFixedAddress + Addr(BHomeMetaShm().get_address()));
+	return (p->tag_ == kMetaInfoTag) ? p : nullptr;
+}
+
+bool ShmMetaInit()
+{
+	SharedMemory &shm = BHomeMetaShm();
+
+	static FileLock fl("/dev/shm/" + shm.name());
+	if (!fl.try_lock()) { // single center instance only.
+		return false;
+	}
+
+	auto pmeta = GetBHomeMeta();
+	if (pmeta && pmeta->tag_ == kMetaInfoTag) {
+		// remove old shm
+		SharedMemory::Remove(BHomeShmName());
+		++pmeta->shm_id_; // inc shm id
+		return true;      // already exist.
+	} else {
+		Mutex *mutex = shm.FindOrCreate<Mutex>("bhshmq_meta_lock");
+		if (!mutex || !mutex->try_lock()) {
+			return false;
+		}
+		DEFER1(mutex->unlock());
+
+		auto base = Addr(shm.get_address());
+		auto offset = kShmMetaInfoFixedAddress;
+		void *p = shm.Alloc(offset * 2);
+		if (Addr(p) - base <= offset) {
+			pmeta = new (Ptr(offset + base)) BHomeMetaInfo;
+			pmeta->tag_ = kMetaInfoTag;
+			pmeta->shm_id_ = 100;
+			pmeta->ssn_id_ = 10000;
+			return true;
+		}
+	}
+	return false;
+}
+
+// put center info at fixed memory position.
+// as boost shm find object (find socket/mq by id, etc...) also locks inside,
+// which node might crash inside and cause deadlock.
+bool CenterInit()
+{
+	if (!ShmMetaInit()) { return false; }
+
+	SharedMemory &shm = BHomeShm();
+	Mutex *mutex = shm.FindOrCreate<Mutex>("shm_center_lock");
+	if (!mutex || !mutex->try_lock()) {
+		return false;
+	}
+	DEFER1(mutex->unlock());
+
+	auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address()));
+	if (pmeta->tag_ == kMetaInfoTag) {
+		return true;
+	} else {
+		auto base = Addr(shm.get_address());
+		auto offset = kCenterInfoFixedAddress;
+		void *p = shm.Alloc(offset * 2);
+		if (Addr(p) - base <= offset) {
+			pmeta = new (Ptr(offset + base)) CenterMetaInfo;
+			auto &info = pmeta->info_;
+
+			auto InitMQ = [&](auto &mq, auto &&id) {
+				mq.id_ = id;
+				ShmSocket tmp(shm, id, eOpenOrCreate);
+				mq.offset_ = tmp.AbsAddr();
+			};
+
+			int id = 100;
+			auto NextId = [&]() { return ++id; };
+			InitMQ(info.mq_sender_, NextId());
+			InitMQ(info.mq_center_, NextId());
+			InitMQ(info.mq_bus_, NextId());
+
+			pmeta->tag_ = kMetaInfoTag;
+			return true;
+		}
+	}
+	return false;
+}
+
+const MQInfo &BHTopicCenterAddress(SharedMemory &shm) { return GetCenterInfo(shm)->mq_center_; }
+const MQInfo &BHTopicBusAddress(SharedMemory &shm) { return GetCenterInfo(shm)->mq_bus_; }
+bool BHNodeInit(SharedMemory &shm, const int64_t request, int64_t &reply)
+{
+	return GetCenterInfo(shm)->init_rr_.ClientRequest(request, reply);
+}
+void BHCenterHandleInit(SharedMemory &shm, std::function<int64_t(const int64_t)> const &onReq)
+{
+	GetCenterInfo(shm)->init_rr_.ServerProcess(onReq);
+}
+
+int64_t CalcAllocIndex(int64_t size)
+{
+	auto pos = std::lower_bound(AllocSizeIndex, AllocSizeIndex + kAllocIndexLen, size);
+	return (pos == AllocSizeIndex + kAllocIndexLen) ? -1 : pos - AllocSizeIndex;
+}
+
+int64_t GetAllocSize(int index) { return index < kAllocIndexLen ? AllocSizeIndex[index] : 0; }
+
+std::string BHomeShmName()
+{
+	auto bhome_meta = Ptr<BHomeMetaInfo>(kShmMetaInfoFixedAddress + Addr(BHomeMetaShm().get_address()));
+	return "bhshmq_sid_" + std::to_string(bhome_meta->shm_id_.load());
+}
+
+SharedMemory &BHomeShm()
+{
+	static std::unique_ptr<SharedMemory> shm_ptr;
+	static std::string shm_name;
+	if (!shm_ptr || shm_name != BHomeShmName()) {
+		shm_name = BHomeShmName();
+		if (shm_ptr) {
+			ShmSender(*shm_ptr, true); // reset sender.
+		}
+		shm_ptr.reset(new SharedMemory(shm_name, 1024 * 1024 * 512));
+	}
+	return *shm_ptr;
+}
+
+bool GlobalInit(SharedMemory &shm) { return GetCenterInfo(shm); }
+
+MQId NewSession() { return 10 * (++GetBHomeMeta()->ssn_id_); }
 
 void SetLastError(const int ec, const std::string &msg)
 {
@@ -59,4 +309,8 @@
 {
 	ec = LastErrorStore().ec_;
 	msg = LastErrorStore().msg_;
-}
\ No newline at end of file
+}
+
+int NodeTimeoutSec() { return 60; }
+
+std::string BHLogDir() { return "/opt/vasystem/valog/"; }

--
Gitblit v1.8.0