zhangmeng
2019-08-26 0ac4c151e08a9735d085c08ca5bcd9b50944d650
提交 | 用户 | age
9a89af 1 package deliver
Z 2
3 import (
4     "errors"
5     "fmt"
fb46ee 6     "io"
512af9 7     "time"
9a89af 8
b8c958 9     "basic.com/valib/shm.git"
9a89af 10 )
Z 11
12 // SHM share memory
13 type SHM struct {
93ff14 14     rw  *shm.ReadWriteCloser
Z 15     typ td
0ac4c1 16
Z 17     recvData []byte
9a89af 18 }
Z 19
20 // Send impl interface Diliver
21 func (s *SHM) Send(data []byte) error {
c2bbe3 22     if s == nil || s.rw == nil {
9a89af 23         return errors.New("please init shm producer first")
Z 24     }
25
fb46ee 26     n, err := s.rw.Write(data)
Z 27     if n < 1 {
28         fmt.Println("recv data less than 1 length")
9a89af 29     }
Z 30
fb46ee 31     return err
9a89af 32 }
Z 33
34 // Recv impl interface Diliver
35 func (s *SHM) Recv() ([]byte, error) {
36
c2bbe3 37     if s == nil || s.rw == nil {
9a89af 38         return nil, errors.New("please open shm consumer first")
Z 39     }
40
0ac4c1 41     if s.recvData == nil {
Z 42         s.recvData = make([]byte, maxRecvSize)
43     }
44     n, err := s.rw.Read(s.recvData)
fb46ee 45     if err == nil || err == io.EOF {
0ac4c1 46         data := make([]byte, n)
Z 47         copy(data, s.recvData)
fb46ee 48         return data, nil
9a89af 49     }
Z 50
fb46ee 51     return nil, err
9a89af 52 }
Z 53
54 // Close impl interface Deliver
55 func (s *SHM) Close() {
c2bbe3 56     if s == nil {
Z 57         return
58     }
9a89af 59     if s.rw != nil {
Z 60         s.rw.Close()
61     }
62     if s.typ == agent {
63         shm.Unlink(s.rw.Name())
64     }
65 }
66
c2bbe3 67 func shmServer(m Mode, url string, args ...interface{}) (*SHM, error) {
9a89af 68     if m != Shm {
c2bbe3 69         return nil, errors.New("please use deliver.Shm mode")
9a89af 70     }
Z 71
72     var param []int
73     for _, v := range args {
74         switch v.(type) {
75         case int:
76             param = append(param, v.(int))
77         default:
78
c2bbe3 79             return nil, errors.New("shmServer created recv error parameters")
9a89af 80         }
Z 81     }
fb46ee 82
512af9 83     blocks, size := 2, maxRecvSize
Z 84     if len(param) == 2 {
85         blocks, size = param[0], param[1]
86         // return nil, errors.New("shmServer created recv too much parameters")
87     }
88
89     time.Sleep(time.Millisecond)
25d8c3 90     shm.Unlink(url)
Z 91
512af9 92     rw, err := shm.CreateSimplex(url, 0644, blocks, size)
c2bbe3 93     if err == nil {
e3c917 94         fmt.Println(rw.Name())
9a89af 95         return &SHM{
0ac4c1 96             rw:       rw,
Z 97             typ:      agent,
98             recvData: nil,
c2bbe3 99         }, nil
9a89af 100     }
Z 101
c2bbe3 102     return nil, err
9a89af 103 }
Z 104
c2bbe3 105 func shmClient(m Mode, url string, args ...interface{}) (*SHM, error) {
9a89af 106     if m != Shm {
c2bbe3 107         return nil, errors.New("please use deliver.Shm mode")
9a89af 108     }
Z 109
c2bbe3 110     rw, err := shm.OpenSimplex(url)
Z 111     if err == nil {
9a89af 112         return &SHM{
0ac4c1 113             rw:       rw,
Z 114             typ:      coactee,
115             recvData: nil,
c2bbe3 116         }, nil
9a89af 117     }
c2bbe3 118     return nil, err
9a89af 119 }