From f85c9b875b060681b51f57b15074ba1c7c9f5636 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期一, 20 七月 2020 11:10:02 +0800
Subject: [PATCH] update

---
 queue/mod_socket.c |   91 +++++++++++++++++++++++++++++++++++----------
 1 files changed, 70 insertions(+), 21 deletions(-)

diff --git a/queue/mod_socket.c b/queue/mod_socket.c
index fe098f5..cc358f6 100644
--- a/queue/mod_socket.c
+++ b/queue/mod_socket.c
@@ -1,6 +1,9 @@
 #include "mod_socket.h"
 #include "shm_socket.h"
 #include "usg_common.h"
+#include "logger_factory.h"
+static Logger logger = LoggerFactory::getLogger();
+
 typedef struct mod_entry_t
 {
 	int size;
@@ -52,10 +55,15 @@
 	return rv;
 }
 
+int mod_get_socket_port(void * _socket) {
+	mod_socket_t * socket = (mod_socket_t *) _socket;
+	return socket->shm_socket->port;
+}
+
 
 int mod_socket_bind(void * _socket, int port){
 	mod_socket_t * socket = (mod_socket_t *) _socket;
-	return  shm_soket_bind(socket->shm_socket, port);
+	return  shm_socket_bind(socket->shm_socket, port);
 }
 
 void * run_server_recv_client_msg(void *_socket) {
@@ -71,6 +79,7 @@
 		socket->recvQueue->push(entry);
 		// shm_free(recvbuf);
 	}
+
 	free(_socket);
 	shm_close_socket(client_socket);
 	return NULL;
@@ -81,6 +90,7 @@
 	shm_socket_t *client_socket;
 	pthread_t tid;
 	while(socket->shm_socket->status == SHM_CONN_LISTEN) {
+		//鎺ュ彈瀹㈡埛绔殑杩炴帴璇锋眰
 		client_socket = shm_accept(socket->shm_socket);
 		
 		mod_socket_t *arg = (mod_socket_t *)malloc(sizeof(mod_socket_t));
@@ -110,16 +120,37 @@
 
 }
 
-int mod_send(void * _socket, void *buf, int size) {
+int mod_send(void * _socket, const void *buf, const int size) {
 	mod_socket_t * socket = (mod_socket_t *) _socket;
-	if(!socket->is_server ) {
-		return shm_send(socket->shm_socket, buf, size);
+	std::map<int, shm_socket_t* > *clientSocketMap = socket->shm_socket->clientSocketMap;
+	std::map<int, shm_socket_t* >::iterator iter;
+	int rv;
+	if(socket->is_server ) {
+		switch(socket->mod) {
+			case REQ_REP:
+logger.debug("mod_send before");
+				SemUtil::dec(socket->items);
+				rv = shm_send(socket->client_socket, buf, size);
+				SemUtil::inc(socket->slots);
+logger.debug("mod_send after");
+				break;
+			case SURVEY:
+			case PUB_SUB:
+				for(iter = clientSocketMap->begin(); iter != clientSocketMap->end(); iter++) {
+					rv = shm_send(iter->second, buf, size);
+				}
+				break;
+			default:
+				rv = shm_send(socket->client_socket, buf, size);
+		}
+		return rv;
+		
 	}
-	else if(socket->mod == REQ_REP) {
-		SemUtil::dec(socket->items);
-		shm_send(socket->client_socket, buf, size);
-		SemUtil::inc(socket->slots);
-		return 0;
+	else {
+logger.debug("mod_send before");
+		rv = shm_send(socket->shm_socket, buf, size);
+logger.debug("mod_send after");
+		return rv;
 	}
 	return -1;
 	
@@ -129,21 +160,39 @@
 	mod_socket_t * socket = (mod_socket_t *) _socket;
 	mod_entry_t entry;
 
-	if(!socket->is_server ) {
-		return shm_recv(socket->shm_socket, buf, size);
-	}
-	else if(socket->mod == REQ_REP) {
-		SemUtil::dec(socket->slots);
-		socket->recvQueue->pop(entry);
-		*buf = entry.buf;
-		*size = entry.size;
-		socket->client_socket = entry.client_socket;
-		SemUtil::inc(socket->items);
+	if(socket->is_server ) {
+		switch(socket->mod) {
+			case REQ_REP:
+logger.debug("REQ_REP mod_recv before");
+				SemUtil::dec(socket->slots);
+				socket->recvQueue->pop(entry);
+				*buf = entry.buf;
+				*size = entry.size;
+				socket->client_socket = entry.client_socket;
+				SemUtil::inc(socket->items);
+				
+logger.debug("REQ_REP mod_recv after");
+				break;
+			case PUB_SUB:
+				break;
+			case SURVEY:
+			default:
+				socket->recvQueue->pop(entry);
+				*buf = entry.buf;
+				*size = entry.size;
+
+		}
+		 
 		return 0;
 	}
+	else {
+logger.debug("mod_recv before");
+		shm_recv(socket->shm_socket, buf, size);
+logger.debug("mod_recv after");
+		return 0;
+	}
+
 	return -1;
-
-
 }
 
 

--
Gitblit v1.8.0