#include "usg_common.h"
|
#include "mod_socket.h"
|
#include "shm_socket.h"
|
#include "shm_allocator.h"
|
#include "mem_pool.h"
|
#include "hashtable.h"
|
#include "sem_util.h"
|
#include "logger_factory.h"
|
|
static Logger *logger = LoggerFactory::getLogger();
|
|
typedef struct mod_entry_t
|
{
|
int size;
|
void *buf;
|
shm_socket_t *client_socket;
|
}mod_entry_t;
|
|
typedef struct mod_socket_t {
|
socket_mod_t mod;
|
shm_socket_t *shm_socket;
|
shm_socket_t *client_socket;
|
int is_server;
|
LockFreeQueue<mod_entry_t, DM_Allocator> *recvQueue;
|
int slots;
|
int items;
|
|
|
} mod_socket_t;
|
|
/**
|
*
|
*/
|
void *mod_open_socket(int mod) {
|
mod_socket_t *socket = (mod_socket_t *)malloc(sizeof(mod_socket_t));
|
socket->shm_socket=shm_open_socket(SHM_SOCKET_STREAM);
|
socket->is_server = 0;
|
socket->mod = (socket_mod_t)mod;
|
socket->recvQueue = new LockFreeQueue<mod_entry_t, DM_Allocator>(16);
|
if (mod == REQ_REP) {
|
socket->slots = SemUtil::get(IPC_PRIVATE, 1);
|
socket->items = SemUtil::get(IPC_PRIVATE, 0);
|
}
|
|
return (void *)socket;
|
}
|
|
|
|
int mod_close_socket(void * _socket){
|
mod_socket_t * socket = (mod_socket_t *) _socket;
|
|
if (socket->mod == REQ_REP) {
|
SemUtil::remove(socket->slots);
|
SemUtil::remove(socket->items);
|
}
|
|
int rv = shm_close_socket(socket->shm_socket);
|
free(_socket);
|
return rv;
|
}
|
|
|
|
|
int mod_socket_bind(void * _socket, int port){
|
mod_socket_t * socket = (mod_socket_t *) _socket;
|
return shm_socket_bind(socket->shm_socket, port);
|
}
|
|
void * run_server_recv_client_msg(void *_socket) {
|
pthread_detach(pthread_self());
|
mod_socket_t * socket = (mod_socket_t *) _socket;
|
shm_socket_t * client_socket = socket->client_socket;
|
|
mod_entry_t entry;
|
entry.client_socket = client_socket;
|
while (socket->shm_socket->status == SHM_CONN_LISTEN &&
|
client_socket->status == SHM_CONN_ESTABLISHED && shm_recv(client_socket, &entry.buf, &entry.size) == 0 ) {
|
|
socket->recvQueue->push(entry);
|
// shm_free(recvbuf);
|
}
|
|
free(_socket);
|
shm_close_socket(client_socket);
|
return NULL;
|
}
|
|
void *run_accept_connection(void * _socket) {
|
mod_socket_t * socket = (mod_socket_t *) _socket;
|
shm_socket_t *client_socket;
|
pthread_t tid;
|
while(socket->shm_socket->status == SHM_CONN_LISTEN) {
|
//接受客户端的连接请求
|
client_socket = shm_accept(socket->shm_socket);
|
|
mod_socket_t *arg = (mod_socket_t *)malloc(sizeof(mod_socket_t));
|
memcpy(arg, _socket, sizeof(mod_socket_t));
|
arg->client_socket = client_socket;
|
pthread_create(&tid, NULL, run_server_recv_client_msg , (void *)arg);
|
}
|
return NULL;
|
}
|
|
int mod_listen(void * _socket) {
|
mod_socket_t * socket = (mod_socket_t *) _socket;
|
pthread_t tid;
|
socket->is_server = 1;
|
int rv = shm_listen(socket->shm_socket);
|
if(rv == 0) {
|
pthread_create(&tid, NULL, run_accept_connection, _socket);
|
return 0;
|
}
|
return -1;
|
}
|
|
|
int mod_connect(void * _socket, int port) {
|
mod_socket_t * socket = (mod_socket_t *) _socket;
|
return shm_connect(socket->shm_socket, port);
|
|
}
|
|
int mod_send(void * _socket, const void *buf, const int size) {
|
mod_socket_t * socket = (mod_socket_t *) _socket;
|
std::map<int, shm_socket_t* > *clientSocketMap = socket->shm_socket->clientSocketMap;
|
std::map<int, shm_socket_t* >::iterator iter;
|
int rv;
|
if(socket->is_server ) {
|
switch(socket->mod) {
|
case REQ_REP:
|
SemUtil::dec(socket->items);
|
rv = shm_send(socket->client_socket, buf, size);
|
SemUtil::inc(socket->slots);
|
break;
|
case SURVEY:
|
case PUB_SUB:
|
for(iter = clientSocketMap->begin(); iter != clientSocketMap->end(); iter++) {
|
rv = shm_send(iter->second, buf, size);
|
}
|
break;
|
default:
|
rv = shm_send(socket->client_socket, buf, size);
|
}
|
return rv;
|
|
}
|
else {
|
rv = shm_send(socket->shm_socket, buf, size);
|
return rv;
|
}
|
return -1;
|
|
}
|
|
int mod_recv(void * _socket, void **buf, int *size) {
|
mod_socket_t * socket = (mod_socket_t *) _socket;
|
mod_entry_t entry;
|
|
if(socket->is_server ) {
|
switch(socket->mod) {
|
case REQ_REP:
|
SemUtil::dec(socket->slots);
|
socket->recvQueue->pop(entry);
|
*buf = entry.buf;
|
*size = entry.size;
|
socket->client_socket = entry.client_socket;
|
SemUtil::inc(socket->items);
|
|
break;
|
case PUB_SUB:
|
break;
|
case SURVEY:
|
default:
|
socket->recvQueue->pop(entry);
|
*buf = entry.buf;
|
*size = entry.size;
|
|
}
|
|
return 0;
|
}
|
else {
|
shm_recv(socket->shm_socket, buf, size);
|
return 0;
|
}
|
|
return -1;
|
}
|
|
int mod_get_socket_port(void * _socket) {
|
mod_socket_t * socket = (mod_socket_t *) _socket;
|
return socket->shm_socket->port;
|
}
|
|
|
void mod_free(void *buf) {
|
free(buf);
|
}
|