From db322f33ba13592f2492317e3f1a070454c97059 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期四, 13 五月 2021 19:34:46 +0800
Subject: [PATCH] center alloc all msgs.

---
 src/robust.h                 |   39 +
 src/socket.h                 |   90 +++-
 box/center.cpp               |  276 ++++++++++++--
 src/proto.h                  |    2 
 src/msg.h                    |  109 +++--
 src/shm_msg_queue.h          |   59 ++
 src/socket.cpp               |  141 ++++---
 proto/source/bhome_msg.proto |    9 
 src/defs.h                   |    9 
 box/center.h                 |    7 
 src/sendq.cpp                |   20 
 src/topic_node.cpp           |   95 +++-
 src/shm_msg_queue.cpp        |   26 
 src/defs.cpp                 |   45 ++
 src/msg.cpp                  |   29 +
 box/center_main.cc           |    7 
 utest/speed_test.cpp         |    9 
 utest/api_test.cpp           |    2 
 src/bh_util.h                |    2 
 src/shm_queue.h              |   11 
 src/topic_node.h             |    9 
 utest/robust_test.cpp        |   45 +-
 src/sendq.h                  |   14 
 src/robust.cpp               |   16 
 24 files changed, 788 insertions(+), 283 deletions(-)

diff --git a/box/center.cpp b/box/center.cpp
index d6ac804..b440a03 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -21,7 +21,7 @@
 #include "log.h"
 #include "shm.h"
 #include <chrono>
-#include <set>
+#include <unordered_map>
 
 using namespace std::chrono;
 using namespace std::chrono_literals;
@@ -33,11 +33,118 @@
 namespace
 {
 
+typedef std::string ProcId;
+typedef size_t ProcIndex; // max local procs.
+const int kMaxProcs = 65536;
+
+// record all procs ever registered, always grow, never remove.
+// mainly for node to request msg allocation.
+// use index instead of MQId to save some bits.
+class ProcRecords
+{
+public:
+	struct ProcRec {
+		ProcId proc_;  // proc id as registered; empty means "no such record".
+		MQId ssn_ = 0; // latest ssn passed to Put() for this proc.
+	};
+
+	ProcRecords() { procs_.reserve(kMaxProcs); }
+	// Insert proc_id or refresh its ssn; returns its index, or ProcIndex(-1) when a new proc does not fit.
+	ProcIndex Put(const ProcId &proc_id, const MQId ssn)
+	{
+		const auto pos = proc_index_.find(proc_id);
+		if (pos != proc_index_.end()) { // known proc: refresh ssn even when the table is full.
+			procs_[pos->second].ssn_ = ssn;
+			return pos->second;
+		}
+		if (procs_.size() >= kMaxProcs) { return ProcIndex(-1); }
+		const ProcIndex index = procs_.size();
+		proc_index_.emplace(proc_id, index);
+		procs_.emplace_back(ProcRec{proc_id, ssn});
+		return index;
+	}
+	// Returns the record at index, or a shared empty record when index is out of range.
+	const ProcRec &Get(const ProcIndex index) const
+	{
+		static ProcRec empty_rec;
+		return (index < procs_.size()) ? procs_[index] : empty_rec;
+	}
+
+private:
+	std::unordered_map<ProcId, size_t> proc_index_; // proc id -> index into procs_.
+	std::vector<ProcRec> procs_;
+};
+
+class MsgRecords
+{
+	typedef int64_t MsgId;
+	typedef int64_t Offset;
+
+public:
+	// Remember msg by id; an id that is already recorded keeps its first offset.
+	void RecordMsg(const MsgI &msg) { msgs_.emplace(msg.id(), msg.Offset()); }
+	// Free the shm block recorded under id and forget it; unknown ids are only logged.
+	void FreeMsg(MsgId id)
+	{
+		auto pos = msgs_.find(id);
+		if (pos != msgs_.end()) {
+			ShmMsg(pos->second).Free();
+			msgs_.erase(pos);
+		} else {
+			LOG_TRACE() << "ignore late free request.";
+		}
+	}
+	// Sweep at most once per second: free records with ref count 0 or a timestamp more than 10s old.
+	void AutoRemove()
+	{
+		auto now = NowSec();
+		if (now < time_to_clean_) {
+			return;
+		}
+		LOG_FUNCTION;
+		time_to_clean_ = now + 1;
+		// explicit int64_t: max(10000ul, size_t) fails to deduce where size_t is not unsigned long.
+		int64_t limit = std::max<int64_t>(10000, msgs_.size() / 10);
+		int64_t n = 0;
+		auto it = msgs_.begin();
+		while (it != msgs_.end() && --limit > 0) {
+			ShmMsg msg(it->second);
+			if (msg.Count() == 0 || msg.timestamp() + 10 < NowSec()) { // unreferenced, or timed out (someone crashed?).
+				msg.Free();
+				it = msgs_.erase(it);
+				++n;
+			} else {
+				++it;
+			}
+		}
+		if (n > 0) {
+			LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n;
+		}
+	}
+	size_t size() const { return msgs_.size(); }
+	// Log the number of records, each record (trace level) and the sum of ref counts.
+	void DebugPrint() const
+	{
+		LOG_DEBUG() << "msgs : " << size();
+		int i = 0;
+		int total_count = 0;
+		for (auto &kv : msgs_) {
+			MsgI msg(kv.second);
+			total_count += msg.Count();
+			LOG_TRACE() << "  " << i++ << ": msg id: " << kv.first << ", offset: " << kv.second << ", count: " << msg.Count() << ", size: " << msg.Size();
+		}
+		LOG_DEBUG() << "total count: " << total_count;
+	}
+
+private:
+	std::unordered_map<MsgId, Offset> msgs_; // msg id -> shm offset of the msg.
+	int64_t time_to_clean_ = 0;              // earliest NowSec() at which AutoRemove sweeps again.
+};
+
 //TODO check proc_id
 class NodeCenter
 {
 public:
-	typedef std::string ProcId;
 	typedef MQId Address;
 	typedef bhome_msg::ProcInfo ProcInfo;
 	typedef std::function<void(Address const)> Cleaner;
@@ -102,13 +209,14 @@
 
 	// center name, no relative to shm.
 	const std::string &id() const { return id_; }
-	void OnNodeInit(SharedMemory &shm, const int64_t msg)
+	void OnNodeInit(ShmSocket &socket, const int64_t val)
 	{
-		MQId ssn = msg;
+		LOG_FUNCTION;
+		SharedMemory &shm = socket.shm();
+		MQId ssn = (val >> 4) & MaskBits(60);
 		if (nodes_.find(ssn) != nodes_.end()) {
 			return; // ignore in exists.
 		}
-
 		auto UpdateRegInfo = [&](Node &node) {
 			for (int i = 0; i < 10; ++i) {
 				node->addrs_.insert(ssn + i);
@@ -118,12 +226,10 @@
 
 			// create sockets.
 			try {
-				auto CreateSocket = [](SharedMemory &shm, const MQId id) {
-					ShmSocket tmp(shm, true, id, 16);
-				};
+				auto CreateSocket = [&](const MQId id) { ShmSocket tmp(shm, true, id, 16); };
 				// alloc(-1), node, server, sub, request,
-				for (int i = -1; i < 4; ++i) {
-					CreateSocket(shm, ssn + i);
+				for (int i = 0; i < 4; ++i) {
+					CreateSocket(ssn + i);
 					node->addrs_.insert(ssn + i);
 				}
 				return true;
@@ -132,11 +238,93 @@
 			}
 		};
 
+		auto PrepareProcInit = [&]() {
+			bool r = false;
+			ShmMsg init_msg;
+			if (init_msg.Make(GetAllocSize(CalcAllocIndex(900)))) {
+				// 31bit pointer, 4bit cmd+flag
+				int64_t reply = (init_msg.Offset() << 4) | EncodeCmd(eCmdNodeInitReply);
+				r = SendAllocReply(socket, ssn, reply, init_msg);
+			}
+			return r;
+		};
+
 		Node node(new NodeInfo);
-		if (UpdateRegInfo(node)) {
+		if (UpdateRegInfo(node) && PrepareProcInit()) {
 			nodes_[ssn] = node;
 			LOG_INFO() << "new node ssn (" << ssn << ") init";
+		} else {
+			for (int i = 0; i < 10; ++i) {
+				ShmSocket::Remove(shm, ssn + i);
+			}
 		}
+	}
+	void RecordMsg(const MsgI &msg) { msgs_.RecordMsg(msg); } // track msg in msgs_ so it can later be freed by id or by the periodic sweep.
+
+	bool SendAllocReply(ShmSocket &socket, const Address dest, const int64_t reply, const MsgI &msg) // record msg, then send the packed reply word to dest.
+	{
+		RecordMsg(msg); // record before sending, so the msg is tracked whatever Send returns.
+		auto onExpireFree = [this, msg](const SendQ::Data &) { msgs_.FreeMsg(msg.id()); }; // frees the recorded msg; presumably invoked when the queued send expires - confirm in ShmSocket::Send.
+		return socket.Send(dest, reply, onExpireFree); // only the int64 reply is sent here, not msg itself.
+	}
+	bool SendAllocMsg(ShmSocket &socket, const Address dest, const MsgI &msg) // record msg, then send msg itself to dest.
+	{
+		RecordMsg(msg); // record before sending, so the msg is tracked whatever Send returns.
+		auto onExpireFree = [this, msg](const SendQ::Data &) { msgs_.FreeMsg(msg.id()); }; // frees the recorded msg; presumably invoked when the queued send expires - confirm in ShmSocket::Send.
+		return socket.Send(dest, msg, onExpireFree);
+	}
+
+	void OnAlloc(ShmSocket &socket, const int64_t val)
+	{
+		// Handle eCmdAllocRequest0: allocate a msg for the requesting proc and reply with its offset.
+		// 8bit size, 4bit socket index, 16bit proc index, 28bit id, ,4bit cmd+flag
+		int64_t msg_id = (val >> 4) & MaskBits(28);
+		int proc_index = (val >> 32) & MaskBits(16);
+		int socket_index = ((val) >> 48) & MaskBits(4);
+		const auto &proc_rec = procs_.Get(proc_index); // bind, don't copy: Get returns a reference to a stored record.
+		if (proc_rec.proc_.empty()) {
+			return; // unknown proc index, nowhere to reply to.
+		}
+		Address dest = proc_rec.ssn_ + socket_index;
+
+		auto size = GetAllocSize((val >> 52) & MaskBits(8));
+		MsgI new_msg;
+		if (new_msg.Make(size)) {
+			// 31bit ptr (msg offset), 28bit id, 4bit cmd+flag
+			int64_t reply = (new_msg.Offset() << 32) | (msg_id << 4) | EncodeCmd(eCmdAllocReply0);
+			SendAllocReply(socket, dest, reply, new_msg);
+		} else {
+			int64_t reply = (msg_id << 4) | EncodeCmd(eCmdAllocReply0); // send empty, ack failure.
+			socket.Send(dest, reply);
+		}
+	}
+
+	void OnFree(ShmSocket &socket, const int64_t val) // handle eCmdFree: free the msg whose id is packed above the 4 cmd+flag bits.
+	{
+		int64_t msg_id = (val >> 4) & MaskBits(59); // eCmdFree carries up to 59 bits of msg id; a 31-bit mask would truncate large ids and miss the record.
+		msgs_.FreeMsg(msg_id);
+	}
+
+	bool OnCommand(ShmSocket &socket, const int64_t val) // dispatch a raw command word; returns false for commands not handled here.
+	{
+		assert(IsCmd(val));
+		int cmd = DecodeCmd(val);
+		switch (cmd) {
+		case eCmdNodeInit: OnNodeInit(socket, val); break;
+		case eCmdAllocRequest0: OnAlloc(socket, val); break;
+		case eCmdFree: OnFree(socket, val); break;
+		default: return false;
+		}
+		return true; // command was recognized; the handlers report no result of their own.
+	}
+
+	MsgProcInitReply ProcInit(const BHMsgHead &head, MsgProcInit &msg) // register the sender's proc id + ssn and reply with its proc index.
+	{
+		LOG_DEBUG() << "center got proc init.";
+		auto index = procs_.Put(head.proc_id(), head.ssn_id()); // proc id and ssn come from the header; msg is not read.
+		auto reply(MakeReply<MsgProcInitReply>(eSuccess));
+		reply.set_proc_index(index); // NOTE(review): Put() returns -1 when the table is full, yet the reply is still eSuccess - confirm intended.
+		return reply;
 	}
 
 	MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg)
@@ -160,14 +348,13 @@
 			};
 
 			auto pos = nodes_.find(ssn);
-			if (pos != nodes_.end()) { // update
-				Node &node = pos->second;
-				UpdateRegInfo(node);
-			} else {
-				Node node(new NodeInfo);
-				UpdateRegInfo(node);
-				nodes_[ssn] = node;
+			if (pos == nodes_.end()) {
+				return MakeReply(eInvalidInput, "invalid session.");
 			}
+
+			// update proc info
+			Node &node = pos->second;
+			UpdateRegInfo(node);
 			LOG_DEBUG() << "node (" << head.proc_id() << ") ssn (" << ssn << ")";
 
 			auto old = online_node_addr_map_.find(head.proc_id());
@@ -376,13 +563,14 @@
 	void OnTimer()
 	{
 		CheckNodes();
+		msgs_.AutoRemove();
 	}
 
 private:
 	void CheckNodes()
 	{
 		auto now = NowSec();
-		if (now - last_check_time_ < 1) { return; }
+		if (now <= last_check_time_) { return; }
 		last_check_time_ = now;
 
 		auto it = nodes_.begin();
@@ -396,6 +584,7 @@
 				++it;
 			}
 		}
+		msgs_.DebugPrint();
 	}
 	bool CanHeartbeat(const NodeInfo &node)
 	{
@@ -448,7 +637,10 @@
 	std::unordered_map<Topic, Clients> service_map_;
 	std::unordered_map<Topic, Clients> subscribe_map_;
 	std::unordered_map<Address, Node> nodes_;
-	std::unordered_map<std::string, Address> online_node_addr_map_;
+	std::unordered_map<ProcId, Address> online_node_addr_map_;
+	ProcRecords procs_; // To get a short index for msg alloc.
+	MsgRecords msgs_;   // record all msgs alloced.
+
 	Cleaner cleaner_; // remove mqs.
 	int64_t offline_time_;
 	int64_t kill_time_;
@@ -483,25 +675,28 @@
 		    msg, head, [&](auto &body) { return center->MsgTag(head, body); }, replyer); \
 		return true;
 
-auto MakeReplyer(ShmSocket &socket, BHMsgHead &head, const std::string &proc_id)
+auto MakeReplyer(ShmSocket &socket, BHMsgHead &head, Synced<NodeCenter> &center)
 {
 	return [&](auto &&rep_body) {
-		auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.ssn_id(), head.msg_id()));
+		auto reply_head(InitMsgHead(GetType(rep_body), center->id(), head.ssn_id(), head.msg_id()));
 		auto remote = head.route(0).mq_id();
-		socket.Send(remote, reply_head, rep_body);
+		MsgI msg;
+		if (msg.Make(reply_head, rep_body)) {
+			DEFER1(msg.Release(););
+			center->SendAllocMsg(socket, remote, msg);
+		}
 	};
 }
 
 bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr)
 {
-	auto OnNodeInit = [center_ptr](ShmSocket &socket, MsgI &msg) {
+	// command
+	auto OnCommand = [center_ptr](ShmSocket &socket, ShmMsgQueue::RawData &cmd) -> bool {
 		auto &center = *center_ptr;
-		center->OnNodeInit(socket.shm(), msg.Offset());
+		return IsCmd(cmd) && center->OnCommand(socket, cmd);
 	};
-	auto Nothing = [](ShmSocket &socket) {};
 
-	BHCenter::Install("#centetr.Init", OnNodeInit, Nothing, BHInitAddress(), 16);
-
+	// now we can talk.
 	auto OnCenterIdle = [center_ptr](ShmSocket &socket) {
 		auto &center = *center_ptr;
 		center->OnTimer();
@@ -509,8 +704,9 @@
 
 	auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
 		auto &center = *center_ptr;
-		auto replyer = MakeReplyer(socket, head, center->id());
+		auto replyer = MakeReplyer(socket, head, center);
 		switch (head.type()) {
+			CASE_ON_MSG_TYPE(ProcInit);
 			CASE_ON_MSG_TYPE(Register);
 			CASE_ON_MSG_TYPE(Heartbeat);
 			CASE_ON_MSG_TYPE(Unregister);
@@ -520,12 +716,13 @@
 		default: return false;
 		}
 	};
-	BHCenter::Install("#center.main", OnCenter, OnCenterIdle, BHTopicCenterAddress(), 1000);
+	BHCenter::Install("#center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(), 1000);
 
 	auto OnBusIdle = [=](ShmSocket &socket) {};
+	auto OnBusCmd = [=](ShmSocket &socket, ShmMsgQueue::RawData &val) { return false; };
 	auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
 		auto &center = *center_ptr;
-		auto replyer = MakeReplyer(socket, head, center->id());
+		auto replyer = MakeReplyer(socket, head, center);
 		auto OnPublish = [&]() {
 			MsgPublish pub;
 			NodeCenter::Clients clients;
@@ -561,7 +758,7 @@
 		}
 	};
 
-	BHCenter::Install("#center.bus", OnPubSub, OnBusIdle, BHTopicBusAddress(), 1000);
+	BHCenter::Install("#center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(), 1000);
 
 	return true;
 }
@@ -576,14 +773,9 @@
 	return rec;
 }
 
-bool BHCenter::Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId mqid, const int mq_len)
+bool BHCenter::Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQId mqid, const int mq_len)
 {
-	Centers()[name] = CenterInfo{name, handler, MsgIHandler(), idle, mqid, mq_len};
-	return true;
-}
-bool BHCenter::Install(const std::string &name, MsgIHandler handler, IdleHandler idle, const MQId mqid, const int mq_len)
-{
-	Centers()[name] = CenterInfo{name, MsgHandler(), handler, idle, mqid, mq_len};
+	Centers()[name] = CenterInfo{name, handler, raw_handler, idle, mqid, mq_len};
 	return true;
 }
 
@@ -609,11 +801,7 @@
 {
 	for (auto &kv : Centers()) {
 		auto &info = kv.second;
-		if (info.handler_) {
-			sockets_[info.name_]->Start(info.handler_, info.idle_);
-		} else {
-			sockets_[info.name_]->Start(info.raw_handler_, info.idle_);
-		}
+		sockets_[info.name_]->Start(1, info.handler_, info.raw_handler_, info.idle_);
 	}
 
 	return true;
diff --git a/box/center.h b/box/center.h
index 4d71bc9..d68573b 100644
--- a/box/center.h
+++ b/box/center.h
@@ -29,10 +29,9 @@
 
 public:
 	typedef Socket::PartialRecvCB MsgHandler;
-	typedef Socket::RawRecvCB MsgIHandler;
+	typedef Socket::RawRecvCB RawHandler;
 	typedef Socket::IdleCB IdleHandler;
-	static bool Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId mqid, const int mq_len);
-	static bool Install(const std::string &name, MsgIHandler handler, IdleHandler idle, const MQId mqid, const int mq_len);
+	static bool Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQId mqid, const int mq_len);
 
 	BHCenter(Socket::Shm &shm);
 	~BHCenter() { Stop(); }
@@ -43,7 +42,7 @@
 	struct CenterInfo {
 		std::string name_;
 		MsgHandler handler_;
-		MsgIHandler raw_handler_;
+		RawHandler raw_handler_;
 		IdleHandler idle_;
 		MQId mqid_;
 		int mq_len_ = 0;
diff --git a/box/center_main.cc b/box/center_main.cc
index 232b943..6795e41 100644
--- a/box/center_main.cc
+++ b/box/center_main.cc
@@ -83,6 +83,12 @@
 	std::atomic<bool> run_;
 };
 
+bool CenterInit(bhome_shm::SharedMemory &shm) // called from center_main before GlobalInit; always returns true.
+{
+	ShmSocket create(shm, BHGlobalSenderAddress(), 16); // temporary socket on the global sender address; presumably creates the queue other code opens there - confirm.
+	return true;
+}
+
 } // namespace
 int center_main(int argc, const char *argv[])
 {
@@ -102,6 +108,7 @@
 	if (strcasecmp(lvl.c_str(), "fatal") == 0) { ns_log::ResetLogLevel(ns_log::LogLevel::fatal); }
 
 	auto &shm = BHomeShm();
+	CenterInit(shm);
 	GlobalInit(shm);
 
 	InstanceFlag inst(shm, kCenterRunningFlag);
diff --git a/proto/source/bhome_msg.proto b/proto/source/bhome_msg.proto
index 51e9b6e..dcb5c56 100644
--- a/proto/source/bhome_msg.proto
+++ b/proto/source/bhome_msg.proto
@@ -28,6 +28,8 @@
 
 	kMsgTypeCommonReply = 2;
 
+	kMsgTypeProcInit = 8;
+	kMsgTypeProcInitReply = 9;
 	kMsgTypeRegister= 10;
 	// kMsgTypeRegisterReply= 11;
 	kMsgTypeHeartbeat = 12;
@@ -60,6 +62,13 @@
 	MsgTopicList topics = 1;
 }
 
+message MsgProcInit{ } // proc_id is in header.
+
+message MsgProcInitReply {
+	ErrorMsg errmsg = 1;
+	int32 proc_index = 2; // index the center assigned to this proc (set from ProcRecords::Put in center.cpp).
+}
+
 service TopicRPC {
 	rpc Query (MsgQueryTopic) returns (MsgQueryTopicReply);
 	rpc Request (MsgRequestTopic) returns (MsgQueryTopicReply);
diff --git a/src/bh_util.h b/src/bh_util.h
index a1c0d84..223da2a 100644
--- a/src/bh_util.h
+++ b/src/bh_util.h
@@ -92,6 +92,8 @@
 inline void PutInt(void *p, uint32_t u) { Put32(p, u); }
 inline void PutInt(void *p, uint64_t u) { Put64(p, u); }
 
+constexpr uint64_t MaskBits(int nbits) { return nbits <= 0 ? 0 : (nbits >= 64 ? ~uint64_t(0) : (uint64_t(1) << nbits) - 1); } // mask with the low nbits set; nbits is clamped to [0, 64] so the shift is never undefined.
+
 class ExitCall
 {
 	typedef std::function<void(void)> func_t;
diff --git a/src/defs.cpp b/src/defs.cpp
index 6d688b2..450349e 100644
--- a/src/defs.cpp
+++ b/src/defs.cpp
@@ -33,8 +33,53 @@
 	return le;
 }
 
+constexpr int64_t AllocSizeIndex[] = {
+    16, 24, 32, 40, 48, 56, 64, 72,
+    80, 88, 96, 104, 120, 136, 152, 168,
+    184, 200, 224, 248, 272, 296, 328, 360,
+    392, 432, 472, 520, 568, 624, 680, 744,
+    816, 896, 984, 1080, 1184, 1296, 1416, 1544,
+    1688, 1848, 2016, 2200, 2400, 2624, 2864, 3128,
+    3416, 3728, 4072, 4448, 4856, 5304, 5792, 6320,
+    6896, 7528, 8216, 8968, 9784, 10680, 11656, 12720,
+    13880, 15144, 16520, 18024, 19664, 21456, 23408, 25536,
+    27864, 30400, 33168, 36184, 39480, 43072, 46992, 51264,
+    55928, 61016, 66568, 72624, 79232, 86440, 94304, 102880,
+    112232, 122440, 133576, 145720, 158968, 173424, 189192, 206392,
+    225160, 245632, 267968, 292328, 318904, 347896, 379528, 414032,
+    451672, 492736, 537536, 586408, 639720, 697880, 761328, 830544,
+    906048, 988416, 1078272, 1176296, 1283232, 1399896, 1527160, 1665992,
+    1817448, 1982672, 2162920, 2359552, 2574056, 2808064, 3063344, 3341832,
+    3645640, 3977064, 4338616, 4733040, 5163320, 5632712, 6144776, 6703392,
+    7312792, 7977592, 8702832, 9494000, 10357096, 11298656, 12325808, 13446336,
+    14668736, 16002264, 17457016, 19044024, 20775304, 22663968, 24724328, 26972000,
+    29424000, 32098912, 35017000, 38200368, 41673128, 45461600, 49594472, 54103064,
+    59021528, 64387128, 70240504, 76626008, 83592008, 91191288, 99481408, 108525176,
+    118391104, 129153936, 140895208, 153703864, 167676944, 182920304, 199549424, 217690280,
+    237480312, 259069432, 282621200, 308314040, 336342592, 366919192, 400275488, 436664168,
+    476360912, 519666456, 566908864, 618446040, 674668408, 736001904, 802911168, 875903096,
+    955530656, 1042397080, 1137160456, 1240538680, 1353314928, 1476343560, 1610556616, 1756970856,
+    1916695480, 2090940528, 2281026032, 2488392040, 2714609504, 2961392192, 3230609664, 3524301456,
+    3844692504, 4194210008, 4575501832, 4991456544, 5445225320, 5940245808, 6480268160, 7069383448,
+    7712054672, 8413150552, 9177982424, 10012344464, 10922557600, 11915517384, 12998746240, 14180450448,
+    15469582312, 16875907976, 18410081432, 20083725200, 21909518400, 23901292800, 26074137600, 28444513752,
+    31030378640, 33851322152, 36928715080, 40285871000, 43948222912, 47943515904, 52302017352, 57056746208,
+    62243723136, 67902243424, 74075174648, 80809281440, 88155579752, 96169723368, 104912425496, 114449918728,
+    124854456800, 136204861968, 148587122152, 162095042352, 176830955296, 192906496688, 210443450936, 229574673752};
+
+const int kAllocIndexLen = sizeof(AllocSizeIndex) / sizeof(AllocSizeIndex[0]);
+static_assert(kAllocIndexLen == 256, "Make sure alloc 8 bit is enough.");
+static_assert(AllocSizeIndex[255] > uint32_t(-1), "Make sure alloc size correct.");
 } // namespace
 
+int64_t CalcAllocIndex(int64_t size) // index of the smallest AllocSizeIndex entry >= size, or -1 when size exceeds the largest entry.
+{
+	auto pos = std::lower_bound(AllocSizeIndex, AllocSizeIndex + kAllocIndexLen, size); // relies on the table being sorted ascending.
+	return (pos == AllocSizeIndex + kAllocIndexLen) ? -1 : pos - AllocSizeIndex;
+}
+
+int64_t GetAllocSize(int index) { return (index >= 0 && index < kAllocIndexLen) ? AllocSizeIndex[index] : 0; } // alloc size for a table index; 0 for any out-of-range index, including the -1 CalcAllocIndex returns.
+
 std::string BHomeShmName()
 {
 	return "bhome_default_shm_v0";
diff --git a/src/defs.h b/src/defs.h
index a95c81f..f0a0d49 100644
--- a/src/defs.h
+++ b/src/defs.h
@@ -23,14 +23,15 @@
 
 typedef uint64_t MQId;
 
-const MQId kBHNodeInit = 10;
+const MQId kBHDefaultSender = 99;
 const MQId kBHTopicCenter = 100;
 const MQId kBHTopicBus = 101;
-const MQId kBHUniCenter = 102;
-inline const MQId BHInitAddress() { return kBHNodeInit; }
+inline const MQId BHGlobalSenderAddress() { return kBHDefaultSender; }
 inline const MQId BHTopicCenterAddress() { return kBHTopicCenter; }
 inline const MQId BHTopicBusAddress() { return kBHTopicBus; }
-inline const MQId BHUniCenterAddress() { return kBHUniCenter; }
+
+int64_t CalcAllocIndex(int64_t size);
+int64_t GetAllocSize(int index);
 
 const int kBHCenterPort = 24287;
 const char kTopicSep = '.';
diff --git a/src/msg.cpp b/src/msg.cpp
index f180d67..a4777d2 100644
--- a/src/msg.cpp
+++ b/src/msg.cpp
@@ -17,8 +17,37 @@
  */
 #include "msg.h"
 #include "bh_util.h"
+#include "socket.h"
 
 namespace bhome_msg
 {
 
+ShmSocket &ShmMsg::Sender() // socket used by Release() to send free commands to the center.
+{
+	static ShmSocket sender(shm(), false, BHGlobalSenderAddress(), 16); // one function-local static on the global sender address; 'false' presumably means open rather than create - confirm against ShmSocket.
+	return sender;
+}
+
+int ShmMsg::Release() // drop one reference; returns the remaining count (0 for an invalid msg).
+{
+	if (!valid()) {
+		return 0;
+	}
+	auto n = meta()->count_.Dec();
+	if (n == 0) {
+		int64_t free_cmd = (id() << 4) | EncodeCmd(eCmdFree); // msg id above the 4 cmd+flag bits.
+		Sender().Send(BHTopicCenterAddress(), free_cmd); // memory is not freed here; the center is asked to do it. Send's result is ignored.
+	} else if (n < 0) {
+		throw -123; // count went negative: more releases than references. Throws a bare int.
+	}
+	return n;
+}
+
+void ShmMsg::Free() // return the msg's shm block to the allocator, ignoring the ref count.
+{
+	assert(valid());
+	shm().Dealloc(meta()); // the allocation starts at the Meta header, which sits just before the payload.
+	offset_ = 0; // this handle no longer refers to any msg.
+	assert(!valid());
+}
 } // namespace bhome_msg
diff --git a/src/msg.h b/src/msg.h
index 1f5b0f1..9589389 100644
--- a/src/msg.h
+++ b/src/msg.h
@@ -26,6 +26,7 @@
 #include <functional>
 #include <stdint.h>
 
+class ShmSocket;
 namespace bhome_msg
 {
 using namespace bhome_shm;
@@ -35,8 +36,9 @@
 
 class ShmMsg : private StaticDataRef<SharedMemory, ShmMsg>
 {
-private:
 	static inline SharedMemory &shm() { return GetData(); }
+	static ShmSocket &Sender();
+
 	// store ref count, msgs shareing the same data should also hold a pointer of the same RefCount object.
 	class RefCount : private boost::noncopyable
 	{
@@ -49,6 +51,7 @@
 		int Dec() { return --num_; }
 		int Get() { return num_.load(); }
 	};
+
 	typedef int64_t OffsetType;
 	static OffsetType Addr(void *ptr) { return reinterpret_cast<OffsetType>(ptr); }
 	static void *Ptr(const OffsetType offset) { return reinterpret_cast<void *>(offset); }
@@ -60,14 +63,22 @@
 
 	static const uint32_t kMsgTag = 0xf1e2d3c4;
 	struct Meta {
+		static int64_t NewId()
+		{
+			static std::atomic<int64_t> id(0);
+			return ++id;
+		}
+
 		RefCount count_;
 		const uint32_t tag_ = kMsgTag;
 		const uint32_t size_ = 0;
+		const int64_t id_ = 0;
+		std::atomic<int64_t> timestamp_;
 		Meta(uint32_t size) :
-		    size_(size) {}
+		    size_(size), id_(NewId()), timestamp_(NowSec()) {}
 	};
 	OffsetType offset_;
-	void *Alloc(const size_t size)
+	static void *Alloc(const size_t size)
 	{
 		void *p = shm().Alloc(sizeof(Meta) + size);
 		if (p) {
@@ -76,45 +87,33 @@
 		}
 		return p;
 	}
-	void Free()
-	{
-		assert(valid());
-		shm().Dealloc(meta());
-		offset_ = 0;
-		assert(!valid());
-	}
+
+private:
 	Meta *meta() const { return get<Meta>() - 1; }
 
 	typedef std::function<void(void *p, int len)> ToArray;
-	void *Pack(const uint32_t head_len, const ToArray &headToArray,
-	           const uint32_t body_len, const ToArray &bodyToArray)
+
+	template <class Body>
+	void *Pack(const BHMsgHead &head, const uint32_t head_len, const Body &body, const uint32_t body_len)
 	{
-		void *addr = Alloc(sizeof(head_len) + head_len + sizeof(body_len) + body_len);
+		void *addr = get();
 		if (addr) {
 			auto p = static_cast<char *>(addr);
-			auto Pack1 = [&p](auto len, auto &writer) {
+			auto Pack1 = [&p](auto len, auto &&writer) {
 				Put32(p, len);
 				p += sizeof(len);
 				writer(p, len);
 				p += len;
 			};
-			Pack1(head_len, headToArray);
-			Pack1(body_len, bodyToArray);
+			Pack1(head_len, [&](void *p, int len) { head.SerializeToArray(p, len); });
+			Pack1(body_len, [&](void *p, int len) { body.SerializeToArray(p, len); });
 		}
 		return addr;
 	}
 
-	template <class Body>
-	void *Pack(const BHMsgHead &head, const Body &body)
-	{
-		return Pack(
-		    uint32_t(head.ByteSizeLong()), [&](void *p, int len) { head.SerializeToArray(p, len); },
-		    uint32_t(body.ByteSizeLong()), [&](void *p, int len) { body.SerializeToArray(p, len); });
-	}
-
 	void *Pack(const std::string &content)
 	{
-		void *addr = Alloc(content.size());
+		void *addr = get();
 		if (addr) {
 			memcpy(addr, content.data(), content.size());
 		}
@@ -133,36 +132,48 @@
 	    offset_(p ? (Addr(p) - BaseAddr()) : 0) {}
 
 	template <class T = void>
-	T *get() const { return static_cast<T *>(Ptr(offset_ + BaseAddr())); }
+	T *get() const { return offset_ != 0 ? static_cast<T *>(Ptr(offset_ + BaseAddr())) : nullptr; }
 
 public:
 	static bool BindShm(SharedMemory &shm) { return SetData(shm); }
 	ShmMsg() :
-	    ShmMsg(nullptr) {}
+	    offset_(0) {}
 	explicit ShmMsg(const OffsetType offset) :
 	    offset_(offset) {}
 	OffsetType Offset() const { return offset_; }
 	OffsetType &OffsetRef() { return offset_; }
 	void swap(ShmMsg &a) { std::swap(offset_, a.offset_); }
-	bool valid() const { return static_cast<bool>(offset_) && meta()->tag_ == kMsgTag; }
-
-	int AddRef() const { return valid() ? meta()->count_.Inc() : 1; }
-	int Release()
-	{
-		if (!valid()) {
-			return 0;
-		}
-		auto n = meta()->count_.Dec();
-		if (n == 0) {
-			Free();
-		}
-		return n;
-	}
+	bool valid() const { return offset_ != 0 && meta()->tag_ == kMsgTag; }
+	int64_t id() const { return valid() ? meta()->id_ : 0; }
+	int64_t timestamp() const { return valid() ? meta()->timestamp_.load() : 0; }
+	size_t Size() const { return valid() ? meta()->size_ : 0; }
 	int Count() const { return valid() ? meta()->count_.Get() : 1; }
+	int AddRef() const { return valid() ? meta()->count_.Inc() : 1; }
+	int Release();
+	void Free();
 
 	template <class Body>
-	inline bool Make(const BHMsgHead &head, const Body &body) { return Make(Pack(head, body)); }
-	inline bool Make(const std::string &content) { return Make(Pack(content)); }
+	inline bool Make(const BHMsgHead &head, const Body &body)
+	{
+		uint32_t head_len = head.ByteSizeLong();
+		uint32_t body_len = body.ByteSizeLong();
+		uint32_t size = sizeof(head_len) + head_len + sizeof(body_len) + body_len;
+		return Make(size) && Pack(head, head_len, body, body_len);
+	}
+	template <class Body>
+	inline bool Fill(const BHMsgHead &head, const Body &body)
+	{
+		uint32_t head_len = head.ByteSizeLong();
+		uint32_t body_len = body.ByteSizeLong();
+		uint32_t size = sizeof(head_len) + head_len + sizeof(body_len) + body_len;
+		return valid() && (meta()->size_ >= size) && Pack(head, head_len, body, body_len);
+	}
+
+	inline bool Make(const std::string &content) { return Make(content.size()) && Pack(content); }
+	inline bool Fill(const std::string &content) { return valid() && (meta()->size_ >= content.size()) && Pack(content); }
+
+	inline bool Make(const size_t size) { return Make(Alloc(size)); }
+
 	template <class Body>
 	static inline std::string Serialize(const BHMsgHead &head, const Body &body)
 	{
@@ -208,6 +219,18 @@
 
 typedef ShmMsg MsgI;
 
+constexpr inline int EncodeCmd(int cmd) { return ((cmd & MaskBits(3)) << 1) | 1; } // low 4 bits of a command word: 3-bit cmd in bits 1..3, bit 0 set as the command flag.
+constexpr inline int DecodeCmd(int64_t msg) { return (msg >> 1) & MaskBits(3); } // extract the 3-bit cmd from bits 1..3.
+constexpr inline bool IsCmd(int64_t msg) { return (msg & 1) != 0; } // bit 0 set means the word is a command.
+// int64_t pack format, high to low: cmd data, 3bit cmd, 1bit flag (the flag bit is 1 for a command).
+enum MsgCmd {
+	eCmdNodeInit = 0,      // upto 59bit ssn id
+	eCmdNodeInitReply = 1, // 31bit ptr (offset of the init msg the center allocated),
+	eCmdAllocRequest0 = 2, // 8bit size, 4bit socket index, 16bit proc index, 28bit id
+	eCmdAllocReply0 = 3,   // 31bit ptr, 28bit id,
+	eCmdFree = 4,          // upto 59bit msg id,
+};
+
 } // namespace bhome_msg
 
 #endif // end of include guard: MSG_5BILLZET
diff --git a/src/proto.h b/src/proto.h
index 94a438c..c05407b 100644
--- a/src/proto.h
+++ b/src/proto.h
@@ -48,6 +48,8 @@
 BHOME_SIMPLE_MAP_MSG(Publish);
 BHOME_SIMPLE_MAP_MSG(Subscribe);
 BHOME_SIMPLE_MAP_MSG(Unsubscribe);
+BHOME_SIMPLE_MAP_MSG(ProcInit);
+BHOME_SIMPLE_MAP_MSG(ProcInitReply);
 
 #undef BHOME_SIMPLE_MAP_MSG
 #undef BHOME_MAP_MSG_AND_TYPE
diff --git a/src/robust.cpp b/src/robust.cpp
index 26d41b9..4654652 100644
--- a/src/robust.cpp
+++ b/src/robust.cpp
@@ -35,24 +35,30 @@
 
 bool FMutex::try_lock()
 {
-	if (mtx_.try_lock()) {
-		if (flock(fd_, LOCK_EX | LOCK_NB) == 0) {
+	if (flock(fd_, LOCK_EX | LOCK_NB) == 0) {
+		++count_;
+		if (mtx_.try_lock()) {
 			return true;
 		} else {
-			mtx_.unlock();
+			if (--count_ == 0) {
+				flock(fd_, LOCK_UN);
+			}
 		}
 	}
 	return false;
 }
 void FMutex::lock()
 {
-	mtx_.lock();
 	flock(fd_, LOCK_EX);
+	++count_;
+	mtx_.lock();
 }
 void FMutex::unlock()
 {
-	flock(fd_, LOCK_UN);
 	mtx_.unlock();
+	if (--count_ == 0) {
+		flock(fd_, LOCK_UN);
+	}
 }
 
 } // namespace robust
\ No newline at end of file
diff --git a/src/robust.h b/src/robust.h
index 8657122..c70e2fe 100644
--- a/src/robust.h
+++ b/src/robust.h
@@ -19,6 +19,7 @@
 #ifndef ROBUST_Q31RCWYU
 #define ROBUST_Q31RCWYU
 
+#include "bh_util.h"
 #include "log.h"
 #include <atomic>
 #include <chrono>
@@ -37,8 +38,6 @@
 
 using namespace std::chrono;
 using namespace std::chrono_literals;
-constexpr uint64_t MaskBits(int nbits) { return (uint64_t(1) << nbits) - 1; }
-
 void QuickSleep();
 
 class CasMutex
@@ -99,7 +98,7 @@
 public:
 	typedef uint64_t id_t;
 	FMutex(id_t id) :
-	    id_(id), fd_(Open(id_))
+	    id_(id), fd_(Open(id_)), count_(0)
 	{
 		if (fd_ == -1) { throw "error create mutex!"; }
 	}
@@ -117,11 +116,10 @@
 	}
 	static int Open(id_t id) { return open(GetPath(id).c_str(), O_CREAT | O_RDONLY, 0666); }
 	static int Close(int fd) { return close(fd); }
-	void FLock();
-	void FUnlock();
 	id_t id_;
 	int fd_;
 	std::mutex mtx_;
+	std::atomic<int32_t> count_;
 };
 
 union semun {
@@ -310,5 +308,36 @@
 	AData buf[capacity];
 };
 
+template <class Int>
+class AtomicQueue<0, Int> // specialization for capacity 0: a single atomic slot instead of a ring buffer.
+{
+	typedef Int Data;
+	typedef std::atomic<Data> AData;
+	static_assert(sizeof(Data) == sizeof(AData));
+
+public:
+	AtomicQueue() { memset(this, 0, sizeof(*this)); } // zero the whole object, so buf starts as 0, which Empty() treats as empty.
+	bool push(const Data d, bool try_more = false) // store d if the slot is empty; try_more is unused here.
+	{
+		auto cur = buf.load();
+		return Empty(cur) && buf.compare_exchange_strong(cur, Enc(d)); // fails if the slot is occupied or changed since the load.
+	}
+	bool pop(Data &d, bool try_more = false) // take the stored value if there is one; try_more is unused here.
+	{
+		Data cur = buf.load();
+		bool r = !Empty(cur) && buf.compare_exchange_strong(cur, 0); // swap in 0 (empty) only if the slot still holds cur.
+		if (r) { d = Dec(cur); }
+		return r;
+	}
+	uint32_t head() const { return 0; } // always 0 in this specialization.
+	uint32_t tail() const { return 0; } // always 0 in this specialization.
+
+private:
+	static inline bool Empty(const Data d) { return (d & 1) == 0; } // lowest bit 1 means data ok.
+	static inline Data Enc(const Data d) { return (d << 1) | 1; }   // shift the value up one bit and set the flag bit; the top bit of d is lost.
+	static inline Data Dec(const Data d) { return d >> 1; }         // drop the flag bit to recover the value.
+	AData buf;
+};
+
 } // namespace robust
 #endif // end of include guard: ROBUST_Q31RCWYU
diff --git a/src/sendq.cpp b/src/sendq.cpp
index c0d5afd..36af264 100644
--- a/src/sendq.cpp
+++ b/src/sendq.cpp
@@ -40,20 +40,24 @@
 	}
 
 	auto SendData = [&](Data &d) {
+		auto TryLoop = [&](auto &&data) {
+			for (int i = 0; i < 1; ++i) {
+				if (mq.TrySend(remote, data)) {
+					return true;
+				}
+			}
+			return false;
+		};
 		bool r = false;
 		if (d.index() == 0) {
 			auto &msg = boost::variant2::get<0>(pos->data().data_);
-			r = mq.TrySend(remote, msg);
+			r = TryLoop(msg);
 			if (r) {
 				msg.Release();
 			}
 		} else {
-			auto &content = boost::variant2::get<1>(pos->data().data_);
-			MsgI msg;
-			if (msg.Make(content)) {
-				DEFER1(msg.Release(););
-				r = mq.TrySend(remote, msg);
-			}
+			auto command = boost::variant2::get<1>(pos->data().data_);
+			r = TryLoop(command);
 		}
 		return r;
 	};
@@ -110,4 +114,4 @@
 	Collect();
 
 	return !out_.empty();
-}
\ No newline at end of file
+}
diff --git a/src/sendq.h b/src/sendq.h
index 0e565d5..862a1cc 100644
--- a/src/sendq.h
+++ b/src/sendq.h
@@ -37,7 +37,8 @@
 	typedef MQId Remote;
 	typedef bhome_msg::MsgI MsgI;
 	typedef std::string Content;
-	typedef boost::variant2::variant<MsgI, Content> Data;
+	typedef int64_t Command;
+	typedef boost::variant2::variant<MsgI, Command> Data;
 	typedef std::function<void(const Data &)> OnMsgEvent;
 	struct MsgInfo {
 		Data data_;
@@ -47,23 +48,16 @@
 	typedef TimedMsg::TimePoint TimePoint;
 	typedef TimedMsg::Duration Duration;
 
-	// template <class... Rest>
-	// void Append(const MQId &id, Rest &&...rest)
-	// {
-	// 	Append(std::string((const char *) &id, sizeof(id)), std::forward<decltype(rest)>(rest)...);
-	// }
-
 	void Append(const Remote addr, const MsgI msg, OnMsgEvent onExpire = OnMsgEvent())
 	{
 		msg.AddRef();
 		AppendData(addr, Data(msg), DefaultExpire(), onExpire);
 	}
-	void Append(const Remote addr, Content &&content, OnMsgEvent onExpire = OnMsgEvent())
+	void Append(const Remote addr, const Command command, OnMsgEvent onExpire = OnMsgEvent())
 	{
-		AppendData(addr, Data(std::move(content)), DefaultExpire(), onExpire);
+		AppendData(addr, Data(command), DefaultExpire(), onExpire);
 	}
 	bool TrySend(ShmMsgQueue &mq);
-	// bool empty() const { return store_.empty(); }
 
 private:
 	static TimePoint Now() { return TimedMsg::Clock::now(); }
diff --git a/src/shm_msg_queue.cpp b/src/shm_msg_queue.cpp
index b78c1a0..d96c511 100644
--- a/src/shm_msg_queue.cpp
+++ b/src/shm_msg_queue.cpp
@@ -56,6 +56,7 @@
 
 ShmMsgQueue::~ShmMsgQueue() {}
 
+#ifndef BH_USE_ATOMIC_Q
 ShmMsgQueue::Mutex &ShmMsgQueue::GetMutex(const MQId id)
 {
 	static std::unordered_map<MQId, std::shared_ptr<Mutex>> imm;
@@ -69,13 +70,19 @@
 	}
 	return *pos->second;
 }
+#endif
+
 bool ShmMsgQueue::Remove(SharedMemory &shm, const MQId id)
 {
 	Queue *q = Find(shm, id);
 	if (q) {
-		MsgI msg;
-		while (q->TryRead(msg.OffsetRef())) {
-			msg.Release();
+		RawData val = 0;
+		while (q->TryRead(val)) {
+			if (IsCmd(val)) {
+				LOG_DEBUG() << "clsing queue " << id << ", has a cmd" << DecodeCmd(val);
+			} else {
+				MsgI(val).Release();
+			}
 		}
 	}
 	return Shmq::Remove(shm, MsgQIdToName(id));
@@ -86,19 +93,18 @@
 	return Shmq::Find(shm, MsgQIdToName(remote_id));
 }
 
-bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, MsgI msg)
+bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, int64_t val)
 {
-	bool r = false;
 	try {
 		ShmMsgQueue dest(remote_id, false, shm, 1);
-		msg.AddRef();
-		DEFER1(if (!r) { msg.Release(); });
-
+#ifndef BH_USE_ATOMIC_Q
 		Guard lock(GetMutex(remote_id));
-		r = dest.queue().TryWrite(msg.Offset());
+#endif
+		return dest.queue().TryWrite(val);
 	} catch (...) {
+		// SetLastError(eNotFound, "remote not found");
+		return false;
 	}
-	return r;
 }
 
 // Test shows that in the 2 cases:
diff --git a/src/shm_msg_queue.h b/src/shm_msg_queue.h
index 1970803..4b7aed8 100644
--- a/src/shm_msg_queue.h
+++ b/src/shm_msg_queue.h
@@ -24,19 +24,25 @@
 using namespace bhome_shm;
 using namespace bhome_msg;
 
+#define BH_USE_ATOMIC_Q
+
 class ShmMsgQueue : public StaticDataRef<std::atomic<uint64_t>, ShmMsgQueue>
 {
-	// typedef ShmObject<SharedQ63<4>> Shmq;
-	typedef ShmObject<SharedQueue<int64_t>> Shmq;
-	typedef Shmq::ShmType ShmType;
-	typedef Shmq::Data Queue;
-	typedef std::function<void()> OnSend;
-	typedef robust::FMutex Mutex;
-	// typedef robust::SemMutex Mutex;
-	// typedef robust::NullMutex Mutex;
-	typedef robust::Guard<Mutex> Guard;
-
 public:
+	typedef int64_t RawData;
+
+#ifdef BH_USE_ATOMIC_Q
+	typedef ShmObject<SharedQ63<0>> Shmq;
+#else
+	typedef ShmObject<SharedQueue<RawData>> Shmq;
+	// typedef robust::FMutex Mutex;
+	// typedef robust::SemMutex Mutex;
+	typedef robust::NullMutex Mutex;
+	typedef robust::Guard<Mutex> Guard;
+#endif
+
+	typedef Shmq::Data Queue;
+	typedef Shmq::ShmType ShmType;
 	typedef uint64_t MQId;
 
 	static MQId NewId();
@@ -45,26 +51,45 @@
 	ShmMsgQueue(const MQId id, const bool create_or_else_find, ShmType &segment, const int len);
 	ShmMsgQueue(ShmType &segment, const int len);
 	~ShmMsgQueue();
-	static bool Remove(SharedMemory &shm, const MQId id);
+	static bool Remove(ShmType &shm, const MQId id);
 	MQId Id() const { return id_; }
 	ShmType &shm() const { return queue_.shm(); }
 
-	bool Recv(MsgI &msg, const int timeout_ms)
+	bool Recv(RawData &val, const int timeout_ms)
 	{
+#ifndef BH_USE_ATOMIC_Q
 		Guard lock(GetMutex(Id()));
-		return queue().Read(msg.OffsetRef(), timeout_ms);
+#endif
+		return queue().Read(val, timeout_ms);
 	}
-	bool TryRecv(MsgI &msg)
+
+	bool TryRecv(RawData &val)
 	{
+#ifndef BH_USE_ATOMIC_Q
 		Guard lock(GetMutex(Id()));
-		return queue().TryRead(msg.OffsetRef());
+#endif
+		return queue().TryRead(val);
 	}
-	static Queue *Find(SharedMemory &shm, const MQId remote_id);
-	static bool TrySend(SharedMemory &shm, const MQId remote_id, MsgI msg);
+
+	bool Recv(MsgI &msg, const int timeout_ms) { return Recv(msg.OffsetRef(), timeout_ms); } // MsgI convenience: receive the raw value directly into msg's offset.
+	bool TryRecv(MsgI &msg) { return TryRecv(msg.OffsetRef()); } // MsgI convenience for the non-waiting TryRecv(RawData &).
+	static Queue *Find(ShmType &shm, const MQId remote_id);
+	static bool TrySend(ShmType &shm, const MQId remote_id, const RawData val);
+	static bool TrySend(ShmType &shm, const MQId remote_id, MsgI msg)
+	{ // Send msg as its raw offset; one extra reference is taken first and given back if the send fails.
+		bool r = false;
+		msg.AddRef(); // TODO check if we could avoid addref here.
+		DEFER1(if (!r) { msg.Release(); }); // presumably runs at scope exit: undo the AddRef when nothing was queued.
+		r = TrySend(shm, remote_id, msg.Offset());
+		return r;
+	}
 	bool TrySend(const MQId remote_id, const MsgI &msg) { return TrySend(shm(), remote_id, msg); }
+	bool TrySend(const MQId remote_id, const RawData val) { return TrySend(shm(), remote_id, val); } // raw-value send using this queue's own shm segment.
 
 private:
+#ifndef BH_USE_ATOMIC_Q
 	static Mutex &GetMutex(const MQId id);
+#endif
 	MQId id_;
 	Queue &queue() { return *queue_.data(); }
 	Shmq queue_;
diff --git a/src/shm_queue.h b/src/shm_queue.h
index 5fd14e3..5c9e077 100644
--- a/src/shm_queue.h
+++ b/src/shm_queue.h
@@ -76,7 +76,7 @@
 
 private:
 	Circular<D> queue_;
-	bhome_shm::Mutex mutex_;
+	// bhome_shm::Mutex mutex_;
 };
 
 template <int Power = 4>
@@ -92,11 +92,12 @@
 		using namespace std::chrono;
 		auto end_time = steady_clock::now() + milliseconds(timeout_ms);
 		do {
-			if (TryRead(d)) {
-				return true;
-			} else {
-				robust::QuickSleep();
+			for (int i = 0; i < 100; ++i) {
+				if (TryRead(d)) {
+					return true;
+				}
 			}
+			robust::QuickSleep();
 		} while (steady_clock::now() < end_time);
 		return false;
 	}
diff --git a/src/socket.cpp b/src/socket.cpp
index 6231579..0704174 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -20,22 +20,25 @@
 #include "bh_util.h"
 #include "defs.h"
 #include "msg.h"
+#include <chrono>
+using namespace std::chrono;
+using namespace std::chrono_literals;
 
 using namespace bhome_msg;
 using namespace bhome_shm;
 
 ShmSocket::ShmSocket(Shm &shm, const MQId id, const int len) :
-    run_(false), mq_(id, shm, len)
+    run_(false), mq_(id, shm, len), alloc_id_(0)
 {
 	Start();
 }
 ShmSocket::ShmSocket(Shm &shm, const bool create_or_else_find, const MQId id, const int len) :
-    run_(false), mq_(id, create_or_else_find, shm, len)
+    run_(false), mq_(id, create_or_else_find, shm, len), alloc_id_(0)
 {
 	Start();
 }
 ShmSocket::ShmSocket(bhome_shm::SharedMemory &shm, const int len) :
-    run_(false), mq_(shm, len)
+    run_(false), mq_(shm, len), alloc_id_(0)
 {
 	Start();
 }
@@ -45,50 +48,15 @@
 	Stop();
 }
 
-bool ShmSocket::Start(const RawRecvCB &onData, const IdleCB &onIdle, int nworker)
+bool ShmSocket::Start(int nworker, const RecvCB &onData, const RawRecvCB &onRaw, const IdleCB &onIdle)
 {
-	auto ioProc = [this, onData, onIdle]() {
+	auto ioProc = [this, onData, onRaw, onIdle]() {
 		auto DoSend = [this]() { return send_buffer_.TrySend(mq()); };
 		auto DoRecv = [=] {
 			// do not recv if no cb is set.
-			if (!onData) {
-				return false;
-			}
-			auto onMsg = [&](MsgI &imsg) {
-				DEFER1(imsg.Release());
-				onData(*this, imsg);
-			};
-			MsgI imsg;
-			return mq().TryRecv(imsg) ? (onMsg(imsg), true) : false;
-		};
+			if (!onData && per_msg_cbs_->empty() && !onRaw && alloc_cbs_->empty()) { return false; }
 
-		try {
-			bool more_to_send = DoSend();
-			bool more_to_recv = DoRecv();
-			if (onIdle) { onIdle(*this); }
-			if (!more_to_send && !more_to_recv) {
-				robust::QuickSleep();
-			}
-		} catch (...) {
-		}
-	};
-
-	std::lock_guard<std::mutex> lock(mutex_);
-	StopNoLock();
-
-	run_.store(true);
-	for (int i = 0; i < nworker; ++i) {
-		workers_.emplace_back([this, ioProc]() { while (run_) { ioProc(); } });
-	}
-	return true;
-}
-
-bool ShmSocket::Start(int nworker, const RecvCB &onData, const IdleCB &onIdle)
-{
-	auto ioProc = [this, onData, onIdle]() {
-		auto DoSend = [this]() { return send_buffer_.TrySend(mq()); };
-		auto DoRecv = [=] {
-			auto onRecvWithPerMsgCB = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
+			auto onMsgCB = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
 				RecvCB cb;
 				if (per_msg_cbs_->Pick(head.msg_id(), cb)) {
 					cb(socket, imsg, head);
@@ -96,20 +64,43 @@
 					onData(socket, imsg, head);
 				}
 			};
-
-			// do not recv if no cb is set.
-			if (!onData && per_msg_cbs_->empty()) {
-				return false;
-			}
-			auto onMsg = [&](MsgI &imsg) {
-				DEFER1(imsg.Release());
-				BHMsgHead head;
-				if (imsg.ParseHead(head)) {
-					onRecvWithPerMsgCB(*this, imsg, head);
+			auto onCmdCB = [this, onRaw](ShmSocket &socket, int64_t val) {
+				int cmd = DecodeCmd(val);
+				if (cmd == eCmdAllocReply0) {
+					int id = (val >> 4) & MaskBits(28);
+					RawRecvCB cb;
+					if (alloc_cbs_->Pick(id, cb)) {
+						cb(socket, val);
+						return;
+					}
+				}
+				if (onRaw) {
+					onRaw(socket, val);
 				}
 			};
-			MsgI imsg;
-			return mq().TryRecv(imsg) ? (onMsg(imsg), true) : false;
+
+			auto onRecv = [&](auto &val) {
+				if (IsCmd(val)) {
+					onCmdCB(*this, val);
+				} else {
+					MsgI imsg(val);
+					DEFER1(imsg.Release());
+					BHMsgHead head;
+					if (imsg.ParseHead(head)) {
+						onMsgCB(*this, imsg, head);
+					}
+				}
+			};
+			ShmMsgQueue::RawData val = 0;
+			auto TryRecvMore = [&]() {
+				for (int i = 0; i < 100; ++i) {
+					if (mq().TryRecv(val)) {
+						return true;
+					}
+				}
+				return false;
+			};
+			return TryRecvMore() ? (onRecv(val), true) : false;
 		};
 
 		try {
@@ -126,9 +117,18 @@
 	std::lock_guard<std::mutex> lock(mutex_);
 	StopNoLock();
 
+	auto worker_proc = [this, ioProc]() {
+		while (run_) { ioProc(); }
+		// try send pending msgs.
+		auto end_time = steady_clock::now() + 3s;
+		while (send_buffer_.TrySend(mq()) && steady_clock::now() < end_time) {
+			// LOG_DEBUG() << "try send pending msgs.";
+		}
+	};
+
 	run_.store(true);
 	for (int i = 0; i < nworker; ++i) {
-		workers_.emplace_back([this, ioProc]() { while (run_) { ioProc(); } });
+		workers_.emplace_back(worker_proc);
 	}
 	return true;
 }
@@ -153,6 +153,10 @@
 	return false;
 }
 
+bool ShmSocket::SyncRecv(int64_t &cmd, const int timeout_ms)
+{ // timeout_ms == 0 polls the queue once; any other value goes through the timed Recv.
+	if (timeout_ms == 0) { return mq().TryRecv(cmd); } else { return mq().Recv(cmd, timeout_ms); }
+}
 //maybe reimplment, using async cbs?
 bool ShmSocket::SyncRecv(bhome_msg::MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms)
 {
@@ -167,3 +171,30 @@
 	}
 	return false;
 }
+
+bool ShmSocket::RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult)
+{ // cmd bit layout: size index from bit 52, socket index 48-51, proc index 32-47, request id 4-31, cmd+flag 0-3.
+	// Returns false until SetNodeProc has been called; onResult runs when a reply carrying this request id is picked from alloc_cbs_.
+	if (node_proc_index_ == -1 || socket_index_ == -1) {
+		return false;
+	}
+	int id = (++alloc_id_) & MaskBits(28);
+	int64_t cmd = (static_cast<int64_t>(CalcAllocIndex(size)) << 52) |
+	              (static_cast<int64_t>(socket_index_ & MaskBits(4)) << 48) |
+	              (static_cast<int64_t>(node_proc_index_ & MaskBits(16)) << 32) |
+	              (static_cast<int64_t>(id) << 4) | // widen first: as int, id << 4 can reach the sign bit and sign-extend over the upper fields.
+	              EncodeCmd(eCmdAllocRequest0);
+	auto rawCB = [onResult](ShmSocket &sock, int64_t &val) {
+		MsgI msg((val >> 32) & MaskBits(31)); // reply carries a 31-bit value from bit 32 up; presumably the allocated message's offset.
+		DEFER1(msg.Release());
+		onResult(msg);
+		return true;
+	};
+
+	alloc_cbs_->Store(id, std::move(rawCB));
+	auto onExpireRemoveCB = [this, id](SendQ::Data const &msg) { // removes the stored callback again for this id.
+		RawRecvCB cb_no_use;
+		alloc_cbs_->Pick(id, cb_no_use);
+	};
+	return Send(BHTopicCenterAddress(), cmd, onExpireRemoveCB);
+}
\ No newline at end of file
diff --git a/src/socket.h b/src/socket.h
index 981677f..d69b8d4 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -42,7 +42,7 @@
 public:
 	typedef ShmMsgQueue::MQId MQId;
 	typedef bhome_shm::SharedMemory Shm;
-	typedef std::function<void(ShmSocket &sock, MsgI &imsg)> RawRecvCB;
+	typedef std::function<void(ShmSocket &sock, Queue::RawData &val)> RawRecvCB;
 	typedef std::function<void(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> RecvCB;
 	typedef std::function<bool(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> PartialRecvCB;
 	typedef std::function<void(ShmSocket &sock)> IdleCB;
@@ -54,39 +54,74 @@
 	static bool Remove(SharedMemory &shm, const MQId id) { return Queue::Remove(shm, id); }
 	bool Remove() { return Remove(shm(), id()); }
 	MQId id() const { return mq().Id(); }
+	void SetNodeProc(const int proc_index, const int socket_index)
+	{ // record the proc index and socket index; RequestAlloc refuses to run while either is still -1.
+		node_proc_index_ = proc_index;
+		socket_index_ = socket_index;
+		LOG_DEBUG() << "Set Node Proc " << proc_index << ", " << socket_index;
+	}
 	// start recv.
-	bool Start(const RawRecvCB &onData, const IdleCB &onIdle, int nworker = 1);
-	bool Start(int nworker = 1, const RecvCB &onData = RecvCB(), const IdleCB &onIdle = IdleCB());
-	bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1) { return Start(nworker, onData, onIdle); }
+	bool Start(int nworker = 1, const RecvCB &onMsg = RecvCB(), const RawRecvCB &onRaw = RawRecvCB(), const IdleCB &onIdle = IdleCB());
+	bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1) { return Start(nworker, onData, RawRecvCB(), onIdle); }
 	bool Start(const RecvCB &onData, int nworker = 1) { return Start(nworker, onData); }
 	bool Stop();
 
 	template <class Body>
-	bool Send(const MQId remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB())
+	bool CenterSend(const MQId remote, BHMsgHead &head, Body &body)
 	{
 		try {
-			if (!cb) {
-				return SendImpl(remote, MsgI::Serialize(head, body));
-			} else {
-				std::string msg_id(head.msg_id());
-				per_msg_cbs_->Store(msg_id, std::move(cb));
-				auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) {
-					RecvCB cb_no_use;
-					per_msg_cbs_->Pick(msg_id, cb_no_use);
-				};
-				return SendImpl(remote, MsgI::Serialize(head, body), onExpireRemoveCB);
-			}
+			//TODO alloc outsiez and use send.
+			MsgI msg;
+			if (!msg.Make(head, body)) { return false; }
+			DEFER1(msg.Release());
+
+			return Send(remote, msg);
 		} catch (...) {
 			SetLastError(eError, "Send internal error.");
 			return false;
 		}
 	}
 
-	bool Send(const MQId remote, const MsgI &imsg)
-	{
-		return SendImpl(remote, imsg);
-	}
+	bool RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult);
 
+	template <class Body>
+	bool Send(const MQId remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB())
+	{ // Serialize now, request a message through RequestAlloc, and do the real send from OnResult.
+		std::string msg_id(head.msg_id());
+		std::string content(MsgI::Serialize(head, body));
+		size_t size = content.size(); // read before content is moved into the lambda below.
+		auto OnResult = [content = std::move(content), msg_id, remote, cb = std::move(cb), this](MsgI &msg) mutable {
+			if (!msg.Fill(content)) { return; } // on Fill failure nothing is sent and cb is never stored.
+
+			try {
+				if (!cb) {
+					Send(remote, msg);
+				} else {
+					per_msg_cbs_->Store(msg_id, std::move(cb)); // register the reply callback under msg_id before sending.
+					auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) { // picks the stored callback back out, discarding it.
+						RecvCB cb_no_use;
+						per_msg_cbs_->Pick(msg_id, cb_no_use);
+					};
+					Send(remote, msg, onExpireRemoveCB);
+				}
+			} catch (...) {
+				SetLastError(eError, "Send internal error.");
+			}
+		};
+
+		return RequestAlloc(size, OnResult); // result reflects RequestAlloc only; the Send calls inside OnResult ignore their return values.
+	}
+	template <class... T>
+	bool Send(const MQId remote, const MsgI &imsg, T &&...t)
+	{ // thin forwarder: passes the message and any extra arguments on to SendImpl.
+		return SendImpl(remote, imsg, std::forward<T>(t)...);
+	}
+	template <class... T>
+	bool Send(const MQId remote, const int64_t cmd, T &&...t)
+	{ // thin forwarder: passes the raw command and any extra arguments on to SendImpl.
+		return SendImpl(remote, cmd, std::forward<T>(t)...);
+	}
+	bool SyncRecv(int64_t &cmd, const int timeout_ms);
 	bool SyncRecv(MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms);
 
 	template <class Body>
@@ -153,15 +188,15 @@
 	std::atomic<bool> run_;
 
 	Queue mq_;
-	template <class Key>
+	template <class Key, class CB>
 	class CallbackRecords
 	{
-		std::unordered_map<Key, RecvCB> store_;
+		std::unordered_map<Key, CB> store_;
 
 	public:
 		bool empty() const { return store_.empty(); }
-		bool Store(const Key &id, RecvCB &&cb) { return store_.emplace(id, std::move(cb)).second; }
-		bool Pick(const Key &id, RecvCB &cb)
+		bool Store(const Key &id, CB &&cb) { return store_.emplace(id, std::move(cb)).second; }
+		bool Pick(const Key &id, CB &cb)
 		{
 			auto pos = store_.find(id);
 			if (pos != store_.end()) {
@@ -174,9 +209,14 @@
 		}
 	};
 
-	Synced<CallbackRecords<std::string>> per_msg_cbs_;
+	Synced<CallbackRecords<std::string, RecvCB>> per_msg_cbs_;
+	Synced<CallbackRecords<int, RawRecvCB>> alloc_cbs_;
 
 	SendQ send_buffer_;
+	// node request center alloc memory.
+	int node_proc_index_ = -1;
+	int socket_index_ = -1;
+	std::atomic<int> alloc_id_;
 };
 
 #endif // end of include guard: SOCKET_GWTJHBPO
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index d8d6a42..35228b4 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -42,7 +42,6 @@
 TopicNode::TopicNode(SharedMemory &shm) :
     shm_(shm), state_(eStateUnregistered)
 {
-	Init();
 }
 
 TopicNode::~TopicNode()
@@ -57,34 +56,79 @@
 
 	if (Valid()) {
 		return true;
+	} else if (info_.proc_id().empty()) {
+		return false;
 	}
 
 	if (ssn_id_ == 0) {
 		ssn_id_ = ShmMsgQueue::NewId();
 	}
 	LOG_DEBUG() << "Node Init, id " << ssn_id_;
-	MsgI msg;
-	msg.OffsetRef() = ssn_id_;
-	if (ShmMsgQueue::TrySend(shm(), BHInitAddress(), msg)) {
-
-		auto end_time = steady_clock::now() + 3s;
-		do {
-			try {
-				for (int i = eSockStart; i < eSockEnd; ++i) {
-					sockets_.emplace_back(new ShmSocket(shm_, false, ssn_id_ + i, kMqLen));
+	auto NodeInit = [&]() {
+		auto SendInitCmd = [&]() {
+			int64_t init_cmd = ssn_id_ << 4 | EncodeCmd(eCmdNodeInit);
+			auto end_time = steady_clock::now() + 3s;
+			bool r = false;
+			do {
+				r = ShmMsgQueue::TrySend(shm(), BHTopicCenterAddress(), init_cmd);
+			} while (!r && steady_clock::now() < end_time);
+			return r;
+		};
+		if (SendInitCmd()) {
+			LOG_DEBUG() << "node send init ok";
+			auto end_time = steady_clock::now() + 3s;
+			do {
+				try {
+					for (int i = eSockStart; i < eSockEnd; ++i) {
+						sockets_.emplace_back(new ShmSocket(shm_, false, ssn_id_ + i, kMqLen));
+					}
+					break;
+				} catch (...) {
+					sockets_.clear();
+					std::this_thread::sleep_for(100ms);
 				}
-				break;
-			} catch (...) {
-				sockets_.clear();
-				std::this_thread::sleep_for(100ms);
-			}
-		} while (steady_clock::now() < end_time);
-		if (!sockets_.empty()) {
-			// recv msgs to avoid memory leak.
-			auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; };
-			SockNode().Start(default_ignore_msg);
-			return true;
+			} while (steady_clock::now() < end_time);
 		}
+	};
+	if (sockets_.empty()) {
+		NodeInit();
+	}
+	if (!sockets_.empty()) {
+		LOG_DEBUG() << "node sockets ok";
+		auto onNodeCmd = [this](ShmSocket &socket, int64_t &val) {
+			LOG_DEBUG() << "node recv cmd: " << DecodeCmd(val);
+			switch (DecodeCmd(val)) {
+			case eCmdNodeInitReply: {
+				MsgI msg(val >> 4);
+				DEFER1(msg.Release());
+				MsgProcInit body;
+				auto head = InitMsgHead(GetType(body), info_.proc_id(), ssn_id_);
+				head.add_route()->set_mq_id(ssn_id_);
+				if (msg.Fill(head, body)) {
+					socket.Send(BHTopicCenterAddress(), msg);
+				}
+			} break;
+			default:
+				break;
+			}
+			return true;
+		};
+
+		// recv msgs to avoid memory leak.
+		auto onMsg = [this](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
+			LOG_DEBUG() << "node recv type: " << head.type();
+			if (head.type() == kMsgTypeProcInitReply) {
+				LOG_DEBUG() << "got proc init reply";
+				MsgProcInitReply reply;
+				if (imsg.ParseBody(reply)) {
+					SetProcIndex(reply.proc_index());
+				}
+			}
+			return true;
+		};
+		SockNode().Start(1, onMsg, onNodeCmd);
+		LOG_DEBUG() << "sockets ok.";
+		return true;
 	}
 	return false;
 }
@@ -100,7 +144,7 @@
 	} else if (nworker > 16) {
 		nworker = 16;
 	}
-	SockNode().Start();
+	// SockNode().Start();
 	ServerStart(server_cb, nworker);
 	SubscribeStartWorker(sub_cb, nworker);
 	ClientStartWorker(client_cb, nworker);
@@ -114,12 +158,15 @@
 
 bool TopicNode::Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
 {
+	{
+		std::lock_guard<std::mutex> lk(mutex_);
+		info_ = proc;
+	}
+
 	if (!Init()) {
 		SetLastError(eError, kErrMsgNotInit);
 		return false;
 	}
-
-	info_ = proc;
 
 	auto &sock = SockNode();
 	MsgRegister body;
diff --git a/src/topic_node.h b/src/topic_node.h
index 338a6e3..b018807 100644
--- a/src/topic_node.h
+++ b/src/topic_node.h
@@ -130,6 +130,14 @@
 	ShmSocket &SockClient() { return *sockets_[eSockClient]; }
 	ShmSocket &SockServer() { return *sockets_[eSockServer]; }
 
+	void SetProcIndex(int index)
+	{ // keep the proc index and hand it, together with each socket's own position, to every socket.
+		proc_index_ = index;
+		for (int sock_idx = eSockStart; sock_idx < eSockEnd; ++sock_idx) {
+			sockets_[sock_idx]->SetNodeProc(index, sock_idx);
+		}
+	}
+
 	enum State {
 		eStateUnregistered,
 		eStateOnline,
@@ -144,6 +152,7 @@
 	std::mutex mutex_;
 	MQId ssn_id_ = 0;
 	std::atomic<State> state_;
+	int proc_index_ = -1;
 
 	TopicQueryCache topic_query_cache_;
 };
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index bd59c7f..f48f307 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -293,7 +293,7 @@
 	// }
 
 	int same = 0;
-	int64_t last = 0;
+	uint64_t last = 0;
 	while (last < nreq * ncli && same < 2) {
 		Sleep(1s, false);
 		auto cur = Status().nreply_.load();
diff --git a/utest/robust_test.cpp b/utest/robust_test.cpp
index 0645918..e7b8894 100644
--- a/utest/robust_test.cpp
+++ b/utest/robust_test.cpp
@@ -39,19 +39,24 @@
 	std::atomic<uint64_t> nwrite(0);
 	std::atomic<uint64_t> writedone(0);
 
-#if 0
-	typedef AtomicQueue<4> Rcb;
+#if 1
+	const int kPower = 0;
+	typedef AtomicQueue<kPower> Rcb;
 
 	Rcb tmp;
-	BOOST_CHECK(tmp.like_empty());
+	// BOOST_CHECK(tmp.like_empty());
 	BOOST_CHECK(tmp.push(1));
-	BOOST_CHECK(tmp.tail() == 1);
+	if (kPower != 0) {
+		BOOST_CHECK(tmp.tail() == 1);
+	}
 	BOOST_CHECK(tmp.head() == 0);
 	int64_t d;
 	BOOST_CHECK(tmp.pop(d));
-	BOOST_CHECK(tmp.like_empty());
-	BOOST_CHECK(tmp.head() == 1);
-	BOOST_CHECK(tmp.tail() == 1);
+	if (kPower != 0) {
+		// BOOST_CHECK(tmp.like_empty());
+		BOOST_CHECK(tmp.head() == 1);
+		BOOST_CHECK(tmp.tail() == 1);
+	}
 
 	ShmObject<Rcb> rcb(shm, "test_rcb");
 	bool try_more = true;
@@ -166,18 +171,20 @@
 BOOST_AUTO_TEST_CASE(MutexTest)
 {
 	{
-		int fd = open("/tmp/test_fmutex", O_CREAT | O_RDONLY, 0666);
-		flock(fd, LOCK_EX);
-		printf("lock 1");
+		int sem_id = semget(100, 1, 0666 | IPC_CREAT);
+		auto P = [&]() {
+			sembuf op = {0, -1, SEM_UNDO};
+			semop(sem_id, &op, 1);
+		};
+		auto V = [&]() {
+			sembuf op = {0, 1, SEM_UNDO};
+			semop(sem_id, &op, 1);
+		};
+		for (int i = 0; i < 10; ++i) {
+			V();
+		}
 		Sleep(10s);
-		flock(fd, LOCK_EX);
-		printf("lock 2");
-		Sleep(10s);
-		flock(fd, LOCK_UN);
-		printf("un lock 2");
-		Sleep(10s);
-		flock(fd, LOCK_UN);
-		printf("un lock 1");
+
 		return;
 	}
 
@@ -204,7 +211,7 @@
 
 	std::mutex m;
 	typedef std::chrono::steady_clock Clock;
-	auto Now = []() { return Clock::now().time_since_epoch(); };
+
 	if (pi) {
 		auto old = *pi;
 		printf("int : %d, add1: %d\n", old, ++*pi);
diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp
index f8f54f5..334c081 100644
--- a/utest/speed_test.cpp
+++ b/utest/speed_test.cpp
@@ -92,9 +92,9 @@
 		}
 	};
 
-	int nwriters[] = {1, 10, 100};
+	int nwriters[] = {1, 10, 100, 1000};
 	int nreaders[] = {2};
-	const int64_t total_msg = 1000 * 100;
+	const int64_t total_msg = 1000 * 1000;
 
 	auto Test = [&](auto &www, auto &rrr, bool isfork) {
 		for (auto nreader : nreaders) {
@@ -127,12 +127,13 @@
 	// typedef ThreadManager Manager;
 	// const bool isfork = IsSameType<Manager, ProcessManager>::value;
 
-	{
+	if (0) {
 		ThreadManager tw, tr;
 		printf("---------------- Testing thread io:  -------------------------------------------------------\n");
 		Test(tw, tr, false);
 	}
-	{
+
+	if (1) {
 		ProcessManager pw, pr;
 		printf("================ Testing process io: =======================================================\n");
 		Test(pw, pr, true);

--
Gitblit v1.8.0