| | |
| | | |
| | | |
| | | |
| | | static void print_msg(char *head, shm_msg_t &msg) { |
| | | static void print_msg(char *head, shm_packet_t &msg) { |
| | | // err_msg(0, "%s: key=%d, type=%d\n", head, msg.key, msg.type); -- debug print of head, msg.key and msg.type; it is commented out, so this function currently does nothing. |
| | | } |
| | | |
| | |
| | | static void _create_socket_key_perthread(void); |
| | | |
| | | // Check whether the key is already in use; if it is not, bind the key |
| | | static LockFreeQueue<shm_msg_t> * shm_socket_bind_queue(int key, bool force) { |
| | | static LockFreeQueue<shm_packet_t> * shm_socket_bind_queue(int key, bool force) { |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | LockFreeQueue<shm_msg_t> *queue; |
| | | LockFreeQueue<shm_packet_t> *queue; |
| | | hashtable_lock(hashtable); |
| | | void *tmp_ptr = hashtable_get(hashtable, key); |
| | | |
| | | |
| | | if (tmp_ptr == NULL || tmp_ptr == (void *)1 ) { |
| | | queue = new LockFreeQueue<shm_msg_t>(16); |
| | | queue = new LockFreeQueue<shm_packet_t>(16); |
| | | hashtable_put(hashtable, key, (void *)queue); |
| | | hashtable_unlock(hashtable); |
| | | return queue; |
| | | } else if(force) { |
| | | hashtable_unlock(hashtable); |
| | | return (LockFreeQueue<shm_msg_t> *) queue; |
| | | return (LockFreeQueue<shm_packet_t> *) queue; |
| | | } |
| | | |
| | | hashtable_unlock(hashtable); |
| | |
| | | /** |
| | | * Attach to the queue bound to the key; this does not create the queue. |
| | | */ |
| | | static LockFreeQueue<shm_msg_t> * shm_socket_attach_queue(int key) { |
| | | LockFreeQueue<shm_msg_t> * queue; |
| | | static LockFreeQueue<shm_packet_t> * shm_socket_attach_queue(int key) { |
| | | LockFreeQueue<shm_packet_t> * queue; |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | void *tmp_ptr = hashtable_get(hashtable, key); |
| | | if (tmp_ptr == NULL || tmp_ptr == (void *)1) { |
| | |
| | | return NULL; |
| | | } |
| | | |
| | | queue = ( LockFreeQueue<shm_msg_t> *)tmp_ptr; |
| | | queue = ( LockFreeQueue<shm_packet_t> *)tmp_ptr; |
| | | // hashtable_unlock(hashtable); |
| | | return queue; |
| | | } |
| | |
| | | |
| | | size_t shm_socket_remove_keys(int keys[], size_t length) { |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | LockFreeQueue<shm_msg_t> *mqueue; |
| | | LockFreeQueue<shm_packet_t> *mqueue; |
| | | size_t count = 0; |
| | | for(int i = 0; i< length; i++) { |
| | | // Destroy the queue that lives in shared memory |
| | | mqueue = (LockFreeQueue<shm_msg_t> *)hashtable_get(hashtable, keys[i]); |
| | | mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, keys[i]); |
| | | delete mqueue; |
| | | hashtable_remove(hashtable, keys[i]); |
| | | count++; |
| | |
| | | return EBUS_SENDTO_SELF; |
| | | } |
| | | |
| | | LockFreeQueue<shm_msg_t> *remoteQueue; |
| | | LockFreeQueue<shm_packet_t> *remoteQueue; |
| | | if ((remoteQueue = shm_socket_attach_queue(key)) == NULL) { |
| | | bus_errno = EBUS_CLOSED; |
| | | logger->error("sendto key %d failed, %s", key, bus_strerror(bus_errno)); |
| | | return EBUS_CLOSED; |
| | | } |
| | | |
| | | shm_msg_t dest; |
| | | shm_packet_t dest; |
| | | dest.type = SHM_COMMON_MSG; |
| | | dest.key = sockt->key; |
| | | dest.size = size; |
| | |
| | | if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) |
| | | err_exit(rv, "shm_recvfrom : pthread_mutex_unlock"); |
| | | |
| | | shm_msg_t src; |
| | | shm_packet_t src; |
| | | |
| | | rv = sockt->queue->pop(src, timeout, flag); |
| | | |