From 54951dfb930bea890aef14be02236f81b3a19f2e Mon Sep 17 00:00:00 2001
From: cheliequan <liequanche@126.com>
Date: 星期二, 27 十二月 2022 10:23:43 +0800
Subject: [PATCH] 更新消息通知库,使用basic_create_ipc_server函数创建消息监听服务器

---
 src/ipc_server.c |  483 +---------------------------------------------------
 1 files changed, 17 insertions(+), 466 deletions(-)

diff --git a/src/ipc_server.c b/src/ipc_server.c
index 45f6082..99418c7 100644
--- a/src/ipc_server.c
+++ b/src/ipc_server.c
@@ -1,61 +1,14 @@
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
-#include <sys/socket.h>
-#include <sys/un.h>
 #include <unistd.h>
-#include <signal.h>
-#include <sys/epoll.h>
 #include <errno.h>
-#include <sys/syscall.h>
-#include <sys/mman.h>
-#include <sys/stat.h>
-#include <fcntl.h>
+#include <signal.h>
 #include <sys/types.h>
-#include <pthread.h>
+#include <sys/stat.h>
 
+#include "memfd.h"
 #include "ipc_msg.h"
-
-#define  MAX_EPOLL_EVENT_COUNT 1024
-#define errExit(msg)    do { perror(msg); exit(EXIT_FAILURE); \
-} while (0)
-#define LISTENQ 1024
-
-// task item in thread pool
-struct task                          
-{
-    // file descriptor or user_data
-    epoll_data_t data;        
-    // next task
-    struct task* next;               
-};
-
-struct user_data
-{
-  int fd;
-  unsigned int n_size;
-  char line[MAX_LEN];
-};
-
-//typedef int (*readfunc)(struct user_data*);
-typedef int (*writefunc)(struct user_data*);
-/* thread exec function */
-void *readtask(void *args);
-void *writetask(void *args);
-
-// mutex lock to protect readhead/readtail
-pthread_mutex_t r_mutex;             
-pthread_cond_t r_condl;
-
-// mutex lock to protect writehead/writetail
-pthread_mutex_t w_mutex;             
-pthread_cond_t w_condl;
-struct task *readhead = NULL, *readtail = NULL;
-struct task *writehead = NULL, *writetail = NULL;
-
-//create epoll
-int epfd,eventfd;
-struct epoll_event   ev,events[LISTENQ];
 
 int g_memfd = 0;
 
@@ -89,23 +42,6 @@
   return len;
 }
 
-void setnonblocking(int sock)
-{
-  int opts;
-
-  opts = fcntl(sock, F_GETFL);
-  if(opts<0)
-   {
-     perror("fcntl(sock,GETFL)");
-     exit(1); 
-   }
-
-  if(fcntl(sock, F_SETFL, opts | O_NONBLOCK)<0)
-   {
-    perror("fcntl(sock,SETFL,opts)");
-    exit(1);
-   }
-}
 
 
 int proc_memfd(struct user_data* rdata)
@@ -166,423 +102,38 @@
     
 }
 
-
-/**
- * thread exec function
- */
-void *readtask(void *args) 
-{
-  int i = 0;
-  int fd = -1;
-  unsigned int n = 0;
-
-  struct user_data *data = NULL;
-
-  while (1)
-   {
-    // protect task queue (readhead/readtail)
-     pthread_mutex_lock(&r_mutex);
-
-     while(readhead == NULL)
-      {
-       // if condl false, will unlock mutex
-       mydebug("thread:%u waiting, task is NULL ... \n", (uint32_t)pthread_self());
-       pthread_cond_wait(&r_condl, &r_mutex);
-      }
-
-     fd = readhead->data.fd;
-
-      struct task *tmp = readhead;
-      readhead = readhead->next;
-      free(tmp);
-      pthread_mutex_unlock(&r_mutex);
-
-      mydebug("[SERVER] readtask %u handling %d\n", (uint32_t)pthread_self(), fd);
-
-      data = (struct user_data *)malloc(sizeof(struct user_data));
-      if(NULL == data)
-      {
-          perror("readtask malloc");
-          return;
-      }
-      
-      memset(data, 0 , sizeof(struct user_data));
-      data->fd = fd;
-
-      n=0;
-      int nread = 0;
-      while((nread = read(fd,data->line+n,MAX_LEN))>0)
-      {
-          n+=nread;
-      }
-
-      if((nread==-1) && (errno != EAGAIN))
-      {
-        perror("read error");
-      }
-
-      if(n < 0)
-      {
-        if (errno == ECONNRESET)
-          {
-            close(fd);
-            mydebug("[SERVER] Error: readline failed: %s\n", strerror(errno));           
-          }
-          else
-          {
-              mydebug("readline error \n");
-          }
-          if(data != NULL)
-          {
-              free(data);
-              data = NULL;
-          }
-        }		  
-        else if (n == 0)
-        {
-            close(fd);
-            mydebug("[SERVER] Info: client closed connection.\n");
-            if(data != NULL)
-            {
-              free(data);
-              data = NULL;
-            }
-        }
-        else
-        {
-            data->n_size = n;
-        
-            mydebug("[SERVER] readtask %u received %d : [%d] %s\n", (uint32_t)pthread_self(), fd, data->n_size, data->line);
-            if (data->line[0] != '\0')
-            {
-                // modify monitored event to EPOLLOUT,  wait next loop to send respond
-                ev.data.ptr = data;
-                // Modify event to EPOLLOUT
-                ev.events = EPOLLOUT | EPOLLET;    
-                // modify moditored fd event
-                if(epoll_ctl(epfd,EPOLL_CTL_MOD,fd,&ev)==-1)
-                {
-                  perror("epoll_ctl:mod");
-                  if(data != NULL)
-                  {
-                    free(data);
-                    data = NULL;
-                  }                 
-                }
-            }
-          }	     
-      }
-}
-
-void *writetask(void *args)
-{
-    int n = 0;
-    int nwrite = 0;
-    int ret = 0;
-
-    // data to wirte back to client
-    struct user_data *rdata = NULL;  
-    while(1)
-    {
-        pthread_mutex_lock(&w_mutex);
-        while(writehead == NULL)
-        {
-            // if condl false, will unlock mutex
-            pthread_cond_wait(&w_condl, &w_mutex); 
-        }
-        rdata = (struct user_data*)writehead->data.ptr;
-        struct task* tmp = writehead;
-        writehead = writehead->next;
-        free(tmp);
-
-        //echo("[SERVER] thread %d writetask before unlock\n", pthread_self());
-        pthread_mutex_unlock(&w_mutex);
-        //echo("[SERVER] thread %d writetask after unlock\n", pthread_self());
-
-        if(rdata->fd < 0)
-        {
-            goto error;
-        }
-        mydebug("[SERVER] writetask %u sending %d\n", (uint32_t)pthread_self(), rdata->fd);
-
-        writefunc wfunc = (writefunc)args;
-        ret = wfunc(rdata);
-        if(ret < 0)
-        {
-          goto error;
-        }
-
-        // delete data
-        if (rdata->fd > 0)
-        {       
-          // modify monitored event to EPOLLIN, wait next loop to receive data
-          ev.data.fd = rdata->fd;
-          // monitor in message, edge trigger
-          //ev.events = EPOLLIN | EPOLLET;   
-          // modify moditored fd event
-          if(epoll_ctl(epfd, EPOLL_CTL_DEL, rdata->fd, &ev)==-1)
-          {
-              perror("epoll_ctl:del");
-          } 
-  
-          close(rdata->fd);
-        }	
-
-error:	       
-        free(rdata); 
-    }
-}
-
-
+/*define you our proc_memfd funtion to send your message to other processes锛宱r just put it in shm锛�
+ notice the file description fd and the pid of creator to other*/
 int main(int argc, char **argv)
 {
   char *name;
   ssize_t  len;
-  int lsn_fd, apt_fd;
-  struct sockaddr_un srv_addr;
-  struct sockaddr_un clt_addr;
-  socklen_t clt_len;
-  int ret;
-  int i;
-  //char recv_buf[MAX_LEN] = {0};
-  //char send_buf[MAX_LEN] = {0};
-  char line[MAX_LEN] = {0};
-  pthread_t *read_tids = NULL;
-  int read_thread_num = 2;
-  pthread_t *write_tids = NULL;   
-  int write_thread_num = 2;   
-  struct task *new_task=NULL;
-  struct user_data *rdata=NULL;
-
+  char * unixdomain = NULL;
+  int ret = 0;
 
   if (argc < 3) {
-    fprintf(stderr, "%s name size\n", argv[0]);
+    fprintf(stderr, "%s name size [unix domain path]\n", argv[0]);
     exit(EXIT_FAILURE);
   }
 
   name = argv[1];
   len = atoi(argv[2]) * 1024LU * 1024LU * 1024LU;
-  signal(SIGTERM,handler);
+  unixdomain = argv[3];
 
+  signal(SIGTERM,handler);
+  signal(SIGABRT,handler);
+  signal(SIGSEGV,handler);
+  
+ 
   g_memfd = basic_shm_create(name, len);
   ret = shm_write(g_memfd);
 
-
-	if((read_tids = (pthread_t *)malloc(read_thread_num * sizeof(pthread_t))) == NULL)
-	{
-		perror("read_tids malloc fail\n");
-    return -1;    
-	}
-
-  // threads for reading thread pool  
-  for(i = 0; i < read_thread_num; i++)
+  ret = basic_create_ipc_server(unixdomain);
+  if(ret < 0)
   {
-    pthread_create(&read_tids[i], NULL, readtask, NULL);
+     printf("failed to create ipc_server\n");      
   }
 
-	if((write_tids = (pthread_t *)malloc(write_thread_num * sizeof(pthread_t))) == NULL)
-	{
-		perror("write_tids  malloc fail\n");
-    return -1;
-	}
-
-  // threads for writing thread pool  
-  for(i = 0; i < write_thread_num; i++)
-  {
-    pthread_create(&write_tids[i], NULL, writetask, proc_memfd);
-  }
-
-  epfd = epoll_create(MAX_EPOLL_EVENT_COUNT);
-
-  //create socket to bind local IP and PORT
-  lsn_fd = socket(AF_UNIX, SOCK_STREAM, 0);
-
-  int opt = 0x1;
-  setsockopt(lsn_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
-
-  // set the descriptor as non-blocking
-  setnonblocking(lsn_fd);
-
-  ev.data.fd = lsn_fd;
-  ev.events = EPOLLIN|EPOLLET;
-
-  /*Control an epoll descriptor, $epfd, by requesting the operation op be performed on the target file descriptor, fd.
-
-  $epfd is an epoll descriptor returned from epoll_create.
-  $op is one of EPOLL_CTL_ADD, EPOLL_CTL_MOD or EPOLL_CTL_DEL.
-  $fd is the file desciptor to be watched.
-  $eventmask is a bitmask of events defined by EPOLLIN, EPOLLOUT, etc.
-
-  When successful, epoll_ctl returns 0. When an error occurs, epoll_ctl returns -1 and errno is set appropriately.
-  */
-  if(epoll_ctl(epfd,EPOLL_CTL_ADD,lsn_fd,&ev)==-1)
-  {
-      perror("epoll_ctl:add");               
-  } 
-  
-  if(lsn_fd < 0)
-  {
-    perror("can't create communication socket!");
-    unlink(UNIX_DOMAIN);
-    basic_shm_close(g_memfd);      
-    return 1;
-  }
-
-  //create local IP and PORT
-  srv_addr.sun_family = AF_UNIX;
-  strncpy(srv_addr.sun_path, UNIX_DOMAIN, sizeof(srv_addr.sun_path) - 1);
-  //unlink(UNIX_DOMAIN);
-
-  //bind sockfd and sockaddr
-  ret = bind(lsn_fd, (struct sockaddr*)&srv_addr, sizeof(srv_addr));
-  if(ret == -1)
-  {
-    perror("can't bind local sockaddr!");
-    close(lsn_fd);
-    unlink(UNIX_DOMAIN);
-    basic_shm_close(g_memfd);  
-    return 1;
-  }
-
-  //listen lsn_fd, try listen 5
-
-  ret = listen(lsn_fd, LISTENQ);
-  if(ret == -1)
-  {
-    perror("can't listen client connect request");
-    close(lsn_fd);
-    unlink(UNIX_DOMAIN);
-    basic_shm_close(g_memfd);  
-
-    return 1;
-  }
-
-  clt_len = sizeof(clt_addr);
-  while(1)
-  {
-    int nfds = epoll_wait(epfd,events,1024,100);
-    int i=0;
-    for(i=0;i<nfds;++i)
-    {
-      if(events[i].data.fd == lsn_fd)
-      {
-          // accept the client connection
-			    while((apt_fd=accept(lsn_fd, (struct sockaddr *)&clt_addr, &clt_len))>0)
-			    {
-            setnonblocking(apt_fd);
-            //char *str = inet_ntoa(clt_addr.sun_path);
-            //printf("[SERVER] connect from %s \n", str);
-            ev.data.fd = apt_fd; 				       
-            // monitor in message, edge trigger
-            ev.events = EPOLLIN | EPOLLET; 				       
-            // add fd to epoll queue
-            if(epoll_ctl(epfd,EPOLL_CTL_ADD,apt_fd,&ev)==-1)
-            {
-                perror("epoll_ctl:add");
-                continue;
-                //exit(EXIT_FAILURE);
-            }
-        	}
-    	 		if( apt_fd == -1)
-    			{
-    	 			if((errno != EAGAIN ) 
-    					&& (errno!= ECONNABORTED)
-    	 				&& (errno != EPROTO)
-    	 				&& (errno != EINTR))
-    	 			{
-    					perror("accept");
-    	 				
-    	 			}
-    	 		}
-	 		    continue;
-      }
-      else if (events[i].events & EPOLLIN)
-      //write鏁版嵁
-      {
-        printf("EPOLLIN\n");
-        if( (eventfd = events[i].data.fd) < 0 )
-        {
-          continue;
-        }
-
-          printf("[SERVER] put task %d to read queue\n", events[i].data.fd);
-
-          new_task = (struct task *)malloc(sizeof(struct task));
-          if(NULL == new_task)
-          {
-            goto error;
-          }          
-          new_task->data.fd= eventfd;
-          new_task->next = NULL;
-
-          // protect task queue (readhead/readtail)
-          pthread_mutex_lock(&r_mutex); 
-
-          // the queue is empty
-          if(readhead == NULL)
-          {
-             readhead = new_task;
-             readtail = new_task;
-          }
-          // queue is not empty
-          else
-          {
-             readtail->next = new_task;
-             readtail = new_task;
-          }
-
-           // trigger readtask thread
-           pthread_cond_broadcast(&r_condl);
-           pthread_mutex_unlock(&r_mutex);
-      }
-      else if (events[i].events & EPOLLOUT)
-      {
-          rdata = (struct user_data *)events[i].data.ptr;
-          if ( NULL == rdata )
-          {
-              continue; 
-          }  
-          printf("[SERVER] put task %d to write queue\n", ((struct task*)events[i].data.ptr)->data.fd);
-
-          new_task = malloc(sizeof(struct task));
-          if(NULL == new_task)
-          {
-            goto error;
-          }
-          new_task->data.ptr = (struct user_data*)events[i].data.ptr;
-          new_task->next = NULL;
-
-          pthread_mutex_lock(&w_mutex);
-
-          // the queue is empty
-          if (writehead == NULL)
-          {
-              writehead = new_task;
-              writetail = new_task;
-          }
-          // queue is not empty
-          else                 
-          {
-              writetail->next = new_task;
-              writetail = new_task;
-          }
-
-          // trigger writetask thread
-          pthread_cond_broadcast(&w_condl);  
-
-          pthread_mutex_unlock(&w_mutex);
-      }
-      else
-      {
-        printf("[SERVER] Error: unknown epoll event");
-      }
-    }
-  }
-
-error:
-  close(apt_fd);
-  close(lsn_fd);
-  unlink(UNIX_DOMAIN);
   basic_shm_close(g_memfd);  
   return 0;
 }

--
Gitblit v1.8.0