| | |
| | | #include <stdio.h> |
| | | #include <stdlib.h> |
| | | #include <string.h> |
| | | #include <sys/socket.h> |
| | | #include <sys/un.h> |
| | | #include <unistd.h> |
| | | #include <signal.h> |
| | | #include <sys/epoll.h> |
| | | #include <errno.h> |
| | | #include <sys/syscall.h> |
| | | #include <sys/mman.h> |
| | | #include <sys/stat.h> |
| | | #include <fcntl.h> |
| | | #include <signal.h> |
| | | #include <sys/types.h> |
| | | #include <pthread.h> |
| | | #include <sys/stat.h> |
| | | |
| | | #include "memfd.h" |
| | | #include "ipc_msg.h" |
| | | |
| | | #define MAX_EPOLL_EVENT_COUNT 1024 |
| | | #define errExit(msg) do { perror(msg); exit(EXIT_FAILURE); \ |
| | | } while (0) |
| | | #define LISTENQ 1024 |
| | | |
/*
 * One node of a worker queue (singly linked list).
 * The read queue stores a descriptor in data.fd; the write queue stores a
 * struct user_data pointer in data.ptr.
 */
struct task {
    epoll_data_t data;  /* payload: file descriptor or user_data pointer */
    struct task *next;  /* following task, NULL at the end of the queue */
};
| | | |
| | | struct user_data |
| | | { |
| | | int fd; |
| | | unsigned int n_size; |
| | | char line[MAX_LEN]; |
| | | }; |
| | | |
/* Callback run by writetask() on each queued message; a negative return
 * value means failure. */
typedef int (*writefunc)(struct user_data*);

/* thread entry points */
void *readtask(void *args);
void *writetask(void *args);

/*
 * The locks and condition variables below are statically initialized:
 * nothing in this file calls pthread_mutex_init()/pthread_cond_init(), and
 * POSIX leaves the use of an uninitialized mutex or condition undefined.
 */

/* protects readhead/readtail; r_condl is signalled when a read task is queued */
pthread_mutex_t r_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t r_condl = PTHREAD_COND_INITIALIZER;

/* protects writehead/writetail; w_condl is signalled when a write task is queued */
pthread_mutex_t w_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t w_condl = PTHREAD_COND_INITIALIZER;

/* FIFO of pending read tasks (payload: data.fd) */
struct task *readhead = NULL, *readtail = NULL;
/* FIFO of pending write tasks (payload: data.ptr -> struct user_data) */
struct task *writehead = NULL, *writetail = NULL;
| | | |
| | | //create epoll |
| | | int epfd,eventfd; |
| | | struct epoll_event ev,events[LISTENQ]; |
| | | |
| | | int g_memfd = 0; |
| | | |
| | |
| | | return len; |
| | | } |
| | | |
/*
 * Put descriptor `sock` into non-blocking mode by adding O_NONBLOCK to its
 * file status flags. If either fcntl() call fails, the error is reported
 * with perror() and the process exits with status 1.
 */
void setnonblocking(int sock)
{
    const int current_flags = fcntl(sock, F_GETFL);

    if (current_flags < 0)
    {
        perror("fcntl(sock,GETFL)");
        exit(1);
    }

    const int rc = fcntl(sock, F_SETFL, current_flags | O_NONBLOCK);

    if (rc >= 0)
    {
        return;
    }

    perror("fcntl(sock,SETFL,opts)");
    exit(1);
}
| | | |
| | | |
| | | int proc_memfd(struct user_data* rdata) |
| | |
| | | |
| | | } |
| | | |
| | | |
| | | /** |
| | | * thread exec function |
| | | */ |
| | | void *readtask(void *args) |
| | | { |
| | | int i = 0; |
| | | int fd = -1; |
| | | unsigned int n = 0; |
| | | |
| | | struct user_data *data = NULL; |
| | | |
| | | while (1) |
| | | { |
| | | // protect task queue (readhead/readtail) |
| | | pthread_mutex_lock(&r_mutex); |
| | | |
| | | while(readhead == NULL) |
| | | { |
| | | // if condl false, will unlock mutex |
| | | mydebug("thread:%u waiting, task is NULL ... \n", (uint32_t)pthread_self()); |
| | | pthread_cond_wait(&r_condl, &r_mutex); |
| | | } |
| | | |
| | | fd = readhead->data.fd; |
| | | |
| | | struct task *tmp = readhead; |
| | | readhead = readhead->next; |
| | | free(tmp); |
| | | pthread_mutex_unlock(&r_mutex); |
| | | |
| | | mydebug("[SERVER] readtask %u handling %d\n", (uint32_t)pthread_self(), fd); |
| | | |
| | | data = (struct user_data *)malloc(sizeof(struct user_data)); |
| | | if(NULL == data) |
| | | { |
| | | perror("readtask malloc"); |
| | | return; |
| | | } |
| | | |
| | | memset(data, 0 , sizeof(struct user_data)); |
| | | data->fd = fd; |
| | | |
| | | n=0; |
| | | int nread = 0; |
| | | while((nread = read(fd,data->line+n,MAX_LEN))>0) |
| | | { |
| | | n+=nread; |
| | | } |
| | | |
| | | if((nread==-1) && (errno != EAGAIN)) |
| | | { |
| | | perror("read error"); |
| | | } |
| | | |
| | | if(n < 0) |
| | | { |
| | | if (errno == ECONNRESET) |
| | | { |
| | | close(fd); |
| | | mydebug("[SERVER] Error: readline failed: %s\n", strerror(errno)); |
| | | } |
| | | else |
| | | { |
| | | mydebug("readline error \n"); |
| | | } |
| | | if(data != NULL) |
| | | { |
| | | free(data); |
| | | data = NULL; |
| | | } |
| | | } |
| | | else if (n == 0) |
| | | { |
| | | close(fd); |
| | | mydebug("[SERVER] Info: client closed connection.\n"); |
| | | if(data != NULL) |
| | | { |
| | | free(data); |
| | | data = NULL; |
| | | } |
| | | } |
| | | else |
| | | { |
| | | data->n_size = n; |
| | | |
| | | mydebug("[SERVER] readtask %u received %d : [%d] %s\n", (uint32_t)pthread_self(), fd, data->n_size, data->line); |
| | | if (data->line[0] != '\0') |
| | | { |
| | | // modify monitored event to EPOLLOUT, wait next loop to send respond |
| | | ev.data.ptr = data; |
| | | // Modify event to EPOLLOUT |
| | | ev.events = EPOLLOUT | EPOLLET; |
| | | // modify moditored fd event |
| | | if(epoll_ctl(epfd,EPOLL_CTL_MOD,fd,&ev)==-1) |
| | | { |
| | | perror("epoll_ctl:mod"); |
| | | if(data != NULL) |
| | | { |
| | | free(data); |
| | | data = NULL; |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | void *writetask(void *args) |
| | | { |
| | | int n = 0; |
| | | int nwrite = 0; |
| | | int ret = 0; |
| | | |
| | | // data to wirte back to client |
| | | struct user_data *rdata = NULL; |
| | | while(1) |
| | | { |
| | | pthread_mutex_lock(&w_mutex); |
| | | while(writehead == NULL) |
| | | { |
| | | // if condl false, will unlock mutex |
| | | pthread_cond_wait(&w_condl, &w_mutex); |
| | | } |
| | | rdata = (struct user_data*)writehead->data.ptr; |
| | | struct task* tmp = writehead; |
| | | writehead = writehead->next; |
| | | free(tmp); |
| | | |
| | | //echo("[SERVER] thread %d writetask before unlock\n", pthread_self()); |
| | | pthread_mutex_unlock(&w_mutex); |
| | | //echo("[SERVER] thread %d writetask after unlock\n", pthread_self()); |
| | | |
| | | if(rdata->fd < 0) |
| | | { |
| | | goto error; |
| | | } |
| | | mydebug("[SERVER] writetask %u sending %d\n", (uint32_t)pthread_self(), rdata->fd); |
| | | |
| | | writefunc wfunc = (writefunc)args; |
| | | ret = wfunc(rdata); |
| | | if(ret < 0) |
| | | { |
| | | goto error; |
| | | } |
| | | |
| | | // delete data |
| | | if (rdata->fd > 0) |
| | | { |
| | | // modify monitored event to EPOLLIN, wait next loop to receive data |
| | | ev.data.fd = rdata->fd; |
| | | // monitor in message, edge trigger |
| | | //ev.events = EPOLLIN | EPOLLET; |
| | | // modify moditored fd event |
| | | if(epoll_ctl(epfd, EPOLL_CTL_DEL, rdata->fd, &ev)==-1) |
| | | { |
| | | perror("epoll_ctl:del"); |
| | | } |
| | | |
| | | close(rdata->fd); |
| | | } |
| | | |
| | | error: |
| | | free(rdata); |
| | | } |
| | | } |
| | | |
| | | |
| | | /*define you our proc_memfd funtion to send your message to other processes,or just put it in shm, |
| | | notice the file description fd and the pid of creator to other*/ |
| | | int main(int argc, char **argv) |
| | | { |
| | | char *name; |
| | | ssize_t len; |
| | | int lsn_fd, apt_fd; |
| | | struct sockaddr_un srv_addr; |
| | | struct sockaddr_un clt_addr; |
| | | socklen_t clt_len; |
| | | int ret; |
| | | int i; |
| | | //char recv_buf[MAX_LEN] = {0}; |
| | | //char send_buf[MAX_LEN] = {0}; |
| | | char line[MAX_LEN] = {0}; |
| | | pthread_t *read_tids = NULL; |
| | | int read_thread_num = 2; |
| | | pthread_t *write_tids = NULL; |
| | | int write_thread_num = 2; |
| | | struct task *new_task=NULL; |
| | | struct user_data *rdata=NULL; |
| | | |
| | | char * unixdomain = NULL; |
| | | int ret = 0; |
| | | |
| | | if (argc < 3) { |
| | | fprintf(stderr, "%s name size\n", argv[0]); |
| | | fprintf(stderr, "%s name size [unix domain path]\n", argv[0]); |
| | | exit(EXIT_FAILURE); |
| | | } |
| | | |
| | | name = argv[1]; |
| | | len = atoi(argv[2]) * 1024LU * 1024LU * 1024LU; |
| | | signal(SIGTERM,handler); |
| | | unixdomain = argv[3]; |
| | | |
| | | signal(SIGTERM,handler); |
| | | signal(SIGABRT,handler); |
| | | signal(SIGSEGV,handler); |
| | | |
| | | |
| | | g_memfd = basic_shm_create(name, len); |
| | | ret = shm_write(g_memfd); |
| | | |
| | | |
| | | if((read_tids = (pthread_t *)malloc(read_thread_num * sizeof(pthread_t))) == NULL) |
| | | { |
| | | perror("read_tids malloc fail\n"); |
| | | return -1; |
| | | } |
| | | |
| | | // threads for reading thread pool |
| | | for(i = 0; i < read_thread_num; i++) |
| | | ret = basic_create_ipc_server(unixdomain); |
| | | if(ret < 0) |
| | | { |
| | | pthread_create(&read_tids[i], NULL, readtask, NULL); |
| | | printf("failed to create ipc_server\n"); |
| | | } |
| | | |
| | | if((write_tids = (pthread_t *)malloc(write_thread_num * sizeof(pthread_t))) == NULL) |
| | | { |
| | | perror("write_tids malloc fail\n"); |
| | | return -1; |
| | | } |
| | | |
| | | // threads for writing thread pool |
| | | for(i = 0; i < write_thread_num; i++) |
| | | { |
| | | pthread_create(&write_tids[i], NULL, writetask, proc_memfd); |
| | | } |
| | | |
| | | epfd = epoll_create(MAX_EPOLL_EVENT_COUNT); |
| | | |
| | | //create socket to bind local IP and PORT |
| | | lsn_fd = socket(AF_UNIX, SOCK_STREAM, 0); |
| | | |
| | | int opt = 0x1; |
| | | setsockopt(lsn_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); |
| | | |
| | | // set the descriptor as non-blocking |
| | | setnonblocking(lsn_fd); |
| | | |
| | | ev.data.fd = lsn_fd; |
| | | ev.events = EPOLLIN|EPOLLET; |
| | | |
| | | /*Control an epoll descriptor, $epfd, by requesting the operation op be performed on the target file descriptor, fd. |
| | | |
| | | $epfd is an epoll descriptor returned from epoll_create. |
| | | $op is one of EPOLL_CTL_ADD, EPOLL_CTL_MOD or EPOLL_CTL_DEL. |
| | | $fd is the file desciptor to be watched. |
| | | $eventmask is a bitmask of events defined by EPOLLIN, EPOLLOUT, etc. |
| | | |
| | | When successful, epoll_ctl returns 0. When an error occurs, epoll_ctl returns -1 and errno is set appropriately. |
| | | */ |
| | | if(epoll_ctl(epfd,EPOLL_CTL_ADD,lsn_fd,&ev)==-1) |
| | | { |
| | | perror("epoll_ctl:add"); |
| | | } |
| | | |
| | | if(lsn_fd < 0) |
| | | { |
| | | perror("can't create communication socket!"); |
| | | unlink(UNIX_DOMAIN); |
| | | basic_shm_close(g_memfd); |
| | | return 1; |
| | | } |
| | | |
| | | //create local IP and PORT |
| | | srv_addr.sun_family = AF_UNIX; |
| | | strncpy(srv_addr.sun_path, UNIX_DOMAIN, sizeof(srv_addr.sun_path) - 1); |
| | | //unlink(UNIX_DOMAIN); |
| | | |
| | | //bind sockfd and sockaddr |
| | | ret = bind(lsn_fd, (struct sockaddr*)&srv_addr, sizeof(srv_addr)); |
| | | if(ret == -1) |
| | | { |
| | | perror("can't bind local sockaddr!"); |
| | | close(lsn_fd); |
| | | unlink(UNIX_DOMAIN); |
| | | basic_shm_close(g_memfd); |
| | | return 1; |
| | | } |
| | | |
| | | //listen lsn_fd, try listen 5 |
| | | |
| | | ret = listen(lsn_fd, LISTENQ); |
| | | if(ret == -1) |
| | | { |
| | | perror("can't listen client connect request"); |
| | | close(lsn_fd); |
| | | unlink(UNIX_DOMAIN); |
| | | basic_shm_close(g_memfd); |
| | | |
| | | return 1; |
| | | } |
| | | |
| | | clt_len = sizeof(clt_addr); |
| | | while(1) |
| | | { |
| | | int nfds = epoll_wait(epfd,events,1024,100); |
| | | int i=0; |
| | | for(i=0;i<nfds;++i) |
| | | { |
| | | if(events[i].data.fd == lsn_fd) |
| | | { |
| | | // accept the client connection |
| | | while((apt_fd=accept(lsn_fd, (struct sockaddr *)&clt_addr, &clt_len))>0) |
| | | { |
| | | setnonblocking(apt_fd); |
| | | //char *str = inet_ntoa(clt_addr.sun_path); |
| | | //printf("[SERVER] connect from %s \n", str); |
| | | ev.data.fd = apt_fd; |
| | | // monitor in message, edge trigger |
| | | ev.events = EPOLLIN | EPOLLET; |
| | | // add fd to epoll queue |
| | | if(epoll_ctl(epfd,EPOLL_CTL_ADD,apt_fd,&ev)==-1) |
| | | { |
| | | perror("epoll_ctl:add"); |
| | | continue; |
| | | //exit(EXIT_FAILURE); |
| | | } |
| | | } |
| | | if( apt_fd == -1) |
| | | { |
| | | if((errno != EAGAIN ) |
| | | && (errno!= ECONNABORTED) |
| | | && (errno != EPROTO) |
| | | && (errno != EINTR)) |
| | | { |
| | | perror("accept"); |
| | | |
| | | } |
| | | } |
| | | continue; |
| | | } |
| | | else if (events[i].events & EPOLLIN) |
| | | //write数据 |
| | | { |
| | | printf("EPOLLIN\n"); |
| | | if( (eventfd = events[i].data.fd) < 0 ) |
| | | { |
| | | continue; |
| | | } |
| | | |
| | | printf("[SERVER] put task %d to read queue\n", events[i].data.fd); |
| | | |
| | | new_task = (struct task *)malloc(sizeof(struct task)); |
| | | if(NULL == new_task) |
| | | { |
| | | goto error; |
| | | } |
| | | new_task->data.fd= eventfd; |
| | | new_task->next = NULL; |
| | | |
| | | // protect task queue (readhead/readtail) |
| | | pthread_mutex_lock(&r_mutex); |
| | | |
| | | // the queue is empty |
| | | if(readhead == NULL) |
| | | { |
| | | readhead = new_task; |
| | | readtail = new_task; |
| | | } |
| | | // queue is not empty |
| | | else |
| | | { |
| | | readtail->next = new_task; |
| | | readtail = new_task; |
| | | } |
| | | |
| | | // trigger readtask thread |
| | | pthread_cond_broadcast(&r_condl); |
| | | pthread_mutex_unlock(&r_mutex); |
| | | } |
| | | else if (events[i].events & EPOLLOUT) |
| | | { |
| | | rdata = (struct user_data *)events[i].data.ptr; |
| | | if ( NULL == rdata ) |
| | | { |
| | | continue; |
| | | } |
| | | printf("[SERVER] put task %d to write queue\n", ((struct task*)events[i].data.ptr)->data.fd); |
| | | |
| | | new_task = malloc(sizeof(struct task)); |
| | | if(NULL == new_task) |
| | | { |
| | | goto error; |
| | | } |
| | | new_task->data.ptr = (struct user_data*)events[i].data.ptr; |
| | | new_task->next = NULL; |
| | | |
| | | pthread_mutex_lock(&w_mutex); |
| | | |
| | | // the queue is empty |
| | | if (writehead == NULL) |
| | | { |
| | | writehead = new_task; |
| | | writetail = new_task; |
| | | } |
| | | // queue is not empty |
| | | else |
| | | { |
| | | writetail->next = new_task; |
| | | writetail = new_task; |
| | | } |
| | | |
| | | // trigger writetask thread |
| | | pthread_cond_broadcast(&w_condl); |
| | | |
| | | pthread_mutex_unlock(&w_mutex); |
| | | } |
| | | else |
| | | { |
| | | printf("[SERVER] Error: unknown epoll event"); |
| | | } |
| | | } |
| | | } |
| | | |
| | | error: |
| | | close(apt_fd); |
| | | close(lsn_fd); |
| | | unlink(UNIX_DOMAIN); |
| | | basic_shm_close(g_memfd); |
| | | return 0; |
| | | } |