| | |
| | | #include "px_sem_util.h" |
| | | |
| | | #define NANO 1000000000 |
| | | /** |
| | |  * Turn a relative wait into an absolute CLOCK_REALTIME deadline. |
| | |  * |
| | |  * Reads the current CLOCK_REALTIME time and adds *ts to it. The result is |
| | |  * what LockFreeQueue::push_timeout / pop_timeout pass to sem_timedwait(). |
| | |  * |
| | |  * @param ts relative timeout; dereferenced unconditionally, so it must not |
| | |  *           be null. |
| | |  * @return   now + *ts, with tv_nsec carried into tv_sec so that it stays |
| | |  *           below NANO (assuming ts->tv_nsec is itself within [0, NANO)). |
| | |  * |
| | |  * If clock_gettime() fails, err_exit(errno, "clock_gettime") is invoked. |
| | |  */ |
| | | struct timespec PXSemUtil::calc_sem_timeout(const struct timespec *ts) { |
| | |   struct timespec deadline; |
| | |   if (clock_gettime(CLOCK_REALTIME, &deadline) == -1) |
| | |     err_exit(errno, "clock_gettime"); |
| | | |
| | |   deadline.tv_sec += ts->tv_sec; |
| | |   deadline.tv_nsec += ts->tv_nsec; |
| | | |
| | |   // Integer carry of whole seconds out of the nanosecond field. Do not use |
| | |   // the literal 10e9 here: it is 1e10, ten seconds' worth of nanoseconds, |
| | |   // and would leave tv_nsec >= 1e9, which sem_timedwait rejects. |
| | |   deadline.tv_sec += deadline.tv_nsec / NANO; |
| | |   deadline.tv_nsec %= NANO; |
| | |   return deadline; |
| | | } |
| | |
| | | int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * ts) |
| | | { |
| | | |
| | | |
| | | int rv; |
| | | struct timespec timeout = PXSemUtil::calc_sem_timeout(ts); |
| | | // LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before tv_sec=%d, tv_nsec=%ld", |
| | | // timeout.tv_sec, timeout.tv_nsec); |
| | | |
| | | while (sem_timedwait(&slots, &timeout) == -1) { |
| | | while ( sem_timedwait(&slots, &timeout) == -1) { |
| | | // LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before tv_sec=%d, tv_nsec=%ld, ETIMEDOUT=%d, errno=%d\n", |
| | | // timeout.tv_sec, timeout.tv_nsec, ETIMEDOUT, errno); |
| | | |
| | |
| | | template <typename T, typename AT> class Q_TYPE> |
| | | int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * ts) |
| | | { |
| | | // LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout before\n"); |
| | | // LoggerFactory::getLogger()->debug("=================ts sec = %d, nsec = %ld \n", ts->tv_sec, ts->tv_nsec ); |
| | | |
| | | // struct timespec timeout_tmp = {1, 0}; |
| | | struct timespec timeout = PXSemUtil::calc_sem_timeout(ts); |
| | | // LoggerFactory::getLogger()->debug("================== timeout before sec = %d, nsec = %ld \n", timeout.tv_sec, timeout.tv_nsec ); |
| | | |
| | | while (sem_timedwait(&items, &timeout) == -1) { |
| | | if (errno == ETIMEDOUT) |
| | |
| | | else if(errno == EINTR) |
| | | continue; |
| | | else { |
| | | LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop_timeout"); |
| | | return -1; |
| | | // LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop_timeout %d", errno); |
| | | return errno; |
| | | } |
| | | } |
| | | |
| | |
| | | |
| | | // Forwards every argument to _sendandrecv_ with a final argument of -1 |
| | | // (presumably "block with no timeout" - confirm against _sendandrecv_) and |
| | | // returns its result. _sendandrecv_ is called exactly once; calling it a |
| | | // second time would repeat the whole exchange. |
| | | int NetModSocket::sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, |
| | | net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) { |
| | |   return _sendandrecv_(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size, -1); |
| | | } |
| | | // Forwards every argument, including msec, to _sendandrecv_ and returns its |
| | | // result. msec is presumably a timeout in milliseconds - confirm against |
| | | // _sendandrecv_. _sendandrecv_ is called exactly once; calling it a second |
| | | // time would repeat the whole exchange. |
| | | int NetModSocket::sendandrecv_timeout(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, |
| | | net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int msec) { |
| | |   return _sendandrecv_(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size, msec); |
| | | } |
| | | // Forwards every argument to _sendandrecv_ with a final argument of 0 |
| | | // (presumably "do not wait" - confirm against _sendandrecv_) and returns its |
| | | // result. _sendandrecv_ is called exactly once; calling it a second time |
| | | // would repeat the whole exchange. |
| | | int NetModSocket::sendandrecv_nowait(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, |
| | | net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) { |
| | |   return _sendandrecv_(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size, 0); |
| | | } |
| | | |
| | |
| | | // 本地发送 |
| | | if(node_arr == NULL || arrlen == 0) { |
| | | if(msec == 0) { |
| | | ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, node->key); |
| | | ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, SHM_BUS_KEY); |
| | | } else if(msec > 0) { |
| | | timeout.tv_sec = msec / 1000; |
| | | timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6; |
| | | ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, node->key, &timeout); |
| | | ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout); |
| | | } else { |
| | | ret = shmModSocket.pub(topic, topic_size, content, content_size, node->key); |
| | | ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY); |
| | | } |
| | | if(ret == 0 ) { |
| | | n_pub_suc++; |
| | |
| | | if(node->host == NULL) { |
| | | // 本地发送 |
| | | if(msec == 0) { |
| | | ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, node->key); |
| | | ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, SHM_BUS_KEY); |
| | | } else if(msec > 0) { |
| | | timeout.tv_sec = msec / 1000; |
| | | timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6; |
| | | ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, node->key, &timeout); |
| | | ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout); |
| | | } else { |
| | | ret = shmModSocket.pub(topic, topic_size, content, content_size, node->key); |
| | | ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY); |
| | | } |
| | | |
| | | if(ret == 0 ) { |
| | |
| | | // printf("dgram_mod_recvfrom before\n"); |
| | | int rv = shm_recvfrom(shm_socket, buf, size, key, timeout, flags); |
| | | // printf("dgram_mod_recvfrom after\n"); |
| | | |
| | | return rv; |
| | | } |
| | | |
| | | /** |
| | |  * Receive a message. |
| | |  * |
| | |  * Delegates to _recvfrom_ with a null timeout and flags set to 0. |
| | |  * |
| | |  * @key    the key of the sender the message was received from |
| | |  * @return 0 on success; any other value is the error code of the failure |
| | |  */ |
| | | int ShmModSocket::recvfrom(void **buf, int *size, int *key) { |
| | |   return _recvfrom_(buf, size, key, NULL, 0); |
| | | } |
| | | |
| | | |
| | | // Receive a message, returning once the timeout expires. The wait is given |
| | | // by `timeout` as seconds (tv_sec) plus nanoseconds (tv_nsec). Delegates to |
| | | // _recvfrom_ with flags set to 0 and returns its result. |
| | | int ShmModSocket::recvfrom_timeout( void **buf, int *size, int *key, struct timespec *timeout) { |
| | |   return _recvfrom_(buf, size, key, timeout, 0); |
| | | } |
| | | |
| | | // Receive a message without a timeout argument: delegates to _recvfrom_ with |
| | | // a null timeout and the SHM_MSG_NOWAIT flag, and returns its result. |
| | | int ShmModSocket::recvfrom_nowait( void **buf, int *size, int *key){ |
| | |   return _recvfrom_(buf, size, key, NULL, (int)SHM_MSG_NOWAIT); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | int ShmModSocket::_sub_(char *topic, int topic_size, int key, |
| | | struct timespec *timeout, int flags) { |
| | | // char buf[8192]; |
| | | // int rv; |
| | | // snprintf(buf, 8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER); |
| | | // rv = shm_sendto(shm_socket, buf, strlen(buf) + 1, key, timeout, flags); |
| | | // if(rv == 0) { |
| | | // bus_set->insert(key); |
| | | // } |
| | | // return rv; |
| | | |
| | | |
| | | int ret; |
| | | bus_head_t head = {}; |
| | |
| | | if(size > 0) { |
| | | ret = shm_sendto(shm_socket, buf, size, key, timeout, flags); |
| | | free(buf); |
| | | return ret; |
| | | if(ret == EBUS_TIMEOUT) { |
| | | logger->error(ret, "ShmModSocket::_desub_ key %d failed, %s", key, bus_strerror(EBUS_TIMEOUT)); |
| | | return EBUS_TIMEOUT; |
| | | } else { |
| | | logger->error(ret, "ShmModSocket::_desub_ key %d failed!", key); |
| | | return ret; |
| | | } |
| | | |
| | | } else { |
| | | return -1; |
| | | } |
| | |
| | | delete remoteQueue; |
| | | mm_free(dest.buf); |
| | | if(rv == EBUS_TIMEOUT) { |
| | | bus_errno = EBUS_TIMEOUT; |
| | | logger->error(errno, "sendto key %d failed, %s", key, bus_strerror(EBUS_TIMEOUT)); |
| | | // bus_errno = EBUS_TIMEOUT; |
| | | logger->error(rv, "sendto key %d failed, %s", key, bus_strerror(EBUS_TIMEOUT)); |
| | | return EBUS_TIMEOUT; |
| | | } else { |
| | | //logger->error(errno, "sendto key %d failed!", key); |
| | | return -1; |
| | | return rv; |
| | | } |
| | | |
| | | |
| | |
| | | mm_free(src.buf); |
| | | return 0; |
| | | } else { |
| | | return -1; |
| | | |
| | | if(rv == EBUS_TIMEOUT) { |
| | | // bus_errno = EBUS_TIMEOUT; |
| | | logger->error("shm_recvfrom failed, %s", bus_strerror(EBUS_TIMEOUT)); |
| | | return EBUS_TIMEOUT; |
| | | } else { |
| | | logger->error(rv, "shm_recvfrom failed!"); |
| | | return rv; |
| | | } |
| | | |
| | | } |
| | | } |
| | | |
| | |
| | | char sendbuf[512]; |
| | | int rv; |
| | | int remote_port; |
| | | while (net_mod_socket_recvfrom(serv, &recvbuf, &size, &remote_port) == 0) { |
| | | printf( "RECEIVED HREARTBEAT FROM %d: %s\n", remote_port, recvbuf); |
| | | net_mod_socket_sendto(serv, "suc", strlen("suc")+1, remote_port); |
| | | free(recvbuf); |
| | | while (true) { |
| | | if(net_mod_socket_recvfrom_timeout(serv, &recvbuf, &size, &remote_port, 0, 2000000000)==0) { |
| | | printf( "RECEIVED HREARTBEAT FROM %d: %s\n", remote_port, recvbuf); |
| | | net_mod_socket_sendto(serv, "suc", strlen("suc")+1, remote_port); |
| | | free(recvbuf); |
| | | } |
| | | |
| | | } |
| | | // sleep(1000); |
| | | net_mod_socket_close(serv); |