From 1fbfef2a51db4a3bac9d8a5b87af94a40a913b7a Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期日, 25 四月 2021 15:33:40 +0800
Subject: [PATCH] change mqid from uuid to uint64.

---
 src/shm.h                        |   16 ++
 src/socket.h                     |   34 ++--
 box/center.cpp                   |   32 ++---
 src/msg.h                        |   24 ---
 src/socket.cpp                   |    2 
 src/defs.h                       |   14 +-
 utest/utest.cpp                  |    6 
 box/center.h                     |    5 
 src/sendq.cpp                    |    8 
 src/shm_queue.cpp                |   26 ++-
 src/topic_node.cpp               |   44 +++---
 src/defs.cpp                     |   19 +-
 box/center_main.cc               |    6 
 utest/speed_test.cpp             |   12 -
 proto/source/bhome_msg_api.proto |    4 
 utest/api_test.cpp               |    4 
 box/status_main.cc               |    2 
 src/bh_util.h                    |   25 ++++
 src/shm_queue.h                  |   34 ++---
 src/sendq.h                      |   22 +-
 src/bh_api.cpp                   |    2 
 21 files changed, 177 insertions(+), 164 deletions(-)

diff --git a/box/center.cpp b/box/center.cpp
index badfbfe..d920ff7 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -37,9 +37,9 @@
 {
 public:
 	typedef std::string ProcId;
-	typedef std::string Address;
+	typedef MQId Address;
 	typedef bhome_msg::ProcInfo ProcInfo;
-	typedef std::function<void(Address const &)> Cleaner;
+	typedef std::function<void(Address const)> Cleaner;
 
 private:
 	enum {
@@ -84,7 +84,7 @@
 		WeakNode weak_node_;
 		bool operator<(const TopicDest &a) const { return mq_ < a.mq_; }
 	};
-	inline const std::string &SrcAddr(const BHMsgHead &head) { return head.route(0).mq_id(); }
+	inline MQId SrcAddr(const BHMsgHead &head) { return head.route(0).mq_id(); }
 	inline bool MatchAddr(std::set<Address> const &addrs, const Address &addr) { return addrs.find(addr) != addrs.end(); }
 
 	NodeCenter(const std::string &id, const Cleaner &cleaner, const int64_t offline_time, const int64_t kill_time) :
@@ -182,7 +182,7 @@
 	{
 		return HandleMsg(
 		    head, [&](Node node) -> MsgCommonReply {
-			    auto &src = SrcAddr(head);
+			    auto src = SrcAddr(head);
 			    auto &topics = msg.topics().topic_list();
 			    node->services_[src].insert(topics.begin(), topics.end());
 			    TopicDest dest = {src, node};
@@ -240,7 +240,7 @@
 	MsgCommonReply Subscribe(const BHMsgHead &head, const MsgSubscribe &msg)
 	{
 		return HandleMsg(head, [&](Node node) {
-			auto &src = SrcAddr(head);
+			auto src = SrcAddr(head);
 			auto &topics = msg.topics().topic_list();
 			node->subscriptions_[src].insert(topics.begin(), topics.end());
 			TopicDest dest = {src, node};
@@ -253,7 +253,7 @@
 	MsgCommonReply Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg)
 	{
 		return HandleMsg(head, [&](Node node) {
-			auto &src = SrcAddr(head);
+			auto src = SrcAddr(head);
 			auto pos = node->subscriptions_.find(src);
 
 			auto RemoveSubTopicDestRecord = [this](const Topic &topic, const TopicDest &dest) {
@@ -426,8 +426,8 @@
 	auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) {
 		return [&](auto &&rep_body) {
 			auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id()));
-			auto &remote = head.route(0).mq_id();
-			socket.Send(remote.data(), reply_head, rep_body);
+			auto remote = head.route(0).mq_id();
+			socket.Send(remote, reply_head, rep_body);
 		};
 	};
 
@@ -473,7 +473,7 @@
 					if (node) {
 						// should also make sure that mq is not killed before msg expires.
 						// it would be ok if (kill_time - offline_time) is longer than expire time.
-						socket.Send(cli.mq_.data(), msg);
+						socket.Send(cli.mq_, msg);
 						++it;
 					} else {
 						it = clients.erase(it);
@@ -505,28 +505,24 @@
 	return rec;
 }
 
-bool BHCenter::Install(const std::string &name, MsgHandler handler, IdleHandler idle, const std::string &mqid, const int mq_len)
+bool BHCenter::Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId mqid, const int mq_len)
 {
 	Centers()[name] = CenterInfo{name, handler, idle, mqid, mq_len};
 	return true;
 }
-bool BHCenter::Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId &mqid, const int mq_len)
-{
-	return Install(name, handler, idle, std::string((const char *) &mqid, sizeof(mqid)), mq_len);
-}
 
 BHCenter::BHCenter(Socket::Shm &shm)
 {
-	auto gc = [&](const std::string &id) {
-		auto r = ShmSocket::Remove(shm, *(MQId *) id.data());
-		printf("remove mq : %s\n", r ? "ok" : "failed");
+	auto gc = [&](const MQId id) {
+		auto r = ShmSocket::Remove(shm, id);
+		printf("remove mq %ld : %s\n", id, (r ? "ok" : "failed"));
 	};
 
 	AddCenter("#bhome_center", gc);
 
 	for (auto &kv : Centers()) {
 		auto &info = kv.second;
-		sockets_[info.name_] = std::make_shared<ShmSocket>(shm, *(MQId *) info.mqid_.data(), info.mq_len_);
+		sockets_[info.name_] = std::make_shared<ShmSocket>(shm, info.mqid_, info.mq_len_);
 	}
 }
 
diff --git a/box/center.h b/box/center.h
index 60639d5..ab8b15f 100644
--- a/box/center.h
+++ b/box/center.h
@@ -30,8 +30,7 @@
 public:
 	typedef Socket::PartialRecvCB MsgHandler;
 	typedef Socket::IdleCB IdleHandler;
-	static bool Install(const std::string &name, MsgHandler handler, IdleHandler idle, const std::string &mqid, const int mq_len);
-	static bool Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId &mqid, const int mq_len);
+	static bool Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId mqid, const int mq_len);
 
 	BHCenter(Socket::Shm &shm);
 	~BHCenter() { Stop(); }
@@ -43,7 +42,7 @@
 		std::string name_;
 		MsgHandler handler_;
 		IdleHandler idle_;
-		std::string mqid_;
+		MQId mqid_;
 		int mq_len_ = 0;
 	};
 	typedef std::map<std::string, CenterInfo> CenterRecords;
diff --git a/box/center_main.cc b/box/center_main.cc
index 7f4b26b..fdda2cd 100644
--- a/box/center_main.cc
+++ b/box/center_main.cc
@@ -44,8 +44,8 @@
 			return true;
 		}
 
-		auto mtx(shm_.find_or_construct<Mutex>((name_ + "_mutex_0").c_str())());
-		auto time_stamp(shm_.find_or_construct<int64_t>((name_ + "_timestamp_0").c_str())(0));
+		auto mtx(shm_.FindOrCreate<Mutex>(name_ + "_mutex_0"));
+		auto time_stamp(shm_.FindOrCreate<int64_t>(name_ + "_timestamp_0", 0));
 
 		if (mtx && time_stamp) {
 			Guard lock(*mtx);
@@ -86,7 +86,7 @@
 int center_main(int argc, const char *argv[])
 {
 	auto &shm = BHomeShm();
-	MsgI::BindShm(shm);
+	GlobalInit(shm);
 
 	AppArg args(argc, argv);
 	if (args.Has("remove")) {
diff --git a/box/status_main.cc b/box/status_main.cc
index a435c2f..e0fb932 100644
--- a/box/status_main.cc
+++ b/box/status_main.cc
@@ -44,7 +44,7 @@
 			return shm_name;
 		}
 	};
-	printf("monitoring shm : %s, size : %dM\n", DisplayName().c_str(), shm_size);
+	printf("monitoring shm : %s, size : %ldM\n", DisplayName().c_str(), shm_size);
 
 	SharedMemory shm(shm_name, 1024 * 1024 * shm_size);
 	std::atomic<bool> run(true);
diff --git a/proto/source/bhome_msg_api.proto b/proto/source/bhome_msg_api.proto
index 838c228..94bc82e 100644
--- a/proto/source/bhome_msg_api.proto
+++ b/proto/source/bhome_msg_api.proto
@@ -8,8 +8,8 @@
 package bhome_msg;
 
 message BHAddress {
-	bytes mq_id = 1; // mqid, uuid
-	bytes ip = 2;   //
+	uint64 mq_id = 1;
+	bytes ip = 2;   
 	int32 port = 3;
 }
 
diff --git a/src/bh_api.cpp b/src/bh_api.cpp
index c4ac9c9..7e7b2e9 100644
--- a/src/bh_api.cpp
+++ b/src/bh_api.cpp
@@ -10,7 +10,7 @@
 {
 TopicNode &ProcNode()
 {
-	static bool init_bind_msg_shm = MsgI::BindShm(BHomeShm());
+	static bool init = GlobalInit(BHomeShm());
 	static TopicNode node(BHomeShm());
 	return node;
 }
diff --git a/src/bh_util.h b/src/bh_util.h
index e3ab70b..c419a59 100644
--- a/src/bh_util.h
+++ b/src/bh_util.h
@@ -143,6 +143,31 @@
 	}
 };
 
+template <class T, class Tag>
+class StaticDataRef
+{
+	typedef T *Ptr;
+	static inline Ptr &ptr()
+	{
+		static Ptr sp(nullptr);
+		return sp;
+	}
+
+protected:
+	static inline T &GetData()
+	{
+		if (!ptr()) { throw std::string("Must set ShmMsg shm before use!"); }
+		return *ptr();
+	}
+
+public:
+	static bool SetData(T &t)
+	{
+		auto Bind = [&]() { ptr() = &t; return true; };
+		return ptr() ? false : Bind();
+	}
+};
+
 // macro helper
 #define JOIN_IMPL(a, b) a##b
 #define JOIN(a, b) JOIN_IMPL(a, b)
diff --git a/src/defs.cpp b/src/defs.cpp
index 0ff671b..0ca82bf 100644
--- a/src/defs.cpp
+++ b/src/defs.cpp
@@ -16,14 +16,11 @@
  * =====================================================================================
  */
 #include "defs.h"
-#include "shm.h"
+#include "msg.h"
+#include "shm_queue.h"
 
 namespace
 {
-
-const MQId kBHTopicBus = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff");
-const MQId kBHTopicCenter = boost::uuids::string_generator()("12345670-89ab-cdef-8349-1234567890ff");
-const MQId kBHUniCenter = boost::uuids::string_generator()("87654321-89ab-cdef-8349-1234567890ff");
 
 struct LastError {
 	int ec_ = 0;
@@ -38,16 +35,20 @@
 
 } // namespace
 
-const MQId &BHTopicBusAddress() { return kBHTopicBus; }
-const MQId &BHTopicCenterAddress() { return kBHTopicCenter; }
-const MQId &BHUniCenterAddress() { return kBHUniCenter; }
-
 bhome_shm::SharedMemory &BHomeShm()
 {
 	static bhome_shm::SharedMemory shm("bhome_default_shm_v0", 1024 * 1024 * 512);
 	return shm;
 }
 
+bool GlobalInit(bhome_shm::SharedMemory &shm)
+{
+	MsgI::BindShm(shm);
+	typedef std::atomic<MQId> IdSrc;
+	IdSrc *psrc = shm.FindOrCreate<IdSrc>("shmqIdSrc0", 100000);
+	return ShmMsgQueue::SetData(*psrc);
+}
+
 void SetLastError(const int ec, const std::string &msg)
 {
 	LastErrorStore().ec_ = ec;
diff --git a/src/defs.h b/src/defs.h
index 08181d8..1c9e663 100644
--- a/src/defs.h
+++ b/src/defs.h
@@ -19,15 +19,16 @@
 #ifndef DEFS_KP8LKGD0
 #define DEFS_KP8LKGD0
 
-#include <boost/uuid/uuid.hpp>
-#include <boost/uuid/uuid_generators.hpp>
 #include <string>
 
-typedef boost::uuids::uuid MQId;
+typedef uint64_t MQId;
 
-const MQId &BHTopicBusAddress();
-const MQId &BHTopicCenterAddress();
-const MQId &BHUniCenterAddress();
+const MQId kBHTopicCenter = 100;
+const MQId kBHTopicBus = 101;
+const MQId kBHUniCenter = 102;
+inline const MQId BHTopicCenterAddress() { return kBHTopicCenter; }
+inline const MQId BHTopicBusAddress() { return kBHTopicBus; }
+inline const MQId BHUniCenterAddress() { return kBHUniCenter; }
 
 const int kBHCenterPort = 24287;
 const char kTopicSep = '.';
@@ -37,6 +38,7 @@
 } // namespace bhome_shm
 
 bhome_shm::SharedMemory &BHomeShm();
+bool GlobalInit(bhome_shm::SharedMemory &shm);
 typedef std::string Topic;
 void SetLastError(const int ec, const std::string &msg);
 void GetLastError(int &ec, std::string &msg);
diff --git a/src/msg.h b/src/msg.h
index 6ce4902..e332a5d 100644
--- a/src/msg.h
+++ b/src/msg.h
@@ -23,7 +23,6 @@
 #include "shm.h"
 #include <atomic>
 #include <boost/interprocess/offset_ptr.hpp>
-#include <boost/uuid/uuid_generators.hpp>
 #include <functional>
 #include <stdint.h>
 
@@ -34,11 +33,10 @@
 // ShmMsg is safe to be stored in shared memory, so POD data or offset_ptr is required.
 // message content layout: (meta) / header_size + header + data_size + data
 
-typedef boost::uuids::uuid MQId;
-
-class ShmMsg
+class ShmMsg : private StaticDataRef<SharedMemory, ShmMsg>
 {
 private:
+	static inline SharedMemory &shm() { return GetData(); }
 	// store ref count, msgs shareing the same data should also hold a pointer of the same RefCount object.
 	class RefCount : private boost::noncopyable
 	{
@@ -58,16 +56,6 @@
 	{
 		static const Offset base = Addr(shm().get_address()); // cache value.
 		return base;
-	}
-	static inline SharedMemory &shm()
-	{
-		if (!pshm()) { throw std::string("Must set ShmMsg shm before use!"); }
-		return *pshm();
-	}
-	static inline SharedMemory *&pshm()
-	{
-		static SharedMemory *pshm = 0;
-		return pshm;
 	}
 
 	static const uint32_t kMsgTag = 0xf1e2d3c4;
@@ -145,13 +133,7 @@
 	T *get() const { return static_cast<T *>(Ptr(offset_ + BaseAddr())); }
 
 public:
-	static bool BindShm(SharedMemory &shm)
-	{
-		assert(!pshm());
-		pshm() = &shm;
-		return true;
-	}
-
+	static bool BindShm(SharedMemory &shm) { return SetData(shm); }
 	ShmMsg() :
 	    ShmMsg(nullptr) {}
 	explicit ShmMsg(const size_t size) :
diff --git a/src/sendq.cpp b/src/sendq.cpp
index 54de419..5b57d72 100644
--- a/src/sendq.cpp
+++ b/src/sendq.cpp
@@ -19,7 +19,7 @@
 #include "shm_queue.h"
 #include <chrono>
 
-int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, Array &arr)
+int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote remote, Array &arr)
 {
 	auto FirstNotExpired = [](Array &l) {
 		auto Less = [](const TimedMsg &msg, const TimePoint &tp) { return msg.expire() < tp; };
@@ -41,7 +41,7 @@
 		bool r = false;
 		if (d.index() == 0) {
 			auto &msg = boost::variant2::get<0>(pos->data().data_);
-			r = mq.TrySend(*(MQId *) remote.data(), msg);
+			r = mq.TrySend(remote, msg);
 			if (r) {
 				msg.Release();
 			}
@@ -50,7 +50,7 @@
 			MsgI msg;
 			if (msg.Make(content)) {
 				DEFER1(msg.Release(););
-				r = mq.TrySend(*(MQId *) remote.data(), msg);
+				r = mq.TrySend(remote, msg);
 			}
 		}
 		return r;
@@ -65,7 +65,7 @@
 	return nprocessed;
 }
 
-int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, ArrayList &al)
+int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote remote, ArrayList &al)
 {
 	int nsend = 0;
 	auto AllSent = [&](Array &arr) {
diff --git a/src/sendq.h b/src/sendq.h
index bba44af..0699df7 100644
--- a/src/sendq.h
+++ b/src/sendq.h
@@ -37,7 +37,7 @@
 class SendQ
 {
 public:
-	typedef std::string Remote;
+	typedef MQId Remote;
 	typedef bhome_msg::MsgI MsgI;
 	typedef std::string Content;
 	typedef boost::variant2::variant<MsgI, Content> Data;
@@ -50,18 +50,18 @@
 	typedef TimedMsg::TimePoint TimePoint;
 	typedef TimedMsg::Duration Duration;
 
-	template <class... Rest>
-	void Append(const MQId &id, Rest &&...rest)
-	{
-		Append(std::string((const char *) &id, sizeof(id)), std::forward<decltype(rest)>(rest)...);
-	}
+	// template <class... Rest>
+	// void Append(const MQId &id, Rest &&...rest)
+	// {
+	// 	Append(std::string((const char *) &id, sizeof(id)), std::forward<decltype(rest)>(rest)...);
+	// }
 
-	void Append(const Remote &addr, const MsgI &msg, OnMsgEvent onExpire = OnMsgEvent())
+	void Append(const Remote addr, const MsgI msg, OnMsgEvent onExpire = OnMsgEvent())
 	{
 		msg.AddRef();
 		AppendData(addr, Data(msg), DefaultExpire(), onExpire);
 	}
-	void Append(const Remote &addr, Content &&content, OnMsgEvent onExpire = OnMsgEvent())
+	void Append(const Remote addr, Content &&content, OnMsgEvent onExpire = OnMsgEvent())
 	{
 		AppendData(addr, Data(std::move(content)), DefaultExpire(), onExpire);
 	}
@@ -71,7 +71,7 @@
 private:
 	static TimePoint Now() { return TimedMsg::Clock::now(); }
 	static TimePoint DefaultExpire() { return Now() + std::chrono::seconds(60); }
-	void AppendData(const Remote &addr, Data &&data, const TimePoint &expire, OnMsgEvent onExpire)
+	void AppendData(const Remote addr, Data &&data, const TimePoint &expire, OnMsgEvent onExpire)
 	{
 		//TODO simple queue, organize later ?
 
@@ -88,8 +88,8 @@
 	typedef std::list<Array> ArrayList;
 	typedef std::unordered_map<Remote, ArrayList> Store;
 
-	int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, Array &arr);
-	int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, ArrayList &arr);
+	int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote remote, Array &arr);
+	int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote remote, ArrayList &arr);
 
 	std::mutex mutex_in_;
 	std::mutex mutex_out_;
diff --git a/src/shm.h b/src/shm.h
index 0e834c3..a70afcb 100644
--- a/src/shm.h
+++ b/src/shm.h
@@ -25,7 +25,6 @@
 #include <boost/interprocess/sync/interprocess_mutex.hpp>
 #include <boost/interprocess/sync/scoped_lock.hpp>
 #include <boost/noncopyable.hpp>
-#include <boost/uuid/uuid.hpp>
 #include <chrono>
 #include <thread>
 
@@ -103,7 +102,16 @@
 	~SharedMemory();
 	std::string name() const { return name_; }
 	bool Remove() { return Remove(name()); }
-
+	template <class T, class... Params>
+	T *FindOrCreate(const std::string &name, Params &&...params)
+	{
+		return find_or_construct<T>(name.c_str(), std::nothrow)(std::forward<decltype(params)>(params)...);
+	}
+	template <class T, class... Params>
+	T *Create(const std::string &name, Params &&...params)
+	{
+		return construct<T>(name.c_str(), std::nothrow)(std::forward<decltype(params)>(params)...);
+	}
 	void *Alloc(const size_t size) { return allocate(size, std::nothrow); }
 	void Dealloc(void *p)
 	{
@@ -113,7 +121,7 @@
 	void Dealloc(offset_ptr<T> ptr) { return Dealloc(ptr.get()); }
 
 	template <class T, class... Params>
-	T *New(Params const &...params) { return construct<T>(anonymous_instance, std::nothrow)(params...); }
+	T *New(Params &&...params) { return construct<T>(anonymous_instance, std::nothrow)(std::forward<decltype(params)>(params)...); }
 	template <class T>
 	void Delete(T *p)
 	{
@@ -157,7 +165,7 @@
 	ShmObject(ShmType &segment, const std::string &name, Params &&...t) :
 	    shm_(segment), name_(name)
 	{
-		pdata_ = shm_.find_or_construct<Data>(ObjName(name_).c_str(), std::nothrow)(t...);
+		pdata_ = shm_.Create<Data>(ObjName(name_), std::forward<decltype(t)>(t)...);
 		if (!IsOk()) {
 			throw("Error: Not enough memory, can not allocate \"" + name_ + "\"");
 		}
diff --git a/src/shm_queue.cpp b/src/shm_queue.cpp
index 215a8ac..1be8021 100644
--- a/src/shm_queue.cpp
+++ b/src/shm_queue.cpp
@@ -18,20 +18,21 @@
 
 #include "shm_queue.h"
 #include "bh_util.h"
-#include <boost/uuid/uuid_generators.hpp>
-#include <boost/uuid/uuid_io.hpp>
 
 namespace bhome_shm
 {
 using namespace bhome_msg;
 using namespace boost::interprocess;
-using namespace boost::uuids;
 
 namespace
 {
-std::string MsgQIdToName(const MQId &id) { return "shmq" + to_string(id); }
-// MQId EmptyId() { return nil_uuid(); }
-MQId NewId() { return random_generator()(); }
+std::string MsgQIdToName(const ShmMsgQueue::MQId id)
+{
+	char buf[40] = "mqOx";
+	int n = sprintf(buf + 4, "%lx", id);
+	return std::string(buf, n + 4);
+}
+
 const int AdjustMQLength(const int len)
 {
 	const int kMaxLength = 10000;
@@ -47,8 +48,13 @@
 
 } // namespace
 
+ShmMsgQueue::MQId ShmMsgQueue::NewId()
+{
+	static auto &id = GetData();
+	return ++id;
+}
 // ShmMsgQueue memory usage: (320 + 16*length) bytes, length >= 2
-ShmMsgQueue::ShmMsgQueue(const MQId &id, ShmType &segment, const int len) :
+ShmMsgQueue::ShmMsgQueue(const MQId id, ShmType &segment, const int len) :
     Super(segment, MsgQIdToName(id), AdjustMQLength(len), segment.get_segment_manager()),
     id_(id)
 {
@@ -59,7 +65,7 @@
 
 ShmMsgQueue::~ShmMsgQueue() {}
 
-bool ShmMsgQueue::Remove(SharedMemory &shm, const MQId &id)
+bool ShmMsgQueue::Remove(SharedMemory &shm, const MQId id)
 {
 	Queue *q = Find(shm, id);
 	if (q) {
@@ -71,12 +77,12 @@
 	return Super::Remove(shm, MsgQIdToName(id));
 }
 
-ShmMsgQueue::Queue *ShmMsgQueue::Find(SharedMemory &shm, const MQId &remote_id)
+ShmMsgQueue::Queue *ShmMsgQueue::Find(SharedMemory &shm, const MQId remote_id)
 {
 	return Super::Find(shm, MsgQIdToName(remote_id));
 }
 
-bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, OnSend const &onsend)
+bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, const MsgI &msg, OnSend const &onsend)
 {
 	Queue *remote = Find(shm, remote_id);
 	if (remote) {
diff --git a/src/shm_queue.h b/src/shm_queue.h
index 93d77df..70039b5 100644
--- a/src/shm_queue.h
+++ b/src/shm_queue.h
@@ -21,6 +21,7 @@
 
 #include "msg.h"
 #include "shm.h"
+#include <atomic>
 #include <boost/circular_buffer.hpp>
 #include <boost/date_time/posix_time/posix_time.hpp>
 
@@ -29,8 +30,6 @@
 
 template <class D>
 using Circular = boost::circular_buffer<D, Allocator<D>>;
-
-typedef boost::uuids::uuid MQId;
 
 template <class D>
 class SharedQueue : private Circular<D>
@@ -137,32 +136,32 @@
 
 using namespace bhome_msg;
 
-class ShmMsgQueue : private ShmObject<SharedQueue<MsgI>>
+class ShmMsgQueue : private ShmObject<SharedQueue<MsgI>>, public StaticDataRef<std::atomic<uint64_t>, ShmMsgQueue>
 {
 	typedef ShmObject<SharedQueue<MsgI>> Super;
 	typedef Super::Data Queue;
 	typedef std::function<void()> OnSend;
-	MQId id_;
 
-protected:
-	ShmMsgQueue(const std::string &raw_name, ShmType &segment, const int len); // internal use.
 public:
-	ShmMsgQueue(const MQId &id, ShmType &segment, const int len);
+	typedef uint64_t MQId;
+
+	static MQId NewId();
+
+	ShmMsgQueue(const MQId id, ShmType &segment, const int len);
 	ShmMsgQueue(ShmType &segment, const int len);
 	~ShmMsgQueue();
-	static bool Remove(SharedMemory &shm, const MQId &id);
-	const MQId &Id() const { return id_; }
+	static bool Remove(SharedMemory &shm, const MQId id);
+	MQId Id() const { return id_; }
 	using Super::shm;
 
 	bool Recv(MsgI &msg, const int timeout_ms) { return data()->Read(msg, timeout_ms); }
 	bool TryRecv(MsgI &msg) { return data()->TryRead(msg); }
 	template <class OnData>
 	int TryRecvAll(OnData const &onData) { return data()->TryReadAll(onData); }
-	static Queue *Find(SharedMemory &shm, const MQId &remote_id);
-	// static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend = OnSend());
-	static bool TrySend(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, OnSend const &onsend = OnSend());
+	static Queue *Find(SharedMemory &shm, const MQId remote_id);
+	static bool TrySend(SharedMemory &shm, const MQId remote_id, const MsgI &msg, OnSend const &onsend = OnSend());
 	template <class Iter>
-	static int TrySendAll(SharedMemory &shm, const MQId &remote_id, const Iter begin, const Iter end, OnSend const &onsend = OnSend())
+	static int TrySendAll(SharedMemory &shm, const MQId remote_id, const Iter begin, const Iter end, OnSend const &onsend = OnSend())
 	{
 		Queue *remote = Find(shm, remote_id);
 		if (remote) {
@@ -177,14 +176,13 @@
 		}
 	}
 
-	// template <class... Rest>
-	// bool Send(const MQId &remote_id, Rest const &...rest) { return Send(shm(), remote_id, rest...); }
 	template <class... Rest>
-	bool TrySend(const MQId &remote_id, Rest const &...rest) { return TrySend(shm(), remote_id, rest...); }
+	bool TrySend(const MQId remote_id, Rest const &...rest) { return TrySend(shm(), remote_id, rest...); }
 	template <class... Rest>
-	int TrySendAll(const MQId &remote_id, Rest const &...rest) { return TrySendAll(shm(), remote_id, rest...); }
+	int TrySendAll(const MQId remote_id, Rest const &...rest) { return TrySendAll(shm(), remote_id, rest...); }
 
-	size_t Pending() const { return data()->size(); }
+private:
+	MQId id_;
 };
 
 } // namespace bhome_shm
diff --git a/src/socket.cpp b/src/socket.cpp
index 313c212..e471633 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -24,7 +24,7 @@
 using namespace bhome_msg;
 using namespace bhome_shm;
 
-ShmSocket::ShmSocket(Shm &shm, const MQId &id, const int len) :
+ShmSocket::ShmSocket(Shm &shm, const MQId id, const int len) :
     run_(false), mq_(id, shm, len)
 {
 	Start();
diff --git a/src/socket.h b/src/socket.h
index 1ba10cb..cd6bfee 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -33,44 +33,37 @@
 #include <vector>
 
 using namespace bhome_msg;
-
 class ShmSocket : private boost::noncopyable
 {
-	template <class... T>
-	bool SendImpl(const void *valid_remote, T &&...rest)
-	{
-		send_buffer_.Append(*static_cast<const MQId *>(valid_remote), std::forward<decltype(rest)>(rest)...);
-		return true;
-	}
 
 protected:
 	typedef bhome_shm::ShmMsgQueue Queue;
 
 public:
+	typedef ShmMsgQueue::MQId MQId;
 	typedef bhome_shm::SharedMemory Shm;
 	typedef std::function<void(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> RecvCB;
 	typedef std::function<bool(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> PartialRecvCB;
 	typedef std::function<void(ShmSocket &sock)> IdleCB;
 
-	ShmSocket(Shm &shm, const MQId &id, const int len);
+	ShmSocket(Shm &shm, const MQId id, const int len);
 	ShmSocket(Shm &shm, const int len = 12);
 	~ShmSocket();
-	static bool Remove(SharedMemory &shm, const MQId &id) { return Queue::Remove(shm, id); }
+	static bool Remove(SharedMemory &shm, const MQId id) { return Queue::Remove(shm, id); }
 	bool Remove() { return Remove(shm(), id()); }
-	const MQId &id() const { return mq().Id(); }
+	MQId id() const { return mq().Id(); }
 	// start recv.
 	bool Start(int nworker = 1, const RecvCB &onData = RecvCB(), const IdleCB &onIdle = IdleCB());
 	bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1) { return Start(nworker, onData, onIdle); }
 	bool Start(const RecvCB &onData, int nworker = 1) { return Start(nworker, onData); }
 	bool Stop();
-	size_t Pending() const { return mq().Pending(); }
 
 	template <class Body>
-	bool Send(const void *valid_remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB())
+	bool Send(const MQId remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB())
 	{
 		try {
 			if (!cb) {
-				return SendImpl(valid_remote, MsgI::Serialize(head, body));
+				return SendImpl(remote, MsgI::Serialize(head, body));
 			} else {
 				std::string msg_id(head.msg_id());
 				per_msg_cbs_->Store(msg_id, std::move(cb));
@@ -78,7 +71,7 @@
 					RecvCB cb_no_use;
 					per_msg_cbs_->Pick(msg_id, cb_no_use);
 				};
-				return SendImpl(valid_remote, MsgI::Serialize(head, body), onExpireRemoveCB);
+				return SendImpl(remote, MsgI::Serialize(head, body), onExpireRemoveCB);
 			}
 		} catch (...) {
 			SetLastError(eError, "Send internal error.");
@@ -86,15 +79,15 @@
 		}
 	}
 
-	bool Send(const void *valid_remote, const MsgI &imsg)
+	bool Send(const MQId remote, const MsgI &imsg)
 	{
-		return SendImpl(valid_remote, imsg);
+		return SendImpl(remote, imsg);
 	}
 
 	bool SyncRecv(MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms);
 
 	template <class Body>
-	bool SendAndRecv(const void *remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms)
+	bool SendAndRecv(const MQId remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms)
 	{
 		struct State {
 			std::mutex mutex;
@@ -144,6 +137,13 @@
 	bool StopNoLock();
 	bool RunningNoLock() { return !workers_.empty(); }
 
+	template <class... Rest>
+	bool SendImpl(const MQId remote, Rest &&...rest)
+	{
+		send_buffer_.Append(remote, std::forward<decltype(rest)>(rest)...);
+		return true;
+	}
+
 	std::vector<std::thread> workers_;
 	std::mutex mutex_;
 	std::atomic<bool> run_;
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index 00db773..9398318 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -25,7 +25,7 @@
 
 namespace
 {
-inline void AddRoute(BHMsgHead &head, const MQId &id) { head.add_route()->set_mq_id(&id, sizeof(id)); }
+inline void AddRoute(BHMsgHead &head, const MQId id) { head.add_route()->set_mq_id(id); }
 
 struct SrcInfo {
 	std::vector<BHAddress> route;
@@ -82,7 +82,7 @@
 	auto &sock = SockNode();
 	MsgRegister body;
 	body.mutable_proc()->Swap(&proc);
-	auto AddId = [&](const MQId &id) { body.add_addrs()->set_mq_id(&id, sizeof(id)); };
+	auto AddId = [&](const MQId id) { body.add_addrs()->set_mq_id(id); };
 	AddId(SockNode().id());
 	AddId(SockServer().id());
 	AddId(SockClient().id());
@@ -108,12 +108,12 @@
 			MsgCommonReply body;
 			CheckResult(imsg, head, body);
 		};
-		return sock.Send(&BHTopicCenterAddress(), head, body, onResult);
+		return sock.Send(BHTopicCenterAddress(), head, body, onResult);
 	} else {
 		MsgI reply;
 		DEFER1(reply.Release(););
 		BHMsgHead reply_head;
-		bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
+		bool r = sock.SendAndRecv(BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
 		if (r) {
 			CheckResult(reply, reply_head, reply_body);
 		}
@@ -144,12 +144,12 @@
 			MsgCommonReply body;
 			CheckResult(imsg, head, body);
 		};
-		return sock.Send(&BHTopicCenterAddress(), head, body, onResult);
+		return sock.Send(BHTopicCenterAddress(), head, body, onResult);
 	} else {
 		MsgI reply;
 		DEFER1(reply.Release(););
 		BHMsgHead reply_head;
-		bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
+		bool r = sock.SendAndRecv(BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
 		return r && CheckResult(reply, reply_head, reply_body);
 	}
 }
@@ -169,12 +169,12 @@
 	AddRoute(head, sock.id());
 
 	if (timeout_ms == 0) {
-		return sock.Send(&BHTopicCenterAddress(), head, body);
+		return sock.Send(BHTopicCenterAddress(), head, body);
 	} else {
 		MsgI reply;
 		DEFER1(reply.Release(););
 		BHMsgHead reply_head;
-		bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
+		bool r = sock.SendAndRecv(BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
 		r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body);
 		return (r && IsSuccess(reply_body.errmsg().errcode()));
 	}
@@ -201,7 +201,7 @@
 	MsgI reply;
 	DEFER1(reply.Release());
 	BHMsgHead reply_head;
-	return (sock.SendAndRecv(&BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms) &&
+	return (sock.SendAndRecv(BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms) &&
 	        reply_head.type() == kMsgTypeQueryTopicReply &&
 	        reply.ParseBody(reply_body));
 }
@@ -221,12 +221,12 @@
 	AddRoute(head, sock.id());
 
 	if (timeout_ms == 0) {
-		return sock.Send(&BHTopicCenterAddress(), head, body);
+		return sock.Send(BHTopicCenterAddress(), head, body);
 	} else {
 		MsgI reply;
 		DEFER1(reply.Release(););
 		BHMsgHead reply_head;
-		bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
+		bool r = sock.SendAndRecv(BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
 		r = r && reply_head.type() == kMsgTypeCommonReply;
 		r = r && reply.ParseBody(reply_body);
 		return r;
@@ -247,8 +247,8 @@
 			for (int i = 0; i < head.route_size() - 1; ++i) {
 				reply_head.add_route()->Swap(head.mutable_route(i));
 			}
-			auto &remote = head.route().rbegin()->mq_id();
-			sock.Send(remote.data(), reply_head, reply_body);
+			auto remote = head.route().rbegin()->mq_id();
+			sock.Send(remote, reply_head, reply_body);
 		}
 	};
 
@@ -315,7 +315,7 @@
 	for (unsigned i = 0; i < p->route.size() - 1; ++i) {
 		head.add_route()->Swap(&p->route[i]);
 	}
-	return sock.Send(p->route.back().mq_id().data(), head, body);
+	return sock.Send(p->route.back().mq_id(), head, body);
 }
 
 bool TopicNode::ClientStartWorker(RequestResultCB const &cb, const int nworker)
@@ -361,9 +361,9 @@
 					}
 				}
 			};
-			return sock.Send(addr.mq_id().data(), head, req, onRecv);
+			return sock.Send(addr.mq_id(), head, req, onRecv);
 		} else {
-			return sock.Send(addr.mq_id().data(), head, req);
+			return sock.Send(addr.mq_id(), head, req);
 		}
 	};
 
@@ -396,7 +396,7 @@
 			DEFER1(reply_msg.Release(););
 			BHMsgHead reply_head;
 
-			if (sock.SendAndRecv(addr.mq_id().data(), head, request, reply_msg, reply_head, timeout_ms) &&
+			if (sock.SendAndRecv(addr.mq_id(), head, request, reply_msg, reply_head, timeout_ms) &&
 			    reply_head.type() == kMsgTypeRequestTopicReply &&
 			    reply_msg.ParseBody(out_reply)) {
 				reply_head.mutable_proc_id()->swap(out_proc_id);
@@ -441,7 +441,7 @@
 	std::vector<NodeAddress> lst;
 	if (QueryRPCTopics(topic, lst, timeout_ms)) {
 		addr = lst.front().addr();
-		if (!addr.mq_id().empty()) {
+		if (addr.mq_id() != 0) {
 			topic_query_cache_.Store(topic, addr);
 			return true;
 		}
@@ -464,13 +464,13 @@
 		AddRoute(head, sock.id());
 
 		if (timeout_ms == 0) {
-			return sock.Send(&BHTopicBusAddress(), head, pub);
+			return sock.Send(BHTopicBusAddress(), head, pub);
 		} else {
 			MsgI reply;
 			DEFER1(reply.Release(););
 			BHMsgHead reply_head;
 			MsgCommonReply reply_body;
-			return sock.SendAndRecv(&BHTopicBusAddress(), head, pub, reply, reply_head, timeout_ms) &&
+			return sock.SendAndRecv(BHTopicBusAddress(), head, pub, reply, reply_head, timeout_ms) &&
 			       reply_head.type() == kMsgTypeCommonReply &&
 			       reply.ParseBody(reply_body) &&
 			       IsSuccess(reply_body.errmsg().errcode());
@@ -497,12 +497,12 @@
 		BHMsgHead head(InitMsgHead(GetType(sub), proc_id()));
 		AddRoute(head, sock.id());
 		if (timeout_ms == 0) {
-			return sock.Send(&BHTopicBusAddress(), head, sub);
+			return sock.Send(BHTopicBusAddress(), head, sub);
 		} else {
 			MsgI reply;
 			DEFER1(reply.Release(););
 			BHMsgHead reply_head;
-			return sock.SendAndRecv(&BHTopicBusAddress(), head, sub, reply, reply_head, timeout_ms) &&
+			return sock.SendAndRecv(BHTopicBusAddress(), head, sub, reply, reply_head, timeout_ms) &&
 			       reply_head.type() == kMsgTypeCommonReply &&
 			       reply.ParseBody(reply_body) &&
 			       IsSuccess(reply_body.errmsg().errcode());
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index 5d65bd5..dd59b09 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -198,8 +198,8 @@
 
 	const std::string mtx_name("test_mutex");
 	const std::string int_name("test_int");
-	auto mtx = shm.find_or_construct<Mutex>(mtx_name.c_str())();
-	auto pi = shm.find_or_construct<int>(int_name.c_str())(100);
+	auto mtx = shm.FindOrCreate<Mutex>(mtx_name);
+	auto pi = shm.FindOrCreate<int>(int_name, 100);
 
 	printf("mutetx ");
 	PrintPtr(mtx);
diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp
index 4615c53..d145ab4 100644
--- a/utest/speed_test.cpp
+++ b/utest/speed_test.cpp
@@ -26,7 +26,7 @@
 	SharedMemory &shm = TestShm();
 	MsgI::BindShm(shm);
 
-	MQId id = boost::uuids::random_generator()();
+	MQId id = ShmMsgQueue::NewId();
 	const int timeout = 1000;
 	const uint32_t data_size = 4000;
 	const std::string proc_id = "demo_proc";
@@ -157,8 +157,8 @@
 				req_body.set_topic("topic");
 				req_body.set_data(msg_content);
 				auto req_head(InitMsgHead(GetType(req_body), client_proc_id));
-				req_head.add_route()->set_mq_id(&cli.id(), cli.id().size());
-				return cli.Send(&srv.id(), req_head, req_body);
+				req_head.add_route()->set_mq_id(cli.id());
+				return cli.Send(srv.id(), req_head, req_body);
 			};
 
 			Req();
@@ -175,15 +175,13 @@
 				DEFER1(req.Release());
 
 				if (req.ParseHead(req_head) && req_head.type() == kMsgTypeRequestTopic) {
-					auto &mqid = req_head.route()[0].mq_id();
-					MQId src_id;
-					memcpy(&src_id, mqid.data(), sizeof(src_id));
+					auto src_id = req_head.route()[0].mq_id();
 					auto Reply = [&]() {
 						MsgRequestTopic reply_body;
 						reply_body.set_topic("topic");
 						reply_body.set_data(msg_content);
 						auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, req_head.msg_id()));
-						return srv.Send(&src_id, reply_head, reply_body);
+						return srv.Send(src_id, reply_head, reply_body);
 					};
 					Reply();
 				}
diff --git a/utest/utest.cpp b/utest/utest.cpp
index ff5d2ed..d058471 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -2,8 +2,6 @@
 #include "defs.h"
 #include "util.h"
 #include <atomic>
-#include <boost/uuid/uuid_generators.hpp>
-#include <boost/uuid/uuid_io.hpp>
 #include <condition_variable>
 #include <stdio.h>
 #include <string>
@@ -96,7 +94,7 @@
 
 	auto Avail = [&]() { return shm.get_free_memory(); };
 	auto init_avail = Avail();
-	int *flag = shm.find_or_construct<int>("flag")(123);
+	int *flag = shm.FindOrCreate<int>("flag", 123);
 	printf("flag = %d\n", *flag);
 	++*flag;
 	const std::string sub_proc_id = "subscriber";
@@ -207,7 +205,7 @@
 
 	auto Avail = [&]() { return shm.get_free_memory(); };
 	auto init_avail = Avail();
-	int *flag = shm.find_or_construct<int>("flag")(123);
+	int *flag = shm.FindOrCreate<int>("flag", 123);
 	printf("flag = %d\n", *flag);
 	++*flag;
 

--
Gitblit v1.8.0