wangzhengquan
2021-01-13 8df2b63c21d0aabaa894930e3ab1ea63c49d47ff
fix bug invalid argument
6个文件已修改
105 ■■■■■ 已修改文件
src/px_sem_util.cpp 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/lock_free_queue.h 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_socket.cpp 18 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_mod_socket.cpp 32 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.cpp 17 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/heart_beat.cpp 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/px_sem_util.cpp
@@ -1,13 +1,16 @@
#include "px_sem_util.h"
#define NANO 1000000000
struct timespec PXSemUtil::calc_sem_timeout(const struct timespec *ts) {
    int tmp_sec;
    struct timespec res;
  struct timespec timeout;
  if (clock_gettime(CLOCK_REALTIME, &timeout) == -1)
      err_exit(errno, "clock_gettime");
  timeout.tv_nsec += ts->tv_nsec;
  tmp_sec =  timeout.tv_nsec / 10e9;
  timeout.tv_nsec =  timeout.tv_nsec - tmp_sec * 10e9;
  timeout.tv_sec += ts->tv_sec + tmp_sec;
  return timeout;
  res.tv_sec = timeout.tv_sec + ts->tv_sec;
  res.tv_nsec = timeout.tv_nsec + ts->tv_nsec;
  res.tv_sec = res.tv_sec + floor(res.tv_nsec / NANO);
  res.tv_nsec = res.tv_nsec % NANO;
  return res;
}
src/queue/lock_free_queue.h
@@ -262,12 +262,12 @@
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * ts)
{
     
    int rv;
    struct timespec timeout = PXSemUtil::calc_sem_timeout(ts);
  // LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before tv_sec=%d, tv_nsec=%ld", 
  //   timeout.tv_sec, timeout.tv_nsec);
    while (sem_timedwait(&slots, &timeout) == -1) {
    while ( sem_timedwait(&slots, &timeout) == -1) {
    //     LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before tv_sec=%d, tv_nsec=%ld, ETIMEDOUT=%d, errno=%d\n", 
    // timeout.tv_sec, timeout.tv_nsec, ETIMEDOUT, errno);
@@ -343,9 +343,11 @@
    template <typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * ts)
{
// LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout before\n");
// LoggerFactory::getLogger()->debug("=================ts sec = %d, nsec = %ld \n", ts->tv_sec,  ts->tv_nsec );
    // struct timespec timeout_tmp = {1, 0};
    struct timespec timeout = PXSemUtil::calc_sem_timeout(ts);
// LoggerFactory::getLogger()->debug("================== timeout before sec = %d, nsec = %ld \n", timeout.tv_sec,  timeout.tv_nsec );
    while (sem_timedwait(&items, &timeout) == -1) {
        if (errno == ETIMEDOUT)
@@ -353,8 +355,8 @@
        else if(errno == EINTR)
            continue;
        else {
          LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop_timeout");
          return -1;
          // LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop_timeout %d", errno);
          return errno;
        }
    }
src/socket/net_mod_socket.cpp
@@ -67,15 +67,15 @@
int NetModSocket::sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) {
  _sendandrecv_(node_arr,  arrlen, send_buf,send_size, recv_arr, recv_arr_size, -1);
  return _sendandrecv_(node_arr,  arrlen, send_buf,send_size, recv_arr, recv_arr_size, -1);
}
int NetModSocket::sendandrecv_timeout(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int  msec) {
  _sendandrecv_(node_arr,  arrlen, send_buf,send_size, recv_arr, recv_arr_size, msec);
  return _sendandrecv_(node_arr,  arrlen, send_buf,send_size, recv_arr, recv_arr_size, msec);
}
int NetModSocket::sendandrecv_nowait(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) {
   _sendandrecv_(node_arr,  arrlen, send_buf,send_size, recv_arr, recv_arr_size, 0);
  return _sendandrecv_(node_arr,  arrlen, send_buf,send_size, recv_arr, recv_arr_size, 0);
}
@@ -327,13 +327,13 @@
  // 本地发送
  if(node_arr == NULL || arrlen == 0) {
    if(msec == 0) {
      ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, node->key);
      ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, SHM_BUS_KEY);
    } else if(msec > 0) {
      timeout.tv_sec = msec / 1000;
      timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
      ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, node->key, &timeout);
      ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout);
    } else {
      ret = shmModSocket.pub(topic, topic_size, content, content_size, node->key);
      ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY);
    }
    if(ret == 0 ) {
      n_pub_suc++;
@@ -346,13 +346,13 @@
    if(node->host == NULL) {
      // 本地发送
      if(msec == 0) {
        ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, node->key);
        ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, SHM_BUS_KEY);
      } else if(msec > 0) {
        timeout.tv_sec = msec / 1000;
        timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
        ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, node->key, &timeout);
        ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout);
      } else {
        ret = shmModSocket.pub(topic, topic_size, content, content_size, node->key);
        ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY);
      }
      if(ret == 0 ) {
src/socket/shm_mod_socket.cpp
@@ -65,26 +65,32 @@
// printf("dgram_mod_recvfrom  before\n");
    int rv = shm_recvfrom(shm_socket, buf, size, key, timeout, flags);
// printf("dgram_mod_recvfrom  after\n");
    return rv;
}
/**
 * 接收信息
 * @key 从谁哪里收到的信息
 * @return 0 成功, 其他值 失败的错误码
*/
int ShmModSocket::recvfrom(void **buf, int *size, int *key) {
        return  _recvfrom_( buf, size, key, NULL, 0);
    int rv =  _recvfrom_( buf, size, key, NULL, 0);
    // logger->error(rv, "ShmModSocket::recvfrom failed!");
  return rv;
}
// 接受信息超时返回。 @sec 秒 , @nsec 纳秒
int ShmModSocket::recvfrom_timeout( void **buf, int *size, int *key, struct timespec *timeout) {
    return _recvfrom_(buf, size, key, timeout, 0);
    int rv =  _recvfrom_(buf, size, key, timeout, 0);
     return rv;
}
int ShmModSocket::recvfrom_nowait( void **buf, int *size, int *key){
    return _recvfrom_(buf, size, key, NULL, (int)SHM_MSG_NOWAIT);
    int rv =  _recvfrom_(buf, size, key, NULL, (int)SHM_MSG_NOWAIT);
    // logger->error(rv, "ShmModSocket::recvfrom_nowait failed!");
  return rv;
}
/**
@@ -188,14 +194,7 @@
 */
int  ShmModSocket::_sub_(char *topic, int topic_size, int key,  
    struct timespec *timeout, int flags) {
    // char buf[8192];
    // int rv;
    // snprintf(buf,  8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
    // rv = shm_sendto(shm_socket, buf, strlen(buf) + 1, key, timeout, flags);
    // if(rv == 0) {
    //     bus_set->insert(key);
    // }
    // return rv;
    int ret;
    bus_head_t head = {};
@@ -240,7 +239,14 @@
    if(size > 0) {
        ret = shm_sendto(shm_socket, buf, size, key, timeout, flags);
        free(buf);
        return ret;
        if(ret == EBUS_TIMEOUT) {
        logger->error(ret, "ShmModSocket::_desub_ key %d failed, %s", key, bus_strerror(EBUS_TIMEOUT));
        return EBUS_TIMEOUT;
      } else {
        logger->error(ret, "ShmModSocket::_desub_ key %d failed!", key);
        return ret;
      }
    } else {
        return -1;
    }
src/socket/shm_socket.cpp
@@ -386,12 +386,12 @@
    delete remoteQueue;
    mm_free(dest.buf);
    if(rv == EBUS_TIMEOUT) {
      bus_errno = EBUS_TIMEOUT;
      logger->error(errno, "sendto key %d failed, %s", key, bus_strerror(EBUS_TIMEOUT));
      // bus_errno = EBUS_TIMEOUT;
      logger->error(rv, "sendto key %d failed, %s", key, bus_strerror(EBUS_TIMEOUT));
      return EBUS_TIMEOUT;
    } else {
      //logger->error(errno, "sendto key %d failed!", key);
      return -1;
      return rv;
    }
   
   
@@ -457,7 +457,16 @@
    mm_free(src.buf);
    return 0;
  } else {
    return -1;
    if(rv == EBUS_TIMEOUT) {
      // bus_errno = EBUS_TIMEOUT;
      logger->error("shm_recvfrom  failed, %s", bus_strerror(EBUS_TIMEOUT));
      return EBUS_TIMEOUT;
    } else {
      logger->error(rv, "shm_recvfrom  failed!");
      return rv;
    }
  }
}
test_net_socket/heart_beat.cpp
@@ -28,10 +28,13 @@
  char sendbuf[512];
  int rv;
  int remote_port;
  while (net_mod_socket_recvfrom(serv, &recvbuf, &size, &remote_port) == 0) {
    printf( "RECEIVED HREARTBEAT FROM %d: %s\n", remote_port, recvbuf);
    net_mod_socket_sendto(serv, "suc", strlen("suc")+1, remote_port);
    free(recvbuf);
  while (true) {
    if(net_mod_socket_recvfrom_timeout(serv, &recvbuf, &size, &remote_port, 0, 2000000000)==0) {
      printf( "RECEIVED HREARTBEAT FROM %d: %s\n", remote_port, recvbuf);
      net_mod_socket_sendto(serv, "suc", strlen("suc")+1, remote_port);
      free(recvbuf);
    }
  }
  // sleep(1000);
  net_mod_socket_close(serv);