shm implemented as memfd syscall
cheliequan
2022-12-15 e59ca6b2a1bdc1fe66ab86fd247d0215a7ac7951
增加epoll unixdomain通信示例
1.支持多线程
4个文件已添加
1个文件已修改
883 ■■■■■ 已修改文件
src/Makefile 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/ipc_client.c 135 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/ipc_msg.c 105 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/ipc_msg.h 47 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/ipc_server.c 588 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/Makefile
@@ -2,7 +2,7 @@
AR=ar
CFLAGS=-g -D __SHMT_DEBUG_MODE__=1 -Wall  -I${INCLUDE}  
INCLUDE=-I. -I ../include
all:shmht shmht_mytests
all:shmht shmht_mytests ipc_server ipc_client
    gcc -g -c -Wall -Werror -I${INCLUDE} -fpic list_in_shm.c  memfd.c -lpthread
    gcc -shared -o liblistInShm.so list_in_shm.o memfd.o
#    gcc test_list_in_shm.c liblistInShm.so -lpthread
@@ -17,5 +17,9 @@
    $(CC) $(CFLAGS) $(INCLUDE) -fPIC -c shmht_mytests.c
memfd.o:
    $(CC) $(CFLAGS) $(INCLUDE) -fPIC -c memfd.c
ipc_server:
    $(CC) $(CFLAGS) $(INCLUDE) -fPIC -lpthread -o $@ ipc_server.c ipc_msg.c memfd.c
ipc_client:
    $(CC) $(CFLAGS) $(INCLUDE) -fPIC -o $@ ipc_client.c ipc_msg.c memfd.c
clean:
    rm -rf *.so *.o a.out shmht_mytests *.a
    rm -rf *.so *.o a.out shmht_mytests *.a ipc_client ipc_server
src/ipc_client.c
New file
@@ -0,0 +1,135 @@
#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <string.h>
#include <stdio.h>
#include <sys/inotify.h>
#include <sys/time.h>
#include <errno.h>
#include "ipc_msg.h"
#define errExit(msg)    do { perror(msg); exit(EXIT_FAILURE); \
} while (0)
int memfd_open(char* fd_path)
{
  ssize_t  len;
  int fd;
  struct stat st;
  fd = open(fd_path, O_RDWR);
  if (fd == -1)
    errExit("open");
  if (fstat (fd, &st))
    errExit ("fstat");
  len = st.st_size;
  printf("length: %zu, atime: %lu.%lu\n", len, st.st_atim.tv_sec, st.st_atim.tv_nsec);
  return fd;
}
int memfd_read(int fd)
{
  unsigned char* addr;
  struct stat st;
  ssize_t  i, len;
  if (fstat (fd, &st))
    errExit ("fstat");
  len = st.st_size;
  addr = (unsigned char*) mmap (NULL, len, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
  if (addr == MAP_FAILED)
    errExit("mmap");
  printf ("start: ");
  for (i = 0; i < 32; ++i)
    printf ("%i ", addr[i]);
  printf ("\nend: ");
  for (i = 0; i < 32; ++i)
    printf ("%i ", addr[len-32+i]);
  printf ("\ndone\n");
  return len;
}
int main(int argc, char **argv)
{
  int fd;
  int connect_fd;
  struct sockaddr_un srv_addr;
  char snd_buf[MAX_LEN] = {0};
  char rcv_buf[MAX_LEN] = {0};
  int ret;
  int i;
  char *fd_path;
  memfd_data_st  memfd_data = {0};
  int rcv_num = 0;
  connect_fd = socket(AF_UNIX, SOCK_STREAM, 0);
  if(connect_fd < 0)
  {
    perror("client create socket failed");
    return 1;
  }
  srv_addr.sun_family = AF_UNIX;
  strcpy(srv_addr.sun_path, UNIX_DOMAIN);
  ret = connect(connect_fd, (struct sockaddr*)&srv_addr, sizeof(srv_addr));
  if(ret == -1)
  {
    perror("connect to server failed!");
    close(connect_fd);
    unlink(UNIX_DOMAIN);
    return 1;
  }
  printf("connect to server path:%s success!", srv_addr.sun_path);
  //memset(rcv_buf, 0, sizeof(rcv_buf));
  //int rcv_num = read(connect_fd, rcv_buf, sizeof(rcv_buf));
  //memcpy(&memfd_data, rcv_buf, sizeof(memfd_data_st));
  memset(snd_buf, 0, 256);
  strcpy(snd_buf, HELLO_MSG);
  printf("sizeof(snd_buf): %ld\n", sizeof(snd_buf));
  printf("send data to server... ...\n");
 // for(i = 0; i < 4; i++)
  {
    write(connect_fd, snd_buf, sizeof(snd_buf));
  }
  printf("send end!\n");
  memset(rcv_buf, 0, sizeof(rcv_buf));
  rcv_num = read(connect_fd, rcv_buf, sizeof(rcv_buf));
  if(rcv_num == 0)
  {
    close(connect_fd);
    return 0;
  }
  memcpy(&memfd_data, rcv_buf, rcv_num);
  printf("receive message from server,pid:%d, memfd:%d\n", memfd_data.pid, memfd_data.memfd);
  if ((memfd_data.pid != 0) && (memfd_data.memfd != 0))
  {
    char fd_path[256] = {0};
    snprintf(fd_path, sizeof(fd_path), "/proc/%d/fd/%d", memfd_data.pid, memfd_data.memfd);
    fd = memfd_open(fd_path);
    ret = memfd_read(fd);
  }
  close(connect_fd);
  return 0;
}
src/ipc_msg.c
New file
@@ -0,0 +1,105 @@
#include "ipc_msg.h"
#include <string.h>
#include <stdlib.h>
/**
 * @brief 发送目标文件描述符
 * @param fd        传递信息的 UNIX 域 文件描述符
 * @param fd_to_send    待发送的文件描述符
 */
int  send_fd_args_sendmsg(int  fd, int fd_to_send, pid_t pid, void **ppdata, int len)
{
    struct iovec iov[1];
    struct msghdr msg;
  int control_len = 0;
  int ret = 0;
  memfd_data_st memfd_data;
    struct cmsghdr cm;            /* 这是辅助数据头部结构体,文件描述符就是通过这个结构体以及后面的数据部分发送的 */
   control_len = CMSG_SPACE(sizeof(memfd_data_st) + len);
   char * recv_buf = (char *)malloc(control_len);
   if(NULL != recv_buf)
   {
        return -1;
   }
    iov[0].iov_base = recv_buf;
    iov[0].iov_len = 1;
    msg.msg_name = NULL;
    msg.msg_namelen = 0;
    msg.msg_iov = iov;
    msg.msg_iovlen = 1;
  memfd_data.memfd = fd_to_send;
  memfd_data.pid = pid;
  memfd_data.data = *ppdata;
  memfd_data.len = len;
    cm.cmsg_len = control_len;    /* 辅助数据的字节数,包扩头部和真正的数据 */
    cm.cmsg_level = SOL_SOCKET;    /* 表示原始协议级别,与 setsockopt 的 level 参数相同 */
    cm.cmsg_type = SCM_RIGHTS;    /* 控制信息类型,SCM_RIGHTS 表示传送的内容是访问权 */
    *(memfd_data_st*)CMSG_DATA(&cm) = memfd_data;/* 设置真正数据部分为我们想发送的文件描述符 */
    msg.msg_control = &cm;        /* 设置辅助数据 */
    msg.msg_controllen = control_len;
    ret = sendmsg(fd, &msg, 0);
  free(recv_buf);
  return ret;
}
/**
 * @brief 发送目标文件描述符
 * @param fd        传递信息的 UNIX 域 文件描述符
 * @param fd_to_send    待发送的文件描述符
 */
int  send_fd_sendmsg(int  fd, memfd_data_st** ppmemfd_data)
{
  memfd_data_st *ptr_memfd_data = *ppmemfd_data;
  return send_fd_args(fd, ptr_memfd_data->memfd, ptr_memfd_data->pid, &ptr_memfd_data->data, ptr_memfd_data->len);
}
/**
 * @brief 接受文件描述符
 * @param fd 传递信息的 UNIX 域 文件描述符
 */
int recv_fd_recvmsg(int  fd, memfd_data_st** ppmemfd_data)
{
  struct iovec iov[1];
  struct msghdr msg;
  char recv_buf[MAX_LEN];
  int ret = -1;
  int control_len = 0;
  struct cmsghdr cm;
  iov[0].iov_base = recv_buf;
    iov[0].iov_len = 1;
    msg.msg_name = NULL;
    msg.msg_namelen = 0;
    msg.msg_iov = iov;
    msg.msg_iovlen = 1;
  control_len = CMSG_SPACE(sizeof(memfd_data_st) + MAX_LEN);
    msg.msg_control = &cm;
    msg.msg_controllen = control_len;
    ret = recvmsg(fd, &msg, 0);
  if(ret < 0)
  {
    ret = -1;
  }
  int off = offsetof(memfd_data_st, len);
  int len = *(int*)((char *)CMSG_DATA(&cm) + off);
  control_len = CMSG_SPACE(sizeof(memfd_data_st) + len);
  *ppmemfd_data = (memfd_data_st *)malloc(control_len);
  if(*ppmemfd_data == NULL)
  {
      return -1;
  }
  memset(*ppmemfd_data, 0, control_len);
  memcpy(*ppmemfd_data, CMSG_DATA(&cm),  control_len);
  return ret;
}
src/ipc_msg.h
New file
@@ -0,0 +1,47 @@
#ifndef IPC_MSG_H
#define IPC_MSG_H
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <unistd.h>
#ifdef __cplusplus
extern "C" {
#endif
#define UNIX_DOMAIN "/tmp/UNIX.domain"
#define MAX_LEN  4096
#define HELLO_MSG "hello_basic"
#define offsetof(TYPE, MEMBER) ((int) &((TYPE *)0)->MEMBER)
#define container_of(ptr, type, member) ({\
const typeof(((type *)0)->member) * __mptr = (ptr);\
(type *)((char *)__mptr - offsetof(type, member)); }
typedef struct memfd_data {
  pid_t pid;
  int memfd;
  void * data;
  int len;
} memfd_data_st;
int  send_fd(int  fd, memfd_data_st** ppmemfd_data);
/**
 * @brief 发送目标文件描述符
 * @param fd        传递信息的 UNIX 域 文件描述符
 * @param fd_to_send    待发送的文件描述符
 */
int  send_fd_args(int  fd, int fd_to_send, pid_t pid, void **ppdata, int len);
/**
 * @brief 接受文件描述符
 * @param fd 传递信息的 UNIX 域 文件描述符
 */
int recv_fd(int  fd, memfd_data_st** ppmemfd_data);
#ifdef __cplusplus
}
#endif
#endif
src/ipc_server.c
New file
@@ -0,0 +1,588 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
#include <signal.h>
#include <sys/epoll.h>
#include <errno.h>
#include <sys/syscall.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/types.h>
#include <pthread.h>
#include "ipc_msg.h"
#define  MAX_EPOLL_EVENT_COUNT 1024
#define errExit(msg)    do { perror(msg); exit(EXIT_FAILURE); \
} while (0)
#define LISTENQ 1024
// task item in thread pool
struct task
{
    // file descriptor or user_data
    epoll_data_t data;
    // next task
    struct task* next;
};
struct user_data
{
  int fd;
  unsigned int n_size;
  char line[MAX_LEN];
};
//typedef int (*readfunc)(struct user_data*);
typedef int (*writefunc)(struct user_data*);
/* thread exec function */
void *readtask(void *args);
void *writetask(void *args);
// mutex lock to protect readhead/readtail
pthread_mutex_t r_mutex;
pthread_cond_t r_condl;
// mutex lock to protect writehead/writetail
pthread_mutex_t w_mutex;
pthread_cond_t w_condl;
struct task *readhead = NULL, *readtail = NULL;
struct task *writehead = NULL, *writetail = NULL;
//create epoll
int epfd,eventfd;
struct epoll_event   ev,events[LISTENQ];
int g_memfd = 0;
void handler(){
  printf("clean program start\n");
  //unlink(UNIX_DOMAIN);
  remove(UNIX_DOMAIN);
  printf("clean end.\n");
}
int shm_write(int fd)
{
  struct stat st;
  ssize_t len;
  int ret = 0;
  unsigned char* paddr;
  if (fstat (fd, &st))
    errExit ("fstat");
  len = st.st_size;
  char *content = "hello world!\n";
  ret =  basic_shm_mmap (fd, &paddr);
  if (ret < 0)
    errExit("basic_shm_mmap");
  memcpy(paddr, content, strlen(content));
  printf("length: %zu, atime: %lu.%lu\n", len, st.st_atim.tv_sec, st.st_atim.tv_nsec);
  return len;
}
void setnonblocking(int sock)
{
  int opts;
  opts = fcntl(sock, F_GETFL);
  if(opts<0)
   {
     perror("fcntl(sock,GETFL)");
     exit(1);
   }
  if(fcntl(sock, F_SETFL, opts | O_NONBLOCK)<0)
   {
    perror("fcntl(sock,SETFL,opts)");
    exit(1);
   }
}
int proc_memfd(struct user_data* rdata)
{
    int fd = rdata->fd;
    unsigned int n_size;
    char line[MAX_LEN]= {0};
    int n = 0;
    int nwrite = 0;
    int data_size = 0;
    memfd_data_st memfd_data = {0};
    if(rdata->n_size <= 0 || fd < 0)
    {
      return -1;
    }
    if(0 == strncmp(rdata->line, HELLO_MSG, strlen(HELLO_MSG)))
    {
      memfd_data.memfd = g_memfd;
      memfd_data.pid = getpid();
      memfd_data.data = NULL;
      memfd_data.len = 0;
      data_size = sizeof(memfd_data_st) + memfd_data.len;
      memcpy(line, &memfd_data, data_size);
    }
    else
    {
      return  -1;
    }
    n = data_size;
    while(n>0)
    {
        nwrite = write(rdata->fd, line+data_size-n,n);////ET
        if( nwrite < n )
        {
            if((nwrite==-1) && (errno != EAGAIN))
            {
                perror("write error");
            }
            if (errno == ECONNRESET)
            {
                close(rdata->fd);
                printf("[SERVER] Error: send responce failed: %s\n", strerror(errno));
            }
            else if (nwrite == 0)
            {
                close(rdata->fd);
                printf("[SERVER] Error: client closed connection.");
            }
            break;
        }
        n -= nwrite;
    }
    return nwrite;
}
/**
 * thread exec function
 */
void *readtask(void *args)
{
  int i = 0;
  int fd = -1;
  unsigned int n = 0;
  struct user_data *data = NULL;
  while (1)
   {
    // protect task queue (readhead/readtail)
     pthread_mutex_lock(&r_mutex);
     while(readhead == NULL)
      {
       // if condl false, will unlock mutex
       mydebug("thread:%u waiting, task is NULL ... \n", (uint32_t)pthread_self());
       pthread_cond_wait(&r_condl, &r_mutex);
      }
     fd = readhead->data.fd;
      struct task *tmp = readhead;
      readhead = readhead->next;
      free(tmp);
      pthread_mutex_unlock(&r_mutex);
      mydebug("[SERVER] readtask %u handling %d\n", (uint32_t)pthread_self(), fd);
      data = (struct user_data *)malloc(sizeof(struct user_data));
      if(NULL == data)
      {
          perror("readtask malloc");
          return;
      }
      memset(data, 0 , sizeof(struct user_data));
      data->fd = fd;
      n=0;
      int nread = 0;
      while((nread = read(fd,data->line+n,MAX_LEN))>0)
      {
          n+=nread;
      }
      if((nread==-1) && (errno != EAGAIN))
      {
        perror("read error");
      }
      if(n < 0)
      {
        if (errno == ECONNRESET)
          {
            close(fd);
            mydebug("[SERVER] Error: readline failed: %s\n", strerror(errno));
          }
          else
          {
              mydebug("readline error \n");
          }
          if(data != NULL)
          {
              free(data);
              data = NULL;
          }
        }
        else if (n == 0)
        {
            close(fd);
            mydebug("[SERVER] Info: client closed connection.\n");
            if(data != NULL)
            {
              free(data);
              data = NULL;
            }
        }
        else
        {
            data->n_size = n;
            mydebug("[SERVER] readtask %u received %d : [%d] %s\n", (uint32_t)pthread_self(), fd, data->n_size, data->line);
            if (data->line[0] != '\0')
            {
                // modify monitored event to EPOLLOUT,  wait next loop to send respond
                ev.data.ptr = data;
                // Modify event to EPOLLOUT
                ev.events = EPOLLOUT | EPOLLET;
                // modify moditored fd event
                if(epoll_ctl(epfd,EPOLL_CTL_MOD,fd,&ev)==-1)
                {
                  perror("epoll_ctl:mod");
                  if(data != NULL)
                  {
                    free(data);
                    data = NULL;
                  }
                }
            }
          }
      }
}
void *writetask(void *args)
{
    int n = 0;
    int nwrite = 0;
    int ret = 0;
    // data to wirte back to client
    struct user_data *rdata = NULL;
    while(1)
    {
        pthread_mutex_lock(&w_mutex);
        while(writehead == NULL)
        {
            // if condl false, will unlock mutex
            pthread_cond_wait(&w_condl, &w_mutex);
        }
        rdata = (struct user_data*)writehead->data.ptr;
        struct task* tmp = writehead;
        writehead = writehead->next;
        free(tmp);
        //echo("[SERVER] thread %d writetask before unlock\n", pthread_self());
        pthread_mutex_unlock(&w_mutex);
        //echo("[SERVER] thread %d writetask after unlock\n", pthread_self());
        if(rdata->fd < 0)
        {
            goto error;
        }
        mydebug("[SERVER] writetask %u sending %d\n", (uint32_t)pthread_self(), rdata->fd);
        writefunc wfunc = (writefunc)args;
        ret = wfunc(rdata);
        if(ret < 0)
        {
          goto error;
        }
        // delete data
        if (rdata->fd > 0)
        {
          // modify monitored event to EPOLLIN, wait next loop to receive data
          ev.data.fd = rdata->fd;
          // monitor in message, edge trigger
          //ev.events = EPOLLIN | EPOLLET;
          // modify moditored fd event
          if(epoll_ctl(epfd, EPOLL_CTL_DEL, rdata->fd, &ev)==-1)
          {
              perror("epoll_ctl:del");
          }
          close(rdata->fd);
        }
error:
        free(rdata);
    }
}
int main(int argc, char **argv)
{
  char *name;
  ssize_t  len;
  int lsn_fd, apt_fd;
  struct sockaddr_un srv_addr;
  struct sockaddr_un clt_addr;
  socklen_t clt_len;
  int ret;
  int i;
  //char recv_buf[MAX_LEN] = {0};
  //char send_buf[MAX_LEN] = {0};
  char line[MAX_LEN] = {0};
  pthread_t *read_tids = NULL;
  int read_thread_num = 2;
  pthread_t *write_tids = NULL;
  int write_thread_num = 2;
  struct task *new_task=NULL;
  struct user_data *rdata=NULL;
  if (argc < 3) {
    fprintf(stderr, "%s name size\n", argv[0]);
    exit(EXIT_FAILURE);
  }
  name = argv[1];
  len = atoi(argv[2]) * 1024LU * 1024LU * 1024LU;
  signal(SIGTERM,handler);
  g_memfd = basic_shm_create(name, len);
  ret = shm_write(g_memfd);
    if((read_tids = (pthread_t *)malloc(read_thread_num * sizeof(pthread_t))) == NULL)
    {
        perror("read_tids malloc fail\n");
    return -1;
    }
  // threads for reading thread pool
  for(i = 0; i < read_thread_num; i++)
  {
    pthread_create(&read_tids[i], NULL, readtask, NULL);
  }
    if((write_tids = (pthread_t *)malloc(write_thread_num * sizeof(pthread_t))) == NULL)
    {
        perror("write_tids  malloc fail\n");
    return -1;
    }
  // threads for writing thread pool
  for(i = 0; i < write_thread_num; i++)
  {
    pthread_create(&write_tids[i], NULL, writetask, proc_memfd);
  }
  epfd = epoll_create(MAX_EPOLL_EVENT_COUNT);
  //create socket to bind local IP and PORT
  lsn_fd = socket(AF_UNIX, SOCK_STREAM, 0);
  int opt = 0x1;
  setsockopt(lsn_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
  // set the descriptor as non-blocking
  setnonblocking(lsn_fd);
  ev.data.fd = lsn_fd;
  ev.events = EPOLLIN|EPOLLET;
  /*Control an epoll descriptor, $epfd, by requesting the operation op be performed on the target file descriptor, fd.
  $epfd is an epoll descriptor returned from epoll_create.
  $op is one of EPOLL_CTL_ADD, EPOLL_CTL_MOD or EPOLL_CTL_DEL.
  $fd is the file desciptor to be watched.
  $eventmask is a bitmask of events defined by EPOLLIN, EPOLLOUT, etc.
  When successful, epoll_ctl returns 0. When an error occurs, epoll_ctl returns -1 and errno is set appropriately.
  */
  if(epoll_ctl(epfd,EPOLL_CTL_ADD,lsn_fd,&ev)==-1)
  {
      perror("epoll_ctl:add");
  }
  if(lsn_fd < 0)
  {
    perror("can't create communication socket!");
    unlink(UNIX_DOMAIN);
    basic_shm_close(g_memfd);
    return 1;
  }
  //create local IP and PORT
  srv_addr.sun_family = AF_UNIX;
  strncpy(srv_addr.sun_path, UNIX_DOMAIN, sizeof(srv_addr.sun_path) - 1);
  //unlink(UNIX_DOMAIN);
  //bind sockfd and sockaddr
  ret = bind(lsn_fd, (struct sockaddr*)&srv_addr, sizeof(srv_addr));
  if(ret == -1)
  {
    perror("can't bind local sockaddr!");
    close(lsn_fd);
    unlink(UNIX_DOMAIN);
    basic_shm_close(g_memfd);
    return 1;
  }
  //listen lsn_fd, try listen 5
  ret = listen(lsn_fd, LISTENQ);
  if(ret == -1)
  {
    perror("can't listen client connect request");
    close(lsn_fd);
    unlink(UNIX_DOMAIN);
    basic_shm_close(g_memfd);
    return 1;
  }
  clt_len = sizeof(clt_addr);
  while(1)
  {
    int nfds = epoll_wait(epfd,events,1024,100);
    int i=0;
    for(i=0;i<nfds;++i)
    {
      if(events[i].data.fd == lsn_fd)
      {
          // accept the client connection
                while((apt_fd=accept(lsn_fd, (struct sockaddr *)&clt_addr, &clt_len))>0)
                {
            setnonblocking(apt_fd);
            //char *str = inet_ntoa(clt_addr.sun_path);
            //printf("[SERVER] connect from %s \n", str);
            ev.data.fd = apt_fd;
            // monitor in message, edge trigger
            ev.events = EPOLLIN | EPOLLET;
            // add fd to epoll queue
            if(epoll_ctl(epfd,EPOLL_CTL_ADD,apt_fd,&ev)==-1)
            {
                perror("epoll_ctl:add");
                continue;
                //exit(EXIT_FAILURE);
            }
            }
                 if( apt_fd == -1)
                {
                     if((errno != EAGAIN )
                        && (errno!= ECONNABORTED)
                         && (errno != EPROTO)
                         && (errno != EINTR))
                     {
                        perror("accept");
                     }
                 }
                 continue;
      }
      else if (events[i].events & EPOLLIN)
      //write数据
      {
        printf("EPOLLIN\n");
        if( (eventfd = events[i].data.fd) < 0 )
        {
          continue;
        }
          printf("[SERVER] put task %d to read queue\n", events[i].data.fd);
          new_task = (struct task *)malloc(sizeof(struct task));
          if(NULL == new_task)
          {
            goto error;
          }
          new_task->data.fd= eventfd;
          new_task->next = NULL;
          // protect task queue (readhead/readtail)
          pthread_mutex_lock(&r_mutex);
          // the queue is empty
          if(readhead == NULL)
          {
             readhead = new_task;
             readtail = new_task;
          }
          // queue is not empty
          else
          {
             readtail->next = new_task;
             readtail = new_task;
          }
           // trigger readtask thread
           pthread_cond_broadcast(&r_condl);
           pthread_mutex_unlock(&r_mutex);
      }
      else if (events[i].events & EPOLLOUT)
      {
          rdata = (struct user_data *)events[i].data.ptr;
          if ( NULL == rdata )
          {
              continue;
          }
          printf("[SERVER] put task %d to write queue\n", ((struct task*)events[i].data.ptr)->data.fd);
          new_task = malloc(sizeof(struct task));
          if(NULL == new_task)
          {
            goto error;
          }
          new_task->data.ptr = (struct user_data*)events[i].data.ptr;
          new_task->next = NULL;
          pthread_mutex_lock(&w_mutex);
          // the queue is empty
          if (writehead == NULL)
          {
              writehead = new_task;
              writetail = new_task;
          }
          // queue is not empty
          else
          {
              writetail->next = new_task;
              writetail = new_task;
          }
          // trigger writetask thread
          pthread_cond_broadcast(&w_condl);
          pthread_mutex_unlock(&w_mutex);
      }
      else
      {
        printf("[SERVER] Error: unknown epoll event");
      }
    }
  }
error:
  close(apt_fd);
  close(lsn_fd);
  unlink(UNIX_DOMAIN);
  basic_shm_close(g_memfd);
  return 0;
}