554325746@qq.com
2019-08-21 fa89b4cd58f4b71fdcda3736e10ded1bad263aaf
提交 | 用户 | age
9a89af 1 package deliver
Z 2
3 import (
4     "errors"
5     "fmt"
6     "io"
7
fa89b4 8     "time"
5 9
9a89af 10     "github.com/tmthrgd/shm-go"
Z 11 )
12
13 // SHM share memory
14 type SHM struct {
93ff14 15     rw  *shm.ReadWriteCloser
Z 16     typ td
9a89af 17 }
Z 18
19 // Send impl interface Diliver
20 func (s *SHM) Send(data []byte) error {
c2bbe3 21     if s == nil || s.rw == nil {
9a89af 22         return errors.New("please init shm producer first")
Z 23     }
24
fa89b4 25     ch := make(chan int)
5 26     go func(){
27         n, _ := s.rw.Write(data)
28         ch <- n
29     }()
30     select{
31     case <-ch:
32         return nil
33     case <- time.After(3 * time.Second):
34         return errors.New("send time out")
9a89af 35     }
Z 36
fa89b4 37     return errors.New("send should't here")
5 38
39     // n, err := s.rw.Write(data)
40     // if n < 1 {
41     //     fmt.Println("recv data less than 1 length")
42     // }
43
44     // return err
9a89af 45 }
Z 46
47 // Recv impl interface Diliver
48 func (s *SHM) Recv() ([]byte, error) {
49
c2bbe3 50     if s == nil || s.rw == nil {
9a89af 51         return nil, errors.New("please open shm consumer first")
Z 52     }
53
fa89b4 54     ch := make(chan []byte)
5 55     go func(){
56         data := make([]byte, maxRecvSize)
57         n, err := s.rw.Read(data)
58         if err == nil || err == io.EOF {
59             data = data[:n:n]
60         }
61         ch <- data
62     }()
63     select{
64     case d := <-ch:
65         return d, nil
66     case <- time.After(3 * time.Second):
67         return nil, errors.New("recv time out")
9a89af 68     }
Z 69
fa89b4 70     return nil, errors.New("recv should't here")
5 71
72
73     // data := make([]byte, maxRecvSize)
74     // n, err := s.rw.Read(data)
75     // if err == nil || err == io.EOF {
76     //     data = data[:n:n]
77     //     return data, nil
78     // }
79
80     // return nil, err
9a89af 81 }
Z 82
83 // Close impl interface Deliver
84 func (s *SHM) Close() {
c2bbe3 85     if s == nil {
Z 86         return
87     }
9a89af 88     if s.rw != nil {
Z 89         s.rw.Close()
90     }
91     if s.typ == agent {
92         shm.Unlink(s.rw.Name())
93     }
94 }
95
c2bbe3 96 func shmServer(m Mode, url string, args ...interface{}) (*SHM, error) {
9a89af 97     if m != Shm {
c2bbe3 98         return nil, errors.New("please use deliver.Shm mode")
9a89af 99     }
Z 100
101     var param []int
102     for _, v := range args {
103         fmt.Println(v)
104         switch v.(type) {
105         case int:
106             param = append(param, v.(int))
107         default:
108
c2bbe3 109             return nil, errors.New("shmServer created recv error parameters")
9a89af 110         }
Z 111     }
112     if len(param) != 2 {
c2bbe3 113         return nil, errors.New("shmServer created recv too much parameters")
9a89af 114     }
c2bbe3 115
25d8c3 116     shm.Unlink(url)
Z 117
c2bbe3 118     rw, err := shm.CreateSimplex(url, 0644, param[0], param[1])
Z 119     if err == nil {
e3c917 120         fmt.Println(rw.Name())
9a89af 121         return &SHM{
Z 122             rw,
123             agent,
c2bbe3 124         }, nil
9a89af 125     }
Z 126
c2bbe3 127     return nil, err
9a89af 128 }
Z 129
c2bbe3 130 func shmClient(m Mode, url string, args ...interface{}) (*SHM, error) {
9a89af 131     if m != Shm {
c2bbe3 132         return nil, errors.New("please use deliver.Shm mode")
9a89af 133     }
Z 134
c2bbe3 135     rw, err := shm.OpenSimplex(url)
Z 136     if err == nil {
9a89af 137         return &SHM{
Z 138             rw,
139             coactee,
c2bbe3 140         }, nil
9a89af 141     }
c2bbe3 142     return nil, err
9a89af 143 }