| | |
| | | Stop(); |
| | | } |
| | | |
| | | bool ShmSocket::Start(const RawRecvCB &onData, const IdleCB &onIdle, int nworker) |
| | | { |
| | | auto ioProc = [this, onData, onIdle]() { |
| | | auto DoSend = [this]() { return send_buffer_.TrySend(mq()); }; |
| | | auto DoRecv = [=] { |
| | | // do not recv if no cb is set. |
| | | if (!onData) { |
| | | return false; |
| | | } |
| | | auto onMsg = [&](MsgI &imsg) { |
| | | DEFER1(imsg.Release()); |
| | | onData(*this, imsg); |
| | | }; |
| | | MsgI imsg; |
| | | return mq().TryRecv(imsg) ? (onMsg(imsg), true) : false; |
| | | }; |
| | | |
| | | try { |
| | | bool more_to_send = DoSend(); |
| | | bool more_to_recv = DoRecv(); |
| | | if (onIdle) { onIdle(*this); } |
| | | if (!more_to_send && !more_to_recv) { |
| | | robust::QuickSleep(); |
| | | } |
| | | } catch (...) { |
| | | } |
| | | }; |
| | | |
| | | std::lock_guard<std::mutex> lock(mutex_); |
| | | StopNoLock(); |
| | | |
| | | run_.store(true); |
| | | for (int i = 0; i < nworker; ++i) { |
| | | workers_.emplace_back([this, ioProc]() { while (run_) { ioProc(); } }); |
| | | } |
| | | return true; |
| | | } |
| | | |
| | | bool ShmSocket::Start(int nworker, const RecvCB &onData, const IdleCB &onIdle) |
| | | { |
| | | auto ioProc = [this, onData, onIdle]() { |
| | |
| | | bool more_to_recv = DoRecv(); |
| | | if (onIdle) { onIdle(*this); } |
| | | if (!more_to_send && !more_to_recv) { |
| | | std::this_thread::yield(); |
| | | using namespace std::chrono_literals; |
| | | std::this_thread::sleep_for(10000ns); |
| | | robust::QuickSleep(); |
| | | } |
| | | } catch (...) { |
| | | } |