| | |
| | | return false; |
| | | } |
| | | |
| | | // Receives a single int64 command from this socket's queue into `cmd`. |
| | | // A timeout_ms of exactly 0 goes through mq().TryRecv(cmd); any other value |
| | | // is forwarded to mq().Recv(cmd, timeout_ms). The result of the chosen |
| | | // queue call is returned unchanged. |
| | | bool ShmSocket::SyncRecv(int64_t &cmd, const int timeout_ms) |
| | | { |
| | |     if (timeout_ms == 0) { |
| | |         return mq().TryRecv(cmd); |
| | |     } |
| | |     return mq().Recv(cmd, timeout_ms); |
| | | } |
| | | // TODO: maybe reimplement this on top of the async callbacks? |
| | | // |
| | | // Receives one message into `msg` and parses its header into `head`. |
| | | // A timeout_ms of exactly 0 goes through mq().TryRecv(msg); any other value |
| | | // is forwarded to mq().Recv(msg, timeout_ms). |
| | | // Returns true only when a message was received AND msg.ParseHead(head) |
| | | // succeeded. A received message whose header fails to parse is released |
| | | // before false is returned. |
| | | bool ShmSocket::SyncRecv(bhome_msg::MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms) |
| | | { |
| | |     // mutex_ is intentionally not locked here; locking did not seem necessary. |
| | |     bool received; |
| | |     if (timeout_ms == 0) { |
| | |         received = mq().TryRecv(msg); |
| | |     } else { |
| | |         received = mq().Recv(msg, timeout_ms); |
| | |     } |
| | |     if (!received) { |
| | |         return false; |
| | |     } |
| | |     if (!msg.ParseHead(head)) { |
| | |         // Unparseable header: drop the message instead of handing it back. |
| | |         msg.Release(); |
| | |         return false; |
| | |     } |
| | |     return true; |
| | | } |
| | | |
// Sends `content` to `remote`.
// The payload size is read first, then `content` and `cb` are moved into the
// OnResult callback (msg_id and remote are copied, `this` is captured).
// With the `#if 0` branch compiled out, the active path hands OnResult to
// RequestAlloc(size, OnResult) ("center alloc") and returns its result.
// NOTE(review): OnResult captures `this`; whether RequestAlloc can invoke it
// after this socket is gone is not visible here -- confirm.
| | | bool ShmSocket::Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb) |
| | | { |
// Taken before the std::move(content) in the lambda capture below.
| | | size_t size = content.size(); |
| | | auto OnResult = [content = std::move(content), msg_id, remote, cb = std::move(cb), this](MsgI &msg) mutable { |
// NOTE(review): the next two rows repeat the same Fill check with different
// return forms (`return;` vs `return false;`). This looks like the old and new
// side of a diff both being present; the rest of the lambda returns bool, so
// only the `return false` form appears consistent -- confirm and drop one.
| | | if (!msg.Fill(content)) { return; } |
| | | if (!msg.Fill(content)) { return false; } |
| | | |
| | | try { |
| | | if (!cb) { |
// NOTE(review): lines are missing at this point (the `if (!cb)` body and the
// definition of onExpireRemoveCB used below are not visible in this chunk).
| | |
| | | }; |
| | | Send(remote, msg, onExpireRemoveCB); |
| | | } |
| | | return true; |
| | | } catch (...) { |
// Any exception is reported through SetLastError and turned into a false result.
| | | SetLastError(eError, "Send internal error."); |
| | | return false; |
| | | } |
| | | }; |
| | | |
| | | #if 0 |
| | | // self alloc |
| | | MsgI msg; |
| | | if (msg.Make(size)) { |
| | | DEFER1(msg.Release()); |
| | | return OnResult(msg); |
| | | } |
| | | #else |
| | | // center alloc |
| | | return RequestAlloc(size, OnResult); |
| | | #endif |
| | | } |
| | | |
| | | bool ShmSocket::RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult) |