From b55ffe89f4b237be5f79232cfddfe22bfdb87c64 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期四, 01 四月 2021 13:23:48 +0800
Subject: [PATCH] make req/rep,sub/pub sockets sub class;
---
src/socket.h | 45 +-
src/pubsub_center.h | 53 +++
src/reqrep.h | 57 +++
src/pubsub.h | 50 +-
src/socket.cpp | 210 +-----------
src/pubsub.cpp | 153 ++------
src/pubsub_center.cpp | 134 ++++++++
utest/utest.cpp | 36 -
src/reqrep.cpp | 164 ++++++++++
9 files changed, 537 insertions(+), 365 deletions(-)
diff --git a/src/pubsub.cpp b/src/pubsub.cpp
index eff54ab..cfc77ab 100644
--- a/src/pubsub.cpp
+++ b/src/pubsub.cpp
@@ -19,127 +19,58 @@
#include "bh_util.h"
#include "defs.h"
-namespace bhome_shm
-{
-
using namespace std::chrono_literals;
-const int kMaxWorker = 16;
using namespace bhome_msg;
-BusManager::BusManager(SharedMemory &shm) :
- shm_(shm), socket_(ShmSocket::eSockBus, shm) {}
-BusManager::BusManager() :
- BusManager(BHomeShm()) {}
-
-bool BusManager::Start(const int nworker)
+bool SocketPublish::Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms)
{
- auto onRecv = [&](MsgI &imsg) {
-#ifndef NDEBUG
- static std::atomic<time_t> last(0);
- time_t now = 0;
- time(&now);
- if (last.exchange(now) < now) {
- printf("bus queue size: %ld\n", socket_.Pending());
+ try {
+ MsgI imsg;
+ if (!imsg.MakeRC(shm(), MakePub(topic, data, size))) {
+ return false;
}
-#endif
+ DEFER1(imsg.Release(shm()));
+ return ShmMsgQueue::Send(shm(), kBHBusQueueId, imsg, timeout_ms);
+ } catch (...) {
+ return false;
+ }
+}
- BHMsg msg;
- if (!imsg.Unpack(msg)) {
- return;
- }
+bool SocketSubscribe::Subscribe(const std::vector<std::string> &topics, const int timeout_ms)
+{
+ try {
+ return mq().Send(kBHBusQueueId, MakeSub(mq().Id(), topics), timeout_ms);
+ } catch (...) {
+ return false;
+ }
+}
- auto OnSubChange = [&](auto &&update) {
- DataSub sub;
- if (!msg.route().empty() && sub.ParseFromString(msg.body()) && !sub.topics().empty()) {
- assert(sizeof(MQId) == msg.route(0).mq_id().size());
- MQId client;
- memcpy(&client, msg.route(0).mq_id().data(), sizeof(client));
-
- std::lock_guard<std::mutex> guard(mutex_);
- auto &topics = sub.topics();
- for (auto &topic : topics) {
- try {
- update(topic, client);
- } catch (...) {
- //TODO log error
- }
- }
+bool SocketSubscribe::StartRecv(const TopicDataCB &tdcb, int nworker)
+{
+ auto AsyncRecvProc = [this, tdcb](BHMsg &msg) {
+ if (msg.type() == kMsgTypePublish) {
+ DataPub d;
+ if (d.ParseFromString(msg.body())) {
+ tdcb(d.topic(), d.data());
}
- };
-
- auto Sub1 = [this](const std::string &topic, const MQId &id) {
- records_[topic].insert(id);
- };
-
- auto Unsub1 = [this](const std::string &topic, const MQId &id) {
- auto pos = records_.find(topic);
- if (pos != records_.end()) {
- if (pos->second.erase(id) && pos->second.empty()) {
- records_.erase(pos);
- }
- }
- };
-
- auto OnPublish = [&]() {
- DataPub pub;
- if (!pub.ParseFromString(msg.body())) {
- return;
- }
- auto FindClients = [&](const std::string &topic) {
- Clients dests;
- std::lock_guard<std::mutex> guard(mutex_);
- auto Find1 = [&](const std::string &t) {
- auto pos = records_.find(topic);
- if (pos != records_.end() && !pos->second.empty()) {
- auto &clients = pos->second;
- for (auto &cli : clients) {
- dests.insert(cli);
- }
- }
- };
- Find1(topic);
-
- //TODO check and adjust topic on client side sub/pub.
- size_t pos = 0;
- while (true) {
- pos = topic.find(kTopicSep, pos);
- if (pos == topic.npos || ++pos == topic.size()) {
- // Find1(std::string()); // sub all.
- break;
- } else {
- Find1(topic.substr(0, pos));
- }
- }
- return dests;
- };
-
- auto Dispatch = [&](auto &&send1) {
- const Clients &clients(FindClients(pub.topic()));
- for (auto &cli : clients) {
- send1(cli);
- }
- };
-
- if (imsg.IsCounted()) {
- Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm_, cli, imsg, 100); });
- } else {
- MsgI pubmsg;
- if (!pubmsg.MakeRC(shm_, msg)) { return; }
- DEFER1(pubmsg.Release(shm_));
-
- Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm_, cli, pubmsg, 100); });
- }
- };
-
- switch (msg.type()) {
- case kMsgTypeSubscribe: OnSubChange(Sub1); break;
- case kMsgTypeUnsubscribe: OnSubChange(Unsub1); break;
- case kMsgTypePublish: OnPublish(); break;
- default: break;
+ } else {
+ // ignored, or dropped
}
};
- return socket_.StartRaw(onRecv, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
+ return tdcb && Start(AsyncRecvProc, nworker);
}
-} // namespace bhome_shm
+bool SocketSubscribe::RecvSub(std::string &topic, std::string &data, const int timeout_ms)
+{
+ BHMsg msg;
+ if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypePublish) {
+ DataPub d;
+ if (d.ParseFromString(msg.body())) {
+ d.mutable_topic()->swap(topic);
+ d.mutable_data()->swap(data);
+ return true;
+ }
+ }
+ return false;
+}
\ No newline at end of file
diff --git a/src/pubsub.h b/src/pubsub.h
index be6521f..cad9f61 100644
--- a/src/pubsub.h
+++ b/src/pubsub.h
@@ -18,31 +18,43 @@
#ifndef PUBSUB_4KGRA997
#define PUBSUB_4KGRA997
+#include "defs.h"
#include "socket.h"
-#include <mutex>
-#include <set>
-#include <unordered_map>
+#include <string>
-namespace bhome_shm
+class SocketPublish
{
-
-// publish/subcribe manager.
-class BusManager
-{
- SharedMemory &shm_;
- ShmSocket socket_;
- std::mutex mutex_;
- typedef std::set<MQId> Clients;
- std::unordered_map<std::string, Clients> records_;
+ typedef ShmSocket Socket;
+ Socket::Shm &shm_;
+ Socket::Shm &shm() { return shm_; }
public:
- BusManager(SharedMemory &shm);
- BusManager();
- ~BusManager() { Stop(); }
- bool Start(const int nworker = 2);
- bool Stop() { return socket_.Stop(); }
+ SocketPublish(Socket::Shm &shm) :
+ shm_(shm) {}
+ SocketPublish() :
+ SocketPublish(BHomeShm()) {}
+ bool Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms);
+ bool Publish(const std::string &topic, const std::string &data, const int timeout_ms)
+ {
+ return Publish(topic, data.data(), data.size(), timeout_ms);
+ }
};
-} // namespace bhome_shm
+// socket subscribe
+class SocketSubscribe : private ShmSocket
+{
+ typedef ShmSocket Socket;
+
+public:
+ SocketSubscribe(Socket::Shm &shm) :
+ Socket(shm, 64) {}
+ SocketSubscribe() :
+ SocketSubscribe(BHomeShm()) {}
+
+ typedef std::function<void(const std::string &topic, const std::string &data)> TopicDataCB;
+ bool StartRecv(const TopicDataCB &tdcb, int nworker = 2);
+ bool Subscribe(const std::vector<std::string> &topics, const int timeout_ms);
+ bool RecvSub(std::string &topic, std::string &data, const int timeout_ms);
+};
#endif // end of include guard: PUBSUB_4KGRA997
diff --git a/src/pubsub_center.cpp b/src/pubsub_center.cpp
new file mode 100644
index 0000000..33c16be
--- /dev/null
+++ b/src/pubsub_center.cpp
@@ -0,0 +1,134 @@
+/*
+ * =====================================================================================
+ *
+ * Filename: pubsub_center.cpp
+ *
+ * Description: pub/sub center/manager
+ *
+ * Version: 1.0
+ * Created: 2021骞�04鏈�01鏃� 09鏃�29鍒�04绉�
+ * Revision: none
+ * Compiler: gcc
+ *
+ * Author: Li Chao (),
+ * Organization:
+ *
+ * =====================================================================================
+ */
+#include "pubsub_center.h"
+#include "bh_util.h"
+
+PubSubCenter::PubSubCenter(SharedMemory &shm) :
+ socket_(shm) {}
+
+bool PubSubCenter::Start(const int nworker)
+{
+ auto onRecv = [&](MsgI &imsg) {
+#ifndef NDEBUG
+ static std::atomic<time_t> last(0);
+ time_t now = 0;
+ time(&now);
+ if (last.exchange(now) < now) {
+ printf("bus queue size: %ld\n", socket_.Pending());
+ }
+#endif
+
+ BHMsg msg;
+ if (!imsg.Unpack(msg)) {
+ return;
+ }
+
+ auto OnSubChange = [&](auto &&update) {
+ DataSub sub;
+ if (!msg.route().empty() && sub.ParseFromString(msg.body()) && !sub.topics().empty()) {
+ assert(sizeof(MQId) == msg.route(0).mq_id().size());
+ MQId client;
+ memcpy(&client, msg.route(0).mq_id().data(), sizeof(client));
+
+ std::lock_guard<std::mutex> guard(mutex_);
+ auto &topics = sub.topics();
+ for (auto &topic : topics) {
+ try {
+ update(topic, client);
+ } catch (...) {
+ //TODO log error
+ }
+ }
+ }
+ };
+
+ auto Sub1 = [this](const std::string &topic, const MQId &id) {
+ records_[topic].insert(id);
+ };
+
+ auto Unsub1 = [this](const std::string &topic, const MQId &id) {
+ auto pos = records_.find(topic);
+ if (pos != records_.end()) {
+ if (pos->second.erase(id) && pos->second.empty()) {
+ records_.erase(pos);
+ }
+ }
+ };
+
+ auto OnPublish = [&]() {
+ DataPub pub;
+ if (!pub.ParseFromString(msg.body())) {
+ return;
+ }
+ auto FindClients = [&](const std::string &topic) {
+ Clients dests;
+ std::lock_guard<std::mutex> guard(mutex_);
+ auto Find1 = [&](const std::string &t) {
+ auto pos = records_.find(topic);
+ if (pos != records_.end() && !pos->second.empty()) {
+ auto &clients = pos->second;
+ for (auto &cli : clients) {
+ dests.insert(cli);
+ }
+ }
+ };
+ Find1(topic);
+
+ //TODO check and adjust topic on client side sub/pub.
+ size_t pos = 0;
+ while (true) {
+ pos = topic.find(kTopicSep, pos);
+ if (pos == topic.npos || ++pos == topic.size()) {
+ // Find1(std::string()); // sub all.
+ break;
+ } else {
+ Find1(topic.substr(0, pos));
+ }
+ }
+ return dests;
+ };
+
+ auto Dispatch = [&](auto &&send1) {
+ const Clients &clients(FindClients(pub.topic()));
+ for (auto &cli : clients) {
+ send1(cli);
+ }
+ };
+
+ if (imsg.IsCounted()) {
+ Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, imsg, 100); });
+ } else {
+ MsgI pubmsg;
+ if (!pubmsg.MakeRC(shm(), msg)) { return; }
+ DEFER1(pubmsg.Release(shm()));
+
+ Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, pubmsg, 100); });
+ }
+ };
+
+ switch (msg.type()) {
+ case kMsgTypeSubscribe: OnSubChange(Sub1); break;
+ case kMsgTypeUnsubscribe: OnSubChange(Unsub1); break;
+ case kMsgTypePublish: OnPublish(); break;
+ default: break;
+ }
+ };
+
+ const int kMaxWorker = 16;
+ return socket_.StartRaw(onRecv, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
+}
\ No newline at end of file
diff --git a/src/pubsub_center.h b/src/pubsub_center.h
new file mode 100644
index 0000000..866216e
--- /dev/null
+++ b/src/pubsub_center.h
@@ -0,0 +1,53 @@
+/*
+ * =====================================================================================
+ *
+ * Filename: pubsub_center.h
+ *
+ * Description:
+ *
+ * Version: 1.0
+ * Created: 2021骞�04鏈�01鏃� 09鏃�29鍒�39绉�
+ * Revision: none
+ * Compiler: gcc
+ *
+ * Author: Li Chao (),
+ * Organization:
+ *
+ * =====================================================================================
+ */
+#ifndef PUBSUB_CENTER_MFSUZJU7
+#define PUBSUB_CENTER_MFSUZJU7
+
+#include "defs.h"
+#include "socket.h"
+#include <mutex>
+#include <set>
+#include <unordered_map>
+using namespace bhome_shm;
+
+// publish/subcribe manager.
+class PubSubCenter
+{
+ class SocketBus : public ShmSocket
+ {
+ public:
+ SocketBus(SharedMemory &shm) :
+ ShmSocket(shm, &kBHBusQueueId, 1000) {}
+ using ShmSocket::shm;
+ };
+ SocketBus socket_;
+ std::mutex mutex_;
+ typedef std::set<MQId> Clients;
+ std::unordered_map<std::string, Clients> records_;
+ ShmSocket::Shm &shm() { return socket_.shm(); }
+
+public:
+ PubSubCenter(SharedMemory &shm);
+ PubSubCenter() :
+ PubSubCenter(BHomeShm()) {}
+ ~PubSubCenter() { Stop(); }
+ bool Start(const int nworker = 2);
+ bool Stop() { return socket_.Stop(); }
+};
+
+#endif // end of include guard: PUBSUB_CENTER_MFSUZJU7
diff --git a/src/reqrep.cpp b/src/reqrep.cpp
new file mode 100644
index 0000000..e1636fd
--- /dev/null
+++ b/src/reqrep.cpp
@@ -0,0 +1,164 @@
+/*
+ * =====================================================================================
+ *
+ * Filename: reqrep.cpp
+ *
+ * Description: topic request/reply sockets
+ *
+ * Version: 1.0
+ * Created: 2021骞�04鏈�01鏃� 09鏃�35鍒�35绉�
+ * Revision: none
+ * Compiler: gcc
+ *
+ * Author: Li Chao (),
+ * Organization:
+ *
+ * =====================================================================================
+ */
+#include "reqrep.h"
+#include "msg.h"
+#include <chrono>
+#include <condition_variable>
+
+using namespace bhome_msg;
+
+bool SocketRequest::StartWorker(const RequestResultCB &rrcb, int nworker)
+{
+ auto AsyncRecvProc = [this, rrcb](BHMsg &msg) {
+ auto Find = [&](RecvCB &cb) {
+ std::lock_guard<std::mutex> lock(mutex());
+ const std::string &msgid = msg.msg_id();
+ auto pos = async_cbs_.find(msgid);
+ if (pos != async_cbs_.end()) {
+ cb.swap(pos->second);
+ async_cbs_.erase(pos);
+ return true;
+ } else {
+ return false;
+ }
+ };
+
+ RecvCB cb;
+ if (Find(cb) && cb) {
+ cb(msg);
+ } else if (rrcb && msg.type() == kMsgTypeReply) {
+ DataReply reply;
+ if (reply.ParseFromString(msg.body())) {
+ rrcb(reply.data());
+ }
+ } else {
+ // ignored, or dropped
+ }
+ };
+
+ return Start(AsyncRecvProc, nworker);
+}
+
+bool SocketRequest::AsyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb)
+{
+ auto Call = [&](const void *remote) {
+ const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size));
+ auto onRecv = [cb](BHMsg &msg) {
+ if (msg.type() == kMsgTypeReply) {
+ DataReply reply;
+ if (reply.ParseFromString(msg.body())) {
+ cb(reply.data());
+ }
+ }
+ };
+ return AsyncSend(remote, &msg, timeout_ms, onRecv);
+ };
+
+ try {
+ BHAddress addr;
+ if (QueryRPCTopic(topic, addr, timeout_ms)) {
+ return Call(addr.mq_id().data());
+ }
+ } catch (...) {
+ return false;
+ }
+}
+
+bool SocketRequest::SyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, std::string &out)
+{
+ try {
+ BHAddress addr;
+ if (QueryRPCTopic(topic, addr, timeout_ms)) {
+ const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size));
+ BHMsg reply;
+ if (SyncSendAndRecv(addr.mq_id().data(), &msg, &reply, timeout_ms) && reply.type() == kMsgTypeReply) {
+ DataReply dr;
+ if (dr.ParseFromString(msg.body())) {
+ dr.mutable_data()->swap(out);
+ return true;
+ }
+ }
+ }
+ } catch (...) {
+ return false;
+ }
+}
+
+bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms, const RecvCB &cb)
+{
+ assert(remote && pmsg);
+ try {
+ const BHMsg &msg = *static_cast<const BHMsg *>(pmsg);
+ auto RegisterCB = [&]() {
+ std::lock_guard<std::mutex> lock(mutex());
+ async_cbs_.emplace(msg.msg_id(), cb);
+ };
+
+ return mq().Send(*static_cast<const MQId *>(remote), msg, timeout_ms, RegisterCB);
+ } catch (...) {
+ return false;
+ }
+}
+
+bool SocketRequest::SyncSendAndRecv(const void *remote, const void *msg, void *result, const int timeout_ms)
+{
+ struct State {
+ std::mutex mutex;
+ std::condition_variable cv;
+ bool canceled = false;
+ };
+
+ try {
+ std::shared_ptr<State> st(new State);
+ auto endtime = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms);
+
+ auto OnRecv = [=](BHMsg &msg) {
+ std::unique_lock<std::mutex> lk(st->mutex);
+ if (!st->canceled) {
+ static_cast<BHMsg *>(result)->Swap(&msg);
+ st->cv.notify_one();
+ } // else result is no longer valid.
+ };
+
+ std::unique_lock<std::mutex> lk(st->mutex);
+ if (AsyncSend(remote, msg, timeout_ms, OnRecv) && st->cv.wait_until(lk, endtime) == std::cv_status::no_timeout) {
+ return true;
+ } else {
+ st->canceled = true;
+ return false;
+ }
+ } catch (...) {
+ return false;
+ }
+}
+
+bool SocketRequest::QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms)
+{
+ BHMsg result;
+ const BHMsg &msg = MakeQueryTopic(topic);
+ if (SyncSendAndRecv(&kBHTopicRPCId, &msg, &result, timeout_ms)) {
+ if (result.type() == kMsgTypeQueryTopicReply) {
+ DataQueryTopicReply reply;
+ if (reply.ParseFromString(result.body())) {
+ addr = reply.address();
+ return !addr.mq_id().empty();
+ }
+ }
+ }
+ return false;
+}
diff --git a/src/reqrep.h b/src/reqrep.h
new file mode 100644
index 0000000..02cc86f
--- /dev/null
+++ b/src/reqrep.h
@@ -0,0 +1,57 @@
+/*
+ * =====================================================================================
+ *
+ * Filename: reqrep.h
+ *
+ * Description: topic request/reply sockets
+ *
+ * Version: 1.0
+ * Created: 2021骞�04鏈�01鏃� 09鏃�36鍒�06绉�
+ * Revision: none
+ * Compiler: gcc
+ *
+ * Author: Li Chao (),
+ * Organization:
+ *
+ * =====================================================================================
+ */
+#ifndef REQREP_ACEH09NK
+#define REQREP_ACEH09NK
+
+#include "defs.h"
+#include "socket.h"
+#include <functional>
+#include <unordered_map>
+
+class SocketRequest : private ShmSocket
+{
+ typedef ShmSocket Socket;
+
+public:
+ SocketRequest(Socket::Shm &shm) :
+ Socket(shm, 64) { StartWorker(); }
+ SocketRequest() :
+ SocketRequest(BHomeShm()) {}
+
+ typedef std::function<void(const std::string &data)> RequestResultCB;
+ bool StartWorker(const RequestResultCB &rrcb, int nworker = 2);
+ bool StartWorker(int nworker = 2) { return StartWorker(RequestResultCB(), nworker); }
+ bool AsyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb);
+ bool AsyncRequest(const std::string &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb)
+ {
+ return AsyncRequest(topic, data.data(), data.size(), timeout_ms, rrcb);
+ }
+ bool SyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, std::string &out);
+ bool SyncRequest(const std::string &topic, const std::string &data, const int timeout_ms, std::string &out)
+ {
+ return SyncRequest(topic, data.data(), data.size(), timeout_ms, out);
+ }
+
+private:
+ bool AsyncSend(const void *remote, const void *msg, const int timeout_ms, const RecvCB &cb);
+ bool SyncSendAndRecv(const void *remote, const void *msg, void *result, const int timeout_ms);
+ bool QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
+ std::unordered_map<std::string, RecvCB> async_cbs_;
+};
+
+#endif // end of include guard: REQREP_ACEH09NK
diff --git a/src/socket.cpp b/src/socket.cpp
index 13f1e38..4c2fc6b 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -20,8 +20,6 @@
#include "bh_util.h"
#include "defs.h"
#include "msg.h"
-#include <chrono>
-#include <condition_variable>
using namespace bhome_msg;
using namespace bhome_shm;
@@ -31,78 +29,33 @@
} // namespace
-//TODO maybe change to base class, each type is a sub class.
-
-ShmSocket::ShmSocket(Type type, bhome_shm::SharedMemory &shm) :
- shm_(shm), type_(type), run_(false)
+ShmSocket::ShmSocket(Shm &shm, const void *id, const int len) :
+ shm_(shm), run_(false)
{
- switch (type) {
- case eSockBus: mq_.reset(new Queue(kBHBusQueueId, shm_, 1000)); break;
- case eSockRequest: mq_.reset(new Queue(shm_, 12)); break;
- case eSockReply: mq_.reset(new Queue(shm_, 64)); break;
- case eSockSubscribe: mq_.reset(new Queue(shm_, 64)); break;
- case eSockPublish: break; // no recv mq needed
- default: break;
+ if (id && len > 0) {
+ mq_.reset(new Queue(*static_cast<const MQId *>(id), shm, len));
}
}
-
-ShmSocket::ShmSocket(Type type) :
- ShmSocket(type, BHomeShm()) {}
+ShmSocket::ShmSocket(bhome_shm::SharedMemory &shm, const int len) :
+ shm_(shm), run_(false)
+{
+ if (len > 0) {
+ mq_.reset(new Queue(shm_, len));
+ }
+}
ShmSocket::~ShmSocket()
{
Stop();
}
-bool ShmSocket::Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms)
-{
- if (type_ != eSockPublish) {
- return false;
- }
- assert(!mq_);
- try {
- MsgI imsg;
- if (!imsg.MakeRC(shm_, MakePub(topic, data, size))) {
- return false;
- }
- DEFER1(imsg.Release(shm_));
- return Queue::Send(shm_, kBHBusQueueId, imsg, timeout_ms);
- } catch (...) {
- return false;
- }
-}
-
-bool ShmSocket::Subscribe(const std::vector<std::string> &topics, const int timeout_ms)
-{
- if (type_ != eSockSubscribe) {
- return false;
- }
- assert(mq_);
- try {
- return mq_->Send(kBHBusQueueId, MakeSub(mq_->Id(), topics), timeout_ms);
- } catch (...) {
- return false;
- }
-}
-
bool ShmSocket::StartRaw(const RecvRawCB &onData, int nworker)
{
- auto CanRecv = [this]() {
- switch (type_) {
- case eSockRequest:
- case eSockReply:
- case eSockBus:
- case eSockSubscribe:
- return true;
- default:
- return false;
- }
- };
- if (!CanRecv()) {
+ if (!mq_) {
return false;
}
- std::lock_guard<std::mutex> lock(mutex_);
+ std::lock_guard<std::mutex> lock(mutex_);
StopNoLock();
auto RecvProc = [this, onData]() {
while (run_) {
@@ -127,31 +80,6 @@
return StartRaw([this, onData](MsgI &imsg) { BHMsg m; if (imsg.Unpack(m)) { onData(m); } }, nworker);
}
-bool ShmSocket::StartAsync(int nworker)
-{
- auto AsyncRecvProc = [this](BHMsg &msg) {
- auto Find = [&](RecvCB &cb) {
- std::lock_guard<std::mutex> lock(mutex_);
- const std::string &msgid = msg.msg_id();
- auto pos = async_cbs_.find(msgid);
- if (pos != async_cbs_.end()) {
- cb.swap(pos->second);
- async_cbs_.erase(pos);
- return true;
- } else {
- return false;
- }
- };
-
- RecvCB cb;
- if (Find(cb) && cb) {
- cb(msg);
- }
- };
-
- return Start(AsyncRecvProc, nworker);
-}
-
bool ShmSocket::Stop()
{
std::lock_guard<std::mutex> lock(mutex_);
@@ -166,118 +94,28 @@
w.join();
}
}
+ workers_.clear();
return true;
}
return false;
}
-bool ShmSocket::AsyncRequest(const void *remote, const void *pmsg, const int timeout_ms, const RecvCB &cb)
+bool ShmSocket::SyncSend(const void *id, const bhome_msg::BHMsg &msg, const int timeout_ms)
{
- if (type_ != eSockRequest) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ if (!mq_ || RunningNoLock()) {
return false;
- }
- assert(remote && pmsg && !mq_);
- try {
- const BHMsg &msg = *static_cast<const BHMsg *>(pmsg);
- auto RegisterCB = [&]() {
- std::lock_guard<std::mutex> lock(mutex_);
- async_cbs_.emplace(msg.msg_id(), cb);
- };
-
- return mq_->Send(*static_cast<const MQId *>(remote), msg, timeout_ms, RegisterCB);
- } catch (...) {
- return false;
+ } else {
+ return mq_->Send(*static_cast<const MQId *>(id), msg, timeout_ms);
}
}
-bool ShmSocket::SyncRequest(const void *remote, const void *msg, void *result, const int timeout_ms)
+bool ShmSocket::SyncRecv(bhome_msg::BHMsg &msg, const int timeout_ms)
{
- struct State {
- std::mutex mutex;
- std::condition_variable cv;
- bool canceled = false;
- };
-
- try {
- std::shared_ptr<State> st(new State);
- auto OnRecv = [=](BHMsg &msg) {
- std::unique_lock<std::mutex> lk(st->mutex);
- if (!st->canceled) {
- static_cast<BHMsg *>(result)->Swap(&msg);
- st->cv.notify_one();
- }
- };
-
- std::unique_lock<std::mutex> lk(st->mutex);
- auto end = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms);
- if (AsyncRequest(remote, msg, timeout_ms, OnRecv) && st->cv.wait_until(lk, end) == std::cv_status::no_timeout) {
- return true;
- } else {
- st->canceled = true;
- return false;
- }
- } catch (...) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ if (!mq_ || RunningNoLock()) {
return false;
- }
-}
-
-bool ShmSocket::QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms)
-{
- BHMsg result;
- const BHMsg &msg = MakeQueryTopic(topic);
- if (SyncRequest(&kBHTopicRPCId, &msg, &result, timeout_ms)) {
- if (result.type() == kMsgTypeQueryTopicReply) {
- DataQueryTopicReply reply;
- if (reply.ParseFromString(result.body())) {
- addr = reply.address();
- return !addr.mq_id().empty();
- }
- }
- }
- return false;
-}
-
-bool ShmSocket::RequestRPC(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb)
-{
- auto Call = [&](const void *remote) {
- const BHMsg &msg(MakeRequest(mq_->Id(), topic, data, size));
- auto onRecv = [cb](BHMsg &msg) {
- if (msg.type() == kMsgTypeReply) {
- DataReply reply;
- if (reply.ParseFromString(msg.body())) {
- cb(reply.data().data(), reply.data().size());
- }
- }
- };
- return AsyncRequest(remote, &msg, timeout_ms, onRecv);
- };
-
- try {
- BHAddress addr;
- if (QueryRPCTopic(topic, addr, timeout_ms)) {
- return Call(addr.mq_id().data());
- }
- } catch (...) {
- return false;
- }
-}
-
-bool ShmSocket::RequestRPC(const std::string &topic, const void *data, const size_t size, const int timeout_ms, std::string &out)
-{
- try {
- BHAddress addr;
- if (QueryRPCTopic(topic, addr, timeout_ms)) {
- const BHMsg &msg(MakeRequest(mq_->Id(), topic, data, size));
- BHMsg reply;
- if (SyncRequest(addr.mq_id().data(), &msg, &reply, timeout_ms) && reply.type() == kMsgTypeReply) {
- DataReply dr;
- if (dr.ParseFromString(msg.body())) {
- dr.mutable_data()->swap(out);
- return true;
- }
- }
- }
- } catch (...) {
- return false;
+ } else {
+ return mq_->Recv(msg, timeout_ms);
}
}
diff --git a/src/socket.h b/src/socket.h
index eee5b5b..20da7c0 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -26,59 +26,48 @@
#include <memory>
#include <mutex>
#include <thread>
-#include <unordered_map>
#include <vector>
class ShmSocket : private boost::noncopyable
{
+protected:
typedef bhome_shm::ShmMsgQueue Queue;
public:
- enum Type {
- eSockRequest,
- eSockReply,
- eSockSubscribe,
- eSockPublish,
- eSockBus,
- };
+ typedef bhome_shm::SharedMemory Shm;
typedef std::function<void(bhome_msg::BHMsg &msg)> RecvCB;
typedef std::function<void(bhome_msg::MsgI &imsg)> RecvRawCB;
- typedef std::function<void(const void *data, const size_t size)> RequestResultCB;
- ShmSocket(Type type, bhome_shm::SharedMemory &shm);
- ShmSocket(Type type);
+ ShmSocket(Shm &shm, const int len = 12);
~ShmSocket();
- bool RequestRPC(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb);
- bool RequestRPC(const std::string &topic, const void *data, const size_t size, const int timeout_ms, std::string &out);
-
- // bool HandleRequest(onData);
- bool ReadRequest(); // exclude with HandleRequest
- bool SendReply(); // exclude with HandleRequest
-
- bool Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms);
- bool Subscribe(const std::vector<std::string> &topics, const int timeout_ms);
- bool RecvSub(std::string &topic, std::string &data, const int timeout_ms);
// start recv.
bool Start(const RecvCB &onData, int nworker = 1);
bool StartRaw(const RecvRawCB &onData, int nworker = 1);
- bool StartAsync(int nworker = 2);
bool Stop();
size_t Pending() const { return mq_ ? mq_->Pending() : 0; }
+protected:
+ ShmSocket(Shm &shm, const void *id, const int len);
+ Shm &shm() { return shm_; }
+ const Shm &shm() const { return shm_; }
+ Queue &mq() { return *mq_; } // programmer should make sure that mq_ is valid.
+ const Queue &mq() const { return *mq_; }
+ std::mutex &mutex() { return mutex_; }
+
+ bool SyncSend(const void *id, const bhome_msg::BHMsg &msg, const int timeout_ms);
+ bool SyncRecv(bhome_msg::BHMsg &msg, const int timeout_ms);
+
private:
- bool AsyncRequest(const void *remote, const void *msg, const int timeout_ms, const RecvCB &cb);
- bool SyncRequest(const void *remote, const void *msg, void *result, const int timeout_ms);
- bool QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
bool StopNoLock();
- bhome_shm::SharedMemory &shm_;
- const Type type_;
+ bool RunningNoLock() { return !workers_.empty(); }
+
+ Shm &shm_;
std::vector<std::thread> workers_;
std::mutex mutex_;
std::atomic<bool> run_;
std::unique_ptr<Queue> mq_;
- std::unordered_map<std::string, RecvCB> async_cbs_;
};
#endif // end of include guard: SOCKET_GWTJHBPO
diff --git a/utest/utest.cpp b/utest/utest.cpp
index fbe4d51..b95e646 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -1,5 +1,6 @@
#include "defs.h"
#include "pubsub.h"
+#include "pubsub_center.h"
#include "socket.h"
#include "util.h"
#include <atomic>
@@ -66,7 +67,7 @@
BOOST_AUTO_TEST_CASE(PubSubTest)
{
const std::string shm_name("ShmPubSub");
- // ShmRemover auto_remove(shm_name); //remove twice? in case of killed?
+ ShmRemover auto_remove(shm_name); //remove twice? in case of killed?
SharedMemory shm(shm_name, 1024 * 1024 * 50);
DEFER1(shm.Remove());
auto Avail = [&]() { return shm.get_free_memory(); };
@@ -75,57 +76,50 @@
printf("flag = %d\n", *flag);
++*flag;
- BusManager bus(shm);
+ PubSubCenter bus(shm);
bus.Start();
+
std::this_thread::sleep_for(100ms);
- std::atomic<uint64_t> count(0);
+ std::atomic<uint64_t> total_count(0);
std::atomic<ptime> last_time(Now() - seconds(1));
std::atomic<uint64_t> last_count(0);
const uint64_t nmsg = 100 * 2;
const int timeout = 1000;
auto Sub = [&](int id, const std::vector<std::string> &topics) {
- ShmSocket client(ShmSocket::eSockSubscribe, shm);
+ SocketSubscribe client(shm);
bool r = client.Subscribe(topics, timeout);
std::mutex mutex;
std::condition_variable cv;
- uint64_t i = 0;
- auto OnRecv = [&](BHMsg &msg) {
- if (msg.type() != kMsgTypePublish) {
- BOOST_CHECK(false);
- }
- DataPub pub;
- if (!pub.ParseFromString(msg.body())) {
- BOOST_CHECK(false);
- }
- ++count;
+ std::atomic<uint64_t> n(0);
+ auto OnTopicData = [&](const std::string &topic, const std::string &data) {
+ ++total_count;
auto cur = Now();
if (last_time.exchange(cur) < cur) {
std::cout << "time: " << cur;
printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n",
- count.load(), count - last_count.exchange(count), init_avail - Avail());
+ total_count.load(), total_count - last_count.exchange(total_count), init_avail - Avail());
}
- if (++i >= nmsg * topics.size()) {
+ if (++n >= nmsg * topics.size()) {
cv.notify_one();
}
// printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str());
};
- client.Start(OnRecv);
+ client.StartRecv(OnTopicData, 1);
std::unique_lock<std::mutex> lk(mutex);
cv.wait(lk);
};
auto Pub = [&](const std::string &topic) {
- ShmSocket provider(ShmSocket::eSockPublish, shm);
+ SocketPublish provider(shm);
for (unsigned i = 0; i < nmsg; ++i) {
std::string data = topic + std::to_string(i) + std::string(1000, '-');
- bool r = provider.Publish(topic, data.data(), data.size(), timeout);
- // bool r = provider.Send(kBHBusQueueId, MakePub(topic, data.data(), data.size()), timeout);
+ bool r = provider.Publish(topic, data, timeout);
if (!r) {
printf("pub ret: %s\n", r ? "ok" : "fail");
}
@@ -151,7 +145,7 @@
threads.WaitAll();
std::cout << "end : " << Now();
printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n",
- count.load(), count - last_count.exchange(count), init_avail - Avail());
+ total_count.load(), total_count - last_count.exchange(total_count), init_avail - Avail());
bus.Stop();
}
--
Gitblit v1.8.0