zhangmeng
2023-03-20 f93a93f959c77aaeaf9154a7d8950691b91e1009
bug fixed
3个文件已修改
44 ■■■■■ 已修改文件
src/bn_api.cpp 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/common.h 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/nng_wrap.cpp 40 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bn_api.cpp
@@ -505,7 +505,6 @@
    auto back_msg = "back:" + msg;
    BHSendReply(src, back_msg.c_str(), back_msg.size());
    BHFree(src, 0);
}
void TestPub(const char* t, const int t_l, const char* d, const int d_l){
src/common.h
@@ -134,6 +134,8 @@
        DISABLE_COPY_AND_ASSIGN(psmsg);
        psmsg(const std::string& t, std::string&& m)
        :topic_(t),data_(std::move(m)){}
        psmsg(std::string&& t, std::string&& m)
        :topic_(std::move(t)),data_(std::move(m)){}
        std::string topic_{};
        std::string data_{};
    };
@@ -227,6 +229,7 @@
    std::mutex                                      mtx_msg_{};
    std::condition_variable                         cv_msg_{};
    // std::deque<void*>                               q_src_{};
};
template<class T, typename std::enable_if<is_default_c<T>::value, int>::type=0>
src/nng_wrap.cpp
@@ -131,6 +131,8 @@
    set_socket_timeout(sock, timeout_req_rep);
    pub->socket_ = sock;
    pub->t_ = get_thread([](const auto pub){
        string sndmsg{};
        sndmsg.reserve(1024);
        while (!pub->t_quit_.load()) {
            _ps::psmsg *msg{NULL};
            {
@@ -142,9 +144,14 @@
                msg = &pub->msg_.front();
                if (msg->topic_.empty()) {pub->msg_.pop_front(); continue;}
            }
            string sndmsg = (string{msg->topic_}+='\0')+=msg->data_;
            sndmsg += msg->topic_;
            sndmsg += '\0';
            sndmsg += msg->data_;
            int sndmsgsize = (int)sndmsg.size();
            // string sndmsg = (string{msg->topic_}+='\0')+=msg->data_;
            int rc = nn_send(pub->socket_, sndmsg.data(), sndmsg.size(), 0);
            if (rc == (int)sndmsg.size()){
            sndmsg.clear();
            if (rc == sndmsgsize){
                char* tmp{};
                rc = nn_recv(pub->socket_, &tmp, NN_MSG, 0);
                if (rc > 0){
@@ -213,26 +220,30 @@
            // int m_len = nn_recv(sub->socket_, &m, NN_MSG, NN_DONTWAIT);
            int m_len = nn_recv(sub->socket_, &m, NN_MSG, 0);
            if (m_len > 0){
                string tmp_msg{m, (size_t)m_len};
                nn_freemsg(m);
                string topic{tmp_msg.c_str()};
                string msg{};
                char* topic = m;
                char* msg = m + strlen(topic) + 1;
                size_t msgl = m_len - strlen(topic) - 1;
                // string tmp_msg{m, (size_t)m_len};
                // nn_freemsg(m);
                // string topic{tmp_msg.c_str()};
                // string msg{};
                bool found_topic = false;
                {
                    lock_guard<mutex> l{sub->operator()()};
                    for(auto && i : sub->topics_){
                        if (!!!i.compare(topic)){
                            msg = move(tmp_msg.substr(i.size()+1));
                        if (0 == i.compare(topic)){
                            found_topic = true;
                            break;
                        }
                    }
                }
                // printf("======>> subscribe recv topic %s msg length %lu\n", topic, msg.length());
                if (!msg.empty()){
                if (found_topic){
                    lock_guard<mutex> l(sub->mtx_msg_);
                    sub->msg_.emplace_back(move(topic), move(msg));
                    sub->msg_.emplace_back(string(topic), string(msg, msgl));
                    sub->cv_msg_.notify_all();
                }
                nn_freemsg(m);
            }else {
                {
                    lock_guard<mutex> l{sub->mtx_failed_topics_};
@@ -333,17 +344,16 @@
        int& sock = sv->socket_;
        char tmp[1024] = {0};
        while (!sv->t_quit_.load()) {
            if (sock < 0){
                sock = client_socket(sv->url_, NN_RESPONDENT);
                if (sock > 0) set_socket_timeout(sock, 126);
                continue;
            }
            if (sock < 0) continue;
            char* tmp{};
            int rc = nn_recv(sock, &tmp, NN_MSG, 0);
            if (rc > 0){
                nn_freemsg(tmp);
                rc = nn_send(sock, (*sv)().front().data(), (*sv)().front().size(), 0);
                if (rc < 0){
                    PRNTVITAG("heartbeat survey failed");
@@ -605,7 +615,7 @@
        uint64_t key;
        memcpy(&key, src, sizeof(key));
        // auto key = *(static_cast<uint64_t*>(const_cast<void*>(src)));
        free(src);
        // free(src);
        lock_guard<mutex> l{rep->mtx_msg_};
        auto iter = rep->works_.find(key);