From 37989e0300c20cd65a4d4a3c49eeaea2122ddf9e Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期一, 20 七月 2020 10:28:02 +0800 Subject: [PATCH] update --- queue/mod_socket.c | 86 ++++++++++++++++++++++++++++++++---------- 1 files changed, 65 insertions(+), 21 deletions(-) diff --git a/queue/mod_socket.c b/queue/mod_socket.c index fe098f5..a63c37d 100644 --- a/queue/mod_socket.c +++ b/queue/mod_socket.c @@ -1,6 +1,9 @@ #include "mod_socket.h" #include "shm_socket.h" #include "usg_common.h" +#include "logger_factory.h" +static Logger logger = LoggerFactory::getLogger(); + typedef struct mod_entry_t { int size; @@ -55,7 +58,7 @@ int mod_socket_bind(void * _socket, int port){ mod_socket_t * socket = (mod_socket_t *) _socket; - return shm_soket_bind(socket->shm_socket, port); + return shm_socket_bind(socket->shm_socket, port); } void * run_server_recv_client_msg(void *_socket) { @@ -71,6 +74,7 @@ socket->recvQueue->push(entry); // shm_free(recvbuf); } + free(_socket); shm_close_socket(client_socket); return NULL; @@ -81,6 +85,7 @@ shm_socket_t *client_socket; pthread_t tid; while(socket->shm_socket->status == SHM_CONN_LISTEN) { + //鎺ュ彈瀹㈡埛绔殑杩炴帴璇锋眰 client_socket = shm_accept(socket->shm_socket); mod_socket_t *arg = (mod_socket_t *)malloc(sizeof(mod_socket_t)); @@ -110,16 +115,37 @@ } -int mod_send(void * _socket, void *buf, int size) { +int mod_send(void * _socket, const void *buf, const int size) { mod_socket_t * socket = (mod_socket_t *) _socket; - if(!socket->is_server ) { - return shm_send(socket->shm_socket, buf, size); + std::map<int, shm_socket_t* > *clientSocketMap = socket->shm_socket->clientSocketMap; + std::map<int, shm_socket_t* >::iterator iter; + int rv; + if(socket->is_server ) { + switch(socket->mod) { + case REQ_REP: +logger.debug("mod_send before"); + SemUtil::dec(socket->items); + rv = shm_send(socket->client_socket, buf, size); + SemUtil::inc(socket->slots); +logger.debug("mod_send after"); + break; + case SURVEY: + case PUB_SUB: + for(iter = clientSocketMap->begin(); iter != clientSocketMap->end(); iter++) { + rv = shm_send(iter->second, buf, size); + } + break; + default: + rv = shm_send(socket->client_socket, buf, size); + } + return rv; + } - else if(socket->mod == REQ_REP) { - SemUtil::dec(socket->items); - shm_send(socket->client_socket, buf, size); - SemUtil::inc(socket->slots); - return 0; + else { +logger.debug("mod_send before"); + rv = shm_send(socket->shm_socket, buf, size); +logger.debug("mod_send after"); + return rv; } return -1; @@ -129,21 +155,39 @@ mod_socket_t * socket = (mod_socket_t *) _socket; mod_entry_t entry; - if(!socket->is_server ) { - return shm_recv(socket->shm_socket, buf, size); - } - else if(socket->mod == REQ_REP) { - SemUtil::dec(socket->slots); - socket->recvQueue->pop(entry); - *buf = entry.buf; - *size = entry.size; - socket->client_socket = entry.client_socket; - SemUtil::inc(socket->items); + if(socket->is_server ) { + switch(socket->mod) { + case REQ_REP: +logger.debug("REQ_REP mod_recv before"); + SemUtil::dec(socket->slots); + socket->recvQueue->pop(entry); + *buf = entry.buf; + *size = entry.size; + socket->client_socket = entry.client_socket; + SemUtil::inc(socket->items); + +logger.debug("REQ_REP mod_recv after"); + break; + case PUB_SUB: + break; + case SURVEY: + default: + socket->recvQueue->pop(entry); + *buf = entry.buf; + *size = entry.size; + + } + return 0; } + else { +logger.debug("mod_recv before"); + shm_recv(socket->shm_socket, buf, size); +logger.debug("mod_recv after"); + return 0; + } + return -1; - - } -- Gitblit v1.8.0