add api; fix send, socknode mem leak.
| | |
| | | "program": "${workspaceFolder}/debug/bin/utest", |
| | | "args": [ |
| | | "-t", |
| | | "SRTest" |
| | | "ApiTest" |
| | | ], |
| | | "stopAtEntry": false, |
| | | "cwd": "${workspaceFolder}", |
| | |
| | | "*.inc": "cpp", |
| | | "strstream": "cpp", |
| | | "unordered_set": "cpp", |
| | | "cfenv": "cpp" |
| | | "cfenv": "cpp", |
| | | "*.ipp": "cpp" |
| | | }, |
| | | "files.exclude": { |
| | | "**/*.un~": true, |
New file |
| | |
| | | #ifndef APP_ARG_OQMELZBX |
| | | #define APP_ARG_OQMELZBX |
| | | |
| | | #include <map> |
| | | #include <string> |
| | | |
/// Minimal command-line option parser.
///
/// Recognised forms (argv[0] is skipped):
///   --key=value   key maps to "value"
///   --key         key maps to ""
///   -key value    key maps to "value" (only if value does not start with '-')
///   -key          key maps to ""
/// Arguments that do not start with '-' and are not consumed as a value are
/// ignored, as are options whose name is empty (a bare "-", "--" or "--=x").
class AppArg
{
    typedef std::map<std::string, std::string> ArgMap;

public:
    /// Parses argv[1..argc-1]; argv[0] is never inspected.
    AppArg(int argc, const char *argv[])
    {
        Parse(argc, argv);
    }

    /// Returns true when option `key` was present on the command line.
    bool Has(const std::string &key) const
    {
        return Pos(key) != args.end();
    }

    /// Returns the value stored for `key`, or `def` when the option is absent.
    /// An option given without a value yields an empty string, not `def`.
    std::string Get(const std::string &key, const std::string &def = "") const
    {
        const ArgMap::const_iterator pos = Pos(key);
        return pos != args.end() ? pos->second : def;
    }

private:
    void Parse(int argc, const char *argv[])
    {
        for (int i = 1; i < argc; ++i) {
            if (!argv[i]) {
                continue; // constructing std::string from a null pointer is undefined
            }
            const std::string text(argv[i]);
            if (text.compare(0, 2, "--") == 0) {
                const std::string opt = text.substr(2);
                const std::string::size_type sep = opt.find('=');
                // substr(0, npos) yields the whole string, so this covers both forms.
                const std::string key = opt.substr(0, sep);
                if (key.empty()) {
                    continue; // "--" or "--=value": nothing to name
                }
                if (sep == std::string::npos) {
                    args[key].clear();
                } else {
                    args[key] = opt.substr(sep + 1);
                }
            } else if (text.compare(0, 1, "-") == 0) {
                const std::string key = text.substr(1);
                if (key.empty()) {
                    continue; // bare "-": not an option, and must not swallow the next argument
                }
                args[key].clear();
                // The following argument is the value unless it looks like another option.
                if (i + 1 < argc && argv[i + 1] && argv[i + 1][0] != '-') {
                    args[key] = argv[i + 1];
                    ++i;
                }
            }
        }
    }

    ArgMap::const_iterator Pos(const std::string &key) const
    {
        return args.find(key);
    }

    ArgMap args;
};
| | | |
| | | #endif // end of include guard: APP_ARG_OQMELZBX |
| | | |
| | |
| | | }; |
| | | |
| | | auto pos = nodes_.find(head.proc_id()); |
| | | if (pos == nodes_.end()) { // new client |
| | | Node node(new NodeInfo); |
| | | UpdateRegInfo(node); |
| | | nodes_[node->proc_.proc_id()] = node; |
| | | } else { |
| | | if (pos != nodes_.end()) { // new client |
| | | Node &node = pos->second; |
| | | if (node->addrs_.find(SrcAddr(head)) == node->addrs_.end()) { |
| | | // node restarted, release old mq. |
| | | for (auto &addr : node->addrs_) { |
| | | cleaner_(addr); |
| | | } |
| | | node->addrs_.clear(); |
| | | RemoveNode(node); |
| | | node.reset(new NodeInfo); |
| | | } |
| | | UpdateRegInfo(node); |
| | | } else { |
| | | Node node(new NodeInfo); |
| | | UpdateRegInfo(node); |
| | | nodes_[node->proc_.proc_id()] = node; |
| | | } |
| | | return MakeReply(eSuccess); |
| | | } catch (...) { |
| | |
| | | auto &cli = *it->second; |
| | | cli.state_.UpdateState(now, offline_time_, kill_time_); |
| | | if (cli.state_.flag_ == kStateKillme) { |
| | | if (cleaner_) { |
| | | for (auto &addr : cli.addrs_) { |
| | | cleaner_(addr); |
| | | } |
| | | } |
| | | RemoveNode(it->second); |
| | | it = nodes_.erase(it); |
| | | } else { |
| | | ++it; |
| | |
| | | { |
| | | auto node = weak.lock(); |
| | | return node && Valid(*node); |
| | | } |
| | | void RemoveNode(Node &node) |
| | | { |
| | | auto EraseMapRec = [&node](auto &rec_map, auto &node_rec) { |
| | | for (auto &addr_topics : node_rec) { |
| | | TopicDest dest{addr_topics.first, node}; |
| | | for (auto &topic : addr_topics.second) { |
| | | auto pos = rec_map.find(topic); |
| | | if (pos != rec_map.end()) { |
| | | pos->second.erase(dest); |
| | | if (pos->second.empty()) { |
| | | rec_map.erase(pos); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | }; |
| | | EraseMapRec(service_map_, node->services_); |
| | | EraseMapRec(subscribe_map_, node->subscriptions_); |
| | | |
| | | for (auto &addr : node->addrs_) { |
| | | cleaner_(addr); |
| | | } |
| | | node->addrs_.clear(); |
| | | } |
| | | std::string id_; // center proc id; |
| | | |
| | |
| | | auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) { |
| | | return [&](auto &&rep_body) { |
| | | auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id())); |
| | | MsgI msg; |
| | | if (msg.Make(socket.shm(), reply_head, rep_body)) { |
| | | auto &remote = head.route(0).mq_id(); |
| | | bool r = socket.Send(remote.data(), msg); |
| | | } |
| | | auto &remote = head.route(0).mq_id(); |
| | | socket.Send(remote.data(), reply_head, rep_body); |
| | | }; |
| | | }; |
| | | |
| | |
| | | * |
| | | * ===================================================================================== |
| | | */ |
#include "app_arg.h"
#include "box.h"
#include "center.h"
#include "defs.h"
#include "signalhandle.h"
#include <atomic>
#include <chrono>
#include <cstdio>
#include <thread>
| | | using namespace std::chrono_literals; |
| | | |
| | | int center_main(int argc, const char *argv[]) |
| | | { |
| | | AppArg args(argc, argv); |
| | | if (args.Has("remove")) { |
| | | BHomeShm().Remove(); |
| | | return 0; |
| | | } |
| | | |
| | | bool run = true; |
| | | auto showStatus = [&]() { |
| | | auto init = BHomeShm().get_free_memory(); |
| | | uint64_t idx = 0; |
| | | while (run) { |
| | | std::this_thread::sleep_for(1s); |
| | | printf("%8d shared memory: avail : %ld / %ld\n", ++idx, BHomeShm().get_free_memory(), init); |
| | | } |
| | | }; |
| | | std::thread t(showStatus); |
| | | |
| | | BHCenter center(BHomeShm()); |
| | | center.Start(); |
| | | printf("center started ...\n"); |
| | | WaitForSignals({SIGINT, SIGTERM}); |
| | | // BHomeShm().Remove(); // remove ? |
| | | run = false; |
| | | t.join(); |
| | | return 0; |
| | | } |
| | | |
| | |
| | | } |
| | | size_t size() const { return size_; } |
| | | operator bool() const { return ptr_; } |
| | | bool ReleaseTo(void **pdata, int *psize) |
| | | { |
| | | if (!ptr_) { |
| | | return false; |
| | | } |
| | | if (pdata && psize) { |
| | | *psize = size(); |
| | | *pdata = release(); |
| | | } |
| | | return true; |
| | | } |
| | | }; |
| | | |
| | | template <class Msg> |
| | | bool PackOutput(const Msg &msg, void **out, int *out_len) |
| | | { |
| | | if (!out || !out_len) { |
| | | return true; // not wanted. |
| | | } |
| | | auto size = msg.ByteSizeLong(); |
| | | TmpPtr p(size); |
| | | if (!p) { |
| | |
| | | return false; |
| | | } |
| | | msg.SerializePartialToArray(p.get(), size); |
| | | *out = p.release(); |
| | | *out_len = size; |
| | | p.ReleaseTo(out, out_len); |
| | | return true; |
| | | } |
| | | |
| | | template <class MsgIn, class MsgOut = MsgCommonReply> |
| | | bool BHApiIn1Out1(bool (TopicNode::*mfunc)(MsgIn &, MsgOut &, const int), |
| | | const void *request, |
| | | const int request_len, |
| | | void **reply, |
| | | int *reply_len, |
| | | const int timeout_ms) |
| | | { |
| | | MsgIn input; |
| | | if (!input.ParseFromArray(request, request_len)) { |
| | | SetLastError(eInvalidInput, "invalid input."); |
| | | return false; |
| | | } |
| | | MsgOut msg_reply; |
| | | if ((ProcNode().*mfunc)(input, msg_reply, timeout_ms)) { |
| | | return PackOutput(msg_reply, reply, reply_len); |
| | | |
| | | } else { |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | } // namespace |
| | | |
| | | bool BHRegister(const void *proc_info, |
| | | const int proc_info_len, |
| | | void **reply, |
| | | int *reply_len, |
| | | const int timeout_ms) |
| | | bool BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) |
| | | { |
| | | ProcInfo pi; |
| | | if (!pi.ParseFromArray(proc_info, proc_info_len)) { |
| | | SetLastError(eInvalidInput, "invalid input."); |
| | | return false; |
| | | } |
| | | MsgCommonReply msg_reply; |
| | | if (ProcNode().Register(pi, msg_reply, timeout_ms)) { |
| | | return PackOutput(msg_reply, reply, reply_len); |
| | | } else { |
| | | return false; |
| | | } |
| | | return BHApiIn1Out1<ProcInfo>(&TopicNode::Register, proc_info, proc_info_len, reply, reply_len, timeout_ms); |
| | | } |
| | | |
| | | bool BHHeartBeatEasy(const int timeout_ms) |
| | |
| | | return ProcNode().Heartbeat(timeout_ms); |
| | | } |
| | | |
| | | bool BHHeartBeat(const void *proc_info, |
| | | const int proc_info_len, |
| | | void **reply, |
| | | int *reply_len, |
| | | const int timeout_ms) |
| | | bool BHHeartBeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) |
| | | { |
| | | ProcInfo pi; |
| | | if (!pi.ParseFromArray(proc_info, proc_info_len)) { |
| | | SetLastError(eInvalidInput, "invalid input."); |
| | | return false; |
| | | } |
| | | MsgCommonReply msg_reply; |
| | | if (ProcNode().Heartbeat(pi, msg_reply, timeout_ms)) { |
| | | return PackOutput(msg_reply, reply, reply_len); |
| | | } else { |
| | | return false; |
| | | } |
| | | return BHApiIn1Out1<ProcInfo>(&TopicNode::Heartbeat, proc_info, proc_info_len, reply, reply_len, timeout_ms); |
| | | } |
| | | |
| | | bool BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) |
| | | { |
| | | return BHApiIn1Out1<MsgTopicList>(&TopicNode::ServerRegisterRPC, topics, topics_len, reply, reply_len, timeout_ms); |
| | | } |
| | | |
| | | bool BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) |
| | | { |
| | | return BHApiIn1Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms); |
| | | } |
| | | |
| | | bool BHPublish(const void *msgpub, |
| | |
| | | if (ProcNode().RecvSub(proc, pub, timeout_ms)) { |
| | | TmpPtr pproc(proc); |
| | | if (pproc && PackOutput(pub, msgpub, msgpub_len)) { |
| | | *proc_id = pproc.release(); |
| | | *proc_id_len = pproc.size(); |
| | | pproc.ReleaseTo(proc_id, proc_id_len); |
| | | return true; |
| | | } else { |
| | | SetLastError(ENOMEM, "out of mem"); |
| | | } |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | bool BHAsyncRequest(const void *request, |
| | | const int request_len, |
| | | void **msg_id, |
| | | int *msg_id_len) |
| | | { |
| | | MsgRequestTopic req; |
| | | if (!req.ParseFromArray(request, request_len)) { |
| | | SetLastError(eInvalidInput, "invalid input."); |
| | | return false; |
| | | } |
| | | std::string str_msg_id; |
| | | MsgRequestTopicReply out_msg; |
| | | if (ProcNode().ClientAsyncRequest(req, str_msg_id)) { |
| | | if (!msg_id || !msg_id_len) { |
| | | return true; |
| | | } |
| | | TmpPtr ptr(str_msg_id); |
| | | if (ptr) { |
| | | ptr.ReleaseTo(msg_id, msg_id_len); |
| | | return true; |
| | | } else { |
| | | SetLastError(ENOMEM, "out of mem"); |
| | | } |
| | |
| | | if (ProcNode().ClientSyncRequest(req, proc, out_msg, timeout_ms)) { |
| | | TmpPtr pproc(proc); |
| | | if (pproc && PackOutput(out_msg, reply, reply_len)) { |
| | | *proc_id = pproc.release(); |
| | | *proc_id_len = pproc.size(); |
| | | pproc.ReleaseTo(proc_id, proc_id_len); |
| | | return true; |
| | | } else { |
| | | SetLastError(ENOMEM, "out of mem"); |
| | | } |
| | |
| | | if (ProcNode().ServerRecvRequest(src_info, proc, out_msg, timeout_ms)) { |
| | | TmpPtr pproc(proc); |
| | | if (pproc && PackOutput(out_msg, request, request_len)) { |
| | | *proc_id = pproc.release(); |
| | | *proc_id_len = pproc.size(); |
| | | pproc.ReleaseTo(proc_id, proc_id_len); |
| | | *src = src_info; |
| | | return true; |
| | | } else { |
| | | SetLastError(ENOMEM, "out of mem"); |
| | | } |
| | |
| | | typedef std::function<bool(const void *, const int)> ServerSender; |
| | | } // namespace |
| | | |
| | | void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb) |
| | | void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb, FClientCallback client_cb) |
| | | { |
| | | TopicNode::ServerCB on_req; |
| | | TopicNode::SubDataCB on_sub; |
| | | TopicNode::RequestResultCB on_reply; |
| | | if (server_cb) { |
| | | on_req = [server_cb](const std::string &proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply) { |
| | | std::string sreq(request.SerializeAsString()); |
| | |
| | | sub_cb(proc_id.data(), proc_id.size(), s.data(), s.size()); |
| | | }; |
| | | } |
| | | if (client_cb) { |
| | | on_reply = [client_cb](const BHMsgHead &head, const MsgRequestTopicReply &rep) { |
| | | std::string s(rep.SerializeAsString()); |
| | | client_cb(head.proc_id().data(), head.proc_id().size(), |
| | | head.msg_id().data(), head.msg_id().size(), |
| | | s.data(), s.size()); |
| | | }; |
| | | } |
| | | |
| | | ProcNode().Start(on_req, on_sub); |
| | | ProcNode().Start(on_req, on_sub, on_reply); |
| | | } |
| | | bool BHServerCallbackReply(const BHServerCallbackTag *tag, |
| | | const void *data, |
| | |
| | | std::string err_msg; |
| | | GetLastError(ec, err_msg); |
| | | TmpPtr p(err_msg); |
| | | if (p) { |
| | | *msg = p.release(); |
| | | *msg_len = p.size(); |
| | | } |
| | | p.ReleaseTo(msg, msg_len); |
| | | } |
| | | return ec; |
| | | } |
| | |
| | | int *reply_len, |
| | | const int timeout_ms); |
| | | |
| | | bool BHRegisterTopics(const void *topics, |
| | | const int topics_len, |
| | | void **reply, |
| | | int *reply_len, |
| | | const int timeout_ms); |
| | | |
| | | bool BHSubscribeTopics(const void *topics, |
| | | const int topics_len, |
| | | void **reply, |
| | | int *reply_len, |
| | | const int timeout_ms); |
| | | |
| | | typedef void (*FSubDataCallback)(const void *proc_id, |
| | | const int proc_id_len, |
| | | const void *data, |
| | |
| | | const int data_len, |
| | | BHServerCallbackTag *tag); |
| | | |
| | | void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb); |
| | | typedef void (*FClientCallback)(const void *proc_id, |
| | | const int proc_id_len, |
| | | const void *msg_id, |
| | | const int msg_id_len, |
| | | const void *data, |
| | | const int data_len); |
| | | |
| | | void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb, FClientCallback client_cb); |
| | | bool BHServerCallbackReply(const BHServerCallbackTag *tag, |
| | | const void *data, |
| | | const int data_len); |
| | |
| | | int *msgpub_len, |
| | | const int timeout_ms); |
| | | |
| | | bool BHAsyncRequest(const void *request, |
| | | const int request_len, |
| | | void **msg_id, |
| | | int *msg_id_len); |
| | | |
| | | bool BHRequest(const void *request, |
| | | const int request_len, |
| | | void **proc_id, |
| | |
| | | |
| | | } // namespace |
| | | |
| | | std::string NewMsgId() { return RandId(); } |
| | | |
| | | BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id) |
| | | { |
| | | return InitMsgHead(type, proc_id, RandId()); |
| | |
| | | SetError(*msg.mutable_errmsg(), err_code, err_str); |
| | | return msg; |
| | | } |
| | | |
| | | std::string NewMsgId(); |
| | | BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const std::string &msgid); |
| | | BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id); |
| | | // inline void AddRoute(BHMsgHead &head, const MQId &id) { head.add_route()->set_mq_id(&id, sizeof(id)); } |
| | |
| | | void Append(const Remote &addr, const MsgI &msg, OnMsgEvent onExpire = OnMsgEvent()) |
| | | { |
| | | using namespace std::chrono_literals; |
| | | Append(addr, msg, Now() + 60s, onExpire); |
| | | Append(addr, msg, Now() + 3s, onExpire); |
| | | } |
| | | bool TrySend(bhome_shm::ShmMsgQueue &mq); |
| | | // bool empty() const { return store_.empty(); } |
| | |
| | | |
| | | class ShmSocket : private boost::noncopyable |
| | | { |
| | | bool SendImpl(const void *valid_remote, const MsgI &imsg, SendQ::OnMsgEvent onExpire = SendQ::OnMsgEvent()) |
| | | bool SendImpl(const void *valid_remote, MsgI const &imsg, SendQ::OnMsgEvent onExpire = SendQ::OnMsgEvent()) |
| | | { |
| | | // if (!mq().TrySend(*(MQId *) valid_remote, imsg)) { |
| | | send_buffer_.Append(*static_cast<const MQId *>(valid_remote), imsg, onExpire); |
| | |
| | | bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body) |
| | | { |
| | | MsgI msg; |
| | | return msg.Make(shm(), head, body) && SendImpl(valid_remote, msg); |
| | | if (msg.Make(shm(), head, body)) { |
| | | DEFER1(if (msg.IsCounted()) { msg.Release(shm()); }); |
| | | return SendImpl(valid_remote, msg); |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | template <class Body> |
| | |
| | | //TODO send_buffer_ need flag, and remove callback on expire. |
| | | MsgI msg; |
| | | if (msg.Make(shm(), head, body)) { |
| | | DEFER1(if (msg.IsCounted()) { msg.Release(shm()); }); |
| | | std::string msg_id(head.msg_id()); |
| | | per_msg_cbs_->Add(msg_id, cb); |
| | | auto onExpireRemoveCB = [this, msg_id](MsgI const &msg) { |
| | |
| | | per_msg_cbs_->Find(msg_id, cb_no_use); |
| | | }; |
| | | return SendImpl(valid_remote, msg, onExpireRemoveCB); |
| | | } else { |
| | | printf("out of mem?, avail: %ld\n", shm().get_free_memory()); |
| | | } |
| | | return false; |
| | | } |
| | |
| | | } // namespace |
| | | |
| | | TopicNode::TopicNode(SharedMemory &shm) : |
| | | shm_(shm), sock_node_(shm), sock_request_(shm), sock_reply_(shm), sock_sub_(shm) |
| | | shm_(shm), sock_node_(shm), sock_request_(shm), sock_reply_(shm), sock_sub_(shm), registered_(false) |
| | | { |
| | | SockNode().Start(); |
| | | // recv msgs to avoid memory leak. |
| | | auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; }; |
| | | SockNode().Start(default_ignore_msg); |
| | | } |
| | | |
| | | TopicNode::~TopicNode() |
| | | { |
| | | Stop(); |
| | | SockNode().Stop(); |
| | | } |
| | | |
| | | void TopicNode::Start(ServerCB const &server_cb, SubDataCB const &sub_cb) |
| | | void TopicNode::Start(ServerCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker) |
| | | { |
| | | ServerStart(server_cb, 1); |
| | | SubscribeStartWorker(sub_cb, 1); |
| | | // SockClient().Start(); |
| | | if (nworker < 1) { |
| | | nworker = 1; |
| | | } else if (nworker > 16) { |
| | | nworker = 16; |
| | | } |
| | | |
| | | ServerStart(server_cb, nworker); |
| | | SubscribeStartWorker(sub_cb, nworker); |
| | | ClientStartWorker(client_cb, nworker); |
| | | } |
| | | void TopicNode::Stop() |
| | | { |
| | | SockSub().Stop(); |
| | | SockServer().Stop(); |
| | | SockClient().Stop(); |
| | | SockNode().Stop(); |
| | | } |
| | | |
| | | bool TopicNode::Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) |
| | | { |
| | | info_ = proc; |
| | | |
| | | auto &sock = SockNode(); |
| | | MsgRegister body; |
| | | *body.mutable_proc() = proc; |
| | | body.mutable_proc()->Swap(&proc); |
| | | auto AddId = [&](const MQId &id) { body.add_addrs()->set_mq_id(&id, sizeof(id)); }; |
| | | AddId(SockNode().id()); |
| | | AddId(SockServer().id()); |
| | |
| | | auto head(InitMsgHead(GetType(body), body.proc().proc_id())); |
| | | AddRoute(head, sock.id()); |
| | | |
| | | auto CheckResult = [this](MsgI &msg, BHMsgHead &head, MsgCommonReply &rbody) { |
| | | bool ok = head.type() == kMsgTypeCommonReply && |
| | | msg.ParseBody(rbody) && |
| | | IsSuccess(rbody.errmsg().errcode()); |
| | | printf("async regisered %s\n", ok ? "ok" : "failed"); |
| | | registered_.store(ok); |
| | | }; |
| | | |
| | | if (timeout_ms == 0) { |
| | | return sock.Send(&BHTopicCenterAddress(), head, body); |
| | | auto onResult = [this, CheckResult](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) { |
| | | MsgCommonReply body; |
| | | CheckResult(imsg, head, body); |
| | | }; |
| | | return sock.Send(&BHTopicCenterAddress(), head, body, onResult); |
| | | } else { |
| | | MsgI reply; |
| | | DEFER1(reply.Release(shm_);); |
| | | BHMsgHead reply_head; |
| | | bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); |
| | | r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body); |
| | | if (r && IsSuccess(reply_body.errmsg().errcode())) { |
| | | info_ = body; |
| | | return true; |
| | | if (r) { |
| | | CheckResult(reply, reply_head, reply_body); |
| | | } |
| | | return false; |
| | | return IsRegistered(); |
| | | } |
| | | } |
| | | |
| | | bool TopicNode::Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) |
| | | { |
| | | if (!IsRegistered()) { return false; } |
| | | |
| | | auto &sock = SockNode(); |
| | | MsgHeartbeat body; |
| | | *body.mutable_proc() = proc; |
| | | body.mutable_proc()->Swap(&proc); |
| | | |
| | | auto head(InitMsgHead(GetType(body), body.proc().proc_id())); |
| | | AddRoute(head, sock.id()); |
| | |
| | | |
| | | bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms) |
| | | { |
| | | //TODO check registered |
| | | if (!IsRegistered()) { return false; } |
| | | |
| | | auto &sock = SockServer(); |
| | | MsgRegisterRPC body; |
| | | body.mutable_topics()->Swap(&topics); |
| | |
| | | for (int i = 0; i < head.route_size() - 1; ++i) { |
| | | reply_head.add_route()->Swap(head.mutable_route(i)); |
| | | } |
| | | MsgI msg; |
| | | if (msg.Make(sock.shm(), reply_head, reply_body)) { |
| | | auto &remote = head.route().rbegin()->mq_id(); |
| | | sock.Send(remote.data(), msg); |
| | | } |
| | | auto &remote = head.route().rbegin()->mq_id(); |
| | | sock.Send(remote.data(), reply_head, reply_body); |
| | | } |
| | | }; |
| | | |
| | |
| | | |
| | | bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms) |
| | | { |
| | | if (!IsRegistered()) { return false; } |
| | | |
| | | auto &sock = SockServer(); |
| | | |
| | | MsgI imsg; |
| | |
| | | |
| | | bool TopicNode::ServerSendReply(void *src_info, const MsgRequestTopicReply &body) |
| | | { |
| | | if (!IsRegistered()) { return false; } |
| | | |
| | | auto &sock = SockServer(); |
| | | |
| | | SrcInfo *p = static_cast<SrcInfo *>(src_info); |
| | |
| | | if (head.type() == kMsgTypeRequestTopicReply) { |
| | | MsgRequestTopicReply reply; |
| | | if (imsg.ParseBody(reply)) { |
| | | cb(head.proc_id(), reply); |
| | | cb(head, reply); |
| | | } |
| | | } |
| | | }; |
| | |
| | | return SockRequest().Start(onData, nworker); |
| | | } |
| | | |
| | | bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, const RequestResultCB &cb) |
| | | bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, std::string &out_msg_id, const RequestResultCB &cb) |
| | | { |
| | | auto Call = [&](const void *remote) { |
| | | auto &sock = SockRequest(); |
| | | if (!IsRegistered()) { return false; } |
| | | |
| | | BHMsgHead head(InitMsgHead(GetType(req), proc_id())); |
| | | const std::string &msg_id(NewMsgId()); |
| | | |
| | | out_msg_id = msg_id; |
| | | |
| | | auto SendTo = [this, msg_id](const BHAddress &addr, const MsgRequestTopic &req, const RequestResultCB &cb) { |
| | | auto &sock = SockClient(); |
| | | BHMsgHead head(InitMsgHead(GetType(req), proc_id(), msg_id)); |
| | | AddRoute(head, sock.id()); |
| | | head.set_topic(req.topic()); |
| | | |
| | | if (cb) { |
| | | auto onRecv = [cb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { |
| | | if (head.type() == kMsgTypeRequestTopicReply) { |
| | | MsgRequestTopicReply reply; |
| | | if (imsg.ParseBody(reply)) { |
| | | cb(head.proc_id(), reply); |
| | | cb(head, reply); |
| | | } |
| | | } |
| | | }; |
| | | return sock.Send(remote, head, req, onRecv); |
| | | return sock.Send(addr.mq_id().data(), head, req, onRecv); |
| | | } else { |
| | | return sock.Send(remote, head, req); |
| | | return sock.Send(addr.mq_id().data(), head, req); |
| | | } |
| | | }; |
| | | |
| | | try { |
| | | auto &sock = SockClient(); |
| | | BHAddress addr; |
| | | if (ClientQueryRPCTopic(req.topic(), addr, 1000)) { |
| | | return Call(addr.mq_id().data()); |
| | | } else { |
| | | SetLastError(eNotFound, "remote not found."); |
| | | return false; |
| | | |
| | | if (topic_query_cache_.Find(req.topic(), addr)) { |
| | | return SendTo(addr, req, cb); |
| | | } |
| | | |
| | | MsgQueryTopic query; |
| | | query.set_topic(req.topic()); |
| | | BHMsgHead head(InitMsgHead(GetType(query), proc_id())); |
| | | AddRoute(head, sock.id()); |
| | | |
| | | auto onQueryResult = [this, SendTo, req, cb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { |
| | | MsgQueryTopicReply rep; |
| | | if (head.type() == kMsgTypeQueryTopicReply && imsg.ParseBody(rep)) { |
| | | auto &addr = rep.address(); |
| | | if (!addr.mq_id().empty()) { |
| | | topic_query_cache_.Update(req.topic(), addr); |
| | | SendTo(addr, req, cb); |
| | | } |
| | | } |
| | | }; |
| | | return sock.Send(&BHTopicCenterAddress(), head, query, onQueryResult); |
| | | |
| | | } catch (...) { |
| | | return false; |
| | | } |
| | |
| | | |
| | | bool TopicNode::ClientSyncRequest(const MsgRequestTopic &request, std::string &out_proc_id, MsgRequestTopicReply &out_reply, const int timeout_ms) |
| | | { |
| | | if (!IsRegistered()) { return false; } |
| | | |
| | | try { |
| | | auto &sock = SockRequest(); |
| | | |
| | |
| | | if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) { |
| | | BHMsgHead head(InitMsgHead(GetType(request), proc_id())); |
| | | AddRoute(head, sock.id()); |
| | | head.set_topic(request.topic()); |
| | | |
| | | MsgI reply_msg; |
| | | DEFER1(reply_msg.Release(shm_);); |
| | |
| | | |
| | | bool TopicNode::ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms) |
| | | { |
| | | if (!IsRegistered()) { return false; } |
| | | |
| | | auto &sock = SockRequest(); |
| | | |
| | | if (topic_query_cache_.Find(topic, addr)) { |
| | |
| | | |
| | | bool TopicNode::Publish(const MsgPublish &pub, const int timeout_ms) |
| | | { |
| | | if (!IsRegistered()) { return false; } |
| | | |
| | | try { |
| | | auto &sock = SockPub(); |
| | | BHMsgHead head(InitMsgHead(GetType(pub), proc_id())); |
| | |
| | | |
| | | // subscribe |
| | | |
| | | bool TopicNode::Subscribe(MsgTopicList &topics, const int timeout_ms) |
| | | bool TopicNode::Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms) |
| | | { |
| | | if (!IsRegistered()) { return false; } |
| | | |
| | | try { |
| | | auto &sock = SockSub(); |
| | | MsgSubscribe sub; |
| | |
| | | MsgI reply; |
| | | DEFER1(reply.Release(shm());); |
| | | BHMsgHead reply_head; |
| | | MsgCommonReply reply_body; |
| | | return sock.SendAndRecv(&BHTopicBusAddress(), head, sub, reply, reply_head, timeout_ms) && |
| | | reply_head.type() == kMsgTypeCommonReply && |
| | | reply.ParseBody(reply_body) && |
| | |
| | | |
| | | bool TopicNode::RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms) |
| | | { |
| | | if (!IsRegistered()) { return false; } |
| | | |
| | | auto &sock = SockSub(); |
| | | MsgI msg; |
| | | DEFER1(msg.Release(shm());); |
| | |
| | | class TopicNode |
| | | { |
| | | SharedMemory &shm_; |
| | | MsgRegister info_; |
| | | ProcInfo info_; |
| | | |
| | | SharedMemory &shm() { return shm_; } |
| | | |
| | |
| | | bool ServerSendReply(void *src_info, const MsgRequestTopicReply &reply); |
| | | |
| | | // topic client |
| | | typedef std::function<void(const std::string &proc_id, const MsgRequestTopicReply &reply)> RequestResultCB; |
| | | typedef std::function<void(const BHMsgHead &head, const MsgRequestTopicReply &reply)> RequestResultCB; |
| | | bool ClientStartWorker(RequestResultCB const &cb, const int nworker = 2); |
| | | bool ClientAsyncRequest(const MsgRequestTopic &request, const RequestResultCB &rrcb = RequestResultCB()); |
| | | bool ClientAsyncRequest(const MsgRequestTopic &request, std::string &msg_id, const RequestResultCB &rrcb = RequestResultCB()); |
| | | bool ClientSyncRequest(const MsgRequestTopic &request, std::string &proc_id, MsgRequestTopicReply &reply, const int timeout_ms); |
| | | |
| | | // publish |
| | |
| | | // subscribe |
| | | typedef std::function<void(const std::string &proc_id, const MsgPublish &data)> SubDataCB; |
| | | bool SubscribeStartWorker(const SubDataCB &tdcb, int nworker = 2); |
| | | bool Subscribe(MsgTopicList &topics, const int timeout_ms); |
| | | bool Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms); |
| | | bool RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms); |
| | | |
| | | void Start(ServerCB const &server_cb, SubDataCB const &sub_cb); |
| | | void Start(ServerCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker = 2); |
| | | void Stop(); |
| | | |
| | | private: |
| | | bool ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms); |
| | | const std::string &proc_id() { return info_.proc().proc_id(); } |
| | | const std::string &proc_id() { return info_.proc_id(); } |
| | | |
| | | typedef bhome_msg::BHAddress Address; |
| | | class TopicQueryCache |
| | |
| | | auto &SockClient() { return SockRequest(); } |
| | | auto &SockReply() { return sock_reply_; } |
| | | auto &SockServer() { return SockReply(); } |
| | | bool IsRegistered() const { return registered_.load(); } |
| | | |
| | | std::atomic<bool> registered_; |
| | | ShmSocket sock_node_; |
| | | ShmSocket sock_request_; |
| | | ShmSocket sock_reply_; |
| | |
| | | */ |
| | | #include "bh_api.h" |
| | | #include "util.h" |
| | | #include <atomic> |
| | | |
| | | class DemoClient |
| | | using namespace bhome::msg; |
| | | |
| | | namespace |
| | | { |
| | | public: |
typedef std::atomic<uint64_t> Number;

// Three atomic counters, all starting at zero.
struct MsgStatus {
    Number nrequest_{0};
    Number nreply_{0};
    Number nserved_{0};
};

// Returns the single MsgStatus instance, created on first use.
MsgStatus &Status()
{
    static MsgStatus instance;
    return instance;
}
| | | } // namespace |
| | | |
| | | void SubRecvProc(const void *proc_id, |
| | | const int proc_id_len, |
| | | const void *data, |
| | | const int data_len) |
| | | { |
| | | std::string proc((const char *) proc_id, proc_id_len); |
| | | MsgPublish pub; |
| | | pub.ParseFromArray(data, data_len); |
| | | // printf("Sub data, %s : %s\n", pub.topic().c_str(), pub.data().c_str()); |
| | | } |
| | | |
| | | void ServerProc(const void *proc_id, |
| | | const int proc_id_len, |
| | | const void *data, |
| | | const int data_len, |
| | | BHServerCallbackTag *tag) |
| | | { |
| | | // printf("ServerProc: "); |
| | | // DEFER1(printf("\n");); |
| | | MsgRequestTopic request; |
| | | if (request.ParseFromArray(data, data_len)) { |
| | | MsgRequestTopicReply reply; |
| | | reply.set_data(" reply: " + request.data()); |
| | | std::string s(reply.SerializeAsString()); |
| | | // printf("%s", reply.data().c_str()); |
| | | BHServerCallbackReply(tag, s.data(), s.size()); |
| | | ++Status().nserved_; |
| | | } |
| | | } |
| | | |
| | | void ClientProc(const void *proc_id, |
| | | const int proc_id_len, |
| | | const void *msg_id, |
| | | const int msg_id_len, |
| | | const void *data, |
| | | const int data_len) |
| | | { |
| | | std::string proc((const char *) proc_id, proc_id_len); |
| | | MsgRequestTopicReply reply; |
| | | if (reply.ParseFromArray(data, data_len)) { |
| | | ++Status().nreply_; |
| | | } |
| | | // printf("client Recv reply : %s\n", reply.data().c_str()); |
| | | } |
| | | |
// ApiTest: drives the BH* C API end to end as far as this view shows:
// registers the process "demo_client", registers and subscribes topics, starts
// the worker with ServerProc / SubRecvProc / ClientProc, publishes one large
// message, then fires async requests from several threads and waits for the
// reply counter to settle before printing the totals.
// NOTE(review): this region is a rendered diff. It contains an elided hunk right
// after the opening brace and appears to interleave removed and added lines;
// confirm every note below against the real file.
| | | BOOST_AUTO_TEST_CASE(ApiTest) |
| | | { |
| | |
| | | nsec, nhour, nday, years); |
| | | std::chrono::steady_clock::duration a(123456); |
| | | printf("nowsec: %ld\n", NowSec()); |
| | | // for (int i = 0; i < 5; ++i) { |
| | | // std::this_thread::sleep_for(1s); |
| | | // printf("nowsec: %ld\n", NowSec()); |
| | | // } |
| | | |
| | | printf("maxsec: %ld\n", CountSeconds(max_time)); |
| | | |
// NOTE(review): this single BHRegister call never hands `reply` to BHFree,
// while the retry loop below repeats the same steps and does free it.
// Presumably this is the removed side of the diff; confirm.
| | | ProcInfo proc; |
| | | proc.set_proc_id("demo_client"); |
| | | proc.set_public_info("public info of demo_client. etc..."); |
| | | std::string proc_buf(proc.SerializeAsString()); |
| | | void *reply = 0; |
| | | int reply_len = 0; |
| | | bool r = BHRegister(proc_buf.data(), proc_buf.size(), &reply, &reply_len, 1000); |
| | | printf("register %s\n", r ? "ok" : "failed"); |
// Try to register up to 10 times, pausing one second after every attempt.
| | | bool reg = false; |
| | | for (int i = 0; i < 10 && !reg; ++i) { |
| | | ProcInfo proc; |
| | | proc.set_proc_id("demo_client"); |
| | | proc.set_public_info("public info of demo_client. etc..."); |
| | | std::string proc_buf(proc.SerializeAsString()); |
| | | void *reply = 0; |
| | | int reply_len = 0; |
| | | reg = BHRegister(proc_buf.data(), proc_buf.size(), &reply, &reply_len, 2000); |
| | | printf("register %s\n", reg ? "ok" : "failed"); |
| | | |
| | | BHFree(reply, reply_len); |
| | | Sleep(1s); |
| | | } |
| | | |
| | | const std::string topic_ = "topic_"; |
| | | |
// Register topic_0 .. topic_9.
| | | { |
| | | MsgTopicList topics; |
| | | for (int i = 0; i < 10; ++i) { |
| | | topics.add_topic_list(topic_ + std::to_string(i)); |
| | | } |
| | | std::string s = topics.SerializeAsString(); |
| | | void *reply = 0; |
| | | int reply_len = 0; |
| | | bool r = BHRegisterTopics(s.data(), s.size(), &reply, &reply_len, 1000); |
| | | BHFree(reply, reply_len); |
| | | // printf("register topic : %s\n", r ? "ok" : "failed"); |
| | | Sleep(1s); |
| | | } |
| | | |
// Subscribe to the even-numbered topics topic_0, topic_2, .. topic_18.
| | | { |
| | | MsgTopicList topics; |
| | | for (int i = 0; i < 10; ++i) { |
| | | topics.add_topic_list(topic_ + std::to_string(i * 2)); |
| | | } |
| | | std::string s = topics.SerializeAsString(); |
| | | void *reply = 0; |
| | | int reply_len = 0; |
| | | bool r = BHSubscribeTopics(s.data(), s.size(), &reply, &reply_len, 1000); |
| | | BHFree(reply, reply_len); |
| | | printf("subscribe topic : %s\n", r ? "ok" : "failed"); |
| | | } |
| | | |
| | | BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc); |
| | | |
// Publish a single message on topic_0 whose data is "pub_data_" plus 1 MiB of 'a'.
| | | { |
| | | for (int i = 0; i < 1; ++i) { |
| | | MsgPublish pub; |
| | | pub.set_topic(topic_ + std::to_string(i)); |
| | | pub.set_data("pub_data_" + std::string(1024 * 1024, 'a')); |
| | | std::string s(pub.SerializeAsString()); |
| | | BHPublish(s.data(), s.size(), 0); |
| | | // Sleep(1s); |
| | | } |
| | | } |
| | | |
// Sends `nreq` async requests on topic_0 and counts the ones that were accepted.
| | | auto asyncRequest = [&](uint64_t nreq) { |
| | | for (uint64_t i = 0; i < nreq; ++i) { |
| | | MsgRequestTopic req; |
| | | req.set_topic(topic_ + std::to_string(0)); |
| | | req.set_data("request_data_" + std::to_string(i)); |
| | | std::string s(req.SerializeAsString()); |
| | | void *msg_id = 0; |
| | | int len = 0; |
// NOTE(review): msg_id/len are never passed to BHAsyncRequest (it receives 0, 0),
// so the deferred BHFree below always sees the initial null pointer and 0 length.
| | | bool r = BHAsyncRequest(s.data(), s.size(), 0, 0); |
| | | DEFER1(BHFree(msg_id, len);); |
| | | if (r) { |
| | | ++Status().nrequest_; |
| | | } else { |
| | | printf("request topic : %s\n", r ? "ok" : "failed"); |
| | | } |
| | | } |
| | | }; |
// Once per second prints the counters and the reply delta since the last print.
// Its only launch in this view is commented out.
| | | auto showStatus = [](std::atomic<bool> *run) { |
| | | int64_t last = 0; |
| | | while (*run) { |
| | | auto &st = Status(); |
| | | std::this_thread::sleep_for(1s); |
// NOTE(review): `cur` is an int but is printed with %8ld on the next line,
// a format/argument mismatch; the intended type is probably int64_t. Confirm.
| | | int cur = st.nreply_.load(); |
| | | printf("nreq: %8ld, nsrv: %8ld, nreply: %8ld, speed %8ld\n", st.nrequest_.load(), st.nserved_.load(), cur, cur - last); |
| | | last = cur; |
| | | } |
| | | }; |
// Calls BHHeartBeatEasy once per second while *run is true.
| | | auto hb = [](std::atomic<bool> *run) { |
| | | while (*run) { |
| | | BHHeartBeatEasy(0); |
| | | std::this_thread::sleep_for(1s); |
| | | } |
| | | }; |
| | | std::atomic<bool> run(true); |
| | | ThreadManager threads; |
| | | boost::timer::auto_cpu_timer timer; |
| | | threads.Launch(hb, &run); |
| | | // threads.Launch(showStatus, &run); |
| | | int ncli = 10; |
| | | const uint64_t nreq = 1000 * 100; |
| | | for (int i = 0; i < ncli; ++i) { |
| | | threads.Launch(asyncRequest, nreq); |
| | | } |
| | | |
// Poll the reply counter once per second. Stop when it reaches nreq * ncli or
// when it has not changed for three consecutive polls.
| | | int same = 0; |
| | | int64_t last = 0; |
| | | while (last < nreq * ncli && same < 3) { |
| | | Sleep(1s); |
| | | auto cur = Status().nreply_.load(); |
| | | if (last == cur) { |
| | | ++same; |
| | | } else { |
| | | last = cur; |
| | | same = 0; |
| | | } |
| | | } |
| | | |
| | | run = false; |
| | | threads.WaitAll(); |
| | | auto &st = Status(); |
| | | printf("nreq: %8ld, nsrv: %8ld, nreply: %8ld\n", st.nrequest_.load(), st.nserved_.load(), st.nreply_.load()); |
| | | } |
| | |
| | | BHCenter center(shm); |
| | | center.Start(); |
| | | |
| | | std::this_thread::sleep_for(100ms); |
| | | Sleep(100ms); |
| | | |
| | | std::atomic<uint64_t> total_count(0); |
| | | std::atomic<ptime> last_time(Now() - seconds(1)); |
| | |
| | | for (auto &t : topics) { |
| | | tlist.add_topic_list(t); |
| | | } |
| | | bool r = client.Subscribe(tlist, timeout); |
| | | MsgCommonReply reply_body; |
| | | bool r = client.Subscribe(tlist, reply_body, timeout); |
| | | if (!r) { |
| | | printf("client subscribe failed.\n"); |
| | | } |
| | |
| | | MsgPublish pub; |
| | | pub.set_topic(topic); |
| | | pub.set_data(data); |
| | | bool r = provider.Publish(pub, timeout); |
| | | bool r = provider.Publish(pub, 0); |
| | | if (!r) { |
| | | static std::atomic<int> an(0); |
| | | int n = ++an; |
| | |
| | | part.push_back(topics[i]); |
| | | threads.Launch(Sub, i, topics); |
| | | } |
| | | std::this_thread::sleep_for(100ms); |
| | | Sleep(100ms); |
| | | for (auto &topic : topics) { |
| | | threads.Launch(Pub, topic); |
| | | } |
| | |
| | | |
| | | std::atomic<int> count(0); |
| | | std::string reply; |
| | | auto onRecv = [&](const std::string &proc_id, const MsgRequestTopicReply &msg) { |
| | | auto onRecv = [&](const BHMsgHead &head, const MsgRequestTopicReply &msg) { |
| | | reply = msg.data(); |
| | | if (++count >= nreq) { |
| | | printf("count: %d\n", count.load()); |
| | |
| | | MsgRequestTopic req; |
| | | req.set_topic(topic); |
| | | req.set_data("data " + std::to_string(i)); |
| | | if (!client.ClientAsyncRequest(req)) { |
| | | std::string msg_id; |
| | | if (!client.ClientAsyncRequest(req, msg_id)) { |
| | | printf("client request failed\n"); |
| | | ++count; |
| | | } |
| | |
| | | ThreadManager clients, servers; |
| | | std::vector<Topic> topics = {"topic1", "topic2"}; |
| | | servers.Launch(Server, "server", topics); |
| | | std::this_thread::sleep_for(100ms); |
| | | Sleep(100ms); |
| | | for (auto &t : topics) { |
| | | clients.Launch(Client, t, 1000 * 1); |
| | | clients.Launch(Client, t, 1000 * 100); |
| | | } |
| | | clients.WaitAll(); |
| | | printf("clients done, server replyed: %ld\n", server_msg_count.load()); |
| | |
| | | }; |
| | | Check(); |
| | | for (int i = 0; i < 3; ++i) { |
| | | std::this_thread::sleep_for(1s); |
| | | Sleep(1s); |
| | | Check(); |
| | | } |
| | | printf("sleep 4\n"); |
| | | std::this_thread::sleep_for(4s); |
| | | Sleep(4s); |
| | | for (int i = 0; i < 2; ++i) { |
| | | std::this_thread::sleep_for(1s); |
| | | Sleep(1s); |
| | | Check(); |
| | | } |
| | | } |
| | | printf("sleep 8\n"); |
| | | std::this_thread::sleep_for(8s); |
| | | Sleep(8s); |
| | | } |
| | | inline int MyMin(int a, int b) |
| | | { |
| | |
| | | |
| | | using namespace std::chrono_literals; |
| | | |
/// Prints the requested pause in whole milliseconds, then blocks the calling
/// thread for `d`.
///
/// @tparam D  a std::chrono::duration specialization
/// @param  d  how long to sleep; only the log line truncates it to milliseconds,
///            the sleep itself uses `d` unchanged.
template <class D>
inline void Sleep(D d)
{
    // milliseconds::rep is only specified as a signed integer of at least 45 bits,
    // so it is not necessarily `long`. Widen it explicitly to match %lld everywhere.
    const auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(d).count();
    printf("sleep for %lld ms\n", static_cast<long long>(ms));
    std::this_thread::sleep_for(d);
}
| | | |
| | | typedef std::function<void(void)> FuncVV; |
| | | |
| | | class ScopeCall : private boost::noncopyable |