wangzhengquan
2020-07-20 f85c9b875b060681b51f57b15074ba1c7c9f5636
queue/mod_socket.c
@@ -55,6 +55,11 @@
   return rv;
}
int mod_get_socket_port(void * _socket) {
   mod_socket_t * socket = (mod_socket_t *) _socket;
   return socket->shm_socket->port;
}
int mod_socket_bind(void * _socket, int port){
   mod_socket_t * socket = (mod_socket_t *) _socket;
@@ -74,6 +79,7 @@
      socket->recvQueue->push(entry);
      // shm_free(recvbuf);
   }
   free(_socket);
   shm_close_socket(client_socket);
   return NULL;
@@ -84,6 +90,7 @@
   shm_socket_t *client_socket;
   pthread_t tid;
   while(socket->shm_socket->status == SHM_CONN_LISTEN) {
      //接受客户端的连接请求
      client_socket = shm_accept(socket->shm_socket);
      
      mod_socket_t *arg = (mod_socket_t *)malloc(sizeof(mod_socket_t));
@@ -152,38 +159,37 @@
int mod_recv(void * _socket, void **buf, int *size) {
   mod_socket_t * socket = (mod_socket_t *) _socket;
   mod_entry_t entry;
   int rv;
   if(socket->is_server ) {
      switch(socket->mod) {
         case REQ_REP:
   logger.debug("REQ_REP mod_recv before");
logger.debug("REQ_REP mod_recv before");
            SemUtil::dec(socket->slots);
            rv = socket->recvQueue->pop(entry);
            socket->recvQueue->pop(entry);
            *buf = entry.buf;
            *size = entry.size;
            socket->client_socket = entry.client_socket;
            SemUtil::inc(socket->items);
logger.debug("REQ_REP mod_recv after");
            break;
         case PUB_SUB:
            rv = 0;
            break;
         case SURVEY:
         default:
            rv = socket->recvQueue->pop(entry);
            socket->recvQueue->pop(entry);
            *buf = entry.buf;
            *size = entry.size;
      }
      return rv;
      }
      return 0;
   }
   else {
logger.debug("mod_recv before");
      rv = shm_recv(socket->shm_socket, buf, size);
      shm_recv(socket->shm_socket, buf, size);
logger.debug("mod_recv after");
      return rv;
      return 0;
   }
   return -1;