| | |
| | | ) |
| | | |
| | | |
| | | add_library(shm_queue SHARED ${_SOURCES_}) |
| | | # add_library(shm_queue SHARED ${_SOURCES_}) |
| | | add_library(shm_queue STATIC ${_SOURCES_}) |
| | | |
| | | target_include_directories(shm_queue PUBLIC ${EXTRA_INCLUDES} ) |
| | | |
| | |
| | | "Key already in use", |
| | | "Network fault", |
| | | "Send to self error", |
| | | "Receive from wrong end" |
| | | "Receive from wrong end", |
| | | "Service stoped" |
| | | |
| | | }; |
| | | |
| | |
| | | #define EBUS_NET 504 |
| | | #define EBUS_SENDTO_SELF 505 |
| | | #define EBUS_RECVFROM_WRONG_END 506 |
| | | #define EBUS_STOPED 507 |
| | | |
| | | extern int bus_errno; |
| | | |
| | |
| | | } |
| | | |
| | | |
| | | int NetModSocket::stop() { |
| | | return shmModSocket.stop(); |
| | | } |
| | | |
| | | /** |
| | | * 绑定端口到socket, 如果不绑定则系统自动分配一个 |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | |
| | | NetModSocket(); |
| | | ~NetModSocket(); |
| | | |
| | | |
| | | int stop(); |
| | | /** |
| | | * 绑定端口到socket, 如果不绑定则系统自动分配一个 |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | |
| | | delete sockt; |
| | | } |
| | | |
| | | int net_mod_socket_stop(void *_socket) { |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | return sockt->stop(); |
| | | } |
| | | |
| | | /** |
| | | * 绑定端口到socket, 如果不绑定则系统自动分配一个 |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | |
| | | */ |
| | | void * net_mod_socket_open(); |
| | | |
| | | |
| | | /** |
| | | * @brief 关闭 net_mod_socket |
| | | */ |
| | | void net_mod_socket_close(void *_sockt); |
| | | |
| | | |
| | | /** |
| | | * @brief 停止 net_mod_socket |
| | | */ |
| | | int net_mod_socket_stop(void *_sockt); |
| | | |
| | | /** |
| | | * @brief 绑定端口到socket, 如果不绑定则系统自动分配一个 |
| | |
| | | |
| | | int psem_timedwait(sem_t *sem, const struct timespec *ts) { |
| | | struct timespec abs_timeout = TimeUtil::calc_abs_time(ts); |
| | | |
| | | int rv ; |
| | | while ( (rv = sem_timedwait(sem, &abs_timeout)) == -1) { |
| | | if(errno == EINTR) |
| | | continue; |
| | | else { |
| | | // LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout"); |
| | | return -1; |
| | | } |
| | | } |
| | | return 0; |
| | | return sem_timedwait(sem, &abs_timeout); |
| | | // int rv ; |
| | | // while ( (rv = sem_timedwait(sem, &abs_timeout)) == -1) { |
| | | // if(errno == EINTR) |
| | | // continue; |
| | | // else { |
| | | // // LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout"); |
| | | // return -1; |
| | | // } |
| | | // } |
| | | // return 0; |
| | | } |
| | | |
| | | |
| | | int psem_wait(sem_t *sem) { |
| | | int rv; |
| | | while ( (rv = sem_wait(sem)) == -1) { |
| | | if(errno == EINTR) |
| | | continue; |
| | | else { |
| | | // LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout"); |
| | | return -1; |
| | | } |
| | | } |
| | | return 0; |
| | | return sem_wait(sem); |
| | | // int rv; |
| | | // while ( (rv = sem_wait(sem)) == -1) { |
| | | // if(errno == EINTR) |
| | | // continue; |
| | | // else { |
| | | // return -1; |
| | | // } |
| | | // } |
| | | // return 0; |
| | | } |
| | | |
| | | int psem_trywait(sem_t *sem) { |
| | |
| | | } |
| | | |
| | | BusServerSocket::~BusServerSocket() { |
| | | SHMKeySet *subscripter_set; |
| | | SHMTopicSubMap::iterator map_iter; |
| | | |
| | | stop(); |
| | | |
| | | if(topic_sub_map != NULL) { |
| | | for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { |
| | | subscripter_set = map_iter->second; |
| | | if(subscripter_set != NULL) { |
| | | subscripter_set->clear(); |
| | | mm_free((void *)subscripter_set); |
| | | } |
| | | |
| | | } |
| | | topic_sub_map->clear(); |
| | | mem_pool_free_by_key(SHM_BUS_MAP_KEY); |
| | | } |
| | | shm_close_socket(shm_socket); |
| | | logger->debug("BusServerSocket destory 3"); |
| | | destroy(); |
| | | } |
| | | |
| | | |
| | |
| | | if( shm_socket->key <= 0) { |
| | | return -1; |
| | | } |
| | | // snprintf(buf, 128, "%sstop%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, "", TOPIC_RIDENTIFIER); |
| | | // return shm_sendto(shm_socket, buf, strlen(buf), shm_socket->key, NULL, 0); |
| | | bus_head_t head = {}; |
| | | memcpy(head.action, "stop", sizeof(head.action)); |
| | | head.topic_size = 0; |
| | |
| | | void *buf; |
| | | int size = ShmModSocket::get_bus_sendbuf(head, NULL, 0, NULL, 0, &buf); |
| | | if(size > 0) { |
| | | ret = client.sendandrecv( buf, size, shm_socket->key, NULL, NULL); |
| | | ret = client.sendto( buf, size, shm_socket->key); |
| | | free(buf); |
| | | return ret; |
| | | } else { |
| | | return -1; |
| | | } |
| | | |
| | | } |
| | | |
| | | int BusServerSocket::destroy() { |
| | | SHMKeySet *subscripter_set; |
| | | SHMTopicSubMap::iterator map_iter; |
| | | if(topic_sub_map != NULL) { |
| | | for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { |
| | | subscripter_set = map_iter->second; |
| | | if(subscripter_set != NULL) { |
| | | subscripter_set->clear(); |
| | | mm_free((void *)subscripter_set); |
| | | } |
| | | |
| | | } |
| | | topic_sub_map->clear(); |
| | | mem_pool_free_by_key(SHM_BUS_MAP_KEY); |
| | | } |
| | | shm_close_socket(shm_socket); |
| | | logger->debug("BusServerSocket destory 3"); |
| | | return 0; |
| | | } |
| | | |
| | | /* |
| | |
| | | free(buf); |
| | | } |
| | | |
| | | |
| | | shm_sendto(shm_socket, "stop_finished", strlen( "stop_finished") +1, key); |
| | | |
| | | return NULL; |
| | | } |
| | |
| | | SHMTopicSubMap *topic_sub_map; |
| | | |
| | | private: |
| | | int destroy(); |
| | | void _proxy_sub( char *topic, int key); |
| | | void _proxy_pub( char *topic, void *buf, size_t size, int key); |
| | | void *_run_proxy_(); |
| | | // int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len ); |
| | | |
| | | |
| | | void _proxy_desub( char *topic, int key); |
| | | void _proxy_desub_all(int key); |
| | | |
| | |
| | | logger->debug("===bus_server_socket_wrapper_close\n"); |
| | | } |
| | | |
| | | int bus_server_socket_wrapper_stop(void *_socket) { |
| | | BusServerSocket *sockt = (BusServerSocket *)_socket; |
| | | return sockt->stop(); |
| | | } |
| | | /** |
| | | * 启动bus |
| | | * |
| | |
| | | void bus_server_socket_wrapper_close(void *_sockt); |
| | | |
| | | /** |
| | | * @brief 停止 bus_server_socket |
| | | */ |
| | | int bus_server_socket_wrapper_stop(void *_socket); |
| | | |
| | | /** |
| | | * @brief 启动bus |
| | | * |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | |
| | | shm_close_socket(shm_socket); |
| | | } |
| | | |
| | | int ShmModSocket::stop() { |
| | | return shm_socket_stop(shm_socket); |
| | | } |
| | | |
| | | |
| | | int ShmModSocket::bind(int key) { |
| | | return shm_socket_bind(shm_socket, key); |
| | | } |
| | |
| | | ShmModSocket(); |
| | | ~ShmModSocket(); |
| | | |
| | | int stop(); |
| | | /** |
| | | * 绑定端口到socket, 如果不绑定则系统自动分配一个 |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | |
| | | |
| | | int shm_close_socket(shm_socket_t *sockt) { |
| | | |
| | | int s; |
| | | |
| | | int rv; |
| | | logger->debug("shm_close_socket\n"); |
| | | if(sockt->queue != NULL) { |
| | | delete sockt->queue; |
| | | sockt->queue = NULL; |
| | | } |
| | | |
| | | s = pthread_mutex_destroy(&(sockt->mutex) ); |
| | | if(s != 0) { |
| | | err_exit(s, "shm_close_socket"); |
| | | rv = pthread_mutex_destroy(&(sockt->mutex) ); |
| | | if(rv != 0) { |
| | | err_exit(rv, "shm_close_socket"); |
| | | } |
| | | |
| | | free(sockt); |
| | | return 0; |
| | | } |
| | | |
| | | |
| | | int shm_socket_stop(shm_socket_t *sockt) { |
| | | struct timespec timeout = {5, 0}; |
| | | shm_packet_t sendpak = {0}; |
| | | sendpak.key = sockt->key; |
| | | sendpak.action = BUS_ACTION_STOP; |
| | | sendpak.size = 0; |
| | | return shm_sendpakto(sockt, &sendpak, sockt->key, &timeout, BUS_TIMEOUT_FLAG); |
| | | } |
| | | |
| | | int shm_socket_bind(shm_socket_t *sockt, int key) { |
| | | sockt->key = key; |
| | |
| | | shm_packet_t sendpak; |
| | | shm_packet_t recvpak; |
| | | std::map<std::string, shm_packet_t>::iterator recvbufIter; |
| | | |
| | | std::string uuid = sole::uuid4().str(); |
| | | |
| | | sendpak.key = sockt->key; |
| | |
| | | |
| | | |
| | | LABEL_PUSH: |
| | | if (key == sockt->key) { |
| | | if (sendpak->action != BUS_ACTION_STOP && key == sockt->key) { |
| | | logger->error( "can not send to your self!"); |
| | | return EBUS_SENDTO_SELF; |
| | | } |
| | |
| | | } |
| | | |
| | | // 短连接方式接受 |
| | | static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *recvpak , const struct timespec *timeout, int flag) { |
| | | static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *_recvpak , const struct timespec *timeout, int flag) { |
| | | int rv; |
| | | |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | shm_packet_t recvpak; |
| | | |
| | | if( sockt->queue != NULL) |
| | | goto LABEL_POP; |
| | |
| | | |
| | | LABEL_POP: |
| | | |
| | | // |
| | | // printf("%p start recv.....\n", sockt); |
| | | |
| | | printf("%p start recv.....\n", sockt); |
| | | |
| | | rv = sockt->queue->pop(*recvpak, timeout, flag); |
| | | rv = sockt->queue->pop(recvpak, timeout, flag); |
| | | if(rv != 0) |
| | | return rv; |
| | | |
| | | |
| | | if(recvpak.action == BUS_ACTION_STOP) { |
| | | return EBUS_STOPED; |
| | | } |
| | | *_recvpak = recvpak; |
| | | return rv; |
| | | } |
| | | // int shm_sendandrecv(shm_socket_t *sockt, const void *send_buf, |
| | |
| | | |
| | | }; |
| | | |
| | | |
| | | |
| | | #define BUS_ACTION_STOP 1 |
| | | |
| | | typedef struct shm_packet_t { |
| | | int key; |
| | | size_t size; |
| | | void * buf; |
| | | char uuid[64]; |
| | | int action; |
| | | |
| | | } shm_packet_t; |
| | | |
| | |
| | | |
| | | int shm_close_socket(shm_socket_t * socket) ; |
| | | |
| | | int shm_socket_stop(shm_socket_t *sockt); |
| | | |
| | | |
| | | int shm_socket_bind(shm_socket_t * socket, int key) ; |
| | | |
| | |
| | | |
| | | |
| | | |
| | | add_executable(UDPServer UDPServer.cpp ) |
| | | target_link_libraries(UDPServer PRIVATE ${EXTRA_LIBS} ) |
| | | target_include_directories(UDPServer PRIVATE |
| | | "${PROJECT_BINARY_DIR}" |
| | | ${EXTRA_INCLUDES} |
| | | ) |
| | | |
| | | |
| | | add_executable(UDPClient UDPClient.cpp ) |
| | | target_link_libraries(UDPClient PRIVATE ${EXTRA_LIBS} ) |
| | | target_include_directories(UDPClient PRIVATE |
| | | "${PROJECT_BINARY_DIR}" |
| | | ${EXTRA_INCLUDES} |
| | | ) |
| | | |
| | | |
New file |
| | |
| | | // Client side implementation of UDP client-server model |
| | | #include <stdio.h> |
| | | #include <stdlib.h> |
| | | #include <unistd.h> |
| | | #include <string.h> |
| | | #include <sys/types.h> |
| | | #include <sys/socket.h> |
| | | #include <arpa/inet.h> |
| | | #include <netinet/in.h> |
| | | |
| | | #define PORT 8080 |
| | | #define MAXLINE 1024 |
| | | |
| | | // Driver code |
| | | int main() { |
| | | int sockfd; |
| | | char buffer[MAXLINE]; |
| | | char *hello = "Hello from client"; |
| | | struct sockaddr_in servaddr; |
| | | |
| | | // Creating socket file descriptor |
| | | if ( (sockfd = socket(AF_INET, SOCK_DGRAM, 0)) < 0 ) { |
| | | perror("socket creation failed"); |
| | | exit(EXIT_FAILURE); |
| | | } |
| | | |
| | | memset(&servaddr, 0, sizeof(servaddr)); |
| | | |
| | | // Filling server information |
| | | servaddr.sin_family = AF_INET; |
| | | servaddr.sin_port = htons(PORT); |
| | | servaddr.sin_addr.s_addr = INADDR_ANY; |
| | | |
| | | int n; |
| | | socklen_t len; |
| | | |
| | | sendto(sockfd, (const char *)hello, strlen(hello), |
| | | MSG_CONFIRM, (const struct sockaddr *) &servaddr, |
| | | sizeof(servaddr)); |
| | | printf("Hello message sent.\n"); |
| | | |
| | | n = recvfrom(sockfd, (char *)buffer, MAXLINE, |
| | | MSG_WAITALL, (struct sockaddr *) &servaddr, |
| | | &len); |
| | | buffer[n] = '\0'; |
| | | printf("Server : %s\n", buffer); |
| | | |
| | | close(sockfd); |
| | | return 0; |
| | | } |
New file |
| | |
| | | // Server side implementation of UDP client-server model |
| | | #include <stdio.h> |
| | | #include <stdlib.h> |
| | | #include <unistd.h> |
| | | #include <string.h> |
| | | #include <sys/types.h> |
| | | #include <sys/socket.h> |
| | | #include <arpa/inet.h> |
| | | #include <netinet/in.h> |
| | | #include <signal.h> |
| | | |
| | | #define PORT 8080 |
| | | #define MAXLINE 1024 |
| | | |
| | | bool stop = false; |
| | | |
| | | static void stop_handler(int sig) { |
| | | printf("stop_handler\n"); |
| | | |
| | | stop = true; |
| | | } |
| | | // Driver code |
| | | int main() { |
| | | int sockfd; |
| | | char buffer[MAXLINE]; |
| | | char *hello = "Hello from server"; |
| | | struct sockaddr_in servaddr, cliaddr; |
| | | signal(SIGINT, stop_handler); |
| | | // Creating socket file descriptor |
| | | if ( (sockfd = socket(AF_INET, SOCK_DGRAM, 0)) < 0 ) { |
| | | perror("socket creation failed"); |
| | | exit(EXIT_FAILURE); |
| | | } |
| | | |
| | | memset(&servaddr, 0, sizeof(servaddr)); |
| | | memset(&cliaddr, 0, sizeof(cliaddr)); |
| | | |
| | | // Filling server information |
| | | servaddr.sin_family = AF_INET; // IPv4 |
| | | servaddr.sin_addr.s_addr = INADDR_ANY; |
| | | servaddr.sin_port = htons(PORT); |
| | | |
| | | // Bind the socket with the server address |
| | | if ( bind(sockfd, (const struct sockaddr *)&servaddr, |
| | | sizeof(servaddr)) < 0 ) |
| | | { |
| | | perror("bind failed"); |
| | | exit(EXIT_FAILURE); |
| | | } |
| | | |
| | | int n; |
| | | |
| | | socklen_t len = sizeof(cliaddr); //len is value/resuslt |
| | | while(!stop) { |
| | | n = recvfrom(sockfd, (char *)buffer, MAXLINE, |
| | | MSG_WAITALL, ( struct sockaddr *) &cliaddr, |
| | | &len); |
| | | buffer[n] = '\0'; |
| | | printf("Client : %s\n", buffer); |
| | | sendto(sockfd, (const char *)hello, strlen(hello), |
| | | MSG_CONFIRM, (const struct sockaddr *) &cliaddr, |
| | | len); |
| | | printf("Hello message sent.\n"); |
| | | } |
| | | |
| | | printf("===stopted.\n"); |
| | | |
| | | |
| | | return 0; |
| | | } |
| | |
| | | |
| | | } |
| | | |
| | | function stop() { |
| | | ps -ef | grep -e "test_net_mod_socket" -e "heart_beat"| awk '{print $2}' | xargs -i kill -15 {} |
| | | |
| | | } |
| | | |
| | | function close() { |
| | | ps -ef | grep -e "test_net_mod_socket" -e "heart_beat"| awk '{print $2}' | xargs -i kill -9 {} |
| | | ipcrm -a |
| | |
| | | |
| | | static void * server_sockt; |
| | | |
| | | static void sigint_handler(int sig) { |
| | | bus_server_socket_wrapper_close(server_sockt); |
| | | static void stop_bus_handler(int sig) { |
| | | bus_server_socket_wrapper_stop(server_sockt); |
| | | } |
| | | |
| | | static void *_start_bus_(void *arg) { |
| | |
| | | if(bus_server_socket_wrapper_start_bus(server_sockt) != 0) { |
| | | printf("start bus failed\n"); |
| | | } |
| | | printf("============_start_bus_ end\n" ); |
| | | |
| | | bus_server_socket_wrapper_close(server_sockt); |
| | | printf("============bus stopted\n" ); |
| | | } |
| | | |
| | | |
| | | int main() { |
| | | pthread_t tid; |
| | | char action[512]; |
| | | signal(SIGINT, sigint_handler); |
| | | signal(SIGINT, stop_bus_handler); |
| | | signal(SIGTERM, stop_bus_handler); |
| | | shm_mm_wrapper_init(512); |
| | | server_sockt = bus_server_socket_wrapper_open(); |
| | | pthread_create(&tid, NULL, _start_bus_, NULL); |
| | |
| | | |
| | | #define SCALE 100000 |
| | | |
| | | static Logger *logger = LoggerFactory::getLogger(); |
| | | |
| | | typedef struct Targ { |
| | | net_node_t *node; |
| | | char *nodelist; |
| | |
| | | |
| | | void *serverSockt; |
| | | |
| | | |
| | | static void _recvandsend_callback_(void *recvbuf, int recvsize, int key, void **sendbuf_ptr, int *sendsize_ptr, void * user_data) { |
| | | char sendbuf[512]; |
| | | printf( "server: RECEIVED REQUEST FROM %d : %s\n", key, (char *)recvbuf); |
| | |
| | | return; |
| | | } |
| | | |
| | | bool stop = false; |
| | | |
| | | static void stop_replyserver_handler(int sig) { |
| | | printf("stop_handler\n"); |
| | | |
| | | int rv = net_mod_socket_stop(serverSockt); |
| | | if(rv ==0) { |
| | | logger->debug("send stop suc"); |
| | | return; |
| | | } else { |
| | | logger->debug("send stop fail.%s\n", bus_strerror(rv)); |
| | | } |
| | | } |
| | | |
| | | void start_reply(int mkey) { |
| | | printf("start reply\n"); |
| | | logger->debug("start reply\n"); |
| | | signal(SIGINT, stop_replyserver_handler); |
| | | signal(SIGTERM, stop_replyserver_handler); |
| | | |
| | | serverSockt = net_mod_socket_open(); |
| | | net_mod_socket_bind(serverSockt, mkey); |
| | | |
| | | int rv; |
| | | while(true) { |
| | | rv = net_mod_socket_recvandsend_timeout(serverSockt, _recvandsend_callback_ , 0, 2000000, NULL ); |
| | | int rv = 0 ; |
| | | while( true) { |
| | | rv = net_mod_socket_recvandsend(serverSockt, _recvandsend_callback_ , NULL ); |
| | | if (rv == 0) |
| | | continue; |
| | | if(rv == EBUS_STOPED) { |
| | | logger->debug("Stopping\n"); |
| | | break; |
| | | } |
| | | logger->debug("net_mod_socket_recvandsend error.%s\n", bus_strerror(rv)); |
| | | |
| | | } |
| | | |
| | | //rv = net_mod_socket_recvandsend_timeout(serverSockt, _recvandsend_callback_ , 0, 2000000, NULL ); |
| | | net_mod_socket_close(serverSockt); |
| | | logger->debug("stopted\n"); |
| | | // while ( (rv = net_mod_socket_recvfrom(ser, &recvbuf, &size, &key) ) == 0) { |
| | | // // printf( "server: RECEIVED REQUEST FROM %d NAME %s\n", key, recvbuf); |
| | | // sprintf(sendbuf, "%d RECEIVED %s", net_mod_socket_get_key(ser), (char *)recvbuf); |