wangzhengquan
2021-02-02 cb85aa8a8d02a3d6dc16e3f32e78da9e70f9c7f5
update
7个文件已修改
151 ■■■■■ 已修改文件
src/bus_error.cpp 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bus_error.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket_wrapper.cpp 24 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.cpp 76 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.h 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/socket_def.h 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/test_net_mod_socket.cpp 22 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bus_error.cpp
@@ -17,7 +17,8 @@
  "The other end is not inline",
  "Key already in use",
  "Network fault",
  "Send to self error"
  "Send to self error",
  "Receive from wrong end"
};
src/bus_error.h
@@ -10,6 +10,7 @@
#define EBUS_KEY_INUSED 503
#define EBUS_NET 504
#define EBUS_SENDTO_SELF 505
#define EBUS_RECVFROM_WRONG_END 506
extern int bus_errno;
src/net/net_mod_socket_wrapper.cpp
@@ -53,15 +53,14 @@
int net_mod_socket_sendto_timeout(void *_socket, const void *buf, const int size, const int key, int sec, int nsec){
    NetModSocket *sockt = (NetModSocket *)_socket;
    logger->debug("net_mod_socket_sendto: %d sendto  %d", net_mod_socket_get_key(_socket), key);
    // return sockt->sendto_timeout(buf, size, key, sec, nsec);
    return sockt->sendto(buf, size, key);
    return sockt->sendto_timeout(buf, size, key, sec, nsec);
    // return sockt->sendto(buf, size, key);
}
// 发送信息立刻返回。
int net_mod_socket_sendto_nowait(void *_socket, const void *buf, const int size, const int key){
    NetModSocket *sockt = (NetModSocket *)_socket;
    logger->debug("net_mod_socket_sendto: %d sendto  %d", net_mod_socket_get_key(_socket), key);
    return sockt->sendto(buf, size, key);
    // return sockt->sendto_nowait(buf, size, key);
    return sockt->sendto_nowait(buf, size, key);
}
/**
@@ -78,16 +77,17 @@
    logger->debug(" %d net_mod_socket_recvfrom after. rv = %d", net_mod_socket_get_key(_socket), rv);
    return rv;
}
// 接受信息超时返回。 @sec 秒 , @nsec 纳秒
int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *key, int sec, int nsec){
    NetModSocket *sockt = (NetModSocket *)_socket;
    return sockt->recvfrom(buf, size, key);
    // return sockt->recvfrom_timeout(buf, size, key, sec, nsec);
    // return sockt->recvfrom(buf, size, key);
    return sockt->recvfrom_timeout(buf, size, key, sec, nsec);
}
int net_mod_socket_recvfrom_nowait(void *_socket, void **buf, int *size, int *key){
    NetModSocket *sockt = (NetModSocket *)_socket;
    return sockt->recvfrom(buf, size, key);
    // return sockt->recvfrom_nowait(buf, size, key);
    return sockt->recvfrom_nowait(buf, size, key);
}
int net_mod_socket_sendandrecv(void *_socket, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
@@ -95,6 +95,7 @@
    NetModSocket *sockt = (NetModSocket *)_socket;
    return sockt->sendandrecv(node_arr,  arrlen, send_buf,  send_size, recv_arr, recv_arr_size);
}
/**
 * 如果建立连接的节点没有接受到消息等待timeout的时间后返回
 * @timeout 等待时间,单位是千分之一秒
@@ -102,15 +103,14 @@
int net_mod_socket_sendandrecv_timeout(void *_socket, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size,  int timeout){
    NetModSocket *sockt = (NetModSocket *)_socket;
    return sockt->sendandrecv(node_arr,  arrlen, send_buf,  send_size, recv_arr, recv_arr_size);
    // return sockt->sendandrecv_timeout(node_arr,  arrlen, send_buf,  send_size, recv_arr, recv_arr_size, timeout);
    // return sockt->sendandrecv(node_arr,  arrlen, send_buf,  send_size, recv_arr, recv_arr_size);
    return sockt->sendandrecv_timeout(node_arr,  arrlen, send_buf,  send_size, recv_arr, recv_arr_size, timeout);
}
int net_mod_socket_sendandrecv_nowait(void *_socket, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) {
    NetModSocket *sockt = (NetModSocket *)_socket;
    return sockt->sendandrecv(node_arr,  arrlen, send_buf,  send_size, recv_arr, recv_arr_size);
    // return sockt->sendandrecv_nowait(node_arr,  arrlen, send_buf,  send_size, recv_arr, recv_arr_size);
    return sockt->sendandrecv_nowait(node_arr,  arrlen, send_buf,  send_size, recv_arr, recv_arr_size);
}
 
src/socket/shm_socket.cpp
@@ -182,7 +182,6 @@
  }
  shm_packet_t dest;
  dest.type = SHM_COMMON_MSG;
  dest.key = sockt->key;
  dest.size = size;
  dest.buf = mm_malloc(size);
@@ -287,7 +286,6 @@
  //s = pthread_key_create(&_perthread_socket_key_, NULL);
  if (s != 0) {
     logger->error(s, "pthread_key_create");
     abort(); /* dump core and terminate */
     exit(1);
  }
}
@@ -299,11 +297,13 @@
                    int *recv_size,  const struct timespec *timeout,  int flags) {
  int recv_key;
  int rv;
  int tryn = 0;
  // 用thread local 保证每个线程用一个独占的socket接受对方返回的信息
  shm_socket_t *tmp_socket;
 
  /* If first call from this thread, allocate buffer for thread, and save its location */
  // logger->debug("%d create tmp socket\n", pthread_self() );
  rv = pthread_once(&_once_, _create_socket_key_perthread);
  if (rv != 0) {
    logger->error(rv, "shm_sendandrecv pthread_once");
@@ -325,26 +325,31 @@
  }
  if ((rv = shm_sendto(tmp_socket, send_buf, send_size, send_key, timeout, flags)) == 0) {
    rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags);
    if(rv != 0) {
      logger->error("_shm_sendandrecv_thread_local : %s\n", bus_strerror(rv));
    }
    else if(rv == 0 ) {
      logger->debug("======%d use tmp_socket %d, send to  %d, receive from  %d\n", shm_socket_get_key(sockt), shm_socket_get_key(tmp_socket), send_key, recv_key);
      if(recv_key == shm_socket_get_key(sockt)) {
        logger->debug("=====收到了自己发给自己的消息\n");
    while(tryn < 3) {
      tryn++;
      rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags);
      if(rv != 0) {
        logger->error("_shm_sendandrecv_thread_local : %s\n", bus_strerror(rv));
        return rv;
      }
      assert( send_key == recv_key);
       // 超时导致接发送对象,与返回对象不对应的情况
      if(send_key != recv_key) {
        logger->error( "_shm_sendandrecv_alloc_new: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key);
        exit(1);
        logger->debug("======%d use tmp_socket %d, send to  %d, receive from  %d\n", shm_socket_get_key(sockt), shm_socket_get_key(tmp_socket), send_key, recv_key);
        // logger->error( "_shm_sendandrecv_alloc_new: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key);
        // exit(1);
        continue;
        // return EBUS_RECVFROM_WRONG_END;
      }
      return 0;
    }
    return rv;
  } else {
    return rv;
  }
    return EBUS_RECVFROM_WRONG_END;
  }
  return rv;
}
int _shm_sendandrecv_alloc_new(shm_socket_t *sockt, const void *send_buf,
@@ -353,31 +358,34 @@
  int recv_key;
  int rv;
  // 用thread local 保证每个线程用一个独占的socket接受对方返回的信息
  int tryn = 0;
  shm_socket_t *tmp_socket;
 
  /* If first call from this thread, allocate buffer for thread, and save its location */
  // logger->debug("%d create tmp socket\n", pthread_self() );
  tmp_socket = shm_open_socket(SHM_SOCKET_DGRAM);
  if ((rv = shm_sendto(tmp_socket, send_buf, send_size, send_key, timeout, flags)) == 0) {
    rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags);
   
    if(rv != 0) {
      printf("_shm_sendandrecv_alloc_new : %s\n", bus_strerror(rv));
    }
    else if(rv == 0 ) {
      printf("======%d use tmp_socket %d, send to  %d, receive from  %d\n", shm_socket_get_key(sockt), shm_socket_get_key(tmp_socket), send_key, recv_key);
      if(recv_key == shm_socket_get_key(sockt)) {
        printf("=====收到了自己发给自己的消息\n");
      }
      assert( send_key == recv_key);
      if(send_key != recv_key) {
         err_exit(0, "_shm_sendandrecv_alloc_new: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key);
    while(tryn < 3) {
      tryn++;
      rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags);
      if(rv != 0) {
        logger->error("_shm_sendandrecv_thread_local : %s\n", bus_strerror(rv));
        return rv;
      }
     
      // 超时导致接发送对象,与返回对象不对应的情况
      if(send_key != recv_key) {
        // logger->debug("======%d use tmp_socket %d, send to  %d, receive from  %d\n", shm_socket_get_key(sockt), shm_socket_get_key(tmp_socket), send_key, recv_key);
        // logger->error( "_shm_sendandrecv_alloc_new: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key);
        continue;
      }
      return 0;
    }
    return EBUS_RECVFROM_WRONG_END;
  } 
   
  shm_close_socket(tmp_socket);  
src/socket/shm_socket.h
@@ -12,19 +12,10 @@
    SHM_SOCKET_DGRAM = 2
    
};
enum shm_packet_type_t
{
    SHM_SOCKET_OPEN = 1,
    SHM_SOCKET_OPEN_REPLY = 2,
    SHM_SOCKET_CLOSE = 3,
    SHM_COMMON_MSG = 4
};
typedef struct shm_packet_t {
    int key;
    shm_packet_type_t type;
    size_t size;
    void * buf;
src/socket/socket_def.h
@@ -20,18 +20,4 @@
    
};
// typedef struct shm_bus_msg_t {
//     void *topic;
//     int topic_length;
// } shm_bus_msg_t;
#define ACTION_LIDENTIFIER "<**"
#define ACTION_RIDENTIFIER "**>"
#define TOPIC_LIDENTIFIER "{"
#define TOPIC_RIDENTIFIER "}"
#endif
test_net_socket/test_net_mod_socket.cpp
@@ -272,11 +272,11 @@
    sprintf(sendbuf, hello_format, net_mod_socket_get_key(client), l);
    // fprintf(fp, "requst:%s\n", sendbuf);
    // n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size);
    n = net_mod_socket_sendandrecv_timeout(client, targ->node, 1, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 1000);
    n = net_mod_socket_sendandrecv_timeout(client, targ->node, 1, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 1);
    printf("%d: send %d nodes\n", l, n);
    for(j=0; j < recv_arr_size; j++) {
      fprintf(fp, "%d send '%s' to %d. received  from (host=%s, port= %d, key=%d) '%s'\n",
      fprintf(stdout, "%d send '%s' to %d. received  from (host=%s, port= %d, key=%d) '%s'\n",
        net_mod_socket_get_key(client),
        sendbuf,
        targ->node->key,
@@ -372,7 +372,7 @@
  while(true) {
    sprintf(buf, hello_format, pid, l);
    n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, buf, strlen(buf)+1,
      &recv_arr, &recv_arr_size, 1000);
      &recv_arr, &recv_arr_size, 1);
    printf(" %d nodes reply\n", n);
    for(j = 0; j < recv_arr_size; j++) {
@@ -386,21 +386,7 @@
      );
      // printf( "%d send '%s' to %d. received  from (host=%s, port= %d, key=%d) '%s'\n",
      //   net_mod_socket_get_key(client),
      //   sendbuf,
      //   targ->node->key,
      //   recv_arr[j].host,
      //   recv_arr[j].port,
      //   recv_arr[j].key,
      //   recv_arr[j].content
      // );
      // assert(sscanf((const char *)recv_arr[j].content, reply_format, &rkey, &lkey, &rl) == 3);
      // assert(targ->node->key == rkey);
      // assert(net_mod_socket_get_key(client) == lkey);
      // assert(rl == l);
      assert(sscanf((const char *)recv_arr[j].content, reply_format, &remoteKey, &retPid, &retl) == 3);
      assert(retPid == pid);