zhangmeng
2021-12-16 8a1a19a41596a44bacef0ab26ffaa88675e402e0
src/nng_wrap.cpp
@@ -235,8 +235,8 @@
            if (rc == (int)sndmsg.size()){
                char* tmp{};
                rc = nn_recv(pub_.socket_, &tmp, NN_MSG, 0);
                nn_freemsg(tmp);
                if (rc > 0){
                    nn_freemsg(tmp);
                    printf("======>> publish topic %s data length %lu\n", msg->topic_.c_str(), msg->data_.size());
                    lock_guard<mutex> l{pub_.mtx_msg_};
                    pub_.msg_.pop_front();
@@ -439,8 +439,8 @@
            char* tmp{};
            int rc = nn_recv(sock, &tmp, NN_MSG, 0);
            nn_freemsg(tmp);
            if (rc > 0){
                nn_freemsg(tmp);
                rc = nn_send(sock, msg.data(), msg.size(), 0);
                if (rc < 0){
                    PRNTVITAG("heartbeat survey failed");
@@ -613,11 +613,7 @@
    return (w);
}
static constexpr int PARALLEL = 62;
static struct work* works_local[PARALLEL]{};
static struct work* works_remote[PARALLEL]{};
static int create_server(nng_socket* sock, const string& url, work** works){
static int create_server(nng_socket* sock, const string& url, const int count){
    TAG;
    if (sock->id > 0) return 0;
@@ -627,22 +623,26 @@
        PRNTVITAG(url);
        return rv;
    }
    for (int i = 0; i < PARALLEL; i++) {
    work** works = (work**)malloc(sizeof(work*) * count);
    for (int i = 0; i < count; i++) {
        works[i] = alloc_work(*sock);
    }
    remove_exist(url);
    rv = nng_listen(*sock, url.c_str(), NULL, 0);
    if (rv < 0){
        free(works);
        PRNTVITAG("create_server nng_listen failed");
        PRNTVITAG(url);
        return rv;
    }
    for (int i = 0; i < PARALLEL; i++) {
    for (int i = 0; i < count; i++) {
        server_cb(works[i]); // this starts them going (INIT state)
    }
    free(works);
    return 0;
}
@@ -660,12 +660,12 @@
        ipc = url;
    }
    reply_.url_ = ipc;
    if(create_server(&reply_.sock_local_, ipc, works_local) != 0) return -1;
    if(create_server(&reply_.sock_local_, ipc, 62) != 0) return -1;
    if (port > 0){
        reply_.port_ = port;
        ipc = "tcp://0.0.0.0:" + to_string(port);
        if(create_server(&reply_.sock_remote_, ipc, works_remote) != 0) return -1;
        if(create_server(&reply_.sock_remote_, ipc, 62) != 0) return -1;
    }else {
        reply_.sock_remote_.id = numeric_limits<int32_t>::max();
    }