/*
 * =====================================================================================
 *
 *       Filename:  socket.cpp
 *
 *    Description:  Implementation of ShmSocket: construction, worker-thread
 *                  start/stop, and synchronous receive.
 *
 *        Version:  1.0
 *        Created:  2021-03-30 15:48:58
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (),
 *   Organization:
 *
 * =====================================================================================
 */
#include "socket.h"
#include "bh_util.h"
#include "defs.h"
#include "msg.h"

using namespace bhome_msg;
using namespace bhome_shm;

namespace
{

} // namespace

// Builds a socket on `shm`; `id` and `len` are forwarded to the mq_ constructor.
// The socket starts in the stopped state (run_ == false).
ShmSocket::ShmSocket(Shm &shm, const MQId &id, const int len) :
	shm_(shm),
	run_(false),
	mq_(id, shm, len)
{
}

// Same as above, but without an explicit id: only `shm` and `len` reach mq_.
ShmSocket::ShmSocket(bhome_shm::SharedMemory &shm, const int len) :
	shm_(shm),
	run_(false),
	mq_(shm, len)
{
}

// Stops any running workers before the object goes away.
ShmSocket::~ShmSocket()
{
	// TODO: stopping should be done in the subclass, in case a worker thread
	// accesses subclass data.
	Stop();
}

// (Re)starts the socket with `nworker` receiving threads.
//
// Any workers that are already running are stopped first. Each worker loops
// while run_ is set: it tries to receive a message, parses its head and hands
// it to a callback; when nothing is received it calls `onIdle` (if set).
//
// Callback selection for a received message:
//   1. a callback registered in async_cbs_ under the message id, which is
//      removed from the map before it is invoked (one-shot);
//   2. otherwise `onData`, if it is set;
//   3. otherwise the message is ignored / dropped.
//
// Always returns true.
//
// NOTE(review): workers are joined (via StopNoLock) while mutex_ is held, and
// workers lock mutex() during callback lookup. If mutex() returns mutex_, a
// worker waiting for that lock could block the join -- confirm against socket.h.
bool ShmSocket::Start(int nworker, const RecvCB &onData, const IdleCB &onIdle)
{
	auto dispatch = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
		RecvCB pending;
		bool matched = false;
		{
			// Only the lookup/removal runs under the lock; the callback itself
			// is invoked after the lock has been released.
			std::lock_guard guard(mutex());
			auto it = async_cbs_.find(head.msg_id());
			if (it != async_cbs_.end()) {
				pending.swap(it->second);
				async_cbs_.erase(it);
				matched = true;
			}
		}

		if (matched) {
			pending(socket, imsg, head);
		} else if (onData) {
			onData(socket, imsg, head);
		}
		// Otherwise the message is ignored / dropped.
	};

	std::lock_guard guard(mutex_);
	StopNoLock();

	auto worker_loop = [this, dispatch, onIdle]() {
		while (run_) {
			try {
				MsgI imsg;
				// The second argument is presumably a timeout in milliseconds
				// (SyncRecv passes its timeout_ms in the same position).
				if (!mq().Recv(imsg, 10)) {
					if (onIdle) {
						onIdle(*this);
					}
					continue;
				}

				// Release the received message when this scope is left.
				DEFER1(imsg.Release(shm()));
				BHMsgHead head;
				if (imsg.ParseHead(head)) {
					dispatch(*this, imsg, head);
				}
			} catch (...) {
				// Deliberately swallowed: an exception from the queue or from
				// a callback must not terminate the worker thread.
			}
		}
	};

	run_.store(true);
	for (int remaining = nworker; remaining > 0; --remaining) {
		workers_.emplace_back(worker_loop);
	}
	return true;
}

// Locks mutex_ and stops the workers; see StopNoLock for the return value.
bool ShmSocket::Stop()
{
	std::lock_guard guard(mutex_);
	return StopNoLock();
}

// Clears run_, then joins and discards all worker threads.
// Does not lock mutex_ itself; Stop() and Start() take it before calling.
// Returns true if run_ was set (i.e. this call performed the stop), false otherwise.
bool ShmSocket::StopNoLock()
{
	if (!run_.exchange(false)) {
		return false;
	}

	for (auto &worker : workers_) {
		if (worker.joinable()) {
			worker.join();
		}
	}
	workers_.clear();
	return true;
}

// Receives one message directly on the calling thread.
//
// Does nothing and returns false when RunningNoLock() reports true. Otherwise
// it tries to receive into `msg` with `timeout_ms` and to parse its head into
// `head`. Returns true only when both succeed; a message whose head cannot be
// parsed is released before returning false. mutex_ is held for the whole call.
bool ShmSocket::SyncRecv(bhome_msg::MsgI &msg, bhome::msg::BHMsgHead &head, const int timeout_ms)
{
	std::lock_guard guard(mutex_);

	if (RunningNoLock()) {
		return false;
	}
	if (!mq().Recv(msg, timeout_ms)) {
		return false;
	}
	if (msg.ParseHead(head)) {
		return true;
	}

	msg.Release(shm());
	return false;
}