| | |
| | | #include <cassert> |
| | | #include "bus_error.h" |
| | | #include "sole.h" |
| | | #include "shm_mm.h" |
| | | |
| | | static Logger *logger = LoggerFactory::getLogger(); |
| | | |
| | | |
| | | ShmQueueStMap * shmQueueStMap ; |
| | | |
| | | static void print_msg(char *head, shm_packet_t &msg) { |
| | | // err_msg(0, "%s: key=%d, type=%d\n", head, msg.key, msg.type); |
| | |
| | | if (s != 0) |
| | | err_exit(s, "pthread_mutexattr_destroy"); |
| | | |
| | | |
| | | shmQueueStMap = shm_mm_attach<ShmQueueStMap>(SHM_QUEUE_ST_KEY); |
| | | |
| | | return sockt; |
| | | } |
| | | |
| | |
| | | |
| | | int rv; |
| | | logger->debug("shm_socket_close\n"); |
| | | if(sockt->queue != NULL) { |
| | | delete sockt->queue; |
| | | sockt->queue = NULL; |
| | | } |
| | | // if(sockt->queue != NULL) { |
| | | // delete sockt->queue; |
| | | // sockt->queue = NULL; |
| | | // } |
| | | |
| | | rv = pthread_mutex_destroy(&(sockt->mutex) ); |
| | | if(rv != 0) { |
| | |
| | | } |
| | | |
| | | free(sockt); |
| | | |
| | | auto it = shmQueueStMap.find(key); |
| | | if(it != shmQueueStMap.end()) { |
| | | it->second.status = SHM_QUEUE_ST_CLOSED |
| | | it->second.closeTime = time(NULL); |
| | | } |
| | | return 0; |
| | | } |
| | | |
| | |
| | | const int key, const struct timespec *timeout, const int flag) { |
| | | |
| | | int rv; |
| | | shm_queue_status_t stRecord; |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | |
| | | if( sockt->queue != NULL) |
| | |
| | | logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key); |
| | | return EBUS_KEY_INUSED; |
| | | } |
| | | |
| | | // Mark the status recorded for this key as opened |
| | | stRecord.status = SHM_QUEUE_ST_OPENED; |
| | | stRecord.createTime = time(NULL); |
| | | shmQueueStMap.insert({sockt->key, stRecord}); |
| | | |
| | | } |
| | | |
| | | if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) |
| | |
| | | return EBUS_SENDTO_SELF; |
| | | } |
| | | |
| | | LockFreeQueue<shm_packet_t> *remoteQueue; |
| | | if ((remoteQueue = shm_socket_attach_queue(key)) == NULL) { |
| | | bus_errno = EBUS_CLOSED; |
| | | logger->error("sendto key %d failed, %s", key, bus_strerror(bus_errno)); |
| | | return EBUS_CLOSED; |
| | | // Check the status recorded for the key |
| | | auto it = shmQueueStMap.find(key); |
| | | if(it != shmQueueStMap.end()) { |
| | | if(it->second.status == SHM_QUEUE_ST_CLOSED) { |
| | | // The status recorded for the key is closed |
| | | goto ERR_CLOSED; |
| | | } |
| | | } |
| | | |
| | | LockFreeQueue<shm_packet_t> *remoteQueue = shm_socket_attach_queue(key); |
| | | |
| | | if (remoteQueue == NULL ) { |
| | | goto ERR_CLOSED; |
| | | } |
| | | |
| | | rv = remoteQueue->push(*sendpak, timeout, flag); |
| | | |
| | | return rv; |
| | | |
| | | ERR_CLOSED: |
| | | logger->error("sendto key %d failed, %s", key, bus_strerror(EBUS_CLOSED)); |
| | | return EBUS_CLOSED; |
| | | |
| | | } |
| | | |
| | | // Receive (short-connection mode). NOTE(review): some lines of this function appear to be missing from this view; the comments here describe only what is visible. |
| | | static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *_recvpak , const struct timespec *timeout, int flag) { |
| | | int rv; |
| | | |
| | | shm_queue_status_t stRecord; |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | shm_packet_t recvpak; |
| | | |
| | |
| | | return EBUS_KEY_INUSED; |
| | | } |
| | | |
| | | // Mark the status recorded for this key as opened |
| | | stRecord.status = SHM_QUEUE_ST_OPENED; |
| | | stRecord.createTime = time(NULL); |
| | | shmQueueStMap.insert({sockt->key, stRecord}); |
| | | |
| | | if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) |
| | | err_exit(rv, "shm_recvfrom : pthread_mutex_unlock"); |
| | |
| | | |
| | | LABEL_POP: |
| | | |
| | | // NOTE(review): the status check below is disabled; its inner condition uses '=' where '==' was presumably intended - fix before re-enabling. |
| | | // printf("%p start recv.....\n", sockt); |
| | | // Check the status recorded for the key |
| | | // auto shmQueueMapIter = shmQueueStMap.find(sockt->key); |
| | | // if(shmQueueMapIter != shmQueueStMap.end()) { |
| | | // stRecord = shmQueueMapIter->second; |
| | | // if(stRecord.status = SHM_QUEUE_ST_CLOSED) { |
| | | // // the status recorded for the key is closed |
| | | // goto ERR_CLOSED; |
| | | // } |
| | | // } |
| | | |
| | | rv = sockt->queue->pop(recvpak, timeout, flag); |
| | | if(rv != 0) |
| | |
| | | *_recvpak = recvpak; |
| | | return rv; |
| | | } |
| | | // int shm_sendandrecv(shm_socket_t *sockt, const void *send_buf, |
| | | // const int send_size, const int send_key, void **recv_buf, |
| | | // int *recv_size, const struct timespec *timeout, int flags) { |
| | | |
| | | // struct timespec tm = {10, 0}; |
| | | // return _shm_sendandrecv_thread_local(sockt, send_buf, send_size, send_key,recv_buf, recv_size, &tm, flags); |
| | | // } |