zhangmeng
2022-01-13 b379104205af838e883a252e9b8358b0388e6015
src/nng_wrap.cpp
@@ -138,7 +138,7 @@
                msg = &pub->msg_.front();
                if (msg->topic_.empty()) {pub->msg_.pop_front(); continue;}
            }
            string sndmsg(msg->topic_ + msg->data_);
            string sndmsg = (string{msg->topic_}+='\0')+=msg->data_;
            int rc = nn_send(pub->socket_, sndmsg.data(), sndmsg.size(), 0);
            if (rc == (int)sndmsg.size()){
                char* tmp{};
@@ -209,19 +209,18 @@
            if (m_len > 0){
                string tmp_msg{m, (size_t)m_len};
                nn_freemsg(m);
                string topic{}, msg{};
                const auto topic{tmp_msg.c_str()};
                string msg{};
                {
                    lock_guard<mutex> l{(*sub)()};
                    for(auto && i : sub->topics_){
                        if (tmp_msg.size() < i.size()) continue;
                        topic = move(tmp_msg.substr(0, i.size()));
                        if (topic == i){
                            msg = move(tmp_msg.substr(i.size()));
                        if (!!!i.compare(topic)){
                            msg = move(tmp_msg.substr(i.size()+1));
                            break;
                        }
                    }
                }
                printf("======>> subscribe recv topic %s msg length %lu\n", topic.c_str(), msg.length());
                printf("======>> subscribe recv topic %s msg length %lu\n", topic, msg.length());
                if (!msg.empty()){
                    lock_guard<mutex> l(sub->mtx_msg_);
                    sub->msg_.emplace_back(topic, move(msg));
@@ -513,6 +512,7 @@
        get<1>(get<1>(rep->socks_)) = port;
        ipc = "tcp://0.0.0.0:" + to_string(port);
        if(create_server(&get<0>(get<1>(rep->socks_)), ipc, 62, rep) != 0) return -1;
        printf("======>> create server for remote port %d\n", port);
    }else {
        get<0>(get<1>(rep->socks_)).id = numeric_limits<int32_t>::max();
    }