| | |
| | | #ifndef PUBSUB_4KGRA997 |
| | | #define PUBSUB_4KGRA997 |
| | | |
| | | #include "shm_queue.h" |
| | | #include <thread> |
| | | #include <atomic> |
| | | #include <mutex> |
| | | #include <vector> |
| | | #include <unordered_map> |
| | | #include <set> |
| | | #include "defs.h" |
| | | #include "socket.h" |
| | | #include <string> |
| | | |
| | | namespace bhome_shm { |
| | | |
| | | // publish/subcribe manager. |
| | | class BusManager |
| | | class SocketPublish |
| | | { |
| | | SharedMemory &shm_; |
| | | ShmMsgQueue busq_; |
| | | std::atomic<bool> run_; |
| | | std::vector<std::thread> workers_; |
| | | std::mutex mutex_; |
| | | typedef std::set<MQId> Clients; |
| | | std::unordered_map<std::string, Clients> records_; |
| | | typedef ShmSocket Socket; |
| | | Socket::Shm &shm_; |
| | | Socket::Shm &shm() { return shm_; } |
| | | |
| | | bool StopNoLock(); |
| | | void OnMsg(MsgI &msg); |
| | | public: |
| | | BusManager(SharedMemory &shm); |
| | | ~BusManager(); |
| | | bool Start(const int nworker = 2); |
| | | bool Stop(); |
| | | SocketPublish(Socket::Shm &shm) : |
| | | shm_(shm) {} |
| | | SocketPublish() : |
| | | SocketPublish(BHomeShm()) {} |
| | | bool Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms); |
| | | bool Publish(const Topic &topic, const std::string &data, const int timeout_ms) |
| | | { |
| | | return Publish(topic, data.data(), data.size(), timeout_ms); |
| | | } |
| | | }; |
| | | |
| | | // socket subscribe |
| | | class SocketSubscribe : private ShmSocket |
| | | { |
| | | typedef ShmSocket Socket; |
| | | |
| | | } // namespace bhome_shm |
| | | public: |
| | | SocketSubscribe(Socket::Shm &shm) : |
| | | Socket(shm, 64) {} |
| | | SocketSubscribe() : |
| | | SocketSubscribe(BHomeShm()) {} |
| | | ~SocketSubscribe() { Stop(); } |
| | | |
| | | typedef std::function<void(const Topic &topic, const std::string &data)> TopicDataCB; |
| | | bool StartRecv(const TopicDataCB &tdcb, int nworker = 2); |
| | | bool Stop() { return Socket::Stop(); } |
| | | bool Subscribe(const std::vector<Topic> &topics, const int timeout_ms); |
| | | bool RecvSub(Topic &topic, std::string &data, const int timeout_ms); |
| | | }; |
| | | |
| | | #endif // end of include guard: PUBSUB_4KGRA997 |