fujuntang
2021-09-17 97428da8d339d3d0c4d00aa328d76a32c7858412
src/socket/bus_server_socket.cpp
@@ -1,21 +1,23 @@
#include "bus_server_socket.h"
#include "shm_mod_socket.h"
#include "shm_socket.h"
#include "bus_error.h"
static Logger *logger = LoggerFactory::getLogger();
list gLinkedList;
void BusServerSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)>  cb) {
   SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
   SHMTopicSubMap *topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
   SHMKeySet *subscripter_set;
   SHMKeySet::iterator set_iter;
   SHMTopicSubMap::iterator map_iter;
   if(topic_sub_map != NULL) {
      for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
      for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) {
         subscripter_set = map_iter->second;
         if(subscripter_set != NULL) {
            for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) {
            for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); ++set_iter) {
               cb(subscripter_set, *set_iter);
            }
         }
@@ -29,17 +31,16 @@
   int key;
   for(int i = 0; i < length; i++) {
      key = keys[i];
      SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
      SHMTopicSubMap *topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
      SHMKeySet *subscripter_set;
      SHMKeySet::iterator set_iter;
      SHMTopicSubMap::iterator map_iter;
      if(topic_sub_map != NULL) {
         for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
         for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) {
            subscripter_set = map_iter->second;
            if(subscripter_set != NULL && (set_iter = subscripter_set->find(key) ) != subscripter_set->end()) {
               subscripter_set->erase(set_iter);
// printf("remove_subscripter %s, %d\n", map_iter->first, key);
               count++;
            }
         }
@@ -51,32 +52,13 @@
BusServerSocket::BusServerSocket() {
   logger->debug("BusServerSocket Init");
   shm_socket = shm_open_socket(SHM_SOCKET_DGRAM);
   shm_socket = shm_socket_open(SHM_SOCKET_DGRAM);
   topic_sub_map = NULL;
}
BusServerSocket::~BusServerSocket() {
   SHMKeySet *subscripter_set;
   SHMTopicSubMap::iterator map_iter;
   stop();
   if(topic_sub_map != NULL) {
      for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
         subscripter_set = map_iter->second;
         if(subscripter_set != NULL) {
            subscripter_set->clear();
            mm_free((void *)subscripter_set);
         }
      }
      topic_sub_map->clear();
      mem_pool_free_by_key(SHM_BUS_MAP_KEY);
   }
   shm_close_socket(shm_socket);
   logger->debug("BusServerSocket destory 3");
   destroy();
}
@@ -97,12 +79,15 @@
 * 启动bus
 * 
 * @return 0 成功, 其他值 失败的错误码
*/
 */
int  BusServerSocket::start(){
   topic_sub_map =   mem_pool_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
  int rv;
   topic_sub_map =   shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
 
   run_pubsub_proxy();
   return 0;
   rv = _run_proxy_();
   return rv;
}
@@ -111,8 +96,6 @@
   if( shm_socket->key <= 0) {
      return -1;
   }
   // snprintf(buf, 128, "%sstop%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, "", TOPIC_RIDENTIFIER);
   // return shm_sendto(shm_socket, buf, strlen(buf), shm_socket->key, NULL, 0);
   bus_head_t head = {};
   memcpy(head.action, "stop", sizeof(head.action));
   head.topic_size = 0;
@@ -122,7 +105,7 @@
   void *buf;
   int size = ShmModSocket::get_bus_sendbuf(head, NULL, 0, NULL,  0, &buf);
   if(size > 0) {
      ret = client.sendandrecv_unsafe( buf, size, shm_socket->key, NULL, NULL);
      ret = client.sendto( buf, size, shm_socket->key);
      free(buf);
      return ret;
   } else {
@@ -131,15 +114,35 @@
}
int  BusServerSocket::destroy() {
   SHMKeySet *subscripter_set;
   SHMTopicSubMap::iterator map_iter;
   if(topic_sub_map != NULL) {
      for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) {
         subscripter_set = map_iter->second;
         if(subscripter_set != NULL) {
            subscripter_set->clear();
            mm_free((void *)subscripter_set);
         }
      }
      topic_sub_map->clear();
      shm_mm_free_by_key(SHM_BUS_MAP_KEY);
   }
   shm_socket_close(shm_socket);
  return 0;
}
/*
 * 处理订阅
*/
void BusServerSocket::_proxy_sub( char *topic, int key) {
   SHMKeySet *subscripter_set;
  struct timespec timeout = {.tv_sec = 1, .tv_nsec = 0};
   SHMTopicSubMap::iterator map_iter;
   SHMKeySet::iterator set_iter;
//printf("_proxy_sub topic = %s\n", topic);
   if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
      subscripter_set = map_iter->second;
   } else {
@@ -148,6 +151,7 @@
      topic_sub_map->insert({topic, subscripter_set});
   }
   subscripter_set->insert(key);
}
/*
@@ -174,7 +178,7 @@
   SHMTopicSubMap::iterator map_iter;
   // SHMKeySet::iterator set_iter;
   for (auto map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
   for (auto map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); ++map_iter) {
         subscripter_set = map_iter->second;
         subscripter_set->erase(key);
   }
@@ -183,7 +187,7 @@
/*
 * 处理发布,代理转发
*/
void BusServerSocket::_proxy_pub( char *topic, void *buf, size_t size, int key) {
void BusServerSocket::_proxy_pub( char *topic, char *buf, size_t size, int key) {
   SHMKeySet *subscripter_set;
   SHMTopicSubMap::iterator map_iter;
@@ -193,23 +197,24 @@
   std::vector<int>::iterator vector_iter;
   int send_key;
   int rv;
   struct timespec timeout = {1,0};
   if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
      subscripter_set = map_iter->second;
      for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) {
      for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); ++set_iter) {
         send_key = *set_iter;
// logger->debug("_proxy_pub send before %d \n", send_key);
         if (shm_sendto(shm_socket, buf, size, send_key, &timeout, BUS_TIMEOUT_FLAG) == EBUS_CLOSED ) {
            //对方已关闭的连接放到待删除队列里。如果直接删除会让iter指针出现错乱
            subscripter_to_del.push_back(send_key);
         } else {
// logger->debug("_proxy_pub send after: %d \n", send_key);
         rv = shm_sendto(shm_socket, buf, size, send_key, &timeout, BUS_TIMEOUT_FLAG);
         if(rv == 0) {
            continue;
         }
         //对方已关闭的或者对应的进程被kill掉的连接放到待删除队列里。如果直接删除会让iter指针出现错乱
         subscripter_to_del.push_back(send_key);
      }
      // 删除已关闭的端
      for(vector_iter = subscripter_to_del.begin(); vector_iter != subscripter_to_del.end(); vector_iter++) {
      for(vector_iter = subscripter_to_del.begin(); vector_iter != subscripter_to_del.end(); ++vector_iter) {
         if((set_iter = subscripter_set->find(*vector_iter)) != subscripter_set->end()) {
            subscripter_set->erase(set_iter);
            logger->debug("remove closed subscripter %d \n", send_key);
@@ -218,28 +223,686 @@
      subscripter_to_del.clear();
   }
}
void * BusServerSocket::run_pubsub_proxy() {
ProcInfo_query *Qurey_object(const char *object, int *length) {
  int flag = 0;
  int val;
  int len;
  int total = 0;
  ProcInfo *Proc_ptr = NULL;
  ProcInfo Data_stru;
  ProcInfo_query *dataBuf = NULL;
  SvrProc *SvrSub_ele;
  SvrTcs::iterator svr_tcs_iter;
  SvrProc::iterator svr_proc_iter;
  ProcZone::iterator proc_iter;
  SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY);
  ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
  if ((svr_tcs_iter = SvrData->find(object)) != SvrData->end()) {
    SvrSub_ele = svr_tcs_iter->second;
    for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) {
      val = *svr_proc_iter;
      if ((proc_iter = proc->find(val)) != proc->end()) {
        if (dataBuf == NULL) {
          dataBuf = (ProcInfo_query *)malloc(sizeof(ProcInfo_query));
          if (dataBuf == NULL) {
            logger->error("in proxy_reg: Out of memory!\n");
            exit(1);
          }
          total = sizeof(ProcInfo_query);
        }
        if (flag == 0) {
          memset(dataBuf, 0x00, sizeof(ProcInfo_query));
          dataBuf->num = 1;
          strncpy(dataBuf->name, object, sizeof(dataBuf->name) - 1);
          flag = 1;
        } else {
          dataBuf->num++;
          len = sizeof(ProcInfo_query) + sizeof(ProcInfo) * (dataBuf->num - 1);
          dataBuf = (ProcInfo_query *)realloc(dataBuf, len);
          if (dataBuf == NULL) {
            logger->error("in proxy_reg: Out of memory!\n");
            exit(1);
          }
          total += sizeof(ProcInfo);
          memset((char *)dataBuf + len - sizeof(ProcInfo), 0x00, sizeof(ProcInfo));
        }
        memset(&Data_stru, 0x00, sizeof(ProcInfo));
        Data_stru = proc_iter->second;
        Proc_ptr = &(dataBuf->procData) + dataBuf->num - 1;
        strncpy(Proc_ptr->proc_id, Data_stru.proc_id, strlen(Data_stru.proc_id) + 1);
        strncpy(Proc_ptr->name, Data_stru.name, strlen(Data_stru.name) + 1);
        strncpy(Proc_ptr->public_info, Data_stru.public_info, strlen(Data_stru.public_info) + 1);
        strncpy(Proc_ptr->private_info, Data_stru.private_info, strlen(Data_stru.private_info) + 1);
        if (length != NULL)
          *length = total;
      }
    }
  }
  return dataBuf;
}
void list::Insert(int aData, int bData)
{
  LinkNode *pHead = NULL;
  LinkNode *pNew = NULL;
  LinkNode *pCur = NULL;
  pNew = new(LinkNode);
  pNew->data = aData;
  pNew->data_fix = bData;
  pNew->count = 0;
  pHead = head;
  pCur = pHead;
  if(pHead == NULL) {
    head = pNew;
    pNew->next = NULL;
  } else {
    while(pCur->next != NULL) {
      pCur = pCur->next;
    }
    pCur->next = pNew;
    pNew->next = NULL;
  }
}
void list::Delete(int data)
{
  LinkNode *pHead;
  LinkNode *pCur;
  LinkNode *pNext;
  pHead = head;
  pCur = pHead;
  if(pHead == NULL)
    return;
  while((pCur != NULL) && (pCur->data == data)) {
    head = pCur->next;
    delete(pCur);
    pCur = head;
  }
  while((pCur != NULL) && (pCur->next != NULL)) {
    pNext = pCur->next;
    if(pNext->data == data) {
      pCur->next = pNext->next;
      pCur = pNext->next;
      delete(pNext);
    } else {
      pCur = pNext;
    }
  }
}
void list::dataSet(int data, int val)
{
  LinkNode *pCur;
  pCur = head;
  if(pCur == NULL)
    return;
  while(pCur != NULL) {
    if(pCur->data == data) {
      pCur->count = val;
    }
    pCur = pCur->next;
  }
}
int list::dataGet(int data)
{
  LinkNode *pCur;
  pCur = head;
  if(pCur == NULL)
    return 0;
  while(pCur != NULL) {
    if(pCur->data == data) {
      return pCur->count;
    }
    pCur = pCur->next;
  }
  return 0;
}
int list::dataFixGet(int data)
{
  LinkNode *pCur;
  pCur = head;
  if(pCur == NULL)
    return 0;
  while(pCur != NULL) {
    if(pCur->data == data) {
      return pCur->data_fix;
    }
    pCur = pCur->next;
  }
  return 0;
}
int list::NodeNum(void)
{
  int count = 0;
  LinkNode *pCur = head;
  if (pCur == NULL) {
    return 0;
  }
  while(pCur != NULL) {
    ++count;
    pCur = pCur->next;
  }
  return count;
}
int list::nodeGet(int index)
{
  int count = 0;
  LinkNode *pCur = head;
  if (pCur == NULL) {
    return 0;
  }
  while((pCur != NULL) && (count <= index)) {
    if (count == index) {
      return pCur->data;
    }
    ++count;
    pCur = pCur->next;
  }
  return 0;
}
void BusServerSocket::_proxy_reg(const char *topic, size_t topic_size, const char *buf, size_t buf_size, int key, int flag)
{
  char buf_temp[MAX_STR_LEN * MAX_TOPICS_NUN] = { 0x00 };
  int count = 0;
  int i = 0;
  int len = 0;
  int data1, data2;
  char *data_ptr;
  ProcInfo Data_stru;
  ProcZone::iterator proc_iter;
  TcsZone *TcsSub_ele;
  ProcDataZone::iterator proc_que_iter;
  ProcTcsMap::iterator proc_tcs_iter;
  SvrProc *SvrSub_ele;
  SvrProc::iterator svr_proc_iter;
  SvrTcs::iterator svr_tcs_iter;
  TcsZone::iterator tcssub_iter;
  ProcPartZone::iterator proc_part_iter;
  struct timespec timeout = {.tv_sec = 1, .tv_nsec = 0};
  if ((flag == PROC_REG) || (flag == PROC_UNREG)) {
    memset(&Data_stru, 0x00, sizeof(ProcInfo));
    if (buf != NULL) {
      memcpy(Data_stru.proc_id, buf, strlen(buf) + 1);
      count = strlen(buf) + 1;
      memcpy(Data_stru.name, buf + count, strlen(buf + count) + 1);
      count += strlen(buf + count) + 1;
      memcpy(Data_stru.public_info, buf + count, strlen(buf + count) + 1);
      count += strlen(buf + count) + 1;
      memcpy(Data_stru.private_info, buf + count, strlen(buf + count) + 1);
      count += strlen(buf + count) + 1;
      memcpy(Data_stru.int_info, buf + count, strlen(buf + count) + 1);
      count += strlen(buf + count) + 1;
      memcpy(Data_stru.svr_info, buf + count, strlen(buf + count) + 1);
      count += strlen(buf + count) + 1;
      if (flag == PROC_REG) {
        gLinkedList.Insert(key, atoi(Data_stru.int_info));
      }
    }
    ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
    ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET);
    ProcPartZone *procPart = shm_mm_attach<ProcPartZone>(SHM_BUS_PROC_PART_MAP_KEY);
    if (flag == PROC_REG) {
      if ((proc_iter = proc->find(key)) == proc->end()) {
        proc->insert({key, Data_stru});
      }
      if ((proc_part_iter = procPart->find(key)) == procPart->end()) {
        procPart->insert({key, Data_stru.proc_id});
      }
      if ((proc_que_iter = procQuePart->find(Data_stru.proc_id)) == procQuePart->end()) {
        procQuePart->insert({Data_stru.proc_id, key});
      }
    } else {
      SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY);
      for (svr_tcs_iter = SvrData->begin(); svr_tcs_iter != SvrData->end(); ++svr_tcs_iter) {
        SvrSub_ele = svr_tcs_iter->second;
        SvrSub_ele->erase(key);
      }
      if ((proc_iter = proc->find(key)) != proc->end()) {
        data1 = atoi((proc_iter->second).int_info);
        data2 = atoi((proc_iter->second).svr_info);
        BusServerSocket::_data_remove(data1, data2);
        len = (sizeof(buf_temp) - 1) > strlen((proc_iter->second).proc_id) ? strlen((proc_iter->second).proc_id) : (sizeof(buf_temp) - 1);
        strncpy(buf_temp, (proc_iter->second).proc_id, len);
        proc->erase(proc_iter);
      }
      if ((proc_part_iter = procPart->find(key)) != procPart->end()) {
        procPart->erase(key);
      }
      if ((proc_que_iter = procQuePart->find(buf_temp)) != procQuePart->end()) {
        procQuePart->erase(buf_temp);
      }
    }
  } else if (flag == PROC_REG_TCS) {
    ProcTcsMap *proc = shm_mm_attach<ProcTcsMap>(SHM_BUS_PROC_TCS_MAP_KEY);
    SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY);
    if ((proc_tcs_iter = proc->find(key)) != proc->end()) {
      TcsSub_ele = proc_tcs_iter->second;
    } else {
      void *ptr_set = mm_malloc(sizeof(TcsZone));
      TcsSub_ele = new(ptr_set) TcsZone;
      proc->insert({key, TcsSub_ele});
    }
    strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
    data_ptr = strtok(const_cast<char *>(buf_temp), STR_MAGIC);
    while(data_ptr) {
      TcsSub_ele->insert(data_ptr);
      if ((svr_tcs_iter = SvrData->find(data_ptr)) != SvrData->end()) {
        SvrSub_ele = svr_tcs_iter->second;
      } else {
        void *ptr_set = mm_malloc(sizeof(SvrProc));
        SvrSub_ele = new(ptr_set) SvrProc;
        SvrData->insert({data_ptr, SvrSub_ele});
      }
      SvrSub_ele->insert(key);
      data_ptr = strtok(NULL, STR_MAGIC);
    }
  } else if (flag == PROC_QUE_TCS) {
    struct _temp_store {
      void *ptr;
      int total;
    } *temp_store = NULL;
    int num = 0;
    int sum = 0;
    ProcInfo_query *ret = NULL;
    ProcInfo_query *ret_store = NULL;
    strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
    data_ptr = strtok(const_cast<char *>(buf_temp), STR_MAGIC);
    while(data_ptr) {
      ret = Qurey_object(data_ptr, &len);
      if (ret != NULL) {
        if (temp_store == NULL) {
          temp_store = (_temp_store *)malloc(sizeof(_temp_store));
          if (temp_store == NULL) {
            logger->error("in proxy_reg: Out of memory!\n");
            exit(1);
          }
          temp_store->ptr = ret;
          temp_store->total = len;
          num = 1;
        } else {
          num++;
          temp_store = (_temp_store *)realloc(temp_store, sizeof(_temp_store) * num);
          if (temp_store == NULL) {
            logger->error("in proxy_reg: Out of memory!\n");
            exit(1);
          }
          (temp_store + num - 1)->ptr = ret;
          (temp_store + num - 1)->total = len;
        }
      }
      data_ptr = strtok(NULL, STR_MAGIC);
    }
    if (num > 0) {
      for (count = 0; count < num; count++) {
        if (ret_store == NULL) {
          ret_store = (ProcInfo_query *)malloc((temp_store + count)->total);
          if (ret_store == NULL) {
            logger->error("in proxy_reg: Out of memory!\n");
            exit(1);
          }
          sum = (temp_store + count)->total;
          memcpy(ret_store, (temp_store + count)->ptr, (temp_store +count)->total);
        } else {
          ret_store = (ProcInfo_query *)realloc(ret_store, sum + (temp_store + count)->total);
          if (ret_store == NULL) {
            logger->error("in proxy_reg: Out of memory!\n");
            exit(1);
          }
          memcpy((char *)ret_store + sum, (temp_store + count)->ptr, (temp_store + count)->total);
          sum += (temp_store + count)->total;
        }
        free((temp_store + count)->ptr);
      }
      free(temp_store);
    }
    void *last_buf = malloc(sum + sizeof(int));
    if (last_buf == NULL) {
      logger->error("in proxy_reg: Out of memory!\n");
      exit(1);
    }
    *(int *)last_buf = num;
    if (num > 0) {
      memcpy((char *)last_buf + sizeof(int), (char *)ret_store, sum);
      free(ret_store);
    }
    shm_sendto(shm_socket, last_buf, sum + sizeof(int), key, &timeout, BUS_TIMEOUT_FLAG);
    free(last_buf);
  } else if (flag == PROC_QUE_STCS) {
    SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY);
    ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
    strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
    if ((svr_tcs_iter = SvrData->find(buf_temp)) != SvrData->end()) {
      SvrSub_ele = svr_tcs_iter->second;
      for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) {
        count = *svr_proc_iter;
        if ((proc_iter = proc->find(count)) != proc->end()) {
          count = atoi((proc_iter->second).svr_info);
        }
        break;
      }
    } else {
      count = 0;
    }
    memset(buf_temp, 0x00, sizeof(buf_temp));
    sprintf(buf_temp, "%d", count);
    shm_sendto(shm_socket, buf_temp, strlen(buf_temp), key, &timeout, BUS_TIMEOUT_FLAG);
  } else {
    int val;
    int temp = 0;
    int pos = 0;
    int size = 0;
    ProcInfo_sum *Data_sum = NULL;
    SHMKeySet *subs_proc;
    SHMKeySet::iterator subs_proc_iter;
    SHMTopicSubMap::iterator subs_iter;
    ProcTcsMap *procData = shm_mm_attach<ProcTcsMap>(SHM_BUS_PROC_TCS_MAP_KEY);
    ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
    for (proc_iter = proc->begin(); proc_iter != proc->end(); ++proc_iter) {
      memset(&Data_stru, 0x00, sizeof(Data_stru));
      if (count == 0) {
        Data_sum = (ProcInfo_sum *)malloc(sizeof(ProcInfo_sum));
        if (Data_sum == NULL) {
          logger->error("in proxy_reg: Out of memory!\n");
          exit(1);
        }
        count++;
        memset(Data_sum, 0x00, sizeof(ProcInfo_sum));
      } else {
        count++;
        len = sizeof(ProcInfo_sum) * count;
        Data_sum = (ProcInfo_sum *)realloc(Data_sum, len);
        if (Data_sum == NULL) {
          logger->error("in proxy_reg: Out of memory!\n");
          exit(1);
        }
        memset(Data_sum + count - 1, 0x00, sizeof(ProcInfo_sum));
      }
      Data_stru = proc_iter->second;
      memcpy((Data_sum + count - 1)->procData.proc_id, Data_stru.proc_id, strlen(Data_stru.proc_id));
      memcpy((Data_sum + count - 1)->procData.name, Data_stru.name, strlen(Data_stru.name));
      memcpy((Data_sum + count - 1)->procData.public_info, Data_stru.public_info, strlen(Data_stru.public_info));
      memcpy((Data_sum + count - 1)->procData.private_info, Data_stru.private_info, strlen(Data_stru.private_info));
      (Data_sum + count - 1)->stat = 1;
      (Data_sum + count - 1)->list_num = 3;
      val = proc_iter->first;
      if ((proc_tcs_iter = procData->find(val)) != procData->end()) {
        TcsSub_ele = proc_tcs_iter->second;
        temp = 0;
        pos = 0;
        len = sizeof(Data_sum->reg_info) - 1;
        for (tcssub_iter = TcsSub_ele->begin(); tcssub_iter != TcsSub_ele->end(); ++tcssub_iter) {
          if (temp == 0) {
            strncpy((Data_sum + count - 1)->reg_info, (*tcssub_iter).c_str(), strlen((*tcssub_iter).c_str()) > len ? len : strlen((*tcssub_iter).c_str()));
            pos += strlen((Data_sum + count - 1)->reg_info);
            len -= strlen((Data_sum + count - 1)->reg_info);
            temp++;
          } else {
            if (len > 0) {
              strcat((Data_sum + count - 1)->reg_info, ",");
              pos += 1;
              len -= 1;
            }
            if (len > 0) {
              size = strlen((*tcssub_iter).c_str()) > len ? len : strlen((*tcssub_iter).c_str());
              strncpy(&(Data_sum + count - 1)->reg_info[pos], (*tcssub_iter).c_str(), size);
              pos += size;
              len -= size;
            }
          }
        }
        pos = 0;
        temp = 0;
        len = sizeof(Data_sum->local_info) - 1;
        for (subs_iter = topic_sub_map->begin(); subs_iter != topic_sub_map->end(); ++subs_iter) {
          subs_proc = subs_iter->second;
          if ((subs_proc_iter = subs_proc->find(val)) != subs_proc->end()) {
            if ((temp == 0)) {
              strncpy((Data_sum + count - 1)->local_info, subs_iter->first.c_str(), strlen(subs_iter->first.c_str()) > len ? len : strlen(subs_iter->first.c_str()));
              pos += strlen((Data_sum + count - 1)->local_info);
              len -= strlen((Data_sum + count - 1)->local_info);
              temp++;
            } else {
              if (len > 0) {
                strcat((Data_sum + count - 1)->local_info, ",");
                pos += 1;
                len -= 1;
              }
              if (len > 0) {
                size = strlen(subs_iter->first.c_str()) > len ? len : strlen(subs_iter->first.c_str());
                strncpy(&(Data_sum + count - 1)->local_info[pos], subs_iter->first.c_str(), size);
                pos += size;
                len -= size;
              }
            }
          }
        }
      }
    }
    temp = count * sizeof(ProcInfo_sum);
    void *last_buf = malloc(temp + sizeof(int));
    if (last_buf == NULL) {
      logger->error("in proxy_reg: Out of memory!\n");
      exit(1);
    }
    *(int *)last_buf = count;
    if (count > 0) {
      memcpy((char *)last_buf + sizeof(int), (char *)Data_sum, temp);
      free(Data_sum);
    }
    shm_sendto(shm_socket, last_buf, temp + sizeof(int), key, &timeout, BUS_TIMEOUT_FLAG);
    free(last_buf);
  }
}
int BusServerSocket::get_data(int val) {
  ProcZone::iterator proc_iter;
  ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
  if ((proc_iter = proc->find(val)) != proc->end()) {
    return true;
  }
  return false;
}
int BusServerSocket::check_proc(const int val, const void *buf, int len, void **buf_ret, int *len_ret, \
                          const struct timespec *timeout, const int flag) {
  int ret;
  ret = shm_sendandrecv(shm_socket, buf, len, val, buf_ret, len_ret, timeout, flag);
  return ret;
}
void BusServerSocket::remove_proc(int val) {
  BusServerSocket::_proxy_reg(NULL, 0, NULL, 0, val, PROC_UNREG);
}
// 运行代理
int BusServerSocket::_run_proxy_() {
   int size;
   int key;
  int flag;
  char buf_temp[MAX_STR_LEN] = { 0x00 };
   char * action, *topic, *topics, *buf, *content;
   size_t head_len;
   char resp_buf[128];
   bus_head_t head;
    int val;
    ProcDataZone::iterator proc_que_iter;
    ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET);
   const char *topic_delim = ",";
// logger.debug("run_pubsub_proxy server receive before\n");
   while(shm_recvfrom(shm_socket, (void **)&buf, &size, &key) == 0) {
// logger.debug("run_pubsub_proxy server recvfrom %d after: %s \n", key, buf);
  int rv;
  char send_buf[512] = { 0x00 };
  const char *topic_delim = ",";
   while((rv = shm_recvfrom(shm_socket, (void **)&buf, &size, &key)) == 0) {
      head = ShmModSocket::decode_bus_head(buf);
      topics = buf + BUS_HEAD_SIZE;
      action = head.action;
// logger.debug("run_pubsub_proxy : %s\n", action);
      if(strcmp(action, "sub") == 0) {
         // 订阅支持多主题订阅
         topic = strtok(topics, topic_delim);
// logger.debug("run_pubsub_proxy topic = %s\n", topic);
        while(topic) {
       _proxy_sub(trim(topic, 0), key);
        topic =  strtok(NULL, topic_delim);
@@ -247,7 +910,6 @@
      } 
      else if(strcmp(action, "desub") == 0) {
// logger.debug("desub topic=%s,%s,%d\n", topics, trim(topics, 0), strcmp(trim(topics, 0), ""));
         if(strcmp(trim(topics, 0), "") == 0) {
            // 取消所有订阅
            _proxy_desub_all(key);
@@ -262,22 +924,110 @@
         
      } 
      else if(strcmp(action, "pub") == 0) {
          content = topics + head.topic_size;
         _proxy_pub(topics, content, head.content_size, key);
      topics[head.topic_size - 1] = '\0';
        content = topics + head.topic_size;
      }
      else if(strcmp(action, "stop") == 0) {
         logger->info( "Stopping Bus...");
         _proxy_pub(topics, topics, head.topic_size + head.content_size, key);
      }
    else if ((strcmp(action, "reg") == 0) || (strcmp(action, "unreg") == 0) \
            || (strcmp(action, "tcsreg") == 0) || (strcmp(action, "tcsque") == 0) \
            || (strcmp(action, "stcsque") == 0) || (strcmp(action, "atcsque") == 0)) {
      content = topics + head.topic_size;
      if (strcmp(action, "reg") == 0) {
        flag = PROC_REG;
      } else if (strcmp(action, "unreg") == 0) {
        flag = PROC_UNREG;
      } else if (strcmp(action, "tcsreg") == 0) {
        flag = PROC_REG_TCS;
      } else if (strcmp(action, "tcsque") == 0) {
        flag = PROC_QUE_TCS;
      } else if (strcmp(action, "stcsque") == 0) {
        flag = PROC_QUE_STCS;
      } else {
        flag = PROC_QUE_ATCS;
      }
      if (flag == PROC_REG) {
        memcpy(buf_temp, content, strlen(content) + 1);
        if ((proc_que_iter = procQuePart->find(buf_temp)) != procQuePart->end()) {
          val = proc_que_iter->second;
          _proxy_reg(topics, head.topic_size, content, head.content_size, val, PROC_UNREG);
        }
      }
      _proxy_reg(topics, head.topic_size, content, head.content_size, key, flag);
    }
      else if (strncmp(buf, "request", strlen("request")) == 0) {
      sprintf(send_buf, "%4d", key);
      strncpy(send_buf + 4, buf, (sizeof(send_buf) - 4) >= (strlen(buf) + 1) ? strlen(buf) : (sizeof(send_buf) - 4));
      rv = shm_sendto(shm_socket, send_buf, strlen(send_buf) + 1, key);
      if(rv != 0) {
        logger->error( "BusServerSocket::_run_proxy_ : requst answer fail!\n");
      }
    }
    else if(strcmp(action, "stop") == 0) {
         free(buf);
         break;
      } else {
         logger->error( "BusServerSocket::run_pubsub_proxy : unrecognized action %s", action);
         logger->error( "BusServerSocket::_run_proxy_ : unrecognized action %s", action);
      }
      free(buf);
   }
   shm_sendto(shm_socket, "stop_finished", strlen( "stop_finished") +1, key);
   return NULL;
   return rv;
}
void BusServerSocket::_data_remove(int val1, int val2) {
  int i;
  LockFreeQueue<shm_packet_t> *queue = NULL;
  hashtable_t *hashtable = mm_get_hashtable();
  void *data_ptr1 = hashtable_get(hashtable, val1);
  void *data_ptr2 = hashtable_get(hashtable, val2);
  if (data_ptr1 != NULL) {
    if (data_ptr1 != (void *)1) {
      queue = (LockFreeQueue<shm_packet_t> *)data_ptr1;
      queue->close();
      for (i = 0; i < queue->size(); i++) {
        mm_free((*queue)[i].buf);
      }
      sleep(1);
    }
    hashtable_remove(hashtable, val1);
  }
  if (data_ptr2 != NULL) {
    if (data_ptr2 != (void *)1) {
      queue = (LockFreeQueue<shm_packet_t> *)data_ptr2;
      queue->close();
      for (i = 0; i < queue->size(); i++) {
        mm_free((*queue)[i].buf);
      }
      sleep(1);
    }
    hashtable_remove(hashtable, val2);
  }
}