From 5e3e6719f7d7922decdc16d2313baf2e94210750 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期五, 17 七月 2020 18:29:11 +0800
Subject: [PATCH] pub_sub finished

---
 queue/mod_socket.c |   63 ++++++++++++++++++++++---------
 1 files changed, 44 insertions(+), 19 deletions(-)

diff --git a/queue/mod_socket.c b/queue/mod_socket.c
index fe098f5..80b5525 100644
--- a/queue/mod_socket.c
+++ b/queue/mod_socket.c
@@ -112,14 +112,29 @@
 
 int mod_send(void * _socket, void *buf, int size) {
 	mod_socket_t * socket = (mod_socket_t *) _socket;
-	if(!socket->is_server ) {
-		return shm_send(socket->shm_socket, buf, size);
+	std::map<int, shm_socket_t* > *clientSocketMap = socket->shm_socket->clientSocketMap;
+	std::map<int, shm_socket_t* >::iterator iter;
+	int rv;
+	if(socket->is_server ) {
+		switch(socket->mod) {
+			case REQ_REP:
+				SemUtil::dec(socket->items);
+				rv = shm_send(socket->client_socket, buf, size);
+				SemUtil::inc(socket->slots);
+				break;
+			case PUB_SUB:
+				for(iter = clientSocketMap->begin(); iter != clientSocketMap->end(); iter++) {
+					rv = shm_send(iter->second, buf, size);
+				}
+				break;
+			default:
+				err_exit(0, "涓嶆敮鎸佺殑妯″紡%d", socket->mod);
+		}
+		return rv;
+		
 	}
-	else if(socket->mod == REQ_REP) {
-		SemUtil::dec(socket->items);
-		shm_send(socket->client_socket, buf, size);
-		SemUtil::inc(socket->slots);
-		return 0;
+	else {
+		return shm_send(socket->shm_socket, buf, size);
 	}
 	return -1;
 	
@@ -128,22 +143,32 @@
 int mod_recv(void * _socket, void **buf, int *size) {
 	mod_socket_t * socket = (mod_socket_t *) _socket;
 	mod_entry_t entry;
+	int rv;
 
-	if(!socket->is_server ) {
+	if(socket->is_server ) {
+		switch(socket->mod) {
+			case REQ_REP:
+				SemUtil::dec(socket->slots);
+				rv = socket->recvQueue->pop(entry);
+				*buf = entry.buf;
+				*size = entry.size;
+				socket->client_socket = entry.client_socket;
+				SemUtil::inc(socket->items);
+				break;
+			case PUB_SUB:
+				rv = 0;
+				break;
+			default:
+				err_exit(0, "涓嶆敮鎸佺殑妯″紡%d", socket->mod);
+		}
+
+		return rv;
+	}
+	else {
 		return shm_recv(socket->shm_socket, buf, size);
 	}
-	else if(socket->mod == REQ_REP) {
-		SemUtil::dec(socket->slots);
-		socket->recvQueue->pop(entry);
-		*buf = entry.buf;
-		*size = entry.size;
-		socket->client_socket = entry.client_socket;
-		SemUtil::inc(socket->items);
-		return 0;
-	}
+
 	return -1;
-
-
 }
 
 

--
Gitblit v1.8.0