| | |
| | | timeout.tv_sec = request_head.timeout / 1000; |
| | | timeout.tv_nsec = (request_head.timeout - timeout.tv_sec * 1000) * 10e6; |
| | | // printf(" timeout.tv_sec = %d, timeout.tv_nsec=%ld\n", timeout.tv_sec, timeout.tv_nsec ); |
| | | ret = shmModSocket.sendandrecv_unsafe_timeout(buf, request_head.content_length, request_head.key, &recv_buf, &recv_size, &timeout); |
| | | ret = shmModSocket.sendandrecv_unsafe(buf, request_head.content_length, request_head.key, &recv_buf, &recv_size, &timeout, BUS_TIMEOUT_FLAG); |
| | | } |
| | | else if(request_head.timeout == 0) { |
| | | ret = shmModSocket.sendandrecv_unsafe_nowait(buf, request_head.content_length, request_head.key, &recv_buf, &recv_size); |
| | | ret = shmModSocket.sendandrecv_unsafe(buf, request_head.content_length, request_head.key, &recv_buf, &recv_size, NULL, BUS_NOWAIT_FLAG); |
| | | } |
| | | else if(request_head.timeout == -1) { |
| | | ret = shmModSocket.sendandrecv_unsafe(buf, request_head.content_length, request_head.key, &recv_buf, &recv_size); |
| | |
| | | if(request_head.timeout > 0) { |
| | | timeout.tv_sec = request_head.timeout / 1000; |
| | | timeout.tv_nsec = (request_head.timeout - timeout.tv_sec * 1000) * 10e6; |
| | | ret = shmModSocket.pub_timeout((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, SHM_BUS_KEY, &timeout); |
| | | ret = shmModSocket.pub((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, SHM_BUS_KEY, &timeout, BUS_TIMEOUT_FLAG); |
| | | } |
| | | else if(request_head.timeout == 0) { |
| | | ret = shmModSocket.pub_nowait((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, SHM_BUS_KEY); |
| | | ret = shmModSocket.pub((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, SHM_BUS_KEY, NULL, BUS_NOWAIT_FLAG); |
| | | } |
| | | else if(request_head.timeout == -1) { |
| | | ret = shmModSocket.pub((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, SHM_BUS_KEY); |
| | |
| | | { |
| | | int s; |
| | | if (Signal(SIGPIPE, SIG_IGN) == SIG_ERR) |
| | | logger->error(errno, "NetModSocket::NetModSocket signal"); |
| | | logger->error(errno, "NetModSocket::NetModSocket signal"); |
| | | |
| | | gpool = new NetConnPool(); |
| | | |
| | |
| | | // 本地发送 |
| | | |
| | | if(msec == 0) { |
| | | ret = shmModSocket.sendandrecv_nowait(send_buf, send_size, node->key, &recv_buf, &recv_size); |
| | | ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size, NULL, BUS_NOWAIT_FLAG); |
| | | } else if(msec > 0){ |
| | | timeout.tv_sec = msec / 1000; |
| | | timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6; |
| | | ret = shmModSocket.sendandrecv_timeout(send_buf, send_size, node->key, &recv_buf, &recv_size, &timeout); |
| | | ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size, &timeout, BUS_TIMEOUT_FLAG); |
| | | } else { |
| | | ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size); |
| | | } |
| | |
| | | // 本地发送 |
| | | if(node_arr == NULL || arrlen == 0) { |
| | | if(msec == 0) { |
| | | ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, SHM_BUS_KEY); |
| | | ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY, NULL, BUS_NOWAIT_FLAG); |
| | | } else if(msec > 0) { |
| | | timeout.tv_sec = msec / 1000; |
| | | timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6; |
| | | ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout); |
| | | ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout, BUS_TIMEOUT_FLAG); |
| | | } else { |
| | | ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY); |
| | | } |
| | |
| | | if(node->host == NULL) { |
| | | // 本地发送 |
| | | if(msec == 0) { |
| | | ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, SHM_BUS_KEY); |
| | | ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY, NULL, BUS_NOWAIT_FLAG); |
| | | } else if(msec > 0) { |
| | | timeout.tv_sec = msec / 1000; |
| | | timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6; |
| | | ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout); |
| | | ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout, BUS_TIMEOUT_FLAG); |
| | | } else { |
| | | ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY); |
| | | } |
| | |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int NetModSocket::sendto(const void *buf, const int size, const int key){ |
| | | int rv = shmModSocket.sendto(buf, size, key); |
| | | if(rv == 0) { |
| | | logger->debug("NetModSocket::sendto: %d sendto %d success.\n", get_key(), key); |
| | | return 0; |
| | | } |
| | | |
| | | if(rv > EBUS_BASE) { |
| | | // bus_errno = EBUS_TIMEOUT; |
| | | logger->debug("NetModSocket::sendto: %d sendto %d failed %s", get_key(), key, bus_strerror(rv)); |
| | | } else { |
| | | logger->error(rv, "NetModSocket::sendto : %d sendto %d failed", get_key(), key); |
| | | } |
| | | return rv; |
| | | return shmModSocket.sendto(buf, size, key); |
| | | } |
| | | |
| | | // 发送信息超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int NetModSocket::sendto_timeout(const void *buf, const int size, const int key, int sec, int nsec){ |
| | | struct timespec timeout = {sec, nsec}; |
| | | int rv = shmModSocket.sendto_timeout(buf, size, key, &timeout); |
| | | if(rv == 0) { |
| | | logger->debug("NetModSocket::sendto_timeout: %d sendto %d success.\n", get_key(), key); |
| | | return 0; |
| | | } |
| | | |
| | | if(rv > EBUS_BASE) { |
| | | // bus_errno = EBUS_TIMEOUT; |
| | | logger->debug("NetModSocket::sendto_timeout : %d sendto %d failed %s", get_key(), key, bus_strerror(rv)); |
| | | } else { |
| | | logger->error(rv, "NetModSocket::sendto_timeout: %d sendto %d failed", get_key(), key); |
| | | } |
| | | return rv; |
| | | return shmModSocket.sendto(buf, size, key, &timeout, BUS_TIMEOUT_FLAG); |
| | | |
| | | } |
| | | |
| | | // 发送信息立刻返回。 |
| | | int NetModSocket::sendto_nowait(const void *buf, const int size, const int key){ |
| | | int rv = shmModSocket.sendto_nowait(buf, size, key); |
| | | if(rv == 0) { |
| | | logger->debug("NetModSocket::sendto_nowait: %d sendto %d success.\n", get_key(), key); |
| | | return 0; |
| | | } |
| | | |
| | | if(rv > EBUS_BASE) { |
| | | // bus_errno = EBUS_TIMEOUT; |
| | | logger->debug("NetModSocket::sendto_nowait %d sendto %d failed %s", get_key(), key, bus_strerror(rv)); |
| | | |
| | | } else { |
| | | logger->error(rv, "NetModSocket::sendto_nowait %d sendto %d failed", get_key(), key); |
| | | } |
| | | return rv; |
| | | return shmModSocket.sendto(buf, size, key, NULL, BUS_NOWAIT_FLAG); |
| | | |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | int NetModSocket::recvfrom(void **buf, int *size, int *key) { |
| | | |
| | | logger->debug(" %d NetModSocket::recvfrom before", get_key()); |
| | | int rv = shmModSocket.recvfrom(buf, size, key); |
| | | |
| | | if(rv == 0) { |
| | | logger->debug("NetModSocket::recvfrom: <<<< %d recvfrom %d success.\n", get_key(), *key); |
| | | return 0; |
| | | } |
| | | |
| | | if(rv > EBUS_BASE) { |
| | | logger->debug("NetModSocket::recvfrom: socket %d recvfrom failed %s", get_key(), bus_strerror(rv)); |
| | | } else { |
| | | logger->error(rv, "NetModSocket::recvfrom: socket %d recvfrom failed", get_key()); |
| | | } |
| | | return rv; |
| | | return shmModSocket.recvfrom(buf, size, key); |
| | | |
| | | } |
| | | |
| | | // 接受信息超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int NetModSocket::recvfrom_timeout(void **buf, int *size, int *key, int sec, int nsec){ |
| | | struct timespec timeout = {sec, nsec}; |
| | | int rv = shmModSocket.recvfrom_timeout(buf, size, key, &timeout); |
| | | if(rv == 0) { |
| | | logger->debug("NetModSocket::recvfrom_timeout: %d recvfrom %d success.\n", get_key(), *key); |
| | | return 0; |
| | | } |
| | | |
| | | if(rv > EBUS_BASE) { |
| | | // bus_errno = EBUS_TIMEOUT; |
| | | logger->debug("NetModSocket::recvfrom_timeout: %d recvfrom failed %s", get_key(), bus_strerror(rv)); |
| | | } else { |
| | | logger->error(rv, "NetModSocket::recvfrom_timeout: %d recvfrom failed", get_key()); |
| | | } |
| | | return rv; |
| | | return shmModSocket.recvfrom(buf, size, key, &timeout, BUS_TIMEOUT_FLAG); |
| | | |
| | | } |
| | | |
| | | int NetModSocket::recvfrom_nowait(void **buf, int *size, int *key){ |
| | | int rv = shmModSocket.recvfrom_nowait(buf, size, key); |
| | | if(rv == 0) { |
| | | logger->debug("NetModSocket::recvfrom_nowait: %d recvfrom %d success.\n", get_key(), *key); |
| | | return 0; |
| | | } |
| | | |
| | | if(rv > EBUS_BASE) { |
| | | // bus_errno = EBUS_TIMEOUT; |
| | | logger->debug("NetModSocket::recvfrom_nowait: %d recvfrom failed %s", get_key(), bus_strerror(rv)); |
| | | } else { |
| | | logger->error(rv, "NetModSocket::recvfrom_nowait: %d recvfrom failed", get_key()); |
| | | } |
| | | return rv; |
| | | return shmModSocket.recvfrom(buf, size, key, NULL, BUS_NOWAIT_FLAG); |
| | | } |
| | | |
| | | /** |
| | |
| | | // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int NetModSocket::sendandrecv_timeout( const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size, int sec, int nsec){ |
| | | struct timespec timeout = {sec, nsec}; |
| | | return shmModSocket.sendandrecv_timeout(send_buf, send_size, key, recv_buf, recv_size, &timeout); |
| | | return shmModSocket.sendandrecv(send_buf, send_size, key, recv_buf, recv_size, &timeout, BUS_TIMEOUT_FLAG); |
| | | } |
| | | int NetModSocket::sendandrecv_nowait( const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size) { |
| | | return shmModSocket.sendandrecv_nowait(send_buf, send_size, key, recv_buf, recv_size); |
| | | return shmModSocket.sendandrecv(send_buf, send_size, key, recv_buf, recv_size, NULL, BUS_NOWAIT_FLAG); |
| | | } |
| | | |
| | | |
| | |
| | | // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int NetModSocket::sub_timeout( void *topic, int size, int key, int sec, int nsec){ |
| | | struct timespec timeout = {sec, nsec}; |
| | | return shmModSocket.sub_timeout((char *)topic, size, key, &timeout); |
| | | return shmModSocket.sub((char *)topic, size, key, &timeout, BUS_TIMEOUT_FLAG); |
| | | } |
| | | int NetModSocket::sub_nowait( void *topic, int size, int key){ |
| | | return shmModSocket.sub_nowait((char *)topic, size, key); |
| | | return shmModSocket.sub((char *)topic, size, key, NULL, BUS_NOWAIT_FLAG); |
| | | } |
| | | |
| | | |
| | |
| | | // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int NetModSocket::desub_timeout( void *topic, int size, int key, int sec, int nsec){ |
| | | struct timespec timeout = {sec, nsec}; |
| | | return shmModSocket.desub_timeout((char *)topic, size, key, &timeout); |
| | | return shmModSocket.desub((char *)topic, size, key, &timeout, BUS_TIMEOUT_FLAG); |
| | | } |
| | | int NetModSocket::desub_nowait( void *topic, int size, int key){ |
| | | return shmModSocket.desub_nowait((char *)topic, size, key); |
| | | return shmModSocket.desub((char *)topic, size, key, NULL, BUS_NOWAIT_FLAG); |
| | | } |
| | | |
| | | |
| | |
| | | // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int NetModSocket::pub_timeout( char *topic, int topic_size, void *content, int content_size, int key, int sec, int nsec){ |
| | | struct timespec timeout = {sec, nsec}; |
| | | return shmModSocket.pub_timeout(topic, topic_size, content, content_size, key, &timeout); |
| | | return shmModSocket.pub(topic, topic_size, content, content_size, key, &timeout, BUS_TIMEOUT_FLAG); |
| | | } |
| | | int NetModSocket::pub_nowait( char *topic, int topic_size, void *content, int content_size, int key){ |
| | | return shmModSocket.pub_nowait(topic, topic_size, content, content_size, key); |
| | | return shmModSocket.pub(topic, topic_size, content, content_size, key, NULL, BUS_NOWAIT_FLAG); |
| | | } |
| | | |
| | | |
| | |
| | | * @return 0是成功, 其他值是失败的错误码 |
| | | */ |
| | | int net_mod_socket_recvfrom(void *_socket, void **buf, int *size, int *key); |
| | | |
| | | /** |
| | | * @brief 等待接收信息,在指定的时间内即使没有接受到消息也要返回 |
| | | * |
| | |
| | | struct timespec timeout = {1, 0}; |
| | | if(bus_set != NULL) { |
| | | for(auto bus_iter = bus_set->begin(); bus_iter != bus_set->end(); bus_iter++) { |
| | | desub_timeout(NULL, 0, *bus_iter, &timeout); |
| | | desub(NULL, 0, *bus_iter, &timeout, BUS_TIMEOUT_FLAG); |
| | | } |
| | | delete bus_set; |
| | | } |
| | |
| | | int ShmModSocket::force_bind(int key) { |
| | | return shm_socket_force_bind(shm_socket, key); |
| | | } |
| | | |
| | | /** |
| | | * 发送信息 |
| | | * @key 发送给谁 |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int ShmModSocket::sendto(const void *buf, const int size, const int key) { |
| | | return shm_sendto(shm_socket, buf, size, key, NULL, 0); |
| | | int ShmModSocket::sendto(const void *buf, const int size, const int key, const struct timespec *timeout, int flag) { |
| | | int rv = shm_sendto(shm_socket, buf, size, key, timeout, flag); |
| | | if(rv == 0) { |
| | | logger->debug("ShmModSocket::sendto: %d sendto %d success.\n", get_key(), key); |
| | | return 0; |
| | | } |
| | | |
| | | logger->debug("ShmModSocket::sendto : %d sendto %d failed %s", get_key(), key, bus_strerror(rv)); |
| | | return rv; |
| | | } |
| | | // 发送信息超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int ShmModSocket::sendto_timeout(const void *buf, const int size, const int key, const struct timespec *timeout) { |
| | | return shm_sendto(shm_socket, buf, size, key, timeout, BUS_TIMEOUT_FLAG); |
| | | } |
| | | // 发送信息立刻返回。 |
| | | int ShmModSocket::sendto_nowait( const void *buf, const int size, const int key){ |
| | | return shm_sendto(shm_socket, buf, size, key, NULL, BUS_NOWAIT_FLAG); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * 接收信息 |
| | | * @key 从谁哪里收到的信息 |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int ShmModSocket::recvfrom(void **buf, int *size, int *key) { |
| | | int rv = shm_recvfrom(shm_socket, buf, size, key, NULL, 0); |
| | | |
| | | int ShmModSocket::recvfrom( void **buf, int *size, int *key, const struct timespec *timeout, int flag) { |
| | | int rv = shm_recvfrom(shm_socket, buf, size, key, timeout, flag); |
| | | |
| | | if(rv == 0) { |
| | | logger->debug("ShmModSocket::recvfrom: %d recvfrom %d success.\n", get_key(), *key); |
| | | return 0; |
| | | } |
| | | |
| | | logger->debug("ShmModSocket::recvfrom: socket %d recvfrom failed %s", get_key(), bus_strerror(rv)); |
| | | return rv; |
| | | } |
| | | |
| | | |
| | | int ShmModSocket::recvfrom_timeout( void **buf, int *size, int *key, const struct timespec *timeout) { |
| | | int rv = shm_recvfrom(shm_socket, buf, size, key, timeout, BUS_TIMEOUT_FLAG); |
| | | |
| | | return rv; |
| | | } |
| | | |
| | | int ShmModSocket::recvfrom_nowait( void **buf, int *size, int *key){ |
| | | int rv = shm_recvfrom(shm_socket, buf, size, key, NULL, BUS_NOWAIT_FLAG); |
| | | // logger->error(rv, "ShmModSocket::recvfrom_nowait failed!"); |
| | | return rv; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * 发送请求信息并等待接收应答 |
| | | * @key 发送给谁 |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int ShmModSocket::sendandrecv( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){ |
| | | return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, NULL, 0); |
| | | int ShmModSocket::sendandrecv(const void *send_buf, const int send_size, const int send_key, |
| | | void **recv_buf, int *recv_size, const struct timespec *timeout, int flag){ |
| | | return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, flag); |
| | | } |
| | | |
| | | // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int ShmModSocket::sendandrecv_timeout(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, const struct timespec *timeout){ |
| | | return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, BUS_TIMEOUT_FLAG); |
| | | int ShmModSocket::sendandrecv_unsafe(const void *send_buf, const int send_size, const int send_key, |
| | | void **recv_buf, int *recv_size, const struct timespec *timeout, int flag){ |
| | | return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, flag); |
| | | } |
| | | int ShmModSocket::sendandrecv_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){ |
| | | return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, BUS_NOWAIT_FLAG); |
| | | } |
| | | |
| | | |
| | | |
| | | |
| | | int ShmModSocket::sendandrecv_unsafe( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){ |
| | | return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, NULL, 0); |
| | | } |
| | | // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int ShmModSocket::sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, const struct timespec *timeout){ |
| | | return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, BUS_TIMEOUT_FLAG); |
| | | } |
| | | int ShmModSocket::sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){ |
| | | return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, BUS_NOWAIT_FLAG); |
| | | } |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | /** |
| | | * 订阅指定主题 |
| | | * @topic 主题 |
| | | * @size 主题长度 |
| | | * @key 总线端口 |
| | | */ |
| | | int ShmModSocket::sub(char *topic, int size, int key){ |
| | | return _sub_( topic, size, key, NULL, 0); |
| | | } |
| | | // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int ShmModSocket::sub_timeout(char *topic, int size, int key, const struct timespec *timeout){ |
| | | return _sub_(topic, size, key, timeout, BUS_TIMEOUT_FLAG); |
| | | } |
| | | int ShmModSocket::sub_nowait(char *topic, int size, int key) { |
| | | return _sub_(topic, size, key, NULL, BUS_NOWAIT_FLAG); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * 取消订阅指定主题 |
| | | * @topic 主题 |
| | | * @size 主题长度 |
| | | * @key 总线端口 |
| | | */ |
| | | int ShmModSocket::desub(char *topic, int size, int key){ |
| | | return _desub_( topic, size, key, NULL, 0); |
| | | } |
| | | // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int ShmModSocket::desub_timeout(char *topic, int size, int key, const struct timespec *timeout){ |
| | | return _desub_(topic, size, key, timeout, BUS_TIMEOUT_FLAG); |
| | | } |
| | | int ShmModSocket::desub_nowait(char *topic, int size, int key) { |
| | | return _desub_(topic, size, key, NULL, BUS_NOWAIT_FLAG); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * 发布主题 |
| | | * @topic 主题 |
| | | * @content 主题内容 |
| | | * @key 总线端口 |
| | | */ |
| | | int ShmModSocket::pub(char *topic, int topic_size, void *content, int content_size, int key){ |
| | | return _pub_(topic, topic_size, content, content_size, key, NULL, 0); |
| | | } |
| | | // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int ShmModSocket::pub_timeout(char *topic, int topic_size, void *content, int content_size, int key, const struct timespec * timeout){ |
| | | return _pub_( topic, topic_size, content, content_size, key, timeout, BUS_TIMEOUT_FLAG); |
| | | } |
| | | int ShmModSocket::pub_nowait(char *topic, int topic_size, void *content, int content_size, int key){ |
| | | return _pub_(topic, topic_size, content, content_size, key, NULL, BUS_NOWAIT_FLAG); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * 获取soket key |
| | | */ |
| | | int ShmModSocket::get_key(){ |
| | | return shm_socket->key; |
| | | } |
| | | |
| | | |
| | | |
| | | // ============================================================================= |
| | | /** |
| | | * @key 总线端口 |
| | | */ |
| | | int ShmModSocket::_sub_(char *topic, int topic_size, int key, |
| | | int ShmModSocket::sub(char *topic, int topic_size, int key, |
| | | const struct timespec *timeout, int flags) { |
| | | |
| | | |
| | | int ret; |
| | | bus_head_t head = {}; |
| | | memcpy(head.action, "sub", sizeof(head.action)); |
| | |
| | | } |
| | | |
| | | |
| | | |
| | | |
| | | /** |
| | | * 取消订阅指定主题 |
| | | * @topic 主题 |
| | | * @size 主题长度 |
| | | * @key 总线端口 |
| | | */ |
| | | int ShmModSocket::_desub_(char *topic, int topic_size, int key, const struct timespec *timeout, int flags) { |
| | | int ShmModSocket::desub(char *topic, int topic_size, int key, const struct timespec *timeout, int flags) { |
| | | // char buf[8192]; |
| | | int ret; |
| | | if(topic == NULL) { |
| | |
| | | |
| | | } |
| | | |
| | | /** |
| | | * @key 总线端口 |
| | | * @str "<**pub**>{经济}" |
| | | */ |
| | | |
| | | int ShmModSocket::_pub_(char *topic, int topic_size, void *content, int content_size, int key, const struct timespec *timeout, int flags) { |
| | | // int head_len; |
| | | // char buf[8192+content_size]; |
| | | // snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER); |
| | | // head_len = strlen(buf); |
| | | // memcpy(buf+head_len, content, content_size); |
| | | |
| | | |
| | | /** |
| | | * 发布主题 |
| | | * @topic 主题 |
| | | * @content 主题内容 |
| | | * @key 总线端口 |
| | | */ |
| | | int ShmModSocket::pub(char *topic, int topic_size, void *content, int content_size, int key, const struct timespec *timeout, int flags) { |
| | | int ret; |
| | | bus_head_t head = {}; |
| | | memcpy(head.action, "pub", sizeof(head.action)); |
| | |
| | | return -1; |
| | | } |
| | | |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * 获取soket key |
| | | */ |
| | | int ShmModSocket::get_key(){ |
| | | return shm_socket->key; |
| | | } |
| | | |
| | | |
| | | |
| | | // ============================================================================= |
| | | |
| | | int ShmModSocket::get_bus_sendbuf(bus_head_t &request_head, |
| | | void *topic_buf, int topic_size, void *content_buf, int content_size, void **retbuf) { |
| | |
| | | int buf_size; |
| | | char *buf; |
| | | int max_buf_size; |
| | | if((buf = (char *)malloc(MAXBUF)) == NULL) { |
| | | if((buf = (char *) malloc(MAXBUF)) == NULL) { |
| | | LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf malloc"); |
| | | exit(1); |
| | | } else { |
| | |
| | | buf_size = BUS_HEAD_SIZE + content_size + topic_size ; |
| | | if(max_buf_size < buf_size) { |
| | | |
| | | if((buf = (char *)realloc(buf, buf_size)) == NULL) { |
| | | if((buf = (char *) realloc(buf, buf_size)) == NULL) { |
| | | LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf realloc buf"); |
| | | exit(1); |
| | | } else { |
| | |
| | | |
| | | private: |
| | | |
| | | int _sub_( char *topic, int size, int key, const struct timespec *timeouts, int flags); |
| | | int _pub_( char *topic, int topic_size, void *content, int content_size, int key, const struct timespec *timeouts, int flags); |
| | | |
| | | int _desub_( char *topic, int size, int key, const struct timespec *timeouts, int flags); |
| | | |
| | | |
| | | |
| | | static int get_bus_sendbuf(bus_head_t &request_head, void *topic_buf, int topic_size, void *content_buf, int content_size, void **retbuf); |
| | | |
| | |
| | | /** |
| | | * 发送信息 |
| | | * @key 发送给谁 |
| | | * @flag BUS_TIMEOUT_FLAG BUS_NOWAIT_FLAG |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int sendto(const void *buf, const int size, const int key); |
| | | // 发送信息超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int sendto_timeout(const void *buf, const int size, const int key, const struct timespec *timeout); |
| | | // 发送信息立刻返回。 |
| | | int sendto_nowait(const void *buf, const int size, const int key); |
| | | |
| | | int sendto(const void *buf, const int size, const int key, const struct timespec *timeout = NULL, int flag = 0); |
| | | |
| | | |
| | | /** |
| | | * Receive a message. |
| | | * @key receives the key of the sender the message came from |
| | | * @return 0 on success, otherwise an error code |
| | | */ |
| | | int recvfrom(void **buf, int *size, int *key); |
| | | // Receive a message, returning once @timeout has elapsed. |
| | | int recvfrom_timeout(void **buf, int *size, int *key, const struct timespec *timeout); |
| | | int recvfrom_nowait(void **buf, int *size, int *key); |
| | | |
| | | int recvfrom(void **buf, int *size, int *key, const struct timespec *timeout = NULL, int flag = 0); |
| | | |
| | | /** |
| | | * 发送请求信息并等待接收应答 |
| | | * @key 发送给谁 |
| | | * @flag BUS_TIMEOUT_FLAG BUS_NOWAIT_FLAG |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int sendandrecv(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size) ; |
| | | // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int sendandrecv_timeout(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size, const struct timespec *timeout) ; |
| | | int sendandrecv_nowait(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size) ; |
| | | |
| | | int sendandrecv(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size, |
| | | const struct timespec *timeout = NULL, int flag = 0); |
| | | |
| | | |
| | | int sendandrecv_unsafe(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size) ; |
| | | // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size, const struct timespec *timeout) ; |
| | | int sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size) ; |
| | | int sendandrecv_unsafe(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size, |
| | | const struct timespec *timeout = NULL, int flag = 0) ; |
| | | |
| | | /** |
| | | * 订阅指定主题 |
| | | * @topic 主题 |
| | | * @size 主题长度 |
| | | * @key 总线端口 |
| | | * @flag BUS_TIMEOUT_FLAG BUS_NOWAIT_FLAG |
| | | */ |
| | | int sub(char *topic, int size, int key); |
| | | // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int sub_timeout(char *topic, int size, int key, const struct timespec *timeout); |
| | | int sub_nowait(char *topic, int size, int key); |
| | | int sub(char *topic, int size, int key, const struct timespec *timeout = NULL, int flag = 0); |
| | | |
| | | |
| | | /** |
| | |
| | | * @topic 主题,主题为空时取消全部订阅 |
| | | * @size 主题长度 |
| | | * @key 总线端口 |
| | | * @flag BUS_TIMEOUT_FLAG BUS_NOWAIT_FLAG |
| | | */ |
| | | int desub( char *topic, int size, int key); |
| | | // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int desub_timeout(char *topic, int size, int key, const struct timespec *timeout); |
| | | int desub_nowait(char *topic, int size, int key) ; |
| | | int desub(char *topic, int size, int key, const struct timespec *timeout = NULL, int flag = 0); |
| | | |
| | | /** |
| | | * 发布主题 |
| | | * @topic 主题 |
| | | * @content 主题内容 |
| | | * @key 总线端口 |
| | | * @flag BUS_TIMEOUT_FLAG BUS_NOWAIT_FLAG |
| | | */ |
| | | int pub(char *topic, int topic_size, void *content, int content_size, int key); |
| | | // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int pub_timeout(char *topic, int topic_size, void *content, int content_size, int key, const struct timespec *timeout); |
| | | int pub_nowait(char *topic, int topic_size, void *content, int content_size, int key); |
| | | int pub(char *topic, int topic_size, void *content, int content_size, int key, const struct timespec *timeout = NULL, int flag = 0); |
| | | |
| | | |
| | | /** |
| | |
| | | ./test_net_mod_socket --fun="start_reply" --key=100 & server_pid=$! && echo "pid: ${server_pid}" |
| | | } |
| | | |
| | | # 交互式客户端 |
| | | function client() { |
| | | |
| | | # ./test_net_mod_socket --fun="start_net_client" \ |
| | |
| | | |
| | | } |
| | | |
# Run the sendandrecv test, which sends in an endless loop.
send() {
    ./test_net_mod_socket \
        --fun="test_net_sendandrecv" \
        --sendlist="localhost:5000:100, localhost:5000:100"
}
# Run the multi-threaded sendandrecv test.
msend() {
    ./test_net_mod_socket \
        --fun="test_net_sendandrecv_threads" \
        --sendlist="localhost:5000:100, localhost:5000:100"
}
| | | |
# Run the publish test, which publishes in an endless loop.
pub() {
    ./test_net_mod_socket \
        --fun="test_net_pub" \
        --publist="localhost:5000, localhost:5000"
}
| | | # 多线程pub |
| | | function mpub() { |
| | | ./test_net_mod_socket --fun="test_net_pub_threads" \ |
| | | --publist="localhost:5000, localhost:5000" |
| | |
| | | "msend") |
| | | msend |
| | | ;; |
| | | "send") |
| | | send |
| | | ;; |
| | | "mpub") |
| | | mpub |
| | | ;; |
| | | "pub") |
| | | pub |
| | | ;; |
| | | "close") |
| | | close |
| | | ;; |
| | |
| | | #include "shm_mm_wrapper.h" |
| | | #include "usg_common.h" |
| | | #include <getopt.h> |
| | | #include "logger_factory.h" |
| | | |
| | | #define SCALE 100000 |
| | | |
| | |
| | | int remote_port; |
| | | while ( (rv = net_mod_socket_recvfrom(client, &recvbuf, &size, &remote_port) ) == 0) { |
| | | // printf( "server: RECEIVED REQUEST FROM PORT %d NAME %s\n", remote_port, recvbuf); |
| | | sprintf(sendbuf, "RECEIVED PORT %d NAME %s", remote_port, recvbuf); |
| | | sprintf(sendbuf, "RECEIVED: %s", recvbuf); |
| | | net_mod_socket_sendto(client, sendbuf, strlen(sendbuf) + 1, remote_port); |
| | | free(recvbuf); |
| | | } |
| | |
| | | n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, content, strlen(content), &recv_arr, &recv_arr_size, 1000); |
| | | printf(" %d nodes reply\n", n); |
| | | for(i=0; i<recv_arr_size; i++) { |
| | | printf("host:%s, port: %d, key:%d, content: %s\n", |
| | | printf("reply from (host:%s, port: %d, key:%d) >> %s\n", |
| | | recv_arr[i].host, |
| | | recv_arr[i].port, |
| | | recv_arr[i].key, |
| | |
| | | Targ *targ = (Targ *)arg; |
| | | char sendbuf[512]; |
| | | |
| | | int i,j, n, recv_arr_size; |
| | | int i,j, n; |
| | | int recv_arr_size; |
| | | net_mod_recv_msg_t *recv_arr; |
| | | int total = 0; |
| | | |
| | |
| | | n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 1000); |
| | | printf("%d: send %d nodes\n", i, n); |
| | | for(j=0; j < recv_arr_size; j++) { |
| | | fprintf(fp, "reply: host:%s, port: %d, key:%d, content: %s\n", |
| | | fprintf(fp, "reply from (host:%s, port: %d, key:%d) >> %s\n", |
| | | recv_arr[j].host, |
| | | recv_arr[j].port, |
| | | recv_arr[j].key, |
| | |
| | | return (void *)total; |
| | | } |
| | | |
| | | //多线程send |
| | | void test_net_sendandrecv_threads(char *nodelist) { |
| | | |
| | | int status, i = 0, processors = 1; |
| | | int status, i = 0, processors = 4; |
| | | void *res[processors]; |
| | | // Targ *targs = (Targ *)calloc(processors, sizeof(Targ)); |
| | | Targ targs[processors]; |
| | |
| | | |
| | | } |
| | | |
| | | // 无限循环send |
| | | void test_net_sendandrecv(char *nodelist) { |
| | | |
| | | int n, i; |
| | | void * client; |
| | | int recv_arr_size; |
| | | net_mod_recv_msg_t *recv_arr; |
| | | net_node_t *node_arr; |
| | | int node_arr_size = parse_node_list(nodelist, &node_arr); |
| | | char content[128]; |
| | | |
| | | sprintf(content, "pid:%ld say Hello!!", (long)getpid()); |
| | | client = net_mod_socket_open(); |
| | | while(true) { |
| | | n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, content, strlen(content), &recv_arr, &recv_arr_size, 1000); |
| | | printf(" %d nodes reply\n", n); |
| | | for(i=0; i<recv_arr_size; i++) { |
| | | LoggerFactory::getLogger()->debug("reply from (host:%s, port: %d, key:%d) >> %s\n", |
| | | recv_arr[i].host, |
| | | recv_arr[i].port, |
| | | recv_arr[i].key, |
| | | recv_arr[i].content |
| | | ); |
| | | } |
| | | |
| | | // 使用完后,不要忘记释放掉 |
| | | net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size); |
| | | } |
| | | |
| | | net_mod_socket_close(client); |
| | | |
| | | } |
| | | |
| | | void *_run_pub_(void *arg) { |
| | | Targ *targ = (Targ *)arg; |
| | | char sendbuf[512]; |
| | | char sendbuf[128]; |
| | | |
| | | int i,j, n; |
| | | int total = 0; |
| | |
| | | int node_arr_size = parse_node_list(targ->nodelist, &node_arr); |
| | | |
| | | char *topic = "news"; |
| | | |
| | | |
| | | |
| | | // char filename[512]; |
| | | // sprintf(filename, "test%d.tmp", targ->id); |
| | | // FILE *fp = NULL; |
| | |
| | | |
| | | n = net_mod_socket_pub(client, node_arr, node_arr_size, topic, strlen(topic)+1, sendbuf, strlen(sendbuf)+1); |
| | | // n = net_mod_socket_pub(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size); |
| | | printf( "pub:%s to %d nodes\n", sendbuf, n); |
| | | LoggerFactory::getLogger()->debug( "pub:%s to %d nodes\n", sendbuf, n); |
| | | total += n; |
| | | } |
| | | // fclose(fp); |
| | |
| | | return (void *)total; |
| | | } |
| | | |
| | | //多线程pub |
| | | void test_net_pub_threads(char *nodelist) { |
| | | |
| | | int status, i = 0, processors = 4; |
| | |
| | | // fflush(stdout); |
| | | net_mod_socket_close(client); |
| | | } |
| | | |
| | | // 无限循环pub |
| | | void test_net_pub(char *nodelist) { |
| | | |
| | | int n; |
| | | char sendbuf[512]; |
| | | net_node_t *node_arr; |
| | | int node_arr_size = parse_node_list(nodelist, &node_arr); |
| | | |
| | | char *topic = "news"; |
| | | sprintf(sendbuf, "pid:%ld: Good news!!", (long)getpid()); |
| | | |
| | | void * client = net_mod_socket_open(); |
| | | while (true) { |
| | | n = net_mod_socket_pub(client, node_arr, node_arr_size, topic, strlen(topic)+1, sendbuf, strlen(sendbuf)+1); |
| | | // n = net_mod_socket_pub(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size); |
| | | LoggerFactory::getLogger()->debug( "pub to %d nodes\n", n); |
| | | } |
| | | net_mod_socket_close(client); |
| | | } |
| | | |
| | | |
| | | |
| | | |
| | | |
| | |
| | | |
| | | test_net_sendandrecv_threads(opt.sendlist); |
| | | } |
| | | else if (strcmp("test_net_sendandrecv", opt.fun) == 0) { |
| | | if(opt.sendlist == 0) { |
| | | fprintf(stderr, "Missing sendlist .\n"); |
| | | usage(argv[0]); |
| | | exit(1); |
| | | } |
| | | |
| | | test_net_sendandrecv(opt.sendlist); |
| | | } |
| | | else if (strcmp("test_net_pub_threads", opt.fun) == 0) { |
| | | if(opt.publist == 0) { |
| | | fprintf(stderr, "Missing publist .\n"); |
| | |
| | | |
| | | test_net_pub_threads(opt.publist); |
| | | } |
| | | else if (strcmp("test_net_pub", opt.fun) == 0) { |
| | | if(opt.publist == 0) { |
| | | fprintf(stderr, "Missing publist .\n"); |
| | | usage(argv[0]); |
| | | exit(1); |
| | | } |
| | | |
| | | test_net_pub(opt.publist); |
| | | } |
| | | |
| | | else { |
| | | usage(argv[0]); |