| | |
| | | #include <sys/types.h> |
| | | #include <pthread.h> |
| | | #include "ipc_msg.h" |
| | | #include "memfd.h" |
| | | |
| | | |
| | | #define MAX_EPOLL_EVENT_COUNT 1024 |
| | | #define LISTENQ 1024 |
| | |
| | | struct task* next; |
| | | }; |
| | | |
| | | //typedef int (*readfunc)(struct user_data*); |
| | | typedef int (*writefunc)(struct user_data*); |
| | | |
| | | /* thread exec function */ |
| | | static void *readtask(void *args); |
| | | static void *writetask(void *args); |
| | |
| | | */ |
| | | static void *readtask(void *args) |
| | | { |
| | | int i = 0; |
| | | int fd = -1; |
| | | unsigned int n = 0; |
| | | |
| | |
| | | while(readhead == NULL) |
| | | { |
| | | // pthread_cond_wait releases r_mutex while blocked and re-locks it before returning |
| | | mydebug("thread:%u waiting, task is NULL ... \n", (uint32_t)pthread_self()); |
| | | mydebug("thread:%u waiting, readtask is NULL ... \n", (uint32_t)pthread_self()); |
| | | pthread_cond_wait(&r_condl, &r_mutex); |
| | | } |
| | | |
| | | fd = readhead->data.fd; |
| | | fd = readhead->data.fd; |
| | | |
| | | struct task *tmp = readhead; |
| | | readhead = readhead->next; |
| | | free(tmp); |
| | | pthread_mutex_unlock(&r_mutex); |
| | | |
| | | if(fd < 0) |
| | | { |
| | | perror("readtask fd<0"); |
| | | continue; |
| | | } |
| | | |
| | | mydebug("[SERVER] readtask %u handling %d\n", (uint32_t)pthread_self(), fd); |
| | | |
| | |
| | | if(NULL == data) |
| | | { |
| | | perror("readtask malloc"); |
| | | return; |
| | | return NULL; |
| | | } |
| | | |
| | | memset(data, 0 , sizeof(struct user_data)); |
| | |
| | | free(data); |
| | | data = NULL; |
| | | } |
| | | } |
| | | else if (n == 0) |
| | | { |
| | | close(fd); |
| | | mydebug("[SERVER] Info: client closed connection.\n"); |
| | | if(data != NULL) |
| | | { |
| | | free(data); |
| | | data = NULL; |
| | | } |
| | | } |
| | | else |
| | | { |
| | | data->n_size = n; |
| | | |
| | | mydebug("[SERVER] readtask %u received %d : [%d] %s\n", (uint32_t)pthread_self(), fd, data->n_size, data->line); |
| | | if (data->line[0] != '\0') |
| | | { |
| | | // switch the monitored event to EPOLLOUT and wait for the next loop to send the response |
| | | ev.data.ptr = data; |
| | | // Modify event to EPOLLOUT |
| | | ev.events = EPOLLOUT | EPOLLET; |
| | | // modify the monitored fd's event |
| | | if(epoll_ctl(epfd,EPOLL_CTL_MOD,fd,&ev)==-1) |
| | | { |
| | | perror("epoll_ctl:mod"); |
| | | if(data != NULL) |
| | | { |
| | | free(data); |
| | | data = NULL; |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | else if (n == 0) |
| | | { |
| | | close(fd); |
| | | mydebug("[SERVER] Info: client closed connection.\n"); |
| | | if(data != NULL) |
| | | { |
| | | free(data); |
| | | data = NULL; |
| | | } |
| | | } |
| | | else |
| | | { |
| | | data->n_size = n; |
| | | |
| | | mydebug("[SERVER] readtask %u received %d : [%d] %s\n", (uint32_t)pthread_self(), fd, data->n_size, data->line); |
| | | if (data->line[0] != '\0') |
| | | { |
| | | // switch the monitored event to EPOLLOUT and wait for the next loop to send the response |
| | | ev.data.ptr = data; |
| | | // Modify event to EPOLLOUT |
| | | ev.events = EPOLLOUT | EPOLLET; |
| | | // modify the monitored fd's event |
| | | if(epoll_ctl(epfd,EPOLL_CTL_MOD,fd,&ev)==-1) |
| | | { |
| | | perror("epoll_ctl:mod"); |
| | | if(data != NULL) |
| | | { |
| | | free(data); |
| | | data = NULL; |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | return NULL; |
| | | } |
| | | |
| | | static void *writetask(void *args) |
| | | { |
| | | int n = 0; |
| | | int nwrite = 0; |
| | | int ret = 0; |
| | | |
| | | // data to write back to client |
| | |
| | | |
| | | if(rdata->fd < 0) |
| | | { |
| | | goto error; |
| | | continue; |
| | | } |
| | | mydebug("[SERVER] writetask %u sending %d\n", (uint32_t)pthread_self(), rdata->fd); |
| | | mydebug("[SERVER] writetask %u fetch data %d\n", (uint32_t)pthread_self(), rdata->fd); |
| | | |
| | | writefunc wfunc = (writefunc)args; |
| | | ret = wfunc(rdata); |
| | | if(ret < 0) |
| | | if(NULL != args) |
| | | { |
| | | goto error; |
| | | args_data_st *pargs_data_st = (args_data_st *)args; |
| | | ret = pargs_data_st->wfunc(rdata, pargs_data_st->args, pargs_data_st->len); |
| | | if(ret < 0) |
| | | { |
| | | goto error; |
| | | } |
| | | } |
| | | |
| | | // delete data |
| | |
| | | error: |
| | | free(rdata); |
| | | } |
| | | |
| | | return NULL; |
| | | } |
| | | |
| | | int basic_create_ipc_server(char * unix_domain_path) |
| | | int basic_create_ipc_server(char * unix_domain_path, args_data_st * pargs_data_st) |
| | | { |
| | | pthread_t *read_tids = NULL; |
| | | pthread_t *write_tids = NULL; |
| | |
| | | socklen_t clt_len; |
| | | int ret; |
| | | int i; |
| | | char line[MAX_LEN] = {0}; |
| | | |
| | | struct task *new_task=NULL; |
| | | struct user_data *rdata=NULL; |
| | | if(0 == file_exists(unix_domain_path)) |
| | | { |
| | | unlink(unix_domain_path); |
| | | } |
| | | |
| | | if((read_tids = (pthread_t *)malloc(READ_THREAD_NUM * sizeof(pthread_t))) == NULL) |
| | | { |
| | | perror("read_tids malloc fail\n"); |
| | | if((read_tids = (pthread_t *)malloc(READ_THREAD_NUM * sizeof(pthread_t))) == NULL) |
| | | { |
| | | perror("read_tids malloc fail\n"); |
| | | return -1; |
| | | } |
| | | } |
| | | |
| | | // threads for reading thread pool |
| | | for(i = 0; i < READ_THREAD_NUM; i++) |
| | |
| | | // threads for writing thread pool |
| | | for(i = 0; i < WRITE_THREAD_NUM; i++) |
| | | { |
| | | pthread_create(&write_tids[i], NULL, writetask, proc_memfd); |
| | | pthread_create(&write_tids[i], NULL, writetask, (void *)pargs_data_st); |
| | | } |
| | | |
| | | epfd = epoll_create(MAX_EPOLL_EVENT_COUNT); |
| | |
| | | if(lsn_fd < 0) |
| | | { |
| | | perror("can't create communication socket!"); |
| | | unlink(UNIX_DOMAIN); |
| | | unlink(unix_domain_path); |
| | | return 1; |
| | | } |
| | | |
| | |
| | | |
| | | srv_addr.sun_family = AF_UNIX; |
| | | strncpy(srv_addr.sun_path, unix_domain_path, sizeof(srv_addr.sun_path) - 1); |
| | | //unlink(UNIX_DOMAIN); |
| | | |
| | | //bind sockfd and sockaddr |
| | | ret = bind(lsn_fd, (struct sockaddr*)&srv_addr, sizeof(srv_addr)); |
| | |
| | | { |
| | | perror("can't bind local sockaddr!"); |
| | | close(lsn_fd); |
| | | unlink(UNIX_DOMAIN); |
| | | unlink(unix_domain_path); |
| | | return 1; |
| | | } |
| | | |
| | |
| | | { |
| | | perror("can't listen client connect request"); |
| | | close(lsn_fd); |
| | | unlink(UNIX_DOMAIN); |
| | | unlink(unix_domain_path); |
| | | |
| | | return 1; |
| | | } |
| | |
| | | else if (events[i].events & EPOLLIN) |
| | | //write data |
| | | { |
| | | printf("EPOLLIN\n"); |
| | | if( (eventfd = events[i].data.fd) < 0 ) |
| | | mydebug("EPOLLIN\n"); |
| | | if(events[i].data.fd < 0) |
| | | { |
| | | continue; |
| | | } |
| | | |
| | | printf("[SERVER] put task %d to read queue\n", events[i].data.fd); |
| | | mydebug("[SERVER] put task %d to read queue\n", events[i].data.fd); |
| | | |
| | | new_task = (struct task *)malloc(sizeof(struct task)); |
| | | if(NULL == new_task) |
| | | { |
| | | goto error; |
| | | } |
| | | new_task->data.fd= eventfd; |
| | | new_task->data.fd= events[i].data.fd; |
| | | new_task->next = NULL; |
| | | |
| | | // protect task queue (readhead/readtail) |
| | |
| | | { |
| | | continue; |
| | | } |
| | | printf("[SERVER] put task %d to write queue\n", ((struct task*)events[i].data.ptr)->data.fd); |
| | | mydebug("[SERVER] put task %d to write queue\n", ((struct task*)events[i].data.ptr)->data.fd); |
| | | |
| | | new_task = malloc(sizeof(struct task)); |
| | | if(NULL == new_task) |
| | |
| | | } |
| | | else |
| | | { |
| | | printf("[SERVER] Error: unknown epoll event"); |
| | | mydebug("[SERVER] Error: unknown epoll event"); |
| | | } |
| | | } |
| | | } |
| | |
| | | error: |
| | | close(apt_fd); |
| | | close(lsn_fd); |
| | | unlink(UNIX_DOMAIN); |
| | | unlink(unix_domain_path); |
| | | |
| | | return 0; |
| | | } |
| | | |