From dc01e4cbb01e96d19b470a366bbe648d426ed171 Mon Sep 17 00:00:00 2001
From: fujuntang <fujuntang@smartai.com>
Date: 星期六, 11 九月 2021 10:06:15 +0800
Subject: [PATCH] Add topics sub and request support.

---
 src/key_def.h                      |    1 
 src/socket/shm_socket.h            |    4 
 src/net/net_mod_socket.cpp         |   40 ++++-
 src/socket/shm_socket.cpp          |  118 ++++++++++++----
 src/socket/shm_mod_socket.h        |   19 --
 src/socket/bus_server_socket.cpp   |   53 +++++++
 src/net/net_mod_socket_wrapper.cpp |   44 ++++-
 src/net/net_mod_socket_wrapper.h   |   18 +-
 src/net/net_mod_socket.h           |   22 +-
 src/socket/bus_server_socket.h     |    1 
 src/bh_api.cpp                     |   32 +++-
 src/proc_def.h                     |    4 
 src/socket/shm_mod_socket.cpp      |    8 
 13 files changed, 262 insertions(+), 102 deletions(-)

diff --git a/src/bh_api.cpp b/src/bh_api.cpp
index 4f623f8..d25bf8a 100644
--- a/src/bh_api.cpp
+++ b/src/bh_api.cpp
@@ -139,6 +139,12 @@
     gNetmod_socket = net_mod_socket_open();
     hashtable_t *hashtable = mm_get_hashtable();
     key = hashtable_alloc_key(hashtable);
+    count = hashtable_alloc_key(hashtable);
+    rv = hashtable_alloc_key(hashtable);
+    net_mod_socket_int_set(gNetmod_socket, count);
+    net_mod_socket_svr_set(gNetmod_socket, rv);
+    sprintf(pData.int_info, "%d", count);
+    sprintf(pData.svr_info, "%d", rv);
     net_mod_socket_bind(gNetmod_socket, key);
   
     rv = net_mod_socket_reg(gNetmod_socket, &pData, sizeof(ProcInfo), NULL, 0, timeout_ms, PROC_REG);
@@ -932,8 +938,8 @@
   ::bhome_msg::MsgCommonReply mcr;
 	mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
 	mcr.mutable_errmsg()->set_errstring(errString);
-	*reply_len=mcr.ByteSizeLong();
-	*reply=malloc(*reply_len);
+	*reply_len = mcr.ByteSizeLong();
+	*reply = malloc(*reply_len);
 	mcr.SerializePartialToArray(*reply,*reply_len);
 #endif 
 
@@ -1207,6 +1213,7 @@
   int val;
   int len;
   int min;
+  int data;
   int sec, nsec;
   std::string MsgID;
   int timeout_ms = 3000;
@@ -1309,20 +1316,21 @@
         strncpy(topics_buf + strlen(buf_temp) + 1, _input1.data, strlen(_input1.data));
 #endif 
 
+        data = net_mod_socket_svr_get(gNetmod_socket); 
         if (timeout_ms > 0) {
 
           sec = timeout_ms / 1000;
           nsec = (timeout_ms - sec * 1000) * 1000 * 1000;
           
-          rv = net_mod_socket_sendto_timeout(gNetmod_socket, topics_buf, len, val, sec, nsec); 
+          rv = net_mod_socket_sendto_timeout(gNetmod_socket, topics_buf, len, val, sec, nsec, SVR_STR, data); 
           
         } else if (timeout_ms == 0) {
 
-          rv = net_mod_socket_sendto_nowait(gNetmod_socket, topics_buf, len, val); 
+          rv = net_mod_socket_sendto_nowait(gNetmod_socket, topics_buf, len, val, SVR_STR, data); 
 
         } else {
 
-          rv = net_mod_socket_sendto(gNetmod_socket, topics_buf, len, val); 
+          rv = net_mod_socket_sendto(gNetmod_socket, topics_buf, len, val, SVR_STR, data); 
         } 
 
         free(topics_buf);
@@ -1377,6 +1385,7 @@
   int size;
   int val;
   int min, len;
+  int data;
   net_node_t node;
   int node_size;  
   int recv_arr_size;
@@ -1469,6 +1478,7 @@
         len += strlen(_input1.data);
 #endif
 
+        data = net_mod_socket_svr_get(gNetmod_socket);
         topics_buf = (char *)malloc(len);
         if (topics_buf == NULL) {
           
@@ -1620,6 +1630,7 @@
   int key;
   int size;
   int len;
+  int data;
   int sec, nsec;
   char buf_temp[MAX_STR_LEN] = { 0x00 };
   char *topics_buf = NULL;
@@ -1642,20 +1653,21 @@
 		return false;
   }
 
+  data = net_mod_socket_svr_get(gNetmod_socket);
   if (timeout_ms > 0) {
 
     sec = timeout_ms / 1000;
     nsec = (timeout_ms - sec * 1000) * 1000 * 1000;
 
-    rv = net_mod_socket_recvfrom_timeout(gNetmod_socket, &buf, &size, &key, sec, nsec);
+    rv = net_mod_socket_recvfrom_timeout(gNetmod_socket, &buf, &size, &key, sec, nsec, SVR_STR, data);
 
   } else if (timeout_ms == 0) {
 
-    rv = net_mod_socket_recvfrom_nowait(gNetmod_socket, &buf, &size, &key);
+    rv = net_mod_socket_recvfrom_nowait(gNetmod_socket, &buf, &size, &key, SVR_STR, data);
   
   } else {
 
-    rv = net_mod_socket_recvfrom(gNetmod_socket, &buf, &size, &key);
+    rv = net_mod_socket_recvfrom(gNetmod_socket, &buf, &size, &key, SVR_STR, data);
   }
 
   if (rv == 0) {
@@ -1735,6 +1747,7 @@
 int BHSendReply(void *src, const void *reply, const int reply_len)
 {
   int rv;
+  int data;
   const char *_input;
   
 #if defined(PRO_DE_SERIALIZE)
@@ -1777,7 +1790,8 @@
   rv = pthread_mutex_trylock(&mutex);
   if (rv == 0) {
 
-    rv = net_mod_socket_sendto(gNetmod_socket, _input, strlen(_input), *(int *)src);
+    data = net_mod_socket_svr_get(gNetmod_socket);
+    rv = net_mod_socket_sendto(gNetmod_socket, _input, strlen(_input), *(int *)src, SVR_STR, data);
 
     memset(errString, 0x00, sizeof(errString));
     strncpy(errString, bus_strerror(rv), sizeof(errString));
diff --git a/src/key_def.h b/src/key_def.h
index fdeee2e..a25ee07 100644
--- a/src/key_def.h
+++ b/src/key_def.h
@@ -9,6 +9,7 @@
 
 // BUS key
 #define SHM_BUS_KEY 8
+#define SHM_BUS_INT_KEY 9
 
 // 缃戠粶浠g悊key
 #define SHM_NET_PROXY_KEY 99
diff --git a/src/net/net_mod_socket.cpp b/src/net/net_mod_socket.cpp
index ab065eb..2f5ce73 100644
--- a/src/net/net_mod_socket.cpp
+++ b/src/net/net_mod_socket.cpp
@@ -55,6 +55,22 @@
   return shmModSocket.reg(pData, len, buf, size, timeout_ms, flag);
 }
 
+void NetModSocket::int_set(int data) {
+  int_val = data;
+}
+
+void NetModSocket::svr_set(int data) {
+  svr_val = data;
+}
+
+int NetModSocket::int_get(void) {
+  return int_val;
+}
+
+int NetModSocket::svr_get(void) {
+  return svr_val;
+}
+
 // int NetModSocket::sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
 //   net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) {
 //   return _sendandrecv_(node_arr,  arrlen, send_buf,send_size, recv_arr, recv_arr_size, -1);
@@ -493,20 +509,20 @@
  * @key 鍙戦�佺粰璋�
  * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
  */
-int NetModSocket::sendto(const void *buf, const int size, const int key){
-  return shmModSocket.sendto(buf, size, key);
+int NetModSocket::sendto(const void *buf, const int size, const int key, int reset, int data_set){
+  return shmModSocket.sendto(buf, size, key, 0, 0, reset, data_set);
 }
 
 // 鍙戦�佷俊鎭秴鏃惰繑鍥炪�� @sec 绉� 锛� @nsec 绾崇
-int NetModSocket::sendto_timeout(const void *buf, const int size, const int key, int sec, int nsec){
+int NetModSocket::sendto_timeout(const void *buf, const int size, const int key, int sec, int nsec, int reset, int data_set){
   struct timespec timeout = {sec, nsec};
-  return shmModSocket.sendto(buf, size, key, &timeout, BUS_TIMEOUT_FLAG);
+  return shmModSocket.sendto(buf, size, key, &timeout, BUS_TIMEOUT_FLAG, reset, data_set);
    
 }
 
 // 鍙戦�佷俊鎭珛鍒昏繑鍥炪��
-int NetModSocket::sendto_nowait(const void *buf, const int size, const int key){
-  return shmModSocket.sendto(buf, size, key, NULL, BUS_NOWAIT_FLAG);
+int NetModSocket::sendto_nowait(const void *buf, const int size, const int key, int reset, int data_set){
+  return shmModSocket.sendto(buf, size, key, NULL, BUS_NOWAIT_FLAG, reset, data_set);
   
 }
 
@@ -515,21 +531,21 @@
  * @key 浠庤皝鍝噷鏀跺埌鐨勪俊鎭�
  * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
 */
-int NetModSocket::recvfrom(void **buf, int *size, int *key) {
+int NetModSocket::recvfrom(void **buf, int *size, int *key, int reset, int data_set) {
 
-  return shmModSocket.recvfrom(buf, size, key);
+  return shmModSocket.recvfrom(buf, size, key, 0, 0, reset, data_set);
  
 }
 
 // 鎺ュ彈淇℃伅瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-int NetModSocket::recvfrom_timeout(void **buf, int *size, int *key, int sec, int nsec){
+int NetModSocket::recvfrom_timeout(void **buf, int *size, int *key, int sec, int nsec, int reset, int data_set){
   struct timespec timeout = {sec, nsec};
-  return shmModSocket.recvfrom(buf, size, key, &timeout, BUS_TIMEOUT_FLAG);
+  return shmModSocket.recvfrom(buf, size, key, &timeout, BUS_TIMEOUT_FLAG, reset, data_set);
   
 }
 
-int NetModSocket::recvfrom_nowait(void **buf, int *size, int *key){
-  return shmModSocket.recvfrom(buf, size, key, NULL, BUS_NOWAIT_FLAG);
+int NetModSocket::recvfrom_nowait(void **buf, int *size, int *key, int reset, int data_set){
+  return shmModSocket.recvfrom(buf, size, key, NULL, BUS_NOWAIT_FLAG, reset, data_set);
 }
 
 int NetModSocket::recvandsend(recvandsend_callback_fn callback,
diff --git a/src/net/net_mod_socket.h b/src/net/net_mod_socket.h
index d8e53ae..9d9af97 100644
--- a/src/net/net_mod_socket.h
+++ b/src/net/net_mod_socket.h
@@ -71,7 +71,8 @@
 private:
    
   ShmModSocket shmModSocket;
-
+  int int_val;
+  int svr_val;
   // pthread_mutex_t sendMutex;
 
   // request header 缂栫爜涓虹綉缁滀紶杈撶殑瀛楄妭
@@ -136,7 +137,10 @@
     net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, 
     net_mod_err_t ** _err_arr, int *_err_arr_size, int timeout); 
  
-
+  void int_set(int data);
+  void svr_set(int data);
+  int int_get(void);
+  int svr_get(void);
   /**
    * 鍔熻兘鍚宻endandrecv
    * 浼樼偣锛氱嚎绋嬪畨鍏�
@@ -146,27 +150,27 @@
   // int sendandrecv_safe(net_node_t *node_arr, int node_arr_len, void *send_buf, int send_size, 
   //   net_mod_recv_msg_t ** recv_arr, int *recv_arr_size);
 
- 
   /**
    * 鍙戦�佷俊鎭�
    * @key 鍙戦�佺粰璋�
    * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
    */
-  int sendto( const void *buf, const int size, const int key);
+  int sendto( const void *buf, const int size, const int key, int reset = 0, int data_set = 0);
   // 鍙戦�佷俊鎭秴鏃惰繑鍥炪�� @sec 绉� 锛� @nsec 绾崇
-  int sendto_timeout( const void *buf, const int size, const int key, int sec, int nsec);
+  int sendto_timeout( const void *buf, const int size, const int key, int sec, int nsec, int reset = 0, int data_set = 0);
   // 鍙戦�佷俊鎭珛鍒昏繑鍥炪��
-  int sendto_nowait( const void *buf, const int size, const int key);
+  int sendto_nowait( const void *buf, const int size, const int key, int reset = 0, int data_set = 0);
 
   /**
    * 鎺ユ敹淇℃伅
    * @key 浠庤皝鍝噷鏀跺埌鐨勪俊鎭�
    * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
   */
-  int recvfrom( void **buf, int *size, int *key);
+  int recvfrom( void **buf, int *size, int *key, int reset = 0, int data_set = 0);
   // 鎺ュ彈淇℃伅瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-  int recvfrom_timeout( void **buf, int *size, int *key, int sec, int nsec);
-  int recvfrom_nowait( void **buf, int *size, int *key);
+  int recvfrom_timeout( void **buf, int *size, int *key, int sec, int nsec, int reset = 0, int data_set = 0);
+  int recvfrom_nowait( void **buf, int *size, int *key, int reset = 0, int data_set = 0);
+
   /**
    * 鏈湴鍙戦�佽姹備俊鎭苟绛夊緟鎺ユ敹搴旂瓟
    * @key 鍙戦�佺粰璋�
diff --git a/src/net/net_mod_socket_wrapper.cpp b/src/net/net_mod_socket_wrapper.cpp
index ab4d59d..5233635 100644
--- a/src/net/net_mod_socket_wrapper.cpp
+++ b/src/net/net_mod_socket_wrapper.cpp
@@ -57,20 +57,20 @@
  * @key 鍙戦�佺粰璋�
  * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
  */
-int net_mod_socket_sendto(void *_socket, const void *buf, const int size, const int key) {
+int net_mod_socket_sendto(void *_socket, const void *buf, const int size, const int key, int reset, int data_set) {
 	NetModSocket *sockt = (NetModSocket *)_socket;
-	return sockt->sendto(buf, size, key);
+	return sockt->sendto(buf, size, key, reset, data_set);
 }
 // 鍙戦�佷俊鎭秴鏃惰繑鍥炪�� @sec 绉� 锛� @nsec 绾崇
-int net_mod_socket_sendto_timeout(void *_socket, const void *buf, const int size, const int key, int sec, int nsec){
+int net_mod_socket_sendto_timeout(void *_socket, const void *buf, const int size, const int key, int sec, int nsec, int reset, int data_set){
 	NetModSocket *sockt = (NetModSocket *)_socket;
-	return sockt->sendto_timeout(buf, size, key, sec, nsec);
+	return sockt->sendto_timeout(buf, size, key, sec, nsec, reset, data_set);
 	// return sockt->sendto(buf, size, key);
 }
 // 鍙戦�佷俊鎭珛鍒昏繑鍥炪��
-int net_mod_socket_sendto_nowait(void *_socket, const void *buf, const int size, const int key){
+int net_mod_socket_sendto_nowait(void *_socket, const void *buf, const int size, const int key, int reset, int data_set){
 	NetModSocket *sockt = (NetModSocket *)_socket;
-	return sockt->sendto_nowait(buf, size, key);
+	return sockt->sendto_nowait(buf, size, key, reset, data_set);
 }
 
 /**
@@ -78,23 +78,23 @@
  * @port 浠庤皝鍝噷鏀跺埌鐨勪俊鎭�
  * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
 */
-int net_mod_socket_recvfrom(void *_socket, void **buf, int *size, int *key){
+int net_mod_socket_recvfrom(void *_socket, void **buf, int *size, int *key, int reset, int data_set){
 	int rv;
 	NetModSocket *sockt = (NetModSocket *)_socket;
 
-	rv = sockt->recvfrom(buf, size, key);
+	rv = sockt->recvfrom(buf, size, key, reset, data_set);
 	return rv;
 }
 
 // 鎺ュ彈淇℃伅瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *key, int sec, int nsec){
+int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *key, int sec, int nsec, int reset, int data_set){
   NetModSocket *sockt = (NetModSocket *)_socket;
-  return sockt->recvfrom_timeout(buf, size, key, sec, nsec);
+  return sockt->recvfrom_timeout(buf, size, key, sec, nsec, reset, data_set);
 }
 
-int net_mod_socket_recvfrom_nowait(void *_socket, void **buf, int *size, int *key){
+int net_mod_socket_recvfrom_nowait(void *_socket, void **buf, int *size, int *key, int reset, int data_set){
   NetModSocket *sockt = (NetModSocket *)_socket;
-  return sockt->recvfrom_nowait(buf, size, key);
+  return sockt->recvfrom_nowait(buf, size, key, reset, data_set);
 }
 
 int net_mod_socket_sendandrecv(void *_socket, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
@@ -108,6 +108,26 @@
   return sockt->bind_proc_id(proc_id, len);
 }
 
+void net_mod_socket_int_set(void * _socket, int data) {
+  NetModSocket *sockt = (NetModSocket *)_socket;
+  sockt->int_set(data);
+}
+
+void net_mod_socket_svr_set(void * _socket, int data) {
+  NetModSocket *sockt = (NetModSocket *)_socket;
+  sockt->svr_set(data);
+}
+
+int net_mod_socket_int_get(void * _socket) {
+  NetModSocket *sockt = (NetModSocket *)_socket;
+  return sockt->int_get();
+}
+
+int net_mod_socket_svr_get(void * _socket) {
+  NetModSocket *sockt = (NetModSocket *)_socket;
+  return sockt->svr_get();
+}
+
 /**
  * 濡傛灉寤虹珛杩炴帴鐨勮妭鐐规病鏈夋帴鍙楀埌娑堟伅绛夊緟timeout鐨勬椂闂村悗杩斿洖
  * @timeout 绛夊緟鏃堕棿锛屽崟浣嶆槸鍗冨垎涔嬩竴绉�
diff --git a/src/net/net_mod_socket_wrapper.h b/src/net/net_mod_socket_wrapper.h
index b869510..d68a23e 100644
--- a/src/net/net_mod_socket_wrapper.h
+++ b/src/net/net_mod_socket_wrapper.h
@@ -67,7 +67,7 @@
  *
  * @return 0鏄垚鍔燂紝 鍏朵粬鍊兼槸澶辫触鐨勯敊璇爜
  */
-int net_mod_socket_sendto(void *_socket, const void *buf, const int size, const int key);
+int net_mod_socket_sendto(void *_socket, const void *buf, const int size, const int key, int reset = 0, int data_set = 0);
 
 /**
  * @brief  鍙戦�佷俊鎭紝鍦ㄦ寚瀹氭椂闂村唴娌″彂閫佸畬鎴愪篃杩斿洖銆� 
@@ -80,7 +80,7 @@
  *
  * @return 0鏄垚鍔燂紝 鍏朵粬鍊兼槸澶辫触鐨勯敊璇爜
  */
-int net_mod_socket_sendto_timeout(void *_socket, const void *buf, const int size, const int key, int sec, int nsec);
+int net_mod_socket_sendto_timeout(void *_socket, const void *buf, const int size, const int key, int sec, int nsec, int reset = 0, int data_set = 0);
 
 /**
  * @brief 鍙戦�佷俊鎭紝鏃犺鏄惁鍙戦�佸畬鎴愮珛鍒昏繑鍥炪��
@@ -91,7 +91,7 @@
  *
  * @return 0鏄垚鍔燂紝 鍏朵粬鍊兼槸澶辫触鐨勯敊璇爜
  */
-int net_mod_socket_sendto_nowait(void *_socket, const void *buf, const int size, const int key);
+int net_mod_socket_sendto_nowait(void *_socket, const void *buf, const int size, const int key, int reset = 0, int data_set = 0);
 
 /**
  * @brief 绛夊緟鎺ユ敹淇℃伅锛岀洿鍒版湁娑堟伅鎺ュ彈鍒版墠杩斿洖
@@ -102,7 +102,7 @@
  * 
  * @return 0鏄垚鍔燂紝 鍏朵粬鍊兼槸澶辫触鐨勯敊璇爜
  */
-int net_mod_socket_recvfrom(void *_socket, void **buf, int *size, int *key);
+int net_mod_socket_recvfrom(void *_socket, void **buf, int *size, int *key, int reset = 0, int data_set = 0);
 
 /**
  * @brief 绛夊緟鎺ユ敹淇℃伅锛屽湪鎸囧畾鐨勬椂闂村唴鍗充娇娌℃湁鎺ュ彈鍒版秷鎭篃瑕佽繑鍥�
@@ -115,7 +115,7 @@
  * 
  * @return 0鏄垚鍔燂紝 鍏朵粬鍊兼槸澶辫触鐨勯敊璇爜
  */
-int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *key, int sec, int nsec);
+int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *key, int sec, int nsec, int reset = 0, int data_set = 0);
 
 /**
  * @brief 绛夊緟鎺ユ敹淇℃伅锛岀洿鍒版湁娑堟伅鎺ュ彈鍒版墠杩斿洖
@@ -126,10 +126,12 @@
  * 
  * @return 0鏄垚鍔燂紝鍏朵粬鍊兼槸澶辫触鐨勯敊璇爜
  */
-int net_mod_socket_recvfrom_nowait(void *_socket, void **buf, int *size, int *key);
+int net_mod_socket_recvfrom_nowait(void *_socket, void **buf, int *size, int *key, int reset = 0, int data_set = 0);
 
-
-
+void net_mod_socket_int_set(void * _socket, int data);
+void net_mod_socket_svr_set(void * _socket, int data);
+int net_mod_socket_int_get(void * _socket);
+int net_mod_socket_svr_get(void * _socket);
 
 /**
  * @brief 璺ㄦ満鍣ㄥ彂閫佹秷鎭苟鎺ュ彈杩斿洖鐨勫簲绛旀秷鎭紝鐩村埌鍙戦�佸畬鎴愭墠杩斿洖
diff --git a/src/proc_def.h b/src/proc_def.h
index 1d84362..2b3f57b 100644
--- a/src/proc_def.h
+++ b/src/proc_def.h
@@ -35,6 +35,8 @@
   char name[MAX_STR_LEN];
   char public_info[MAX_STR_LEN]; 
   char private_info[MAX_STR_LEN];
+  char int_info[MAX_STR_LEN];
+  char svr_info[MAX_STR_LEN];
 #endif
 } ProcInfo;
 
@@ -65,6 +67,8 @@
 }
 #endif
 
+#define INT_STR     0x01
+#define SVR_STR     0x02
 
 #endif  //end of file
 
diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp
index 8b022e1..d5e757d 100644
--- a/src/socket/bus_server_socket.cpp
+++ b/src/socket/bus_server_socket.cpp
@@ -302,6 +302,7 @@
   int count = 0;
   int i = 0;
   int len = 0;
+  int data1, data2;
   char *data_ptr;
   ProcInfo Data_stru;
   ProcZone::iterator proc_iter;
@@ -333,6 +334,13 @@
       
       memcpy(Data_stru.private_info, buf + count, strlen(buf + count) + 1);
       count += strlen(buf + count) + 1;
+
+      memcpy(Data_stru.int_info, buf + count, strlen(buf + count) + 1); 
+      count += strlen(buf + count) + 1;
+    
+      memcpy(Data_stru.svr_info, buf + count, strlen(buf + count) + 1); 
+      count += strlen(buf + count) + 1;
+
     }
 
     ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
@@ -362,6 +370,9 @@
 
       if ((proc_iter = proc->find(key)) != proc->end()) {
 
+        data1 = atoi((proc_iter->second).int_info);
+        data2 = atoi((proc_iter->second).svr_info);
+        BusServerSocket::_data_remove(data1, data2);
         len = (sizeof(buf_temp) - 1) > strlen((proc_iter->second).proc_id) ? strlen((proc_iter->second).proc_id) : (sizeof(buf_temp) - 1);
         strncpy(buf_temp, (proc_iter->second).proc_id, len);
         proc->erase(proc_iter);
@@ -504,7 +515,9 @@
 
     free(last_buf);
   } else if (flag == PROC_QUE_STCS) {
+
     SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY);
+    ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
 
     strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
     if ((svr_tcs_iter = SvrData->find(buf_temp)) != SvrData->end()) {
@@ -512,6 +525,9 @@
     
       for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) { 
         count = *svr_proc_iter;
+        if ((proc_iter = proc->find(count)) != proc->end()) {
+          count = atoi((proc_iter->second).svr_info);
+        }
 
         break;
       }
@@ -770,3 +786,40 @@
 
 	return rv;
 }
+
+void BusServerSocket::_data_remove(int val1, int val2) {
+
+  int i;
+  LockFreeQueue<shm_packet_t> *queue = NULL;
+  hashtable_t *hashtable = mm_get_hashtable();
+
+  void *data_ptr1 = hashtable_get(hashtable, val1);
+  void *data_ptr2 = hashtable_get(hashtable, val2);
+  if (data_ptr1 != NULL) {
+    if (data_ptr1 != (void *)1) {
+      queue = (LockFreeQueue<shm_packet_t> *)data_ptr1;
+      queue->close();
+      for (i = 0; i < queue->size(); i++) {
+        mm_free((*queue)[i].buf);
+      }
+      sleep(1);
+    }
+
+    hashtable_remove(hashtable, val1);
+  }
+
+  if (data_ptr2 != NULL) {
+    if (data_ptr2 != (void *)1) {
+      queue = (LockFreeQueue<shm_packet_t> *)data_ptr2;
+      queue->close();
+      for (i = 0; i < queue->size(); i++) {
+        mm_free((*queue)[i].buf);
+      }
+      sleep(1);
+    }
+
+    hashtable_remove(hashtable, val2);
+  }
+
+}
+
diff --git a/src/socket/bus_server_socket.h b/src/socket/bus_server_socket.h
index 3052c8b..e60c700 100644
--- a/src/socket/bus_server_socket.h
+++ b/src/socket/bus_server_socket.h
@@ -81,6 +81,7 @@
 	 */
 	int get_key() ;
 
+  void _data_remove(int val1, int val2);
 
 };
 
diff --git a/src/socket/shm_mod_socket.cpp b/src/socket/shm_mod_socket.cpp
index a94b9c3..7562d56 100644
--- a/src/socket/shm_mod_socket.cpp
+++ b/src/socket/shm_mod_socket.cpp
@@ -166,8 +166,8 @@
  * @key 鍙戦�佺粰璋�
  * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
  */
-int ShmModSocket::sendto(const void *buf, const int size, const int key, const struct timespec *timeout, int flag) {
-	int rv = shm_sendto(shm_socket, buf, size, key, timeout, flag);
+int ShmModSocket::sendto(const void *buf, const int size, const int key, const struct timespec *timeout, int flag, int reset, int data_set) {
+	int rv = shm_sendto(shm_socket, buf, size, key, timeout, flag, reset, data_set);
   if(rv == 0) {
 	  logger->debug("ShmModSocket::sendto: %d sendto %d success.\n", get_key(), key);
 	  return 0;
@@ -182,9 +182,9 @@
  * @key 浠庤皝鍝噷鏀跺埌鐨勪俊鎭�
  * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
 */
-int ShmModSocket::recvfrom( void **buf, int *size, int *key, const struct timespec *timeout, int flag) {
+int ShmModSocket::recvfrom( void **buf, int *size, int *key, const struct timespec *timeout, int flag, int reset, int data_set) {
 
-  int rv =  shm_recvfrom(shm_socket, buf, size, key, timeout, flag);
+  int rv =  shm_recvfrom(shm_socket, buf, size, key, timeout, flag, reset, data_set);
 
 	if(rv == 0) {
     logger->debug("ShmModSocket::recvfrom: %d recvfrom %d success.\n", get_key(), *key);
diff --git a/src/socket/shm_mod_socket.h b/src/socket/shm_mod_socket.h
index da02fab..5e234bf 100644
--- a/src/socket/shm_mod_socket.h
+++ b/src/socket/shm_mod_socket.h
@@ -64,22 +64,11 @@
 
   int bind_proc_id(char *buf, int len);
   int reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag);
-	/**
-	 * 鍙戦�佷俊鎭�
-	 * @key 鍙戦�佺粰璋�
-	 * @flag BUS_TIMEOUT_FLAG  BUS_NOWAIT_FLAG
-	 * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
-	 */
- 
-	int sendto(const void *buf, const int size, const int key, const struct timespec *timeout = NULL, int flag = 0);
- 
+	
+  int sendto(const void *buf, const int size, const int key, const struct timespec *timeout = NULL, int flag = 0, int reset = 0, int data_set = 0);
 
-	/**
-	 * 鎺ユ敹淇℃伅
-	 * @key 浠庤皝鍝噷鏀跺埌鐨勪俊鎭�
-	 * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
-	*/
-	int recvfrom(void **buf, int *size, int *key,  const struct timespec *timeout = NULL, int flag = 0);
+  int recvfrom(void **buf, int *size, int *key,  const struct timespec *timeout = NULL, int flag = 0, int reset = 0, int data_set = 0);
+
 	/**
 	 * 鍙戦�佽姹備俊鎭苟绛夊緟鎺ユ敹搴旂瓟
 	 * @key 鍙戦�佺粰璋�
diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp
index 84bf77e..dc6d752 100644
--- a/src/socket/shm_socket.cpp
+++ b/src/socket/shm_socket.cpp
@@ -23,11 +23,12 @@
 static void _destrory_threadlocal_socket_(void *tmp_socket);
 static void _create_threadlocal_socket_key_(void);
 
-static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *recvpak ,  const struct timespec *timeout,  int flag);
+static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *recvpak ,  const struct timespec *timeout,  
+                int flag, int reset = 0, int data_set = 0);
 
    
-static int shm_sendpakto(shm_socket_t *sockt,  shm_packet_t *sendpak,
-               const int key, const struct timespec *timeout, const int flag);
+static int shm_sendpakto(shm_socket_t *sockt,  shm_packet_t *sendpak, const int key, const struct timespec *timeout, 
+                const int flag, int reset = 0, int data_set = 0);
 
 
 static int _shm_sendandrecv_uuid(shm_socket_t *sockt, const void *send_buf,
@@ -183,20 +184,24 @@
 }
 
 // 鐭繛鎺ユ柟寮忓彂閫�
-int shm_sendto(shm_socket_t *sockt, const void *buf, const int size,
-               const int key, const struct timespec *timeout, const int flag) {
+int shm_sendto(shm_socket_t *sockt, const void *buf, const int size, const int key, const struct timespec *timeout, 
+                    const int flag, int reset, int data_set) {
 
   int rv;
  
   shm_packet_t sendpak = {0};
-  sendpak.key = sockt->key;
+  if (reset == 0) {
+    sendpak.key = sockt->key;
+  } else {
+    sendpak.key = data_set;
+  }
   sendpak.size = size;
   if(buf != NULL) {
     sendpak.buf = mm_malloc(size);
     memcpy(sendpak.buf, buf, size);
   }
  
-  rv = shm_sendpakto(sockt, &sendpak, key, timeout, flag);
+  rv = shm_sendpakto(sockt, &sendpak, key, timeout, flag, reset, data_set);
   return rv;
 }
 
@@ -262,11 +267,11 @@
 }
 
 // 鐭繛鎺ユ柟寮忔帴鍙�
-int shm_recvfrom(shm_socket_t *sockt, void **buf, int *size, int *key,  const struct timespec *timeout,  int flag) {
+int shm_recvfrom(shm_socket_t *sockt, void **buf, int *size, int *key,  const struct timespec *timeout,  int flag, int reset, int data_set) {
   int rv;
   
   shm_packet_t recvpak;
-  rv = shm_recvpakfrom(sockt , &recvpak, timeout, flag);
+  rv = shm_recvpakfrom(sockt , &recvpak, timeout, flag, reset, data_set);
 
   if (rv != 0) {
 
@@ -544,15 +549,24 @@
  
 
    
-static int shm_sendpakto(shm_socket_t *sockt,  shm_packet_t *sendpak,
-               const int key, const struct timespec *timeout, const int flag) {
+static int shm_sendpakto(shm_socket_t *sockt,  shm_packet_t *sendpak, const int key, const struct timespec *timeout, 
+                          const int flag, int reset, int data_set) {
 
   int rv;
   shm_queue_status_t stRecord;
   LockFreeQueue<shm_packet_t> *remoteQueue;
+  LockFreeQueue<shm_packet_t> *fixedQueue;
   hashtable_t *hashtable = mm_get_hashtable();
 
-  if( sockt->queue != NULL) 
+  if ((reset != 0) && (data_set == 0)) {
+    return EBUS_KEY_INUSED;
+  }
+
+  if (reset != 0) {
+    fixedQueue = shm_socket_attach_queue(data_set);
+  }
+
+  if (((reset == 0) && (sockt->queue != NULL)) || ((reset != 0) && (fixedQueue != NULL)))
     goto LABEL_PUSH;
 
   // if(hashtable_get_queue_count(hashtable) > QUEUE_COUNT_LIMIT) {
@@ -563,7 +577,7 @@
     if ((rv = pthread_mutex_lock(&(sockt->mutex))) != 0)
     err_exit(rv, "shm_sendto : pthread_mutex_lock");
     
-    if (sockt->queue == NULL) {
+    if ((sockt->queue == NULL) && (reset == 0)) {
       if (sockt->key == 0) {
         sockt->key = hashtable_alloc_key(hashtable);
       }
@@ -580,6 +594,16 @@
       // stRecord.createTime = time(NULL);
       // shmQueueStMap->insert({sockt->key, stRecord});
       
+    }
+
+    if ((fixedQueue == NULL) && (reset != 0)) {
+      fixedQueue = shm_socket_bind_queue(data_set, false);
+      if (fixedQueue == NULL ) {
+        logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key);
+        if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0)
+          err_exit(rv, "shm_sendto : pthread_mutex_unlock");
+        return EBUS_KEY_INUSED;
+      }
     }
 
     if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0)
@@ -611,7 +635,9 @@
     goto ERR_CLOSED;
   }
 
-  sendpak->key = sockt->key;
+  if (reset == 0) {
+    sendpak->key = sockt->key;
+  }
   rv = remoteQueue->push(*sendpak, timeout, flag);
 
   if(rv != 0) {
@@ -629,13 +655,23 @@
 }
 
 // 鐭繛鎺ユ柟寮忔帴鍙�
-static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *_recvpak ,  const struct timespec *timeout,  int flag) {
+static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *_recvpak ,  const struct timespec *timeout,
+                            int flag, int reset, int data_set) {
   int rv;
   shm_queue_status_t stRecord;
+  LockFreeQueue<shm_packet_t> *fixedQueue;
   hashtable_t *hashtable = mm_get_hashtable();
   shm_packet_t recvpak;
 
-  if( sockt->queue != NULL) 
+  if ((reset != 0) && (data_set == 0)) {
+    return EBUS_KEY_INUSED;
+  }
+
+  if (reset != 0) {
+    fixedQueue = shm_socket_attach_queue(data_set);
+  }
+
+  if (((sockt->queue != NULL) && (reset == 0)) || ((reset != 0) && (fixedQueue != NULL)))
     goto LABEL_POP;
 
   // if(hashtable_get_queue_count(hashtable) > QUEUE_COUNT_LIMIT) {
@@ -646,21 +682,33 @@
     if ((rv = pthread_mutex_lock(&(sockt->mutex))) != 0)
       err_exit(rv, "shm_recvfrom : pthread_mutex_lock");
  
-    if (sockt->key == 0) {
-      sockt->key = hashtable_alloc_key(hashtable);
-    }  
-    sockt->queue = shm_socket_bind_queue( sockt->key, sockt->force_bind);
-    if(sockt->queue  == NULL ) {
-      logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key);
-      if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0)
-        err_exit(rv, "shm_recvfrom : pthread_mutex_unlock");
-      return EBUS_KEY_INUSED;
+    if ((sockt->queue == NULL) && (reset == 0)) {
+      if (sockt->key == 0) {
+        sockt->key = hashtable_alloc_key(hashtable);
+      }  
+      sockt->queue = shm_socket_bind_queue( sockt->key, sockt->force_bind);
+      if(sockt->queue  == NULL ) {
+        logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key);
+        if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0)
+          err_exit(rv, "shm_recvfrom : pthread_mutex_unlock");
+        return EBUS_KEY_INUSED;
+      }
+      
+      // 鏍囪key瀵瑰簲鐨勭姸鎬� 锛屼负opened
+      // stRecord.status = SHM_QUEUE_ST_OPENED;
+      // stRecord.createTime = time(NULL);
+      // shmQueueStMap->insert({sockt->key, stRecord});
     }
-    
-    // 鏍囪key瀵瑰簲鐨勭姸鎬� 锛屼负opened
-    // stRecord.status = SHM_QUEUE_ST_OPENED;
-    // stRecord.createTime = time(NULL);
-    // shmQueueStMap->insert({sockt->key, stRecord});
+
+    if ((fixedQueue == NULL) && (reset != 0)) {
+      fixedQueue = shm_socket_bind_queue(data_set, false);
+      if (fixedQueue == NULL ) {
+        logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key);
+        if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0)
+          err_exit(rv, "shm_sendto : pthread_mutex_unlock");
+        return EBUS_KEY_INUSED;
+      }
+    }
     
     if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0)
       err_exit(rv, "shm_recvfrom : pthread_mutex_unlock");
@@ -669,7 +717,11 @@
   
 LABEL_POP:
 
-  rv = sockt->queue->pop(recvpak, timeout, flag);
+  if (reset == 0) {
+    rv = sockt->queue->pop(recvpak, timeout, flag);
+  } else {
+    rv = fixedQueue->pop(recvpak, timeout, flag);
+  }
   if(rv != 0) {
     if(rv == ETIMEDOUT) {
       return EBUS_TIMEOUT;
@@ -697,6 +749,10 @@
   count += strlen(ptr->public_info) + 1;
   memcpy(dst + count, ptr->private_info, strlen(ptr->private_info) + 1);
   count += strlen(ptr->private_info) + 1;
+  memcpy(dst + count, ptr->int_info, strlen(ptr->int_info) + 1);
+  count += strlen(ptr->int_info) + 1;
+  memcpy(dst + count, ptr->svr_info, strlen(ptr->svr_info) + 1);
+  count += strlen(ptr->svr_info) + 1;
 
   *counter = count;
 }
diff --git a/src/socket/shm_socket.h b/src/socket/shm_socket.h
index 8e874d1..2b50a11 100644
--- a/src/socket/shm_socket.h
+++ b/src/socket/shm_socket.h
@@ -66,9 +66,9 @@
 /**
  * @flags : BUS_NOWAIT_FLAG
  */
-int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int key, const struct timespec * timeout = NULL, const int flags=0);
+int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int key, const struct timespec * timeout = NULL, const int flags=0, int reset = 0, int data_set = 0);
 
-int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *key,  const struct timespec * timeout = NULL,  int flags=0);
+int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *key,  const struct timespec * timeout = NULL,  int flags=0, int reset = 0, int data_set = 0);
 
 int shm_sendandrecv(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size,  
 	const struct timespec * timeout = NULL,  int flags = 0);

--
Gitblit v1.8.0