| | |
| | | } |
| | | |
| | | static pthread_once_t _once_ = PTHREAD_ONCE_INIT; |
| | | static pthread_key_t _perthread_socket_key_; |
| | | static pthread_key_t _localthread_socket_key_; |
| | | |
| | | static void _destrory_socket_perthread(void *tmp_socket); |
| | | static void _create_socket_key_perthread(void); |
| | | static void _destrory_threadlocal_socket_(void *tmp_socket); |
| | | static void _create_threadlocal_socket_key_(void); |
| | | |
| | | static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *recvpak , const struct timespec *timeout, int flag); |
| | | |
| | | |
| | | static int shm_sendpakto(shm_socket_t *sockt, const shm_packet_t *sendpak, |
| | | static int shm_sendpakto(shm_socket_t *sockt, shm_packet_t *sendpak, |
| | | const int key, const struct timespec *timeout, const int flag); |
| | | |
| | | |
| | |
| | | return queue; |
| | | } else if(force) { |
| | | hashtable_unlock(hashtable); |
| | | return (LockFreeQueue<shm_packet_t> *) queue; |
| | | return (LockFreeQueue<shm_packet_t> *) tmp_ptr; |
| | | } |
| | | |
| | | hashtable_unlock(hashtable); |
| | |
| | | |
| | | int rv; |
| | | logger->debug("shm_socket_close\n"); |
| | | // hashtable_remove(hashtable, mkey); |
| | | // if(sockt->queue != NULL) { |
| | | // delete sockt->queue; |
| | | // sockt->queue = NULL; |
| | | // } |
| | | |
| | | // hashtable_remove(hashtable, mkey); |
| | | |
| | | |
| | | if(sockt->key != 0) { |
| | | auto it = shmQueueStMap->find(sockt->key); |
| | | if(it != shmQueueStMap->end()) { |
| | |
| | | |
| | | |
| | | /** |
| | |  * Close a socket and release the calling thread's cached thread-local socket. |
| | |  * |
| | |  * If the calling thread has a socket stored under _localthread_socket_key_, |
| | |  * it is handed to _destrory_threadlocal_socket_(), which closes it and clears |
| | |  * the thread-specific slot. `sockt` itself is then closed. |
| | |  * |
| | |  * @param sockt  socket to close. |
| | |  * @return whatever _shm_socket_close_() returns for `sockt`. |
| | |  */ |
| | | int shm_socket_close(shm_socket_t *sockt) { |
| | | |
| | |     shm_socket_t *threadlocal_socket = NULL; |
| | |     int rv; |
| | | |
| | |     /* The key is created lazily via pthread_once. Passing a key that was never |
| | |        obtained from pthread_key_create() to pthread_getspecific() is undefined |
| | |        behaviour, so make sure the one-time initialiser has run before reading |
| | |        the slot, even if this thread or process never sent anything. */ |
| | |     rv = pthread_once(&_once_, _create_threadlocal_socket_key_); |
| | |     if (rv != 0) { |
| | |         /* Without a valid key there is nothing thread-local to clean up; |
| | |            still close the socket the caller passed in. */ |
| | |         logger->error(rv, "shm_socket_close pthread_once"); |
| | |     } else { |
| | |         threadlocal_socket = (shm_socket_t *)pthread_getspecific(_localthread_socket_key_); |
| | |     } |
| | | |
| | |     if(threadlocal_socket != NULL) { |
| | |         _destrory_threadlocal_socket_(threadlocal_socket); |
| | |     } |
| | |     return _shm_socket_close_(sockt); |
| | | } |
| | | |
| | |
| | | |
| | | if (rv != 0) { |
| | | |
| | | if(rv == ETIMEDOUT) |
| | | return EBUS_TIMEOUT; |
| | | else { |
| | | logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv)); |
| | | return rv; |
| | | } |
| | | logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv)); |
| | | return rv; |
| | | |
| | | } |
| | | |
| | |
| | | if(key != NULL) |
| | | *key = recvpak.key; |
| | | |
| | | if(recvpak.key == 0) { |
| | | err_exit(0, "key = %d, pid= %d, recvpak.key == 0", shm_socket_get_key(sockt), getpid()); |
| | | } |
| | | mm_free(recvpak.buf); |
| | | return 0; |
| | | } |
| | |
| | | // ================================================================================================= |
| | | |
| | | /* Free thread-specific data buffer */ |
| | | static void _destrory_socket_perthread(void *tmp_socket) |
| | | static void _destrory_threadlocal_socket_(void *tmp_socket) |
| | | { |
| | | int rv; |
| | | |
| | | logger->debug("%lu destroy threadlocal socket\n", pthread_self()); |
| | | |
| | | if(tmp_socket == NULL) |
| | | return; |
| | | |
| | | logger->debug("%d destroy tmp socket\n", pthread_self()); |
| | | |
| | | _shm_socket_close_((shm_socket_t *)tmp_socket); |
| | | rv = pthread_setspecific(_perthread_socket_key_, NULL); |
| | | rv = pthread_setspecific(_localthread_socket_key_, NULL); |
| | | if ( rv != 0) { |
| | | logger->error(rv, "shm_sendandrecv : pthread_setspecific"); |
| | | exit(1); |
| | |
| | | } |
| | | |
| | | /* One-time key creation function */ |
| | | static void _create_socket_key_perthread(void) |
| | | static void _create_threadlocal_socket_key_(void) |
| | | { |
| | | int s; |
| | | |
| | | /* Allocate a unique thread-specific data key and save the address |
| | | of the destructor for thread-specific data buffers */ |
| | | s = pthread_key_create(&_perthread_socket_key_, _destrory_socket_perthread); |
| | | //s = pthread_key_create(&_perthread_socket_key_, NULL); |
| | | s = pthread_key_create(&_localthread_socket_key_, _destrory_threadlocal_socket_); |
| | | //s = pthread_key_create(&_localthread_socket_key_, NULL); |
| | | if (s != 0) { |
| | | logger->error(s, "pthread_key_create"); |
| | | exit(1); |
| | |
| | | int *recv_size, const struct timespec *timeout, int flags) { |
| | | |
| | | |
| | | int rv, tryn = 6; |
| | | int rv = 0, tryn = 16; |
| | | shm_packet_t sendpak; |
| | | shm_packet_t recvpak; |
| | | std::map<int, shm_packet_t>::iterator recvbufIter; |
| | | // 用thread local 保证每个线程用一个独占的socket接受对方返回的信息 |
| | | shm_socket_t *tmp_socket; |
| | | shm_socket_t *tmp_socket = NULL; |
| | | |
| | | /* If first call from this thread, allocate buffer for thread, and save its location */ |
| | | // logger->debug("%d create tmp socket\n", pthread_self() ); |
| | | rv = pthread_once(&_once_, _create_socket_key_perthread); |
| | | rv = pthread_once(&_once_, _create_threadlocal_socket_key_); |
| | | if (rv != 0) { |
| | | logger->error(rv, "shm_sendandrecv pthread_once"); |
| | | exit(1); |
| | | } |
| | | |
| | | tmp_socket = (shm_socket_t *)pthread_getspecific(_perthread_socket_key_); |
| | | tmp_socket = (shm_socket_t *)pthread_getspecific(_localthread_socket_key_); |
| | | if (tmp_socket == NULL) |
| | | { |
| | | /* If first call from this thread, allocate buffer for thread, and save its location */ |
| | | logger->debug("%ld create tmp socket\n", (long)pthread_self() ); |
| | | logger->debug("%lu create threadlocal socket\n", (long)pthread_self() ); |
| | | tmp_socket = shm_socket_open(SHM_SOCKET_DGRAM); |
| | | |
| | | rv = pthread_setspecific(_perthread_socket_key_, tmp_socket); |
| | | rv = pthread_setspecific(_localthread_socket_key_, tmp_socket); |
| | | if ( rv != 0) { |
| | | logger->error(rv, "shm_sendandrecv : pthread_setspecific"); |
| | | exit(1); |
| | | } |
| | | } |
| | | |
| | | |
| | | sendpak.key = tmp_socket->key; |
| | | sendpak.size = send_size; |
| | | if(send_buf != NULL) { |
| | |
| | | rv = shm_recvpakfrom(tmp_socket, &recvpak, timeout, flags); |
| | | |
| | | if (rv != 0) { |
| | | |
| | | if(rv == ETIMEDOUT) { |
| | | return EBUS_TIMEOUT; |
| | | } |
| | | |
| | | logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(tmp_socket), bus_strerror(rv)); |
| | | return rv; |
| | | } |
| | |
| | | tryn++; |
| | | rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags); |
| | | if(rv != 0) { |
| | | logger->error("_shm_sendandrecv_thread_local : %s\n", bus_strerror(rv)); |
| | | logger->error("_shm_sendandrecv_alloc_new : %s\n", bus_strerror(rv)); |
| | | return rv; |
| | | } |
| | | |
| | |
| | | |
| | | |
| | | |
| | | static int shm_sendpakto(shm_socket_t *sockt, const shm_packet_t *sendpak, |
| | | static int shm_sendpakto(shm_socket_t *sockt, shm_packet_t *sendpak, |
| | | const int key, const struct timespec *timeout, const int flag) { |
| | | |
| | | int rv; |
| | |
| | | if( sockt->queue != NULL) |
| | | goto LABEL_PUSH; |
| | | |
| | | if(hashtable_get_queue_count(hashtable) > QUEUE_COUNT_LIMIT) { |
| | | return EBUS_EXCEED_LIMIT; |
| | | } |
| | | // if(hashtable_get_queue_count(hashtable) > QUEUE_COUNT_LIMIT) { |
| | | // return EBUS_EXCEED_LIMIT; |
| | | // } |
| | | |
| | | { |
| | | if ((rv = pthread_mutex_lock(&(sockt->mutex))) != 0) |
| | |
| | | sockt->queue = shm_socket_bind_queue( sockt->key, sockt->force_bind); |
| | | if(sockt->queue == NULL ) { |
| | | logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key); |
| | | if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) |
| | | err_exit(rv, "shm_sendto : pthread_mutex_unlock"); |
| | | return EBUS_KEY_INUSED; |
| | | } |
| | | |
| | |
| | | goto ERR_CLOSED; |
| | | } |
| | | |
| | | sendpak->key = sockt->key; |
| | | rv = remoteQueue->push(*sendpak, timeout, flag); |
| | | if(rv == ETIMEDOUT) { |
| | | return EBUS_TIMEOUT; |
| | | } |
| | | return rv; |
| | | |
| | | ERR_CLOSED: |
| | |
| | | if( sockt->queue != NULL) |
| | | goto LABEL_POP; |
| | | |
| | | if(hashtable_get_queue_count(hashtable) > QUEUE_COUNT_LIMIT) { |
| | | return EBUS_EXCEED_LIMIT; |
| | | } |
| | | // if(hashtable_get_queue_count(hashtable) > QUEUE_COUNT_LIMIT) { |
| | | // return EBUS_EXCEED_LIMIT; |
| | | // } |
| | | |
| | | { |
| | | if ((rv = pthread_mutex_lock(&(sockt->mutex))) != 0) |
| | |
| | | sockt->queue = shm_socket_bind_queue( sockt->key, sockt->force_bind); |
| | | if(sockt->queue == NULL ) { |
| | | logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key); |
| | | if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) |
| | | err_exit(rv, "shm_recvfrom : pthread_mutex_unlock"); |
| | | return EBUS_KEY_INUSED; |
| | | } |
| | | |
| | |
| | | |
| | | LABEL_POP: |
| | | |
| | | // 检查key标记的状态 |
| | | // auto shmQueueMapIter = shmQueueStMap->find(sockt->key); |
| | | // if(shmQueueMapIter != shmQueueStMap->end()) { |
| | | // stRecord = shmQueueMapIter->second; |
| | | // if(stRecord.status = SHM_QUEUE_ST_CLOSED) { |
| | | // // key对应的状态是关闭的 |
| | | // goto ERR_CLOSED; |
| | | // } |
| | | // } |
| | | |
| | | rv = sockt->queue->pop(recvpak, timeout, flag); |
| | | if(rv != 0) |
| | | if(rv != 0) { |
| | | if(rv == ETIMEDOUT) { |
| | | return EBUS_TIMEOUT; |
| | | } |
| | | return rv; |
| | | } |
| | | |
| | | |
| | | if(recvpak.action == BUS_ACTION_STOP) { |
| | | return EBUS_STOPED; |
| | | } |
| | | *_recvpak = recvpak; |
| | | return rv; |
| | | return 0; |
| | | } |
| | | |