| | |
| | | { |
| | | int s; |
| | | if (Signal(SIGPIPE, SIG_IGN) == SIG_ERR) |
| | | logger->error(errno, "NetModSocket::NetModSocket signal"); |
| | | logger->error(errno, "NetModSocket::NetModSocket signal"); |
| | | |
| | | gpool = new NetConnPool(); |
| | | // gpool = new NetConnPool(); |
| | | |
| | | pthread_mutexattr_t mtxAttr; |
| | | s = pthread_mutexattr_init(&mtxAttr); |
| | | if (s != 0) |
| | | err_exit(s, "pthread_mutexattr_init"); |
| | | s = pthread_mutexattr_settype(&mtxAttr, PTHREAD_MUTEX_ERRORCHECK); |
| | | if (s != 0) |
| | | err_exit(s, "pthread_mutexattr_settype"); |
| | | s = pthread_mutex_init(&sendMutex, &mtxAttr); |
| | | if (s != 0) |
| | | err_exit(s, "pthread_mutex_init"); |
| | | // pthread_mutexattr_t mtxAttr; |
| | | // s = pthread_mutexattr_init(&mtxAttr); |
| | | // if (s != 0) |
| | | // err_exit(s, "pthread_mutexattr_init"); |
| | | // s = pthread_mutexattr_settype(&mtxAttr, PTHREAD_MUTEX_ERRORCHECK); |
| | | // if (s != 0) |
| | | // err_exit(s, "pthread_mutexattr_settype"); |
| | | // s = pthread_mutex_init(&sendMutex, &mtxAttr); |
| | | // if (s != 0) |
| | | // err_exit(s, "pthread_mutex_init"); |
| | | |
| | | s = pthread_mutexattr_destroy(&mtxAttr); |
| | | if (s != 0) |
| | | err_exit(s, "pthread_mutexattr_destroy"); |
| | | // s = pthread_mutexattr_destroy(&mtxAttr); |
| | | // if (s != 0) |
| | | // err_exit(s, "pthread_mutexattr_destroy"); |
| | | |
| | | } |
| | | |
| | | |
| | | NetModSocket::~NetModSocket() { |
| | | int s; |
| | | delete gpool; |
| | | s = pthread_mutex_destroy(&sendMutex); |
| | | // delete gpool; |
| | | // s = pthread_mutex_destroy(&sendMutex); |
| | | if(s != 0) { |
| | | err_exit(s, "shm_close_socket"); |
| | | } |
| | |
| | | } |
| | | |
| | | |
| | | |
| | | |
| | | int NetModSocket::_sendandrecv_(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, |
| | | net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int msec ) { |
| | | |
| | |
| | | // 本地发送 |
| | | |
| | | if(msec == 0) { |
| | | ret = shmModSocket.sendandrecv_nowait(send_buf, send_size, node->key, &recv_buf, &recv_size); |
| | | ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size, NULL, BUS_NOWAIT_FLAG); |
| | | } else if(msec > 0){ |
| | | timeout.tv_sec = msec / 1000; |
| | | timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6; |
| | | ret = shmModSocket.sendandrecv_timeout(send_buf, send_size, node->key, &recv_buf, &recv_size, &timeout); |
| | | ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size, &timeout, BUS_TIMEOUT_FLAG); |
| | | } else { |
| | | ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size); |
| | | } |
| | |
| | | // 本地发送 |
| | | if(node_arr == NULL || arrlen == 0) { |
| | | if(msec == 0) { |
| | | ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, SHM_BUS_KEY); |
| | | ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY, NULL, BUS_NOWAIT_FLAG); |
| | | } else if(msec > 0) { |
| | | timeout.tv_sec = msec / 1000; |
| | | timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6; |
| | | ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout); |
| | | ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout, BUS_TIMEOUT_FLAG); |
| | | } else { |
| | | ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY); |
| | | } |
| | | if(ret == 0 ) { |
| | | n_pub_suc++; |
| | | } |
| | | return n_pub_suc; |
| | | } |
| | | |
| | | for (i = 0; i < arrlen; i++) { |
| | |
| | | if(node->host == NULL) { |
| | | // 本地发送 |
| | | if(msec == 0) { |
| | | ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, SHM_BUS_KEY); |
| | | ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY, NULL, BUS_NOWAIT_FLAG); |
| | | } else if(msec > 0) { |
| | | timeout.tv_sec = msec / 1000; |
| | | timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6; |
| | | ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout); |
| | | ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout, BUS_TIMEOUT_FLAG); |
| | | } else { |
| | | ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY); |
| | | } |
| | |
| | | } |
| | | request_head.mod = BUS; |
| | | memcpy(request_head.host, node->host, sizeof(request_head.host)); |
| | | request_head.key = node->key; |
| | | request_head.key = SHM_BUS_KEY; |
| | | request_head.content_length = content_size; |
| | | request_head.topic_length = strlen(topic) + 1; |
| | | request_head.timeout = msec; |
| | |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int NetModSocket::sendto(const void *buf, const int size, const int key){ |
| | | int rv = shmModSocket.sendto(buf, size, key); |
| | | if(rv == 0) { |
| | | logger->debug("NetModSocket::sendto: %d sendto %d success.\n", get_key(), key); |
| | | return 0; |
| | | } |
| | | |
| | | if(rv > EBUS_BASE) { |
| | | // bus_errno = EBUS_TIMEOUT; |
| | | logger->debug("NetModSocket::sendto: %d sendto %d failed %s", get_key(), key, bus_strerror(rv)); |
| | | } else { |
| | | logger->error(rv, "NetModSocket::sendto : %d sendto %d failed", get_key(), key); |
| | | } |
| | | return rv; |
| | | return shmModSocket.sendto(buf, size, key); |
| | | } |
| | | |
| | | // 发送信息超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int NetModSocket::sendto_timeout(const void *buf, const int size, const int key, int sec, int nsec){ |
| | | struct timespec timeout = {sec, nsec}; |
| | | int rv = shmModSocket.sendto_timeout(buf, size, key, &timeout); |
| | | if(rv == 0) { |
| | | logger->debug("NetModSocket::sendto_timeout: %d sendto %d success.\n", get_key(), key); |
| | | return 0; |
| | | } |
| | | |
| | | if(rv > EBUS_BASE) { |
| | | // bus_errno = EBUS_TIMEOUT; |
| | | logger->debug("NetModSocket::sendto_timeout : %d sendto %d failed %s", get_key(), key, bus_strerror(rv)); |
| | | } else { |
| | | logger->error(rv, "NetModSocket::sendto_timeout: %d sendto %d failed", get_key(), key); |
| | | } |
| | | return rv; |
| | | return shmModSocket.sendto(buf, size, key, &timeout, BUS_TIMEOUT_FLAG); |
| | | |
| | | } |
| | | |
| | | // 发送信息立刻返回。 |
| | | int NetModSocket::sendto_nowait(const void *buf, const int size, const int key){ |
| | | int rv = shmModSocket.sendto_nowait(buf, size, key); |
| | | if(rv == 0) { |
| | | logger->debug("NetModSocket::sendto_nowait: %d sendto %d success.\n", get_key(), key); |
| | | return 0; |
| | | } |
| | | |
| | | if(rv > EBUS_BASE) { |
| | | // bus_errno = EBUS_TIMEOUT; |
| | | logger->debug("NetModSocket::sendto_nowait %d sendto %d failed %s", get_key(), key, bus_strerror(rv)); |
| | | |
| | | } else { |
| | | logger->error(rv, "NetModSocket::sendto_nowait %d sendto %d failed", get_key(), key); |
| | | } |
| | | return rv; |
| | | return shmModSocket.sendto(buf, size, key, NULL, BUS_NOWAIT_FLAG); |
| | | |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | int NetModSocket::recvfrom(void **buf, int *size, int *key) { |
| | | |
| | | logger->debug(" %d NetModSocket::recvfrom before", get_key()); |
| | | int rv = shmModSocket.recvfrom(buf, size, key); |
| | | |
| | | if(rv == 0) { |
| | | logger->debug("NetModSocket::recvfrom: <<<< %d recvfrom %d success.\n", get_key(), *key); |
| | | return 0; |
| | | } |
| | | |
| | | if(rv > EBUS_BASE) { |
| | | logger->debug("NetModSocket::recvfrom: socket %d recvfrom failed %s", get_key(), bus_strerror(rv)); |
| | | } else { |
| | | logger->error(rv, "NetModSocket::recvfrom: socket %d recvfrom failed", get_key()); |
| | | } |
| | | return rv; |
| | | return shmModSocket.recvfrom(buf, size, key); |
| | | |
| | | } |
| | | |
| | | // 接受信息超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int NetModSocket::recvfrom_timeout(void **buf, int *size, int *key, int sec, int nsec){ |
| | | struct timespec timeout = {sec, nsec}; |
| | | int rv = shmModSocket.recvfrom_timeout(buf, size, key, &timeout); |
| | | if(rv == 0) { |
| | | logger->debug("NetModSocket::recvfrom_timeout: %d recvfrom %d success.\n", get_key(), *key); |
| | | return 0; |
| | | } |
| | | |
| | | if(rv > EBUS_BASE) { |
| | | // bus_errno = EBUS_TIMEOUT; |
| | | logger->debug("NetModSocket::recvfrom_timeout: %d recvfrom failed %s", get_key(), bus_strerror(rv)); |
| | | } else { |
| | | logger->error(rv, "NetModSocket::recvfrom_timeout: %d recvfrom failed", get_key()); |
| | | } |
| | | return rv; |
| | | return shmModSocket.recvfrom(buf, size, key, &timeout, BUS_TIMEOUT_FLAG); |
| | | |
| | | } |
| | | |
| | | int NetModSocket::recvfrom_nowait(void **buf, int *size, int *key){ |
| | | int rv = shmModSocket.recvfrom_nowait(buf, size, key); |
| | | if(rv == 0) { |
| | | logger->debug("NetModSocket::recvfrom_nowait: %d recvfrom %d success.\n", get_key(), *key); |
| | | return 0; |
| | | } |
| | | |
| | | if(rv > EBUS_BASE) { |
| | | // bus_errno = EBUS_TIMEOUT; |
| | | logger->debug("NetModSocket::recvfrom_nowait: %d recvfrom failed %s", get_key(), bus_strerror(rv)); |
| | | } else { |
| | | logger->error(rv, "NetModSocket::recvfrom_nowait: %d recvfrom failed", get_key()); |
| | | } |
| | | return rv; |
| | | return shmModSocket.recvfrom(buf, size, key, NULL, BUS_NOWAIT_FLAG); |
| | | } |
| | | |
| | | |
| | | int NetModSocket::recvandsend(recvandsend_callback_fn callback, |
| | | const struct timespec *timeout , int flag, void * user_data ) { |
| | | |
| | | return shmModSocket.recvandsend(callback, timeout, flag, user_data); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * 发送请求信息并等待接收应答 |
| | |
| | | // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int NetModSocket::sendandrecv_timeout( const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size, int sec, int nsec){ |
| | | struct timespec timeout = {sec, nsec}; |
| | | return shmModSocket.sendandrecv_timeout(send_buf, send_size, key, recv_buf, recv_size, &timeout); |
| | | return shmModSocket.sendandrecv(send_buf, send_size, key, recv_buf, recv_size, &timeout, BUS_TIMEOUT_FLAG); |
| | | } |
| | | int NetModSocket::sendandrecv_nowait( const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size) { |
| | | return shmModSocket.sendandrecv_nowait(send_buf, send_size, key, recv_buf, recv_size); |
| | | return shmModSocket.sendandrecv(send_buf, send_size, key, recv_buf, recv_size, NULL, BUS_NOWAIT_FLAG); |
| | | } |
| | | |
| | | |
| | |
| | | // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int NetModSocket::sub_timeout( void *topic, int size, int key, int sec, int nsec){ |
| | | struct timespec timeout = {sec, nsec}; |
| | | return shmModSocket.sub_timeout((char *)topic, size, key, &timeout); |
| | | return shmModSocket.sub((char *)topic, size, key, &timeout, BUS_TIMEOUT_FLAG); |
| | | } |
| | | int NetModSocket::sub_nowait( void *topic, int size, int key){ |
| | | return shmModSocket.sub_nowait((char *)topic, size, key); |
| | | return shmModSocket.sub((char *)topic, size, key, NULL, BUS_NOWAIT_FLAG); |
| | | } |
| | | |
| | | |
| | |
| | | // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int NetModSocket::desub_timeout( void *topic, int size, int key, int sec, int nsec){ |
| | | struct timespec timeout = {sec, nsec}; |
| | | return shmModSocket.desub_timeout((char *)topic, size, key, &timeout); |
| | | return shmModSocket.desub((char *)topic, size, key, &timeout, BUS_TIMEOUT_FLAG); |
| | | } |
| | | int NetModSocket::desub_nowait( void *topic, int size, int key){ |
| | | return shmModSocket.desub_nowait((char *)topic, size, key); |
| | | return shmModSocket.desub((char *)topic, size, key, NULL, BUS_NOWAIT_FLAG); |
| | | } |
| | | |
| | | |
| | |
| | | // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int NetModSocket::pub_timeout( char *topic, int topic_size, void *content, int content_size, int key, int sec, int nsec){ |
| | | struct timespec timeout = {sec, nsec}; |
| | | return shmModSocket.pub_timeout(topic, topic_size, content, content_size, key, &timeout); |
| | | return shmModSocket.pub(topic, topic_size, content, content_size, key, &timeout, BUS_TIMEOUT_FLAG); |
| | | } |
| | | int NetModSocket::pub_nowait( char *topic, int topic_size, void *content, int content_size, int key){ |
| | | return shmModSocket.pub_nowait(topic, topic_size, content, content_size, key); |
| | | return shmModSocket.pub(topic, topic_size, content, content_size, key, NULL, BUS_NOWAIT_FLAG); |
| | | } |
| | | |
| | | |