From 1f3729698a131b3f701f67adb6a1258aa1235dce Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期二, 20 四月 2021 15:43:53 +0800
Subject: [PATCH] api server callback change tag to src; refactor.
---
utest/speed_test.cpp | 74 +++++---------
api/go/bhome_node.go | 9 -
src/socket.h | 12 +-
utest/api_test.cpp | 4
utest/util.h | 1
.vscode/settings.json | 8 +
src/socket.cpp | 2
src/topic_node.h | 38 +++----
src/bh_api.h | 6 -
utest/utest.cpp | 22 +++-
src/bh_api.cpp | 29 -----
src/topic_node.cpp | 36 +++++-
12 files changed, 109 insertions(+), 132 deletions(-)
diff --git a/.vscode/settings.json b/.vscode/settings.json
index 88753a7..4003476 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -65,8 +65,14 @@
},
"files.exclude": {
"**/*.un~": true,
+ "**/bhshmq_center": true,
+ "**/bhshmq_status": true,
+ "**/bhshmqbox": true,
+ "**/gmon.out": true,
+ "api/go/bhome_msg": true,
"build/": true,
- "debug/": true
+ "debug/": true,
+ "utest/utest": true
},
"cmake.configureOnOpen": false,
"C_Cpp.default.includePath": [
diff --git a/api/go/bhome_node.go b/api/go/bhome_node.go
index c950750..f4db490 100644
--- a/api/go/bhome_node.go
+++ b/api/go/bhome_node.go
@@ -147,16 +147,11 @@
}
-func ServerCallbackReply(tag unsafe.Pointer, rep *bh.MsgRequestTopicReply) bool {
- data, _ := rep.Marshal()
- return C.BHServerCallbackReply(tag, unsafe.Pointer(&data[0]), C.int(len(data))) > 0
-}
-
-type ServecCB func(proc_id *string, req *bh.MsgRequestTopic, reply *bh.MsgRequestTopicReply) bool
+type ServecCB func(src unsafe.Pointer, proc_id *string, req *bh.MsgRequestTopic)
type SubDataCB func(proc_id *string, pub *bh.MsgPublish)
type ClientCB func(proc_id *string, msg_id *[]byte, reply *bh.MsgRequestTopicReply)
-func cserver_callback(cpid *unsafe.Pointer, cpid_len unsafe.Pointer) {
+func cserver_callback(cpid unsafe.Pointer, pid_len C.int, src unsafe.Pointer) {
}
func StartWorker(server_cb ServecCB, sub_cb SubDataCB, client_cb ClientCB) {
diff --git a/src/bh_api.cpp b/src/bh_api.cpp
index 3844000..cdf2e96 100644
--- a/src/bh_api.cpp
+++ b/src/bh_api.cpp
@@ -246,31 +246,15 @@
return ProcNode().ServerSendReply(src, rep);
}
-int BHCleanUp()
-{
- return 0;
-}
-
-namespace
-{
-typedef std::function<bool(const void *, const int)> ServerSender;
-} // namespace
-
void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb, FClientCallback client_cb)
{
- TopicNode::ServerCB on_req;
+ TopicNode::ServerAsyncCB on_req;
TopicNode::SubDataCB on_sub;
TopicNode::RequestResultCB on_reply;
if (server_cb) {
- on_req = [server_cb](const std::string &proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply) {
+ on_req = [server_cb](void *src, std::string &proc_id, const MsgRequestTopic &request) {
std::string sreq(request.SerializeAsString());
- bool r = false;
- ServerSender sender = [&](const void *p, const int len) {
- r = reply.ParseFromArray(p, len);
- return r;
- };
- server_cb(proc_id.data(), proc_id.size(), sreq.data(), sreq.size(), &sender);
- return r;
+ server_cb(proc_id.data(), proc_id.size(), sreq.data(), sreq.size(), src);
};
}
if (sub_cb) {
@@ -289,13 +273,6 @@
}
ProcNode().Start(on_req, on_sub, on_reply);
-}
-int BHServerCallbackReply(const void *tag,
- const void *data,
- const int data_len)
-{
- auto &sender = *(const ServerSender *) (tag);
- return sender(data, data_len);
}
void BHFree(void *data, int size)
diff --git a/src/bh_api.h b/src/bh_api.h
index 39b4cc6..2b3e15f 100644
--- a/src/bh_api.h
+++ b/src/bh_api.h
@@ -45,7 +45,7 @@
const int proc_id_len,
const void *data,
const int data_len,
- const void *tag);
+ void *src);
typedef void (*FClientCallback)(const void *proc_id,
const int proc_id_len,
@@ -55,10 +55,6 @@
const int data_len);
void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb, FClientCallback client_cb);
-
-int BHServerCallbackReply(const void *tag,
- const void *data,
- const int data_len);
int BHHeartbeatEasy(const int timeout_ms);
int BHHeartbeat(const void *proc_info,
diff --git a/src/socket.cpp b/src/socket.cpp
index aec42b4..1315474 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -52,7 +52,7 @@
auto DoRecv = [=] {
auto onRecvWithPerMsgCB = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
RecvCB cb;
- if (per_msg_cbs_->Find(head.msg_id(), cb)) {
+ if (per_msg_cbs_->Pick(head.msg_id(), cb)) {
cb(socket, imsg, head);
} else if (onData) {
onData(socket, imsg, head);
diff --git a/src/socket.h b/src/socket.h
index 493aeb4..bbf3851 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -66,17 +66,17 @@
size_t Pending() const { return mq().Pending(); }
template <class Body>
- bool Send(const void *valid_remote, BHMsgHead &head, Body &body, const RecvCB &cb = RecvCB())
+ bool Send(const void *valid_remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB())
{
try {
if (!cb) {
return SendImpl(valid_remote, MsgI::Serialize(head, body));
} else {
std::string msg_id(head.msg_id());
- per_msg_cbs_->Add(msg_id, cb);
+ per_msg_cbs_->Store(msg_id, std::move(cb));
auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) {
RecvCB cb_no_use;
- per_msg_cbs_->Find(msg_id, cb_no_use);
+ per_msg_cbs_->Pick(msg_id, cb_no_use);
};
return SendImpl(valid_remote, MsgI::Serialize(head, body), onExpireRemoveCB);
}
@@ -117,7 +117,7 @@
};
std::unique_lock<std::mutex> lk(st->mutex);
- bool sendok = Send(remote, head, body, OnRecv);
+ bool sendok = Send(remote, head, body, std::move(OnRecv));
if (!sendok) {
printf("send timeout\n");
}
@@ -154,8 +154,8 @@
public:
bool empty() const { return store_.empty(); }
- bool Add(const std::string &id, const RecvCB &cb) { return store_.emplace(id, cb).second; }
- bool Find(const std::string &id, RecvCB &cb)
+ bool Store(const std::string &id, RecvCB &&cb) { return store_.emplace(id, std::move(cb)).second; }
+ bool Pick(const std::string &id, RecvCB &cb)
{
auto pos = store_.find(id);
if (pos != store_.end()) {
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index 9853f35..16c565b 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -54,7 +54,7 @@
}
}
-void TopicNode::Start(ServerCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker)
+void TopicNode::Start(ServerAsyncCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker)
{
if (nworker < 1) {
nworker = 1;
@@ -178,7 +178,7 @@
}
}
-bool TopicNode::ServerStart(const ServerCB &rcb, int nworker)
+bool TopicNode::ServerStart(const ServerSyncCB &rcb, int nworker)
{
auto onRecv = [this, rcb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
if (head.type() != kMsgTypeRequestTopic || head.route_size() == 0) { return; }
@@ -199,6 +199,23 @@
auto &sock = SockServer();
return rcb && sock.Start(onRecv, nworker);
+}
+
+bool TopicNode::ServerStart(const ServerAsyncCB &acb, int nworker)
+{
+ auto onRecv = [this, acb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
+ if (head.type() != kMsgTypeRequestTopic || head.route_size() == 0) { return; }
+ MsgRequestTopic req;
+ if (!imsg.ParseBody(req)) { return; }
+
+ SrcInfo *p = new SrcInfo;
+ p->route.assign(head.route().begin(), head.route().end());
+ p->msg_id = head.msg_id();
+ acb(p, *head.mutable_proc_id(), req);
+ };
+
+ auto &sock = SockServer();
+ return acb && sock.Start(onRecv, nworker);
}
bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms)
@@ -296,13 +313,15 @@
};
try {
- auto &sock = SockClient();
BHAddress addr;
-
- if (topic_query_cache_.Find(req.topic(), addr)) {
+#if 1
+ return (ClientQueryRPCTopic(req.topic(), addr, 3000)) && SendTo(addr, req, cb);
+#else
+ if (topic_query_cache_.Pick(req.topic(), addr)) {
return SendTo(addr, req, cb);
}
+ auto &sock = SockClient();
MsgQueryTopic query;
query.set_topic(req.topic());
BHMsgHead head(InitMsgHead(GetType(query), proc_id()));
@@ -313,12 +332,13 @@
if (head.type() == kMsgTypeQueryTopicReply && imsg.ParseBody(rep)) {
auto &addr = rep.address();
if (!addr.mq_id().empty()) {
- topic_query_cache_.Update(req.topic(), addr);
+ topic_query_cache_.Store(req.topic(), addr);
SendTo(addr, req, cb);
}
}
};
- return sock.Send(&BHTopicCenterAddress(), head, query, onQueryResult);
+ return sock.Send(&BHTopicCenterAddress(), head, query, std::move(onQueryResult));
+#endif
} catch (...) {
SetLastError(eError, "internal error.");
@@ -391,7 +411,7 @@
if (addr.mq_id().empty()) {
return false;
} else {
- topic_query_cache_.Update(topic, addr);
+ topic_query_cache_.Store(topic, addr);
return true;
}
}
diff --git a/src/topic_node.h b/src/topic_node.h
index 87ad770..35cdde5 100644
--- a/src/topic_node.h
+++ b/src/topic_node.h
@@ -34,7 +34,6 @@
SharedMemory &shm() { return shm_; }
public:
- typedef std::function<void(std::string &proc_id, const void *data, const int len)> DataCB;
TopicNode(SharedMemory &shm);
~TopicNode();
@@ -44,8 +43,10 @@
bool Heartbeat(const int timeout_ms);
// topic rpc server
- typedef std::function<bool(const std::string &client_proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply)> ServerCB;
- bool ServerStart(ServerCB const &cb, const int nworker = 2);
+ typedef std::function<bool(const std::string &client_proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply)> ServerSyncCB;
+ typedef std::function<void(void *src_info, std::string &client_proc_id, MsgRequestTopic &request)> ServerAsyncCB;
+ bool ServerStart(ServerSyncCB const &cb, const int nworker = 2);
+ bool ServerStart(ServerAsyncCB const &cb, const int nworker = 2);
bool ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply, const int timeout_ms);
bool ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms);
bool ServerSendReply(void *src_info, const MsgRequestTopicReply &reply);
@@ -65,7 +66,7 @@
bool Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms);
bool RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms);
- void Start(ServerCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker = 2);
+ void Start(ServerAsyncCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker = 2);
void Stop();
private:
@@ -77,45 +78,40 @@
{
class Impl
{
- typedef std::unordered_map<Topic, Address> Store;
- Store store_;
+ typedef std::unordered_map<Topic, Address> Records;
+ Records records_;
public:
bool Find(const Topic &topic, Address &addr)
{
- auto pos = store_.find(topic);
- if (pos != store_.end()) {
+ auto pos = records_.find(topic);
+ if (pos != records_.end()) {
addr = pos->second;
return true;
} else {
return false;
}
}
- bool Update(const Topic &topic, const Address &addr)
+ bool Store(const Topic &topic, const Address &addr)
{
- store_[topic] = addr;
+ records_[topic] = addr;
return true;
}
};
Synced<Impl> impl_;
- // Impl &impl()
- // {
- // thread_local Impl impl;
- // return impl;
- // }
public:
bool Find(const Topic &topic, Address &addr) { return impl_->Find(topic, addr); }
- bool Update(const Topic &topic, const Address &addr) { return impl_->Update(topic, addr); }
+ bool Store(const Topic &topic, const Address &addr) { return impl_->Store(topic, addr); }
};
// some sockets may be the same one, using functions make it easy to change.
- auto &SockNode() { return sock_node_; }
- auto &SockPub() { return SockNode(); }
- auto &SockSub() { return sock_sub_; }
- auto &SockClient() { return sock_client_; }
- auto &SockServer() { return sock_server_; }
+ ShmSocket &SockNode() { return sock_node_; }
+ ShmSocket &SockPub() { return SockNode(); }
+ ShmSocket &SockSub() { return sock_sub_; }
+ ShmSocket &SockClient() { return sock_client_; }
+ ShmSocket &SockServer() { return sock_server_; }
bool IsRegistered() const { return registered_.load(); }
ShmSocket sock_node_;
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index b2c00a4..ae3f10a 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -66,7 +66,7 @@
const int proc_id_len,
const void *data,
const int data_len,
- const void *tag)
+ void *src)
{
// printf("ServerProc: ");
// DEFER1(printf("\n"););
@@ -76,7 +76,7 @@
reply.set_data(" reply: " + request.data());
std::string s(reply.SerializeAsString());
// printf("%s", reply.data().c_str());
- BHServerCallbackReply(tag, s.data(), s.size());
+ BHSendReply(src, s.data(), s.size());
++Status().nserved_;
}
}
diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp
index 52fe824..86367b9 100644
--- a/utest/speed_test.cpp
+++ b/utest/speed_test.cpp
@@ -119,40 +119,38 @@
const std::string shm_name("ShmSendRecv");
ShmRemover auto_remove(shm_name);
const int qlen = 64;
- const size_t msg_length = 1000;
+ const size_t msg_length = 100;
std::string msg_content(msg_length, 'a');
msg_content[20] = '\0';
const std::string client_proc_id = "client_proc";
const std::string server_proc_id = "server_proc";
- SharedMemory shm(shm_name, 1024 * 1024 * 50);
+ SharedMemory shm(shm_name, 1024 * 1024 * 512);
auto Avail = [&]() { return shm.get_free_memory(); };
auto init_avail = Avail();
ShmSocket srv(shm, qlen);
ShmSocket cli(shm, qlen);
- MsgI request_rc;
- MsgRequestTopic req_body;
- req_body.set_topic("topic");
- req_body.set_data(msg_content);
- auto req_head(InitMsgHead(GetType(req_body), client_proc_id));
- req_head.add_route()->set_mq_id(&cli.id(), cli.id().size());
- request_rc.MakeRC(shm, req_head, req_body);
- DEFER1(request_rc.Release(shm));
-
- MsgRequestTopic reply_body;
- reply_body.set_topic("topic");
- reply_body.set_data(msg_content);
- auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id));
- reply_head.add_route()->set_mq_id(&srv.id(), srv.id().size());
- MsgI reply_rc;
- reply_rc.MakeRC(shm, reply_head, reply_body);
- DEFER1(reply_rc.Release(shm));
-
+ int ncli = 1;
+ uint64_t nmsg = 1000 * 1000 * 1;
std::atomic<uint64_t> count(0);
- std::atomic<ptime> last_time(Now() - seconds(1));
+ std::atomic<int64_t> last_time(NowSec() - 1);
std::atomic<uint64_t> last_count(0);
+
+ auto PrintStatus = [&](int64_t cur) {
+ std::cout << "time: " << cur;
+ printf(", total msg:%10ld, speed:[%8ld/s], used mem:%8ld\n",
+ count.load(), count - last_count.exchange(count), init_avail - Avail());
+ };
+ auto onRecv = [&](ShmSocket &sock, MsgI &msg, BHMsgHead &head) {
+ ++count;
+ auto cur = NowSec();
+ if (last_time.exchange(cur) < cur) {
+ PrintStatus(cur);
+ }
+ };
+ cli.Start(onRecv, 2);
auto Client = [&](int cli_id, int nmsg) {
for (int i = 0; i < nmsg; ++i) {
@@ -161,28 +159,11 @@
req_body.set_topic("topic");
req_body.set_data(msg_content);
auto req_head(InitMsgHead(GetType(req_body), client_proc_id));
+ req_head.add_route()->set_mq_id(&cli.id(), cli.id().size());
return cli.Send(&srv.id(), req_head, req_body);
};
- auto ReqRC = [&]() { return cli.Send(&srv.id(), request_rc); };
- if (!ReqRC()) {
- printf("********** client send error.\n");
- continue;
- }
- MsgI msg;
- BHMsgHead head;
- if (!cli.SyncRecv(msg, head, 1000)) {
- printf("********** client recv error.\n");
- } else {
- DEFER1(msg.Release(shm));
- ++count;
- auto cur = Now();
- if (last_time.exchange(cur) < cur) {
- std::cout << "time: " << cur;
- printf(", total msg:%10ld, speed:[%8ld/s], used mem:%8ld, refcount:%d\n",
- count.load(), count - last_count.exchange(count), init_avail - Avail(), request_rc.Count());
- }
- }
+ Req();
}
};
@@ -206,10 +187,7 @@
auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, req_head.msg_id()));
return srv.Send(&src_id, reply_head, reply_body);
};
- auto ReplyRC = [&]() { return srv.Send(&src_id, reply_rc); };
-
- if (ReplyRC()) {
- }
+ Reply();
}
}
}
@@ -219,13 +197,15 @@
DEFER1(printf("Request Reply Test:"););
ThreadManager clients, servers;
- for (int i = 0; i < qlen; ++i) { servers.Launch(Server); }
- int ncli = 100 * 1;
- uint64_t nmsg = 100 * 100 * 2;
+ for (int i = 0; i < 2; ++i) { servers.Launch(Server); }
printf("client threads: %d, msgs : %ld, total msg: %ld\n", ncli, nmsg, ncli * nmsg);
for (int i = 0; i < ncli; ++i) { clients.Launch(Client, i, nmsg); }
clients.WaitAll();
printf("request ok: %ld\n", count.load());
+ do {
+ std::this_thread::sleep_for(100ms);
+ } while (count.load() < ncli * nmsg);
+ PrintStatus(NowSec());
stop = true;
servers.WaitAll();
// BOOST_CHECK_THROW(reply.Count(), int);
diff --git a/utest/utest.cpp b/utest/utest.cpp
index 572d8e5..b2de97f 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -102,7 +102,7 @@
Sleep(100ms);
std::atomic<uint64_t> total_count(0);
- std::atomic<ptime> last_time(Now() - seconds(1));
+ std::atomic<int64_t> last_time(NowSec() - 1);
std::atomic<uint64_t> last_count(0);
const uint64_t nmsg = 100 * 2;
@@ -125,7 +125,7 @@
auto OnTopicData = [&](const std::string &proc_id, const MsgPublish &pub) {
++total_count;
- auto cur = Now();
+ auto cur = NowSec();
if (last_time.exchange(cur) < cur) {
std::cout << "time: " << cur;
printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n",
@@ -177,7 +177,7 @@
threads.Launch(Pub, "some_else");
threads.WaitAll();
- std::cout << "end : " << Now();
+
printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n",
total_count.load(), total_count - last_count.exchange(total_count), init_avail - Avail());
}
@@ -227,7 +227,9 @@
MsgRequestTopic req;
req.set_topic(topic);
req.set_data("data " + std::string(100, 'a'));
+
client.ClientStartWorker(onRecv, 2);
+
boost::timer::auto_cpu_timer timer;
for (int i = 0; i < nreq; ++i) {
std::string msg_id;
@@ -244,7 +246,7 @@
// ++count;
}
do {
- std::this_thread::yield();
+ std::this_thread::sleep_for(100ms);
} while (count.load() < nreq);
client.Stop();
printf("request %s %d done ", topic.c_str(), count.load());
@@ -254,12 +256,18 @@
auto Server = [&](const std::string &name, const std::vector<std::string> &topics) {
DemoNode server(name, shm);
- auto onData = [&](const std::string &proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply) {
+ auto onDataSync = [&](const std::string &proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply) {
++server_msg_count;
reply.set_data(request.topic() + ':' + request.data());
return true;
};
- server.ServerStart(onData);
+ auto onDataAsync = [&](void *src, std::string &proc_id, MsgRequestTopic &request) {
+ ++server_msg_count;
+ MsgRequestTopicReply reply;
+ reply.set_data(request.topic() + ':' + request.data());
+ server.ServerSendReply(src, reply);
+ };
+ server.ServerStart(onDataAsync);
MsgTopicList rpc;
for (auto &topic : topics) {
@@ -272,7 +280,7 @@
}
while (run) {
- std::this_thread::yield();
+ std::this_thread::sleep_for(100ms);
}
};
ThreadManager clients, servers;
diff --git a/utest/util.h b/utest/util.h
index f31d63f..4d960db 100644
--- a/utest/util.h
+++ b/utest/util.h
@@ -34,7 +34,6 @@
#include <vector>
using namespace boost::posix_time;
-inline ptime Now() { return second_clock::universal_time(); };
using namespace std::chrono_literals;
--
Gitblit v1.8.0