package deliver import ( "errors" "fmt" "io" "time" "github.com/tmthrgd/shm-go" ) // SHM share memory type SHM struct { rw *shm.ReadWriteCloser typ td data []byte } // Send impl interface Diliver func (s *SHM) Send(data []byte) error { if s == nil || s.rw == nil { return errors.New("please init shm producer first") } ch := make(chan int) go func(){ n, _ := s.rw.Write(data) ch <- n }() select{ case <-ch: return nil case <- time.After(3 * time.Second): return errors.New("send time out") } return errors.New("send should't here") // n, err := s.rw.Write(data) // if n < 1 { // fmt.Println("recv data less than 1 length") // } // return err } // Recv impl interface Diliver func (s *SHM) Recv() ([]byte, error) { if s == nil || s.rw == nil { return nil, errors.New("please open shm consumer first") } ch := make(chan []byte) go func(){ n, err := s.rw.Read(s.data) if err == nil || err == io.EOF { s.data = s.data[:n:n] } ch <- s.data }() select{ case d := <-ch: return d, nil case <- time.After(3 * time.Second): return nil, errors.New("recv time out") } return nil, errors.New("recv should't here") // data := make([]byte, maxRecvSize) // n, err := s.rw.Read(data) // if err == nil || err == io.EOF { // data = data[:n:n] // return data, nil // } // return nil, err } // Close impl interface Deliver func (s *SHM) Close() { if s == nil { return } if s.rw != nil { s.rw.Close() } if s.typ == agent { shm.Unlink(s.rw.Name()) } } func shmServer(m Mode, url string, args ...interface{}) (*SHM, error) { if m != Shm { return nil, errors.New("please use deliver.Shm mode") } var param []int for _, v := range args { switch v.(type) { case int: param = append(param, v.(int)) default: return nil, errors.New("shmServer created recv error parameters") } } blocks, size := 2, maxRecvSize if len(param) == 2 { blocks, size = param[0], param[1] // return nil, errors.New("shmServer created recv too much parameters") } shm.Unlink(url) rw, err := shm.CreateSimplex(url, 0644, blocks, size) if err == nil { fmt.Println(rw.Name()) return &SHM{ rw, agent, make([]byte, maxRecvSize), }, nil } return nil, err } func shmClient(m Mode, url string, args ...interface{}) (*SHM, error) { if m != Shm { return nil, errors.New("please use deliver.Shm mode") } rw, err := shm.OpenSimplex(url) if err == nil { return &SHM{ rw, coactee, make([]byte, maxRecvSize), }, nil } return nil, err }