/* * ===================================================================================== * * Filename: pubsub.cpp * * Description: * * Version: 1.0 * Created: 2021年03月24日 18时44分13秒 * Revision: none * Compiler: gcc * * Author: Li Chao (), * Organization: * * ===================================================================================== */ #include "pubsub.h" #include "bh_util.h" #include "defs.h" using namespace std::chrono_literals; using namespace bhome_msg; bool SocketPublish::Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms) { try { MsgI imsg; if (!imsg.MakeRC(shm(), MakePub(topic, data, size))) { return false; } DEFER1(imsg.Release(shm())); return ShmMsgQueue::Send(shm(), kBHTopicBus, imsg, timeout_ms); } catch (...) { return false; } } bool SocketSubscribe::Subscribe(const std::vector &topics, const int timeout_ms) { try { return mq().Send(kBHTopicBus, MakeSub(mq().Id(), topics), timeout_ms); } catch (...) { return false; } } bool SocketSubscribe::StartRecv(const TopicDataCB &tdcb, int nworker) { auto AsyncRecvProc = [this, tdcb](BHMsg &msg) { if (msg.type() == kMsgTypePublish) { MsgPub d; if (d.ParseFromString(msg.body())) { tdcb(d.topic(), d.data()); } } else { // ignored, or dropped } }; return tdcb && Start(AsyncRecvProc, nworker); } bool SocketSubscribe::RecvSub(Topic &topic, std::string &data, const int timeout_ms) { BHMsg msg; if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypePublish) { MsgPub d; if (d.ParseFromString(msg.body())) { d.mutable_topic()->swap(topic); d.mutable_data()->swap(data); return true; } } return false; }