From 3ce014732763fd28a7b03d5ce99ec990f830f985 Mon Sep 17 00:00:00 2001
From: cheliequan <liequanche@126.com>
Date: 星期五, 30 十二月 2022 15:59:41 +0800
Subject: [PATCH] 1.客户端支持只获取memfd,只获取数据,同时获取memfd和数据 2.优化代码,所有日志使用mydebug
---
include/ipc_msg.h | 28 ++
/dev/null | 138 -------------
src/ipc_client_lib.c | 46 ++--
include/memfd.h | 1
sample/shmht_mytests.c | 2
sample/ipc_client.c | 83 +++++++
src/shmht.c | 2
src/ipc_server_lib.c | 147 ++++++++------
src/ipc_msg.c | 11 +
src/Makefile | 6
sample/ipc_server.c | 99 ++++++++-
11 files changed, 307 insertions(+), 256 deletions(-)
diff --git a/include/ipc_msg.h b/include/ipc_msg.h
index 5b489c3..628957b 100644
--- a/include/ipc_msg.h
+++ b/include/ipc_msg.h
@@ -15,8 +15,12 @@
} while (0)
#define MAX_LEN 4096
-#define HELLO_MSG "hello_basic"
-#define offsetof(TYPE, MEMBER) ((int) &((TYPE *)0)->MEMBER)
+#define GET_MEMFD_MSG "basic_get_memfd"
+#define GET_DATA_MSG "basic_get_data"
+#define GET_CUSTOM_MSG "basic_get_custom"
+
+#define offsetof(TYPE, MEMBER) ((size_t) &((TYPE *)0)->MEMBER)
+
#define container_of(ptr, type, member) ({\
const typeof(((type *)0)->member) * __mptr = (ptr);\
(type *)((char *)__mptr - offsetof(type, member)); }
@@ -35,11 +39,27 @@
char line[MAX_LEN];
};
+typedef int (*wfunc)(struct user_data*, void *arg, int len);
+typedef struct arg_struct {
+ wfunc wfunc;
+ void * args;
+ int len;
+}args_data_st;
+
+typedef struct custom_data{
+ int memfd;
+ void * data;
+ int len;
+}st_custom_data;
+
+
#define READ_THREAD_NUM 2
#define WRITE_THREAD_NUM 2
-int proc_memfd(struct user_data* rdata);
-int basic_create_ipc_server(char * unix_domain_path);
+int file_exists(char *filename);
+int proc_msg(struct user_data* rdata, void * arg, int len);
+int basic_create_ipc_server(char * unix_domain_path, args_data_st * pargs_data_st);
+int basic_create_ipc_client(char * unix_domain_path, char * send_buf, int send_len, void ** pptr_recv_buf, int *ptr_recv_len);
int send_fd_sendmsg(int fd, memfd_data_st** ppmemfd_data);
diff --git a/include/memfd.h b/include/memfd.h
index d6db1f9..17e4fdb 100644
--- a/include/memfd.h
+++ b/include/memfd.h
@@ -14,6 +14,7 @@
int basic_shm_close(int fd);
int basic_shm_shrink(int fd, ssize_t len);
int basic_shm_grow(int fd, ssize_t len);
+void mydebug(const char *fmt, ...);
enum{
local_open_flag = 0,
diff --git a/sample/ipc_client.c b/sample/ipc_client.c
index e9809c3..7dde9a3 100644
--- a/sample/ipc_client.c
+++ b/sample/ipc_client.c
@@ -1,4 +1,6 @@
#include <stdio.h>
+#include <string.h>
+#include <stdlib.h>
#include "memfd.h"
#include "ipc_msg.h"
@@ -27,15 +29,60 @@
{
int fd = 0;
int ret;
- int i;
- char *fd_path;
+ int recv_len = 0;
memfd_data_st memfd_data = {0};
- int rcv_num = 0;
+ char snd_buf[MAX_LEN] = {0};
+ char * unix_domain_path = NULL;
- ret = basic_create_ipc_client(UNIX_DOMAIN, &memfd_data);
+ char *recv_buf = (char *)malloc(MAX_LEN);
+
+ /*client get memfd from server*/
+ memset(snd_buf, 0, sizeof(snd_buf));
+ strncpy(snd_buf, GET_MEMFD_MSG, strlen(GET_MEMFD_MSG) + 1);
+ printf("snd_buf: %s\n", snd_buf);
+ memset(recv_buf, 0, MAX_LEN);
+
+ unix_domain_path = UNIX_DOMAIN;
+ ret = basic_create_ipc_client(unix_domain_path, snd_buf, strlen(snd_buf), (void **)&recv_buf, &recv_len);
if(ret < 0)
{
return ret;
+ }
+
+ memcpy(&memfd_data, recv_buf, sizeof(memfd_data));
+
+ printf("receive message from server,pid:%d, memfd:%d\n", memfd_data.pid, memfd_data.memfd);
+ if ((memfd_data.pid != 0) && (memfd_data.memfd != 0))
+ {
+ char fd_path[256] = {0};
+ snprintf(fd_path, sizeof(fd_path), "/proc/%d/fd/%d", memfd_data.pid, memfd_data.memfd);
+ fprintf(stderr,"fd_path:%s\n", fd_path);
+ fd = basic_shm_open(memfd_data.memfd, memfd_data.pid, 1);
+ ret = memfd_read(fd);
+ }
+
+ basic_shm_close(fd);
+
+ /*client get memfd and custom data from server*/
+#if 1
+
+ memset(snd_buf, 0, sizeof(snd_buf));
+ strncpy(snd_buf, GET_CUSTOM_MSG, strlen(GET_CUSTOM_MSG) + 1);
+ printf("\nsnd_buf: %s\n", snd_buf);
+ memset(recv_buf, 0, MAX_LEN);
+
+ unix_domain_path = UNIX_DOMAIN;
+ ret = basic_create_ipc_client(unix_domain_path, snd_buf, strlen(snd_buf), (void **)&recv_buf, &recv_len);
+ if(ret < 0)
+ {
+ return ret;
+ }
+
+ memcpy(&memfd_data, recv_buf, sizeof(memfd_data));
+ if(memfd_data.len > 0)
+ {
+ memfd_data.data = malloc(memfd_data.len);
+ memcpy(memfd_data.data, recv_buf+sizeof(memfd_data), memfd_data.len);
}
printf("receive message from server,pid:%d, memfd:%d\n", memfd_data.pid, memfd_data.memfd);
@@ -44,11 +91,37 @@
char fd_path[256] = {0};
snprintf(fd_path, sizeof(fd_path), "/proc/%d/fd/%d", memfd_data.pid, memfd_data.memfd);
fprintf(stderr,"fd_path:%s\n", fd_path);
- fd = basic_shm_open(memfd_data.memfd, memfd_data.pid , 1);
+ fd = basic_shm_open(memfd_data.memfd, memfd_data.pid, 1);
ret = memfd_read(fd);
}
basic_shm_close(fd);
+ fprintf(stderr,"custom data:%s\n", (char *)memfd_data.data);
+ if(memfd_data.len > 0)
+ {
+ free(memfd_data.data);
+ }
+#endif
+
+#if 1
+ /*client get usedata from server*/
+ memset(snd_buf, 0, sizeof(snd_buf));
+ strncpy(snd_buf, GET_DATA_MSG, strlen(GET_DATA_MSG) + 1);
+ printf("\nsnd_buf: %s\n", snd_buf);
+ memset(recv_buf, 0, MAX_LEN);
+ recv_len = 0;
+
+ //unix_domain_path="/tmp/unix_domain_usedata";
+ ret = basic_create_ipc_client(unix_domain_path, snd_buf, strlen(snd_buf), (void **)&recv_buf, &recv_len);
+ if(ret < 0)
+ {
+ return ret;
+ }
+ printf("receive message from server msg:%s\n", recv_buf);
+ /*deal the recv_buf*/
+#endif
+
+ free(recv_buf);
return 0;
}
diff --git a/sample/ipc_server.c b/sample/ipc_server.c
index 0d8178a..9aeab6a 100644
--- a/sample/ipc_server.c
+++ b/sample/ipc_server.c
@@ -5,11 +5,13 @@
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
-
+#include <string.h>
+#include <pthread.h>
#include "memfd.h"
#include "ipc_msg.h"
-int g_memfd = 0;
+
+//typedef int (*readfunc)(struct user_data*);
void handler(){
@@ -43,10 +45,9 @@
-int proc_memfd(struct user_data* rdata)
+int proc_msg(struct user_data* rdata, void * arg, int len)
{
int fd = rdata->fd;
- unsigned int n_size;
char line[MAX_LEN]= {0};
int n = 0;
int nwrite = 0;
@@ -58,21 +59,56 @@
return -1;
}
- if(0 == strncmp(rdata->line, HELLO_MSG, strlen(HELLO_MSG)))
+
+ if(0 == strncmp(rdata->line, GET_MEMFD_MSG, strlen(GET_MEMFD_MSG)))
{
- memfd_data.memfd = g_memfd;
+ st_custom_data * pst_data = (st_custom_data *)(arg);
memfd_data.pid = getpid();
+ memfd_data.memfd = pst_data->memfd;
memfd_data.data = NULL;
memfd_data.len = 0;
- data_size = sizeof(memfd_data_st) + memfd_data.len;
+
+ data_size = sizeof(memfd_data_st);
memcpy(line, &memfd_data, data_size);
+ n = data_size;
}
- else
+ else if(0 == strncmp(rdata->line, GET_DATA_MSG, strlen(GET_DATA_MSG)))
{
- return -1;
+ if((NULL == arg) || (0 == len))
+ {
+ return -1;
+ }
+
+ st_custom_data * pst_data = (st_custom_data *)(arg);
+ memfd_data.pid = 0;
+ memfd_data.memfd = 0;
+ memfd_data.data = pst_data->data;
+ memfd_data.len = pst_data->len;
+ data_size = pst_data->len;
+ /*copy the arg to line,so it can send to client*/
+ memcpy(line, pst_data->data, data_size);
+ n = data_size;
}
-
- n = data_size;
+ else if(0 == strncmp(rdata->line, GET_CUSTOM_MSG, strlen(GET_CUSTOM_MSG)))
+ {
+ if((NULL == arg) || (0 == len))
+ {
+ return -1;
+ }
+ st_custom_data * pst_data = (st_custom_data *)(arg);
+ memfd_data.pid = getpid();
+ memfd_data.memfd = pst_data->memfd;
+ memfd_data.data = pst_data->data;
+ memfd_data.len = pst_data->len;
+ data_size = sizeof(memfd_data_st);
+ memcpy(line, &memfd_data, data_size);
+
+ memcpy(line+data_size, memfd_data.data, memfd_data.len);
+ data_size += memfd_data.len;
+ n = data_size;
+ }
+
+ mydebug("[SERVER] writetask %u sending %d\n", (uint32_t)pthread_self(), rdata->fd);
while(n>0)
{
nwrite = write(rdata->fd, line+data_size-n,n);////ET
@@ -85,12 +121,12 @@
if (errno == ECONNRESET)
{
close(rdata->fd);
- printf("[SERVER] Error: send responce failed: %s\n", strerror(errno));
+ mydebug("[SERVER] Error: send responce failed: %s\n", strerror(errno));
}
else if (nwrite == 0)
{
close(rdata->fd);
- printf("[SERVER] Error: client closed connection.");
+ mydebug("[SERVER] Error: client closed connection.");
}
break;
}
@@ -101,14 +137,16 @@
}
-/*define you our proc_memfd funtion to send your message to other processes锛宱r just put it in shm锛�
+/*define you our proc_msg funtion to send your message to other processes锛�
notice the file description fd and the pid of creator to other*/
int main(int argc, char **argv)
{
char *name;
ssize_t len;
- char * unixdomain = NULL;
+ char * unix_domain_path = NULL;
int ret = 0;
+ args_data_st args_data_st={0};
+ int g_memfd = 0;
if (argc < 3) {
fprintf(stderr, "%s name size [unix domain path]\n", argv[0]);
@@ -117,21 +155,48 @@
name = argv[1];
len = atoi(argv[2]) * 1024LU * 1024LU * 1024LU;
- unixdomain = argv[3];
signal(SIGTERM,handler);
signal(SIGABRT,handler);
signal(SIGSEGV,handler);
+#if 1
g_memfd = basic_shm_create(name, len);
ret = shm_write(g_memfd);
- ret = basic_create_ipc_server(unixdomain);
+ char data[256] = "test custom usedata\n";
+
+ st_custom_data st_data;
+ st_data.memfd = g_memfd;
+ st_data.data = data;
+ st_data.len = strlen(data)+1;
+ args_data_st.wfunc = proc_msg;
+ args_data_st.args = &st_data;
+ args_data_st.len = sizeof(st_data)+st_data.len;
+
+ unix_domain_path=UNIX_DOMAIN;
+ ret = basic_create_ipc_server(unix_domain_path,&args_data_st);
+ if(ret < 0)
+ {
+ printf("failed to create ipc_server %s\n", unix_domain_path);
+ }
+
+#endif
+
+#if 0
+ char send_buf[256] = "test custom usedata\n";
+ args_data_st.wfunc = proc_msg;
+ args_data_st.args = &send_buf;
+ args_data_st.len = strlen(send_buf);
+
+ unix_domain_path="/tmp/unix_domain_usedata";
+ ret = basic_create_ipc_server(unix_domain_path,&args_data_st);
if(ret < 0)
{
printf("failed to create ipc_server\n");
}
+#endif
// basic_shm_close(g_memfd);
return 0;
diff --git a/sample/shmht_mytests.c b/sample/shmht_mytests.c
index 5b316b1..6c5c0a6 100644
--- a/sample/shmht_mytests.c
+++ b/sample/shmht_mytests.c
@@ -444,7 +444,7 @@
assert(shmht_count (h[0]) == 1);
- int ret_size;
+ size_t ret_size;
void *ret = shmht_search(h[0], key, strlen (key), &ret_size);
assert(ret != NULL);
diff --git a/src/Makefile b/src/Makefile
index af87aeb..3f8ae37 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -1,6 +1,6 @@
CC=gcc
AR=ar
-CFLAGS=-g -D __SHMT_DEBUG_MODE__=1 -Wall -I${INCLUDE}
+CFLAGS=-g -fPIC -D __SHMT_DEBUG_MODE__=1 -Wall -I${INCLUDE}
INCLUDE=-I. -I ../include
all:shmht shmht_mytests ipc_server ipc_client
gcc -g -c -Wall -Werror -I${INCLUDE} -fpic list_in_shm.c memfd.c -lpthread
@@ -20,10 +20,10 @@
ipc_server:libipc_server.so
$(CC) $(CFLAGS) $(INCLUDE) -fPIC -pthread -o $@ ../sample/ipc_server.c -lipc_server -L .
libipc_server.so:
- $(CC) -shared -fPIC $(INCLUDE) -o $@ ipc_server_lib.c ipc_msg.c memfd.c
+ $(CC) -shared $(CFLAGS) $(INCLUDE) -o $@ ipc_server_lib.c ipc_msg.c memfd.c
ipc_client:libipc_client.so
$(CC) $(CFLAGS) $(INCLUDE) -fPIC -o $@ ../sample/ipc_client.c ipc_msg.c memfd.c -lipc_client -L .
libipc_client.so:
- $(CC) -shared -fPIC $(INCLUDE) -o $@ ipc_client_lib.c ipc_msg.c memfd.c
+ $(CC) -shared $(CFLAGS) $(INCLUDE) -o $@ ipc_client_lib.c ipc_msg.c memfd.c
clean:
rm -rf *.so *.o a.out shmht_mytests *.a ipc_client ipc_server
diff --git a/src/ipc_client.c b/src/ipc_client.c
deleted file mode 100644
index e9809c3..0000000
--- a/src/ipc_client.c
+++ /dev/null
@@ -1,54 +0,0 @@
-#include <stdio.h>
-#include "memfd.h"
-#include "ipc_msg.h"
-
-
-int memfd_read(int fd)
-{
- unsigned char* addr;
- ssize_t i, len;
- len = basic_shm_mmap(fd, &addr);
- if(len < 0)
- {
- return -1;
- }
- printf ("start: ");
- for (i = 0; i < 32; ++i)
- printf ("%i ", addr[i]);
- printf ("\nend: ");
- for (i = 0; i < 32; ++i)
- printf ("%i ", addr[len-32+i]);
- printf ("\ndone\n");
- return len;
-}
-
-
-int main(int argc, char **argv)
-{
- int fd = 0;
- int ret;
- int i;
- char *fd_path;
- memfd_data_st memfd_data = {0};
- int rcv_num = 0;
-
- ret = basic_create_ipc_client(UNIX_DOMAIN, &memfd_data);
- if(ret < 0)
- {
- return ret;
- }
-
- printf("receive message from server,pid:%d, memfd:%d\n", memfd_data.pid, memfd_data.memfd);
- if ((memfd_data.pid != 0) && (memfd_data.memfd != 0))
- {
- char fd_path[256] = {0};
- snprintf(fd_path, sizeof(fd_path), "/proc/%d/fd/%d", memfd_data.pid, memfd_data.memfd);
- fprintf(stderr,"fd_path:%s\n", fd_path);
- fd = basic_shm_open(memfd_data.memfd, memfd_data.pid , 1);
- ret = memfd_read(fd);
- }
-
- basic_shm_close(fd);
- return 0;
-
-}
diff --git a/src/ipc_client_lib.c b/src/ipc_client_lib.c
index ae6aac1..ff5390a 100644
--- a/src/ipc_client_lib.c
+++ b/src/ipc_client_lib.c
@@ -10,24 +10,32 @@
#include <errno.h>
#include "ipc_msg.h"
-int basic_create_ipc_client(char * unix_domain_path, memfd_data_st * ptr_memfd_data)
+
+int basic_create_ipc_client(char * unix_domain_path, char * send_buf, int send_len, void ** pptr_recv_buf, int *ptr_recv_len)
{
- int fd;
int connect_fd;
struct sockaddr_un srv_addr;
- char snd_buf[MAX_LEN] = {0};
- char rcv_buf[MAX_LEN] = {0};
int ret;
- int i;
- char *fd_path;
- int rcv_num = 0;
+ char buf[MAX_LEN] = {0};
+ int recv_len = 0;
+
+ if((NULL == pptr_recv_buf)||(NULL == ptr_recv_len))
+ {
+ return -1;
+ }
+
+ if (strlen(unix_domain_path) > sizeof(srv_addr.sun_path))
+ {
+ perror("too long filename!\n");
+ return -1;
+ }
connect_fd = socket(AF_UNIX, SOCK_STREAM, 0);
if(connect_fd < 0)
{
perror("client create socket failed");
- return 1;
+ return -1;
}
srv_addr.sun_family = AF_UNIX;
@@ -42,8 +50,8 @@
{
perror("connect to server failed!");
close(connect_fd);
- unlink(UNIX_DOMAIN);
- return 1;
+ //unlink(unix_domain_path);
+ return -1;
}
printf("connect to server path:%s success!", srv_addr.sun_path);
@@ -52,22 +60,20 @@
//int rcv_num = read(connect_fd, rcv_buf, sizeof(rcv_buf));
//memcpy(&memfd_data, rcv_buf, sizeof(memfd_data_st));
- memset(snd_buf, 0, 256);
- strcpy(snd_buf, HELLO_MSG);
- printf("sizeof(snd_buf): %ld\n", sizeof(snd_buf));
-
printf("send data to server... ...\n");
- write(connect_fd, snd_buf, sizeof(snd_buf));
+ write(connect_fd, send_buf, send_len);
printf("send end!\n");
- memset(rcv_buf, 0, sizeof(rcv_buf));
- rcv_num = read(connect_fd, rcv_buf, sizeof(rcv_buf));
- if(rcv_num == 0)
+ recv_len = read(connect_fd, buf, sizeof(buf));
+ if(recv_len == 0)
{
close(connect_fd);
- return 0;
+ return -1;
}
- memcpy(ptr_memfd_data, rcv_buf, rcv_num);
+ memcpy(*pptr_recv_buf, buf, recv_len);
+ *ptr_recv_len = recv_len;
close(connect_fd);
+
+ return 0;
}
diff --git a/src/ipc_msg.c b/src/ipc_msg.c
index 804eafa..85a53c2 100644
--- a/src/ipc_msg.c
+++ b/src/ipc_msg.c
@@ -1,7 +1,16 @@
-#include "ipc_msg.h"
#include <string.h>
#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include "ipc_msg.h"
+
+int file_exists(char *filename)
+{
+ struct stat buffer;
+ return (stat (filename, &buffer) == 0);
+}
/**
* @brief 鍙戦�佺洰鏍囨枃浠舵弿杩扮
* @param fd 浼犻�掍俊鎭殑 UNIX 鍩� 鏂囦欢鎻忚堪绗�
diff --git a/src/ipc_msg.h b/src/ipc_msg.h
deleted file mode 100644
index 5b489c3..0000000
--- a/src/ipc_msg.h
+++ /dev/null
@@ -1,62 +0,0 @@
-#ifndef IPC_MSG_H
-#define IPC_MSG_H
-#include <sys/socket.h>
-#include <sys/types.h>
-#include <netinet/in.h>
-#include <unistd.h>
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-
-#define UNIX_DOMAIN "/tmp/UNIX.domain"
-#define errExit(msg) do { perror(msg); exit(EXIT_FAILURE); \
-} while (0)
-
-#define MAX_LEN 4096
-#define HELLO_MSG "hello_basic"
-#define offsetof(TYPE, MEMBER) ((int) &((TYPE *)0)->MEMBER)
-#define container_of(ptr, type, member) ({\
-const typeof(((type *)0)->member) * __mptr = (ptr);\
-(type *)((char *)__mptr - offsetof(type, member)); }
-
-typedef struct memfd_data {
- pid_t pid;
- int memfd;
- void * data;
- int len;
-} memfd_data_st;
-
-struct user_data
-{
- int fd;
- unsigned int n_size;
- char line[MAX_LEN];
-};
-
-#define READ_THREAD_NUM 2
-#define WRITE_THREAD_NUM 2
-
-int proc_memfd(struct user_data* rdata);
-int basic_create_ipc_server(char * unix_domain_path);
-
-int send_fd_sendmsg(int fd, memfd_data_st** ppmemfd_data);
-
-/**
- * @brief 鍙戦�佺洰鏍囨枃浠舵弿杩扮
- * @param fd 浼犻�掍俊鎭殑 UNIX 鍩� 鏂囦欢鎻忚堪绗�
- * @param fd_to_send 寰呭彂閫佺殑鏂囦欢鎻忚堪绗�
- */
-int send_fd_args_sendmsg(int fd, int fd_to_send, pid_t pid, void **ppdata, int len);
-
-/**
- * @brief 鎺ュ彈鏂囦欢鎻忚堪绗�
- * @param fd 浼犻�掍俊鎭殑 UNIX 鍩� 鏂囦欢鎻忚堪绗�
- */
-int recv_fd_recvmsg(int fd, memfd_data_st** ppmemfd_data);
-
-#ifdef __cplusplus
-}
-#endif
-#endif
diff --git a/src/ipc_server.c b/src/ipc_server.c
deleted file mode 100644
index 0d8178a..0000000
--- a/src/ipc_server.c
+++ /dev/null
@@ -1,138 +0,0 @@
-#include <signal.h>
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <unistd.h>
-#include <stdlib.h>
-#include <stdio.h>
-#include <errno.h>
-
-#include "memfd.h"
-#include "ipc_msg.h"
-
-int g_memfd = 0;
-
-void handler(){
-
- printf("clean program start\n");
- //unlink(UNIX_DOMAIN);
- remove(UNIX_DOMAIN);
- printf("clean end.\n");
-}
-
-int shm_write(int fd)
-{
- struct stat st;
- ssize_t len;
- int ret = 0;
- unsigned char* paddr;
-
- if (fstat (fd, &st))
- errExit ("fstat");
- len = st.st_size;
-
- char *content = "hello world!\n";
- ret = basic_shm_mmap (fd, &paddr);
- if (ret < 0)
- errExit("basic_shm_mmap");
-
- memcpy(paddr, content, strlen(content));
-
- printf("length: %zu, atime: %lu.%lu\n", len, st.st_atim.tv_sec, st.st_atim.tv_nsec);
- return len;
-}
-
-
-
-int proc_memfd(struct user_data* rdata)
-{
- int fd = rdata->fd;
- unsigned int n_size;
- char line[MAX_LEN]= {0};
- int n = 0;
- int nwrite = 0;
- int data_size = 0;
- memfd_data_st memfd_data = {0};
-
- if(rdata->n_size <= 0 || fd < 0)
- {
- return -1;
- }
-
- if(0 == strncmp(rdata->line, HELLO_MSG, strlen(HELLO_MSG)))
- {
- memfd_data.memfd = g_memfd;
- memfd_data.pid = getpid();
- memfd_data.data = NULL;
- memfd_data.len = 0;
- data_size = sizeof(memfd_data_st) + memfd_data.len;
- memcpy(line, &memfd_data, data_size);
- }
- else
- {
- return -1;
- }
-
- n = data_size;
- while(n>0)
- {
- nwrite = write(rdata->fd, line+data_size-n,n);////ET
- if( nwrite < n )
- {
- if((nwrite==-1) && (errno != EAGAIN))
- {
- perror("write error");
- }
- if (errno == ECONNRESET)
- {
- close(rdata->fd);
- printf("[SERVER] Error: send responce failed: %s\n", strerror(errno));
- }
- else if (nwrite == 0)
- {
- close(rdata->fd);
- printf("[SERVER] Error: client closed connection.");
- }
- break;
- }
- n -= nwrite;
- }
-
- return nwrite;
-
-}
-
-/*define you our proc_memfd funtion to send your message to other processes锛宱r just put it in shm锛�
- notice the file description fd and the pid of creator to other*/
-int main(int argc, char **argv)
-{
- char *name;
- ssize_t len;
- char * unixdomain = NULL;
- int ret = 0;
-
- if (argc < 3) {
- fprintf(stderr, "%s name size [unix domain path]\n", argv[0]);
- exit(EXIT_FAILURE);
- }
-
- name = argv[1];
- len = atoi(argv[2]) * 1024LU * 1024LU * 1024LU;
- unixdomain = argv[3];
-
- signal(SIGTERM,handler);
- signal(SIGABRT,handler);
- signal(SIGSEGV,handler);
-
-
- g_memfd = basic_shm_create(name, len);
- ret = shm_write(g_memfd);
-
- ret = basic_create_ipc_server(unixdomain);
- if(ret < 0)
- {
- printf("failed to create ipc_server\n");
- }
-
- // basic_shm_close(g_memfd);
- return 0;
-}
diff --git a/src/ipc_server_lib.c b/src/ipc_server_lib.c
index 41c22ee..6b2ce1f 100644
--- a/src/ipc_server_lib.c
+++ b/src/ipc_server_lib.c
@@ -13,6 +13,8 @@
#include <sys/types.h>
#include <pthread.h>
#include "ipc_msg.h"
+#include "memfd.h"
+
#define MAX_EPOLL_EVENT_COUNT 1024
#define LISTENQ 1024
@@ -26,8 +28,7 @@
struct task* next;
};
-//typedef int (*readfunc)(struct user_data*);
-typedef int (*writefunc)(struct user_data*);
+
/* thread exec function */
static void *readtask(void *args);
static void *writetask(void *args);
@@ -71,7 +72,6 @@
*/
static void *readtask(void *args)
{
- int i = 0;
int fd = -1;
unsigned int n = 0;
@@ -85,16 +85,22 @@
while(readhead == NULL)
{
// if condl false, will unlock mutex
- mydebug("thread:%u waiting, task is NULL ... \n", (uint32_t)pthread_self());
+ mydebug("thread:%u waiting, readtask is NULL ... \n", (uint32_t)pthread_self());
pthread_cond_wait(&r_condl, &r_mutex);
}
- fd = readhead->data.fd;
+ fd = readhead->data.fd;
struct task *tmp = readhead;
readhead = readhead->next;
free(tmp);
pthread_mutex_unlock(&r_mutex);
+
+ if(fd < 0)
+ {
+ perror("readtask fd<0");
+ continue;
+ }
mydebug("[SERVER] readtask %u handling %d\n", (uint32_t)pthread_self(), fd);
@@ -102,7 +108,7 @@
if(NULL == data)
{
perror("readtask malloc");
- return;
+ return NULL;
}
memset(data, 0 , sizeof(struct user_data));
@@ -136,47 +142,47 @@
free(data);
data = NULL;
}
- }
- else if (n == 0)
- {
- close(fd);
- mydebug("[SERVER] Info: client closed connection.\n");
- if(data != NULL)
- {
- free(data);
- data = NULL;
- }
- }
- else
- {
- data->n_size = n;
-
- mydebug("[SERVER] readtask %u received %d : [%d] %s\n", (uint32_t)pthread_self(), fd, data->n_size, data->line);
- if (data->line[0] != '\0')
- {
- // modify monitored event to EPOLLOUT, wait next loop to send respond
- ev.data.ptr = data;
- // Modify event to EPOLLOUT
- ev.events = EPOLLOUT | EPOLLET;
- // modify moditored fd event
- if(epoll_ctl(epfd,EPOLL_CTL_MOD,fd,&ev)==-1)
- {
- perror("epoll_ctl:mod");
- if(data != NULL)
- {
- free(data);
- data = NULL;
- }
- }
- }
- }
+ }
+ else if (n == 0)
+ {
+ close(fd);
+ mydebug("[SERVER] Info: client closed connection.\n");
+ if(data != NULL)
+ {
+ free(data);
+ data = NULL;
+ }
}
+ else
+ {
+ data->n_size = n;
+
+ mydebug("[SERVER] readtask %u received %d : [%d] %s\n", (uint32_t)pthread_self(), fd, data->n_size, data->line);
+ if (data->line[0] != '\0')
+ {
+ // modify monitored event to EPOLLOUT, wait next loop to send respond
+ ev.data.ptr = data;
+ // Modify event to EPOLLOUT
+ ev.events = EPOLLOUT | EPOLLET;
+ // modify moditored fd event
+ if(epoll_ctl(epfd,EPOLL_CTL_MOD,fd,&ev)==-1)
+ {
+ perror("epoll_ctl:mod");
+ if(data != NULL)
+ {
+ free(data);
+ data = NULL;
+ }
+ }
+ }
+ }
+ }
+
+ return NULL;
}
static void *writetask(void *args)
{
- int n = 0;
- int nwrite = 0;
int ret = 0;
// data to wirte back to client
@@ -200,15 +206,18 @@
if(rdata->fd < 0)
{
- goto error;
+ continue;
}
- mydebug("[SERVER] writetask %u sending %d\n", (uint32_t)pthread_self(), rdata->fd);
+ mydebug("[SERVER] writetask %u fetch data %d\n", (uint32_t)pthread_self(), rdata->fd);
- writefunc wfunc = (writefunc)args;
- ret = wfunc(rdata);
- if(ret < 0)
+ if(NULL != args)
{
- goto error;
+ args_data_st *pargs_data_st = (args_data_st *)args;
+ ret = pargs_data_st->wfunc(rdata, pargs_data_st->args, pargs_data_st->len);
+ if(ret < 0)
+ {
+ goto error;
+ }
}
// delete data
@@ -230,9 +239,11 @@
error:
free(rdata);
}
+
+ return NULL;
}
-int basic_create_ipc_server(char * unix_domain_path)
+int basic_create_ipc_server(char * unix_domain_path, args_data_st * pargs_data_st)
{
pthread_t *read_tids = NULL;
pthread_t *write_tids = NULL;
@@ -242,16 +253,19 @@
socklen_t clt_len;
int ret;
int i;
- char line[MAX_LEN] = {0};
struct task *new_task=NULL;
struct user_data *rdata=NULL;
+ if(0 == file_exists(unix_domain_path))
+ {
+ unlink(unix_domain_path);
+ }
- if((read_tids = (pthread_t *)malloc(READ_THREAD_NUM * sizeof(pthread_t))) == NULL)
- {
- perror("read_tids malloc fail\n");
+ if((read_tids = (pthread_t *)malloc(READ_THREAD_NUM * sizeof(pthread_t))) == NULL)
+ {
+ perror("read_tids malloc fail\n");
return -1;
- }
+ }
// threads for reading thread pool
for(i = 0; i < READ_THREAD_NUM; i++)
@@ -268,7 +282,7 @@
// threads for writing thread pool
for(i = 0; i < WRITE_THREAD_NUM; i++)
{
- pthread_create(&write_tids[i], NULL, writetask, proc_memfd);
+ pthread_create(&write_tids[i], NULL, writetask, (void *)pargs_data_st);
}
epfd = epoll_create(MAX_EPOLL_EVENT_COUNT);
@@ -300,7 +314,7 @@
if(lsn_fd < 0)
{
perror("can't create communication socket!");
- unlink(UNIX_DOMAIN);
+ unlink(unix_domain_path);
return 1;
}
@@ -312,7 +326,6 @@
srv_addr.sun_family = AF_UNIX;
strncpy(srv_addr.sun_path, unix_domain_path, sizeof(srv_addr.sun_path) - 1);
- //unlink(UNIX_DOMAIN);
//bind sockfd and sockaddr
ret = bind(lsn_fd, (struct sockaddr*)&srv_addr, sizeof(srv_addr));
@@ -320,7 +333,7 @@
{
perror("can't bind local sockaddr!");
close(lsn_fd);
- unlink(UNIX_DOMAIN);
+ unlink(unix_domain_path);
return 1;
}
@@ -331,7 +344,7 @@
{
perror("can't listen client connect request");
close(lsn_fd);
- unlink(UNIX_DOMAIN);
+ unlink(unix_domain_path);
return 1;
}
@@ -378,20 +391,20 @@
else if (events[i].events & EPOLLIN)
//write鏁版嵁
{
- printf("EPOLLIN\n");
- if( (eventfd = events[i].data.fd) < 0 )
+ mydebug("EPOLLIN\n");
+ if(events[i].data.fd < 0)
{
continue;
}
- printf("[SERVER] put task %d to read queue\n", events[i].data.fd);
+ mydebug("[SERVER] put task %d to read queue\n", events[i].data.fd);
new_task = (struct task *)malloc(sizeof(struct task));
if(NULL == new_task)
{
goto error;
}
- new_task->data.fd= eventfd;
+ new_task->data.fd= events[i].data.fd;
new_task->next = NULL;
// protect task queue (readhead/readtail)
@@ -421,7 +434,7 @@
{
continue;
}
- printf("[SERVER] put task %d to write queue\n", ((struct task*)events[i].data.ptr)->data.fd);
+ mydebug("[SERVER] put task %d to write queue\n", ((struct task*)events[i].data.ptr)->data.fd);
new_task = malloc(sizeof(struct task));
if(NULL == new_task)
@@ -453,7 +466,7 @@
}
else
{
- printf("[SERVER] Error: unknown epoll event");
+ mydebug("[SERVER] Error: unknown epoll event");
}
}
}
@@ -461,6 +474,8 @@
error:
close(apt_fd);
close(lsn_fd);
- unlink(UNIX_DOMAIN);
+ unlink(unix_domain_path);
+
+ return 0;
}
diff --git a/src/shmht.c b/src/shmht.c
index dc23056..2700b06 100644
--- a/src/shmht.c
+++ b/src/shmht.c
@@ -83,7 +83,7 @@
2 * sizeof (struct entry) * size + (sizeof (struct bucket) +
register_size) * size;
- shmht_debug(("create_shmht size=%d, register_size:%u, all_ht_size=%d\n", size, register_size, all_ht_size));
+ shmht_debug(("create_shmht size=%d, register_size:%lu, all_ht_size=%d\n", size, register_size, all_ht_size));
int fd = basic_shm_create(name, all_ht_size);
if ( 0 >= fd )
{
--
Gitblit v1.8.0