From cf0a3209b51babf72469d962914db0dac2e5f52c Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期二, 27 十二月 2022 14:13:30 +0800 Subject: [PATCH] add get msg timeout --- fixed_q.h | 30 +++++++++++++- exported_symbols | 2 + cbhomeclient.cpp | 53 ++++++++++++++++++-------- cbhomeclient.h | 12 +++++- 4 files changed, 76 insertions(+), 21 deletions(-) diff --git a/cbhomeclient.cpp b/cbhomeclient.cpp index c101e66..b2d084a 100644 --- a/cbhomeclient.cpp +++ b/cbhomeclient.cpp @@ -82,7 +82,7 @@ Msg to_bus(client* cli, F&& f, Args&&... args){ Msg mesg; if (std::forward<F>(f)(cli->bus, std::forward<Args>(args)...)) - mesg = std::move(msg(crop<Is...>(tuple<Args...>(std::forward<Args>(args)...)))); + mesg = msg(crop<Is...>(tuple<Args...>(std::forward<Args>(args)...))); return mesg; } @@ -96,7 +96,7 @@ tie(d, s) = tp; if (m.ParseFromArray(d, s)) { bus_free(d, s); - msg = std::move(make_tuple(true, std::move(m))); + msg = make_tuple(true, std::move(m)); break; } m.Clear(); @@ -110,8 +110,8 @@ template <size_t... Is, class F, class... Args> MsgCR to_center(client* cli, F&& f, Args&&... args){ MsgCR msg(dummy()); - auto vmsg = std::move(to_bus<Is...>(cli, std::forward<F>(f), std::forward<Args>(args)...)); - if (!vmsg.empty()) msg = std::move(parse(cli, vmsg.at(0))); + auto vmsg = to_bus<Is...>(cli, std::forward<F>(f), std::forward<Args>(args)...); + if (!vmsg.empty()) msg = parse(cli, vmsg.at(0)); return msg; } @@ -124,8 +124,8 @@ const auto& tpc = tlist.SerializeAsString(); void* replymsg = NULL; int replysize = 0; - msg = std::move(to_center<2,3>(cli, std::forward<F>(f), - tpc.data(), tpc.size(), &replymsg, &replysize, sndto)); + msg = to_center<2,3>(cli, std::forward<F>(f), + tpc.data(), tpc.size(), &replymsg, &replysize, sndto); return msg; } @@ -248,11 +248,7 @@ delete cli; } -struct csubmsg* bus_client_get_submsg(void* handle){ - client* cli = ptr(handle); - Msg msg = std::move(cli->sub_q->pop()); - if (msg.empty()) return NULL; - +static struct csubmsg* parse_submsg(const Msg& msg){ void* procid = NULL, *data = NULL; int pids = 0, size = 0; @@ -265,12 +261,23 @@ return pmsg; } - -struct creqmsg* bus_client_get_reqmsg(void* handle, void** src){ +struct csubmsg* bus_client_get_submsg_intime(void* handle, const size_t ms){ client* cli = ptr(handle); - Msg msg = std::move(cli->readreq_q->pop()); + Msg msg = cli->sub_q->pop(ms); if (msg.empty()) return NULL; + return parse_submsg(msg); +} + +struct csubmsg* bus_client_get_submsg(void* handle){ + client* cli = ptr(handle); + Msg msg = cli->sub_q->pop(); + if (msg.empty()) return NULL; + + return parse_submsg(msg); +} + +static struct creqmsg* parse_reqmsg(const Msg& msg, void** src){ void* procid = NULL, *data = NULL; int pids = 0, size = 0; @@ -284,14 +291,28 @@ return pmsg; } +struct creqmsg* bus_client_get_reqmsg_intime(void* handle, void** src, const size_t ms){ + client* cli = ptr(handle); + Msg msg = cli->readreq_q->pop(ms); + if (msg.empty()) return NULL; + + return parse_reqmsg(msg, src); +} +struct creqmsg* bus_client_get_reqmsg(void* handle, void** src){ + client* cli = ptr(handle); + Msg msg = cli->readreq_q->pop(); + if (msg.empty()) return NULL; + + return parse_reqmsg(msg, src); +} int bus_client_request(void* handle, struct creqmsg* msg, struct crepmsg** repmsg){ void* procid = NULL, *reply = NULL; int pids = 0, replys = 0; - auto vmsg = std::move(to_bus<4,5,6,7>(ptr(handle), bus_request, (void*)NULL, 0, + auto vmsg = to_bus<4,5,6,7>(ptr(handle), bus_request, (void*)NULL, 0, msg->msg, msg->msgl, &procid, &pids, - &reply, &replys, sndto)); + &reply, &replys, sndto); if (!vmsg.empty()){ void* procid = NULL, *data = NULL; int pids = 0, size = 0; diff --git a/cbhomeclient.h b/cbhomeclient.h index 9d24e6c..c962cea 100644 --- a/cbhomeclient.h +++ b/cbhomeclient.h @@ -24,11 +24,15 @@ void bus_client_free(void* handle); /* - 鑾峰彇璁㈤槄鐨勬秷鎭紝璁㈤槄娑堟伅閫氳繃绾跨▼涓嶅仠璇诲彇锛屾澶勪粠缂撳瓨涓鍙� + 鑾峰彇璁㈤槄鐨勬秷鎭紝璁㈤槄娑堟伅閫氳繃绾跨▼涓嶅仠璇诲彇锛屾澶勪粠缂撳瓨涓鍙栵紝璇讳笉鍒颁笉杩斿洖 蹇呴』閫氳繃 message.h 鐨� free_reqmsg 閲婃斁 閫氳繃 get_submsg_db get_submsg_proclist 鑾峰彇瀵瑰簲鐨勬秷鎭� */ struct csubmsg* bus_client_get_submsg(void* handle); +/* + 涓� bus_client_get_submsg 鐩稿悓锛屼絾鏀寔瓒呮椂杩斿洖锛宮s 鍙傛暟琛ㄦ槑瓒呮椂鏃堕暱锛屽崟浣嶆绉� +*/ +struct csubmsg* bus_client_get_submsg_intime(void* handle, const size_t ms); /* 鍙戝竷娑堟伅锛宒ata 鏄� MsgPublish protobuffer搴忓垪鍖栧悗鐨勬暟鎹� */ @@ -39,7 +43,7 @@ int bus_client_publish(void* handle, const char* topic, const size_t topicl, const char* data, const size_t size); /* - 鑾峰彇 request 娑堟伅锛岄�氳繃绾跨▼璇诲彇锛屾澶勪粠缂撳瓨涓鍙� + 鑾峰彇 request 娑堟伅锛岄�氳繃绾跨▼璇诲彇锛屾澶勪粠缂撳瓨涓鍙�, 璇讳笉鍒颁笉杩斿洖 蹇呴』璋冪敤 free_reqmsg 閲婃斁 鍙�氳繃 message.h 鐨� get_reqmsg_stackerr get_reqmsg_stack 鑾峰彇瀵瑰簲鐨勬秷鎭� src 鏄摢涓�涓繘绋嬭姹傜殑鏍囪瘑绗� @@ -48,6 +52,10 @@ */ struct creqmsg* bus_client_get_reqmsg(void* handle, void** src); /* + 涓� bus_client_get_reqmsg 鐩稿悓锛屼絾鏀寔瓒呮椂杩斿洖锛宮s 鍙傛暟琛ㄦ槑瓒呮椂鏃堕暱锛屽崟浣嶆绉� +*/ +struct creqmsg* bus_client_get_reqmsg_intime(void* handle, void** src, const size_t ms); +/* 鍝嶅簲娑堟伅鍥炲锛宻rc鏄繛鎺ユ爣璇嗙锛宮sg鏄渶瑕佸洖澶嶇殑娑堟伅 閫氳繃 message.h 鐨� make_reply_msg 鍒涘缓鏃讹紝鏈夊唴瀛樻嫹璐濓紝蹇呴』閫氳繃 free_reply_msg 閲婃斁 鎴栬�呴�氳繃濉厖 crepmsg 缁撴瀯浣撴瀯閫狅紝鐢辫皟鐢ㄨ�呮帶鍒跺彉閲忕殑鍐呭瓨鍜岀敓鍛藉懆鏈燂紝鍙兘涓嶄細鎷疯礉鍐呭瓨锛屾晥鐜囨洿楂� diff --git a/exported_symbols b/exported_symbols index be3995b..aea51b5 100644 --- a/exported_symbols +++ b/exported_symbols @@ -3,9 +3,11 @@ bus_client_init; bus_client_free; bus_client_get_submsg; + bus_client_get_submsg_intime; bus_client_pubmsg; bus_client_publish; bus_client_get_reqmsg; + bus_client_get_reqmsg_intime; bus_client_reply_msg; bus_client_request; diff --git a/fixed_q.h b/fixed_q.h index 2e4fe94..4620d48 100644 --- a/fixed_q.h +++ b/fixed_q.h @@ -46,6 +46,17 @@ cond_node.notify(); return q.size(); } + int pop(T& m, const size_t ms){ + std::unique_lock<std::mutex> l{mtx_q}; + auto d = std::chrono::milliseconds{ms}; + if(!cond_node.wait_for(l, d, [this]{ return !q.empty(); })){ + return -1; + } + m = q.front(); + q.pop_front(); + cond_spare.notify(); + return q.size(); + } int pop(T& m){ std::unique_lock<std::mutex> l{mtx_q}; auto d = std::chrono::milliseconds{du}; @@ -57,16 +68,29 @@ cond_spare.notify(); return q.size(); } + T pop(const size_t ms){ + std::unique_lock<std::mutex> l{mtx_q}; + auto d = std::chrono::milliseconds{ms}; + T t{}; + if(!cond_node.wait_for(l, d, [this]{ return !q.empty(); })){ + return t; + } + t = q.front(); + q.pop_front(); + cond_spare.notify(); + return t; + } T pop(){ std::unique_lock<std::mutex> l{mtx_q}; auto d = std::chrono::milliseconds{du}; + T t{}; while(!cond_node.wait_for(l, d, [this]{ return !q.empty(); })){ - if (pred()) return T{}; + if (pred()) return t; } - auto m(move(q.front())); + t = q.front(); q.pop_front(); cond_spare.notify(); - return m; + return t; } template<typename F> void clear(F&& f){ -- Gitblit v1.8.0