From a38304f7f6b91aaa1b0aa76cc9d3e5b6aef1f85f Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期六, 13 三月 2021 17:07:22 +0800
Subject: [PATCH] update
---
test_net_socket/shm_util.cpp | 207 ++++++++++++++++++++++++++++++++-------------------
1 files changed, 130 insertions(+), 77 deletions(-)
diff --git a/test_net_socket/shm_util.cpp b/test_net_socket/shm_util.cpp
index 549e29f..1b3dca3 100644
--- a/test_net_socket/shm_util.cpp
+++ b/test_net_socket/shm_util.cpp
@@ -44,7 +44,7 @@
pthread_detach(pthread_self());
char action[512];
- while ( true) {
+ while ( true ) {
printf("Input action: Close?\n");
if(scanf("%s",action) < 1) {
printf("Invalide action\n");
@@ -200,6 +200,8 @@
int recv_arr_size, i, n;
net_mod_recv_msg_t *recv_arr;
+ net_mod_err_t *errarr;
+ int errarr_size = 0;
pthread_t tid;
// 鍒涘缓涓�涓嚎绋嬫帴鍙楄闃呮秷鎭�
@@ -234,19 +236,31 @@
if (fgets(content, MAXLINE, stdin) != NULL) {
// 鏀跺埌娑堟伅鐨勮妭鐐瑰嵆浣挎病鏈夊搴旂殑淇℃伅锛� 涔熻鍥炲涓�涓〃绀烘棤鐨勬秷鎭�,鍚﹀垯浼氫竴鐩寸瓑寰�
// n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, content, strlen(content), &recv_arr, &recv_arr_size);
- n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, content, strlen(content), &recv_arr, &recv_arr_size, 1);
+ n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, content, strlen(content),
+ &recv_arr, &recv_arr_size, &errarr, &errarr_size, 1000);
printf(" %d nodes reply\n", n);
- for(i=0; i<recv_arr_size; i++) {
- printf("reply from (host:%s, port: %d, key:%d) >> %s\n",
- recv_arr[i].host,
- recv_arr[i].port,
- recv_arr[i].key,
- (char *)recv_arr[i].content
- );
- }
+
+ if(recv_arr_size > 0) {
+ for(i=0; i<recv_arr_size; i++) {
+ printf("reply from (host:%s, port: %d, key:%d) >> %s\n",
+ recv_arr[i].host,
+ recv_arr[i].port,
+ recv_arr[i].key,
+ (char *)recv_arr[i].content
+ );
+ }
+
+ // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀
+ net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
+ }
- // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀
- net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
+
+ if(errarr_size > 0) {
+ for(i = 0; i < errarr_size; i++) {
+ printf("error: (host:%s, port: %d, key:%d) : %s\n", errarr[i].host, errarr[i].port, errarr[i].key, bus_strerror(errarr[i].code));
+ }
+ free(errarr);
+ }
}
}
else if(strcmp(action, "desub") == 0) {
@@ -290,10 +304,12 @@
Targ *targ = (Targ *)arg;
char sendbuf[128];
- int j, n;
- int recv_arr_size;
+ int i, j, n;
+ int recv_arr_size = 0;
net_mod_recv_msg_t *recv_arr;
int total = 0;
+ net_mod_err_t *errarr;
+ int errarr_size = 0;
int rkey, lkey;
unsigned int l = 0 , rl;
@@ -312,30 +328,42 @@
sprintf(sendbuf, hello_format, net_mod_socket_get_key(client), l);
// fprintf(fp, "requst:%s\n", sendbuf);
// n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size);
- n = net_mod_socket_sendandrecv_timeout(client, targ->node, 1, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 1);
+ n = net_mod_socket_sendandrecv_timeout(client, targ->node, 1, sendbuf, strlen(sendbuf) + 1,
+ &recv_arr, &recv_arr_size, &errarr, &errarr_size, 1);
printf("%d: send %d nodes\n", l, n);
- for(j=0; j < recv_arr_size; j++) {
- fprintf(stdout, "%d send '%s' to %d. received from (host=%s, port= %d, key=%d) '%s'\n",
- net_mod_socket_get_key(client),
- sendbuf,
- targ->node->key,
- recv_arr[j].host,
- recv_arr[j].port,
- recv_arr[j].key,
- (char *)recv_arr[j].content
- );
+ if(recv_arr_size > 0) {
+ for(j=0; j < recv_arr_size; j++) {
+ fprintf(stdout, "%d send '%s' to %d. received from (host=%s, port= %d, key=%d) '%s'\n",
+ net_mod_socket_get_key(client),
+ sendbuf,
+ targ->node->key,
+ recv_arr[j].host,
+ recv_arr[j].port,
+ recv_arr[j].key,
+ (char *)recv_arr[j].content
+ );
- printf("key == %d\n", net_mod_socket_get_key(client));
- assert(sscanf((const char *)recv_arr[j].content, reply_format, &rkey, &lkey, &rl) == 3);
- assert(targ->node->key == rkey);
- assert(net_mod_socket_get_key(client) == lkey);
- assert(rl == l);
+ printf("key == %d\n", net_mod_socket_get_key(client));
+ assert(sscanf((const char *)recv_arr[j].content, reply_format, &rkey, &lkey, &rl) == 3);
+ assert(targ->node->key == rkey);
+ assert(net_mod_socket_get_key(client) == lkey);
+ assert(rl == l);
+ }
+ // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀
+ net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
}
- // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀
- net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
+
+ if(errarr_size > 0) {
+ for(i = 0; i < errarr_size; i++) {
+ printf("error: (host:%s, port: %d, key:%d) : %s\n", errarr[i].host, errarr[i].port, errarr[i].key, bus_strerror(errarr[i].code));
+ }
+ free(errarr);
+ }
+
total += n;
}
+
if(fp != NULL)
fclose(fp);
// net_mod_socket_close(client);
@@ -384,7 +412,7 @@
double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec);
long diffsec = (long) (difftime/1000000);
long diffusec = difftime - diffsec*1000000;
- fprintf(stderr,"鍙戦�佹暟鐩�:%ld, 鎴愬姛鏁扮洰: %ld, 鐢ㄦ椂: (%ld sec %ld usec), 骞冲潎: %f\n",
+ fprintf(stderr,"鍙戦�佹暟鐩�:%d, 鎴愬姛鏁扮洰: %ld, 鐢ㄦ椂: (%ld sec %ld usec), 骞冲潎: %f\n",
SCALE*node_arr_size, total, diffsec, diffusec, difftime/total );
// fflush(stdout);
@@ -393,11 +421,14 @@
// 鏃犻檺寰幆send
void test_net_sendandrecv(char *nodelist) {
- int n, j;
+ int i, n, j;
void * client;
int recv_arr_size;
net_mod_recv_msg_t *recv_arr;
net_node_t *node_arr;
+ net_mod_err_t *errarr;
+ int errarr_size = 0;
+
int node_arr_size = parse_node_list(nodelist, &node_arr);
char buf[128];
pid_t pid, retPid ;
@@ -413,30 +444,35 @@
while(true) {
sprintf(buf, hello_format, pid, l);
n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, buf, strlen(buf)+1,
- &recv_arr, &recv_arr_size, 1000);
+ &recv_arr, &recv_arr_size, &errarr, &errarr_size, 1000);
printf(" %d nodes reply\n", n);
- for(j = 0; j < recv_arr_size; j++) {
- printf("%ld send '%s' . received '%s' from (host:%s, port: %d, key:%d) \n",
- (long)pid,
- buf,
- (char *)recv_arr[j].content,
- recv_arr[j].host,
- recv_arr[j].port,
- recv_arr[j].key
+ if(recv_arr_size > 0) {
+ for(j = 0; j < recv_arr_size; j++) {
+ printf("%ld send '%s' . received '%s' from (host:%s, port: %d, key:%d) \n",
+ (long)pid,
+ buf,
+ (char *)recv_arr[j].content,
+ recv_arr[j].host,
+ recv_arr[j].port,
+ recv_arr[j].key
+ );
+ assert(sscanf((const char *)recv_arr[j].content, reply_format, &remoteKey, &retPid, &retl) == 3);
+ assert(retPid == pid);
+ assert(retl == l);
+ assert(remoteKey == recv_arr[j].key);
+ }
+ // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀
+ net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
+ }
- );
-
-
-
- assert(sscanf((const char *)recv_arr[j].content, reply_format, &remoteKey, &retPid, &retl) == 3);
- assert(retPid == pid);
- assert(retl == l);
- assert(remoteKey == recv_arr[j].key);
+ if(errarr_size > 0) {
+ for(i = 0; i < errarr_size; i++) {
+ printf("error: (host:%s, port: %d, key:%d) : %s\n", errarr[i].host, errarr[i].port, errarr[i].key, bus_strerror(errarr[i].code));
+ }
+ free(errarr);
}
- // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀
- net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
l++;
}
@@ -523,7 +559,7 @@
net_node_t *node_arr;
int node_arr_size = parse_node_list(nodelist, &node_arr);
- char *topic = "news";
+ const char *topic = "news";
sprintf(sendbuf, "pid:%ld: Good news!!", (long)getpid());
void * client = net_mod_socket_open();
@@ -577,31 +613,45 @@
}
}
-void do_sendandrecv(int key, char *sendbuf) {
- int n, j;
+void do_sendandrecv(char *sendlist, char *sendbuf) {
+ int i, n, j;
int recv_arr_size;
net_mod_recv_msg_t *recv_arr;
+ net_mod_err_t *errarr;
+ int errarr_size = 0;
- net_node_t node_arr[] = {NULL, 0, key};
+
+ net_node_t *node_arr;
+ int node_arr_size = parse_node_list(sendlist, &node_arr);
+
+ print_node_list(node_arr, node_arr_size);
void * client = net_mod_socket_open();
- n = net_mod_socket_sendandrecv_timeout(client, node_arr, 1, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 5000);
- if(n == 0) {
- printf("send failed\n");
- return;
- }
- printf(" %d nodes reply\n", n);
- for(j=0; j < recv_arr_size; j++) {
+ n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1,
+ &recv_arr, &recv_arr_size, &errarr, &errarr_size, 5000);
+
+ printf(" %d nodes reply\n", recv_arr_size);
+ if(recv_arr_size > 0) {
+ for(j=0; j < recv_arr_size; j++) {
- fprintf(stdout, "%d send '%s' to %d. received from (host=%s, port= %d, key=%d) '%s'\n\n",
- net_mod_socket_get_key(client),
- sendbuf,
- key,
- recv_arr[j].host,
- recv_arr[j].port,
- recv_arr[j].key,
- (char *)recv_arr[j].content
- );
+ fprintf(stdout, "===> suc: %d send '%s'. received from (host=%s, port= %d, key=%d), '%s'\n\n",
+ net_mod_socket_get_key(client),
+ sendbuf,
+ recv_arr[j].host,
+ recv_arr[j].port,
+ recv_arr[j].key,
+ (char *)recv_arr[j].content
+ );
+ }
+ // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀
+ net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
+ }
+// printf("errarr_size = %d\n", errarr_size);
+ if(errarr_size > 0) {
+ for(i = 0; i < errarr_size; i++) {
+ printf("===> error: (host:%s, port: %d, key:%d). %s\n", errarr[i].host, errarr[i].port, errarr[i].key, bus_strerror(errarr[i].code));
+ }
+ free(errarr);
}
net_mod_socket_close(client);
@@ -771,6 +821,10 @@
net_node_t *node_arr = (net_node_t *) calloc(entry_arr_len, sizeof(net_node_t));
for(i = 0; i < entry_arr_len; i++) {
+ if(strchr(entry_arr[i], ':') == NULL) {
+ node_arr[i]= {NULL, 0, atoi(entry_arr[i])};
+ continue;
+ }
property_arr_len = str_split(entry_arr[i], ":", &property_arr);
printf("=====%s, %s, %s\n", property_arr[0], property_arr[1], property_arr[2]);
@@ -854,9 +908,9 @@
usage(prog);
exit(1);
}
- int key = atoi(argv[1]);
+ char *sendlist = argv[1];
char *content = argv[2];
- do_sendandrecv(key, content);
+ do_sendandrecv(sendlist, content);
}
else if (strcmp("start_bus_server", fun) == 0) {
@@ -881,7 +935,6 @@
opt = parse_args(argc, argv);
if(opt.bind == 0) {
usage(argv[0]);
- exit(1);
} else {
start_recvfrom(opt.bind, opt.force);
}
@@ -943,7 +996,7 @@
}
else {
- printf("%Invalid funciton name\n");
+ printf("Invalid funciton name\n");
usage(argv[0]);
exit(1);
--
Gitblit v1.8.0