#include "usg_common.h"
#include "shm_stream_mod_socket.h"
#include "shm_socket.h"
#include "shm_allocator.h"
#include "mem_pool.h"
#include "hashtable.h"
#include "sem_util.h"
#include "logger_factory.h"

static Logger *logger = LoggerFactory::getLogger();

/**
 * One received message together with the connection it arrived on.
 * Produced by run_server_recv_client_msg() and consumed by
 * shm_stream_mod_socket_recv().
 */
typedef struct mod_entry_t {
  int size;
  void *buf;
  shm_socket_t *client_socket;
} mod_entry_t;

/**
 * State behind the opaque handle returned by shm_stream_mod_socket_open().
 *
 * NOTE(review): `LockFreeQueue` is written here without template arguments.
 * If it is a class template in the project headers, the arguments (presumably
 * the element type mod_entry_t) have been lost and must be restored here and
 * at the `new` expression in shm_stream_mod_socket_open().
 */
typedef struct shm_stream_mod_socket_t {
  socket_mod_t mod;
  shm_socket_t *shm_socket;
  /* Peer of the last REQ_REP request (server handle), or the accepted
   * connection (per-connection copy handed to a worker thread). */
  shm_socket_t *client_socket;
  int is_server;
  LockFreeQueue *recvQueue;
  /* Semaphore handles, only obtained in REQ_REP mode; -1 otherwise. */
  int slots;
  int items;
} shm_stream_mod_socket_t;

/**
 * Create a stream socket operating in the given messaging mode.
 *
 * @param mod  a socket_mod_t value (REQ_REP, PUB_SUB, SURVEY, ...).
 * @return an opaque handle for the other shm_stream_mod_socket_* functions,
 *         or NULL if the handle or the underlying socket could not be
 *         allocated.
 */
void *shm_stream_mod_socket_open(int mod) {
  shm_stream_mod_socket_t *sock =
      (shm_stream_mod_socket_t *)malloc(sizeof(shm_stream_mod_socket_t));
  if (sock == NULL) {
    return NULL;
  }

  sock->shm_socket = shm_open_socket(SHM_SOCKET_STREAM);
  if (sock->shm_socket == NULL) {
    /* Every other function dereferences shm_socket unconditionally. */
    free(sock);
    return NULL;
  }

  /* malloc() does not zero memory: give every field a defined value. */
  sock->client_socket = NULL;
  sock->is_server = 0;
  sock->mod = (socket_mod_t)mod;
  sock->slots = -1;
  sock->items = -1;
  sock->recvQueue = new LockFreeQueue(16);

  if (mod == REQ_REP) {
    /* slots starts at 1 and items at 0, so on a server the first recv may
     * proceed while a send has to wait for a preceding recv. */
    sock->slots = SemUtil::get(IPC_PRIVATE, 1);
    sock->items = SemUtil::get(IPC_PRIVATE, 0);
  }
  return (void *)sock;
}

/**
 * Close the underlying socket and release the handle.
 *
 * @param _socket  handle from shm_stream_mod_socket_open(); NULL is tolerated.
 * @return the result of shm_close_socket(), or -1 when _socket is NULL.
 *
 * NOTE(review): threads started by shm_stream_mod_socket_listen() keep using
 * this handle (accept thread) or copies of its pointers (worker threads) and
 * nothing here waits for them to finish.
 */
int shm_stream_mod_socket_close(void *_socket) {
  shm_stream_mod_socket_t *sock = (shm_stream_mod_socket_t *)_socket;
  if (sock == NULL) {
    return -1;
  }

  if (sock->mod == REQ_REP) {
    SemUtil::remove(sock->slots);
    SemUtil::remove(sock->items);
  }

  if (!sock->is_server) {
    /* No background thread was ever given this queue, so it can be released.
     * For a server handle the detached workers may still push into it, so it
     * is deliberately left alone there. */
    delete sock->recvQueue;
  }

  int rv = shm_close_socket(sock->shm_socket);
  free(_socket);
  return rv;
}

/**
 * Bind the underlying socket to `key`.
 *
 * @return the result of shm_socket_bind().
 */
int shm_stream_mod_socket_bind(void *_socket, int key) {
  shm_stream_mod_socket_t *sock = (shm_stream_mod_socket_t *)_socket;
  return shm_socket_bind(sock->shm_socket, key);
}

/**
 * Worker thread entry point: receive messages from one accepted connection
 * and push them onto the server's receive queue.
 *
 * @param _socket  heap-allocated copy of the server handle whose
 *                 client_socket is the accepted connection. This function
 *                 takes ownership: it frees the copy and closes the
 *                 connection before returning.
 * @return always NULL.
 */
void *run_server_recv_client_msg(void *_socket) {
  /* Nobody joins this thread. */
  pthread_detach(pthread_self());

  shm_stream_mod_socket_t *sock = (shm_stream_mod_socket_t *)_socket;
  shm_socket_t *client_socket = sock->client_socket;

  mod_entry_t entry;
  entry.size = 0;
  entry.buf = NULL;
  entry.client_socket = client_socket;

  while (sock->shm_socket->status == SHM_CONN_LISTEN &&
         client_socket->status == SHM_CONN_ESTABLISHED &&
         shm_recv(client_socket, &entry.buf, &entry.size) == 0) {
    /* The buffer is not released here; it travels with the entry and is
     * handed to the caller of shm_stream_mod_socket_recv(). */
    sock->recvQueue->push(entry);
  }

  free(_socket);
  shm_close_socket(client_socket);
  return NULL;
}

/**
 * Accept thread entry point: while the socket is listening, accept
 * connections and start one run_server_recv_client_msg() worker for each.
 *
 * @param _socket  the server handle (not a copy).
 * @return always NULL.
 */
void *run_accept_connection(void *_socket) {
  shm_stream_mod_socket_t *sock = (shm_stream_mod_socket_t *)_socket;
  pthread_t tid;

  while (sock->shm_socket->status == SHM_CONN_LISTEN) {
    /* Accept a client's connection request. */
    shm_socket_t *client_socket = shm_accept(sock->shm_socket);
    if (client_socket == NULL) {
      /* The worker dereferences the connection immediately; never hand it a
       * null one. The loop condition decides whether to try again. */
      continue;
    }

    /* Each worker gets its own copy of the handle, with client_socket
     * pointing at its connection; the worker frees the copy. */
    shm_stream_mod_socket_t *arg =
        (shm_stream_mod_socket_t *)malloc(sizeof(shm_stream_mod_socket_t));
    if (arg == NULL) {
      shm_close_socket(client_socket);
      continue;
    }
    *arg = *sock;
    arg->client_socket = client_socket;

    if (pthread_create(&tid, NULL, run_server_recv_client_msg, (void *)arg) != 0) {
      /* No worker took ownership, so release both resources here. */
      free(arg);
      shm_close_socket(client_socket);
    }
  }
  return NULL;
}

/**
 * Put the socket into listening state and start the accept thread.
 *
 * @return 0 on success, -1 if shm_listen() fails or the accept thread cannot
 *         be created.
 */
int shm_stream_mod_socket_listen(void *_socket) {
  shm_stream_mod_socket_t *sock = (shm_stream_mod_socket_t *)_socket;
  pthread_t tid;

  sock->is_server = 1;
  if (shm_listen(sock->shm_socket) != 0) {
    return -1;
  }
  if (pthread_create(&tid, NULL, run_accept_connection, _socket) != 0) {
    return -1;
  }
  /* The thread id is not kept, so the thread can never be joined; detach it
   * so its resources are reclaimed when it exits. */
  pthread_detach(tid);
  return 0;
}

/**
 * Connect the underlying socket to the server bound to `key`.
 *
 * @return the result of shm_connect().
 */
int shm_stream_mod_socket_connect(void *_socket, int key) {
  shm_stream_mod_socket_t *sock = (shm_stream_mod_socket_t *)_socket;
  return shm_connect(sock->shm_socket, key);
}

/**
 * Send `size` bytes from `buf`.
 *
 * Client handle: sends on the connected socket.
 * Server handle:
 *  - REQ_REP: waits until a request has been received, then replies to the
 *    connection that request came from.
 *  - SURVEY / PUB_SUB: sends to every entry in the socket's client map.
 *  - any other mode: sends to client_socket, if one has been recorded.
 *
 * @return the result of the (last) shm_send(); 0 when a SURVEY/PUB_SUB server
 *         has no clients; -1 when there is no peer to send to.
 */
int shm_stream_mod_socket_send(void *_socket, const void *buf, const int size) {
  shm_stream_mod_socket_t *sock = (shm_stream_mod_socket_t *)_socket;

  if (!sock->is_server) {
    return shm_send(sock->shm_socket, buf, size);
  }

  int rv = 0;
  switch (sock->mod) {
    case REQ_REP:
      SemUtil::dec(sock->items);
      rv = shm_send(sock->client_socket, buf, size);
      SemUtil::inc(sock->slots);
      break;

    case SURVEY:
    case PUB_SUB: {
      /* `auto` because the map's template arguments are not spelled out in
       * this file; only `->second` of each element is needed. */
      auto *clients = sock->shm_socket->clientSocketMap;
      for (auto iter = clients->begin(); iter != clients->end(); ++iter) {
        rv = shm_send(iter->second, buf, size);
      }
      break;
    }

    default:
      /* NOTE(review): on a server handle client_socket is only assigned by a
       * REQ_REP recv, so in other modes it is still NULL here. */
      if (sock->client_socket == NULL) {
        return -1;
      }
      rv = shm_send(sock->client_socket, buf, size);
      break;
  }
  return rv;
}

/**
 * Receive one message.
 *
 * Client handle: receives from the connected socket.
 * Server handle:
 *  - REQ_REP: takes the next queued request and remembers its connection so
 *    the following send replies to it.
 *  - PUB_SUB: nothing is received; the outputs are cleared.
 *  - SURVEY and any other mode: takes the next queued message.
 *
 * @param buf   receives the message buffer.
 * @param size  receives the message length.
 * @return 0 on success for a server handle, -1 for a PUB_SUB server; for a
 *         client handle, the result of shm_recv().
 */
int shm_stream_mod_socket_recv(void *_socket, void **buf, int *size) {
  shm_stream_mod_socket_t *sock = (shm_stream_mod_socket_t *)_socket;

  if (!sock->is_server) {
    return shm_recv(sock->shm_socket, buf, size);
  }

  mod_entry_t entry = {0, NULL, NULL};
  switch (sock->mod) {
    case REQ_REP:
      SemUtil::dec(sock->slots);
      sock->recvQueue->pop(entry);
      *buf = entry.buf;
      *size = entry.size;
      sock->client_socket = entry.client_socket;
      SemUtil::inc(sock->items);
      return 0;

    case PUB_SUB:
      /* Never leave the outputs unset while reporting a result. */
      *buf = NULL;
      *size = 0;
      return -1;

    case SURVEY:
    default:
      sock->recvQueue->pop(entry);
      *buf = entry.buf;
      *size = entry.size;
      return 0;
  }
}

/**
 * @return the key of the underlying socket.
 */
int shm_stream_mod_socket_get_key(void *_socket) {
  shm_stream_mod_socket_t *sock = (shm_stream_mod_socket_t *)_socket;
  return sock->shm_socket->key;
}

/**
 * Release a buffer with free().
 *
 * NOTE(review): confirm that buffers produced by shm_recv() come from
 * malloc() and not from the shared-memory allocator.
 */
void shm_stream_mod_socket_free(void *buf) {
  free(buf);
}