From bb9a7e348892eb5c4fccb063380aa6fcd9612b71 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期二, 06 四月 2021 17:32:35 +0800
Subject: [PATCH] server resend failed; rename msgs; refactor.
---
src/topic_request.h | 32 --
.vscode/tasks.json | 28 --
src/socket.h | 16 +
src/topic_reply.cpp | 142 ++++++++++++
src/reqrep_center.cpp | 22 -
src/topic_reply.h | 52 ++++
.vscode/settings.json | 6
src/socket.cpp | 10
proto/source/bhome_msg.proto | 70 ++++--
utest/utest.cpp | 19 +
proto/source/error_msg.proto | 16 +
src/shm_queue.cpp | 18 -
src/msg.cpp | 33 +-
utest/speed_test.cpp | 2
.vscode/launch.json | 5
src/shm_queue.h | 30 +-
src/pubsub.cpp | 4
src/pubsub_center.cpp | 4
src/topic_request.cpp | 106 +--------
19 files changed, 375 insertions(+), 240 deletions(-)
diff --git a/.vscode/launch.json b/.vscode/launch.json
index c959458..9eeb23e 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -9,7 +9,10 @@
"type": "cppdbg",
"request": "launch",
"program": "${workspaceFolder}/utest/utest",
- "args": [],
+ "args": [
+ "-t",
+ "ReqRepTest"
+ ],
"stopAtEntry": false,
"cwd": "${workspaceFolder}",
"environment": [],
diff --git a/.vscode/settings.json b/.vscode/settings.json
index c9d8618..d5005e9 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -56,5 +56,9 @@
"typeindex": "cpp",
"typeinfo": "cpp",
"variant": "cpp"
- }
+ },
+ "files.exclude": {
+ "**/*.un~": true
+ },
+ "cmake.configureOnOpen": false
}
\ No newline at end of file
diff --git a/.vscode/tasks.json b/.vscode/tasks.json
index 352fba0..84142bc 100644
--- a/.vscode/tasks.json
+++ b/.vscode/tasks.json
@@ -3,32 +3,13 @@
{
"type": "cppbuild",
"label": "C/C++: g++ build active file",
- "command": "/usr/bin/g++",
+ "command": "ninja",
"args": [
- "-g",
- "${file}",
- "-o",
- "${fileDirname}/${fileBasenameNoExtension}"
+ "-C",
+ "../build"
],
"options": {
- "cwd": "${workspaceFolder}"
- },
- "problemMatcher": [
- "$gcc"
- ],
- "group": {
- "kind": "build",
- "isDefault": true
- },
- "detail": "Task generated by Debugger."
- },
- {
- "type": "cppbuild",
- "label": "C/C++: g++ build active file",
- "command": "make",
- "args": ["build"],
- "options": {
- "cwd": "${workspaceFolder}"
+ "cwd": "${workspaceFolder}/utest"
},
"problemMatcher": [
"$gcc"
@@ -36,7 +17,6 @@
"group": "build",
"detail": "compiler: /usr/bin/g++"
}
-
],
"version": "2.0.0"
}
\ No newline at end of file
diff --git a/proto/source/bhome_msg.proto b/proto/source/bhome_msg.proto
index a8e5073..9827f17 100644
--- a/proto/source/bhome_msg.proto
+++ b/proto/source/bhome_msg.proto
@@ -2,7 +2,11 @@
option optimize_for = LITE_RUNTIME;
+import "google/protobuf/descriptor.proto";
+import "error_msg.proto";
+
package bhome.msg;
+
// message format : header(BHMsgHead) + body(variable types)
message BHAddress {
@@ -13,7 +17,7 @@
message ProcInfo
{
- bytes id = 1;
+ bytes id = 1; // serial number, maybe managed
bytes name = 2;
bytes public_info = 3;
bytes private_info = 4;
@@ -28,6 +32,10 @@
bytes topic = 6; // for request route
}
+message BHMsgBody {
+ bytes data = 1;
+}
+
message BHMsg { // deprecated
bytes msg_id = 1;
int64 timestamp = 2;
@@ -38,55 +46,71 @@
enum MsgType {
kMsgTypeInvalid = 0;
- kMsgTypeRequest = 1;
- kMsgTypeReply = 2;
- kMsgTypePublish = 3;
- kMsgTypeSubscribe = 4;
- kMsgTypeUnsubscribe = 5;
- kMsgTypeProcQueryTopic = 6;
- kMsgTypeProcQueryTopicReply = 7;
- kMsgTypeProcRegisterTopics = 8;
- kMsgTypeProcHeartbeat = 9;
+ kMsgTypeCommonReply = 2;
+
+ kMsgTypeRegister= 10;
+ // kMsgTypeRegisterReply= 11;
+ kMsgTypeHeartbeat = 12;
+ // kMsgTypeHeartbeatReply = 13;
+ kMsgTypeQueryTopic = 14;
+ kMsgTypeQueryTopicReply = 15;
+ kMsgTypeRequestTopic = 16;
+ kMsgTypeRequestTopicReply = 17;
+
+ kMsgTypePublish = 100;
+ // kMsgTypePublishReply = 101;
+ kMsgTypeSubscribe = 102;
+ // kMsgTypeSubscribeReply = 103;
+ kMsgTypeUnsubscribe = 104;
+ // kMsgTypeUnsubscribeReply = 105;
+
}
-message DataPub {
+message MsgPub {
bytes topic = 1;
bytes data = 2;
}
-message DataSub {
+message MsgSub {
repeated bytes topics = 1;
}
-message DataRequest {
+message MsgCommonReply {
+ ErrorMsg errmsg = 1;
+}
+
+message MsgRequestTopic {
bytes topic = 1;
bytes data = 2;
}
-message DataReply {
- bytes data = 1;
+message MsgRequestTopicReply {
+ ErrorMsg errmsg = 1;
+ bytes data = 2;
}
-message DataProcRegister
+message MsgRegister
{
ProcInfo proc = 1;
repeated bytes topics = 2;
}
-message DataProcHeartbeat
+message MsgHeartbeat
{
ProcInfo proc = 1;
}
-message DataProcQueryTopic {
+message MsgQueryTopic {
bytes topic = 1;
}
-message DataProcQueryTopicReply {
- BHAddress address = 1;
+message MsgQueryTopicReply {
+ ErrorMsg errmsg = 1;
+ BHAddress address = 2;
}
-service TopicRequestReplyService {
- rpc Request (DataRequest) returns (DataReply);
-}
\ No newline at end of file
+service TopicRPC {
+ rpc Query (MsgQueryTopic) returns (MsgQueryTopicReply);
+ rpc Request (MsgRequestTopic) returns (MsgQueryTopicReply);
+}
diff --git a/proto/source/error_msg.proto b/proto/source/error_msg.proto
new file mode 100644
index 0000000..f283108
--- /dev/null
+++ b/proto/source/error_msg.proto
@@ -0,0 +1,16 @@
+syntax = "proto3";
+
+option optimize_for = LITE_RUNTIME;
+
+package bhome.msg;
+
+enum ErrorCode {
+ eSuccess = 0;
+ eError = 1;
+ eInvalidInput = 2;
+}
+
+message ErrorMsg {
+ ErrorCode errCode = 1;
+ bytes errString = 2;
+}
diff --git a/src/msg.cpp b/src/msg.cpp
index c1dfff9..8752066 100644
--- a/src/msg.cpp
+++ b/src/msg.cpp
@@ -20,7 +20,10 @@
namespace bhome_msg
{
-
+/*TODO change msg format, header has proc info;
+reply has errer msg
+ center accept request and route.;
+//*/
const uint32_t kMsgTag = 0xf1e2d3c4;
const uint32_t kMsgPrefixLen = 4;
@@ -43,9 +46,9 @@
BHMsg MakeRequest(const MQId &src_id, const std::string &topic, const void *data, const size_t size)
{
- BHMsg msg(InitMsg(kMsgTypeRequest));
+ BHMsg msg(InitMsg(kMsgTypeRequestTopic));
AddRoute(msg, src_id);
- DataRequest req;
+ MsgRequestTopic req;
req.set_topic(topic);
req.set_data(data, size);
msg.set_body(req.SerializeAsString());
@@ -54,9 +57,9 @@
BHMsg MakeRegister(const MQId &src_id, ProcInfo info, const std::vector<std::string> &topics)
{
- BHMsg msg(InitMsg(kMsgTypeProcRegisterTopics));
+ BHMsg msg(InitMsg(kMsgTypeRegister));
AddRoute(msg, src_id);
- DataProcRegister reg;
+ MsgRegister reg;
reg.mutable_proc()->Swap(&info);
for (auto &t : topics) {
reg.add_topics(t);
@@ -67,9 +70,9 @@
BHMsg MakeHeartbeat(const MQId &src_id, ProcInfo info)
{
- BHMsg msg(InitMsg(kMsgTypeProcHeartbeat));
+ BHMsg msg(InitMsg(kMsgTypeHeartbeat));
AddRoute(msg, src_id);
- DataProcRegister reg;
+ MsgHeartbeat reg;
reg.mutable_proc()->Swap(&info);
msg.set_body(reg.SerializeAsString());
return msg;
@@ -78,8 +81,8 @@
BHMsg MakeReply(const std::string &src_msgid, const void *data, const size_t size)
{
assert(data && size);
- BHMsg msg(InitMsg(kMsgTypeReply, src_msgid));
- DataReply reply;
+ BHMsg msg(InitMsg(kMsgTypeRequestTopicReply, src_msgid));
+ MsgRequestTopicReply reply;
reply.set_data(data, size);
msg.set_body(reply.SerializeAsString());
return msg;
@@ -90,7 +93,7 @@
assert(sub_unsub == kMsgTypeSubscribe || sub_unsub == kMsgTypeUnsubscribe);
BHMsg msg(InitMsg(sub_unsub));
AddRoute(msg, client);
- DataSub subs;
+ MsgSub subs;
for (auto &t : topics) {
subs.add_topics(t);
}
@@ -105,7 +108,7 @@
{
assert(data && size);
BHMsg msg(InitMsg(kMsgTypePublish));
- DataPub pub;
+ MsgPub pub;
pub.set_topic(topic);
pub.set_data(data, size);
msg.set_body(pub.SerializeAsString());
@@ -114,17 +117,17 @@
BHMsg MakeQueryTopic(const MQId &client, const std::string &topic)
{
- BHMsg msg(InitMsg(kMsgTypeProcQueryTopic));
+ BHMsg msg(InitMsg(kMsgTypeQueryTopic));
AddRoute(msg, client);
- DataProcQueryTopic query;
+ MsgQueryTopic query;
query.set_topic(topic);
msg.set_body(query.SerializeAsString());
return msg;
}
BHMsg MakeQueryTopicReply(const std::string &mqid, const std::string &msgid)
{
- BHMsg msg(InitMsg(kMsgTypeProcQueryTopicReply, msgid));
- DataProcQueryTopicReply reply;
+ BHMsg msg(InitMsg(kMsgTypeQueryTopicReply, msgid));
+ MsgQueryTopicReply reply;
reply.mutable_address()->set_mq_id(mqid);
msg.set_body(reply.SerializeAsString());
return msg;
diff --git a/src/pubsub.cpp b/src/pubsub.cpp
index 8d26e0b..90688ec 100644
--- a/src/pubsub.cpp
+++ b/src/pubsub.cpp
@@ -49,7 +49,7 @@
{
auto AsyncRecvProc = [this, tdcb](BHMsg &msg) {
if (msg.type() == kMsgTypePublish) {
- DataPub d;
+ MsgPub d;
if (d.ParseFromString(msg.body())) {
tdcb(d.topic(), d.data());
}
@@ -65,7 +65,7 @@
{
BHMsg msg;
if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypePublish) {
- DataPub d;
+ MsgPub d;
if (d.ParseFromString(msg.body())) {
d.mutable_topic()->swap(topic);
d.mutable_data()->swap(data);
diff --git a/src/pubsub_center.cpp b/src/pubsub_center.cpp
index b3af47d..698327e 100644
--- a/src/pubsub_center.cpp
+++ b/src/pubsub_center.cpp
@@ -94,7 +94,7 @@
auto &shm = socket.shm();
auto OnSubChange = [&](auto &&update) {
- DataSub sub;
+ MsgSub sub;
if (!msg.route().empty() && sub.ParseFromString(msg.body()) && !sub.topics().empty()) {
assert(sizeof(MQId) == msg.route(0).mq_id().size());
MQId client;
@@ -106,7 +106,7 @@
auto Unsub = [&](const MQId &id, auto &topics) { bus->UnsubScribe(id, topics.begin(), topics.end()); };
auto OnPublish = [&]() {
- DataPub pub;
+ MsgPub pub;
if (!pub.ParseFromString(msg.body())) {
return;
}
diff --git a/src/reqrep_center.cpp b/src/reqrep_center.cpp
index e52b0fd..ce35d1c 100644
--- a/src/reqrep_center.cpp
+++ b/src/reqrep_center.cpp
@@ -100,12 +100,6 @@
std::unordered_map<ProcId, Node> nodes_;
};
-Synced<NodeCenter> &Center()
-{
- static Synced<NodeCenter> s;
- return s;
-}
-
} // namespace
BHCenter::MsgHandler MakeReqRepCenter()
@@ -120,7 +114,7 @@
time_t now = 0;
time(&now);
if (last.exchange(now) < now) {
- printf("bus queue size: %ld\n", socket.Pending());
+ printf("center queue size: %ld\n", socket.Pending());
}
#endif
auto SrcMQ = [&]() { return msg.route(0).mq_id(); };
@@ -128,7 +122,7 @@
auto OnRegister = [&]() {
if (msg.route_size() != 1) { return; }
- DataProcRegister reg;
+ MsgRegister reg;
if (reg.ParseFromString(msg.body()) && reg.has_proc()) {
center->Register(*reg.mutable_proc(), SrcMQ(), reg.topics().begin(), reg.topics().end());
}
@@ -138,7 +132,7 @@
if (msg.route_size() != 1) { return; }
auto &src_mq = msg.route(0).mq_id();
- DataProcHeartbeat hb;
+ MsgHeartbeat hb;
if (hb.ParseFromString(msg.body()) && hb.has_proc()) {
center->Heartbeat(*hb.mutable_proc(), SrcMQ());
}
@@ -147,7 +141,7 @@
auto OnQueryTopic = [&]() {
if (msg.route_size() != 1) { return; }
- DataProcQueryTopic query;
+ MsgQueryTopic query;
NodeCenter::ProcAddr dest;
if (query.ParseFromString(msg.body()) && center->QueryTopic(query.topic(), dest)) {
MQId remote;
@@ -161,9 +155,9 @@
};
switch (msg.type()) {
- case kMsgTypeProcRegisterTopics: OnRegister(); return true;
- case kMsgTypeProcHeartbeat: OnHeartbeat(); return true;
- case kMsgTypeProcQueryTopic: OnQueryTopic(); return true;
+ case kMsgTypeRegister: OnRegister(); return true;
+ case kMsgTypeHeartbeat: OnHeartbeat(); return true;
+ case kMsgTypeQueryTopic: OnQueryTopic(); return true;
default: return false;
}
};
@@ -176,4 +170,4 @@
const int kMaxWorker = 16;
return socket_.Start(handler, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
-}
\ No newline at end of file
+}
diff --git a/src/shm_queue.cpp b/src/shm_queue.cpp
index de38229..dcb5a9e 100644
--- a/src/shm_queue.cpp
+++ b/src/shm_queue.cpp
@@ -76,24 +76,16 @@
Queue *remote = Find(shm, MsgQIdToName(remote_id));
return remote && remote->Write(msg, timeout_ms, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); });
}
+bool ShmMsgQueue::Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms)
+{
+ Queue *remote = Find(shm, MsgQIdToName(remote_id));
+ return remote && remote->Write(msg, timeout_ms, [](const MsgI &msg) { msg.AddRef(); });
+}
// Test shows that in the 2 cases:
// 1) build msg first, then find remote queue;
// 2) find remote queue first, then build msg;
// 1 is about 50% faster than 2, maybe cache related.
-
-bool ShmMsgQueue::Send(const MQId &remote_id, const BHMsg &data, const int timeout_ms, const std::function<void()> &onsend)
-{
- MsgI msg;
- if (msg.Make(shm(), data)) {
- if (Send(remote_id, msg, timeout_ms, onsend)) {
- return true;
- } else {
- msg.Release(shm());
- }
- }
- return false;
-}
bool ShmMsgQueue::Recv(BHMsg &msg, const int timeout_ms)
{
diff --git a/src/shm_queue.h b/src/shm_queue.h
index e9b3a1a..ab8a88c 100644
--- a/src/shm_queue.h
+++ b/src/shm_queue.h
@@ -134,22 +134,26 @@
bool Recv(BHMsg &msg, const int timeout_ms);
bool Recv(MsgI &msg, const int timeout_ms) { return Read(msg, timeout_ms); }
static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend);
- static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms)
+ static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms);
+
+ template <class... Extra>
+ bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms, Extra const &...extra)
{
- return Send(shm, remote_id, msg, timeout_ms, []() {});
+ return Send(shm(), remote_id, msg, timeout_ms, extra...);
}
- bool Send(const MQId &remote_id, const BHMsg &msg, const int timeout_ms, OnSend const &onsend);
- bool Send(const MQId &remote_id, const BHMsg &msg, const int timeout_ms)
+
+ template <class... Extra>
+ bool Send(const MQId &remote_id, const BHMsg &data, const int timeout_ms, Extra const &...extra)
{
- return Send(remote_id, msg, timeout_ms, []() {});
- }
- bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend)
- {
- return Send(shm(), remote_id, msg, timeout_ms, onsend);
- }
- bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms)
- {
- return Send(shm(), remote_id, msg, timeout_ms);
+ MsgI msg;
+ if (msg.Make(shm(), data)) {
+ if (Send(shm(), remote_id, msg, timeout_ms, extra...)) {
+ return true;
+ } else {
+ msg.Release(shm());
+ }
+ }
+ return false;
}
size_t Pending() const { return data()->size(); }
};
diff --git a/src/socket.cpp b/src/socket.cpp
index 73681f1..b9def0c 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -49,15 +49,15 @@
Stop(); //TODO should stop in sub class, incase thread access sub class data.
}
-bool ShmSocket::Start(const RecvCB &onData, int nworker)
+bool ShmSocket::Start(const RecvCB &onData, const IdleCB &onIdle, int nworker)
{
- if (!mq_) {
- return false;
+ if (!mq_ || !onData) {
+ return false; // TODO error code.
}
std::lock_guard<std::mutex> lock(mutex_);
StopNoLock();
- auto RecvProc = [this, onData]() {
+ auto RecvProc = [this, onData, onIdle]() {
while (run_) {
try {
MsgI imsg;
@@ -67,6 +67,8 @@
if (imsg.Unpack(msg)) {
onData(*this, imsg, msg);
}
+ } else if (onIdle) {
+ onIdle(*this);
}
} catch (...) {
}
diff --git a/src/socket.h b/src/socket.h
index 1a3d47b..57d0ae4 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -37,6 +37,7 @@
typedef bhome_shm::SharedMemory Shm;
typedef std::function<void(ShmSocket &sock, bhome_msg::MsgI &imsg, bhome_msg::BHMsg &msg)> RecvCB;
typedef std::function<void(bhome_msg::BHMsg &msg)> RecvBHMsgCB;
+ typedef std::function<void(ShmSocket &sock)> IdleCB;
ShmSocket(Shm &shm, const void *id, const int len);
ShmSocket(Shm &shm, const int len = 12);
@@ -44,22 +45,27 @@
Shm &shm() { return shm_; }
// start recv.
- bool Start(const RecvCB &onData, int nworker = 1);
+ bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1);
+ bool Start(const RecvCB &onData, int nworker = 1) { return Start(onData, IdleCB(), nworker); }
+ bool Start(const RecvBHMsgCB &onData, const IdleCB &onIdle, int nworker = 1)
+ {
+ return Start([onData](ShmSocket &sock, bhome_msg::MsgI &imsg, bhome_msg::BHMsg &msg) { onData(msg); }, onIdle, nworker);
+ }
bool Start(const RecvBHMsgCB &onData, int nworker = 1)
{
- return Start([onData](ShmSocket &sock, bhome_msg::MsgI &imsg, bhome_msg::BHMsg &msg) { onData(msg); }, nworker);
+ return Start(onData, IdleCB(), nworker);
}
bool Stop();
size_t Pending() const { return mq_ ? mq_->Pending() : 0; }
+
+ bool SyncSend(const void *id, const bhome_msg::BHMsg &msg, const int timeout_ms);
+ bool SyncRecv(bhome_msg::BHMsg &msg, const int timeout_ms);
protected:
const Shm &shm() const { return shm_; }
Queue &mq() { return *mq_; } // programmer should make sure that mq_ is valid.
const Queue &mq() const { return *mq_; }
std::mutex &mutex() { return mutex_; }
-
- bool SyncSend(const void *id, const bhome_msg::BHMsg &msg, const int timeout_ms);
- bool SyncRecv(bhome_msg::BHMsg &msg, const int timeout_ms);
private:
bool StopNoLock();
diff --git a/src/topic_reply.cpp b/src/topic_reply.cpp
new file mode 100644
index 0000000..356cf3e
--- /dev/null
+++ b/src/topic_reply.cpp
@@ -0,0 +1,142 @@
+/*
+ * =====================================================================================
+ *
+ * Filename: topic_reply.cpp
+ *
+ * Description:
+ *
+ * Version: 1.0
+ * Created: 2021骞�04鏈�06鏃� 14鏃�40鍒�52绉�
+ * Revision: none
+ * Compiler: gcc
+ *
+ * Author: Li Chao (),
+ * Organization:
+ *
+ * =====================================================================================
+ */
+#include "topic_reply.h"
+#include <chrono>
+#include <list>
+
+using namespace bhome_msg;
+using namespace std::chrono;
+using namespace std::chrono_literals;
+
+namespace
+{
+struct SrcInfo {
+ std::vector<BHAddress> route;
+ std::string msg_id;
+};
+
+class FailedQ
+{
+ struct FailedMsg {
+ steady_clock::time_point xpr;
+ std::string remote_;
+ BHMsg msg_;
+ FailedMsg(const std::string &addr, BHMsg &&msg) :
+ xpr(steady_clock::now() + 10s), remote_(addr), msg_(std::move(msg)) {}
+ bool Expired() { return steady_clock::now() > xpr; }
+ };
+ typedef std::list<FailedMsg> Queue;
+ Synced<Queue> queue_;
+
+public:
+ void Push(const std::string &remote, BHMsg &&msg)
+ {
+ queue_->emplace_back(remote, std::move(msg));
+ }
+ void TrySend(ShmSocket &socket, const int timeout_ms = 0)
+ {
+ queue_.Apply([&](Queue &q) {
+ if (!q.empty()) {
+ auto it = q.begin();
+ do {
+ if (it->Expired() || socket.SyncSend(it->remote_.data(), it->msg_, timeout_ms)) {
+ it = q.erase(it);
+ } else {
+ ++it;
+ }
+ } while (it != q.end());
+ }
+ });
+ }
+};
+
+} // namespace
+
+bool SocketReply::Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms)
+{
+ //TODO check reply?
+ return SyncSend(&kBHTopicReqRepCenter, MakeRegister(mq().Id(), proc_info, topics), timeout_ms);
+}
+bool SocketReply::Heartbeat(const ProcInfo &proc_info, const int timeout_ms)
+{
+ return SyncSend(&kBHTopicReqRepCenter, MakeHeartbeat(mq().Id(), proc_info), timeout_ms);
+}
+bool SocketReply::StartWorker(const OnRequest &rcb, int nworker)
+{
+ auto failed_q = std::make_shared<FailedQ>();
+
+ auto onIdle = [failed_q](ShmSocket &socket) { failed_q->TrySend(socket); };
+
+ auto onRecv = [this, rcb, failed_q, onIdle](BHMsg &msg) {
+ if (msg.type() == kMsgTypeRequestTopic && msg.route_size() > 0) {
+ MsgRequestTopic req;
+ if (req.ParseFromString(msg.body())) {
+ std::string out;
+ if (rcb(req.topic(), req.data(), out)) {
+ BHMsg msg_reply(MakeReply(msg.msg_id(), out.data(), out.size()));
+ for (int i = 0; i < msg.route_size() - 1; ++i) {
+ msg.add_route()->Swap(msg.mutable_route(i));
+ }
+ if (!SyncSend(msg.route().rbegin()->mq_id().data(), msg_reply, 10)) {
+ failed_q->Push(msg.route().rbegin()->mq_id(), std::move(msg_reply));
+ }
+ }
+ }
+ } else {
+ // ignored, or dropped
+ }
+
+ onIdle(*this);
+ };
+
+ return rcb && Start(onRecv, onIdle, nworker);
+}
+
+bool SocketReply::RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms)
+{
+ BHMsg msg;
+ if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypeRequestTopic) {
+ MsgRequestTopic request;
+ if (request.ParseFromString(msg.body())) {
+ request.mutable_topic()->swap(topic);
+ request.mutable_data()->swap(data);
+ SrcInfo *p = new SrcInfo;
+ p->route.assign(msg.route().begin(), msg.route().end());
+ p->msg_id = msg.msg_id();
+ src_info = p;
+ return true;
+ }
+ }
+ return false;
+}
+
+bool SocketReply::SendReply(void *src_info, const std::string &data, const int timeout_ms)
+{
+ SrcInfo *p = static_cast<SrcInfo *>(src_info);
+ DEFER1(delete p);
+ if (!p || p->route.empty()) {
+ return false;
+ }
+
+ BHMsg msg(MakeReply(p->msg_id, data.data(), data.size()));
+ for (unsigned i = 0; i < p->route.size() - 1; ++i) {
+ msg.add_route()->Swap(&p->route[i]);
+ }
+
+ return SyncSend(p->route.back().mq_id().data(), msg, timeout_ms);
+}
\ No newline at end of file
diff --git a/src/topic_reply.h b/src/topic_reply.h
new file mode 100644
index 0000000..090ad88
--- /dev/null
+++ b/src/topic_reply.h
@@ -0,0 +1,52 @@
+/*
+ * =====================================================================================
+ *
+ * Filename: topic_reply.h
+ *
+ * Description:
+ *
+ * Version: 1.0
+ * Created: 2021骞�04鏈�06鏃� 14鏃�41鍒�12绉�
+ * Revision: none
+ * Compiler: gcc
+ *
+ * Author: Li Chao (),
+ * Organization:
+ *
+ * =====================================================================================
+ */
+#ifndef TOPIC_REPLY_3RVYPPWI
+#define TOPIC_REPLY_3RVYPPWI
+
+#include "bh_util.h"
+#include "defs.h"
+#include "msg.h"
+#include "socket.h"
+#include <deque>
+#include <functional>
+
+using bhome::msg::ProcInfo;
+
+class SocketReply : private ShmSocket
+{
+ typedef ShmSocket Socket;
+
+public:
+ SocketReply(Socket::Shm &shm) :
+ Socket(shm, 64) {}
+ SocketReply() :
+ SocketReply(BHomeShm()) {}
+ ~SocketReply() { Stop(); }
+
+ typedef std::function<bool(const std::string &topic, const std::string &data, std::string &reply)> OnRequest;
+ bool StartWorker(const OnRequest &rcb, int nworker = 2);
+ bool Stop() { return Socket::Stop(); }
+ bool RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms);
+ bool SendReply(void *src_info, const std::string &data, const int timeout_ms);
+ bool Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms);
+ bool Heartbeat(const ProcInfo &proc_info, const int timeout_ms);
+
+private:
+};
+
+#endif // end of include guard: TOPIC_REPLY_3RVYPPWI
diff --git a/src/reqrep.cpp b/src/topic_request.cpp
similarity index 65%
rename from src/reqrep.cpp
rename to src/topic_request.cpp
index 25c0826..ce7c1a8 100644
--- a/src/reqrep.cpp
+++ b/src/topic_request.cpp
@@ -1,9 +1,9 @@
/*
* =====================================================================================
*
- * Filename: reqrep.cpp
+ * Filename: topic_request.cpp
*
- * Description: topic request/reply sockets
+ * Description: topic request sockets
*
* Version: 1.0
* Created: 2021骞�04鏈�01鏃� 09鏃�35鍒�35绉�
@@ -15,7 +15,7 @@
*
* =====================================================================================
*/
-#include "reqrep.h"
+#include "topic_request.h"
#include "bh_util.h"
#include "msg.h"
#include <chrono>
@@ -40,10 +40,10 @@
};
RecvBHMsgCB cb;
- if (Find(cb)) {
+ if (Find(cb) && cb) {
cb(msg);
- } else if (msg.type() == kMsgTypeReply) {
- DataReply reply;
+ } else if (msg.type() == kMsgTypeRequestTopicReply && rrcb) {
+ MsgRequestTopicReply reply;
if (reply.ParseFromString(msg.body())) {
rrcb(reply.data());
}
@@ -74,8 +74,8 @@
auto Call = [&](const void *remote) {
const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size));
auto onRecv = [cb](BHMsg &msg) {
- if (msg.type() == kMsgTypeReply) {
- DataReply reply;
+ if (msg.type() == kMsgTypeRequestTopicReply) {
+ MsgRequestTopicReply reply;
if (reply.ParseFromString(msg.body())) {
cb(reply.data());
}
@@ -103,16 +103,22 @@
if (QueryRPCTopic(topic, addr, timeout_ms)) {
const BHMsg &req(MakeRequest(mq().Id(), topic, data, size));
BHMsg reply;
- if (SyncSendAndRecv(addr.mq_id().data(), &req, &reply, timeout_ms) && reply.type() == kMsgTypeReply) {
- DataReply dr;
+ if (SyncSendAndRecv(addr.mq_id().data(), &req, &reply, timeout_ms) && reply.type() == kMsgTypeRequestTopicReply) {
+ MsgRequestTopicReply dr;
if (dr.ParseFromString(reply.body())) {
dr.mutable_data()->swap(out);
return true;
+ } else {
+ printf("error parse reply.\n");
}
+ } else {
+ printf("error recv data. line: %d\n", __LINE__);
}
} else {
+ printf("error recv data. line: %d\n", __LINE__);
}
} catch (...) {
+ printf("error recv data. line: %d\n", __LINE__);
}
return false;
}
@@ -186,8 +192,8 @@
BHMsg result;
const BHMsg &msg = MakeQueryTopic(mq().Id(), topic);
if (SyncSendAndRecv(&kBHTopicReqRepCenter, &msg, &result, timeout_ms)) {
- if (result.type() == kMsgTypeProcQueryTopicReply) {
- DataProcQueryTopicReply reply;
+ if (result.type() == kMsgTypeQueryTopicReply) {
+ MsgQueryTopicReply reply;
if (reply.ParseFromString(result.body())) {
addr = reply.address();
if (addr.mq_id().empty()) {
@@ -202,79 +208,3 @@
}
return false;
}
-
-// reply socket
-namespace
-{
-struct SrcInfo {
- std::vector<BHAddress> route;
- std::string msg_id;
-};
-
-} // namespace
-
-bool SocketReply::Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms)
-{
- //TODO check reply?
- return SyncSend(&kBHTopicReqRepCenter, MakeRegister(mq().Id(), proc_info, topics), timeout_ms);
-}
-bool SocketReply::Heartbeat(const ProcInfo &proc_info, const int timeout_ms)
-{
- return SyncSend(&kBHTopicReqRepCenter, MakeHeartbeat(mq().Id(), proc_info), timeout_ms);
-}
-bool SocketReply::StartWorker(const OnRequest &rcb, int nworker)
-{
- auto onRecv = [this, rcb](BHMsg &msg) {
- if (msg.type() == kMsgTypeRequest && msg.route_size() > 0) {
- DataRequest req;
- if (req.ParseFromString(msg.body())) {
- std::string out;
- if (rcb(req.topic(), req.data(), out)) {
- BHMsg msg_reply(MakeReply(msg.msg_id(), out.data(), out.size()));
- for (int i = 0; i < msg.route_size() - 1; ++i) {
- msg.add_route()->Swap(msg.mutable_route(i));
- }
- SyncSend(msg.route().rbegin()->mq_id().data(), msg_reply, 100);
- }
- }
- } else {
- // ignored, or dropped
- }
- };
-
- return rcb && Start(onRecv, nworker);
-}
-
-bool SocketReply::RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms)
-{
- BHMsg msg;
- if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypeRequest) {
- DataRequest request;
- if (request.ParseFromString(msg.body())) {
- request.mutable_topic()->swap(topic);
- request.mutable_data()->swap(data);
- SrcInfo *p = new SrcInfo;
- p->route.assign(msg.route().begin(), msg.route().end());
- p->msg_id = msg.msg_id();
- src_info = p;
- return true;
- }
- }
- return false;
-}
-
-bool SocketReply::SendReply(void *src_info, const std::string &data, const int timeout_ms)
-{
- SrcInfo *p = static_cast<SrcInfo *>(src_info);
- DEFER1(delete p);
- if (!p || p->route.empty()) {
- return false;
- }
-
- BHMsg msg(MakeReply(p->msg_id, data.data(), data.size()));
- for (unsigned i = 0; i < p->route.size() - 1; ++i) {
- msg.add_route()->Swap(&p->route[i]);
- }
-
- return SyncSend(p->route.back().mq_id().data(), msg, timeout_ms);
-}
\ No newline at end of file
diff --git a/src/reqrep.h b/src/topic_request.h
similarity index 76%
rename from src/reqrep.h
rename to src/topic_request.h
index 8a4743c..6765dc2 100644
--- a/src/reqrep.h
+++ b/src/topic_request.h
@@ -1,9 +1,9 @@
/*
* =====================================================================================
*
- * Filename: reqrep.h
+ * Filename: topic_request.h
*
- * Description: topic request/reply sockets
+ * Description: topic request socket
*
* Version: 1.0
* Created: 2021骞�04鏈�01鏃� 09鏃�36鍒�06绉�
@@ -15,8 +15,8 @@
*
* =====================================================================================
*/
-#ifndef REQREP_ACEH09NK
-#define REQREP_ACEH09NK
+#ifndef TOPIC_REQUEST_ACEH09NK
+#define TOPIC_REQUEST_ACEH09NK
#include "bh_util.h"
#include "defs.h"
@@ -105,26 +105,4 @@
TopicCache topic_cache_;
};
-class SocketReply : private ShmSocket
-{
- typedef ShmSocket Socket;
-
-public:
- SocketReply(Socket::Shm &shm) :
- Socket(shm, 64) {}
- SocketReply() :
- SocketReply(BHomeShm()) {}
- ~SocketReply() { Stop(); }
-
- typedef std::function<bool(const std::string &topic, const std::string &data, std::string &reply)> OnRequest;
- bool StartWorker(const OnRequest &rcb, int nworker = 2);
- bool Stop() { return Socket::Stop(); }
- bool RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms);
- bool SendReply(void *src_info, const std::string &data, const int timeout_ms);
- bool Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms);
- bool Heartbeat(const ProcInfo &proc_info, const int timeout_ms);
-
-private:
-};
-
-#endif // end of include guard: REQREP_ACEH09NK
+#endif // end of include guard: TOPIC_REQUEST_ACEH09NK
diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp
index dc64cc0..34f80d8 100644
--- a/utest/speed_test.cpp
+++ b/utest/speed_test.cpp
@@ -160,7 +160,7 @@
auto Server = [&]() {
BHMsg req;
while (!stop) {
- if (srv.Recv(req, 100) && req.type() == kMsgTypeRequest) {
+ if (srv.Recv(req, 100) && req.type() == kMsgTypeRequestTopic) {
auto &mqid = req.route()[0].mq_id();
MQId src_id;
memcpy(&src_id, mqid.data(), sizeof(src_id));
diff --git a/utest/utest.cpp b/utest/utest.cpp
index 55a08a3..8f2a7f5 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -1,9 +1,10 @@
#include "defs.h"
#include "pubsub.h"
#include "pubsub_center.h"
-#include "reqrep.h"
#include "reqrep_center.h"
#include "socket.h"
+#include "topic_reply.h"
+#include "topic_request.h"
#include "util.h"
#include <atomic>
#include <boost/uuid/uuid_generators.hpp>
@@ -189,24 +190,26 @@
printf("count: %d\n", count.load());
}
};
- client.StartWorker(onRecv, 1);
+ client.StartWorker(onRecv, 2);
boost::timer::auto_cpu_timer timer;
for (int i = 0; i < nreq; ++i) {
if (!client.AsyncRequest(topic, "data " + std::to_string(i), 1000)) {
printf("client request failed\n");
}
+
// if (!client.SyncRequest(topic, "data " + std::to_string(i), reply, 1000)) {
// printf("client request failed\n");
// } else {
// ++count;
// }
}
- printf("request %s %d done ", topic.c_str(), nreq);
- while (count.load() < nreq) {
+ do {
std::this_thread::yield();
- }
+ } while (count.load() < nreq);
client.Stop();
+ printf("request %s %d done ", topic.c_str(), count.load());
};
+ std::atomic_uint64_t server_msg_count(0);
auto Server = [&](const std::string &name, const std::vector<std::string> &topics) {
SocketReply server(shm);
ProcInfo info;
@@ -215,7 +218,8 @@
if (!server.Register(info, topics, 100)) {
printf("register failed\n");
}
- auto onData = [](const std::string &topic, const std::string &data, std::string &reply) {
+ auto onData = [&](const std::string &topic, const std::string &data, std::string &reply) {
+ ++server_msg_count;
reply = topic + ':' + data;
return true;
};
@@ -229,9 +233,10 @@
servers.Launch(Server, "server", topics);
std::this_thread::sleep_for(100ms);
for (auto &t : topics) {
- clients.Launch(Client, t, 1000 * 1000);
+ clients.Launch(Client, t, 1000 * 100);
}
clients.WaitAll();
+ printf("clients done, server replyed: %d\n", server_msg_count.load());
run = false;
servers.WaitAll();
}
--
Gitblit v1.8.0