zhangmeng
2024-04-09 2561a007b8d8999a4750046d0cfb3b1ad5af50ac
test_net_socket/shm_util.cpp
@@ -44,7 +44,7 @@
  pthread_detach(pthread_self());
  
  char action[512];
  while ( true) {
  while ( true ) {
    printf("Input action: Close?\n");
    if(scanf("%s",action) < 1) {
      printf("Invalide action\n");
@@ -200,6 +200,8 @@
   
   int recv_arr_size, i, n;
   net_mod_recv_msg_t *recv_arr;
  net_mod_err_t *errarr;
  int errarr_size = 0;
  pthread_t tid;
  // 创建一个线程接受订阅消息
@@ -234,19 +236,31 @@
        if (fgets(content, MAXLINE, stdin) != NULL) {
           // 收到消息的节点即使没有对应的信息, 也要回复一个表示无的消息,否则会一直等待
          // n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, content, strlen(content), &recv_arr, &recv_arr_size);
        n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, content, strlen(content), &recv_arr, &recv_arr_size, 1);
        n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, content, strlen(content),
         &recv_arr, &recv_arr_size, &errarr, &errarr_size, 1000);
          printf(" %d nodes reply\n", n);
          for(i=0; i<recv_arr_size; i++) {
             printf("reply from (host:%s, port: %d, key:%d) >> %s\n",
                recv_arr[i].host,
                recv_arr[i].port,
                recv_arr[i].key,
                (char *)recv_arr[i].content
             );
          }
        if(recv_arr_size > 0) {
          for(i=0; i<recv_arr_size; i++) {
            printf("reply from (host:%s, port: %d, key:%d) >> %s\n",
              recv_arr[i].host,
              recv_arr[i].port,
              recv_arr[i].key,
              (char *)recv_arr[i].content
            );
          }
          // 使用完后,不要忘记释放掉
          net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
        }
          
            // 使用完后,不要忘记释放掉
          net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
        if(errarr_size > 0) {
          for(i = 0; i < errarr_size; i++) {
            printf("error: (host:%s, port: %d, key:%d) : %s\n", errarr[i].host, errarr[i].port, errarr[i].key, bus_strerror(errarr[i].code));
          }
          free(errarr);
        }
        }
    }
    else if(strcmp(action, "desub") == 0) {
@@ -290,10 +304,12 @@
  Targ *targ = (Targ *)arg;
  char sendbuf[128];
 
  int j, n;
  int recv_arr_size;
  int i, j, n;
  int recv_arr_size = 0;
  net_mod_recv_msg_t *recv_arr;
  int total = 0;
  net_mod_err_t *errarr;
  int errarr_size = 0;
 
  int rkey, lkey;
  unsigned int l = 0 , rl;
@@ -312,30 +328,42 @@
    sprintf(sendbuf, hello_format, net_mod_socket_get_key(client), l);
    // fprintf(fp, "requst:%s\n", sendbuf);
    // n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size);
    n = net_mod_socket_sendandrecv_timeout(client, targ->node, 1, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 1);
    n = net_mod_socket_sendandrecv_timeout(client, targ->node, 1, sendbuf, strlen(sendbuf) + 1,
      &recv_arr, &recv_arr_size, &errarr, &errarr_size, 1);
    printf("%d: send %d nodes\n", l, n);
    for(j=0; j < recv_arr_size; j++) {
      fprintf(stdout, "%d send '%s' to %d. received  from (host=%s, port= %d, key=%d) '%s'\n",
        net_mod_socket_get_key(client),
        sendbuf,
        targ->node->key,
        recv_arr[j].host,
        recv_arr[j].port,
        recv_arr[j].key,
        (char *)recv_arr[j].content
      );
    if(recv_arr_size > 0) {
      for(j=0; j < recv_arr_size; j++) {
        fprintf(stdout, "%d send '%s' to %d. received  from (host=%s, port= %d, key=%d) '%s'\n",
          net_mod_socket_get_key(client),
          sendbuf,
          targ->node->key,
          recv_arr[j].host,
          recv_arr[j].port,
          recv_arr[j].key,
          (char *)recv_arr[j].content
        );
      printf("key == %d\n", net_mod_socket_get_key(client));
      assert(sscanf((const char *)recv_arr[j].content, reply_format, &rkey, &lkey, &rl) == 3);
      assert(targ->node->key == rkey);
      assert(net_mod_socket_get_key(client) == lkey);
      assert(rl == l);
        printf("key == %d\n", net_mod_socket_get_key(client));
        assert(sscanf((const char *)recv_arr[j].content, reply_format, &rkey, &lkey, &rl) == 3);
        assert(targ->node->key == rkey);
        assert(net_mod_socket_get_key(client) == lkey);
        assert(rl == l);
      }
      // 使用完后,不要忘记释放掉
      net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
    }
      // 使用完后,不要忘记释放掉
      net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
    if(errarr_size > 0) {
      for(i = 0; i < errarr_size; i++) {
        printf("error: (host:%s, port: %d, key:%d) : %s\n", errarr[i].host, errarr[i].port, errarr[i].key, bus_strerror(errarr[i].code));
      }
      free(errarr);
    }
    total += n;
  }
  if(fp != NULL)
    fclose(fp);
  // net_mod_socket_close(client);
@@ -384,7 +412,7 @@
  double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec);
  long diffsec = (long) (difftime/1000000);
  long diffusec = difftime - diffsec*1000000;
  fprintf(stderr,"发送数目:%ld, 成功数目: %ld, 用时: (%ld sec %ld usec), 平均: %f\n",
  fprintf(stderr,"发送数目:%d, 成功数目: %ld, 用时: (%ld sec %ld usec), 平均: %f\n",
    SCALE*node_arr_size, total, diffsec, diffusec, difftime/total );
  // fflush(stdout);
 
@@ -393,11 +421,14 @@
// 无限循环send
void test_net_sendandrecv(char *nodelist) {
  int n, j;
  int i, n, j;
  void * client;
  int recv_arr_size;
  net_mod_recv_msg_t *recv_arr;
  net_node_t *node_arr;
  net_mod_err_t *errarr;
  int errarr_size = 0;
  int node_arr_size = parse_node_list(nodelist, &node_arr);
  char buf[128];
  pid_t pid, retPid ;
@@ -413,30 +444,35 @@
  while(true) {
    sprintf(buf, hello_format, pid, l);
    n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, buf, strlen(buf)+1,
      &recv_arr, &recv_arr_size, 1000);
      &recv_arr, &recv_arr_size, &errarr, &errarr_size, 1000);
    printf(" %d nodes reply\n", n);
    for(j = 0; j < recv_arr_size; j++) {
      printf("%ld send '%s' . received '%s' from (host:%s, port: %d, key:%d) \n",
        (long)pid,
        buf,
        (char *)recv_arr[j].content,
        recv_arr[j].host,
        recv_arr[j].port,
        recv_arr[j].key
    if(recv_arr_size > 0) {
      for(j = 0; j < recv_arr_size; j++) {
        printf("%ld send '%s' . received '%s' from (host:%s, port: %d, key:%d) \n",
          (long)pid,
          buf,
          (char *)recv_arr[j].content,
          recv_arr[j].host,
          recv_arr[j].port,
          recv_arr[j].key
        );
        assert(sscanf((const char *)recv_arr[j].content, reply_format, &remoteKey, &retPid, &retl) == 3);
        assert(retPid == pid);
        assert(retl == l);
        assert(remoteKey == recv_arr[j].key);
      }
      // 使用完后,不要忘记释放掉
      net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
    }
      );
      assert(sscanf((const char *)recv_arr[j].content, reply_format, &remoteKey, &retPid, &retl) == 3);
      assert(retPid == pid);
      assert(retl == l);
      assert(remoteKey == recv_arr[j].key);
    if(errarr_size > 0) {
      for(i = 0; i < errarr_size; i++) {
        printf("error: (host:%s, port: %d, key:%d) : %s\n", errarr[i].host, errarr[i].port, errarr[i].key, bus_strerror(errarr[i].code));
      }
      free(errarr);
    }
    
    // 使用完后,不要忘记释放掉
    net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
    l++;
  }
@@ -523,7 +559,7 @@
  net_node_t *node_arr;
  int node_arr_size = parse_node_list(nodelist, &node_arr);
 
  char *topic = "news";
  const char *topic = "news";
  sprintf(sendbuf, "pid:%ld: Good news!!", (long)getpid());
  void * client = net_mod_socket_open();
@@ -577,31 +613,45 @@
  }
}
void do_sendandrecv(int key, char *sendbuf) {
  int n, j;
void do_sendandrecv(char *sendlist, char *sendbuf) {
  int i, n, j;
  int recv_arr_size;
  net_mod_recv_msg_t *recv_arr;
  net_mod_err_t *errarr;
  int errarr_size = 0;
  net_node_t node_arr[] = {NULL, 0, key};
  net_node_t *node_arr;
  int node_arr_size = parse_node_list(sendlist, &node_arr);
  print_node_list(node_arr, node_arr_size);
  void * client = net_mod_socket_open();
  n = net_mod_socket_sendandrecv_timeout(client, node_arr, 1, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 5000);
  if(n == 0) {
    printf("send failed\n");
    return;
  }
  printf(" %d nodes reply\n", n);
  for(j=0; j < recv_arr_size; j++) {
  n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1,
    &recv_arr, &recv_arr_size, &errarr, &errarr_size, 5000);
  printf(" %d nodes reply\n", recv_arr_size);
  if(recv_arr_size > 0) {
    for(j=0; j < recv_arr_size; j++) {
    fprintf(stdout, "%d send '%s' to %d. received  from (host=%s, port= %d, key=%d) '%s'\n\n",
      net_mod_socket_get_key(client),
      sendbuf,
      key,
      recv_arr[j].host,
      recv_arr[j].port,
      recv_arr[j].key,
      (char *)recv_arr[j].content
    );
      fprintf(stdout, "===> suc: %d send '%s'. received  from (host=%s, port= %d, key=%d), '%s'\n\n",
        net_mod_socket_get_key(client),
        sendbuf,
        recv_arr[j].host,
        recv_arr[j].port,
        recv_arr[j].key,
        (char *)recv_arr[j].content
      );
    }
    // 使用完后,不要忘记释放掉
    net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
  }
// printf("errarr_size = %d\n", errarr_size);
  if(errarr_size > 0) {
    for(i = 0; i < errarr_size; i++) {
      printf("===> error: (host:%s, port: %d, key:%d). %s\n", errarr[i].host, errarr[i].port, errarr[i].key, bus_strerror(errarr[i].code));
    }
    free(errarr);
  }
  net_mod_socket_close(client);
@@ -771,6 +821,10 @@
  net_node_t *node_arr = (net_node_t *) calloc(entry_arr_len, sizeof(net_node_t));
  for(i = 0; i < entry_arr_len; i++) {
    if(strchr(entry_arr[i], ':') == NULL) {
      node_arr[i]= {NULL, 0, atoi(entry_arr[i])};
      continue;
    }
    property_arr_len = str_split(entry_arr[i], ":", &property_arr);
   printf("=====%s, %s, %s\n", property_arr[0], property_arr[1], property_arr[2]);
@@ -854,9 +908,9 @@
      usage(prog);
      exit(1);
    }
    int key = atoi(argv[1]);
    char *sendlist = argv[1];
    char *content = argv[2];
    do_sendandrecv(key, content);
    do_sendandrecv(sendlist, content);
  }
  else if (strcmp("start_bus_server", fun) == 0) {
   
@@ -881,7 +935,6 @@
    opt =  parse_args(argc, argv);
    if(opt.bind == 0) {
      usage(argv[0]);
      exit(1);
    } else {
      start_recvfrom(opt.bind, opt.force);
    }
@@ -943,7 +996,7 @@
  }
  else {
    printf("%Invalid funciton name\n");
    printf("Invalid funciton name\n");
    usage(argv[0]);
    exit(1);