wangzhengquan
2021-01-15 42d1c7ef91627d5ac920c8fa35573970ac1bd2d5
Merge branch 'dev' into rdma
1个文件已删除
2个文件已添加
13个文件已修改
439 ■■■■ 已修改文件
CMakeLists.txt 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bus_error.cpp 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bus_error.h 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/logger_factory.cpp 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/logger_factory.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/lock_free_queue.h 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/shm_queue.h 26 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_socket.cpp 218 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_socket.h 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_socket_wrapper.cpp 45 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_mod_socket.cpp 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.cpp 39 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.h 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/CMakeLists.txt 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test.cpp 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test1.cpp 33 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
CMakeLists.txt
@@ -20,4 +20,5 @@
list(APPEND EXTRA_LIBS ${PROJECT_SOURCE_DIR}/lib/libusgcommon.a pthread)
add_subdirectory(${PROJECT_SOURCE_DIR}/src)
add_subdirectory(${PROJECT_SOURCE_DIR}/test)
add_subdirectory(${PROJECT_SOURCE_DIR}/test_net_socket)
src/bus_error.cpp
@@ -41,9 +41,9 @@
char *
bus_strerror(int err)
{
  int s;
  int s, eindex;
  char *buf;
  eindex = err - 10000;
  /* Make first caller allocate key for thread-specific data */
  s = pthread_once(&once, createKey);
@@ -64,13 +64,13 @@
      err_exit(s, "pthread_setspecific");
  }
  if (err < 0 || err >= _bus_nerr || _bus_errlist[err] == NULL)
  if (eindex < 0 || eindex >= _bus_nerr || _bus_errlist[eindex] == NULL)
  {
    snprintf(buf, MAX_ERROR_LEN, "Unknown error %d", err);
    snprintf(buf, MAX_ERROR_LEN, "Unknown error %d", eindex);
  }
  else
  {
    strncpy(buf, _bus_errlist[err], MAX_ERROR_LEN - 1);
    strncpy(buf, _bus_errlist[eindex], MAX_ERROR_LEN - 1);
    buf[MAX_ERROR_LEN - 1] = '\0';          /* Ensure null termination */
  }
src/bus_error.h
@@ -4,10 +4,10 @@
#define EBUS_TIMEOUT 1
#define EBUS_CLOSED 2
#define ESHM_BUS_KEY_INUSED 3
#define EBUS_BASE 10000
#define EBUS_TIMEOUT 10001
#define EBUS_CLOSED 10002
#define ESHM_BUS_KEY_INUSED 10003
extern int bus_errno;
src/logger_factory.cpp
@@ -1,4 +1,5 @@
#include "logger_factory.h"
#include "bus_error.h"
Logger * LoggerFactory::logger = NULL;
@@ -19,4 +20,15 @@
    config.console = 1;
    logger = new Logger(config);
    return logger;
}
void  LoggerFactory::error(int s) {
    Logger* logger = LoggerFactory::getLogger();
    if(s == EBUS_TIMEOUT) {
    logger->error("shm_recvfrom  failed, %s", bus_strerror(EBUS_TIMEOUT));
  } else {
    logger->error(s, "shm_recvfrom  failed!");
  }
}
src/logger_factory.h
@@ -10,6 +10,7 @@
public:
    static Logger* getLogger();
    static void error(int s);
};
#endif
src/queue/lock_free_queue.h
@@ -343,19 +343,20 @@
    template <typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * ts)
{
// LoggerFactory::getLogger()->debug("=================ts sec = %d, nsec = %ld \n", ts->tv_sec,  ts->tv_nsec );
    LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout before\n");
    // LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout before\n");
    struct timespec timeout = PXSemUtil::calc_sem_timeout(ts);
// LoggerFactory::getLogger()->debug("================== timeout before sec = %d, nsec = %ld \n", timeout.tv_sec,  timeout.tv_nsec );
    while (sem_timedwait(&items, &timeout) == -1) {
        if (errno == ETIMEDOUT)
        // LoggerFactory::getLogger()->error(errno, "1 LockFreeQueue pop_timeout %d %d", errno, ETIMEDOUT);
        if (errno == ETIMEDOUT) {
             // LoggerFactory::getLogger()->error(errno, "2 LockFreeQueue pop_timeout %d %d", errno, EBUS_TIMEOUT);
            return EBUS_TIMEOUT;
        }
        else if(errno == EINTR)
            continue;
        else {
          LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop_timeout %d", errno);
          LoggerFactory::getLogger()->error(errno, "3  LockFreeQueue pop_timeout %d", errno);
          return errno;
        }
    }
src/queue/shm_queue.h
@@ -28,13 +28,12 @@
  inline bool full();
  inline bool empty();
  inline bool push(const ELEM_T &a_data);
  inline bool push_nowait(const ELEM_T &a_data);
  inline bool push_timeout(const ELEM_T &a_data,
                           const struct timespec *timeout);
  inline bool pop(ELEM_T &a_data);
  inline bool pop_nowait(ELEM_T &a_data);
  inline bool pop_timeout(ELEM_T &a_data, struct timespec *timeout);
  inline int push(const ELEM_T &a_data);
  inline int push_nowait(const ELEM_T &a_data);
  inline int push_timeout(const ELEM_T &a_data, const struct timespec *timeout);
  inline int pop(ELEM_T &a_data);
  inline int pop_nowait(ELEM_T &a_data);
  inline int pop_timeout(ELEM_T &a_data, struct timespec *timeout);
  inline ELEM_T &operator[](unsigned i);
@@ -167,23 +166,23 @@
}
template <typename ELEM_T>
inline bool SHMQueue<ELEM_T>::push(const ELEM_T &a_data) {
inline int SHMQueue<ELEM_T>::push(const ELEM_T &a_data) {
  return queue->push(a_data);
}
template <typename ELEM_T>
inline bool SHMQueue<ELEM_T>::push_nowait(const ELEM_T &a_data) {
inline int SHMQueue<ELEM_T>::push_nowait(const ELEM_T &a_data) {
  return queue->push_nowait(a_data);
}
template <typename ELEM_T>
inline bool SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data,
inline int SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data,
                                           const struct timespec *timeout) {
  return queue->push_timeout(a_data, timeout);
}
template <typename ELEM_T> inline bool SHMQueue<ELEM_T>::pop(ELEM_T &a_data) {
template <typename ELEM_T> inline int SHMQueue<ELEM_T>::pop(ELEM_T &a_data) {
  // printf("SHMQueue pop before\n");
  int rv = queue->pop(a_data);
  // printf("SHMQueue after before\n");
@@ -191,13 +190,12 @@
}
template <typename ELEM_T>
inline bool SHMQueue<ELEM_T>::pop_nowait(ELEM_T &a_data) {
inline int SHMQueue<ELEM_T>::pop_nowait(ELEM_T &a_data) {
  return queue->pop_nowait(a_data);
}
template <typename ELEM_T>
inline bool SHMQueue<ELEM_T>::pop_timeout(ELEM_T &a_data,
                                          struct timespec *timeout) {
inline int SHMQueue<ELEM_T>::pop_timeout(ELEM_T &a_data, struct timespec *timeout) {
  return queue->pop_timeout(a_data, timeout);
}
src/socket/net_mod_socket.cpp
@@ -180,7 +180,16 @@
        ret_arr[n_recv_suc].content = recv_buf;
        ret_arr[n_recv_suc].content_length = recv_size;
        n_recv_suc++;
      } else {
        if(ret > EBUS_BASE) {
          // bus_errno = EBUS_TIMEOUT;
          logger->debug("NetModSocket:: %d _sendandrecv_ to key %d failed, %s", get_key(), node->key, bus_strerror(ret));
        } else {
          logger->error(ret, "NetModSocket:: %d _sendandrecv_ to key %d failed", get_key(),  node->key);
        }
      }
     
      continue;
    }
@@ -443,147 +452,61 @@
  return n_pub_suc;
}
int NetModSocket::sendandrecv_safe(net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) {
  int i,  clientfd;
  net_node_t *node;
  void *recv_buf;
  int recv_size;
  char response_head_bs[NET_MODE_RESPONSE_HEAD_LENGTH];
  net_mod_request_head_t request_head = {};
  net_mod_response_head_t response_head;
  char portstr[32];
  char *buf = NULL;
  int buf_size, max_buf_size;
  if(buf == NULL) {
    buf = (char *)malloc(MAXBUF);
    max_buf_size = MAXBUF;
    LoggerFactory::getLogger()->error(errno, "NetModSocket::sendandrecv malloc");
  }
  int nsuc = 0;
  net_mod_recv_msg_t *ret_arr = (net_mod_recv_msg_t *)calloc(arrlen, sizeof(net_mod_recv_msg_t));
  for (i = 0; i< arrlen; i++) {
    node = &node_arr[i];
    if(node->host == NULL) {
      // 本地发送
      shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size);
      goto LABEL_ARR_PUSH;
    }
    sprintf(portstr, "%d", node->port);
    clientfd = open_clientfd(node->host, portstr);
    if(clientfd < 0) {
      continue;
    }
    buf_size = send_size + NET_MODE_REQUEST_HEAD_LENGTH;
    if(max_buf_size < buf_size) {
      if((buf = (char *)realloc(buf, buf_size)) == NULL) {
        LoggerFactory::getLogger()->error(errno, "NetModSocket::sendandrecv_safe realloc buf");
      } else {
        max_buf_size = buf_size;
      }
    }
    request_head.mod = REQ_REP;
    request_head.key = node->key;
    request_head.content_length = send_size;
    request_head.topic_length = 0;
    // optval = 1;
    // setsockopt(clientfd, IPPROTO_TCP, TCP_CORK, &optval, sizeof(optval));
    memcpy(buf, NetModSocket::encode_request_head(request_head), NET_MODE_REQUEST_HEAD_LENGTH);
    memcpy(buf + NET_MODE_REQUEST_HEAD_LENGTH, send_buf, send_size);
    if(rio_writen(clientfd, buf, buf_size) != buf_size ) {
      LoggerFactory::getLogger()->error(errno, "NetModSocket::sendandrecv_safe  rio_writen  buf");
      close(clientfd);
      continue;
    }
    // optval = 0;
    // setsockopt(clientfd, IPPROTO_TCP, TCP_CORK, &optval, sizeof(optval));
    if ( rio_readn(clientfd, response_head_bs, NET_MODE_RESPONSE_HEAD_LENGTH) !=  NET_MODE_RESPONSE_HEAD_LENGTH) {
      LoggerFactory::getLogger()->error(errno, "NetModSocket::sendandrecv_safe  rio_readnb response_head_bs");
      close(clientfd);
      continue;
    }
    response_head =  NetModSocket::decode_response_head(response_head_bs);
    if(response_head.code != 0) {
      continue;
    }
    recv_buf = malloc(response_head.content_length);
    if(recv_buf == NULL) {
      LoggerFactory::getLogger()->error(errno, "NetModSocket::send malloc");
      exit(1);
    }
    if ( (recv_size = rio_readn(clientfd, recv_buf, response_head.content_length) ) !=  response_head.content_length) {
      LoggerFactory::getLogger()->error(errno, "NetModSocket::sendandrecv_safe  rio_readnb recv_buf");
      close(clientfd);
      continue;
    }
LABEL_ARR_PUSH:
    if(node->host != NULL) {
      strcpy(ret_arr[nsuc].host, node->host);
    } else {
      strcpy(ret_arr[nsuc].host, "local");
    }
    ret_arr[nsuc].port = node->port;
    ret_arr[nsuc].key = node->key;
    ret_arr[nsuc].content = recv_buf;
    ret_arr[nsuc].content_length = recv_size;
    nsuc++;
  }
  *recv_arr = ret_arr;
  if(recv_arr_size != NULL) {
    *recv_arr_size = nsuc;
  }
  free(buf);
  return nsuc;
}
/**
 * 发送信息
 * @key 发送给谁
 * @return 0 成功, 其他值 失败的错误码
 */
int NetModSocket::sendto(const void *buf, const int size, const int key){
  return shmModSocket.sendto(buf, size, key);
  int rv = shmModSocket.sendto(buf, size, key);
  if(rv == 0) {
    logger->debug("NetModSocket::sendto: %d sendto %d success.\n", get_key(), key);
    return 0;
  }
  if(rv > EBUS_BASE) {
    // bus_errno = EBUS_TIMEOUT;
    logger->debug("NetModSocket::sendto: %d sendto  %d failed %s", get_key(), key, bus_strerror(rv));
  } else {
    logger->error(rv, "NetModSocket::sendto : %d sendto  %d failed", get_key(), key);
  }
  return rv;
}
// 发送信息超时返回。 @sec 秒 , @nsec 纳秒
int NetModSocket::sendto_timeout(const void *buf, const int size, const int key, int sec, int nsec){
  struct timespec timeout = {sec, nsec};
  return shmModSocket.sendto_timeout(buf, size, key, &timeout);
  int rv = shmModSocket.sendto_timeout(buf, size, key, &timeout);
  if(rv == 0) {
    logger->debug("NetModSocket::sendto_timeout: %d sendto %d success.\n", get_key(), key);
    return 0;
  }
  if(rv > EBUS_BASE) {
    // bus_errno = EBUS_TIMEOUT;
    logger->debug("NetModSocket::sendto_timeout : %d sendto  %d failed %s", get_key(),  key, bus_strerror(rv));
  } else {
    logger->error(rv, "NetModSocket::sendto_timeout:  %d sendto  %d failed", get_key(),  key);
  }
  return rv;
}
// 发送信息立刻返回。
int NetModSocket::sendto_nowait(const void *buf, const int size, const int key){
  return shmModSocket.sendto_nowait(buf, size, key);
  int rv = shmModSocket.sendto_nowait(buf, size, key);
  if(rv == 0) {
    logger->debug("NetModSocket::sendto_nowait: %d sendto %d success.\n", get_key(), key);
    return 0;
  }
  if(rv > EBUS_BASE) {
    // bus_errno = EBUS_TIMEOUT;
    logger->debug("NetModSocket::sendto_nowait %d sendto  %d failed %s", get_key(), key, bus_strerror(rv));
  } else {
    logger->error(rv, "NetModSocket::sendto_nowait %d sendto  %d failed", get_key(), key);
  }
  return rv;
}
/**
@@ -592,16 +515,53 @@
 * @return 0 成功, 其他值 失败的错误码
*/
int NetModSocket::recvfrom(void **buf, int *size, int *key) {
  return shmModSocket.recvfrom(buf, size, key);
  int rv = shmModSocket.recvfrom(buf, size, key);
  if(rv == 0) {
    logger->debug("NetModSocket::recvfrom:  %d recvfrom %d success.\n", get_key(), *key);
    return 0;
  }
  if(rv > EBUS_BASE) {
    logger->debug("NetModSocket::recvfrom: socket %d recvfrom failed %s", get_key(), bus_strerror(rv));
  } else {
    logger->error(rv, "NetModSocket::recvfrom: socket %d recvfrom failed",  get_key());
  }
  return rv;
}
// 接受信息超时返回。 @sec 秒 , @nsec 纳秒
int NetModSocket::recvfrom_timeout(void **buf, int *size, int *key, int sec, int nsec){
  struct timespec timeout = {sec, nsec};
  return shmModSocket.recvfrom_timeout(buf, size, key, &timeout);
  int rv = shmModSocket.recvfrom_timeout(buf, size, key, &timeout);
  if(rv == 0) {
    logger->debug("NetModSocket::recvfrom:  %d recvfrom %d success.\n", get_key(), *key);
    return 0;
  }
  if(rv > EBUS_BASE) {
    // bus_errno = EBUS_TIMEOUT;
    logger->debug("NetModSocket::recvfrom_timeout:  %d recvfrom failed %s", get_key(), bus_strerror(rv));
  } else {
    logger->error(rv, "NetModSocket::recvfrom_timeout:  %d recvfrom failed",  get_key());
  }
  return rv;
}
int NetModSocket::recvfrom_nowait(void **buf, int *size, int *key){
  return shmModSocket.recvfrom_nowait(buf, size, key);
  int rv = shmModSocket.recvfrom_nowait(buf, size, key);
  if(rv == 0) {
    logger->debug("NetModSocket::recvfrom:  %d recvfrom %d success.\n", get_key(), *key);
    return 0;
  }
  if(rv > EBUS_BASE) {
    // bus_errno = EBUS_TIMEOUT;
    logger->debug("NetModSocket::recvfrom_nowait:  %d recvfrom failed %s", get_key(), bus_strerror(rv));
  } else {
    logger->error(rv, "NetModSocket::recvfrom_nowait:  %d recvfrom failed",  get_key());
  }
  return rv;
}
/**
src/socket/net_mod_socket.h
@@ -157,8 +157,8 @@
   * 缺点:阻塞的,性能不如sendandrecv
   * 
   */
  int sendandrecv_safe(net_node_t *node_arr, int node_arr_len, void *send_buf, int send_size,
    net_mod_recv_msg_t ** recv_arr, int *recv_arr_size);
  // int sendandrecv_safe(net_node_t *node_arr, int node_arr_len, void *send_buf, int send_size,
  //   net_mod_recv_msg_t ** recv_arr, int *recv_arr_size);
 
  /**
src/socket/net_mod_socket_wrapper.cpp
@@ -2,12 +2,12 @@
static Logger *logger = LoggerFactory::getLogger();
/**
 * 创建
 */
void * net_mod_socket_open() {
    printf("=====net_mod_socket_open\n");
    NetModSocket *sockt = new NetModSocket;
    return (void *)sockt;
}
@@ -16,48 +16,49 @@
 * 关闭
 */
void net_mod_socket_close(void *_socket) {
    printf("====net_mod_socket_close\n");
    NetModSocket *sockt = (NetModSocket *)_socket;
    delete sockt;
}
 
/**
 * 绑定端口到socket, 如果不绑定则系统自动分配一个
 * @return 0 成功, 其他值 失败的错误码
*/
int net_mod_socket_bind(void * _socket, int port){
int net_mod_socket_bind(void * _socket, int key){
    NetModSocket *sockt = (NetModSocket *)_socket;
    return sockt->bind(port);
    return sockt->bind(key);
}
/**
 * 强制绑定端口到socket, 适用于程序非正常关闭的情况下,重启程序绑定原来还没释放的key
 * @return 0 成功, 其他值 失败的错误码
*/
int net_mod_socket_force_bind(void * _socket, int port) {
int net_mod_socket_force_bind(void * _socket, int key) {
    NetModSocket *sockt = (NetModSocket *)_socket;
    return sockt->force_bind(port);
    return sockt->force_bind(key);
}
/**
 * 发送信息
 * @port 发送给谁
 * @key 发送给谁
 * @return 0 成功, 其他值 失败的错误码
 */
int net_mod_socket_sendto(void *_socket, const void *buf, const int size, const int port) {
int net_mod_socket_sendto(void *_socket, const void *buf, const int size, const int key) {
    NetModSocket *sockt = (NetModSocket *)_socket;
    return sockt->sendto(buf, size, port);
    logger->debug("net_mod_socket_sendto: %d sendto  %d", net_mod_socket_get_key(_socket), key);
    return sockt->sendto(buf, size, key);
}
// 发送信息超时返回。 @sec 秒 , @nsec 纳秒
int net_mod_socket_sendto_timeout(void *_socket, const void *buf, const int size, const int port, int sec, int nsec){
int net_mod_socket_sendto_timeout(void *_socket, const void *buf, const int size, const int key, int sec, int nsec){
    NetModSocket *sockt = (NetModSocket *)_socket;
    return sockt->sendto_timeout(buf, size, port, sec, nsec);
    logger->debug("net_mod_socket_sendto: %d sendto  %d", net_mod_socket_get_key(_socket), key);
    return sockt->sendto_timeout(buf, size, key, sec, nsec);
}
// 发送信息立刻返回。
int net_mod_socket_sendto_nowait(void *_socket, const void *buf, const int size, const int port){
int net_mod_socket_sendto_nowait(void *_socket, const void *buf, const int size, const int key){
    NetModSocket *sockt = (NetModSocket *)_socket;
    return sockt->sendto_nowait(buf, size, port);
    logger->debug("net_mod_socket_sendto: %d sendto  %d", net_mod_socket_get_key(_socket), key);
    return sockt->sendto_nowait(buf, size, key);
}
/**
@@ -65,18 +66,19 @@
 * @port 从谁哪里收到的信息
 * @return 0 成功, 其他值 失败的错误码
*/
int net_mod_socket_recvfrom(void *_socket, void **buf, int *size, int *port){
int net_mod_socket_recvfrom(void *_socket, void **buf, int *size, int *key){
    NetModSocket *sockt = (NetModSocket *)_socket;
    return sockt->recvfrom(buf, size, port);
    return sockt->recvfrom(buf, size, key);
}
// 接受信息超时返回。 @sec 秒 , @nsec 纳秒
int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *port, int sec, int nsec){
int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *key, int sec, int nsec){
    NetModSocket *sockt = (NetModSocket *)_socket;
    return sockt->recvfrom_timeout(buf, size, port, sec, nsec);
    //return sockt->recvfrom(buf, size, key);
    return sockt->recvfrom_timeout(buf, size, key, sec, nsec);
}
int net_mod_socket_recvfrom_nowait(void *_socket, void **buf, int *size, int *port){
int net_mod_socket_recvfrom_nowait(void *_socket, void **buf, int *size, int *key){
    NetModSocket *sockt = (NetModSocket *)_socket;
    return sockt->recvfrom_nowait(buf, size, port);
    return sockt->recvfrom_nowait(buf, size, key);
}
int net_mod_socket_sendandrecv(void *_socket, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
@@ -151,7 +153,6 @@
 * 取消订阅指定主题
 * @topic 主题,主题为空时取消全部订阅
 * @size 主题长度
 * @port 总线端口
 */
int  net_mod_socket_desub(void * _socket, void *topic, int size) {
    NetModSocket *sockt = (NetModSocket *)_socket;
src/socket/shm_mod_socket.cpp
@@ -62,7 +62,7 @@
*/
int ShmModSocket::recvfrom(void **buf, int *size, int *key) {
    int rv =  shm_recvfrom(shm_socket, buf, size, key, NULL, 0);
    // logger->error(rv, "ShmModSocket::recvfrom failed!");
  return rv;
}
src/socket/shm_socket.cpp
@@ -31,7 +31,7 @@
   void *tmp_ptr = mm_get_by_key(socket->key);
    if (tmp_ptr!= NULL && tmp_ptr != (void *)1 && !socket->force_bind ) {
      bus_errno = ESHM_BUS_KEY_INUSED;
      logger->error("%s. key = %d ", bus_strerror(bus_errno), socket->key);
      logger->error("%s. key = %d ", bus_strerror(ESHM_BUS_KEY_INUSED), socket->key);
      return 0;
    }
    return 1;
@@ -318,7 +318,7 @@
               const int key, const struct timespec *timeout, const int flags) {
  int s;
  bool rv;
  int rv;
  if (socket->socket_type != SHM_SOCKET_DGRAM) {
    logger->error( "shm_socket.shm_sendto: Can't invoke shm_sendto method in a %d type socket  which is "
@@ -350,10 +350,10 @@
  if ((s = pthread_mutex_unlock(&(socket->mutex))) != 0)
    err_exit(s, "shm_sendto : pthread_mutex_unlock");
  
  // if (key == socket->key) {
  //   logger->error( "can not send to your self!");
  //   return -1;
  // }
  if (key == socket->key) {
    logger->error( "can not send to your self!");
    return -1;
  }
  SHMQueue<shm_msg_t> *remoteQueue;
  if ((remoteQueue = _attach_remote_queue(key)) == NULL) {
@@ -385,14 +385,13 @@
  } else {
    delete remoteQueue;
    mm_free(dest.buf);
    if(rv == EBUS_TIMEOUT) {
    if(rv > EBUS_BASE) {
      // bus_errno = EBUS_TIMEOUT;
      // logger->error("sendto key %d failed, %s", key, bus_strerror(EBUS_TIMEOUT));
      return EBUS_TIMEOUT;
      logger->debug("sendto key %d failed %s", key, bus_strerror(rv));
    } else {
      //logger->error(errno, "sendto key %d failed!", key);
      return rv;
      logger->error(rv, "sendto key %d failed", key);
    }
    return rv;
   
   
  }
@@ -401,7 +400,7 @@
// 短连接方式接受
int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *key,  struct timespec *timeout,  int flags) {
  int s;
  bool rv;
  int rv;
  if (socket->socket_type != SHM_SOCKET_DGRAM) {
    logger->error("shm_socket.shm_recvfrom: Can't invoke shm_recvfrom method in a %d type socket  which "
@@ -437,6 +436,7 @@
    rv = socket->queue->pop_nowait(src);
  } else if(timeout != NULL) {
    rv = socket->queue->pop_timeout(src, timeout);
// printf("0 shm_recvfrom====%d\n", rv);
  } else {
    rv = socket->queue->pop(src);
  }
@@ -457,14 +457,12 @@
    mm_free(src.buf);
    return 0;
  } else {
    if(rv == EBUS_TIMEOUT) {
      // logger->error("shm_recvfrom  failed, %s", bus_strerror(EBUS_TIMEOUT));
      return EBUS_TIMEOUT;
    if(rv > EBUS_BASE) {
      logger->debug("shm_recvfrom failed %s", bus_strerror(rv));
    } else {
      // logger->error(rv, "shm_recvfrom  failed!");
      return rv;
      logger->error(rv, "shm_recvfrom failed");
    }
    return rv;
  }
}
@@ -531,7 +529,7 @@
  if (tmp_socket == NULL)
  {
    /* If first call from this thread, allocate buffer for thread, and save its location */
    logger->debug("%d create tmp socket\n", pthread_self() );
    logger->debug("%ld create tmp socket\n", (long)pthread_self() );
    tmp_socket = shm_open_socket(SHM_SOCKET_DGRAM);
    rv =  pthread_setspecific(_tmp_recv_socket_key_, tmp_socket);
@@ -543,13 +541,10 @@
  if ((rv = shm_sendto(tmp_socket, send_buf, send_size, send_key, timeout, flags)) == 0) {
    rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags);
    return rv;
  } else {
    return rv;
  }
  return -1;
}
int _shm_sendandrecv_alloc_new(shm_socket_t *socket, const void *send_buf,
src/socket/shm_socket.h
@@ -50,9 +50,9 @@
    int key;
    bool force_bind;
    pthread_mutex_t mutex;
    shm_connection_status_t status;
    SHMQueue<shm_msg_t> *queue;
    SHMQueue<shm_msg_t> *remoteQueue;
    shm_connection_status_t status;
    SHMQueue<shm_msg_t> *queue;  //self queue
    SHMQueue<shm_msg_t> *remoteQueue; // peer queue
    LockFreeQueue<shm_msg_t, DM_Allocator> *messageQueue;
    LockFreeQueue<shm_msg_t, DM_Allocator> *acceptQueue;
    std::map<int, shm_socket_t* > *clientSocketMap;
test/CMakeLists.txt
New file
@@ -0,0 +1,11 @@
# add the executable
add_executable(test1 test1.cpp )
target_link_libraries(test1 PUBLIC  ${EXTRA_LIBS} )
target_include_directories(test1 PUBLIC
                            "${PROJECT_BINARY_DIR}"
                             ${EXTRA_INCLUDES}
                            )
# add the install targets
install(TARGETS test1 DESTINATION bin)
test/test.cpp
File was deleted
test/test1.cpp
New file
@@ -0,0 +1,33 @@
#include "usg_common.h"
static void sig_quit(int);
#define SIGCLOSE1   (SIGRTMIN +1)
struct cm_con_data_t
{
  uint64_t addr; /* Buffer address */
  uint32_t rkey; /* Remote key */
  uint32_t qp_num; /* QP number */
  uint16_t lid; /* LID of the IB port */
  uint8_t gid[16]; /* gid */
} __attribute__ ((packed));
struct cm_con_data2_t
{
  uint64_t addr; /* Buffer address */
  uint32_t rkey; /* Remote key */
  uint32_t qp_num; /* QP number */
  uint16_t lid; /* LID of the IB port */
  uint8_t gid[16]; /* gid */
} ;
int
main(void)
{
 printf("===%d, %d \n", sizeof(cm_con_data_t),  sizeof(cm_con_data2_t));
  /* SIGQUIT here will terminate with core file */
}