From 1b94589dcb8d497d2d8a208efd61a54631f6b84e Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期三, 23 十二月 2020 16:08:33 +0800 Subject: [PATCH] update --- src/socket/net_mod_socket.c | 60 +++++++++-- src/socket/shm_socket.c | 45 ++++++--- test_net_socket/test_net_mod_socket.c | 6 src/queue/lock_free_queue.h | 22 ++- src/socket/bus_server_socket.c | 36 +++--- src/socket/bus_server_socket_wrapper.c | 9 + test_net_socket/test_bus_stop.c | 54 ++++++++++ test_net_socket/Makefile | 3 src/socket/net_mod_server_socket.c | 2 test_net_socket/heart_beat.c | 41 +++++-- 10 files changed, 202 insertions(+), 76 deletions(-) diff --git a/src/queue/lock_free_queue.h b/src/queue/lock_free_queue.h index ee11da6..84c885c 100644 --- a/src/queue/lock_free_queue.h +++ b/src/queue/lock_free_queue.h @@ -11,6 +11,7 @@ // default Queue size #define LOCK_FREE_Q_DEFAULT_SIZE 16 +// static Logger *logger = LoggerFactory::getLogger(); // define this macro if calls to "size" must return the real size of the // queue. If it is undefined that function will try to take a snapshot of // the queue, but returned value might be bogus @@ -200,7 +201,7 @@ template <typename T, typename AT> class Q_TYPE> bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data) { - // printf("==================LockFreeQueue push before\n"); +LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n"); if (SemUtil::dec(slots) == -1) { err_msg(errno, "LockFreeQueue push"); return false; @@ -209,7 +210,7 @@ if ( m_qImpl.push(a_data) ) { SemUtil::inc(items); - // printf("==================LockFreeQueue push after\n"); +LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n"); return true; } return false; @@ -247,18 +248,19 @@ bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * timeout) { - +LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before\n"); if (SemUtil::dec_timeout(slots, timeout) == -1) { if (errno == EAGAIN) return false; else { - // err_msg(errno, "LockFreeQueue push_timeout"); + err_msg(errno, "LockFreeQueue push_timeout"); return false; } } if (m_qImpl.push(a_data)){ - SemUtil::inc(items); + SemUtil::inc(items); +LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout after\n"); return true; } return false; @@ -274,7 +276,8 @@ template <typename T, typename AT> class Q_TYPE> bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data) { - // printf("==================LockFreeQueue pop before\n"); + +LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n"); if (SemUtil::dec(items) == -1) { err_msg(errno, "LockFreeQueue pop"); return false; @@ -282,7 +285,7 @@ if (m_qImpl.pop(a_data)) { SemUtil::inc(slots); - // printf("==================LockFreeQueue pop after\n"); +LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n"); return true; } return false; @@ -319,7 +322,7 @@ template <typename T, typename AT> class Q_TYPE> bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * timeout) { -// printf("==================LockFreeQueue pop_timeout before\n"); +LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout before\n"); if (SemUtil::dec_timeout(items, timeout) == -1) { if (errno == EAGAIN) return false; @@ -331,7 +334,7 @@ if (m_qImpl.pop(a_data)) { SemUtil::inc(slots); -// printf("==================LockFreeQueue pop_timeout after\n"); +LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout after\n"); return true; } return false; @@ -346,6 +349,7 @@ return m_qImpl.operator[](i); } + template < typename ELEM_T, typename Allocator, diff --git a/src/socket/bus_server_socket.c b/src/socket/bus_server_socket.c index 8c50d37..0f4e52e 100644 --- a/src/socket/bus_server_socket.c +++ b/src/socket/bus_server_socket.c @@ -60,15 +60,19 @@ BusServerSocket::BusServerSocket() { + logger->debug("BusServerSocket Init"); shm_socket = shm_open_socket(SHM_SOCKET_DGRAM); topic_sub_map = NULL; + } BusServerSocket::~BusServerSocket() { SHMKeySet *subscripter_set; SHMTopicSubMap::iterator map_iter; + logger->debug("BusServerSocket destory 1"); stop(); + logger->debug("BusServerSocket destory 2"); if(topic_sub_map != NULL) { for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { @@ -83,6 +87,7 @@ mem_pool_free_by_key(BUS_MAP_KEY); } shm_close_socket(shm_socket); + logger->debug("BusServerSocket destory 3"); } @@ -109,14 +114,13 @@ run_pubsub_proxy(); // 杩涚▼鍋滄鐨勬椂鍊欙紝棰勭暀3绉掕祫婧愬洖鏀剁殑鏃堕棿銆傚惁鍒欙紝浼氬彂鐢熻皟鐢╟lose鐨勬椂鍊欙紝鍏变韩鍐呭瓨鐨勮祫婧愯繕娌℃潵寰楀強鍥炴敹杩涚▼灏遍��鍑轰簡 - sleep(3); return 0; } int BusServerSocket::stop(){ int ret; - + logger->debug("====>stopping"); if( shm_socket->key <= 0) { return -1; } @@ -127,15 +131,11 @@ head.topic_size = 0; head.content_size = 0; - void *recv_buf; - int recv_size; - void *buf; int size = ShmModSocket::get_bus_sendbuf(head, NULL, 0, NULL, 0, &buf); if(size > 0) { - ret = shm_sendandrecv(shm_socket, buf, size, shm_socket->key, &recv_buf, &recv_size); + ret = shm_sendandrecv_unsafe(shm_socket, buf, size, shm_socket->key, NULL, NULL); free(buf); - free(recv_buf); return ret; } else { return -1; @@ -260,7 +260,8 @@ topic = strtok(NULL, topic_delim); } - } else if(strcmp(action, "desub") == 0) { + } + else if(strcmp(action, "desub") == 0) { // printf("desub topic=%s,%s,%d\n", topics, trim(topics, 0), strcmp(trim(topics, 0), "")); if(strcmp(trim(topics, 0), "") == 0) { // 鍙栨秷鎵�鏈夎闃� @@ -274,27 +275,26 @@ } } - } else if(strcmp(action, "pub") == 0) { + } + else if(strcmp(action, "pub") == 0) { content = topics + head.topic_size; _proxy_pub(topics, content, head.content_size, key); - } else if(strcmp(action, "stop") == 0) { - logger->info( "Stopping Bus..."); - // snprintf(resp_buf, 128, "%sstop_finished%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, "", TOPIC_RIDENTIFIER); - shm_sendto(shm_socket, "stop_finished", strlen( "stop_finished") +1, key); + } + else if(strcmp(action, "stop") == 0) { + free(buf); break; } else { logger->error( "BusServerSocket::run_pubsub_proxy : unrecognized action %s", action); } - // free(action); - // free(topics); - // } else { - // logger->error( "BusServerSocket::run_pubsub_proxy : incorrect format msg"); - // } free(buf); } + + logger->info( "Stopping Bus..."); + shm_sendto(shm_socket, "stop_finished", strlen( "stop_finished") +1, key); + return NULL; } diff --git a/src/socket/bus_server_socket_wrapper.c b/src/socket/bus_server_socket_wrapper.c index 5c793c1..220461e 100644 --- a/src/socket/bus_server_socket_wrapper.c +++ b/src/socket/bus_server_socket_wrapper.c @@ -7,7 +7,7 @@ * 鍒涘缓 */ void * bus_server_socket_wrapper_open() { - printf("===bus_server_socket_wrapper_open\n"); + logger->debug("===bus_server_socket_wrapper_open\n"); BusServerSocket *sockt = new BusServerSocket; return (void *)sockt; } @@ -16,9 +16,10 @@ * 鍏抽棴 */ void bus_server_socket_wrapper_close(void *_socket) { - printf("===bus_server_socket_wrapper_close\n"); - BusServerSocket *sockt = (BusServerSocket *)_socket; - delete sockt; + + // BusServerSocket *sockt = (BusServerSocket *)_socket; + //delete sockt; + logger->debug("===bus_server_socket_wrapper_close\n"); } /** diff --git a/src/socket/net_mod_server_socket.c b/src/socket/net_mod_server_socket.c index 2fb70e8..46c3bce 100644 --- a/src/socket/net_mod_server_socket.c +++ b/src/socket/net_mod_server_socket.c @@ -168,9 +168,7 @@ if(request_head.timeout > 0) { timeout.tv_sec = request_head.timeout / 1000; timeout.tv_nsec = (request_head.timeout - timeout.tv_sec * 1000) * 10e6; - // printf(" timeout.tv_sec = %d, timeout.tv_nsec=%ld\n", timeout.tv_sec, timeout.tv_nsec ); - ret = shmModSocket.sendandrecv_unsafe_timeout(buf, request_head.content_length, request_head.key, &recv_buf, &recv_size, &timeout); } else if(request_head.timeout == 0) { diff --git a/src/socket/net_mod_socket.c b/src/socket/net_mod_socket.c index 48b0e7c..fb5003e 100644 --- a/src/socket/net_mod_socket.c +++ b/src/socket/net_mod_socket.c @@ -86,15 +86,15 @@ int i, n, recv_size, connfd; net_node_t *node; void *recv_buf = NULL; + struct timespec timeout; + int ret; + int n_req = 0, n_recv_suc = 0, n_resp =0; net_mod_request_head_t request_head = {}; - - int n_req = 0, n_recv_suc = 0, n_resp =0; - net_mod_recv_msg_t *ret_arr = (net_mod_recv_msg_t *)calloc(arrlen, sizeof(net_mod_recv_msg_t)); - int ret; + NetConnPool *mpool; /* Make first caller allocate key for thread-specific data */ @@ -131,7 +131,17 @@ node = &node_arr[i]; if(node->host == NULL || strcmp(node->host, "") == 0 ) { // 鏈湴鍙戦�� - if(shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size) == 0) { + + if(msec == 0) { + ret = shmModSocket.sendandrecv_nowait(send_buf, send_size, node->key, &recv_buf, &recv_size); + } else if(msec > 0){ + timeout.tv_sec = msec / 1000; + timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6; + ret = shmModSocket.sendandrecv_timeout(send_buf, send_size, node->key, &recv_buf, &recv_size, &timeout); + } else { + ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size); + } + if( ret == 0) { strcpy( ret_arr[n_recv_suc].host,""); ret_arr[n_recv_suc].port = 0; ret_arr[n_recv_suc].key = node->key; @@ -229,7 +239,12 @@ mpool->maxi = -1; - *recv_arr = ret_arr; + if(recv_arr != NULL) { + *recv_arr = ret_arr; + } else { + free_recv_msg_arr(ret_arr, n_recv_suc); + } + if(recv_arr_size != NULL) { *recv_arr_size = n_recv_suc; } @@ -264,9 +279,10 @@ // int pub(char *topic, int topic_size, void *content, int content_size, int port); int NetModSocket::_pub_(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, - int content_size, int timeout) { + int content_size, int msec) { int i, connfd; net_node_t *node; + struct timespec timeout; net_mod_request_head_t request_head; net_mod_recv_msg_t recv_msg; @@ -302,7 +318,16 @@ // 鏈湴鍙戦�� if(node_arr == NULL || arrlen == 0) { - if(shmModSocket.pub(topic, topic_size, content, content_size, node->key) == 0 ) { + if(msec == 0) { + ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, node->key); + } else if(msec > 0) { + timeout.tv_sec = msec / 1000; + timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6; + ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, node->key, &timeout); + } else { + ret = shmModSocket.pub(topic, topic_size, content, content_size, node->key); + } + if(ret == 0 ) { n_pub_suc++; } } @@ -312,9 +337,20 @@ node = &node_arr[i]; if(node->host == NULL) { // 鏈湴鍙戦�� - if(shmModSocket.pub(topic, topic_size, content, content_size, node->key) == 0 ) { - n_pub_suc++; + if(msec == 0) { + ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, node->key); + } else if(msec > 0) { + timeout.tv_sec = msec / 1000; + timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6; + ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, node->key, &timeout); + } else { + ret = shmModSocket.pub(topic, topic_size, content, content_size, node->key); } + + if(ret == 0 ) { + n_pub_suc++; + } + } else { sprintf(portstr, "%d", node->port); @@ -326,7 +362,7 @@ request_head.key = node->key; request_head.content_length = content_size; request_head.topic_length = strlen(topic) + 1; - request_head.timeout = timeout; + request_head.timeout = msec; if(write_request(connfd, request_head, content, content_size, topic, request_head.topic_length) != 0) { LoggerFactory::getLogger()->error(" NetModSocket::_pub_ write_request failture %s:%d\n", node->host, node->port); @@ -341,7 +377,7 @@ while(n_resp < n_req) { /* Wait for listening/connected descriptor(s) to become ready */ - if( (mpool->nready = poll(mpool->conns, mpool->maxi + 1, timeout) ) <= 0) { + if( (mpool->nready = poll(mpool->conns, mpool->maxi + 1, msec) ) <= 0) { // wirite_set 鍜� read_set 鍦ㄦ寚瀹氭椂闂村唴閮芥病鍑嗗濂� break; } diff --git a/src/socket/shm_socket.c b/src/socket/shm_socket.c index efb3ef7..c1ac3c8 100644 --- a/src/socket/shm_socket.c +++ b/src/socket/shm_socket.c @@ -45,6 +45,7 @@ shm_socket_t *shm_open_socket(shm_socket_type_t socket_type) { + logger->debug("shm_open_socket\n"); shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t)); socket->socket_type = socket_type; socket->key = -1; @@ -52,11 +53,11 @@ socket->dispatch_thread = 0; socket->status = SHM_CONN_CLOSED; socket->mutex = SemUtil::get(IPC_PRIVATE, 1); - logger->debug("shm_open_socket\n"); + return socket; } -static int _shm_close_socket(shm_socket_t *socket) { +int shm_close_socket(shm_socket_t *socket) { int ret; @@ -76,12 +77,12 @@ return ret; } -int shm_close_socket(shm_socket_t *socket) { +// int shm_close_socket(shm_socket_t *socket) { - // _destrory_tmp_recv_socket_((shm_socket_t *)pthread_getspecific(_tmp_recv_socket_key_)); +// // _destrory_tmp_recv_socket_((shm_socket_t *)pthread_getspecific(_tmp_recv_socket_key_)); - return _shm_close_socket(socket);; -} +// return shm_close_socket(socket);; +// } int shm_socket_bind(shm_socket_t *socket, int key) { socket->key = key; @@ -391,11 +392,18 @@ } if (rv) { - void *_buf = malloc(src.size); - memcpy(_buf, src.buf, src.size); - *buf = _buf; - *size = src.size; - *key = src.key; + if(buf != NULL) { + void *_buf = malloc(src.size); + memcpy(_buf, src.buf, src.size); + *buf = _buf; + } + + if(size != NULL) + *size = src.size; + + if(key != NULL) + *key = src.key; + mm_free(src.buf); // printf("shm_recvfrom pop after\n"); return 0; @@ -411,12 +419,13 @@ int rv; if(tmp_socket == NULL) return; + logger->debug("%d destroy tmp socket\n", pthread_self()); - _shm_close_socket((shm_socket_t *)tmp_socket); + shm_close_socket((shm_socket_t *)tmp_socket); rv = pthread_setspecific(_tmp_recv_socket_key_, NULL); if ( rv != 0) { - logger->error(rv, "shm_sendandrecv : pthread_setspecific"); - exit(1); + logger->error(rv, "shm_sendandrecv : pthread_setspecific"); + exit(1); } } @@ -438,7 +447,7 @@ -int shm_sendandrecv(shm_socket_t *socket, const void *send_buf, +int shm_sendandrecv_safe(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, struct timespec *timeout, int flags) { int recv_key; @@ -508,6 +517,12 @@ return -1; } +int shm_sendandrecv(shm_socket_t *socket, const void *send_buf, + const int send_size, const int send_key, void **recv_buf, + int *recv_size, struct timespec *timeout, int flags) { + return shm_sendandrecv_unsafe(socket, send_buf, send_size, send_key,recv_buf, recv_size, timeout, flags); +} + // ============================================================================================================ /** diff --git a/test_net_socket/Makefile b/test_net_socket/Makefile index 0d8505f..832a130 100644 --- a/test_net_socket/Makefile +++ b/test_net_socket/Makefile @@ -14,8 +14,7 @@ #-I$(ROOT)/include/usgcommon INCLUDES += -I${ROOT}/src -I${ROOT}/src/queue -I${ROOT}/src/socket -I${ROOT}/src/common/include -I${ROOT}/include/usgcommon - -PROGS = ${DEST}/test_net_mod_socket +PROGS = ${DEST}/test_net_mod_socket ${DEST}/test_bus_stop ${DEST}/heart_beat DEPENDENCES = $(patsubst %, %.d, $(PROGS)) diff --git a/test_socket/dgram_mod_survey.c b/test_net_socket/heart_beat.c similarity index 69% rename from test_socket/dgram_mod_survey.c rename to test_net_socket/heart_beat.c index da3260f..562cb23 100644 --- a/test_socket/dgram_mod_survey.c +++ b/test_net_socket/heart_beat.c @@ -1,6 +1,10 @@ -#include "dgram_mod_socket.h" +#include "net_mod_server_socket_wrapper.h" +#include "net_mod_socket_wrapper.h" +#include "bus_server_socket_wrapper.h" + #include "shm_mm_wraper.h" #include "usg_common.h" +#include <getopt.h> typedef struct Targ { @@ -10,43 +14,50 @@ }Targ; void sigint_handler(int sig) { - //dgram_mod_close_socket(server_socket); + // net_mod_socket_close(server_socket); printf("===Catch sigint======================\n"); shm_mm_wrapper_destroy(); exit(0); } void server(int port) { - void *socket = dgram_mod_open_socket(); - dgram_mod_bind(socket, port); + void *serv = net_mod_socket_open(); + net_mod_socket_bind(serv, port); int size; void *recvbuf; char sendbuf[512]; int rv; int remote_port; while (true) { - if ((rv = dgram_mod_recvfrom_timeout(socket, &recvbuf, &size, &remote_port, 15, 0) ) == 0) { + if ((rv = net_mod_socket_recvfrom(serv, &recvbuf, &size, &remote_port) ) == 0) { printf( "RECEIVED HREARTBEAT FROM %d: %s\n", remote_port, recvbuf); + net_mod_socket_sendto(serv, "suc", strlen("suc")+1, remote_port); free(recvbuf); } } - dgram_mod_close_socket(socket); + net_mod_socket_close(serv); } void client(int port) { - void *socket = dgram_mod_open_socket(); + int rv; + void *client = net_mod_socket_open(); int size; char sendbuf[512]; long i = 0; + net_node_t node_arr[] = {"", 0, 100}; + int node_arr_size = 1; + + int recv_arr_size; + net_mod_recv_msg_t *recv_arr; while (true) { sprintf(sendbuf, "%d", i); printf("SEND HEART:%s\n", sendbuf); - dgram_mod_sendto(socket, sendbuf, strlen(sendbuf) + 1, port); + rv = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL); // sleep(1); i++; } - dgram_mod_close_socket(socket); + net_mod_socket_close(client); } @@ -54,20 +65,26 @@ signal(SIGINT, sigint_handler); Targ *targ = (Targ *)arg; int port = targ->port; - void *socket = dgram_mod_open_socket(); + void *socket = net_mod_socket_open(); int size; char sendbuf[512]; long scale = 10; long i = 0; + net_node_t node_arr[] = {"", 0, 100}; + int node_arr_size = 1; + + int recv_arr_size; + net_mod_recv_msg_t *recv_arr; + while (i < scale) { sprintf(sendbuf, "%d", i); printf("%d SEND HEART:%s\n", targ->id, sendbuf); - dgram_mod_sendto(socket, sendbuf, strlen(sendbuf) + 1, port); + net_mod_socket_sendto(socket, sendbuf, strlen(sendbuf) + 1, port); sleep(1); i++; } - dgram_mod_close_socket(socket); + net_mod_socket_close(socket); return (void *)i; } diff --git a/test_net_socket/test_bus_stop.c b/test_net_socket/test_bus_stop.c new file mode 100644 index 0000000..ed2f60f --- /dev/null +++ b/test_net_socket/test_bus_stop.c @@ -0,0 +1,54 @@ +#include "net_mod_server_socket_wrapper.h" +#include "net_mod_socket_wrapper.h" +#include "bus_server_socket_wrapper.h" + +#include "shm_mm_wraper.h" +#include "usg_common.h" +#include <getopt.h> + +static void * server_sockt; + +static void *_start_bus_(void *arg) { + // pthread_detach(pthread_self()); + printf("Start bus server\n"); + pthread_t tid; + + server_sockt = bus_server_socket_wrapper_open(); + + if(bus_server_socket_wrapper_start_bus(server_sockt) != 0) { + printf("start bus failed\n"); + } +} + +int main() { + + + pthread_t tid; + char action[512]; + + shm_mm_wrapper_init(512); + pthread_create(&tid, NULL, _start_bus_, NULL); + + + while (true) { + printf("Input action: Close?\n"); + if(scanf("%s", action) < 1) { + printf("Invalide action\n"); + continue; + } + + if(strcmp(action, "close") == 0) { + bus_server_socket_wrapper_close(server_sockt); + break; + } else { + printf("Invalide action\n"); + } + } + + if (pthread_join(tid, NULL) != 0) { + perror(" pthread_join"); + } + + + shm_mm_wrapper_destroy(); +} \ No newline at end of file diff --git a/test_net_socket/test_net_mod_socket.c b/test_net_socket/test_net_mod_socket.c index f509e2c..5693cf6 100644 --- a/test_net_socket/test_net_mod_socket.c +++ b/test_net_socket/test_net_mod_socket.c @@ -137,6 +137,7 @@ sprintf(sendbuf, "RECEIVED PORT %d NAME %s", remote_port, recvbuf); net_mod_socket_sendto(client, sendbuf, strlen(sendbuf) + 1, remote_port); free(recvbuf); + sleep(1000); } } @@ -259,8 +260,9 @@ for (i = 0; i < SCALE; i++) { sprintf(sendbuf, "thread(%d) %d", targ->id, i); fprintf(fp, "requst:%s\n", sendbuf); - n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size); - //printf("send %d nodes\n", n); + // n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size); + n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 1000); + printf("send %d nodes\n", n); for(j=0; j < recv_arr_size; j++) { fprintf(fp, "reply: host:%s, port: %d, key:%d, content: %s\n", recv_arr[j].host, -- Gitblit v1.8.0