From afdec3e47d918c56a7c97df8975cebdf1c33d831 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期三, 22 七月 2020 14:41:54 +0800
Subject: [PATCH] dgram req_rep
---
src/libshm_queue.a | 0
src/socket/shm_socket.c | 4
demo/queue | 0
test_socket/dgram_socket_test | 0
Makefile | 1
src/socket/dgram_mod_socket.c | 139 +++++++++++++++++++++++
build/include/dgram_mod_socket.h | 41 ++++++
demo/pub_sub | 0
src/util/sem_util.c | 10
test_socket/dgram_mod_req_rep.c | 58 +++++++++
build/include/logger_factory.h | 4
test_socket/Makefile | 2
src/queue/include/shm_allocator.h | 4
src/socket/mod_socket.c | 8 -
test_socket/dgram_mod_req_rep | 0
src/socket/include/dgram_mod_socket.h | 41 ++++++
demo/req_rep | 0
build/lib/libshm_queue.a | 0
src/logger_factory.h | 4
19 files changed, 292 insertions(+), 24 deletions(-)
diff --git a/Makefile b/Makefile
index 757d648..e509479 100755
--- a/Makefile
+++ b/Makefile
@@ -10,6 +10,7 @@
(cd $$i && echo "cleaning $$i" && $(MAKE) clean) || exit 1; \
done
rm -rf build
+ ipcrm -a
ipcrm:
-ipcrm -a
diff --git a/build/include/dgram_mod_socket.h b/build/include/dgram_mod_socket.h
new file mode 100644
index 0000000..ab635a3
--- /dev/null
+++ b/build/include/dgram_mod_socket.h
@@ -0,0 +1,41 @@
+#ifndef __DGRAM_MOD_SOCKET_H__
+#define __DGRAM_MOD_SOCKET_H__
+
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+
+enum socket_mod_t
+{
+ PULL_PUSH = 1,
+ REQ_REP = 2,
+ PAIR = 3,
+ PUB_SUB = 4,
+ SURVEY = 5,
+ BUS = 6
+
+};
+
+
+
+
+void *dgram_mod_open_socket(int mod);
+
+int dgram_mod_close_socket(void * _socket);
+
+int dgram_mod_bind(void * _socket, int port);
+
+int dgram_mod_sendto(void *_socket, const void *buf, const int size, const int port);
+
+int dgram_mod_recvfrom(void *_socket, void **buf, int *size, int *port);
+
+
+int dgram_mod_sendandrecv(void * _socket, const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ;
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
\ No newline at end of file
diff --git a/build/include/logger_factory.h b/build/include/logger_factory.h
index 384e3e0..a766d14 100644
--- a/build/include/logger_factory.h
+++ b/build/include/logger_factory.h
@@ -6,8 +6,8 @@
public:
static Logger getLogger() {
-//ERROR ALL DEBUG
- static Logger logger(Logger::DEBUG);
+//ERROR ALL DEBUG INFO
+ static Logger logger(Logger::ERROR);
return logger;
}
};
diff --git a/build/lib/libshm_queue.a b/build/lib/libshm_queue.a
index 4e9e1cb..11e442c 100644
--- a/build/lib/libshm_queue.a
+++ b/build/lib/libshm_queue.a
Binary files differ
diff --git a/demo/pub_sub b/demo/pub_sub
index 265621f..6ae4f04 100755
--- a/demo/pub_sub
+++ b/demo/pub_sub
Binary files differ
diff --git a/demo/queue b/demo/queue
index b1e1056..096c64a 100755
--- a/demo/queue
+++ b/demo/queue
Binary files differ
diff --git a/demo/req_rep b/demo/req_rep
index 3d35107..c950193 100755
--- a/demo/req_rep
+++ b/demo/req_rep
Binary files differ
diff --git a/src/libshm_queue.a b/src/libshm_queue.a
index 4e9e1cb..11e442c 100644
--- a/src/libshm_queue.a
+++ b/src/libshm_queue.a
Binary files differ
diff --git a/src/logger_factory.h b/src/logger_factory.h
index 384e3e0..a766d14 100644
--- a/src/logger_factory.h
+++ b/src/logger_factory.h
@@ -6,8 +6,8 @@
public:
static Logger getLogger() {
-//ERROR ALL DEBUG
- static Logger logger(Logger::DEBUG);
+//ERROR ALL DEBUG INFO
+ static Logger logger(Logger::ERROR);
return logger;
}
};
diff --git a/src/queue/include/shm_allocator.h b/src/queue/include/shm_allocator.h
index 023bc9d..ae94a9c 100644
--- a/src/queue/include/shm_allocator.h
+++ b/src/queue/include/shm_allocator.h
@@ -66,12 +66,10 @@
class SHM_Allocator {
public:
static void *allocate (size_t size) {
- printf("shm_allocator malloc\n");
return mem_pool_malloc(size);
}
static void deallocate (void *ptr) {
- printf("shm_allocator free\n");
return mem_pool_free(ptr);
}
};
@@ -80,12 +78,10 @@
class DM_Allocator {
public:
static void *allocate (size_t size) {
- printf("dm_allocator malloc\n");
return malloc(size);
}
static void deallocate (void *ptr) {
- printf("dm_allocator free\n");
return free(ptr);
}
};
diff --git a/src/socket/dgram_mod_socket.c b/src/socket/dgram_mod_socket.c
new file mode 100644
index 0000000..f9857bc
--- /dev/null
+++ b/src/socket/dgram_mod_socket.c
@@ -0,0 +1,139 @@
+#include "usg_common.h"
+#include "dgram_mod_socket.h"
+#include "shm_socket.h"
+#include "shm_allocator.h"
+#include "mem_pool.h"
+#include "hashtable.h"
+#include "sem_util.h"
+#include "logger_factory.h"
+
+typedef struct dgram_mod_socket_t {
+ socket_mod_t mod;
+ shm_socket_t *shm_socket;
+ pthread_t recv_thread;
+ std::map<int, LockFreeQueue<shm_msg_t, DM_Allocator> * > *recv_queue_map;
+} dgram_mod_socket_t;
+
+
+void *dgram_mod_open_socket(int mod) {
+ dgram_mod_socket_t * socket = (dgram_mod_socket_t *)calloc(1, sizeof(dgram_mod_socket_t));
+ socket->mod = (socket_mod_t)mod;
+ socket->recv_thread = 0;
+ socket->recv_queue_map = NULL;
+ socket->shm_socket = shm_open_socket(SHM_SOCKET_DGRAM);
+
+ printf("socket->shm_socket = %p \n" , socket->shm_socket);
+ return (void *)socket;
+}
+
+
+int dgram_mod_close_socket(void * _socket) {
+ dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
+ shm_close_socket(socket->shm_socket);
+ if(socket->recv_queue_map != NULL) {
+ for(auto iter = socket->recv_queue_map->begin(); iter != socket->recv_queue_map->end(); iter++) {
+ delete iter->second;
+ socket->recv_queue_map->erase(iter);
+
+ }
+ delete socket->recv_queue_map;
+ }
+
+
+ if(socket->recv_thread != 0)
+ pthread_cancel(socket->recv_thread);
+ free(_socket);
+}
+
+
+int dgram_mod_bind(void * _socket, int port){
+ dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
+ return shm_socket_bind(socket->shm_socket, port);
+}
+
+int dgram_mod_sendto(void *_socket, const void *buf, const int size, const int port) {
+ dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
+
+ return shm_sendto(socket->shm_socket, buf, size, port);
+
+}
+
+int dgram_mod_recvfrom(void *_socket, void **buf, int *size, int *port) {
+
+ dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
+ if(socket->mod == REQ_REP && socket->recv_thread != 0) {
+ err_exit(0, "you have used sendandrecv method os you can not use recvfrom method any more. these two method can not be used at the same time.");
+ return -1;
+ }
+ return shm_recvfrom(socket->shm_socket, buf, size, port);
+}
+
+void *_dgram_mod_run_recv(void * _socket) {
+ pthread_detach(pthread_self());
+ dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
+ void *buf;
+ int size;
+ int port;
+ shm_msg_t msg;
+ LockFreeQueue<shm_msg_t, DM_Allocator> *queue;
+ std::map<int, LockFreeQueue<shm_msg_t, DM_Allocator> * >::iterator iter;
+// printf("==============_dgram_mod_run_recv recv before\n");
+ while(shm_recvfrom(socket->shm_socket, &buf, &size, &port) == 0) {
+ if( (iter = socket->recv_queue_map->find(port) ) != socket->recv_queue_map->end()) {
+ queue = iter->second;
+ } else {
+ queue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
+ socket->recv_queue_map->insert({port, queue});
+ }
+
+ msg.buf = buf;
+ msg.size = size;
+ msg.port = port;
+// printf("==============_dgram_mod_run_recv push before\n");
+ queue->push(msg);
+// printf("==============_dgram_mod_run_recv push after\n");
+
+ }
+ return NULL;
+
+
+
+}
+
+int dgram_mod_sendandrecv(void * _socket, const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) {
+
+ dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
+ if(socket->mod != REQ_REP) {
+ err_exit(0, "you can't use this method other than REQ_REP mod!");
+ }
+ if(socket->recv_queue_map == NULL) {
+ socket->recv_queue_map = new std::map<int, LockFreeQueue<shm_msg_t, DM_Allocator> * >;
+ }
+
+ std::map<int, LockFreeQueue<shm_msg_t, DM_Allocator> * >::iterator iter;
+ LockFreeQueue<shm_msg_t, DM_Allocator> *queue;
+ if( (iter = socket->recv_queue_map->find(port) ) != socket->recv_queue_map->end()) {
+ queue = iter->second;
+ } else {
+ queue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
+ socket->recv_queue_map->insert({port, queue});
+ }
+
+ if (socket->recv_thread == 0) {
+
+ pthread_create(&(socket->recv_thread ), NULL, _dgram_mod_run_recv , _socket);
+
+ }
+
+ shm_sendto(socket->shm_socket, send_buf, send_size, port);
+ shm_msg_t msg;
+// printf("==============dgram_mod_sendandrecv pop before\n");
+ if(queue->pop(msg)) {
+ *recv_buf = msg.buf;
+ *recv_size = msg.size;
+// printf("==============dgram_mod_sendandrecv pop after\n");
+ return 0;
+ }
+ return -1;
+
+}
\ No newline at end of file
diff --git a/src/socket/include/dgram_mod_socket.h b/src/socket/include/dgram_mod_socket.h
new file mode 100644
index 0000000..ab635a3
--- /dev/null
+++ b/src/socket/include/dgram_mod_socket.h
@@ -0,0 +1,41 @@
+#ifndef __DGRAM_MOD_SOCKET_H__
+#define __DGRAM_MOD_SOCKET_H__
+
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+
+enum socket_mod_t
+{
+ PULL_PUSH = 1,
+ REQ_REP = 2,
+ PAIR = 3,
+ PUB_SUB = 4,
+ SURVEY = 5,
+ BUS = 6
+
+};
+
+
+
+
+void *dgram_mod_open_socket(int mod);
+
+int dgram_mod_close_socket(void * _socket);
+
+int dgram_mod_bind(void * _socket, int port);
+
+int dgram_mod_sendto(void *_socket, const void *buf, const int size, const int port);
+
+int dgram_mod_recvfrom(void *_socket, void **buf, int *size, int *port);
+
+
+int dgram_mod_sendandrecv(void * _socket, const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ;
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
\ No newline at end of file
diff --git a/src/socket/mod_socket.c b/src/socket/mod_socket.c
index fcb5e58..62e8b8f 100644
--- a/src/socket/mod_socket.c
+++ b/src/socket/mod_socket.c
@@ -133,11 +133,9 @@
if(socket->is_server ) {
switch(socket->mod) {
case REQ_REP:
-logger.debug("mod_send before");
SemUtil::dec(socket->items);
rv = shm_send(socket->client_socket, buf, size);
SemUtil::inc(socket->slots);
-logger.debug("mod_send after");
break;
case SURVEY:
case PUB_SUB:
@@ -152,9 +150,7 @@
}
else {
-logger.debug("mod_send before");
rv = shm_send(socket->shm_socket, buf, size);
-logger.debug("mod_send after");
return rv;
}
return -1;
@@ -168,7 +164,6 @@
if(socket->is_server ) {
switch(socket->mod) {
case REQ_REP:
-logger.debug("REQ_REP mod_recv before");
SemUtil::dec(socket->slots);
socket->recvQueue->pop(entry);
*buf = entry.buf;
@@ -176,7 +171,6 @@
socket->client_socket = entry.client_socket;
SemUtil::inc(socket->items);
-logger.debug("REQ_REP mod_recv after");
break;
case PUB_SUB:
break;
@@ -191,9 +185,7 @@
return 0;
}
else {
-logger.debug("mod_recv before");
shm_recv(socket->shm_socket, buf, size);
-logger.debug("mod_recv after");
return 0;
}
diff --git a/src/socket/shm_socket.c b/src/socket/shm_socket.c
index 260cdc2..3d7ba4e 100644
--- a/src/socket/shm_socket.c
+++ b/src/socket/shm_socket.c
@@ -287,7 +287,7 @@
}
shm_msg_t src;
-//logger.debug("shm_recvfrom pop before");
+printf("shm_recvfrom pop before");
if (socket->queue->pop(src)) {
void * _buf = malloc(src.size);
memcpy(_buf, src.buf, src.size);
@@ -295,7 +295,7 @@
*size = src.size;
*port = src.port;
mm_free(src.buf);
-//logger.debug("shm_recvfrom pop after");
+printf("shm_recvfrom pop after");
return 0;
} else {
return -1;
diff --git a/src/util/sem_util.c b/src/util/sem_util.c
index e2b2c20..4f294c0 100644
--- a/src/util/sem_util.c
+++ b/src/util/sem_util.c
@@ -14,12 +14,12 @@
union semun arg;
struct sembuf sop;
- logger.info("%ld: created semaphore\n", (long)getpid());
+ //logger.info("%ld: created semaphore\n", (long)getpid());
arg.val = 0; /* So initialize it to 0 */
if (semctl(semid, 0, SETVAL, arg) == -1)
err_exit(errno, "semctl 1");
- logger.info("%ld: initialized semaphore\n", (long)getpid());
+ //logger.info("%ld: initialized semaphore\n", (long)getpid());
/* Perform a "no-op" semaphore operation - changes sem_otime
so other processes can see we've initialized the set. */
@@ -29,7 +29,7 @@
sop.sem_flg = 0;
if (semop(semid, &sop, 1) == -1)
err_exit(errno, "semop");
- logger.info("%ld: completed dummy semop()\n", (long)getpid());
+ //logger.info("%ld: completed dummy semop()\n", (long)getpid());
} else { /* We didn't create the semaphore set */
@@ -46,12 +46,12 @@
if (semid == -1)
err_exit(errno, "semget 2");
- logger.info("%ld: got semaphore key\n", (long)getpid());
+ // logger.info("%ld: got semaphore key\n", (long)getpid());
/* Wait until another process has called semop() */
arg.buf = &ds;
for (j = 0; j < MAX_TRIES; j++) {
- logger.info("Try %d\n", j);
+ //logger.info("Try %d\n", j);
if (semctl(semid, 0, IPC_STAT, arg) == -1)
err_exit(errno, "semctl 2");
diff --git a/test_socket/Makefile b/test_socket/Makefile
index c2a7f89..214cd1a 100644
--- a/test_socket/Makefile
+++ b/test_socket/Makefile
@@ -14,7 +14,7 @@
include $(ROOT)/Make.defines.$(PLATFORM)
-PROGS = dgram_socket_test
+PROGS = dgram_socket_test dgram_mod_req_rep
build: $(PROGS)
diff --git a/test_socket/dgram_mod_req_rep b/test_socket/dgram_mod_req_rep
new file mode 100755
index 0000000..4034ad8
--- /dev/null
+++ b/test_socket/dgram_mod_req_rep
Binary files differ
diff --git a/test_socket/dgram_mod_req_rep.c b/test_socket/dgram_mod_req_rep.c
new file mode 100644
index 0000000..a857ce6
--- /dev/null
+++ b/test_socket/dgram_mod_req_rep.c
@@ -0,0 +1,58 @@
+#include "dgram_mod_socket.h"
+#include "shm_mm.h"
+#include "usg_common.h"
+
+void server(int port) {
+ void *socket = dgram_mod_open_socket(REQ_REP);
+ dgram_mod_bind(socket, port);
+ int size;
+ void *recvbuf;
+ char sendbuf[512];
+ int rv;
+ int remote_port;
+ while ( (rv = dgram_mod_recvfrom(socket, &recvbuf, &size, &remote_port) ) == 0) {
+ sprintf(sendbuf, "SERVER RECEIVED: %s", recvbuf);
+ puts(sendbuf);
+ dgram_mod_sendto(socket, sendbuf, strlen(sendbuf) + 1, remote_port);
+ free(recvbuf);
+ }
+ dgram_mod_close_socket(socket);
+}
+
+void client(int port) {
+ void *socket = dgram_mod_open_socket(REQ_REP);
+ int size;
+ void *recvbuf;
+ char sendbuf[512];
+ while (true) {
+ printf("request: ");
+ scanf("%s", sendbuf);
+ dgram_mod_sendandrecv(socket, sendbuf, strlen(sendbuf) + 1, port, &recvbuf, &size);
+ printf("reply: %s\n", (char *)recvbuf);
+ free(recvbuf);
+ }
+ dgram_mod_close_socket(socket);
+}
+
+
+
+int main(int argc, char *argv[]) {
+ shm_init(512);
+ int port;
+ if (argc < 3) {
+ fprintf(stderr, "Usage: reqrep %s|%s <PORT> ...\n", "server", "client");
+ return 1;
+ }
+
+ port = atoi(argv[2]);
+
+ if (strcmp("server", argv[1]) == 0) {
+ server(port);
+ }
+
+ if (strcmp("client", argv[1]) == 0)
+ client(port);
+
+
+ return 0;
+}
\ No newline at end of file
diff --git a/test_socket/dgram_socket_test b/test_socket/dgram_socket_test
index d493973..6a18d7c 100755
--- a/test_socket/dgram_socket_test
+++ b/test_socket/dgram_socket_test
Binary files differ
--
Gitblit v1.8.0