From 6eefba812ede29549af3633c490f2e85a4805524 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期三, 31 三月 2021 11:24:20 +0800
Subject: [PATCH] format code style.

---
 src/shm.h              |  130 ++-
 src/socket.h           |   75 +-
 utest/simple_tests.cpp |  191 ++--
 src/msg.h              |   79 +
 src/socket.cpp         |  158 ++--
 .clang-format          |   12 
 src/defs.h             |    5 
 utest/utest.cpp        |  279 +++---
 src/shm_queue.cpp      |   91 +-
 src/center.cpp         |    3 
 src/msg.cpp            |  151 ++--
 utest/speed_test.cpp   |  305 ++++----
 utest/util.h           |  119 +-
 src/bh_util.h          |   60 
 src/pubsub.h           |   37 
 src/shm_queue.h        |  184 ++--
 src/center.h           |    1 
 src/pubsub.cpp         |  248 +++---
 src/shm.cpp            |    6 
 19 files changed, 1,104 insertions(+), 1,030 deletions(-)

diff --git a/.clang-format b/.clang-format
index fd84ae9..413be3a 100644
--- a/.clang-format
+++ b/.clang-format
@@ -3,21 +3,21 @@
 IndentWidth: 4
 TabWidth: 4
 ColumnLimit: 0
-AllowShortIfStatementsOnASingleLine: true
 AllowShortCaseLabelsOnASingleLine: true
 AllowShortBlocksOnASingleLine: true
 AllowShortEnumsOnASingleLine: true
 AllowShortLoopsOnASingleLine: true
-AllowShortLambdasOnASingleLine: true
-AllowShortFunctionsOnASingleLine: true
+AllowShortLambdasOnASingleLine: All
+AllowShortIfStatementsOnASingleLine: WithoutElse
+AllowShortFunctionsOnASingleLine: All
 BreakBeforeBraces: Linux
 IndentCaseLabels: false
 AccessModifierOffset: -4
 BreakConstructorInitializers: AfterColon
-AlignConsecutiveAssignments: true
-AlignConsecutiveDeclarations: true
+ConstructorInitializerAllOnOneLineOrOnePerLine: true
+AllowAllConstructorInitializersOnNextLine: true
+
 AlignTrailingComments: true
 AlignEscapedNewlinesLeft: true
 PointerAlignment: Right
 SpaceAfterCStyleCast: true
-
diff --git a/src/bh_util.h b/src/bh_util.h
index d86b931..b5dc45e 100644
--- a/src/bh_util.h
+++ b/src/bh_util.h
@@ -23,7 +23,7 @@
 
 inline uint16_t Get8(const void *p)
 {
-	return static_cast<const uint8_t*>(p)[0];
+	return static_cast<const uint8_t *>(p)[0];
 }
 inline void Put8(void *p, uint8_t u)
 {
@@ -32,9 +32,9 @@
 
 inline uint16_t Get16(const void *p)
 {
-	auto ptr = static_cast<const uint8_t*>(p);
-	return (((uint16_t)ptr[0]) << 8u) |
-		   (((uint16_t)ptr[1]));
+	auto ptr = static_cast<const uint8_t *>(p);
+	return (((uint16_t) ptr[0]) << 8u) |
+	       (((uint16_t) ptr[1]));
 }
 inline void Put16(void *p, uint16_t u)
 {
@@ -45,11 +45,11 @@
 
 inline uint32_t Get32(const void *p)
 {
-	auto ptr = static_cast<const uint8_t*>(p);
-	return (((uint32_t)ptr[0]) << 24u) |
-		   (((uint32_t)ptr[1]) << 16u) |
-		   (((uint32_t)ptr[2]) << 8u) |
-		   (((uint32_t)ptr[3]));
+	auto ptr = static_cast<const uint8_t *>(p);
+	return (((uint32_t) ptr[0]) << 24u) |
+	       (((uint32_t) ptr[1]) << 16u) |
+	       (((uint32_t) ptr[2]) << 8u) |
+	       (((uint32_t) ptr[3]));
 }
 inline void Put32(void *p, uint32_t u)
 {
@@ -60,17 +60,17 @@
 	ptr[3] = (uint8_t)(u);
 }
 
-inline uint64_t Get64(const void *p)                             
+inline uint64_t Get64(const void *p)
 {
-	auto ptr = static_cast<const uint8_t*>(p);
-	return (((uint64_t)ptr[0]) << 56u) |
-		   (((uint64_t)ptr[1]) << 48u) |
-		   (((uint64_t)ptr[2]) << 40u) |
-		   (((uint64_t)ptr[3]) << 32u) |
-		   (((uint64_t)ptr[4]) << 24u) |
-		   (((uint64_t)ptr[5]) << 16u) |
-		   (((uint64_t)ptr[6]) << 8u) |
-		   ((uint64_t)ptr[7]);
+	auto ptr = static_cast<const uint8_t *>(p);
+	return (((uint64_t) ptr[0]) << 56u) |
+	       (((uint64_t) ptr[1]) << 48u) |
+	       (((uint64_t) ptr[2]) << 40u) |
+	       (((uint64_t) ptr[3]) << 32u) |
+	       (((uint64_t) ptr[4]) << 24u) |
+	       (((uint64_t) ptr[5]) << 16u) |
+	       (((uint64_t) ptr[6]) << 8u) |
+	       ((uint64_t) ptr[7]);
 }
 inline void Put64(void *p, uint64_t u)
 {
@@ -92,20 +92,24 @@
 
 class ExitCall
 {
-    typedef std::function<void(void)> func_t;
-    func_t m_func;
+	typedef std::function<void(void)> func_t;
+	func_t m_func;
+
 public:
-    explicit ExitCall(func_t f): m_func(f) {}
-    ~ExitCall() { if (m_func) { m_func(); } }
+	explicit ExitCall(func_t f) :
+	    m_func(f) {}
+	~ExitCall()
+	{
+		if (m_func) { m_func(); }
+	}
 };
 
 // macro helper
-#define JOIN_IMPL(a, b) a ## b
-#define JOIN(a, b)  JOIN_IMPL(a , b)
+#define JOIN_IMPL(a, b) a##b
+#define JOIN(a, b) JOIN_IMPL(a, b)
 // defer function / lambda.
-#define DEFERF(func) ExitCall JOIN(defer_ , __LINE__)(func)
+#define DEFERF(func) ExitCall JOIN(defer_, __LINE__)(func)
 // defer simple expression
-#define DEFER1(expr) DEFERF([&](){ expr; })
-
+#define DEFER1(expr) DEFERF([&]() { expr; })
 
 #endif /* end of include guard: BH_UTIL_SOXWOK67 */
diff --git a/src/center.cpp b/src/center.cpp
index 809b6d1..db000c4 100644
--- a/src/center.cpp
+++ b/src/center.cpp
@@ -23,7 +23,6 @@
 
 SharedMemory &BHomeShm()
 {
-	static SharedMemory shm("bhome_default_shm_v0", 1024*1024*64);
+	static SharedMemory shm("bhome_default_shm_v0", 1024 * 1024 * 64);
 	return shm;
 }
-
diff --git a/src/center.h b/src/center.h
index fcb8005..153cc3e 100644
--- a/src/center.h
+++ b/src/center.h
@@ -20,7 +20,6 @@
 
 class BHCenter
 {
-
 };
 
 #endif // end of include guard: CENTER_TM9OUQTG
diff --git a/src/defs.h b/src/defs.h
index acfe09e..10ac73c 100644
--- a/src/defs.h
+++ b/src/defs.h
@@ -27,9 +27,10 @@
 const MQId kBHBusQueueId = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff");
 const int kBHCenterPort = 24287;
 const char kTopicSep = '.';
-namespace bhome_shm {
+namespace bhome_shm
+{
 class SharedMemory;
-}
+} // namespace bhome_shm
 
 bhome_shm::SharedMemory &BHomeShm();
 
diff --git a/src/msg.cpp b/src/msg.cpp
index 78834a8..3a01240 100644
--- a/src/msg.cpp
+++ b/src/msg.cpp
@@ -18,48 +18,49 @@
 #include "msg.h"
 #include "bh_util.h"
 
-namespace bhome_msg {
+namespace bhome_msg
+{
 
 const uint32_t kMsgTag = 0xf1e2d3c4;
 const uint32_t kMsgPrefixLen = 4;
 
 BHMsg InitMsg(MsgType type)
 {
-    BHMsg msg;
-    msg.set_type(type);
-    time_t tm = 0;
-    msg.set_timestamp(time(&tm));
-    return msg;
+	BHMsg msg;
+	msg.set_type(type);
+	time_t tm = 0;
+	msg.set_timestamp(time(&tm));
+	return msg;
 }
 
 BHMsg MakeRequest(const MQId &src_id, const void *data, const size_t size)
 {
-    assert(data && size);
-    BHMsg msg(InitMsg(kMsgTypeRequest));
-    msg.set_body(data, size);
-    msg.add_route()->set_mq_id(&src_id, sizeof(src_id));
-    return msg;
+	assert(data && size);
+	BHMsg msg(InitMsg(kMsgTypeRequest));
+	msg.set_body(data, size);
+	msg.add_route()->set_mq_id(&src_id, sizeof(src_id));
+	return msg;
 }
 
 BHMsg MakeReply(const void *data, const size_t size)
 {
-    assert(data && size);
-    BHMsg msg(InitMsg(kMsgTypeReply));
-    msg.set_body(data, size);
-    return msg;
+	assert(data && size);
+	BHMsg msg(InitMsg(kMsgTypeReply));
+	msg.set_body(data, size);
+	return msg;
 }
 
 BHMsg MakeSubUnsub(const MQId &client, const std::vector<std::string> &topics, const MsgType sub_unsub)
 {
-    assert(sub_unsub == kMsgTypeSubscribe || sub_unsub == kMsgTypeUnsubscribe);
-    BHMsg msg(InitMsg(sub_unsub));
-    msg.add_route()->set_mq_id(&client, sizeof(client));
-    DataSub subs;
-    for (auto &t : topics) {
-        subs.add_topics(t);
-    }
-    msg.set_body(subs.SerializeAsString());
-    return msg;
+	assert(sub_unsub == kMsgTypeSubscribe || sub_unsub == kMsgTypeUnsubscribe);
+	BHMsg msg(InitMsg(sub_unsub));
+	msg.add_route()->set_mq_id(&client, sizeof(client));
+	DataSub subs;
+	for (auto &t : topics) {
+		subs.add_topics(t);
+	}
+	msg.set_body(subs.SerializeAsString());
+	return msg;
 }
 
 BHMsg MakeSub(const MQId &client, const std::vector<std::string> &topics) { return MakeSubUnsub(client, topics, kMsgTypeSubscribe); }
@@ -67,77 +68,77 @@
 
 BHMsg MakePub(const std::string &topic, const void *data, const size_t size)
 {
-    assert(data && size);
-    BHMsg msg(InitMsg(kMsgTypePublish));
-    DataPub pub;
-    pub.set_topic(topic);
-    pub.set_data(data, size);
-    msg.set_body(pub.SerializeAsString());
-    return msg;
+	assert(data && size);
+	BHMsg msg(InitMsg(kMsgTypePublish));
+	DataPub pub;
+	pub.set_topic(topic);
+	pub.set_data(data, size);
+	msg.set_body(pub.SerializeAsString());
+	return msg;
 }
 
 void *Pack(SharedMemory &shm, const BHMsg &msg)
 {
-    uint32_t msg_size = msg.ByteSizeLong();
-    void *p = shm.Alloc(4 + msg_size);
-    if(p) {
-        Put32(p, msg_size);
-        if (!msg.SerializeToArray(static_cast<char*>(p) + kMsgPrefixLen, msg_size)) {
-            shm.Dealloc(p);
-            p = 0;
-        }
-    }
-    return p;
+	uint32_t msg_size = msg.ByteSizeLong();
+	void *p = shm.Alloc(4 + msg_size);
+	if (p) {
+		Put32(p, msg_size);
+		if (!msg.SerializeToArray(static_cast<char *>(p) + kMsgPrefixLen, msg_size)) {
+			shm.Dealloc(p);
+			p = 0;
+		}
+	}
+	return p;
 }
 
 bool MsgI::Unpack(BHMsg &msg) const
 {
-    void *p = ptr_.get();
-    assert(p);
-    uint32_t msg_size = Get32(p);
-    return msg.ParseFromArray(static_cast<char*>(p) + kMsgPrefixLen, msg_size);
+	void *p = ptr_.get();
+	assert(p);
+	uint32_t msg_size = Get32(p);
+	return msg.ParseFromArray(static_cast<char *>(p) + kMsgPrefixLen, msg_size);
 }
 
 // with ref count;
 bool MsgI::MakeRC(SharedMemory &shm, const BHMsg &msg)
 {
-    void *p = Pack(shm, msg);
-    if(!p) {
-        return false;
-    }
-    RefCount *rc = shm.New<RefCount>();
-    if (!rc) {
-        shm.Dealloc(p);
-        return false;
-    }
-    MsgI(p, rc).swap(*this);
-    return true;
+	void *p = Pack(shm, msg);
+	if (!p) {
+		return false;
+	}
+	RefCount *rc = shm.New<RefCount>();
+	if (!rc) {
+		shm.Dealloc(p);
+		return false;
+	}
+	MsgI(p, rc).swap(*this);
+	return true;
 }
 
 bool MsgI::Make(SharedMemory &shm, const BHMsg &msg)
 {
-    void *p = Pack(shm, msg);
-    if(!p) {
-        return false;
-    }
-    MsgI(p, 0).swap(*this);
-    return true;
+	void *p = Pack(shm, msg);
+	if (!p) {
+		return false;
+	}
+	MsgI(p, 0).swap(*this);
+	return true;
 }
 
-int MsgI::Release(SharedMemory &shm) 
+int MsgI::Release(SharedMemory &shm)
 {
-    if (IsCounted()) {
-        const int n = count_->Dec();
-        if (n != 0) {
-            return n;
-        }
-    }
-    // free data
-    shm.Dealloc(ptr_);
-    ptr_ = 0;
-    shm.Delete(count_);
-    count_ = 0;
-    return 0;
+	if (IsCounted()) {
+		const int n = count_->Dec();
+		if (n != 0) {
+			return n;
+		}
+	}
+	// free data
+	shm.Dealloc(ptr_);
+	ptr_ = 0;
+	shm.Delete(count_);
+	count_ = 0;
+	return 0;
 }
 
 } // namespace bhome_msg
diff --git a/src/msg.h b/src/msg.h
index f3fe726..2154eba 100644
--- a/src/msg.h
+++ b/src/msg.h
@@ -18,15 +18,16 @@
 #ifndef MSG_5BILLZET
 #define MSG_5BILLZET
 
-#include <stdint.h>
+#include "bhome_msg.pb.h"
 #include "shm.h"
 #include <boost/interprocess/offset_ptr.hpp>
 #include <boost/uuid/uuid_generators.hpp>
-#include "bhome_msg.pb.h"
+#include <stdint.h>
 
-namespace bhome_msg {
-    using namespace bhome_shm;
-    using namespace bhome::msg; // for serialized data in MsgI
+namespace bhome_msg
+{
+using namespace bhome_shm;
+using namespace bhome::msg; // for serialized data in MsgI
 
 // MsgI is safe to be stored in shared memory, so POD data or offset_ptr is required.
 // message format: header(meta) + body(data).
@@ -37,49 +38,67 @@
 class RefCount : private boost::noncopyable
 {
 public:
-    int Inc() { Guard lk(mutex_); return ++num_; }
-    int Dec() { Guard lk(mutex_); return --num_; }
-    int Get() { Guard lk(mutex_); return num_; }
+	int Inc()
+	{
+		Guard lk(mutex_);
+		return ++num_;
+	}
+	int Dec()
+	{
+		Guard lk(mutex_);
+		return --num_;
+	}
+	int Get()
+	{
+		Guard lk(mutex_);
+		return num_;
+	}
+
 private:
-    Mutex mutex_;
-    int num_ = 1;
+	Mutex mutex_;
+	int num_ = 1;
 };
 
 BHMsg MakeRequest(const MQId &src_id, const void *data, const size_t size);
 BHMsg MakeReply(const void *data, const size_t size);
-BHMsg MakeSub(const MQId &client, const std::vector<std::string> &topics); 
-BHMsg MakeUnsub(const MQId &client, const std::vector<std::string> &topics); 
+BHMsg MakeSub(const MQId &client, const std::vector<std::string> &topics);
+BHMsg MakeUnsub(const MQId &client, const std::vector<std::string> &topics);
 BHMsg MakePub(const std::string &topic, const void *data, const size_t size);
 
-class MsgI {
+class MsgI
+{
 private:
-    offset_ptr<void> ptr_;
-    offset_ptr<RefCount> count_;
+	offset_ptr<void> ptr_;
+	offset_ptr<RefCount> count_;
 
-    bool BuildSubOrUnsub(SharedMemory &shm, const std::vector<std::string> &topics, const MsgType sub_unsub);
+	bool BuildSubOrUnsub(SharedMemory &shm, const std::vector<std::string> &topics, const MsgType sub_unsub);
+
 public:
-    MsgI(void *p=0, RefCount *c=0):ptr_(p), count_(c) {}
-    void swap(MsgI &a) { std::swap(ptr_, a.ptr_); std::swap(count_, a.count_); }
-    template <class T = void> T *get() { return static_cast<T*>(ptr_.get()); }
+	MsgI(void *p = 0, RefCount *c = 0) :
+	    ptr_(p), count_(c) {}
 
-    // AddRef and Release works for both counted and not counted msg.
-    int AddRef() const { return IsCounted() ? count_->Inc() : 1; }
-    int Release(SharedMemory &shm);
+	void swap(MsgI &a)
+	{
+		std::swap(ptr_, a.ptr_);
+		std::swap(count_, a.count_);
+	}
+	template <class T = void>
+	T *get() { return static_cast<T *>(ptr_.get()); }
 
-    int Count()  const{ return IsCounted() ? count_->Get() : 1; }
+	// AddRef and Release works for both counted and not counted msg.
+	int AddRef() const { return IsCounted() ? count_->Inc() : 1; }
+	int Release(SharedMemory &shm);
 
-    bool IsCounted() const { return static_cast<bool>(count_); }
+	int Count() const { return IsCounted() ? count_->Get() : 1; }
+	bool IsCounted() const { return static_cast<bool>(count_); }
 
-    bool Make(SharedMemory &shm, const BHMsg &msg);
-    bool MakeRC(SharedMemory &shm, const BHMsg &msg);
-    bool Unpack(BHMsg &msg) const;
+	bool Make(SharedMemory &shm, const BHMsg &msg);
+	bool MakeRC(SharedMemory &shm, const BHMsg &msg);
+	bool Unpack(BHMsg &msg) const;
 };
 
 inline void swap(MsgI &m1, MsgI &m2) { m1.swap(m2); }
 
-
 } // namespace bhome_msg
-
-
 
 #endif // end of include guard: MSG_5BILLZET
diff --git a/src/pubsub.cpp b/src/pubsub.cpp
index a0dc4e9..d5c7dd2 100644
--- a/src/pubsub.cpp
+++ b/src/pubsub.cpp
@@ -16,170 +16,170 @@
  * =====================================================================================
  */
 #include "pubsub.h"
-#include <chrono>
 #include "bh_util.h"
 #include "defs.h"
+#include <chrono>
 
-namespace bhome_shm {
+namespace bhome_shm
+{
 
 using namespace std::chrono_literals;
 const int kMaxWorker = 16;
 using namespace bhome_msg;
 
-BusManager::BusManager(SharedMemory &shm):
-shm_(shm),
-busq_(kBHBusQueueId, shm, 16),
-run_(false)
+BusManager::BusManager(SharedMemory &shm) :
+    shm_(shm),
+    busq_(kBHBusQueueId, shm, 16),
+    run_(false)
 {
 }
-	
+
 BusManager::~BusManager()
 {
-    Stop();
+	Stop();
 }
 
 bool BusManager::Start(const int nworker)
 {
-    std::lock_guard<std::mutex> guard(mutex_);
-    StopNoLock();
-    // start
-    auto Worker = [&](){
-        while (this->run_) {
-            BusManager &self = *this;
-            MsgI msg;
-            const int timeout_ms = 100;
-            if (self.busq_.Recv(msg, timeout_ms)) {
-                self.OnMsg(msg);
-            }
-        }
-    };
+	std::lock_guard<std::mutex> guard(mutex_);
+	StopNoLock();
+	// start
+	auto Worker = [&]() {
+		while (this->run_) {
+			BusManager &self = *this;
+			MsgI msg;
+			const int timeout_ms = 100;
+			if (self.busq_.Recv(msg, timeout_ms)) {
+				self.OnMsg(msg);
+			}
+		}
+	};
 
-    run_.store(true);
-    const int n = std::min(nworker, kMaxWorker);
-    for (int i = 0; i < n; ++i) {
-        workers_.emplace_back(Worker);
-    }
-    return true;
+	run_.store(true);
+	const int n = std::min(nworker, kMaxWorker);
+	for (int i = 0; i < n; ++i) {
+		workers_.emplace_back(Worker);
+	}
+	return true;
 }
 
 bool BusManager::Stop()
 {
-    std::lock_guard<std::mutex> guard(mutex_);
-    return StopNoLock();
+	std::lock_guard<std::mutex> guard(mutex_);
+	return StopNoLock();
 }
 
 bool BusManager::StopNoLock()
 {
-    if (run_.exchange(false)) {
-        for (auto &w: workers_) {
-            if (w.joinable()) {
-                w.join();
-            }
-        }
-        return true;
-    }    
-    return false;
+	if (run_.exchange(false)) {
+		for (auto &w : workers_) {
+			if (w.joinable()) {
+				w.join();
+			}
+		}
+		return true;
+	}
+	return false;
 }
 
 void BusManager::OnMsg(MsgI &imsg)
 {
-    DEFER1(imsg.Release(shm_));
+	DEFER1(imsg.Release(shm_));
 
-    BHMsg msg;
-    if (!imsg.Unpack(msg)) {
-        return;
-    }
+	BHMsg msg;
+	if (!imsg.Unpack(msg)) {
+		return;
+	}
 
-    auto OnSubChange = [&](auto &&update) {
-        DataSub sub;
-        if (!msg.route().empty() && sub.ParseFromString(msg.body()) && !sub.topics().empty()) {
-            assert(sizeof(MQId) == msg.route(0).mq_id().size());
-            MQId client;
-            memcpy(&client, msg.route(0).mq_id().data(), sizeof(client));
+	auto OnSubChange = [&](auto &&update) {
+		DataSub sub;
+		if (!msg.route().empty() && sub.ParseFromString(msg.body()) && !sub.topics().empty()) {
+			assert(sizeof(MQId) == msg.route(0).mq_id().size());
+			MQId client;
+			memcpy(&client, msg.route(0).mq_id().data(), sizeof(client));
 
-            std::lock_guard<std::mutex> guard(mutex_);
-            auto &topics = sub.topics();
-            for (auto &topic : topics) {
-                try {
-                    update(topic, client);
-                } catch(...) {
-                    //TODO log error
-                }
-            }
-        }
-    };
+			std::lock_guard<std::mutex> guard(mutex_);
+			auto &topics = sub.topics();
+			for (auto &topic : topics) {
+				try {
+					update(topic, client);
+				} catch (...) {
+					//TODO log error
+				}
+			}
+		}
+	};
 
-    auto Sub1 = [this](const std::string &topic, const MQId &id) {
-        records_[topic].insert(id);
-    };
+	auto Sub1 = [this](const std::string &topic, const MQId &id) {
+		records_[topic].insert(id);
+	};
 
-    auto Unsub1 = [this](const std::string &topic, const MQId &id) {
-        auto pos = records_.find(topic);
-        if (pos != records_.end()) {
-            if (pos->second.erase(id) && pos->second.empty()) {
-                records_.erase(pos);
-            }
-        }
-    };
+	auto Unsub1 = [this](const std::string &topic, const MQId &id) {
+		auto pos = records_.find(topic);
+		if (pos != records_.end()) {
+			if (pos->second.erase(id) && pos->second.empty()) {
+				records_.erase(pos);
+			}
+		}
+	};
 
-    auto OnPublish = [&]() {
-        DataPub pub;
-        if (!pub.ParseFromString(msg.body())) {
-            return;
-        }
-        auto FindClients = [&](const std::string &topic){
-            Clients dests;
-            std::lock_guard<std::mutex> guard(mutex_);
-            auto Find1 = [&](const std::string &t) {
-                auto pos = records_.find(topic);
-                if (pos != records_.end() && !pos->second.empty()) {
-                    auto &clients = pos->second;
-                    for (auto &cli : clients) {
-                        dests.insert(cli);
-                    }
-                }
-            };
-            Find1(topic);
+	auto OnPublish = [&]() {
+		DataPub pub;
+		if (!pub.ParseFromString(msg.body())) {
+			return;
+		}
+		auto FindClients = [&](const std::string &topic) {
+			Clients dests;
+			std::lock_guard<std::mutex> guard(mutex_);
+			auto Find1 = [&](const std::string &t) {
+				auto pos = records_.find(topic);
+				if (pos != records_.end() && !pos->second.empty()) {
+					auto &clients = pos->second;
+					for (auto &cli : clients) {
+						dests.insert(cli);
+					}
+				}
+			};
+			Find1(topic);
 
-            //TODO check and adjust topic on client side sub/pub.
-            size_t pos = 0;
-            while (true) {
-                pos = topic.find(kTopicSep, pos);
-                if (pos == topic.npos || ++pos == topic.size()) {
-                    // Find1(std::string()); // sub all.
-                    break;
-                } else {
-                    Find1(topic.substr(0, pos));
-                }
-            }
-            return dests;
-        };
+			//TODO check and adjust topic on client side sub/pub.
+			size_t pos = 0;
+			while (true) {
+				pos = topic.find(kTopicSep, pos);
+				if (pos == topic.npos || ++pos == topic.size()) {
+					// Find1(std::string()); // sub all.
+					break;
+				} else {
+					Find1(topic.substr(0, pos));
+				}
+			}
+			return dests;
+		};
 
-        auto Dispatch = [&](auto &&send1) {
-            const Clients &clients(FindClients(pub.topic()));
-            for (auto &cli : clients) {
-                send1(cli);
-            }
-        };
+		auto Dispatch = [&](auto &&send1) {
+			const Clients &clients(FindClients(pub.topic()));
+			for (auto &cli : clients) {
+				send1(cli);
+			}
+		};
 
-        if (imsg.IsCounted()) {
-            Dispatch([&](const MQId &cli) { busq_.Send(cli, imsg, 100); });
-        } else {
-            MsgI pubmsg;
-            if (!pubmsg.MakeRC(shm_, msg)) { return; }
-            DEFER1(pubmsg.Release(shm_));
+		if (imsg.IsCounted()) {
+			Dispatch([&](const MQId &cli) { busq_.Send(cli, imsg, 100); });
+		} else {
+			MsgI pubmsg;
+			if (!pubmsg.MakeRC(shm_, msg)) { return; }
+			DEFER1(pubmsg.Release(shm_));
 
-            Dispatch([&](const MQId &cli) { busq_.Send(cli, pubmsg, 100); });
-        }
-    };
+			Dispatch([&](const MQId &cli) { busq_.Send(cli, pubmsg, 100); });
+		}
+	};
 
-    switch (msg.type()) {
-        case kMsgTypeSubscribe: OnSubChange(Sub1); break;
-        case kMsgTypeUnsubscribe: OnSubChange(Unsub1); break;
-        case kMsgTypePublish : OnPublish(); break;
-        default: break;
-    }
+	switch (msg.type()) {
+	case kMsgTypeSubscribe: OnSubChange(Sub1); break;
+	case kMsgTypeUnsubscribe: OnSubChange(Unsub1); break;
+	case kMsgTypePublish: OnPublish(); break;
+	default: break;
+	}
 }
 
 } // namespace bhome_shm
-
diff --git a/src/pubsub.h b/src/pubsub.h
index 11fa4e4..dc3fced 100644
--- a/src/pubsub.h
+++ b/src/pubsub.h
@@ -19,35 +19,36 @@
 #define PUBSUB_4KGRA997
 
 #include "shm_queue.h"
-#include <thread>
 #include <atomic>
 #include <mutex>
-#include <vector>
-#include <unordered_map>
 #include <set>
+#include <thread>
+#include <unordered_map>
+#include <vector>
 
-namespace bhome_shm {
+namespace bhome_shm
+{
 
 // publish/subcribe manager.
 class BusManager
 {
-    SharedMemory &shm_;
-    ShmMsgQueue busq_;
-    std::atomic<bool> run_;
-    std::vector<std::thread> workers_;
-    std::mutex mutex_;
-    typedef std::set<MQId> Clients;
-    std::unordered_map<std::string, Clients> records_;
+	SharedMemory &shm_;
+	ShmMsgQueue busq_;
+	std::atomic<bool> run_;
+	std::vector<std::thread> workers_;
+	std::mutex mutex_;
+	typedef std::set<MQId> Clients;
+	std::unordered_map<std::string, Clients> records_;
 
-    bool StopNoLock();
-    void OnMsg(MsgI &msg);
+	bool StopNoLock();
+	void OnMsg(MsgI &msg);
+
 public:
-    BusManager(SharedMemory &shm);
-    ~BusManager();
-    bool Start(const int nworker = 2);
-    bool Stop();
+	BusManager(SharedMemory &shm);
+	~BusManager();
+	bool Start(const int nworker = 2);
+	bool Stop();
 };
-
 
 } // namespace bhome_shm
 
diff --git a/src/shm.cpp b/src/shm.cpp
index d628b56..1658900 100644
--- a/src/shm.cpp
+++ b/src/shm.cpp
@@ -21,9 +21,9 @@
 namespace bhome_shm
 {
 
-SharedMemory::SharedMemory(const std::string &name, const uint64_t size)
-    : mshm_t(open_or_create, name.c_str(), size, 0, AllowAll()),
-      name_(name)
+SharedMemory::SharedMemory(const std::string &name, const uint64_t size) :
+    mshm_t(open_or_create, name.c_str(), size, 0, AllowAll()),
+    name_(name)
 {
 }
 
diff --git a/src/shm.h b/src/shm.h
index 0f68754..3fce99d 100644
--- a/src/shm.h
+++ b/src/shm.h
@@ -19,14 +19,15 @@
 #ifndef SHM_6CHO6D6C
 #define SHM_6CHO6D6C
 
+#include <boost/interprocess/managed_shared_memory.hpp>
+#include <boost/interprocess/sync/interprocess_condition.hpp>
+#include <boost/interprocess/sync/interprocess_mutex.hpp>
+#include <boost/interprocess/sync/scoped_lock.hpp>
 #include <boost/noncopyable.hpp>
 #include <boost/uuid/uuid.hpp>
-#include <boost/interprocess/managed_shared_memory.hpp>
-#include <boost/interprocess/sync/interprocess_mutex.hpp>
-#include <boost/interprocess/sync/interprocess_condition.hpp>
-#include <boost/interprocess/sync/scoped_lock.hpp>
 
-namespace bhome_shm {
+namespace bhome_shm
+{
 
 using namespace boost::interprocess;
 
@@ -37,73 +38,90 @@
 
 class SharedMemory : public mshm_t
 {
-    std::string name_;
+	std::string name_;
 
-    static permissions AllowAll() {
-        permissions perm;
-        perm.set_unrestricted();
-        return perm;
-    }
-    void Swap(SharedMemory &a);
+	static permissions AllowAll()
+	{
+		permissions perm;
+		perm.set_unrestricted();
+		return perm;
+	}
+	void Swap(SharedMemory &a);
+
 public:
-    static bool Remove(const std::string &name) {
-        return shared_memory_object::remove(name.c_str());
-    }
-    SharedMemory(const std::string &name, const uint64_t size);
-    ~SharedMemory();
-    std::string name() const { return name_; }
-    bool Remove() { return Remove(name()); }
+	static bool Remove(const std::string &name) { return shared_memory_object::remove(name.c_str()); }
 
-    void *Alloc(const size_t size) { return allocate(size, std::nothrow); }
-    void Dealloc(void *p) { if(p) { deallocate(p); } }
-    template<class T> void Dealloc(offset_ptr<T> ptr) { return Dealloc(ptr.get()); }
+	SharedMemory(const std::string &name, const uint64_t size);
+	~SharedMemory();
+	std::string name() const { return name_; }
+	bool Remove() { return Remove(name()); }
 
-    template <class T, class ...Params> T * New(Params const&...params) { return construct<T>(anonymous_instance, std::nothrow)(params...); }
-    template <class T> void Delete(T *p) { if (p) { destroy_ptr<T>(p); }; }
-    template <class T> void Delete(offset_ptr<T> p) { Delete(p.get()); }
-    template <class T> T *Find(const std::string &name) { return find<T>(name.c_str()).first; }
+	void *Alloc(const size_t size) { return allocate(size, std::nothrow); }
+	void Dealloc(void *p)
+	{
+		if (p) { deallocate(p); }
+	}
+	template <class T>
+	void Dealloc(offset_ptr<T> ptr) { return Dealloc(ptr.get()); }
 
+	template <class T, class... Params>
+	T *New(Params const &...params) { return construct<T>(anonymous_instance, std::nothrow)(params...); }
+	template <class T>
+	void Delete(T *p)
+	{
+		if (p) { destroy_ptr<T>(p); };
+	}
+	template <class T>
+	void Delete(offset_ptr<T> p) { Delete(p.get()); }
+	template <class T>
+	T *Find(const std::string &name) { return find<T>(name.c_str()).first; }
 };
 
 // ShmObject manages an object in shared memory, but ShmObject itself is not in shared memory.
 // works like a smart pointer of an object in shared memory.
 // TODO handshake with center, and can be removed if killed.
 template <class T>
-class ShmObject : private boost::noncopyable {
-    static std::string ObjName(const std::string &name) { return "obj" + name; }
-protected:
-    typedef T Data;
-    typedef SharedMemory ShmType;
-private:
-    ShmType &shm_;
-    std::string name_;
-    Data *pdata_ = nullptr;
+class ShmObject : private boost::noncopyable
+{
+	static std::string ObjName(const std::string &name) { return "obj" + name; }
 
-    bool IsOk() const { return pdata_; }
 protected:
-    ShmType &shm() const { return shm_; }
+	typedef T Data;
+	typedef SharedMemory ShmType;
+
+private:
+	ShmType &shm_;
+	std::string name_;
+	Data *pdata_ = nullptr;
+
+	bool IsOk() const { return pdata_; }
+
+protected:
+	ShmType &shm() const { return shm_; }
+
 public:
-    template <class...Params>
-    ShmObject(ShmType &segment, const std::string &name, Params&&...t):
-    shm_(segment), name_(name)
-    {
-        pdata_ = shm_.find_or_construct<Data>(ObjName(name_).c_str(), std::nothrow)(t...);
-        if (!IsOk()) {
-            throw("Error: Not enough memory, can not allocate \"" + name_ + "\"");
-        }
-    }
-    static Data *Find(SharedMemory &shm, const std::string &name) { return shm.Find<Data>(ObjName(name)); }
-    Data *Find(const std::string &name) { return Find(shm_, ObjName(name)); }
-    virtual ~ShmObject() {}
-    std::string name() const { return name_; }
-    Data* data() { return pdata_; }
-    const Data* data() const { return pdata_; }
-    Data* operator->() { return data(); }
-    const Data* operator->() const { return data(); }
-    bool Remove() { return shm_.destroy<Data>(ObjName(name_).c_str()); }
+	template <class... Params>
+	ShmObject(ShmType &segment, const std::string &name, Params &&...t) :
+	    shm_(segment), name_(name)
+	{
+		pdata_ = shm_.find_or_construct<Data>(ObjName(name_).c_str(), std::nothrow)(t...);
+		if (!IsOk()) {
+			throw("Error: Not enough memory, can not allocate \"" + name_ + "\"");
+		}
+	}
+	static Data *Find(SharedMemory &shm, const std::string &name) { return shm.Find<Data>(ObjName(name)); }
+	Data *Find(const std::string &name) { return Find(shm_, ObjName(name)); }
+	virtual ~ShmObject() {}
+	std::string name() const { return name_; }
+	Data *data() { return pdata_; }
+	const Data *data() const { return pdata_; }
+	Data *operator->() { return data(); }
+	const Data *operator->() const { return data(); }
+	bool Remove() { return shm_.destroy<Data>(ObjName(name_).c_str()); }
 };
 
-template <class D> using Allocator = allocator<D, SharedMemory::segment_manager>;
+template <class D>
+using Allocator = allocator<D, SharedMemory::segment_manager>;
 
 } // namespace bhome_shm
 
diff --git a/src/shm_queue.cpp b/src/shm_queue.cpp
index 421cebf..cf4c8b4 100644
--- a/src/shm_queue.cpp
+++ b/src/shm_queue.cpp
@@ -17,52 +17,57 @@
  */
 
 #include "shm_queue.h"
-#include <boost/uuid/uuid_io.hpp>
-#include <boost/uuid/uuid_generators.hpp>
 #include "bh_util.h"
+#include <boost/uuid/uuid_generators.hpp>
+#include <boost/uuid/uuid_io.hpp>
 
-namespace bhome_shm {
-using namespace bhome_msg;	
+namespace bhome_shm
+{
+using namespace bhome_msg;
 using namespace boost::interprocess;
 using namespace boost::uuids;
 
-namespace {
-std::string MsgQIdToName(const MQId& id) { return "shmq" + to_string(id); }
+namespace
+{
+std::string MsgQIdToName(const MQId &id) { return "shmq" + to_string(id); }
 // MQId EmptyId() { return nil_uuid(); }
 MQId NewId() { return random_generator()(); }
-const int AdjustMQLength(const int len) {
-    const int kMaxLength = 10000; 
-    const int kDefaultLen = 12;
-    if (len <= 0) {
-        return kDefaultLen;
-    } else if (len < kMaxLength) {
-        return len;
-    } else {
-        return kMaxLength;
-    }
+const int AdjustMQLength(const int len)
+{
+	const int kMaxLength = 10000;
+	const int kDefaultLen = 12;
+	if (len <= 0) {
+		return kDefaultLen;
+	} else if (len < kMaxLength) {
+		return len;
+	} else {
+		return kMaxLength;
+	}
 }
 
-}
+} // namespace
 
 // ShmMsgQueue memory usage: (320 + 16*length) bytes, length >= 2
-ShmMsgQueue::ShmMsgQueue(const MQId &id, ShmType &segment, const int len):
-Super(segment, MsgQIdToName(id), AdjustMQLength(len), segment.get_segment_manager()),
-id_(id)
-{}
+ShmMsgQueue::ShmMsgQueue(const MQId &id, ShmType &segment, const int len) :
+    Super(segment, MsgQIdToName(id), AdjustMQLength(len), segment.get_segment_manager()),
+    id_(id)
+{
+}
 
-ShmMsgQueue::ShmMsgQueue(ShmType &segment, const int len):
-ShmMsgQueue(NewId(), segment, len)
-{}
+ShmMsgQueue::ShmMsgQueue(ShmType &segment, const int len) :
+    ShmMsgQueue(NewId(), segment, len)
+{
+}
 
 ShmMsgQueue::~ShmMsgQueue()
 {
-    Remove();
+	Remove();
 }
 
 bool ShmMsgQueue::Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms)
 {
-    Queue *remote = Find(shm, MsgQIdToName(remote_id));
-    return remote && remote->Write(msg, timeout_ms, [](const MsgI&msg){msg.AddRef();});
+	Queue *remote = Find(shm, MsgQIdToName(remote_id));
+	return remote && remote->Write(msg, timeout_ms, [](const MsgI &msg) { msg.AddRef(); });
 }
 
 // bool ShmMsgQueue::Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms)
@@ -73,15 +78,15 @@
 
 bool ShmMsgQueue::Send(const MQId &remote_id, const BHMsg &data, const int timeout_ms)
 {
-    MsgI msg;
-    if(msg.Make(shm(), data)) {
-        if(Send(remote_id, msg, timeout_ms)) {
-            return true;
-        } else {
-            msg.Release(shm());
-        }
-    }
-    return false;
+	MsgI msg;
+	if (msg.Make(shm(), data)) {
+		if (Send(remote_id, msg, timeout_ms)) {
+			return true;
+		} else {
+			msg.Release(shm());
+		}
+	}
+	return false;
 }
 
 /*
@@ -105,13 +110,13 @@
 //*/
 bool ShmMsgQueue::Recv(BHMsg &msg, const int timeout_ms)
 {
-    MsgI imsg;
-    if (Read(imsg, timeout_ms)) {
-        DEFER1(imsg.Release(shm()););
-        return imsg.Unpack(msg);
-    } else {
-        return false;
-    }
+	MsgI imsg;
+	if (Read(imsg, timeout_ms)) {
+		DEFER1(imsg.Release(shm()););
+		return imsg.Unpack(msg);
+	} else {
+		return false;
+	}
 }
 
 } // namespace bhome_shm
diff --git a/src/shm_queue.h b/src/shm_queue.h
index 60b1862..9064f55 100644
--- a/src/shm_queue.h
+++ b/src/shm_queue.h
@@ -19,111 +19,125 @@
 #ifndef SHM_QUEUE_JE0OEUP3
 #define SHM_QUEUE_JE0OEUP3
 
-#include "shm.h"
 #include "msg.h"
+#include "shm.h"
 #include <boost/circular_buffer.hpp>
 #include <boost/date_time/posix_time/posix_time.hpp>
 
-namespace bhome_shm {
-	
-template <class D> using Circular = boost::circular_buffer<D, Allocator<D> >;
+namespace bhome_shm
+{
+
+template <class D>
+using Circular = boost::circular_buffer<D, Allocator<D>>;
 
 typedef boost::uuids::uuid MQId;
 
 template <class D>
 class SharedQueue : private Circular<D>
 {
-    typedef Circular<D> Super;
-    Mutex mutex_;
-    Cond cond_read_;
-    Cond cond_write_;
-    Mutex & mutex() { return mutex_; }
+	typedef Circular<D> Super;
+	Mutex mutex_;
+	Cond cond_read_;
+	Cond cond_write_;
+	Mutex &mutex() { return mutex_; }
 
-    static boost::posix_time::ptime MSFromNow(const int ms)
-    {
-        using namespace boost::posix_time;
-        ptime cur = boost::posix_time::microsec_clock::universal_time();
-        return cur + millisec(ms);
-    }
+	static boost::posix_time::ptime MSFromNow(const int ms)
+	{
+		using namespace boost::posix_time;
+		ptime cur = boost::posix_time::microsec_clock::universal_time();
+		return cur + millisec(ms);
+	}
 
 public:
-    SharedQueue(const uint32_t len, Allocator<D> const& alloc):Super(len, alloc) {}
-    using Super::size;
-    using Super::capacity;
-    template <class Iter, class OnWrite>
-    int Write(Iter begin, Iter end, const int timeout_ms, const OnWrite &onWrite) {
-        int n = 0;
-        if (begin != end) {
-            auto endtime = MSFromNow(timeout_ms);
-            Guard lock(mutex());
-            while (cond_write_.timed_wait(lock, endtime, [&]() { return !this->full(); })) {
-                onWrite(*begin);
-                this->push_back(*begin);
-                ++n;
-                cond_read_.notify_one();
-                if (++begin == end) {
-                    break;
-                }
-            }
-        }
-        return n;
-    }
-    template <class OnWrite>
-    bool Write(const D &buf, const int timeout_ms, const OnWrite &onWrite) {
-        return Write(&buf, (&buf)+1, timeout_ms, onWrite);
-    }
-    bool Write(const D &buf, const int timeout_ms) { return Write(buf, timeout_ms, [](const D &buf){}); }
+	SharedQueue(const uint32_t len, Allocator<D> const &alloc) :
+	    Super(len, alloc) {}
+	using Super::capacity;
+	using Super::size;
+	template <class Iter, class OnWrite>
+	int Write(Iter begin, Iter end, const int timeout_ms, const OnWrite &onWrite)
+	{
+		int n = 0;
+		if (begin != end) {
+			auto endtime = MSFromNow(timeout_ms);
+			Guard lock(mutex());
+			while (cond_write_.timed_wait(lock, endtime, [&]() { return !this->full(); })) {
+				onWrite(*begin);
+				this->push_back(*begin);
+				++n;
+				cond_read_.notify_one();
+				if (++begin == end) {
+					break;
+				}
+			}
+		}
+		return n;
+	}
 
-    template <class OnData>
-    bool Read(const int timeout_ms, OnData onData){
-        int n = 0;
-        auto endtime = MSFromNow(timeout_ms);
-        Guard lock(mutex());
-        while (cond_read_.timed_wait(lock, endtime, [&]() { return !this->empty(); })) {
-            const bool more = onData(this->front());
-            this->pop_front();
-            cond_write_.notify_one();
-            ++n;
-            if (!more) {
-                break;
-            }
-        }
-        return n;
-    }
-    bool Read(D &buf, const int timeout_ms){
-        auto read1 = [&](D &d) { 
-            using std::swap;
-            swap(buf, d);
-            return false;
-        };
-        return Read(timeout_ms, read1) == 1;
-    }
+	template <class OnWrite>
+	bool Write(const D &buf, const int timeout_ms, const OnWrite &onWrite)
+	{
+		return Write(&buf, (&buf) + 1, timeout_ms, onWrite);
+	}
+	bool Write(const D &buf, const int timeout_ms)
+	{
+		return Write(buf, timeout_ms, [](const D &buf) {});
+	}
+
+	template <class OnData>
+	bool Read(const int timeout_ms, OnData onData)
+	{
+		int n = 0;
+		auto endtime = MSFromNow(timeout_ms);
+		Guard lock(mutex());
+		while (cond_read_.timed_wait(lock, endtime, [&]() { return !this->empty(); })) {
+			const bool more = onData(this->front());
+			this->pop_front();
+			cond_write_.notify_one();
+			++n;
+			if (!more) {
+				break;
+			}
+		}
+		return n;
+	}
+
+	bool Read(D &buf, const int timeout_ms)
+	{
+		auto read1 = [&](D &d) {
+			using std::swap;
+			swap(buf, d);
+			return false;
+		};
+		return Read(timeout_ms, read1) == 1;
+	}
 };
 
 using namespace bhome_msg;
 
-class ShmMsgQueue : private ShmObject<SharedQueue<MsgI> >
+class ShmMsgQueue : private ShmObject<SharedQueue<MsgI>>
 {
-    typedef ShmObject<SharedQueue<MsgI> > Super;
-    typedef Super::Data Queue;
-    bool Write(const MsgI &buf, const int timeout_ms) { return data()->Write(buf, timeout_ms); }
-    bool Read(MsgI &buf, const int timeout_ms) { return data()->Read(buf, timeout_ms); }
-    MQId id_;
-protected:
-    ShmMsgQueue(const std::string &raw_name, ShmType &segment, const int len); // internal use.
-public:
-    ShmMsgQueue(const MQId &id, ShmType &segment, const int len);
-    ShmMsgQueue(ShmType &segment, const int len);
-    ~ShmMsgQueue();
-    const MQId &Id() const { return id_; }
+	typedef ShmObject<SharedQueue<MsgI>> Super;
+	typedef Super::Data Queue;
+	bool Write(const MsgI &buf, const int timeout_ms) { return data()->Write(buf, timeout_ms); }
+	bool Read(MsgI &buf, const int timeout_ms) { return data()->Read(buf, timeout_ms); }
+	MQId id_;
 
-    bool Recv(BHMsg &msg, const int timeout_ms);
-    bool Recv(MsgI &msg, const int timeout_ms) { return Read(msg, timeout_ms); }
-    bool Send(const MQId &remote_id, const BHMsg &msg, const int timeout_ms);
-    static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms);
-    bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms) {
-        return Send(shm(), remote_id, msg, timeout_ms);
-    }
+protected:
+	ShmMsgQueue(const std::string &raw_name, ShmType &segment, const int len); // internal use.
+public:
+	ShmMsgQueue(const MQId &id, ShmType &segment, const int len);
+	ShmMsgQueue(ShmType &segment, const int len);
+	~ShmMsgQueue();
+	const MQId &Id() const { return id_; }
+
+	bool Recv(BHMsg &msg, const int timeout_ms);
+	bool Recv(MsgI &msg, const int timeout_ms) { return Read(msg, timeout_ms); }
+	bool Send(const MQId &remote_id, const BHMsg &msg, const int timeout_ms);
+	static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms);
+	bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms)
+	{
+		return Send(shm(), remote_id, msg, timeout_ms);
+	}
 };
 
 } // namespace bhome_shm
diff --git a/src/socket.cpp b/src/socket.cpp
index 1c28bfa..21928b8 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -17,118 +17,118 @@
  */
 
 #include "socket.h"
-#include <chrono>
-#include "msg.h"
-#include "defs.h"
 #include "bh_util.h"
+#include "defs.h"
+#include "msg.h"
+#include <chrono>
 
 using namespace bhome_msg;
 using namespace bhome_shm;
 using namespace std::chrono_literals;
 
-namespace {
-
-int GetSocketDefaultLen(ShmSocket::Type type) 
+namespace
 {
-    switch (type) {
-        case ShmSocket::eSockRequest : return 12;
-        case ShmSocket::eSockReply : return 64;
-        case ShmSocket::eSockPublish : return 0;
-        case ShmSocket::eSockSubscribe : return 64;
-        default: return 0;
-    }
-}
 
-
-}
-
-ShmSocket::ShmSocket(Type type, bhome_shm::SharedMemory &shm)
-    : shm_(shm),
-      type_(type),
-      run_(false)
+int GetSocketDefaultLen(ShmSocket::Type type)
 {
-    int len = GetSocketDefaultLen(type);
-    if (len != 0) {
-        mq_.reset(new Queue(shm_, len));
-
-        auto RecvProc = [this](){
-            while (run_) {
-                try {
-                    std::unique_lock<std::mutex> lk(mutex_);
-                    if (cv_recv_cb_.wait_for(lk, 100ms, [this](){ return HasRecvCB(); })) {
-                        BHMsg msg;
-                        if (mq_->Recv(msg, 100)) {
-                            this->onRecv_(msg);
-                        }
-                    }
-                } catch (...) {}
-            }
-        };
-        run_.store(true);
-        workers_.emplace_back(RecvProc);
-    }
+	switch (type) {
+	case ShmSocket::eSockRequest: return 12;
+	case ShmSocket::eSockReply: return 64;
+	case ShmSocket::eSockPublish: return 0;
+	case ShmSocket::eSockSubscribe: return 64;
+	default: return 0;
+	}
 }
-ShmSocket::ShmSocket(Type type)
-    : ShmSocket(type, BHomeShm())
+
+} // namespace
+
+ShmSocket::ShmSocket(Type type, bhome_shm::SharedMemory &shm) :
+    shm_(shm), type_(type), run_(false)
+{
+	int len = GetSocketDefaultLen(type);
+	if (len != 0) {
+		mq_.reset(new Queue(shm_, len));
+
+		auto RecvProc = [this]() {
+			while (run_) {
+				try {
+					std::unique_lock<std::mutex> lk(mutex_);
+					if (cv_recv_cb_.wait_for(lk, 100ms, [this]() { return HasRecvCB(); })) {
+						BHMsg msg;
+						if (mq_->Recv(msg, 100)) {
+							this->onRecv_(msg);
+						}
+					}
+				} catch (...) {
+				}
+			}
+		};
+		run_.store(true);
+		workers_.emplace_back(RecvProc);
+	}
+}
+
+ShmSocket::ShmSocket(Type type) :
+    ShmSocket(type, BHomeShm())
 {
 }
 
 ShmSocket::~ShmSocket()
 {
-    Stop();
+	Stop();
 }
 
 bool ShmSocket::Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms)
 {
-    if (type_ != eSockPublish) {
-        return false;
-    }
-    assert(!mq_);
-    try {
-        MsgI imsg;
-        if (!imsg.MakeRC(shm_, MakePub(topic, data, size))) {
-            return false;
-        }
-        DEFER1(imsg.Release(shm_));
-        return Queue::Send(shm_, kBHBusQueueId, imsg, timeout_ms);
-        
-    } catch (...) {
-        return false;
-    }
+	if (type_ != eSockPublish) {
+		return false;
+	}
+	assert(!mq_);
+	try {
+		MsgI imsg;
+		if (!imsg.MakeRC(shm_, MakePub(topic, data, size))) {
+			return false;
+		}
+		DEFER1(imsg.Release(shm_));
+		return Queue::Send(shm_, kBHBusQueueId, imsg, timeout_ms);
+
+	} catch (...) {
+		return false;
+	}
 }
 
 bool ShmSocket::Subscribe(const std::vector<std::string> &topics, const int timeout_ms)
 {
-    if (type_ != eSockSubscribe) {
-        return false;
-    }
-    assert(mq_);
-    try {
-        return mq_->Send(kBHBusQueueId, MakeSub(mq_->Id(), topics), timeout_ms);
-    } catch (...) {
-        return false;
-    }
+	if (type_ != eSockSubscribe) {
+		return false;
+	}
+	assert(mq_);
+	try {
+		return mq_->Send(kBHBusQueueId, MakeSub(mq_->Id(), topics), timeout_ms);
+	} catch (...) {
+		return false;
+	}
 }
 
 bool ShmSocket::SetRecvCallback(const RecvCB &onRecv)
 {
-    std::lock_guard<std::mutex> lock(mutex_);
-    onRecv_ = onRecv;
-    cv_recv_cb_.notify_one();
-    return true;
+	std::lock_guard<std::mutex> lock(mutex_);
+	onRecv_ = onRecv;
+	cv_recv_cb_.notify_one();
+	return true;
 }
 
 bool ShmSocket::HasRecvCB()
 {
-    return static_cast<bool>(onRecv_);
+	return static_cast<bool>(onRecv_);
 }
 
 void ShmSocket::Stop()
 {
-    run_ = false;
-    for (auto &t : workers_) {
-        if (t.joinable()) {
-            t.join();
-        }
-    }
+	run_ = false;
+	for (auto &t : workers_) {
+		if (t.joinable()) {
+			t.join();
+		}
+	}
 }
\ No newline at end of file
diff --git a/src/socket.h b/src/socket.h
index e65ac83..92c1b73 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -20,55 +20,56 @@
 #define SOCKET_GWTJHBPO
 
 #include "shm_queue.h"
-#include <vector>
-#include <thread>
-#include <memory>
-#include <functional>
-#include <mutex>
-#include <condition_variable>
 #include <atomic>
+#include <condition_variable>
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <thread>
+#include <vector>
 
 class ShmSocket
 {
-    typedef bhome_shm::ShmMsgQueue Queue;
+	typedef bhome_shm::ShmMsgQueue Queue;
+
 public:
-    enum Type {
-        eSockRequest,
-        eSockReply,
-        eSockSubscribe,
-        eSockPublish,
-    };
-    typedef std::function<void (bhome_msg::BHMsg &msg)> RecvCB;
+	enum Type {
+		eSockRequest,
+		eSockReply,
+		eSockSubscribe,
+		eSockPublish,
+	};
+	typedef std::function<void(bhome_msg::BHMsg &msg)> RecvCB;
 
-    ShmSocket(Type type);
-    ShmSocket(Type type, bhome_shm::SharedMemory &shm);
-    ~ShmSocket();
+	ShmSocket(Type type);
+	ShmSocket(Type type, bhome_shm::SharedMemory &shm);
+	~ShmSocket();
 
-    // bool Request(const std::string &topic, const void *data, const size_t size, onReply);
-    bool RequestAndWait() { return false; } // call Request, and wait onReply notify cv
+	// bool Request(const std::string &topic, const void *data, const size_t size, onReply);
+	bool RequestAndWait() { return false; } // call Request, and wait onReply notify cv
 
-    // bool HandleRequest(onData);
-    bool ReadRequest(); // exclude with HandleRequest
-    bool SendReply();   // exclude with HandleRequest
+	// bool HandleRequest(onData);
+	bool ReadRequest(); // exclude with HandleRequest
+	bool SendReply();   // exclude with HandleRequest
 
-    bool Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms);
-    bool Subscribe(const std::vector<std::string> &topics, const int timeout_ms);
-    bool RecvSub(std::string &topic, std::string &data, const int timeout_ms);
-    bool SetRecvCallback(const RecvCB &onRecv);
+	bool Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms);
+	bool Subscribe(const std::vector<std::string> &topics, const int timeout_ms);
+	bool RecvSub(std::string &topic, std::string &data, const int timeout_ms);
+	bool SetRecvCallback(const RecvCB &onRecv);
+
 private:
-    bool HasRecvCB();
-    void Stop();
+	bool HasRecvCB();
+	void Stop();
 
-    bhome_shm::SharedMemory &shm_;
-    Type type_;
-    std::vector<std::thread> workers_;
-    std::mutex mutex_;
-    std::condition_variable cv_recv_cb_;
-    std::atomic<bool> run_;
-    RecvCB onRecv_;
+	bhome_shm::SharedMemory &shm_;
+	Type type_;
+	std::vector<std::thread> workers_;
+	std::mutex mutex_;
+	std::condition_variable cv_recv_cb_;
+	std::atomic<bool> run_;
+	RecvCB onRecv_;
 
-    std::unique_ptr<Queue> mq_;
+	std::unique_ptr<Queue> mq_;
 };
-
 
 #endif // end of include guard: SOCKET_GWTJHBPO
diff --git a/utest/simple_tests.cpp b/utest/simple_tests.cpp
index eff0209..06093fd 100644
--- a/utest/simple_tests.cpp
+++ b/utest/simple_tests.cpp
@@ -18,126 +18,123 @@
 
 #include "util.h"
 
-struct s1000 { char a[1000]; };
-
+struct s1000 {
+	char a[1000];
+};
 
 BOOST_AUTO_TEST_CASE(BasicTest)
 {
-    const std::string shm_name("basic");
-    ShmRemover auto_remove(shm_name);
-    SharedMemory shm(shm_name, 1024*1024*10);
-    auto Avail = [&]() { return shm.get_free_memory(); };
+	const std::string shm_name("basic");
+	ShmRemover auto_remove(shm_name);
+	SharedMemory shm(shm_name, 1024 * 1024 * 10);
+	auto Avail = [&]() { return shm.get_free_memory(); };
 
-    offset_ptr<const void> p;
-    BOOST_CHECK(!p);
-    BOOST_CHECK(p.get() == 0);
-    p = 0;
-    BOOST_CHECK(!p);
-    BOOST_CHECK(p.get() == 0);
-    const char *str = "basic";
-    p = str;
-    BOOST_CHECK(p);
-    BOOST_CHECK(p.get() == str);
-    p = 0;
-    BOOST_CHECK(!p);
-    BOOST_CHECK(p.get() == 0);
+	offset_ptr<const void> p;
+	BOOST_CHECK(!p);
+	BOOST_CHECK(p.get() == 0);
+	p = 0;
+	BOOST_CHECK(!p);
+	BOOST_CHECK(p.get() == 0);
+	const char *str = "basic";
+	p               = str;
+	BOOST_CHECK(p);
+	BOOST_CHECK(p.get() == str);
+	p = 0;
+	BOOST_CHECK(!p);
+	BOOST_CHECK(p.get() == 0);
 
+	auto init_avail = Avail();
 
-    auto init_avail = Avail();
+	auto BasicTest = [&](int tid, int nloop) {
+		auto Code = [&](int id) {
+			typedef ShmObject<s1000> Int;
+			std::string name = std::to_string(id);
+			auto a0          = Avail();
+			Int i1(shm, name);
+			auto a1 = Avail();
+			BOOST_CHECK_LT(a1, a0);
+			printf("s1000 size: %ld\n", a0 - a1);
+			i1->a[0] = 5;
+			Int i2(shm, name);
+			auto a2 = Avail();
+			BOOST_CHECK_EQUAL(a1, a2);
+			BOOST_CHECK_EQUAL(i1.data(), i2.data());
+			int i = i1.Remove();
+			BOOST_CHECK_EQUAL(Avail(), a0);
 
-    auto BasicTest = [&](int tid, int nloop) {
-        auto Code = [&](int id) {
+			{
+				auto old = Avail();
+				void *p  = shm.Alloc(1024);
+				shm.Dealloc(p);
+				BOOST_CHECK_EQUAL(old, Avail());
+			}
 
-            typedef ShmObject<s1000> Int;
-            std::string name = std::to_string(id);
-            auto a0 = Avail();
-            Int i1(shm, name);
-            auto a1 = Avail();
-            BOOST_CHECK_LT(a1, a0);
-            printf("s1000 size: %ld\n", a0 - a1);
-            i1->a[0] = 5;
-            Int i2(shm, name);
-            auto a2 = Avail();
-            BOOST_CHECK_EQUAL(a1, a2);
-            BOOST_CHECK_EQUAL(i1.data(), i2.data());
-            int i = i1.Remove();
-            BOOST_CHECK_EQUAL(Avail(), a0);
+			bool r = shared_memory_object::remove(shm_name.c_str());
+			BOOST_CHECK(r);
+		};
+		for (int i = 0; i < nloop; ++i) {
+			Code(i + tid * nloop);
+		}
+	};
 
-            {
-                auto old = Avail();
-                void *p = shm.Alloc(1024);
-                shm.Dealloc(p);
-                BOOST_CHECK_EQUAL(old, Avail());
-            }
-
-            bool r = shared_memory_object::remove(shm_name.c_str());
-            BOOST_CHECK(r);
-        };
-        for (int i = 0; i < nloop; ++i) {
-            Code(i + tid*nloop);
-        }
-    };
-    
-    // boost::timer::auto_cpu_timer timer;
-    ThreadManager threads;
-    int nthread = 1;
-    int nloop = 1;
-    for (int i = 0; i < nthread; ++i)
-    {
-        threads.Launch(BasicTest, i, nloop);
-    }
-    BOOST_CHECK_EQUAL(init_avail, Avail());
+	// boost::timer::auto_cpu_timer timer;
+	ThreadManager threads;
+	int nthread = 1;
+	int nloop   = 1;
+	for (int i = 0; i < nthread; ++i) {
+		threads.Launch(BasicTest, i, nloop);
+	}
+	BOOST_CHECK_EQUAL(init_avail, Avail());
 }
 
 BOOST_AUTO_TEST_CASE(ForkTest)
 {
-    ProcessManager procs;
-    const int nproc = 10;
+	ProcessManager procs;
+	const int nproc = 10;
 
-    printf("Testing fork:\n");
+	printf("Testing fork:\n");
 
-    auto child = [&](int id) {
-        std::this_thread::sleep_for(100ms *id);
-        printf("child id: %3d/%d ends\r", id, nproc);
-    };
+	auto child = [&](int id) {
+		std::this_thread::sleep_for(100ms * id);
+		printf("child id: %3d/%d ends\r", id, nproc);
+	};
 
-    for (int i = 0; i < nproc; ++i) {
-        procs.Launch(child, i+1);
-    }
+	for (int i = 0; i < nproc; ++i) {
+		procs.Launch(child, i + 1);
+	}
 }
 
 BOOST_AUTO_TEST_CASE(TimedWaitTest)
 {
-    const std::string shm_name("shm_wait");
-    ShmRemover auto_remove(shm_name);
-    SharedMemory shm(shm_name, 1024*1024);
-    ShmMsgQueue q(shm, 64);
-    for (int i = 0; i < 2; ++i) {
-        int ms = i * 100;
-        printf("Timeout Test %4d: ", ms);
-        boost::timer::auto_cpu_timer timer;
-        BHMsg msg;
-        bool r = q.Recv(msg, ms);
-        BOOST_CHECK(!r);
-    }
+	const std::string shm_name("shm_wait");
+	ShmRemover auto_remove(shm_name);
+	SharedMemory shm(shm_name, 1024 * 1024);
+	ShmMsgQueue q(shm, 64);
+	for (int i = 0; i < 2; ++i) {
+		int ms = i * 100;
+		printf("Timeout Test %4d: ", ms);
+		boost::timer::auto_cpu_timer timer;
+		BHMsg msg;
+		bool r = q.Recv(msg, ms);
+		BOOST_CHECK(!r);
+	}
 }
 
 BOOST_AUTO_TEST_CASE(RefCountTest)
 {
-    const std::string shm_name("ShmRefCount");
-    ShmRemover auto_remove(shm_name);
-    SharedMemory shm(shm_name, 1024*1024);
+	const std::string shm_name("ShmRefCount");
+	ShmRemover auto_remove(shm_name);
+	SharedMemory shm(shm_name, 1024 * 1024);
 
-    MsgI m0(shm.Alloc(1000), shm.New<RefCount>());
-    BOOST_CHECK(m0.IsCounted());
-    BOOST_CHECK_EQUAL(m0.Count(), 1);
-    MsgI m1 = m0;
-    BOOST_CHECK(m1.IsCounted());
-    BOOST_CHECK_EQUAL(m1.AddRef(), 2);
-    BOOST_CHECK_EQUAL(m0.AddRef(), 3);
-    BOOST_CHECK_EQUAL(m0.Release(shm), 2);
-    BOOST_CHECK_EQUAL(m0.Release(shm), 1);
-    BOOST_CHECK_EQUAL(m1.Release(shm), 0);
-    BOOST_CHECK(!m1.IsCounted());
+	MsgI m0(shm.Alloc(1000), shm.New<RefCount>());
+	BOOST_CHECK(m0.IsCounted());
+	BOOST_CHECK_EQUAL(m0.Count(), 1);
+	MsgI m1 = m0;
+	BOOST_CHECK(m1.IsCounted());
+	BOOST_CHECK_EQUAL(m1.AddRef(), 2);
+	BOOST_CHECK_EQUAL(m0.AddRef(), 3);
+	BOOST_CHECK_EQUAL(m0.Release(shm), 2);
+	BOOST_CHECK_EQUAL(m0.Release(shm), 1);
+	BOOST_CHECK_EQUAL(m1.Release(shm), 0);
+	BOOST_CHECK(!m1.IsCounted());
 }
-
diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp
index b1cba46..35465bb 100644
--- a/utest/speed_test.cpp
+++ b/utest/speed_test.cpp
@@ -22,176 +22,175 @@
 
 BOOST_AUTO_TEST_CASE(SpeedTest)
 {
-    const std::string shm_name("ShmSpeed");
-    ShmRemover auto_remove(shm_name);
-    const int mem_size = 1024*1024*50;
-    MQId id = boost::uuids::random_generator()();
-    const int timeout = 100;
-    const uint32_t data_size = 4000;
+	const std::string shm_name("ShmSpeed");
+	ShmRemover auto_remove(shm_name);
+	const int mem_size       = 1024 * 1024 * 50;
+	MQId id                  = boost::uuids::random_generator()();
+	const int timeout        = 100;
+	const uint32_t data_size = 4000;
 
-    auto Writer = [&](int writer_id, uint64_t n) {
-        SharedMemory shm(shm_name, mem_size);
-        ShmMsgQueue mq(shm, 64);
-        std::string str(data_size, 'a');
-        MsgI msg;
-        DEFER1(msg.Release(shm););
-        msg.MakeRC(shm, MakeRequest(mq.Id(), str.data(), str.size()));
-        for (uint64_t i = 0; i < n; ++i) {
-            // mq.Send(id, str.data(), str.size(), timeout);
-            mq.Send(id, msg, timeout);
-        }
-    };
-    auto Reader = [&](int reader_id, std::atomic<bool> *run, bool isfork){
-        SharedMemory shm(shm_name, mem_size);
-        ShmMsgQueue mq(id, shm, 1000);
-        while(*run) {
-            BHMsg msg;
-            if (mq.Recv(msg, timeout)) {
-                // ok
-            } else if (isfork) {
-                exit(0); // for forked quit after 1s.
-            }
-        }
-    };
-    auto State = [&](std::atomic<bool> *run){
-        SharedMemory shm(shm_name, mem_size);
-        auto init = shm.get_free_memory();
-        printf("shm init : %ld\n", init);
-        while (*run) {
-            auto cur = shm.get_free_memory();
-            printf("shm used : %8ld/%ld\n", init - cur, init);
-            std::this_thread::sleep_for(1s);
-        }
-    };
+	auto Writer = [&](int writer_id, uint64_t n) {
+		SharedMemory shm(shm_name, mem_size);
+		ShmMsgQueue mq(shm, 64);
+		std::string str(data_size, 'a');
+		MsgI msg;
+		DEFER1(msg.Release(shm););
+		msg.MakeRC(shm, MakeRequest(mq.Id(), str.data(), str.size()));
+		for (uint64_t i = 0; i < n; ++i) {
+			// mq.Send(id, str.data(), str.size(), timeout);
+			mq.Send(id, msg, timeout);
+		}
+	};
+	auto Reader = [&](int reader_id, std::atomic<bool> *run, bool isfork) {
+		SharedMemory shm(shm_name, mem_size);
+		ShmMsgQueue mq(id, shm, 1000);
+		while (*run) {
+			BHMsg msg;
+			if (mq.Recv(msg, timeout)) {
+				// ok
+			} else if (isfork) {
+				exit(0); // for forked quit after 1s.
+			}
+		}
+	};
+	auto State = [&](std::atomic<bool> *run) {
+		SharedMemory shm(shm_name, mem_size);
+		auto init = shm.get_free_memory();
+		printf("shm init : %ld\n", init);
+		while (*run) {
+			auto cur = shm.get_free_memory();
+			printf("shm used : %8ld/%ld\n", init - cur, init);
+			std::this_thread::sleep_for(1s);
+		}
+	};
 
-    int nwriters[] = {1,2,4};
-    int nreaders[] = {1,2};
+	int nwriters[] = {1, 2, 4};
+	int nreaders[] = {1, 2};
 
-    auto Test = [&](auto &www, auto &rrr, bool isfork) {
-        for (auto nreader : nreaders) {
-            for (auto nwriter : nwriters) {
-                const uint64_t nmsg = 1000 * 1000 * 10 / nwriter;
-                const uint64_t total_msg = nmsg * nwriter;
-                std::atomic<bool> run(true);
-                std::this_thread::sleep_for(10ms);
-                boost::timer::auto_cpu_timer timer;
-                for (int i = 0; i < nreader; ++i) {
-                    rrr.Launch(Reader, i, &run, isfork);
-                }
-                for (int i = 0; i < nwriter; ++i) {
-                    www.Launch(Writer, i, nmsg);
-                }
-                www.WaitAll();
-                run.store(false);
-                rrr.WaitAll();
-                printf("Write %ld msg  R(%3d) W(%3d), : ", total_msg, nreader, nwriter);
-            }
-        }
-    };
+	auto Test = [&](auto &www, auto &rrr, bool isfork) {
+		for (auto nreader : nreaders) {
+			for (auto nwriter : nwriters) {
+				const uint64_t nmsg      = 1000 * 1000 * 10 / nwriter;
+				const uint64_t total_msg = nmsg * nwriter;
+				std::atomic<bool> run(true);
+				std::this_thread::sleep_for(10ms);
+				boost::timer::auto_cpu_timer timer;
+				for (int i = 0; i < nreader; ++i) {
+					rrr.Launch(Reader, i, &run, isfork);
+				}
+				for (int i = 0; i < nwriter; ++i) {
+					www.Launch(Writer, i, nmsg);
+				}
+				www.WaitAll();
+				run.store(false);
+				rrr.WaitAll();
+				printf("Write %ld msg  R(%3d) W(%3d), : ", total_msg, nreader, nwriter);
+			}
+		}
+	};
 
-    std::atomic<bool> run(true);
-    ThreadManager state;
-    state.Launch(State, &run);
-    // typedef ProcessManager Manager;
-    // typedef ThreadManager Manager;
-    // const bool isfork = IsSameType<Manager, ProcessManager>::value;
-    ProcessManager pw, pr;
-    printf("================ Testing process io: =======================================================\n");
-    Test(pw, pr, true);
-    ThreadManager tw, tr;
-    printf("---------------- Testing thread io:  -------------------------------------------------------\n");
-    Test(tw, tr, false);
-    run.store(false);
+	std::atomic<bool> run(true);
+	ThreadManager state;
+	state.Launch(State, &run);
+	// typedef ProcessManager Manager;
+	// typedef ThreadManager Manager;
+	// const bool isfork = IsSameType<Manager, ProcessManager>::value;
+	ProcessManager pw, pr;
+	printf("================ Testing process io: =======================================================\n");
+	Test(pw, pr, true);
+	ThreadManager tw, tr;
+	printf("---------------- Testing thread io:  -------------------------------------------------------\n");
+	Test(tw, tr, false);
+	run.store(false);
 }
 
 // Request Reply Test
 BOOST_AUTO_TEST_CASE(RRTest)
 {
-    const std::string shm_name("ShmReqRep");
-    ShmRemover auto_remove(shm_name);
-    const int qlen = 64;
-    const size_t msg_length = 1000;
-    std::string msg_content(msg_length, 'a');
-    msg_content[20] = '\0';
+	const std::string shm_name("ShmReqRep");
+	ShmRemover auto_remove(shm_name);
+	const int qlen          = 64;
+	const size_t msg_length = 1000;
+	std::string msg_content(msg_length, 'a');
+	msg_content[20] = '\0';
 
-    SharedMemory shm(shm_name, 1024*1024*50);
-    auto Avail = [&]() { return shm.get_free_memory(); };
-    auto init_avail = Avail();
-    ShmMsgQueue srv(shm, qlen);
-    ShmMsgQueue cli(shm, qlen);
+	SharedMemory shm(shm_name, 1024 * 1024 * 50);
+	auto Avail      = [&]() { return shm.get_free_memory(); };
+	auto init_avail = Avail();
+	ShmMsgQueue srv(shm, qlen);
+	ShmMsgQueue cli(shm, qlen);
 
-    MsgI request_rc;
-    request_rc.MakeRC(shm, MakeRequest(cli.Id(), msg_content.data(), msg_content.size()));
-    MsgI reply_rc;
-    reply_rc.MakeRC(shm, MakeReply(msg_content.data(), msg_content.size()));
+	MsgI request_rc;
+	request_rc.MakeRC(shm, MakeRequest(cli.Id(), msg_content.data(), msg_content.size()));
+	MsgI reply_rc;
+	reply_rc.MakeRC(shm, MakeReply(msg_content.data(), msg_content.size()));
 
-    std::atomic<uint64_t> count(0);
+	std::atomic<uint64_t> count(0);
 
-    std::atomic<ptime> last_time(Now() - seconds(1));
-    std::atomic<uint64_t> last_count(0);
+	std::atomic<ptime> last_time(Now() - seconds(1));
+	std::atomic<uint64_t> last_count(0);
 
-    auto Client = [&](int cli_id, int nmsg){
-        for (int i = 0; i < nmsg; ++i) {
-            auto Req = [&]() {
-                return cli.Send(srv.Id(), MakeRequest(cli.Id(), msg_content.data(), msg_content.size()), 100);
-            };
-            auto ReqRC = [&]() { return cli.Send(srv.Id(), request_rc, 1000); };
+	auto Client = [&](int cli_id, int nmsg) {
+		for (int i = 0; i < nmsg; ++i) {
+			auto Req = [&]() {
+				return cli.Send(srv.Id(), MakeRequest(cli.Id(), msg_content.data(), msg_content.size()), 100);
+			};
+			auto ReqRC = [&]() { return cli.Send(srv.Id(), request_rc, 1000); };
 
-            if (!ReqRC()) {
-                printf("********** client send error.\n");
-                continue;
-            }
-            BHMsg msg;
-            if (!cli.Recv(msg, 1000)) {
-                printf("********** client recv error.\n");
-            } else {
-                ++count;
-                auto cur = Now();
-                if (last_time.exchange(cur) < cur) {
-                    std::cout << "time: " << cur;
-                    printf(", total msg:%10ld, speed:[%8ld/s], used mem:%8ld, refcount:%d\n",
-                           count.load(), count - last_count.exchange(count), init_avail - Avail(), request_rc.Count());
-                }
+			if (!ReqRC()) {
+				printf("********** client send error.\n");
+				continue;
+			}
+			BHMsg msg;
+			if (!cli.Recv(msg, 1000)) {
+				printf("********** client recv error.\n");
+			} else {
+				++count;
+				auto cur = Now();
+				if (last_time.exchange(cur) < cur) {
+					std::cout << "time: " << cur;
+					printf(", total msg:%10ld, speed:[%8ld/s], used mem:%8ld, refcount:%d\n",
+					       count.load(), count - last_count.exchange(count), init_avail - Avail(), request_rc.Count());
+				}
+			}
+		}
+	};
 
-            }
-        }
-    };
+	std::atomic<bool> stop(false);
+	auto Server = [&]() {
+		BHMsg req;
+		while (!stop) {
+			if (srv.Recv(req, 100) && req.type() == kMsgTypeRequest) {
+				auto &mqid = req.route()[0].mq_id();
+				MQId src_id;
+				memcpy(&src_id, mqid.data(), sizeof(src_id));
+				auto Reply = [&]() {
+					return srv.Send(src_id, MakeReply(msg_content.data(), msg_content.size()), 100);
+				};
+				auto ReplyRC = [&]() { return srv.Send(src_id, reply_rc, 100); };
 
-    std::atomic<bool> stop(false);
-    auto Server = [&](){
-        BHMsg req;
-        while (!stop) {
-            if (srv.Recv(req, 100) && req.type() == kMsgTypeRequest) {
-                auto &mqid = req.route()[0].mq_id();
-                MQId src_id;
-                memcpy(&src_id, mqid.data(), sizeof(src_id));
-                auto Reply = [&]() {
-                    return srv.Send(src_id, MakeReply(msg_content.data(), msg_content.size()), 100);
-                };
-                auto ReplyRC = [&](){ return srv.Send(src_id, reply_rc, 100); };
+				if (ReplyRC()) {
+				}
+			}
+		}
+	};
 
-                if (ReplyRC()) {
-                }
-            }
-        }
-    };
+	boost::timer::auto_cpu_timer timer;
+	DEFER1(printf("Request Reply Test:"););
 
-    boost::timer::auto_cpu_timer timer;
-    DEFER1(printf("Request Reply Test:"););
-
-    ThreadManager clients, servers;
-    for (int i = 0; i < qlen; ++i) { servers.Launch(Server); }
-    int ncli = 100*1;
-    uint64_t nmsg = 100*100*2;
-    printf("client threads: %d, msgs : %ld, total msg: %ld\n", ncli, nmsg, ncli * nmsg);
-    for (int i = 0; i < ncli; ++i) { clients.Launch(Client, i, nmsg); }
-    clients.WaitAll();
-    printf("request ok: %ld\n", count.load());
-    stop = true;
-    servers.WaitAll();
-    BOOST_CHECK(request_rc.IsCounted());
-    BOOST_CHECK_EQUAL(request_rc.Count(), 1);
-    request_rc.Release(shm);
-    BOOST_CHECK(!request_rc.IsCounted());
-    // BOOST_CHECK_THROW(reply.Count(), int);
+	ThreadManager clients, servers;
+	for (int i = 0; i < qlen; ++i) { servers.Launch(Server); }
+	int ncli      = 100 * 1;
+	uint64_t nmsg = 100 * 100 * 2;
+	printf("client threads: %d, msgs : %ld, total msg: %ld\n", ncli, nmsg, ncli * nmsg);
+	for (int i = 0; i < ncli; ++i) { clients.Launch(Client, i, nmsg); }
+	clients.WaitAll();
+	printf("request ok: %ld\n", count.load());
+	stop = true;
+	servers.WaitAll();
+	BOOST_CHECK(request_rc.IsCounted());
+	BOOST_CHECK_EQUAL(request_rc.Count(), 1);
+	request_rc.Release(shm);
+	BOOST_CHECK(!request_rc.IsCounted());
+	// BOOST_CHECK_THROW(reply.Count(), int);
 }
diff --git a/utest/utest.cpp b/utest/utest.cpp
index bb5c14d..473b04e 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -1,167 +1,172 @@
-#include <stdio.h>
-#include <string>
-#include <vector>
-#include <thread>
+#include "defs.h"
+#include "pubsub.h"
+#include "socket.h"
+#include "util.h"
 #include <atomic>
 #include <boost/uuid/uuid_generators.hpp>
 #include <boost/uuid/uuid_io.hpp>
-#include "pubsub.h"
-#include "defs.h"
-#include "util.h"
-#include "socket.h"
+#include <stdio.h>
+#include <string>
+#include <thread>
+#include <vector>
 
-template <class A, class B> struct IsSameType { static const bool value = false; };
-template <class A> struct IsSameType<A,A> { static const bool value = true; };
-
+template <class A, class B>
+struct IsSameType {
+	static const bool value = false;
+};
+template <class A>
+struct IsSameType<A, A> {
+	static const bool value = true;
+};
 
 BOOST_AUTO_TEST_CASE(Temp)
 {
-    std::string topics[] = {
-        "",
-        ".",
-        "a",
-        "sp",
-        "sport",
-        "sport.",
-        "sport.a",
-        "sport.a.b.c",
-        "sport.ab.c",
-        "sport.basketball",
-        "sport.football",
-    };
-    const char sep = '.';
-    auto Adjust = [&](const std::string &user_topic) {
-        if (user_topic.empty() || user_topic.back() == sep) {
-            return user_topic;
-        } else {
-            return user_topic + sep;
-        }
-    };
+	std::string topics[] = {
+	    "",
+	    ".",
+	    "a",
+	    "sp",
+	    "sport",
+	    "sport.",
+	    "sport.a",
+	    "sport.a.b.c",
+	    "sport.ab.c",
+	    "sport.basketball",
+	    "sport.football",
+	};
+	const char sep = '.';
+	auto Adjust = [&](const std::string &user_topic) {
+		if (user_topic.empty() || user_topic.back() == sep) {
+			return user_topic;
+		} else {
+			return user_topic + sep;
+		}
+	};
 
-    for (auto &t : topics) {
-        const std::string &a = Adjust(t);
-        printf("orig: %20s   adjusted: %20s   parts:[", ("'" + t + "'").c_str(), ('\'' + a + '\'').c_str());
+	for (auto &t : topics) {
+		const std::string &a = Adjust(t);
+		printf("orig: %20s   adjusted: %20s   parts:[", ("'" + t + "'").c_str(), ('\'' + a + '\'').c_str());
 
-        size_t pos = 0;
-        while (true) {
-            auto &topic = t;
-            pos = topic.find(kTopicSep, pos);
-            if (pos == topic.npos || ++pos == topic.size()) {
-                // Find1(std::string()); // sub all.
-                break;
-            } else {
-                printf("'%s',", topic.substr(0, pos).c_str());
-            }
-        }
-        printf("]\n");
-    }
+		size_t pos = 0;
+		while (true) {
+			auto &topic = t;
+			pos = topic.find(kTopicSep, pos);
+			if (pos == topic.npos || ++pos == topic.size()) {
+				// Find1(std::string()); // sub all.
+				break;
+			} else {
+				printf("'%s',", topic.substr(0, pos).c_str());
+			}
+		}
+		printf("]\n");
+	}
 }
 
 BOOST_AUTO_TEST_CASE(PubSubTest)
 {
-    const std::string shm_name("ShmPubSub");
-    ShmRemover auto_remove(shm_name); //remove twice? in case of killed?
-    SharedMemory shm(shm_name, 1024*1024*50);
-    auto Avail = [&]() { return shm.get_free_memory(); };
-    auto init_avail = Avail();
+	const std::string shm_name("ShmPubSub");
+	ShmRemover auto_remove(shm_name); //remove twice? in case of killed?
+	SharedMemory shm(shm_name, 1024 * 1024 * 50);
+	auto Avail = [&]() { return shm.get_free_memory(); };
+	auto init_avail = Avail();
 
-    BusManager bus(shm);
-    bus.Start(1);
-    std::this_thread::sleep_for(100ms);
+	BusManager bus(shm);
+	bus.Start(1);
+	std::this_thread::sleep_for(100ms);
 
-    std::atomic<uint64_t> count(0);
-    std::atomic<ptime> last_time(Now() - seconds(1));
-    std::atomic<uint64_t> last_count(0);
+	std::atomic<uint64_t> count(0);
+	std::atomic<ptime> last_time(Now() - seconds(1));
+	std::atomic<uint64_t> last_count(0);
 
-    const uint64_t nmsg = 100;
-    const int timeout = 1000;
-    auto Sub = [&](int id, const std::vector<std::string> &topics) {
-        ShmSocket client(ShmSocket::eSockSubscribe, shm);
-        bool r = client.Subscribe(topics, timeout);
-        std::mutex mutex;
-        std::condition_variable cv;
+	const uint64_t nmsg = 100;
+	const int timeout = 1000;
+	auto Sub = [&](int id, const std::vector<std::string> &topics) {
+		ShmSocket client(ShmSocket::eSockSubscribe, shm);
+		bool r = client.Subscribe(topics, timeout);
+		std::mutex mutex;
+		std::condition_variable cv;
 
-        int i = 0;
-        auto OnRecv = [&](BHMsg &msg) {
-            if (msg.type() != kMsgTypePublish) {
-                BOOST_CHECK(false);
-            }
-            DataPub pub;
-            if (!pub.ParseFromString(msg.body())) {
-                BOOST_CHECK(false);
-            }
-            ++count;
+		int i = 0;
+		auto OnRecv = [&](BHMsg &msg) {
+			if (msg.type() != kMsgTypePublish) {
+				BOOST_CHECK(false);
+			}
+			DataPub pub;
+			if (!pub.ParseFromString(msg.body())) {
+				BOOST_CHECK(false);
+			}
+			++count;
 
-            auto cur = Now();
-            if (last_time.exchange(cur) < cur) {
-                std::cout << "time: " << cur;
-                printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n",
-                       count.load(), count - last_count.exchange(count), init_avail - Avail());
-            }
-            if (++i >= nmsg*topics.size()) {
-                cv.notify_one();
-            }
-            // printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str());
-        };
-        client.SetRecvCallback(OnRecv);
+			auto cur = Now();
+			if (last_time.exchange(cur) < cur) {
+				std::cout << "time: " << cur;
+				printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n",
+				       count.load(), count - last_count.exchange(count), init_avail - Avail());
+			}
+			if (++i >= nmsg * topics.size()) {
+				cv.notify_one();
+			}
+			// printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str());
+		};
+		client.SetRecvCallback(OnRecv);
 
-        std::unique_lock<std::mutex> lk(mutex);
-        cv.wait(lk);
+		std::unique_lock<std::mutex> lk(mutex);
+		cv.wait(lk);
+	};
 
-    };
+	auto Pub = [&](const std::string &topic) {
+		ShmSocket provider(ShmSocket::eSockPublish, shm);
+		for (int i = 0; i < nmsg; ++i) {
+			std::string data = topic + std::to_string(i) + std::string(1000, '-');
 
-    auto Pub = [&](const std::string &topic) {
-        ShmSocket provider(ShmSocket::eSockPublish, shm);
-        for (int i = 0; i < nmsg; ++i) {
-            std::string data = topic + std::to_string(i) + std::string(1000, '-');
+			bool r = provider.Publish(topic, data.data(), data.size(), timeout);
+			// bool r = provider.Send(kBHBusQueueId, MakePub(topic, data.data(), data.size()), timeout);
+			if (!r) {
+				printf("pub ret: %s\n", r ? "ok" : "fail");
+			}
+		}
+	};
+	ThreadManager threads;
+	typedef std::vector<std::string> Topics;
+	Topics topics;
+	for (int i = 0; i < 100; ++i) {
+		topics.push_back("t" + std::to_string(i));
+	}
+	Topics part;
+	for (int i = 0; i < topics.size(); ++i) {
+		part.push_back(topics[i]);
+		threads.Launch(Sub, i, topics);
+	}
+	std::this_thread::sleep_for(100ms);
+	for (auto &topic : topics) {
+		threads.Launch(Pub, topic);
+	}
+	threads.Launch(Pub, "some_else");
 
-            bool r = provider.Publish(topic, data.data(), data.size(), timeout);
-            // bool r = provider.Send(kBHBusQueueId, MakePub(topic, data.data(), data.size()), timeout);
-            if (!r) {
-                printf("pub ret: %s\n", r ? "ok" : "fail");
-            }
-        }
-    };
-    ThreadManager threads;
-    typedef std::vector<std::string> Topics;
-    Topics topics;
-    for (int i = 0; i < 100; ++i) {
-        topics.push_back("t" + std::to_string(i));
-    }
-    Topics part;
-    for (int i = 0; i < topics.size(); ++i) {
-        part.push_back(topics[i]);
-        threads.Launch(Sub, i, topics);
-    }
-    std::this_thread::sleep_for(100ms);
-    for (auto &topic: topics) {
-        threads.Launch(Pub, topic);
-    }
-    threads.Launch(Pub, "some_else");
+	threads.WaitAll();
+	std::cout << "end : " << Now();
+	printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n",
+	       count.load(), count - last_count.exchange(count), init_avail - Avail());
 
-    threads.WaitAll();
-    std::cout << "end : " << Now();
-    printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n",
-           count.load(), count - last_count.exchange(count), init_avail - Avail());
-
-    bus.Stop();
+	bus.Stop();
 }
 
-inline int MyMin(int a, int b) {
-    printf("MyMin\n");
-    return a < b ? a : b;
+inline int MyMin(int a, int b)
+{
+	printf("MyMin\n");
+	return a < b ? a : b;
 }
+
 int test_main(int argc, char *argv[])
 {
-    printf("test main\n");
-    int a = 0;
-    int b = 0;
-    BOOST_CHECK_EQUAL(a, b);
-    int n = MyMin(4,6);
-    for (int i = 0; i < n; ++i) {
-        printf("i = %d\n", i);
-    }
+	printf("test main\n");
+	int a = 0;
+	int b = 0;
+	BOOST_CHECK_EQUAL(a, b);
+	int n = MyMin(4, 6);
+	for (int i = 0; i < n; ++i) {
+		printf("i = %d\n", i);
+	}
 
-    return 0;
+	return 0;
 }
-
diff --git a/utest/util.h b/utest/util.h
index ac1d58d..ca58cd7 100644
--- a/utest/util.h
+++ b/utest/util.h
@@ -19,21 +19,21 @@
 #ifndef UTIL_W8A0OA5U
 #define UTIL_W8A0OA5U
 
-#include <functional>
-#include <vector>
-#include <thread>
-#include <stdlib.h>
-#include <chrono>
-#include <sys/types.h>
-#include <sys/wait.h>
-#include <boost/noncopyable.hpp>
-#include <boost/timer/timer.hpp>
-#include <boost/test/unit_test.hpp>
-#include <boost/date_time/posix_time/posix_time.hpp>
+#include "bh_util.h"
+#include "msg.h"
 #include "shm.h"
 #include "shm_queue.h"
-#include "msg.h"
-#include "bh_util.h"
+#include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/noncopyable.hpp>
+#include <boost/test/unit_test.hpp>
+#include <boost/timer/timer.hpp>
+#include <chrono>
+#include <functional>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+#include <thread>
+#include <vector>
 
 using namespace boost::posix_time;
 inline ptime Now() { return second_clock::universal_time(); };
@@ -42,58 +42,69 @@
 
 typedef std::function<void(void)> FuncVV;
 
-class ScopeCall : private boost::noncopyable {
-    FuncVV f_;
+class ScopeCall : private boost::noncopyable
+{
+	FuncVV f_;
+
 public:
-    ScopeCall(FuncVV f):f_(f) { f_(); }
-    ~ScopeCall() { f_(); }
+	ScopeCall(FuncVV f) :
+	    f_(f) { f_(); }
+	~ScopeCall() { f_(); }
 };
-class ThreadManager {
-    std::vector<std::thread> threads_;
+class ThreadManager
+{
+	std::vector<std::thread> threads_;
+
 public:
-    ~ThreadManager() { WaitAll(); }
-    template <class T, class...P>
-    void Launch(T t, P...p) { threads_.emplace_back(t, p...); }
-    void WaitAll() {
-        for (auto &t : threads_) {
-            if (t.joinable()) {
-                t.join();
-            }
-        }
-    }
+	~ThreadManager() { WaitAll(); }
+	template <class T, class... P>
+	void Launch(T t, P... p) { threads_.emplace_back(t, p...); }
+	void WaitAll()
+	{
+		for (auto &t : threads_) {
+			if (t.joinable()) {
+				t.join();
+			}
+		}
+	}
 };
-class ProcessManager {
-    std::vector<pid_t> procs_;
+class ProcessManager
+{
+	std::vector<pid_t> procs_;
+
 public:
-    ~ProcessManager() { WaitAll(); }
-    template <class T, class ...P>
-    void Launch(T t, P...p) {
-        auto pid = fork();
-        if (pid == 0) {
-            // child
-            t(p...);
-            exit(0);
-        } else if (pid != -1) { // Ok
-            procs_.push_back(pid);
-        }
-    };
-    void WaitAll() {
-        for (auto &pid: procs_) {
-            int status = 0;
-            int options = WUNTRACED | WCONTINUED;
-            waitpid(pid, &status, options);
-        }
-        procs_.clear();
-    }
+	~ProcessManager() { WaitAll(); }
+	template <class T, class... P>
+	void Launch(T t, P... p)
+	{
+		auto pid = fork();
+		if (pid == 0) {
+			// child
+			t(p...);
+			exit(0);
+		} else if (pid != -1) { // Ok
+			procs_.push_back(pid);
+		}
+	};
+	void WaitAll()
+	{
+		for (auto &pid : procs_) {
+			int status = 0;
+			int options = WUNTRACED | WCONTINUED;
+			waitpid(pid, &status, options);
+		}
+		procs_.clear();
+	}
 };
 
 using namespace bhome_shm;
 using namespace bhome_msg;
 
 struct ShmRemover {
-    std::string name_;
-    ShmRemover(const std::string &name):name_(name) { SharedMemory::Remove(name_); }
-    ~ShmRemover() { SharedMemory::Remove(name_); }
+	std::string name_;
+	ShmRemover(const std::string &name) :
+	    name_(name) { SharedMemory::Remove(name_); }
+	~ShmRemover() { SharedMemory::Remove(name_); }
 };
 
 #endif // end of include guard: UTIL_W8A0OA5U

--
Gitblit v1.8.0