From c479ef57baaaa28964fc3ec8d80ff99dffa7d49f Mon Sep 17 00:00:00 2001
From: fujuntang <fujuntang@smartai.com>
Date: 星期三, 10 十一月 2021 09:49:29 +0800
Subject: [PATCH] Fix the system hang issue when the app is killed contantly.

---
 src/net/net_mod_server_socket_wrapper.cpp |    2 
 src/bus_proxy_start.cpp                   |    3 
 src/shm/mm.cpp                            |   49 +++++-
 src/shm/hashtable.h                       |    3 
 src/shm/mm.h                              |    5 
 src/net/net_mod_socket.cpp                |    1 
 src/socket/shm_socket.cpp                 |   37 ++--
 src/msg_trigger/msg_mgr.h                 |   11 -
 src/socket/bus_server_socket.cpp          |   75 +++++++++-
 src/bh_api.h                              |    2 
 src/socket/bus_server_socket.h            |    4 
 src/bh_api.cpp                            |  130 ++++++++++++------
 src/proc_def.h                            |    3 
 src/socket/shm_mod_socket.cpp             |   13 +
 src/shm/hashtable.cpp                     |   56 +++++--
 15 files changed, 271 insertions(+), 123 deletions(-)

diff --git a/src/bh_api.cpp b/src/bh_api.cpp
index 37f8377..c74c80d 100644
--- a/src/bh_api.cpp
+++ b/src/bh_api.cpp
@@ -23,6 +23,7 @@
 
 static pthread_t gTids;
 
+
 static void *client_run_check(void *skptr) { 
   pthread_detach(pthread_self());
  
@@ -36,7 +37,7 @@
 
   sec = TIME_WAIT;
   nsec = 0;
-  sprintf(buf, "%s", "Success");
+  sprintf(buf, "%s", STR_EXEC);
   data = net_mod_socket_int_get(gNetmod_socket);
   while(true) {
     
@@ -45,9 +46,13 @@
       
       BHFree(buf_temp, size);
 
-      rv = net_mod_socket_sendto_timeout(gNetmod_socket, buf, strlen(buf), key, sec, nsec, SVR_STR, data);
-      if (rv != 0) {
-        logger->error("the process check response failed with error: %s!\n", bus_strerror(rv));
+      if ((gNetmod_socket != NULL) && (gRun_stat != 0)) {
+        rv = net_mod_socket_sendto_timeout(gNetmod_socket, buf, strlen(buf), key, sec, nsec, SVR_STR, data);
+        if (rv != 0) {
+          logger->error("the process check response failed with error: %s!\n", bus_strerror(rv));
+        }
+      } else {
+        break;
       }
       
     } else {
@@ -123,25 +128,25 @@
     shm_mm_wrapper_init(SHM_RES_SIZE);
     
 #if defined(PRO_DE_SERIALIZE)
-    if (_input.proc_id != NULL) {
+    if (strlen(_input.proc_id) > 0) {
       count = strlen(_input.proc_id) + 1;
       min = count > (MAX_STR_LEN - 1) ? (MAX_STR_LEN - 1) : count;
       strncpy(pData.proc_id, _input.proc_id, min);
     }
 
-    if (_input.name != NULL) {
+    if (strlen(_input.name) > 0) {
       count = strlen(_input.name) + 1;
       min = count > (MAX_STR_LEN - 1)? (MAX_STR_LEN -1) : count;
       strncpy(pData.name, _input.name, min); 
     }
 
-    if (_input.public_info != NULL) {
+    if (strlen(_input.public_info) > 0) {
       count = strlen(_input.public_info) + 1;
       min = count > (MAX_STR_LEN - 1)? (MAX_STR_LEN - 1) : count;
       strncpy(pData.public_info, _input.public_info, min);
     }
  
-    if (_input.private_info != NULL) {
+    if (strlen(_input.private_info) > 0) {
       count = strlen(_input.private_info) + 1;
       min = count > (MAX_STR_LEN - 1)? (MAX_STR_LEN - 1): count;
       strncpy(pData.private_info, _input.private_info, min);
@@ -172,11 +177,12 @@
     }
 #endif 
 
-    if (pData.proc_id == NULL) {
+    if (strlen(pData.proc_id) == 0) {
       rv = EBUS_INVALID_PARA;
 
       bus_errorset(rv);
 
+      gRun_stat = 0;
       pthread_mutex_unlock(&mutex);
       
       goto exit_entry;
@@ -185,13 +191,13 @@
     gNetmod_socket = net_mod_socket_open();
     hashtable_t *hashtable = mm_get_hashtable();
     key = hashtable_alloc_key(hashtable);
+    net_mod_socket_bind(gNetmod_socket, key);
     count = hashtable_alloc_key(hashtable);
     rv = hashtable_alloc_key(hashtable);
     net_mod_socket_int_set(gNetmod_socket, count);
     net_mod_socket_svr_set(gNetmod_socket, rv);
     sprintf(pData.int_info, "%d", count);
     sprintf(pData.svr_info, "%d", rv);
-    net_mod_socket_bind(gNetmod_socket, key);
   
     rv = net_mod_socket_reg(gNetmod_socket, &pData, sizeof(ProcInfo), NULL, 0, timeout_ms, PROC_REG);
 
@@ -539,21 +545,28 @@
 
   if (rv == 0) {
     
-    ptr = (ProcInfo_query *)((char *)buf + sizeof(int));
-    mtr_list_num = ptr->num;
+    min = *(int *)buf;
+    if (min > 0) {
+      ptr = (ProcInfo_query *)((char *)buf + sizeof(int));
+      mtr_list_num = ptr->num;
+      
+      if (mtr_list_num > sizeof(mtr_list) / sizeof(mtr_list[0])) {
+        mtr_list_num = sizeof(mtr_list) / sizeof(mtr_list[0]);
+      }
     
-    if (mtr_list_num > sizeof(mtr_list) / sizeof(mtr_list[0])) {
-      mtr_list_num = sizeof(mtr_list) / sizeof(mtr_list[0]);
+      Proc_ptr = &(ptr->procData);
+      for(int i = 0; i < mtr_list_num; i++) {
+        mtr_list[i].proc_id = (Proc_ptr + i)->proc_id;
+        mtr_list[i].mq_id = ID_RSV;
+        mtr_list[i].abs_addr = ABS_ID_RSV;
+        mtr_list[i].ip = "127.0.0.1";
+        mtr_list[i].port = 5000;
+      }
+    } else {
+      mtr_list_num = 0;
     }
-  
-    Proc_ptr = &(ptr->procData);
-    for(int i = 0; i < mtr_list_num; i++) {
-      mtr_list[i].proc_id = (Proc_ptr + i)->proc_id;
-      mtr_list[i].mq_id = ID_RSV;
-      mtr_list[i].abs_addr = ABS_ID_RSV;
-      mtr_list[i].ip = "127.0.0.1";
-      mtr_list[i].port = 5000;
-    }
+
+    free(buf);
   }
   
 exit_entry:
@@ -684,27 +697,31 @@
     if (mpr_list_num > (sizeof(mpr_list) / sizeof(mpr_list[0]))) {
       mpr_list_num = sizeof(mpr_list) / sizeof(mpr_list[0]);
     }
-    
-    Proc_ptr = (ProcInfo_sum *)((char *)buf + sizeof(int));
-    for(int i = 0; i < mpr_list_num; i++) {
-      mpr_list[i].proc_id = (Proc_ptr + i)->procData.proc_id;
-      mpr_list[i].name = (Proc_ptr + i)->procData.name;
-      mpr_list[i].public_info = (Proc_ptr + i)->procData.public_info;
-      mpr_list[i].private_info = (Proc_ptr + i)->procData.private_info;
-      mpr_list[i].online = (Proc_ptr + i)->stat;
-      mpr_list[i].topic_list_num = (Proc_ptr + i)->list_num;
-      
-      for(int j = 0; j < mpr_list[i].topic_list_num; j++)
-      {
-        if (j == 0) {
-          mpr_list[i].topic_list[j] = (Proc_ptr + i)->reg_info;
-        } else if (j == 1) {
-          mpr_list[i].topic_list[j] = (Proc_ptr + i)->local_info;
-        } else if (j == 2) {
-          mpr_list[i].topic_list[j] = (Proc_ptr + i)->net_info;
+   
+    if (mpr_list_num > 0) {
+      Proc_ptr = (ProcInfo_sum *)((char *)buf + sizeof(int));
+      for(int i = 0; i < mpr_list_num; i++) {
+        mpr_list[i].proc_id = (Proc_ptr + i)->procData.proc_id;
+        mpr_list[i].name = (Proc_ptr + i)->procData.name;
+        mpr_list[i].public_info = (Proc_ptr + i)->procData.public_info;
+        mpr_list[i].private_info = (Proc_ptr + i)->procData.private_info;
+        mpr_list[i].online = (Proc_ptr + i)->stat;
+        mpr_list[i].topic_list_num = (Proc_ptr + i)->list_num;
+        
+        for(int j = 0; j < mpr_list[i].topic_list_num; j++)
+        {
+          if (j == 0) {
+            mpr_list[i].topic_list[j] = (Proc_ptr + i)->reg_info;
+          } else if (j == 1) {
+            mpr_list[i].topic_list[j] = (Proc_ptr + i)->local_info;
+          } else if (j == 2) {
+            mpr_list[i].topic_list[j] = (Proc_ptr + i)->net_info;
+          }
         }
       }
     }
+
+    free(buf);
   }
     
   errString = bus_strerror(0, 1);
@@ -866,8 +883,8 @@
   ::bhome_msg::MsgCommonReply mcr;
 	mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
 	mcr.mutable_errmsg()->set_errstring(errString);
-	*reply_len=mcr.ByteSizeLong();
-	*reply=malloc(*reply_len);
+	*reply_len = mcr.ByteSizeLong();
+	*reply = malloc(*reply_len);
 	mcr.SerializePartialToArray(*reply,*reply_len);
 #else 
   len = strlen(errString) + 1;
@@ -1583,6 +1600,13 @@
 exit_entry:
   errString = bus_strerror(0, 1);
   
+  if (rv != 0) {
+    if ((proc_id != NULL) && (proc_id_len != NULL)) {
+      *proc_id_len = 0;
+      *proc_id = NULL;
+    }
+  }
+  
 #if defined(PRO_DE_SERIALIZE) 
   ::bhome_msg::MsgRequestTopicReply mrt; 
   mrt.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
@@ -1707,6 +1731,12 @@
   if (rv != 0) { 
     rrr.topic = STR_RSV;
     rrr.data = STR_RSV;   
+
+    if ((proc_id != NULL) && (proc_id_len != NULL)) {
+
+      *proc_id_len = 0;
+      *proc_id = NULL;
+    }
   }
 
 #if defined(PRO_DE_SERIALIZE)
@@ -1814,13 +1844,21 @@
 
 }
 
-#if defined(MSG_HANDLER)
 int inter_key_get(void)
 {
   if (gNetmod_socket != NULL)
     return net_mod_socket_get_key(gNetmod_socket);
 
-  return 0;
+  return SHM_BUS_KEY;
 }
-#endif 
+
+void *socket_data_get(void)
+{
+  return gNetmod_socket;
+}
+
+void inter_key_set(int key)
+{
+  net_mod_socket_bind(gNetmod_socket, key);
+}
 
diff --git a/src/bh_api.h b/src/bh_api.h
index a231b0c..2a9e39a 100644
--- a/src/bh_api.h
+++ b/src/bh_api.h
@@ -110,6 +110,8 @@
 int BHGetLastError(void **msg, int *msg_len);
 
 int inter_key_get(void);
+void inter_key_set(int key);
+void *socket_data_get(void);
 #ifdef __cplusplus
 }
 #endif
diff --git a/src/bus_proxy_start.cpp b/src/bus_proxy_start.cpp
index c5eaaf5..6029e6e 100644
--- a/src/bus_proxy_start.cpp
+++ b/src/bus_proxy_start.cpp
@@ -10,6 +10,7 @@
 #include <errno.h>
 #include <getopt.h>
 #include <stdlib.h>
+#include "proc_def.h"
 #include "msg_mgr.h"
 
 using namespace std;
@@ -103,7 +104,7 @@
       }
     }
 
-    sleep(10);
+    sleep(WT_INT);
   }
 
   return NULL;
diff --git a/src/msg_trigger/msg_mgr.h b/src/msg_trigger/msg_mgr.h
index 809808b..686d47c 100644
--- a/src/msg_trigger/msg_mgr.h
+++ b/src/msg_trigger/msg_mgr.h
@@ -1,9 +1,7 @@
 #ifndef __MSG_MGR_DEF_
 #define __MSG_MGR_DEF_
 
-#ifdef __cplusplus
-extern "C" {
-#endif 
+#include "shm_allocator.h"
 
 #define SEM_TYPE_ID   0
 #define RSV_TYPE_ID   1
@@ -37,16 +35,15 @@
 
 } Msg_info;
 
-#ifdef __cplusplus
-}
-#endif
-
 int msg_init(void);
 void msg_distrib(int msg_id, Msg_info *message);
 int get_msg_info(int msg_id, Msg_info *message);
 void *sem_msg_handler(void *skptr);
 void msg_info_set(int index, Msg_info msg_obj);
 
+typedef std::set<int, std::less<int>, SHM_STL_Allocator<int> > recvbuf_val;
+typedef std::map<int, recvbuf_val *, std::less<int>, SHM_STL_Allocator<std::pair<int, recvbuf_val *> > > recvbuf_data;
+
 #endif  //end of file
 
 
diff --git a/src/net/net_mod_server_socket_wrapper.cpp b/src/net/net_mod_server_socket_wrapper.cpp
index e1ad04d..13b7f37 100644
--- a/src/net/net_mod_server_socket_wrapper.cpp
+++ b/src/net/net_mod_server_socket_wrapper.cpp
@@ -2,13 +2,11 @@
 #include "net_mod_server_socket_wrapper.h"
 
 void *net_mod_server_socket_open(int port) {
-	printf("====net_mod_server_socket_open\n");
 	NetModServerSocket *sockt = new NetModServerSocket(port);
 	return (void *)sockt;
 }
 
 void net_mod_server_socket_close(void *_sockt) {
-	printf("====net_mod_server_socket_close\n");
 	NetModServerSocket *sockt = (NetModServerSocket *)_sockt;
 	delete sockt;
 
diff --git a/src/net/net_mod_socket.cpp b/src/net/net_mod_socket.cpp
index c48c33b..acd9053 100644
--- a/src/net/net_mod_socket.cpp
+++ b/src/net/net_mod_socket.cpp
@@ -121,7 +121,6 @@
   if (mpool == NULL)
   {
     /* If first call from this thread, allocate buffer for thread, and save its location */
-    logger->debug("Create connPool");
     mpool = new NetConnPool();
     if (mpool == NULL) {
       LoggerFactory::getLogger()->error(errno, "NetModSocket::_sendandrecv_ malloc");
diff --git a/src/proc_def.h b/src/proc_def.h
index 0fa2fe9..4b6bf41 100644
--- a/src/proc_def.h
+++ b/src/proc_def.h
@@ -16,11 +16,13 @@
 #define PROC_QUE_TCS    4
 #define PROC_QUE_STCS   5
 #define PROC_QUE_ATCS   6
+#define PROC_REG_BUF    7
 
 #define ID_RSV          16
 #define ABS_ID_RSV      18
 
 #define STR_MAGIC       ","
+#define STR_EXEC        "Success"
 
 typedef struct _ProcInfo {
 #if 0
@@ -64,6 +66,7 @@
 } ProcInfo_query;
 
 #define STR_RSV   "empty"
+#define WT_INT    10
 
 #ifdef __cplusplus
 }
diff --git a/src/shm/hashtable.cpp b/src/shm/hashtable.cpp
index 62d052e..14daaf0 100755
--- a/src/shm/hashtable.cpp
+++ b/src/shm/hashtable.cpp
@@ -2,6 +2,7 @@
 #include "hashtable.h"
 #include "mm.h"
 #include "svsem.h"
+#include "bh_api.h"
 #include "logger_factory.h"
 #include <set>
 #include <functional>
@@ -52,11 +53,17 @@
   }
   else
   {
-
     TAILQ_FOREACH(item, my_tailq_head, joint)
     {
-      if (key == item->key)
+      if ((item != NULL) && (key == item->key)) {
         return item->value;
+      } else {
+        mm_free(my_tailq_head);
+        hashtable->array[code] = NULL;
+        hashtable->queueCount--;
+
+        return NULL;
+      }
     }
   }
   return NULL;
@@ -71,7 +78,12 @@
   tailq_header_t *my_tailq_head = hashtable->array[code] ;
   if ( my_tailq_head == NULL)
   {
+    if (inter_key_get() == 0) {
+      inter_key_set(key);
+    }
+
     my_tailq_head  = (tailq_header_t*) mm_malloc(sizeof(tailq_header_t ));
+
     TAILQ_INIT(my_tailq_head);
     hashtable->array[code] = my_tailq_head;
     goto putnew;
@@ -79,27 +91,30 @@
 
   TAILQ_FOREACH(item, my_tailq_head, joint)
   {
-    if (key ==item->key)
+    if ((item != NULL) && (key == item->key))
     {
       oldvalue = item -> value;
       item->key= key;
-      item -> value = value;
+      item->value = value;
       return oldvalue;
-    }
+    } 
   }
 putnew:
+
+  if (inter_key_get() == 0) {
+    inter_key_set(key);
+  }
   item = (tailq_entry_t *) mm_malloc(sizeof(tailq_entry_t));
   item->key = key;
-  item -> value = value;
+  item->value = value;
   TAILQ_INSERT_TAIL(my_tailq_head, item, joint);
   return NULL;
 }
 
-void *hashtable_remove(hashtable_t *hashtable, int key)
+void hashtable_remove(hashtable_t *hashtable, int key)
 {
   size_t code = hashcode(key);
   tailq_entry_t *item;
-  void *oldvalue;
   int rv;
 
   if( (rv = svsem_uni_wait(hashtable->mutex)) != 0) {
@@ -111,29 +126,31 @@
     if((rv = svsem_post(hashtable->mutex)) != 0) {
       LoggerFactory::getLogger()->error(errno, "hashtable_remove\n");
     }
-    return NULL;
+    return;
   } else {
-    for (item = TAILQ_FIRST(my_tailq_head); item != NULL; item = TAILQ_NEXT(item, joint))
+    for (item = TAILQ_FIRST(my_tailq_head); item != NULL;)
     {
-      if (key == item->key)
-      {
-        oldvalue = item->value;
-        /* Remove the item from the tail queue. */
-        TAILQ_REMOVE(my_tailq_head, item, joint);
+      /* Remove the item from the tail queue. */
+      TAILQ_REMOVE(my_tailq_head, item, joint);
 
-        /* mm_free the item as we don't need it anymore. */
-        mm_free(item);
+      /* mm_free the item as we don't need it anymore. */
+      mm_free(item);
+
+      item = TAILQ_NEXT(item, joint);
+      if (item == NULL) {
+        mm_free(my_tailq_head);
+        hashtable->array[code] = NULL;
         hashtable->queueCount--;
         svsem_post(hashtable->mutex);
-        return oldvalue;
       }
+      return;
     }
 
     if((rv = svsem_post(hashtable->mutex)) != 0) {
       LoggerFactory::getLogger()->error(errno, "hashtable_remove\n");
     }
 
-    return NULL;
+    return;
   }
 }
 
@@ -217,6 +234,7 @@
     key++;
   }
   // 鍗犵敤key
+
   _hashtable_put(hashtable, key, (void *)1);
 
   hashtable->currentKey = key;
diff --git a/src/shm/hashtable.h b/src/shm/hashtable.h
index 7bb6eac..8909ae5 100755
--- a/src/shm/hashtable.h
+++ b/src/shm/hashtable.h
@@ -31,7 +31,8 @@
 void hashtable_put(hashtable_t *hashtable, int key, void *value) ;
 bool  hashtable_check_put(hashtable_t *hashtable, int key, void *value, bool overwrite) ;
 
-void *hashtable_remove(hashtable_t *hashtable, int key);
+static inline void _hashtable_remove(hashtable_t *hashtable, int key);
+void hashtable_remove(hashtable_t *hashtable, int key);
 void hashtable_removeall(hashtable_t *hashtable);
 int hashtable_get_queue_count(hashtable_t *hashtable) ;
 /** 
diff --git a/src/shm/mm.cpp b/src/shm/mm.cpp
index 13ec443..5837906 100644
--- a/src/shm/mm.cpp
+++ b/src/shm/mm.cpp
@@ -4,6 +4,7 @@
 #include "mm.h"
 #include "sem_util.h"
 #include "logger_factory.h"
+#include "bh_api.h"
 #include <sys/sem.h>
 #include <sys/shm.h>
 
@@ -135,10 +136,10 @@
 /*
  * mm_free - Free a block
  */
-void mm_free(void *ptr)
+void mm_free(void *ptr, int enable)
 {
-  if (ptr == 0)
-    return;
+  if ((ptr == 0) || (*(size_t *)(ptr - SIZE_T_SIZE) == 0x00)) 
+    return; 
 
   /*
    *if (!is_allocated(ptr) ) {
@@ -147,15 +148,19 @@
    *}
    */
 
-  SemUtil::dec_uni(mutex);
+  if (enable == true) {
+    SemUtil::dec_uni(mutex);
+  }
+  ptr -= SIZE_T_SIZE;
   size_t size = GET_SIZE(HDRP(ptr));
   PUT(HDRP(ptr), PACK(size, 0));
   PUT(FTRP(ptr), PACK(size, 0));
+  *(size_t *)ptr = 0x00;
   coalesce(ptr);
-  SemUtil::inc(mutex);
+  if (enable == true) {
+    SemUtil::inc(mutex);
+  }
 }
-
-
 
 /*
  * mm_realloc - Naive implementation of realloc
@@ -389,15 +394,13 @@
   PUT(HDRP(bp), PACK(size, 0));         /* Free block header */   //line:vm:mm:freeblockhdr
   PUT(FTRP(bp), PACK(size, 0));         /* Free block footer */   //line:vm:mm:freeblockftr
   PUT(HDRP(NEXT_BLKP(bp)), PACK(0, 1)); /* New epilogue header */ //line:vm:mm:newepihdr
-
+  
   /* Coalesce if the previous block was free */
   return coalesce(bp);                                          //line:vm:mm:returnblock
 }
 
 static void insert_fblock (void *bp)
 {
-  //鍚庤繘鍏堝嚭鐨勬柟寮忔彃鍏ワ紝鍗虫彃鍏ラ摼琛ㄥご浣嶇疆
-
   // insert into the header of the free list
   PUT_PTR(SUCCRP(bp), NEXT_FBLKP(heap_listp)); //the successor of bp point to the old first free block
   PUT_PTR(PREDRP(NEXT_FBLKP(heap_listp)), bp); //the predecessor of the old first free block point to bp
@@ -489,7 +492,10 @@
     PUT(FTRP(bp), PACK(csize, 1));
     rm_fblock(bp);
   }
-  return bp;
+
+  *(size_t *)bp = inter_key_get();
+  
+  return (bp + SIZE_T_SIZE);
 }
 
 static int is_allocated(void *ptr)
@@ -514,6 +520,24 @@
 
 }
 
+void find_mm_data(int val)
+{
+  void *bp = heap_listp;
+
+  SemUtil::dec(mutex);
+  for (bp = heap_listp; GET_SIZE(HDRP(bp)) > 0; bp = NEXT_BLKP(bp))
+  {
+    if (GET_ALLOC(HDRP(bp))) {
+      if ((*(size_t *)bp) == val) {
+        mm_free(bp + SIZE_T_SIZE, false);
+      }
+    }
+  }
+  SemUtil::inc(mutex);
+
+  return;
+}
+
 /*
  * find_fit - Find a fit for a block with size bytes
  */
@@ -526,6 +550,9 @@
     if (!GET_ALLOC(HDRP(bp)) && (size <= GET_SIZE(HDRP(bp))))
     {
       return bp;
+    } else if (GET_ALLOC(HDRP(bp)) && (GET_SIZE(HDRP(bp)) == 0)) 
+    {
+      break;
     }
   }
   return NULL; /* No fit */
diff --git a/src/shm/mm.h b/src/shm/mm.h
index 6dbb979..d7d9a84 100644
--- a/src/shm/mm.h
+++ b/src/shm/mm.h
@@ -8,9 +8,10 @@
 extern bool mm_init(size_t heap_size);
 extern bool mm_destroy(void);
 
-extern void *mm_malloc (size_t size);
-extern void mm_free (void *ptr);
+void *mm_malloc (size_t size);
+void mm_free (void *ptr, int enable = true);
 extern void *mm_realloc(void *ptr, size_t size);
+extern void find_mm_data(int val);
 
 extern void * mm_get_by_key(int key);
 
diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp
index 315c356..0a44949 100644
--- a/src/socket/bus_server_socket.cpp
+++ b/src/socket/bus_server_socket.cpp
@@ -2,6 +2,7 @@
 #include "bus_server_socket.h"
 #include "shm_mod_socket.h"
 #include "shm_socket.h"
+#include "msg_mgr.h"
 #include "bus_error.h"
 
 static Logger *logger = LoggerFactory::getLogger();
@@ -303,7 +304,7 @@
   LinkNode *pNew = NULL;
   LinkNode *pCur = NULL;
  
-  pNew = new(LinkNode);
+  pNew = (LinkNode *)malloc(sizeof(LinkNode));
   pNew->data = aData;
   pNew->data_fix = bData;
   pNew->count = 0;
@@ -340,7 +341,7 @@
     
     head = pCur->next;
     
-    delete(pCur);
+    free(pCur);
     
     pCur = head;
     
@@ -353,7 +354,7 @@
       pCur->next = pNext->next;
       pCur = pNext->next;
 
-      delete(pNext);
+      free(pNext);
     } else {
     
       pCur = pNext;
@@ -559,7 +560,10 @@
         procQuePart->erase(buf_temp);
       }
 
+      BusServerSocket::buf_data_remove(key);
+      find_mm_data(key);
     }
+
   } else if (flag == PROC_REG_TCS) {
     ProcTcsMap *proc = shm_mm_attach<ProcTcsMap>(SHM_BUS_PROC_TCS_MAP_KEY);
     SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY);
@@ -709,7 +713,7 @@
     sprintf(data_buf, "%d", count);
     shm_sendto(shm_socket, data_buf, strlen(data_buf), key, &timeout, BUS_TIMEOUT_FLAG);
 
-  } else {
+  } else if (flag == PROC_QUE_ATCS) {
 
     int val;
     int temp = 0;
@@ -853,6 +857,17 @@
     shm_sendto(shm_socket, last_buf, temp + sizeof(int), key, &timeout, BUS_TIMEOUT_FLAG);
 
     free(last_buf);
+  } else {
+
+    char *ptr = NULL;
+    strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
+
+    data1 = atoi(buf_temp);
+    ptr = strstr(buf_temp, STR_MAGIC);
+    if (ptr != NULL) {
+      data2 = atoi(ptr + 1);
+    }
+    BusServerSocket::buf_data_set(data1, data2); 
   }
 }
 
@@ -888,7 +903,7 @@
 	int key;
   int flag;
   char buf_temp[MAX_STR_LEN] = { 0x00 };
-	char * action, *topic, *topics, *buf, *content;
+	char *action, *topic, *topics, *buf, *content;
 	size_t head_len;
 	bus_head_t head;
     int val;
@@ -935,7 +950,8 @@
 		} 
     else if ((strcmp(action, "reg") == 0) || (strcmp(action, "unreg") == 0) \
             || (strcmp(action, "tcsreg") == 0) || (strcmp(action, "tcsque") == 0) \
-            || (strcmp(action, "stcsque") == 0) || (strcmp(action, "atcsque") == 0)) {
+            || (strcmp(action, "stcsque") == 0) || (strcmp(action, "atcsque") == 0) \
+            || (strcmp(action, "bufreg") == 0)) {
       content = topics + head.topic_size;
       if (strcmp(action, "reg") == 0) {
         
@@ -957,15 +973,19 @@
         
         flag = PROC_QUE_STCS; 
 
-      } else {
+      } else if (strcmp(action, "atcsque") == 0) {
         
         flag = PROC_QUE_ATCS;
+
+      } else {
+        
+        flag = PROC_REG_BUF;
 
       }
         
       if (flag == PROC_REG) {
         memcpy(buf_temp, content, strlen(content) + 1);
-
+        
         if ((proc_que_iter = procQuePart->find(buf_temp)) != procQuePart->end()) {
 
           val = proc_que_iter->second;
@@ -996,6 +1016,7 @@
   hashtable_t *hashtable = mm_get_hashtable();
 
   void *data_ptr = hashtable_get(hashtable, val);
+
   if (data_ptr != NULL) {
     if (data_ptr != (void *)1) {
       queue = (LockFreeQueue<shm_packet_t> *)data_ptr;
@@ -1011,3 +1032,41 @@
 
 }
 
+void BusServerSocket::buf_data_set(int data, int val) {
+  recvbuf_val *val_buf;
+  recvbuf_data::iterator data_iter;
+  recvbuf_val::iterator val_iter;
+
+  if ((data_iter = recvBuf_data.find(data)) != recvBuf_data.end()) {
+    val_buf = data_iter->second;
+  } else {
+    void *set_ptr = mm_malloc(sizeof(recvbuf_val));
+
+    val_buf = new(set_ptr) recvbuf_val;
+    recvBuf_data.insert({data, val_buf}); 
+  }
+
+  val_buf->insert(val);
+}
+
+void BusServerSocket::buf_data_remove(int data) {
+
+  int val;
+  recvbuf_val *val_buf;
+  recvbuf_data::iterator data_iter;
+  recvbuf_val::iterator val_iter;
+
+  if ((data_iter = recvBuf_data.find(data)) != recvBuf_data.end()) {
+    
+    val_buf = data_iter->second;
+    for(val_iter = val_buf->begin(); val_iter != val_buf->end(); ++val_iter) {
+      val = *val_iter;
+
+      BusServerSocket::_data_remove(val);
+    }
+
+    recvBuf_data.erase(data);
+  }
+}
+
+
diff --git a/src/socket/bus_server_socket.h b/src/socket/bus_server_socket.h
index 7b94ba1..4cd0140 100644
--- a/src/socket/bus_server_socket.h
+++ b/src/socket/bus_server_socket.h
@@ -8,6 +8,7 @@
 #include "sem_util.h"
 #include "logger_factory.h"
 #include "key_def.h"
+#include "msg_mgr.h"
 #include "socket_def.h"
 #include <set>
 
@@ -62,6 +63,7 @@
   // pthread_t recv_thread;
   // <涓婚锛� 璁㈤槄鑰�>
 	SHMTopicSubMap *topic_sub_map;
+  recvbuf_data recvBuf_data;
 
 private:
 	int  destroy();
@@ -122,6 +124,8 @@
 	int get_key() ;
 
   void _data_remove(int val);
+  void buf_data_set(int data, int val);
+  void buf_data_remove(int data);
 
 };
 
diff --git a/src/socket/shm_mod_socket.cpp b/src/socket/shm_mod_socket.cpp
index 6139d34..6058308 100644
--- a/src/socket/shm_mod_socket.cpp
+++ b/src/socket/shm_mod_socket.cpp
@@ -69,6 +69,10 @@
 
     memcpy(head.action, "atcsque", sizeof(head.action));
 
+  } else if (flag == PROC_REG_BUF) {
+
+    memcpy(head.action, "bufreg", sizeof(head.action));
+
   } else {
 
     return -1;
@@ -115,7 +119,7 @@
     
     ts.tv_nsec = (timeout_ms - ts.tv_sec * 1000) * 1000 * 1000;
   
-    if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS)) {
+    if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS) || (flag == PROC_REG_BUF)) {
 
       ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, BUS_TIMEOUT_FLAG);
 
@@ -127,7 +131,7 @@
   
   } else if (timeout_ms == 0) {
   
-    if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS)) {
+    if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS) || (flag == PROC_REG_BUF)) {
 
       ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, BUS_NOWAIT_FLAG);
 
@@ -139,7 +143,7 @@
  
   } else {
 
-    if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS)) {
+    if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS) || (flag == PROC_REG_BUF)) {
     
       ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, -1);
 
@@ -165,7 +169,6 @@
 int ShmModSocket::sendto(const void *buf, const int size, const int key, const struct timespec *timeout, int flag, int reset, int data_set) {
 	int rv = shm_sendto(shm_socket, buf, size, key, timeout, flag, reset, data_set);
   if(rv == 0) {
-	  logger->debug("ShmModSocket::sendto: %d sendto %d success.\n", get_key(), key);
 	  return 0;
   }
 
@@ -183,7 +186,6 @@
   int rv =  shm_recvfrom(shm_socket, buf, size, key, timeout, flag, reset, data_set);
 
 	if(rv == 0) {
-    logger->debug("ShmModSocket::recvfrom: %d recvfrom %d success.\n", get_key(), *key);
     return 0;
   }
 
@@ -202,7 +204,6 @@
 	int rv = shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, flag);
 
 	if(rv == 0) {
-	  logger->debug("ShmModSocket::sendandrecv:  sendandrecv to %d success.\n", send_key);
 	  return 0;
   }
 
diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp
index 709505f..ae9a98b 100644
--- a/src/socket/shm_socket.cpp
+++ b/src/socket/shm_socket.cpp
@@ -2,10 +2,12 @@
 #include "socket_def.h"
 #include "hashtable.h"
 #include "logger_factory.h"
+#include "net_mod_socket_wrapper.h"
 #include <map>
 #include <cassert>
 #include "bus_error.h"
 #include "sole.h"
+#include "bh_api.h"
 #include "shm_mm.h"
 #include "key_def.h"
 
@@ -105,9 +107,6 @@
 
 
 static int _shm_socket_close_(shm_socket_t *sockt) {
-  
-  int rv, i;
-  hashtable_t *hashtable = mm_get_hashtable();
 
   // if(sockt->key != 0) {
   //   auto it =  shmQueueStMap->find(sockt->key);
@@ -117,18 +116,6 @@
   //   }
   // }
 
-  if(sockt->queue != NULL) {
-    sockt->queue->close();
-    for( i = 0; i < sockt->queue->size(); i++) {
-      mm_free((*(sockt->queue))[i].buf);
-      logger->info("======= %d free queue element buf\n", sockt->key);
-    }
-    sleep(1);
-
-    hashtable_remove(hashtable, sockt->key);
-  //   sockt->queue = NULL;
-  }
- 
   pthread_mutex_destroy(&(sockt->mutex) );
   free(sockt);
   return 0;
@@ -404,13 +391,16 @@
                     const int send_size, const int key, void **recv_buf,
                     int *recv_size,  const struct timespec *timeout,  int flags) {
   
- 
+  int data;
+  int timeout_ms;
+  char data_buf[MAX_STR_LEN] = { 0x00 };
   int rv = 0, tryn = 16;
   shm_packet_t sendpak;
   shm_packet_t recvpak;
   std::map<int, shm_packet_t>::iterator recvbufIter;
   shm_socket_t *tmp_socket = NULL;
- 
+  hashtable_t *hashtable = mm_get_hashtable();
+
   rv = pthread_once(&_once_, _create_threadlocal_socket_key_);
   if (rv != 0) {
     logger->error(rv, "shm_sendandrecv pthread_once");
@@ -421,7 +411,15 @@
   if (tmp_socket == NULL)
   {
     tmp_socket = shm_socket_open(SHM_SOCKET_DGRAM);
-    
+   
+    tmp_socket->key = hashtable_alloc_key(hashtable);
+    data = inter_key_get();
+    timeout_ms = timeout->tv_sec * 1000 + 3000;
+    sprintf(data_buf, "%d, %d", data, tmp_socket->key);
+    if (socket_data_get() != NULL) {
+      net_mod_socket_reg(socket_data_get(), data_buf, strlen(data_buf), NULL, 0, timeout_ms, PROC_REG_BUF);
+    }
+
     rv =  pthread_setspecific(_localthread_socket_key_, tmp_socket);
     if ( rv != 0) {
       logger->error(rv, "shm_sendandrecv : pthread_setspecific");
@@ -564,6 +562,7 @@
       if (sockt->key == 0) {
         sockt->key = hashtable_alloc_key(hashtable);
       }
+
       sockt->queue = shm_socket_bind_queue( sockt->key, sockt->force_bind);
       if(sockt->queue  == NULL ) {
         logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key);
@@ -729,7 +728,7 @@
   count += strlen(ptr->int_info) + 1;
   memcpy(dst + count, ptr->svr_info, strlen(ptr->svr_info) + 1);
   count += strlen(ptr->svr_info) + 1;
-
+  
   *counter = count;
 }
 

--
Gitblit v1.8.0