From b861de29176891657cc96631ddbfb4ea9e114a42 Mon Sep 17 00:00:00 2001
From: Fu Juntang <StrongTiger_001@163.com>
Date: 星期一, 30 八月 2021 17:52:23 +0800
Subject: [PATCH] re-structure the communication work flow.

---
 src/socket/shm_mod_socket.cpp |  150 +++++++++++++++++++++++++++++++++++++++++++++++--
 1 files changed, 143 insertions(+), 7 deletions(-)

diff --git a/src/socket/shm_mod_socket.cpp b/src/socket/shm_mod_socket.cpp
index abd9477..a94b9c3 100644
--- a/src/socket/shm_mod_socket.cpp
+++ b/src/socket/shm_mod_socket.cpp
@@ -38,6 +38,129 @@
 	return shm_socket_force_bind(shm_socket, key);
 }
 
+int ShmModSocket::bind_proc_id(char *buf, int len) {
+  return shm_socket_bind_proc_id(shm_socket, buf, len);
+}
+
+int ShmModSocket::reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag)
+{
+  int ret;
+  struct timespec ts;
+
+  bus_head_t head = {}; 
+  
+  if (flag == PROC_REG) {
+  
+    memcpy(head.action, "reg", sizeof(head.action));
+  
+  } else if (flag == PROC_UNREG) {
+
+    memcpy(head.action, "unreg", sizeof(head.action));
+  
+  } else if (flag == PROC_REG_TCS) { 
+  
+    memcpy(head.action, "tcsreg", sizeof(head.action));
+  
+  } else if (flag == PROC_QUE_TCS) { 
+  
+    memcpy(head.action, "tcsque", sizeof(head.action)); 
+  
+  } else if (flag == PROC_QUE_STCS) {
+  
+    memcpy(head.action, "stcsque", sizeof(head.action));
+  
+  } else  if (flag == PROC_QUE_ATCS) {
+
+    memcpy(head.action, "atcsque", sizeof(head.action));
+
+  } else {
+
+    return -1;
+
+  }
+
+  if ((flag == PROC_REG) || (flag == PROC_UNREG)) {
+    
+    head.topic_size = 0;
+
+    if (pData != NULL) {
+    
+      head.content_size = sizeof(ProcInfo);
+    
+    } else {
+
+      head.content_size = 0;
+
+    }
+  } else {
+
+    head.topic_size = len;
+
+    head.content_size = 0;
+
+  }
+  
+  void *buf_temp;
+  int buf_size;
+
+  if ((flag == PROC_REG) || (flag == PROC_UNREG)) {
+  
+    buf_size = get_bus_sendbuf(head, NULL, 0, pData, head.content_size, &buf_temp);
+  
+  } else {
+  
+    buf_size = get_bus_sendbuf(head, pData, len, NULL, head.content_size, &buf_temp);
+  
+  }
+
+  if (timeout_ms > 0) {
+
+    ts.tv_sec = timeout_ms /1000;
+    
+    ts.tv_nsec = (timeout_ms - ts.tv_sec * 1000) * 1000 * 1000;
+  
+    if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS)) {
+
+      ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, BUS_TIMEOUT_FLAG);
+
+    } else {
+
+      ret = shm_sendandrecv(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, buf, size, &ts, BUS_TIMEOUT_FLAG);
+
+    }
+  
+  } else if (timeout_ms == 0) {
+  
+    if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS)) {
+
+      ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, BUS_NOWAIT_FLAG);
+
+    } else {
+
+      ret = shm_sendandrecv(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, buf, size, &ts, BUS_NOWAIT_FLAG);
+
+    }
+ 
+  } else {
+
+    if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS)) {
+    
+      ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, -1);
+
+    } else {
+
+      ret = shm_sendandrecv(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, buf, size, &ts, -1);
+
+    }
+  
+  }
+
+  free(buf_temp);
+
+  return ret;
+
+}
+
 /**
  * 鍙戦�佷俊鎭�
  * @key 鍙戦�佺粰璋�
@@ -60,7 +183,8 @@
  * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
 */
 int ShmModSocket::recvfrom( void **buf, int *size, int *key, const struct timespec *timeout, int flag) {
-	int rv =  shm_recvfrom(shm_socket, buf, size, key, timeout, flag);
+
+  int rv =  shm_recvfrom(shm_socket, buf, size, key, timeout, flag);
 
 	if(rv == 0) {
     logger->debug("ShmModSocket::recvfrom: %d recvfrom %d success.\n", get_key(), *key);
@@ -77,7 +201,7 @@
  * @key 鍙戦�佺粰璋�
  * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
 */
-int ShmModSocket::sendandrecv(const void *send_buf, const int send_size, const int send_key, 
+int ShmModSocket::sendandrecv(const void *send_buf, const int send_size, const int send_key,
 	void **recv_buf, int *recv_size, const struct timespec *timeout, int flag){
 	int rv = shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, flag);
 
@@ -183,8 +307,8 @@
 	memcpy(head.action, "pub", sizeof(head.action));
 	head.topic_size = topic_size = strlen(topic) + 1;
 	head.content_size = content_size;
-
-	void *buf;
+	
+  void *buf;
 	int size = get_bus_sendbuf(head, topic,  topic_size, content,  content_size, &buf);
 	if(size > 0) {
 		ret = shm_sendto(shm_socket, buf, size, key, timeout, flags);
@@ -216,6 +340,7 @@
   char *buf;
   int  max_buf_size;
   void *buf_ptr;
+  int count = 0;
   if((buf = (char *) malloc(MAXBUF)) == NULL) {
     LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf malloc");
     exit(1);
@@ -223,7 +348,7 @@
     max_buf_size = MAXBUF;
   }
 
-  buf_size = BUS_HEAD_SIZE + content_size + topic_size  ;
+  buf_size = BUS_HEAD_SIZE + content_size + topic_size;
   if(max_buf_size < buf_size) {
     
     if((buf = (char *) realloc(buf, buf_size)) == NULL) {
@@ -238,8 +363,19 @@
   memcpy(buf, buf_ptr, BUS_HEAD_SIZE);
   if(topic_size != 0 ) 
     memcpy(buf + BUS_HEAD_SIZE, topic_buf, topic_size);
-  if(content_size != 0)
- 	 memcpy(buf + BUS_HEAD_SIZE + topic_size, content_buf, content_size);
+  if ((content_size != 0) && (strncmp(request_head.action, "reg", strlen("reg")) != 0) && \
+                              (strncmp(request_head.action, "unreg", strlen("unreg")) != 0)) {
+ 	  memcpy(buf + BUS_HEAD_SIZE + topic_size, content_buf, content_size);
+  } else {
+    if (((strncmp(request_head.action, "reg", strlen("reg")) == 0) || (strncmp(request_head.action, "unreg", \
+                                strlen("unreg")) == 0)) && (content_buf != NULL)) {
+      proc_copy(buf + BUS_HEAD_SIZE + topic_size, const_cast<void *> (content_buf), &count);
+
+      request_head.content_size = count;
+      buf_size -= (content_size - count);
+
+    }
+  }
  
   *retbuf = buf;
   free(buf_ptr);

--
Gitblit v1.8.0