zhangmeng
2019-08-26 7385a596ec1a6831474d5815d3d6c614c667f270
提交 | 用户 | age
9a89af 1 package deliver
Z 2
3 import (
4     "errors"
5     "fmt"
be392a 6     // "io"
9a89af 7
fa89b4 8     "time"
9a89af 9
b8c958 10     "basic.com/valib/shm.git"
9a89af 11 )
Z 12
13 // SHM share memory
14 type SHM struct {
93ff14 15     rw  *shm.ReadWriteCloser
Z 16     typ td
be392a 17
5 18     data []byte
9a89af 19 }
Z 20
21 // Send impl interface Diliver
22 func (s *SHM) Send(data []byte) error {
c2bbe3 23     if s == nil || s.rw == nil {
9a89af 24         return errors.New("please init shm producer first")
Z 25     }
26
fa89b4 27     ch := make(chan int)
5 28     go func(){
29         n, _ := s.rw.Write(data)
30         ch <- n
31     }()
32     select{
33     case <-ch:
34         return nil
35     case <- time.After(3 * time.Second):
36         return errors.New("send time out")
9a89af 37     }
Z 38
fa89b4 39     return errors.New("send should't here")
5 40
41     // n, err := s.rw.Write(data)
42     // if n < 1 {
43     //     fmt.Println("recv data less than 1 length")
44     // }
45
46     // return err
9a89af 47 }
Z 48
49 // Recv impl interface Diliver
50 func (s *SHM) Recv() ([]byte, error) {
51
c2bbe3 52     if s == nil || s.rw == nil {
9a89af 53         return nil, errors.New("please open shm consumer first")
Z 54     }
55
be392a 56     ch := make(chan int)
fa89b4 57     go func(){
be392a 58         n, _ := s.rw.Read(s.data)
5 59         ch <- n
fa89b4 60     }()
5 61     select{
62     case d := <-ch:
be392a 63         if d > 0{
5 64             data := make([]byte, d)
65             copy(data, s.data)
66             return data, nil
67         }
fa89b4 68     case <- time.After(3 * time.Second):
5 69         return nil, errors.New("recv time out")
9a89af 70     }
Z 71
fa89b4 72     return nil, errors.New("recv should't here")
5 73
74
75     // data := make([]byte, maxRecvSize)
76     // n, err := s.rw.Read(data)
77     // if err == nil || err == io.EOF {
78     //     data = data[:n:n]
79     //     return data, nil
80     // }
81
82     // return nil, err
9a89af 83 }
Z 84
85 // Close impl interface Deliver
86 func (s *SHM) Close() {
c2bbe3 87     if s == nil {
Z 88         return
89     }
9a89af 90     if s.rw != nil {
Z 91         s.rw.Close()
92     }
93     if s.typ == agent {
94         shm.Unlink(s.rw.Name())
95     }
96 }
97
c2bbe3 98 func shmServer(m Mode, url string, args ...interface{}) (*SHM, error) {
9a89af 99     if m != Shm {
c2bbe3 100         return nil, errors.New("please use deliver.Shm mode")
9a89af 101     }
Z 102
103     var param []int
104     for _, v := range args {
105         switch v.(type) {
106         case int:
107             param = append(param, v.(int))
108         default:
109
c2bbe3 110             return nil, errors.New("shmServer created recv error parameters")
9a89af 111         }
Z 112     }
c2bbe3 113
3603e5 114     blocks, size := 2, maxRecvSize
5 115     if len(param) == 2 {
116         blocks, size = param[0], param[1]
117         // return nil, errors.New("shmServer created recv too much parameters")
9a89af 118     }
25d8c3 119     shm.Unlink(url)
Z 120
3603e5 121     rw, err := shm.CreateSimplex(url, 0644, blocks, size)
c2bbe3 122     if err == nil {
e3c917 123         fmt.Println(rw.Name())
9a89af 124         return &SHM{
Z 125             rw,
126             agent,
be392a 127             make([]byte, maxRecvSize),
c2bbe3 128         }, nil
9a89af 129     }
Z 130
c2bbe3 131     return nil, err
9a89af 132 }
Z 133
c2bbe3 134 func shmClient(m Mode, url string, args ...interface{}) (*SHM, error) {
9a89af 135     if m != Shm {
c2bbe3 136         return nil, errors.New("please use deliver.Shm mode")
9a89af 137     }
Z 138
c2bbe3 139     rw, err := shm.OpenSimplex(url)
Z 140     if err == nil {
9a89af 141         return &SHM{
Z 142             rw,
143             coactee,
be392a 144             make([]byte, maxRecvSize),
c2bbe3 145         }, nil
9a89af 146     }
c2bbe3 147     return nil, err
9a89af 148 }