From ab9d762e22875cec0cecf7783b9d76995562bebb Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期一, 21 十二月 2020 16:58:51 +0800
Subject: [PATCH] update

---
 src/socket/bus_server_socket.c |   85 +++++++++++++++++++++++-------------------
 1 files changed, 46 insertions(+), 39 deletions(-)

diff --git a/src/socket/bus_server_socket.c b/src/socket/bus_server_socket.c
index eac784f..e1ddb89 100644
--- a/src/socket/bus_server_socket.c
+++ b/src/socket/bus_server_socket.c
@@ -1,5 +1,6 @@
 
 #include "bus_server_socket.h"
+#include "shm_mod_socket.h"
 
 static Logger *logger = LoggerFactory::getLogger();
 
@@ -183,7 +184,7 @@
 /*
  * 澶勭悊鍙戝竷锛屼唬鐞嗚浆鍙�
 */
-void BusServerSocket::_proxy_pub( char *topic, size_t head_len, void *buf, size_t size, int key) {
+void BusServerSocket::_proxy_pub( char *topic, void *buf, size_t size, int key) {
 	SHMKeySet *subscripter_set;
 
 	SHMTopicSubMap::iterator map_iter;
@@ -200,7 +201,7 @@
 		for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) {
 			send_key = *set_iter;
  // printf("_proxy_pub send before %d \n", send_key);
-			if (shm_sendto(shm_socket, buf+head_len, size-head_len, send_key, &timeout) == SHM_SOCKET_ECONNFAILED ) {
+			if (shm_sendto(shm_socket, buf, size, send_key, &timeout) == SHM_SOCKET_ECONNFAILED ) {
 				//瀵规柟宸插叧闂殑杩炴帴鏀惧埌寰呭垹闄ら槦鍒楅噷銆傚鏋滅洿鎺ュ垹闄や細璁﹊ter鎸囬拡鍑虹幇閿欎贡
 				subscripter_to_del.push_back(send_key);
 			} else {
@@ -226,57 +227,63 @@
 	// pthread_detach(pthread_self());
 	int size;
 	int key;
-	char * action, *topic, *topics, *buf;
+	char * action, *topic, *topics, *buf, *content;
 	size_t head_len;
+	char resp_buf[128];
+	bus_head_t head;
 
 	const char *topic_delim = ",";
 // printf("run_pubsub_proxy server receive before\n");
 	while(shm_recvfrom(shm_socket, (void **)&buf, &size, &key) == 0) {
 //printf("run_pubsub_proxy server recv after: %s \n", buf);
-		if(parse_pubsub_topic(buf, size, &action, &topics, &head_len)) {
- // printf("run_pubsub_proxy  %s %s \n", action, topics);
-			if(strcmp(action, "sub") == 0) {
-				// 璁㈤槄鏀寔澶氫富棰樿闃�
-				topic = strtok(topics, topic_delim);
+		head = ShmModSocket::decode_bus_head(buf);
+		topics = buf + BUS_HEAD_SIZE;
+		action = head.action;
+		// if(parse_pubsub_topic(buf, size, &action, &topics, &head_len)) {
+  printf("run_pubsub_proxy : %s, %s \n", action, topics);
+		if(strcmp(action, "sub") == 0) {
+			// 璁㈤槄鏀寔澶氫富棰樿闃�
+			topic = strtok(topics, topic_delim);
 //printf("run_pubsub_proxy topic = %s\n", topic);
+		  while(topic) {
+       _proxy_sub(trim(topic, 0), key);
+        topic =  strtok(NULL, topic_delim);
+		  }
+
+		} else if(strcmp(action, "desub") == 0) {
+// printf("desub topic=%s,%s,%d\n", topics, trim(topics, 0), strcmp(trim(topics, 0), ""));
+			if(strcmp(trim(topics, 0), "") == 0) {
+				// 鍙栨秷鎵�鏈夎闃�
+				_proxy_desub_all(key);
+			} else {
+			 
+				topic = strtok(topics, topic_delim);
 			  while(topic) {
-	       _proxy_sub(trim(topic, 0), key);
+	       _proxy_desub(trim(topic, 0), key);
 	        topic =  strtok(NULL, topic_delim);
 			  }
-
-			} else if(strcmp(action, "desub") == 0) {
-// printf("desub topic=%s,%s,%d\n", topics, trim(topics, 0), strcmp(trim(topics, 0), ""));
-				if(strcmp(trim(topics, 0), "") == 0) {
-					// 鍙栨秷鎵�鏈夎闃�
-					_proxy_desub_all(key);
-				} else {
-				 
-					topic = strtok(topics, topic_delim);
-				  while(topic) {
-		       _proxy_desub(trim(topic, 0), key);
-		        topic =  strtok(NULL, topic_delim);
-				  }
-				}
-				
-			} else if(strcmp(action, "pub") == 0) {
-				_proxy_pub(topics, head_len, buf, size, key);
-			}  else if(strcmp(action, "stop") == 0) {
-				 logger->info( "Stopping Bus...");
-				 // snprintf(buf, 128, "%sstop_finished%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, "", TOPIC_RIDENTIFIER);
-				 // shm_sendto(shm_socket, const void *buf, const int size, key);
-				 free(action);
-				 free(topics);
-				 free(buf);
-				 break;
-			} else {
-				logger->error( "BusServerSocket::run_pubsub_proxy : unrecognized action");
 			}
 			
-			free(action);
-			free(topics);
+		} else if(strcmp(action, "pub") == 0) {
+			 content = topics + head.topic_size;
+			_proxy_pub(topics, content, head.content_size, key);
+		}  else if(strcmp(action, "stop") == 0) {
+			 logger->info( "Stopping Bus...");
+			 // snprintf(resp_buf, 128, "%sstop_finished%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, "", TOPIC_RIDENTIFIER);
+			 // shm_sendto(shm_socket, resp_buf, strlen(resp_buf), key);
+			 // free(action);
+			 // free(topics);
+			 free(buf);
+			 break;
 		} else {
-			logger->error( "BusServerSocket::run_pubsub_proxy : incorrect format msg");
+			logger->error( "BusServerSocket::run_pubsub_proxy : unrecognized action");
 		}
+		
+		// free(action);
+		// free(topics);
+		// } else {
+		// 	logger->error( "BusServerSocket::run_pubsub_proxy : incorrect format msg");
+		// }
 		free(buf);
 	}
 	return NULL;

--
Gitblit v1.8.0