From 4deeafbd502dc3c57dab8ad6ca601a38a9e7f074 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期二, 06 四月 2021 19:10:49 +0800 Subject: [PATCH] add uni center. --- src/pubsub.h | 58 +++++++++++++++++++++++++++++++++------------------------- 1 files changed, 33 insertions(+), 25 deletions(-) diff --git a/src/pubsub.h b/src/pubsub.h index c1f98af..3c3d4ad 100644 --- a/src/pubsub.h +++ b/src/pubsub.h @@ -18,37 +18,45 @@ #ifndef PUBSUB_4KGRA997 #define PUBSUB_4KGRA997 -#include "shm_queue.h" -#include <thread> -#include <atomic> -#include <mutex> -#include <vector> -#include <unordered_map> -#include <set> +#include "defs.h" +#include "socket.h" +#include <string> -namespace bhome_shm { - -// publish/subcribe manager. -class BusManager +class SocketPublish { - SharedMemory &shm_; - ShmMsgQueue busq_; - std::atomic<bool> run_; - std::vector<std::thread> workers_; - std::mutex mutex_; - typedef std::set<MQId> Clients; - std::unordered_map<std::string, Clients> records_; + typedef ShmSocket Socket; + Socket::Shm &shm_; + Socket::Shm &shm() { return shm_; } - bool StopNoLock(); - void OnMsg(const BHMsg &msg); public: - BusManager(SharedMemory &shm); - ~BusManager(); - bool Start(const int nworker = 2); - bool Stop(); + SocketPublish(Socket::Shm &shm) : + shm_(shm) {} + SocketPublish() : + SocketPublish(BHomeShm()) {} + bool Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms); + bool Publish(const Topic &topic, const std::string &data, const int timeout_ms) + { + return Publish(topic, data.data(), data.size(), timeout_ms); + } }; +// socket subscribe +class SocketSubscribe : private ShmSocket +{ + typedef ShmSocket Socket; -} // namespace bhome_shm +public: + SocketSubscribe(Socket::Shm &shm) : + Socket(shm, 64) {} + SocketSubscribe() : + SocketSubscribe(BHomeShm()) {} + ~SocketSubscribe() { Stop(); } + + typedef std::function<void(const Topic &topic, const std::string &data)> TopicDataCB; + bool StartRecv(const TopicDataCB &tdcb, int nworker = 2); + bool Stop() { return Socket::Stop(); } + bool Subscribe(const std::vector<Topic> &topics, const int timeout_ms); + bool RecvSub(Topic &topic, std::string &data, const int timeout_ms); +}; #endif // end of include guard: PUBSUB_4KGRA997 -- Gitblit v1.8.0