554325746@qq.com
2019-08-22 43e52a52e24ad4fd00d351b21259b9a76affbaed
提交 | 用户 | age
9a89af 1 package deliver
Z 2
3 import (
4     "errors"
5     "fmt"
6     "io"
7
fa89b4 8     "time"
5 9
9a89af 10     "github.com/tmthrgd/shm-go"
Z 11 )
12
13 // SHM share memory
14 type SHM struct {
93ff14 15     rw  *shm.ReadWriteCloser
Z 16     typ td
43e52a 17
5 18     data []byte
9a89af 19 }
Z 20
21 // Send impl interface Diliver
22 func (s *SHM) Send(data []byte) error {
c2bbe3 23     if s == nil || s.rw == nil {
9a89af 24         return errors.New("please init shm producer first")
Z 25     }
26
fa89b4 27     ch := make(chan int)
5 28     go func(){
29         n, _ := s.rw.Write(data)
30         ch <- n
31     }()
32     select{
33     case <-ch:
34         return nil
35     case <- time.After(3 * time.Second):
36         return errors.New("send time out")
9a89af 37     }
Z 38
fa89b4 39     return errors.New("send should't here")
5 40
41     // n, err := s.rw.Write(data)
42     // if n < 1 {
43     //     fmt.Println("recv data less than 1 length")
44     // }
45
46     // return err
9a89af 47 }
Z 48
49 // Recv impl interface Diliver
50 func (s *SHM) Recv() ([]byte, error) {
51
c2bbe3 52     if s == nil || s.rw == nil {
9a89af 53         return nil, errors.New("please open shm consumer first")
Z 54     }
55
fa89b4 56     ch := make(chan []byte)
5 57     go func(){
43e52a 58         n, err := s.rw.Read(s.data)
fa89b4 59         if err == nil || err == io.EOF {
43e52a 60             s.data = s.data[:n:n]
fa89b4 61         }
43e52a 62         ch <- s.data
fa89b4 63     }()
5 64     select{
65     case d := <-ch:
66         return d, nil
67     case <- time.After(3 * time.Second):
68         return nil, errors.New("recv time out")
9a89af 69     }
Z 70
fa89b4 71     return nil, errors.New("recv should't here")
5 72
73
74     // data := make([]byte, maxRecvSize)
75     // n, err := s.rw.Read(data)
76     // if err == nil || err == io.EOF {
77     //     data = data[:n:n]
78     //     return data, nil
79     // }
80
81     // return nil, err
9a89af 82 }
Z 83
84 // Close impl interface Deliver
85 func (s *SHM) Close() {
c2bbe3 86     if s == nil {
Z 87         return
88     }
9a89af 89     if s.rw != nil {
Z 90         s.rw.Close()
91     }
92     if s.typ == agent {
93         shm.Unlink(s.rw.Name())
94     }
95 }
96
c2bbe3 97 func shmServer(m Mode, url string, args ...interface{}) (*SHM, error) {
9a89af 98     if m != Shm {
c2bbe3 99         return nil, errors.New("please use deliver.Shm mode")
9a89af 100     }
Z 101
102     var param []int
103     for _, v := range args {
104         switch v.(type) {
105         case int:
106             param = append(param, v.(int))
107         default:
108
c2bbe3 109             return nil, errors.New("shmServer created recv error parameters")
9a89af 110         }
Z 111     }
3603e5 112
5 113     blocks, size := 2, maxRecvSize
114     if len(param) == 2 {
115         blocks, size = param[0], param[1]
116         // return nil, errors.New("shmServer created recv too much parameters")
9a89af 117     }
25d8c3 118     shm.Unlink(url)
Z 119
3603e5 120     rw, err := shm.CreateSimplex(url, 0644, blocks, size)
c2bbe3 121     if err == nil {
e3c917 122         fmt.Println(rw.Name())
9a89af 123         return &SHM{
Z 124             rw,
125             agent,
43e52a 126             make([]byte, maxRecvSize),
c2bbe3 127         }, nil
9a89af 128     }
Z 129
c2bbe3 130     return nil, err
9a89af 131 }
Z 132
c2bbe3 133 func shmClient(m Mode, url string, args ...interface{}) (*SHM, error) {
9a89af 134     if m != Shm {
c2bbe3 135         return nil, errors.New("please use deliver.Shm mode")
9a89af 136     }
Z 137
c2bbe3 138     rw, err := shm.OpenSimplex(url)
Z 139     if err == nil {
9a89af 140         return &SHM{
Z 141             rw,
142             coactee,
43e52a 143             make([]byte, maxRecvSize),
c2bbe3 144         }, nil
9a89af 145     }
c2bbe3 146     return nil, err
9a89af 147 }