zhangmeng
2020-01-20 accc3295d6daacd70494ecd3a08033f505c29f66
replace sdkhelper with local common
12个文件已修改
225 ■■■■■ 已修改文件
app/master/dbfetcher.go 22 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
app/master/master.go 49 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.mod 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
libcomm/db.go 110 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
libcomm/go.mod 15 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
libgowrapper/face @ 611a49 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
libgowrapper/humantrack @ d10763 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
libgowrapper/reid @ 432430 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
libgowrapper/vehicle @ 1a51c8 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
libgowrapper/yolo @ 51ba06 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.go 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
util/data.go 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
app/master/dbfetcher.go
@@ -5,13 +5,12 @@
    "analysis/logo"
    "plugin"
    "basic.com/libgowrapper/sdkstruct.git"
    "basic.com/valib/pubsub.git"
)
// Fetcher db
type Fetcher struct {
    fnInitDBAPI func(string, int, int, int, func(...interface{}))
    fnSDKInfo   func() []sdkstruct.SDKInfo
    fnInit func(string, string, int, []string, string) (chan pubsub.Message, error)
}
// NewFetcher new
@@ -23,23 +22,14 @@
        return nil
    }
    fn, err := app.LoadFunc(plug, soFile, "InitDBAPI")
    fn, err := app.LoadFunc(plug, soFile, "Init")
    if err != nil {
        logo.Infoln("Lookup Func InitDBAPI From File: ", soFile, " Error")
        logo.Infoln("Lookup Func Init From File: ", soFile, " Error")
        return nil
    }
    fnInit := fn.(func(string, int, int, int, func(...interface{})))
    fn, err = app.LoadFunc(plug, soFile, "SDKInfo")
    if err != nil {
        logo.Infoln("Lookup Func SDKInfo From File: ", soFile, " Error")
        return nil
    }
    fnSDKInfo := fn.(func() []sdkstruct.SDKInfo)
    fnInit := fn.(func(string, string, int, []string, string) (chan pubsub.Message, error))
    return &Fetcher{
        fnInitDBAPI: fnInit,
        fnSDKInfo:   fnSDKInfo,
        fnInit: fnInit,
    }
}
app/master/master.go
@@ -5,9 +5,14 @@
    "analysis/logo"
    "analysis/util"
    "context"
    "encoding/json"
    "os"
    "strconv"
    "strings"
    "time"
    "basic.com/libgowrapper/sdkstruct.git"
    "basic.com/valib/pubsub.git"
)
func reaper(ctxt context.Context) {
@@ -28,17 +33,45 @@
    logo.Infoln("~~~~~~Created Fetcher, Now Sync From DB")
    fetcher.fnInitDBAPI(util.FSI.IP, util.FSI.HTTPort, util.FSI.HBPort, util.FSI.DataPort, logo.Infoln)
    // fetcher.fnInitDBAPI("192.168.20.10", util.FSI.HTTPort, util.FSI.HBPort, util.FSI.DataPort, logo.Infoln)
    sdks := fetcher.fnSDKInfo()
    ip := "tcp://" + util.FSI.IP
    url := ip + ":" + strconv.Itoa(util.FSI.DataPort)
    hearturl := ip + ":" + strconv.Itoa(util.FSI.HBPort)
    chMsg, err := fetcher.fnInit(url, hearturl, 0, []string{pubsub.Topic_Sdk}, "analysis-master"+strconv.Itoa(os.Getpid()))
    for {
        if len(sdks) == 0 {
            logo.Errorln("!!!!!!Fetcher Can't Get SDK Infos From Remote DB")
            continue
        if err == nil {
            break
        }
        break
        logo.Infoln("Analysis Fetcher INIT Error! URL:", url)
        time.Sleep(time.Second)
        chMsg, err = fetcher.fnInit(url, hearturl, 0, []string{pubsub.Topic_Sdk}, "analysis-master"+strconv.Itoa(os.Getpid()))
    }
    return manualStart(ctx, sdks, configPath)
    for {
        select {
        case <-ctx.Done():
            return true
        case msg := <-chMsg:
            //              sdktype       process_name   topic        null
            //                yolo/face  yolo_0/yolo_1  channel
            var sdk map[string](map[string](map[string]interface{}))
            if err := json.Unmarshal(msg.Msg, &sdk); err != nil {
                logo.Infoln("Fetcher SDK unmarshal err:", err)
                continue
            }
            logo.Infoln("~~~~~~Recv New SDKInfos")
            chCameras <- CameraInfo{
                Cameras: cameras,
            }
            logo.Infoln("~~~~~~Recv New SDKInfos Over")
        default:
            time.Sleep(10 * time.Millisecond)
        }
    }
}
func manualStart(ctx context.Context, sdks []sdkstruct.SDKInfo, configPath string) bool {
go.mod
@@ -5,8 +5,12 @@
require (
    basic.com/libgowrapper/sdkstruct.git v0.0.0-20191220011601-e0b3d1f0183c
    basic.com/valib/gogpu.git v0.0.0-20190711044327-62043b070865
    basic.com/valib/gopherdiscovery.git v0.0.0-20200113080951-9bccb7681924 // indirect
    basic.com/valib/pubsub.git v0.0.0-20200116061307-c43a8e3e552e
    github.com/amoghe/distillog v0.0.0-20180726233512-ae382b35b717
    github.com/natefinch/lumberjack v2.0.0+incompatible
    github.com/olebedev/config v0.0.0-20190528211619-364964f3a8e4
    golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa // indirect
    gopkg.in/yaml.v2 v2.2.7 // indirect
    nanomsg.org/go-mangos v1.4.0 // indirect
)
libcomm/db.go
@@ -1,109 +1,15 @@
package main
import (
    "fmt"
    "strconv"
    "basic.com/dbapi.git"
    "basic.com/libgowrapper/sdkstruct.git"
    "basic.com/pubsub/cache.git/shardmap"
    "basic.com/pubsub/protomsg.git"
    "basic.com/valib/gopherdiscovery.git"
    "github.com/gogo/protobuf/proto"
    "basic.com/valib/pubsub.git"
)
const (
    prefixTASKSDKRULE = "TASKSDKRULE_"
)
var cMap *shardmap.ShardMap
// InitDBAPI init dbapi
func InitDBAPI(ip string, httpPort, heartBeatPort, dataPort int, log func(...interface{})) {
    dbapi.Init(ip, httpPort)
    var initchan = make(chan bool)
    go InitCache(initchan, ip, heartBeatPort, dataPort)
    log("db init done!", <-initchan)
}
// TaskInfos get camera infos from sqlite db
func TaskInfos() []protomsg.TaskSdkInfo {
    tAPI := dbapi.TaskApi{}
    tasks := tAPI.FindAll()
    return tasks
}
// SDKInfo get sdk
func SDKInfo() []sdkstruct.SDKInfo {
    sAPI := dbapi.SdkApi{}
    s := sAPI.FindAllSdkRun()
    var sdks []sdkstruct.SDKInfo
    for _, v := range s {
        sdks = append(sdks, sdkstruct.SDKInfo{
            IpcID:   v.IpcId,
            SdkType: v.SdkType,
        })
// Init init tcp://192.168.5.22:4005
func Init(url, heartBeatURL string, mode int, topics []string, processID string) (chan pubsub.Message, error) {
    p, err := pubsub.NewSubscriber(url, heartBeatURL, mode, topics, processID)
    if err != nil {
        return nil, err
    }
    return sdks
}
// InitCache cache
func InitCache(initChan chan bool, dbIP string, surveyPort int, pubSubPort int) {
    cMap = shardmap.New(uint8(32))
    urlSurvey := "tcp://" + dbIP + ":" + strconv.Itoa(surveyPort)
    urlPubSub := "tcp://" + dbIP + ":" + strconv.Itoa(pubSubPort)
    client, _ := gopherdiscovery.ClientWithSub(urlSurvey, urlPubSub, "analysisProc")
    recvMsg := client.HeartBeatMsg()
    fmt.Println(<-recvMsg)
    initCacheData(initChan)
    peers, _ := client.Peers()
    for b := range peers {
        fmt.Println("peerMsg:", b)
        updateData(b)
    }
}
func initCacheData(initChan chan bool) {
    initTaskSdkRule()
    initChan <- true
}
func initTaskSdkRule() {
    var api dbapi.TaskSdkRuleApi
    b, rules := api.FindAllTaskSdkRules()
    if b {
        if rules != nil {
            for _, tRule := range rules {
                cMap.Set(prefixTASKSDKRULE+tRule.TaskId, tRule.SdkRules)
            }
        }
    }
}
func updateData(b []byte) {
    newUpdateMsg := &protomsg.DbChangeMessage{}
    if err := proto.Unmarshal(b, newUpdateMsg); err != nil {
        fmt.Println("dbChangeMsg unmarshal err:", err)
        return
    }
    switch newUpdateMsg.Table {
    case protomsg.TableChanged_T_TaskSdkRule:
        initTaskSdkRule()
    default:
    }
}
// GetTaskSdkRules rules
func GetTaskSdkRules(taskID string) []*protomsg.SdkRuleSet {
    r, b := cMap.Get(prefixTASKSDKRULE + taskID)
    if b {
        return r.([]*protomsg.SdkRuleSet)
    }
    return nil
    c := p.Recv()
    return c, err
}
libcomm/go.mod
@@ -3,17 +3,8 @@
go 1.12
require (
    basic.com/dbapi.git v0.0.0-20191216030028-03153c1f1f30
    basic.com/libgowrapper/sdkstruct.git v0.0.0-20191220011601-e0b3d1f0183c
    basic.com/pubsub/cache.git v0.0.0-20190718093725-6a413e1d7d48
    basic.com/pubsub/protomsg.git v0.0.0-20191219033725-b95da65535d0
    basic.com/valib/gopherdiscovery.git v0.0.0-20190605034340-15d89d8b4e28
    github.com/ajg/form v1.5.1 // indirect
    github.com/gogo/protobuf v1.3.1
    golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3 // indirect
    golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553 // indirect
    golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135 // indirect
    google.golang.org/grpc v1.26.0 // indirect
    honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc // indirect
    basic.com/valib/gopherdiscovery.git v0.0.0-20200113080951-9bccb7681924 // indirect
    basic.com/valib/pubsub.git v0.0.0-20200116061307-c43a8e3e552e
    golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa // indirect
    nanomsg.org/go-mangos v1.4.0 // indirect
)
libgowrapper/face
@@ -1 +1 @@
Subproject commit 45b445473158f7e38106a973ec6d57157279eee1
Subproject commit 611a497108ec9d66646e98daaec5cf94da8ad02d
libgowrapper/humantrack
@@ -1 +1 @@
Subproject commit f4171524668399dfbd487f244d0efa8efaea7d9f
Subproject commit d107634e8ddcca05b773794df7ed4e9412a11d55
libgowrapper/reid
@@ -1 +1 @@
Subproject commit 54521e1c57ec88f3dc1e31d2fcc435ed76afa848
Subproject commit 4324306f529b9bc62d7e818c0b12ff822687bb47
libgowrapper/vehicle
@@ -1 +1 @@
Subproject commit 589ea342e4aa810cb58c5b1af42f537e2732d4ff
Subproject commit 1a51c8cdd99ad79b113dfb98f0f3fb44235037f7
libgowrapper/yolo
@@ -1 +1 @@
Subproject commit 414a9974b34c5c1cdb9219a2da7daff01f335d25
Subproject commit 51ba06b0aa6278da92703f38c6c26003e4bdae88
main.go
@@ -53,7 +53,6 @@
    // 指定获取配置信息从sqlite,有最高优先级, master使用
    flag.StringVar(&util.FSI.IP, util.FetchSrvIP, util.FSI.IP, "从IP获取需要运行的SDK,master使用")
    flag.IntVar(&util.FSI.HTTPort, util.FetchSrvPort, util.FSI.HTTPort, "获取需要运行的SDK服务器的HTTP Port,master使用")
    flag.IntVar(&util.FSI.HBPort, util.FetchSrvHeartbeatPort, util.FSI.HBPort, "获取需要运行的SDK服务器的心跳 Port,master使用")
    flag.IntVar(&util.FSI.DataPort, util.FetchSrvDataPort, util.FSI.DataPort, "获取需要运行的SDK服务器的数据 Port,master使用")
@@ -74,10 +73,9 @@
}
func setParamters() {
    util.FillParams(util.FetchSrvIP, util.FSI.IP)
    util.FillParams(util.FetchSrvPort, strconv.Itoa(util.FSI.HTTPort))
    util.FillParams(util.FetchSrvHeartbeatPort, strconv.Itoa(util.FSI.HBPort))
    util.FillParams(util.FetchSrvDataPort, strconv.Itoa(util.FSI.DataPort))
    // util.FillParams(util.FetchSrvIP, util.FSI.IP)
    // util.FillParams(util.FetchSrvHeartbeatPort, strconv.Itoa(util.FSI.HBPort))
    // util.FillParams(util.FetchSrvDataPort, strconv.Itoa(util.FSI.DataPort))
    util.FillParams(util.RuleIPC, util.ToRuleIPC)
util/data.go
@@ -3,7 +3,6 @@
// FetchServerInfo 从sqlite的服务器获取SDK信息的服务器信息
type FetchServerInfo struct {
    IP       string
    HTTPort  int
    HBPort   int //心跳端口
    DataPort int //数据端口
}
@@ -12,9 +11,8 @@
    // FSI SDK信息服务器IP/Port
    FSI = &FetchServerInfo{
        IP:       "127.0.0.1",
        HTTPort:  8001,
        HBPort:   40007,
        DataPort: 50007,
        HBPort:   5005,
        DataPort: 4005,
    }
    // ToRuleIPC to ruleprocess
@@ -42,7 +40,6 @@
    ConfigPath = "config-path"
    FetchSrvIP            = "fetch-server-ip"
    FetchSrvPort          = "fetch-server-port"
    FetchSrvHeartbeatPort = "fetch-server-heartbeat-port"
    FetchSrvDataPort      = "fetch-server-data-port"