554325746@qq.com
2019-08-22 0c22e1e1b5c77fa5d09600200239bd3a0907fc78
提交 | 用户 | age
9a89af 1 package deliver
Z 2
3 import (
4     "errors"
5     "fmt"
6     "io"
7
fa89b4 8     "time"
5 9
9a89af 10     "github.com/tmthrgd/shm-go"
Z 11 )
12
13 // SHM share memory
14 type SHM struct {
93ff14 15     rw  *shm.ReadWriteCloser
Z 16     typ td
43e52a 17
5 18     data []byte
9a89af 19 }
Z 20
21 // Send impl interface Diliver
22 func (s *SHM) Send(data []byte) error {
c2bbe3 23     if s == nil || s.rw == nil {
9a89af 24         return errors.New("please init shm producer first")
Z 25     }
26
fa89b4 27     ch := make(chan int)
5 28     go func(){
29         n, _ := s.rw.Write(data)
30         ch <- n
31     }()
32     select{
33     case <-ch:
34         return nil
35     case <- time.After(3 * time.Second):
36         return errors.New("send time out")
9a89af 37     }
Z 38
fa89b4 39     return errors.New("send should't here")
5 40
41     // n, err := s.rw.Write(data)
42     // if n < 1 {
43     //     fmt.Println("recv data less than 1 length")
44     // }
45
46     // return err
9a89af 47 }
Z 48
49 // Recv impl interface Diliver
50 func (s *SHM) Recv() ([]byte, error) {
51
c2bbe3 52     if s == nil || s.rw == nil {
9a89af 53         return nil, errors.New("please open shm consumer first")
Z 54     }
55
fa89b4 56     ch := make(chan []byte)
5 57     go func(){
0c22e1 58         var data []byte
43e52a 59         n, err := s.rw.Read(s.data)
fa89b4 60         if err == nil || err == io.EOF {
0c22e1 61             data = make([]byte, n)
5 62             copy(data, s.data)
63             // s.data = s.data[:n:n]
fa89b4 64         }
43e52a 65         ch <- s.data
fa89b4 66     }()
5 67     select{
68     case d := <-ch:
69         return d, nil
70     case <- time.After(3 * time.Second):
71         return nil, errors.New("recv time out")
9a89af 72     }
Z 73
fa89b4 74     return nil, errors.New("recv should't here")
5 75
76
77     // data := make([]byte, maxRecvSize)
78     // n, err := s.rw.Read(data)
79     // if err == nil || err == io.EOF {
80     //     data = data[:n:n]
81     //     return data, nil
82     // }
83
84     // return nil, err
9a89af 85 }
Z 86
87 // Close impl interface Deliver
88 func (s *SHM) Close() {
c2bbe3 89     if s == nil {
Z 90         return
91     }
9a89af 92     if s.rw != nil {
Z 93         s.rw.Close()
94     }
95     if s.typ == agent {
96         shm.Unlink(s.rw.Name())
97     }
98 }
99
c2bbe3 100 func shmServer(m Mode, url string, args ...interface{}) (*SHM, error) {
9a89af 101     if m != Shm {
c2bbe3 102         return nil, errors.New("please use deliver.Shm mode")
9a89af 103     }
Z 104
105     var param []int
106     for _, v := range args {
107         switch v.(type) {
108         case int:
109             param = append(param, v.(int))
110         default:
111
c2bbe3 112             return nil, errors.New("shmServer created recv error parameters")
9a89af 113         }
Z 114     }
3603e5 115
5 116     blocks, size := 2, maxRecvSize
117     if len(param) == 2 {
118         blocks, size = param[0], param[1]
119         // return nil, errors.New("shmServer created recv too much parameters")
9a89af 120     }
25d8c3 121     shm.Unlink(url)
Z 122
3603e5 123     rw, err := shm.CreateSimplex(url, 0644, blocks, size)
c2bbe3 124     if err == nil {
e3c917 125         fmt.Println(rw.Name())
9a89af 126         return &SHM{
Z 127             rw,
128             agent,
43e52a 129             make([]byte, maxRecvSize),
c2bbe3 130         }, nil
9a89af 131     }
Z 132
c2bbe3 133     return nil, err
9a89af 134 }
Z 135
c2bbe3 136 func shmClient(m Mode, url string, args ...interface{}) (*SHM, error) {
9a89af 137     if m != Shm {
c2bbe3 138         return nil, errors.New("please use deliver.Shm mode")
9a89af 139     }
Z 140
c2bbe3 141     rw, err := shm.OpenSimplex(url)
Z 142     if err == nil {
9a89af 143         return &SHM{
Z 144             rw,
145             coactee,
43e52a 146             make([]byte, maxRecvSize),
c2bbe3 147         }, nil
9a89af 148     }
c2bbe3 149     return nil, err
9a89af 150 }