From 52eee175b041701a8fb29b457b43451c1d6cb983 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期五, 22 一月 2021 17:55:40 +0800
Subject: [PATCH] update

---
 src/queue/shm_queue.h            |   47 ++++++++-------
 src/socket/bus_server_socket.cpp |  124 ++++++++++++++++++++--------------------
 test_socket/bus_test.cpp         |    5 +
 src/socket/bus_server_socket.h   |    2 
 4 files changed, 91 insertions(+), 87 deletions(-)

diff --git a/src/queue/shm_queue.h b/src/queue/shm_queue.h
index 0be124b..9e64992 100644
--- a/src/queue/shm_queue.h
+++ b/src/queue/shm_queue.h
@@ -28,19 +28,19 @@
 
   void force_destroy();
 
-  inline uint32_t size();
+  uint32_t size();
 
-  inline bool full();
-  inline bool empty();
+  bool full();
+  bool empty();
 
-  inline int push(const ELEM_T &a_data);
-  inline int push_nowait(const ELEM_T &a_data);
-  inline int push_timeout(const ELEM_T &a_data, const struct timespec *timeout);
-  inline int pop(ELEM_T &a_data);
-  inline int pop_nowait(ELEM_T &a_data);
-  inline int pop_timeout(ELEM_T &a_data, struct timespec *timeout);
+  int push(const ELEM_T &a_data);
+  int push_nowait(const ELEM_T &a_data);
+  int push_timeout(const ELEM_T &a_data, const struct timespec *timeout);
+  int pop(ELEM_T &a_data);
+  int pop_nowait(ELEM_T &a_data);
+  int pop_timeout(ELEM_T &a_data, struct timespec *timeout);
 
-  inline ELEM_T &operator[](unsigned i);
+  ELEM_T &operator[](unsigned i);
 
  // @deprecate
   static size_t remove_queues_exclude(int keys[], size_t length);
@@ -132,20 +132,20 @@
   
 }
 
-template <typename ELEM_T> inline uint32_t SHMQueue<ELEM_T>::size() {
+template <typename ELEM_T> uint32_t SHMQueue<ELEM_T>::size() {
   return queue->size();
 }
 
-template <typename ELEM_T> inline bool SHMQueue<ELEM_T>::full() {
+template <typename ELEM_T> bool SHMQueue<ELEM_T>::full() {
   return queue->full();
 }
 
-template <typename ELEM_T> inline bool SHMQueue<ELEM_T>::empty() {
+template <typename ELEM_T> bool SHMQueue<ELEM_T>::empty() {
   return queue->empty();
 }
 
 template <typename ELEM_T>
-inline int SHMQueue<ELEM_T>::push(const ELEM_T &a_data) {
+int SHMQueue<ELEM_T>::push(const ELEM_T &a_data) {
   int rv = queue->push(a_data);
   if(rv == -1) {
     return errno;
@@ -155,7 +155,7 @@
 }
 
 template <typename ELEM_T>
-inline int SHMQueue<ELEM_T>::push_nowait(const ELEM_T &a_data) {
+int SHMQueue<ELEM_T>::push_nowait(const ELEM_T &a_data) {
   int rv =  queue->push(a_data, NULL, BUS_NOWAIT_FLAG);
   if(rv == -1) {
     if (errno == EAGAIN)
@@ -169,7 +169,7 @@
 }
 
 template <typename ELEM_T>
-inline int SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data, const struct timespec *timeout) {
+int SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data, const struct timespec *timeout) {
 
   int rv = queue->push(a_data, timeout, BUS_TIMEOUT_FLAG);
   if(rv == -1) {
@@ -183,11 +183,14 @@
   return 0;
 }
 
-template <typename ELEM_T> inline int SHMQueue<ELEM_T>::pop(ELEM_T &a_data) {
-  // printf("SHMQueue pop before\n");
+template <typename ELEM_T> 
+int SHMQueue<ELEM_T>::pop(ELEM_T &a_data) {
+  LoggerFactory::getLogger()->debug("SHMQueue pop before\n");
   int rv = queue->pop(a_data);
-  // printf("SHMQueue after before\n");
+
+  LoggerFactory::getLogger()->debug("SHMQueue pop before\n");
   if(rv == -1) {
+
     return errno;
   } else {
     return 0;
@@ -195,7 +198,7 @@
 }
 
 template <typename ELEM_T>
-inline int SHMQueue<ELEM_T>::pop_nowait(ELEM_T &a_data) {
+int SHMQueue<ELEM_T>::pop_nowait(ELEM_T &a_data) {
   int rv = queue->pop(a_data, NULL, BUS_NOWAIT_FLAG);
 
   if(rv == -1) {
@@ -211,7 +214,7 @@
 }
 
 template <typename ELEM_T>
-inline int SHMQueue<ELEM_T>::pop_timeout(ELEM_T &a_data, struct timespec *timeout) {
+int SHMQueue<ELEM_T>::pop_timeout(ELEM_T &a_data, struct timespec *timeout) {
 
   int rv;
   rv = queue->pop(a_data, timeout, BUS_TIMEOUT_FLAG);
@@ -228,7 +231,7 @@
 }
 
 template <typename ELEM_T>
-inline ELEM_T &SHMQueue<ELEM_T>::operator[](unsigned i) {
+ELEM_T &SHMQueue<ELEM_T>::operator[](unsigned i) {
   return queue->operator[](i);
 }
 
diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp
index db14db4..e66a709 100644
--- a/src/socket/bus_server_socket.cpp
+++ b/src/socket/bus_server_socket.cpp
@@ -212,12 +212,12 @@
 		subscripter_set = map_iter->second;
 		for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) {
 			send_key = *set_iter;
- // printf("_proxy_pub send before %d \n", send_key);
+ printf("_proxy_pub send before %d \n", send_key);
 			if (shm_sendto(shm_socket, buf, size, send_key, &timeout) == EBUS_CLOSED ) {
 				//瀵规柟宸插叧闂殑杩炴帴鏀惧埌寰呭垹闄ら槦鍒楅噷銆傚鏋滅洿鎺ュ垹闄や細璁﹊ter鎸囬拡鍑虹幇閿欎贡
 				subscripter_to_del.push_back(send_key);
 			} else {
-// printf("_proxy_pub send after: %d \n", send_key);
+printf("_proxy_pub send after: %d \n", send_key);
 			}
 
 			
@@ -247,15 +247,15 @@
 	const char *topic_delim = ",";
 // printf("run_pubsub_proxy server receive before\n");
 	while(shm_recvfrom(shm_socket, (void **)&buf, &size, &key) == 0) {
-//printf("run_pubsub_proxy server recv after: %s \n", buf);
+printf("run_pubsub_proxy server recvfrom %d after: %s \n", key, buf);
 		head = ShmModSocket::decode_bus_head(buf);
 		topics = buf + BUS_HEAD_SIZE;
 		action = head.action;
-  // printf("run_pubsub_proxy : %s, %s \n", action, topics);
+  printf("run_pubsub_proxy : %s, %s \n", action, topics);
 		if(strcmp(action, "sub") == 0) {
 			// 璁㈤槄鏀寔澶氫富棰樿闃�
 			topic = strtok(topics, topic_delim);
-//printf("run_pubsub_proxy topic = %s\n", topic);
+printf("run_pubsub_proxy topic = %s\n", topic);
 		  while(topic) {
        _proxy_sub(trim(topic, 0), key);
         topic =  strtok(NULL, topic_delim);
@@ -301,71 +301,71 @@
 
 
 
-/**
- * deprecate
- * @str "<**sub**>{缁忔祹}"
- */
+// /**
+//  * deprecate
+//  * @str "<**sub**>{缁忔祹}"
+//  */
 
-int BusServerSocket::parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len ) {
- char *ptr = str;
- char *str_end_ptr = str + size;
- char *action_start_ptr;
- char *action_end_ptr;
- size_t action_len = 0;
+// int BusServerSocket::parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len ) {
+//  char *ptr = str;
+//  char *str_end_ptr = str + size;
+//  char *action_start_ptr;
+//  char *action_end_ptr;
+//  size_t action_len = 0;
 
- char *topic_start_ptr;
- char *topic_end_ptr;
- size_t topic_len = 0;
+//  char *topic_start_ptr;
+//  char *topic_end_ptr;
+//  size_t topic_len = 0;
 
- // if (strlen(identifier) > strlen(str)) {
- //  return 0;
- // }
+//  // if (strlen(identifier) > strlen(str)) {
+//  //  return 0;
+//  // }
 
- if (strncmp(ptr, ACTION_LIDENTIFIER, strlen(ACTION_LIDENTIFIER)) == 0) {
-  ptr += strlen(ACTION_LIDENTIFIER);
-  action_start_ptr = ptr;
-  while(strncmp(++ptr, ACTION_RIDENTIFIER, strlen(ACTION_RIDENTIFIER)) != 0) {
-    if(ptr >= str_end_ptr) {
-      return 0;
-    }
-  }
-// printf("%s\n", ptr);
-  action_end_ptr = ptr;
-  action_len = action_end_ptr - action_start_ptr;
-  ptr += strlen(ACTION_RIDENTIFIER);
-// printf("%s\n", ptr);
-// printf("%s\n", str_end_ptr-1);
-  if(strncmp(ptr, TOPIC_LIDENTIFIER, strlen(TOPIC_LIDENTIFIER)) == 0 ) {
-    topic_start_ptr = ptr+strlen(TOPIC_LIDENTIFIER);
+//  if (strncmp(ptr, ACTION_LIDENTIFIER, strlen(ACTION_LIDENTIFIER)) == 0) {
+//   ptr += strlen(ACTION_LIDENTIFIER);
+//   action_start_ptr = ptr;
+//   while(strncmp(++ptr, ACTION_RIDENTIFIER, strlen(ACTION_RIDENTIFIER)) != 0) {
+//     if(ptr >= str_end_ptr) {
+//       return 0;
+//     }
+//   }
+// // printf("%s\n", ptr);
+//   action_end_ptr = ptr;
+//   action_len = action_end_ptr - action_start_ptr;
+//   ptr += strlen(ACTION_RIDENTIFIER);
+// // printf("%s\n", ptr);
+// // printf("%s\n", str_end_ptr-1);
+//   if(strncmp(ptr, TOPIC_LIDENTIFIER, strlen(TOPIC_LIDENTIFIER)) == 0 ) {
+//     topic_start_ptr = ptr+strlen(TOPIC_LIDENTIFIER);
    
 
-    while(strncmp(++ptr, TOPIC_RIDENTIFIER, strlen(TOPIC_RIDENTIFIER)) != 0) {
-      if(ptr >= str_end_ptr) {
-        return 0;
-      }
-    }
-    topic_end_ptr = ptr;
-    topic_len = topic_end_ptr - topic_start_ptr;
+//     while(strncmp(++ptr, TOPIC_RIDENTIFIER, strlen(TOPIC_RIDENTIFIER)) != 0) {
+//       if(ptr >= str_end_ptr) {
+//         return 0;
+//       }
+//     }
+//     topic_end_ptr = ptr;
+//     topic_len = topic_end_ptr - topic_start_ptr;
     
-    ptr += strlen(TOPIC_RIDENTIFIER);
+//     ptr += strlen(TOPIC_RIDENTIFIER);
    
-  } else {
-    return 0;
-  }
- } else {
-  return 0;
- }
+//   } else {
+//     return 0;
+//   }
+//  } else {
+//   return 0;
+//  }
 
- char *topic = (char *)malloc(topic_len+1);
- strncpy(topic, topic_start_ptr, topic_len);
- *(topic+topic_len) = '\0';
- *_topic = topic;
+//  char *topic = (char *)malloc(topic_len+1);
+//  strncpy(topic, topic_start_ptr, topic_len);
+//  *(topic+topic_len) = '\0';
+//  *_topic = topic;
 
- char *action = (char *)malloc(action_len+1);
- strncpy(action, action_start_ptr, action_len);
- *(action+action_len) = '\0';
- *_action = action;
- *head_len = ptr-str;
+//  char *action = (char *)malloc(action_len+1);
+//  strncpy(action, action_start_ptr, action_len);
+//  *(action+action_len) = '\0';
+//  *_action = action;
+//  *head_len = ptr-str;
 
- return 1;
-}
\ No newline at end of file
+//  return 1;
+// }
\ No newline at end of file
diff --git a/src/socket/bus_server_socket.h b/src/socket/bus_server_socket.h
index d475f02..486bf49 100644
--- a/src/socket/bus_server_socket.h
+++ b/src/socket/bus_server_socket.h
@@ -30,7 +30,7 @@
 	void _proxy_sub( char *topic, int key);
 	void _proxy_pub( char *topic, void *buf, size_t size, int key);
 	void *run_pubsub_proxy();
-	int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len );
+	// int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len );
  
 	void _proxy_desub( char *topic, int key);
 	void _proxy_desub_all(int key);
diff --git a/test_socket/bus_test.cpp b/test_socket/bus_test.cpp
index cdd6142..b815476 100644
--- a/test_socket/bus_test.cpp
+++ b/test_socket/bus_test.cpp
@@ -25,7 +25,8 @@
   int size;
   int key;
   ShmModSocket *sk = (ShmModSocket *)skptr;
-  while (sk->recvfrom( &recvbuf, &size, &key) == 0) {
+  while ( true) {
+    sk->recvfrom( &recvbuf, &size, &key);
     printf("鏀跺埌璁㈤槄娑堟伅:%s\n", recvbuf);
     free(recvbuf);
   }
@@ -36,7 +37,7 @@
   ShmModSocket *sk = new ShmModSocket();
   
   pthread_t tid;
-  pthread_create(&tid, NULL, run_recv, (void *)socket);
+  pthread_create(&tid, NULL, run_recv, (void *)sk);
   int size;
   
   char action[512];

--
Gitblit v1.8.0