554325746@qq.com
2019-08-22 d1980f96745856299a784c5f43aa2ee5fd9fe8b5
提交 | 用户 | age
9a89af 1 package deliver
Z 2
3 import (
4     "errors"
5     "fmt"
6     "io"
7
fa89b4 8     "time"
5 9
9a89af 10     "github.com/tmthrgd/shm-go"
Z 11 )
12
13 // SHM share memory
14 type SHM struct {
93ff14 15     rw  *shm.ReadWriteCloser
Z 16     typ td
9a89af 17 }
Z 18
19 // Send impl interface Diliver
20 func (s *SHM) Send(data []byte) error {
c2bbe3 21     if s == nil || s.rw == nil {
9a89af 22         return errors.New("please init shm producer first")
Z 23     }
24
fa89b4 25     ch := make(chan int)
5 26     go func(){
27         n, _ := s.rw.Write(data)
28         ch <- n
29     }()
30     select{
31     case <-ch:
32         return nil
33     case <- time.After(3 * time.Second):
34         return errors.New("send time out")
9a89af 35     }
Z 36
fa89b4 37     return errors.New("send should't here")
5 38
39     // n, err := s.rw.Write(data)
40     // if n < 1 {
41     //     fmt.Println("recv data less than 1 length")
42     // }
43
44     // return err
9a89af 45 }
Z 46
47 // Recv impl interface Diliver
48 func (s *SHM) Recv() ([]byte, error) {
49
c2bbe3 50     if s == nil || s.rw == nil {
9a89af 51         return nil, errors.New("please open shm consumer first")
Z 52     }
53
fa89b4 54     ch := make(chan []byte)
5 55     go func(){
d1980f 56         var body []byte
fa89b4 57         data := make([]byte, maxRecvSize)
5 58         n, err := s.rw.Read(data)
59         if err == nil || err == io.EOF {
d1980f 60             body = make([]byte, n)
5 61             copy(body, data)
62             // data = data[:n:n]
fa89b4 63         }
5 64         ch <- data
65     }()
66     select{
67     case d := <-ch:
68         return d, nil
69     case <- time.After(3 * time.Second):
70         return nil, errors.New("recv time out")
9a89af 71     }
Z 72
fa89b4 73     return nil, errors.New("recv should't here")
5 74
75
76     // data := make([]byte, maxRecvSize)
77     // n, err := s.rw.Read(data)
78     // if err == nil || err == io.EOF {
79     //     data = data[:n:n]
80     //     return data, nil
81     // }
82
83     // return nil, err
9a89af 84 }
Z 85
86 // Close impl interface Deliver
87 func (s *SHM) Close() {
c2bbe3 88     if s == nil {
Z 89         return
90     }
9a89af 91     if s.rw != nil {
Z 92         s.rw.Close()
93     }
94     if s.typ == agent {
95         shm.Unlink(s.rw.Name())
96     }
97 }
98
c2bbe3 99 func shmServer(m Mode, url string, args ...interface{}) (*SHM, error) {
9a89af 100     if m != Shm {
c2bbe3 101         return nil, errors.New("please use deliver.Shm mode")
9a89af 102     }
Z 103
104     var param []int
105     for _, v := range args {
106         switch v.(type) {
107         case int:
108             param = append(param, v.(int))
109         default:
110
c2bbe3 111             return nil, errors.New("shmServer created recv error parameters")
9a89af 112         }
Z 113     }
3603e5 114
5 115     blocks, size := 2, maxRecvSize
116     if len(param) == 2 {
117         blocks, size = param[0], param[1]
118         // return nil, errors.New("shmServer created recv too much parameters")
9a89af 119     }
25d8c3 120     shm.Unlink(url)
Z 121
3603e5 122     rw, err := shm.CreateSimplex(url, 0644, blocks, size)
c2bbe3 123     if err == nil {
e3c917 124         fmt.Println(rw.Name())
9a89af 125         return &SHM{
Z 126             rw,
127             agent,
c2bbe3 128         }, nil
9a89af 129     }
Z 130
c2bbe3 131     return nil, err
9a89af 132 }
Z 133
c2bbe3 134 func shmClient(m Mode, url string, args ...interface{}) (*SHM, error) {
9a89af 135     if m != Shm {
c2bbe3 136         return nil, errors.New("please use deliver.Shm mode")
9a89af 137     }
Z 138
c2bbe3 139     rw, err := shm.OpenSimplex(url)
Z 140     if err == nil {
9a89af 141         return &SHM{
Z 142             rw,
143             coactee,
c2bbe3 144         }, nil
9a89af 145     }
c2bbe3 146     return nil, err
9a89af 147 }