From 37989e0300c20cd65a4d4a3c49eeaea2122ddf9e Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期一, 20 七月 2020 10:28:02 +0800 Subject: [PATCH] update --- queue/mod_socket.c | 195 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 195 insertions(+), 0 deletions(-) diff --git a/queue/mod_socket.c b/queue/mod_socket.c index f73dcba..a63c37d 100644 --- a/queue/mod_socket.c +++ b/queue/mod_socket.c @@ -1,2 +1,197 @@ #include "mod_socket.h" +#include "shm_socket.h" +#include "usg_common.h" +#include "logger_factory.h" +static Logger logger = LoggerFactory::getLogger(); + +typedef struct mod_entry_t +{ + int size; + void *buf; + shm_socket_t *client_socket; +}mod_entry_t; + +typedef struct mod_socket_t { + socket_mod_t mod; + shm_socket_t *shm_socket; + shm_socket_t *client_socket; + int is_server; + LockFreeQueue<mod_entry_t, DM_Allocator> *recvQueue; + int slots; + int items; + + +} mod_socket_t; + +/** + * + */ +void *mod_open_socket(int mod) { + mod_socket_t *socket = (mod_socket_t *)malloc(sizeof(mod_socket_t)); + socket->shm_socket=shm_open_socket(); + socket->is_server = 0; + socket->mod = (socket_mod_t)mod; + socket->recvQueue = new LockFreeQueue<mod_entry_t, DM_Allocator>(16); + if (mod == REQ_REP) { + socket->slots = SemUtil::get(IPC_PRIVATE, 1); + socket->items = SemUtil::get(IPC_PRIVATE, 0); + } + + return (void *)socket; +} + + + +int mod_close_socket(void * _socket){ + mod_socket_t * socket = (mod_socket_t *) _socket; + + if (socket->mod == REQ_REP) { + SemUtil::remove(socket->slots); + SemUtil::remove(socket->items); + } + + int rv = shm_close_socket(socket->shm_socket); + free(_socket); + return rv; +} + + +int mod_socket_bind(void * _socket, int port){ + mod_socket_t * socket = (mod_socket_t *) _socket; + return shm_socket_bind(socket->shm_socket, port); +} + +void * run_server_recv_client_msg(void *_socket) { + pthread_detach(pthread_self()); + mod_socket_t * socket = (mod_socket_t *) _socket; + shm_socket_t * client_socket = socket->client_socket; + + mod_entry_t entry; + entry.client_socket = client_socket; + while (socket->shm_socket->status == SHM_CONN_LISTEN && + client_socket->status == SHM_CONN_ESTABLISHED && shm_recv(client_socket, &entry.buf, &entry.size) == 0 ) { + + socket->recvQueue->push(entry); + // shm_free(recvbuf); + } + + free(_socket); + shm_close_socket(client_socket); + return NULL; +} + +void *run_accept_connection(void * _socket) { + mod_socket_t * socket = (mod_socket_t *) _socket; + shm_socket_t *client_socket; + pthread_t tid; + while(socket->shm_socket->status == SHM_CONN_LISTEN) { + //鎺ュ彈瀹㈡埛绔殑杩炴帴璇锋眰 + client_socket = shm_accept(socket->shm_socket); + + mod_socket_t *arg = (mod_socket_t *)malloc(sizeof(mod_socket_t)); + memcpy(arg, _socket, sizeof(mod_socket_t)); + arg->client_socket = client_socket; + pthread_create(&tid, NULL, run_server_recv_client_msg , (void *)arg); + } + return NULL; +} + +int mod_listen(void * _socket) { + mod_socket_t * socket = (mod_socket_t *) _socket; + pthread_t tid; + socket->is_server = 1; + int rv = shm_listen(socket->shm_socket); + if(rv == 0) { + pthread_create(&tid, NULL, run_accept_connection, _socket); + return 0; + } + return -1; +} + + +int mod_connect(void * _socket, int port) { + mod_socket_t * socket = (mod_socket_t *) _socket; + return shm_connect(socket->shm_socket, port); + +} + +int mod_send(void * _socket, const void *buf, const int size) { + mod_socket_t * socket = (mod_socket_t *) _socket; + std::map<int, shm_socket_t* > *clientSocketMap = socket->shm_socket->clientSocketMap; + std::map<int, shm_socket_t* >::iterator iter; + int rv; + if(socket->is_server ) { + switch(socket->mod) { + case REQ_REP: +logger.debug("mod_send before"); + SemUtil::dec(socket->items); + rv = shm_send(socket->client_socket, buf, size); + SemUtil::inc(socket->slots); +logger.debug("mod_send after"); + break; + case SURVEY: + case PUB_SUB: + for(iter = clientSocketMap->begin(); iter != clientSocketMap->end(); iter++) { + rv = shm_send(iter->second, buf, size); + } + break; + default: + rv = shm_send(socket->client_socket, buf, size); + } + return rv; + + } + else { +logger.debug("mod_send before"); + rv = shm_send(socket->shm_socket, buf, size); +logger.debug("mod_send after"); + return rv; + } + return -1; + +} + +int mod_recv(void * _socket, void **buf, int *size) { + mod_socket_t * socket = (mod_socket_t *) _socket; + mod_entry_t entry; + + if(socket->is_server ) { + switch(socket->mod) { + case REQ_REP: +logger.debug("REQ_REP mod_recv before"); + SemUtil::dec(socket->slots); + socket->recvQueue->pop(entry); + *buf = entry.buf; + *size = entry.size; + socket->client_socket = entry.client_socket; + SemUtil::inc(socket->items); + +logger.debug("REQ_REP mod_recv after"); + break; + case PUB_SUB: + break; + case SURVEY: + default: + socket->recvQueue->pop(entry); + *buf = entry.buf; + *size = entry.size; + + } + + return 0; + } + else { +logger.debug("mod_recv before"); + shm_recv(socket->shm_socket, buf, size); +logger.debug("mod_recv after"); + return 0; + } + + return -1; +} + + +void mod_free(void *buf) { + free(buf); +} -- Gitblit v1.8.0