zhangmeng
2020-01-21 9d9cd1d3b93613071d1dffc1c82c4515d2a65af6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
package master
 
import (
    "analysis/app"
    "analysis/logo"
    "analysis/util"
    "context"
    "encoding/json"
    "os"
    "plugin"
    "strconv"
    "strings"
    "time"
)
 
var (
    soLoaded bool
    fnFetch  func(context.Context, string, string, int, string, chan<- []byte, func(...interface{}))
    fnNotify func(context.Context, string, <-chan []byte, func(...interface{})) context.CancelFunc
)
 
func soLoad(soFile string) bool {
    if fnFetch != nil && fnNotify != nil {
        return true
    }
 
    plug, err := plugin.Open(soFile)
    if err != nil {
        logo.Errorln("Open: ", soFile, " error: ", err)
        return false
    }
 
    var fn plugin.Symbol
 
    fn, err = app.LoadFunc(plug, soFile, "Fetch")
    if err != nil {
        logo.Infoln("Lookup Func Fetch From File: ", soFile, " Error")
        return false
    }
    fnFetch = fn.(func(context.Context, string, string, int, string, chan<- []byte, func(...interface{})))
 
    fn, err = app.LoadFunc(plug, soFile, "Notify")
    if err != nil {
        logo.Infoln("Lookup Func Notify From File: ", soFile, " Error")
        return false
    }
    fnNotify = fn.(func(context.Context, string, <-chan []byte, func(...interface{})) context.CancelFunc)
    return true
}
 
func initFetcher(ctx context.Context, soFile string) <-chan []byte {
    if !soLoad(soFile) {
        logo.Errorln("New Fetcher Load so File Funcs Error From File: ", soFile)
        return nil
    }
 
    logo.Infoln("~~~~~~Created Fetcher, Now Sync From DB")
 
    // ip := "tcp://192.168.5.22"
    ip := "tcp://" + util.FSI.IP
    url := ip + ":" + strconv.Itoa(util.FSI.DataPort)
    hearturl := ip + ":" + strconv.Itoa(util.FSI.HBPort)
 
    ch := make(chan []byte, 3)
 
    fnFetch(ctx, url, hearturl, 0, "analysis-master"+strconv.Itoa(os.Getpid()), ch, logo.Infoln)
 
    logo.Infoln("~~~~~~Start Recv SDK Infos")
    return ch
}
 
// Run run
func Run(ctx context.Context, soFile, configPath string) bool {
    daemon := NewDaemon()
    chProc := make(chan []TypeProc, 32)
    go daemon.Watch(ctx, chProc)
 
    chMsg := initFetcher(ctx, soFile)
    if chMsg == nil {
        logo.Infoln("Master Run initFetcher Failed")
        return false
    }
    params := app.GetParams()
 
    for {
        select {
        case <-ctx.Done():
            return true
        case msg := <-chMsg:
            //          sdktype       process_name   topic        null
            //            yolo/face  yolo_0/yolo_1  channel
            var sdk map[string](map[string](map[string]interface{}))
 
            if err := json.Unmarshal(msg, &sdk); err != nil {
                logo.Infoln("Fetcher SDK unmarshal err:", err)
                continue
            }
 
            logo.Infoln("~~~~~~Before Recv New SDKInfos")
 
            var typeProcs []TypeProc
 
            for sdkType, mapSdkProc := range sdk {
                config := findConfigFile(sdkType, configPath)
                if config == nil {
                    logo.Infoln("!!!!!!There Is Not ", sdkType, " Config File In ", configPath, " Skip It")
                    continue
                }
                env := checkConfig(sdkType, *config)
                if env == nil {
                    continue
                }
 
                var channels []string
                var namedProcs []NamedProc
 
                for procName, mapProcChannels := range mapSdkProc {
                    for c := range mapProcChannels {
                        channels = append(channels, c)
                    }
                    p := NamedProc{
                        Name:     procName,
                        Channels: channels,
                        Env:      *env,
                        Config:   *config,
                        Param:    params,
                    }
                    namedProcs = append(namedProcs, p)
                }
                t := TypeProc{
                    Typ:       sdkType,
                    SNameProc: namedProcs,
                }
                typeProcs = append(typeProcs, t)
            }
            chProc <- typeProcs
 
            logo.Infof("~~~~~~Recv New SDKInfos %+v\n", typeProcs)
 
        default:
            time.Sleep(10 * time.Millisecond)
        }
    }
 
}
 
func findConfigFile(typ, configPath string) *string {
    rPath := configPath
    // default config file
    file := rPath + typ + ".json"
    // if configPath not end with '/'
    if rPath[len(rPath)-1] != '/' {
        file = rPath + "/" + typ + ".json"
    }
    // whether file exist
    if util.IsFileExist(file) {
        return &file
    }
    return nil
}
 
func checkConfig(typ, file string) *string {
    cfg, err := app.ReadConfig(file)
    if err != nil {
        logo.Errorln("!!!!!!Master Read: ", file, " for ", typ, " Config Error: ", err)
        return nil
    }
 
    // check config runtime exist if config this item
    env := strings.TrimSpace(cfg.Env)
    if len(env) > 0 {
        envs := strings.Split(env, ":")
        pathExist := true
        for _, v := range envs {
            if !util.IsFileExist(v) {
                logo.Infoln("Can't Find Runtime Path:", v, "Skip SDK: ", typ)
                pathExist = false
                break
            }
        }
        if !pathExist {
 
            return nil
        }
    }
    return &env
}