554325746@qq.com
2019-08-23 93f640f882412de099ea62743a28798094bdcd70
bug fix
1个文件已修改
87 ■■■■ 已修改文件
shm.go 87 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
shm.go
@@ -25,11 +25,11 @@
    signal chan []byte
}
func (s *SHM) routine(fn func([]byte)(int, error)){
func (s *SHM) sendRoutine(){
    for{
        select{
        case d := <-s.signal:
            n, err := fn(d)
            n, err := s.rw.Write(d)
            if err != nil{
                n = -1
            }
@@ -37,44 +37,24 @@
        }
    }
}
func (s *SHM) client() error{
    var err error
    if s.rw == nil{
        s.rw, err = shm.OpenSimplex(s.url)
    }
    return err
}
func (s *SHM) clearTimeout(){
    if s.typ == coactee{
        s.Close()
    }
    close(s.signal)
    s.signal = nil
}
func (s *SHM) initTimeout(fn func([]byte)(int, error)){
    if s.signal == nil{
        s.signal = make(chan []byte)
        go s.routine(fn)
    }
}
// Send impl interface Diliver
func (s *SHM) Send(data []byte) error {
    if s == nil {
        return errors.New("please init shm producer first")
    }
    if err := s.client(); err != nil{
        return err
    if s.rw == nil{
        var err error
        s.rw, err = shm.OpenSimplex(s.url)
        if err != nil{
            return err
        }
    }
    s.initTimeout(s.rw.Write)
    if s.signal == nil{
        s.signal = make(chan []byte)
        go s.sendRoutine()
    }
    s.signal <-data
@@ -85,13 +65,29 @@
        }
        return errors.New("shm send error")
    case <- time.After(3 * time.Second):
        s.clearTimeout()
        if s.typ == coactee{
            s.Close()
        }
        close(s.signal)
        s.signal = nil
        return errors.New("shm send timeout")
    }
    return errors.New("send should't here")
}
func (s *SHM) recvRoutine(){
    for{
        select{
        case <- s.signal:
            n, err := s.rw.Read(s.data)
            if err != nil{
                n = -1
            }
            s.res <- n
        }
    }
}
// Recv impl interface Diliver
func (s *SHM) Recv() ([]byte, error) {
@@ -99,17 +95,22 @@
        return nil, errors.New("please open shm consumer first")
    }
    if err := s.client(); err != nil{
        return nil, err
    if s.rw == nil{
        var err error
        s.rw, err = shm.OpenSimplex(s.url)
        if err != nil{
            return nil, err
        }
    }
    if s.data == nil{
    if s.signal == nil{
        s.signal = make(chan []byte)
        s.data = make([]byte, maxRecvSize)
        go s.recvRoutine()
    }
    
    s.initTimeout(s.rw.Read)
    s.signal <- s.data
    s.signal <- []byte{}
    select{
    case d := <-s.res:
@@ -120,7 +121,11 @@
        }
        return nil, nil
    case <- time.After(3 * time.Second):
        s.clearTimeout()
        if s.typ == coactee{
            s.Close()
        }
        close(s.signal)
        s.signal = nil
        return nil, errors.New("recv time out")
    }