reid from https://github.com/michuanhaohao/reid-strong-baseline
554325746@qq.com
2020-03-25 b2500a8eb6665ce6efe0a7d954b6f101af83d7ec
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package rpc
 
import (
    "context"
    "time"
 
    "basic.com/valib/deliver.git"
)
 
// Sender decoder ingo
type Sender struct {
    ipcURL string
    in     <-chan []byte
    chCap  int
 
    shm      bool
    fnLogger func(...interface{})
}
 
// NewSender Sender
func NewSender(ipcURL string, in <-chan []byte, shm bool, fn func(...interface{})) *Sender {
    return &Sender{
        ipcURL: ipcURL,
        in:     in,
        chCap:  cap(in),
 
        shm:      shm,
        fnLogger: fn,
    }
}
 
// Run run a IPC producer
func (s *Sender) Run(ctx context.Context) {
 
    i := s.createIPC(ctx, 50*time.Millisecond, 200)
 
    for {
        select {
        case <-ctx.Done():
            i.Close()
            return
        case d := <-s.in:
 
            if len(s.in) > s.chCap/2 {
                for i := 0; i < s.chCap/2; i++ {
                    <-s.in
                }
            }
            if len(s.in) > 0 {
                d = <-s.in
            }
 
            if s.shm {
                if i == nil {
                    s.fnLogger("!!!!!!SDK Send createIPC not ready error:", s.ipcURL)
                    i = s.createIPC(ctx, 50*time.Millisecond, 10)
                }
                if i == nil {
                    s.fnLogger("!!!!!!SDK Send createIPC error:", s.ipcURL)
                    continue
                }
 
                t := time.Now()
 
                if err := i.Send(d); err != nil {
                    i.Close()
                    s.fnLogger("SENDER To:", s.ipcURL, " ERROR: ", err)
 
                    i = s.createIPC(ctx, 50*time.Millisecond, 20)
                    s.fnLogger("ANALYSIS SENDER CREATE SHM:", s.ipcURL)
                } else {
                    s.fnLogger("~~~~~~ shm send to reid len: ", len(d))
                }
                s.fnLogger("&&&&&&Sender------>Reid One Time:", time.Since(t))
 
            } else {
                err := i.Send(d)
                if err != nil {
                    // logo.Errorln("error sender 2 pubsub: ", err)
                } else {
                    s.fnLogger("mangos send to pubsub len: ", len(d))
                }
            }
        default:
            time.Sleep(10 * time.Millisecond)
        }
    }
}
 
func (s *Sender) createIPC(ctx context.Context, wait time.Duration, loop int) deliver.Deliver {
 
    mode := deliver.PushPull
    if s.shm {
        mode = deliver.Shm
    }
 
    try := 0
 
    c, err := deliver.NewClientWithError(mode, s.ipcURL)
loopR:
    for {
        select {
        case <-ctx.Done():
            return nil
        default:
            if err == nil {
                break loopR
            }
            if loop > 0 {
                try++
                if try > loop {
                    return nil
                }
                time.Sleep(wait)
            } else {
                time.Sleep(time.Second)
            }
            c, err = deliver.NewClientWithError(mode, s.ipcURL)
        }
    }
    return c
}