From 6eefba812ede29549af3633c490f2e85a4805524 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期三, 31 三月 2021 11:24:20 +0800
Subject: [PATCH] format code style.
---
src/shm.h | 130 ++-
src/socket.h | 75 +-
utest/simple_tests.cpp | 191 ++--
src/msg.h | 79 +
src/socket.cpp | 158 ++--
.clang-format | 12
src/defs.h | 5
utest/utest.cpp | 279 +++---
src/shm_queue.cpp | 91 +-
src/center.cpp | 3
src/msg.cpp | 151 ++--
utest/speed_test.cpp | 305 ++++----
utest/util.h | 119 +-
src/bh_util.h | 60
src/pubsub.h | 37
src/shm_queue.h | 184 ++--
src/center.h | 1
src/pubsub.cpp | 248 +++---
src/shm.cpp | 6
19 files changed, 1,104 insertions(+), 1,030 deletions(-)
diff --git a/.clang-format b/.clang-format
index fd84ae9..413be3a 100644
--- a/.clang-format
+++ b/.clang-format
@@ -3,21 +3,21 @@
IndentWidth: 4
TabWidth: 4
ColumnLimit: 0
-AllowShortIfStatementsOnASingleLine: true
AllowShortCaseLabelsOnASingleLine: true
AllowShortBlocksOnASingleLine: true
AllowShortEnumsOnASingleLine: true
AllowShortLoopsOnASingleLine: true
-AllowShortLambdasOnASingleLine: true
-AllowShortFunctionsOnASingleLine: true
+AllowShortLambdasOnASingleLine: All
+AllowShortIfStatementsOnASingleLine: WithoutElse
+AllowShortFunctionsOnASingleLine: All
BreakBeforeBraces: Linux
IndentCaseLabels: false
AccessModifierOffset: -4
BreakConstructorInitializers: AfterColon
-AlignConsecutiveAssignments: true
-AlignConsecutiveDeclarations: true
+ConstructorInitializerAllOnOneLineOrOnePerLine: true
+AllowAllConstructorInitializersOnNextLine: true
+
AlignTrailingComments: true
AlignEscapedNewlinesLeft: true
PointerAlignment: Right
SpaceAfterCStyleCast: true
-
diff --git a/src/bh_util.h b/src/bh_util.h
index d86b931..b5dc45e 100644
--- a/src/bh_util.h
+++ b/src/bh_util.h
@@ -23,7 +23,7 @@
inline uint16_t Get8(const void *p)
{
- return static_cast<const uint8_t*>(p)[0];
+ return static_cast<const uint8_t *>(p)[0];
}
inline void Put8(void *p, uint8_t u)
{
@@ -32,9 +32,9 @@
inline uint16_t Get16(const void *p)
{
- auto ptr = static_cast<const uint8_t*>(p);
- return (((uint16_t)ptr[0]) << 8u) |
- (((uint16_t)ptr[1]));
+ auto ptr = static_cast<const uint8_t *>(p);
+ return (((uint16_t) ptr[0]) << 8u) |
+ (((uint16_t) ptr[1]));
}
inline void Put16(void *p, uint16_t u)
{
@@ -45,11 +45,11 @@
inline uint32_t Get32(const void *p)
{
- auto ptr = static_cast<const uint8_t*>(p);
- return (((uint32_t)ptr[0]) << 24u) |
- (((uint32_t)ptr[1]) << 16u) |
- (((uint32_t)ptr[2]) << 8u) |
- (((uint32_t)ptr[3]));
+ auto ptr = static_cast<const uint8_t *>(p);
+ return (((uint32_t) ptr[0]) << 24u) |
+ (((uint32_t) ptr[1]) << 16u) |
+ (((uint32_t) ptr[2]) << 8u) |
+ (((uint32_t) ptr[3]));
}
inline void Put32(void *p, uint32_t u)
{
@@ -60,17 +60,17 @@
ptr[3] = (uint8_t)(u);
}
-inline uint64_t Get64(const void *p)
+inline uint64_t Get64(const void *p)
{
- auto ptr = static_cast<const uint8_t*>(p);
- return (((uint64_t)ptr[0]) << 56u) |
- (((uint64_t)ptr[1]) << 48u) |
- (((uint64_t)ptr[2]) << 40u) |
- (((uint64_t)ptr[3]) << 32u) |
- (((uint64_t)ptr[4]) << 24u) |
- (((uint64_t)ptr[5]) << 16u) |
- (((uint64_t)ptr[6]) << 8u) |
- ((uint64_t)ptr[7]);
+ auto ptr = static_cast<const uint8_t *>(p);
+ return (((uint64_t) ptr[0]) << 56u) |
+ (((uint64_t) ptr[1]) << 48u) |
+ (((uint64_t) ptr[2]) << 40u) |
+ (((uint64_t) ptr[3]) << 32u) |
+ (((uint64_t) ptr[4]) << 24u) |
+ (((uint64_t) ptr[5]) << 16u) |
+ (((uint64_t) ptr[6]) << 8u) |
+ ((uint64_t) ptr[7]);
}
inline void Put64(void *p, uint64_t u)
{
@@ -92,20 +92,24 @@
class ExitCall
{
- typedef std::function<void(void)> func_t;
- func_t m_func;
+ typedef std::function<void(void)> func_t;
+ func_t m_func;
+
public:
- explicit ExitCall(func_t f): m_func(f) {}
- ~ExitCall() { if (m_func) { m_func(); } }
+ explicit ExitCall(func_t f) :
+ m_func(f) {}
+ ~ExitCall()
+ {
+ if (m_func) { m_func(); }
+ }
};
// macro helper
-#define JOIN_IMPL(a, b) a ## b
-#define JOIN(a, b) JOIN_IMPL(a , b)
+#define JOIN_IMPL(a, b) a##b
+#define JOIN(a, b) JOIN_IMPL(a, b)
// defer function / lambda.
-#define DEFERF(func) ExitCall JOIN(defer_ , __LINE__)(func)
+#define DEFERF(func) ExitCall JOIN(defer_, __LINE__)(func)
// defer simple expression
-#define DEFER1(expr) DEFERF([&](){ expr; })
-
+#define DEFER1(expr) DEFERF([&]() { expr; })
#endif /* end of include guard: BH_UTIL_SOXWOK67 */
diff --git a/src/center.cpp b/src/center.cpp
index 809b6d1..db000c4 100644
--- a/src/center.cpp
+++ b/src/center.cpp
@@ -23,7 +23,6 @@
SharedMemory &BHomeShm()
{
- static SharedMemory shm("bhome_default_shm_v0", 1024*1024*64);
+ static SharedMemory shm("bhome_default_shm_v0", 1024 * 1024 * 64);
return shm;
}
-
diff --git a/src/center.h b/src/center.h
index fcb8005..153cc3e 100644
--- a/src/center.h
+++ b/src/center.h
@@ -20,7 +20,6 @@
class BHCenter
{
-
};
#endif // end of include guard: CENTER_TM9OUQTG
diff --git a/src/defs.h b/src/defs.h
index acfe09e..10ac73c 100644
--- a/src/defs.h
+++ b/src/defs.h
@@ -27,9 +27,10 @@
const MQId kBHBusQueueId = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff");
const int kBHCenterPort = 24287;
const char kTopicSep = '.';
-namespace bhome_shm {
+namespace bhome_shm
+{
class SharedMemory;
-}
+} // namespace bhome_shm
bhome_shm::SharedMemory &BHomeShm();
diff --git a/src/msg.cpp b/src/msg.cpp
index 78834a8..3a01240 100644
--- a/src/msg.cpp
+++ b/src/msg.cpp
@@ -18,48 +18,49 @@
#include "msg.h"
#include "bh_util.h"
-namespace bhome_msg {
+namespace bhome_msg
+{
const uint32_t kMsgTag = 0xf1e2d3c4;
const uint32_t kMsgPrefixLen = 4;
BHMsg InitMsg(MsgType type)
{
- BHMsg msg;
- msg.set_type(type);
- time_t tm = 0;
- msg.set_timestamp(time(&tm));
- return msg;
+ BHMsg msg;
+ msg.set_type(type);
+ time_t tm = 0;
+ msg.set_timestamp(time(&tm));
+ return msg;
}
BHMsg MakeRequest(const MQId &src_id, const void *data, const size_t size)
{
- assert(data && size);
- BHMsg msg(InitMsg(kMsgTypeRequest));
- msg.set_body(data, size);
- msg.add_route()->set_mq_id(&src_id, sizeof(src_id));
- return msg;
+ assert(data && size);
+ BHMsg msg(InitMsg(kMsgTypeRequest));
+ msg.set_body(data, size);
+ msg.add_route()->set_mq_id(&src_id, sizeof(src_id));
+ return msg;
}
BHMsg MakeReply(const void *data, const size_t size)
{
- assert(data && size);
- BHMsg msg(InitMsg(kMsgTypeReply));
- msg.set_body(data, size);
- return msg;
+ assert(data && size);
+ BHMsg msg(InitMsg(kMsgTypeReply));
+ msg.set_body(data, size);
+ return msg;
}
BHMsg MakeSubUnsub(const MQId &client, const std::vector<std::string> &topics, const MsgType sub_unsub)
{
- assert(sub_unsub == kMsgTypeSubscribe || sub_unsub == kMsgTypeUnsubscribe);
- BHMsg msg(InitMsg(sub_unsub));
- msg.add_route()->set_mq_id(&client, sizeof(client));
- DataSub subs;
- for (auto &t : topics) {
- subs.add_topics(t);
- }
- msg.set_body(subs.SerializeAsString());
- return msg;
+ assert(sub_unsub == kMsgTypeSubscribe || sub_unsub == kMsgTypeUnsubscribe);
+ BHMsg msg(InitMsg(sub_unsub));
+ msg.add_route()->set_mq_id(&client, sizeof(client));
+ DataSub subs;
+ for (auto &t : topics) {
+ subs.add_topics(t);
+ }
+ msg.set_body(subs.SerializeAsString());
+ return msg;
}
BHMsg MakeSub(const MQId &client, const std::vector<std::string> &topics) { return MakeSubUnsub(client, topics, kMsgTypeSubscribe); }
@@ -67,77 +68,77 @@
BHMsg MakePub(const std::string &topic, const void *data, const size_t size)
{
- assert(data && size);
- BHMsg msg(InitMsg(kMsgTypePublish));
- DataPub pub;
- pub.set_topic(topic);
- pub.set_data(data, size);
- msg.set_body(pub.SerializeAsString());
- return msg;
+ assert(data && size);
+ BHMsg msg(InitMsg(kMsgTypePublish));
+ DataPub pub;
+ pub.set_topic(topic);
+ pub.set_data(data, size);
+ msg.set_body(pub.SerializeAsString());
+ return msg;
}
void *Pack(SharedMemory &shm, const BHMsg &msg)
{
- uint32_t msg_size = msg.ByteSizeLong();
- void *p = shm.Alloc(4 + msg_size);
- if(p) {
- Put32(p, msg_size);
- if (!msg.SerializeToArray(static_cast<char*>(p) + kMsgPrefixLen, msg_size)) {
- shm.Dealloc(p);
- p = 0;
- }
- }
- return p;
+ uint32_t msg_size = msg.ByteSizeLong();
+ void *p = shm.Alloc(4 + msg_size);
+ if (p) {
+ Put32(p, msg_size);
+ if (!msg.SerializeToArray(static_cast<char *>(p) + kMsgPrefixLen, msg_size)) {
+ shm.Dealloc(p);
+ p = 0;
+ }
+ }
+ return p;
}
bool MsgI::Unpack(BHMsg &msg) const
{
- void *p = ptr_.get();
- assert(p);
- uint32_t msg_size = Get32(p);
- return msg.ParseFromArray(static_cast<char*>(p) + kMsgPrefixLen, msg_size);
+ void *p = ptr_.get();
+ assert(p);
+ uint32_t msg_size = Get32(p);
+ return msg.ParseFromArray(static_cast<char *>(p) + kMsgPrefixLen, msg_size);
}
// with ref count;
bool MsgI::MakeRC(SharedMemory &shm, const BHMsg &msg)
{
- void *p = Pack(shm, msg);
- if(!p) {
- return false;
- }
- RefCount *rc = shm.New<RefCount>();
- if (!rc) {
- shm.Dealloc(p);
- return false;
- }
- MsgI(p, rc).swap(*this);
- return true;
+ void *p = Pack(shm, msg);
+ if (!p) {
+ return false;
+ }
+ RefCount *rc = shm.New<RefCount>();
+ if (!rc) {
+ shm.Dealloc(p);
+ return false;
+ }
+ MsgI(p, rc).swap(*this);
+ return true;
}
bool MsgI::Make(SharedMemory &shm, const BHMsg &msg)
{
- void *p = Pack(shm, msg);
- if(!p) {
- return false;
- }
- MsgI(p, 0).swap(*this);
- return true;
+ void *p = Pack(shm, msg);
+ if (!p) {
+ return false;
+ }
+ MsgI(p, 0).swap(*this);
+ return true;
}
-int MsgI::Release(SharedMemory &shm)
+int MsgI::Release(SharedMemory &shm)
{
- if (IsCounted()) {
- const int n = count_->Dec();
- if (n != 0) {
- return n;
- }
- }
- // free data
- shm.Dealloc(ptr_);
- ptr_ = 0;
- shm.Delete(count_);
- count_ = 0;
- return 0;
+ if (IsCounted()) {
+ const int n = count_->Dec();
+ if (n != 0) {
+ return n;
+ }
+ }
+ // free data
+ shm.Dealloc(ptr_);
+ ptr_ = 0;
+ shm.Delete(count_);
+ count_ = 0;
+ return 0;
}
} // namespace bhome_msg
diff --git a/src/msg.h b/src/msg.h
index f3fe726..2154eba 100644
--- a/src/msg.h
+++ b/src/msg.h
@@ -18,15 +18,16 @@
#ifndef MSG_5BILLZET
#define MSG_5BILLZET
-#include <stdint.h>
+#include "bhome_msg.pb.h"
#include "shm.h"
#include <boost/interprocess/offset_ptr.hpp>
#include <boost/uuid/uuid_generators.hpp>
-#include "bhome_msg.pb.h"
+#include <stdint.h>
-namespace bhome_msg {
- using namespace bhome_shm;
- using namespace bhome::msg; // for serialized data in MsgI
+namespace bhome_msg
+{
+using namespace bhome_shm;
+using namespace bhome::msg; // for serialized data in MsgI
// MsgI is safe to be stored in shared memory, so POD data or offset_ptr is required.
// message format: header(meta) + body(data).
@@ -37,49 +38,67 @@
class RefCount : private boost::noncopyable
{
public:
- int Inc() { Guard lk(mutex_); return ++num_; }
- int Dec() { Guard lk(mutex_); return --num_; }
- int Get() { Guard lk(mutex_); return num_; }
+ int Inc()
+ {
+ Guard lk(mutex_);
+ return ++num_;
+ }
+ int Dec()
+ {
+ Guard lk(mutex_);
+ return --num_;
+ }
+ int Get()
+ {
+ Guard lk(mutex_);
+ return num_;
+ }
+
private:
- Mutex mutex_;
- int num_ = 1;
+ Mutex mutex_;
+ int num_ = 1;
};
BHMsg MakeRequest(const MQId &src_id, const void *data, const size_t size);
BHMsg MakeReply(const void *data, const size_t size);
-BHMsg MakeSub(const MQId &client, const std::vector<std::string> &topics);
-BHMsg MakeUnsub(const MQId &client, const std::vector<std::string> &topics);
+BHMsg MakeSub(const MQId &client, const std::vector<std::string> &topics);
+BHMsg MakeUnsub(const MQId &client, const std::vector<std::string> &topics);
BHMsg MakePub(const std::string &topic, const void *data, const size_t size);
-class MsgI {
+class MsgI
+{
private:
- offset_ptr<void> ptr_;
- offset_ptr<RefCount> count_;
+ offset_ptr<void> ptr_;
+ offset_ptr<RefCount> count_;
- bool BuildSubOrUnsub(SharedMemory &shm, const std::vector<std::string> &topics, const MsgType sub_unsub);
+ bool BuildSubOrUnsub(SharedMemory &shm, const std::vector<std::string> &topics, const MsgType sub_unsub);
+
public:
- MsgI(void *p=0, RefCount *c=0):ptr_(p), count_(c) {}
- void swap(MsgI &a) { std::swap(ptr_, a.ptr_); std::swap(count_, a.count_); }
- template <class T = void> T *get() { return static_cast<T*>(ptr_.get()); }
+ MsgI(void *p = 0, RefCount *c = 0) :
+ ptr_(p), count_(c) {}
- // AddRef and Release works for both counted and not counted msg.
- int AddRef() const { return IsCounted() ? count_->Inc() : 1; }
- int Release(SharedMemory &shm);
+ void swap(MsgI &a)
+ {
+ std::swap(ptr_, a.ptr_);
+ std::swap(count_, a.count_);
+ }
+ template <class T = void>
+ T *get() { return static_cast<T *>(ptr_.get()); }
- int Count() const{ return IsCounted() ? count_->Get() : 1; }
+ // AddRef and Release works for both counted and not counted msg.
+ int AddRef() const { return IsCounted() ? count_->Inc() : 1; }
+ int Release(SharedMemory &shm);
- bool IsCounted() const { return static_cast<bool>(count_); }
+ int Count() const { return IsCounted() ? count_->Get() : 1; }
+ bool IsCounted() const { return static_cast<bool>(count_); }
- bool Make(SharedMemory &shm, const BHMsg &msg);
- bool MakeRC(SharedMemory &shm, const BHMsg &msg);
- bool Unpack(BHMsg &msg) const;
+ bool Make(SharedMemory &shm, const BHMsg &msg);
+ bool MakeRC(SharedMemory &shm, const BHMsg &msg);
+ bool Unpack(BHMsg &msg) const;
};
inline void swap(MsgI &m1, MsgI &m2) { m1.swap(m2); }
-
} // namespace bhome_msg
-
-
#endif // end of include guard: MSG_5BILLZET
diff --git a/src/pubsub.cpp b/src/pubsub.cpp
index a0dc4e9..d5c7dd2 100644
--- a/src/pubsub.cpp
+++ b/src/pubsub.cpp
@@ -16,170 +16,170 @@
* =====================================================================================
*/
#include "pubsub.h"
-#include <chrono>
#include "bh_util.h"
#include "defs.h"
+#include <chrono>
-namespace bhome_shm {
+namespace bhome_shm
+{
using namespace std::chrono_literals;
const int kMaxWorker = 16;
using namespace bhome_msg;
-BusManager::BusManager(SharedMemory &shm):
-shm_(shm),
-busq_(kBHBusQueueId, shm, 16),
-run_(false)
+BusManager::BusManager(SharedMemory &shm) :
+ shm_(shm),
+ busq_(kBHBusQueueId, shm, 16),
+ run_(false)
{
}
-
+
BusManager::~BusManager()
{
- Stop();
+ Stop();
}
bool BusManager::Start(const int nworker)
{
- std::lock_guard<std::mutex> guard(mutex_);
- StopNoLock();
- // start
- auto Worker = [&](){
- while (this->run_) {
- BusManager &self = *this;
- MsgI msg;
- const int timeout_ms = 100;
- if (self.busq_.Recv(msg, timeout_ms)) {
- self.OnMsg(msg);
- }
- }
- };
+ std::lock_guard<std::mutex> guard(mutex_);
+ StopNoLock();
+ // start
+ auto Worker = [&]() {
+ while (this->run_) {
+ BusManager &self = *this;
+ MsgI msg;
+ const int timeout_ms = 100;
+ if (self.busq_.Recv(msg, timeout_ms)) {
+ self.OnMsg(msg);
+ }
+ }
+ };
- run_.store(true);
- const int n = std::min(nworker, kMaxWorker);
- for (int i = 0; i < n; ++i) {
- workers_.emplace_back(Worker);
- }
- return true;
+ run_.store(true);
+ const int n = std::min(nworker, kMaxWorker);
+ for (int i = 0; i < n; ++i) {
+ workers_.emplace_back(Worker);
+ }
+ return true;
}
bool BusManager::Stop()
{
- std::lock_guard<std::mutex> guard(mutex_);
- return StopNoLock();
+ std::lock_guard<std::mutex> guard(mutex_);
+ return StopNoLock();
}
bool BusManager::StopNoLock()
{
- if (run_.exchange(false)) {
- for (auto &w: workers_) {
- if (w.joinable()) {
- w.join();
- }
- }
- return true;
- }
- return false;
+ if (run_.exchange(false)) {
+ for (auto &w : workers_) {
+ if (w.joinable()) {
+ w.join();
+ }
+ }
+ return true;
+ }
+ return false;
}
void BusManager::OnMsg(MsgI &imsg)
{
- DEFER1(imsg.Release(shm_));
+ DEFER1(imsg.Release(shm_));
- BHMsg msg;
- if (!imsg.Unpack(msg)) {
- return;
- }
+ BHMsg msg;
+ if (!imsg.Unpack(msg)) {
+ return;
+ }
- auto OnSubChange = [&](auto &&update) {
- DataSub sub;
- if (!msg.route().empty() && sub.ParseFromString(msg.body()) && !sub.topics().empty()) {
- assert(sizeof(MQId) == msg.route(0).mq_id().size());
- MQId client;
- memcpy(&client, msg.route(0).mq_id().data(), sizeof(client));
+ auto OnSubChange = [&](auto &&update) {
+ DataSub sub;
+ if (!msg.route().empty() && sub.ParseFromString(msg.body()) && !sub.topics().empty()) {
+ assert(sizeof(MQId) == msg.route(0).mq_id().size());
+ MQId client;
+ memcpy(&client, msg.route(0).mq_id().data(), sizeof(client));
- std::lock_guard<std::mutex> guard(mutex_);
- auto &topics = sub.topics();
- for (auto &topic : topics) {
- try {
- update(topic, client);
- } catch(...) {
- //TODO log error
- }
- }
- }
- };
+ std::lock_guard<std::mutex> guard(mutex_);
+ auto &topics = sub.topics();
+ for (auto &topic : topics) {
+ try {
+ update(topic, client);
+ } catch (...) {
+ //TODO log error
+ }
+ }
+ }
+ };
- auto Sub1 = [this](const std::string &topic, const MQId &id) {
- records_[topic].insert(id);
- };
+ auto Sub1 = [this](const std::string &topic, const MQId &id) {
+ records_[topic].insert(id);
+ };
- auto Unsub1 = [this](const std::string &topic, const MQId &id) {
- auto pos = records_.find(topic);
- if (pos != records_.end()) {
- if (pos->second.erase(id) && pos->second.empty()) {
- records_.erase(pos);
- }
- }
- };
+ auto Unsub1 = [this](const std::string &topic, const MQId &id) {
+ auto pos = records_.find(topic);
+ if (pos != records_.end()) {
+ if (pos->second.erase(id) && pos->second.empty()) {
+ records_.erase(pos);
+ }
+ }
+ };
- auto OnPublish = [&]() {
- DataPub pub;
- if (!pub.ParseFromString(msg.body())) {
- return;
- }
- auto FindClients = [&](const std::string &topic){
- Clients dests;
- std::lock_guard<std::mutex> guard(mutex_);
- auto Find1 = [&](const std::string &t) {
- auto pos = records_.find(topic);
- if (pos != records_.end() && !pos->second.empty()) {
- auto &clients = pos->second;
- for (auto &cli : clients) {
- dests.insert(cli);
- }
- }
- };
- Find1(topic);
+ auto OnPublish = [&]() {
+ DataPub pub;
+ if (!pub.ParseFromString(msg.body())) {
+ return;
+ }
+ auto FindClients = [&](const std::string &topic) {
+ Clients dests;
+ std::lock_guard<std::mutex> guard(mutex_);
+ auto Find1 = [&](const std::string &t) {
+ auto pos = records_.find(topic);
+ if (pos != records_.end() && !pos->second.empty()) {
+ auto &clients = pos->second;
+ for (auto &cli : clients) {
+ dests.insert(cli);
+ }
+ }
+ };
+ Find1(topic);
- //TODO check and adjust topic on client side sub/pub.
- size_t pos = 0;
- while (true) {
- pos = topic.find(kTopicSep, pos);
- if (pos == topic.npos || ++pos == topic.size()) {
- // Find1(std::string()); // sub all.
- break;
- } else {
- Find1(topic.substr(0, pos));
- }
- }
- return dests;
- };
+ //TODO check and adjust topic on client side sub/pub.
+ size_t pos = 0;
+ while (true) {
+ pos = topic.find(kTopicSep, pos);
+ if (pos == topic.npos || ++pos == topic.size()) {
+ // Find1(std::string()); // sub all.
+ break;
+ } else {
+ Find1(topic.substr(0, pos));
+ }
+ }
+ return dests;
+ };
- auto Dispatch = [&](auto &&send1) {
- const Clients &clients(FindClients(pub.topic()));
- for (auto &cli : clients) {
- send1(cli);
- }
- };
+ auto Dispatch = [&](auto &&send1) {
+ const Clients &clients(FindClients(pub.topic()));
+ for (auto &cli : clients) {
+ send1(cli);
+ }
+ };
- if (imsg.IsCounted()) {
- Dispatch([&](const MQId &cli) { busq_.Send(cli, imsg, 100); });
- } else {
- MsgI pubmsg;
- if (!pubmsg.MakeRC(shm_, msg)) { return; }
- DEFER1(pubmsg.Release(shm_));
+ if (imsg.IsCounted()) {
+ Dispatch([&](const MQId &cli) { busq_.Send(cli, imsg, 100); });
+ } else {
+ MsgI pubmsg;
+ if (!pubmsg.MakeRC(shm_, msg)) { return; }
+ DEFER1(pubmsg.Release(shm_));
- Dispatch([&](const MQId &cli) { busq_.Send(cli, pubmsg, 100); });
- }
- };
+ Dispatch([&](const MQId &cli) { busq_.Send(cli, pubmsg, 100); });
+ }
+ };
- switch (msg.type()) {
- case kMsgTypeSubscribe: OnSubChange(Sub1); break;
- case kMsgTypeUnsubscribe: OnSubChange(Unsub1); break;
- case kMsgTypePublish : OnPublish(); break;
- default: break;
- }
+ switch (msg.type()) {
+ case kMsgTypeSubscribe: OnSubChange(Sub1); break;
+ case kMsgTypeUnsubscribe: OnSubChange(Unsub1); break;
+ case kMsgTypePublish: OnPublish(); break;
+ default: break;
+ }
}
} // namespace bhome_shm
-
diff --git a/src/pubsub.h b/src/pubsub.h
index 11fa4e4..dc3fced 100644
--- a/src/pubsub.h
+++ b/src/pubsub.h
@@ -19,35 +19,36 @@
#define PUBSUB_4KGRA997
#include "shm_queue.h"
-#include <thread>
#include <atomic>
#include <mutex>
-#include <vector>
-#include <unordered_map>
#include <set>
+#include <thread>
+#include <unordered_map>
+#include <vector>
-namespace bhome_shm {
+namespace bhome_shm
+{
// publish/subcribe manager.
class BusManager
{
- SharedMemory &shm_;
- ShmMsgQueue busq_;
- std::atomic<bool> run_;
- std::vector<std::thread> workers_;
- std::mutex mutex_;
- typedef std::set<MQId> Clients;
- std::unordered_map<std::string, Clients> records_;
+ SharedMemory &shm_;
+ ShmMsgQueue busq_;
+ std::atomic<bool> run_;
+ std::vector<std::thread> workers_;
+ std::mutex mutex_;
+ typedef std::set<MQId> Clients;
+ std::unordered_map<std::string, Clients> records_;
- bool StopNoLock();
- void OnMsg(MsgI &msg);
+ bool StopNoLock();
+ void OnMsg(MsgI &msg);
+
public:
- BusManager(SharedMemory &shm);
- ~BusManager();
- bool Start(const int nworker = 2);
- bool Stop();
+ BusManager(SharedMemory &shm);
+ ~BusManager();
+ bool Start(const int nworker = 2);
+ bool Stop();
};
-
} // namespace bhome_shm
diff --git a/src/shm.cpp b/src/shm.cpp
index d628b56..1658900 100644
--- a/src/shm.cpp
+++ b/src/shm.cpp
@@ -21,9 +21,9 @@
namespace bhome_shm
{
-SharedMemory::SharedMemory(const std::string &name, const uint64_t size)
- : mshm_t(open_or_create, name.c_str(), size, 0, AllowAll()),
- name_(name)
+SharedMemory::SharedMemory(const std::string &name, const uint64_t size) :
+ mshm_t(open_or_create, name.c_str(), size, 0, AllowAll()),
+ name_(name)
{
}
diff --git a/src/shm.h b/src/shm.h
index 0f68754..3fce99d 100644
--- a/src/shm.h
+++ b/src/shm.h
@@ -19,14 +19,15 @@
#ifndef SHM_6CHO6D6C
#define SHM_6CHO6D6C
+#include <boost/interprocess/managed_shared_memory.hpp>
+#include <boost/interprocess/sync/interprocess_condition.hpp>
+#include <boost/interprocess/sync/interprocess_mutex.hpp>
+#include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/noncopyable.hpp>
#include <boost/uuid/uuid.hpp>
-#include <boost/interprocess/managed_shared_memory.hpp>
-#include <boost/interprocess/sync/interprocess_mutex.hpp>
-#include <boost/interprocess/sync/interprocess_condition.hpp>
-#include <boost/interprocess/sync/scoped_lock.hpp>
-namespace bhome_shm {
+namespace bhome_shm
+{
using namespace boost::interprocess;
@@ -37,73 +38,90 @@
class SharedMemory : public mshm_t
{
- std::string name_;
+ std::string name_;
- static permissions AllowAll() {
- permissions perm;
- perm.set_unrestricted();
- return perm;
- }
- void Swap(SharedMemory &a);
+ static permissions AllowAll()
+ {
+ permissions perm;
+ perm.set_unrestricted();
+ return perm;
+ }
+ void Swap(SharedMemory &a);
+
public:
- static bool Remove(const std::string &name) {
- return shared_memory_object::remove(name.c_str());
- }
- SharedMemory(const std::string &name, const uint64_t size);
- ~SharedMemory();
- std::string name() const { return name_; }
- bool Remove() { return Remove(name()); }
+ static bool Remove(const std::string &name) { return shared_memory_object::remove(name.c_str()); }
- void *Alloc(const size_t size) { return allocate(size, std::nothrow); }
- void Dealloc(void *p) { if(p) { deallocate(p); } }
- template<class T> void Dealloc(offset_ptr<T> ptr) { return Dealloc(ptr.get()); }
+ SharedMemory(const std::string &name, const uint64_t size);
+ ~SharedMemory();
+ std::string name() const { return name_; }
+ bool Remove() { return Remove(name()); }
- template <class T, class ...Params> T * New(Params const&...params) { return construct<T>(anonymous_instance, std::nothrow)(params...); }
- template <class T> void Delete(T *p) { if (p) { destroy_ptr<T>(p); }; }
- template <class T> void Delete(offset_ptr<T> p) { Delete(p.get()); }
- template <class T> T *Find(const std::string &name) { return find<T>(name.c_str()).first; }
+ void *Alloc(const size_t size) { return allocate(size, std::nothrow); }
+ void Dealloc(void *p)
+ {
+ if (p) { deallocate(p); }
+ }
+ template <class T>
+ void Dealloc(offset_ptr<T> ptr) { return Dealloc(ptr.get()); }
+ template <class T, class... Params>
+ T *New(Params const &...params) { return construct<T>(anonymous_instance, std::nothrow)(params...); }
+ template <class T>
+ void Delete(T *p)
+ {
+ if (p) { destroy_ptr<T>(p); };
+ }
+ template <class T>
+ void Delete(offset_ptr<T> p) { Delete(p.get()); }
+ template <class T>
+ T *Find(const std::string &name) { return find<T>(name.c_str()).first; }
};
// ShmObject manages an object in shared memory, but ShmObject itself is not in shared memory.
// works like a smart pointer of an object in shared memory.
// TODO handshake with center, and can be removed if killed.
template <class T>
-class ShmObject : private boost::noncopyable {
- static std::string ObjName(const std::string &name) { return "obj" + name; }
-protected:
- typedef T Data;
- typedef SharedMemory ShmType;
-private:
- ShmType &shm_;
- std::string name_;
- Data *pdata_ = nullptr;
+class ShmObject : private boost::noncopyable
+{
+ static std::string ObjName(const std::string &name) { return "obj" + name; }
- bool IsOk() const { return pdata_; }
protected:
- ShmType &shm() const { return shm_; }
+ typedef T Data;
+ typedef SharedMemory ShmType;
+
+private:
+ ShmType &shm_;
+ std::string name_;
+ Data *pdata_ = nullptr;
+
+ bool IsOk() const { return pdata_; }
+
+protected:
+ ShmType &shm() const { return shm_; }
+
public:
- template <class...Params>
- ShmObject(ShmType &segment, const std::string &name, Params&&...t):
- shm_(segment), name_(name)
- {
- pdata_ = shm_.find_or_construct<Data>(ObjName(name_).c_str(), std::nothrow)(t...);
- if (!IsOk()) {
- throw("Error: Not enough memory, can not allocate \"" + name_ + "\"");
- }
- }
- static Data *Find(SharedMemory &shm, const std::string &name) { return shm.Find<Data>(ObjName(name)); }
- Data *Find(const std::string &name) { return Find(shm_, ObjName(name)); }
- virtual ~ShmObject() {}
- std::string name() const { return name_; }
- Data* data() { return pdata_; }
- const Data* data() const { return pdata_; }
- Data* operator->() { return data(); }
- const Data* operator->() const { return data(); }
- bool Remove() { return shm_.destroy<Data>(ObjName(name_).c_str()); }
+ template <class... Params>
+ ShmObject(ShmType &segment, const std::string &name, Params &&...t) :
+ shm_(segment), name_(name)
+ {
+ pdata_ = shm_.find_or_construct<Data>(ObjName(name_).c_str(), std::nothrow)(t...);
+ if (!IsOk()) {
+ throw("Error: Not enough memory, can not allocate \"" + name_ + "\"");
+ }
+ }
+ static Data *Find(SharedMemory &shm, const std::string &name) { return shm.Find<Data>(ObjName(name)); }
+ Data *Find(const std::string &name) { return Find(shm_, ObjName(name)); }
+ virtual ~ShmObject() {}
+ std::string name() const { return name_; }
+ Data *data() { return pdata_; }
+ const Data *data() const { return pdata_; }
+ Data *operator->() { return data(); }
+ const Data *operator->() const { return data(); }
+ bool Remove() { return shm_.destroy<Data>(ObjName(name_).c_str()); }
};
-template <class D> using Allocator = allocator<D, SharedMemory::segment_manager>;
+template <class D>
+using Allocator = allocator<D, SharedMemory::segment_manager>;
} // namespace bhome_shm
diff --git a/src/shm_queue.cpp b/src/shm_queue.cpp
index 421cebf..cf4c8b4 100644
--- a/src/shm_queue.cpp
+++ b/src/shm_queue.cpp
@@ -17,52 +17,57 @@
*/
#include "shm_queue.h"
-#include <boost/uuid/uuid_io.hpp>
-#include <boost/uuid/uuid_generators.hpp>
#include "bh_util.h"
+#include <boost/uuid/uuid_generators.hpp>
+#include <boost/uuid/uuid_io.hpp>
-namespace bhome_shm {
-using namespace bhome_msg;
+namespace bhome_shm
+{
+using namespace bhome_msg;
using namespace boost::interprocess;
using namespace boost::uuids;
-namespace {
-std::string MsgQIdToName(const MQId& id) { return "shmq" + to_string(id); }
+namespace
+{
+std::string MsgQIdToName(const MQId &id) { return "shmq" + to_string(id); }
// MQId EmptyId() { return nil_uuid(); }
MQId NewId() { return random_generator()(); }
-const int AdjustMQLength(const int len) {
- const int kMaxLength = 10000;
- const int kDefaultLen = 12;
- if (len <= 0) {
- return kDefaultLen;
- } else if (len < kMaxLength) {
- return len;
- } else {
- return kMaxLength;
- }
+const int AdjustMQLength(const int len)
+{
+ const int kMaxLength = 10000;
+ const int kDefaultLen = 12;
+ if (len <= 0) {
+ return kDefaultLen;
+ } else if (len < kMaxLength) {
+ return len;
+ } else {
+ return kMaxLength;
+ }
}
-}
+} // namespace
// ShmMsgQueue memory usage: (320 + 16*length) bytes, length >= 2
-ShmMsgQueue::ShmMsgQueue(const MQId &id, ShmType &segment, const int len):
-Super(segment, MsgQIdToName(id), AdjustMQLength(len), segment.get_segment_manager()),
-id_(id)
-{}
+ShmMsgQueue::ShmMsgQueue(const MQId &id, ShmType &segment, const int len) :
+ Super(segment, MsgQIdToName(id), AdjustMQLength(len), segment.get_segment_manager()),
+ id_(id)
+{
+}
-ShmMsgQueue::ShmMsgQueue(ShmType &segment, const int len):
-ShmMsgQueue(NewId(), segment, len)
-{}
+ShmMsgQueue::ShmMsgQueue(ShmType &segment, const int len) :
+ ShmMsgQueue(NewId(), segment, len)
+{
+}
ShmMsgQueue::~ShmMsgQueue()
{
- Remove();
+ Remove();
}
bool ShmMsgQueue::Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms)
{
- Queue *remote = Find(shm, MsgQIdToName(remote_id));
- return remote && remote->Write(msg, timeout_ms, [](const MsgI&msg){msg.AddRef();});
+ Queue *remote = Find(shm, MsgQIdToName(remote_id));
+ return remote && remote->Write(msg, timeout_ms, [](const MsgI &msg) { msg.AddRef(); });
}
// bool ShmMsgQueue::Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms)
@@ -73,15 +78,15 @@
bool ShmMsgQueue::Send(const MQId &remote_id, const BHMsg &data, const int timeout_ms)
{
- MsgI msg;
- if(msg.Make(shm(), data)) {
- if(Send(remote_id, msg, timeout_ms)) {
- return true;
- } else {
- msg.Release(shm());
- }
- }
- return false;
+ MsgI msg;
+ if (msg.Make(shm(), data)) {
+ if (Send(remote_id, msg, timeout_ms)) {
+ return true;
+ } else {
+ msg.Release(shm());
+ }
+ }
+ return false;
}
/*
@@ -105,13 +110,13 @@
//*/
bool ShmMsgQueue::Recv(BHMsg &msg, const int timeout_ms)
{
- MsgI imsg;
- if (Read(imsg, timeout_ms)) {
- DEFER1(imsg.Release(shm()););
- return imsg.Unpack(msg);
- } else {
- return false;
- }
+ MsgI imsg;
+ if (Read(imsg, timeout_ms)) {
+ DEFER1(imsg.Release(shm()););
+ return imsg.Unpack(msg);
+ } else {
+ return false;
+ }
}
} // namespace bhome_shm
diff --git a/src/shm_queue.h b/src/shm_queue.h
index 60b1862..9064f55 100644
--- a/src/shm_queue.h
+++ b/src/shm_queue.h
@@ -19,111 +19,125 @@
#ifndef SHM_QUEUE_JE0OEUP3
#define SHM_QUEUE_JE0OEUP3
-#include "shm.h"
#include "msg.h"
+#include "shm.h"
#include <boost/circular_buffer.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
-namespace bhome_shm {
-
-template <class D> using Circular = boost::circular_buffer<D, Allocator<D> >;
+namespace bhome_shm
+{
+
+template <class D>
+using Circular = boost::circular_buffer<D, Allocator<D>>;
typedef boost::uuids::uuid MQId;
template <class D>
class SharedQueue : private Circular<D>
{
- typedef Circular<D> Super;
- Mutex mutex_;
- Cond cond_read_;
- Cond cond_write_;
- Mutex & mutex() { return mutex_; }
+ typedef Circular<D> Super;
+ Mutex mutex_;
+ Cond cond_read_;
+ Cond cond_write_;
+ Mutex &mutex() { return mutex_; }
- static boost::posix_time::ptime MSFromNow(const int ms)
- {
- using namespace boost::posix_time;
- ptime cur = boost::posix_time::microsec_clock::universal_time();
- return cur + millisec(ms);
- }
+ static boost::posix_time::ptime MSFromNow(const int ms)
+ {
+ using namespace boost::posix_time;
+ ptime cur = boost::posix_time::microsec_clock::universal_time();
+ return cur + millisec(ms);
+ }
public:
- SharedQueue(const uint32_t len, Allocator<D> const& alloc):Super(len, alloc) {}
- using Super::size;
- using Super::capacity;
- template <class Iter, class OnWrite>
- int Write(Iter begin, Iter end, const int timeout_ms, const OnWrite &onWrite) {
- int n = 0;
- if (begin != end) {
- auto endtime = MSFromNow(timeout_ms);
- Guard lock(mutex());
- while (cond_write_.timed_wait(lock, endtime, [&]() { return !this->full(); })) {
- onWrite(*begin);
- this->push_back(*begin);
- ++n;
- cond_read_.notify_one();
- if (++begin == end) {
- break;
- }
- }
- }
- return n;
- }
- template <class OnWrite>
- bool Write(const D &buf, const int timeout_ms, const OnWrite &onWrite) {
- return Write(&buf, (&buf)+1, timeout_ms, onWrite);
- }
- bool Write(const D &buf, const int timeout_ms) { return Write(buf, timeout_ms, [](const D &buf){}); }
+ SharedQueue(const uint32_t len, Allocator<D> const &alloc) :
+ Super(len, alloc) {}
+ using Super::capacity;
+ using Super::size;
+ template <class Iter, class OnWrite>
+ int Write(Iter begin, Iter end, const int timeout_ms, const OnWrite &onWrite)
+ {
+ int n = 0;
+ if (begin != end) {
+ auto endtime = MSFromNow(timeout_ms);
+ Guard lock(mutex());
+ while (cond_write_.timed_wait(lock, endtime, [&]() { return !this->full(); })) {
+ onWrite(*begin);
+ this->push_back(*begin);
+ ++n;
+ cond_read_.notify_one();
+ if (++begin == end) {
+ break;
+ }
+ }
+ }
+ return n;
+ }
- template <class OnData>
- bool Read(const int timeout_ms, OnData onData){
- int n = 0;
- auto endtime = MSFromNow(timeout_ms);
- Guard lock(mutex());
- while (cond_read_.timed_wait(lock, endtime, [&]() { return !this->empty(); })) {
- const bool more = onData(this->front());
- this->pop_front();
- cond_write_.notify_one();
- ++n;
- if (!more) {
- break;
- }
- }
- return n;
- }
- bool Read(D &buf, const int timeout_ms){
- auto read1 = [&](D &d) {
- using std::swap;
- swap(buf, d);
- return false;
- };
- return Read(timeout_ms, read1) == 1;
- }
+ template <class OnWrite>
+ bool Write(const D &buf, const int timeout_ms, const OnWrite &onWrite)
+ {
+ return Write(&buf, (&buf) + 1, timeout_ms, onWrite);
+ }
+ bool Write(const D &buf, const int timeout_ms)
+ {
+ return Write(buf, timeout_ms, [](const D &buf) {});
+ }
+
+ template <class OnData>
+ bool Read(const int timeout_ms, OnData onData)
+ {
+ int n = 0;
+ auto endtime = MSFromNow(timeout_ms);
+ Guard lock(mutex());
+ while (cond_read_.timed_wait(lock, endtime, [&]() { return !this->empty(); })) {
+ const bool more = onData(this->front());
+ this->pop_front();
+ cond_write_.notify_one();
+ ++n;
+ if (!more) {
+ break;
+ }
+ }
+ return n;
+ }
+
+ bool Read(D &buf, const int timeout_ms)
+ {
+ auto read1 = [&](D &d) {
+ using std::swap;
+ swap(buf, d);
+ return false;
+ };
+ return Read(timeout_ms, read1) == 1;
+ }
};
using namespace bhome_msg;
-class ShmMsgQueue : private ShmObject<SharedQueue<MsgI> >
+class ShmMsgQueue : private ShmObject<SharedQueue<MsgI>>
{
- typedef ShmObject<SharedQueue<MsgI> > Super;
- typedef Super::Data Queue;
- bool Write(const MsgI &buf, const int timeout_ms) { return data()->Write(buf, timeout_ms); }
- bool Read(MsgI &buf, const int timeout_ms) { return data()->Read(buf, timeout_ms); }
- MQId id_;
-protected:
- ShmMsgQueue(const std::string &raw_name, ShmType &segment, const int len); // internal use.
-public:
- ShmMsgQueue(const MQId &id, ShmType &segment, const int len);
- ShmMsgQueue(ShmType &segment, const int len);
- ~ShmMsgQueue();
- const MQId &Id() const { return id_; }
+ typedef ShmObject<SharedQueue<MsgI>> Super;
+ typedef Super::Data Queue;
+ bool Write(const MsgI &buf, const int timeout_ms) { return data()->Write(buf, timeout_ms); }
+ bool Read(MsgI &buf, const int timeout_ms) { return data()->Read(buf, timeout_ms); }
+ MQId id_;
- bool Recv(BHMsg &msg, const int timeout_ms);
- bool Recv(MsgI &msg, const int timeout_ms) { return Read(msg, timeout_ms); }
- bool Send(const MQId &remote_id, const BHMsg &msg, const int timeout_ms);
- static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms);
- bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms) {
- return Send(shm(), remote_id, msg, timeout_ms);
- }
+protected:
+ ShmMsgQueue(const std::string &raw_name, ShmType &segment, const int len); // internal use.
+public:
+ ShmMsgQueue(const MQId &id, ShmType &segment, const int len);
+ ShmMsgQueue(ShmType &segment, const int len);
+ ~ShmMsgQueue();
+ const MQId &Id() const { return id_; }
+
+ bool Recv(BHMsg &msg, const int timeout_ms);
+ bool Recv(MsgI &msg, const int timeout_ms) { return Read(msg, timeout_ms); }
+ bool Send(const MQId &remote_id, const BHMsg &msg, const int timeout_ms);
+ static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms);
+ bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms)
+ {
+ return Send(shm(), remote_id, msg, timeout_ms);
+ }
};
} // namespace bhome_shm
diff --git a/src/socket.cpp b/src/socket.cpp
index 1c28bfa..21928b8 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -17,118 +17,118 @@
*/
#include "socket.h"
-#include <chrono>
-#include "msg.h"
-#include "defs.h"
#include "bh_util.h"
+#include "defs.h"
+#include "msg.h"
+#include <chrono>
using namespace bhome_msg;
using namespace bhome_shm;
using namespace std::chrono_literals;
-namespace {
-
-int GetSocketDefaultLen(ShmSocket::Type type)
+namespace
{
- switch (type) {
- case ShmSocket::eSockRequest : return 12;
- case ShmSocket::eSockReply : return 64;
- case ShmSocket::eSockPublish : return 0;
- case ShmSocket::eSockSubscribe : return 64;
- default: return 0;
- }
-}
-
-}
-
-ShmSocket::ShmSocket(Type type, bhome_shm::SharedMemory &shm)
- : shm_(shm),
- type_(type),
- run_(false)
+int GetSocketDefaultLen(ShmSocket::Type type)
{
- int len = GetSocketDefaultLen(type);
- if (len != 0) {
- mq_.reset(new Queue(shm_, len));
-
- auto RecvProc = [this](){
- while (run_) {
- try {
- std::unique_lock<std::mutex> lk(mutex_);
- if (cv_recv_cb_.wait_for(lk, 100ms, [this](){ return HasRecvCB(); })) {
- BHMsg msg;
- if (mq_->Recv(msg, 100)) {
- this->onRecv_(msg);
- }
- }
- } catch (...) {}
- }
- };
- run_.store(true);
- workers_.emplace_back(RecvProc);
- }
+ switch (type) {
+ case ShmSocket::eSockRequest: return 12;
+ case ShmSocket::eSockReply: return 64;
+ case ShmSocket::eSockPublish: return 0;
+ case ShmSocket::eSockSubscribe: return 64;
+ default: return 0;
+ }
}
-ShmSocket::ShmSocket(Type type)
- : ShmSocket(type, BHomeShm())
+
+} // namespace
+
+ShmSocket::ShmSocket(Type type, bhome_shm::SharedMemory &shm) :
+ shm_(shm), type_(type), run_(false)
+{
+ int len = GetSocketDefaultLen(type);
+ if (len != 0) {
+ mq_.reset(new Queue(shm_, len));
+
+ auto RecvProc = [this]() {
+ while (run_) {
+ try {
+ std::unique_lock<std::mutex> lk(mutex_);
+ if (cv_recv_cb_.wait_for(lk, 100ms, [this]() { return HasRecvCB(); })) {
+ BHMsg msg;
+ if (mq_->Recv(msg, 100)) {
+ this->onRecv_(msg);
+ }
+ }
+ } catch (...) {
+ }
+ }
+ };
+ run_.store(true);
+ workers_.emplace_back(RecvProc);
+ }
+}
+
+ShmSocket::ShmSocket(Type type) :
+ ShmSocket(type, BHomeShm())
{
}
ShmSocket::~ShmSocket()
{
- Stop();
+ Stop();
}
bool ShmSocket::Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms)
{
- if (type_ != eSockPublish) {
- return false;
- }
- assert(!mq_);
- try {
- MsgI imsg;
- if (!imsg.MakeRC(shm_, MakePub(topic, data, size))) {
- return false;
- }
- DEFER1(imsg.Release(shm_));
- return Queue::Send(shm_, kBHBusQueueId, imsg, timeout_ms);
-
- } catch (...) {
- return false;
- }
+ if (type_ != eSockPublish) {
+ return false;
+ }
+ assert(!mq_);
+ try {
+ MsgI imsg;
+ if (!imsg.MakeRC(shm_, MakePub(topic, data, size))) {
+ return false;
+ }
+ DEFER1(imsg.Release(shm_));
+ return Queue::Send(shm_, kBHBusQueueId, imsg, timeout_ms);
+
+ } catch (...) {
+ return false;
+ }
}
bool ShmSocket::Subscribe(const std::vector<std::string> &topics, const int timeout_ms)
{
- if (type_ != eSockSubscribe) {
- return false;
- }
- assert(mq_);
- try {
- return mq_->Send(kBHBusQueueId, MakeSub(mq_->Id(), topics), timeout_ms);
- } catch (...) {
- return false;
- }
+ if (type_ != eSockSubscribe) {
+ return false;
+ }
+ assert(mq_);
+ try {
+ return mq_->Send(kBHBusQueueId, MakeSub(mq_->Id(), topics), timeout_ms);
+ } catch (...) {
+ return false;
+ }
}
bool ShmSocket::SetRecvCallback(const RecvCB &onRecv)
{
- std::lock_guard<std::mutex> lock(mutex_);
- onRecv_ = onRecv;
- cv_recv_cb_.notify_one();
- return true;
+ std::lock_guard<std::mutex> lock(mutex_);
+ onRecv_ = onRecv;
+ cv_recv_cb_.notify_one();
+ return true;
}
bool ShmSocket::HasRecvCB()
{
- return static_cast<bool>(onRecv_);
+ return static_cast<bool>(onRecv_);
}
void ShmSocket::Stop()
{
- run_ = false;
- for (auto &t : workers_) {
- if (t.joinable()) {
- t.join();
- }
- }
+ run_ = false;
+ for (auto &t : workers_) {
+ if (t.joinable()) {
+ t.join();
+ }
+ }
}
\ No newline at end of file
diff --git a/src/socket.h b/src/socket.h
index e65ac83..92c1b73 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -20,55 +20,56 @@
#define SOCKET_GWTJHBPO
#include "shm_queue.h"
-#include <vector>
-#include <thread>
-#include <memory>
-#include <functional>
-#include <mutex>
-#include <condition_variable>
#include <atomic>
+#include <condition_variable>
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <thread>
+#include <vector>
class ShmSocket
{
- typedef bhome_shm::ShmMsgQueue Queue;
+ typedef bhome_shm::ShmMsgQueue Queue;
+
public:
- enum Type {
- eSockRequest,
- eSockReply,
- eSockSubscribe,
- eSockPublish,
- };
- typedef std::function<void (bhome_msg::BHMsg &msg)> RecvCB;
+ enum Type {
+ eSockRequest,
+ eSockReply,
+ eSockSubscribe,
+ eSockPublish,
+ };
+ typedef std::function<void(bhome_msg::BHMsg &msg)> RecvCB;
- ShmSocket(Type type);
- ShmSocket(Type type, bhome_shm::SharedMemory &shm);
- ~ShmSocket();
+ ShmSocket(Type type);
+ ShmSocket(Type type, bhome_shm::SharedMemory &shm);
+ ~ShmSocket();
- // bool Request(const std::string &topic, const void *data, const size_t size, onReply);
- bool RequestAndWait() { return false; } // call Request, and wait onReply notify cv
+ // bool Request(const std::string &topic, const void *data, const size_t size, onReply);
+ bool RequestAndWait() { return false; } // call Request, and wait onReply notify cv
- // bool HandleRequest(onData);
- bool ReadRequest(); // exclude with HandleRequest
- bool SendReply(); // exclude with HandleRequest
+ // bool HandleRequest(onData);
+ bool ReadRequest(); // exclude with HandleRequest
+ bool SendReply(); // exclude with HandleRequest
- bool Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms);
- bool Subscribe(const std::vector<std::string> &topics, const int timeout_ms);
- bool RecvSub(std::string &topic, std::string &data, const int timeout_ms);
- bool SetRecvCallback(const RecvCB &onRecv);
+ bool Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms);
+ bool Subscribe(const std::vector<std::string> &topics, const int timeout_ms);
+ bool RecvSub(std::string &topic, std::string &data, const int timeout_ms);
+ bool SetRecvCallback(const RecvCB &onRecv);
+
private:
- bool HasRecvCB();
- void Stop();
+ bool HasRecvCB();
+ void Stop();
- bhome_shm::SharedMemory &shm_;
- Type type_;
- std::vector<std::thread> workers_;
- std::mutex mutex_;
- std::condition_variable cv_recv_cb_;
- std::atomic<bool> run_;
- RecvCB onRecv_;
+ bhome_shm::SharedMemory &shm_;
+ Type type_;
+ std::vector<std::thread> workers_;
+ std::mutex mutex_;
+ std::condition_variable cv_recv_cb_;
+ std::atomic<bool> run_;
+ RecvCB onRecv_;
- std::unique_ptr<Queue> mq_;
+ std::unique_ptr<Queue> mq_;
};
-
#endif // end of include guard: SOCKET_GWTJHBPO
diff --git a/utest/simple_tests.cpp b/utest/simple_tests.cpp
index eff0209..06093fd 100644
--- a/utest/simple_tests.cpp
+++ b/utest/simple_tests.cpp
@@ -18,126 +18,123 @@
#include "util.h"
-struct s1000 { char a[1000]; };
-
+struct s1000 {
+ char a[1000];
+};
BOOST_AUTO_TEST_CASE(BasicTest)
{
- const std::string shm_name("basic");
- ShmRemover auto_remove(shm_name);
- SharedMemory shm(shm_name, 1024*1024*10);
- auto Avail = [&]() { return shm.get_free_memory(); };
+ const std::string shm_name("basic");
+ ShmRemover auto_remove(shm_name);
+ SharedMemory shm(shm_name, 1024 * 1024 * 10);
+ auto Avail = [&]() { return shm.get_free_memory(); };
- offset_ptr<const void> p;
- BOOST_CHECK(!p);
- BOOST_CHECK(p.get() == 0);
- p = 0;
- BOOST_CHECK(!p);
- BOOST_CHECK(p.get() == 0);
- const char *str = "basic";
- p = str;
- BOOST_CHECK(p);
- BOOST_CHECK(p.get() == str);
- p = 0;
- BOOST_CHECK(!p);
- BOOST_CHECK(p.get() == 0);
+ offset_ptr<const void> p;
+ BOOST_CHECK(!p);
+ BOOST_CHECK(p.get() == 0);
+ p = 0;
+ BOOST_CHECK(!p);
+ BOOST_CHECK(p.get() == 0);
+ const char *str = "basic";
+ p = str;
+ BOOST_CHECK(p);
+ BOOST_CHECK(p.get() == str);
+ p = 0;
+ BOOST_CHECK(!p);
+ BOOST_CHECK(p.get() == 0);
+ auto init_avail = Avail();
- auto init_avail = Avail();
+ auto BasicTest = [&](int tid, int nloop) {
+ auto Code = [&](int id) {
+ typedef ShmObject<s1000> Int;
+ std::string name = std::to_string(id);
+ auto a0 = Avail();
+ Int i1(shm, name);
+ auto a1 = Avail();
+ BOOST_CHECK_LT(a1, a0);
+ printf("s1000 size: %ld\n", a0 - a1);
+ i1->a[0] = 5;
+ Int i2(shm, name);
+ auto a2 = Avail();
+ BOOST_CHECK_EQUAL(a1, a2);
+ BOOST_CHECK_EQUAL(i1.data(), i2.data());
+ int i = i1.Remove();
+ BOOST_CHECK_EQUAL(Avail(), a0);
- auto BasicTest = [&](int tid, int nloop) {
- auto Code = [&](int id) {
+ {
+ auto old = Avail();
+ void *p = shm.Alloc(1024);
+ shm.Dealloc(p);
+ BOOST_CHECK_EQUAL(old, Avail());
+ }
- typedef ShmObject<s1000> Int;
- std::string name = std::to_string(id);
- auto a0 = Avail();
- Int i1(shm, name);
- auto a1 = Avail();
- BOOST_CHECK_LT(a1, a0);
- printf("s1000 size: %ld\n", a0 - a1);
- i1->a[0] = 5;
- Int i2(shm, name);
- auto a2 = Avail();
- BOOST_CHECK_EQUAL(a1, a2);
- BOOST_CHECK_EQUAL(i1.data(), i2.data());
- int i = i1.Remove();
- BOOST_CHECK_EQUAL(Avail(), a0);
+ bool r = shared_memory_object::remove(shm_name.c_str());
+ BOOST_CHECK(r);
+ };
+ for (int i = 0; i < nloop; ++i) {
+ Code(i + tid * nloop);
+ }
+ };
- {
- auto old = Avail();
- void *p = shm.Alloc(1024);
- shm.Dealloc(p);
- BOOST_CHECK_EQUAL(old, Avail());
- }
-
- bool r = shared_memory_object::remove(shm_name.c_str());
- BOOST_CHECK(r);
- };
- for (int i = 0; i < nloop; ++i) {
- Code(i + tid*nloop);
- }
- };
-
- // boost::timer::auto_cpu_timer timer;
- ThreadManager threads;
- int nthread = 1;
- int nloop = 1;
- for (int i = 0; i < nthread; ++i)
- {
- threads.Launch(BasicTest, i, nloop);
- }
- BOOST_CHECK_EQUAL(init_avail, Avail());
+ // boost::timer::auto_cpu_timer timer;
+ ThreadManager threads;
+ int nthread = 1;
+ int nloop = 1;
+ for (int i = 0; i < nthread; ++i) {
+ threads.Launch(BasicTest, i, nloop);
+ }
+ BOOST_CHECK_EQUAL(init_avail, Avail());
}
BOOST_AUTO_TEST_CASE(ForkTest)
{
- ProcessManager procs;
- const int nproc = 10;
+ ProcessManager procs;
+ const int nproc = 10;
- printf("Testing fork:\n");
+ printf("Testing fork:\n");
- auto child = [&](int id) {
- std::this_thread::sleep_for(100ms *id);
- printf("child id: %3d/%d ends\r", id, nproc);
- };
+ auto child = [&](int id) {
+ std::this_thread::sleep_for(100ms * id);
+ printf("child id: %3d/%d ends\r", id, nproc);
+ };
- for (int i = 0; i < nproc; ++i) {
- procs.Launch(child, i+1);
- }
+ for (int i = 0; i < nproc; ++i) {
+ procs.Launch(child, i + 1);
+ }
}
BOOST_AUTO_TEST_CASE(TimedWaitTest)
{
- const std::string shm_name("shm_wait");
- ShmRemover auto_remove(shm_name);
- SharedMemory shm(shm_name, 1024*1024);
- ShmMsgQueue q(shm, 64);
- for (int i = 0; i < 2; ++i) {
- int ms = i * 100;
- printf("Timeout Test %4d: ", ms);
- boost::timer::auto_cpu_timer timer;
- BHMsg msg;
- bool r = q.Recv(msg, ms);
- BOOST_CHECK(!r);
- }
+ const std::string shm_name("shm_wait");
+ ShmRemover auto_remove(shm_name);
+ SharedMemory shm(shm_name, 1024 * 1024);
+ ShmMsgQueue q(shm, 64);
+ for (int i = 0; i < 2; ++i) {
+ int ms = i * 100;
+ printf("Timeout Test %4d: ", ms);
+ boost::timer::auto_cpu_timer timer;
+ BHMsg msg;
+ bool r = q.Recv(msg, ms);
+ BOOST_CHECK(!r);
+ }
}
BOOST_AUTO_TEST_CASE(RefCountTest)
{
- const std::string shm_name("ShmRefCount");
- ShmRemover auto_remove(shm_name);
- SharedMemory shm(shm_name, 1024*1024);
+ const std::string shm_name("ShmRefCount");
+ ShmRemover auto_remove(shm_name);
+ SharedMemory shm(shm_name, 1024 * 1024);
- MsgI m0(shm.Alloc(1000), shm.New<RefCount>());
- BOOST_CHECK(m0.IsCounted());
- BOOST_CHECK_EQUAL(m0.Count(), 1);
- MsgI m1 = m0;
- BOOST_CHECK(m1.IsCounted());
- BOOST_CHECK_EQUAL(m1.AddRef(), 2);
- BOOST_CHECK_EQUAL(m0.AddRef(), 3);
- BOOST_CHECK_EQUAL(m0.Release(shm), 2);
- BOOST_CHECK_EQUAL(m0.Release(shm), 1);
- BOOST_CHECK_EQUAL(m1.Release(shm), 0);
- BOOST_CHECK(!m1.IsCounted());
+ MsgI m0(shm.Alloc(1000), shm.New<RefCount>());
+ BOOST_CHECK(m0.IsCounted());
+ BOOST_CHECK_EQUAL(m0.Count(), 1);
+ MsgI m1 = m0;
+ BOOST_CHECK(m1.IsCounted());
+ BOOST_CHECK_EQUAL(m1.AddRef(), 2);
+ BOOST_CHECK_EQUAL(m0.AddRef(), 3);
+ BOOST_CHECK_EQUAL(m0.Release(shm), 2);
+ BOOST_CHECK_EQUAL(m0.Release(shm), 1);
+ BOOST_CHECK_EQUAL(m1.Release(shm), 0);
+ BOOST_CHECK(!m1.IsCounted());
}
-
diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp
index b1cba46..35465bb 100644
--- a/utest/speed_test.cpp
+++ b/utest/speed_test.cpp
@@ -22,176 +22,175 @@
BOOST_AUTO_TEST_CASE(SpeedTest)
{
- const std::string shm_name("ShmSpeed");
- ShmRemover auto_remove(shm_name);
- const int mem_size = 1024*1024*50;
- MQId id = boost::uuids::random_generator()();
- const int timeout = 100;
- const uint32_t data_size = 4000;
+ const std::string shm_name("ShmSpeed");
+ ShmRemover auto_remove(shm_name);
+ const int mem_size = 1024 * 1024 * 50;
+ MQId id = boost::uuids::random_generator()();
+ const int timeout = 100;
+ const uint32_t data_size = 4000;
- auto Writer = [&](int writer_id, uint64_t n) {
- SharedMemory shm(shm_name, mem_size);
- ShmMsgQueue mq(shm, 64);
- std::string str(data_size, 'a');
- MsgI msg;
- DEFER1(msg.Release(shm););
- msg.MakeRC(shm, MakeRequest(mq.Id(), str.data(), str.size()));
- for (uint64_t i = 0; i < n; ++i) {
- // mq.Send(id, str.data(), str.size(), timeout);
- mq.Send(id, msg, timeout);
- }
- };
- auto Reader = [&](int reader_id, std::atomic<bool> *run, bool isfork){
- SharedMemory shm(shm_name, mem_size);
- ShmMsgQueue mq(id, shm, 1000);
- while(*run) {
- BHMsg msg;
- if (mq.Recv(msg, timeout)) {
- // ok
- } else if (isfork) {
- exit(0); // for forked quit after 1s.
- }
- }
- };
- auto State = [&](std::atomic<bool> *run){
- SharedMemory shm(shm_name, mem_size);
- auto init = shm.get_free_memory();
- printf("shm init : %ld\n", init);
- while (*run) {
- auto cur = shm.get_free_memory();
- printf("shm used : %8ld/%ld\n", init - cur, init);
- std::this_thread::sleep_for(1s);
- }
- };
+ auto Writer = [&](int writer_id, uint64_t n) {
+ SharedMemory shm(shm_name, mem_size);
+ ShmMsgQueue mq(shm, 64);
+ std::string str(data_size, 'a');
+ MsgI msg;
+ DEFER1(msg.Release(shm););
+ msg.MakeRC(shm, MakeRequest(mq.Id(), str.data(), str.size()));
+ for (uint64_t i = 0; i < n; ++i) {
+ // mq.Send(id, str.data(), str.size(), timeout);
+ mq.Send(id, msg, timeout);
+ }
+ };
+ auto Reader = [&](int reader_id, std::atomic<bool> *run, bool isfork) {
+ SharedMemory shm(shm_name, mem_size);
+ ShmMsgQueue mq(id, shm, 1000);
+ while (*run) {
+ BHMsg msg;
+ if (mq.Recv(msg, timeout)) {
+ // ok
+ } else if (isfork) {
+ exit(0); // for forked quit after 1s.
+ }
+ }
+ };
+ auto State = [&](std::atomic<bool> *run) {
+ SharedMemory shm(shm_name, mem_size);
+ auto init = shm.get_free_memory();
+ printf("shm init : %ld\n", init);
+ while (*run) {
+ auto cur = shm.get_free_memory();
+ printf("shm used : %8ld/%ld\n", init - cur, init);
+ std::this_thread::sleep_for(1s);
+ }
+ };
- int nwriters[] = {1,2,4};
- int nreaders[] = {1,2};
+ int nwriters[] = {1, 2, 4};
+ int nreaders[] = {1, 2};
- auto Test = [&](auto &www, auto &rrr, bool isfork) {
- for (auto nreader : nreaders) {
- for (auto nwriter : nwriters) {
- const uint64_t nmsg = 1000 * 1000 * 10 / nwriter;
- const uint64_t total_msg = nmsg * nwriter;
- std::atomic<bool> run(true);
- std::this_thread::sleep_for(10ms);
- boost::timer::auto_cpu_timer timer;
- for (int i = 0; i < nreader; ++i) {
- rrr.Launch(Reader, i, &run, isfork);
- }
- for (int i = 0; i < nwriter; ++i) {
- www.Launch(Writer, i, nmsg);
- }
- www.WaitAll();
- run.store(false);
- rrr.WaitAll();
- printf("Write %ld msg R(%3d) W(%3d), : ", total_msg, nreader, nwriter);
- }
- }
- };
+ auto Test = [&](auto &www, auto &rrr, bool isfork) {
+ for (auto nreader : nreaders) {
+ for (auto nwriter : nwriters) {
+ const uint64_t nmsg = 1000 * 1000 * 10 / nwriter;
+ const uint64_t total_msg = nmsg * nwriter;
+ std::atomic<bool> run(true);
+ std::this_thread::sleep_for(10ms);
+ boost::timer::auto_cpu_timer timer;
+ for (int i = 0; i < nreader; ++i) {
+ rrr.Launch(Reader, i, &run, isfork);
+ }
+ for (int i = 0; i < nwriter; ++i) {
+ www.Launch(Writer, i, nmsg);
+ }
+ www.WaitAll();
+ run.store(false);
+ rrr.WaitAll();
+ printf("Write %ld msg R(%3d) W(%3d), : ", total_msg, nreader, nwriter);
+ }
+ }
+ };
- std::atomic<bool> run(true);
- ThreadManager state;
- state.Launch(State, &run);
- // typedef ProcessManager Manager;
- // typedef ThreadManager Manager;
- // const bool isfork = IsSameType<Manager, ProcessManager>::value;
- ProcessManager pw, pr;
- printf("================ Testing process io: =======================================================\n");
- Test(pw, pr, true);
- ThreadManager tw, tr;
- printf("---------------- Testing thread io: -------------------------------------------------------\n");
- Test(tw, tr, false);
- run.store(false);
+ std::atomic<bool> run(true);
+ ThreadManager state;
+ state.Launch(State, &run);
+ // typedef ProcessManager Manager;
+ // typedef ThreadManager Manager;
+ // const bool isfork = IsSameType<Manager, ProcessManager>::value;
+ ProcessManager pw, pr;
+ printf("================ Testing process io: =======================================================\n");
+ Test(pw, pr, true);
+ ThreadManager tw, tr;
+ printf("---------------- Testing thread io: -------------------------------------------------------\n");
+ Test(tw, tr, false);
+ run.store(false);
}
// Request Reply Test
BOOST_AUTO_TEST_CASE(RRTest)
{
- const std::string shm_name("ShmReqRep");
- ShmRemover auto_remove(shm_name);
- const int qlen = 64;
- const size_t msg_length = 1000;
- std::string msg_content(msg_length, 'a');
- msg_content[20] = '\0';
+ const std::string shm_name("ShmReqRep");
+ ShmRemover auto_remove(shm_name);
+ const int qlen = 64;
+ const size_t msg_length = 1000;
+ std::string msg_content(msg_length, 'a');
+ msg_content[20] = '\0';
- SharedMemory shm(shm_name, 1024*1024*50);
- auto Avail = [&]() { return shm.get_free_memory(); };
- auto init_avail = Avail();
- ShmMsgQueue srv(shm, qlen);
- ShmMsgQueue cli(shm, qlen);
+ SharedMemory shm(shm_name, 1024 * 1024 * 50);
+ auto Avail = [&]() { return shm.get_free_memory(); };
+ auto init_avail = Avail();
+ ShmMsgQueue srv(shm, qlen);
+ ShmMsgQueue cli(shm, qlen);
- MsgI request_rc;
- request_rc.MakeRC(shm, MakeRequest(cli.Id(), msg_content.data(), msg_content.size()));
- MsgI reply_rc;
- reply_rc.MakeRC(shm, MakeReply(msg_content.data(), msg_content.size()));
+ MsgI request_rc;
+ request_rc.MakeRC(shm, MakeRequest(cli.Id(), msg_content.data(), msg_content.size()));
+ MsgI reply_rc;
+ reply_rc.MakeRC(shm, MakeReply(msg_content.data(), msg_content.size()));
- std::atomic<uint64_t> count(0);
+ std::atomic<uint64_t> count(0);
- std::atomic<ptime> last_time(Now() - seconds(1));
- std::atomic<uint64_t> last_count(0);
+ std::atomic<ptime> last_time(Now() - seconds(1));
+ std::atomic<uint64_t> last_count(0);
- auto Client = [&](int cli_id, int nmsg){
- for (int i = 0; i < nmsg; ++i) {
- auto Req = [&]() {
- return cli.Send(srv.Id(), MakeRequest(cli.Id(), msg_content.data(), msg_content.size()), 100);
- };
- auto ReqRC = [&]() { return cli.Send(srv.Id(), request_rc, 1000); };
+ auto Client = [&](int cli_id, int nmsg) {
+ for (int i = 0; i < nmsg; ++i) {
+ auto Req = [&]() {
+ return cli.Send(srv.Id(), MakeRequest(cli.Id(), msg_content.data(), msg_content.size()), 100);
+ };
+ auto ReqRC = [&]() { return cli.Send(srv.Id(), request_rc, 1000); };
- if (!ReqRC()) {
- printf("********** client send error.\n");
- continue;
- }
- BHMsg msg;
- if (!cli.Recv(msg, 1000)) {
- printf("********** client recv error.\n");
- } else {
- ++count;
- auto cur = Now();
- if (last_time.exchange(cur) < cur) {
- std::cout << "time: " << cur;
- printf(", total msg:%10ld, speed:[%8ld/s], used mem:%8ld, refcount:%d\n",
- count.load(), count - last_count.exchange(count), init_avail - Avail(), request_rc.Count());
- }
+ if (!ReqRC()) {
+ printf("********** client send error.\n");
+ continue;
+ }
+ BHMsg msg;
+ if (!cli.Recv(msg, 1000)) {
+ printf("********** client recv error.\n");
+ } else {
+ ++count;
+ auto cur = Now();
+ if (last_time.exchange(cur) < cur) {
+ std::cout << "time: " << cur;
+ printf(", total msg:%10ld, speed:[%8ld/s], used mem:%8ld, refcount:%d\n",
+ count.load(), count - last_count.exchange(count), init_avail - Avail(), request_rc.Count());
+ }
+ }
+ }
+ };
- }
- }
- };
+ std::atomic<bool> stop(false);
+ auto Server = [&]() {
+ BHMsg req;
+ while (!stop) {
+ if (srv.Recv(req, 100) && req.type() == kMsgTypeRequest) {
+ auto &mqid = req.route()[0].mq_id();
+ MQId src_id;
+ memcpy(&src_id, mqid.data(), sizeof(src_id));
+ auto Reply = [&]() {
+ return srv.Send(src_id, MakeReply(msg_content.data(), msg_content.size()), 100);
+ };
+ auto ReplyRC = [&]() { return srv.Send(src_id, reply_rc, 100); };
- std::atomic<bool> stop(false);
- auto Server = [&](){
- BHMsg req;
- while (!stop) {
- if (srv.Recv(req, 100) && req.type() == kMsgTypeRequest) {
- auto &mqid = req.route()[0].mq_id();
- MQId src_id;
- memcpy(&src_id, mqid.data(), sizeof(src_id));
- auto Reply = [&]() {
- return srv.Send(src_id, MakeReply(msg_content.data(), msg_content.size()), 100);
- };
- auto ReplyRC = [&](){ return srv.Send(src_id, reply_rc, 100); };
+ if (ReplyRC()) {
+ }
+ }
+ }
+ };
- if (ReplyRC()) {
- }
- }
- }
- };
+ boost::timer::auto_cpu_timer timer;
+ DEFER1(printf("Request Reply Test:"););
- boost::timer::auto_cpu_timer timer;
- DEFER1(printf("Request Reply Test:"););
-
- ThreadManager clients, servers;
- for (int i = 0; i < qlen; ++i) { servers.Launch(Server); }
- int ncli = 100*1;
- uint64_t nmsg = 100*100*2;
- printf("client threads: %d, msgs : %ld, total msg: %ld\n", ncli, nmsg, ncli * nmsg);
- for (int i = 0; i < ncli; ++i) { clients.Launch(Client, i, nmsg); }
- clients.WaitAll();
- printf("request ok: %ld\n", count.load());
- stop = true;
- servers.WaitAll();
- BOOST_CHECK(request_rc.IsCounted());
- BOOST_CHECK_EQUAL(request_rc.Count(), 1);
- request_rc.Release(shm);
- BOOST_CHECK(!request_rc.IsCounted());
- // BOOST_CHECK_THROW(reply.Count(), int);
+ ThreadManager clients, servers;
+ for (int i = 0; i < qlen; ++i) { servers.Launch(Server); }
+ int ncli = 100 * 1;
+ uint64_t nmsg = 100 * 100 * 2;
+ printf("client threads: %d, msgs : %ld, total msg: %ld\n", ncli, nmsg, ncli * nmsg);
+ for (int i = 0; i < ncli; ++i) { clients.Launch(Client, i, nmsg); }
+ clients.WaitAll();
+ printf("request ok: %ld\n", count.load());
+ stop = true;
+ servers.WaitAll();
+ BOOST_CHECK(request_rc.IsCounted());
+ BOOST_CHECK_EQUAL(request_rc.Count(), 1);
+ request_rc.Release(shm);
+ BOOST_CHECK(!request_rc.IsCounted());
+ // BOOST_CHECK_THROW(reply.Count(), int);
}
diff --git a/utest/utest.cpp b/utest/utest.cpp
index bb5c14d..473b04e 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -1,167 +1,172 @@
-#include <stdio.h>
-#include <string>
-#include <vector>
-#include <thread>
+#include "defs.h"
+#include "pubsub.h"
+#include "socket.h"
+#include "util.h"
#include <atomic>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
-#include "pubsub.h"
-#include "defs.h"
-#include "util.h"
-#include "socket.h"
+#include <stdio.h>
+#include <string>
+#include <thread>
+#include <vector>
-template <class A, class B> struct IsSameType { static const bool value = false; };
-template <class A> struct IsSameType<A,A> { static const bool value = true; };
-
+template <class A, class B>
+struct IsSameType {
+ static const bool value = false;
+};
+template <class A>
+struct IsSameType<A, A> {
+ static const bool value = true;
+};
BOOST_AUTO_TEST_CASE(Temp)
{
- std::string topics[] = {
- "",
- ".",
- "a",
- "sp",
- "sport",
- "sport.",
- "sport.a",
- "sport.a.b.c",
- "sport.ab.c",
- "sport.basketball",
- "sport.football",
- };
- const char sep = '.';
- auto Adjust = [&](const std::string &user_topic) {
- if (user_topic.empty() || user_topic.back() == sep) {
- return user_topic;
- } else {
- return user_topic + sep;
- }
- };
+ std::string topics[] = {
+ "",
+ ".",
+ "a",
+ "sp",
+ "sport",
+ "sport.",
+ "sport.a",
+ "sport.a.b.c",
+ "sport.ab.c",
+ "sport.basketball",
+ "sport.football",
+ };
+ const char sep = '.';
+ auto Adjust = [&](const std::string &user_topic) {
+ if (user_topic.empty() || user_topic.back() == sep) {
+ return user_topic;
+ } else {
+ return user_topic + sep;
+ }
+ };
- for (auto &t : topics) {
- const std::string &a = Adjust(t);
- printf("orig: %20s adjusted: %20s parts:[", ("'" + t + "'").c_str(), ('\'' + a + '\'').c_str());
+ for (auto &t : topics) {
+ const std::string &a = Adjust(t);
+ printf("orig: %20s adjusted: %20s parts:[", ("'" + t + "'").c_str(), ('\'' + a + '\'').c_str());
- size_t pos = 0;
- while (true) {
- auto &topic = t;
- pos = topic.find(kTopicSep, pos);
- if (pos == topic.npos || ++pos == topic.size()) {
- // Find1(std::string()); // sub all.
- break;
- } else {
- printf("'%s',", topic.substr(0, pos).c_str());
- }
- }
- printf("]\n");
- }
+ size_t pos = 0;
+ while (true) {
+ auto &topic = t;
+ pos = topic.find(kTopicSep, pos);
+ if (pos == topic.npos || ++pos == topic.size()) {
+ // Find1(std::string()); // sub all.
+ break;
+ } else {
+ printf("'%s',", topic.substr(0, pos).c_str());
+ }
+ }
+ printf("]\n");
+ }
}
BOOST_AUTO_TEST_CASE(PubSubTest)
{
- const std::string shm_name("ShmPubSub");
- ShmRemover auto_remove(shm_name); //remove twice? in case of killed?
- SharedMemory shm(shm_name, 1024*1024*50);
- auto Avail = [&]() { return shm.get_free_memory(); };
- auto init_avail = Avail();
+ const std::string shm_name("ShmPubSub");
+ ShmRemover auto_remove(shm_name); //remove twice? in case of killed?
+ SharedMemory shm(shm_name, 1024 * 1024 * 50);
+ auto Avail = [&]() { return shm.get_free_memory(); };
+ auto init_avail = Avail();
- BusManager bus(shm);
- bus.Start(1);
- std::this_thread::sleep_for(100ms);
+ BusManager bus(shm);
+ bus.Start(1);
+ std::this_thread::sleep_for(100ms);
- std::atomic<uint64_t> count(0);
- std::atomic<ptime> last_time(Now() - seconds(1));
- std::atomic<uint64_t> last_count(0);
+ std::atomic<uint64_t> count(0);
+ std::atomic<ptime> last_time(Now() - seconds(1));
+ std::atomic<uint64_t> last_count(0);
- const uint64_t nmsg = 100;
- const int timeout = 1000;
- auto Sub = [&](int id, const std::vector<std::string> &topics) {
- ShmSocket client(ShmSocket::eSockSubscribe, shm);
- bool r = client.Subscribe(topics, timeout);
- std::mutex mutex;
- std::condition_variable cv;
+ const uint64_t nmsg = 100;
+ const int timeout = 1000;
+ auto Sub = [&](int id, const std::vector<std::string> &topics) {
+ ShmSocket client(ShmSocket::eSockSubscribe, shm);
+ bool r = client.Subscribe(topics, timeout);
+ std::mutex mutex;
+ std::condition_variable cv;
- int i = 0;
- auto OnRecv = [&](BHMsg &msg) {
- if (msg.type() != kMsgTypePublish) {
- BOOST_CHECK(false);
- }
- DataPub pub;
- if (!pub.ParseFromString(msg.body())) {
- BOOST_CHECK(false);
- }
- ++count;
+ int i = 0;
+ auto OnRecv = [&](BHMsg &msg) {
+ if (msg.type() != kMsgTypePublish) {
+ BOOST_CHECK(false);
+ }
+ DataPub pub;
+ if (!pub.ParseFromString(msg.body())) {
+ BOOST_CHECK(false);
+ }
+ ++count;
- auto cur = Now();
- if (last_time.exchange(cur) < cur) {
- std::cout << "time: " << cur;
- printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n",
- count.load(), count - last_count.exchange(count), init_avail - Avail());
- }
- if (++i >= nmsg*topics.size()) {
- cv.notify_one();
- }
- // printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str());
- };
- client.SetRecvCallback(OnRecv);
+ auto cur = Now();
+ if (last_time.exchange(cur) < cur) {
+ std::cout << "time: " << cur;
+ printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n",
+ count.load(), count - last_count.exchange(count), init_avail - Avail());
+ }
+ if (++i >= nmsg * topics.size()) {
+ cv.notify_one();
+ }
+ // printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str());
+ };
+ client.SetRecvCallback(OnRecv);
- std::unique_lock<std::mutex> lk(mutex);
- cv.wait(lk);
+ std::unique_lock<std::mutex> lk(mutex);
+ cv.wait(lk);
+ };
- };
+ auto Pub = [&](const std::string &topic) {
+ ShmSocket provider(ShmSocket::eSockPublish, shm);
+ for (int i = 0; i < nmsg; ++i) {
+ std::string data = topic + std::to_string(i) + std::string(1000, '-');
- auto Pub = [&](const std::string &topic) {
- ShmSocket provider(ShmSocket::eSockPublish, shm);
- for (int i = 0; i < nmsg; ++i) {
- std::string data = topic + std::to_string(i) + std::string(1000, '-');
+ bool r = provider.Publish(topic, data.data(), data.size(), timeout);
+ // bool r = provider.Send(kBHBusQueueId, MakePub(topic, data.data(), data.size()), timeout);
+ if (!r) {
+ printf("pub ret: %s\n", r ? "ok" : "fail");
+ }
+ }
+ };
+ ThreadManager threads;
+ typedef std::vector<std::string> Topics;
+ Topics topics;
+ for (int i = 0; i < 100; ++i) {
+ topics.push_back("t" + std::to_string(i));
+ }
+ Topics part;
+ for (int i = 0; i < topics.size(); ++i) {
+ part.push_back(topics[i]);
+ threads.Launch(Sub, i, topics);
+ }
+ std::this_thread::sleep_for(100ms);
+ for (auto &topic : topics) {
+ threads.Launch(Pub, topic);
+ }
+ threads.Launch(Pub, "some_else");
- bool r = provider.Publish(topic, data.data(), data.size(), timeout);
- // bool r = provider.Send(kBHBusQueueId, MakePub(topic, data.data(), data.size()), timeout);
- if (!r) {
- printf("pub ret: %s\n", r ? "ok" : "fail");
- }
- }
- };
- ThreadManager threads;
- typedef std::vector<std::string> Topics;
- Topics topics;
- for (int i = 0; i < 100; ++i) {
- topics.push_back("t" + std::to_string(i));
- }
- Topics part;
- for (int i = 0; i < topics.size(); ++i) {
- part.push_back(topics[i]);
- threads.Launch(Sub, i, topics);
- }
- std::this_thread::sleep_for(100ms);
- for (auto &topic: topics) {
- threads.Launch(Pub, topic);
- }
- threads.Launch(Pub, "some_else");
+ threads.WaitAll();
+ std::cout << "end : " << Now();
+ printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n",
+ count.load(), count - last_count.exchange(count), init_avail - Avail());
- threads.WaitAll();
- std::cout << "end : " << Now();
- printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n",
- count.load(), count - last_count.exchange(count), init_avail - Avail());
-
- bus.Stop();
+ bus.Stop();
}
-inline int MyMin(int a, int b) {
- printf("MyMin\n");
- return a < b ? a : b;
+inline int MyMin(int a, int b)
+{
+ printf("MyMin\n");
+ return a < b ? a : b;
}
+
int test_main(int argc, char *argv[])
{
- printf("test main\n");
- int a = 0;
- int b = 0;
- BOOST_CHECK_EQUAL(a, b);
- int n = MyMin(4,6);
- for (int i = 0; i < n; ++i) {
- printf("i = %d\n", i);
- }
+ printf("test main\n");
+ int a = 0;
+ int b = 0;
+ BOOST_CHECK_EQUAL(a, b);
+ int n = MyMin(4, 6);
+ for (int i = 0; i < n; ++i) {
+ printf("i = %d\n", i);
+ }
- return 0;
+ return 0;
}
-
diff --git a/utest/util.h b/utest/util.h
index ac1d58d..ca58cd7 100644
--- a/utest/util.h
+++ b/utest/util.h
@@ -19,21 +19,21 @@
#ifndef UTIL_W8A0OA5U
#define UTIL_W8A0OA5U
-#include <functional>
-#include <vector>
-#include <thread>
-#include <stdlib.h>
-#include <chrono>
-#include <sys/types.h>
-#include <sys/wait.h>
-#include <boost/noncopyable.hpp>
-#include <boost/timer/timer.hpp>
-#include <boost/test/unit_test.hpp>
-#include <boost/date_time/posix_time/posix_time.hpp>
+#include "bh_util.h"
+#include "msg.h"
#include "shm.h"
#include "shm_queue.h"
-#include "msg.h"
-#include "bh_util.h"
+#include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/noncopyable.hpp>
+#include <boost/test/unit_test.hpp>
+#include <boost/timer/timer.hpp>
+#include <chrono>
+#include <functional>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+#include <thread>
+#include <vector>
using namespace boost::posix_time;
inline ptime Now() { return second_clock::universal_time(); };
@@ -42,58 +42,69 @@
typedef std::function<void(void)> FuncVV;
-class ScopeCall : private boost::noncopyable {
- FuncVV f_;
+class ScopeCall : private boost::noncopyable
+{
+ FuncVV f_;
+
public:
- ScopeCall(FuncVV f):f_(f) { f_(); }
- ~ScopeCall() { f_(); }
+ ScopeCall(FuncVV f) :
+ f_(f) { f_(); }
+ ~ScopeCall() { f_(); }
};
-class ThreadManager {
- std::vector<std::thread> threads_;
+class ThreadManager
+{
+ std::vector<std::thread> threads_;
+
public:
- ~ThreadManager() { WaitAll(); }
- template <class T, class...P>
- void Launch(T t, P...p) { threads_.emplace_back(t, p...); }
- void WaitAll() {
- for (auto &t : threads_) {
- if (t.joinable()) {
- t.join();
- }
- }
- }
+ ~ThreadManager() { WaitAll(); }
+ template <class T, class... P>
+ void Launch(T t, P... p) { threads_.emplace_back(t, p...); }
+ void WaitAll()
+ {
+ for (auto &t : threads_) {
+ if (t.joinable()) {
+ t.join();
+ }
+ }
+ }
};
-class ProcessManager {
- std::vector<pid_t> procs_;
+class ProcessManager
+{
+ std::vector<pid_t> procs_;
+
public:
- ~ProcessManager() { WaitAll(); }
- template <class T, class ...P>
- void Launch(T t, P...p) {
- auto pid = fork();
- if (pid == 0) {
- // child
- t(p...);
- exit(0);
- } else if (pid != -1) { // Ok
- procs_.push_back(pid);
- }
- };
- void WaitAll() {
- for (auto &pid: procs_) {
- int status = 0;
- int options = WUNTRACED | WCONTINUED;
- waitpid(pid, &status, options);
- }
- procs_.clear();
- }
+ ~ProcessManager() { WaitAll(); }
+ template <class T, class... P>
+ void Launch(T t, P... p)
+ {
+ auto pid = fork();
+ if (pid == 0) {
+ // child
+ t(p...);
+ exit(0);
+ } else if (pid != -1) { // Ok
+ procs_.push_back(pid);
+ }
+ };
+ void WaitAll()
+ {
+ for (auto &pid : procs_) {
+ int status = 0;
+ int options = WUNTRACED | WCONTINUED;
+ waitpid(pid, &status, options);
+ }
+ procs_.clear();
+ }
};
using namespace bhome_shm;
using namespace bhome_msg;
struct ShmRemover {
- std::string name_;
- ShmRemover(const std::string &name):name_(name) { SharedMemory::Remove(name_); }
- ~ShmRemover() { SharedMemory::Remove(name_); }
+ std::string name_;
+ ShmRemover(const std::string &name) :
+ name_(name) { SharedMemory::Remove(name_); }
+ ~ShmRemover() { SharedMemory::Remove(name_); }
};
#endif // end of include guard: UTIL_W8A0OA5U
--
Gitblit v1.8.0