| | |
| | | static LockFreeQueue<shm_msg_t> * shm_socket_attach_queue(int key) { |
| | | LockFreeQueue<shm_msg_t> * queue; |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | // hashtable_lock(hashtable); |
| | | void *tmp_ptr = hashtable_get(hashtable, key); |
| | | if (tmp_ptr == NULL || tmp_ptr == (void *)1) { |
| | | //logger->error("shm_socket._remote_queue_attach:connet at key %d failed!", key); |
| | |
| | | int s; |
| | | int rv; |
| | | |
| | | if (sockt->socket_type != SHM_SOCKET_DGRAM) { |
| | | logger->error( "shm_socket.shm_sendto: Can't invoke shm_sendto method in a %d type socket which is " |
| | | "not a SHM_SOCKET_DGRAM socket ", |
| | | sockt->socket_type); |
| | | exit(0); |
| | | } |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | |
| | | |
| | |
| | | |
| | | |
| | | // Receive in short-connection mode |
| | | int shm_recvfrom(shm_socket_t *sokt, void **buf, int *size, int *key, const struct timespec *timeout, int flag) { |
| | | int shm_recvfrom(shm_socket_t *sockt, void **buf, int *size, int *key, const struct timespec *timeout, int flag) { |
| | | int s; |
| | | int rv; |
| | | |
| | | if (sokt->socket_type != SHM_SOCKET_DGRAM) { |
| | | logger->error("shm_socket.shm_recvfrom: Can't invoke shm_recvfrom method in a %d type socket which " |
| | | "is not a SHM_SOCKET_DGRAM socket ", |
| | | sokt->socket_type); |
| | | exit(1); |
| | | } |
| | | |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | |
| | | if ((s = pthread_mutex_lock(&(sokt->mutex))) != 0) |
| | | if ((s = pthread_mutex_lock(&(sockt->mutex))) != 0) |
| | | err_exit(s, "shm_recvfrom : pthread_mutex_lock"); |
| | | |
| | | if (sokt->queue == NULL) { |
| | | if (sokt->key == 0) { |
| | | sokt->key = hashtable_alloc_key(hashtable); |
| | | if (sockt->queue == NULL) { |
| | | if (sockt->key == 0) { |
| | | sockt->key = hashtable_alloc_key(hashtable); |
| | | } |
| | | sokt->queue = shm_socket_bind_queue( sokt->key, sokt->force_bind); |
| | | if(sokt->queue == NULL ) { |
| | | logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sokt->key); |
| | | sockt->queue = shm_socket_bind_queue( sockt->key, sockt->force_bind); |
| | | if(sockt->queue == NULL ) { |
| | | logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key); |
| | | return EBUS_KEY_INUSED; |
| | | } |
| | | } |
| | | |
| | | if ((s = pthread_mutex_unlock(&(sokt->mutex))) != 0) |
| | | if ((s = pthread_mutex_unlock(&(sockt->mutex))) != 0) |
| | | err_exit(s, "shm_recvfrom : pthread_mutex_unlock"); |
| | | |
| | | shm_msg_t src; |
| | | |
| | | rv = sokt->queue->pop(src, timeout, flag); |
| | | rv = sockt->queue->pop(src, timeout, flag); |
| | | |
| | | if (rv == 0) { |
| | | if(buf != NULL) { |
| | |
| | | |
| | | |
| | | // use thread local |
| | | int _shm_sendandrecv_thread_local(shm_socket_t *socket, const void *send_buf, |
| | | int _shm_sendandrecv_thread_local(shm_socket_t *sockt, const void *send_buf, |
| | | const int send_size, const int send_key, void **recv_buf, |
| | | int *recv_size, const struct timespec *timeout, int flags) { |
| | | int recv_key; |
| | |
| | | |
| | | // Use thread-local storage so that each thread has its own exclusive socket for receiving the peer's reply |
| | | shm_socket_t *tmp_socket; |
| | | |
| | | if (socket->socket_type != SHM_SOCKET_DGRAM) { |
| | | logger->error( "shm_socket.shm_sendandrecv: Can't invoke shm_sendandrecv method in a %d type socket " |
| | | "which is not a SHM_SOCKET_DGRAM socket ", |
| | | socket->socket_type); |
| | | exit(1); |
| | | } |
| | | |
| | | |
| | | rv = pthread_once(&_once_, _create_socket_key_perthread); |
| | |
| | | if ((rv = shm_sendto(tmp_socket, send_buf, send_size, send_key, timeout, flags)) == 0) { |
| | | rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags); |
| | | if(rv != 0) { |
| | | printf("_shm_sendandrecv_thread_local : %s\n", bus_strerror(rv)); |
| | | logger->error("_shm_sendandrecv_thread_local : %s\n", bus_strerror(rv)); |
| | | } |
| | | else if(rv == 0 ) { |
| | | logger->debug("======%d use tmp_socket %d, send to %d, receive from %d\n", shm_socket_get_key(sockt), shm_socket_get_key(tmp_socket), send_key, recv_key); |
| | | |
| | | if(recv_key == shm_socket_get_key(sockt)) { |
| | | logger->debug("=====收到了自己发给自己的消息\n"); |
| | | } |
| | | assert( send_key == recv_key); |
| | | if(send_key != recv_key) { |
| | | err_exit(0, "_shm_sendandrecv_thread_local: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key); |
| | | logger->error( "_shm_sendandrecv_alloc_new: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key); |
| | | exit(1); |
| | | } |
| | | |
| | | } |
| | | return rv; |
| | | } else { |
| | |
| | | const int send_size, const int send_key, void **recv_buf, |
| | | int *recv_size, const struct timespec *timeout, int flags) { |
| | | |
| | | return _shm_sendandrecv_alloc_new(socket, send_buf, send_size, send_key,recv_buf, recv_size, timeout, flags); |
| | | return _shm_sendandrecv_thread_local(socket, send_buf, send_size, send_key,recv_buf, recv_size, timeout, flags); |
| | | } |
| | | |
| | | |