| | |
| | | |
| | | #include <boost/uuid/uuid.hpp> |
| | | #include <boost/uuid/uuid_generators.hpp> |
| | | #include <string> |
| | | |
| | | typedef boost::uuids::uuid MQId; |
| | | |
| | |
| | | } // namespace bhome_shm |
| | | |
| | | bhome_shm::SharedMemory &BHomeShm(); |
| | | typedef std::string Topic; |
| | | |
| | | // TODO: the center can check shm for a previous crash. |
| | | |
| | |
| | | using namespace std::chrono_literals; |
| | | using namespace bhome_msg; |
| | | |
| | | bool SocketPublish::Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms) |
| | | bool SocketPublish::Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms) |
| | | { |
| | | try { |
| | | MsgI imsg; |
| | |
| | | } |
| | | } |
| | | |
| | | bool SocketSubscribe::Subscribe(const std::vector<std::string> &topics, const int timeout_ms) |
| | | bool SocketSubscribe::Subscribe(const std::vector<Topic> &topics, const int timeout_ms) |
| | | { |
| | | try { |
| | | return mq().Send(kBHTopicBus, MakeSub(mq().Id(), topics), timeout_ms); |
| | |
| | | return tdcb && Start(AsyncRecvProc, nworker); |
| | | } |
| | | |
| | | bool SocketSubscribe::RecvSub(std::string &topic, std::string &data, const int timeout_ms) |
| | | bool SocketSubscribe::RecvSub(Topic &topic, std::string &data, const int timeout_ms) |
| | | { |
| | | BHMsg msg; |
| | | if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypePublish) { |
| | |
| | | shm_(shm) {} |
| | | SocketPublish() : |
| | | SocketPublish(BHomeShm()) {} |
| | | bool Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms); |
| | | bool Publish(const std::string &topic, const std::string &data, const int timeout_ms) |
| | | bool Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms); |
// Convenience overload for a payload held in a std::string: passes
// data.data() and data.size() to the pointer/size Publish overload and
// returns that call's result unchanged.
| | | bool Publish(const Topic &topic, const std::string &data, const int timeout_ms) |
| | | { |
| | | return Publish(topic, data.data(), data.size(), timeout_ms); |
| | | } |
| | |
| | | SocketSubscribe(BHomeShm()) {} |
// Destructor: calls Stop() before the object goes away; the return value is ignored.
| | | ~SocketSubscribe() { Stop(); } |
| | | |
| | | typedef std::function<void(const std::string &topic, const std::string &data)> TopicDataCB; |
| | | typedef std::function<void(const Topic &topic, const std::string &data)> TopicDataCB; |
| | | bool StartRecv(const TopicDataCB &tdcb, int nworker = 2); |
// Delegates to Socket::Stop() and returns its result.
| | | bool Stop() { return Socket::Stop(); } |
| | | bool Subscribe(const std::vector<std::string> &topics, const int timeout_ms); |
| | | bool RecvSub(std::string &topic, std::string &data, const int timeout_ms); |
| | | bool Subscribe(const std::vector<Topic> &topics, const int timeout_ms); |
| | | bool RecvSub(Topic &topic, std::string &data, const int timeout_ms); |
| | | }; |
| | | |
| | | #endif // end of include guard: PUBSUB_4KGRA997 |
| | |
| | | }; |
| | | SocketBus socket_; |
// Accessor: returns the Shm reference obtained from socket_.shm().
| | | ShmSocket::Shm &shm() { return socket_.shm(); } |
| | | |
| | | std::mutex mutex_; |
| | | typedef std::set<MQId> Clients; |
| | | std::unordered_map<std::string, Clients> records_; |
| | | std::unordered_map<Topic, Clients> records_; |
| | | bool Find1(const Topic &topic); |
| | | |
| | | public: |
| | | PubSubCenter(ShmSocket::Shm &shm) : |
| | |
| | | return Start(AsyncRecvProc, nworker); |
| | | } |
| | | |
| | | bool SocketRequest::AsyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb) |
| | | bool SocketRequest::AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb) |
| | | { |
| | | auto Call = [&](const void *remote) { |
| | | const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size)); |
| | |
| | | } |
| | | } |
| | | |
| | | bool SocketRequest::SyncRequest(const std::string &topic, const void *data, const size_t size, std::string &out, const int timeout_ms) |
| | | bool SocketRequest::SyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms) |
| | | { |
| | | try { |
| | | BHAddress addr; |
| | |
| | | } |
| | | } |
| | | |
| | | bool SocketRequest::QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms) |
| | | bool SocketRequest::QueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms) |
| | | { |
| | | if (topic_cache_.Find(topic, addr)) { |
| | | return true; |
| | |
| | | bool StartWorker(const RequestResultCB &rrcb, int nworker = 2); |
// Overload without a result callback: forwards to the two-argument StartWorker
// with a default-constructed RequestResultCB and the given nworker (default 2).
| | | bool StartWorker(int nworker = 2) { return StartWorker(RequestResultCB(), nworker); } |
// Delegates to Socket::Stop() and returns its result.
| | | bool Stop() { return Socket::Stop(); } |
| | | bool AsyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb); |
| | | bool AsyncRequest(const std::string &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb) |
| | | bool AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb); |
// Convenience overload for a std::string payload: forwards data.data() and
// data.size(), together with topic, timeout_ms and rrcb, to the pointer/size
// AsyncRequest overload and returns its result.
| | | bool AsyncRequest(const Topic &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb) |
| | | { |
| | | return AsyncRequest(topic, data.data(), data.size(), timeout_ms, rrcb); |
| | | } |
| | | bool SyncRequest(const std::string &topic, const void *data, const size_t size, std::string &out, const int timeout_ms); |
| | | bool SyncRequest(const std::string &topic, const std::string &data, std::string &out, const int timeout_ms) |
| | | bool SyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms); |
// Convenience overload for a std::string payload: forwards data.data() and
// data.size(), together with topic, out and timeout_ms, to the pointer/size
// SyncRequest overload and returns its result.
| | | bool SyncRequest(const Topic &topic, const std::string &data, std::string &out, const int timeout_ms) |
| | | { |
| | | return SyncRequest(topic, data.data(), data.size(), out, timeout_ms); |
| | | } |
| | |
| | | private: |
| | | bool AsyncSend(const void *remote, const void *msg, const int timeout_ms, const RecvCB &cb); |
| | | bool SyncSendAndRecv(const void *remote, const void *msg, void *result, const int timeout_ms); |
| | | bool QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms); |
| | | bool QueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms); |
| | | std::unordered_map<std::string, RecvCB> async_cbs_; |
| | | |
| | | typedef bhome_msg::BHAddress Address; |
| | |
| | | { |
| | | class Impl |
| | | { |
| | | typedef std::unordered_map<std::string, Address> Store; |
| | | typedef std::unordered_map<Topic, Address> Store; |
| | | Store store_; |
| | | |
| | | public: |
| | | bool Find(const std::string &topic, Address &addr) |
| | | bool Find(const Topic &topic, Address &addr) |
| | | { |
| | | auto pos = store_.find(topic); |
| | | if (pos != store_.end()) { |
| | |
| | | return false; |
| | | } |
| | | } |
| | | bool Update(const std::string &topic, const Address &addr) |
| | | bool Update(const Topic &topic, const Address &addr) |
| | | { |
| | | store_[topic] = addr; |
| | | return true; |
| | |
| | | // } |
| | | |
| | | public: |
| | | bool Find(const std::string &topic, Address &addr) { return impl_->Find(topic, addr); } |
| | | bool Update(const std::string &topic, const Address &addr) { return impl_->Update(topic, addr); } |
// Forwards the lookup of `topic` to impl_->Find and returns its result.
| | | bool Find(const Topic &topic, Address &addr) { return impl_->Find(topic, addr); } |
// Forwards `topic` and `addr` to impl_->Update and returns its result.
| | | bool Update(const Topic &topic, const Address &addr) { return impl_->Update(topic, addr); } |
| | | }; |
| | | TopicCache topic_cache_; |
| | | }; |
| | |
| | | } |
| | | } |
| | | } |
| | | bool QueryTopic(const std::string &topic, ProcAddr &addr) |
| | | bool QueryTopic(const Topic &topic, ProcAddr &addr) |
| | | { |
| | | auto pos = topic_map_.find(topic); |
| | | if (pos != topic_map_.end()) { |
| | |
| | | }; |
| | | typedef std::shared_ptr<NodeInfo> Node; |
| | | typedef std::weak_ptr<NodeInfo> WeakNode; |
| | | std::unordered_map<std::string, WeakNode> topic_map_; |
| | | std::unordered_map<Topic, WeakNode> topic_map_; |
| | | std::unordered_map<ProcId, Node> nodes_; |
| | | }; |
| | | } // namespace |
| | |
| | | |
| | | BOOST_AUTO_TEST_CASE(Temp) |
| | | { |
| | | std::string topics[] = { |
| | | Topic topics[] = { |
| | | "", |
| | | ".", |
| | | "a", |
| | |
| | | } |
| | | }; |
| | | ThreadManager threads; |
| | | typedef std::vector<std::string> Topics; |
| | | typedef std::vector<Topic> Topics; |
| | | Topics topics; |
| | | for (int i = 0; i < 100; ++i) { |
| | | topics.push_back("t" + std::to_string(i)); |
| | |
| | | } |
| | | }; |
| | | ThreadManager clients, servers; |
| | | std::vector<std::string> topics = {"topic1", "topic2"}; |
| | | std::vector<Topic> topics = {"topic1", "topic2"}; |
| | | servers.Launch(Server, "server", topics); |
| | | std::this_thread::sleep_for(100ms); |
| | | for (auto &t : topics) { |