| | |
| | | static void _destrory_threadlocal_socket_(void *tmp_socket); |
| | | static void _create_threadlocal_socket_key_(void); |
| | | |
| | | static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *recvpak , const struct timespec *timeout, int flag); |
| | | static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *recvpak , const struct timespec *timeout, |
| | | int flag, int reset = 0, int data_set = 0); |
| | | |
| | | |
| | | static int shm_sendpakto(shm_socket_t *sockt, shm_packet_t *sendpak, |
| | | const int key, const struct timespec *timeout, const int flag); |
| | | static int shm_sendpakto(shm_socket_t *sockt, shm_packet_t *sendpak, const int key, const struct timespec *timeout, |
| | | const int flag, int reset = 0, int data_set = 0); |
| | | |
| | | |
| | | static int _shm_sendandrecv_uuid(shm_socket_t *sockt, const void *send_buf, |
| | |
| | | void *tmp_ptr = hashtable_get(hashtable, key); |
| | | |
| | | if (tmp_ptr == NULL || tmp_ptr == (void *)1 ) { |
| | | queue = new LockFreeQueue<shm_packet_t>(32); |
| | | queue = new LockFreeQueue<shm_packet_t>(LOCK_FREE_Q_DEFAULT_SIZE); |
| | | hashtable_put(hashtable, key, (void *)queue); |
| | | return queue; |
| | | } else if(force) { |
| | |
| | | int s, type; |
| | | pthread_mutexattr_t mtxAttr; |
| | | |
| | | logger->debug("shm_socket_open\n"); |
| | | // shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t)); |
| | | shm_socket_t *sockt = new shm_socket_t; |
| | | sockt->socket_type = socket_type; |
| | |
| | | } |
| | | |
| | | // Send in short-connection mode: copy the caller's buffer into a shm_packet_t and hand it to shm_sendpakto(), returning that call's result. |
| | | int shm_sendto(shm_socket_t *sockt, const void *buf, const int size, | // NOTE(review): two signatures appear back to back (this 5-parameter form and the 7-parameter form below); the listing looks like a diff with old and new lines interleaved - confirm which one is live
| | | const int key, const struct timespec *timeout, const int flag) { |
| | | int shm_sendto(shm_socket_t *sockt, const void *buf, const int size, const int key, const struct timespec *timeout, |
| | | const int flag, int reset, int data_set) { |
| | | |
| | | int rv; |
| | | |
| | | shm_packet_t sendpak = {0}; |
| | | sendpak.key = sockt->key; | // unconditional assignment; the reset branch right below assigns the key again
| | | if (reset == 0) { | // reset == 0: the packet carries this socket's own key
| | | sendpak.key = sockt->key; |
| | | } else { | // reset != 0: the packet carries data_set as its key instead
| | | sendpak.key = data_set; |
| | | } |
| | | sendpak.size = size; |
| | | if(buf != NULL) { | // when buf is NULL nothing is copied: sendpak.buf keeps its zero-initialized value while size is still recorded
| | | sendpak.buf = mm_malloc(size); | // NOTE(review): the mm_malloc result is passed to memcpy without a NULL check
| | | memcpy(sendpak.buf, buf, size); |
| | | } |
| | | |
| | | rv = shm_sendpakto(sockt, &sendpak, key, timeout, flag); | // NOTE(review): both this 5-argument call and the 7-argument call below are present in the listing - confirm only one is intended to run
| | | rv = shm_sendpakto(sockt, &sendpak, key, timeout, flag, reset, data_set); |
| | | return rv; | // result of shm_sendpakto is returned unchanged
| | | } |
| | | |
| | |
| | | |
| | | if (rv != 0) { |
| | | if(rv == ETIMEDOUT){ |
| | | logger->debug("%d shm_recvandsend failed %s", shm_socket_get_key(sockt), bus_strerror(EBUS_TIMEOUT)); |
| | | logger->error("%d shm_recvandsend failed %s", shm_socket_get_key(sockt), bus_strerror(EBUS_TIMEOUT)); |
| | | return EBUS_TIMEOUT; |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | // Receive in short-connection mode |
| | | int shm_recvfrom(shm_socket_t *sockt, void **buf, int *size, int *key, const struct timespec *timeout, int flag) { |
| | | int shm_recvfrom(shm_socket_t *sockt, void **buf, int *size, int *key, const struct timespec *timeout, int flag, int reset, int data_set) { |
| | | int rv; |
| | | |
| | | shm_packet_t recvpak; |
| | | rv = shm_recvpakfrom(sockt , &recvpak, timeout, flag); |
| | | rv = shm_recvpakfrom(sockt , &recvpak, timeout, flag, reset, data_set); |
| | | |
| | | if (rv != 0) { |
| | | |
| | | logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv)); |
| | | logger->error("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv)); |
| | | return rv; |
| | | |
| | | } |
| | |
| | | recvbufIter = sockt->recvbuf.find(uuid); |
| | | if(recvbufIter != sockt->recvbuf.end()) { |
| | | // 在缓存里查到了UUID匹配成功的 |
| | | logger->debug("get from recvbuf: %s", uuid.c_str()); |
| | | recvpak = recvbufIter->second; |
| | | sockt->recvbuf.erase(recvbufIter); |
| | | goto LABLE_SUC; |
| | |
| | | return EBUS_TIMEOUT; |
| | | } |
| | | |
| | | logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv)); |
| | | logger->error("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv)); |
| | | return rv; |
| | | } |
| | | |
| | | logger->debug("send uuid:%s, recv uuid: %s", uuid.c_str(), recvpak.uuid); |
| | | if(strlen(recvpak.uuid) == 0) { |
| | | continue; |
| | | } else if (strncmp(uuid.c_str(), recvpak.uuid, sizeof(recvpak.uuid)) == 0) { |
| | |
| | | rv = shm_recvpakfrom(tmp_socket, &recvpak, timeout, flags); |
| | | |
| | | if (rv != 0) { |
| | | logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(tmp_socket), bus_strerror(rv)); |
| | | logger->error("%d shm_recvfrom failed %s", shm_socket_get_key(tmp_socket), bus_strerror(rv)); |
| | | return rv; |
| | | } |
| | | |
| | |
| | | |
| | | |
| | | |
| | | static int shm_sendpakto(shm_socket_t *sockt, shm_packet_t *sendpak, |
| | | const int key, const struct timespec *timeout, const int flag) { |
| | | static int shm_sendpakto(shm_socket_t *sockt, shm_packet_t *sendpak, const int key, const struct timespec *timeout, |
| | | const int flag, int reset, int data_set) { |
| | | |
| | | int rv; |
| | | shm_queue_status_t stRecord; |
| | | LockFreeQueue<shm_packet_t> *remoteQueue; |
| | | LockFreeQueue<shm_packet_t> *fixedQueue; |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | |
| | | if( sockt->queue != NULL) |
| | | if ((reset != 0) && (data_set == 0)) { |
| | | return EBUS_KEY_INUSED; |
| | | } |
| | | |
| | | if (reset != 0) { |
| | | fixedQueue = shm_socket_attach_queue(data_set); |
| | | } |
| | | |
| | | if (((reset == 0) && (sockt->queue != NULL)) || ((reset != 0) && (fixedQueue != NULL))) |
| | | goto LABEL_PUSH; |
| | | |
| | | // if(hashtable_get_queue_count(hashtable) > QUEUE_COUNT_LIMIT) { |
| | |
| | | if ((rv = pthread_mutex_lock(&(sockt->mutex))) != 0) |
| | | err_exit(rv, "shm_sendto : pthread_mutex_lock"); |
| | | |
| | | if (sockt->queue == NULL) { |
| | | if ((sockt->queue == NULL) && (reset == 0)) { |
| | | if (sockt->key == 0) { |
| | | sockt->key = hashtable_alloc_key(hashtable); |
| | | } |
| | |
| | | // stRecord.createTime = time(NULL); |
| | | // shmQueueStMap->insert({sockt->key, stRecord}); |
| | | |
| | | } |
| | | |
| | | if ((fixedQueue == NULL) && (reset != 0)) { |
| | | fixedQueue = shm_socket_bind_queue(data_set, false); |
| | | if (fixedQueue == NULL ) { |
| | | logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key); |
| | | if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) |
| | | err_exit(rv, "shm_sendto : pthread_mutex_unlock"); |
| | | return EBUS_KEY_INUSED; |
| | | } |
| | | } |
| | | |
| | | if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) |
| | |
| | | goto ERR_CLOSED; |
| | | } |
| | | |
| | | sendpak->key = sockt->key; |
| | | if (reset == 0) { |
| | | sendpak->key = sockt->key; |
| | | } |
| | | rv = remoteQueue->push(*sendpak, timeout, flag); |
| | | |
| | | if(rv != 0) { |
| | |
| | | } |
| | | |
| | | // Receive in short-connection mode |
| | | static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *_recvpak , const struct timespec *timeout, int flag) { |
| | | static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *_recvpak , const struct timespec *timeout, |
| | | int flag, int reset, int data_set) { |
| | | int rv; |
| | | shm_queue_status_t stRecord; |
| | | LockFreeQueue<shm_packet_t> *fixedQueue; |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | shm_packet_t recvpak; |
| | | |
| | | if( sockt->queue != NULL) |
| | | if ((reset != 0) && (data_set == 0)) { |
| | | return EBUS_KEY_INUSED; |
| | | } |
| | | |
| | | if (reset != 0) { |
| | | fixedQueue = shm_socket_attach_queue(data_set); |
| | | } |
| | | |
| | | if (((sockt->queue != NULL) && (reset == 0)) || ((reset != 0) && (fixedQueue != NULL))) |
| | | goto LABEL_POP; |
| | | |
| | | // if(hashtable_get_queue_count(hashtable) > QUEUE_COUNT_LIMIT) { |
| | |
| | | if ((rv = pthread_mutex_lock(&(sockt->mutex))) != 0) |
| | | err_exit(rv, "shm_recvfrom : pthread_mutex_lock"); |
| | | |
| | | if (sockt->key == 0) { |
| | | sockt->key = hashtable_alloc_key(hashtable); |
| | | } |
| | | sockt->queue = shm_socket_bind_queue( sockt->key, sockt->force_bind); |
| | | if(sockt->queue == NULL ) { |
| | | logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key); |
| | | if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) |
| | | err_exit(rv, "shm_recvfrom : pthread_mutex_unlock"); |
| | | return EBUS_KEY_INUSED; |
| | | if ((sockt->queue == NULL) && (reset == 0)) { |
| | | if (sockt->key == 0) { |
| | | sockt->key = hashtable_alloc_key(hashtable); |
| | | } |
| | | sockt->queue = shm_socket_bind_queue( sockt->key, sockt->force_bind); |
| | | if(sockt->queue == NULL ) { |
| | | logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key); |
| | | if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) |
| | | err_exit(rv, "shm_recvfrom : pthread_mutex_unlock"); |
| | | return EBUS_KEY_INUSED; |
| | | } |
| | | |
| | | // 标记key对应的状态 ,为opened |
| | | // stRecord.status = SHM_QUEUE_ST_OPENED; |
| | | // stRecord.createTime = time(NULL); |
| | | // shmQueueStMap->insert({sockt->key, stRecord}); |
| | | } |
| | | |
| | | // 标记key对应的状态 ,为opened |
| | | // stRecord.status = SHM_QUEUE_ST_OPENED; |
| | | // stRecord.createTime = time(NULL); |
| | | // shmQueueStMap->insert({sockt->key, stRecord}); |
| | | |
| | | if ((fixedQueue == NULL) && (reset != 0)) { |
| | | fixedQueue = shm_socket_bind_queue(data_set, false); |
| | | if (fixedQueue == NULL ) { |
| | | logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key); |
| | | if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) |
| | | err_exit(rv, "shm_sendto : pthread_mutex_unlock"); |
| | | return EBUS_KEY_INUSED; |
| | | } |
| | | } |
| | | |
| | | if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) |
| | | err_exit(rv, "shm_recvfrom : pthread_mutex_unlock"); |
| | |
| | | |
| | | LABEL_POP: |
| | | |
| | | rv = sockt->queue->pop(recvpak, timeout, flag); |
| | | if (reset == 0) { |
| | | rv = sockt->queue->pop(recvpak, timeout, flag); |
| | | } else { |
| | | rv = fixedQueue->pop(recvpak, timeout, flag); |
| | | } |
| | | if(rv != 0) { |
| | | if(rv == ETIMEDOUT) { |
| | | return EBUS_TIMEOUT; |
| | |
| | | count += strlen(ptr->public_info) + 1; |
| | | memcpy(dst + count, ptr->private_info, strlen(ptr->private_info) + 1); |
| | | count += strlen(ptr->private_info) + 1; |
| | | memcpy(dst + count, ptr->int_info, strlen(ptr->int_info) + 1); |
| | | count += strlen(ptr->int_info) + 1; |
| | | memcpy(dst + count, ptr->svr_info, strlen(ptr->svr_info) + 1); |
| | | count += strlen(ptr->svr_info) + 1; |
| | | |
| | | *counter = count; |
| | | } |