From 1ff714838c03cba1a18884d5b48a20ee6c4275ac Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 21 五月 2021 15:00:53 +0800
Subject: [PATCH] class MsgI, ShmMsgQueue, no bind to shm.

---
 utest/simple_tests.cpp |    6 
 box/center.cpp         |   13 +-
 src/msg.h              |   37 ++---
 src/shm_msg_queue.h    |    4 
 src/defs.h             |   23 ++-
 src/sendq.cpp          |   28 +---
 box/node_center.cpp    |   17 +-
 src/topic_node.cpp     |   52 ++++----
 src/shm_msg_queue.cpp  |    8 -
 src/defs.cpp           |   52 +++++--
 src/msg.cpp            |    9 -
 utest/speed_test.cpp   |   12 +-
 src/shm_socket.cpp     |   18 +-
 src/shm_socket.h       |   23 ---
 src/topic_node.h       |    4 
 box/node_center.h      |    4 
 src/sendq.h            |   48 ++++++-
 17 files changed, 184 insertions(+), 174 deletions(-)

diff --git a/box/center.cpp b/box/center.cpp
index e77c38f..53c1f42 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -65,7 +65,7 @@
 	return [&](auto &&rep_body) {
 		auto reply_head(InitMsgHead(GetType(rep_body), center->id(), head.ssn_id(), head.msg_id()));
 		MQInfo remote = {head.route(0).mq_id(), head.route(0).abs_addr()};
-		MsgI msg;
+		MsgI msg(socket.shm());
 		if (msg.Make(reply_head, rep_body)) {
 			DEFER1(msg.Release(););
 			center->SendAllocMsg(socket, remote, msg);
@@ -73,7 +73,7 @@
 	};
 }
 
-bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr)
+bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr, SharedMemory &shm)
 {
 	// command
 	auto OnCommand = [center_ptr](ShmSocket &socket, ShmMsgQueue::RawData &cmd) -> bool {
@@ -87,7 +87,7 @@
 		auto onInit = [&](const int64_t request) {
 			return center->OnNodeInit(socket, request);
 		};
-		BHCenterHandleInit(onInit);
+		BHCenterHandleInit(socket.shm(), onInit);
 		center->OnTimer();
 	};
 
@@ -106,7 +106,7 @@
 		default: return false;
 		}
 	};
-	BHCenter::Install("#center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(), 1000);
+	BHCenter::Install("#center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(shm), 1000);
 
 	auto OnBusIdle = [=](ShmSocket &socket) {};
 	auto OnBusCmd = [=](ShmSocket &socket, ShmMsgQueue::RawData &val) { return false; };
@@ -142,7 +142,7 @@
 		}
 	};
 
-	BHCenter::Install("#center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(), 1000);
+	BHCenter::Install("#center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(shm), 1000);
 
 	return true;
 }
@@ -167,7 +167,7 @@
 {
 	auto nsec = NodeTimeoutSec();
 	auto center_ptr = std::make_shared<Synced<NodeCenter>>("#bhome_center", nsec, nsec * 3); // *3 to allow other clients to finish sending msgs.
-	AddCenter(center_ptr);
+	AddCenter(center_ptr, shm);
 
 	for (auto &kv : Centers()) {
 		auto &info = kv.second;
@@ -176,6 +176,7 @@
 
 	topic_node_.reset(new CenterTopicNode(center_ptr, shm));
 }
+
 BHCenter::~BHCenter() { Stop(); }
 
 bool BHCenter::Start()
diff --git a/box/node_center.cpp b/box/node_center.cpp
index 4e228a7..cbaef0e 100644
--- a/box/node_center.cpp
+++ b/box/node_center.cpp
@@ -57,7 +57,7 @@
 {
 	auto pos = msgs_.find(id);
 	if (pos != msgs_.end()) {
-		ShmMsg(pos->second).Free();
+		pos->second.Free();
 		msgs_.erase(pos);
 	} else {
 		LOG_TRACE() << "ignore late free request.";
@@ -101,9 +101,9 @@
 	int i = 0;
 	int total_count = 0;
 	for (auto &kv : msgs_) {
-		MsgI msg(kv.second);
+		auto &msg = kv.second;
 		total_count += msg.Count();
-		LOG_TRACE() << "  " << i++ << ": msg id: " << kv.first << ", offset: " << kv.second << ", count: " << msg.Count() << ", size: " << msg.Size();
+		LOG_TRACE() << "  " << i++ << ": msg id: " << kv.first << ", offset: " << kv.second.Offset() << ", count: " << msg.Count() << ", size: " << msg.Size();
 	}
 	LOG_TRACE() << "total count: " << total_count;
 }
@@ -173,7 +173,7 @@
 
 	auto PrepareProcInit = [&](Node &node) {
 		bool r = false;
-		ShmMsg init_msg;
+		ShmMsg init_msg(shm);
 		DEFER1(init_msg.Release());
 		MsgProcInit body;
 		auto head = InitMsgHead(GetType(body), id(), ssn);
@@ -238,7 +238,7 @@
 	if (!FindMq()) { return; }
 
 	auto size = GetAllocSize((val >> 52) & MaskBits(8));
-	MsgI new_msg;
+	MsgI new_msg(socket.shm());
 	if (new_msg.Make(size)) {
 		// 31bit proc index, 28bit id, ,4bit cmd+flag
 		int64_t reply = (new_msg.Offset() << 32) | (msg_id << 4) | EncodeCmd(eCmdAllocReply0);
@@ -612,18 +612,15 @@
 		pub.set_topic(topic);
 		pub.set_data(content);
 		BHMsgHead head(InitMsgHead(GetType(pub), id(), 0));
-		MsgI msg;
+		MsgI msg(shm);
 		if (msg.Make(head, pub)) {
 			DEFER1(msg.Release());
 			RecordMsg(msg);
 
-			auto &mq = GetCenterInfo(shm)->mq_sender_;
-			ShmSocket sender(mq.offset_, shm, mq.id_);
-
 			for (auto &cli : clients) {
 				auto node = cli.weak_node_.lock();
 				if (node && node->state_.flag_ == kStateNormal) {
-					sender.Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
+					DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
 				}
 			}
 		}
diff --git a/box/node_center.h b/box/node_center.h
index ca16cc5..caaf054 100644
--- a/box/node_center.h
+++ b/box/node_center.h
@@ -51,14 +51,14 @@
 	typedef int64_t Offset;
 
 public:
-	void RecordMsg(const MsgI &msg) { msgs_.emplace(msg.id(), msg.Offset()); }
+	void RecordMsg(const MsgI &msg) { msgs_.emplace(msg.id(), msg); }
 	void FreeMsg(MsgId id);
 	void AutoRemove();
 	size_t size() const { return msgs_.size(); }
 	void DebugPrint() const;
 
 private:
-	std::unordered_map<MsgId, Offset> msgs_;
+	std::unordered_map<MsgId, MsgI> msgs_;
 	int64_t time_to_clean_ = 0;
 };
 
diff --git a/src/defs.cpp b/src/defs.cpp
index 2715911..a2f05cc 100644
--- a/src/defs.cpp
+++ b/src/defs.cpp
@@ -90,7 +90,7 @@
 
 } // namespace
 
-CenterInfo *GetCenterInfo(bhome_shm::SharedMemory &shm)
+CenterInfo *GetCenterInfo(SharedMemory &shm)
 {
 	auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address()));
 	if (pmeta->tag_ == kCenterInfoTag) {
@@ -98,11 +98,33 @@
 	}
 	return nullptr;
 }
+ShmSocket &DefaultSender(SharedMemory &shm)
+{
+	typedef std::pair<void *, std::shared_ptr<ShmSocket>> Pair;
+	static std::vector<Pair> store;
+	static std::mutex s_mtx;
 
+	thread_local Pair local_cache;
+	if (local_cache.first == &shm) {
+		return *local_cache.second;
+	}
+
+	std::lock_guard<std::mutex> lk(s_mtx);
+	for (auto &kv : store) {
+		if (kv.first == &shm) {
+			local_cache = kv;
+			return *local_cache.second;
+		}
+	}
+	auto &mq = GetCenterInfo(shm)->mq_sender_;
+	store.emplace_back(&shm, new ShmSocket(mq.offset_, shm, mq.id_));
+	local_cache = store.back();
+	return *local_cache.second;
+}
 // put center info at fixed memory position.
 // as boost shm find object (find socket/mq by id, etc...) also locks inside,
 // which node might crash inside and cause deadlock.
-bool CenterInit(bhome_shm::SharedMemory &shm)
+bool CenterInit(SharedMemory &shm)
 {
 	Mutex *mutex = shm.FindOrCreate<Mutex>("shm_center_lock");
 	if (!mutex || !mutex->try_lock()) {
@@ -140,16 +162,15 @@
 	return false;
 }
 
-const MQInfo &BHGlobalSenderAddress() { return GetCenterInfo(BHomeShm())->mq_sender_; }
-const MQInfo &BHTopicCenterAddress() { return GetCenterInfo(BHomeShm())->mq_center_; }
-const MQInfo &BHTopicBusAddress() { return GetCenterInfo(BHomeShm())->mq_bus_; }
-bool BHNodeInit(const int64_t request, int64_t &reply)
+const MQInfo &BHTopicCenterAddress(SharedMemory &shm) { return GetCenterInfo(shm)->mq_center_; }
+const MQInfo &BHTopicBusAddress(SharedMemory &shm) { return GetCenterInfo(shm)->mq_bus_; }
+bool BHNodeInit(SharedMemory &shm, const int64_t request, int64_t &reply)
 {
-	return GetCenterInfo(BHomeShm())->init_rr_.ClientRequest(request, reply);
+	return GetCenterInfo(shm)->init_rr_.ClientRequest(request, reply);
 }
-void BHCenterHandleInit(std::function<int64_t(const int64_t)> const &onReq)
+void BHCenterHandleInit(SharedMemory &shm, std::function<int64_t(const int64_t)> const &onReq)
 {
-	GetCenterInfo(BHomeShm())->init_rr_.ServerProcess(onReq);
+	GetCenterInfo(shm)->init_rr_.ServerProcess(onReq);
 }
 
 int64_t CalcAllocIndex(int64_t size)
@@ -164,18 +185,15 @@
 {
 	return "bhome_default_shm_v0";
 }
-bhome_shm::SharedMemory &BHomeShm()
+SharedMemory &BHomeShm()
 {
-	static bhome_shm::SharedMemory shm(BHomeShmName(), 1024 * 1024 * 512);
+	static SharedMemory shm(BHomeShmName(), 1024 * 1024 * 512);
 	return shm;
 }
 
-bool GlobalInit(bhome_shm::SharedMemory &shm)
-{
-	MsgI::BindShm(shm);
-	CenterInfo *pinfo = GetCenterInfo(shm);
-	return pinfo && ShmMsgQueue::SetData(pinfo->mqid_);
-}
+bool GlobalInit(SharedMemory &shm) { return GetCenterInfo(shm); }
+
+MQId NewSession() { return 10 * (++GetCenterInfo(BHomeShm())->mqid_); }
 
 void SetLastError(const int ec, const std::string &msg)
 {
diff --git a/src/defs.h b/src/defs.h
index 51040e6..b117579 100644
--- a/src/defs.h
+++ b/src/defs.h
@@ -23,6 +23,7 @@
 #include <atomic>
 #include <string>
 
+class ShmSocket;
 typedef uint64_t MQId;
 
 int64_t CalcAllocIndex(int64_t size);
@@ -50,21 +51,25 @@
 class SharedMemory;
 } // namespace bhome_shm
 
+using bhome_shm::SharedMemory;
+
 std::string BHomeShmName();
-bhome_shm::SharedMemory &BHomeShm();
-CenterInfo *GetCenterInfo(bhome_shm::SharedMemory &shm);
-bool CenterInit(bhome_shm::SharedMemory &shm);
-bool GlobalInit(bhome_shm::SharedMemory &shm);
+SharedMemory &BHomeShm();
+CenterInfo *GetCenterInfo(SharedMemory &shm);
+ShmSocket &DefaultSender(SharedMemory &shm);
+
+MQId NewSession();
+bool CenterInit(SharedMemory &shm);
+bool GlobalInit(SharedMemory &shm);
 typedef std::string Topic;
 void SetLastError(const int ec, const std::string &msg);
 void GetLastError(int &ec, std::string &msg);
 //TODO center can check shm for previous crash.
 
-const MQInfo &BHGlobalSenderAddress();
-const MQInfo &BHTopicCenterAddress();
-const MQInfo &BHTopicBusAddress();
-bool BHNodeInit(const int64_t request, int64_t &reply);
-void BHCenterHandleInit(std::function<int64_t(const int64_t)> const &onReq);
+const MQInfo &BHTopicCenterAddress(SharedMemory &shm);
+const MQInfo &BHTopicBusAddress(SharedMemory &shm);
+bool BHNodeInit(SharedMemory &shm, const int64_t request, int64_t &reply);
+void BHCenterHandleInit(SharedMemory &shm, std::function<int64_t(const int64_t)> const &onReq);
 
 // node mq is avail with in timeout; after that may get killed.
 int NodeTimeoutSec();
diff --git a/src/msg.cpp b/src/msg.cpp
index dca2044..40a7b0d 100644
--- a/src/msg.cpp
+++ b/src/msg.cpp
@@ -23,13 +23,6 @@
 namespace bhome_msg
 {
 
-ShmSocket &ShmMsg::Sender()
-{
-	static auto &mq = GetCenterInfo(shm())->mq_sender_;
-	static ShmSocket sender(mq.offset_, shm(), mq.id_);
-	return sender;
-}
-
 int ShmMsg::Release()
 {
 	if (!valid()) {
@@ -39,7 +32,7 @@
 	if (n == 0) {
 		if (meta()->managed_) {
 			int64_t free_cmd = (id() << 4) | EncodeCmd(eCmdFree);
-			Sender().Send(BHTopicCenterAddress(), free_cmd);
+			DefaultSender(shm()).Send(BHTopicCenterAddress(shm()), free_cmd);
 		} else {
 			Free();
 		}
diff --git a/src/msg.h b/src/msg.h
index 1ac153a..12922b5 100644
--- a/src/msg.h
+++ b/src/msg.h
@@ -34,14 +34,9 @@
 // ShmMsg is safe to be stored in shared memory, so POD data or offset_ptr is required.
 // message content layout: (meta) / header_size + header + data_size + data
 
-class ShmMsg : private StaticDataRef<SharedMemory, ShmMsg>
+class ShmMsg
 {
-public:
-	static inline SharedMemory &shm() { return GetData(); }
-
 private:
-	static ShmSocket &Sender();
-
 	// store ref count, msgs shareing the same data should also hold a pointer of the same RefCount object.
 	class RefCount : private boost::noncopyable
 	{
@@ -58,11 +53,7 @@
 	typedef int64_t OffsetType;
 	static OffsetType Addr(void *ptr) { return reinterpret_cast<OffsetType>(ptr); }
 	static void *Ptr(const OffsetType offset) { return reinterpret_cast<void *>(offset); }
-	static inline OffsetType BaseAddr()
-	{
-		static const OffsetType base = Addr(shm().get_address()); // cache value.
-		return base;
-	}
+	OffsetType BaseAddr() const { return Addr(shm().get_address()); }
 
 	static const uint32_t kMsgTag = 0xf1e2d3c4;
 	struct Meta {
@@ -83,7 +74,8 @@
 		    capacity_(size), id_(NewId()), timestamp_(NowSec()) {}
 	};
 	OffsetType offset_;
-	static void *Alloc(const size_t size)
+	SharedMemory *pshm_;
+	void *Alloc(const size_t size)
 	{
 		void *p = shm().Alloc(sizeof(Meta) + size);
 		if (p) {
@@ -132,24 +124,27 @@
 		if (!addr) {
 			return false;
 		}
-		ShmMsg(addr).swap(*this);
+		offset_ = Addr(addr) - BaseAddr();
 		return true;
 	}
-	ShmMsg(void *p) :
-	    offset_(p ? (Addr(p) - BaseAddr()) : 0) {}
 
 	template <class T = void>
 	T *get() const { return offset_ != 0 ? static_cast<T *>(Ptr(offset_ + BaseAddr())) : nullptr; }
 
 public:
-	static bool BindShm(SharedMemory &shm) { return SetData(shm); }
-	ShmMsg() :
-	    offset_(0) {}
-	explicit ShmMsg(const OffsetType offset) :
-	    offset_(offset) {}
+	explicit ShmMsg(SharedMemory &shm) :
+	    offset_(0), pshm_(&shm) {}
+	ShmMsg(const OffsetType offset, SharedMemory &shm) :
+	    offset_(offset), pshm_(&shm) {}
 	OffsetType Offset() const { return offset_; }
 	OffsetType &OffsetRef() { return offset_; }
-	void swap(ShmMsg &a) { std::swap(offset_, a.offset_); }
+	SharedMemory &shm() const { return *pshm_; }
+
+	void swap(ShmMsg &a)
+	{
+		std::swap(offset_, a.offset_);
+		std::swap(pshm_, a.pshm_);
+	}
 	bool valid() const { return offset_ != 0 && meta()->tag_ == kMsgTag; }
 	int64_t id() const { return valid() ? meta()->id_ : 0; }
 	int64_t timestamp() const { return valid() ? meta()->timestamp_.load() : 0; }
diff --git a/src/sendq.cpp b/src/sendq.cpp
index f1e5918..2a772b0 100644
--- a/src/sendq.cpp
+++ b/src/sendq.cpp
@@ -33,13 +33,14 @@
 		} else {
 			al.insert(al.begin(), Array())->emplace_back(std::move(tmp));
 		}
+		count_in_.Count1();
 	} catch (std::exception &e) {
 		LOG_ERROR() << "sendq error: " << e.what();
 		throw e;
 	}
 }
 
-int SendQ::DoSend1Remote(ShmMsgQueue &mq, const Remote remote, Array &arr)
+int SendQ::DoSend1Remote(const Remote remote, Array &arr)
 {
 	auto FirstNotExpired = [](Array &l) {
 		auto Less = [](const TimedMsg &msg, const TimePoint &tp) { return msg.expire() < tp; };
@@ -53,8 +54,8 @@
 			info.on_expire_(info.data_);
 		}
 	}
-
-	while (pos != arr.end() && mq.TrySend(pos->data().mq_, pos->data().data_)) {
+	auto TrySend1 = [this](MsgInfo const &info) { return ShmMsgQueue::TrySend(shm_, info.mq_, info.data_); };
+	while (pos != arr.end() && TrySend1(pos->data())) {
 		++pos;
 	}
 
@@ -63,27 +64,26 @@
 	return nprocessed;
 }
 
-int SendQ::DoSend1Remote(ShmMsgQueue &mq, const Remote remote, ArrayList &al)
+int SendQ::DoSend1Remote(const Remote remote, ArrayList &al)
 {
 	int nsend = 0;
 	auto AllSent = [&](Array &arr) {
-		nsend += DoSend1Remote(mq, remote, arr);
+		nsend += DoSend1Remote(remote, arr);
 		return arr.empty();
 	};
 	for (auto it = al.begin(); it != al.end() && AllSent(*it); it = al.erase(it)) {}
 	return nsend;
 }
 
-bool SendQ::TrySend(ShmMsgQueue &mq)
+bool SendQ::TrySend()
 {
 	std::unique_lock<std::mutex> lock(mutex_out_);
-	// if (TooFast()) { return false; }
 
 	size_t nsend = 0;
 	if (!out_.empty()) {
 		auto rec = out_.begin();
 		do {
-			nsend += DoSend1Remote(mq, rec->first, rec->second);
+			nsend += DoSend1Remote(rec->first, rec->second);
 			if (rec->second.empty()) {
 				rec = out_.erase(rec);
 			} else {
@@ -91,6 +91,7 @@
 			}
 		} while (rec != out_.end());
 	}
+	count_out_.Count(nsend);
 
 	auto Collect = [&]() {
 		std::unique_lock<std::mutex> lock(mutex_in_);
@@ -109,14 +110,3 @@
 
 	return !out_.empty();
 }
-
-bool SendQ::TooFast()
-{
-	auto cur = NowSec();
-	if (cur > last_time_) {
-		last_time_ = cur;
-		count_ = 0;
-	}
-
-	return ++count_ > 1000 * 100;
-} // not accurate in multi-thread.
\ No newline at end of file
diff --git a/src/sendq.h b/src/sendq.h
index 759e12a..ec63b05 100644
--- a/src/sendq.h
+++ b/src/sendq.h
@@ -46,11 +46,13 @@
 	typedef TimedData<MsgInfo> TimedMsg;
 	typedef TimedMsg::TimePoint TimePoint;
 	typedef TimedMsg::Duration Duration;
+	SendQ(SharedMemory &shm) :
+	    shm_(shm) {}
 
 	bool Append(const MQInfo &mq, MsgI msg)
 	{
 		msg.AddRef();
-		auto onMsgExpire = [](const Data &d) { MsgI(d).Release(); };
+		auto onMsgExpire = [msg](const Data &d) mutable { msg.Release(); };
 		try {
 			AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire);
 			return true;
@@ -63,9 +65,9 @@
 	bool Append(const MQInfo &mq, MsgI msg, OnMsgEvent onExpire)
 	{
 		msg.AddRef();
-		auto onMsgExpire = [onExpire](const Data &d) {
+		auto onMsgExpire = [onExpire, msg](const Data &d) mutable {
 			onExpire(d);
-			MsgI(d).Release();
+			msg.Release();
 		};
 		try {
 			AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire);
@@ -85,7 +87,7 @@
 			return false;
 		}
 	}
-	bool TrySend(ShmMsgQueue &mq);
+	bool TrySend();
 
 private:
 	static TimePoint Now() { return TimedMsg::Clock::now(); }
@@ -96,18 +98,48 @@
 	typedef std::list<Array> ArrayList;
 	typedef std::unordered_map<Remote, ArrayList> Store;
 
-	int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, Array &arr);
-	int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, ArrayList &arr);
+	int DoSend1Remote(const Remote remote, Array &arr);
+	int DoSend1Remote(const Remote remote, ArrayList &arr);
 
 	bool TooFast();
 
+	SharedMemory &shm_;
 	std::mutex mutex_in_;
 	std::mutex mutex_out_;
 	Store in_;
 	Store out_;
 
-	int64_t count_ = 0;
-	int64_t last_time_ = 0;
+	struct Counter {
+		std::atomic<int64_t> count_;
+		std::atomic<int64_t> count_1sec_;
+		std::atomic<int64_t> last_time_;
+		Counter() :
+		    count_(0), count_1sec_(0), last_time_(0) {}
+		void Count1()
+		{
+			CheckTime();
+			++count_1sec_;
+			++count_;
+		}
+		void Count(int n)
+		{
+			CheckTime();
+			count_1sec_ += n;
+			count_ += n;
+		}
+		void CheckTime()
+		{
+			auto cur = NowSec();
+			if (cur > last_time_) {
+				count_1sec_ = 0;
+				last_time_ = cur;
+			}
+		}
+		int64_t GetCount() const { return count_.load(); }
+		int64_t LastSec() const { return count_1sec_.load(); }
+	};
+	Counter count_in_;
+	Counter count_out_;
 };
 
 #endif // end of include guard: SENDQ_IWKMSK7M
diff --git a/src/shm_msg_queue.cpp b/src/shm_msg_queue.cpp
index 1d78e8c..be2d2a2 100644
--- a/src/shm_msg_queue.cpp
+++ b/src/shm_msg_queue.cpp
@@ -31,12 +31,6 @@
 
 } // namespace
 
-ShmMsgQueue::MQId ShmMsgQueue::NewId()
-{
-	static auto &id = GetData("Must init shared memory before use! Please make sure center is running.");
-	return (++id) * 10;
-}
-
 ShmMsgQueue::ShmMsgQueue(ShmType &segment, const MQId id, const int len) :
     id_(id),
     queue_(segment, MsgQIdToName(id_), len, segment.get_segment_manager())
@@ -84,7 +78,7 @@
 			if (IsCmd(val)) {
 				LOG_DEBUG() << "clsing queue " << id << ", has a cmd" << DecodeCmd(val);
 			} else {
-				MsgI(val).Release();
+				MsgI(val, shm).Release();
 			}
 		}
 	}
diff --git a/src/shm_msg_queue.h b/src/shm_msg_queue.h
index de60fde..6d922aa 100644
--- a/src/shm_msg_queue.h
+++ b/src/shm_msg_queue.h
@@ -27,7 +27,7 @@
 
 #define BH_USE_ATOMIC_Q
 
-class ShmMsgQueue : public StaticDataRef<std::atomic<uint64_t>, ShmMsgQueue>
+class ShmMsgQueue
 {
 public:
 	typedef int64_t RawData;
@@ -45,8 +45,6 @@
 	typedef Shmq::Data Queue;
 	typedef Shmq::ShmType ShmType;
 	typedef uint64_t MQId;
-
-	static MQId NewId();
 
 	ShmMsgQueue(ShmType &segment, const MQId id, const int len);
 	ShmMsgQueue(ShmType &segment, const bool create_or_else_find, const MQId id, const int len);
diff --git a/src/shm_socket.cpp b/src/shm_socket.cpp
index f177b87..11824d7 100644
--- a/src/shm_socket.cpp
+++ b/src/shm_socket.cpp
@@ -30,18 +30,18 @@
 using namespace bhome_shm;
 
 ShmSocket::ShmSocket(Shm &shm, const MQId id, const int len) :
-    run_(false), mq_(shm, id, len), alloc_id_(0) { Start(); }
+    run_(false), mq_(shm, id, len), alloc_id_(0), send_buffer_(shm) { Start(); }
 ShmSocket::ShmSocket(Shm &shm, const bool create_or_else_find, const MQId id, const int len) :
-    run_(false), mq_(shm, create_or_else_find, id, len), alloc_id_(0) { Start(); }
+    run_(false), mq_(shm, create_or_else_find, id, len), alloc_id_(0), send_buffer_(shm) { Start(); }
 ShmSocket::ShmSocket(int64_t abs_addr, Shm &shm, const MQId id) :
-    run_(false), mq_(abs_addr, shm, id), alloc_id_(0) { Start(); }
+    run_(false), mq_(abs_addr, shm, id), alloc_id_(0), send_buffer_(shm) { Start(); }
 
 ShmSocket::~ShmSocket() { Stop(); }
 
 bool ShmSocket::Start(int nworker, const RecvCB &onData, const RawRecvCB &onRaw, const IdleCB &onIdle)
 {
 	auto ioProc = [this, onData, onRaw, onIdle]() {
-		auto DoSend = [this]() { return send_buffer_.TrySend(mq()); };
+		auto DoSend = [this]() { return send_buffer_.TrySend(); };
 		auto DoRecv = [=] {
 			// do not recv if no cb is set.
 			if (!onData && per_msg_cbs_->empty() && !onRaw && alloc_cbs_->empty()) { return false; }
@@ -73,7 +73,7 @@
 				if (IsCmd(val)) {
 					onCmdCB(*this, val);
 				} else {
-					MsgI imsg(val);
+					MsgI imsg(val, shm());
 					DEFER1(imsg.Release());
 					BHMsgHead head;
 					if (imsg.ParseHead(head)) {
@@ -113,7 +113,7 @@
 		while (run_) { ioProc(); }
 		// try send pending msgs.
 		auto end_time = steady_clock::now() + 3s;
-		while (send_buffer_.TrySend(mq()) && steady_clock::now() < end_time) {
+		while (send_buffer_.TrySend() && steady_clock::now() < end_time) {
 			// LOG_DEBUG() << "try send pending msgs.";
 		}
 	};
@@ -170,7 +170,7 @@
 	};
 #if 0
 	// self alloc
-	MsgI msg;
+	MsgI msg(shm());
 	if (msg.Make(size)) {
 		DEFER1(msg.Release());
 		return OnResult(msg);
@@ -194,7 +194,7 @@
 	              (id << 4) |
 	              EncodeCmd(eCmdAllocRequest0);
 	auto rawCB = [onResult](ShmSocket &sock, int64_t &val) {
-		MsgI msg((val >> 32) & MaskBits(31));
+		MsgI msg(((val >> 32) & MaskBits(31)), sock.shm());
 		DEFER1(msg.Release());
 		onResult(msg);
 		return true;
@@ -206,5 +206,5 @@
 		alloc_cbs_->Pick(id, cb_no_use);
 	};
 
-	return Send(BHTopicCenterAddress(), cmd, onExpireRemoveCB);
+	return Send(BHTopicCenterAddress(shm()), cmd, onExpireRemoveCB);
 }
\ No newline at end of file
diff --git a/src/shm_socket.h b/src/shm_socket.h
index 02500b2..bf78e89 100644
--- a/src/shm_socket.h
+++ b/src/shm_socket.h
@@ -66,22 +66,6 @@
 	bool Start(const RecvCB &onData, int nworker = 1) { return Start(nworker, onData); }
 	bool Stop();
 
-	template <class Body>
-	bool CenterSend(const MQInfo &remote, BHMsgHead &head, Body &body)
-	{
-		try {
-			//TODO alloc outsiez and use send.
-			MsgI msg;
-			if (!msg.Make(head, body)) { return false; }
-			DEFER1(msg.Release());
-
-			return Send(remote, msg);
-		} catch (...) {
-			SetLastError(eError, "Send internal error.");
-			return false;
-		}
-	}
-
 	bool RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult);
 
 	template <class Body>
@@ -155,9 +139,9 @@
 	bool Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb = RecvCB());
 
 	template <class... Rest>
-	bool SendImpl(const MQInfo &remote, Rest &&...rest)
+	bool SendImpl(Rest &&...rest)
 	{
-		return send_buffer_.Append(remote, std::forward<decltype(rest)>(rest)...);
+		return send_buffer_.Append(std::forward<decltype(rest)>(rest)...);
 	}
 
 	std::vector<std::thread> workers_;
@@ -188,12 +172,13 @@
 
 	Synced<CallbackRecords<std::string, RecvCB>> per_msg_cbs_;
 	Synced<CallbackRecords<int, RawRecvCB>> alloc_cbs_;
-	SendQ send_buffer_;
 
 	// node request center alloc memory.
 	int node_proc_index_ = -1;
 	int socket_index_ = -1;
 	std::atomic<int> alloc_id_;
+
+	SendQ send_buffer_;
 };
 
 #endif // end of include guard: SHM_SOCKET_GWTJHBPO
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index 6ed7713..fce7ed6 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -66,13 +66,13 @@
 	}
 
 	if (ssn_id_ == 0) {
-		ssn_id_ = ShmMsgQueue::NewId();
+		ssn_id_ = NewSession();
 	}
 	LOG_DEBUG() << "Node Init, id " << ssn_id_;
 	auto NodeInit = [&]() {
 		int64_t init_request = ssn_id_ << 4 | EncodeCmd(eCmdNodeInit);
 		int64_t reply = 0;
-		if (BHNodeInit(init_request, reply) && DecodeCmd(reply) == eCmdNodeInitReply) {
+		if (BHNodeInit(shm(), init_request, reply) && DecodeCmd(reply) == eCmdNodeInitReply) {
 			int64_t abs_addr = reply >> 4;
 			sockets_.emplace_back(new ShmSocket(abs_addr, shm_, ssn_id_));
 			LOG_DEBUG() << "node init ok";
@@ -94,7 +94,7 @@
 				auto head = InitMsgHead(GetType(body), info_.proc_id(), ssn_id_);
 				AddRoute(head, socket);
 				if (imsg.Fill(head, body)) {
-					socket.Send(BHTopicCenterAddress(), imsg);
+					socket.Send(CenterAddr(), imsg);
 				}
 			} break;
 			case kMsgTypeProcInitReply: {
@@ -187,12 +187,12 @@
 			MsgCommonReply body;
 			CheckResult(imsg, head, body);
 		};
-		return sock.Send(BHTopicCenterAddress(), head, body, onResult);
+		return sock.Send(CenterAddr(), head, body, onResult);
 	} else {
-		MsgI reply;
+		MsgI reply(shm());
 		DEFER1(reply.Release(););
 		BHMsgHead reply_head;
-		bool r = sock.SendAndRecv(BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
+		bool r = sock.SendAndRecv(CenterAddr(), head, body, reply, reply_head, timeout_ms);
 		if (r) {
 			CheckResult(reply, reply_head, reply_body);
 		}
@@ -228,12 +228,12 @@
 			MsgCommonReply body;
 			CheckResult(imsg, head, body);
 		};
-		return sock.Send(BHTopicCenterAddress(), head, body, onResult);
+		return sock.Send(CenterAddr(), head, body, onResult);
 	} else {
-		MsgI reply;
+		MsgI reply(shm());
 		DEFER1(reply.Release(););
 		BHMsgHead reply_head;
-		bool r = sock.SendAndRecv(BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
+		bool r = sock.SendAndRecv(CenterAddr(), head, body, reply, reply_head, timeout_ms);
 		return r && CheckResult(reply, reply_head, reply_body);
 	}
 }
@@ -253,12 +253,12 @@
 	AddRoute(head, sock);
 
 	if (timeout_ms == 0) {
-		return sock.Send(BHTopicCenterAddress(), head, body);
+		return sock.Send(CenterAddr(), head, body);
 	} else {
-		MsgI reply;
+		MsgI reply(shm());
 		DEFER1(reply.Release(););
 		BHMsgHead reply_head;
-		bool r = sock.SendAndRecv(BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
+		bool r = sock.SendAndRecv(CenterAddr(), head, body, reply, reply_head, timeout_ms);
 		r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body);
 		return (r && IsSuccess(reply_body.errmsg().errcode()));
 	}
@@ -282,10 +282,10 @@
 	BHMsgHead head(InitMsgHead(GetType(query), proc_id(), ssn()));
 	AddRoute(head, sock);
 
-	MsgI reply;
+	MsgI reply(shm());
 	DEFER1(reply.Release());
 	BHMsgHead reply_head;
-	return (sock.SendAndRecv(BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms) &&
+	return (sock.SendAndRecv(CenterAddr(), head, query, reply, reply_head, timeout_ms) &&
 	        reply_head.type() == kMsgTypeQueryTopicReply &&
 	        reply.ParseBody(reply_body));
 }
@@ -301,10 +301,10 @@
 	BHMsgHead head(InitMsgHead(GetType(query), proc_id(), ssn()));
 	AddRoute(head, sock);
 
-	MsgI reply;
+	MsgI reply(shm());
 	DEFER1(reply.Release());
 	BHMsgHead reply_head;
-	return (sock.SendAndRecv(BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms) &&
+	return (sock.SendAndRecv(CenterAddr(), head, query, reply, reply_head, timeout_ms) &&
 	        reply_head.type() == kMsgTypeQueryProcReply &&
 	        reply.ParseBody(reply_body));
 }
@@ -324,12 +324,12 @@
 	AddRoute(head, sock);
 
 	if (timeout_ms == 0) {
-		return sock.Send(BHTopicCenterAddress(), head, body);
+		return sock.Send(CenterAddr(), head, body);
 	} else {
-		MsgI reply;
+		MsgI reply(shm());
 		DEFER1(reply.Release(););
 		BHMsgHead reply_head;
-		bool r = sock.SendAndRecv(BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
+		bool r = sock.SendAndRecv(CenterAddr(), head, body, reply, reply_head, timeout_ms);
 		r = r && reply_head.type() == kMsgTypeCommonReply;
 		r = r && reply.ParseBody(reply_body);
 		return r;
@@ -525,7 +525,7 @@
 			AddRoute(head, sock);
 			head.set_topic(request.topic());
 
-			MsgI reply_msg;
+			MsgI reply_msg(shm());
 			DEFER1(reply_msg.Release(););
 			BHMsgHead reply_head;
 
@@ -596,13 +596,13 @@
 		AddRoute(head, sock);
 
 		if (timeout_ms == 0) {
-			return sock.Send(BHTopicBusAddress(), head, pub);
+			return sock.Send(BusAddr(), head, pub);
 		} else {
-			MsgI reply;
+			MsgI reply(shm());
 			DEFER1(reply.Release(););
 			BHMsgHead reply_head;
 			MsgCommonReply reply_body;
-			return sock.SendAndRecv(BHTopicBusAddress(), head, pub, reply, reply_head, timeout_ms) &&
+			return sock.SendAndRecv(BusAddr(), head, pub, reply, reply_head, timeout_ms) &&
 			       reply_head.type() == kMsgTypeCommonReply &&
 			       reply.ParseBody(reply_body) &&
 			       IsSuccess(reply_body.errmsg().errcode());
@@ -629,12 +629,12 @@
 		BHMsgHead head(InitMsgHead(GetType(sub), proc_id(), ssn()));
 		AddRoute(head, sock);
 		if (timeout_ms == 0) {
-			return sock.Send(BHTopicBusAddress(), head, sub);
+			return sock.Send(BusAddr(), head, sub);
 		} else {
-			MsgI reply;
+			MsgI reply(shm());
 			DEFER1(reply.Release(););
 			BHMsgHead reply_head;
-			return sock.SendAndRecv(BHTopicBusAddress(), head, sub, reply, reply_head, timeout_ms) &&
+			return sock.SendAndRecv(BusAddr(), head, sub, reply, reply_head, timeout_ms) &&
 			       reply_head.type() == kMsgTypeCommonReply &&
 			       reply.ParseBody(reply_body) &&
 			       IsSuccess(reply_body.errmsg().errcode());
diff --git a/src/topic_node.h b/src/topic_node.h
index dcc9518..c115010 100644
--- a/src/topic_node.h
+++ b/src/topic_node.h
@@ -34,7 +34,9 @@
 	SharedMemory &shm_;
 	ProcInfo info_;
 
-	SharedMemory &shm() { return shm_; }
+	SharedMemory &shm() const { return shm_; }
+	const MQInfo &CenterAddr() const { return BHTopicCenterAddress(shm()); }
+	const MQInfo &BusAddr() const { return BHTopicBusAddress(shm()); }
 
 public:
 	TopicNode(SharedMemory &shm);
diff --git a/utest/simple_tests.cpp b/utest/simple_tests.cpp
index e1f1d2f..e9131b0 100644
--- a/utest/simple_tests.cpp
+++ b/utest/simple_tests.cpp
@@ -108,12 +108,12 @@
 {
 	SharedMemory &shm = TestShm();
 	GlobalInit(shm);
-	ShmMsgQueue q(shm, ShmMsgQueue::NewId(), 64);
+	ShmMsgQueue q(shm, NewSession(), 64);
 	for (int i = 0; i < 2; ++i) {
 		int ms = i * 100;
 		printf("Timeout Test %4d: ", ms);
 		boost::timer::auto_cpu_timer timer;
-		MsgI msg;
+		MsgI msg(shm);
 		bool r = q.Recv(msg, ms);
 		BOOST_CHECK(!r);
 	}
@@ -125,7 +125,7 @@
 	typedef MsgI Msg;
 	GlobalInit(shm);
 
-	Msg m0(1000);
+	Msg m0(1000, shm);
 	BOOST_CHECK(m0.valid());
 	BOOST_CHECK_EQUAL(m0.Count(), 1);
 	Msg m1 = m0;
diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp
index ef56678..f33f0db 100644
--- a/utest/speed_test.cpp
+++ b/utest/speed_test.cpp
@@ -24,7 +24,7 @@
 {
 	SharedMemory &shm = TestShm();
 	GlobalInit(shm);
-	MQId server_id = ShmMsgQueue::NewId();
+	MQId server_id = NewSession();
 	ShmMsgQueue server(server_id, shm, 1000);
 
 	const int timeout = 1000;
@@ -35,10 +35,10 @@
 
 	std::string str(data_size, 'a');
 	auto Writer = [&](int writer_id, uint64_t n) {
-		MQId cli_id = ShmMsgQueue::NewId();
+		MQId cli_id = NewSession();
 
 		ShmMsgQueue mq(cli_id, shm, 64);
-		MsgI msg;
+		MsgI msg(shm);
 		MsgRequestTopic body;
 		body.set_topic("topic");
 		body.set_data(str);
@@ -58,7 +58,7 @@
 		auto now = []() { return steady_clock::now(); };
 		auto tm = now();
 		while (*run) {
-			MsgI msg;
+			MsgI msg(shm);
 			BHMsgHead head;
 			if (mq.TryRecv(msg)) {
 				DEFER1(msg.Release());
@@ -149,8 +149,8 @@
 
 	auto Avail = [&]() { return shm.get_free_memory(); };
 	auto init_avail = Avail();
-	ShmSocket srv(shm, ShmMsgQueue::NewId(), qlen);
-	ShmSocket cli(shm, ShmMsgQueue::NewId(), qlen);
+	ShmSocket srv(shm, NewSession(), qlen);
+	ShmSocket cli(shm, NewSession(), qlen);
 
 	int ncli = 1;
 	uint64_t nmsg = 1000 * 1000 * 1;

--
Gitblit v1.8.0