#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "ipc_msg.h" #include "memfd.h" #define MAX_EPOLL_EVENT_COUNT 1024 #define LISTENQ 1024 // task item in thread pool struct task { // file descriptor or user_data epoll_data_t data; // next task struct task* next; }; /* thread exec function */ static void *readtask(void *args); static void *writetask(void *args); // mutex lock to protect readhead/readtail pthread_mutex_t r_mutex; pthread_cond_t r_condl; // mutex lock to protect writehead/writetail pthread_mutex_t w_mutex; pthread_cond_t w_condl; struct task *readhead = NULL, *readtail = NULL; struct task *writehead = NULL, *writetail = NULL; //create epoll int epfd,eventfd; struct epoll_event ev,events[LISTENQ]; static void setnonblocking(int sock) { int opts; opts = fcntl(sock, F_GETFL); if(opts<0) { perror("fcntl(sock,GETFL)"); exit(1); } if(fcntl(sock, F_SETFL, opts | O_NONBLOCK)<0) { perror("fcntl(sock,SETFL,opts)"); exit(1); } } /** * thread exec function */ static void *readtask(void *args) { int fd = -1; unsigned int n = 0; struct user_data *data = NULL; while (1) { // protect task queue (readhead/readtail) pthread_mutex_lock(&r_mutex); while(readhead == NULL) { // if condl false, will unlock mutex mydebug("thread:%u waiting, readtask is NULL ... \n", (uint32_t)pthread_self()); pthread_cond_wait(&r_condl, &r_mutex); } fd = readhead->data.fd; struct task *tmp = readhead; readhead = readhead->next; free(tmp); pthread_mutex_unlock(&r_mutex); if(fd < 0) { perror("readtask fd<0"); continue; } mydebug("[SERVER] readtask %u handling %d\n", (uint32_t)pthread_self(), fd); data = (struct user_data *)malloc(sizeof(struct user_data)); if(NULL == data) { perror("readtask malloc"); return NULL; } memset(data, 0 , sizeof(struct user_data)); data->fd = fd; n=0; int nread = 0; while((nread = read(fd,data->line+n,MAX_LEN))>0) { n+=nread; } if((nread==-1) && (errno != EAGAIN)) { perror("read error"); } if(n < 0) { if (errno == ECONNRESET) { close(fd); mydebug("[SERVER] Error: readline failed: %s\n", strerror(errno)); } else { mydebug("readline error \n"); } if(data != NULL) { free(data); data = NULL; } } else if (n == 0) { close(fd); mydebug("[SERVER] Info: client closed connection.\n"); if(data != NULL) { free(data); data = NULL; } } else { data->n_size = n; mydebug("[SERVER] readtask %u received %d : [%d] %s\n", (uint32_t)pthread_self(), fd, data->n_size, data->line); if (data->line[0] != '\0') { // modify monitored event to EPOLLOUT, wait next loop to send respond ev.data.ptr = data; // Modify event to EPOLLOUT ev.events = EPOLLOUT | EPOLLET; // modify moditored fd event if(epoll_ctl(epfd,EPOLL_CTL_MOD,fd,&ev)==-1) { perror("epoll_ctl:mod"); if(data != NULL) { free(data); data = NULL; } } } } } return NULL; } static void *writetask(void *args) { int ret = 0; // data to wirte back to client struct user_data *rdata = NULL; while(1) { pthread_mutex_lock(&w_mutex); while(writehead == NULL) { // if condl false, will unlock mutex pthread_cond_wait(&w_condl, &w_mutex); } rdata = (struct user_data*)writehead->data.ptr; struct task* tmp = writehead; writehead = writehead->next; free(tmp); //echo("[SERVER] thread %d writetask before unlock\n", pthread_self()); pthread_mutex_unlock(&w_mutex); //echo("[SERVER] thread %d writetask after unlock\n", pthread_self()); if(rdata->fd < 0) { continue; } mydebug("[SERVER] writetask %u fetch data %d\n", (uint32_t)pthread_self(), rdata->fd); if(NULL != args) { args_data_st *pargs_data_st = (args_data_st *)args; ret = pargs_data_st->wfunc(rdata, pargs_data_st->args, pargs_data_st->len); if(ret < 0) { goto error; } } // delete data if (rdata->fd > 0) { // modify monitored event to EPOLLIN, wait next loop to receive data ev.data.fd = rdata->fd; // monitor in message, edge trigger //ev.events = EPOLLIN | EPOLLET; // modify moditored fd event if(epoll_ctl(epfd, EPOLL_CTL_DEL, rdata->fd, &ev)==-1) { perror("epoll_ctl:del"); } close(rdata->fd); } error: free(rdata); } return NULL; } int basic_create_ipc_server(char * unix_domain_path, args_data_st * pargs_data_st) { pthread_t *read_tids = NULL; pthread_t *write_tids = NULL; int lsn_fd, apt_fd; struct sockaddr_un srv_addr; struct sockaddr_un clt_addr; socklen_t clt_len; int ret; int i; struct task *new_task=NULL; struct user_data *rdata=NULL; if(0 == file_exists(unix_domain_path)) { unlink(unix_domain_path); } if((read_tids = (pthread_t *)malloc(READ_THREAD_NUM * sizeof(pthread_t))) == NULL) { perror("read_tids malloc fail\n"); return -1; } // threads for reading thread pool for(i = 0; i < READ_THREAD_NUM; i++) { pthread_create(&read_tids[i], NULL, readtask, NULL); } if((write_tids = (pthread_t *)malloc(WRITE_THREAD_NUM * sizeof(pthread_t))) == NULL) { perror("write_tids malloc fail\n"); return -1; } // threads for writing thread pool for(i = 0; i < WRITE_THREAD_NUM; i++) { pthread_create(&write_tids[i], NULL, writetask, (void *)pargs_data_st); } epfd = epoll_create(MAX_EPOLL_EVENT_COUNT); //create socket to bind local IP and PORT lsn_fd = socket(AF_UNIX, SOCK_STREAM, 0); // set the descriptor as non-blocking setnonblocking(lsn_fd); ev.data.fd = lsn_fd; ev.events = EPOLLIN|EPOLLET; /*Control an epoll descriptor, $epfd, by requesting the operation op be performed on the target file descriptor, fd. $epfd is an epoll descriptor returned from epoll_create. $op is one of EPOLL_CTL_ADD, EPOLL_CTL_MOD or EPOLL_CTL_DEL. $fd is the file desciptor to be watched. $eventmask is a bitmask of events defined by EPOLLIN, EPOLLOUT, etc. When successful, epoll_ctl returns 0. When an error occurs, epoll_ctl returns -1 and errno is set appropriately. */ if(epoll_ctl(epfd,EPOLL_CTL_ADD,lsn_fd,&ev)==-1) { perror("epoll_ctl:add"); } if(lsn_fd < 0) { perror("can't create communication socket!"); unlink(unix_domain_path); return 1; } //create unix domain if(unix_domain_path == NULL) { unix_domain_path = UNIX_DOMAIN; } srv_addr.sun_family = AF_UNIX; strncpy(srv_addr.sun_path, unix_domain_path, sizeof(srv_addr.sun_path) - 1); //bind sockfd and sockaddr ret = bind(lsn_fd, (struct sockaddr*)&srv_addr, sizeof(srv_addr)); if(ret == -1) { perror("can't bind local sockaddr!"); close(lsn_fd); unlink(unix_domain_path); return 1; } //listen lsn_fd ret = listen(lsn_fd, LISTENQ); if(ret == -1) { perror("can't listen client connect request"); close(lsn_fd); unlink(unix_domain_path); return 1; } clt_len = sizeof(clt_addr); while(1) { int nfds = epoll_wait(epfd,events,1024,100); int i=0; for(i=0;i0) { setnonblocking(apt_fd); //char *str = inet_ntoa(clt_addr.sun_path); //printf("[SERVER] connect from %s \n", str); ev.data.fd = apt_fd; // monitor in message, edge trigger ev.events = EPOLLIN | EPOLLET; // add fd to epoll queue if(epoll_ctl(epfd,EPOLL_CTL_ADD,apt_fd,&ev)==-1) { perror("epoll_ctl:add"); continue; //exit(EXIT_FAILURE); } } if( apt_fd == -1) { if((errno != EAGAIN ) && (errno!= ECONNABORTED) && (errno != EPROTO) && (errno != EINTR)) { perror("accept"); } } continue; } else if (events[i].events & EPOLLIN) //write数据 { mydebug("EPOLLIN\n"); if(events[i].data.fd < 0) { continue; } mydebug("[SERVER] put task %d to read queue\n", events[i].data.fd); new_task = (struct task *)malloc(sizeof(struct task)); if(NULL == new_task) { goto error; } new_task->data.fd= events[i].data.fd; new_task->next = NULL; // protect task queue (readhead/readtail) pthread_mutex_lock(&r_mutex); // the queue is empty if(readhead == NULL) { readhead = new_task; readtail = new_task; } // queue is not empty else { readtail->next = new_task; readtail = new_task; } // trigger readtask thread pthread_cond_broadcast(&r_condl); pthread_mutex_unlock(&r_mutex); } else if (events[i].events & EPOLLOUT) { rdata = (struct user_data *)events[i].data.ptr; if ( NULL == rdata ) { continue; } mydebug("[SERVER] put task %d to write queue\n", ((struct task*)events[i].data.ptr)->data.fd); new_task = malloc(sizeof(struct task)); if(NULL == new_task) { goto error; } new_task->data.ptr = (struct user_data*)events[i].data.ptr; new_task->next = NULL; pthread_mutex_lock(&w_mutex); // the queue is empty if (writehead == NULL) { writehead = new_task; writetail = new_task; } // queue is not empty else { writetail->next = new_task; writetail = new_task; } // trigger writetask thread pthread_cond_broadcast(&w_condl); pthread_mutex_unlock(&w_mutex); } else { mydebug("[SERVER] Error: unknown epoll event"); } } } error: close(apt_fd); close(lsn_fd); unlink(unix_domain_path); return 0; }