wangzhengquan
2021-01-25 c46be6db32872bfd7c4010b43526b5e6bc0fa6a5
update
1个文件已删除
1 文件已重命名
14个文件已修改
800538 ■■■■■ 已修改文件
src/bus_error.cpp 36 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bus_error.h 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/psem.cpp 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/array_lock_free_sem_queue.h 32 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/lock_free_queue.h 87 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/shm_queue.h 103 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm/hashtable.cpp 18 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm/hashtable.cpp.2 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_mod_socket.cpp 65 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_mod_socket.h 18 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.cpp 75 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.h 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/svsem.cpp 36 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/time_util.cpp 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_queue/test.txt 800024 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_socket/bus_test.cpp 16 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bus_error.cpp
@@ -42,9 +42,8 @@
char *
bus_strerror(int err)
{
  int s, eindex;
  int s;
  char *buf;
  eindex = err - EBUS_BASE;
  /* Make first caller allocate key for thread-specific data */
  s = pthread_once(&once, createKey);
@@ -65,14 +64,31 @@
      err_exit(s, "pthread_setspecific");
  }
  if (eindex < 0 || eindex >= _bus_nerr || _bus_errlist[eindex] == NULL)
  {
    snprintf(buf, MAX_ERROR_LEN, "Unknown error %d", eindex);
  }
  else
  {
    strncpy(buf, _bus_errlist[eindex], MAX_ERROR_LEN - 1);
    buf[MAX_ERROR_LEN - 1] = '\0';          /* Ensure null termination */
  if(err < EBUS_BASE) {
    // libc错误
    if (err < 0 || err >= _sys_nerr || _sys_errlist[err] == NULL)
    {
      snprintf(buf, MAX_ERROR_LEN, "Unknown error %d", err);
    }
    else
    {
      strncpy(buf, _sys_errlist[err], MAX_ERROR_LEN - 1);
      buf[MAX_ERROR_LEN - 1] = '\0';          /* Ensure null termination */
    }
  } else {
    //自定义错误
    err -= EBUS_BASE;
    if (err < 0 || err >= _bus_nerr || _bus_errlist[err] == NULL)
    {
      snprintf(buf, MAX_ERROR_LEN, "Unknown error %d", err);
    }
    else
    {
      strncpy(buf, _bus_errlist[err], MAX_ERROR_LEN - 1);
      buf[MAX_ERROR_LEN - 1] = '\0';          /* Ensure null termination */
    }
  }
  return buf;
src/bus_error.h
@@ -4,11 +4,11 @@
#define EBUS_BASE 10000
#define EBUS_TIMEOUT 10001
#define EBUS_CLOSED 10002
#define EBUS_KEY_INUSED 10003
#define EBUS_NET 10004
#define EBUS_BASE 500
#define EBUS_TIMEOUT 501
#define EBUS_CLOSED 502
#define EBUS_KEY_INUSED 503
#define EBUS_NET 504
extern int bus_errno;
src/psem.cpp
@@ -13,7 +13,7 @@
          continue;
      else {
         // LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout");
         return rv;
         return -1;
      }
  }
  return 0;
@@ -27,7 +27,7 @@
          continue;
      else {
         // LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout");
         return rv;
         return -1;
      }
  }
  return 0;
src/queue/array_lock_free_sem_queue.h
@@ -204,6 +204,7 @@
{
  uint32_t currentReadIndex;
  uint32_t currentWriteIndex;
  uint32_t tmpIndex;
  int s;
  // sigset_t mask_all, pre;
@@ -215,41 +216,42 @@
  #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    if (m_count == Q_SIZE) {
      if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG)
        return -1;
        return errno;
      else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
        const struct timespec ts = TimeUtil::trim_time(timeout);
        s = futex((int *)&m_count, FUTEX_WAIT, Q_SIZE, &ts, NULL, 0);
        if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
          // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
          return -1;
          return errno;
        }
            
      } else {
        s = futex((int *)&m_count, FUTEX_WAIT, Q_SIZE, NULL, NULL, 0);
        if (s == -1 && errno != EAGAIN && errno != EINTR) {
          return -1;
          return errno;
        }
      }
    }
  #else
    if (currentReadIndex == currentWriteIndex - Q_SIZE  + 1   )
    tmpIndex = (uint32_t)(currentWriteIndex - Q_SIZE  + 1);
    if (currentReadIndex ==   tmpIndex )
    {
        // the queue is full
      if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG)
        return -1;
        return errno;
      else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
        const struct timespec ts = TimeUtil::trim_time(timeout);
        s = futex((int *)&m_readIndex, FUTEX_WAIT, currentWriteIndex - Q_SIZE + 1, &ts, NULL, 0);
        s = futex((int *)&m_readIndex, FUTEX_WAIT, tmpIndex, &ts, NULL, 0);
        if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
          // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
          return -1;
          return errno;
        }
            
      } else {
        s = futex((int *)&m_readIndex, FUTEX_WAIT, currentWriteIndex - Q_SIZE + 1, NULL, NULL, 0);
        s = futex((int *)&m_readIndex, FUTEX_WAIT, tmpIndex, NULL, NULL, 0);
        if (s == -1 && errno != EAGAIN && errno != EINTR) {
          return -1;
          return errno;
        }
      }
    }
@@ -315,7 +317,7 @@
      if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
        // sigprocmask(SIG_SETMASK, &pre, NULL);
        return -1;
        return errno;
      }
      else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
        const struct timespec ts = TimeUtil::trim_time(timeout);
@@ -323,14 +325,14 @@
        if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
          // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
          // sigprocmask(SIG_SETMASK, &pre, NULL);
          return -1;
          return errno;
        }
            
      } else {
        s = futex((int *)&m_count, FUTEX_WAIT, 0, NULL, NULL, 0);
        if (s == -1 && errno != EAGAIN && errno != EINTR) {
          // sigprocmask(SIG_SETMASK, &pre, NULL);
          return -1;
          return errno;
        }
      }
    }
@@ -344,7 +346,7 @@
      // waiting to commit the data into it
      if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
        // sigprocmask(SIG_SETMASK, &pre, NULL);
        return -1;
        return errno;
      }
      else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
        const struct timespec ts = TimeUtil::trim_time(timeout);
@@ -352,14 +354,14 @@
        if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
          // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
          // sigprocmask(SIG_SETMASK, &pre, NULL);
          return -1;
          return errno;
        }
            
      } else {
        s = futex((int *)&currentMaximumReadIndex, FUTEX_WAIT, currentReadIndex, NULL, NULL, 0);
        if (s == -1 && errno != EAGAIN && errno != EINTR) {
         // sigprocmask(SIG_SETMASK, &pre, NULL);
          return -1;
          return errno;
        }
      }
    }
src/queue/lock_free_queue.h
@@ -207,31 +207,31 @@
template<typename ELEM_T,
        typename Allocator,
        template<typename T, typename AT> class Q_TYPE>
      typename Allocator,
      template<typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data, const struct timespec *timeout, int flag) {
    LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n");
    if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
        if (psem_trywait(&slots) == -1) {
            return -1;
        }
    } else if ((flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
        if (psem_timedwait(&slots, timeout) == -1) {
            return -1;
        }
    } else {
        if (psem_wait(&slots) == -1) {
            return -1;
        }
    }
  LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n");
  if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
      if (psem_trywait(&slots) == -1) {
          return errno;
      }
  } else if ((flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
      if (psem_timedwait(&slots, timeout) == -1) {
          return errno;
      }
  } else {
      if (psem_wait(&slots) == -1) {
          return errno;
      }
  }
    if (m_qImpl.push(a_data)) {
        psem_post(&items);
        LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n");
        return 0;
    }
    return -1;
  if (m_qImpl.push(a_data)) {
    psem_post(&items);
    LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n");
    return 0;
  }
  return -1;
}
@@ -239,31 +239,30 @@
        typename Allocator,
        template<typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data, const struct timespec *timeout, int flag) {
  LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before....");
    LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n");
  if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
      if (psem_trywait(&items) == -1) {
          return errno;
      }
  } else if ((flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
    LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before. flag=%d ,  %d\n", flag, timeout->tv_sec);
      if (psem_timedwait(&items, timeout) == -1) {
          return errno;
      }
  } else {
      if (psem_wait(&items) == -1) {
          return errno;
      }
  }
    if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
        if (psem_trywait(&items) == -1) {
            return -1;
        }
    } else if ((flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
        if (psem_timedwait(&items, timeout) == -1) {
            return -1;
        }
    } else {
        if (psem_wait(&items) == -1) {
            return -1;
        }
    }
    if (m_qImpl.pop(a_data)) {
        psem_post(&slots);
        LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n");
        return 0;
    }
    return -1;
  if (m_qImpl.pop(a_data)) {
      psem_post(&slots);
      LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n");
      return 0;
  }
  return -1;
}
template<typename ELEM_T,
src/queue/shm_queue.h
@@ -33,12 +33,8 @@
  bool full();
  bool empty();
  int push(const ELEM_T &a_data);
  int push_nowait(const ELEM_T &a_data);
  int push_timeout(const ELEM_T &a_data, const struct timespec *timeout);
  int pop(ELEM_T &a_data);
  int pop_nowait(ELEM_T &a_data);
  int pop_timeout(ELEM_T &a_data, struct timespec *timeout);
  int push(const ELEM_T &a_data, const struct timespec *timeout=NULL, int flag=0);
  int pop(ELEM_T &a_data, const struct timespec *timeout=NULL, int flag=0);
  ELEM_T &operator[](unsigned i);
@@ -144,89 +140,40 @@
  return queue->empty();
}
template <typename ELEM_T>
int SHMQueue<ELEM_T>::push(const ELEM_T &a_data) {
  int rv = queue->push(a_data);
  if(rv == -1) {
    return errno;
  } else {
int SHMQueue<ELEM_T>::push(const ELEM_T &a_data, const struct timespec *timeout, int flag) {
  int rv = queue->push(a_data, timeout, flag);
  if(rv == 0) {
    return 0;
  }
  if(rv == ETIMEDOUT)
    return EBUS_TIMEOUT;
  else {
    LoggerFactory::getLogger()->error("LockFreeQueue push_timeout: %s", bus_strerror(rv));
    return rv;
  }
}
template <typename ELEM_T>
int SHMQueue<ELEM_T>::push_nowait(const ELEM_T &a_data) {
  int rv =  queue->push(a_data, NULL, BUS_NOWAIT_FLAG);
  if(rv == -1) {
    if (errno == EAGAIN)
      return EAGAIN;
    else {
        err_msg(errno, "LockFreeQueue push_nowait");
        return errno;
    }
  }
  return 0;
}
int SHMQueue<ELEM_T>::pop(ELEM_T &a_data, const struct timespec *timeout, int flag) {
template <typename ELEM_T>
int SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data, const struct timespec *timeout) {
  int rv = queue->push(a_data, timeout, BUS_TIMEOUT_FLAG);
  if(rv == -1) {
    if(errno == ETIMEDOUT)
        return EBUS_TIMEOUT;
    else {
       LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout");
       return errno;
    }
  }
  return 0;
}
template <typename ELEM_T>
int SHMQueue<ELEM_T>::pop(ELEM_T &a_data) {
  LoggerFactory::getLogger()->debug("SHMQueue pop before\n");
  int rv = queue->pop(a_data);
  LoggerFactory::getLogger()->debug("SHMQueue pop before\n");
  if(rv == -1) {
    return errno;
  } else {
  int rv = queue->pop(a_data, timeout, flag);
  if(rv == 0) {
    return 0;
  }
}
template <typename ELEM_T>
int SHMQueue<ELEM_T>::pop_nowait(ELEM_T &a_data) {
  int rv = queue->pop(a_data, NULL, BUS_NOWAIT_FLAG);
  if(rv == -1) {
    if (errno == EAGAIN)
      return errno;
    else {
        LoggerFactory::getLogger()->error(errno, " SHMQueue pop_nowait");
        return errno;
    }
  if(rv == ETIMEDOUT)
    return EBUS_TIMEOUT;
  else {
    LoggerFactory::getLogger()->error("LockFreeQueue pop_timeout: %s", bus_strerror(rv));
    return rv;
  }
  return 0;
}
template <typename ELEM_T>
int SHMQueue<ELEM_T>::pop_timeout(ELEM_T &a_data, struct timespec *timeout) {
  int rv;
  rv = queue->pop(a_data, timeout, BUS_TIMEOUT_FLAG);
  if(rv == -1) {
    if (errno == ETIMEDOUT) {
      return EBUS_TIMEOUT;
    } else {
      LoggerFactory::getLogger()->error(errno, " SHMQueue pop_timeout");
      return errno;
    }
  }
  return 0;
  return rv;
  
}
src/shm/hashtable.cpp
@@ -110,13 +110,13 @@
  int rv;
  if( (rv = svsem_wait(hashtable->mutex)) != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_remove\n");
    LoggerFactory::getLogger()->error(errno, "hashtable_remove\n");
  }
  tailq_header_t *my_tailq_head = hashtable->array[code] ;
  if ( my_tailq_head == NULL)
  {
    if((rv = svsem_post(hashtable->mutex)) != 0) {
      LoggerFactory::getLogger()->error(rv, "hashtable_remove\n");
      LoggerFactory::getLogger()->error(errno, "hashtable_remove\n");
    }
    return NULL;
  } else {
@@ -136,7 +136,7 @@
    }
    if((rv = svsem_post(hashtable->mutex)) != 0) {
      LoggerFactory::getLogger()->error(rv, "hashtable_remove\n");
      LoggerFactory::getLogger()->error(errno, "hashtable_remove\n");
    }
    return NULL;
@@ -155,13 +155,13 @@
  int rv;
  if(( rv = svsem_wait(hashtable->mutex)) != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
    LoggerFactory::getLogger()->error(errno, "hashtable_put\n");
  }
  _hashtable_put(hashtable, key, value);
  if(( rv = svsem_post(hashtable->mutex)) != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
    LoggerFactory::getLogger()->error(errno, "hashtable_put\n");
  }
}
@@ -212,7 +212,7 @@
  int key = START_KEY;
  rv = svsem_wait(hashtable->mutex);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_alloc_key\n");
    LoggerFactory::getLogger()->error(errno, "hashtable_alloc_key\n");
  }
  while(_hashtable_get(hashtable, key) != NULL) {
@@ -223,7 +223,7 @@
  rv = svsem_post(hashtable->mutex);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_alloc_key\n");
    LoggerFactory::getLogger()->error(errno, "hashtable_alloc_key\n");
  }
  return key;
}
@@ -234,7 +234,7 @@
  tailq_entry_t *item;
  int rv;
  if( (rv = svsem_wait(hashtable->mutex)) != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_removeall\n");
    LoggerFactory::getLogger()->error(errno, "hashtable_removeall\n");
  }
  for (int i = 0; i < MAPSIZE; i++)
  {
@@ -253,7 +253,7 @@
  }
  if((rv = svsem_post(hashtable->mutex)) != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_removeall\n");
    LoggerFactory::getLogger()->error(errno, "hashtable_removeall\n");
  }
}
src/shm/hashtable.cpp.2
src/socket/shm_mod_socket.cpp
@@ -47,11 +47,11 @@
}
// 发送信息超时返回。 @sec 秒 , @nsec 纳秒
int ShmModSocket::sendto_timeout(const void *buf, const int size, const int key, const struct timespec *timeout) {
    return shm_sendto(shm_socket, buf, size, key, timeout, 0);
    return shm_sendto(shm_socket, buf, size, key, timeout, BUS_TIMEOUT_FLAG);
}
// 发送信息立刻返回。
int ShmModSocket::sendto_nowait( const void *buf, const int size, const int key){
    return shm_sendto(shm_socket, buf, size, key, NULL, (int)BUS_NOWAIT_FLAG);
    return shm_sendto(shm_socket, buf, size, key, NULL, BUS_NOWAIT_FLAG);
}
 
@@ -67,10 +67,12 @@
}
// 接受信息超时返回。 @sec 秒 , @nsec 纳秒
int ShmModSocket::recvfrom_timeout( void **buf, int *size, int *key, struct timespec *timeout) {
int ShmModSocket::recvfrom_timeout( void **buf, int *size, int *key, const struct timespec *timeout) {
    int rv =  shm_recvfrom(shm_socket, buf, size, key, timeout, BUS_TIMEOUT_FLAG);
     return rv;
     // printf("ShmModSocket::recvfrom_timeout\n");
     // return 501;
}
int ShmModSocket::recvfrom_nowait( void **buf, int *size, int *key){
@@ -88,22 +90,25 @@
    return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, NULL, 0);
}
// 超时返回。 @sec 秒 , @nsec 纳秒
int ShmModSocket::sendandrecv_timeout(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, struct timespec *timeout){
    return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, 0);
int ShmModSocket::sendandrecv_timeout(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, const struct timespec *timeout){
    return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, BUS_TIMEOUT_FLAG);
}
int ShmModSocket::sendandrecv_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
    return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)BUS_NOWAIT_FLAG);
    return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, BUS_NOWAIT_FLAG);
}
int ShmModSocket::sendandrecv_unsafe( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
    return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, NULL, 0);
}
// 超时返回。 @sec 秒 , @nsec 纳秒
int ShmModSocket::sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, struct timespec *timeout){
    return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, 0);
int ShmModSocket::sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, const struct timespec *timeout){
    return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, BUS_TIMEOUT_FLAG);
}
int ShmModSocket::sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
    return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)BUS_NOWAIT_FLAG);
    return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, BUS_NOWAIT_FLAG);
}
@@ -119,11 +124,11 @@
    return _sub_( topic, size, key, NULL, 0);
}
// 超时返回。 @sec 秒 , @nsec 纳秒
int  ShmModSocket::sub_timeout(char *topic, int size, int key, struct timespec *timeout){
    return _sub_(topic, size, key, timeout, 0);
int  ShmModSocket::sub_timeout(char *topic, int size, int key, const struct timespec *timeout){
    return _sub_(topic, size, key, timeout, BUS_TIMEOUT_FLAG);
}
int  ShmModSocket::sub_nowait(char *topic, int size, int key) {
    return _sub_(topic, size, key, NULL,  (int)BUS_NOWAIT_FLAG);
    return _sub_(topic, size, key, NULL,  BUS_NOWAIT_FLAG);
}
@@ -138,11 +143,11 @@
    return _desub_( topic, size, key, NULL, 0);
}
// 超时返回。 @sec 秒 , @nsec 纳秒
int  ShmModSocket::desub_timeout(char *topic, int size, int key, struct timespec *timeout){
    return _desub_(topic, size, key, timeout, 0);
int  ShmModSocket::desub_timeout(char *topic, int size, int key, const struct timespec *timeout){
    return _desub_(topic, size, key, timeout, BUS_TIMEOUT_FLAG);
}
int  ShmModSocket::desub_nowait(char *topic, int size, int key) {
    return _desub_(topic, size, key, NULL,  (int)BUS_NOWAIT_FLAG);
    return _desub_(topic, size, key, NULL,  BUS_NOWAIT_FLAG);
}
@@ -157,11 +162,11 @@
        return _pub_(topic, topic_size, content, content_size, key, NULL, 0);
}
//  超时返回。 @sec 秒 , @nsec 纳秒
int  ShmModSocket::pub_timeout(char *topic, int topic_size, void *content, int content_size, int key, struct timespec * timeout){
    return _pub_( topic, topic_size, content, content_size, key, timeout, 0);
int  ShmModSocket::pub_timeout(char *topic, int topic_size, void *content, int content_size, int key, const struct timespec * timeout){
    return _pub_( topic, topic_size, content, content_size, key, timeout, BUS_TIMEOUT_FLAG);
}
int  ShmModSocket::pub_nowait(char *topic, int topic_size, void *content, int content_size, int key){
    return _pub_(topic, topic_size, content, content_size, key, NULL, (int)BUS_NOWAIT_FLAG);
    return _pub_(topic, topic_size, content, content_size, key, NULL, BUS_NOWAIT_FLAG);
}
@@ -179,7 +184,7 @@
 * @key 总线端口
 */
int  ShmModSocket::_sub_(char *topic, int topic_size, int key,  
    struct timespec *timeout, int flags) {
    const struct timespec *timeout, int flags) {
     
    int ret;
@@ -206,8 +211,7 @@
/**
 * @key 总线端口
 */
int  ShmModSocket::_desub_(char *topic, int topic_size, int key,
    struct timespec *timeout, int flags) {
int  ShmModSocket::_desub_(char *topic, int topic_size, int key, const struct timespec *timeout, int flags) {
    // char buf[8192];
    int ret;
    if(topic == NULL) {
@@ -225,14 +229,12 @@
    if(size > 0) {
        ret = shm_sendto(shm_socket, buf, size, key, timeout, flags);
        free(buf);
        if(ret == EBUS_TIMEOUT) {
        logger->error(ret, "ShmModSocket::_desub_ key %d failed, %s", key, bus_strerror(EBUS_TIMEOUT));
        return EBUS_TIMEOUT;
      } else {
        logger->error(ret, "ShmModSocket::_desub_ key %d failed!", key);
        return ret;
      }
        if(ret == 0) {
            return 0;
        } else {
            logger->error("ShmModSocket::_desub_ key %d failed, %s", key, bus_strerror(ret));
            return ret;
        }
    } else {
        return -1;
    }
@@ -244,8 +246,7 @@
 * @str "<**pub**>{经济}"
 */
 
int  ShmModSocket::_pub_(char *topic, int topic_size, void *content, int content_size, int key,
    struct timespec *timeout, int flags) {
int  ShmModSocket::_pub_(char *topic, int topic_size, void *content, int content_size, int key, const struct timespec *timeout, int flags) {
    // int head_len;
    // char buf[8192+content_size];
    // snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
src/socket/shm_mod_socket.h
@@ -31,10 +31,10 @@
private:
     
    int _sub_( char *topic, int size, int key, struct timespec *timeout, int flags);
    int _pub_( char *topic, int topic_size, void *content, int content_size, int key, struct timespec *timeout, int flags);
    int _sub_( char *topic, int size, int key, const struct timespec *timeouts,  int flags);
    int _pub_( char *topic, int topic_size, void *content, int content_size, int key, const struct timespec *timeouts,  int flags);
    int  _desub_( char *topic, int size, int key, struct timespec *timeout, int flags);
    int  _desub_( char *topic, int size, int key, const struct timespec *timeouts, int flags);
    static int get_bus_sendbuf(bus_head_t &request_head, void *topic_buf, int topic_size, void *content_buf, int content_size, void **retbuf);
@@ -80,7 +80,7 @@
    */
    int recvfrom(void **buf, int *size, int *key);
    // 接受信息超时返回。 @sec 秒 , @nsec 纳秒
    int recvfrom_timeout(void **buf, int *size, int *key,  struct timespec *timeout);
    int recvfrom_timeout(void **buf, int *size, int *key,  const struct timespec *timeout);
    int recvfrom_nowait(void **buf, int *size, int *key);
    /**
@@ -90,13 +90,13 @@
    */
    int sendandrecv(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size) ;
    // 超时返回。 @sec 秒 , @nsec 纳秒
    int sendandrecv_timeout(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size,  struct timespec *timeout) ;
    int sendandrecv_timeout(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size, const struct timespec *timeout) ;
    int sendandrecv_nowait(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size) ;
    int sendandrecv_unsafe(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size) ;
    // 超时返回。 @sec 秒 , @nsec 纳秒
    int sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size,  struct timespec *timeout) ;
    int sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size, const  struct timespec *timeout) ;
    int sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size) ;
    /**
@@ -107,7 +107,7 @@
     */
    int  sub(char *topic, int size, int key);
    // 超时返回。 @sec 秒 , @nsec 纳秒
    int  sub_timeout(char *topic, int size, int key,  struct timespec *timeout);
    int  sub_timeout(char *topic, int size, int key,  const struct timespec *timeout);
    int  sub_nowait(char *topic, int size, int key);
@@ -119,7 +119,7 @@
     */
    int desub( char *topic, int size, int key);
    // 超时返回。 @sec 秒 , @nsec 纳秒
    int desub_timeout(char *topic, int size, int key, struct timespec *timeout);
    int desub_timeout(char *topic, int size, int key, const struct timespec *timeout);
    int desub_nowait(char *topic, int size, int key) ;
    /**
@@ -130,7 +130,7 @@
     */
    int  pub(char *topic, int topic_size, void *content, int content_size, int key);
    //  超时返回。 @sec 秒 , @nsec 纳秒
    int  pub_timeout(char *topic, int topic_size, void *content, int content_size, int key,  struct timespec *timeout);
    int  pub_timeout(char *topic, int topic_size, void *content, int content_size, int key, const  struct timespec *timeout);
    int  pub_nowait(char *topic, int topic_size, void *content, int content_size, int key);
src/socket/shm_socket.cpp
@@ -190,7 +190,7 @@
    msg.size = 0;
    msg.type = SHM_SOCKET_OPEN_REPLY;
    if (client_socket->remoteQueue->push_timeout(msg, &timeout) == 0) {
    if (client_socket->remoteQueue->push(msg, &timeout, BUS_TIMEOUT_FLAG) == 0) {
      client_socket->status = SHM_CONN_ESTABLISHED;
      return client_socket;
    } else {
@@ -243,7 +243,7 @@
  msg.key = socket->key;
  msg.size = 0;
  msg.type = SHM_SOCKET_OPEN;
  socket->remoteQueue->push_timeout(msg, &timeout);
  socket->remoteQueue->push(msg, &timeout, BUS_TIMEOUT_FLAG);
  //接受open reply
  if (socket->queue->pop(msg) == 0) {
@@ -315,7 +315,7 @@
// 短连接方式发送
int shm_sendto(shm_socket_t *socket, const void *buf, const int size,
               const int key, const struct timespec *timeout, const int flags) {
               const int key, const struct timespec *timeout, const int flag) {
  int s;
  int rv;
@@ -372,73 +372,60 @@
  dest.buf = mm_malloc(size);
  memcpy(dest.buf, buf, size);
  if( (flags & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
    rv = remoteQueue->push_nowait(dest);
  } else if((flags & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
      rv = remoteQueue->push_timeout(dest, timeout);
  } else {
      rv = remoteQueue->push(dest);
  }
  rv = remoteQueue->push(dest, timeout, flag);
  if (rv == 0) {
    // printf("shm_sendto push after\n");
    return 0;
  } else {
    mm_free(dest.buf);
    if(rv > EBUS_BASE) {
      // bus_errno = EBUS_TIMEOUT;
      logger->debug("sendto key %d failed %s", key, bus_strerror(rv));
    } else {
      logger->error(rv, "sendto key %d failed", key);
    }
    logger->debug("sendto key %d failed %s", key, bus_strerror(rv));
    return rv;
  }
}
int shm_recvfrom2(shm_socket_t *socket, void **buf, int *size, int *key,  const struct timespec *timeout,  int flag) {
  return 501;
}
// 短连接方式接受
int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *key,  struct timespec *timeout,  int flags) {
int shm_recvfrom(shm_socket_t *sokt, void **buf, int *size, int *key,  const struct timespec *timeout,  int flag) {
  int s;
  int rv;
  if (socket->socket_type != SHM_SOCKET_DGRAM) {
  if (sokt->socket_type != SHM_SOCKET_DGRAM) {
    logger->error("shm_socket.shm_recvfrom: Can't invoke shm_recvfrom method in a %d type socket  which "
                "is not a SHM_SOCKET_DGRAM socket ",
             socket->socket_type);
             sokt->socket_type);
    exit(1);
  }
  hashtable_t *hashtable = mm_get_hashtable();
  if ((s = pthread_mutex_lock(&(socket->mutex))) != 0)
  if ((s = pthread_mutex_lock(&(sokt->mutex))) != 0)
    err_exit(s, "shm_recvfrom : pthread_mutex_lock");
 
  if (socket->queue == NULL) {
    if (socket->key == 0) {
      socket->key = hashtable_alloc_key(hashtable);
  if (sokt->queue == NULL) {
    if (sokt->key == 0) {
      sokt->key = hashtable_alloc_key(hashtable);
    } else {
      if(!_shm_socket_check_key(socket)) {
      if(!_shm_socket_check_key(sokt)) {
        bus_errno = EBUS_KEY_INUSED;
        return EBUS_KEY_INUSED;
      }
    }
    socket->queue = new SHMQueue<shm_msg_t>(socket->key, 16);
    sokt->queue = new SHMQueue<shm_msg_t>(sokt->key, 16);
  }
  
  if ((s = pthread_mutex_unlock(&(socket->mutex))) != 0)
  if ((s = pthread_mutex_unlock(&(sokt->mutex))) != 0)
    err_exit(s, "shm_recvfrom : pthread_mutex_unlock");
  shm_msg_t src;
 
   if((flags & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
    rv = socket->queue->pop_nowait(src);
  } else if((flags & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
    rv = socket->queue->pop_timeout(src, timeout);
// printf("0 shm_recvfrom====%d\n", rv);
  } else {
    rv = socket->queue->pop(src);
  }
printf ("====== before ======\n");
  rv = sokt->queue->pop(src, timeout, flag);
printf ("====== after ======\n %d", rv);
  if (rv == 0) {
    if(buf != NULL) {
@@ -456,11 +443,7 @@
    mm_free(src.buf);
    return 0;
  } else {
    if(rv > EBUS_BASE) {
      logger->debug("shm_recvfrom failed %s", bus_strerror(rv));
    } else {
      logger->error(rv, "shm_recvfrom failed");
    }
    logger->debug("shm_recvfrom failed %s", bus_strerror(rv));
    return rv;
  }
@@ -503,7 +486,7 @@
// use thread local
int _shm_sendandrecv_thread_local(shm_socket_t *socket, const void *send_buf,
                    const int send_size, const int send_key, void **recv_buf,
                    int *recv_size,  struct timespec *timeout,  int flags) {
                    int *recv_size,  const struct timespec *timeout,  int flags) {
  int recv_key;
  int rv;
@@ -548,7 +531,7 @@
int _shm_sendandrecv_alloc_new(shm_socket_t *socket, const void *send_buf,
                    const int send_size, const int send_key, void **recv_buf,
                    int *recv_size,  struct timespec *timeout,  int flags) {
                    int *recv_size,  const struct timespec *timeout,  int flags) {
  int recv_key;
  int rv;
@@ -577,7 +560,7 @@
int shm_sendandrecv_unsafe(shm_socket_t *socket, const void *send_buf,
                    const int send_size, const int send_key, void **recv_buf,
                    int *recv_size,  struct timespec *timeout,  int flags) {
                    int *recv_size,  const struct timespec *timeout,  int flags) {
  if (socket->socket_type != SHM_SOCKET_DGRAM) {
    logger->error( "shm_socket.shm_sendandrecv_unsafe : Can't invoke shm_sendandrecv method in a %d type socket  "
                "which is not a SHM_SOCKET_DGRAM socket ",
@@ -599,7 +582,7 @@
int shm_sendandrecv(shm_socket_t *socket, const void *send_buf,
                    const int send_size, const int send_key, void **recv_buf,
                    int *recv_size,  struct timespec *timeout,  int flags) {
                    int *recv_size,  const struct timespec *timeout,  int flags) {
  return  _shm_sendandrecv_thread_local(socket, send_buf, send_size, send_key,recv_buf, recv_size, timeout,  flags);
}
@@ -713,7 +696,7 @@
  close_msg.size = 0;
  close_msg.type = SHM_SOCKET_CLOSE;
  if (notifyRemote && socket->remoteQueue != NULL) {
    socket->remoteQueue->push_timeout(close_msg, &timeout);
    socket->remoteQueue->push(close_msg, &timeout, BUS_TIMEOUT_FLAG);
  }
  if (socket->queue != NULL) {
@@ -739,7 +722,7 @@
         iter != socket->clientSocketMap->end(); iter++) {
      client_socket = iter->second;
      client_socket->remoteQueue->push_timeout(close_msg, &timeout);
      client_socket->remoteQueue->push(close_msg, &timeout, BUS_TIMEOUT_FLAG);
      client_socket->remoteQueue = NULL;
      delete client_socket->messageQueue;
src/socket/shm_socket.h
@@ -88,16 +88,16 @@
 */
int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int key, const struct timespec * timeout = NULL, const int flags=0);
int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *key,   struct timespec * timeout = NULL,  int flags=0);
int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *key,  const struct timespec * timeout = NULL,  int flags=0);
int shm_sendandrecv(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size,  
    struct timespec * timeout = NULL,  int flags=0);
    const struct timespec * timeout = NULL,  int flags=0);
/**
 * 功能同shm_sendandrecv, 但是不是线程安全的
 */
int shm_sendandrecv_unsafe(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size,  
    struct timespec * timeout = NULL,  int flags=0);
    const struct timespec * timeout = NULL,  int flags=0);
src/svsem.cpp
@@ -83,7 +83,7 @@
  while (semop(semid, &sops, 1) == -1)
    if (errno != EINTR) {
      // err_msg(errno, "svsem_dec");
      return errno;
      return -1;
    }
  return 0;
@@ -95,14 +95,9 @@
  sops.sem_num = 0;
  sops.sem_op = -1;
  sops.sem_flg = IPC_NOWAIT | 0;
  if (semop(semid, &sops, 1) == -1)
    if (errno != EINTR) {
      // err_msg(errno, "svsem_dec_nowait");
      return errno;
    }
  return 0;
  return semop(semid, &sops, 1) ;
}
int svsem_timedwait(const int semid, const struct timespec *timeout) {
@@ -115,7 +110,7 @@
  while (semtimedop(semid, &sops, 1, timeout) == -1)
    if (errno != EINTR) {
      // err_msg(errno, "svsem_psem_timedwait");
      return errno;
      return -1;
    }
  return 0;
@@ -133,7 +128,7 @@
  int rv = semop(semid, &sops, 1);
  if (rv == -1) {
    // err_msg(errno, "svsem_inc");
    return errno;
    return -1;
  }
  return 0;
}
@@ -149,7 +144,7 @@
  arg.val = 1;
  if (semctl(semid, 1, SETVAL, arg) == -1) {
    err_msg(errno, "svsem_set");
    return errno;
    return -1;
  }   
  //释放mutex
@@ -165,7 +160,7 @@
  while (semop(semid, sops, 2) == -1)
    if (errno != EINTR) {
      // err_msg(errno, "Svsvsem_dec");
      return errno;
      return -1;
    }
@@ -177,7 +172,7 @@
  while (semop(semid, sops, 1) == -1)
    if (errno != EINTR) {
      // err_msg(errno, "Svsvsem_dec");
      return errno;
      return -1;
    }
  return 0;
@@ -190,7 +185,7 @@
  
  if (semctl(semid, 1, SETVAL, arg) == -1) {
    err_msg(errno, "svsem_set");
    return errno;
    return -1;
  }
  return 0;
}
@@ -212,7 +207,7 @@
  while (semop(semid, &sops, 1) == -1)
    if (errno != EINTR) {
      // err_msg(errno, "svsem_zero");
      return errno;
      return -1;
    }
  return 0;
@@ -229,7 +224,7 @@
  while (semop(semid, &sops, 1) == -1)
    if (errno != EINTR) {
      // err_msg(errno, "svsem_zero_nowait");
      return errno;
      return -1;
    }
  return 0;
@@ -245,7 +240,7 @@
  while (semtimedop(semid, &sops, 1, timeout) == -1)
    if (errno != EINTR) {
      // err_msg(errno, "svsem_zero_timeout");
      return errno;
      return -1;
    }
  return 0;
@@ -257,11 +252,8 @@
int svsem_set(int semid, int val) {
  union semun arg;
  arg.val = val;
  if (semctl(semid, 0, SETVAL, arg) == -1) {
    err_msg(errno, "svsem_set");
    return errno;
  }
  return 0;
  return semctl(semid, 0, SETVAL, arg);
}
src/time_util.cpp
@@ -6,12 +6,12 @@
struct timespec TimeUtil::calc_abs_time(const struct timespec *ts) {
 
    struct timespec res;
  struct timespec timeout;
  if (clock_gettime(CLOCK_REALTIME, &timeout) == -1)
  struct timespec cur;
  if (clock_gettime(CLOCK_REALTIME, &cur) == -1)
      err_exit(errno, "clock_gettime");
  res.tv_sec = timeout.tv_sec + ts->tv_sec;
  res.tv_nsec = timeout.tv_nsec + ts->tv_nsec;
  res.tv_sec = cur.tv_sec + ts->tv_sec;
  res.tv_nsec = cur.tv_nsec + ts->tv_nsec;
  res.tv_sec = res.tv_sec + floor(res.tv_nsec / NANO);
  res.tv_nsec = res.tv_nsec % NANO;
  return res;
test_queue/test.txt
File was deleted
test_socket/bus_test.cpp
@@ -3,6 +3,10 @@
#include "shm_mm_wrapper.h"
#include "usg_common.h"
#include "mm.h"
#include "logger_factory.h"
#include "bus_error.h"
static Logger *logger = LoggerFactory::getLogger();
BusServerSocket * server_socket;
void sigint_handler(int sig) {
@@ -46,7 +50,7 @@
void *run_recv(void *skptr) {
  while(true) {
    printf("================run_recv\n");
    logger->debug("================run_recv\n");
    sleep(1);
  }
}
@@ -60,17 +64,21 @@
  ShmModSocket *sk = new ShmModSocket();
  
  pthread_t tid;
  pthread_create(&tid, NULL, run_recv, (void *)sk);
  int size;
  
  char action[512];
  char topic[512];
  char content[512];
  long i = 0;
  pthread_create(&tid, NULL, run_recv, (void *)sk);
  while (true) {
    //printf("Usage: pub <topic> [content] or sub <topic>\n");
    printf("Can I help you? sub, pub, desub or quit\n");
    scanf("%s",action);
    printf("Can I help you? sub, pub, desub or quit %d\n", i++);
    // sleep(100);
    scanf("%s", action);
    
    if(strcmp(action, "sub") == 0) {
      printf("Please input topic!\n");