/*
 * =====================================================================================
 *
 *       Filename:  reqrep_center.cpp
 *
 *    Description:  topic request/reply center
 *
 *        Version:  1.0
 *        Created:  2021-04-01 14:08:50
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (),
 *   Organization:
 *
 * =====================================================================================
 */
#include "reqrep_center.h"
#include "bh_util.h"
using namespace bhome_shm;

// NOTE(review): not referenced anywhere in this file; presumably leftover
// scaffolding. Kept because it has external linkage -- confirm before removing.
struct A {
	void F(int) {}
};

namespace
{
// Current wall-clock time as returned by time(), in seconds.
inline uint64_t Now()
{
	return static_cast<uint64_t>(time(nullptr));
}

} // namespace

/**
 * Start the center's socket with a receive handler that serves three message
 * types: topic registration, process heartbeat and topic query.
 *
 * @param nworker  requested worker count; values <= 0 fall back to 2, and the
 *                 result is capped at 16.
 * @return whatever socket_.Start() returns.
 */
bool ReqRepCenter::Start(const int nworker)
{
	// The handler is handed to socket_.Start(), so it must not capture any
	// local of this function by reference; it only needs the members.
	auto onRecv = [this](BHMsg &msg) {
#ifndef NDEBUG
		// Print the pending queue size whenever the clock value moved forward
		// since the previous message. The atomic holds a full time_t so the
		// stored timestamp is not narrowed.
		static std::atomic<time_t> last(0);
		time_t now = 0;
		time(&now);
		if (last.exchange(now) < now) {
			printf("bus queue size: %ld\n", socket_.Pending());
		}
#endif
		// Without any route entry there is no sender to identify or reply to.
		if (msg.route_size() == 0) {
			return;
		}
		auto &src_mq = msg.route(0).mq_id();

		// Record the sender's process info and map each of its topics to the
		// sender's queue id. Later registrations overwrite earlier ones.
		auto OnRegister = [&]() {
			DataProcRegister reg;
			if (!reg.ParseFromString(msg.body())) {
				return;
			}
			ProcInfo pi;
			pi.server_mqid_ = src_mq;
			pi.proc_id_ = reg.proc().name();
			pi.ext_info_ = reg.proc().info();
			pi.timestamp_ = Now();

			std::lock_guard lock(mutex_);
			for (const auto &topic : reg.topics()) {
				topic_mq_[topic] = pi.server_mqid_;
			}
			procs_[pi.proc_id_] = pi;
		};

		// Refresh timestamp and extra info of an already registered process.
		auto OnHeartbeat = [&]() {
			DataProcHeartbeat hb;
			if (!hb.ParseFromString(msg.body())) {
				return;
			}
			std::lock_guard lock(mutex_);
			auto pos = procs_.find(hb.proc().name());
			// both name and mq should be the same.
			if (pos != procs_.end() && pos->second.server_mqid_ == src_mq) {
				pos->second.timestamp_ = Now();
				pos->second.ext_info_ = hb.proc().info();
			}
		};

		// Look up the queue registered for the queried topic and, when found,
		// send it back to the queue named by the last route entry.
		auto OnQueryTopic = [&]() {
			DataProcQueryTopic query;
			if (!query.ParseFromString(msg.body())) {
				return;
			}

			std::string dest;
			// The lock covers only the map lookup, not the send below.
			auto FindDest = [&]() {
				std::lock_guard lock(mutex_);
				auto pos = topic_mq_.find(query.topic());
				if (pos == topic_mq_.end()) {
					return false;
				}
				dest = pos->second;
				return true;
			};
			if (!FindDest()) {
				return;
			}

			MQId remote;
			const auto &reply_mq = msg.route().rbegin()->mq_id();
			// The id comes from the wire; a short one would make memcpy read
			// past the end of the string, so drop the query instead.
			if (reply_mq.size() < sizeof(remote)) {
				return;
			}
			memcpy(&remote, reply_mq.data(), sizeof(remote));

			MsgI imsg;
			if (!imsg.Make(shm(), MakeQueryTopicReply(dest, msg.msg_id()))) {
				return;
			}
			// Release the message ourselves when the send did not succeed.
			if (!ShmMsgQueue::Send(shm(), remote, imsg, 100)) {
				imsg.Release(shm());
			}
		};

		switch (msg.type()) {
		case kMsgTypeProcRegisterTopics: OnRegister(); break;
		case kMsgTypeProcHeartbeat: OnHeartbeat(); break;
		case kMsgTypeProcQueryTopic: OnQueryTopic(); break;
		default: break;
		}
	};

	const int kMaxWorker = 16;
	return socket_.Start(onRecv, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
}