wangzhengquan
2021-02-04 546bdaa58724ec6a19b56e800ad60963bd3bd1bc
modify callback function
10个文件已修改
232 ■■■■ 已修改文件
src/net/net_mod_socket.cpp 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket.h 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket_wrapper.cpp 42 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket_wrapper.h 104 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_mod_socket.cpp 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_mod_socket.h 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.cpp 30 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.h 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/CMakeLists.txt 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/test_net_mod_socket.cpp 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket.cpp
@@ -498,10 +498,10 @@
}
int NetModSocket::recvandsend(void **recvbuf, int *recvsize, int *key, recv_callback_fn callback,
int NetModSocket::recvandsend(recvandsend_callback_fn callback,
                              const struct timespec *timeout , int flag, void * user_data ) {
  return shmModSocket.recvandsend( recvbuf, recvsize, key, callback, timeout, flag, user_data);
  return shmModSocket.recvandsend(callback, timeout, flag, user_data);
}
 
src/net/net_mod_socket.h
@@ -195,7 +195,10 @@
  
  int recvandsend(void **recvbuf, int *recvsize, int *key, recv_callback_fn callback,
  /**
   * recvandsend
   */
  int recvandsend( recvandsend_callback_fn callback,
                                        const struct timespec *timeout = NULL , int flag = 0, void * user_data = NULL );
  /**
src/net/net_mod_socket_wrapper.cpp
@@ -90,34 +90,11 @@
    return sockt->recvfrom_nowait(buf, size, key);
}
int net_mod_socket_recvandsend(void *_socket, void **recvbuf, int *recvsize, int *key, recv_callback_fn callback, void * user_data) {
  NetModSocket *sockt = (NetModSocket *)_socket;
  return sockt->recvandsend( recvbuf, recvsize, key, callback, NULL, 0, user_data);
}
int net_mod_socket_recvandsend_timeout(void *_socket, void **recvbuf, int *recvsize, int *key, recv_callback_fn callback,
                                       int sec, int nsec, void * user_data) {
  NetModSocket *sockt = (NetModSocket *)_socket;
  struct timespec timeout = {sec, nsec};
  return sockt->recvandsend( recvbuf, recvsize, key, callback, &timeout, BUS_TIMEOUT_FLAG, user_data);
}
int net_mod_socket_recvandsend_nowait(void *_socket, void **recvbuf, int *recvsize, int *key, recv_callback_fn callback, void * user_data) {
  NetModSocket *sockt = (NetModSocket *)_socket;
  return sockt->recvandsend( recvbuf, recvsize, key, callback, NULL, BUS_NOWAIT_FLAG, user_data);
}
int net_mod_socket_sendandrecv(void *_socket, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size){
    NetModSocket *sockt = (NetModSocket *)_socket;
    return sockt->sendandrecv(node_arr,  arrlen, send_buf,  send_size, recv_arr, recv_arr_size);
}
/**
 * 如果建立连接的节点没有接受到消息等待timeout的时间后返回
@@ -137,6 +114,25 @@
}
int net_mod_socket_recvandsend(void *_socket, recvandsend_callback_fn callback, void * user_data) {
  NetModSocket *sockt = (NetModSocket *)_socket;
  return sockt->recvandsend(  callback, NULL, 0, user_data);
}
int net_mod_socket_recvandsend_timeout(void *_socket, recvandsend_callback_fn callback,
                                       int sec, int nsec, void * user_data) {
  NetModSocket *sockt = (NetModSocket *)_socket;
  struct timespec timeout = {sec, nsec};
  return sockt->recvandsend(  callback, &timeout, BUS_TIMEOUT_FLAG, user_data);
}
int net_mod_socket_recvandsend_nowait(void *_socket,  recvandsend_callback_fn callback, void * user_data) {
  NetModSocket *sockt = (NetModSocket *)_socket;
  return sockt->recvandsend(  callback, NULL, BUS_NOWAIT_FLAG, user_data);
}
 /**
 * 向node_arr 中的所有网络节点发布消息
 * @node_arr 网络节点组, @node_arr_len该数组长度
src/net/net_mod_socket_wrapper.h
@@ -122,51 +122,6 @@
/**
 * @brief 接受消息,并把callback函数返回的数据发送回对方,一直等待完成
 *
 * @param recvbuf 接受到的消息存放的缓存地址,该buf使用完成后需要手动释放
 * @param recvsize 接受到消息的长度
 * @param key 从谁哪里收到的信息
 * @callback  void (*recv_callback_fn)(void **sendbuf, int *sendsize, void * user_data)
 *            sendbuf 和 sendsize是callbak_fn回调函数的返回值, 分别表示返回的数据,和返回数据的长度。
 *
 * @return 0是成功, 其他值是失败的错误码
 */
int net_mod_socket_recvandsend(void *_socket, void **recvbuf, int *recvsize, int *key, recv_callback_fn callback, void * user_data);
/**
 * @brief 接受消息,并把callback函数返回的数据发送回对方,在指定的时间内即使没有完成也返回
 *
 * @param recvbuf 接受到的消息存放的缓存地址,该buf使用完成后需要手动释放
 * @param recvsize 接受到消息的长度
 * @param key 从谁哪里收到的信息
 * @callback  void (*recv_callback_fn)(void **sendbuf, int *sendsize, void * user_data)
 *            sendbuf 和 sendsize是callbak_fn回调函数的返回值, 分别表示返回的数据,和返回数据的长度。
 *
 * @param sec 秒
 * @param nsec 纳秒
 *
 * @return 0是成功, 其他值是失败的错误码
 */
int net_mod_socket_recvandsend_timeout(void *_socket, void **recvbuf, int *recvsize, int *key, recv_callback_fn callback,
                                      int sec, int nsec, void * user_data ) ;
/**
 * @brief 接受消息,并把callback函数返回的数据发送回对方,无论成功与否立刻返回
 *
 * @param recvbuf 接受到的消息存放的缓存地址,该buf使用完成后需要手动释放
 * @param recvsize 接受到消息的长度
 * @param key 从谁哪里收到的信息
 * @callback  void (*recv_callback_fn)(void **sendbuf, int *sendsize, void * user_data)
 *            sendbuf 和 sendsize是callbak_fn回调函数的返回值, 分别表示返回的数据,和返回数据的长度。
 *
 * @return 0是成功, 其他值是失败的错误码
 */
int net_mod_socket_recvandsend_nowait(void *_socket, void **recvbuf, int *recvsize, int *key, recv_callback_fn callback, void * user_data) ;
/**
 * @brief 跨机器发送消息并接受返回的应答消息,直到发送完成才返回
@@ -205,6 +160,65 @@
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int timeout);
/**
 * @brief 接受消息,并把callback函数返回的数据发送回对方,一直等待完成
 *
 * @param recvbuf 接受到的消息存放的缓存地址,该buf使用完成后需要手动释放
 * @param recvsize 接受到消息的长度
 * @param key 从谁哪里收到的信息
 * @callback  void (*recvandsend_callback_fn)(void *recvbuf, int recvsize, int key, void **sendbuf, int *sendsize, void * user_data)
 *                  @recvbuf 收到的数据
 *                  @recvsize 收到的数据的大小
 *                  @key 接受数据并并发送数据的对象
 *                  @sendbuf 存储返回值的地址,表示返回的数据
 *                  @sendsize 存储返回值的地址, 返回数据的长度
 *
 * @return 0是成功, 其他值是失败的错误码
 */
int net_mod_socket_recvandsend(void *_socket,  recvandsend_callback_fn callback, void * user_data);
/**
 * @brief 接受消息,并把callback函数返回的数据发送回对方,在指定的时间内即使没有完成也返回
 *
 * @param recvbuf 接受到的消息存放的缓存地址,该buf使用完成后需要手动释放
 * @param recvsize 接受到消息的长度
 * @param key 从谁哪里收到的信息
 * @callback  void (*recvandsend_callback_fn)(void *recvbuf, int recvsize, int key, void **sendbuf, int *sendsize, void * user_data)
 *                  @recvbuf 收到的数据
 *                  @recvsize 收到的数据的大小
 *                  @key 接受数据并并发送数据的对象
 *                  @sendbuf 存储返回值的地址,表示返回的数据
 *                  @sendsize 存储返回值的地址, 返回数据的长度
 *
 * @param sec 秒
 * @param nsec 纳秒
 *
 * @return 0是成功, 其他值是失败的错误码
 */
int net_mod_socket_recvandsend_timeout(void *_socket, recvandsend_callback_fn callback,
                                       int sec, int nsec, void * user_data ) ;
/**
 * @brief 接受消息,并把callback函数返回的数据发送回对方,无论成功与否立刻返回
 *
 * @param recvbuf 接受到的消息存放的缓存地址,该buf使用完成后需要手动释放
 * @param recvsize 接受到消息的长度
 * @param key 从谁哪里收到的信息
 * @callback  void (*recvandsend_callback_fn)(void *recvbuf, int recvsize, int key, void **sendbuf, int *sendsize, void * user_data)
 *                  @recvbuf 收到的数据
 *                  @recvsize 收到的数据的大小
 *                  @key 接受数据并并发送数据的对象
 *                  @sendbuf 存储返回值的地址,表示返回的数据
 *                  @sendsize 存储返回值的地址, 返回数据的长度
 *
 * @return 0是成功, 其他值是失败的错误码
 */
int net_mod_socket_recvandsend_nowait(void *_socket, recvandsend_callback_fn callback, void * user_data) ;
/**
 * @brief 跨机器发送消息并接受返回的应答消息,不管是否发送完成立刻返回
 *
src/socket/shm_mod_socket.cpp
@@ -91,16 +91,9 @@
}
int ShmModSocket::recvandsend(void **recvbuf, int *recvsize, int *key, recv_callback_fn callback,
int ShmModSocket::recvandsend( recvandsend_callback_fn callback,
                    const struct timespec *timeout , int flag, void * user_data ) {
  int rv = shm_recvandsend(shm_socket, recvbuf, recvsize, key, callback, timeout, flag, user_data);
  if(rv == 0) {
    logger->debug("ShmModSocket::shm_recvandsend: success. key = %d\n", *key);
    return 0;
  }
  logger->debug("ShmModSocket::shm_recvandsend :  failed. %s", bus_strerror(rv));
  return rv;
  return shm_recvandsend(shm_socket, callback, timeout, flag, user_data);
}
 
// // 超时返回。 @sec 秒 , @nsec 纳秒
src/socket/shm_mod_socket.h
@@ -87,8 +87,10 @@
     const struct timespec *timeout = NULL, int flag = 0);
  int recvandsend(void **recvbuf, int *recvsize, int *key, recv_callback_fn callback,
                      const struct timespec *timeout = NULL , int flag = 0, void * user_data = NULL);
    /**
     *
     */
  int recvandsend( recvandsend_callback_fn callback, const struct timespec *timeout = NULL , int flag = 0, void * user_data = NULL);
    /**
     * 订阅指定主题
src/socket/shm_socket.cpp
@@ -242,11 +242,11 @@
}
/**
 * @callback  void (*recv_callback_fn)(void **sendbuf, int *sendsize)
 * @callback  void (*recvandsend_callback_fn)(void *recvbuf, int recvsize, int key, void **sendbuf, int *sendsize)
 *            sendbuf 和 sendsize是callbak_fn回调函数的返回值, 分别表示发送数据,和发送数据的大小。
 * 
 */
int shm_recvandsend(shm_socket_t *sockt, void **recvbuf, int *recvsize, int *key, recv_callback_fn callback,
int shm_recvandsend(shm_socket_t *sockt,  recvandsend_callback_fn callback,
  const struct timespec *timeout, int flag, void *user_data) {
  
  int rv;
@@ -257,31 +257,21 @@
  rv = shm_recvpakfrom(sockt , &recvpak, timeout, flag);
  if (rv != 0) {
    if(rv == ETIMEDOUT)
    if(rv == ETIMEDOUT){
      logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(EBUS_TIMEOUT));
      return EBUS_TIMEOUT;
    }
    
    logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv));
    logger->error("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv));
    return rv;
  }
   
  if(recvbuf != NULL) {
    void *_buf = malloc(recvpak.size);
    memcpy(_buf, recvpak.buf, recvpak.size);
    *recvbuf = _buf;
  }
  if(recvsize != NULL)
    *recvsize = recvpak.size;
  if(key != NULL)
    *key = recvpak.key;
  void *recvbuf = malloc(recvpak.size);
  memcpy(recvbuf, recvpak.buf, recvpak.size);
  mm_free(recvpak.buf);
  callback(&sendbuf, &sendsize, user_data);
  callback(recvbuf, recvpak.size, recvpak.key, &sendbuf, &sendsize, user_data);
  shm_packet_t sendpak;
  sendpak.key = sockt->key;
src/socket/shm_socket.h
@@ -41,8 +41,8 @@
} shm_socket_t;
// typedef void (*recv_callback_fn)(void **sendbuf, int *sendsize);
typedef std::function<void(void **sendbuf, int *sendsize, void *user_data)> recv_callback_fn;
// typedef void (*recvandsend_callback_fn)(void **sendbuf, int *sendsize);
typedef std::function<void(void *recvbuf, int recvsize, int key, void **sendbuf, int *sendsize, void *user_data)> recvandsend_callback_fn;
size_t shm_socket_remove_keys(int keys[], size_t length);
@@ -67,11 +67,15 @@
    const struct timespec * timeout = NULL,  int flags = 0);
/**
 * @callback  void (*recv_callback_fn)(void **sendbuf, int *sendsize)
 *            sendbuf 和 sendsize是callbak_fn回调函数的返回值, 分别表示发送数据,和发送数据的大小。
 * @callback  void (*recvandsend_callback_fn)(void *recvbuf, int recvsize, int key, void **sendbuf, int *sendsize, void * user_data)
 *                  @recvbuf 收到的数据
 *                  @recvsize 收到的数据的大小
 *                  @key 接受数据并并发送数据的对象
 *                  @sendbuf 存储返回值的地址,表示返回的数据
 *                  @sendsize 存储返回值的地址, 返回数据的长度
 *
 */
int shm_recvandsend(shm_socket_t *sockt, void **recvbuf, int *recvsize, int *key, recv_callback_fn callback,
int shm_recvandsend(shm_socket_t *sockt,  recvandsend_callback_fn callback,
                    const struct timespec *timeout = NULL, int flag = 0,  void * user_data = NULL);
test_net_socket/CMakeLists.txt
@@ -24,10 +24,6 @@
                            )
add_executable(test_recvandsend test_recvandsend.cpp)
target_link_libraries(test_recvandsend PRIVATE shm_queue  ${EXTRA_LIBS} )
target_include_directories(test_net_mod_socket PRIVATE
                            "${PROJECT_BINARY_DIR}"
                             ${EXTRA_INCLUDES}
test_net_socket/test_net_mod_socket.cpp
@@ -137,18 +137,16 @@
  printf("start reply\n");
  void *ser = net_mod_socket_open();
  net_mod_socket_bind(ser, mkey);
  int recvsize;
  void *recvbuf;
  char sendbuf[512];
  int rv;
  int key;
  while(true) {
    rv = net_mod_socket_recvandsend_timeout(ser, &recvbuf, &recvsize, &key, [&](void ** buf, int *size, void * user_data){
    printf( "server: RECEIVED REQUEST FROM  %d : %s\n", key, recvbuf);
    rv = net_mod_socket_recvandsend_timeout(ser, [&]( void *recvbuf, int recvsize, int key, void ** sendbuf_ptr, int *sendsize_ptr, void * user_data){
    printf( "server: RECEIVED REQUEST FROM  %d : %s\n", key, (char *)recvbuf);
    sprintf(sendbuf, "%d RECEIVED %s", net_mod_socket_get_key(ser), (char *)recvbuf);
    // buf 和 size是返回值
    *buf = sendbuf;
    *size = strlen(sendbuf) + 1;
    *sendbuf_ptr = sendbuf;
    *sendsize_ptr = strlen(sendbuf) + 1;
    //recvbuf是分配到堆里的,使用完后不要忘记释放掉
    free(recvbuf);
    return;
    }, 0, 2000000, NULL );