From 2e99e5311d1b9a53cca17008452cbe49e2af7234 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期三, 31 三月 2021 14:05:09 +0800
Subject: [PATCH] add bus socket for manager; refactor.
---
src/socket.h | 23 +-
src/pubsub.h | 17 -
src/socket.cpp | 104 ++++++++------
src/pubsub.cpp | 221 ++++++++++++-------------------
utest/utest.cpp | 3
5 files changed, 162 insertions(+), 206 deletions(-)
diff --git a/src/pubsub.cpp b/src/pubsub.cpp
index d5c7dd2..52285b1 100644
--- a/src/pubsub.cpp
+++ b/src/pubsub.cpp
@@ -18,7 +18,6 @@
#include "pubsub.h"
#include "bh_util.h"
#include "defs.h"
-#include <chrono>
namespace bhome_shm
{
@@ -28,158 +27,110 @@
using namespace bhome_msg;
BusManager::BusManager(SharedMemory &shm) :
- shm_(shm),
- busq_(kBHBusQueueId, shm, 16),
- run_(false)
-{
-}
-
-BusManager::~BusManager()
-{
- Stop();
-}
+ shm_(shm), socket_(ShmSocket::eSockBus, shm) {}
+BusManager::BusManager() :
+ BusManager(BHomeShm()) {}
bool BusManager::Start(const int nworker)
{
- std::lock_guard<std::mutex> guard(mutex_);
- StopNoLock();
- // start
- auto Worker = [&]() {
- while (this->run_) {
- BusManager &self = *this;
- MsgI msg;
- const int timeout_ms = 100;
- if (self.busq_.Recv(msg, timeout_ms)) {
- self.OnMsg(msg);
- }
- }
- };
-
- run_.store(true);
- const int n = std::min(nworker, kMaxWorker);
- for (int i = 0; i < n; ++i) {
- workers_.emplace_back(Worker);
- }
- return true;
-}
-
-bool BusManager::Stop()
-{
- std::lock_guard<std::mutex> guard(mutex_);
- return StopNoLock();
-}
-
-bool BusManager::StopNoLock()
-{
- if (run_.exchange(false)) {
- for (auto &w : workers_) {
- if (w.joinable()) {
- w.join();
- }
- }
- return true;
- }
- return false;
-}
-
-void BusManager::OnMsg(MsgI &imsg)
-{
- DEFER1(imsg.Release(shm_));
-
- BHMsg msg;
- if (!imsg.Unpack(msg)) {
- return;
- }
-
- auto OnSubChange = [&](auto &&update) {
- DataSub sub;
- if (!msg.route().empty() && sub.ParseFromString(msg.body()) && !sub.topics().empty()) {
- assert(sizeof(MQId) == msg.route(0).mq_id().size());
- MQId client;
- memcpy(&client, msg.route(0).mq_id().data(), sizeof(client));
-
- std::lock_guard<std::mutex> guard(mutex_);
- auto &topics = sub.topics();
- for (auto &topic : topics) {
- try {
- update(topic, client);
- } catch (...) {
- //TODO log error
- }
- }
- }
- };
-
- auto Sub1 = [this](const std::string &topic, const MQId &id) {
- records_[topic].insert(id);
- };
-
- auto Unsub1 = [this](const std::string &topic, const MQId &id) {
- auto pos = records_.find(topic);
- if (pos != records_.end()) {
- if (pos->second.erase(id) && pos->second.empty()) {
- records_.erase(pos);
- }
- }
- };
-
- auto OnPublish = [&]() {
- DataPub pub;
- if (!pub.ParseFromString(msg.body())) {
+ auto onRecv = [&](MsgI &imsg) {
+ BHMsg msg;
+ if (!imsg.Unpack(msg)) {
return;
}
- auto FindClients = [&](const std::string &topic) {
- Clients dests;
- std::lock_guard<std::mutex> guard(mutex_);
- auto Find1 = [&](const std::string &t) {
- auto pos = records_.find(topic);
- if (pos != records_.end() && !pos->second.empty()) {
- auto &clients = pos->second;
- for (auto &cli : clients) {
- dests.insert(cli);
+
+ auto OnSubChange = [&](auto &&update) {
+ DataSub sub;
+ if (!msg.route().empty() && sub.ParseFromString(msg.body()) && !sub.topics().empty()) {
+ assert(sizeof(MQId) == msg.route(0).mq_id().size());
+ MQId client;
+ memcpy(&client, msg.route(0).mq_id().data(), sizeof(client));
+
+ std::lock_guard<std::mutex> guard(mutex_);
+ auto &topics = sub.topics();
+ for (auto &topic : topics) {
+ try {
+ update(topic, client);
+ } catch (...) {
+ //TODO log error
}
}
- };
- Find1(topic);
+ }
+ };
- //TODO check and adjust topic on client side sub/pub.
- size_t pos = 0;
- while (true) {
- pos = topic.find(kTopicSep, pos);
- if (pos == topic.npos || ++pos == topic.size()) {
- // Find1(std::string()); // sub all.
- break;
- } else {
- Find1(topic.substr(0, pos));
+ auto Sub1 = [this](const std::string &topic, const MQId &id) {
+ records_[topic].insert(id);
+ };
+
+ auto Unsub1 = [this](const std::string &topic, const MQId &id) {
+ auto pos = records_.find(topic);
+ if (pos != records_.end()) {
+ if (pos->second.erase(id) && pos->second.empty()) {
+ records_.erase(pos);
}
}
- return dests;
};
- auto Dispatch = [&](auto &&send1) {
- const Clients &clients(FindClients(pub.topic()));
- for (auto &cli : clients) {
- send1(cli);
+ auto OnPublish = [&]() {
+ DataPub pub;
+ if (!pub.ParseFromString(msg.body())) {
+ return;
+ }
+ auto FindClients = [&](const std::string &topic) {
+ Clients dests;
+ std::lock_guard<std::mutex> guard(mutex_);
+ auto Find1 = [&](const std::string &t) {
+ auto pos = records_.find(topic);
+ if (pos != records_.end() && !pos->second.empty()) {
+ auto &clients = pos->second;
+ for (auto &cli : clients) {
+ dests.insert(cli);
+ }
+ }
+ };
+ Find1(topic);
+
+ //TODO check and adjust topic on client side sub/pub.
+ size_t pos = 0;
+ while (true) {
+ pos = topic.find(kTopicSep, pos);
+ if (pos == topic.npos || ++pos == topic.size()) {
+ // Find1(std::string()); // sub all.
+ break;
+ } else {
+ Find1(topic.substr(0, pos));
+ }
+ }
+ return dests;
+ };
+
+ auto Dispatch = [&](auto &&send1) {
+ const Clients &clients(FindClients(pub.topic()));
+ for (auto &cli : clients) {
+ send1(cli);
+ }
+ };
+
+ if (imsg.IsCounted()) {
+ Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm_, cli, imsg, 100); });
+ } else {
+ MsgI pubmsg;
+ if (!pubmsg.MakeRC(shm_, msg)) { return; }
+ DEFER1(pubmsg.Release(shm_));
+
+ Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm_, cli, pubmsg, 100); });
}
};
- if (imsg.IsCounted()) {
- Dispatch([&](const MQId &cli) { busq_.Send(cli, imsg, 100); });
- } else {
- MsgI pubmsg;
- if (!pubmsg.MakeRC(shm_, msg)) { return; }
- DEFER1(pubmsg.Release(shm_));
-
- Dispatch([&](const MQId &cli) { busq_.Send(cli, pubmsg, 100); });
+ switch (msg.type()) {
+ case kMsgTypeSubscribe: OnSubChange(Sub1); break;
+ case kMsgTypeUnsubscribe: OnSubChange(Unsub1); break;
+ case kMsgTypePublish: OnPublish(); break;
+ default: break;
}
};
- switch (msg.type()) {
- case kMsgTypeSubscribe: OnSubChange(Sub1); break;
- case kMsgTypeUnsubscribe: OnSubChange(Unsub1); break;
- case kMsgTypePublish: OnPublish(); break;
- default: break;
- }
+ return socket_.StartRaw(onRecv, std::min(nworker, kMaxWorker));
}
} // namespace bhome_shm
diff --git a/src/pubsub.h b/src/pubsub.h
index dc3fced..be6521f 100644
--- a/src/pubsub.h
+++ b/src/pubsub.h
@@ -18,13 +18,10 @@
#ifndef PUBSUB_4KGRA997
#define PUBSUB_4KGRA997
-#include "shm_queue.h"
-#include <atomic>
+#include "socket.h"
#include <mutex>
#include <set>
-#include <thread>
#include <unordered_map>
-#include <vector>
namespace bhome_shm
{
@@ -33,21 +30,17 @@
class BusManager
{
SharedMemory &shm_;
- ShmMsgQueue busq_;
- std::atomic<bool> run_;
- std::vector<std::thread> workers_;
+ ShmSocket socket_;
std::mutex mutex_;
typedef std::set<MQId> Clients;
std::unordered_map<std::string, Clients> records_;
- bool StopNoLock();
- void OnMsg(MsgI &msg);
-
public:
BusManager(SharedMemory &shm);
- ~BusManager();
+ BusManager();
+ ~BusManager() { Stop(); }
bool Start(const int nworker = 2);
- bool Stop();
+ bool Stop() { return socket_.Stop(); }
};
} // namespace bhome_shm
diff --git a/src/socket.cpp b/src/socket.cpp
index 21928b8..5eb6756 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -20,58 +20,30 @@
#include "bh_util.h"
#include "defs.h"
#include "msg.h"
-#include <chrono>
using namespace bhome_msg;
using namespace bhome_shm;
-using namespace std::chrono_literals;
namespace
{
-
-int GetSocketDefaultLen(ShmSocket::Type type)
-{
- switch (type) {
- case ShmSocket::eSockRequest: return 12;
- case ShmSocket::eSockReply: return 64;
- case ShmSocket::eSockPublish: return 0;
- case ShmSocket::eSockSubscribe: return 64;
- default: return 0;
- }
-}
} // namespace
ShmSocket::ShmSocket(Type type, bhome_shm::SharedMemory &shm) :
shm_(shm), type_(type), run_(false)
{
- int len = GetSocketDefaultLen(type);
- if (len != 0) {
- mq_.reset(new Queue(shm_, len));
-
- auto RecvProc = [this]() {
- while (run_) {
- try {
- std::unique_lock<std::mutex> lk(mutex_);
- if (cv_recv_cb_.wait_for(lk, 100ms, [this]() { return HasRecvCB(); })) {
- BHMsg msg;
- if (mq_->Recv(msg, 100)) {
- this->onRecv_(msg);
- }
- }
- } catch (...) {
- }
- }
- };
- run_.store(true);
- workers_.emplace_back(RecvProc);
+ switch (type) {
+ case eSockBus: mq_.reset(new Queue(kBHBusQueueId, shm_, 1000)); break;
+ case eSockRequest: mq_.reset(new Queue(shm_, 12)); break;
+ case eSockReply: mq_.reset(new Queue(shm_, 64)); break;
+ case eSockSubscribe: mq_.reset(new Queue(shm_, 64)); break;
+ case eSockPublish: break; // no recv mq needed
+ default: break;
}
}
ShmSocket::ShmSocket(Type type) :
- ShmSocket(type, BHomeShm())
-{
-}
+ ShmSocket(type, BHomeShm()) {}
ShmSocket::~ShmSocket()
{
@@ -110,25 +82,63 @@
}
}
-bool ShmSocket::SetRecvCallback(const RecvCB &onRecv)
+bool ShmSocket::StartRaw(const RecvRawCB &onData, int nworker)
{
+ auto CanRecv = [this]() {
+ switch (type_) {
+ case eSockRequest:
+ case eSockReply:
+ case eSockBus:
+ case eSockSubscribe:
+ return true;
+ default:
+ return false;
+ }
+ };
+ if (!CanRecv()) {
+ return false;
+ }
std::lock_guard<std::mutex> lock(mutex_);
- onRecv_ = onRecv;
- cv_recv_cb_.notify_one();
+
+ StopNoLock();
+ auto RecvProc = [this, onData]() {
+ while (run_) {
+ try {
+ MsgI imsg;
+ DEFER1(imsg.Release(shm_));
+ if (mq_->Recv(imsg, 100)) { onData(imsg); }
+ } catch (...) {
+ }
+ }
+ };
+
+ run_.store(true);
+ for (int i = 0; i < nworker; ++i) {
+ workers_.emplace_back(RecvProc);
+ }
return true;
}
-bool ShmSocket::HasRecvCB()
+bool ShmSocket::Start(const RecvCB &onData, int nworker)
{
- return static_cast<bool>(onRecv_);
+ return StartRaw([this, onData](MsgI &imsg) { BHMsg m; if (imsg.Unpack(m)) { onData(m); } }, nworker);
}
-void ShmSocket::Stop()
+bool ShmSocket::Stop()
{
- run_ = false;
- for (auto &t : workers_) {
- if (t.joinable()) {
- t.join();
+ std::lock_guard<std::mutex> lock(mutex_);
+ return StopNoLock();
+}
+
+bool ShmSocket::StopNoLock()
+{
+ if (run_.exchange(false)) {
+ for (auto &w : workers_) {
+ if (w.joinable()) {
+ w.join();
+ }
}
+ return true;
}
-}
\ No newline at end of file
+ return false;
+}
diff --git a/src/socket.h b/src/socket.h
index 92c1b73..b94eca2 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -21,14 +21,14 @@
#include "shm_queue.h"
#include <atomic>
-#include <condition_variable>
+#include <boost/noncopyable.hpp>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
-class ShmSocket
+class ShmSocket : private boost::noncopyable
{
typedef bhome_shm::ShmMsgQueue Queue;
@@ -38,13 +38,14 @@
eSockReply,
eSockSubscribe,
eSockPublish,
+ eSockBus,
};
typedef std::function<void(bhome_msg::BHMsg &msg)> RecvCB;
+ typedef std::function<void(bhome_msg::MsgI &imsg)> RecvRawCB;
- ShmSocket(Type type);
ShmSocket(Type type, bhome_shm::SharedMemory &shm);
+ ShmSocket(Type type);
~ShmSocket();
-
// bool Request(const std::string &topic, const void *data, const size_t size, onReply);
bool RequestAndWait() { return false; } // call Request, and wait onReply notify cv
@@ -55,19 +56,19 @@
bool Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms);
bool Subscribe(const std::vector<std::string> &topics, const int timeout_ms);
bool RecvSub(std::string &topic, std::string &data, const int timeout_ms);
- bool SetRecvCallback(const RecvCB &onRecv);
+
+ // start recv.
+ bool Start(const RecvCB &onData, int nworker = 1);
+ bool StartRaw(const RecvRawCB &onData, int nworker = 1);
+ bool Stop();
private:
- bool HasRecvCB();
- void Stop();
-
+ bool StopNoLock();
bhome_shm::SharedMemory &shm_;
- Type type_;
+ const Type type_;
std::vector<std::thread> workers_;
std::mutex mutex_;
- std::condition_variable cv_recv_cb_;
std::atomic<bool> run_;
- RecvCB onRecv_;
std::unique_ptr<Queue> mq_;
};
diff --git a/utest/utest.cpp b/utest/utest.cpp
index 473b04e..e24d34a 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -5,6 +5,7 @@
#include <atomic>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
+#include <condition_variable>
#include <stdio.h>
#include <string>
#include <thread>
@@ -108,7 +109,7 @@
}
// printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str());
};
- client.SetRecvCallback(OnRecv);
+ client.Start(OnRecv);
std::unique_lock<std::mutex> lk(mutex);
cv.wait(lk);
--
Gitblit v1.8.0