zhangmeng
2019-08-26 20a4c4bfb5b9ea427f9117408ff0e4513ebef9eb
提交 | 用户 | age
9a89af 1 package deliver
Z 2
3 import (
4     "errors"
5     "fmt"
fb46ee 6     "io"
512af9 7     "time"
9a89af 8
b8c958 9     "basic.com/valib/shm.git"
9a89af 10 )
Z 11
12 // SHM share memory
13 type SHM struct {
93ff14 14     rw  *shm.ReadWriteCloser
Z 15     typ td
9a89af 16 }
Z 17
18 // Send impl interface Diliver
19 func (s *SHM) Send(data []byte) error {
c2bbe3 20     if s == nil || s.rw == nil {
9a89af 21         return errors.New("please init shm producer first")
Z 22     }
23
fb46ee 24     n, err := s.rw.Write(data)
Z 25     if n < 1 {
26         fmt.Println("recv data less than 1 length")
9a89af 27     }
Z 28
fb46ee 29     return err
9a89af 30 }
Z 31
32 // Recv impl interface Diliver
33 func (s *SHM) Recv() ([]byte, error) {
34
c2bbe3 35     if s == nil || s.rw == nil {
9a89af 36         return nil, errors.New("please open shm consumer first")
Z 37     }
38
20a4c4 39     // data := make([]byte, maxRecvSize)
Z 40     // n, err := s.rw.Read(data)
41     // if err == nil || err == io.EOF {
42     //     data := make([]byte, maxRecvSize)
43     //     copy(data, s.recvData)
44     //     return data, nil
45     // }
46
47     data, err := s.rw.DirectRead()
fb46ee 48     if err == nil || err == io.EOF {
Z 49         return data, nil
9a89af 50     }
fb46ee 51     return nil, err
9a89af 52 }
Z 53
54 // Close impl interface Deliver
55 func (s *SHM) Close() {
c2bbe3 56     if s == nil {
Z 57         return
58     }
9a89af 59     if s.rw != nil {
Z 60         s.rw.Close()
61     }
62     if s.typ == agent {
63         shm.Unlink(s.rw.Name())
64     }
65 }
66
c2bbe3 67 func shmServer(m Mode, url string, args ...interface{}) (*SHM, error) {
9a89af 68     if m != Shm {
c2bbe3 69         return nil, errors.New("please use deliver.Shm mode")
9a89af 70     }
Z 71
72     var param []int
73     for _, v := range args {
74         switch v.(type) {
75         case int:
76             param = append(param, v.(int))
77         default:
78
c2bbe3 79             return nil, errors.New("shmServer created recv error parameters")
9a89af 80         }
Z 81     }
fb46ee 82
512af9 83     blocks, size := 2, maxRecvSize
Z 84     if len(param) == 2 {
85         blocks, size = param[0], param[1]
86         // return nil, errors.New("shmServer created recv too much parameters")
87     }
88
89     time.Sleep(time.Millisecond)
25d8c3 90     shm.Unlink(url)
Z 91
512af9 92     rw, err := shm.CreateSimplex(url, 0644, blocks, size)
c2bbe3 93     if err == nil {
e3c917 94         fmt.Println(rw.Name())
9a89af 95         return &SHM{
20a4c4 96             rw:  rw,
Z 97             typ: agent,
c2bbe3 98         }, nil
9a89af 99     }
Z 100
c2bbe3 101     return nil, err
9a89af 102 }
Z 103
c2bbe3 104 func shmClient(m Mode, url string, args ...interface{}) (*SHM, error) {
9a89af 105     if m != Shm {
c2bbe3 106         return nil, errors.New("please use deliver.Shm mode")
9a89af 107     }
Z 108
c2bbe3 109     rw, err := shm.OpenSimplex(url)
Z 110     if err == nil {
9a89af 111         return &SHM{
20a4c4 112             rw:  rw,
Z 113             typ: coactee,
c2bbe3 114         }, nil
9a89af 115     }
c2bbe3 116     return nil, err
9a89af 117 }