/*
 * =====================================================================================
 *
 *       Filename:  socket.cpp
 *
 *    Description:  ShmSocket implementation: receive-queue setup, background
 *                  receive worker, and publish/subscribe sending.
 *
 *        Version:  1.0
 *        Created:  2021-03-30 15:48:58
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (),
 *   Organization:
 *
 * =====================================================================================
 */
#include "socket.h"
#include "bh_util.h"
#include "defs.h"
#include "msg.h"
#include <cassert>
#include <chrono>
#include <mutex>
#include <string>
#include <vector>

using namespace bhome_msg;
using namespace bhome_shm;
using namespace std::chrono_literals;

namespace
{

/// Returns the default receive-queue length for a socket type.
///
/// A result of 0 means the socket is created without a receive queue and
/// without a worker thread (see the ShmSocket constructor).
int GetSocketDefaultLen(ShmSocket::Type type)
{
    switch (type) {
    case ShmSocket::eSockRequest:
        return 12;
    case ShmSocket::eSockReply:
        return 64;
    case ShmSocket::eSockPublish:
        return 0;
    case ShmSocket::eSockSubscribe:
        return 64;
    default:
        return 0;
    }
}

} // namespace

/// Creates a socket of the given type on top of `shm`.
///
/// When the type has a non-zero default queue length, a receive queue is
/// created and one worker thread is started. The worker idles until a receive
/// callback has been installed via SetRecvCallback(), then polls the queue and
/// hands every received message to that callback.
ShmSocket::ShmSocket(Type type, bhome_shm::SharedMemory &shm) :
    shm_(shm), type_(type), run_(false)
{
    const int len = GetSocketDefaultLen(type);
    if (len == 0) {
        return; // no queue and no worker for this socket type
    }
    mq_.reset(new Queue(shm_, len));

    auto RecvProc = [this]() {
        while (run_) {
            try {
                std::unique_lock<decltype(mutex_)> lk(mutex_);
                // Wake at least every 100ms so that run_ is re-checked.
                if (!cv_recv_cb_.wait_for(lk, 100ms, [this]() { return HasRecvCB(); })) {
                    continue;
                }
                // Take a private copy of the callback and drop the lock before
                // blocking on the queue or running user code. Holding mutex_
                // across those calls would stall SetRecvCallback() and would
                // deadlock a callback that re-registers itself.
                // Consequence: a callback that was just replaced may still be
                // invoked once for a message that is already in flight.
                auto on_recv = onRecv_;
                lk.unlock();

                BHMsg msg;
                // NOTE(review): 100 is presumably a timeout in milliseconds - confirm against Queue::Recv.
                if (mq_->Recv(msg, 100)) {
                    on_recv(msg);
                }
            } catch (...) {
                // Best effort: an exception must never escape the thread entry
                // point, so it is dropped and the loop carries on.
            }
        }
    };
    run_.store(true);
    workers_.emplace_back(RecvProc);
}

/// Creates a socket of the given type on the shared memory returned by BHomeShm().
ShmSocket::ShmSocket(Type type) :
    ShmSocket(type, BHomeShm())
{
}

/// Stops and joins the worker thread(s) before the members are destroyed.
ShmSocket::~ShmSocket()
{
    Stop();
}

/// Publishes `size` bytes at `data` under `topic`.
///
/// Only valid on an eSockPublish socket; any other type returns false.
/// The message is sent to the queue identified by kBHBusQueueId.
///
/// @return true if the message was sent; false on wrong socket type, failure
///         to build the message, send failure, or any exception.
bool ShmSocket::Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms)
{
    if (type_ != eSockPublish) {
        return false;
    }
    assert(!mq_); // publish sockets are created without a receive queue
    try {
        MsgI imsg;
        if (!imsg.MakeRC(shm_, MakePub(topic, data, size))) {
            return false;
        }
        // Release the message on every exit path below.
        DEFER1(imsg.Release(shm_));
        return Queue::Send(shm_, kBHBusQueueId, imsg, timeout_ms);
    } catch (...) {
        return false;
    }
}

/// Sends a subscription request for `topics`, carrying this socket's queue id,
/// to the queue identified by kBHBusQueueId.
///
/// Only valid on an eSockSubscribe socket; any other type returns false.
///
/// @return true if the request was sent; false on wrong socket type, send
///         failure, or any exception.
bool ShmSocket::Subscribe(const std::vector<std::string> &topics, const int timeout_ms)
{
    if (type_ != eSockSubscribe) {
        return false;
    }
    assert(mq_); // subscribe sockets always own a receive queue
    try {
        return mq_->Send(kBHBusQueueId, MakeSub(mq_->Id(), topics), timeout_ms);
    } catch (...) {
        return false;
    }
}

/// Installs (or replaces) the callback invoked for each received message and
/// wakes the worker that is waiting for a callback to become available.
///
/// @return always true.
bool ShmSocket::SetRecvCallback(const RecvCB &onRecv)
{
    std::lock_guard<decltype(mutex_)> lock(mutex_);
    onRecv_ = onRecv;
    cv_recv_cb_.notify_one();
    return true;
}

/// Reports whether a receive callback is currently set.
///
/// Performs no locking itself; the worker calls it with mutex_ held.
bool ShmSocket::HasRecvCB()
{
    return static_cast<bool>(onRecv_);
}

/// Asks the worker thread(s) to exit and waits for them to finish.
///
/// Safe to call more than once: threads that were already joined are skipped.
void ShmSocket::Stop()
{
    run_ = false;
    for (auto &t : workers_) {
        if (t.joinable()) {
            t.join();
        }
    }
}