wangzhengquan
2020-09-25 00dba6082e245d917cb7d6eed3c627211ff41cd7
src/socket/mod_socket.c
@@ -1,7 +1,12 @@
#include "usg_common.h"
#include "mod_socket.h"
#include "shm_socket.h"
#include "usg_common.h"
#include "shm_allocator.h"
#include "mem_pool.h"
#include "hashtable.h"
#include "sem_util.h"
#include "logger_factory.h"
static Logger logger = LoggerFactory::getLogger();
typedef struct mod_entry_t
@@ -28,7 +33,7 @@
 */
void *mod_open_socket(int mod) {
  mod_socket_t *socket = (mod_socket_t *)malloc(sizeof(mod_socket_t));
  socket->shm_socket=shm_open_socket();
  socket->shm_socket=shm_open_socket(SHM_SOCKET_STREAM);
  socket->is_server = 0;
  socket->mod = (socket_mod_t)mod;
  socket->recvQueue = new LockFreeQueue<mod_entry_t, DM_Allocator>(16);
@@ -55,10 +60,7 @@
   return rv;
}
int mod_get_socket_port(void * _socket) {
   mod_socket_t * socket = (mod_socket_t *) _socket;
   return socket->shm_socket->port;
}
int mod_socket_bind(void * _socket, int port){
@@ -128,11 +130,9 @@
   if(socket->is_server ) {
      switch(socket->mod) {
         case REQ_REP:
logger.debug("mod_send before");
            SemUtil::dec(socket->items);
            rv = shm_send(socket->client_socket, buf, size);
            SemUtil::inc(socket->slots);
logger.debug("mod_send after");
            break;
         case SURVEY:
         case PUB_SUB:
@@ -147,9 +147,7 @@
      
   }
   else {
logger.debug("mod_send before");
      rv = shm_send(socket->shm_socket, buf, size);
logger.debug("mod_send after");
      return rv;
   }
   return -1;
@@ -163,7 +161,6 @@
   if(socket->is_server ) {
      switch(socket->mod) {
         case REQ_REP:
logger.debug("REQ_REP mod_recv before");
            SemUtil::dec(socket->slots);
            socket->recvQueue->pop(entry);
            *buf = entry.buf;
@@ -171,7 +168,6 @@
            socket->client_socket = entry.client_socket;
            SemUtil::inc(socket->items);
            
logger.debug("REQ_REP mod_recv after");
            break;
         case PUB_SUB:
            break;
@@ -186,15 +182,18 @@
      return 0;
   }
   else {
logger.debug("mod_recv before");
      shm_recv(socket->shm_socket, buf, size);
logger.debug("mod_recv after");
      return 0;
   }
   return -1;
}
int mod_get_socket_port(void * _socket) {
   mod_socket_t * socket = (mod_socket_t *) _socket;
   return socket->shm_socket->port;
}
void mod_free(void *buf) {
   free(buf);