From f85c9b875b060681b51f57b15074ba1c7c9f5636 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期一, 20 七月 2020 11:10:02 +0800 Subject: [PATCH] update --- queue/mod_socket.c | 91 +++++++++++++++++++++++++++++++++++---------- 1 files changed, 70 insertions(+), 21 deletions(-) diff --git a/queue/mod_socket.c b/queue/mod_socket.c index fe098f5..cc358f6 100644 --- a/queue/mod_socket.c +++ b/queue/mod_socket.c @@ -1,6 +1,9 @@ #include "mod_socket.h" #include "shm_socket.h" #include "usg_common.h" +#include "logger_factory.h" +static Logger logger = LoggerFactory::getLogger(); + typedef struct mod_entry_t { int size; @@ -52,10 +55,15 @@ return rv; } +int mod_get_socket_port(void * _socket) { + mod_socket_t * socket = (mod_socket_t *) _socket; + return socket->shm_socket->port; +} + int mod_socket_bind(void * _socket, int port){ mod_socket_t * socket = (mod_socket_t *) _socket; - return shm_soket_bind(socket->shm_socket, port); + return shm_socket_bind(socket->shm_socket, port); } void * run_server_recv_client_msg(void *_socket) { @@ -71,6 +79,7 @@ socket->recvQueue->push(entry); // shm_free(recvbuf); } + free(_socket); shm_close_socket(client_socket); return NULL; @@ -81,6 +90,7 @@ shm_socket_t *client_socket; pthread_t tid; while(socket->shm_socket->status == SHM_CONN_LISTEN) { + //鎺ュ彈瀹㈡埛绔殑杩炴帴璇锋眰 client_socket = shm_accept(socket->shm_socket); mod_socket_t *arg = (mod_socket_t *)malloc(sizeof(mod_socket_t)); @@ -110,16 +120,37 @@ } -int mod_send(void * _socket, void *buf, int size) { +int mod_send(void * _socket, const void *buf, const int size) { mod_socket_t * socket = (mod_socket_t *) _socket; - if(!socket->is_server ) { - return shm_send(socket->shm_socket, buf, size); + std::map<int, shm_socket_t* > *clientSocketMap = socket->shm_socket->clientSocketMap; + std::map<int, shm_socket_t* >::iterator iter; + int rv; + if(socket->is_server ) { + switch(socket->mod) { + case REQ_REP: +logger.debug("mod_send before"); + SemUtil::dec(socket->items); + rv = shm_send(socket->client_socket, buf, size); + SemUtil::inc(socket->slots); +logger.debug("mod_send after"); + break; + case SURVEY: + case PUB_SUB: + for(iter = clientSocketMap->begin(); iter != clientSocketMap->end(); iter++) { + rv = shm_send(iter->second, buf, size); + } + break; + default: + rv = shm_send(socket->client_socket, buf, size); + } + return rv; + } - else if(socket->mod == REQ_REP) { - SemUtil::dec(socket->items); - shm_send(socket->client_socket, buf, size); - SemUtil::inc(socket->slots); - return 0; + else { +logger.debug("mod_send before"); + rv = shm_send(socket->shm_socket, buf, size); +logger.debug("mod_send after"); + return rv; } return -1; @@ -129,21 +160,39 @@ mod_socket_t * socket = (mod_socket_t *) _socket; mod_entry_t entry; - if(!socket->is_server ) { - return shm_recv(socket->shm_socket, buf, size); - } - else if(socket->mod == REQ_REP) { - SemUtil::dec(socket->slots); - socket->recvQueue->pop(entry); - *buf = entry.buf; - *size = entry.size; - socket->client_socket = entry.client_socket; - SemUtil::inc(socket->items); + if(socket->is_server ) { + switch(socket->mod) { + case REQ_REP: +logger.debug("REQ_REP mod_recv before"); + SemUtil::dec(socket->slots); + socket->recvQueue->pop(entry); + *buf = entry.buf; + *size = entry.size; + socket->client_socket = entry.client_socket; + SemUtil::inc(socket->items); + +logger.debug("REQ_REP mod_recv after"); + break; + case PUB_SUB: + break; + case SURVEY: + default: + socket->recvQueue->pop(entry); + *buf = entry.buf; + *size = entry.size; + + } + return 0; } + else { +logger.debug("mod_recv before"); + shm_recv(socket->shm_socket, buf, size); +logger.debug("mod_recv after"); + return 0; + } + return -1; - - } -- Gitblit v1.8.0