| | |
| | | signal chan []byte |
| | | } |
| | | |
| | | func (s *SHM) routine(fn func([]byte)(int, error)){ |
| | | func (s *SHM) sendRoutine(){ |
| | | for{ |
| | | select{ |
| | | case d := <-s.signal: |
| | | n, err := fn(d) |
| | | n, err := s.rw.Write(d) |
| | | if err != nil{ |
| | | n = -1 |
| | | } |
| | |
| | | } |
| | | } |
| | | } |
| | | |
| | | func (s *SHM) client() error{ |
| | | |
| | | var err error |
| | | |
| | | if s.rw == nil{ |
| | | s.rw, err = shm.OpenSimplex(s.url) |
| | | } |
| | | |
| | | return err |
| | | } |
| | | |
| | | func (s *SHM) clearTimeout(){ |
| | | if s.typ == coactee{ |
| | | s.Close() |
| | | } |
| | | close(s.signal) |
| | | s.signal = nil |
| | | } |
| | | |
| | | func (s *SHM) initTimeout(fn func([]byte)(int, error)){ |
| | | if s.signal == nil{ |
| | | s.signal = make(chan []byte) |
| | | go s.routine(fn) |
| | | } |
| | | } |
| | | |
| | | // Send impl interface Diliver |
| | | func (s *SHM) Send(data []byte) error { |
| | | if s == nil { |
| | | return errors.New("please init shm producer first") |
| | | } |
| | | |
| | | if err := s.client(); err != nil{ |
| | | return err |
| | | if s.rw == nil{ |
| | | var err error |
| | | s.rw, err = shm.OpenSimplex(s.url) |
| | | if err != nil{ |
| | | return err |
| | | } |
| | | } |
| | | |
| | | s.initTimeout(s.rw.Write) |
| | | if s.signal == nil{ |
| | | s.signal = make(chan []byte) |
| | | go s.sendRoutine() |
| | | } |
| | | |
| | | s.signal <-data |
| | | |
| | |
| | | } |
| | | return errors.New("shm send error") |
| | | case <- time.After(3 * time.Second): |
| | | s.clearTimeout() |
| | | if s.typ == coactee{ |
| | | s.Close() |
| | | } |
| | | close(s.signal) |
| | | s.signal = nil |
| | | return errors.New("shm send timeout") |
| | | } |
| | | |
| | | return errors.New("send should't here") |
| | | } |
| | | |
| | | func (s *SHM) recvRoutine(){ |
| | | for{ |
| | | select{ |
| | | case <- s.signal: |
| | | n, err := s.rw.Read(s.data) |
| | | if err != nil{ |
| | | n = -1 |
| | | } |
| | | s.res <- n |
| | | } |
| | | } |
| | | } |
| | | // Recv impl interface Diliver |
| | | func (s *SHM) Recv() ([]byte, error) { |
| | | |
| | |
| | | return nil, errors.New("please open shm consumer first") |
| | | } |
| | | |
| | | if err := s.client(); err != nil{ |
| | | return nil, err |
| | | if s.rw == nil{ |
| | | var err error |
| | | s.rw, err = shm.OpenSimplex(s.url) |
| | | if err != nil{ |
| | | return nil, err |
| | | } |
| | | } |
| | | |
| | | if s.data == nil{ |
| | | |
| | | if s.signal == nil{ |
| | | s.signal = make(chan []byte) |
| | | s.data = make([]byte, maxRecvSize) |
| | | |
| | | go s.recvRoutine() |
| | | } |
| | | |
| | | s.initTimeout(s.rw.Read) |
| | | |
| | | s.signal <- s.data |
| | | s.signal <- []byte{} |
| | | |
| | | select{ |
| | | case d := <-s.res: |
| | |
| | | } |
| | | return nil, nil |
| | | case <- time.After(3 * time.Second): |
| | | s.clearTimeout() |
| | | if s.typ == coactee{ |
| | | s.Close() |
| | | } |
| | | close(s.signal) |
| | | s.signal = nil |
| | | |
| | | return nil, errors.New("recv time out") |
| | | } |