src/Makefile | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/ipc_msg.c | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/ipc_msg.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/ipc_server.c | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/ipc_server_lib.c | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/memfd.c | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
src/Makefile
@@ -17,8 +17,10 @@ $(CC) $(CFLAGS) $(INCLUDE) -fPIC -c shmht_mytests.c memfd.o: $(CC) $(CFLAGS) $(INCLUDE) -fPIC -c memfd.c ipc_server: $(CC) $(CFLAGS) $(INCLUDE) -fPIC -lpthread -o $@ ipc_server.c ipc_msg.c memfd.c ipc_server:libipc_server.so $(CC) $(CFLAGS) $(INCLUDE) -fPIC -pthread -o $@ ipc_server.c -lipc_server -L . libipc_server.so: $(CC) -shared -fPIC -o $@ ipc_server_lib.c ipc_msg.c memfd.c ipc_client: $(CC) $(CFLAGS) $(INCLUDE) -fPIC -o $@ ipc_client.c ipc_msg.c memfd.c clean: src/ipc_msg.c
@@ -57,7 +57,7 @@ int send_fd_sendmsg(int fd, memfd_data_st** ppmemfd_data) { memfd_data_st *ptr_memfd_data = *ppmemfd_data; return send_fd_args(fd, ptr_memfd_data->memfd, ptr_memfd_data->pid, &ptr_memfd_data->data, ptr_memfd_data->len); return send_fd_args_sendmsg(fd, ptr_memfd_data->memfd, ptr_memfd_data->pid, &ptr_memfd_data->data, ptr_memfd_data->len); } /** @@ -102,4 +102,4 @@ memset(*ppmemfd_data, 0, control_len); memcpy(*ppmemfd_data, CMSG_DATA(&cm), control_len); return ret; } } src/ipc_msg.h
@@ -11,6 +11,8 @@ #define UNIX_DOMAIN "/tmp/UNIX.domain" #define errExit(msg) do { perror(msg); exit(EXIT_FAILURE); \ } while (0) #define MAX_LEN 4096 #define HELLO_MSG "hello_basic" @@ -26,20 +28,33 @@ int len; } memfd_data_st; int send_fd(int fd, memfd_data_st** ppmemfd_data); struct user_data { int fd; unsigned int n_size; char line[MAX_LEN]; }; #define READ_THREAD_NUM 2 #define WRITE_THREAD_NUM 2 int proc_memfd(struct user_data* rdata); int basic_create_ipc_server(char * unix_domain_path); int send_fd_sendmsg(int fd, memfd_data_st** ppmemfd_data); /** * @brief 发送目标文件描述符 * @param fd 传递信息的 UNIX 域 文件描述符 * @param fd_to_send 待发送的文件描述符 */ int send_fd_args(int fd, int fd_to_send, pid_t pid, void **ppdata, int len); int send_fd_args_sendmsg(int fd, int fd_to_send, pid_t pid, void **ppdata, int len); /** * @brief 接受文件描述符 * @param fd 传递信息的 UNIX 域 文件描述符 */ int recv_fd(int fd, memfd_data_st** ppmemfd_data); int recv_fd_recvmsg(int fd, memfd_data_st** ppmemfd_data); #ifdef __cplusplus } src/ipc_server.c
@@ -1,61 +1,14 @@ #include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/socket.h> #include <sys/un.h> #include <unistd.h> #include <signal.h> #include <sys/epoll.h> #include <errno.h> #include <sys/syscall.h> #include <sys/mman.h> #include <sys/stat.h> #include <fcntl.h> #include <signal.h> #include <sys/types.h> #include <pthread.h> #include <sys/stat.h> #include "memfd.h" #include "ipc_msg.h" #define MAX_EPOLL_EVENT_COUNT 1024 #define errExit(msg) do { perror(msg); exit(EXIT_FAILURE); \ } while (0) #define LISTENQ 1024 // task item in thread pool struct task { // file descriptor or user_data epoll_data_t data; // next task struct task* next; }; struct user_data { int fd; unsigned int n_size; char line[MAX_LEN]; }; //typedef int (*readfunc)(struct user_data*); typedef int (*writefunc)(struct user_data*); /* thread exec function */ void *readtask(void *args); void *writetask(void *args); // mutex lock to protect readhead/readtail pthread_mutex_t r_mutex; pthread_cond_t r_condl; // mutex lock to protect writehead/writetail pthread_mutex_t w_mutex; pthread_cond_t w_condl; struct task *readhead = NULL, *readtail = NULL; struct task *writehead = NULL, *writetail = NULL; //create epoll int epfd,eventfd; struct epoll_event ev,events[LISTENQ]; int g_memfd = 0; @@ -89,23 +42,6 @@ return len; } void setnonblocking(int sock) { int opts; opts = fcntl(sock, F_GETFL); if(opts<0) { perror("fcntl(sock,GETFL)"); exit(1); } if(fcntl(sock, F_SETFL, opts | O_NONBLOCK)<0) { perror("fcntl(sock,SETFL,opts)"); exit(1); } } int proc_memfd(struct user_data* rdata) @@ -166,423 +102,38 @@ } /** * thread exec function */ void *readtask(void *args) { int i = 0; int fd = -1; unsigned int n = 0; struct user_data *data = NULL; while (1) { // protect task queue (readhead/readtail) pthread_mutex_lock(&r_mutex); while(readhead == NULL) { // if condl false, will unlock mutex mydebug("thread:%u waiting, task is NULL ... \n", (uint32_t)pthread_self()); pthread_cond_wait(&r_condl, &r_mutex); } fd = readhead->data.fd; struct task *tmp = readhead; readhead = readhead->next; free(tmp); pthread_mutex_unlock(&r_mutex); mydebug("[SERVER] readtask %u handling %d\n", (uint32_t)pthread_self(), fd); data = (struct user_data *)malloc(sizeof(struct user_data)); if(NULL == data) { perror("readtask malloc"); return; } memset(data, 0 , sizeof(struct user_data)); data->fd = fd; n=0; int nread = 0; while((nread = read(fd,data->line+n,MAX_LEN))>0) { n+=nread; } if((nread==-1) && (errno != EAGAIN)) { perror("read error"); } if(n < 0) { if (errno == ECONNRESET) { close(fd); mydebug("[SERVER] Error: readline failed: %s\n", strerror(errno)); } else { mydebug("readline error \n"); } if(data != NULL) { free(data); data = NULL; } } else if (n == 0) { close(fd); mydebug("[SERVER] Info: client closed connection.\n"); if(data != NULL) { free(data); data = NULL; } } else { data->n_size = n; mydebug("[SERVER] readtask %u received %d : [%d] %s\n", (uint32_t)pthread_self(), fd, data->n_size, data->line); if (data->line[0] != '\0') { // modify monitored event to EPOLLOUT, wait next loop to send respond ev.data.ptr = data; // Modify event to EPOLLOUT ev.events = EPOLLOUT | EPOLLET; // modify moditored fd event if(epoll_ctl(epfd,EPOLL_CTL_MOD,fd,&ev)==-1) { perror("epoll_ctl:mod"); if(data != NULL) { free(data); data = NULL; } } } } } } void *writetask(void *args) { int n = 0; int nwrite = 0; int ret = 0; // data to wirte back to client struct user_data *rdata = NULL; while(1) { pthread_mutex_lock(&w_mutex); while(writehead == NULL) { // if condl false, will unlock mutex pthread_cond_wait(&w_condl, &w_mutex); } rdata = (struct user_data*)writehead->data.ptr; struct task* tmp = writehead; writehead = writehead->next; free(tmp); //echo("[SERVER] thread %d writetask before unlock\n", pthread_self()); pthread_mutex_unlock(&w_mutex); //echo("[SERVER] thread %d writetask after unlock\n", pthread_self()); if(rdata->fd < 0) { goto error; } mydebug("[SERVER] writetask %u sending %d\n", (uint32_t)pthread_self(), rdata->fd); writefunc wfunc = (writefunc)args; ret = wfunc(rdata); if(ret < 0) { goto error; } // delete data if (rdata->fd > 0) { // modify monitored event to EPOLLIN, wait next loop to receive data ev.data.fd = rdata->fd; // monitor in message, edge trigger //ev.events = EPOLLIN | EPOLLET; // modify moditored fd event if(epoll_ctl(epfd, EPOLL_CTL_DEL, rdata->fd, &ev)==-1) { perror("epoll_ctl:del"); } close(rdata->fd); } error: free(rdata); } } /*define you our proc_memfd funtion to send your message to other processes,or just put it in shm, notice the file description fd and the pid of creator to other*/ int main(int argc, char **argv) { char *name; ssize_t len; int lsn_fd, apt_fd; struct sockaddr_un srv_addr; struct sockaddr_un clt_addr; socklen_t clt_len; int ret; int i; //char recv_buf[MAX_LEN] = {0}; //char send_buf[MAX_LEN] = {0}; char line[MAX_LEN] = {0}; pthread_t *read_tids = NULL; int read_thread_num = 2; pthread_t *write_tids = NULL; int write_thread_num = 2; struct task *new_task=NULL; struct user_data *rdata=NULL; char * unixdomain = NULL; int ret = 0; if (argc < 3) { fprintf(stderr, "%s name size\n", argv[0]); fprintf(stderr, "%s name size [unix domain path]\n", argv[0]); exit(EXIT_FAILURE); } name = argv[1]; len = atoi(argv[2]) * 1024LU * 1024LU * 1024LU; signal(SIGTERM,handler); unixdomain = argv[3]; signal(SIGTERM,handler); signal(SIGABRT,handler); signal(SIGSEGV,handler); g_memfd = basic_shm_create(name, len); ret = shm_write(g_memfd); if((read_tids = (pthread_t *)malloc(read_thread_num * sizeof(pthread_t))) == NULL) { perror("read_tids malloc fail\n"); return -1; } // threads for reading thread pool for(i = 0; i < read_thread_num; i++) ret = basic_create_ipc_server(unixdomain); if(ret < 0) { pthread_create(&read_tids[i], NULL, readtask, NULL); printf("failed to create ipc_server\n"); } if((write_tids = (pthread_t *)malloc(write_thread_num * sizeof(pthread_t))) == NULL) { perror("write_tids malloc fail\n"); return -1; } // threads for writing thread pool for(i = 0; i < write_thread_num; i++) { pthread_create(&write_tids[i], NULL, writetask, proc_memfd); } epfd = epoll_create(MAX_EPOLL_EVENT_COUNT); //create socket to bind local IP and PORT lsn_fd = socket(AF_UNIX, SOCK_STREAM, 0); int opt = 0x1; setsockopt(lsn_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); // set the descriptor as non-blocking setnonblocking(lsn_fd); ev.data.fd = lsn_fd; ev.events = EPOLLIN|EPOLLET; /*Control an epoll descriptor, $epfd, by requesting the operation op be performed on the target file descriptor, fd. $epfd is an epoll descriptor returned from epoll_create. $op is one of EPOLL_CTL_ADD, EPOLL_CTL_MOD or EPOLL_CTL_DEL. $fd is the file desciptor to be watched. $eventmask is a bitmask of events defined by EPOLLIN, EPOLLOUT, etc. When successful, epoll_ctl returns 0. When an error occurs, epoll_ctl returns -1 and errno is set appropriately. */ if(epoll_ctl(epfd,EPOLL_CTL_ADD,lsn_fd,&ev)==-1) { perror("epoll_ctl:add"); } if(lsn_fd < 0) { perror("can't create communication socket!"); unlink(UNIX_DOMAIN); basic_shm_close(g_memfd); return 1; } //create local IP and PORT srv_addr.sun_family = AF_UNIX; strncpy(srv_addr.sun_path, UNIX_DOMAIN, sizeof(srv_addr.sun_path) - 1); //unlink(UNIX_DOMAIN); //bind sockfd and sockaddr ret = bind(lsn_fd, (struct sockaddr*)&srv_addr, sizeof(srv_addr)); if(ret == -1) { perror("can't bind local sockaddr!"); close(lsn_fd); unlink(UNIX_DOMAIN); basic_shm_close(g_memfd); return 1; } //listen lsn_fd, try listen 5 ret = listen(lsn_fd, LISTENQ); if(ret == -1) { perror("can't listen client connect request"); close(lsn_fd); unlink(UNIX_DOMAIN); basic_shm_close(g_memfd); return 1; } clt_len = sizeof(clt_addr); while(1) { int nfds = epoll_wait(epfd,events,1024,100); int i=0; for(i=0;i<nfds;++i) { if(events[i].data.fd == lsn_fd) { // accept the client connection while((apt_fd=accept(lsn_fd, (struct sockaddr *)&clt_addr, &clt_len))>0) { setnonblocking(apt_fd); //char *str = inet_ntoa(clt_addr.sun_path); //printf("[SERVER] connect from %s \n", str); ev.data.fd = apt_fd; // monitor in message, edge trigger ev.events = EPOLLIN | EPOLLET; // add fd to epoll queue if(epoll_ctl(epfd,EPOLL_CTL_ADD,apt_fd,&ev)==-1) { perror("epoll_ctl:add"); continue; //exit(EXIT_FAILURE); } } if( apt_fd == -1) { if((errno != EAGAIN ) && (errno!= ECONNABORTED) && (errno != EPROTO) && (errno != EINTR)) { perror("accept"); } } continue; } else if (events[i].events & EPOLLIN) //write数据 { printf("EPOLLIN\n"); if( (eventfd = events[i].data.fd) < 0 ) { continue; } printf("[SERVER] put task %d to read queue\n", events[i].data.fd); new_task = (struct task *)malloc(sizeof(struct task)); if(NULL == new_task) { goto error; } new_task->data.fd= eventfd; new_task->next = NULL; // protect task queue (readhead/readtail) pthread_mutex_lock(&r_mutex); // the queue is empty if(readhead == NULL) { readhead = new_task; readtail = new_task; } // queue is not empty else { readtail->next = new_task; readtail = new_task; } // trigger readtask thread pthread_cond_broadcast(&r_condl); pthread_mutex_unlock(&r_mutex); } else if (events[i].events & EPOLLOUT) { rdata = (struct user_data *)events[i].data.ptr; if ( NULL == rdata ) { continue; } printf("[SERVER] put task %d to write queue\n", ((struct task*)events[i].data.ptr)->data.fd); new_task = malloc(sizeof(struct task)); if(NULL == new_task) { goto error; } new_task->data.ptr = (struct user_data*)events[i].data.ptr; new_task->next = NULL; pthread_mutex_lock(&w_mutex); // the queue is empty if (writehead == NULL) { writehead = new_task; writetail = new_task; } // queue is not empty else { writetail->next = new_task; writetail = new_task; } // trigger writetask thread pthread_cond_broadcast(&w_condl); pthread_mutex_unlock(&w_mutex); } else { printf("[SERVER] Error: unknown epoll event"); } } } error: close(apt_fd); close(lsn_fd); unlink(UNIX_DOMAIN); basic_shm_close(g_memfd); return 0; } src/ipc_server_lib.c
New file @@ -0,0 +1,466 @@ #include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/socket.h> #include <sys/un.h> #include <unistd.h> #include <sys/epoll.h> #include <errno.h> #include <sys/syscall.h> #include <sys/mman.h> #include <sys/stat.h> #include <fcntl.h> #include <sys/types.h> #include <pthread.h> #include "ipc_msg.h" #define MAX_EPOLL_EVENT_COUNT 1024 #define LISTENQ 1024 // task item in thread pool struct task { // file descriptor or user_data epoll_data_t data; // next task struct task* next; }; //typedef int (*readfunc)(struct user_data*); typedef int (*writefunc)(struct user_data*); /* thread exec function */ static void *readtask(void *args); static void *writetask(void *args); // mutex lock to protect readhead/readtail pthread_mutex_t r_mutex; pthread_cond_t r_condl; // mutex lock to protect writehead/writetail pthread_mutex_t w_mutex; pthread_cond_t w_condl; struct task *readhead = NULL, *readtail = NULL; struct task *writehead = NULL, *writetail = NULL; //create epoll int epfd,eventfd; struct epoll_event ev,events[LISTENQ]; static void setnonblocking(int sock) { int opts; opts = fcntl(sock, F_GETFL); if(opts<0) { perror("fcntl(sock,GETFL)"); exit(1); } if(fcntl(sock, F_SETFL, opts | O_NONBLOCK)<0) { perror("fcntl(sock,SETFL,opts)"); exit(1); } } /** * thread exec function */ static void *readtask(void *args) { int i = 0; int fd = -1; unsigned int n = 0; struct user_data *data = NULL; while (1) { // protect task queue (readhead/readtail) pthread_mutex_lock(&r_mutex); while(readhead == NULL) { // if condl false, will unlock mutex mydebug("thread:%u waiting, task is NULL ... \n", (uint32_t)pthread_self()); pthread_cond_wait(&r_condl, &r_mutex); } fd = readhead->data.fd; struct task *tmp = readhead; readhead = readhead->next; free(tmp); pthread_mutex_unlock(&r_mutex); mydebug("[SERVER] readtask %u handling %d\n", (uint32_t)pthread_self(), fd); data = (struct user_data *)malloc(sizeof(struct user_data)); if(NULL == data) { perror("readtask malloc"); return; } memset(data, 0 , sizeof(struct user_data)); data->fd = fd; n=0; int nread = 0; while((nread = read(fd,data->line+n,MAX_LEN))>0) { n+=nread; } if((nread==-1) && (errno != EAGAIN)) { perror("read error"); } if(n < 0) { if (errno == ECONNRESET) { close(fd); mydebug("[SERVER] Error: readline failed: %s\n", strerror(errno)); } else { mydebug("readline error \n"); } if(data != NULL) { free(data); data = NULL; } } else if (n == 0) { close(fd); mydebug("[SERVER] Info: client closed connection.\n"); if(data != NULL) { free(data); data = NULL; } } else { data->n_size = n; mydebug("[SERVER] readtask %u received %d : [%d] %s\n", (uint32_t)pthread_self(), fd, data->n_size, data->line); if (data->line[0] != '\0') { // modify monitored event to EPOLLOUT, wait next loop to send respond ev.data.ptr = data; // Modify event to EPOLLOUT ev.events = EPOLLOUT | EPOLLET; // modify moditored fd event if(epoll_ctl(epfd,EPOLL_CTL_MOD,fd,&ev)==-1) { perror("epoll_ctl:mod"); if(data != NULL) { free(data); data = NULL; } } } } } } static void *writetask(void *args) { int n = 0; int nwrite = 0; int ret = 0; // data to wirte back to client struct user_data *rdata = NULL; while(1) { pthread_mutex_lock(&w_mutex); while(writehead == NULL) { // if condl false, will unlock mutex pthread_cond_wait(&w_condl, &w_mutex); } rdata = (struct user_data*)writehead->data.ptr; struct task* tmp = writehead; writehead = writehead->next; free(tmp); //echo("[SERVER] thread %d writetask before unlock\n", pthread_self()); pthread_mutex_unlock(&w_mutex); //echo("[SERVER] thread %d writetask after unlock\n", pthread_self()); if(rdata->fd < 0) { goto error; } mydebug("[SERVER] writetask %u sending %d\n", (uint32_t)pthread_self(), rdata->fd); writefunc wfunc = (writefunc)args; ret = wfunc(rdata); if(ret < 0) { goto error; } // delete data if (rdata->fd > 0) { // modify monitored event to EPOLLIN, wait next loop to receive data ev.data.fd = rdata->fd; // monitor in message, edge trigger //ev.events = EPOLLIN | EPOLLET; // modify moditored fd event if(epoll_ctl(epfd, EPOLL_CTL_DEL, rdata->fd, &ev)==-1) { perror("epoll_ctl:del"); } close(rdata->fd); } error: free(rdata); } } int basic_create_ipc_server(char * unix_domain_path) { pthread_t *read_tids = NULL; pthread_t *write_tids = NULL; int lsn_fd, apt_fd; struct sockaddr_un srv_addr; struct sockaddr_un clt_addr; socklen_t clt_len; int ret; int i; char line[MAX_LEN] = {0}; struct task *new_task=NULL; struct user_data *rdata=NULL; if((read_tids = (pthread_t *)malloc(READ_THREAD_NUM * sizeof(pthread_t))) == NULL) { perror("read_tids malloc fail\n"); return -1; } // threads for reading thread pool for(i = 0; i < READ_THREAD_NUM; i++) { pthread_create(&read_tids[i], NULL, readtask, NULL); } if((write_tids = (pthread_t *)malloc(WRITE_THREAD_NUM * sizeof(pthread_t))) == NULL) { perror("write_tids malloc fail\n"); return -1; } // threads for writing thread pool for(i = 0; i < WRITE_THREAD_NUM; i++) { pthread_create(&write_tids[i], NULL, writetask, proc_memfd); } epfd = epoll_create(MAX_EPOLL_EVENT_COUNT); //create socket to bind local IP and PORT lsn_fd = socket(AF_UNIX, SOCK_STREAM, 0); // set the descriptor as non-blocking setnonblocking(lsn_fd); ev.data.fd = lsn_fd; ev.events = EPOLLIN|EPOLLET; /*Control an epoll descriptor, $epfd, by requesting the operation op be performed on the target file descriptor, fd. $epfd is an epoll descriptor returned from epoll_create. $op is one of EPOLL_CTL_ADD, EPOLL_CTL_MOD or EPOLL_CTL_DEL. $fd is the file desciptor to be watched. $eventmask is a bitmask of events defined by EPOLLIN, EPOLLOUT, etc. When successful, epoll_ctl returns 0. When an error occurs, epoll_ctl returns -1 and errno is set appropriately. */ if(epoll_ctl(epfd,EPOLL_CTL_ADD,lsn_fd,&ev)==-1) { perror("epoll_ctl:add"); } if(lsn_fd < 0) { perror("can't create communication socket!"); unlink(UNIX_DOMAIN); return 1; } //create unix domain if(unix_domain_path == NULL) { unix_domain_path = UNIX_DOMAIN; } srv_addr.sun_family = AF_UNIX; strncpy(srv_addr.sun_path, unix_domain_path, sizeof(srv_addr.sun_path) - 1); //unlink(UNIX_DOMAIN); //bind sockfd and sockaddr ret = bind(lsn_fd, (struct sockaddr*)&srv_addr, sizeof(srv_addr)); if(ret == -1) { perror("can't bind local sockaddr!"); close(lsn_fd); unlink(UNIX_DOMAIN); return 1; } //listen lsn_fd ret = listen(lsn_fd, LISTENQ); if(ret == -1) { perror("can't listen client connect request"); close(lsn_fd); unlink(UNIX_DOMAIN); return 1; } clt_len = sizeof(clt_addr); while(1) { int nfds = epoll_wait(epfd,events,1024,100); int i=0; for(i=0;i<nfds;++i) { if(events[i].data.fd == lsn_fd) { // accept the client connection while((apt_fd=accept(lsn_fd, (struct sockaddr *)&clt_addr, &clt_len))>0) { setnonblocking(apt_fd); //char *str = inet_ntoa(clt_addr.sun_path); //printf("[SERVER] connect from %s \n", str); ev.data.fd = apt_fd; // monitor in message, edge trigger ev.events = EPOLLIN | EPOLLET; // add fd to epoll queue if(epoll_ctl(epfd,EPOLL_CTL_ADD,apt_fd,&ev)==-1) { perror("epoll_ctl:add"); continue; //exit(EXIT_FAILURE); } } if( apt_fd == -1) { if((errno != EAGAIN ) && (errno!= ECONNABORTED) && (errno != EPROTO) && (errno != EINTR)) { perror("accept"); } } continue; } else if (events[i].events & EPOLLIN) //write数据 { printf("EPOLLIN\n"); if( (eventfd = events[i].data.fd) < 0 ) { continue; } printf("[SERVER] put task %d to read queue\n", events[i].data.fd); new_task = (struct task *)malloc(sizeof(struct task)); if(NULL == new_task) { goto error; } new_task->data.fd= eventfd; new_task->next = NULL; // protect task queue (readhead/readtail) pthread_mutex_lock(&r_mutex); // the queue is empty if(readhead == NULL) { readhead = new_task; readtail = new_task; } // queue is not empty else { readtail->next = new_task; readtail = new_task; } // trigger readtask thread pthread_cond_broadcast(&r_condl); pthread_mutex_unlock(&r_mutex); } else if (events[i].events & EPOLLOUT) { rdata = (struct user_data *)events[i].data.ptr; if ( NULL == rdata ) { continue; } printf("[SERVER] put task %d to write queue\n", ((struct task*)events[i].data.ptr)->data.fd); new_task = malloc(sizeof(struct task)); if(NULL == new_task) { goto error; } new_task->data.ptr = (struct user_data*)events[i].data.ptr; new_task->next = NULL; pthread_mutex_lock(&w_mutex); // the queue is empty if (writehead == NULL) { writehead = new_task; writetail = new_task; } // queue is not empty else { writetail->next = new_task; writetail = new_task; } // trigger writetask thread pthread_cond_broadcast(&w_condl); pthread_mutex_unlock(&w_mutex); } else { printf("[SERVER] Error: unknown epoll event"); } } } error: close(apt_fd); close(lsn_fd); unlink(UNIX_DOMAIN); } src/memfd.c
@@ -12,6 +12,7 @@ #include <time.h> #include <stdarg.h> #include "memfd.h" #include <assert.h> #define errExit(msg) do { perror(msg); exit(EXIT_FAILURE); \ } while (0)