From 26ed48c4e616014ee760fd13d13dbdc8539c34e3 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期二, 22 十二月 2020 19:21:55 +0800
Subject: [PATCH] 解决sendandrecv发送到一个不存在key的情况
---
src/socket/net_mod_socket.c | 37 +++++++-----
src/socket/shm_socket.c | 40 ++++++++++--
test_net_socket/test_net_mod_socket.c | 37 ++++++++----
src/socket/bus_server_socket_wrapper.c | 4
src/socket/net_mod_socket_wrapper.c | 2
src/socket/net_mod_server_socket_wrapper.c | 3
6 files changed, 84 insertions(+), 39 deletions(-)
diff --git a/src/socket/bus_server_socket_wrapper.c b/src/socket/bus_server_socket_wrapper.c
index 124b1a7..5c793c1 100644
--- a/src/socket/bus_server_socket_wrapper.c
+++ b/src/socket/bus_server_socket_wrapper.c
@@ -7,7 +7,7 @@
* 鍒涘缓
*/
void * bus_server_socket_wrapper_open() {
-
+ printf("===bus_server_socket_wrapper_open\n");
BusServerSocket *sockt = new BusServerSocket;
return (void *)sockt;
}
@@ -16,7 +16,7 @@
* 鍏抽棴
*/
void bus_server_socket_wrapper_close(void *_socket) {
- printf("bus_server_socket_wrapper_close\n");
+ printf("===bus_server_socket_wrapper_close\n");
BusServerSocket *sockt = (BusServerSocket *)_socket;
delete sockt;
}
diff --git a/src/socket/net_mod_server_socket_wrapper.c b/src/socket/net_mod_server_socket_wrapper.c
index 6f8be04..c05832c 100644
--- a/src/socket/net_mod_server_socket_wrapper.c
+++ b/src/socket/net_mod_server_socket_wrapper.c
@@ -2,13 +2,14 @@
#include "net_mod_server_socket_wrapper.h"
void *net_mod_server_socket_open(int port) {
+ printf("====net_mod_server_socket_open\n");
net_mod_server_socket_t *sockt = (net_mod_server_socket_t *)malloc(sizeof(net_mod_server_socket_t));
sockt->sockt = new NetModServerSocket(port);
return (void *)sockt;
}
void net_mod_server_socket_close(void *_sockt) {
- printf("net_mod_server_socket_close\n");
+ printf("====net_mod_server_socket_close\n");
net_mod_server_socket_t *sockt = (net_mod_server_socket_t *)_sockt;
delete sockt->sockt;
free(sockt);
diff --git a/src/socket/net_mod_socket.c b/src/socket/net_mod_socket.c
index 1e1fc27..48b0e7c 100644
--- a/src/socket/net_mod_socket.c
+++ b/src/socket/net_mod_socket.c
@@ -85,7 +85,7 @@
int i, n, recv_size, connfd;
net_node_t *node;
- void *recv_buf;
+ void *recv_buf = NULL;
net_mod_request_head_t request_head = {};
@@ -131,13 +131,15 @@
node = &node_arr[i];
if(node->host == NULL || strcmp(node->host, "") == 0 ) {
// 鏈湴鍙戦��
- shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size);
- strcpy( ret_arr[n_recv_suc].host,"");
- ret_arr[n_recv_suc].port = 0;
- ret_arr[n_recv_suc].key = node->key;
- ret_arr[n_recv_suc].content = recv_buf;
- ret_arr[n_recv_suc].content_length = recv_size;
- n_recv_suc++;
+ if(shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size) == 0) {
+ strcpy( ret_arr[n_recv_suc].host,"");
+ ret_arr[n_recv_suc].port = 0;
+ ret_arr[n_recv_suc].key = node->key;
+ ret_arr[n_recv_suc].content = recv_buf;
+ ret_arr[n_recv_suc].content_length = recv_size;
+ n_recv_suc++;
+ }
+
continue;
}
@@ -234,6 +236,17 @@
return n_recv_suc;
}
+
+
+void NetModSocket::free_recv_msg_arr(net_mod_recv_msg_t * arr, size_t size) {
+
+ for(int i =0; i< size; i++) {
+ if(arr[i].content != NULL)
+ free(arr[i].content);
+ }
+ free(arr);
+}
+
int NetModSocket::pub(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size) {
return _pub_(node_arr, arrlen, topic, topic_size, content, content_size, -1);
@@ -631,14 +644,6 @@
return shmModSocket.get_key();
}
-
-void NetModSocket::free_recv_msg_arr(net_mod_recv_msg_t * arr, size_t size) {
-
- for(int i =0; i< size; i++) {
- free(arr[i].content);
- }
- free(arr);
-}
diff --git a/src/socket/net_mod_socket_wrapper.c b/src/socket/net_mod_socket_wrapper.c
index f464487..61373dd 100644
--- a/src/socket/net_mod_socket_wrapper.c
+++ b/src/socket/net_mod_socket_wrapper.c
@@ -7,6 +7,7 @@
* 鍒涘缓
*/
void * net_mod_socket_open() {
+ printf("=====net_mod_socket_open\n");
net_mod_socket_t *sockt = (net_mod_socket_t *)malloc(sizeof(net_mod_socket_t));
sockt->sockt = new NetModSocket;
return (void *)sockt;
@@ -16,6 +17,7 @@
* 鍏抽棴
*/
void net_mod_socket_close(void *_socket) {
+ printf("====net_mod_socket_close\n");
net_mod_socket_t *sockt = (net_mod_socket_t *)_socket;
delete sockt->sockt;
free(sockt);
diff --git a/src/socket/shm_socket.c b/src/socket/shm_socket.c
index 9581b69..efb3ef7 100644
--- a/src/socket/shm_socket.c
+++ b/src/socket/shm_socket.c
@@ -7,9 +7,12 @@
-void print_msg(char *head, shm_msg_t &msg) {
+static void print_msg(char *head, shm_msg_t &msg) {
// err_msg(0, "%s: key=%d, type=%d\n", head, msg.key, msg.type);
}
+
+static pthread_once_t _once_ = PTHREAD_ONCE_INIT;
+static pthread_key_t _tmp_recv_socket_key_;
static void *_server_run_msg_rev(void *_socket);
@@ -18,6 +21,9 @@
static int _shm_close_dgram_socket(shm_socket_t *socket);
static int _shm_close_stream_socket(shm_socket_t *socket, bool notifyRemote);
+
+static void _destrory_tmp_recv_socket_(void *tmp_socket);
+static void _create_tmp_recv_socket_key(void);
// 妫�鏌ey鏄惁宸茬粡琚娇鐢紝鏄繑鍥�0, 鍚﹁繑鍥�1
static inline int _shm_socket_check_key(shm_socket_t *socket) {
@@ -38,6 +44,7 @@
}
shm_socket_t *shm_open_socket(shm_socket_type_t socket_type) {
+
shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t));
socket->socket_type = socket_type;
socket->key = -1;
@@ -45,12 +52,14 @@
socket->dispatch_thread = 0;
socket->status = SHM_CONN_CLOSED;
socket->mutex = SemUtil::get(IPC_PRIVATE, 1);
+ logger->debug("shm_open_socket\n");
return socket;
}
-int shm_close_socket(shm_socket_t *socket) {
+static int _shm_close_socket(shm_socket_t *socket) {
int ret;
+
switch (socket->socket_type) {
case SHM_SOCKET_STREAM:
ret = _shm_close_stream_socket(socket, true);
@@ -61,9 +70,17 @@
default:
break;
}
- SemUtil::remove(socket->mutex);
free(socket);
+ SemUtil::remove(socket->mutex);
+ logger->debug("shm_close_socket\n");
return ret;
+}
+
+int shm_close_socket(shm_socket_t *socket) {
+
+ // _destrory_tmp_recv_socket_((shm_socket_t *)pthread_getspecific(_tmp_recv_socket_key_));
+
+ return _shm_close_socket(socket);;
}
int shm_socket_bind(shm_socket_t *socket, int key) {
@@ -387,14 +404,20 @@
}
}
-static pthread_once_t _once_ = PTHREAD_ONCE_INIT;
-static pthread_key_t _tmp_recv_socket_key_;
/* Free thread-specific data buffer */
static void _destrory_tmp_recv_socket_(void *tmp_socket)
{
+ int rv;
+ if(tmp_socket == NULL)
+ return;
logger->debug("%d destroy tmp socket\n", pthread_self());
- shm_close_socket((shm_socket_t *)tmp_socket);
+ _shm_close_socket((shm_socket_t *)tmp_socket);
+ rv = pthread_setspecific(_tmp_recv_socket_key_, NULL);
+ if ( rv != 0) {
+ logger->error(rv, "shm_sendandrecv : pthread_setspecific");
+ exit(1);
+ }
}
/* One-time key creation function */
@@ -405,6 +428,7 @@
/* Allocate a unique thread-specific data key and save the address
of the destructor for thread-specific data buffers */
s = pthread_key_create(&_tmp_recv_socket_key_, _destrory_tmp_recv_socket_);
+ //s = pthread_key_create(&_tmp_recv_socket_key_, NULL);
if (s != 0) {
logger->error(s, "pthread_key_create");
abort(); /* dump core and terminate */
@@ -444,8 +468,8 @@
logger->debug("%d create tmp socket\n", pthread_self() );
tmp_socket = shm_open_socket(SHM_SOCKET_DGRAM);
- rv = pthread_setspecific(_tmp_recv_socket_key_, tmp_socket);
- if (rv != 0) {
+ rv = pthread_setspecific(_tmp_recv_socket_key_, tmp_socket);
+ if ( rv != 0) {
logger->error(rv, "shm_sendandrecv : pthread_setspecific");
exit(1);
}
diff --git a/test_net_socket/test_net_mod_socket.c b/test_net_socket/test_net_mod_socket.c
index 1ca04d0..f509e2c 100644
--- a/test_net_socket/test_net_mod_socket.c
+++ b/test_net_socket/test_net_mod_socket.c
@@ -15,6 +15,7 @@
}Targ;
struct argument_t {
+ bool interactive;
char *fun;
int port;
int key;
@@ -54,13 +55,16 @@
}
}
-void start_net_proxy(int port) {
+void start_net_proxy(argument_t &arg) {
pthread_t tid;
printf("Start net proxy\n");
- void *serverSocket = net_mod_server_socket_open(port);
+ void *serverSocket = net_mod_server_socket_open(arg.port);
// 鍒涘缓涓�涓嚎绋�,鍙互鍏抽棴server
- pthread_create(&tid, NULL, proxy_server_handler, serverSocket);
+ if(arg.interactive) {
+ pthread_create(&tid, NULL, proxy_server_handler, serverSocket);
+ }
+
if(net_mod_server_socket_start(serverSocket) != 0) {
err_exit(errno, "net_mod_server_socket_start");
}
@@ -103,12 +107,14 @@
-void start_bus_server() {
+void start_bus_server(argument_t &arg) {
printf("Start bus server\n");
void * server_socket = bus_server_socket_wrapper_open();
pthread_t tid;
// 鍒涘缓涓�涓嚎绋�,鍙互鍏抽棴bus
- // pthread_create(&tid, NULL, bus_handler, server_socket);
+ if(arg.interactive)
+ pthread_create(&tid, NULL, bus_handler, server_socket);
+
if(bus_server_socket_wrapper_start_bus(server_socket) != 0) {
printf("start bus failed\n");
exit(1);
@@ -152,12 +158,12 @@
//192.168.5.10:5000:11, 192.168.5.22:5000:11, 192.168.5.104:5000:11
net_node_t *node_arr;
int node_arr_size = parse_node_list(sendlist, &node_arr);
- // print_node_list(node_arr, node_arr_size);
+ print_node_list(node_arr, node_arr_size);
//192.168.5.10:5000:8, 192.168.5.22:5000:8, 192.168.5.104:5000:8
net_node_t *pub_node_arr;
int pub_node_arr_size = parse_node_list(publist, &pub_node_arr);
- // print_node_list(pub_node_arr, pub_node_arr_size);
+ print_node_list(pub_node_arr, pub_node_arr_size);
while (true) {
//printf("Usage: pub <topic> [content] or sub <topic>\n");
@@ -405,12 +411,12 @@
usage(argv[0]);
exit(1);
}
- start_net_proxy(opt.port);
+ start_net_proxy(opt);
}
else if (strcmp("start_bus_server", opt.fun) == 0) {
- start_bus_server();
+ start_bus_server(opt);
}
else if (strcmp("start_reply", opt.fun) == 0) {
if(opt.key == 0) {
@@ -497,6 +503,7 @@
argument_t mopt = {};
// mopt.volume_list_size = 0;
+ mopt.interactive = false;
opterr = 0;
@@ -508,6 +515,7 @@
{"fun", required_argument, 0, 'f'},
{"key", required_argument, 0, 'k'},
{"port", required_argument, 0, 'p'},
+ {"interactive", no_argument, 0, 'i'},
{"sendlist", required_argument, (int *)mopt.sendlist, 0},
{"publist", required_argument, (int *)mopt.publist, 0},
{0, 0, 0, 0}
@@ -518,7 +526,7 @@
{
- c = getopt_long (argc, argv, "+f:k:p:", long_options, &option_index);
+ c = getopt_long (argc, argv, "+f:k:p:i", long_options, &option_index);
/* Detect the end of the options. */
if (c == -1)
@@ -552,6 +560,10 @@
case 'k':
mopt.key = atoi(optarg);
+ break;
+
+ case 'i':
+ mopt.interactive = true;
break;
case 'p':
@@ -608,7 +620,8 @@
net_node_t *node_arr = (net_node_t *) calloc(entry_arr_len, sizeof(net_node_t));
for(i = 0; i < entry_arr_len; i++) {
property_arr_len = str_split(entry_arr[i], ":", &property_arr);
- // printf("%s, %s, %s\n", property_arr[0], property_arr[1], property_arr[2]);
+ printf("=====%s, %s, %s\n", property_arr[0], property_arr[1], property_arr[2]);
+
node_arr[i] = {trim(property_arr[0], 0), atoi(property_arr[1]), 0};
free(property_arr[1]);
@@ -628,7 +641,7 @@
void print_node_list(net_node_t *node_arr, int len) {
printf("============node list begin==========\n");
for(int i = 0; i < len; i++) {
- printf("%s,%d,%d,\n", node_arr[i].host, node_arr[i].port, node_arr[i].key);
+ printf("host=%s, port=%d, key=%d \n", node_arr[i].host, node_arr[i].port, node_arr[i].key);
}
printf("============node list end==========\n");
}
--
Gitblit v1.8.0