From 54951dfb930bea890aef14be02236f81b3a19f2e Mon Sep 17 00:00:00 2001 From: cheliequan <liequanche@126.com> Date: 星期二, 27 十二月 2022 10:23:43 +0800 Subject: [PATCH] 更新消息通知库,使用basic_create_ipc_server函数创建消息监听服务器 --- src/memfd.c | 1 src/ipc_msg.h | 21 + src/ipc_server_lib.c | 466 +++++++++++++++++++++++++++ src/ipc_msg.c | 4 src/Makefile | 6 src/ipc_server.c | 483 +--------------------------- 6 files changed, 508 insertions(+), 473 deletions(-) diff --git a/src/Makefile b/src/Makefile index 772b875..d870100 100644 --- a/src/Makefile +++ b/src/Makefile @@ -17,8 +17,10 @@ $(CC) $(CFLAGS) $(INCLUDE) -fPIC -c shmht_mytests.c memfd.o: $(CC) $(CFLAGS) $(INCLUDE) -fPIC -c memfd.c -ipc_server: - $(CC) $(CFLAGS) $(INCLUDE) -fPIC -lpthread -o $@ ipc_server.c ipc_msg.c memfd.c +ipc_server:libipc_server.so + $(CC) $(CFLAGS) $(INCLUDE) -fPIC -pthread -o $@ ipc_server.c -lipc_server -L . +libipc_server.so: + $(CC) -shared -fPIC -o $@ ipc_server_lib.c ipc_msg.c memfd.c ipc_client: $(CC) $(CFLAGS) $(INCLUDE) -fPIC -o $@ ipc_client.c ipc_msg.c memfd.c clean: diff --git a/src/ipc_msg.c b/src/ipc_msg.c index 0530fd4..804eafa 100644 --- a/src/ipc_msg.c +++ b/src/ipc_msg.c @@ -57,7 +57,7 @@ int send_fd_sendmsg(int fd, memfd_data_st** ppmemfd_data) { memfd_data_st *ptr_memfd_data = *ppmemfd_data; - return send_fd_args(fd, ptr_memfd_data->memfd, ptr_memfd_data->pid, &ptr_memfd_data->data, ptr_memfd_data->len); + return send_fd_args_sendmsg(fd, ptr_memfd_data->memfd, ptr_memfd_data->pid, &ptr_memfd_data->data, ptr_memfd_data->len); } /** @@ -102,4 +102,4 @@ memset(*ppmemfd_data, 0, control_len); memcpy(*ppmemfd_data, CMSG_DATA(&cm), control_len); return ret; -} \ No newline at end of file +} diff --git a/src/ipc_msg.h b/src/ipc_msg.h index e7ad225..5b489c3 100644 --- a/src/ipc_msg.h +++ b/src/ipc_msg.h @@ -11,6 +11,8 @@ #define UNIX_DOMAIN "/tmp/UNIX.domain" +#define errExit(msg) do { perror(msg); exit(EXIT_FAILURE); \ +} while (0) #define MAX_LEN 4096 #define HELLO_MSG "hello_basic" @@ -26,20 +28,33 @@ int len; } memfd_data_st; -int send_fd(int fd, memfd_data_st** ppmemfd_data); +struct user_data +{ + int fd; + unsigned int n_size; + char line[MAX_LEN]; +}; + +#define READ_THREAD_NUM 2 +#define WRITE_THREAD_NUM 2 + +int proc_memfd(struct user_data* rdata); +int basic_create_ipc_server(char * unix_domain_path); + +int send_fd_sendmsg(int fd, memfd_data_st** ppmemfd_data); /** * @brief 鍙戦�佺洰鏍囨枃浠舵弿杩扮 * @param fd 浼犻�掍俊鎭殑 UNIX 鍩� 鏂囦欢鎻忚堪绗� * @param fd_to_send 寰呭彂閫佺殑鏂囦欢鎻忚堪绗� */ -int send_fd_args(int fd, int fd_to_send, pid_t pid, void **ppdata, int len); +int send_fd_args_sendmsg(int fd, int fd_to_send, pid_t pid, void **ppdata, int len); /** * @brief 鎺ュ彈鏂囦欢鎻忚堪绗� * @param fd 浼犻�掍俊鎭殑 UNIX 鍩� 鏂囦欢鎻忚堪绗� */ -int recv_fd(int fd, memfd_data_st** ppmemfd_data); +int recv_fd_recvmsg(int fd, memfd_data_st** ppmemfd_data); #ifdef __cplusplus } diff --git a/src/ipc_server.c b/src/ipc_server.c index 45f6082..99418c7 100644 --- a/src/ipc_server.c +++ b/src/ipc_server.c @@ -1,61 +1,14 @@ #include <stdio.h> #include <stdlib.h> #include <string.h> -#include <sys/socket.h> -#include <sys/un.h> #include <unistd.h> -#include <signal.h> -#include <sys/epoll.h> #include <errno.h> -#include <sys/syscall.h> -#include <sys/mman.h> -#include <sys/stat.h> -#include <fcntl.h> +#include <signal.h> #include <sys/types.h> -#include <pthread.h> +#include <sys/stat.h> +#include "memfd.h" #include "ipc_msg.h" - -#define MAX_EPOLL_EVENT_COUNT 1024 -#define errExit(msg) do { perror(msg); exit(EXIT_FAILURE); \ -} while (0) -#define LISTENQ 1024 - -// task item in thread pool -struct task -{ - // file descriptor or user_data - epoll_data_t data; - // next task - struct task* next; -}; - -struct user_data -{ - int fd; - unsigned int n_size; - char line[MAX_LEN]; -}; - -//typedef int (*readfunc)(struct user_data*); -typedef int (*writefunc)(struct user_data*); -/* thread exec function */ -void *readtask(void *args); -void *writetask(void *args); - -// mutex lock to protect readhead/readtail -pthread_mutex_t r_mutex; -pthread_cond_t r_condl; - -// mutex lock to protect writehead/writetail -pthread_mutex_t w_mutex; -pthread_cond_t w_condl; -struct task *readhead = NULL, *readtail = NULL; -struct task *writehead = NULL, *writetail = NULL; - -//create epoll -int epfd,eventfd; -struct epoll_event ev,events[LISTENQ]; int g_memfd = 0; @@ -89,23 +42,6 @@ return len; } -void setnonblocking(int sock) -{ - int opts; - - opts = fcntl(sock, F_GETFL); - if(opts<0) - { - perror("fcntl(sock,GETFL)"); - exit(1); - } - - if(fcntl(sock, F_SETFL, opts | O_NONBLOCK)<0) - { - perror("fcntl(sock,SETFL,opts)"); - exit(1); - } -} int proc_memfd(struct user_data* rdata) @@ -166,423 +102,38 @@ } - -/** - * thread exec function - */ -void *readtask(void *args) -{ - int i = 0; - int fd = -1; - unsigned int n = 0; - - struct user_data *data = NULL; - - while (1) - { - // protect task queue (readhead/readtail) - pthread_mutex_lock(&r_mutex); - - while(readhead == NULL) - { - // if condl false, will unlock mutex - mydebug("thread:%u waiting, task is NULL ... \n", (uint32_t)pthread_self()); - pthread_cond_wait(&r_condl, &r_mutex); - } - - fd = readhead->data.fd; - - struct task *tmp = readhead; - readhead = readhead->next; - free(tmp); - pthread_mutex_unlock(&r_mutex); - - mydebug("[SERVER] readtask %u handling %d\n", (uint32_t)pthread_self(), fd); - - data = (struct user_data *)malloc(sizeof(struct user_data)); - if(NULL == data) - { - perror("readtask malloc"); - return; - } - - memset(data, 0 , sizeof(struct user_data)); - data->fd = fd; - - n=0; - int nread = 0; - while((nread = read(fd,data->line+n,MAX_LEN))>0) - { - n+=nread; - } - - if((nread==-1) && (errno != EAGAIN)) - { - perror("read error"); - } - - if(n < 0) - { - if (errno == ECONNRESET) - { - close(fd); - mydebug("[SERVER] Error: readline failed: %s\n", strerror(errno)); - } - else - { - mydebug("readline error \n"); - } - if(data != NULL) - { - free(data); - data = NULL; - } - } - else if (n == 0) - { - close(fd); - mydebug("[SERVER] Info: client closed connection.\n"); - if(data != NULL) - { - free(data); - data = NULL; - } - } - else - { - data->n_size = n; - - mydebug("[SERVER] readtask %u received %d : [%d] %s\n", (uint32_t)pthread_self(), fd, data->n_size, data->line); - if (data->line[0] != '\0') - { - // modify monitored event to EPOLLOUT, wait next loop to send respond - ev.data.ptr = data; - // Modify event to EPOLLOUT - ev.events = EPOLLOUT | EPOLLET; - // modify moditored fd event - if(epoll_ctl(epfd,EPOLL_CTL_MOD,fd,&ev)==-1) - { - perror("epoll_ctl:mod"); - if(data != NULL) - { - free(data); - data = NULL; - } - } - } - } - } -} - -void *writetask(void *args) -{ - int n = 0; - int nwrite = 0; - int ret = 0; - - // data to wirte back to client - struct user_data *rdata = NULL; - while(1) - { - pthread_mutex_lock(&w_mutex); - while(writehead == NULL) - { - // if condl false, will unlock mutex - pthread_cond_wait(&w_condl, &w_mutex); - } - rdata = (struct user_data*)writehead->data.ptr; - struct task* tmp = writehead; - writehead = writehead->next; - free(tmp); - - //echo("[SERVER] thread %d writetask before unlock\n", pthread_self()); - pthread_mutex_unlock(&w_mutex); - //echo("[SERVER] thread %d writetask after unlock\n", pthread_self()); - - if(rdata->fd < 0) - { - goto error; - } - mydebug("[SERVER] writetask %u sending %d\n", (uint32_t)pthread_self(), rdata->fd); - - writefunc wfunc = (writefunc)args; - ret = wfunc(rdata); - if(ret < 0) - { - goto error; - } - - // delete data - if (rdata->fd > 0) - { - // modify monitored event to EPOLLIN, wait next loop to receive data - ev.data.fd = rdata->fd; - // monitor in message, edge trigger - //ev.events = EPOLLIN | EPOLLET; - // modify moditored fd event - if(epoll_ctl(epfd, EPOLL_CTL_DEL, rdata->fd, &ev)==-1) - { - perror("epoll_ctl:del"); - } - - close(rdata->fd); - } - -error: - free(rdata); - } -} - - +/*define you our proc_memfd funtion to send your message to other processes锛宱r just put it in shm锛� + notice the file description fd and the pid of creator to other*/ int main(int argc, char **argv) { char *name; ssize_t len; - int lsn_fd, apt_fd; - struct sockaddr_un srv_addr; - struct sockaddr_un clt_addr; - socklen_t clt_len; - int ret; - int i; - //char recv_buf[MAX_LEN] = {0}; - //char send_buf[MAX_LEN] = {0}; - char line[MAX_LEN] = {0}; - pthread_t *read_tids = NULL; - int read_thread_num = 2; - pthread_t *write_tids = NULL; - int write_thread_num = 2; - struct task *new_task=NULL; - struct user_data *rdata=NULL; - + char * unixdomain = NULL; + int ret = 0; if (argc < 3) { - fprintf(stderr, "%s name size\n", argv[0]); + fprintf(stderr, "%s name size [unix domain path]\n", argv[0]); exit(EXIT_FAILURE); } name = argv[1]; len = atoi(argv[2]) * 1024LU * 1024LU * 1024LU; - signal(SIGTERM,handler); + unixdomain = argv[3]; + signal(SIGTERM,handler); + signal(SIGABRT,handler); + signal(SIGSEGV,handler); + + g_memfd = basic_shm_create(name, len); ret = shm_write(g_memfd); - - if((read_tids = (pthread_t *)malloc(read_thread_num * sizeof(pthread_t))) == NULL) - { - perror("read_tids malloc fail\n"); - return -1; - } - - // threads for reading thread pool - for(i = 0; i < read_thread_num; i++) + ret = basic_create_ipc_server(unixdomain); + if(ret < 0) { - pthread_create(&read_tids[i], NULL, readtask, NULL); + printf("failed to create ipc_server\n"); } - if((write_tids = (pthread_t *)malloc(write_thread_num * sizeof(pthread_t))) == NULL) - { - perror("write_tids malloc fail\n"); - return -1; - } - - // threads for writing thread pool - for(i = 0; i < write_thread_num; i++) - { - pthread_create(&write_tids[i], NULL, writetask, proc_memfd); - } - - epfd = epoll_create(MAX_EPOLL_EVENT_COUNT); - - //create socket to bind local IP and PORT - lsn_fd = socket(AF_UNIX, SOCK_STREAM, 0); - - int opt = 0x1; - setsockopt(lsn_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); - - // set the descriptor as non-blocking - setnonblocking(lsn_fd); - - ev.data.fd = lsn_fd; - ev.events = EPOLLIN|EPOLLET; - - /*Control an epoll descriptor, $epfd, by requesting the operation op be performed on the target file descriptor, fd. - - $epfd is an epoll descriptor returned from epoll_create. - $op is one of EPOLL_CTL_ADD, EPOLL_CTL_MOD or EPOLL_CTL_DEL. - $fd is the file desciptor to be watched. - $eventmask is a bitmask of events defined by EPOLLIN, EPOLLOUT, etc. - - When successful, epoll_ctl returns 0. When an error occurs, epoll_ctl returns -1 and errno is set appropriately. - */ - if(epoll_ctl(epfd,EPOLL_CTL_ADD,lsn_fd,&ev)==-1) - { - perror("epoll_ctl:add"); - } - - if(lsn_fd < 0) - { - perror("can't create communication socket!"); - unlink(UNIX_DOMAIN); - basic_shm_close(g_memfd); - return 1; - } - - //create local IP and PORT - srv_addr.sun_family = AF_UNIX; - strncpy(srv_addr.sun_path, UNIX_DOMAIN, sizeof(srv_addr.sun_path) - 1); - //unlink(UNIX_DOMAIN); - - //bind sockfd and sockaddr - ret = bind(lsn_fd, (struct sockaddr*)&srv_addr, sizeof(srv_addr)); - if(ret == -1) - { - perror("can't bind local sockaddr!"); - close(lsn_fd); - unlink(UNIX_DOMAIN); - basic_shm_close(g_memfd); - return 1; - } - - //listen lsn_fd, try listen 5 - - ret = listen(lsn_fd, LISTENQ); - if(ret == -1) - { - perror("can't listen client connect request"); - close(lsn_fd); - unlink(UNIX_DOMAIN); - basic_shm_close(g_memfd); - - return 1; - } - - clt_len = sizeof(clt_addr); - while(1) - { - int nfds = epoll_wait(epfd,events,1024,100); - int i=0; - for(i=0;i<nfds;++i) - { - if(events[i].data.fd == lsn_fd) - { - // accept the client connection - while((apt_fd=accept(lsn_fd, (struct sockaddr *)&clt_addr, &clt_len))>0) - { - setnonblocking(apt_fd); - //char *str = inet_ntoa(clt_addr.sun_path); - //printf("[SERVER] connect from %s \n", str); - ev.data.fd = apt_fd; - // monitor in message, edge trigger - ev.events = EPOLLIN | EPOLLET; - // add fd to epoll queue - if(epoll_ctl(epfd,EPOLL_CTL_ADD,apt_fd,&ev)==-1) - { - perror("epoll_ctl:add"); - continue; - //exit(EXIT_FAILURE); - } - } - if( apt_fd == -1) - { - if((errno != EAGAIN ) - && (errno!= ECONNABORTED) - && (errno != EPROTO) - && (errno != EINTR)) - { - perror("accept"); - - } - } - continue; - } - else if (events[i].events & EPOLLIN) - //write鏁版嵁 - { - printf("EPOLLIN\n"); - if( (eventfd = events[i].data.fd) < 0 ) - { - continue; - } - - printf("[SERVER] put task %d to read queue\n", events[i].data.fd); - - new_task = (struct task *)malloc(sizeof(struct task)); - if(NULL == new_task) - { - goto error; - } - new_task->data.fd= eventfd; - new_task->next = NULL; - - // protect task queue (readhead/readtail) - pthread_mutex_lock(&r_mutex); - - // the queue is empty - if(readhead == NULL) - { - readhead = new_task; - readtail = new_task; - } - // queue is not empty - else - { - readtail->next = new_task; - readtail = new_task; - } - - // trigger readtask thread - pthread_cond_broadcast(&r_condl); - pthread_mutex_unlock(&r_mutex); - } - else if (events[i].events & EPOLLOUT) - { - rdata = (struct user_data *)events[i].data.ptr; - if ( NULL == rdata ) - { - continue; - } - printf("[SERVER] put task %d to write queue\n", ((struct task*)events[i].data.ptr)->data.fd); - - new_task = malloc(sizeof(struct task)); - if(NULL == new_task) - { - goto error; - } - new_task->data.ptr = (struct user_data*)events[i].data.ptr; - new_task->next = NULL; - - pthread_mutex_lock(&w_mutex); - - // the queue is empty - if (writehead == NULL) - { - writehead = new_task; - writetail = new_task; - } - // queue is not empty - else - { - writetail->next = new_task; - writetail = new_task; - } - - // trigger writetask thread - pthread_cond_broadcast(&w_condl); - - pthread_mutex_unlock(&w_mutex); - } - else - { - printf("[SERVER] Error: unknown epoll event"); - } - } - } - -error: - close(apt_fd); - close(lsn_fd); - unlink(UNIX_DOMAIN); basic_shm_close(g_memfd); return 0; } diff --git a/src/ipc_server_lib.c b/src/ipc_server_lib.c new file mode 100644 index 0000000..41c22ee --- /dev/null +++ b/src/ipc_server_lib.c @@ -0,0 +1,466 @@ +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/socket.h> +#include <sys/un.h> +#include <unistd.h> +#include <sys/epoll.h> +#include <errno.h> +#include <sys/syscall.h> +#include <sys/mman.h> +#include <sys/stat.h> +#include <fcntl.h> +#include <sys/types.h> +#include <pthread.h> +#include "ipc_msg.h" + +#define MAX_EPOLL_EVENT_COUNT 1024 +#define LISTENQ 1024 + +// task item in thread pool +struct task +{ + // file descriptor or user_data + epoll_data_t data; + // next task + struct task* next; +}; + +//typedef int (*readfunc)(struct user_data*); +typedef int (*writefunc)(struct user_data*); +/* thread exec function */ +static void *readtask(void *args); +static void *writetask(void *args); + +// mutex lock to protect readhead/readtail +pthread_mutex_t r_mutex; +pthread_cond_t r_condl; + +// mutex lock to protect writehead/writetail +pthread_mutex_t w_mutex; +pthread_cond_t w_condl; +struct task *readhead = NULL, *readtail = NULL; +struct task *writehead = NULL, *writetail = NULL; + +//create epoll +int epfd,eventfd; +struct epoll_event ev,events[LISTENQ]; + + +static void setnonblocking(int sock) +{ + int opts; + + opts = fcntl(sock, F_GETFL); + if(opts<0) + { + perror("fcntl(sock,GETFL)"); + exit(1); + } + + if(fcntl(sock, F_SETFL, opts | O_NONBLOCK)<0) + { + perror("fcntl(sock,SETFL,opts)"); + exit(1); + } +} + + +/** + * thread exec function + */ +static void *readtask(void *args) +{ + int i = 0; + int fd = -1; + unsigned int n = 0; + + struct user_data *data = NULL; + + while (1) + { + // protect task queue (readhead/readtail) + pthread_mutex_lock(&r_mutex); + + while(readhead == NULL) + { + // if condl false, will unlock mutex + mydebug("thread:%u waiting, task is NULL ... \n", (uint32_t)pthread_self()); + pthread_cond_wait(&r_condl, &r_mutex); + } + + fd = readhead->data.fd; + + struct task *tmp = readhead; + readhead = readhead->next; + free(tmp); + pthread_mutex_unlock(&r_mutex); + + mydebug("[SERVER] readtask %u handling %d\n", (uint32_t)pthread_self(), fd); + + data = (struct user_data *)malloc(sizeof(struct user_data)); + if(NULL == data) + { + perror("readtask malloc"); + return; + } + + memset(data, 0 , sizeof(struct user_data)); + data->fd = fd; + + n=0; + int nread = 0; + while((nread = read(fd,data->line+n,MAX_LEN))>0) + { + n+=nread; + } + + if((nread==-1) && (errno != EAGAIN)) + { + perror("read error"); + } + + if(n < 0) + { + if (errno == ECONNRESET) + { + close(fd); + mydebug("[SERVER] Error: readline failed: %s\n", strerror(errno)); + } + else + { + mydebug("readline error \n"); + } + if(data != NULL) + { + free(data); + data = NULL; + } + } + else if (n == 0) + { + close(fd); + mydebug("[SERVER] Info: client closed connection.\n"); + if(data != NULL) + { + free(data); + data = NULL; + } + } + else + { + data->n_size = n; + + mydebug("[SERVER] readtask %u received %d : [%d] %s\n", (uint32_t)pthread_self(), fd, data->n_size, data->line); + if (data->line[0] != '\0') + { + // modify monitored event to EPOLLOUT, wait next loop to send respond + ev.data.ptr = data; + // Modify event to EPOLLOUT + ev.events = EPOLLOUT | EPOLLET; + // modify moditored fd event + if(epoll_ctl(epfd,EPOLL_CTL_MOD,fd,&ev)==-1) + { + perror("epoll_ctl:mod"); + if(data != NULL) + { + free(data); + data = NULL; + } + } + } + } + } +} + +static void *writetask(void *args) +{ + int n = 0; + int nwrite = 0; + int ret = 0; + + // data to wirte back to client + struct user_data *rdata = NULL; + while(1) + { + pthread_mutex_lock(&w_mutex); + while(writehead == NULL) + { + // if condl false, will unlock mutex + pthread_cond_wait(&w_condl, &w_mutex); + } + rdata = (struct user_data*)writehead->data.ptr; + struct task* tmp = writehead; + writehead = writehead->next; + free(tmp); + + //echo("[SERVER] thread %d writetask before unlock\n", pthread_self()); + pthread_mutex_unlock(&w_mutex); + //echo("[SERVER] thread %d writetask after unlock\n", pthread_self()); + + if(rdata->fd < 0) + { + goto error; + } + mydebug("[SERVER] writetask %u sending %d\n", (uint32_t)pthread_self(), rdata->fd); + + writefunc wfunc = (writefunc)args; + ret = wfunc(rdata); + if(ret < 0) + { + goto error; + } + + // delete data + if (rdata->fd > 0) + { + // modify monitored event to EPOLLIN, wait next loop to receive data + ev.data.fd = rdata->fd; + // monitor in message, edge trigger + //ev.events = EPOLLIN | EPOLLET; + // modify moditored fd event + if(epoll_ctl(epfd, EPOLL_CTL_DEL, rdata->fd, &ev)==-1) + { + perror("epoll_ctl:del"); + } + + close(rdata->fd); + } + +error: + free(rdata); + } +} + +int basic_create_ipc_server(char * unix_domain_path) +{ + pthread_t *read_tids = NULL; + pthread_t *write_tids = NULL; + int lsn_fd, apt_fd; + struct sockaddr_un srv_addr; + struct sockaddr_un clt_addr; + socklen_t clt_len; + int ret; + int i; + char line[MAX_LEN] = {0}; + + struct task *new_task=NULL; + struct user_data *rdata=NULL; + + if((read_tids = (pthread_t *)malloc(READ_THREAD_NUM * sizeof(pthread_t))) == NULL) + { + perror("read_tids malloc fail\n"); + return -1; + } + + // threads for reading thread pool + for(i = 0; i < READ_THREAD_NUM; i++) + { + pthread_create(&read_tids[i], NULL, readtask, NULL); + } + + if((write_tids = (pthread_t *)malloc(WRITE_THREAD_NUM * sizeof(pthread_t))) == NULL) + { + perror("write_tids malloc fail\n"); + return -1; + } + + // threads for writing thread pool + for(i = 0; i < WRITE_THREAD_NUM; i++) + { + pthread_create(&write_tids[i], NULL, writetask, proc_memfd); + } + + epfd = epoll_create(MAX_EPOLL_EVENT_COUNT); + + //create socket to bind local IP and PORT + lsn_fd = socket(AF_UNIX, SOCK_STREAM, 0); + + + // set the descriptor as non-blocking + setnonblocking(lsn_fd); + + ev.data.fd = lsn_fd; + ev.events = EPOLLIN|EPOLLET; + + /*Control an epoll descriptor, $epfd, by requesting the operation op be performed on the target file descriptor, fd. + + $epfd is an epoll descriptor returned from epoll_create. + $op is one of EPOLL_CTL_ADD, EPOLL_CTL_MOD or EPOLL_CTL_DEL. + $fd is the file desciptor to be watched. + $eventmask is a bitmask of events defined by EPOLLIN, EPOLLOUT, etc. + + When successful, epoll_ctl returns 0. When an error occurs, epoll_ctl returns -1 and errno is set appropriately. + */ + if(epoll_ctl(epfd,EPOLL_CTL_ADD,lsn_fd,&ev)==-1) + { + perror("epoll_ctl:add"); + } + + if(lsn_fd < 0) + { + perror("can't create communication socket!"); + unlink(UNIX_DOMAIN); + return 1; + } + + //create unix domain + if(unix_domain_path == NULL) + { + unix_domain_path = UNIX_DOMAIN; + } + + srv_addr.sun_family = AF_UNIX; + strncpy(srv_addr.sun_path, unix_domain_path, sizeof(srv_addr.sun_path) - 1); + //unlink(UNIX_DOMAIN); + + //bind sockfd and sockaddr + ret = bind(lsn_fd, (struct sockaddr*)&srv_addr, sizeof(srv_addr)); + if(ret == -1) + { + perror("can't bind local sockaddr!"); + close(lsn_fd); + unlink(UNIX_DOMAIN); + return 1; + } + + //listen lsn_fd + + ret = listen(lsn_fd, LISTENQ); + if(ret == -1) + { + perror("can't listen client connect request"); + close(lsn_fd); + unlink(UNIX_DOMAIN); + + return 1; + } + + clt_len = sizeof(clt_addr); + while(1) + { + int nfds = epoll_wait(epfd,events,1024,100); + int i=0; + for(i=0;i<nfds;++i) + { + if(events[i].data.fd == lsn_fd) + { + // accept the client connection + while((apt_fd=accept(lsn_fd, (struct sockaddr *)&clt_addr, &clt_len))>0) + { + setnonblocking(apt_fd); + //char *str = inet_ntoa(clt_addr.sun_path); + //printf("[SERVER] connect from %s \n", str); + ev.data.fd = apt_fd; + // monitor in message, edge trigger + ev.events = EPOLLIN | EPOLLET; + // add fd to epoll queue + if(epoll_ctl(epfd,EPOLL_CTL_ADD,apt_fd,&ev)==-1) + { + perror("epoll_ctl:add"); + continue; + //exit(EXIT_FAILURE); + } + } + if( apt_fd == -1) + { + if((errno != EAGAIN ) + && (errno!= ECONNABORTED) + && (errno != EPROTO) + && (errno != EINTR)) + { + perror("accept"); + + } + } + continue; + } + else if (events[i].events & EPOLLIN) + //write鏁版嵁 + { + printf("EPOLLIN\n"); + if( (eventfd = events[i].data.fd) < 0 ) + { + continue; + } + + printf("[SERVER] put task %d to read queue\n", events[i].data.fd); + + new_task = (struct task *)malloc(sizeof(struct task)); + if(NULL == new_task) + { + goto error; + } + new_task->data.fd= eventfd; + new_task->next = NULL; + + // protect task queue (readhead/readtail) + pthread_mutex_lock(&r_mutex); + + // the queue is empty + if(readhead == NULL) + { + readhead = new_task; + readtail = new_task; + } + // queue is not empty + else + { + readtail->next = new_task; + readtail = new_task; + } + + // trigger readtask thread + pthread_cond_broadcast(&r_condl); + pthread_mutex_unlock(&r_mutex); + } + else if (events[i].events & EPOLLOUT) + { + rdata = (struct user_data *)events[i].data.ptr; + if ( NULL == rdata ) + { + continue; + } + printf("[SERVER] put task %d to write queue\n", ((struct task*)events[i].data.ptr)->data.fd); + + new_task = malloc(sizeof(struct task)); + if(NULL == new_task) + { + goto error; + } + new_task->data.ptr = (struct user_data*)events[i].data.ptr; + new_task->next = NULL; + + pthread_mutex_lock(&w_mutex); + + // the queue is empty + if (writehead == NULL) + { + writehead = new_task; + writetail = new_task; + } + // queue is not empty + else + { + writetail->next = new_task; + writetail = new_task; + } + + // trigger writetask thread + pthread_cond_broadcast(&w_condl); + + pthread_mutex_unlock(&w_mutex); + } + else + { + printf("[SERVER] Error: unknown epoll event"); + } + } + } + +error: + close(apt_fd); + close(lsn_fd); + unlink(UNIX_DOMAIN); +} + diff --git a/src/memfd.c b/src/memfd.c index 2509685..00df30e 100644 --- a/src/memfd.c +++ b/src/memfd.c @@ -12,6 +12,7 @@ #include <time.h> #include <stdarg.h> #include "memfd.h" +#include <assert.h> #define errExit(msg) do { perror(msg); exit(EXIT_FAILURE); \ } while (0) -- Gitblit v1.8.0