wangzhengquan
2021-02-07 14755ab48a9d8f942076e72a4ea064813e06407f
udate
5个文件已修改
72 ■■■■■ 已修改文件
src/net/net_mod_socket.cpp 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket.h 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket_wrapper.cpp 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket_wrapper.h 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/test_net_mod_socket.cpp 40 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket.cpp
@@ -308,22 +308,22 @@
}
int NetModSocket::pub(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size) {
int NetModSocket::pub(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content, int content_size) {
  return _pub_(node_arr, arrlen, topic, topic_size, content,   content_size, -1);
}
int NetModSocket::pub_nowait(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size) {
int NetModSocket::pub_nowait(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content, int content_size) {
  return _pub_(node_arr, arrlen, topic, topic_size, content,   content_size, 0);
}
int NetModSocket::pub_timeout(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size, int  msec ) {
int NetModSocket::pub_timeout(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content, int content_size, int  msec ) {
  return _pub_(node_arr, arrlen, topic, topic_size, content, content_size, msec);
}
// int  pub(char *topic, int topic_size, void *content, int content_size, int port);
int NetModSocket::_pub_(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content,
int NetModSocket::_pub_(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content,
 int content_size, int  msec) {
  int i, connfd;
  net_node_t *node;
@@ -600,7 +600,7 @@
//======================================================================================
int NetModSocket::write_request(int clientfd, net_mod_request_head_t &request_head, 
  void *content_buf, int content_size, void *topic_buf, int topic_size) {
  const void *content_buf, int content_size, const void *topic_buf, int topic_size) {
 
  int buf_size;
  char *buf;
src/net/net_mod_socket.h
@@ -85,12 +85,12 @@
  //读取返回信息
  int read_response(int clientfd, net_mod_recv_msg_t *recv_msg);
  // 发送请求信息
  int write_request(int clientfd, net_mod_request_head_t &request_head, void *send_buf, int send_size, void *topic_buf, int topic_size);
  int write_request(int clientfd, net_mod_request_head_t &request_head, const void *send_buf, int send_size, const void *topic_buf, int topic_size);
  int _sendandrecv_(net_node_t *node_arr, int node_arr_len, void *send_buf, int send_size, 
    net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int timeout);
  int _pub_(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size, int timeout) ;
  int _pub_(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content, int content_size, int timeout) ;
  
public:
@@ -206,13 +206,13 @@
   * @content 内容,@content_size 内容长度
   * @return 成功发布的节点的个数
   */
  int pub(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size) ;
  int pub(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content, int content_size) ;
  int pub_nowait(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size);
  int pub_nowait(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content, int content_size);
  /**
   * @msec 毫秒 (千分之一秒)
   */
  int pub_timeout(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size, int  msec);
  int pub_timeout(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content, int content_size, int  msec);
  /**
   * 订阅指定主题
   * @topic 主题
src/net/net_mod_socket_wrapper.cpp
@@ -145,15 +145,15 @@
 * @content 内容,@content_size 内容长度
 * @return 成功发布的节点的个数
 */
int net_mod_socket_pub(void *_socket, net_node_t *node_arr, int node_arr_len, char *topic, int topic_size, void *content, int content_size) {
int net_mod_socket_pub(void *_socket, net_node_t *node_arr, int node_arr_len, const char *topic, int topic_size, const void *content, int content_size) {
    NetModSocket *sockt = (NetModSocket *)_socket;
    return sockt->pub(node_arr, node_arr_len, topic, topic_size, content, content_size);
}
int net_mod_socket_pub_timeout(void *_socket, net_node_t *node_arr, int node_arr_len, char *topic, int topic_size, void *content, int content_size, int msec){
int net_mod_socket_pub_timeout(void *_socket, net_node_t *node_arr, int node_arr_len, const char *topic, int topic_size, const void *content, int content_size, int msec){
    NetModSocket *sockt = (NetModSocket *)_socket;
    return sockt->pub_timeout(node_arr, node_arr_len, topic, topic_size, content, content_size, msec);
}
int net_mod_socket_pub_nowait(void *_socket, net_node_t *node_arr, int node_arr_len, char *topic, int topic_size, void *content, int content_size){
int net_mod_socket_pub_nowait(void *_socket, net_node_t *node_arr, int node_arr_len, const char *topic, int topic_size, const void *content, int content_size){
    NetModSocket *sockt = (NetModSocket *)_socket;
    return sockt->pub_nowait(node_arr, node_arr_len, topic, topic_size, content, content_size);
}
src/net/net_mod_socket_wrapper.h
@@ -245,7 +245,7 @@
   *
   * @return 成功发布的节点的个数
   */
int net_mod_socket_pub(void *_sockt, net_node_t *node_arr, int node_arr_len, char *topic, int topic_size, void *content, int content_size);
int net_mod_socket_pub(void *_sockt, net_node_t *node_arr, int node_arr_len, const char *topic, int topic_size, const void *content, int content_size);
/**
@@ -260,7 +260,7 @@
  *
  * @return 成功发布的节点的个数
  */
int net_mod_socket_pub_timeout(void *_sockt, net_node_t *node_arr, int node_arr_len, char *topic, int topic_size, void *content, int content_size, int msec);
int net_mod_socket_pub_timeout(void *_sockt, net_node_t *node_arr, int node_arr_len, const char *topic, int topic_size, const void *content, int content_size, int msec);
/**
@@ -274,7 +274,7 @@
  *
  * @return 成功发布的节点的个数
  */
int net_mod_socket_pub_nowait(void *_sockt, net_node_t *node_arr, int node_arr_len, char *topic, int topic_size, void *content, int content_size);
int net_mod_socket_pub_nowait(void *_sockt, net_node_t *node_arr, int node_arr_len, const char *topic, int topic_size, const void *content, int content_size);
/**
 * @brief 订阅感兴趣主题的消息
test_net_socket/test_net_mod_socket.cpp
@@ -92,45 +92,26 @@
}
void *bus_handler(void *sockt) {
  // pthread_detach(pthread_self());
void * bus_server;
  
  char action[512];
  while ( true) {
    printf("Input action: Close?\n");
    if(scanf("%s",action) < 1) {
      printf("Invalide action\n");
      continue;
static void stop_bus_handler(int sig) {
  bus_server_socket_wrapper_stop(bus_server);
    }
    if(strcmp(action, "close") == 0) {
      bus_server_socket_wrapper_close(sockt);
      break;
    } else {
      printf("Invalide action\n");
    }
  }
}
void start_bus_server(argument_t &arg) {
  printf("Start bus server\n");
  void * server_socket = bus_server_socket_wrapper_open();
  pthread_t tid;
  // 创建一个线程,可以关闭bus
  if(arg.interactive)
    pthread_create(&tid, NULL, bus_handler, server_socket);
  bus_server = bus_server_socket_wrapper_open();
  if(bus_server_socket_wrapper_start_bus(server_socket) != 0) {
  signal(SIGINT,  stop_bus_handler);
  signal(SIGTERM,  stop_bus_handler);
  if(bus_server_socket_wrapper_start_bus(bus_server) != 0) {
    printf("start bus failed\n");
    exit(1);
  }
  if (pthread_join(tid, NULL) != 0) {
    perror(" pthread_join");
  }
  bus_server_socket_wrapper_close(bus_server);
}
void *serverSockt;
@@ -186,6 +167,7 @@
    //rv = net_mod_socket_recvandsend_timeout(serverSockt, _recvandsend_callback_ , 0, 2000000, NULL );
  net_mod_socket_close(serverSockt);
  logger->debug("stopted\n");
  // while ( (rv = net_mod_socket_recvfrom(ser, &recvbuf, &size, &key) ) == 0) {
  //  // printf( "server: RECEIVED REQUEST FROM  %d NAME %s\n", key, recvbuf);
  //   sprintf(sendbuf, "%d RECEIVED %s", net_mod_socket_get_key(ser), (char *)recvbuf);
@@ -457,7 +439,7 @@
  net_node_t *node_arr;
  int node_arr_size = parse_node_list(targ->nodelist, &node_arr);
 
  char *topic = "news";
  const char *topic = "news";
  // char filename[512];
  // sprintf(filename, "test%d.tmp", targ->id);
  // FILE *fp = NULL;