From ab9d762e22875cec0cecf7783b9d76995562bebb Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期一, 21 十二月 2020 16:58:51 +0800
Subject: [PATCH] update

---
 src/socket/shm_mod_socket.c |  158 ++++++++++++++++++++++++++++++++++++++++++++++------
 1 files changed, 140 insertions(+), 18 deletions(-)

diff --git a/src/socket/shm_mod_socket.c b/src/socket/shm_mod_socket.c
index 6685087..e28f872 100644
--- a/src/socket/shm_mod_socket.c
+++ b/src/socket/shm_mod_socket.c
@@ -187,30 +187,66 @@
 /**
  * @key 鎬荤嚎绔彛
  */
-int  ShmModSocket::_sub_(char *topic, int size, int key,  
+int  ShmModSocket::_sub_(char *topic, int topic_size, int key,  
 	struct timespec *timeout, int flags) {
-	char buf[8192];
-	int rv;
-	snprintf(buf,  8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
-	rv = shm_sendto(shm_socket, buf, strlen(buf) + 1, key, timeout, flags);
-	if(rv == 0) {
-		bus_set->insert(key);
+	// char buf[8192];
+	// int rv;
+	// snprintf(buf,  8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
+	// rv = shm_sendto(shm_socket, buf, strlen(buf) + 1, key, timeout, flags);
+	// if(rv == 0) {
+	// 	bus_set->insert(key);
+	// }
+	// return rv;
+
+	int ret;
+	bus_head_t head = {};
+	memcpy(head.action, "sub", sizeof(head.action));
+	head.topic_size = topic_size = strlen(topic) + 1;
+	head.content_size = 0;
+
+	void *buf;
+	int size = get_bus_sendbuf(head, topic, topic_size, NULL,  0, &buf);
+	if(size > 0) {
+		ret = shm_sendto(shm_socket, buf, size, key, timeout, flags);
+		free(buf);
+		if(ret == 0) {
+			bus_set->insert(key);
+		}
+		return ret;
+	} else {
+		return -1;
 	}
-	return rv;
 }
 
 
 /**
  * @key 鎬荤嚎绔彛
  */
-int  ShmModSocket::_desub_(char *topic, int size, int key,  
+int  ShmModSocket::_desub_(char *topic, int topic_size, int key,  
 	struct timespec *timeout, int flags) {
-	char buf[8192];
+	// char buf[8192];
+	int ret;
 	if(topic == NULL) {
 		topic = "";
 	}
-	snprintf(buf,  8192, "%sdesub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER,  topic, TOPIC_RIDENTIFIER);
-	return shm_sendto(shm_socket, buf, strlen(buf) + 1, key, timeout, flags);
+	// snprintf(buf,  8192, "%sdesub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER,  topic, TOPIC_RIDENTIFIER);
+	// return shm_sendto(shm_socket, buf, strlen(buf) + 1, key, timeout, flags);
+
+	bus_head_t head = {};
+	memcpy(head.action, "desub", sizeof(head.action));
+	head.topic_size = topic_size = strlen(topic) + 1;
+	head.content_size = 0;
+
+	void *buf;
+	int size = get_bus_sendbuf(head, topic,  topic_size, NULL,  0, &buf);
+	if(size > 0) {
+		ret = shm_sendto(shm_socket, buf, size, key, timeout, flags);
+		free(buf);
+		return ret;
+	} else {
+		return -1;
+	}
+
 }
 
 /**
@@ -220,15 +256,101 @@
  
 int  ShmModSocket::_pub_(char *topic, int topic_size, void *content, int content_size, int key,  
 	struct timespec *timeout, int flags) {
-	int head_len;
-	char buf[8192+content_size];
-	snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
-	head_len = strlen(buf);
-	memcpy(buf+head_len, content, content_size);
-	return shm_sendto(shm_socket, buf, head_len+content_size, key, timeout, flags);
+	// int head_len;
+	// char buf[8192+content_size];
+	// snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
+	// head_len = strlen(buf);
+	// memcpy(buf+head_len, content, content_size);
+
+	int ret;
+	bus_head_t head = {};
+	memcpy(head.action, "pub", sizeof(head.action));
+	head.topic_size = topic_size = strlen(topic) + 1;
+	head.content_size = content_size;
+
+	void *buf;
+	int size = get_bus_sendbuf(head, topic,  topic_size, content,  content_size, &buf);
+	if(size > 0) {
+		ret = shm_sendto(shm_socket, buf, size, key, timeout, flags);
+		free(buf);
+		return ret;
+	} else {
+		return -1;
+	}
+
 
 }
 
 
+int ShmModSocket::get_bus_sendbuf(bus_head_t &request_head, 
+  void *topic_buf, int topic_size, void *content_buf, int content_size, void **retbuf) {
+ 
+  int buf_size;
+  char *buf;
+  int  max_buf_size;
+  if((buf = (char *)malloc(MAXBUF)) == NULL) {
+    LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf malloc");
+    exit(1);
+  } else {
+    max_buf_size = MAXBUF;
+  }
+
+  buf_size = BUS_HEAD_SIZE + content_size + topic_size  ;
+  if(max_buf_size < buf_size) {
+    
+    if((buf = (char *)realloc(buf, buf_size)) == NULL) {
+      LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf  realloc buf");
+      exit(1);
+    } else {
+      max_buf_size = buf_size;
+    }
+  }
+
+  memcpy(buf, ShmModSocket::encode_bus_head(request_head), BUS_HEAD_SIZE);
+  if(topic_size != 0 ) 
+    memcpy(buf + BUS_HEAD_SIZE, topic_buf, topic_size);
+  if(content_size != 0)
+ 	 memcpy(buf + BUS_HEAD_SIZE + topic_size, content_buf, content_size);
+ 
+  *retbuf = buf;
+  return buf_size;
+}
+
+/**
+	char action[];
+  uint32_t topic_size;
+	uint32_t content_size;
+*/
+
+void * ShmModSocket::encode_bus_head(bus_head_t & head) {
+  void * headbs = malloc(BUS_HEAD_SIZE);
+  char *tmp_ptr = (char *)headbs;
+
+  memcpy(tmp_ptr, head.action, sizeof(head.action));
+
+  tmp_ptr += 4;
+  PUT(tmp_ptr, htonl(head.topic_size));
+
+  tmp_ptr += 4;
+  PUT(tmp_ptr, htonl(head.content_size));
+  
+  return headbs;
+}
+
+bus_head_t  ShmModSocket::decode_bus_head(void *headbs) {
+  char *tmp_ptr = (char *)headbs;
+  bus_head_t head;
+
+  memcpy(head.action, tmp_ptr, sizeof(head.action));
+
+  tmp_ptr += 4;
+  head.topic_size = ntohl(GET(tmp_ptr));
+
+  tmp_ptr += 4;
+  head.content_size = ntohl(GET(tmp_ptr));
+ 
+ 
+  return head;
+}
 
  

--
Gitblit v1.8.0