zhangmeng
2024-04-22 16935f4aebffdd1b6580b844391a0aa0f4f3012b
bug fixed
4个文件已修改
112 ■■■■■ 已修改文件
src/bn_api.cpp 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/common.h 30 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/interface_bus_api.cpp 18 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/nng_wrap.cpp 48 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bn_api.cpp
@@ -97,7 +97,7 @@
    if (!topics || topics_len <= 0) return false;
    const auto& url = get_url(URLRegTopic);
    if (url.empty()) {
    if (!url) {
        set_last_error("BHRegisterTopics url empty");
        return false;
    }
@@ -136,7 +136,7 @@
    if (!topic || topic_len <= 0) return false;
    const auto& url = get_url(URLQueryTopic);
    if (url.empty()) {
    if (!url) {
        set_last_error("BHQueryTopicAddress url empty");
        return false;
    }
@@ -165,7 +165,7 @@
    // if (!query || query_len <= 0) return false;
    const auto& url = get_url(URLQueryProcs);
    if (url.empty()) {
    if (!url) {
        set_last_error("BHQueryProcs url empty");
        return false;
    }
@@ -247,7 +247,7 @@
int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
    const auto& url = get_url(URLSubLocal);
    if (url.empty()) {
    if (!url) {
        set_last_error("BHSubscribeTopics url empty");
        return false;
    }
@@ -257,7 +257,7 @@
int BHSubscribeNetTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
    const auto& url = get_url(URLSubNet);
    if (url.empty()) {
    if (!url) {
        set_last_error("BHSubscribeNetTopics url empty");
        return false;
    }
@@ -272,6 +272,9 @@
              int *msgpub_len,
              const int timeout_ms)
{
    if (!proc_id && !proc_id_len && !msgpub && !msgpub_len)
        return subscribe_read(NULL, NULL, timeout_ms) == 0;
    string topic, msg;
    auto ret = subscribe_read(&topic, &msg, timeout_ms);
    // printf("BHReadSub msg topic %s length %lu\n", topic.c_str(), msg.length());
@@ -391,6 +394,9 @@
                  void **src,
                  const int timeout_ms)
{
    if (!proc_id && !proc_id_len && !request && !request_len && !src)
        return read_request(NULL, NULL, timeout_ms) == 0;
    string msg;
    auto ret = read_request(src, &msg, timeout_ms);
    if (ret < 0) return false;
src/common.h
@@ -46,18 +46,18 @@
    URLHeartBeat,
};
static char* IPC_REGISTER = (char*)"ipc:///tmp/bhnng-center-reg.ipc"; //进程注册
static char* IPC_UNREGISTER = (char*)"ipc:///tmp/bhnng-center-unregister.ipc"; //注销
static char* IPC_REGTOPIC = (char*)"ipc:///tmp/bhnng-center-regtopic.ipc"; //注册主题
static char* IPC_QUERYTOPIC = (char*)"ipc:///tmp/bhnng-center-querytopic.ipc"; //查询指定的主题
static char* IPC_QUERYPROC = (char*)"ipc:///tmp/bhnng-center-queryproc.ipc"; //查询所有注册的进程
static char* IPC_SUBLOCALTOPIC = (char*)"ipc:///tmp/bhnng-center-sublocaltopic.ipc"; //订阅本地主题
static char* IPC_SUBNETTOPIC = (char*)"ipc:///tmp/bhnng-center-subnettopic.ipc"; //订阅网络主题
static char* IPC_HEARTBEAT = (char*)"ipc:///tmp/bhnng-center-hb.ipc";
static char* IPC_PUB_PROXY = (char*)"ipc:///tmp/bhnng-center-pub-proxy.ipc";   //这个是代理中心,用于接收待发布的消息
static char* IPC_SUB_QUEUE = (char*)"ipc:///tmp/bhnng-center-sub-queue.ipc";  //这个是客户端从center订阅的通道
static const char* IPC_REGISTER = (char*)"ipc:///tmp/bhnng-center-reg.ipc"; //进程注册
static const char* IPC_UNREGISTER = (char*)"ipc:///tmp/bhnng-center-unregister.ipc"; //注销
static const char* IPC_REGTOPIC = (char*)"ipc:///tmp/bhnng-center-regtopic.ipc"; //注册主题
static const char* IPC_QUERYTOPIC = (char*)"ipc:///tmp/bhnng-center-querytopic.ipc"; //查询指定的主题
static const char* IPC_QUERYPROC = (char*)"ipc:///tmp/bhnng-center-queryproc.ipc"; //查询所有注册的进程
static const char* IPC_SUBLOCALTOPIC = (char*)"ipc:///tmp/bhnng-center-sublocaltopic.ipc"; //订阅本地主题
static const char* IPC_SUBNETTOPIC = (char*)"ipc:///tmp/bhnng-center-subnettopic.ipc"; //订阅网络主题
static const char* IPC_HEARTBEAT = (char*)"ipc:///tmp/bhnng-center-hb.ipc";
static const char* IPC_PUB_PROXY = (char*)"ipc:///tmp/bhnng-center-pub-proxy.ipc";   //这个是代理中心,用于接收待发布的消息
static const char* IPC_SUB_QUEUE = (char*)"ipc:///tmp/bhnng-center-sub-queue.ipc";  //这个是客户端从center订阅的通道
static const std::unordered_map<int, std::string> map_url{
static const std::unordered_map<int, const char*> map_url{
    {URLReg,                IPC_REGISTER},
    {URLDeReg,              IPC_UNREGISTER},
    {URLRegTopic,           IPC_REGTOPIC},
@@ -69,12 +69,12 @@
    {URLSubQueue,           IPC_SUB_QUEUE},
    {URLHeartBeat,          IPC_HEARTBEAT},
};
inline std::string get_url(const int type){
inline const char* get_url(const int type){
    auto iter = map_url.find(type);
    if (iter != map_url.end()){
        return iter->second;
    }
    return {};
    return NULL;
}
template <class... T> struct make_void{typedef void type;};
@@ -133,9 +133,9 @@
    struct psmsg{
        DISABLE_COPY_AND_ASSIGN(psmsg);
        psmsg(const std::string& t, std::string&& m)
        :topic_(t),data_(std::move(m)){}
            :topic_(t),data_(std::move(m)){}
        psmsg(std::string&& t, std::string&& m)
        :topic_(std::move(t)),data_(std::move(m)){}
            :topic_(std::move(t)),data_(std::move(m)){}
        std::string topic_{};
        std::string data_{};
    };
src/interface_bus_api.cpp
@@ -90,6 +90,11 @@
        PRNTVITAG("topics is null");
        return false;
    }
    const auto& url = get_url(URLRegTopic);
    if (!url) {
        set_last_error("bus_register_topics url empty");
        return false;
    }
    if (get<8>(*b).empty()) {
        PRNTVITAG("proc_id is null");
@@ -112,11 +117,6 @@
    string msg(mtl2.SerializeAsString());
    std::move(mtl2);
    const auto& url = get_url(URLRegTopic);
    if (url.empty()) {
        set_last_error("bus_register_topics url empty");
        return false;
    }
    return simple_request(url, msg.data(), msg.size(), reply, reply_len, timeout_ms);
}
@@ -140,7 +140,7 @@
    }
    const auto& url = get_url(URLQueryTopic);
    if (url.empty()) {
    if (!url) {
        set_last_error("bus_query_topic_address url empty");
        return false;
    }
@@ -178,7 +178,7 @@
    // }
    const auto& url = get_url(URLQueryProcs);
    if (url.empty()) {
    if (!url) {
        set_last_error("bus_query_procs url empty");
        return false;
    }
@@ -307,6 +307,8 @@
        PRNTVITAG("handle is null");
        return false;
    }
    if (!proc_id && !proc_id_len && !msgpub && !msgpub_len)
        return subscribe_read(NULL, NULL, timeout_ms, &get<2>(*b)) == 0;
    string topic, msg;
    auto ret = subscribe_read(&topic, &msg, timeout_ms, &get<2>(*b));
@@ -387,6 +389,8 @@
        PRNTVITAG("handle is null");
        return false;
    }
    if (!proc_id && !proc_id_len && !request && !request_len && !src)
        return read_request(NULL, NULL, timeout_ms, &get<6>(*b)) == 0;
    string msg;
    auto ret = read_request(src, &msg, timeout_ms, &get<6>(*b));
src/nng_wrap.cpp
@@ -314,17 +314,29 @@
    _ps_sub* sub = (_ps_sub*)arg;
    if (!sub) sub = singleton<_ps_sub>();
    if (!topic && !msg) {
        lock_guard<mutex> l{sub->mtx_msg_};
        for (int i = 0; i < 2; i++) {
            if (!sub->msg_.empty())
                return 0;
            this_thread::sleep_for(chrono::milliseconds(to_ms));
        }
        return -1;
    }
    TAG;
    int tm = to_ms > 0 ? to_ms : 30;
    unique_lock<mutex> l(sub->mtx_msg_);
    auto status = sub->cv_msg_.wait_for(l, chrono::milliseconds{tm}, [sub]{
        return !sub->msg_.empty();
    });
    if (!status){
        PRNTVITAG("subscribe_read timeout");
        return -1;
    if (sub->msg_.empty()) {
        auto status = sub->cv_msg_.wait_for(l, chrono::milliseconds{tm}, [sub]{
            return !sub->msg_.empty();
        });
        if (!status){
            PRNTVITAG("subscribe_read timeout");
            return -1;
        }
    }
    auto& tmp = sub->msg_.front();
    *topic = std::move(tmp.topic_);
@@ -580,18 +592,30 @@
        if (start_reply(rep->url_, get<1>(get<1>(rep->socks_))) != 0)
            return -1;
    if (!src && !msg) {
        lock_guard<mutex> l{rep->mtx_msg_};
        for (int i = 0; i < 2; i++) {
            if (!rep->msg_.empty())
                return 0;
            this_thread::sleep_for(chrono::milliseconds(to_ms));
        }
        return -1;
    }
    int tm = to_ms > 0 ? to_ms : 30;
    uint64_t key{};
    work* w{};
    {
        unique_lock<mutex> l(rep->mtx_msg_);
        auto status = rep->cv_msg_.wait_for(l, chrono::milliseconds{tm}, [rep]{
            return !rep->msg_.empty();
        });
        if (!status){
            PRNTVITAG("read_request timeout");
            return -1;
        if (rep->msg_.empty()) {
            auto status = rep->cv_msg_.wait_for(l, chrono::milliseconds{tm}, [rep]{
                return !rep->msg_.empty();
            });
            if (!status){
                PRNTVITAG("read_request timeout");
                return -1;
            }
        }
        auto iter = rep->msg_.begin();
        key = iter->first;