From 83085f2ce99cca05d40a19482151873a55e6393a Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 02 四月 2021 19:32:21 +0800 Subject: [PATCH] refactor center; add async request no cb. --- src/socket.h | 13 +- src/pubsub_center.h | 15 +-- src/reqrep_center.cpp | 54 +++++++--- src/reqrep.h | 11 + src/reqrep_center.h | 14 -- src/socket.cpp | 14 +- src/center.h | 16 +++ src/pubsub_center.cpp | 37 +++--- utest/utest.cpp | 21 +++ src/reqrep.cpp | 34 +++++- src/center.cpp | 13 ++ 11 files changed, 165 insertions(+), 77 deletions(-) diff --git a/src/center.cpp b/src/center.cpp index db000c4..d6570aa 100644 --- a/src/center.cpp +++ b/src/center.cpp @@ -17,6 +17,8 @@ */ #include "center.h" #include "defs.h" +#include "pubsub_center.h" +#include "reqrep_center.h" #include "shm.h" using namespace bhome_shm; @@ -26,3 +28,14 @@ static SharedMemory shm("bhome_default_shm_v0", 1024 * 1024 * 64); return shm; } + +BHCenter::BHCenter(Socket::Shm &shm) : + socket_(shm) {} + +BHCenter::BHCenter() : + BHCenter(BHomeShm()) {} + +bool BHCenter::Start() +{ + return false; +} \ No newline at end of file diff --git a/src/center.h b/src/center.h index 153cc3e..f0a177c 100644 --- a/src/center.h +++ b/src/center.h @@ -18,8 +18,24 @@ #ifndef CENTER_TM9OUQTG #define CENTER_TM9OUQTG +#include "socket.h" +#include <functional> + class BHCenter { + typedef ShmSocket Socket; + +public: + typedef std::function<bool(ShmSocket &socket, bhome_msg::MsgI &imsg, bhome::msg::BHMsg &msg)> MsgHandler; + + BHCenter(Socket::Shm &shm); + BHCenter(); + ~BHCenter() { Stop(); } + bool Start(); + bool Stop() { return socket_.Stop(); } + +private: + ShmSocket socket_; }; #endif // end of include guard: CENTER_TM9OUQTG diff --git a/src/pubsub_center.cpp b/src/pubsub_center.cpp index 3ba5382..b3af47d 100644 --- a/src/pubsub_center.cpp +++ b/src/pubsub_center.cpp @@ -77,25 +77,21 @@ } // namespace -bool PubSubCenter::Start(const int nworker) +BHCenter::MsgHandler MakeBusCenter() { auto bus_ptr = std::make_shared<Synced<BusCenter>>(); - auto onRecv = [bus_ptr, this](MsgI &imsg) { + return [bus_ptr](ShmSocket &socket, MsgI &imsg, BHMsg &msg) { #ifndef NDEBUG static std::atomic<time_t> last(0); time_t now = 0; time(&now); if (last.exchange(now) < now) { - printf("bus queue size: %ld\n", socket_.Pending()); + printf("bus queue size: %ld\n", socket.Pending()); } #endif auto &bus = *bus_ptr; - - BHMsg msg; - if (!imsg.Unpack(msg)) { - return; - } + auto &shm = socket.shm(); auto OnSubChange = [&](auto &&update) { DataSub sub; @@ -106,7 +102,6 @@ update(client, sub.topics()); } }; - auto Sub = [&](const MQId &id, auto &topics) { bus->SubScribe(id, topics.begin(), topics.end()); }; auto Unsub = [&](const MQId &id, auto &topics) { bus->UnsubScribe(id, topics.begin(), topics.end()); }; @@ -123,24 +118,30 @@ }; if (imsg.IsCounted()) { - Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, imsg, 10); }); + Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm, cli, imsg, 10); }); } else { MsgI pubmsg; - if (!pubmsg.MakeRC(shm(), msg)) { return; } - DEFER1(pubmsg.Release(shm())); + if (!pubmsg.MakeRC(shm, msg)) { return; } + DEFER1(pubmsg.Release(shm)); - Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, pubmsg, 10); }); + Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm, cli, pubmsg, 10); }); } }; switch (msg.type()) { - case kMsgTypeSubscribe: OnSubChange(Sub); break; - case kMsgTypeUnsubscribe: OnSubChange(Unsub); break; - case kMsgTypePublish: OnPublish(); break; - default: break; + case kMsgTypeSubscribe: OnSubChange(Sub); return true; + case kMsgTypeUnsubscribe: OnSubChange(Unsub); return true; + case kMsgTypePublish: OnPublish(); return true; + default: return false; } }; +} + +bool PubSubCenter::Start(const int nworker) +{ + auto handler = MakeBusCenter(); + printf("sizeof(pub/sub handler) = %ld\n", sizeof(handler)); const int kMaxWorker = 16; - return socket_.StartRaw(onRecv, std::min((nworker > 0 ? nworker : 2), kMaxWorker)); + return socket_.Start(handler, std::min((nworker > 0 ? nworker : 2), kMaxWorker)); } \ No newline at end of file diff --git a/src/pubsub_center.h b/src/pubsub_center.h index af3a2f4..e79dd96 100644 --- a/src/pubsub_center.h +++ b/src/pubsub_center.h @@ -18,28 +18,23 @@ #ifndef PUBSUB_CENTER_MFSUZJU7 #define PUBSUB_CENTER_MFSUZJU7 +#include "center.h" #include "defs.h" #include "socket.h" #include <mutex> #include <set> #include <unordered_map> +BHCenter::MsgHandler MakeBusCenter(); + // publish/subcribe manager. class PubSubCenter { - class SocketBus : public ShmSocket - { - public: - SocketBus(ShmSocket::Shm &shm) : - ShmSocket(shm, &kBHTopicBus, 1000) {} - using ShmSocket::shm; - }; - SocketBus socket_; - ShmSocket::Shm &shm() { return socket_.shm(); } + ShmSocket socket_; public: PubSubCenter(ShmSocket::Shm &shm) : - socket_(shm) {} + socket_(shm, &kBHTopicBus, 1000) {} PubSubCenter() : PubSubCenter(BHomeShm()) {} ~PubSubCenter() { Stop(); } diff --git a/src/reqrep.cpp b/src/reqrep.cpp index b8e423b..25c0826 100644 --- a/src/reqrep.cpp +++ b/src/reqrep.cpp @@ -26,7 +26,7 @@ bool SocketRequest::StartWorker(const RequestResultCB &rrcb, int nworker) { auto AsyncRecvProc = [this, rrcb](BHMsg &msg) { - auto Find = [&](RecvCB &cb) { + auto Find = [&](RecvBHMsgCB &cb) { std::lock_guard<std::mutex> lock(mutex()); const std::string &msgid = msg.msg_id(); auto pos = async_cbs_.find(msgid); @@ -39,10 +39,10 @@ } }; - RecvCB cb; - if (Find(cb) && cb) { + RecvBHMsgCB cb; + if (Find(cb)) { cb(msg); - } else if (rrcb && msg.type() == kMsgTypeReply) { + } else if (msg.type() == kMsgTypeReply) { DataReply reply; if (reply.ParseFromString(msg.body())) { rrcb(reply.data()); @@ -55,6 +55,20 @@ return Start(AsyncRecvProc, nworker); } +bool SocketRequest::AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms) +{ + try { + BHAddress addr; + if (QueryRPCTopic(topic, addr, timeout_ms)) { + const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size)); + return AsyncSend(addr.mq_id().data(), &msg, timeout_ms); + } else { + return false; + } + } catch (...) { + return false; + } +} bool SocketRequest::AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb) { auto Call = [&](const void *remote) { @@ -103,7 +117,17 @@ return false; } -bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms, const RecvCB &cb) +bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms) +{ + assert(remote && pmsg); + try { + const BHMsg &msg = *static_cast<const BHMsg *>(pmsg); + return mq().Send(*static_cast<const MQId *>(remote), msg, timeout_ms); + } catch (...) { + return false; + } +} +bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms, const RecvBHMsgCB &cb) { assert(remote && pmsg); try { diff --git a/src/reqrep.h b/src/reqrep.h index 9e43c7b..8a4743c 100644 --- a/src/reqrep.h +++ b/src/reqrep.h @@ -43,9 +43,15 @@ bool StartWorker(int nworker = 2) { return StartWorker(RequestResultCB(), nworker); } bool Stop() { return Socket::Stop(); } bool AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb); + bool AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms); + bool AsyncRequest(const Topic &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb) { return AsyncRequest(topic, data.data(), data.size(), timeout_ms, rrcb); + } + bool AsyncRequest(const Topic &topic, const std::string &data, const int timeout_ms) + { + return AsyncRequest(topic, data.data(), data.size(), timeout_ms); } bool SyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms); bool SyncRequest(const Topic &topic, const std::string &data, std::string &out, const int timeout_ms) @@ -54,10 +60,11 @@ } private: - bool AsyncSend(const void *remote, const void *msg, const int timeout_ms, const RecvCB &cb); + bool AsyncSend(const void *remote, const void *msg, const int timeout_ms, const RecvBHMsgCB &cb); + bool AsyncSend(const void *remote, const void *msg, const int timeout_ms); bool SyncSendAndRecv(const void *remote, const void *msg, void *result, const int timeout_ms); bool QueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms); - std::unordered_map<std::string, RecvCB> async_cbs_; + std::unordered_map<std::string, RecvBHMsgCB> async_cbs_; typedef bhome_msg::BHAddress Address; class TopicCache diff --git a/src/reqrep_center.cpp b/src/reqrep_center.cpp index 2356ebc..e52b0fd 100644 --- a/src/reqrep_center.cpp +++ b/src/reqrep_center.cpp @@ -99,63 +99,81 @@ std::unordered_map<Topic, WeakNode> topic_map_; std::unordered_map<ProcId, Node> nodes_; }; + +Synced<NodeCenter> &Center() +{ + static Synced<NodeCenter> s; + return s; +} + } // namespace -bool ReqRepCenter::Start(const int nworker) +BHCenter::MsgHandler MakeReqRepCenter() { auto center_ptr = std::make_shared<Synced<NodeCenter>>(); - auto onRecv = [center_ptr, this](BHMsg &msg) { + return [center_ptr](ShmSocket &socket, MsgI &imsg, BHMsg &msg) { auto ¢er = *center_ptr; + auto &shm = socket.shm(); #ifndef NDEBUG static std::atomic<time_t> last(0); time_t now = 0; time(&now); if (last.exchange(now) < now) { - printf("bus queue size: %ld\n", socket_.Pending()); + printf("bus queue size: %ld\n", socket.Pending()); } #endif - if (msg.route_size() == 0) { - return; - } - auto &src_mq = msg.route(0).mq_id(); + auto SrcMQ = [&]() { return msg.route(0).mq_id(); }; auto OnRegister = [&]() { + if (msg.route_size() != 1) { return; } + DataProcRegister reg; if (reg.ParseFromString(msg.body()) && reg.has_proc()) { - center->Register(*reg.mutable_proc(), src_mq, reg.topics().begin(), reg.topics().end()); + center->Register(*reg.mutable_proc(), SrcMQ(), reg.topics().begin(), reg.topics().end()); } }; auto OnHeartbeat = [&]() { + if (msg.route_size() != 1) { return; } + auto &src_mq = msg.route(0).mq_id(); + DataProcHeartbeat hb; if (hb.ParseFromString(msg.body()) && hb.has_proc()) { - center->Heartbeat(*hb.mutable_proc(), src_mq); + center->Heartbeat(*hb.mutable_proc(), SrcMQ()); } }; auto OnQueryTopic = [&]() { + if (msg.route_size() != 1) { return; } + DataProcQueryTopic query; NodeCenter::ProcAddr dest; if (query.ParseFromString(msg.body()) && center->QueryTopic(query.topic(), dest)) { MQId remote; - memcpy(&remote, msg.route().rbegin()->mq_id().data(), sizeof(remote)); + memcpy(&remote, SrcMQ().data(), sizeof(MQId)); MsgI imsg; - if (!imsg.Make(shm(), MakeQueryTopicReply(dest, msg.msg_id()))) { return; } - if (!ShmMsgQueue::Send(shm(), remote, imsg, 100)) { - imsg.Release(shm()); + if (!imsg.Make(shm, MakeQueryTopicReply(dest, msg.msg_id()))) { return; } + if (!ShmMsgQueue::Send(shm, remote, imsg, 100)) { + imsg.Release(shm); } } }; switch (msg.type()) { - case kMsgTypeProcRegisterTopics: OnRegister(); break; - case kMsgTypeProcHeartbeat: OnHeartbeat(); break; - case kMsgTypeProcQueryTopic: OnQueryTopic(); break; - default: break; + case kMsgTypeProcRegisterTopics: OnRegister(); return true; + case kMsgTypeProcHeartbeat: OnHeartbeat(); return true; + case kMsgTypeProcQueryTopic: OnQueryTopic(); return true; + default: return false; } }; +} + +bool ReqRepCenter::Start(const int nworker) +{ + auto handler = MakeReqRepCenter(); + printf("sizeof(rep/req handler) = %ld\n", sizeof(handler)); const int kMaxWorker = 16; - return socket_.Start(onRecv, std::min((nworker > 0 ? nworker : 2), kMaxWorker)); + return socket_.Start(handler, std::min((nworker > 0 ? nworker : 2), kMaxWorker)); } \ No newline at end of file diff --git a/src/reqrep_center.h b/src/reqrep_center.h index 6473841..326ac7a 100644 --- a/src/reqrep_center.h +++ b/src/reqrep_center.h @@ -18,24 +18,18 @@ #ifndef REQREP_CENTER_US3RBM60 #define REQREP_CENTER_US3RBM60 +#include "center.h" #include "defs.h" #include "socket.h" +BHCenter::MsgHandler MakeReqRepCenter(); class ReqRepCenter { - class Socket : public ShmSocket - { - public: - Socket(ShmSocket::Shm &shm) : - ShmSocket(shm, &kBHTopicReqRepCenter, 1000) {} - using ShmSocket::shm; - }; - Socket socket_; - ShmSocket::Shm &shm() { return socket_.shm(); } + ShmSocket socket_; public: ReqRepCenter(ShmSocket::Shm &shm) : - socket_(shm) {} + socket_(shm, &kBHTopicReqRepCenter, 1000) {} ReqRepCenter() : ReqRepCenter(BHomeShm()) {} ~ReqRepCenter() { Stop(); } diff --git a/src/socket.cpp b/src/socket.cpp index b9519be..73681f1 100644 --- a/src/socket.cpp +++ b/src/socket.cpp @@ -49,7 +49,7 @@ Stop(); //TODO should stop in sub class, incase thread access sub class data. } -bool ShmSocket::StartRaw(const RecvRawCB &onData, int nworker) +bool ShmSocket::Start(const RecvCB &onData, int nworker) { if (!mq_) { return false; @@ -62,7 +62,12 @@ try { MsgI imsg; DEFER1(imsg.Release(shm_)); - if (mq_->Recv(imsg, 100)) { onData(imsg); } + if (mq_->Recv(imsg, 100)) { + BHMsg msg; + if (imsg.Unpack(msg)) { + onData(*this, imsg, msg); + } + } } catch (...) { } } @@ -73,11 +78,6 @@ workers_.emplace_back(RecvProc); } return true; -} - -bool ShmSocket::Start(const RecvCB &onData, int nworker) -{ - return StartRaw([this, onData](MsgI &imsg) { BHMsg m; if (imsg.Unpack(m)) { onData(m); } }, nworker); } bool ShmSocket::Stop() diff --git a/src/socket.h b/src/socket.h index 20da7c0..1a3d47b 100644 --- a/src/socket.h +++ b/src/socket.h @@ -35,21 +35,24 @@ public: typedef bhome_shm::SharedMemory Shm; - typedef std::function<void(bhome_msg::BHMsg &msg)> RecvCB; - typedef std::function<void(bhome_msg::MsgI &imsg)> RecvRawCB; + typedef std::function<void(ShmSocket &sock, bhome_msg::MsgI &imsg, bhome_msg::BHMsg &msg)> RecvCB; + typedef std::function<void(bhome_msg::BHMsg &msg)> RecvBHMsgCB; + ShmSocket(Shm &shm, const void *id, const int len); ShmSocket(Shm &shm, const int len = 12); ~ShmSocket(); + Shm &shm() { return shm_; } // start recv. bool Start(const RecvCB &onData, int nworker = 1); - bool StartRaw(const RecvRawCB &onData, int nworker = 1); + bool Start(const RecvBHMsgCB &onData, int nworker = 1) + { + return Start([onData](ShmSocket &sock, bhome_msg::MsgI &imsg, bhome_msg::BHMsg &msg) { onData(msg); }, nworker); + } bool Stop(); size_t Pending() const { return mq_ ? mq_->Pending() : 0; } protected: - ShmSocket(Shm &shm, const void *id, const int len); - Shm &shm() { return shm_; } const Shm &shm() const { return shm_; } Queue &mq() { return *mq_; } // programmer should make sure that mq_ is valid. const Queue &mq() const { return *mq_; } diff --git a/utest/utest.cpp b/utest/utest.cpp index 637ae26..55a08a3 100644 --- a/utest/utest.cpp +++ b/utest/utest.cpp @@ -181,14 +181,31 @@ auto Client = [&](const std::string &topic, const int nreq) { SocketRequest client(shm); + std::atomic<int> count(0); std::string reply; + auto onRecv = [&](const std::string &rep) { + reply = rep; + if (++count >= nreq) { + printf("count: %d\n", count.load()); + } + }; + client.StartWorker(onRecv, 1); boost::timer::auto_cpu_timer timer; for (int i = 0; i < nreq; ++i) { - if (!client.SyncRequest(topic, "data " + std::to_string(i), reply, 1000)) { + if (!client.AsyncRequest(topic, "data " + std::to_string(i), 1000)) { printf("client request failed\n"); } + // if (!client.SyncRequest(topic, "data " + std::to_string(i), reply, 1000)) { + // printf("client request failed\n"); + // } else { + // ++count; + // } } printf("request %s %d done ", topic.c_str(), nreq); + while (count.load() < nreq) { + std::this_thread::yield(); + } + client.Stop(); }; auto Server = [&](const std::string &name, const std::vector<std::string> &topics) { SocketReply server(shm); @@ -212,7 +229,7 @@ servers.Launch(Server, "server", topics); std::this_thread::sleep_for(100ms); for (auto &t : topics) { - clients.Launch(Client, t, 1000 * 100); + clients.Launch(Client, t, 1000 * 1000); } clients.WaitAll(); run = false; -- Gitblit v1.8.0