#include "usg_common.h"
|
#include "usg_typedef.h"
|
#include "shm_queue.h"
|
#include "shm_allocator.h"
|
|
#include "mem_pool.h"
|
#include "hashtable.h"
|
#include "sem_util.h"
|
#include "socket.h"
|
#include <map>
|
|
|
enum shm_msg_type_t
|
{
|
SHM_SOCKET_OPEN = 1,
|
SHM_SOKET_CLOSE = 2,
|
SHM_COMMON_MSG = 3
|
|
};
|
|
typedef struct shm_msg_t {
|
int port;
|
shm_msg_type_t type;
|
size_t size;
|
void * buf;
|
|
} shm_msg_t;
|
|
typedef struct shm_socket_t {
|
int port;
|
shm_mod_t mod;
|
SHMQueue<shm_msg_t> *queue;
|
SHMQueue<shm_msg_t> *remoteQueue;
|
// std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap;
|
int slots;
|
int items;
|
int is_server;
|
|
} shm_socket_t;
|
|
|
|
void shm_init(int size) {
|
mem_pool_init(size);
|
}
|
|
void shm_destroy() {
|
mem_pool_destroy();
|
}
|
|
void shm_free(void *buf) {
|
free(buf);
|
}
|
|
void *shm_open_socket(int mod) {
|
shm_socket_t *socket = (shm_socket_t *)malloc(sizeof(shm_socket_t));
|
socket->remoteQueueMap = new std::map<int, SHMQueue<shm_msg_t>* >;
|
socket->port = -1;
|
socket->mod = (shm_mod_t)mod;
|
printf("mod===%d\n", socket->mod);
|
socket->is_server = 0;
|
if (mod == REQ_REP) {
|
socket->slots = SemUtil::get(IPC_PRIVATE, 1);
|
socket->items = SemUtil::get(IPC_PRIVATE, 0);
|
}
|
|
return (void *)socket;
|
}
|
|
|
int shm_close_socket(void *socket) {
|
shm_socket_t * _socket = (shm_socket_t *) socket;
|
delete _socket->queue;
|
|
std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap = _socket->remoteQueueMap;
|
for(auto iter = remoteQueueMap->begin(); iter != remoteQueueMap->end(); iter++) {
|
delete iter->second;
|
}
|
delete _socket->remoteQueueMap;
|
|
if (_socket->mod == REQ_REP) {
|
SemUtil::remove(_socket->slots);
|
SemUtil::remove(_socket->items);
|
}
|
|
free(socket);
|
return 0;
|
|
}
|
|
|
int shm_bind(void* socket, int port) {
|
shm_socket_t * _socket = (shm_socket_t *) socket;
|
_socket -> port = port;
|
return 0;
|
}
|
|
int shm_listen(void* socket) {
|
shm_socket_t * _socket = (shm_socket_t *) socket;
|
_socket->is_server = 1;
|
int port;
|
hashtable_t *hashtable = mm_get_hashtable();
|
if(_socket -> port == -1) {
|
port = hashtable_alloc_key(hashtable);
|
_socket -> port = port;
|
} else {
|
|
if(hashtable_get(hashtable, _socket->port)!= NULL) {
|
err_exit(0, "key %d has already been in used!", _socket->port);
|
}
|
}
|
|
_socket->queue = new SHMQueue<shm_msg_t>(_socket->port, 16);
|
return 0;
|
}
|
|
|
static int __shm_rev__(shm_socket_t* _socket) {
|
shm_msg_t src;
|
|
std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap = _socket->remoteQueueMap;
|
bool rv = _socket->queue->pop(src);
|
|
if (rv) {
|
if(src.type == SHM_SOCKET_OPEN) {
|
_socket->remoteQueue = new SHMQueue<shm_msg_t>(src.port, 0);
|
}
|
|
|
|
|
|
void * _buf = malloc(src.size);
|
memcpy(_buf, src.buf, src.size);
|
*buf = _buf;
|
*size = src.size;
|
mm_free(src.buf);
|
return 0;
|
} else {
|
return 1;
|
}
|
}
|
|
int shm_connect(void* socket, int port) {
|
shm_socket_t * _socket = (shm_socket_t *) socket;
|
hashtable_t *hashtable = mm_get_hashtable();
|
if(hashtable_get(hashtable, port)== NULL) {
|
err_exit(0, "shm_connect:connect at port %d failed!", port);
|
}
|
if(_socket -> port == -1) {
|
_socket -> port = hashtable_alloc_key(hashtable);
|
} else {
|
|
if(hashtable_get(hashtable, _socket->port)!= NULL) {
|
err_exit(0, "key %d has already been in used!", _socket->port);
|
}
|
}
|
|
_socket->queue = new SHMQueue<shm_msg_t>(_socket->port, 16);
|
_socket->remoteQueueMap->insert({port, new SHMQueue<shm_msg_t>(port, 0)});
|
return 0;
|
}
|
|
int shm_send(void *socket, void *buf, int size) {
|
shm_socket_t * _socket = (shm_socket_t *) socket;
|
hashtable_t *hashtable = mm_get_hashtable();
|
shm_msg_t dest;
|
dest.port = _socket->port;
|
dest.size = size;
|
dest.buf = mm_malloc(size);
|
memcpy(dest.buf, buf, size);
|
|
std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap = _socket->remoteQueueMap;
|
for(auto iter = remoteQueueMap->begin(); iter != remoteQueueMap->end(); iter++) {
|
if(hashtable_get(hashtable, iter->first)== NULL) {
|
err_msg(0, "shm_send:connect at port %d failed, the other part has been closed!", iter->first);
|
delete iter->second;
|
remoteQueueMap->erase(iter);
|
continue;
|
}
|
if(_socket->mod == REQ_REP && _socket->is_server == 1)
|
SemUtil::dec(_socket->items);
|
|
iter->second->push(dest);
|
|
if( _socket->mod == REQ_REP && _socket->is_server == 1) {
|
delete iter->second;
|
remoteQueueMap->erase(iter);
|
SemUtil::inc(_socket->slots);
|
}
|
|
|
}
|
return 0;
|
}
|
|
int shm_recv(void* socket, void **buf, int *size) {
|
shm_socket_t * _socket = (shm_socket_t *) socket;
|
shm_msg_t src;
|
|
std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap = _socket->remoteQueueMap;
|
bool rv = _socket->queue->pop(src);
|
if (rv) {
|
if( _socket->is_server == 1 && remoteQueueMap->find(src.port) == remoteQueueMap->end()) {
|
if(_socket->mod == REQ_REP)
|
SemUtil::dec(_socket->slots);
|
|
remoteQueueMap->insert({src.port, new SHMQueue<shm_msg_t>(src.port, 0)});
|
|
if(_socket->mod == REQ_REP)
|
SemUtil::inc(_socket->items);
|
}
|
|
void * _buf = malloc(src.size);
|
memcpy(_buf, src.buf, src.size);
|
*buf = _buf;
|
*size = src.size;
|
mm_free(src.buf);
|
return 0;
|
} else {
|
return 1;
|
}
|
}
|
|
|
|
|
|