Add topics sub and request support.
| | |
| | | gNetmod_socket = net_mod_socket_open(); |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | key = hashtable_alloc_key(hashtable); |
| | | count = hashtable_alloc_key(hashtable); |
| | | rv = hashtable_alloc_key(hashtable); |
| | | net_mod_socket_int_set(gNetmod_socket, count); |
| | | net_mod_socket_svr_set(gNetmod_socket, rv); |
| | | sprintf(pData.int_info, "%d", count); |
| | | sprintf(pData.svr_info, "%d", rv); |
| | | net_mod_socket_bind(gNetmod_socket, key); |
| | | |
| | | rv = net_mod_socket_reg(gNetmod_socket, &pData, sizeof(ProcInfo), NULL, 0, timeout_ms, PROC_REG); |
| | |
| | | ::bhome_msg::MsgCommonReply mcr; |
| | | mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv)); |
| | | mcr.mutable_errmsg()->set_errstring(errString); |
| | | *reply_len=mcr.ByteSizeLong(); |
| | | *reply=malloc(*reply_len); |
| | | *reply_len = mcr.ByteSizeLong(); |
| | | *reply = malloc(*reply_len); |
| | | mcr.SerializePartialToArray(*reply,*reply_len); |
| | | #endif |
| | | |
| | |
| | | int val; |
| | | int len; |
| | | int min; |
| | | int data; |
| | | int sec, nsec; |
| | | std::string MsgID; |
| | | int timeout_ms = 3000; |
| | |
| | | strncpy(topics_buf + strlen(buf_temp) + 1, _input1.data, strlen(_input1.data)); |
| | | #endif |
| | | |
| | | data = net_mod_socket_svr_get(gNetmod_socket); |
| | | if (timeout_ms > 0) { |
| | | |
| | | sec = timeout_ms / 1000; |
| | | nsec = (timeout_ms - sec * 1000) * 1000 * 1000; |
| | | |
| | | rv = net_mod_socket_sendto_timeout(gNetmod_socket, topics_buf, len, val, sec, nsec); |
| | | rv = net_mod_socket_sendto_timeout(gNetmod_socket, topics_buf, len, val, sec, nsec, SVR_STR, data); |
| | | |
| | | } else if (timeout_ms == 0) { |
| | | |
| | | rv = net_mod_socket_sendto_nowait(gNetmod_socket, topics_buf, len, val); |
| | | rv = net_mod_socket_sendto_nowait(gNetmod_socket, topics_buf, len, val, SVR_STR, data); |
| | | |
| | | } else { |
| | | |
| | | rv = net_mod_socket_sendto(gNetmod_socket, topics_buf, len, val); |
| | | rv = net_mod_socket_sendto(gNetmod_socket, topics_buf, len, val, SVR_STR, data); |
| | | } |
| | | |
| | | free(topics_buf); |
| | |
| | | int size; |
| | | int val; |
| | | int min, len; |
| | | int data; |
| | | net_node_t node; |
| | | int node_size; |
| | | int recv_arr_size; |
| | |
| | | len += strlen(_input1.data); |
| | | #endif |
| | | |
| | | data = net_mod_socket_svr_get(gNetmod_socket); |
| | | topics_buf = (char *)malloc(len); |
| | | if (topics_buf == NULL) { |
| | | |
| | |
| | | int key; |
| | | int size; |
| | | int len; |
| | | int data; |
| | | int sec, nsec; |
| | | char buf_temp[MAX_STR_LEN] = { 0x00 }; |
| | | char *topics_buf = NULL; |
| | |
| | | return false; |
| | | } |
| | | |
| | | data = net_mod_socket_svr_get(gNetmod_socket); |
| | | if (timeout_ms > 0) { |
| | | |
| | | sec = timeout_ms / 1000; |
| | | nsec = (timeout_ms - sec * 1000) * 1000 * 1000; |
| | | |
| | | rv = net_mod_socket_recvfrom_timeout(gNetmod_socket, &buf, &size, &key, sec, nsec); |
| | | rv = net_mod_socket_recvfrom_timeout(gNetmod_socket, &buf, &size, &key, sec, nsec, SVR_STR, data); |
| | | |
| | | } else if (timeout_ms == 0) { |
| | | |
| | | rv = net_mod_socket_recvfrom_nowait(gNetmod_socket, &buf, &size, &key); |
| | | rv = net_mod_socket_recvfrom_nowait(gNetmod_socket, &buf, &size, &key, SVR_STR, data); |
| | | |
| | | } else { |
| | | |
| | | rv = net_mod_socket_recvfrom(gNetmod_socket, &buf, &size, &key); |
| | | rv = net_mod_socket_recvfrom(gNetmod_socket, &buf, &size, &key, SVR_STR, data); |
| | | } |
| | | |
| | | if (rv == 0) { |
| | |
| | | int BHSendReply(void *src, const void *reply, const int reply_len) |
| | | { |
| | | int rv; |
| | | int data; |
| | | const char *_input; |
| | | |
| | | #if defined(PRO_DE_SERIALIZE) |
| | |
| | | rv = pthread_mutex_trylock(&mutex); |
| | | if (rv == 0) { |
| | | |
| | | rv = net_mod_socket_sendto(gNetmod_socket, _input, strlen(_input), *(int *)src); |
| | | data = net_mod_socket_svr_get(gNetmod_socket); |
| | | rv = net_mod_socket_sendto(gNetmod_socket, _input, strlen(_input), *(int *)src, SVR_STR, data); |
| | | |
| | | memset(errString, 0x00, sizeof(errString)); |
| | | strncpy(errString, bus_strerror(rv), sizeof(errString)); |
| | |
| | | |
| | | // BUS key |
| | | #define SHM_BUS_KEY 8 |
| | | #define SHM_BUS_INT_KEY 9 |
| | | |
| | | // 网络代理key |
| | | #define SHM_NET_PROXY_KEY 99 |
| | |
| | | return shmModSocket.reg(pData, len, buf, size, timeout_ms, flag); |
| | | } |
| | | |
| | | void NetModSocket::int_set(int data) { |
| | | int_val = data; |
| | | } |
| | | |
| | | void NetModSocket::svr_set(int data) { |
| | | svr_val = data; |
| | | } |
| | | |
| | | int NetModSocket::int_get(void) { |
| | | return int_val; |
| | | } |
| | | |
| | | int NetModSocket::svr_get(void) { |
| | | return svr_val; |
| | | } |
| | | |
| | | // int NetModSocket::sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, |
| | | // net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) { |
| | | // return _sendandrecv_(node_arr, arrlen, send_buf,send_size, recv_arr, recv_arr_size, -1); |
| | |
| | | * @key 发送给谁 |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int NetModSocket::sendto(const void *buf, const int size, const int key){ |
| | | return shmModSocket.sendto(buf, size, key); |
| | | int NetModSocket::sendto(const void *buf, const int size, const int key, int reset, int data_set){ |
| | | return shmModSocket.sendto(buf, size, key, 0, 0, reset, data_set); |
| | | } |
| | | |
| | | // 发送信息超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int NetModSocket::sendto_timeout(const void *buf, const int size, const int key, int sec, int nsec){ |
| | | int NetModSocket::sendto_timeout(const void *buf, const int size, const int key, int sec, int nsec, int reset, int data_set){ |
| | | struct timespec timeout = {sec, nsec}; |
| | | return shmModSocket.sendto(buf, size, key, &timeout, BUS_TIMEOUT_FLAG); |
| | | return shmModSocket.sendto(buf, size, key, &timeout, BUS_TIMEOUT_FLAG, reset, data_set); |
| | | |
| | | } |
| | | |
| | | // 发送信息立刻返回。 |
| | | int NetModSocket::sendto_nowait(const void *buf, const int size, const int key){ |
| | | return shmModSocket.sendto(buf, size, key, NULL, BUS_NOWAIT_FLAG); |
| | | int NetModSocket::sendto_nowait(const void *buf, const int size, const int key, int reset, int data_set){ |
| | | return shmModSocket.sendto(buf, size, key, NULL, BUS_NOWAIT_FLAG, reset, data_set); |
| | | |
| | | } |
| | | |
| | |
| | | * @key 从谁哪里收到的信息 |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int NetModSocket::recvfrom(void **buf, int *size, int *key) { |
| | | int NetModSocket::recvfrom(void **buf, int *size, int *key, int reset, int data_set) { |
| | | |
| | | return shmModSocket.recvfrom(buf, size, key); |
| | | return shmModSocket.recvfrom(buf, size, key, 0, 0, reset, data_set); |
| | | |
| | | } |
| | | |
| | | // 接受信息超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int NetModSocket::recvfrom_timeout(void **buf, int *size, int *key, int sec, int nsec){ |
| | | int NetModSocket::recvfrom_timeout(void **buf, int *size, int *key, int sec, int nsec, int reset, int data_set){ |
| | | struct timespec timeout = {sec, nsec}; |
| | | return shmModSocket.recvfrom(buf, size, key, &timeout, BUS_TIMEOUT_FLAG); |
| | | return shmModSocket.recvfrom(buf, size, key, &timeout, BUS_TIMEOUT_FLAG, reset, data_set); |
| | | |
| | | } |
| | | |
| | | int NetModSocket::recvfrom_nowait(void **buf, int *size, int *key){ |
| | | return shmModSocket.recvfrom(buf, size, key, NULL, BUS_NOWAIT_FLAG); |
| | | int NetModSocket::recvfrom_nowait(void **buf, int *size, int *key, int reset, int data_set){ |
| | | return shmModSocket.recvfrom(buf, size, key, NULL, BUS_NOWAIT_FLAG, reset, data_set); |
| | | } |
| | | |
| | | int NetModSocket::recvandsend(recvandsend_callback_fn callback, |
| | |
| | | private: |
| | | |
| | | ShmModSocket shmModSocket; |
| | | |
| | | int int_val; |
| | | int svr_val; |
| | | // pthread_mutex_t sendMutex; |
| | | |
| | | // request header 编码为网络传输的字节 |
| | |
| | | net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, |
| | | net_mod_err_t ** _err_arr, int *_err_arr_size, int timeout); |
| | | |
| | | |
| | | void int_set(int data); |
| | | void svr_set(int data); |
| | | int int_get(void); |
| | | int svr_get(void); |
| | | /** |
| | | * 功能同sendandrecv |
| | | * 优点:线程安全 |
| | |
| | | // int sendandrecv_safe(net_node_t *node_arr, int node_arr_len, void *send_buf, int send_size, |
| | | // net_mod_recv_msg_t ** recv_arr, int *recv_arr_size); |
| | | |
| | | |
| | | /** |
| | | * 发送信息 |
| | | * @key 发送给谁 |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int sendto( const void *buf, const int size, const int key); |
| | | int sendto( const void *buf, const int size, const int key, int reset = 0, int data_set = 0); |
| | | // 发送信息超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int sendto_timeout( const void *buf, const int size, const int key, int sec, int nsec); |
| | | int sendto_timeout( const void *buf, const int size, const int key, int sec, int nsec, int reset = 0, int data_set = 0); |
| | | // 发送信息立刻返回。 |
| | | int sendto_nowait( const void *buf, const int size, const int key); |
| | | int sendto_nowait( const void *buf, const int size, const int key, int reset = 0, int data_set = 0); |
| | | |
| | | /** |
| | | * 接收信息 |
| | | * @key 从谁哪里收到的信息 |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int recvfrom( void **buf, int *size, int *key); |
| | | int recvfrom( void **buf, int *size, int *key, int reset = 0, int data_set = 0); |
| | | // 接受信息超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int recvfrom_timeout( void **buf, int *size, int *key, int sec, int nsec); |
| | | int recvfrom_nowait( void **buf, int *size, int *key); |
| | | int recvfrom_timeout( void **buf, int *size, int *key, int sec, int nsec, int reset = 0, int data_set = 0); |
| | | int recvfrom_nowait( void **buf, int *size, int *key, int reset = 0, int data_set = 0); |
| | | |
| | | /** |
| | | * 本地发送请求信息并等待接收应答 |
| | | * @key 发送给谁 |
| | |
| | | * @key 发送给谁 |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int net_mod_socket_sendto(void *_socket, const void *buf, const int size, const int key) { |
| | | int net_mod_socket_sendto(void *_socket, const void *buf, const int size, const int key, int reset, int data_set) { |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | return sockt->sendto(buf, size, key); |
| | | return sockt->sendto(buf, size, key, reset, data_set); |
| | | } |
| | | // 发送信息超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int net_mod_socket_sendto_timeout(void *_socket, const void *buf, const int size, const int key, int sec, int nsec){ |
| | | int net_mod_socket_sendto_timeout(void *_socket, const void *buf, const int size, const int key, int sec, int nsec, int reset, int data_set){ |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | return sockt->sendto_timeout(buf, size, key, sec, nsec); |
| | | return sockt->sendto_timeout(buf, size, key, sec, nsec, reset, data_set); |
| | | // return sockt->sendto(buf, size, key); |
| | | } |
| | | // 发送信息立刻返回。 |
| | | int net_mod_socket_sendto_nowait(void *_socket, const void *buf, const int size, const int key){ |
| | | int net_mod_socket_sendto_nowait(void *_socket, const void *buf, const int size, const int key, int reset, int data_set){ |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | return sockt->sendto_nowait(buf, size, key); |
| | | return sockt->sendto_nowait(buf, size, key, reset, data_set); |
| | | } |
| | | |
| | | /** |
| | |
| | | * @port 从谁哪里收到的信息 |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int net_mod_socket_recvfrom(void *_socket, void **buf, int *size, int *key){ |
| | | int net_mod_socket_recvfrom(void *_socket, void **buf, int *size, int *key, int reset, int data_set){ |
| | | int rv; |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | |
| | | rv = sockt->recvfrom(buf, size, key); |
| | | rv = sockt->recvfrom(buf, size, key, reset, data_set); |
| | | return rv; |
| | | } |
| | | |
| | | // 接受信息超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *key, int sec, int nsec){ |
| | | int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *key, int sec, int nsec, int reset, int data_set){ |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | return sockt->recvfrom_timeout(buf, size, key, sec, nsec); |
| | | return sockt->recvfrom_timeout(buf, size, key, sec, nsec, reset, data_set); |
| | | } |
| | | |
| | | int net_mod_socket_recvfrom_nowait(void *_socket, void **buf, int *size, int *key){ |
| | | int net_mod_socket_recvfrom_nowait(void *_socket, void **buf, int *size, int *key, int reset, int data_set){ |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | return sockt->recvfrom_nowait(buf, size, key); |
| | | return sockt->recvfrom_nowait(buf, size, key, reset, data_set); |
| | | } |
| | | |
| | | int net_mod_socket_sendandrecv(void *_socket, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, |
| | |
| | | return sockt->bind_proc_id(proc_id, len); |
| | | } |
| | | |
| | | void net_mod_socket_int_set(void * _socket, int data) { |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | sockt->int_set(data); |
| | | } |
| | | |
| | | void net_mod_socket_svr_set(void * _socket, int data) { |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | sockt->svr_set(data); |
| | | } |
| | | |
| | | int net_mod_socket_int_get(void * _socket) { |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | return sockt->int_get(); |
| | | } |
| | | |
| | | int net_mod_socket_svr_get(void * _socket) { |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | return sockt->svr_get(); |
| | | } |
| | | |
| | | /** |
| | | * 如果建立连接的节点没有接受到消息等待timeout的时间后返回 |
| | | * @timeout 等待时间,单位是千分之一秒 |
| | |
| | | * |
| | | * @return 0是成功, 其他值是失败的错误码 |
| | | */ |
| | | int net_mod_socket_sendto(void *_socket, const void *buf, const int size, const int key); |
| | | int net_mod_socket_sendto(void *_socket, const void *buf, const int size, const int key, int reset = 0, int data_set = 0); |
| | | |
| | | /** |
| | | * @brief 发送信息,在指定时间内没发送完成也返回。 |
| | |
| | | * |
| | | * @return 0是成功, 其他值是失败的错误码 |
| | | */ |
| | | int net_mod_socket_sendto_timeout(void *_socket, const void *buf, const int size, const int key, int sec, int nsec); |
| | | int net_mod_socket_sendto_timeout(void *_socket, const void *buf, const int size, const int key, int sec, int nsec, int reset = 0, int data_set = 0); |
| | | |
| | | /** |
| | | * @brief 发送信息,无论是否发送完成立刻返回。 |
| | |
| | | * |
| | | * @return 0是成功, 其他值是失败的错误码 |
| | | */ |
| | | int net_mod_socket_sendto_nowait(void *_socket, const void *buf, const int size, const int key); |
| | | int net_mod_socket_sendto_nowait(void *_socket, const void *buf, const int size, const int key, int reset = 0, int data_set = 0); |
| | | |
| | | /** |
| | | * @brief 等待接收信息,直到有消息接受到才返回 |
| | |
| | | * |
| | | * @return 0是成功, 其他值是失败的错误码 |
| | | */ |
| | | int net_mod_socket_recvfrom(void *_socket, void **buf, int *size, int *key); |
| | | int net_mod_socket_recvfrom(void *_socket, void **buf, int *size, int *key, int reset = 0, int data_set = 0); |
| | | |
| | | /** |
| | | * @brief 等待接收信息,在指定的时间内即使没有接受到消息也要返回 |
| | |
| | | * |
| | | * @return 0是成功, 其他值是失败的错误码 |
| | | */ |
| | | int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *key, int sec, int nsec); |
| | | int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *key, int sec, int nsec, int reset = 0, int data_set = 0); |
| | | |
| | | /** |
| | | * @brief 等待接收信息,直到有消息接受到才返回 |
| | |
| | | * |
| | | * @return 0是成功,其他值是失败的错误码 |
| | | */ |
| | | int net_mod_socket_recvfrom_nowait(void *_socket, void **buf, int *size, int *key); |
| | | int net_mod_socket_recvfrom_nowait(void *_socket, void **buf, int *size, int *key, int reset = 0, int data_set = 0); |
| | | |
| | | |
| | | |
| | | void net_mod_socket_int_set(void * _socket, int data); |
| | | void net_mod_socket_svr_set(void * _socket, int data); |
| | | int net_mod_socket_int_get(void * _socket); |
| | | int net_mod_socket_svr_get(void * _socket); |
| | | |
| | | /** |
| | | * @brief 跨机器发送消息并接受返回的应答消息,直到发送完成才返回 |
| | |
| | | char name[MAX_STR_LEN]; |
| | | char public_info[MAX_STR_LEN]; |
| | | char private_info[MAX_STR_LEN]; |
| | | char int_info[MAX_STR_LEN]; |
| | | char svr_info[MAX_STR_LEN]; |
| | | #endif |
| | | } ProcInfo; |
| | | |
| | |
| | | } |
| | | #endif |
| | | |
| | | #define INT_STR 0x01 |
| | | #define SVR_STR 0x02 |
| | | |
| | | #endif //end of file |
| | | |
| | |
| | | int count = 0; |
| | | int i = 0; |
| | | int len = 0; |
| | | int data1, data2; |
| | | char *data_ptr; |
| | | ProcInfo Data_stru; |
| | | ProcZone::iterator proc_iter; |
| | |
| | | |
| | | memcpy(Data_stru.private_info, buf + count, strlen(buf + count) + 1); |
| | | count += strlen(buf + count) + 1; |
| | | |
| | | memcpy(Data_stru.int_info, buf + count, strlen(buf + count) + 1); |
| | | count += strlen(buf + count) + 1; |
| | | |
| | | memcpy(Data_stru.svr_info, buf + count, strlen(buf + count) + 1); |
| | | count += strlen(buf + count) + 1; |
| | | |
| | | } |
| | | |
| | | ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY); |
| | |
| | | |
| | | if ((proc_iter = proc->find(key)) != proc->end()) { |
| | | |
| | | data1 = atoi((proc_iter->second).int_info); |
| | | data2 = atoi((proc_iter->second).svr_info); |
| | | BusServerSocket::_data_remove(data1, data2); |
| | | len = (sizeof(buf_temp) - 1) > strlen((proc_iter->second).proc_id) ? strlen((proc_iter->second).proc_id) : (sizeof(buf_temp) - 1); |
| | | strncpy(buf_temp, (proc_iter->second).proc_id, len); |
| | | proc->erase(proc_iter); |
| | |
| | | |
| | | free(last_buf); |
| | | } else if (flag == PROC_QUE_STCS) { |
| | | |
| | | SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY); |
| | | ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY); |
| | | |
| | | strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size); |
| | | if ((svr_tcs_iter = SvrData->find(buf_temp)) != SvrData->end()) { |
| | |
| | | |
| | | for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) { |
| | | count = *svr_proc_iter; |
| | | if ((proc_iter = proc->find(count)) != proc->end()) { |
| | | count = atoi((proc_iter->second).svr_info); |
| | | } |
| | | |
| | | break; |
| | | } |
| | |
| | | |
| | | return rv; |
| | | } |
| | | |
| | | void BusServerSocket::_data_remove(int val1, int val2) { |
| | | |
| | | int i; |
| | | LockFreeQueue<shm_packet_t> *queue = NULL; |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | |
| | | void *data_ptr1 = hashtable_get(hashtable, val1); |
| | | void *data_ptr2 = hashtable_get(hashtable, val2); |
| | | if (data_ptr1 != NULL) { |
| | | if (data_ptr1 != (void *)1) { |
| | | queue = (LockFreeQueue<shm_packet_t> *)data_ptr1; |
| | | queue->close(); |
| | | for (i = 0; i < queue->size(); i++) { |
| | | mm_free((*queue)[i].buf); |
| | | } |
| | | sleep(1); |
| | | } |
| | | |
| | | hashtable_remove(hashtable, val1); |
| | | } |
| | | |
| | | if (data_ptr2 != NULL) { |
| | | if (data_ptr2 != (void *)1) { |
| | | queue = (LockFreeQueue<shm_packet_t> *)data_ptr2; |
| | | queue->close(); |
| | | for (i = 0; i < queue->size(); i++) { |
| | | mm_free((*queue)[i].buf); |
| | | } |
| | | sleep(1); |
| | | } |
| | | |
| | | hashtable_remove(hashtable, val2); |
| | | } |
| | | |
| | | } |
| | | |
| | |
| | | */ |
| | | int get_key() ; |
| | | |
| | | void _data_remove(int val1, int val2); |
| | | |
| | | }; |
| | | |
| | |
| | | * @key 发送给谁 |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int ShmModSocket::sendto(const void *buf, const int size, const int key, const struct timespec *timeout, int flag) { |
| | | int rv = shm_sendto(shm_socket, buf, size, key, timeout, flag); |
| | | int ShmModSocket::sendto(const void *buf, const int size, const int key, const struct timespec *timeout, int flag, int reset, int data_set) { |
| | | int rv = shm_sendto(shm_socket, buf, size, key, timeout, flag, reset, data_set); |
| | | if(rv == 0) { |
| | | logger->debug("ShmModSocket::sendto: %d sendto %d success.\n", get_key(), key); |
| | | return 0; |
| | |
| | | * @key 从谁哪里收到的信息 |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int ShmModSocket::recvfrom( void **buf, int *size, int *key, const struct timespec *timeout, int flag) { |
| | | int ShmModSocket::recvfrom( void **buf, int *size, int *key, const struct timespec *timeout, int flag, int reset, int data_set) { |
| | | |
| | | int rv = shm_recvfrom(shm_socket, buf, size, key, timeout, flag); |
| | | int rv = shm_recvfrom(shm_socket, buf, size, key, timeout, flag, reset, data_set); |
| | | |
| | | if(rv == 0) { |
| | | logger->debug("ShmModSocket::recvfrom: %d recvfrom %d success.\n", get_key(), *key); |
| | |
| | | |
| | | int bind_proc_id(char *buf, int len); |
| | | int reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag); |
| | | /** |
| | | * 发送信息 |
| | | * @key 发送给谁 |
| | | * @flag BUS_TIMEOUT_FLAG BUS_NOWAIT_FLAG |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | |
| | | int sendto(const void *buf, const int size, const int key, const struct timespec *timeout = NULL, int flag = 0); |
| | | |
| | | |
| | | int sendto(const void *buf, const int size, const int key, const struct timespec *timeout = NULL, int flag = 0, int reset = 0, int data_set = 0); |
| | | |
| | | /** |
| | | * 接收信息 |
| | | * @key 从谁哪里收到的信息 |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int recvfrom(void **buf, int *size, int *key, const struct timespec *timeout = NULL, int flag = 0); |
| | | int recvfrom(void **buf, int *size, int *key, const struct timespec *timeout = NULL, int flag = 0, int reset = 0, int data_set = 0); |
| | | |
| | | /** |
| | | * 发送请求信息并等待接收应答 |
| | | * @key 发送给谁 |
| | |
| | | static void _destrory_threadlocal_socket_(void *tmp_socket); |
| | | static void _create_threadlocal_socket_key_(void); |
| | | |
| | | static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *recvpak , const struct timespec *timeout, int flag); |
| | | static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *recvpak , const struct timespec *timeout, |
| | | int flag, int reset = 0, int data_set = 0); |
| | | |
| | | |
| | | static int shm_sendpakto(shm_socket_t *sockt, shm_packet_t *sendpak, |
| | | const int key, const struct timespec *timeout, const int flag); |
| | | static int shm_sendpakto(shm_socket_t *sockt, shm_packet_t *sendpak, const int key, const struct timespec *timeout, |
| | | const int flag, int reset = 0, int data_set = 0); |
| | | |
| | | |
| | | static int _shm_sendandrecv_uuid(shm_socket_t *sockt, const void *send_buf, |
| | |
| | | } |
| | | |
| | | // 短连接方式发送 |
| | | int shm_sendto(shm_socket_t *sockt, const void *buf, const int size, |
| | | const int key, const struct timespec *timeout, const int flag) { |
| | | int shm_sendto(shm_socket_t *sockt, const void *buf, const int size, const int key, const struct timespec *timeout, |
| | | const int flag, int reset, int data_set) { |
| | | |
| | | int rv; |
| | | |
| | | shm_packet_t sendpak = {0}; |
| | | sendpak.key = sockt->key; |
| | | if (reset == 0) { |
| | | sendpak.key = sockt->key; |
| | | } else { |
| | | sendpak.key = data_set; |
| | | } |
| | | sendpak.size = size; |
| | | if(buf != NULL) { |
| | | sendpak.buf = mm_malloc(size); |
| | | memcpy(sendpak.buf, buf, size); |
| | | } |
| | | |
| | | rv = shm_sendpakto(sockt, &sendpak, key, timeout, flag); |
| | | rv = shm_sendpakto(sockt, &sendpak, key, timeout, flag, reset, data_set); |
| | | return rv; |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | // 短连接方式接受 |
| | | int shm_recvfrom(shm_socket_t *sockt, void **buf, int *size, int *key, const struct timespec *timeout, int flag) { |
| | | int shm_recvfrom(shm_socket_t *sockt, void **buf, int *size, int *key, const struct timespec *timeout, int flag, int reset, int data_set) { |
| | | int rv; |
| | | |
| | | shm_packet_t recvpak; |
| | | rv = shm_recvpakfrom(sockt , &recvpak, timeout, flag); |
| | | rv = shm_recvpakfrom(sockt , &recvpak, timeout, flag, reset, data_set); |
| | | |
| | | if (rv != 0) { |
| | | |
| | |
| | | |
| | | |
| | | |
| | | static int shm_sendpakto(shm_socket_t *sockt, shm_packet_t *sendpak, |
| | | const int key, const struct timespec *timeout, const int flag) { |
| | | static int shm_sendpakto(shm_socket_t *sockt, shm_packet_t *sendpak, const int key, const struct timespec *timeout, |
| | | const int flag, int reset, int data_set) { |
| | | |
| | | int rv; |
| | | shm_queue_status_t stRecord; |
| | | LockFreeQueue<shm_packet_t> *remoteQueue; |
| | | LockFreeQueue<shm_packet_t> *fixedQueue; |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | |
| | | if( sockt->queue != NULL) |
| | | if ((reset != 0) && (data_set == 0)) { |
| | | return EBUS_KEY_INUSED; |
| | | } |
| | | |
| | | if (reset != 0) { |
| | | fixedQueue = shm_socket_attach_queue(data_set); |
| | | } |
| | | |
| | | if (((reset == 0) && (sockt->queue != NULL)) || ((reset != 0) && (fixedQueue != NULL))) |
| | | goto LABEL_PUSH; |
| | | |
| | | // if(hashtable_get_queue_count(hashtable) > QUEUE_COUNT_LIMIT) { |
| | |
| | | if ((rv = pthread_mutex_lock(&(sockt->mutex))) != 0) |
| | | err_exit(rv, "shm_sendto : pthread_mutex_lock"); |
| | | |
| | | if (sockt->queue == NULL) { |
| | | if ((sockt->queue == NULL) && (reset == 0)) { |
| | | if (sockt->key == 0) { |
| | | sockt->key = hashtable_alloc_key(hashtable); |
| | | } |
| | |
| | | // stRecord.createTime = time(NULL); |
| | | // shmQueueStMap->insert({sockt->key, stRecord}); |
| | | |
| | | } |
| | | |
| | | if ((fixedQueue == NULL) && (reset != 0)) { |
| | | fixedQueue = shm_socket_bind_queue(data_set, false); |
| | | if (fixedQueue == NULL ) { |
| | | logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key); |
| | | if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) |
| | | err_exit(rv, "shm_sendto : pthread_mutex_unlock"); |
| | | return EBUS_KEY_INUSED; |
| | | } |
| | | } |
| | | |
| | | if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) |
| | |
| | | goto ERR_CLOSED; |
| | | } |
| | | |
| | | sendpak->key = sockt->key; |
| | | if (reset == 0) { |
| | | sendpak->key = sockt->key; |
| | | } |
| | | rv = remoteQueue->push(*sendpak, timeout, flag); |
| | | |
| | | if(rv != 0) { |
| | |
| | | } |
| | | |
| | | // 短连接方式接受 |
| | | static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *_recvpak , const struct timespec *timeout, int flag) { |
| | | static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *_recvpak , const struct timespec *timeout, |
| | | int flag, int reset, int data_set) { |
| | | int rv; |
| | | shm_queue_status_t stRecord; |
| | | LockFreeQueue<shm_packet_t> *fixedQueue; |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | shm_packet_t recvpak; |
| | | |
| | | if( sockt->queue != NULL) |
| | | if ((reset != 0) && (data_set == 0)) { |
| | | return EBUS_KEY_INUSED; |
| | | } |
| | | |
| | | if (reset != 0) { |
| | | fixedQueue = shm_socket_attach_queue(data_set); |
| | | } |
| | | |
| | | if (((sockt->queue != NULL) && (reset == 0)) || ((reset != 0) && (fixedQueue != NULL))) |
| | | goto LABEL_POP; |
| | | |
| | | // if(hashtable_get_queue_count(hashtable) > QUEUE_COUNT_LIMIT) { |
| | |
| | | if ((rv = pthread_mutex_lock(&(sockt->mutex))) != 0) |
| | | err_exit(rv, "shm_recvfrom : pthread_mutex_lock"); |
| | | |
| | | if (sockt->key == 0) { |
| | | sockt->key = hashtable_alloc_key(hashtable); |
| | | } |
| | | sockt->queue = shm_socket_bind_queue( sockt->key, sockt->force_bind); |
| | | if(sockt->queue == NULL ) { |
| | | logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key); |
| | | if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) |
| | | err_exit(rv, "shm_recvfrom : pthread_mutex_unlock"); |
| | | return EBUS_KEY_INUSED; |
| | | if ((sockt->queue == NULL) && (reset == 0)) { |
| | | if (sockt->key == 0) { |
| | | sockt->key = hashtable_alloc_key(hashtable); |
| | | } |
| | | sockt->queue = shm_socket_bind_queue( sockt->key, sockt->force_bind); |
| | | if(sockt->queue == NULL ) { |
| | | logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key); |
| | | if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) |
| | | err_exit(rv, "shm_recvfrom : pthread_mutex_unlock"); |
| | | return EBUS_KEY_INUSED; |
| | | } |
| | | |
| | | // 标记key对应的状态 ,为opened |
| | | // stRecord.status = SHM_QUEUE_ST_OPENED; |
| | | // stRecord.createTime = time(NULL); |
| | | // shmQueueStMap->insert({sockt->key, stRecord}); |
| | | } |
| | | |
| | | // 标记key对应的状态 ,为opened |
| | | // stRecord.status = SHM_QUEUE_ST_OPENED; |
| | | // stRecord.createTime = time(NULL); |
| | | // shmQueueStMap->insert({sockt->key, stRecord}); |
| | | |
| | | if ((fixedQueue == NULL) && (reset != 0)) { |
| | | fixedQueue = shm_socket_bind_queue(data_set, false); |
| | | if (fixedQueue == NULL ) { |
| | | logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key); |
| | | if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) |
| | | err_exit(rv, "shm_sendto : pthread_mutex_unlock"); |
| | | return EBUS_KEY_INUSED; |
| | | } |
| | | } |
| | | |
| | | if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) |
| | | err_exit(rv, "shm_recvfrom : pthread_mutex_unlock"); |
| | |
| | | |
| | | LABEL_POP: |
| | | |
| | | rv = sockt->queue->pop(recvpak, timeout, flag); |
| | | if (reset == 0) { |
| | | rv = sockt->queue->pop(recvpak, timeout, flag); |
| | | } else { |
| | | rv = fixedQueue->pop(recvpak, timeout, flag); |
| | | } |
| | | if(rv != 0) { |
| | | if(rv == ETIMEDOUT) { |
| | | return EBUS_TIMEOUT; |
| | |
| | | count += strlen(ptr->public_info) + 1; |
| | | memcpy(dst + count, ptr->private_info, strlen(ptr->private_info) + 1); |
| | | count += strlen(ptr->private_info) + 1; |
| | | memcpy(dst + count, ptr->int_info, strlen(ptr->int_info) + 1); |
| | | count += strlen(ptr->int_info) + 1; |
| | | memcpy(dst + count, ptr->svr_info, strlen(ptr->svr_info) + 1); |
| | | count += strlen(ptr->svr_info) + 1; |
| | | |
| | | *counter = count; |
| | | } |
| | |
| | | /** |
| | | * @flags : BUS_NOWAIT_FLAG |
| | | */ |
| | | int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int key, const struct timespec * timeout = NULL, const int flags=0); |
| | | int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int key, const struct timespec * timeout = NULL, const int flags=0, int reset = 0, int data_set = 0); |
| | | |
| | | int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *key, const struct timespec * timeout = NULL, int flags=0); |
| | | int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *key, const struct timespec * timeout = NULL, int flags=0, int reset = 0, int data_set = 0); |
| | | |
| | | int shm_sendandrecv(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, |
| | | const struct timespec * timeout = NULL, int flags = 0); |