| | |
| | | static pthread_once_t _once_ = PTHREAD_ONCE_INIT; |
| | | static pthread_key_t _perthread_socket_key_; |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | static void _destrory_socket_perthread(void *tmp_socket); |
| | | static void _create_socket_key_perthread(void); |
| | | |
| | | // 检查key是否已经被使用,是返回0, 否返回1 |
| | | // static int _shm_socket_check_key(shm_socket_t *socket) { |
| | | // void *tmp_ptr = mm_get_by_key(socket->key); |
| | | // if (tmp_ptr!= NULL && tmp_ptr != (void *)1 && !socket->force_bind ) { |
| | | // bus_errno = EBUS_KEY_INUSED; |
| | | // logger->error("%s. key = %d ", bus_strerror(EBUS_KEY_INUSED), socket->key); |
| | | // return 0; |
| | | // } |
| | | // return 1; |
| | | // } |
| | | |
| | | // 检查key是否已经被使用, 未被使用则绑定key |
| | | static LockFreeQueue<shm_msg_t> * shm_socket_bind_queue(int key, bool force) { |
| | |
| | | return 0; |
| | | } |
| | | |
| | | // int shm_close_socket(shm_socket_t *socket) { |
| | | |
| | | // // _destrory_socket_perthread((shm_socket_t *)pthread_getspecific(_perthread_socket_key_)); |
| | | |
| | | // return shm_close_socket(socket);; |
| | | // } |
| | | |
| | | int shm_socket_bind(shm_socket_t *socket, int key) { |
| | | socket->key = key; |
| | |
| | | int shm_sendto(shm_socket_t *sockt, const void *buf, const int size, |
| | | const int key, const struct timespec *timeout, const int flag) { |
| | | |
| | | int s; |
| | | int rv; |
| | | |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | |
| | | |
| | | if ((s = pthread_mutex_lock(&(sockt->mutex))) != 0) |
| | | err_exit(s, "shm_sendto : pthread_mutex_lock"); |
| | | if ((rv = pthread_mutex_lock(&(sockt->mutex))) != 0) |
| | | err_exit(rv, "shm_sendto : pthread_mutex_lock"); |
| | | |
| | | if (sockt->queue == NULL) { |
| | | if (sockt->key == 0) { |
| | |
| | | } |
| | | } |
| | | |
| | | if ((s = pthread_mutex_unlock(&(sockt->mutex))) != 0) |
| | | err_exit(s, "shm_sendto : pthread_mutex_unlock"); |
| | | if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) |
| | | err_exit(rv, "shm_sendto : pthread_mutex_unlock"); |
| | | |
| | | // There is some case where a sockt need to send to himeself, for example when bus server need to stop, he need to send himself |
| | | // a top message. |
| | |
| | | |
| | | // 短连接方式接受 |
| | | int shm_recvfrom(shm_socket_t *sockt, void **buf, int *size, int *key, const struct timespec *timeout, int flag) { |
| | | int s; |
| | | int rv; |
| | | |
| | | |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | |
| | | if ((s = pthread_mutex_lock(&(sockt->mutex))) != 0) |
| | | err_exit(s, "shm_recvfrom : pthread_mutex_lock"); |
| | | if ((rv = pthread_mutex_lock(&(sockt->mutex))) != 0) |
| | | err_exit(rv, "shm_recvfrom : pthread_mutex_lock"); |
| | | |
| | | if (sockt->queue == NULL) { |
| | | if (sockt->key == 0) { |
| | |
| | | } |
| | | } |
| | | |
| | | if ((s = pthread_mutex_unlock(&(sockt->mutex))) != 0) |
| | | err_exit(s, "shm_recvfrom : pthread_mutex_unlock"); |
| | | if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) |
| | | err_exit(rv, "shm_recvfrom : pthread_mutex_unlock"); |
| | | |
| | | shm_msg_t src; |
| | | |
| | |
| | | // 用thread local 保证每个线程用一个独占的socket接受对方返回的信息 |
| | | shm_socket_t *tmp_socket; |
| | | |
| | | if (sockt->socket_type != SHM_SOCKET_DGRAM) { |
| | | logger->error( "shm_socket.shm_sendandrecv: Can't invoke shm_sendandrecv method in a %d type socket " |
| | | "which is not a SHM_SOCKET_DGRAM socket ", |
| | | sockt->socket_type); |
| | | exit(1); |
| | | } |
| | | |
| | | /* If first call from this thread, allocate buffer for thread, and save its location */ |
| | | // logger->debug("%d create tmp socket\n", pthread_self() ); |
| | | tmp_socket = shm_open_socket(SHM_SOCKET_DGRAM); |
| | |
| | | |
| | | return _shm_sendandrecv_thread_local(socket, send_buf, send_size, send_key,recv_buf, recv_size, timeout, flags); |
| | | } |
| | | |
| | | |
| | | // ============================================================================================================ |
| | | |
| | | |
| | | |
| | | |