shm implemented as memfd syscall
src/ipc_server_lib.c
@@ -13,6 +13,8 @@
#include <sys/types.h>
#include <pthread.h>
#include "ipc_msg.h"
#include "memfd.h"
#define  MAX_EPOLL_EVENT_COUNT 1024
#define LISTENQ 1024
@@ -26,8 +28,7 @@
    struct task* next;               
};
//typedef int (*readfunc)(struct user_data*);
typedef int (*writefunc)(struct user_data*);
/* thread exec function */
static void *readtask(void *args);
static void *writetask(void *args);
@@ -71,7 +72,6 @@
 */
static void *readtask(void *args) 
{
  int i = 0;
  int fd = -1;
  unsigned int n = 0;
@@ -85,16 +85,22 @@
     while(readhead == NULL)
      {
       // if condl false, will unlock mutex
       mydebug("thread:%u waiting, task is NULL ... \n", (uint32_t)pthread_self());
       mydebug("thread:%u waiting, readtask is NULL ... \n", (uint32_t)pthread_self());
       pthread_cond_wait(&r_condl, &r_mutex);
      }
     fd = readhead->data.fd;
      fd = readhead->data.fd;
      struct task *tmp = readhead;
      readhead = readhead->next;
      free(tmp);
      pthread_mutex_unlock(&r_mutex);
      if(fd < 0)
      {
         perror("readtask fd<0");
         continue;
      }
      mydebug("[SERVER] readtask %u handling %d\n", (uint32_t)pthread_self(), fd);
@@ -102,7 +108,7 @@
      if(NULL == data)
      {
          perror("readtask malloc");
          return;
          return NULL;
      }
      
      memset(data, 0 , sizeof(struct user_data));
@@ -136,47 +142,47 @@
              free(data);
              data = NULL;
          }
        }
        else if (n == 0)
        {
            close(fd);
            mydebug("[SERVER] Info: client closed connection.\n");
            if(data != NULL)
            {
              free(data);
              data = NULL;
            }
        }
        else
        {
            data->n_size = n;
            mydebug("[SERVER] readtask %u received %d : [%d] %s\n", (uint32_t)pthread_self(), fd, data->n_size, data->line);
            if (data->line[0] != '\0')
            {
                // modify monitored event to EPOLLOUT,  wait next loop to send respond
                ev.data.ptr = data;
                // Modify event to EPOLLOUT
                ev.events = EPOLLOUT | EPOLLET;
                // modify moditored fd event
                if(epoll_ctl(epfd,EPOLL_CTL_MOD,fd,&ev)==-1)
                {
                  perror("epoll_ctl:mod");
                  if(data != NULL)
                  {
                    free(data);
                    data = NULL;
                  }
                }
            }
          }
      }
      else if (n == 0)
      {
          close(fd);
          mydebug("[SERVER] Info: client closed connection.\n");
          if(data != NULL)
          {
            free(data);
            data = NULL;
          }
      }
      else
      {
          data->n_size = n;
          mydebug("[SERVER] readtask %u received %d : [%d] %s\n", (uint32_t)pthread_self(), fd, data->n_size, data->line);
          if (data->line[0] != '\0')
          {
              // modify monitored event to EPOLLOUT,  wait next loop to send respond
              ev.data.ptr = data;
              // Modify event to EPOLLOUT
              ev.events = EPOLLOUT | EPOLLET;
              // modify moditored fd event
              if(epoll_ctl(epfd,EPOLL_CTL_MOD,fd,&ev)==-1)
              {
                perror("epoll_ctl:mod");
                if(data != NULL)
                {
                  free(data);
                  data = NULL;
                }
              }
          }
        }
      }
      return NULL;
}
static void *writetask(void *args)
{
    int n = 0;
    int nwrite = 0;
    int ret = 0;
    // data to wirte back to client
@@ -200,15 +206,18 @@
        if(rdata->fd < 0)
        {
            goto error;
          continue;
        }
        mydebug("[SERVER] writetask %u sending %d\n", (uint32_t)pthread_self(), rdata->fd);
        mydebug("[SERVER] writetask %u fetch data %d\n", (uint32_t)pthread_self(), rdata->fd);
        writefunc wfunc = (writefunc)args;
        ret = wfunc(rdata);
        if(ret < 0)
        if(NULL != args)
        {
          goto error;
          args_data_st *pargs_data_st = (args_data_st *)args;
          ret = pargs_data_st->wfunc(rdata, pargs_data_st->args, pargs_data_st->len);
          if(ret < 0)
          {
            goto error;
          }
        }
        // delete data
@@ -230,9 +239,11 @@
error:          
        free(rdata); 
    }
    return NULL;
}
int basic_create_ipc_server(char * unix_domain_path)
int basic_create_ipc_server(char * unix_domain_path, args_data_st * pargs_data_st)
{
  pthread_t *read_tids = NULL;
  pthread_t *write_tids = NULL;   
@@ -242,16 +253,19 @@
  socklen_t clt_len;
  int ret;
  int i;
  char line[MAX_LEN] = {0};
  struct task *new_task=NULL;
  struct user_data *rdata=NULL;
  if(0 == file_exists(unix_domain_path))
  {
      unlink(unix_domain_path);
  }
   if((read_tids = (pthread_t *)malloc(READ_THREAD_NUM * sizeof(pthread_t))) == NULL)
   {
      perror("read_tids malloc fail\n");
  if((read_tids = (pthread_t *)malloc(READ_THREAD_NUM * sizeof(pthread_t))) == NULL)
  {
    perror("read_tids malloc fail\n");
    return -1;    
   }
  }
  // threads for reading thread pool  
  for(i = 0; i < READ_THREAD_NUM; i++)
@@ -268,7 +282,7 @@
  // threads for writing thread pool  
  for(i = 0; i < WRITE_THREAD_NUM; i++)
  {
    pthread_create(&write_tids[i], NULL, writetask, proc_memfd);
    pthread_create(&write_tids[i], NULL, writetask, (void *)pargs_data_st);
  }
  epfd = epoll_create(MAX_EPOLL_EVENT_COUNT);
@@ -300,7 +314,7 @@
  if(lsn_fd < 0)
  {
    perror("can't create communication socket!");
    unlink(UNIX_DOMAIN);
    unlink(unix_domain_path);
    return 1;
  }
@@ -312,7 +326,6 @@
  srv_addr.sun_family = AF_UNIX;
  strncpy(srv_addr.sun_path, unix_domain_path, sizeof(srv_addr.sun_path) - 1);
  //unlink(UNIX_DOMAIN);
  //bind sockfd and sockaddr
  ret = bind(lsn_fd, (struct sockaddr*)&srv_addr, sizeof(srv_addr));
@@ -320,7 +333,7 @@
  {
    perror("can't bind local sockaddr!");
    close(lsn_fd);
    unlink(UNIX_DOMAIN);
    unlink(unix_domain_path);
    return 1;
  }
@@ -331,7 +344,7 @@
  {
    perror("can't listen client connect request");
    close(lsn_fd);
    unlink(UNIX_DOMAIN);
    unlink(unix_domain_path);
    return 1;
  }
@@ -378,20 +391,20 @@
      else if (events[i].events & EPOLLIN)
      //write数据
      {
        printf("EPOLLIN\n");
        if( (eventfd = events[i].data.fd) < 0 )
        mydebug("EPOLLIN\n");
        if(events[i].data.fd < 0)
        {
          continue;
        }
          printf("[SERVER] put task %d to read queue\n", events[i].data.fd);
          mydebug("[SERVER] put task %d to read queue\n", events[i].data.fd);
          new_task = (struct task *)malloc(sizeof(struct task));
          if(NULL == new_task)
          {
            goto error;
          }          
          new_task->data.fd= eventfd;
          new_task->data.fd= events[i].data.fd;
          new_task->next = NULL;
          // protect task queue (readhead/readtail)
@@ -421,7 +434,7 @@
          {
              continue; 
          }  
          printf("[SERVER] put task %d to write queue\n", ((struct task*)events[i].data.ptr)->data.fd);
          mydebug("[SERVER] put task %d to write queue\n", ((struct task*)events[i].data.ptr)->data.fd);
          new_task = malloc(sizeof(struct task));
          if(NULL == new_task)
@@ -453,7 +466,7 @@
      }
      else
      {
        printf("[SERVER] Error: unknown epoll event");
        mydebug("[SERVER] Error: unknown epoll event");
      }
    }
  }
@@ -461,6 +474,8 @@
error:
  close(apt_fd);
  close(lsn_fd);
  unlink(UNIX_DOMAIN);
  unlink(unix_domain_path);
  return 0;
}