#include "epollfd_shm.h"

#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <stdio.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/un.h>
#include <unistd.h>

#include "transfd_shm.h"

/*
 * Switch O_NONBLOCK on (nonblock != 0) or off (nonblock == 0) for fd.
 * Silently returns if the current flags cannot be read; logs if they
 * cannot be written back.
 */
static void setfdnonblock(int fd, int nonblock)
{
    int flags = fcntl(fd, F_GETFL);
    if (flags < 0) {
        return;
    }

    if (nonblock) {
        flags |= O_NONBLOCK;
    } else {
        flags &= ~O_NONBLOCK;
    }

    if (fcntl(fd, F_SETFL, flags) < 0) {
        printf("setfdnonblock %s failed %d -> %s\n",
               nonblock ? "nonblock" : "block", errno, strerror(errno));
    }
}

/*
 * Return non-zero when fd currently has O_NONBLOCK set, 0 otherwise
 * (including when the flags cannot be read).
 */
static inline int fdnonblock(int fd)
{
    int flags = fcntl(fd, F_GETFL);
    if (flags < 0) {
        return 0;
    }
    return flags & O_NONBLOCK;
}

/*
 * Create a listening AF_UNIX / SOCK_SEQPACKET socket bound to `path`.
 *
 * Any existing filesystem entry at `path` is unlinked first. When
 * `nonblock` is non-zero the listening socket is made non-blocking.
 *
 * Returns the listening fd, or a negative value on failure.
 */
int unix_domain_server_fd(const char *path, int nonblock)
{
    struct sockaddr_un addr;

    /* Reject NULL and anything that would not fit (with its NUL) in sun_path. */
    if (!path || strlen(path) > sizeof(addr.sun_path) - 1) {
        printf("unix_domain_server_fd path too long, must less than %lu\n",
               (unsigned long)sizeof(addr.sun_path));
        return -1;
    }

    memset(&addr, 0, sizeof(addr));
    addr.sun_family = AF_UNIX;
    memcpy(addr.sun_path, path, strlen(path));

    int fd = socket(AF_UNIX, SOCK_SEQPACKET | SOCK_CLOEXEC, 0);
    if (fd < 0) {
        printf("unix_domain_server_fd failed %d -> %s\n", errno, strerror(errno));
        return fd;
    }

    unlink(path);

    int ret = bind(fd, (const struct sockaddr *)&addr, sizeof(addr));
    if (ret != 0) {
        /* Report before close(): close() may overwrite errno. */
        printf("unix_domain_server_fd bind failed %d -> %s\n", errno, strerror(errno));
        close(fd);
        return ret;
    }

    ret = listen(fd, 128);
    if (ret != 0) {
        printf("unix_domain_server_fd listen failed %d -> %s\n", errno, strerror(errno));
        close(fd);
        return ret;
    }

    if (nonblock) {
        setfdnonblock(fd, 1);
    }
    return fd;
}

/*
 * Create an AF_UNIX / SOCK_SEQPACKET socket and connect it to `path`.
 *
 * When `nonblock` is non-zero the socket is made non-blocking after the
 * connection has been established.
 *
 * Returns the connected fd, or a negative value on failure.
 */
int unix_domain_client_fd(const char *path, int nonblock)
{
    struct sockaddr_un addr;

    /* Same bound as the server side; without it memcpy could overrun sun_path. */
    if (!path || strlen(path) > sizeof(addr.sun_path) - 1) {
        printf("unix_domain_client_fd path too long, must less than %lu\n",
               (unsigned long)sizeof(addr.sun_path));
        return -1;
    }

    memset(&addr, 0, sizeof(addr));
    addr.sun_family = AF_UNIX;
    memcpy(addr.sun_path, path, strlen(path));

    int fd = socket(PF_UNIX, SOCK_SEQPACKET | SOCK_CLOEXEC, 0);
    if (fd < 0) {
        printf("unix_domain_client_fd socket failed %d -> %s\n", errno, strerror(errno));
        return fd;
    }

    int ret = connect(fd, (const struct sockaddr *)&addr, sizeof(addr));
    if (ret != 0) {
        printf("unix_domain_client_fd connect failed %d -> %s\n", errno, strerror(errno));
        close(fd);
        return ret;
    }

    if (nonblock) {
        setfdnonblock(fd, 1);
    }
    return fd;
}

/*
 * Create an epoll instance and register `fd` on it with event mask `flags`.
 * Returns the epoll fd, or -1 on failure (nothing is left open on failure).
 */
static int epoll_init(int fd, int flags)
{
    int epollfd = epoll_create1(EPOLL_CLOEXEC);
    if (epollfd < 0) {
        printf("epoll_loop_server create epollfd failed %d -> %s", errno, strerror(errno));
        return -1;
    }

    struct epoll_event event;
    memset(&event, 0, sizeof(event));
    event.events = flags;
    event.data.fd = fd;

    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event) < 0) {
        printf("epoll_loop_server add failed %d -> %s\n", errno, strerror(errno));
        close(epollfd); /* do not leak the epoll instance */
        return -1;
    }
    return epollfd;
}

/*
 * Replace the first entry equal to `oldFd` in clients[0..max) with `newFd`.
 * Used both to store a new connection (oldFd == -1) and to release a slot
 * (newFd == -1).
 *
 * Returns 1 when an entry was replaced, 0 when no entry matched.
 */
static int update_client(int clients[], int max, int newFd, int oldFd)
{
    for (int i = 0; i < max; i++) {
        if (clients[i] == oldFd) {
            clients[i] = newFd;
            return 1;
        }
    }
    return 0;
}

/* Default quit predicate used when the caller supplies none: never quit. */
static int no_quit(void *args)
{
    (void)args;
    return 0;
}

/*
 * Enum constants rather than `static const int` so the arrays sized with
 * them are ordinary fixed-size arrays instead of VLAs.
 */
enum {
    /* Assume there will never be more than 256 client connections. */
    maxConns = 256,
    /* Assume at most 16 fds are transferred in one message. */
    maxMemFds = 16
};

/*
 * Ask the producer callback for the next set of fds and copy them into
 * memFds (capacity maxMemFds).
 *
 * If the callback fills msg.data, up to maxMemFds ints are copied from it;
 * otherwise the single msg.fd is used.
 *
 * Returns the number of fds stored in memFds (0 when the callback produced
 * nothing).
 */
static int copyFds(void *args, cbGetData fget, int memFds[])
{
    struct fd_msg msg;
    memset(&msg, 0, sizeof(msg));

    if (!fget || !fget(args, &msg)) {
        return 0;
    }

    if (!msg.data) {
        memFds[0] = msg.fd;
        return 1;
    }

    if (msg.len <= 0) {
        return 0;
    }

    /* Clamp: memFds only has room for maxMemFds entries. */
    int count = msg.len > maxMemFds ? maxMemFds : (int)msg.len;
    memcpy(memFds, msg.data, sizeof(int) * (size_t)count);
    return count;
}

/*
 * Deliver received fds to the consumer callback.
 *
 * More than one fd is passed through msg.data / msg.len (capped at
 * maxMemFds); otherwise the first entry is passed through msg.fd.
 */
static void handleFds(void *args, cbHandleData fhandle, int memFds[], int fds)
{
    if (!fhandle) {
        return;
    }

    struct fd_msg msg;
    memset(&msg, 0, sizeof(msg));

    int fd_len = fds > maxMemFds ? maxMemFds : fds;
    if (fd_len > 1) {
        msg.data = memFds;
        msg.len = fd_len;
    } else {
        msg.fd = memFds[0];
    }
    fhandle(args, &msg);
}

/*
 * Make a freshly accepted connection non-blocking, register it on the
 * epoll instance with `events`, and record it in clients[]. The connection
 * is closed if it cannot be registered or if the client table is full.
 */
static void epoll_add_client(int epollfd, int clients[], int conn, unsigned int events)
{
    setfdnonblock(conn, 1);

    struct epoll_event ev;
    memset(&ev, 0, sizeof(ev));
    ev.data.fd = conn;
    ev.events = events;

    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, conn, &ev) < 0) {
        printf("epoll_loop add client failed %d -> %s\n", errno, strerror(errno));
        close(conn);
        return;
    }

    if (!update_client(clients, maxConns, conn, -1)) {
        printf("epoll_loop too many clients, max %d\n", maxConns);
        epoll_ctl(epollfd, EPOLL_CTL_DEL, conn, &ev);
        close(conn);
    }
}

/*
 * Unregister a connection from the epoll instance, close it, and free its
 * slot in clients[]. The DEL is issued before close() so it operates on a
 * still-valid descriptor.
 */
static void epoll_drop_client(int epollfd, int clients[], int conn)
{
    struct epoll_event ev;
    memset(&ev, 0, sizeof(ev)); /* ignored for DEL, but non-NULL for old kernels */

    epoll_ctl(epollfd, EPOLL_CTL_DEL, conn, &ev);
    close(conn);
    update_client(clients, maxConns, -1, conn);
}

/*
 * epoll-based server loop.
 *
 * Accepts connections on `listenfd` and, until `fquit(args)` returns
 * non-zero (or forever when fquit is NULL):
 *   - if `fget` is given, obtains fds from it once per wake-up and sends
 *     them with sendfd() to every connection reported writable;
 *   - if `fhandle` is given, reads fds with recvfd() from every connection
 *     reported readable and hands them to it.
 * fds obtained from `fget` are closed after each wake-up.
 *
 * The loop ends when epoll_wait() fails (including EINTR). On exit all
 * tracked client connections and the epoll fd are closed; `listenfd` is
 * left open.
 *
 * Returns 0 when the loop ends, -1 if neither callback is given or the
 * epoll instance cannot be set up.
 */
int epoll_loop(int listenfd, cbGetData fget, cbHandleData fhandle, cbQuit fquit, void *args)
{
    if (!fget && !fhandle) {
        printf("epoll At Leaset MUST be a sender or receiver\n");
        return -1;
    }

    const int is_sender = fget ? 1 : 0;
    const int is_recver = fhandle ? 1 : 0;
    cbQuit fq = fquit ? fquit : no_quit;

    /* Event mask applied to every accepted connection (level-triggered). */
    unsigned int client_events = EPOLLRDHUP;
    if (is_sender) {
        client_events |= EPOLLOUT;
    }
    if (is_recver) {
        client_events |= EPOLLIN;
    }

    int clients[maxConns];
    for (int i = 0; i < maxConns; i++) {
        clients[i] = -1;
    }

    int epollfd = epoll_init(listenfd, EPOLLIN);
    if (epollfd == -1) {
        return epollfd;
    }

    const int listenfd_nonblock = fdnonblock(listenfd);

    while (!fq(args)) {
        struct epoll_event events[maxConns];
        memset(events, 0, sizeof(events));

        int nready = epoll_wait(epollfd, events, maxConns, 3000);
        if (nready == -1) {
            printf("epoll_loop wait failed %d -> %s\n", errno, strerror(errno));
            break;
        }
        if (nready == 0) {
            /* timeout */
            continue;
        }

        /* The same fds are sent to every currently writable connection. */
        int memFds[maxMemFds];
        memset(memFds, -1, sizeof(memFds));
        int memFds_len = is_sender ? copyFds(args, fget, memFds) : 0;

        for (int i = 0; i < nready; i++) {
            const int conn = events[i].data.fd;
            const unsigned int revents = events[i].events;

            if (conn == listenfd) {
                /*
                 * A non-blocking listener is drained until accept() fails;
                 * a blocking listener accepts exactly one connection.
                 */
                do {
                    struct sockaddr_un connaddr;
                    socklen_t len = sizeof(connaddr); /* value-result: reset each call */
                    int newfd = accept(listenfd, (struct sockaddr *)&connaddr, &len);
                    if (newfd < 0) {
                        if (!listenfd_nonblock) {
                            printf("epoll_loop accept failed %d -> %s\n", errno, strerror(errno));
                        }
                        break;
                    }
                    epoll_add_client(epollfd, clients, newfd, client_events);
                } while (listenfd_nonblock);
                continue;
            }

            if (conn < 0) {
                continue;
            }

            if (revents & EPOLLIN) {
                /* Original author's note: this branch is probably unused. */
                if (revents & EPOLLRDHUP) {
                    epoll_drop_client(epollfd, clients, conn);
                    continue;
                }

                int recvFds[maxMemFds];
                memset(recvFds, -1, sizeof(recvFds));

                int fds = recvfd(conn, recvFds, maxMemFds);
                if (fds < 0) {
                    printf("epoll_loop recvfd failed\n");
                    continue;
                }
                if (fds == 0) {
                    epoll_drop_client(epollfd, clients, conn);
                    continue;
                }
                if (fds > maxMemFds) {
                    printf("epoll_loop recvfd too much! max recv fds %d\n", maxMemFds);
                }
                handleFds(args, fhandle, recvFds, fds);
            } else if (revents & EPOLLOUT) {
                if (revents & EPOLLRDHUP) {
                    epoll_drop_client(epollfd, clients, conn);
                    continue;
                }
                if (memFds_len > 0) {
                    /* One message carrying memFds_len fds. */
                    int sent = sendfd(conn, memFds, memFds_len);
                    if (sent == -1) {
                        printf("epoll_loop sendfd %d client in %d failed\n", i, nready);
                    } else if (sent == 0) {
                        epoll_drop_client(epollfd, clients, conn);
                        continue;
                    }
                }
            } else if (revents & (EPOLLRDHUP | EPOLLHUP | EPOLLERR)) {
                /*
                 * Hang-up/error with neither IN nor OUT: with level-triggered
                 * epoll this would be reported forever, so drop the client.
                 */
                epoll_drop_client(epollfd, clients, conn);
            }
        }

        for (int i = 0; i < memFds_len; i++) {
            close(memFds[i]);
        }
    }

    for (int i = 0; i < maxConns; i++) {
        if (clients[i] != -1) {
            close(clients[i]);
        }
    }
    close(epollfd);
    return 0;
}

/*
 * Make a freshly accepted connection non-blocking and store it in the
 * first free pollfd slot (slot 0 is reserved for the listener). The
 * connection is closed if no slot is free.
 */
static void poll_add_client(struct pollfd clientfds[], int conn, short events)
{
    setfdnonblock(conn, 1);

    for (int i = 1; i < maxConns; i++) {
        if (clientfds[i].fd < 0) {
            clientfds[i].fd = conn;
            clientfds[i].events = events;
            clientfds[i].revents = 0;
            return;
        }
    }

    printf("poll_loop too many clients, max %d\n", maxConns - 1);
    close(conn);
}

/*
 * poll-based server loop; same contract as epoll_loop with these
 * differences visible in the code:
 *   - a connection that is both writable and readable in one wake-up is
 *     sent to and then read from;
 *   - a failed accept() on a blocking listener ends the loop;
 *   - on exit every descriptor in the poll table is closed, and slot 0 of
 *     that table is `listenfd`, so the listener is closed too.
 *
 * Returns 0 when the loop ends, -1 if neither callback is given.
 */
int poll_loop(int listenfd, cbGetData fget, cbHandleData fhandle, cbQuit fquit, void *args)
{
    if (!fget && !fhandle) {
        printf("poll At Leaset MUST be a sender or receiver\n");
        return -1;
    }

    const int is_sender = fget ? 1 : 0;
    const int is_recver = fhandle ? 1 : 0;
    cbQuit fq = fquit ? fquit : no_quit;

    /* OR the directions together so a sender+receiver polls for both. */
    short client_events = 0;
    if (is_sender) {
        client_events |= POLLOUT;
    }
    if (is_recver) {
        client_events |= POLLIN;
    }

    struct pollfd clientfds[maxConns];
    for (int i = 0; i < maxConns; i++) {
        clientfds[i].fd = -1; /* negative fds are ignored by poll() */
        clientfds[i].events = 0;
        clientfds[i].revents = 0;
    }
    clientfds[0].fd = listenfd;
    clientfds[0].events = POLLIN;

    const int listenfd_nonblock = fdnonblock(listenfd);

    while (!fq(args)) {
        int nready = poll(clientfds, maxConns, 1000);
        if (nready == -1) {
            printf("poll_loop wait failed %d -> %s\n", errno, strerror(errno));
            break;
        }
        if (nready == 0) {
            /* timeout */
            continue;
        }

        if (clientfds[0].fd == listenfd && (clientfds[0].revents & POLLIN)) {
            int accept_failed = 0;
            /* Drain a non-blocking listener; accept once on a blocking one. */
            do {
                struct sockaddr_un connaddr;
                socklen_t len = sizeof(connaddr); /* value-result: reset each call */
                int conn = accept(listenfd, (struct sockaddr *)&connaddr, &len);
                if (conn < 0) {
                    if (!listenfd_nonblock) {
                        printf("poll_loop accept failed %d -> %s\n", errno, strerror(errno));
                        accept_failed = 1;
                    }
                    break;
                }
                poll_add_client(clientfds, conn, client_events);
            } while (listenfd_nonblock);

            if (accept_failed) {
                break;
            }
        }

        /* The same fds are sent to every currently writable connection. */
        int memFds[maxMemFds];
        memset(memFds, -1, sizeof(memFds));
        int memFds_len = is_sender ? copyFds(args, fget, memFds) : 0;

        for (int i = 1; i < maxConns; i++) {
            const int conn = clientfds[i].fd;
            const short revents = clientfds[i].revents;
            if (conn < 0) {
                continue;
            }

            if ((revents & POLLOUT) && memFds_len > 0) {
                /* One message carrying memFds_len fds. */
                int sent = sendfd(conn, memFds, memFds_len);
                if (sent == -1) {
                    printf("poll_loop sendfd %d client in %d failed\n", i, nready);
                } else if (sent == 0) {
                    close(conn);
                    clientfds[i].fd = -1;
                    continue;
                }
            }

            if (revents & POLLIN) {
                int recvFds[maxMemFds];
                memset(recvFds, -1, sizeof(recvFds));

                int fds = recvfd(conn, recvFds, maxMemFds);
                if (fds < 0) {
                    printf("poll_loop recvfd failed\n");
                    continue;
                }
                if (fds == 0) {
                    close(conn);
                    clientfds[i].fd = -1;
                    continue;
                }
                if (fds > maxMemFds) {
                    printf("poll_loop recvfd too much! max recv fds %d\n", maxMemFds);
                }
                handleFds(args, fhandle, recvFds, fds);
            } else if (!(revents & POLLOUT) && (revents & (POLLERR | POLLHUP | POLLNVAL))) {
                /*
                 * Hang-up/error with neither IN nor OUT would be reported on
                 * every poll(); release the slot. POLLNVAL means the fd is
                 * not open, so there is nothing to close in that case.
                 */
                if (!(revents & POLLNVAL)) {
                    close(conn);
                }
                clientfds[i].fd = -1;
            }
        }

        for (int i = 0; i < memFds_len; i++) {
            close(memFds[i]);
        }
    }

    /* NOTE: slot 0 holds listenfd, so the listener is closed here as well. */
    for (int i = 0; i < maxConns; i++) {
        if (clientfds[i].fd >= 0) {
            close(clientfds[i].fd);
        }
    }
    return 0;
}

/*
 * select-based server loop; same contract as epoll_loop with these
 * differences visible in the code:
 *   - accepted connections are left in blocking mode;
 *   - descriptors numbered FD_SETSIZE or above cannot be used with
 *     select() and are rejected;
 *   - a failed accept() or a failed select() (including EINTR) ends the
 *     loop.
 * On exit all tracked client connections are closed; `listenfd` is left
 * open.
 *
 * Returns 0 when the loop ends, -1 if neither callback is given or
 * `listenfd` is out of range for select().
 */
int select_loop(int listenfd, cbGetData fget, cbHandleData fhandle, cbQuit fquit, void *args)
{
    if (!fget && !fhandle) {
        printf("select At Leaset MUST be a sender or receiver\n");
        return -1;
    }
    if (listenfd < 0 || listenfd >= FD_SETSIZE) {
        printf("select_loop listenfd %d out of range, must less than %d\n",
               listenfd, (int)FD_SETSIZE);
        return -1;
    }

    const int is_sender = fget ? 1 : 0;
    const int is_recver = fhandle ? 1 : 0;
    cbQuit fq = fquit ? fquit : no_quit;

    int clients[maxConns];
    for (int i = 0; i < maxConns; i++) {
        clients[i] = -1;
    }

    while (!fq(args)) {
        fd_set rset, wset;
        FD_ZERO(&rset);
        FD_ZERO(&wset);
        FD_SET(listenfd, &rset);

        int maxfd = listenfd;
        for (int i = 0; i < maxConns; i++) {
            int conn = clients[i];
            if (conn == -1) {
                continue;
            }
            if (is_sender) {
                FD_SET(conn, &wset);
            }
            if (is_recver) {
                FD_SET(conn, &rset);
            }
            if (maxfd < conn) {
                maxfd = conn;
            }
        }

        struct timeval tm = {.tv_sec = 1, .tv_usec = 0};
        int ready = select(maxfd + 1, &rset, &wset, NULL, &tm);
        if (ready == -1) {
            /* The fd_sets are undefined after an error; do not inspect them. */
            printf("select_loop wait failed %d -> %s\n", errno, strerror(errno));
            break;
        }
        if (ready == 0) {
            /* timeout */
            continue;
        }

        if (FD_ISSET(listenfd, &rset)) {
            struct sockaddr_un connaddr;
            socklen_t len = sizeof(connaddr); /* value-result: reset each call */
            int conn = accept(listenfd, (struct sockaddr *)&connaddr, &len);
            if (conn < 0) {
                printf("select accept failed %d -> %s\n", errno, strerror(errno));
                break;
            }
            if (conn >= FD_SETSIZE) {
                printf("select_loop fd %d out of range, must less than %d\n",
                       conn, (int)FD_SETSIZE);
                close(conn);
            } else if (!update_client(clients, maxConns, conn, -1)) {
                printf("select_loop too many clients, max %d\n", maxConns);
                close(conn);
            }
        }

        /* The same fds are sent to every currently writable connection. */
        int memFds[maxMemFds];
        memset(memFds, -1, sizeof(memFds));
        int memFds_len = is_sender ? copyFds(args, fget, memFds) : 0;

        for (int i = 0; i < maxConns; i++) {
            const int conn = clients[i];
            if (conn < 0) {
                continue;
            }

            if (FD_ISSET(conn, &rset)) {
                int recvFds[maxMemFds];
                memset(recvFds, -1, sizeof(recvFds));

                int fds = recvfd(conn, recvFds, maxMemFds);
                if (fds < 0) {
                    printf("select_loop recvfd failed\n");
                    continue;
                }
                if (fds == 0) {
                    close(conn);
                    update_client(clients, maxConns, -1, conn);
                    continue;
                }
                if (fds > maxMemFds) {
                    printf("select_loop recvfd too much! max recv fds %d\n", maxMemFds);
                }
                handleFds(args, fhandle, recvFds, fds);
            }

            if (FD_ISSET(conn, &wset) && memFds_len > 0) {
                /* One message carrying memFds_len fds. */
                int sent = sendfd(conn, memFds, memFds_len);
                if (sent == -1) {
                    printf("select_loop sendfd %d client in %d failed\n", i, maxConns);
                } else if (sent == 0) {
                    close(conn);
                    update_client(clients, maxConns, -1, conn);
                    continue;
                }
            }
        }

        for (int i = 0; i < memFds_len; i++) {
            close(memFds[i]);
        }
    }

    for (int i = 0; i < maxConns; i++) {
        if (clients[i] != -1) {
            close(clients[i]);
        }
    }
    return 0;
}

/*
 * Sender-only loop without a readiness multiplexer.
 *
 * `listenfd` is switched to non-blocking mode so accept() never stalls the
 * loop. While at least one client is connected, each iteration obtains fds
 * from `fget` and sends them to every tracked client, then closes them.
 * With no clients the loop sleeps 10 ms per iteration.
 *
 * On exit all tracked client connections are closed; `listenfd` is left
 * open (and still non-blocking).
 *
 * Returns 0 when `fquit(args)` reports quit, -1 if `fget` is NULL.
 */
int simple_loop(int listenfd, cbGetData fget, cbQuit fquit, void *args)
{
    if (!fget) {
        printf("simple_loop MUST be a sender\n");
        return -1;
    }

    cbQuit fq = fquit ? fquit : no_quit;

    int clients[maxConns];
    for (int i = 0; i < maxConns; i++) {
        clients[i] = -1;
    }

    /* Make listenfd non-blocking so accept() does not block. */
    setfdnonblock(listenfd, 1);

    int cur_cli = 0;
    while (!fq(args)) {
        struct sockaddr_un connaddr;
        socklen_t len = sizeof(connaddr); /* value-result: reset each call */

        /* A failed accept() is expected here (no pending connection). */
        int conn = accept(listenfd, (struct sockaddr *)&connaddr, &len);

        /* If accept succeeded, add the connection. */
        if (conn >= 0) {
            /*
             * The connection is intentionally left in blocking mode so that
             * every client receives the same data. The risk is that one
             * slow receiver makes all later connections wait.
             */
            if (update_client(clients, maxConns, conn, -1)) {
                cur_cli++;
            } else {
                printf("simple_loop too many clients, max %d\n", maxConns);
                close(conn);
            }
        }

        if (cur_cli <= 0) {
            usleep(10000);
            continue;
        }

        int memFds[maxMemFds];
        memset(memFds, -1, sizeof(memFds));
        int memFds_len = copyFds(args, fget, memFds);

        for (int i = 0; i < maxConns; ++i) {
            const int client = clients[i];
            if (client < 0 || memFds_len <= 0) {
                continue;
            }

            /* One message carrying memFds_len fds. */
            int sent = sendfd(client, memFds, memFds_len);
            if (sent == -1) {
                printf("simple_loop sendfd %d client in %d failed\n", i, maxConns);
            } else if (sent == 0) {
                close(client);
                update_client(clients, maxConns, -1, client);
                cur_cli--;
            }
        }

        for (int i = 0; i < memFds_len; i++) {
            close(memFds[i]);
        }
    }

    for (int i = 0; i < maxConns; i++) {
        if (clients[i] != -1) {
            close(clients[i]);
        }
    }
    return 0;
}