src/queue/shm_queue.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/socket/bus_server_socket.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/socket/bus_server_socket.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
test_socket/bus_test.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
src/queue/shm_queue.h
@@ -28,19 +28,19 @@ void force_destroy(); inline uint32_t size(); uint32_t size(); inline bool full(); inline bool empty(); bool full(); bool empty(); inline int push(const ELEM_T &a_data); inline int push_nowait(const ELEM_T &a_data); inline int push_timeout(const ELEM_T &a_data, const struct timespec *timeout); inline int pop(ELEM_T &a_data); inline int pop_nowait(ELEM_T &a_data); inline int pop_timeout(ELEM_T &a_data, struct timespec *timeout); int push(const ELEM_T &a_data); int push_nowait(const ELEM_T &a_data); int push_timeout(const ELEM_T &a_data, const struct timespec *timeout); int pop(ELEM_T &a_data); int pop_nowait(ELEM_T &a_data); int pop_timeout(ELEM_T &a_data, struct timespec *timeout); inline ELEM_T &operator[](unsigned i); ELEM_T &operator[](unsigned i); // @deprecate static size_t remove_queues_exclude(int keys[], size_t length); @@ -132,20 +132,20 @@ } template <typename ELEM_T> inline uint32_t SHMQueue<ELEM_T>::size() { template <typename ELEM_T> uint32_t SHMQueue<ELEM_T>::size() { return queue->size(); } template <typename ELEM_T> inline bool SHMQueue<ELEM_T>::full() { template <typename ELEM_T> bool SHMQueue<ELEM_T>::full() { return queue->full(); } template <typename ELEM_T> inline bool SHMQueue<ELEM_T>::empty() { template <typename ELEM_T> bool SHMQueue<ELEM_T>::empty() { return queue->empty(); } template <typename ELEM_T> inline int SHMQueue<ELEM_T>::push(const ELEM_T &a_data) { int SHMQueue<ELEM_T>::push(const ELEM_T &a_data) { int rv = queue->push(a_data); if(rv == -1) { return errno; @@ -155,7 +155,7 @@ } template <typename ELEM_T> inline int SHMQueue<ELEM_T>::push_nowait(const ELEM_T &a_data) { int SHMQueue<ELEM_T>::push_nowait(const ELEM_T &a_data) { int rv = queue->push(a_data, NULL, BUS_NOWAIT_FLAG); if(rv == -1) { if (errno == EAGAIN) @@ -169,7 +169,7 @@ } template <typename ELEM_T> inline int SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data, const struct timespec *timeout) { int SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data, const struct timespec *timeout) { int rv = queue->push(a_data, timeout, BUS_TIMEOUT_FLAG); if(rv == -1) { @@ -183,11 +183,14 @@ return 0; } template <typename ELEM_T> inline int SHMQueue<ELEM_T>::pop(ELEM_T &a_data) { // printf("SHMQueue pop before\n"); template <typename ELEM_T> int SHMQueue<ELEM_T>::pop(ELEM_T &a_data) { LoggerFactory::getLogger()->debug("SHMQueue pop before\n"); int rv = queue->pop(a_data); // printf("SHMQueue after before\n"); LoggerFactory::getLogger()->debug("SHMQueue pop before\n"); if(rv == -1) { return errno; } else { return 0; @@ -195,7 +198,7 @@ } template <typename ELEM_T> inline int SHMQueue<ELEM_T>::pop_nowait(ELEM_T &a_data) { int SHMQueue<ELEM_T>::pop_nowait(ELEM_T &a_data) { int rv = queue->pop(a_data, NULL, BUS_NOWAIT_FLAG); if(rv == -1) { @@ -211,7 +214,7 @@ } template <typename ELEM_T> inline int SHMQueue<ELEM_T>::pop_timeout(ELEM_T &a_data, struct timespec *timeout) { int SHMQueue<ELEM_T>::pop_timeout(ELEM_T &a_data, struct timespec *timeout) { int rv; rv = queue->pop(a_data, timeout, BUS_TIMEOUT_FLAG); @@ -228,7 +231,7 @@ } template <typename ELEM_T> inline ELEM_T &SHMQueue<ELEM_T>::operator[](unsigned i) { ELEM_T &SHMQueue<ELEM_T>::operator[](unsigned i) { return queue->operator[](i); } src/socket/bus_server_socket.cpp
@@ -212,12 +212,12 @@ subscripter_set = map_iter->second; for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) { send_key = *set_iter; // printf("_proxy_pub send before %d \n", send_key); printf("_proxy_pub send before %d \n", send_key); if (shm_sendto(shm_socket, buf, size, send_key, &timeout) == EBUS_CLOSED ) { //对方已关闭的连接放到待删除队列里。如果直接删除会让iter指针出现错乱 subscripter_to_del.push_back(send_key); } else { // printf("_proxy_pub send after: %d \n", send_key); printf("_proxy_pub send after: %d \n", send_key); } @@ -247,15 +247,15 @@ const char *topic_delim = ","; // printf("run_pubsub_proxy server receive before\n"); while(shm_recvfrom(shm_socket, (void **)&buf, &size, &key) == 0) { //printf("run_pubsub_proxy server recv after: %s \n", buf); printf("run_pubsub_proxy server recvfrom %d after: %s \n", key, buf); head = ShmModSocket::decode_bus_head(buf); topics = buf + BUS_HEAD_SIZE; action = head.action; // printf("run_pubsub_proxy : %s, %s \n", action, topics); printf("run_pubsub_proxy : %s, %s \n", action, topics); if(strcmp(action, "sub") == 0) { // 订阅支持多主题订阅 topic = strtok(topics, topic_delim); //printf("run_pubsub_proxy topic = %s\n", topic); printf("run_pubsub_proxy topic = %s\n", topic); while(topic) { _proxy_sub(trim(topic, 0), key); topic = strtok(NULL, topic_delim); @@ -301,71 +301,71 @@ /** * deprecate * @str "<**sub**>{经济}" */ // /** // * deprecate // * @str "<**sub**>{经济}" // */ int BusServerSocket::parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len ) { char *ptr = str; char *str_end_ptr = str + size; char *action_start_ptr; char *action_end_ptr; size_t action_len = 0; // int BusServerSocket::parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len ) { // char *ptr = str; // char *str_end_ptr = str + size; // char *action_start_ptr; // char *action_end_ptr; // size_t action_len = 0; char *topic_start_ptr; char *topic_end_ptr; size_t topic_len = 0; // char *topic_start_ptr; // char *topic_end_ptr; // size_t topic_len = 0; // if (strlen(identifier) > strlen(str)) { // return 0; // } // // if (strlen(identifier) > strlen(str)) { // // return 0; // // } if (strncmp(ptr, ACTION_LIDENTIFIER, strlen(ACTION_LIDENTIFIER)) == 0) { ptr += strlen(ACTION_LIDENTIFIER); action_start_ptr = ptr; while(strncmp(++ptr, ACTION_RIDENTIFIER, strlen(ACTION_RIDENTIFIER)) != 0) { if(ptr >= str_end_ptr) { return 0; } } // printf("%s\n", ptr); action_end_ptr = ptr; action_len = action_end_ptr - action_start_ptr; ptr += strlen(ACTION_RIDENTIFIER); // printf("%s\n", ptr); // printf("%s\n", str_end_ptr-1); if(strncmp(ptr, TOPIC_LIDENTIFIER, strlen(TOPIC_LIDENTIFIER)) == 0 ) { topic_start_ptr = ptr+strlen(TOPIC_LIDENTIFIER); // if (strncmp(ptr, ACTION_LIDENTIFIER, strlen(ACTION_LIDENTIFIER)) == 0) { // ptr += strlen(ACTION_LIDENTIFIER); // action_start_ptr = ptr; // while(strncmp(++ptr, ACTION_RIDENTIFIER, strlen(ACTION_RIDENTIFIER)) != 0) { // if(ptr >= str_end_ptr) { // return 0; // } // } // // printf("%s\n", ptr); // action_end_ptr = ptr; // action_len = action_end_ptr - action_start_ptr; // ptr += strlen(ACTION_RIDENTIFIER); // // printf("%s\n", ptr); // // printf("%s\n", str_end_ptr-1); // if(strncmp(ptr, TOPIC_LIDENTIFIER, strlen(TOPIC_LIDENTIFIER)) == 0 ) { // topic_start_ptr = ptr+strlen(TOPIC_LIDENTIFIER); while(strncmp(++ptr, TOPIC_RIDENTIFIER, strlen(TOPIC_RIDENTIFIER)) != 0) { if(ptr >= str_end_ptr) { return 0; } } topic_end_ptr = ptr; topic_len = topic_end_ptr - topic_start_ptr; // while(strncmp(++ptr, TOPIC_RIDENTIFIER, strlen(TOPIC_RIDENTIFIER)) != 0) { // if(ptr >= str_end_ptr) { // return 0; // } // } // topic_end_ptr = ptr; // topic_len = topic_end_ptr - topic_start_ptr; ptr += strlen(TOPIC_RIDENTIFIER); // ptr += strlen(TOPIC_RIDENTIFIER); } else { return 0; } } else { return 0; } // } else { // return 0; // } // } else { // return 0; // } char *topic = (char *)malloc(topic_len+1); strncpy(topic, topic_start_ptr, topic_len); *(topic+topic_len) = '\0'; *_topic = topic; // char *topic = (char *)malloc(topic_len+1); // strncpy(topic, topic_start_ptr, topic_len); // *(topic+topic_len) = '\0'; // *_topic = topic; char *action = (char *)malloc(action_len+1); strncpy(action, action_start_ptr, action_len); *(action+action_len) = '\0'; *_action = action; *head_len = ptr-str; // char *action = (char *)malloc(action_len+1); // strncpy(action, action_start_ptr, action_len); // *(action+action_len) = '\0'; // *_action = action; // *head_len = ptr-str; return 1; } // return 1; // } src/socket/bus_server_socket.h
@@ -30,7 +30,7 @@ void _proxy_sub( char *topic, int key); void _proxy_pub( char *topic, void *buf, size_t size, int key); void *run_pubsub_proxy(); int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len ); // int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len ); void _proxy_desub( char *topic, int key); void _proxy_desub_all(int key); test_socket/bus_test.cpp
@@ -25,7 +25,8 @@ int size; int key; ShmModSocket *sk = (ShmModSocket *)skptr; while (sk->recvfrom( &recvbuf, &size, &key) == 0) { while ( true) { sk->recvfrom( &recvbuf, &size, &key); printf("收到订阅消息:%s\n", recvbuf); free(recvbuf); } @@ -36,7 +37,7 @@ ShmModSocket *sk = new ShmModSocket(); pthread_t tid; pthread_create(&tid, NULL, run_recv, (void *)socket); pthread_create(&tid, NULL, run_recv, (void *)sk); int size; char action[512];