src/pubsub.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/pubsub.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/socket.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/socket.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
utest/utest.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
src/pubsub.cpp
@@ -18,7 +18,6 @@ #include "pubsub.h" #include "bh_util.h" #include "defs.h" #include <chrono> namespace bhome_shm { @@ -28,158 +27,110 @@ using namespace bhome_msg; BusManager::BusManager(SharedMemory &shm) : shm_(shm), busq_(kBHBusQueueId, shm, 16), run_(false) { } BusManager::~BusManager() { Stop(); } shm_(shm), socket_(ShmSocket::eSockBus, shm) {} BusManager::BusManager() : BusManager(BHomeShm()) {} bool BusManager::Start(const int nworker) { std::lock_guard<std::mutex> guard(mutex_); StopNoLock(); // start auto Worker = [&]() { while (this->run_) { BusManager &self = *this; MsgI msg; const int timeout_ms = 100; if (self.busq_.Recv(msg, timeout_ms)) { self.OnMsg(msg); } } }; run_.store(true); const int n = std::min(nworker, kMaxWorker); for (int i = 0; i < n; ++i) { workers_.emplace_back(Worker); } return true; } bool BusManager::Stop() { std::lock_guard<std::mutex> guard(mutex_); return StopNoLock(); } bool BusManager::StopNoLock() { if (run_.exchange(false)) { for (auto &w : workers_) { if (w.joinable()) { w.join(); } } return true; } return false; } void BusManager::OnMsg(MsgI &imsg) { DEFER1(imsg.Release(shm_)); BHMsg msg; if (!imsg.Unpack(msg)) { return; } auto OnSubChange = [&](auto &&update) { DataSub sub; if (!msg.route().empty() && sub.ParseFromString(msg.body()) && !sub.topics().empty()) { assert(sizeof(MQId) == msg.route(0).mq_id().size()); MQId client; memcpy(&client, msg.route(0).mq_id().data(), sizeof(client)); std::lock_guard<std::mutex> guard(mutex_); auto &topics = sub.topics(); for (auto &topic : topics) { try { update(topic, client); } catch (...) { //TODO log error } } } }; auto Sub1 = [this](const std::string &topic, const MQId &id) { records_[topic].insert(id); }; auto Unsub1 = [this](const std::string &topic, const MQId &id) { auto pos = records_.find(topic); if (pos != records_.end()) { if (pos->second.erase(id) && pos->second.empty()) { records_.erase(pos); } } }; auto OnPublish = [&]() { DataPub pub; if (!pub.ParseFromString(msg.body())) { auto onRecv = [&](MsgI &imsg) { BHMsg msg; if (!imsg.Unpack(msg)) { return; } auto FindClients = [&](const std::string &topic) { Clients dests; std::lock_guard<std::mutex> guard(mutex_); auto Find1 = [&](const std::string &t) { auto pos = records_.find(topic); if (pos != records_.end() && !pos->second.empty()) { auto &clients = pos->second; for (auto &cli : clients) { dests.insert(cli); auto OnSubChange = [&](auto &&update) { DataSub sub; if (!msg.route().empty() && sub.ParseFromString(msg.body()) && !sub.topics().empty()) { assert(sizeof(MQId) == msg.route(0).mq_id().size()); MQId client; memcpy(&client, msg.route(0).mq_id().data(), sizeof(client)); std::lock_guard<std::mutex> guard(mutex_); auto &topics = sub.topics(); for (auto &topic : topics) { try { update(topic, client); } catch (...) { //TODO log error } } }; Find1(topic); } }; //TODO check and adjust topic on client side sub/pub. size_t pos = 0; while (true) { pos = topic.find(kTopicSep, pos); if (pos == topic.npos || ++pos == topic.size()) { // Find1(std::string()); // sub all. break; } else { Find1(topic.substr(0, pos)); auto Sub1 = [this](const std::string &topic, const MQId &id) { records_[topic].insert(id); }; auto Unsub1 = [this](const std::string &topic, const MQId &id) { auto pos = records_.find(topic); if (pos != records_.end()) { if (pos->second.erase(id) && pos->second.empty()) { records_.erase(pos); } } return dests; }; auto Dispatch = [&](auto &&send1) { const Clients &clients(FindClients(pub.topic())); for (auto &cli : clients) { send1(cli); auto OnPublish = [&]() { DataPub pub; if (!pub.ParseFromString(msg.body())) { return; } auto FindClients = [&](const std::string &topic) { Clients dests; std::lock_guard<std::mutex> guard(mutex_); auto Find1 = [&](const std::string &t) { auto pos = records_.find(topic); if (pos != records_.end() && !pos->second.empty()) { auto &clients = pos->second; for (auto &cli : clients) { dests.insert(cli); } } }; Find1(topic); //TODO check and adjust topic on client side sub/pub. size_t pos = 0; while (true) { pos = topic.find(kTopicSep, pos); if (pos == topic.npos || ++pos == topic.size()) { // Find1(std::string()); // sub all. break; } else { Find1(topic.substr(0, pos)); } } return dests; }; auto Dispatch = [&](auto &&send1) { const Clients &clients(FindClients(pub.topic())); for (auto &cli : clients) { send1(cli); } }; if (imsg.IsCounted()) { Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm_, cli, imsg, 100); }); } else { MsgI pubmsg; if (!pubmsg.MakeRC(shm_, msg)) { return; } DEFER1(pubmsg.Release(shm_)); Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm_, cli, pubmsg, 100); }); } }; if (imsg.IsCounted()) { Dispatch([&](const MQId &cli) { busq_.Send(cli, imsg, 100); }); } else { MsgI pubmsg; if (!pubmsg.MakeRC(shm_, msg)) { return; } DEFER1(pubmsg.Release(shm_)); Dispatch([&](const MQId &cli) { busq_.Send(cli, pubmsg, 100); }); switch (msg.type()) { case kMsgTypeSubscribe: OnSubChange(Sub1); break; case kMsgTypeUnsubscribe: OnSubChange(Unsub1); break; case kMsgTypePublish: OnPublish(); break; default: break; } }; switch (msg.type()) { case kMsgTypeSubscribe: OnSubChange(Sub1); break; case kMsgTypeUnsubscribe: OnSubChange(Unsub1); break; case kMsgTypePublish: OnPublish(); break; default: break; } return socket_.StartRaw(onRecv, std::min(nworker, kMaxWorker)); } } // namespace bhome_shm src/pubsub.h
@@ -18,13 +18,10 @@ #ifndef PUBSUB_4KGRA997 #define PUBSUB_4KGRA997 #include "shm_queue.h" #include <atomic> #include "socket.h" #include <mutex> #include <set> #include <thread> #include <unordered_map> #include <vector> namespace bhome_shm { @@ -33,21 +30,17 @@ class BusManager { SharedMemory &shm_; ShmMsgQueue busq_; std::atomic<bool> run_; std::vector<std::thread> workers_; ShmSocket socket_; std::mutex mutex_; typedef std::set<MQId> Clients; std::unordered_map<std::string, Clients> records_; bool StopNoLock(); void OnMsg(MsgI &msg); public: BusManager(SharedMemory &shm); ~BusManager(); BusManager(); ~BusManager() { Stop(); } bool Start(const int nworker = 2); bool Stop(); bool Stop() { return socket_.Stop(); } }; } // namespace bhome_shm src/socket.cpp
@@ -20,58 +20,30 @@ #include "bh_util.h" #include "defs.h" #include "msg.h" #include <chrono> using namespace bhome_msg; using namespace bhome_shm; using namespace std::chrono_literals; namespace { int GetSocketDefaultLen(ShmSocket::Type type) { switch (type) { case ShmSocket::eSockRequest: return 12; case ShmSocket::eSockReply: return 64; case ShmSocket::eSockPublish: return 0; case ShmSocket::eSockSubscribe: return 64; default: return 0; } } } // namespace ShmSocket::ShmSocket(Type type, bhome_shm::SharedMemory &shm) : shm_(shm), type_(type), run_(false) { int len = GetSocketDefaultLen(type); if (len != 0) { mq_.reset(new Queue(shm_, len)); auto RecvProc = [this]() { while (run_) { try { std::unique_lock<std::mutex> lk(mutex_); if (cv_recv_cb_.wait_for(lk, 100ms, [this]() { return HasRecvCB(); })) { BHMsg msg; if (mq_->Recv(msg, 100)) { this->onRecv_(msg); } } } catch (...) { } } }; run_.store(true); workers_.emplace_back(RecvProc); switch (type) { case eSockBus: mq_.reset(new Queue(kBHBusQueueId, shm_, 1000)); break; case eSockRequest: mq_.reset(new Queue(shm_, 12)); break; case eSockReply: mq_.reset(new Queue(shm_, 64)); break; case eSockSubscribe: mq_.reset(new Queue(shm_, 64)); break; case eSockPublish: break; // no recv mq needed default: break; } } ShmSocket::ShmSocket(Type type) : ShmSocket(type, BHomeShm()) { } ShmSocket(type, BHomeShm()) {} ShmSocket::~ShmSocket() { @@ -110,25 +82,63 @@ } } bool ShmSocket::SetRecvCallback(const RecvCB &onRecv) bool ShmSocket::StartRaw(const RecvRawCB &onData, int nworker) { auto CanRecv = [this]() { switch (type_) { case eSockRequest: case eSockReply: case eSockBus: case eSockSubscribe: return true; default: return false; } }; if (!CanRecv()) { return false; } std::lock_guard<std::mutex> lock(mutex_); onRecv_ = onRecv; cv_recv_cb_.notify_one(); StopNoLock(); auto RecvProc = [this, onData]() { while (run_) { try { MsgI imsg; DEFER1(imsg.Release(shm_)); if (mq_->Recv(imsg, 100)) { onData(imsg); } } catch (...) { } } }; run_.store(true); for (int i = 0; i < nworker; ++i) { workers_.emplace_back(RecvProc); } return true; } bool ShmSocket::HasRecvCB() bool ShmSocket::Start(const RecvCB &onData, int nworker) { return static_cast<bool>(onRecv_); return StartRaw([this, onData](MsgI &imsg) { BHMsg m; if (imsg.Unpack(m)) { onData(m); } }, nworker); } void ShmSocket::Stop() bool ShmSocket::Stop() { run_ = false; for (auto &t : workers_) { if (t.joinable()) { t.join(); std::lock_guard<std::mutex> lock(mutex_); return StopNoLock(); } bool ShmSocket::StopNoLock() { if (run_.exchange(false)) { for (auto &w : workers_) { if (w.joinable()) { w.join(); } } return true; } } return false; } src/socket.h
@@ -21,14 +21,14 @@ #include "shm_queue.h" #include <atomic> #include <condition_variable> #include <boost/noncopyable.hpp> #include <functional> #include <memory> #include <mutex> #include <thread> #include <vector> class ShmSocket class ShmSocket : private boost::noncopyable { typedef bhome_shm::ShmMsgQueue Queue; @@ -38,13 +38,14 @@ eSockReply, eSockSubscribe, eSockPublish, eSockBus, }; typedef std::function<void(bhome_msg::BHMsg &msg)> RecvCB; typedef std::function<void(bhome_msg::MsgI &imsg)> RecvRawCB; ShmSocket(Type type); ShmSocket(Type type, bhome_shm::SharedMemory &shm); ShmSocket(Type type); ~ShmSocket(); // bool Request(const std::string &topic, const void *data, const size_t size, onReply); bool RequestAndWait() { return false; } // call Request, and wait onReply notify cv @@ -55,19 +56,19 @@ bool Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms); bool Subscribe(const std::vector<std::string> &topics, const int timeout_ms); bool RecvSub(std::string &topic, std::string &data, const int timeout_ms); bool SetRecvCallback(const RecvCB &onRecv); // start recv. bool Start(const RecvCB &onData, int nworker = 1); bool StartRaw(const RecvRawCB &onData, int nworker = 1); bool Stop(); private: bool HasRecvCB(); void Stop(); bool StopNoLock(); bhome_shm::SharedMemory &shm_; Type type_; const Type type_; std::vector<std::thread> workers_; std::mutex mutex_; std::condition_variable cv_recv_cb_; std::atomic<bool> run_; RecvCB onRecv_; std::unique_ptr<Queue> mq_; }; utest/utest.cpp
@@ -5,6 +5,7 @@ #include <atomic> #include <boost/uuid/uuid_generators.hpp> #include <boost/uuid/uuid_io.hpp> #include <condition_variable> #include <stdio.h> #include <string> #include <thread> @@ -108,7 +109,7 @@ } // printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str()); }; client.SetRecvCallback(OnRecv); client.Start(OnRecv); std::unique_lock<std::mutex> lk(mutex); cv.wait(lk);