zhangmeng
2019-08-27 fa924718b1e8d5f566f7655807e77df1bcfe8f86
提交 | 用户 | age
9a89af 1 package deliver
Z 2
3 import (
4     "errors"
5     "fmt"
fb46ee 6     "io"
512af9 7     "time"
9a89af 8
b8c958 9     "basic.com/valib/shm.git"
9a89af 10 )
Z 11
12 // SHM share memory
13 type SHM struct {
93ff14 14     rw  *shm.ReadWriteCloser
Z 15     typ td
9a89af 16 }
Z 17
18 // Send impl interface Diliver
19 func (s *SHM) Send(data []byte) error {
c2bbe3 20     if s == nil || s.rw == nil {
9a89af 21         return errors.New("please init shm producer first")
Z 22     }
23
fb46ee 24     n, err := s.rw.Write(data)
Z 25     if n < 1 {
26         fmt.Println("recv data less than 1 length")
9a89af 27     }
Z 28
fb46ee 29     return err
9a89af 30 }
Z 31
32 // Recv impl interface Diliver
33 func (s *SHM) Recv() ([]byte, error) {
34
c2bbe3 35     if s == nil || s.rw == nil {
9a89af 36         return nil, errors.New("please open shm consumer first")
Z 37     }
38
20a4c4 39     // data := make([]byte, maxRecvSize)
Z 40     // n, err := s.rw.Read(data)
41     // if err == nil || err == io.EOF {
020e17 42     //     data := data[:n:n]
20a4c4 43     //     return data, nil
Z 44     // }
45
46     data, err := s.rw.DirectRead()
fb46ee 47     if err == nil || err == io.EOF {
Z 48         return data, nil
9a89af 49     }
fb46ee 50     return nil, err
9a89af 51 }
Z 52
020e17 53 // Recv2 impl interface
fa9247 54 func (s *SHM) Recv2(data []byte) (int, error) {
020e17 55     if s == nil || s.rw == nil {
fa9247 56         return 0, errors.New("please open shm consumer first")
020e17 57     }
Z 58
59     n, err := s.rw.Read(data)
60     if err == nil || err == io.EOF {
61         data = data[:n:n]
fa9247 62         return n, nil
020e17 63     }
Z 64
fa9247 65     return 0, err
020e17 66 }
Z 67
9a89af 68 // Close impl interface Deliver
Z 69 func (s *SHM) Close() {
c2bbe3 70     if s == nil {
Z 71         return
72     }
9a89af 73     if s.rw != nil {
Z 74         s.rw.Close()
75     }
76     if s.typ == agent {
77         shm.Unlink(s.rw.Name())
78     }
79 }
80
c2bbe3 81 func shmServer(m Mode, url string, args ...interface{}) (*SHM, error) {
9a89af 82     if m != Shm {
c2bbe3 83         return nil, errors.New("please use deliver.Shm mode")
9a89af 84     }
Z 85
86     var param []int
87     for _, v := range args {
88         switch v.(type) {
89         case int:
90             param = append(param, v.(int))
91         default:
92
c2bbe3 93             return nil, errors.New("shmServer created recv error parameters")
9a89af 94         }
Z 95     }
fb46ee 96
512af9 97     blocks, size := 2, maxRecvSize
Z 98     if len(param) == 2 {
99         blocks, size = param[0], param[1]
100         // return nil, errors.New("shmServer created recv too much parameters")
101     }
102
103     time.Sleep(time.Millisecond)
25d8c3 104     shm.Unlink(url)
Z 105
512af9 106     rw, err := shm.CreateSimplex(url, 0644, blocks, size)
c2bbe3 107     if err == nil {
e3c917 108         fmt.Println(rw.Name())
9a89af 109         return &SHM{
20a4c4 110             rw:  rw,
Z 111             typ: agent,
c2bbe3 112         }, nil
9a89af 113     }
Z 114
c2bbe3 115     return nil, err
9a89af 116 }
Z 117
c2bbe3 118 func shmClient(m Mode, url string, args ...interface{}) (*SHM, error) {
9a89af 119     if m != Shm {
c2bbe3 120         return nil, errors.New("please use deliver.Shm mode")
9a89af 121     }
Z 122
c2bbe3 123     rw, err := shm.OpenSimplex(url)
Z 124     if err == nil {
9a89af 125         return &SHM{
20a4c4 126             rw:  rw,
Z 127             typ: coactee,
c2bbe3 128         }, nil
9a89af 129     }
c2bbe3 130     return nil, err
9a89af 131 }