| | |
| | | } |
| | | |
| | | |
| | | int NetModSocket::recvandsend(void **recvbuf, int *recvsize, int *key, recv_callback_fn callback, |
| | | int NetModSocket::recvandsend(recvandsend_callback_fn callback, |
| | | const struct timespec *timeout , int flag, void * user_data ) { |
| | | |
| | | return shmModSocket.recvandsend( recvbuf, recvsize, key, callback, timeout, flag, user_data); |
| | | return shmModSocket.recvandsend(callback, timeout, flag, user_data); |
| | | } |
| | | |
| | | |
| | |
| | | |
| | | |
| | | |
| | | int recvandsend(void **recvbuf, int *recvsize, int *key, recv_callback_fn callback, |
| | | /** |
| | | * recvandsend |
| | | */ |
| | | int recvandsend( recvandsend_callback_fn callback, |
| | | const struct timespec *timeout = NULL , int flag = 0, void * user_data = NULL ); |
| | | |
| | | /** |
| | |
| | | return sockt->recvfrom_nowait(buf, size, key); |
| | | } |
| | | |
| | | |
| | | int net_mod_socket_recvandsend(void *_socket, void **recvbuf, int *recvsize, int *key, recv_callback_fn callback, void * user_data) { |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | return sockt->recvandsend( recvbuf, recvsize, key, callback, NULL, 0, user_data); |
| | | |
| | | } |
| | | |
| | | int net_mod_socket_recvandsend_timeout(void *_socket, void **recvbuf, int *recvsize, int *key, recv_callback_fn callback, |
| | | int sec, int nsec, void * user_data) { |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | struct timespec timeout = {sec, nsec}; |
| | | return sockt->recvandsend( recvbuf, recvsize, key, callback, &timeout, BUS_TIMEOUT_FLAG, user_data); |
| | | } |
| | | |
| | | int net_mod_socket_recvandsend_nowait(void *_socket, void **recvbuf, int *recvsize, int *key, recv_callback_fn callback, void * user_data) { |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | return sockt->recvandsend( recvbuf, recvsize, key, callback, NULL, BUS_NOWAIT_FLAG, user_data); |
| | | } |
| | | |
| | | |
| | | int net_mod_socket_sendandrecv(void *_socket, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, |
| | | net_mod_recv_msg_t ** recv_arr, int *recv_arr_size){ |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | return sockt->sendandrecv(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size); |
| | | } |
| | | |
| | | |
| | | |
| | | |
| | | /** |
| | | * 如果建立连接的节点没有接受到消息等待timeout的时间后返回 |
| | |
| | | } |
| | | |
| | | |
| | | int net_mod_socket_recvandsend(void *_socket, recvandsend_callback_fn callback, void * user_data) { |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | return sockt->recvandsend( callback, NULL, 0, user_data); |
| | | |
| | | } |
| | | |
| | | int net_mod_socket_recvandsend_timeout(void *_socket, recvandsend_callback_fn callback, |
| | | int sec, int nsec, void * user_data) { |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | struct timespec timeout = {sec, nsec}; |
| | | return sockt->recvandsend( callback, &timeout, BUS_TIMEOUT_FLAG, user_data); |
| | | } |
| | | |
| | | int net_mod_socket_recvandsend_nowait(void *_socket, recvandsend_callback_fn callback, void * user_data) { |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | return sockt->recvandsend( callback, NULL, BUS_NOWAIT_FLAG, user_data); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * 向node_arr 中的所有网络节点发布消息 |
| | | * @node_arr 网络节点组, @node_arr_len该数组长度 |
| | |
| | | |
| | | |
| | | |
| | | /** |
| | | * @brief 接受消息,并把callback函数返回的数据发送回对方,一直等待完成 |
| | | * |
| | | * @param recvbuf 接受到的消息存放的缓存地址,该buf使用完成后需要手动释放 |
| | | * @param recvsize 接受到消息的长度 |
| | | * @param key 从谁哪里收到的信息 |
| | | * @callback void (*recv_callback_fn)(void **sendbuf, int *sendsize, void * user_data) |
| | | * sendbuf 和 sendsize是callbak_fn回调函数的返回值, 分别表示返回的数据,和返回数据的长度。 |
| | | * |
| | | * @return 0是成功, 其他值是失败的错误码 |
| | | */ |
| | | int net_mod_socket_recvandsend(void *_socket, void **recvbuf, int *recvsize, int *key, recv_callback_fn callback, void * user_data); |
| | | |
| | | /** |
| | | * @brief 接受消息,并把callback函数返回的数据发送回对方,在指定的时间内即使没有完成也返回 |
| | | * |
| | | * @param recvbuf 接受到的消息存放的缓存地址,该buf使用完成后需要手动释放 |
| | | * @param recvsize 接受到消息的长度 |
| | | * @param key 从谁哪里收到的信息 |
| | | * @callback void (*recv_callback_fn)(void **sendbuf, int *sendsize, void * user_data) |
| | | * sendbuf 和 sendsize是callbak_fn回调函数的返回值, 分别表示返回的数据,和返回数据的长度。 |
| | | * |
| | | * @param sec 秒 |
| | | * @param nsec 纳秒 |
| | | * |
| | | * @return 0是成功, 其他值是失败的错误码 |
| | | */ |
| | | int net_mod_socket_recvandsend_timeout(void *_socket, void **recvbuf, int *recvsize, int *key, recv_callback_fn callback, |
| | | int sec, int nsec, void * user_data ) ; |
| | | |
| | | |
| | | /** |
| | | * @brief 接受消息,并把callback函数返回的数据发送回对方,无论成功与否立刻返回 |
| | | * |
| | | * @param recvbuf 接受到的消息存放的缓存地址,该buf使用完成后需要手动释放 |
| | | * @param recvsize 接受到消息的长度 |
| | | * @param key 从谁哪里收到的信息 |
| | | * @callback void (*recv_callback_fn)(void **sendbuf, int *sendsize, void * user_data) |
| | | * sendbuf 和 sendsize是callbak_fn回调函数的返回值, 分别表示返回的数据,和返回数据的长度。 |
| | | * |
| | | * @return 0是成功, 其他值是失败的错误码 |
| | | */ |
| | | int net_mod_socket_recvandsend_nowait(void *_socket, void **recvbuf, int *recvsize, int *key, recv_callback_fn callback, void * user_data) ; |
| | | |
| | | |
| | | |
| | | /** |
| | | * @brief 跨机器发送消息并接受返回的应答消息,直到发送完成才返回 |
| | |
| | | net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int timeout); |
| | | |
| | | |
| | | |
| | | |
| | | /** |
| | | * @brief 接受消息,并把callback函数返回的数据发送回对方,一直等待完成 |
| | | * |
| | | * @param recvbuf 接受到的消息存放的缓存地址,该buf使用完成后需要手动释放 |
| | | * @param recvsize 接受到消息的长度 |
| | | * @param key 从谁哪里收到的信息 |
| | | * @callback void (*recvandsend_callback_fn)(void *recvbuf, int recvsize, int key, void **sendbuf, int *sendsize, void * user_data) |
| | | * @recvbuf 收到的数据 |
| | | * @recvsize 收到的数据的大小 |
| | | * @key 接受数据并并发送数据的对象 |
| | | * @sendbuf 存储返回值的地址,表示返回的数据 |
| | | * @sendsize 存储返回值的地址, 返回数据的长度 |
| | | * |
| | | * @return 0是成功, 其他值是失败的错误码 |
| | | */ |
| | | int net_mod_socket_recvandsend(void *_socket, recvandsend_callback_fn callback, void * user_data); |
| | | |
| | | /** |
| | | * @brief 接受消息,并把callback函数返回的数据发送回对方,在指定的时间内即使没有完成也返回 |
| | | * |
| | | * @param recvbuf 接受到的消息存放的缓存地址,该buf使用完成后需要手动释放 |
| | | * @param recvsize 接受到消息的长度 |
| | | * @param key 从谁哪里收到的信息 |
| | | * @callback void (*recvandsend_callback_fn)(void *recvbuf, int recvsize, int key, void **sendbuf, int *sendsize, void * user_data) |
| | | * @recvbuf 收到的数据 |
| | | * @recvsize 收到的数据的大小 |
| | | * @key 接受数据并并发送数据的对象 |
| | | * @sendbuf 存储返回值的地址,表示返回的数据 |
| | | * @sendsize 存储返回值的地址, 返回数据的长度 |
| | | * |
| | | * @param sec 秒 |
| | | * @param nsec 纳秒 |
| | | * |
| | | * @return 0是成功, 其他值是失败的错误码 |
| | | */ |
| | | int net_mod_socket_recvandsend_timeout(void *_socket, recvandsend_callback_fn callback, |
| | | int sec, int nsec, void * user_data ) ; |
| | | |
| | | |
| | | /** |
| | | * @brief 接受消息,并把callback函数返回的数据发送回对方,无论成功与否立刻返回 |
| | | * |
| | | * @param recvbuf 接受到的消息存放的缓存地址,该buf使用完成后需要手动释放 |
| | | * @param recvsize 接受到消息的长度 |
| | | * @param key 从谁哪里收到的信息 |
| | | * @callback void (*recvandsend_callback_fn)(void *recvbuf, int recvsize, int key, void **sendbuf, int *sendsize, void * user_data) |
| | | * @recvbuf 收到的数据 |
| | | * @recvsize 收到的数据的大小 |
| | | * @key 接受数据并并发送数据的对象 |
| | | * @sendbuf 存储返回值的地址,表示返回的数据 |
| | | * @sendsize 存储返回值的地址, 返回数据的长度 |
| | | * |
| | | * @return 0是成功, 其他值是失败的错误码 |
| | | */ |
| | | int net_mod_socket_recvandsend_nowait(void *_socket, recvandsend_callback_fn callback, void * user_data) ; |
| | | |
| | | |
| | | /** |
| | | * @brief 跨机器发送消息并接受返回的应答消息,不管是否发送完成立刻返回 |
| | | * |
| | |
| | | } |
| | | |
| | | |
| | | int ShmModSocket::recvandsend(void **recvbuf, int *recvsize, int *key, recv_callback_fn callback, |
| | | int ShmModSocket::recvandsend( recvandsend_callback_fn callback, |
| | | const struct timespec *timeout , int flag, void * user_data ) { |
| | | int rv = shm_recvandsend(shm_socket, recvbuf, recvsize, key, callback, timeout, flag, user_data); |
| | | if(rv == 0) { |
| | | logger->debug("ShmModSocket::shm_recvandsend: success. key = %d\n", *key); |
| | | return 0; |
| | | } |
| | | |
| | | logger->debug("ShmModSocket::shm_recvandsend : failed. %s", bus_strerror(rv)); |
| | | return rv; |
| | | return shm_recvandsend(shm_socket, callback, timeout, flag, user_data); |
| | | } |
| | | |
| | | // // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | |
| | | const struct timespec *timeout = NULL, int flag = 0); |
| | | |
| | | |
| | | int recvandsend(void **recvbuf, int *recvsize, int *key, recv_callback_fn callback, |
| | | const struct timespec *timeout = NULL , int flag = 0, void * user_data = NULL); |
| | | /** |
| | | * |
| | | */ |
| | | int recvandsend( recvandsend_callback_fn callback, const struct timespec *timeout = NULL , int flag = 0, void * user_data = NULL); |
| | | |
| | | /** |
| | | * 订阅指定主题 |
| | |
| | | } |
| | | |
| | | /** |
| | | * @callback void (*recv_callback_fn)(void **sendbuf, int *sendsize) |
| | | * @callback void (*recvandsend_callback_fn)(void *recvbuf, int recvsize, int key, void **sendbuf, int *sendsize) |
| | | * sendbuf 和 sendsize是callbak_fn回调函数的返回值, 分别表示发送数据,和发送数据的大小。 |
| | | * |
| | | */ |
| | | int shm_recvandsend(shm_socket_t *sockt, void **recvbuf, int *recvsize, int *key, recv_callback_fn callback, |
| | | int shm_recvandsend(shm_socket_t *sockt, recvandsend_callback_fn callback, |
| | | const struct timespec *timeout, int flag, void *user_data) { |
| | | |
| | | int rv; |
| | |
| | | |
| | | rv = shm_recvpakfrom(sockt , &recvpak, timeout, flag); |
| | | |
| | | |
| | | |
| | | if (rv != 0) { |
| | | if(rv == ETIMEDOUT) |
| | | if(rv == ETIMEDOUT){ |
| | | logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(EBUS_TIMEOUT)); |
| | | return EBUS_TIMEOUT; |
| | | } |
| | | |
| | | logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv)); |
| | | logger->error("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv)); |
| | | return rv; |
| | | } |
| | | |
| | | if(recvbuf != NULL) { |
| | | void *_buf = malloc(recvpak.size); |
| | | memcpy(_buf, recvpak.buf, recvpak.size); |
| | | *recvbuf = _buf; |
| | | } |
| | | |
| | | if(recvsize != NULL) |
| | | *recvsize = recvpak.size; |
| | | |
| | | if(key != NULL) |
| | | *key = recvpak.key; |
| | | |
| | | |
| | | void *recvbuf = malloc(recvpak.size); |
| | | memcpy(recvbuf, recvpak.buf, recvpak.size); |
| | | mm_free(recvpak.buf); |
| | | |
| | | callback(&sendbuf, &sendsize, user_data); |
| | | callback(recvbuf, recvpak.size, recvpak.key, &sendbuf, &sendsize, user_data); |
| | | |
| | | shm_packet_t sendpak; |
| | | sendpak.key = sockt->key; |
| | |
| | | |
| | | } shm_socket_t; |
| | | |
| | | // typedef void (*recv_callback_fn)(void **sendbuf, int *sendsize); |
| | | typedef std::function<void(void **sendbuf, int *sendsize, void *user_data)> recv_callback_fn; |
| | | // typedef void (*recvandsend_callback_fn)(void **sendbuf, int *sendsize); |
| | | typedef std::function<void(void *recvbuf, int recvsize, int key, void **sendbuf, int *sendsize, void *user_data)> recvandsend_callback_fn; |
| | | |
| | | size_t shm_socket_remove_keys(int keys[], size_t length); |
| | | |
| | |
| | | const struct timespec * timeout = NULL, int flags = 0); |
| | | |
| | | /** |
| | | * @callback void (*recv_callback_fn)(void **sendbuf, int *sendsize) |
| | | * sendbuf 和 sendsize是callbak_fn回调函数的返回值, 分别表示发送数据,和发送数据的大小。 |
| | | * @callback void (*recvandsend_callback_fn)(void *recvbuf, int recvsize, int key, void **sendbuf, int *sendsize, void * user_data) |
| | | * @recvbuf 收到的数据 |
| | | * @recvsize 收到的数据的大小 |
| | | * @key 接受数据并并发送数据的对象 |
| | | * @sendbuf 存储返回值的地址,表示返回的数据 |
| | | * @sendsize 存储返回值的地址, 返回数据的长度 |
| | | * |
| | | */ |
| | | int shm_recvandsend(shm_socket_t *sockt, void **recvbuf, int *recvsize, int *key, recv_callback_fn callback, |
| | | int shm_recvandsend(shm_socket_t *sockt, recvandsend_callback_fn callback, |
| | | const struct timespec *timeout = NULL, int flag = 0, void * user_data = NULL); |
| | | |
| | | |
| | |
| | | ) |
| | | |
| | | |
| | | add_executable(test_recvandsend test_recvandsend.cpp) |
| | | target_link_libraries(test_recvandsend PRIVATE shm_queue ${EXTRA_LIBS} ) |
| | | |
| | | |
| | | target_include_directories(test_net_mod_socket PRIVATE |
| | | "${PROJECT_BINARY_DIR}" |
| | | ${EXTRA_INCLUDES} |
| | |
| | | printf("start reply\n"); |
| | | void *ser = net_mod_socket_open(); |
| | | net_mod_socket_bind(ser, mkey); |
| | | int recvsize; |
| | | void *recvbuf; |
| | | char sendbuf[512]; |
| | | int rv; |
| | | int key; |
| | | while(true) { |
| | | rv = net_mod_socket_recvandsend_timeout(ser, &recvbuf, &recvsize, &key, [&](void ** buf, int *size, void * user_data){ |
| | | printf( "server: RECEIVED REQUEST FROM %d : %s\n", key, recvbuf); |
| | | rv = net_mod_socket_recvandsend_timeout(ser, [&]( void *recvbuf, int recvsize, int key, void ** sendbuf_ptr, int *sendsize_ptr, void * user_data){ |
| | | printf( "server: RECEIVED REQUEST FROM %d : %s\n", key, (char *)recvbuf); |
| | | sprintf(sendbuf, "%d RECEIVED %s", net_mod_socket_get_key(ser), (char *)recvbuf); |
| | | // buf 和 size是返回值 |
| | | *buf = sendbuf; |
| | | *size = strlen(sendbuf) + 1; |
| | | *sendbuf_ptr = sendbuf; |
| | | *sendsize_ptr = strlen(sendbuf) + 1; |
| | | //recvbuf是分配到堆里的,使用完后不要忘记释放掉 |
| | | free(recvbuf); |
| | | return; |
| | | }, 0, 2000000, NULL ); |