From 4deeafbd502dc3c57dab8ad6ca601a38a9e7f074 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期二, 06 四月 2021 19:10:49 +0800
Subject: [PATCH] add uni center.

---
 src/pubsub.h |   58 +++++++++++++++++++++++++++++++++-------------------------
 1 files changed, 33 insertions(+), 25 deletions(-)

diff --git a/src/pubsub.h b/src/pubsub.h
index c1f98af..3c3d4ad 100644
--- a/src/pubsub.h
+++ b/src/pubsub.h
@@ -18,37 +18,45 @@
 #ifndef PUBSUB_4KGRA997
 #define PUBSUB_4KGRA997
 
-#include "shm_queue.h"
-#include <thread>
-#include <atomic>
-#include <mutex>
-#include <vector>
-#include <unordered_map>
-#include <set>
+#include "defs.h"
+#include "socket.h"
+#include <string>
 
-namespace bhome_shm {
-
-// publish/subcribe manager.
-class BusManager
+class SocketPublish
 {
-    SharedMemory &shm_;
-    ShmMsgQueue busq_;
-    std::atomic<bool> run_;
-    std::vector<std::thread> workers_;
-    std::mutex mutex_;
-    typedef std::set<MQId> Clients;
-    std::unordered_map<std::string, Clients> records_;
+	typedef ShmSocket Socket;
+	Socket::Shm &shm_;
+	Socket::Shm &shm() { return shm_; }
 
-    bool StopNoLock();
-    void OnMsg(const BHMsg &msg);
 public:
-    BusManager(SharedMemory &shm);
-    ~BusManager();
-    bool Start(const int nworker = 2);
-    bool Stop();
+	SocketPublish(Socket::Shm &shm) :
+	    shm_(shm) {}
+	SocketPublish() :
+	    SocketPublish(BHomeShm()) {}
+	bool Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms);
+	bool Publish(const Topic &topic, const std::string &data, const int timeout_ms)
+	{
+		return Publish(topic, data.data(), data.size(), timeout_ms);
+	}
 };
 
+// socket subscribe
+class SocketSubscribe : private ShmSocket
+{
+	typedef ShmSocket Socket;
 
-} // namespace bhome_shm
+public:
+	SocketSubscribe(Socket::Shm &shm) :
+	    Socket(shm, 64) {}
+	SocketSubscribe() :
+	    SocketSubscribe(BHomeShm()) {}
+	~SocketSubscribe() { Stop(); }
+
+	typedef std::function<void(const Topic &topic, const std::string &data)> TopicDataCB;
+	bool StartRecv(const TopicDataCB &tdcb, int nworker = 2);
+	bool Stop() { return Socket::Stop(); }
+	bool Subscribe(const std::vector<Topic> &topics, const int timeout_ms);
+	bool RecvSub(Topic &topic, std::string &data, const int timeout_ms);
+};
 
 #endif // end of include guard: PUBSUB_4KGRA997

--
Gitblit v1.8.0