chenshijun
2019-07-27 b5560d3cae1164f016ca9339592eda6b9008fb9f
重构代码
6个文件已修改
448 ■■■■ 已修改文件
camera/camera.go 138 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
sdk/sdk.go 213 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
taskpubsub 补丁 | 查看 | 原始文档 | blame | 历史
tasktag/tasktag.go 33 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
util/sqlite.go 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
util/util.go 41 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
camera/camera.go
@@ -1,29 +1,34 @@
package camera
import (
    "errors"
    "basic.com/valib/deliver.git"
    //"sync"
    "taskpubsub/logger"
    "basic.com/pubsub/protomsg.git"
    "github.com/gogo/protobuf/proto"
    "taskpubsub/sdk"
    "taskpubsub/util"
    "github.com/gogo/protobuf/proto"
    "basic.com/pubsub/protomsg.git"
    "context"
    "fmt"
    "sync"
    //"time"
)
var SocketManage sync.Map
const (
    faceExtractWebCID    = "virtual-face-extract-web-camera-id"
    faceExtractWebTaskID = "92496BDF-2BFA-98F2-62E8-96DD9866ABD2"
)
type SocketContext struct {
    Sock    deliver.Deliver
    Context context.Context
    Cancel  context.CancelFunc
var SocketManage = make(map[string]util.SocketContext)
var innerRecvTopic = []string{
    "virtual-faceextract-sdk-pull_2", //to web 以图搜图
}
func initInnerTopic() {
    for _, recvTopic := range innerRecvTopic {
        createCameraRecvServerAndListen(recvTopic)
    }
}
func Init() {
@@ -35,90 +40,74 @@
    // 摄像机初始化
    for _, cam := range util.CameraIds {
        CreateCamera(cam.Id, "camera")
        createCameraRecvServerAndListen(cam.Id)
    }
    //  web端初始化
    CreateCamera("virtual-faceextract-sdk-pull_2" , "web")
    // 手动输入的主题
    initInnerTopic()
    go AutoDelCamera(util.Cameraflag)
    go autoUpdateCamera(util.Cameraflag)
}
// camera 接受数据
func CreateCamera(id string, remote string) {
        if _, ok := SocketManage.Load(id); !ok {
            url := fmt.Sprintf("ipc:///tmp/%s.ipc", id)
func createCameraRecvServerAndListen(id string) {
    if _, isExist := SocketManage[id]; !isExist { //不存在
            socketlisten, err := NewCamerSocketListen(deliver.PushPull, id, url)
        url := fmt.Sprintf("ipc:///tmp/%s.ipc", id)
        socket, err := util.NewSocketListen(int(deliver.PushPull), url)
            if err != nil {
                logger.Error("create socket error")
                return 
            }
        SocketManage[id] = socket
        
            go Recv(socketlisten, remote)
        go Recv(socket)
    }
}
func deleteCameraRecvServer(id string) {
    if _, isExist := SocketManage[id]; isExist { //存在
        SocketManage[id].Cancel()
        delete(SocketManage, id)
        logger.Info("删除server sdk: ", id)
        }
}
//动态处理
func AutoDelCamera(cameraflag chan bool) {
func autoUpdateCamera(cameraflag chan bool) {
    for _ = range cameraflag {
        logger.Info("test autodelcameraflag")
        var oldcamera []string
        var oldcameras []string
        SocketManage.Range(func(k, v interface{}) bool {
            if str, ok := k.(string); ok {
                oldcamera = append(oldcamera, str)
        for key := range SocketManage {
            oldcameras = append(oldcameras, key)
            }
            return true
        })
        var newcamera []string
        var newcameras []string
        for _, camnew := range util.CameraIds {
            newcamera = append(newcamera, camnew.Id)
            newcameras = append(newcameras, camnew.Id)
        }
        newcamera = append(newcamera, "virtual-faceextract-sdk-pull_2")
        // 手动添加的全部加上
        for _, recvTopic := range innerRecvTopic {
            newcameras = append(newcameras, recvTopic)
        }
        cameraChanDel := util.Difference(oldcamera, newcamera)
        logger.Info(cameraChanDel)
        cameraListUpdate := util.Difference(oldcameras, newcameras)
        logger.Info(cameraListUpdate)
        for key, op := range cameraChanDel {
        for key, op := range cameraListUpdate {
            if op == "add" {
               CreateCamera(key, "camera")
               logger.Info("add new camera id=========================", key)
                createCameraRecvServerAndListen(key)
            } else {
                if sock, ok := SocketManage.Load(key); ok {
                    if socket, sok := sock.(SocketContext); sok {
                        socket.Cancel()
                        SocketManage.Delete(key)
                    }
                }
                logger.Info("删除camera server : ", key)
                deleteCameraRecvServer(key)
            }
        }
    }
}
// create server
func NewCamerSocketListen(mode int, cameraid string, url string) (socket SocketContext, err error) {
    ctx, cancel := context.WithCancel(context.Background())
    socket.Context = ctx
    socket.Cancel = cancel
    socket.Sock = deliver.NewServer(deliver.Mode(mode), url)
    logger.Info("new socket.Sock: ", socket.Sock)
    if socket.Sock == nil {
        return socket, errors.New("create listen error")
    }
    SocketManage.Store(cameraid, socket)
    return socket, nil
}
func Recv(socket SocketContext, remote string ) {
func Recv(socket util.SocketContext) {
    var recvmessage []byte
    var imagemsg  protomsg.Image
@@ -144,24 +133,21 @@
               logger.Error("recv msg is not protomsgImage") 
               continue
            }
            switch remote {
               case "camera":
                    for _, taskid := range GetAlltask(imagemsg.Cid) {
                        //time.Sleep(5 * time.Second)
                        logger.Debug("id: ", imagemsg.Cid, " taskid: ", taskid)
                        Taskdolist(imagemsg.Cid, "", taskid, recvmessage)
            if faceExtractWebCID == imagemsg.Cid { //以图搜图
                doTaskList(imagemsg.Cid, "", faceExtractWebTaskID, recvmessage)
            } else {
                taskIDs := GetAllTaskByID(imagemsg.Cid)
                for _, taskID := range taskIDs {
                    logger.Debug("id: ", imagemsg.Cid, " taskid: ", taskID)
                    doTaskList(imagemsg.Cid, "", taskID, recvmessage)
                    }
               case  "web":
                        logger.Debug("id: ", imagemsg.Cid , " taskid: ", "92496BDF-2BFA-98F2-62E8-96DD9866ABD2")
                        Taskdolist(imagemsg.Cid,"", "92496BDF-2BFA-98F2-62E8-96DD9866ABD2", recvmessage)
           }
        }
    }
}
//   据cid 获取 所有的任务
func GetAlltask(cid string) (tasks []string) {
func GetAllTaskByID(cid string) (tasks []string) {
    for _, camsingle := range util.CameraTasks {
        if cid == camsingle.Camera.Id {
            for _, tasksingle := range camsingle.Tasks {
@@ -173,18 +159,18 @@
    return
}
func Taskdolist(cid string, caddr string,  taskid string, data []byte) {
func doTaskList(cid string, caddr string, taskid string, data []byte) {
    //  数据加工(打标签)
    logger.Debug("taskid: ",taskid, "has ", len(data), "data[]byte")
    sdkmsg := sdk.SdkData(cid, caddr, taskid, data)
    sdkmsg := sdk.ToSdkMsg(cid, caddr, taskid, data)
    if sdkmsg.Tasklab == nil {
        logger.Error(cid, " not have taskid: ", taskid )
        return
    }
    //  计算分发的主题
    SendTopic := sdk.SdkSendTopic(sdkmsg)
    SendTopic := sdk.GetSdkSendTopic(sdkmsg)
    logger.Debug(SendTopic)
    if _, ok := sdk.SdkMap[SendTopic]; ok {
        sdk.SdkMap[SendTopic] <- sdkmsg
sdk/sdk.go
@@ -1,14 +1,11 @@
package sdk
import (
    "context"
    "errors"
    "fmt"
    //    "taskpubsub/httpclient"
    "github.com/gogo/protobuf/proto"
    "taskpubsub/tasktag"
    "taskpubsub/util"
    "github.com/gogo/protobuf/proto"
    "basic.com/pubsub/protomsg.git"
    "basic.com/valib/deliver.git"
@@ -16,97 +13,108 @@
)
const (
    postPull = "_1.ipc"
    postPush = "_2.ipc"
    postPush = "_1.ipc"
    postPull = "_2.ipc"
)
var SocketManage = make(map[string]SocketContext)
var SocketManage = make(map[string]util.SocketContext)
var SdkMap = make(map[string]chan protomsg.SdkMessage)
type SocketContext struct {
    Sock    deliver.Deliver
    Context context.Context
    Cancel  context.CancelFunc
var innerRecvTopic = []string{
    "facedetect-sdk-no-track", //to sdk-no-track 以图搜图
}
var innerSendTopic = []string{
    "facedetect-sdk-no-track",      //to sdk-no-track 以图搜图
    "virtual-faceextract-sdk-pull", //to web 以图搜图
}
func initInnerTopic() {
    for _, sendTopic := range innerSendTopic {
        createSdkSendServerAndListen(sendTopic)
    }
    for _, recvTopic := range innerRecvTopic {
        createSdkRecvServerAndListen(recvTopic)
    }
}
func Init() {
    logger.Info("============= init sdk info =====================")
    for _, sdkid := range util.Sdklist { // 创建sdk server
        CreatesdkTopicandServer(sdkid)
        createSdkTopicAndServer(sdkid)
        logger.Info()
    }
    // 手动输入的主题
    // sdk-no-track
    CreatesdkTopicandServer("facedetect-sdk-no-track")
    logger.Info()
    initInnerTopic()
    // es
    SdkMap["es"] = make(chan protomsg.SdkMessage)
    logger.Info("create es channel: ")
    go DealEsTopic()
    // web
    SdkMap["virtual-faceextract-sdk-pull"] = make(chan protomsg.SdkMessage)
    logger.Info("create virtual-faceextract-sdk-pull")
    Createwebserver("virtual-faceextract-sdk-pull")
    go Dealextern()
    go AutoDelSdk(util.Sdkflag)
    go autoUpdateSdk(util.Sdkflag)
}
func CreatesdkTopicandServer(sdkid string) {
func createSdkTopicAndServer(sdkid string) {
    createSdkSendServerAndListen(sdkid)
    createSdkRecvServerAndListen(sdkid)
}
    SdkMap[sdkid] = make(chan protomsg.SdkMessage)
    logger.Info("create sdk channel:  ", sdkid)
func deleteSdkTopicAndServer(id string) {
    if _, isExist := SdkMap[id]; isExist { //存在
        close(SdkMap[id])
        delete(SdkMap, id)
        logger.Info("删除主题 sdk: ", id)
    }
    url := fmt.Sprintf("ipc:///tmp/%s%s", sdkid, postPull)
    socketser, err := NewSdkSocketListen(deliver.PushPull, sdkid, url)
    if _, isExist := SocketManage[id]; isExist { //存在
        SocketManage[id].Cancel()
        delete(SocketManage, id)
        logger.Info("删除server sdk: ", id)
    }
}
func createSdkSendServerAndListen(id string) {
    if _, isExist := SdkMap[id]; !isExist { //不存在
        SdkMap[id] = make(chan protomsg.SdkMessage)
        logger.Info("create", id)
    }
    url := fmt.Sprintf("ipc:///tmp/%s%s", id, postPush)
    socket, err := util.NewSocketListen(int(deliver.PushPull), url)
    if err != nil {
        delete(SdkMap, sdkid)
        logger.Error(sdkid, "create server error!")
        delete(SdkMap, id)
        logger.Error(id, "create socket error!")
        return
    }
    SocketManage[id] = socket
    go Send(id, socket, SdkMap[id])
}
    go Send(sdkid, socketser, SdkMap[sdkid])
func createSdkRecvServerAndListen(id string) {
    if _, isExist := SdkMap[id]; !isExist { //不存在
        SdkMap[id] = make(chan protomsg.SdkMessage)
        logger.Info("create", id)
    }
    url = fmt.Sprintf("ipc:///tmp/%s%s", sdkid, postPush)
    socketdial, err := NewSdkSocketListen(deliver.PushPull, sdkid, url)
    url := fmt.Sprintf("ipc:///tmp/%s%s", id, postPull)
    socket, err := util.NewSocketListen(int(deliver.PushPull), url)
    if err != nil {
        delete(SdkMap, sdkid)
        logger.Error(sdkid, "create dial error!")
        delete(SdkMap, id)
        logger.Error(id, "create socket error!")
        return
    }
    go Recv(socketdial)
    SocketManage[id] = socket
}
// web 接受端处理
func Createwebserver(webid string ){
    url := fmt.Sprintf("ipc:///tmp/%s%s", webid, postPull)
    socketser, err := NewSdkSocketListen(deliver.PushPull, webid, url)
    if err != nil {
        logger.Error(webid, "create server error!")
        return
    }
    go Send(webid, socketser, SdkMap[webid])
}
func DeletesdkTopicandServer(sdkid string) {
    close(SdkMap[sdkid])
    delete(SdkMap, sdkid)
    logger.Info("删除主题 sdk: ", sdkid)
    SocketManage[sdkid].Cancel()
    delete(SocketManage, sdkid)
    logger.Info("删除server sdk: ", sdkid)
    go Recv(socket)
}
//单独处理   es 主题的情况
func Dealextern() {
func DealEsTopic() {
    for {
        select {
            case <-SdkMap["es"]:
@@ -116,46 +124,47 @@
}
//动态处理
func AutoDelSdk(sdkflag chan bool) {
func autoUpdateSdk(sdkflag chan bool) {
    for _ = range sdkflag {
        logger.Info("test autodelsdk")
        var oldSdk []string
        for key, _ := range SdkMap {
            oldSdk = append(oldSdk, key)
        var oldSdkList []string
        for key := range SdkMap {
            oldSdkList = append(oldSdkList, key)
        }
        newSdkList := util.Sdklist
        // 手动添加的全部加上
        util.Sdklist = append(util.Sdklist, "es")
        util.Sdklist = append(util.Sdklist, "virtual-faceextract-sdk-pull")
        util.Sdklist = append(util.Sdklist, "facedetect-sdk-no-track")
        for _, sendTopic := range innerSendTopic {
            newSdkList = append(newSdkList, sendTopic)
        }
        for _, recvTopic := range innerRecvTopic {
            newSdkList = append(newSdkList, recvTopic)
        }
        sdkChanDel := util.Difference(oldSdk, util.Sdklist)
        logger.Info(sdkChanDel)
        sdkListUpdate := util.Difference(oldSdkList, newSdkList)
        logger.Info(sdkListUpdate)
        for key, op := range sdkChanDel {
        for key, op := range sdkListUpdate {
            if op == "add" {
                CreatesdkTopicandServer(key)
                createSdkTopicAndServer(key)
            } else {
                DeletesdkTopicandServer(key)
                logger.Info("删除主题 sdk: ", key)
                deleteSdkTopicAndServer(key)
            }
        }
    }
}
//sdk数据 加工器
func SdkData(cid string,caddr string, taskid string, data []byte) protomsg.SdkMessage {
func ToSdkMsg(cid string, caddr string, taskid string, data []byte) protomsg.SdkMessage {
    var sdkmsg = protomsg.SdkMessage{}
    sdkmsg.Cid = cid
    sdkmsg.Caddr =caddr 
    if val, ok := tasktag.TaskMapLab.Load(taskid); !ok {
    if val, ok := tasktag.TaskLabelMap.Load(taskid); !ok {
        sdkmsg.Tasklab = nil
        return sdkmsg
    } else {
        sdkmsg.Tasklab = val.(*protomsg.TaskLabel)
        sdkmsg.Data = data
    }
@@ -163,51 +172,17 @@
}
//sdk数据分发器
func SdkSendTopic(sdkmsg protomsg.SdkMessage) (sdksend string) {
func GetSdkSendTopic(sdkmsg protomsg.SdkMessage) (sendTopic string) {
    if int(sdkmsg.Tasklab.Index) < len(sdkmsg.Tasklab.Sdkinfos) {
        sdksend = sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Ipcid
        sendTopic = sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Ipcid
    } else {
        sdksend = "es"
        sendTopic = "es"
    }
    logger.Debug("分发的主题: ", sdksend , "位置:", int(sdkmsg.Tasklab.Index)+1,"/",  len(sdkmsg.Tasklab.Sdkinfos))
    logger.Debug("分发的主题: ", sendTopic, "位置:", int(sdkmsg.Tasklab.Index)+1, "/", len(sdkmsg.Tasklab.Sdkinfos))
    return
}
// create server
func NewSdkSocketListen(mode int, sdkid string, url string) (socket SocketContext, err error) {
    logger.Info("url is: ", url)
    ctx, cancel := context.WithCancel(context.Background())
    socket.Context = ctx
    socket.Cancel = cancel
    socket.Sock = deliver.NewServer(deliver.Mode(mode), url)
    if socket.Sock == nil {
        return socket, errors.New("create listen error")
    }
    SocketManage[sdkid] = socket
    return socket, nil
}
func NewSdkSocketDial(mode int, sdkid string, url string) (sid string, socket SocketContext, err error) {
    logger.Info("url is: ", url)
    ctx, cancel := context.WithCancel(context.Background())
    socket.Context = ctx
    socket.Cancel = cancel
    socket.Sock = deliver.NewClient(deliver.Mode(mode), url)
    if socket.Sock == nil {
        return sdkid, socket, errors.New("create listen error")
    }
    SocketManage[sdkid] = socket
    return sdkid, socket, nil
}
func Recv(socket SocketContext) {
func Recv(socket util.SocketContext) {
    var repsdkmsg = protomsg.SdkMessage{}
    for {
        select {
@@ -225,14 +200,14 @@
                }
                repsdkmsg.Tasklab.Index++
                //调用计算函数, 分发给下一个主题
                nexttopic := SdkSendTopic(repsdkmsg)
                nexttopic := GetSdkSendTopic(repsdkmsg)
                SdkMap[nexttopic] <- repsdkmsg
            }
        }
    }
}
func Send(sdkid string, socket SocketContext, in chan protomsg.SdkMessage) {
func Send(sdkid string, socket util.SocketContext, in chan protomsg.SdkMessage) {
    for {
        select {
@@ -248,7 +223,7 @@
                }
                
                if err := socket.Sock.Send(data); err != nil {
                    logger.Error("failed send")
                    logger.Error("failed send:sdkid=", sdkid)
                    continue
                }
                logger.Debug(sdkid, " send success: ", len(data))
taskpubsub
Binary files differ
tasktag/tasktag.go
@@ -4,27 +4,28 @@
    "sync"
    "basic.com/pubsub/protomsg.git"
    "taskpubsub/util"
    "taskpubsub/logger"
    "taskpubsub/util"
)
var TaskMapLab sync.Map
var TaskLabelMap sync.Map
func Init() {
    logger.Info("============= init tasktag info =====================")
    GenTaskMap()
    genTaskLabelMap()
    go func(taskflag chan bool) {
        for _ = range taskflag {
            GenTaskMap()
            genTaskLabelMap()
            logger.Info("update task finished!")
        }
    }(util.TaskSdkflag)
}
func GenTaskMap() {
    var tls []protomsg.TaskLabel
// 从sqlite 接口拿到所有的任务, 每一个任务都有自己的几个算法
//以 taskid 作为key, 对应的算法组合作为 value
func genTaskLabelMap() {
    var newtls []protomsg.TaskLabel
    for _, taskSdk := range util.TaskSdks {
        var tl protomsg.TaskLabel
@@ -39,25 +40,23 @@
            tl.Sdkinfos = append(tl.Sdkinfos, sdkinfowithtask)
        }
        tl.Index = int32(0)
        tls = append(tls, tl)
        newtls = append(newtls, tl)
    }
    GenTasklab(tls)
    TaskMapLab.Range(func(k, v interface{}) bool {
    updateTaskLabelMap(newtls)
    TaskLabelMap.Range(func(k, v interface{}) bool {
        logger.Info(k, v)
        return true
    })
}
// 从sqlite 接口拿到所有的任务, 每一个任务都有自己的几个算法
//以 taskid 作为key, 对应的算法组合作为 value
func GenTasklab(tasklab []protomsg.TaskLabel) {
    TaskMapLab.Range(func(key interface{}, value interface{}) bool {
        TaskMapLab.Delete(key)
func updateTaskLabelMap(taskLabel []protomsg.TaskLabel) {
    TaskLabelMap.Range(func(key interface{}, value interface{}) bool {
        TaskLabelMap.Delete(key)
        return true
    })
    for _, value := range tasklab {
    for _, value := range taskLabel {
        pv := value
        TaskMapLab.Store(value.Taskid, &pv)
        TaskLabelMap.Store(value.Taskid, &pv)
    }
}
util/sqlite.go
@@ -1,12 +1,11 @@
package util
import (
    "flag"
    "basic.com/pubsub/protomsg.git"
    "basic.com/dbapi.git"
    "github.com/gogo/protobuf/proto"
    "basic.com/pubsub/protomsg.git"
    "basic.com/valib/gopherdiscovery.git"
    "flag"
    "github.com/gogo/protobuf/proto"
    "taskpubsub/logger"
)
@@ -42,9 +41,7 @@
var newsdkmsg = &protomsg.DbChangeMessage{}
func processinit(initchan chan bool) {
func initDbData(initchan chan bool) {
    CameraIds = camval.FindAll()
    logger.Info("==============camera camera with task ================")
@@ -58,7 +55,7 @@
    initchan <- true
}
func Getdata(opt []byte) {
func updateDbData(opt []byte) {
    if err := proto.Unmarshal(opt, newsdkmsg); err != nil {
        logger.Error("publichshMessage ", err)
        return
@@ -96,14 +93,14 @@
func Init(initchan chan bool) {
    dbapi.Init(*dbip, *dbport)
    clientOne, _ := gopherdiscovery.ClientWithSub(*urlServer, *urlPubSub, "ip:local")
    recvinit := clientOne.HeartBeatMsg()
    client, _ := gopherdiscovery.ClientWithSub(*urlServer, *urlPubSub, "ip:local")
    recvinit := client.HeartBeatMsg()
    _ = <-recvinit
    processinit(initchan)
    initDbData(initchan)
    peers, _ := clientOne.Peers()
    peers, _ := client.Peers()
    for x := range peers {
        Getdata(x)
        updateDbData(x)
    }
}
util/util.go
@@ -1,11 +1,18 @@
package util
import(
    "basic.com/valib/deliver.git"
    "context"
    "errors"
        "github.com/pierrec/lz4"
        "taskpubsub/logger"
       )
type SocketContext struct {
    Sock    deliver.Deliver
    Context context.Context
    Cancel  context.CancelFunc
}
//  1. oldstring element is not in new  : abandon(delete)
//  2. new element is not in oldstring  : add(add)
@@ -66,3 +73,35 @@
     out = out[:n] // compressed data
         return out, nil
}
// create server
func NewSocketListen(mode int, url string) (socket SocketContext, err error) {
    logger.Info("url is: ", url)
    ctx, cancel := context.WithCancel(context.Background())
    socket.Context = ctx
    socket.Cancel = cancel
    socket.Sock = deliver.NewServer(deliver.Mode(mode), url)
    if socket.Sock == nil {
        return socket, errors.New("create listen error")
    }
    return socket, nil
}
func NewSocketDial(mode int, url string) (socket SocketContext, err error) {
    logger.Info("url is: ", url)
    ctx, cancel := context.WithCancel(context.Background())
    socket.Context = ctx
    socket.Cancel = cancel
    socket.Sock = deliver.NewClient(deliver.Mode(mode), url)
    if socket.Sock == nil {
        return socket, errors.New("create listen error")
    }
    return socket, nil
}