| | |
| | | #ifndef PUBSUB_4KGRA997 |
| | | #define PUBSUB_4KGRA997 |
| | | |
| | | #include "defs.h" |
| | | #include "socket.h" |
| | | #include <mutex> |
| | | #include <set> |
| | | #include <unordered_map> |
| | | #include <string> |
| | | |
| | | namespace bhome_shm |
| | | class SocketPublish |
| | | { |
| | | |
| | | // publish/subcribe manager. |
| | | class BusManager |
| | | { |
| | | SharedMemory &shm_; |
| | | ShmSocket socket_; |
| | | std::mutex mutex_; |
| | | typedef std::set<MQId> Clients; |
| | | std::unordered_map<std::string, Clients> records_; |
| | | typedef ShmSocket Socket; |
| | | Socket::Shm &shm_; |
| | | Socket::Shm &shm() { return shm_; } |
| | | |
| | | public: |
| | | BusManager(SharedMemory &shm); |
| | | BusManager(); |
| | | ~BusManager() { Stop(); } |
| | | bool Start(const int nworker = 2); |
| | | bool Stop() { return socket_.Stop(); } |
| | | SocketPublish(Socket::Shm &shm) : |
| | | shm_(shm) {} |
| | | SocketPublish() : |
| | | SocketPublish(BHomeShm()) {} |
| | | bool Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms); |
| | | bool Publish(const std::string &topic, const std::string &data, const int timeout_ms) |
| | | { |
| | | return Publish(topic, data.data(), data.size(), timeout_ms); |
| | | } |
| | | }; |
| | | |
| | | } // namespace bhome_shm |
| | | // socket subscribe |
| | | class SocketSubscribe : private ShmSocket |
| | | { |
| | | typedef ShmSocket Socket; |
| | | |
| | | public: |
| | | SocketSubscribe(Socket::Shm &shm) : |
| | | Socket(shm, 64) {} |
| | | SocketSubscribe() : |
| | | SocketSubscribe(BHomeShm()) {} |
| | | |
| | | typedef std::function<void(const std::string &topic, const std::string &data)> TopicDataCB; |
| | | bool StartRecv(const TopicDataCB &tdcb, int nworker = 2); |
| | | bool Subscribe(const std::vector<std::string> &topics, const int timeout_ms); |
| | | bool RecvSub(std::string &topic, std::string &data, const int timeout_ms); |
| | | }; |
| | | |
| | | #endif // end of include guard: PUBSUB_4KGRA997 |