zhangmeng
2021-12-13 a884637d0376d469ee307ebe1d117ae908a4c340
bug fixed for asyn rep-req timeout
4个文件已修改
308 ■■■■■ 已修改文件
main.cpp 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
proto/cpp/CMakeLists.txt 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bn_api.cpp 42 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/nng_wrap.cpp 255 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.cpp
@@ -6,6 +6,7 @@
#include <vector>
#include <thread>
#include <chrono>
#include <atomic>
using namespace std;
#include "src/bn_api.h"
@@ -15,10 +16,11 @@
  thread([]{
    string base_cont("test_req_rep==");
    atomic<uint64_t> index{0};
    vector<thread> v_t;
    for (int i = 0; i < 200; i++){
      v_t.emplace_back([&base_cont, i]{
        int64_t index = 0;
    for (int i = 0; i < 621; i++){
      v_t.emplace_back([&base_cont, i, &index]{
        while (true) {
          // printf("start request\n");
          // auto s = chrono::steady_clock::now();
@@ -31,7 +33,6 @@
      });
    }
    int64_t index = 0;
    while (true) {
      // printf("start request\n");
      // auto s = chrono::steady_clock::now();
proto/cpp/CMakeLists.txt
@@ -23,7 +23,7 @@
foreach(file ${proto_files})
        message(${file})
        message(${MESSAGE_DIR})
        get_filename_component(FIL_WE ${file} NAME_WE)
        # message(${FIL_WE})
src/bn_api.cpp
@@ -176,7 +176,19 @@
{
    if (!topic || topic_len <= 0) return false;
    return simple_request(get_url(URLQueryTopic), topic, topic_len, reply, reply_len, timeout_ms);
    auto url(get_url(URLQueryTopic));
    if (remote && remote_len > 0){
        BHAddress addr;
        if (addr.ParseFromArray(remote, remote_len)){
            if (!addr.ip().empty() && addr.port() > 0){
                // url = "tcp://" + addr.ip() + ":" + to_string(addr.port());
                printf("======>> BHQueryTopicAddress use remote address %s\n", url.c_str());
            }
        }
    }
    return simple_request(url, topic, topic_len, reply, reply_len, timeout_ms);
}
// 请求在线进程 request
@@ -190,9 +202,19 @@
{
    if (!query || query_len <= 0) return false;
    auto ret = simple_request(get_url(URLQueryProcs), query, query_len, reply, reply_len, timeout_ms);
    // printf("======>> BHQueryProcs *reply %p reply_len %d\n", *reply, *reply_len);
    return ret;
    auto url(get_url(URLQueryProcs));
    if (remote && remote_len > 0){
        BHAddress addr;
        if (addr.ParseFromArray(remote, remote_len)){
            if (!addr.ip().empty() && addr.port() > 0){
                // url = "tcp://" + addr.ip() + ":" + to_string(addr.port());
                printf("======>> BHQueryProcs use remote address %s\n", url.c_str());
            }
        }
    }
    return simple_request(url, query, query_len, reply, reply_len, timeout_ms);
}
// above communicate with center
@@ -352,11 +374,13 @@
    auto url("ipc:///tmp/" + procid);
    BHAddress addr;
    if (addr.ParseFromArray(remote, remote_len)){
        if (!addr.ip().empty() && addr.port() > 0){
            url = "tcp://" + addr.ip() + ":" + to_string(addr.port());
            printf("======>>use remote address %s\n", url.c_str());
    if (remote && remote_len > 0){
        BHAddress addr;
        if (addr.ParseFromArray(remote, remote_len)){
            if (!addr.ip().empty() && addr.port() > 0){
                url = "tcp://" + addr.ip() + ":" + to_string(addr.port());
                printf("======>> BHRequest use remote address %s\n", url.c_str());
            }
        }
    }
    // 使用procid作为ipc通信
src/nng_wrap.cpp
@@ -51,17 +51,17 @@
    }
}
static int server_socket(const string& url, const int protocol, int family=AF_SP){
    int sock = nn_socket(family, protocol);
    if (sock < 0) return sock;
    remove_exist(url);
    int rc = nn_bind(sock, url.c_str());
    if (rc < 0) {
        nn_close(sock);
        return rc;
    }
    return sock;
}
// static int server_socket(const string& url, const int protocol, int family=AF_SP){
//     int sock = nn_socket(family, protocol);
//     if (sock < 0) return sock;
//     remove_exist(url);
//     int rc = nn_bind(sock, url.c_str());
//     if (rc < 0) {
//         nn_close(sock);
//         return rc;
//     }
//     return sock;
// }
static void set_socket_timeout(int sock, const int to_ms){
    nn_setsockopt(sock, NN_SOL_SOCKET, NN_SNDTIMEO, &to_ms, sizeof(to_ms));
@@ -106,17 +106,16 @@
        *dest_len = src_len;
}
static string verbose_info{};
static thread_local string verbose_info{};
#ifndef PRNTVITAG
// #define TAG do{ \
//     if (verbose_info.length() > 8096) verbose_info.clear(); \
//     verbose_info=string("function [")+__FUNCTION__+string("]");}while(0)
/* #define PRNTVITAG(msg) do{ \
            if (verbose_info.length() > 8096) verbose_info.clear(); \
#define TAG do{ verbose_info.clear(); \
                verbose_info=string("function [")+__FUNCTION__+string("]"); \
            }while(0)
#define PRNTVITAG(msg) do{ \
            verbose_info+=string("-> (") + msg + string(")"); \
        }while(0) */
#define TAG
#define PRNTVITAG(args)
        }while(0)
// #define TAG
// #define PRNTVITAG(args)
#endif
void get_last_error(int* ec, void** emsg, int* emsg_len){
    *emsg = NULL;
@@ -163,10 +162,12 @@
///////////////////////////////////////////////////////////
// base class
#define DISABLE_COPY_AND_ASSIGN(className) className(const className&)=delete; \
                                className(className&&)=delete; \
                                className& operator=(const className&)=delete; \
                                className& operator=(className&&)=delete
#define DISABLE_COPY_AND_ASSIGN(className) \
        className(const className&)=delete; \
        className(className&&)=delete; \
        className& operator=(const className&)=delete; \
        className& operator=(className&&)=delete
class _nn{
public:
    DISABLE_COPY_AND_ASSIGN(_nn);
@@ -178,15 +179,17 @@
///////////////////////////////////////////////
// publish
struct psmsg{
    DISABLE_COPY_AND_ASSIGN(psmsg);
    psmsg()=delete;
    psmsg(const std::string& t, std::string&& m)
    :topic_(t),msg_(std::move(m)){}
    std::string topic_{};
    std::string msg_{};
};
class _ps : public _nn{
public:
    struct psmsg{
        DISABLE_COPY_AND_ASSIGN(psmsg);
        psmsg()=delete;
        psmsg(const std::string& t, std::string&& m)
        :topic_(t),data_(std::move(m)){}
        std::string topic_{};
        std::string data_{};
    };
public:
    DISABLE_COPY_AND_ASSIGN(_ps);
    _ps()=default;
@@ -217,7 +220,7 @@
    pub_.socket_ = sock;
    pub_.t_ = thread([]{
        while (!pub_.t_quit_.load()) {
            psmsg *msg{NULL};
            _ps::psmsg *msg{NULL};
            {
                unique_lock<mutex> l{pub_.mtx_msg_};
                pub_.cv_msg_.wait(l, []{
@@ -227,22 +230,14 @@
                msg = &pub_.msg_.front();
                if (msg->topic_.empty()) {pub_.msg_.pop_front(); continue;}
            }
            const auto &topic = msg->topic_;
            const auto topic_size = topic.size();
            const auto &data = msg->msg_;
            const auto data_size = data.size();
            char *sndmsg = (char*)malloc(topic_size + data_size);
            memcpy(sndmsg, topic.data(), topic_size);
            memcpy(sndmsg+topic_size, data.data(), data_size);
            int rc = nn_send(pub_.socket_, sndmsg, data_size+topic_size, 0);
            free(sndmsg);
            if (rc == (int)(data_size+topic_size)){
            string sndmsg(msg->topic_ + msg->data_);
            int rc = nn_send(pub_.socket_, sndmsg.data(), sndmsg.size(), 0);
            if (rc == (int)sndmsg.size()){
                char* tmp{};
                rc = nn_recv(pub_.socket_, &tmp, NN_MSG, 0);
                nn_freemsg(tmp);
                if (rc > 0){
                    nn_freemsg(tmp);
                    printf("======>> publish topic %s data length %lu\n", topic.c_str(), data_size);
                    printf("======>> publish topic %s data length %lu\n", msg->topic_.c_str(), msg->data_.size());
                    lock_guard<mutex> l{pub_.mtx_msg_};
                    pub_.msg_.pop_front();
                    continue;
@@ -297,8 +292,10 @@
    if (sub_.socket_ > 0) return 0;
    sub_.url_ = url;
    TAG;
    int sock = client_socket(url, NN_SUB);
    if (sock < 0){
        PRNTVITAG("client_socket faild\n");
        return -1;
    }
    // set_socket_timeout(sock, timeout_req_rep);
@@ -308,20 +305,20 @@
            char* m;
            int m_len = nn_recv(sub_.socket_, &m, NN_MSG, NN_DONTWAIT);
            if (m_len > 0){
                string tmp_msg{m, (size_t)m_len};
                nn_freemsg(m);
                string topic{}, msg{};
                {
                    lock_guard<mutex> l{sub_.mtx_topics_};
                    for(auto && i : sub_.topics_){
                        auto topic_len = i.size();
                        if (m_len <= (int)topic_len) continue;
                        topic.assign(m, topic_len);
                        if (tmp_msg.size() < i.size()) continue;
                        topic = tmp_msg.substr(0, i.size());
                        if (topic == i){
                            msg.assign(m+topic_len, m_len-topic_len);
                            msg = tmp_msg.substr(i.size());
                            break;
                        }
                    }
                }
                nn_freemsg(m);
                printf("======>> subscribe recv topic %s msg length %lu\n", topic.c_str(), msg.length());
                if (!msg.empty()){
                    lock_guard<mutex> l(sub_.mtx_msg_);
@@ -351,14 +348,19 @@
}
int subscribe_topic(const std::string& topic){
    TAG;
    if (sub_.socket_ < 0){
        subscribe_center(sub_.url_);
    }
    if (sub_.socket_ < 0) return -1;
    if (sub_.socket_ < 0) {
        PRNTVITAG("socket_ < 0");
        return -1;
    }
    auto ret = nn_setsockopt(sub_.socket_, NN_SUB, NN_SUB_SUBSCRIBE, topic.c_str(), topic.length());
    // printf("set NN_SUB_SUBSCRIBE topic %s ret %d\n", topic.c_str(), ret);
    if (ret < 0){
        PRNTVITAG("nn_setsockopt failed");
        lock_guard<mutex> l{sub_.mtx_failed_topics_};
        sub_.failed_topics_.insert(topic);
    }
@@ -381,6 +383,8 @@
int subscribe_read(std::string* topic, std::string* msg, const int to_ms){
    TAG;
    int tm = to_ms > 0 ? to_ms : 30;
    unique_lock<mutex> l(sub_.mtx_msg_);
@@ -393,7 +397,7 @@
    }
    const auto& tmp = sub_.msg_.front();
    *topic = tmp.topic_;
    *msg = tmp.msg_;
    *msg = tmp.data_;
    sub_.msg_.pop_front();
    return 0;
@@ -421,21 +425,22 @@
    survey_.url_ = url;
    survey_.fixed_msg_ = move(fixed_msg);
    survey_.t_ = thread([]{
        TAG;
        int& sock = survey_.socket_;
        const auto& msg = survey_.fixed_msg_;
        while (!survey_.t_quit_.load()) {
            if (sock < 0){
                sock = client_socket(survey_.url_, NN_RESPONDENT);
                if (sock > 0){
                    set_socket_timeout(sock, 126);
                }
                if (sock > 0) set_socket_timeout(sock, 126);
            }
            if (sock < 0) continue;
            char* tmp{};
            int rc = nn_recv(sock, &tmp, NN_MSG, 0);
            nn_freemsg(tmp);
            if (rc > 0){
                nn_freemsg(tmp);
                rc = nn_send(sock, msg.data(), msg.size(), 0);
                if (rc < 0){
                    PRNTVITAG("heartbeat survey failed");
@@ -451,12 +456,6 @@
//////////////////////////////////////////////
// reply for request
int request2(const std::string &ipc, const void* r, const int r_len,
    void** reply, int* reply_len, const int to_ms)
{
    return simple_request(ipc, r, r_len, reply, reply_len, to_ms);
}
enum { INIT, RECV, WAIT, SEND };
struct work {
    int state{-1};
@@ -471,15 +470,38 @@
    DISABLE_COPY_AND_ASSIGN(_rr);
    _rr()=default;
    ~_rr(){
        if(sock_local_.id > 0) nng_close(sock_local_);
        if(sock_remote_.id > 0) nng_close(sock_remote_);
        t_quit_.store(true, memory_order_relaxed);
        if (t_unblock_&&t_unblock_->joinable()) t_unblock_->join();
    }
    const string unblocking_msg_{"~!@#$%^&*()-=<<UNBLOCKING>>=-()*&^%$#@!~"};
    unique_ptr<thread>                              t_unblock_{nullptr};
    atomic_bool                                     t_quit_{false};
    nng_socket                                      sock_local_{0};
    nng_socket                                      sock_remote_{0};
    int                                             port_{-1};
    unordered_map<uint64_t, string>                 msg_{};
    unordered_map<uint64_t, struct work*>           works_{};
    class worker{
        worker& in_op(const worker& w){if(&w!=this){w_=w.w_;life_=w.life_;}return *this;};
    public:
        worker()=default;
        ~worker()=default;
        worker(struct work* w):w_(w),life_(0){}
        worker(const worker& w):w_(w.w_),life_(w.life_){}
        worker(worker&& w):w_(w.w_),life_(w.life_){}
        worker& operator=(const worker& w){return in_op(w);}
        worker& operator=(worker&& w){return in_op(w);}
        operator struct work*() const{return w_;}
        operator int&() {return life_;}
        struct work* w_{};
        int life_{};
    };
    unordered_map<uint64_t, worker>                 works_{};
    uint64_t                                        work_index_{0};
    mutex                                           mtx_msg_{};
    condition_variable                              cv_msg_{};
@@ -488,13 +510,35 @@
static _rr reply_;
static void
server_cb(void *arg)
int request2(const std::string &ipc, const void* r, const int r_len,
    void** reply, int* reply_len, const int to_ms)
{
    const auto suc = simple_request(ipc, r, r_len, reply, reply_len, to_ms);
    if (suc){
        const size_t sl = reply_.unblocking_msg_.size();
        const size_t rl = *reply_len;
        if (sl != rl) return true;
        const auto& s = reply_.unblocking_msg_;
        auto r = (const char*)(*reply);
        if (s.compare(0, sl, r, rl) == 0){
            free(*reply);
            *reply = NULL;
            *reply_len = 0;
            return false;
        }
    }
    return suc;
}
static void server_cb(void *arg)
{
    if (!arg) return;
    struct work *work = (struct work*)arg;
    nng_msg *    msg;
    int          rv;
    uint32_t     when{0};
    // uint32_t     when{0};
    switch (work->state) {
    case INIT:
@@ -522,7 +566,6 @@
    case SEND:
        if ((rv = nng_aio_result(work->aio)) != 0) {
            nng_msg_free(work->msg);
            break;
        }
        work->state = RECV;
        nng_ctx_recv(work->ctx, work->aio);
@@ -534,12 +577,18 @@
static void cb_recv_for_aio(work* w){
    nng_msg *om = w->msg;
    if (!om) return;
    string msg{(const char*)nng_msg_body(om), nng_msg_len(om)};
    nng_msg_free(om);
    lock_guard<mutex> l{reply_.mtx_msg_};
    reply_.works_[reply_.work_index_] = w;
    reply_.msg_[reply_.work_index_] = msg;
    reply_.works_.emplace(reply_.work_index_, w);
    reply_.msg_.emplace(reply_.work_index_, move(msg));
    // reply_.works_.insert({reply_.work_index_, w});
    // reply_.msg_.insert({reply_.work_index_, msg});
    // reply_.works_[reply_.work_index_] = w;
    // reply_.msg_[reply_.work_index_] = msg;
    reply_.work_index_++;
    reply_.cv_msg_.notify_all();
}
@@ -565,9 +614,10 @@
}
static constexpr int PARALLEL = 62;
static struct work* works[PARALLEL]{};
static struct work* works_local[PARALLEL]{};
static struct work* works_remote[PARALLEL]{};
static int create_server(nng_socket* sock, const string& url){
static int create_server(nng_socket* sock, const string& url, work** works){
    TAG;
    if (sock->id > 0) return 0;
@@ -596,22 +646,54 @@
    return 0;
}
static void aio_unblock(work* w, const void* msg, const int msg_len){
    nng_msg_alloc(&w->msg, 0);
    nng_msg_append(w->msg, msg, msg_len);
    nng_sleep_aio(0, w->aio);
}
int start_reply(const std::string& url, const int port){
    TAG;
    string ipc = "ipc:///tmp/" + url;
    if (url.find("ipc://") == 0){
        ipc = url;
    }
    reply_.url_ = ipc;
    if(create_server(&reply_.sock_local_, ipc) != 0) return -1;
    if(create_server(&reply_.sock_local_, ipc, works_local) != 0) return -1;
    if (port > 0){
        reply_.port_ = port;
        ipc = "tcp://0.0.0.0:" + to_string(port);
        if(create_server(&reply_.sock_remote_, ipc) != 0) return -1;
        if(create_server(&reply_.sock_remote_, ipc, works_remote) != 0) return -1;
    }else {
        reply_.sock_remote_.id = numeric_limits<int32_t>::max();
    }
    if (!reply_.t_unblock_){
        reply_.t_unblock_.reset(new thread([]{
            constexpr int idle = 10;
            const auto data = reply_.unblocking_msg_.data();
            const auto data_size = reply_.unblocking_msg_.size();
            while (!reply_.t_quit_.load()) {
                this_thread::sleep_for(chrono::milliseconds{10});
                vector<struct work*> tmp{};
                {
                    lock_guard<mutex> l{reply_.mtx_msg_};
                    for(auto iter = reply_.works_.begin(); iter != reply_.works_.end();){
                        if ((iter->second+=idle) > timeout_req_rep){
                            tmp.push_back(iter->second.w_);
                            iter = reply_.works_.erase(iter);
                        }else {
                            ++iter;
                        }
                    }
                }
                for(auto && w : tmp){
                    aio_unblock(w, data, data_size);
                }
            }
        }));
    }
    return 0;
@@ -619,15 +701,13 @@
int read_request(void** src, std::string* msg, const int to_ms){
    if (reply_.sock_local_.id == 0 || reply_.sock_remote_.id == 0) {
    if (reply_.sock_local_.id == 0 || reply_.sock_remote_.id == 0)
        if (start_reply(reply_.url_, reply_.port_) != 0)
            return -1;
    }
    int tm = to_ms > 0 ? to_ms : 30;
    uint64_t key{};
    string tmpmsg;
    {
        unique_lock<mutex> l(reply_.mtx_msg_);
        auto status = reply_.cv_msg_.wait_for(l, chrono::milliseconds{tm}, []{
@@ -637,16 +717,14 @@
            PRNTVITAG("subscribe_read timeout");
            return -1;
        }
        const auto& iter = reply_.msg_.begin();
        auto iter = reply_.msg_.begin();
        key = iter->first;
        tmpmsg = iter->second;
        *msg = move(iter->second);
        reply_.msg_.erase(iter);
    }
    *msg = move(tmpmsg);
    auto s = (uint64_t*)malloc(sizeof(uint64_t));
    *s = key;
    *src = s;
    *src = malloc(sizeof(uint64_t));
    *(uint64_t*)(*src) = key;
    return 0;
}
@@ -663,12 +741,7 @@
        reply_.works_.erase(iter);
    }
    TAG;
    nng_msg_alloc(&w->msg, 0);
    nng_msg_append(w->msg, msg, msg_len);
    nng_sleep_aio(0, w->aio);
    aio_unblock(w, msg, msg_len);
    return 0;
}