From b55ffe89f4b237be5f79232cfddfe22bfdb87c64 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期四, 01 四月 2021 13:23:48 +0800 Subject: [PATCH] make req/rep,sub/pub sockets sub class; --- src/socket.h | 45 +- src/pubsub_center.h | 53 +++ src/reqrep.h | 57 +++ src/pubsub.h | 50 +- src/socket.cpp | 210 +----------- src/pubsub.cpp | 153 ++------ src/pubsub_center.cpp | 134 ++++++++ utest/utest.cpp | 36 - src/reqrep.cpp | 164 ++++++++++ 9 files changed, 537 insertions(+), 365 deletions(-) diff --git a/src/pubsub.cpp b/src/pubsub.cpp index eff54ab..cfc77ab 100644 --- a/src/pubsub.cpp +++ b/src/pubsub.cpp @@ -19,127 +19,58 @@ #include "bh_util.h" #include "defs.h" -namespace bhome_shm -{ - using namespace std::chrono_literals; -const int kMaxWorker = 16; using namespace bhome_msg; -BusManager::BusManager(SharedMemory &shm) : - shm_(shm), socket_(ShmSocket::eSockBus, shm) {} -BusManager::BusManager() : - BusManager(BHomeShm()) {} - -bool BusManager::Start(const int nworker) +bool SocketPublish::Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms) { - auto onRecv = [&](MsgI &imsg) { -#ifndef NDEBUG - static std::atomic<time_t> last(0); - time_t now = 0; - time(&now); - if (last.exchange(now) < now) { - printf("bus queue size: %ld\n", socket_.Pending()); + try { + MsgI imsg; + if (!imsg.MakeRC(shm(), MakePub(topic, data, size))) { + return false; } -#endif + DEFER1(imsg.Release(shm())); + return ShmMsgQueue::Send(shm(), kBHBusQueueId, imsg, timeout_ms); + } catch (...) { + return false; + } +} - BHMsg msg; - if (!imsg.Unpack(msg)) { - return; - } +bool SocketSubscribe::Subscribe(const std::vector<std::string> &topics, const int timeout_ms) +{ + try { + return mq().Send(kBHBusQueueId, MakeSub(mq().Id(), topics), timeout_ms); + } catch (...) { + return false; + } +} - auto OnSubChange = [&](auto &&update) { - DataSub sub; - if (!msg.route().empty() && sub.ParseFromString(msg.body()) && !sub.topics().empty()) { - assert(sizeof(MQId) == msg.route(0).mq_id().size()); - MQId client; - memcpy(&client, msg.route(0).mq_id().data(), sizeof(client)); - - std::lock_guard<std::mutex> guard(mutex_); - auto &topics = sub.topics(); - for (auto &topic : topics) { - try { - update(topic, client); - } catch (...) { - //TODO log error - } - } +bool SocketSubscribe::StartRecv(const TopicDataCB &tdcb, int nworker) +{ + auto AsyncRecvProc = [this, tdcb](BHMsg &msg) { + if (msg.type() == kMsgTypePublish) { + DataPub d; + if (d.ParseFromString(msg.body())) { + tdcb(d.topic(), d.data()); } - }; - - auto Sub1 = [this](const std::string &topic, const MQId &id) { - records_[topic].insert(id); - }; - - auto Unsub1 = [this](const std::string &topic, const MQId &id) { - auto pos = records_.find(topic); - if (pos != records_.end()) { - if (pos->second.erase(id) && pos->second.empty()) { - records_.erase(pos); - } - } - }; - - auto OnPublish = [&]() { - DataPub pub; - if (!pub.ParseFromString(msg.body())) { - return; - } - auto FindClients = [&](const std::string &topic) { - Clients dests; - std::lock_guard<std::mutex> guard(mutex_); - auto Find1 = [&](const std::string &t) { - auto pos = records_.find(topic); - if (pos != records_.end() && !pos->second.empty()) { - auto &clients = pos->second; - for (auto &cli : clients) { - dests.insert(cli); - } - } - }; - Find1(topic); - - //TODO check and adjust topic on client side sub/pub. - size_t pos = 0; - while (true) { - pos = topic.find(kTopicSep, pos); - if (pos == topic.npos || ++pos == topic.size()) { - // Find1(std::string()); // sub all. - break; - } else { - Find1(topic.substr(0, pos)); - } - } - return dests; - }; - - auto Dispatch = [&](auto &&send1) { - const Clients &clients(FindClients(pub.topic())); - for (auto &cli : clients) { - send1(cli); - } - }; - - if (imsg.IsCounted()) { - Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm_, cli, imsg, 100); }); - } else { - MsgI pubmsg; - if (!pubmsg.MakeRC(shm_, msg)) { return; } - DEFER1(pubmsg.Release(shm_)); - - Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm_, cli, pubmsg, 100); }); - } - }; - - switch (msg.type()) { - case kMsgTypeSubscribe: OnSubChange(Sub1); break; - case kMsgTypeUnsubscribe: OnSubChange(Unsub1); break; - case kMsgTypePublish: OnPublish(); break; - default: break; + } else { + // ignored, or dropped } }; - return socket_.StartRaw(onRecv, std::min((nworker > 0 ? nworker : 2), kMaxWorker)); + return tdcb && Start(AsyncRecvProc, nworker); } -} // namespace bhome_shm +bool SocketSubscribe::RecvSub(std::string &topic, std::string &data, const int timeout_ms) +{ + BHMsg msg; + if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypePublish) { + DataPub d; + if (d.ParseFromString(msg.body())) { + d.mutable_topic()->swap(topic); + d.mutable_data()->swap(data); + return true; + } + } + return false; +} \ No newline at end of file diff --git a/src/pubsub.h b/src/pubsub.h index be6521f..cad9f61 100644 --- a/src/pubsub.h +++ b/src/pubsub.h @@ -18,31 +18,43 @@ #ifndef PUBSUB_4KGRA997 #define PUBSUB_4KGRA997 +#include "defs.h" #include "socket.h" -#include <mutex> -#include <set> -#include <unordered_map> +#include <string> -namespace bhome_shm +class SocketPublish { - -// publish/subcribe manager. -class BusManager -{ - SharedMemory &shm_; - ShmSocket socket_; - std::mutex mutex_; - typedef std::set<MQId> Clients; - std::unordered_map<std::string, Clients> records_; + typedef ShmSocket Socket; + Socket::Shm &shm_; + Socket::Shm &shm() { return shm_; } public: - BusManager(SharedMemory &shm); - BusManager(); - ~BusManager() { Stop(); } - bool Start(const int nworker = 2); - bool Stop() { return socket_.Stop(); } + SocketPublish(Socket::Shm &shm) : + shm_(shm) {} + SocketPublish() : + SocketPublish(BHomeShm()) {} + bool Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms); + bool Publish(const std::string &topic, const std::string &data, const int timeout_ms) + { + return Publish(topic, data.data(), data.size(), timeout_ms); + } }; -} // namespace bhome_shm +// socket subscribe +class SocketSubscribe : private ShmSocket +{ + typedef ShmSocket Socket; + +public: + SocketSubscribe(Socket::Shm &shm) : + Socket(shm, 64) {} + SocketSubscribe() : + SocketSubscribe(BHomeShm()) {} + + typedef std::function<void(const std::string &topic, const std::string &data)> TopicDataCB; + bool StartRecv(const TopicDataCB &tdcb, int nworker = 2); + bool Subscribe(const std::vector<std::string> &topics, const int timeout_ms); + bool RecvSub(std::string &topic, std::string &data, const int timeout_ms); +}; #endif // end of include guard: PUBSUB_4KGRA997 diff --git a/src/pubsub_center.cpp b/src/pubsub_center.cpp new file mode 100644 index 0000000..33c16be --- /dev/null +++ b/src/pubsub_center.cpp @@ -0,0 +1,134 @@ +/* + * ===================================================================================== + * + * Filename: pubsub_center.cpp + * + * Description: pub/sub center/manager + * + * Version: 1.0 + * Created: 2021骞�04鏈�01鏃� 09鏃�29鍒�04绉� + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), + * Organization: + * + * ===================================================================================== + */ +#include "pubsub_center.h" +#include "bh_util.h" + +PubSubCenter::PubSubCenter(SharedMemory &shm) : + socket_(shm) {} + +bool PubSubCenter::Start(const int nworker) +{ + auto onRecv = [&](MsgI &imsg) { +#ifndef NDEBUG + static std::atomic<time_t> last(0); + time_t now = 0; + time(&now); + if (last.exchange(now) < now) { + printf("bus queue size: %ld\n", socket_.Pending()); + } +#endif + + BHMsg msg; + if (!imsg.Unpack(msg)) { + return; + } + + auto OnSubChange = [&](auto &&update) { + DataSub sub; + if (!msg.route().empty() && sub.ParseFromString(msg.body()) && !sub.topics().empty()) { + assert(sizeof(MQId) == msg.route(0).mq_id().size()); + MQId client; + memcpy(&client, msg.route(0).mq_id().data(), sizeof(client)); + + std::lock_guard<std::mutex> guard(mutex_); + auto &topics = sub.topics(); + for (auto &topic : topics) { + try { + update(topic, client); + } catch (...) { + //TODO log error + } + } + } + }; + + auto Sub1 = [this](const std::string &topic, const MQId &id) { + records_[topic].insert(id); + }; + + auto Unsub1 = [this](const std::string &topic, const MQId &id) { + auto pos = records_.find(topic); + if (pos != records_.end()) { + if (pos->second.erase(id) && pos->second.empty()) { + records_.erase(pos); + } + } + }; + + auto OnPublish = [&]() { + DataPub pub; + if (!pub.ParseFromString(msg.body())) { + return; + } + auto FindClients = [&](const std::string &topic) { + Clients dests; + std::lock_guard<std::mutex> guard(mutex_); + auto Find1 = [&](const std::string &t) { + auto pos = records_.find(topic); + if (pos != records_.end() && !pos->second.empty()) { + auto &clients = pos->second; + for (auto &cli : clients) { + dests.insert(cli); + } + } + }; + Find1(topic); + + //TODO check and adjust topic on client side sub/pub. + size_t pos = 0; + while (true) { + pos = topic.find(kTopicSep, pos); + if (pos == topic.npos || ++pos == topic.size()) { + // Find1(std::string()); // sub all. + break; + } else { + Find1(topic.substr(0, pos)); + } + } + return dests; + }; + + auto Dispatch = [&](auto &&send1) { + const Clients &clients(FindClients(pub.topic())); + for (auto &cli : clients) { + send1(cli); + } + }; + + if (imsg.IsCounted()) { + Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, imsg, 100); }); + } else { + MsgI pubmsg; + if (!pubmsg.MakeRC(shm(), msg)) { return; } + DEFER1(pubmsg.Release(shm())); + + Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, pubmsg, 100); }); + } + }; + + switch (msg.type()) { + case kMsgTypeSubscribe: OnSubChange(Sub1); break; + case kMsgTypeUnsubscribe: OnSubChange(Unsub1); break; + case kMsgTypePublish: OnPublish(); break; + default: break; + } + }; + + const int kMaxWorker = 16; + return socket_.StartRaw(onRecv, std::min((nworker > 0 ? nworker : 2), kMaxWorker)); +} \ No newline at end of file diff --git a/src/pubsub_center.h b/src/pubsub_center.h new file mode 100644 index 0000000..866216e --- /dev/null +++ b/src/pubsub_center.h @@ -0,0 +1,53 @@ +/* + * ===================================================================================== + * + * Filename: pubsub_center.h + * + * Description: + * + * Version: 1.0 + * Created: 2021骞�04鏈�01鏃� 09鏃�29鍒�39绉� + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), + * Organization: + * + * ===================================================================================== + */ +#ifndef PUBSUB_CENTER_MFSUZJU7 +#define PUBSUB_CENTER_MFSUZJU7 + +#include "defs.h" +#include "socket.h" +#include <mutex> +#include <set> +#include <unordered_map> +using namespace bhome_shm; + +// publish/subcribe manager. +class PubSubCenter +{ + class SocketBus : public ShmSocket + { + public: + SocketBus(SharedMemory &shm) : + ShmSocket(shm, &kBHBusQueueId, 1000) {} + using ShmSocket::shm; + }; + SocketBus socket_; + std::mutex mutex_; + typedef std::set<MQId> Clients; + std::unordered_map<std::string, Clients> records_; + ShmSocket::Shm &shm() { return socket_.shm(); } + +public: + PubSubCenter(SharedMemory &shm); + PubSubCenter() : + PubSubCenter(BHomeShm()) {} + ~PubSubCenter() { Stop(); } + bool Start(const int nworker = 2); + bool Stop() { return socket_.Stop(); } +}; + +#endif // end of include guard: PUBSUB_CENTER_MFSUZJU7 diff --git a/src/reqrep.cpp b/src/reqrep.cpp new file mode 100644 index 0000000..e1636fd --- /dev/null +++ b/src/reqrep.cpp @@ -0,0 +1,164 @@ +/* + * ===================================================================================== + * + * Filename: reqrep.cpp + * + * Description: topic request/reply sockets + * + * Version: 1.0 + * Created: 2021骞�04鏈�01鏃� 09鏃�35鍒�35绉� + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), + * Organization: + * + * ===================================================================================== + */ +#include "reqrep.h" +#include "msg.h" +#include <chrono> +#include <condition_variable> + +using namespace bhome_msg; + +bool SocketRequest::StartWorker(const RequestResultCB &rrcb, int nworker) +{ + auto AsyncRecvProc = [this, rrcb](BHMsg &msg) { + auto Find = [&](RecvCB &cb) { + std::lock_guard<std::mutex> lock(mutex()); + const std::string &msgid = msg.msg_id(); + auto pos = async_cbs_.find(msgid); + if (pos != async_cbs_.end()) { + cb.swap(pos->second); + async_cbs_.erase(pos); + return true; + } else { + return false; + } + }; + + RecvCB cb; + if (Find(cb) && cb) { + cb(msg); + } else if (rrcb && msg.type() == kMsgTypeReply) { + DataReply reply; + if (reply.ParseFromString(msg.body())) { + rrcb(reply.data()); + } + } else { + // ignored, or dropped + } + }; + + return Start(AsyncRecvProc, nworker); +} + +bool SocketRequest::AsyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb) +{ + auto Call = [&](const void *remote) { + const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size)); + auto onRecv = [cb](BHMsg &msg) { + if (msg.type() == kMsgTypeReply) { + DataReply reply; + if (reply.ParseFromString(msg.body())) { + cb(reply.data()); + } + } + }; + return AsyncSend(remote, &msg, timeout_ms, onRecv); + }; + + try { + BHAddress addr; + if (QueryRPCTopic(topic, addr, timeout_ms)) { + return Call(addr.mq_id().data()); + } + } catch (...) { + return false; + } +} + +bool SocketRequest::SyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, std::string &out) +{ + try { + BHAddress addr; + if (QueryRPCTopic(topic, addr, timeout_ms)) { + const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size)); + BHMsg reply; + if (SyncSendAndRecv(addr.mq_id().data(), &msg, &reply, timeout_ms) && reply.type() == kMsgTypeReply) { + DataReply dr; + if (dr.ParseFromString(msg.body())) { + dr.mutable_data()->swap(out); + return true; + } + } + } + } catch (...) { + return false; + } +} + +bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms, const RecvCB &cb) +{ + assert(remote && pmsg); + try { + const BHMsg &msg = *static_cast<const BHMsg *>(pmsg); + auto RegisterCB = [&]() { + std::lock_guard<std::mutex> lock(mutex()); + async_cbs_.emplace(msg.msg_id(), cb); + }; + + return mq().Send(*static_cast<const MQId *>(remote), msg, timeout_ms, RegisterCB); + } catch (...) { + return false; + } +} + +bool SocketRequest::SyncSendAndRecv(const void *remote, const void *msg, void *result, const int timeout_ms) +{ + struct State { + std::mutex mutex; + std::condition_variable cv; + bool canceled = false; + }; + + try { + std::shared_ptr<State> st(new State); + auto endtime = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms); + + auto OnRecv = [=](BHMsg &msg) { + std::unique_lock<std::mutex> lk(st->mutex); + if (!st->canceled) { + static_cast<BHMsg *>(result)->Swap(&msg); + st->cv.notify_one(); + } // else result is no longer valid. + }; + + std::unique_lock<std::mutex> lk(st->mutex); + if (AsyncSend(remote, msg, timeout_ms, OnRecv) && st->cv.wait_until(lk, endtime) == std::cv_status::no_timeout) { + return true; + } else { + st->canceled = true; + return false; + } + } catch (...) { + return false; + } +} + +bool SocketRequest::QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms) +{ + BHMsg result; + const BHMsg &msg = MakeQueryTopic(topic); + if (SyncSendAndRecv(&kBHTopicRPCId, &msg, &result, timeout_ms)) { + if (result.type() == kMsgTypeQueryTopicReply) { + DataQueryTopicReply reply; + if (reply.ParseFromString(result.body())) { + addr = reply.address(); + return !addr.mq_id().empty(); + } + } + } + return false; +} diff --git a/src/reqrep.h b/src/reqrep.h new file mode 100644 index 0000000..02cc86f --- /dev/null +++ b/src/reqrep.h @@ -0,0 +1,57 @@ +/* + * ===================================================================================== + * + * Filename: reqrep.h + * + * Description: topic request/reply sockets + * + * Version: 1.0 + * Created: 2021骞�04鏈�01鏃� 09鏃�36鍒�06绉� + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), + * Organization: + * + * ===================================================================================== + */ +#ifndef REQREP_ACEH09NK +#define REQREP_ACEH09NK + +#include "defs.h" +#include "socket.h" +#include <functional> +#include <unordered_map> + +class SocketRequest : private ShmSocket +{ + typedef ShmSocket Socket; + +public: + SocketRequest(Socket::Shm &shm) : + Socket(shm, 64) { StartWorker(); } + SocketRequest() : + SocketRequest(BHomeShm()) {} + + typedef std::function<void(const std::string &data)> RequestResultCB; + bool StartWorker(const RequestResultCB &rrcb, int nworker = 2); + bool StartWorker(int nworker = 2) { return StartWorker(RequestResultCB(), nworker); } + bool AsyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb); + bool AsyncRequest(const std::string &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb) + { + return AsyncRequest(topic, data.data(), data.size(), timeout_ms, rrcb); + } + bool SyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, std::string &out); + bool SyncRequest(const std::string &topic, const std::string &data, const int timeout_ms, std::string &out) + { + return SyncRequest(topic, data.data(), data.size(), timeout_ms, out); + } + +private: + bool AsyncSend(const void *remote, const void *msg, const int timeout_ms, const RecvCB &cb); + bool SyncSendAndRecv(const void *remote, const void *msg, void *result, const int timeout_ms); + bool QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms); + std::unordered_map<std::string, RecvCB> async_cbs_; +}; + +#endif // end of include guard: REQREP_ACEH09NK diff --git a/src/socket.cpp b/src/socket.cpp index 13f1e38..4c2fc6b 100644 --- a/src/socket.cpp +++ b/src/socket.cpp @@ -20,8 +20,6 @@ #include "bh_util.h" #include "defs.h" #include "msg.h" -#include <chrono> -#include <condition_variable> using namespace bhome_msg; using namespace bhome_shm; @@ -31,78 +29,33 @@ } // namespace -//TODO maybe change to base class, each type is a sub class. - -ShmSocket::ShmSocket(Type type, bhome_shm::SharedMemory &shm) : - shm_(shm), type_(type), run_(false) +ShmSocket::ShmSocket(Shm &shm, const void *id, const int len) : + shm_(shm), run_(false) { - switch (type) { - case eSockBus: mq_.reset(new Queue(kBHBusQueueId, shm_, 1000)); break; - case eSockRequest: mq_.reset(new Queue(shm_, 12)); break; - case eSockReply: mq_.reset(new Queue(shm_, 64)); break; - case eSockSubscribe: mq_.reset(new Queue(shm_, 64)); break; - case eSockPublish: break; // no recv mq needed - default: break; + if (id && len > 0) { + mq_.reset(new Queue(*static_cast<const MQId *>(id), shm, len)); } } - -ShmSocket::ShmSocket(Type type) : - ShmSocket(type, BHomeShm()) {} +ShmSocket::ShmSocket(bhome_shm::SharedMemory &shm, const int len) : + shm_(shm), run_(false) +{ + if (len > 0) { + mq_.reset(new Queue(shm_, len)); + } +} ShmSocket::~ShmSocket() { Stop(); } -bool ShmSocket::Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms) -{ - if (type_ != eSockPublish) { - return false; - } - assert(!mq_); - try { - MsgI imsg; - if (!imsg.MakeRC(shm_, MakePub(topic, data, size))) { - return false; - } - DEFER1(imsg.Release(shm_)); - return Queue::Send(shm_, kBHBusQueueId, imsg, timeout_ms); - } catch (...) { - return false; - } -} - -bool ShmSocket::Subscribe(const std::vector<std::string> &topics, const int timeout_ms) -{ - if (type_ != eSockSubscribe) { - return false; - } - assert(mq_); - try { - return mq_->Send(kBHBusQueueId, MakeSub(mq_->Id(), topics), timeout_ms); - } catch (...) { - return false; - } -} - bool ShmSocket::StartRaw(const RecvRawCB &onData, int nworker) { - auto CanRecv = [this]() { - switch (type_) { - case eSockRequest: - case eSockReply: - case eSockBus: - case eSockSubscribe: - return true; - default: - return false; - } - }; - if (!CanRecv()) { + if (!mq_) { return false; } - std::lock_guard<std::mutex> lock(mutex_); + std::lock_guard<std::mutex> lock(mutex_); StopNoLock(); auto RecvProc = [this, onData]() { while (run_) { @@ -127,31 +80,6 @@ return StartRaw([this, onData](MsgI &imsg) { BHMsg m; if (imsg.Unpack(m)) { onData(m); } }, nworker); } -bool ShmSocket::StartAsync(int nworker) -{ - auto AsyncRecvProc = [this](BHMsg &msg) { - auto Find = [&](RecvCB &cb) { - std::lock_guard<std::mutex> lock(mutex_); - const std::string &msgid = msg.msg_id(); - auto pos = async_cbs_.find(msgid); - if (pos != async_cbs_.end()) { - cb.swap(pos->second); - async_cbs_.erase(pos); - return true; - } else { - return false; - } - }; - - RecvCB cb; - if (Find(cb) && cb) { - cb(msg); - } - }; - - return Start(AsyncRecvProc, nworker); -} - bool ShmSocket::Stop() { std::lock_guard<std::mutex> lock(mutex_); @@ -166,118 +94,28 @@ w.join(); } } + workers_.clear(); return true; } return false; } -bool ShmSocket::AsyncRequest(const void *remote, const void *pmsg, const int timeout_ms, const RecvCB &cb) +bool ShmSocket::SyncSend(const void *id, const bhome_msg::BHMsg &msg, const int timeout_ms) { - if (type_ != eSockRequest) { + std::lock_guard<std::mutex> lock(mutex_); + if (!mq_ || RunningNoLock()) { return false; - } - assert(remote && pmsg && !mq_); - try { - const BHMsg &msg = *static_cast<const BHMsg *>(pmsg); - auto RegisterCB = [&]() { - std::lock_guard<std::mutex> lock(mutex_); - async_cbs_.emplace(msg.msg_id(), cb); - }; - - return mq_->Send(*static_cast<const MQId *>(remote), msg, timeout_ms, RegisterCB); - } catch (...) { - return false; + } else { + return mq_->Send(*static_cast<const MQId *>(id), msg, timeout_ms); } } -bool ShmSocket::SyncRequest(const void *remote, const void *msg, void *result, const int timeout_ms) +bool ShmSocket::SyncRecv(bhome_msg::BHMsg &msg, const int timeout_ms) { - struct State { - std::mutex mutex; - std::condition_variable cv; - bool canceled = false; - }; - - try { - std::shared_ptr<State> st(new State); - auto OnRecv = [=](BHMsg &msg) { - std::unique_lock<std::mutex> lk(st->mutex); - if (!st->canceled) { - static_cast<BHMsg *>(result)->Swap(&msg); - st->cv.notify_one(); - } - }; - - std::unique_lock<std::mutex> lk(st->mutex); - auto end = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms); - if (AsyncRequest(remote, msg, timeout_ms, OnRecv) && st->cv.wait_until(lk, end) == std::cv_status::no_timeout) { - return true; - } else { - st->canceled = true; - return false; - } - } catch (...) { + std::lock_guard<std::mutex> lock(mutex_); + if (!mq_ || RunningNoLock()) { return false; - } -} - -bool ShmSocket::QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms) -{ - BHMsg result; - const BHMsg &msg = MakeQueryTopic(topic); - if (SyncRequest(&kBHTopicRPCId, &msg, &result, timeout_ms)) { - if (result.type() == kMsgTypeQueryTopicReply) { - DataQueryTopicReply reply; - if (reply.ParseFromString(result.body())) { - addr = reply.address(); - return !addr.mq_id().empty(); - } - } - } - return false; -} - -bool ShmSocket::RequestRPC(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb) -{ - auto Call = [&](const void *remote) { - const BHMsg &msg(MakeRequest(mq_->Id(), topic, data, size)); - auto onRecv = [cb](BHMsg &msg) { - if (msg.type() == kMsgTypeReply) { - DataReply reply; - if (reply.ParseFromString(msg.body())) { - cb(reply.data().data(), reply.data().size()); - } - } - }; - return AsyncRequest(remote, &msg, timeout_ms, onRecv); - }; - - try { - BHAddress addr; - if (QueryRPCTopic(topic, addr, timeout_ms)) { - return Call(addr.mq_id().data()); - } - } catch (...) { - return false; - } -} - -bool ShmSocket::RequestRPC(const std::string &topic, const void *data, const size_t size, const int timeout_ms, std::string &out) -{ - try { - BHAddress addr; - if (QueryRPCTopic(topic, addr, timeout_ms)) { - const BHMsg &msg(MakeRequest(mq_->Id(), topic, data, size)); - BHMsg reply; - if (SyncRequest(addr.mq_id().data(), &msg, &reply, timeout_ms) && reply.type() == kMsgTypeReply) { - DataReply dr; - if (dr.ParseFromString(msg.body())) { - dr.mutable_data()->swap(out); - return true; - } - } - } - } catch (...) { - return false; + } else { + return mq_->Recv(msg, timeout_ms); } } diff --git a/src/socket.h b/src/socket.h index eee5b5b..20da7c0 100644 --- a/src/socket.h +++ b/src/socket.h @@ -26,59 +26,48 @@ #include <memory> #include <mutex> #include <thread> -#include <unordered_map> #include <vector> class ShmSocket : private boost::noncopyable { +protected: typedef bhome_shm::ShmMsgQueue Queue; public: - enum Type { - eSockRequest, - eSockReply, - eSockSubscribe, - eSockPublish, - eSockBus, - }; + typedef bhome_shm::SharedMemory Shm; typedef std::function<void(bhome_msg::BHMsg &msg)> RecvCB; typedef std::function<void(bhome_msg::MsgI &imsg)> RecvRawCB; - typedef std::function<void(const void *data, const size_t size)> RequestResultCB; - ShmSocket(Type type, bhome_shm::SharedMemory &shm); - ShmSocket(Type type); + ShmSocket(Shm &shm, const int len = 12); ~ShmSocket(); - bool RequestRPC(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb); - bool RequestRPC(const std::string &topic, const void *data, const size_t size, const int timeout_ms, std::string &out); - - // bool HandleRequest(onData); - bool ReadRequest(); // exclude with HandleRequest - bool SendReply(); // exclude with HandleRequest - - bool Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms); - bool Subscribe(const std::vector<std::string> &topics, const int timeout_ms); - bool RecvSub(std::string &topic, std::string &data, const int timeout_ms); // start recv. bool Start(const RecvCB &onData, int nworker = 1); bool StartRaw(const RecvRawCB &onData, int nworker = 1); - bool StartAsync(int nworker = 2); bool Stop(); size_t Pending() const { return mq_ ? mq_->Pending() : 0; } +protected: + ShmSocket(Shm &shm, const void *id, const int len); + Shm &shm() { return shm_; } + const Shm &shm() const { return shm_; } + Queue &mq() { return *mq_; } // programmer should make sure that mq_ is valid. + const Queue &mq() const { return *mq_; } + std::mutex &mutex() { return mutex_; } + + bool SyncSend(const void *id, const bhome_msg::BHMsg &msg, const int timeout_ms); + bool SyncRecv(bhome_msg::BHMsg &msg, const int timeout_ms); + private: - bool AsyncRequest(const void *remote, const void *msg, const int timeout_ms, const RecvCB &cb); - bool SyncRequest(const void *remote, const void *msg, void *result, const int timeout_ms); - bool QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms); bool StopNoLock(); - bhome_shm::SharedMemory &shm_; - const Type type_; + bool RunningNoLock() { return !workers_.empty(); } + + Shm &shm_; std::vector<std::thread> workers_; std::mutex mutex_; std::atomic<bool> run_; std::unique_ptr<Queue> mq_; - std::unordered_map<std::string, RecvCB> async_cbs_; }; #endif // end of include guard: SOCKET_GWTJHBPO diff --git a/utest/utest.cpp b/utest/utest.cpp index fbe4d51..b95e646 100644 --- a/utest/utest.cpp +++ b/utest/utest.cpp @@ -1,5 +1,6 @@ #include "defs.h" #include "pubsub.h" +#include "pubsub_center.h" #include "socket.h" #include "util.h" #include <atomic> @@ -66,7 +67,7 @@ BOOST_AUTO_TEST_CASE(PubSubTest) { const std::string shm_name("ShmPubSub"); - // ShmRemover auto_remove(shm_name); //remove twice? in case of killed? + ShmRemover auto_remove(shm_name); //remove twice? in case of killed? SharedMemory shm(shm_name, 1024 * 1024 * 50); DEFER1(shm.Remove()); auto Avail = [&]() { return shm.get_free_memory(); }; @@ -75,57 +76,50 @@ printf("flag = %d\n", *flag); ++*flag; - BusManager bus(shm); + PubSubCenter bus(shm); bus.Start(); + std::this_thread::sleep_for(100ms); - std::atomic<uint64_t> count(0); + std::atomic<uint64_t> total_count(0); std::atomic<ptime> last_time(Now() - seconds(1)); std::atomic<uint64_t> last_count(0); const uint64_t nmsg = 100 * 2; const int timeout = 1000; auto Sub = [&](int id, const std::vector<std::string> &topics) { - ShmSocket client(ShmSocket::eSockSubscribe, shm); + SocketSubscribe client(shm); bool r = client.Subscribe(topics, timeout); std::mutex mutex; std::condition_variable cv; - uint64_t i = 0; - auto OnRecv = [&](BHMsg &msg) { - if (msg.type() != kMsgTypePublish) { - BOOST_CHECK(false); - } - DataPub pub; - if (!pub.ParseFromString(msg.body())) { - BOOST_CHECK(false); - } - ++count; + std::atomic<uint64_t> n(0); + auto OnTopicData = [&](const std::string &topic, const std::string &data) { + ++total_count; auto cur = Now(); if (last_time.exchange(cur) < cur) { std::cout << "time: " << cur; printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n", - count.load(), count - last_count.exchange(count), init_avail - Avail()); + total_count.load(), total_count - last_count.exchange(total_count), init_avail - Avail()); } - if (++i >= nmsg * topics.size()) { + if (++n >= nmsg * topics.size()) { cv.notify_one(); } // printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str()); }; - client.Start(OnRecv); + client.StartRecv(OnTopicData, 1); std::unique_lock<std::mutex> lk(mutex); cv.wait(lk); }; auto Pub = [&](const std::string &topic) { - ShmSocket provider(ShmSocket::eSockPublish, shm); + SocketPublish provider(shm); for (unsigned i = 0; i < nmsg; ++i) { std::string data = topic + std::to_string(i) + std::string(1000, '-'); - bool r = provider.Publish(topic, data.data(), data.size(), timeout); - // bool r = provider.Send(kBHBusQueueId, MakePub(topic, data.data(), data.size()), timeout); + bool r = provider.Publish(topic, data, timeout); if (!r) { printf("pub ret: %s\n", r ? "ok" : "fail"); } @@ -151,7 +145,7 @@ threads.WaitAll(); std::cout << "end : " << Now(); printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n", - count.load(), count - last_count.exchange(count), init_avail - Avail()); + total_count.load(), total_count - last_count.exchange(total_count), init_avail - Avail()); bus.Stop(); } -- Gitblit v1.8.0