From 491d98b3ba32cafed5682552bd870ca0ef93275c Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期二, 30 三月 2021 18:29:09 +0800 Subject: [PATCH] add ShmSocket as shm interface, add sub/pub. --- src/shm.h | 4 src/socket.h | 74 ++++++++++++ src/shm_queue.h | 5 src/socket.cpp | 134 ++++++++++++++++++++++ src/center.h | 26 ++++ src/defs.h | 6 + utest/utest.cpp | 66 +++++----- src/shm_queue.cpp | 12 + src/center.cpp | 29 ++++ 9 files changed, 320 insertions(+), 36 deletions(-) diff --git a/src/center.cpp b/src/center.cpp new file mode 100644 index 0000000..809b6d1 --- /dev/null +++ b/src/center.cpp @@ -0,0 +1,29 @@ +/* + * ===================================================================================== + * + * Filename: center.cpp + * + * Description: + * + * Version: 1.0 + * Created: 2021骞�03鏈�30鏃� 16鏃�19鍒�37绉� + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), + * Organization: + * + * ===================================================================================== + */ +#include "center.h" +#include "defs.h" +#include "shm.h" + +using namespace bhome_shm; + +SharedMemory &BHomeShm() +{ + static SharedMemory shm("bhome_default_shm_v0", 1024*1024*64); + return shm; +} + diff --git a/src/center.h b/src/center.h new file mode 100644 index 0000000..fcb8005 --- /dev/null +++ b/src/center.h @@ -0,0 +1,26 @@ +/* + * ===================================================================================== + * + * Filename: center.h + * + * Description: + * + * Version: 1.0 + * Created: 2021骞�03鏈�30鏃� 16鏃�22鍒�24绉� + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), + * Organization: + * + * ===================================================================================== + */ +#ifndef CENTER_TM9OUQTG +#define CENTER_TM9OUQTG + +class BHCenter +{ + +}; + +#endif // end of include guard: CENTER_TM9OUQTG diff --git a/src/defs.h b/src/defs.h index 56c6c9c..acfe09e 100644 --- a/src/defs.h +++ b/src/defs.h @@ -27,6 +27,12 @@ const MQId kBHBusQueueId = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff"); const int kBHCenterPort = 24287; const char kTopicSep = '.'; +namespace bhome_shm { +class SharedMemory; +} + +bhome_shm::SharedMemory &BHomeShm(); + //TODO center can check shm for previous crash. #endif // end of include guard: DEFS_KP8LKGD0 diff --git a/src/shm.h b/src/shm.h index 5e2c8b9..0f68754 100644 --- a/src/shm.h +++ b/src/shm.h @@ -61,6 +61,7 @@ template <class T, class ...Params> T * New(Params const&...params) { return construct<T>(anonymous_instance, std::nothrow)(params...); } template <class T> void Delete(T *p) { if (p) { destroy_ptr<T>(p); }; } template <class T> void Delete(offset_ptr<T> p) { Delete(p.get()); } + template <class T> T *Find(const std::string &name) { return find<T>(name.c_str()).first; } }; @@ -91,7 +92,8 @@ throw("Error: Not enough memory, can not allocate \"" + name_ + "\""); } } - Data *find(const std::string &name) { return shm_.find<Data>(ObjName(name).c_str()).first; } + static Data *Find(SharedMemory &shm, const std::string &name) { return shm.Find<Data>(ObjName(name)); } + Data *Find(const std::string &name) { return Find(shm_, ObjName(name)); } virtual ~ShmObject() {} std::string name() const { return name_; } Data* data() { return pdata_; } diff --git a/src/shm_queue.cpp b/src/shm_queue.cpp index ffc7c21..421cebf 100644 --- a/src/shm_queue.cpp +++ b/src/shm_queue.cpp @@ -28,7 +28,7 @@ namespace { std::string MsgQIdToName(const MQId& id) { return "shmq" + to_string(id); } -MQId EmptyId() { return nil_uuid(); } +// MQId EmptyId() { return nil_uuid(); } MQId NewId() { return random_generator()(); } const int AdjustMQLength(const int len) { const int kMaxLength = 10000; @@ -59,12 +59,18 @@ Remove(); } -bool ShmMsgQueue::Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms) +bool ShmMsgQueue::Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms) { - Queue *remote = find(MsgQIdToName(remote_id)); + Queue *remote = Find(shm, MsgQIdToName(remote_id)); return remote && remote->Write(msg, timeout_ms, [](const MsgI&msg){msg.AddRef();}); } +// bool ShmMsgQueue::Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms) +// { +// Queue *remote = Find(MsgQIdToName(remote_id)); +// return remote && remote->Write(msg, timeout_ms, [](const MsgI&msg){msg.AddRef();}); +// } + bool ShmMsgQueue::Send(const MQId &remote_id, const BHMsg &data, const int timeout_ms) { MsgI msg; diff --git a/src/shm_queue.h b/src/shm_queue.h index a536553..60b1862 100644 --- a/src/shm_queue.h +++ b/src/shm_queue.h @@ -120,7 +120,10 @@ bool Recv(BHMsg &msg, const int timeout_ms); bool Recv(MsgI &msg, const int timeout_ms) { return Read(msg, timeout_ms); } bool Send(const MQId &remote_id, const BHMsg &msg, const int timeout_ms); - bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms); + static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms); + bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms) { + return Send(shm(), remote_id, msg, timeout_ms); + } }; } // namespace bhome_shm diff --git a/src/socket.cpp b/src/socket.cpp new file mode 100644 index 0000000..1c28bfa --- /dev/null +++ b/src/socket.cpp @@ -0,0 +1,134 @@ +/* + * ===================================================================================== + * + * Filename: socket.cpp + * + * Description: + * + * Version: 1.0 + * Created: 2021骞�03鏈�30鏃� 15鏃�48鍒�58绉� + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), + * Organization: + * + * ===================================================================================== + */ + +#include "socket.h" +#include <chrono> +#include "msg.h" +#include "defs.h" +#include "bh_util.h" + +using namespace bhome_msg; +using namespace bhome_shm; +using namespace std::chrono_literals; + +namespace { + +int GetSocketDefaultLen(ShmSocket::Type type) +{ + switch (type) { + case ShmSocket::eSockRequest : return 12; + case ShmSocket::eSockReply : return 64; + case ShmSocket::eSockPublish : return 0; + case ShmSocket::eSockSubscribe : return 64; + default: return 0; + } +} + + +} + +ShmSocket::ShmSocket(Type type, bhome_shm::SharedMemory &shm) + : shm_(shm), + type_(type), + run_(false) +{ + int len = GetSocketDefaultLen(type); + if (len != 0) { + mq_.reset(new Queue(shm_, len)); + + auto RecvProc = [this](){ + while (run_) { + try { + std::unique_lock<std::mutex> lk(mutex_); + if (cv_recv_cb_.wait_for(lk, 100ms, [this](){ return HasRecvCB(); })) { + BHMsg msg; + if (mq_->Recv(msg, 100)) { + this->onRecv_(msg); + } + } + } catch (...) {} + } + }; + run_.store(true); + workers_.emplace_back(RecvProc); + } +} +ShmSocket::ShmSocket(Type type) + : ShmSocket(type, BHomeShm()) +{ +} + +ShmSocket::~ShmSocket() +{ + Stop(); +} + +bool ShmSocket::Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms) +{ + if (type_ != eSockPublish) { + return false; + } + assert(!mq_); + try { + MsgI imsg; + if (!imsg.MakeRC(shm_, MakePub(topic, data, size))) { + return false; + } + DEFER1(imsg.Release(shm_)); + return Queue::Send(shm_, kBHBusQueueId, imsg, timeout_ms); + + } catch (...) { + return false; + } +} + +bool ShmSocket::Subscribe(const std::vector<std::string> &topics, const int timeout_ms) +{ + if (type_ != eSockSubscribe) { + return false; + } + assert(mq_); + try { + return mq_->Send(kBHBusQueueId, MakeSub(mq_->Id(), topics), timeout_ms); + } catch (...) { + return false; + } +} + +bool ShmSocket::SetRecvCallback(const RecvCB &onRecv) +{ + std::lock_guard<std::mutex> lock(mutex_); + onRecv_ = onRecv; + cv_recv_cb_.notify_one(); + return true; +} + +bool ShmSocket::HasRecvCB() +{ + return static_cast<bool>(onRecv_); +} + +void ShmSocket::Stop() +{ + run_ = false; + for (auto &t : workers_) { + if (t.joinable()) { + t.join(); + } + } +} \ No newline at end of file diff --git a/src/socket.h b/src/socket.h new file mode 100644 index 0000000..e65ac83 --- /dev/null +++ b/src/socket.h @@ -0,0 +1,74 @@ +/* + * ===================================================================================== + * + * Filename: socket.h + * + * Description: + * + * Version: 1.0 + * Created: 2021骞�03鏈�30鏃� 15鏃�49鍒�19绉� + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), + * Organization: + * + * ===================================================================================== + */ + +#ifndef SOCKET_GWTJHBPO +#define SOCKET_GWTJHBPO + +#include "shm_queue.h" +#include <vector> +#include <thread> +#include <memory> +#include <functional> +#include <mutex> +#include <condition_variable> +#include <atomic> + +class ShmSocket +{ + typedef bhome_shm::ShmMsgQueue Queue; +public: + enum Type { + eSockRequest, + eSockReply, + eSockSubscribe, + eSockPublish, + }; + typedef std::function<void (bhome_msg::BHMsg &msg)> RecvCB; + + ShmSocket(Type type); + ShmSocket(Type type, bhome_shm::SharedMemory &shm); + ~ShmSocket(); + + // bool Request(const std::string &topic, const void *data, const size_t size, onReply); + bool RequestAndWait() { return false; } // call Request, and wait onReply notify cv + + // bool HandleRequest(onData); + bool ReadRequest(); // exclude with HandleRequest + bool SendReply(); // exclude with HandleRequest + + bool Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms); + bool Subscribe(const std::vector<std::string> &topics, const int timeout_ms); + bool RecvSub(std::string &topic, std::string &data, const int timeout_ms); + bool SetRecvCallback(const RecvCB &onRecv); +private: + bool HasRecvCB(); + void Stop(); + + bhome_shm::SharedMemory &shm_; + Type type_; + std::vector<std::thread> workers_; + std::mutex mutex_; + std::condition_variable cv_recv_cb_; + std::atomic<bool> run_; + RecvCB onRecv_; + + std::unique_ptr<Queue> mq_; +}; + + +#endif // end of include guard: SOCKET_GWTJHBPO diff --git a/utest/utest.cpp b/utest/utest.cpp index f7571c8..bb5c14d 100644 --- a/utest/utest.cpp +++ b/utest/utest.cpp @@ -8,6 +8,7 @@ #include "pubsub.h" #include "defs.h" #include "util.h" +#include "socket.h" template <class A, class B> struct IsSameType { static const bool value = false; }; template <class A> struct IsSameType<A,A> { static const bool value = true; }; @@ -73,45 +74,48 @@ std::atomic<uint64_t> last_count(0); const uint64_t nmsg = 100; - const int timeout = 1000; auto Sub = [&](int id, const std::vector<std::string> &topics) { - ShmMsgQueue client(shm, 8); - client.Send(kBHBusQueueId, MakeSub(client.Id(), topics), timeout); - for (int i = 0; i < nmsg * topics.size(); ++i) { - BHMsg msg; - if (client.Recv(msg, 1000)) { - if (msg.type() != kMsgTypePublish) { - BOOST_CHECK(false); - } - DataPub pub; - if (!pub.ParseFromString(msg.body())) { - BOOST_CHECK(false); - } - ++count; - auto cur = Now(); - if (last_time.exchange(cur) < cur) { - std::cout << "time: " << cur; - printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n", - count.load(), count - last_count.exchange(count), init_avail - Avail()); - } - // printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str()); - } else { - printf("sub %2d recv timeout\n", id); - } + ShmSocket client(ShmSocket::eSockSubscribe, shm); + bool r = client.Subscribe(topics, timeout); + std::mutex mutex; + std::condition_variable cv; - } + int i = 0; + auto OnRecv = [&](BHMsg &msg) { + if (msg.type() != kMsgTypePublish) { + BOOST_CHECK(false); + } + DataPub pub; + if (!pub.ParseFromString(msg.body())) { + BOOST_CHECK(false); + } + ++count; + + auto cur = Now(); + if (last_time.exchange(cur) < cur) { + std::cout << "time: " << cur; + printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n", + count.load(), count - last_count.exchange(count), init_avail - Avail()); + } + if (++i >= nmsg*topics.size()) { + cv.notify_one(); + } + // printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str()); + }; + client.SetRecvCallback(OnRecv); + + std::unique_lock<std::mutex> lk(mutex); + cv.wait(lk); + }; + auto Pub = [&](const std::string &topic) { - ShmMsgQueue provider(shm, 0); + ShmSocket provider(ShmSocket::eSockPublish, shm); for (int i = 0; i < nmsg; ++i) { std::string data = topic + std::to_string(i) + std::string(1000, '-'); - MsgI msg; - msg.MakeRC(shm, MakePub(topic, data.data(), data.size())); - DEFER1(msg.Release(shm)); - bool r = provider.Send(kBHBusQueueId, msg, timeout); - + bool r = provider.Publish(topic, data.data(), data.size(), timeout); // bool r = provider.Send(kBHBusQueueId, MakePub(topic, data.data(), data.size()), timeout); if (!r) { printf("pub ret: %s\n", r ? "ok" : "fail"); -- Gitblit v1.8.0