lichao
2021-04-06 4deeafbd502dc3c57dab8ad6ca601a38a9e7f074
src/pubsub.h
@@ -18,38 +18,45 @@
#ifndef PUBSUB_4KGRA997
#define PUBSUB_4KGRA997
#include "shm_queue.h"
#include <atomic>
#include <mutex>
#include <set>
#include <thread>
#include <unordered_map>
#include <vector>
#include "defs.h"
#include "socket.h"
#include <string>
namespace bhome_shm
class SocketPublish
{
// publish/subcribe manager.
class BusManager
{
   SharedMemory &shm_;
   ShmMsgQueue busq_;
   std::atomic<bool> run_;
   std::vector<std::thread> workers_;
   std::mutex mutex_;
   typedef std::set<MQId> Clients;
   std::unordered_map<std::string, Clients> records_;
   bool StopNoLock();
   void OnMsg(MsgI &msg);
   typedef ShmSocket Socket;
   Socket::Shm &shm_;
   Socket::Shm &shm() { return shm_; }
public:
   BusManager(SharedMemory &shm);
   ~BusManager();
   bool Start(const int nworker = 2);
   bool Stop();
   SocketPublish(Socket::Shm &shm) :
       shm_(shm) {}
   SocketPublish() :
       SocketPublish(BHomeShm()) {}
   bool Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms);
   bool Publish(const Topic &topic, const std::string &data, const int timeout_ms)
   {
      return Publish(topic, data.data(), data.size(), timeout_ms);
   }
};
} // namespace bhome_shm
// socket subscribe
class SocketSubscribe : private ShmSocket
{
   typedef ShmSocket Socket;
public:
   SocketSubscribe(Socket::Shm &shm) :
       Socket(shm, 64) {}
   SocketSubscribe() :
       SocketSubscribe(BHomeShm()) {}
   ~SocketSubscribe() { Stop(); }
   typedef std::function<void(const Topic &topic, const std::string &data)> TopicDataCB;
   bool StartRecv(const TopicDataCB &tdcb, int nworker = 2);
   bool Stop() { return Socket::Stop(); }
   bool Subscribe(const std::vector<Topic> &topics, const int timeout_ms);
   bool RecvSub(Topic &topic, std::string &data, const int timeout_ms);
};
#endif // end of include guard: PUBSUB_4KGRA997