From 93f640f882412de099ea62743a28798094bdcd70 Mon Sep 17 00:00:00 2001 From: 554325746@qq.com <554325746@qq.com> Date: 星期五, 23 八月 2019 16:43:30 +0800 Subject: [PATCH] bug fix --- shm.go | 87 +++++++++++++++++++++++-------------------- 1 files changed, 46 insertions(+), 41 deletions(-) diff --git a/shm.go b/shm.go index f5fe895..47008b3 100644 --- a/shm.go +++ b/shm.go @@ -25,11 +25,11 @@ signal chan []byte } -func (s *SHM) routine(fn func([]byte)(int, error)){ +func (s *SHM) sendRoutine(){ for{ select{ case d := <-s.signal: - n, err := fn(d) + n, err := s.rw.Write(d) if err != nil{ n = -1 } @@ -37,44 +37,24 @@ } } } - -func (s *SHM) client() error{ - - var err error - - if s.rw == nil{ - s.rw, err = shm.OpenSimplex(s.url) - } - - return err -} - -func (s *SHM) clearTimeout(){ - if s.typ == coactee{ - s.Close() - } - close(s.signal) - s.signal = nil -} - -func (s *SHM) initTimeout(fn func([]byte)(int, error)){ - if s.signal == nil{ - s.signal = make(chan []byte) - go s.routine(fn) - } -} - // Send impl interface Diliver func (s *SHM) Send(data []byte) error { if s == nil { return errors.New("please init shm producer first") } - if err := s.client(); err != nil{ - return err + if s.rw == nil{ + var err error + s.rw, err = shm.OpenSimplex(s.url) + if err != nil{ + return err + } } - s.initTimeout(s.rw.Write) + if s.signal == nil{ + s.signal = make(chan []byte) + go s.sendRoutine() + } s.signal <-data @@ -85,13 +65,29 @@ } return errors.New("shm send error") case <- time.After(3 * time.Second): - s.clearTimeout() + if s.typ == coactee{ + s.Close() + } + close(s.signal) + s.signal = nil return errors.New("shm send timeout") } return errors.New("send should't here") } +func (s *SHM) recvRoutine(){ + for{ + select{ + case <- s.signal: + n, err := s.rw.Read(s.data) + if err != nil{ + n = -1 + } + s.res <- n + } + } +} // Recv impl interface Diliver func (s *SHM) Recv() ([]byte, error) { @@ -99,17 +95,22 @@ return nil, errors.New("please open shm consumer first") } - if err := s.client(); err != nil{ - return nil, err + if s.rw == nil{ + var err error + s.rw, err = shm.OpenSimplex(s.url) + if err != nil{ + return nil, err + } } - - if s.data == nil{ + + if s.signal == nil{ + s.signal = make(chan []byte) s.data = make([]byte, maxRecvSize) + + go s.recvRoutine() } - s.initTimeout(s.rw.Read) - - s.signal <- s.data + s.signal <- []byte{} select{ case d := <-s.res: @@ -120,7 +121,11 @@ } return nil, nil case <- time.After(3 * time.Second): - s.clearTimeout() + if s.typ == coactee{ + s.Close() + } + close(s.signal) + s.signal = nil return nil, errors.New("recv time out") } -- Gitblit v1.8.0