/*
 * =====================================================================================
 *
 *       Filename:  socket.cpp
 *
 *    Description:  ShmSocket member function definitions.
 *
 *        Version:  1.0
 *        Created:  2021-03-30 15:48:58
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (),
 *   Organization:
 *
 * =====================================================================================
 */
#include "socket.h"
#include "bh_util.h"
#include "defs.h"
#include "msg.h"

using namespace bhome_msg;
using namespace bhome_shm;

namespace
{

} // namespace

/**
 * Construct a socket whose queue is created from a caller-supplied id.
 *
 * No queue is created when `id` is null or `len` is not positive; in that
 * state StartRaw(), Start(), SyncSend() and SyncRecv() all return false.
 *
 * @param shm shared memory object the socket keeps a reference to.
 * @param id  pointer to the queue id; may be null.
 * @param len forwarded to the Queue constructor.
 */
ShmSocket::ShmSocket(Shm &shm, const void *id, const int len) :
    shm_(shm), run_(false)
{
	if (id && len > 0) {
		// NOTE(review): the cast target type is assumed to be `const MQId *`
		// (the id type used by Queue) -- confirm against socket.h.
		mq_.reset(new Queue(*static_cast<const MQId *>(id), shm, len));
	}
}

/**
 * Construct a socket whose queue is created without an explicit id.
 *
 * No queue is created when `len` is not positive.
 *
 * @param shm shared memory object the socket keeps a reference to.
 * @param len forwarded to the Queue constructor.
 */
ShmSocket::ShmSocket(bhome_shm::SharedMemory &shm, const int len) :
    shm_(shm), run_(false)
{
	if (len > 0) {
		mq_.reset(new Queue(shm_, len));
	}
}

/// Stops and joins any running workers before the members are destroyed.
ShmSocket::~ShmSocket()
{
	Stop();
}

/**
 * Start `nworker` threads that receive raw messages and pass them to `onData`.
 *
 * Workers from a previous start are stopped and joined first.
 *
 * @param onData  callback invoked with every message a worker receives.
 * @param nworker number of worker threads to create.
 * @return false when the socket has no queue, true otherwise.
 */
bool ShmSocket::StartRaw(const RecvRawCB &onData, int nworker)
{
	if (!mq_) {
		return false;
	}

	std::lock_guard<decltype(mutex_)> lock(mutex_);
	StopNoLock();

	// Timeout passed to Queue::Recv so that a worker re-checks run_ regularly;
	// presumably milliseconds, by analogy with SyncRecv()'s timeout_ms.
	const int kRecvPollTimeout = 100;

	auto RecvProc = [this, onData, kRecvPollTimeout]() {
		while (run_) {
			try {
				MsgI imsg;
				// DEFER1 comes from bh_util.h; presumably it runs the
				// expression when this scope is left -- TODO confirm.
				DEFER1(imsg.Release(shm_));
				if (mq_->Recv(imsg, kRecvPollTimeout)) {
					onData(imsg);
				}
			} catch (...) {
				// Deliberately swallowed: an exception escaping a thread
				// entry point would terminate the process, so the worker
				// just moves on to the next message.
			}
		}
	};

	run_.store(true);
	for (int i = 0; i < nworker; ++i) {
		workers_.emplace_back(RecvProc);
	}
	return true;
}

/**
 * Start `nworker` threads that receive messages, unpack them into BHMsg and
 * pass them to `onData`. Messages that fail to unpack are dropped.
 *
 * @param onData  callback invoked with every successfully unpacked message.
 * @param nworker number of worker threads to create.
 * @return the result of StartRaw().
 */
bool ShmSocket::Start(const RecvCB &onData, int nworker)
{
	auto unpack_and_dispatch = [onData](MsgI &imsg) {
		BHMsg m;
		if (imsg.Unpack(m)) {
			onData(m);
		}
	};
	return StartRaw(unpack_and_dispatch, nworker);
}

/**
 * Stop the worker threads and wait for them to finish.
 *
 * NOTE(review): workers are joined while mutex_ is held, so calling Stop()
 * (or any other locking member) from inside a receive callback looks like it
 * would deadlock -- confirm callers never do that.
 *
 * @return true if the socket was running and has been stopped, false if it
 *         was not running.
 */
bool ShmSocket::Stop()
{
	std::lock_guard<decltype(mutex_)> lock(mutex_);
	return StopNoLock();
}

/**
 * Implementation of Stop(); the caller must already hold mutex_.
 *
 * @return true if run_ was set and the workers were joined, false otherwise.
 */
bool ShmSocket::StopNoLock()
{
	if (!run_.exchange(false)) {
		return false;
	}

	for (auto &w : workers_) {
		if (w.joinable()) {
			w.join();
		}
	}
	workers_.clear();
	return true;
}

/**
 * Send `msg` to the queue identified by `id`, from the calling thread.
 *
 * @param id         pointer to the destination queue id.
 * @param msg        message to send.
 * @param timeout_ms timeout forwarded to Queue::Send.
 * @return false when `id` is null, when the socket has no queue, or when
 *         RunningNoLock() reports true; otherwise the result of Queue::Send.
 */
bool ShmSocket::SyncSend(const void *id, const bhome_msg::BHMsg &msg, const int timeout_ms)
{
	// A null id would be dereferenced below; reject it the same way the
	// constructor ignores a null id.
	if (!id) {
		return false;
	}

	std::lock_guard<decltype(mutex_)> lock(mutex_);
	if (!mq_ || RunningNoLock()) {
		return false;
	}
	// NOTE(review): the cast target type is assumed to be `const MQId *`
	// -- confirm against socket.h.
	return mq_->Send(*static_cast<const MQId *>(id), msg, timeout_ms);
}

/**
 * Receive one message from this socket's queue, on the calling thread.
 *
 * @param msg        receives the message.
 * @param timeout_ms timeout forwarded to Queue::Recv.
 * @return false when the socket has no queue or when RunningNoLock() reports
 *         true; otherwise the result of Queue::Recv.
 */
bool ShmSocket::SyncRecv(bhome_msg::BHMsg &msg, const int timeout_ms)
{
	std::lock_guard<decltype(mutex_)> lock(mutex_);
	if (!mq_ || RunningNoLock()) {
		return false;
	}
	return mq_->Recv(msg, timeout_ms);
}