From cab831748a2a9cc18b7f18f3b5e14a4374b7ab68 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期一, 17 五月 2021 18:34:26 +0800
Subject: [PATCH] socket send using abs addr, avoid shm find by id.

---
 src/socket.h                     |   44 +---
 box/center.cpp                   |  103 ++++++-----
 .vscode/settings.json            |   11 +
 src/shm_msg_queue.h              |    5 
 src/socket.cpp                   |   40 +++
 src/defs.h                       |   18 +-
 box/center.h                     |    4 
 src/sendq.cpp                    |   33 +++
 src/topic_node.cpp               |  100 ++++++----
 src/shm_msg_queue.cpp            |    6 
 src/defs.cpp                     |    8 
 utest/speed_test.cpp             |   27 +-
 proto/source/bhome_msg_api.proto |    6 
 utest/api_test.cpp               |   14 +
 src/bh_util.h                    |    4 
 src/topic_node.h                 |    5 
 src/sendq.h                      |   48 +++--
 src/bh_api.cpp                   |   31 ++-
 18 files changed, 307 insertions(+), 200 deletions(-)

diff --git a/.vscode/settings.json b/.vscode/settings.json
index c0b9587..df60df7 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -61,7 +61,16 @@
         "strstream": "cpp",
         "unordered_set": "cpp",
         "cfenv": "cpp",
-        "*.ipp": "cpp"
+        "*.ipp": "cpp",
+        "cassert": "cpp",
+        "cerrno": "cpp",
+        "cfloat": "cpp",
+        "ciso646": "cpp",
+        "climits": "cpp",
+        "ios": "cpp",
+        "locale": "cpp",
+        "queue": "cpp",
+        "random": "cpp"
     },
     "files.exclude": {
         "**/*.un~": true,
diff --git a/box/center.cpp b/box/center.cpp
index 7d51f2f..2f244b4 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -105,17 +105,18 @@
 		auto it = msgs_.begin();
 		while (it != msgs_.end() && --limit > 0) {
 			ShmMsg msg(it->second);
-			if (msg.Count() == 0) {
+			auto Free = [&]() {
 				msg.Free();
 				it = msgs_.erase(it);
 				++n;
-			} else if (msg.timestamp() + 60 < NowSec()) {
-				msg.Free();
-				it = msgs_.erase(it);
-				++n;
-				// LOG_DEBUG() << "release timeout msg, someone crashed.";
-			} else {
+			};
+			int n = now - msg.timestamp();
+			if (n < 10) {
 				++it;
+			} else if (msg.Count() == 0) {
+				Free();
+			} else if (n > 60) {
+				Free();
 			}
 		}
 		if (n > 0) {
@@ -181,22 +182,24 @@
 	typedef std::unordered_map<Address, std::set<Topic>> AddressTopics;
 
 	struct NodeInfo {
-		ProcState state_;             // state
-		std::set<Address> addrs_;     // registered mqs
-		ProcInfo proc_;               //
-		AddressTopics services_;      // address: topics
-		AddressTopics subscriptions_; // address: topics
+		ProcState state_;               // state
+		std::map<MQId, int64_t> addrs_; // registered mqs
+		ProcInfo proc_;                 //
+		AddressTopics services_;        // address: topics
+		AddressTopics subscriptions_;   // address: topics
 	};
 	typedef std::shared_ptr<NodeInfo> Node;
 	typedef std::weak_ptr<NodeInfo> WeakNode;
 
 	struct TopicDest {
-		Address mq_;
+		MQId mq_id_;
+		int64_t mq_abs_addr_;
 		WeakNode weak_node_;
-		bool operator<(const TopicDest &a) const { return mq_ < a.mq_; }
+		bool operator<(const TopicDest &a) const { return mq_id_ < a.mq_id_; }
 	};
 	inline MQId SrcAddr(const BHMsgHead &head) { return head.route(0).mq_id(); }
-	inline bool MatchAddr(std::set<Address> const &addrs, const Address &addr) { return addrs.find(addr) != addrs.end(); }
+	inline int64_t SrcAbsAddr(const BHMsgHead &head) { return head.route(0).abs_addr(); }
+	inline bool MatchAddr(std::map<Address, int64_t> const &addrs, const Address &addr) { return addrs.find(addr) != addrs.end(); }
 
 	NodeCenter(const std::string &id, const Cleaner &cleaner, const int64_t offline_time, const int64_t kill_time) :
 	    id_(id), cleaner_(cleaner), offline_time_(offline_time), kill_time_(kill_time), last_check_time_(0) {}
@@ -218,39 +221,38 @@
 			return; // ignore in exists.
 		}
 		auto UpdateRegInfo = [&](Node &node) {
-			for (int i = 0; i < 10; ++i) {
-				node->addrs_.insert(ssn + i);
-			}
 			node->state_.timestamp_ = NowSec() - offline_time_;
 			node->state_.UpdateState(NowSec(), offline_time_, kill_time_);
 
 			// create sockets.
+			const int nsocks = 4;
 			try {
-				auto CreateSocket = [&](const MQId id) { ShmSocket tmp(shm, true, id, 16); };
-				// alloc(-1), node, server, sub, request,
-				for (int i = 0; i < 4; ++i) {
-					CreateSocket(ssn + i);
-					node->addrs_.insert(ssn + i);
+				for (int i = 0; i < nsocks; ++i) {
+					ShmSocket tmp(shm, true, ssn + i, 16);
+					node->addrs_.emplace(ssn + i, tmp.AbsAddr());
 				}
 				return true;
 			} catch (...) {
+				for (int i = 0; i < nsocks; ++i) {
+					ShmSocket::Remove(shm, ssn + i);
+				}
 				return false;
 			}
 		};
 
-		auto PrepareProcInit = [&]() {
+		auto PrepareProcInit = [&](Node &node) {
 			bool r = false;
 			ShmMsg init_msg;
 			if (init_msg.Make(GetAllocSize(CalcAllocIndex(900)))) {
 				// 31bit pointer, 4bit cmd+flag
 				int64_t reply = (init_msg.Offset() << 4) | EncodeCmd(eCmdNodeInitReply);
-				r = SendAllocReply(socket, ssn, reply, init_msg);
+				r = SendAllocReply(socket, {ssn, node->addrs_[ssn]}, reply, init_msg);
 			}
 			return r;
 		};
 
 		Node node(new NodeInfo);
-		if (UpdateRegInfo(node) && PrepareProcInit()) {
+		if (UpdateRegInfo(node) && PrepareProcInit(node)) {
 			nodes_[ssn] = node;
 			LOG_INFO() << "new node ssn (" << ssn << ") init";
 		} else {
@@ -261,13 +263,13 @@
 	}
 	void RecordMsg(const MsgI &msg) { msgs_.RecordMsg(msg); }
 
-	bool SendAllocReply(ShmSocket &socket, const Address dest, const int64_t reply, const MsgI &msg)
+	bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg)
 	{
 		RecordMsg(msg);
 		auto onExpireFree = [this, msg](const SendQ::Data &) { msgs_.FreeMsg(msg.id()); };
 		return socket.Send(dest, reply, onExpireFree);
 	}
-	bool SendAllocMsg(ShmSocket &socket, const Address dest, const MsgI &msg)
+	bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg)
 	{
 		RecordMsg(msg);
 		return socket.Send(dest, msg);
@@ -284,7 +286,21 @@
 		if (proc_rec.proc_.empty()) {
 			return;
 		}
-		Address dest = proc_rec.ssn_ + socket_index;
+
+		MQInfo dest = {proc_rec.ssn_ + socket_index, 0};
+		auto FindMq = [&]() {
+			auto pos = nodes_.find(proc_rec.ssn_);
+			if (pos != nodes_.end()) {
+				for (auto &&mq : pos->second->addrs_) {
+					if (mq.first == dest.id_) {
+						dest.offset_ = mq.second;
+						return true;
+					}
+				}
+			}
+			return false;
+		};
+		if (!FindMq()) { return; }
 
 		auto size = GetAllocSize((val >> 52) & MaskBits(8));
 		MsgI new_msg;
@@ -337,10 +353,6 @@
 			// when node restart, ssn will change,
 			// and old node will be removed after timeout.
 			auto UpdateRegInfo = [&](Node &node) {
-				node->addrs_.insert(SrcAddr(head));
-				for (auto &addr : msg.addrs()) {
-					node->addrs_.insert(addr.mq_id());
-				}
 				node->proc_.Swap(msg.mutable_proc());
 				node->state_.timestamp_ = head.timestamp();
 				node->state_.UpdateState(NowSec(), offline_time_, kill_time_);
@@ -420,11 +432,11 @@
 			    auto src = SrcAddr(head);
 			    auto &topics = msg.topics().topic_list();
 			    node->services_[src].insert(topics.begin(), topics.end());
-			    TopicDest dest = {src, node};
+			    TopicDest dest = {src, SrcAbsAddr(head), node};
 			    for (auto &topic : topics) {
 				    service_map_[topic].insert(dest);
 			    }
-			    LOG_DEBUG() << "node " << node->proc_.proc_id() << " ssn " << *node->addrs_.begin() << " serve " << topics.size() << " topics:\n";
+			    LOG_DEBUG() << "node " << node->proc_.proc_id() << " ssn " << node->addrs_.begin()->first << " serve " << topics.size() << " topics:\n";
 			    for (auto &topic : topics) {
 				    LOG_DEBUG() << "\t" << topic;
 			    }
@@ -464,7 +476,8 @@
 					if (dest_node && Valid(*dest_node)) {
 						auto node_addr = reply.add_node_address();
 						node_addr->set_proc_id(dest_node->proc_.proc_id());
-						node_addr->mutable_addr()->set_mq_id(dest.mq_);
+						node_addr->mutable_addr()->set_mq_id(dest.mq_id_);
+						node_addr->mutable_addr()->set_abs_addr(dest.mq_abs_addr_);
 					}
 				}
 				return reply;
@@ -482,7 +495,7 @@
 			auto src = SrcAddr(head);
 			auto &topics = msg.topics().topic_list();
 			node->subscriptions_[src].insert(topics.begin(), topics.end());
-			TopicDest dest = {src, node};
+			TopicDest dest = {src, SrcAbsAddr(head), node};
 			for (auto &topic : topics) {
 				subscribe_map_[topic].insert(dest);
 			}
@@ -505,7 +518,7 @@
 			};
 
 			if (pos != node->subscriptions_.end()) {
-				const TopicDest &dest = {src, node};
+				const TopicDest &dest = {src, SrcAbsAddr(head), node};
 				auto &topics = msg.topics().topic_list();
 				// clear node sub records;
 				for (auto &topic : topics) {
@@ -602,7 +615,7 @@
 	{
 		auto EraseMapRec = [&node](auto &rec_map, auto &node_rec) {
 			for (auto &addr_topics : node_rec) {
-				TopicDest dest{addr_topics.first, node};
+				TopicDest dest{addr_topics.first, 0, node}; // abs_addr is not used.
 				for (auto &topic : addr_topics.second) {
 					auto pos = rec_map.find(topic);
 					if (pos != rec_map.end()) {
@@ -626,7 +639,7 @@
 		}
 
 		for (auto &addr : node->addrs_) {
-			cleaner_(addr);
+			cleaner_(addr.first);
 		}
 
 		node->addrs_.clear();
@@ -678,7 +691,7 @@
 {
 	return [&](auto &&rep_body) {
 		auto reply_head(InitMsgHead(GetType(rep_body), center->id(), head.ssn_id(), head.msg_id()));
-		auto remote = head.route(0).mq_id();
+		MQInfo remote = {head.route(0).mq_id(), head.route(0).abs_addr()};
 		MsgI msg;
 		if (msg.Make(reply_head, rep_body)) {
 			DEFER1(msg.Release(););
@@ -741,7 +754,7 @@
 					if (node) {
 						// should also make sure that mq is not killed before msg expires.
 						// it would be ok if (kill_time - offline_time) is longer than expire time.
-						socket.Send(cli.mq_, msg);
+						socket.Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
 						++it;
 					} else {
 						it = clients.erase(it);
@@ -772,9 +785,9 @@
 	return rec;
 }
 
-bool BHCenter::Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQId mqid, const int mq_len)
+bool BHCenter::Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQInfo &mq, const int mq_len)
 {
-	Centers()[name] = CenterInfo{name, handler, raw_handler, idle, mqid, mq_len};
+	Centers()[name] = CenterInfo{name, handler, raw_handler, idle, mq, mq_len};
 	return true;
 }
 
@@ -792,7 +805,7 @@
 
 	for (auto &kv : Centers()) {
 		auto &info = kv.second;
-		sockets_[info.name_] = std::make_shared<ShmSocket>(shm, info.mqid_, info.mq_len_);
+		sockets_[info.name_] = std::make_shared<ShmSocket>(info.mq_.offset_, shm, info.mq_.id_);
 	}
 }
 
diff --git a/box/center.h b/box/center.h
index d68573b..ebe48b4 100644
--- a/box/center.h
+++ b/box/center.h
@@ -31,7 +31,7 @@
 	typedef Socket::PartialRecvCB MsgHandler;
 	typedef Socket::RawRecvCB RawHandler;
 	typedef Socket::IdleCB IdleHandler;
-	static bool Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQId mqid, const int mq_len);
+	static bool Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQInfo &mq, const int mq_len);
 
 	BHCenter(Socket::Shm &shm);
 	~BHCenter() { Stop(); }
@@ -44,7 +44,7 @@
 		MsgHandler handler_;
 		RawHandler raw_handler_;
 		IdleHandler idle_;
-		MQId mqid_;
+		MQInfo mq_;
 		int mq_len_ = 0;
 	};
 	typedef std::map<std::string, CenterInfo> CenterRecords;
diff --git a/proto/source/bhome_msg_api.proto b/proto/source/bhome_msg_api.proto
index 94bc82e..8b422c7 100644
--- a/proto/source/bhome_msg_api.proto
+++ b/proto/source/bhome_msg_api.proto
@@ -9,8 +9,9 @@
 
 message BHAddress {
 	uint64 mq_id = 1;
-	bytes ip = 2;   
-	int32 port = 3;
+	int64 abs_addr = 2;
+	bytes ip = 3;   
+	int32 port = 4;
 }
 
 message ProcInfo
@@ -48,7 +49,6 @@
 message MsgRegister
 {
 	ProcInfo proc = 1;
-	repeated BHAddress addrs = 2;
 }
 
 message MsgUnregister
diff --git a/src/bh_api.cpp b/src/bh_api.cpp
index c9ceb20..ca7249d 100644
--- a/src/bh_api.cpp
+++ b/src/bh_api.cpp
@@ -30,16 +30,21 @@
 }
 std::unique_ptr<TopicNode> &ProcNodePtr()
 {
-	static bool init = GlobalInit(BHomeShm());
-	auto InitLog = []() {
-		auto id = GetProcExe();
-		char path[200] = {0};
-		sprintf(path, "/tmp/bhshmq_node_%s.log", id.c_str());
-		ns_log::AddLog(path);
-		return true;
-	};
-	static bool init_log = InitLog();
-	static std::unique_ptr<TopicNode> ptr(new TopicNode(BHomeShm()));
+	static std::mutex mtx;
+	std::lock_guard<std::mutex> lk(mtx);
+
+	static std::unique_ptr<TopicNode> ptr;
+	if (!ptr && GlobalInit(BHomeShm())) {
+		auto InitLog = []() {
+			auto id = GetProcExe();
+			char path[200] = {0};
+			sprintf(path, "/tmp/bhshmq_node_%s.log", id.c_str());
+			ns_log::AddLog(path);
+			return true;
+		};
+		static bool init_log = InitLog();
+		ptr.reset(new TopicNode(BHomeShm()));
+	}
 	return ptr;
 }
 TopicNode &ProcNode()
@@ -114,6 +119,12 @@
 		return false;
 	}
 	MsgOut msg_reply;
+	auto &ptr = ProcNodePtr();
+	if (!ptr) {
+		SetLastError(eNotFound, "center not started.");
+		return 0;
+	}
+
 	return (ProcNode().*mfunc)(input, msg_reply, timeout_ms) &&
 	       PackOutput(msg_reply, reply, reply_len);
 }
diff --git a/src/bh_util.h b/src/bh_util.h
index 223da2a..15ffeb0 100644
--- a/src/bh_util.h
+++ b/src/bh_util.h
@@ -157,9 +157,9 @@
 	}
 
 protected:
-	static inline T &GetData()
+	static inline T &GetData(const std::string &msg = "Must set data before use!")
 	{
-		if (!ptr()) { throw std::string("Must set ShmMsg shm before use!"); }
+		if (!ptr()) { throw std::logic_error(msg); }
 		return *ptr();
 	}
 
diff --git a/src/defs.cpp b/src/defs.cpp
index b812b65..6e7a5fd 100644
--- a/src/defs.cpp
+++ b/src/defs.cpp
@@ -141,10 +141,10 @@
 	return false;
 }
 
-uint64_t BHGlobalSenderAddress() { return GetCenterInfo(BHomeShm())->mq_sender_.id_; }
-uint64_t BHTopicCenterAddress() { return GetCenterInfo(BHomeShm())->mq_center_.id_; }
-uint64_t BHTopicBusAddress() { return GetCenterInfo(BHomeShm())->mq_bus_.id_; }
-uint64_t BHCenterReplyAddress() { return GetCenterInfo(BHomeShm())->mq_init_.id_; }
+const MQInfo &BHGlobalSenderAddress() { return GetCenterInfo(BHomeShm())->mq_sender_; }
+const MQInfo &BHTopicCenterAddress() { return GetCenterInfo(BHomeShm())->mq_center_; }
+const MQInfo &BHTopicBusAddress() { return GetCenterInfo(BHomeShm())->mq_bus_; }
+const MQInfo &BHCenterReplyAddress() { return GetCenterInfo(BHomeShm())->mq_init_; }
 
 int64_t CalcAllocIndex(int64_t size)
 {
diff --git a/src/defs.h b/src/defs.h
index 5c770a7..cc3dc02 100644
--- a/src/defs.h
+++ b/src/defs.h
@@ -27,12 +27,12 @@
 int64_t CalcAllocIndex(int64_t size);
 int64_t GetAllocSize(int index);
 
-struct CenterInfo {
-	struct MQInfo {
-		int64_t id_ = 0;
-		int64_t offset_ = 0;
-	};
+struct MQInfo {
+	MQId id_ = 0;
+	int64_t offset_ = 0;
+};
 
+struct CenterInfo {
 	MQInfo mq_center_;
 	MQInfo mq_bus_;
 	MQInfo mq_init_;
@@ -59,9 +59,9 @@
 void GetLastError(int &ec, std::string &msg);
 //TODO center can check shm for previous crash.
 
-uint64_t BHGlobalSenderAddress();
-uint64_t BHTopicCenterAddress();
-uint64_t BHTopicBusAddress();
-uint64_t BHCenterReplyAddress();
+const MQInfo &BHGlobalSenderAddress();
+const MQInfo &BHTopicCenterAddress();
+const MQInfo &BHTopicBusAddress();
+const MQInfo &BHCenterReplyAddress();
 
 #endif // end of include guard: DEFS_KP8LKGD0
diff --git a/src/sendq.cpp b/src/sendq.cpp
index 1eaefe6..f1e5918 100644
--- a/src/sendq.cpp
+++ b/src/sendq.cpp
@@ -21,6 +21,24 @@
 
 using namespace bhome_shm;
 
+void SendQ::AppendData(const MQInfo &mq, const Data data, const TimePoint &expire, OnMsgEvent onExpire)
+{
+	TimedMsg tmp(expire, MsgInfo{mq, data, std::move(onExpire)});
+	std::unique_lock<std::mutex> lock(mutex_in_);
+
+	try {
+		auto &al = in_[mq.id_];
+		if (!al.empty()) {
+			al.front().emplace_back(std::move(tmp));
+		} else {
+			al.insert(al.begin(), Array())->emplace_back(std::move(tmp));
+		}
+	} catch (std::exception &e) {
+		LOG_ERROR() << "sendq error: " << e.what();
+		throw e;
+	}
+}
+
 int SendQ::DoSend1Remote(ShmMsgQueue &mq, const Remote remote, Array &arr)
 {
 	auto FirstNotExpired = [](Array &l) {
@@ -36,7 +54,7 @@
 		}
 	}
 
-	while (pos != arr.end() && mq.TrySend(remote, pos->data().data_)) {
+	while (pos != arr.end() && mq.TrySend(pos->data().mq_, pos->data().data_)) {
 		++pos;
 	}
 
@@ -59,6 +77,8 @@
 bool SendQ::TrySend(ShmMsgQueue &mq)
 {
 	std::unique_lock<std::mutex> lock(mutex_out_);
+	// if (TooFast()) { return false; }
+
 	size_t nsend = 0;
 	if (!out_.empty()) {
 		auto rec = out_.begin();
@@ -89,3 +109,14 @@
 
 	return !out_.empty();
 }
+
+bool SendQ::TooFast()
+{
+	auto cur = NowSec();
+	if (cur > last_time_) {
+		last_time_ = cur;
+		count_ = 0;
+	}
+
+	return ++count_ > 1000 * 100;
+} // not accurate in multi-thread.
\ No newline at end of file
diff --git a/src/sendq.h b/src/sendq.h
index 9e2b5ca..d1ba30a 100644
--- a/src/sendq.h
+++ b/src/sendq.h
@@ -39,6 +39,7 @@
 	typedef int64_t Data;
 	typedef std::function<void(const Data &)> OnMsgEvent;
 	struct MsgInfo {
+		MQInfo mq_;
 		Data data_;
 		OnMsgEvent on_expire_;
 	};
@@ -46,45 +47,51 @@
 	typedef TimedMsg::TimePoint TimePoint;
 	typedef TimedMsg::Duration Duration;
 
-	void Append(const Remote addr, const MsgI msg)
+	bool Append(const MQInfo &mq, MsgI msg)
 	{
 		msg.AddRef();
 		auto onMsgExpire = [](const Data &d) { MsgI(d).Release(); };
-		AppendData(addr, msg.Offset(), DefaultExpire(), onMsgExpire);
+		try {
+			AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire);
+			return true;
+		} catch (...) {
+			msg.Release();
+			return false;
+		}
 	}
 
-	void Append(const Remote addr, const MsgI msg, OnMsgEvent onExpire)
+	bool Append(const MQInfo &mq, MsgI msg, OnMsgEvent onExpire)
 	{
 		msg.AddRef();
 		auto onMsgExpire = [onExpire](const Data &d) {
 			onExpire(d);
 			MsgI(d).Release();
 		};
-		AppendData(addr, msg.Offset(), DefaultExpire(), onMsgExpire);
+		try {
+			AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire);
+			return true;
+		} catch (...) {
+			msg.Release();
+			return false;
+		}
 	}
 
-	void Append(const Remote addr, const Data command, OnMsgEvent onExpire = OnMsgEvent())
+	bool Append(const MQInfo &mq, const Data command, OnMsgEvent onExpire = OnMsgEvent())
 	{
-		AppendData(addr, command, DefaultExpire(), onExpire);
+		try {
+			AppendData(mq, command, DefaultExpire(), onExpire);
+			return true;
+		} catch (...) {
+			return false;
+		}
 	}
 	bool TrySend(ShmMsgQueue &mq);
 
 private:
 	static TimePoint Now() { return TimedMsg::Clock::now(); }
 	static TimePoint DefaultExpire() { return Now() + std::chrono::seconds(60); }
-	void AppendData(const Remote addr, const Data data, const TimePoint &expire, OnMsgEvent onExpire)
-	{
-		//TODO simple queue, organize later ?
+	void AppendData(const MQInfo &mq, const Data data, const TimePoint &expire, OnMsgEvent onExpire);
 
-		TimedMsg tmp(expire, MsgInfo{data, std::move(onExpire)});
-		std::unique_lock<std::mutex> lock(mutex_in_);
-		auto &al = in_[addr];
-		if (!al.empty()) {
-			al.front().emplace_back(std::move(tmp));
-		} else {
-			al.insert(al.begin(), Array())->emplace_back(std::move(tmp));
-		}
-	}
 	typedef std::deque<TimedMsg> Array;
 	typedef std::list<Array> ArrayList;
 	typedef std::unordered_map<Remote, ArrayList> Store;
@@ -92,10 +99,15 @@
 	int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, Array &arr);
 	int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, ArrayList &arr);
 
+	bool TooFast();
+
 	std::mutex mutex_in_;
 	std::mutex mutex_out_;
 	Store in_;
 	Store out_;
+
+	int64_t count_ = 0;
+	int64_t last_time_ = 0;
 };
 
 #endif // end of include guard: SENDQ_IWKMSK7M
diff --git a/src/shm_msg_queue.cpp b/src/shm_msg_queue.cpp
index 663da1e..1d78e8c 100644
--- a/src/shm_msg_queue.cpp
+++ b/src/shm_msg_queue.cpp
@@ -33,7 +33,7 @@
 
 ShmMsgQueue::MQId ShmMsgQueue::NewId()
 {
-	static auto &id = GetData();
+	static auto &id = GetData("Must init shared memory before use! Please make sure center is running.");
 	return (++id) * 10;
 }
 
@@ -96,11 +96,11 @@
 	return Shmq::Find(shm, MsgQIdToName(remote_id));
 }
 
-bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote, int64_t val)
+bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQInfo &remote, const RawData val)
 {
 	try {
 		//TODO find from center, or use offset.
-		ShmMsgQueue dest(shm, false, remote, 1);
+		ShmMsgQueue dest(remote.offset_, shm, remote.id_);
 #ifndef BH_USE_ATOMIC_Q
 		Guard lock(GetMutex(remote_id));
 #endif
diff --git a/src/shm_msg_queue.h b/src/shm_msg_queue.h
index eead739..de60fde 100644
--- a/src/shm_msg_queue.h
+++ b/src/shm_msg_queue.h
@@ -18,6 +18,7 @@
 #ifndef SHM_MSG_QUEUE_D847TQXH
 #define SHM_MSG_QUEUE_D847TQXH
 
+#include "defs.h"
 #include "msg.h"
 #include "shm_queue.h"
 
@@ -75,8 +76,8 @@
 	bool Recv(MsgI &msg, const int timeout_ms) { return Recv(msg.OffsetRef(), timeout_ms); }
 	bool TryRecv(MsgI &msg) { return TryRecv(msg.OffsetRef()); }
 	static Queue *Find(ShmType &shm, const MQId remote);
-	static bool TrySend(ShmType &shm, const MQId remote, const RawData val);
-	bool TrySend(const MQId remote, const RawData val) { return TrySend(shm(), remote, val); }
+	static bool TrySend(ShmType &shm, const MQInfo &remote, const RawData val);
+	bool TrySend(const MQInfo &remote, const RawData val) { return TrySend(shm(), remote, val); }
 
 private:
 #ifndef BH_USE_ATOMIC_Q
diff --git a/src/socket.cpp b/src/socket.cpp
index 4f09517..55b43f9 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -80,15 +80,13 @@
 				}
 			};
 			ShmMsgQueue::RawData val = 0;
-			auto TryRecvMore = [&]() {
-				for (int i = 0; i < 100; ++i) {
-					if (mq().TryRecv(val)) {
-						return true;
-					}
+			for (int i = 0; i < 100; ++i) {
+				if (mq().TryRecv(val)) {
+					onRecv(val);
+					return true;
 				}
-				return false;
-			};
-			return TryRecvMore() ? (onRecv(val), true) : false;
+			}
+			return false;
 		};
 
 		try {
@@ -160,6 +158,31 @@
 	return false;
 }
 
+bool ShmSocket::Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb)
+{
+	size_t size = content.size();
+	auto OnResult = [content = std::move(content), msg_id, remote, cb = std::move(cb), this](MsgI &msg) mutable {
+		if (!msg.Fill(content)) { return; }
+
+		try {
+			if (!cb) {
+				Send(remote, msg);
+			} else {
+				per_msg_cbs_->Store(msg_id, std::move(cb));
+				auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) {
+					RecvCB cb_no_use;
+					per_msg_cbs_->Pick(msg_id, cb_no_use);
+				};
+				Send(remote, msg, onExpireRemoveCB);
+			}
+		} catch (...) {
+			SetLastError(eError, "Send internal error.");
+		}
+	};
+
+	return RequestAlloc(size, OnResult);
+}
+
 bool ShmSocket::RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult)
 { // 8bit size, 4bit socket index, 16bit proc index, 28bit id, ,4bit cmd+flag
 	// LOG_FUNCTION;
@@ -184,5 +207,6 @@
 		RawRecvCB cb_no_use;
 		alloc_cbs_->Pick(id, cb_no_use);
 	};
+
 	return Send(BHTopicCenterAddress(), cmd, onExpireRemoveCB);
 }
\ No newline at end of file
diff --git a/src/socket.h b/src/socket.h
index 8e9db69..dea106c 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -59,7 +59,6 @@
 	{
 		node_proc_index_ = proc_index;
 		socket_index_ = socket_index;
-		LOG_DEBUG() << "Set Node Proc " << node_proc_index_ << ", " << socket_index_;
 	}
 	// start recv.
 	bool Start(int nworker = 1, const RecvCB &onMsg = RecvCB(), const RawRecvCB &onRaw = RawRecvCB(), const IdleCB &onIdle = IdleCB());
@@ -68,7 +67,7 @@
 	bool Stop();
 
 	template <class Body>
-	bool CenterSend(const MQId remote, BHMsgHead &head, Body &body)
+	bool CenterSend(const MQInfo &remote, BHMsgHead &head, Body &body)
 	{
 		try {
 			//TODO alloc outsiez and use send.
@@ -86,39 +85,17 @@
 	bool RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult);
 
 	template <class Body>
-	bool Send(const MQId remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB())
+	bool Send(const MQInfo &remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB())
 	{
-		std::string msg_id(head.msg_id());
-		std::string content(MsgI::Serialize(head, body));
-		size_t size = content.size();
-		auto OnResult = [content = std::move(content), msg_id, remote, cb = std::move(cb), this](MsgI &msg) mutable {
-			if (!msg.Fill(content)) { return; }
-
-			try {
-				if (!cb) {
-					Send(remote, msg);
-				} else {
-					per_msg_cbs_->Store(msg_id, std::move(cb));
-					auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) {
-						RecvCB cb_no_use;
-						per_msg_cbs_->Pick(msg_id, cb_no_use);
-					};
-					Send(remote, msg, onExpireRemoveCB);
-				}
-			} catch (...) {
-				SetLastError(eError, "Send internal error.");
-			}
-		};
-
-		return RequestAlloc(size, OnResult);
+		return Send(remote, MsgI::Serialize(head, body), head.msg_id(), std::move(cb));
 	}
 	template <class... T>
-	bool Send(const MQId remote, const MsgI &imsg, T &&...t)
+	bool Send(const MQInfo &remote, const MsgI &imsg, T &&...t)
 	{
 		return SendImpl(remote, imsg, std::forward<decltype(t)>(t)...);
 	}
 	template <class... T>
-	bool Send(const MQId remote, const int64_t cmd, T &&...t)
+	bool Send(const MQInfo &remote, const int64_t cmd, T &&...t)
 	{
 		return SendImpl(remote, cmd, std::forward<decltype(t)>(t)...);
 	}
@@ -126,7 +103,7 @@
 	bool SyncRecv(MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms);
 
 	template <class Body>
-	bool SendAndRecv(const MQId remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms)
+	bool SendAndRecv(const MQInfo &remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms)
 	{
 		struct State {
 			std::mutex mutex;
@@ -136,6 +113,7 @@
 
 		try {
 			std::shared_ptr<State> st(new State);
+
 			auto endtime = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms);
 
 			auto OnRecv = [st, &reply, &reply_head](ShmSocket &sock, MsgI &msg, BHMsgHead &head) {
@@ -176,12 +154,12 @@
 	bool StopNoLock();
 	bool RunningNoLock() { return !workers_.empty(); }
 
+	bool Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb = RecvCB());
+
 	template <class... Rest>
-	bool SendImpl(const MQId remote, Rest &&...rest)
+	bool SendImpl(const MQInfo &remote, Rest &&...rest)
 	{
-		// TODO send alloc request, and pack later, higher bit means alloc?
-		send_buffer_.Append(remote, std::forward<decltype(rest)>(rest)...);
-		return true;
+		return send_buffer_.Append(remote, std::forward<decltype(rest)>(rest)...);
 	}
 
 	std::vector<std::thread> workers_;
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index 35228b4..51a0ab7 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -28,7 +28,12 @@
 
 namespace
 {
-inline void AddRoute(BHMsgHead &head, const MQId id) { head.add_route()->set_mq_id(id); }
+inline void AddRoute(BHMsgHead &head, const ShmSocket &sock)
+{
+	auto route = head.add_route();
+	route->set_mq_id(sock.id());
+	route->set_abs_addr(sock.AbsAddr());
+}
 
 struct SrcInfo {
 	std::vector<BHAddress> route;
@@ -40,7 +45,7 @@
 } // namespace
 
 TopicNode::TopicNode(SharedMemory &shm) :
-    shm_(shm), state_(eStateUnregistered)
+    shm_(shm), state_(eStateUninited)
 {
 }
 
@@ -79,6 +84,7 @@
 			auto end_time = steady_clock::now() + 3s;
 			do {
 				try {
+					//TODO recv offset, avoid query.
 					for (int i = eSockStart; i < eSockEnd; ++i) {
 						sockets_.emplace_back(new ShmSocket(shm_, false, ssn_id_ + i, kMqLen));
 					}
@@ -94,7 +100,6 @@
 		NodeInit();
 	}
 	if (!sockets_.empty()) {
-		LOG_DEBUG() << "node sockets ok";
 		auto onNodeCmd = [this](ShmSocket &socket, int64_t &val) {
 			LOG_DEBUG() << "node recv cmd: " << DecodeCmd(val);
 			switch (DecodeCmd(val)) {
@@ -103,7 +108,7 @@
 				DEFER1(msg.Release());
 				MsgProcInit body;
 				auto head = InitMsgHead(GetType(body), info_.proc_id(), ssn_id_);
-				head.add_route()->set_mq_id(ssn_id_);
+				AddRoute(head, socket);
 				if (msg.Fill(head, body)) {
 					socket.Send(BHTopicCenterAddress(), msg);
 				}
@@ -122,12 +127,12 @@
 				MsgProcInitReply reply;
 				if (imsg.ParseBody(reply)) {
 					SetProcIndex(reply.proc_index());
+					this->state_ = eStateUnregistered;
 				}
 			}
 			return true;
 		};
 		SockNode().Start(1, onMsg, onNodeCmd);
-		LOG_DEBUG() << "sockets ok.";
 		return true;
 	}
 	return false;
@@ -167,19 +172,22 @@
 		SetLastError(eError, kErrMsgNotInit);
 		return false;
 	}
+	auto end_time = steady_clock::now() + milliseconds(timeout_ms);
+
+	while (state_ != eStateUnregistered && steady_clock::now() < end_time) {
+		std::this_thread::yield();
+	}
+	if (state_ != eStateUnregistered) {
+		SetLastError(eError, kErrMsgNotInit);
+		return false;
+	}
 
 	auto &sock = SockNode();
 	MsgRegister body;
 	body.mutable_proc()->Swap(&proc);
-	auto AddId = [&](const MQId id) { body.add_addrs()->set_mq_id(id); };
-	AddId(SockNode().id());
-	AddId(SockServer().id());
-	AddId(SockClient().id());
-	AddId(SockSub().id());
-	AddId(SockPub().id());
 
 	auto head(InitMsgHead(GetType(body), body.proc().proc_id(), ssn()));
-	AddRoute(head, sock.id());
+	AddRoute(head, sock);
 
 	auto CheckResult = [this](MsgI &msg, BHMsgHead &head, MsgCommonReply &rbody) {
 		bool ok = head.type() == kMsgTypeCommonReply &&
@@ -224,7 +232,7 @@
 	body.mutable_proc()->Swap(&proc);
 
 	auto head(InitMsgHead(GetType(body), body.proc().proc_id(), ssn()));
-	AddRoute(head, sock.id());
+	AddRoute(head, sock);
 
 	auto CheckResult = [this](MsgI &msg, BHMsgHead &head, MsgCommonReply &rbody) {
 		bool r = head.type() == kMsgTypeCommonReply &&
@@ -260,7 +268,7 @@
 	body.mutable_proc()->Swap(&proc);
 
 	auto head(InitMsgHead(GetType(body), body.proc().proc_id(), ssn()));
-	AddRoute(head, sock.id());
+	AddRoute(head, sock);
 
 	if (timeout_ms == 0) {
 		return sock.Send(BHTopicCenterAddress(), head, body);
@@ -290,7 +298,7 @@
 	auto &sock = SockNode();
 
 	BHMsgHead head(InitMsgHead(GetType(query), proc_id(), ssn()));
-	AddRoute(head, sock.id());
+	AddRoute(head, sock);
 
 	MsgI reply;
 	DEFER1(reply.Release());
@@ -312,7 +320,7 @@
 	body.mutable_topics()->Swap(&topics);
 
 	auto head(InitMsgHead(GetType(body), proc_id(), ssn()));
-	AddRoute(head, sock.id());
+	AddRoute(head, sock);
 
 	if (timeout_ms == 0) {
 		return sock.Send(BHTopicCenterAddress(), head, body);
@@ -341,7 +349,7 @@
 			for (int i = 0; i < head.route_size() - 1; ++i) {
 				reply_head.add_route()->Swap(head.mutable_route(i));
 			}
-			auto remote = head.route().rbegin()->mq_id();
+			MQInfo remote = {head.route().rbegin()->mq_id(), head.route().rbegin()->abs_addr()};
 			sock.Send(remote, reply_head, reply_body);
 		}
 	};
@@ -357,10 +365,17 @@
 		MsgRequestTopic req;
 		if (!imsg.ParseBody(req)) { return; }
 
-		SrcInfo *p = new SrcInfo;
-		p->route.assign(head.route().begin(), head.route().end());
-		p->msg_id = head.msg_id();
-		acb(p, *head.mutable_proc_id(), req);
+		try {
+			SrcInfo *p = new SrcInfo;
+			if (!p) {
+				throw std::runtime_error("no memory.");
+			}
+			p->route.assign(head.route().begin(), head.route().end());
+			p->msg_id = head.msg_id();
+			acb(p, *head.mutable_proc_id(), req);
+		} catch (std::exception &e) {
+			LOG_ERROR() << "error server handle msg:" << e.what();
+		}
 	};
 
 	auto &sock = SockServer();
@@ -381,11 +396,19 @@
 	if (sock.SyncRecv(imsg, head, timeout_ms) && head.type() == kMsgTypeRequestTopic) {
 		if (imsg.ParseBody(request)) {
 			head.mutable_proc_id()->swap(proc_id);
-			SrcInfo *p = new SrcInfo;
-			p->route.assign(head.route().begin(), head.route().end());
-			p->msg_id = head.msg_id();
-			src_info = p;
-			return true;
+			try {
+				SrcInfo *p = new SrcInfo;
+				if (!p) {
+					throw std::runtime_error("no memory.");
+				}
+				p->route.assign(head.route().begin(), head.route().end());
+				p->msg_id = head.msg_id();
+				src_info = p;
+				return true;
+			} catch (std::exception &e) {
+				LOG_ERROR() << "error recv request: " << e.what();
+				return false;
+			}
 		}
 	}
 	return false;
@@ -409,7 +432,8 @@
 	for (unsigned i = 0; i < p->route.size() - 1; ++i) {
 		head.add_route()->Swap(&p->route[i]);
 	}
-	return sock.Send(p->route.back().mq_id(), head, body);
+	MQInfo dest = {p->route.back().mq_id(), p->route.back().abs_addr()};
+	return sock.Send(dest, head, body);
 }
 
 bool TopicNode::ClientStartWorker(RequestResultCB const &cb, const int nworker)
@@ -440,10 +464,10 @@
 
 	out_msg_id = msg_id;
 
-	auto SendTo = [this, msg_id](const BHAddress &addr, const MsgRequestTopic &req, const RequestResultCB &cb) {
+	auto SendTo = [this, msg_id](const MQInfo &remote, const MsgRequestTopic &req, const RequestResultCB &cb) {
 		auto &sock = SockClient();
 		BHMsgHead head(InitMsgHead(GetType(req), proc_id(), ssn(), msg_id));
-		AddRoute(head, sock.id());
+		AddRoute(head, sock);
 		head.set_topic(req.topic());
 
 		if (cb) {
@@ -455,15 +479,15 @@
 					}
 				}
 			};
-			return sock.Send(addr.mq_id(), head, req, onRecv);
+			return sock.Send(remote, head, req, onRecv);
 		} else {
-			return sock.Send(addr.mq_id(), head, req);
+			return sock.Send(remote, head, req);
 		}
 	};
 
 	try {
 		BHAddress addr;
-		return (ClientQueryRPCTopic(req.topic(), addr, 3000)) && SendTo(addr, req, cb);
+		return (ClientQueryRPCTopic(req.topic(), addr, 3000)) && SendTo(MQInfo{addr.mq_id(), addr.abs_addr()}, req, cb);
 	} catch (...) {
 		SetLastError(eError, "internal error.");
 		return false;
@@ -484,14 +508,14 @@
 		if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) {
 			LOG_TRACE() << "node: " << SockNode().id() << ", topic dest: " << addr.mq_id();
 			BHMsgHead head(InitMsgHead(GetType(request), proc_id(), ssn()));
-			AddRoute(head, sock.id());
+			AddRoute(head, sock);
 			head.set_topic(request.topic());
 
 			MsgI reply_msg;
 			DEFER1(reply_msg.Release(););
 			BHMsgHead reply_head;
 
-			if (sock.SendAndRecv(addr.mq_id(), head, request, reply_msg, reply_head, timeout_ms) &&
+			if (sock.SendAndRecv({addr.mq_id(), addr.abs_addr()}, head, request, reply_msg, reply_head, timeout_ms) &&
 			    reply_head.type() == kMsgTypeRequestTopicReply &&
 			    reply_msg.ParseBody(out_reply)) {
 				reply_head.mutable_proc_id()->swap(out_proc_id);
@@ -504,7 +528,7 @@
 	return false;
 }
 
-int TopicNode::QueryRPCTopics(const Topic &topic, std::vector<NodeAddress> &addr, const int timeout_ms)
+int TopicNode::QueryTopicServers(const Topic &topic, std::vector<NodeAddress> &addr, const int timeout_ms)
 {
 	int n = 0;
 	MsgQueryTopic query;
@@ -532,7 +556,7 @@
 		return true;
 	}
 	std::vector<NodeAddress> lst;
-	if (QueryRPCTopics(topic, lst, timeout_ms)) {
+	if (QueryTopicServers(topic, lst, timeout_ms)) {
 		addr = lst.front().addr();
 		if (addr.mq_id() != 0) {
 			topic_query_cache_.Store(topic, addr);
@@ -555,7 +579,7 @@
 	try {
 		auto &sock = SockPub();
 		BHMsgHead head(InitMsgHead(GetType(pub), proc_id(), ssn()));
-		AddRoute(head, sock.id());
+		AddRoute(head, sock);
 
 		if (timeout_ms == 0) {
 			return sock.Send(BHTopicBusAddress(), head, pub);
@@ -589,7 +613,7 @@
 		sub.mutable_topics()->Swap(&topics);
 
 		BHMsgHead head(InitMsgHead(GetType(sub), proc_id(), ssn()));
-		AddRoute(head, sock.id());
+		AddRoute(head, sock);
 		if (timeout_ms == 0) {
 			return sock.Send(BHTopicBusAddress(), head, sub);
 		} else {
diff --git a/src/topic_node.h b/src/topic_node.h
index b018807..be82cf6 100644
--- a/src/topic_node.h
+++ b/src/topic_node.h
@@ -78,7 +78,7 @@
 	MQId ssn() { return SockNode().id(); }
 	bool ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms);
 	typedef MsgQueryTopicReply::BHNodeAddress NodeAddress;
-	int QueryRPCTopics(const Topic &topic, std::vector<NodeAddress> &addr, const int timeout_ms);
+	int QueryTopicServers(const Topic &topic, std::vector<NodeAddress> &addr, const int timeout_ms);
 	const std::string &proc_id() { return info_.proc_id(); }
 
 	typedef BHAddress Address;
@@ -139,6 +139,7 @@
 	}
 
 	enum State {
+		eStateUninited,
 		eStateUnregistered,
 		eStateOnline,
 		eStateOffline // heartbeat fail.
@@ -146,7 +147,7 @@
 	void state(const State st) { state_.store(st); }
 	void state_cas(State expected, const State val) { state_.compare_exchange_strong(expected, val); }
 	State state() const { return state_.load(); }
-	bool IsOnline() { return Init() && state() == eStateOnline; }
+	bool IsOnline() { return state() == eStateOnline; }
 	bool Init();
 	bool Valid() const { return !sockets_.empty(); }
 	std::mutex mutex_;
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index f48f307..44c809d 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -129,7 +129,14 @@
 		void *reply = 0;
 		int reply_len = 0;
 		reg = BHRegister(proc_buf.data(), proc_buf.size(), &reply, &reply_len, 2000);
-		printf("register %s\n", reg ? "ok" : "failed");
+		if (reg) {
+			printf("register ok\n");
+		} else {
+			int ec = 0;
+			std::string msg;
+			GetLastError(ec, msg);
+			printf("register failed, %d, %s\n", ec, msg.c_str());
+		}
 
 		BHFree(reply, reply_len);
 		Sleep(1s);
@@ -239,6 +246,7 @@
 			DEFER1(BHFree(msg_id, len););
 			// Sleep(10ms, false);
 			std::string dest(BHAddress().SerializeAsString());
+
 			bool r = BHAsyncRequest(dest.data(), dest.size(), s.data(), s.size(), 0, 0);
 			if (r) {
 				++Status().nrequest_;
@@ -294,11 +302,12 @@
 
 	int same = 0;
 	uint64_t last = 0;
-	while (last < nreq * ncli && same < 2) {
+	while (last < nreq * ncli && same < 3) {
 		Sleep(1s, false);
 		auto cur = Status().nreply_.load();
 		if (last == cur) {
 			++same;
+			printf("same %d\n", same);
 		} else {
 			last = cur;
 			same = 0;
@@ -308,6 +317,7 @@
 	run = false;
 	threads.WaitAll();
 	auto &st = Status();
+	Sleep(1s);
 	printf("nreq: %8ld, nsrv: %8ld, nreply: %8ld\n", st.nrequest_.load(), st.nserved_.load(), st.nreply_.load());
 	BHCleanup();
 	printf("after cleanup\n");
diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp
index 66e5179..4dea623 100644
--- a/utest/speed_test.cpp
+++ b/utest/speed_test.cpp
@@ -24,16 +24,8 @@
 {
 	SharedMemory &shm = TestShm();
 	GlobalInit(shm);
-	auto InitSem = [](auto id) {
-		auto sem_id = semget(id, 1, 0666 | IPC_CREAT);
-		union semun init_val;
-		init_val.val = 1;
-		semctl(sem_id, 0, SETVAL, init_val);
-		return;
-	};
-
-	MQId id = ShmMsgQueue::NewId();
-	InitSem(id);
+	MQId server_id = ShmMsgQueue::NewId();
+	ShmMsgQueue server(server_id, shm, 1000);
 
 	const int timeout = 1000;
 	const uint32_t data_size = 1001;
@@ -44,7 +36,6 @@
 	std::string str(data_size, 'a');
 	auto Writer = [&](int writer_id, uint64_t n) {
 		MQId cli_id = ShmMsgQueue::NewId();
-		InitSem(cli_id);
 
 		ShmMsgQueue mq(cli_id, shm, 64);
 		MsgI msg;
@@ -58,12 +49,12 @@
 
 		for (uint64_t i = 0; i < n; ++i) {
 			msg.AddRef();
-			while (!mq.TrySend(id, msg.Offset())) {}
+			while (!mq.TrySend({server.Id(), server.AbsAddr()}, msg.Offset())) {}
 			++nwrite;
 		}
 	};
 	auto Reader = [&](int reader_id, std::atomic<bool> *run, bool isfork) {
-		ShmMsgQueue mq(id, shm, 1000);
+		ShmMsgQueue &mq = server;
 		auto now = []() { return steady_clock::now(); };
 		auto tm = now();
 		while (*run) {
@@ -189,8 +180,10 @@
 				req_body.set_topic("topic");
 				req_body.set_data(msg_content);
 				auto req_head(InitMsgHead(GetType(req_body), client_proc_id, cli.id()));
-				req_head.add_route()->set_mq_id(cli.id());
-				return cli.Send(srv.id(), req_head, req_body);
+				auto route = req_head.add_route();
+				route->set_mq_id(cli.id());
+				route->set_abs_addr(cli.AbsAddr());
+				return cli.Send({srv.id(), srv.AbsAddr()}, req_head, req_body);
 			};
 
 			Req();
@@ -207,13 +200,13 @@
 				DEFER1(req.Release());
 
 				if (req.ParseHead(req_head) && req_head.type() == kMsgTypeRequestTopic) {
-					auto src_id = req_head.route()[0].mq_id();
+					MQInfo src_mq = {req_head.route()[0].mq_id(), req_head.route()[0].abs_addr()};
 					auto Reply = [&]() {
 						MsgRequestTopic reply_body;
 						reply_body.set_topic("topic");
 						reply_body.set_data(msg_content);
 						auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, srv.id(), req_head.msg_id()));
-						return srv.Send(src_id, reply_head, reply_body);
+						return srv.Send(src_mq, reply_head, reply_body);
 					};
 					Reply();
 				}

--
Gitblit v1.8.0