From 54951dfb930bea890aef14be02236f81b3a19f2e Mon Sep 17 00:00:00 2001
From: cheliequan <liequanche@126.com>
Date: 星期二, 27 十二月 2022 10:23:43 +0800
Subject: [PATCH] 更新消息通知库,使用basic_create_ipc_server函数创建消息监听服务器
---
src/memfd.c | 1
src/ipc_msg.h | 21 +
src/ipc_server_lib.c | 466 +++++++++++++++++++++++++++
src/ipc_msg.c | 4
src/Makefile | 6
src/ipc_server.c | 483 +---------------------------
6 files changed, 508 insertions(+), 473 deletions(-)
diff --git a/src/Makefile b/src/Makefile
index 772b875..d870100 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -17,8 +17,10 @@
$(CC) $(CFLAGS) $(INCLUDE) -fPIC -c shmht_mytests.c
memfd.o:
$(CC) $(CFLAGS) $(INCLUDE) -fPIC -c memfd.c
-ipc_server:
- $(CC) $(CFLAGS) $(INCLUDE) -fPIC -lpthread -o $@ ipc_server.c ipc_msg.c memfd.c
+ipc_server:libipc_server.so
+ $(CC) $(CFLAGS) $(INCLUDE) -fPIC -pthread -o $@ ipc_server.c -lipc_server -L .
+libipc_server.so:
+ $(CC) -shared -fPIC -o $@ ipc_server_lib.c ipc_msg.c memfd.c
ipc_client:
$(CC) $(CFLAGS) $(INCLUDE) -fPIC -o $@ ipc_client.c ipc_msg.c memfd.c
clean:
diff --git a/src/ipc_msg.c b/src/ipc_msg.c
index 0530fd4..804eafa 100644
--- a/src/ipc_msg.c
+++ b/src/ipc_msg.c
@@ -57,7 +57,7 @@
int send_fd_sendmsg(int fd, memfd_data_st** ppmemfd_data)
{
memfd_data_st *ptr_memfd_data = *ppmemfd_data;
- return send_fd_args(fd, ptr_memfd_data->memfd, ptr_memfd_data->pid, &ptr_memfd_data->data, ptr_memfd_data->len);
+ return send_fd_args_sendmsg(fd, ptr_memfd_data->memfd, ptr_memfd_data->pid, &ptr_memfd_data->data, ptr_memfd_data->len);
}
/**
@@ -102,4 +102,4 @@
memset(*ppmemfd_data, 0, control_len);
memcpy(*ppmemfd_data, CMSG_DATA(&cm), control_len);
return ret;
-}
\ No newline at end of file
+}
diff --git a/src/ipc_msg.h b/src/ipc_msg.h
index e7ad225..5b489c3 100644
--- a/src/ipc_msg.h
+++ b/src/ipc_msg.h
@@ -11,6 +11,8 @@
#define UNIX_DOMAIN "/tmp/UNIX.domain"
+#define errExit(msg) do { perror(msg); exit(EXIT_FAILURE); \
+} while (0)
#define MAX_LEN 4096
#define HELLO_MSG "hello_basic"
@@ -26,20 +28,33 @@
int len;
} memfd_data_st;
-int send_fd(int fd, memfd_data_st** ppmemfd_data);
+struct user_data
+{
+ int fd;
+ unsigned int n_size;
+ char line[MAX_LEN];
+};
+
+#define READ_THREAD_NUM 2
+#define WRITE_THREAD_NUM 2
+
+int proc_memfd(struct user_data* rdata);
+int basic_create_ipc_server(char * unix_domain_path);
+
+int send_fd_sendmsg(int fd, memfd_data_st** ppmemfd_data);
/**
* @brief 鍙戦�佺洰鏍囨枃浠舵弿杩扮
* @param fd 浼犻�掍俊鎭殑 UNIX 鍩� 鏂囦欢鎻忚堪绗�
* @param fd_to_send 寰呭彂閫佺殑鏂囦欢鎻忚堪绗�
*/
-int send_fd_args(int fd, int fd_to_send, pid_t pid, void **ppdata, int len);
+int send_fd_args_sendmsg(int fd, int fd_to_send, pid_t pid, void **ppdata, int len);
/**
* @brief 鎺ュ彈鏂囦欢鎻忚堪绗�
* @param fd 浼犻�掍俊鎭殑 UNIX 鍩� 鏂囦欢鎻忚堪绗�
*/
-int recv_fd(int fd, memfd_data_st** ppmemfd_data);
+int recv_fd_recvmsg(int fd, memfd_data_st** ppmemfd_data);
#ifdef __cplusplus
}
diff --git a/src/ipc_server.c b/src/ipc_server.c
index 45f6082..99418c7 100644
--- a/src/ipc_server.c
+++ b/src/ipc_server.c
@@ -1,61 +1,14 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
-#include <sys/socket.h>
-#include <sys/un.h>
#include <unistd.h>
-#include <signal.h>
-#include <sys/epoll.h>
#include <errno.h>
-#include <sys/syscall.h>
-#include <sys/mman.h>
-#include <sys/stat.h>
-#include <fcntl.h>
+#include <signal.h>
#include <sys/types.h>
-#include <pthread.h>
+#include <sys/stat.h>
+#include "memfd.h"
#include "ipc_msg.h"
-
-#define MAX_EPOLL_EVENT_COUNT 1024
-#define errExit(msg) do { perror(msg); exit(EXIT_FAILURE); \
-} while (0)
-#define LISTENQ 1024
-
-// task item in thread pool
-struct task
-{
- // file descriptor or user_data
- epoll_data_t data;
- // next task
- struct task* next;
-};
-
-struct user_data
-{
- int fd;
- unsigned int n_size;
- char line[MAX_LEN];
-};
-
-//typedef int (*readfunc)(struct user_data*);
-typedef int (*writefunc)(struct user_data*);
-/* thread exec function */
-void *readtask(void *args);
-void *writetask(void *args);
-
-// mutex lock to protect readhead/readtail
-pthread_mutex_t r_mutex;
-pthread_cond_t r_condl;
-
-// mutex lock to protect writehead/writetail
-pthread_mutex_t w_mutex;
-pthread_cond_t w_condl;
-struct task *readhead = NULL, *readtail = NULL;
-struct task *writehead = NULL, *writetail = NULL;
-
-//create epoll
-int epfd,eventfd;
-struct epoll_event ev,events[LISTENQ];
int g_memfd = 0;
@@ -89,23 +42,6 @@
return len;
}
-void setnonblocking(int sock)
-{
- int opts;
-
- opts = fcntl(sock, F_GETFL);
- if(opts<0)
- {
- perror("fcntl(sock,GETFL)");
- exit(1);
- }
-
- if(fcntl(sock, F_SETFL, opts | O_NONBLOCK)<0)
- {
- perror("fcntl(sock,SETFL,opts)");
- exit(1);
- }
-}
int proc_memfd(struct user_data* rdata)
@@ -166,423 +102,38 @@
}
-
-/**
- * thread exec function
- */
-void *readtask(void *args)
-{
- int i = 0;
- int fd = -1;
- unsigned int n = 0;
-
- struct user_data *data = NULL;
-
- while (1)
- {
- // protect task queue (readhead/readtail)
- pthread_mutex_lock(&r_mutex);
-
- while(readhead == NULL)
- {
- // if condl false, will unlock mutex
- mydebug("thread:%u waiting, task is NULL ... \n", (uint32_t)pthread_self());
- pthread_cond_wait(&r_condl, &r_mutex);
- }
-
- fd = readhead->data.fd;
-
- struct task *tmp = readhead;
- readhead = readhead->next;
- free(tmp);
- pthread_mutex_unlock(&r_mutex);
-
- mydebug("[SERVER] readtask %u handling %d\n", (uint32_t)pthread_self(), fd);
-
- data = (struct user_data *)malloc(sizeof(struct user_data));
- if(NULL == data)
- {
- perror("readtask malloc");
- return;
- }
-
- memset(data, 0 , sizeof(struct user_data));
- data->fd = fd;
-
- n=0;
- int nread = 0;
- while((nread = read(fd,data->line+n,MAX_LEN))>0)
- {
- n+=nread;
- }
-
- if((nread==-1) && (errno != EAGAIN))
- {
- perror("read error");
- }
-
- if(n < 0)
- {
- if (errno == ECONNRESET)
- {
- close(fd);
- mydebug("[SERVER] Error: readline failed: %s\n", strerror(errno));
- }
- else
- {
- mydebug("readline error \n");
- }
- if(data != NULL)
- {
- free(data);
- data = NULL;
- }
- }
- else if (n == 0)
- {
- close(fd);
- mydebug("[SERVER] Info: client closed connection.\n");
- if(data != NULL)
- {
- free(data);
- data = NULL;
- }
- }
- else
- {
- data->n_size = n;
-
- mydebug("[SERVER] readtask %u received %d : [%d] %s\n", (uint32_t)pthread_self(), fd, data->n_size, data->line);
- if (data->line[0] != '\0')
- {
- // modify monitored event to EPOLLOUT, wait next loop to send respond
- ev.data.ptr = data;
- // Modify event to EPOLLOUT
- ev.events = EPOLLOUT | EPOLLET;
- // modify moditored fd event
- if(epoll_ctl(epfd,EPOLL_CTL_MOD,fd,&ev)==-1)
- {
- perror("epoll_ctl:mod");
- if(data != NULL)
- {
- free(data);
- data = NULL;
- }
- }
- }
- }
- }
-}
-
-void *writetask(void *args)
-{
- int n = 0;
- int nwrite = 0;
- int ret = 0;
-
- // data to wirte back to client
- struct user_data *rdata = NULL;
- while(1)
- {
- pthread_mutex_lock(&w_mutex);
- while(writehead == NULL)
- {
- // if condl false, will unlock mutex
- pthread_cond_wait(&w_condl, &w_mutex);
- }
- rdata = (struct user_data*)writehead->data.ptr;
- struct task* tmp = writehead;
- writehead = writehead->next;
- free(tmp);
-
- //echo("[SERVER] thread %d writetask before unlock\n", pthread_self());
- pthread_mutex_unlock(&w_mutex);
- //echo("[SERVER] thread %d writetask after unlock\n", pthread_self());
-
- if(rdata->fd < 0)
- {
- goto error;
- }
- mydebug("[SERVER] writetask %u sending %d\n", (uint32_t)pthread_self(), rdata->fd);
-
- writefunc wfunc = (writefunc)args;
- ret = wfunc(rdata);
- if(ret < 0)
- {
- goto error;
- }
-
- // delete data
- if (rdata->fd > 0)
- {
- // modify monitored event to EPOLLIN, wait next loop to receive data
- ev.data.fd = rdata->fd;
- // monitor in message, edge trigger
- //ev.events = EPOLLIN | EPOLLET;
- // modify moditored fd event
- if(epoll_ctl(epfd, EPOLL_CTL_DEL, rdata->fd, &ev)==-1)
- {
- perror("epoll_ctl:del");
- }
-
- close(rdata->fd);
- }
-
-error:
- free(rdata);
- }
-}
-
-
+/*define you our proc_memfd funtion to send your message to other processes锛宱r just put it in shm锛�
+ notice the file description fd and the pid of creator to other*/
int main(int argc, char **argv)
{
char *name;
ssize_t len;
- int lsn_fd, apt_fd;
- struct sockaddr_un srv_addr;
- struct sockaddr_un clt_addr;
- socklen_t clt_len;
- int ret;
- int i;
- //char recv_buf[MAX_LEN] = {0};
- //char send_buf[MAX_LEN] = {0};
- char line[MAX_LEN] = {0};
- pthread_t *read_tids = NULL;
- int read_thread_num = 2;
- pthread_t *write_tids = NULL;
- int write_thread_num = 2;
- struct task *new_task=NULL;
- struct user_data *rdata=NULL;
-
+ char * unixdomain = NULL;
+ int ret = 0;
if (argc < 3) {
- fprintf(stderr, "%s name size\n", argv[0]);
+ fprintf(stderr, "%s name size [unix domain path]\n", argv[0]);
exit(EXIT_FAILURE);
}
name = argv[1];
len = atoi(argv[2]) * 1024LU * 1024LU * 1024LU;
- signal(SIGTERM,handler);
+ unixdomain = argv[3];
+ signal(SIGTERM,handler);
+ signal(SIGABRT,handler);
+ signal(SIGSEGV,handler);
+
+
g_memfd = basic_shm_create(name, len);
ret = shm_write(g_memfd);
-
- if((read_tids = (pthread_t *)malloc(read_thread_num * sizeof(pthread_t))) == NULL)
- {
- perror("read_tids malloc fail\n");
- return -1;
- }
-
- // threads for reading thread pool
- for(i = 0; i < read_thread_num; i++)
+ ret = basic_create_ipc_server(unixdomain);
+ if(ret < 0)
{
- pthread_create(&read_tids[i], NULL, readtask, NULL);
+ printf("failed to create ipc_server\n");
}
- if((write_tids = (pthread_t *)malloc(write_thread_num * sizeof(pthread_t))) == NULL)
- {
- perror("write_tids malloc fail\n");
- return -1;
- }
-
- // threads for writing thread pool
- for(i = 0; i < write_thread_num; i++)
- {
- pthread_create(&write_tids[i], NULL, writetask, proc_memfd);
- }
-
- epfd = epoll_create(MAX_EPOLL_EVENT_COUNT);
-
- //create socket to bind local IP and PORT
- lsn_fd = socket(AF_UNIX, SOCK_STREAM, 0);
-
- int opt = 0x1;
- setsockopt(lsn_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
-
- // set the descriptor as non-blocking
- setnonblocking(lsn_fd);
-
- ev.data.fd = lsn_fd;
- ev.events = EPOLLIN|EPOLLET;
-
- /*Control an epoll descriptor, $epfd, by requesting the operation op be performed on the target file descriptor, fd.
-
- $epfd is an epoll descriptor returned from epoll_create.
- $op is one of EPOLL_CTL_ADD, EPOLL_CTL_MOD or EPOLL_CTL_DEL.
- $fd is the file desciptor to be watched.
- $eventmask is a bitmask of events defined by EPOLLIN, EPOLLOUT, etc.
-
- When successful, epoll_ctl returns 0. When an error occurs, epoll_ctl returns -1 and errno is set appropriately.
- */
- if(epoll_ctl(epfd,EPOLL_CTL_ADD,lsn_fd,&ev)==-1)
- {
- perror("epoll_ctl:add");
- }
-
- if(lsn_fd < 0)
- {
- perror("can't create communication socket!");
- unlink(UNIX_DOMAIN);
- basic_shm_close(g_memfd);
- return 1;
- }
-
- //create local IP and PORT
- srv_addr.sun_family = AF_UNIX;
- strncpy(srv_addr.sun_path, UNIX_DOMAIN, sizeof(srv_addr.sun_path) - 1);
- //unlink(UNIX_DOMAIN);
-
- //bind sockfd and sockaddr
- ret = bind(lsn_fd, (struct sockaddr*)&srv_addr, sizeof(srv_addr));
- if(ret == -1)
- {
- perror("can't bind local sockaddr!");
- close(lsn_fd);
- unlink(UNIX_DOMAIN);
- basic_shm_close(g_memfd);
- return 1;
- }
-
- //listen lsn_fd, try listen 5
-
- ret = listen(lsn_fd, LISTENQ);
- if(ret == -1)
- {
- perror("can't listen client connect request");
- close(lsn_fd);
- unlink(UNIX_DOMAIN);
- basic_shm_close(g_memfd);
-
- return 1;
- }
-
- clt_len = sizeof(clt_addr);
- while(1)
- {
- int nfds = epoll_wait(epfd,events,1024,100);
- int i=0;
- for(i=0;i<nfds;++i)
- {
- if(events[i].data.fd == lsn_fd)
- {
- // accept the client connection
- while((apt_fd=accept(lsn_fd, (struct sockaddr *)&clt_addr, &clt_len))>0)
- {
- setnonblocking(apt_fd);
- //char *str = inet_ntoa(clt_addr.sun_path);
- //printf("[SERVER] connect from %s \n", str);
- ev.data.fd = apt_fd;
- // monitor in message, edge trigger
- ev.events = EPOLLIN | EPOLLET;
- // add fd to epoll queue
- if(epoll_ctl(epfd,EPOLL_CTL_ADD,apt_fd,&ev)==-1)
- {
- perror("epoll_ctl:add");
- continue;
- //exit(EXIT_FAILURE);
- }
- }
- if( apt_fd == -1)
- {
- if((errno != EAGAIN )
- && (errno!= ECONNABORTED)
- && (errno != EPROTO)
- && (errno != EINTR))
- {
- perror("accept");
-
- }
- }
- continue;
- }
- else if (events[i].events & EPOLLIN)
- //write鏁版嵁
- {
- printf("EPOLLIN\n");
- if( (eventfd = events[i].data.fd) < 0 )
- {
- continue;
- }
-
- printf("[SERVER] put task %d to read queue\n", events[i].data.fd);
-
- new_task = (struct task *)malloc(sizeof(struct task));
- if(NULL == new_task)
- {
- goto error;
- }
- new_task->data.fd= eventfd;
- new_task->next = NULL;
-
- // protect task queue (readhead/readtail)
- pthread_mutex_lock(&r_mutex);
-
- // the queue is empty
- if(readhead == NULL)
- {
- readhead = new_task;
- readtail = new_task;
- }
- // queue is not empty
- else
- {
- readtail->next = new_task;
- readtail = new_task;
- }
-
- // trigger readtask thread
- pthread_cond_broadcast(&r_condl);
- pthread_mutex_unlock(&r_mutex);
- }
- else if (events[i].events & EPOLLOUT)
- {
- rdata = (struct user_data *)events[i].data.ptr;
- if ( NULL == rdata )
- {
- continue;
- }
- printf("[SERVER] put task %d to write queue\n", ((struct task*)events[i].data.ptr)->data.fd);
-
- new_task = malloc(sizeof(struct task));
- if(NULL == new_task)
- {
- goto error;
- }
- new_task->data.ptr = (struct user_data*)events[i].data.ptr;
- new_task->next = NULL;
-
- pthread_mutex_lock(&w_mutex);
-
- // the queue is empty
- if (writehead == NULL)
- {
- writehead = new_task;
- writetail = new_task;
- }
- // queue is not empty
- else
- {
- writetail->next = new_task;
- writetail = new_task;
- }
-
- // trigger writetask thread
- pthread_cond_broadcast(&w_condl);
-
- pthread_mutex_unlock(&w_mutex);
- }
- else
- {
- printf("[SERVER] Error: unknown epoll event");
- }
- }
- }
-
-error:
- close(apt_fd);
- close(lsn_fd);
- unlink(UNIX_DOMAIN);
basic_shm_close(g_memfd);
return 0;
}
diff --git a/src/ipc_server_lib.c b/src/ipc_server_lib.c
new file mode 100644
index 0000000..41c22ee
--- /dev/null
+++ b/src/ipc_server_lib.c
@@ -0,0 +1,466 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <unistd.h>
+#include <sys/epoll.h>
+#include <errno.h>
+#include <sys/syscall.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <sys/types.h>
+#include <pthread.h>
+#include "ipc_msg.h"
+
+#define MAX_EPOLL_EVENT_COUNT 1024
+#define LISTENQ 1024
+
+// task item in thread pool
+struct task
+{
+ // file descriptor or user_data
+ epoll_data_t data;
+ // next task
+ struct task* next;
+};
+
+//typedef int (*readfunc)(struct user_data*);
+typedef int (*writefunc)(struct user_data*);
+/* thread exec function */
+static void *readtask(void *args);
+static void *writetask(void *args);
+
+// mutex lock to protect readhead/readtail
+pthread_mutex_t r_mutex;
+pthread_cond_t r_condl;
+
+// mutex lock to protect writehead/writetail
+pthread_mutex_t w_mutex;
+pthread_cond_t w_condl;
+struct task *readhead = NULL, *readtail = NULL;
+struct task *writehead = NULL, *writetail = NULL;
+
+//create epoll
+int epfd,eventfd;
+struct epoll_event ev,events[LISTENQ];
+
+
+static void setnonblocking(int sock)
+{
+ int opts;
+
+ opts = fcntl(sock, F_GETFL);
+ if(opts<0)
+ {
+ perror("fcntl(sock,GETFL)");
+ exit(1);
+ }
+
+ if(fcntl(sock, F_SETFL, opts | O_NONBLOCK)<0)
+ {
+ perror("fcntl(sock,SETFL,opts)");
+ exit(1);
+ }
+}
+
+
+/**
+ * thread exec function
+ */
+static void *readtask(void *args)
+{
+ int i = 0;
+ int fd = -1;
+ unsigned int n = 0;
+
+ struct user_data *data = NULL;
+
+ while (1)
+ {
+ // protect task queue (readhead/readtail)
+ pthread_mutex_lock(&r_mutex);
+
+ while(readhead == NULL)
+ {
+ // if condl false, will unlock mutex
+ mydebug("thread:%u waiting, task is NULL ... \n", (uint32_t)pthread_self());
+ pthread_cond_wait(&r_condl, &r_mutex);
+ }
+
+ fd = readhead->data.fd;
+
+ struct task *tmp = readhead;
+ readhead = readhead->next;
+ free(tmp);
+ pthread_mutex_unlock(&r_mutex);
+
+ mydebug("[SERVER] readtask %u handling %d\n", (uint32_t)pthread_self(), fd);
+
+ data = (struct user_data *)malloc(sizeof(struct user_data));
+ if(NULL == data)
+ {
+ perror("readtask malloc");
+ return;
+ }
+
+ memset(data, 0 , sizeof(struct user_data));
+ data->fd = fd;
+
+ n=0;
+ int nread = 0;
+ while((nread = read(fd,data->line+n,MAX_LEN))>0)
+ {
+ n+=nread;
+ }
+
+ if((nread==-1) && (errno != EAGAIN))
+ {
+ perror("read error");
+ }
+
+ if(n < 0)
+ {
+ if (errno == ECONNRESET)
+ {
+ close(fd);
+ mydebug("[SERVER] Error: readline failed: %s\n", strerror(errno));
+ }
+ else
+ {
+ mydebug("readline error \n");
+ }
+ if(data != NULL)
+ {
+ free(data);
+ data = NULL;
+ }
+ }
+ else if (n == 0)
+ {
+ close(fd);
+ mydebug("[SERVER] Info: client closed connection.\n");
+ if(data != NULL)
+ {
+ free(data);
+ data = NULL;
+ }
+ }
+ else
+ {
+ data->n_size = n;
+
+ mydebug("[SERVER] readtask %u received %d : [%d] %s\n", (uint32_t)pthread_self(), fd, data->n_size, data->line);
+ if (data->line[0] != '\0')
+ {
+ // modify monitored event to EPOLLOUT, wait next loop to send respond
+ ev.data.ptr = data;
+ // Modify event to EPOLLOUT
+ ev.events = EPOLLOUT | EPOLLET;
+ // modify moditored fd event
+ if(epoll_ctl(epfd,EPOLL_CTL_MOD,fd,&ev)==-1)
+ {
+ perror("epoll_ctl:mod");
+ if(data != NULL)
+ {
+ free(data);
+ data = NULL;
+ }
+ }
+ }
+ }
+ }
+}
+
+static void *writetask(void *args)
+{
+ int n = 0;
+ int nwrite = 0;
+ int ret = 0;
+
+ // data to wirte back to client
+ struct user_data *rdata = NULL;
+ while(1)
+ {
+ pthread_mutex_lock(&w_mutex);
+ while(writehead == NULL)
+ {
+ // if condl false, will unlock mutex
+ pthread_cond_wait(&w_condl, &w_mutex);
+ }
+ rdata = (struct user_data*)writehead->data.ptr;
+ struct task* tmp = writehead;
+ writehead = writehead->next;
+ free(tmp);
+
+ //echo("[SERVER] thread %d writetask before unlock\n", pthread_self());
+ pthread_mutex_unlock(&w_mutex);
+ //echo("[SERVER] thread %d writetask after unlock\n", pthread_self());
+
+ if(rdata->fd < 0)
+ {
+ goto error;
+ }
+ mydebug("[SERVER] writetask %u sending %d\n", (uint32_t)pthread_self(), rdata->fd);
+
+ writefunc wfunc = (writefunc)args;
+ ret = wfunc(rdata);
+ if(ret < 0)
+ {
+ goto error;
+ }
+
+ // delete data
+ if (rdata->fd > 0)
+ {
+ // modify monitored event to EPOLLIN, wait next loop to receive data
+ ev.data.fd = rdata->fd;
+ // monitor in message, edge trigger
+ //ev.events = EPOLLIN | EPOLLET;
+ // modify moditored fd event
+ if(epoll_ctl(epfd, EPOLL_CTL_DEL, rdata->fd, &ev)==-1)
+ {
+ perror("epoll_ctl:del");
+ }
+
+ close(rdata->fd);
+ }
+
+error:
+ free(rdata);
+ }
+}
+
+int basic_create_ipc_server(char * unix_domain_path)
+{
+ pthread_t *read_tids = NULL;
+ pthread_t *write_tids = NULL;
+ int lsn_fd, apt_fd;
+ struct sockaddr_un srv_addr;
+ struct sockaddr_un clt_addr;
+ socklen_t clt_len;
+ int ret;
+ int i;
+ char line[MAX_LEN] = {0};
+
+ struct task *new_task=NULL;
+ struct user_data *rdata=NULL;
+
+ if((read_tids = (pthread_t *)malloc(READ_THREAD_NUM * sizeof(pthread_t))) == NULL)
+ {
+ perror("read_tids malloc fail\n");
+ return -1;
+ }
+
+ // threads for reading thread pool
+ for(i = 0; i < READ_THREAD_NUM; i++)
+ {
+ pthread_create(&read_tids[i], NULL, readtask, NULL);
+ }
+
+ if((write_tids = (pthread_t *)malloc(WRITE_THREAD_NUM * sizeof(pthread_t))) == NULL)
+ {
+ perror("write_tids malloc fail\n");
+ return -1;
+ }
+
+ // threads for writing thread pool
+ for(i = 0; i < WRITE_THREAD_NUM; i++)
+ {
+ pthread_create(&write_tids[i], NULL, writetask, proc_memfd);
+ }
+
+ epfd = epoll_create(MAX_EPOLL_EVENT_COUNT);
+
+ //create socket to bind local IP and PORT
+ lsn_fd = socket(AF_UNIX, SOCK_STREAM, 0);
+
+
+ // set the descriptor as non-blocking
+ setnonblocking(lsn_fd);
+
+ ev.data.fd = lsn_fd;
+ ev.events = EPOLLIN|EPOLLET;
+
+ /*Control an epoll descriptor, $epfd, by requesting the operation op be performed on the target file descriptor, fd.
+
+ $epfd is an epoll descriptor returned from epoll_create.
+ $op is one of EPOLL_CTL_ADD, EPOLL_CTL_MOD or EPOLL_CTL_DEL.
+ $fd is the file desciptor to be watched.
+ $eventmask is a bitmask of events defined by EPOLLIN, EPOLLOUT, etc.
+
+ When successful, epoll_ctl returns 0. When an error occurs, epoll_ctl returns -1 and errno is set appropriately.
+ */
+ if(epoll_ctl(epfd,EPOLL_CTL_ADD,lsn_fd,&ev)==-1)
+ {
+ perror("epoll_ctl:add");
+ }
+
+ if(lsn_fd < 0)
+ {
+ perror("can't create communication socket!");
+ unlink(UNIX_DOMAIN);
+ return 1;
+ }
+
+ //create unix domain
+ if(unix_domain_path == NULL)
+ {
+ unix_domain_path = UNIX_DOMAIN;
+ }
+
+ srv_addr.sun_family = AF_UNIX;
+ strncpy(srv_addr.sun_path, unix_domain_path, sizeof(srv_addr.sun_path) - 1);
+ //unlink(UNIX_DOMAIN);
+
+ //bind sockfd and sockaddr
+ ret = bind(lsn_fd, (struct sockaddr*)&srv_addr, sizeof(srv_addr));
+ if(ret == -1)
+ {
+ perror("can't bind local sockaddr!");
+ close(lsn_fd);
+ unlink(UNIX_DOMAIN);
+ return 1;
+ }
+
+ //listen lsn_fd
+
+ ret = listen(lsn_fd, LISTENQ);
+ if(ret == -1)
+ {
+ perror("can't listen client connect request");
+ close(lsn_fd);
+ unlink(UNIX_DOMAIN);
+
+ return 1;
+ }
+
+ clt_len = sizeof(clt_addr);
+ while(1)
+ {
+ int nfds = epoll_wait(epfd,events,1024,100);
+ int i=0;
+ for(i=0;i<nfds;++i)
+ {
+ if(events[i].data.fd == lsn_fd)
+ {
+ // accept the client connection
+ while((apt_fd=accept(lsn_fd, (struct sockaddr *)&clt_addr, &clt_len))>0)
+ {
+ setnonblocking(apt_fd);
+ //char *str = inet_ntoa(clt_addr.sun_path);
+ //printf("[SERVER] connect from %s \n", str);
+ ev.data.fd = apt_fd;
+ // monitor in message, edge trigger
+ ev.events = EPOLLIN | EPOLLET;
+ // add fd to epoll queue
+ if(epoll_ctl(epfd,EPOLL_CTL_ADD,apt_fd,&ev)==-1)
+ {
+ perror("epoll_ctl:add");
+ continue;
+ //exit(EXIT_FAILURE);
+ }
+ }
+ if( apt_fd == -1)
+ {
+ if((errno != EAGAIN )
+ && (errno!= ECONNABORTED)
+ && (errno != EPROTO)
+ && (errno != EINTR))
+ {
+ perror("accept");
+
+ }
+ }
+ continue;
+ }
+ else if (events[i].events & EPOLLIN)
+ //write鏁版嵁
+ {
+ printf("EPOLLIN\n");
+ if( (eventfd = events[i].data.fd) < 0 )
+ {
+ continue;
+ }
+
+ printf("[SERVER] put task %d to read queue\n", events[i].data.fd);
+
+ new_task = (struct task *)malloc(sizeof(struct task));
+ if(NULL == new_task)
+ {
+ goto error;
+ }
+ new_task->data.fd= eventfd;
+ new_task->next = NULL;
+
+ // protect task queue (readhead/readtail)
+ pthread_mutex_lock(&r_mutex);
+
+ // the queue is empty
+ if(readhead == NULL)
+ {
+ readhead = new_task;
+ readtail = new_task;
+ }
+ // queue is not empty
+ else
+ {
+ readtail->next = new_task;
+ readtail = new_task;
+ }
+
+ // trigger readtask thread
+ pthread_cond_broadcast(&r_condl);
+ pthread_mutex_unlock(&r_mutex);
+ }
+ else if (events[i].events & EPOLLOUT)
+ {
+ rdata = (struct user_data *)events[i].data.ptr;
+ if ( NULL == rdata )
+ {
+ continue;
+ }
+ printf("[SERVER] put task %d to write queue\n", ((struct task*)events[i].data.ptr)->data.fd);
+
+ new_task = malloc(sizeof(struct task));
+ if(NULL == new_task)
+ {
+ goto error;
+ }
+ new_task->data.ptr = (struct user_data*)events[i].data.ptr;
+ new_task->next = NULL;
+
+ pthread_mutex_lock(&w_mutex);
+
+ // the queue is empty
+ if (writehead == NULL)
+ {
+ writehead = new_task;
+ writetail = new_task;
+ }
+ // queue is not empty
+ else
+ {
+ writetail->next = new_task;
+ writetail = new_task;
+ }
+
+ // trigger writetask thread
+ pthread_cond_broadcast(&w_condl);
+
+ pthread_mutex_unlock(&w_mutex);
+ }
+ else
+ {
+ printf("[SERVER] Error: unknown epoll event");
+ }
+ }
+ }
+
+error:
+ close(apt_fd);
+ close(lsn_fd);
+ unlink(UNIX_DOMAIN);
+}
+
diff --git a/src/memfd.c b/src/memfd.c
index 2509685..00df30e 100644
--- a/src/memfd.c
+++ b/src/memfd.c
@@ -12,6 +12,7 @@
#include <time.h>
#include <stdarg.h>
#include "memfd.h"
+#include <assert.h>
#define errExit(msg) do { perror(msg); exit(EXIT_FAILURE); \
} while (0)
--
Gitblit v1.8.0