wangzhengquan
2020-07-17 5e3e6719f7d7922decdc16d2313baf2e94210750
queue/mod_socket.c
@@ -112,14 +112,29 @@
int mod_send(void * _socket, void *buf, int size) {
   mod_socket_t * socket = (mod_socket_t *) _socket;
   if(!socket->is_server ) {
      return shm_send(socket->shm_socket, buf, size);
   std::map<int, shm_socket_t* > *clientSocketMap = socket->shm_socket->clientSocketMap;
   std::map<int, shm_socket_t* >::iterator iter;
   int rv;
   if(socket->is_server ) {
      switch(socket->mod) {
         case REQ_REP:
            SemUtil::dec(socket->items);
            rv = shm_send(socket->client_socket, buf, size);
            SemUtil::inc(socket->slots);
            break;
         case PUB_SUB:
            for(iter = clientSocketMap->begin(); iter != clientSocketMap->end(); iter++) {
               rv = shm_send(iter->second, buf, size);
            }
            break;
         default:
            err_exit(0, "不支持的模式%d", socket->mod);
      }
      return rv;
   }
   else if(socket->mod == REQ_REP) {
      SemUtil::dec(socket->items);
      shm_send(socket->client_socket, buf, size);
      SemUtil::inc(socket->slots);
      return 0;
   else {
      return shm_send(socket->shm_socket, buf, size);
   }
   return -1;
   
@@ -128,22 +143,32 @@
int mod_recv(void * _socket, void **buf, int *size) {
   mod_socket_t * socket = (mod_socket_t *) _socket;
   mod_entry_t entry;
   int rv;
   if(!socket->is_server ) {
   if(socket->is_server ) {
      switch(socket->mod) {
         case REQ_REP:
            SemUtil::dec(socket->slots);
            rv = socket->recvQueue->pop(entry);
            *buf = entry.buf;
            *size = entry.size;
            socket->client_socket = entry.client_socket;
            SemUtil::inc(socket->items);
            break;
         case PUB_SUB:
            rv = 0;
            break;
         default:
            err_exit(0, "不支持的模式%d", socket->mod);
      }
      return rv;
   }
   else {
      return shm_recv(socket->shm_socket, buf, size);
   }
   else if(socket->mod == REQ_REP) {
      SemUtil::dec(socket->slots);
      socket->recvQueue->pop(entry);
      *buf = entry.buf;
      *size = entry.size;
      socket->client_socket = entry.client_socket;
      SemUtil::inc(socket->items);
      return 0;
   }
   return -1;
}