From c64c54d8e75b9354dc49a7b6b2d326e7dd59eb37 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期四, 15 四月 2021 19:32:16 +0800 Subject: [PATCH] add api; fix send, socknode mem leak. --- src/proto.cpp | 2 src/socket.h | 11 box/center.cpp | 53 ++- src/proto.h | 2 .vscode/settings.json | 3 utest/utest.cpp | 28 +- src/topic_node.cpp | 129 +++++++--- box/center_main.cc | 25 ++ utest/api_test.cpp | 196 +++++++++++++++- utest/util.h | 7 .vscode/launch.json | 2 src/topic_node.h | 14 src/bh_api.h | 26 ++ src/sendq.h | 2 src/bh_api.cpp | 142 ++++++++--- box/app_arg.h | 59 ++++ 16 files changed, 558 insertions(+), 143 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index 12aa21d..939b9a9 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -11,7 +11,7 @@ "program": "${workspaceFolder}/debug/bin/utest", "args": [ "-t", - "SRTest" + "ApiTest" ], "stopAtEntry": false, "cwd": "${workspaceFolder}", diff --git a/.vscode/settings.json b/.vscode/settings.json index 97450e9..88753a7 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -60,7 +60,8 @@ "*.inc": "cpp", "strstream": "cpp", "unordered_set": "cpp", - "cfenv": "cpp" + "cfenv": "cpp", + "*.ipp": "cpp" }, "files.exclude": { "**/*.un~": true, diff --git a/box/app_arg.h b/box/app_arg.h new file mode 100644 index 0000000..e9e2c1c --- /dev/null +++ b/box/app_arg.h @@ -0,0 +1,59 @@ +#ifndef APP_ARG_OQMELZBX +#define APP_ARG_OQMELZBX + +#include <map> +#include <string> + +class AppArg +{ + typedef std::map<std::string, std::string> ArgMap; +public: + AppArg(int argc, const char *argv[]) { + Parse(argc, argv); + } + bool Has(const std::string &key) const { + return Pos(key) != args.end(); + } + std::string Get(const std::string &key, const std::string &def = "") const { + ArgMap::const_iterator pos = Pos(key); + if (pos != args.end()) { + return pos->second; + } else { + return def; + } + } +private: + void Parse(int argc, const char *argv[]) { + for (int i = 1; i < argc; ++i) { + std::string text(argv[i]); + if (text.substr(0, 2) == "--") { + text = text.substr(2); + std::string::size_type sep = text.find('='); + if (sep == std::string::npos) { + args[text].clear(); + } else { + args[text.substr(0, sep)] = text.substr(sep+1); + } + } else if (text.substr(0,1) == "-") { + text = text.substr(1); + args[text].clear(); + if (i+1 < argc) { + std::string next(argv[i+1]); + if (next.substr(0,1) != "-") { + args[text] = next; + ++i; + } + } + } + } + + } + ArgMap::const_iterator Pos(const std::string &key) const { + return args.find(key); + } + + ArgMap args; +}; + +#endif // end of include guard: APP_ARG_OQMELZBX + diff --git a/box/center.cpp b/box/center.cpp index 0dd4ed4..8625f7f 100644 --- a/box/center.cpp +++ b/box/center.cpp @@ -121,20 +121,18 @@ }; auto pos = nodes_.find(head.proc_id()); - if (pos == nodes_.end()) { // new client - Node node(new NodeInfo); - UpdateRegInfo(node); - nodes_[node->proc_.proc_id()] = node; - } else { + if (pos != nodes_.end()) { // new client Node &node = pos->second; if (node->addrs_.find(SrcAddr(head)) == node->addrs_.end()) { // node restarted, release old mq. - for (auto &addr : node->addrs_) { - cleaner_(addr); - } - node->addrs_.clear(); + RemoveNode(node); + node.reset(new NodeInfo); } UpdateRegInfo(node); + } else { + Node node(new NodeInfo); + UpdateRegInfo(node); + nodes_[node->proc_.proc_id()] = node; } return MakeReply(eSuccess); } catch (...) { @@ -334,11 +332,7 @@ auto &cli = *it->second; cli.state_.UpdateState(now, offline_time_, kill_time_); if (cli.state_.flag_ == kStateKillme) { - if (cleaner_) { - for (auto &addr : cli.addrs_) { - cleaner_(addr); - } - } + RemoveNode(it->second); it = nodes_.erase(it); } else { ++it; @@ -357,6 +351,30 @@ { auto node = weak.lock(); return node && Valid(*node); + } + void RemoveNode(Node &node) + { + auto EraseMapRec = [&node](auto &rec_map, auto &node_rec) { + for (auto &addr_topics : node_rec) { + TopicDest dest{addr_topics.first, node}; + for (auto &topic : addr_topics.second) { + auto pos = rec_map.find(topic); + if (pos != rec_map.end()) { + pos->second.erase(dest); + if (pos->second.empty()) { + rec_map.erase(pos); + } + } + } + } + }; + EraseMapRec(service_map_, node->services_); + EraseMapRec(subscribe_map_, node->subscriptions_); + + for (auto &addr : node->addrs_) { + cleaner_(addr); + } + node->addrs_.clear(); } std::string id_; // center proc id; @@ -403,11 +421,8 @@ auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) { return [&](auto &&rep_body) { auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id())); - MsgI msg; - if (msg.Make(socket.shm(), reply_head, rep_body)) { - auto &remote = head.route(0).mq_id(); - bool r = socket.Send(remote.data(), msg); - } + auto &remote = head.route(0).mq_id(); + socket.Send(remote.data(), reply_head, rep_body); }; }; diff --git a/box/center_main.cc b/box/center_main.cc index 40aed56..5baa409 100644 --- a/box/center_main.cc +++ b/box/center_main.cc @@ -15,17 +15,40 @@ * * ===================================================================================== */ +#include "app_arg.h" #include "box.h" #include "center.h" #include "defs.h" #include "signalhandle.h" +#include <chrono> +#include <thread> +using namespace std::chrono_literals; int center_main(int argc, const char *argv[]) { + AppArg args(argc, argv); + if (args.Has("remove")) { + BHomeShm().Remove(); + return 0; + } + + bool run = true; + auto showStatus = [&]() { + auto init = BHomeShm().get_free_memory(); + uint64_t idx = 0; + while (run) { + std::this_thread::sleep_for(1s); + printf("%8d shared memory: avail : %ld / %ld\n", ++idx, BHomeShm().get_free_memory(), init); + } + }; + std::thread t(showStatus); + BHCenter center(BHomeShm()); center.Start(); + printf("center started ...\n"); WaitForSignals({SIGINT, SIGTERM}); - // BHomeShm().Remove(); // remove ? + run = false; + t.join(); return 0; } diff --git a/src/bh_api.cpp b/src/bh_api.cpp index 78b8a59..2abe66d 100644 --- a/src/bh_api.cpp +++ b/src/bh_api.cpp @@ -39,11 +39,25 @@ } size_t size() const { return size_; } operator bool() const { return ptr_; } + bool ReleaseTo(void **pdata, int *psize) + { + if (!ptr_) { + return false; + } + if (pdata && psize) { + *psize = size(); + *pdata = release(); + } + return true; + } }; template <class Msg> bool PackOutput(const Msg &msg, void **out, int *out_len) { + if (!out || !out_len) { + return true; // not wanted. + } auto size = msg.ByteSizeLong(); TmpPtr p(size); if (!p) { @@ -51,30 +65,37 @@ return false; } msg.SerializePartialToArray(p.get(), size); - *out = p.release(); - *out_len = size; + p.ReleaseTo(out, out_len); return true; +} + +template <class MsgIn, class MsgOut = MsgCommonReply> +bool BHApiIn1Out1(bool (TopicNode::*mfunc)(MsgIn &, MsgOut &, const int), + const void *request, + const int request_len, + void **reply, + int *reply_len, + const int timeout_ms) +{ + MsgIn input; + if (!input.ParseFromArray(request, request_len)) { + SetLastError(eInvalidInput, "invalid input."); + return false; + } + MsgOut msg_reply; + if ((ProcNode().*mfunc)(input, msg_reply, timeout_ms)) { + return PackOutput(msg_reply, reply, reply_len); + + } else { + return false; + } } } // namespace -bool BHRegister(const void *proc_info, - const int proc_info_len, - void **reply, - int *reply_len, - const int timeout_ms) +bool BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) { - ProcInfo pi; - if (!pi.ParseFromArray(proc_info, proc_info_len)) { - SetLastError(eInvalidInput, "invalid input."); - return false; - } - MsgCommonReply msg_reply; - if (ProcNode().Register(pi, msg_reply, timeout_ms)) { - return PackOutput(msg_reply, reply, reply_len); - } else { - return false; - } + return BHApiIn1Out1<ProcInfo>(&TopicNode::Register, proc_info, proc_info_len, reply, reply_len, timeout_ms); } bool BHHeartBeatEasy(const int timeout_ms) @@ -82,23 +103,19 @@ return ProcNode().Heartbeat(timeout_ms); } -bool BHHeartBeat(const void *proc_info, - const int proc_info_len, - void **reply, - int *reply_len, - const int timeout_ms) +bool BHHeartBeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) { - ProcInfo pi; - if (!pi.ParseFromArray(proc_info, proc_info_len)) { - SetLastError(eInvalidInput, "invalid input."); - return false; - } - MsgCommonReply msg_reply; - if (ProcNode().Heartbeat(pi, msg_reply, timeout_ms)) { - return PackOutput(msg_reply, reply, reply_len); - } else { - return false; - } + return BHApiIn1Out1<ProcInfo>(&TopicNode::Heartbeat, proc_info, proc_info_len, reply, reply_len, timeout_ms); +} + +bool BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) +{ + return BHApiIn1Out1<MsgTopicList>(&TopicNode::ServerRegisterRPC, topics, topics_len, reply, reply_len, timeout_ms); +} + +bool BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) +{ + return BHApiIn1Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms); } bool BHPublish(const void *msgpub, @@ -125,8 +142,35 @@ if (ProcNode().RecvSub(proc, pub, timeout_ms)) { TmpPtr pproc(proc); if (pproc && PackOutput(pub, msgpub, msgpub_len)) { - *proc_id = pproc.release(); - *proc_id_len = pproc.size(); + pproc.ReleaseTo(proc_id, proc_id_len); + return true; + } else { + SetLastError(ENOMEM, "out of mem"); + } + } + return false; +} + +bool BHAsyncRequest(const void *request, + const int request_len, + void **msg_id, + int *msg_id_len) +{ + MsgRequestTopic req; + if (!req.ParseFromArray(request, request_len)) { + SetLastError(eInvalidInput, "invalid input."); + return false; + } + std::string str_msg_id; + MsgRequestTopicReply out_msg; + if (ProcNode().ClientAsyncRequest(req, str_msg_id)) { + if (!msg_id || !msg_id_len) { + return true; + } + TmpPtr ptr(str_msg_id); + if (ptr) { + ptr.ReleaseTo(msg_id, msg_id_len); + return true; } else { SetLastError(ENOMEM, "out of mem"); } @@ -152,8 +196,8 @@ if (ProcNode().ClientSyncRequest(req, proc, out_msg, timeout_ms)) { TmpPtr pproc(proc); if (pproc && PackOutput(out_msg, reply, reply_len)) { - *proc_id = pproc.release(); - *proc_id_len = pproc.size(); + pproc.ReleaseTo(proc_id, proc_id_len); + return true; } else { SetLastError(ENOMEM, "out of mem"); } @@ -174,9 +218,9 @@ if (ProcNode().ServerRecvRequest(src_info, proc, out_msg, timeout_ms)) { TmpPtr pproc(proc); if (pproc && PackOutput(out_msg, request, request_len)) { - *proc_id = pproc.release(); - *proc_id_len = pproc.size(); + pproc.ReleaseTo(proc_id, proc_id_len); *src = src_info; + return true; } else { SetLastError(ENOMEM, "out of mem"); } @@ -206,10 +250,11 @@ typedef std::function<bool(const void *, const int)> ServerSender; } // namespace -void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb) +void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb, FClientCallback client_cb) { TopicNode::ServerCB on_req; TopicNode::SubDataCB on_sub; + TopicNode::RequestResultCB on_reply; if (server_cb) { on_req = [server_cb](const std::string &proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply) { std::string sreq(request.SerializeAsString()); @@ -228,8 +273,16 @@ sub_cb(proc_id.data(), proc_id.size(), s.data(), s.size()); }; } + if (client_cb) { + on_reply = [client_cb](const BHMsgHead &head, const MsgRequestTopicReply &rep) { + std::string s(rep.SerializeAsString()); + client_cb(head.proc_id().data(), head.proc_id().size(), + head.msg_id().data(), head.msg_id().size(), + s.data(), s.size()); + }; + } - ProcNode().Start(on_req, on_sub); + ProcNode().Start(on_req, on_sub, on_reply); } bool BHServerCallbackReply(const BHServerCallbackTag *tag, const void *data, @@ -251,10 +304,7 @@ std::string err_msg; GetLastError(ec, err_msg); TmpPtr p(err_msg); - if (p) { - *msg = p.release(); - *msg_len = p.size(); - } + p.ReleaseTo(msg, msg_len); } return ec; } diff --git a/src/bh_api.h b/src/bh_api.h index 1023ba4..eeb47a5 100644 --- a/src/bh_api.h +++ b/src/bh_api.h @@ -14,6 +14,18 @@ int *reply_len, const int timeout_ms); +bool BHRegisterTopics(const void *topics, + const int topics_len, + void **reply, + int *reply_len, + const int timeout_ms); + +bool BHSubscribeTopics(const void *topics, + const int topics_len, + void **reply, + int *reply_len, + const int timeout_ms); + typedef void (*FSubDataCallback)(const void *proc_id, const int proc_id_len, const void *data, @@ -25,7 +37,14 @@ const int data_len, BHServerCallbackTag *tag); -void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb); +typedef void (*FClientCallback)(const void *proc_id, + const int proc_id_len, + const void *msg_id, + const int msg_id_len, + const void *data, + const int data_len); + +void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb, FClientCallback client_cb); bool BHServerCallbackReply(const BHServerCallbackTag *tag, const void *data, const int data_len); @@ -47,6 +66,11 @@ int *msgpub_len, const int timeout_ms); +bool BHAsyncRequest(const void *request, + const int request_len, + void **msg_id, + int *msg_id_len); + bool BHRequest(const void *request, const int request_len, void **proc_id, diff --git a/src/proto.cpp b/src/proto.cpp index 287924b..b1e8207 100644 --- a/src/proto.cpp +++ b/src/proto.cpp @@ -30,6 +30,8 @@ } // namespace +std::string NewMsgId() { return RandId(); } + BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id) { return InitMsgHead(type, proc_id, RandId()); diff --git a/src/proto.h b/src/proto.h index 42fe343..b418342 100644 --- a/src/proto.h +++ b/src/proto.h @@ -72,7 +72,7 @@ SetError(*msg.mutable_errmsg(), err_code, err_str); return msg; } - +std::string NewMsgId(); BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const std::string &msgid); BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id); // inline void AddRoute(BHMsgHead &head, const MQId &id) { head.add_route()->set_mq_id(&id, sizeof(id)); } diff --git a/src/sendq.h b/src/sendq.h index b4f3821..aa8923d 100644 --- a/src/sendq.h +++ b/src/sendq.h @@ -55,7 +55,7 @@ void Append(const Remote &addr, const MsgI &msg, OnMsgEvent onExpire = OnMsgEvent()) { using namespace std::chrono_literals; - Append(addr, msg, Now() + 60s, onExpire); + Append(addr, msg, Now() + 3s, onExpire); } bool TrySend(bhome_shm::ShmMsgQueue &mq); // bool empty() const { return store_.empty(); } diff --git a/src/socket.h b/src/socket.h index 0b0b880..96af6e7 100644 --- a/src/socket.h +++ b/src/socket.h @@ -36,7 +36,7 @@ class ShmSocket : private boost::noncopyable { - bool SendImpl(const void *valid_remote, const MsgI &imsg, SendQ::OnMsgEvent onExpire = SendQ::OnMsgEvent()) + bool SendImpl(const void *valid_remote, MsgI const &imsg, SendQ::OnMsgEvent onExpire = SendQ::OnMsgEvent()) { // if (!mq().TrySend(*(MQId *) valid_remote, imsg)) { send_buffer_.Append(*static_cast<const MQId *>(valid_remote), imsg, onExpire); @@ -69,7 +69,11 @@ bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body) { MsgI msg; - return msg.Make(shm(), head, body) && SendImpl(valid_remote, msg); + if (msg.Make(shm(), head, body)) { + DEFER1(if (msg.IsCounted()) { msg.Release(shm()); }); + return SendImpl(valid_remote, msg); + } + return false; } template <class Body> @@ -78,6 +82,7 @@ //TODO send_buffer_ need flag, and remove callback on expire. MsgI msg; if (msg.Make(shm(), head, body)) { + DEFER1(if (msg.IsCounted()) { msg.Release(shm()); }); std::string msg_id(head.msg_id()); per_msg_cbs_->Add(msg_id, cb); auto onExpireRemoveCB = [this, msg_id](MsgI const &msg) { @@ -85,6 +90,8 @@ per_msg_cbs_->Find(msg_id, cb_no_use); }; return SendImpl(valid_remote, msg, onExpireRemoveCB); + } else { + printf("out of mem?, avail: %ld\n", shm().get_free_memory()); } return false; } diff --git a/src/topic_node.cpp b/src/topic_node.cpp index 4ce2c97..e9e627f 100644 --- a/src/topic_node.cpp +++ b/src/topic_node.cpp @@ -35,35 +35,45 @@ } // namespace TopicNode::TopicNode(SharedMemory &shm) : - shm_(shm), sock_node_(shm), sock_request_(shm), sock_reply_(shm), sock_sub_(shm) + shm_(shm), sock_node_(shm), sock_request_(shm), sock_reply_(shm), sock_sub_(shm), registered_(false) { - SockNode().Start(); + // recv msgs to avoid memory leak. + auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; }; + SockNode().Start(default_ignore_msg); } TopicNode::~TopicNode() { Stop(); + SockNode().Stop(); } -void TopicNode::Start(ServerCB const &server_cb, SubDataCB const &sub_cb) +void TopicNode::Start(ServerCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker) { - ServerStart(server_cb, 1); - SubscribeStartWorker(sub_cb, 1); - // SockClient().Start(); + if (nworker < 1) { + nworker = 1; + } else if (nworker > 16) { + nworker = 16; + } + + ServerStart(server_cb, nworker); + SubscribeStartWorker(sub_cb, nworker); + ClientStartWorker(client_cb, nworker); } void TopicNode::Stop() { SockSub().Stop(); SockServer().Stop(); SockClient().Stop(); - SockNode().Stop(); } bool TopicNode::Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) { + info_ = proc; + auto &sock = SockNode(); MsgRegister body; - *body.mutable_proc() = proc; + body.mutable_proc()->Swap(&proc); auto AddId = [&](const MQId &id) { body.add_addrs()->set_mq_id(&id, sizeof(id)); }; AddId(SockNode().id()); AddId(SockServer().id()); @@ -74,27 +84,39 @@ auto head(InitMsgHead(GetType(body), body.proc().proc_id())); AddRoute(head, sock.id()); + auto CheckResult = [this](MsgI &msg, BHMsgHead &head, MsgCommonReply &rbody) { + bool ok = head.type() == kMsgTypeCommonReply && + msg.ParseBody(rbody) && + IsSuccess(rbody.errmsg().errcode()); + printf("async regisered %s\n", ok ? "ok" : "failed"); + registered_.store(ok); + }; + if (timeout_ms == 0) { - return sock.Send(&BHTopicCenterAddress(), head, body); + auto onResult = [this, CheckResult](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) { + MsgCommonReply body; + CheckResult(imsg, head, body); + }; + return sock.Send(&BHTopicCenterAddress(), head, body, onResult); } else { MsgI reply; DEFER1(reply.Release(shm_);); BHMsgHead reply_head; bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); - r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body); - if (r && IsSuccess(reply_body.errmsg().errcode())) { - info_ = body; - return true; + if (r) { + CheckResult(reply, reply_head, reply_body); } - return false; + return IsRegistered(); } } bool TopicNode::Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) { + if (!IsRegistered()) { return false; } + auto &sock = SockNode(); MsgHeartbeat body; - *body.mutable_proc() = proc; + body.mutable_proc()->Swap(&proc); auto head(InitMsgHead(GetType(body), body.proc().proc_id())); AddRoute(head, sock.id()); @@ -120,7 +142,8 @@ bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms) { - //TODO check registered + if (!IsRegistered()) { return false; } + auto &sock = SockServer(); MsgRegisterRPC body; body.mutable_topics()->Swap(&topics); @@ -155,11 +178,8 @@ for (int i = 0; i < head.route_size() - 1; ++i) { reply_head.add_route()->Swap(head.mutable_route(i)); } - MsgI msg; - if (msg.Make(sock.shm(), reply_head, reply_body)) { - auto &remote = head.route().rbegin()->mq_id(); - sock.Send(remote.data(), msg); - } + auto &remote = head.route().rbegin()->mq_id(); + sock.Send(remote.data(), reply_head, reply_body); } }; @@ -169,6 +189,8 @@ bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms) { + if (!IsRegistered()) { return false; } + auto &sock = SockServer(); MsgI imsg; @@ -188,6 +210,8 @@ bool TopicNode::ServerSendReply(void *src_info, const MsgRequestTopicReply &body) { + if (!IsRegistered()) { return false; } + auto &sock = SockServer(); SrcInfo *p = static_cast<SrcInfo *>(src_info); @@ -211,7 +235,7 @@ if (head.type() == kMsgTypeRequestTopicReply) { MsgRequestTopicReply reply; if (imsg.ParseBody(reply)) { - cb(head.proc_id(), reply); + cb(head, reply); } } }; @@ -219,37 +243,60 @@ return SockRequest().Start(onData, nworker); } -bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, const RequestResultCB &cb) +bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, std::string &out_msg_id, const RequestResultCB &cb) { - auto Call = [&](const void *remote) { - auto &sock = SockRequest(); + if (!IsRegistered()) { return false; } - BHMsgHead head(InitMsgHead(GetType(req), proc_id())); + const std::string &msg_id(NewMsgId()); + + out_msg_id = msg_id; + + auto SendTo = [this, msg_id](const BHAddress &addr, const MsgRequestTopic &req, const RequestResultCB &cb) { + auto &sock = SockClient(); + BHMsgHead head(InitMsgHead(GetType(req), proc_id(), msg_id)); AddRoute(head, sock.id()); + head.set_topic(req.topic()); if (cb) { auto onRecv = [cb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { if (head.type() == kMsgTypeRequestTopicReply) { MsgRequestTopicReply reply; if (imsg.ParseBody(reply)) { - cb(head.proc_id(), reply); + cb(head, reply); } } }; - return sock.Send(remote, head, req, onRecv); + return sock.Send(addr.mq_id().data(), head, req, onRecv); } else { - return sock.Send(remote, head, req); + return sock.Send(addr.mq_id().data(), head, req); } }; try { + auto &sock = SockClient(); BHAddress addr; - if (ClientQueryRPCTopic(req.topic(), addr, 1000)) { - return Call(addr.mq_id().data()); - } else { - SetLastError(eNotFound, "remote not found."); - return false; + + if (topic_query_cache_.Find(req.topic(), addr)) { + return SendTo(addr, req, cb); } + + MsgQueryTopic query; + query.set_topic(req.topic()); + BHMsgHead head(InitMsgHead(GetType(query), proc_id())); + AddRoute(head, sock.id()); + + auto onQueryResult = [this, SendTo, req, cb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { + MsgQueryTopicReply rep; + if (head.type() == kMsgTypeQueryTopicReply && imsg.ParseBody(rep)) { + auto &addr = rep.address(); + if (!addr.mq_id().empty()) { + topic_query_cache_.Update(req.topic(), addr); + SendTo(addr, req, cb); + } + } + }; + return sock.Send(&BHTopicCenterAddress(), head, query, onQueryResult); + } catch (...) { return false; } @@ -257,6 +304,8 @@ bool TopicNode::ClientSyncRequest(const MsgRequestTopic &request, std::string &out_proc_id, MsgRequestTopicReply &out_reply, const int timeout_ms) { + if (!IsRegistered()) { return false; } + try { auto &sock = SockRequest(); @@ -264,6 +313,7 @@ if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) { BHMsgHead head(InitMsgHead(GetType(request), proc_id())); AddRoute(head, sock.id()); + head.set_topic(request.topic()); MsgI reply_msg; DEFER1(reply_msg.Release(shm_);); @@ -288,6 +338,8 @@ bool TopicNode::ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms) { + if (!IsRegistered()) { return false; } + auto &sock = SockRequest(); if (topic_query_cache_.Find(topic, addr)) { @@ -325,6 +377,8 @@ bool TopicNode::Publish(const MsgPublish &pub, const int timeout_ms) { + if (!IsRegistered()) { return false; } + try { auto &sock = SockPub(); BHMsgHead head(InitMsgHead(GetType(pub), proc_id())); @@ -349,8 +403,10 @@ // subscribe -bool TopicNode::Subscribe(MsgTopicList &topics, const int timeout_ms) +bool TopicNode::Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms) { + if (!IsRegistered()) { return false; } + try { auto &sock = SockSub(); MsgSubscribe sub; @@ -364,7 +420,6 @@ MsgI reply; DEFER1(reply.Release(shm());); BHMsgHead reply_head; - MsgCommonReply reply_body; return sock.SendAndRecv(&BHTopicBusAddress(), head, sub, reply, reply_head, timeout_ms) && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body) && @@ -396,6 +451,8 @@ bool TopicNode::RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms) { + if (!IsRegistered()) { return false; } + auto &sock = SockSub(); MsgI msg; DEFER1(msg.Release(shm());); diff --git a/src/topic_node.h b/src/topic_node.h index 0627930..8c3c48e 100644 --- a/src/topic_node.h +++ b/src/topic_node.h @@ -29,7 +29,7 @@ class TopicNode { SharedMemory &shm_; - MsgRegister info_; + ProcInfo info_; SharedMemory &shm() { return shm_; } @@ -51,9 +51,9 @@ bool ServerSendReply(void *src_info, const MsgRequestTopicReply &reply); // topic client - typedef std::function<void(const std::string &proc_id, const MsgRequestTopicReply &reply)> RequestResultCB; + typedef std::function<void(const BHMsgHead &head, const MsgRequestTopicReply &reply)> RequestResultCB; bool ClientStartWorker(RequestResultCB const &cb, const int nworker = 2); - bool ClientAsyncRequest(const MsgRequestTopic &request, const RequestResultCB &rrcb = RequestResultCB()); + bool ClientAsyncRequest(const MsgRequestTopic &request, std::string &msg_id, const RequestResultCB &rrcb = RequestResultCB()); bool ClientSyncRequest(const MsgRequestTopic &request, std::string &proc_id, MsgRequestTopicReply &reply, const int timeout_ms); // publish @@ -62,15 +62,15 @@ // subscribe typedef std::function<void(const std::string &proc_id, const MsgPublish &data)> SubDataCB; bool SubscribeStartWorker(const SubDataCB &tdcb, int nworker = 2); - bool Subscribe(MsgTopicList &topics, const int timeout_ms); + bool Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms); bool RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms); - void Start(ServerCB const &server_cb, SubDataCB const &sub_cb); + void Start(ServerCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker = 2); void Stop(); private: bool ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms); - const std::string &proc_id() { return info_.proc().proc_id(); } + const std::string &proc_id() { return info_.proc_id(); } typedef bhome_msg::BHAddress Address; class TopicQueryCache @@ -118,7 +118,9 @@ auto &SockClient() { return SockRequest(); } auto &SockReply() { return sock_reply_; } auto &SockServer() { return SockReply(); } + bool IsRegistered() const { return registered_.load(); } + std::atomic<bool> registered_; ShmSocket sock_node_; ShmSocket sock_request_; ShmSocket sock_reply_; diff --git a/utest/api_test.cpp b/utest/api_test.cpp index 113bb99..cff2cc5 100644 --- a/utest/api_test.cpp +++ b/utest/api_test.cpp @@ -17,11 +17,73 @@ */ #include "bh_api.h" #include "util.h" +#include <atomic> -class DemoClient +using namespace bhome::msg; + +namespace { -public: +typedef std::atomic<uint64_t> Number; + +struct MsgStatus { + Number nrequest_; + Number nreply_; + Number nserved_; + MsgStatus() : + nrequest_(0), nreply_(0), nserved_(0) {} }; + +MsgStatus &Status() +{ + static MsgStatus st; + return st; +} +} // namespace + +void SubRecvProc(const void *proc_id, + const int proc_id_len, + const void *data, + const int data_len) +{ + std::string proc((const char *) proc_id, proc_id_len); + MsgPublish pub; + pub.ParseFromArray(data, data_len); + // printf("Sub data, %s : %s\n", pub.topic().c_str(), pub.data().c_str()); +} + +void ServerProc(const void *proc_id, + const int proc_id_len, + const void *data, + const int data_len, + BHServerCallbackTag *tag) +{ + // printf("ServerProc: "); + // DEFER1(printf("\n");); + MsgRequestTopic request; + if (request.ParseFromArray(data, data_len)) { + MsgRequestTopicReply reply; + reply.set_data(" reply: " + request.data()); + std::string s(reply.SerializeAsString()); + // printf("%s", reply.data().c_str()); + BHServerCallbackReply(tag, s.data(), s.size()); + ++Status().nserved_; + } +} + +void ClientProc(const void *proc_id, + const int proc_id_len, + const void *msg_id, + const int msg_id_len, + const void *data, + const int data_len) +{ + std::string proc((const char *) proc_id, proc_id_len); + MsgRequestTopicReply reply; + if (reply.ParseFromArray(data, data_len)) { + ++Status().nreply_; + } + // printf("client Recv reply : %s\n", reply.data().c_str()); +} BOOST_AUTO_TEST_CASE(ApiTest) { @@ -36,19 +98,125 @@ nsec, nhour, nday, years); std::chrono::steady_clock::duration a(123456); printf("nowsec: %ld\n", NowSec()); - // for (int i = 0; i < 5; ++i) { - // std::this_thread::sleep_for(1s); - // printf("nowsec: %ld\n", NowSec()); - // } printf("maxsec: %ld\n", CountSeconds(max_time)); - ProcInfo proc; - proc.set_proc_id("demo_client"); - proc.set_public_info("public info of demo_client. etc..."); - std::string proc_buf(proc.SerializeAsString()); - void *reply = 0; - int reply_len = 0; - bool r = BHRegister(proc_buf.data(), proc_buf.size(), &reply, &reply_len, 1000); - printf("register %s\n", r ? "ok" : "failed"); + bool reg = false; + for (int i = 0; i < 10 && !reg; ++i) { + ProcInfo proc; + proc.set_proc_id("demo_client"); + proc.set_public_info("public info of demo_client. etc..."); + std::string proc_buf(proc.SerializeAsString()); + void *reply = 0; + int reply_len = 0; + reg = BHRegister(proc_buf.data(), proc_buf.size(), &reply, &reply_len, 2000); + printf("register %s\n", reg ? "ok" : "failed"); + + BHFree(reply, reply_len); + Sleep(1s); + } + + const std::string topic_ = "topic_"; + + { + MsgTopicList topics; + for (int i = 0; i < 10; ++i) { + topics.add_topic_list(topic_ + std::to_string(i)); + } + std::string s = topics.SerializeAsString(); + void *reply = 0; + int reply_len = 0; + bool r = BHRegisterTopics(s.data(), s.size(), &reply, &reply_len, 1000); + BHFree(reply, reply_len); + // printf("register topic : %s\n", r ? "ok" : "failed"); + Sleep(1s); + } + + { + MsgTopicList topics; + for (int i = 0; i < 10; ++i) { + topics.add_topic_list(topic_ + std::to_string(i * 2)); + } + std::string s = topics.SerializeAsString(); + void *reply = 0; + int reply_len = 0; + bool r = BHSubscribeTopics(s.data(), s.size(), &reply, &reply_len, 1000); + BHFree(reply, reply_len); + printf("subscribe topic : %s\n", r ? "ok" : "failed"); + } + + BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc); + + { + for (int i = 0; i < 1; ++i) { + MsgPublish pub; + pub.set_topic(topic_ + std::to_string(i)); + pub.set_data("pub_data_" + std::string(1024 * 1024, 'a')); + std::string s(pub.SerializeAsString()); + BHPublish(s.data(), s.size(), 0); + // Sleep(1s); + } + } + + auto asyncRequest = [&](uint64_t nreq) { + for (uint64_t i = 0; i < nreq; ++i) { + MsgRequestTopic req; + req.set_topic(topic_ + std::to_string(0)); + req.set_data("request_data_" + std::to_string(i)); + std::string s(req.SerializeAsString()); + void *msg_id = 0; + int len = 0; + bool r = BHAsyncRequest(s.data(), s.size(), 0, 0); + DEFER1(BHFree(msg_id, len);); + if (r) { + ++Status().nrequest_; + } else { + printf("request topic : %s\n", r ? "ok" : "failed"); + } + } + }; + auto showStatus = [](std::atomic<bool> *run) { + int64_t last = 0; + while (*run) { + auto &st = Status(); + std::this_thread::sleep_for(1s); + int cur = st.nreply_.load(); + printf("nreq: %8ld, nsrv: %8ld, nreply: %8ld, speed %8ld\n", st.nrequest_.load(), st.nserved_.load(), cur, cur - last); + last = cur; + } + }; + auto hb = [](std::atomic<bool> *run) { + while (*run) { + BHHeartBeatEasy(0); + std::this_thread::sleep_for(1s); + } + }; + std::atomic<bool> run(true); + ThreadManager threads; + boost::timer::auto_cpu_timer timer; + threads.Launch(hb, &run); + // threads.Launch(showStatus, &run); + int ncli = 10; + const uint64_t nreq = 1000 * 100; + for (int i = 0; i < ncli; ++i) { + threads.Launch(asyncRequest, nreq); + } + + int same = 0; + int64_t last = 0; + while (last < nreq * ncli && same < 3) { + Sleep(1s); + auto cur = Status().nreply_.load(); + if (last == cur) { + ++same; + } else { + last = cur; + same = 0; + } + } + + run = false; + threads.WaitAll(); + auto &st = Status(); + printf("nreq: %8ld, nsrv: %8ld, nreply: %8ld\n", st.nrequest_.load(), st.nserved_.load(), st.nreply_.load()); } \ No newline at end of file diff --git a/utest/utest.cpp b/utest/utest.cpp index 817cbaf..12d4396 100644 --- a/utest/utest.cpp +++ b/utest/utest.cpp @@ -99,7 +99,7 @@ BHCenter center(shm); center.Start(); - std::this_thread::sleep_for(100ms); + Sleep(100ms); std::atomic<uint64_t> total_count(0); std::atomic<ptime> last_time(Now() - seconds(1)); @@ -113,7 +113,8 @@ for (auto &t : topics) { tlist.add_topic_list(t); } - bool r = client.Subscribe(tlist, timeout); + MsgCommonReply reply_body; + bool r = client.Subscribe(tlist, reply_body, timeout); if (!r) { printf("client subscribe failed.\n"); } @@ -149,7 +150,7 @@ MsgPublish pub; pub.set_topic(topic); pub.set_data(data); - bool r = provider.Publish(pub, timeout); + bool r = provider.Publish(pub, 0); if (!r) { static std::atomic<int> an(0); int n = ++an; @@ -169,7 +170,7 @@ part.push_back(topics[i]); threads.Launch(Sub, i, topics); } - std::this_thread::sleep_for(100ms); + Sleep(100ms); for (auto &topic : topics) { threads.Launch(Pub, topic); } @@ -217,7 +218,7 @@ std::atomic<int> count(0); std::string reply; - auto onRecv = [&](const std::string &proc_id, const MsgRequestTopicReply &msg) { + auto onRecv = [&](const BHMsgHead &head, const MsgRequestTopicReply &msg) { reply = msg.data(); if (++count >= nreq) { printf("count: %d\n", count.load()); @@ -229,7 +230,8 @@ MsgRequestTopic req; req.set_topic(topic); req.set_data("data " + std::to_string(i)); - if (!client.ClientAsyncRequest(req)) { + std::string msg_id; + if (!client.ClientAsyncRequest(req, msg_id)) { printf("client request failed\n"); ++count; } @@ -274,9 +276,9 @@ ThreadManager clients, servers; std::vector<Topic> topics = {"topic1", "topic2"}; servers.Launch(Server, "server", topics); - std::this_thread::sleep_for(100ms); + Sleep(100ms); for (auto &t : topics) { - clients.Launch(Client, t, 1000 * 1); + clients.Launch(Client, t, 1000 * 100); } clients.WaitAll(); printf("clients done, server replyed: %ld\n", server_msg_count.load()); @@ -302,18 +304,16 @@ }; Check(); for (int i = 0; i < 3; ++i) { - std::this_thread::sleep_for(1s); + Sleep(1s); Check(); } - printf("sleep 4\n"); - std::this_thread::sleep_for(4s); + Sleep(4s); for (int i = 0; i < 2; ++i) { - std::this_thread::sleep_for(1s); + Sleep(1s); Check(); } } - printf("sleep 8\n"); - std::this_thread::sleep_for(8s); + Sleep(8s); } inline int MyMin(int a, int b) { diff --git a/utest/util.h b/utest/util.h index aaa5189..7f41da9 100644 --- a/utest/util.h +++ b/utest/util.h @@ -38,6 +38,13 @@ using namespace std::chrono_literals; +template <class D> +inline void Sleep(D d) +{ + printf("sleep for %ld ms\n", std::chrono::duration_cast<std::chrono::milliseconds>(d).count()); + std::this_thread::sleep_for(d); +} + typedef std::function<void(void)> FuncVV; class ScopeCall : private boost::noncopyable -- Gitblit v1.8.0