From afdec3e47d918c56a7c97df8975cebdf1c33d831 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期三, 22 七月 2020 14:41:54 +0800
Subject: [PATCH] dgram req_rep

---
 src/libshm_queue.a                    |    0 
 src/socket/shm_socket.c               |    4 
 demo/queue                            |    0 
 test_socket/dgram_socket_test         |    0 
 Makefile                              |    1 
 src/socket/dgram_mod_socket.c         |  139 +++++++++++++++++++++++
 build/include/dgram_mod_socket.h      |   41 ++++++
 demo/pub_sub                          |    0 
 src/util/sem_util.c                   |   10 
 test_socket/dgram_mod_req_rep.c       |   58 +++++++++
 build/include/logger_factory.h        |    4 
 test_socket/Makefile                  |    2 
 src/queue/include/shm_allocator.h     |    4 
 src/socket/mod_socket.c               |    8 -
 test_socket/dgram_mod_req_rep         |    0 
 src/socket/include/dgram_mod_socket.h |   41 ++++++
 demo/req_rep                          |    0 
 build/lib/libshm_queue.a              |    0 
 src/logger_factory.h                  |    4 
 19 files changed, 292 insertions(+), 24 deletions(-)

diff --git a/Makefile b/Makefile
index 757d648..e509479 100755
--- a/Makefile
+++ b/Makefile
@@ -10,6 +10,7 @@
 		(cd $$i && echo "cleaning $$i" && $(MAKE) clean) || exit 1; \
 	done
 	rm -rf build
+	ipcrm -a
 
 ipcrm:
 	-ipcrm -a
diff --git a/build/include/dgram_mod_socket.h b/build/include/dgram_mod_socket.h
new file mode 100644
index 0000000..ab635a3
--- /dev/null
+++ b/build/include/dgram_mod_socket.h
@@ -0,0 +1,41 @@
+#ifndef __DGRAM_MOD_SOCKET_H__
+#define __DGRAM_MOD_SOCKET_H__
+
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+
+enum socket_mod_t
+{
+	PULL_PUSH = 1,
+	REQ_REP = 2,
+	PAIR = 3,
+	PUB_SUB = 4,
+	SURVEY = 5,
+	BUS = 6
+	
+};
+
+ 
+
+
+void *dgram_mod_open_socket(int mod);
+
+int dgram_mod_close_socket(void * _socket);
+
+int dgram_mod_bind(void * _socket, int port);
+
+int dgram_mod_sendto(void *_socket, const void *buf, const int size, const int port);
+
+int dgram_mod_recvfrom(void *_socket, void **buf, int *size, int *port);
+ 
+
+int dgram_mod_sendandrecv(void * _socket, const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ;
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
\ No newline at end of file
diff --git a/build/include/logger_factory.h b/build/include/logger_factory.h
index 384e3e0..a766d14 100644
--- a/build/include/logger_factory.h
+++ b/build/include/logger_factory.h
@@ -6,8 +6,8 @@
 public:
 
 	static Logger getLogger() {
-//ERROR ALL DEBUG
-		static Logger logger(Logger::DEBUG);
+//ERROR ALL DEBUG INFO
+		static Logger logger(Logger::ERROR);
 		return logger;
 	}
 };
diff --git a/build/lib/libshm_queue.a b/build/lib/libshm_queue.a
index 4e9e1cb..11e442c 100644
--- a/build/lib/libshm_queue.a
+++ b/build/lib/libshm_queue.a
Binary files differ
diff --git a/demo/pub_sub b/demo/pub_sub
index 265621f..6ae4f04 100755
--- a/demo/pub_sub
+++ b/demo/pub_sub
Binary files differ
diff --git a/demo/queue b/demo/queue
index b1e1056..096c64a 100755
--- a/demo/queue
+++ b/demo/queue
Binary files differ
diff --git a/demo/req_rep b/demo/req_rep
index 3d35107..c950193 100755
--- a/demo/req_rep
+++ b/demo/req_rep
Binary files differ
diff --git a/src/libshm_queue.a b/src/libshm_queue.a
index 4e9e1cb..11e442c 100644
--- a/src/libshm_queue.a
+++ b/src/libshm_queue.a
Binary files differ
diff --git a/src/logger_factory.h b/src/logger_factory.h
index 384e3e0..a766d14 100644
--- a/src/logger_factory.h
+++ b/src/logger_factory.h
@@ -6,8 +6,8 @@
 public:
 
 	static Logger getLogger() {
-//ERROR ALL DEBUG
-		static Logger logger(Logger::DEBUG);
+//ERROR ALL DEBUG INFO
+		static Logger logger(Logger::ERROR);
 		return logger;
 	}
 };
diff --git a/src/queue/include/shm_allocator.h b/src/queue/include/shm_allocator.h
index 023bc9d..ae94a9c 100644
--- a/src/queue/include/shm_allocator.h
+++ b/src/queue/include/shm_allocator.h
@@ -66,12 +66,10 @@
 class SHM_Allocator {
   public:
     static void *allocate (size_t size) {
-       printf("shm_allocator malloc\n");
       return mem_pool_malloc(size);
     }
 
     static void deallocate (void *ptr) {
-      printf("shm_allocator free\n");
       return mem_pool_free(ptr);
     }
 };
@@ -80,12 +78,10 @@
 class DM_Allocator {
   public:
     static void *allocate (size_t size) {
-      printf("dm_allocator malloc\n");
       return malloc(size);
     }
 
     static void deallocate (void *ptr) {
-      printf("dm_allocator free\n");
       return free(ptr);
     }
 };
diff --git a/src/socket/dgram_mod_socket.c b/src/socket/dgram_mod_socket.c
new file mode 100644
index 0000000..f9857bc
--- /dev/null
+++ b/src/socket/dgram_mod_socket.c
@@ -0,0 +1,139 @@
+#include "usg_common.h"
+#include "dgram_mod_socket.h"
+#include "shm_socket.h"
+#include "shm_allocator.h"
+#include "mem_pool.h"
+#include "hashtable.h"
+#include "sem_util.h"
+#include "logger_factory.h"
+
+typedef struct dgram_mod_socket_t {
+	socket_mod_t mod;
+  shm_socket_t *shm_socket;
+  pthread_t recv_thread;
+	std::map<int, LockFreeQueue<shm_msg_t, DM_Allocator> * > *recv_queue_map;
+} dgram_mod_socket_t;
+
+
+void *dgram_mod_open_socket(int mod) {
+	dgram_mod_socket_t * socket = (dgram_mod_socket_t *)calloc(1, sizeof(dgram_mod_socket_t));
+	socket->mod = (socket_mod_t)mod;
+	socket->recv_thread = 0;
+	socket->recv_queue_map = NULL;
+	socket->shm_socket = shm_open_socket(SHM_SOCKET_DGRAM);
+
+	printf("socket->shm_socket = %p \n" , socket->shm_socket);
+	return (void *)socket;
+}
+
+
+int dgram_mod_close_socket(void * _socket) {
+	dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
+	shm_close_socket(socket->shm_socket);
+	if(socket->recv_queue_map != NULL) {
+		for(auto iter = socket->recv_queue_map->begin(); iter != socket->recv_queue_map->end(); iter++) {
+			delete iter->second;
+			socket->recv_queue_map->erase(iter);
+			
+		}
+		delete socket->recv_queue_map;
+	}
+
+
+	if(socket->recv_thread != 0)
+		pthread_cancel(socket->recv_thread);
+	free(_socket);
+}
+
+
+int dgram_mod_bind(void * _socket, int port){
+	dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
+	return  shm_socket_bind(socket->shm_socket, port);
+}
+
+int dgram_mod_sendto(void *_socket, const void *buf, const int size, const int port) {
+	dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
+
+	return shm_sendto(socket->shm_socket, buf, size, port);
+
+}
+
+int dgram_mod_recvfrom(void *_socket, void **buf, int *size, int *port) {
+
+	dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
+	if(socket->mod == REQ_REP && socket->recv_thread != 0) {
+		err_exit(0, "you have used sendandrecv method os you can not use recvfrom method any more. these two method can not be used at the same time.");
+		return -1;
+	}
+	return shm_recvfrom(socket->shm_socket, buf, size, port);
+}
+
+void *_dgram_mod_run_recv(void * _socket) {
+	pthread_detach(pthread_self());
+	dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
+	void *buf;
+	int size;
+	int port;
+	shm_msg_t msg;
+	LockFreeQueue<shm_msg_t, DM_Allocator> *queue;
+	std::map<int, LockFreeQueue<shm_msg_t, DM_Allocator> * >::iterator iter;
+// printf("==============_dgram_mod_run_recv recv before\n");
+	while(shm_recvfrom(socket->shm_socket, &buf, &size, &port) == 0) {
+		if( (iter = socket->recv_queue_map->find(port) ) != socket->recv_queue_map->end()) {
+			queue = iter->second;
+		} else {
+			queue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
+			socket->recv_queue_map->insert({port, queue});
+		}
+
+		msg.buf = buf;
+		msg.size = size;
+		msg.port = port;
+// printf("==============_dgram_mod_run_recv push before\n");
+		queue->push(msg);
+// printf("==============_dgram_mod_run_recv push after\n");
+	 
+	}
+	return NULL;
+
+
+
+}
+
+int dgram_mod_sendandrecv(void * _socket, const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) {
+	
+	dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
+	if(socket->mod != REQ_REP) {
+		err_exit(0, "you can't use this method other than REQ_REP mod!");
+	}
+	if(socket->recv_queue_map == NULL) {
+		socket->recv_queue_map = new std::map<int, LockFreeQueue<shm_msg_t, DM_Allocator> * >;
+	}
+
+	std::map<int, LockFreeQueue<shm_msg_t, DM_Allocator> * >::iterator iter;
+	LockFreeQueue<shm_msg_t, DM_Allocator> *queue;
+	if( (iter = socket->recv_queue_map->find(port) ) != socket->recv_queue_map->end()) {
+		queue = iter->second;
+	} else {
+		queue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
+		socket->recv_queue_map->insert({port, queue});
+	}
+
+	if (socket->recv_thread == 0) {
+		
+		pthread_create(&(socket->recv_thread ), NULL, _dgram_mod_run_recv , _socket);
+		
+	}
+
+	shm_sendto(socket->shm_socket, send_buf, send_size, port);	
+	shm_msg_t msg;
+// printf("==============dgram_mod_sendandrecv pop before\n");
+	if(queue->pop(msg)) {
+		*recv_buf = msg.buf;
+		*recv_size = msg.size;
+// printf("==============dgram_mod_sendandrecv pop after\n");
+		return 0;
+	}
+	return -1;
+
+}
\ No newline at end of file
diff --git a/src/socket/include/dgram_mod_socket.h b/src/socket/include/dgram_mod_socket.h
new file mode 100644
index 0000000..ab635a3
--- /dev/null
+++ b/src/socket/include/dgram_mod_socket.h
@@ -0,0 +1,41 @@
+#ifndef __DGRAM_MOD_SOCKET_H__
+#define __DGRAM_MOD_SOCKET_H__
+
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+
+enum socket_mod_t
+{
+	PULL_PUSH = 1,
+	REQ_REP = 2,
+	PAIR = 3,
+	PUB_SUB = 4,
+	SURVEY = 5,
+	BUS = 6
+	
+};
+
+ 
+
+
+void *dgram_mod_open_socket(int mod);
+
+int dgram_mod_close_socket(void * _socket);
+
+int dgram_mod_bind(void * _socket, int port);
+
+int dgram_mod_sendto(void *_socket, const void *buf, const int size, const int port);
+
+int dgram_mod_recvfrom(void *_socket, void **buf, int *size, int *port);
+ 
+
+int dgram_mod_sendandrecv(void * _socket, const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ;
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
\ No newline at end of file
diff --git a/src/socket/mod_socket.c b/src/socket/mod_socket.c
index fcb5e58..62e8b8f 100644
--- a/src/socket/mod_socket.c
+++ b/src/socket/mod_socket.c
@@ -133,11 +133,9 @@
 	if(socket->is_server ) {
 		switch(socket->mod) {
 			case REQ_REP:
-logger.debug("mod_send before");
 				SemUtil::dec(socket->items);
 				rv = shm_send(socket->client_socket, buf, size);
 				SemUtil::inc(socket->slots);
-logger.debug("mod_send after");
 				break;
 			case SURVEY:
 			case PUB_SUB:
@@ -152,9 +150,7 @@
 		
 	}
 	else {
-logger.debug("mod_send before");
 		rv = shm_send(socket->shm_socket, buf, size);
-logger.debug("mod_send after");
 		return rv;
 	}
 	return -1;
@@ -168,7 +164,6 @@
 	if(socket->is_server ) {
 		switch(socket->mod) {
 			case REQ_REP:
-logger.debug("REQ_REP mod_recv before");
 				SemUtil::dec(socket->slots);
 				socket->recvQueue->pop(entry);
 				*buf = entry.buf;
@@ -176,7 +171,6 @@
 				socket->client_socket = entry.client_socket;
 				SemUtil::inc(socket->items);
 				
-logger.debug("REQ_REP mod_recv after");
 				break;
 			case PUB_SUB:
 				break;
@@ -191,9 +185,7 @@
 		return 0;
 	}
 	else {
-logger.debug("mod_recv before");
 		shm_recv(socket->shm_socket, buf, size);
-logger.debug("mod_recv after");
 		return 0;
 	}
 
diff --git a/src/socket/shm_socket.c b/src/socket/shm_socket.c
index 260cdc2..3d7ba4e 100644
--- a/src/socket/shm_socket.c
+++ b/src/socket/shm_socket.c
@@ -287,7 +287,7 @@
 	}
 
 	shm_msg_t src;
-//logger.debug("shm_recvfrom pop before");
+printf("shm_recvfrom pop before");
 	if (socket->queue->pop(src)) {
 		void * _buf = malloc(src.size);
 		memcpy(_buf, src.buf, src.size);
@@ -295,7 +295,7 @@
 		*size = src.size;
 		*port = src.port;
 		mm_free(src.buf);
-//logger.debug("shm_recvfrom pop after");
+printf("shm_recvfrom pop after");
 		return 0;
 	} else {
 		return -1;
diff --git a/src/util/sem_util.c b/src/util/sem_util.c
index e2b2c20..4f294c0 100644
--- a/src/util/sem_util.c
+++ b/src/util/sem_util.c
@@ -14,12 +14,12 @@
     union semun arg;
     struct sembuf sop;
 
-    logger.info("%ld: created semaphore\n", (long)getpid());
+    //logger.info("%ld: created semaphore\n", (long)getpid());
 
     arg.val = 0; /* So initialize it to 0 */
     if (semctl(semid, 0, SETVAL, arg) == -1)
       err_exit(errno, "semctl 1");
-    logger.info("%ld: initialized semaphore\n", (long)getpid());
+    //logger.info("%ld: initialized semaphore\n", (long)getpid());
 
     /* Perform a "no-op" semaphore operation - changes sem_otime
        so other processes can see we've initialized the set. */
@@ -29,7 +29,7 @@
     sop.sem_flg = 0;
     if (semop(semid, &sop, 1) == -1)
       err_exit(errno, "semop");
-    logger.info("%ld: completed dummy semop()\n", (long)getpid());
+    //logger.info("%ld: completed dummy semop()\n", (long)getpid());
 
   } else { /* We didn't create the semaphore set */
 
@@ -46,12 +46,12 @@
       if (semid == -1)
         err_exit(errno, "semget 2");
 
-      logger.info("%ld: got semaphore key\n", (long)getpid());
+     // logger.info("%ld: got semaphore key\n", (long)getpid());
       /* Wait until another process has called semop() */
 
       arg.buf = &ds;
       for (j = 0; j < MAX_TRIES; j++) {
-        logger.info("Try %d\n", j);
+        //logger.info("Try %d\n", j);
         if (semctl(semid, 0, IPC_STAT, arg) == -1)
           err_exit(errno, "semctl 2");
 
diff --git a/test_socket/Makefile b/test_socket/Makefile
index c2a7f89..214cd1a 100644
--- a/test_socket/Makefile
+++ b/test_socket/Makefile
@@ -14,7 +14,7 @@
 include $(ROOT)/Make.defines.$(PLATFORM)
 
 
-PROGS =	dgram_socket_test
+PROGS =	dgram_socket_test dgram_mod_req_rep
 
 
 build: $(PROGS)
diff --git a/test_socket/dgram_mod_req_rep b/test_socket/dgram_mod_req_rep
new file mode 100755
index 0000000..4034ad8
--- /dev/null
+++ b/test_socket/dgram_mod_req_rep
Binary files differ
diff --git a/test_socket/dgram_mod_req_rep.c b/test_socket/dgram_mod_req_rep.c
new file mode 100644
index 0000000..a857ce6
--- /dev/null
+++ b/test_socket/dgram_mod_req_rep.c
@@ -0,0 +1,58 @@
+#include "dgram_mod_socket.h"
+#include "shm_mm.h"
+#include "usg_common.h"
+
+void server(int port) {
+  void *socket = dgram_mod_open_socket(REQ_REP);
+  dgram_mod_bind(socket, port);
+  int size;
+  void *recvbuf;
+  char sendbuf[512];
+  int rv;
+  int remote_port;
+  while ( (rv = dgram_mod_recvfrom(socket, &recvbuf, &size, &remote_port) ) == 0) {
+    sprintf(sendbuf, "SERVER RECEIVED: %s", recvbuf);
+    puts(sendbuf);
+    dgram_mod_sendto(socket, sendbuf, strlen(sendbuf) + 1, remote_port);
+    free(recvbuf);
+  }
+  dgram_mod_close_socket(socket);
+}
+
+void client(int port) {
+  void *socket = dgram_mod_open_socket(REQ_REP);
+  int size;
+  void *recvbuf;
+  char sendbuf[512];
+  while (true) {
+    printf("request: ");
+    scanf("%s", sendbuf);
+    dgram_mod_sendandrecv(socket, sendbuf, strlen(sendbuf) + 1, port, &recvbuf, &size);
+    printf("reply: %s\n", (char *)recvbuf);
+    free(recvbuf);
+  }
+  dgram_mod_close_socket(socket);
+}
+
+ 
+
+int main(int argc, char *argv[]) {
+  shm_init(512);
+  int port;
+  if (argc < 3) {
+    fprintf(stderr, "Usage: reqrep %s|%s <PORT> ...\n", "server", "client");
+    return 1;
+  }
+
+  port = atoi(argv[2]);
+
+  if (strcmp("server", argv[1]) == 0) {
+    server(port);
+  }
+
+  if (strcmp("client", argv[1]) == 0)
+    client(port);
+
+  
+  return 0;
+}
\ No newline at end of file
diff --git a/test_socket/dgram_socket_test b/test_socket/dgram_socket_test
index d493973..6a18d7c 100755
--- a/test_socket/dgram_socket_test
+++ b/test_socket/dgram_socket_test
Binary files differ

--
Gitblit v1.8.0