wangzhengquan
2020-12-22 fb8aef5a4908a50d415cf5ed33a10699fdfa9c98
src/socket/shm_mod_socket.c
@@ -187,30 +187,65 @@
/**
 * @key 总线端口
 */
int  ShmModSocket::_sub_(char *topic, int size, int key,
int  ShmModSocket::_sub_(char *topic, int topic_size, int key,
   struct timespec *timeout, int flags) {
   char buf[8192];
   int rv;
   snprintf(buf,  8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
   rv = shm_sendto(shm_socket, buf, strlen(buf) + 1, key, timeout, flags);
   if(rv == 0) {
      bus_set->insert(key);
   // char buf[8192];
   // int rv;
   // snprintf(buf,  8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
   // rv = shm_sendto(shm_socket, buf, strlen(buf) + 1, key, timeout, flags);
   // if(rv == 0) {
   //    bus_set->insert(key);
   // }
   // return rv;
   int ret;
   bus_head_t head = {};
   memcpy(head.action, "sub", sizeof(head.action));
   head.topic_size = topic_size = strlen(topic) + 1;
   head.content_size = 0;
   void *buf;
   int size = get_bus_sendbuf(head, topic, topic_size, NULL,  0, &buf);
   if(size > 0) {
      ret = shm_sendto(shm_socket, buf, size, key, timeout, flags);
      free(buf);
      if(ret == 0) {
         bus_set->insert(key);
      }
      return ret;
   } else {
      return -1;
   }
   return rv;
}
/**
 * @key 总线端口
 */
int  ShmModSocket::_desub_(char *topic, int size, int key,
int  ShmModSocket::_desub_(char *topic, int topic_size, int key,
   struct timespec *timeout, int flags) {
   char buf[8192];
   // char buf[8192];
   int ret;
   if(topic == NULL) {
      topic = "";
   }
   snprintf(buf,  8192, "%sdesub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER,  topic, TOPIC_RIDENTIFIER);
   return shm_sendto(shm_socket, buf, strlen(buf) + 1, key, timeout, flags);
   // snprintf(buf,  8192, "%sdesub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER,  topic, TOPIC_RIDENTIFIER);
   // return shm_sendto(shm_socket, buf, strlen(buf) + 1, key, timeout, flags);
   bus_head_t head = {};
   memcpy(head.action, "desub", sizeof(head.action));
   head.topic_size = topic_size = strlen(topic) + 1;
   head.content_size = 0;
   void *buf;
   int size = get_bus_sendbuf(head, topic,  topic_size, NULL,  0, &buf);
   if(size > 0) {
      ret = shm_sendto(shm_socket, buf, size, key, timeout, flags);
      free(buf);
      return ret;
   } else {
      return -1;
   }
}
/**
@@ -220,15 +255,100 @@
 
int  ShmModSocket::_pub_(char *topic, int topic_size, void *content, int content_size, int key,  
   struct timespec *timeout, int flags) {
   int head_len;
   char buf[8192+content_size];
   snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
   head_len = strlen(buf);
   memcpy(buf+head_len, content, content_size);
   return shm_sendto(shm_socket, buf, head_len+content_size, key, timeout, flags);
   // int head_len;
   // char buf[8192+content_size];
   // snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
   // head_len = strlen(buf);
   // memcpy(buf+head_len, content, content_size);
   int ret;
   bus_head_t head = {};
   memcpy(head.action, "pub", sizeof(head.action));
   head.topic_size = topic_size = strlen(topic) + 1;
   head.content_size = content_size;
   void *buf;
   int size = get_bus_sendbuf(head, topic,  topic_size, content,  content_size, &buf);
   if(size > 0) {
      ret = shm_sendto(shm_socket, buf, size, key, timeout, flags);
      free(buf);
      return ret;
   } else {
      return -1;
   }
}
int ShmModSocket::get_bus_sendbuf(bus_head_t &request_head,
  void *topic_buf, int topic_size, void *content_buf, int content_size, void **retbuf) {
  int buf_size;
  char *buf;
  int  max_buf_size;
  if((buf = (char *)malloc(MAXBUF)) == NULL) {
    LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf malloc");
    exit(1);
  } else {
    max_buf_size = MAXBUF;
  }
  buf_size = BUS_HEAD_SIZE + content_size + topic_size  ;
  if(max_buf_size < buf_size) {
    if((buf = (char *)realloc(buf, buf_size)) == NULL) {
      LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf  realloc buf");
      exit(1);
    } else {
      max_buf_size = buf_size;
    }
  }
  memcpy(buf, ShmModSocket::encode_bus_head(request_head), BUS_HEAD_SIZE);
  if(topic_size != 0 )
    memcpy(buf + BUS_HEAD_SIZE, topic_buf, topic_size);
  if(content_size != 0)
     memcpy(buf + BUS_HEAD_SIZE + topic_size, content_buf, content_size);
  *retbuf = buf;
  return buf_size;
}
/**
   char action[];
  uint32_t topic_size;
   uint32_t content_size;
*/
void * ShmModSocket::encode_bus_head(bus_head_t & head) {
  void * headbs = malloc(BUS_HEAD_SIZE);
  char *tmp_ptr = (char *)headbs;
  memcpy(tmp_ptr, head.action, sizeof(head.action));
  tmp_ptr += sizeof(head.action);
  PUT(tmp_ptr, htonl(head.topic_size));
  tmp_ptr += 4;
  PUT(tmp_ptr, htonl(head.content_size));
  return headbs;
}
bus_head_t  ShmModSocket::decode_bus_head(void *headbs) {
  char *tmp_ptr = (char *)headbs;
  bus_head_t head;
  memcpy(head.action, tmp_ptr, sizeof(head.action));
  tmp_ptr += sizeof(head.action);
  head.topic_size = ntohl(GET(tmp_ptr));
  tmp_ptr += 4;
  head.content_size = ntohl(GET(tmp_ptr));
  return head;
}