wangzhengquan
2021-02-01 dd0714b75b2e29087e3cd1184995bf38a453d833
update
1个文件已删除
9个文件已修改
262 ■■■■■ 已修改文件
lib/libusgcommon.a 补丁 | 查看 | 原始文档 | blame | 历史
lib/libusgcommon.so 补丁 | 查看 | 原始文档 | blame | 历史
src/logger_factory.cpp 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket_wrapper.cpp 21 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_mod_socket.cpp 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_mod_socket.h 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.cpp 56 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/net_mod_socket.sh 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/one_sendto_many.cpp 107 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/test_net_mod_socket.cpp 43 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
lib/libusgcommon.a
Binary files differ
lib/libusgcommon.so
Binary files differ
src/logger_factory.cpp
@@ -11,8 +11,11 @@
     
    LoggerConfig config;
    config.level = Logger::DEBUG;
    config.logFile =  "/tmp/bhome_bus.log";
    const char *logFileFormat= "/tmp/bhome_bus.%ld.log";
    char logFile[128];
    sprintf(logFile, logFileFormat, getpid());
    config.logFile = logFile;
#ifdef BUILD_Debug
    config.console = 1;
src/net/net_mod_socket_wrapper.cpp
@@ -53,14 +53,15 @@
int net_mod_socket_sendto_timeout(void *_socket, const void *buf, const int size, const int key, int sec, int nsec){
    NetModSocket *sockt = (NetModSocket *)_socket;
    logger->debug("net_mod_socket_sendto: %d sendto  %d", net_mod_socket_get_key(_socket), key);
    return sockt->sendto_timeout(buf, size, key, sec, nsec);
    // return sockt->sendto(buf, size, key);
    // return sockt->sendto_timeout(buf, size, key, sec, nsec);
    return sockt->sendto(buf, size, key);
}
// 发送信息立刻返回。
int net_mod_socket_sendto_nowait(void *_socket, const void *buf, const int size, const int key){
    NetModSocket *sockt = (NetModSocket *)_socket;
    logger->debug("net_mod_socket_sendto: %d sendto  %d", net_mod_socket_get_key(_socket), key);
    return sockt->sendto_nowait(buf, size, key);
    return sockt->sendto(buf, size, key);
    // return sockt->sendto_nowait(buf, size, key);
}
/**
@@ -80,12 +81,13 @@
// 接受信息超时返回。 @sec 秒 , @nsec 纳秒
int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *key, int sec, int nsec){
    NetModSocket *sockt = (NetModSocket *)_socket;
    // return sockt->recvfrom(buf, size, key);
    return sockt->recvfrom_timeout(buf, size, key, sec, nsec);
    return sockt->recvfrom(buf, size, key);
    // return sockt->recvfrom_timeout(buf, size, key, sec, nsec);
}
int net_mod_socket_recvfrom_nowait(void *_socket, void **buf, int *size, int *key){
    NetModSocket *sockt = (NetModSocket *)_socket;
    return sockt->recvfrom_nowait(buf, size, key);
    return sockt->recvfrom(buf, size, key);
    // return sockt->recvfrom_nowait(buf, size, key);
}
int net_mod_socket_sendandrecv(void *_socket, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
@@ -100,14 +102,15 @@
int net_mod_socket_sendandrecv_timeout(void *_socket, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size,  int timeout){
    NetModSocket *sockt = (NetModSocket *)_socket;
    // return sockt->sendandrecv(node_arr,  arrlen, send_buf,  send_size, recv_arr, recv_arr_size);
    return sockt->sendandrecv_timeout(node_arr,  arrlen, send_buf,  send_size, recv_arr, recv_arr_size, timeout);
    return sockt->sendandrecv(node_arr,  arrlen, send_buf,  send_size, recv_arr, recv_arr_size);
    // return sockt->sendandrecv_timeout(node_arr,  arrlen, send_buf,  send_size, recv_arr, recv_arr_size, timeout);
}
int net_mod_socket_sendandrecv_nowait(void *_socket, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) {
    NetModSocket *sockt = (NetModSocket *)_socket;
    return sockt->sendandrecv_nowait(node_arr,  arrlen, send_buf,  send_size, recv_arr, recv_arr_size);
    return sockt->sendandrecv(node_arr,  arrlen, send_buf,  send_size, recv_arr, recv_arr_size);
    // return sockt->sendandrecv_nowait(node_arr,  arrlen, send_buf,  send_size, recv_arr, recv_arr_size);
}
 
src/socket/shm_mod_socket.cpp
@@ -102,7 +102,7 @@
 * @size 主题长度
 * @key 总线端口
 */
int  ShmModSocket::sub(char *topic, int topic_size, int key,
int  ShmModSocket::sub(const char *topic, int topic_size, int key,
    const struct timespec *timeout, int flags) {
    int ret;
    bus_head_t head = {};
@@ -133,7 +133,7 @@
 * @size 主题长度
 * @key 总线端口
 */
int  ShmModSocket::desub(char *topic, int topic_size, int key, const struct timespec *timeout, int flags) {
int  ShmModSocket::desub(const char *topic, int topic_size, int key, const struct timespec *timeout, int flags) {
    // char buf[8192];
    int ret;
    if(topic == NULL) {
@@ -171,7 +171,7 @@
 * @content 主题内容
 * @key 总线端口
 */
int  ShmModSocket::pub(char *topic, int topic_size, void *content, int content_size, int key, const struct timespec *timeout, int flags) {
int  ShmModSocket::pub(const char *topic, int topic_size, const void *content, int content_size, int key, const struct timespec *timeout, int flags) {
    int ret;
    bus_head_t head = {};
    memcpy(head.action, "pub", sizeof(head.action));
@@ -204,7 +204,7 @@
// =============================================================================
int ShmModSocket::get_bus_sendbuf(bus_head_t &request_head, 
  void *topic_buf, int topic_size, void *content_buf, int content_size, void **retbuf) {
 const void *topic_buf, int topic_size, const void *content_buf, int content_size, void **retbuf) {
 
  int buf_size;
  char *buf;
src/socket/shm_mod_socket.h
@@ -33,7 +33,7 @@
     
     
    static int get_bus_sendbuf(bus_head_t &request_head, void *topic_buf, int topic_size, void *content_buf, int content_size, void **retbuf);
    static int get_bus_sendbuf(bus_head_t &request_head, const void *topic_buf, int topic_size, const void *content_buf, int content_size, void **retbuf);
public:
    static size_t remove_keys(int keys[], size_t length);
@@ -98,7 +98,7 @@
     * @key 总线端口
     * @flag BUS_TIMEOUT_FLAG  BUS_NOWAIT_FLAG
     */
    int  sub(char *topic, int size, int key,  const struct timespec *timeout = NULL, int flag = 0);
    int  sub(const char *topic, int size, int key,  const struct timespec *timeout = NULL, int flag = 0);
     /**
@@ -108,7 +108,7 @@
     * @key 总线端口
     * @flag BUS_TIMEOUT_FLAG  BUS_NOWAIT_FLAG
     */
    int desub(char *topic, int size, int key, const struct timespec *timeout = NULL, int flag = 0);
    int desub(const char *topic, int size, int key, const struct timespec *timeout = NULL, int flag = 0);
    /**
     * 发布主题
@@ -117,7 +117,7 @@
     * @key 总线端口
     * @flag BUS_TIMEOUT_FLAG  BUS_NOWAIT_FLAG
     */
    int  pub(char *topic, int topic_size, void *content, int content_size, int key, const  struct timespec *timeout = NULL, int flag = 0);
    int  pub(const char *topic, int topic_size, const void *content, int content_size, int key, const  struct timespec *timeout = NULL, int flag = 0);
    /**
src/socket/shm_socket.cpp
@@ -62,7 +62,6 @@
static LockFreeQueue<shm_msg_t> * shm_socket_attach_queue(int key) {
  LockFreeQueue<shm_msg_t> * queue;
  hashtable_t *hashtable = mm_get_hashtable();
  // hashtable_lock(hashtable);
  void *tmp_ptr = hashtable_get(hashtable, key);
  if (tmp_ptr == NULL || tmp_ptr == (void *)1) {
    //logger->error("shm_socket._remote_queue_attach:connet at key %d  failed!", key);
@@ -169,12 +168,6 @@
  int s;
  int rv;
  if (sockt->socket_type != SHM_SOCKET_DGRAM) {
    logger->error( "shm_socket.shm_sendto: Can't invoke shm_sendto method in a %d type socket  which is "
                "not a SHM_SOCKET_DGRAM socket ",
             sockt->socket_type);
    exit(0);
  }
  hashtable_t *hashtable = mm_get_hashtable();
 
@@ -235,38 +228,33 @@
// 短连接方式接受
int shm_recvfrom(shm_socket_t *sokt, void **buf, int *size, int *key,  const struct timespec *timeout,  int flag) {
int shm_recvfrom(shm_socket_t *sockt, void **buf, int *size, int *key,  const struct timespec *timeout,  int flag) {
  int s;
  int rv;
  if (sokt->socket_type != SHM_SOCKET_DGRAM) {
    logger->error("shm_socket.shm_recvfrom: Can't invoke shm_recvfrom method in a %d type socket  which "
                "is not a SHM_SOCKET_DGRAM socket ",
             sokt->socket_type);
    exit(1);
  }
  hashtable_t *hashtable = mm_get_hashtable();
  if ((s = pthread_mutex_lock(&(sokt->mutex))) != 0)
  if ((s = pthread_mutex_lock(&(sockt->mutex))) != 0)
    err_exit(s, "shm_recvfrom : pthread_mutex_lock");
 
  if (sokt->queue == NULL) {
    if (sokt->key == 0) {
      sokt->key = hashtable_alloc_key(hashtable);
  if (sockt->queue == NULL) {
    if (sockt->key == 0) {
      sockt->key = hashtable_alloc_key(hashtable);
    }  
    sokt->queue = shm_socket_bind_queue( sokt->key, sokt->force_bind);
    if(sokt->queue  == NULL ) {
      logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sokt->key);
    sockt->queue = shm_socket_bind_queue( sockt->key, sockt->force_bind);
    if(sockt->queue  == NULL ) {
      logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key);
      return EBUS_KEY_INUSED;
    }
  }
  
  if ((s = pthread_mutex_unlock(&(sokt->mutex))) != 0)
  if ((s = pthread_mutex_unlock(&(sockt->mutex))) != 0)
    err_exit(s, "shm_recvfrom : pthread_mutex_unlock");
  shm_msg_t src;
 
  rv = sokt->queue->pop(src, timeout, flag);
  rv = sockt->queue->pop(src, timeout, flag);
  if (rv == 0) {
    if(buf != NULL) {
@@ -330,7 +318,7 @@
// use thread local
int _shm_sendandrecv_thread_local(shm_socket_t *socket, const void *send_buf,
int _shm_sendandrecv_thread_local(shm_socket_t *sockt, const void *send_buf,
                    const int send_size, const int send_key, void **recv_buf,
                    int *recv_size,  const struct timespec *timeout,  int flags) {
  int recv_key;
@@ -338,13 +326,6 @@
  // 用thread local 保证每个线程用一个独占的socket接受对方返回的信息
  shm_socket_t *tmp_socket;
  if (socket->socket_type != SHM_SOCKET_DGRAM) {
    logger->error( "shm_socket.shm_sendandrecv: Can't invoke shm_sendandrecv method in a %d type socket  "
                "which is not a SHM_SOCKET_DGRAM socket ",
             socket->socket_type);
    exit(1);
  }
 
  rv = pthread_once(&_once_, _create_socket_key_perthread);
@@ -370,14 +351,19 @@
  if ((rv = shm_sendto(tmp_socket, send_buf, send_size, send_key, timeout, flags)) == 0) {
    rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags);
    if(rv != 0) {
      printf("_shm_sendandrecv_thread_local : %s\n", bus_strerror(rv));
      logger->error("_shm_sendandrecv_thread_local : %s\n", bus_strerror(rv));
    }
    else if(rv == 0 ) {
      logger->debug("======%d use tmp_socket %d, send to  %d, receive from  %d\n", shm_socket_get_key(sockt), shm_socket_get_key(tmp_socket), send_key, recv_key);
      if(recv_key == shm_socket_get_key(sockt)) {
        logger->debug("=====收到了自己发给自己的消息\n");
      }
      assert( send_key == recv_key);
      if(send_key != recv_key) {
         err_exit(0, "_shm_sendandrecv_thread_local: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key);
        logger->error( "_shm_sendandrecv_alloc_new: send key expect to equal to recv key! send key =%d , recv key=%d", send_key, recv_key);
        exit(1);
      }
    }
    return rv;
  } else {
@@ -434,7 +420,7 @@
                    const int send_size, const int send_key, void **recv_buf,
                    int *recv_size,  const struct timespec *timeout,  int flags) {
  return  _shm_sendandrecv_alloc_new(socket, send_buf, send_size, send_key,recv_buf, recv_size, timeout,  flags);
  return  _shm_sendandrecv_thread_local(socket, send_buf, send_size, send_key,recv_buf, recv_size, timeout,  flags);
}
test_net_socket/net_mod_socket.sh
@@ -26,18 +26,20 @@
     
}
# 无限循环send
# one_to_many send
function one_to_many() {
    ./test_net_mod_socket --fun="one_sendto_many" \
     --sendlist=" :5000:100, :5000:101, :5000:102"
     
}
# 多线程send
function msend() {
    ./test_net_mod_socket --fun="test_net_sendandrecv_threads" \
     --sendlist="localhost:5000:100, localhost:5000:100"
#
function send() {
    ./test_net_mod_socket --fun="test_net_sendandrecv" \
     --sendlist=" :5000:100, :5000:101, :5000:102"
     
}
# 无限循环 pub
function pub() {
    ./test_net_mod_socket --fun="test_net_pub" \
test_net_socket/one_sendto_many.cpp
File was deleted
test_net_socket/test_net_mod_socket.cpp
@@ -81,7 +81,7 @@
  int key;
  int rv;
  while ((rv = net_mod_socket_recvfrom( sockt, &recvbuf, &size, &key) ) == 0) {
    printf("收到订阅消息:%s\n", recvbuf);
    printf("收到订阅消息:%s\n", (char *)recvbuf);
    free(recvbuf);
  }
@@ -144,7 +144,7 @@
  int remote_port;
  while ( (rv = net_mod_socket_recvfrom(ser, &recvbuf, &size, &remote_port) ) == 0) {
   // printf( "server: RECEIVED REQUEST FROM PORT %d NAME %s\n", remote_port, recvbuf);
    sprintf(sendbuf, "%d RECEIVED %s", net_mod_socket_get_key(ser), recvbuf);
    sprintf(sendbuf, "%d RECEIVED %s", net_mod_socket_get_key(ser), (char *)recvbuf);
    net_mod_socket_sendto(ser, sendbuf, strlen(sendbuf) + 1, remote_port);
    free(recvbuf);
  }
@@ -201,7 +201,7 @@
                    recv_arr[i].host,
                    recv_arr[i].port,
                    recv_arr[i].key,
                    recv_arr[i].content
                    (char *)recv_arr[i].content
                );
            }
            
@@ -283,7 +283,7 @@
        recv_arr[j].host,
        recv_arr[j].port,
        recv_arr[j].key,
        recv_arr[j].content
        (char *)recv_arr[j].content
      );
      printf("key == %d\n", net_mod_socket_get_key(client));
@@ -359,34 +359,53 @@
  net_node_t *node_arr;
  int node_arr_size = parse_node_list(nodelist, &node_arr);
  char buf[128];
  pid_t pid, rpid ;
  unsigned int l , rl;
  const char *hello_format = "%ld say Hello %u ";
  pid_t pid, retPid ;
  unsigned int l , retl;
  int remoteKey;
  const char *hello_format = "%d say Hello %u ";
  const char *reply_format = "%d RECEIVED %d say Hello %d";
  pid = getpid();
  l = 0;
  client = net_mod_socket_open();
  while(true) {
    sprintf(buf, hello_format, (long)pid, l);
    sprintf(buf, hello_format, pid, l);
    n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, buf, strlen(buf)+1,
      &recv_arr, &recv_arr_size, 1000);
    printf(" %d nodes reply\n", n);
    for(j = 0; j < recv_arr_size; j++) {
      LoggerFactory::getLogger()->debug("%ld send '%s'. received '%s' from (host:%s, port: %d, key:%d) \n",
      printf("%ld send '%s' . received '%s' from (host:%s, port: %d, key:%d) \n",
        (long)pid,
        buf,
        recv_arr[j].content,
        (char *)recv_arr[j].content,
        recv_arr[j].host,
        recv_arr[j].port,
        recv_arr[j].key
      );
      // assert(sscanf((const char *)recv_arr[j].content, hello_format, &rpid, &rl) == 2);
      // assert(rpid == pid);
      // printf( "%d send '%s' to %d. received  from (host=%s, port= %d, key=%d) '%s'\n",
      //   net_mod_socket_get_key(client),
      //   sendbuf,
      //   targ->node->key,
      //   recv_arr[j].host,
      //   recv_arr[j].port,
      //   recv_arr[j].key,
      //   recv_arr[j].content
      // );
      // assert(sscanf((const char *)recv_arr[j].content, reply_format, &rkey, &lkey, &rl) == 3);
      // assert(targ->node->key == rkey);
      // assert(net_mod_socket_get_key(client) == lkey);
      // assert(rl == l);
      assert(sscanf((const char *)recv_arr[j].content, reply_format, &remoteKey, &retPid, &retl) == 3);
      assert(retPid == pid);
      assert(retl == l);
      assert(remoteKey == recv_arr[j].key);
    }
    
    // 使用完后,不要忘记释放掉