From 153375e3b152768cbffce715d049499945834c29 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期二, 13 四月 2021 15:11:14 +0800 Subject: [PATCH] add api code, use proto msg as data. --- utest/speed_test.cpp | 4 utest/api_test.cpp | 22 ++ .vscode/launch.json | 2 .vscode/settings.json | 5 src/defs.h | 3 src/topic_node.h | 36 +-- src/bh_api.h | 79 +++++++ utest/utest.cpp | 21 + src/bh_api.cpp | 263 ++++++++++++++++++++++++++ src/topic_node.cpp | 86 +++----- src/defs.cpp | 23 ++ 11 files changed, 455 insertions(+), 89 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index b4e9631..939b9a9 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -11,7 +11,7 @@ "program": "${workspaceFolder}/debug/bin/utest", "args": [ "-t", - "HeartbeatTest" + "ApiTest" ], "stopAtEntry": false, "cwd": "${workspaceFolder}", diff --git a/.vscode/settings.json b/.vscode/settings.json index 0af3e6c..6a4497a 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -67,5 +67,8 @@ "build/": true, "debug/": true }, - "cmake.configureOnOpen": false + "cmake.configureOnOpen": false, + "C_Cpp.default.includePath": [ + "build/proto" + ] } \ No newline at end of file diff --git a/src/bh_api.cpp b/src/bh_api.cpp new file mode 100644 index 0000000..39e38d3 --- /dev/null +++ b/src/bh_api.cpp @@ -0,0 +1,263 @@ +#include "bh_api.h" +#include "defs.h" +#include "topic_node.h" +#include <memory> + +using namespace bhome_shm; +using namespace bhome_msg; + +namespace +{ +TopicNode &ProcNode() +{ + static TopicNode node(BHomeShm()); + return node; +} + +class TmpPtr : private boost::noncopyable +{ + void *ptr_ = 0; + size_t size_ = 0; + +public: + explicit TmpPtr(const size_t size) : + ptr_(malloc(size)), size_(size) {} + explicit TmpPtr(const std::string &str) : + TmpPtr(str.size()) + { + if (ptr_) { + memcpy(ptr_, str.data(), str.size()); + } + } + ~TmpPtr() { free(ptr_); } + void *get() const { return ptr_; } + void *release() + { + void *tmp = ptr_; + ptr_ = 0; + return tmp; + } + size_t size() const { return size_; } + operator bool() const { return ptr_; } +}; + +template <class Msg> +bool PackOutput(const Msg &msg, void **out, int *out_len) +{ + auto size = msg.ByteSizeLong(); + TmpPtr p(size); + if (!p) { + SetLastError(ENOMEM, "not enough memory."); + return false; + } + msg.SerializePartialToArray(p.get(), size); + *out = p.release(); + *out_len = size; + return true; +} + +} // namespace + +bool BHRegister(const void *proc_info, + const int proc_info_len, + void **reply, + int *reply_len, + const int timeout_ms) +{ + ProcInfo pi; + if (!pi.ParseFromArray(proc_info, proc_info_len)) { + SetLastError(eInvalidInput, "invalid input."); + return false; + } + MsgCommonReply msg_reply; + if (ProcNode().Register(pi, msg_reply, timeout_ms)) { + return PackOutput(msg_reply, reply, reply_len); + } else { + return false; + } +} + +bool BHHeartBeatEasy(const int timeout_ms) +{ + return ProcNode().Heartbeat(timeout_ms); +} + +bool BHHeartBeat(const void *proc_info, + const int proc_info_len, + void **reply, + int *reply_len, + const int timeout_ms) +{ + ProcInfo pi; + if (!pi.ParseFromArray(proc_info, proc_info_len)) { + SetLastError(eInvalidInput, "invalid input."); + return false; + } + MsgCommonReply msg_reply; + if (ProcNode().Heartbeat(pi, msg_reply, timeout_ms)) { + return PackOutput(msg_reply, reply, reply_len); + } else { + return false; + } +} + +bool BHPublish(const void *msgpub, + const int msgpub_len, + const int timeout_ms) +{ + MsgPublish pub; + if (!pub.ParseFromArray(msgpub, msgpub_len)) { + SetLastError(eInvalidInput, "invalid input."); + return false; + } + return ProcNode().Publish(pub, timeout_ms); +} + +bool BHReadSub(void **proc_id, + int *proc_id_len, + void **msgpub, + int *msgpub_len, + const int timeout_ms) +{ + std::string proc; + MsgPublish pub; + + if (ProcNode().RecvSub(proc, pub, timeout_ms)) { + TmpPtr pproc(proc); + if (pproc && PackOutput(pub, msgpub, msgpub_len)) { + *proc_id = pproc.release(); + *proc_id_len = pproc.size(); + } else { + SetLastError(ENOMEM, "out of mem"); + } + } + return false; +} + +bool BHRequest(const void *request, + const int request_len, + void **proc_id, + int *proc_id_len, + void **reply, + int *reply_len, + const int timeout_ms) +{ + MsgRequestTopic req; + if (!req.ParseFromArray(request, request_len)) { + SetLastError(eInvalidInput, "invalid input."); + return false; + } + std::string proc; + MsgRequestTopicReply out_msg; + if (ProcNode().ClientSyncRequest(req, proc, out_msg, timeout_ms)) { + TmpPtr pproc(proc); + if (pproc && PackOutput(out_msg, reply, reply_len)) { + *proc_id = pproc.release(); + *proc_id_len = pproc.size(); + } else { + SetLastError(ENOMEM, "out of mem"); + } + } + return false; +} + +bool BHReadRequest(void **proc_id, + int *proc_id_len, + void **request, + int *request_len, + void **src, + const int timeout_ms) +{ + void *src_info = 0; + std::string proc; + MsgRequestTopic out_msg; + if (ProcNode().ServerRecvRequest(src_info, proc, out_msg, timeout_ms)) { + TmpPtr pproc(proc); + if (pproc && PackOutput(out_msg, request, request_len)) { + *proc_id = pproc.release(); + *proc_id_len = pproc.size(); + *src = src_info; + } else { + SetLastError(ENOMEM, "out of mem"); + } + } + return false; +} + +bool BHSendReply(void *src, + const void *reply, + const int reply_len, + const int timeout_ms) +{ + MsgRequestTopicReply rep; + if (!rep.ParseFromArray(reply, reply_len)) { + SetLastError(eInvalidInput, "invalid input."); + return false; + } + return ProcNode().ServerSendReply(src, rep, timeout_ms); +} + +int BHCleanUp() +{ + return 0; +} + +namespace +{ +typedef std::function<bool(const void *, const int)> ServerSender; +} // namespace + +void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb) +{ + TopicNode::ServerCB on_req; + TopicNode::SubDataCB on_sub; + if (server_cb) { + on_req = [server_cb](const std::string &proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply) { + std::string sreq(request.SerializeAsString()); + bool r = false; + ServerSender sender = [&](const void *p, const int len) { + r = reply.ParseFromArray(p, len); + return r; + }; + server_cb(proc_id.data(), proc_id.size(), sreq.data(), sreq.size(), (BHServerCallbackTag *) (&sender)); + return r; + }; + } + if (sub_cb) { + on_sub = [sub_cb](const std::string &proc_id, const MsgPublish &pub) { + std::string s(pub.SerializeAsString()); + sub_cb(proc_id.data(), proc_id.size(), s.data(), s.size()); + }; + } + + ProcNode().Start(on_req, on_sub); +} +bool BHServerCallbackReply(const BHServerCallbackTag *tag, + const void *data, + const int data_len) +{ + auto &sender = *(const ServerSender *) (tag); + return sender(data, data_len); +} + +void BHFree(void *data, int size) +{ + free(data); +} + +int BHGetLastError(void **msg, int *msg_len) +{ + int ec = 0; + if (msg && msg_len) { + std::string err_msg; + GetLastError(ec, err_msg); + TmpPtr p(err_msg); + if (p) { + *msg = p.release(); + *msg_len = p.size(); + } + } + return ec; +} + +#undef BH_SOCKET_MEMF_CALL diff --git a/src/bh_api.h b/src/bh_api.h new file mode 100644 index 0000000..1b351ca --- /dev/null +++ b/src/bh_api.h @@ -0,0 +1,79 @@ +#ifndef BH_API_WRAPPER_O81WKNXI +#define BH_API_WRAPPER_O81WKNXI + +#ifdef __cplusplus +extern "C" { +#endif + +struct BHSrcInfo; +struct BHServerCallbackTag; + +bool BHRegister(const void *proc_info, + const int proc_info_len, + void **reply, + int *reply_len, + const int timeout_ms); + +typedef void (*FSubDataCallback)(const void *proc_id, + const int proc_id_len, + const void *data, + const int data_len); + +typedef void (*FServerCallback)(const void *proc_id, + const int proc_id_len, + const void *data, + const int data_len, + BHServerCallbackTag *tag); + +void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb); +bool BHServerCallbackReply(const BHServerCallbackTag *tag, + const void *data, + const int data_len); + +bool BHHeartBeatEasy(const int timeout_ms); +bool BHHeartBeat(const void *proc_info, + const int proc_info_len, + void **reply, + int *reply_len, + const int timeout_ms); + +bool BHPublish(const void *msgpub, + const int msgpub_len, + const int timeout_ms); + +bool BHReadSub(const void *proc_id, + const int proc_id_len, + void **msgpub, + int *msgpub_len, + const int timeout_ms); + +bool BHRequest(const void *request, + const int request_len, + void **proc_id, + int *proc_id_len, + void **reply, + int *reply_len, + const int timeout_ms); + +bool BHReadRequest(void **proc_id, + int *proc_id_len, + void **request, + int *request_len, + BHSrcInfo **src, + const int timeout_ms); + +bool BHSendReply(BHSrcInfo *src, + const void *reply, + const int reply_len, + const int timeout_ms); + +// int BHCleanUp(); + +void BHFree(void *buf, int size); + +int BHGetLastError(void **msg, int &msg_len); + +#ifdef __cplusplus +} +#endif +#endif /* end of include guard: BH_API_WRAPPER_O81WKNXI */ diff --git a/src/defs.cpp b/src/defs.cpp index cab4fc7..bab2e53 100644 --- a/src/defs.cpp +++ b/src/defs.cpp @@ -23,8 +23,31 @@ const MQId kBHTopicReqRepCenter = boost::uuids::string_generator()("12345670-89ab-cdef-8349-1234567890ff"); const MQId kBHUniCenter = boost::uuids::string_generator()("87654321-89ab-cdef-8349-1234567890ff"); +struct LastError { + int ec_ = 0; + std::string msg_; +}; + +LastError &LastErrorStore() +{ + thread_local LastError le; + return le; +} + } // namespace const MQId &BHTopicBusAddress() { return kBHTopicBus; } const MQId &BHTopicCenterAddress() { return kBHTopicReqRepCenter; } const MQId &BHUniCenterAddress() { return kBHUniCenter; } + +void SetLastError(const int ec, const std::string &msg) +{ + LastErrorStore().ec_ = ec; + LastErrorStore().msg_ = msg; +} + +void GetLastError(int &ec, std::string &msg) +{ + ec = LastErrorStore().ec_; + msg = LastErrorStore().msg_; +} \ No newline at end of file diff --git a/src/defs.h b/src/defs.h index d50e380..08181d8 100644 --- a/src/defs.h +++ b/src/defs.h @@ -38,7 +38,8 @@ bhome_shm::SharedMemory &BHomeShm(); typedef std::string Topic; - +void SetLastError(const int ec, const std::string &msg); +void GetLastError(int &ec, std::string &msg); //TODO center can check shm for previous crash. #endif // end of include guard: DEFS_KP8LKGD0 diff --git a/src/topic_node.cpp b/src/topic_node.cpp index 788c536..8f039de 100644 --- a/src/topic_node.cpp +++ b/src/topic_node.cpp @@ -39,7 +39,7 @@ TopicNode::TopicNode(SharedMemory &shm) : shm_(shm), sock_node_(shm), sock_request_(shm), sock_reply_(shm), sock_sub_(shm) { - Start(); + SockNode().Start(); } TopicNode::~TopicNode() @@ -47,14 +47,15 @@ Stop(); } -void TopicNode::Start() +void TopicNode::Start(ServerCB const &server_cb, SubDataCB const &sub_cb) { - SockNode().Start(); - SockClient().Start(); - SockServer().Start(); + ServerStart(server_cb, 1); + SubscribeStartWorker(sub_cb, 1); + // SockClient().Start(); } void TopicNode::Stop() { + SockSub().Stop(); SockServer().Stop(); SockClient().Stop(); SockNode().Stop(); @@ -132,10 +133,8 @@ return r; } -bool TopicNode::ServerStart(const OnRequest &rcb, int nworker) +bool TopicNode::ServerStart(const ServerCB &rcb, int nworker) { - //TODO check registered - auto failed_q = std::make_shared<ServerFailedQ>(); auto onIdle = [failed_q](ShmSocket &socket) { failed_q->TrySend(socket); }; @@ -144,10 +143,8 @@ if (head.type() == kMsgTypeRequestTopic && head.route_size() > 0) { MsgRequestTopic req; if (imsg.ParseBody(req)) { - std::string out; - if (rcb(req.topic(), req.data(), out)) { - MsgRequestTopicReply reply_body; - reply_body.set_data(std::move(out)); + MsgRequestTopicReply reply_body; + if (rcb(head.proc_id(), req, reply_body)) { BHMsgHead reply_head(InitMsgHead(GetType(reply_body), proc_id(), head.msg_id())); for (int i = 0; i < head.route_size() - 1; ++i) { @@ -173,17 +170,15 @@ return rcb && sock.Start(onRecv, onIdle, nworker); } -bool TopicNode::ServerRecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms) +bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms) { auto &sock = SockServer(); MsgI imsg; BHMsgHead head; if (sock.SyncRecv(imsg, head, timeout_ms) && head.type() == kMsgTypeRequestTopic) { - MsgRequestTopic request; if (imsg.ParseBody(request)) { - request.mutable_topic()->swap(topic); - request.mutable_data()->swap(data); + head.mutable_proc_id()->swap(proc_id); SrcInfo *p = new SrcInfo; p->route.assign(head.route().begin(), head.route().end()); p->msg_id = head.msg_id(); @@ -194,7 +189,7 @@ return false; } -bool TopicNode::ServerSendReply(void *src_info, const std::string &data, const int timeout_ms) +bool TopicNode::ServerSendReply(void *src_info, const MsgRequestTopicReply &body, const int timeout_ms) { auto &sock = SockServer(); @@ -203,14 +198,10 @@ if (!p || p->route.empty()) { return false; } - MsgRequestTopicReply body; - body.set_data(data); BHMsgHead head(InitMsgHead(GetType(body), proc_id(), p->msg_id)); - for (unsigned i = 0; i < p->route.size() - 1; ++i) { head.add_route()->Swap(&p->route[i]); } - return sock.Send(p->route.back().mq_id().data(), head, body, timeout_ms); } @@ -223,7 +214,7 @@ if (head.type() == kMsgTypeRequestTopicReply) { MsgRequestTopicReply reply; if (imsg.ParseBody(reply)) { - cb(reply.data()); + cb(head.proc_id(), reply); } } }; @@ -231,14 +222,11 @@ return SockRequest().Start(onData, nworker); } -bool TopicNode::ClientAsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb) +bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, const int timeout_ms, const RequestResultCB &cb) { auto Call = [&](const void *remote) { auto &sock = SockRequest(); - MsgRequestTopic req; - req.set_topic(topic); - req.set_data(data, size); BHMsgHead head(InitMsgHead(GetType(req), proc_id())); AddRoute(head, sock.id()); @@ -247,7 +235,7 @@ if (head.type() == kMsgTypeRequestTopicReply) { MsgRequestTopicReply reply; if (imsg.ParseBody(reply)) { - cb(reply.data()); + cb(head.proc_id(), reply); } } }; @@ -259,9 +247,10 @@ try { BHAddress addr; - if (ClientQueryRPCTopic(topic, addr, timeout_ms)) { + if (ClientQueryRPCTopic(req.topic(), addr, timeout_ms)) { return Call(addr.mq_id().data()); } else { + SetLastError(eNotFound, "remote not found."); return false; } } catch (...) { @@ -269,37 +258,30 @@ } } -bool TopicNode::ClientSyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms) +bool TopicNode::ClientSyncRequest(const MsgRequestTopic &request, std::string &out_proc_id, MsgRequestTopicReply &out_reply, const int timeout_ms) { try { auto &sock = SockRequest(); BHAddress addr; - if (ClientQueryRPCTopic(topic, addr, timeout_ms)) { - - MsgRequestTopic req; - req.set_topic(topic); - req.set_data(data, size); - BHMsgHead head(InitMsgHead(GetType(req), proc_id())); + if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) { + BHMsgHead head(InitMsgHead(GetType(request), proc_id())); AddRoute(head, sock.id()); - MsgI reply; - DEFER1(reply.Release(shm_);); + MsgI reply_msg; + DEFER1(reply_msg.Release(shm_);); BHMsgHead reply_head; - if (sock.SendAndRecv(addr.mq_id().data(), head, req, reply, reply_head, timeout_ms) && reply_head.type() == kMsgTypeRequestTopicReply) { - MsgRequestTopicReply dr; - if (reply.ParseBody(dr)) { - dr.mutable_data()->swap(out); + if (sock.SendAndRecv(addr.mq_id().data(), head, request, reply_msg, reply_head, timeout_ms) && reply_head.type() == kMsgTypeRequestTopicReply) { + if (reply_msg.ParseBody(out_reply)) { + reply_head.mutable_proc_id()->swap(out_proc_id); return true; } else { printf("error parse reply.\n"); } - } else { - printf("error recv data. line: %d\n", __LINE__); } } else { - printf("error recv data. line: %d\n", __LINE__); + SetLastError(eNotFound, "remote not found."); } } catch (...) { printf("error recv data. line: %d\n", __LINE__); @@ -344,14 +326,10 @@ // publish -bool TopicNode::Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms) +bool TopicNode::Publish(const MsgPublish &pub, const int timeout_ms) { try { auto &sock = SockPub(); - - MsgPublish pub; - pub.set_topic(topic); - pub.set_data(data, size); BHMsgHead head(InitMsgHead(GetType(pub), proc_id())); AddRoute(head, sock.id()); @@ -386,7 +364,7 @@ } } -bool TopicNode::SubscribeStartWorker(const TopicDataCB &tdcb, int nworker) +bool TopicNode::SubscribeStartWorker(const SubDataCB &tdcb, int nworker) { auto &sock = SockSub(); @@ -394,7 +372,7 @@ if (head.type() == kMsgTypePublish) { MsgPublish pub; if (imsg.ParseBody(pub)) { - tdcb(head.proc_id(), pub.topic(), pub.data()); + tdcb(head.proc_id(), pub); } } else { // ignored, or dropped @@ -404,18 +382,16 @@ return tdcb && sock.Start(AsyncRecvProc, nworker); } -bool TopicNode::RecvSub(std::string &proc_id, Topic &topic, std::string &data, const int timeout_ms) +bool TopicNode::RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms) { auto &sock = SockSub(); MsgI msg; DEFER1(msg.Release(shm());); BHMsgHead head; + //TODO error msg. if (sock.SyncRecv(msg, head, timeout_ms) && head.type() == kMsgTypePublish) { - MsgPublish pub; if (msg.ParseBody(pub)) { head.mutable_proc_id()->swap(proc_id); - pub.mutable_topic()->swap(topic); - pub.mutable_data()->swap(data); return true; } } diff --git a/src/topic_node.h b/src/topic_node.h index d2cdcf9..d671026 100644 --- a/src/topic_node.h +++ b/src/topic_node.h @@ -34,45 +34,39 @@ SharedMemory &shm() { return shm_; } public: + typedef std::function<void(std::string &proc_id, const void *data, const int len)> DataCB; TopicNode(SharedMemory &shm); ~TopicNode(); - void Start(); - void Stop(); // topic node bool Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms); bool Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms); bool Heartbeat(const int timeout_ms); // topic rpc server - typedef std::function<bool(const std::string &topic, const std::string &data, std::string &reply)> OnRequest; - bool ServerStart(OnRequest const &cb, const int nworker = 2); + typedef std::function<bool(const std::string &client_proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply)> ServerCB; + bool ServerStart(ServerCB const &cb, const int nworker = 2); bool ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply, const int timeout_ms); - bool ServerRecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms); - bool ServerSendReply(void *src_info, const std::string &data, const int timeout_ms); + bool ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms); + bool ServerSendReply(void *src_info, const MsgRequestTopicReply &reply, const int timeout_ms); // topic client - typedef std::function<void(const std::string &data)> RequestResultCB; + typedef std::function<void(const std::string &proc_id, const MsgRequestTopicReply &reply)> RequestResultCB; bool ClientStartWorker(RequestResultCB const &cb, const int nworker = 2); - bool ClientAsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb = RequestResultCB()); - bool ClientAsyncRequest(const Topic &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb = RequestResultCB()) - { - return ClientAsyncRequest(topic, data.data(), data.size(), timeout_ms, rrcb); - } - bool ClientSyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms); - bool ClientSyncRequest(const Topic &topic, const std::string &data, std::string &out, const int timeout_ms) - { - return ClientSyncRequest(topic, data.data(), data.size(), out, timeout_ms); - } + bool ClientAsyncRequest(const MsgRequestTopic &request, const int timeout_ms, const RequestResultCB &rrcb = RequestResultCB()); + bool ClientSyncRequest(const MsgRequestTopic &request, std::string &proc_id, MsgRequestTopicReply &reply, const int timeout_ms); // publish - bool Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms); + bool Publish(const MsgPublish &pub, const int timeout_ms); // subscribe - typedef std::function<void(const std::string &proc_id, const Topic &topic, const std::string &data)> TopicDataCB; - bool SubscribeStartWorker(const TopicDataCB &tdcb, int nworker = 2); + typedef std::function<void(const std::string &proc_id, const MsgPublish &data)> SubDataCB; + bool SubscribeStartWorker(const SubDataCB &tdcb, int nworker = 2); bool Subscribe(MsgTopicList &topics, const int timeout_ms); - bool RecvSub(std::string &proc_id, Topic &topic, std::string &data, const int timeout_ms); + bool RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms); + + void Start(ServerCB const &server_cb, SubDataCB const &sub_cb); + void Stop(); private: bool ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms); diff --git a/utest/api_test.cpp b/utest/api_test.cpp new file mode 100644 index 0000000..40ed2a1 --- /dev/null +++ b/utest/api_test.cpp @@ -0,0 +1,22 @@ +/* + * ===================================================================================== + * + * Filename: api_test.cpp + * + * Description: + * + * Version: 1.0 + * Created: 2021骞�04鏈�13鏃� 14鏃�31鍒�46绉� + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), lichao@aiotlink.com + * Organization: + * + * ===================================================================================== + */ +#include "util.h" + +BOOST_AUTO_TEST_CASE(ApiTest) +{ +} \ No newline at end of file diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp index 77c018a..7f77b02 100644 --- a/utest/speed_test.cpp +++ b/utest/speed_test.cpp @@ -171,7 +171,7 @@ } MsgI msg; BHMsgHead head; - if (!cli.SyncRecv(msg, head, 1000)) { + if (!cli.SyncRecv(msg, head, 100)) { printf("********** client recv error.\n"); } else { DEFER1(msg.Release(shm)); @@ -192,7 +192,7 @@ BHMsgHead req_head; while (!stop) { - if (srv.SyncRecv(req, req_head, 100)) { + if (srv.SyncRecv(req, req_head, 10)) { DEFER1(req.Release(shm)); if (req.ParseHead(req_head) && req_head.type() == kMsgTypeRequestTopic) { diff --git a/utest/utest.cpp b/utest/utest.cpp index f127a8f..5a04842 100644 --- a/utest/utest.cpp +++ b/utest/utest.cpp @@ -124,7 +124,7 @@ std::condition_variable cv; std::atomic<uint64_t> n(0); - auto OnTopicData = [&](const std::string &proc_id, const std::string &topic, const std::string &data) { + auto OnTopicData = [&](const std::string &proc_id, const MsgPublish &pub) { ++total_count; auto cur = Now(); @@ -149,8 +149,10 @@ for (unsigned i = 0; i < nmsg; ++i) { std::string data = topic + std::to_string(i) + std::string(1000, '-'); - - bool r = provider.Publish(topic, data.data(), data.size(), timeout); + MsgPublish pub; + pub.set_topic(topic); + pub.set_data(data); + bool r = provider.Publish(pub, timeout); if (!r) { static std::atomic<int> an(0); int n = ++an; @@ -218,8 +220,8 @@ std::atomic<int> count(0); std::string reply; - auto onRecv = [&](const std::string &rep) { - reply = rep; + auto onRecv = [&](const std::string &proc_id, const MsgRequestTopicReply &msg) { + reply = msg.data(); if (++count >= nreq) { printf("count: %d\n", count.load()); } @@ -227,7 +229,10 @@ client.ClientStartWorker(onRecv, 2); boost::timer::auto_cpu_timer timer; for (int i = 0; i < nreq; ++i) { - if (!client.ClientAsyncRequest(topic, "data " + std::to_string(i), 1000)) { + MsgRequestTopic req; + req.set_topic(topic); + req.set_data("data " + std::to_string(i)); + if (!client.ClientAsyncRequest(req, 1000)) { printf("client request failed\n"); ++count; } @@ -248,9 +253,9 @@ auto Server = [&](const std::string &name, const std::vector<std::string> &topics) { DemoNode server(name, shm); - auto onData = [&](const std::string &topic, const std::string &data, std::string &reply) { + auto onData = [&](const std::string &proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply) { ++server_msg_count; - reply = topic + ':' + data; + reply.set_data(request.topic() + ':' + request.data()); return true; }; server.ServerStart(onData); -- Gitblit v1.8.0