From cf0a3209b51babf72469d962914db0dac2e5f52c Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期二, 27 十二月 2022 14:13:30 +0800
Subject: [PATCH] add get msg timeout

---
 fixed_q.h        |   30 +++++++++++++-
 exported_symbols |    2 +
 cbhomeclient.cpp |   53 ++++++++++++++++++--------
 cbhomeclient.h   |   12 +++++-
 4 files changed, 76 insertions(+), 21 deletions(-)

diff --git a/cbhomeclient.cpp b/cbhomeclient.cpp
index c101e66..b2d084a 100644
--- a/cbhomeclient.cpp
+++ b/cbhomeclient.cpp
@@ -82,7 +82,7 @@
 Msg to_bus(client* cli, F&& f, Args&&... args){
     Msg mesg;
     if (std::forward<F>(f)(cli->bus, std::forward<Args>(args)...))
-        mesg = std::move(msg(crop<Is...>(tuple<Args...>(std::forward<Args>(args)...))));
+        mesg = msg(crop<Is...>(tuple<Args...>(std::forward<Args>(args)...)));
     return mesg;
 }
 
@@ -96,7 +96,7 @@
         tie(d, s) = tp;
         if (m.ParseFromArray(d, s)) {
             bus_free(d, s);
-            msg = std::move(make_tuple(true, std::move(m)));
+            msg = make_tuple(true, std::move(m));
             break;
         }
         m.Clear();
@@ -110,8 +110,8 @@
 template <size_t... Is, class F, class... Args>
 MsgCR to_center(client* cli, F&& f, Args&&... args){
     MsgCR msg(dummy());
-    auto vmsg = std::move(to_bus<Is...>(cli, std::forward<F>(f), std::forward<Args>(args)...));
-    if (!vmsg.empty()) msg = std::move(parse(cli, vmsg.at(0)));
+    auto vmsg = to_bus<Is...>(cli, std::forward<F>(f), std::forward<Args>(args)...);
+    if (!vmsg.empty()) msg = parse(cli, vmsg.at(0));
     return msg;
 }
 
@@ -124,8 +124,8 @@
     const auto& tpc = tlist.SerializeAsString();
     void* replymsg = NULL;
     int replysize = 0;
-    msg = std::move(to_center<2,3>(cli, std::forward<F>(f),
-        tpc.data(), tpc.size(), &replymsg, &replysize, sndto));
+    msg = to_center<2,3>(cli, std::forward<F>(f),
+        tpc.data(), tpc.size(), &replymsg, &replysize, sndto);
     return msg;
 }
 
@@ -248,11 +248,7 @@
     delete cli;
 }
 
-struct csubmsg* bus_client_get_submsg(void* handle){
-    client* cli = ptr(handle);
-    Msg msg = std::move(cli->sub_q->pop());
-    if (msg.empty()) return NULL;
-
+static struct csubmsg* parse_submsg(const Msg& msg){
     void* procid = NULL, *data = NULL;
     int pids = 0, size = 0;
 
@@ -265,12 +261,23 @@
 
     return pmsg;
 }
-
-struct creqmsg* bus_client_get_reqmsg(void* handle, void** src){
+struct csubmsg* bus_client_get_submsg_intime(void* handle, const size_t ms){
     client* cli = ptr(handle);
-    Msg msg = std::move(cli->readreq_q->pop());
+    Msg msg = cli->sub_q->pop(ms);
     if (msg.empty()) return NULL;
 
+    return parse_submsg(msg);
+}
+
+struct csubmsg* bus_client_get_submsg(void* handle){
+    client* cli = ptr(handle);
+    Msg msg = cli->sub_q->pop();
+    if (msg.empty()) return NULL;
+
+    return parse_submsg(msg);
+}
+
+static struct creqmsg* parse_reqmsg(const Msg& msg, void** src){
     void* procid = NULL, *data = NULL;
     int pids = 0, size = 0;
 
@@ -284,14 +291,28 @@
 
     return pmsg;
 }
+struct creqmsg* bus_client_get_reqmsg_intime(void* handle, void** src, const size_t ms){
+    client* cli = ptr(handle);
+    Msg msg = cli->readreq_q->pop(ms);
+    if (msg.empty()) return NULL;
+
+    return parse_reqmsg(msg, src);
+}
+struct creqmsg* bus_client_get_reqmsg(void* handle, void** src){
+    client* cli = ptr(handle);
+    Msg msg = cli->readreq_q->pop();
+    if (msg.empty()) return NULL;
+
+    return parse_reqmsg(msg, src);
+}
 
 int bus_client_request(void* handle, struct creqmsg* msg, struct crepmsg** repmsg){
 
     void* procid = NULL, *reply = NULL;
     int pids = 0, replys = 0;
-    auto vmsg = std::move(to_bus<4,5,6,7>(ptr(handle), bus_request, (void*)NULL, 0,
+    auto vmsg = to_bus<4,5,6,7>(ptr(handle), bus_request, (void*)NULL, 0,
                     msg->msg, msg->msgl, &procid, &pids,
-                    &reply, &replys, sndto));
+                    &reply, &replys, sndto);
     if (!vmsg.empty()){
         void* procid = NULL, *data = NULL;
         int pids = 0, size = 0;
diff --git a/cbhomeclient.h b/cbhomeclient.h
index 9d24e6c..c962cea 100644
--- a/cbhomeclient.h
+++ b/cbhomeclient.h
@@ -24,11 +24,15 @@
 void bus_client_free(void* handle);
 
 /*
-    鑾峰彇璁㈤槄鐨勬秷鎭紝璁㈤槄娑堟伅閫氳繃绾跨▼涓嶅仠璇诲彇锛屾澶勪粠缂撳瓨涓鍙�
+    鑾峰彇璁㈤槄鐨勬秷鎭紝璁㈤槄娑堟伅閫氳繃绾跨▼涓嶅仠璇诲彇锛屾澶勪粠缂撳瓨涓鍙栵紝璇讳笉鍒颁笉杩斿洖
     蹇呴』閫氳繃 message.h 鐨� free_reqmsg 閲婃斁
     閫氳繃 get_submsg_db get_submsg_proclist 鑾峰彇瀵瑰簲鐨勬秷鎭�
 */
 struct csubmsg* bus_client_get_submsg(void* handle);
+/*
+    涓� bus_client_get_submsg 鐩稿悓锛屼絾鏀寔瓒呮椂杩斿洖锛宮s 鍙傛暟琛ㄦ槑瓒呮椂鏃堕暱锛屽崟浣嶆绉�
+*/
+struct csubmsg* bus_client_get_submsg_intime(void* handle, const size_t ms);
 /*
     鍙戝竷娑堟伅锛宒ata 鏄� MsgPublish protobuffer搴忓垪鍖栧悗鐨勬暟鎹�
 */
@@ -39,7 +43,7 @@
 int bus_client_publish(void* handle, const char* topic, const size_t topicl, const char* data, const size_t size);
 
 /*
-    鑾峰彇 request 娑堟伅锛岄�氳繃绾跨▼璇诲彇锛屾澶勪粠缂撳瓨涓鍙�
+    鑾峰彇 request 娑堟伅锛岄�氳繃绾跨▼璇诲彇锛屾澶勪粠缂撳瓨涓鍙�, 璇讳笉鍒颁笉杩斿洖
     蹇呴』璋冪敤 free_reqmsg 閲婃斁
     鍙�氳繃 message.h 鐨� get_reqmsg_stackerr get_reqmsg_stack 鑾峰彇瀵瑰簲鐨勬秷鎭�
     src 鏄摢涓�涓繘绋嬭姹傜殑鏍囪瘑绗�
@@ -48,6 +52,10 @@
 */
 struct creqmsg* bus_client_get_reqmsg(void* handle, void** src);
 /*
+    涓� bus_client_get_reqmsg 鐩稿悓锛屼絾鏀寔瓒呮椂杩斿洖锛宮s 鍙傛暟琛ㄦ槑瓒呮椂鏃堕暱锛屽崟浣嶆绉�
+*/
+struct creqmsg* bus_client_get_reqmsg_intime(void* handle, void** src, const size_t ms);
+/*
     鍝嶅簲娑堟伅鍥炲锛宻rc鏄繛鎺ユ爣璇嗙锛宮sg鏄渶瑕佸洖澶嶇殑娑堟伅
     閫氳繃 message.h 鐨� make_reply_msg 鍒涘缓鏃讹紝鏈夊唴瀛樻嫹璐濓紝蹇呴』閫氳繃 free_reply_msg 閲婃斁
     鎴栬�呴�氳繃濉厖 crepmsg 缁撴瀯浣撴瀯閫狅紝鐢辫皟鐢ㄨ�呮帶鍒跺彉閲忕殑鍐呭瓨鍜岀敓鍛藉懆鏈燂紝鍙兘涓嶄細鎷疯礉鍐呭瓨锛屾晥鐜囨洿楂�
diff --git a/exported_symbols b/exported_symbols
index be3995b..aea51b5 100644
--- a/exported_symbols
+++ b/exported_symbols
@@ -3,9 +3,11 @@
     bus_client_init;
     bus_client_free;
     bus_client_get_submsg;
+    bus_client_get_submsg_intime;
     bus_client_pubmsg;
     bus_client_publish;
     bus_client_get_reqmsg;
+    bus_client_get_reqmsg_intime;
     bus_client_reply_msg;
     bus_client_request;
 
diff --git a/fixed_q.h b/fixed_q.h
index 2e4fe94..4620d48 100644
--- a/fixed_q.h
+++ b/fixed_q.h
@@ -46,6 +46,17 @@
         cond_node.notify();
         return q.size();
     }
+    int pop(T& m, const size_t ms){
+        std::unique_lock<std::mutex> l{mtx_q};
+        auto d = std::chrono::milliseconds{ms};
+        if(!cond_node.wait_for(l, d, [this]{ return !q.empty(); })){
+            return -1;
+        }
+        m = q.front();
+        q.pop_front();
+        cond_spare.notify();
+        return q.size();
+    }
     int pop(T& m){
         std::unique_lock<std::mutex> l{mtx_q};
         auto d = std::chrono::milliseconds{du};
@@ -57,16 +68,29 @@
         cond_spare.notify();
         return q.size();
     }
+    T pop(const size_t ms){
+        std::unique_lock<std::mutex> l{mtx_q};
+        auto d = std::chrono::milliseconds{ms};
+        T t{};
+        if(!cond_node.wait_for(l, d, [this]{ return !q.empty(); })){
+            return t;
+        }
+        t = q.front();
+        q.pop_front();
+        cond_spare.notify();
+        return t;
+    }
     T pop(){
         std::unique_lock<std::mutex> l{mtx_q};
         auto d = std::chrono::milliseconds{du};
+        T t{};
         while(!cond_node.wait_for(l, d, [this]{ return !q.empty(); })){
-            if (pred()) return T{};
+            if (pred()) return t;
         }
-        auto m(move(q.front()));
+        t = q.front();
         q.pop_front();
         cond_spare.notify();
-        return m;
+        return t;
     }
 
     template<typename F> void clear(F&& f){

--
Gitblit v1.8.0