From 6eefba812ede29549af3633c490f2e85a4805524 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期三, 31 三月 2021 11:24:20 +0800 Subject: [PATCH] format code style. --- src/shm.h | 130 ++- src/socket.h | 75 +- utest/simple_tests.cpp | 191 ++-- src/msg.h | 79 + src/socket.cpp | 158 ++-- .clang-format | 12 src/defs.h | 5 utest/utest.cpp | 279 +++--- src/shm_queue.cpp | 91 +- src/center.cpp | 3 src/msg.cpp | 151 ++-- utest/speed_test.cpp | 305 ++++---- utest/util.h | 119 +- src/bh_util.h | 60 src/pubsub.h | 37 src/shm_queue.h | 184 ++-- src/center.h | 1 src/pubsub.cpp | 248 +++--- src/shm.cpp | 6 19 files changed, 1,104 insertions(+), 1,030 deletions(-) diff --git a/.clang-format b/.clang-format index fd84ae9..413be3a 100644 --- a/.clang-format +++ b/.clang-format @@ -3,21 +3,21 @@ IndentWidth: 4 TabWidth: 4 ColumnLimit: 0 -AllowShortIfStatementsOnASingleLine: true AllowShortCaseLabelsOnASingleLine: true AllowShortBlocksOnASingleLine: true AllowShortEnumsOnASingleLine: true AllowShortLoopsOnASingleLine: true -AllowShortLambdasOnASingleLine: true -AllowShortFunctionsOnASingleLine: true +AllowShortLambdasOnASingleLine: All +AllowShortIfStatementsOnASingleLine: WithoutElse +AllowShortFunctionsOnASingleLine: All BreakBeforeBraces: Linux IndentCaseLabels: false AccessModifierOffset: -4 BreakConstructorInitializers: AfterColon -AlignConsecutiveAssignments: true -AlignConsecutiveDeclarations: true +ConstructorInitializerAllOnOneLineOrOnePerLine: true +AllowAllConstructorInitializersOnNextLine: true + AlignTrailingComments: true AlignEscapedNewlinesLeft: true PointerAlignment: Right SpaceAfterCStyleCast: true - diff --git a/src/bh_util.h b/src/bh_util.h index d86b931..b5dc45e 100644 --- a/src/bh_util.h +++ b/src/bh_util.h @@ -23,7 +23,7 @@ inline uint16_t Get8(const void *p) { - return static_cast<const uint8_t*>(p)[0]; + return static_cast<const uint8_t *>(p)[0]; } inline void Put8(void *p, uint8_t u) { @@ -32,9 +32,9 @@ inline uint16_t Get16(const void *p) { - auto ptr = static_cast<const uint8_t*>(p); - return (((uint16_t)ptr[0]) << 8u) | - (((uint16_t)ptr[1])); + auto ptr = static_cast<const uint8_t *>(p); + return (((uint16_t) ptr[0]) << 8u) | + (((uint16_t) ptr[1])); } inline void Put16(void *p, uint16_t u) { @@ -45,11 +45,11 @@ inline uint32_t Get32(const void *p) { - auto ptr = static_cast<const uint8_t*>(p); - return (((uint32_t)ptr[0]) << 24u) | - (((uint32_t)ptr[1]) << 16u) | - (((uint32_t)ptr[2]) << 8u) | - (((uint32_t)ptr[3])); + auto ptr = static_cast<const uint8_t *>(p); + return (((uint32_t) ptr[0]) << 24u) | + (((uint32_t) ptr[1]) << 16u) | + (((uint32_t) ptr[2]) << 8u) | + (((uint32_t) ptr[3])); } inline void Put32(void *p, uint32_t u) { @@ -60,17 +60,17 @@ ptr[3] = (uint8_t)(u); } -inline uint64_t Get64(const void *p) +inline uint64_t Get64(const void *p) { - auto ptr = static_cast<const uint8_t*>(p); - return (((uint64_t)ptr[0]) << 56u) | - (((uint64_t)ptr[1]) << 48u) | - (((uint64_t)ptr[2]) << 40u) | - (((uint64_t)ptr[3]) << 32u) | - (((uint64_t)ptr[4]) << 24u) | - (((uint64_t)ptr[5]) << 16u) | - (((uint64_t)ptr[6]) << 8u) | - ((uint64_t)ptr[7]); + auto ptr = static_cast<const uint8_t *>(p); + return (((uint64_t) ptr[0]) << 56u) | + (((uint64_t) ptr[1]) << 48u) | + (((uint64_t) ptr[2]) << 40u) | + (((uint64_t) ptr[3]) << 32u) | + (((uint64_t) ptr[4]) << 24u) | + (((uint64_t) ptr[5]) << 16u) | + (((uint64_t) ptr[6]) << 8u) | + ((uint64_t) ptr[7]); } inline void Put64(void *p, uint64_t u) { @@ -92,20 +92,24 @@ class ExitCall { - typedef std::function<void(void)> func_t; - func_t m_func; + typedef std::function<void(void)> func_t; + func_t m_func; + public: - explicit ExitCall(func_t f): m_func(f) {} - ~ExitCall() { if (m_func) { m_func(); } } + explicit ExitCall(func_t f) : + m_func(f) {} + ~ExitCall() + { + if (m_func) { m_func(); } + } }; // macro helper -#define JOIN_IMPL(a, b) a ## b -#define JOIN(a, b) JOIN_IMPL(a , b) +#define JOIN_IMPL(a, b) a##b +#define JOIN(a, b) JOIN_IMPL(a, b) // defer function / lambda. -#define DEFERF(func) ExitCall JOIN(defer_ , __LINE__)(func) +#define DEFERF(func) ExitCall JOIN(defer_, __LINE__)(func) // defer simple expression -#define DEFER1(expr) DEFERF([&](){ expr; }) - +#define DEFER1(expr) DEFERF([&]() { expr; }) #endif /* end of include guard: BH_UTIL_SOXWOK67 */ diff --git a/src/center.cpp b/src/center.cpp index 809b6d1..db000c4 100644 --- a/src/center.cpp +++ b/src/center.cpp @@ -23,7 +23,6 @@ SharedMemory &BHomeShm() { - static SharedMemory shm("bhome_default_shm_v0", 1024*1024*64); + static SharedMemory shm("bhome_default_shm_v0", 1024 * 1024 * 64); return shm; } - diff --git a/src/center.h b/src/center.h index fcb8005..153cc3e 100644 --- a/src/center.h +++ b/src/center.h @@ -20,7 +20,6 @@ class BHCenter { - }; #endif // end of include guard: CENTER_TM9OUQTG diff --git a/src/defs.h b/src/defs.h index acfe09e..10ac73c 100644 --- a/src/defs.h +++ b/src/defs.h @@ -27,9 +27,10 @@ const MQId kBHBusQueueId = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff"); const int kBHCenterPort = 24287; const char kTopicSep = '.'; -namespace bhome_shm { +namespace bhome_shm +{ class SharedMemory; -} +} // namespace bhome_shm bhome_shm::SharedMemory &BHomeShm(); diff --git a/src/msg.cpp b/src/msg.cpp index 78834a8..3a01240 100644 --- a/src/msg.cpp +++ b/src/msg.cpp @@ -18,48 +18,49 @@ #include "msg.h" #include "bh_util.h" -namespace bhome_msg { +namespace bhome_msg +{ const uint32_t kMsgTag = 0xf1e2d3c4; const uint32_t kMsgPrefixLen = 4; BHMsg InitMsg(MsgType type) { - BHMsg msg; - msg.set_type(type); - time_t tm = 0; - msg.set_timestamp(time(&tm)); - return msg; + BHMsg msg; + msg.set_type(type); + time_t tm = 0; + msg.set_timestamp(time(&tm)); + return msg; } BHMsg MakeRequest(const MQId &src_id, const void *data, const size_t size) { - assert(data && size); - BHMsg msg(InitMsg(kMsgTypeRequest)); - msg.set_body(data, size); - msg.add_route()->set_mq_id(&src_id, sizeof(src_id)); - return msg; + assert(data && size); + BHMsg msg(InitMsg(kMsgTypeRequest)); + msg.set_body(data, size); + msg.add_route()->set_mq_id(&src_id, sizeof(src_id)); + return msg; } BHMsg MakeReply(const void *data, const size_t size) { - assert(data && size); - BHMsg msg(InitMsg(kMsgTypeReply)); - msg.set_body(data, size); - return msg; + assert(data && size); + BHMsg msg(InitMsg(kMsgTypeReply)); + msg.set_body(data, size); + return msg; } BHMsg MakeSubUnsub(const MQId &client, const std::vector<std::string> &topics, const MsgType sub_unsub) { - assert(sub_unsub == kMsgTypeSubscribe || sub_unsub == kMsgTypeUnsubscribe); - BHMsg msg(InitMsg(sub_unsub)); - msg.add_route()->set_mq_id(&client, sizeof(client)); - DataSub subs; - for (auto &t : topics) { - subs.add_topics(t); - } - msg.set_body(subs.SerializeAsString()); - return msg; + assert(sub_unsub == kMsgTypeSubscribe || sub_unsub == kMsgTypeUnsubscribe); + BHMsg msg(InitMsg(sub_unsub)); + msg.add_route()->set_mq_id(&client, sizeof(client)); + DataSub subs; + for (auto &t : topics) { + subs.add_topics(t); + } + msg.set_body(subs.SerializeAsString()); + return msg; } BHMsg MakeSub(const MQId &client, const std::vector<std::string> &topics) { return MakeSubUnsub(client, topics, kMsgTypeSubscribe); } @@ -67,77 +68,77 @@ BHMsg MakePub(const std::string &topic, const void *data, const size_t size) { - assert(data && size); - BHMsg msg(InitMsg(kMsgTypePublish)); - DataPub pub; - pub.set_topic(topic); - pub.set_data(data, size); - msg.set_body(pub.SerializeAsString()); - return msg; + assert(data && size); + BHMsg msg(InitMsg(kMsgTypePublish)); + DataPub pub; + pub.set_topic(topic); + pub.set_data(data, size); + msg.set_body(pub.SerializeAsString()); + return msg; } void *Pack(SharedMemory &shm, const BHMsg &msg) { - uint32_t msg_size = msg.ByteSizeLong(); - void *p = shm.Alloc(4 + msg_size); - if(p) { - Put32(p, msg_size); - if (!msg.SerializeToArray(static_cast<char*>(p) + kMsgPrefixLen, msg_size)) { - shm.Dealloc(p); - p = 0; - } - } - return p; + uint32_t msg_size = msg.ByteSizeLong(); + void *p = shm.Alloc(4 + msg_size); + if (p) { + Put32(p, msg_size); + if (!msg.SerializeToArray(static_cast<char *>(p) + kMsgPrefixLen, msg_size)) { + shm.Dealloc(p); + p = 0; + } + } + return p; } bool MsgI::Unpack(BHMsg &msg) const { - void *p = ptr_.get(); - assert(p); - uint32_t msg_size = Get32(p); - return msg.ParseFromArray(static_cast<char*>(p) + kMsgPrefixLen, msg_size); + void *p = ptr_.get(); + assert(p); + uint32_t msg_size = Get32(p); + return msg.ParseFromArray(static_cast<char *>(p) + kMsgPrefixLen, msg_size); } // with ref count; bool MsgI::MakeRC(SharedMemory &shm, const BHMsg &msg) { - void *p = Pack(shm, msg); - if(!p) { - return false; - } - RefCount *rc = shm.New<RefCount>(); - if (!rc) { - shm.Dealloc(p); - return false; - } - MsgI(p, rc).swap(*this); - return true; + void *p = Pack(shm, msg); + if (!p) { + return false; + } + RefCount *rc = shm.New<RefCount>(); + if (!rc) { + shm.Dealloc(p); + return false; + } + MsgI(p, rc).swap(*this); + return true; } bool MsgI::Make(SharedMemory &shm, const BHMsg &msg) { - void *p = Pack(shm, msg); - if(!p) { - return false; - } - MsgI(p, 0).swap(*this); - return true; + void *p = Pack(shm, msg); + if (!p) { + return false; + } + MsgI(p, 0).swap(*this); + return true; } -int MsgI::Release(SharedMemory &shm) +int MsgI::Release(SharedMemory &shm) { - if (IsCounted()) { - const int n = count_->Dec(); - if (n != 0) { - return n; - } - } - // free data - shm.Dealloc(ptr_); - ptr_ = 0; - shm.Delete(count_); - count_ = 0; - return 0; + if (IsCounted()) { + const int n = count_->Dec(); + if (n != 0) { + return n; + } + } + // free data + shm.Dealloc(ptr_); + ptr_ = 0; + shm.Delete(count_); + count_ = 0; + return 0; } } // namespace bhome_msg diff --git a/src/msg.h b/src/msg.h index f3fe726..2154eba 100644 --- a/src/msg.h +++ b/src/msg.h @@ -18,15 +18,16 @@ #ifndef MSG_5BILLZET #define MSG_5BILLZET -#include <stdint.h> +#include "bhome_msg.pb.h" #include "shm.h" #include <boost/interprocess/offset_ptr.hpp> #include <boost/uuid/uuid_generators.hpp> -#include "bhome_msg.pb.h" +#include <stdint.h> -namespace bhome_msg { - using namespace bhome_shm; - using namespace bhome::msg; // for serialized data in MsgI +namespace bhome_msg +{ +using namespace bhome_shm; +using namespace bhome::msg; // for serialized data in MsgI // MsgI is safe to be stored in shared memory, so POD data or offset_ptr is required. // message format: header(meta) + body(data). @@ -37,49 +38,67 @@ class RefCount : private boost::noncopyable { public: - int Inc() { Guard lk(mutex_); return ++num_; } - int Dec() { Guard lk(mutex_); return --num_; } - int Get() { Guard lk(mutex_); return num_; } + int Inc() + { + Guard lk(mutex_); + return ++num_; + } + int Dec() + { + Guard lk(mutex_); + return --num_; + } + int Get() + { + Guard lk(mutex_); + return num_; + } + private: - Mutex mutex_; - int num_ = 1; + Mutex mutex_; + int num_ = 1; }; BHMsg MakeRequest(const MQId &src_id, const void *data, const size_t size); BHMsg MakeReply(const void *data, const size_t size); -BHMsg MakeSub(const MQId &client, const std::vector<std::string> &topics); -BHMsg MakeUnsub(const MQId &client, const std::vector<std::string> &topics); +BHMsg MakeSub(const MQId &client, const std::vector<std::string> &topics); +BHMsg MakeUnsub(const MQId &client, const std::vector<std::string> &topics); BHMsg MakePub(const std::string &topic, const void *data, const size_t size); -class MsgI { +class MsgI +{ private: - offset_ptr<void> ptr_; - offset_ptr<RefCount> count_; + offset_ptr<void> ptr_; + offset_ptr<RefCount> count_; - bool BuildSubOrUnsub(SharedMemory &shm, const std::vector<std::string> &topics, const MsgType sub_unsub); + bool BuildSubOrUnsub(SharedMemory &shm, const std::vector<std::string> &topics, const MsgType sub_unsub); + public: - MsgI(void *p=0, RefCount *c=0):ptr_(p), count_(c) {} - void swap(MsgI &a) { std::swap(ptr_, a.ptr_); std::swap(count_, a.count_); } - template <class T = void> T *get() { return static_cast<T*>(ptr_.get()); } + MsgI(void *p = 0, RefCount *c = 0) : + ptr_(p), count_(c) {} - // AddRef and Release works for both counted and not counted msg. - int AddRef() const { return IsCounted() ? count_->Inc() : 1; } - int Release(SharedMemory &shm); + void swap(MsgI &a) + { + std::swap(ptr_, a.ptr_); + std::swap(count_, a.count_); + } + template <class T = void> + T *get() { return static_cast<T *>(ptr_.get()); } - int Count() const{ return IsCounted() ? count_->Get() : 1; } + // AddRef and Release works for both counted and not counted msg. + int AddRef() const { return IsCounted() ? count_->Inc() : 1; } + int Release(SharedMemory &shm); - bool IsCounted() const { return static_cast<bool>(count_); } + int Count() const { return IsCounted() ? count_->Get() : 1; } + bool IsCounted() const { return static_cast<bool>(count_); } - bool Make(SharedMemory &shm, const BHMsg &msg); - bool MakeRC(SharedMemory &shm, const BHMsg &msg); - bool Unpack(BHMsg &msg) const; + bool Make(SharedMemory &shm, const BHMsg &msg); + bool MakeRC(SharedMemory &shm, const BHMsg &msg); + bool Unpack(BHMsg &msg) const; }; inline void swap(MsgI &m1, MsgI &m2) { m1.swap(m2); } - } // namespace bhome_msg - - #endif // end of include guard: MSG_5BILLZET diff --git a/src/pubsub.cpp b/src/pubsub.cpp index a0dc4e9..d5c7dd2 100644 --- a/src/pubsub.cpp +++ b/src/pubsub.cpp @@ -16,170 +16,170 @@ * ===================================================================================== */ #include "pubsub.h" -#include <chrono> #include "bh_util.h" #include "defs.h" +#include <chrono> -namespace bhome_shm { +namespace bhome_shm +{ using namespace std::chrono_literals; const int kMaxWorker = 16; using namespace bhome_msg; -BusManager::BusManager(SharedMemory &shm): -shm_(shm), -busq_(kBHBusQueueId, shm, 16), -run_(false) +BusManager::BusManager(SharedMemory &shm) : + shm_(shm), + busq_(kBHBusQueueId, shm, 16), + run_(false) { } - + BusManager::~BusManager() { - Stop(); + Stop(); } bool BusManager::Start(const int nworker) { - std::lock_guard<std::mutex> guard(mutex_); - StopNoLock(); - // start - auto Worker = [&](){ - while (this->run_) { - BusManager &self = *this; - MsgI msg; - const int timeout_ms = 100; - if (self.busq_.Recv(msg, timeout_ms)) { - self.OnMsg(msg); - } - } - }; + std::lock_guard<std::mutex> guard(mutex_); + StopNoLock(); + // start + auto Worker = [&]() { + while (this->run_) { + BusManager &self = *this; + MsgI msg; + const int timeout_ms = 100; + if (self.busq_.Recv(msg, timeout_ms)) { + self.OnMsg(msg); + } + } + }; - run_.store(true); - const int n = std::min(nworker, kMaxWorker); - for (int i = 0; i < n; ++i) { - workers_.emplace_back(Worker); - } - return true; + run_.store(true); + const int n = std::min(nworker, kMaxWorker); + for (int i = 0; i < n; ++i) { + workers_.emplace_back(Worker); + } + return true; } bool BusManager::Stop() { - std::lock_guard<std::mutex> guard(mutex_); - return StopNoLock(); + std::lock_guard<std::mutex> guard(mutex_); + return StopNoLock(); } bool BusManager::StopNoLock() { - if (run_.exchange(false)) { - for (auto &w: workers_) { - if (w.joinable()) { - w.join(); - } - } - return true; - } - return false; + if (run_.exchange(false)) { + for (auto &w : workers_) { + if (w.joinable()) { + w.join(); + } + } + return true; + } + return false; } void BusManager::OnMsg(MsgI &imsg) { - DEFER1(imsg.Release(shm_)); + DEFER1(imsg.Release(shm_)); - BHMsg msg; - if (!imsg.Unpack(msg)) { - return; - } + BHMsg msg; + if (!imsg.Unpack(msg)) { + return; + } - auto OnSubChange = [&](auto &&update) { - DataSub sub; - if (!msg.route().empty() && sub.ParseFromString(msg.body()) && !sub.topics().empty()) { - assert(sizeof(MQId) == msg.route(0).mq_id().size()); - MQId client; - memcpy(&client, msg.route(0).mq_id().data(), sizeof(client)); + auto OnSubChange = [&](auto &&update) { + DataSub sub; + if (!msg.route().empty() && sub.ParseFromString(msg.body()) && !sub.topics().empty()) { + assert(sizeof(MQId) == msg.route(0).mq_id().size()); + MQId client; + memcpy(&client, msg.route(0).mq_id().data(), sizeof(client)); - std::lock_guard<std::mutex> guard(mutex_); - auto &topics = sub.topics(); - for (auto &topic : topics) { - try { - update(topic, client); - } catch(...) { - //TODO log error - } - } - } - }; + std::lock_guard<std::mutex> guard(mutex_); + auto &topics = sub.topics(); + for (auto &topic : topics) { + try { + update(topic, client); + } catch (...) { + //TODO log error + } + } + } + }; - auto Sub1 = [this](const std::string &topic, const MQId &id) { - records_[topic].insert(id); - }; + auto Sub1 = [this](const std::string &topic, const MQId &id) { + records_[topic].insert(id); + }; - auto Unsub1 = [this](const std::string &topic, const MQId &id) { - auto pos = records_.find(topic); - if (pos != records_.end()) { - if (pos->second.erase(id) && pos->second.empty()) { - records_.erase(pos); - } - } - }; + auto Unsub1 = [this](const std::string &topic, const MQId &id) { + auto pos = records_.find(topic); + if (pos != records_.end()) { + if (pos->second.erase(id) && pos->second.empty()) { + records_.erase(pos); + } + } + }; - auto OnPublish = [&]() { - DataPub pub; - if (!pub.ParseFromString(msg.body())) { - return; - } - auto FindClients = [&](const std::string &topic){ - Clients dests; - std::lock_guard<std::mutex> guard(mutex_); - auto Find1 = [&](const std::string &t) { - auto pos = records_.find(topic); - if (pos != records_.end() && !pos->second.empty()) { - auto &clients = pos->second; - for (auto &cli : clients) { - dests.insert(cli); - } - } - }; - Find1(topic); + auto OnPublish = [&]() { + DataPub pub; + if (!pub.ParseFromString(msg.body())) { + return; + } + auto FindClients = [&](const std::string &topic) { + Clients dests; + std::lock_guard<std::mutex> guard(mutex_); + auto Find1 = [&](const std::string &t) { + auto pos = records_.find(topic); + if (pos != records_.end() && !pos->second.empty()) { + auto &clients = pos->second; + for (auto &cli : clients) { + dests.insert(cli); + } + } + }; + Find1(topic); - //TODO check and adjust topic on client side sub/pub. - size_t pos = 0; - while (true) { - pos = topic.find(kTopicSep, pos); - if (pos == topic.npos || ++pos == topic.size()) { - // Find1(std::string()); // sub all. - break; - } else { - Find1(topic.substr(0, pos)); - } - } - return dests; - }; + //TODO check and adjust topic on client side sub/pub. + size_t pos = 0; + while (true) { + pos = topic.find(kTopicSep, pos); + if (pos == topic.npos || ++pos == topic.size()) { + // Find1(std::string()); // sub all. + break; + } else { + Find1(topic.substr(0, pos)); + } + } + return dests; + }; - auto Dispatch = [&](auto &&send1) { - const Clients &clients(FindClients(pub.topic())); - for (auto &cli : clients) { - send1(cli); - } - }; + auto Dispatch = [&](auto &&send1) { + const Clients &clients(FindClients(pub.topic())); + for (auto &cli : clients) { + send1(cli); + } + }; - if (imsg.IsCounted()) { - Dispatch([&](const MQId &cli) { busq_.Send(cli, imsg, 100); }); - } else { - MsgI pubmsg; - if (!pubmsg.MakeRC(shm_, msg)) { return; } - DEFER1(pubmsg.Release(shm_)); + if (imsg.IsCounted()) { + Dispatch([&](const MQId &cli) { busq_.Send(cli, imsg, 100); }); + } else { + MsgI pubmsg; + if (!pubmsg.MakeRC(shm_, msg)) { return; } + DEFER1(pubmsg.Release(shm_)); - Dispatch([&](const MQId &cli) { busq_.Send(cli, pubmsg, 100); }); - } - }; + Dispatch([&](const MQId &cli) { busq_.Send(cli, pubmsg, 100); }); + } + }; - switch (msg.type()) { - case kMsgTypeSubscribe: OnSubChange(Sub1); break; - case kMsgTypeUnsubscribe: OnSubChange(Unsub1); break; - case kMsgTypePublish : OnPublish(); break; - default: break; - } + switch (msg.type()) { + case kMsgTypeSubscribe: OnSubChange(Sub1); break; + case kMsgTypeUnsubscribe: OnSubChange(Unsub1); break; + case kMsgTypePublish: OnPublish(); break; + default: break; + } } } // namespace bhome_shm - diff --git a/src/pubsub.h b/src/pubsub.h index 11fa4e4..dc3fced 100644 --- a/src/pubsub.h +++ b/src/pubsub.h @@ -19,35 +19,36 @@ #define PUBSUB_4KGRA997 #include "shm_queue.h" -#include <thread> #include <atomic> #include <mutex> -#include <vector> -#include <unordered_map> #include <set> +#include <thread> +#include <unordered_map> +#include <vector> -namespace bhome_shm { +namespace bhome_shm +{ // publish/subcribe manager. class BusManager { - SharedMemory &shm_; - ShmMsgQueue busq_; - std::atomic<bool> run_; - std::vector<std::thread> workers_; - std::mutex mutex_; - typedef std::set<MQId> Clients; - std::unordered_map<std::string, Clients> records_; + SharedMemory &shm_; + ShmMsgQueue busq_; + std::atomic<bool> run_; + std::vector<std::thread> workers_; + std::mutex mutex_; + typedef std::set<MQId> Clients; + std::unordered_map<std::string, Clients> records_; - bool StopNoLock(); - void OnMsg(MsgI &msg); + bool StopNoLock(); + void OnMsg(MsgI &msg); + public: - BusManager(SharedMemory &shm); - ~BusManager(); - bool Start(const int nworker = 2); - bool Stop(); + BusManager(SharedMemory &shm); + ~BusManager(); + bool Start(const int nworker = 2); + bool Stop(); }; - } // namespace bhome_shm diff --git a/src/shm.cpp b/src/shm.cpp index d628b56..1658900 100644 --- a/src/shm.cpp +++ b/src/shm.cpp @@ -21,9 +21,9 @@ namespace bhome_shm { -SharedMemory::SharedMemory(const std::string &name, const uint64_t size) - : mshm_t(open_or_create, name.c_str(), size, 0, AllowAll()), - name_(name) +SharedMemory::SharedMemory(const std::string &name, const uint64_t size) : + mshm_t(open_or_create, name.c_str(), size, 0, AllowAll()), + name_(name) { } diff --git a/src/shm.h b/src/shm.h index 0f68754..3fce99d 100644 --- a/src/shm.h +++ b/src/shm.h @@ -19,14 +19,15 @@ #ifndef SHM_6CHO6D6C #define SHM_6CHO6D6C +#include <boost/interprocess/managed_shared_memory.hpp> +#include <boost/interprocess/sync/interprocess_condition.hpp> +#include <boost/interprocess/sync/interprocess_mutex.hpp> +#include <boost/interprocess/sync/scoped_lock.hpp> #include <boost/noncopyable.hpp> #include <boost/uuid/uuid.hpp> -#include <boost/interprocess/managed_shared_memory.hpp> -#include <boost/interprocess/sync/interprocess_mutex.hpp> -#include <boost/interprocess/sync/interprocess_condition.hpp> -#include <boost/interprocess/sync/scoped_lock.hpp> -namespace bhome_shm { +namespace bhome_shm +{ using namespace boost::interprocess; @@ -37,73 +38,90 @@ class SharedMemory : public mshm_t { - std::string name_; + std::string name_; - static permissions AllowAll() { - permissions perm; - perm.set_unrestricted(); - return perm; - } - void Swap(SharedMemory &a); + static permissions AllowAll() + { + permissions perm; + perm.set_unrestricted(); + return perm; + } + void Swap(SharedMemory &a); + public: - static bool Remove(const std::string &name) { - return shared_memory_object::remove(name.c_str()); - } - SharedMemory(const std::string &name, const uint64_t size); - ~SharedMemory(); - std::string name() const { return name_; } - bool Remove() { return Remove(name()); } + static bool Remove(const std::string &name) { return shared_memory_object::remove(name.c_str()); } - void *Alloc(const size_t size) { return allocate(size, std::nothrow); } - void Dealloc(void *p) { if(p) { deallocate(p); } } - template<class T> void Dealloc(offset_ptr<T> ptr) { return Dealloc(ptr.get()); } + SharedMemory(const std::string &name, const uint64_t size); + ~SharedMemory(); + std::string name() const { return name_; } + bool Remove() { return Remove(name()); } - template <class T, class ...Params> T * New(Params const&...params) { return construct<T>(anonymous_instance, std::nothrow)(params...); } - template <class T> void Delete(T *p) { if (p) { destroy_ptr<T>(p); }; } - template <class T> void Delete(offset_ptr<T> p) { Delete(p.get()); } - template <class T> T *Find(const std::string &name) { return find<T>(name.c_str()).first; } + void *Alloc(const size_t size) { return allocate(size, std::nothrow); } + void Dealloc(void *p) + { + if (p) { deallocate(p); } + } + template <class T> + void Dealloc(offset_ptr<T> ptr) { return Dealloc(ptr.get()); } + template <class T, class... Params> + T *New(Params const &...params) { return construct<T>(anonymous_instance, std::nothrow)(params...); } + template <class T> + void Delete(T *p) + { + if (p) { destroy_ptr<T>(p); }; + } + template <class T> + void Delete(offset_ptr<T> p) { Delete(p.get()); } + template <class T> + T *Find(const std::string &name) { return find<T>(name.c_str()).first; } }; // ShmObject manages an object in shared memory, but ShmObject itself is not in shared memory. // works like a smart pointer of an object in shared memory. // TODO handshake with center, and can be removed if killed. template <class T> -class ShmObject : private boost::noncopyable { - static std::string ObjName(const std::string &name) { return "obj" + name; } -protected: - typedef T Data; - typedef SharedMemory ShmType; -private: - ShmType &shm_; - std::string name_; - Data *pdata_ = nullptr; +class ShmObject : private boost::noncopyable +{ + static std::string ObjName(const std::string &name) { return "obj" + name; } - bool IsOk() const { return pdata_; } protected: - ShmType &shm() const { return shm_; } + typedef T Data; + typedef SharedMemory ShmType; + +private: + ShmType &shm_; + std::string name_; + Data *pdata_ = nullptr; + + bool IsOk() const { return pdata_; } + +protected: + ShmType &shm() const { return shm_; } + public: - template <class...Params> - ShmObject(ShmType &segment, const std::string &name, Params&&...t): - shm_(segment), name_(name) - { - pdata_ = shm_.find_or_construct<Data>(ObjName(name_).c_str(), std::nothrow)(t...); - if (!IsOk()) { - throw("Error: Not enough memory, can not allocate \"" + name_ + "\""); - } - } - static Data *Find(SharedMemory &shm, const std::string &name) { return shm.Find<Data>(ObjName(name)); } - Data *Find(const std::string &name) { return Find(shm_, ObjName(name)); } - virtual ~ShmObject() {} - std::string name() const { return name_; } - Data* data() { return pdata_; } - const Data* data() const { return pdata_; } - Data* operator->() { return data(); } - const Data* operator->() const { return data(); } - bool Remove() { return shm_.destroy<Data>(ObjName(name_).c_str()); } + template <class... Params> + ShmObject(ShmType &segment, const std::string &name, Params &&...t) : + shm_(segment), name_(name) + { + pdata_ = shm_.find_or_construct<Data>(ObjName(name_).c_str(), std::nothrow)(t...); + if (!IsOk()) { + throw("Error: Not enough memory, can not allocate \"" + name_ + "\""); + } + } + static Data *Find(SharedMemory &shm, const std::string &name) { return shm.Find<Data>(ObjName(name)); } + Data *Find(const std::string &name) { return Find(shm_, ObjName(name)); } + virtual ~ShmObject() {} + std::string name() const { return name_; } + Data *data() { return pdata_; } + const Data *data() const { return pdata_; } + Data *operator->() { return data(); } + const Data *operator->() const { return data(); } + bool Remove() { return shm_.destroy<Data>(ObjName(name_).c_str()); } }; -template <class D> using Allocator = allocator<D, SharedMemory::segment_manager>; +template <class D> +using Allocator = allocator<D, SharedMemory::segment_manager>; } // namespace bhome_shm diff --git a/src/shm_queue.cpp b/src/shm_queue.cpp index 421cebf..cf4c8b4 100644 --- a/src/shm_queue.cpp +++ b/src/shm_queue.cpp @@ -17,52 +17,57 @@ */ #include "shm_queue.h" -#include <boost/uuid/uuid_io.hpp> -#include <boost/uuid/uuid_generators.hpp> #include "bh_util.h" +#include <boost/uuid/uuid_generators.hpp> +#include <boost/uuid/uuid_io.hpp> -namespace bhome_shm { -using namespace bhome_msg; +namespace bhome_shm +{ +using namespace bhome_msg; using namespace boost::interprocess; using namespace boost::uuids; -namespace { -std::string MsgQIdToName(const MQId& id) { return "shmq" + to_string(id); } +namespace +{ +std::string MsgQIdToName(const MQId &id) { return "shmq" + to_string(id); } // MQId EmptyId() { return nil_uuid(); } MQId NewId() { return random_generator()(); } -const int AdjustMQLength(const int len) { - const int kMaxLength = 10000; - const int kDefaultLen = 12; - if (len <= 0) { - return kDefaultLen; - } else if (len < kMaxLength) { - return len; - } else { - return kMaxLength; - } +const int AdjustMQLength(const int len) +{ + const int kMaxLength = 10000; + const int kDefaultLen = 12; + if (len <= 0) { + return kDefaultLen; + } else if (len < kMaxLength) { + return len; + } else { + return kMaxLength; + } } -} +} // namespace // ShmMsgQueue memory usage: (320 + 16*length) bytes, length >= 2 -ShmMsgQueue::ShmMsgQueue(const MQId &id, ShmType &segment, const int len): -Super(segment, MsgQIdToName(id), AdjustMQLength(len), segment.get_segment_manager()), -id_(id) -{} +ShmMsgQueue::ShmMsgQueue(const MQId &id, ShmType &segment, const int len) : + Super(segment, MsgQIdToName(id), AdjustMQLength(len), segment.get_segment_manager()), + id_(id) +{ +} -ShmMsgQueue::ShmMsgQueue(ShmType &segment, const int len): -ShmMsgQueue(NewId(), segment, len) -{} +ShmMsgQueue::ShmMsgQueue(ShmType &segment, const int len) : + ShmMsgQueue(NewId(), segment, len) +{ +} ShmMsgQueue::~ShmMsgQueue() { - Remove(); + Remove(); } bool ShmMsgQueue::Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms) { - Queue *remote = Find(shm, MsgQIdToName(remote_id)); - return remote && remote->Write(msg, timeout_ms, [](const MsgI&msg){msg.AddRef();}); + Queue *remote = Find(shm, MsgQIdToName(remote_id)); + return remote && remote->Write(msg, timeout_ms, [](const MsgI &msg) { msg.AddRef(); }); } // bool ShmMsgQueue::Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms) @@ -73,15 +78,15 @@ bool ShmMsgQueue::Send(const MQId &remote_id, const BHMsg &data, const int timeout_ms) { - MsgI msg; - if(msg.Make(shm(), data)) { - if(Send(remote_id, msg, timeout_ms)) { - return true; - } else { - msg.Release(shm()); - } - } - return false; + MsgI msg; + if (msg.Make(shm(), data)) { + if (Send(remote_id, msg, timeout_ms)) { + return true; + } else { + msg.Release(shm()); + } + } + return false; } /* @@ -105,13 +110,13 @@ //*/ bool ShmMsgQueue::Recv(BHMsg &msg, const int timeout_ms) { - MsgI imsg; - if (Read(imsg, timeout_ms)) { - DEFER1(imsg.Release(shm());); - return imsg.Unpack(msg); - } else { - return false; - } + MsgI imsg; + if (Read(imsg, timeout_ms)) { + DEFER1(imsg.Release(shm());); + return imsg.Unpack(msg); + } else { + return false; + } } } // namespace bhome_shm diff --git a/src/shm_queue.h b/src/shm_queue.h index 60b1862..9064f55 100644 --- a/src/shm_queue.h +++ b/src/shm_queue.h @@ -19,111 +19,125 @@ #ifndef SHM_QUEUE_JE0OEUP3 #define SHM_QUEUE_JE0OEUP3 -#include "shm.h" #include "msg.h" +#include "shm.h" #include <boost/circular_buffer.hpp> #include <boost/date_time/posix_time/posix_time.hpp> -namespace bhome_shm { - -template <class D> using Circular = boost::circular_buffer<D, Allocator<D> >; +namespace bhome_shm +{ + +template <class D> +using Circular = boost::circular_buffer<D, Allocator<D>>; typedef boost::uuids::uuid MQId; template <class D> class SharedQueue : private Circular<D> { - typedef Circular<D> Super; - Mutex mutex_; - Cond cond_read_; - Cond cond_write_; - Mutex & mutex() { return mutex_; } + typedef Circular<D> Super; + Mutex mutex_; + Cond cond_read_; + Cond cond_write_; + Mutex &mutex() { return mutex_; } - static boost::posix_time::ptime MSFromNow(const int ms) - { - using namespace boost::posix_time; - ptime cur = boost::posix_time::microsec_clock::universal_time(); - return cur + millisec(ms); - } + static boost::posix_time::ptime MSFromNow(const int ms) + { + using namespace boost::posix_time; + ptime cur = boost::posix_time::microsec_clock::universal_time(); + return cur + millisec(ms); + } public: - SharedQueue(const uint32_t len, Allocator<D> const& alloc):Super(len, alloc) {} - using Super::size; - using Super::capacity; - template <class Iter, class OnWrite> - int Write(Iter begin, Iter end, const int timeout_ms, const OnWrite &onWrite) { - int n = 0; - if (begin != end) { - auto endtime = MSFromNow(timeout_ms); - Guard lock(mutex()); - while (cond_write_.timed_wait(lock, endtime, [&]() { return !this->full(); })) { - onWrite(*begin); - this->push_back(*begin); - ++n; - cond_read_.notify_one(); - if (++begin == end) { - break; - } - } - } - return n; - } - template <class OnWrite> - bool Write(const D &buf, const int timeout_ms, const OnWrite &onWrite) { - return Write(&buf, (&buf)+1, timeout_ms, onWrite); - } - bool Write(const D &buf, const int timeout_ms) { return Write(buf, timeout_ms, [](const D &buf){}); } + SharedQueue(const uint32_t len, Allocator<D> const &alloc) : + Super(len, alloc) {} + using Super::capacity; + using Super::size; + template <class Iter, class OnWrite> + int Write(Iter begin, Iter end, const int timeout_ms, const OnWrite &onWrite) + { + int n = 0; + if (begin != end) { + auto endtime = MSFromNow(timeout_ms); + Guard lock(mutex()); + while (cond_write_.timed_wait(lock, endtime, [&]() { return !this->full(); })) { + onWrite(*begin); + this->push_back(*begin); + ++n; + cond_read_.notify_one(); + if (++begin == end) { + break; + } + } + } + return n; + } - template <class OnData> - bool Read(const int timeout_ms, OnData onData){ - int n = 0; - auto endtime = MSFromNow(timeout_ms); - Guard lock(mutex()); - while (cond_read_.timed_wait(lock, endtime, [&]() { return !this->empty(); })) { - const bool more = onData(this->front()); - this->pop_front(); - cond_write_.notify_one(); - ++n; - if (!more) { - break; - } - } - return n; - } - bool Read(D &buf, const int timeout_ms){ - auto read1 = [&](D &d) { - using std::swap; - swap(buf, d); - return false; - }; - return Read(timeout_ms, read1) == 1; - } + template <class OnWrite> + bool Write(const D &buf, const int timeout_ms, const OnWrite &onWrite) + { + return Write(&buf, (&buf) + 1, timeout_ms, onWrite); + } + bool Write(const D &buf, const int timeout_ms) + { + return Write(buf, timeout_ms, [](const D &buf) {}); + } + + template <class OnData> + bool Read(const int timeout_ms, OnData onData) + { + int n = 0; + auto endtime = MSFromNow(timeout_ms); + Guard lock(mutex()); + while (cond_read_.timed_wait(lock, endtime, [&]() { return !this->empty(); })) { + const bool more = onData(this->front()); + this->pop_front(); + cond_write_.notify_one(); + ++n; + if (!more) { + break; + } + } + return n; + } + + bool Read(D &buf, const int timeout_ms) + { + auto read1 = [&](D &d) { + using std::swap; + swap(buf, d); + return false; + }; + return Read(timeout_ms, read1) == 1; + } }; using namespace bhome_msg; -class ShmMsgQueue : private ShmObject<SharedQueue<MsgI> > +class ShmMsgQueue : private ShmObject<SharedQueue<MsgI>> { - typedef ShmObject<SharedQueue<MsgI> > Super; - typedef Super::Data Queue; - bool Write(const MsgI &buf, const int timeout_ms) { return data()->Write(buf, timeout_ms); } - bool Read(MsgI &buf, const int timeout_ms) { return data()->Read(buf, timeout_ms); } - MQId id_; -protected: - ShmMsgQueue(const std::string &raw_name, ShmType &segment, const int len); // internal use. -public: - ShmMsgQueue(const MQId &id, ShmType &segment, const int len); - ShmMsgQueue(ShmType &segment, const int len); - ~ShmMsgQueue(); - const MQId &Id() const { return id_; } + typedef ShmObject<SharedQueue<MsgI>> Super; + typedef Super::Data Queue; + bool Write(const MsgI &buf, const int timeout_ms) { return data()->Write(buf, timeout_ms); } + bool Read(MsgI &buf, const int timeout_ms) { return data()->Read(buf, timeout_ms); } + MQId id_; - bool Recv(BHMsg &msg, const int timeout_ms); - bool Recv(MsgI &msg, const int timeout_ms) { return Read(msg, timeout_ms); } - bool Send(const MQId &remote_id, const BHMsg &msg, const int timeout_ms); - static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms); - bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms) { - return Send(shm(), remote_id, msg, timeout_ms); - } +protected: + ShmMsgQueue(const std::string &raw_name, ShmType &segment, const int len); // internal use. +public: + ShmMsgQueue(const MQId &id, ShmType &segment, const int len); + ShmMsgQueue(ShmType &segment, const int len); + ~ShmMsgQueue(); + const MQId &Id() const { return id_; } + + bool Recv(BHMsg &msg, const int timeout_ms); + bool Recv(MsgI &msg, const int timeout_ms) { return Read(msg, timeout_ms); } + bool Send(const MQId &remote_id, const BHMsg &msg, const int timeout_ms); + static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms); + bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms) + { + return Send(shm(), remote_id, msg, timeout_ms); + } }; } // namespace bhome_shm diff --git a/src/socket.cpp b/src/socket.cpp index 1c28bfa..21928b8 100644 --- a/src/socket.cpp +++ b/src/socket.cpp @@ -17,118 +17,118 @@ */ #include "socket.h" -#include <chrono> -#include "msg.h" -#include "defs.h" #include "bh_util.h" +#include "defs.h" +#include "msg.h" +#include <chrono> using namespace bhome_msg; using namespace bhome_shm; using namespace std::chrono_literals; -namespace { - -int GetSocketDefaultLen(ShmSocket::Type type) +namespace { - switch (type) { - case ShmSocket::eSockRequest : return 12; - case ShmSocket::eSockReply : return 64; - case ShmSocket::eSockPublish : return 0; - case ShmSocket::eSockSubscribe : return 64; - default: return 0; - } -} - -} - -ShmSocket::ShmSocket(Type type, bhome_shm::SharedMemory &shm) - : shm_(shm), - type_(type), - run_(false) +int GetSocketDefaultLen(ShmSocket::Type type) { - int len = GetSocketDefaultLen(type); - if (len != 0) { - mq_.reset(new Queue(shm_, len)); - - auto RecvProc = [this](){ - while (run_) { - try { - std::unique_lock<std::mutex> lk(mutex_); - if (cv_recv_cb_.wait_for(lk, 100ms, [this](){ return HasRecvCB(); })) { - BHMsg msg; - if (mq_->Recv(msg, 100)) { - this->onRecv_(msg); - } - } - } catch (...) {} - } - }; - run_.store(true); - workers_.emplace_back(RecvProc); - } + switch (type) { + case ShmSocket::eSockRequest: return 12; + case ShmSocket::eSockReply: return 64; + case ShmSocket::eSockPublish: return 0; + case ShmSocket::eSockSubscribe: return 64; + default: return 0; + } } -ShmSocket::ShmSocket(Type type) - : ShmSocket(type, BHomeShm()) + +} // namespace + +ShmSocket::ShmSocket(Type type, bhome_shm::SharedMemory &shm) : + shm_(shm), type_(type), run_(false) +{ + int len = GetSocketDefaultLen(type); + if (len != 0) { + mq_.reset(new Queue(shm_, len)); + + auto RecvProc = [this]() { + while (run_) { + try { + std::unique_lock<std::mutex> lk(mutex_); + if (cv_recv_cb_.wait_for(lk, 100ms, [this]() { return HasRecvCB(); })) { + BHMsg msg; + if (mq_->Recv(msg, 100)) { + this->onRecv_(msg); + } + } + } catch (...) { + } + } + }; + run_.store(true); + workers_.emplace_back(RecvProc); + } +} + +ShmSocket::ShmSocket(Type type) : + ShmSocket(type, BHomeShm()) { } ShmSocket::~ShmSocket() { - Stop(); + Stop(); } bool ShmSocket::Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms) { - if (type_ != eSockPublish) { - return false; - } - assert(!mq_); - try { - MsgI imsg; - if (!imsg.MakeRC(shm_, MakePub(topic, data, size))) { - return false; - } - DEFER1(imsg.Release(shm_)); - return Queue::Send(shm_, kBHBusQueueId, imsg, timeout_ms); - - } catch (...) { - return false; - } + if (type_ != eSockPublish) { + return false; + } + assert(!mq_); + try { + MsgI imsg; + if (!imsg.MakeRC(shm_, MakePub(topic, data, size))) { + return false; + } + DEFER1(imsg.Release(shm_)); + return Queue::Send(shm_, kBHBusQueueId, imsg, timeout_ms); + + } catch (...) { + return false; + } } bool ShmSocket::Subscribe(const std::vector<std::string> &topics, const int timeout_ms) { - if (type_ != eSockSubscribe) { - return false; - } - assert(mq_); - try { - return mq_->Send(kBHBusQueueId, MakeSub(mq_->Id(), topics), timeout_ms); - } catch (...) { - return false; - } + if (type_ != eSockSubscribe) { + return false; + } + assert(mq_); + try { + return mq_->Send(kBHBusQueueId, MakeSub(mq_->Id(), topics), timeout_ms); + } catch (...) { + return false; + } } bool ShmSocket::SetRecvCallback(const RecvCB &onRecv) { - std::lock_guard<std::mutex> lock(mutex_); - onRecv_ = onRecv; - cv_recv_cb_.notify_one(); - return true; + std::lock_guard<std::mutex> lock(mutex_); + onRecv_ = onRecv; + cv_recv_cb_.notify_one(); + return true; } bool ShmSocket::HasRecvCB() { - return static_cast<bool>(onRecv_); + return static_cast<bool>(onRecv_); } void ShmSocket::Stop() { - run_ = false; - for (auto &t : workers_) { - if (t.joinable()) { - t.join(); - } - } + run_ = false; + for (auto &t : workers_) { + if (t.joinable()) { + t.join(); + } + } } \ No newline at end of file diff --git a/src/socket.h b/src/socket.h index e65ac83..92c1b73 100644 --- a/src/socket.h +++ b/src/socket.h @@ -20,55 +20,56 @@ #define SOCKET_GWTJHBPO #include "shm_queue.h" -#include <vector> -#include <thread> -#include <memory> -#include <functional> -#include <mutex> -#include <condition_variable> #include <atomic> +#include <condition_variable> +#include <functional> +#include <memory> +#include <mutex> +#include <thread> +#include <vector> class ShmSocket { - typedef bhome_shm::ShmMsgQueue Queue; + typedef bhome_shm::ShmMsgQueue Queue; + public: - enum Type { - eSockRequest, - eSockReply, - eSockSubscribe, - eSockPublish, - }; - typedef std::function<void (bhome_msg::BHMsg &msg)> RecvCB; + enum Type { + eSockRequest, + eSockReply, + eSockSubscribe, + eSockPublish, + }; + typedef std::function<void(bhome_msg::BHMsg &msg)> RecvCB; - ShmSocket(Type type); - ShmSocket(Type type, bhome_shm::SharedMemory &shm); - ~ShmSocket(); + ShmSocket(Type type); + ShmSocket(Type type, bhome_shm::SharedMemory &shm); + ~ShmSocket(); - // bool Request(const std::string &topic, const void *data, const size_t size, onReply); - bool RequestAndWait() { return false; } // call Request, and wait onReply notify cv + // bool Request(const std::string &topic, const void *data, const size_t size, onReply); + bool RequestAndWait() { return false; } // call Request, and wait onReply notify cv - // bool HandleRequest(onData); - bool ReadRequest(); // exclude with HandleRequest - bool SendReply(); // exclude with HandleRequest + // bool HandleRequest(onData); + bool ReadRequest(); // exclude with HandleRequest + bool SendReply(); // exclude with HandleRequest - bool Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms); - bool Subscribe(const std::vector<std::string> &topics, const int timeout_ms); - bool RecvSub(std::string &topic, std::string &data, const int timeout_ms); - bool SetRecvCallback(const RecvCB &onRecv); + bool Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms); + bool Subscribe(const std::vector<std::string> &topics, const int timeout_ms); + bool RecvSub(std::string &topic, std::string &data, const int timeout_ms); + bool SetRecvCallback(const RecvCB &onRecv); + private: - bool HasRecvCB(); - void Stop(); + bool HasRecvCB(); + void Stop(); - bhome_shm::SharedMemory &shm_; - Type type_; - std::vector<std::thread> workers_; - std::mutex mutex_; - std::condition_variable cv_recv_cb_; - std::atomic<bool> run_; - RecvCB onRecv_; + bhome_shm::SharedMemory &shm_; + Type type_; + std::vector<std::thread> workers_; + std::mutex mutex_; + std::condition_variable cv_recv_cb_; + std::atomic<bool> run_; + RecvCB onRecv_; - std::unique_ptr<Queue> mq_; + std::unique_ptr<Queue> mq_; }; - #endif // end of include guard: SOCKET_GWTJHBPO diff --git a/utest/simple_tests.cpp b/utest/simple_tests.cpp index eff0209..06093fd 100644 --- a/utest/simple_tests.cpp +++ b/utest/simple_tests.cpp @@ -18,126 +18,123 @@ #include "util.h" -struct s1000 { char a[1000]; }; - +struct s1000 { + char a[1000]; +}; BOOST_AUTO_TEST_CASE(BasicTest) { - const std::string shm_name("basic"); - ShmRemover auto_remove(shm_name); - SharedMemory shm(shm_name, 1024*1024*10); - auto Avail = [&]() { return shm.get_free_memory(); }; + const std::string shm_name("basic"); + ShmRemover auto_remove(shm_name); + SharedMemory shm(shm_name, 1024 * 1024 * 10); + auto Avail = [&]() { return shm.get_free_memory(); }; - offset_ptr<const void> p; - BOOST_CHECK(!p); - BOOST_CHECK(p.get() == 0); - p = 0; - BOOST_CHECK(!p); - BOOST_CHECK(p.get() == 0); - const char *str = "basic"; - p = str; - BOOST_CHECK(p); - BOOST_CHECK(p.get() == str); - p = 0; - BOOST_CHECK(!p); - BOOST_CHECK(p.get() == 0); + offset_ptr<const void> p; + BOOST_CHECK(!p); + BOOST_CHECK(p.get() == 0); + p = 0; + BOOST_CHECK(!p); + BOOST_CHECK(p.get() == 0); + const char *str = "basic"; + p = str; + BOOST_CHECK(p); + BOOST_CHECK(p.get() == str); + p = 0; + BOOST_CHECK(!p); + BOOST_CHECK(p.get() == 0); + auto init_avail = Avail(); - auto init_avail = Avail(); + auto BasicTest = [&](int tid, int nloop) { + auto Code = [&](int id) { + typedef ShmObject<s1000> Int; + std::string name = std::to_string(id); + auto a0 = Avail(); + Int i1(shm, name); + auto a1 = Avail(); + BOOST_CHECK_LT(a1, a0); + printf("s1000 size: %ld\n", a0 - a1); + i1->a[0] = 5; + Int i2(shm, name); + auto a2 = Avail(); + BOOST_CHECK_EQUAL(a1, a2); + BOOST_CHECK_EQUAL(i1.data(), i2.data()); + int i = i1.Remove(); + BOOST_CHECK_EQUAL(Avail(), a0); - auto BasicTest = [&](int tid, int nloop) { - auto Code = [&](int id) { + { + auto old = Avail(); + void *p = shm.Alloc(1024); + shm.Dealloc(p); + BOOST_CHECK_EQUAL(old, Avail()); + } - typedef ShmObject<s1000> Int; - std::string name = std::to_string(id); - auto a0 = Avail(); - Int i1(shm, name); - auto a1 = Avail(); - BOOST_CHECK_LT(a1, a0); - printf("s1000 size: %ld\n", a0 - a1); - i1->a[0] = 5; - Int i2(shm, name); - auto a2 = Avail(); - BOOST_CHECK_EQUAL(a1, a2); - BOOST_CHECK_EQUAL(i1.data(), i2.data()); - int i = i1.Remove(); - BOOST_CHECK_EQUAL(Avail(), a0); + bool r = shared_memory_object::remove(shm_name.c_str()); + BOOST_CHECK(r); + }; + for (int i = 0; i < nloop; ++i) { + Code(i + tid * nloop); + } + }; - { - auto old = Avail(); - void *p = shm.Alloc(1024); - shm.Dealloc(p); - BOOST_CHECK_EQUAL(old, Avail()); - } - - bool r = shared_memory_object::remove(shm_name.c_str()); - BOOST_CHECK(r); - }; - for (int i = 0; i < nloop; ++i) { - Code(i + tid*nloop); - } - }; - - // boost::timer::auto_cpu_timer timer; - ThreadManager threads; - int nthread = 1; - int nloop = 1; - for (int i = 0; i < nthread; ++i) - { - threads.Launch(BasicTest, i, nloop); - } - BOOST_CHECK_EQUAL(init_avail, Avail()); + // boost::timer::auto_cpu_timer timer; + ThreadManager threads; + int nthread = 1; + int nloop = 1; + for (int i = 0; i < nthread; ++i) { + threads.Launch(BasicTest, i, nloop); + } + BOOST_CHECK_EQUAL(init_avail, Avail()); } BOOST_AUTO_TEST_CASE(ForkTest) { - ProcessManager procs; - const int nproc = 10; + ProcessManager procs; + const int nproc = 10; - printf("Testing fork:\n"); + printf("Testing fork:\n"); - auto child = [&](int id) { - std::this_thread::sleep_for(100ms *id); - printf("child id: %3d/%d ends\r", id, nproc); - }; + auto child = [&](int id) { + std::this_thread::sleep_for(100ms * id); + printf("child id: %3d/%d ends\r", id, nproc); + }; - for (int i = 0; i < nproc; ++i) { - procs.Launch(child, i+1); - } + for (int i = 0; i < nproc; ++i) { + procs.Launch(child, i + 1); + } } BOOST_AUTO_TEST_CASE(TimedWaitTest) { - const std::string shm_name("shm_wait"); - ShmRemover auto_remove(shm_name); - SharedMemory shm(shm_name, 1024*1024); - ShmMsgQueue q(shm, 64); - for (int i = 0; i < 2; ++i) { - int ms = i * 100; - printf("Timeout Test %4d: ", ms); - boost::timer::auto_cpu_timer timer; - BHMsg msg; - bool r = q.Recv(msg, ms); - BOOST_CHECK(!r); - } + const std::string shm_name("shm_wait"); + ShmRemover auto_remove(shm_name); + SharedMemory shm(shm_name, 1024 * 1024); + ShmMsgQueue q(shm, 64); + for (int i = 0; i < 2; ++i) { + int ms = i * 100; + printf("Timeout Test %4d: ", ms); + boost::timer::auto_cpu_timer timer; + BHMsg msg; + bool r = q.Recv(msg, ms); + BOOST_CHECK(!r); + } } BOOST_AUTO_TEST_CASE(RefCountTest) { - const std::string shm_name("ShmRefCount"); - ShmRemover auto_remove(shm_name); - SharedMemory shm(shm_name, 1024*1024); + const std::string shm_name("ShmRefCount"); + ShmRemover auto_remove(shm_name); + SharedMemory shm(shm_name, 1024 * 1024); - MsgI m0(shm.Alloc(1000), shm.New<RefCount>()); - BOOST_CHECK(m0.IsCounted()); - BOOST_CHECK_EQUAL(m0.Count(), 1); - MsgI m1 = m0; - BOOST_CHECK(m1.IsCounted()); - BOOST_CHECK_EQUAL(m1.AddRef(), 2); - BOOST_CHECK_EQUAL(m0.AddRef(), 3); - BOOST_CHECK_EQUAL(m0.Release(shm), 2); - BOOST_CHECK_EQUAL(m0.Release(shm), 1); - BOOST_CHECK_EQUAL(m1.Release(shm), 0); - BOOST_CHECK(!m1.IsCounted()); + MsgI m0(shm.Alloc(1000), shm.New<RefCount>()); + BOOST_CHECK(m0.IsCounted()); + BOOST_CHECK_EQUAL(m0.Count(), 1); + MsgI m1 = m0; + BOOST_CHECK(m1.IsCounted()); + BOOST_CHECK_EQUAL(m1.AddRef(), 2); + BOOST_CHECK_EQUAL(m0.AddRef(), 3); + BOOST_CHECK_EQUAL(m0.Release(shm), 2); + BOOST_CHECK_EQUAL(m0.Release(shm), 1); + BOOST_CHECK_EQUAL(m1.Release(shm), 0); + BOOST_CHECK(!m1.IsCounted()); } - diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp index b1cba46..35465bb 100644 --- a/utest/speed_test.cpp +++ b/utest/speed_test.cpp @@ -22,176 +22,175 @@ BOOST_AUTO_TEST_CASE(SpeedTest) { - const std::string shm_name("ShmSpeed"); - ShmRemover auto_remove(shm_name); - const int mem_size = 1024*1024*50; - MQId id = boost::uuids::random_generator()(); - const int timeout = 100; - const uint32_t data_size = 4000; + const std::string shm_name("ShmSpeed"); + ShmRemover auto_remove(shm_name); + const int mem_size = 1024 * 1024 * 50; + MQId id = boost::uuids::random_generator()(); + const int timeout = 100; + const uint32_t data_size = 4000; - auto Writer = [&](int writer_id, uint64_t n) { - SharedMemory shm(shm_name, mem_size); - ShmMsgQueue mq(shm, 64); - std::string str(data_size, 'a'); - MsgI msg; - DEFER1(msg.Release(shm);); - msg.MakeRC(shm, MakeRequest(mq.Id(), str.data(), str.size())); - for (uint64_t i = 0; i < n; ++i) { - // mq.Send(id, str.data(), str.size(), timeout); - mq.Send(id, msg, timeout); - } - }; - auto Reader = [&](int reader_id, std::atomic<bool> *run, bool isfork){ - SharedMemory shm(shm_name, mem_size); - ShmMsgQueue mq(id, shm, 1000); - while(*run) { - BHMsg msg; - if (mq.Recv(msg, timeout)) { - // ok - } else if (isfork) { - exit(0); // for forked quit after 1s. - } - } - }; - auto State = [&](std::atomic<bool> *run){ - SharedMemory shm(shm_name, mem_size); - auto init = shm.get_free_memory(); - printf("shm init : %ld\n", init); - while (*run) { - auto cur = shm.get_free_memory(); - printf("shm used : %8ld/%ld\n", init - cur, init); - std::this_thread::sleep_for(1s); - } - }; + auto Writer = [&](int writer_id, uint64_t n) { + SharedMemory shm(shm_name, mem_size); + ShmMsgQueue mq(shm, 64); + std::string str(data_size, 'a'); + MsgI msg; + DEFER1(msg.Release(shm);); + msg.MakeRC(shm, MakeRequest(mq.Id(), str.data(), str.size())); + for (uint64_t i = 0; i < n; ++i) { + // mq.Send(id, str.data(), str.size(), timeout); + mq.Send(id, msg, timeout); + } + }; + auto Reader = [&](int reader_id, std::atomic<bool> *run, bool isfork) { + SharedMemory shm(shm_name, mem_size); + ShmMsgQueue mq(id, shm, 1000); + while (*run) { + BHMsg msg; + if (mq.Recv(msg, timeout)) { + // ok + } else if (isfork) { + exit(0); // for forked quit after 1s. + } + } + }; + auto State = [&](std::atomic<bool> *run) { + SharedMemory shm(shm_name, mem_size); + auto init = shm.get_free_memory(); + printf("shm init : %ld\n", init); + while (*run) { + auto cur = shm.get_free_memory(); + printf("shm used : %8ld/%ld\n", init - cur, init); + std::this_thread::sleep_for(1s); + } + }; - int nwriters[] = {1,2,4}; - int nreaders[] = {1,2}; + int nwriters[] = {1, 2, 4}; + int nreaders[] = {1, 2}; - auto Test = [&](auto &www, auto &rrr, bool isfork) { - for (auto nreader : nreaders) { - for (auto nwriter : nwriters) { - const uint64_t nmsg = 1000 * 1000 * 10 / nwriter; - const uint64_t total_msg = nmsg * nwriter; - std::atomic<bool> run(true); - std::this_thread::sleep_for(10ms); - boost::timer::auto_cpu_timer timer; - for (int i = 0; i < nreader; ++i) { - rrr.Launch(Reader, i, &run, isfork); - } - for (int i = 0; i < nwriter; ++i) { - www.Launch(Writer, i, nmsg); - } - www.WaitAll(); - run.store(false); - rrr.WaitAll(); - printf("Write %ld msg R(%3d) W(%3d), : ", total_msg, nreader, nwriter); - } - } - }; + auto Test = [&](auto &www, auto &rrr, bool isfork) { + for (auto nreader : nreaders) { + for (auto nwriter : nwriters) { + const uint64_t nmsg = 1000 * 1000 * 10 / nwriter; + const uint64_t total_msg = nmsg * nwriter; + std::atomic<bool> run(true); + std::this_thread::sleep_for(10ms); + boost::timer::auto_cpu_timer timer; + for (int i = 0; i < nreader; ++i) { + rrr.Launch(Reader, i, &run, isfork); + } + for (int i = 0; i < nwriter; ++i) { + www.Launch(Writer, i, nmsg); + } + www.WaitAll(); + run.store(false); + rrr.WaitAll(); + printf("Write %ld msg R(%3d) W(%3d), : ", total_msg, nreader, nwriter); + } + } + }; - std::atomic<bool> run(true); - ThreadManager state; - state.Launch(State, &run); - // typedef ProcessManager Manager; - // typedef ThreadManager Manager; - // const bool isfork = IsSameType<Manager, ProcessManager>::value; - ProcessManager pw, pr; - printf("================ Testing process io: =======================================================\n"); - Test(pw, pr, true); - ThreadManager tw, tr; - printf("---------------- Testing thread io: -------------------------------------------------------\n"); - Test(tw, tr, false); - run.store(false); + std::atomic<bool> run(true); + ThreadManager state; + state.Launch(State, &run); + // typedef ProcessManager Manager; + // typedef ThreadManager Manager; + // const bool isfork = IsSameType<Manager, ProcessManager>::value; + ProcessManager pw, pr; + printf("================ Testing process io: =======================================================\n"); + Test(pw, pr, true); + ThreadManager tw, tr; + printf("---------------- Testing thread io: -------------------------------------------------------\n"); + Test(tw, tr, false); + run.store(false); } // Request Reply Test BOOST_AUTO_TEST_CASE(RRTest) { - const std::string shm_name("ShmReqRep"); - ShmRemover auto_remove(shm_name); - const int qlen = 64; - const size_t msg_length = 1000; - std::string msg_content(msg_length, 'a'); - msg_content[20] = '\0'; + const std::string shm_name("ShmReqRep"); + ShmRemover auto_remove(shm_name); + const int qlen = 64; + const size_t msg_length = 1000; + std::string msg_content(msg_length, 'a'); + msg_content[20] = '\0'; - SharedMemory shm(shm_name, 1024*1024*50); - auto Avail = [&]() { return shm.get_free_memory(); }; - auto init_avail = Avail(); - ShmMsgQueue srv(shm, qlen); - ShmMsgQueue cli(shm, qlen); + SharedMemory shm(shm_name, 1024 * 1024 * 50); + auto Avail = [&]() { return shm.get_free_memory(); }; + auto init_avail = Avail(); + ShmMsgQueue srv(shm, qlen); + ShmMsgQueue cli(shm, qlen); - MsgI request_rc; - request_rc.MakeRC(shm, MakeRequest(cli.Id(), msg_content.data(), msg_content.size())); - MsgI reply_rc; - reply_rc.MakeRC(shm, MakeReply(msg_content.data(), msg_content.size())); + MsgI request_rc; + request_rc.MakeRC(shm, MakeRequest(cli.Id(), msg_content.data(), msg_content.size())); + MsgI reply_rc; + reply_rc.MakeRC(shm, MakeReply(msg_content.data(), msg_content.size())); - std::atomic<uint64_t> count(0); + std::atomic<uint64_t> count(0); - std::atomic<ptime> last_time(Now() - seconds(1)); - std::atomic<uint64_t> last_count(0); + std::atomic<ptime> last_time(Now() - seconds(1)); + std::atomic<uint64_t> last_count(0); - auto Client = [&](int cli_id, int nmsg){ - for (int i = 0; i < nmsg; ++i) { - auto Req = [&]() { - return cli.Send(srv.Id(), MakeRequest(cli.Id(), msg_content.data(), msg_content.size()), 100); - }; - auto ReqRC = [&]() { return cli.Send(srv.Id(), request_rc, 1000); }; + auto Client = [&](int cli_id, int nmsg) { + for (int i = 0; i < nmsg; ++i) { + auto Req = [&]() { + return cli.Send(srv.Id(), MakeRequest(cli.Id(), msg_content.data(), msg_content.size()), 100); + }; + auto ReqRC = [&]() { return cli.Send(srv.Id(), request_rc, 1000); }; - if (!ReqRC()) { - printf("********** client send error.\n"); - continue; - } - BHMsg msg; - if (!cli.Recv(msg, 1000)) { - printf("********** client recv error.\n"); - } else { - ++count; - auto cur = Now(); - if (last_time.exchange(cur) < cur) { - std::cout << "time: " << cur; - printf(", total msg:%10ld, speed:[%8ld/s], used mem:%8ld, refcount:%d\n", - count.load(), count - last_count.exchange(count), init_avail - Avail(), request_rc.Count()); - } + if (!ReqRC()) { + printf("********** client send error.\n"); + continue; + } + BHMsg msg; + if (!cli.Recv(msg, 1000)) { + printf("********** client recv error.\n"); + } else { + ++count; + auto cur = Now(); + if (last_time.exchange(cur) < cur) { + std::cout << "time: " << cur; + printf(", total msg:%10ld, speed:[%8ld/s], used mem:%8ld, refcount:%d\n", + count.load(), count - last_count.exchange(count), init_avail - Avail(), request_rc.Count()); + } + } + } + }; - } - } - }; + std::atomic<bool> stop(false); + auto Server = [&]() { + BHMsg req; + while (!stop) { + if (srv.Recv(req, 100) && req.type() == kMsgTypeRequest) { + auto &mqid = req.route()[0].mq_id(); + MQId src_id; + memcpy(&src_id, mqid.data(), sizeof(src_id)); + auto Reply = [&]() { + return srv.Send(src_id, MakeReply(msg_content.data(), msg_content.size()), 100); + }; + auto ReplyRC = [&]() { return srv.Send(src_id, reply_rc, 100); }; - std::atomic<bool> stop(false); - auto Server = [&](){ - BHMsg req; - while (!stop) { - if (srv.Recv(req, 100) && req.type() == kMsgTypeRequest) { - auto &mqid = req.route()[0].mq_id(); - MQId src_id; - memcpy(&src_id, mqid.data(), sizeof(src_id)); - auto Reply = [&]() { - return srv.Send(src_id, MakeReply(msg_content.data(), msg_content.size()), 100); - }; - auto ReplyRC = [&](){ return srv.Send(src_id, reply_rc, 100); }; + if (ReplyRC()) { + } + } + } + }; - if (ReplyRC()) { - } - } - } - }; + boost::timer::auto_cpu_timer timer; + DEFER1(printf("Request Reply Test:");); - boost::timer::auto_cpu_timer timer; - DEFER1(printf("Request Reply Test:");); - - ThreadManager clients, servers; - for (int i = 0; i < qlen; ++i) { servers.Launch(Server); } - int ncli = 100*1; - uint64_t nmsg = 100*100*2; - printf("client threads: %d, msgs : %ld, total msg: %ld\n", ncli, nmsg, ncli * nmsg); - for (int i = 0; i < ncli; ++i) { clients.Launch(Client, i, nmsg); } - clients.WaitAll(); - printf("request ok: %ld\n", count.load()); - stop = true; - servers.WaitAll(); - BOOST_CHECK(request_rc.IsCounted()); - BOOST_CHECK_EQUAL(request_rc.Count(), 1); - request_rc.Release(shm); - BOOST_CHECK(!request_rc.IsCounted()); - // BOOST_CHECK_THROW(reply.Count(), int); + ThreadManager clients, servers; + for (int i = 0; i < qlen; ++i) { servers.Launch(Server); } + int ncli = 100 * 1; + uint64_t nmsg = 100 * 100 * 2; + printf("client threads: %d, msgs : %ld, total msg: %ld\n", ncli, nmsg, ncli * nmsg); + for (int i = 0; i < ncli; ++i) { clients.Launch(Client, i, nmsg); } + clients.WaitAll(); + printf("request ok: %ld\n", count.load()); + stop = true; + servers.WaitAll(); + BOOST_CHECK(request_rc.IsCounted()); + BOOST_CHECK_EQUAL(request_rc.Count(), 1); + request_rc.Release(shm); + BOOST_CHECK(!request_rc.IsCounted()); + // BOOST_CHECK_THROW(reply.Count(), int); } diff --git a/utest/utest.cpp b/utest/utest.cpp index bb5c14d..473b04e 100644 --- a/utest/utest.cpp +++ b/utest/utest.cpp @@ -1,167 +1,172 @@ -#include <stdio.h> -#include <string> -#include <vector> -#include <thread> +#include "defs.h" +#include "pubsub.h" +#include "socket.h" +#include "util.h" #include <atomic> #include <boost/uuid/uuid_generators.hpp> #include <boost/uuid/uuid_io.hpp> -#include "pubsub.h" -#include "defs.h" -#include "util.h" -#include "socket.h" +#include <stdio.h> +#include <string> +#include <thread> +#include <vector> -template <class A, class B> struct IsSameType { static const bool value = false; }; -template <class A> struct IsSameType<A,A> { static const bool value = true; }; - +template <class A, class B> +struct IsSameType { + static const bool value = false; +}; +template <class A> +struct IsSameType<A, A> { + static const bool value = true; +}; BOOST_AUTO_TEST_CASE(Temp) { - std::string topics[] = { - "", - ".", - "a", - "sp", - "sport", - "sport.", - "sport.a", - "sport.a.b.c", - "sport.ab.c", - "sport.basketball", - "sport.football", - }; - const char sep = '.'; - auto Adjust = [&](const std::string &user_topic) { - if (user_topic.empty() || user_topic.back() == sep) { - return user_topic; - } else { - return user_topic + sep; - } - }; + std::string topics[] = { + "", + ".", + "a", + "sp", + "sport", + "sport.", + "sport.a", + "sport.a.b.c", + "sport.ab.c", + "sport.basketball", + "sport.football", + }; + const char sep = '.'; + auto Adjust = [&](const std::string &user_topic) { + if (user_topic.empty() || user_topic.back() == sep) { + return user_topic; + } else { + return user_topic + sep; + } + }; - for (auto &t : topics) { - const std::string &a = Adjust(t); - printf("orig: %20s adjusted: %20s parts:[", ("'" + t + "'").c_str(), ('\'' + a + '\'').c_str()); + for (auto &t : topics) { + const std::string &a = Adjust(t); + printf("orig: %20s adjusted: %20s parts:[", ("'" + t + "'").c_str(), ('\'' + a + '\'').c_str()); - size_t pos = 0; - while (true) { - auto &topic = t; - pos = topic.find(kTopicSep, pos); - if (pos == topic.npos || ++pos == topic.size()) { - // Find1(std::string()); // sub all. - break; - } else { - printf("'%s',", topic.substr(0, pos).c_str()); - } - } - printf("]\n"); - } + size_t pos = 0; + while (true) { + auto &topic = t; + pos = topic.find(kTopicSep, pos); + if (pos == topic.npos || ++pos == topic.size()) { + // Find1(std::string()); // sub all. + break; + } else { + printf("'%s',", topic.substr(0, pos).c_str()); + } + } + printf("]\n"); + } } BOOST_AUTO_TEST_CASE(PubSubTest) { - const std::string shm_name("ShmPubSub"); - ShmRemover auto_remove(shm_name); //remove twice? in case of killed? - SharedMemory shm(shm_name, 1024*1024*50); - auto Avail = [&]() { return shm.get_free_memory(); }; - auto init_avail = Avail(); + const std::string shm_name("ShmPubSub"); + ShmRemover auto_remove(shm_name); //remove twice? in case of killed? + SharedMemory shm(shm_name, 1024 * 1024 * 50); + auto Avail = [&]() { return shm.get_free_memory(); }; + auto init_avail = Avail(); - BusManager bus(shm); - bus.Start(1); - std::this_thread::sleep_for(100ms); + BusManager bus(shm); + bus.Start(1); + std::this_thread::sleep_for(100ms); - std::atomic<uint64_t> count(0); - std::atomic<ptime> last_time(Now() - seconds(1)); - std::atomic<uint64_t> last_count(0); + std::atomic<uint64_t> count(0); + std::atomic<ptime> last_time(Now() - seconds(1)); + std::atomic<uint64_t> last_count(0); - const uint64_t nmsg = 100; - const int timeout = 1000; - auto Sub = [&](int id, const std::vector<std::string> &topics) { - ShmSocket client(ShmSocket::eSockSubscribe, shm); - bool r = client.Subscribe(topics, timeout); - std::mutex mutex; - std::condition_variable cv; + const uint64_t nmsg = 100; + const int timeout = 1000; + auto Sub = [&](int id, const std::vector<std::string> &topics) { + ShmSocket client(ShmSocket::eSockSubscribe, shm); + bool r = client.Subscribe(topics, timeout); + std::mutex mutex; + std::condition_variable cv; - int i = 0; - auto OnRecv = [&](BHMsg &msg) { - if (msg.type() != kMsgTypePublish) { - BOOST_CHECK(false); - } - DataPub pub; - if (!pub.ParseFromString(msg.body())) { - BOOST_CHECK(false); - } - ++count; + int i = 0; + auto OnRecv = [&](BHMsg &msg) { + if (msg.type() != kMsgTypePublish) { + BOOST_CHECK(false); + } + DataPub pub; + if (!pub.ParseFromString(msg.body())) { + BOOST_CHECK(false); + } + ++count; - auto cur = Now(); - if (last_time.exchange(cur) < cur) { - std::cout << "time: " << cur; - printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n", - count.load(), count - last_count.exchange(count), init_avail - Avail()); - } - if (++i >= nmsg*topics.size()) { - cv.notify_one(); - } - // printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str()); - }; - client.SetRecvCallback(OnRecv); + auto cur = Now(); + if (last_time.exchange(cur) < cur) { + std::cout << "time: " << cur; + printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n", + count.load(), count - last_count.exchange(count), init_avail - Avail()); + } + if (++i >= nmsg * topics.size()) { + cv.notify_one(); + } + // printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str()); + }; + client.SetRecvCallback(OnRecv); - std::unique_lock<std::mutex> lk(mutex); - cv.wait(lk); + std::unique_lock<std::mutex> lk(mutex); + cv.wait(lk); + }; - }; + auto Pub = [&](const std::string &topic) { + ShmSocket provider(ShmSocket::eSockPublish, shm); + for (int i = 0; i < nmsg; ++i) { + std::string data = topic + std::to_string(i) + std::string(1000, '-'); - auto Pub = [&](const std::string &topic) { - ShmSocket provider(ShmSocket::eSockPublish, shm); - for (int i = 0; i < nmsg; ++i) { - std::string data = topic + std::to_string(i) + std::string(1000, '-'); + bool r = provider.Publish(topic, data.data(), data.size(), timeout); + // bool r = provider.Send(kBHBusQueueId, MakePub(topic, data.data(), data.size()), timeout); + if (!r) { + printf("pub ret: %s\n", r ? "ok" : "fail"); + } + } + }; + ThreadManager threads; + typedef std::vector<std::string> Topics; + Topics topics; + for (int i = 0; i < 100; ++i) { + topics.push_back("t" + std::to_string(i)); + } + Topics part; + for (int i = 0; i < topics.size(); ++i) { + part.push_back(topics[i]); + threads.Launch(Sub, i, topics); + } + std::this_thread::sleep_for(100ms); + for (auto &topic : topics) { + threads.Launch(Pub, topic); + } + threads.Launch(Pub, "some_else"); - bool r = provider.Publish(topic, data.data(), data.size(), timeout); - // bool r = provider.Send(kBHBusQueueId, MakePub(topic, data.data(), data.size()), timeout); - if (!r) { - printf("pub ret: %s\n", r ? "ok" : "fail"); - } - } - }; - ThreadManager threads; - typedef std::vector<std::string> Topics; - Topics topics; - for (int i = 0; i < 100; ++i) { - topics.push_back("t" + std::to_string(i)); - } - Topics part; - for (int i = 0; i < topics.size(); ++i) { - part.push_back(topics[i]); - threads.Launch(Sub, i, topics); - } - std::this_thread::sleep_for(100ms); - for (auto &topic: topics) { - threads.Launch(Pub, topic); - } - threads.Launch(Pub, "some_else"); + threads.WaitAll(); + std::cout << "end : " << Now(); + printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n", + count.load(), count - last_count.exchange(count), init_avail - Avail()); - threads.WaitAll(); - std::cout << "end : " << Now(); - printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n", - count.load(), count - last_count.exchange(count), init_avail - Avail()); - - bus.Stop(); + bus.Stop(); } -inline int MyMin(int a, int b) { - printf("MyMin\n"); - return a < b ? a : b; +inline int MyMin(int a, int b) +{ + printf("MyMin\n"); + return a < b ? a : b; } + int test_main(int argc, char *argv[]) { - printf("test main\n"); - int a = 0; - int b = 0; - BOOST_CHECK_EQUAL(a, b); - int n = MyMin(4,6); - for (int i = 0; i < n; ++i) { - printf("i = %d\n", i); - } + printf("test main\n"); + int a = 0; + int b = 0; + BOOST_CHECK_EQUAL(a, b); + int n = MyMin(4, 6); + for (int i = 0; i < n; ++i) { + printf("i = %d\n", i); + } - return 0; + return 0; } - diff --git a/utest/util.h b/utest/util.h index ac1d58d..ca58cd7 100644 --- a/utest/util.h +++ b/utest/util.h @@ -19,21 +19,21 @@ #ifndef UTIL_W8A0OA5U #define UTIL_W8A0OA5U -#include <functional> -#include <vector> -#include <thread> -#include <stdlib.h> -#include <chrono> -#include <sys/types.h> -#include <sys/wait.h> -#include <boost/noncopyable.hpp> -#include <boost/timer/timer.hpp> -#include <boost/test/unit_test.hpp> -#include <boost/date_time/posix_time/posix_time.hpp> +#include "bh_util.h" +#include "msg.h" #include "shm.h" #include "shm_queue.h" -#include "msg.h" -#include "bh_util.h" +#include <boost/date_time/posix_time/posix_time.hpp> +#include <boost/noncopyable.hpp> +#include <boost/test/unit_test.hpp> +#include <boost/timer/timer.hpp> +#include <chrono> +#include <functional> +#include <stdlib.h> +#include <sys/types.h> +#include <sys/wait.h> +#include <thread> +#include <vector> using namespace boost::posix_time; inline ptime Now() { return second_clock::universal_time(); }; @@ -42,58 +42,69 @@ typedef std::function<void(void)> FuncVV; -class ScopeCall : private boost::noncopyable { - FuncVV f_; +class ScopeCall : private boost::noncopyable +{ + FuncVV f_; + public: - ScopeCall(FuncVV f):f_(f) { f_(); } - ~ScopeCall() { f_(); } + ScopeCall(FuncVV f) : + f_(f) { f_(); } + ~ScopeCall() { f_(); } }; -class ThreadManager { - std::vector<std::thread> threads_; +class ThreadManager +{ + std::vector<std::thread> threads_; + public: - ~ThreadManager() { WaitAll(); } - template <class T, class...P> - void Launch(T t, P...p) { threads_.emplace_back(t, p...); } - void WaitAll() { - for (auto &t : threads_) { - if (t.joinable()) { - t.join(); - } - } - } + ~ThreadManager() { WaitAll(); } + template <class T, class... P> + void Launch(T t, P... p) { threads_.emplace_back(t, p...); } + void WaitAll() + { + for (auto &t : threads_) { + if (t.joinable()) { + t.join(); + } + } + } }; -class ProcessManager { - std::vector<pid_t> procs_; +class ProcessManager +{ + std::vector<pid_t> procs_; + public: - ~ProcessManager() { WaitAll(); } - template <class T, class ...P> - void Launch(T t, P...p) { - auto pid = fork(); - if (pid == 0) { - // child - t(p...); - exit(0); - } else if (pid != -1) { // Ok - procs_.push_back(pid); - } - }; - void WaitAll() { - for (auto &pid: procs_) { - int status = 0; - int options = WUNTRACED | WCONTINUED; - waitpid(pid, &status, options); - } - procs_.clear(); - } + ~ProcessManager() { WaitAll(); } + template <class T, class... P> + void Launch(T t, P... p) + { + auto pid = fork(); + if (pid == 0) { + // child + t(p...); + exit(0); + } else if (pid != -1) { // Ok + procs_.push_back(pid); + } + }; + void WaitAll() + { + for (auto &pid : procs_) { + int status = 0; + int options = WUNTRACED | WCONTINUED; + waitpid(pid, &status, options); + } + procs_.clear(); + } }; using namespace bhome_shm; using namespace bhome_msg; struct ShmRemover { - std::string name_; - ShmRemover(const std::string &name):name_(name) { SharedMemory::Remove(name_); } - ~ShmRemover() { SharedMemory::Remove(name_); } + std::string name_; + ShmRemover(const std::string &name) : + name_(name) { SharedMemory::Remove(name_); } + ~ShmRemover() { SharedMemory::Remove(name_); } }; #endif // end of include guard: UTIL_W8A0OA5U -- Gitblit v1.8.0