/*
|
* =====================================================================================
|
*
|
* Filename: pubsub.cpp
|
*
|
* Description:
|
*
|
* Version: 1.0
|
* Created: 2021-03-24 18:44:13
|
* Revision: none
|
* Compiler: gcc
|
*
|
* Author: Li Chao (),
|
* Organization:
|
*
|
* =====================================================================================
|
*/
|
#include "pubsub.h"
|
#include "bh_util.h"
|
#include "defs.h"
|
|
using namespace std::chrono_literals;
|
using namespace bhome_msg;
|
|
bool SocketPublish::Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms)
|
{
|
try {
|
MsgI imsg;
|
if (!imsg.MakeRC(shm(), MakePub(topic, data, size))) {
|
return false;
|
}
|
DEFER1(imsg.Release(shm()));
|
return ShmMsgQueue::Send(shm(), kBHTopicBus, imsg, timeout_ms);
|
} catch (...) {
|
return false;
|
}
|
}
|
|
bool SocketSubscribe::Subscribe(const std::vector<std::string> &topics, const int timeout_ms)
|
{
|
try {
|
return mq().Send(kBHTopicBus, MakeSub(mq().Id(), topics), timeout_ms);
|
} catch (...) {
|
return false;
|
}
|
}
|
|
bool SocketSubscribe::StartRecv(const TopicDataCB &tdcb, int nworker)
|
{
|
auto AsyncRecvProc = [this, tdcb](BHMsg &msg) {
|
if (msg.type() == kMsgTypePublish) {
|
DataPub d;
|
if (d.ParseFromString(msg.body())) {
|
tdcb(d.topic(), d.data());
|
}
|
} else {
|
// ignored, or dropped
|
}
|
};
|
|
return tdcb && Start(AsyncRecvProc, nworker);
|
}
|
|
bool SocketSubscribe::RecvSub(std::string &topic, std::string &data, const int timeout_ms)
|
{
|
BHMsg msg;
|
if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypePublish) {
|
DataPub d;
|
if (d.ParseFromString(msg.body())) {
|
d.mutable_topic()->swap(topic);
|
d.mutable_data()->swap(data);
|
return true;
|
}
|
}
|
return false;
|
}
|