From ab9d762e22875cec0cecf7783b9d76995562bebb Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期一, 21 十二月 2020 16:58:51 +0800
Subject: [PATCH] update

---
 src/socket/shm_mod_socket.h    |   19 +++
 src/socket/bus_server_socket.c |   85 +++++++++-------
 src/socket/bus_server_socket.h |    2 
 src/socket/shm_mod_socket.c    |  158 ++++++++++++++++++++++++++++---
 4 files changed, 206 insertions(+), 58 deletions(-)

diff --git a/src/socket/bus_server_socket.c b/src/socket/bus_server_socket.c
index eac784f..e1ddb89 100644
--- a/src/socket/bus_server_socket.c
+++ b/src/socket/bus_server_socket.c
@@ -1,5 +1,6 @@
 
 #include "bus_server_socket.h"
+#include "shm_mod_socket.h"
 
 static Logger *logger = LoggerFactory::getLogger();
 
@@ -183,7 +184,7 @@
 /*
  * 澶勭悊鍙戝竷锛屼唬鐞嗚浆鍙�
 */
-void BusServerSocket::_proxy_pub( char *topic, size_t head_len, void *buf, size_t size, int key) {
+void BusServerSocket::_proxy_pub( char *topic, void *buf, size_t size, int key) {
 	SHMKeySet *subscripter_set;
 
 	SHMTopicSubMap::iterator map_iter;
@@ -200,7 +201,7 @@
 		for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) {
 			send_key = *set_iter;
  // printf("_proxy_pub send before %d \n", send_key);
-			if (shm_sendto(shm_socket, buf+head_len, size-head_len, send_key, &timeout) == SHM_SOCKET_ECONNFAILED ) {
+			if (shm_sendto(shm_socket, buf, size, send_key, &timeout) == SHM_SOCKET_ECONNFAILED ) {
 				//瀵规柟宸插叧闂殑杩炴帴鏀惧埌寰呭垹闄ら槦鍒楅噷銆傚鏋滅洿鎺ュ垹闄や細璁﹊ter鎸囬拡鍑虹幇閿欎贡
 				subscripter_to_del.push_back(send_key);
 			} else {
@@ -226,57 +227,63 @@
 	// pthread_detach(pthread_self());
 	int size;
 	int key;
-	char * action, *topic, *topics, *buf;
+	char * action, *topic, *topics, *buf, *content;
 	size_t head_len;
+	char resp_buf[128];
+	bus_head_t head;
 
 	const char *topic_delim = ",";
 // printf("run_pubsub_proxy server receive before\n");
 	while(shm_recvfrom(shm_socket, (void **)&buf, &size, &key) == 0) {
 //printf("run_pubsub_proxy server recv after: %s \n", buf);
-		if(parse_pubsub_topic(buf, size, &action, &topics, &head_len)) {
- // printf("run_pubsub_proxy  %s %s \n", action, topics);
-			if(strcmp(action, "sub") == 0) {
-				// 璁㈤槄鏀寔澶氫富棰樿闃�
-				topic = strtok(topics, topic_delim);
+		head = ShmModSocket::decode_bus_head(buf);
+		topics = buf + BUS_HEAD_SIZE;
+		action = head.action;
+		// if(parse_pubsub_topic(buf, size, &action, &topics, &head_len)) {
+  printf("run_pubsub_proxy : %s, %s \n", action, topics);
+		if(strcmp(action, "sub") == 0) {
+			// 璁㈤槄鏀寔澶氫富棰樿闃�
+			topic = strtok(topics, topic_delim);
 //printf("run_pubsub_proxy topic = %s\n", topic);
+		  while(topic) {
+       _proxy_sub(trim(topic, 0), key);
+        topic =  strtok(NULL, topic_delim);
+		  }
+
+		} else if(strcmp(action, "desub") == 0) {
+// printf("desub topic=%s,%s,%d\n", topics, trim(topics, 0), strcmp(trim(topics, 0), ""));
+			if(strcmp(trim(topics, 0), "") == 0) {
+				// 鍙栨秷鎵�鏈夎闃�
+				_proxy_desub_all(key);
+			} else {
+			 
+				topic = strtok(topics, topic_delim);
 			  while(topic) {
-	       _proxy_sub(trim(topic, 0), key);
+	       _proxy_desub(trim(topic, 0), key);
 	        topic =  strtok(NULL, topic_delim);
 			  }
-
-			} else if(strcmp(action, "desub") == 0) {
-// printf("desub topic=%s,%s,%d\n", topics, trim(topics, 0), strcmp(trim(topics, 0), ""));
-				if(strcmp(trim(topics, 0), "") == 0) {
-					// 鍙栨秷鎵�鏈夎闃�
-					_proxy_desub_all(key);
-				} else {
-				 
-					topic = strtok(topics, topic_delim);
-				  while(topic) {
-		       _proxy_desub(trim(topic, 0), key);
-		        topic =  strtok(NULL, topic_delim);
-				  }
-				}
-				
-			} else if(strcmp(action, "pub") == 0) {
-				_proxy_pub(topics, head_len, buf, size, key);
-			}  else if(strcmp(action, "stop") == 0) {
-				 logger->info( "Stopping Bus...");
-				 // snprintf(buf, 128, "%sstop_finished%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, "", TOPIC_RIDENTIFIER);
-				 // shm_sendto(shm_socket, const void *buf, const int size, key);
-				 free(action);
-				 free(topics);
-				 free(buf);
-				 break;
-			} else {
-				logger->error( "BusServerSocket::run_pubsub_proxy : unrecognized action");
 			}
 			
-			free(action);
-			free(topics);
+		} else if(strcmp(action, "pub") == 0) {
+			 content = topics + head.topic_size;
+			_proxy_pub(topics, content, head.content_size, key);
+		}  else if(strcmp(action, "stop") == 0) {
+			 logger->info( "Stopping Bus...");
+			 // snprintf(resp_buf, 128, "%sstop_finished%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, "", TOPIC_RIDENTIFIER);
+			 // shm_sendto(shm_socket, resp_buf, strlen(resp_buf), key);
+			 // free(action);
+			 // free(topics);
+			 free(buf);
+			 break;
 		} else {
-			logger->error( "BusServerSocket::run_pubsub_proxy : incorrect format msg");
+			logger->error( "BusServerSocket::run_pubsub_proxy : unrecognized action");
 		}
+		
+		// free(action);
+		// free(topics);
+		// } else {
+		// 	logger->error( "BusServerSocket::run_pubsub_proxy : incorrect format msg");
+		// }
 		free(buf);
 	}
 	return NULL;
diff --git a/src/socket/bus_server_socket.h b/src/socket/bus_server_socket.h
index 1a83d0e..d475f02 100644
--- a/src/socket/bus_server_socket.h
+++ b/src/socket/bus_server_socket.h
@@ -28,7 +28,7 @@
 
 private:
 	void _proxy_sub( char *topic, int key);
-	void _proxy_pub( char *topic, size_t head_len, void *buf, size_t size, int key);
+	void _proxy_pub( char *topic, void *buf, size_t size, int key);
 	void *run_pubsub_proxy();
 	int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len );
  
diff --git a/src/socket/shm_mod_socket.c b/src/socket/shm_mod_socket.c
index 6685087..e28f872 100644
--- a/src/socket/shm_mod_socket.c
+++ b/src/socket/shm_mod_socket.c
@@ -187,30 +187,66 @@
 /**
  * @key 鎬荤嚎绔彛
  */
-int  ShmModSocket::_sub_(char *topic, int size, int key,  
+int  ShmModSocket::_sub_(char *topic, int topic_size, int key,  
 	struct timespec *timeout, int flags) {
-	char buf[8192];
-	int rv;
-	snprintf(buf,  8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
-	rv = shm_sendto(shm_socket, buf, strlen(buf) + 1, key, timeout, flags);
-	if(rv == 0) {
-		bus_set->insert(key);
+	// char buf[8192];
+	// int rv;
+	// snprintf(buf,  8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
+	// rv = shm_sendto(shm_socket, buf, strlen(buf) + 1, key, timeout, flags);
+	// if(rv == 0) {
+	// 	bus_set->insert(key);
+	// }
+	// return rv;
+
+	int ret;
+	bus_head_t head = {};
+	memcpy(head.action, "sub", sizeof(head.action));
+	head.topic_size = topic_size = strlen(topic) + 1;
+	head.content_size = 0;
+
+	void *buf;
+	int size = get_bus_sendbuf(head, topic, topic_size, NULL,  0, &buf);
+	if(size > 0) {
+		ret = shm_sendto(shm_socket, buf, size, key, timeout, flags);
+		free(buf);
+		if(ret == 0) {
+			bus_set->insert(key);
+		}
+		return ret;
+	} else {
+		return -1;
 	}
-	return rv;
 }
 
 
 /**
  * @key 鎬荤嚎绔彛
  */
-int  ShmModSocket::_desub_(char *topic, int size, int key,  
+int  ShmModSocket::_desub_(char *topic, int topic_size, int key,  
 	struct timespec *timeout, int flags) {
-	char buf[8192];
+	// char buf[8192];
+	int ret;
 	if(topic == NULL) {
 		topic = "";
 	}
-	snprintf(buf,  8192, "%sdesub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER,  topic, TOPIC_RIDENTIFIER);
-	return shm_sendto(shm_socket, buf, strlen(buf) + 1, key, timeout, flags);
+	// snprintf(buf,  8192, "%sdesub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER,  topic, TOPIC_RIDENTIFIER);
+	// return shm_sendto(shm_socket, buf, strlen(buf) + 1, key, timeout, flags);
+
+	bus_head_t head = {};
+	memcpy(head.action, "desub", sizeof(head.action));
+	head.topic_size = topic_size = strlen(topic) + 1;
+	head.content_size = 0;
+
+	void *buf;
+	int size = get_bus_sendbuf(head, topic,  topic_size, NULL,  0, &buf);
+	if(size > 0) {
+		ret = shm_sendto(shm_socket, buf, size, key, timeout, flags);
+		free(buf);
+		return ret;
+	} else {
+		return -1;
+	}
+
 }
 
 /**
@@ -220,15 +256,101 @@
  
 int  ShmModSocket::_pub_(char *topic, int topic_size, void *content, int content_size, int key,  
 	struct timespec *timeout, int flags) {
-	int head_len;
-	char buf[8192+content_size];
-	snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
-	head_len = strlen(buf);
-	memcpy(buf+head_len, content, content_size);
-	return shm_sendto(shm_socket, buf, head_len+content_size, key, timeout, flags);
+	// int head_len;
+	// char buf[8192+content_size];
+	// snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
+	// head_len = strlen(buf);
+	// memcpy(buf+head_len, content, content_size);
+
+	int ret;
+	bus_head_t head = {};
+	memcpy(head.action, "pub", sizeof(head.action));
+	head.topic_size = topic_size = strlen(topic) + 1;
+	head.content_size = content_size;
+
+	void *buf;
+	int size = get_bus_sendbuf(head, topic,  topic_size, content,  content_size, &buf);
+	if(size > 0) {
+		ret = shm_sendto(shm_socket, buf, size, key, timeout, flags);
+		free(buf);
+		return ret;
+	} else {
+		return -1;
+	}
+
 
 }
 
 
+int ShmModSocket::get_bus_sendbuf(bus_head_t &request_head, 
+  void *topic_buf, int topic_size, void *content_buf, int content_size, void **retbuf) {
+ 
+  int buf_size;
+  char *buf;
+  int  max_buf_size;
+  if((buf = (char *)malloc(MAXBUF)) == NULL) {
+    LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf malloc");
+    exit(1);
+  } else {
+    max_buf_size = MAXBUF;
+  }
+
+  buf_size = BUS_HEAD_SIZE + content_size + topic_size  ;
+  if(max_buf_size < buf_size) {
+    
+    if((buf = (char *)realloc(buf, buf_size)) == NULL) {
+      LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf  realloc buf");
+      exit(1);
+    } else {
+      max_buf_size = buf_size;
+    }
+  }
+
+  memcpy(buf, ShmModSocket::encode_bus_head(request_head), BUS_HEAD_SIZE);
+  if(topic_size != 0 ) 
+    memcpy(buf + BUS_HEAD_SIZE, topic_buf, topic_size);
+  if(content_size != 0)
+ 	 memcpy(buf + BUS_HEAD_SIZE + topic_size, content_buf, content_size);
+ 
+  *retbuf = buf;
+  return buf_size;
+}
+
+/**
+	char action[];
+  uint32_t topic_size;
+	uint32_t content_size;
+*/
+
+void * ShmModSocket::encode_bus_head(bus_head_t & head) {
+  void * headbs = malloc(BUS_HEAD_SIZE);
+  char *tmp_ptr = (char *)headbs;
+
+  memcpy(tmp_ptr, head.action, sizeof(head.action));
+
+  tmp_ptr += 4;
+  PUT(tmp_ptr, htonl(head.topic_size));
+
+  tmp_ptr += 4;
+  PUT(tmp_ptr, htonl(head.content_size));
+  
+  return headbs;
+}
+
+bus_head_t  ShmModSocket::decode_bus_head(void *headbs) {
+  char *tmp_ptr = (char *)headbs;
+  bus_head_t head;
+
+  memcpy(head.action, tmp_ptr, sizeof(head.action));
+
+  tmp_ptr += 4;
+  head.topic_size = ntohl(GET(tmp_ptr));
+
+  tmp_ptr += 4;
+  head.content_size = ntohl(GET(tmp_ptr));
+ 
+ 
+  return head;
+}
 
  
diff --git a/src/socket/shm_mod_socket.h b/src/socket/shm_mod_socket.h
index 5c66fbb..9ec3044 100644
--- a/src/socket/shm_mod_socket.h
+++ b/src/socket/shm_mod_socket.h
@@ -11,6 +11,16 @@
 #include <set>
 #include "socket_def.h"
 
+#define BUS_HEAD_SIZE (64 + 2 * sizeof(uint32_t))
+
+struct bus_head_t
+{
+	char action[64];
+	uint32_t topic_size;
+	uint32_t content_size;
+};
+
+
 class ShmModSocket {
 private:
 	shm_socket_t *shm_socket;
@@ -26,8 +36,17 @@
 
 	int  _desub_( char *topic, int size, int key, struct timespec *timeout, int flags);
 
+
+	static int get_bus_sendbuf(bus_head_t &request_head, void *topic_buf, int topic_size, void *content_buf, int content_size, void **retbuf);
+
 public:
 	static size_t remove_keys(int keys[], size_t length);
+
+  // bus header 缂栫爜涓虹綉缁滀紶杈撶殑瀛楄妭
+  static void * encode_bus_head(bus_head_t & bushead);
+  // 瑙g爜 bus  header
+  static bus_head_t  decode_bus_head(void *headbs); 
+  
 public:
 	ShmModSocket();
 	~ShmModSocket();

--
Gitblit v1.8.0