From 54951dfb930bea890aef14be02236f81b3a19f2e Mon Sep 17 00:00:00 2001 From: cheliequan <liequanche@126.com> Date: 星期二, 27 十二月 2022 10:23:43 +0800 Subject: [PATCH] 更新消息通知库,使用basic_create_ipc_server函数创建消息监听服务器 --- src/ipc_server.c | 483 +--------------------------------------------------- 1 files changed, 17 insertions(+), 466 deletions(-) diff --git a/src/ipc_server.c b/src/ipc_server.c index 45f6082..99418c7 100644 --- a/src/ipc_server.c +++ b/src/ipc_server.c @@ -1,61 +1,14 @@ #include <stdio.h> #include <stdlib.h> #include <string.h> -#include <sys/socket.h> -#include <sys/un.h> #include <unistd.h> -#include <signal.h> -#include <sys/epoll.h> #include <errno.h> -#include <sys/syscall.h> -#include <sys/mman.h> -#include <sys/stat.h> -#include <fcntl.h> +#include <signal.h> #include <sys/types.h> -#include <pthread.h> +#include <sys/stat.h> +#include "memfd.h" #include "ipc_msg.h" - -#define MAX_EPOLL_EVENT_COUNT 1024 -#define errExit(msg) do { perror(msg); exit(EXIT_FAILURE); \ -} while (0) -#define LISTENQ 1024 - -// task item in thread pool -struct task -{ - // file descriptor or user_data - epoll_data_t data; - // next task - struct task* next; -}; - -struct user_data -{ - int fd; - unsigned int n_size; - char line[MAX_LEN]; -}; - -//typedef int (*readfunc)(struct user_data*); -typedef int (*writefunc)(struct user_data*); -/* thread exec function */ -void *readtask(void *args); -void *writetask(void *args); - -// mutex lock to protect readhead/readtail -pthread_mutex_t r_mutex; -pthread_cond_t r_condl; - -// mutex lock to protect writehead/writetail -pthread_mutex_t w_mutex; -pthread_cond_t w_condl; -struct task *readhead = NULL, *readtail = NULL; -struct task *writehead = NULL, *writetail = NULL; - -//create epoll -int epfd,eventfd; -struct epoll_event ev,events[LISTENQ]; int g_memfd = 0; @@ -89,23 +42,6 @@ return len; } -void setnonblocking(int sock) -{ - int opts; - - opts = fcntl(sock, F_GETFL); - if(opts<0) - { - perror("fcntl(sock,GETFL)"); - exit(1); - } - - if(fcntl(sock, F_SETFL, opts | O_NONBLOCK)<0) - { - perror("fcntl(sock,SETFL,opts)"); - exit(1); - } -} int proc_memfd(struct user_data* rdata) @@ -166,423 +102,38 @@ } - -/** - * thread exec function - */ -void *readtask(void *args) -{ - int i = 0; - int fd = -1; - unsigned int n = 0; - - struct user_data *data = NULL; - - while (1) - { - // protect task queue (readhead/readtail) - pthread_mutex_lock(&r_mutex); - - while(readhead == NULL) - { - // if condl false, will unlock mutex - mydebug("thread:%u waiting, task is NULL ... \n", (uint32_t)pthread_self()); - pthread_cond_wait(&r_condl, &r_mutex); - } - - fd = readhead->data.fd; - - struct task *tmp = readhead; - readhead = readhead->next; - free(tmp); - pthread_mutex_unlock(&r_mutex); - - mydebug("[SERVER] readtask %u handling %d\n", (uint32_t)pthread_self(), fd); - - data = (struct user_data *)malloc(sizeof(struct user_data)); - if(NULL == data) - { - perror("readtask malloc"); - return; - } - - memset(data, 0 , sizeof(struct user_data)); - data->fd = fd; - - n=0; - int nread = 0; - while((nread = read(fd,data->line+n,MAX_LEN))>0) - { - n+=nread; - } - - if((nread==-1) && (errno != EAGAIN)) - { - perror("read error"); - } - - if(n < 0) - { - if (errno == ECONNRESET) - { - close(fd); - mydebug("[SERVER] Error: readline failed: %s\n", strerror(errno)); - } - else - { - mydebug("readline error \n"); - } - if(data != NULL) - { - free(data); - data = NULL; - } - } - else if (n == 0) - { - close(fd); - mydebug("[SERVER] Info: client closed connection.\n"); - if(data != NULL) - { - free(data); - data = NULL; - } - } - else - { - data->n_size = n; - - mydebug("[SERVER] readtask %u received %d : [%d] %s\n", (uint32_t)pthread_self(), fd, data->n_size, data->line); - if (data->line[0] != '\0') - { - // modify monitored event to EPOLLOUT, wait next loop to send respond - ev.data.ptr = data; - // Modify event to EPOLLOUT - ev.events = EPOLLOUT | EPOLLET; - // modify moditored fd event - if(epoll_ctl(epfd,EPOLL_CTL_MOD,fd,&ev)==-1) - { - perror("epoll_ctl:mod"); - if(data != NULL) - { - free(data); - data = NULL; - } - } - } - } - } -} - -void *writetask(void *args) -{ - int n = 0; - int nwrite = 0; - int ret = 0; - - // data to wirte back to client - struct user_data *rdata = NULL; - while(1) - { - pthread_mutex_lock(&w_mutex); - while(writehead == NULL) - { - // if condl false, will unlock mutex - pthread_cond_wait(&w_condl, &w_mutex); - } - rdata = (struct user_data*)writehead->data.ptr; - struct task* tmp = writehead; - writehead = writehead->next; - free(tmp); - - //echo("[SERVER] thread %d writetask before unlock\n", pthread_self()); - pthread_mutex_unlock(&w_mutex); - //echo("[SERVER] thread %d writetask after unlock\n", pthread_self()); - - if(rdata->fd < 0) - { - goto error; - } - mydebug("[SERVER] writetask %u sending %d\n", (uint32_t)pthread_self(), rdata->fd); - - writefunc wfunc = (writefunc)args; - ret = wfunc(rdata); - if(ret < 0) - { - goto error; - } - - // delete data - if (rdata->fd > 0) - { - // modify monitored event to EPOLLIN, wait next loop to receive data - ev.data.fd = rdata->fd; - // monitor in message, edge trigger - //ev.events = EPOLLIN | EPOLLET; - // modify moditored fd event - if(epoll_ctl(epfd, EPOLL_CTL_DEL, rdata->fd, &ev)==-1) - { - perror("epoll_ctl:del"); - } - - close(rdata->fd); - } - -error: - free(rdata); - } -} - - +/*define you our proc_memfd funtion to send your message to other processes锛宱r just put it in shm锛� + notice the file description fd and the pid of creator to other*/ int main(int argc, char **argv) { char *name; ssize_t len; - int lsn_fd, apt_fd; - struct sockaddr_un srv_addr; - struct sockaddr_un clt_addr; - socklen_t clt_len; - int ret; - int i; - //char recv_buf[MAX_LEN] = {0}; - //char send_buf[MAX_LEN] = {0}; - char line[MAX_LEN] = {0}; - pthread_t *read_tids = NULL; - int read_thread_num = 2; - pthread_t *write_tids = NULL; - int write_thread_num = 2; - struct task *new_task=NULL; - struct user_data *rdata=NULL; - + char * unixdomain = NULL; + int ret = 0; if (argc < 3) { - fprintf(stderr, "%s name size\n", argv[0]); + fprintf(stderr, "%s name size [unix domain path]\n", argv[0]); exit(EXIT_FAILURE); } name = argv[1]; len = atoi(argv[2]) * 1024LU * 1024LU * 1024LU; - signal(SIGTERM,handler); + unixdomain = argv[3]; + signal(SIGTERM,handler); + signal(SIGABRT,handler); + signal(SIGSEGV,handler); + + g_memfd = basic_shm_create(name, len); ret = shm_write(g_memfd); - - if((read_tids = (pthread_t *)malloc(read_thread_num * sizeof(pthread_t))) == NULL) - { - perror("read_tids malloc fail\n"); - return -1; - } - - // threads for reading thread pool - for(i = 0; i < read_thread_num; i++) + ret = basic_create_ipc_server(unixdomain); + if(ret < 0) { - pthread_create(&read_tids[i], NULL, readtask, NULL); + printf("failed to create ipc_server\n"); } - if((write_tids = (pthread_t *)malloc(write_thread_num * sizeof(pthread_t))) == NULL) - { - perror("write_tids malloc fail\n"); - return -1; - } - - // threads for writing thread pool - for(i = 0; i < write_thread_num; i++) - { - pthread_create(&write_tids[i], NULL, writetask, proc_memfd); - } - - epfd = epoll_create(MAX_EPOLL_EVENT_COUNT); - - //create socket to bind local IP and PORT - lsn_fd = socket(AF_UNIX, SOCK_STREAM, 0); - - int opt = 0x1; - setsockopt(lsn_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); - - // set the descriptor as non-blocking - setnonblocking(lsn_fd); - - ev.data.fd = lsn_fd; - ev.events = EPOLLIN|EPOLLET; - - /*Control an epoll descriptor, $epfd, by requesting the operation op be performed on the target file descriptor, fd. - - $epfd is an epoll descriptor returned from epoll_create. - $op is one of EPOLL_CTL_ADD, EPOLL_CTL_MOD or EPOLL_CTL_DEL. - $fd is the file desciptor to be watched. - $eventmask is a bitmask of events defined by EPOLLIN, EPOLLOUT, etc. - - When successful, epoll_ctl returns 0. When an error occurs, epoll_ctl returns -1 and errno is set appropriately. - */ - if(epoll_ctl(epfd,EPOLL_CTL_ADD,lsn_fd,&ev)==-1) - { - perror("epoll_ctl:add"); - } - - if(lsn_fd < 0) - { - perror("can't create communication socket!"); - unlink(UNIX_DOMAIN); - basic_shm_close(g_memfd); - return 1; - } - - //create local IP and PORT - srv_addr.sun_family = AF_UNIX; - strncpy(srv_addr.sun_path, UNIX_DOMAIN, sizeof(srv_addr.sun_path) - 1); - //unlink(UNIX_DOMAIN); - - //bind sockfd and sockaddr - ret = bind(lsn_fd, (struct sockaddr*)&srv_addr, sizeof(srv_addr)); - if(ret == -1) - { - perror("can't bind local sockaddr!"); - close(lsn_fd); - unlink(UNIX_DOMAIN); - basic_shm_close(g_memfd); - return 1; - } - - //listen lsn_fd, try listen 5 - - ret = listen(lsn_fd, LISTENQ); - if(ret == -1) - { - perror("can't listen client connect request"); - close(lsn_fd); - unlink(UNIX_DOMAIN); - basic_shm_close(g_memfd); - - return 1; - } - - clt_len = sizeof(clt_addr); - while(1) - { - int nfds = epoll_wait(epfd,events,1024,100); - int i=0; - for(i=0;i<nfds;++i) - { - if(events[i].data.fd == lsn_fd) - { - // accept the client connection - while((apt_fd=accept(lsn_fd, (struct sockaddr *)&clt_addr, &clt_len))>0) - { - setnonblocking(apt_fd); - //char *str = inet_ntoa(clt_addr.sun_path); - //printf("[SERVER] connect from %s \n", str); - ev.data.fd = apt_fd; - // monitor in message, edge trigger - ev.events = EPOLLIN | EPOLLET; - // add fd to epoll queue - if(epoll_ctl(epfd,EPOLL_CTL_ADD,apt_fd,&ev)==-1) - { - perror("epoll_ctl:add"); - continue; - //exit(EXIT_FAILURE); - } - } - if( apt_fd == -1) - { - if((errno != EAGAIN ) - && (errno!= ECONNABORTED) - && (errno != EPROTO) - && (errno != EINTR)) - { - perror("accept"); - - } - } - continue; - } - else if (events[i].events & EPOLLIN) - //write鏁版嵁 - { - printf("EPOLLIN\n"); - if( (eventfd = events[i].data.fd) < 0 ) - { - continue; - } - - printf("[SERVER] put task %d to read queue\n", events[i].data.fd); - - new_task = (struct task *)malloc(sizeof(struct task)); - if(NULL == new_task) - { - goto error; - } - new_task->data.fd= eventfd; - new_task->next = NULL; - - // protect task queue (readhead/readtail) - pthread_mutex_lock(&r_mutex); - - // the queue is empty - if(readhead == NULL) - { - readhead = new_task; - readtail = new_task; - } - // queue is not empty - else - { - readtail->next = new_task; - readtail = new_task; - } - - // trigger readtask thread - pthread_cond_broadcast(&r_condl); - pthread_mutex_unlock(&r_mutex); - } - else if (events[i].events & EPOLLOUT) - { - rdata = (struct user_data *)events[i].data.ptr; - if ( NULL == rdata ) - { - continue; - } - printf("[SERVER] put task %d to write queue\n", ((struct task*)events[i].data.ptr)->data.fd); - - new_task = malloc(sizeof(struct task)); - if(NULL == new_task) - { - goto error; - } - new_task->data.ptr = (struct user_data*)events[i].data.ptr; - new_task->next = NULL; - - pthread_mutex_lock(&w_mutex); - - // the queue is empty - if (writehead == NULL) - { - writehead = new_task; - writetail = new_task; - } - // queue is not empty - else - { - writetail->next = new_task; - writetail = new_task; - } - - // trigger writetask thread - pthread_cond_broadcast(&w_condl); - - pthread_mutex_unlock(&w_mutex); - } - else - { - printf("[SERVER] Error: unknown epoll event"); - } - } - } - -error: - close(apt_fd); - close(lsn_fd); - unlink(UNIX_DOMAIN); basic_shm_close(g_memfd); return 0; } -- Gitblit v1.8.0