wangzhengquan
2021-02-03 758438289fc45829a8f6cef1b42afed0a1a8cb60
src/socket/shm_mod_socket.cpp
@@ -18,7 +18,7 @@
   struct timespec timeout = {1, 0};
   if(bus_set != NULL) {
      for(auto bus_iter = bus_set->begin(); bus_iter != bus_set->end(); bus_iter++) {
         desub_timeout(NULL, 0, *bus_iter, &timeout);
         desub(NULL, 0, *bus_iter, &timeout, BUS_TIMEOUT_FLAG);
      }
      delete bus_set;
   }
@@ -37,151 +37,86 @@
int ShmModSocket::force_bind(int key) {
   return shm_socket_force_bind(shm_socket, key);
}
/**
 * 发送信息
 * @key 发送给谁
 * @return 0 成功, 其他值 失败的错误码
 */
int ShmModSocket::sendto(const void *buf, const int size, const int key) {
      return shm_sendto(shm_socket, buf, size, key, NULL, 0);
int ShmModSocket::sendto(const void *buf, const int size, const int key, const struct timespec *timeout, int flag) {
   int rv = shm_sendto(shm_socket, buf, size, key, timeout, flag);
  if(rv == 0) {
     logger->debug("ShmModSocket::sendto: %d sendto %d success.\n", get_key(), key);
     return 0;
  }
  logger->debug("ShmModSocket::sendto : %d sendto  %d failed %s", get_key(),  key, bus_strerror(rv));
  return rv;
}
// 发送信息超时返回。 @sec 秒 , @nsec 纳秒
int ShmModSocket::sendto_timeout(const void *buf, const int size, const int key, const struct timespec *timeout) {
   return shm_sendto(shm_socket, buf, size, key, timeout, 0);
}
// 发送信息立刻返回。
int ShmModSocket::sendto_nowait( const void *buf, const int size, const int key){
   return shm_sendto(shm_socket, buf, size, key, NULL, (int)BUS_NOWAIT_FLAG);
}
/**
 * 接收信息
 * @key 从谁哪里收到的信息
 * @return 0 成功, 其他值 失败的错误码
*/
int ShmModSocket::recvfrom(void **buf, int *size, int *key) {
   int rv =  shm_recvfrom(shm_socket, buf, size, key, NULL, 0);
int ShmModSocket::recvfrom( void **buf, int *size, int *key, const struct timespec *timeout, int flag) {
   int rv =  shm_recvfrom(shm_socket, buf, size, key, timeout, flag);
   if(rv == 0) {
    logger->debug("ShmModSocket::recvfrom: %d recvfrom %d success.\n", get_key(), *key);
    return 0;
  }
  logger->debug("ShmModSocket::recvfrom: socket %d recvfrom failed %s", get_key(), bus_strerror(rv));
  return rv;
}
// 接受信息超时返回。 @sec 秒 , @nsec 纳秒
int ShmModSocket::recvfrom_timeout( void **buf, int *size, int *key, struct timespec *timeout) {
   int rv =  shm_recvfrom(shm_socket, buf, size, key, timeout, BUS_TIMEOUT_FLAG);
    return rv;
}
int ShmModSocket::recvfrom_nowait( void **buf, int *size, int *key){
   int rv =  shm_recvfrom(shm_socket, buf, size, key, NULL, BUS_NOWAIT_FLAG);
   // logger->error(rv, "ShmModSocket::recvfrom_nowait failed!");
  return rv;
}
/**
 * 发送请求信息并等待接收应答
 * @key 发送给谁
 * @return 0 成功, 其他值 失败的错误码
*/
int ShmModSocket::sendandrecv( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
   return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, NULL, 0);
}
// 超时返回。 @sec 秒 , @nsec 纳秒
int ShmModSocket::sendandrecv_timeout(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, struct timespec *timeout){
   return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, 0);
}
int ShmModSocket::sendandrecv_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
   return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)BUS_NOWAIT_FLAG);
}
int ShmModSocket::sendandrecv(const void *send_buf, const int send_size, const int send_key,
   void **recv_buf, int *recv_size, const struct timespec *timeout, int flag){
   int rv = shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, flag);
int ShmModSocket::sendandrecv_unsafe( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
   return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, NULL, 0);
}
// 超时返回。 @sec 秒 , @nsec 纳秒
int ShmModSocket::sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, struct timespec *timeout){
   return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, 0);
}
int ShmModSocket::sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
   return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)BUS_NOWAIT_FLAG);
   if(rv == 0) {
     logger->debug("ShmModSocket::sendandrecv:  sendandrecv to %d success.\n", send_key);
     return 0;
  }
  logger->debug("ShmModSocket::sendandrecv : sendandrecv to %d failed %s",  send_key, bus_strerror(rv));
   return rv;
}
int ShmModSocket::recvandsend(void **recvbuf, int *recvsize, int *key, recv_callback_fn callback,
                    const struct timespec *timeout , int flag ) {
  int rv = shm_recvandsend(shm_socket, recvbuf, recvsize, key, callback, timeout, flag);
  if(rv == 0) {
    logger->debug("ShmModSocket::shm_recvandsend: success. key = %d\n", *key);
    return 0;
  }
  logger->debug("ShmModSocket::shm_recvandsend :  failed. %s", bus_strerror(rv));
  return rv;
}
// // 超时返回。 @sec 秒 , @nsec 纳秒
// int ShmModSocket::sendandrecv_unsafe(const void *send_buf, const int send_size, const int send_key,
//    void **recv_buf, int *recv_size, const struct timespec *timeout, int flag){
//    return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, flag);
// }
/**
 * 订阅指定主题
 * @topic 主题
 * @size 主题长度
 * @key 总线端口
 */
int  ShmModSocket::sub(char *topic, int size, int key){
   return _sub_( topic, size, key, NULL, 0);
}
// 超时返回。 @sec 秒 , @nsec 纳秒
int  ShmModSocket::sub_timeout(char *topic, int size, int key, struct timespec *timeout){
   return _sub_(topic, size, key, timeout, 0);
}
int  ShmModSocket::sub_nowait(char *topic, int size, int key) {
   return _sub_(topic, size, key, NULL,  (int)BUS_NOWAIT_FLAG);
}
/**
 * 取消订阅指定主题
 * @topic 主题
 * @size 主题长度
 * @key 总线端口
 */
int  ShmModSocket::desub(char *topic, int size, int key){
   return _desub_( topic, size, key, NULL, 0);
}
// 超时返回。 @sec 秒 , @nsec 纳秒
int  ShmModSocket::desub_timeout(char *topic, int size, int key, struct timespec *timeout){
   return _desub_(topic, size, key, timeout, 0);
}
int  ShmModSocket::desub_nowait(char *topic, int size, int key) {
   return _desub_(topic, size, key, NULL,  (int)BUS_NOWAIT_FLAG);
}
/**
 * 发布主题
 * @topic 主题
 * @content 主题内容
 * @key 总线端口
 */
int  ShmModSocket::pub(char *topic, int topic_size, void *content, int content_size, int key){
      return _pub_(topic, topic_size, content, content_size, key, NULL, 0);
}
//  超时返回。 @sec 秒 , @nsec 纳秒
int  ShmModSocket::pub_timeout(char *topic, int topic_size, void *content, int content_size, int key, struct timespec * timeout){
   return _pub_( topic, topic_size, content, content_size, key, timeout, 0);
}
int  ShmModSocket::pub_nowait(char *topic, int topic_size, void *content, int content_size, int key){
   return _pub_(topic, topic_size, content, content_size, key, NULL, (int)BUS_NOWAIT_FLAG);
}
/**
 * 获取soket key
 */
int ShmModSocket::get_key(){
   return shm_socket->key;
}
// =============================================================================
/**
 * @key 总线端口
 */
int  ShmModSocket::_sub_(char *topic, int topic_size, int key,
   struct timespec *timeout, int flags) {
int  ShmModSocket::sub(const char *topic, int topic_size, int key,
   const struct timespec *timeout, int flags) {
   int ret;
   bus_head_t head = {};
   memcpy(head.action, "sub", sizeof(head.action));
@@ -203,11 +138,15 @@
}
/**
 * 取消订阅指定主题
 * @topic 主题
 * @size 主题长度
 * @key 总线端口
 */
int  ShmModSocket::_desub_(char *topic, int topic_size, int key,
   struct timespec *timeout, int flags) {
int  ShmModSocket::desub(const char *topic, int topic_size, int key, const struct timespec *timeout, int flags) {
   // char buf[8192];
   int ret;
   if(topic == NULL) {
@@ -225,33 +164,27 @@
   if(size > 0) {
      ret = shm_sendto(shm_socket, buf, size, key, timeout, flags);
      free(buf);
      if(ret == EBUS_TIMEOUT) {
       logger->error(ret, "ShmModSocket::_desub_ key %d failed, %s", key, bus_strerror(EBUS_TIMEOUT));
       return EBUS_TIMEOUT;
     } else {
       logger->error(ret, "ShmModSocket::_desub_ key %d failed!", key);
       return ret;
     }
      if(ret == 0) {
         return 0;
      } else {
         logger->error("ShmModSocket::_desub_ key %d failed, %s", key, bus_strerror(ret));
         return ret;
      }
   } else {
      return -1;
   }
}
/**
 * @key 总线端口
 * @str "<**pub**>{经济}"
 */
int  ShmModSocket::_pub_(char *topic, int topic_size, void *content, int content_size, int key,
   struct timespec *timeout, int flags) {
   // int head_len;
   // char buf[8192+content_size];
   // snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
   // head_len = strlen(buf);
   // memcpy(buf+head_len, content, content_size);
/**
 * 发布主题
 * @topic 主题
 * @content 主题内容
 * @key 总线端口
 */
int  ShmModSocket::pub(const char *topic, int topic_size, const void *content, int content_size, int key, const struct timespec *timeout, int flags) {
   int ret;
   bus_head_t head = {};
   memcpy(head.action, "pub", sizeof(head.action));
@@ -268,17 +201,28 @@
      return -1;
   }
}
/**
 * 获取soket key
 */
int ShmModSocket::get_key(){
   return shm_socket->key;
}
// =============================================================================
int ShmModSocket::get_bus_sendbuf(bus_head_t &request_head, 
  void *topic_buf, int topic_size, void *content_buf, int content_size, void **retbuf) {
 const void *topic_buf, int topic_size, const void *content_buf, int content_size, void **retbuf) {
 
  int buf_size;
  char *buf;
  int  max_buf_size;
  if((buf = (char *)malloc(MAXBUF)) == NULL) {
  if((buf = (char *) malloc(MAXBUF)) == NULL) {
    LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf malloc");
    exit(1);
  } else {
@@ -288,7 +232,7 @@
  buf_size = BUS_HEAD_SIZE + content_size + topic_size  ;
  if(max_buf_size < buf_size) {
    
    if((buf = (char *)realloc(buf, buf_size)) == NULL) {
    if((buf = (char *) realloc(buf, buf_size)) == NULL) {
      LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf  realloc buf");
      exit(1);
    } else {