554325746@qq.com
2019-08-21 0af498fd328f89961f9358ff25215c247c8f89d3
提交 | 用户 | age
9a89af 1 package deliver
Z 2
3 import (
4     "errors"
5     "fmt"
6     "io"
7
fa89b4 8     "time"
5 9
9a89af 10     "github.com/tmthrgd/shm-go"
Z 11 )
12
13 // SHM share memory
14 type SHM struct {
93ff14 15     rw  *shm.ReadWriteCloser
Z 16     typ td
9a89af 17 }
Z 18
19 // Send impl interface Diliver
20 func (s *SHM) Send(data []byte) error {
c2bbe3 21     if s == nil || s.rw == nil {
9a89af 22         return errors.New("please init shm producer first")
Z 23     }
24
fa89b4 25     ch := make(chan int)
5 26     go func(){
27         n, _ := s.rw.Write(data)
28         ch <- n
29     }()
30     select{
31     case <-ch:
32         return nil
33     case <- time.After(3 * time.Second):
34         return errors.New("send time out")
9a89af 35     }
Z 36
fa89b4 37     return errors.New("send should't here")
5 38
39     // n, err := s.rw.Write(data)
40     // if n < 1 {
41     //     fmt.Println("recv data less than 1 length")
42     // }
43
44     // return err
9a89af 45 }
Z 46
47 // Recv impl interface Diliver
48 func (s *SHM) Recv() ([]byte, error) {
49
c2bbe3 50     if s == nil || s.rw == nil {
9a89af 51         return nil, errors.New("please open shm consumer first")
Z 52     }
53
fa89b4 54     ch := make(chan []byte)
5 55     go func(){
56         data := make([]byte, maxRecvSize)
57         n, err := s.rw.Read(data)
58         if err == nil || err == io.EOF {
59             data = data[:n:n]
60         }
61         ch <- data
62     }()
63     select{
64     case d := <-ch:
65         return d, nil
66     case <- time.After(3 * time.Second):
67         return nil, errors.New("recv time out")
9a89af 68     }
Z 69
fa89b4 70     return nil, errors.New("recv should't here")
5 71
72
73     // data := make([]byte, maxRecvSize)
74     // n, err := s.rw.Read(data)
75     // if err == nil || err == io.EOF {
76     //     data = data[:n:n]
77     //     return data, nil
78     // }
79
80     // return nil, err
9a89af 81 }
Z 82
83 // Close impl interface Deliver
84 func (s *SHM) Close() {
c2bbe3 85     if s == nil {
Z 86         return
87     }
9a89af 88     if s.rw != nil {
Z 89         s.rw.Close()
90     }
91     if s.typ == agent {
92         shm.Unlink(s.rw.Name())
93     }
94 }
95
c2bbe3 96 func shmServer(m Mode, url string, args ...interface{}) (*SHM, error) {
9a89af 97     if m != Shm {
c2bbe3 98         return nil, errors.New("please use deliver.Shm mode")
9a89af 99     }
Z 100
101     var param []int
102     for _, v := range args {
103         switch v.(type) {
104         case int:
105             param = append(param, v.(int))
106         default:
107
c2bbe3 108             return nil, errors.New("shmServer created recv error parameters")
9a89af 109         }
Z 110     }
3603e5 111
5 112     blocks, size := 2, maxRecvSize
113     if len(param) == 2 {
114         blocks, size = param[0], param[1]
115         // return nil, errors.New("shmServer created recv too much parameters")
9a89af 116     }
25d8c3 117     shm.Unlink(url)
Z 118
3603e5 119     rw, err := shm.CreateSimplex(url, 0644, blocks, size)
c2bbe3 120     if err == nil {
e3c917 121         fmt.Println(rw.Name())
9a89af 122         return &SHM{
Z 123             rw,
124             agent,
c2bbe3 125         }, nil
9a89af 126     }
Z 127
c2bbe3 128     return nil, err
9a89af 129 }
Z 130
c2bbe3 131 func shmClient(m Mode, url string, args ...interface{}) (*SHM, error) {
9a89af 132     if m != Shm {
c2bbe3 133         return nil, errors.New("please use deliver.Shm mode")
9a89af 134     }
Z 135
c2bbe3 136     rw, err := shm.OpenSimplex(url)
Z 137     if err == nil {
9a89af 138         return &SHM{
Z 139             rw,
140             coactee,
c2bbe3 141         }, nil
9a89af 142     }
c2bbe3 143     return nil, err
9a89af 144 }