reid from https://github.com/michuanhaohao/reid-strong-baseline
zhangmeng
2020-01-14 5459ba1d3f7f944aa97923ed9c09a5dbc7663928
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package common
 
import (
    "context"
    "time"
 
    "basic.com/pubsub/protomsg.git"
 
    "basic.com/libgowrapper/sdkstruct.git"
)
 
/////////////////////////////////////////////////////////////////
 
// FlowCreate create flow
func FlowCreate(ctx context.Context, id string, shm bool, ipc2Rule string, ruleCacheSize int, fn func(...interface{})) (<-chan []byte, chan<- sdkstruct.MsgSDK) {
 
    const (
        postPull = `_1`
        postPush = `_2`
    )
    ipcRcv := GetIpcAddress(shm, id+postPull)
    ipcSnd := GetIpcAddress(shm, id+postPush)
    chRcv := make(chan []byte, 3)
    chSnd := make(chan sdkstruct.MsgSDK, 3)
 
    rcver := NewReciever(ipcRcv, chRcv, shm, fn)
    snder := NewSender(ipcSnd, chSnd, shm, fn)
    torule := NewToRule(ipc2Rule, ruleCacheSize, fn)
 
    snder.ApplyCallbackFunc(torule.Push)
 
    go rcver.Run(ctx)
    go snder.Run(ctx)
    go torule.Run(ctx)
 
    return chRcv, chSnd
}
 
// WorkFlowSimple work
func WorkFlowSimple(ctx context.Context, out chan<- sdkstruct.MsgSDK, typ string,
    fnConsume func() []interface{}, fnRun func([]protomsg.SdkMessage, chan<- sdkstruct.MsgSDK, string),
    fn func(...interface{})) {
 
    tm := time.Now()
    sc := 0
 
    for {
        select {
        case <-ctx.Done():
            return
        default:
 
            elems := fnConsume()
            if elems == nil || len(elems) == 0 {
                time.Sleep(10 * time.Millisecond)
                continue
            }
 
            var msgs []protomsg.SdkMessage
            for _, v := range elems {
                msgs = append(msgs, v.(protomsg.SdkMessage))
            }
 
            fnRun(msgs, out, typ)
 
            sc++
            if sc == 25 {
                fn(typ, " RUN 25 FRAME USE TIME: ", time.Since(tm))
                sc = 0
                tm = time.Now()
            }
            if time.Since(tm) > time.Second {
                fn(typ, " RUN ", sc, " FRAME USE TIME: ", time.Since(tm))
                sc = 0
                tm = time.Now()
            }
        }
    }
}
 
// FlowSimple wrap
func FlowSimple(ctx context.Context, in <-chan []byte, out chan<- sdkstruct.MsgSDK, typ string,
    fnProduce func(interface{}), fnConsume func() []interface{},
    fnRun func(protomsg.SdkMessage, chan<- sdkstruct.MsgSDK, string),
    fnClose func(), fn func(...interface{})) {
 
    cb := func(msgs []protomsg.SdkMessage, ch chan<- sdkstruct.MsgSDK, typ string) {
        fnRun(msgs[0], ch, typ)
    }
 
    FlowBatch(ctx, in, out, typ, fnProduce, fnConsume, cb, fnClose, fn)
 
}
 
// FlowBatch batch
func FlowBatch(ctx context.Context, in <-chan []byte, out chan<- sdkstruct.MsgSDK, typ string,
    fnProduce func(interface{}), fnConsume func() []interface{},
    fnRun func([]protomsg.SdkMessage, chan<- sdkstruct.MsgSDK, string),
    fnClose func(), fn func(...interface{})) {
 
    chMsg := make(chan protomsg.SdkMessage, 3)
    go UnserilizeProto(ctx, in, chMsg, fn)
 
    go WorkFlowSimple(ctx, out, typ, fnConsume, fnRun, fn)
 
    for {
        select {
        case <-ctx.Done():
            fnClose()
            return
        case rMsg := <-chMsg:
            if !ValidRemoteMessage(rMsg, typ, fn) {
                fn(typ, " validremotemessage invalid")
                EjectResult(nil, rMsg, out)
                continue
            }
            fnProduce(rMsg)
 
        default:
            time.Sleep(10 * time.Millisecond)
        }
    }
 
}