From 5e3e6719f7d7922decdc16d2313baf2e94210750 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期五, 17 七月 2020 18:29:11 +0800
Subject: [PATCH] pub_sub finished

---
 queue/libshm_queue.a |    0 
 queue/shm_socket.c   |    4 ++
 test2/pub_sub.c      |   40 +++++++++-----------
 queue/mod_socket.c   |   63 ++++++++++++++++++++++---------
 test2/Makefile       |    2 
 test2/pub_sub        |    0 
 test2/req_rep        |    0 
 7 files changed, 67 insertions(+), 42 deletions(-)

diff --git a/queue/libshm_queue.a b/queue/libshm_queue.a
index ad2d091..f082fa6 100644
--- a/queue/libshm_queue.a
+++ b/queue/libshm_queue.a
Binary files differ
diff --git a/queue/mod_socket.c b/queue/mod_socket.c
index fe098f5..80b5525 100644
--- a/queue/mod_socket.c
+++ b/queue/mod_socket.c
@@ -112,14 +112,29 @@
 
 int mod_send(void * _socket, void *buf, int size) {
 	mod_socket_t * socket = (mod_socket_t *) _socket;
-	if(!socket->is_server ) {
-		return shm_send(socket->shm_socket, buf, size);
+	std::map<int, shm_socket_t* > *clientSocketMap = socket->shm_socket->clientSocketMap;
+	std::map<int, shm_socket_t* >::iterator iter;
+	int rv;
+	if(socket->is_server ) {
+		switch(socket->mod) {
+			case REQ_REP:
+				SemUtil::dec(socket->items);
+				rv = shm_send(socket->client_socket, buf, size);
+				SemUtil::inc(socket->slots);
+				break;
+			case PUB_SUB:
+				for(iter = clientSocketMap->begin(); iter != clientSocketMap->end(); iter++) {
+					rv = shm_send(iter->second, buf, size);
+				}
+				break;
+			default:
+				err_exit(0, "涓嶆敮鎸佺殑妯″紡%d", socket->mod);
+		}
+		return rv;
+		
 	}
-	else if(socket->mod == REQ_REP) {
-		SemUtil::dec(socket->items);
-		shm_send(socket->client_socket, buf, size);
-		SemUtil::inc(socket->slots);
-		return 0;
+	else {
+		return shm_send(socket->shm_socket, buf, size);
 	}
 	return -1;
 	
@@ -128,22 +143,32 @@
 int mod_recv(void * _socket, void **buf, int *size) {
 	mod_socket_t * socket = (mod_socket_t *) _socket;
 	mod_entry_t entry;
+	int rv;
 
-	if(!socket->is_server ) {
+	if(socket->is_server ) {
+		switch(socket->mod) {
+			case REQ_REP:
+				SemUtil::dec(socket->slots);
+				rv = socket->recvQueue->pop(entry);
+				*buf = entry.buf;
+				*size = entry.size;
+				socket->client_socket = entry.client_socket;
+				SemUtil::inc(socket->items);
+				break;
+			case PUB_SUB:
+				rv = 0;
+				break;
+			default:
+				err_exit(0, "涓嶆敮鎸佺殑妯″紡%d", socket->mod);
+		}
+
+		return rv;
+	}
+	else {
 		return shm_recv(socket->shm_socket, buf, size);
 	}
-	else if(socket->mod == REQ_REP) {
-		SemUtil::dec(socket->slots);
-		socket->recvQueue->pop(entry);
-		*buf = entry.buf;
-		*size = entry.size;
-		socket->client_socket = entry.client_socket;
-		SemUtil::inc(socket->items);
-		return 0;
-	}
+
 	return -1;
-
-
 }
 
 
diff --git a/queue/shm_socket.c b/queue/shm_socket.c
index e0bdac6..27c5b0a 100644
--- a/queue/shm_socket.c
+++ b/queue/shm_socket.c
@@ -308,6 +308,10 @@
 
 int shm_send(shm_socket_t *socket, void *buf, int size) {
 	// hashtable_t *hashtable = mm_get_hashtable();
+	if(socket->remoteQueue == NULL) {
+		err_msg(errno, "褰撳墠瀹㈡埛绔棤杩炴帴!");
+		return -1;
+	}
 	shm_msg_t dest;
 	dest.type=SHM_COMMON_MSG;
 	dest.port = socket->port;
diff --git a/test2/Makefile b/test2/Makefile
index e8c4421..5828f23 100644
--- a/test2/Makefile
+++ b/test2/Makefile
@@ -14,7 +14,7 @@
 include $(ROOT)/Make.defines.$(PLATFORM)
 
 
-PROGS =	req_rep
+PROGS =	req_rep pub_sub
 
 
 build: $(PROGS)
diff --git a/test2/pub_sub b/test2/pub_sub
new file mode 100755
index 0000000..19773d9
--- /dev/null
+++ b/test2/pub_sub
Binary files differ
diff --git a/test2/pub_sub.c b/test2/pub_sub.c
index 669783f..cf2b507 100644
--- a/test2/pub_sub.c
+++ b/test2/pub_sub.c
@@ -1,41 +1,37 @@
-#include "socket.h"
+#include "mod_socket.h"
+#include "shm_mm.h"
+#include "usg_common.h"
 
 
 void server(int port) {
-	void *socket = shm_open_socket(PUB_SUB);
-	shm_bind(socket, port);
-	shm_listen(socket);
+	void *socket = mod_open_socket(PUB_SUB);
+	mod_socket_bind(socket, port);
+	mod_listen(socket);
 	int size;
 	void *recvbuf;
 	char sendbuf[512];
 	while(true) {
-		shm_recv(socket, &recvbuf, &size);
-		sprintf(sendbuf, "pub: %s", recvbuf);
-		puts(sendbuf);
-		shm_send(socket, sendbuf, strlen(sendbuf)+1) ;
-		shm_free(recvbuf);
+		printf("璇疯緭鍏ュ彂甯冩秷鎭�:");
+		scanf("%s", sendbuf);
+		mod_send(socket, sendbuf, strlen(sendbuf)+1) ;
+		free(recvbuf);
 
 	}
-	shm_close_socket(socket);
+	mod_close_socket(socket);
 }
 
 void client(int port) {
-	void *socket = shm_open_socket(PUB_SUB);
-	shm_connect(socket, port);
+	void *socket = mod_open_socket(PUB_SUB);
+	mod_connect(socket, port);
 	int size;
 	void *recvbuf;
-	char sendbuf[512];
-
-	sprintf(sendbuf, "sub");
-	shm_send(socket, sendbuf, strlen(sendbuf)+1) ;
-	while(true) {
-		
-		shm_recv(socket, &recvbuf, &size);
-		printf("received sub message: %s\n", (char *)recvbuf);
-		shm_free(recvbuf);
+ 
+	while(mod_recv(socket, &recvbuf, &size) == 0) {
+		printf("鏀跺埌璁㈤槄娑堟伅: %s\n", (char *)recvbuf);
+		free(recvbuf);
 
 	}
-	shm_close_socket(socket);
+	mod_close_socket(socket);
 }
 
 int main(int argc, char *argv[]) {
diff --git a/test2/req_rep b/test2/req_rep
index bd9df43..e3e7f24 100755
--- a/test2/req_rep
+++ b/test2/req_rep
Binary files differ

--
Gitblit v1.8.0