wangzhengquan
2020-08-04 3a89a77e79407d0d638ddf983ee580410cf807c5
fix sendto
1个文件已添加
5个文件已修改
154 ■■■■■ 已修改文件
src/socket/include/shm_socket.h 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.c 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/util/sem_util.c 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_socket/dgram_mod_req_rep.c 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_socket/dgram_mod_survey.c 77 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_socket/test_survey.sh 52 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/include/shm_socket.h
@@ -29,6 +29,11 @@
    
};
enum shm_socket_error_type_t {
    SHM_SOCKET_CONN_FAILED = 1,
    SHM_SOCKET_TIMEOUT = 2
};
enum shm_connection_status_t {
    SHM_CONN_CLOSED=1,
    SHM_CONN_LISTEN=2,
@@ -44,6 +49,8 @@
} shm_msg_t;
typedef struct shm_socket_t {
    shm_socket_type_t socket_type;
    // 本地port
src/socket/shm_socket.c
@@ -267,6 +267,12 @@
    return -1;
  }
  SHMQueue<shm_msg_t> *remoteQueue;
  if ((remoteQueue = _attach_remote_queue(port)) == NULL) {
      err_msg(0, "shm_sendto failed, the other end has been closed, or has not been opened!");
    return SHM_SOCKET_CONN_FAILED;
  }
  shm_msg_t dest;
  dest.type = SHM_COMMON_MSG;
  dest.port = socket->port;
@@ -274,11 +280,6 @@
  dest.buf = mm_malloc(size);
  memcpy(dest.buf, buf, size);
  SHMQueue<shm_msg_t> *remoteQueue;
  if ((remoteQueue = _attach_remote_queue(port)) == NULL) {
      err_msg(0, "shm_sendto failed, the other end has been closed, or has not been opened!");
    return -1;
  }
  // printf("shm_sendto push before\n");
  bool rv;
  if(flags & SHM_MSG_NOWAIT != 0) {
@@ -295,6 +296,7 @@
    return 0;
  } else {
    delete remoteQueue;
    mm_free(dest.buf);
    err_msg(errno, "sendto port %d failed!", port);
    return -1;
  }
src/util/sem_util.c
@@ -81,7 +81,7 @@
  while (semop(semId, &sops, 1) == -1)
    if (errno != EINTR) {
     // err_msg(errno, "SemUtil::dec");
      err_msg(errno, "SemUtil::dec");
      return -1;
    }
@@ -97,7 +97,7 @@
  while (semop(semId, &sops, 1) == -1)
    if (errno != EINTR) {
     // err_msg(errno, "SemUtil::dec_nowait");
      err_msg(errno, "SemUtil::dec_nowait");
      return -1;
    }
test_socket/dgram_mod_req_rep.c
@@ -152,7 +152,7 @@
  }
  if (strcmp("client", argv[1]) == 0)
    startClients(port);
    client(port);
  
  return 0;
test_socket/dgram_mod_survey.c
@@ -2,6 +2,20 @@
#include "shm_mm.h"
#include "usg_common.h"
typedef struct Targ {
  int port;
  int id;
}Targ;
void sigint_handler(int sig) {
   //dgram_mod_close_socket(server_socket);
  printf("===Catch sigint======================\n");
  shm_destroy();
  exit(0);
}
void server(int port) {
  void *socket = dgram_mod_open_socket();
  dgram_mod_bind(socket, port);
@@ -10,9 +24,12 @@
  char sendbuf[512];
  int rv;
  int remote_port;
  while ( (rv = dgram_mod_recvfrom(socket, &recvbuf, &size, &remote_port) ) == 0) {
  while (true) {
    if ((rv = dgram_mod_recvfrom_timeout(socket, &recvbuf, &size, &remote_port, 5, 0) ) == 0) {
    printf( "RECEIVED HREARTBEAT FROM %d: %s\n", remote_port, recvbuf);
    free(recvbuf);
    }
  }
  dgram_mod_close_socket(socket);
}
@@ -26,13 +43,67 @@
    sprintf(sendbuf, "%d", i);
    printf("SEND HEART:%s\n", sendbuf);
    dgram_mod_sendto(socket, sendbuf, strlen(sendbuf) + 1, port);
    sleep(1);
    // sleep(1);
    i++;
  }
  dgram_mod_close_socket(socket);
}
 
void *runclient(void *arg) {
  signal(SIGINT,  sigint_handler);
  Targ *targ = (Targ *)arg;
  int port = targ->port;
  void *socket = dgram_mod_open_socket();
  int size;
  char sendbuf[512];
  long scale = 10;
  long i = 0;
  while (i < scale) {
    sprintf(sendbuf, "%d", i);
    printf("%d SEND HEART:%s\n", targ->id, sendbuf);
    dgram_mod_sendto(socket, sendbuf, strlen(sendbuf) + 1, port);
   // sleep(1);
    i++;
  }
  dgram_mod_close_socket(socket);
  return (void *)i;
}
void startClients(int port) {
  int status, i = 0, processors = 100;
  void *res[processors];
  Targ *targs = (Targ *)calloc(processors, sizeof(Targ));
  pthread_t tids[processors];
  char sendbuf[512];
  struct timeval start;
  gettimeofday(&start, NULL);
  for (i = 0; i < processors; i++) {
    targs[i].port = port;
    targs[i].id = i;
    pthread_create(&tids[i], NULL, runclient, (void *)&targs[i]);
  }
  for (i = 0; i < processors; i++) {
    if (pthread_join(tids[i], &res[i]) != 0) {
      perror("multyThreadClient pthread_join");
    } else {
      fprintf(stderr, "client(%d) 发送 %ld 条数据\n", i, (long)res[i]);
    }
  }
  struct timeval end;
  gettimeofday(&end, NULL);
  double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec);
  long diffsec = (long) (difftime/1000000);
  long diffmsec = difftime - diffsec*1000000;
  printf("cost: %ld sec: %ld msc\n", diffsec, diffmsec);
}
int main(int argc, char *argv[]) {
  shm_init(512);
@@ -51,6 +122,6 @@
  if (strcmp("client", argv[1]) == 0)
    client(port);
  shm_destroy();
  return 0;
}
test_socket/test_survey.sh
New file
@@ -0,0 +1,52 @@
PROCESSES=100
function clean() {
     ps -ef | grep "dgram_mod_survey" | awk  '{print $2}' | xargs -i kill -9 {}
     ipcrm -a
}
function start_server() {
    clean
    ./dgram_mod_survey server 8 & server_pid=$!
    echo "start server pid ${server_pid}"
}
function start_clients() {
    for (( i=0; i<$PROCESSES; i++ ))
    do
        # pid_arr[$i]=$i
        ./dgram_mod_survey client 8 & pid_arr[$i]=$!
        echo "start ${pid_arr[$i]}"
    done
}
function close_cleints() {
    for (( i=0; i<$PROCESSES; i++ ))
    do
        echo "kill ${pid_arr[$i]}"
        kill -9 ${pid_arr[$i]}
        #./dgram_mod_survey client 8 & ${pid_arr[$i]}=$!
    done
}
case ${1} in
  "server")
  start_server
  ;;
  "clients")
  start_clients
  sleep 5
    close_cleints
  ;;
  "")
    start_server
    sleep 1
    start_clients
    sleep 5
    close_cleints
  ;;
  *)
  echo "error input"
  exit 1
  ;;
esac