wangzhengquan
2020-07-20 f85c9b875b060681b51f57b15074ba1c7c9f5636
queue/mod_socket.c
@@ -1,6 +1,9 @@
#include "mod_socket.h"
#include "shm_socket.h"
#include "usg_common.h"
#include "logger_factory.h"
static Logger logger = LoggerFactory::getLogger();
typedef struct mod_entry_t
{
   int size;
@@ -52,10 +55,15 @@
   return rv;
}
int mod_get_socket_port(void * _socket) {
   mod_socket_t * socket = (mod_socket_t *) _socket;
   return socket->shm_socket->port;
}
int mod_socket_bind(void * _socket, int port){
   mod_socket_t * socket = (mod_socket_t *) _socket;
   return  shm_soket_bind(socket->shm_socket, port);
   return  shm_socket_bind(socket->shm_socket, port);
}
void * run_server_recv_client_msg(void *_socket) {
@@ -71,6 +79,7 @@
      socket->recvQueue->push(entry);
      // shm_free(recvbuf);
   }
   free(_socket);
   shm_close_socket(client_socket);
   return NULL;
@@ -81,6 +90,7 @@
   shm_socket_t *client_socket;
   pthread_t tid;
   while(socket->shm_socket->status == SHM_CONN_LISTEN) {
      //接受客户端的连接请求
      client_socket = shm_accept(socket->shm_socket);
      
      mod_socket_t *arg = (mod_socket_t *)malloc(sizeof(mod_socket_t));
@@ -118,9 +128,11 @@
   if(socket->is_server ) {
      switch(socket->mod) {
         case REQ_REP:
logger.debug("mod_send before");
            SemUtil::dec(socket->items);
            rv = shm_send(socket->client_socket, buf, size);
            SemUtil::inc(socket->slots);
logger.debug("mod_send after");
            break;
         case SURVEY:
         case PUB_SUB:
@@ -135,7 +147,10 @@
      
   }
   else {
      return shm_send(socket->shm_socket, buf, size);
logger.debug("mod_send before");
      rv = shm_send(socket->shm_socket, buf, size);
logger.debug("mod_send after");
      return rv;
   }
   return -1;
   
@@ -144,33 +159,37 @@
int mod_recv(void * _socket, void **buf, int *size) {
   mod_socket_t * socket = (mod_socket_t *) _socket;
   mod_entry_t entry;
   int rv;
   if(socket->is_server ) {
      switch(socket->mod) {
         case REQ_REP:
logger.debug("REQ_REP mod_recv before");
            SemUtil::dec(socket->slots);
            rv = socket->recvQueue->pop(entry);
            socket->recvQueue->pop(entry);
            *buf = entry.buf;
            *size = entry.size;
            socket->client_socket = entry.client_socket;
            SemUtil::inc(socket->items);
logger.debug("REQ_REP mod_recv after");
            break;
         case PUB_SUB:
            rv = 0;
            break;
         case SURVEY:
         default:
            rv = socket->recvQueue->pop(entry);
            socket->recvQueue->pop(entry);
            *buf = entry.buf;
            *size = entry.size;
      }
      return rv;
      }
      return 0;
   }
   else {
      return shm_recv(socket->shm_socket, buf, size);
logger.debug("mod_recv before");
      shm_recv(socket->shm_socket, buf, size);
logger.debug("mod_recv after");
      return 0;
   }
   return -1;