From cb85aa8a8d02a3d6dc16e3f32e78da9e70f9c7f5 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期二, 02 二月 2021 17:49:21 +0800
Subject: [PATCH] update
---
src/net/net_mod_socket_wrapper.cpp | 24 ++++----
src/socket/socket_def.h | 14 ----
test_net_socket/test_net_mod_socket.cpp | 22 +------
src/socket/shm_socket.h | 11 ---
src/bus_error.h | 1
src/bus_error.cpp | 3
src/socket/shm_socket.cpp | 76 ++++++++++++++-----------
7 files changed, 62 insertions(+), 89 deletions(-)
diff --git a/src/bus_error.cpp b/src/bus_error.cpp
index e44e5ef..2c410eb 100644
--- a/src/bus_error.cpp
+++ b/src/bus_error.cpp
@@ -17,7 +17,8 @@
"The other end is not inline",
"Key already in use",
"Network fault",
- "Send to self error"
+ "Send to self error",
+ "Receive from wrong end"
};
diff --git a/src/bus_error.h b/src/bus_error.h
index 9770ccd..7677545 100644
--- a/src/bus_error.h
+++ b/src/bus_error.h
@@ -10,6 +10,7 @@
#define EBUS_KEY_INUSED 503
#define EBUS_NET 504
#define EBUS_SENDTO_SELF 505
+#define EBUS_RECVFROM_WRONG_END 506
extern int bus_errno;
diff --git a/src/net/net_mod_socket_wrapper.cpp b/src/net/net_mod_socket_wrapper.cpp
index be44751..57073f0 100644
--- a/src/net/net_mod_socket_wrapper.cpp
+++ b/src/net/net_mod_socket_wrapper.cpp
@@ -53,15 +53,14 @@
int net_mod_socket_sendto_timeout(void *_socket, const void *buf, const int size, const int key, int sec, int nsec){
NetModSocket *sockt = (NetModSocket *)_socket;
logger->debug("net_mod_socket_sendto: %d sendto %d", net_mod_socket_get_key(_socket), key);
- // return sockt->sendto_timeout(buf, size, key, sec, nsec);
- return sockt->sendto(buf, size, key);
+ return sockt->sendto_timeout(buf, size, key, sec, nsec);
+ // return sockt->sendto(buf, size, key);
}
// 鍙戦�佷俊鎭珛鍒昏繑鍥炪��
int net_mod_socket_sendto_nowait(void *_socket, const void *buf, const int size, const int key){
NetModSocket *sockt = (NetModSocket *)_socket;
logger->debug("net_mod_socket_sendto: %d sendto %d", net_mod_socket_get_key(_socket), key);
- return sockt->sendto(buf, size, key);
- // return sockt->sendto_nowait(buf, size, key);
+ return sockt->sendto_nowait(buf, size, key);
}
/**
@@ -78,16 +77,17 @@
logger->debug(" %d net_mod_socket_recvfrom after. rv = %d", net_mod_socket_get_key(_socket), rv);
return rv;
}
+
// 鎺ュ彈淇℃伅瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *key, int sec, int nsec){
NetModSocket *sockt = (NetModSocket *)_socket;
- return sockt->recvfrom(buf, size, key);
- // return sockt->recvfrom_timeout(buf, size, key, sec, nsec);
+ // return sockt->recvfrom(buf, size, key);
+ return sockt->recvfrom_timeout(buf, size, key, sec, nsec);
}
+
int net_mod_socket_recvfrom_nowait(void *_socket, void **buf, int *size, int *key){
NetModSocket *sockt = (NetModSocket *)_socket;
- return sockt->recvfrom(buf, size, key);
- // return sockt->recvfrom_nowait(buf, size, key);
+ return sockt->recvfrom_nowait(buf, size, key);
}
int net_mod_socket_sendandrecv(void *_socket, net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
@@ -95,6 +95,7 @@
NetModSocket *sockt = (NetModSocket *)_socket;
return sockt->sendandrecv(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size);
}
+
/**
* 濡傛灉寤虹珛杩炴帴鐨勮妭鐐规病鏈夋帴鍙楀埌娑堟伅绛夊緟timeout鐨勬椂闂村悗杩斿洖
* @timeout 绛夊緟鏃堕棿锛屽崟浣嶆槸鍗冨垎涔嬩竴绉�
@@ -102,15 +103,14 @@
int net_mod_socket_sendandrecv_timeout(void *_socket, net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int timeout){
NetModSocket *sockt = (NetModSocket *)_socket;
- return sockt->sendandrecv(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size);
- // return sockt->sendandrecv_timeout(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size, timeout);
+ // return sockt->sendandrecv(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size);
+ return sockt->sendandrecv_timeout(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size, timeout);
}
int net_mod_socket_sendandrecv_nowait(void *_socket, net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) {
NetModSocket *sockt = (NetModSocket *)_socket;
- return sockt->sendandrecv(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size);
- // return sockt->sendandrecv_nowait(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size);
+ return sockt->sendandrecv_nowait(node_arr, arrlen, send_buf, send_size, recv_arr, recv_arr_size);
}
diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp
index a76cb4b..9544827 100644
--- a/src/socket/shm_socket.cpp
+++ b/src/socket/shm_socket.cpp
@@ -182,7 +182,6 @@
}
shm_packet_t dest;
- dest.type = SHM_COMMON_MSG;
dest.key = sockt->key;
dest.size = size;
dest.buf = mm_malloc(size);
@@ -287,7 +286,6 @@
//s = pthread_key_create(&_perthread_socket_key_, NULL);
if (s != 0) {
logger->error(s, "pthread_key_create");
- abort(); /* dump core and terminate */
exit(1);
}
}
@@ -299,11 +297,13 @@
int *recv_size, const struct timespec *timeout, int flags) {
int recv_key;
int rv;
+ int tryn = 0;
// 鐢╰hread local 淇濊瘉姣忎釜绾跨▼鐢ㄤ竴涓嫭鍗犵殑socket鎺ュ彈瀵规柟杩斿洖鐨勪俊鎭�
shm_socket_t *tmp_socket;
-
+ /* If first call from this thread, allocate buffer for thread, and save its location */
+ // logger->debug("%d create tmp socket\n", pthread_self() );
rv = pthread_once(&_once_, _create_socket_key_perthread);
if (rv != 0) {
logger->error(rv, "shm_sendandrecv pthread_once");
@@ -325,26 +325,31 @@
}
if ((rv = shm_sendto(tmp_socket, send_buf, send_size, send_key, timeout, flags)) == 0) {
- rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags);
- if(rv != 0) {
- logger->error("_shm_sendandrecv_thread_local : %s\n", bus_strerror(rv));
- }
- else if(rv == 0 ) {
- logger->debug("======%d use tmp_socket %d, send to %d, receive from %d\n", shm_socket_get_key(sockt), shm_socket_get_key(tmp_socket), send_key, recv_key);
-
- if(recv_key == shm_socket_get_key(sockt)) {
- logger->debug("=====鏀跺埌浜嗚嚜宸卞彂缁欒嚜宸辩殑娑堟伅\n");
+
+ while(tryn < 3) {
+ tryn++;
+ rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags);
+ if(rv != 0) {
+ logger->error("_shm_sendandrecv_thread_local : %s\n", bus_strerror(rv));
+ return rv;
}
- assert( send_key == recv_key);
+
+ // 瓒呮椂瀵艰嚧鎺ュ彂閫佸璞★紝涓庤繑鍥炲璞′笉瀵瑰簲鐨勬儏鍐�
if(send_key != recv_key) {
- logger->error( "_shm_sendandrecv_alloc_new: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key);
- exit(1);
+ logger->debug("======%d use tmp_socket %d, send to %d, receive from %d\n", shm_socket_get_key(sockt), shm_socket_get_key(tmp_socket), send_key, recv_key);
+ // logger->error( "_shm_sendandrecv_alloc_new: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key);
+ // exit(1);
+ continue;
+ // return EBUS_RECVFROM_WRONG_END;
}
+
+ return 0;
}
- return rv;
- } else {
- return rv;
- }
+
+ return EBUS_RECVFROM_WRONG_END;
+ }
+
+ return rv;
}
int _shm_sendandrecv_alloc_new(shm_socket_t *sockt, const void *send_buf,
@@ -353,31 +358,34 @@
int recv_key;
int rv;
- // 鐢╰hread local 淇濊瘉姣忎釜绾跨▼鐢ㄤ竴涓嫭鍗犵殑socket鎺ュ彈瀵规柟杩斿洖鐨勪俊鎭�
+ int tryn = 0;
shm_socket_t *tmp_socket;
- /* If first call from this thread, allocate buffer for thread, and save its location */
- // logger->debug("%d create tmp socket\n", pthread_self() );
+
tmp_socket = shm_open_socket(SHM_SOCKET_DGRAM);
if ((rv = shm_sendto(tmp_socket, send_buf, send_size, send_key, timeout, flags)) == 0) {
rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags);
- if(rv != 0) {
- printf("_shm_sendandrecv_alloc_new : %s\n", bus_strerror(rv));
- }
- else if(rv == 0 ) {
- printf("======%d use tmp_socket %d, send to %d, receive from %d\n", shm_socket_get_key(sockt), shm_socket_get_key(tmp_socket), send_key, recv_key);
-
- if(recv_key == shm_socket_get_key(sockt)) {
- printf("=====鏀跺埌浜嗚嚜宸卞彂缁欒嚜宸辩殑娑堟伅\n");
- }
- assert( send_key == recv_key);
- if(send_key != recv_key) {
- err_exit(0, "_shm_sendandrecv_alloc_new: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key);
+ while(tryn < 3) {
+ tryn++;
+ rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags);
+ if(rv != 0) {
+ logger->error("_shm_sendandrecv_thread_local : %s\n", bus_strerror(rv));
+ return rv;
}
+ // 瓒呮椂瀵艰嚧鎺ュ彂閫佸璞★紝涓庤繑鍥炲璞′笉瀵瑰簲鐨勬儏鍐�
+ if(send_key != recv_key) {
+ // logger->debug("======%d use tmp_socket %d, send to %d, receive from %d\n", shm_socket_get_key(sockt), shm_socket_get_key(tmp_socket), send_key, recv_key);
+ // logger->error( "_shm_sendandrecv_alloc_new: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key);
+
+ continue;
+ }
+ return 0;
}
+
+ return EBUS_RECVFROM_WRONG_END;
}
shm_close_socket(tmp_socket);
diff --git a/src/socket/shm_socket.h b/src/socket/shm_socket.h
index ea1ae38..198d4da 100644
--- a/src/socket/shm_socket.h
+++ b/src/socket/shm_socket.h
@@ -12,19 +12,10 @@
SHM_SOCKET_DGRAM = 2
};
-
-enum shm_packet_type_t
-{
- SHM_SOCKET_OPEN = 1,
- SHM_SOCKET_OPEN_REPLY = 2,
- SHM_SOCKET_CLOSE = 3,
- SHM_COMMON_MSG = 4
-
-};
+
typedef struct shm_packet_t {
int key;
- shm_packet_type_t type;
size_t size;
void * buf;
diff --git a/src/socket/socket_def.h b/src/socket/socket_def.h
index 56cca6d..cd6f2a6 100644
--- a/src/socket/socket_def.h
+++ b/src/socket/socket_def.h
@@ -20,18 +20,4 @@
};
-
-
-
-// typedef struct shm_bus_msg_t {
-// void *topic;
-// int topic_length;
-
-// } shm_bus_msg_t;
-
-#define ACTION_LIDENTIFIER "<**"
-#define ACTION_RIDENTIFIER "**>"
-#define TOPIC_LIDENTIFIER "{"
-#define TOPIC_RIDENTIFIER "}"
-
#endif
\ No newline at end of file
diff --git a/test_net_socket/test_net_mod_socket.cpp b/test_net_socket/test_net_mod_socket.cpp
index 79f102a..94be25b 100644
--- a/test_net_socket/test_net_mod_socket.cpp
+++ b/test_net_socket/test_net_mod_socket.cpp
@@ -272,11 +272,11 @@
sprintf(sendbuf, hello_format, net_mod_socket_get_key(client), l);
// fprintf(fp, "requst:%s\n", sendbuf);
// n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size);
- n = net_mod_socket_sendandrecv_timeout(client, targ->node, 1, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 1000);
+ n = net_mod_socket_sendandrecv_timeout(client, targ->node, 1, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 1);
printf("%d: send %d nodes\n", l, n);
for(j=0; j < recv_arr_size; j++) {
- fprintf(fp, "%d send '%s' to %d. received from (host=%s, port= %d, key=%d) '%s'\n",
+ fprintf(stdout, "%d send '%s' to %d. received from (host=%s, port= %d, key=%d) '%s'\n",
net_mod_socket_get_key(client),
sendbuf,
targ->node->key,
@@ -372,7 +372,7 @@
while(true) {
sprintf(buf, hello_format, pid, l);
n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, buf, strlen(buf)+1,
- &recv_arr, &recv_arr_size, 1000);
+ &recv_arr, &recv_arr_size, 1);
printf(" %d nodes reply\n", n);
for(j = 0; j < recv_arr_size; j++) {
@@ -386,21 +386,7 @@
);
- // printf( "%d send '%s' to %d. received from (host=%s, port= %d, key=%d) '%s'\n",
- // net_mod_socket_get_key(client),
- // sendbuf,
- // targ->node->key,
- // recv_arr[j].host,
- // recv_arr[j].port,
- // recv_arr[j].key,
- // recv_arr[j].content
- // );
-
-
- // assert(sscanf((const char *)recv_arr[j].content, reply_format, &rkey, &lkey, &rl) == 3);
- // assert(targ->node->key == rkey);
- // assert(net_mod_socket_get_key(client) == lkey);
- // assert(rl == l);
+
assert(sscanf((const char *)recv_arr[j].content, reply_format, &remoteKey, &retPid, &retl) == 3);
assert(retPid == pid);
--
Gitblit v1.8.0