join pub/sub to node; refactor.
| | |
| | | |
| | | // import "google/protobuf/descriptor.proto"; |
| | | import "bhome_msg_api.proto"; |
| | | import "error_msg.proto"; |
| | | |
| | | package bhome.msg; |
| | | |
| | |
| | | bytes topic = 6; // for request route |
| | | } |
| | | |
| | | message BHMsg { // deprecated |
| | | bytes msg_id = 1; |
| | | int64 timestamp = 2; |
| | | int32 type = 3; |
| | | repeated BHAddress route = 4; // for reply and proxy. |
| | | bytes body = 5; |
| | | message MsgRequest { |
| | | MsgType type = 1; |
| | | // oneof body; |
| | | } |
| | | |
| | | message MsgReply { |
| | | ErrorMsg err_msg = 1; |
| | | // oneof reply |
| | | } |
| | | |
| | | message BHMsgBody { |
| | | oneof reqrep { |
| | | MsgRequest request = 1; |
| | | MsgReply reply = 2; |
| | | } |
| | | } |
| | | |
| | | enum MsgType { |
| | |
| | | auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) { |
| | | return [&](auto &&rep_body) { |
| | | auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id())); |
| | | bool r = socket.Send(head.route(0).mq_id().data(), reply_head, rep_body, 10); |
| | | bool r = socket.Send(head.route(0).mq_id().data(), reply_head, rep_body, 100); |
| | | if (!r) { |
| | | printf("send reply failed.\n"); |
| | | } |
| | |
| | | MsgPublish pub; |
| | | NodeCenter::Clients clients; |
| | | MsgCommonReply reply; |
| | | MsgI pubmsg; |
| | | if (head.route_size() != 1 || !msg.ParseBody(pub)) { |
| | | return; |
| | | } else if (!center->FindClients(head, pub, clients, reply)) { |
| | | // send error reply. |
| | | MakeReplyer(socket, head, center->id())(reply); |
| | | } else if (pubmsg.MakeRC(socket.shm(), msg)) { |
| | | DEFER1(pubmsg.Release(socket.shm())); |
| | | } else { |
| | | MakeReplyer(socket, head, center->id())(MakeReply(eSuccess)); |
| | | if (!msg.EnableRefCount(socket.shm())) { return; } // no memory? |
| | | |
| | | for (auto &cli : clients) { |
| | | auto node = cli.weak_node_.lock(); |
| | | if (node) { |
| | | socket.Send(cli.mq_.data(), pubmsg, 10); |
| | | if (!socket.Send(cli.mq_.data(), msg, 100)) { |
| | | printf("center route publish failed. need resend.\n"); |
| | | } |
| | | } |
| | | } |
| | | } |
| | |
| | | return true; |
| | | } |
| | | |
| | | bool MsgI::EnableRefCount(SharedMemory &shm) |
| | | { |
| | | if (!IsCounted()) { |
| | | count_ = shm.New<RefCount>(); |
| | | } |
| | | return IsCounted(); |
| | | } |
| | | |
| | | int MsgI::Release(SharedMemory &shm) |
| | | { |
| | | if (IsCounted()) { |
| | |
| | | bool IsCounted() const { return static_cast<bool>(count_); } |
| | | |
| | | template <class Body> |
| | | bool Make(SharedMemory &shm, const BHMsgHead &head, const Body &body) |
| | | { |
| | | return Make(shm, Pack(shm, head, body)); |
| | | } |
| | | template <class Body> |
| | | bool MakeRC(SharedMemory &shm, const BHMsgHead &head, const Body &body) |
| | | inline bool MakeRC(SharedMemory &shm, const BHMsgHead &head, const Body &body) |
| | | { |
| | | return MakeRC(shm, Pack(shm, head, body)); |
| | | } |
| | | bool MakeRC(SharedMemory &shm, MsgI &a) |
| | | |
| | | bool EnableRefCount(SharedMemory &shm); |
| | | |
| | | template <class Body> |
| | | inline bool Make(SharedMemory &shm, const BHMsgHead &head, const Body &body) |
| | | { |
| | | if (a.IsCounted()) { |
| | | *this = a; |
| | | AddRef(); |
| | | return true; |
| | | } else { |
| | | void *p = a.ptr_.get(); |
| | | a.ptr_ = 0; |
| | | return MakeRC(shm, p); |
| | | } |
| | | void *p = Pack(shm, head, body); |
| | | auto NeedRefCount = [&]() { return head.type() == kMsgTypePublish; }; |
| | | return NeedRefCount() ? MakeRC(shm, p) : Make(shm, p); |
| | | } |
| | | |
| | | bool ParseHead(BHMsgHead &head) const; |
| | | template <class Body> |
| | | bool ParseBody(Body &body) const |
| | |
| | | BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const std::string &msgid); |
| | | BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id); |
| | | // inline void AddRoute(BHMsgHead &head, const MQId &id) { head.add_route()->set_mq_id(&id, sizeof(id)); } |
| | | |
| | | inline bool IsSuccess(const ErrorCode ec) { return ec == eSuccess; } |
| | | #endif // end of include guard: PROTO_UA9UWKL1 |
| | |
| | | static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend); |
| | | static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms); |
| | | |
| | | template <class... Extra> |
| | | bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms, Extra const &...extra) |
| | | { |
| | | return Send(shm(), remote_id, msg, timeout_ms, extra...); |
| | | } |
| | | template <class Body, class... Extra> |
| | | bool Send(const MQId &remote_id, const BHMsgHead &head, const Body &body, const int timeout_ms, Extra const &...extra) |
| | | { |
| | | MsgI msg; |
| | | if (msg.Make(shm(), head, body)) { |
| | | if (Send(shm(), remote_id, msg, timeout_ms, extra...)) { |
| | | return true; |
| | | } else { |
| | | msg.Release(shm()); |
| | | } |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | template <class... Rest> |
| | | bool Send(const MQId &remote_id, Rest const &...rest) { return Send(shm(), remote_id, rest...); } |
| | | size_t Pending() const { return data()->size(); } |
| | | }; |
| | | |
| | |
| | | |
| | | bool ShmSocket::Start(int nworker, const RecvCB &onData, const IdleCB &onIdle) |
| | | { |
| | | auto onRecv = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) { |
| | | auto Find = [&](RecvCB &cb) { |
| | | std::lock_guard<std::mutex> lock(mutex()); |
| | | const std::string &msgid = head.msg_id(); |
| | | auto pos = async_cbs_.find(msgid); |
| | | if (pos != async_cbs_.end()) { |
| | | cb.swap(pos->second); |
| | | async_cbs_.erase(pos); |
| | | return true; |
| | | } else { |
| | | return false; |
| | | } |
| | | }; |
| | | |
| | | auto onRecvWithPerMsgCB = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) { |
| | | RecvCB cb; |
| | | if (Find(cb)) { |
| | | if (async_cbs_->Find(head.msg_id(), cb)) { |
| | | cb(socket, imsg, head); |
| | | } else if (onData) { |
| | | onData(socket, imsg, head); |
| | | } // else ignored, or dropped |
| | | }; |
| | | |
| | | std::lock_guard<std::mutex> lock(mutex_); |
| | | StopNoLock(); |
| | | auto RecvProc = [this, onRecv, onIdle]() { |
| | | while (run_) { |
| | | try { |
| | | MsgI imsg; |
| | | if (mq().Recv(imsg, 10)) { |
| | | DEFER1(imsg.Release(shm())); |
| | | BHMsgHead head; |
| | | if (imsg.ParseHead(head)) { |
| | | onRecv(*this, imsg, head); |
| | | } |
| | | } else if (onIdle) { |
| | | onIdle(*this); |
| | | auto recvLoopBody = [this, onRecvWithPerMsgCB, onIdle]() { |
| | | try { |
| | | MsgI imsg; |
| | | if (mq().Recv(imsg, 10)) { |
| | | DEFER1(imsg.Release(shm())); |
| | | BHMsgHead head; |
| | | if (imsg.ParseHead(head)) { |
| | | onRecvWithPerMsgCB(*this, imsg, head); |
| | | } |
| | | } catch (...) { |
| | | } else if (onIdle) { |
| | | onIdle(*this); |
| | | } |
| | | } catch (...) { |
| | | } |
| | | }; |
| | | |
| | | std::lock_guard<std::mutex> lock(mutex_); |
| | | StopNoLock(); |
| | | |
| | | run_.store(true); |
| | | for (int i = 0; i < nworker; ++i) { |
| | | workers_.emplace_back(RecvProc); |
| | | workers_.emplace_back([this, recvLoopBody]() { while (run_) { recvLoopBody(); } }); |
| | | } |
| | | return true; |
| | | } |
| | |
| | | #ifndef SOCKET_GWTJHBPO |
| | | #define SOCKET_GWTJHBPO |
| | | |
| | | #include "bh_util.h" |
| | | #include "defs.h" |
| | | #include "shm_queue.h" |
| | | #include <atomic> |
| | |
| | | |
| | | class ShmSocket : private boost::noncopyable |
| | | { |
| | | template <class DoSend> |
| | | inline bool SendImpl(MsgI &msg, const int timeout_ms, const DoSend &doSend) |
| | | { |
| | | bool r = false; |
| | | DEFER1(if (msg.IsCounted() || !r) { msg.Release(shm()); }); |
| | | r = doSend(msg); |
| | | return r; |
| | | } |
| | | |
| | | protected: |
| | | typedef bhome_shm::ShmMsgQueue Queue; |
| | | |
| | |
| | | bool Stop(); |
| | | size_t Pending() const { return mq().Pending(); } |
| | | |
| | | bool Send(const void *id, const MsgI &imsg, const int timeout_ms) |
| | | bool Send(const void *valid_remote, const MsgI &imsg, const int timeout_ms) |
| | | { |
| | | return mq().Send(*static_cast<const MQId *>(id), imsg, timeout_ms); |
| | | assert(valid_remote); |
| | | return mq().Send(*static_cast<const MQId *>(valid_remote), imsg, timeout_ms); |
| | | } |
| | | //TODO reimplment, using async. |
| | | bool SyncRecv(MsgI &msg, bhome::msg::BHMsgHead &head, const int timeout_ms); |
| | | |
| | | template <class Body> |
| | | bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms, const RecvCB &cb = RecvCB()) |
| | | bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms, const RecvCB &cb) |
| | | { |
| | | assert(valid_remote); |
| | | try { |
| | | if (cb) { |
| | | auto RegisterCB = [&]() { |
| | | std::lock_guard<std::mutex> lock(mutex()); |
| | | async_cbs_.emplace(head.msg_id(), cb); |
| | | }; |
| | | return mq().Send(*static_cast<const MQId *>(valid_remote), head, body, timeout_ms, RegisterCB); |
| | | } else { |
| | | return mq().Send(*static_cast<const MQId *>(valid_remote), head, body, timeout_ms); |
| | | } |
| | | } catch (...) { |
| | | return false; |
| | | } |
| | | auto DoSend = [&](MsgI &msg) { return mq().Send(*static_cast<const MQId *>(valid_remote), msg, timeout_ms, [&]() { async_cbs_->Add(head.msg_id(), cb); }); }; |
| | | MsgI msg; |
| | | return msg.Make(shm(), head, body) && SendImpl(msg, timeout_ms, DoSend); |
| | | } |
| | | |
| | | template <class Body> |
| | | bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms) |
| | | { |
| | | auto DoSend = [&](MsgI &msg) { return mq().Send(*static_cast<const MQId *>(valid_remote), msg, timeout_ms); }; |
| | | MsgI msg; |
| | | return msg.Make(shm(), head, body) && SendImpl(msg, timeout_ms, DoSend); |
| | | } |
| | | |
| | | template <class Body> |
| | |
| | | std::atomic<bool> run_; |
| | | |
| | | Queue mq_; |
| | | std::unordered_map<std::string, RecvCB> async_cbs_; |
| | | class AsyncCBs |
| | | { |
| | | std::unordered_map<std::string, RecvCB> store_; |
| | | |
| | | public: |
| | | bool Add(const std::string &id, const RecvCB &cb) { return store_.emplace(id, cb).second; } |
| | | bool Find(const std::string &id, RecvCB &cb) |
| | | { |
| | | auto pos = store_.find(id); |
| | | if (pos != store_.end()) { |
| | | cb.swap(pos->second); |
| | | store_.erase(pos); |
| | | return true; |
| | | } else { |
| | | return false; |
| | | } |
| | | } |
| | | }; |
| | | |
| | | Synced<AsyncCBs> async_cbs_; |
| | | }; |
| | | |
| | | #endif // end of include guard: SOCKET_GWTJHBPO |
| | |
| | | shm_(shm), sock_node_(shm), sock_request_(shm), sock_reply_(shm), sock_sub_(shm) |
| | | { |
| | | SockNode().Start(); |
| | | SockClient().Start(); |
| | | SockServer().Start(); |
| | | } |
| | | |
| | | TopicNode::~TopicNode() |
| | | { |
| | | StopAll(); |
| | | SockNode().Stop(); |
| | | } |
| | | |
| | | void TopicNode::StopAll() |
| | | { |
| | | ServerStop(); |
| | | ClientStopWorker(); |
| | | SockServer().Stop(); |
| | | SockClient().Stop(); |
| | | SockNode().Stop(); |
| | | } |
| | | |
| | | bool TopicNode::Register(const MsgRegister &body, MsgCommonReply &reply_body, const int timeout_ms) |
| | | { |
| | | auto &sock = SockNode(); |
| | | |
| | | auto head(InitMsgHead(GetType(body), body.proc().proc_id())); |
| | | AddRoute(head, SockNode().id()); |
| | | AddRoute(head, sock.id()); |
| | | |
| | | MsgI reply; |
| | | DEFER1(reply.Release(shm_);); |
| | | BHMsgHead reply_head; |
| | | bool r = SockNode().SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); |
| | | r = r && reply_head.type() == kMsgTypeCommonReply; |
| | | r = r && reply.ParseBody(reply_body); |
| | | bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); |
| | | r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body); |
| | | if (r) { |
| | | info_ = body; |
| | | } |
| | |
| | | bool TopicNode::RegisterRPC(const MsgRegisterRPC &body, MsgCommonReply &reply_body, const int timeout_ms) |
| | | { |
| | | //TODO check registered |
| | | auto &sock = SockServer(); |
| | | |
| | | auto head(InitMsgHead(GetType(body), proc_id())); |
| | | AddRoute(head, SockReply().id()); |
| | | AddRoute(head, sock.id()); |
| | | |
| | | MsgI reply; |
| | | DEFER1(reply.Release(shm_);); |
| | | BHMsgHead reply_head; |
| | | bool r = SockReply().SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); |
| | | bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); |
| | | r = r && reply_head.type() == kMsgTypeCommonReply; |
| | | r = r && reply.ParseBody(reply_body); |
| | | return r; |
| | |
| | | onIdle(sock); |
| | | }; |
| | | |
| | | return rcb && SockReply().Start(onRecv, onIdle, nworker); |
| | | auto &sock = SockServer(); |
| | | return rcb && sock.Start(onRecv, onIdle, nworker); |
| | | } |
| | | bool TopicNode::ServerStop() { return SockReply().Stop(); } |
| | | |
| | | bool TopicNode::ServerRecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms) |
| | | { |
| | | auto &sock = SockServer(); |
| | | |
| | | MsgI imsg; |
| | | BHMsgHead head; |
| | | if (SockReply().SyncRecv(imsg, head, timeout_ms) && head.type() == kMsgTypeRequestTopic) { |
| | | if (sock.SyncRecv(imsg, head, timeout_ms) && head.type() == kMsgTypeRequestTopic) { |
| | | MsgRequestTopic request; |
| | | if (imsg.ParseBody(request)) { |
| | | request.mutable_topic()->swap(topic); |
| | |
| | | |
| | | bool TopicNode::ServerSendReply(void *src_info, const std::string &data, const int timeout_ms) |
| | | { |
| | | auto &sock = SockServer(); |
| | | |
| | | SrcInfo *p = static_cast<SrcInfo *>(src_info); |
| | | DEFER1(delete p); |
| | | if (!p || p->route.empty()) { |
| | |
| | | head.add_route()->Swap(&p->route[i]); |
| | | } |
| | | |
| | | return SockReply().Send(p->route.back().mq_id().data(), head, body, timeout_ms); |
| | | return sock.Send(p->route.back().mq_id().data(), head, body, timeout_ms); |
| | | } |
| | | |
| | | bool TopicNode::ClientStartWorker(RequestResultCB const &cb, const int nworker) |
| | |
| | | |
| | | return SockRequest().Start(onData, nworker); |
| | | } |
| | | bool TopicNode::ClientStopWorker() { return SockRequest().Stop(); } |
| | | |
| | | bool TopicNode::ClientAsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb) |
| | | { |
| | | auto Call = [&](const void *remote) { |
| | | auto &sock = SockRequest(); |
| | | |
| | | MsgRequestTopic req; |
| | | req.set_topic(topic); |
| | | req.set_data(data, size); |
| | |
| | | { |
| | | try { |
| | | auto &sock = SockRequest(); |
| | | |
| | | BHAddress addr; |
| | | if (ClientQueryRPCTopic(topic, addr, timeout_ms)) { |
| | | |
| | |
| | | bool TopicNode::ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms) |
| | | { |
| | | auto &sock = SockRequest(); |
| | | |
| | | if (topic_query_cache_.Find(topic, addr)) { |
| | | return true; |
| | | } |
| | |
| | | } else { |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | // publish |
| | | |
| | | bool TopicNode::Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms) |
| | | { |
| | | try { |
| | | auto &sock = SockPub(); |
| | | |
| | | MsgPublish pub; |
| | | pub.set_topic(topic); |
| | | pub.set_data(data, size); |
| | | BHMsgHead head(InitMsgHead(GetType(pub), proc_id())); |
| | | AddRoute(head, sock.id()); |
| | | |
| | | MsgI reply; |
| | | DEFER1(reply.Release(shm());); |
| | | BHMsgHead reply_head; |
| | | MsgCommonReply reply_body; |
| | | return sock.SendAndRecv(&BHTopicBusAddress(), head, pub, reply, reply_head, timeout_ms) && |
| | | reply_head.type() == kMsgTypeCommonReply && |
| | | reply.ParseBody(reply_body) && |
| | | IsSuccess(reply_body.errmsg().errcode()); |
| | | } catch (...) { |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | // subscribe |
| | | |
| | | bool TopicNode::Subscribe(const std::vector<Topic> &topics, const int timeout_ms) |
| | | { |
| | | try { |
| | | auto &sock = SockSub(); |
| | | MsgSubscribe sub; |
| | | for (auto &topic : topics) { |
| | | sub.add_topics(topic); |
| | | } |
| | | BHMsgHead head(InitMsgHead(GetType(sub), proc_id())); |
| | | AddRoute(head, sock.id()); |
| | | |
| | | return sock.Send(&BHTopicBusAddress(), head, sub, timeout_ms); |
| | | } catch (...) { |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | bool TopicNode::SubscribeStartWorker(const TopicDataCB &tdcb, int nworker) |
| | | { |
| | | auto &sock = SockSub(); |
| | | |
| | | auto AsyncRecvProc = [this, tdcb](ShmSocket &, MsgI &imsg, BHMsgHead &head) { |
| | | if (head.type() == kMsgTypePublish) { |
| | | MsgPublish pub; |
| | | if (imsg.ParseBody(pub)) { |
| | | tdcb(head.proc_id(), pub.topic(), pub.data()); |
| | | } |
| | | } else { |
| | | // ignored, or dropped |
| | | } |
| | | }; |
| | | |
| | | return tdcb && sock.Start(AsyncRecvProc, nworker); |
| | | } |
| | | |
| | | bool TopicNode::RecvSub(std::string &proc_id, Topic &topic, std::string &data, const int timeout_ms) |
| | | { |
| | | auto &sock = SockSub(); |
| | | MsgI msg; |
| | | DEFER1(msg.Release(shm());); |
| | | BHMsgHead head; |
| | | if (sock.SyncRecv(msg, head, timeout_ms) && head.type() == kMsgTypePublish) { |
| | | MsgPublish pub; |
| | | if (msg.ParseBody(pub)) { |
| | | head.mutable_proc_id()->swap(proc_id); |
| | | pub.mutable_topic()->swap(topic); |
| | | pub.mutable_data()->swap(data); |
| | | return true; |
| | | } |
| | | } |
| | | return false; |
| | | } |
| | |
| | | #define TOPIC_NODE_YVKWA6TF |
| | | |
| | | #include "msg.h" |
| | | #include "pubsub.h" |
| | | #include "socket.h" |
| | | #include <memory> |
| | | |
| | |
| | | SharedMemory &shm_; |
| | | MsgRegister info_; |
| | | |
| | | SharedMemory &shm() { return shm_; } |
| | | |
| | | public: |
| | | TopicNode(SharedMemory &shm); |
| | | ~TopicNode(); |
| | | |
| | | void StopAll(); |
| | | // topic node |
| | | bool Register(const MsgRegister &body, MsgCommonReply &reply, const int timeout_ms); |
| | | bool RegisterRPC(const MsgRegisterRPC &body, MsgCommonReply &reply, const int timeout_ms); |
| | | |
| | | // topic rpc server |
| | | typedef std::function<bool(const std::string &topic, const std::string &data, std::string &reply)> OnRequest; |
| | | bool ServerStart(OnRequest const &cb, const int nworker = 2); |
| | | bool ServerStop(); |
| | | bool ServerRecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms); |
| | | bool ServerSendReply(void *src_info, const std::string &data, const int timeout_ms); |
| | | |
| | | // topic client |
| | | typedef std::function<void(const std::string &data)> RequestResultCB; |
| | | bool ClientStartWorker(RequestResultCB const &cb, const int nworker = 2); |
| | | bool ClientStopWorker(); |
| | | bool ClientAsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb = RequestResultCB()); |
| | | bool ClientAsyncRequest(const Topic &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb = RequestResultCB()) |
| | | { |
| | |
| | | return ClientSyncRequest(topic, data.data(), data.size(), out, timeout_ms); |
| | | } |
| | | |
| | | void StopAll(); |
| | | // publish |
| | | bool Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms); |
| | | |
| | | // subscribe |
| | | typedef std::function<void(const std::string &proc_id, const Topic &topic, const std::string &data)> TopicDataCB; |
| | | bool SubscribeStartWorker(const TopicDataCB &tdcb, int nworker = 2); |
| | | bool Subscribe(const std::vector<Topic> &topics, const int timeout_ms); |
| | | bool RecvSub(std::string &proc_id, Topic &topic, std::string &data, const int timeout_ms); |
| | | |
| | | private: |
| | | bool ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms); |
| | |
| | | // some sockets may be the same one, using functions make it easy to change. |
| | | |
| | | auto &SockNode() { return sock_node_; } |
| | | auto &SockPub() { return SockNode(); } |
| | | auto &SockSub() { return sock_sub_; } |
| | | auto &SockRequest() { return sock_request_; } |
| | | auto &SockClient() { return SockRequest(); } |
| | | auto &SockReply() { return sock_reply_; } |
| | | auto &SockServer() { return SockReply(); } |
| | | |
| | | ShmSocket sock_node_; |
| | | ShmSocket sock_request_; |
| | | ShmSocket sock_reply_; |
| | | SocketSubscribe sock_sub_; |
| | | ShmSocket sock_sub_; |
| | | |
| | | TopicQueryCache topic_query_cache_; |
| | | }; |
| | |
| | | body.set_data(str); |
| | | auto head(InitMsgHead(GetType(body), proc_id)); |
| | | msg.MakeRC(shm, head, body); |
| | | assert(msg.IsCounted()); |
| | | DEFER1(msg.Release(shm);); |
| | | |
| | | for (uint64_t i = 0; i < n; ++i) { |
| | |
| | | SharedMemory shm(shm_name, 1024 * 1024 * 50); |
| | | auto Avail = [&]() { return shm.get_free_memory(); }; |
| | | auto init_avail = Avail(); |
| | | ShmMsgQueue srv(shm, qlen); |
| | | ShmMsgQueue cli(shm, qlen); |
| | | ShmSocket srv(shm, qlen); |
| | | ShmSocket cli(shm, qlen); |
| | | |
| | | MsgI request_rc; |
| | | MsgRequestTopic req_body; |
| | |
| | | req_body.set_topic("topic"); |
| | | req_body.set_data(msg_content); |
| | | auto req_head(InitMsgHead(GetType(req_body), client_proc_id)); |
| | | return cli.Send(srv.Id(), req_head, req_body, 100); |
| | | return cli.Send(&srv.id(), req_head, req_body, 100); |
| | | }; |
| | | auto ReqRC = [&]() { return cli.Send(srv.Id(), request_rc, 1000); }; |
| | | auto ReqRC = [&]() { return cli.Send(&srv.id(), request_rc, 1000); }; |
| | | |
| | | if (!ReqRC()) { |
| | | printf("********** client send error.\n"); |
| | |
| | | } |
| | | MsgI msg; |
| | | BHMsgHead head; |
| | | if (!cli.Recv(msg, 1000)) { |
| | | if (!cli.SyncRecv(msg, head, 1000)) { |
| | | printf("********** client recv error.\n"); |
| | | } else { |
| | | DEFER1(msg.Release(shm)); |
| | |
| | | BHMsgHead req_head; |
| | | |
| | | while (!stop) { |
| | | if (srv.Recv(req, 100)) { |
| | | if (srv.SyncRecv(req, req_head, 100)) { |
| | | DEFER1(req.Release(shm)); |
| | | |
| | | if (req.ParseHead(req_head) && req_head.type() == kMsgTypeRequestTopic) { |
| | | auto &mqid = req_head.route()[0].mq_id(); |
| | | MQId src_id; |
| | |
| | | reply_body.set_topic("topic"); |
| | | reply_body.set_data(msg_content); |
| | | auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, req_head.msg_id())); |
| | | return srv.Send(src_id, reply_head, reply_body, 100); |
| | | return srv.Send(&src_id, reply_head, reply_body, 100); |
| | | }; |
| | | auto ReplyRC = [&]() { return srv.Send(src_id, reply_rc, 100); }; |
| | | auto ReplyRC = [&]() { return srv.Send(&src_id, reply_rc, 100); }; |
| | | |
| | | if (ReplyRC()) { |
| | | } |
| | |
| | | #include "center.h" |
| | | #include "defs.h" |
| | | #include "pubsub.h" |
| | | #include "socket.h" |
| | | #include "topic_node.h" |
| | | #include "util.h" |
| | | #include <atomic> |
| | | #include <boost/uuid/uuid_generators.hpp> |
| | |
| | | const uint64_t nmsg = 100 * 2; |
| | | const int timeout = 1000; |
| | | auto Sub = [&](int id, const std::vector<std::string> &topics) { |
| | | SocketSubscribe client(shm); |
| | | bool r = client.Subscribe(sub_proc_id, topics, timeout); |
| | | DemoNode client("client_" + std::to_string(id), shm); |
| | | |
| | | bool r = client.Subscribe(topics, timeout); |
| | | if (!r) { |
| | | printf("client subscribe failed.\n"); |
| | | } |
| | | std::mutex mutex; |
| | | std::condition_variable cv; |
| | | |
| | |
| | | } |
| | | // printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str()); |
| | | }; |
| | | client.StartRecv(OnTopicData, 1); |
| | | client.SubscribeStartWorker(OnTopicData, 1); |
| | | |
| | | std::unique_lock<std::mutex> lk(mutex); |
| | | cv.wait(lk); |
| | | }; |
| | | |
| | | auto Pub = [&](const std::string &topic) { |
| | | SocketPublish provider(shm); |
| | | DemoNode provider("server_" + topic, shm); |
| | | |
| | | for (unsigned i = 0; i < nmsg; ++i) { |
| | | std::string data = topic + std::to_string(i) + std::string(1000, '-'); |
| | | |
| | | bool r = provider.Publish(pub_proc_id, topic, data.data(), data.size(), timeout); |
| | | bool r = provider.Publish(topic, data.data(), data.size(), timeout); |
| | | if (!r) { |
| | | printf("pub ret: %s\n", r ? "ok" : "fail"); |
| | | } |
| | |
| | | std::atomic<bool> run(true); |
| | | |
| | | auto Client = [&](const std::string &topic, const int nreq) { |
| | | TopicNode client(shm); |
| | | MsgRegister reg; |
| | | reg.mutable_proc()->set_proc_id(client_proc_id + topic); |
| | | MsgCommonReply reply_body; |
| | | |
| | | if (!client.Register(reg, reply_body, 1000)) { |
| | | printf("client register failed\n"); |
| | | return; |
| | | } |
| | | DemoNode client(client_proc_id + topic, shm); |
| | | |
| | | std::atomic<int> count(0); |
| | | std::string reply; |
| | |
| | | do { |
| | | std::this_thread::yield(); |
| | | } while (count.load() < nreq); |
| | | client.ClientStopWorker(); |
| | | client.StopAll(); |
| | | printf("request %s %d done ", topic.c_str(), count.load()); |
| | | }; |
| | | |
| | | std::atomic_uint64_t server_msg_count(0); |
| | | auto Server = [&](const std::string &name, const std::vector<std::string> &topics) { |
| | | TopicNode server(shm); |
| | | MsgRegister reg; |
| | | reg.mutable_proc()->set_proc_id(server_proc_id); |
| | | reg.mutable_proc()->set_name(name); |
| | | MsgCommonReply reply_body; |
| | | |
| | | if (!server.Register(reg, reply_body, 100)) { |
| | | printf("server register failed\n"); |
| | | return; |
| | | } |
| | | DemoNode server(name, shm); |
| | | |
| | | auto onData = [&](const std::string &topic, const std::string &data, std::string &reply) { |
| | | ++server_msg_count; |
| | |
| | | for (auto &topic : topics) { |
| | | rpc.add_topics(topic); |
| | | } |
| | | MsgCommonReply reply_body; |
| | | if (!server.RegisterRPC(rpc, reply_body, 100)) { |
| | | printf("server register topic failed\n"); |
| | | return; |
| | |
| | | clients.Launch(Client, t, 1000 * 1); |
| | | } |
| | | clients.WaitAll(); |
| | | printf("clients done, server replyed: %d\n", server_msg_count.load()); |
| | | printf("clients done, server replyed: %ld\n", server_msg_count.load()); |
| | | run = false; |
| | | servers.WaitAll(); |
| | | } |
| | |
| | | #define UTIL_W8A0OA5U |
| | | |
| | | #include "bh_util.h" |
| | | #include "msg.h" |
| | | #include "shm.h" |
| | | #include "shm_queue.h" |
| | | #include "topic_node.h" |
| | | #include <boost/date_time/posix_time/posix_time.hpp> |
| | | #include <boost/noncopyable.hpp> |
| | | #include <boost/test/unit_test.hpp> |
| | |
| | | ~ShmRemover() { SharedMemory::Remove(name_); } |
| | | }; |
| | | |
| | | class DemoNode : public TopicNode |
| | | { |
| | | std::string id_; |
| | | |
| | | public: |
| | | DemoNode(const std::string &id, SharedMemory &shm) : |
| | | TopicNode(shm), id_(id) { Init(); } |
| | | void Init() |
| | | { |
| | | MsgRegister reg; |
| | | reg.mutable_proc()->set_proc_id(id_); |
| | | MsgCommonReply reply_body; |
| | | |
| | | if (!Register(reg, reply_body, 1000)) { |
| | | printf("node %s register failed\n", id_.c_str()); |
| | | } |
| | | } |
| | | }; |
| | | |
| | | #endif // end of include guard: UTIL_W8A0OA5U |