From 54951dfb930bea890aef14be02236f81b3a19f2e Mon Sep 17 00:00:00 2001
From: cheliequan <liequanche@126.com>
Date: 星期二, 27 十二月 2022 10:23:43 +0800
Subject: [PATCH] 更新消息通知库,使用basic_create_ipc_server函数创建消息监听服务器

---
 src/memfd.c          |    1 
 src/ipc_msg.h        |   21 +
 src/ipc_server_lib.c |  466 +++++++++++++++++++++++++++
 src/ipc_msg.c        |    4 
 src/Makefile         |    6 
 src/ipc_server.c     |  483 +---------------------------
 6 files changed, 508 insertions(+), 473 deletions(-)

diff --git a/src/Makefile b/src/Makefile
index 772b875..d870100 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -17,8 +17,10 @@
 	$(CC) $(CFLAGS) $(INCLUDE) -fPIC -c shmht_mytests.c
 memfd.o:
 	$(CC) $(CFLAGS) $(INCLUDE) -fPIC -c memfd.c
-ipc_server:
-	$(CC) $(CFLAGS) $(INCLUDE) -fPIC -lpthread -o $@ ipc_server.c ipc_msg.c memfd.c
+ipc_server:libipc_server.so
+	$(CC) $(CFLAGS) $(INCLUDE) -fPIC -pthread -o $@ ipc_server.c -lipc_server -L .
+libipc_server.so:
+	$(CC) -shared -fPIC -o $@ ipc_server_lib.c ipc_msg.c memfd.c
 ipc_client:
 	$(CC) $(CFLAGS) $(INCLUDE) -fPIC -o $@ ipc_client.c ipc_msg.c memfd.c
 clean:
diff --git a/src/ipc_msg.c b/src/ipc_msg.c
index 0530fd4..804eafa 100644
--- a/src/ipc_msg.c
+++ b/src/ipc_msg.c
@@ -57,7 +57,7 @@
 int  send_fd_sendmsg(int  fd, memfd_data_st** ppmemfd_data)
 {
   memfd_data_st *ptr_memfd_data = *ppmemfd_data; 
-  return send_fd_args(fd, ptr_memfd_data->memfd, ptr_memfd_data->pid, &ptr_memfd_data->data, ptr_memfd_data->len);
+  return send_fd_args_sendmsg(fd, ptr_memfd_data->memfd, ptr_memfd_data->pid, &ptr_memfd_data->data, ptr_memfd_data->len);
 }
 
 /**
@@ -102,4 +102,4 @@
   memset(*ppmemfd_data, 0, control_len);
   memcpy(*ppmemfd_data, CMSG_DATA(&cm),  control_len);
   return ret;
-}
\ No newline at end of file
+}
diff --git a/src/ipc_msg.h b/src/ipc_msg.h
index e7ad225..5b489c3 100644
--- a/src/ipc_msg.h
+++ b/src/ipc_msg.h
@@ -11,6 +11,8 @@
 
 
 #define UNIX_DOMAIN "/tmp/UNIX.domain"
+#define errExit(msg)    do { perror(msg); exit(EXIT_FAILURE); \
+} while (0)
 
 #define MAX_LEN  4096
 #define HELLO_MSG "hello_basic"
@@ -26,20 +28,33 @@
   int len;
 } memfd_data_st;
 
-int  send_fd(int  fd, memfd_data_st** ppmemfd_data);
+struct user_data
+{
+  int fd;
+  unsigned int n_size;
+  char line[MAX_LEN];
+};
+
+#define READ_THREAD_NUM  2
+#define WRITE_THREAD_NUM 2
+
+int proc_memfd(struct user_data* rdata);
+int basic_create_ipc_server(char * unix_domain_path);
+
+int  send_fd_sendmsg(int  fd, memfd_data_st** ppmemfd_data);
 
 /**
  * @brief 鍙戦�佺洰鏍囨枃浠舵弿杩扮
  * @param fd		浼犻�掍俊鎭殑 UNIX 鍩� 鏂囦欢鎻忚堪绗�
  * @param fd_to_send	寰呭彂閫佺殑鏂囦欢鎻忚堪绗�
  */
-int  send_fd_args(int  fd, int fd_to_send, pid_t pid, void **ppdata, int len);
+int  send_fd_args_sendmsg(int  fd, int fd_to_send, pid_t pid, void **ppdata, int len);
 
 /**
  * @brief 鎺ュ彈鏂囦欢鎻忚堪绗�
  * @param fd 浼犻�掍俊鎭殑 UNIX 鍩� 鏂囦欢鎻忚堪绗�
  */
-int recv_fd(int  fd, memfd_data_st** ppmemfd_data);
+int recv_fd_recvmsg(int  fd, memfd_data_st** ppmemfd_data);
 
 #ifdef __cplusplus 
 } 
diff --git a/src/ipc_server.c b/src/ipc_server.c
index 45f6082..99418c7 100644
--- a/src/ipc_server.c
+++ b/src/ipc_server.c
@@ -1,61 +1,14 @@
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
-#include <sys/socket.h>
-#include <sys/un.h>
 #include <unistd.h>
-#include <signal.h>
-#include <sys/epoll.h>
 #include <errno.h>
-#include <sys/syscall.h>
-#include <sys/mman.h>
-#include <sys/stat.h>
-#include <fcntl.h>
+#include <signal.h>
 #include <sys/types.h>
-#include <pthread.h>
+#include <sys/stat.h>
 
+#include "memfd.h"
 #include "ipc_msg.h"
-
-#define  MAX_EPOLL_EVENT_COUNT 1024
-#define errExit(msg)    do { perror(msg); exit(EXIT_FAILURE); \
-} while (0)
-#define LISTENQ 1024
-
-// task item in thread pool
-struct task                          
-{
-    // file descriptor or user_data
-    epoll_data_t data;        
-    // next task
-    struct task* next;               
-};
-
-struct user_data
-{
-  int fd;
-  unsigned int n_size;
-  char line[MAX_LEN];
-};
-
-//typedef int (*readfunc)(struct user_data*);
-typedef int (*writefunc)(struct user_data*);
-/* thread exec function */
-void *readtask(void *args);
-void *writetask(void *args);
-
-// mutex lock to protect readhead/readtail
-pthread_mutex_t r_mutex;             
-pthread_cond_t r_condl;
-
-// mutex lock to protect writehead/writetail
-pthread_mutex_t w_mutex;             
-pthread_cond_t w_condl;
-struct task *readhead = NULL, *readtail = NULL;
-struct task *writehead = NULL, *writetail = NULL;
-
-//create epoll
-int epfd,eventfd;
-struct epoll_event   ev,events[LISTENQ];
 
 int g_memfd = 0;
 
@@ -89,23 +42,6 @@
   return len;
 }
 
-void setnonblocking(int sock)
-{
-  int opts;
-
-  opts = fcntl(sock, F_GETFL);
-  if(opts<0)
-   {
-     perror("fcntl(sock,GETFL)");
-     exit(1); 
-   }
-
-  if(fcntl(sock, F_SETFL, opts | O_NONBLOCK)<0)
-   {
-    perror("fcntl(sock,SETFL,opts)");
-    exit(1);
-   }
-}
 
 
 int proc_memfd(struct user_data* rdata)
@@ -166,423 +102,38 @@
     
 }
 
-
-/**
- * thread exec function
- */
-void *readtask(void *args) 
-{
-  int i = 0;
-  int fd = -1;
-  unsigned int n = 0;
-
-  struct user_data *data = NULL;
-
-  while (1)
-   {
-    // protect task queue (readhead/readtail)
-     pthread_mutex_lock(&r_mutex);
-
-     while(readhead == NULL)
-      {
-       // if condl false, will unlock mutex
-       mydebug("thread:%u waiting, task is NULL ... \n", (uint32_t)pthread_self());
-       pthread_cond_wait(&r_condl, &r_mutex);
-      }
-
-     fd = readhead->data.fd;
-
-      struct task *tmp = readhead;
-      readhead = readhead->next;
-      free(tmp);
-      pthread_mutex_unlock(&r_mutex);
-
-      mydebug("[SERVER] readtask %u handling %d\n", (uint32_t)pthread_self(), fd);
-
-      data = (struct user_data *)malloc(sizeof(struct user_data));
-      if(NULL == data)
-      {
-          perror("readtask malloc");
-          return;
-      }
-      
-      memset(data, 0 , sizeof(struct user_data));
-      data->fd = fd;
-
-      n=0;
-      int nread = 0;
-      while((nread = read(fd,data->line+n,MAX_LEN))>0)
-      {
-          n+=nread;
-      }
-
-      if((nread==-1) && (errno != EAGAIN))
-      {
-        perror("read error");
-      }
-
-      if(n < 0)
-      {
-        if (errno == ECONNRESET)
-          {
-            close(fd);
-            mydebug("[SERVER] Error: readline failed: %s\n", strerror(errno));           
-          }
-          else
-          {
-              mydebug("readline error \n");
-          }
-          if(data != NULL)
-          {
-              free(data);
-              data = NULL;
-          }
-        }		  
-        else if (n == 0)
-        {
-            close(fd);
-            mydebug("[SERVER] Info: client closed connection.\n");
-            if(data != NULL)
-            {
-              free(data);
-              data = NULL;
-            }
-        }
-        else
-        {
-            data->n_size = n;
-        
-            mydebug("[SERVER] readtask %u received %d : [%d] %s\n", (uint32_t)pthread_self(), fd, data->n_size, data->line);
-            if (data->line[0] != '\0')
-            {
-                // modify monitored event to EPOLLOUT,  wait next loop to send respond
-                ev.data.ptr = data;
-                // Modify event to EPOLLOUT
-                ev.events = EPOLLOUT | EPOLLET;    
-                // modify moditored fd event
-                if(epoll_ctl(epfd,EPOLL_CTL_MOD,fd,&ev)==-1)
-                {
-                  perror("epoll_ctl:mod");
-                  if(data != NULL)
-                  {
-                    free(data);
-                    data = NULL;
-                  }                 
-                }
-            }
-          }	     
-      }
-}
-
-void *writetask(void *args)
-{
-    int n = 0;
-    int nwrite = 0;
-    int ret = 0;
-
-    // data to wirte back to client
-    struct user_data *rdata = NULL;  
-    while(1)
-    {
-        pthread_mutex_lock(&w_mutex);
-        while(writehead == NULL)
-        {
-            // if condl false, will unlock mutex
-            pthread_cond_wait(&w_condl, &w_mutex); 
-        }
-        rdata = (struct user_data*)writehead->data.ptr;
-        struct task* tmp = writehead;
-        writehead = writehead->next;
-        free(tmp);
-
-        //echo("[SERVER] thread %d writetask before unlock\n", pthread_self());
-        pthread_mutex_unlock(&w_mutex);
-        //echo("[SERVER] thread %d writetask after unlock\n", pthread_self());
-
-        if(rdata->fd < 0)
-        {
-            goto error;
-        }
-        mydebug("[SERVER] writetask %u sending %d\n", (uint32_t)pthread_self(), rdata->fd);
-
-        writefunc wfunc = (writefunc)args;
-        ret = wfunc(rdata);
-        if(ret < 0)
-        {
-          goto error;
-        }
-
-        // delete data
-        if (rdata->fd > 0)
-        {       
-          // modify monitored event to EPOLLIN, wait next loop to receive data
-          ev.data.fd = rdata->fd;
-          // monitor in message, edge trigger
-          //ev.events = EPOLLIN | EPOLLET;   
-          // modify moditored fd event
-          if(epoll_ctl(epfd, EPOLL_CTL_DEL, rdata->fd, &ev)==-1)
-          {
-              perror("epoll_ctl:del");
-          } 
-  
-          close(rdata->fd);
-        }	
-
-error:	       
-        free(rdata); 
-    }
-}
-
-
+/*define you our proc_memfd funtion to send your message to other processes锛宱r just put it in shm锛�
+ notice the file description fd and the pid of creator to other*/
 int main(int argc, char **argv)
 {
   char *name;
   ssize_t  len;
-  int lsn_fd, apt_fd;
-  struct sockaddr_un srv_addr;
-  struct sockaddr_un clt_addr;
-  socklen_t clt_len;
-  int ret;
-  int i;
-  //char recv_buf[MAX_LEN] = {0};
-  //char send_buf[MAX_LEN] = {0};
-  char line[MAX_LEN] = {0};
-  pthread_t *read_tids = NULL;
-  int read_thread_num = 2;
-  pthread_t *write_tids = NULL;   
-  int write_thread_num = 2;   
-  struct task *new_task=NULL;
-  struct user_data *rdata=NULL;
-
+  char * unixdomain = NULL;
+  int ret = 0;
 
   if (argc < 3) {
-    fprintf(stderr, "%s name size\n", argv[0]);
+    fprintf(stderr, "%s name size [unix domain path]\n", argv[0]);
     exit(EXIT_FAILURE);
   }
 
   name = argv[1];
   len = atoi(argv[2]) * 1024LU * 1024LU * 1024LU;
-  signal(SIGTERM,handler);
+  unixdomain = argv[3];
 
+  signal(SIGTERM,handler);
+  signal(SIGABRT,handler);
+  signal(SIGSEGV,handler);
+  
+ 
   g_memfd = basic_shm_create(name, len);
   ret = shm_write(g_memfd);
 
-
-	if((read_tids = (pthread_t *)malloc(read_thread_num * sizeof(pthread_t))) == NULL)
-	{
-		perror("read_tids malloc fail\n");
-    return -1;    
-	}
-
-  // threads for reading thread pool  
-  for(i = 0; i < read_thread_num; i++)
+  ret = basic_create_ipc_server(unixdomain);
+  if(ret < 0)
   {
-    pthread_create(&read_tids[i], NULL, readtask, NULL);
+     printf("failed to create ipc_server\n");      
   }
 
-	if((write_tids = (pthread_t *)malloc(write_thread_num * sizeof(pthread_t))) == NULL)
-	{
-		perror("write_tids  malloc fail\n");
-    return -1;
-	}
-
-  // threads for writing thread pool  
-  for(i = 0; i < write_thread_num; i++)
-  {
-    pthread_create(&write_tids[i], NULL, writetask, proc_memfd);
-  }
-
-  epfd = epoll_create(MAX_EPOLL_EVENT_COUNT);
-
-  //create socket to bind local IP and PORT
-  lsn_fd = socket(AF_UNIX, SOCK_STREAM, 0);
-
-  int opt = 0x1;
-  setsockopt(lsn_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
-
-  // set the descriptor as non-blocking
-  setnonblocking(lsn_fd);
-
-  ev.data.fd = lsn_fd;
-  ev.events = EPOLLIN|EPOLLET;
-
-  /*Control an epoll descriptor, $epfd, by requesting the operation op be performed on the target file descriptor, fd.
-
-  $epfd is an epoll descriptor returned from epoll_create.
-  $op is one of EPOLL_CTL_ADD, EPOLL_CTL_MOD or EPOLL_CTL_DEL.
-  $fd is the file desciptor to be watched.
-  $eventmask is a bitmask of events defined by EPOLLIN, EPOLLOUT, etc.
-
-  When successful, epoll_ctl returns 0. When an error occurs, epoll_ctl returns -1 and errno is set appropriately.
-  */
-  if(epoll_ctl(epfd,EPOLL_CTL_ADD,lsn_fd,&ev)==-1)
-  {
-      perror("epoll_ctl:add");               
-  } 
-  
-  if(lsn_fd < 0)
-  {
-    perror("can't create communication socket!");
-    unlink(UNIX_DOMAIN);
-    basic_shm_close(g_memfd);      
-    return 1;
-  }
-
-  //create local IP and PORT
-  srv_addr.sun_family = AF_UNIX;
-  strncpy(srv_addr.sun_path, UNIX_DOMAIN, sizeof(srv_addr.sun_path) - 1);
-  //unlink(UNIX_DOMAIN);
-
-  //bind sockfd and sockaddr
-  ret = bind(lsn_fd, (struct sockaddr*)&srv_addr, sizeof(srv_addr));
-  if(ret == -1)
-  {
-    perror("can't bind local sockaddr!");
-    close(lsn_fd);
-    unlink(UNIX_DOMAIN);
-    basic_shm_close(g_memfd);  
-    return 1;
-  }
-
-  //listen lsn_fd, try listen 5
-
-  ret = listen(lsn_fd, LISTENQ);
-  if(ret == -1)
-  {
-    perror("can't listen client connect request");
-    close(lsn_fd);
-    unlink(UNIX_DOMAIN);
-    basic_shm_close(g_memfd);  
-
-    return 1;
-  }
-
-  clt_len = sizeof(clt_addr);
-  while(1)
-  {
-    int nfds = epoll_wait(epfd,events,1024,100);
-    int i=0;
-    for(i=0;i<nfds;++i)
-    {
-      if(events[i].data.fd == lsn_fd)
-      {
-          // accept the client connection
-			    while((apt_fd=accept(lsn_fd, (struct sockaddr *)&clt_addr, &clt_len))>0)
-			    {
-            setnonblocking(apt_fd);
-            //char *str = inet_ntoa(clt_addr.sun_path);
-            //printf("[SERVER] connect from %s \n", str);
-            ev.data.fd = apt_fd; 				       
-            // monitor in message, edge trigger
-            ev.events = EPOLLIN | EPOLLET; 				       
-            // add fd to epoll queue
-            if(epoll_ctl(epfd,EPOLL_CTL_ADD,apt_fd,&ev)==-1)
-            {
-                perror("epoll_ctl:add");
-                continue;
-                //exit(EXIT_FAILURE);
-            }
-        	}
-    	 		if( apt_fd == -1)
-    			{
-    	 			if((errno != EAGAIN ) 
-    					&& (errno!= ECONNABORTED)
-    	 				&& (errno != EPROTO)
-    	 				&& (errno != EINTR))
-    	 			{
-    					perror("accept");
-    	 				
-    	 			}
-    	 		}
-	 		    continue;
-      }
-      else if (events[i].events & EPOLLIN)
-      //write鏁版嵁
-      {
-        printf("EPOLLIN\n");
-        if( (eventfd = events[i].data.fd) < 0 )
-        {
-          continue;
-        }
-
-          printf("[SERVER] put task %d to read queue\n", events[i].data.fd);
-
-          new_task = (struct task *)malloc(sizeof(struct task));
-          if(NULL == new_task)
-          {
-            goto error;
-          }          
-          new_task->data.fd= eventfd;
-          new_task->next = NULL;
-
-          // protect task queue (readhead/readtail)
-          pthread_mutex_lock(&r_mutex); 
-
-          // the queue is empty
-          if(readhead == NULL)
-          {
-             readhead = new_task;
-             readtail = new_task;
-          }
-          // queue is not empty
-          else
-          {
-             readtail->next = new_task;
-             readtail = new_task;
-          }
-
-           // trigger readtask thread
-           pthread_cond_broadcast(&r_condl);
-           pthread_mutex_unlock(&r_mutex);
-      }
-      else if (events[i].events & EPOLLOUT)
-      {
-          rdata = (struct user_data *)events[i].data.ptr;
-          if ( NULL == rdata )
-          {
-              continue; 
-          }  
-          printf("[SERVER] put task %d to write queue\n", ((struct task*)events[i].data.ptr)->data.fd);
-
-          new_task = malloc(sizeof(struct task));
-          if(NULL == new_task)
-          {
-            goto error;
-          }
-          new_task->data.ptr = (struct user_data*)events[i].data.ptr;
-          new_task->next = NULL;
-
-          pthread_mutex_lock(&w_mutex);
-
-          // the queue is empty
-          if (writehead == NULL)
-          {
-              writehead = new_task;
-              writetail = new_task;
-          }
-          // queue is not empty
-          else                 
-          {
-              writetail->next = new_task;
-              writetail = new_task;
-          }
-
-          // trigger writetask thread
-          pthread_cond_broadcast(&w_condl);  
-
-          pthread_mutex_unlock(&w_mutex);
-      }
-      else
-      {
-        printf("[SERVER] Error: unknown epoll event");
-      }
-    }
-  }
-
-error:
-  close(apt_fd);
-  close(lsn_fd);
-  unlink(UNIX_DOMAIN);
   basic_shm_close(g_memfd);  
   return 0;
 }
diff --git a/src/ipc_server_lib.c b/src/ipc_server_lib.c
new file mode 100644
index 0000000..41c22ee
--- /dev/null
+++ b/src/ipc_server_lib.c
@@ -0,0 +1,466 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <unistd.h>
+#include <sys/epoll.h>
+#include <errno.h>
+#include <sys/syscall.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <sys/types.h>
+#include <pthread.h>
+#include "ipc_msg.h"
+
+#define  MAX_EPOLL_EVENT_COUNT 1024
+#define LISTENQ 1024
+
+// task item in thread pool
+struct task                          
+{
+    // file descriptor or user_data
+    epoll_data_t data;        
+    // next task
+    struct task* next;               
+};
+
+//typedef int (*readfunc)(struct user_data*);
+typedef int (*writefunc)(struct user_data*);
+/* thread exec function */
+static void *readtask(void *args);
+static void *writetask(void *args);
+
+// mutex lock to protect readhead/readtail
+pthread_mutex_t r_mutex;             
+pthread_cond_t r_condl;
+
+// mutex lock to protect writehead/writetail
+pthread_mutex_t w_mutex;             
+pthread_cond_t w_condl;
+struct task *readhead = NULL, *readtail = NULL;
+struct task *writehead = NULL, *writetail = NULL;
+
+//create epoll
+int epfd,eventfd;
+struct epoll_event   ev,events[LISTENQ];
+
+
+static void setnonblocking(int sock)
+{
+  int opts;
+
+  opts = fcntl(sock, F_GETFL);
+  if(opts<0)
+   {
+     perror("fcntl(sock,GETFL)");
+     exit(1); 
+   }
+
+  if(fcntl(sock, F_SETFL, opts | O_NONBLOCK)<0)
+   {
+    perror("fcntl(sock,SETFL,opts)");
+    exit(1);
+   }
+}
+
+
+/**
+ * thread exec function
+ */
+static void *readtask(void *args) 
+{
+  int i = 0;
+  int fd = -1;
+  unsigned int n = 0;
+
+  struct user_data *data = NULL;
+
+  while (1)
+   {
+    // protect task queue (readhead/readtail)
+     pthread_mutex_lock(&r_mutex);
+
+     while(readhead == NULL)
+      {
+       // if condl false, will unlock mutex
+       mydebug("thread:%u waiting, task is NULL ... \n", (uint32_t)pthread_self());
+       pthread_cond_wait(&r_condl, &r_mutex);
+      }
+
+     fd = readhead->data.fd;
+
+      struct task *tmp = readhead;
+      readhead = readhead->next;
+      free(tmp);
+      pthread_mutex_unlock(&r_mutex);
+
+      mydebug("[SERVER] readtask %u handling %d\n", (uint32_t)pthread_self(), fd);
+
+      data = (struct user_data *)malloc(sizeof(struct user_data));
+      if(NULL == data)
+      {
+          perror("readtask malloc");
+          return;
+      }
+      
+      memset(data, 0 , sizeof(struct user_data));
+      data->fd = fd;
+
+      n=0;
+      int nread = 0;
+      while((nread = read(fd,data->line+n,MAX_LEN))>0)
+      {
+          n+=nread;
+      }
+
+      if((nread==-1) && (errno != EAGAIN))
+      {
+        perror("read error");
+      }
+
+      if(n < 0)
+      {
+        if (errno == ECONNRESET)
+          {
+            close(fd);
+            mydebug("[SERVER] Error: readline failed: %s\n", strerror(errno));           
+          }
+          else
+          {
+              mydebug("readline error \n");
+          }
+          if(data != NULL)
+          {
+              free(data);
+              data = NULL;
+          }
+        }		  
+        else if (n == 0)
+        {
+            close(fd);
+            mydebug("[SERVER] Info: client closed connection.\n");
+            if(data != NULL)
+            {
+              free(data);
+              data = NULL;
+            }
+        }
+        else
+        {
+            data->n_size = n;
+        
+            mydebug("[SERVER] readtask %u received %d : [%d] %s\n", (uint32_t)pthread_self(), fd, data->n_size, data->line);
+            if (data->line[0] != '\0')
+            {
+                // modify monitored event to EPOLLOUT,  wait next loop to send respond
+                ev.data.ptr = data;
+                // Modify event to EPOLLOUT
+                ev.events = EPOLLOUT | EPOLLET;    
+                // modify moditored fd event
+                if(epoll_ctl(epfd,EPOLL_CTL_MOD,fd,&ev)==-1)
+                {
+                  perror("epoll_ctl:mod");
+                  if(data != NULL)
+                  {
+                    free(data);
+                    data = NULL;
+                  }                 
+                }
+            }
+          }	     
+      }
+}
+
+static void *writetask(void *args)
+{
+    int n = 0;
+    int nwrite = 0;
+    int ret = 0;
+
+    // data to wirte back to client
+    struct user_data *rdata = NULL;  
+    while(1)
+    {
+        pthread_mutex_lock(&w_mutex);
+        while(writehead == NULL)
+        {
+            // if condl false, will unlock mutex
+            pthread_cond_wait(&w_condl, &w_mutex); 
+        }
+        rdata = (struct user_data*)writehead->data.ptr;
+        struct task* tmp = writehead;
+        writehead = writehead->next;
+        free(tmp);
+
+        //echo("[SERVER] thread %d writetask before unlock\n", pthread_self());
+        pthread_mutex_unlock(&w_mutex);
+        //echo("[SERVER] thread %d writetask after unlock\n", pthread_self());
+
+        if(rdata->fd < 0)
+        {
+            goto error;
+        }
+        mydebug("[SERVER] writetask %u sending %d\n", (uint32_t)pthread_self(), rdata->fd);
+
+        writefunc wfunc = (writefunc)args;
+        ret = wfunc(rdata);
+        if(ret < 0)
+        {
+          goto error;
+        }
+
+        // delete data
+        if (rdata->fd > 0)
+        {       
+          // modify monitored event to EPOLLIN, wait next loop to receive data
+          ev.data.fd = rdata->fd;
+          // monitor in message, edge trigger
+          //ev.events = EPOLLIN | EPOLLET;   
+          // modify moditored fd event
+          if(epoll_ctl(epfd, EPOLL_CTL_DEL, rdata->fd, &ev)==-1)
+          {
+              perror("epoll_ctl:del");
+          } 
+  
+          close(rdata->fd);
+        }	
+
+error:	       
+        free(rdata); 
+    }
+}
+
+int basic_create_ipc_server(char * unix_domain_path)
+{
+  pthread_t *read_tids = NULL;
+  pthread_t *write_tids = NULL;   
+  int lsn_fd, apt_fd;
+  struct sockaddr_un srv_addr;
+  struct sockaddr_un clt_addr;
+  socklen_t clt_len;
+  int ret;
+  int i;
+  char line[MAX_LEN] = {0};
+
+  struct task *new_task=NULL;
+  struct user_data *rdata=NULL;
+
+	if((read_tids = (pthread_t *)malloc(READ_THREAD_NUM * sizeof(pthread_t))) == NULL)
+	{
+		perror("read_tids malloc fail\n");
+    return -1;    
+	}
+
+  // threads for reading thread pool  
+  for(i = 0; i < READ_THREAD_NUM; i++)
+  {
+    pthread_create(&read_tids[i], NULL, readtask, NULL);
+  }
+
+	if((write_tids = (pthread_t *)malloc(WRITE_THREAD_NUM * sizeof(pthread_t))) == NULL)
+	{
+		perror("write_tids  malloc fail\n");
+    return -1;
+	}
+
+  // threads for writing thread pool  
+  for(i = 0; i < WRITE_THREAD_NUM; i++)
+  {
+    pthread_create(&write_tids[i], NULL, writetask, proc_memfd);
+  }
+
+  epfd = epoll_create(MAX_EPOLL_EVENT_COUNT);
+
+  //create socket to bind local IP and PORT
+  lsn_fd = socket(AF_UNIX, SOCK_STREAM, 0);
+
+
+  // set the descriptor as non-blocking
+  setnonblocking(lsn_fd);
+
+  ev.data.fd = lsn_fd;
+  ev.events = EPOLLIN|EPOLLET;
+
+  /*Control an epoll descriptor, $epfd, by requesting the operation op be performed on the target file descriptor, fd.
+
+  $epfd is an epoll descriptor returned from epoll_create.
+  $op is one of EPOLL_CTL_ADD, EPOLL_CTL_MOD or EPOLL_CTL_DEL.
+  $fd is the file desciptor to be watched.
+  $eventmask is a bitmask of events defined by EPOLLIN, EPOLLOUT, etc.
+
+  When successful, epoll_ctl returns 0. When an error occurs, epoll_ctl returns -1 and errno is set appropriately.
+  */
+  if(epoll_ctl(epfd,EPOLL_CTL_ADD,lsn_fd,&ev)==-1)
+  {
+      perror("epoll_ctl:add");               
+  } 
+  
+  if(lsn_fd < 0)
+  {
+    perror("can't create communication socket!");
+    unlink(UNIX_DOMAIN);  
+    return 1;
+  }
+
+  //create unix domain
+  if(unix_domain_path == NULL)
+  {
+    unix_domain_path = UNIX_DOMAIN;
+  }
+
+  srv_addr.sun_family = AF_UNIX;
+  strncpy(srv_addr.sun_path, unix_domain_path, sizeof(srv_addr.sun_path) - 1);
+  //unlink(UNIX_DOMAIN);
+
+  //bind sockfd and sockaddr
+  ret = bind(lsn_fd, (struct sockaddr*)&srv_addr, sizeof(srv_addr));
+  if(ret == -1)
+  {
+    perror("can't bind local sockaddr!");
+    close(lsn_fd);
+    unlink(UNIX_DOMAIN);
+    return 1;
+  }
+
+  //listen lsn_fd
+
+  ret = listen(lsn_fd, LISTENQ);
+  if(ret == -1)
+  {
+    perror("can't listen client connect request");
+    close(lsn_fd);
+    unlink(UNIX_DOMAIN);
+
+    return 1;
+  }
+
+  clt_len = sizeof(clt_addr);
+  while(1)
+  {
+    int nfds = epoll_wait(epfd,events,1024,100);
+    int i=0;
+    for(i=0;i<nfds;++i)
+    {
+      if(events[i].data.fd == lsn_fd)
+      {
+          // accept the client connection
+			    while((apt_fd=accept(lsn_fd, (struct sockaddr *)&clt_addr, &clt_len))>0)
+			    {
+            setnonblocking(apt_fd);
+            //char *str = inet_ntoa(clt_addr.sun_path);
+            //printf("[SERVER] connect from %s \n", str);
+            ev.data.fd = apt_fd; 				       
+            // monitor in message, edge trigger
+            ev.events = EPOLLIN | EPOLLET; 				       
+            // add fd to epoll queue
+            if(epoll_ctl(epfd,EPOLL_CTL_ADD,apt_fd,&ev)==-1)
+            {
+                perror("epoll_ctl:add");
+                continue;
+                //exit(EXIT_FAILURE);
+            }
+        	}
+    	 		if( apt_fd == -1)
+    			{
+    	 			if((errno != EAGAIN ) 
+    					&& (errno!= ECONNABORTED)
+    	 				&& (errno != EPROTO)
+    	 				&& (errno != EINTR))
+    	 			{
+    					perror("accept");
+    	 				
+    	 			}
+    	 		}
+	 		    continue;
+      }
+      else if (events[i].events & EPOLLIN)
+      //write鏁版嵁
+      {
+        printf("EPOLLIN\n");
+        if( (eventfd = events[i].data.fd) < 0 )
+        {
+          continue;
+        }
+
+          printf("[SERVER] put task %d to read queue\n", events[i].data.fd);
+
+          new_task = (struct task *)malloc(sizeof(struct task));
+          if(NULL == new_task)
+          {
+            goto error;
+          }          
+          new_task->data.fd= eventfd;
+          new_task->next = NULL;
+
+          // protect task queue (readhead/readtail)
+          pthread_mutex_lock(&r_mutex); 
+
+          // the queue is empty
+          if(readhead == NULL)
+          {
+             readhead = new_task;
+             readtail = new_task;
+          }
+          // queue is not empty
+          else
+          {
+             readtail->next = new_task;
+             readtail = new_task;
+          }
+
+           // trigger readtask thread
+           pthread_cond_broadcast(&r_condl);
+           pthread_mutex_unlock(&r_mutex);
+      }
+      else if (events[i].events & EPOLLOUT)
+      {
+          rdata = (struct user_data *)events[i].data.ptr;
+          if ( NULL == rdata )
+          {
+              continue; 
+          }  
+          printf("[SERVER] put task %d to write queue\n", ((struct task*)events[i].data.ptr)->data.fd);
+
+          new_task = malloc(sizeof(struct task));
+          if(NULL == new_task)
+          {
+            goto error;
+          }
+          new_task->data.ptr = (struct user_data*)events[i].data.ptr;
+          new_task->next = NULL;
+
+          pthread_mutex_lock(&w_mutex);
+
+          // the queue is empty
+          if (writehead == NULL)
+          {
+              writehead = new_task;
+              writetail = new_task;
+          }
+          // queue is not empty
+          else                 
+          {
+              writetail->next = new_task;
+              writetail = new_task;
+          }
+
+          // trigger writetask thread
+          pthread_cond_broadcast(&w_condl);  
+
+          pthread_mutex_unlock(&w_mutex);
+      }
+      else
+      {
+        printf("[SERVER] Error: unknown epoll event");
+      }
+    }
+  }
+
+error:
+  close(apt_fd);
+  close(lsn_fd);
+  unlink(UNIX_DOMAIN);
+}
+
diff --git a/src/memfd.c b/src/memfd.c
index 2509685..00df30e 100644
--- a/src/memfd.c
+++ b/src/memfd.c
@@ -12,6 +12,7 @@
 #include <time.h>
 #include <stdarg.h>
 #include "memfd.h"
+#include <assert.h>
 
 #define errExit(msg)    do { perror(msg); exit(EXIT_FAILURE); \
 } while (0)

--
Gitblit v1.8.0