#include "epollfd_shm.h"
|
|
#include <sys/types.h>
|
#include <sys/socket.h>
|
#include <sys/epoll.h>
|
#include <sys/un.h>
|
#include <poll.h>
|
#include <unistd.h>
|
#include <errno.h>
|
#include <stdio.h>
|
#include <stdlib.h>
|
#include <string.h>
|
#include <fcntl.h>
|
|
#include "transfd_shm.h"
|
|
// Turn the O_NONBLOCK status flag of `fd` on (nonblock != 0) or off (nonblock == 0).
// Nothing happens when the current flags cannot be read; a failure to store
// the new flags is reported on stdout.
static void setfdnonblock(int fd, int nonblock){
    const int current = fcntl(fd, F_GETFL);
    if (current < 0) {
        return;
    }

    const int wanted = nonblock ? (current | O_NONBLOCK) : (current & ~O_NONBLOCK);
    if (fcntl(fd, F_SETFL, wanted) >= 0) {
        return;
    }

    printf("setfdnonblock %s failed %d -> %s\n", nonblock ? "nonblock" : "block", errno, strerror(errno));
}
|
|
// Report whether `fd` currently has O_NONBLOCK set.
// Returns a non-zero value (the O_NONBLOCK bit) when it is set, 0 when it is
// not set or when the flags cannot be read.
static inline int fdnonblock(int fd){
    const int status = fcntl(fd, F_GETFL);
    return (status < 0) ? 0 : (status & O_NONBLOCK);
}
|
|
// Create a listening AF_UNIX / SOCK_SEQPACKET socket (close-on-exec) bound to `path`.
//
// path     - filesystem path of the socket; must fit in sockaddr_un.sun_path
//            including the terminating NUL. An existing file at that path is
//            unlinked before binding.
// nonblock - when non-zero the listening socket is switched to non-blocking mode.
//
// Returns the listening descriptor on success, or a negative value on failure
// (the reason is printed on stdout).
int unix_domain_server_fd(const char* path, int nonblock){
    struct sockaddr_un addr;

    if (path == NULL){
        printf("unix_domain_server_fd path is NULL\n");
        return -1;
    }

    const size_t path_len = strlen(path);
    if (path_len > sizeof(addr.sun_path) - 1){
        // %zu is the matching conversion for size_t
        printf("unix_domain_server_fd path too long, must less than %zu\n", sizeof(addr.sun_path));
        return -1;
    }

    // Zero the whole address so sun_path stays NUL-terminated after the copy
    memset(&addr, 0, sizeof(addr));
    addr.sun_family = AF_UNIX;
    memcpy(addr.sun_path, path, path_len);

    int fd = socket(AF_UNIX, SOCK_SEQPACKET|SOCK_CLOEXEC, 0);
    if (fd < 0) {
        printf("unix_domain_server_fd failed %d -> %s\n", errno, strerror(errno));
        return fd;
    }

    // Remove whatever is already at the path so bind() does not fail with
    // EADDRINUSE; the result is deliberately ignored (the path may not exist).
    unlink(path);

    int ret = bind(fd, (const struct sockaddr *)&addr, sizeof(addr));
    if (ret != 0){
        // Report before close(): close() may overwrite errno
        printf("unix_domain_server_fd bind failed %d -> %s\n", errno, strerror(errno));
        close(fd);
        return ret;
    }

    ret = listen(fd, 128);
    if (ret != 0){
        printf("unix_domain_server_fd listen failed %d -> %s\n", errno, strerror(errno));
        close(fd);
        return ret;
    }

    if (nonblock) {
        setfdnonblock(fd, 1);
    }

    return fd;
}
|
|
// Create an AF_UNIX / SOCK_SEQPACKET socket (close-on-exec) and connect it to
// the server socket at `path`.
//
// path     - filesystem path of the server socket; must fit in
//            sockaddr_un.sun_path including the terminating NUL.
// nonblock - when non-zero the socket is switched to non-blocking mode after
//            the connection has been established (the connect itself blocks).
//
// Returns the connected descriptor on success, or a negative value on failure
// (the reason is printed on stdout).
int unix_domain_client_fd(const char *path, int nonblock){
    struct sockaddr_un addr;

    if (path == NULL){
        printf("unix_domain_client_fd path is NULL\n");
        return -1;
    }

    // Reject over-long paths: copying them would overflow addr.sun_path
    const size_t path_len = strlen(path);
    if (path_len > sizeof(addr.sun_path) - 1){
        printf("unix_domain_client_fd path too long, must less than %zu\n", sizeof(addr.sun_path));
        return -1;
    }

    // Zero the whole address so sun_path stays NUL-terminated after the copy
    memset(&addr, 0, sizeof(addr));
    addr.sun_family = AF_UNIX;
    memcpy(addr.sun_path, path, path_len);

    int fd = socket(PF_UNIX, SOCK_SEQPACKET|SOCK_CLOEXEC, 0);
    if (fd < 0){
        printf("unix_domain_client_fd socket failed %d -> %s\n", errno, strerror(errno));
        return fd;
    }

    int ret = connect(fd, (const struct sockaddr *)&addr, sizeof(addr));
    if (ret != 0){
        printf("unix_domain_client_fd connect failed %d -> %s\n", errno, strerror(errno));
        close(fd);
        return ret;
    }

    if (nonblock) {
        setfdnonblock(fd, 1);
    }

    return fd;
}
|
|
// Create a close-on-exec epoll instance and register `fd` on it.
//
// fd    - descriptor to watch; it is also stored in the event's data.fd.
// flags - epoll event mask for `fd` (e.g. EPOLLIN).
//
// Returns the epoll descriptor, or -1 on failure. On failure no descriptor is
// left open.
static int epoll_init(int fd, int flags){
    int epollfd = epoll_create1(EPOLL_CLOEXEC);
    if (epollfd < 0){
        printf("epoll_loop_server create epollfd failed %d -> %s\n", errno, strerror(errno));
        return -1;
    }

    struct epoll_event event;
    memset(&event, 0, sizeof(event));
    event.events = flags;
    event.data.fd = fd;

    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event) < 0){
        printf("epoll_loop_server add failed %d -> %s\n", errno, strerror(errno));
        // Do not leak the freshly created epoll instance
        close(epollfd);
        return -1;
    }

    return epollfd;
}
|
|
// Replace the first entry of clients[0..max) that equals `oldFd` with `newFd`.
// Passing oldFd == -1 stores a new client in the first free slot; passing
// newFd == -1 frees the slot holding `oldFd`. If no entry matches, the table
// is left untouched.
static void update_client(int clients[], int max, int newFd, int oldFd){
    int idx = 0;
    while (idx < max && clients[idx] != oldFd) {
        ++idx;
    }
    if (idx < max) {
        clients[idx] = newFd;
    }
}
|
|
// Default quit callback used when the caller supplies none: never asks the
// loop to stop. `args` is ignored.
static int no_quit(void* args){
    (void)args;
    return 0;
}
|
|
// Capacity limits used by the loops in this file. They are enum constants
// (true integer constant expressions) so arrays sized by them are ordinary
// fixed-size arrays rather than variable-length arrays.
enum {
    // Assume there are never more than 256 client connections
    maxConns = 256,
    // Assume a single transfer never carries more than 16 fds
    maxMemFds = 16
};
|
|
// Ask the sender callback for the next batch of descriptors and copy them
// into `memFds`.
//
// args   - opaque pointer handed through to `fget`.
// fget   - callback that fills a struct fd_msg; a non-zero return is treated
//          as "a message is available".
// memFds - destination array; it must have room for maxMemFds entries.
//
// When the message carries an array (msg.data != NULL) its msg.len entries are
// copied; otherwise the single msg.fd is used. Returns the number of
// descriptors stored in `memFds` (0 when the callback produced nothing).
static int copyFds(void* args, cbGetData fget, int memFds[]) {
    struct fd_msg msg;
    memset(&msg, 0, sizeof(msg));

    if (!fget(args, &msg)){
        return 0;
    }

    if (!msg.data) {
        memFds[0] = msg.fd;
        return 1;
    }

    long long count = (long long)msg.len;
    if (count > maxMemFds) {
        // Never write past the end of memFds, whatever the callback reports.
        // NOTE(review): descriptors beyond the limit are neither sent nor
        // closed here - confirm who owns them.
        printf("copyFds too many fds %lld, only first %d are used\n", count, maxMemFds);
        count = maxMemFds;
    }
    if (count <= 0) {
        return 0;
    }

    memcpy(memFds, msg.data, sizeof(int) * (size_t)count);
    return (int)count;
}
|
|
// Wrap received descriptors in a struct fd_msg and pass it to the receiver
// callback.
//
// args    - opaque pointer handed through to `fhandle`.
// fhandle - callback invoked with the message.
// memFds  - received descriptors.
// fds     - how many descriptors were received; values above maxMemFds are
//           capped to maxMemFds.
//
// More than one descriptor is delivered through msg.data / msg.len; otherwise
// only msg.fd is set (to memFds[0]) and the remaining fields stay zero.
static void handleFds(void* args, cbHandleData fhandle, int memFds[], int fds) {
    struct fd_msg msg;
    memset(&msg, 0, sizeof(msg));

    const int count = (fds < maxMemFds) ? fds : maxMemFds;
    if (count <= 1) {
        msg.fd = memFds[0];
    } else {
        msg.data = memFds;
        msg.len = count;
    }

    fhandle(args, &msg);
}
|
|
int epoll_loop(int listenfd, cbGetData fget, cbHandleData fhandle, cbQuit fquit, void* args){
|
if (!fget && !fhandle) {
|
printf("epoll At Leaset MUST be a sender or receiver\n");
|
return -1;
|
}
|
int is_sender = fget ? 1 : 0;
|
int is_recver = fhandle ? 1 : 0;
|
|
struct sockaddr_un connaddr;
|
socklen_t len = sizeof(connaddr);
|
|
cbQuit fq = no_quit;
|
if (fquit) fq = fquit;
|
|
int clients[maxConns];
|
memset(clients, -1, maxConns * sizeof(int));
|
|
int ET = 0;
|
|
int epollfd = epoll_init(listenfd, EPOLLIN|ET);
|
if (epollfd == -1){
|
return epollfd;
|
}
|
|
int listenfd_nonblock = fdnonblock(listenfd);
|
|
while (!fq(args)) {
|
struct epoll_event events[maxConns];
|
memset(events, 0, maxConns * sizeof(struct epoll_event));
|
int nready = epoll_wait(epollfd, events, maxConns, 3000);
|
if (nready == -1){
|
printf("epoll_loop wait failed %d -> %s\n", errno, strerror(errno));
|
break;
|
}
|
if (nready == 0){
|
// timeout
|
continue;
|
}
|
|
// 发送给所有当前已连接相同的fd
|
int memFds[maxMemFds];
|
memset(memFds, -1, sizeof(int) * maxMemFds);
|
int memFds_len = 0;
|
if (is_sender){
|
memFds_len = copyFds(args, fget, memFds);
|
}
|
|
for(int i = 0; i < nready; i++){
|
if (events[i].data.fd == listenfd){
|
int conn;
|
if (listenfd_nonblock){
|
while((conn = accept(listenfd, (struct sockaddr *)&connaddr, &len)) > 0){
|
setfdnonblock(conn, 1);
|
struct epoll_event ev;
|
ev.data.fd = conn;
|
ev.events = EPOLLRDHUP|ET;
|
if (is_sender) ev.events |= EPOLLOUT;
|
if (is_recver) ev.events |= EPOLLIN;
|
epoll_ctl(epollfd, EPOLL_CTL_ADD, conn, &ev);
|
update_client(clients, maxConns, conn, -1);
|
}
|
} else{
|
conn = accept(listenfd, (struct sockaddr *)&connaddr, &len);
|
if (conn < 0){
|
printf("epoll_loop accept failed %d -> %s\n", errno, strerror(errno));
|
continue;
|
}
|
setfdnonblock(conn, 1);
|
struct epoll_event ev;
|
ev.data.fd = conn;
|
ev.events = EPOLLRDHUP|ET;
|
if (is_sender) ev.events |= EPOLLOUT;
|
if (is_recver) ev.events |= EPOLLIN;
|
epoll_ctl(epollfd, EPOLL_CTL_ADD, conn, &ev);
|
update_client(clients, maxConns, conn, -1);
|
}
|
} else if (events[i].events & EPOLLIN){
|
// 此处应该没有用
|
int conn = events[i].data.fd;
|
if (events[i].events & EPOLLRDHUP){
|
close(conn);
|
struct epoll_event ev = events[i];
|
epoll_ctl(epollfd, EPOLL_CTL_DEL, conn, &ev);
|
update_client(clients, maxConns, -1, conn);
|
continue;
|
}
|
|
if (conn < 0) continue;
|
int recvFds[maxMemFds];
|
memset(recvFds, -1, sizeof(int) * maxMemFds);
|
int fds = recvfd(conn, recvFds, maxMemFds);
|
if (fds < 0){
|
printf("epoll_loop recvfd failed\n");
|
continue;
|
}else if (fds == 0){
|
close(conn);
|
struct epoll_event ev = events[i];
|
epoll_ctl(epollfd, EPOLL_CTL_DEL, conn, &ev);
|
update_client(clients, maxConns, -1, conn);
|
continue;
|
} else if (fds > maxMemFds){
|
printf("epoll_loop recvfd too much! max recv fds %d\n", maxMemFds);
|
}
|
|
handleFds(args, fhandle, recvFds, fds);
|
|
} else if (events[i].events & EPOLLOUT){
|
int conn = events[i].data.fd;
|
if (events[i].events & EPOLLRDHUP){
|
close(conn);
|
struct epoll_event ev = events[i];
|
epoll_ctl(epollfd, EPOLL_CTL_DEL, conn, &ev);
|
update_client(clients, maxConns, -1, conn);
|
continue;
|
}
|
|
if (conn < 0) continue;
|
if (memFds_len > 0){
|
// 仅发送一个 sizeof(memFd)
|
int ret = sendfd(conn, memFds, memFds_len);
|
if (ret == -1){
|
printf("epoll_loop sendfd %d client in %d failed\n", i, nready);
|
} else if (ret == 0){
|
close(conn);
|
struct epoll_event ev = events[i];
|
epoll_ctl(epollfd, EPOLL_CTL_DEL, conn, &ev);
|
update_client(clients, maxConns, -1, conn);
|
continue;
|
}
|
}
|
}
|
}
|
for (int i = 0; i < memFds_len; i++) {
|
close(memFds[i]);
|
}
|
}
|
for(int i = 0; i < maxConns; i++){
|
if (clients[i] != -1)
|
close(clients[i]);
|
}
|
close(epollfd);
|
return 0;
|
}
|
|
int poll_loop(int listenfd, cbGetData fget, cbHandleData fhandle, cbQuit fquit, void* args){
|
if (!fget && !fhandle) {
|
printf("poll At Leaset MUST be a sender or receiver\n");
|
return -1;
|
}
|
int is_sender = fget ? 1 : 0;
|
int is_recver = fhandle ? 1 : 0;
|
|
struct sockaddr_un connaddr;
|
socklen_t len = sizeof(connaddr);
|
|
cbQuit fq = no_quit;
|
if (fquit) fq = fquit;
|
|
struct pollfd clientfds[maxConns];
|
memset(clientfds, -1, maxConns * sizeof(struct pollfd));
|
clientfds[0].fd = listenfd;
|
clientfds[0].events = POLLIN;
|
|
int maxi = 1;
|
|
int listenfd_nonblock = fdnonblock(listenfd);
|
|
while (!fq(args)) {
|
int nready = poll(clientfds, maxConns, 1000);
|
if (nready == -1){
|
printf("poll_loop wait failed %d -> %s\n", errno, strerror(errno));
|
break;
|
}
|
if (nready == 0){
|
// timeout
|
continue;
|
}
|
|
if (clientfds[0].fd == listenfd && (clientfds[0].revents & POLLIN)) {
|
int conn;
|
if (listenfd_nonblock){
|
while((conn = accept(listenfd, (struct sockaddr *)&connaddr, &len)) > 0){
|
setfdnonblock(conn, 1);
|
clientfds[maxi].fd = conn;
|
if (is_sender) clientfds[maxi].events = POLLOUT;
|
if (is_recver) clientfds[maxi].events = POLLIN;
|
maxi++;
|
}
|
} else{
|
conn = accept(listenfd, (struct sockaddr *)&connaddr, &len);
|
if (conn < 0){
|
printf("poll_loop accept failed %d -> %s\n", errno, strerror(errno));
|
break;
|
}
|
setfdnonblock(conn, 1);
|
clientfds[maxi].fd = conn;
|
clientfds[maxi].events = POLLOUT;
|
maxi++;
|
}
|
}
|
|
// 发送给所有当前已连接相同的fd
|
int memFds[maxMemFds];
|
memset(memFds, -1, sizeof(int) * maxMemFds);
|
int memFds_len = 0;
|
if (is_sender){
|
memFds_len = copyFds(args, fget, memFds);
|
}
|
|
for(int i = 1; i < maxConns; i++){
|
if (clientfds[i].revents & POLLOUT){
|
int conn = clientfds[i].fd;
|
if (conn < 0) continue;
|
if (memFds_len > 0){
|
// 仅发送一个 sizeof(memFd)
|
int ret = sendfd(conn, memFds, memFds_len);
|
if (ret == -1){
|
printf("poll_loop sendfd %d client in %d failed\n", i, nready);
|
} else if (ret == 0){
|
close(conn);
|
clientfds[i].fd = -1;
|
continue;
|
}
|
}
|
}
|
if (clientfds[i].revents & POLLIN){
|
int conn = clientfds[i].fd;
|
|
int recvFds[maxMemFds];
|
memset(recvFds, -1, sizeof(int) * maxMemFds);
|
int fds = recvfd(conn, recvFds, maxMemFds);
|
if (fds < 0){
|
printf("poll_loop recvfd failed\n");
|
continue;
|
}else if (fds == 0){
|
close(conn);
|
clientfds[i].fd = -1;
|
continue;
|
} else if (fds > maxMemFds){
|
printf("poll_loop recvfd too much! max recv fds %d\n", maxMemFds);
|
}
|
|
handleFds(args, fhandle, recvFds, fds);
|
}
|
}
|
for (int i = 0; i < memFds_len; i++) {
|
close(memFds[i]);
|
}
|
}
|
for(int i = 0; i < maxConns; i++){
|
if (clientfds[i].fd != -1)
|
close(clientfds[i].fd);
|
}
|
return 0;
|
}
|
|
int select_loop(int listenfd, cbGetData fget, cbHandleData fhandle, cbQuit fquit, void* args){
|
if (!fget && !fhandle) {
|
printf("select At Leaset MUST be a sender or receiver\n");
|
return -1;
|
}
|
int is_sender = fget ? 1 : 0;
|
int is_recver = fhandle ? 1 : 0;
|
|
struct sockaddr_un connaddr;
|
socklen_t len = sizeof(connaddr);
|
|
cbQuit fq = no_quit;
|
if (fquit) fq = fquit;
|
|
int clients[maxConns];
|
memset(clients, -1, maxConns * sizeof(int));
|
|
while (!fq(args)) {
|
fd_set rset, wset;
|
FD_ZERO(&rset);
|
FD_ZERO(&wset);
|
FD_SET(listenfd, &rset);
|
|
int maxfd = listenfd;
|
for (int i = 0; i < maxConns; i++){
|
int conn = clients[i];
|
if (conn != -1){
|
if (is_sender)
|
FD_SET(conn, &wset);
|
if (is_recver)
|
FD_SET(conn, &rset);
|
}
|
if (maxfd < conn) {
|
maxfd = conn;
|
}
|
}
|
|
struct timeval tm = {.tv_sec = 1, .tv_usec = 0};
|
int ret = select(maxfd + 1, &rset, &wset, NULL, &tm);
|
if (ret == -1){
|
if (errno == EINTR)
|
break;
|
} else if (ret == 0) {
|
// timeout
|
continue;
|
}
|
|
if (FD_ISSET(listenfd, &rset)){
|
int conn = accept(listenfd, (struct sockaddr *)&connaddr, &len);
|
if (conn < 0){
|
printf("select accept failed %d -> %s\n", errno, strerror(errno));
|
break;
|
}
|
update_client(clients, maxConns, conn, -1);
|
}
|
|
int memFds[maxMemFds];
|
memset(memFds, -1, sizeof(int) * maxMemFds);
|
int memFds_len = 0;
|
if (is_sender){
|
memFds_len = copyFds(args, fget, memFds);
|
}
|
for (int i = 0; i < maxConns; i++){
|
int conn = clients[i];
|
if (conn > 0 && FD_ISSET(conn, &rset)){
|
|
int recvFds[maxMemFds];
|
memset(recvFds, -1, sizeof(int) * maxMemFds);
|
int fds = recvfd(conn, recvFds, maxMemFds);
|
if (fds < 0){
|
printf("select_loop recvfd failed\n");
|
continue;
|
}else if (fds == 0){
|
close(conn);
|
update_client(clients, maxConns, -1, conn);
|
FD_CLR(conn, &wset);
|
continue;
|
} else if (fds > maxMemFds){
|
printf("select_loop recvfd too much! max recv fds %d\n", maxMemFds);
|
}
|
|
handleFds(args, fhandle, recvFds, fds);
|
}
|
if (conn > 0 && FD_ISSET(conn, &wset)){
|
if (memFds_len > 0){
|
// 仅发送一个 sizeof(memFd)
|
int ret = sendfd(conn, memFds, memFds_len);
|
if (ret == -1){
|
printf("select_loop sendfd %d client in %d failed\n", i, maxConns);
|
} else if (ret == 0){
|
close(conn);
|
update_client(clients, maxConns, -1, conn);
|
FD_CLR(conn, &wset);
|
continue;
|
}
|
}
|
}
|
}
|
for (int i = 0; i < memFds_len; i++) {
|
close(memFds[i]);
|
}
|
}
|
|
for(int i = 0; i < maxConns; i++){
|
if (clients[i] != -1)
|
close(clients[i]);
|
}
|
return 0;
|
}
|
|
// Sender-only loop without any readiness notification: on every iteration it
// tries a non-blocking accept(), fetches one batch of descriptors from `fget`
// and sends it to every connected client in turn.
//
// listenfd - listening socket; it is switched to non-blocking mode and is not
//            closed by this function.
// fget     - sender callback, required.
// fquit    - quit predicate polled once per iteration (may be NULL = never quit).
// args     - opaque pointer handed to the callbacks.
//
// Returns -1 when `fget` is NULL, 0 otherwise. All client connections are
// closed before returning.
int simple_loop(int listenfd, cbGetData fget, cbQuit fquit, void* args){
    if (!fget) {
        printf("simple_loop MUST be a sender\n");
        return -1;
    }

    cbQuit fq = fquit ? fquit : no_quit;

    int clients[maxConns];
    memset(clients, -1, maxConns * sizeof(int));

    // Make listenfd non-blocking so accept() never stalls the loop
    setfdnonblock(listenfd, 1);

    int cur_cli = 0;

    while (!fq(args)) {
        // A failure here normally just means "no pending connection"
        int conn = accept(listenfd, NULL, NULL);
        if (conn >= 0){
            // The connection is deliberately left in blocking mode so that
            // every client receives the same data; the risk is that one slow
            // receiver delays all connections that come after it.
            int slot = -1;
            for (int i = 0; i < maxConns; i++){
                if (clients[i] == -1){
                    slot = i;
                    break;
                }
            }
            if (slot >= 0){
                clients[slot] = conn;
                cur_cli++;
            } else {
                // Count only connections that are actually tracked; an
                // untracked descriptor would never be served or closed.
                printf("simple_loop too many clients, max %d\n", maxConns);
                close(conn);
            }
        }

        if (cur_cli <= 0) {
            // Nobody to send to: back off instead of spinning
            usleep(10000);
            continue;
        }

        int memFds[maxMemFds];
        memset(memFds, -1, sizeof(int) * maxMemFds);
        int memFds_len = copyFds(args, fget, memFds);

        if (memFds_len > 0){
            for (int i = 0; i < maxConns; ++i) {
                int cli = clients[i];
                if (cli < 0) continue;

                int sent = sendfd(cli, memFds, memFds_len);
                if (sent == -1){
                    printf("simple_loop sendfd %d client in %d failed\n", i, maxConns);
                } else if (sent == 0){
                    // A zero result is treated as "connection closed"
                    close(cli);
                    clients[i] = -1;
                    cur_cli--;
                }
            }
        }

        // The batch has been offered to every client; release our copies
        for (int i = 0; i < memFds_len; i++) {
            close(memFds[i]);
        }
    }

    for (int i = 0; i < maxConns; i++){
        if (clients[i] != -1) {
            close(clients[i]);
        }
    }
    return 0;
}
|