From 157b3411dd123694ca29dd80fe9ecc683958ccab Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期四, 27 七月 2023 11:54:24 +0800 Subject: [PATCH] add epoll/poll/select sendmsg/recvmsg transmit fd --- src/ipc_server_lib.c | 147 +++++++++++++++++++++++++++---------------------- 1 files changed, 81 insertions(+), 66 deletions(-) diff --git a/src/ipc_server_lib.c b/src/ipc_server_lib.c index 41c22ee..6b2ce1f 100644 --- a/src/ipc_server_lib.c +++ b/src/ipc_server_lib.c @@ -13,6 +13,8 @@ #include <sys/types.h> #include <pthread.h> #include "ipc_msg.h" +#include "memfd.h" + #define MAX_EPOLL_EVENT_COUNT 1024 #define LISTENQ 1024 @@ -26,8 +28,7 @@ struct task* next; }; -//typedef int (*readfunc)(struct user_data*); -typedef int (*writefunc)(struct user_data*); + /* thread exec function */ static void *readtask(void *args); static void *writetask(void *args); @@ -71,7 +72,6 @@ */ static void *readtask(void *args) { - int i = 0; int fd = -1; unsigned int n = 0; @@ -85,16 +85,22 @@ while(readhead == NULL) { // if condl false, will unlock mutex - mydebug("thread:%u waiting, task is NULL ... \n", (uint32_t)pthread_self()); + mydebug("thread:%u waiting, readtask is NULL ... \n", (uint32_t)pthread_self()); pthread_cond_wait(&r_condl, &r_mutex); } - fd = readhead->data.fd; + fd = readhead->data.fd; struct task *tmp = readhead; readhead = readhead->next; free(tmp); pthread_mutex_unlock(&r_mutex); + + if(fd < 0) + { + perror("readtask fd<0"); + continue; + } mydebug("[SERVER] readtask %u handling %d\n", (uint32_t)pthread_self(), fd); @@ -102,7 +108,7 @@ if(NULL == data) { perror("readtask malloc"); - return; + return NULL; } memset(data, 0 , sizeof(struct user_data)); @@ -136,47 +142,47 @@ free(data); data = NULL; } - } - else if (n == 0) - { - close(fd); - mydebug("[SERVER] Info: client closed connection.\n"); - if(data != NULL) - { - free(data); - data = NULL; - } - } - else - { - data->n_size = n; - - mydebug("[SERVER] readtask %u received %d : [%d] %s\n", (uint32_t)pthread_self(), fd, data->n_size, data->line); - if (data->line[0] != '\0') - { - // modify monitored event to EPOLLOUT, wait next loop to send respond - ev.data.ptr = data; - // Modify event to EPOLLOUT - ev.events = EPOLLOUT | EPOLLET; - // modify moditored fd event - if(epoll_ctl(epfd,EPOLL_CTL_MOD,fd,&ev)==-1) - { - perror("epoll_ctl:mod"); - if(data != NULL) - { - free(data); - data = NULL; - } - } - } - } + } + else if (n == 0) + { + close(fd); + mydebug("[SERVER] Info: client closed connection.\n"); + if(data != NULL) + { + free(data); + data = NULL; + } } + else + { + data->n_size = n; + + mydebug("[SERVER] readtask %u received %d : [%d] %s\n", (uint32_t)pthread_self(), fd, data->n_size, data->line); + if (data->line[0] != '\0') + { + // modify monitored event to EPOLLOUT, wait next loop to send respond + ev.data.ptr = data; + // Modify event to EPOLLOUT + ev.events = EPOLLOUT | EPOLLET; + // modify moditored fd event + if(epoll_ctl(epfd,EPOLL_CTL_MOD,fd,&ev)==-1) + { + perror("epoll_ctl:mod"); + if(data != NULL) + { + free(data); + data = NULL; + } + } + } + } + } + + return NULL; } static void *writetask(void *args) { - int n = 0; - int nwrite = 0; int ret = 0; // data to wirte back to client @@ -200,15 +206,18 @@ if(rdata->fd < 0) { - goto error; + continue; } - mydebug("[SERVER] writetask %u sending %d\n", (uint32_t)pthread_self(), rdata->fd); + mydebug("[SERVER] writetask %u fetch data %d\n", (uint32_t)pthread_self(), rdata->fd); - writefunc wfunc = (writefunc)args; - ret = wfunc(rdata); - if(ret < 0) + if(NULL != args) { - goto error; + args_data_st *pargs_data_st = (args_data_st *)args; + ret = pargs_data_st->wfunc(rdata, pargs_data_st->args, pargs_data_st->len); + if(ret < 0) + { + goto error; + } } // delete data @@ -230,9 +239,11 @@ error: free(rdata); } + + return NULL; } -int basic_create_ipc_server(char * unix_domain_path) +int basic_create_ipc_server(char * unix_domain_path, args_data_st * pargs_data_st) { pthread_t *read_tids = NULL; pthread_t *write_tids = NULL; @@ -242,16 +253,19 @@ socklen_t clt_len; int ret; int i; - char line[MAX_LEN] = {0}; struct task *new_task=NULL; struct user_data *rdata=NULL; + if(0 == file_exists(unix_domain_path)) + { + unlink(unix_domain_path); + } - if((read_tids = (pthread_t *)malloc(READ_THREAD_NUM * sizeof(pthread_t))) == NULL) - { - perror("read_tids malloc fail\n"); + if((read_tids = (pthread_t *)malloc(READ_THREAD_NUM * sizeof(pthread_t))) == NULL) + { + perror("read_tids malloc fail\n"); return -1; - } + } // threads for reading thread pool for(i = 0; i < READ_THREAD_NUM; i++) @@ -268,7 +282,7 @@ // threads for writing thread pool for(i = 0; i < WRITE_THREAD_NUM; i++) { - pthread_create(&write_tids[i], NULL, writetask, proc_memfd); + pthread_create(&write_tids[i], NULL, writetask, (void *)pargs_data_st); } epfd = epoll_create(MAX_EPOLL_EVENT_COUNT); @@ -300,7 +314,7 @@ if(lsn_fd < 0) { perror("can't create communication socket!"); - unlink(UNIX_DOMAIN); + unlink(unix_domain_path); return 1; } @@ -312,7 +326,6 @@ srv_addr.sun_family = AF_UNIX; strncpy(srv_addr.sun_path, unix_domain_path, sizeof(srv_addr.sun_path) - 1); - //unlink(UNIX_DOMAIN); //bind sockfd and sockaddr ret = bind(lsn_fd, (struct sockaddr*)&srv_addr, sizeof(srv_addr)); @@ -320,7 +333,7 @@ { perror("can't bind local sockaddr!"); close(lsn_fd); - unlink(UNIX_DOMAIN); + unlink(unix_domain_path); return 1; } @@ -331,7 +344,7 @@ { perror("can't listen client connect request"); close(lsn_fd); - unlink(UNIX_DOMAIN); + unlink(unix_domain_path); return 1; } @@ -378,20 +391,20 @@ else if (events[i].events & EPOLLIN) //write鏁版嵁 { - printf("EPOLLIN\n"); - if( (eventfd = events[i].data.fd) < 0 ) + mydebug("EPOLLIN\n"); + if(events[i].data.fd < 0) { continue; } - printf("[SERVER] put task %d to read queue\n", events[i].data.fd); + mydebug("[SERVER] put task %d to read queue\n", events[i].data.fd); new_task = (struct task *)malloc(sizeof(struct task)); if(NULL == new_task) { goto error; } - new_task->data.fd= eventfd; + new_task->data.fd= events[i].data.fd; new_task->next = NULL; // protect task queue (readhead/readtail) @@ -421,7 +434,7 @@ { continue; } - printf("[SERVER] put task %d to write queue\n", ((struct task*)events[i].data.ptr)->data.fd); + mydebug("[SERVER] put task %d to write queue\n", ((struct task*)events[i].data.ptr)->data.fd); new_task = malloc(sizeof(struct task)); if(NULL == new_task) @@ -453,7 +466,7 @@ } else { - printf("[SERVER] Error: unknown epoll event"); + mydebug("[SERVER] Error: unknown epoll event"); } } } @@ -461,6 +474,8 @@ error: close(apt_fd); close(lsn_fd); - unlink(UNIX_DOMAIN); + unlink(unix_domain_path); + + return 0; } -- Gitblit v1.8.0