/*
 * =====================================================================================
 *
 *       Filename:  socket.cpp
 *
 *    Description:  ShmSocket implementation: worker threads that pump a shared-memory
 *                  message queue (send buffer flush, receive dispatch, idle callback).
 *
 *        Version:  1.0
 *        Created:  2021-03-30 15:48:58
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (),
 *   Organization:
 *
 * =====================================================================================
 */
#include "socket.h"
#include "bh_util.h"
#include "defs.h"
#include "msg.h"

using namespace bhome_msg;
using namespace bhome_shm;

namespace
{

/**
 * @brief Run one send / receive / idle step on behalf of a worker.
 *
 * Calls @p trySend, then @p tryRecv, then @p onIdle (if set). When neither the
 * send nor the receive step reported progress, robust::QuickSleep() is called.
 * Any exception thrown by a step is swallowed so that it never escapes the
 * worker loop; the remaining steps of that cycle are skipped.
 *
 * @param socket   Socket passed to the idle callback.
 * @param trySend  Callable returning a bool-convertible "did work" flag.
 * @param tryRecv  Callable returning a bool-convertible "did work" flag.
 * @param onIdle   Bool-testable callable invoked as onIdle(socket) every cycle.
 */
template <class SendFn, class RecvFn, class IdleFn>
void RunIoCycle(ShmSocket &socket, const SendFn &trySend, const RecvFn &tryRecv, const IdleFn &onIdle)
{
    try {
        const bool sent = trySend();
        const bool received = tryRecv();
        if (onIdle) {
            onIdle(socket);
        }
        if (!sent && !received) {
            robust::QuickSleep();
        }
    } catch (...) {
        // Deliberately ignored: a failing step must not terminate the worker.
    }
}

} // namespace

/// Construct on the queue identified by @p id and immediately call Start().
ShmSocket::ShmSocket(Shm &shm, const MQId id, const int len) :
    run_(false), mq_(id, shm, len)
{
    Start();
}

/// Construct on a queue built from @p shm and @p len, then immediately call Start().
ShmSocket::ShmSocket(bhome_shm::SharedMemory &shm, const int len) :
    run_(false), mq_(shm, len)
{
    Start();
}

/// Stops the workers (see Stop()) before the socket goes away.
ShmSocket::~ShmSocket()
{
    Stop();
}

/**
 * @brief (Re)start the workers, handing every received message to @p onData as-is.
 *
 * Any previously running workers are stopped first. Each worker repeatedly
 * flushes the send buffer, tries to receive one message and calls @p onIdle.
 *
 * @param onData   Called as onData(*this, msg) for each received message. If it
 *                 is empty, nothing is received from the queue at all.
 * @param onIdle   Optional; called once per worker cycle.
 * @param nworker  Number of workers to spawn.
 * @return Always true.
 */
bool ShmSocket::Start(const RawRecvCB &onData, const IdleCB &onIdle, int nworker)
{
    auto cycle = [this, onData, onIdle]() {
        auto flushSend = [this]() { return send_buffer_.TrySend(mq()); };

        auto recvOne = [this, &onData]() -> bool {
            // Do not receive if no callback is set.
            if (!onData) {
                return false;
            }
            MsgI msg;
            if (!mq().TryRecv(msg)) {
                return false;
            }
            // Release is registered before the callback runs, so it also
            // happens when the callback throws (assuming DEFER1 is a scope guard).
            DEFER1(msg.Release());
            onData(*this, msg);
            return true;
        };

        RunIoCycle(*this, flushSend, recvOne, onIdle);
    };

    std::lock_guard guard(mutex_);
    StopNoLock();
    run_.store(true);
    for (int n = 0; n < nworker; ++n) {
        workers_.emplace_back([this, cycle]() {
            while (run_.load()) {
                cycle();
            }
        });
    }
    return true;
}

/**
 * @brief (Re)start the workers, parsing each message head before dispatching.
 *
 * Any previously running workers are stopped first. For every received message
 * whose head parses, a callback picked from per_msg_cbs_ by the head's msg_id
 * takes precedence; otherwise @p onData is used when set. Messages whose head
 * does not parse are released without being dispatched.
 *
 * @param nworker  Number of workers to spawn.
 * @param onData   Fallback callback, called as onData(*this, msg, head).
 * @param onIdle   Optional; called once per worker cycle.
 * @return Always true.
 */
bool ShmSocket::Start(int nworker, const RecvCB &onData, const IdleCB &onIdle)
{
    auto cycle = [this, onData, onIdle]() {
        auto flushSend = [this]() { return send_buffer_.TrySend(mq()); };

        auto recvOne = [this, &onData]() -> bool {
            // Do not receive if no callback is set.
            if (!onData && per_msg_cbs_->empty()) {
                return false;
            }
            MsgI msg;
            if (!mq().TryRecv(msg)) {
                return false;
            }
            DEFER1(msg.Release());

            BHMsgHead head;
            if (!msg.ParseHead(head)) {
                // A message was still taken off the queue, so report progress.
                return true;
            }

            RecvCB cb;
            if (per_msg_cbs_->Pick(head.msg_id(), cb)) {
                cb(*this, msg, head);
            } else if (onData) {
                onData(*this, msg, head);
            }
            return true;
        };

        RunIoCycle(*this, flushSend, recvOne, onIdle);
    };

    std::lock_guard guard(mutex_);
    StopNoLock();
    run_.store(true);
    for (int n = 0; n < nworker; ++n) {
        workers_.emplace_back([this, cycle]() {
            while (run_.load()) {
                cycle();
            }
        });
    }
    return true;
}

/**
 * @brief Stop and join all workers while holding mutex_.
 * @return true if the socket was running, false if it was already stopped.
 */
bool ShmSocket::Stop()
{
    std::lock_guard guard(mutex_);
    return StopNoLock();
}

/**
 * @brief Stop and join all workers; does not lock mutex_ itself.
 *
 * Both callers in this file (Start and Stop) hold mutex_ around the call.
 *
 * @return true if the running flag was set (workers were joined and cleared),
 *         false otherwise.
 */
bool ShmSocket::StopNoLock()
{
    if (!run_.exchange(false)) {
        return false;
    }
    for (auto &worker : workers_) {
        if (worker.joinable()) {
            worker.join();
        }
    }
    workers_.clear();
    return true;
}

/**
 * @brief Receive one message directly from the queue and parse its head.
 *
 * TODO: maybe reimplement using async callbacks?
 *
 * @param msg         Receives the message.
 * @param head        Receives the parsed head on success.
 * @param timeout_ms  0 selects mq().TryRecv(); any other value is passed to
 *                    mq().Recv().
 * @return true when a message arrived and its head parsed. false when nothing
 *         was received, or when the head failed to parse (in which case the
 *         message is released).
 */
bool ShmSocket::SyncRecv(bhome_msg::MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms)
{
    // mutex_ is not taken here; per the original author's note it seems unnecessary.
    bool received = false;
    if (timeout_ms == 0) {
        received = mq().TryRecv(msg);
    } else {
        received = mq().Recv(msg, timeout_ms);
    }

    if (!received) {
        return false;
    }
    if (msg.ParseHead(head)) {
        return true;
    }
    msg.Release();
    return false;
}