| | |
| | | #include <cassert> |
| | | #include "bus_error.h" |
| | | #include "sole.h" |
| | | #include "shm_mm.h" |
| | | #include "key_def.h" |
| | | |
| | | static Logger *logger = LoggerFactory::getLogger(); |
| | | |
| | | |
| | | ShmQueueStMap * shmQueueStMap ; |
| | | |
| | | static void print_msg(char *head, shm_packet_t &msg) { |
| | | // err_msg(0, "%s: key=%d, type=%d\n", head, msg.key, msg.type); |
| | |
| | | if (s != 0) |
| | | err_exit(s, "pthread_mutexattr_destroy"); |
| | | |
| | | |
| | | shmQueueStMap = shm_mm_attach<ShmQueueStMap>(SHM_QUEUE_ST_KEY); |
| | | |
| | | return sockt; |
| | | } |
| | | |
| | | |
| | | int shm_socket_close(shm_socket_t *sockt) { |
| | | static int _shm_socket_close_(shm_socket_t *sockt) { |
| | | |
| | | int rv; |
| | | logger->debug("shm_socket_close\n"); |
| | | if(sockt->queue != NULL) { |
| | | delete sockt->queue; |
| | | sockt->queue = NULL; |
| | | // if(sockt->queue != NULL) { |
| | | // delete sockt->queue; |
| | | // sockt->queue = NULL; |
| | | // } |
| | | |
| | | // hashtable_remove(hashtable, mkey); |
| | | |
| | | |
| | | if(sockt->key != 0) { |
| | | auto it = shmQueueStMap->find(sockt->key); |
| | | if(it != shmQueueStMap->end()) { |
| | | it->second.status = SHM_QUEUE_ST_CLOSED; |
| | | it->second.closeTime = time(NULL); |
| | | } |
| | | } |
| | | |
| | | pthread_mutex_destroy(&(sockt->mutex) ); |
| | | |
| | | |
| | | |
| | | pthread_mutex_destroy(&(sockt->mutex) ); |
| | | free(sockt); |
| | | return 0; |
| | | } |
| | | |
| | | |
| | | int shm_socket_close(shm_socket_t *sockt) { |
| | | return _shm_socket_close_(sockt); |
| | | } |
| | | |
| | | |
| | |
| | | return; |
| | | |
| | | logger->debug("%d destroy tmp socket\n", pthread_self()); |
| | | shm_socket_close((shm_socket_t *)tmp_socket); |
| | | _shm_socket_close_((shm_socket_t *)tmp_socket); |
| | | rv = pthread_setspecific(_perthread_socket_key_, NULL); |
| | | if ( rv != 0) { |
| | | logger->error(rv, "shm_sendandrecv : pthread_setspecific"); |
| | |
| | | const int send_size, const int key, void **recv_buf, |
| | | int *recv_size, const struct timespec *timeout, int flags) { |
| | | |
| | | int rv, tryn = 3; |
| | | int rv, tryn = 6; |
| | | shm_packet_t sendpak; |
| | | shm_packet_t recvpak; |
| | | std::map<std::string, shm_packet_t>::iterator recvbufIter; |
| | |
| | | return EBUS_RECVFROM_WRONG_END; |
| | | } |
| | | |
| | | shm_socket_close(tmp_socket); |
| | | _shm_socket_close_(tmp_socket); |
| | | return rv; |
| | | |
| | | } |
| | |
| | | const int key, const struct timespec *timeout, const int flag) { |
| | | |
| | | int rv; |
| | | shm_queue_status_t stRecord; |
| | | LockFreeQueue<shm_packet_t> *remoteQueue; |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | |
| | | if( sockt->queue != NULL) |
| | |
| | | logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key); |
| | | return EBUS_KEY_INUSED; |
| | | } |
| | | |
| | | // 标记key对应的状态 ,为opened |
| | | stRecord.status = SHM_QUEUE_ST_OPENED; |
| | | stRecord.createTime = time(NULL); |
| | | shmQueueStMap->insert({sockt->key, stRecord}); |
| | | |
| | | } |
| | | |
| | | if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) |
| | |
| | | return EBUS_SENDTO_SELF; |
| | | } |
| | | |
| | | LockFreeQueue<shm_packet_t> *remoteQueue; |
| | | if ((remoteQueue = shm_socket_attach_queue(key)) == NULL) { |
| | | bus_errno = EBUS_CLOSED; |
| | | logger->error("sendto key %d failed, %s", key, bus_strerror(bus_errno)); |
| | | return EBUS_CLOSED; |
| | | // 检查key标记的状态 |
| | | auto it = shmQueueStMap->find(key); |
| | | if(it != shmQueueStMap->end()) { |
| | | if(it->second.status == SHM_QUEUE_ST_CLOSED) { |
| | | // key对应的状态是关闭的 |
| | | goto ERR_CLOSED; |
| | | } |
| | | } |
| | | |
| | | |
| | | remoteQueue = shm_socket_attach_queue(key); |
| | | |
| | | if (remoteQueue == NULL ) { |
| | | goto ERR_CLOSED; |
| | | } |
| | | |
| | | rv = remoteQueue->push(*sendpak, timeout, flag); |
| | | |
| | | return rv; |
| | | |
| | | ERR_CLOSED: |
| | | logger->error("sendto key %d failed, %s", key, bus_strerror(EBUS_CLOSED)); |
| | | return EBUS_CLOSED; |
| | | |
| | | } |
| | | |
| | | // 短连接方式接受 |
| | | static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *_recvpak , const struct timespec *timeout, int flag) { |
| | | int rv; |
| | | |
| | | shm_queue_status_t stRecord; |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | shm_packet_t recvpak; |
| | | |
| | |
| | | return EBUS_KEY_INUSED; |
| | | } |
| | | |
| | | // 标记key对应的状态 ,为opened |
| | | stRecord.status = SHM_QUEUE_ST_OPENED; |
| | | stRecord.createTime = time(NULL); |
| | | shmQueueStMap->insert({sockt->key, stRecord}); |
| | | |
| | | if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) |
| | | err_exit(rv, "shm_recvfrom : pthread_mutex_unlock"); |
| | |
| | | |
| | | LABEL_POP: |
| | | |
| | | // |
| | | // printf("%p start recv.....\n", sockt); |
| | | // 检查key标记的状态 |
| | | // auto shmQueueMapIter = shmQueueStMap->find(sockt->key); |
| | | // if(shmQueueMapIter != shmQueueStMap->end()) { |
| | | // stRecord = shmQueueMapIter->second; |
| | | // if(stRecord.status = SHM_QUEUE_ST_CLOSED) { |
| | | // // key对应的状态是关闭的 |
| | | // goto ERR_CLOSED; |
| | | // } |
| | | // } |
| | | |
| | | rv = sockt->queue->pop(recvpak, timeout, flag); |
| | | if(rv != 0) |
| | |
| | | *_recvpak = recvpak; |
| | | return rv; |
| | | } |
| | | // int shm_sendandrecv(shm_socket_t *sockt, const void *send_buf, |
| | | // const int send_size, const int send_key, void **recv_buf, |
| | | // int *recv_size, const struct timespec *timeout, int flags) { |
| | | |
| | | // struct timespec tm = {10, 0}; |
| | | // return _shm_sendandrecv_thread_local(sockt, send_buf, send_size, send_key,recv_buf, recv_size, &tm, flags); |
| | | // } |
| | | |