From dd0714b75b2e29087e3cd1184995bf38a453d833 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期一, 01 二月 2021 17:36:13 +0800 Subject: [PATCH] update --- src/socket/shm_mod_socket.h | 8 /dev/null | 107 --------------------- lib/libusgcommon.a | 0 src/net/net_mod_socket_wrapper.cpp | 21 ++- test_net_socket/test_net_mod_socket.cpp | 43 ++++++-- src/logger_factory.cpp | 7 + test_net_socket/net_mod_socket.sh | 12 +- src/socket/shm_mod_socket.cpp | 8 lib/libusgcommon.so | 0 src/socket/shm_socket.cpp | 56 ++++------- 10 files changed, 84 insertions(+), 178 deletions(-) diff --git a/lib/libusgcommon.a b/lib/libusgcommon.a index 9d121d2..4bc7434 100644 --- a/lib/libusgcommon.a +++ b/lib/libusgcommon.a Binary files differ diff --git a/lib/libusgcommon.so b/lib/libusgcommon.so index 99bdece..9471178 100644 --- a/lib/libusgcommon.so +++ b/lib/libusgcommon.so Binary files differ diff --git a/src/logger_factory.cpp b/src/logger_factory.cpp index 48cd04b..df0015c 100644 --- a/src/logger_factory.cpp +++ b/src/logger_factory.cpp @@ -11,8 +11,11 @@ LoggerConfig config; config.level = Logger::DEBUG; - - config.logFile = "/tmp/bhome_bus.log"; + + const char *logFileFormat= "/tmp/bhome_bus.%ld.log"; + char logFile[128]; + sprintf(logFile, logFileFormat, getpid()); + config.logFile = logFile; #ifdef BUILD_Debug config.console = 1; diff --git a/src/net/net_mod_socket_wrapper.cpp b/src/net/net_mod_socket_wrapper.cpp index 683c543..be44751 100644 --- a/src/net/net_mod_socket_wrapper.cpp +++ b/src/net/net_mod_socket_wrapper.cpp @@ -53,14 +53,15 @@ int net_mod_socket_sendto_timeout(void *_socket, const void *buf, const int size, const int key, int sec, int nsec){ NetModSocket *sockt = (NetModSocket *)_socket; logger->debug("net_mod_socket_sendto: %d sendto %d", net_mod_socket_get_key(_socket), key); - return sockt->sendto_timeout(buf, size, key, sec, nsec); - // return sockt->sendto(buf, size, key); + // return sockt->sendto_timeout(buf, size, key, sec, nsec); + return sockt->sendto(buf, size, key); } // 鍙戦�佷俊鎭珛鍒昏繑鍥炪�� int net_mod_socket_sendto_nowait(void *_socket, const void *buf, const int size, const int key){ NetModSocket *sockt = (NetModSocket *)_socket; logger->debug("net_mod_socket_sendto: %d sendto %d", net_mod_socket_get_key(_socket), key); - return sockt->sendto_nowait(buf, size, key); + return sockt->sendto(buf, size, key); + // return sockt->sendto_nowait(buf, size, key); } /** @@ -80,12 +81,13 @@ // 鎺ュ彈淇℃伅瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *key, int sec, int nsec){ NetModSocket *sockt = (NetModSocket *)_socket; - // return sockt->recvfrom(buf, size, key); - return sockt->recvfrom_timeout(buf, size, key, sec, nsec); + return sockt->recvfrom(buf, size, key); + // return sockt->recvfrom_timeout(buf, size, key, sec, nsec); } int net_mod_socket_recvfrom_nowait(void *_socket, void **buf, int *size, int *key){ NetModSocket *sockt = (NetModSocket *)_socket; - return sockt->recvfrom_nowait(buf, size, key); + return sockt->recvfrom(buf, size, key); + // return sockt->recvfrom_nowait(buf, size, key); } int net_mod_socket_sendandrecv(void *_socket, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, @@ -100,14 +102,15 @@ int net_mod_socket_sendandrecv_timeout(void *_socket, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int timeout){ NetModSocket *sockt = (NetModSocket *)_socket; - // return sockt->sendandrecv(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size); - return sockt->sendandrecv_timeout(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size, timeout); + return sockt->sendandrecv(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size); + // return sockt->sendandrecv_timeout(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size, timeout); } int net_mod_socket_sendandrecv_nowait(void *_socket, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) { NetModSocket *sockt = (NetModSocket *)_socket; - return sockt->sendandrecv_nowait(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size); + return sockt->sendandrecv(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size); + // return sockt->sendandrecv_nowait(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size); } diff --git a/src/socket/shm_mod_socket.cpp b/src/socket/shm_mod_socket.cpp index 760c421..02b91f0 100644 --- a/src/socket/shm_mod_socket.cpp +++ b/src/socket/shm_mod_socket.cpp @@ -102,7 +102,7 @@ * @size 涓婚闀垮害 * @key 鎬荤嚎绔彛 */ -int ShmModSocket::sub(char *topic, int topic_size, int key, +int ShmModSocket::sub(const char *topic, int topic_size, int key, const struct timespec *timeout, int flags) { int ret; bus_head_t head = {}; @@ -133,7 +133,7 @@ * @size 涓婚闀垮害 * @key 鎬荤嚎绔彛 */ -int ShmModSocket::desub(char *topic, int topic_size, int key, const struct timespec *timeout, int flags) { +int ShmModSocket::desub(const char *topic, int topic_size, int key, const struct timespec *timeout, int flags) { // char buf[8192]; int ret; if(topic == NULL) { @@ -171,7 +171,7 @@ * @content 涓婚鍐呭 * @key 鎬荤嚎绔彛 */ -int ShmModSocket::pub(char *topic, int topic_size, void *content, int content_size, int key, const struct timespec *timeout, int flags) { +int ShmModSocket::pub(const char *topic, int topic_size, const void *content, int content_size, int key, const struct timespec *timeout, int flags) { int ret; bus_head_t head = {}; memcpy(head.action, "pub", sizeof(head.action)); @@ -204,7 +204,7 @@ // ============================================================================= int ShmModSocket::get_bus_sendbuf(bus_head_t &request_head, - void *topic_buf, int topic_size, void *content_buf, int content_size, void **retbuf) { + const void *topic_buf, int topic_size, const void *content_buf, int content_size, void **retbuf) { int buf_size; char *buf; diff --git a/src/socket/shm_mod_socket.h b/src/socket/shm_mod_socket.h index f5441ce..2fa43d7 100644 --- a/src/socket/shm_mod_socket.h +++ b/src/socket/shm_mod_socket.h @@ -33,7 +33,7 @@ - static int get_bus_sendbuf(bus_head_t &request_head, void *topic_buf, int topic_size, void *content_buf, int content_size, void **retbuf); + static int get_bus_sendbuf(bus_head_t &request_head, const void *topic_buf, int topic_size, const void *content_buf, int content_size, void **retbuf); public: static size_t remove_keys(int keys[], size_t length); @@ -98,7 +98,7 @@ * @key 鎬荤嚎绔彛 * @flag BUS_TIMEOUT_FLAG BUS_NOWAIT_FLAG */ - int sub(char *topic, int size, int key, const struct timespec *timeout = NULL, int flag = 0); + int sub(const char *topic, int size, int key, const struct timespec *timeout = NULL, int flag = 0); /** @@ -108,7 +108,7 @@ * @key 鎬荤嚎绔彛 * @flag BUS_TIMEOUT_FLAG BUS_NOWAIT_FLAG */ - int desub(char *topic, int size, int key, const struct timespec *timeout = NULL, int flag = 0); + int desub(const char *topic, int size, int key, const struct timespec *timeout = NULL, int flag = 0); /** * 鍙戝竷涓婚 @@ -117,7 +117,7 @@ * @key 鎬荤嚎绔彛 * @flag BUS_TIMEOUT_FLAG BUS_NOWAIT_FLAG */ - int pub(char *topic, int topic_size, void *content, int content_size, int key, const struct timespec *timeout = NULL, int flag = 0); + int pub(const char *topic, int topic_size, const void *content, int content_size, int key, const struct timespec *timeout = NULL, int flag = 0); /** diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp index 6781380..f5ae328 100644 --- a/src/socket/shm_socket.cpp +++ b/src/socket/shm_socket.cpp @@ -62,7 +62,6 @@ static LockFreeQueue<shm_msg_t> * shm_socket_attach_queue(int key) { LockFreeQueue<shm_msg_t> * queue; hashtable_t *hashtable = mm_get_hashtable(); - // hashtable_lock(hashtable); void *tmp_ptr = hashtable_get(hashtable, key); if (tmp_ptr == NULL || tmp_ptr == (void *)1) { //logger->error("shm_socket._remote_queue_attach锛歝onnet at key %d failed!", key); @@ -169,12 +168,6 @@ int s; int rv; - if (sockt->socket_type != SHM_SOCKET_DGRAM) { - logger->error( "shm_socket.shm_sendto: Can't invoke shm_sendto method in a %d type socket which is " - "not a SHM_SOCKET_DGRAM socket ", - sockt->socket_type); - exit(0); - } hashtable_t *hashtable = mm_get_hashtable(); @@ -235,38 +228,33 @@ // 鐭繛鎺ユ柟寮忔帴鍙� -int shm_recvfrom(shm_socket_t *sokt, void **buf, int *size, int *key, const struct timespec *timeout, int flag) { +int shm_recvfrom(shm_socket_t *sockt, void **buf, int *size, int *key, const struct timespec *timeout, int flag) { int s; int rv; - if (sokt->socket_type != SHM_SOCKET_DGRAM) { - logger->error("shm_socket.shm_recvfrom: Can't invoke shm_recvfrom method in a %d type socket which " - "is not a SHM_SOCKET_DGRAM socket ", - sokt->socket_type); - exit(1); - } + hashtable_t *hashtable = mm_get_hashtable(); - if ((s = pthread_mutex_lock(&(sokt->mutex))) != 0) + if ((s = pthread_mutex_lock(&(sockt->mutex))) != 0) err_exit(s, "shm_recvfrom : pthread_mutex_lock"); - if (sokt->queue == NULL) { - if (sokt->key == 0) { - sokt->key = hashtable_alloc_key(hashtable); + if (sockt->queue == NULL) { + if (sockt->key == 0) { + sockt->key = hashtable_alloc_key(hashtable); } - sokt->queue = shm_socket_bind_queue( sokt->key, sokt->force_bind); - if(sokt->queue == NULL ) { - logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sokt->key); + sockt->queue = shm_socket_bind_queue( sockt->key, sockt->force_bind); + if(sockt->queue == NULL ) { + logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key); return EBUS_KEY_INUSED; } } - if ((s = pthread_mutex_unlock(&(sokt->mutex))) != 0) + if ((s = pthread_mutex_unlock(&(sockt->mutex))) != 0) err_exit(s, "shm_recvfrom : pthread_mutex_unlock"); shm_msg_t src; - rv = sokt->queue->pop(src, timeout, flag); + rv = sockt->queue->pop(src, timeout, flag); if (rv == 0) { if(buf != NULL) { @@ -330,7 +318,7 @@ // use thread local -int _shm_sendandrecv_thread_local(shm_socket_t *socket, const void *send_buf, +int _shm_sendandrecv_thread_local(shm_socket_t *sockt, const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, const struct timespec *timeout, int flags) { int recv_key; @@ -338,13 +326,6 @@ // 鐢╰hread local 淇濊瘉姣忎釜绾跨▼鐢ㄤ竴涓嫭鍗犵殑socket鎺ュ彈瀵规柟杩斿洖鐨勪俊鎭� shm_socket_t *tmp_socket; - - if (socket->socket_type != SHM_SOCKET_DGRAM) { - logger->error( "shm_socket.shm_sendandrecv: Can't invoke shm_sendandrecv method in a %d type socket " - "which is not a SHM_SOCKET_DGRAM socket ", - socket->socket_type); - exit(1); - } rv = pthread_once(&_once_, _create_socket_key_perthread); @@ -370,14 +351,19 @@ if ((rv = shm_sendto(tmp_socket, send_buf, send_size, send_key, timeout, flags)) == 0) { rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags); if(rv != 0) { - printf("_shm_sendandrecv_thread_local : %s\n", bus_strerror(rv)); + logger->error("_shm_sendandrecv_thread_local : %s\n", bus_strerror(rv)); } else if(rv == 0 ) { + logger->debug("======%d use tmp_socket %d, send to %d, receive from %d\n", shm_socket_get_key(sockt), shm_socket_get_key(tmp_socket), send_key, recv_key); + + if(recv_key == shm_socket_get_key(sockt)) { + logger->debug("=====鏀跺埌浜嗚嚜宸卞彂缁欒嚜宸辩殑娑堟伅\n"); + } assert( send_key == recv_key); if(send_key != recv_key) { - err_exit(0, "_shm_sendandrecv_thread_local: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key); + logger->error( "_shm_sendandrecv_alloc_new: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key); + exit(1); } - } return rv; } else { @@ -434,7 +420,7 @@ const int send_size, const int send_key, void **recv_buf, int *recv_size, const struct timespec *timeout, int flags) { - return _shm_sendandrecv_alloc_new(socket, send_buf, send_size, send_key,recv_buf, recv_size, timeout, flags); + return _shm_sendandrecv_thread_local(socket, send_buf, send_size, send_key,recv_buf, recv_size, timeout, flags); } diff --git a/test_net_socket/net_mod_socket.sh b/test_net_socket/net_mod_socket.sh index 930cd5d..52eb54e 100755 --- a/test_net_socket/net_mod_socket.sh +++ b/test_net_socket/net_mod_socket.sh @@ -26,18 +26,20 @@ } -# 鏃犻檺寰幆send +# one_to_many send function one_to_many() { ./test_net_mod_socket --fun="one_sendto_many" \ --sendlist=" :5000:100, :5000:101, :5000:102" } -# 澶氱嚎绋媠end -function msend() { - ./test_net_mod_socket --fun="test_net_sendandrecv_threads" \ - --sendlist="localhost:5000:100, localhost:5000:100" + +# +function send() { + ./test_net_mod_socket --fun="test_net_sendandrecv" \ + --sendlist=" :5000:100, :5000:101, :5000:102" } + # 鏃犻檺寰幆 pub function pub() { ./test_net_mod_socket --fun="test_net_pub" \ diff --git a/test_net_socket/one_sendto_many.cpp b/test_net_socket/one_sendto_many.cpp deleted file mode 100644 index bc233d1..0000000 --- a/test_net_socket/one_sendto_many.cpp +++ /dev/null @@ -1,107 +0,0 @@ -#include <assert.h> -#include "net_mod_server_socket_wrapper.h" -#include "net_mod_socket_wrapper.h" -#include "bus_server_socket_wrapper.h" - -#include "shm_mm_wrapper.h" -#include "usg_common.h" -#include <getopt.h> -#include "logger_factory.h" - -void *_run_sendandrecv_(void *arg) { - Targ *targ = (Targ *)arg; - char sendbuf[128]; - - int j, n; - int recv_arr_size; - net_mod_recv_msg_t *recv_arr; - int total = 0; - - - net_node_t *node_arr; - int node_arr_size = parse_node_list(targ->nodelist, &node_arr); - - long rtid; - unsigned int l = 0 , rl; - const char *hello_format = "%ld say Hello %d"; - - - char filename[512]; - sprintf(filename, "test%d.tmp", targ->id); - FILE *fp = NULL; - fp = fopen(filename, "w+"); - // fp = stdout; - - int recvsize; - void *recvbuf; - for (l = 0; l < SCALE; l++) { - sprintf(sendbuf, hello_format, targ->id, l); - // fprintf(fp, "requst:%s\n", sendbuf); - // n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size); - n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 1000); - printf("%d: send %d nodes\n", l, n); - for(j=0; j < recv_arr_size; j++) { - - fprintf(fp, "%ld send '%s'. received '%s' from (host:%s, port: %d, key:%d) \n", - targ->id, - sendbuf, - recv_arr[j].content, - recv_arr[j].host, - recv_arr[j].port, - recv_arr[j].key - - ); - - assert(sscanf((const char *)recv_arr[j].content, hello_format, &rtid, &rl) == 2); - assert(rtid == targ->id); - assert(rl == l); - } - // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀 - net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size); - total += n; - } - // fclose(fp); - // net_mod_socket_close(client); - return (void *)total; -} - -//澶氱嚎绋媠end -void test_net_sendandrecv_threads(char *nodelist) { - - int status, i = 0, processors = 4; - void *res[processors]; - // Targ *targs = (Targ *)calloc(processors, sizeof(Targ)); - Targ targs[processors]; - pthread_t tids[processors]; - char sendbuf[512]; - struct timeval start, end; - long total = 0; - - client = net_mod_socket_open(); - - printf("寮�濮嬫祴璇�...\n"); - gettimeofday(&start, NULL); - for (i = 0; i < processors; i++) { - targs[i].nodelist = nodelist; - targs[i].id = i; - pthread_create(&tids[i], NULL, _run_sendandrecv_, (void *)&targs[i]); - } - - for (i = 0; i < processors; i++) { - if (pthread_join(tids[i], &res[i]) != 0) { - perror("multyThreadClient pthread_join"); - } else { - total += (long)res[i]; - //fprintf(stderr, "client(%d) 鍐欏叆 %ld 鏉℃暟鎹甛n", i, (long)res[i]); - } - } - - gettimeofday(&end, NULL); - - double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec); - long diffsec = (long) (difftime/1000000); - long diffusec = difftime - diffsec*1000000; - fprintf(stderr,"鍙戦�佹暟鐩�: %ld, 鐢ㄦ椂: (%ld sec %ld usec), 骞冲潎: %f\n", total, diffsec, diffusec, difftime/total ); - // fflush(stdout); - -} \ No newline at end of file diff --git a/test_net_socket/test_net_mod_socket.cpp b/test_net_socket/test_net_mod_socket.cpp index ca5fa3a..79f102a 100644 --- a/test_net_socket/test_net_mod_socket.cpp +++ b/test_net_socket/test_net_mod_socket.cpp @@ -81,7 +81,7 @@ int key; int rv; while ((rv = net_mod_socket_recvfrom( sockt, &recvbuf, &size, &key) ) == 0) { - printf("鏀跺埌璁㈤槄娑堟伅:%s\n", recvbuf); + printf("鏀跺埌璁㈤槄娑堟伅:%s\n", (char *)recvbuf); free(recvbuf); } @@ -144,7 +144,7 @@ int remote_port; while ( (rv = net_mod_socket_recvfrom(ser, &recvbuf, &size, &remote_port) ) == 0) { // printf( "server: RECEIVED REQUEST FROM PORT %d NAME %s\n", remote_port, recvbuf); - sprintf(sendbuf, "%d RECEIVED %s", net_mod_socket_get_key(ser), recvbuf); + sprintf(sendbuf, "%d RECEIVED %s", net_mod_socket_get_key(ser), (char *)recvbuf); net_mod_socket_sendto(ser, sendbuf, strlen(sendbuf) + 1, remote_port); free(recvbuf); } @@ -201,7 +201,7 @@ recv_arr[i].host, recv_arr[i].port, recv_arr[i].key, - recv_arr[i].content + (char *)recv_arr[i].content ); } @@ -283,7 +283,7 @@ recv_arr[j].host, recv_arr[j].port, recv_arr[j].key, - recv_arr[j].content + (char *)recv_arr[j].content ); printf("key == %d\n", net_mod_socket_get_key(client)); @@ -359,34 +359,53 @@ net_node_t *node_arr; int node_arr_size = parse_node_list(nodelist, &node_arr); char buf[128]; - pid_t pid, rpid ; - unsigned int l , rl; - const char *hello_format = "%ld say Hello %u "; + pid_t pid, retPid ; + unsigned int l , retl; + int remoteKey; + const char *hello_format = "%d say Hello %u "; + const char *reply_format = "%d RECEIVED %d say Hello %d"; pid = getpid(); l = 0; client = net_mod_socket_open(); while(true) { - sprintf(buf, hello_format, (long)pid, l); + sprintf(buf, hello_format, pid, l); n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, buf, strlen(buf)+1, &recv_arr, &recv_arr_size, 1000); printf(" %d nodes reply\n", n); for(j = 0; j < recv_arr_size; j++) { - LoggerFactory::getLogger()->debug("%ld send '%s'. received '%s' from (host:%s, port: %d, key:%d) \n", + printf("%ld send '%s' . received '%s' from (host:%s, port: %d, key:%d) \n", (long)pid, buf, - recv_arr[j].content, + (char *)recv_arr[j].content, recv_arr[j].host, recv_arr[j].port, recv_arr[j].key ); - // assert(sscanf((const char *)recv_arr[j].content, hello_format, &rpid, &rl) == 2); - // assert(rpid == pid); + // printf( "%d send '%s' to %d. received from (host=%s, port= %d, key=%d) '%s'\n", + // net_mod_socket_get_key(client), + // sendbuf, + // targ->node->key, + // recv_arr[j].host, + // recv_arr[j].port, + // recv_arr[j].key, + // recv_arr[j].content + // ); + + + // assert(sscanf((const char *)recv_arr[j].content, reply_format, &rkey, &lkey, &rl) == 3); + // assert(targ->node->key == rkey); + // assert(net_mod_socket_get_key(client) == lkey); // assert(rl == l); + + assert(sscanf((const char *)recv_arr[j].content, reply_format, &remoteKey, &retPid, &retl) == 3); + assert(retPid == pid); + assert(retl == l); + assert(remoteKey == recv_arr[j].key); } // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀 -- Gitblit v1.8.0