Merge branch 'dev' into rdma
| | |
| | | list(APPEND EXTRA_LIBS ${PROJECT_SOURCE_DIR}/lib/libusgcommon.a pthread) |
| | | |
| | | add_subdirectory(${PROJECT_SOURCE_DIR}/src) |
| | | add_subdirectory(${PROJECT_SOURCE_DIR}/test) |
| | | add_subdirectory(${PROJECT_SOURCE_DIR}/test_net_socket) |
| | |
| | | char * |
| | | bus_strerror(int err) |
| | | { |
| | | int s; |
| | | int s, eindex; |
| | | char *buf; |
| | | |
| | | eindex = err - 10000; |
| | | /* Make first caller allocate key for thread-specific data */ |
| | | |
| | | s = pthread_once(&once, createKey); |
| | |
| | | err_exit(s, "pthread_setspecific"); |
| | | } |
| | | |
| | | if (err < 0 || err >= _bus_nerr || _bus_errlist[err] == NULL) |
| | | if (eindex < 0 || eindex >= _bus_nerr || _bus_errlist[eindex] == NULL) |
| | | { |
| | | snprintf(buf, MAX_ERROR_LEN, "Unknown error %d", err); |
| | | snprintf(buf, MAX_ERROR_LEN, "Unknown error %d", eindex); |
| | | } |
| | | else |
| | | { |
| | | strncpy(buf, _bus_errlist[err], MAX_ERROR_LEN - 1); |
| | | strncpy(buf, _bus_errlist[eindex], MAX_ERROR_LEN - 1); |
| | | buf[MAX_ERROR_LEN - 1] = '\0'; /* Ensure null termination */ |
| | | } |
| | | |
| | |
| | | |
| | | |
| | | |
| | | |
| | | #define EBUS_TIMEOUT 1 |
| | | #define EBUS_CLOSED 2 |
| | | #define ESHM_BUS_KEY_INUSED 3 |
| | | #define EBUS_BASE 10000 |
| | | #define EBUS_TIMEOUT 10001 |
| | | #define EBUS_CLOSED 10002 |
| | | #define ESHM_BUS_KEY_INUSED 10003 |
| | | |
| | | extern int bus_errno; |
| | | |
| | |
| | | #include "logger_factory.h" |
| | | #include "bus_error.h" |
| | | |
| | | Logger * LoggerFactory::logger = NULL; |
| | | |
| | |
| | | config.console = 1; |
| | | logger = new Logger(config); |
| | | return logger; |
| | | } |
| | | |
| | | void LoggerFactory::error(int s) { |
| | | Logger* logger = LoggerFactory::getLogger(); |
| | | if(s == EBUS_TIMEOUT) { |
| | | logger->error("shm_recvfrom failed, %s", bus_strerror(EBUS_TIMEOUT)); |
| | | |
| | | } else { |
| | | logger->error(s, "shm_recvfrom failed!"); |
| | | |
| | | } |
| | | } |
| | |
| | | public: |
| | | |
| | | static Logger* getLogger(); |
| | | static void error(int s); |
| | | }; |
| | | |
| | | #endif |
| | |
| | | template <typename T, typename AT> class Q_TYPE> |
| | | int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * ts) |
| | | { |
| | | // LoggerFactory::getLogger()->debug("=================ts sec = %d, nsec = %ld \n", ts->tv_sec, ts->tv_nsec ); |
| | | |
| | | LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout before\n"); |
| | | // LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout before\n"); |
| | | struct timespec timeout = PXSemUtil::calc_sem_timeout(ts); |
| | | // LoggerFactory::getLogger()->debug("================== timeout before sec = %d, nsec = %ld \n", timeout.tv_sec, timeout.tv_nsec ); |
| | | |
| | | while (sem_timedwait(&items, &timeout) == -1) { |
| | | if (errno == ETIMEDOUT) |
| | | // LoggerFactory::getLogger()->error(errno, "1 LockFreeQueue pop_timeout %d %d", errno, ETIMEDOUT); |
| | | if (errno == ETIMEDOUT) { |
| | | // LoggerFactory::getLogger()->error(errno, "2 LockFreeQueue pop_timeout %d %d", errno, EBUS_TIMEOUT); |
| | | return EBUS_TIMEOUT; |
| | | } |
| | | else if(errno == EINTR) |
| | | continue; |
| | | else { |
| | | LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop_timeout %d", errno); |
| | | LoggerFactory::getLogger()->error(errno, "3 LockFreeQueue pop_timeout %d", errno); |
| | | return errno; |
| | | } |
| | | } |
| | |
| | | inline bool full(); |
| | | inline bool empty(); |
| | | |
| | | inline bool push(const ELEM_T &a_data); |
| | | inline bool push_nowait(const ELEM_T &a_data); |
| | | inline bool push_timeout(const ELEM_T &a_data, |
| | | const struct timespec *timeout); |
| | | inline bool pop(ELEM_T &a_data); |
| | | inline bool pop_nowait(ELEM_T &a_data); |
| | | inline bool pop_timeout(ELEM_T &a_data, struct timespec *timeout); |
| | | inline int push(const ELEM_T &a_data); |
| | | inline int push_nowait(const ELEM_T &a_data); |
| | | inline int push_timeout(const ELEM_T &a_data, const struct timespec *timeout); |
| | | inline int pop(ELEM_T &a_data); |
| | | inline int pop_nowait(ELEM_T &a_data); |
| | | inline int pop_timeout(ELEM_T &a_data, struct timespec *timeout); |
| | | |
| | | inline ELEM_T &operator[](unsigned i); |
| | | |
| | |
| | | } |
| | | |
| | | template <typename ELEM_T> |
| | | inline bool SHMQueue<ELEM_T>::push(const ELEM_T &a_data) { |
| | | inline int SHMQueue<ELEM_T>::push(const ELEM_T &a_data) { |
| | | return queue->push(a_data); |
| | | } |
| | | |
| | | template <typename ELEM_T> |
| | | inline bool SHMQueue<ELEM_T>::push_nowait(const ELEM_T &a_data) { |
| | | inline int SHMQueue<ELEM_T>::push_nowait(const ELEM_T &a_data) { |
| | | return queue->push_nowait(a_data); |
| | | } |
| | | |
| | | template <typename ELEM_T> |
| | | inline bool SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data, |
| | | inline int SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data, |
| | | const struct timespec *timeout) { |
| | | |
| | | return queue->push_timeout(a_data, timeout); |
| | | } |
| | | |
| | | template <typename ELEM_T> inline bool SHMQueue<ELEM_T>::pop(ELEM_T &a_data) { |
| | | template <typename ELEM_T> inline int SHMQueue<ELEM_T>::pop(ELEM_T &a_data) { |
| | | // printf("SHMQueue pop before\n"); |
| | | int rv = queue->pop(a_data); |
| | | // printf("SHMQueue after before\n"); |
| | |
| | | } |
| | | |
| | | template <typename ELEM_T> |
| | | inline bool SHMQueue<ELEM_T>::pop_nowait(ELEM_T &a_data) { |
| | | inline int SHMQueue<ELEM_T>::pop_nowait(ELEM_T &a_data) { |
| | | return queue->pop_nowait(a_data); |
| | | } |
| | | |
| | | template <typename ELEM_T> |
| | | inline bool SHMQueue<ELEM_T>::pop_timeout(ELEM_T &a_data, |
| | | struct timespec *timeout) { |
| | | inline int SHMQueue<ELEM_T>::pop_timeout(ELEM_T &a_data, struct timespec *timeout) { |
| | | return queue->pop_timeout(a_data, timeout); |
| | | } |
| | | |
| | |
| | | ret_arr[n_recv_suc].content = recv_buf; |
| | | ret_arr[n_recv_suc].content_length = recv_size; |
| | | n_recv_suc++; |
| | | } else { |
| | | if(ret > EBUS_BASE) { |
| | | // bus_errno = EBUS_TIMEOUT; |
| | | logger->debug("NetModSocket:: %d _sendandrecv_ to key %d failed, %s", get_key(), node->key, bus_strerror(ret)); |
| | | |
| | | } else { |
| | | logger->error(ret, "NetModSocket:: %d _sendandrecv_ to key %d failed", get_key(), node->key); |
| | | } |
| | | } |
| | | |
| | | |
| | | continue; |
| | | } |
| | |
| | | return n_pub_suc; |
| | | } |
| | | |
| | | |
| | | int NetModSocket::sendandrecv_safe(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, |
| | | net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) { |
| | | |
| | | int i, clientfd; |
| | | net_node_t *node; |
| | | void *recv_buf; |
| | | int recv_size; |
| | | char response_head_bs[NET_MODE_RESPONSE_HEAD_LENGTH]; |
| | | net_mod_request_head_t request_head = {}; |
| | | net_mod_response_head_t response_head; |
| | | |
| | | |
| | | char portstr[32]; |
| | | char *buf = NULL; |
| | | int buf_size, max_buf_size; |
| | | |
| | | if(buf == NULL) { |
| | | buf = (char *)malloc(MAXBUF); |
| | | max_buf_size = MAXBUF; |
| | | LoggerFactory::getLogger()->error(errno, "NetModSocket::sendandrecv malloc"); |
| | | } |
| | | |
| | | int nsuc = 0; |
| | | net_mod_recv_msg_t *ret_arr = (net_mod_recv_msg_t *)calloc(arrlen, sizeof(net_mod_recv_msg_t)); |
| | | |
| | | for (i = 0; i< arrlen; i++) { |
| | | |
| | | node = &node_arr[i]; |
| | | if(node->host == NULL) { |
| | | // 本地发送 |
| | | shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size); |
| | | goto LABEL_ARR_PUSH; |
| | | } |
| | | |
| | | sprintf(portstr, "%d", node->port); |
| | | clientfd = open_clientfd(node->host, portstr); |
| | | if(clientfd < 0) { |
| | | continue; |
| | | } |
| | | |
| | | buf_size = send_size + NET_MODE_REQUEST_HEAD_LENGTH; |
| | | if(max_buf_size < buf_size) { |
| | | if((buf = (char *)realloc(buf, buf_size)) == NULL) { |
| | | LoggerFactory::getLogger()->error(errno, "NetModSocket::sendandrecv_safe realloc buf"); |
| | | } else { |
| | | max_buf_size = buf_size; |
| | | } |
| | | |
| | | } |
| | | |
| | | |
| | | request_head.mod = REQ_REP; |
| | | request_head.key = node->key; |
| | | request_head.content_length = send_size; |
| | | request_head.topic_length = 0; |
| | | |
| | | // optval = 1; |
| | | // setsockopt(clientfd, IPPROTO_TCP, TCP_CORK, &optval, sizeof(optval)); |
| | | memcpy(buf, NetModSocket::encode_request_head(request_head), NET_MODE_REQUEST_HEAD_LENGTH); |
| | | memcpy(buf + NET_MODE_REQUEST_HEAD_LENGTH, send_buf, send_size); |
| | | |
| | | |
| | | if(rio_writen(clientfd, buf, buf_size) != buf_size ) { |
| | | LoggerFactory::getLogger()->error(errno, "NetModSocket::sendandrecv_safe rio_writen buf"); |
| | | |
| | | close(clientfd); |
| | | continue; |
| | | } |
| | | // optval = 0; |
| | | // setsockopt(clientfd, IPPROTO_TCP, TCP_CORK, &optval, sizeof(optval)); |
| | | |
| | | if ( rio_readn(clientfd, response_head_bs, NET_MODE_RESPONSE_HEAD_LENGTH) != NET_MODE_RESPONSE_HEAD_LENGTH) { |
| | | LoggerFactory::getLogger()->error(errno, "NetModSocket::sendandrecv_safe rio_readnb response_head_bs"); |
| | | |
| | | close(clientfd); |
| | | continue; |
| | | } |
| | | |
| | | response_head = NetModSocket::decode_response_head(response_head_bs); |
| | | if(response_head.code != 0) { |
| | | continue; |
| | | } |
| | | |
| | | recv_buf = malloc(response_head.content_length); |
| | | if(recv_buf == NULL) { |
| | | LoggerFactory::getLogger()->error(errno, "NetModSocket::send malloc"); |
| | | exit(1); |
| | | } |
| | | if ( (recv_size = rio_readn(clientfd, recv_buf, response_head.content_length) ) != response_head.content_length) { |
| | | LoggerFactory::getLogger()->error(errno, "NetModSocket::sendandrecv_safe rio_readnb recv_buf"); |
| | | |
| | | close(clientfd); |
| | | continue; |
| | | } |
| | | |
| | | LABEL_ARR_PUSH: |
| | | if(node->host != NULL) { |
| | | strcpy(ret_arr[nsuc].host, node->host); |
| | | } else { |
| | | strcpy(ret_arr[nsuc].host, "local"); |
| | | } |
| | | |
| | | ret_arr[nsuc].port = node->port; |
| | | ret_arr[nsuc].key = node->key; |
| | | ret_arr[nsuc].content = recv_buf; |
| | | ret_arr[nsuc].content_length = recv_size; |
| | | |
| | | nsuc++; |
| | | } |
| | | |
| | | *recv_arr = ret_arr; |
| | | if(recv_arr_size != NULL) { |
| | | *recv_arr_size = nsuc; |
| | | } |
| | | |
| | | free(buf); |
| | | return nsuc; |
| | | |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * 发送信息 |
| | | * @key 发送给谁 |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int NetModSocket::sendto(const void *buf, const int size, const int key){ |
| | | return shmModSocket.sendto(buf, size, key); |
| | | int rv = shmModSocket.sendto(buf, size, key); |
| | | if(rv == 0) { |
| | | logger->debug("NetModSocket::sendto: %d sendto %d success.\n", get_key(), key); |
| | | return 0; |
| | | } |
| | | |
| | | if(rv > EBUS_BASE) { |
| | | // bus_errno = EBUS_TIMEOUT; |
| | | logger->debug("NetModSocket::sendto: %d sendto %d failed %s", get_key(), key, bus_strerror(rv)); |
| | | } else { |
| | | logger->error(rv, "NetModSocket::sendto : %d sendto %d failed", get_key(), key); |
| | | } |
| | | return rv; |
| | | } |
| | | |
| | | // 发送信息超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int NetModSocket::sendto_timeout(const void *buf, const int size, const int key, int sec, int nsec){ |
| | | struct timespec timeout = {sec, nsec}; |
| | | return shmModSocket.sendto_timeout(buf, size, key, &timeout); |
| | | int rv = shmModSocket.sendto_timeout(buf, size, key, &timeout); |
| | | if(rv == 0) { |
| | | logger->debug("NetModSocket::sendto_timeout: %d sendto %d success.\n", get_key(), key); |
| | | return 0; |
| | | } |
| | | |
| | | if(rv > EBUS_BASE) { |
| | | // bus_errno = EBUS_TIMEOUT; |
| | | logger->debug("NetModSocket::sendto_timeout : %d sendto %d failed %s", get_key(), key, bus_strerror(rv)); |
| | | } else { |
| | | logger->error(rv, "NetModSocket::sendto_timeout: %d sendto %d failed", get_key(), key); |
| | | } |
| | | return rv; |
| | | } |
| | | |
| | | // 发送信息立刻返回。 |
| | | int NetModSocket::sendto_nowait(const void *buf, const int size, const int key){ |
| | | return shmModSocket.sendto_nowait(buf, size, key); |
| | | int rv = shmModSocket.sendto_nowait(buf, size, key); |
| | | if(rv == 0) { |
| | | logger->debug("NetModSocket::sendto_nowait: %d sendto %d success.\n", get_key(), key); |
| | | return 0; |
| | | } |
| | | |
| | | if(rv > EBUS_BASE) { |
| | | // bus_errno = EBUS_TIMEOUT; |
| | | logger->debug("NetModSocket::sendto_nowait %d sendto %d failed %s", get_key(), key, bus_strerror(rv)); |
| | | |
| | | } else { |
| | | logger->error(rv, "NetModSocket::sendto_nowait %d sendto %d failed", get_key(), key); |
| | | } |
| | | return rv; |
| | | } |
| | | |
| | | /** |
| | |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int NetModSocket::recvfrom(void **buf, int *size, int *key) { |
| | | return shmModSocket.recvfrom(buf, size, key); |
| | | int rv = shmModSocket.recvfrom(buf, size, key); |
| | | |
| | | if(rv == 0) { |
| | | logger->debug("NetModSocket::recvfrom: %d recvfrom %d success.\n", get_key(), *key); |
| | | return 0; |
| | | } |
| | | |
| | | if(rv > EBUS_BASE) { |
| | | logger->debug("NetModSocket::recvfrom: socket %d recvfrom failed %s", get_key(), bus_strerror(rv)); |
| | | } else { |
| | | logger->error(rv, "NetModSocket::recvfrom: socket %d recvfrom failed", get_key()); |
| | | } |
| | | return rv; |
| | | } |
| | | |
| | | // 接受信息超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int NetModSocket::recvfrom_timeout(void **buf, int *size, int *key, int sec, int nsec){ |
| | | struct timespec timeout = {sec, nsec}; |
| | | return shmModSocket.recvfrom_timeout(buf, size, key, &timeout); |
| | | int rv = shmModSocket.recvfrom_timeout(buf, size, key, &timeout); |
| | | if(rv == 0) { |
| | | logger->debug("NetModSocket::recvfrom: %d recvfrom %d success.\n", get_key(), *key); |
| | | return 0; |
| | | } |
| | | |
| | | if(rv > EBUS_BASE) { |
| | | // bus_errno = EBUS_TIMEOUT; |
| | | logger->debug("NetModSocket::recvfrom_timeout: %d recvfrom failed %s", get_key(), bus_strerror(rv)); |
| | | } else { |
| | | logger->error(rv, "NetModSocket::recvfrom_timeout: %d recvfrom failed", get_key()); |
| | | } |
| | | return rv; |
| | | } |
| | | |
| | | int NetModSocket::recvfrom_nowait(void **buf, int *size, int *key){ |
| | | return shmModSocket.recvfrom_nowait(buf, size, key); |
| | | int rv = shmModSocket.recvfrom_nowait(buf, size, key); |
| | | if(rv == 0) { |
| | | logger->debug("NetModSocket::recvfrom: %d recvfrom %d success.\n", get_key(), *key); |
| | | return 0; |
| | | } |
| | | |
| | | if(rv > EBUS_BASE) { |
| | | // bus_errno = EBUS_TIMEOUT; |
| | | logger->debug("NetModSocket::recvfrom_nowait: %d recvfrom failed %s", get_key(), bus_strerror(rv)); |
| | | } else { |
| | | logger->error(rv, "NetModSocket::recvfrom_nowait: %d recvfrom failed", get_key()); |
| | | } |
| | | return rv; |
| | | } |
| | | |
| | | /** |
| | |
| | | * 缺点:阻塞的,性能不如sendandrecv |
| | | * |
| | | */ |
| | | int sendandrecv_safe(net_node_t *node_arr, int node_arr_len, void *send_buf, int send_size, |
| | | net_mod_recv_msg_t ** recv_arr, int *recv_arr_size); |
| | | // int sendandrecv_safe(net_node_t *node_arr, int node_arr_len, void *send_buf, int send_size, |
| | | // net_mod_recv_msg_t ** recv_arr, int *recv_arr_size); |
| | | |
| | | |
| | | /** |
| | |
| | | |
| | | |
| | | |
| | | static Logger *logger = LoggerFactory::getLogger(); |
| | | |
| | | /** |
| | | * 创建 |
| | | */ |
| | | void * net_mod_socket_open() { |
| | | printf("=====net_mod_socket_open\n"); |
| | | NetModSocket *sockt = new NetModSocket; |
| | | return (void *)sockt; |
| | | } |
| | |
| | | * 关闭 |
| | | */ |
| | | void net_mod_socket_close(void *_socket) { |
| | | printf("====net_mod_socket_close\n"); |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | delete sockt; |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * 绑定端口到socket, 如果不绑定则系统自动分配一个 |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int net_mod_socket_bind(void * _socket, int port){ |
| | | int net_mod_socket_bind(void * _socket, int key){ |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | return sockt->bind(port); |
| | | return sockt->bind(key); |
| | | } |
| | | |
| | | /** |
| | | * 强制绑定端口到socket, 适用于程序非正常关闭的情况下,重启程序绑定原来还没释放的key |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int net_mod_socket_force_bind(void * _socket, int port) { |
| | | int net_mod_socket_force_bind(void * _socket, int key) { |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | return sockt->force_bind(port); |
| | | return sockt->force_bind(key); |
| | | } |
| | | |
| | | /** |
| | | * 发送信息 |
| | | * @port 发送给谁 |
| | | * @key 发送给谁 |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int net_mod_socket_sendto(void *_socket, const void *buf, const int size, const int port) { |
| | | int net_mod_socket_sendto(void *_socket, const void *buf, const int size, const int key) { |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | return sockt->sendto(buf, size, port); |
| | | logger->debug("net_mod_socket_sendto: %d sendto %d", net_mod_socket_get_key(_socket), key); |
| | | return sockt->sendto(buf, size, key); |
| | | } |
| | | // 发送信息超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int net_mod_socket_sendto_timeout(void *_socket, const void *buf, const int size, const int port, int sec, int nsec){ |
| | | int net_mod_socket_sendto_timeout(void *_socket, const void *buf, const int size, const int key, int sec, int nsec){ |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | return sockt->sendto_timeout(buf, size, port, sec, nsec); |
| | | logger->debug("net_mod_socket_sendto: %d sendto %d", net_mod_socket_get_key(_socket), key); |
| | | return sockt->sendto_timeout(buf, size, key, sec, nsec); |
| | | } |
| | | // 发送信息立刻返回。 |
| | | int net_mod_socket_sendto_nowait(void *_socket, const void *buf, const int size, const int port){ |
| | | int net_mod_socket_sendto_nowait(void *_socket, const void *buf, const int size, const int key){ |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | return sockt->sendto_nowait(buf, size, port); |
| | | logger->debug("net_mod_socket_sendto: %d sendto %d", net_mod_socket_get_key(_socket), key); |
| | | return sockt->sendto_nowait(buf, size, key); |
| | | } |
| | | |
| | | /** |
| | |
| | | * @port 从谁哪里收到的信息 |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int net_mod_socket_recvfrom(void *_socket, void **buf, int *size, int *port){ |
| | | int net_mod_socket_recvfrom(void *_socket, void **buf, int *size, int *key){ |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | return sockt->recvfrom(buf, size, port); |
| | | return sockt->recvfrom(buf, size, key); |
| | | } |
| | | // 接受信息超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *port, int sec, int nsec){ |
| | | int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *key, int sec, int nsec){ |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | return sockt->recvfrom_timeout(buf, size, port, sec, nsec); |
| | | //return sockt->recvfrom(buf, size, key); |
| | | return sockt->recvfrom_timeout(buf, size, key, sec, nsec); |
| | | } |
| | | int net_mod_socket_recvfrom_nowait(void *_socket, void **buf, int *size, int *port){ |
| | | int net_mod_socket_recvfrom_nowait(void *_socket, void **buf, int *size, int *key){ |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | return sockt->recvfrom_nowait(buf, size, port); |
| | | return sockt->recvfrom_nowait(buf, size, key); |
| | | } |
| | | |
| | | int net_mod_socket_sendandrecv(void *_socket, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, |
| | |
| | | * 取消订阅指定主题 |
| | | * @topic 主题,主题为空时取消全部订阅 |
| | | * @size 主题长度 |
| | | * @port 总线端口 |
| | | */ |
| | | int net_mod_socket_desub(void * _socket, void *topic, int size) { |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | |
| | | */ |
| | | int ShmModSocket::recvfrom(void **buf, int *size, int *key) { |
| | | int rv = shm_recvfrom(shm_socket, buf, size, key, NULL, 0); |
| | | // logger->error(rv, "ShmModSocket::recvfrom failed!"); |
| | | |
| | | return rv; |
| | | } |
| | | |
| | |
| | | void *tmp_ptr = mm_get_by_key(socket->key); |
| | | if (tmp_ptr!= NULL && tmp_ptr != (void *)1 && !socket->force_bind ) { |
| | | bus_errno = ESHM_BUS_KEY_INUSED; |
| | | logger->error("%s. key = %d ", bus_strerror(bus_errno), socket->key); |
| | | logger->error("%s. key = %d ", bus_strerror(ESHM_BUS_KEY_INUSED), socket->key); |
| | | return 0; |
| | | } |
| | | return 1; |
| | |
| | | const int key, const struct timespec *timeout, const int flags) { |
| | | |
| | | int s; |
| | | bool rv; |
| | | int rv; |
| | | |
| | | if (socket->socket_type != SHM_SOCKET_DGRAM) { |
| | | logger->error( "shm_socket.shm_sendto: Can't invoke shm_sendto method in a %d type socket which is " |
| | |
| | | if ((s = pthread_mutex_unlock(&(socket->mutex))) != 0) |
| | | err_exit(s, "shm_sendto : pthread_mutex_unlock"); |
| | | |
| | | // if (key == socket->key) { |
| | | // logger->error( "can not send to your self!"); |
| | | // return -1; |
| | | // } |
| | | if (key == socket->key) { |
| | | logger->error( "can not send to your self!"); |
| | | return -1; |
| | | } |
| | | |
| | | SHMQueue<shm_msg_t> *remoteQueue; |
| | | if ((remoteQueue = _attach_remote_queue(key)) == NULL) { |
| | |
| | | } else { |
| | | delete remoteQueue; |
| | | mm_free(dest.buf); |
| | | if(rv == EBUS_TIMEOUT) { |
| | | if(rv > EBUS_BASE) { |
| | | // bus_errno = EBUS_TIMEOUT; |
| | | // logger->error("sendto key %d failed, %s", key, bus_strerror(EBUS_TIMEOUT)); |
| | | return EBUS_TIMEOUT; |
| | | logger->debug("sendto key %d failed %s", key, bus_strerror(rv)); |
| | | } else { |
| | | //logger->error(errno, "sendto key %d failed!", key); |
| | | return rv; |
| | | logger->error(rv, "sendto key %d failed", key); |
| | | } |
| | | return rv; |
| | | |
| | | |
| | | } |
| | |
| | | // 短连接方式接受 |
| | | int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *key, struct timespec *timeout, int flags) { |
| | | int s; |
| | | bool rv; |
| | | int rv; |
| | | |
| | | if (socket->socket_type != SHM_SOCKET_DGRAM) { |
| | | logger->error("shm_socket.shm_recvfrom: Can't invoke shm_recvfrom method in a %d type socket which " |
| | |
| | | rv = socket->queue->pop_nowait(src); |
| | | } else if(timeout != NULL) { |
| | | rv = socket->queue->pop_timeout(src, timeout); |
| | | // printf("0 shm_recvfrom====%d\n", rv); |
| | | } else { |
| | | rv = socket->queue->pop(src); |
| | | } |
| | |
| | | mm_free(src.buf); |
| | | return 0; |
| | | } else { |
| | | |
| | | if(rv == EBUS_TIMEOUT) { |
| | | // logger->error("shm_recvfrom failed, %s", bus_strerror(EBUS_TIMEOUT)); |
| | | return EBUS_TIMEOUT; |
| | | if(rv > EBUS_BASE) { |
| | | logger->debug("shm_recvfrom failed %s", bus_strerror(rv)); |
| | | } else { |
| | | // logger->error(rv, "shm_recvfrom failed!"); |
| | | return rv; |
| | | logger->error(rv, "shm_recvfrom failed"); |
| | | } |
| | | return rv; |
| | | |
| | | } |
| | | } |
| | |
| | | if (tmp_socket == NULL) |
| | | { |
| | | /* If first call from this thread, allocate buffer for thread, and save its location */ |
| | | logger->debug("%d create tmp socket\n", pthread_self() ); |
| | | logger->debug("%ld create tmp socket\n", (long)pthread_self() ); |
| | | tmp_socket = shm_open_socket(SHM_SOCKET_DGRAM); |
| | | |
| | | rv = pthread_setspecific(_tmp_recv_socket_key_, tmp_socket); |
| | |
| | | |
| | | if ((rv = shm_sendto(tmp_socket, send_buf, send_size, send_key, timeout, flags)) == 0) { |
| | | rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags); |
| | | |
| | | return rv; |
| | | } else { |
| | | |
| | | return rv; |
| | | } |
| | | return -1; |
| | | } |
| | | |
| | | int _shm_sendandrecv_alloc_new(shm_socket_t *socket, const void *send_buf, |
| | |
| | | int key; |
| | | bool force_bind; |
| | | pthread_mutex_t mutex; |
| | | shm_connection_status_t status; |
| | | SHMQueue<shm_msg_t> *queue; |
| | | SHMQueue<shm_msg_t> *remoteQueue; |
| | | shm_connection_status_t status; |
| | | SHMQueue<shm_msg_t> *queue; //self queue |
| | | SHMQueue<shm_msg_t> *remoteQueue; // peer queue |
| | | LockFreeQueue<shm_msg_t, DM_Allocator> *messageQueue; |
| | | LockFreeQueue<shm_msg_t, DM_Allocator> *acceptQueue; |
| | | std::map<int, shm_socket_t* > *clientSocketMap; |
New file |
| | |
# Build the test1 executable from its single source file and link it
# against the libraries collected in EXTRA_LIBS.
add_executable(test1 test1.cpp)
target_link_libraries(test1 PUBLIC ${EXTRA_LIBS})

# Headers come from the build tree plus the project-wide extra include dirs.
target_include_directories(test1 PUBLIC "${PROJECT_BINARY_DIR}" ${EXTRA_INCLUDES})

# Install the binary into <prefix>/bin.
install(TARGETS test1 DESTINATION bin)
New file |
| | |
| | | #include "usg_common.h" |
| | | static void sig_quit(int); |
| | | #define SIGCLOSE1 (SIGRTMIN +1) |
| | | |
/**
 * Connection data record, packed: the members are laid out back to back with
 * no padding, so the struct is exactly the sum of its member sizes.
 */
struct __attribute__((packed)) cm_con_data_t
{
    uint64_t addr;    /* Buffer address */
    uint32_t rkey;    /* Remote key */
    uint32_t qp_num;  /* QP number */
    uint16_t lid;     /* LID of the IB port */
    uint8_t gid[16];  /* gid */
};
| | | |
| | | |
| | | |
| | | |
/**
 * Same members as cm_con_data_t but without the packed attribute, so the
 * compiler is free to add padding for natural alignment.
 */
struct cm_con_data2_t
{
    uint64_t addr;    /* Buffer address */
    uint32_t rkey;    /* Remote key */
    uint32_t qp_num;  /* QP number */
    uint16_t lid;     /* LID of the IB port */
    uint8_t gid[16];  /* gid */
};
| | | |
| | | int |
| | | main(void) |
| | | { |
| | | |
| | | printf("===%d, %d \n", sizeof(cm_con_data_t), sizeof(cm_con_data2_t)); |
| | | /* SIGQUIT here will terminate with core file */ |
| | | } |
| | | |