From 83085f2ce99cca05d40a19482151873a55e6393a Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 02 四月 2021 19:32:21 +0800
Subject: [PATCH] refactor center; add async request no cb.
---
src/socket.h | 13 +-
src/pubsub_center.h | 15 +--
src/reqrep_center.cpp | 54 +++++++---
src/reqrep.h | 11 +
src/reqrep_center.h | 14 --
src/socket.cpp | 14 +-
src/center.h | 16 +++
src/pubsub_center.cpp | 37 +++---
utest/utest.cpp | 21 +++
src/reqrep.cpp | 34 +++++-
src/center.cpp | 13 ++
11 files changed, 165 insertions(+), 77 deletions(-)
diff --git a/src/center.cpp b/src/center.cpp
index db000c4..d6570aa 100644
--- a/src/center.cpp
+++ b/src/center.cpp
@@ -17,6 +17,8 @@
*/
#include "center.h"
#include "defs.h"
+#include "pubsub_center.h"
+#include "reqrep_center.h"
#include "shm.h"
using namespace bhome_shm;
@@ -26,3 +28,14 @@
static SharedMemory shm("bhome_default_shm_v0", 1024 * 1024 * 64);
return shm;
}
+
+BHCenter::BHCenter(Socket::Shm &shm) :
+ socket_(shm) {}
+
+BHCenter::BHCenter() :
+ BHCenter(BHomeShm()) {}
+
+bool BHCenter::Start()
+{
+ return false;
+}
\ No newline at end of file
diff --git a/src/center.h b/src/center.h
index 153cc3e..f0a177c 100644
--- a/src/center.h
+++ b/src/center.h
@@ -18,8 +18,24 @@
#ifndef CENTER_TM9OUQTG
#define CENTER_TM9OUQTG
+#include "socket.h"
+#include <functional>
+
class BHCenter
{
+ typedef ShmSocket Socket;
+
+public:
+ typedef std::function<bool(ShmSocket &socket, bhome_msg::MsgI &imsg, bhome::msg::BHMsg &msg)> MsgHandler;
+
+ BHCenter(Socket::Shm &shm);
+ BHCenter();
+ ~BHCenter() { Stop(); }
+ bool Start();
+ bool Stop() { return socket_.Stop(); }
+
+private:
+ ShmSocket socket_;
};
#endif // end of include guard: CENTER_TM9OUQTG
diff --git a/src/pubsub_center.cpp b/src/pubsub_center.cpp
index 3ba5382..b3af47d 100644
--- a/src/pubsub_center.cpp
+++ b/src/pubsub_center.cpp
@@ -77,25 +77,21 @@
} // namespace
-bool PubSubCenter::Start(const int nworker)
+BHCenter::MsgHandler MakeBusCenter()
{
auto bus_ptr = std::make_shared<Synced<BusCenter>>();
- auto onRecv = [bus_ptr, this](MsgI &imsg) {
+ return [bus_ptr](ShmSocket &socket, MsgI &imsg, BHMsg &msg) {
#ifndef NDEBUG
static std::atomic<time_t> last(0);
time_t now = 0;
time(&now);
if (last.exchange(now) < now) {
- printf("bus queue size: %ld\n", socket_.Pending());
+ printf("bus queue size: %ld\n", socket.Pending());
}
#endif
auto &bus = *bus_ptr;
-
- BHMsg msg;
- if (!imsg.Unpack(msg)) {
- return;
- }
+ auto &shm = socket.shm();
auto OnSubChange = [&](auto &&update) {
DataSub sub;
@@ -106,7 +102,6 @@
update(client, sub.topics());
}
};
-
auto Sub = [&](const MQId &id, auto &topics) { bus->SubScribe(id, topics.begin(), topics.end()); };
auto Unsub = [&](const MQId &id, auto &topics) { bus->UnsubScribe(id, topics.begin(), topics.end()); };
@@ -123,24 +118,30 @@
};
if (imsg.IsCounted()) {
- Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, imsg, 10); });
+ Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm, cli, imsg, 10); });
} else {
MsgI pubmsg;
- if (!pubmsg.MakeRC(shm(), msg)) { return; }
- DEFER1(pubmsg.Release(shm()));
+ if (!pubmsg.MakeRC(shm, msg)) { return; }
+ DEFER1(pubmsg.Release(shm));
- Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, pubmsg, 10); });
+ Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm, cli, pubmsg, 10); });
}
};
switch (msg.type()) {
- case kMsgTypeSubscribe: OnSubChange(Sub); break;
- case kMsgTypeUnsubscribe: OnSubChange(Unsub); break;
- case kMsgTypePublish: OnPublish(); break;
- default: break;
+ case kMsgTypeSubscribe: OnSubChange(Sub); return true;
+ case kMsgTypeUnsubscribe: OnSubChange(Unsub); return true;
+ case kMsgTypePublish: OnPublish(); return true;
+ default: return false;
}
};
+}
+
+bool PubSubCenter::Start(const int nworker)
+{
+ auto handler = MakeBusCenter();
+ printf("sizeof(pub/sub handler) = %ld\n", sizeof(handler));
const int kMaxWorker = 16;
- return socket_.StartRaw(onRecv, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
+ return socket_.Start(handler, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
}
\ No newline at end of file
diff --git a/src/pubsub_center.h b/src/pubsub_center.h
index af3a2f4..e79dd96 100644
--- a/src/pubsub_center.h
+++ b/src/pubsub_center.h
@@ -18,28 +18,23 @@
#ifndef PUBSUB_CENTER_MFSUZJU7
#define PUBSUB_CENTER_MFSUZJU7
+#include "center.h"
#include "defs.h"
#include "socket.h"
#include <mutex>
#include <set>
#include <unordered_map>
+BHCenter::MsgHandler MakeBusCenter();
+
// publish/subcribe manager.
class PubSubCenter
{
- class SocketBus : public ShmSocket
- {
- public:
- SocketBus(ShmSocket::Shm &shm) :
- ShmSocket(shm, &kBHTopicBus, 1000) {}
- using ShmSocket::shm;
- };
- SocketBus socket_;
- ShmSocket::Shm &shm() { return socket_.shm(); }
+ ShmSocket socket_;
public:
PubSubCenter(ShmSocket::Shm &shm) :
- socket_(shm) {}
+ socket_(shm, &kBHTopicBus, 1000) {}
PubSubCenter() :
PubSubCenter(BHomeShm()) {}
~PubSubCenter() { Stop(); }
diff --git a/src/reqrep.cpp b/src/reqrep.cpp
index b8e423b..25c0826 100644
--- a/src/reqrep.cpp
+++ b/src/reqrep.cpp
@@ -26,7 +26,7 @@
bool SocketRequest::StartWorker(const RequestResultCB &rrcb, int nworker)
{
auto AsyncRecvProc = [this, rrcb](BHMsg &msg) {
- auto Find = [&](RecvCB &cb) {
+ auto Find = [&](RecvBHMsgCB &cb) {
std::lock_guard<std::mutex> lock(mutex());
const std::string &msgid = msg.msg_id();
auto pos = async_cbs_.find(msgid);
@@ -39,10 +39,10 @@
}
};
- RecvCB cb;
- if (Find(cb) && cb) {
+ RecvBHMsgCB cb;
+ if (Find(cb)) {
cb(msg);
- } else if (rrcb && msg.type() == kMsgTypeReply) {
+ } else if (msg.type() == kMsgTypeReply) {
DataReply reply;
if (reply.ParseFromString(msg.body())) {
rrcb(reply.data());
@@ -55,6 +55,20 @@
return Start(AsyncRecvProc, nworker);
}
+bool SocketRequest::AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms)
+{
+ try {
+ BHAddress addr;
+ if (QueryRPCTopic(topic, addr, timeout_ms)) {
+ const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size));
+ return AsyncSend(addr.mq_id().data(), &msg, timeout_ms);
+ } else {
+ return false;
+ }
+ } catch (...) {
+ return false;
+ }
+}
bool SocketRequest::AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb)
{
auto Call = [&](const void *remote) {
@@ -103,7 +117,17 @@
return false;
}
-bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms, const RecvCB &cb)
+bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms)
+{
+ assert(remote && pmsg);
+ try {
+ const BHMsg &msg = *static_cast<const BHMsg *>(pmsg);
+ return mq().Send(*static_cast<const MQId *>(remote), msg, timeout_ms);
+ } catch (...) {
+ return false;
+ }
+}
+bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms, const RecvBHMsgCB &cb)
{
assert(remote && pmsg);
try {
diff --git a/src/reqrep.h b/src/reqrep.h
index 9e43c7b..8a4743c 100644
--- a/src/reqrep.h
+++ b/src/reqrep.h
@@ -43,9 +43,15 @@
bool StartWorker(int nworker = 2) { return StartWorker(RequestResultCB(), nworker); }
bool Stop() { return Socket::Stop(); }
bool AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb);
+ bool AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms);
+
bool AsyncRequest(const Topic &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb)
{
return AsyncRequest(topic, data.data(), data.size(), timeout_ms, rrcb);
+ }
+ bool AsyncRequest(const Topic &topic, const std::string &data, const int timeout_ms)
+ {
+ return AsyncRequest(topic, data.data(), data.size(), timeout_ms);
}
bool SyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms);
bool SyncRequest(const Topic &topic, const std::string &data, std::string &out, const int timeout_ms)
@@ -54,10 +60,11 @@
}
private:
- bool AsyncSend(const void *remote, const void *msg, const int timeout_ms, const RecvCB &cb);
+ bool AsyncSend(const void *remote, const void *msg, const int timeout_ms, const RecvBHMsgCB &cb);
+ bool AsyncSend(const void *remote, const void *msg, const int timeout_ms);
bool SyncSendAndRecv(const void *remote, const void *msg, void *result, const int timeout_ms);
bool QueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
- std::unordered_map<std::string, RecvCB> async_cbs_;
+ std::unordered_map<std::string, RecvBHMsgCB> async_cbs_;
typedef bhome_msg::BHAddress Address;
class TopicCache
diff --git a/src/reqrep_center.cpp b/src/reqrep_center.cpp
index 2356ebc..e52b0fd 100644
--- a/src/reqrep_center.cpp
+++ b/src/reqrep_center.cpp
@@ -99,63 +99,81 @@
std::unordered_map<Topic, WeakNode> topic_map_;
std::unordered_map<ProcId, Node> nodes_;
};
+
+Synced<NodeCenter> &Center()
+{
+ static Synced<NodeCenter> s;
+ return s;
+}
+
} // namespace
-bool ReqRepCenter::Start(const int nworker)
+BHCenter::MsgHandler MakeReqRepCenter()
{
auto center_ptr = std::make_shared<Synced<NodeCenter>>();
- auto onRecv = [center_ptr, this](BHMsg &msg) {
+ return [center_ptr](ShmSocket &socket, MsgI &imsg, BHMsg &msg) {
auto ¢er = *center_ptr;
+ auto &shm = socket.shm();
#ifndef NDEBUG
static std::atomic<time_t> last(0);
time_t now = 0;
time(&now);
if (last.exchange(now) < now) {
- printf("bus queue size: %ld\n", socket_.Pending());
+ printf("bus queue size: %ld\n", socket.Pending());
}
#endif
- if (msg.route_size() == 0) {
- return;
- }
- auto &src_mq = msg.route(0).mq_id();
+ auto SrcMQ = [&]() { return msg.route(0).mq_id(); };
auto OnRegister = [&]() {
+ if (msg.route_size() != 1) { return; }
+
DataProcRegister reg;
if (reg.ParseFromString(msg.body()) && reg.has_proc()) {
- center->Register(*reg.mutable_proc(), src_mq, reg.topics().begin(), reg.topics().end());
+ center->Register(*reg.mutable_proc(), SrcMQ(), reg.topics().begin(), reg.topics().end());
}
};
auto OnHeartbeat = [&]() {
+ if (msg.route_size() != 1) { return; }
+ auto &src_mq = msg.route(0).mq_id();
+
DataProcHeartbeat hb;
if (hb.ParseFromString(msg.body()) && hb.has_proc()) {
- center->Heartbeat(*hb.mutable_proc(), src_mq);
+ center->Heartbeat(*hb.mutable_proc(), SrcMQ());
}
};
auto OnQueryTopic = [&]() {
+ if (msg.route_size() != 1) { return; }
+
DataProcQueryTopic query;
NodeCenter::ProcAddr dest;
if (query.ParseFromString(msg.body()) && center->QueryTopic(query.topic(), dest)) {
MQId remote;
- memcpy(&remote, msg.route().rbegin()->mq_id().data(), sizeof(remote));
+ memcpy(&remote, SrcMQ().data(), sizeof(MQId));
MsgI imsg;
- if (!imsg.Make(shm(), MakeQueryTopicReply(dest, msg.msg_id()))) { return; }
- if (!ShmMsgQueue::Send(shm(), remote, imsg, 100)) {
- imsg.Release(shm());
+ if (!imsg.Make(shm, MakeQueryTopicReply(dest, msg.msg_id()))) { return; }
+ if (!ShmMsgQueue::Send(shm, remote, imsg, 100)) {
+ imsg.Release(shm);
}
}
};
switch (msg.type()) {
- case kMsgTypeProcRegisterTopics: OnRegister(); break;
- case kMsgTypeProcHeartbeat: OnHeartbeat(); break;
- case kMsgTypeProcQueryTopic: OnQueryTopic(); break;
- default: break;
+ case kMsgTypeProcRegisterTopics: OnRegister(); return true;
+ case kMsgTypeProcHeartbeat: OnHeartbeat(); return true;
+ case kMsgTypeProcQueryTopic: OnQueryTopic(); return true;
+ default: return false;
}
};
+}
+
+bool ReqRepCenter::Start(const int nworker)
+{
+ auto handler = MakeReqRepCenter();
+ printf("sizeof(rep/req handler) = %ld\n", sizeof(handler));
const int kMaxWorker = 16;
- return socket_.Start(onRecv, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
+ return socket_.Start(handler, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
}
\ No newline at end of file
diff --git a/src/reqrep_center.h b/src/reqrep_center.h
index 6473841..326ac7a 100644
--- a/src/reqrep_center.h
+++ b/src/reqrep_center.h
@@ -18,24 +18,18 @@
#ifndef REQREP_CENTER_US3RBM60
#define REQREP_CENTER_US3RBM60
+#include "center.h"
#include "defs.h"
#include "socket.h"
+BHCenter::MsgHandler MakeReqRepCenter();
class ReqRepCenter
{
- class Socket : public ShmSocket
- {
- public:
- Socket(ShmSocket::Shm &shm) :
- ShmSocket(shm, &kBHTopicReqRepCenter, 1000) {}
- using ShmSocket::shm;
- };
- Socket socket_;
- ShmSocket::Shm &shm() { return socket_.shm(); }
+ ShmSocket socket_;
public:
ReqRepCenter(ShmSocket::Shm &shm) :
- socket_(shm) {}
+ socket_(shm, &kBHTopicReqRepCenter, 1000) {}
ReqRepCenter() :
ReqRepCenter(BHomeShm()) {}
~ReqRepCenter() { Stop(); }
diff --git a/src/socket.cpp b/src/socket.cpp
index b9519be..73681f1 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -49,7 +49,7 @@
Stop(); //TODO should stop in sub class, incase thread access sub class data.
}
-bool ShmSocket::StartRaw(const RecvRawCB &onData, int nworker)
+bool ShmSocket::Start(const RecvCB &onData, int nworker)
{
if (!mq_) {
return false;
@@ -62,7 +62,12 @@
try {
MsgI imsg;
DEFER1(imsg.Release(shm_));
- if (mq_->Recv(imsg, 100)) { onData(imsg); }
+ if (mq_->Recv(imsg, 100)) {
+ BHMsg msg;
+ if (imsg.Unpack(msg)) {
+ onData(*this, imsg, msg);
+ }
+ }
} catch (...) {
}
}
@@ -73,11 +78,6 @@
workers_.emplace_back(RecvProc);
}
return true;
-}
-
-bool ShmSocket::Start(const RecvCB &onData, int nworker)
-{
- return StartRaw([this, onData](MsgI &imsg) { BHMsg m; if (imsg.Unpack(m)) { onData(m); } }, nworker);
}
bool ShmSocket::Stop()
diff --git a/src/socket.h b/src/socket.h
index 20da7c0..1a3d47b 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -35,21 +35,24 @@
public:
typedef bhome_shm::SharedMemory Shm;
- typedef std::function<void(bhome_msg::BHMsg &msg)> RecvCB;
- typedef std::function<void(bhome_msg::MsgI &imsg)> RecvRawCB;
+ typedef std::function<void(ShmSocket &sock, bhome_msg::MsgI &imsg, bhome_msg::BHMsg &msg)> RecvCB;
+ typedef std::function<void(bhome_msg::BHMsg &msg)> RecvBHMsgCB;
+ ShmSocket(Shm &shm, const void *id, const int len);
ShmSocket(Shm &shm, const int len = 12);
~ShmSocket();
+ Shm &shm() { return shm_; }
// start recv.
bool Start(const RecvCB &onData, int nworker = 1);
- bool StartRaw(const RecvRawCB &onData, int nworker = 1);
+ bool Start(const RecvBHMsgCB &onData, int nworker = 1)
+ {
+ return Start([onData](ShmSocket &sock, bhome_msg::MsgI &imsg, bhome_msg::BHMsg &msg) { onData(msg); }, nworker);
+ }
bool Stop();
size_t Pending() const { return mq_ ? mq_->Pending() : 0; }
protected:
- ShmSocket(Shm &shm, const void *id, const int len);
- Shm &shm() { return shm_; }
const Shm &shm() const { return shm_; }
Queue &mq() { return *mq_; } // programmer should make sure that mq_ is valid.
const Queue &mq() const { return *mq_; }
diff --git a/utest/utest.cpp b/utest/utest.cpp
index 637ae26..55a08a3 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -181,14 +181,31 @@
auto Client = [&](const std::string &topic, const int nreq) {
SocketRequest client(shm);
+ std::atomic<int> count(0);
std::string reply;
+ auto onRecv = [&](const std::string &rep) {
+ reply = rep;
+ if (++count >= nreq) {
+ printf("count: %d\n", count.load());
+ }
+ };
+ client.StartWorker(onRecv, 1);
boost::timer::auto_cpu_timer timer;
for (int i = 0; i < nreq; ++i) {
- if (!client.SyncRequest(topic, "data " + std::to_string(i), reply, 1000)) {
+ if (!client.AsyncRequest(topic, "data " + std::to_string(i), 1000)) {
printf("client request failed\n");
}
+ // if (!client.SyncRequest(topic, "data " + std::to_string(i), reply, 1000)) {
+ // printf("client request failed\n");
+ // } else {
+ // ++count;
+ // }
}
printf("request %s %d done ", topic.c_str(), nreq);
+ while (count.load() < nreq) {
+ std::this_thread::yield();
+ }
+ client.Stop();
};
auto Server = [&](const std::string &name, const std::vector<std::string> &topics) {
SocketReply server(shm);
@@ -212,7 +229,7 @@
servers.Launch(Server, "server", topics);
std::this_thread::sleep_for(100ms);
for (auto &t : topics) {
- clients.Launch(Client, t, 1000 * 100);
+ clients.Launch(Client, t, 1000 * 1000);
}
clients.WaitAll();
run = false;
--
Gitblit v1.8.0