/*
|
* =====================================================================================
|
*
|
* Filename: topic_reply.cpp
|
*
|
* Description:
|
*
|
* Version: 1.0
|
* Created: 2021-04-06 14:40:52
|
* Revision: none
|
* Compiler: gcc
|
*
|
* Author: Li Chao (),
|
* Organization:
|
*
|
* =====================================================================================
|
*/
|
#include "topic_reply.h"
|
#include <chrono>
|
#include <list>
|
|
using namespace bhome_msg;
|
using namespace std::chrono;
|
using namespace std::chrono_literals;
|
|
namespace
|
{
|
struct SrcInfo {
|
std::vector<BHAddress> route;
|
std::string msg_id;
|
};
|
|
class FailedQ
|
{
|
struct FailedMsg {
|
steady_clock::time_point xpr;
|
std::string remote_;
|
BHMsg msg_;
|
FailedMsg(const std::string &addr, BHMsg &&msg) :
|
xpr(steady_clock::now() + 10s), remote_(addr), msg_(std::move(msg)) {}
|
bool Expired() { return steady_clock::now() > xpr; }
|
};
|
typedef std::list<FailedMsg> Queue;
|
Synced<Queue> queue_;
|
|
public:
|
void Push(const std::string &remote, BHMsg &&msg)
|
{
|
queue_->emplace_back(remote, std::move(msg));
|
}
|
void TrySend(ShmSocket &socket, const int timeout_ms = 0)
|
{
|
queue_.Apply([&](Queue &q) {
|
if (!q.empty()) {
|
auto it = q.begin();
|
do {
|
if (it->Expired() || socket.SyncSend(it->remote_.data(), it->msg_, timeout_ms)) {
|
it = q.erase(it);
|
} else {
|
++it;
|
}
|
} while (it != q.end());
|
}
|
});
|
}
|
};
|
|
} // namespace
|
|
bool SocketReply::Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms)
|
{
|
//TODO check reply?
|
return SyncSend(&kBHTopicReqRepCenter, MakeRegister(mq().Id(), proc_info, topics), timeout_ms);
|
}
|
bool SocketReply::Heartbeat(const ProcInfo &proc_info, const int timeout_ms)
|
{
|
return SyncSend(&kBHTopicReqRepCenter, MakeHeartbeat(mq().Id(), proc_info), timeout_ms);
|
}
|
bool SocketReply::StartWorker(const OnRequest &rcb, int nworker)
|
{
|
auto failed_q = std::make_shared<FailedQ>();
|
|
auto onIdle = [failed_q](ShmSocket &socket) { failed_q->TrySend(socket); };
|
|
auto onRecv = [this, rcb, failed_q, onIdle](BHMsg &msg) {
|
if (msg.type() == kMsgTypeRequestTopic && msg.route_size() > 0) {
|
MsgRequestTopic req;
|
if (req.ParseFromString(msg.body())) {
|
std::string out;
|
if (rcb(req.topic(), req.data(), out)) {
|
BHMsg msg_reply(MakeReply(msg.msg_id(), out.data(), out.size()));
|
for (int i = 0; i < msg.route_size() - 1; ++i) {
|
msg.add_route()->Swap(msg.mutable_route(i));
|
}
|
if (!SyncSend(msg.route().rbegin()->mq_id().data(), msg_reply, 10)) {
|
failed_q->Push(msg.route().rbegin()->mq_id(), std::move(msg_reply));
|
}
|
}
|
}
|
} else {
|
// ignored, or dropped
|
}
|
|
onIdle(*this);
|
};
|
|
return rcb && Start(onRecv, onIdle, nworker);
|
}
|
|
bool SocketReply::RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms)
|
{
|
BHMsg msg;
|
if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypeRequestTopic) {
|
MsgRequestTopic request;
|
if (request.ParseFromString(msg.body())) {
|
request.mutable_topic()->swap(topic);
|
request.mutable_data()->swap(data);
|
SrcInfo *p = new SrcInfo;
|
p->route.assign(msg.route().begin(), msg.route().end());
|
p->msg_id = msg.msg_id();
|
src_info = p;
|
return true;
|
}
|
}
|
return false;
|
}
|
|
bool SocketReply::SendReply(void *src_info, const std::string &data, const int timeout_ms)
|
{
|
SrcInfo *p = static_cast<SrcInfo *>(src_info);
|
DEFER1(delete p);
|
if (!p || p->route.empty()) {
|
return false;
|
}
|
|
BHMsg msg(MakeReply(p->msg_id, data.data(), data.size()));
|
for (unsigned i = 0; i < p->route.size() - 1; ++i) {
|
msg.add_route()->Swap(&p->route[i]);
|
}
|
|
return SyncSend(p->route.back().mq_id().data(), msg, timeout_ms);
|
}
|