wangzhengquan
2021-03-13 7a12bed7a2550d037e6e869c1ed0ce115098dbb2
test_socket/heart_beat.cpp
@@ -6,7 +6,7 @@
#include "usg_common.h"
#include <getopt.h>
static Logger *logger =  LoggerFactory::getLogger();
typedef struct Targ {
  int port;
  int id;
@@ -20,24 +20,45 @@
  // exit(0);
}
void *serverSockt;
static void server_stop_handler(int sig) {
  printf("stop_handler\n");
  int rv = net_mod_socket_stop(serverSockt);
  if(rv ==0) {
    logger->debug("send stop suc");
    return;
  } else {
    logger->debug("send stop fail.%s\n", bus_strerror(rv));
  }
}
void server(int port) {
  void *serv = net_mod_socket_open();
  net_mod_socket_bind(serv, port);
  serverSockt = net_mod_socket_open();
  net_mod_socket_bind(serverSockt, port);
  int size;
  void *recvbuf;
  char sendbuf[512];
  int rv;
  int remote_port;
  signal(SIGTERM,  server_stop_handler);
  signal(SIGINT,  server_stop_handler);
  while (true) {
    if(net_mod_socket_recvfrom_timeout(serv, &recvbuf, &size, &remote_port, 0, 2000000000)==0) {
    rv = net_mod_socket_recvfrom_timeout(serverSockt, &recvbuf, &size, &remote_port, 0, 2000000000);
    if(rv == 0 ) {
      printf( "RECEIVED HREARTBEAT FROM %d: %s\n", remote_port, recvbuf);
      net_mod_socket_sendto(serv, "suc", strlen("suc")+1, remote_port);
      net_mod_socket_sendto(serverSockt, "suc", strlen("suc")+1, remote_port);
      free(recvbuf);
    } else if(rv == EBUS_STOPED) {
      logger->debug("Stopping\n");
      break;
    }
    
  }
  // sleep(1000);
  net_mod_socket_close(serv);
  net_mod_socket_close(serverSockt);
}
void client(int port) {
@@ -49,14 +70,42 @@
  net_node_t node_arr[] = {"", 0, port};
  int node_arr_size = 1;
  int recv_arr_size;
  int recv_arr_size,  n;
  net_mod_recv_msg_t *recv_arr;
  net_mod_err_t *errarr;
  int errarr_size = 0;
  // int recv_arr_size;
  // net_mod_recv_msg_t *recv_arr;
  while (true) {
    sprintf(sendbuf, "%d", i);
    rv = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL, 2000);
    rv = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf),
         &recv_arr, &recv_arr_size, &errarr, &errarr_size, 1000);
    // rv = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL, 2000);
    //rv = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL);
    printf("SEND HEART:%s, suc nodes = %d\n", sendbuf, rv);
  
    if(recv_arr_size > 0) {
      for(i=0; i<recv_arr_size; i++) {
        printf("reply from (host:%s, port: %d, key:%d) >> %s\n",
          recv_arr[i].host,
          recv_arr[i].port,
          recv_arr[i].key,
          (char *)recv_arr[i].content
        );
      }
      // 使用完后,不要忘记释放掉
      net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
    }
    if(errarr_size > 0) {
      for(i = 0; i < errarr_size; i++) {
        printf("error: (host:%s, port: %d, key:%d) : %s\n", errarr[i].host, errarr[i].port, errarr[i].key, bus_strerror(errarr[i].code));
      }
      free(errarr);
    }
   // sleep(1);
    i++;
  }
@@ -64,66 +113,66 @@
}
void *runclient(void *arg) {
  // signal(SIGINT,  sigint_handler);
  Targ *targ = (Targ *)arg;
  int port = targ->port;
  void *client = net_mod_socket_open();
  int size;
  char sendbuf[512];
  long scale = 100000;
  long i = 0;
  net_node_t node_arr[] = {"", 0, 100};
  int node_arr_size = 1;
// void *runclient(void *arg) {
//   // signal(SIGINT,  sigint_handler);
//   Targ *targ = (Targ *)arg;
//   int port = targ->port;
//   void *client = net_mod_socket_open();
//   int size;
//   char sendbuf[512];
//   long scale = 100000;
//   long i = 0;
//   net_node_t node_arr[] = {"", 0, 100};
//   int node_arr_size = 1;
  int recv_arr_size;
  net_mod_recv_msg_t *recv_arr;
//   int recv_arr_size;
//   net_mod_recv_msg_t *recv_arr;
  while (i < scale) {
    sprintf(sendbuf, "%d", i);
    printf("%d SEND HEART:%s\n", targ->id, sendbuf);
    net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL);
    // net_mod_socket_sendto(socket, sendbuf, strlen(sendbuf) + 1, port);
    i++;
  }
//   while (i < scale) {
//     sprintf(sendbuf, "%d", i);
//     printf("%d SEND HEART:%s\n", targ->id, sendbuf);
//     net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL);
//     // net_mod_socket_sendto(socket, sendbuf, strlen(sendbuf) + 1, port);
//     i++;
//   }
  
   net_mod_socket_close(client);
  return (void *)i;
}
//    net_mod_socket_close(client);
//   return (void *)i;
// }
 
void mclient(int port) {
// void mclient(int port) {
  int status, i = 0, processors = 4;
  void *res[processors];
  Targ *targs = (Targ *)calloc(processors, sizeof(Targ));
  pthread_t tids[processors];
  char sendbuf[512];
//   int status, i = 0, processors = 4;
//   void *res[processors];
//   Targ *targs = (Targ *)calloc(processors, sizeof(Targ));
//   pthread_t tids[processors];
//   char sendbuf[512];
  struct timeval start;
  gettimeofday(&start, NULL);
  for (i = 0; i < processors; i++) {
    targs[i].port = port;
    targs[i].id = i;
    pthread_create(&tids[i], NULL, runclient, (void *)&targs[i]);
  }
//   struct timeval start;
//   gettimeofday(&start, NULL);
//   for (i = 0; i < processors; i++) {
//     targs[i].port = port;
//     targs[i].id = i;
//     pthread_create(&tids[i], NULL, runclient, (void *)&targs[i]);
//   }
  for (i = 0; i < processors; i++) {
    if (pthread_join(tids[i], &res[i]) != 0) {
      perror("multyThreadClient pthread_join");
    } else {
      fprintf(stderr, "client(%d) 发送 %ld 条数据\n", i, (long)res[i]);
    }
  }
//   for (i = 0; i < processors; i++) {
//     if (pthread_join(tids[i], &res[i]) != 0) {
//       perror("multyThreadClient pthread_join");
//     } else {
//       fprintf(stderr, "client(%d) 发送 %ld 条数据\n", i, (long)res[i]);
//     }
//   }
  struct timeval end;
  gettimeofday(&end, NULL);
//   struct timeval end;
//   gettimeofday(&end, NULL);
  double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec);
  long diffsec = (long) (difftime/1000000);
  long diffmsec = difftime - diffsec*1000000;
  printf("cost: %ld sec: %ld msc\n", diffsec, diffmsec);
}
//   double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec);
//   long diffsec = (long) (difftime/1000000);
//   long diffmsec = difftime - diffsec*1000000;
//   printf("cost: %ld sec: %ld msc\n", diffsec, diffmsec);
// }
int main(int argc, char *argv[]) {
  shm_mm_wrapper_init(512);
@@ -139,8 +188,6 @@
    server(port);
  else if (strcmp("client", argv[1]) == 0)
    client(port);
  else if (strcmp("mclient", argv[1]) == 0)
    mclient(port);
  shm_mm_wrapper_destroy();
  return 0;