CMakeLists.txt | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/queue/lock_free_queue.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/shm/mm.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/socket/shm_socket.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
test_net_socket/net_mod_socket.sh | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
test_socket/CMakeLists.txt | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
test_socket/heart_beat.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
test_socket/heart_beat.sh | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
CMakeLists.txt
@@ -29,6 +29,6 @@ add_subdirectory(${PROJECT_SOURCE_DIR}/src) add_subdirectory(${PROJECT_SOURCE_DIR}/test) add_subdirectory(${PROJECT_SOURCE_DIR}/test_net_socket) # add_subdirectory(${PROJECT_SOURCE_DIR}/test_socket) add_subdirectory(${PROJECT_SOURCE_DIR}/test_socket) # add_subdirectory(${PROJECT_SOURCE_DIR}/shm_util) endif() src/queue/lock_free_queue.h
@@ -88,9 +88,9 @@ sem_t slots; sem_t items; // time_t createTime; // time_t closeTime; // int status; time_t createTime; time_t closeTime; int status; public: @@ -101,7 +101,8 @@ /// template ~LockFreeQueue(); // inline void close(); inline void close(); inline bool isClosed(); // std::atomic_uint reference; /// @brief constructor of the class @@ -129,17 +130,17 @@ // time_t getCreateTime() { // return createTime; // } time_t getCreateTime() { return createTime; } // time_t getCloseTime() { // return closeTime; // } time_t getCloseTime() { return closeTime; } // int getStatus() { // return status; // } int getStatus() { return status; } /// @brief push an element at the tail of the queue /// @param the element to insert in the queue @@ -182,20 +183,28 @@ if (sem_init(&items, 1, 0) == -1) err_exit(errno, "LockFreeQueue sem_init"); // createTime = time(NULL); // status = LOCK_FREE_Q_ST_OPENED; createTime = time(NULL); status = LOCK_FREE_Q_ST_OPENED; } // template< // typename ELEM_T, // typename Allocator, // template<typename T, typename AT> class Q_TYPE> // inline void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::close() { // // status = LOCK_FREE_Q_ST_CLOSED; // // closeTime = time(NULL); // } template< typename ELEM_T, typename Allocator, template<typename T, typename AT> class Q_TYPE> inline void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::close() { status = LOCK_FREE_Q_ST_CLOSED; closeTime = time(NULL); } template< typename ELEM_T, typename Allocator, template<typename T, typename AT> class Q_TYPE> inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::isClosed() { return status == LOCK_FREE_Q_ST_CLOSED; } template< src/shm/mm.cpp
@@ -123,8 +123,11 @@ return aptr; } else { SemUtil::inc(mutex); err_msg(0, "mm_malloc : out of memery\n"); LoggerFactory::getLogger()->fatal("mm_malloc : out of memery\n"); // abort(); err_exit(0, "mm_malloc : out of memery\n"); exit(1); return NULL; } src/socket/shm_socket.cpp
@@ -10,7 +10,7 @@ static Logger *logger = LoggerFactory::getLogger(); ShmQueueStMap * shmQueueStMap ; // ShmQueueStMap * shmQueueStMap ; static void print_msg(char *head, shm_packet_t &msg) { // err_msg(0, "%s: key=%d, type=%d\n", head, msg.key, msg.type); @@ -104,7 +104,7 @@ err_exit(s, "pthread_mutexattr_destroy"); shmQueueStMap = shm_mm_attach<ShmQueueStMap>(SHM_QUEUE_ST_KEY); // shmQueueStMap = shm_mm_attach<ShmQueueStMap>(SHM_QUEUE_ST_KEY); return sockt; } @@ -113,37 +113,33 @@ static int _shm_socket_close_(shm_socket_t *sockt) { int rv, i; hashtable_t *hashtable = mm_get_hashtable(); logger->debug("shm_socket_close\n"); if(sockt->key != 0) { auto it = shmQueueStMap->find(sockt->key); if(it != shmQueueStMap->end()) { it->second.status = SHM_QUEUE_ST_CLOSED; it->second.closeTime = time(NULL); } } // if(sockt->key != 0) { // auto it = shmQueueStMap->find(sockt->key); // if(it != shmQueueStMap->end()) { // it->second.status = SHM_QUEUE_ST_CLOSED; // it->second.closeTime = time(NULL); // } // } printf("====sockt->queue addr = %p\n", sockt->queue); // printf("====sockt->queue addr = %p\n", sockt->queue); if(sockt->queue != NULL) { sockt->queue->close(); for( i = 0; i < sockt->queue->size(); i++) { mm_free((*(sockt->queue))[i].buf); logger->info("======= %d free queue element buf\n", sockt->key); } sleep(1); // hashtable_remove(hashtable, mkey); hashtable_remove(hashtable, sockt->key); // sockt->queue = NULL; } // hashtable_remove(hashtable, mkey); // if(sockt->queue != NULL) { // sockt->queue = NULL; // } pthread_mutex_destroy(&(sockt->mutex) ); free(sockt); return 0; @@ -578,9 +574,9 @@ } // 标记key对应的状态 ,为opened stRecord.status = SHM_QUEUE_ST_OPENED; stRecord.createTime = time(NULL); shmQueueStMap->insert({sockt->key, stRecord}); // stRecord.status = SHM_QUEUE_ST_OPENED; // stRecord.createTime = time(NULL); // shmQueueStMap->insert({sockt->key, stRecord}); } @@ -597,17 +593,19 @@ } // 检查key标记的状态 auto it = shmQueueStMap->find(key); if(it != shmQueueStMap->end()) { if(it->second.status == SHM_QUEUE_ST_CLOSED) { // key对应的状态是关闭的 goto ERR_CLOSED; } } // auto it = shmQueueStMap->find(key); // if(it != shmQueueStMap->end()) { // if(it->second.status == SHM_QUEUE_ST_CLOSED) { // // key对应的状态是关闭的 // goto ERR_CLOSED; // } // } remoteQueue = shm_socket_attach_queue(key); if (remoteQueue == NULL ) { goto ERR_CLOSED; } else if(remoteQueue->isClosed()) { goto ERR_CLOSED; } @@ -659,9 +657,9 @@ } // 标记key对应的状态 ,为opened stRecord.status = SHM_QUEUE_ST_OPENED; stRecord.createTime = time(NULL); shmQueueStMap->insert({sockt->key, stRecord}); // stRecord.status = SHM_QUEUE_ST_OPENED; // stRecord.createTime = time(NULL); // shmQueueStMap->insert({sockt->key, stRecord}); if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) err_exit(rv, "shm_recvfrom : pthread_mutex_unlock"); test_net_socket/net_mod_socket.sh
@@ -11,7 +11,7 @@ ./shm_util recvfrom --bind=102 & server_pid=$! && echo "pid: ${server_pid}" # 打开回队列收进程 ./shm_util start_resycle & server_pid=$! && echo "pid: ${server_pid}" # ./shm_util start_resycle & server_pid=$! && echo "pid: ${server_pid}" } # 交互式客户端 test_socket/CMakeLists.txt
@@ -9,10 +9,13 @@ add_custom_command( OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/heart_beat.sh COMMAND cp ${CMAKE_CURRENT_SOURCE_DIR}/heart_beat.sh ${CMAKE_CURRENT_BINARY_DIR}/heart_beat.sh OUTPUT ${PROJECT_BINARY_DIR}/bin/heart_beat.sh COMMAND cp ${CMAKE_CURRENT_SOURCE_DIR}/heart_beat.sh ${PROJECT_BINARY_DIR}/bin/heart_beat.sh DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/heart_beat.sh ) add_custom_target("heart_beat.sh" ALL DEPENDS ${PROJECT_BINARY_DIR}/bin/heart_beat.sh) add_executable(heart_beat heart_beat.cpp ${CMAKE_CURRENT_BINARY_DIR}/heart_beat.sh) target_link_libraries(heart_beat PRIVATE shm_queue ${EXTRA_LIBS} ) test_socket/heart_beat.cpp
@@ -6,7 +6,7 @@ #include "usg_common.h" #include <getopt.h> static Logger *logger = LoggerFactory::getLogger(); typedef struct Targ { int port; int id; @@ -20,24 +20,45 @@ // exit(0); } void *serverSockt; static void server_stop_handler(int sig) { printf("stop_handler\n"); int rv = net_mod_socket_stop(serverSockt); if(rv ==0) { logger->debug("send stop suc"); return; } else { logger->debug("send stop fail.%s\n", bus_strerror(rv)); } } void server(int port) { void *serv = net_mod_socket_open(); net_mod_socket_bind(serv, port); serverSockt = net_mod_socket_open(); net_mod_socket_bind(serverSockt, port); int size; void *recvbuf; char sendbuf[512]; int rv; int remote_port; signal(SIGTERM, server_stop_handler); signal(SIGINT, server_stop_handler); while (true) { if(net_mod_socket_recvfrom_timeout(serv, &recvbuf, &size, &remote_port, 0, 2000000000)==0) { rv = net_mod_socket_recvfrom_timeout(serverSockt, &recvbuf, &size, &remote_port, 0, 2000000000); if(rv == 0 ) { printf( "RECEIVED HREARTBEAT FROM %d: %s\n", remote_port, recvbuf); net_mod_socket_sendto(serv, "suc", strlen("suc")+1, remote_port); net_mod_socket_sendto(serverSockt, "suc", strlen("suc")+1, remote_port); free(recvbuf); } else if(rv == EBUS_STOPED) { logger->debug("Stopping\n"); break; } } // sleep(1000); net_mod_socket_close(serv); net_mod_socket_close(serverSockt); } void client(int port) { @@ -49,14 +70,42 @@ net_node_t node_arr[] = {"", 0, port}; int node_arr_size = 1; int recv_arr_size; int recv_arr_size, n; net_mod_recv_msg_t *recv_arr; net_mod_err_t *errarr; int errarr_size = 0; // int recv_arr_size; // net_mod_recv_msg_t *recv_arr; while (true) { sprintf(sendbuf, "%d", i); rv = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL, 2000); rv = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf), &recv_arr, &recv_arr_size, &errarr, &errarr_size, 1000); // rv = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL, 2000); //rv = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL); printf("SEND HEART:%s, suc nodes = %d\n", sendbuf, rv); if(recv_arr_size > 0) { for(i=0; i<recv_arr_size; i++) { printf("reply from (host:%s, port: %d, key:%d) >> %s\n", recv_arr[i].host, recv_arr[i].port, recv_arr[i].key, (char *)recv_arr[i].content ); } // 使用完后,不要忘记释放掉 net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size); } if(errarr_size > 0) { for(i = 0; i < errarr_size; i++) { printf("error: (host:%s, port: %d, key:%d) : %s\n", errarr[i].host, errarr[i].port, errarr[i].key, bus_strerror(errarr[i].code)); } free(errarr); } // sleep(1); i++; } @@ -64,66 +113,66 @@ } void *runclient(void *arg) { // signal(SIGINT, sigint_handler); Targ *targ = (Targ *)arg; int port = targ->port; void *client = net_mod_socket_open(); int size; char sendbuf[512]; long scale = 100000; long i = 0; net_node_t node_arr[] = {"", 0, 100}; int node_arr_size = 1; // void *runclient(void *arg) { // // signal(SIGINT, sigint_handler); // Targ *targ = (Targ *)arg; // int port = targ->port; // void *client = net_mod_socket_open(); // int size; // char sendbuf[512]; // long scale = 100000; // long i = 0; // net_node_t node_arr[] = {"", 0, 100}; // int node_arr_size = 1; int recv_arr_size; net_mod_recv_msg_t *recv_arr; // int recv_arr_size; // net_mod_recv_msg_t *recv_arr; while (i < scale) { sprintf(sendbuf, "%d", i); printf("%d SEND HEART:%s\n", targ->id, sendbuf); net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL); // net_mod_socket_sendto(socket, sendbuf, strlen(sendbuf) + 1, port); i++; } // while (i < scale) { // sprintf(sendbuf, "%d", i); // printf("%d SEND HEART:%s\n", targ->id, sendbuf); // net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL); // // net_mod_socket_sendto(socket, sendbuf, strlen(sendbuf) + 1, port); // i++; // } net_mod_socket_close(client); return (void *)i; } // net_mod_socket_close(client); // return (void *)i; // } void mclient(int port) { // void mclient(int port) { int status, i = 0, processors = 4; void *res[processors]; Targ *targs = (Targ *)calloc(processors, sizeof(Targ)); pthread_t tids[processors]; char sendbuf[512]; // int status, i = 0, processors = 4; // void *res[processors]; // Targ *targs = (Targ *)calloc(processors, sizeof(Targ)); // pthread_t tids[processors]; // char sendbuf[512]; struct timeval start; gettimeofday(&start, NULL); for (i = 0; i < processors; i++) { targs[i].port = port; targs[i].id = i; pthread_create(&tids[i], NULL, runclient, (void *)&targs[i]); } // struct timeval start; // gettimeofday(&start, NULL); // for (i = 0; i < processors; i++) { // targs[i].port = port; // targs[i].id = i; // pthread_create(&tids[i], NULL, runclient, (void *)&targs[i]); // } for (i = 0; i < processors; i++) { if (pthread_join(tids[i], &res[i]) != 0) { perror("multyThreadClient pthread_join"); } else { fprintf(stderr, "client(%d) 发送 %ld 条数据\n", i, (long)res[i]); } } // for (i = 0; i < processors; i++) { // if (pthread_join(tids[i], &res[i]) != 0) { // perror("multyThreadClient pthread_join"); // } else { // fprintf(stderr, "client(%d) 发送 %ld 条数据\n", i, (long)res[i]); // } // } struct timeval end; gettimeofday(&end, NULL); // struct timeval end; // gettimeofday(&end, NULL); double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec); long diffsec = (long) (difftime/1000000); long diffmsec = difftime - diffsec*1000000; printf("cost: %ld sec: %ld msc\n", diffsec, diffmsec); } // double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec); // long diffsec = (long) (difftime/1000000); // long diffmsec = difftime - diffsec*1000000; // printf("cost: %ld sec: %ld msc\n", diffsec, diffmsec); // } int main(int argc, char *argv[]) { shm_mm_wrapper_init(512); @@ -139,8 +188,6 @@ server(port); else if (strcmp("client", argv[1]) == 0) client(port); else if (strcmp("mclient", argv[1]) == 0) mclient(port); shm_mm_wrapper_destroy(); return 0; test_socket/heart_beat.sh
@@ -50,6 +50,16 @@ close ;; "test2") start_server sleep 1 start_clients sleep 5 kill -15 server_pid sleep 2 close_clients ;; "") start_server sleep 1