From a38304f7f6b91aaa1b0aa76cc9d3e5b6aef1f85f Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期六, 13 三月 2021 17:07:22 +0800
Subject: [PATCH] update
---
test_net_socket/shm_util.cpp | 290 +++++++++++++++++++++++++++++++++++++++------------------
1 files changed, 198 insertions(+), 92 deletions(-)
diff --git a/test_net_socket/shm_util.cpp b/test_net_socket/shm_util.cpp
index bf214ed..1b3dca3 100644
--- a/test_net_socket/shm_util.cpp
+++ b/test_net_socket/shm_util.cpp
@@ -21,6 +21,8 @@
struct argument_t {
bool interactive;
+ bool force;
+ int bind;
int port;
int key;
char *sendlist;
@@ -42,7 +44,7 @@
pthread_detach(pthread_self());
char action[512];
- while ( true) {
+ while ( true ) {
printf("Input action: Close?\n");
if(scanf("%s",action) < 1) {
printf("Invalide action\n");
@@ -147,13 +149,18 @@
}
}
-void start_reply(int mkey) {
+void start_recvfrom(int mkey, bool force) {
logger->debug("start reply\n");
signal(SIGINT, stop_replyserver_handler);
signal(SIGTERM, stop_replyserver_handler);
serverSockt = net_mod_socket_open();
- net_mod_socket_bind(serverSockt, mkey);
+ if(force) {
+ net_mod_socket_force_bind(serverSockt, mkey);
+ } else {
+ net_mod_socket_bind(serverSockt, mkey);
+ }
+
int rv = 0 ;
while( true) {
@@ -163,6 +170,9 @@
if(rv == EBUS_STOPED) {
logger->debug("Stopping\n");
break;
+ } else if(rv == EBUS_KEY_INUSED){
+ printf("key宸茬粡琚崰鐢╘n");
+ exit(1);
}
logger->debug("net_mod_socket_recvandsend error.%s\n", bus_strerror(rv));
@@ -190,6 +200,8 @@
int recv_arr_size, i, n;
net_mod_recv_msg_t *recv_arr;
+ net_mod_err_t *errarr;
+ int errarr_size = 0;
pthread_t tid;
// 鍒涘缓涓�涓嚎绋嬫帴鍙楄闃呮秷鎭�
@@ -224,19 +236,31 @@
if (fgets(content, MAXLINE, stdin) != NULL) {
// 鏀跺埌娑堟伅鐨勮妭鐐瑰嵆浣挎病鏈夊搴旂殑淇℃伅锛� 涔熻鍥炲涓�涓〃绀烘棤鐨勬秷鎭�,鍚﹀垯浼氫竴鐩寸瓑寰�
// n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, content, strlen(content), &recv_arr, &recv_arr_size);
- n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, content, strlen(content), &recv_arr, &recv_arr_size, 1);
+ n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, content, strlen(content),
+ &recv_arr, &recv_arr_size, &errarr, &errarr_size, 1000);
printf(" %d nodes reply\n", n);
- for(i=0; i<recv_arr_size; i++) {
- printf("reply from (host:%s, port: %d, key:%d) >> %s\n",
- recv_arr[i].host,
- recv_arr[i].port,
- recv_arr[i].key,
- (char *)recv_arr[i].content
- );
- }
+
+ if(recv_arr_size > 0) {
+ for(i=0; i<recv_arr_size; i++) {
+ printf("reply from (host:%s, port: %d, key:%d) >> %s\n",
+ recv_arr[i].host,
+ recv_arr[i].port,
+ recv_arr[i].key,
+ (char *)recv_arr[i].content
+ );
+ }
+
+ // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀
+ net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
+ }
- // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀
- net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
+
+ if(errarr_size > 0) {
+ for(i = 0; i < errarr_size; i++) {
+ printf("error: (host:%s, port: %d, key:%d) : %s\n", errarr[i].host, errarr[i].port, errarr[i].key, bus_strerror(errarr[i].code));
+ }
+ free(errarr);
+ }
}
}
else if(strcmp(action, "desub") == 0) {
@@ -280,10 +304,12 @@
Targ *targ = (Targ *)arg;
char sendbuf[128];
- int j, n;
- int recv_arr_size;
+ int i, j, n;
+ int recv_arr_size = 0;
net_mod_recv_msg_t *recv_arr;
int total = 0;
+ net_mod_err_t *errarr;
+ int errarr_size = 0;
int rkey, lkey;
unsigned int l = 0 , rl;
@@ -302,30 +328,42 @@
sprintf(sendbuf, hello_format, net_mod_socket_get_key(client), l);
// fprintf(fp, "requst:%s\n", sendbuf);
// n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size);
- n = net_mod_socket_sendandrecv_timeout(client, targ->node, 1, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 1);
+ n = net_mod_socket_sendandrecv_timeout(client, targ->node, 1, sendbuf, strlen(sendbuf) + 1,
+ &recv_arr, &recv_arr_size, &errarr, &errarr_size, 1);
printf("%d: send %d nodes\n", l, n);
- for(j=0; j < recv_arr_size; j++) {
- fprintf(stdout, "%d send '%s' to %d. received from (host=%s, port= %d, key=%d) '%s'\n",
- net_mod_socket_get_key(client),
- sendbuf,
- targ->node->key,
- recv_arr[j].host,
- recv_arr[j].port,
- recv_arr[j].key,
- (char *)recv_arr[j].content
- );
+ if(recv_arr_size > 0) {
+ for(j=0; j < recv_arr_size; j++) {
+ fprintf(stdout, "%d send '%s' to %d. received from (host=%s, port= %d, key=%d) '%s'\n",
+ net_mod_socket_get_key(client),
+ sendbuf,
+ targ->node->key,
+ recv_arr[j].host,
+ recv_arr[j].port,
+ recv_arr[j].key,
+ (char *)recv_arr[j].content
+ );
- printf("key == %d\n", net_mod_socket_get_key(client));
- assert(sscanf((const char *)recv_arr[j].content, reply_format, &rkey, &lkey, &rl) == 3);
- assert(targ->node->key == rkey);
- assert(net_mod_socket_get_key(client) == lkey);
- assert(rl == l);
+ printf("key == %d\n", net_mod_socket_get_key(client));
+ assert(sscanf((const char *)recv_arr[j].content, reply_format, &rkey, &lkey, &rl) == 3);
+ assert(targ->node->key == rkey);
+ assert(net_mod_socket_get_key(client) == lkey);
+ assert(rl == l);
+ }
+ // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀
+ net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
}
- // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀
- net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
+
+ if(errarr_size > 0) {
+ for(i = 0; i < errarr_size; i++) {
+ printf("error: (host:%s, port: %d, key:%d) : %s\n", errarr[i].host, errarr[i].port, errarr[i].key, bus_strerror(errarr[i].code));
+ }
+ free(errarr);
+ }
+
total += n;
}
+
if(fp != NULL)
fclose(fp);
// net_mod_socket_close(client);
@@ -374,7 +412,7 @@
double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec);
long diffsec = (long) (difftime/1000000);
long diffusec = difftime - diffsec*1000000;
- fprintf(stderr,"鍙戦�佹暟鐩�:%ld, 鎴愬姛鏁扮洰: %ld, 鐢ㄦ椂: (%ld sec %ld usec), 骞冲潎: %f\n",
+ fprintf(stderr,"鍙戦�佹暟鐩�:%d, 鎴愬姛鏁扮洰: %ld, 鐢ㄦ椂: (%ld sec %ld usec), 骞冲潎: %f\n",
SCALE*node_arr_size, total, diffsec, diffusec, difftime/total );
// fflush(stdout);
@@ -383,11 +421,14 @@
// 鏃犻檺寰幆send
void test_net_sendandrecv(char *nodelist) {
- int n, j;
+ int i, n, j;
void * client;
int recv_arr_size;
net_mod_recv_msg_t *recv_arr;
net_node_t *node_arr;
+ net_mod_err_t *errarr;
+ int errarr_size = 0;
+
int node_arr_size = parse_node_list(nodelist, &node_arr);
char buf[128];
pid_t pid, retPid ;
@@ -403,30 +444,35 @@
while(true) {
sprintf(buf, hello_format, pid, l);
n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, buf, strlen(buf)+1,
- &recv_arr, &recv_arr_size, 1000);
+ &recv_arr, &recv_arr_size, &errarr, &errarr_size, 1000);
printf(" %d nodes reply\n", n);
- for(j = 0; j < recv_arr_size; j++) {
- printf("%ld send '%s' . received '%s' from (host:%s, port: %d, key:%d) \n",
- (long)pid,
- buf,
- (char *)recv_arr[j].content,
- recv_arr[j].host,
- recv_arr[j].port,
- recv_arr[j].key
+ if(recv_arr_size > 0) {
+ for(j = 0; j < recv_arr_size; j++) {
+ printf("%ld send '%s' . received '%s' from (host:%s, port: %d, key:%d) \n",
+ (long)pid,
+ buf,
+ (char *)recv_arr[j].content,
+ recv_arr[j].host,
+ recv_arr[j].port,
+ recv_arr[j].key
+ );
+ assert(sscanf((const char *)recv_arr[j].content, reply_format, &remoteKey, &retPid, &retl) == 3);
+ assert(retPid == pid);
+ assert(retl == l);
+ assert(remoteKey == recv_arr[j].key);
+ }
+ // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀
+ net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
+ }
- );
-
-
-
- assert(sscanf((const char *)recv_arr[j].content, reply_format, &remoteKey, &retPid, &retl) == 3);
- assert(retPid == pid);
- assert(retl == l);
- assert(remoteKey == recv_arr[j].key);
+ if(errarr_size > 0) {
+ for(i = 0; i < errarr_size; i++) {
+ printf("error: (host:%s, port: %d, key:%d) : %s\n", errarr[i].host, errarr[i].port, errarr[i].key, bus_strerror(errarr[i].code));
+ }
+ free(errarr);
}
- // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀
- net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
l++;
}
@@ -513,7 +559,7 @@
net_node_t *node_arr;
int node_arr_size = parse_node_list(nodelist, &node_arr);
- char *topic = "news";
+ const char *topic = "news";
sprintf(sendbuf, "pid:%ld: Good news!!", (long)getpid());
void * client = net_mod_socket_open();
@@ -528,16 +574,32 @@
void list () {
LockFreeQueue<shm_packet_t> * mqueue;
hashtable_t *hashtable = mm_get_hashtable();
- printf("%10s \t %10s\n", "KEY", "LENGTH");
+ printf("%10s \t %-10s \t %10s\n", "KEY", "LENGTH", "STATUS");
hashtable_foreach(hashtable, [&](int key, void * value){
if(key >= 100 ) {
mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, key);
- printf("%10d \t %10d\n", key, mqueue->size());
+ if((long)mqueue == 0x1) {
+ printf("%10d \t %-10s\n", key, "Not In Used");
+ } else {
+ printf("%10d \t %-10d\n", key, mqueue->size());
+ }
+
} else {
- printf("%10d \t %10s\n", key, "");
+ printf("%10d\n", key);
}
});
+}
+
+void info(int key) {
+ LockFreeQueue<shm_packet_t> * mqueue;
+ hashtable_t *hashtable = mm_get_hashtable();
+ mqueue = (LockFreeQueue<shm_packet_t> *) hashtable_get(hashtable, key);
+ printf("%10s: %-10p\n", "PTR", mqueue);
+ printf("%10s: %-10d\n", "KEY", key);
+ printf("%10s: %-10d\n", "LENGTH", mqueue->size());
+
+
}
@@ -551,31 +613,45 @@
}
}
-void do_sendandrecv(int key, char *sendbuf) {
- int n, j;
+void do_sendandrecv(char *sendlist, char *sendbuf) {
+ int i, n, j;
int recv_arr_size;
net_mod_recv_msg_t *recv_arr;
+ net_mod_err_t *errarr;
+ int errarr_size = 0;
- net_node_t node_arr[] = {NULL, 0, key};
+
+ net_node_t *node_arr;
+ int node_arr_size = parse_node_list(sendlist, &node_arr);
+
+ print_node_list(node_arr, node_arr_size);
void * client = net_mod_socket_open();
- n = net_mod_socket_sendandrecv_timeout(client, node_arr, 1, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 5);
- if(n == 0) {
- printf("send failed\n");
- return;
- }
- printf(" %d nodes reply\n", n);
- for(j=0; j < recv_arr_size; j++) {
+ n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1,
+ &recv_arr, &recv_arr_size, &errarr, &errarr_size, 5000);
+
+ printf(" %d nodes reply\n", recv_arr_size);
+ if(recv_arr_size > 0) {
+ for(j=0; j < recv_arr_size; j++) {
- fprintf(stdout, "%d send '%s' to %d. received from (host=%s, port= %d, key=%d) '%s'\n\n",
- net_mod_socket_get_key(client),
- sendbuf,
- key,
- recv_arr[j].host,
- recv_arr[j].port,
- recv_arr[j].key,
- (char *)recv_arr[j].content
- );
+ fprintf(stdout, "===> suc: %d send '%s'. received from (host=%s, port= %d, key=%d), '%s'\n\n",
+ net_mod_socket_get_key(client),
+ sendbuf,
+ recv_arr[j].host,
+ recv_arr[j].port,
+ recv_arr[j].key,
+ (char *)recv_arr[j].content
+ );
+ }
+ // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀
+ net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
+ }
+// printf("errarr_size = %d\n", errarr_size);
+ if(errarr_size > 0) {
+ for(i = 0; i < errarr_size; i++) {
+ printf("===> error: (host:%s, port: %d, key:%d). %s\n", errarr[i].host, errarr[i].port, errarr[i].key, bus_strerror(errarr[i].code));
+ }
+ free(errarr);
}
net_mod_socket_close(client);
@@ -604,6 +680,8 @@
fpe("./shm_util list\n");
fpe("# remove key 1001\n");
fpe("./shm_util rm 1001\n");
+ fpe("./shm_util info 1002\n");
+ fpe("./shm_util recvfrom --bind 1002 [--force]\n")
fpe("\n");
}
@@ -635,6 +713,8 @@
{"key", required_argument, 0, 'k'},
{"port", required_argument, 0, 'p'},
{"interactive", no_argument, 0, 'i'},
+ {"force", no_argument, 0, 'f'},
+ {"bind", required_argument, (int *)mopt.bind, 0},
{"sendlist", required_argument, (int *)mopt.sendlist, 0},
{"publist", required_argument, (int *)mopt.publist, 0},
{0, 0, 0, 0}
@@ -645,7 +725,7 @@
{
- c = getopt_long (argc, argv, "+f:k:p:i", long_options, &option_index);
+ c = getopt_long (argc, argv, "+fk:p:i", long_options, &option_index);
/* Detect the end of the options. */
if (c == -1)
@@ -664,6 +744,9 @@
else if(strcmp(long_options[option_index].name, "publist") == 0) {
mopt.publist = optarg;
}
+ else if(strcmp(long_options[option_index].name, "bind") == 0) {
+ mopt.bind = atoi(optarg);
+ }
else {
printf ("option %s", long_options[option_index].name);
if (optarg)
@@ -679,6 +762,10 @@
case 'i':
mopt.interactive = true;
+ break;
+
+ case 'f':
+ mopt.force = true;
break;
case 'p':
@@ -734,6 +821,10 @@
net_node_t *node_arr = (net_node_t *) calloc(entry_arr_len, sizeof(net_node_t));
for(i = 0; i < entry_arr_len; i++) {
+ if(strchr(entry_arr[i], ':') == NULL) {
+ node_arr[i]= {NULL, 0, atoi(entry_arr[i])};
+ continue;
+ }
property_arr_len = str_split(entry_arr[i], ":", &property_arr);
printf("=====%s, %s, %s\n", property_arr[0], property_arr[1], property_arr[2]);
@@ -768,7 +859,7 @@
int i;
char *prog;
char * fun;
- argument_t opt;
+ argument_t opt = {};
shm_mm_wrapper_init(512);
@@ -788,24 +879,38 @@
else if (strcmp("list", fun) == 0 ) {
list();
}
+ else if (strcmp("info", fun) == 0 ) {
+ if(argc < 2) {
+
+ usage(prog);
+
+ } else {
+ for(i = 1; i < argc; i++) {
+ int key = atoi(argv[i]);
+ info(key);
+ }
+ }
+ }
else if (strcmp("rm", fun) == 0 ) {
if(argc < 2) {
usage(prog);
- exit(1);
+
+ } else {
+ for(i = 1; i < argc; i++) {
+ int key = atoi(argv[i]);
+ remove(key);
+ }
}
- for(i = 1; i < argc; i++) {
- int key = atoi(argv[i]);
- remove(key);
- }
+
}
else if (strcmp("sendandrecv", fun) == 0 ) {
if(argc < 3) {
usage(prog);
exit(1);
}
- int key = atoi(argv[1]);
+ char *sendlist = argv[1];
char *content = argv[2];
- do_sendandrecv(key, content);
+ do_sendandrecv(sendlist, content);
}
else if (strcmp("start_bus_server", fun) == 0) {
@@ -826,13 +931,14 @@
}
- else if (strcmp("start_reply", fun) == 0) {
+ else if (strcmp("recvfrom", fun) == 0) {
opt = parse_args(argc, argv);
- if(opt.key == 0) {
+ if(opt.bind == 0) {
usage(argv[0]);
- exit(1);
+ } else {
+ start_recvfrom(opt.bind, opt.force);
}
- start_reply(opt.key);
+
}
else if (strcmp("start_net_client", fun) == 0) {
opt = parse_args(argc, argv);
@@ -890,7 +996,7 @@
}
else {
- printf("%Invalid funciton name\n");
+ printf("Invalid funciton name\n");
usage(argv[0]);
exit(1);
--
Gitblit v1.8.0