From 7032fedd41386f8a0b779d234620b473d978f889 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期五, 17 七月 2020 17:43:18 +0800 Subject: [PATCH] req_rep finished --- queue/mod_socket.c | 151 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 151 insertions(+), 0 deletions(-) diff --git a/queue/mod_socket.c b/queue/mod_socket.c index f73dcba..fe098f5 100644 --- a/queue/mod_socket.c +++ b/queue/mod_socket.c @@ -1,2 +1,153 @@ #include "mod_socket.h" +#include "shm_socket.h" +#include "usg_common.h" +typedef struct mod_entry_t +{ + int size; + void *buf; + shm_socket_t *client_socket; +}mod_entry_t; + +typedef struct mod_socket_t { + socket_mod_t mod; + shm_socket_t *shm_socket; + shm_socket_t *client_socket; + int is_server; + LockFreeQueue<mod_entry_t, DM_Allocator> *recvQueue; + int slots; + int items; + + +} mod_socket_t; + +/** + * + */ +void *mod_open_socket(int mod) { + mod_socket_t *socket = (mod_socket_t *)malloc(sizeof(mod_socket_t)); + socket->shm_socket=shm_open_socket(); + socket->is_server = 0; + socket->mod = (socket_mod_t)mod; + socket->recvQueue = new LockFreeQueue<mod_entry_t, DM_Allocator>(16); + if (mod == REQ_REP) { + socket->slots = SemUtil::get(IPC_PRIVATE, 1); + socket->items = SemUtil::get(IPC_PRIVATE, 0); + } + + return (void *)socket; +} + + + +int mod_close_socket(void * _socket){ + mod_socket_t * socket = (mod_socket_t *) _socket; + + if (socket->mod == REQ_REP) { + SemUtil::remove(socket->slots); + SemUtil::remove(socket->items); + } + + int rv = shm_close_socket(socket->shm_socket); + free(_socket); + return rv; +} + + +int mod_socket_bind(void * _socket, int port){ + mod_socket_t * socket = (mod_socket_t *) _socket; + return shm_soket_bind(socket->shm_socket, port); +} + +void * run_server_recv_client_msg(void *_socket) { + pthread_detach(pthread_self()); + mod_socket_t * socket = (mod_socket_t *) _socket; + shm_socket_t * client_socket = socket->client_socket; + + mod_entry_t entry; + entry.client_socket = client_socket; + while (socket->shm_socket->status == SHM_CONN_LISTEN && + client_socket->status == SHM_CONN_ESTABLISHED && shm_recv(client_socket, &entry.buf, &entry.size) == 0 ) { + + socket->recvQueue->push(entry); + // shm_free(recvbuf); + } + free(_socket); + shm_close_socket(client_socket); + return NULL; +} + +void *run_accept_connection(void * _socket) { + mod_socket_t * socket = (mod_socket_t *) _socket; + shm_socket_t *client_socket; + pthread_t tid; + while(socket->shm_socket->status == SHM_CONN_LISTEN) { + client_socket = shm_accept(socket->shm_socket); + + mod_socket_t *arg = (mod_socket_t *)malloc(sizeof(mod_socket_t)); + memcpy(arg, _socket, sizeof(mod_socket_t)); + arg->client_socket = client_socket; + pthread_create(&tid, NULL, run_server_recv_client_msg , (void *)arg); + } + return NULL; +} + +int mod_listen(void * _socket) { + mod_socket_t * socket = (mod_socket_t *) _socket; + pthread_t tid; + socket->is_server = 1; + int rv = shm_listen(socket->shm_socket); + if(rv == 0) { + pthread_create(&tid, NULL, run_accept_connection, _socket); + return 0; + } + return -1; +} + + +int mod_connect(void * _socket, int port) { + mod_socket_t * socket = (mod_socket_t *) _socket; + return shm_connect(socket->shm_socket, port); + +} + +int mod_send(void * _socket, void *buf, int size) { + mod_socket_t * socket = (mod_socket_t *) _socket; + if(!socket->is_server ) { + return shm_send(socket->shm_socket, buf, size); + } + else if(socket->mod == REQ_REP) { + SemUtil::dec(socket->items); + shm_send(socket->client_socket, buf, size); + SemUtil::inc(socket->slots); + return 0; + } + return -1; + +} + +int mod_recv(void * _socket, void **buf, int *size) { + mod_socket_t * socket = (mod_socket_t *) _socket; + mod_entry_t entry; + + if(!socket->is_server ) { + return shm_recv(socket->shm_socket, buf, size); + } + else if(socket->mod == REQ_REP) { + SemUtil::dec(socket->slots); + socket->recvQueue->pop(entry); + *buf = entry.buf; + *size = entry.size; + socket->client_socket = entry.client_socket; + SemUtil::inc(socket->items); + return 0; + } + return -1; + + +} + + +void mod_free(void *buf) { + free(buf); +} -- Gitblit v1.8.0