| | |
| | | kMsgTypePublish = 3; |
| | | kMsgTypeSubscribe = 4; |
| | | kMsgTypeUnsubscribe = 5; |
| | | kMsgTypeQueryTopic = 6; |
| | | kMsgTypeQueryTopicReply = 7; |
| | | |
| | | kMsgTypeProcQueryTopic = 6; |
| | | kMsgTypeProcQueryTopicReply = 7; |
| | | kMsgTypeProcRegisterTopics = 8; |
| | | kMsgTypeProcHeartbeat = 9; |
| | | } |
| | | |
| | | message DataPub { |
| | |
| | | bytes data = 1; |
| | | } |
| | | |
| | | message DataQueryTopic { |
| | | message ProcInfo |
| | | { |
| | | bytes name = 1; |
| | | bytes info = 2; |
| | | } |
| | | |
| | | message DataProcRegister |
| | | { |
| | | ProcInfo proc = 1; |
| | | repeated bytes topics = 2; |
| | | } |
| | | |
| | | message DataProcHeartbeat |
| | | { |
| | | ProcInfo proc = 1; |
| | | } |
| | | |
| | | message DataProcQueryTopic { |
| | | bytes topic = 1; |
| | | } |
| | | |
| | | message DataQueryTopicReply { |
| | | message DataProcQueryTopicReply { |
| | | BHAddress address = 1; |
| | | } |
| | |
| | | |
| | | typedef boost::uuids::uuid MQId; |
| | | |
| | | const MQId kBHBusQueueId = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff"); |
| | | const MQId kBHTopicRPCId = boost::uuids::string_generator()("12345670-89ab-cdef-8349-1234567890ff"); |
| | | const MQId kBHTopicBus = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff"); |
| | | const MQId kBHTopicReqRepCenter = boost::uuids::string_generator()("12345670-89ab-cdef-8349-1234567890ff"); |
| | | const int kBHCenterPort = 24287; |
| | | const char kTopicSep = '.'; |
| | | namespace bhome_shm |
| | |
| | | const uint32_t kMsgTag = 0xf1e2d3c4; |
| | | const uint32_t kMsgPrefixLen = 4; |
| | | |
| | | BHMsg InitMsg(MsgType type) |
| | | inline void AddRoute(BHMsg &msg, const MQId &id) { msg.add_route()->set_mq_id(&id, sizeof(id)); } |
| | | |
| | | std::string RandId() |
| | | { |
| | | boost::uuids::uuid id = boost::uuids::random_generator()(); |
| | | return std::string((char *) &id, sizeof(id)); |
| | | } |
| | | BHMsg InitMsg(MsgType type, const std::string &msgid = RandId()) |
| | | { |
| | | BHMsg msg; |
| | | msg.set_msg_id(msgid); |
| | | msg.set_type(type); |
| | | time_t tm = 0; |
| | | msg.set_timestamp(time(&tm)); |
| | | return msg; |
| | | } |
| | | |
| | | BHMsg MakeRequest(const MQId &src_id, const void *data, const size_t size) |
| | | { |
| | | assert(data && size); |
| | | BHMsg msg(InitMsg(kMsgTypeRequest)); |
| | | msg.set_body(data, size); |
| | | msg.add_route()->set_mq_id(&src_id, sizeof(src_id)); |
| | | return msg; |
| | | } |
| | | BHMsg MakeRequest(const MQId &src_id, const std::string &topic, const void *data, const size_t size) |
| | | { |
| | | BHMsg msg(InitMsg(kMsgTypeRequest)); |
| | | AddRoute(msg, src_id); |
| | | DataRequest req; |
| | | req.set_topic(topic); |
| | | req.set_data(data, size); |
| | | const std::string &body(req.SerializeAsString()); |
| | | return MakeRequest(src_id, body.data(), body.size()); |
| | | msg.set_body(req.SerializeAsString()); |
| | | return msg; |
| | | } |
| | | |
| | | BHMsg MakeReply(const void *data, const size_t size) |
| | | BHMsg MakeRegister(const MQId &src_id, ProcInfo info, const std::vector<std::string> &topics) |
| | | { |
| | | BHMsg msg(InitMsg(kMsgTypeProcRegisterTopics)); |
| | | AddRoute(msg, src_id); |
| | | DataProcRegister reg; |
| | | reg.mutable_proc()->Swap(&info); |
| | | for (auto &t : topics) { |
| | | reg.add_topics(t); |
| | | } |
| | | msg.set_body(reg.SerializeAsString()); |
| | | return msg; |
| | | } |
| | | |
| | | BHMsg MakeHeartbeat(const MQId &src_id, ProcInfo info) |
| | | { |
| | | BHMsg msg(InitMsg(kMsgTypeProcHeartbeat)); |
| | | AddRoute(msg, src_id); |
| | | DataProcRegister reg; |
| | | reg.mutable_proc()->Swap(&info); |
| | | msg.set_body(reg.SerializeAsString()); |
| | | return msg; |
| | | } |
| | | |
| | | BHMsg MakeReply(const std::string &src_msgid, const void *data, const size_t size) |
| | | { |
| | | assert(data && size); |
| | | BHMsg msg(InitMsg(kMsgTypeReply)); |
| | | BHMsg msg(InitMsg(kMsgTypeReply, src_msgid)); |
| | | DataReply reply; |
| | | reply.set_data(data, size); |
| | | msg.set_body(reply.SerializeAsString()); |
| | |
| | | { |
| | | assert(sub_unsub == kMsgTypeSubscribe || sub_unsub == kMsgTypeUnsubscribe); |
| | | BHMsg msg(InitMsg(sub_unsub)); |
| | | msg.add_route()->set_mq_id(&client, sizeof(client)); |
| | | AddRoute(msg, client); |
| | | DataSub subs; |
| | | for (auto &t : topics) { |
| | | subs.add_topics(t); |
| | |
| | | return msg; |
| | | } |
| | | |
| | | BHMsg MakeQueryTopic(const std::string &topic) |
| | | BHMsg MakeQueryTopic(const MQId &client, const std::string &topic) |
| | | { |
| | | BHMsg msg(InitMsg(kMsgTypeQueryTopic)); |
| | | DataQueryTopic query; |
| | | BHMsg msg(InitMsg(kMsgTypeProcQueryTopic)); |
| | | AddRoute(msg, client); |
| | | DataProcQueryTopic query; |
| | | query.set_topic(topic); |
| | | msg.set_body(query.SerializeAsString()); |
| | | return msg; |
| | | } |
| | | BHMsg MakeQueryTopicReply(const std::string &mqid, const std::string &msgid) |
| | | { |
| | | BHMsg msg(InitMsg(kMsgTypeProcQueryTopicReply, msgid)); |
| | | DataProcQueryTopicReply reply; |
| | | reply.mutable_address()->set_mq_id(mqid); |
| | | msg.set_body(reply.SerializeAsString()); |
| | | return msg; |
| | | } |
| | | |
| | | void *Pack(SharedMemory &shm, const BHMsg &msg) |
| | | { |
| | |
| | | int num_ = 1; |
| | | }; |
| | | |
| | | BHMsg MakeQueryTopic(const std::string &topic); |
| | | BHMsg MakeRequest(const MQId &src_id, const void *data, const size_t size); |
| | | BHMsg MakeQueryTopic(const MQId &client, const std::string &topic); |
| | | BHMsg MakeQueryTopicReply(const std::string &mqid, const std::string &msgid); |
| | | BHMsg MakeRequest(const MQId &src_id, const std::string &topic, const void *data, const size_t size); |
| | | BHMsg MakeReply(const void *data, const size_t size); |
| | | BHMsg MakeReply(const std::string &src_msgid, const void *data, const size_t size); |
| | | BHMsg MakeRegister(const MQId &src_id, ProcInfo info, const std::vector<std::string> &topics); |
| | | BHMsg MakeHeartbeat(const MQId &src_id, ProcInfo info); |
| | | BHMsg MakeSub(const MQId &client, const std::vector<std::string> &topics); |
| | | BHMsg MakeUnsub(const MQId &client, const std::vector<std::string> &topics); |
| | | BHMsg MakePub(const std::string &topic, const void *data, const size_t size); |
| | |
| | | return false; |
| | | } |
| | | DEFER1(imsg.Release(shm())); |
| | | return ShmMsgQueue::Send(shm(), kBHBusQueueId, imsg, timeout_ms); |
| | | return ShmMsgQueue::Send(shm(), kBHTopicBus, imsg, timeout_ms); |
| | | } catch (...) { |
| | | return false; |
| | | } |
| | |
| | | bool SocketSubscribe::Subscribe(const std::vector<std::string> &topics, const int timeout_ms) |
| | | { |
| | | try { |
| | | return mq().Send(kBHBusQueueId, MakeSub(mq().Id(), topics), timeout_ms); |
| | | return mq().Send(kBHTopicBus, MakeSub(mq().Id(), topics), timeout_ms); |
| | | } catch (...) { |
| | | return false; |
| | | } |
| | |
| | | Socket(shm, 64) {} |
| | | SocketSubscribe() : |
| | | SocketSubscribe(BHomeShm()) {} |
| | | ~SocketSubscribe() { Stop(); } |
| | | |
| | | typedef std::function<void(const std::string &topic, const std::string &data)> TopicDataCB; |
| | | bool StartRecv(const TopicDataCB &tdcb, int nworker = 2); |
| | | bool Stop() { return Socket::Stop(); } |
| | | bool Subscribe(const std::vector<std::string> &topics, const int timeout_ms); |
| | | bool RecvSub(std::string &topic, std::string &data, const int timeout_ms); |
| | | }; |
| | |
| | | */ |
| | | #include "pubsub_center.h" |
| | | #include "bh_util.h" |
| | | |
| | | PubSubCenter::PubSubCenter(SharedMemory &shm) : |
| | | socket_(shm) {} |
| | | using namespace bhome_shm; |
| | | |
| | | bool PubSubCenter::Start(const int nworker) |
| | | { |
| | |
| | | #include <mutex> |
| | | #include <set> |
| | | #include <unordered_map> |
| | | using namespace bhome_shm; |
| | | |
| | | // publish/subcribe manager. |
| | | class PubSubCenter |
| | |
| | | class SocketBus : public ShmSocket |
| | | { |
| | | public: |
| | | SocketBus(SharedMemory &shm) : |
| | | ShmSocket(shm, &kBHBusQueueId, 1000) {} |
| | | SocketBus(ShmSocket::Shm &shm) : |
| | | ShmSocket(shm, &kBHTopicBus, 1000) {} |
| | | using ShmSocket::shm; |
| | | }; |
| | | SocketBus socket_; |
| | | ShmSocket::Shm &shm() { return socket_.shm(); } |
| | | std::mutex mutex_; |
| | | typedef std::set<MQId> Clients; |
| | | std::unordered_map<std::string, Clients> records_; |
| | | ShmSocket::Shm &shm() { return socket_.shm(); } |
| | | |
| | | public: |
| | | PubSubCenter(SharedMemory &shm); |
| | | PubSubCenter(ShmSocket::Shm &shm) : |
| | | socket_(shm) {} |
| | | PubSubCenter() : |
| | | PubSubCenter(BHomeShm()) {} |
| | | ~PubSubCenter() { Stop(); } |
| | |
| | | * ===================================================================================== |
| | | */ |
| | | #include "reqrep.h" |
| | | #include "bh_util.h" |
| | | #include "msg.h" |
| | | #include <chrono> |
| | | #include <condition_variable> |
| | |
| | | BHAddress addr; |
| | | if (QueryRPCTopic(topic, addr, timeout_ms)) { |
| | | return Call(addr.mq_id().data()); |
| | | } else { |
| | | return false; |
| | | } |
| | | } catch (...) { |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | bool SocketRequest::SyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, std::string &out) |
| | | bool SocketRequest::SyncRequest(const std::string &topic, const void *data, const size_t size, std::string &out, const int timeout_ms) |
| | | { |
| | | try { |
| | | BHAddress addr; |
| | | if (QueryRPCTopic(topic, addr, timeout_ms)) { |
| | | const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size)); |
| | | const BHMsg &req(MakeRequest(mq().Id(), topic, data, size)); |
| | | BHMsg reply; |
| | | if (SyncSendAndRecv(addr.mq_id().data(), &msg, &reply, timeout_ms) && reply.type() == kMsgTypeReply) { |
| | | if (SyncSendAndRecv(addr.mq_id().data(), &req, &reply, timeout_ms) && reply.type() == kMsgTypeReply) { |
| | | DataReply dr; |
| | | if (dr.ParseFromString(msg.body())) { |
| | | if (dr.ParseFromString(reply.body())) { |
| | | dr.mutable_data()->swap(out); |
| | | return true; |
| | | } |
| | | } |
| | | } else { |
| | | } |
| | | } catch (...) { |
| | | return false; |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms, const RecvCB &cb) |
| | |
| | | if (!st->canceled) { |
| | | static_cast<BHMsg *>(result)->Swap(&msg); |
| | | st->cv.notify_one(); |
| | | } // else result is no longer valid. |
| | | } else { |
| | | } |
| | | }; |
| | | |
| | | std::unique_lock<std::mutex> lk(st->mutex); |
| | | if (AsyncSend(remote, msg, timeout_ms, OnRecv) && st->cv.wait_until(lk, endtime) == std::cv_status::no_timeout) { |
| | | bool sendok = AsyncSend(remote, msg, timeout_ms, OnRecv); |
| | | if (sendok && st->cv.wait_until(lk, endtime) == std::cv_status::no_timeout) { |
| | | return true; |
| | | } else { |
| | | st->canceled = true; |
| | |
| | | |
| | | bool SocketRequest::QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms) |
| | | { |
| | | if (tmp_cache_.first == topic) { |
| | | addr = tmp_cache_.second; |
| | | return true; |
| | | } |
| | | |
| | | BHMsg result; |
| | | const BHMsg &msg = MakeQueryTopic(topic); |
| | | if (SyncSendAndRecv(&kBHTopicRPCId, &msg, &result, timeout_ms)) { |
| | | if (result.type() == kMsgTypeQueryTopicReply) { |
| | | DataQueryTopicReply reply; |
| | | const BHMsg &msg = MakeQueryTopic(mq().Id(), topic); |
| | | if (SyncSendAndRecv(&kBHTopicReqRepCenter, &msg, &result, timeout_ms)) { |
| | | if (result.type() == kMsgTypeProcQueryTopicReply) { |
| | | DataProcQueryTopicReply reply; |
| | | if (reply.ParseFromString(result.body())) { |
| | | addr = reply.address(); |
| | | tmp_cache_.first = topic; |
| | | tmp_cache_.second = addr; |
| | | return !addr.mq_id().empty(); |
| | | } |
| | | } |
| | | } else { |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | // reply socket |
| | | namespace |
| | | { |
| | | struct SrcInfo { |
| | | std::vector<BHAddress> route; |
| | | std::string msg_id; |
| | | }; |
| | | |
| | | } // namespace |
| | | |
| | | bool SocketReply::Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms) |
| | | { |
| | | //TODO check reply? |
| | | return SyncSend(&kBHTopicReqRepCenter, MakeRegister(mq().Id(), proc_info, topics), timeout_ms); |
| | | } |
| | | bool SocketReply::Heartbeat(const ProcInfo &proc_info, const int timeout_ms) |
| | | { |
| | | return SyncSend(&kBHTopicReqRepCenter, MakeHeartbeat(mq().Id(), proc_info), timeout_ms); |
| | | } |
| | | bool SocketReply::StartWorker(const OnRequest &rcb, int nworker) |
| | | { |
| | | auto onRecv = [this, rcb](BHMsg &msg) { |
| | | if (msg.type() == kMsgTypeRequest && msg.route_size() > 0) { |
| | | DataRequest req; |
| | | if (req.ParseFromString(msg.body())) { |
| | | std::string out; |
| | | if (rcb(req.topic(), req.data(), out)) { |
| | | BHMsg msg_reply(MakeReply(msg.msg_id(), out.data(), out.size())); |
| | | for (int i = 0; i < msg.route_size() - 1; ++i) { |
| | | msg.add_route()->Swap(msg.mutable_route(i)); |
| | | } |
| | | SyncSend(msg.route().rbegin()->mq_id().data(), msg_reply, 100); |
| | | } |
| | | } |
| | | } else { |
| | | // ignored, or dropped |
| | | } |
| | | }; |
| | | |
| | | return rcb && Start(onRecv, nworker); |
| | | } |
| | | |
| | | bool SocketReply::RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms) |
| | | { |
| | | BHMsg msg; |
| | | if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypeRequest) { |
| | | DataRequest request; |
| | | if (request.ParseFromString(msg.body())) { |
| | | request.mutable_topic()->swap(topic); |
| | | request.mutable_data()->swap(data); |
| | | SrcInfo *p = new SrcInfo; |
| | | p->route.assign(msg.route().begin(), msg.route().end()); |
| | | p->msg_id = msg.msg_id(); |
| | | src_info = p; |
| | | return true; |
| | | } |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | bool SocketReply::SendReply(void *src_info, const std::string &data, const int timeout_ms) |
| | | { |
| | | SrcInfo *p = static_cast<SrcInfo *>(src_info); |
| | | DEFER1(delete p); |
| | | if (!p || p->route.empty()) { |
| | | return false; |
| | | } |
| | | |
| | | BHMsg msg(MakeReply(p->msg_id, data.data(), data.size())); |
| | | for (unsigned i = 0; i < p->route.size() - 1; ++i) { |
| | | msg.add_route()->Swap(&p->route[i]); |
| | | } |
| | | |
| | | return SyncSend(p->route.back().mq_id().data(), msg, timeout_ms); |
| | | } |
| | |
| | | #define REQREP_ACEH09NK |
| | | |
| | | #include "defs.h" |
| | | #include "msg.h" |
| | | #include "socket.h" |
| | | #include <functional> |
| | | #include <unordered_map> |
| | | |
| | | using bhome::msg::ProcInfo; |
| | | |
| | | class SocketRequest : private ShmSocket |
| | | { |
| | |
| | | Socket(shm, 64) { StartWorker(); } |
| | | SocketRequest() : |
| | | SocketRequest(BHomeShm()) {} |
| | | ~SocketRequest() { Stop(); } |
| | | |
| | | typedef std::function<void(const std::string &data)> RequestResultCB; |
| | | bool StartWorker(const RequestResultCB &rrcb, int nworker = 2); |
| | | bool StartWorker(int nworker = 2) { return StartWorker(RequestResultCB(), nworker); } |
| | | bool Stop() { return Socket::Stop(); } |
| | | bool AsyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb); |
| | | bool AsyncRequest(const std::string &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb) |
| | | { |
| | | return AsyncRequest(topic, data.data(), data.size(), timeout_ms, rrcb); |
| | | } |
| | | bool SyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, std::string &out); |
| | | bool SyncRequest(const std::string &topic, const std::string &data, const int timeout_ms, std::string &out) |
| | | bool SyncRequest(const std::string &topic, const void *data, const size_t size, std::string &out, const int timeout_ms); |
| | | bool SyncRequest(const std::string &topic, const std::string &data, std::string &out, const int timeout_ms) |
| | | { |
| | | return SyncRequest(topic, data.data(), data.size(), timeout_ms, out); |
| | | return SyncRequest(topic, data.data(), data.size(), out, timeout_ms); |
| | | } |
| | | |
| | | private: |
| | |
| | | bool SyncSendAndRecv(const void *remote, const void *msg, void *result, const int timeout_ms); |
| | | bool QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms); |
| | | std::unordered_map<std::string, RecvCB> async_cbs_; |
| | | |
| | | std::pair<std::string, bhome::msg::BHAddress> tmp_cache_; |
| | | }; |
| | | |
| | | class SocketReply : private ShmSocket |
| | | { |
| | | typedef ShmSocket Socket; |
| | | |
| | | public: |
| | | SocketReply(Socket::Shm &shm) : |
| | | Socket(shm, 64) {} |
| | | SocketReply() : |
| | | SocketReply(BHomeShm()) {} |
| | | ~SocketReply() { Stop(); } |
| | | |
| | | typedef std::function<bool(const std::string &topic, const std::string &data, std::string &reply)> OnRequest; |
| | | bool StartWorker(const OnRequest &rcb, int nworker = 2); |
| | | bool Stop() { return Socket::Stop(); } |
| | | bool RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms); |
| | | bool SendReply(void *src_info, const std::string &data, const int timeout_ms); |
| | | bool Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms); |
| | | bool Heartbeat(const ProcInfo &proc_info, const int timeout_ms); |
| | | |
| | | private: |
| | | }; |
| | | |
| | | #endif // end of include guard: REQREP_ACEH09NK |
New file |
| | |
| | | /* |
| | | * ===================================================================================== |
| | | * |
| | | * Filename: reqrep_center.cpp |
| | | * |
| | | * Description: topic request/reply center |
| | | * |
| | | * Version: 1.0 |
| | | * Created: 2021年04月01日 14时08分50秒 |
| | | * Revision: none |
| | | * Compiler: gcc |
| | | * |
| | | * Author: Li Chao (), |
| | | * Organization: |
| | | * |
| | | * ===================================================================================== |
| | | */ |
| | | #include "reqrep_center.h" |
| | | #include "bh_util.h" |
| | | using namespace bhome_shm; |
| | | |
| | | struct A { |
| | | void F(int){}; |
| | | }; |
| | | |
| | | namespace |
| | | { |
| | | inline uint64_t Now() |
| | | { |
| | | time_t t; |
| | | return time(&t); |
| | | } |
| | | |
| | | } // namespace |
| | | bool ReqRepCenter::Start(const int nworker) |
| | | { |
| | | auto onRecv = [&](BHMsg &msg) { |
| | | #ifndef NDEBUG |
| | | static std::atomic<time_t> last(0); |
| | | time_t now = 0; |
| | | time(&now); |
| | | if (last.exchange(now) < now) { |
| | | printf("bus queue size: %ld\n", socket_.Pending()); |
| | | } |
| | | #endif |
| | | if (msg.route_size() == 0) { |
| | | return; |
| | | } |
| | | auto &src_mq = msg.route(0).mq_id(); |
| | | |
| | | auto OnRegister = [&]() { |
| | | DataProcRegister reg; |
| | | if (!reg.ParseFromString(msg.body())) { |
| | | return; |
| | | } |
| | | ProcInfo pi; |
| | | pi.server_mqid_ = src_mq; |
| | | pi.proc_id_ = reg.proc().name(); |
| | | pi.ext_info_ = reg.proc().info(); |
| | | pi.timestamp_ = Now(); |
| | | |
| | | std::lock_guard<std::mutex> lock(mutex_); |
| | | for (auto &t : reg.topics()) { |
| | | topic_mq_[t] = pi.server_mqid_; |
| | | } |
| | | procs_[pi.proc_id_] = pi; |
| | | }; |
| | | |
| | | auto OnHeartbeat = [&]() { |
| | | DataProcHeartbeat hb; |
| | | if (!hb.ParseFromString(msg.body())) { |
| | | return; |
| | | } |
| | | |
| | | std::lock_guard<std::mutex> lock(mutex_); |
| | | auto pos = procs_.find(hb.proc().name()); |
| | | if (pos != procs_.end() && pos->second.server_mqid_ == src_mq) { // both name and mq should be the same. |
| | | pos->second.timestamp_ = Now(); |
| | | pos->second.ext_info_ = hb.proc().info(); |
| | | } |
| | | }; |
| | | |
| | | auto OnQueryTopic = [&]() { |
| | | DataProcQueryTopic query; |
| | | if (!query.ParseFromString(msg.body())) { |
| | | return; |
| | | } |
| | | |
| | | std::string dest; |
| | | auto FindDest = [&]() { |
| | | std::lock_guard<std::mutex> lock(mutex_); |
| | | auto pos = topic_mq_.find(query.topic()); |
| | | if (pos != topic_mq_.end()) { |
| | | dest = pos->second; |
| | | return true; |
| | | } else { |
| | | return false; |
| | | } |
| | | }; |
| | | if (FindDest()) { |
| | | MQId remote; |
| | | memcpy(&remote, msg.route().rbegin()->mq_id().data(), sizeof(remote)); |
| | | MsgI imsg; |
| | | if (!imsg.Make(shm(), MakeQueryTopicReply(dest, msg.msg_id()))) { return; } |
| | | if (!ShmMsgQueue::Send(shm(), remote, imsg, 100)) { |
| | | imsg.Release(shm()); |
| | | } |
| | | } |
| | | }; |
| | | |
| | | switch (msg.type()) { |
| | | case kMsgTypeProcRegisterTopics: OnRegister(); break; |
| | | case kMsgTypeProcHeartbeat: OnHeartbeat(); break; |
| | | case kMsgTypeProcQueryTopic: OnQueryTopic(); break; |
| | | default: break; |
| | | } |
| | | }; |
| | | |
| | | const int kMaxWorker = 16; |
| | | return socket_.Start(onRecv, std::min((nworker > 0 ? nworker : 2), kMaxWorker)); |
| | | } |
New file |
| | |
| | | /* |
| | | * ===================================================================================== |
| | | * |
| | | * Filename: reqrep_center.h |
| | | * |
| | | * Description: |
| | | * |
| | | * Version: 1.0 |
| | | * Created: 2021年04月01日 14时09分13秒 |
| | | * Revision: none |
| | | * Compiler: gcc |
| | | * |
| | | * Author: Li Chao (), |
| | | * Organization: |
| | | * |
| | | * ===================================================================================== |
| | | */ |
| | | #ifndef REQREP_CENTER_US3RBM60 |
| | | #define REQREP_CENTER_US3RBM60 |
| | | |
| | | #include "defs.h" |
| | | #include "socket.h" |
| | | #include <chrono> |
| | | #include <mutex> |
| | | #include <set> |
| | | |
| | | class ReqRepCenter |
| | | { |
| | | class Socket : public ShmSocket |
| | | { |
| | | public: |
| | | Socket(ShmSocket::Shm &shm) : |
| | | ShmSocket(shm, &kBHTopicReqRepCenter, 1000) {} |
| | | using ShmSocket::shm; |
| | | }; |
| | | Socket socket_; |
| | | ShmSocket::Shm &shm() { return socket_.shm(); } |
| | | struct ProcInfo { |
| | | std::string proc_id_; // unique name |
| | | std::string server_mqid_; |
| | | std::string ext_info_; // maybe json. |
| | | uint64_t timestamp_ = 0; |
| | | }; |
| | | |
| | | typedef std::string Dests; |
| | | |
| | | std::mutex mutex_; |
| | | std::unordered_map<std::string, Dests> topic_mq_; |
| | | std::unordered_map<std::string, ProcInfo> procs_; |
| | | |
| | | public: |
| | | ReqRepCenter(ShmSocket::Shm &shm) : |
| | | socket_(shm) {} |
| | | ReqRepCenter() : |
| | | ReqRepCenter(BHomeShm()) {} |
| | | ~ReqRepCenter() { Stop(); } |
| | | bool Start(const int nworker = 2); |
| | | bool Stop() { return socket_.Stop(); } |
| | | }; |
| | | |
| | | #endif // end of include guard: REQREP_CENTER_US3RBM60 |
| | |
| | | |
| | | ShmSocket::~ShmSocket() |
| | | { |
| | | Stop(); |
| | | Stop(); //TODO should stop in sub class, incase thread access sub class data. |
| | | } |
| | | |
| | | bool ShmSocket::StartRaw(const RecvRawCB &onData, int nworker) |
| | |
| | | |
| | | bool ShmSocket::SyncSend(const void *id, const bhome_msg::BHMsg &msg, const int timeout_ms) |
| | | { |
| | | std::lock_guard<std::mutex> lock(mutex_); |
| | | if (!mq_ || RunningNoLock()) { |
| | | return false; |
| | | } else { |
| | | return mq_->Send(*static_cast<const MQId *>(id), msg, timeout_ms); |
| | | } |
| | | return mq_->Send(*static_cast<const MQId *>(id), msg, timeout_ms); |
| | | } |
| | | |
| | | bool ShmSocket::SyncRecv(bhome_msg::BHMsg &msg, const int timeout_ms) |
| | |
| | | { |
| | | const std::string shm_name("ShmSpeed"); |
| | | ShmRemover auto_remove(shm_name); |
| | | const int mem_size = 1024 * 1024 * 50; |
| | | MQId id = boost::uuids::random_generator()(); |
| | | const int timeout = 100; |
| | | const int mem_size = 1024 * 1024 * 50; |
| | | MQId id = boost::uuids::random_generator()(); |
| | | const int timeout = 100; |
| | | const uint32_t data_size = 4000; |
| | | |
| | | auto Writer = [&](int writer_id, uint64_t n) { |
| | |
| | | std::string str(data_size, 'a'); |
| | | MsgI msg; |
| | | DEFER1(msg.Release(shm);); |
| | | msg.MakeRC(shm, MakeRequest(mq.Id(), str.data(), str.size())); |
| | | msg.MakeRC(shm, MakeRequest(mq.Id(), "topic", str.data(), str.size())); |
| | | for (uint64_t i = 0; i < n; ++i) { |
| | | // mq.Send(id, str.data(), str.size(), timeout); |
| | | mq.Send(id, msg, timeout); |
| | |
| | | auto Test = [&](auto &www, auto &rrr, bool isfork) { |
| | | for (auto nreader : nreaders) { |
| | | for (auto nwriter : nwriters) { |
| | | const uint64_t nmsg = 1000 * 1000 * 10 / nwriter; |
| | | const uint64_t nmsg = 1000 * 1000 * 10 / nwriter; |
| | | const uint64_t total_msg = nmsg * nwriter; |
| | | std::atomic<bool> run(true); |
| | | std::this_thread::sleep_for(10ms); |
| | |
| | | run.store(false); |
| | | } |
| | | |
| | | // Request Reply Test |
| | | BOOST_AUTO_TEST_CASE(RRTest) |
| | | // Send Recv Test |
| | | BOOST_AUTO_TEST_CASE(SRTest) |
| | | { |
| | | const std::string shm_name("ShmReqRep"); |
| | | const std::string shm_name("ShmSendRecv"); |
| | | ShmRemover auto_remove(shm_name); |
| | | const int qlen = 64; |
| | | const int qlen = 64; |
| | | const size_t msg_length = 1000; |
| | | std::string msg_content(msg_length, 'a'); |
| | | msg_content[20] = '\0'; |
| | | |
| | | SharedMemory shm(shm_name, 1024 * 1024 * 50); |
| | | auto Avail = [&]() { return shm.get_free_memory(); }; |
| | | auto Avail = [&]() { return shm.get_free_memory(); }; |
| | | auto init_avail = Avail(); |
| | | ShmMsgQueue srv(shm, qlen); |
| | | ShmMsgQueue cli(shm, qlen); |
| | | |
| | | MsgI request_rc; |
| | | request_rc.MakeRC(shm, MakeRequest(cli.Id(), msg_content.data(), msg_content.size())); |
| | | request_rc.MakeRC(shm, MakeRequest(cli.Id(), "topic", msg_content.data(), msg_content.size())); |
| | | MsgI reply_rc; |
| | | reply_rc.MakeRC(shm, MakeReply(msg_content.data(), msg_content.size())); |
| | | reply_rc.MakeRC(shm, MakeReply("fakemsgid", msg_content.data(), msg_content.size())); |
| | | |
| | | std::atomic<uint64_t> count(0); |
| | | |
| | |
| | | auto Client = [&](int cli_id, int nmsg) { |
| | | for (int i = 0; i < nmsg; ++i) { |
| | | auto Req = [&]() { |
| | | return cli.Send(srv.Id(), MakeRequest(cli.Id(), msg_content.data(), msg_content.size()), 100); |
| | | return cli.Send(srv.Id(), MakeRequest(cli.Id(), "topic", msg_content.data(), msg_content.size()), 100); |
| | | }; |
| | | auto ReqRC = [&]() { return cli.Send(srv.Id(), request_rc, 1000); }; |
| | | |
| | |
| | | MQId src_id; |
| | | memcpy(&src_id, mqid.data(), sizeof(src_id)); |
| | | auto Reply = [&]() { |
| | | return srv.Send(src_id, MakeReply(msg_content.data(), msg_content.size()), 100); |
| | | return srv.Send(src_id, MakeReply(req.msg_id(), msg_content.data(), msg_content.size()), 100); |
| | | }; |
| | | auto ReplyRC = [&]() { return srv.Send(src_id, reply_rc, 100); }; |
| | | |
| | |
| | | |
| | | ThreadManager clients, servers; |
| | | for (int i = 0; i < qlen; ++i) { servers.Launch(Server); } |
| | | int ncli = 100 * 1; |
| | | int ncli = 100 * 1; |
| | | uint64_t nmsg = 100 * 100 * 2; |
| | | printf("client threads: %d, msgs : %ld, total msg: %ld\n", ncli, nmsg, ncli * nmsg); |
| | | for (int i = 0; i < ncli; ++i) { clients.Launch(Client, i, nmsg); } |
| | |
| | | #include "defs.h" |
| | | #include "pubsub.h" |
| | | #include "pubsub_center.h" |
| | | #include "reqrep.h" |
| | | #include "reqrep_center.h" |
| | | #include "socket.h" |
| | | #include "util.h" |
| | | #include <atomic> |
| | |
| | | bus.Stop(); |
| | | } |
| | | |
| | | BOOST_AUTO_TEST_CASE(ReqRepTest) |
| | | { |
| | | const std::string shm_name("ShmReqRep"); |
| | | ShmRemover auto_remove(shm_name); |
| | | SharedMemory shm(shm_name, 1024 * 1024 * 50); |
| | | |
| | | auto Avail = [&]() { return shm.get_free_memory(); }; |
| | | auto init_avail = Avail(); |
| | | int *flag = shm.find_or_construct<int>("flag")(123); |
| | | printf("flag = %d\n", *flag); |
| | | ++*flag; |
| | | |
| | | ReqRepCenter center(shm); |
| | | center.Start(2); |
| | | std::atomic<bool> run(true); |
| | | |
| | | auto Client = [&](const std::string &topic, const int nreq) { |
| | | SocketRequest client(shm); |
| | | std::string reply; |
| | | boost::timer::auto_cpu_timer timer; |
| | | for (int i = 0; i < nreq; ++i) { |
| | | if (!client.SyncRequest(topic, "data " + std::to_string(i), reply, 1000)) { |
| | | printf("client request failed\n"); |
| | | } |
| | | } |
| | | printf("request %s %d done ", topic.c_str(), nreq); |
| | | }; |
| | | auto Server = [&](const std::string &name, const std::vector<std::string> &topics) { |
| | | SocketReply server(shm); |
| | | ProcInfo info; |
| | | info.set_name(name); |
| | | info.set_info(name); |
| | | if (!server.Register(info, topics, 100)) { |
| | | printf("register failed\n"); |
| | | } |
| | | auto onData = [](const std::string &topic, const std::string &data, std::string &reply) { |
| | | reply = topic + ':' + data; |
| | | return true; |
| | | }; |
| | | server.StartWorker(onData); |
| | | while (run) { |
| | | std::this_thread::yield(); |
| | | } |
| | | }; |
| | | ThreadManager clients, servers; |
| | | std::vector<std::string> topics = {"topic1", "topic2"}; |
| | | servers.Launch(Server, "server", topics); |
| | | std::this_thread::sleep_for(100ms); |
| | | for (auto &t : topics) { |
| | | clients.Launch(Client, t, 1000 * 100); |
| | | } |
| | | clients.WaitAll(); |
| | | run = false; |
| | | servers.WaitAll(); |
| | | } |
| | | |
| | | inline int MyMin(int a, int b) |
| | | { |
| | | printf("MyMin\n"); |