554325746@qq.com
2019-08-21 bf6ffa3d11933e397d2135a97dfd73c624b4f864
提交 | 用户 | age
9a89af 1 package deliver
Z 2
3 import (
4     "errors"
5     "fmt"
6     "io"
7
fa89b4 8     "time"
5 9
9a89af 10     "github.com/tmthrgd/shm-go"
Z 11 )
12
13 // SHM share memory
14 type SHM struct {
93ff14 15     rw  *shm.ReadWriteCloser
Z 16     typ td
9a89af 17 }
Z 18
19 // Send impl interface Diliver
20 func (s *SHM) Send(data []byte) error {
c2bbe3 21     if s == nil || s.rw == nil {
9a89af 22         return errors.New("please init shm producer first")
Z 23     }
24
fa89b4 25     ch := make(chan int)
5 26     go func(){
27         n, _ := s.rw.Write(data)
28         ch <- n
29     }()
30     select{
31     case <-ch:
32         return nil
33     case <- time.After(3 * time.Second):
34         return errors.New("send time out")
9a89af 35     }
Z 36
fa89b4 37     return errors.New("send should't here")
5 38
39     // n, err := s.rw.Write(data)
40     // if n < 1 {
41     //     fmt.Println("recv data less than 1 length")
42     // }
43
44     // return err
9a89af 45 }
Z 46
47 // Recv impl interface Diliver
48 func (s *SHM) Recv() ([]byte, error) {
49
c2bbe3 50     if s == nil || s.rw == nil {
9a89af 51         return nil, errors.New("please open shm consumer first")
Z 52     }
53
fa89b4 54     ch := make(chan []byte)
5 55     go func(){
56         data := make([]byte, maxRecvSize)
57         n, err := s.rw.Read(data)
58         if err == nil || err == io.EOF {
59             data = data[:n:n]
60         }
61         ch <- data
62     }()
63     select{
64     case d := <-ch:
65         return d, nil
66     case <- time.After(3 * time.Second):
67         return nil, errors.New("recv time out")
9a89af 68     }
Z 69
fa89b4 70     return nil, errors.New("recv should't here")
5 71
72
73     // data := make([]byte, maxRecvSize)
74     // n, err := s.rw.Read(data)
75     // if err == nil || err == io.EOF {
76     //     data = data[:n:n]
77     //     return data, nil
78     // }
79
80     // return nil, err
9a89af 81 }
Z 82
83 // Close impl interface Deliver
84 func (s *SHM) Close() {
c2bbe3 85     if s == nil {
Z 86         return
87     }
9a89af 88     if s.rw != nil {
Z 89         s.rw.Close()
90     }
91     if s.typ == agent {
92         shm.Unlink(s.rw.Name())
93     }
94 }
95
c2bbe3 96 func shmServer(m Mode, url string, args ...interface{}) (*SHM, error) {
9a89af 97     if m != Shm {
c2bbe3 98         return nil, errors.New("please use deliver.Shm mode")
9a89af 99     }
Z 100
101     var param []int
102     for _, v := range args {
103         switch v.(type) {
104         case int:
105             param = append(param, v.(int))
106         default:
107
c2bbe3 108             return nil, errors.New("shmServer created recv error parameters")
9a89af 109         }
Z 110     }
bf6ffa 111     blocks, size := 2, maxRecvSize
9a89af 112     if len(param) != 2 {
c2bbe3 113         return nil, errors.New("shmServer created recv too much parameters")
9a89af 114     }
c2bbe3 115
bf6ffa 116     blocks, size = param[0], param[1]
5 117
25d8c3 118     shm.Unlink(url)
Z 119
bf6ffa 120     rw, err := shm.CreateSimplex(url, 0644, blocks, size)
c2bbe3 121     if err == nil {
e3c917 122         fmt.Println(rw.Name())
9a89af 123         return &SHM{
Z 124             rw,
125             agent,
c2bbe3 126         }, nil
9a89af 127     }
Z 128
c2bbe3 129     return nil, err
9a89af 130 }
Z 131
c2bbe3 132 func shmClient(m Mode, url string, args ...interface{}) (*SHM, error) {
9a89af 133     if m != Shm {
c2bbe3 134         return nil, errors.New("please use deliver.Shm mode")
9a89af 135     }
Z 136
c2bbe3 137     rw, err := shm.OpenSimplex(url)
Z 138     if err == nil {
9a89af 139         return &SHM{
Z 140             rw,
141             coactee,
c2bbe3 142         }, nil
9a89af 143     }
c2bbe3 144     return nil, err
9a89af 145 }