From 056f71f24cefaf88f2a93714c6678c03ed5f1e0e Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期五, 02 七月 2021 16:54:33 +0800
Subject: [PATCH] fixed to adapt gcc-5.4 & glibc-2.25
---
src/topic_node.cpp | 89 +++++++++++++++++++++++++++++---------------
1 files changed, 59 insertions(+), 30 deletions(-)
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index 124d329..6096fbb 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -29,6 +29,11 @@
namespace
{
+bool ValidUserSymbol(const std::string &s)
+{
+ return !s.empty() && s[0] != '#' && s[0] != '@';
+}
+
inline void AddRoute(BHMsgHead &head, const ShmSocket &sock)
{
auto route = head.add_route();
@@ -45,8 +50,8 @@
} // namespace
-TopicNode::TopicNode(SharedMemory &shm) :
- shm_(shm), state_(eStateUninited)
+TopicNode::TopicNode(SharedMemory &shm, MQId ssn_id) :
+ shm_(shm), state_(eStateUninited), ssn_id_(ssn_id)
{
}
@@ -143,10 +148,9 @@
for (auto &p : sockets_) { p->Stop(); }
}
-bool TopicNode::UniRegister(const bool internal, ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
+bool TopicNode::DoRegister(const bool internal, ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
{
- auto ValidUserProcId = [](const std::string &id) { return !id.empty() && id[0] != '#'; };
- if (!internal && !ValidUserProcId(proc.proc_id())) {
+ if (!internal && !ValidUserSymbol(proc.proc_id())) {
SetLastError(eInvalidInput, "invalid proc id :'" + proc.proc_id() + "'");
return false;
}
@@ -315,8 +319,17 @@
reply.ParseBody(reply_body));
}
-bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
+bool TopicNode::DoServerRegisterRPC(const bool internal, MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
{
+ if (!internal) {
+ for (auto &&topic : topics.topic_list()) {
+ if (!ValidUserSymbol(topic)) {
+ SetLastError(eInvalidInput, "invalid user topic :'" + topic + "'");
+ return false;
+ }
+ }
+ }
+
if (!IsOnline()) {
SetLastError(eNotRegistered, kErrMsgNotRegistered);
return false;
@@ -484,9 +497,10 @@
out_msg_id = msg_id;
- auto SendTo = [this, msg_id](const MQInfo &remote, const MsgRequestTopic &req, const RequestResultCB &cb) {
+ auto SendTo = [this, remote_addr, msg_id](const MQInfo &remote, const MsgRequestTopic &req, const RequestResultCB &cb) {
auto &sock = SockClient();
BHMsgHead head(InitMsgHead(GetType(req), proc_id(), ssn(), msg_id));
+ *head.mutable_dest() = remote_addr;
AddRoute(head, sock);
head.set_topic(req.topic());
@@ -506,8 +520,12 @@
};
try {
- BHAddress addr;
- return (ClientQueryRPCTopic(req.topic(), addr, 3000)) && SendTo(MQInfo{addr.mq_id(), addr.abs_addr()}, req, cb);
+ if (remote_addr.ip().empty()) {
+ BHAddress addr;
+ return (ClientQueryRPCTopic(req.topic(), addr, 3000)) && SendTo(MQInfo{addr.mq_id(), addr.abs_addr()}, req, cb);
+ } else {
+ return SendTo(CenterAddr(), req, cb);
+ }
} catch (...) {
SetLastError(eError, "internal error.");
return false;
@@ -523,26 +541,36 @@
try {
auto &sock = SockClient();
-
- BHAddress addr;
- if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) {
- LOG_TRACE() << "node: " << SockNode().id() << ", topic dest: " << addr.mq_id();
- BHMsgHead head(InitMsgHead(GetType(request), proc_id(), ssn()));
- AddRoute(head, sock);
- head.set_topic(request.topic());
-
- MsgI reply_msg(shm());
- DEFER1(reply_msg.Release(););
- BHMsgHead reply_head;
-
- if (sock.SendAndRecv({addr.mq_id(), addr.abs_addr()}, head, request, reply_msg, reply_head, timeout_ms) &&
- reply_head.type() == kMsgTypeRequestTopicReply &&
- reply_msg.ParseBody(out_reply)) {
- reply_head.mutable_proc_id()->swap(out_proc_id);
- return true;
+ MQInfo dest;
+ if (!remote_addr.ip().empty()) {
+ dest = CenterAddr();
+ } else {
+ BHAddress addr;
+ if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) {
+ dest.offset_ = addr.abs_addr();
+ dest.id_ = addr.mq_id();
+ } else {
+ return false;
}
}
- } catch (...) {
+
+ BHMsgHead head(InitMsgHead(GetType(request), proc_id(), ssn()));
+ *head.mutable_dest() = remote_addr;
+ AddRoute(head, sock);
+ head.set_topic(request.topic());
+
+ MsgI reply_msg(shm());
+ DEFER1(reply_msg.Release(););
+ BHMsgHead reply_head;
+
+ if (sock.SendAndRecv(dest, head, request, reply_msg, reply_head, timeout_ms) &&
+ reply_head.type() == kMsgTypeRequestTopicReply &&
+ reply_msg.ParseBody(out_reply)) {
+ reply_head.mutable_proc_id()->swap(out_proc_id);
+ return true;
+ }
+ } catch (std::exception &e) {
+ LOG_ERROR() << __func__ << " exception: " << e.what();
SetLastError(eError, __func__ + std::string(" internal errer."));
}
return false;
@@ -600,6 +628,7 @@
auto &sock = SockPub();
BHMsgHead head(InitMsgHead(GetType(pub), proc_id(), ssn()));
AddRoute(head, sock);
+ head.set_topic(pub.topic());
if (timeout_ms == 0) {
return sock.Send(BusAddr(), head, pub);
@@ -620,7 +649,7 @@
// subscribe
-bool TopicNode::Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
+bool TopicNode::DoSubscribe(MsgTopicList &topics, const bool net, MsgCommonReply &reply_body, const int timeout_ms)
{
if (!IsOnline()) {
SetLastError(eNotRegistered, kErrMsgNotRegistered);
@@ -630,6 +659,7 @@
try {
auto &sock = SockSub();
MsgSubscribe sub;
+ sub.set_network(net);
sub.mutable_topics()->Swap(&topics);
BHMsgHead head(InitMsgHead(GetType(sub), proc_id(), ssn()));
@@ -645,7 +675,6 @@
reply.ParseBody(reply_body) &&
IsSuccess(reply_body.errmsg().errcode());
}
- // TODO wait for result?
} catch (...) {
return false;
}
@@ -691,12 +720,12 @@
return false;
}
}
- //TODO error msg.
if (head.type() == kMsgTypePublish) {
if (pub.ParseFromString(body)) {
head.mutable_proc_id()->swap(proc_id);
return true;
}
}
+ SetLastError(eError, "invalid subcribe msg received.");
return false;
}
\ No newline at end of file
--
Gitblit v1.8.0