fujuntang
2021-12-09 73689afc09ce346f9eb00e02faf7f242e55dc7ee
src/socket/bus_server_socket.cpp
@@ -6,6 +6,7 @@
#include "bus_error.h"
static Logger *logger = LoggerFactory::getLogger();
static pthread_mutex_t gMutex;
list gLinkedList;
void BusServerSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)>  cb) {
@@ -84,11 +85,13 @@
int  BusServerSocket::start(){
  int rv;
   topic_sub_map =   shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
   rv = _run_proxy_();
  topic_sub_map =   shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY);
   return rv;
  pthread_mutex_init(&gMutex, NULL);
  rv = _run_proxy_();
  return rv;
}
@@ -201,7 +204,7 @@
   int rv;
   struct timespec timeout = {1,0};
   if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
    if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
      subscripter_set = map_iter->second;
      for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); ++set_iter) {
@@ -516,9 +519,11 @@
    ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET);
    ProcPartZone *procPart = shm_mm_attach<ProcPartZone>(SHM_BUS_PROC_PART_MAP_KEY);
    if (flag == PROC_REG) {
      pthread_mutex_lock(&gMutex);
      if ((proc_iter = proc->find(key)) == proc->end()) {
        proc->insert({key, Data_stru});
      }
      pthread_mutex_unlock(&gMutex);
      if ((proc_part_iter = procPart->find(key)) == procPart->end()) {
        procPart->insert({key, Data_stru.proc_id});
@@ -537,6 +542,7 @@
        SvrSub_ele->erase(key);
      }
      pthread_mutex_lock(&gMutex);
      if ((proc_iter = proc->find(key)) != proc->end()) {
        data1 = atoi((proc_iter->second).int_info);
@@ -549,6 +555,7 @@
        proc->erase(key);
      }
      pthread_mutex_unlock(&gMutex);
      if ((proc_part_iter = procPart->find(key)) != procPart->end()) {
@@ -560,7 +567,10 @@
        procQuePart->erase(buf_temp);
      }
      pthread_mutex_lock(&gMutex);
      BusServerSocket::buf_data_remove(key);
      pthread_mutex_unlock(&gMutex);
      find_mm_data(key);
    }
@@ -580,6 +590,7 @@
    strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size); 
    data_ptr = strtok(const_cast<char *>(buf_temp), STR_MAGIC);
    while(data_ptr) {
      data_ptr = trim(data_ptr, 0);
      TcsSub_ele->insert(data_ptr);
      if ((svr_tcs_iter = SvrData->find(data_ptr)) != SvrData->end()) {
        SvrSub_ele = svr_tcs_iter->second;
@@ -609,6 +620,7 @@
    strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
    data_ptr = strtok(const_cast<char *>(buf_temp), STR_MAGIC);
    while(data_ptr) {
      data_ptr = trim(data_ptr, 0);
      ret = Qurey_object(data_ptr, &len);
      if (ret != NULL) {
    
@@ -694,7 +706,7 @@
    ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
    strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
    if ((svr_tcs_iter = SvrData->find(buf_temp)) != SvrData->end()) {
    if ((svr_tcs_iter = SvrData->find(trim(buf_temp, 0))) != SvrData->end()) {
      SvrSub_ele = svr_tcs_iter->second;
    
      for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) { 
@@ -876,9 +888,12 @@
  ProcZone::iterator proc_iter;
  ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
  pthread_mutex_lock(&gMutex);
  if ((proc_iter = proc->find(val)) != proc->end()) {
    pthread_mutex_unlock(&gMutex);
    return true;
  }
  pthread_mutex_unlock(&gMutex);
  
  return false;