#include <stdio.h>
|
#include <stdlib.h>
|
#include <string.h>
|
#include <sys/socket.h>
|
#include <sys/un.h>
|
#include <unistd.h>
|
#include <sys/epoll.h>
|
#include <errno.h>
|
#include <sys/syscall.h>
|
#include <sys/mman.h>
|
#include <sys/stat.h>
|
#include <fcntl.h>
|
#include <sys/types.h>
|
#include <pthread.h>
|
#include "ipc_msg.h"
|
|
#define MAX_EPOLL_EVENT_COUNT 1024
|
#define LISTENQ 1024
|
|
// task item in thread pool
|
struct task
|
{
|
// file descriptor or user_data
|
epoll_data_t data;
|
// next task
|
struct task* next;
|
};
|
|
//typedef int (*readfunc)(struct user_data*);
|
typedef int (*writefunc)(struct user_data*);
|
/* thread exec function */
|
static void *readtask(void *args);
|
static void *writetask(void *args);
|
|
// mutex lock to protect readhead/readtail
|
pthread_mutex_t r_mutex;
|
pthread_cond_t r_condl;
|
|
// mutex lock to protect writehead/writetail
|
pthread_mutex_t w_mutex;
|
pthread_cond_t w_condl;
|
struct task *readhead = NULL, *readtail = NULL;
|
struct task *writehead = NULL, *writetail = NULL;
|
|
//create epoll
|
int epfd,eventfd;
|
struct epoll_event ev,events[LISTENQ];
|
|
|
static void setnonblocking(int sock)
|
{
|
int opts;
|
|
opts = fcntl(sock, F_GETFL);
|
if(opts<0)
|
{
|
perror("fcntl(sock,GETFL)");
|
exit(1);
|
}
|
|
if(fcntl(sock, F_SETFL, opts | O_NONBLOCK)<0)
|
{
|
perror("fcntl(sock,SETFL,opts)");
|
exit(1);
|
}
|
}
|
|
|
/**
|
* thread exec function
|
*/
|
static void *readtask(void *args)
|
{
|
int i = 0;
|
int fd = -1;
|
unsigned int n = 0;
|
|
struct user_data *data = NULL;
|
|
while (1)
|
{
|
// protect task queue (readhead/readtail)
|
pthread_mutex_lock(&r_mutex);
|
|
while(readhead == NULL)
|
{
|
// if condl false, will unlock mutex
|
mydebug("thread:%u waiting, task is NULL ... \n", (uint32_t)pthread_self());
|
pthread_cond_wait(&r_condl, &r_mutex);
|
}
|
|
fd = readhead->data.fd;
|
|
struct task *tmp = readhead;
|
readhead = readhead->next;
|
free(tmp);
|
pthread_mutex_unlock(&r_mutex);
|
|
mydebug("[SERVER] readtask %u handling %d\n", (uint32_t)pthread_self(), fd);
|
|
data = (struct user_data *)malloc(sizeof(struct user_data));
|
if(NULL == data)
|
{
|
perror("readtask malloc");
|
return;
|
}
|
|
memset(data, 0 , sizeof(struct user_data));
|
data->fd = fd;
|
|
n=0;
|
int nread = 0;
|
while((nread = read(fd,data->line+n,MAX_LEN))>0)
|
{
|
n+=nread;
|
}
|
|
if((nread==-1) && (errno != EAGAIN))
|
{
|
perror("read error");
|
}
|
|
if(n < 0)
|
{
|
if (errno == ECONNRESET)
|
{
|
close(fd);
|
mydebug("[SERVER] Error: readline failed: %s\n", strerror(errno));
|
}
|
else
|
{
|
mydebug("readline error \n");
|
}
|
if(data != NULL)
|
{
|
free(data);
|
data = NULL;
|
}
|
}
|
else if (n == 0)
|
{
|
close(fd);
|
mydebug("[SERVER] Info: client closed connection.\n");
|
if(data != NULL)
|
{
|
free(data);
|
data = NULL;
|
}
|
}
|
else
|
{
|
data->n_size = n;
|
|
mydebug("[SERVER] readtask %u received %d : [%d] %s\n", (uint32_t)pthread_self(), fd, data->n_size, data->line);
|
if (data->line[0] != '\0')
|
{
|
// modify monitored event to EPOLLOUT, wait next loop to send respond
|
ev.data.ptr = data;
|
// Modify event to EPOLLOUT
|
ev.events = EPOLLOUT | EPOLLET;
|
// modify moditored fd event
|
if(epoll_ctl(epfd,EPOLL_CTL_MOD,fd,&ev)==-1)
|
{
|
perror("epoll_ctl:mod");
|
if(data != NULL)
|
{
|
free(data);
|
data = NULL;
|
}
|
}
|
}
|
}
|
}
|
}
|
|
static void *writetask(void *args)
|
{
|
int n = 0;
|
int nwrite = 0;
|
int ret = 0;
|
|
// data to wirte back to client
|
struct user_data *rdata = NULL;
|
while(1)
|
{
|
pthread_mutex_lock(&w_mutex);
|
while(writehead == NULL)
|
{
|
// if condl false, will unlock mutex
|
pthread_cond_wait(&w_condl, &w_mutex);
|
}
|
rdata = (struct user_data*)writehead->data.ptr;
|
struct task* tmp = writehead;
|
writehead = writehead->next;
|
free(tmp);
|
|
//echo("[SERVER] thread %d writetask before unlock\n", pthread_self());
|
pthread_mutex_unlock(&w_mutex);
|
//echo("[SERVER] thread %d writetask after unlock\n", pthread_self());
|
|
if(rdata->fd < 0)
|
{
|
goto error;
|
}
|
mydebug("[SERVER] writetask %u sending %d\n", (uint32_t)pthread_self(), rdata->fd);
|
|
writefunc wfunc = (writefunc)args;
|
ret = wfunc(rdata);
|
if(ret < 0)
|
{
|
goto error;
|
}
|
|
// delete data
|
if (rdata->fd > 0)
|
{
|
// modify monitored event to EPOLLIN, wait next loop to receive data
|
ev.data.fd = rdata->fd;
|
// monitor in message, edge trigger
|
//ev.events = EPOLLIN | EPOLLET;
|
// modify moditored fd event
|
if(epoll_ctl(epfd, EPOLL_CTL_DEL, rdata->fd, &ev)==-1)
|
{
|
perror("epoll_ctl:del");
|
}
|
|
close(rdata->fd);
|
}
|
|
error:
|
free(rdata);
|
}
|
}
|
|
int basic_create_ipc_server(char * unix_domain_path)
|
{
|
pthread_t *read_tids = NULL;
|
pthread_t *write_tids = NULL;
|
int lsn_fd, apt_fd;
|
struct sockaddr_un srv_addr;
|
struct sockaddr_un clt_addr;
|
socklen_t clt_len;
|
int ret;
|
int i;
|
char line[MAX_LEN] = {0};
|
|
struct task *new_task=NULL;
|
struct user_data *rdata=NULL;
|
|
if((read_tids = (pthread_t *)malloc(READ_THREAD_NUM * sizeof(pthread_t))) == NULL)
|
{
|
perror("read_tids malloc fail\n");
|
return -1;
|
}
|
|
// threads for reading thread pool
|
for(i = 0; i < READ_THREAD_NUM; i++)
|
{
|
pthread_create(&read_tids[i], NULL, readtask, NULL);
|
}
|
|
if((write_tids = (pthread_t *)malloc(WRITE_THREAD_NUM * sizeof(pthread_t))) == NULL)
|
{
|
perror("write_tids malloc fail\n");
|
return -1;
|
}
|
|
// threads for writing thread pool
|
for(i = 0; i < WRITE_THREAD_NUM; i++)
|
{
|
pthread_create(&write_tids[i], NULL, writetask, proc_memfd);
|
}
|
|
epfd = epoll_create(MAX_EPOLL_EVENT_COUNT);
|
|
//create socket to bind local IP and PORT
|
lsn_fd = socket(AF_UNIX, SOCK_STREAM, 0);
|
|
|
// set the descriptor as non-blocking
|
setnonblocking(lsn_fd);
|
|
ev.data.fd = lsn_fd;
|
ev.events = EPOLLIN|EPOLLET;
|
|
/*Control an epoll descriptor, $epfd, by requesting the operation op be performed on the target file descriptor, fd.
|
|
$epfd is an epoll descriptor returned from epoll_create.
|
$op is one of EPOLL_CTL_ADD, EPOLL_CTL_MOD or EPOLL_CTL_DEL.
|
$fd is the file desciptor to be watched.
|
$eventmask is a bitmask of events defined by EPOLLIN, EPOLLOUT, etc.
|
|
When successful, epoll_ctl returns 0. When an error occurs, epoll_ctl returns -1 and errno is set appropriately.
|
*/
|
if(epoll_ctl(epfd,EPOLL_CTL_ADD,lsn_fd,&ev)==-1)
|
{
|
perror("epoll_ctl:add");
|
}
|
|
if(lsn_fd < 0)
|
{
|
perror("can't create communication socket!");
|
unlink(UNIX_DOMAIN);
|
return 1;
|
}
|
|
//create unix domain
|
if(unix_domain_path == NULL)
|
{
|
unix_domain_path = UNIX_DOMAIN;
|
}
|
|
srv_addr.sun_family = AF_UNIX;
|
strncpy(srv_addr.sun_path, unix_domain_path, sizeof(srv_addr.sun_path) - 1);
|
//unlink(UNIX_DOMAIN);
|
|
//bind sockfd and sockaddr
|
ret = bind(lsn_fd, (struct sockaddr*)&srv_addr, sizeof(srv_addr));
|
if(ret == -1)
|
{
|
perror("can't bind local sockaddr!");
|
close(lsn_fd);
|
unlink(UNIX_DOMAIN);
|
return 1;
|
}
|
|
//listen lsn_fd
|
|
ret = listen(lsn_fd, LISTENQ);
|
if(ret == -1)
|
{
|
perror("can't listen client connect request");
|
close(lsn_fd);
|
unlink(UNIX_DOMAIN);
|
|
return 1;
|
}
|
|
clt_len = sizeof(clt_addr);
|
while(1)
|
{
|
int nfds = epoll_wait(epfd,events,1024,100);
|
int i=0;
|
for(i=0;i<nfds;++i)
|
{
|
if(events[i].data.fd == lsn_fd)
|
{
|
// accept the client connection
|
while((apt_fd=accept(lsn_fd, (struct sockaddr *)&clt_addr, &clt_len))>0)
|
{
|
setnonblocking(apt_fd);
|
//char *str = inet_ntoa(clt_addr.sun_path);
|
//printf("[SERVER] connect from %s \n", str);
|
ev.data.fd = apt_fd;
|
// monitor in message, edge trigger
|
ev.events = EPOLLIN | EPOLLET;
|
// add fd to epoll queue
|
if(epoll_ctl(epfd,EPOLL_CTL_ADD,apt_fd,&ev)==-1)
|
{
|
perror("epoll_ctl:add");
|
continue;
|
//exit(EXIT_FAILURE);
|
}
|
}
|
if( apt_fd == -1)
|
{
|
if((errno != EAGAIN )
|
&& (errno!= ECONNABORTED)
|
&& (errno != EPROTO)
|
&& (errno != EINTR))
|
{
|
perror("accept");
|
|
}
|
}
|
continue;
|
}
|
else if (events[i].events & EPOLLIN)
|
//write数据
|
{
|
printf("EPOLLIN\n");
|
if( (eventfd = events[i].data.fd) < 0 )
|
{
|
continue;
|
}
|
|
printf("[SERVER] put task %d to read queue\n", events[i].data.fd);
|
|
new_task = (struct task *)malloc(sizeof(struct task));
|
if(NULL == new_task)
|
{
|
goto error;
|
}
|
new_task->data.fd= eventfd;
|
new_task->next = NULL;
|
|
// protect task queue (readhead/readtail)
|
pthread_mutex_lock(&r_mutex);
|
|
// the queue is empty
|
if(readhead == NULL)
|
{
|
readhead = new_task;
|
readtail = new_task;
|
}
|
// queue is not empty
|
else
|
{
|
readtail->next = new_task;
|
readtail = new_task;
|
}
|
|
// trigger readtask thread
|
pthread_cond_broadcast(&r_condl);
|
pthread_mutex_unlock(&r_mutex);
|
}
|
else if (events[i].events & EPOLLOUT)
|
{
|
rdata = (struct user_data *)events[i].data.ptr;
|
if ( NULL == rdata )
|
{
|
continue;
|
}
|
printf("[SERVER] put task %d to write queue\n", ((struct task*)events[i].data.ptr)->data.fd);
|
|
new_task = malloc(sizeof(struct task));
|
if(NULL == new_task)
|
{
|
goto error;
|
}
|
new_task->data.ptr = (struct user_data*)events[i].data.ptr;
|
new_task->next = NULL;
|
|
pthread_mutex_lock(&w_mutex);
|
|
// the queue is empty
|
if (writehead == NULL)
|
{
|
writehead = new_task;
|
writetail = new_task;
|
}
|
// queue is not empty
|
else
|
{
|
writetail->next = new_task;
|
writetail = new_task;
|
}
|
|
// trigger writetask thread
|
pthread_cond_broadcast(&w_condl);
|
|
pthread_mutex_unlock(&w_mutex);
|
}
|
else
|
{
|
printf("[SERVER] Error: unknown epoll event");
|
}
|
}
|
}
|
|
error:
|
close(apt_fd);
|
close(lsn_fd);
|
unlink(UNIX_DOMAIN);
|
}
|