wangzhengquan
2020-07-16 37db284a0c422fc28088d21defb46ed0c576d3c1
queue/socket.c
@@ -30,7 +30,8 @@
   int port;
   shm_mod_t mod;
   SHMQueue<shm_msg_t> *queue;
   std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap;
   SHMQueue<shm_msg_t> *remoteQueue;
   // std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap;
   int slots;
   int items;
   int is_server;
@@ -114,26 +115,20 @@
}
static int __shm_rev(shm_socket_t* _socket, void **buf, int *size) {
static int __shm_rev__(shm_socket_t* _socket) {
   shm_msg_t src;
   
   std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap = _socket->remoteQueueMap;
   bool rv = _socket->queue->pop(src);
   if (rv) {
      if(src.type=="open")
      if(src.type == SHM_SOCKET_OPEN) {
         _socket->remoteQueue = new SHMQueue<shm_msg_t>(src.port, 0);
      }
      if( _socket->is_server == 1 && remoteQueueMap->find(src.port) == remoteQueueMap->end()) {
       if(_socket->mod == REQ_REP)
           SemUtil::dec(_socket->slots);
        remoteQueueMap->insert({src.port,  new SHMQueue<shm_msg_t>(src.port, 0)});
        if(_socket->mod == REQ_REP)
           SemUtil::inc(_socket->items);
      }
      
      void * _buf = malloc(src.size);
      memcpy(_buf, src.buf, src.size);