add api code, use proto msg as data.
| | |
| | | "program": "${workspaceFolder}/debug/bin/utest", |
| | | "args": [ |
| | | "-t", |
| | | "HeartbeatTest" |
| | | "ApiTest" |
| | | ], |
| | | "stopAtEntry": false, |
| | | "cwd": "${workspaceFolder}", |
| | |
| | | "build/": true, |
| | | "debug/": true |
| | | }, |
| | | "cmake.configureOnOpen": false |
| | | "cmake.configureOnOpen": false, |
| | | "C_Cpp.default.includePath": [ |
| | | "build/proto" |
| | | ] |
| | | } |
New file |
| | |
#include "bh_api.h"
#include "defs.h"
#include "topic_node.h"
#include <climits>
#include <cstdlib>
#include <cstring>
#include <functional>
#include <memory>
#include <string>
| | | |
| | | using namespace bhome_shm; |
| | | using namespace bhome_msg; |
| | | |
| | | namespace |
| | | { |
| | | TopicNode &ProcNode() |
| | | { |
| | | static TopicNode node(BHomeShm()); |
| | | return node; |
| | | } |
| | | |
/**
 * Owning handle for a malloc()-allocated buffer.
 *
 * The buffer is free()d by the destructor unless ownership has been handed
 * out through release(). size() keeps reporting the requested size even
 * after release(), so callers may release first and read the size afterwards.
 * The class is non-copyable.
 */
class TmpPtr
{
	void *ptr_ = nullptr;
	size_t size_ = 0;

public:
	/// Allocates `size` bytes. The handle tests false if the allocation failed.
	explicit TmpPtr(const size_t size) :
	    // Always request at least one byte: malloc(0) is allowed to return
	    // NULL, which would make an empty payload look like out-of-memory.
	    ptr_(malloc(size ? size : 1)),
	    size_(size)
	{
	}

	/// Allocates str.size() bytes and copies the string's bytes (no terminator).
	explicit TmpPtr(const std::string &str) :
	    TmpPtr(str.size())
	{
		if (ptr_ && !str.empty()) {
			memcpy(ptr_, str.data(), str.size());
		}
	}

	TmpPtr(const TmpPtr &) = delete;
	TmpPtr &operator=(const TmpPtr &) = delete;

	~TmpPtr() { free(ptr_); }

	/// Borrowed pointer to the buffer; null after release() or failed allocation.
	void *get() const { return ptr_; }

	/// Gives up ownership: the caller becomes responsible for free()ing the result.
	void *release()
	{
		void *tmp = ptr_;
		ptr_ = nullptr;
		return tmp;
	}

	/// Size requested at construction (unchanged by release()).
	size_t size() const { return size_; }

	/// True while a buffer is owned.
	explicit operator bool() const { return ptr_ != nullptr; }
};
| | | |
| | | template <class Msg> |
| | | bool PackOutput(const Msg &msg, void **out, int *out_len) |
| | | { |
| | | auto size = msg.ByteSizeLong(); |
| | | TmpPtr p(size); |
| | | if (!p) { |
| | | SetLastError(ENOMEM, "not enough memory."); |
| | | return false; |
| | | } |
| | | msg.SerializePartialToArray(p.get(), size); |
| | | *out = p.release(); |
| | | *out_len = size; |
| | | return true; |
| | | } |
| | | |
| | | } // namespace |
| | | |
| | | bool BHRegister(const void *proc_info, |
| | | const int proc_info_len, |
| | | void **reply, |
| | | int *reply_len, |
| | | const int timeout_ms) |
| | | { |
| | | ProcInfo pi; |
| | | if (!pi.ParseFromArray(proc_info, proc_info_len)) { |
| | | SetLastError(eInvalidInput, "invalid input."); |
| | | return false; |
| | | } |
| | | MsgCommonReply msg_reply; |
| | | if (ProcNode().Register(pi, msg_reply, timeout_ms)) { |
| | | return PackOutput(msg_reply, reply, reply_len); |
| | | } else { |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | bool BHHeartBeatEasy(const int timeout_ms) |
| | | { |
| | | return ProcNode().Heartbeat(timeout_ms); |
| | | } |
| | | |
| | | bool BHHeartBeat(const void *proc_info, |
| | | const int proc_info_len, |
| | | void **reply, |
| | | int *reply_len, |
| | | const int timeout_ms) |
| | | { |
| | | ProcInfo pi; |
| | | if (!pi.ParseFromArray(proc_info, proc_info_len)) { |
| | | SetLastError(eInvalidInput, "invalid input."); |
| | | return false; |
| | | } |
| | | MsgCommonReply msg_reply; |
| | | if (ProcNode().Heartbeat(pi, msg_reply, timeout_ms)) { |
| | | return PackOutput(msg_reply, reply, reply_len); |
| | | } else { |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | bool BHPublish(const void *msgpub, |
| | | const int msgpub_len, |
| | | const int timeout_ms) |
| | | { |
| | | MsgPublish pub; |
| | | if (!pub.ParseFromArray(msgpub, msgpub_len)) { |
| | | SetLastError(eInvalidInput, "invalid input."); |
| | | return false; |
| | | } |
| | | return ProcNode().Publish(pub, timeout_ms); |
| | | } |
| | | |
| | | bool BHReadSub(void **proc_id, |
| | | int *proc_id_len, |
| | | void **msgpub, |
| | | int *msgpub_len, |
| | | const int timeout_ms) |
| | | { |
| | | std::string proc; |
| | | MsgPublish pub; |
| | | |
| | | if (ProcNode().RecvSub(proc, pub, timeout_ms)) { |
| | | TmpPtr pproc(proc); |
| | | if (pproc && PackOutput(pub, msgpub, msgpub_len)) { |
| | | *proc_id = pproc.release(); |
| | | *proc_id_len = pproc.size(); |
| | | } else { |
| | | SetLastError(ENOMEM, "out of mem"); |
| | | } |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | bool BHRequest(const void *request, |
| | | const int request_len, |
| | | void **proc_id, |
| | | int *proc_id_len, |
| | | void **reply, |
| | | int *reply_len, |
| | | const int timeout_ms) |
| | | { |
| | | MsgRequestTopic req; |
| | | if (!req.ParseFromArray(request, request_len)) { |
| | | SetLastError(eInvalidInput, "invalid input."); |
| | | return false; |
| | | } |
| | | std::string proc; |
| | | MsgRequestTopicReply out_msg; |
| | | if (ProcNode().ClientSyncRequest(req, proc, out_msg, timeout_ms)) { |
| | | TmpPtr pproc(proc); |
| | | if (pproc && PackOutput(out_msg, reply, reply_len)) { |
| | | *proc_id = pproc.release(); |
| | | *proc_id_len = pproc.size(); |
| | | } else { |
| | | SetLastError(ENOMEM, "out of mem"); |
| | | } |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | bool BHReadRequest(void **proc_id, |
| | | int *proc_id_len, |
| | | void **request, |
| | | int *request_len, |
| | | void **src, |
| | | const int timeout_ms) |
| | | { |
| | | void *src_info = 0; |
| | | std::string proc; |
| | | MsgRequestTopic out_msg; |
| | | if (ProcNode().ServerRecvRequest(src_info, proc, out_msg, timeout_ms)) { |
| | | TmpPtr pproc(proc); |
| | | if (pproc && PackOutput(out_msg, request, request_len)) { |
| | | *proc_id = pproc.release(); |
| | | *proc_id_len = pproc.size(); |
| | | *src = src_info; |
| | | } else { |
| | | SetLastError(ENOMEM, "out of mem"); |
| | | } |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | bool BHSendReply(void *src, |
| | | const void *reply, |
| | | const int reply_len, |
| | | const int timeout_ms) |
| | | { |
| | | MsgRequestTopicReply rep; |
| | | if (!rep.ParseFromArray(reply, reply_len)) { |
| | | SetLastError(eInvalidInput, "invalid input."); |
| | | return false; |
| | | } |
| | | return ProcNode().ServerSendReply(src, rep, timeout_ms); |
| | | } |
| | | |
/// Placeholder clean-up hook: performs no work and always returns 0.
int BHCleanUp()
{
	const int status = 0;
	return status;
}
| | | |
namespace
{
// Callable hidden behind a BHServerCallbackTag pointer: takes the serialized
// reply bytes and their length, and returns whether they were accepted.
using ServerSender = std::function<bool(const void *, const int)>;
} // namespace
| | | |
| | | void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb) |
| | | { |
| | | TopicNode::ServerCB on_req; |
| | | TopicNode::SubDataCB on_sub; |
| | | if (server_cb) { |
| | | on_req = [server_cb](const std::string &proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply) { |
| | | std::string sreq(request.SerializeAsString()); |
| | | bool r = false; |
| | | ServerSender sender = [&](const void *p, const int len) { |
| | | r = reply.ParseFromArray(p, len); |
| | | return r; |
| | | }; |
| | | server_cb(proc_id.data(), proc_id.size(), sreq.data(), sreq.size(), (BHServerCallbackTag *) (&sender)); |
| | | return r; |
| | | }; |
| | | } |
| | | if (sub_cb) { |
| | | on_sub = [sub_cb](const std::string &proc_id, const MsgPublish &pub) { |
| | | std::string s(pub.SerializeAsString()); |
| | | sub_cb(proc_id.data(), proc_id.size(), s.data(), s.size()); |
| | | }; |
| | | } |
| | | |
| | | ProcNode().Start(on_req, on_sub); |
| | | } |
| | | bool BHServerCallbackReply(const BHServerCallbackTag *tag, |
| | | const void *data, |
| | | const int data_len) |
| | | { |
| | | auto &sender = *(const ServerSender *) (tag); |
| | | return sender(data, data_len); |
| | | } |
| | | |
/// Releases a buffer with free().
/// @param data  buffer to release; passed straight to free().
/// @param size  not used by this implementation.
void BHFree(void *data, int size)
{
	(void) size;
	free(data);
}
| | | |
| | | int BHGetLastError(void **msg, int *msg_len) |
| | | { |
| | | int ec = 0; |
| | | if (msg && msg_len) { |
| | | std::string err_msg; |
| | | GetLastError(ec, err_msg); |
| | | TmpPtr p(err_msg); |
| | | if (p) { |
| | | *msg = p.release(); |
| | | *msg_len = p.size(); |
| | | } |
| | | } |
| | | return ec; |
| | | } |
| | | |
| | | #undef BH_SOCKET_MEMF_CALL |
New file |
| | |
#ifndef BH_API_WRAPPER_O81WKNXI
#define BH_API_WRAPPER_O81WKNXI

/*
 * C-callable API. Messages cross this boundary as serialized byte buffers.
 * Every buffer returned through a `void **` output is allocated with malloc()
 * by the implementation and must be released with BHFree().
 */

#ifndef __cplusplus
#include <stdbool.h> /* `bool` is not a built-in type in C */
#endif

#ifdef __cplusplus
extern "C" {
#endif

/* Opaque types. The typedefs let C code use the names without `struct`. */
typedef struct BHSrcInfo BHSrcInfo;
typedef struct BHServerCallbackTag BHServerCallbackTag;

/* Registers the process; `proc_info` is a serialized ProcInfo and `reply`
 * receives a serialized MsgCommonReply. */
bool BHRegister(const void *proc_info,
                const int proc_info_len,
                void **reply,
                int *reply_len,
                const int timeout_ms);

/* Called for each published message; `data` is a serialized MsgPublish. */
typedef void (*FSubDataCallback)(const void *proc_id,
                                 const int proc_id_len,
                                 const void *data,
                                 const int data_len);

/* Called for each incoming request; `data` is a serialized MsgRequestTopic.
 * Reply by passing `tag` to BHServerCallbackReply before returning. */
typedef void (*FServerCallback)(const void *proc_id,
                                const int proc_id_len,
                                const void *data,
                                const int data_len,
                                BHServerCallbackTag *tag);

void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb);
bool BHServerCallbackReply(const BHServerCallbackTag *tag,
                           const void *data,
                           const int data_len);

bool BHHeartBeatEasy(const int timeout_ms);
bool BHHeartBeat(const void *proc_info,
                 const int proc_info_len,
                 void **reply,
                 int *reply_len,
                 const int timeout_ms);

/* `msgpub` is a serialized MsgPublish. */
bool BHPublish(const void *msgpub,
               const int msgpub_len,
               const int timeout_ms);

/* `proc_id` and `msgpub` are outputs, matching the definition of BHReadSub. */
bool BHReadSub(void **proc_id,
               int *proc_id_len,
               void **msgpub,
               int *msgpub_len,
               const int timeout_ms);

bool BHRequest(const void *request,
               const int request_len,
               void **proc_id,
               int *proc_id_len,
               void **reply,
               int *reply_len,
               const int timeout_ms);

/* `src` receives an opaque handle to pass to BHSendReply; it is typed
 * `void *` to match the definitions of BHReadRequest and BHSendReply. */
bool BHReadRequest(void **proc_id,
                   int *proc_id_len,
                   void **request,
                   int *request_len,
                   void **src,
                   const int timeout_ms);

bool BHSendReply(void *src,
                 const void *reply,
                 const int reply_len,
                 const int timeout_ms);

// int BHCleanUp();

void BHFree(void *buf, int size);

/* `msg_len` is a pointer: references do not exist in C, and the definition
 * takes `int *`. */
int BHGetLastError(void **msg, int *msg_len);

#ifdef __cplusplus
}
#endif
#endif /* end of include guard: BH_API_WRAPPER_O81WKNXI */
| | |
| | | const MQId kBHTopicReqRepCenter = boost::uuids::string_generator()("12345670-89ab-cdef-8349-1234567890ff"); |
| | | const MQId kBHUniCenter = boost::uuids::string_generator()("87654321-89ab-cdef-8349-1234567890ff"); |
| | | |
// Error code plus message text, as written by SetLastError() and read by
// GetLastError(). A default-constructed value has code 0 and an empty message.
struct LastError {
	int ec_{0};
	std::string msg_{};
};
| | | |
| | | LastError &LastErrorStore() |
| | | { |
| | | thread_local LastError le; |
| | | return le; |
| | | } |
| | | |
| | | } // namespace |
| | | |
| | | const MQId &BHTopicBusAddress() { return kBHTopicBus; } |
| | | const MQId &BHTopicCenterAddress() { return kBHTopicReqRepCenter; } |
| | | const MQId &BHUniCenterAddress() { return kBHUniCenter; } |
| | | |
| | | void SetLastError(const int ec, const std::string &msg) |
| | | { |
| | | LastErrorStore().ec_ = ec; |
| | | LastErrorStore().msg_ = msg; |
| | | } |
| | | |
| | | void GetLastError(int &ec, std::string &msg) |
| | | { |
| | | ec = LastErrorStore().ec_; |
| | | msg = LastErrorStore().msg_; |
| | | } |
| | |
| | | |
| | | bhome_shm::SharedMemory &BHomeShm(); |
| | | typedef std::string Topic; |
| | | |
| | | void SetLastError(const int ec, const std::string &msg); |
| | | void GetLastError(int &ec, std::string &msg); |
| | | //TODO center can check shm for previous crash. |
| | | |
| | | #endif // end of include guard: DEFS_KP8LKGD0 |
| | |
| | | TopicNode::TopicNode(SharedMemory &shm) : |
| | | shm_(shm), sock_node_(shm), sock_request_(shm), sock_reply_(shm), sock_sub_(shm) |
| | | { |
| | | Start(); |
| | | SockNode().Start(); |
| | | } |
| | | |
| | | TopicNode::~TopicNode() |
| | |
| | | Stop(); |
| | | } |
| | | |
| | | void TopicNode::Start() |
| | | void TopicNode::Start(ServerCB const &server_cb, SubDataCB const &sub_cb) |
| | | { |
| | | SockNode().Start(); |
| | | SockClient().Start(); |
| | | SockServer().Start(); |
| | | ServerStart(server_cb, 1); |
| | | SubscribeStartWorker(sub_cb, 1); |
| | | // SockClient().Start(); |
| | | } |
| | | void TopicNode::Stop() |
| | | { |
| | | SockSub().Stop(); |
| | | SockServer().Stop(); |
| | | SockClient().Stop(); |
| | | SockNode().Stop(); |
| | |
| | | return r; |
| | | } |
| | | |
| | | bool TopicNode::ServerStart(const OnRequest &rcb, int nworker) |
| | | bool TopicNode::ServerStart(const ServerCB &rcb, int nworker) |
| | | { |
| | | //TODO check registered |
| | | |
| | | auto failed_q = std::make_shared<ServerFailedQ>(); |
| | | |
| | | auto onIdle = [failed_q](ShmSocket &socket) { failed_q->TrySend(socket); }; |
| | |
| | | if (head.type() == kMsgTypeRequestTopic && head.route_size() > 0) { |
| | | MsgRequestTopic req; |
| | | if (imsg.ParseBody(req)) { |
| | | std::string out; |
| | | if (rcb(req.topic(), req.data(), out)) { |
| | | MsgRequestTopicReply reply_body; |
| | | reply_body.set_data(std::move(out)); |
| | | MsgRequestTopicReply reply_body; |
| | | if (rcb(head.proc_id(), req, reply_body)) { |
| | | BHMsgHead reply_head(InitMsgHead(GetType(reply_body), proc_id(), head.msg_id())); |
| | | |
| | | for (int i = 0; i < head.route_size() - 1; ++i) { |
| | |
| | | return rcb && sock.Start(onRecv, onIdle, nworker); |
| | | } |
| | | |
| | | bool TopicNode::ServerRecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms) |
| | | bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms) |
| | | { |
| | | auto &sock = SockServer(); |
| | | |
| | | MsgI imsg; |
| | | BHMsgHead head; |
| | | if (sock.SyncRecv(imsg, head, timeout_ms) && head.type() == kMsgTypeRequestTopic) { |
| | | MsgRequestTopic request; |
| | | if (imsg.ParseBody(request)) { |
| | | request.mutable_topic()->swap(topic); |
| | | request.mutable_data()->swap(data); |
| | | head.mutable_proc_id()->swap(proc_id); |
| | | SrcInfo *p = new SrcInfo; |
| | | p->route.assign(head.route().begin(), head.route().end()); |
| | | p->msg_id = head.msg_id(); |
| | |
| | | return false; |
| | | } |
| | | |
| | | bool TopicNode::ServerSendReply(void *src_info, const std::string &data, const int timeout_ms) |
| | | bool TopicNode::ServerSendReply(void *src_info, const MsgRequestTopicReply &body, const int timeout_ms) |
| | | { |
| | | auto &sock = SockServer(); |
| | | |
| | |
| | | if (!p || p->route.empty()) { |
| | | return false; |
| | | } |
| | | MsgRequestTopicReply body; |
| | | body.set_data(data); |
| | | BHMsgHead head(InitMsgHead(GetType(body), proc_id(), p->msg_id)); |
| | | |
| | | for (unsigned i = 0; i < p->route.size() - 1; ++i) { |
| | | head.add_route()->Swap(&p->route[i]); |
| | | } |
| | | |
| | | return sock.Send(p->route.back().mq_id().data(), head, body, timeout_ms); |
| | | } |
| | | |
| | |
| | | if (head.type() == kMsgTypeRequestTopicReply) { |
| | | MsgRequestTopicReply reply; |
| | | if (imsg.ParseBody(reply)) { |
| | | cb(reply.data()); |
| | | cb(head.proc_id(), reply); |
| | | } |
| | | } |
| | | }; |
| | |
| | | return SockRequest().Start(onData, nworker); |
| | | } |
| | | |
| | | bool TopicNode::ClientAsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb) |
| | | bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, const int timeout_ms, const RequestResultCB &cb) |
| | | { |
| | | auto Call = [&](const void *remote) { |
| | | auto &sock = SockRequest(); |
| | | |
| | | MsgRequestTopic req; |
| | | req.set_topic(topic); |
| | | req.set_data(data, size); |
| | | BHMsgHead head(InitMsgHead(GetType(req), proc_id())); |
| | | AddRoute(head, sock.id()); |
| | | |
| | |
| | | if (head.type() == kMsgTypeRequestTopicReply) { |
| | | MsgRequestTopicReply reply; |
| | | if (imsg.ParseBody(reply)) { |
| | | cb(reply.data()); |
| | | cb(head.proc_id(), reply); |
| | | } |
| | | } |
| | | }; |
| | |
| | | |
| | | try { |
| | | BHAddress addr; |
| | | if (ClientQueryRPCTopic(topic, addr, timeout_ms)) { |
| | | if (ClientQueryRPCTopic(req.topic(), addr, timeout_ms)) { |
| | | return Call(addr.mq_id().data()); |
| | | } else { |
| | | SetLastError(eNotFound, "remote not found."); |
| | | return false; |
| | | } |
| | | } catch (...) { |
| | |
| | | } |
| | | } |
| | | |
| | | bool TopicNode::ClientSyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms) |
| | | bool TopicNode::ClientSyncRequest(const MsgRequestTopic &request, std::string &out_proc_id, MsgRequestTopicReply &out_reply, const int timeout_ms) |
| | | { |
| | | try { |
| | | auto &sock = SockRequest(); |
| | | |
| | | BHAddress addr; |
| | | if (ClientQueryRPCTopic(topic, addr, timeout_ms)) { |
| | | |
| | | MsgRequestTopic req; |
| | | req.set_topic(topic); |
| | | req.set_data(data, size); |
| | | BHMsgHead head(InitMsgHead(GetType(req), proc_id())); |
| | | if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) { |
| | | BHMsgHead head(InitMsgHead(GetType(request), proc_id())); |
| | | AddRoute(head, sock.id()); |
| | | |
| | | MsgI reply; |
| | | DEFER1(reply.Release(shm_);); |
| | | MsgI reply_msg; |
| | | DEFER1(reply_msg.Release(shm_);); |
| | | BHMsgHead reply_head; |
| | | |
| | | if (sock.SendAndRecv(addr.mq_id().data(), head, req, reply, reply_head, timeout_ms) && reply_head.type() == kMsgTypeRequestTopicReply) { |
| | | MsgRequestTopicReply dr; |
| | | if (reply.ParseBody(dr)) { |
| | | dr.mutable_data()->swap(out); |
| | | if (sock.SendAndRecv(addr.mq_id().data(), head, request, reply_msg, reply_head, timeout_ms) && reply_head.type() == kMsgTypeRequestTopicReply) { |
| | | if (reply_msg.ParseBody(out_reply)) { |
| | | reply_head.mutable_proc_id()->swap(out_proc_id); |
| | | return true; |
| | | } else { |
| | | printf("error parse reply.\n"); |
| | | } |
| | | } else { |
| | | printf("error recv data. line: %d\n", __LINE__); |
| | | } |
| | | } else { |
| | | printf("error recv data. line: %d\n", __LINE__); |
| | | SetLastError(eNotFound, "remote not found."); |
| | | } |
| | | } catch (...) { |
| | | printf("error recv data. line: %d\n", __LINE__); |
| | |
| | | |
| | | // publish |
| | | |
| | | bool TopicNode::Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms) |
| | | bool TopicNode::Publish(const MsgPublish &pub, const int timeout_ms) |
| | | { |
| | | try { |
| | | auto &sock = SockPub(); |
| | | |
| | | MsgPublish pub; |
| | | pub.set_topic(topic); |
| | | pub.set_data(data, size); |
| | | BHMsgHead head(InitMsgHead(GetType(pub), proc_id())); |
| | | AddRoute(head, sock.id()); |
| | | |
| | |
| | | } |
| | | } |
| | | |
| | | bool TopicNode::SubscribeStartWorker(const TopicDataCB &tdcb, int nworker) |
| | | bool TopicNode::SubscribeStartWorker(const SubDataCB &tdcb, int nworker) |
| | | { |
| | | auto &sock = SockSub(); |
| | | |
| | |
| | | if (head.type() == kMsgTypePublish) { |
| | | MsgPublish pub; |
| | | if (imsg.ParseBody(pub)) { |
| | | tdcb(head.proc_id(), pub.topic(), pub.data()); |
| | | tdcb(head.proc_id(), pub); |
| | | } |
| | | } else { |
| | | // ignored, or dropped |
| | |
| | | return tdcb && sock.Start(AsyncRecvProc, nworker); |
| | | } |
| | | |
| | | bool TopicNode::RecvSub(std::string &proc_id, Topic &topic, std::string &data, const int timeout_ms) |
| | | bool TopicNode::RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms) |
| | | { |
| | | auto &sock = SockSub(); |
| | | MsgI msg; |
| | | DEFER1(msg.Release(shm());); |
| | | BHMsgHead head; |
| | | //TODO error msg. |
| | | if (sock.SyncRecv(msg, head, timeout_ms) && head.type() == kMsgTypePublish) { |
| | | MsgPublish pub; |
| | | if (msg.ParseBody(pub)) { |
| | | head.mutable_proc_id()->swap(proc_id); |
| | | pub.mutable_topic()->swap(topic); |
| | | pub.mutable_data()->swap(data); |
| | | return true; |
| | | } |
| | | } |
| | |
| | | SharedMemory &shm() { return shm_; } |
| | | |
| | | public: |
| | | typedef std::function<void(std::string &proc_id, const void *data, const int len)> DataCB; |
| | | TopicNode(SharedMemory &shm); |
| | | ~TopicNode(); |
| | | |
| | | void Start(); |
| | | void Stop(); |
| | | // topic node |
| | | bool Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms); |
| | | bool Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms); |
| | | bool Heartbeat(const int timeout_ms); |
| | | |
| | | // topic rpc server |
| | | typedef std::function<bool(const std::string &topic, const std::string &data, std::string &reply)> OnRequest; |
| | | bool ServerStart(OnRequest const &cb, const int nworker = 2); |
| | | typedef std::function<bool(const std::string &client_proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply)> ServerCB; |
| | | bool ServerStart(ServerCB const &cb, const int nworker = 2); |
| | | bool ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply, const int timeout_ms); |
| | | bool ServerRecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms); |
| | | bool ServerSendReply(void *src_info, const std::string &data, const int timeout_ms); |
| | | bool ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms); |
| | | bool ServerSendReply(void *src_info, const MsgRequestTopicReply &reply, const int timeout_ms); |
| | | |
| | | // topic client |
| | | typedef std::function<void(const std::string &data)> RequestResultCB; |
| | | typedef std::function<void(const std::string &proc_id, const MsgRequestTopicReply &reply)> RequestResultCB; |
| | | bool ClientStartWorker(RequestResultCB const &cb, const int nworker = 2); |
| | | bool ClientAsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb = RequestResultCB()); |
| | | bool ClientAsyncRequest(const Topic &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb = RequestResultCB()) |
| | | { |
| | | return ClientAsyncRequest(topic, data.data(), data.size(), timeout_ms, rrcb); |
| | | } |
| | | bool ClientSyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms); |
| | | bool ClientSyncRequest(const Topic &topic, const std::string &data, std::string &out, const int timeout_ms) |
| | | { |
| | | return ClientSyncRequest(topic, data.data(), data.size(), out, timeout_ms); |
| | | } |
| | | bool ClientAsyncRequest(const MsgRequestTopic &request, const int timeout_ms, const RequestResultCB &rrcb = RequestResultCB()); |
| | | bool ClientSyncRequest(const MsgRequestTopic &request, std::string &proc_id, MsgRequestTopicReply &reply, const int timeout_ms); |
| | | |
| | | // publish |
| | | bool Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms); |
| | | bool Publish(const MsgPublish &pub, const int timeout_ms); |
| | | |
| | | // subscribe |
| | | typedef std::function<void(const std::string &proc_id, const Topic &topic, const std::string &data)> TopicDataCB; |
| | | bool SubscribeStartWorker(const TopicDataCB &tdcb, int nworker = 2); |
| | | typedef std::function<void(const std::string &proc_id, const MsgPublish &data)> SubDataCB; |
| | | bool SubscribeStartWorker(const SubDataCB &tdcb, int nworker = 2); |
| | | bool Subscribe(MsgTopicList &topics, const int timeout_ms); |
| | | bool RecvSub(std::string &proc_id, Topic &topic, std::string &data, const int timeout_ms); |
| | | bool RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms); |
| | | |
| | | void Start(ServerCB const &server_cb, SubDataCB const &sub_cb); |
| | | void Stop(); |
| | | |
| | | private: |
| | | bool ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms); |
New file |
| | |
| | | /* |
| | | * ===================================================================================== |
| | | * |
| | | * Filename: api_test.cpp |
| | | * |
| | | * Description: |
| | | * |
| | | * Version: 1.0 |
| | | * Created: 2021年04月13日 14时31分46秒 |
| | | * Revision: none |
| | | * Compiler: gcc |
| | | * |
| | | * Author: Li Chao (), lichao@aiotlink.com |
| | | * Organization: |
| | | * |
| | | * ===================================================================================== |
| | | */ |
| | | #include "util.h" |
| | | |
| | | BOOST_AUTO_TEST_CASE(ApiTest) |
| | | { |
| | | } |
| | |
| | | } |
| | | MsgI msg; |
| | | BHMsgHead head; |
| | | if (!cli.SyncRecv(msg, head, 1000)) { |
| | | if (!cli.SyncRecv(msg, head, 100)) { |
| | | printf("********** client recv error.\n"); |
| | | } else { |
| | | DEFER1(msg.Release(shm)); |
| | |
| | | BHMsgHead req_head; |
| | | |
| | | while (!stop) { |
| | | if (srv.SyncRecv(req, req_head, 100)) { |
| | | if (srv.SyncRecv(req, req_head, 10)) { |
| | | DEFER1(req.Release(shm)); |
| | | |
| | | if (req.ParseHead(req_head) && req_head.type() == kMsgTypeRequestTopic) { |
| | |
| | | std::condition_variable cv; |
| | | |
| | | std::atomic<uint64_t> n(0); |
| | | auto OnTopicData = [&](const std::string &proc_id, const std::string &topic, const std::string &data) { |
| | | auto OnTopicData = [&](const std::string &proc_id, const MsgPublish &pub) { |
| | | ++total_count; |
| | | |
| | | auto cur = Now(); |
| | |
| | | |
| | | for (unsigned i = 0; i < nmsg; ++i) { |
| | | std::string data = topic + std::to_string(i) + std::string(1000, '-'); |
| | | |
| | | bool r = provider.Publish(topic, data.data(), data.size(), timeout); |
| | | MsgPublish pub; |
| | | pub.set_topic(topic); |
| | | pub.set_data(data); |
| | | bool r = provider.Publish(pub, timeout); |
| | | if (!r) { |
| | | static std::atomic<int> an(0); |
| | | int n = ++an; |
| | |
| | | |
| | | std::atomic<int> count(0); |
| | | std::string reply; |
| | | auto onRecv = [&](const std::string &rep) { |
| | | reply = rep; |
| | | auto onRecv = [&](const std::string &proc_id, const MsgRequestTopicReply &msg) { |
| | | reply = msg.data(); |
| | | if (++count >= nreq) { |
| | | printf("count: %d\n", count.load()); |
| | | } |
| | |
| | | client.ClientStartWorker(onRecv, 2); |
| | | boost::timer::auto_cpu_timer timer; |
| | | for (int i = 0; i < nreq; ++i) { |
| | | if (!client.ClientAsyncRequest(topic, "data " + std::to_string(i), 1000)) { |
| | | MsgRequestTopic req; |
| | | req.set_topic(topic); |
| | | req.set_data("data " + std::to_string(i)); |
| | | if (!client.ClientAsyncRequest(req, 1000)) { |
| | | printf("client request failed\n"); |
| | | ++count; |
| | | } |
| | |
| | | auto Server = [&](const std::string &name, const std::vector<std::string> &topics) { |
| | | DemoNode server(name, shm); |
| | | |
| | | auto onData = [&](const std::string &topic, const std::string &data, std::string &reply) { |
| | | auto onData = [&](const std::string &proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply) { |
| | | ++server_msg_count; |
| | | reply = topic + ':' + data; |
| | | reply.set_data(request.topic() + ':' + request.data()); |
| | | return true; |
| | | }; |
| | | server.ServerStart(onData); |