From c64c54d8e75b9354dc49a7b6b2d326e7dd59eb37 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期四, 15 四月 2021 19:32:16 +0800
Subject: [PATCH] add api; fix send, socknode mem leak.
---
src/proto.cpp | 2
src/socket.h | 11
box/center.cpp | 53 ++-
src/proto.h | 2
.vscode/settings.json | 3
utest/utest.cpp | 28 +-
src/topic_node.cpp | 129 +++++++---
box/center_main.cc | 25 ++
utest/api_test.cpp | 196 +++++++++++++++-
utest/util.h | 7
.vscode/launch.json | 2
src/topic_node.h | 14
src/bh_api.h | 26 ++
src/sendq.h | 2
src/bh_api.cpp | 142 ++++++++---
box/app_arg.h | 59 ++++
16 files changed, 558 insertions(+), 143 deletions(-)
diff --git a/.vscode/launch.json b/.vscode/launch.json
index 12aa21d..939b9a9 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -11,7 +11,7 @@
"program": "${workspaceFolder}/debug/bin/utest",
"args": [
"-t",
- "SRTest"
+ "ApiTest"
],
"stopAtEntry": false,
"cwd": "${workspaceFolder}",
diff --git a/.vscode/settings.json b/.vscode/settings.json
index 97450e9..88753a7 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -60,7 +60,8 @@
"*.inc": "cpp",
"strstream": "cpp",
"unordered_set": "cpp",
- "cfenv": "cpp"
+ "cfenv": "cpp",
+ "*.ipp": "cpp"
},
"files.exclude": {
"**/*.un~": true,
diff --git a/box/app_arg.h b/box/app_arg.h
new file mode 100644
index 0000000..e9e2c1c
--- /dev/null
+++ b/box/app_arg.h
@@ -0,0 +1,59 @@
+#ifndef APP_ARG_OQMELZBX
+#define APP_ARG_OQMELZBX
+
+#include <map>
+#include <string>
+
+class AppArg
+{
+ typedef std::map<std::string, std::string> ArgMap;
+public:
+ AppArg(int argc, const char *argv[]) {
+ Parse(argc, argv);
+ }
+ bool Has(const std::string &key) const {
+ return Pos(key) != args.end();
+ }
+ std::string Get(const std::string &key, const std::string &def = "") const {
+ ArgMap::const_iterator pos = Pos(key);
+ if (pos != args.end()) {
+ return pos->second;
+ } else {
+ return def;
+ }
+ }
+private:
+ void Parse(int argc, const char *argv[]) {
+ for (int i = 1; i < argc; ++i) {
+ std::string text(argv[i]);
+ if (text.substr(0, 2) == "--") {
+ text = text.substr(2);
+ std::string::size_type sep = text.find('=');
+ if (sep == std::string::npos) {
+ args[text].clear();
+ } else {
+ args[text.substr(0, sep)] = text.substr(sep+1);
+ }
+ } else if (text.substr(0,1) == "-") {
+ text = text.substr(1);
+ args[text].clear();
+ if (i+1 < argc) {
+ std::string next(argv[i+1]);
+ if (next.substr(0,1) != "-") {
+ args[text] = next;
+ ++i;
+ }
+ }
+ }
+ }
+
+ }
+ ArgMap::const_iterator Pos(const std::string &key) const {
+ return args.find(key);
+ }
+
+ ArgMap args;
+};
+
+#endif // end of include guard: APP_ARG_OQMELZBX
+
diff --git a/box/center.cpp b/box/center.cpp
index 0dd4ed4..8625f7f 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -121,20 +121,18 @@
};
auto pos = nodes_.find(head.proc_id());
- if (pos == nodes_.end()) { // new client
- Node node(new NodeInfo);
- UpdateRegInfo(node);
- nodes_[node->proc_.proc_id()] = node;
- } else {
+ if (pos != nodes_.end()) { // new client
Node &node = pos->second;
if (node->addrs_.find(SrcAddr(head)) == node->addrs_.end()) {
// node restarted, release old mq.
- for (auto &addr : node->addrs_) {
- cleaner_(addr);
- }
- node->addrs_.clear();
+ RemoveNode(node);
+ node.reset(new NodeInfo);
}
UpdateRegInfo(node);
+ } else {
+ Node node(new NodeInfo);
+ UpdateRegInfo(node);
+ nodes_[node->proc_.proc_id()] = node;
}
return MakeReply(eSuccess);
} catch (...) {
@@ -334,11 +332,7 @@
auto &cli = *it->second;
cli.state_.UpdateState(now, offline_time_, kill_time_);
if (cli.state_.flag_ == kStateKillme) {
- if (cleaner_) {
- for (auto &addr : cli.addrs_) {
- cleaner_(addr);
- }
- }
+ RemoveNode(it->second);
it = nodes_.erase(it);
} else {
++it;
@@ -357,6 +351,30 @@
{
auto node = weak.lock();
return node && Valid(*node);
+ }
+ void RemoveNode(Node &node)
+ {
+ auto EraseMapRec = [&node](auto &rec_map, auto &node_rec) {
+ for (auto &addr_topics : node_rec) {
+ TopicDest dest{addr_topics.first, node};
+ for (auto &topic : addr_topics.second) {
+ auto pos = rec_map.find(topic);
+ if (pos != rec_map.end()) {
+ pos->second.erase(dest);
+ if (pos->second.empty()) {
+ rec_map.erase(pos);
+ }
+ }
+ }
+ }
+ };
+ EraseMapRec(service_map_, node->services_);
+ EraseMapRec(subscribe_map_, node->subscriptions_);
+
+ for (auto &addr : node->addrs_) {
+ cleaner_(addr);
+ }
+ node->addrs_.clear();
}
std::string id_; // center proc id;
@@ -403,11 +421,8 @@
auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) {
return [&](auto &&rep_body) {
auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id()));
- MsgI msg;
- if (msg.Make(socket.shm(), reply_head, rep_body)) {
- auto &remote = head.route(0).mq_id();
- bool r = socket.Send(remote.data(), msg);
- }
+ auto &remote = head.route(0).mq_id();
+ socket.Send(remote.data(), reply_head, rep_body);
};
};
diff --git a/box/center_main.cc b/box/center_main.cc
index 40aed56..5baa409 100644
--- a/box/center_main.cc
+++ b/box/center_main.cc
@@ -15,17 +15,40 @@
*
* =====================================================================================
*/
+#include "app_arg.h"
#include "box.h"
#include "center.h"
#include "defs.h"
#include "signalhandle.h"
+#include <chrono>
+#include <thread>
+using namespace std::chrono_literals;
int center_main(int argc, const char *argv[])
{
+ AppArg args(argc, argv);
+ if (args.Has("remove")) {
+ BHomeShm().Remove();
+ return 0;
+ }
+
+ bool run = true;
+ auto showStatus = [&]() {
+ auto init = BHomeShm().get_free_memory();
+ uint64_t idx = 0;
+ while (run) {
+ std::this_thread::sleep_for(1s);
+ printf("%8d shared memory: avail : %ld / %ld\n", ++idx, BHomeShm().get_free_memory(), init);
+ }
+ };
+ std::thread t(showStatus);
+
BHCenter center(BHomeShm());
center.Start();
+ printf("center started ...\n");
WaitForSignals({SIGINT, SIGTERM});
- // BHomeShm().Remove(); // remove ?
+ run = false;
+ t.join();
return 0;
}
diff --git a/src/bh_api.cpp b/src/bh_api.cpp
index 78b8a59..2abe66d 100644
--- a/src/bh_api.cpp
+++ b/src/bh_api.cpp
@@ -39,11 +39,25 @@
}
size_t size() const { return size_; }
operator bool() const { return ptr_; }
+ bool ReleaseTo(void **pdata, int *psize)
+ {
+ if (!ptr_) {
+ return false;
+ }
+ if (pdata && psize) {
+ *psize = size();
+ *pdata = release();
+ }
+ return true;
+ }
};
template <class Msg>
bool PackOutput(const Msg &msg, void **out, int *out_len)
{
+ if (!out || !out_len) {
+ return true; // not wanted.
+ }
auto size = msg.ByteSizeLong();
TmpPtr p(size);
if (!p) {
@@ -51,30 +65,37 @@
return false;
}
msg.SerializePartialToArray(p.get(), size);
- *out = p.release();
- *out_len = size;
+ p.ReleaseTo(out, out_len);
return true;
+}
+
+template <class MsgIn, class MsgOut = MsgCommonReply>
+bool BHApiIn1Out1(bool (TopicNode::*mfunc)(MsgIn &, MsgOut &, const int),
+ const void *request,
+ const int request_len,
+ void **reply,
+ int *reply_len,
+ const int timeout_ms)
+{
+ MsgIn input;
+ if (!input.ParseFromArray(request, request_len)) {
+ SetLastError(eInvalidInput, "invalid input.");
+ return false;
+ }
+ MsgOut msg_reply;
+ if ((ProcNode().*mfunc)(input, msg_reply, timeout_ms)) {
+ return PackOutput(msg_reply, reply, reply_len);
+
+ } else {
+ return false;
+ }
}
} // namespace
-bool BHRegister(const void *proc_info,
- const int proc_info_len,
- void **reply,
- int *reply_len,
- const int timeout_ms)
+bool BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
{
- ProcInfo pi;
- if (!pi.ParseFromArray(proc_info, proc_info_len)) {
- SetLastError(eInvalidInput, "invalid input.");
- return false;
- }
- MsgCommonReply msg_reply;
- if (ProcNode().Register(pi, msg_reply, timeout_ms)) {
- return PackOutput(msg_reply, reply, reply_len);
- } else {
- return false;
- }
+ return BHApiIn1Out1<ProcInfo>(&TopicNode::Register, proc_info, proc_info_len, reply, reply_len, timeout_ms);
}
bool BHHeartBeatEasy(const int timeout_ms)
@@ -82,23 +103,19 @@
return ProcNode().Heartbeat(timeout_ms);
}
-bool BHHeartBeat(const void *proc_info,
- const int proc_info_len,
- void **reply,
- int *reply_len,
- const int timeout_ms)
+bool BHHeartBeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
{
- ProcInfo pi;
- if (!pi.ParseFromArray(proc_info, proc_info_len)) {
- SetLastError(eInvalidInput, "invalid input.");
- return false;
- }
- MsgCommonReply msg_reply;
- if (ProcNode().Heartbeat(pi, msg_reply, timeout_ms)) {
- return PackOutput(msg_reply, reply, reply_len);
- } else {
- return false;
- }
+ return BHApiIn1Out1<ProcInfo>(&TopicNode::Heartbeat, proc_info, proc_info_len, reply, reply_len, timeout_ms);
+}
+
+bool BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
+{
+ return BHApiIn1Out1<MsgTopicList>(&TopicNode::ServerRegisterRPC, topics, topics_len, reply, reply_len, timeout_ms);
+}
+
+bool BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
+{
+ return BHApiIn1Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms);
}
bool BHPublish(const void *msgpub,
@@ -125,8 +142,35 @@
if (ProcNode().RecvSub(proc, pub, timeout_ms)) {
TmpPtr pproc(proc);
if (pproc && PackOutput(pub, msgpub, msgpub_len)) {
- *proc_id = pproc.release();
- *proc_id_len = pproc.size();
+ pproc.ReleaseTo(proc_id, proc_id_len);
+ return true;
+ } else {
+ SetLastError(ENOMEM, "out of mem");
+ }
+ }
+ return false;
+}
+
+bool BHAsyncRequest(const void *request,
+ const int request_len,
+ void **msg_id,
+ int *msg_id_len)
+{
+ MsgRequestTopic req;
+ if (!req.ParseFromArray(request, request_len)) {
+ SetLastError(eInvalidInput, "invalid input.");
+ return false;
+ }
+ std::string str_msg_id;
+ MsgRequestTopicReply out_msg;
+ if (ProcNode().ClientAsyncRequest(req, str_msg_id)) {
+ if (!msg_id || !msg_id_len) {
+ return true;
+ }
+ TmpPtr ptr(str_msg_id);
+ if (ptr) {
+ ptr.ReleaseTo(msg_id, msg_id_len);
+ return true;
} else {
SetLastError(ENOMEM, "out of mem");
}
@@ -152,8 +196,8 @@
if (ProcNode().ClientSyncRequest(req, proc, out_msg, timeout_ms)) {
TmpPtr pproc(proc);
if (pproc && PackOutput(out_msg, reply, reply_len)) {
- *proc_id = pproc.release();
- *proc_id_len = pproc.size();
+ pproc.ReleaseTo(proc_id, proc_id_len);
+ return true;
} else {
SetLastError(ENOMEM, "out of mem");
}
@@ -174,9 +218,9 @@
if (ProcNode().ServerRecvRequest(src_info, proc, out_msg, timeout_ms)) {
TmpPtr pproc(proc);
if (pproc && PackOutput(out_msg, request, request_len)) {
- *proc_id = pproc.release();
- *proc_id_len = pproc.size();
+ pproc.ReleaseTo(proc_id, proc_id_len);
*src = src_info;
+ return true;
} else {
SetLastError(ENOMEM, "out of mem");
}
@@ -206,10 +250,11 @@
typedef std::function<bool(const void *, const int)> ServerSender;
} // namespace
-void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb)
+void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb, FClientCallback client_cb)
{
TopicNode::ServerCB on_req;
TopicNode::SubDataCB on_sub;
+ TopicNode::RequestResultCB on_reply;
if (server_cb) {
on_req = [server_cb](const std::string &proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply) {
std::string sreq(request.SerializeAsString());
@@ -228,8 +273,16 @@
sub_cb(proc_id.data(), proc_id.size(), s.data(), s.size());
};
}
+ if (client_cb) {
+ on_reply = [client_cb](const BHMsgHead &head, const MsgRequestTopicReply &rep) {
+ std::string s(rep.SerializeAsString());
+ client_cb(head.proc_id().data(), head.proc_id().size(),
+ head.msg_id().data(), head.msg_id().size(),
+ s.data(), s.size());
+ };
+ }
- ProcNode().Start(on_req, on_sub);
+ ProcNode().Start(on_req, on_sub, on_reply);
}
bool BHServerCallbackReply(const BHServerCallbackTag *tag,
const void *data,
@@ -251,10 +304,7 @@
std::string err_msg;
GetLastError(ec, err_msg);
TmpPtr p(err_msg);
- if (p) {
- *msg = p.release();
- *msg_len = p.size();
- }
+ p.ReleaseTo(msg, msg_len);
}
return ec;
}
diff --git a/src/bh_api.h b/src/bh_api.h
index 1023ba4..eeb47a5 100644
--- a/src/bh_api.h
+++ b/src/bh_api.h
@@ -14,6 +14,18 @@
int *reply_len,
const int timeout_ms);
+bool BHRegisterTopics(const void *topics,
+ const int topics_len,
+ void **reply,
+ int *reply_len,
+ const int timeout_ms);
+
+bool BHSubscribeTopics(const void *topics,
+ const int topics_len,
+ void **reply,
+ int *reply_len,
+ const int timeout_ms);
+
typedef void (*FSubDataCallback)(const void *proc_id,
const int proc_id_len,
const void *data,
@@ -25,7 +37,14 @@
const int data_len,
BHServerCallbackTag *tag);
-void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb);
+typedef void (*FClientCallback)(const void *proc_id,
+ const int proc_id_len,
+ const void *msg_id,
+ const int msg_id_len,
+ const void *data,
+ const int data_len);
+
+void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb, FClientCallback client_cb);
bool BHServerCallbackReply(const BHServerCallbackTag *tag,
const void *data,
const int data_len);
@@ -47,6 +66,11 @@
int *msgpub_len,
const int timeout_ms);
+bool BHAsyncRequest(const void *request,
+ const int request_len,
+ void **msg_id,
+ int *msg_id_len);
+
bool BHRequest(const void *request,
const int request_len,
void **proc_id,
diff --git a/src/proto.cpp b/src/proto.cpp
index 287924b..b1e8207 100644
--- a/src/proto.cpp
+++ b/src/proto.cpp
@@ -30,6 +30,8 @@
} // namespace
+std::string NewMsgId() { return RandId(); }
+
BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id)
{
return InitMsgHead(type, proc_id, RandId());
diff --git a/src/proto.h b/src/proto.h
index 42fe343..b418342 100644
--- a/src/proto.h
+++ b/src/proto.h
@@ -72,7 +72,7 @@
SetError(*msg.mutable_errmsg(), err_code, err_str);
return msg;
}
-
+std::string NewMsgId();
BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const std::string &msgid);
BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id);
// inline void AddRoute(BHMsgHead &head, const MQId &id) { head.add_route()->set_mq_id(&id, sizeof(id)); }
diff --git a/src/sendq.h b/src/sendq.h
index b4f3821..aa8923d 100644
--- a/src/sendq.h
+++ b/src/sendq.h
@@ -55,7 +55,7 @@
void Append(const Remote &addr, const MsgI &msg, OnMsgEvent onExpire = OnMsgEvent())
{
using namespace std::chrono_literals;
- Append(addr, msg, Now() + 60s, onExpire);
+ Append(addr, msg, Now() + 3s, onExpire);
}
bool TrySend(bhome_shm::ShmMsgQueue &mq);
// bool empty() const { return store_.empty(); }
diff --git a/src/socket.h b/src/socket.h
index 0b0b880..96af6e7 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -36,7 +36,7 @@
class ShmSocket : private boost::noncopyable
{
- bool SendImpl(const void *valid_remote, const MsgI &imsg, SendQ::OnMsgEvent onExpire = SendQ::OnMsgEvent())
+ bool SendImpl(const void *valid_remote, MsgI const &imsg, SendQ::OnMsgEvent onExpire = SendQ::OnMsgEvent())
{
// if (!mq().TrySend(*(MQId *) valid_remote, imsg)) {
send_buffer_.Append(*static_cast<const MQId *>(valid_remote), imsg, onExpire);
@@ -69,7 +69,11 @@
bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body)
{
MsgI msg;
- return msg.Make(shm(), head, body) && SendImpl(valid_remote, msg);
+ if (msg.Make(shm(), head, body)) {
+ DEFER1(if (msg.IsCounted()) { msg.Release(shm()); });
+ return SendImpl(valid_remote, msg);
+ }
+ return false;
}
template <class Body>
@@ -78,6 +82,7 @@
//TODO send_buffer_ need flag, and remove callback on expire.
MsgI msg;
if (msg.Make(shm(), head, body)) {
+ DEFER1(if (msg.IsCounted()) { msg.Release(shm()); });
std::string msg_id(head.msg_id());
per_msg_cbs_->Add(msg_id, cb);
auto onExpireRemoveCB = [this, msg_id](MsgI const &msg) {
@@ -85,6 +90,8 @@
per_msg_cbs_->Find(msg_id, cb_no_use);
};
return SendImpl(valid_remote, msg, onExpireRemoveCB);
+ } else {
+ printf("out of mem?, avail: %ld\n", shm().get_free_memory());
}
return false;
}
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index 4ce2c97..e9e627f 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -35,35 +35,45 @@
} // namespace
TopicNode::TopicNode(SharedMemory &shm) :
- shm_(shm), sock_node_(shm), sock_request_(shm), sock_reply_(shm), sock_sub_(shm)
+ shm_(shm), sock_node_(shm), sock_request_(shm), sock_reply_(shm), sock_sub_(shm), registered_(false)
{
- SockNode().Start();
+ // recv msgs to avoid memory leak.
+ auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; };
+ SockNode().Start(default_ignore_msg);
}
TopicNode::~TopicNode()
{
Stop();
+ SockNode().Stop();
}
-void TopicNode::Start(ServerCB const &server_cb, SubDataCB const &sub_cb)
+void TopicNode::Start(ServerCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker)
{
- ServerStart(server_cb, 1);
- SubscribeStartWorker(sub_cb, 1);
- // SockClient().Start();
+ if (nworker < 1) {
+ nworker = 1;
+ } else if (nworker > 16) {
+ nworker = 16;
+ }
+
+ ServerStart(server_cb, nworker);
+ SubscribeStartWorker(sub_cb, nworker);
+ ClientStartWorker(client_cb, nworker);
}
void TopicNode::Stop()
{
SockSub().Stop();
SockServer().Stop();
SockClient().Stop();
- SockNode().Stop();
}
bool TopicNode::Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
{
+ info_ = proc;
+
auto &sock = SockNode();
MsgRegister body;
- *body.mutable_proc() = proc;
+ body.mutable_proc()->Swap(&proc);
auto AddId = [&](const MQId &id) { body.add_addrs()->set_mq_id(&id, sizeof(id)); };
AddId(SockNode().id());
AddId(SockServer().id());
@@ -74,27 +84,39 @@
auto head(InitMsgHead(GetType(body), body.proc().proc_id()));
AddRoute(head, sock.id());
+ auto CheckResult = [this](MsgI &msg, BHMsgHead &head, MsgCommonReply &rbody) {
+ bool ok = head.type() == kMsgTypeCommonReply &&
+ msg.ParseBody(rbody) &&
+ IsSuccess(rbody.errmsg().errcode());
+ printf("async regisered %s\n", ok ? "ok" : "failed");
+ registered_.store(ok);
+ };
+
if (timeout_ms == 0) {
- return sock.Send(&BHTopicCenterAddress(), head, body);
+ auto onResult = [this, CheckResult](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
+ MsgCommonReply body;
+ CheckResult(imsg, head, body);
+ };
+ return sock.Send(&BHTopicCenterAddress(), head, body, onResult);
} else {
MsgI reply;
DEFER1(reply.Release(shm_););
BHMsgHead reply_head;
bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
- r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body);
- if (r && IsSuccess(reply_body.errmsg().errcode())) {
- info_ = body;
- return true;
+ if (r) {
+ CheckResult(reply, reply_head, reply_body);
}
- return false;
+ return IsRegistered();
}
}
bool TopicNode::Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
{
+ if (!IsRegistered()) { return false; }
+
auto &sock = SockNode();
MsgHeartbeat body;
- *body.mutable_proc() = proc;
+ body.mutable_proc()->Swap(&proc);
auto head(InitMsgHead(GetType(body), body.proc().proc_id()));
AddRoute(head, sock.id());
@@ -120,7 +142,8 @@
bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
{
- //TODO check registered
+ if (!IsRegistered()) { return false; }
+
auto &sock = SockServer();
MsgRegisterRPC body;
body.mutable_topics()->Swap(&topics);
@@ -155,11 +178,8 @@
for (int i = 0; i < head.route_size() - 1; ++i) {
reply_head.add_route()->Swap(head.mutable_route(i));
}
- MsgI msg;
- if (msg.Make(sock.shm(), reply_head, reply_body)) {
- auto &remote = head.route().rbegin()->mq_id();
- sock.Send(remote.data(), msg);
- }
+ auto &remote = head.route().rbegin()->mq_id();
+ sock.Send(remote.data(), reply_head, reply_body);
}
};
@@ -169,6 +189,8 @@
bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms)
{
+ if (!IsRegistered()) { return false; }
+
auto &sock = SockServer();
MsgI imsg;
@@ -188,6 +210,8 @@
bool TopicNode::ServerSendReply(void *src_info, const MsgRequestTopicReply &body)
{
+ if (!IsRegistered()) { return false; }
+
auto &sock = SockServer();
SrcInfo *p = static_cast<SrcInfo *>(src_info);
@@ -211,7 +235,7 @@
if (head.type() == kMsgTypeRequestTopicReply) {
MsgRequestTopicReply reply;
if (imsg.ParseBody(reply)) {
- cb(head.proc_id(), reply);
+ cb(head, reply);
}
}
};
@@ -219,37 +243,60 @@
return SockRequest().Start(onData, nworker);
}
-bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, const RequestResultCB &cb)
+bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, std::string &out_msg_id, const RequestResultCB &cb)
{
- auto Call = [&](const void *remote) {
- auto &sock = SockRequest();
+ if (!IsRegistered()) { return false; }
- BHMsgHead head(InitMsgHead(GetType(req), proc_id()));
+ const std::string &msg_id(NewMsgId());
+
+ out_msg_id = msg_id;
+
+ auto SendTo = [this, msg_id](const BHAddress &addr, const MsgRequestTopic &req, const RequestResultCB &cb) {
+ auto &sock = SockClient();
+ BHMsgHead head(InitMsgHead(GetType(req), proc_id(), msg_id));
AddRoute(head, sock.id());
+ head.set_topic(req.topic());
if (cb) {
auto onRecv = [cb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
if (head.type() == kMsgTypeRequestTopicReply) {
MsgRequestTopicReply reply;
if (imsg.ParseBody(reply)) {
- cb(head.proc_id(), reply);
+ cb(head, reply);
}
}
};
- return sock.Send(remote, head, req, onRecv);
+ return sock.Send(addr.mq_id().data(), head, req, onRecv);
} else {
- return sock.Send(remote, head, req);
+ return sock.Send(addr.mq_id().data(), head, req);
}
};
try {
+ auto &sock = SockClient();
BHAddress addr;
- if (ClientQueryRPCTopic(req.topic(), addr, 1000)) {
- return Call(addr.mq_id().data());
- } else {
- SetLastError(eNotFound, "remote not found.");
- return false;
+
+ if (topic_query_cache_.Find(req.topic(), addr)) {
+ return SendTo(addr, req, cb);
}
+
+ MsgQueryTopic query;
+ query.set_topic(req.topic());
+ BHMsgHead head(InitMsgHead(GetType(query), proc_id()));
+ AddRoute(head, sock.id());
+
+ auto onQueryResult = [this, SendTo, req, cb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
+ MsgQueryTopicReply rep;
+ if (head.type() == kMsgTypeQueryTopicReply && imsg.ParseBody(rep)) {
+ auto &addr = rep.address();
+ if (!addr.mq_id().empty()) {
+ topic_query_cache_.Update(req.topic(), addr);
+ SendTo(addr, req, cb);
+ }
+ }
+ };
+ return sock.Send(&BHTopicCenterAddress(), head, query, onQueryResult);
+
} catch (...) {
return false;
}
@@ -257,6 +304,8 @@
bool TopicNode::ClientSyncRequest(const MsgRequestTopic &request, std::string &out_proc_id, MsgRequestTopicReply &out_reply, const int timeout_ms)
{
+ if (!IsRegistered()) { return false; }
+
try {
auto &sock = SockRequest();
@@ -264,6 +313,7 @@
if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) {
BHMsgHead head(InitMsgHead(GetType(request), proc_id()));
AddRoute(head, sock.id());
+ head.set_topic(request.topic());
MsgI reply_msg;
DEFER1(reply_msg.Release(shm_););
@@ -288,6 +338,8 @@
bool TopicNode::ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms)
{
+ if (!IsRegistered()) { return false; }
+
auto &sock = SockRequest();
if (topic_query_cache_.Find(topic, addr)) {
@@ -325,6 +377,8 @@
bool TopicNode::Publish(const MsgPublish &pub, const int timeout_ms)
{
+ if (!IsRegistered()) { return false; }
+
try {
auto &sock = SockPub();
BHMsgHead head(InitMsgHead(GetType(pub), proc_id()));
@@ -349,8 +403,10 @@
// subscribe
-bool TopicNode::Subscribe(MsgTopicList &topics, const int timeout_ms)
+bool TopicNode::Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
{
+ if (!IsRegistered()) { return false; }
+
try {
auto &sock = SockSub();
MsgSubscribe sub;
@@ -364,7 +420,6 @@
MsgI reply;
DEFER1(reply.Release(shm()););
BHMsgHead reply_head;
- MsgCommonReply reply_body;
return sock.SendAndRecv(&BHTopicBusAddress(), head, sub, reply, reply_head, timeout_ms) &&
reply_head.type() == kMsgTypeCommonReply &&
reply.ParseBody(reply_body) &&
@@ -396,6 +451,8 @@
bool TopicNode::RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms)
{
+ if (!IsRegistered()) { return false; }
+
auto &sock = SockSub();
MsgI msg;
DEFER1(msg.Release(shm()););
diff --git a/src/topic_node.h b/src/topic_node.h
index 0627930..8c3c48e 100644
--- a/src/topic_node.h
+++ b/src/topic_node.h
@@ -29,7 +29,7 @@
class TopicNode
{
SharedMemory &shm_;
- MsgRegister info_;
+ ProcInfo info_;
SharedMemory &shm() { return shm_; }
@@ -51,9 +51,9 @@
bool ServerSendReply(void *src_info, const MsgRequestTopicReply &reply);
// topic client
- typedef std::function<void(const std::string &proc_id, const MsgRequestTopicReply &reply)> RequestResultCB;
+ typedef std::function<void(const BHMsgHead &head, const MsgRequestTopicReply &reply)> RequestResultCB;
bool ClientStartWorker(RequestResultCB const &cb, const int nworker = 2);
- bool ClientAsyncRequest(const MsgRequestTopic &request, const RequestResultCB &rrcb = RequestResultCB());
+ bool ClientAsyncRequest(const MsgRequestTopic &request, std::string &msg_id, const RequestResultCB &rrcb = RequestResultCB());
bool ClientSyncRequest(const MsgRequestTopic &request, std::string &proc_id, MsgRequestTopicReply &reply, const int timeout_ms);
// publish
@@ -62,15 +62,15 @@
// subscribe
typedef std::function<void(const std::string &proc_id, const MsgPublish &data)> SubDataCB;
bool SubscribeStartWorker(const SubDataCB &tdcb, int nworker = 2);
- bool Subscribe(MsgTopicList &topics, const int timeout_ms);
+ bool Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms);
bool RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms);
- void Start(ServerCB const &server_cb, SubDataCB const &sub_cb);
+ void Start(ServerCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker = 2);
void Stop();
private:
bool ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
- const std::string &proc_id() { return info_.proc().proc_id(); }
+ const std::string &proc_id() { return info_.proc_id(); }
typedef bhome_msg::BHAddress Address;
class TopicQueryCache
@@ -118,7 +118,9 @@
auto &SockClient() { return SockRequest(); }
auto &SockReply() { return sock_reply_; }
auto &SockServer() { return SockReply(); }
+ bool IsRegistered() const { return registered_.load(); }
+ std::atomic<bool> registered_;
ShmSocket sock_node_;
ShmSocket sock_request_;
ShmSocket sock_reply_;
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index 113bb99..cff2cc5 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -17,11 +17,73 @@
*/
#include "bh_api.h"
#include "util.h"
+#include <atomic>
-class DemoClient
+using namespace bhome::msg;
+
+namespace
{
-public:
+typedef std::atomic<uint64_t> Number;
+
+struct MsgStatus {
+ Number nrequest_;
+ Number nreply_;
+ Number nserved_;
+ MsgStatus() :
+ nrequest_(0), nreply_(0), nserved_(0) {}
};
+
+MsgStatus &Status()
+{
+ static MsgStatus st;
+ return st;
+}
+} // namespace
+
+void SubRecvProc(const void *proc_id,
+ const int proc_id_len,
+ const void *data,
+ const int data_len)
+{
+ std::string proc((const char *) proc_id, proc_id_len);
+ MsgPublish pub;
+ pub.ParseFromArray(data, data_len);
+ // printf("Sub data, %s : %s\n", pub.topic().c_str(), pub.data().c_str());
+}
+
+void ServerProc(const void *proc_id,
+ const int proc_id_len,
+ const void *data,
+ const int data_len,
+ BHServerCallbackTag *tag)
+{
+ // printf("ServerProc: ");
+ // DEFER1(printf("\n"););
+ MsgRequestTopic request;
+ if (request.ParseFromArray(data, data_len)) {
+ MsgRequestTopicReply reply;
+ reply.set_data(" reply: " + request.data());
+ std::string s(reply.SerializeAsString());
+ // printf("%s", reply.data().c_str());
+ BHServerCallbackReply(tag, s.data(), s.size());
+ ++Status().nserved_;
+ }
+}
+
+void ClientProc(const void *proc_id,
+ const int proc_id_len,
+ const void *msg_id,
+ const int msg_id_len,
+ const void *data,
+ const int data_len)
+{
+ std::string proc((const char *) proc_id, proc_id_len);
+ MsgRequestTopicReply reply;
+ if (reply.ParseFromArray(data, data_len)) {
+ ++Status().nreply_;
+ }
+ // printf("client Recv reply : %s\n", reply.data().c_str());
+}
BOOST_AUTO_TEST_CASE(ApiTest)
{
@@ -36,19 +98,125 @@
nsec, nhour, nday, years);
std::chrono::steady_clock::duration a(123456);
printf("nowsec: %ld\n", NowSec());
- // for (int i = 0; i < 5; ++i) {
- // std::this_thread::sleep_for(1s);
- // printf("nowsec: %ld\n", NowSec());
- // }
printf("maxsec: %ld\n", CountSeconds(max_time));
- ProcInfo proc;
- proc.set_proc_id("demo_client");
- proc.set_public_info("public info of demo_client. etc...");
- std::string proc_buf(proc.SerializeAsString());
- void *reply = 0;
- int reply_len = 0;
- bool r = BHRegister(proc_buf.data(), proc_buf.size(), &reply, &reply_len, 1000);
- printf("register %s\n", r ? "ok" : "failed");
+ bool reg = false;
+ for (int i = 0; i < 10 && !reg; ++i) {
+ ProcInfo proc;
+ proc.set_proc_id("demo_client");
+ proc.set_public_info("public info of demo_client. etc...");
+ std::string proc_buf(proc.SerializeAsString());
+ void *reply = 0;
+ int reply_len = 0;
+ reg = BHRegister(proc_buf.data(), proc_buf.size(), &reply, &reply_len, 2000);
+ printf("register %s\n", reg ? "ok" : "failed");
+
+ BHFree(reply, reply_len);
+ Sleep(1s);
+ }
+
+ const std::string topic_ = "topic_";
+
+ {
+ MsgTopicList topics;
+ for (int i = 0; i < 10; ++i) {
+ topics.add_topic_list(topic_ + std::to_string(i));
+ }
+ std::string s = topics.SerializeAsString();
+ void *reply = 0;
+ int reply_len = 0;
+ bool r = BHRegisterTopics(s.data(), s.size(), &reply, &reply_len, 1000);
+ BHFree(reply, reply_len);
+ // printf("register topic : %s\n", r ? "ok" : "failed");
+ Sleep(1s);
+ }
+
+ {
+ MsgTopicList topics;
+ for (int i = 0; i < 10; ++i) {
+ topics.add_topic_list(topic_ + std::to_string(i * 2));
+ }
+ std::string s = topics.SerializeAsString();
+ void *reply = 0;
+ int reply_len = 0;
+ bool r = BHSubscribeTopics(s.data(), s.size(), &reply, &reply_len, 1000);
+ BHFree(reply, reply_len);
+ printf("subscribe topic : %s\n", r ? "ok" : "failed");
+ }
+
+ BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc);
+
+ {
+ for (int i = 0; i < 1; ++i) {
+ MsgPublish pub;
+ pub.set_topic(topic_ + std::to_string(i));
+ pub.set_data("pub_data_" + std::string(1024 * 1024, 'a'));
+ std::string s(pub.SerializeAsString());
+ BHPublish(s.data(), s.size(), 0);
+ // Sleep(1s);
+ }
+ }
+
+ auto asyncRequest = [&](uint64_t nreq) {
+ for (uint64_t i = 0; i < nreq; ++i) {
+ MsgRequestTopic req;
+ req.set_topic(topic_ + std::to_string(0));
+ req.set_data("request_data_" + std::to_string(i));
+ std::string s(req.SerializeAsString());
+ void *msg_id = 0;
+ int len = 0;
+ bool r = BHAsyncRequest(s.data(), s.size(), 0, 0);
+ DEFER1(BHFree(msg_id, len););
+ if (r) {
+ ++Status().nrequest_;
+ } else {
+ printf("request topic : %s\n", r ? "ok" : "failed");
+ }
+ }
+ };
+ auto showStatus = [](std::atomic<bool> *run) {
+ int64_t last = 0;
+ while (*run) {
+ auto &st = Status();
+ std::this_thread::sleep_for(1s);
+ int cur = st.nreply_.load();
+ printf("nreq: %8ld, nsrv: %8ld, nreply: %8ld, speed %8ld\n", st.nrequest_.load(), st.nserved_.load(), cur, cur - last);
+ last = cur;
+ }
+ };
+ auto hb = [](std::atomic<bool> *run) {
+ while (*run) {
+ BHHeartBeatEasy(0);
+ std::this_thread::sleep_for(1s);
+ }
+ };
+ std::atomic<bool> run(true);
+ ThreadManager threads;
+ boost::timer::auto_cpu_timer timer;
+ threads.Launch(hb, &run);
+ // threads.Launch(showStatus, &run);
+ int ncli = 10;
+ const uint64_t nreq = 1000 * 100;
+ for (int i = 0; i < ncli; ++i) {
+ threads.Launch(asyncRequest, nreq);
+ }
+
+ int same = 0;
+ int64_t last = 0;
+ while (last < nreq * ncli && same < 3) {
+ Sleep(1s);
+ auto cur = Status().nreply_.load();
+ if (last == cur) {
+ ++same;
+ } else {
+ last = cur;
+ same = 0;
+ }
+ }
+
+ run = false;
+ threads.WaitAll();
+ auto &st = Status();
+ printf("nreq: %8ld, nsrv: %8ld, nreply: %8ld\n", st.nrequest_.load(), st.nserved_.load(), st.nreply_.load());
}
\ No newline at end of file
diff --git a/utest/utest.cpp b/utest/utest.cpp
index 817cbaf..12d4396 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -99,7 +99,7 @@
BHCenter center(shm);
center.Start();
- std::this_thread::sleep_for(100ms);
+ Sleep(100ms);
std::atomic<uint64_t> total_count(0);
std::atomic<ptime> last_time(Now() - seconds(1));
@@ -113,7 +113,8 @@
for (auto &t : topics) {
tlist.add_topic_list(t);
}
- bool r = client.Subscribe(tlist, timeout);
+ MsgCommonReply reply_body;
+ bool r = client.Subscribe(tlist, reply_body, timeout);
if (!r) {
printf("client subscribe failed.\n");
}
@@ -149,7 +150,7 @@
MsgPublish pub;
pub.set_topic(topic);
pub.set_data(data);
- bool r = provider.Publish(pub, timeout);
+ bool r = provider.Publish(pub, 0);
if (!r) {
static std::atomic<int> an(0);
int n = ++an;
@@ -169,7 +170,7 @@
part.push_back(topics[i]);
threads.Launch(Sub, i, topics);
}
- std::this_thread::sleep_for(100ms);
+ Sleep(100ms);
for (auto &topic : topics) {
threads.Launch(Pub, topic);
}
@@ -217,7 +218,7 @@
std::atomic<int> count(0);
std::string reply;
- auto onRecv = [&](const std::string &proc_id, const MsgRequestTopicReply &msg) {
+ auto onRecv = [&](const BHMsgHead &head, const MsgRequestTopicReply &msg) {
reply = msg.data();
if (++count >= nreq) {
printf("count: %d\n", count.load());
@@ -229,7 +230,8 @@
MsgRequestTopic req;
req.set_topic(topic);
req.set_data("data " + std::to_string(i));
- if (!client.ClientAsyncRequest(req)) {
+ std::string msg_id;
+ if (!client.ClientAsyncRequest(req, msg_id)) {
printf("client request failed\n");
++count;
}
@@ -274,9 +276,9 @@
ThreadManager clients, servers;
std::vector<Topic> topics = {"topic1", "topic2"};
servers.Launch(Server, "server", topics);
- std::this_thread::sleep_for(100ms);
+ Sleep(100ms);
for (auto &t : topics) {
- clients.Launch(Client, t, 1000 * 1);
+ clients.Launch(Client, t, 1000 * 100);
}
clients.WaitAll();
printf("clients done, server replyed: %ld\n", server_msg_count.load());
@@ -302,18 +304,16 @@
};
Check();
for (int i = 0; i < 3; ++i) {
- std::this_thread::sleep_for(1s);
+ Sleep(1s);
Check();
}
- printf("sleep 4\n");
- std::this_thread::sleep_for(4s);
+ Sleep(4s);
for (int i = 0; i < 2; ++i) {
- std::this_thread::sleep_for(1s);
+ Sleep(1s);
Check();
}
}
- printf("sleep 8\n");
- std::this_thread::sleep_for(8s);
+ Sleep(8s);
}
inline int MyMin(int a, int b)
{
diff --git a/utest/util.h b/utest/util.h
index aaa5189..7f41da9 100644
--- a/utest/util.h
+++ b/utest/util.h
@@ -38,6 +38,13 @@
using namespace std::chrono_literals;
+template <class D>
+inline void Sleep(D d)
+{
+ printf("sleep for %ld ms\n", std::chrono::duration_cast<std::chrono::milliseconds>(d).count());
+ std::this_thread::sleep_for(d);
+}
+
typedef std::function<void(void)> FuncVV;
class ScopeCall : private boost::noncopyable
--
Gitblit v1.8.0