zhangmeng
2019-08-30 4f3bd6a122917eed4b5a5ead80a05ef9c143e395
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package deliver
 
import (
    "errors"
    "fmt"
    "io"
    "reflect"
    "time"
    "unsafe"
 
    "basic.com/valib/shm.git"
)
 
// SHM share memory
type SHM struct {
    rw  *shm.ReadWriteCloser
    typ td
 
    recvData []byte
}
 
// Send impl interface Diliver
func (s *SHM) Send(data []byte) error {
    if s == nil || s.rw == nil {
        return errors.New("please init shm producer first")
    }
 
    n, err := s.rw.Write(data)
    if n < 1 {
        fmt.Println("recv data less than 1 length")
    }
 
    return err
}
 
// Recv impl interface Diliver
func (s *SHM) Recv() ([]byte, error) {
 
    if s == nil || s.rw == nil {
        return nil, errors.New("please open shm consumer first")
    }
 
    // orig
 
    // data := make([]byte, maxRecvSize)
    // n, err := s.rw.Read(data)
    // if err == nil || err == io.EOF {
    //     data := data[:n:n]
    //     return data, nil
    // }
 
    // cur
 
    // data, err := s.rw.DirectRead()
    // if err == nil || err == io.EOF {
    //     return data, nil
    // }
 
    // new
 
    if s.recvData == nil {
        s.recvData = make([]byte, maxRecvSize)
    }
    sliceHeader := (*reflect.SliceHeader)(unsafe.Pointer(&s.recvData))
    sliceHeader.Cap = maxRecvSize
    sliceHeader.Len = 0
 
    n, err := s.rw.Read(s.recvData)
    if err == nil || err == io.EOF {
        sliceHeader.Len = n
        return s.recvData, nil
    }
 
    return nil, err
}
 
// Recv2 impl interface
func (s *SHM) Recv2(data []byte) (int, error) {
    if s == nil || s.rw == nil {
        return 0, errors.New("please open shm consumer first")
    }
 
    n, err := s.rw.Read(data)
    if err == nil || err == io.EOF {
        data = data[:n:n]
        return n, nil
    }
 
    return 0, err
}
 
// Close impl interface Deliver
func (s *SHM) Close() {
    if s == nil {
        return
    }
    if s.rw != nil {
        s.rw.Close()
    }
    if s.typ == agent {
        shm.Unlink(s.rw.Name())
    }
}
 
func shmServer(m Mode, url string, args ...interface{}) (*SHM, error) {
    if m != Shm {
        return nil, errors.New("please use deliver.Shm mode")
    }
 
    var param []int
    for _, v := range args {
        switch v.(type) {
        case int:
            param = append(param, v.(int))
        default:
 
            return nil, errors.New("shmServer created recv error parameters")
        }
    }
 
    blocks, size := 2, maxRecvSize
    if len(param) == 2 {
        blocks, size = param[0], param[1]
        // return nil, errors.New("shmServer created recv too much parameters")
    }
 
    time.Sleep(time.Millisecond)
    shm.Unlink(url)
 
    rw, err := shm.CreateSimplex(url, 0644, blocks, size)
    if err == nil {
        fmt.Println(rw.Name())
        return &SHM{
            rw:  rw,
            typ: agent,
        }, nil
    }
 
    return nil, err
}
 
func shmClient(m Mode, url string, args ...interface{}) (*SHM, error) {
    if m != Shm {
        return nil, errors.New("please use deliver.Shm mode")
    }
 
    rw, err := shm.OpenSimplex(url)
    if err == nil {
        return &SHM{
            rw:  rw,
            typ: coactee,
        }, nil
    }
    return nil, err
}