From 607ac3ae8bfc017e10a7907e69dcbc3ab2a0fb63 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期五, 05 二月 2021 13:54:56 +0800 Subject: [PATCH] add stop method --- test/UDPServer.cpp | 70 ++++++++++ src/socket/bus_server_socket_wrapper.cpp | 4 test_net_socket/test_net_mod_socket.cpp | 39 +++++ src/socket/shm_socket.h | 6 src/bus_error.cpp | 3 test/UDPClient.cpp | 50 +++++++ src/net/net_mod_socket.cpp | 4 src/socket/shm_socket.cpp | 37 +++- src/socket/shm_mod_socket.h | 1 test_net_socket/test_bus_stop.cpp | 11 + src/socket/bus_server_socket.cpp | 46 +++--- src/socket/bus_server_socket_wrapper.h | 5 src/net/net_mod_socket_wrapper.cpp | 5 src/net/net_mod_socket_wrapper.h | 6 src/net/net_mod_socket.h | 2 src/psem.cpp | 42 +++--- test_net_socket/net_mod_socket.sh | 5 src/CMakeLists.txt | 3 src/bus_error.h | 1 src/socket/bus_server_socket.h | 3 src/socket/shm_mod_socket.cpp | 5 test/CMakeLists.txt | 15 ++ 22 files changed, 294 insertions(+), 69 deletions(-) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index a96920e..18b555d 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -29,7 +29,8 @@ ) -add_library(shm_queue SHARED ${_SOURCES_}) +# add_library(shm_queue SHARED ${_SOURCES_}) +add_library(shm_queue STATIC ${_SOURCES_}) target_include_directories(shm_queue PUBLIC ${EXTRA_INCLUDES} ) diff --git a/src/bus_error.cpp b/src/bus_error.cpp index 2c410eb..a9348e0 100644 --- a/src/bus_error.cpp +++ b/src/bus_error.cpp @@ -18,7 +18,8 @@ "Key already in use", "Network fault", "Send to self error", - "Receive from wrong end" + "Receive from wrong end", + "Service stoped" }; diff --git a/src/bus_error.h b/src/bus_error.h index 7677545..fa9f352 100644 --- a/src/bus_error.h +++ b/src/bus_error.h @@ -11,6 +11,7 @@ #define EBUS_NET 504 #define EBUS_SENDTO_SELF 505 #define EBUS_RECVFROM_WRONG_END 506 +#define EBUS_STOPED 507 extern int bus_errno; diff --git a/src/net/net_mod_socket.cpp b/src/net/net_mod_socket.cpp index 8c1465d..ef817f0 100644 --- a/src/net/net_mod_socket.cpp +++ b/src/net/net_mod_socket.cpp @@ -49,6 +49,10 @@ } +int NetModSocket::stop() { + return shmModSocket.stop(); +} + /** * 缁戝畾绔彛鍒皊ocket, 濡傛灉涓嶇粦瀹氬垯绯荤粺鑷姩鍒嗛厤涓�涓� * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 diff --git a/src/net/net_mod_socket.h b/src/net/net_mod_socket.h index 11e0ebe..9358311 100644 --- a/src/net/net_mod_socket.h +++ b/src/net/net_mod_socket.h @@ -98,7 +98,7 @@ NetModSocket(); ~NetModSocket(); - + int stop(); /** * 缁戝畾绔彛鍒皊ocket, 濡傛灉涓嶇粦瀹氬垯绯荤粺鑷姩鍒嗛厤涓�涓� * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 diff --git a/src/net/net_mod_socket_wrapper.cpp b/src/net/net_mod_socket_wrapper.cpp index e72297a..c074e15 100644 --- a/src/net/net_mod_socket_wrapper.cpp +++ b/src/net/net_mod_socket_wrapper.cpp @@ -20,6 +20,11 @@ delete sockt; } +int net_mod_socket_stop(void *_socket) { + NetModSocket *sockt = (NetModSocket *)_socket; + return sockt->stop(); +} + /** * 缁戝畾绔彛鍒皊ocket, 濡傛灉涓嶇粦瀹氬垯绯荤粺鑷姩鍒嗛厤涓�涓� * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 diff --git a/src/net/net_mod_socket_wrapper.h b/src/net/net_mod_socket_wrapper.h index 79cb7d7..e29fe5b 100644 --- a/src/net/net_mod_socket_wrapper.h +++ b/src/net/net_mod_socket_wrapper.h @@ -29,12 +29,16 @@ */ void * net_mod_socket_open(); + /** * @brief 鍏抽棴 net_mod_socket */ void net_mod_socket_close(void *_sockt); - +/** + * @brief 鍋滄 net_mod_socket + */ +int net_mod_socket_stop(void *_sockt); /** * @brief 缁戝畾绔彛鍒皊ocket, 濡傛灉涓嶇粦瀹氬垯绯荤粺鑷姩鍒嗛厤涓�涓� diff --git a/src/psem.cpp b/src/psem.cpp index fd06216..2ace11f 100644 --- a/src/psem.cpp +++ b/src/psem.cpp @@ -6,31 +6,31 @@ int psem_timedwait(sem_t *sem, const struct timespec *ts) { struct timespec abs_timeout = TimeUtil::calc_abs_time(ts); - - int rv ; - while ( (rv = sem_timedwait(sem, &abs_timeout)) == -1) { - if(errno == EINTR) - continue; - else { - // LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout"); - return -1; - } - } - return 0; + return sem_timedwait(sem, &abs_timeout); + // int rv ; + // while ( (rv = sem_timedwait(sem, &abs_timeout)) == -1) { + // if(errno == EINTR) + // continue; + // else { + // // LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout"); + // return -1; + // } + // } + // return 0; } int psem_wait(sem_t *sem) { - int rv; - while ( (rv = sem_wait(sem)) == -1) { - if(errno == EINTR) - continue; - else { - // LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout"); - return -1; - } - } - return 0; + return sem_wait(sem); + // int rv; + // while ( (rv = sem_wait(sem)) == -1) { + // if(errno == EINTR) + // continue; + // else { + // return -1; + // } + // } + // return 0; } int psem_trywait(sem_t *sem) { diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp index 3904297..b49eeb8 100644 --- a/src/socket/bus_server_socket.cpp +++ b/src/socket/bus_server_socket.cpp @@ -58,25 +58,7 @@ } BusServerSocket::~BusServerSocket() { - SHMKeySet *subscripter_set; - SHMTopicSubMap::iterator map_iter; - - stop(); - - if(topic_sub_map != NULL) { - for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { - subscripter_set = map_iter->second; - if(subscripter_set != NULL) { - subscripter_set->clear(); - mm_free((void *)subscripter_set); - } - - } - topic_sub_map->clear(); - mem_pool_free_by_key(SHM_BUS_MAP_KEY); - } - shm_close_socket(shm_socket); - logger->debug("BusServerSocket destory 3"); + destroy(); } @@ -111,8 +93,6 @@ if( shm_socket->key <= 0) { return -1; } - // snprintf(buf, 128, "%sstop%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, "", TOPIC_RIDENTIFIER); - // return shm_sendto(shm_socket, buf, strlen(buf), shm_socket->key, NULL, 0); bus_head_t head = {}; memcpy(head.action, "stop", sizeof(head.action)); head.topic_size = 0; @@ -122,13 +102,33 @@ void *buf; int size = ShmModSocket::get_bus_sendbuf(head, NULL, 0, NULL, 0, &buf); if(size > 0) { - ret = client.sendandrecv( buf, size, shm_socket->key, NULL, NULL); + ret = client.sendto( buf, size, shm_socket->key); free(buf); return ret; } else { return -1; } +} + +int BusServerSocket::destroy() { + SHMKeySet *subscripter_set; + SHMTopicSubMap::iterator map_iter; + if(topic_sub_map != NULL) { + for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { + subscripter_set = map_iter->second; + if(subscripter_set != NULL) { + subscripter_set->clear(); + mm_free((void *)subscripter_set); + } + + } + topic_sub_map->clear(); + mem_pool_free_by_key(SHM_BUS_MAP_KEY); + } + shm_close_socket(shm_socket); + logger->debug("BusServerSocket destory 3"); + return 0; } /* @@ -280,8 +280,6 @@ free(buf); } - - shm_sendto(shm_socket, "stop_finished", strlen( "stop_finished") +1, key); return NULL; } diff --git a/src/socket/bus_server_socket.h b/src/socket/bus_server_socket.h index 7a6c829..bdfb172 100644 --- a/src/socket/bus_server_socket.h +++ b/src/socket/bus_server_socket.h @@ -27,11 +27,12 @@ SHMTopicSubMap *topic_sub_map; private: + int destroy(); void _proxy_sub( char *topic, int key); void _proxy_pub( char *topic, void *buf, size_t size, int key); void *_run_proxy_(); // int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len ); - + void _proxy_desub( char *topic, int key); void _proxy_desub_all(int key); diff --git a/src/socket/bus_server_socket_wrapper.cpp b/src/socket/bus_server_socket_wrapper.cpp index a4f148c..d2f5a8e 100644 --- a/src/socket/bus_server_socket_wrapper.cpp +++ b/src/socket/bus_server_socket_wrapper.cpp @@ -22,6 +22,10 @@ logger->debug("===bus_server_socket_wrapper_close\n"); } +int bus_server_socket_wrapper_stop(void *_socket) { + BusServerSocket *sockt = (BusServerSocket *)_socket; + return sockt->stop(); +} /** * 鍚姩bus * diff --git a/src/socket/bus_server_socket_wrapper.h b/src/socket/bus_server_socket_wrapper.h index 2a7c59d..06a060e 100644 --- a/src/socket/bus_server_socket_wrapper.h +++ b/src/socket/bus_server_socket_wrapper.h @@ -29,6 +29,11 @@ void bus_server_socket_wrapper_close(void *_sockt); /** + * @brief 鍋滄 bus_server_socket + */ +int bus_server_socket_wrapper_stop(void *_socket); + +/** * @brief 鍚姩bus * * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 diff --git a/src/socket/shm_mod_socket.cpp b/src/socket/shm_mod_socket.cpp index f5bd0b7..dc071cb 100644 --- a/src/socket/shm_mod_socket.cpp +++ b/src/socket/shm_mod_socket.cpp @@ -26,6 +26,11 @@ shm_close_socket(shm_socket); } +int ShmModSocket::stop() { + return shm_socket_stop(shm_socket); +} + + int ShmModSocket::bind(int key) { return shm_socket_bind(shm_socket, key); } diff --git a/src/socket/shm_mod_socket.h b/src/socket/shm_mod_socket.h index fb389a3..44a714b 100644 --- a/src/socket/shm_mod_socket.h +++ b/src/socket/shm_mod_socket.h @@ -47,6 +47,7 @@ ShmModSocket(); ~ShmModSocket(); + int stop(); /** * 缁戝畾绔彛鍒皊ocket, 濡傛灉涓嶇粦瀹氬垯绯荤粺鑷姩鍒嗛厤涓�涓� * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp index 8fef2f7..fc410bf 100644 --- a/src/socket/shm_socket.cpp +++ b/src/socket/shm_socket.cpp @@ -112,23 +112,31 @@ int shm_close_socket(shm_socket_t *sockt) { - int s; - + int rv; logger->debug("shm_close_socket\n"); if(sockt->queue != NULL) { delete sockt->queue; sockt->queue = NULL; } - s = pthread_mutex_destroy(&(sockt->mutex) ); - if(s != 0) { - err_exit(s, "shm_close_socket"); + rv = pthread_mutex_destroy(&(sockt->mutex) ); + if(rv != 0) { + err_exit(rv, "shm_close_socket"); } free(sockt); return 0; } + +int shm_socket_stop(shm_socket_t *sockt) { + struct timespec timeout = {5, 0}; + shm_packet_t sendpak = {0}; + sendpak.key = sockt->key; + sendpak.action = BUS_ACTION_STOP; + sendpak.size = 0; + return shm_sendpakto(sockt, &sendpak, sockt->key, &timeout, BUS_TIMEOUT_FLAG); +} int shm_socket_bind(shm_socket_t *sockt, int key) { sockt->key = key; @@ -175,6 +183,7 @@ shm_packet_t sendpak; shm_packet_t recvpak; std::map<std::string, shm_packet_t>::iterator recvbufIter; + std::string uuid = sole::uuid4().str(); sendpak.key = sockt->key; @@ -507,7 +516,7 @@ LABEL_PUSH: - if (key == sockt->key) { + if (sendpak->action != BUS_ACTION_STOP && key == sockt->key) { logger->error( "can not send to your self!"); return EBUS_SENDTO_SELF; } @@ -527,10 +536,11 @@ } // 鐭繛鎺ユ柟寮忔帴鍙� -static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *recvpak , const struct timespec *timeout, int flag) { +static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *_recvpak , const struct timespec *timeout, int flag) { int rv; hashtable_t *hashtable = mm_get_hashtable(); + shm_packet_t recvpak; if( sockt->queue != NULL) goto LABEL_POP; @@ -557,11 +567,18 @@ LABEL_POP: + // + // printf("%p start recv.....\n", sockt); - printf("%p start recv.....\n", sockt); - - rv = sockt->queue->pop(*recvpak, timeout, flag); + rv = sockt->queue->pop(recvpak, timeout, flag); + if(rv != 0) + return rv; + + if(recvpak.action == BUS_ACTION_STOP) { + return EBUS_STOPED; + } + *_recvpak = recvpak; return rv; } // int shm_sendandrecv(shm_socket_t *sockt, const void *send_buf, diff --git a/src/socket/shm_socket.h b/src/socket/shm_socket.h index 03ee831..e7c0dda 100644 --- a/src/socket/shm_socket.h +++ b/src/socket/shm_socket.h @@ -14,13 +14,15 @@ }; - + +#define BUS_ACTION_STOP 1 typedef struct shm_packet_t { int key; size_t size; void * buf; char uuid[64]; + int action; } shm_packet_t; @@ -50,6 +52,8 @@ int shm_close_socket(shm_socket_t * socket) ; +int shm_socket_stop(shm_socket_t *sockt); + int shm_socket_bind(shm_socket_t * socket, int key) ; diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 0b001cc..31bfd83 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -49,4 +49,19 @@ +add_executable(UDPServer UDPServer.cpp ) +target_link_libraries(UDPServer PRIVATE ${EXTRA_LIBS} ) +target_include_directories(UDPServer PRIVATE + "${PROJECT_BINARY_DIR}" + ${EXTRA_INCLUDES} + ) + + +add_executable(UDPClient UDPClient.cpp ) +target_link_libraries(UDPClient PRIVATE ${EXTRA_LIBS} ) +target_include_directories(UDPClient PRIVATE + "${PROJECT_BINARY_DIR}" + ${EXTRA_INCLUDES} + ) + diff --git a/test/UDPClient.cpp b/test/UDPClient.cpp new file mode 100644 index 0000000..d54753b --- /dev/null +++ b/test/UDPClient.cpp @@ -0,0 +1,50 @@ +// Client side implementation of UDP client-server model +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <string.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <arpa/inet.h> +#include <netinet/in.h> + +#define PORT 8080 +#define MAXLINE 1024 + +// Driver code +int main() { + int sockfd; + char buffer[MAXLINE]; + char *hello = "Hello from client"; + struct sockaddr_in servaddr; + + // Creating socket file descriptor + if ( (sockfd = socket(AF_INET, SOCK_DGRAM, 0)) < 0 ) { + perror("socket creation failed"); + exit(EXIT_FAILURE); + } + + memset(&servaddr, 0, sizeof(servaddr)); + + // Filling server information + servaddr.sin_family = AF_INET; + servaddr.sin_port = htons(PORT); + servaddr.sin_addr.s_addr = INADDR_ANY; + + int n; + socklen_t len; + + sendto(sockfd, (const char *)hello, strlen(hello), + MSG_CONFIRM, (const struct sockaddr *) &servaddr, + sizeof(servaddr)); + printf("Hello message sent.\n"); + + n = recvfrom(sockfd, (char *)buffer, MAXLINE, + MSG_WAITALL, (struct sockaddr *) &servaddr, + &len); + buffer[n] = '\0'; + printf("Server : %s\n", buffer); + + close(sockfd); + return 0; +} \ No newline at end of file diff --git a/test/UDPServer.cpp b/test/UDPServer.cpp new file mode 100644 index 0000000..4f998d7 --- /dev/null +++ b/test/UDPServer.cpp @@ -0,0 +1,70 @@ +// Server side implementation of UDP client-server model +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <string.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <arpa/inet.h> +#include <netinet/in.h> +#include <signal.h> + +#define PORT 8080 +#define MAXLINE 1024 + +bool stop = false; + +static void stop_handler(int sig) { + printf("stop_handler\n"); + + stop = true; +} +// Driver code +int main() { + int sockfd; + char buffer[MAXLINE]; + char *hello = "Hello from server"; + struct sockaddr_in servaddr, cliaddr; + signal(SIGINT, stop_handler); + // Creating socket file descriptor + if ( (sockfd = socket(AF_INET, SOCK_DGRAM, 0)) < 0 ) { + perror("socket creation failed"); + exit(EXIT_FAILURE); + } + + memset(&servaddr, 0, sizeof(servaddr)); + memset(&cliaddr, 0, sizeof(cliaddr)); + + // Filling server information + servaddr.sin_family = AF_INET; // IPv4 + servaddr.sin_addr.s_addr = INADDR_ANY; + servaddr.sin_port = htons(PORT); + + // Bind the socket with the server address + if ( bind(sockfd, (const struct sockaddr *)&servaddr, + sizeof(servaddr)) < 0 ) + { + perror("bind failed"); + exit(EXIT_FAILURE); + } + + int n; + + socklen_t len = sizeof(cliaddr); //len is value/resuslt + while(!stop) { + n = recvfrom(sockfd, (char *)buffer, MAXLINE, + MSG_WAITALL, ( struct sockaddr *) &cliaddr, + &len); + buffer[n] = '\0'; + printf("Client : %s\n", buffer); + sendto(sockfd, (const char *)hello, strlen(hello), + MSG_CONFIRM, (const struct sockaddr *) &cliaddr, + len); + printf("Hello message sent.\n"); + } + + printf("===stopted.\n"); + + + return 0; +} diff --git a/test_net_socket/net_mod_socket.sh b/test_net_socket/net_mod_socket.sh index 52eb54e..288f192 100755 --- a/test_net_socket/net_mod_socket.sh +++ b/test_net_socket/net_mod_socket.sh @@ -53,6 +53,11 @@ } +function stop() { + ps -ef | grep -e "test_net_mod_socket" -e "heart_beat"| awk '{print $2}' | xargs -i kill -15 {} + +} + function close() { ps -ef | grep -e "test_net_mod_socket" -e "heart_beat"| awk '{print $2}' | xargs -i kill -9 {} ipcrm -a diff --git a/test_net_socket/test_bus_stop.cpp b/test_net_socket/test_bus_stop.cpp index 3185029..d40f0d7 100644 --- a/test_net_socket/test_bus_stop.cpp +++ b/test_net_socket/test_bus_stop.cpp @@ -8,8 +8,8 @@ static void * server_sockt; -static void sigint_handler(int sig) { - bus_server_socket_wrapper_close(server_sockt); +static void stop_bus_handler(int sig) { + bus_server_socket_wrapper_stop(server_sockt); } static void *_start_bus_(void *arg) { @@ -20,14 +20,17 @@ if(bus_server_socket_wrapper_start_bus(server_sockt) != 0) { printf("start bus failed\n"); } - printf("============_start_bus_ end\n" ); + + bus_server_socket_wrapper_close(server_sockt); + printf("============bus stopted\n" ); } int main() { pthread_t tid; char action[512]; - signal(SIGINT, sigint_handler); + signal(SIGINT, stop_bus_handler); + signal(SIGTERM, stop_bus_handler); shm_mm_wrapper_init(512); server_sockt = bus_server_socket_wrapper_open(); pthread_create(&tid, NULL, _start_bus_, NULL); diff --git a/test_net_socket/test_net_mod_socket.cpp b/test_net_socket/test_net_mod_socket.cpp index 2ebbd06..49b6251 100644 --- a/test_net_socket/test_net_mod_socket.cpp +++ b/test_net_socket/test_net_mod_socket.cpp @@ -10,6 +10,8 @@ #define SCALE 100000 +static Logger *logger = LoggerFactory::getLogger(); + typedef struct Targ { net_node_t *node; char *nodelist; @@ -133,6 +135,7 @@ void *serverSockt; + static void _recvandsend_callback_(void *recvbuf, int recvsize, int key, void **sendbuf_ptr, int *sendsize_ptr, void * user_data) { char sendbuf[512]; printf( "server: RECEIVED REQUEST FROM %d : %s\n", key, (char *)recvbuf); @@ -145,16 +148,44 @@ return; } +bool stop = false; + +static void stop_replyserver_handler(int sig) { + printf("stop_handler\n"); + + int rv = net_mod_socket_stop(serverSockt); + if(rv ==0) { + logger->debug("send stop suc"); + return; + } else { + logger->debug("send stop fail.%s\n", bus_strerror(rv)); + } +} void start_reply(int mkey) { - printf("start reply\n"); + logger->debug("start reply\n"); + signal(SIGINT, stop_replyserver_handler); + signal(SIGTERM, stop_replyserver_handler); + serverSockt = net_mod_socket_open(); net_mod_socket_bind(serverSockt, mkey); - int rv; - while(true) { - rv = net_mod_socket_recvandsend_timeout(serverSockt, _recvandsend_callback_ , 0, 2000000, NULL ); + int rv = 0 ; + while( true) { + rv = net_mod_socket_recvandsend(serverSockt, _recvandsend_callback_ , NULL ); + if (rv == 0) + continue; + if(rv == EBUS_STOPED) { + logger->debug("Stopping\n"); + break; + } + logger->debug("net_mod_socket_recvandsend error.%s\n", bus_strerror(rv)); + } + + //rv = net_mod_socket_recvandsend_timeout(serverSockt, _recvandsend_callback_ , 0, 2000000, NULL ); + net_mod_socket_close(serverSockt); + logger->debug("stopted\n"); // while ( (rv = net_mod_socket_recvfrom(ser, &recvbuf, &size, &key) ) == 0) { // // printf( "server: RECEIVED REQUEST FROM %d NAME %s\n", key, recvbuf); // sprintf(sendbuf, "%d RECEIVED %s", net_mod_socket_get_key(ser), (char *)recvbuf); -- Gitblit v1.8.0