| | |
| | | public: |
| | | // Queue identifier type, re-exported from ShmMsgQueue. |
| | | typedef ShmMsgQueue::MQId MQId; |
| | | // Short alias for bhome_shm::SharedMemory. |
| | | typedef bhome_shm::SharedMemory Shm; |
| | | // Receive callback that gets the socket and the message only (no BHMsgHead). |
| | | typedef std::function<void(ShmSocket &sock, MsgI &imsg)> RawRecvCB; |
| | | // Receive callback that additionally gets a BHMsgHead for the message. |
| | | typedef std::function<void(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> RecvCB; |
| | | // Same arguments as RecvCB but returns bool. The meaning of the result is not |
| | | // visible here (presumably "message was handled" -- TODO confirm). |
| | | typedef std::function<bool(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> PartialRecvCB; |
| | | // Callback that gets only the socket; judging by the name it is invoked when |
| | | // the socket is idle -- confirm against the implementation. |
| | | typedef std::function<void(ShmSocket &sock)> IdleCB; |
| | | |
| | | // Construction / destruction. Every constructor takes the shared memory `shm`. |
| | | // `id` and `len` are presumably the queue id and the queue length -- the |
| | | // definitions are not in this header, TODO confirm. |
| | | ShmSocket(Shm &shm, const MQId id, const int len); |
| | | // NOTE(review): by its name, `create_or_else_find` == true creates the queue and |
| | | // false looks up an existing one -- verify in the .cpp. |
| | | ShmSocket(Shm &shm, const bool create_or_else_find, const MQId id, const int len); |
| | | // No explicit id; `len` defaults to 12 when omitted. |
| | | ShmSocket(Shm &shm, const int len = 12); |
| | | ~ShmSocket(); |
| | | // Static removal helper: hands `shm` and `id` straight to Queue::Remove and |
| | | // returns whatever it reports. |
| | | static bool Remove(SharedMemory &shm, const MQId id) |
| | | { |
| | |     return Queue::Remove(shm, id); |
| | | } |
| | | // Instance form: forwards this socket's shm() and id() to the static Remove(). |
| | | bool Remove() |
| | | { |
| | |     return Remove(shm(), id()); |
| | | } |
| | | // Returns the id reported by this socket's queue (mq().Id()). |
| | | MQId id() const |
| | | { |
| | |     return mq().Id(); |
| | | } |
| | | // Start receiving. `nworker` defaults to 1 in both declarations below; it is |
| | | // presumably the number of receiving workers -- TODO confirm in the .cpp. |
| | | // Raw variant: onData receives the message without a BHMsgHead. |
| | | bool Start(const RawRecvCB &onData, const IdleCB &onIdle, int nworker = 1); |
| | | // Head-aware variant: onData and onIdle default to empty std::function objects. |
| | | bool Start(int nworker = 1, const RecvCB &onData = RecvCB(), const IdleCB &onIdle = IdleCB()); |
| | | // Convenience overload with callbacks first; identical to |
| | | // Start(nworker, onData, onIdle). |
| | | bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1) |
| | | { |
| | |     return Start(nworker, onData, onIdle); |
| | | } |
| | | // Convenience overload without an idle callback; identical to |
| | | // Start(nworker, onData). |
| | | bool Start(const RecvCB &onData, int nworker = 1) |
| | | { |
| | |     return Start(nworker, onData); |
| | | } |
| | |
| | | std::unique_lock<std::mutex> lk(st->mutex); |
| | | bool sendok = Send(remote, head, body, std::move(OnRecv)); |
| | | if (!sendok) { |
| | | printf("send timeout\n"); |
| | | LOG_DEBUG() << "send timeout"; |
| | | } |
| | | if (sendok && st->cv.wait_until(lk, endtime) == std::cv_status::no_timeout) { |
| | | return true; |
| | |
| | | // Appends (remote, rest...) to send_buffer_, perfectly forwarding the trailing |
| | | // arguments. Always returns true; no result of Append() is consulted. |
| | | // TODO send alloc request, and pack later, higher bit means alloc? |
| | | template <class... Rest> |
| | | bool SendImpl(const MQId remote, Rest &&...rest) |
| | | { |
| | |     send_buffer_.Append(remote, std::forward<Rest>(rest)...); |
| | |     return true; |
| | | } |
| | |
| | | // Atomic flag; by its name it tracks whether the socket is running -- confirm |
| | | // where it is set and cleared. |
| | | std::atomic<bool> run_; |
| | | |
| | | // The message queue object, held by value. |
| | | Queue mq_; |
| | | class AsyncCBs |
| | | template <class Key> |
| | | class CallbackRecords |
| | | { |
| | | std::unordered_map<std::string, RecvCB> store_; |
| | | std::unordered_map<Key, RecvCB> store_; |
| | | |
| | | public: |
| | | bool empty() const { return store_.empty(); } |
| | | bool Store(const std::string &id, RecvCB &&cb) { return store_.emplace(id, std::move(cb)).second; } |
| | | bool Pick(const std::string &id, RecvCB &cb) |
| | | bool Store(const Key &id, RecvCB &&cb) { return store_.emplace(id, std::move(cb)).second; } |
| | | bool Pick(const Key &id, RecvCB &cb) |
| | | { |
| | | auto pos = store_.find(id); |
| | | if (pos != store_.end()) { |
| | |
| | | } |
| | | }; |
| | | |
| | | Synced<AsyncCBs> per_msg_cbs_; |
| | | Synced<CallbackRecords<std::string>> per_msg_cbs_; |
| | | |
| | | // Buffer that SendImpl() appends (remote, args...) entries to. |
| | | SendQ send_buffer_; |
| | | // Disabled alternative: the same buffer wrapped in Synced<>. |
| | | // Synced<SendQ> send_buffer_; |
| | | }; |
| | | |
| | | #endif // end of include guard: SOCKET_GWTJHBPO |