From 7a12bed7a2550d037e6e869c1ed0ce115098dbb2 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期六, 13 三月 2021 18:44:51 +0800 Subject: [PATCH] update --- test_socket/CMakeLists.txt | 7 test_socket/heart_beat.cpp | 167 +++++++++++++++++---------- src/queue/lock_free_queue.h | 55 +++++--- src/shm/mm.cpp | 5 test_net_socket/net_mod_socket.sh | 2 test_socket/heart_beat.sh | 10 + CMakeLists.txt | 2 src/socket/shm_socket.cpp | 60 ++++----- 8 files changed, 189 insertions(+), 119 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 1ad483f..0b653e7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -29,6 +29,6 @@ add_subdirectory(${PROJECT_SOURCE_DIR}/src) add_subdirectory(${PROJECT_SOURCE_DIR}/test) add_subdirectory(${PROJECT_SOURCE_DIR}/test_net_socket) -# add_subdirectory(${PROJECT_SOURCE_DIR}/test_socket) + add_subdirectory(${PROJECT_SOURCE_DIR}/test_socket) # add_subdirectory(${PROJECT_SOURCE_DIR}/shm_util) endif() diff --git a/src/queue/lock_free_queue.h b/src/queue/lock_free_queue.h index 31f2cc1..f74f4bc 100644 --- a/src/queue/lock_free_queue.h +++ b/src/queue/lock_free_queue.h @@ -88,9 +88,9 @@ sem_t slots; sem_t items; - // time_t createTime; - // time_t closeTime; - // int status; + time_t createTime; + time_t closeTime; + int status; public: @@ -101,7 +101,8 @@ /// template ~LockFreeQueue(); - // inline void close(); + inline void close(); + inline bool isClosed(); // std::atomic_uint reference; /// @brief constructor of the class @@ -129,17 +130,17 @@ - // time_t getCreateTime() { - // return createTime; - // } + time_t getCreateTime() { + return createTime; + } - // time_t getCloseTime() { - // return closeTime; - // } + time_t getCloseTime() { + return closeTime; + } - // int getStatus() { - // return status; - // } + int getStatus() { + return status; + } /// @brief push an element at the tail of the queue /// @param the element to insert in the queue @@ -182,20 +183,28 @@ if (sem_init(&items, 1, 0) == -1) err_exit(errno, "LockFreeQueue sem_init"); - // createTime = time(NULL); - // status = LOCK_FREE_Q_ST_OPENED; + createTime = time(NULL); + status = LOCK_FREE_Q_ST_OPENED; } -// template< -// typename ELEM_T, -// typename Allocator, -// template<typename T, typename AT> class Q_TYPE> -// inline void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::close() { -// // status = LOCK_FREE_Q_ST_CLOSED; -// // closeTime = time(NULL); -// } +template< + typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> +inline void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::close() { + status = LOCK_FREE_Q_ST_CLOSED; + closeTime = time(NULL); +} + +template< + typename ELEM_T, + typename Allocator, + template<typename T, typename AT> class Q_TYPE> +inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::isClosed() { + return status == LOCK_FREE_Q_ST_CLOSED; +} template< diff --git a/src/shm/mm.cpp b/src/shm/mm.cpp index 8fff5c2..3cdd3a2 100644 --- a/src/shm/mm.cpp +++ b/src/shm/mm.cpp @@ -123,8 +123,11 @@ return aptr; } else { SemUtil::inc(mutex); + err_msg(0, "mm_malloc : out of memery\n"); + LoggerFactory::getLogger()->fatal("mm_malloc : out of memery\n"); // abort(); - err_exit(0, "mm_malloc : out of memery\n"); + exit(1); + return NULL; } diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp index 1f5d47b..b5d4d09 100644 --- a/src/socket/shm_socket.cpp +++ b/src/socket/shm_socket.cpp @@ -10,7 +10,7 @@ static Logger *logger = LoggerFactory::getLogger(); -ShmQueueStMap * shmQueueStMap ; +// ShmQueueStMap * shmQueueStMap ; static void print_msg(char *head, shm_packet_t &msg) { // err_msg(0, "%s: key=%d, type=%d\n", head, msg.key, msg.type); @@ -104,7 +104,7 @@ err_exit(s, "pthread_mutexattr_destroy"); - shmQueueStMap = shm_mm_attach<ShmQueueStMap>(SHM_QUEUE_ST_KEY); + // shmQueueStMap = shm_mm_attach<ShmQueueStMap>(SHM_QUEUE_ST_KEY); return sockt; } @@ -113,37 +113,33 @@ static int _shm_socket_close_(shm_socket_t *sockt) { int rv, i; + hashtable_t *hashtable = mm_get_hashtable(); logger->debug("shm_socket_close\n"); - if(sockt->key != 0) { - auto it = shmQueueStMap->find(sockt->key); - if(it != shmQueueStMap->end()) { - it->second.status = SHM_QUEUE_ST_CLOSED; - it->second.closeTime = time(NULL); - } - } + // if(sockt->key != 0) { + // auto it = shmQueueStMap->find(sockt->key); + // if(it != shmQueueStMap->end()) { + // it->second.status = SHM_QUEUE_ST_CLOSED; + // it->second.closeTime = time(NULL); + // } + // } - printf("====sockt->queue addr = %p\n", sockt->queue); + // printf("====sockt->queue addr = %p\n", sockt->queue); if(sockt->queue != NULL) { + sockt->queue->close(); for( i = 0; i < sockt->queue->size(); i++) { mm_free((*(sockt->queue))[i].buf); logger->info("======= %d free queue element buf\n", sockt->key); } + sleep(1); - // hashtable_remove(hashtable, mkey); + hashtable_remove(hashtable, sockt->key); // sockt->queue = NULL; } - - // hashtable_remove(hashtable, mkey); - // if(sockt->queue != NULL) { - // sockt->queue = NULL; - // } - - pthread_mutex_destroy(&(sockt->mutex) ); free(sockt); return 0; @@ -578,9 +574,9 @@ } // 鏍囪key瀵瑰簲鐨勭姸鎬� 锛屼负opened - stRecord.status = SHM_QUEUE_ST_OPENED; - stRecord.createTime = time(NULL); - shmQueueStMap->insert({sockt->key, stRecord}); + // stRecord.status = SHM_QUEUE_ST_OPENED; + // stRecord.createTime = time(NULL); + // shmQueueStMap->insert({sockt->key, stRecord}); } @@ -597,17 +593,19 @@ } // 妫�鏌ey鏍囪鐨勭姸鎬� - auto it = shmQueueStMap->find(key); - if(it != shmQueueStMap->end()) { - if(it->second.status == SHM_QUEUE_ST_CLOSED) { - // key瀵瑰簲鐨勭姸鎬佹槸鍏抽棴鐨� - goto ERR_CLOSED; - } - } + // auto it = shmQueueStMap->find(key); + // if(it != shmQueueStMap->end()) { + // if(it->second.status == SHM_QUEUE_ST_CLOSED) { + // // key瀵瑰簲鐨勭姸鎬佹槸鍏抽棴鐨� + // goto ERR_CLOSED; + // } + // } remoteQueue = shm_socket_attach_queue(key); if (remoteQueue == NULL ) { + goto ERR_CLOSED; + } else if(remoteQueue->isClosed()) { goto ERR_CLOSED; } @@ -659,9 +657,9 @@ } // 鏍囪key瀵瑰簲鐨勭姸鎬� 锛屼负opened - stRecord.status = SHM_QUEUE_ST_OPENED; - stRecord.createTime = time(NULL); - shmQueueStMap->insert({sockt->key, stRecord}); + // stRecord.status = SHM_QUEUE_ST_OPENED; + // stRecord.createTime = time(NULL); + // shmQueueStMap->insert({sockt->key, stRecord}); if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) err_exit(rv, "shm_recvfrom : pthread_mutex_unlock"); diff --git a/test_net_socket/net_mod_socket.sh b/test_net_socket/net_mod_socket.sh index d8cf241..d79752e 100755 --- a/test_net_socket/net_mod_socket.sh +++ b/test_net_socket/net_mod_socket.sh @@ -11,7 +11,7 @@ ./shm_util recvfrom --bind=102 & server_pid=$! && echo "pid: ${server_pid}" # 鎵撳紑鍥為槦鍒楁敹杩涚▼ - ./shm_util start_resycle & server_pid=$! && echo "pid: ${server_pid}" + # ./shm_util start_resycle & server_pid=$! && echo "pid: ${server_pid}" } # 浜や簰寮忓鎴风 diff --git a/test_socket/CMakeLists.txt b/test_socket/CMakeLists.txt index 111569d..18bcf4c 100644 --- a/test_socket/CMakeLists.txt +++ b/test_socket/CMakeLists.txt @@ -9,10 +9,13 @@ add_custom_command( - OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/heart_beat.sh - COMMAND cp ${CMAKE_CURRENT_SOURCE_DIR}/heart_beat.sh ${CMAKE_CURRENT_BINARY_DIR}/heart_beat.sh + OUTPUT ${PROJECT_BINARY_DIR}/bin/heart_beat.sh + COMMAND cp ${CMAKE_CURRENT_SOURCE_DIR}/heart_beat.sh ${PROJECT_BINARY_DIR}/bin/heart_beat.sh DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/heart_beat.sh ) + +add_custom_target("heart_beat.sh" ALL DEPENDS ${PROJECT_BINARY_DIR}/bin/heart_beat.sh) + add_executable(heart_beat heart_beat.cpp ${CMAKE_CURRENT_BINARY_DIR}/heart_beat.sh) target_link_libraries(heart_beat PRIVATE shm_queue ${EXTRA_LIBS} ) diff --git a/test_socket/heart_beat.cpp b/test_socket/heart_beat.cpp index cd21a84..021ca3d 100644 --- a/test_socket/heart_beat.cpp +++ b/test_socket/heart_beat.cpp @@ -6,7 +6,7 @@ #include "usg_common.h" #include <getopt.h> - +static Logger *logger = LoggerFactory::getLogger(); typedef struct Targ { int port; int id; @@ -20,24 +20,45 @@ // exit(0); } + +void *serverSockt; +static void server_stop_handler(int sig) { + printf("stop_handler\n"); + + int rv = net_mod_socket_stop(serverSockt); + if(rv ==0) { + logger->debug("send stop suc"); + return; + } else { + logger->debug("send stop fail.%s\n", bus_strerror(rv)); + } +} + void server(int port) { - void *serv = net_mod_socket_open(); - net_mod_socket_bind(serv, port); + serverSockt = net_mod_socket_open(); + net_mod_socket_bind(serverSockt, port); int size; void *recvbuf; char sendbuf[512]; int rv; int remote_port; + + signal(SIGTERM, server_stop_handler); + signal(SIGINT, server_stop_handler); while (true) { - if(net_mod_socket_recvfrom_timeout(serv, &recvbuf, &size, &remote_port, 0, 2000000000)==0) { + rv = net_mod_socket_recvfrom_timeout(serverSockt, &recvbuf, &size, &remote_port, 0, 2000000000); + if(rv == 0 ) { printf( "RECEIVED HREARTBEAT FROM %d: %s\n", remote_port, recvbuf); - net_mod_socket_sendto(serv, "suc", strlen("suc")+1, remote_port); + net_mod_socket_sendto(serverSockt, "suc", strlen("suc")+1, remote_port); free(recvbuf); + } else if(rv == EBUS_STOPED) { + logger->debug("Stopping\n"); + break; } } // sleep(1000); - net_mod_socket_close(serv); + net_mod_socket_close(serverSockt); } void client(int port) { @@ -49,14 +70,42 @@ net_node_t node_arr[] = {"", 0, port}; int node_arr_size = 1; - int recv_arr_size; + int recv_arr_size, n; net_mod_recv_msg_t *recv_arr; + net_mod_err_t *errarr; + int errarr_size = 0; + + // int recv_arr_size; + // net_mod_recv_msg_t *recv_arr; while (true) { sprintf(sendbuf, "%d", i); - rv = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL, 2000); + rv = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf), + &recv_arr, &recv_arr_size, &errarr, &errarr_size, 1000); + // rv = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL, 2000); //rv = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL); printf("SEND HEART:%s, suc nodes = %d\n", sendbuf, rv); + if(recv_arr_size > 0) { + for(i=0; i<recv_arr_size; i++) { + printf("reply from (host:%s, port: %d, key:%d) >> %s\n", + recv_arr[i].host, + recv_arr[i].port, + recv_arr[i].key, + (char *)recv_arr[i].content + ); + } + + // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀 + net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size); + } + + + if(errarr_size > 0) { + for(i = 0; i < errarr_size; i++) { + printf("error: (host:%s, port: %d, key:%d) : %s\n", errarr[i].host, errarr[i].port, errarr[i].key, bus_strerror(errarr[i].code)); + } + free(errarr); + } // sleep(1); i++; } @@ -64,66 +113,66 @@ } -void *runclient(void *arg) { - // signal(SIGINT, sigint_handler); - Targ *targ = (Targ *)arg; - int port = targ->port; - void *client = net_mod_socket_open(); - int size; - char sendbuf[512]; - long scale = 100000; - long i = 0; - net_node_t node_arr[] = {"", 0, 100}; - int node_arr_size = 1; +// void *runclient(void *arg) { +// // signal(SIGINT, sigint_handler); +// Targ *targ = (Targ *)arg; +// int port = targ->port; +// void *client = net_mod_socket_open(); +// int size; +// char sendbuf[512]; +// long scale = 100000; +// long i = 0; +// net_node_t node_arr[] = {"", 0, 100}; +// int node_arr_size = 1; - int recv_arr_size; - net_mod_recv_msg_t *recv_arr; +// int recv_arr_size; +// net_mod_recv_msg_t *recv_arr; - while (i < scale) { - sprintf(sendbuf, "%d", i); - printf("%d SEND HEART:%s\n", targ->id, sendbuf); - net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL); - // net_mod_socket_sendto(socket, sendbuf, strlen(sendbuf) + 1, port); - i++; - } +// while (i < scale) { +// sprintf(sendbuf, "%d", i); +// printf("%d SEND HEART:%s\n", targ->id, sendbuf); +// net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL); +// // net_mod_socket_sendto(socket, sendbuf, strlen(sendbuf) + 1, port); +// i++; +// } - net_mod_socket_close(client); - return (void *)i; -} +// net_mod_socket_close(client); +// return (void *)i; +// } -void mclient(int port) { +// void mclient(int port) { - int status, i = 0, processors = 4; - void *res[processors]; - Targ *targs = (Targ *)calloc(processors, sizeof(Targ)); - pthread_t tids[processors]; - char sendbuf[512]; +// int status, i = 0, processors = 4; +// void *res[processors]; +// Targ *targs = (Targ *)calloc(processors, sizeof(Targ)); +// pthread_t tids[processors]; +// char sendbuf[512]; - struct timeval start; - gettimeofday(&start, NULL); - for (i = 0; i < processors; i++) { - targs[i].port = port; - targs[i].id = i; - pthread_create(&tids[i], NULL, runclient, (void *)&targs[i]); - } +// struct timeval start; +// gettimeofday(&start, NULL); +// for (i = 0; i < processors; i++) { +// targs[i].port = port; +// targs[i].id = i; +// pthread_create(&tids[i], NULL, runclient, (void *)&targs[i]); +// } - for (i = 0; i < processors; i++) { - if (pthread_join(tids[i], &res[i]) != 0) { - perror("multyThreadClient pthread_join"); - } else { - fprintf(stderr, "client(%d) 鍙戦�� %ld 鏉℃暟鎹甛n", i, (long)res[i]); - } - } +// for (i = 0; i < processors; i++) { +// if (pthread_join(tids[i], &res[i]) != 0) { +// perror("multyThreadClient pthread_join"); +// } else { +// fprintf(stderr, "client(%d) 鍙戦�� %ld 鏉℃暟鎹甛n", i, (long)res[i]); +// } +// } - struct timeval end; - gettimeofday(&end, NULL); +// struct timeval end; +// gettimeofday(&end, NULL); - double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec); - long diffsec = (long) (difftime/1000000); - long diffmsec = difftime - diffsec*1000000; - printf("cost: %ld sec: %ld msc\n", diffsec, diffmsec); -} +// double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec); +// long diffsec = (long) (difftime/1000000); +// long diffmsec = difftime - diffsec*1000000; +// printf("cost: %ld sec: %ld msc\n", diffsec, diffmsec); +// } int main(int argc, char *argv[]) { shm_mm_wrapper_init(512); @@ -139,8 +188,6 @@ server(port); else if (strcmp("client", argv[1]) == 0) client(port); - else if (strcmp("mclient", argv[1]) == 0) - mclient(port); shm_mm_wrapper_destroy(); return 0; diff --git a/test_socket/heart_beat.sh b/test_socket/heart_beat.sh index 92b0867..a284cec 100755 --- a/test_socket/heart_beat.sh +++ b/test_socket/heart_beat.sh @@ -50,6 +50,16 @@ close ;; + "test2") + start_server + sleep 1 + start_clients + sleep 5 + kill -15 server_pid + sleep 2 + close_clients + ;; + "") start_server sleep 1 -- Gitblit v1.8.0