#include "usg_common.h" #include "usg_typedef.h" #include "shm_queue.h" #include "shm_allocator.h" #include "mem_pool.h" #include "hashtable.h" #include "sem_util.h" #include "socket.h" #include enum shm_msg_type_t { SHM_SOCKET_OPEN = 1, SHM_SOKET_CLOSE = 2, SHM_COMMON_MSG = 3 }; typedef struct shm_msg_t { int port; shm_msg_type_t type; size_t size; void * buf; } shm_msg_t; typedef struct shm_socket_t { int port; shm_mod_t mod; SHMQueue *queue; SHMQueue *remoteQueue; // std::map* > *remoteQueueMap; int slots; int items; int is_server; } shm_socket_t; void shm_init(int size) { mem_pool_init(size); } void shm_destroy() { mem_pool_destroy(); } void shm_free(void *buf) { free(buf); } void *shm_open_socket(int mod) { shm_socket_t *socket = (shm_socket_t *)malloc(sizeof(shm_socket_t)); socket->remoteQueueMap = new std::map* >; socket->port = -1; socket->mod = (shm_mod_t)mod; printf("mod===%d\n", socket->mod); socket->is_server = 0; if (mod == REQ_REP) { socket->slots = SemUtil::get(IPC_PRIVATE, 1); socket->items = SemUtil::get(IPC_PRIVATE, 0); } return (void *)socket; } int shm_close_socket(void *socket) { shm_socket_t * _socket = (shm_socket_t *) socket; delete _socket->queue; std::map* > *remoteQueueMap = _socket->remoteQueueMap; for(auto iter = remoteQueueMap->begin(); iter != remoteQueueMap->end(); iter++) { delete iter->second; } delete _socket->remoteQueueMap; if (_socket->mod == REQ_REP) { SemUtil::remove(_socket->slots); SemUtil::remove(_socket->items); } free(socket); return 0; } int shm_bind(void* socket, int port) { shm_socket_t * _socket = (shm_socket_t *) socket; _socket -> port = port; return 0; } int shm_listen(void* socket) { shm_socket_t * _socket = (shm_socket_t *) socket; _socket->is_server = 1; int port; hashtable_t *hashtable = mm_get_hashtable(); if(_socket -> port == -1) { port = hashtable_alloc_key(hashtable); _socket -> port = port; } else { if(hashtable_get(hashtable, _socket->port)!= NULL) { err_exit(0, "key %d has already been in used!", _socket->port); } } _socket->queue = new SHMQueue(_socket->port, 16); return 0; } static int __shm_rev__(shm_socket_t* _socket) { shm_msg_t src; std::map* > *remoteQueueMap = _socket->remoteQueueMap; bool rv = _socket->queue->pop(src); if (rv) { if(src.type == SHM_SOCKET_OPEN) { _socket->remoteQueue = new SHMQueue(src.port, 0); } void * _buf = malloc(src.size); memcpy(_buf, src.buf, src.size); *buf = _buf; *size = src.size; mm_free(src.buf); return 0; } else { return 1; } } int shm_connect(void* socket, int port) { shm_socket_t * _socket = (shm_socket_t *) socket; hashtable_t *hashtable = mm_get_hashtable(); if(hashtable_get(hashtable, port)== NULL) { err_exit(0, "shm_connect:connect at port %d failed!", port); } if(_socket -> port == -1) { _socket -> port = hashtable_alloc_key(hashtable); } else { if(hashtable_get(hashtable, _socket->port)!= NULL) { err_exit(0, "key %d has already been in used!", _socket->port); } } _socket->queue = new SHMQueue(_socket->port, 16); _socket->remoteQueueMap->insert({port, new SHMQueue(port, 0)}); return 0; } int shm_send(void *socket, void *buf, int size) { shm_socket_t * _socket = (shm_socket_t *) socket; hashtable_t *hashtable = mm_get_hashtable(); shm_msg_t dest; dest.port = _socket->port; dest.size = size; dest.buf = mm_malloc(size); memcpy(dest.buf, buf, size); std::map* > *remoteQueueMap = _socket->remoteQueueMap; for(auto iter = remoteQueueMap->begin(); iter != remoteQueueMap->end(); iter++) { if(hashtable_get(hashtable, iter->first)== NULL) { err_msg(0, "shm_send:connect at port %d failed, the other part has been closed!", iter->first); delete iter->second; remoteQueueMap->erase(iter); continue; } if(_socket->mod == REQ_REP && _socket->is_server == 1) SemUtil::dec(_socket->items); iter->second->push(dest); if( _socket->mod == REQ_REP && _socket->is_server == 1) { delete iter->second; remoteQueueMap->erase(iter); SemUtil::inc(_socket->slots); } } return 0; } int shm_recv(void* socket, void **buf, int *size) { shm_socket_t * _socket = (shm_socket_t *) socket; shm_msg_t src; std::map* > *remoteQueueMap = _socket->remoteQueueMap; bool rv = _socket->queue->pop(src); if (rv) { if( _socket->is_server == 1 && remoteQueueMap->find(src.port) == remoteQueueMap->end()) { if(_socket->mod == REQ_REP) SemUtil::dec(_socket->slots); remoteQueueMap->insert({src.port, new SHMQueue(src.port, 0)}); if(_socket->mod == REQ_REP) SemUtil::inc(_socket->items); } void * _buf = malloc(src.size); memcpy(_buf, src.buf, src.size); *buf = _buf; *size = src.size; mm_free(src.buf); return 0; } else { return 1; } }