From 2197cf91e7a3bd5941327ba630a42946b88f069e Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 09 四月 2021 14:15:41 +0800 Subject: [PATCH] join pub/sub to node; refactor. --- src/socket.h | 63 +++++-- src/proto.h | 2 src/msg.h | 26 +-- src/socket.cpp | 48 ++--- proto/source/bhome_msg.proto | 22 ++ utest/utest.cpp | 45 ++--- src/topic_node.cpp | 121 +++++++++++++- src/center.cpp | 14 + src/msg.cpp | 8 + utest/speed_test.cpp | 18 +- /dev/null | 31 --- utest/util.h | 23 ++ src/shm_queue.h | 21 -- src/topic_node.h | 22 ++ 14 files changed, 277 insertions(+), 187 deletions(-) diff --git a/proto/source/bhome_msg.proto b/proto/source/bhome_msg.proto index b06b692..5056a26 100644 --- a/proto/source/bhome_msg.proto +++ b/proto/source/bhome_msg.proto @@ -3,6 +3,7 @@ // import "google/protobuf/descriptor.proto"; import "bhome_msg_api.proto"; +import "error_msg.proto"; package bhome.msg; @@ -18,12 +19,21 @@ bytes topic = 6; // for request route } -message BHMsg { // deprecated - bytes msg_id = 1; - int64 timestamp = 2; - int32 type = 3; - repeated BHAddress route = 4; // for reply and proxy. - bytes body = 5; +message MsgRequest { + MsgType type = 1; + // oneof body; +} + +message MsgReply { + ErrorMsg err_msg = 1; + // oneof reply +} + +message BHMsgBody { + oneof reqrep { + MsgRequest request = 1; + MsgReply reply = 2; + } } enum MsgType { diff --git a/src/center.cpp b/src/center.cpp index fe549b7..71c85c3 100644 --- a/src/center.cpp +++ b/src/center.cpp @@ -336,7 +336,7 @@ auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) { return [&](auto &&rep_body) { auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id())); - bool r = socket.Send(head.route(0).mq_id().data(), reply_head, rep_body, 10); + bool r = socket.Send(head.route(0).mq_id().data(), reply_head, rep_body, 100); if (!r) { printf("send reply failed.\n"); } @@ -364,18 +364,20 @@ MsgPublish pub; NodeCenter::Clients clients; MsgCommonReply reply; - MsgI pubmsg; if (head.route_size() != 1 || !msg.ParseBody(pub)) { return; } else if (!center->FindClients(head, pub, clients, reply)) { - // send error reply. MakeReplyer(socket, head, center->id())(reply); - } else if (pubmsg.MakeRC(socket.shm(), msg)) { - DEFER1(pubmsg.Release(socket.shm())); + } else { + MakeReplyer(socket, head, center->id())(MakeReply(eSuccess)); + if (!msg.EnableRefCount(socket.shm())) { return; } // no memory? + for (auto &cli : clients) { auto node = cli.weak_node_.lock(); if (node) { - socket.Send(cli.mq_.data(), pubmsg, 10); + if (!socket.Send(cli.mq_.data(), msg, 100)) { + printf("center route publish failed. need resend.\n"); + } } } } diff --git a/src/msg.cpp b/src/msg.cpp index c353d84..06b817e 100644 --- a/src/msg.cpp +++ b/src/msg.cpp @@ -78,6 +78,14 @@ return true; } +bool MsgI::EnableRefCount(SharedMemory &shm) +{ + if (!IsCounted()) { + count_ = shm.New<RefCount>(); + } + return IsCounted(); +} + int MsgI::Release(SharedMemory &shm) { if (IsCounted()) { diff --git a/src/msg.h b/src/msg.h index 661d989..10ad0d2 100644 --- a/src/msg.h +++ b/src/msg.h @@ -105,27 +105,21 @@ bool IsCounted() const { return static_cast<bool>(count_); } template <class Body> - bool Make(SharedMemory &shm, const BHMsgHead &head, const Body &body) - { - return Make(shm, Pack(shm, head, body)); - } - template <class Body> - bool MakeRC(SharedMemory &shm, const BHMsgHead &head, const Body &body) + inline bool MakeRC(SharedMemory &shm, const BHMsgHead &head, const Body &body) { return MakeRC(shm, Pack(shm, head, body)); } - bool MakeRC(SharedMemory &shm, MsgI &a) + + bool EnableRefCount(SharedMemory &shm); + + template <class Body> + inline bool Make(SharedMemory &shm, const BHMsgHead &head, const Body &body) { - if (a.IsCounted()) { - *this = a; - AddRef(); - return true; - } else { - void *p = a.ptr_.get(); - a.ptr_ = 0; - return MakeRC(shm, p); - } + void *p = Pack(shm, head, body); + auto NeedRefCount = [&]() { return head.type() == kMsgTypePublish; }; + return NeedRefCount() ? MakeRC(shm, p) : Make(shm, p); } + bool ParseHead(BHMsgHead &head) const; template <class Body> bool ParseBody(Body &body) const diff --git a/src/proto.h b/src/proto.h index 2057711..da3bde6 100644 --- a/src/proto.h +++ b/src/proto.h @@ -74,5 +74,5 @@ BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const std::string &msgid); BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id); // inline void AddRoute(BHMsgHead &head, const MQId &id) { head.add_route()->set_mq_id(&id, sizeof(id)); } - +inline bool IsSuccess(const ErrorCode ec) { return ec == eSuccess; } #endif // end of include guard: PROTO_UA9UWKL1 diff --git a/src/pubsub.cpp b/src/pubsub.cpp deleted file mode 100644 index 471c63c..0000000 --- a/src/pubsub.cpp +++ /dev/null @@ -1,92 +0,0 @@ -/* - * ===================================================================================== - * - * Filename: pubsub.cpp - * - * Description: - * - * Version: 1.0 - * Created: 2021骞�03鏈�24鏃� 18鏃�44鍒�13绉� - * Revision: none - * Compiler: gcc - * - * Author: Li Chao (), - * Organization: - * - * ===================================================================================== - */ -#include "pubsub.h" -#include "bh_util.h" -#include "defs.h" - -using namespace std::chrono_literals; -using namespace bhome_msg; - -bool SocketPublish::Publish(const std::string &proc_id, const Topic &topic, const void *data, const size_t size, const int timeout_ms) -{ - try { - MsgPublish pub; - pub.set_topic(topic); - pub.set_data(data, size); - BHMsgHead head(InitMsgHead(GetType(pub), proc_id)); - MsgI imsg; - if (imsg.MakeRC(shm(), head, pub)) { - DEFER1(imsg.Release(shm())); - return ShmMsgQueue::Send(shm(), BHTopicBusAddress(), imsg, timeout_ms); - } - } catch (...) { - } - return false; -} -namespace -{ -inline void AddRoute(BHMsgHead &head, const MQId &id) { head.add_route()->set_mq_id(&id, sizeof(id)); } - -} // namespace -bool SocketSubscribe::Subscribe(const std::string &proc_id, const std::vector<Topic> &topics, const int timeout_ms) -{ - try { - MsgSubscribe sub; - for (auto &topic : topics) { - sub.add_topics(topic); - } - BHMsgHead head(InitMsgHead(GetType(sub), proc_id)); - AddRoute(head, mq().Id()); - - return Send(&BHTopicBusAddress(), head, sub, timeout_ms); - } catch (...) { - return false; - } -} - -bool SocketSubscribe::StartRecv(const TopicDataCB &tdcb, int nworker) -{ - auto AsyncRecvProc = [this, tdcb](ShmSocket &, MsgI &imsg, BHMsgHead &head) { - if (head.type() == kMsgTypePublish) { - MsgPublish pub; - if (imsg.ParseBody(pub)) { - tdcb(head.proc_id(), pub.topic(), pub.data()); - } - } else { - // ignored, or dropped - } - }; - - return tdcb && Start(AsyncRecvProc, nworker); -} - -bool SocketSubscribe::RecvSub(std::string &proc_id, Topic &topic, std::string &data, const int timeout_ms) -{ - MsgI msg; - BHMsgHead head; - if (SyncRecv(msg, head, timeout_ms) && head.type() == kMsgTypePublish) { - MsgPublish pub; - if (msg.ParseBody(pub)) { - head.mutable_proc_id()->swap(proc_id); - pub.mutable_topic()->swap(topic); - pub.mutable_data()->swap(data); - return true; - } - } - return false; -} \ No newline at end of file diff --git a/src/pubsub.h b/src/pubsub.h deleted file mode 100644 index bd60fcd..0000000 --- a/src/pubsub.h +++ /dev/null @@ -1,58 +0,0 @@ -/* - * ===================================================================================== - * - * Filename: pubsub.h - * - * Description: - * - * Version: 1.0 - * Created: 2021骞�03鏈�24鏃� 18鏃�44鍒�36绉� - * Revision: none - * Compiler: gcc - * - * Author: Li Chao (), - * Organization: - * - * ===================================================================================== - */ -#ifndef PUBSUB_4KGRA997 -#define PUBSUB_4KGRA997 - -#include "defs.h" -#include "socket.h" -#include <string> - -class SocketPublish -{ - typedef ShmSocket Socket; - Socket::Shm &shm_; - Socket::Shm &shm() { return shm_; } - -public: - SocketPublish(Socket::Shm &shm) : - shm_(shm) {} - SocketPublish() : - SocketPublish(BHomeShm()) {} - bool Publish(const std::string &proc_id, const Topic &topic, const void *data, const size_t size, const int timeout_ms); -}; - -// socket subscribe -class SocketSubscribe : private ShmSocket -{ - typedef ShmSocket Socket; - -public: - SocketSubscribe(Socket::Shm &shm) : - Socket(shm, 64) {} - SocketSubscribe() : - SocketSubscribe(BHomeShm()) {} - ~SocketSubscribe() { Stop(); } - - typedef std::function<void(const std::string &proc_id, const Topic &topic, const std::string &data)> TopicDataCB; - bool StartRecv(const TopicDataCB &tdcb, int nworker = 2); - bool Stop() { return Socket::Stop(); } - bool Subscribe(const std::string &proc_id, const std::vector<Topic> &topics, const int timeout_ms); - bool RecvSub(std::string &proc_id, Topic &topic, std::string &data, const int timeout_ms); -}; - -#endif // end of include guard: PUBSUB_4KGRA997 diff --git a/src/shm_queue.h b/src/shm_queue.h index 32ccfae..88c13ec 100644 --- a/src/shm_queue.h +++ b/src/shm_queue.h @@ -136,25 +136,8 @@ static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend); static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms); - template <class... Extra> - bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms, Extra const &...extra) - { - return Send(shm(), remote_id, msg, timeout_ms, extra...); - } - template <class Body, class... Extra> - bool Send(const MQId &remote_id, const BHMsgHead &head, const Body &body, const int timeout_ms, Extra const &...extra) - { - MsgI msg; - if (msg.Make(shm(), head, body)) { - if (Send(shm(), remote_id, msg, timeout_ms, extra...)) { - return true; - } else { - msg.Release(shm()); - } - } - return false; - } - + template <class... Rest> + bool Send(const MQId &remote_id, Rest const &...rest) { return Send(shm(), remote_id, rest...); } size_t Pending() const { return data()->size(); } }; diff --git a/src/socket.cpp b/src/socket.cpp index f2b29f4..116175d 100644 --- a/src/socket.cpp +++ b/src/socket.cpp @@ -43,51 +43,37 @@ bool ShmSocket::Start(int nworker, const RecvCB &onData, const IdleCB &onIdle) { - auto onRecv = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) { - auto Find = [&](RecvCB &cb) { - std::lock_guard<std::mutex> lock(mutex()); - const std::string &msgid = head.msg_id(); - auto pos = async_cbs_.find(msgid); - if (pos != async_cbs_.end()) { - cb.swap(pos->second); - async_cbs_.erase(pos); - return true; - } else { - return false; - } - }; - + auto onRecvWithPerMsgCB = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) { RecvCB cb; - if (Find(cb)) { + if (async_cbs_->Find(head.msg_id(), cb)) { cb(socket, imsg, head); } else if (onData) { onData(socket, imsg, head); } // else ignored, or dropped }; - std::lock_guard<std::mutex> lock(mutex_); - StopNoLock(); - auto RecvProc = [this, onRecv, onIdle]() { - while (run_) { - try { - MsgI imsg; - if (mq().Recv(imsg, 10)) { - DEFER1(imsg.Release(shm())); - BHMsgHead head; - if (imsg.ParseHead(head)) { - onRecv(*this, imsg, head); - } - } else if (onIdle) { - onIdle(*this); + auto recvLoopBody = [this, onRecvWithPerMsgCB, onIdle]() { + try { + MsgI imsg; + if (mq().Recv(imsg, 10)) { + DEFER1(imsg.Release(shm())); + BHMsgHead head; + if (imsg.ParseHead(head)) { + onRecvWithPerMsgCB(*this, imsg, head); } - } catch (...) { + } else if (onIdle) { + onIdle(*this); } + } catch (...) { } }; + std::lock_guard<std::mutex> lock(mutex_); + StopNoLock(); + run_.store(true); for (int i = 0; i < nworker; ++i) { - workers_.emplace_back(RecvProc); + workers_.emplace_back([this, recvLoopBody]() { while (run_) { recvLoopBody(); } }); } return true; } diff --git a/src/socket.h b/src/socket.h index 7c4f83f..f73bee5 100644 --- a/src/socket.h +++ b/src/socket.h @@ -19,6 +19,7 @@ #ifndef SOCKET_GWTJHBPO #define SOCKET_GWTJHBPO +#include "bh_util.h" #include "defs.h" #include "shm_queue.h" #include <atomic> @@ -34,6 +35,15 @@ class ShmSocket : private boost::noncopyable { + template <class DoSend> + inline bool SendImpl(MsgI &msg, const int timeout_ms, const DoSend &doSend) + { + bool r = false; + DEFER1(if (msg.IsCounted() || !r) { msg.Release(shm()); }); + r = doSend(msg); + return r; + } + protected: typedef bhome_shm::ShmMsgQueue Queue; @@ -55,30 +65,28 @@ bool Stop(); size_t Pending() const { return mq().Pending(); } - bool Send(const void *id, const MsgI &imsg, const int timeout_ms) + bool Send(const void *valid_remote, const MsgI &imsg, const int timeout_ms) { - return mq().Send(*static_cast<const MQId *>(id), imsg, timeout_ms); + assert(valid_remote); + return mq().Send(*static_cast<const MQId *>(valid_remote), imsg, timeout_ms); } //TODO reimplment, using async. bool SyncRecv(MsgI &msg, bhome::msg::BHMsgHead &head, const int timeout_ms); template <class Body> - bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms, const RecvCB &cb = RecvCB()) + bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms, const RecvCB &cb) { - assert(valid_remote); - try { - if (cb) { - auto RegisterCB = [&]() { - std::lock_guard<std::mutex> lock(mutex()); - async_cbs_.emplace(head.msg_id(), cb); - }; - return mq().Send(*static_cast<const MQId *>(valid_remote), head, body, timeout_ms, RegisterCB); - } else { - return mq().Send(*static_cast<const MQId *>(valid_remote), head, body, timeout_ms); - } - } catch (...) { - return false; - } + auto DoSend = [&](MsgI &msg) { return mq().Send(*static_cast<const MQId *>(valid_remote), msg, timeout_ms, [&]() { async_cbs_->Add(head.msg_id(), cb); }); }; + MsgI msg; + return msg.Make(shm(), head, body) && SendImpl(msg, timeout_ms, DoSend); + } + + template <class Body> + bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms) + { + auto DoSend = [&](MsgI &msg) { return mq().Send(*static_cast<const MQId *>(valid_remote), msg, timeout_ms); }; + MsgI msg; + return msg.Make(shm(), head, body) && SendImpl(msg, timeout_ms, DoSend); } template <class Body> @@ -133,7 +141,26 @@ std::atomic<bool> run_; Queue mq_; - std::unordered_map<std::string, RecvCB> async_cbs_; + class AsyncCBs + { + std::unordered_map<std::string, RecvCB> store_; + + public: + bool Add(const std::string &id, const RecvCB &cb) { return store_.emplace(id, cb).second; } + bool Find(const std::string &id, RecvCB &cb) + { + auto pos = store_.find(id); + if (pos != store_.end()) { + cb.swap(pos->second); + store_.erase(pos); + return true; + } else { + return false; + } + } + }; + + Synced<AsyncCBs> async_cbs_; }; #endif // end of include guard: SOCKET_GWTJHBPO diff --git a/src/topic_node.cpp b/src/topic_node.cpp index c6c9771..d76c03a 100644 --- a/src/topic_node.cpp +++ b/src/topic_node.cpp @@ -76,29 +76,34 @@ shm_(shm), sock_node_(shm), sock_request_(shm), sock_reply_(shm), sock_sub_(shm) { SockNode().Start(); + SockClient().Start(); + SockServer().Start(); } + TopicNode::~TopicNode() { StopAll(); - SockNode().Stop(); } + void TopicNode::StopAll() { - ServerStop(); - ClientStopWorker(); + SockServer().Stop(); + SockClient().Stop(); + SockNode().Stop(); } bool TopicNode::Register(const MsgRegister &body, MsgCommonReply &reply_body, const int timeout_ms) { + auto &sock = SockNode(); + auto head(InitMsgHead(GetType(body), body.proc().proc_id())); - AddRoute(head, SockNode().id()); + AddRoute(head, sock.id()); MsgI reply; DEFER1(reply.Release(shm_);); BHMsgHead reply_head; - bool r = SockNode().SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); - r = r && reply_head.type() == kMsgTypeCommonReply; - r = r && reply.ParseBody(reply_body); + bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); + r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body); if (r) { info_ = body; } @@ -108,14 +113,15 @@ bool TopicNode::RegisterRPC(const MsgRegisterRPC &body, MsgCommonReply &reply_body, const int timeout_ms) { //TODO check registered + auto &sock = SockServer(); auto head(InitMsgHead(GetType(body), proc_id())); - AddRoute(head, SockReply().id()); + AddRoute(head, sock.id()); MsgI reply; DEFER1(reply.Release(shm_);); BHMsgHead reply_head; - bool r = SockReply().SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); + bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); r = r && reply_head.type() == kMsgTypeCommonReply; r = r && reply.ParseBody(reply_body); return r; @@ -154,15 +160,17 @@ onIdle(sock); }; - return rcb && SockReply().Start(onRecv, onIdle, nworker); + auto &sock = SockServer(); + return rcb && sock.Start(onRecv, onIdle, nworker); } -bool TopicNode::ServerStop() { return SockReply().Stop(); } bool TopicNode::ServerRecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms) { + auto &sock = SockServer(); + MsgI imsg; BHMsgHead head; - if (SockReply().SyncRecv(imsg, head, timeout_ms) && head.type() == kMsgTypeRequestTopic) { + if (sock.SyncRecv(imsg, head, timeout_ms) && head.type() == kMsgTypeRequestTopic) { MsgRequestTopic request; if (imsg.ParseBody(request)) { request.mutable_topic()->swap(topic); @@ -179,6 +187,8 @@ bool TopicNode::ServerSendReply(void *src_info, const std::string &data, const int timeout_ms) { + auto &sock = SockServer(); + SrcInfo *p = static_cast<SrcInfo *>(src_info); DEFER1(delete p); if (!p || p->route.empty()) { @@ -192,7 +202,7 @@ head.add_route()->Swap(&p->route[i]); } - return SockReply().Send(p->route.back().mq_id().data(), head, body, timeout_ms); + return sock.Send(p->route.back().mq_id().data(), head, body, timeout_ms); } bool TopicNode::ClientStartWorker(RequestResultCB const &cb, const int nworker) @@ -211,12 +221,12 @@ return SockRequest().Start(onData, nworker); } -bool TopicNode::ClientStopWorker() { return SockRequest().Stop(); } bool TopicNode::ClientAsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb) { auto Call = [&](const void *remote) { auto &sock = SockRequest(); + MsgRequestTopic req; req.set_topic(topic); req.set_data(data, size); @@ -254,6 +264,7 @@ { try { auto &sock = SockRequest(); + BHAddress addr; if (ClientQueryRPCTopic(topic, addr, timeout_ms)) { @@ -290,6 +301,7 @@ bool TopicNode::ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms) { auto &sock = SockRequest(); + if (topic_query_cache_.Find(topic, addr)) { return true; } @@ -319,4 +331,85 @@ } else { } return false; +} + +// publish + +bool TopicNode::Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms) +{ + try { + auto &sock = SockPub(); + + MsgPublish pub; + pub.set_topic(topic); + pub.set_data(data, size); + BHMsgHead head(InitMsgHead(GetType(pub), proc_id())); + AddRoute(head, sock.id()); + + MsgI reply; + DEFER1(reply.Release(shm());); + BHMsgHead reply_head; + MsgCommonReply reply_body; + return sock.SendAndRecv(&BHTopicBusAddress(), head, pub, reply, reply_head, timeout_ms) && + reply_head.type() == kMsgTypeCommonReply && + reply.ParseBody(reply_body) && + IsSuccess(reply_body.errmsg().errcode()); + } catch (...) { + } + return false; +} + +// subscribe + +bool TopicNode::Subscribe(const std::vector<Topic> &topics, const int timeout_ms) +{ + try { + auto &sock = SockSub(); + MsgSubscribe sub; + for (auto &topic : topics) { + sub.add_topics(topic); + } + BHMsgHead head(InitMsgHead(GetType(sub), proc_id())); + AddRoute(head, sock.id()); + + return sock.Send(&BHTopicBusAddress(), head, sub, timeout_ms); + } catch (...) { + return false; + } +} + +bool TopicNode::SubscribeStartWorker(const TopicDataCB &tdcb, int nworker) +{ + auto &sock = SockSub(); + + auto AsyncRecvProc = [this, tdcb](ShmSocket &, MsgI &imsg, BHMsgHead &head) { + if (head.type() == kMsgTypePublish) { + MsgPublish pub; + if (imsg.ParseBody(pub)) { + tdcb(head.proc_id(), pub.topic(), pub.data()); + } + } else { + // ignored, or dropped + } + }; + + return tdcb && sock.Start(AsyncRecvProc, nworker); +} + +bool TopicNode::RecvSub(std::string &proc_id, Topic &topic, std::string &data, const int timeout_ms) +{ + auto &sock = SockSub(); + MsgI msg; + DEFER1(msg.Release(shm());); + BHMsgHead head; + if (sock.SyncRecv(msg, head, timeout_ms) && head.type() == kMsgTypePublish) { + MsgPublish pub; + if (msg.ParseBody(pub)) { + head.mutable_proc_id()->swap(proc_id); + pub.mutable_topic()->swap(topic); + pub.mutable_data()->swap(data); + return true; + } + } + return false; } \ No newline at end of file diff --git a/src/topic_node.h b/src/topic_node.h index 8852af1..34fe2ee 100644 --- a/src/topic_node.h +++ b/src/topic_node.h @@ -19,7 +19,6 @@ #define TOPIC_NODE_YVKWA6TF #include "msg.h" -#include "pubsub.h" #include "socket.h" #include <memory> @@ -32,23 +31,26 @@ SharedMemory &shm_; MsgRegister info_; + SharedMemory &shm() { return shm_; } + public: TopicNode(SharedMemory &shm); ~TopicNode(); + + void StopAll(); + // topic node bool Register(const MsgRegister &body, MsgCommonReply &reply, const int timeout_ms); bool RegisterRPC(const MsgRegisterRPC &body, MsgCommonReply &reply, const int timeout_ms); // topic rpc server typedef std::function<bool(const std::string &topic, const std::string &data, std::string &reply)> OnRequest; bool ServerStart(OnRequest const &cb, const int nworker = 2); - bool ServerStop(); bool ServerRecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms); bool ServerSendReply(void *src_info, const std::string &data, const int timeout_ms); // topic client typedef std::function<void(const std::string &data)> RequestResultCB; bool ClientStartWorker(RequestResultCB const &cb, const int nworker = 2); - bool ClientStopWorker(); bool ClientAsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb = RequestResultCB()); bool ClientAsyncRequest(const Topic &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb = RequestResultCB()) { @@ -60,7 +62,14 @@ return ClientSyncRequest(topic, data.data(), data.size(), out, timeout_ms); } - void StopAll(); + // publish + bool Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms); + + // subscribe + typedef std::function<void(const std::string &proc_id, const Topic &topic, const std::string &data)> TopicDataCB; + bool SubscribeStartWorker(const TopicDataCB &tdcb, int nworker = 2); + bool Subscribe(const std::vector<Topic> &topics, const int timeout_ms); + bool RecvSub(std::string &proc_id, Topic &topic, std::string &data, const int timeout_ms); private: bool ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms); @@ -106,14 +115,17 @@ // some sockets may be the same one, using functions make it easy to change. auto &SockNode() { return sock_node_; } + auto &SockPub() { return SockNode(); } auto &SockSub() { return sock_sub_; } auto &SockRequest() { return sock_request_; } + auto &SockClient() { return SockRequest(); } auto &SockReply() { return sock_reply_; } + auto &SockServer() { return SockReply(); } ShmSocket sock_node_; ShmSocket sock_request_; ShmSocket sock_reply_; - SocketSubscribe sock_sub_; + ShmSocket sock_sub_; TopicQueryCache topic_query_cache_; }; diff --git a/src/topic_rpc.cpp b/src/topic_rpc.cpp deleted file mode 100644 index 065a861..0000000 --- a/src/topic_rpc.cpp +++ /dev/null @@ -1,21 +0,0 @@ -/* - * ===================================================================================== - * - * Filename: topic_rpc.cpp - * - * Description: topic request/reply manager - * - * Version: 1.0 - * Created: 2021骞�03鏈�31鏃� 16鏃�29鍒�31绉� - * Revision: none - * Compiler: gcc - * - * Author: YOUR NAME (), - * Organization: - * - * ===================================================================================== - */ -#include "topic_rpc.h" - - - diff --git a/src/topic_rpc.h b/src/topic_rpc.h deleted file mode 100644 index 40ff985..0000000 --- a/src/topic_rpc.h +++ /dev/null @@ -1,31 +0,0 @@ -/* - * ===================================================================================== - * - * Filename: topic_rpc.h - * - * Description: - * - * Version: 1.0 - * Created: 2021骞�03鏈�31鏃� 16鏃�30鍒�10绉� - * Revision: none - * Compiler: gcc - * - * Author: YOUR NAME (), - * Organization: - * - * ===================================================================================== - */ -#ifndef TOPIC_RPC_JU1AYN5L -#define TOPIC_RPC_JU1AYN5L - -#include "socket.h" - -// request/reply topic manager -class RPCManager -{ - ShmSocket socket_; - -public: -}; - -#endif // end of include guard: TOPIC_RPC_JU1AYN5L diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp index d777f91..b1f11ac 100644 --- a/utest/speed_test.cpp +++ b/utest/speed_test.cpp @@ -40,6 +40,7 @@ body.set_data(str); auto head(InitMsgHead(GetType(body), proc_id)); msg.MakeRC(shm, head, body); + assert(msg.IsCounted()); DEFER1(msg.Release(shm);); for (uint64_t i = 0; i < n; ++i) { @@ -127,8 +128,8 @@ SharedMemory shm(shm_name, 1024 * 1024 * 50); auto Avail = [&]() { return shm.get_free_memory(); }; auto init_avail = Avail(); - ShmMsgQueue srv(shm, qlen); - ShmMsgQueue cli(shm, qlen); + ShmSocket srv(shm, qlen); + ShmSocket cli(shm, qlen); MsgI request_rc; MsgRequestTopic req_body; @@ -156,9 +157,9 @@ req_body.set_topic("topic"); req_body.set_data(msg_content); auto req_head(InitMsgHead(GetType(req_body), client_proc_id)); - return cli.Send(srv.Id(), req_head, req_body, 100); + return cli.Send(&srv.id(), req_head, req_body, 100); }; - auto ReqRC = [&]() { return cli.Send(srv.Id(), request_rc, 1000); }; + auto ReqRC = [&]() { return cli.Send(&srv.id(), request_rc, 1000); }; if (!ReqRC()) { printf("********** client send error.\n"); @@ -166,7 +167,7 @@ } MsgI msg; BHMsgHead head; - if (!cli.Recv(msg, 1000)) { + if (!cli.SyncRecv(msg, head, 1000)) { printf("********** client recv error.\n"); } else { DEFER1(msg.Release(shm)); @@ -187,8 +188,9 @@ BHMsgHead req_head; while (!stop) { - if (srv.Recv(req, 100)) { + if (srv.SyncRecv(req, req_head, 100)) { DEFER1(req.Release(shm)); + if (req.ParseHead(req_head) && req_head.type() == kMsgTypeRequestTopic) { auto &mqid = req_head.route()[0].mq_id(); MQId src_id; @@ -198,9 +200,9 @@ reply_body.set_topic("topic"); reply_body.set_data(msg_content); auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, req_head.msg_id())); - return srv.Send(src_id, reply_head, reply_body, 100); + return srv.Send(&src_id, reply_head, reply_body, 100); }; - auto ReplyRC = [&]() { return srv.Send(src_id, reply_rc, 100); }; + auto ReplyRC = [&]() { return srv.Send(&src_id, reply_rc, 100); }; if (ReplyRC()) { } diff --git a/utest/utest.cpp b/utest/utest.cpp index c925e22..f88eab9 100644 --- a/utest/utest.cpp +++ b/utest/utest.cpp @@ -1,8 +1,5 @@ #include "center.h" #include "defs.h" -#include "pubsub.h" -#include "socket.h" -#include "topic_node.h" #include "util.h" #include <atomic> #include <boost/uuid/uuid_generators.hpp> @@ -92,8 +89,12 @@ const uint64_t nmsg = 100 * 2; const int timeout = 1000; auto Sub = [&](int id, const std::vector<std::string> &topics) { - SocketSubscribe client(shm); - bool r = client.Subscribe(sub_proc_id, topics, timeout); + DemoNode client("client_" + std::to_string(id), shm); + + bool r = client.Subscribe(topics, timeout); + if (!r) { + printf("client subscribe failed.\n"); + } std::mutex mutex; std::condition_variable cv; @@ -112,18 +113,19 @@ } // printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str()); }; - client.StartRecv(OnTopicData, 1); + client.SubscribeStartWorker(OnTopicData, 1); std::unique_lock<std::mutex> lk(mutex); cv.wait(lk); }; auto Pub = [&](const std::string &topic) { - SocketPublish provider(shm); + DemoNode provider("server_" + topic, shm); + for (unsigned i = 0; i < nmsg; ++i) { std::string data = topic + std::to_string(i) + std::string(1000, '-'); - bool r = provider.Publish(pub_proc_id, topic, data.data(), data.size(), timeout); + bool r = provider.Publish(topic, data.data(), data.size(), timeout); if (!r) { printf("pub ret: %s\n", r ? "ok" : "fail"); } @@ -184,15 +186,7 @@ std::atomic<bool> run(true); auto Client = [&](const std::string &topic, const int nreq) { - TopicNode client(shm); - MsgRegister reg; - reg.mutable_proc()->set_proc_id(client_proc_id + topic); - MsgCommonReply reply_body; - - if (!client.Register(reg, reply_body, 1000)) { - printf("client register failed\n"); - return; - } + DemoNode client(client_proc_id + topic, shm); std::atomic<int> count(0); std::string reply; @@ -218,21 +212,13 @@ do { std::this_thread::yield(); } while (count.load() < nreq); - client.ClientStopWorker(); + client.StopAll(); printf("request %s %d done ", topic.c_str(), count.load()); }; + std::atomic_uint64_t server_msg_count(0); auto Server = [&](const std::string &name, const std::vector<std::string> &topics) { - TopicNode server(shm); - MsgRegister reg; - reg.mutable_proc()->set_proc_id(server_proc_id); - reg.mutable_proc()->set_name(name); - MsgCommonReply reply_body; - - if (!server.Register(reg, reply_body, 100)) { - printf("server register failed\n"); - return; - } + DemoNode server(name, shm); auto onData = [&](const std::string &topic, const std::string &data, std::string &reply) { ++server_msg_count; @@ -245,6 +231,7 @@ for (auto &topic : topics) { rpc.add_topics(topic); } + MsgCommonReply reply_body; if (!server.RegisterRPC(rpc, reply_body, 100)) { printf("server register topic failed\n"); return; @@ -262,7 +249,7 @@ clients.Launch(Client, t, 1000 * 1); } clients.WaitAll(); - printf("clients done, server replyed: %d\n", server_msg_count.load()); + printf("clients done, server replyed: %ld\n", server_msg_count.load()); run = false; servers.WaitAll(); } diff --git a/utest/util.h b/utest/util.h index ca58cd7..28b636e 100644 --- a/utest/util.h +++ b/utest/util.h @@ -20,9 +20,7 @@ #define UTIL_W8A0OA5U #include "bh_util.h" -#include "msg.h" -#include "shm.h" -#include "shm_queue.h" +#include "topic_node.h" #include <boost/date_time/posix_time/posix_time.hpp> #include <boost/noncopyable.hpp> #include <boost/test/unit_test.hpp> @@ -107,4 +105,23 @@ ~ShmRemover() { SharedMemory::Remove(name_); } }; +class DemoNode : public TopicNode +{ + std::string id_; + +public: + DemoNode(const std::string &id, SharedMemory &shm) : + TopicNode(shm), id_(id) { Init(); } + void Init() + { + MsgRegister reg; + reg.mutable_proc()->set_proc_id(id_); + MsgCommonReply reply_body; + + if (!Register(reg, reply_body, 1000)) { + printf("node %s register failed\n", id_.c_str()); + } + } +}; + #endif // end of include guard: UTIL_W8A0OA5U -- Gitblit v1.8.0