From 52eee175b041701a8fb29b457b43451c1d6cb983 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期五, 22 一月 2021 17:55:40 +0800 Subject: [PATCH] update --- src/queue/shm_queue.h | 47 ++++++++------- src/socket/bus_server_socket.cpp | 124 ++++++++++++++++++++-------------------- test_socket/bus_test.cpp | 5 + src/socket/bus_server_socket.h | 2 4 files changed, 91 insertions(+), 87 deletions(-) diff --git a/src/queue/shm_queue.h b/src/queue/shm_queue.h index 0be124b..9e64992 100644 --- a/src/queue/shm_queue.h +++ b/src/queue/shm_queue.h @@ -28,19 +28,19 @@ void force_destroy(); - inline uint32_t size(); + uint32_t size(); - inline bool full(); - inline bool empty(); + bool full(); + bool empty(); - inline int push(const ELEM_T &a_data); - inline int push_nowait(const ELEM_T &a_data); - inline int push_timeout(const ELEM_T &a_data, const struct timespec *timeout); - inline int pop(ELEM_T &a_data); - inline int pop_nowait(ELEM_T &a_data); - inline int pop_timeout(ELEM_T &a_data, struct timespec *timeout); + int push(const ELEM_T &a_data); + int push_nowait(const ELEM_T &a_data); + int push_timeout(const ELEM_T &a_data, const struct timespec *timeout); + int pop(ELEM_T &a_data); + int pop_nowait(ELEM_T &a_data); + int pop_timeout(ELEM_T &a_data, struct timespec *timeout); - inline ELEM_T &operator[](unsigned i); + ELEM_T &operator[](unsigned i); // @deprecate static size_t remove_queues_exclude(int keys[], size_t length); @@ -132,20 +132,20 @@ } -template <typename ELEM_T> inline uint32_t SHMQueue<ELEM_T>::size() { +template <typename ELEM_T> uint32_t SHMQueue<ELEM_T>::size() { return queue->size(); } -template <typename ELEM_T> inline bool SHMQueue<ELEM_T>::full() { +template <typename ELEM_T> bool SHMQueue<ELEM_T>::full() { return queue->full(); } -template <typename ELEM_T> inline bool SHMQueue<ELEM_T>::empty() { +template <typename ELEM_T> bool SHMQueue<ELEM_T>::empty() { return queue->empty(); } template <typename ELEM_T> -inline int SHMQueue<ELEM_T>::push(const ELEM_T &a_data) { +int SHMQueue<ELEM_T>::push(const ELEM_T &a_data) { int rv = queue->push(a_data); if(rv == -1) { return errno; @@ -155,7 +155,7 @@ } template <typename ELEM_T> -inline int SHMQueue<ELEM_T>::push_nowait(const ELEM_T &a_data) { +int SHMQueue<ELEM_T>::push_nowait(const ELEM_T &a_data) { int rv = queue->push(a_data, NULL, BUS_NOWAIT_FLAG); if(rv == -1) { if (errno == EAGAIN) @@ -169,7 +169,7 @@ } template <typename ELEM_T> -inline int SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data, const struct timespec *timeout) { +int SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data, const struct timespec *timeout) { int rv = queue->push(a_data, timeout, BUS_TIMEOUT_FLAG); if(rv == -1) { @@ -183,11 +183,14 @@ return 0; } -template <typename ELEM_T> inline int SHMQueue<ELEM_T>::pop(ELEM_T &a_data) { - // printf("SHMQueue pop before\n"); +template <typename ELEM_T> +int SHMQueue<ELEM_T>::pop(ELEM_T &a_data) { + LoggerFactory::getLogger()->debug("SHMQueue pop before\n"); int rv = queue->pop(a_data); - // printf("SHMQueue after before\n"); + + LoggerFactory::getLogger()->debug("SHMQueue pop before\n"); if(rv == -1) { + return errno; } else { return 0; @@ -195,7 +198,7 @@ } template <typename ELEM_T> -inline int SHMQueue<ELEM_T>::pop_nowait(ELEM_T &a_data) { +int SHMQueue<ELEM_T>::pop_nowait(ELEM_T &a_data) { int rv = queue->pop(a_data, NULL, BUS_NOWAIT_FLAG); if(rv == -1) { @@ -211,7 +214,7 @@ } template <typename ELEM_T> -inline int SHMQueue<ELEM_T>::pop_timeout(ELEM_T &a_data, struct timespec *timeout) { +int SHMQueue<ELEM_T>::pop_timeout(ELEM_T &a_data, struct timespec *timeout) { int rv; rv = queue->pop(a_data, timeout, BUS_TIMEOUT_FLAG); @@ -228,7 +231,7 @@ } template <typename ELEM_T> -inline ELEM_T &SHMQueue<ELEM_T>::operator[](unsigned i) { +ELEM_T &SHMQueue<ELEM_T>::operator[](unsigned i) { return queue->operator[](i); } diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp index db14db4..e66a709 100644 --- a/src/socket/bus_server_socket.cpp +++ b/src/socket/bus_server_socket.cpp @@ -212,12 +212,12 @@ subscripter_set = map_iter->second; for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) { send_key = *set_iter; - // printf("_proxy_pub send before %d \n", send_key); + printf("_proxy_pub send before %d \n", send_key); if (shm_sendto(shm_socket, buf, size, send_key, &timeout) == EBUS_CLOSED ) { //瀵规柟宸插叧闂殑杩炴帴鏀惧埌寰呭垹闄ら槦鍒楅噷銆傚鏋滅洿鎺ュ垹闄や細璁﹊ter鎸囬拡鍑虹幇閿欎贡 subscripter_to_del.push_back(send_key); } else { -// printf("_proxy_pub send after: %d \n", send_key); +printf("_proxy_pub send after: %d \n", send_key); } @@ -247,15 +247,15 @@ const char *topic_delim = ","; // printf("run_pubsub_proxy server receive before\n"); while(shm_recvfrom(shm_socket, (void **)&buf, &size, &key) == 0) { -//printf("run_pubsub_proxy server recv after: %s \n", buf); +printf("run_pubsub_proxy server recvfrom %d after: %s \n", key, buf); head = ShmModSocket::decode_bus_head(buf); topics = buf + BUS_HEAD_SIZE; action = head.action; - // printf("run_pubsub_proxy : %s, %s \n", action, topics); + printf("run_pubsub_proxy : %s, %s \n", action, topics); if(strcmp(action, "sub") == 0) { // 璁㈤槄鏀寔澶氫富棰樿闃� topic = strtok(topics, topic_delim); -//printf("run_pubsub_proxy topic = %s\n", topic); +printf("run_pubsub_proxy topic = %s\n", topic); while(topic) { _proxy_sub(trim(topic, 0), key); topic = strtok(NULL, topic_delim); @@ -301,71 +301,71 @@ -/** - * deprecate - * @str "<**sub**>{缁忔祹}" - */ +// /** +// * deprecate +// * @str "<**sub**>{缁忔祹}" +// */ -int BusServerSocket::parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len ) { - char *ptr = str; - char *str_end_ptr = str + size; - char *action_start_ptr; - char *action_end_ptr; - size_t action_len = 0; +// int BusServerSocket::parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len ) { +// char *ptr = str; +// char *str_end_ptr = str + size; +// char *action_start_ptr; +// char *action_end_ptr; +// size_t action_len = 0; - char *topic_start_ptr; - char *topic_end_ptr; - size_t topic_len = 0; +// char *topic_start_ptr; +// char *topic_end_ptr; +// size_t topic_len = 0; - // if (strlen(identifier) > strlen(str)) { - // return 0; - // } +// // if (strlen(identifier) > strlen(str)) { +// // return 0; +// // } - if (strncmp(ptr, ACTION_LIDENTIFIER, strlen(ACTION_LIDENTIFIER)) == 0) { - ptr += strlen(ACTION_LIDENTIFIER); - action_start_ptr = ptr; - while(strncmp(++ptr, ACTION_RIDENTIFIER, strlen(ACTION_RIDENTIFIER)) != 0) { - if(ptr >= str_end_ptr) { - return 0; - } - } -// printf("%s\n", ptr); - action_end_ptr = ptr; - action_len = action_end_ptr - action_start_ptr; - ptr += strlen(ACTION_RIDENTIFIER); -// printf("%s\n", ptr); -// printf("%s\n", str_end_ptr-1); - if(strncmp(ptr, TOPIC_LIDENTIFIER, strlen(TOPIC_LIDENTIFIER)) == 0 ) { - topic_start_ptr = ptr+strlen(TOPIC_LIDENTIFIER); +// if (strncmp(ptr, ACTION_LIDENTIFIER, strlen(ACTION_LIDENTIFIER)) == 0) { +// ptr += strlen(ACTION_LIDENTIFIER); +// action_start_ptr = ptr; +// while(strncmp(++ptr, ACTION_RIDENTIFIER, strlen(ACTION_RIDENTIFIER)) != 0) { +// if(ptr >= str_end_ptr) { +// return 0; +// } +// } +// // printf("%s\n", ptr); +// action_end_ptr = ptr; +// action_len = action_end_ptr - action_start_ptr; +// ptr += strlen(ACTION_RIDENTIFIER); +// // printf("%s\n", ptr); +// // printf("%s\n", str_end_ptr-1); +// if(strncmp(ptr, TOPIC_LIDENTIFIER, strlen(TOPIC_LIDENTIFIER)) == 0 ) { +// topic_start_ptr = ptr+strlen(TOPIC_LIDENTIFIER); - while(strncmp(++ptr, TOPIC_RIDENTIFIER, strlen(TOPIC_RIDENTIFIER)) != 0) { - if(ptr >= str_end_ptr) { - return 0; - } - } - topic_end_ptr = ptr; - topic_len = topic_end_ptr - topic_start_ptr; +// while(strncmp(++ptr, TOPIC_RIDENTIFIER, strlen(TOPIC_RIDENTIFIER)) != 0) { +// if(ptr >= str_end_ptr) { +// return 0; +// } +// } +// topic_end_ptr = ptr; +// topic_len = topic_end_ptr - topic_start_ptr; - ptr += strlen(TOPIC_RIDENTIFIER); +// ptr += strlen(TOPIC_RIDENTIFIER); - } else { - return 0; - } - } else { - return 0; - } +// } else { +// return 0; +// } +// } else { +// return 0; +// } - char *topic = (char *)malloc(topic_len+1); - strncpy(topic, topic_start_ptr, topic_len); - *(topic+topic_len) = '\0'; - *_topic = topic; +// char *topic = (char *)malloc(topic_len+1); +// strncpy(topic, topic_start_ptr, topic_len); +// *(topic+topic_len) = '\0'; +// *_topic = topic; - char *action = (char *)malloc(action_len+1); - strncpy(action, action_start_ptr, action_len); - *(action+action_len) = '\0'; - *_action = action; - *head_len = ptr-str; +// char *action = (char *)malloc(action_len+1); +// strncpy(action, action_start_ptr, action_len); +// *(action+action_len) = '\0'; +// *_action = action; +// *head_len = ptr-str; - return 1; -} \ No newline at end of file +// return 1; +// } \ No newline at end of file diff --git a/src/socket/bus_server_socket.h b/src/socket/bus_server_socket.h index d475f02..486bf49 100644 --- a/src/socket/bus_server_socket.h +++ b/src/socket/bus_server_socket.h @@ -30,7 +30,7 @@ void _proxy_sub( char *topic, int key); void _proxy_pub( char *topic, void *buf, size_t size, int key); void *run_pubsub_proxy(); - int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len ); + // int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len ); void _proxy_desub( char *topic, int key); void _proxy_desub_all(int key); diff --git a/test_socket/bus_test.cpp b/test_socket/bus_test.cpp index cdd6142..b815476 100644 --- a/test_socket/bus_test.cpp +++ b/test_socket/bus_test.cpp @@ -25,7 +25,8 @@ int size; int key; ShmModSocket *sk = (ShmModSocket *)skptr; - while (sk->recvfrom( &recvbuf, &size, &key) == 0) { + while ( true) { + sk->recvfrom( &recvbuf, &size, &key); printf("鏀跺埌璁㈤槄娑堟伅:%s\n", recvbuf); free(recvbuf); } @@ -36,7 +37,7 @@ ShmModSocket *sk = new ShmModSocket(); pthread_t tid; - pthread_create(&tid, NULL, run_recv, (void *)socket); + pthread_create(&tid, NULL, run_recv, (void *)sk); int size; char action[512]; -- Gitblit v1.8.0