wangzhengquan
2020-07-16 37db284a0c422fc28088d21defb46ed0c576d3c1
update
1个文件已修改
19 ■■■■■ 已修改文件
queue/socket.c 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/socket.c
@@ -30,7 +30,8 @@
    int port;
    shm_mod_t mod;
    SHMQueue<shm_msg_t> *queue;
    std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap;
    SHMQueue<shm_msg_t> *remoteQueue;
    // std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap;
    int slots;
    int items;
    int is_server;
@@ -114,26 +115,20 @@
}
static int __shm_rev(shm_socket_t* _socket, void **buf, int *size) {
static int __shm_rev__(shm_socket_t* _socket) {
    shm_msg_t src;
    
    std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap = _socket->remoteQueueMap;
    bool rv = _socket->queue->pop(src);
    if (rv) {
        if(src.type=="open")
        if(src.type == SHM_SOCKET_OPEN) {
            _socket->remoteQueue = new SHMQueue<shm_msg_t>(src.port, 0);
        }
        if( _socket->is_server == 1 && remoteQueueMap->find(src.port) == remoteQueueMap->end()) {
         if(_socket->mod == REQ_REP)
              SemUtil::dec(_socket->slots);
          remoteQueueMap->insert({src.port,  new SHMQueue<shm_msg_t>(src.port, 0)});
          if(_socket->mod == REQ_REP)
              SemUtil::inc(_socket->items);
        }
        
        void * _buf = malloc(src.size);
        memcpy(_buf, src.buf, src.size);