| | |
| | | $(DEST)/%.o: %.c |
| | | $(CC) ${CPPFLAGS} $(CFLAGS) -c $(filter %.c, $^) -o $@ |
| | | |
| | | $(DEST)/%/%.o: %.c |
| | | $(CC) ${CPPFLAGS} $(CFLAGS) -c $(filter %.c, $^) -o $@ |
| | | |
| | | ${DEST}/%: %.o |
| | | $(CC) $(LDFLAGS) $(filter %.o, $^) -o $@ $(LDLIBS) |
| | |
| | | |
| | | #include "net_mod_server_socket_wrapper.h" |
| | | |
| | | void *net_mod_server_socket_open(int port) { |
| | | net_mod_server_socket_t *sockt = (net_mod_server_socket_t *)malloc(sizeof(net_mod_server_socket_t)); |
| | | sockt->sockt = new NetModServerSocket(port); |
| | | return (void *)sockt; |
| | | } |
| | | |
| | | void net_mod_server_socket_close(void *_sockt) { |
| | | net_mod_server_socket_t *sockt = (net_mod_server_socket_t *)_sockt; |
| | | delete sockt->sockt; |
| | | free(sockt); |
| | | |
| | | } |
| | | |
| | | void net_mod_server_socket_start(void *_sockt) { |
| | | net_mod_server_socket_t *sockt = (net_mod_server_socket_t *)_sockt; |
| | | sockt->sockt->start(); |
| | | } |
| | |
| | | #ifndef __NET_MOD_SERVER_SOCKET_H__ |
| | | #define __NET_MOD_SERVER_SOCKET_H__ |
| | | |
| | | #include "net_mod_server_socket.h" |
| | | |
| | | #ifdef __cplusplus |
| | | extern "C" { |
| | | #endif |
| | | |
| | | struct net_mod_server_socket_t |
| | | { |
| | | NetModServerSocket *sockt; |
| | | }; |
| | | |
| | | /** |
| | | * 创建 |
| | | */ |
| | | void *net_mod_server_socket_open(int port) ; |
| | | |
| | | /** |
| | | * 关闭 |
| | | */ |
| | | void net_mod_server_socket_close(void *_sockt) ; |
| | | |
| | | /** |
| | | * 启动 |
| | | */ |
| | | void net_mod_server_socket_start(void *_sockt); |
| | | |
| | | |
| | | |
| | |
| | | #include "net_mod_socket_io.h" |
| | | |
| | | |
| | | std::map<std::string, rio_t *> NetModSocket::connectionMap; |
| | | std::map<std::string, int> NetModSocket::connectionMap; |
| | | |
| | | NetModSocket::NetModSocket() |
| | | { |
| | |
| | | |
| | | |
| | | NetModSocket::~NetModSocket() { |
| | | rio_t * rio; |
| | | int clientfd; |
| | | for (auto map_iter = connectionMap.begin(); map_iter != connectionMap.end(); map_iter++) { |
| | | rio = map_iter->second; |
| | | Close(rio->rio_fd); |
| | | if(rio != NULL) { |
| | | free(rio); |
| | | } |
| | | clientfd = map_iter->second; |
| | | Close(clientfd); |
| | | |
| | | } |
| | | } |
| | | |
| | | int NetModSocket::connect(const char *host, int port) { |
| | | std::map<std::string, int>::iterator mapIter; |
| | | int clientfd; |
| | | char mapKey[256]; |
| | | char portstr[32]; |
| | | |
| | | sprintf(mapKey, "%s:%d", host, port); |
| | | if( ( mapIter = connectionMap.find(mapKey)) != connectionMap.end()) { |
| | | clientfd = mapIter->second; |
| | | } else { |
| | | |
| | | sprintf(portstr, "%d", port); |
| | | clientfd = open_clientfd(host, portstr); |
| | | connectionMap.insert({mapKey, clientfd}); |
| | | |
| | | } |
| | | return clientfd; |
| | | } |
| | | |
| | | void NetModSocket::remove_connect(const char *host, int port) { |
| | | std::map<std::string, int>::iterator mapIter; |
| | | char mapKey[256]; |
| | | // char portstr[32]; |
| | | sprintf(mapKey, "%s:%d", host, port); |
| | | connectionMap.erase(mapKey); |
| | | } |
| | | |
| | | int NetModSocket::sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, |
| | | net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) { |
| | | |
| | | int i, n, clientfd; |
| | | char portstr[32]; |
| | | net_node_t *node; |
| | | char mapKey[256]; |
| | | void *recv_buf; |
| | | int recv_size; |
| | | char response_head_bs[NET_MODE_RESPONSE_HEAD_LENGTH]; |
| | | net_mod_request_head_t request_head = {}; |
| | | net_mod_response_head_t response_head; |
| | | std::map<std::string, rio_t*>::iterator mapIter; |
| | | rio_t *rio; |
| | | |
| | | |
| | | int nsuc = 0; |
| | | net_mod_recv_msg_t *ret_arr = (net_mod_recv_msg_t *)calloc(arrlen, sizeof(net_mod_recv_msg_t)); |
| | | |
| | | for (i = 0; i< arrlen; i++) { |
| | |
| | | shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size); |
| | | goto LABEL_ARR_PUSH; |
| | | } |
| | | sprintf(mapKey, "%s:%d", node->host, node->port); |
| | | if( ( mapIter = connectionMap.find(mapKey)) != connectionMap.end()) { |
| | | rio = mapIter->second; |
| | | } else { |
| | | rio = (rio_t *)malloc(sizeof(rio_t)); |
| | | sprintf(portstr, "%d", node->port); |
| | | clientfd = Open_clientfd(node-> host, portstr); |
| | | Rio_readinitb(rio, clientfd); |
| | | connectionMap.insert({mapKey, rio}); |
| | | |
| | | if( (clientfd = connect(node->host, node->port)) < 0 ) { |
| | | continue; |
| | | } |
| | | |
| | | request_head.mod = REQ_REP; |
| | | request_head.key = node->key; |
| | | request_head.content_length = send_size; |
| | | request_head.topic_length = 0; |
| | | if(rio_writen(rio->rio_fd, NetModSocket::encode_request_head(request_head), NET_MODE_REQUEST_HEAD_LENGTH) != NET_MODE_REQUEST_HEAD_LENGTH) { |
| | | if(rio_writen(clientfd, NetModSocket::encode_request_head(request_head), NET_MODE_REQUEST_HEAD_LENGTH) != NET_MODE_REQUEST_HEAD_LENGTH) { |
| | | LoggerFactory::getLogger()->error(errno, "NetModSocket::send head rio_writen"); |
| | | exit(1); |
| | | remove_connect(node->host, node->port); |
| | | close(clientfd); |
| | | continue; |
| | | |
| | | } |
| | | |
| | | if(rio_writen(rio->rio_fd, send_buf, send_size) != send_size ) { |
| | | if(rio_writen(clientfd, send_buf, send_size) != send_size ) { |
| | | LoggerFactory::getLogger()->error(errno, "NetModSocket::send conent rio_writen"); |
| | | exit(1); |
| | | remove_connect(node->host, node->port); |
| | | close(clientfd); |
| | | continue; |
| | | } |
| | | |
| | | |
| | | if ( rio_readnb(rio, response_head_bs, NET_MODE_RESPONSE_HEAD_LENGTH) != NET_MODE_RESPONSE_HEAD_LENGTH) { |
| | | if ( rio_readn(clientfd, response_head_bs, NET_MODE_RESPONSE_HEAD_LENGTH) != NET_MODE_RESPONSE_HEAD_LENGTH) { |
| | | LoggerFactory::getLogger()->error(errno, "NetModSocket::send rio_readnb"); |
| | | exit(1); |
| | | remove_connect(node->host, node->port); |
| | | close(clientfd); |
| | | continue; |
| | | } |
| | | |
| | | response_head = NetModSocket::decode_response_head(response_head_bs); |
| | |
| | | LoggerFactory::getLogger()->error(errno, "NetModSocket::send malloc"); |
| | | exit(1); |
| | | } |
| | | if ( (recv_size = rio_readnb(rio, recv_buf, response_head.content_length) ) != response_head.content_length) { |
| | | if ( (recv_size = rio_readn(clientfd, recv_buf, response_head.content_length) ) != response_head.content_length) { |
| | | LoggerFactory::getLogger()->error(errno, "NetModSocket::send rio_readnb"); |
| | | exit(1); |
| | | remove_connect(node->host, node->port); |
| | | close(clientfd); |
| | | continue; |
| | | } |
| | | |
| | | LABEL_ARR_PUSH: |
| | | if(node->host != NULL) { |
| | | strcpy( ret_arr[i].host, node->host); |
| | | strcpy( ret_arr[nsuc].host, node->host); |
| | | } else { |
| | | strcpy( ret_arr[i].host, "local"); |
| | | strcpy( ret_arr[nsuc].host, "local"); |
| | | } |
| | | |
| | | ret_arr[i].port = node->port; |
| | | ret_arr[i].key = node->key; |
| | | ret_arr[i].content = recv_buf; |
| | | ret_arr[i].content_length = recv_size; |
| | | ret_arr[nsuc].port = node->port; |
| | | ret_arr[nsuc].key = node->key; |
| | | ret_arr[nsuc].content = recv_buf; |
| | | ret_arr[nsuc].content_length = recv_size; |
| | | |
| | | nsuc++; |
| | | } |
| | | |
| | | *recv_arr = ret_arr; |
| | | if(recv_arr_size != NULL) { |
| | | *recv_arr_size = i; |
| | | *recv_arr_size = nsuc; |
| | | } |
| | | |
| | | return i; |
| | | return nsuc; |
| | | |
| | | } |
| | | |
| | |
| | | |
| | | int NetModSocket::pub(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size) { |
| | | int i, n, clientfd; |
| | | char portstr[32]; |
| | | net_node_t *node; |
| | | char mapKey[256]; |
| | | void *recv_buf; |
| | | int recv_size; |
| | | char response_head_bs[NET_MODE_RESPONSE_HEAD_LENGTH]; |
| | | net_mod_request_head_t request_head; |
| | | net_mod_response_head_t response_head; |
| | | std::map<std::string, rio_t*>::iterator mapIter; |
| | | rio_t *rio; |
| | | std::map<std::string, int>::iterator mapIter; |
| | | int nsuc = 0; |
| | | for (i = 0; i< arrlen; i++) { |
| | | |
| | | node = &node_arr[i]; |
| | | if(node->host == NULL) { |
| | | // 本地发送 |
| | | shmModSocket.pub(topic, topic_size, content, content_size, node->key); |
| | | |
| | | |
| | | } else { |
| | | sprintf(mapKey, "%s:%d", node->host, node->port); |
| | | if( ( mapIter = connectionMap.find(mapKey)) != connectionMap.end()) { |
| | | rio = mapIter->second; |
| | | } else { |
| | | rio = (rio_t *)malloc(sizeof(rio_t)); |
| | | sprintf(portstr, "%d", node->port); |
| | | clientfd = Open_clientfd(node-> host, portstr); |
| | | Rio_readinitb(rio, clientfd); |
| | | connectionMap.insert({mapKey, rio}); |
| | | if( (clientfd = connect(node->host, node->port)) < 0 ) { |
| | | continue; |
| | | } |
| | | |
| | | request_head.mod = BUS; |
| | | request_head.key = node->key; |
| | | request_head.content_length = content_size; |
| | | request_head.topic_length = strlen(topic) + 1; |
| | | if(rio_writen(rio->rio_fd, NetModSocket::encode_request_head(request_head), NET_MODE_REQUEST_HEAD_LENGTH) != NET_MODE_REQUEST_HEAD_LENGTH) { |
| | | if(rio_writen(clientfd, NetModSocket::encode_request_head(request_head), NET_MODE_REQUEST_HEAD_LENGTH) != NET_MODE_REQUEST_HEAD_LENGTH) { |
| | | LoggerFactory::getLogger()->error(errno, "NetModSocket::pub head rio_writen"); |
| | | exit(1); |
| | | remove_connect(node->host, node->port); |
| | | close(clientfd); |
| | | continue; |
| | | |
| | | } |
| | | |
| | | if(rio_writen(rio->rio_fd, content, content_size) != content_size ) { |
| | | if(rio_writen(clientfd, content, content_size) != content_size ) { |
| | | LoggerFactory::getLogger()->error(errno, "NetModSocket::pub rio_writen conent "); |
| | | exit(1); |
| | | remove_connect(node->host, node->port); |
| | | close(clientfd); |
| | | continue; |
| | | } |
| | | |
| | | if(rio_writen(rio->rio_fd, topic, request_head.topic_length) != request_head.topic_length ) { |
| | | if(rio_writen(clientfd, topic, request_head.topic_length) != request_head.topic_length ) { |
| | | LoggerFactory::getLogger()->error(errno, "NetModSocket::pub rio_writen conent "); |
| | | exit(1); |
| | | remove_connect(node->host, node->port); |
| | | close(clientfd); |
| | | continue; |
| | | } |
| | | } |
| | | |
| | | nsuc++; |
| | | } |
| | | return i; |
| | | return nsuc; |
| | | } |
| | | |
| | | void NetModSocket::free_recv_msg_arr(net_mod_recv_msg_t * arr, size_t size) { |
| | |
| | | class NetModSocket { |
| | | friend class NetModServerSocket; |
| | | private: |
| | | static std::map<std::string, rio_t *> connectionMap; |
| | | static std::map<std::string, int> connectionMap; |
| | | ShmModSocket shmModSocket; |
| | | |
| | | |
| | |
| | | static void * encode_response_head(net_mod_response_head_t & response); |
| | | static net_mod_response_head_t decode_response_head(void *_headbs); |
| | | |
| | | int connect(const char *host, int port); |
| | | void remove_connect(const char *host, int port); |
| | | |
| | | public: |
| | | |
| | | NetModSocket(); |
| | |
| | | #include "net_mod_socket_wrapper.h" |
| | | /** |
| | | * 创建 |
| | | */ |
| | | void * net_mod_socket_open() { |
| | | net_mod_socket_t *sockt = (net_mod_socket_t *)malloc(sizeof(net_mod_socket_t)); |
| | | sockt->sockt = new NetModSocket; |
| | | return (void *)sockt; |
| | | } |
| | | |
| | | /** |
| | | * 关闭 |
| | | */ |
| | | void net_mod_socket_close(void *_sockt) { |
| | | net_mod_socket_t *sockt = (net_mod_socket_t *)_sockt; |
| | | delete sockt->sockt; |
| | | free(sockt); |
| | | } |
| | | |
| | | /** |
| | | * 向node_arr 中的所有网络节点发送请求消息,节点的返回信息汇总并存储在recv_arr中 |
| | | * @node_arr 网络节点组, @node_arr_len该数组长度 |
| | | * @send_buf 发送的消息,@send_size 该消息体的长度 |
| | | * @recv_arr 返回的应答消息组,@recv_arr_size 该数组长度 |
| | | * @return 成功发送的节点的个数 |
| | | */ |
| | | int net_mod_socket_sendandrecv(void *_sockt, net_node_t *node_arr, int node_arr_len, void *send_buf, int send_size, |
| | | net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) { |
| | | net_mod_socket_t *sockt = (net_mod_socket_t *)_sockt; |
| | | return sockt->sockt->sendandrecv(node_arr, node_arr_len, send_buf, send_size, recv_arr, recv_arr_size); |
| | | } |
| | | |
| | | /** |
| | | * 销毁sendandrecv方法返回的消息组 |
| | | * @arr 消息组 |
| | | * @size 消息组的长度 |
| | | */ |
| | | void net_mod_socket_free_recv_msg_arr(net_mod_recv_msg_t * arr, int len) { |
| | | |
| | | return NetModSocket::free_recv_msg_arr(arr, len); |
| | | } |
| | | |
| | | /** |
| | | * 向node_arr 中的所有网络节点发布消息 |
| | | * @node_arr 网络节点组, @node_arr_len该数组长度 |
| | | * @topic 主题,@topic_size 该主题的长度 |
| | | * @content 内容,@content_size 内容长度 |
| | | * @return 成功发布的节点的个数 |
| | | */ |
| | | int net_mod_socket_pub(void *_sockt, net_node_t *node_arr, int node_arr_len, char *topic, int topic_size, void *content, int content_size) { |
| | | net_mod_socket_t *sockt = (net_mod_socket_t *)_sockt; |
| | | return sockt->sockt->pub(node_arr, node_arr_len, topic, topic_size, content, content_size); |
| | | } |
| | |
| | | extern "C" { |
| | | #endif |
| | | |
| | | struct net_mod_socket |
| | | struct net_mod_socket_t |
| | | { |
| | | NetModSocket *sockt; |
| | | }; |
| | | |
| | | /** |
| | | * 创建 |
| | | */ |
| | | void * net_mod_socket_open(); |
| | | void net_mod_socket_close(); |
| | | |
| | | /** |
| | | * 关闭 |
| | | */ |
| | | void net_mod_socket_close(void *_sockt); |
| | | |
| | | /** |
| | | * 向node_arr 中的所有网络节点发送请求消息,节点的返回信息汇总并存储在recv_arr中 |
| | | * @node_arr 网络节点组, @node_arr_len该数组长度 |
| | |
| | | * @recv_arr 返回的应答消息组,@recv_arr_size 该数组长度 |
| | | * @return 成功发送的节点的个数 |
| | | */ |
| | | int net_mod_socket_sendandrecv(net_node_t *node_arr, int node_arr_len, void *send_buf, int send_size, |
| | | int net_mod_socket_sendandrecv(void *_sockt, net_node_t *node_arr, int node_arr_len, void *send_buf, int send_size, |
| | | net_mod_recv_msg_t ** recv_arr, int *recv_arr_size); |
| | | |
| | | /** |
| | |
| | | * @arr 消息组 |
| | | * @size 消息组的长度 |
| | | */ |
| | | static void net_mod_socket_free_recv_msg_arr(net_mod_recv_msg_t * arr, size_t size); |
| | | void net_mod_socket_free_recv_msg_arr(net_mod_recv_msg_t * arr, int size); |
| | | |
| | | /** |
| | | * 向node_arr 中的所有网络节点发布消息 |
| | |
| | | * @content 内容,@content_size 内容长度 |
| | | * @return 成功发布的节点的个数 |
| | | */ |
| | | int net_mod_socket_pub(net_node_t *node_arr, int node_arr_len, char *topic, int topic_size, void *content, int content_size); |
| | | int net_mod_socket_pub(void *_sockt, net_node_t *node_arr, int node_arr_len, char *topic, int topic_size, void *content, int content_size); |
| | | |
| | | |
| | | #ifdef __cplusplus |
| | |
| | | ShmModSocket(); |
| | | ~ShmModSocket(); |
| | | |
| | | |
| | | /** |
| | | * 绑定端口到socket, 如果不绑定则系统自动分配一个 |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | |
| | | } |
| | | |
| | | int shm_close_socket(shm_socket_t *socket) { |
| | | |
| | | int ret; |
| | | switch (socket->socket_type) { |
| | | case SHM_SOCKET_STREAM: |
| | | return _shm_close_stream_socket(socket, true); |
| | | case SHM_SOCKET_DGRAM: |
| | | return _shm_close_dgram_socket(socket); |
| | | default: |
| | | return -1; |
| | | case SHM_SOCKET_STREAM: |
| | | ret = _shm_close_stream_socket(socket, true); |
| | | break; |
| | | case SHM_SOCKET_DGRAM: |
| | | ret = _shm_close_dgram_socket(socket); |
| | | break; |
| | | default: |
| | | break; |
| | | } |
| | | return -1; |
| | | SemUtil::remove(socket->mutex); |
| | | free(socket); |
| | | return ret; |
| | | } |
| | | |
| | | int shm_socket_bind(shm_socket_t *socket, int port) { |
| | |
| | | if (socket->dispatch_thread != 0) |
| | | pthread_cancel(socket->dispatch_thread); |
| | | |
| | | free(socket); |
| | | |
| | | return 0; |
| | | |
| | | } |
| | |
| | | delete socket->queue; |
| | | socket->queue = NULL; |
| | | } |
| | | free(socket); |
| | | |
| | | return 0; |
| | | } |
| | | |
| | |
| | | #include "net_mod_server_socket.h" |
| | | #include "net_mod_socket.h" |
| | | #include "net_mod_server_socket_wrapper.h" |
| | | #include "net_mod_socket_wrapper.h" |
| | | #include "shm_mm.h" |
| | | #include "dgram_mod_socket.h" |
| | | #include "usg_common.h" |
| | | |
| | | void server(int port) { |
| | | NetModServerSocket *serverSocket = new NetModServerSocket(port); |
| | | serverSocket->start(); |
| | | void *serverSocket = net_mod_server_socket_open(port); |
| | | net_mod_server_socket_start(serverSocket); |
| | | } |
| | | |
| | | void client(int port ){ |
| | | NetModSocket client; |
| | | void * client = net_mod_socket_open(); |
| | | char content[MAXLINE]; |
| | | char action[512]; |
| | | char topic[512]; |
| | |
| | | printf("Please input topic and content\n"); |
| | | scanf("%s %s", topic, content); |
| | | |
| | | n = client.pub(pub_node_arr, pub_node_arr_size, topic, strlen(topic)+1, content, strlen(content)+1); |
| | | printf("pub %d\n", n); |
| | | n = net_mod_socket_pub(client, pub_node_arr, pub_node_arr_size, topic, strlen(topic)+1, content, strlen(content)+1); |
| | | printf("pub %d nodes\n", n); |
| | | } |
| | | else if(strcmp(action, "send") == 0) { |
| | | getc(stdin); |
| | |
| | | |
| | | if (fgets(content, MAXLINE, stdin) != NULL) { |
| | | // 收到消息的节点即使没有对应的信息, 也要回复一个表示无的消息,否则会一直等待 |
| | | n = client.sendandrecv( node_arr, node_arr_size, content, strlen(content), &recv_arr, &recv_arr_size); |
| | | n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, content, strlen(content), &recv_arr, &recv_arr_size); |
| | | printf("send %d nodes\n", n); |
| | | for(i=0; i<recv_arr_size; i++) { |
| | | printf("host:%s, port: %d, key:%d, content: %s\n", |
| | | recv_arr[i].host, |
| | |
| | | recv_arr[i].content |
| | | ); |
| | | } |
| | | //使用完后,不要忘记释放掉 |
| | | NetModSocket::free_recv_msg_arr(recv_arr, recv_arr_size); |
| | | |
| | | // 使用完后,不要忘记释放掉 |
| | | net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size); |
| | | } |
| | | } |
| | | else if(strcmp(action, "quit") == 0) { |
| | |
| | | } |
| | | |
| | | } |
| | | |
| | | net_mod_socket_close(client); |
| | | |
| | | |
| | | } |