shm implemented as memfd syscall
cheliequan
2022-12-27 54951dfb930bea890aef14be02236f81b3a19f2e
src/ipc_server.c
@@ -1,61 +1,14 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
#include <signal.h>
#include <sys/epoll.h>
#include <errno.h>
#include <sys/syscall.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <signal.h>
#include <sys/types.h>
#include <pthread.h>
#include <sys/stat.h>
#include "memfd.h"
#include "ipc_msg.h"
#define  MAX_EPOLL_EVENT_COUNT 1024
#define errExit(msg)    do { perror(msg); exit(EXIT_FAILURE); \
} while (0)
#define LISTENQ 1024
// task item in thread pool
struct task
{
    // file descriptor or user_data
    epoll_data_t data;
    // next task
    struct task* next;
};
struct user_data
{
  int fd;
  unsigned int n_size;
  char line[MAX_LEN];
};
//typedef int (*readfunc)(struct user_data*);
typedef int (*writefunc)(struct user_data*);
/* thread exec function */
void *readtask(void *args);
void *writetask(void *args);
// mutex lock to protect readhead/readtail
pthread_mutex_t r_mutex;
pthread_cond_t r_condl;
// mutex lock to protect writehead/writetail
pthread_mutex_t w_mutex;
pthread_cond_t w_condl;
struct task *readhead = NULL, *readtail = NULL;
struct task *writehead = NULL, *writetail = NULL;
//create epoll
int epfd,eventfd;
struct epoll_event   ev,events[LISTENQ];
int g_memfd = 0;
@@ -89,23 +42,6 @@
  return len;
}
void setnonblocking(int sock)
{
  int opts;
  opts = fcntl(sock, F_GETFL);
  if(opts<0)
   {
     perror("fcntl(sock,GETFL)");
     exit(1);
   }
  if(fcntl(sock, F_SETFL, opts | O_NONBLOCK)<0)
   {
    perror("fcntl(sock,SETFL,opts)");
    exit(1);
   }
}
int proc_memfd(struct user_data* rdata)
@@ -166,423 +102,38 @@
    
}
/**
 * thread exec function
 */
void *readtask(void *args)
{
  int i = 0;
  int fd = -1;
  unsigned int n = 0;
  struct user_data *data = NULL;
  while (1)
   {
    // protect task queue (readhead/readtail)
     pthread_mutex_lock(&r_mutex);
     while(readhead == NULL)
      {
       // if condl false, will unlock mutex
       mydebug("thread:%u waiting, task is NULL ... \n", (uint32_t)pthread_self());
       pthread_cond_wait(&r_condl, &r_mutex);
      }
     fd = readhead->data.fd;
      struct task *tmp = readhead;
      readhead = readhead->next;
      free(tmp);
      pthread_mutex_unlock(&r_mutex);
      mydebug("[SERVER] readtask %u handling %d\n", (uint32_t)pthread_self(), fd);
      data = (struct user_data *)malloc(sizeof(struct user_data));
      if(NULL == data)
      {
          perror("readtask malloc");
          return;
      }
      memset(data, 0 , sizeof(struct user_data));
      data->fd = fd;
      n=0;
      int nread = 0;
      while((nread = read(fd,data->line+n,MAX_LEN))>0)
      {
          n+=nread;
      }
      if((nread==-1) && (errno != EAGAIN))
      {
        perror("read error");
      }
      if(n < 0)
      {
        if (errno == ECONNRESET)
          {
            close(fd);
            mydebug("[SERVER] Error: readline failed: %s\n", strerror(errno));
          }
          else
          {
              mydebug("readline error \n");
          }
          if(data != NULL)
          {
              free(data);
              data = NULL;
          }
        }
        else if (n == 0)
        {
            close(fd);
            mydebug("[SERVER] Info: client closed connection.\n");
            if(data != NULL)
            {
              free(data);
              data = NULL;
            }
        }
        else
        {
            data->n_size = n;
            mydebug("[SERVER] readtask %u received %d : [%d] %s\n", (uint32_t)pthread_self(), fd, data->n_size, data->line);
            if (data->line[0] != '\0')
            {
                // modify monitored event to EPOLLOUT,  wait next loop to send respond
                ev.data.ptr = data;
                // Modify event to EPOLLOUT
                ev.events = EPOLLOUT | EPOLLET;
                // modify moditored fd event
                if(epoll_ctl(epfd,EPOLL_CTL_MOD,fd,&ev)==-1)
                {
                  perror("epoll_ctl:mod");
                  if(data != NULL)
                  {
                    free(data);
                    data = NULL;
                  }
                }
            }
          }
      }
}
void *writetask(void *args)
{
    int n = 0;
    int nwrite = 0;
    int ret = 0;
    // data to wirte back to client
    struct user_data *rdata = NULL;
    while(1)
    {
        pthread_mutex_lock(&w_mutex);
        while(writehead == NULL)
        {
            // if condl false, will unlock mutex
            pthread_cond_wait(&w_condl, &w_mutex);
        }
        rdata = (struct user_data*)writehead->data.ptr;
        struct task* tmp = writehead;
        writehead = writehead->next;
        free(tmp);
        //echo("[SERVER] thread %d writetask before unlock\n", pthread_self());
        pthread_mutex_unlock(&w_mutex);
        //echo("[SERVER] thread %d writetask after unlock\n", pthread_self());
        if(rdata->fd < 0)
        {
            goto error;
        }
        mydebug("[SERVER] writetask %u sending %d\n", (uint32_t)pthread_self(), rdata->fd);
        writefunc wfunc = (writefunc)args;
        ret = wfunc(rdata);
        if(ret < 0)
        {
          goto error;
        }
        // delete data
        if (rdata->fd > 0)
        {
          // modify monitored event to EPOLLIN, wait next loop to receive data
          ev.data.fd = rdata->fd;
          // monitor in message, edge trigger
          //ev.events = EPOLLIN | EPOLLET;
          // modify moditored fd event
          if(epoll_ctl(epfd, EPOLL_CTL_DEL, rdata->fd, &ev)==-1)
          {
              perror("epoll_ctl:del");
          }
          close(rdata->fd);
        }
error:
        free(rdata);
    }
}
/*define you our proc_memfd funtion to send your message to other processes,or just put it in shm,
 notice the file description fd and the pid of creator to other*/
int main(int argc, char **argv)
{
  char *name;
  ssize_t  len;
  int lsn_fd, apt_fd;
  struct sockaddr_un srv_addr;
  struct sockaddr_un clt_addr;
  socklen_t clt_len;
  int ret;
  int i;
  //char recv_buf[MAX_LEN] = {0};
  //char send_buf[MAX_LEN] = {0};
  char line[MAX_LEN] = {0};
  pthread_t *read_tids = NULL;
  int read_thread_num = 2;
  pthread_t *write_tids = NULL;
  int write_thread_num = 2;
  struct task *new_task=NULL;
  struct user_data *rdata=NULL;
  char * unixdomain = NULL;
  int ret = 0;
  if (argc < 3) {
    fprintf(stderr, "%s name size\n", argv[0]);
    fprintf(stderr, "%s name size [unix domain path]\n", argv[0]);
    exit(EXIT_FAILURE);
  }
  name = argv[1];
  len = atoi(argv[2]) * 1024LU * 1024LU * 1024LU;
  signal(SIGTERM,handler);
  unixdomain = argv[3];
  signal(SIGTERM,handler);
  signal(SIGABRT,handler);
  signal(SIGSEGV,handler);
  g_memfd = basic_shm_create(name, len);
  ret = shm_write(g_memfd);
   if((read_tids = (pthread_t *)malloc(read_thread_num * sizeof(pthread_t))) == NULL)
   {
      perror("read_tids malloc fail\n");
    return -1;
   }
  // threads for reading thread pool
  for(i = 0; i < read_thread_num; i++)
  ret = basic_create_ipc_server(unixdomain);
  if(ret < 0)
  {
    pthread_create(&read_tids[i], NULL, readtask, NULL);
     printf("failed to create ipc_server\n");
  }
   if((write_tids = (pthread_t *)malloc(write_thread_num * sizeof(pthread_t))) == NULL)
   {
      perror("write_tids  malloc fail\n");
    return -1;
   }
  // threads for writing thread pool
  for(i = 0; i < write_thread_num; i++)
  {
    pthread_create(&write_tids[i], NULL, writetask, proc_memfd);
  }
  epfd = epoll_create(MAX_EPOLL_EVENT_COUNT);
  //create socket to bind local IP and PORT
  lsn_fd = socket(AF_UNIX, SOCK_STREAM, 0);
  int opt = 0x1;
  setsockopt(lsn_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
  // set the descriptor as non-blocking
  setnonblocking(lsn_fd);
  ev.data.fd = lsn_fd;
  ev.events = EPOLLIN|EPOLLET;
  /*Control an epoll descriptor, $epfd, by requesting the operation op be performed on the target file descriptor, fd.
  $epfd is an epoll descriptor returned from epoll_create.
  $op is one of EPOLL_CTL_ADD, EPOLL_CTL_MOD or EPOLL_CTL_DEL.
  $fd is the file desciptor to be watched.
  $eventmask is a bitmask of events defined by EPOLLIN, EPOLLOUT, etc.
  When successful, epoll_ctl returns 0. When an error occurs, epoll_ctl returns -1 and errno is set appropriately.
  */
  if(epoll_ctl(epfd,EPOLL_CTL_ADD,lsn_fd,&ev)==-1)
  {
      perror("epoll_ctl:add");
  }
  if(lsn_fd < 0)
  {
    perror("can't create communication socket!");
    unlink(UNIX_DOMAIN);
    basic_shm_close(g_memfd);
    return 1;
  }
  //create local IP and PORT
  srv_addr.sun_family = AF_UNIX;
  strncpy(srv_addr.sun_path, UNIX_DOMAIN, sizeof(srv_addr.sun_path) - 1);
  //unlink(UNIX_DOMAIN);
  //bind sockfd and sockaddr
  ret = bind(lsn_fd, (struct sockaddr*)&srv_addr, sizeof(srv_addr));
  if(ret == -1)
  {
    perror("can't bind local sockaddr!");
    close(lsn_fd);
    unlink(UNIX_DOMAIN);
    basic_shm_close(g_memfd);
    return 1;
  }
  //listen lsn_fd, try listen 5
  ret = listen(lsn_fd, LISTENQ);
  if(ret == -1)
  {
    perror("can't listen client connect request");
    close(lsn_fd);
    unlink(UNIX_DOMAIN);
    basic_shm_close(g_memfd);
    return 1;
  }
  clt_len = sizeof(clt_addr);
  while(1)
  {
    int nfds = epoll_wait(epfd,events,1024,100);
    int i=0;
    for(i=0;i<nfds;++i)
    {
      if(events[i].data.fd == lsn_fd)
      {
          // accept the client connection
             while((apt_fd=accept(lsn_fd, (struct sockaddr *)&clt_addr, &clt_len))>0)
             {
            setnonblocking(apt_fd);
            //char *str = inet_ntoa(clt_addr.sun_path);
            //printf("[SERVER] connect from %s \n", str);
            ev.data.fd = apt_fd;
            // monitor in message, edge trigger
            ev.events = EPOLLIN | EPOLLET;
            // add fd to epoll queue
            if(epoll_ctl(epfd,EPOLL_CTL_ADD,apt_fd,&ev)==-1)
            {
                perror("epoll_ctl:add");
                continue;
                //exit(EXIT_FAILURE);
            }
           }
              if( apt_fd == -1)
             {
                 if((errno != EAGAIN )
                   && (errno!= ECONNABORTED)
                    && (errno != EPROTO)
                    && (errno != EINTR))
                 {
                   perror("accept");
                 }
              }
              continue;
      }
      else if (events[i].events & EPOLLIN)
      //write数据
      {
        printf("EPOLLIN\n");
        if( (eventfd = events[i].data.fd) < 0 )
        {
          continue;
        }
          printf("[SERVER] put task %d to read queue\n", events[i].data.fd);
          new_task = (struct task *)malloc(sizeof(struct task));
          if(NULL == new_task)
          {
            goto error;
          }
          new_task->data.fd= eventfd;
          new_task->next = NULL;
          // protect task queue (readhead/readtail)
          pthread_mutex_lock(&r_mutex);
          // the queue is empty
          if(readhead == NULL)
          {
             readhead = new_task;
             readtail = new_task;
          }
          // queue is not empty
          else
          {
             readtail->next = new_task;
             readtail = new_task;
          }
           // trigger readtask thread
           pthread_cond_broadcast(&r_condl);
           pthread_mutex_unlock(&r_mutex);
      }
      else if (events[i].events & EPOLLOUT)
      {
          rdata = (struct user_data *)events[i].data.ptr;
          if ( NULL == rdata )
          {
              continue;
          }
          printf("[SERVER] put task %d to write queue\n", ((struct task*)events[i].data.ptr)->data.fd);
          new_task = malloc(sizeof(struct task));
          if(NULL == new_task)
          {
            goto error;
          }
          new_task->data.ptr = (struct user_data*)events[i].data.ptr;
          new_task->next = NULL;
          pthread_mutex_lock(&w_mutex);
          // the queue is empty
          if (writehead == NULL)
          {
              writehead = new_task;
              writetail = new_task;
          }
          // queue is not empty
          else
          {
              writetail->next = new_task;
              writetail = new_task;
          }
          // trigger writetask thread
          pthread_cond_broadcast(&w_condl);
          pthread_mutex_unlock(&w_mutex);
      }
      else
      {
        printf("[SERVER] Error: unknown epoll event");
      }
    }
  }
error:
  close(apt_fd);
  close(lsn_fd);
  unlink(UNIX_DOMAIN);
  basic_shm_close(g_memfd);  
  return 0;
}