/*
|
* =====================================================================================
|
*
|
* Filename: reqrep_center.cpp
|
*
|
* Description: topic request/reply center
|
*
|
* Version: 1.0
|
* Created: 2021-04-01 14:08:50
|
* Revision: none
|
* Compiler: gcc
|
*
|
* Author: Li Chao (),
|
* Organization:
|
*
|
* =====================================================================================
|
*/
|
#include "reqrep_center.h"
|
#include "bh_util.h"
|
#include "msg.h"
|
#include <chrono>
|
#include <memory>
|
#include <mutex>
|
#include <unordered_map>
|
|
using namespace bhome_shm;
|
|
namespace
|
{
|
// Current wall-clock time as a time_t (whole seconds, as returned by time()).
auto Now = []() { return time(nullptr); };
|
|
class NodeCenter
|
{
|
public:
|
typedef std::string ProcAddr;
|
typedef bhome::msg::ProcInfo ProcInfo;
|
|
template <class Iter>
|
bool Register(ProcInfo &info, const ProcAddr &src_mq, Iter topics_begin, Iter topics_end)
|
{
|
try {
|
Node node(new NodeInfo);
|
node->addr_ = src_mq;
|
node->proc_.Swap(&info);
|
node->state_.timestamp_ = Now();
|
nodes_[node->proc_.id()] = node;
|
for (auto it = topics_begin; it != topics_end; ++it) {
|
topic_map_[*it] = node;
|
}
|
return true;
|
} catch (...) {
|
return false;
|
}
|
}
|
void Heartbeat(ProcInfo &info, const ProcAddr &src_mq)
|
{
|
auto pos = nodes_.find(info.name());
|
if (pos != nodes_.end() && pos->second->addr_ == src_mq) { // both name and mq should be the same.
|
NodeInfo &ni = *pos->second;
|
ni.state_.timestamp_ = Now();
|
if (!info.public_info().empty()) {
|
ni.proc_.set_public_info(info.public_info());
|
}
|
if (!info.private_info().empty()) {
|
ni.proc_.set_private_info(info.private_info());
|
}
|
}
|
}
|
bool QueryTopic(const std::string &topic, ProcAddr &addr)
|
{
|
auto pos = topic_map_.find(topic);
|
if (pos != topic_map_.end()) {
|
Node node(pos->second.lock());
|
if (node) {
|
addr = node->addr_;
|
return true;
|
} else { // dead, remove record.
|
topic_map_.erase(pos);
|
return false;
|
}
|
} else {
|
return false;
|
}
|
}
|
|
private:
|
struct ProcState {
|
time_t timestamp_ = 0;
|
uint32_t flag_ = 0; // reserved
|
};
|
typedef std::string ProcId;
|
struct NodeInfo {
|
ProcState state_; // state
|
ProcAddr addr_; // registered_mqid.
|
ProcInfo proc_; //
|
};
|
typedef std::shared_ptr<NodeInfo> Node;
|
typedef std::weak_ptr<NodeInfo> WeakNode;
|
std::unordered_map<std::string, WeakNode> topic_map_;
|
std::unordered_map<ProcId, Node> nodes_;
|
};
|
} // namespace
|
|
bool ReqRepCenter::Start(const int nworker)
|
{
|
auto center_ptr = std::make_shared<Synced<NodeCenter>>();
|
auto onRecv = [center_ptr, this](BHMsg &msg) {
|
auto ¢er = *center_ptr;
|
|
#ifndef NDEBUG
|
static std::atomic<time_t> last(0);
|
time_t now = 0;
|
time(&now);
|
if (last.exchange(now) < now) {
|
printf("bus queue size: %ld\n", socket_.Pending());
|
}
|
#endif
|
if (msg.route_size() == 0) {
|
return;
|
}
|
auto &src_mq = msg.route(0).mq_id();
|
|
auto OnRegister = [&]() {
|
DataProcRegister reg;
|
if (reg.ParseFromString(msg.body()) && reg.has_proc()) {
|
center->Register(*reg.mutable_proc(), src_mq, reg.topics().begin(), reg.topics().end());
|
}
|
};
|
|
auto OnHeartbeat = [&]() {
|
DataProcHeartbeat hb;
|
if (hb.ParseFromString(msg.body()) && hb.has_proc()) {
|
center->Heartbeat(*hb.mutable_proc(), src_mq);
|
}
|
};
|
|
auto OnQueryTopic = [&]() {
|
DataProcQueryTopic query;
|
NodeCenter::ProcAddr dest;
|
if (query.ParseFromString(msg.body()) && center->QueryTopic(query.topic(), dest)) {
|
MQId remote;
|
memcpy(&remote, msg.route().rbegin()->mq_id().data(), sizeof(remote));
|
MsgI imsg;
|
if (!imsg.Make(shm(), MakeQueryTopicReply(dest, msg.msg_id()))) { return; }
|
if (!ShmMsgQueue::Send(shm(), remote, imsg, 100)) {
|
imsg.Release(shm());
|
}
|
}
|
};
|
|
switch (msg.type()) {
|
case kMsgTypeProcRegisterTopics: OnRegister(); break;
|
case kMsgTypeProcHeartbeat: OnHeartbeat(); break;
|
case kMsgTypeProcQueryTopic: OnQueryTopic(); break;
|
default: break;
|
}
|
};
|
|
const int kMaxWorker = 16;
|
return socket_.Start(onRecv, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
|
}
|