| | |
| | | } |
| | | |
| | | |
| | | int NetModSocket::pub(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size) { |
| | | int NetModSocket::pub(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content, int content_size) { |
| | | return _pub_(node_arr, arrlen, topic, topic_size, content, content_size, -1); |
| | | } |
| | | |
| | | int NetModSocket::pub_nowait(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size) { |
| | | int NetModSocket::pub_nowait(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content, int content_size) { |
| | | return _pub_(node_arr, arrlen, topic, topic_size, content, content_size, 0); |
| | | } |
| | | |
| | | int NetModSocket::pub_timeout(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size, int msec ) { |
| | | int NetModSocket::pub_timeout(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content, int content_size, int msec ) { |
| | | return _pub_(node_arr, arrlen, topic, topic_size, content, content_size, msec); |
| | | } |
| | | |
| | | |
| | | // int pub(char *topic, int topic_size, void *content, int content_size, int port); |
| | | |
| | | int NetModSocket::_pub_(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, |
| | | int NetModSocket::_pub_(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content, |
| | | int content_size, int msec) { |
| | | int i, connfd; |
| | | net_node_t *node; |
| | |
| | | //====================================================================================== |
| | | |
| | | int NetModSocket::write_request(int clientfd, net_mod_request_head_t &request_head, |
| | | void *content_buf, int content_size, void *topic_buf, int topic_size) { |
| | | const void *content_buf, int content_size, const void *topic_buf, int topic_size) { |
| | | |
| | | int buf_size; |
| | | char *buf; |
| | |
| | | //读取返回信息 |
| | | int read_response(int clientfd, net_mod_recv_msg_t *recv_msg); |
| | | // 发送请求信息 |
| | | int write_request(int clientfd, net_mod_request_head_t &request_head, void *send_buf, int send_size, void *topic_buf, int topic_size); |
| | | int write_request(int clientfd, net_mod_request_head_t &request_head, const void *send_buf, int send_size, const void *topic_buf, int topic_size); |
| | | |
| | | int _sendandrecv_(net_node_t *node_arr, int node_arr_len, void *send_buf, int send_size, |
| | | net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int timeout); |
| | | |
| | | int _pub_(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size, int timeout) ; |
| | | int _pub_(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content, int content_size, int timeout) ; |
| | | |
| | | |
| | | public: |
| | |
| | | * @content 内容,@content_size 内容长度 |
| | | * @return 成功发布的节点的个数 |
| | | */ |
| | | int pub(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size) ; |
| | | int pub(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content, int content_size) ; |
| | | |
| | | int pub_nowait(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size); |
| | | int pub_nowait(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content, int content_size); |
| | | /** |
| | | * @msec 毫秒 (千分之一秒) |
| | | */ |
| | | int pub_timeout(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size, int msec); |
| | | int pub_timeout(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content, int content_size, int msec); |
| | | /** |
| | | * 订阅指定主题 |
| | | * @topic 主题 |
| | |
| | | * @content 内容,@content_size 内容长度 |
| | | * @return 成功发布的节点的个数 |
| | | */ |
| | | int net_mod_socket_pub(void *_socket, net_node_t *node_arr, int node_arr_len, char *topic, int topic_size, void *content, int content_size) { |
| | | int net_mod_socket_pub(void *_socket, net_node_t *node_arr, int node_arr_len, const char *topic, int topic_size, const void *content, int content_size) { |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | return sockt->pub(node_arr, node_arr_len, topic, topic_size, content, content_size); |
| | | } |
| | | int net_mod_socket_pub_timeout(void *_socket, net_node_t *node_arr, int node_arr_len, char *topic, int topic_size, void *content, int content_size, int msec){ |
| | | int net_mod_socket_pub_timeout(void *_socket, net_node_t *node_arr, int node_arr_len, const char *topic, int topic_size, const void *content, int content_size, int msec){ |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | return sockt->pub_timeout(node_arr, node_arr_len, topic, topic_size, content, content_size, msec); |
| | | } |
| | | int net_mod_socket_pub_nowait(void *_socket, net_node_t *node_arr, int node_arr_len, char *topic, int topic_size, void *content, int content_size){ |
| | | int net_mod_socket_pub_nowait(void *_socket, net_node_t *node_arr, int node_arr_len, const char *topic, int topic_size, const void *content, int content_size){ |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | return sockt->pub_nowait(node_arr, node_arr_len, topic, topic_size, content, content_size); |
| | | } |
| | |
| | | * |
| | | * @return 成功发布的节点的个数 |
| | | */ |
| | | int net_mod_socket_pub(void *_sockt, net_node_t *node_arr, int node_arr_len, char *topic, int topic_size, void *content, int content_size); |
| | | int net_mod_socket_pub(void *_sockt, net_node_t *node_arr, int node_arr_len, const char *topic, int topic_size, const void *content, int content_size); |
| | | |
| | | |
| | | /** |
| | |
| | | * |
| | | * @return 成功发布的节点的个数 |
| | | */ |
| | | int net_mod_socket_pub_timeout(void *_sockt, net_node_t *node_arr, int node_arr_len, char *topic, int topic_size, void *content, int content_size, int msec); |
| | | int net_mod_socket_pub_timeout(void *_sockt, net_node_t *node_arr, int node_arr_len, const char *topic, int topic_size, const void *content, int content_size, int msec); |
| | | |
| | | |
| | | /** |
| | |
| | | * |
| | | * @return 成功发布的节点的个数 |
| | | */ |
| | | int net_mod_socket_pub_nowait(void *_sockt, net_node_t *node_arr, int node_arr_len, char *topic, int topic_size, void *content, int content_size); |
| | | int net_mod_socket_pub_nowait(void *_sockt, net_node_t *node_arr, int node_arr_len, const char *topic, int topic_size, const void *content, int content_size); |
| | | |
| | | /** |
| | | * @brief 订阅感兴趣主题的消息 |
| | |
| | | } |
| | | |
| | | |
| | | void *bus_handler(void *sockt) { |
| | | // pthread_detach(pthread_self()); |
| | | void * bus_server; |
| | | |
| | | char action[512]; |
| | | while ( true) { |
| | | printf("Input action: Close?\n"); |
| | | if(scanf("%s",action) < 1) { |
| | | printf("Invalide action\n"); |
| | | continue; |
| | | static void stop_bus_handler(int sig) { |
| | | bus_server_socket_wrapper_stop(bus_server); |
| | | } |
| | | |
| | | if(strcmp(action, "close") == 0) { |
| | | bus_server_socket_wrapper_close(sockt); |
| | | break; |
| | | } else { |
| | | printf("Invalide action\n"); |
| | | } |
| | | } |
| | | |
| | | } |
| | | |
| | | |
| | | |
| | | void start_bus_server(argument_t &arg) { |
| | | printf("Start bus server\n"); |
| | | void * server_socket = bus_server_socket_wrapper_open(); |
| | | pthread_t tid; |
| | | // 创建一个线程,可以关闭bus |
| | | if(arg.interactive) |
| | | pthread_create(&tid, NULL, bus_handler, server_socket); |
| | | bus_server = bus_server_socket_wrapper_open(); |
| | | |
| | | if(bus_server_socket_wrapper_start_bus(server_socket) != 0) { |
| | | signal(SIGINT, stop_bus_handler); |
| | | signal(SIGTERM, stop_bus_handler); |
| | | |
| | | if(bus_server_socket_wrapper_start_bus(bus_server) != 0) { |
| | | printf("start bus failed\n"); |
| | | exit(1); |
| | | } |
| | | |
| | | if (pthread_join(tid, NULL) != 0) { |
| | | perror(" pthread_join"); |
| | | } |
| | | bus_server_socket_wrapper_close(bus_server); |
| | | } |
| | | |
| | | void *serverSockt; |
| | |
| | | //rv = net_mod_socket_recvandsend_timeout(serverSockt, _recvandsend_callback_ , 0, 2000000, NULL ); |
| | | net_mod_socket_close(serverSockt); |
| | | logger->debug("stopted\n"); |
| | | |
| | | // while ( (rv = net_mod_socket_recvfrom(ser, &recvbuf, &size, &key) ) == 0) { |
| | | // // printf( "server: RECEIVED REQUEST FROM %d NAME %s\n", key, recvbuf); |
| | | // sprintf(sendbuf, "%d RECEIVED %s", net_mod_socket_get_key(ser), (char *)recvbuf); |
| | |
| | | net_node_t *node_arr; |
| | | int node_arr_size = parse_node_list(targ->nodelist, &node_arr); |
| | | |
| | | char *topic = "news"; |
| | | const char *topic = "news"; |
| | | // char filename[512]; |
| | | // sprintf(filename, "test%d.tmp", targ->id); |
| | | // FILE *fp = NULL; |