| | |
| | | #ifndef SOCKET_GWTJHBPO |
| | | #define SOCKET_GWTJHBPO |
| | | |
| | | #include "bh_util.h" |
| | | #include "defs.h" |
| | | #include "shm_queue.h" |
| | | #include <atomic> |
| | |
| | | |
| | | class ShmSocket : private boost::noncopyable |
| | | { |
| | | template <class DoSend> |
| | | inline bool SendImpl(MsgI &msg, const int timeout_ms, const DoSend &doSend) |
| | | { |
| | | bool r = false; |
| | | DEFER1(if (msg.IsCounted() || !r) { msg.Release(shm()); }); |
| | | r = doSend(msg); |
| | | return r; |
| | | } |
| | | |
| | | protected: |
| | | typedef bhome_shm::ShmMsgQueue Queue; |
| | | |
| | |
| | | bool Stop(); |
| | | size_t Pending() const { return mq().Pending(); } |
| | | |
| | | bool Send(const void *id, const MsgI &imsg, const int timeout_ms) |
| | | bool Send(const void *valid_remote, const MsgI &imsg, const int timeout_ms) |
| | | { |
| | | return mq().Send(*static_cast<const MQId *>(id), imsg, timeout_ms); |
| | | assert(valid_remote); |
| | | return mq().Send(*static_cast<const MQId *>(valid_remote), imsg, timeout_ms); |
| | | } |
| | | //TODO reimplment, using async. |
| | | bool SyncRecv(MsgI &msg, bhome::msg::BHMsgHead &head, const int timeout_ms); |
| | | |
| | | template <class Body> |
| | | bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms, const RecvCB &cb = RecvCB()) |
| | | bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms, const RecvCB &cb) |
| | | { |
| | | assert(valid_remote); |
| | | try { |
| | | if (cb) { |
| | | auto RegisterCB = [&]() { |
| | | std::lock_guard<std::mutex> lock(mutex()); |
| | | async_cbs_.emplace(head.msg_id(), cb); |
| | | }; |
| | | return mq().Send(*static_cast<const MQId *>(valid_remote), head, body, timeout_ms, RegisterCB); |
| | | } else { |
| | | return mq().Send(*static_cast<const MQId *>(valid_remote), head, body, timeout_ms); |
| | | } |
| | | } catch (...) { |
| | | return false; |
| | | } |
| | | auto DoSend = [&](MsgI &msg) { return mq().Send(*static_cast<const MQId *>(valid_remote), msg, timeout_ms, [&]() { async_cbs_->Add(head.msg_id(), cb); }); }; |
| | | MsgI msg; |
| | | return msg.Make(shm(), head, body) && SendImpl(msg, timeout_ms, DoSend); |
| | | } |
| | | |
| | | template <class Body> |
| | | bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms) |
| | | { |
| | | auto DoSend = [&](MsgI &msg) { return mq().Send(*static_cast<const MQId *>(valid_remote), msg, timeout_ms); }; |
| | | MsgI msg; |
| | | return msg.Make(shm(), head, body) && SendImpl(msg, timeout_ms, DoSend); |
| | | } |
| | | |
| | | template <class Body> |
| | |
| | | std::atomic<bool> run_; |
| | | |
| | | Queue mq_; |
| | | std::unordered_map<std::string, RecvCB> async_cbs_; |
| | | class AsyncCBs |
| | | { |
| | | std::unordered_map<std::string, RecvCB> store_; |
| | | |
| | | public: |
| | | bool Add(const std::string &id, const RecvCB &cb) { return store_.emplace(id, cb).second; } |
| | | bool Find(const std::string &id, RecvCB &cb) |
| | | { |
| | | auto pos = store_.find(id); |
| | | if (pos != store_.end()) { |
| | | cb.swap(pos->second); |
| | | store_.erase(pos); |
| | | return true; |
| | | } else { |
| | | return false; |
| | | } |
| | | } |
| | | }; |
| | | |
| | | Synced<AsyncCBs> async_cbs_; |
| | | }; |
| | | |
| | | #endif // end of include guard: SOCKET_GWTJHBPO |