From 7a12bed7a2550d037e6e869c1ed0ce115098dbb2 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期六, 13 三月 2021 18:44:51 +0800
Subject: [PATCH] update

---
 test_socket/heart_beat.cpp |  167 +++++++++++++++++++++++++++++++++++--------------------
 1 files changed, 107 insertions(+), 60 deletions(-)

diff --git a/test_socket/heart_beat.cpp b/test_socket/heart_beat.cpp
index cd21a84..021ca3d 100644
--- a/test_socket/heart_beat.cpp
+++ b/test_socket/heart_beat.cpp
@@ -6,7 +6,7 @@
 #include "usg_common.h"
 #include <getopt.h>
 
-
+static Logger *logger =  LoggerFactory::getLogger();
 typedef struct Targ {
   int port;
   int id;
@@ -20,24 +20,45 @@
   // exit(0);
 }
 
+
+void *serverSockt;
+static void server_stop_handler(int sig) {
+  printf("stop_handler\n");
+ 
+  int rv = net_mod_socket_stop(serverSockt);
+  if(rv ==0) {
+    logger->debug("send stop suc");
+    return;
+  } else {
+    logger->debug("send stop fail.%s\n", bus_strerror(rv));
+  }
+}
+
 void server(int port) {
-  void *serv = net_mod_socket_open();
-  net_mod_socket_bind(serv, port);
+  serverSockt = net_mod_socket_open();
+  net_mod_socket_bind(serverSockt, port);
   int size;
   void *recvbuf;
   char sendbuf[512];
   int rv;
   int remote_port;
+
+  signal(SIGTERM,  server_stop_handler);
+  signal(SIGINT,  server_stop_handler);
   while (true) {
-    if(net_mod_socket_recvfrom_timeout(serv, &recvbuf, &size, &remote_port, 0, 2000000000)==0) {
+    rv = net_mod_socket_recvfrom_timeout(serverSockt, &recvbuf, &size, &remote_port, 0, 2000000000);
+    if(rv == 0 ) {
       printf( "RECEIVED HREARTBEAT FROM %d: %s\n", remote_port, recvbuf);
-      net_mod_socket_sendto(serv, "suc", strlen("suc")+1, remote_port);
+      net_mod_socket_sendto(serverSockt, "suc", strlen("suc")+1, remote_port);
       free(recvbuf);
+    } else if(rv == EBUS_STOPED) {
+      logger->debug("Stopping\n");
+      break;
     }
     
   }
   // sleep(1000);
-  net_mod_socket_close(serv);
+  net_mod_socket_close(serverSockt);
 }
 
 void client(int port) {
@@ -49,14 +70,42 @@
   net_node_t node_arr[] = {"", 0, port};
   int node_arr_size = 1;
 
-  int recv_arr_size;
+  int recv_arr_size,  n;
   net_mod_recv_msg_t *recv_arr;
+  net_mod_err_t *errarr;
+  int errarr_size = 0;
+
+  // int recv_arr_size;
+  // net_mod_recv_msg_t *recv_arr;
   while (true) {
     sprintf(sendbuf, "%d", i);
-    rv = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL, 2000);
+    rv = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf),
+         &recv_arr, &recv_arr_size, &errarr, &errarr_size, 1000);
+    // rv = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL, 2000);
     //rv = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL);
     printf("SEND HEART:%s, suc nodes = %d\n", sendbuf, rv);
   
+    if(recv_arr_size > 0) {
+      for(i=0; i<recv_arr_size; i++) {
+        printf("reply from (host:%s, port: %d, key:%d) >> %s\n", 
+          recv_arr[i].host,
+          recv_arr[i].port,
+          recv_arr[i].key,
+          (char *)recv_arr[i].content
+        );
+      }
+
+      // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀
+      net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
+    }
+        
+
+    if(errarr_size > 0) {
+      for(i = 0; i < errarr_size; i++) {
+        printf("error: (host:%s, port: %d, key:%d) : %s\n", errarr[i].host, errarr[i].port, errarr[i].key, bus_strerror(errarr[i].code));
+      }
+      free(errarr);
+    }
    // sleep(1);
     i++;
   }
@@ -64,66 +113,66 @@
 }
 
 
-void *runclient(void *arg) {
-  // signal(SIGINT,  sigint_handler);
-  Targ *targ = (Targ *)arg;
-  int port = targ->port;
-  void *client = net_mod_socket_open();
-  int size;
-  char sendbuf[512];
-  long scale = 100000;
-  long i = 0;
-  net_node_t node_arr[] = {"", 0, 100};
-  int node_arr_size = 1;
+// void *runclient(void *arg) {
+//   // signal(SIGINT,  sigint_handler);
+//   Targ *targ = (Targ *)arg;
+//   int port = targ->port;
+//   void *client = net_mod_socket_open();
+//   int size;
+//   char sendbuf[512];
+//   long scale = 100000;
+//   long i = 0;
+//   net_node_t node_arr[] = {"", 0, 100};
+//   int node_arr_size = 1;
 
-  int recv_arr_size;
-  net_mod_recv_msg_t *recv_arr;
+//   int recv_arr_size;
+//   net_mod_recv_msg_t *recv_arr;
 
-  while (i < scale) {
-    sprintf(sendbuf, "%d", i);
-    printf("%d SEND HEART:%s\n", targ->id, sendbuf);
-    net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL);
-    // net_mod_socket_sendto(socket, sendbuf, strlen(sendbuf) + 1, port);
-    i++;
-  }
+//   while (i < scale) {
+//     sprintf(sendbuf, "%d", i);
+//     printf("%d SEND HEART:%s\n", targ->id, sendbuf);
+//     net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL);
+//     // net_mod_socket_sendto(socket, sendbuf, strlen(sendbuf) + 1, port);
+//     i++;
+//   }
   
-   net_mod_socket_close(client);
-  return (void *)i;
-}
+//    net_mod_socket_close(client);
+//   return (void *)i;
+// }
 
  
-void mclient(int port) {
+// void mclient(int port) {
 
-  int status, i = 0, processors = 4;
-  void *res[processors];
-  Targ *targs = (Targ *)calloc(processors, sizeof(Targ));
-  pthread_t tids[processors];
-  char sendbuf[512];
+//   int status, i = 0, processors = 4;
+//   void *res[processors];
+//   Targ *targs = (Targ *)calloc(processors, sizeof(Targ));
+//   pthread_t tids[processors];
+//   char sendbuf[512];
 
-  struct timeval start;
-  gettimeofday(&start, NULL);
-  for (i = 0; i < processors; i++) {
-    targs[i].port = port;
-    targs[i].id = i;
-    pthread_create(&tids[i], NULL, runclient, (void *)&targs[i]);
-  }
+//   struct timeval start;
+//   gettimeofday(&start, NULL);
+//   for (i = 0; i < processors; i++) {
+//     targs[i].port = port;
+//     targs[i].id = i;
+//     pthread_create(&tids[i], NULL, runclient, (void *)&targs[i]);
+//   }
 
-  for (i = 0; i < processors; i++) {
-    if (pthread_join(tids[i], &res[i]) != 0) {
-      perror("multyThreadClient pthread_join");
-    } else {
-      fprintf(stderr, "client(%d) 鍙戦�� %ld 鏉℃暟鎹甛n", i, (long)res[i]);
-    }
-  }
+//   for (i = 0; i < processors; i++) {
+//     if (pthread_join(tids[i], &res[i]) != 0) {
+//       perror("multyThreadClient pthread_join");
+//     } else {
+//       fprintf(stderr, "client(%d) 鍙戦�� %ld 鏉℃暟鎹甛n", i, (long)res[i]);
+//     }
+//   }
 
-  struct timeval end;
-  gettimeofday(&end, NULL);
+//   struct timeval end;
+//   gettimeofday(&end, NULL);
 
-  double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec);
-  long diffsec = (long) (difftime/1000000);
-  long diffmsec = difftime - diffsec*1000000;
-  printf("cost: %ld sec: %ld msc\n", diffsec, diffmsec);
-}
+//   double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec);
+//   long diffsec = (long) (difftime/1000000);
+//   long diffmsec = difftime - diffsec*1000000;
+//   printf("cost: %ld sec: %ld msc\n", diffsec, diffmsec);
+// }
 
 int main(int argc, char *argv[]) {
   shm_mm_wrapper_init(512);
@@ -139,8 +188,6 @@
     server(port);
   else if (strcmp("client", argv[1]) == 0)
     client(port);
-  else if (strcmp("mclient", argv[1]) == 0)
-    mclient(port);
 
   shm_mm_wrapper_destroy();
   return 0;

--
Gitblit v1.8.0