fujuntang
2021-11-10 c479ef57baaaa28964fc3ec8d80ff99dffa7d49f
src/socket/shm_mod_socket.cpp
@@ -3,23 +3,13 @@
static Logger *logger = LoggerFactory::getLogger();
size_t ShmModSocket::remove_keys(int keys[], size_t length) {
   BusServerSocket::remove_subscripters(keys, length);
   return shm_socket_remove_keys(keys, length);
}
size_t ShmModSocket::remove_keys_exclude(int keys[], size_t length) {
   BusServerSocket::remove_subscripters(keys, length);
   return shm_socket_remove_keys_exclude(keys, length);
}
ShmModSocket::ShmModSocket() {
   shm_socket = shm_open_socket(SHM_SOCKET_DGRAM);
   shm_socket = shm_socket_open(SHM_SOCKET_DGRAM);
   bus_set = new std::set<int>;
}
ShmModSocket::~ShmModSocket() {
  // logger->debug("Close ShmModSocket...\n");
   struct timespec timeout = {1, 0};
   if(bus_set != NULL) {
      for(auto bus_iter = bus_set->begin(); bus_iter != bus_set->end(); bus_iter++) {
@@ -28,7 +18,7 @@
      delete bus_set;
   }
   shm_close_socket(shm_socket);
   shm_socket_close(shm_socket);
}
int ShmModSocket::stop() {
@@ -48,15 +38,137 @@
   return shm_socket_force_bind(shm_socket, key);
}
int ShmModSocket::reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag)
{
  int ret;
  struct timespec ts;
  bus_head_t head = {};
  if (flag == PROC_REG) {
    memcpy(head.action, "reg", sizeof(head.action));
  } else if (flag == PROC_UNREG) {
    memcpy(head.action, "unreg", sizeof(head.action));
  } else if (flag == PROC_REG_TCS) {
    memcpy(head.action, "tcsreg", sizeof(head.action));
  } else if (flag == PROC_QUE_TCS) {
    memcpy(head.action, "tcsque", sizeof(head.action));
  } else if (flag == PROC_QUE_STCS) {
    memcpy(head.action, "stcsque", sizeof(head.action));
  } else  if (flag == PROC_QUE_ATCS) {
    memcpy(head.action, "atcsque", sizeof(head.action));
  } else if (flag == PROC_REG_BUF) {
    memcpy(head.action, "bufreg", sizeof(head.action));
  } else {
    return -1;
  }
  if ((flag == PROC_REG) || (flag == PROC_UNREG)) {
    head.topic_size = 0;
    if (pData != NULL) {
      head.content_size = sizeof(ProcInfo);
    } else {
      head.content_size = 0;
    }
  } else {
    head.topic_size = len;
    head.content_size = 0;
  }
  void *buf_temp;
  int buf_size;
  if ((flag == PROC_REG) || (flag == PROC_UNREG)) {
    buf_size = get_bus_sendbuf(head, NULL, 0, pData, head.content_size, &buf_temp);
  } else {
    buf_size = get_bus_sendbuf(head, pData, len, NULL, head.content_size, &buf_temp);
  }
  if (timeout_ms > 0) {
    ts.tv_sec = timeout_ms /1000;
    ts.tv_nsec = (timeout_ms - ts.tv_sec * 1000) * 1000 * 1000;
    if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS) || (flag == PROC_REG_BUF)) {
      ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, BUS_TIMEOUT_FLAG);
    } else {
      ret = shm_sendandrecv(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, buf, size, &ts, BUS_TIMEOUT_FLAG);
    }
  } else if (timeout_ms == 0) {
    if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS) || (flag == PROC_REG_BUF)) {
      ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, BUS_NOWAIT_FLAG);
    } else {
      ret = shm_sendandrecv(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, buf, size, &ts, BUS_NOWAIT_FLAG);
    }
  } else {
    if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS) || (flag == PROC_REG_BUF)) {
      ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, -1);
    } else {
      ret = shm_sendandrecv(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, buf, size, &ts, -1);
    }
  }
  free(buf_temp);
  return ret;
}
/**
 * 发送信息
 * @key 发送给谁
 * @return 0 成功, 其他值 失败的错误码
 */
int ShmModSocket::sendto(const void *buf, const int size, const int key, const struct timespec *timeout, int flag) {
   int rv = shm_sendto(shm_socket, buf, size, key, timeout, flag);
int ShmModSocket::sendto(const void *buf, const int size, const int key, const struct timespec *timeout, int flag, int reset, int data_set) {
   int rv = shm_sendto(shm_socket, buf, size, key, timeout, flag, reset, data_set);
  if(rv == 0) {
     logger->debug("ShmModSocket::sendto: %d sendto %d success.\n", get_key(), key);
     return 0;
  }
@@ -69,11 +181,11 @@
 * @key 从谁哪里收到的信息
 * @return 0 成功, 其他值 失败的错误码
*/
int ShmModSocket::recvfrom( void **buf, int *size, int *key, const struct timespec *timeout, int flag) {
   int rv =  shm_recvfrom(shm_socket, buf, size, key, timeout, flag);
int ShmModSocket::recvfrom( void **buf, int *size, int *key, const struct timespec *timeout, int flag, int reset, int data_set) {
  int rv =  shm_recvfrom(shm_socket, buf, size, key, timeout, flag, reset, data_set);
   if(rv == 0) {
    logger->debug("ShmModSocket::recvfrom: %d recvfrom %d success.\n", get_key(), *key);
    return 0;
  }
@@ -87,12 +199,11 @@
 * @key 发送给谁
 * @return 0 成功, 其他值 失败的错误码
*/
int ShmModSocket::sendandrecv(const void *send_buf, const int send_size, const int send_key,
int ShmModSocket::sendandrecv(const void *send_buf, const int send_size, const int send_key,
   void **recv_buf, int *recv_size, const struct timespec *timeout, int flag){
   int rv = shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, flag);
   if(rv == 0) {
     logger->debug("ShmModSocket::sendandrecv:  sendandrecv to %d success.\n", send_key);
     return 0;
  }
@@ -193,8 +304,8 @@
   memcpy(head.action, "pub", sizeof(head.action));
   head.topic_size = topic_size = strlen(topic) + 1;
   head.content_size = content_size;
   void *buf;
  void *buf;
   int size = get_bus_sendbuf(head, topic,  topic_size, content,  content_size, &buf);
   if(size > 0) {
      ret = shm_sendto(shm_socket, buf, size, key, timeout, flags);
@@ -225,6 +336,8 @@
  int buf_size;
  char *buf;
  int  max_buf_size;
  void *buf_ptr;
  int count = 0;
  if((buf = (char *) malloc(MAXBUF)) == NULL) {
    LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf malloc");
    exit(1);
@@ -232,7 +345,7 @@
    max_buf_size = MAXBUF;
  }
  buf_size = BUS_HEAD_SIZE + content_size + topic_size  ;
  buf_size = BUS_HEAD_SIZE + content_size + topic_size;
  if(max_buf_size < buf_size) {
    
    if((buf = (char *) realloc(buf, buf_size)) == NULL) {
@@ -243,13 +356,26 @@
    }
  }
  memcpy(buf, ShmModSocket::encode_bus_head(request_head), BUS_HEAD_SIZE);
  buf_ptr = ShmModSocket::encode_bus_head(request_head);
  memcpy(buf, buf_ptr, BUS_HEAD_SIZE);
  if(topic_size != 0 ) 
    memcpy(buf + BUS_HEAD_SIZE, topic_buf, topic_size);
  if(content_size != 0)
     memcpy(buf + BUS_HEAD_SIZE + topic_size, content_buf, content_size);
  if ((content_size != 0) && (strncmp(request_head.action, "reg", strlen("reg")) != 0) && \
                              (strncmp(request_head.action, "unreg", strlen("unreg")) != 0)) {
      memcpy(buf + BUS_HEAD_SIZE + topic_size, content_buf, content_size);
  } else {
    if (((strncmp(request_head.action, "reg", strlen("reg")) == 0) || (strncmp(request_head.action, "unreg", \
                                strlen("unreg")) == 0)) && (content_buf != NULL)) {
      proc_copy(buf + BUS_HEAD_SIZE + topic_size, const_cast<void *> (content_buf), &count);
      request_head.content_size = count;
      buf_size -= (content_size - count);
    }
  }
 
  *retbuf = buf;
  free(buf_ptr);
  return buf_size;
}
@@ -268,7 +394,7 @@
  tmp_ptr += sizeof(head.action);
  PUT(tmp_ptr, htonl(head.topic_size));
  tmp_ptr += 4;
  tmp_ptr += sizeof(head.topic_size);
  PUT(tmp_ptr, htonl(head.content_size));
  
  return headbs;
@@ -283,7 +409,7 @@
  tmp_ptr += sizeof(head.action);
  head.topic_size = ntohl(GET(tmp_ptr));
  tmp_ptr += 4;
  tmp_ptr += sizeof(head.topic_size);
  head.content_size = ntohl(GET(tmp_ptr));
 
  return head;