From 16935f4aebffdd1b6580b844391a0aa0f4f3012b Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期一, 22 四月 2024 10:29:12 +0800
Subject: [PATCH] bug fixed

---
 src/interface_bus_api.cpp |  160 ++++++++++++++++++++++++++++++++--------------------
 1 files changed, 98 insertions(+), 62 deletions(-)

diff --git a/src/interface_bus_api.cpp b/src/interface_bus_api.cpp
index a23dd79..13bc08e 100644
--- a/src/interface_bus_api.cpp
+++ b/src/interface_bus_api.cpp
@@ -10,7 +10,7 @@
 #include "bhome_msg_api.pb.h"
 using namespace bhome_msg;
 
-using bus = tuple<_ps, _ps_sub, _sv, _rr, string>;
+using bus = tuple<array<thread, 0>, _ps, _ps_sub, vector<tuple<float>>, deque<map<string, void*>>, _sv, _rr, unordered_map<void*, unordered_set<string*>>, string>;
 void* bus_register(const void *proc_info,
                const int proc_info_len,
                void **reply,
@@ -30,19 +30,20 @@
 /////////////////////////////////////////////////////////////////////////
         bus *b = new bus;
         bhome_msg::ProcInfo pi;
-        if (pi.ParseFromArray(proc_info, proc_info_len)) get<4>(*b) = pi.proc_id();
+        if (pi.ParseFromArray(proc_info, proc_info_len)) get<8>(*b) = pi.proc_id();
+        std::move(pi);
 
         const auto& url_hb = get_url(URLHeartBeat);
-        respond_survey(url_hb,string{(char*)proc_info,(size_t)proc_info_len},&get<2>(*b));
+        respond_survey(url_hb,string{(char*)proc_info,(size_t)proc_info_len},&get<5>(*b));
 
         const auto& url_pub_proxy = get_url(URLPubProxy);
-        publish(url_pub_proxy, NULL, 0, &get<0>(*b));
+        publish(url_pub_proxy, NULL, 0, &get<1>(*b));
         const auto& url_sub_queue = get_url(URLSubQueue);
-        subscribe_center(url_sub_queue, &get<1>(*b));
+        subscribe_center(url_sub_queue, &get<2>(*b));
 
         // temporary
-        port = 0;
-        start_reply(get<4>(*b), port, &get<3>(*b));
+        // port = 0;
+        start_reply(get<8>(*b), port, &get<6>(*b));
 
         return b;
     }
@@ -89,8 +90,13 @@
         PRNTVITAG("topics is null");
         return false;
     }
+    const auto& url = get_url(URLRegTopic);
+    if (!url) {
+        set_last_error("bus_register_topics url empty");
+        return false;
+    }
 
-    if (get<4>(*b).empty()) {
+    if (get<8>(*b).empty()) {
         PRNTVITAG("proc_id is null");
         return false;
     }
@@ -102,14 +108,16 @@
     }
 
     bhome_msg::MsgTopicList mtl2;
-    mtl2.add_topic_list(get<4>(*b));
+    mtl2.add_topic_list(get<8>(*b));
     for(int i = 0; i < mtl.topic_list_size(); i++){
         mtl2.add_topic_list(mtl.topic_list(i));
     }
+    std::move(mtl);
 
     string msg(mtl2.SerializeAsString());
+    std::move(mtl2);
 
-    return simple_request(get_url(URLRegTopic), msg.data(), msg.size(), reply, reply_len, timeout_ms);
+    return simple_request(url, msg.data(), msg.size(), reply, reply_len, timeout_ms);
 }
 
 int bus_query_topic_address(void* handle, const void *remote,
@@ -131,17 +139,21 @@
         return false;
     }
 
-    auto url(get_url(URLQueryTopic));
-
-    if (remote && remote_len > 0){
-        BHAddress addr;
-        if (addr.ParseFromArray(remote, remote_len)){
-            if (!addr.ip().empty() && addr.port() > 0){
-                // url = "tcp://" + addr.ip() + ":" + to_string(addr.port());
-                printf("======>> bus_query_topic_address use remote address %s\n", url.c_str());
-            }
-        }
+    const auto& url = get_url(URLQueryTopic);
+    if (!url) {
+        set_last_error("bus_query_topic_address url empty");
+        return false;
     }
+
+    // if (remote && remote_len > 0){
+    //     BHAddress addr;
+    //     if (addr.ParseFromArray(remote, remote_len)){
+    //         if (!addr.ip().empty() && addr.port() > 0){
+    //             // url = "tcp://" + addr.ip() + ":" + to_string(addr.port());
+    //             printf("======>> bus_query_topic_address use remote address %s\n", url.c_str());
+    //         }
+    //     }
+    // }
 
     return simple_request(url, topics, topics_len, reply, reply_len, timeout_ms);
 }
@@ -160,32 +172,35 @@
         PRNTVITAG("handle is null");
         return false;
     }
-    if (!query || query_len <= 0) {
-        PRNTVITAG("query is null");
+    // if (!query || query_len <= 0) {
+    //     PRNTVITAG("query is null");
+    //     return false;
+    // }
+
+    const auto& url = get_url(URLQueryProcs);
+    if (!url) {
+        set_last_error("bus_query_procs url empty");
         return false;
     }
-
-    auto url(get_url(URLQueryProcs));
-
-    if (remote && remote_len > 0){
-        BHAddress addr;
-        if (addr.ParseFromArray(remote, remote_len)){
-            if (!addr.ip().empty() && addr.port() > 0){
-                // url = "tcp://" + addr.ip() + ":" + to_string(addr.port());
-                printf("======>> bus_query_procs use remote address %s\n", url.c_str());
-            }
-        }
-    }
+    // if (remote && remote_len > 0){
+    //     BHAddress addr;
+    //     if (addr.ParseFromArray(remote, remote_len)){
+    //         if (!addr.ip().empty() && addr.port() > 0){
+    //             // url = "tcp://" + addr.ip() + ":" + to_string(addr.port());
+    //             printf("======>> bus_query_procs use remote address %s\n", url.c_str());
+    //         }
+    //     }
+    // }
 
     return simple_request(url, query, query_len, reply, reply_len, timeout_ms);
 }
 
-int bus_subscribe_topics(void* handle, const void *topics,
+static int sub(const string& url,void* handle, const void *topics,
                       const int topics_len,
                       void **reply,
                       int *reply_len,
-                      const int timeout_ms)
-{
+                      const int timeout_ms){
+
     TAG;
     bus* b = get(handle);
     if (!b){
@@ -208,13 +223,12 @@
     }
 
     for(int i = 0; i < mtl.topic_list_size(); i ++){
-        subscribe_topic(mtl.topic_list(i), &get<1>(*b));
+        subscribe_topic(mtl.topic_list(i), &get<2>(*b));
     }
-
-    return true;
+    std::move(mtl);
 
 ///////////////////////////////////////////////////
-    auto ret = simple_request(get_url(URLSubLocal), topics, topics_len, reply, reply_len, timeout_ms);
+    auto ret = simple_request(url, topics, topics_len, reply, reply_len, timeout_ms);
     if (!ret){
         printf("bus_subscribe_topics simple_request failed\n");
     }
@@ -222,13 +236,22 @@
 
 }
 
+
+int bus_subscribe_topics(void* handle, const void *topics,
+                      const int topics_len,
+                      void **reply,
+                      int *reply_len,
+                      const int timeout_ms)
+{
+    return sub(get_url(URLSubLocal), handle, topics, topics_len, reply, reply_len, timeout_ms);
+}
 int bus_subscribe_topics_net(void* handle, const void *topics,
                          const int topics_len,
                          void **reply,
                          int *reply_len,
                          const int timeout_ms)
 {
-    return bus_subscribe_topics(handle, topics, topics_len, reply, reply_len, timeout_ms);
+    return sub(get_url(URLSubNet), handle, topics, topics_len, reply, reply_len, timeout_ms);
 }
 
 int bus_heartbeat_easy(void* handle, const int timeout_ms){ return 1; }
@@ -244,7 +267,7 @@
         PRNTVITAG("handle is null");
         return false;
     }
-    if (get<4>(*b).empty()){
+    if (get<8>(*b).empty()){
         PRNTVITAG("proc_id is null");
         return false;
     }
@@ -259,12 +282,15 @@
         return false;
     }
 
+    // msgpublish(msgpublish(msg))
     MsgPublish newPub;
-    newPub.set_topic(get<4>(*b));
+    newPub.set_topic(get<8>(*b));
     newPub.set_data(string{(const char*)msgpub, (const size_t)msgpub_len});
 
     string msg(newPub.SerializeAsString());
-    auto ret = publish(pub.topic(), msg.data(), msg.size(), &get<0>(*b));
+    std::move(newPub);
+
+    auto ret = publish(pub.topic(), msg.data(), msg.size(), &get<1>(*b));
     if (ret > 0) return true;
     return false;
 }
@@ -281,9 +307,11 @@
         PRNTVITAG("handle is null");
         return false;
     }
+    if (!proc_id && !proc_id_len && !msgpub && !msgpub_len)
+        return subscribe_read(NULL, NULL, timeout_ms, &get<2>(*b)) == 0;
 
     string topic, msg;
-    auto ret = subscribe_read(&topic, &msg, timeout_ms, &get<1>(*b));
+    auto ret = subscribe_read(&topic, &msg, timeout_ms, &get<2>(*b));
     if (ret < 0) return false;
 
     MsgPublish newPub;
@@ -319,25 +347,28 @@
         return false;
     }
 
+    string url{};
     // BHQueryTopicAddress鑾峰彇proc_id
     string procid{};
-    if (!get_proc_id_from_MsgRequestTopic(request, request_len, timeout_ms, &procid)){
-        PRNTVITAG("get_proc_id_from_MsgRequestTopic failed");
-        return false;
-    }
-
-    auto url("ipc:///tmp/" + procid);
 
     if (remote && remote_len > 0){
         BHAddress addr;
         if (addr.ParseFromArray(remote, remote_len)){
             if (!addr.ip().empty() && addr.port() > 0){
-                // url = "tcp://" + addr.ip() + ":" + to_string(addr.port());
-                printf("======>> BHRequest use remote address %s\n", url.c_str());
+                url = "tcp://" + addr.ip() + ":" + to_string(addr.port());
+                // printf("======>> BHRequest use remote address %s\n", url.c_str());
             }
         }
+    } else if (get_proc_id_from_MsgRequestTopic(request, request_len, timeout_ms, &procid)){
+        // PRNTVITAG("get_proc_id_from_MsgRequestTopic failed");
+        // return false;
+        url = "ipc:///tmp/" + procid;
     }
 
+    if (url.empty()) {
+        set_last_error("bus_request url empty");
+        return false;
+    }
     int rc = request2(url, request, request_len, reply, reply_len, timeout_ms);
     if (rc < 0) return false;
     copy_memory(proc_id, proc_id_len, procid.data(), procid.size());
@@ -358,18 +389,23 @@
         PRNTVITAG("handle is null");
         return false;
     }
+    if (!proc_id && !proc_id_len && !request && !request_len && !src)
+        return read_request(NULL, NULL, timeout_ms, &get<6>(*b)) == 0;
 
     string msg;
-    auto ret = read_request(src, &msg, timeout_ms, &get<3>(*b));
-    if (ret != 0) return false;
+    auto ret = read_request(src, &msg, timeout_ms, &get<6>(*b));
+    // printf("bus_recv_request read_request ret %d msg %s\n", ret, msg.c_str());
+    if (ret < 0) return false;
 
-    string procid{};
-    if (!get_proc_id_from_MsgRequestTopic(msg.data(), msg.size(), timeout_ms, &procid)){
-        PRNTVITAG("get_proc_id_from_MsgRequestTopic failed");
-        return false;
+    if (ret == REPLY_IPC){
+        string procid{};
+        if (get_proc_id_from_MsgRequestTopic(msg.data(), msg.size(), timeout_ms, &procid)){
+            // PRNTVITAG("get_proc_id_from_MsgRequestTopic failed");
+            // return false;
+            copy_memory(proc_id, proc_id_len, procid.data(), procid.size());
+        }
     }
 
-    copy_memory(proc_id, proc_id_len, procid.data(), procid.size());
     copy_memory(request, request_len, msg.data(), msg.size());
 
     return true;
@@ -391,7 +427,7 @@
         return false;
     }
 
-    auto ret = send_reply(src, reply, reply_len, &get<3>(*b));
+    auto ret = send_reply(src, reply, reply_len, &get<6>(*b));
 
     if (ret < 0) return false;
     return true;

--
Gitblit v1.8.0