zhangmeng
2021-12-16 8a1a19a41596a44bacef0ab26ffaa88675e402e0
stash
3个文件已修改
91 ■■■■ 已修改文件
src/bn_api.h 66 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/exported_symbols 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/nng_wrap.cpp 18 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bn_api.h
@@ -5,19 +5,6 @@
extern "C" {
#endif
// Function-pointer type for an API call that takes one input buffer and
// yields one output buffer.
//   proc_info, proc_info_len : input buffer and its length
//                              (presumably in bytes -- TODO confirm).
//   reply, reply_len         : out-parameters receiving the reply buffer and
//                              its length. NOTE(review): ownership of *reply
//                              is not visible here; confirm who frees it.
//   timeout_ms               : timeout, presumably in milliseconds.
// The meaning of the int return value is not visible in this header.
typedef int (*FBHApiIn1Out1)(const void *proc_info,
                             const int proc_info_len,
                             void **reply,
                             int *reply_len,
                             const int timeout_ms);
// Takes a function of type FBHApiIn1Out1 together with a request buffer,
// reply out-parameters and a timeout -- the same argument shape that
// FBHApiIn1Out1 itself accepts.
// NOTE(review): presumably forwards the arguments to `func` and returns its
// result; the implementation is not visible here -- confirm.
int BHApiIn1Out1Proxy(FBHApiIn1Out1 func,
                      const void *request,
                      const int request_len,
                      void **reply,
                      int *reply_len,
                      const int timeout_ms);
int BHRegister(const void *proc_info,
               const int proc_info_len,
               void **reply,
@@ -62,26 +49,6 @@
                         void **reply,
                         int *reply_len,
                         const int timeout_ms);
// Callback type accepted by BHStartWorker as `sub_cb`.
//   proc_id, proc_id_len : identifier buffer and its length
//                          (presumably the sending process -- TODO confirm).
//   data, data_len       : payload buffer and its length.
typedef void (*FSubDataCallback)(const void *proc_id,
                                 int proc_id_len,
                                 const void *data,
                                 int data_len);
// Callback type accepted by BHStartWorker as `server_cb`.
//   proc_id, proc_id_len : identifier buffer and its length.
//   data, data_len       : payload buffer and its length.
//   src                  : opaque pointer supplied to the callback.
//                          NOTE(review): presumably a handle used to send the
//                          reply back to the requester -- confirm.
typedef void (*FServerCallback)(const void *proc_id,
                                int proc_id_len,
                                const void *data,
                                int data_len,
                                void *src);
// Callback type accepted by BHStartWorker as `client_cb`.
//   proc_id, proc_id_len : identifier buffer and its length.
//   msg_id, msg_id_len   : message-id buffer and its length
//                          (presumably matches the msg_id produced by
//                          BHAsyncRequest -- TODO confirm).
//   data, data_len       : payload buffer and its length.
typedef void (*FClientCallback)(const void *proc_id,
                                int proc_id_len,
                                const void *msg_id,
                                int msg_id_len,
                                const void *data,
                                int data_len);
// Registers the three callbacks (server, subscription data, client).
// NOTE(review): threading and whether NULL callbacks are accepted are not
// visible in this header -- confirm against the implementation.
void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb, FClientCallback client_cb);
// Heartbeat variant that takes only a timeout (presumably milliseconds),
// unlike BHHeartbeat which also takes a proc_info buffer.
// The meaning of the int return value is not visible in this header.
int BHHeartbeatEasy(const int timeout_ms);
int BHHeartbeat(const void *proc_info,
@@ -128,12 +95,45 @@
// Outputs a message buffer and its length through `msg` / `msg_len`.
// NOTE(review): ownership of *msg and the meaning of the return value are
// not visible here -- confirm.
int BHGetLastError(void **msg, int *msg_len);
/////////////////////////////////////////////////////
// Asynchronous request entry point.
//   remote, remote_len   : buffer identifying the remote side, and its length.
//   request, request_len : request payload and its length.
//   msg_id, msg_id_len   : out-parameters receiving a message id and its
//                          length. NOTE(review): presumably the same id later
//                          passed to FClientCallback; ownership of *msg_id is
//                          not visible here -- confirm.
// The meaning of the int return value is not visible in this header.
int BHAsyncRequest(const void *remote,
                   const int remote_len,
                   const void *request,
                   const int request_len,
                   void **msg_id,
                   int *msg_id_len);
// Callback type accepted by BHStartWorker as `sub_cb`.
//   proc_id, proc_id_len : identifier buffer and its length
//                          (presumably the sending process -- TODO confirm).
//   data, data_len       : payload buffer and its length.
typedef void (*FSubDataCallback)(const void *proc_id,
                                 int proc_id_len,
                                 const void *data,
                                 int data_len);
// Callback type accepted by BHStartWorker as `server_cb`.
//   proc_id, proc_id_len : identifier buffer and its length.
//   data, data_len       : payload buffer and its length.
//   src                  : opaque pointer supplied to the callback.
//                          NOTE(review): presumably a handle used to send the
//                          reply back to the requester -- confirm.
typedef void (*FServerCallback)(const void *proc_id,
                                int proc_id_len,
                                const void *data,
                                int data_len,
                                void *src);
// Callback type accepted by BHStartWorker as `client_cb`.
//   proc_id, proc_id_len : identifier buffer and its length.
//   msg_id, msg_id_len   : message-id buffer and its length
//                          (presumably matches the msg_id produced by
//                          BHAsyncRequest -- TODO confirm).
//   data, data_len       : payload buffer and its length.
typedef void (*FClientCallback)(const void *proc_id,
                                int proc_id_len,
                                const void *msg_id,
                                int msg_id_len,
                                const void *data,
                                int data_len);
// Registers the three callbacks (server, subscription data, client).
// NOTE(review): threading and whether NULL callbacks are accepted are not
// visible in this header -- confirm against the implementation.
void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb, FClientCallback client_cb);
// Function-pointer type for an API call that takes one input buffer and
// yields one output buffer.
//   proc_info, proc_info_len : input buffer and its length
//                              (presumably in bytes -- TODO confirm).
//   reply, reply_len         : out-parameters receiving the reply buffer and
//                              its length. NOTE(review): ownership of *reply
//                              is not visible here; confirm who frees it.
//   timeout_ms               : timeout, presumably in milliseconds.
// The meaning of the int return value is not visible in this header.
typedef int (*FBHApiIn1Out1)(const void *proc_info,
                             const int proc_info_len,
                             void **reply,
                             int *reply_len,
                             const int timeout_ms);
// Takes a function of type FBHApiIn1Out1 together with a request buffer,
// reply out-parameters and a timeout -- the same argument shape that
// FBHApiIn1Out1 itself accepts.
// NOTE(review): presumably forwards the arguments to `func` and returns its
// result; the implementation is not visible here -- confirm.
int BHApiIn1Out1Proxy(FBHApiIn1Out1 func,
                      const void *request,
                      const int request_len,
                      void **reply,
                      int *reply_len,
                      const int timeout_ms);
//////////////////////////////////////////////////////////
// Test entry point taking a type selector and a message buffer with its
// length. NOTE(review): the valid values of `type` and what the function does
// with `msg` are not visible in this header -- confirm.
void TestRequest(int type, const char* msg, const int len);
src/exported_symbols
@@ -1,6 +1,5 @@
VERS_1.0 {
global:
    BHApiIn1Out1Proxy;
    BHRegister;
    BHUnregister;
    BHRegisterTopics;
@@ -8,12 +7,10 @@
    BHQueryProcs;
    BHSubscribeTopics;
    BHSubscribeNetTopics;
    BHStartWorker;
    BHHeartbeatEasy;
    BHHeartbeat;
    BHPublish;
    BHReadSub;
    BHAsyncRequest;
    BHRequest;
    BHReadRequest;
    BHSendReply;
@@ -21,6 +18,10 @@
    BHGetLastError;
    BHFree;
    BHApiIn1Out1Proxy;
    BHStartWorker;
    BHAsyncRequest;
    TestRequest;
    TestReply;
    TestPub;
src/nng_wrap.cpp
@@ -613,11 +613,7 @@
    return (w);
}
static constexpr int PARALLEL = 62;
static struct work* works_local[PARALLEL]{};
static struct work* works_remote[PARALLEL]{};
static int create_server(nng_socket* sock, const string& url, work** works){
static int create_server(nng_socket* sock, const string& url, const int count){
    TAG;
    if (sock->id > 0) return 0;
@@ -627,22 +623,26 @@
        PRNTVITAG(url);
        return rv;
    }
    for (int i = 0; i < PARALLEL; i++) {
    work** works = (work**)malloc(sizeof(work*) * count);
    for (int i = 0; i < count; i++) {
        works[i] = alloc_work(*sock);
    }
    remove_exist(url);
    rv = nng_listen(*sock, url.c_str(), NULL, 0);
    if (rv < 0){
        free(works);
        PRNTVITAG("create_server nng_listen failed");
        PRNTVITAG(url);
        return rv;
    }
    for (int i = 0; i < PARALLEL; i++) {
    for (int i = 0; i < count; i++) {
        server_cb(works[i]); // this starts them going (INIT state)
    }
    free(works);
    return 0;
}
@@ -660,12 +660,12 @@
        ipc = url;
    }
    reply_.url_ = ipc;
    if(create_server(&reply_.sock_local_, ipc, works_local) != 0) return -1;
    if(create_server(&reply_.sock_local_, ipc, 62) != 0) return -1;
    if (port > 0){
        reply_.port_ = port;
        ipc = "tcp://0.0.0.0:" + to_string(port);
        if(create_server(&reply_.sock_remote_, ipc, works_remote) != 0) return -1;
        if(create_server(&reply_.sock_remote_, ipc, 62) != 0) return -1;
    }else {
        reply_.sock_remote_.id = numeric_limits<int32_t>::max();
    }