From 1f3729698a131b3f701f67adb6a1258aa1235dce Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期二, 20 四月 2021 15:43:53 +0800 Subject: [PATCH] api server callback change tag to src; refactor. --- utest/speed_test.cpp | 74 +++++--------- api/go/bhome_node.go | 9 - src/socket.h | 12 +- utest/api_test.cpp | 4 utest/util.h | 1 .vscode/settings.json | 8 + src/socket.cpp | 2 src/topic_node.h | 38 +++---- src/bh_api.h | 6 - utest/utest.cpp | 22 +++- src/bh_api.cpp | 29 ----- src/topic_node.cpp | 36 +++++- 12 files changed, 109 insertions(+), 132 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 88753a7..4003476 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -65,8 +65,14 @@ }, "files.exclude": { "**/*.un~": true, + "**/bhshmq_center": true, + "**/bhshmq_status": true, + "**/bhshmqbox": true, + "**/gmon.out": true, + "api/go/bhome_msg": true, "build/": true, - "debug/": true + "debug/": true, + "utest/utest": true }, "cmake.configureOnOpen": false, "C_Cpp.default.includePath": [ diff --git a/api/go/bhome_node.go b/api/go/bhome_node.go index c950750..f4db490 100644 --- a/api/go/bhome_node.go +++ b/api/go/bhome_node.go @@ -147,16 +147,11 @@ } -func ServerCallbackReply(tag unsafe.Pointer, rep *bh.MsgRequestTopicReply) bool { - data, _ := rep.Marshal() - return C.BHServerCallbackReply(tag, unsafe.Pointer(&data[0]), C.int(len(data))) > 0 -} - -type ServecCB func(proc_id *string, req *bh.MsgRequestTopic, reply *bh.MsgRequestTopicReply) bool +type ServecCB func(src unsafe.Pointer, proc_id *string, req *bh.MsgRequestTopic) type SubDataCB func(proc_id *string, pub *bh.MsgPublish) type ClientCB func(proc_id *string, msg_id *[]byte, reply *bh.MsgRequestTopicReply) -func cserver_callback(cpid *unsafe.Pointer, cpid_len unsafe.Pointer) { +func cserver_callback(cpid unsafe.Pointer, pid_len C.int, src unsafe.Pointer) { } func StartWorker(server_cb ServecCB, sub_cb SubDataCB, client_cb ClientCB) { diff --git a/src/bh_api.cpp b/src/bh_api.cpp index 3844000..cdf2e96 100644 --- a/src/bh_api.cpp +++ b/src/bh_api.cpp @@ -246,31 +246,15 @@ return ProcNode().ServerSendReply(src, rep); } -int BHCleanUp() -{ - return 0; -} - -namespace -{ -typedef std::function<bool(const void *, const int)> ServerSender; -} // namespace - void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb, FClientCallback client_cb) { - TopicNode::ServerCB on_req; + TopicNode::ServerAsyncCB on_req; TopicNode::SubDataCB on_sub; TopicNode::RequestResultCB on_reply; if (server_cb) { - on_req = [server_cb](const std::string &proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply) { + on_req = [server_cb](void *src, std::string &proc_id, const MsgRequestTopic &request) { std::string sreq(request.SerializeAsString()); - bool r = false; - ServerSender sender = [&](const void *p, const int len) { - r = reply.ParseFromArray(p, len); - return r; - }; - server_cb(proc_id.data(), proc_id.size(), sreq.data(), sreq.size(), &sender); - return r; + server_cb(proc_id.data(), proc_id.size(), sreq.data(), sreq.size(), src); }; } if (sub_cb) { @@ -289,13 +273,6 @@ } ProcNode().Start(on_req, on_sub, on_reply); -} -int BHServerCallbackReply(const void *tag, - const void *data, - const int data_len) -{ - auto &sender = *(const ServerSender *) (tag); - return sender(data, data_len); } void BHFree(void *data, int size) diff --git a/src/bh_api.h b/src/bh_api.h index 39b4cc6..2b3e15f 100644 --- a/src/bh_api.h +++ b/src/bh_api.h @@ -45,7 +45,7 @@ const int proc_id_len, const void *data, const int data_len, - const void *tag); + void *src); typedef void (*FClientCallback)(const void *proc_id, const int proc_id_len, @@ -55,10 +55,6 @@ const int data_len); void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb, FClientCallback client_cb); - -int BHServerCallbackReply(const void *tag, - const void *data, - const int data_len); int BHHeartbeatEasy(const int timeout_ms); int BHHeartbeat(const void *proc_info, diff --git a/src/socket.cpp b/src/socket.cpp index aec42b4..1315474 100644 --- a/src/socket.cpp +++ b/src/socket.cpp @@ -52,7 +52,7 @@ auto DoRecv = [=] { auto onRecvWithPerMsgCB = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) { RecvCB cb; - if (per_msg_cbs_->Find(head.msg_id(), cb)) { + if (per_msg_cbs_->Pick(head.msg_id(), cb)) { cb(socket, imsg, head); } else if (onData) { onData(socket, imsg, head); diff --git a/src/socket.h b/src/socket.h index 493aeb4..bbf3851 100644 --- a/src/socket.h +++ b/src/socket.h @@ -66,17 +66,17 @@ size_t Pending() const { return mq().Pending(); } template <class Body> - bool Send(const void *valid_remote, BHMsgHead &head, Body &body, const RecvCB &cb = RecvCB()) + bool Send(const void *valid_remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB()) { try { if (!cb) { return SendImpl(valid_remote, MsgI::Serialize(head, body)); } else { std::string msg_id(head.msg_id()); - per_msg_cbs_->Add(msg_id, cb); + per_msg_cbs_->Store(msg_id, std::move(cb)); auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) { RecvCB cb_no_use; - per_msg_cbs_->Find(msg_id, cb_no_use); + per_msg_cbs_->Pick(msg_id, cb_no_use); }; return SendImpl(valid_remote, MsgI::Serialize(head, body), onExpireRemoveCB); } @@ -117,7 +117,7 @@ }; std::unique_lock<std::mutex> lk(st->mutex); - bool sendok = Send(remote, head, body, OnRecv); + bool sendok = Send(remote, head, body, std::move(OnRecv)); if (!sendok) { printf("send timeout\n"); } @@ -154,8 +154,8 @@ public: bool empty() const { return store_.empty(); } - bool Add(const std::string &id, const RecvCB &cb) { return store_.emplace(id, cb).second; } - bool Find(const std::string &id, RecvCB &cb) + bool Store(const std::string &id, RecvCB &&cb) { return store_.emplace(id, std::move(cb)).second; } + bool Pick(const std::string &id, RecvCB &cb) { auto pos = store_.find(id); if (pos != store_.end()) { diff --git a/src/topic_node.cpp b/src/topic_node.cpp index 9853f35..16c565b 100644 --- a/src/topic_node.cpp +++ b/src/topic_node.cpp @@ -54,7 +54,7 @@ } } -void TopicNode::Start(ServerCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker) +void TopicNode::Start(ServerAsyncCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker) { if (nworker < 1) { nworker = 1; @@ -178,7 +178,7 @@ } } -bool TopicNode::ServerStart(const ServerCB &rcb, int nworker) +bool TopicNode::ServerStart(const ServerSyncCB &rcb, int nworker) { auto onRecv = [this, rcb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { if (head.type() != kMsgTypeRequestTopic || head.route_size() == 0) { return; } @@ -199,6 +199,23 @@ auto &sock = SockServer(); return rcb && sock.Start(onRecv, nworker); +} + +bool TopicNode::ServerStart(const ServerAsyncCB &acb, int nworker) +{ + auto onRecv = [this, acb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { + if (head.type() != kMsgTypeRequestTopic || head.route_size() == 0) { return; } + MsgRequestTopic req; + if (!imsg.ParseBody(req)) { return; } + + SrcInfo *p = new SrcInfo; + p->route.assign(head.route().begin(), head.route().end()); + p->msg_id = head.msg_id(); + acb(p, *head.mutable_proc_id(), req); + }; + + auto &sock = SockServer(); + return acb && sock.Start(onRecv, nworker); } bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms) @@ -296,13 +313,15 @@ }; try { - auto &sock = SockClient(); BHAddress addr; - - if (topic_query_cache_.Find(req.topic(), addr)) { +#if 1 + return (ClientQueryRPCTopic(req.topic(), addr, 3000)) && SendTo(addr, req, cb); +#else + if (topic_query_cache_.Pick(req.topic(), addr)) { return SendTo(addr, req, cb); } + auto &sock = SockClient(); MsgQueryTopic query; query.set_topic(req.topic()); BHMsgHead head(InitMsgHead(GetType(query), proc_id())); @@ -313,12 +332,13 @@ if (head.type() == kMsgTypeQueryTopicReply && imsg.ParseBody(rep)) { auto &addr = rep.address(); if (!addr.mq_id().empty()) { - topic_query_cache_.Update(req.topic(), addr); + topic_query_cache_.Store(req.topic(), addr); SendTo(addr, req, cb); } } }; - return sock.Send(&BHTopicCenterAddress(), head, query, onQueryResult); + return sock.Send(&BHTopicCenterAddress(), head, query, std::move(onQueryResult)); +#endif } catch (...) { SetLastError(eError, "internal error."); @@ -391,7 +411,7 @@ if (addr.mq_id().empty()) { return false; } else { - topic_query_cache_.Update(topic, addr); + topic_query_cache_.Store(topic, addr); return true; } } diff --git a/src/topic_node.h b/src/topic_node.h index 87ad770..35cdde5 100644 --- a/src/topic_node.h +++ b/src/topic_node.h @@ -34,7 +34,6 @@ SharedMemory &shm() { return shm_; } public: - typedef std::function<void(std::string &proc_id, const void *data, const int len)> DataCB; TopicNode(SharedMemory &shm); ~TopicNode(); @@ -44,8 +43,10 @@ bool Heartbeat(const int timeout_ms); // topic rpc server - typedef std::function<bool(const std::string &client_proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply)> ServerCB; - bool ServerStart(ServerCB const &cb, const int nworker = 2); + typedef std::function<bool(const std::string &client_proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply)> ServerSyncCB; + typedef std::function<void(void *src_info, std::string &client_proc_id, MsgRequestTopic &request)> ServerAsyncCB; + bool ServerStart(ServerSyncCB const &cb, const int nworker = 2); + bool ServerStart(ServerAsyncCB const &cb, const int nworker = 2); bool ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply, const int timeout_ms); bool ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms); bool ServerSendReply(void *src_info, const MsgRequestTopicReply &reply); @@ -65,7 +66,7 @@ bool Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms); bool RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms); - void Start(ServerCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker = 2); + void Start(ServerAsyncCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker = 2); void Stop(); private: @@ -77,45 +78,40 @@ { class Impl { - typedef std::unordered_map<Topic, Address> Store; - Store store_; + typedef std::unordered_map<Topic, Address> Records; + Records records_; public: bool Find(const Topic &topic, Address &addr) { - auto pos = store_.find(topic); - if (pos != store_.end()) { + auto pos = records_.find(topic); + if (pos != records_.end()) { addr = pos->second; return true; } else { return false; } } - bool Update(const Topic &topic, const Address &addr) + bool Store(const Topic &topic, const Address &addr) { - store_[topic] = addr; + records_[topic] = addr; return true; } }; Synced<Impl> impl_; - // Impl &impl() - // { - // thread_local Impl impl; - // return impl; - // } public: bool Find(const Topic &topic, Address &addr) { return impl_->Find(topic, addr); } - bool Update(const Topic &topic, const Address &addr) { return impl_->Update(topic, addr); } + bool Store(const Topic &topic, const Address &addr) { return impl_->Store(topic, addr); } }; // some sockets may be the same one, using functions make it easy to change. - auto &SockNode() { return sock_node_; } - auto &SockPub() { return SockNode(); } - auto &SockSub() { return sock_sub_; } - auto &SockClient() { return sock_client_; } - auto &SockServer() { return sock_server_; } + ShmSocket &SockNode() { return sock_node_; } + ShmSocket &SockPub() { return SockNode(); } + ShmSocket &SockSub() { return sock_sub_; } + ShmSocket &SockClient() { return sock_client_; } + ShmSocket &SockServer() { return sock_server_; } bool IsRegistered() const { return registered_.load(); } ShmSocket sock_node_; diff --git a/utest/api_test.cpp b/utest/api_test.cpp index b2c00a4..ae3f10a 100644 --- a/utest/api_test.cpp +++ b/utest/api_test.cpp @@ -66,7 +66,7 @@ const int proc_id_len, const void *data, const int data_len, - const void *tag) + void *src) { // printf("ServerProc: "); // DEFER1(printf("\n");); @@ -76,7 +76,7 @@ reply.set_data(" reply: " + request.data()); std::string s(reply.SerializeAsString()); // printf("%s", reply.data().c_str()); - BHServerCallbackReply(tag, s.data(), s.size()); + BHSendReply(src, s.data(), s.size()); ++Status().nserved_; } } diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp index 52fe824..86367b9 100644 --- a/utest/speed_test.cpp +++ b/utest/speed_test.cpp @@ -119,40 +119,38 @@ const std::string shm_name("ShmSendRecv"); ShmRemover auto_remove(shm_name); const int qlen = 64; - const size_t msg_length = 1000; + const size_t msg_length = 100; std::string msg_content(msg_length, 'a'); msg_content[20] = '\0'; const std::string client_proc_id = "client_proc"; const std::string server_proc_id = "server_proc"; - SharedMemory shm(shm_name, 1024 * 1024 * 50); + SharedMemory shm(shm_name, 1024 * 1024 * 512); auto Avail = [&]() { return shm.get_free_memory(); }; auto init_avail = Avail(); ShmSocket srv(shm, qlen); ShmSocket cli(shm, qlen); - MsgI request_rc; - MsgRequestTopic req_body; - req_body.set_topic("topic"); - req_body.set_data(msg_content); - auto req_head(InitMsgHead(GetType(req_body), client_proc_id)); - req_head.add_route()->set_mq_id(&cli.id(), cli.id().size()); - request_rc.MakeRC(shm, req_head, req_body); - DEFER1(request_rc.Release(shm)); - - MsgRequestTopic reply_body; - reply_body.set_topic("topic"); - reply_body.set_data(msg_content); - auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id)); - reply_head.add_route()->set_mq_id(&srv.id(), srv.id().size()); - MsgI reply_rc; - reply_rc.MakeRC(shm, reply_head, reply_body); - DEFER1(reply_rc.Release(shm)); - + int ncli = 1; + uint64_t nmsg = 1000 * 1000 * 1; std::atomic<uint64_t> count(0); - std::atomic<ptime> last_time(Now() - seconds(1)); + std::atomic<int64_t> last_time(NowSec() - 1); std::atomic<uint64_t> last_count(0); + + auto PrintStatus = [&](int64_t cur) { + std::cout << "time: " << cur; + printf(", total msg:%10ld, speed:[%8ld/s], used mem:%8ld\n", + count.load(), count - last_count.exchange(count), init_avail - Avail()); + }; + auto onRecv = [&](ShmSocket &sock, MsgI &msg, BHMsgHead &head) { + ++count; + auto cur = NowSec(); + if (last_time.exchange(cur) < cur) { + PrintStatus(cur); + } + }; + cli.Start(onRecv, 2); auto Client = [&](int cli_id, int nmsg) { for (int i = 0; i < nmsg; ++i) { @@ -161,28 +159,11 @@ req_body.set_topic("topic"); req_body.set_data(msg_content); auto req_head(InitMsgHead(GetType(req_body), client_proc_id)); + req_head.add_route()->set_mq_id(&cli.id(), cli.id().size()); return cli.Send(&srv.id(), req_head, req_body); }; - auto ReqRC = [&]() { return cli.Send(&srv.id(), request_rc); }; - if (!ReqRC()) { - printf("********** client send error.\n"); - continue; - } - MsgI msg; - BHMsgHead head; - if (!cli.SyncRecv(msg, head, 1000)) { - printf("********** client recv error.\n"); - } else { - DEFER1(msg.Release(shm)); - ++count; - auto cur = Now(); - if (last_time.exchange(cur) < cur) { - std::cout << "time: " << cur; - printf(", total msg:%10ld, speed:[%8ld/s], used mem:%8ld, refcount:%d\n", - count.load(), count - last_count.exchange(count), init_avail - Avail(), request_rc.Count()); - } - } + Req(); } }; @@ -206,10 +187,7 @@ auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, req_head.msg_id())); return srv.Send(&src_id, reply_head, reply_body); }; - auto ReplyRC = [&]() { return srv.Send(&src_id, reply_rc); }; - - if (ReplyRC()) { - } + Reply(); } } } @@ -219,13 +197,15 @@ DEFER1(printf("Request Reply Test:");); ThreadManager clients, servers; - for (int i = 0; i < qlen; ++i) { servers.Launch(Server); } - int ncli = 100 * 1; - uint64_t nmsg = 100 * 100 * 2; + for (int i = 0; i < 2; ++i) { servers.Launch(Server); } printf("client threads: %d, msgs : %ld, total msg: %ld\n", ncli, nmsg, ncli * nmsg); for (int i = 0; i < ncli; ++i) { clients.Launch(Client, i, nmsg); } clients.WaitAll(); printf("request ok: %ld\n", count.load()); + do { + std::this_thread::sleep_for(100ms); + } while (count.load() < ncli * nmsg); + PrintStatus(NowSec()); stop = true; servers.WaitAll(); // BOOST_CHECK_THROW(reply.Count(), int); diff --git a/utest/utest.cpp b/utest/utest.cpp index 572d8e5..b2de97f 100644 --- a/utest/utest.cpp +++ b/utest/utest.cpp @@ -102,7 +102,7 @@ Sleep(100ms); std::atomic<uint64_t> total_count(0); - std::atomic<ptime> last_time(Now() - seconds(1)); + std::atomic<int64_t> last_time(NowSec() - 1); std::atomic<uint64_t> last_count(0); const uint64_t nmsg = 100 * 2; @@ -125,7 +125,7 @@ auto OnTopicData = [&](const std::string &proc_id, const MsgPublish &pub) { ++total_count; - auto cur = Now(); + auto cur = NowSec(); if (last_time.exchange(cur) < cur) { std::cout << "time: " << cur; printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n", @@ -177,7 +177,7 @@ threads.Launch(Pub, "some_else"); threads.WaitAll(); - std::cout << "end : " << Now(); + printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n", total_count.load(), total_count - last_count.exchange(total_count), init_avail - Avail()); } @@ -227,7 +227,9 @@ MsgRequestTopic req; req.set_topic(topic); req.set_data("data " + std::string(100, 'a')); + client.ClientStartWorker(onRecv, 2); + boost::timer::auto_cpu_timer timer; for (int i = 0; i < nreq; ++i) { std::string msg_id; @@ -244,7 +246,7 @@ // ++count; } do { - std::this_thread::yield(); + std::this_thread::sleep_for(100ms); } while (count.load() < nreq); client.Stop(); printf("request %s %d done ", topic.c_str(), count.load()); @@ -254,12 +256,18 @@ auto Server = [&](const std::string &name, const std::vector<std::string> &topics) { DemoNode server(name, shm); - auto onData = [&](const std::string &proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply) { + auto onDataSync = [&](const std::string &proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply) { ++server_msg_count; reply.set_data(request.topic() + ':' + request.data()); return true; }; - server.ServerStart(onData); + auto onDataAsync = [&](void *src, std::string &proc_id, MsgRequestTopic &request) { + ++server_msg_count; + MsgRequestTopicReply reply; + reply.set_data(request.topic() + ':' + request.data()); + server.ServerSendReply(src, reply); + }; + server.ServerStart(onDataAsync); MsgTopicList rpc; for (auto &topic : topics) { @@ -272,7 +280,7 @@ } while (run) { - std::this_thread::yield(); + std::this_thread::sleep_for(100ms); } }; ThreadManager clients, servers; diff --git a/utest/util.h b/utest/util.h index f31d63f..4d960db 100644 --- a/utest/util.h +++ b/utest/util.h @@ -34,7 +34,6 @@ #include <vector> using namespace boost::posix_time; -inline ptime Now() { return second_clock::universal_time(); }; using namespace std::chrono_literals; -- Gitblit v1.8.0