派生自 libgowrapper/face

zhangmeng
2020-01-15 0ffd95f2278e860736e46f8b73f357470f5a3d91
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package common
 
import (
    "context"
 
    "time"
 
    "basic.com/pubsub/protomsg.git"
    "basic.com/valib/deliver.git"
    "github.com/gogo/protobuf/proto"
)
 
// Reciever recv from ipc
type Reciever struct {
    ctx    context.Context
    ipcURL string
    chMsg  chan<- MsgRS
 
    shm      bool
    fnLogger func(...interface{})
}
 
// NewReciever new recv
func NewReciever(url string, chMsg chan<- MsgRS, shm bool, fn func(...interface{})) *Reciever {
    return &Reciever{
        ipcURL:   url,
        chMsg:    chMsg,
        shm:      shm,
        fnLogger: fn,
    }
}
 
func (r *Reciever) unserilizeProto(ctx context.Context, data <-chan []byte) {
    for {
        select {
        case <-ctx.Done():
            return
        default:
            d := <-data
            if len(d) < 100 {
                continue
            }
            // logo.Infoln(len(d), "reciver数据")
            msg := protomsg.SdkMessage{}
            if err := proto.Unmarshal(d, &msg); err != nil {
                r.fnLogger(err, " msg 处理异常")
                continue
            }
            outMsg := MsgRS{Msg: msg}
            r.chMsg <- outMsg
        }
    }
}
 
// Run run a IPC client
func (r *Reciever) Run(ctx context.Context) {
 
    if r.shm {
        r.runShm(ctx)
    } else {
        r.run(ctx, deliver.NewClient(mode, r.ipcURL))
    }
}
 
func (r *Reciever) run(ctx context.Context, i deliver.Deliver) {
 
    dataChan := make(chan []byte, 3)
 
    go r.unserilizeProto(ctx, dataChan)
 
    count := 0
 
    for {
        select {
        case <-ctx.Done():
            i.Close()
            return
        default:
 
            if r.shm {
                if d, err := i.Recv(); err != nil {
                    i.Close()
                    r.fnLogger("ANALYSIS RECV ERROR: ", err)
 
                    c, err := deliver.NewClientWithError(deliver.Shm, r.ipcURL)
                    for {
                        if err == nil {
                            break
                        }
                        time.Sleep(time.Second)
                        c, err = deliver.NewClientWithError(deliver.Shm, r.ipcURL)
                        r.fnLogger("ANALYSIS CREATE FAILED : ", err)
                    }
                    i = c
                    r.fnLogger("ANALYSIS CREATE SHM")
                } else {
                    if d != nil {
                        count++
                        if count > 10 {
                            count = 0
                            r.fnLogger("~~~shm recv image:", len(d))
                        }
                        dataChan <- d
                    }
                }
            } else {
                if msg, err := i.Recv(); err != nil {
                    // logo.Errorln("recv error : ", err, " url: ", r.ipcURL)
                } else {
                    count++
                    if count > 10 {
                        count = 0
                        r.fnLogger("~~~mangos recv image:", len(msg))
                    }
                    dataChan <- msg
                }
            }
 
        }
    }
}
 
func (r *Reciever) runShm(ctx context.Context) {
    c, err := deliver.NewClientWithError(deliver.Shm, r.ipcURL)
    for {
        if err == nil {
            break
        }
        time.Sleep(1 * time.Second)
        c, err = deliver.NewClientWithError(deliver.Shm, r.ipcURL)
        r.fnLogger("CLIENT CREATE FAILED : ", err)
    }
    r.run(ctx, c)
}