From 5e3e6719f7d7922decdc16d2313baf2e94210750 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期五, 17 七月 2020 18:29:11 +0800 Subject: [PATCH] pub_sub finished --- queue/libshm_queue.a | 0 queue/shm_socket.c | 4 ++ test2/pub_sub.c | 40 +++++++++----------- queue/mod_socket.c | 63 ++++++++++++++++++++++--------- test2/Makefile | 2 test2/pub_sub | 0 test2/req_rep | 0 7 files changed, 67 insertions(+), 42 deletions(-) diff --git a/queue/libshm_queue.a b/queue/libshm_queue.a index ad2d091..f082fa6 100644 --- a/queue/libshm_queue.a +++ b/queue/libshm_queue.a Binary files differ diff --git a/queue/mod_socket.c b/queue/mod_socket.c index fe098f5..80b5525 100644 --- a/queue/mod_socket.c +++ b/queue/mod_socket.c @@ -112,14 +112,29 @@ int mod_send(void * _socket, void *buf, int size) { mod_socket_t * socket = (mod_socket_t *) _socket; - if(!socket->is_server ) { - return shm_send(socket->shm_socket, buf, size); + std::map<int, shm_socket_t* > *clientSocketMap = socket->shm_socket->clientSocketMap; + std::map<int, shm_socket_t* >::iterator iter; + int rv; + if(socket->is_server ) { + switch(socket->mod) { + case REQ_REP: + SemUtil::dec(socket->items); + rv = shm_send(socket->client_socket, buf, size); + SemUtil::inc(socket->slots); + break; + case PUB_SUB: + for(iter = clientSocketMap->begin(); iter != clientSocketMap->end(); iter++) { + rv = shm_send(iter->second, buf, size); + } + break; + default: + err_exit(0, "涓嶆敮鎸佺殑妯″紡%d", socket->mod); + } + return rv; + } - else if(socket->mod == REQ_REP) { - SemUtil::dec(socket->items); - shm_send(socket->client_socket, buf, size); - SemUtil::inc(socket->slots); - return 0; + else { + return shm_send(socket->shm_socket, buf, size); } return -1; @@ -128,22 +143,32 @@ int mod_recv(void * _socket, void **buf, int *size) { mod_socket_t * socket = (mod_socket_t *) _socket; mod_entry_t entry; + int rv; - if(!socket->is_server ) { + if(socket->is_server ) { + switch(socket->mod) { + case REQ_REP: + SemUtil::dec(socket->slots); + rv = socket->recvQueue->pop(entry); + *buf = entry.buf; + *size = entry.size; + socket->client_socket = entry.client_socket; + SemUtil::inc(socket->items); + break; + case PUB_SUB: + rv = 0; + break; + default: + err_exit(0, "涓嶆敮鎸佺殑妯″紡%d", socket->mod); + } + + return rv; + } + else { return shm_recv(socket->shm_socket, buf, size); } - else if(socket->mod == REQ_REP) { - SemUtil::dec(socket->slots); - socket->recvQueue->pop(entry); - *buf = entry.buf; - *size = entry.size; - socket->client_socket = entry.client_socket; - SemUtil::inc(socket->items); - return 0; - } + return -1; - - } diff --git a/queue/shm_socket.c b/queue/shm_socket.c index e0bdac6..27c5b0a 100644 --- a/queue/shm_socket.c +++ b/queue/shm_socket.c @@ -308,6 +308,10 @@ int shm_send(shm_socket_t *socket, void *buf, int size) { // hashtable_t *hashtable = mm_get_hashtable(); + if(socket->remoteQueue == NULL) { + err_msg(errno, "褰撳墠瀹㈡埛绔棤杩炴帴!"); + return -1; + } shm_msg_t dest; dest.type=SHM_COMMON_MSG; dest.port = socket->port; diff --git a/test2/Makefile b/test2/Makefile index e8c4421..5828f23 100644 --- a/test2/Makefile +++ b/test2/Makefile @@ -14,7 +14,7 @@ include $(ROOT)/Make.defines.$(PLATFORM) -PROGS = req_rep +PROGS = req_rep pub_sub build: $(PROGS) diff --git a/test2/pub_sub b/test2/pub_sub new file mode 100755 index 0000000..19773d9 --- /dev/null +++ b/test2/pub_sub Binary files differ diff --git a/test2/pub_sub.c b/test2/pub_sub.c index 669783f..cf2b507 100644 --- a/test2/pub_sub.c +++ b/test2/pub_sub.c @@ -1,41 +1,37 @@ -#include "socket.h" +#include "mod_socket.h" +#include "shm_mm.h" +#include "usg_common.h" void server(int port) { - void *socket = shm_open_socket(PUB_SUB); - shm_bind(socket, port); - shm_listen(socket); + void *socket = mod_open_socket(PUB_SUB); + mod_socket_bind(socket, port); + mod_listen(socket); int size; void *recvbuf; char sendbuf[512]; while(true) { - shm_recv(socket, &recvbuf, &size); - sprintf(sendbuf, "pub: %s", recvbuf); - puts(sendbuf); - shm_send(socket, sendbuf, strlen(sendbuf)+1) ; - shm_free(recvbuf); + printf("璇疯緭鍏ュ彂甯冩秷鎭�:"); + scanf("%s", sendbuf); + mod_send(socket, sendbuf, strlen(sendbuf)+1) ; + free(recvbuf); } - shm_close_socket(socket); + mod_close_socket(socket); } void client(int port) { - void *socket = shm_open_socket(PUB_SUB); - shm_connect(socket, port); + void *socket = mod_open_socket(PUB_SUB); + mod_connect(socket, port); int size; void *recvbuf; - char sendbuf[512]; - - sprintf(sendbuf, "sub"); - shm_send(socket, sendbuf, strlen(sendbuf)+1) ; - while(true) { - - shm_recv(socket, &recvbuf, &size); - printf("received sub message: %s\n", (char *)recvbuf); - shm_free(recvbuf); + + while(mod_recv(socket, &recvbuf, &size) == 0) { + printf("鏀跺埌璁㈤槄娑堟伅: %s\n", (char *)recvbuf); + free(recvbuf); } - shm_close_socket(socket); + mod_close_socket(socket); } int main(int argc, char *argv[]) { diff --git a/test2/req_rep b/test2/req_rep index bd9df43..e3e7f24 100755 --- a/test2/req_rep +++ b/test2/req_rep Binary files differ -- Gitblit v1.8.0