fujuntang
2021-11-10 c479ef57baaaa28964fc3ec8d80ff99dffa7d49f
Fix the system hang issue when the app is killed contantly.
15个文件已修改
394 ■■■■■ 已修改文件
src/bh_api.cpp 130 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bh_api.h 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bus_proxy_start.cpp 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/msg_trigger/msg_mgr.h 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_server_socket_wrapper.cpp 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket.cpp 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/proc_def.h 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm/hashtable.cpp 56 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm/hashtable.h 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm/mm.cpp 49 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm/mm.h 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/bus_server_socket.cpp 75 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/bus_server_socket.h 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_mod_socket.cpp 13 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.cpp 37 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bh_api.cpp
@@ -23,6 +23,7 @@
static pthread_t gTids;
static void *client_run_check(void *skptr) { 
  pthread_detach(pthread_self());
 
@@ -36,7 +37,7 @@
  sec = TIME_WAIT;
  nsec = 0;
  sprintf(buf, "%s", "Success");
  sprintf(buf, "%s", STR_EXEC);
  data = net_mod_socket_int_get(gNetmod_socket);
  while(true) {
    
@@ -45,9 +46,13 @@
      
      BHFree(buf_temp, size);
      rv = net_mod_socket_sendto_timeout(gNetmod_socket, buf, strlen(buf), key, sec, nsec, SVR_STR, data);
      if (rv != 0) {
        logger->error("the process check response failed with error: %s!\n", bus_strerror(rv));
      if ((gNetmod_socket != NULL) && (gRun_stat != 0)) {
        rv = net_mod_socket_sendto_timeout(gNetmod_socket, buf, strlen(buf), key, sec, nsec, SVR_STR, data);
        if (rv != 0) {
          logger->error("the process check response failed with error: %s!\n", bus_strerror(rv));
        }
      } else {
        break;
      }
      
    } else {
@@ -123,25 +128,25 @@
    shm_mm_wrapper_init(SHM_RES_SIZE);
    
#if defined(PRO_DE_SERIALIZE)
    if (_input.proc_id != NULL) {
    if (strlen(_input.proc_id) > 0) {
      count = strlen(_input.proc_id) + 1;
      min = count > (MAX_STR_LEN - 1) ? (MAX_STR_LEN - 1) : count;
      strncpy(pData.proc_id, _input.proc_id, min);
    }
    if (_input.name != NULL) {
    if (strlen(_input.name) > 0) {
      count = strlen(_input.name) + 1;
      min = count > (MAX_STR_LEN - 1)? (MAX_STR_LEN -1) : count;
      strncpy(pData.name, _input.name, min); 
    }
    if (_input.public_info != NULL) {
    if (strlen(_input.public_info) > 0) {
      count = strlen(_input.public_info) + 1;
      min = count > (MAX_STR_LEN - 1)? (MAX_STR_LEN - 1) : count;
      strncpy(pData.public_info, _input.public_info, min);
    }
 
    if (_input.private_info != NULL) {
    if (strlen(_input.private_info) > 0) {
      count = strlen(_input.private_info) + 1;
      min = count > (MAX_STR_LEN - 1)? (MAX_STR_LEN - 1): count;
      strncpy(pData.private_info, _input.private_info, min);
@@ -172,11 +177,12 @@
    }
#endif 
    if (pData.proc_id == NULL) {
    if (strlen(pData.proc_id) == 0) {
      rv = EBUS_INVALID_PARA;
      bus_errorset(rv);
      gRun_stat = 0;
      pthread_mutex_unlock(&mutex);
      
      goto exit_entry;
@@ -185,13 +191,13 @@
    gNetmod_socket = net_mod_socket_open();
    hashtable_t *hashtable = mm_get_hashtable();
    key = hashtable_alloc_key(hashtable);
    net_mod_socket_bind(gNetmod_socket, key);
    count = hashtable_alloc_key(hashtable);
    rv = hashtable_alloc_key(hashtable);
    net_mod_socket_int_set(gNetmod_socket, count);
    net_mod_socket_svr_set(gNetmod_socket, rv);
    sprintf(pData.int_info, "%d", count);
    sprintf(pData.svr_info, "%d", rv);
    net_mod_socket_bind(gNetmod_socket, key);
  
    rv = net_mod_socket_reg(gNetmod_socket, &pData, sizeof(ProcInfo), NULL, 0, timeout_ms, PROC_REG);
@@ -539,21 +545,28 @@
  if (rv == 0) {
    
    ptr = (ProcInfo_query *)((char *)buf + sizeof(int));
    mtr_list_num = ptr->num;
    min = *(int *)buf;
    if (min > 0) {
      ptr = (ProcInfo_query *)((char *)buf + sizeof(int));
      mtr_list_num = ptr->num;
      if (mtr_list_num > sizeof(mtr_list) / sizeof(mtr_list[0])) {
        mtr_list_num = sizeof(mtr_list) / sizeof(mtr_list[0]);
      }
    
    if (mtr_list_num > sizeof(mtr_list) / sizeof(mtr_list[0])) {
      mtr_list_num = sizeof(mtr_list) / sizeof(mtr_list[0]);
      Proc_ptr = &(ptr->procData);
      for(int i = 0; i < mtr_list_num; i++) {
        mtr_list[i].proc_id = (Proc_ptr + i)->proc_id;
        mtr_list[i].mq_id = ID_RSV;
        mtr_list[i].abs_addr = ABS_ID_RSV;
        mtr_list[i].ip = "127.0.0.1";
        mtr_list[i].port = 5000;
      }
    } else {
      mtr_list_num = 0;
    }
    Proc_ptr = &(ptr->procData);
    for(int i = 0; i < mtr_list_num; i++) {
      mtr_list[i].proc_id = (Proc_ptr + i)->proc_id;
      mtr_list[i].mq_id = ID_RSV;
      mtr_list[i].abs_addr = ABS_ID_RSV;
      mtr_list[i].ip = "127.0.0.1";
      mtr_list[i].port = 5000;
    }
    free(buf);
  }
  
exit_entry:
@@ -684,27 +697,31 @@
    if (mpr_list_num > (sizeof(mpr_list) / sizeof(mpr_list[0]))) {
      mpr_list_num = sizeof(mpr_list) / sizeof(mpr_list[0]);
    }
    Proc_ptr = (ProcInfo_sum *)((char *)buf + sizeof(int));
    for(int i = 0; i < mpr_list_num; i++) {
      mpr_list[i].proc_id = (Proc_ptr + i)->procData.proc_id;
      mpr_list[i].name = (Proc_ptr + i)->procData.name;
      mpr_list[i].public_info = (Proc_ptr + i)->procData.public_info;
      mpr_list[i].private_info = (Proc_ptr + i)->procData.private_info;
      mpr_list[i].online = (Proc_ptr + i)->stat;
      mpr_list[i].topic_list_num = (Proc_ptr + i)->list_num;
      for(int j = 0; j < mpr_list[i].topic_list_num; j++)
      {
        if (j == 0) {
          mpr_list[i].topic_list[j] = (Proc_ptr + i)->reg_info;
        } else if (j == 1) {
          mpr_list[i].topic_list[j] = (Proc_ptr + i)->local_info;
        } else if (j == 2) {
          mpr_list[i].topic_list[j] = (Proc_ptr + i)->net_info;
    if (mpr_list_num > 0) {
      Proc_ptr = (ProcInfo_sum *)((char *)buf + sizeof(int));
      for(int i = 0; i < mpr_list_num; i++) {
        mpr_list[i].proc_id = (Proc_ptr + i)->procData.proc_id;
        mpr_list[i].name = (Proc_ptr + i)->procData.name;
        mpr_list[i].public_info = (Proc_ptr + i)->procData.public_info;
        mpr_list[i].private_info = (Proc_ptr + i)->procData.private_info;
        mpr_list[i].online = (Proc_ptr + i)->stat;
        mpr_list[i].topic_list_num = (Proc_ptr + i)->list_num;
        for(int j = 0; j < mpr_list[i].topic_list_num; j++)
        {
          if (j == 0) {
            mpr_list[i].topic_list[j] = (Proc_ptr + i)->reg_info;
          } else if (j == 1) {
            mpr_list[i].topic_list[j] = (Proc_ptr + i)->local_info;
          } else if (j == 2) {
            mpr_list[i].topic_list[j] = (Proc_ptr + i)->net_info;
          }
        }
      }
    }
    free(buf);
  }
    
  errString = bus_strerror(0, 1);
@@ -866,8 +883,8 @@
  ::bhome_msg::MsgCommonReply mcr;
    mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
    mcr.mutable_errmsg()->set_errstring(errString);
    *reply_len=mcr.ByteSizeLong();
    *reply=malloc(*reply_len);
    *reply_len = mcr.ByteSizeLong();
    *reply = malloc(*reply_len);
    mcr.SerializePartialToArray(*reply,*reply_len);
#else 
  len = strlen(errString) + 1;
@@ -1583,6 +1600,13 @@
exit_entry:
  errString = bus_strerror(0, 1);
  
  if (rv != 0) {
    if ((proc_id != NULL) && (proc_id_len != NULL)) {
      *proc_id_len = 0;
      *proc_id = NULL;
    }
  }
#if defined(PRO_DE_SERIALIZE) 
  ::bhome_msg::MsgRequestTopicReply mrt; 
  mrt.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
@@ -1707,6 +1731,12 @@
  if (rv != 0) { 
    rrr.topic = STR_RSV;
    rrr.data = STR_RSV;   
    if ((proc_id != NULL) && (proc_id_len != NULL)) {
      *proc_id_len = 0;
      *proc_id = NULL;
    }
  }
#if defined(PRO_DE_SERIALIZE)
@@ -1814,13 +1844,21 @@
}
#if defined(MSG_HANDLER)
int inter_key_get(void)
{
  if (gNetmod_socket != NULL)
    return net_mod_socket_get_key(gNetmod_socket);
  return 0;
  return SHM_BUS_KEY;
}
#endif
void *socket_data_get(void)
{
  return gNetmod_socket;
}
void inter_key_set(int key)
{
  net_mod_socket_bind(gNetmod_socket, key);
}
src/bh_api.h
@@ -110,6 +110,8 @@
int BHGetLastError(void **msg, int *msg_len);
int inter_key_get(void);
void inter_key_set(int key);
void *socket_data_get(void);
#ifdef __cplusplus
}
#endif
src/bus_proxy_start.cpp
@@ -10,6 +10,7 @@
#include <errno.h>
#include <getopt.h>
#include <stdlib.h>
#include "proc_def.h"
#include "msg_mgr.h"
using namespace std;
@@ -103,7 +104,7 @@
      }
    }
    sleep(10);
    sleep(WT_INT);
  }
  return NULL;
src/msg_trigger/msg_mgr.h
@@ -1,9 +1,7 @@
#ifndef __MSG_MGR_DEF_
#define __MSG_MGR_DEF_
#ifdef __cplusplus
extern "C" {
#endif
#include "shm_allocator.h"
#define SEM_TYPE_ID   0
#define RSV_TYPE_ID   1
@@ -37,16 +35,15 @@
} Msg_info;
#ifdef __cplusplus
}
#endif
int msg_init(void);
void msg_distrib(int msg_id, Msg_info *message);
int get_msg_info(int msg_id, Msg_info *message);
void *sem_msg_handler(void *skptr);
void msg_info_set(int index, Msg_info msg_obj);
typedef std::set<int, std::less<int>, SHM_STL_Allocator<int> > recvbuf_val;
typedef std::map<int, recvbuf_val *, std::less<int>, SHM_STL_Allocator<std::pair<int, recvbuf_val *> > > recvbuf_data;
#endif  //end of file
src/net/net_mod_server_socket_wrapper.cpp
@@ -2,13 +2,11 @@
#include "net_mod_server_socket_wrapper.h"
void *net_mod_server_socket_open(int port) {
    printf("====net_mod_server_socket_open\n");
    NetModServerSocket *sockt = new NetModServerSocket(port);
    return (void *)sockt;
}
void net_mod_server_socket_close(void *_sockt) {
    printf("====net_mod_server_socket_close\n");
    NetModServerSocket *sockt = (NetModServerSocket *)_sockt;
    delete sockt;
src/net/net_mod_socket.cpp
@@ -121,7 +121,6 @@
  if (mpool == NULL)
  {
    /* If first call from this thread, allocate buffer for thread, and save its location */
    logger->debug("Create connPool");
    mpool = new NetConnPool();
    if (mpool == NULL) {
      LoggerFactory::getLogger()->error(errno, "NetModSocket::_sendandrecv_ malloc");
src/proc_def.h
@@ -16,11 +16,13 @@
#define PROC_QUE_TCS    4
#define PROC_QUE_STCS   5
#define PROC_QUE_ATCS   6
#define PROC_REG_BUF    7
#define ID_RSV          16
#define ABS_ID_RSV      18
#define STR_MAGIC       ","
#define STR_EXEC        "Success"
typedef struct _ProcInfo {
#if 0
@@ -64,6 +66,7 @@
} ProcInfo_query;
#define STR_RSV   "empty"
#define WT_INT    10
#ifdef __cplusplus
}
src/shm/hashtable.cpp
@@ -2,6 +2,7 @@
#include "hashtable.h"
#include "mm.h"
#include "svsem.h"
#include "bh_api.h"
#include "logger_factory.h"
#include <set>
#include <functional>
@@ -52,11 +53,17 @@
  }
  else
  {
    TAILQ_FOREACH(item, my_tailq_head, joint)
    {
      if (key == item->key)
      if ((item != NULL) && (key == item->key)) {
        return item->value;
      } else {
        mm_free(my_tailq_head);
        hashtable->array[code] = NULL;
        hashtable->queueCount--;
        return NULL;
      }
    }
  }
  return NULL;
@@ -71,7 +78,12 @@
  tailq_header_t *my_tailq_head = hashtable->array[code] ;
  if ( my_tailq_head == NULL)
  {
    if (inter_key_get() == 0) {
      inter_key_set(key);
    }
    my_tailq_head  = (tailq_header_t*) mm_malloc(sizeof(tailq_header_t ));
    TAILQ_INIT(my_tailq_head);
    hashtable->array[code] = my_tailq_head;
    goto putnew;
@@ -79,27 +91,30 @@
  TAILQ_FOREACH(item, my_tailq_head, joint)
  {
    if (key ==item->key)
    if ((item != NULL) && (key == item->key))
    {
      oldvalue = item -> value;
      item->key= key;
      item -> value = value;
      item->value = value;
      return oldvalue;
    }
    }
  }
putnew:
  if (inter_key_get() == 0) {
    inter_key_set(key);
  }
  item = (tailq_entry_t *) mm_malloc(sizeof(tailq_entry_t));
  item->key = key;
  item -> value = value;
  item->value = value;
  TAILQ_INSERT_TAIL(my_tailq_head, item, joint);
  return NULL;
}
void *hashtable_remove(hashtable_t *hashtable, int key)
void hashtable_remove(hashtable_t *hashtable, int key)
{
  size_t code = hashcode(key);
  tailq_entry_t *item;
  void *oldvalue;
  int rv;
  if( (rv = svsem_uni_wait(hashtable->mutex)) != 0) {
@@ -111,29 +126,31 @@
    if((rv = svsem_post(hashtable->mutex)) != 0) {
      LoggerFactory::getLogger()->error(errno, "hashtable_remove\n");
    }
    return NULL;
    return;
  } else {
    for (item = TAILQ_FIRST(my_tailq_head); item != NULL; item = TAILQ_NEXT(item, joint))
    for (item = TAILQ_FIRST(my_tailq_head); item != NULL;)
    {
      if (key == item->key)
      {
        oldvalue = item->value;
        /* Remove the item from the tail queue. */
        TAILQ_REMOVE(my_tailq_head, item, joint);
      /* Remove the item from the tail queue. */
      TAILQ_REMOVE(my_tailq_head, item, joint);
        /* mm_free the item as we don't need it anymore. */
        mm_free(item);
      /* mm_free the item as we don't need it anymore. */
      mm_free(item);
      item = TAILQ_NEXT(item, joint);
      if (item == NULL) {
        mm_free(my_tailq_head);
        hashtable->array[code] = NULL;
        hashtable->queueCount--;
        svsem_post(hashtable->mutex);
        return oldvalue;
      }
      return;
    }
    if((rv = svsem_post(hashtable->mutex)) != 0) {
      LoggerFactory::getLogger()->error(errno, "hashtable_remove\n");
    }
    return NULL;
    return;
  }
}
@@ -217,6 +234,7 @@
    key++;
  }
  // 占用key
  _hashtable_put(hashtable, key, (void *)1);
  hashtable->currentKey = key;
src/shm/hashtable.h
@@ -31,7 +31,8 @@
void hashtable_put(hashtable_t *hashtable, int key, void *value) ;
bool  hashtable_check_put(hashtable_t *hashtable, int key, void *value, bool overwrite) ;
void *hashtable_remove(hashtable_t *hashtable, int key);
static inline void _hashtable_remove(hashtable_t *hashtable, int key);
void hashtable_remove(hashtable_t *hashtable, int key);
void hashtable_removeall(hashtable_t *hashtable);
int hashtable_get_queue_count(hashtable_t *hashtable) ;
/** 
src/shm/mm.cpp
@@ -4,6 +4,7 @@
#include "mm.h"
#include "sem_util.h"
#include "logger_factory.h"
#include "bh_api.h"
#include <sys/sem.h>
#include <sys/shm.h>
@@ -135,10 +136,10 @@
/*
 * mm_free - Free a block
 */
void mm_free(void *ptr)
void mm_free(void *ptr, int enable)
{
  if (ptr == 0)
    return;
  if ((ptr == 0) || (*(size_t *)(ptr - SIZE_T_SIZE) == 0x00))
    return;
  /*
   *if (!is_allocated(ptr) ) {
@@ -147,15 +148,19 @@
   *}
   */
  SemUtil::dec_uni(mutex);
  if (enable == true) {
    SemUtil::dec_uni(mutex);
  }
  ptr -= SIZE_T_SIZE;
  size_t size = GET_SIZE(HDRP(ptr));
  PUT(HDRP(ptr), PACK(size, 0));
  PUT(FTRP(ptr), PACK(size, 0));
  *(size_t *)ptr = 0x00;
  coalesce(ptr);
  SemUtil::inc(mutex);
  if (enable == true) {
    SemUtil::inc(mutex);
  }
}
/*
 * mm_realloc - Naive implementation of realloc
@@ -389,15 +394,13 @@
  PUT(HDRP(bp), PACK(size, 0));         /* Free block header */   //line:vm:mm:freeblockhdr
  PUT(FTRP(bp), PACK(size, 0));         /* Free block footer */   //line:vm:mm:freeblockftr
  PUT(HDRP(NEXT_BLKP(bp)), PACK(0, 1)); /* New epilogue header */ //line:vm:mm:newepihdr
  /* Coalesce if the previous block was free */
  return coalesce(bp);                                          //line:vm:mm:returnblock
}
static void insert_fblock (void *bp)
{
  //后进先出的方式插入,即插入链表头位置
  // insert into the header of the free list
  PUT_PTR(SUCCRP(bp), NEXT_FBLKP(heap_listp)); //the successor of bp point to the old first free block
  PUT_PTR(PREDRP(NEXT_FBLKP(heap_listp)), bp); //the predecessor of the old first free block point to bp
@@ -489,7 +492,10 @@
    PUT(FTRP(bp), PACK(csize, 1));
    rm_fblock(bp);
  }
  return bp;
  *(size_t *)bp = inter_key_get();
  return (bp + SIZE_T_SIZE);
}
static int is_allocated(void *ptr)
@@ -514,6 +520,24 @@
}
void find_mm_data(int val)
{
  void *bp = heap_listp;
  SemUtil::dec(mutex);
  for (bp = heap_listp; GET_SIZE(HDRP(bp)) > 0; bp = NEXT_BLKP(bp))
  {
    if (GET_ALLOC(HDRP(bp))) {
      if ((*(size_t *)bp) == val) {
        mm_free(bp + SIZE_T_SIZE, false);
      }
    }
  }
  SemUtil::inc(mutex);
  return;
}
/*
 * find_fit - Find a fit for a block with size bytes
 */
@@ -526,6 +550,9 @@
    if (!GET_ALLOC(HDRP(bp)) && (size <= GET_SIZE(HDRP(bp))))
    {
      return bp;
    } else if (GET_ALLOC(HDRP(bp)) && (GET_SIZE(HDRP(bp)) == 0))
    {
      break;
    }
  }
  return NULL; /* No fit */
src/shm/mm.h
@@ -8,9 +8,10 @@
extern bool mm_init(size_t heap_size);
extern bool mm_destroy(void);
extern void *mm_malloc (size_t size);
extern void mm_free (void *ptr);
void *mm_malloc (size_t size);
void mm_free (void *ptr, int enable = true);
extern void *mm_realloc(void *ptr, size_t size);
extern void find_mm_data(int val);
extern void * mm_get_by_key(int key);
src/socket/bus_server_socket.cpp
@@ -2,6 +2,7 @@
#include "bus_server_socket.h"
#include "shm_mod_socket.h"
#include "shm_socket.h"
#include "msg_mgr.h"
#include "bus_error.h"
static Logger *logger = LoggerFactory::getLogger();
@@ -303,7 +304,7 @@
  LinkNode *pNew = NULL;
  LinkNode *pCur = NULL;
 
  pNew = new(LinkNode);
  pNew = (LinkNode *)malloc(sizeof(LinkNode));
  pNew->data = aData;
  pNew->data_fix = bData;
  pNew->count = 0;
@@ -340,7 +341,7 @@
    
    head = pCur->next;
    
    delete(pCur);
    free(pCur);
    
    pCur = head;
    
@@ -353,7 +354,7 @@
      pCur->next = pNext->next;
      pCur = pNext->next;
      delete(pNext);
      free(pNext);
    } else {
    
      pCur = pNext;
@@ -559,7 +560,10 @@
        procQuePart->erase(buf_temp);
      }
      BusServerSocket::buf_data_remove(key);
      find_mm_data(key);
    }
  } else if (flag == PROC_REG_TCS) {
    ProcTcsMap *proc = shm_mm_attach<ProcTcsMap>(SHM_BUS_PROC_TCS_MAP_KEY);
    SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY);
@@ -709,7 +713,7 @@
    sprintf(data_buf, "%d", count);
    shm_sendto(shm_socket, data_buf, strlen(data_buf), key, &timeout, BUS_TIMEOUT_FLAG);
  } else {
  } else if (flag == PROC_QUE_ATCS) {
    int val;
    int temp = 0;
@@ -853,6 +857,17 @@
    shm_sendto(shm_socket, last_buf, temp + sizeof(int), key, &timeout, BUS_TIMEOUT_FLAG);
    free(last_buf);
  } else {
    char *ptr = NULL;
    strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
    data1 = atoi(buf_temp);
    ptr = strstr(buf_temp, STR_MAGIC);
    if (ptr != NULL) {
      data2 = atoi(ptr + 1);
    }
    BusServerSocket::buf_data_set(data1, data2);
  }
}
@@ -888,7 +903,7 @@
    int key;
  int flag;
  char buf_temp[MAX_STR_LEN] = { 0x00 };
    char * action, *topic, *topics, *buf, *content;
    char *action, *topic, *topics, *buf, *content;
    size_t head_len;
    bus_head_t head;
    int val;
@@ -935,7 +950,8 @@
        } 
    else if ((strcmp(action, "reg") == 0) || (strcmp(action, "unreg") == 0) \
            || (strcmp(action, "tcsreg") == 0) || (strcmp(action, "tcsque") == 0) \
            || (strcmp(action, "stcsque") == 0) || (strcmp(action, "atcsque") == 0)) {
            || (strcmp(action, "stcsque") == 0) || (strcmp(action, "atcsque") == 0) \
            || (strcmp(action, "bufreg") == 0)) {
      content = topics + head.topic_size;
      if (strcmp(action, "reg") == 0) {
        
@@ -957,15 +973,19 @@
        
        flag = PROC_QUE_STCS; 
      } else {
      } else if (strcmp(action, "atcsque") == 0) {
        
        flag = PROC_QUE_ATCS;
      } else {
        flag = PROC_REG_BUF;
      }
        
      if (flag == PROC_REG) {
        memcpy(buf_temp, content, strlen(content) + 1);
        if ((proc_que_iter = procQuePart->find(buf_temp)) != procQuePart->end()) {
          val = proc_que_iter->second;
@@ -996,6 +1016,7 @@
  hashtable_t *hashtable = mm_get_hashtable();
  void *data_ptr = hashtable_get(hashtable, val);
  if (data_ptr != NULL) {
    if (data_ptr != (void *)1) {
      queue = (LockFreeQueue<shm_packet_t> *)data_ptr;
@@ -1011,3 +1032,41 @@
}
void BusServerSocket::buf_data_set(int data, int val) {
  recvbuf_val *val_buf;
  recvbuf_data::iterator data_iter;
  recvbuf_val::iterator val_iter;
  if ((data_iter = recvBuf_data.find(data)) != recvBuf_data.end()) {
    val_buf = data_iter->second;
  } else {
    void *set_ptr = mm_malloc(sizeof(recvbuf_val));
    val_buf = new(set_ptr) recvbuf_val;
    recvBuf_data.insert({data, val_buf});
  }
  val_buf->insert(val);
}
void BusServerSocket::buf_data_remove(int data) {
  int val;
  recvbuf_val *val_buf;
  recvbuf_data::iterator data_iter;
  recvbuf_val::iterator val_iter;
  if ((data_iter = recvBuf_data.find(data)) != recvBuf_data.end()) {
    val_buf = data_iter->second;
    for(val_iter = val_buf->begin(); val_iter != val_buf->end(); ++val_iter) {
      val = *val_iter;
      BusServerSocket::_data_remove(val);
    }
    recvBuf_data.erase(data);
  }
}
src/socket/bus_server_socket.h
@@ -8,6 +8,7 @@
#include "sem_util.h"
#include "logger_factory.h"
#include "key_def.h"
#include "msg_mgr.h"
#include "socket_def.h"
#include <set>
@@ -62,6 +63,7 @@
  // pthread_t recv_thread;
  // <主题, 订阅者>
    SHMTopicSubMap *topic_sub_map;
  recvbuf_data recvBuf_data;
private:
    int  destroy();
@@ -122,6 +124,8 @@
    int get_key() ;
  void _data_remove(int val);
  void buf_data_set(int data, int val);
  void buf_data_remove(int data);
};
src/socket/shm_mod_socket.cpp
@@ -69,6 +69,10 @@
    memcpy(head.action, "atcsque", sizeof(head.action));
  } else if (flag == PROC_REG_BUF) {
    memcpy(head.action, "bufreg", sizeof(head.action));
  } else {
    return -1;
@@ -115,7 +119,7 @@
    
    ts.tv_nsec = (timeout_ms - ts.tv_sec * 1000) * 1000 * 1000;
  
    if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS)) {
    if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS) || (flag == PROC_REG_BUF)) {
      ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, BUS_TIMEOUT_FLAG);
@@ -127,7 +131,7 @@
  
  } else if (timeout_ms == 0) {
  
    if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS)) {
    if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS) || (flag == PROC_REG_BUF)) {
      ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, BUS_NOWAIT_FLAG);
@@ -139,7 +143,7 @@
 
  } else {
    if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS)) {
    if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS) || (flag == PROC_REG_BUF)) {
    
      ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, -1);
@@ -165,7 +169,6 @@
int ShmModSocket::sendto(const void *buf, const int size, const int key, const struct timespec *timeout, int flag, int reset, int data_set) {
    int rv = shm_sendto(shm_socket, buf, size, key, timeout, flag, reset, data_set);
  if(rv == 0) {
      logger->debug("ShmModSocket::sendto: %d sendto %d success.\n", get_key(), key);
      return 0;
  }
@@ -183,7 +186,6 @@
  int rv =  shm_recvfrom(shm_socket, buf, size, key, timeout, flag, reset, data_set);
    if(rv == 0) {
    logger->debug("ShmModSocket::recvfrom: %d recvfrom %d success.\n", get_key(), *key);
    return 0;
  }
@@ -202,7 +204,6 @@
    int rv = shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, flag);
    if(rv == 0) {
      logger->debug("ShmModSocket::sendandrecv:  sendandrecv to %d success.\n", send_key);
      return 0;
  }
src/socket/shm_socket.cpp
@@ -2,10 +2,12 @@
#include "socket_def.h"
#include "hashtable.h"
#include "logger_factory.h"
#include "net_mod_socket_wrapper.h"
#include <map>
#include <cassert>
#include "bus_error.h"
#include "sole.h"
#include "bh_api.h"
#include "shm_mm.h"
#include "key_def.h"
@@ -105,9 +107,6 @@
static int _shm_socket_close_(shm_socket_t *sockt) {
  int rv, i;
  hashtable_t *hashtable = mm_get_hashtable();
  // if(sockt->key != 0) {
  //   auto it =  shmQueueStMap->find(sockt->key);
@@ -117,18 +116,6 @@
  //   }
  // }
  if(sockt->queue != NULL) {
    sockt->queue->close();
    for( i = 0; i < sockt->queue->size(); i++) {
      mm_free((*(sockt->queue))[i].buf);
      logger->info("======= %d free queue element buf\n", sockt->key);
    }
    sleep(1);
    hashtable_remove(hashtable, sockt->key);
  //   sockt->queue = NULL;
  }
  pthread_mutex_destroy(&(sockt->mutex) );
  free(sockt);
  return 0;
@@ -404,13 +391,16 @@
                    const int send_size, const int key, void **recv_buf,
                    int *recv_size,  const struct timespec *timeout,  int flags) {
  
  int data;
  int timeout_ms;
  char data_buf[MAX_STR_LEN] = { 0x00 };
  int rv = 0, tryn = 16;
  shm_packet_t sendpak;
  shm_packet_t recvpak;
  std::map<int, shm_packet_t>::iterator recvbufIter;
  shm_socket_t *tmp_socket = NULL;
  hashtable_t *hashtable = mm_get_hashtable();
  rv = pthread_once(&_once_, _create_threadlocal_socket_key_);
  if (rv != 0) {
    logger->error(rv, "shm_sendandrecv pthread_once");
@@ -421,7 +411,15 @@
  if (tmp_socket == NULL)
  {
    tmp_socket = shm_socket_open(SHM_SOCKET_DGRAM);
    tmp_socket->key = hashtable_alloc_key(hashtable);
    data = inter_key_get();
    timeout_ms = timeout->tv_sec * 1000 + 3000;
    sprintf(data_buf, "%d, %d", data, tmp_socket->key);
    if (socket_data_get() != NULL) {
      net_mod_socket_reg(socket_data_get(), data_buf, strlen(data_buf), NULL, 0, timeout_ms, PROC_REG_BUF);
    }
    rv =  pthread_setspecific(_localthread_socket_key_, tmp_socket);
    if ( rv != 0) {
      logger->error(rv, "shm_sendandrecv : pthread_setspecific");
@@ -564,6 +562,7 @@
      if (sockt->key == 0) {
        sockt->key = hashtable_alloc_key(hashtable);
      }
      sockt->queue = shm_socket_bind_queue( sockt->key, sockt->force_bind);
      if(sockt->queue  == NULL ) {
        logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key);
@@ -729,7 +728,7 @@
  count += strlen(ptr->int_info) + 1;
  memcpy(dst + count, ptr->svr_info, strlen(ptr->svr_info) + 1);
  count += strlen(ptr->svr_info) + 1;
  *counter = count;
}