| | |
| | | |
| | | |
| | | |
| | | static void print_msg(char *head, shm_msg_t &msg) { |
| | | static void print_msg(char *head, shm_packet_t &msg) { |
| | | // Debug hook that is currently a no-op: its only statement, a trace of head, msg.key and msg.type, is disabled -> err_msg(0, "%s: key=%d, type=%d\n", head, msg.key, msg.type); |
| | | } |
| | | |
| | |
| | | static void _create_socket_key_perthread(void); |
| | | |
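| | | // Check whether key is already in use; if it is unused, create a queue and bind it to key. NOTE(review): the `else if(force)` branch returns `queue` without ever assigning it (tmp_ptr looks like the intended value) - confirm. |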
| | | static LockFreeQueue<shm_msg_t> * shm_socket_bind_queue(int key, bool force) { |
| | | static LockFreeQueue<shm_packet_t> * shm_socket_bind_queue(int key, bool force) { |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | LockFreeQueue<shm_msg_t> *queue; |
| | | LockFreeQueue<shm_packet_t> *queue; |
| | | hashtable_lock(hashtable); |
| | | void *tmp_ptr = hashtable_get(hashtable, key); |
| | | |
| | | |
| | | if (tmp_ptr == NULL || tmp_ptr == (void *)1 ) { |
| | | queue = new LockFreeQueue<shm_msg_t>(16); |
| | | queue = new LockFreeQueue<shm_packet_t>(16); |
| | | hashtable_put(hashtable, key, (void *)queue); |
| | | hashtable_unlock(hashtable); |
| | | return queue; |
| | | } else if(force) { |
| | | hashtable_unlock(hashtable); |
| | | return (LockFreeQueue<shm_msg_t> *) queue; |
| | | return (LockFreeQueue<shm_packet_t> *) queue; |
| | | } |
| | | |
| | | hashtable_unlock(hashtable); |
| | |
| | | /** |
| | | * Attach to the queue already bound to `key`; this never creates a queue. |
| | | * |
| | | * Looks `key` up with hashtable_get() on the table returned by mm_get_hashtable(). |
| | | * |
| | | * @param key key whose queue is looked up |
| | | * @return the stored pointer cast to the queue type, or NULL when the lookup |
| | | * yields NULL or (void *)1 (presumably a "reserved, no queue yet" marker - TODO confirm) |
| | | */ |
| | | static LockFreeQueue<shm_msg_t> * shm_socket_attach_queue(int key) { |
| | | LockFreeQueue<shm_msg_t> * queue; |
| | | static LockFreeQueue<shm_packet_t> * shm_socket_attach_queue(int key) { |
| | | LockFreeQueue<shm_packet_t> * queue; |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | void *tmp_ptr = hashtable_get(hashtable, key); |
| | | if (tmp_ptr == NULL || tmp_ptr == (void *)1) { |
| | |
| | | return NULL; |
| | | } |
| | | |
| | | queue = ( LockFreeQueue<shm_msg_t> *)tmp_ptr; |
| | | queue = ( LockFreeQueue<shm_packet_t> *)tmp_ptr; |
| | | // hashtable_unlock(hashtable);  -- disabled; no matching hashtable_lock() call is visible in this function |
| | | return queue; |
| | | } |
| | |
| | | |
| | | size_t shm_socket_remove_keys(int keys[], size_t length) { |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | LockFreeQueue<shm_msg_t> *mqueue; |
| | | LockFreeQueue<shm_packet_t> *mqueue; |
| | | size_t count = 0; |
| | | for(int i = 0; i< length; i++) { |
| | | // 销毁共享内存的queue |
| | | mqueue = (LockFreeQueue<shm_msg_t> *)hashtable_get(hashtable, keys[i]); |
| | | mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, keys[i]); |
| | | delete mqueue; |
| | | hashtable_remove(hashtable, keys[i]); |
| | | count++; |
| | |
| | | return EBUS_SENDTO_SELF; |
| | | } |
| | | |
| | | LockFreeQueue<shm_msg_t> *remoteQueue; |
| | | LockFreeQueue<shm_packet_t> *remoteQueue; |
| | | if ((remoteQueue = shm_socket_attach_queue(key)) == NULL) { |
| | | bus_errno = EBUS_CLOSED; |
| | | logger->error("sendto key %d failed, %s", key, bus_strerror(bus_errno)); |
| | | return EBUS_CLOSED; |
| | | } |
| | | |
| | | shm_msg_t dest; |
| | | dest.type = SHM_COMMON_MSG; |
| | | shm_packet_t dest; |
| | | dest.key = sockt->key; |
| | | dest.size = size; |
| | | dest.buf = mm_malloc(size); |
| | |
| | | if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) |
| | | err_exit(rv, "shm_recvfrom : pthread_mutex_unlock"); |
| | | |
| | | shm_msg_t src; |
| | | shm_packet_t src; |
| | | |
| | | rv = sockt->queue->pop(src, timeout, flag); |
| | | |
| | |
| | | //s = pthread_key_create(&_perthread_socket_key_, NULL); |
| | | if (s != 0) { |
| | | logger->error(s, "pthread_key_create"); |
| | | abort(); /* dump core and terminate */ |
| | | exit(1); |
| | | } |
| | | } |
| | |
| | | int *recv_size, const struct timespec *timeout, int flags) { |
| | | int recv_key; |
| | | int rv; |
| | | int tryn = 0; |
| | | |
| | | // 用thread local 保证每个线程用一个独占的socket接受对方返回的信息 |
| | | shm_socket_t *tmp_socket; |
| | | |
| | | |
| | | /* If first call from this thread, allocate buffer for thread, and save its location */ |
| | | // logger->debug("%d create tmp socket\n", pthread_self() ); |
| | | rv = pthread_once(&_once_, _create_socket_key_perthread); |
| | | if (rv != 0) { |
| | | logger->error(rv, "shm_sendandrecv pthread_once"); |
| | |
| | | } |
| | | |
| | | if ((rv = shm_sendto(tmp_socket, send_buf, send_size, send_key, timeout, flags)) == 0) { |
| | | rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags); |
| | | if(rv != 0) { |
| | | logger->error("_shm_sendandrecv_thread_local : %s\n", bus_strerror(rv)); |
| | | } |
| | | else if(rv == 0 ) { |
| | | logger->debug("======%d use tmp_socket %d, send to %d, receive from %d\n", shm_socket_get_key(sockt), shm_socket_get_key(tmp_socket), send_key, recv_key); |
| | | |
| | | if(recv_key == shm_socket_get_key(sockt)) { |
| | | logger->debug("=====收到了自己发给自己的消息\n"); |
| | | |
| | | while(tryn < 3) { |
| | | tryn++; |
| | | rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags); |
| | | if(rv != 0) { |
| | | logger->error("_shm_sendandrecv_thread_local : %s\n", bus_strerror(rv)); |
| | | return rv; |
| | | } |
| | | assert( send_key == recv_key); |
| | | |
| | | // 超时导致接发送对象,与返回对象不对应的情况 |
| | | if(send_key != recv_key) { |
| | | logger->error( "_shm_sendandrecv_alloc_new: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key); |
| | | exit(1); |
| | | logger->debug("======%d use tmp_socket %d, send to %d, receive from %d\n", shm_socket_get_key(sockt), shm_socket_get_key(tmp_socket), send_key, recv_key); |
| | | // logger->error( "_shm_sendandrecv_alloc_new: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key); |
| | | // exit(1); |
| | | continue; |
| | | // return EBUS_RECVFROM_WRONG_END; |
| | | } |
| | | |
| | | return 0; |
| | | } |
| | | return rv; |
| | | } else { |
| | | return rv; |
| | | } |
| | | |
| | | return EBUS_RECVFROM_WRONG_END; |
| | | } |
| | | |
| | | return rv; |
| | | } |
| | | |
| | | int _shm_sendandrecv_alloc_new(shm_socket_t *sockt, const void *send_buf, |
| | |
| | | int recv_key; |
| | | int rv; |
| | | |
| | | // 用thread local 保证每个线程用一个独占的socket接受对方返回的信息 |
| | | int tryn = 0; |
| | | shm_socket_t *tmp_socket; |
| | | |
| | | /* If first call from this thread, allocate buffer for thread, and save its location */ |
| | | // logger->debug("%d create tmp socket\n", pthread_self() ); |
| | | |
| | | tmp_socket = shm_open_socket(SHM_SOCKET_DGRAM); |
| | | |
| | | if ((rv = shm_sendto(tmp_socket, send_buf, send_size, send_key, timeout, flags)) == 0) { |
| | | rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags); |
| | | |
| | | if(rv != 0) { |
| | | printf("_shm_sendandrecv_alloc_new : %s\n", bus_strerror(rv)); |
| | | } |
| | | else if(rv == 0 ) { |
| | | printf("======%d use tmp_socket %d, send to %d, receive from %d\n", shm_socket_get_key(sockt), shm_socket_get_key(tmp_socket), send_key, recv_key); |
| | | |
| | | if(recv_key == shm_socket_get_key(sockt)) { |
| | | printf("=====收到了自己发给自己的消息\n"); |
| | | } |
| | | assert( send_key == recv_key); |
| | | if(send_key != recv_key) { |
| | | err_exit(0, "_shm_sendandrecv_alloc_new: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key); |
| | | while(tryn < 3) { |
| | | tryn++; |
| | | rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags); |
| | | if(rv != 0) { |
| | | logger->error("_shm_sendandrecv_thread_local : %s\n", bus_strerror(rv)); |
| | | return rv; |
| | | } |
| | | |
| | | // 超时导致接发送对象,与返回对象不对应的情况 |
| | | if(send_key != recv_key) { |
| | | // logger->debug("======%d use tmp_socket %d, send to %d, receive from %d\n", shm_socket_get_key(sockt), shm_socket_get_key(tmp_socket), send_key, recv_key); |
| | | // logger->error( "_shm_sendandrecv_alloc_new: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key); |
| | | |
| | | continue; |
| | | } |
| | | return 0; |
| | | } |
| | | |
| | | return EBUS_RECVFROM_WRONG_END; |
| | | } |
| | | |
| | | shm_close_socket(tmp_socket); |