| | |
| | | } |
| | | |
| | | |
| | | |
| | | |
| | | int NetModSocket::_sendandrecv_(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, |
| | | net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int msec ) { |
| | | |
| | |
| | | return shmModSocket.recvfrom(buf, size, key, NULL, BUS_NOWAIT_FLAG); |
| | | } |
| | | |
| | | |
| | | |
| | | int NetModSocket::recvandsend(void **recvbuf, int *recvsize, int *key, recv_callback_fn callback) { |
| | | return shmModSocket.recvandsend( recvbuf, recvsize, key, callback); |
| | | |
| | | } |
| | | |
| | | int NetModSocket::recvandsend_timeout(void **recvbuf, int *recvsize, int *key, recv_callback_fn callback, |
| | | const struct timespec *timeout ) { |
| | | return shmModSocket.recvandsend( recvbuf, recvsize, key, callback, timeout, BUS_TIMEOUT_FLAG); |
| | | |
| | | } |
| | | |
| | | int NetModSocket::recvandsend_nowait(void **recvbuf, int *recvsize, int *key, recv_callback_fn callback) { |
| | | return shmModSocket.recvandsend( recvbuf, recvsize, key, callback, NULL, BUS_NOWAIT_FLAG); |
| | | |
| | | } |
| | | |
| | | /** |
| | | * 发送请求信息并等待接收应答 |
| | | * @key 发送给谁 |
| | |
| | | int sendandrecv_timeout( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, int sec, int nsec) ; |
| | | int sendandrecv_nowait( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size) ; |
| | | |
| | | |
| | | /** |
| | | |
| | | int recvandsend(void **recvbuf, int *recvsize, int *key, recv_callback_fn callback) ; |
| | | |
| | | int recvandsend_timeout(void **recvbuf, int *recvsize, int *key, recv_callback_fn callback, |
| | | const struct timespec *timeout ); |
| | | int recvandsend_nowait(void **recvbuf, int *recvsize, int *key, recv_callback_fn callback); |
| | | |
| | | /** |
| | | * 向node_arr 中的所有网络节点发布消息 |
| | | * @node_arr 网络节点组, @node_arr_len该数组长度 |
| | | * @topic 主题,@topic_size 该主题的长度 |
| | |
| | | return sockt->recvfrom_nowait(buf, size, key); |
| | | } |
| | | |
| | | |
| | | int net_mod_socket_recvandsend(void *_socket, void **recvbuf, int *recvsize, int *key, recv_callback_fn callback) { |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | return sockt->recvandsend( recvbuf, recvsize, key, callback); |
| | | |
| | | } |
| | | |
| | | int net_mod_socket_recvandsend_timeout(void *_socket, void **recvbuf, int *recvsize, int *key, recv_callback_fn callback, |
| | | int sec, int nsec ) { |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | struct timespec timeout = {sec, nsec}; |
| | | return sockt->recvandsend_timeout( recvbuf, recvsize, key, callback, &timeout); |
| | | } |
| | | |
| | | int net_mod_socket_recvandsend_nowait(void *_socket, void **recvbuf, int *recvsize, int *key, recv_callback_fn callback) { |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | return sockt->recvandsend_nowait( recvbuf, recvsize, key, callback); |
| | | } |
| | | |
| | | |
| | | int net_mod_socket_sendandrecv(void *_socket, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, |
| | | net_mod_recv_msg_t ** recv_arr, int *recv_arr_size){ |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | return sockt->sendandrecv(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size); |
| | | } |
| | | |
| | | |
| | | |
| | | |
| | | /** |
| | | * 如果建立连接的节点没有接受到消息等待timeout的时间后返回 |
| | |
| | | return sockt->sendandrecv_nowait(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * 向node_arr 中的所有网络节点发布消息 |
| | |
| | | */ |
| | | int net_mod_socket_recvfrom_nowait(void *_socket, void **buf, int *size, int *key); |
| | | |
| | | |
| | | |
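| | | /** |
| | | * @brief Receive a message and send the data produced by callback back to the sender; waits until this completes |
| | | * |
| | | * @param recvbuf address of the buffer holding the received message; the caller must free it after use |
| | | * @param recvsize length of the received message |
| | | * @param key key of the sender the message was received from |
| | | * @param callback void (*recv_callback_fn)(void **sendbuf, int *sendsize) |
| | | * sendbuf and sendsize are the outputs of the callback: the data to send back and its length. |
| | | * |
| | | * @return 0 on success, any other value is an error code |
| | | */ |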
| | | int net_mod_socket_recvandsend(void *_socket, void **recvbuf, int *recvsize, int *key, recv_callback_fn callback); |
| | | |
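| | | /** |
| | | * @brief Receive a message and send the data produced by callback back to the sender; returns after the given time even if not complete |
| | | * |
| | | * @param recvbuf address of the buffer holding the received message; the caller must free it after use |
| | | * @param recvsize length of the received message |
| | | * @param key key of the sender the message was received from |
| | | * @param callback void (*recv_callback_fn)(void **sendbuf, int *sendsize) |
| | | * sendbuf and sendsize are the outputs of the callback: the data to send back and its length. |
| | | * |
| | | * @param sec seconds |
| | | * @param nsec nanoseconds |
| | | * |
| | | * @return 0 on success, any other value is an error code |
| | | */ |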
| | | int net_mod_socket_recvandsend_timeout(void *_socket, void **recvbuf, int *recvsize, int *key, recv_callback_fn callback, |
| | | int sec, int nsec ) ; |
| | | |
| | | |
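| | | /** |
| | | * @brief Receive a message and send the data produced by callback back to the sender; returns immediately whether or not it succeeded |
| | | * |
| | | * @param recvbuf address of the buffer holding the received message; the caller must free it after use |
| | | * @param recvsize length of the received message |
| | | * @param key key of the sender the message was received from |
| | | * @param callback void (*recv_callback_fn)(void **sendbuf, int *sendsize) |
| | | * sendbuf and sendsize are the outputs of the callback: the data to send back and its length. |
| | | * |
| | | * @return 0 on success, any other value is an error code |
| | | */ |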
| | | int net_mod_socket_recvandsend_nowait(void *_socket, void **recvbuf, int *recvsize, int *key, recv_callback_fn callback) ; |
| | | |
| | | |
| | | |
| | | /** |
| | | * @brief 跨机器发送消息并接受返回的应答消息,直到发送完成才返回 |
| | | * |
| | |
| | | * @return 成功发送的节点的个数 |
| | | * |
| | | */ |
| | | extern int net_mod_socket_sendandrecv(void *_sockt, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, |
| | | int net_mod_socket_sendandrecv(void *_sockt, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, |
| | | net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) ; |
| | | |
| | | |
| | |
| | | |
| | | |
| | | |
| | | /** |
| | | * @brief 向node_arr中的所有网络节点发布消息 |
| | | * |
| | | * @param node_arr 网络节点组, @node_arr_len该数组长度 |
| | | * @param topic 主题, |
| | | * @param topic_size 该主题的长度 |
| | | * @param content 内容, |
| | | * @param content_size 内容长度 |
| | | * |
| | | * @return 成功发布的节点的个数 |
| | | */ |
| | | /** |
| | | * @brief 向node_arr中的所有网络节点发布消息 |
| | | * |
| | | * @param node_arr 网络节点组, @node_arr_len该数组长度 |
| | | * @param topic 主题, |
| | | * @param topic_size 该主题的长度 |
| | | * @param content 内容, |
| | | * @param content_size 内容长度 |
| | | * |
| | | * @return 成功发布的节点的个数 |
| | | */ |
| | | int net_mod_socket_pub(void *_sockt, net_node_t *node_arr, int node_arr_len, char *topic, int topic_size, void *content, int content_size); |
| | | |
| | | |
| | |
| | | logger->debug("ShmModSocket::sendandrecv : sendandrecv to %d failed %s", send_key, bus_strerror(rv)); |
| | | return rv; |
| | | } |
| | | |
| | | |
| | | int ShmModSocket::recvandsend(void **recvbuf, int *recvsize, int *key, recv_callback_fn callback, |
| | | const struct timespec *timeout , int flag ) { |
| | | int rv = shm_recvandsend(shm_socket, recvbuf, recvsize, key, callback, timeout, flag); |
| | | if(rv == 0) { |
| | | logger->debug("ShmModSocket::shm_recvandsend: success. key = %d\n", *key); |
| | | return 0; |
| | | } |
| | | |
| | | logger->debug("ShmModSocket::shm_recvandsend : failed. %s", bus_strerror(rv)); |
| | | return rv; |
| | | } |
| | | |
| | | // // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | | // int ShmModSocket::sendandrecv_unsafe(const void *send_buf, const int send_size, const int send_key, |
| | |
| | | const struct timespec *timeout = NULL, int flag = 0); |
| | | |
| | | |
| | | // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int sendandrecv_unsafe(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size, |
| | | const struct timespec *timeout = NULL, int flag = 0) ; |
| | | int recvandsend(void **recvbuf, int *recvsize, int *key, recv_callback_fn callback, |
| | | const struct timespec *timeout = NULL , int flag = 0); |
| | | |
| | | /** |
| | | * 订阅指定主题 |
| | |
| | | #include <map> |
| | | #include <cassert> |
| | | #include "bus_error.h" |
| | | #include "sole.h" |
| | | |
| | | static Logger *logger = LoggerFactory::getLogger(); |
| | | |
| | |
| | | |
| | | static void _destrory_socket_perthread(void *tmp_socket); |
| | | static void _create_socket_key_perthread(void); |
| | | |
| | | static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *recvpak , const struct timespec *timeout, int flag); |
| | | |
| | | |
| | | static int shm_sendpakto(shm_socket_t *sockt, const shm_packet_t *sendpak, |
| | | const int key, const struct timespec *timeout, const int flag); |
| | | |
| | | // 检查key是否已经被使用, 未被使用则绑定key |
| | | static LockFreeQueue<shm_packet_t> * shm_socket_bind_queue(int key, bool force) { |
| | |
| | | pthread_mutexattr_t mtxAttr; |
| | | |
| | | logger->debug("shm_open_socket\n"); |
| | | shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t)); |
| | | socket->socket_type = socket_type; |
| | | socket->key = 0; |
| | | socket->force_bind = false; |
| | | socket->queue = NULL; |
| | | // shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t)); |
| | | shm_socket_t *sockt = new shm_socket_t; |
| | | sockt->socket_type = socket_type; |
| | | sockt->key = 0; |
| | | sockt->force_bind = false; |
| | | sockt->queue = NULL; |
| | | |
| | | |
| | | s = pthread_mutexattr_init(&mtxAttr); |
| | |
| | | s = pthread_mutexattr_settype(&mtxAttr, PTHREAD_MUTEX_ERRORCHECK); |
| | | if (s != 0) |
| | | err_exit(s, "pthread_mutexattr_settype"); |
| | | s = pthread_mutex_init(&(socket->mutex), &mtxAttr); |
| | | s = pthread_mutex_init(&(sockt->mutex), &mtxAttr); |
| | | if (s != 0) |
| | | err_exit(s, "pthread_mutex_init"); |
| | | |
| | |
| | | if (s != 0) |
| | | err_exit(s, "pthread_mutexattr_destroy"); |
| | | |
| | | return socket; |
| | | return sockt; |
| | | } |
| | | |
| | | int shm_close_socket(shm_socket_t *socket) { |
| | | int shm_close_socket(shm_socket_t *sockt) { |
| | | |
| | | int s; |
| | | |
| | | logger->debug("shm_close_socket\n"); |
| | | if(socket->queue != NULL) { |
| | | delete socket->queue; |
| | | socket->queue = NULL; |
| | | if(sockt->queue != NULL) { |
| | | delete sockt->queue; |
| | | sockt->queue = NULL; |
| | | } |
| | | |
| | | s = pthread_mutex_destroy(&(socket->mutex) ); |
| | | s = pthread_mutex_destroy(&(sockt->mutex) ); |
| | | if(s != 0) { |
| | | err_exit(s, "shm_close_socket"); |
| | | } |
| | | |
| | | free(socket); |
| | | free(sockt); |
| | | return 0; |
| | | } |
| | | |
| | | |
| | | int shm_socket_bind(shm_socket_t *socket, int key) { |
| | | socket->key = key; |
| | | int shm_socket_bind(shm_socket_t *sockt, int key) { |
| | | sockt->key = key; |
| | | return 0; |
| | | } |
| | | |
| | | int shm_socket_force_bind(shm_socket_t *socket, int key) { |
| | | socket->force_bind = true; |
| | | socket->key = key; |
| | | int shm_socket_force_bind(shm_socket_t *sockt, int key) { |
| | | sockt->force_bind = true; |
| | | sockt->key = key; |
| | | return 0; |
| | | } |
| | | |
| | | int shm_socket_get_key(shm_socket_t *sk){ |
| | | return sk->key; |
| | | int shm_socket_get_key(shm_socket_t *sockt){ |
| | | return sockt->key; |
| | | } |
| | | |
| | | |
| | | |
| | | |
| | | // 短连接方式发送 |
| | |
| | | const int key, const struct timespec *timeout, const int flag) { |
| | | |
| | | int rv; |
| | | |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | |
| | | if( sockt->queue != NULL) |
| | | goto LABEL_PUSH; |
| | | |
| | | { |
| | | if ((rv = pthread_mutex_lock(&(sockt->mutex))) != 0) |
| | | err_exit(rv, "shm_sendto : pthread_mutex_lock"); |
| | | shm_packet_t sendpak; |
| | | sendpak.key = sockt->key; |
| | | sendpak.size = size; |
| | | sendpak.buf = mm_malloc(size); |
| | | memcpy(sendpak.buf, buf, size); |
| | | rv = shm_sendpakto(sockt, &sendpak, key, timeout, flag); |
| | | return rv; |
| | | } |
| | | |
| | | if (sockt->queue == NULL) { |
| | | if (sockt->key == 0) { |
| | | sockt->key = hashtable_alloc_key(hashtable); |
| | | } |
| | | sockt->queue = shm_socket_bind_queue( sockt->key, sockt->force_bind); |
| | | if(sockt->queue == NULL ) { |
| | | logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key); |
| | | return EBUS_KEY_INUSED; |
| | | } |
| | | |
| | | |
| | | int shm_sendandrecv(shm_socket_t *sockt, const void *send_buf, |
| | | const int send_size, const int key, void **recv_buf, |
| | | int *recv_size, const struct timespec *timeout, int flags) { |
| | | |
| | | int rv, tryn = 3; |
| | | shm_packet_t sendpak; |
| | | shm_packet_t recvpak; |
| | | std::map<std::string, shm_packet_t>::iterator recvbufIter; |
| | | std::string uuid = sole::uuid4().str(); |
| | | |
| | | sendpak.key = sockt->key; |
| | | sendpak.size = send_size; |
| | | sendpak.buf = mm_malloc(send_size); |
| | | memcpy(sendpak.buf, send_buf, send_size); |
| | | memcpy(sendpak.uuid, uuid.c_str(), uuid.length() + 1); |
| | | // uuid.copy(sendpak.uuid, sizeof sendpak.uuid); |
| | | rv = shm_sendpakto(sockt, &sendpak, key, timeout, flags); |
| | | |
| | | if(rv != 0) { |
| | | return rv; |
| | | } |
| | | |
| | | while(true) { |
| | | tryn--; |
| | | recvbufIter = sockt->recvbuf.find(uuid); |
| | | if(recvbufIter != sockt->recvbuf.end()) { |
| | | // 在缓存里查到了UUID匹配成功的 |
| | | logger->debug("get from recvbuf: %s", uuid.c_str()); |
| | | recvpak = recvbufIter->second; |
| | | sockt->recvbuf.erase(recvbufIter); |
| | | break; |
| | | } |
| | | |
| | | if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) |
| | | err_exit(rv, "shm_sendto : pthread_mutex_unlock"); |
| | | |
| | | rv = shm_recvpakfrom(sockt, &recvpak, timeout, flags); |
| | | |
| | | if (rv != 0) { |
| | | |
| | | if(rv == ETIMEDOUT) |
| | | return EBUS_TIMEOUT; |
| | | |
| | | logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv)); |
| | | return rv; |
| | | } |
| | | |
| | | logger->debug("send uuid:%s, recv uuid: %s", uuid.c_str(), recvpak.uuid); |
| | | if (strncmp(uuid.c_str(), recvpak.uuid, sizeof recvpak.uuid) == 0) { |
| | | // 发送与接受的UUID匹配成功 |
| | | break; |
| | | } else { |
| | | // 答非所问,放到缓存里 |
| | | sockt->recvbuf.insert({recvpak.uuid, recvpak}); |
| | | continue; |
| | | } |
| | | |
| | | if(tryn == 0) { |
| | | // 尝试了tryn次都没有成功 |
| | | return EBUS_RECVFROM_WRONG_END; |
| | | } |
| | | |
| | | } |
| | | |
| | | |
| | | LABEL_PUSH: |
| | | if (key == sockt->key) { |
| | | logger->error( "can not send to your self!"); |
| | | return EBUS_SENDTO_SELF; |
| | | LABLE_SUC: |
| | | if(recv_buf != NULL) { |
| | | void *_buf = malloc(recvpak.size); |
| | | memcpy(_buf, recvpak.buf, recvpak.size); |
| | | *recv_buf = _buf; |
| | | } |
| | | |
| | | if(recv_size != NULL) |
| | | *recv_size = recvpak.size; |
| | | |
| | | LockFreeQueue<shm_packet_t> *remoteQueue; |
| | | if ((remoteQueue = shm_socket_attach_queue(key)) == NULL) { |
| | | bus_errno = EBUS_CLOSED; |
| | | logger->error("sendto key %d failed, %s", key, bus_strerror(bus_errno)); |
| | | return EBUS_CLOSED; |
| | | } |
| | | mm_free(recvpak.buf); |
| | | |
| | | shm_packet_t dest; |
| | | dest.key = sockt->key; |
| | | dest.size = size; |
| | | dest.buf = mm_malloc(size); |
| | | memcpy(dest.buf, buf, size); |
| | | return 0; |
| | | |
| | | rv = remoteQueue->push(dest, timeout, flag); |
| | | } |
| | | |
| | | if (rv == 0) { |
| | | printf("%d sendto %d suc.\n", shm_socket_get_key(sockt), key); |
| | | return 0; |
| | | } else { |
| | | mm_free(dest.buf); |
| | | /** |
| | | * @callback void (*recv_callback_fn)(void **sendbuf, int *sendsize) |
| | | * sendbuf 和 sendsize是callbak_fn回调函数的返回值, 分别表示发送数据,和发送数据的大小。 |
| | | * |
| | | */ |
| | | int shm_recvandsend(shm_socket_t *sockt, void **recvbuf, int *recvsize, int *key, recv_callback_fn callback, |
| | | const struct timespec *timeout, int flag) { |
| | | |
| | | int rv; |
| | | |
| | | void *sendbuf = NULL; |
| | | int sendsize = 0; |
| | | shm_packet_t recvpak; |
| | | |
| | | rv = shm_recvpakfrom(sockt , &recvpak, timeout, flag); |
| | | |
| | | |
| | | |
| | | if (rv != 0) { |
| | | if(rv == ETIMEDOUT) |
| | | return EBUS_TIMEOUT; |
| | | else { |
| | | logger->debug("====%d sendto key %d failed %s", shm_socket_get_key(sockt), key, bus_strerror(rv)); |
| | | logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv)); |
| | | return rv; |
| | | } |
| | | } |
| | | |
| | | if(recvbuf != NULL) { |
| | | void *_buf = malloc(recvpak.size); |
| | | memcpy(_buf, recvpak.buf, recvpak.size); |
| | | *recvbuf = _buf; |
| | | } |
| | | |
| | | if(recvsize != NULL) |
| | | *recvsize = recvpak.size; |
| | | |
| | | if(key != NULL) |
| | | *key = recvpak.key; |
| | | |
| | | mm_free(recvpak.buf); |
| | | |
| | | callback(&sendbuf, &sendsize); |
| | | |
| | | shm_packet_t sendpak; |
| | | sendpak.key = sockt->key; |
| | | sendpak.size = sendsize; |
| | | memcpy(sendpak.uuid, recvpak.uuid, sizeof sendpak.uuid); |
| | | if(sendbuf !=NULL && sendsize > 0) { |
| | | sendpak.buf = mm_malloc(sendsize); |
| | | memcpy(sendpak.buf, sendbuf, sendsize); |
| | | } |
| | | |
| | | rv = shm_sendpakto(sockt, &sendpak, recvpak.key, timeout, flag); |
| | | |
| | | return 0; |
| | | |
| | | } |
| | | |
| | | |
| | | |
| | | // 短连接方式接受 |
| | | int shm_recvfrom(shm_socket_t *sockt, void **buf, int *size, int *key, const struct timespec *timeout, int flag) { |
| | | int rv; |
| | | |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | shm_packet_t recvpak; |
| | | rv = shm_recvpakfrom(sockt , &recvpak, timeout, flag); |
| | | |
| | | if( sockt->queue != NULL) |
| | | goto LABEL_POP; |
| | | if (rv != 0) { |
| | | |
| | | { |
| | | if ((rv = pthread_mutex_lock(&(sockt->mutex))) != 0) |
| | | err_exit(rv, "shm_recvfrom : pthread_mutex_lock"); |
| | | |
| | | |
| | | if (sockt->key == 0) { |
| | | sockt->key = hashtable_alloc_key(hashtable); |
| | | } |
| | | sockt->queue = shm_socket_bind_queue( sockt->key, sockt->force_bind); |
| | | if(sockt->queue == NULL ) { |
| | | logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key); |
| | | return EBUS_KEY_INUSED; |
| | | } |
| | | |
| | | |
| | | if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) |
| | | err_exit(rv, "shm_recvfrom : pthread_mutex_unlock"); |
| | | |
| | | } |
| | | |
| | | LABEL_POP: |
| | | |
| | | shm_packet_t src; |
| | | |
| | | rv = sockt->queue->pop(src, timeout, flag); |
| | | |
| | | if (rv == 0) { |
| | | if(buf != NULL) { |
| | | void *_buf = malloc(src.size); |
| | | memcpy(_buf, src.buf, src.size); |
| | | *buf = _buf; |
| | | } |
| | | |
| | | if(size != NULL) |
| | | *size = src.size; |
| | | |
| | | if(key != NULL) |
| | | *key = src.key; |
| | | |
| | | mm_free(src.buf); |
| | | return 0; |
| | | } else { |
| | | if(rv == ETIMEDOUT) |
| | | return EBUS_TIMEOUT; |
| | | else { |
| | |
| | | return rv; |
| | | } |
| | | |
| | | } |
| | | |
| | | |
| | | if(buf != NULL) { |
| | | void *_buf = malloc(recvpak.size); |
| | | memcpy(_buf, recvpak.buf, recvpak.size); |
| | | *buf = _buf; |
| | | } |
| | | |
| | | if(size != NULL) |
| | | *size = recvpak.size; |
| | | |
| | | if(key != NULL) |
| | | *key = recvpak.key; |
| | | |
| | | mm_free(recvpak.buf); |
| | | return 0; |
| | | } |
| | | |
| | | |
| | | // ================================================================================================= |
| | | |
| | | /* Free thread-specific data buffer */ |
| | | static void _destrory_socket_perthread(void *tmp_socket) |
| | |
| | | |
| | | } |
| | | |
| | | int shm_sendandrecv(shm_socket_t *socket, const void *send_buf, |
| | | const int send_size, const int send_key, void **recv_buf, |
| | | int *recv_size, const struct timespec *timeout, int flags) { |
| | | |
| | | struct timespec tm = {10, 0}; |
| | | return _shm_sendandrecv_thread_local(socket, send_buf, send_size, send_key,recv_buf, recv_size, &tm, flags); |
| | | |
| | | static int shm_sendpakto(shm_socket_t *sockt, const shm_packet_t *sendpak, |
| | | const int key, const struct timespec *timeout, const int flag) { |
| | | |
| | | int rv; |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | |
| | | if( sockt->queue != NULL) |
| | | goto LABEL_PUSH; |
| | | |
| | | { |
| | | if ((rv = pthread_mutex_lock(&(sockt->mutex))) != 0) |
| | | err_exit(rv, "shm_sendto : pthread_mutex_lock"); |
| | | |
| | | if (sockt->queue == NULL) { |
| | | if (sockt->key == 0) { |
| | | sockt->key = hashtable_alloc_key(hashtable); |
| | | } |
| | | sockt->queue = shm_socket_bind_queue( sockt->key, sockt->force_bind); |
| | | if(sockt->queue == NULL ) { |
| | | logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key); |
| | | return EBUS_KEY_INUSED; |
| | | } |
| | | } |
| | | |
| | | if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) |
| | | err_exit(rv, "shm_sendto : pthread_mutex_unlock"); |
| | | |
| | | } |
| | | |
| | | |
| | | LABEL_PUSH: |
| | | if (key == sockt->key) { |
| | | logger->error( "can not send to your self!"); |
| | | return EBUS_SENDTO_SELF; |
| | | } |
| | | |
| | | LockFreeQueue<shm_packet_t> *remoteQueue; |
| | | if ((remoteQueue = shm_socket_attach_queue(key)) == NULL) { |
| | | bus_errno = EBUS_CLOSED; |
| | | logger->error("sendto key %d failed, %s", key, bus_strerror(bus_errno)); |
| | | return EBUS_CLOSED; |
| | | } |
| | | |
| | | |
| | | |
| | | rv = remoteQueue->push(*sendpak, timeout, flag); |
| | | |
| | | return rv; |
| | | } |
| | | |
| | | // 短连接方式接受 |
| | | static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *recvpak , const struct timespec *timeout, int flag) { |
| | | int rv; |
| | | |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | |
| | | if( sockt->queue != NULL) |
| | | goto LABEL_POP; |
| | | |
| | | { |
| | | if ((rv = pthread_mutex_lock(&(sockt->mutex))) != 0) |
| | | err_exit(rv, "shm_recvfrom : pthread_mutex_lock"); |
| | | |
| | | |
| | | if (sockt->key == 0) { |
| | | sockt->key = hashtable_alloc_key(hashtable); |
| | | } |
| | | sockt->queue = shm_socket_bind_queue( sockt->key, sockt->force_bind); |
| | | if(sockt->queue == NULL ) { |
| | | logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key); |
| | | return EBUS_KEY_INUSED; |
| | | } |
| | | |
| | | |
| | | if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) |
| | | err_exit(rv, "shm_recvfrom : pthread_mutex_unlock"); |
| | | |
| | | } |
| | | |
| | | LABEL_POP: |
| | | |
| | | |
| | | |
| | | rv = sockt->queue->pop(*recvpak, timeout, flag); |
| | | |
| | | return rv; |
| | | } |
| | | // int shm_sendandrecv(shm_socket_t *sockt, const void *send_buf, |
| | | // const int send_size, const int send_key, void **recv_buf, |
| | | // int *recv_size, const struct timespec *timeout, int flags) { |
| | | |
| | | // struct timespec tm = {10, 0}; |
| | | // return _shm_sendandrecv_thread_local(sockt, send_buf, send_size, send_key,recv_buf, recv_size, &tm, flags); |
| | | // } |
| | |
| | | #include "usg_typedef.h" |
| | | #include "shm_queue.h" |
| | | #include "lock_free_queue.h" |
| | | #include <functional> |
| | | |
| | | enum shm_socket_type_t |
| | | { |
| | |
| | | |
| | | }; |
| | | |
| | | |
| | | |
/**
 * One message as it travels through a LockFreeQueue.
 */
struct shm_packet_t {
  /** Key of the sending socket (senders store their own sockt->key here). */
  int key;

  /** Payload length in bytes. */
  size_t size;

  /** Payload; senders allocate it with mm_malloc(), receivers mm_free() it. */
  void *buf;

  /** Request id; a reply carries the uuid of the request it answers. */
  char uuid[64];
};
| | | |
| | |
| | | |
| | | LockFreeQueue<shm_packet_t> *queue; //self queue |
| | | LockFreeQueue<shm_packet_t> *remoteQueue; // peer queue |
| | | std::map<std::string, shm_packet_t> recvbuf; |
| | | |
| | | |
| | | } shm_socket_t; |
| | | |
| | | |
| | | // typedef void (*recv_callback_fn)(void **sendbuf, int *sendsize); |
| | | typedef std::function<void(void **sendbuf, int *sendsize)> recv_callback_fn; |
| | | |
| | | size_t shm_socket_remove_keys(int keys[], size_t length); |
| | | |
| | |
| | | int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *key, const struct timespec * timeout = NULL, int flags=0); |
| | | |
| | | int shm_sendandrecv(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, |
| | | const struct timespec * timeout = NULL, int flags=0); |
| | | const struct timespec * timeout = NULL, int flags = 0); |
| | | |
| | | /** |
| | | * 功能同shm_sendandrecv, 但是不是线程安全的 |
| | | * @callback void (*recv_callback_fn)(void **sendbuf, int *sendsize) |
| | | * sendbuf 和 sendsize是callbak_fn回调函数的返回值, 分别表示发送数据,和发送数据的大小。 |
| | | * |
| | | */ |
| | | int shm_sendandrecv_unsafe(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, |
| | | const struct timespec * timeout = NULL, int flags=0); |
| | | int shm_recvandsend(shm_socket_t *sockt, void **recvbuf, int *recvsize, int *key, recv_callback_fn callback, |
| | | const struct timespec *timeout = NULL, int flag = 0); |
| | | |
| | | |
| | | |
| | | |
| | |
| | | std::cout << "uuid v4 base62 : " << u4.base62() << std::endl; |
| | | std::cout << "uuid v4 pretty : " << u4.pretty() << std::endl << std::endl; |
| | | |
| | | std::cout << "uuid v4 length : " << u4.str().length() << std::endl; |
| | | |
| | | std::string test("123"); |
| | | std::cout << "test length : " << test.length() << std::endl; |
| | | u1 = sole::rebuild("F81D4FAE-7DEC-11D0-A765-00A0C91E6BF6"); |
| | | u4 = sole::rebuild("GITheR4tLlg-BagIW20DGja"); |
| | | |
| | |
| | | |
| | | |
| | | |
| | | void start_reply(int key) { |
| | | void start_reply(int mkey) { |
| | | printf("start reply\n"); |
| | | void *ser = net_mod_socket_open(); |
| | | net_mod_socket_bind(ser, key); |
| | | int size; |
| | | net_mod_socket_bind(ser, mkey); |
| | | int recvsize; |
| | | void *recvbuf; |
| | | char sendbuf[512]; |
| | | int rv; |
| | | int remote_port; |
| | | while ( (rv = net_mod_socket_recvfrom(ser, &recvbuf, &size, &remote_port) ) == 0) { |
| | | // printf( "server: RECEIVED REQUEST FROM PORT %d NAME %s\n", remote_port, recvbuf); |
| | | int key; |
| | | while(true) { |
| | | rv = net_mod_socket_recvandsend_timeout(ser, &recvbuf, &recvsize, &key, [&](void ** buf, int *size){ |
| | | printf( "server: RECEIVED REQUEST FROM %d : %s\n", key, recvbuf); |
| | | sprintf(sendbuf, "%d RECEIVED %s", net_mod_socket_get_key(ser), (char *)recvbuf); |
| | | net_mod_socket_sendto(ser, sendbuf, strlen(sendbuf) + 1, remote_port); |
| | | // buf 和 size是返回值 |
| | | *buf = sendbuf; |
| | | *size = strlen(sendbuf) + 1; |
| | | free(recvbuf); |
| | | return; |
| | | }, 0, 2000000 ); |
| | | } |
| | | // while ( (rv = net_mod_socket_recvfrom(ser, &recvbuf, &size, &key) ) == 0) { |
| | | // // printf( "server: RECEIVED REQUEST FROM %d NAME %s\n", key, recvbuf); |
| | | // sprintf(sendbuf, "%d RECEIVED %s", net_mod_socket_get_key(ser), (char *)recvbuf); |
| | | // net_mod_socket_sendto(ser, sendbuf, strlen(sendbuf) + 1, key); |
| | | // free(recvbuf); |
| | | // } |
| | | } |
| | | |
| | | // 交互式客户端 |
| | |
| | | if (fgets(content, MAXLINE, stdin) != NULL) { |
| | | // 收到消息的节点即使没有对应的信息, 也要回复一个表示无的消息,否则会一直等待 |
| | | // n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, content, strlen(content), &recv_arr, &recv_arr_size); |
| | | n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, content, strlen(content), &recv_arr, &recv_arr_size, 1000); |
| | | n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, content, strlen(content), &recv_arr, &recv_arr_size, 1); |
| | | printf(" %d nodes reply\n", n); |
| | | for(i=0; i<recv_arr_size; i++) { |
| | | printf("reply from (host:%s, port: %d, key:%d) >> %s\n", |
| | |
| | | sprintf(sendbuf, hello_format, net_mod_socket_get_key(client), l); |
| | | // fprintf(fp, "requst:%s\n", sendbuf); |
| | | // n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size); |
| | | n = net_mod_socket_sendandrecv_timeout(client, targ->node, 1, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 1000); |
| | | n = net_mod_socket_sendandrecv_timeout(client, targ->node, 1, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 1); |
| | | printf("%d: send %d nodes\n", l, n); |
| | | for(j=0; j < recv_arr_size; j++) { |
| | | |