From dc01e4cbb01e96d19b470a366bbe648d426ed171 Mon Sep 17 00:00:00 2001 From: fujuntang <fujuntang@smartai.com> Date: 星期六, 11 九月 2021 10:06:15 +0800 Subject: [PATCH] Add topics sub and request support. --- src/key_def.h | 1 src/socket/shm_socket.h | 4 src/net/net_mod_socket.cpp | 40 ++++- src/socket/shm_socket.cpp | 118 ++++++++++++---- src/socket/shm_mod_socket.h | 19 -- src/socket/bus_server_socket.cpp | 53 +++++++ src/net/net_mod_socket_wrapper.cpp | 44 ++++- src/net/net_mod_socket_wrapper.h | 18 +- src/net/net_mod_socket.h | 22 +- src/socket/bus_server_socket.h | 1 src/bh_api.cpp | 32 +++- src/proc_def.h | 4 src/socket/shm_mod_socket.cpp | 8 13 files changed, 262 insertions(+), 102 deletions(-) diff --git a/src/bh_api.cpp b/src/bh_api.cpp index 4f623f8..d25bf8a 100644 --- a/src/bh_api.cpp +++ b/src/bh_api.cpp @@ -139,6 +139,12 @@ gNetmod_socket = net_mod_socket_open(); hashtable_t *hashtable = mm_get_hashtable(); key = hashtable_alloc_key(hashtable); + count = hashtable_alloc_key(hashtable); + rv = hashtable_alloc_key(hashtable); + net_mod_socket_int_set(gNetmod_socket, count); + net_mod_socket_svr_set(gNetmod_socket, rv); + sprintf(pData.int_info, "%d", count); + sprintf(pData.svr_info, "%d", rv); net_mod_socket_bind(gNetmod_socket, key); rv = net_mod_socket_reg(gNetmod_socket, &pData, sizeof(ProcInfo), NULL, 0, timeout_ms, PROC_REG); @@ -932,8 +938,8 @@ ::bhome_msg::MsgCommonReply mcr; mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv)); mcr.mutable_errmsg()->set_errstring(errString); - *reply_len=mcr.ByteSizeLong(); - *reply=malloc(*reply_len); + *reply_len = mcr.ByteSizeLong(); + *reply = malloc(*reply_len); mcr.SerializePartialToArray(*reply,*reply_len); #endif @@ -1207,6 +1213,7 @@ int val; int len; int min; + int data; int sec, nsec; std::string MsgID; int timeout_ms = 3000; @@ -1309,20 +1316,21 @@ strncpy(topics_buf + strlen(buf_temp) + 1, _input1.data, strlen(_input1.data)); #endif + data = net_mod_socket_svr_get(gNetmod_socket); if (timeout_ms > 0) { sec = timeout_ms / 1000; nsec = (timeout_ms - sec * 1000) * 1000 * 1000; - rv = net_mod_socket_sendto_timeout(gNetmod_socket, topics_buf, len, val, sec, nsec); + rv = net_mod_socket_sendto_timeout(gNetmod_socket, topics_buf, len, val, sec, nsec, SVR_STR, data); } else if (timeout_ms == 0) { - rv = net_mod_socket_sendto_nowait(gNetmod_socket, topics_buf, len, val); + rv = net_mod_socket_sendto_nowait(gNetmod_socket, topics_buf, len, val, SVR_STR, data); } else { - rv = net_mod_socket_sendto(gNetmod_socket, topics_buf, len, val); + rv = net_mod_socket_sendto(gNetmod_socket, topics_buf, len, val, SVR_STR, data); } free(topics_buf); @@ -1377,6 +1385,7 @@ int size; int val; int min, len; + int data; net_node_t node; int node_size; int recv_arr_size; @@ -1469,6 +1478,7 @@ len += strlen(_input1.data); #endif + data = net_mod_socket_svr_get(gNetmod_socket); topics_buf = (char *)malloc(len); if (topics_buf == NULL) { @@ -1620,6 +1630,7 @@ int key; int size; int len; + int data; int sec, nsec; char buf_temp[MAX_STR_LEN] = { 0x00 }; char *topics_buf = NULL; @@ -1642,20 +1653,21 @@ return false; } + data = net_mod_socket_svr_get(gNetmod_socket); if (timeout_ms > 0) { sec = timeout_ms / 1000; nsec = (timeout_ms - sec * 1000) * 1000 * 1000; - rv = net_mod_socket_recvfrom_timeout(gNetmod_socket, &buf, &size, &key, sec, nsec); + rv = net_mod_socket_recvfrom_timeout(gNetmod_socket, &buf, &size, &key, sec, nsec, SVR_STR, data); } else if (timeout_ms == 0) { - rv = net_mod_socket_recvfrom_nowait(gNetmod_socket, &buf, &size, &key); + rv = net_mod_socket_recvfrom_nowait(gNetmod_socket, &buf, &size, &key, SVR_STR, data); } else { - rv = net_mod_socket_recvfrom(gNetmod_socket, &buf, &size, &key); + rv = net_mod_socket_recvfrom(gNetmod_socket, &buf, &size, &key, SVR_STR, data); } if (rv == 0) { @@ -1735,6 +1747,7 @@ int BHSendReply(void *src, const void *reply, const int reply_len) { int rv; + int data; const char *_input; #if defined(PRO_DE_SERIALIZE) @@ -1777,7 +1790,8 @@ rv = pthread_mutex_trylock(&mutex); if (rv == 0) { - rv = net_mod_socket_sendto(gNetmod_socket, _input, strlen(_input), *(int *)src); + data = net_mod_socket_svr_get(gNetmod_socket); + rv = net_mod_socket_sendto(gNetmod_socket, _input, strlen(_input), *(int *)src, SVR_STR, data); memset(errString, 0x00, sizeof(errString)); strncpy(errString, bus_strerror(rv), sizeof(errString)); diff --git a/src/key_def.h b/src/key_def.h index fdeee2e..a25ee07 100644 --- a/src/key_def.h +++ b/src/key_def.h @@ -9,6 +9,7 @@ // BUS key #define SHM_BUS_KEY 8 +#define SHM_BUS_INT_KEY 9 // 缃戠粶浠g悊key #define SHM_NET_PROXY_KEY 99 diff --git a/src/net/net_mod_socket.cpp b/src/net/net_mod_socket.cpp index ab065eb..2f5ce73 100644 --- a/src/net/net_mod_socket.cpp +++ b/src/net/net_mod_socket.cpp @@ -55,6 +55,22 @@ return shmModSocket.reg(pData, len, buf, size, timeout_ms, flag); } +void NetModSocket::int_set(int data) { + int_val = data; +} + +void NetModSocket::svr_set(int data) { + svr_val = data; +} + +int NetModSocket::int_get(void) { + return int_val; +} + +int NetModSocket::svr_get(void) { + return svr_val; +} + // int NetModSocket::sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, // net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) { // return _sendandrecv_(node_arr, arrlen, send_buf,send_size, recv_arr, recv_arr_size, -1); @@ -493,20 +509,20 @@ * @key 鍙戦�佺粰璋� * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 */ -int NetModSocket::sendto(const void *buf, const int size, const int key){ - return shmModSocket.sendto(buf, size, key); +int NetModSocket::sendto(const void *buf, const int size, const int key, int reset, int data_set){ + return shmModSocket.sendto(buf, size, key, 0, 0, reset, data_set); } // 鍙戦�佷俊鎭秴鏃惰繑鍥炪�� @sec 绉� 锛� @nsec 绾崇 -int NetModSocket::sendto_timeout(const void *buf, const int size, const int key, int sec, int nsec){ +int NetModSocket::sendto_timeout(const void *buf, const int size, const int key, int sec, int nsec, int reset, int data_set){ struct timespec timeout = {sec, nsec}; - return shmModSocket.sendto(buf, size, key, &timeout, BUS_TIMEOUT_FLAG); + return shmModSocket.sendto(buf, size, key, &timeout, BUS_TIMEOUT_FLAG, reset, data_set); } // 鍙戦�佷俊鎭珛鍒昏繑鍥炪�� -int NetModSocket::sendto_nowait(const void *buf, const int size, const int key){ - return shmModSocket.sendto(buf, size, key, NULL, BUS_NOWAIT_FLAG); +int NetModSocket::sendto_nowait(const void *buf, const int size, const int key, int reset, int data_set){ + return shmModSocket.sendto(buf, size, key, NULL, BUS_NOWAIT_FLAG, reset, data_set); } @@ -515,21 +531,21 @@ * @key 浠庤皝鍝噷鏀跺埌鐨勪俊鎭� * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 */ -int NetModSocket::recvfrom(void **buf, int *size, int *key) { +int NetModSocket::recvfrom(void **buf, int *size, int *key, int reset, int data_set) { - return shmModSocket.recvfrom(buf, size, key); + return shmModSocket.recvfrom(buf, size, key, 0, 0, reset, data_set); } // 鎺ュ彈淇℃伅瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int NetModSocket::recvfrom_timeout(void **buf, int *size, int *key, int sec, int nsec){ +int NetModSocket::recvfrom_timeout(void **buf, int *size, int *key, int sec, int nsec, int reset, int data_set){ struct timespec timeout = {sec, nsec}; - return shmModSocket.recvfrom(buf, size, key, &timeout, BUS_TIMEOUT_FLAG); + return shmModSocket.recvfrom(buf, size, key, &timeout, BUS_TIMEOUT_FLAG, reset, data_set); } -int NetModSocket::recvfrom_nowait(void **buf, int *size, int *key){ - return shmModSocket.recvfrom(buf, size, key, NULL, BUS_NOWAIT_FLAG); +int NetModSocket::recvfrom_nowait(void **buf, int *size, int *key, int reset, int data_set){ + return shmModSocket.recvfrom(buf, size, key, NULL, BUS_NOWAIT_FLAG, reset, data_set); } int NetModSocket::recvandsend(recvandsend_callback_fn callback, diff --git a/src/net/net_mod_socket.h b/src/net/net_mod_socket.h index d8e53ae..9d9af97 100644 --- a/src/net/net_mod_socket.h +++ b/src/net/net_mod_socket.h @@ -71,7 +71,8 @@ private: ShmModSocket shmModSocket; - + int int_val; + int svr_val; // pthread_mutex_t sendMutex; // request header 缂栫爜涓虹綉缁滀紶杈撶殑瀛楄妭 @@ -136,7 +137,10 @@ net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, net_mod_err_t ** _err_arr, int *_err_arr_size, int timeout); - + void int_set(int data); + void svr_set(int data); + int int_get(void); + int svr_get(void); /** * 鍔熻兘鍚宻endandrecv * 浼樼偣锛氱嚎绋嬪畨鍏� @@ -146,27 +150,27 @@ // int sendandrecv_safe(net_node_t *node_arr, int node_arr_len, void *send_buf, int send_size, // net_mod_recv_msg_t ** recv_arr, int *recv_arr_size); - /** * 鍙戦�佷俊鎭� * @key 鍙戦�佺粰璋� * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 */ - int sendto( const void *buf, const int size, const int key); + int sendto( const void *buf, const int size, const int key, int reset = 0, int data_set = 0); // 鍙戦�佷俊鎭秴鏃惰繑鍥炪�� @sec 绉� 锛� @nsec 绾崇 - int sendto_timeout( const void *buf, const int size, const int key, int sec, int nsec); + int sendto_timeout( const void *buf, const int size, const int key, int sec, int nsec, int reset = 0, int data_set = 0); // 鍙戦�佷俊鎭珛鍒昏繑鍥炪�� - int sendto_nowait( const void *buf, const int size, const int key); + int sendto_nowait( const void *buf, const int size, const int key, int reset = 0, int data_set = 0); /** * 鎺ユ敹淇℃伅 * @key 浠庤皝鍝噷鏀跺埌鐨勪俊鎭� * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 */ - int recvfrom( void **buf, int *size, int *key); + int recvfrom( void **buf, int *size, int *key, int reset = 0, int data_set = 0); // 鎺ュ彈淇℃伅瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 - int recvfrom_timeout( void **buf, int *size, int *key, int sec, int nsec); - int recvfrom_nowait( void **buf, int *size, int *key); + int recvfrom_timeout( void **buf, int *size, int *key, int sec, int nsec, int reset = 0, int data_set = 0); + int recvfrom_nowait( void **buf, int *size, int *key, int reset = 0, int data_set = 0); + /** * 鏈湴鍙戦�佽姹備俊鎭苟绛夊緟鎺ユ敹搴旂瓟 * @key 鍙戦�佺粰璋� diff --git a/src/net/net_mod_socket_wrapper.cpp b/src/net/net_mod_socket_wrapper.cpp index ab4d59d..5233635 100644 --- a/src/net/net_mod_socket_wrapper.cpp +++ b/src/net/net_mod_socket_wrapper.cpp @@ -57,20 +57,20 @@ * @key 鍙戦�佺粰璋� * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 */ -int net_mod_socket_sendto(void *_socket, const void *buf, const int size, const int key) { +int net_mod_socket_sendto(void *_socket, const void *buf, const int size, const int key, int reset, int data_set) { NetModSocket *sockt = (NetModSocket *)_socket; - return sockt->sendto(buf, size, key); + return sockt->sendto(buf, size, key, reset, data_set); } // 鍙戦�佷俊鎭秴鏃惰繑鍥炪�� @sec 绉� 锛� @nsec 绾崇 -int net_mod_socket_sendto_timeout(void *_socket, const void *buf, const int size, const int key, int sec, int nsec){ +int net_mod_socket_sendto_timeout(void *_socket, const void *buf, const int size, const int key, int sec, int nsec, int reset, int data_set){ NetModSocket *sockt = (NetModSocket *)_socket; - return sockt->sendto_timeout(buf, size, key, sec, nsec); + return sockt->sendto_timeout(buf, size, key, sec, nsec, reset, data_set); // return sockt->sendto(buf, size, key); } // 鍙戦�佷俊鎭珛鍒昏繑鍥炪�� -int net_mod_socket_sendto_nowait(void *_socket, const void *buf, const int size, const int key){ +int net_mod_socket_sendto_nowait(void *_socket, const void *buf, const int size, const int key, int reset, int data_set){ NetModSocket *sockt = (NetModSocket *)_socket; - return sockt->sendto_nowait(buf, size, key); + return sockt->sendto_nowait(buf, size, key, reset, data_set); } /** @@ -78,23 +78,23 @@ * @port 浠庤皝鍝噷鏀跺埌鐨勪俊鎭� * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 */ -int net_mod_socket_recvfrom(void *_socket, void **buf, int *size, int *key){ +int net_mod_socket_recvfrom(void *_socket, void **buf, int *size, int *key, int reset, int data_set){ int rv; NetModSocket *sockt = (NetModSocket *)_socket; - rv = sockt->recvfrom(buf, size, key); + rv = sockt->recvfrom(buf, size, key, reset, data_set); return rv; } // 鎺ュ彈淇℃伅瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *key, int sec, int nsec){ +int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *key, int sec, int nsec, int reset, int data_set){ NetModSocket *sockt = (NetModSocket *)_socket; - return sockt->recvfrom_timeout(buf, size, key, sec, nsec); + return sockt->recvfrom_timeout(buf, size, key, sec, nsec, reset, data_set); } -int net_mod_socket_recvfrom_nowait(void *_socket, void **buf, int *size, int *key){ +int net_mod_socket_recvfrom_nowait(void *_socket, void **buf, int *size, int *key, int reset, int data_set){ NetModSocket *sockt = (NetModSocket *)_socket; - return sockt->recvfrom_nowait(buf, size, key); + return sockt->recvfrom_nowait(buf, size, key, reset, data_set); } int net_mod_socket_sendandrecv(void *_socket, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, @@ -108,6 +108,26 @@ return sockt->bind_proc_id(proc_id, len); } +void net_mod_socket_int_set(void * _socket, int data) { + NetModSocket *sockt = (NetModSocket *)_socket; + sockt->int_set(data); +} + +void net_mod_socket_svr_set(void * _socket, int data) { + NetModSocket *sockt = (NetModSocket *)_socket; + sockt->svr_set(data); +} + +int net_mod_socket_int_get(void * _socket) { + NetModSocket *sockt = (NetModSocket *)_socket; + return sockt->int_get(); +} + +int net_mod_socket_svr_get(void * _socket) { + NetModSocket *sockt = (NetModSocket *)_socket; + return sockt->svr_get(); +} + /** * 濡傛灉寤虹珛杩炴帴鐨勮妭鐐规病鏈夋帴鍙楀埌娑堟伅绛夊緟timeout鐨勬椂闂村悗杩斿洖 * @timeout 绛夊緟鏃堕棿锛屽崟浣嶆槸鍗冨垎涔嬩竴绉� diff --git a/src/net/net_mod_socket_wrapper.h b/src/net/net_mod_socket_wrapper.h index b869510..d68a23e 100644 --- a/src/net/net_mod_socket_wrapper.h +++ b/src/net/net_mod_socket_wrapper.h @@ -67,7 +67,7 @@ * * @return 0鏄垚鍔燂紝 鍏朵粬鍊兼槸澶辫触鐨勯敊璇爜 */ -int net_mod_socket_sendto(void *_socket, const void *buf, const int size, const int key); +int net_mod_socket_sendto(void *_socket, const void *buf, const int size, const int key, int reset = 0, int data_set = 0); /** * @brief 鍙戦�佷俊鎭紝鍦ㄦ寚瀹氭椂闂村唴娌″彂閫佸畬鎴愪篃杩斿洖銆� @@ -80,7 +80,7 @@ * * @return 0鏄垚鍔燂紝 鍏朵粬鍊兼槸澶辫触鐨勯敊璇爜 */ -int net_mod_socket_sendto_timeout(void *_socket, const void *buf, const int size, const int key, int sec, int nsec); +int net_mod_socket_sendto_timeout(void *_socket, const void *buf, const int size, const int key, int sec, int nsec, int reset = 0, int data_set = 0); /** * @brief 鍙戦�佷俊鎭紝鏃犺鏄惁鍙戦�佸畬鎴愮珛鍒昏繑鍥炪�� @@ -91,7 +91,7 @@ * * @return 0鏄垚鍔燂紝 鍏朵粬鍊兼槸澶辫触鐨勯敊璇爜 */ -int net_mod_socket_sendto_nowait(void *_socket, const void *buf, const int size, const int key); +int net_mod_socket_sendto_nowait(void *_socket, const void *buf, const int size, const int key, int reset = 0, int data_set = 0); /** * @brief 绛夊緟鎺ユ敹淇℃伅锛岀洿鍒版湁娑堟伅鎺ュ彈鍒版墠杩斿洖 @@ -102,7 +102,7 @@ * * @return 0鏄垚鍔燂紝 鍏朵粬鍊兼槸澶辫触鐨勯敊璇爜 */ -int net_mod_socket_recvfrom(void *_socket, void **buf, int *size, int *key); +int net_mod_socket_recvfrom(void *_socket, void **buf, int *size, int *key, int reset = 0, int data_set = 0); /** * @brief 绛夊緟鎺ユ敹淇℃伅锛屽湪鎸囧畾鐨勬椂闂村唴鍗充娇娌℃湁鎺ュ彈鍒版秷鎭篃瑕佽繑鍥� @@ -115,7 +115,7 @@ * * @return 0鏄垚鍔燂紝 鍏朵粬鍊兼槸澶辫触鐨勯敊璇爜 */ -int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *key, int sec, int nsec); +int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *key, int sec, int nsec, int reset = 0, int data_set = 0); /** * @brief 绛夊緟鎺ユ敹淇℃伅锛岀洿鍒版湁娑堟伅鎺ュ彈鍒版墠杩斿洖 @@ -126,10 +126,12 @@ * * @return 0鏄垚鍔燂紝鍏朵粬鍊兼槸澶辫触鐨勯敊璇爜 */ -int net_mod_socket_recvfrom_nowait(void *_socket, void **buf, int *size, int *key); +int net_mod_socket_recvfrom_nowait(void *_socket, void **buf, int *size, int *key, int reset = 0, int data_set = 0); - - +void net_mod_socket_int_set(void * _socket, int data); +void net_mod_socket_svr_set(void * _socket, int data); +int net_mod_socket_int_get(void * _socket); +int net_mod_socket_svr_get(void * _socket); /** * @brief 璺ㄦ満鍣ㄥ彂閫佹秷鎭苟鎺ュ彈杩斿洖鐨勫簲绛旀秷鎭紝鐩村埌鍙戦�佸畬鎴愭墠杩斿洖 diff --git a/src/proc_def.h b/src/proc_def.h index 1d84362..2b3f57b 100644 --- a/src/proc_def.h +++ b/src/proc_def.h @@ -35,6 +35,8 @@ char name[MAX_STR_LEN]; char public_info[MAX_STR_LEN]; char private_info[MAX_STR_LEN]; + char int_info[MAX_STR_LEN]; + char svr_info[MAX_STR_LEN]; #endif } ProcInfo; @@ -65,6 +67,8 @@ } #endif +#define INT_STR 0x01 +#define SVR_STR 0x02 #endif //end of file diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp index 8b022e1..d5e757d 100644 --- a/src/socket/bus_server_socket.cpp +++ b/src/socket/bus_server_socket.cpp @@ -302,6 +302,7 @@ int count = 0; int i = 0; int len = 0; + int data1, data2; char *data_ptr; ProcInfo Data_stru; ProcZone::iterator proc_iter; @@ -333,6 +334,13 @@ memcpy(Data_stru.private_info, buf + count, strlen(buf + count) + 1); count += strlen(buf + count) + 1; + + memcpy(Data_stru.int_info, buf + count, strlen(buf + count) + 1); + count += strlen(buf + count) + 1; + + memcpy(Data_stru.svr_info, buf + count, strlen(buf + count) + 1); + count += strlen(buf + count) + 1; + } ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY); @@ -362,6 +370,9 @@ if ((proc_iter = proc->find(key)) != proc->end()) { + data1 = atoi((proc_iter->second).int_info); + data2 = atoi((proc_iter->second).svr_info); + BusServerSocket::_data_remove(data1, data2); len = (sizeof(buf_temp) - 1) > strlen((proc_iter->second).proc_id) ? strlen((proc_iter->second).proc_id) : (sizeof(buf_temp) - 1); strncpy(buf_temp, (proc_iter->second).proc_id, len); proc->erase(proc_iter); @@ -504,7 +515,9 @@ free(last_buf); } else if (flag == PROC_QUE_STCS) { + SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY); + ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY); strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size); if ((svr_tcs_iter = SvrData->find(buf_temp)) != SvrData->end()) { @@ -512,6 +525,9 @@ for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) { count = *svr_proc_iter; + if ((proc_iter = proc->find(count)) != proc->end()) { + count = atoi((proc_iter->second).svr_info); + } break; } @@ -770,3 +786,40 @@ return rv; } + +void BusServerSocket::_data_remove(int val1, int val2) { + + int i; + LockFreeQueue<shm_packet_t> *queue = NULL; + hashtable_t *hashtable = mm_get_hashtable(); + + void *data_ptr1 = hashtable_get(hashtable, val1); + void *data_ptr2 = hashtable_get(hashtable, val2); + if (data_ptr1 != NULL) { + if (data_ptr1 != (void *)1) { + queue = (LockFreeQueue<shm_packet_t> *)data_ptr1; + queue->close(); + for (i = 0; i < queue->size(); i++) { + mm_free((*queue)[i].buf); + } + sleep(1); + } + + hashtable_remove(hashtable, val1); + } + + if (data_ptr2 != NULL) { + if (data_ptr2 != (void *)1) { + queue = (LockFreeQueue<shm_packet_t> *)data_ptr2; + queue->close(); + for (i = 0; i < queue->size(); i++) { + mm_free((*queue)[i].buf); + } + sleep(1); + } + + hashtable_remove(hashtable, val2); + } + +} + diff --git a/src/socket/bus_server_socket.h b/src/socket/bus_server_socket.h index 3052c8b..e60c700 100644 --- a/src/socket/bus_server_socket.h +++ b/src/socket/bus_server_socket.h @@ -81,6 +81,7 @@ */ int get_key() ; + void _data_remove(int val1, int val2); }; diff --git a/src/socket/shm_mod_socket.cpp b/src/socket/shm_mod_socket.cpp index a94b9c3..7562d56 100644 --- a/src/socket/shm_mod_socket.cpp +++ b/src/socket/shm_mod_socket.cpp @@ -166,8 +166,8 @@ * @key 鍙戦�佺粰璋� * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 */ -int ShmModSocket::sendto(const void *buf, const int size, const int key, const struct timespec *timeout, int flag) { - int rv = shm_sendto(shm_socket, buf, size, key, timeout, flag); +int ShmModSocket::sendto(const void *buf, const int size, const int key, const struct timespec *timeout, int flag, int reset, int data_set) { + int rv = shm_sendto(shm_socket, buf, size, key, timeout, flag, reset, data_set); if(rv == 0) { logger->debug("ShmModSocket::sendto: %d sendto %d success.\n", get_key(), key); return 0; @@ -182,9 +182,9 @@ * @key 浠庤皝鍝噷鏀跺埌鐨勪俊鎭� * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 */ -int ShmModSocket::recvfrom( void **buf, int *size, int *key, const struct timespec *timeout, int flag) { +int ShmModSocket::recvfrom( void **buf, int *size, int *key, const struct timespec *timeout, int flag, int reset, int data_set) { - int rv = shm_recvfrom(shm_socket, buf, size, key, timeout, flag); + int rv = shm_recvfrom(shm_socket, buf, size, key, timeout, flag, reset, data_set); if(rv == 0) { logger->debug("ShmModSocket::recvfrom: %d recvfrom %d success.\n", get_key(), *key); diff --git a/src/socket/shm_mod_socket.h b/src/socket/shm_mod_socket.h index da02fab..5e234bf 100644 --- a/src/socket/shm_mod_socket.h +++ b/src/socket/shm_mod_socket.h @@ -64,22 +64,11 @@ int bind_proc_id(char *buf, int len); int reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag); - /** - * 鍙戦�佷俊鎭� - * @key 鍙戦�佺粰璋� - * @flag BUS_TIMEOUT_FLAG BUS_NOWAIT_FLAG - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 - */ - - int sendto(const void *buf, const int size, const int key, const struct timespec *timeout = NULL, int flag = 0); - + + int sendto(const void *buf, const int size, const int key, const struct timespec *timeout = NULL, int flag = 0, int reset = 0, int data_set = 0); - /** - * 鎺ユ敹淇℃伅 - * @key 浠庤皝鍝噷鏀跺埌鐨勪俊鎭� - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 - */ - int recvfrom(void **buf, int *size, int *key, const struct timespec *timeout = NULL, int flag = 0); + int recvfrom(void **buf, int *size, int *key, const struct timespec *timeout = NULL, int flag = 0, int reset = 0, int data_set = 0); + /** * 鍙戦�佽姹備俊鎭苟绛夊緟鎺ユ敹搴旂瓟 * @key 鍙戦�佺粰璋� diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp index 84bf77e..dc6d752 100644 --- a/src/socket/shm_socket.cpp +++ b/src/socket/shm_socket.cpp @@ -23,11 +23,12 @@ static void _destrory_threadlocal_socket_(void *tmp_socket); static void _create_threadlocal_socket_key_(void); -static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *recvpak , const struct timespec *timeout, int flag); +static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *recvpak , const struct timespec *timeout, + int flag, int reset = 0, int data_set = 0); -static int shm_sendpakto(shm_socket_t *sockt, shm_packet_t *sendpak, - const int key, const struct timespec *timeout, const int flag); +static int shm_sendpakto(shm_socket_t *sockt, shm_packet_t *sendpak, const int key, const struct timespec *timeout, + const int flag, int reset = 0, int data_set = 0); static int _shm_sendandrecv_uuid(shm_socket_t *sockt, const void *send_buf, @@ -183,20 +184,24 @@ } // 鐭繛鎺ユ柟寮忓彂閫� -int shm_sendto(shm_socket_t *sockt, const void *buf, const int size, - const int key, const struct timespec *timeout, const int flag) { +int shm_sendto(shm_socket_t *sockt, const void *buf, const int size, const int key, const struct timespec *timeout, + const int flag, int reset, int data_set) { int rv; shm_packet_t sendpak = {0}; - sendpak.key = sockt->key; + if (reset == 0) { + sendpak.key = sockt->key; + } else { + sendpak.key = data_set; + } sendpak.size = size; if(buf != NULL) { sendpak.buf = mm_malloc(size); memcpy(sendpak.buf, buf, size); } - rv = shm_sendpakto(sockt, &sendpak, key, timeout, flag); + rv = shm_sendpakto(sockt, &sendpak, key, timeout, flag, reset, data_set); return rv; } @@ -262,11 +267,11 @@ } // 鐭繛鎺ユ柟寮忔帴鍙� -int shm_recvfrom(shm_socket_t *sockt, void **buf, int *size, int *key, const struct timespec *timeout, int flag) { +int shm_recvfrom(shm_socket_t *sockt, void **buf, int *size, int *key, const struct timespec *timeout, int flag, int reset, int data_set) { int rv; shm_packet_t recvpak; - rv = shm_recvpakfrom(sockt , &recvpak, timeout, flag); + rv = shm_recvpakfrom(sockt , &recvpak, timeout, flag, reset, data_set); if (rv != 0) { @@ -544,15 +549,24 @@ -static int shm_sendpakto(shm_socket_t *sockt, shm_packet_t *sendpak, - const int key, const struct timespec *timeout, const int flag) { +static int shm_sendpakto(shm_socket_t *sockt, shm_packet_t *sendpak, const int key, const struct timespec *timeout, + const int flag, int reset, int data_set) { int rv; shm_queue_status_t stRecord; LockFreeQueue<shm_packet_t> *remoteQueue; + LockFreeQueue<shm_packet_t> *fixedQueue; hashtable_t *hashtable = mm_get_hashtable(); - if( sockt->queue != NULL) + if ((reset != 0) && (data_set == 0)) { + return EBUS_KEY_INUSED; + } + + if (reset != 0) { + fixedQueue = shm_socket_attach_queue(data_set); + } + + if (((reset == 0) && (sockt->queue != NULL)) || ((reset != 0) && (fixedQueue != NULL))) goto LABEL_PUSH; // if(hashtable_get_queue_count(hashtable) > QUEUE_COUNT_LIMIT) { @@ -563,7 +577,7 @@ if ((rv = pthread_mutex_lock(&(sockt->mutex))) != 0) err_exit(rv, "shm_sendto : pthread_mutex_lock"); - if (sockt->queue == NULL) { + if ((sockt->queue == NULL) && (reset == 0)) { if (sockt->key == 0) { sockt->key = hashtable_alloc_key(hashtable); } @@ -580,6 +594,16 @@ // stRecord.createTime = time(NULL); // shmQueueStMap->insert({sockt->key, stRecord}); + } + + if ((fixedQueue == NULL) && (reset != 0)) { + fixedQueue = shm_socket_bind_queue(data_set, false); + if (fixedQueue == NULL ) { + logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key); + if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) + err_exit(rv, "shm_sendto : pthread_mutex_unlock"); + return EBUS_KEY_INUSED; + } } if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) @@ -611,7 +635,9 @@ goto ERR_CLOSED; } - sendpak->key = sockt->key; + if (reset == 0) { + sendpak->key = sockt->key; + } rv = remoteQueue->push(*sendpak, timeout, flag); if(rv != 0) { @@ -629,13 +655,23 @@ } // 鐭繛鎺ユ柟寮忔帴鍙� -static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *_recvpak , const struct timespec *timeout, int flag) { +static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *_recvpak , const struct timespec *timeout, + int flag, int reset, int data_set) { int rv; shm_queue_status_t stRecord; + LockFreeQueue<shm_packet_t> *fixedQueue; hashtable_t *hashtable = mm_get_hashtable(); shm_packet_t recvpak; - if( sockt->queue != NULL) + if ((reset != 0) && (data_set == 0)) { + return EBUS_KEY_INUSED; + } + + if (reset != 0) { + fixedQueue = shm_socket_attach_queue(data_set); + } + + if (((sockt->queue != NULL) && (reset == 0)) || ((reset != 0) && (fixedQueue != NULL))) goto LABEL_POP; // if(hashtable_get_queue_count(hashtable) > QUEUE_COUNT_LIMIT) { @@ -646,21 +682,33 @@ if ((rv = pthread_mutex_lock(&(sockt->mutex))) != 0) err_exit(rv, "shm_recvfrom : pthread_mutex_lock"); - if (sockt->key == 0) { - sockt->key = hashtable_alloc_key(hashtable); - } - sockt->queue = shm_socket_bind_queue( sockt->key, sockt->force_bind); - if(sockt->queue == NULL ) { - logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key); - if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) - err_exit(rv, "shm_recvfrom : pthread_mutex_unlock"); - return EBUS_KEY_INUSED; + if ((sockt->queue == NULL) && (reset == 0)) { + if (sockt->key == 0) { + sockt->key = hashtable_alloc_key(hashtable); + } + sockt->queue = shm_socket_bind_queue( sockt->key, sockt->force_bind); + if(sockt->queue == NULL ) { + logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key); + if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) + err_exit(rv, "shm_recvfrom : pthread_mutex_unlock"); + return EBUS_KEY_INUSED; + } + + // 鏍囪key瀵瑰簲鐨勭姸鎬� 锛屼负opened + // stRecord.status = SHM_QUEUE_ST_OPENED; + // stRecord.createTime = time(NULL); + // shmQueueStMap->insert({sockt->key, stRecord}); } - - // 鏍囪key瀵瑰簲鐨勭姸鎬� 锛屼负opened - // stRecord.status = SHM_QUEUE_ST_OPENED; - // stRecord.createTime = time(NULL); - // shmQueueStMap->insert({sockt->key, stRecord}); + + if ((fixedQueue == NULL) && (reset != 0)) { + fixedQueue = shm_socket_bind_queue(data_set, false); + if (fixedQueue == NULL ) { + logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key); + if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) + err_exit(rv, "shm_sendto : pthread_mutex_unlock"); + return EBUS_KEY_INUSED; + } + } if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) err_exit(rv, "shm_recvfrom : pthread_mutex_unlock"); @@ -669,7 +717,11 @@ LABEL_POP: - rv = sockt->queue->pop(recvpak, timeout, flag); + if (reset == 0) { + rv = sockt->queue->pop(recvpak, timeout, flag); + } else { + rv = fixedQueue->pop(recvpak, timeout, flag); + } if(rv != 0) { if(rv == ETIMEDOUT) { return EBUS_TIMEOUT; @@ -697,6 +749,10 @@ count += strlen(ptr->public_info) + 1; memcpy(dst + count, ptr->private_info, strlen(ptr->private_info) + 1); count += strlen(ptr->private_info) + 1; + memcpy(dst + count, ptr->int_info, strlen(ptr->int_info) + 1); + count += strlen(ptr->int_info) + 1; + memcpy(dst + count, ptr->svr_info, strlen(ptr->svr_info) + 1); + count += strlen(ptr->svr_info) + 1; *counter = count; } diff --git a/src/socket/shm_socket.h b/src/socket/shm_socket.h index 8e874d1..2b50a11 100644 --- a/src/socket/shm_socket.h +++ b/src/socket/shm_socket.h @@ -66,9 +66,9 @@ /** * @flags : BUS_NOWAIT_FLAG */ -int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int key, const struct timespec * timeout = NULL, const int flags=0); +int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int key, const struct timespec * timeout = NULL, const int flags=0, int reset = 0, int data_set = 0); -int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *key, const struct timespec * timeout = NULL, int flags=0); +int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *key, const struct timespec * timeout = NULL, int flags=0, int reset = 0, int data_set = 0); int shm_sendandrecv(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, const struct timespec * timeout = NULL, int flags = 0); -- Gitblit v1.8.0