/*
 * =====================================================================================
 *
 *       Filename:  pubsub.cpp
 *
 *    Description:  Member definitions for SocketPublish and SocketSubscribe
 *                  (publish a topic message, subscribe to topics, receive
 *                  published messages).
 *
 *        Version:  1.0
 *        Created:  2021-03-24 18:44:13
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (),
 *   Organization:
 *
 * =====================================================================================
 */
#include "pubsub.h"
#include "bh_util.h"
#include "defs.h"

using namespace std::chrono_literals;
using namespace bhome_msg;

/**
 * Build a MsgPublish from (topic, data, size) and send it to BHTopicBusAddress().
 *
 * @param proc_id    passed to InitMsgHead() when building the message head.
 * @param topic      topic stored in the publish message.
 * @param data       pointer to the payload bytes.
 * @param size       payload length in bytes.
 * @param timeout_ms timeout forwarded unchanged to ShmMsgQueue::Send().
 * @return the result of ShmMsgQueue::Send(); false if MsgI::MakeRC() fails or
 *         if any exception is thrown while building or sending.
 */
bool SocketPublish::Publish(const std::string &proc_id, const Topic &topic, const void *data, const size_t size, const int timeout_ms)
{
    try {
        MsgPublish pub;
        pub.set_topic(topic);
        pub.set_data(data, size);

        BHMsgHead head(InitMsgHead(GetType(pub), proc_id));

        MsgI imsg;
        if (!imsg.MakeRC(shm(), head, pub)) {
            return false;
        }
        // Presumably DEFER1 runs the release when this scope is left, i.e. after
        // the Send() result has been computed -- confirm against its definition.
        DEFER1(imsg.Release(shm()));
        return ShmMsgQueue::Send(shm(), BHTopicBusAddress(), imsg, timeout_ms);
    } catch (...) {
        // Deliberate best-effort: every exception is reported as a plain failure.
        return false;
    }
}

namespace
{
/// Append a route entry to `head` whose mq_id holds the raw bytes of `id`.
inline void AddRoute(BHMsgHead &head, const MQId &id)
{
    head.add_route()->set_mq_id(&id, sizeof(id));
}

} // namespace

/**
 * Send a MsgSubscribe listing `topics` to BHTopicBusAddress().
 *
 * The head carries a route entry built from this socket's own queue id
 * (mq().Id()).
 *
 * NOTE(review): the element type of `topics` is assumed to be Topic, matching
 * sub.add_topics(topic) -- confirm against the declaration in pubsub.h.
 *
 * @param proc_id    passed to InitMsgHead() when building the message head.
 * @param topics     topics added to the subscribe message, in order.
 * @param timeout_ms timeout forwarded unchanged to Send().
 * @return the result of Send(); false if any exception is thrown.
 */
bool SocketSubscribe::Subscribe(const std::string &proc_id, const std::vector<Topic> &topics, const int timeout_ms)
{
    try {
        MsgSubscribe sub;
        for (const auto &topic : topics) {
            sub.add_topics(topic);
        }

        BHMsgHead head(InitMsgHead(GetType(sub), proc_id));
        AddRoute(head, mq().Id());

        return Send(&BHTopicBusAddress(), head, sub, timeout_ms);
    } catch (...) {
        return false;
    }
}

/**
 * Start receiving with a handler that forwards published messages to `tdcb`.
 *
 * For each message whose head type is kMsgTypePublish and whose body parses
 * as MsgPublish, `tdcb` is called with (head.proc_id(), pub.topic(),
 * pub.data()). Messages of any other type are ignored by the handler.
 *
 * @param tdcb    callback for published data; must be non-empty.
 * @param nworker forwarded unchanged to Start().
 * @return false if `tdcb` is empty; otherwise the result of Start().
 */
bool SocketSubscribe::StartRecv(const TopicDataCB &tdcb, int nworker)
{
    if (!tdcb) {
        return false;
    }

    // The handler only needs its own copy of the callback; no member of this
    // object is touched inside it, so `this` is not captured.
    auto on_msg = [tdcb](ShmSocket &, MsgI &imsg, BHMsgHead &head) {
        if (head.type() != kMsgTypePublish) {
            return; // ignored, or dropped
        }
        MsgPublish pub;
        if (imsg.ParseBody(pub)) {
            tdcb(head.proc_id(), pub.topic(), pub.data());
        }
    };
    return Start(on_msg, nworker);
}

/**
 * Receive one message via SyncRecv() and, if it is a publish message, hand
 * its contents to the caller.
 *
 * On success the received proc_id, topic and data are swapped into the output
 * arguments, so no payload copy is made and the previous contents of the
 * outputs end up in the temporaries that are discarded on return.
 *
 * NOTE(review): Publish() releases its MsgI explicitly, but `msg` is not
 * released anywhere in this function -- confirm who owns the message returned
 * by SyncRecv().
 *
 * @param proc_id    [out] proc_id taken from the received head.
 * @param topic      [out] topic of the received publish message.
 * @param data       [out] data of the received publish message.
 * @param timeout_ms timeout forwarded unchanged to SyncRecv().
 * @return true only if a message was received, its head type is
 *         kMsgTypePublish and its body parsed; outputs are untouched otherwise.
 */
bool SocketSubscribe::RecvSub(std::string &proc_id, Topic &topic, std::string &data, const int timeout_ms)
{
    MsgI msg;
    BHMsgHead head;
    if (!SyncRecv(msg, head, timeout_ms) || head.type() != kMsgTypePublish) {
        return false;
    }

    MsgPublish pub;
    if (!msg.ParseBody(pub)) {
        return false;
    }

    head.mutable_proc_id()->swap(proc_id);
    pub.mutable_topic()->swap(topic);
    pub.mutable_data()->swap(data);
    return true;
}