From 9bf199a4770b08c03d553129757d960b605e598a Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 14 五月 2021 18:05:21 +0800
Subject: [PATCH] add center info at fixed address in shm.

---
 box/center_main.cc     |   11 +-
 utest/speed_test.cpp   |    4 
 src/shm.h              |    8 ++
 src/socket.h           |    3 
 utest/simple_tests.cpp |    2 
 box/center.cpp         |    4 
 src/shm_msg_queue.h    |   15 +--
 src/socket.cpp         |   22 +----
 src/defs.h             |   30 +++++-
 src/shm_msg_queue.cpp  |   16 ++-
 src/defs.cpp           |   79 +++++++++++++++++++
 src/msg.cpp            |    7 +
 12 files changed, 145 insertions(+), 56 deletions(-)

diff --git a/box/center.cpp b/box/center.cpp
index 445d307..7d51f2f 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -98,7 +98,7 @@
 		if (now < time_to_clean_) {
 			return;
 		}
-		LOG_FUNCTION;
+		// LOG_FUNCTION;
 		time_to_clean_ = now + 1;
 		int64_t limit = std::max(10000ul, msgs_.size() / 10);
 		int64_t n = 0;
@@ -109,7 +109,7 @@
 				msg.Free();
 				it = msgs_.erase(it);
 				++n;
-			} else if (msg.timestamp() + 10 < NowSec()) {
+			} else if (msg.timestamp() + 60 < NowSec()) {
 				msg.Free();
 				it = msgs_.erase(it);
 				++n;
diff --git a/box/center_main.cc b/box/center_main.cc
index 6795e41..5eb21b9 100644
--- a/box/center_main.cc
+++ b/box/center_main.cc
@@ -83,12 +83,6 @@
 	std::atomic<bool> run_;
 };
 
-bool CenterInit(bhome_shm::SharedMemory &shm)
-{
-	ShmSocket create(shm, BHGlobalSenderAddress(), 16);
-	return true;
-}
-
 } // namespace
 int center_main(int argc, const char *argv[])
 {
@@ -108,7 +102,10 @@
 	if (strcasecmp(lvl.c_str(), "fatal") == 0) { ns_log::ResetLogLevel(ns_log::LogLevel::fatal); }
 
 	auto &shm = BHomeShm();
-	CenterInit(shm);
+	if (!CenterInit(shm)) {
+		LOG_FATAL() << "init memory error.";
+		exit(0);
+	}
 	GlobalInit(shm);
 
 	InstanceFlag inst(shm, kCenterRunningFlag);
diff --git a/src/defs.cpp b/src/defs.cpp
index 450349e..b812b65 100644
--- a/src/defs.cpp
+++ b/src/defs.cpp
@@ -18,6 +18,10 @@
 #include "defs.h"
 #include "msg.h"
 #include "shm_msg_queue.h"
+#include "socket.h"
+#include <boost/uuid/random_generator.hpp>
+#include <boost/uuid/string_generator.hpp>
+#include <boost/uuid/uuid.hpp>
 
 namespace
 {
@@ -70,7 +74,77 @@
 const int kAllocIndexLen = sizeof(AllocSizeIndex) / sizeof(AllocSizeIndex[0]);
 static_assert(kAllocIndexLen == 256, "Make sure alloc 8 bit is enough.");
 static_assert(AllocSizeIndex[255] > uint32_t(-1), "Make sure alloc size correct.");
+
+const int64_t kCenterInfoFixedAddress = 1024 * 4;
+
+const boost::uuids::uuid kCenterInfoTag = boost::uuids::string_generator()("fc5007bd-0e62-4d91-95dc-948cf1f02e5a");
+struct CenterMetaInfo {
+	boost::uuids::uuid tag_;
+	CenterInfo info_;
+};
+
+int64_t Addr(void *ptr) { return reinterpret_cast<int64_t>(ptr); }
+// void *Ptr(const int64_t offset) { return reinterpret_cast<void *>(offset); }
+template <class T = void>
+T *Ptr(const int64_t offset) { return reinterpret_cast<T *>(offset); }
+
 } // namespace
+
+CenterInfo *GetCenterInfo(bhome_shm::SharedMemory &shm)
+{
+	auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address()));
+	if (pmeta->tag_ == kCenterInfoTag) {
+		return &pmeta->info_;
+	}
+	return nullptr;
+}
+
+// put center info at fixed memory position.
+// as boost shm find object (find socket/mq by id, etc...) also locks inside,
+// which node might crash inside and cause deadlock.
+bool CenterInit(bhome_shm::SharedMemory &shm)
+{
+	Mutex *mutex = shm.Create<Mutex>("shm_center_lock");
+	if (!mutex || !mutex->try_lock()) {
+		return false;
+	}
+	DEFER1(mutex->unlock());
+
+	auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address()));
+	if (pmeta->tag_ == kCenterInfoTag) {
+		return true;
+	} else {
+		auto base = Addr(shm.get_address());
+		auto offset = kCenterInfoFixedAddress;
+		void *p = shm.Alloc(offset * 2);
+		if (Addr(p) - base <= offset) {
+			pmeta = new (Ptr(offset + base)) CenterMetaInfo;
+			auto &info = pmeta->info_;
+
+			auto InitMQ = [&](auto &mq, auto &&id) {
+				mq.id_ = id;
+				ShmSocket tmp(shm, id, 16);
+				mq.offset_ = tmp.AbsAddr();
+			};
+
+			int id = 100;
+			auto NextId = [&]() { return ++id; };
+			InitMQ(info.mq_sender_, NextId());
+			InitMQ(info.mq_center_, NextId());
+			InitMQ(info.mq_bus_, NextId());
+			InitMQ(info.mq_init_, NextId());
+
+			pmeta->tag_ = kCenterInfoTag;
+			return true;
+		}
+	}
+	return false;
+}
+
+uint64_t BHGlobalSenderAddress() { return GetCenterInfo(BHomeShm())->mq_sender_.id_; }
+uint64_t BHTopicCenterAddress() { return GetCenterInfo(BHomeShm())->mq_center_.id_; }
+uint64_t BHTopicBusAddress() { return GetCenterInfo(BHomeShm())->mq_bus_.id_; }
+uint64_t BHCenterReplyAddress() { return GetCenterInfo(BHomeShm())->mq_init_.id_; }
 
 int64_t CalcAllocIndex(int64_t size)
 {
@@ -93,9 +167,8 @@
 bool GlobalInit(bhome_shm::SharedMemory &shm)
 {
 	MsgI::BindShm(shm);
-	typedef std::atomic<MQId> IdSrc;
-	IdSrc *psrc = shm.FindOrCreate<IdSrc>("shmqIdSrc0", 100000);
-	return psrc && ShmMsgQueue::SetData(*psrc);
+	CenterInfo *pinfo = GetCenterInfo(shm);
+	return pinfo && ShmMsgQueue::SetData(pinfo->mqid_);
 }
 
 void SetLastError(const int ec, const std::string &msg)
diff --git a/src/defs.h b/src/defs.h
index f0a0d49..5c770a7 100644
--- a/src/defs.h
+++ b/src/defs.h
@@ -19,19 +19,28 @@
 #ifndef DEFS_KP8LKGD0
 #define DEFS_KP8LKGD0
 
+#include <atomic>
 #include <string>
 
 typedef uint64_t MQId;
 
-const MQId kBHDefaultSender = 99;
-const MQId kBHTopicCenter = 100;
-const MQId kBHTopicBus = 101;
-inline const MQId BHGlobalSenderAddress() { return kBHDefaultSender; }
-inline const MQId BHTopicCenterAddress() { return kBHTopicCenter; }
-inline const MQId BHTopicBusAddress() { return kBHTopicBus; }
-
 int64_t CalcAllocIndex(int64_t size);
 int64_t GetAllocSize(int index);
+
+struct CenterInfo {
+	struct MQInfo {
+		int64_t id_ = 0;
+		int64_t offset_ = 0;
+	};
+
+	MQInfo mq_center_;
+	MQInfo mq_bus_;
+	MQInfo mq_init_;
+	MQInfo mq_sender_;
+	std::atomic<MQId> mqid_;
+	CenterInfo() :
+	    mqid_(100000) {}
+};
 
 const int kBHCenterPort = 24287;
 const char kTopicSep = '.';
@@ -42,10 +51,17 @@
 
 std::string BHomeShmName();
 bhome_shm::SharedMemory &BHomeShm();
+CenterInfo *GetCenterInfo(bhome_shm::SharedMemory &shm);
+bool CenterInit(bhome_shm::SharedMemory &shm);
 bool GlobalInit(bhome_shm::SharedMemory &shm);
 typedef std::string Topic;
 void SetLastError(const int ec, const std::string &msg);
 void GetLastError(int &ec, std::string &msg);
 //TODO center can check shm for previous crash.
 
+uint64_t BHGlobalSenderAddress();
+uint64_t BHTopicCenterAddress();
+uint64_t BHTopicBusAddress();
+uint64_t BHCenterReplyAddress();
+
 #endif // end of include guard: DEFS_KP8LKGD0
diff --git a/src/msg.cpp b/src/msg.cpp
index a4777d2..edffff1 100644
--- a/src/msg.cpp
+++ b/src/msg.cpp
@@ -17,6 +17,7 @@
  */
 #include "msg.h"
 #include "bh_util.h"
+#include "defs.h"
 #include "socket.h"
 
 namespace bhome_msg
@@ -24,7 +25,8 @@
 
 ShmSocket &ShmMsg::Sender()
 {
-	static ShmSocket sender(shm(), false, BHGlobalSenderAddress(), 16);
+	static auto &mq = GetCenterInfo(shm())->mq_sender_;
+	static ShmSocket sender(mq.offset_, shm(), mq.id_);
 	return sender;
 }
 
@@ -38,7 +40,8 @@
 		int64_t free_cmd = (id() << 4) | EncodeCmd(eCmdFree);
 		Sender().Send(BHTopicCenterAddress(), free_cmd);
 	} else if (n < 0) {
-		throw -123;
+		LOG_FATAL() << "error double release data.";
+		throw std::runtime_error("double release msg.");
 	}
 	return n;
 }
diff --git a/src/shm.h b/src/shm.h
index 269df44..b5ec2ea 100644
--- a/src/shm.h
+++ b/src/shm.h
@@ -192,6 +192,12 @@
 			pdata_ = shm_.Find<Data>(ObjName(name_));
 		}
 	}
+	ShmObject(const int64_t offset, ShmType &segment, const std::string &name) :
+	    shm_(segment), name_(name)
+	{
+		pdata_ = reinterpret_cast<Data *>(Addr(shm_.get_address()) + offset);
+	}
+
 	bool IsOk() const { return pdata_; }
 
 	static bool Remove(SharedMemory &shm, const std::string &name) { return shm.destroy<Data>(ObjName(name).c_str()); }
@@ -201,11 +207,13 @@
 	std::string name() const { return name_; }
 	Data *data() { return pdata_; }
 	const Data *data() const { return pdata_; }
+	int64_t offset() const { return Addr(pdata_) - Addr(shm_.get_address()); }
 	Data *operator->() { return data(); }
 	const Data *operator->() const { return data(); }
 	bool Remove() { return Remove(shm_, name_); }
 
 private:
+	static int64_t Addr(const void *p) { return reinterpret_cast<int64_t>(p); }
 	ShmType &shm_;
 	std::string name_;
 	Data *pdata_ = nullptr;
diff --git a/src/shm_msg_queue.cpp b/src/shm_msg_queue.cpp
index d96c511..663da1e 100644
--- a/src/shm_msg_queue.cpp
+++ b/src/shm_msg_queue.cpp
@@ -37,13 +37,13 @@
 	return (++id) * 10;
 }
 
-ShmMsgQueue::ShmMsgQueue(const MQId id, ShmType &segment, const int len) :
+ShmMsgQueue::ShmMsgQueue(ShmType &segment, const MQId id, const int len) :
     id_(id),
     queue_(segment, MsgQIdToName(id_), len, segment.get_segment_manager())
 {
 }
 
-ShmMsgQueue::ShmMsgQueue(const MQId id, const bool create_or_else_find, ShmType &segment, const int len) :
+ShmMsgQueue::ShmMsgQueue(ShmType &segment, const bool create_or_else_find, const MQId id, const int len) :
     id_(id),
     queue_(segment, create_or_else_find, MsgQIdToName(id_), len, segment.get_segment_manager())
 {
@@ -51,8 +51,11 @@
 		throw("error create/find msgq " + std::to_string(id_));
 	}
 }
-ShmMsgQueue::ShmMsgQueue(ShmType &segment, const int len) :
-    ShmMsgQueue(NewId(), true, segment, len) {}
+ShmMsgQueue::ShmMsgQueue(const int64_t abs_addr, ShmType &segment, const MQId id) :
+    id_(id), queue_(abs_addr, segment, MsgQIdToName(id_))
+{
+	//TODO check some tag.
+}
 
 ShmMsgQueue::~ShmMsgQueue() {}
 
@@ -93,10 +96,11 @@
 	return Shmq::Find(shm, MsgQIdToName(remote_id));
 }
 
-bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, int64_t val)
+bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote, int64_t val)
 {
 	try {
-		ShmMsgQueue dest(remote_id, false, shm, 1);
+		//TODO find from center, or use offset.
+		ShmMsgQueue dest(shm, false, remote, 1);
 #ifndef BH_USE_ATOMIC_Q
 		Guard lock(GetMutex(remote_id));
 #endif
diff --git a/src/shm_msg_queue.h b/src/shm_msg_queue.h
index 56ea076..eead739 100644
--- a/src/shm_msg_queue.h
+++ b/src/shm_msg_queue.h
@@ -47,13 +47,14 @@
 
 	static MQId NewId();
 
-	ShmMsgQueue(const MQId id, ShmType &segment, const int len);
-	ShmMsgQueue(const MQId id, const bool create_or_else_find, ShmType &segment, const int len);
-	ShmMsgQueue(ShmType &segment, const int len);
+	ShmMsgQueue(ShmType &segment, const MQId id, const int len);
+	ShmMsgQueue(ShmType &segment, const bool create_or_else_find, const MQId id, const int len);
+	ShmMsgQueue(const int64_t abs_addr, ShmType &segment, const MQId id);
 	~ShmMsgQueue();
 	static bool Remove(ShmType &shm, const MQId id);
 	MQId Id() const { return id_; }
 	ShmType &shm() const { return queue_.shm(); }
+	int64_t AbsAddr() const { return queue_.offset(); }
 
 	bool Recv(RawData &val, const int timeout_ms)
 	{
@@ -73,11 +74,9 @@
 
 	bool Recv(MsgI &msg, const int timeout_ms) { return Recv(msg.OffsetRef(), timeout_ms); }
 	bool TryRecv(MsgI &msg) { return TryRecv(msg.OffsetRef()); }
-	static Queue *Find(ShmType &shm, const MQId remote_id);
-	static bool TrySend(ShmType &shm, const MQId remote_id, const RawData val);
-	static bool TrySend(ShmType &shm, const MQId remote_id, MsgI msg) { return TrySend(shm, remote_id, msg.Offset()); }
-	bool TrySend(const MQId remote_id, const MsgI &msg) { return TrySend(shm(), remote_id, msg); }
-	bool TrySend(const MQId remote_id, const RawData val) { return TrySend(shm(), remote_id, val); }
+	static Queue *Find(ShmType &shm, const MQId remote);
+	static bool TrySend(ShmType &shm, const MQId remote, const RawData val);
+	bool TrySend(const MQId remote, const RawData val) { return TrySend(shm(), remote, val); }
 
 private:
 #ifndef BH_USE_ATOMIC_Q
diff --git a/src/socket.cpp b/src/socket.cpp
index 0704174..4f09517 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -28,25 +28,13 @@
 using namespace bhome_shm;
 
 ShmSocket::ShmSocket(Shm &shm, const MQId id, const int len) :
-    run_(false), mq_(id, shm, len), alloc_id_(0)
-{
-	Start();
-}
+    run_(false), mq_(shm, id, len), alloc_id_(0) { Start(); }
 ShmSocket::ShmSocket(Shm &shm, const bool create_or_else_find, const MQId id, const int len) :
-    run_(false), mq_(id, create_or_else_find, shm, len), alloc_id_(0)
-{
-	Start();
-}
-ShmSocket::ShmSocket(bhome_shm::SharedMemory &shm, const int len) :
-    run_(false), mq_(shm, len), alloc_id_(0)
-{
-	Start();
-}
+    run_(false), mq_(shm, create_or_else_find, id, len), alloc_id_(0) { Start(); }
+ShmSocket::ShmSocket(int64_t abs_addr, Shm &shm, const MQId id) :
+    run_(false), mq_(abs_addr, shm, id), alloc_id_(0) { Start(); }
 
-ShmSocket::~ShmSocket()
-{
-	Stop();
-}
+ShmSocket::~ShmSocket() { Stop(); }
 
 bool ShmSocket::Start(int nworker, const RecvCB &onData, const RawRecvCB &onRaw, const IdleCB &onIdle)
 {
diff --git a/src/socket.h b/src/socket.h
index d69b8d4..8e9db69 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -49,11 +49,12 @@
 
 	ShmSocket(Shm &shm, const MQId id, const int len);
 	ShmSocket(Shm &shm, const bool create_or_else_find, const MQId id, const int len);
-	ShmSocket(Shm &shm, const int len = 12);
+	ShmSocket(int64_t offset, Shm &shm, const MQId id);
 	~ShmSocket();
 	static bool Remove(SharedMemory &shm, const MQId id) { return Queue::Remove(shm, id); }
 	bool Remove() { return Remove(shm(), id()); }
 	MQId id() const { return mq().Id(); }
+	int64_t AbsAddr() const { return mq().AbsAddr(); }
 	void SetNodeProc(const int proc_index, const int socket_index)
 	{
 		node_proc_index_ = proc_index;
diff --git a/utest/simple_tests.cpp b/utest/simple_tests.cpp
index e14a1cd..e1f1d2f 100644
--- a/utest/simple_tests.cpp
+++ b/utest/simple_tests.cpp
@@ -108,7 +108,7 @@
 {
 	SharedMemory &shm = TestShm();
 	GlobalInit(shm);
-	ShmMsgQueue q(shm, 64);
+	ShmMsgQueue q(shm, ShmMsgQueue::NewId(), 64);
 	for (int i = 0; i < 2; ++i) {
 		int ms = i * 100;
 		printf("Timeout Test %4d: ", ms);
diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp
index 8950bbf..66e5179 100644
--- a/utest/speed_test.cpp
+++ b/utest/speed_test.cpp
@@ -158,8 +158,8 @@
 
 	auto Avail = [&]() { return shm.get_free_memory(); };
 	auto init_avail = Avail();
-	ShmSocket srv(shm, qlen);
-	ShmSocket cli(shm, qlen);
+	ShmSocket srv(shm, ShmMsgQueue::NewId(), qlen);
+	ShmSocket cli(shm, ShmMsgQueue::NewId(), qlen);
 
 	int ncli = 1;
 	uint64_t nmsg = 1000 * 1000 * 1;

--
Gitblit v1.8.0