Fix data parsing when running in multiple threads.
| | |
| | | if (mtr_list_num > sizeof(mtr_list) / sizeof(mtr_list[0])) { |
| | | mtr_list_num = sizeof(mtr_list) / sizeof(mtr_list[0]); |
| | | } |
| | | |
| | | |
| | | Proc_ptr = &(ptr->procData); |
| | | for(int i = 0; i < mtr_list_num; i++) { |
| | | mtr_list[i].proc_id = ptr->procData.proc_id; |
| | | mtr_list[i].proc_id = (Proc_ptr + i)->proc_id; |
| | | mtr_list[i].mq_id = ID_RSV; |
| | | mtr_list[i].abs_addr = ABS_ID_RSV; |
| | | mtr_list[i].ip = "127.0.0.1"; |
| | |
| | | int sec, nsec; |
| | | std::string MsgID; |
| | | int timeout_ms = 3000; |
| | | char data_buf[MAX_STR_LEN] = { 0x00 }; |
| | | char buf_temp[MAX_STR_LEN] = { 0x00 }; |
| | | char *topics_buf = NULL; |
| | | |
| | |
| | | rv = net_mod_socket_reg(gNetmod_socket, buf_temp, strlen(buf_temp), &buf, &size, timeout_ms, PROC_QUE_STCS); |
| | | if (rv == 0) { |
| | | |
| | | val = atoi((char *)buf); |
| | | len = size > (sizeof(data_buf) - 1) ? (sizeof(data_buf) - 1) : size; |
| | | memcpy(data_buf, (char *)buf, len); |
| | | val = atoi((char *)data_buf); |
| | | |
| | | free(buf); |
| | | |
| | |
| | | net_node_t node; |
| | | int node_size; |
| | | int recv_arr_size; |
| | | char data_buf[MAX_STR_LEN] = { 0x00 }; |
| | | net_mod_recv_msg_t *recv_arr; |
| | | net_mod_err_t *errarr; |
| | | int errarr_size = 0; |
| | |
| | | rv = net_mod_socket_reg(gNetmod_socket, buf_temp, strlen(buf_temp), &buf, &size, timeout_ms, PROC_QUE_STCS); |
| | | if (rv == 0) { |
| | | |
| | | val = atoi((char *)buf); |
| | | len = size > (sizeof(data_buf) - 1) ? (sizeof(data_buf) - 1) : size; |
| | | memcpy(data_buf, (char *)buf, len); |
| | | val = atoi((char *)data_buf); |
| | | |
| | | free(buf); |
| | | |
| | |
| | | len += strlen(_input1.data); |
| | | #endif |
| | | |
| | | data = net_mod_socket_svr_get(gNetmod_socket); |
| | | topics_buf = (char *)malloc(len); |
| | | if (topics_buf == NULL) { |
| | | |
| | |
| | | return NULL; |
| | | } |
| | | |
| | | |
| | | |
| | | |
| | | void *svr_start(void *skptr) { |
| | | int port = *(int *)skptr; |
| | | |
| | |
| | | return shmModSocket.force_bind(key); |
| | | } |
| | | |
| | | /* Hand the process-id buffer to the shmModSocket member and report its result unchanged. */ |
| | | int NetModSocket::bind_proc_id(char *buf, int len) { |
| | | const int status = shmModSocket.bind_proc_id(buf, len); |
| | | return status; |
| | | } |
| | | |
| | | int NetModSocket::reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag) { |
| | | |
| | | return shmModSocket.reg(pData, len, buf, size, timeout_ms, flag); |
| | |
| | | */ |
| | | int force_bind( int key); |
| | | |
| | | int bind_proc_id(char *buf, int len); |
| | | int reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag); |
| | | |
| | | /** |
| | |
| | | return sockt->sendandrecv(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size, err_arr, err_arr_size, -1); |
| | | } |
| | | |
| | | /* C-style entry point: treats the opaque handle as a NetModSocket and calls its bind_proc_id(). */ |
| | | int net_mod_socket_bind_proc_id(void * _socket, char *proc_id, int len){ |
| | | return ((NetModSocket *)_socket)->bind_proc_id(proc_id, len); |
| | | } |
| | | |
| | | void net_mod_socket_int_set(void * _socket, int data) { |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | sockt->int_set(data); |
| | |
| | | goto suc; |
| | | } |
| | | val = _hashtable_get(hashtable, key); |
| | | // val = 1是allockey的情况 |
| | | if(val != NULL && val != (void *)1) |
| | | goto fail; |
| | | |
| | |
| | | |
| | | data1 = atoi((proc_iter->second).int_info); |
| | | data2 = atoi((proc_iter->second).svr_info); |
| | | BusServerSocket::_data_remove(data1, data2); |
| | | BusServerSocket::_data_remove(data1); |
| | | BusServerSocket::_data_remove(data2); |
| | | BusServerSocket::_data_remove(key); |
| | | len = (sizeof(buf_temp) - 1) > strlen((proc_iter->second).proc_id) ? strlen((proc_iter->second).proc_id) : (sizeof(buf_temp) - 1); |
| | | strncpy(buf_temp, (proc_iter->second).proc_id, len); |
| | | proc->erase(proc_iter); |
| | |
| | | ProcDataZone::iterator proc_que_iter; |
| | | ProcDataZone *procQuePart = shm_mm_attach<ProcDataZone>(SHM_QUEUE_ST_SET); |
| | | |
| | | int rv; |
| | | char send_buf[512] = { 0x00 }; |
| | | int rv; |
| | | char send_buf[512] = { 0x00 }; |
| | | |
| | | const char *topic_delim = ","; |
| | | const char *topic_delim = ","; |
| | | while((rv = shm_recvfrom(shm_socket, (void **)&buf, &size, &key)) == 0) { |
| | | head = ShmModSocket::decode_bus_head(buf); |
| | | topics = buf + BUS_HEAD_SIZE; |
| | |
| | | _proxy_reg(topics, head.topic_size, content, head.content_size, key, flag); |
| | | |
| | | } |
| | | else if (strncmp(buf, "request", strlen("request")) == 0) { |
| | | sprintf(send_buf, "%4d", key); |
| | | strncpy(send_buf + 4, buf, (sizeof(send_buf) - 4) >= (strlen(buf) + 1) ? strlen(buf) : (sizeof(send_buf) - 4)); |
| | | |
| | | rv = shm_sendto(shm_socket, send_buf, strlen(send_buf) + 1, key); |
| | | if(rv != 0) { |
| | | logger->error( "BusServerSocket::_run_proxy_ : requst answer fail!\n"); |
| | | } |
| | | } |
| | | else if(strcmp(action, "stop") == 0) { |
| | | free(buf); |
| | | break; |
| | | } else { |
| | | logger->error( "BusServerSocket::_run_proxy_ : unrecognized action %s", action); |
| | | } |
| | | free(buf); |
| | | } |
| | | free(buf); |
| | | break; |
| | | } else { |
| | | logger->error( "BusServerSocket::_run_proxy_ : unrecognized action %s", action); |
| | | } |
| | | free(buf); |
| | | } |
| | | |
| | | |
| | | return rv; |
| | | return rv; |
| | | } |
| | | |
| | | void BusServerSocket::_data_remove(int val1, int val2) { |
| | | void BusServerSocket::_data_remove(int val) { |
| | | |
| | | int i; |
| | | LockFreeQueue<shm_packet_t> *queue = NULL; |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | |
| | | void *data_ptr1 = hashtable_get(hashtable, val1); |
| | | void *data_ptr2 = hashtable_get(hashtable, val2); |
| | | if (data_ptr1 != NULL) { |
| | | if (data_ptr1 != (void *)1) { |
| | | queue = (LockFreeQueue<shm_packet_t> *)data_ptr1; |
| | | void *data_ptr = hashtable_get(hashtable, val); |
| | | if (data_ptr != NULL) { |
| | | if (data_ptr != (void *)1) { |
| | | queue = (LockFreeQueue<shm_packet_t> *)data_ptr; |
| | | queue->close(); |
| | | for (i = 0; i < queue->size(); i++) { |
| | | mm_free((*queue)[i].buf); |
| | |
| | | sleep(1); |
| | | } |
| | | |
| | | hashtable_remove(hashtable, val1); |
| | | } |
| | | |
| | | if (data_ptr2 != NULL) { |
| | | if (data_ptr2 != (void *)1) { |
| | | queue = (LockFreeQueue<shm_packet_t> *)data_ptr2; |
| | | queue->close(); |
| | | for (i = 0; i < queue->size(); i++) { |
| | | mm_free((*queue)[i].buf); |
| | | } |
| | | sleep(1); |
| | | } |
| | | |
| | | hashtable_remove(hashtable, val2); |
| | | hashtable_remove(hashtable, val); |
| | | } |
| | | |
| | | } |
| | |
| | | */ |
| | | int get_key() ; |
| | | |
| | | void _data_remove(int val1, int val2); |
| | | void _data_remove(int val); |
| | | |
| | | }; |
| | | |
| | |
| | | return shm_socket_force_bind(shm_socket, key); |
| | | } |
| | | |
| | | /* Pass the process-id buffer on to shm_socket_bind_proc_id() for the wrapped shm_socket. */ |
| | | int ShmModSocket::bind_proc_id(char *buf, int len) { |
| | | const int status = shm_socket_bind_proc_id(shm_socket, buf, len); |
| | | return status; |
| | | } |
| | | |
| | | int ShmModSocket::reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag) |
| | | { |
| | | int ret; |
| | |
| | | */ |
| | | int force_bind(int key); |
| | | |
| | | int bind_proc_id(char *buf, int len); |
| | | int reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag); |
| | | |
| | | int sendto(const void *buf, const int size, const int key, const struct timespec *timeout = NULL, int flag = 0, int reset = 0, int data_set = 0); |
| | |
| | | return 0; |
| | | } |
| | | |
| | | /* Store the caller-supplied process id on the socket as a NUL-terminated string. |
| | | * The whole proc_id field is cleared first, so an earlier, longer id leaves no tail behind. |
| | | * At most sizeof(proc_id) - 1 bytes are taken from buf (copying stops early at a NUL in buf); |
| | | * a len of zero or less leaves the field empty. Always returns 0. */ |
| | | int shm_socket_bind_proc_id(shm_socket_t *sockt, const char *buf, int len) { |
| | | memset(sockt->proc_id, 0, sizeof(sockt->proc_id)); |
| | | if (len > 0) { |
| | | size_t copy_len = (size_t)len; |
| | | /* Reserve the last byte so the stored id always keeps its terminator. */ |
| | | if (copy_len > sizeof(sockt->proc_id) - 1) { |
| | | copy_len = sizeof(sockt->proc_id) - 1; |
| | | } |
| | | strncpy(sockt->proc_id, buf, copy_len); |
| | | } |
| | | |
| | | return 0; |
| | | } |
| | | |
| | | /* Accessor: reports the key field currently held by the socket. */ |
| | | int shm_socket_get_key(shm_socket_t *sockt){ |
| | | const int current_key = sockt->key; |
| | | return current_key; |
| | | } |
| | | |
| | | /* Copy the socket's process id into buf, which must have room for len bytes. |
| | | * The result is always NUL-terminated and the unused remainder of buf is zero-filled. |
| | | * An id that does not fit is truncated to len - 1 bytes. A len of zero or less writes nothing. |
| | | * Always returns 0. */ |
| | | int shm_socket_get_procid(shm_socket_t *sockt, char *buf, int len) { |
| | | if (len > 0) { |
| | | size_t copy_len = (size_t)len - 1; |
| | | /* Never read beyond the proc_id array, even if it holds no terminator. */ |
| | | if (copy_len > sizeof(sockt->proc_id)) { |
| | | copy_len = sizeof(sockt->proc_id); |
| | | } |
| | | memset(buf, 0, (size_t)len); |
| | | strncpy(buf, sockt->proc_id, copy_len); |
| | | } |
| | | |
| | | return 0; |
| | | } |
| | | |
| | | // 短连接方式发送 |
| | |
| | | tryn--; |
| | | recvbufIter = tmp_socket->recvbuf2.find(key); |
| | | if(recvbufIter != tmp_socket->recvbuf2.end()) { |
| | | // 在缓存里查到了key匹配成功的 |
| | | recvpak = recvbufIter->second; |
| | | tmp_socket->recvbuf2.erase(recvbufIter); |
| | | tmp_socket->recvbuf2.erase(key); |
| | | goto LABLE_SUC; |
| | | } |
| | | |
| | |
| | | } else { |
| | | // 答非所问,放到缓存里 |
| | | tmp_socket->recvbuf2.insert({recvpak.key, recvpak}); |
| | | exit(0); |
| | | continue; |
| | | } |
| | | } |
| | |
| | | |
| | | size_t size; |
| | | void * buf; |
| | | char uuid[64]; |
| | | char uuid[1]; |
| | | int action; |
| | | |
| | | } shm_packet_t; |
| | |
| | | typedef struct shm_socket_t { |
| | | shm_socket_type_t socket_type; |
| | | int key; |
| | | char proc_id[MAX_STR_LEN]; |
| | | bool force_bind; |
| | | pthread_mutex_t mutex; |
| | | |
| | |
| | | |
| | | int shm_socket_force_bind(shm_socket_t * socket, int key) ; |
| | | |
| | | int shm_socket_bind_proc_id(shm_socket_t *sockt, const char *buf, int len); |
| | | /** |
| | | * @flags : BUS_NOWAIT_FLAG |
| | | */ |