From 157b3411dd123694ca29dd80fe9ecc683958ccab Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期四, 27 七月 2023 11:54:24 +0800
Subject: [PATCH] add epoll/poll/select sendmsg/recvmsg transmit fd

---
 src/ipc_server_lib.c |  147 +++++++++++++++++++++++++++----------------------
 1 files changed, 81 insertions(+), 66 deletions(-)

diff --git a/src/ipc_server_lib.c b/src/ipc_server_lib.c
index 41c22ee..6b2ce1f 100644
--- a/src/ipc_server_lib.c
+++ b/src/ipc_server_lib.c
@@ -13,6 +13,8 @@
 #include <sys/types.h>
 #include <pthread.h>
 #include "ipc_msg.h"
+#include "memfd.h"
+
 
 #define  MAX_EPOLL_EVENT_COUNT 1024
 #define LISTENQ 1024
@@ -26,8 +28,7 @@
     struct task* next;               
 };
 
-//typedef int (*readfunc)(struct user_data*);
-typedef int (*writefunc)(struct user_data*);
+
 /* thread exec function */
 static void *readtask(void *args);
 static void *writetask(void *args);
@@ -71,7 +72,6 @@
  */
 static void *readtask(void *args) 
 {
-  int i = 0;
   int fd = -1;
   unsigned int n = 0;
 
@@ -85,16 +85,22 @@
      while(readhead == NULL)
       {
        // if condl false, will unlock mutex
-       mydebug("thread:%u waiting, task is NULL ... \n", (uint32_t)pthread_self());
+       mydebug("thread:%u waiting, readtask is NULL ... \n", (uint32_t)pthread_self());
        pthread_cond_wait(&r_condl, &r_mutex);
       }
 
-     fd = readhead->data.fd;
+      fd = readhead->data.fd;
 
       struct task *tmp = readhead;
       readhead = readhead->next;
       free(tmp);
       pthread_mutex_unlock(&r_mutex);
+
+      if(fd < 0)
+      {
+         perror("readtask fd<0");
+         continue;
+      }
 
       mydebug("[SERVER] readtask %u handling %d\n", (uint32_t)pthread_self(), fd);
 
@@ -102,7 +108,7 @@
       if(NULL == data)
       {
           perror("readtask malloc");
-          return;
+          return NULL;
       }
       
       memset(data, 0 , sizeof(struct user_data));
@@ -136,47 +142,47 @@
               free(data);
               data = NULL;
           }
-        }		  
-        else if (n == 0)
-        {
-            close(fd);
-            mydebug("[SERVER] Info: client closed connection.\n");
-            if(data != NULL)
-            {
-              free(data);
-              data = NULL;
-            }
-        }
-        else
-        {
-            data->n_size = n;
-        
-            mydebug("[SERVER] readtask %u received %d : [%d] %s\n", (uint32_t)pthread_self(), fd, data->n_size, data->line);
-            if (data->line[0] != '\0')
-            {
-                // modify monitored event to EPOLLOUT,  wait next loop to send respond
-                ev.data.ptr = data;
-                // Modify event to EPOLLOUT
-                ev.events = EPOLLOUT | EPOLLET;    
-                // modify moditored fd event
-                if(epoll_ctl(epfd,EPOLL_CTL_MOD,fd,&ev)==-1)
-                {
-                  perror("epoll_ctl:mod");
-                  if(data != NULL)
-                  {
-                    free(data);
-                    data = NULL;
-                  }                 
-                }
-            }
-          }	     
+      }		  
+      else if (n == 0)
+      {
+          close(fd);
+          mydebug("[SERVER] Info: client closed connection.\n");
+          if(data != NULL)
+          {
+            free(data);
+            data = NULL;
+          }
       }
+      else
+      {
+          data->n_size = n;
+      
+          mydebug("[SERVER] readtask %u received %d : [%d] %s\n", (uint32_t)pthread_self(), fd, data->n_size, data->line);
+          if (data->line[0] != '\0')
+          {
+              // modify monitored event to EPOLLOUT,  wait next loop to send respond
+              ev.data.ptr = data;
+              // Modify event to EPOLLOUT
+              ev.events = EPOLLOUT | EPOLLET;    
+              // modify moditored fd event
+              if(epoll_ctl(epfd,EPOLL_CTL_MOD,fd,&ev)==-1)
+              {
+                perror("epoll_ctl:mod");
+                if(data != NULL)
+                {
+                  free(data);
+                  data = NULL;
+                }                 
+              }
+          }
+        }	     
+      }
+
+      return NULL;
 }
 
 static void *writetask(void *args)
 {
-    int n = 0;
-    int nwrite = 0;
     int ret = 0;
 
     // data to wirte back to client
@@ -200,15 +206,18 @@
 
         if(rdata->fd < 0)
         {
-            goto error;
+          continue;
         }
-        mydebug("[SERVER] writetask %u sending %d\n", (uint32_t)pthread_self(), rdata->fd);
+        mydebug("[SERVER] writetask %u fetch data %d\n", (uint32_t)pthread_self(), rdata->fd);
 
-        writefunc wfunc = (writefunc)args;
-        ret = wfunc(rdata);
-        if(ret < 0)
+        if(NULL != args)
         {
-          goto error;
+          args_data_st *pargs_data_st = (args_data_st *)args;
+          ret = pargs_data_st->wfunc(rdata, pargs_data_st->args, pargs_data_st->len);
+          if(ret < 0)
+          {
+            goto error;
+          }
         }
 
         // delete data
@@ -230,9 +239,11 @@
 error:	       
         free(rdata); 
     }
+
+    return NULL;
 }
 
-int basic_create_ipc_server(char * unix_domain_path)
+int basic_create_ipc_server(char * unix_domain_path, args_data_st * pargs_data_st)
 {
   pthread_t *read_tids = NULL;
   pthread_t *write_tids = NULL;   
@@ -242,16 +253,19 @@
   socklen_t clt_len;
   int ret;
   int i;
-  char line[MAX_LEN] = {0};
 
   struct task *new_task=NULL;
   struct user_data *rdata=NULL;
+  if(0 == file_exists(unix_domain_path))
+  {
+      unlink(unix_domain_path);
+  }
 
-	if((read_tids = (pthread_t *)malloc(READ_THREAD_NUM * sizeof(pthread_t))) == NULL)
-	{
-		perror("read_tids malloc fail\n");
+  if((read_tids = (pthread_t *)malloc(READ_THREAD_NUM * sizeof(pthread_t))) == NULL)
+  {
+    perror("read_tids malloc fail\n");
     return -1;    
-	}
+  }
 
   // threads for reading thread pool  
   for(i = 0; i < READ_THREAD_NUM; i++)
@@ -268,7 +282,7 @@
   // threads for writing thread pool  
   for(i = 0; i < WRITE_THREAD_NUM; i++)
   {
-    pthread_create(&write_tids[i], NULL, writetask, proc_memfd);
+    pthread_create(&write_tids[i], NULL, writetask, (void *)pargs_data_st);
   }
 
   epfd = epoll_create(MAX_EPOLL_EVENT_COUNT);
@@ -300,7 +314,7 @@
   if(lsn_fd < 0)
   {
     perror("can't create communication socket!");
-    unlink(UNIX_DOMAIN);  
+    unlink(unix_domain_path);  
     return 1;
   }
 
@@ -312,7 +326,6 @@
 
   srv_addr.sun_family = AF_UNIX;
   strncpy(srv_addr.sun_path, unix_domain_path, sizeof(srv_addr.sun_path) - 1);
-  //unlink(UNIX_DOMAIN);
 
   //bind sockfd and sockaddr
   ret = bind(lsn_fd, (struct sockaddr*)&srv_addr, sizeof(srv_addr));
@@ -320,7 +333,7 @@
   {
     perror("can't bind local sockaddr!");
     close(lsn_fd);
-    unlink(UNIX_DOMAIN);
+    unlink(unix_domain_path);
     return 1;
   }
 
@@ -331,7 +344,7 @@
   {
     perror("can't listen client connect request");
     close(lsn_fd);
-    unlink(UNIX_DOMAIN);
+    unlink(unix_domain_path);
 
     return 1;
   }
@@ -378,20 +391,20 @@
       else if (events[i].events & EPOLLIN)
       //write鏁版嵁
       {
-        printf("EPOLLIN\n");
-        if( (eventfd = events[i].data.fd) < 0 )
+        mydebug("EPOLLIN\n");
+        if(events[i].data.fd < 0)
         {
           continue;
         }
 
-          printf("[SERVER] put task %d to read queue\n", events[i].data.fd);
+          mydebug("[SERVER] put task %d to read queue\n", events[i].data.fd);
 
           new_task = (struct task *)malloc(sizeof(struct task));
           if(NULL == new_task)
           {
             goto error;
           }          
-          new_task->data.fd= eventfd;
+          new_task->data.fd= events[i].data.fd;
           new_task->next = NULL;
 
           // protect task queue (readhead/readtail)
@@ -421,7 +434,7 @@
           {
               continue; 
           }  
-          printf("[SERVER] put task %d to write queue\n", ((struct task*)events[i].data.ptr)->data.fd);
+          mydebug("[SERVER] put task %d to write queue\n", ((struct task*)events[i].data.ptr)->data.fd);
 
           new_task = malloc(sizeof(struct task));
           if(NULL == new_task)
@@ -453,7 +466,7 @@
       }
       else
       {
-        printf("[SERVER] Error: unknown epoll event");
+        mydebug("[SERVER] Error: unknown epoll event");
       }
     }
   }
@@ -461,6 +474,8 @@
 error:
   close(apt_fd);
   close(lsn_fd);
-  unlink(UNIX_DOMAIN);
+  unlink(unix_domain_path);
+
+  return 0;
 }
 

--
Gitblit v1.8.0