From bb9a7e348892eb5c4fccb063380aa6fcd9612b71 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期二, 06 四月 2021 17:32:35 +0800 Subject: [PATCH] server resend failed; rename msgs; refactor. --- src/topic_request.h | 32 -- .vscode/tasks.json | 28 -- src/socket.h | 16 + src/topic_reply.cpp | 142 ++++++++++++ src/reqrep_center.cpp | 22 - src/topic_reply.h | 52 ++++ .vscode/settings.json | 6 src/socket.cpp | 10 proto/source/bhome_msg.proto | 70 ++++-- utest/utest.cpp | 19 + proto/source/error_msg.proto | 16 + src/shm_queue.cpp | 18 - src/msg.cpp | 33 +- utest/speed_test.cpp | 2 .vscode/launch.json | 5 src/shm_queue.h | 30 +- src/pubsub.cpp | 4 src/pubsub_center.cpp | 4 src/topic_request.cpp | 106 +-------- 19 files changed, 375 insertions(+), 240 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index c959458..9eeb23e 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -9,7 +9,10 @@ "type": "cppdbg", "request": "launch", "program": "${workspaceFolder}/utest/utest", - "args": [], + "args": [ + "-t", + "ReqRepTest" + ], "stopAtEntry": false, "cwd": "${workspaceFolder}", "environment": [], diff --git a/.vscode/settings.json b/.vscode/settings.json index c9d8618..d5005e9 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -56,5 +56,9 @@ "typeindex": "cpp", "typeinfo": "cpp", "variant": "cpp" - } + }, + "files.exclude": { + "**/*.un~": true + }, + "cmake.configureOnOpen": false } \ No newline at end of file diff --git a/.vscode/tasks.json b/.vscode/tasks.json index 352fba0..84142bc 100644 --- a/.vscode/tasks.json +++ b/.vscode/tasks.json @@ -3,32 +3,13 @@ { "type": "cppbuild", "label": "C/C++: g++ build active file", - "command": "/usr/bin/g++", + "command": "ninja", "args": [ - "-g", - "${file}", - "-o", - "${fileDirname}/${fileBasenameNoExtension}" + "-C", + "../build" ], "options": { - "cwd": "${workspaceFolder}" - }, - "problemMatcher": [ - "$gcc" - ], - "group": { - "kind": "build", - "isDefault": true - }, - "detail": "Task generated by Debugger." - }, - { - "type": "cppbuild", - "label": "C/C++: g++ build active file", - "command": "make", - "args": ["build"], - "options": { - "cwd": "${workspaceFolder}" + "cwd": "${workspaceFolder}/utest" }, "problemMatcher": [ "$gcc" @@ -36,7 +17,6 @@ "group": "build", "detail": "compiler: /usr/bin/g++" } - ], "version": "2.0.0" } \ No newline at end of file diff --git a/proto/source/bhome_msg.proto b/proto/source/bhome_msg.proto index a8e5073..9827f17 100644 --- a/proto/source/bhome_msg.proto +++ b/proto/source/bhome_msg.proto @@ -2,7 +2,11 @@ option optimize_for = LITE_RUNTIME; +import "google/protobuf/descriptor.proto"; +import "error_msg.proto"; + package bhome.msg; + // message format : header(BHMsgHead) + body(variable types) message BHAddress { @@ -13,7 +17,7 @@ message ProcInfo { - bytes id = 1; + bytes id = 1; // serial number, maybe managed bytes name = 2; bytes public_info = 3; bytes private_info = 4; @@ -28,6 +32,10 @@ bytes topic = 6; // for request route } +message BHMsgBody { + bytes data = 1; +} + message BHMsg { // deprecated bytes msg_id = 1; int64 timestamp = 2; @@ -38,55 +46,71 @@ enum MsgType { kMsgTypeInvalid = 0; - kMsgTypeRequest = 1; - kMsgTypeReply = 2; - kMsgTypePublish = 3; - kMsgTypeSubscribe = 4; - kMsgTypeUnsubscribe = 5; - kMsgTypeProcQueryTopic = 6; - kMsgTypeProcQueryTopicReply = 7; - kMsgTypeProcRegisterTopics = 8; - kMsgTypeProcHeartbeat = 9; + kMsgTypeCommonReply = 2; + + kMsgTypeRegister= 10; + // kMsgTypeRegisterReply= 11; + kMsgTypeHeartbeat = 12; + // kMsgTypeHeartbeatReply = 13; + kMsgTypeQueryTopic = 14; + kMsgTypeQueryTopicReply = 15; + kMsgTypeRequestTopic = 16; + kMsgTypeRequestTopicReply = 17; + + kMsgTypePublish = 100; + // kMsgTypePublishReply = 101; + kMsgTypeSubscribe = 102; + // kMsgTypeSubscribeReply = 103; + kMsgTypeUnsubscribe = 104; + // kMsgTypeUnsubscribeReply = 105; + } -message DataPub { +message MsgPub { bytes topic = 1; bytes data = 2; } -message DataSub { +message MsgSub { repeated bytes topics = 1; } -message DataRequest { +message MsgCommonReply { + ErrorMsg errmsg = 1; +} + +message MsgRequestTopic { bytes topic = 1; bytes data = 2; } -message DataReply { - bytes data = 1; +message MsgRequestTopicReply { + ErrorMsg errmsg = 1; + bytes data = 2; } -message DataProcRegister +message MsgRegister { ProcInfo proc = 1; repeated bytes topics = 2; } -message DataProcHeartbeat +message MsgHeartbeat { ProcInfo proc = 1; } -message DataProcQueryTopic { +message MsgQueryTopic { bytes topic = 1; } -message DataProcQueryTopicReply { - BHAddress address = 1; +message MsgQueryTopicReply { + ErrorMsg errmsg = 1; + BHAddress address = 2; } -service TopicRequestReplyService { - rpc Request (DataRequest) returns (DataReply); -} \ No newline at end of file +service TopicRPC { + rpc Query (MsgQueryTopic) returns (MsgQueryTopicReply); + rpc Request (MsgRequestTopic) returns (MsgQueryTopicReply); +} diff --git a/proto/source/error_msg.proto b/proto/source/error_msg.proto new file mode 100644 index 0000000..f283108 --- /dev/null +++ b/proto/source/error_msg.proto @@ -0,0 +1,16 @@ +syntax = "proto3"; + +option optimize_for = LITE_RUNTIME; + +package bhome.msg; + +enum ErrorCode { + eSuccess = 0; + eError = 1; + eInvalidInput = 2; +} + +message ErrorMsg { + ErrorCode errCode = 1; + bytes errString = 2; +} diff --git a/src/msg.cpp b/src/msg.cpp index c1dfff9..8752066 100644 --- a/src/msg.cpp +++ b/src/msg.cpp @@ -20,7 +20,10 @@ namespace bhome_msg { - +/*TODO change msg format, header has proc info; +reply has errer msg + center accept request and route.; +//*/ const uint32_t kMsgTag = 0xf1e2d3c4; const uint32_t kMsgPrefixLen = 4; @@ -43,9 +46,9 @@ BHMsg MakeRequest(const MQId &src_id, const std::string &topic, const void *data, const size_t size) { - BHMsg msg(InitMsg(kMsgTypeRequest)); + BHMsg msg(InitMsg(kMsgTypeRequestTopic)); AddRoute(msg, src_id); - DataRequest req; + MsgRequestTopic req; req.set_topic(topic); req.set_data(data, size); msg.set_body(req.SerializeAsString()); @@ -54,9 +57,9 @@ BHMsg MakeRegister(const MQId &src_id, ProcInfo info, const std::vector<std::string> &topics) { - BHMsg msg(InitMsg(kMsgTypeProcRegisterTopics)); + BHMsg msg(InitMsg(kMsgTypeRegister)); AddRoute(msg, src_id); - DataProcRegister reg; + MsgRegister reg; reg.mutable_proc()->Swap(&info); for (auto &t : topics) { reg.add_topics(t); @@ -67,9 +70,9 @@ BHMsg MakeHeartbeat(const MQId &src_id, ProcInfo info) { - BHMsg msg(InitMsg(kMsgTypeProcHeartbeat)); + BHMsg msg(InitMsg(kMsgTypeHeartbeat)); AddRoute(msg, src_id); - DataProcRegister reg; + MsgHeartbeat reg; reg.mutable_proc()->Swap(&info); msg.set_body(reg.SerializeAsString()); return msg; @@ -78,8 +81,8 @@ BHMsg MakeReply(const std::string &src_msgid, const void *data, const size_t size) { assert(data && size); - BHMsg msg(InitMsg(kMsgTypeReply, src_msgid)); - DataReply reply; + BHMsg msg(InitMsg(kMsgTypeRequestTopicReply, src_msgid)); + MsgRequestTopicReply reply; reply.set_data(data, size); msg.set_body(reply.SerializeAsString()); return msg; @@ -90,7 +93,7 @@ assert(sub_unsub == kMsgTypeSubscribe || sub_unsub == kMsgTypeUnsubscribe); BHMsg msg(InitMsg(sub_unsub)); AddRoute(msg, client); - DataSub subs; + MsgSub subs; for (auto &t : topics) { subs.add_topics(t); } @@ -105,7 +108,7 @@ { assert(data && size); BHMsg msg(InitMsg(kMsgTypePublish)); - DataPub pub; + MsgPub pub; pub.set_topic(topic); pub.set_data(data, size); msg.set_body(pub.SerializeAsString()); @@ -114,17 +117,17 @@ BHMsg MakeQueryTopic(const MQId &client, const std::string &topic) { - BHMsg msg(InitMsg(kMsgTypeProcQueryTopic)); + BHMsg msg(InitMsg(kMsgTypeQueryTopic)); AddRoute(msg, client); - DataProcQueryTopic query; + MsgQueryTopic query; query.set_topic(topic); msg.set_body(query.SerializeAsString()); return msg; } BHMsg MakeQueryTopicReply(const std::string &mqid, const std::string &msgid) { - BHMsg msg(InitMsg(kMsgTypeProcQueryTopicReply, msgid)); - DataProcQueryTopicReply reply; + BHMsg msg(InitMsg(kMsgTypeQueryTopicReply, msgid)); + MsgQueryTopicReply reply; reply.mutable_address()->set_mq_id(mqid); msg.set_body(reply.SerializeAsString()); return msg; diff --git a/src/pubsub.cpp b/src/pubsub.cpp index 8d26e0b..90688ec 100644 --- a/src/pubsub.cpp +++ b/src/pubsub.cpp @@ -49,7 +49,7 @@ { auto AsyncRecvProc = [this, tdcb](BHMsg &msg) { if (msg.type() == kMsgTypePublish) { - DataPub d; + MsgPub d; if (d.ParseFromString(msg.body())) { tdcb(d.topic(), d.data()); } @@ -65,7 +65,7 @@ { BHMsg msg; if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypePublish) { - DataPub d; + MsgPub d; if (d.ParseFromString(msg.body())) { d.mutable_topic()->swap(topic); d.mutable_data()->swap(data); diff --git a/src/pubsub_center.cpp b/src/pubsub_center.cpp index b3af47d..698327e 100644 --- a/src/pubsub_center.cpp +++ b/src/pubsub_center.cpp @@ -94,7 +94,7 @@ auto &shm = socket.shm(); auto OnSubChange = [&](auto &&update) { - DataSub sub; + MsgSub sub; if (!msg.route().empty() && sub.ParseFromString(msg.body()) && !sub.topics().empty()) { assert(sizeof(MQId) == msg.route(0).mq_id().size()); MQId client; @@ -106,7 +106,7 @@ auto Unsub = [&](const MQId &id, auto &topics) { bus->UnsubScribe(id, topics.begin(), topics.end()); }; auto OnPublish = [&]() { - DataPub pub; + MsgPub pub; if (!pub.ParseFromString(msg.body())) { return; } diff --git a/src/reqrep_center.cpp b/src/reqrep_center.cpp index e52b0fd..ce35d1c 100644 --- a/src/reqrep_center.cpp +++ b/src/reqrep_center.cpp @@ -100,12 +100,6 @@ std::unordered_map<ProcId, Node> nodes_; }; -Synced<NodeCenter> &Center() -{ - static Synced<NodeCenter> s; - return s; -} - } // namespace BHCenter::MsgHandler MakeReqRepCenter() @@ -120,7 +114,7 @@ time_t now = 0; time(&now); if (last.exchange(now) < now) { - printf("bus queue size: %ld\n", socket.Pending()); + printf("center queue size: %ld\n", socket.Pending()); } #endif auto SrcMQ = [&]() { return msg.route(0).mq_id(); }; @@ -128,7 +122,7 @@ auto OnRegister = [&]() { if (msg.route_size() != 1) { return; } - DataProcRegister reg; + MsgRegister reg; if (reg.ParseFromString(msg.body()) && reg.has_proc()) { center->Register(*reg.mutable_proc(), SrcMQ(), reg.topics().begin(), reg.topics().end()); } @@ -138,7 +132,7 @@ if (msg.route_size() != 1) { return; } auto &src_mq = msg.route(0).mq_id(); - DataProcHeartbeat hb; + MsgHeartbeat hb; if (hb.ParseFromString(msg.body()) && hb.has_proc()) { center->Heartbeat(*hb.mutable_proc(), SrcMQ()); } @@ -147,7 +141,7 @@ auto OnQueryTopic = [&]() { if (msg.route_size() != 1) { return; } - DataProcQueryTopic query; + MsgQueryTopic query; NodeCenter::ProcAddr dest; if (query.ParseFromString(msg.body()) && center->QueryTopic(query.topic(), dest)) { MQId remote; @@ -161,9 +155,9 @@ }; switch (msg.type()) { - case kMsgTypeProcRegisterTopics: OnRegister(); return true; - case kMsgTypeProcHeartbeat: OnHeartbeat(); return true; - case kMsgTypeProcQueryTopic: OnQueryTopic(); return true; + case kMsgTypeRegister: OnRegister(); return true; + case kMsgTypeHeartbeat: OnHeartbeat(); return true; + case kMsgTypeQueryTopic: OnQueryTopic(); return true; default: return false; } }; @@ -176,4 +170,4 @@ const int kMaxWorker = 16; return socket_.Start(handler, std::min((nworker > 0 ? nworker : 2), kMaxWorker)); -} \ No newline at end of file +} diff --git a/src/shm_queue.cpp b/src/shm_queue.cpp index de38229..dcb5a9e 100644 --- a/src/shm_queue.cpp +++ b/src/shm_queue.cpp @@ -76,24 +76,16 @@ Queue *remote = Find(shm, MsgQIdToName(remote_id)); return remote && remote->Write(msg, timeout_ms, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); }); } +bool ShmMsgQueue::Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms) +{ + Queue *remote = Find(shm, MsgQIdToName(remote_id)); + return remote && remote->Write(msg, timeout_ms, [](const MsgI &msg) { msg.AddRef(); }); +} // Test shows that in the 2 cases: // 1) build msg first, then find remote queue; // 2) find remote queue first, then build msg; // 1 is about 50% faster than 2, maybe cache related. - -bool ShmMsgQueue::Send(const MQId &remote_id, const BHMsg &data, const int timeout_ms, const std::function<void()> &onsend) -{ - MsgI msg; - if (msg.Make(shm(), data)) { - if (Send(remote_id, msg, timeout_ms, onsend)) { - return true; - } else { - msg.Release(shm()); - } - } - return false; -} bool ShmMsgQueue::Recv(BHMsg &msg, const int timeout_ms) { diff --git a/src/shm_queue.h b/src/shm_queue.h index e9b3a1a..ab8a88c 100644 --- a/src/shm_queue.h +++ b/src/shm_queue.h @@ -134,22 +134,26 @@ bool Recv(BHMsg &msg, const int timeout_ms); bool Recv(MsgI &msg, const int timeout_ms) { return Read(msg, timeout_ms); } static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend); - static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms) + static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms); + + template <class... Extra> + bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms, Extra const &...extra) { - return Send(shm, remote_id, msg, timeout_ms, []() {}); + return Send(shm(), remote_id, msg, timeout_ms, extra...); } - bool Send(const MQId &remote_id, const BHMsg &msg, const int timeout_ms, OnSend const &onsend); - bool Send(const MQId &remote_id, const BHMsg &msg, const int timeout_ms) + + template <class... Extra> + bool Send(const MQId &remote_id, const BHMsg &data, const int timeout_ms, Extra const &...extra) { - return Send(remote_id, msg, timeout_ms, []() {}); - } - bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend) - { - return Send(shm(), remote_id, msg, timeout_ms, onsend); - } - bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms) - { - return Send(shm(), remote_id, msg, timeout_ms); + MsgI msg; + if (msg.Make(shm(), data)) { + if (Send(shm(), remote_id, msg, timeout_ms, extra...)) { + return true; + } else { + msg.Release(shm()); + } + } + return false; } size_t Pending() const { return data()->size(); } }; diff --git a/src/socket.cpp b/src/socket.cpp index 73681f1..b9def0c 100644 --- a/src/socket.cpp +++ b/src/socket.cpp @@ -49,15 +49,15 @@ Stop(); //TODO should stop in sub class, incase thread access sub class data. } -bool ShmSocket::Start(const RecvCB &onData, int nworker) +bool ShmSocket::Start(const RecvCB &onData, const IdleCB &onIdle, int nworker) { - if (!mq_) { - return false; + if (!mq_ || !onData) { + return false; // TODO error code. } std::lock_guard<std::mutex> lock(mutex_); StopNoLock(); - auto RecvProc = [this, onData]() { + auto RecvProc = [this, onData, onIdle]() { while (run_) { try { MsgI imsg; @@ -67,6 +67,8 @@ if (imsg.Unpack(msg)) { onData(*this, imsg, msg); } + } else if (onIdle) { + onIdle(*this); } } catch (...) { } diff --git a/src/socket.h b/src/socket.h index 1a3d47b..57d0ae4 100644 --- a/src/socket.h +++ b/src/socket.h @@ -37,6 +37,7 @@ typedef bhome_shm::SharedMemory Shm; typedef std::function<void(ShmSocket &sock, bhome_msg::MsgI &imsg, bhome_msg::BHMsg &msg)> RecvCB; typedef std::function<void(bhome_msg::BHMsg &msg)> RecvBHMsgCB; + typedef std::function<void(ShmSocket &sock)> IdleCB; ShmSocket(Shm &shm, const void *id, const int len); ShmSocket(Shm &shm, const int len = 12); @@ -44,22 +45,27 @@ Shm &shm() { return shm_; } // start recv. - bool Start(const RecvCB &onData, int nworker = 1); + bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1); + bool Start(const RecvCB &onData, int nworker = 1) { return Start(onData, IdleCB(), nworker); } + bool Start(const RecvBHMsgCB &onData, const IdleCB &onIdle, int nworker = 1) + { + return Start([onData](ShmSocket &sock, bhome_msg::MsgI &imsg, bhome_msg::BHMsg &msg) { onData(msg); }, onIdle, nworker); + } bool Start(const RecvBHMsgCB &onData, int nworker = 1) { - return Start([onData](ShmSocket &sock, bhome_msg::MsgI &imsg, bhome_msg::BHMsg &msg) { onData(msg); }, nworker); + return Start(onData, IdleCB(), nworker); } bool Stop(); size_t Pending() const { return mq_ ? mq_->Pending() : 0; } + + bool SyncSend(const void *id, const bhome_msg::BHMsg &msg, const int timeout_ms); + bool SyncRecv(bhome_msg::BHMsg &msg, const int timeout_ms); protected: const Shm &shm() const { return shm_; } Queue &mq() { return *mq_; } // programmer should make sure that mq_ is valid. const Queue &mq() const { return *mq_; } std::mutex &mutex() { return mutex_; } - - bool SyncSend(const void *id, const bhome_msg::BHMsg &msg, const int timeout_ms); - bool SyncRecv(bhome_msg::BHMsg &msg, const int timeout_ms); private: bool StopNoLock(); diff --git a/src/topic_reply.cpp b/src/topic_reply.cpp new file mode 100644 index 0000000..356cf3e --- /dev/null +++ b/src/topic_reply.cpp @@ -0,0 +1,142 @@ +/* + * ===================================================================================== + * + * Filename: topic_reply.cpp + * + * Description: + * + * Version: 1.0 + * Created: 2021骞�04鏈�06鏃� 14鏃�40鍒�52绉� + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), + * Organization: + * + * ===================================================================================== + */ +#include "topic_reply.h" +#include <chrono> +#include <list> + +using namespace bhome_msg; +using namespace std::chrono; +using namespace std::chrono_literals; + +namespace +{ +struct SrcInfo { + std::vector<BHAddress> route; + std::string msg_id; +}; + +class FailedQ +{ + struct FailedMsg { + steady_clock::time_point xpr; + std::string remote_; + BHMsg msg_; + FailedMsg(const std::string &addr, BHMsg &&msg) : + xpr(steady_clock::now() + 10s), remote_(addr), msg_(std::move(msg)) {} + bool Expired() { return steady_clock::now() > xpr; } + }; + typedef std::list<FailedMsg> Queue; + Synced<Queue> queue_; + +public: + void Push(const std::string &remote, BHMsg &&msg) + { + queue_->emplace_back(remote, std::move(msg)); + } + void TrySend(ShmSocket &socket, const int timeout_ms = 0) + { + queue_.Apply([&](Queue &q) { + if (!q.empty()) { + auto it = q.begin(); + do { + if (it->Expired() || socket.SyncSend(it->remote_.data(), it->msg_, timeout_ms)) { + it = q.erase(it); + } else { + ++it; + } + } while (it != q.end()); + } + }); + } +}; + +} // namespace + +bool SocketReply::Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms) +{ + //TODO check reply? + return SyncSend(&kBHTopicReqRepCenter, MakeRegister(mq().Id(), proc_info, topics), timeout_ms); +} +bool SocketReply::Heartbeat(const ProcInfo &proc_info, const int timeout_ms) +{ + return SyncSend(&kBHTopicReqRepCenter, MakeHeartbeat(mq().Id(), proc_info), timeout_ms); +} +bool SocketReply::StartWorker(const OnRequest &rcb, int nworker) +{ + auto failed_q = std::make_shared<FailedQ>(); + + auto onIdle = [failed_q](ShmSocket &socket) { failed_q->TrySend(socket); }; + + auto onRecv = [this, rcb, failed_q, onIdle](BHMsg &msg) { + if (msg.type() == kMsgTypeRequestTopic && msg.route_size() > 0) { + MsgRequestTopic req; + if (req.ParseFromString(msg.body())) { + std::string out; + if (rcb(req.topic(), req.data(), out)) { + BHMsg msg_reply(MakeReply(msg.msg_id(), out.data(), out.size())); + for (int i = 0; i < msg.route_size() - 1; ++i) { + msg.add_route()->Swap(msg.mutable_route(i)); + } + if (!SyncSend(msg.route().rbegin()->mq_id().data(), msg_reply, 10)) { + failed_q->Push(msg.route().rbegin()->mq_id(), std::move(msg_reply)); + } + } + } + } else { + // ignored, or dropped + } + + onIdle(*this); + }; + + return rcb && Start(onRecv, onIdle, nworker); +} + +bool SocketReply::RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms) +{ + BHMsg msg; + if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypeRequestTopic) { + MsgRequestTopic request; + if (request.ParseFromString(msg.body())) { + request.mutable_topic()->swap(topic); + request.mutable_data()->swap(data); + SrcInfo *p = new SrcInfo; + p->route.assign(msg.route().begin(), msg.route().end()); + p->msg_id = msg.msg_id(); + src_info = p; + return true; + } + } + return false; +} + +bool SocketReply::SendReply(void *src_info, const std::string &data, const int timeout_ms) +{ + SrcInfo *p = static_cast<SrcInfo *>(src_info); + DEFER1(delete p); + if (!p || p->route.empty()) { + return false; + } + + BHMsg msg(MakeReply(p->msg_id, data.data(), data.size())); + for (unsigned i = 0; i < p->route.size() - 1; ++i) { + msg.add_route()->Swap(&p->route[i]); + } + + return SyncSend(p->route.back().mq_id().data(), msg, timeout_ms); +} \ No newline at end of file diff --git a/src/topic_reply.h b/src/topic_reply.h new file mode 100644 index 0000000..090ad88 --- /dev/null +++ b/src/topic_reply.h @@ -0,0 +1,52 @@ +/* + * ===================================================================================== + * + * Filename: topic_reply.h + * + * Description: + * + * Version: 1.0 + * Created: 2021骞�04鏈�06鏃� 14鏃�41鍒�12绉� + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), + * Organization: + * + * ===================================================================================== + */ +#ifndef TOPIC_REPLY_3RVYPPWI +#define TOPIC_REPLY_3RVYPPWI + +#include "bh_util.h" +#include "defs.h" +#include "msg.h" +#include "socket.h" +#include <deque> +#include <functional> + +using bhome::msg::ProcInfo; + +class SocketReply : private ShmSocket +{ + typedef ShmSocket Socket; + +public: + SocketReply(Socket::Shm &shm) : + Socket(shm, 64) {} + SocketReply() : + SocketReply(BHomeShm()) {} + ~SocketReply() { Stop(); } + + typedef std::function<bool(const std::string &topic, const std::string &data, std::string &reply)> OnRequest; + bool StartWorker(const OnRequest &rcb, int nworker = 2); + bool Stop() { return Socket::Stop(); } + bool RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms); + bool SendReply(void *src_info, const std::string &data, const int timeout_ms); + bool Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms); + bool Heartbeat(const ProcInfo &proc_info, const int timeout_ms); + +private: +}; + +#endif // end of include guard: TOPIC_REPLY_3RVYPPWI diff --git a/src/reqrep.cpp b/src/topic_request.cpp similarity index 65% rename from src/reqrep.cpp rename to src/topic_request.cpp index 25c0826..ce7c1a8 100644 --- a/src/reqrep.cpp +++ b/src/topic_request.cpp @@ -1,9 +1,9 @@ /* * ===================================================================================== * - * Filename: reqrep.cpp + * Filename: topic_request.cpp * - * Description: topic request/reply sockets + * Description: topic request sockets * * Version: 1.0 * Created: 2021骞�04鏈�01鏃� 09鏃�35鍒�35绉� @@ -15,7 +15,7 @@ * * ===================================================================================== */ -#include "reqrep.h" +#include "topic_request.h" #include "bh_util.h" #include "msg.h" #include <chrono> @@ -40,10 +40,10 @@ }; RecvBHMsgCB cb; - if (Find(cb)) { + if (Find(cb) && cb) { cb(msg); - } else if (msg.type() == kMsgTypeReply) { - DataReply reply; + } else if (msg.type() == kMsgTypeRequestTopicReply && rrcb) { + MsgRequestTopicReply reply; if (reply.ParseFromString(msg.body())) { rrcb(reply.data()); } @@ -74,8 +74,8 @@ auto Call = [&](const void *remote) { const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size)); auto onRecv = [cb](BHMsg &msg) { - if (msg.type() == kMsgTypeReply) { - DataReply reply; + if (msg.type() == kMsgTypeRequestTopicReply) { + MsgRequestTopicReply reply; if (reply.ParseFromString(msg.body())) { cb(reply.data()); } @@ -103,16 +103,22 @@ if (QueryRPCTopic(topic, addr, timeout_ms)) { const BHMsg &req(MakeRequest(mq().Id(), topic, data, size)); BHMsg reply; - if (SyncSendAndRecv(addr.mq_id().data(), &req, &reply, timeout_ms) && reply.type() == kMsgTypeReply) { - DataReply dr; + if (SyncSendAndRecv(addr.mq_id().data(), &req, &reply, timeout_ms) && reply.type() == kMsgTypeRequestTopicReply) { + MsgRequestTopicReply dr; if (dr.ParseFromString(reply.body())) { dr.mutable_data()->swap(out); return true; + } else { + printf("error parse reply.\n"); } + } else { + printf("error recv data. line: %d\n", __LINE__); } } else { + printf("error recv data. line: %d\n", __LINE__); } } catch (...) { + printf("error recv data. line: %d\n", __LINE__); } return false; } @@ -186,8 +192,8 @@ BHMsg result; const BHMsg &msg = MakeQueryTopic(mq().Id(), topic); if (SyncSendAndRecv(&kBHTopicReqRepCenter, &msg, &result, timeout_ms)) { - if (result.type() == kMsgTypeProcQueryTopicReply) { - DataProcQueryTopicReply reply; + if (result.type() == kMsgTypeQueryTopicReply) { + MsgQueryTopicReply reply; if (reply.ParseFromString(result.body())) { addr = reply.address(); if (addr.mq_id().empty()) { @@ -202,79 +208,3 @@ } return false; } - -// reply socket -namespace -{ -struct SrcInfo { - std::vector<BHAddress> route; - std::string msg_id; -}; - -} // namespace - -bool SocketReply::Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms) -{ - //TODO check reply? - return SyncSend(&kBHTopicReqRepCenter, MakeRegister(mq().Id(), proc_info, topics), timeout_ms); -} -bool SocketReply::Heartbeat(const ProcInfo &proc_info, const int timeout_ms) -{ - return SyncSend(&kBHTopicReqRepCenter, MakeHeartbeat(mq().Id(), proc_info), timeout_ms); -} -bool SocketReply::StartWorker(const OnRequest &rcb, int nworker) -{ - auto onRecv = [this, rcb](BHMsg &msg) { - if (msg.type() == kMsgTypeRequest && msg.route_size() > 0) { - DataRequest req; - if (req.ParseFromString(msg.body())) { - std::string out; - if (rcb(req.topic(), req.data(), out)) { - BHMsg msg_reply(MakeReply(msg.msg_id(), out.data(), out.size())); - for (int i = 0; i < msg.route_size() - 1; ++i) { - msg.add_route()->Swap(msg.mutable_route(i)); - } - SyncSend(msg.route().rbegin()->mq_id().data(), msg_reply, 100); - } - } - } else { - // ignored, or dropped - } - }; - - return rcb && Start(onRecv, nworker); -} - -bool SocketReply::RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms) -{ - BHMsg msg; - if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypeRequest) { - DataRequest request; - if (request.ParseFromString(msg.body())) { - request.mutable_topic()->swap(topic); - request.mutable_data()->swap(data); - SrcInfo *p = new SrcInfo; - p->route.assign(msg.route().begin(), msg.route().end()); - p->msg_id = msg.msg_id(); - src_info = p; - return true; - } - } - return false; -} - -bool SocketReply::SendReply(void *src_info, const std::string &data, const int timeout_ms) -{ - SrcInfo *p = static_cast<SrcInfo *>(src_info); - DEFER1(delete p); - if (!p || p->route.empty()) { - return false; - } - - BHMsg msg(MakeReply(p->msg_id, data.data(), data.size())); - for (unsigned i = 0; i < p->route.size() - 1; ++i) { - msg.add_route()->Swap(&p->route[i]); - } - - return SyncSend(p->route.back().mq_id().data(), msg, timeout_ms); -} \ No newline at end of file diff --git a/src/reqrep.h b/src/topic_request.h similarity index 76% rename from src/reqrep.h rename to src/topic_request.h index 8a4743c..6765dc2 100644 --- a/src/reqrep.h +++ b/src/topic_request.h @@ -1,9 +1,9 @@ /* * ===================================================================================== * - * Filename: reqrep.h + * Filename: topic_request.h * - * Description: topic request/reply sockets + * Description: topic request socket * * Version: 1.0 * Created: 2021骞�04鏈�01鏃� 09鏃�36鍒�06绉� @@ -15,8 +15,8 @@ * * ===================================================================================== */ -#ifndef REQREP_ACEH09NK -#define REQREP_ACEH09NK +#ifndef TOPIC_REQUEST_ACEH09NK +#define TOPIC_REQUEST_ACEH09NK #include "bh_util.h" #include "defs.h" @@ -105,26 +105,4 @@ TopicCache topic_cache_; }; -class SocketReply : private ShmSocket -{ - typedef ShmSocket Socket; - -public: - SocketReply(Socket::Shm &shm) : - Socket(shm, 64) {} - SocketReply() : - SocketReply(BHomeShm()) {} - ~SocketReply() { Stop(); } - - typedef std::function<bool(const std::string &topic, const std::string &data, std::string &reply)> OnRequest; - bool StartWorker(const OnRequest &rcb, int nworker = 2); - bool Stop() { return Socket::Stop(); } - bool RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms); - bool SendReply(void *src_info, const std::string &data, const int timeout_ms); - bool Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms); - bool Heartbeat(const ProcInfo &proc_info, const int timeout_ms); - -private: -}; - -#endif // end of include guard: REQREP_ACEH09NK +#endif // end of include guard: TOPIC_REQUEST_ACEH09NK diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp index dc64cc0..34f80d8 100644 --- a/utest/speed_test.cpp +++ b/utest/speed_test.cpp @@ -160,7 +160,7 @@ auto Server = [&]() { BHMsg req; while (!stop) { - if (srv.Recv(req, 100) && req.type() == kMsgTypeRequest) { + if (srv.Recv(req, 100) && req.type() == kMsgTypeRequestTopic) { auto &mqid = req.route()[0].mq_id(); MQId src_id; memcpy(&src_id, mqid.data(), sizeof(src_id)); diff --git a/utest/utest.cpp b/utest/utest.cpp index 55a08a3..8f2a7f5 100644 --- a/utest/utest.cpp +++ b/utest/utest.cpp @@ -1,9 +1,10 @@ #include "defs.h" #include "pubsub.h" #include "pubsub_center.h" -#include "reqrep.h" #include "reqrep_center.h" #include "socket.h" +#include "topic_reply.h" +#include "topic_request.h" #include "util.h" #include <atomic> #include <boost/uuid/uuid_generators.hpp> @@ -189,24 +190,26 @@ printf("count: %d\n", count.load()); } }; - client.StartWorker(onRecv, 1); + client.StartWorker(onRecv, 2); boost::timer::auto_cpu_timer timer; for (int i = 0; i < nreq; ++i) { if (!client.AsyncRequest(topic, "data " + std::to_string(i), 1000)) { printf("client request failed\n"); } + // if (!client.SyncRequest(topic, "data " + std::to_string(i), reply, 1000)) { // printf("client request failed\n"); // } else { // ++count; // } } - printf("request %s %d done ", topic.c_str(), nreq); - while (count.load() < nreq) { + do { std::this_thread::yield(); - } + } while (count.load() < nreq); client.Stop(); + printf("request %s %d done ", topic.c_str(), count.load()); }; + std::atomic_uint64_t server_msg_count(0); auto Server = [&](const std::string &name, const std::vector<std::string> &topics) { SocketReply server(shm); ProcInfo info; @@ -215,7 +218,8 @@ if (!server.Register(info, topics, 100)) { printf("register failed\n"); } - auto onData = [](const std::string &topic, const std::string &data, std::string &reply) { + auto onData = [&](const std::string &topic, const std::string &data, std::string &reply) { + ++server_msg_count; reply = topic + ':' + data; return true; }; @@ -229,9 +233,10 @@ servers.Launch(Server, "server", topics); std::this_thread::sleep_for(100ms); for (auto &t : topics) { - clients.Launch(Client, t, 1000 * 1000); + clients.Launch(Client, t, 1000 * 100); } clients.WaitAll(); + printf("clients done, server replyed: %d\n", server_msg_count.load()); run = false; servers.WaitAll(); } -- Gitblit v1.8.0