package deliver

import (
	"errors"
	// "fmt"
	// "io"
	"time"

	"github.com/tmthrgd/shm-go"
)

// shmIOTimeout bounds how long Send and Recv wait for the background worker
// to finish a single write or read.
const shmIOTimeout = 3 * time.Second

// SHM is a Deliver implementation backed by a shm-go simplex channel.
//
// The methods do not synchronize access to the fields below, so one SHM value
// must not be used from several goroutines at once. Send and Recv share the
// same worker slot (signal), so a given value should be used for one
// direction only.
type SHM struct {
	// rw is the underlying channel; nil until opened and again after Close.
	rw *shm.ReadWriteCloser

	// typ is agent for values built by shmServer and coactee for values
	// built by shmClient.
	typ td

	// url is the name handed to shm.CreateSimplex / shm.OpenSimplex / shm.Unlink.
	url string

	// res carries the outcome of one worker operation: the byte count, or -1
	// when the read or write returned an error. Each worker gets its own
	// buffered channel so an abandoned worker can never block on it or leak a
	// stale result into a later call.
	res chan int

	// data is the buffer the receive worker reads into.
	data []byte

	// signal hands work to the background worker; nil means no worker is
	// running.
	signal chan []byte
}

// sendRoutine is the write worker. It writes every slice received on the
// signal channel and reports the byte count (or -1 on error) on res. It exits
// once the signal channel is closed.
func (s *SHM) sendRoutine() {
	// Snapshot the fields once: Send and Close reset them later, and the
	// worker must keep using the channels it was started with.
	rw, signal, res := s.rw, s.signal, s.res
	for d := range signal {
		n, err := rw.Write(d)
		if err != nil {
			n = -1
		}
		res <- n
	}
}

// Send implements the Deliver interface. It writes data to the shared-memory
// channel, opening the channel and starting the write worker on first use.
//
// It returns nil when more than zero bytes were written, "shm send error"
// otherwise, and "shm send timeout" if the write does not finish within
// shmIOTimeout. After a timeout the worker is abandoned (and, for a coactee,
// the channel is closed) so the next call starts from a clean state. The
// abandoned worker may still be using data after Send has returned.
func (s *SHM) Send(data []byte) error {
	if s == nil {
		return errors.New("please init shm producer first")
	}
	if s.rw == nil {
		rw, err := shm.OpenSimplex(s.url)
		if err != nil {
			return err
		}
		s.rw = rw
	}
	if s.signal == nil {
		s.signal = make(chan []byte)
		s.res = make(chan int, 1)
		go s.sendRoutine()
	}

	s.signal <- data

	timer := time.NewTimer(shmIOTimeout)
	defer timer.Stop()

	select {
	case r := <-s.res:
		if r > 0 {
			return nil
		}
		return errors.New("shm send error")
	case <-timer.C:
		if s.typ == coactee {
			s.Close()
		}
		s.stopWorker()
		return errors.New("shm send timeout")
	}
}

// recvRoutine is the read worker. For every value received on the signal
// channel it performs one read into the receive buffer and reports the byte
// count (or -1 on error) on res. It exits once the signal channel is closed.
func (s *SHM) recvRoutine() {
	// Snapshot the fields once: Recv and Close reset them later, and the
	// worker must keep using the channels and buffer it was started with.
	rw, signal, res, buf := s.rw, s.signal, s.res, s.data
	for range signal {
		n, err := rw.Read(buf)
		if err != nil {
			n = -1
		}
		res <- n
	}
}

// Recv implements the Deliver interface. It reads one message of at most
// maxRecvSize bytes from the shared-memory channel, opening the channel and
// starting the read worker on first use.
//
// It returns a fresh copy of the bytes read, (nil, nil) when the read
// returned zero bytes, "shm recv error" when the read failed, and
// "recv time out" if the read does not finish within shmIOTimeout. After a
// timeout the worker is abandoned (and, for a coactee, the channel is closed)
// so the next call starts from a clean state.
func (s *SHM) Recv() ([]byte, error) {
	if s == nil {
		return nil, errors.New("please open shm consumer first")
	}
	if s.rw == nil {
		rw, err := shm.OpenSimplex(s.url)
		if err != nil {
			return nil, err
		}
		s.rw = rw
	}
	if s.signal == nil {
		s.signal = make(chan []byte)
		s.res = make(chan int, 1)
		s.data = make([]byte, maxRecvSize)
		go s.recvRoutine()
	}

	// The value is ignored by the worker; it only triggers one read.
	s.signal <- nil

	timer := time.NewTimer(shmIOTimeout)
	defer timer.Stop()

	select {
	case d := <-s.res:
		switch {
		case d > 0:
			// Copy out so the caller's slice is not overwritten by the
			// next read into the shared buffer.
			data := make([]byte, d)
			copy(data, s.data)
			return data, nil
		case d < 0:
			return nil, errors.New("shm recv error")
		}
		return nil, nil
	case <-timer.C:
		if s.typ == coactee {
			s.Close()
		}
		s.stopWorker()
		return nil, errors.New("recv time out")
	}
}

// stopWorker tells the background worker, if any, to exit by closing its
// signal channel, and clears the slot so the next Send/Recv starts a fresh
// worker. It is safe to call when no worker is running.
func (s *SHM) stopWorker() {
	if s.signal == nil {
		return
	}
	close(s.signal)
	s.signal = nil
}

// Close implements the Deliver interface. It stops the background worker,
// closes the underlying channel if it is open and, for an agent, unlinks the
// shared-memory name. A nil receiver is a no-op.
func (s *SHM) Close() {
	if s == nil {
		return
	}
	// The worker holds its own reference to rw, so it must be retired here;
	// otherwise a later Send/Recv would reopen rw but keep the old worker.
	s.stopWorker()
	if s.rw != nil {
		s.rw.Close()
	}
	if s.typ == agent {
		shm.Unlink(s.url)
	}
	s.rw = nil
}

// shmServer creates the shared-memory segment named url and returns an SHM of
// type agent that owns it.
//
// m must be Shm. args may hold exactly two ints, the block count and the
// block size; any non-int argument is rejected, and any other number of
// arguments silently falls back to the defaults (2 blocks of maxRecvSize).
// Any existing segment with the same name is unlinked first.
func shmServer(m Mode, url string, args ...interface{}) (*SHM, error) {
	if m != Shm {
		return nil, errors.New("please use deliver.Shm mode")
	}

	param := make([]int, 0, len(args))
	for _, v := range args {
		n, ok := v.(int)
		if !ok {
			return nil, errors.New("shmServer created recv error parameters")
		}
		param = append(param, n)
	}

	blocks, size := 2, maxRecvSize
	if len(param) == 2 {
		blocks, size = param[0], param[1]
	}

	// Best effort: the segment may simply not exist yet.
	shm.Unlink(url)
	// NOTE(review): presumably gives peers time to let go of the old
	// segment before it is recreated — confirm the delay is still needed.
	time.Sleep(time.Second)

	rw, err := shm.CreateSimplex(url, 0644, blocks, size)
	if err != nil {
		return nil, err
	}
	return &SHM{
		rw:  rw,
		typ: agent,
		url: url,
	}, nil
}

// shmClient returns an SHM of type coactee for the segment named url. The
// segment is not opened here; Send or Recv opens it on first use. m must be
// Shm; args is accepted for signature parity with shmServer and is unused.
func shmClient(m Mode, url string, args ...interface{}) (*SHM, error) {
	if m != Shm {
		return nil, errors.New("please use deliver.Shm mode")
	}
	return &SHM{
		typ: coactee,
		url: url,
	}, nil
}