From afdec3e47d918c56a7c97df8975cebdf1c33d831 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期三, 22 七月 2020 14:41:54 +0800 Subject: [PATCH] dgram req_rep --- src/libshm_queue.a | 0 src/socket/shm_socket.c | 4 demo/queue | 0 test_socket/dgram_socket_test | 0 Makefile | 1 src/socket/dgram_mod_socket.c | 139 +++++++++++++++++++++++ build/include/dgram_mod_socket.h | 41 ++++++ demo/pub_sub | 0 src/util/sem_util.c | 10 test_socket/dgram_mod_req_rep.c | 58 +++++++++ build/include/logger_factory.h | 4 test_socket/Makefile | 2 src/queue/include/shm_allocator.h | 4 src/socket/mod_socket.c | 8 - test_socket/dgram_mod_req_rep | 0 src/socket/include/dgram_mod_socket.h | 41 ++++++ demo/req_rep | 0 build/lib/libshm_queue.a | 0 src/logger_factory.h | 4 19 files changed, 292 insertions(+), 24 deletions(-) diff --git a/Makefile b/Makefile index 757d648..e509479 100755 --- a/Makefile +++ b/Makefile @@ -10,6 +10,7 @@ (cd $$i && echo "cleaning $$i" && $(MAKE) clean) || exit 1; \ done rm -rf build + ipcrm -a ipcrm: -ipcrm -a diff --git a/build/include/dgram_mod_socket.h b/build/include/dgram_mod_socket.h new file mode 100644 index 0000000..ab635a3 --- /dev/null +++ b/build/include/dgram_mod_socket.h @@ -0,0 +1,41 @@ +#ifndef __DGRAM_MOD_SOCKET_H__ +#define __DGRAM_MOD_SOCKET_H__ + + +#ifdef __cplusplus +extern "C" { +#endif + + +enum socket_mod_t +{ + PULL_PUSH = 1, + REQ_REP = 2, + PAIR = 3, + PUB_SUB = 4, + SURVEY = 5, + BUS = 6 + +}; + + + + +void *dgram_mod_open_socket(int mod); + +int dgram_mod_close_socket(void * _socket); + +int dgram_mod_bind(void * _socket, int port); + +int dgram_mod_sendto(void *_socket, const void *buf, const int size, const int port); + +int dgram_mod_recvfrom(void *_socket, void **buf, int *size, int *port); + + +int dgram_mod_sendandrecv(void * _socket, const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ; + +#ifdef __cplusplus +} +#endif + +#endif \ No newline at end of file diff --git a/build/include/logger_factory.h b/build/include/logger_factory.h index 384e3e0..a766d14 100644 --- a/build/include/logger_factory.h +++ b/build/include/logger_factory.h @@ -6,8 +6,8 @@ public: static Logger getLogger() { -//ERROR ALL DEBUG - static Logger logger(Logger::DEBUG); +//ERROR ALL DEBUG INFO + static Logger logger(Logger::ERROR); return logger; } }; diff --git a/build/lib/libshm_queue.a b/build/lib/libshm_queue.a index 4e9e1cb..11e442c 100644 --- a/build/lib/libshm_queue.a +++ b/build/lib/libshm_queue.a Binary files differ diff --git a/demo/pub_sub b/demo/pub_sub index 265621f..6ae4f04 100755 --- a/demo/pub_sub +++ b/demo/pub_sub Binary files differ diff --git a/demo/queue b/demo/queue index b1e1056..096c64a 100755 --- a/demo/queue +++ b/demo/queue Binary files differ diff --git a/demo/req_rep b/demo/req_rep index 3d35107..c950193 100755 --- a/demo/req_rep +++ b/demo/req_rep Binary files differ diff --git a/src/libshm_queue.a b/src/libshm_queue.a index 4e9e1cb..11e442c 100644 --- a/src/libshm_queue.a +++ b/src/libshm_queue.a Binary files differ diff --git a/src/logger_factory.h b/src/logger_factory.h index 384e3e0..a766d14 100644 --- a/src/logger_factory.h +++ b/src/logger_factory.h @@ -6,8 +6,8 @@ public: static Logger getLogger() { -//ERROR ALL DEBUG - static Logger logger(Logger::DEBUG); +//ERROR ALL DEBUG INFO + static Logger logger(Logger::ERROR); return logger; } }; diff --git a/src/queue/include/shm_allocator.h b/src/queue/include/shm_allocator.h index 023bc9d..ae94a9c 100644 --- a/src/queue/include/shm_allocator.h +++ b/src/queue/include/shm_allocator.h @@ -66,12 +66,10 @@ class SHM_Allocator { public: static void *allocate (size_t size) { - printf("shm_allocator malloc\n"); return mem_pool_malloc(size); } static void deallocate (void *ptr) { - printf("shm_allocator free\n"); return mem_pool_free(ptr); } }; @@ -80,12 +78,10 @@ class DM_Allocator { public: static void *allocate (size_t size) { - printf("dm_allocator malloc\n"); return malloc(size); } static void deallocate (void *ptr) { - printf("dm_allocator free\n"); return free(ptr); } }; diff --git a/src/socket/dgram_mod_socket.c b/src/socket/dgram_mod_socket.c new file mode 100644 index 0000000..f9857bc --- /dev/null +++ b/src/socket/dgram_mod_socket.c @@ -0,0 +1,139 @@ +#include "usg_common.h" +#include "dgram_mod_socket.h" +#include "shm_socket.h" +#include "shm_allocator.h" +#include "mem_pool.h" +#include "hashtable.h" +#include "sem_util.h" +#include "logger_factory.h" + +typedef struct dgram_mod_socket_t { + socket_mod_t mod; + shm_socket_t *shm_socket; + pthread_t recv_thread; + std::map<int, LockFreeQueue<shm_msg_t, DM_Allocator> * > *recv_queue_map; +} dgram_mod_socket_t; + + +void *dgram_mod_open_socket(int mod) { + dgram_mod_socket_t * socket = (dgram_mod_socket_t *)calloc(1, sizeof(dgram_mod_socket_t)); + socket->mod = (socket_mod_t)mod; + socket->recv_thread = 0; + socket->recv_queue_map = NULL; + socket->shm_socket = shm_open_socket(SHM_SOCKET_DGRAM); + + printf("socket->shm_socket = %p \n" , socket->shm_socket); + return (void *)socket; +} + + +int dgram_mod_close_socket(void * _socket) { + dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; + shm_close_socket(socket->shm_socket); + if(socket->recv_queue_map != NULL) { + for(auto iter = socket->recv_queue_map->begin(); iter != socket->recv_queue_map->end(); iter++) { + delete iter->second; + socket->recv_queue_map->erase(iter); + + } + delete socket->recv_queue_map; + } + + + if(socket->recv_thread != 0) + pthread_cancel(socket->recv_thread); + free(_socket); +} + + +int dgram_mod_bind(void * _socket, int port){ + dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; + return shm_socket_bind(socket->shm_socket, port); +} + +int dgram_mod_sendto(void *_socket, const void *buf, const int size, const int port) { + dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; + + return shm_sendto(socket->shm_socket, buf, size, port); + +} + +int dgram_mod_recvfrom(void *_socket, void **buf, int *size, int *port) { + + dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; + if(socket->mod == REQ_REP && socket->recv_thread != 0) { + err_exit(0, "you have used sendandrecv method os you can not use recvfrom method any more. these two method can not be used at the same time."); + return -1; + } + return shm_recvfrom(socket->shm_socket, buf, size, port); +} + +void *_dgram_mod_run_recv(void * _socket) { + pthread_detach(pthread_self()); + dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; + void *buf; + int size; + int port; + shm_msg_t msg; + LockFreeQueue<shm_msg_t, DM_Allocator> *queue; + std::map<int, LockFreeQueue<shm_msg_t, DM_Allocator> * >::iterator iter; +// printf("==============_dgram_mod_run_recv recv before\n"); + while(shm_recvfrom(socket->shm_socket, &buf, &size, &port) == 0) { + if( (iter = socket->recv_queue_map->find(port) ) != socket->recv_queue_map->end()) { + queue = iter->second; + } else { + queue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16); + socket->recv_queue_map->insert({port, queue}); + } + + msg.buf = buf; + msg.size = size; + msg.port = port; +// printf("==============_dgram_mod_run_recv push before\n"); + queue->push(msg); +// printf("==============_dgram_mod_run_recv push after\n"); + + } + return NULL; + + + +} + +int dgram_mod_sendandrecv(void * _socket, const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) { + + dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; + if(socket->mod != REQ_REP) { + err_exit(0, "you can't use this method other than REQ_REP mod!"); + } + if(socket->recv_queue_map == NULL) { + socket->recv_queue_map = new std::map<int, LockFreeQueue<shm_msg_t, DM_Allocator> * >; + } + + std::map<int, LockFreeQueue<shm_msg_t, DM_Allocator> * >::iterator iter; + LockFreeQueue<shm_msg_t, DM_Allocator> *queue; + if( (iter = socket->recv_queue_map->find(port) ) != socket->recv_queue_map->end()) { + queue = iter->second; + } else { + queue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16); + socket->recv_queue_map->insert({port, queue}); + } + + if (socket->recv_thread == 0) { + + pthread_create(&(socket->recv_thread ), NULL, _dgram_mod_run_recv , _socket); + + } + + shm_sendto(socket->shm_socket, send_buf, send_size, port); + shm_msg_t msg; +// printf("==============dgram_mod_sendandrecv pop before\n"); + if(queue->pop(msg)) { + *recv_buf = msg.buf; + *recv_size = msg.size; +// printf("==============dgram_mod_sendandrecv pop after\n"); + return 0; + } + return -1; + +} \ No newline at end of file diff --git a/src/socket/include/dgram_mod_socket.h b/src/socket/include/dgram_mod_socket.h new file mode 100644 index 0000000..ab635a3 --- /dev/null +++ b/src/socket/include/dgram_mod_socket.h @@ -0,0 +1,41 @@ +#ifndef __DGRAM_MOD_SOCKET_H__ +#define __DGRAM_MOD_SOCKET_H__ + + +#ifdef __cplusplus +extern "C" { +#endif + + +enum socket_mod_t +{ + PULL_PUSH = 1, + REQ_REP = 2, + PAIR = 3, + PUB_SUB = 4, + SURVEY = 5, + BUS = 6 + +}; + + + + +void *dgram_mod_open_socket(int mod); + +int dgram_mod_close_socket(void * _socket); + +int dgram_mod_bind(void * _socket, int port); + +int dgram_mod_sendto(void *_socket, const void *buf, const int size, const int port); + +int dgram_mod_recvfrom(void *_socket, void **buf, int *size, int *port); + + +int dgram_mod_sendandrecv(void * _socket, const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ; + +#ifdef __cplusplus +} +#endif + +#endif \ No newline at end of file diff --git a/src/socket/mod_socket.c b/src/socket/mod_socket.c index fcb5e58..62e8b8f 100644 --- a/src/socket/mod_socket.c +++ b/src/socket/mod_socket.c @@ -133,11 +133,9 @@ if(socket->is_server ) { switch(socket->mod) { case REQ_REP: -logger.debug("mod_send before"); SemUtil::dec(socket->items); rv = shm_send(socket->client_socket, buf, size); SemUtil::inc(socket->slots); -logger.debug("mod_send after"); break; case SURVEY: case PUB_SUB: @@ -152,9 +150,7 @@ } else { -logger.debug("mod_send before"); rv = shm_send(socket->shm_socket, buf, size); -logger.debug("mod_send after"); return rv; } return -1; @@ -168,7 +164,6 @@ if(socket->is_server ) { switch(socket->mod) { case REQ_REP: -logger.debug("REQ_REP mod_recv before"); SemUtil::dec(socket->slots); socket->recvQueue->pop(entry); *buf = entry.buf; @@ -176,7 +171,6 @@ socket->client_socket = entry.client_socket; SemUtil::inc(socket->items); -logger.debug("REQ_REP mod_recv after"); break; case PUB_SUB: break; @@ -191,9 +185,7 @@ return 0; } else { -logger.debug("mod_recv before"); shm_recv(socket->shm_socket, buf, size); -logger.debug("mod_recv after"); return 0; } diff --git a/src/socket/shm_socket.c b/src/socket/shm_socket.c index 260cdc2..3d7ba4e 100644 --- a/src/socket/shm_socket.c +++ b/src/socket/shm_socket.c @@ -287,7 +287,7 @@ } shm_msg_t src; -//logger.debug("shm_recvfrom pop before"); +printf("shm_recvfrom pop before"); if (socket->queue->pop(src)) { void * _buf = malloc(src.size); memcpy(_buf, src.buf, src.size); @@ -295,7 +295,7 @@ *size = src.size; *port = src.port; mm_free(src.buf); -//logger.debug("shm_recvfrom pop after"); +printf("shm_recvfrom pop after"); return 0; } else { return -1; diff --git a/src/util/sem_util.c b/src/util/sem_util.c index e2b2c20..4f294c0 100644 --- a/src/util/sem_util.c +++ b/src/util/sem_util.c @@ -14,12 +14,12 @@ union semun arg; struct sembuf sop; - logger.info("%ld: created semaphore\n", (long)getpid()); + //logger.info("%ld: created semaphore\n", (long)getpid()); arg.val = 0; /* So initialize it to 0 */ if (semctl(semid, 0, SETVAL, arg) == -1) err_exit(errno, "semctl 1"); - logger.info("%ld: initialized semaphore\n", (long)getpid()); + //logger.info("%ld: initialized semaphore\n", (long)getpid()); /* Perform a "no-op" semaphore operation - changes sem_otime so other processes can see we've initialized the set. */ @@ -29,7 +29,7 @@ sop.sem_flg = 0; if (semop(semid, &sop, 1) == -1) err_exit(errno, "semop"); - logger.info("%ld: completed dummy semop()\n", (long)getpid()); + //logger.info("%ld: completed dummy semop()\n", (long)getpid()); } else { /* We didn't create the semaphore set */ @@ -46,12 +46,12 @@ if (semid == -1) err_exit(errno, "semget 2"); - logger.info("%ld: got semaphore key\n", (long)getpid()); + // logger.info("%ld: got semaphore key\n", (long)getpid()); /* Wait until another process has called semop() */ arg.buf = &ds; for (j = 0; j < MAX_TRIES; j++) { - logger.info("Try %d\n", j); + //logger.info("Try %d\n", j); if (semctl(semid, 0, IPC_STAT, arg) == -1) err_exit(errno, "semctl 2"); diff --git a/test_socket/Makefile b/test_socket/Makefile index c2a7f89..214cd1a 100644 --- a/test_socket/Makefile +++ b/test_socket/Makefile @@ -14,7 +14,7 @@ include $(ROOT)/Make.defines.$(PLATFORM) -PROGS = dgram_socket_test +PROGS = dgram_socket_test dgram_mod_req_rep build: $(PROGS) diff --git a/test_socket/dgram_mod_req_rep b/test_socket/dgram_mod_req_rep new file mode 100755 index 0000000..4034ad8 --- /dev/null +++ b/test_socket/dgram_mod_req_rep Binary files differ diff --git a/test_socket/dgram_mod_req_rep.c b/test_socket/dgram_mod_req_rep.c new file mode 100644 index 0000000..a857ce6 --- /dev/null +++ b/test_socket/dgram_mod_req_rep.c @@ -0,0 +1,58 @@ +#include "dgram_mod_socket.h" +#include "shm_mm.h" +#include "usg_common.h" + +void server(int port) { + void *socket = dgram_mod_open_socket(REQ_REP); + dgram_mod_bind(socket, port); + int size; + void *recvbuf; + char sendbuf[512]; + int rv; + int remote_port; + while ( (rv = dgram_mod_recvfrom(socket, &recvbuf, &size, &remote_port) ) == 0) { + sprintf(sendbuf, "SERVER RECEIVED: %s", recvbuf); + puts(sendbuf); + dgram_mod_sendto(socket, sendbuf, strlen(sendbuf) + 1, remote_port); + free(recvbuf); + } + dgram_mod_close_socket(socket); +} + +void client(int port) { + void *socket = dgram_mod_open_socket(REQ_REP); + int size; + void *recvbuf; + char sendbuf[512]; + while (true) { + printf("request: "); + scanf("%s", sendbuf); + dgram_mod_sendandrecv(socket, sendbuf, strlen(sendbuf) + 1, port, &recvbuf, &size); + printf("reply: %s\n", (char *)recvbuf); + free(recvbuf); + } + dgram_mod_close_socket(socket); +} + + + +int main(int argc, char *argv[]) { + shm_init(512); + int port; + if (argc < 3) { + fprintf(stderr, "Usage: reqrep %s|%s <PORT> ...\n", "server", "client"); + return 1; + } + + port = atoi(argv[2]); + + if (strcmp("server", argv[1]) == 0) { + server(port); + } + + if (strcmp("client", argv[1]) == 0) + client(port); + + + return 0; +} \ No newline at end of file diff --git a/test_socket/dgram_socket_test b/test_socket/dgram_socket_test index d493973..6a18d7c 100755 --- a/test_socket/dgram_socket_test +++ b/test_socket/dgram_socket_test Binary files differ -- Gitblit v1.8.0