server resend failed; rename msgs; refactor.
3个文件已添加
2 文件已重命名
14个文件已修改
| | |
| | | "type": "cppdbg", |
| | | "request": "launch", |
| | | "program": "${workspaceFolder}/utest/utest", |
| | | "args": [], |
| | | "args": [ |
| | | "-t", |
| | | "ReqRepTest" |
| | | ], |
| | | "stopAtEntry": false, |
| | | "cwd": "${workspaceFolder}", |
| | | "environment": [], |
| | |
| | | "typeindex": "cpp", |
| | | "typeinfo": "cpp", |
| | | "variant": "cpp" |
| | | } |
| | | }, |
| | | "files.exclude": { |
| | | "**/*.un~": true |
| | | }, |
| | | "cmake.configureOnOpen": false |
| | | } |
| | |
| | | { |
| | | "type": "cppbuild", |
| | | "label": "C/C++: g++ build active file", |
| | | "command": "/usr/bin/g++", |
| | | "command": "ninja", |
| | | "args": [ |
| | | "-g", |
| | | "${file}", |
| | | "-o", |
| | | "${fileDirname}/${fileBasenameNoExtension}" |
| | | "-C", |
| | | "../build" |
| | | ], |
| | | "options": { |
| | | "cwd": "${workspaceFolder}" |
| | | }, |
| | | "problemMatcher": [ |
| | | "$gcc" |
| | | ], |
| | | "group": { |
| | | "kind": "build", |
| | | "isDefault": true |
| | | }, |
| | | "detail": "Task generated by Debugger." |
| | | }, |
| | | { |
| | | "type": "cppbuild", |
| | | "label": "C/C++: g++ build active file", |
| | | "command": "make", |
| | | "args": ["build"], |
| | | "options": { |
| | | "cwd": "${workspaceFolder}" |
| | | "cwd": "${workspaceFolder}/utest" |
| | | }, |
| | | "problemMatcher": [ |
| | | "$gcc" |
| | |
| | | "group": "build", |
| | | "detail": "compiler: /usr/bin/g++" |
| | | } |
| | | |
| | | ], |
| | | "version": "2.0.0" |
| | | } |
| | |
| | | |
| | | option optimize_for = LITE_RUNTIME; |
| | | |
| | | import "google/protobuf/descriptor.proto"; |
| | | import "error_msg.proto"; |
| | | |
| | | package bhome.msg; |
| | | |
| | | |
| | | // message format : header(BHMsgHead) + body(variable types) |
| | | message BHAddress { |
| | |
| | | |
| | | message ProcInfo |
| | | { |
| | | bytes id = 1; |
| | | bytes id = 1; // serial number, maybe managed |
| | | bytes name = 2; |
| | | bytes public_info = 3; |
| | | bytes private_info = 4; |
| | |
| | | bytes topic = 6; // for request route |
| | | } |
| | | |
| | | message BHMsgBody { |
| | | bytes data = 1; |
| | | } |
| | | |
| | | message BHMsg { // deprecated |
| | | bytes msg_id = 1; |
| | | int64 timestamp = 2; |
| | |
| | | |
| | | enum MsgType { |
| | | kMsgTypeInvalid = 0; |
| | | kMsgTypeRequest = 1; |
| | | kMsgTypeReply = 2; |
| | | kMsgTypePublish = 3; |
| | | kMsgTypeSubscribe = 4; |
| | | kMsgTypeUnsubscribe = 5; |
| | | |
| | | kMsgTypeProcQueryTopic = 6; |
| | | kMsgTypeProcQueryTopicReply = 7; |
| | | kMsgTypeProcRegisterTopics = 8; |
| | | kMsgTypeProcHeartbeat = 9; |
| | | kMsgTypeCommonReply = 2; |
| | | |
| | | kMsgTypeRegister= 10; |
| | | // kMsgTypeRegisterReply= 11; |
| | | kMsgTypeHeartbeat = 12; |
| | | // kMsgTypeHeartbeatReply = 13; |
| | | kMsgTypeQueryTopic = 14; |
| | | kMsgTypeQueryTopicReply = 15; |
| | | kMsgTypeRequestTopic = 16; |
| | | kMsgTypeRequestTopicReply = 17; |
| | | |
| | | kMsgTypePublish = 100; |
| | | // kMsgTypePublishReply = 101; |
| | | kMsgTypeSubscribe = 102; |
| | | // kMsgTypeSubscribeReply = 103; |
| | | kMsgTypeUnsubscribe = 104; |
| | | // kMsgTypeUnsubscribeReply = 105; |
| | | |
| | | } |
| | | |
| | | message DataPub { |
| | | message MsgPub { |
| | | bytes topic = 1; |
| | | bytes data = 2; |
| | | } |
| | | |
| | | message DataSub { |
| | | message MsgSub { |
| | | repeated bytes topics = 1; |
| | | } |
| | | |
| | | message DataRequest { |
| | | message MsgCommonReply { |
| | | ErrorMsg errmsg = 1; |
| | | } |
| | | |
| | | message MsgRequestTopic { |
| | | bytes topic = 1; |
| | | bytes data = 2; |
| | | } |
| | | |
| | | message DataReply { |
| | | bytes data = 1; |
| | | message MsgRequestTopicReply { |
| | | ErrorMsg errmsg = 1; |
| | | bytes data = 2; |
| | | } |
| | | |
| | | message DataProcRegister |
| | | message MsgRegister |
| | | { |
| | | ProcInfo proc = 1; |
| | | repeated bytes topics = 2; |
| | | } |
| | | |
| | | message DataProcHeartbeat |
| | | message MsgHeartbeat |
| | | { |
| | | ProcInfo proc = 1; |
| | | } |
| | | |
| | | message DataProcQueryTopic { |
| | | message MsgQueryTopic { |
| | | bytes topic = 1; |
| | | } |
| | | |
| | | message DataProcQueryTopicReply { |
| | | BHAddress address = 1; |
| | | message MsgQueryTopicReply { |
| | | ErrorMsg errmsg = 1; |
| | | BHAddress address = 2; |
| | | } |
| | | |
| | | service TopicRequestReplyService { |
| | | rpc Request (DataRequest) returns (DataReply); |
| | | } |
| | | service TopicRPC { |
| | | rpc Query (MsgQueryTopic) returns (MsgQueryTopicReply); |
| | | rpc Request (MsgRequestTopic) returns (MsgQueryTopicReply); |
| | | } |
New file |
| | |
| | | syntax = "proto3"; |
| | | |
| | | option optimize_for = LITE_RUNTIME; |
| | | |
| | | package bhome.msg; |
| | | |
| | | enum ErrorCode { // result codes; used as the type of ErrorMsg.errCode
| | | eSuccess = 0; // zero value, which proto3 uses as the default when the field is unset
| | | eError = 1; // presumably a generic failure - confirm with callers
| | | eInvalidInput = 2; // presumably malformed or rejected input - confirm with callers
| | | }
| | | |
| | | message ErrorMsg { // error report: a code plus a detail payload
| | | ErrorCode errCode = 1; // one of the ErrorCode enum values
| | | bytes errString = 2; // presumably human-readable detail; declared as bytes, so encoding is not enforced - confirm
| | | }
| | |
| | | |
| | | namespace bhome_msg |
| | | { |
| | | |
| | | /*TODO change msg format, header has proc info; |
| | | reply has error msg
| | | center accept request and route.; |
| | | //*/ |
| | | const uint32_t kMsgTag = 0xf1e2d3c4; |
| | | const uint32_t kMsgPrefixLen = 4; |
| | | |
| | |
| | | |
| | | BHMsg MakeRequest(const MQId &src_id, const std::string &topic, const void *data, const size_t size) |
| | | { |
| | | BHMsg msg(InitMsg(kMsgTypeRequest)); |
| | | BHMsg msg(InitMsg(kMsgTypeRequestTopic)); |
| | | AddRoute(msg, src_id); |
| | | DataRequest req; |
| | | MsgRequestTopic req; |
| | | req.set_topic(topic); |
| | | req.set_data(data, size); |
| | | msg.set_body(req.SerializeAsString()); |
| | |
| | | |
| | | BHMsg MakeRegister(const MQId &src_id, ProcInfo info, const std::vector<std::string> &topics) |
| | | { |
| | | BHMsg msg(InitMsg(kMsgTypeProcRegisterTopics)); |
| | | BHMsg msg(InitMsg(kMsgTypeRegister)); |
| | | AddRoute(msg, src_id); |
| | | DataProcRegister reg; |
| | | MsgRegister reg; |
| | | reg.mutable_proc()->Swap(&info); |
| | | for (auto &t : topics) { |
| | | reg.add_topics(t); |
| | |
| | | |
| | | BHMsg MakeHeartbeat(const MQId &src_id, ProcInfo info) |
| | | { |
| | | BHMsg msg(InitMsg(kMsgTypeProcHeartbeat)); |
| | | BHMsg msg(InitMsg(kMsgTypeHeartbeat)); |
| | | AddRoute(msg, src_id); |
| | | DataProcRegister reg; |
| | | MsgHeartbeat reg; |
| | | reg.mutable_proc()->Swap(&info); |
| | | msg.set_body(reg.SerializeAsString()); |
| | | return msg; |
| | |
| | | BHMsg MakeReply(const std::string &src_msgid, const void *data, const size_t size) |
| | | { |
| | | assert(data && size); |
| | | BHMsg msg(InitMsg(kMsgTypeReply, src_msgid)); |
| | | DataReply reply; |
| | | BHMsg msg(InitMsg(kMsgTypeRequestTopicReply, src_msgid)); |
| | | MsgRequestTopicReply reply; |
| | | reply.set_data(data, size); |
| | | msg.set_body(reply.SerializeAsString()); |
| | | return msg; |
| | |
| | | assert(sub_unsub == kMsgTypeSubscribe || sub_unsub == kMsgTypeUnsubscribe); |
| | | BHMsg msg(InitMsg(sub_unsub)); |
| | | AddRoute(msg, client); |
| | | DataSub subs; |
| | | MsgSub subs; |
| | | for (auto &t : topics) { |
| | | subs.add_topics(t); |
| | | } |
| | |
| | | { |
| | | assert(data && size); |
| | | BHMsg msg(InitMsg(kMsgTypePublish)); |
| | | DataPub pub; |
| | | MsgPub pub; |
| | | pub.set_topic(topic); |
| | | pub.set_data(data, size); |
| | | msg.set_body(pub.SerializeAsString()); |
| | |
| | | |
| | | BHMsg MakeQueryTopic(const MQId &client, const std::string &topic) |
| | | { |
| | | BHMsg msg(InitMsg(kMsgTypeProcQueryTopic)); |
| | | BHMsg msg(InitMsg(kMsgTypeQueryTopic)); |
| | | AddRoute(msg, client); |
| | | DataProcQueryTopic query; |
| | | MsgQueryTopic query; |
| | | query.set_topic(topic); |
| | | msg.set_body(query.SerializeAsString()); |
| | | return msg; |
| | | } |
| | | BHMsg MakeQueryTopicReply(const std::string &mqid, const std::string &msgid) |
| | | { |
| | | BHMsg msg(InitMsg(kMsgTypeProcQueryTopicReply, msgid)); |
| | | DataProcQueryTopicReply reply; |
| | | BHMsg msg(InitMsg(kMsgTypeQueryTopicReply, msgid)); |
| | | MsgQueryTopicReply reply; |
| | | reply.mutable_address()->set_mq_id(mqid); |
| | | msg.set_body(reply.SerializeAsString()); |
| | | return msg; |
| | |
| | | { |
| | | auto AsyncRecvProc = [this, tdcb](BHMsg &msg) { |
| | | if (msg.type() == kMsgTypePublish) { |
| | | DataPub d; |
| | | MsgPub d; |
| | | if (d.ParseFromString(msg.body())) { |
| | | tdcb(d.topic(), d.data()); |
| | | } |
| | |
| | | { |
| | | BHMsg msg; |
| | | if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypePublish) { |
| | | DataPub d; |
| | | MsgPub d; |
| | | if (d.ParseFromString(msg.body())) { |
| | | d.mutable_topic()->swap(topic); |
| | | d.mutable_data()->swap(data); |
| | |
| | | auto &shm = socket.shm(); |
| | | |
| | | auto OnSubChange = [&](auto &&update) { |
| | | DataSub sub; |
| | | MsgSub sub; |
| | | if (!msg.route().empty() && sub.ParseFromString(msg.body()) && !sub.topics().empty()) { |
| | | assert(sizeof(MQId) == msg.route(0).mq_id().size()); |
| | | MQId client; |
| | |
| | | auto Unsub = [&](const MQId &id, auto &topics) { bus->UnsubScribe(id, topics.begin(), topics.end()); }; |
| | | |
| | | auto OnPublish = [&]() { |
| | | DataPub pub; |
| | | MsgPub pub; |
| | | if (!pub.ParseFromString(msg.body())) { |
| | | return; |
| | | } |
| | |
| | | std::unordered_map<ProcId, Node> nodes_; |
| | | }; |
| | | |
| | | Synced<NodeCenter> &Center() |
| | | { |
| | | static Synced<NodeCenter> s; |
| | | return s; |
| | | } |
| | | |
| | | } // namespace |
| | | |
| | | BHCenter::MsgHandler MakeReqRepCenter() |
| | |
| | | time_t now = 0; |
| | | time(&now); |
| | | if (last.exchange(now) < now) { |
| | | printf("bus queue size: %ld\n", socket.Pending()); |
| | | printf("center queue size: %ld\n", socket.Pending()); |
| | | } |
| | | #endif |
| | | auto SrcMQ = [&]() { return msg.route(0).mq_id(); }; |
| | |
| | | auto OnRegister = [&]() { |
| | | if (msg.route_size() != 1) { return; } |
| | | |
| | | DataProcRegister reg; |
| | | MsgRegister reg; |
| | | if (reg.ParseFromString(msg.body()) && reg.has_proc()) { |
| | | center->Register(*reg.mutable_proc(), SrcMQ(), reg.topics().begin(), reg.topics().end()); |
| | | } |
| | |
| | | if (msg.route_size() != 1) { return; } |
| | | auto &src_mq = msg.route(0).mq_id(); |
| | | |
| | | DataProcHeartbeat hb; |
| | | MsgHeartbeat hb; |
| | | if (hb.ParseFromString(msg.body()) && hb.has_proc()) { |
| | | center->Heartbeat(*hb.mutable_proc(), SrcMQ()); |
| | | } |
| | |
| | | auto OnQueryTopic = [&]() { |
| | | if (msg.route_size() != 1) { return; } |
| | | |
| | | DataProcQueryTopic query; |
| | | MsgQueryTopic query; |
| | | NodeCenter::ProcAddr dest; |
| | | if (query.ParseFromString(msg.body()) && center->QueryTopic(query.topic(), dest)) { |
| | | MQId remote; |
| | |
| | | }; |
| | | |
| | | switch (msg.type()) { |
| | | case kMsgTypeProcRegisterTopics: OnRegister(); return true; |
| | | case kMsgTypeProcHeartbeat: OnHeartbeat(); return true; |
| | | case kMsgTypeProcQueryTopic: OnQueryTopic(); return true; |
| | | case kMsgTypeRegister: OnRegister(); return true; |
| | | case kMsgTypeHeartbeat: OnHeartbeat(); return true; |
| | | case kMsgTypeQueryTopic: OnQueryTopic(); return true; |
| | | default: return false; |
| | | } |
| | | }; |
| | |
| | | |
| | | const int kMaxWorker = 16; |
| | | return socket_.Start(handler, std::min((nworker > 0 ? nworker : 2), kMaxWorker)); |
| | | } |
| | | } |
| | |
| | | Queue *remote = Find(shm, MsgQIdToName(remote_id)); |
| | | return remote && remote->Write(msg, timeout_ms, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); }); |
| | | } |
| | | bool ShmMsgQueue::Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms) |
| | | { |
| | | Queue *remote = Find(shm, MsgQIdToName(remote_id)); |
| | | return remote && remote->Write(msg, timeout_ms, [](const MsgI &msg) { msg.AddRef(); }); |
| | | } |
| | | |
| | | // Test shows that in the 2 cases: |
| | | // 1) build msg first, then find remote queue; |
| | | // 2) find remote queue first, then build msg; |
| | | // 1 is about 50% faster than 2, maybe cache related. |
| | | |
| | | bool ShmMsgQueue::Send(const MQId &remote_id, const BHMsg &data, const int timeout_ms, const std::function<void()> &onsend) |
| | | { |
| | | MsgI msg; |
| | | if (msg.Make(shm(), data)) { |
| | | if (Send(remote_id, msg, timeout_ms, onsend)) { |
| | | return true; |
| | | } else { |
| | | msg.Release(shm()); |
| | | } |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | bool ShmMsgQueue::Recv(BHMsg &msg, const int timeout_ms) |
| | | { |
| | |
| | | bool Recv(BHMsg &msg, const int timeout_ms); |
| | | bool Recv(MsgI &msg, const int timeout_ms) { return Read(msg, timeout_ms); } |
| | | static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend); |
| | | static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms) |
| | | static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms); |
| | | |
| | | template <class... Extra> |
| | | bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms, Extra const &...extra) |
| | | { |
| | | return Send(shm, remote_id, msg, timeout_ms, []() {}); |
| | | return Send(shm(), remote_id, msg, timeout_ms, extra...); |
| | | } |
| | | bool Send(const MQId &remote_id, const BHMsg &msg, const int timeout_ms, OnSend const &onsend); |
| | | bool Send(const MQId &remote_id, const BHMsg &msg, const int timeout_ms) |
| | | |
| | | template <class... Extra> |
| | | bool Send(const MQId &remote_id, const BHMsg &data, const int timeout_ms, Extra const &...extra) |
| | | { |
| | | return Send(remote_id, msg, timeout_ms, []() {}); |
| | | } |
| | | bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend) |
| | | { |
| | | return Send(shm(), remote_id, msg, timeout_ms, onsend); |
| | | } |
| | | bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms) |
| | | { |
| | | return Send(shm(), remote_id, msg, timeout_ms); |
| | | MsgI msg; |
| | | if (msg.Make(shm(), data)) { |
| | | if (Send(shm(), remote_id, msg, timeout_ms, extra...)) { |
| | | return true; |
| | | } else { |
| | | msg.Release(shm()); |
| | | } |
| | | } |
| | | return false; |
| | | } |
| | | size_t Pending() const { return data()->size(); } |
| | | }; |
| | |
| | | Stop(); //TODO should stop in sub class, incase thread access sub class data. |
| | | } |
| | | |
| | | bool ShmSocket::Start(const RecvCB &onData, int nworker) |
| | | bool ShmSocket::Start(const RecvCB &onData, const IdleCB &onIdle, int nworker) |
| | | { |
| | | if (!mq_) { |
| | | return false; |
| | | if (!mq_ || !onData) { |
| | | return false; // TODO error code. |
| | | } |
| | | |
| | | std::lock_guard<std::mutex> lock(mutex_); |
| | | StopNoLock(); |
| | | auto RecvProc = [this, onData]() { |
| | | auto RecvProc = [this, onData, onIdle]() { |
| | | while (run_) { |
| | | try { |
| | | MsgI imsg; |
| | |
| | | if (imsg.Unpack(msg)) { |
| | | onData(*this, imsg, msg); |
| | | } |
| | | } else if (onIdle) { |
| | | onIdle(*this); |
| | | } |
| | | } catch (...) { |
| | | } |
| | |
| | | typedef bhome_shm::SharedMemory Shm; |
| | | typedef std::function<void(ShmSocket &sock, bhome_msg::MsgI &imsg, bhome_msg::BHMsg &msg)> RecvCB; |
| | | typedef std::function<void(bhome_msg::BHMsg &msg)> RecvBHMsgCB; |
| | | typedef std::function<void(ShmSocket &sock)> IdleCB; |
| | | |
| | | ShmSocket(Shm &shm, const void *id, const int len); |
| | | ShmSocket(Shm &shm, const int len = 12); |
| | |
| | | |
| | | Shm &shm() { return shm_; } |
| | | // start recv. |
| | | bool Start(const RecvCB &onData, int nworker = 1); |
| | | bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1); |
| | | bool Start(const RecvCB &onData, int nworker = 1) { return Start(onData, IdleCB(), nworker); } |
| | | bool Start(const RecvBHMsgCB &onData, const IdleCB &onIdle, int nworker = 1) |
| | | { |
| | | return Start([onData](ShmSocket &sock, bhome_msg::MsgI &imsg, bhome_msg::BHMsg &msg) { onData(msg); }, onIdle, nworker); |
| | | } |
| | | bool Start(const RecvBHMsgCB &onData, int nworker = 1) |
| | | { |
| | | return Start([onData](ShmSocket &sock, bhome_msg::MsgI &imsg, bhome_msg::BHMsg &msg) { onData(msg); }, nworker); |
| | | return Start(onData, IdleCB(), nworker); |
| | | } |
| | | bool Stop(); |
| | | size_t Pending() const { return mq_ ? mq_->Pending() : 0; } |
| | | |
| | | bool SyncSend(const void *id, const bhome_msg::BHMsg &msg, const int timeout_ms); |
| | | bool SyncRecv(bhome_msg::BHMsg &msg, const int timeout_ms); |
| | | |
| | | protected: |
| | | const Shm &shm() const { return shm_; } |
| | | Queue &mq() { return *mq_; } // programmer should make sure that mq_ is valid. |
| | | const Queue &mq() const { return *mq_; } |
| | | std::mutex &mutex() { return mutex_; } |
| | | |
| | | bool SyncSend(const void *id, const bhome_msg::BHMsg &msg, const int timeout_ms); |
| | | bool SyncRecv(bhome_msg::BHMsg &msg, const int timeout_ms); |
| | | |
| | | private: |
| | | bool StopNoLock(); |
New file |
| | |
| | | /* |
| | | * ===================================================================================== |
| | | * |
| | | * Filename: topic_reply.cpp |
| | | * |
| | | * Description: |
| | | * |
| | | * Version: 1.0 |
| | | * Created: 2021年04月06日 14时40分52秒 |
| | | * Revision: none |
| | | * Compiler: gcc |
| | | * |
| | | * Author: Li Chao (), |
| | | * Organization: |
| | | * |
| | | * ===================================================================================== |
| | | */ |
| | | #include "topic_reply.h" |
| | | #include <chrono> |
| | | #include <list> |
| | | |
| | | using namespace bhome_msg; |
| | | using namespace std::chrono; |
| | | using namespace std::chrono_literals; |
| | | |
| | | namespace |
| | | { |
| | | struct SrcInfo { |
| | | std::vector<BHAddress> route; |
| | | std::string msg_id; |
| | | }; |
| | | |
| | | class FailedQ |
| | | { |
| | | struct FailedMsg { |
| | | steady_clock::time_point xpr; |
| | | std::string remote_; |
| | | BHMsg msg_; |
| | | FailedMsg(const std::string &addr, BHMsg &&msg) : |
| | | xpr(steady_clock::now() + 10s), remote_(addr), msg_(std::move(msg)) {} |
| | | bool Expired() { return steady_clock::now() > xpr; } |
| | | }; |
| | | typedef std::list<FailedMsg> Queue; |
| | | Synced<Queue> queue_; |
| | | |
| | | public: |
| | | void Push(const std::string &remote, BHMsg &&msg) |
| | | { |
| | | queue_->emplace_back(remote, std::move(msg)); |
| | | } |
| | | void TrySend(ShmSocket &socket, const int timeout_ms = 0) |
| | | { |
| | | queue_.Apply([&](Queue &q) { |
| | | if (!q.empty()) { |
| | | auto it = q.begin(); |
| | | do { |
| | | if (it->Expired() || socket.SyncSend(it->remote_.data(), it->msg_, timeout_ms)) { |
| | | it = q.erase(it); |
| | | } else { |
| | | ++it; |
| | | } |
| | | } while (it != q.end()); |
| | | } |
| | | }); |
| | | } |
| | | }; |
| | | |
| | | } // namespace |
| | | |
| | | bool SocketReply::Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms) |
| | | { |
| | | //TODO check reply? |
| | | return SyncSend(&kBHTopicReqRepCenter, MakeRegister(mq().Id(), proc_info, topics), timeout_ms); |
| | | } |
| | | bool SocketReply::Heartbeat(const ProcInfo &proc_info, const int timeout_ms) |
| | | { |
| | | return SyncSend(&kBHTopicReqRepCenter, MakeHeartbeat(mq().Id(), proc_info), timeout_ms); |
| | | } |
| | | bool SocketReply::StartWorker(const OnRequest &rcb, int nworker) |
| | | { |
| | | auto failed_q = std::make_shared<FailedQ>(); |
| | | |
| | | auto onIdle = [failed_q](ShmSocket &socket) { failed_q->TrySend(socket); }; |
| | | |
| | | auto onRecv = [this, rcb, failed_q, onIdle](BHMsg &msg) { |
| | | if (msg.type() == kMsgTypeRequestTopic && msg.route_size() > 0) { |
| | | MsgRequestTopic req; |
| | | if (req.ParseFromString(msg.body())) { |
| | | std::string out; |
| | | if (rcb(req.topic(), req.data(), out)) { |
| | | BHMsg msg_reply(MakeReply(msg.msg_id(), out.data(), out.size())); |
| | | for (int i = 0; i < msg.route_size() - 1; ++i) { |
| | | msg.add_route()->Swap(msg.mutable_route(i)); |
| | | } |
| | | if (!SyncSend(msg.route().rbegin()->mq_id().data(), msg_reply, 10)) { |
| | | failed_q->Push(msg.route().rbegin()->mq_id(), std::move(msg_reply)); |
| | | } |
| | | } |
| | | } |
| | | } else { |
| | | // ignored, or dropped |
| | | } |
| | | |
| | | onIdle(*this); |
| | | }; |
| | | |
| | | return rcb && Start(onRecv, onIdle, nworker); |
| | | } |
| | | |
| | | bool SocketReply::RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms) |
| | | { |
| | | BHMsg msg; |
| | | if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypeRequestTopic) { |
| | | MsgRequestTopic request; |
| | | if (request.ParseFromString(msg.body())) { |
| | | request.mutable_topic()->swap(topic); |
| | | request.mutable_data()->swap(data); |
| | | SrcInfo *p = new SrcInfo; |
| | | p->route.assign(msg.route().begin(), msg.route().end()); |
| | | p->msg_id = msg.msg_id(); |
| | | src_info = p; |
| | | return true; |
| | | } |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | bool SocketReply::SendReply(void *src_info, const std::string &data, const int timeout_ms) |
| | | { |
| | | SrcInfo *p = static_cast<SrcInfo *>(src_info); |
| | | DEFER1(delete p); |
| | | if (!p || p->route.empty()) { |
| | | return false; |
| | | } |
| | | |
| | | BHMsg msg(MakeReply(p->msg_id, data.data(), data.size())); |
| | | for (unsigned i = 0; i < p->route.size() - 1; ++i) { |
| | | msg.add_route()->Swap(&p->route[i]); |
| | | } |
| | | |
| | | return SyncSend(p->route.back().mq_id().data(), msg, timeout_ms); |
| | | } |
New file |
| | |
| | | /* |
| | | * ===================================================================================== |
| | | * |
| | | * Filename: topic_reply.h |
| | | * |
| | | * Description: |
| | | * |
| | | * Version: 1.0 |
| | | * Created: 2021年04月06日 14时41分12秒 |
| | | * Revision: none |
| | | * Compiler: gcc |
| | | * |
| | | * Author: Li Chao (), |
| | | * Organization: |
| | | * |
| | | * ===================================================================================== |
| | | */ |
| | | #ifndef TOPIC_REPLY_3RVYPPWI |
| | | #define TOPIC_REPLY_3RVYPPWI |
| | | |
| | | #include "bh_util.h" |
| | | #include "defs.h" |
| | | #include "msg.h" |
| | | #include "socket.h" |
| | | #include <deque> |
| | | #include <functional> |
| | | |
| | | using bhome::msg::ProcInfo; |
| | | |
| | | class SocketReply : private ShmSocket |
| | | { |
| | | typedef ShmSocket Socket; |
| | | |
| | | public: |
| | | SocketReply(Socket::Shm &shm) : |
| | | Socket(shm, 64) {} |
| | | SocketReply() : |
| | | SocketReply(BHomeShm()) {} |
| | | ~SocketReply() { Stop(); } |
| | | |
| | | typedef std::function<bool(const std::string &topic, const std::string &data, std::string &reply)> OnRequest; |
| | | bool StartWorker(const OnRequest &rcb, int nworker = 2); |
| | | bool Stop() { return Socket::Stop(); } |
| | | bool RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms); |
| | | bool SendReply(void *src_info, const std::string &data, const int timeout_ms); |
| | | bool Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms); |
| | | bool Heartbeat(const ProcInfo &proc_info, const int timeout_ms); |
| | | |
| | | private: |
| | | }; |
| | | |
| | | #endif // end of include guard: TOPIC_REPLY_3RVYPPWI |
File was renamed from src/reqrep.cpp |
| | |
| | | /* |
| | | * ===================================================================================== |
| | | * |
| | | * Filename: reqrep.cpp |
| | | * Filename: topic_request.cpp |
| | | * |
| | | * Description: topic request/reply sockets |
| | | * Description: topic request sockets |
| | | * |
| | | * Version: 1.0 |
| | | * Created: 2021年04月01日 09时35分35秒 |
| | |
| | | * |
| | | * ===================================================================================== |
| | | */ |
| | | #include "reqrep.h" |
| | | #include "topic_request.h" |
| | | #include "bh_util.h" |
| | | #include "msg.h" |
| | | #include <chrono> |
| | |
| | | }; |
| | | |
| | | RecvBHMsgCB cb; |
| | | if (Find(cb)) { |
| | | if (Find(cb) && cb) { |
| | | cb(msg); |
| | | } else if (msg.type() == kMsgTypeReply) { |
| | | DataReply reply; |
| | | } else if (msg.type() == kMsgTypeRequestTopicReply && rrcb) { |
| | | MsgRequestTopicReply reply; |
| | | if (reply.ParseFromString(msg.body())) { |
| | | rrcb(reply.data()); |
| | | } |
| | |
| | | auto Call = [&](const void *remote) { |
| | | const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size)); |
| | | auto onRecv = [cb](BHMsg &msg) { |
| | | if (msg.type() == kMsgTypeReply) { |
| | | DataReply reply; |
| | | if (msg.type() == kMsgTypeRequestTopicReply) { |
| | | MsgRequestTopicReply reply; |
| | | if (reply.ParseFromString(msg.body())) { |
| | | cb(reply.data()); |
| | | } |
| | |
| | | if (QueryRPCTopic(topic, addr, timeout_ms)) { |
| | | const BHMsg &req(MakeRequest(mq().Id(), topic, data, size)); |
| | | BHMsg reply; |
| | | if (SyncSendAndRecv(addr.mq_id().data(), &req, &reply, timeout_ms) && reply.type() == kMsgTypeReply) { |
| | | DataReply dr; |
| | | if (SyncSendAndRecv(addr.mq_id().data(), &req, &reply, timeout_ms) && reply.type() == kMsgTypeRequestTopicReply) { |
| | | MsgRequestTopicReply dr; |
| | | if (dr.ParseFromString(reply.body())) { |
| | | dr.mutable_data()->swap(out); |
| | | return true; |
| | | } else { |
| | | printf("error parse reply.\n"); |
| | | } |
| | | } else { |
| | | printf("error recv data. line: %d\n", __LINE__); |
| | | } |
| | | } else { |
| | | printf("error recv data. line: %d\n", __LINE__); |
| | | } |
| | | } catch (...) { |
| | | printf("error recv data. line: %d\n", __LINE__); |
| | | } |
| | | return false; |
| | | } |
| | |
| | | BHMsg result; |
| | | const BHMsg &msg = MakeQueryTopic(mq().Id(), topic); |
| | | if (SyncSendAndRecv(&kBHTopicReqRepCenter, &msg, &result, timeout_ms)) { |
| | | if (result.type() == kMsgTypeProcQueryTopicReply) { |
| | | DataProcQueryTopicReply reply; |
| | | if (result.type() == kMsgTypeQueryTopicReply) { |
| | | MsgQueryTopicReply reply; |
| | | if (reply.ParseFromString(result.body())) { |
| | | addr = reply.address(); |
| | | if (addr.mq_id().empty()) { |
| | |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | // reply socket |
| | | namespace |
| | | { |
| | | struct SrcInfo { |
| | | std::vector<BHAddress> route; |
| | | std::string msg_id; |
| | | }; |
| | | |
| | | } // namespace |
| | | |
| | | bool SocketReply::Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms) |
| | | { |
| | | //TODO check reply? |
| | | return SyncSend(&kBHTopicReqRepCenter, MakeRegister(mq().Id(), proc_info, topics), timeout_ms); |
| | | } |
| | | bool SocketReply::Heartbeat(const ProcInfo &proc_info, const int timeout_ms) |
| | | { |
| | | return SyncSend(&kBHTopicReqRepCenter, MakeHeartbeat(mq().Id(), proc_info), timeout_ms); |
| | | } |
| | | bool SocketReply::StartWorker(const OnRequest &rcb, int nworker) |
| | | { |
| | | auto onRecv = [this, rcb](BHMsg &msg) { |
| | | if (msg.type() == kMsgTypeRequest && msg.route_size() > 0) { |
| | | DataRequest req; |
| | | if (req.ParseFromString(msg.body())) { |
| | | std::string out; |
| | | if (rcb(req.topic(), req.data(), out)) { |
| | | BHMsg msg_reply(MakeReply(msg.msg_id(), out.data(), out.size())); |
| | | for (int i = 0; i < msg.route_size() - 1; ++i) { |
| | | msg.add_route()->Swap(msg.mutable_route(i)); |
| | | } |
| | | SyncSend(msg.route().rbegin()->mq_id().data(), msg_reply, 100); |
| | | } |
| | | } |
| | | } else { |
| | | // ignored, or dropped |
| | | } |
| | | }; |
| | | |
| | | return rcb && Start(onRecv, nworker); |
| | | } |
| | | |
| | | bool SocketReply::RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms) |
| | | { |
| | | BHMsg msg; |
| | | if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypeRequest) { |
| | | DataRequest request; |
| | | if (request.ParseFromString(msg.body())) { |
| | | request.mutable_topic()->swap(topic); |
| | | request.mutable_data()->swap(data); |
| | | SrcInfo *p = new SrcInfo; |
| | | p->route.assign(msg.route().begin(), msg.route().end()); |
| | | p->msg_id = msg.msg_id(); |
| | | src_info = p; |
| | | return true; |
| | | } |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | bool SocketReply::SendReply(void *src_info, const std::string &data, const int timeout_ms) |
| | | { |
| | | SrcInfo *p = static_cast<SrcInfo *>(src_info); |
| | | DEFER1(delete p); |
| | | if (!p || p->route.empty()) { |
| | | return false; |
| | | } |
| | | |
| | | BHMsg msg(MakeReply(p->msg_id, data.data(), data.size())); |
| | | for (unsigned i = 0; i < p->route.size() - 1; ++i) { |
| | | msg.add_route()->Swap(&p->route[i]); |
| | | } |
| | | |
| | | return SyncSend(p->route.back().mq_id().data(), msg, timeout_ms); |
| | | } |
File was renamed from src/reqrep.h |
| | |
| | | /* |
| | | * ===================================================================================== |
| | | * |
| | | * Filename: reqrep.h |
| | | * Filename: topic_request.h |
| | | * |
| | | * Description: topic request/reply sockets |
| | | * Description: topic request socket |
| | | * |
| | | * Version: 1.0 |
| | | * Created: 2021年04月01日 09时36分06秒 |
| | |
| | | * |
| | | * ===================================================================================== |
| | | */ |
| | | #ifndef REQREP_ACEH09NK |
| | | #define REQREP_ACEH09NK |
| | | #ifndef TOPIC_REQUEST_ACEH09NK |
| | | #define TOPIC_REQUEST_ACEH09NK |
| | | |
| | | #include "bh_util.h" |
| | | #include "defs.h" |
| | |
| | | TopicCache topic_cache_; |
| | | }; |
| | | |
| | | class SocketReply : private ShmSocket |
| | | { |
| | | typedef ShmSocket Socket; |
| | | |
| | | public: |
| | | SocketReply(Socket::Shm &shm) : |
| | | Socket(shm, 64) {} |
| | | SocketReply() : |
| | | SocketReply(BHomeShm()) {} |
| | | ~SocketReply() { Stop(); } |
| | | |
| | | typedef std::function<bool(const std::string &topic, const std::string &data, std::string &reply)> OnRequest; |
| | | bool StartWorker(const OnRequest &rcb, int nworker = 2); |
| | | bool Stop() { return Socket::Stop(); } |
| | | bool RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms); |
| | | bool SendReply(void *src_info, const std::string &data, const int timeout_ms); |
| | | bool Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms); |
| | | bool Heartbeat(const ProcInfo &proc_info, const int timeout_ms); |
| | | |
| | | private: |
| | | }; |
| | | |
| | | #endif // end of include guard: REQREP_ACEH09NK |
| | | #endif // end of include guard: TOPIC_REQUEST_ACEH09NK |
| | |
| | | auto Server = [&]() { |
| | | BHMsg req; |
| | | while (!stop) { |
| | | if (srv.Recv(req, 100) && req.type() == kMsgTypeRequest) { |
| | | if (srv.Recv(req, 100) && req.type() == kMsgTypeRequestTopic) { |
| | | auto &mqid = req.route()[0].mq_id(); |
| | | MQId src_id; |
| | | memcpy(&src_id, mqid.data(), sizeof(src_id)); |
| | |
| | | #include "defs.h" |
| | | #include "pubsub.h" |
| | | #include "pubsub_center.h" |
| | | #include "reqrep.h" |
| | | #include "reqrep_center.h" |
| | | #include "socket.h" |
| | | #include "topic_reply.h" |
| | | #include "topic_request.h" |
| | | #include "util.h" |
| | | #include <atomic> |
| | | #include <boost/uuid/uuid_generators.hpp> |
| | |
| | | printf("count: %d\n", count.load()); |
| | | } |
| | | }; |
| | | client.StartWorker(onRecv, 1); |
| | | client.StartWorker(onRecv, 2); |
| | | boost::timer::auto_cpu_timer timer; |
| | | for (int i = 0; i < nreq; ++i) { |
| | | if (!client.AsyncRequest(topic, "data " + std::to_string(i), 1000)) { |
| | | printf("client request failed\n"); |
| | | } |
| | | |
| | | // if (!client.SyncRequest(topic, "data " + std::to_string(i), reply, 1000)) { |
| | | // printf("client request failed\n"); |
| | | // } else { |
| | | // ++count; |
| | | // } |
| | | } |
| | | printf("request %s %d done ", topic.c_str(), nreq); |
| | | while (count.load() < nreq) { |
| | | do { |
| | | std::this_thread::yield(); |
| | | } |
| | | } while (count.load() < nreq); |
| | | client.Stop(); |
| | | printf("request %s %d done ", topic.c_str(), count.load()); |
| | | }; |
| | | std::atomic_uint64_t server_msg_count(0); |
| | | auto Server = [&](const std::string &name, const std::vector<std::string> &topics) { |
| | | SocketReply server(shm); |
| | | ProcInfo info; |
| | |
| | | if (!server.Register(info, topics, 100)) { |
| | | printf("register failed\n"); |
| | | } |
| | | auto onData = [](const std::string &topic, const std::string &data, std::string &reply) { |
| | | auto onData = [&](const std::string &topic, const std::string &data, std::string &reply) { |
| | | ++server_msg_count; |
| | | reply = topic + ':' + data; |
| | | return true; |
| | | }; |
| | |
| | | servers.Launch(Server, "server", topics); |
| | | std::this_thread::sleep_for(100ms); |
| | | for (auto &t : topics) { |
| | | clients.Launch(Client, t, 1000 * 1000); |
| | | clients.Launch(Client, t, 1000 * 100); |
| | | } |
| | | clients.WaitAll(); |
| | | printf("clients done, server replyed: %d\n", server_msg_count.load()); |
| | | run = false; |
| | | servers.WaitAll(); |
| | | } |