wangzhengquan
2021-03-13 7a12bed7a2550d037e6e869c1ed0ce115098dbb2
update
8个文件已修改
308 ■■■■■ 已修改文件
CMakeLists.txt 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/lock_free_queue.h 55 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm/mm.cpp 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.cpp 60 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/net_mod_socket.sh 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_socket/CMakeLists.txt 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_socket/heart_beat.cpp 167 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_socket/heart_beat.sh 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
CMakeLists.txt
@@ -29,6 +29,6 @@
    add_subdirectory(${PROJECT_SOURCE_DIR}/src)
    add_subdirectory(${PROJECT_SOURCE_DIR}/test)
    add_subdirectory(${PROJECT_SOURCE_DIR}/test_net_socket)
#    add_subdirectory(${PROJECT_SOURCE_DIR}/test_socket)
    add_subdirectory(${PROJECT_SOURCE_DIR}/test_socket)
#    add_subdirectory(${PROJECT_SOURCE_DIR}/shm_util)
endif()
src/queue/lock_free_queue.h
@@ -88,9 +88,9 @@
  sem_t slots;
  sem_t items;
  // time_t createTime;
  // time_t closeTime;
  // int status;
  time_t createTime;
  time_t closeTime;
  int status;
public:
@@ -101,7 +101,8 @@
  /// template
  ~LockFreeQueue();
  // inline void  close();
  inline void  close();
  inline bool isClosed();
  // std::atomic_uint reference;
  /// @brief constructor of the class
@@ -129,17 +130,17 @@
  
  // time_t getCreateTime() {
  //   return createTime;
  // }
  time_t getCreateTime() {
    return createTime;
  }
  // time_t getCloseTime() {
  //   return closeTime;
  // }
  time_t getCloseTime() {
    return closeTime;
  }
  // int getStatus() {
  //   return status;
  // }
  int getStatus() {
    return status;
  }
  /// @brief push an element at the tail of the queue
  /// @param the element to insert in the queue
@@ -182,20 +183,28 @@
  if (sem_init(&items, 1, 0) == -1)
    err_exit(errno, "LockFreeQueue sem_init");
  
  // createTime = time(NULL);
  // status = LOCK_FREE_Q_ST_OPENED;
  createTime = time(NULL);
  status = LOCK_FREE_Q_ST_OPENED;
}
// template<
//   typename ELEM_T,
//   typename Allocator,
//   template<typename T, typename AT> class Q_TYPE>
// inline void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::close() {
//   // status = LOCK_FREE_Q_ST_CLOSED;
//   // closeTime = time(NULL);
// }
template<
  typename ELEM_T,
  typename Allocator,
  template<typename T, typename AT> class Q_TYPE>
inline void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::close() {
  status = LOCK_FREE_Q_ST_CLOSED;
  closeTime = time(NULL);
}
template<
  typename ELEM_T,
  typename Allocator,
  template<typename T, typename AT> class Q_TYPE>
inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::isClosed() {
  return status == LOCK_FREE_Q_ST_CLOSED;
}
template<
src/shm/mm.cpp
@@ -123,8 +123,11 @@
    return aptr;
  } else {
    SemUtil::inc(mutex);
    err_msg(0, "mm_malloc : out of memery\n");
    LoggerFactory::getLogger()->fatal("mm_malloc : out of memery\n");
    // abort();
    err_exit(0, "mm_malloc : out of memery\n");
    exit(1);
    return NULL;
  }
src/socket/shm_socket.cpp
@@ -10,7 +10,7 @@
static Logger *logger = LoggerFactory::getLogger();
ShmQueueStMap * shmQueueStMap ;
// ShmQueueStMap * shmQueueStMap ;
static void print_msg(char *head, shm_packet_t &msg) {
  // err_msg(0, "%s: key=%d, type=%d\n", head, msg.key, msg.type);
@@ -104,7 +104,7 @@
    err_exit(s, "pthread_mutexattr_destroy");
  shmQueueStMap = shm_mm_attach<ShmQueueStMap>(SHM_QUEUE_ST_KEY);
  // shmQueueStMap = shm_mm_attach<ShmQueueStMap>(SHM_QUEUE_ST_KEY);
  return sockt;
}
@@ -113,37 +113,33 @@
static int _shm_socket_close_(shm_socket_t *sockt) {
  
  int rv, i;
  hashtable_t *hashtable = mm_get_hashtable();
  logger->debug("shm_socket_close\n");
 
  if(sockt->key != 0) {
    auto it =  shmQueueStMap->find(sockt->key);
    if(it != shmQueueStMap->end()) {
      it->second.status = SHM_QUEUE_ST_CLOSED;
      it->second.closeTime = time(NULL);
    }
  }
  // if(sockt->key != 0) {
  //   auto it =  shmQueueStMap->find(sockt->key);
  //   if(it != shmQueueStMap->end()) {
  //     it->second.status = SHM_QUEUE_ST_CLOSED;
  //     it->second.closeTime = time(NULL);
  //   }
  // }
  printf("====sockt->queue addr = %p\n", sockt->queue);
  // printf("====sockt->queue addr = %p\n", sockt->queue);
  if(sockt->queue != NULL) {
    sockt->queue->close();
    for( i = 0; i < sockt->queue->size(); i++) {
      mm_free((*(sockt->queue))[i].buf);
      logger->info("======= %d free queue element buf\n", sockt->key);
    }
    sleep(1);
    // hashtable_remove(hashtable, mkey);
    hashtable_remove(hashtable, sockt->key);
  //   sockt->queue = NULL;
  }
 
  // hashtable_remove(hashtable, mkey);
  // if(sockt->queue != NULL) {
  //   sockt->queue = NULL;
  // }
  pthread_mutex_destroy(&(sockt->mutex) );
  free(sockt);
  return 0;
@@ -578,9 +574,9 @@
      }
      // 标记key对应的状态 ,为opened
      stRecord.status = SHM_QUEUE_ST_OPENED;
      stRecord.createTime = time(NULL);
      shmQueueStMap->insert({sockt->key, stRecord});
      // stRecord.status = SHM_QUEUE_ST_OPENED;
      // stRecord.createTime = time(NULL);
      // shmQueueStMap->insert({sockt->key, stRecord});
      
    }
@@ -597,17 +593,19 @@
  }
  // 检查key标记的状态
  auto it =  shmQueueStMap->find(key);
  if(it != shmQueueStMap->end()) {
    if(it->second.status == SHM_QUEUE_ST_CLOSED) {
      // key对应的状态是关闭的
      goto ERR_CLOSED;
    }
  }
  // auto it =  shmQueueStMap->find(key);
  // if(it != shmQueueStMap->end()) {
  //   if(it->second.status == SHM_QUEUE_ST_CLOSED) {
  //     // key对应的状态是关闭的
  //     goto ERR_CLOSED;
  //   }
  // }
  remoteQueue = shm_socket_attach_queue(key);
  if (remoteQueue == NULL ) {
    goto ERR_CLOSED;
  } else if(remoteQueue->isClosed()) {
    goto ERR_CLOSED;
  }
@@ -659,9 +657,9 @@
    }
    
    // 标记key对应的状态 ,为opened
    stRecord.status = SHM_QUEUE_ST_OPENED;
    stRecord.createTime = time(NULL);
    shmQueueStMap->insert({sockt->key, stRecord});
    // stRecord.status = SHM_QUEUE_ST_OPENED;
    // stRecord.createTime = time(NULL);
    // shmQueueStMap->insert({sockt->key, stRecord});
    
    if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0)
      err_exit(rv, "shm_recvfrom : pthread_mutex_unlock");
test_net_socket/net_mod_socket.sh
@@ -11,7 +11,7 @@
    ./shm_util recvfrom --bind=102 & server_pid=$! &&  echo "pid: ${server_pid}" 
    # 打开回队列收进程
    ./shm_util start_resycle & server_pid=$! &&  echo "pid: ${server_pid}"
    # ./shm_util start_resycle & server_pid=$! &&  echo "pid: ${server_pid}"
}
# 交互式客户端
test_socket/CMakeLists.txt
@@ -9,10 +9,13 @@
 
add_custom_command(
  OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/heart_beat.sh
  COMMAND cp  ${CMAKE_CURRENT_SOURCE_DIR}/heart_beat.sh ${CMAKE_CURRENT_BINARY_DIR}/heart_beat.sh
  OUTPUT ${PROJECT_BINARY_DIR}/bin/heart_beat.sh
  COMMAND cp  ${CMAKE_CURRENT_SOURCE_DIR}/heart_beat.sh ${PROJECT_BINARY_DIR}/bin/heart_beat.sh
  DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/heart_beat.sh
  )
add_custom_target("heart_beat.sh" ALL DEPENDS ${PROJECT_BINARY_DIR}/bin/heart_beat.sh)
add_executable(heart_beat heart_beat.cpp ${CMAKE_CURRENT_BINARY_DIR}/heart_beat.sh)
target_link_libraries(heart_beat PRIVATE shm_queue  ${EXTRA_LIBS} )
 
test_socket/heart_beat.cpp
@@ -6,7 +6,7 @@
#include "usg_common.h"
#include <getopt.h>
static Logger *logger =  LoggerFactory::getLogger();
typedef struct Targ {
  int port;
  int id;
@@ -20,24 +20,45 @@
  // exit(0);
}
void *serverSockt;
static void server_stop_handler(int sig) {
  printf("stop_handler\n");
  int rv = net_mod_socket_stop(serverSockt);
  if(rv ==0) {
    logger->debug("send stop suc");
    return;
  } else {
    logger->debug("send stop fail.%s\n", bus_strerror(rv));
  }
}
void server(int port) {
  void *serv = net_mod_socket_open();
  net_mod_socket_bind(serv, port);
  serverSockt = net_mod_socket_open();
  net_mod_socket_bind(serverSockt, port);
  int size;
  void *recvbuf;
  char sendbuf[512];
  int rv;
  int remote_port;
  signal(SIGTERM,  server_stop_handler);
  signal(SIGINT,  server_stop_handler);
  while (true) {
    if(net_mod_socket_recvfrom_timeout(serv, &recvbuf, &size, &remote_port, 0, 2000000000)==0) {
    rv = net_mod_socket_recvfrom_timeout(serverSockt, &recvbuf, &size, &remote_port, 0, 2000000000);
    if(rv == 0 ) {
      printf( "RECEIVED HREARTBEAT FROM %d: %s\n", remote_port, recvbuf);
      net_mod_socket_sendto(serv, "suc", strlen("suc")+1, remote_port);
      net_mod_socket_sendto(serverSockt, "suc", strlen("suc")+1, remote_port);
      free(recvbuf);
    } else if(rv == EBUS_STOPED) {
      logger->debug("Stopping\n");
      break;
    }
    
  }
  // sleep(1000);
  net_mod_socket_close(serv);
  net_mod_socket_close(serverSockt);
}
void client(int port) {
@@ -49,14 +70,42 @@
  net_node_t node_arr[] = {"", 0, port};
  int node_arr_size = 1;
  int recv_arr_size;
  int recv_arr_size,  n;
  net_mod_recv_msg_t *recv_arr;
  net_mod_err_t *errarr;
  int errarr_size = 0;
  // int recv_arr_size;
  // net_mod_recv_msg_t *recv_arr;
  while (true) {
    sprintf(sendbuf, "%d", i);
    rv = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL, 2000);
    rv = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf),
         &recv_arr, &recv_arr_size, &errarr, &errarr_size, 1000);
    // rv = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL, 2000);
    //rv = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL);
    printf("SEND HEART:%s, suc nodes = %d\n", sendbuf, rv);
  
    if(recv_arr_size > 0) {
      for(i=0; i<recv_arr_size; i++) {
        printf("reply from (host:%s, port: %d, key:%d) >> %s\n",
          recv_arr[i].host,
          recv_arr[i].port,
          recv_arr[i].key,
          (char *)recv_arr[i].content
        );
      }
      // 使用完后,不要忘记释放掉
      net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
    }
    if(errarr_size > 0) {
      for(i = 0; i < errarr_size; i++) {
        printf("error: (host:%s, port: %d, key:%d) : %s\n", errarr[i].host, errarr[i].port, errarr[i].key, bus_strerror(errarr[i].code));
      }
      free(errarr);
    }
   // sleep(1);
    i++;
  }
@@ -64,66 +113,66 @@
}
void *runclient(void *arg) {
  // signal(SIGINT,  sigint_handler);
  Targ *targ = (Targ *)arg;
  int port = targ->port;
  void *client = net_mod_socket_open();
  int size;
  char sendbuf[512];
  long scale = 100000;
  long i = 0;
  net_node_t node_arr[] = {"", 0, 100};
  int node_arr_size = 1;
// void *runclient(void *arg) {
//   // signal(SIGINT,  sigint_handler);
//   Targ *targ = (Targ *)arg;
//   int port = targ->port;
//   void *client = net_mod_socket_open();
//   int size;
//   char sendbuf[512];
//   long scale = 100000;
//   long i = 0;
//   net_node_t node_arr[] = {"", 0, 100};
//   int node_arr_size = 1;
  int recv_arr_size;
  net_mod_recv_msg_t *recv_arr;
//   int recv_arr_size;
//   net_mod_recv_msg_t *recv_arr;
  while (i < scale) {
    sprintf(sendbuf, "%d", i);
    printf("%d SEND HEART:%s\n", targ->id, sendbuf);
    net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL);
    // net_mod_socket_sendto(socket, sendbuf, strlen(sendbuf) + 1, port);
    i++;
  }
//   while (i < scale) {
//     sprintf(sendbuf, "%d", i);
//     printf("%d SEND HEART:%s\n", targ->id, sendbuf);
//     net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL);
//     // net_mod_socket_sendto(socket, sendbuf, strlen(sendbuf) + 1, port);
//     i++;
//   }
  
   net_mod_socket_close(client);
  return (void *)i;
}
//    net_mod_socket_close(client);
//   return (void *)i;
// }
 
void mclient(int port) {
// void mclient(int port) {
  int status, i = 0, processors = 4;
  void *res[processors];
  Targ *targs = (Targ *)calloc(processors, sizeof(Targ));
  pthread_t tids[processors];
  char sendbuf[512];
//   int status, i = 0, processors = 4;
//   void *res[processors];
//   Targ *targs = (Targ *)calloc(processors, sizeof(Targ));
//   pthread_t tids[processors];
//   char sendbuf[512];
  struct timeval start;
  gettimeofday(&start, NULL);
  for (i = 0; i < processors; i++) {
    targs[i].port = port;
    targs[i].id = i;
    pthread_create(&tids[i], NULL, runclient, (void *)&targs[i]);
  }
//   struct timeval start;
//   gettimeofday(&start, NULL);
//   for (i = 0; i < processors; i++) {
//     targs[i].port = port;
//     targs[i].id = i;
//     pthread_create(&tids[i], NULL, runclient, (void *)&targs[i]);
//   }
  for (i = 0; i < processors; i++) {
    if (pthread_join(tids[i], &res[i]) != 0) {
      perror("multyThreadClient pthread_join");
    } else {
      fprintf(stderr, "client(%d) 发送 %ld 条数据\n", i, (long)res[i]);
    }
  }
//   for (i = 0; i < processors; i++) {
//     if (pthread_join(tids[i], &res[i]) != 0) {
//       perror("multyThreadClient pthread_join");
//     } else {
//       fprintf(stderr, "client(%d) 发送 %ld 条数据\n", i, (long)res[i]);
//     }
//   }
  struct timeval end;
  gettimeofday(&end, NULL);
//   struct timeval end;
//   gettimeofday(&end, NULL);
  double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec);
  long diffsec = (long) (difftime/1000000);
  long diffmsec = difftime - diffsec*1000000;
  printf("cost: %ld sec: %ld msc\n", diffsec, diffmsec);
}
//   double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec);
//   long diffsec = (long) (difftime/1000000);
//   long diffmsec = difftime - diffsec*1000000;
//   printf("cost: %ld sec: %ld msc\n", diffsec, diffmsec);
// }
int main(int argc, char *argv[]) {
  shm_mm_wrapper_init(512);
@@ -139,8 +188,6 @@
    server(port);
  else if (strcmp("client", argv[1]) == 0)
    client(port);
  else if (strcmp("mclient", argv[1]) == 0)
    mclient(port);
  shm_mm_wrapper_destroy();
  return 0;
test_socket/heart_beat.sh
@@ -50,6 +50,16 @@
  close
  ;;
  "test2")
  start_server
    sleep 1
    start_clients
    sleep 5
    kill -15 server_pid
    sleep 2
    close_clients
  ;;
  "")
    start_server
    sleep 1