/*
|
* =====================================================================================
|
*
|
* Filename: reqrep_center.cpp
|
*
|
* Description: topic request/reply center
|
*
|
* Version: 1.0
|
* Created: 2021-04-01 14:08:50
|
* Revision: none
|
* Compiler: gcc
|
*
|
* Author: Li Chao (),
|
* Organization:
|
*
|
* =====================================================================================
|
*/
|
#include "reqrep_center.h"
|
#include "bh_util.h"
|
using namespace bhome_shm;
|
|
// Empty placeholder type. Nothing in the visible part of this file refers to
// it. NOTE(review): looks like leftover scaffolding -- confirm before relying on it.
struct A
{
	// Takes an int by value and ignores it; has no effect.
	void F(int) {}
};
|
|
namespace
{

// Returns the current calendar time as reported by time(), converted to
// uint64_t. On POSIX systems that is whole seconds since the Epoch.
inline uint64_t Now()
{
	return static_cast<uint64_t>(time(nullptr));
}

} // namespace
|
bool ReqRepCenter::Start(const int nworker)
|
{
|
auto onRecv = [&](BHMsg &msg) {
|
#ifndef NDEBUG
|
static std::atomic<time_t> last(0);
|
time_t now = 0;
|
time(&now);
|
if (last.exchange(now) < now) {
|
printf("bus queue size: %ld\n", socket_.Pending());
|
}
|
#endif
|
if (msg.route_size() == 0) {
|
return;
|
}
|
auto &src_mq = msg.route(0).mq_id();
|
|
auto OnRegister = [&]() {
|
DataProcRegister reg;
|
if (!reg.ParseFromString(msg.body())) {
|
return;
|
}
|
ProcInfo pi;
|
pi.server_mqid_ = src_mq;
|
pi.proc_id_ = reg.proc().name();
|
pi.ext_info_ = reg.proc().info();
|
pi.timestamp_ = Now();
|
|
std::lock_guard<std::mutex> lock(mutex_);
|
for (auto &t : reg.topics()) {
|
topic_mq_[t] = pi.server_mqid_;
|
}
|
procs_[pi.proc_id_] = pi;
|
};
|
|
auto OnHeartbeat = [&]() {
|
DataProcHeartbeat hb;
|
if (!hb.ParseFromString(msg.body())) {
|
return;
|
}
|
|
std::lock_guard<std::mutex> lock(mutex_);
|
auto pos = procs_.find(hb.proc().name());
|
if (pos != procs_.end() && pos->second.server_mqid_ == src_mq) { // both name and mq should be the same.
|
pos->second.timestamp_ = Now();
|
pos->second.ext_info_ = hb.proc().info();
|
}
|
};
|
|
auto OnQueryTopic = [&]() {
|
DataProcQueryTopic query;
|
if (!query.ParseFromString(msg.body())) {
|
return;
|
}
|
|
std::string dest;
|
auto FindDest = [&]() {
|
std::lock_guard<std::mutex> lock(mutex_);
|
auto pos = topic_mq_.find(query.topic());
|
if (pos != topic_mq_.end()) {
|
dest = pos->second;
|
return true;
|
} else {
|
return false;
|
}
|
};
|
if (FindDest()) {
|
MQId remote;
|
memcpy(&remote, msg.route().rbegin()->mq_id().data(), sizeof(remote));
|
MsgI imsg;
|
if (!imsg.Make(shm(), MakeQueryTopicReply(dest, msg.msg_id()))) { return; }
|
if (!ShmMsgQueue::Send(shm(), remote, imsg, 100)) {
|
imsg.Release(shm());
|
}
|
}
|
};
|
|
switch (msg.type()) {
|
case kMsgTypeProcRegisterTopics: OnRegister(); break;
|
case kMsgTypeProcHeartbeat: OnHeartbeat(); break;
|
case kMsgTypeProcQueryTopic: OnQueryTopic(); break;
|
default: break;
|
}
|
};
|
|
const int kMaxWorker = 16;
|
return socket_.Start(onRecv, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
|
}
|