From dc01e4cbb01e96d19b470a366bbe648d426ed171 Mon Sep 17 00:00:00 2001
From: fujuntang <fujuntang@smartai.com>
Date: 星期六, 11 九月 2021 10:06:15 +0800
Subject: [PATCH] Add topics sub and request support.
---
src/key_def.h | 1
src/socket/shm_socket.h | 4
src/net/net_mod_socket.cpp | 40 ++++-
src/socket/shm_socket.cpp | 118 ++++++++++++----
src/socket/shm_mod_socket.h | 19 --
src/socket/bus_server_socket.cpp | 53 +++++++
src/net/net_mod_socket_wrapper.cpp | 44 ++++-
src/net/net_mod_socket_wrapper.h | 18 +-
src/net/net_mod_socket.h | 22 +-
src/socket/bus_server_socket.h | 1
src/bh_api.cpp | 32 +++-
src/proc_def.h | 4
src/socket/shm_mod_socket.cpp | 8
13 files changed, 262 insertions(+), 102 deletions(-)
diff --git a/src/bh_api.cpp b/src/bh_api.cpp
index 4f623f8..d25bf8a 100644
--- a/src/bh_api.cpp
+++ b/src/bh_api.cpp
@@ -139,6 +139,12 @@
gNetmod_socket = net_mod_socket_open();
hashtable_t *hashtable = mm_get_hashtable();
key = hashtable_alloc_key(hashtable);
+ count = hashtable_alloc_key(hashtable);
+ rv = hashtable_alloc_key(hashtable);
+ net_mod_socket_int_set(gNetmod_socket, count);
+ net_mod_socket_svr_set(gNetmod_socket, rv);
+ sprintf(pData.int_info, "%d", count);
+ sprintf(pData.svr_info, "%d", rv);
net_mod_socket_bind(gNetmod_socket, key);
rv = net_mod_socket_reg(gNetmod_socket, &pData, sizeof(ProcInfo), NULL, 0, timeout_ms, PROC_REG);
@@ -932,8 +938,8 @@
::bhome_msg::MsgCommonReply mcr;
mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
mcr.mutable_errmsg()->set_errstring(errString);
- *reply_len=mcr.ByteSizeLong();
- *reply=malloc(*reply_len);
+ *reply_len = mcr.ByteSizeLong();
+ *reply = malloc(*reply_len);
mcr.SerializePartialToArray(*reply,*reply_len);
#endif
@@ -1207,6 +1213,7 @@
int val;
int len;
int min;
+ int data;
int sec, nsec;
std::string MsgID;
int timeout_ms = 3000;
@@ -1309,20 +1316,21 @@
strncpy(topics_buf + strlen(buf_temp) + 1, _input1.data, strlen(_input1.data));
#endif
+ data = net_mod_socket_svr_get(gNetmod_socket);
if (timeout_ms > 0) {
sec = timeout_ms / 1000;
nsec = (timeout_ms - sec * 1000) * 1000 * 1000;
- rv = net_mod_socket_sendto_timeout(gNetmod_socket, topics_buf, len, val, sec, nsec);
+ rv = net_mod_socket_sendto_timeout(gNetmod_socket, topics_buf, len, val, sec, nsec, SVR_STR, data);
} else if (timeout_ms == 0) {
- rv = net_mod_socket_sendto_nowait(gNetmod_socket, topics_buf, len, val);
+ rv = net_mod_socket_sendto_nowait(gNetmod_socket, topics_buf, len, val, SVR_STR, data);
} else {
- rv = net_mod_socket_sendto(gNetmod_socket, topics_buf, len, val);
+ rv = net_mod_socket_sendto(gNetmod_socket, topics_buf, len, val, SVR_STR, data);
}
free(topics_buf);
@@ -1377,6 +1385,7 @@
int size;
int val;
int min, len;
+ int data;
net_node_t node;
int node_size;
int recv_arr_size;
@@ -1469,6 +1478,7 @@
len += strlen(_input1.data);
#endif
+ data = net_mod_socket_svr_get(gNetmod_socket);
topics_buf = (char *)malloc(len);
if (topics_buf == NULL) {
@@ -1620,6 +1630,7 @@
int key;
int size;
int len;
+ int data;
int sec, nsec;
char buf_temp[MAX_STR_LEN] = { 0x00 };
char *topics_buf = NULL;
@@ -1642,20 +1653,21 @@
return false;
}
+ data = net_mod_socket_svr_get(gNetmod_socket);
if (timeout_ms > 0) {
sec = timeout_ms / 1000;
nsec = (timeout_ms - sec * 1000) * 1000 * 1000;
- rv = net_mod_socket_recvfrom_timeout(gNetmod_socket, &buf, &size, &key, sec, nsec);
+ rv = net_mod_socket_recvfrom_timeout(gNetmod_socket, &buf, &size, &key, sec, nsec, SVR_STR, data);
} else if (timeout_ms == 0) {
- rv = net_mod_socket_recvfrom_nowait(gNetmod_socket, &buf, &size, &key);
+ rv = net_mod_socket_recvfrom_nowait(gNetmod_socket, &buf, &size, &key, SVR_STR, data);
} else {
- rv = net_mod_socket_recvfrom(gNetmod_socket, &buf, &size, &key);
+ rv = net_mod_socket_recvfrom(gNetmod_socket, &buf, &size, &key, SVR_STR, data);
}
if (rv == 0) {
@@ -1735,6 +1747,7 @@
int BHSendReply(void *src, const void *reply, const int reply_len)
{
int rv;
+ int data;
const char *_input;
#if defined(PRO_DE_SERIALIZE)
@@ -1777,7 +1790,8 @@
rv = pthread_mutex_trylock(&mutex);
if (rv == 0) {
- rv = net_mod_socket_sendto(gNetmod_socket, _input, strlen(_input), *(int *)src);
+ data = net_mod_socket_svr_get(gNetmod_socket);
+ rv = net_mod_socket_sendto(gNetmod_socket, _input, strlen(_input), *(int *)src, SVR_STR, data);
memset(errString, 0x00, sizeof(errString));
strncpy(errString, bus_strerror(rv), sizeof(errString));
diff --git a/src/key_def.h b/src/key_def.h
index fdeee2e..a25ee07 100644
--- a/src/key_def.h
+++ b/src/key_def.h
@@ -9,6 +9,7 @@
// BUS key
#define SHM_BUS_KEY 8
+#define SHM_BUS_INT_KEY 9
// 缃戠粶浠g悊key
#define SHM_NET_PROXY_KEY 99
diff --git a/src/net/net_mod_socket.cpp b/src/net/net_mod_socket.cpp
index ab065eb..2f5ce73 100644
--- a/src/net/net_mod_socket.cpp
+++ b/src/net/net_mod_socket.cpp
@@ -55,6 +55,22 @@
return shmModSocket.reg(pData, len, buf, size, timeout_ms, flag);
}
+void NetModSocket::int_set(int data) {
+ int_val = data;
+}
+
+void NetModSocket::svr_set(int data) {
+ svr_val = data;
+}
+
+int NetModSocket::int_get(void) {
+ return int_val;
+}
+
+int NetModSocket::svr_get(void) {
+ return svr_val;
+}
+
// int NetModSocket::sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
// net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) {
// return _sendandrecv_(node_arr, arrlen, send_buf,send_size, recv_arr, recv_arr_size, -1);
@@ -493,20 +509,20 @@
* @key 鍙戦�佺粰璋�
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
-int NetModSocket::sendto(const void *buf, const int size, const int key){
- return shmModSocket.sendto(buf, size, key);
+int NetModSocket::sendto(const void *buf, const int size, const int key, int reset, int data_set){
+ return shmModSocket.sendto(buf, size, key, 0, 0, reset, data_set);
}
// 鍙戦�佷俊鎭秴鏃惰繑鍥炪�� @sec 绉� 锛� @nsec 绾崇
-int NetModSocket::sendto_timeout(const void *buf, const int size, const int key, int sec, int nsec){
+int NetModSocket::sendto_timeout(const void *buf, const int size, const int key, int sec, int nsec, int reset, int data_set){
struct timespec timeout = {sec, nsec};
- return shmModSocket.sendto(buf, size, key, &timeout, BUS_TIMEOUT_FLAG);
+ return shmModSocket.sendto(buf, size, key, &timeout, BUS_TIMEOUT_FLAG, reset, data_set);
}
// 鍙戦�佷俊鎭珛鍒昏繑鍥炪��
-int NetModSocket::sendto_nowait(const void *buf, const int size, const int key){
- return shmModSocket.sendto(buf, size, key, NULL, BUS_NOWAIT_FLAG);
+int NetModSocket::sendto_nowait(const void *buf, const int size, const int key, int reset, int data_set){
+ return shmModSocket.sendto(buf, size, key, NULL, BUS_NOWAIT_FLAG, reset, data_set);
}
@@ -515,21 +531,21 @@
* @key 浠庤皝鍝噷鏀跺埌鐨勪俊鎭�
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
-int NetModSocket::recvfrom(void **buf, int *size, int *key) {
+int NetModSocket::recvfrom(void **buf, int *size, int *key, int reset, int data_set) {
- return shmModSocket.recvfrom(buf, size, key);
+ return shmModSocket.recvfrom(buf, size, key, 0, 0, reset, data_set);
}
// 鎺ュ彈淇℃伅瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-int NetModSocket::recvfrom_timeout(void **buf, int *size, int *key, int sec, int nsec){
+int NetModSocket::recvfrom_timeout(void **buf, int *size, int *key, int sec, int nsec, int reset, int data_set){
struct timespec timeout = {sec, nsec};
- return shmModSocket.recvfrom(buf, size, key, &timeout, BUS_TIMEOUT_FLAG);
+ return shmModSocket.recvfrom(buf, size, key, &timeout, BUS_TIMEOUT_FLAG, reset, data_set);
}
-int NetModSocket::recvfrom_nowait(void **buf, int *size, int *key){
- return shmModSocket.recvfrom(buf, size, key, NULL, BUS_NOWAIT_FLAG);
+int NetModSocket::recvfrom_nowait(void **buf, int *size, int *key, int reset, int data_set){
+ return shmModSocket.recvfrom(buf, size, key, NULL, BUS_NOWAIT_FLAG, reset, data_set);
}
int NetModSocket::recvandsend(recvandsend_callback_fn callback,
diff --git a/src/net/net_mod_socket.h b/src/net/net_mod_socket.h
index d8e53ae..9d9af97 100644
--- a/src/net/net_mod_socket.h
+++ b/src/net/net_mod_socket.h
@@ -71,7 +71,8 @@
private:
ShmModSocket shmModSocket;
-
+ int int_val;
+ int svr_val;
// pthread_mutex_t sendMutex;
// request header 缂栫爜涓虹綉缁滀紶杈撶殑瀛楄妭
@@ -136,7 +137,10 @@
net_mod_recv_msg_t ** recv_arr, int *recv_arr_size,
net_mod_err_t ** _err_arr, int *_err_arr_size, int timeout);
-
+ void int_set(int data);
+ void svr_set(int data);
+ int int_get(void);
+ int svr_get(void);
/**
* 鍔熻兘鍚宻endandrecv
* 浼樼偣锛氱嚎绋嬪畨鍏�
@@ -146,27 +150,27 @@
// int sendandrecv_safe(net_node_t *node_arr, int node_arr_len, void *send_buf, int send_size,
// net_mod_recv_msg_t ** recv_arr, int *recv_arr_size);
-
/**
* 鍙戦�佷俊鎭�
* @key 鍙戦�佺粰璋�
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
- int sendto( const void *buf, const int size, const int key);
+ int sendto( const void *buf, const int size, const int key, int reset = 0, int data_set = 0);
// 鍙戦�佷俊鎭秴鏃惰繑鍥炪�� @sec 绉� 锛� @nsec 绾崇
- int sendto_timeout( const void *buf, const int size, const int key, int sec, int nsec);
+ int sendto_timeout( const void *buf, const int size, const int key, int sec, int nsec, int reset = 0, int data_set = 0);
// 鍙戦�佷俊鎭珛鍒昏繑鍥炪��
- int sendto_nowait( const void *buf, const int size, const int key);
+ int sendto_nowait( const void *buf, const int size, const int key, int reset = 0, int data_set = 0);
/**
* 鎺ユ敹淇℃伅
* @key 浠庤皝鍝噷鏀跺埌鐨勪俊鎭�
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
- int recvfrom( void **buf, int *size, int *key);
+ int recvfrom( void **buf, int *size, int *key, int reset = 0, int data_set = 0);
// 鎺ュ彈淇℃伅瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
- int recvfrom_timeout( void **buf, int *size, int *key, int sec, int nsec);
- int recvfrom_nowait( void **buf, int *size, int *key);
+ int recvfrom_timeout( void **buf, int *size, int *key, int sec, int nsec, int reset = 0, int data_set = 0);
+ int recvfrom_nowait( void **buf, int *size, int *key, int reset = 0, int data_set = 0);
+
/**
* 鏈湴鍙戦�佽姹備俊鎭苟绛夊緟鎺ユ敹搴旂瓟
* @key 鍙戦�佺粰璋�
diff --git a/src/net/net_mod_socket_wrapper.cpp b/src/net/net_mod_socket_wrapper.cpp
index ab4d59d..5233635 100644
--- a/src/net/net_mod_socket_wrapper.cpp
+++ b/src/net/net_mod_socket_wrapper.cpp
@@ -57,20 +57,20 @@
* @key 鍙戦�佺粰璋�
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
-int net_mod_socket_sendto(void *_socket, const void *buf, const int size, const int key) {
+int net_mod_socket_sendto(void *_socket, const void *buf, const int size, const int key, int reset, int data_set) {
NetModSocket *sockt = (NetModSocket *)_socket;
- return sockt->sendto(buf, size, key);
+ return sockt->sendto(buf, size, key, reset, data_set);
}
// 鍙戦�佷俊鎭秴鏃惰繑鍥炪�� @sec 绉� 锛� @nsec 绾崇
-int net_mod_socket_sendto_timeout(void *_socket, const void *buf, const int size, const int key, int sec, int nsec){
+int net_mod_socket_sendto_timeout(void *_socket, const void *buf, const int size, const int key, int sec, int nsec, int reset, int data_set){
NetModSocket *sockt = (NetModSocket *)_socket;
- return sockt->sendto_timeout(buf, size, key, sec, nsec);
+ return sockt->sendto_timeout(buf, size, key, sec, nsec, reset, data_set);
// return sockt->sendto(buf, size, key);
}
// 鍙戦�佷俊鎭珛鍒昏繑鍥炪��
-int net_mod_socket_sendto_nowait(void *_socket, const void *buf, const int size, const int key){
+int net_mod_socket_sendto_nowait(void *_socket, const void *buf, const int size, const int key, int reset, int data_set){
NetModSocket *sockt = (NetModSocket *)_socket;
- return sockt->sendto_nowait(buf, size, key);
+ return sockt->sendto_nowait(buf, size, key, reset, data_set);
}
/**
@@ -78,23 +78,23 @@
* @port 浠庤皝鍝噷鏀跺埌鐨勪俊鎭�
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
-int net_mod_socket_recvfrom(void *_socket, void **buf, int *size, int *key){
+int net_mod_socket_recvfrom(void *_socket, void **buf, int *size, int *key, int reset, int data_set){
int rv;
NetModSocket *sockt = (NetModSocket *)_socket;
- rv = sockt->recvfrom(buf, size, key);
+ rv = sockt->recvfrom(buf, size, key, reset, data_set);
return rv;
}
// 鎺ュ彈淇℃伅瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *key, int sec, int nsec){
+int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *key, int sec, int nsec, int reset, int data_set){
NetModSocket *sockt = (NetModSocket *)_socket;
- return sockt->recvfrom_timeout(buf, size, key, sec, nsec);
+ return sockt->recvfrom_timeout(buf, size, key, sec, nsec, reset, data_set);
}
-int net_mod_socket_recvfrom_nowait(void *_socket, void **buf, int *size, int *key){
+int net_mod_socket_recvfrom_nowait(void *_socket, void **buf, int *size, int *key, int reset, int data_set){
NetModSocket *sockt = (NetModSocket *)_socket;
- return sockt->recvfrom_nowait(buf, size, key);
+ return sockt->recvfrom_nowait(buf, size, key, reset, data_set);
}
int net_mod_socket_sendandrecv(void *_socket, net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
@@ -108,6 +108,26 @@
return sockt->bind_proc_id(proc_id, len);
}
+void net_mod_socket_int_set(void * _socket, int data) {
+ NetModSocket *sockt = (NetModSocket *)_socket;
+ sockt->int_set(data);
+}
+
+void net_mod_socket_svr_set(void * _socket, int data) {
+ NetModSocket *sockt = (NetModSocket *)_socket;
+ sockt->svr_set(data);
+}
+
+int net_mod_socket_int_get(void * _socket) {
+ NetModSocket *sockt = (NetModSocket *)_socket;
+ return sockt->int_get();
+}
+
+int net_mod_socket_svr_get(void * _socket) {
+ NetModSocket *sockt = (NetModSocket *)_socket;
+ return sockt->svr_get();
+}
+
/**
* 濡傛灉寤虹珛杩炴帴鐨勮妭鐐规病鏈夋帴鍙楀埌娑堟伅绛夊緟timeout鐨勬椂闂村悗杩斿洖
* @timeout 绛夊緟鏃堕棿锛屽崟浣嶆槸鍗冨垎涔嬩竴绉�
diff --git a/src/net/net_mod_socket_wrapper.h b/src/net/net_mod_socket_wrapper.h
index b869510..d68a23e 100644
--- a/src/net/net_mod_socket_wrapper.h
+++ b/src/net/net_mod_socket_wrapper.h
@@ -67,7 +67,7 @@
*
* @return 0鏄垚鍔燂紝 鍏朵粬鍊兼槸澶辫触鐨勯敊璇爜
*/
-int net_mod_socket_sendto(void *_socket, const void *buf, const int size, const int key);
+int net_mod_socket_sendto(void *_socket, const void *buf, const int size, const int key, int reset = 0, int data_set = 0);
/**
* @brief 鍙戦�佷俊鎭紝鍦ㄦ寚瀹氭椂闂村唴娌″彂閫佸畬鎴愪篃杩斿洖銆�
@@ -80,7 +80,7 @@
*
* @return 0鏄垚鍔燂紝 鍏朵粬鍊兼槸澶辫触鐨勯敊璇爜
*/
-int net_mod_socket_sendto_timeout(void *_socket, const void *buf, const int size, const int key, int sec, int nsec);
+int net_mod_socket_sendto_timeout(void *_socket, const void *buf, const int size, const int key, int sec, int nsec, int reset = 0, int data_set = 0);
/**
* @brief 鍙戦�佷俊鎭紝鏃犺鏄惁鍙戦�佸畬鎴愮珛鍒昏繑鍥炪��
@@ -91,7 +91,7 @@
*
* @return 0鏄垚鍔燂紝 鍏朵粬鍊兼槸澶辫触鐨勯敊璇爜
*/
-int net_mod_socket_sendto_nowait(void *_socket, const void *buf, const int size, const int key);
+int net_mod_socket_sendto_nowait(void *_socket, const void *buf, const int size, const int key, int reset = 0, int data_set = 0);
/**
* @brief 绛夊緟鎺ユ敹淇℃伅锛岀洿鍒版湁娑堟伅鎺ュ彈鍒版墠杩斿洖
@@ -102,7 +102,7 @@
*
* @return 0鏄垚鍔燂紝 鍏朵粬鍊兼槸澶辫触鐨勯敊璇爜
*/
-int net_mod_socket_recvfrom(void *_socket, void **buf, int *size, int *key);
+int net_mod_socket_recvfrom(void *_socket, void **buf, int *size, int *key, int reset = 0, int data_set = 0);
/**
* @brief 绛夊緟鎺ユ敹淇℃伅锛屽湪鎸囧畾鐨勬椂闂村唴鍗充娇娌℃湁鎺ュ彈鍒版秷鎭篃瑕佽繑鍥�
@@ -115,7 +115,7 @@
*
* @return 0鏄垚鍔燂紝 鍏朵粬鍊兼槸澶辫触鐨勯敊璇爜
*/
-int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *key, int sec, int nsec);
+int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *key, int sec, int nsec, int reset = 0, int data_set = 0);
/**
* @brief 绛夊緟鎺ユ敹淇℃伅锛岀洿鍒版湁娑堟伅鎺ュ彈鍒版墠杩斿洖
@@ -126,10 +126,12 @@
*
* @return 0鏄垚鍔燂紝鍏朵粬鍊兼槸澶辫触鐨勯敊璇爜
*/
-int net_mod_socket_recvfrom_nowait(void *_socket, void **buf, int *size, int *key);
+int net_mod_socket_recvfrom_nowait(void *_socket, void **buf, int *size, int *key, int reset = 0, int data_set = 0);
-
-
+void net_mod_socket_int_set(void * _socket, int data);
+void net_mod_socket_svr_set(void * _socket, int data);
+int net_mod_socket_int_get(void * _socket);
+int net_mod_socket_svr_get(void * _socket);
/**
* @brief 璺ㄦ満鍣ㄥ彂閫佹秷鎭苟鎺ュ彈杩斿洖鐨勫簲绛旀秷鎭紝鐩村埌鍙戦�佸畬鎴愭墠杩斿洖
diff --git a/src/proc_def.h b/src/proc_def.h
index 1d84362..2b3f57b 100644
--- a/src/proc_def.h
+++ b/src/proc_def.h
@@ -35,6 +35,8 @@
char name[MAX_STR_LEN];
char public_info[MAX_STR_LEN];
char private_info[MAX_STR_LEN];
+ char int_info[MAX_STR_LEN];
+ char svr_info[MAX_STR_LEN];
#endif
} ProcInfo;
@@ -65,6 +67,8 @@
}
#endif
+#define INT_STR 0x01
+#define SVR_STR 0x02
#endif //end of file
diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp
index 8b022e1..d5e757d 100644
--- a/src/socket/bus_server_socket.cpp
+++ b/src/socket/bus_server_socket.cpp
@@ -302,6 +302,7 @@
int count = 0;
int i = 0;
int len = 0;
+ int data1, data2;
char *data_ptr;
ProcInfo Data_stru;
ProcZone::iterator proc_iter;
@@ -333,6 +334,13 @@
memcpy(Data_stru.private_info, buf + count, strlen(buf + count) + 1);
count += strlen(buf + count) + 1;
+
+ memcpy(Data_stru.int_info, buf + count, strlen(buf + count) + 1);
+ count += strlen(buf + count) + 1;
+
+ memcpy(Data_stru.svr_info, buf + count, strlen(buf + count) + 1);
+ count += strlen(buf + count) + 1;
+
}
ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
@@ -362,6 +370,9 @@
if ((proc_iter = proc->find(key)) != proc->end()) {
+ data1 = atoi((proc_iter->second).int_info);
+ data2 = atoi((proc_iter->second).svr_info);
+ BusServerSocket::_data_remove(data1, data2);
len = (sizeof(buf_temp) - 1) > strlen((proc_iter->second).proc_id) ? strlen((proc_iter->second).proc_id) : (sizeof(buf_temp) - 1);
strncpy(buf_temp, (proc_iter->second).proc_id, len);
proc->erase(proc_iter);
@@ -504,7 +515,9 @@
free(last_buf);
} else if (flag == PROC_QUE_STCS) {
+
SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY);
+ ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
if ((svr_tcs_iter = SvrData->find(buf_temp)) != SvrData->end()) {
@@ -512,6 +525,9 @@
for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) {
count = *svr_proc_iter;
+ if ((proc_iter = proc->find(count)) != proc->end()) {
+ count = atoi((proc_iter->second).svr_info);
+ }
break;
}
@@ -770,3 +786,40 @@
return rv;
}
+
+void BusServerSocket::_data_remove(int val1, int val2) {
+
+ int i;
+ LockFreeQueue<shm_packet_t> *queue = NULL;
+ hashtable_t *hashtable = mm_get_hashtable();
+
+ void *data_ptr1 = hashtable_get(hashtable, val1);
+ void *data_ptr2 = hashtable_get(hashtable, val2);
+ if (data_ptr1 != NULL) {
+ if (data_ptr1 != (void *)1) {
+ queue = (LockFreeQueue<shm_packet_t> *)data_ptr1;
+ queue->close();
+ for (i = 0; i < queue->size(); i++) {
+ mm_free((*queue)[i].buf);
+ }
+ sleep(1);
+ }
+
+ hashtable_remove(hashtable, val1);
+ }
+
+ if (data_ptr2 != NULL) {
+ if (data_ptr2 != (void *)1) {
+ queue = (LockFreeQueue<shm_packet_t> *)data_ptr2;
+ queue->close();
+ for (i = 0; i < queue->size(); i++) {
+ mm_free((*queue)[i].buf);
+ }
+ sleep(1);
+ }
+
+ hashtable_remove(hashtable, val2);
+ }
+
+}
+
diff --git a/src/socket/bus_server_socket.h b/src/socket/bus_server_socket.h
index 3052c8b..e60c700 100644
--- a/src/socket/bus_server_socket.h
+++ b/src/socket/bus_server_socket.h
@@ -81,6 +81,7 @@
*/
int get_key() ;
+ void _data_remove(int val1, int val2);
};
diff --git a/src/socket/shm_mod_socket.cpp b/src/socket/shm_mod_socket.cpp
index a94b9c3..7562d56 100644
--- a/src/socket/shm_mod_socket.cpp
+++ b/src/socket/shm_mod_socket.cpp
@@ -166,8 +166,8 @@
* @key 鍙戦�佺粰璋�
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
-int ShmModSocket::sendto(const void *buf, const int size, const int key, const struct timespec *timeout, int flag) {
- int rv = shm_sendto(shm_socket, buf, size, key, timeout, flag);
+int ShmModSocket::sendto(const void *buf, const int size, const int key, const struct timespec *timeout, int flag, int reset, int data_set) {
+ int rv = shm_sendto(shm_socket, buf, size, key, timeout, flag, reset, data_set);
if(rv == 0) {
logger->debug("ShmModSocket::sendto: %d sendto %d success.\n", get_key(), key);
return 0;
@@ -182,9 +182,9 @@
* @key 浠庤皝鍝噷鏀跺埌鐨勪俊鎭�
* @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
*/
-int ShmModSocket::recvfrom( void **buf, int *size, int *key, const struct timespec *timeout, int flag) {
+int ShmModSocket::recvfrom( void **buf, int *size, int *key, const struct timespec *timeout, int flag, int reset, int data_set) {
- int rv = shm_recvfrom(shm_socket, buf, size, key, timeout, flag);
+ int rv = shm_recvfrom(shm_socket, buf, size, key, timeout, flag, reset, data_set);
if(rv == 0) {
logger->debug("ShmModSocket::recvfrom: %d recvfrom %d success.\n", get_key(), *key);
diff --git a/src/socket/shm_mod_socket.h b/src/socket/shm_mod_socket.h
index da02fab..5e234bf 100644
--- a/src/socket/shm_mod_socket.h
+++ b/src/socket/shm_mod_socket.h
@@ -64,22 +64,11 @@
int bind_proc_id(char *buf, int len);
int reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag);
- /**
- * 鍙戦�佷俊鎭�
- * @key 鍙戦�佺粰璋�
- * @flag BUS_TIMEOUT_FLAG BUS_NOWAIT_FLAG
- * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
- */
-
- int sendto(const void *buf, const int size, const int key, const struct timespec *timeout = NULL, int flag = 0);
-
+
+ int sendto(const void *buf, const int size, const int key, const struct timespec *timeout = NULL, int flag = 0, int reset = 0, int data_set = 0);
- /**
- * 鎺ユ敹淇℃伅
- * @key 浠庤皝鍝噷鏀跺埌鐨勪俊鎭�
- * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
- */
- int recvfrom(void **buf, int *size, int *key, const struct timespec *timeout = NULL, int flag = 0);
+ int recvfrom(void **buf, int *size, int *key, const struct timespec *timeout = NULL, int flag = 0, int reset = 0, int data_set = 0);
+
/**
* 鍙戦�佽姹備俊鎭苟绛夊緟鎺ユ敹搴旂瓟
* @key 鍙戦�佺粰璋�
diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp
index 84bf77e..dc6d752 100644
--- a/src/socket/shm_socket.cpp
+++ b/src/socket/shm_socket.cpp
@@ -23,11 +23,12 @@
static void _destrory_threadlocal_socket_(void *tmp_socket);
static void _create_threadlocal_socket_key_(void);
-static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *recvpak , const struct timespec *timeout, int flag);
+static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *recvpak , const struct timespec *timeout,
+ int flag, int reset = 0, int data_set = 0);
-static int shm_sendpakto(shm_socket_t *sockt, shm_packet_t *sendpak,
- const int key, const struct timespec *timeout, const int flag);
+static int shm_sendpakto(shm_socket_t *sockt, shm_packet_t *sendpak, const int key, const struct timespec *timeout,
+ const int flag, int reset = 0, int data_set = 0);
static int _shm_sendandrecv_uuid(shm_socket_t *sockt, const void *send_buf,
@@ -183,20 +184,24 @@
}
// 鐭繛鎺ユ柟寮忓彂閫�
-int shm_sendto(shm_socket_t *sockt, const void *buf, const int size,
- const int key, const struct timespec *timeout, const int flag) {
+int shm_sendto(shm_socket_t *sockt, const void *buf, const int size, const int key, const struct timespec *timeout,
+ const int flag, int reset, int data_set) {
int rv;
shm_packet_t sendpak = {0};
- sendpak.key = sockt->key;
+ if (reset == 0) {
+ sendpak.key = sockt->key;
+ } else {
+ sendpak.key = data_set;
+ }
sendpak.size = size;
if(buf != NULL) {
sendpak.buf = mm_malloc(size);
memcpy(sendpak.buf, buf, size);
}
- rv = shm_sendpakto(sockt, &sendpak, key, timeout, flag);
+ rv = shm_sendpakto(sockt, &sendpak, key, timeout, flag, reset, data_set);
return rv;
}
@@ -262,11 +267,11 @@
}
// 鐭繛鎺ユ柟寮忔帴鍙�
-int shm_recvfrom(shm_socket_t *sockt, void **buf, int *size, int *key, const struct timespec *timeout, int flag) {
+int shm_recvfrom(shm_socket_t *sockt, void **buf, int *size, int *key, const struct timespec *timeout, int flag, int reset, int data_set) {
int rv;
shm_packet_t recvpak;
- rv = shm_recvpakfrom(sockt , &recvpak, timeout, flag);
+ rv = shm_recvpakfrom(sockt , &recvpak, timeout, flag, reset, data_set);
if (rv != 0) {
@@ -544,15 +549,24 @@
-static int shm_sendpakto(shm_socket_t *sockt, shm_packet_t *sendpak,
- const int key, const struct timespec *timeout, const int flag) {
+static int shm_sendpakto(shm_socket_t *sockt, shm_packet_t *sendpak, const int key, const struct timespec *timeout,
+ const int flag, int reset, int data_set) {
int rv;
shm_queue_status_t stRecord;
LockFreeQueue<shm_packet_t> *remoteQueue;
+ LockFreeQueue<shm_packet_t> *fixedQueue;
hashtable_t *hashtable = mm_get_hashtable();
- if( sockt->queue != NULL)
+ if ((reset != 0) && (data_set == 0)) {
+ return EBUS_KEY_INUSED;
+ }
+
+ if (reset != 0) {
+ fixedQueue = shm_socket_attach_queue(data_set);
+ }
+
+ if (((reset == 0) && (sockt->queue != NULL)) || ((reset != 0) && (fixedQueue != NULL)))
goto LABEL_PUSH;
// if(hashtable_get_queue_count(hashtable) > QUEUE_COUNT_LIMIT) {
@@ -563,7 +577,7 @@
if ((rv = pthread_mutex_lock(&(sockt->mutex))) != 0)
err_exit(rv, "shm_sendto : pthread_mutex_lock");
- if (sockt->queue == NULL) {
+ if ((sockt->queue == NULL) && (reset == 0)) {
if (sockt->key == 0) {
sockt->key = hashtable_alloc_key(hashtable);
}
@@ -580,6 +594,16 @@
// stRecord.createTime = time(NULL);
// shmQueueStMap->insert({sockt->key, stRecord});
+ }
+
+ if ((fixedQueue == NULL) && (reset != 0)) {
+ fixedQueue = shm_socket_bind_queue(data_set, false);
+ if (fixedQueue == NULL ) {
+ logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key);
+ if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0)
+ err_exit(rv, "shm_sendto : pthread_mutex_unlock");
+ return EBUS_KEY_INUSED;
+ }
}
if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0)
@@ -611,7 +635,9 @@
goto ERR_CLOSED;
}
- sendpak->key = sockt->key;
+ if (reset == 0) {
+ sendpak->key = sockt->key;
+ }
rv = remoteQueue->push(*sendpak, timeout, flag);
if(rv != 0) {
@@ -629,13 +655,23 @@
}
// 鐭繛鎺ユ柟寮忔帴鍙�
-static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *_recvpak , const struct timespec *timeout, int flag) {
+static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *_recvpak , const struct timespec *timeout,
+ int flag, int reset, int data_set) {
int rv;
shm_queue_status_t stRecord;
+ LockFreeQueue<shm_packet_t> *fixedQueue;
hashtable_t *hashtable = mm_get_hashtable();
shm_packet_t recvpak;
- if( sockt->queue != NULL)
+ if ((reset != 0) && (data_set == 0)) {
+ return EBUS_KEY_INUSED;
+ }
+
+ if (reset != 0) {
+ fixedQueue = shm_socket_attach_queue(data_set);
+ }
+
+ if (((sockt->queue != NULL) && (reset == 0)) || ((reset != 0) && (fixedQueue != NULL)))
goto LABEL_POP;
// if(hashtable_get_queue_count(hashtable) > QUEUE_COUNT_LIMIT) {
@@ -646,21 +682,33 @@
if ((rv = pthread_mutex_lock(&(sockt->mutex))) != 0)
err_exit(rv, "shm_recvfrom : pthread_mutex_lock");
- if (sockt->key == 0) {
- sockt->key = hashtable_alloc_key(hashtable);
- }
- sockt->queue = shm_socket_bind_queue( sockt->key, sockt->force_bind);
- if(sockt->queue == NULL ) {
- logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key);
- if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0)
- err_exit(rv, "shm_recvfrom : pthread_mutex_unlock");
- return EBUS_KEY_INUSED;
+ if ((sockt->queue == NULL) && (reset == 0)) {
+ if (sockt->key == 0) {
+ sockt->key = hashtable_alloc_key(hashtable);
+ }
+ sockt->queue = shm_socket_bind_queue( sockt->key, sockt->force_bind);
+ if(sockt->queue == NULL ) {
+ logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key);
+ if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0)
+ err_exit(rv, "shm_recvfrom : pthread_mutex_unlock");
+ return EBUS_KEY_INUSED;
+ }
+
+ // 鏍囪key瀵瑰簲鐨勭姸鎬� 锛屼负opened
+ // stRecord.status = SHM_QUEUE_ST_OPENED;
+ // stRecord.createTime = time(NULL);
+ // shmQueueStMap->insert({sockt->key, stRecord});
}
-
- // 鏍囪key瀵瑰簲鐨勭姸鎬� 锛屼负opened
- // stRecord.status = SHM_QUEUE_ST_OPENED;
- // stRecord.createTime = time(NULL);
- // shmQueueStMap->insert({sockt->key, stRecord});
+
+ if ((fixedQueue == NULL) && (reset != 0)) {
+ fixedQueue = shm_socket_bind_queue(data_set, false);
+ if (fixedQueue == NULL ) {
+ logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key);
+ if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0)
+ err_exit(rv, "shm_sendto : pthread_mutex_unlock");
+ return EBUS_KEY_INUSED;
+ }
+ }
if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0)
err_exit(rv, "shm_recvfrom : pthread_mutex_unlock");
@@ -669,7 +717,11 @@
LABEL_POP:
- rv = sockt->queue->pop(recvpak, timeout, flag);
+ if (reset == 0) {
+ rv = sockt->queue->pop(recvpak, timeout, flag);
+ } else {
+ rv = fixedQueue->pop(recvpak, timeout, flag);
+ }
if(rv != 0) {
if(rv == ETIMEDOUT) {
return EBUS_TIMEOUT;
@@ -697,6 +749,10 @@
count += strlen(ptr->public_info) + 1;
memcpy(dst + count, ptr->private_info, strlen(ptr->private_info) + 1);
count += strlen(ptr->private_info) + 1;
+ memcpy(dst + count, ptr->int_info, strlen(ptr->int_info) + 1);
+ count += strlen(ptr->int_info) + 1;
+ memcpy(dst + count, ptr->svr_info, strlen(ptr->svr_info) + 1);
+ count += strlen(ptr->svr_info) + 1;
*counter = count;
}
diff --git a/src/socket/shm_socket.h b/src/socket/shm_socket.h
index 8e874d1..2b50a11 100644
--- a/src/socket/shm_socket.h
+++ b/src/socket/shm_socket.h
@@ -66,9 +66,9 @@
/**
* @flags : BUS_NOWAIT_FLAG
*/
-int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int key, const struct timespec * timeout = NULL, const int flags=0);
+int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int key, const struct timespec * timeout = NULL, const int flags=0, int reset = 0, int data_set = 0);
-int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *key, const struct timespec * timeout = NULL, int flags=0);
+int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *key, const struct timespec * timeout = NULL, int flags=0, int reset = 0, int data_set = 0);
int shm_sendandrecv(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size,
const struct timespec * timeout = NULL, int flags = 0);
--
Gitblit v1.8.0