wangzhengquan
2020-07-20 f85c9b875b060681b51f57b15074ba1c7c9f5636
queue/mod_socket.c
@@ -1,2 +1,202 @@
#include "mod_socket.h"
#include "shm_socket.h"
#include "usg_common.h"
#include "logger_factory.h"
static Logger logger = LoggerFactory::getLogger();
typedef struct mod_entry_t
{
   int size;
   void *buf;
   shm_socket_t *client_socket;
}mod_entry_t;
typedef struct mod_socket_t {
  socket_mod_t mod;
  shm_socket_t *shm_socket;
  shm_socket_t *client_socket;
  int is_server;
  LockFreeQueue<mod_entry_t, DM_Allocator> *recvQueue;
  int slots;
  int items;
} mod_socket_t;
/**
 *
 */
void *mod_open_socket(int mod) {
  mod_socket_t *socket = (mod_socket_t *)malloc(sizeof(mod_socket_t));
  socket->shm_socket=shm_open_socket();
  socket->is_server = 0;
  socket->mod = (socket_mod_t)mod;
  socket->recvQueue = new LockFreeQueue<mod_entry_t, DM_Allocator>(16);
  if (mod == REQ_REP) {
    socket->slots = SemUtil::get(IPC_PRIVATE, 1);
    socket->items = SemUtil::get(IPC_PRIVATE, 0);
  }
  return (void *)socket;
}
int mod_close_socket(void * _socket){
   mod_socket_t * socket = (mod_socket_t *) _socket;
   if (socket->mod == REQ_REP) {
      SemUtil::remove(socket->slots);
    SemUtil::remove(socket->items);
   }
   int rv = shm_close_socket(socket->shm_socket);
   free(_socket);
   return rv;
}
int mod_get_socket_port(void * _socket) {
   mod_socket_t * socket = (mod_socket_t *) _socket;
   return socket->shm_socket->port;
}
int mod_socket_bind(void * _socket, int port){
   mod_socket_t * socket = (mod_socket_t *) _socket;
   return  shm_socket_bind(socket->shm_socket, port);
}
void * run_server_recv_client_msg(void *_socket) {
   pthread_detach(pthread_self());
   mod_socket_t * socket = (mod_socket_t *) _socket;
   shm_socket_t * client_socket = socket->client_socket;
   mod_entry_t entry;
   entry.client_socket = client_socket;
   while (socket->shm_socket->status == SHM_CONN_LISTEN &&
      client_socket->status == SHM_CONN_ESTABLISHED && shm_recv(client_socket, &entry.buf, &entry.size) == 0 ) {
      socket->recvQueue->push(entry);
      // shm_free(recvbuf);
   }
   free(_socket);
   shm_close_socket(client_socket);
   return NULL;
}
void *run_accept_connection(void * _socket) {
   mod_socket_t * socket = (mod_socket_t *) _socket;
   shm_socket_t *client_socket;
   pthread_t tid;
   while(socket->shm_socket->status == SHM_CONN_LISTEN) {
      //接受客户端的连接请求
      client_socket = shm_accept(socket->shm_socket);
      mod_socket_t *arg = (mod_socket_t *)malloc(sizeof(mod_socket_t));
      memcpy(arg, _socket, sizeof(mod_socket_t));
      arg->client_socket = client_socket;
      pthread_create(&tid, NULL, run_server_recv_client_msg , (void *)arg);
   }
   return NULL;
}
int mod_listen(void * _socket) {
   mod_socket_t * socket = (mod_socket_t *) _socket;
   pthread_t tid;
   socket->is_server = 1;
   int rv = shm_listen(socket->shm_socket);
   if(rv == 0) {
      pthread_create(&tid, NULL, run_accept_connection, _socket);
      return 0;
   }
   return -1;
}
int mod_connect(void * _socket, int port) {
   mod_socket_t * socket = (mod_socket_t *) _socket;
   return shm_connect(socket->shm_socket, port);
}
int mod_send(void * _socket, const void *buf, const int size) {
   mod_socket_t * socket = (mod_socket_t *) _socket;
   std::map<int, shm_socket_t* > *clientSocketMap = socket->shm_socket->clientSocketMap;
   std::map<int, shm_socket_t* >::iterator iter;
   int rv;
   if(socket->is_server ) {
      switch(socket->mod) {
         case REQ_REP:
logger.debug("mod_send before");
            SemUtil::dec(socket->items);
            rv = shm_send(socket->client_socket, buf, size);
            SemUtil::inc(socket->slots);
logger.debug("mod_send after");
            break;
         case SURVEY:
         case PUB_SUB:
            for(iter = clientSocketMap->begin(); iter != clientSocketMap->end(); iter++) {
               rv = shm_send(iter->second, buf, size);
            }
            break;
         default:
            rv = shm_send(socket->client_socket, buf, size);
      }
      return rv;
   }
   else {
logger.debug("mod_send before");
      rv = shm_send(socket->shm_socket, buf, size);
logger.debug("mod_send after");
      return rv;
   }
   return -1;
}
int mod_recv(void * _socket, void **buf, int *size) {
   mod_socket_t * socket = (mod_socket_t *) _socket;
   mod_entry_t entry;
   if(socket->is_server ) {
      switch(socket->mod) {
         case REQ_REP:
logger.debug("REQ_REP mod_recv before");
            SemUtil::dec(socket->slots);
            socket->recvQueue->pop(entry);
            *buf = entry.buf;
            *size = entry.size;
            socket->client_socket = entry.client_socket;
            SemUtil::inc(socket->items);
logger.debug("REQ_REP mod_recv after");
            break;
         case PUB_SUB:
            break;
         case SURVEY:
         default:
            socket->recvQueue->pop(entry);
            *buf = entry.buf;
            *size = entry.size;
      }
      return 0;
   }
   else {
logger.debug("mod_recv before");
      shm_recv(socket->shm_socket, buf, size);
logger.debug("mod_recv after");
      return 0;
   }
   return -1;
}
void mod_free(void *buf) {
   free(buf);
}