chenshijun
2019-07-27 b5560d3cae1164f016ca9339592eda6b9008fb9f
重构代码
6个文件已修改
636 ■■■■ 已修改文件
camera/camera.go 194 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
sdk/sdk.go 265 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
taskpubsub 补丁 | 查看 | 原始文档 | blame | 历史
tasktag/tasktag.go 45 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
util/sqlite.go 43 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
util/util.go 89 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
camera/camera.go
@@ -1,29 +1,34 @@
package camera
import (
    "errors"
    "basic.com/valib/deliver.git"
    "taskpubsub/logger"
    //"sync"
    "taskpubsub/logger"
    "basic.com/pubsub/protomsg.git"
    "github.com/gogo/protobuf/proto"
    "taskpubsub/sdk"
    "taskpubsub/util"
    "github.com/gogo/protobuf/proto"
    "basic.com/pubsub/protomsg.git"
    "context"
    "fmt"
    "sync"
    //"time"
)
var SocketManage sync.Map
const (
    faceExtractWebCID    = "virtual-face-extract-web-camera-id"
    faceExtractWebTaskID = "92496BDF-2BFA-98F2-62E8-96DD9866ABD2"
)
type SocketContext struct {
    Sock    deliver.Deliver
    Context context.Context
    Cancel  context.CancelFunc
var SocketManage = make(map[string]util.SocketContext)
var innerRecvTopic = []string{
    "virtual-faceextract-sdk-pull_2", //to web 以图搜图
}
func initInnerTopic() {
    for _, recvTopic := range innerRecvTopic {
        createCameraRecvServerAndListen(recvTopic)
    }
}
func Init() {
@@ -33,96 +38,80 @@
        logger.Info()
    }
    // 摄像机初始化
    // 摄像机初始化
    for _, cam := range util.CameraIds {
        CreateCamera(cam.Id, "camera")
        createCameraRecvServerAndListen(cam.Id)
    }
    //  web端初始化
    CreateCamera("virtual-faceextract-sdk-pull_2" , "web")
    // 手动输入的主题
    initInnerTopic()
    go AutoDelCamera(util.Cameraflag)
    go autoUpdateCamera(util.Cameraflag)
}
// camera 接受数据
func CreateCamera(id string, remote string) {
        if _, ok := SocketManage.Load(id); !ok {
            url := fmt.Sprintf("ipc:///tmp/%s.ipc", id)
func createCameraRecvServerAndListen(id string) {
    if _, isExist := SocketManage[id]; !isExist { //不存在
            socketlisten, err := NewCamerSocketListen(deliver.PushPull, id, url)
            if err != nil {
                logger.Error("create socket error")
                return
            }
            go Recv(socketlisten, remote)
        url := fmt.Sprintf("ipc:///tmp/%s.ipc", id)
        socket, err := util.NewSocketListen(int(deliver.PushPull), url)
        if err != nil {
            logger.Error("create socket error")
            return
        }
        SocketManage[id] = socket
        go Recv(socket)
    }
}
func deleteCameraRecvServer(id string) {
    if _, isExist := SocketManage[id]; isExist { //存在
        SocketManage[id].Cancel()
        delete(SocketManage, id)
        logger.Info("删除server sdk: ", id)
    }
}
//动态处理
func AutoDelCamera(cameraflag chan bool) {
func autoUpdateCamera(cameraflag chan bool) {
    for _ = range cameraflag {
        logger.Info("test autodelcameraflag")
        var oldcamera []string
        var oldcameras []string
        SocketManage.Range(func(k, v interface{}) bool {
            if str, ok := k.(string); ok {
                oldcamera = append(oldcamera, str)
            }
            return true
        })
        for key := range SocketManage {
            oldcameras = append(oldcameras, key)
        }
        var newcamera []string
        for _, camnew := range util.CameraIds {
            newcamera = append(newcamera, camnew.Id)
        }
        newcamera = append(newcamera, "virtual-faceextract-sdk-pull_2")
        var newcameras []string
        for _, camnew := range util.CameraIds {
            newcameras = append(newcameras, camnew.Id)
        }
        // 手动添加的全部加上
        for _, recvTopic := range innerRecvTopic {
            newcameras = append(newcameras, recvTopic)
        }
        cameraChanDel := util.Difference(oldcamera, newcamera)
        logger.Info(cameraChanDel)
        cameraListUpdate := util.Difference(oldcameras, newcameras)
        logger.Info(cameraListUpdate)
        for key, op := range cameraChanDel {
        for key, op := range cameraListUpdate {
            if op == "add" {
               CreateCamera(key, "camera")
               logger.Info("add new camera id=========================", key)
                createCameraRecvServerAndListen(key)
            } else {
                if sock, ok := SocketManage.Load(key); ok {
                    if socket, sok := sock.(SocketContext); sok {
                        socket.Cancel()
                        SocketManage.Delete(key)
                    }
                }
                logger.Info("删除camera server : ", key)
                deleteCameraRecvServer(key)
            }
        }
    }
}
// create server
func NewCamerSocketListen(mode int, cameraid string, url string) (socket SocketContext, err error) {
    ctx, cancel := context.WithCancel(context.Background())
func Recv(socket util.SocketContext) {
    socket.Context = ctx
    socket.Cancel = cancel
    socket.Sock = deliver.NewServer(deliver.Mode(mode), url)
    logger.Info("new socket.Sock: ", socket.Sock)
    if socket.Sock == nil {
        return socket, errors.New("create listen error")
    }
    SocketManage.Store(cameraid, socket)
    return socket, nil
}
func Recv(socket SocketContext, remote string ) {
    var recvmessage []byte
    var imagemsg  protomsg.Image
    var err error
    var recvmessage []byte
    var imagemsg protomsg.Image
    var err error
    for {
        select {
        case <-socket.Context.Done():
@@ -134,34 +123,31 @@
                continue
            }
            unmsg, err := util.UnCompress(recvmessage)
            if err != nil {
                logger.Error(err)
                continue
            }
            unmsg, err := util.UnCompress(recvmessage)
            if err != nil {
                logger.Error(err)
                continue
            }
            if  err := proto.Unmarshal(unmsg,&imagemsg);  err != nil {
               logger.Error("recv msg is not protomsgImage")
               continue
            }
            switch remote {
               case "camera":
                    for _, taskid := range GetAlltask(imagemsg.Cid) {
                        //time.Sleep(5 * time.Second)
                        logger.Debug("id: ", imagemsg.Cid, " taskid: ", taskid)
                        Taskdolist(imagemsg.Cid, "", taskid, recvmessage)
                    }
               case  "web":
                        logger.Debug("id: ", imagemsg.Cid , " taskid: ", "92496BDF-2BFA-98F2-62E8-96DD9866ABD2")
                        Taskdolist(imagemsg.Cid,"", "92496BDF-2BFA-98F2-62E8-96DD9866ABD2", recvmessage)
           }
            if err := proto.Unmarshal(unmsg, &imagemsg); err != nil {
                logger.Error("recv msg is not protomsgImage")
                continue
            }
            if faceExtractWebCID == imagemsg.Cid { //以图搜图
                doTaskList(imagemsg.Cid, "", faceExtractWebTaskID, recvmessage)
            } else {
                taskIDs := GetAllTaskByID(imagemsg.Cid)
                for _, taskID := range taskIDs {
                    logger.Debug("id: ", imagemsg.Cid, " taskid: ", taskID)
                    doTaskList(imagemsg.Cid, "", taskID, recvmessage)
                }
            }
        }
    }
}
//   据cid 获取 所有的任务
func GetAlltask(cid string) (tasks []string) {
func GetAllTaskByID(cid string) (tasks []string) {
    for _, camsingle := range util.CameraTasks {
        if cid == camsingle.Camera.Id {
            for _, tasksingle := range camsingle.Tasks {
@@ -173,22 +159,22 @@
    return
}
func Taskdolist(cid string, caddr string,  taskid string, data []byte) {
func doTaskList(cid string, caddr string, taskid string, data []byte) {
    //  数据加工(打标签)
    logger.Debug("taskid: ",taskid, "has ", len(data), "data[]byte")
    sdkmsg := sdk.SdkData(cid, caddr, taskid, data)
    logger.Debug("taskid: ", taskid, "has ", len(data), "data[]byte")
    sdkmsg := sdk.ToSdkMsg(cid, caddr, taskid, data)
    if sdkmsg.Tasklab == nil {
        logger.Error(cid, " not have taskid: ", taskid )
        logger.Error(cid, " not have taskid: ", taskid)
        return
    }
    //  计算分发的主题
    SendTopic := sdk.SdkSendTopic(sdkmsg)
    logger.Debug(SendTopic)
    SendTopic := sdk.GetSdkSendTopic(sdkmsg)
    logger.Debug(SendTopic)
    if _, ok := sdk.SdkMap[SendTopic]; ok {
        sdk.SdkMap[SendTopic] <- sdkmsg
         logger.Debug("dispute sendtopic success", SendTopic)
        logger.Debug("dispute sendtopic success", SendTopic)
    } else {
        logger.Debug("分发的主题不存在")
    }
sdk/sdk.go
@@ -1,161 +1,170 @@
package sdk
import (
    "context"
    "errors"
    "fmt"
    //    "taskpubsub/httpclient"
    "github.com/gogo/protobuf/proto"
    "taskpubsub/tasktag"
    "taskpubsub/util"
    "github.com/gogo/protobuf/proto"
    "basic.com/pubsub/protomsg.git"
    "basic.com/valib/deliver.git"
    "taskpubsub/logger"
    "taskpubsub/logger"
)
const (
    postPull = "_1.ipc"
    postPush = "_2.ipc"
    postPush = "_1.ipc"
    postPull = "_2.ipc"
)
var SocketManage = make(map[string]SocketContext)
var SocketManage = make(map[string]util.SocketContext)
var SdkMap = make(map[string]chan protomsg.SdkMessage)
type SocketContext struct {
    Sock    deliver.Deliver
    Context context.Context
    Cancel  context.CancelFunc
var innerRecvTopic = []string{
    "facedetect-sdk-no-track", //to sdk-no-track 以图搜图
}
var innerSendTopic = []string{
    "facedetect-sdk-no-track",      //to sdk-no-track 以图搜图
    "virtual-faceextract-sdk-pull", //to web 以图搜图
}
func initInnerTopic() {
    for _, sendTopic := range innerSendTopic {
        createSdkSendServerAndListen(sendTopic)
    }
    for _, recvTopic := range innerRecvTopic {
        createSdkRecvServerAndListen(recvTopic)
    }
}
func Init() {
    logger.Info("============= init sdk info =====================")
    for _, sdkid := range util.Sdklist { // 创建sdk server
        CreatesdkTopicandServer(sdkid)
        createSdkTopicAndServer(sdkid)
        logger.Info()
    }
    // 手动输入的主题
    // sdk-no-track
    CreatesdkTopicandServer("facedetect-sdk-no-track")
    logger.Info()
    // 手动输入的主题
    initInnerTopic()
    // es
    // es
    SdkMap["es"] = make(chan protomsg.SdkMessage)
    logger.Info("create es channel: ")
    go DealEsTopic()
    // web
    SdkMap["virtual-faceextract-sdk-pull"] = make(chan protomsg.SdkMessage)
    logger.Info("create virtual-faceextract-sdk-pull")
    Createwebserver("virtual-faceextract-sdk-pull")
    go Dealextern()
    go AutoDelSdk(util.Sdkflag)
    go autoUpdateSdk(util.Sdkflag)
}
func CreatesdkTopicandServer(sdkid string) {
    SdkMap[sdkid] = make(chan protomsg.SdkMessage)
    logger.Info("create sdk channel:  ", sdkid)
    url := fmt.Sprintf("ipc:///tmp/%s%s", sdkid, postPull)
    socketser, err := NewSdkSocketListen(deliver.PushPull, sdkid, url)
    if err != nil {
        delete(SdkMap, sdkid)
        logger.Error(sdkid, "create server error!")
        return
    }
    go Send(sdkid, socketser, SdkMap[sdkid])
    url = fmt.Sprintf("ipc:///tmp/%s%s", sdkid, postPush)
    socketdial, err := NewSdkSocketListen(deliver.PushPull, sdkid, url)
    if err != nil {
        delete(SdkMap, sdkid)
        logger.Error(sdkid, "create dial error!")
        return
    }
    go Recv(socketdial)
func createSdkTopicAndServer(sdkid string) {
    createSdkSendServerAndListen(sdkid)
    createSdkRecvServerAndListen(sdkid)
}
// web 接受端处理
func Createwebserver(webid string ){
    url := fmt.Sprintf("ipc:///tmp/%s%s", webid, postPull)
    socketser, err := NewSdkSocketListen(deliver.PushPull, webid, url)
    if err != nil {
        logger.Error(webid, "create server error!")
        return
func deleteSdkTopicAndServer(id string) {
    if _, isExist := SdkMap[id]; isExist { //存在
        close(SdkMap[id])
        delete(SdkMap, id)
        logger.Info("删除主题 sdk: ", id)
    }
    go Send(webid, socketser, SdkMap[webid])
    if _, isExist := SocketManage[id]; isExist { //存在
        SocketManage[id].Cancel()
        delete(SocketManage, id)
        logger.Info("删除server sdk: ", id)
    }
}
func DeletesdkTopicandServer(sdkid string) {
    close(SdkMap[sdkid])
    delete(SdkMap, sdkid)
    logger.Info("删除主题 sdk: ", sdkid)
func createSdkSendServerAndListen(id string) {
    if _, isExist := SdkMap[id]; !isExist { //不存在
        SdkMap[id] = make(chan protomsg.SdkMessage)
        logger.Info("create", id)
    }
    SocketManage[sdkid].Cancel()
    delete(SocketManage, sdkid)
    logger.Info("删除server sdk: ", sdkid)
    url := fmt.Sprintf("ipc:///tmp/%s%s", id, postPush)
    socket, err := util.NewSocketListen(int(deliver.PushPull), url)
    if err != nil {
        delete(SdkMap, id)
        logger.Error(id, "create socket error!")
        return
    }
    SocketManage[id] = socket
    go Send(id, socket, SdkMap[id])
}
func createSdkRecvServerAndListen(id string) {
    if _, isExist := SdkMap[id]; !isExist { //不存在
        SdkMap[id] = make(chan protomsg.SdkMessage)
        logger.Info("create", id)
    }
    url := fmt.Sprintf("ipc:///tmp/%s%s", id, postPull)
    socket, err := util.NewSocketListen(int(deliver.PushPull), url)
    if err != nil {
        delete(SdkMap, id)
        logger.Error(id, "create socket error!")
        return
    }
    SocketManage[id] = socket
    go Recv(socket)
}
//单独处理   es 主题的情况
func Dealextern() {
    for {
        select {
            case <-SdkMap["es"]:
                //logger.Info("es finanl sdk!")
        }
    }
func DealEsTopic() {
    for {
        select {
        case <-SdkMap["es"]:
            //logger.Info("es finanl sdk!")
        }
    }
}
//动态处理
func AutoDelSdk(sdkflag chan bool) {
func autoUpdateSdk(sdkflag chan bool) {
    for _ = range sdkflag {
        logger.Info("test autodelsdk")
        var oldSdk []string
        for key, _ := range SdkMap {
            oldSdk = append(oldSdk, key)
        var oldSdkList []string
        for key := range SdkMap {
            oldSdkList = append(oldSdkList, key)
        }
        // 手动添加的全部加上
        util.Sdklist = append(util.Sdklist, "es")
        util.Sdklist = append(util.Sdklist, "virtual-faceextract-sdk-pull")
        util.Sdklist = append(util.Sdklist, "facedetect-sdk-no-track")
        newSdkList := util.Sdklist
        sdkChanDel := util.Difference(oldSdk, util.Sdklist)
        logger.Info(sdkChanDel)
        // 手动添加的全部加上
        for _, sendTopic := range innerSendTopic {
            newSdkList = append(newSdkList, sendTopic)
        }
        for _, recvTopic := range innerRecvTopic {
            newSdkList = append(newSdkList, recvTopic)
        }
        for key, op := range sdkChanDel {
        sdkListUpdate := util.Difference(oldSdkList, newSdkList)
        logger.Info(sdkListUpdate)
        for key, op := range sdkListUpdate {
            if op == "add" {
                CreatesdkTopicandServer(key)
                createSdkTopicAndServer(key)
            } else {
                DeletesdkTopicandServer(key)
                logger.Info("删除主题 sdk: ", key)
                deleteSdkTopicAndServer(key)
            }
        }
    }
}
//sdk数据 加工器
func SdkData(cid string,caddr string, taskid string, data []byte) protomsg.SdkMessage {
func ToSdkMsg(cid string, caddr string, taskid string, data []byte) protomsg.SdkMessage {
    var sdkmsg = protomsg.SdkMessage{}
    sdkmsg.Cid = cid
    sdkmsg.Caddr =caddr
    if val, ok := tasktag.TaskMapLab.Load(taskid); !ok {
    sdkmsg.Caddr = caddr
    if val, ok := tasktag.TaskLabelMap.Load(taskid); !ok {
        sdkmsg.Tasklab = nil
        return sdkmsg
    } else {
        sdkmsg.Tasklab = val.(*protomsg.TaskLabel)
        sdkmsg.Data = data
    }
@@ -163,51 +172,17 @@
}
//sdk数据分发器
func SdkSendTopic(sdkmsg protomsg.SdkMessage) (sdksend string) {
func GetSdkSendTopic(sdkmsg protomsg.SdkMessage) (sendTopic string) {
    if int(sdkmsg.Tasklab.Index) < len(sdkmsg.Tasklab.Sdkinfos) {
        sdksend = sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Ipcid
        sendTopic = sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Ipcid
    } else {
        sdksend = "es"
        sendTopic = "es"
    }
    logger.Debug("分发的主题: ", sdksend , "位置:", int(sdkmsg.Tasklab.Index)+1,"/",  len(sdkmsg.Tasklab.Sdkinfos))
    logger.Debug("分发的主题: ", sendTopic, "位置:", int(sdkmsg.Tasklab.Index)+1, "/", len(sdkmsg.Tasklab.Sdkinfos))
    return
}
// create server
func NewSdkSocketListen(mode int, sdkid string, url string) (socket SocketContext, err error) {
    logger.Info("url is: ", url)
    ctx, cancel := context.WithCancel(context.Background())
    socket.Context = ctx
    socket.Cancel = cancel
    socket.Sock = deliver.NewServer(deliver.Mode(mode), url)
    if socket.Sock == nil {
        return socket, errors.New("create listen error")
    }
    SocketManage[sdkid] = socket
    return socket, nil
}
func NewSdkSocketDial(mode int, sdkid string, url string) (sid string, socket SocketContext, err error) {
    logger.Info("url is: ", url)
    ctx, cancel := context.WithCancel(context.Background())
    socket.Context = ctx
    socket.Cancel = cancel
    socket.Sock = deliver.NewClient(deliver.Mode(mode), url)
    if socket.Sock == nil {
        return sdkid, socket, errors.New("create listen error")
    }
    SocketManage[sdkid] = socket
    return sdkid, socket, nil
}
func Recv(socket SocketContext) {
func Recv(socket util.SocketContext) {
    var repsdkmsg = protomsg.SdkMessage{}
    for {
        select {
@@ -222,36 +197,36 @@
                if err != nil {
                    logger.Error("unmarshal error: ", err)
                    continue
                }
                repsdkmsg.Tasklab.Index++
                //调用计算函数, 分发给下一个主题
                nexttopic := SdkSendTopic(repsdkmsg)
                SdkMap[nexttopic] <- repsdkmsg
                }
                repsdkmsg.Tasklab.Index++
                //调用计算函数, 分发给下一个主题
                nexttopic := GetSdkSendTopic(repsdkmsg)
                SdkMap[nexttopic] <- repsdkmsg
            }
        }
    }
}
func Send(sdkid string, socket SocketContext, in chan protomsg.SdkMessage) {
func Send(sdkid string, socket util.SocketContext, in chan protomsg.SdkMessage) {
    for {
        select {
        case <-socket.Context.Done():
            logger.Info("socket is close")
            return
        case v, ok := <-in:
        case v, ok := <-in:
            if ok {
                data, err :=v.Marshal()
                data, err := v.Marshal()
                if err != nil {
                    logger.Error("proto marshal error ", err)
                    continue
                }
                }
                if err := socket.Sock.Send(data); err != nil {
                    logger.Error("failed send")
                    logger.Error("failed send:sdkid=", sdkid)
                    continue
                }
                logger.Debug(sdkid, " send success: ", len(data))
                logger.Debug(sdkid, " send success: ", len(data))
            } else {
                logger.Debug(sdkid, " 主题关闭, 关闭send()")
                return
taskpubsub
Binary files differ
tasktag/tasktag.go
@@ -4,60 +4,59 @@
    "sync"
    "basic.com/pubsub/protomsg.git"
    "taskpubsub/logger"
    "taskpubsub/util"
    "taskpubsub/logger"
)
var TaskMapLab sync.Map
var TaskLabelMap sync.Map
func Init() {
    logger.Info("============= init tasktag info =====================")
    GenTaskMap()
    genTaskLabelMap()
    go func(taskflag chan bool) {
        for _ = range taskflag {
            GenTaskMap()
            genTaskLabelMap()
            logger.Info("update task finished!")
        }
    }(util.TaskSdkflag)
}
func GenTaskMap() {
    var tls []protomsg.TaskLabel
// 从sqlite 接口拿到所有的任务, 每一个任务都有自己的几个算法
//以 taskid 作为key, 对应的算法组合作为 value
func genTaskLabelMap() {
    var newtls []protomsg.TaskLabel
    for _, taskSdk := range util.TaskSdks {
        var tl protomsg.TaskLabel
        tl.Taskid = taskSdk.Task.Taskid
        tl.Taskname = taskSdk.Task.Taskname
        tl.Taskname = taskSdk.Task.Taskname
        for _, sdkinfo := range taskSdk.Sdks {
            sdkinfowithtask := new(protomsg.SdkmsgWithTask)
            sdkinfowithtask.Ipcid = sdkinfo.IpcId
            sdkinfowithtask.Sdktype = sdkinfo.SdkType
            sdkinfowithtask.Sdkdata = make([]byte, 1)
            tl.Sdkinfos = append(tl.Sdkinfos, sdkinfowithtask)
            sdkinfowithtask := new(protomsg.SdkmsgWithTask)
            sdkinfowithtask.Ipcid = sdkinfo.IpcId
            sdkinfowithtask.Sdktype = sdkinfo.SdkType
            sdkinfowithtask.Sdkdata = make([]byte, 1)
            tl.Sdkinfos = append(tl.Sdkinfos, sdkinfowithtask)
        }
        tl.Index = int32(0)
        tls = append(tls, tl)
        newtls = append(newtls, tl)
    }
    GenTasklab(tls)
    TaskMapLab.Range(func(k, v interface{}) bool {
    updateTaskLabelMap(newtls)
    TaskLabelMap.Range(func(k, v interface{}) bool {
        logger.Info(k, v)
        return true
    })
}
// 从sqlite 接口拿到所有的任务, 每一个任务都有自己的几个算法
//以 taskid 作为key, 对应的算法组合作为 value
func GenTasklab(tasklab []protomsg.TaskLabel) {
    TaskMapLab.Range(func(key interface{}, value interface{}) bool {
        TaskMapLab.Delete(key)
func updateTaskLabelMap(taskLabel []protomsg.TaskLabel) {
    TaskLabelMap.Range(func(key interface{}, value interface{}) bool {
        TaskLabelMap.Delete(key)
        return true
    })
    for _, value := range tasklab {
    for _, value := range taskLabel {
        pv := value
        TaskMapLab.Store(value.Taskid, &pv)
        TaskLabelMap.Store(value.Taskid, &pv)
    }
}
util/sqlite.go
@@ -1,13 +1,12 @@
package util
import (
    "flag"
    "basic.com/pubsub/protomsg.git"
    "basic.com/dbapi.git"
    "basic.com/pubsub/protomsg.git"
    "basic.com/valib/gopherdiscovery.git"
    "flag"
    "github.com/gogo/protobuf/proto"
    "basic.com/valib/gopherdiscovery.git"
    "taskpubsub/logger"
    "taskpubsub/logger"
)
/*************************
@@ -25,15 +24,15 @@
var CameraTasks []protomsg.CameraAndTaskInfo
//var TaskSdks []protomsg.TaskSdkInfo
var TaskSdks []protomsg.TaskSdkRun
var TaskSdks []protomsg.TaskSdkRun
var Sdklist []string
var Sdkinfos []protomsg.Sdk
var urlServer = flag.String("urlServer","tcp://127.0.0.1:40007","heartbeat address of url server")
var urlPubSub = flag.String("urlPubsub","tcp://127.0.0.1:50007", "heartbeat pubsub address of url server")
var urlServer = flag.String("urlServer", "tcp://127.0.0.1:40007", "heartbeat address of url server")
var urlPubSub = flag.String("urlPubsub", "tcp://127.0.0.1:50007", "heartbeat pubsub address of url server")
var dbip = flag.String("dbip","127.0.0.1","address of database ip")
var dbip = flag.String("dbip", "127.0.0.1", "address of database ip")
var dbport = flag.Int("dbport", 8001, "port of database port")
var Sdkflag = make(chan bool)
@@ -42,23 +41,21 @@
var newsdkmsg = &protomsg.DbChangeMessage{}
func processinit(initchan chan bool) {
func initDbData(initchan chan bool) {
    CameraIds = camval.FindAll()
    logger.Info("==============camera camera with task ================")
    CameraTasks = camval.FindAllCameraAndTask()
    logger.Info(CameraTasks)
    logger.Info(CameraTasks)
    TaskSdks = taskapi.FindAllTaskSdkRun()
    Sdklist = sdkapi.GetAllSdkIds()
    Sdkinfos = sdkapi.FindAll("")
    Sdkinfos = sdkapi.FindAll("")
    initchan <- true
}
func Getdata(opt []byte) {
func updateDbData(opt []byte) {
    if err := proto.Unmarshal(opt, newsdkmsg); err != nil {
        logger.Error("publichshMessage ", err)
        return
@@ -69,7 +66,7 @@
        logger.Info("update camera")
        CameraIds = camval.FindAll()
        Cameraflag <- true
        logger.Info("update camera finish.")
        logger.Info("update camera finish.")
    case protomsg.TableChanged_T_CameraTask:
        logger.Info("update cameratask")
@@ -84,7 +81,7 @@
    case protomsg.TableChanged_T_Sdk:
        logger.Info("update sdk")
        Sdklist = sdkapi.GetAllSdkIds()
        Sdkinfos = sdkapi.FindAll("")
        Sdkinfos = sdkapi.FindAll("")
        Sdkflag <- true
    default:
@@ -95,15 +92,15 @@
}
func Init(initchan chan bool) {
    dbapi.Init(*dbip, *dbport)
    clientOne, _ := gopherdiscovery.ClientWithSub(*urlServer, *urlPubSub, "ip:local")
    recvinit := clientOne.HeartBeatMsg()
    dbapi.Init(*dbip, *dbport)
    client, _ := gopherdiscovery.ClientWithSub(*urlServer, *urlPubSub, "ip:local")
    recvinit := client.HeartBeatMsg()
    _ = <-recvinit
    processinit(initchan)
    initDbData(initchan)
    peers, _ := clientOne.Peers()
    peers, _ := client.Peers()
    for x := range peers {
        Getdata(x)
        updateDbData(x)
    }
}
util/util.go
@@ -1,11 +1,18 @@
package util
import(
        "github.com/pierrec/lz4"
        "taskpubsub/logger"
       )
import (
    "basic.com/valib/deliver.git"
    "context"
    "errors"
    "github.com/pierrec/lz4"
    "taskpubsub/logger"
)
type SocketContext struct {
    Sock    deliver.Deliver
    Context context.Context
    Cancel  context.CancelFunc
}
//  1. oldstring element is not in new  : abandon(delete)
//  2. new element is not in oldstring  : add(add)
@@ -41,28 +48,60 @@
// UnCompress uncompress
func UnCompress(in []byte) ([]byte, error) {
    out := make([]byte, 10*len(in))
         n, err := lz4.UncompressBlock(in, out)
         if err != nil {
             logger.Error("uncompress error: ", err)
             return nil, err
         }
     out = out[:n] // uncompressed data
     return out, nil
    out := make([]byte, 10*len(in))
    n, err := lz4.UncompressBlock(in, out)
    if err != nil {
        logger.Error("uncompress error: ", err)
        return nil, err
    }
    out = out[:n] // uncompressed data
    return out, nil
}
// Compress compress
func Compress(in []byte) ([]byte, error) {
    out := make([]byte, len(in))
         ht := make([]int, 64<<10) // buffer for the compression table
         n, err := lz4.CompressBlock(in, out, ht)
         if err != nil {
             logger.Error("compress: ", err)
                 return nil, err
         }
     if n >= len(in) {
         logger.Error("image is not compressible")
     }
     out = out[:n] // compressed data
         return out, nil
    out := make([]byte, len(in))
    ht := make([]int, 64<<10) // buffer for the compression table
    n, err := lz4.CompressBlock(in, out, ht)
    if err != nil {
        logger.Error("compress: ", err)
        return nil, err
    }
    if n >= len(in) {
        logger.Error("image is not compressible")
    }
    out = out[:n] // compressed data
    return out, nil
}
// create server
func NewSocketListen(mode int, url string) (socket SocketContext, err error) {
    logger.Info("url is: ", url)
    ctx, cancel := context.WithCancel(context.Background())
    socket.Context = ctx
    socket.Cancel = cancel
    socket.Sock = deliver.NewServer(deliver.Mode(mode), url)
    if socket.Sock == nil {
        return socket, errors.New("create listen error")
    }
    return socket, nil
}
func NewSocketDial(mode int, url string) (socket SocketContext, err error) {
    logger.Info("url is: ", url)
    ctx, cancel := context.WithCancel(context.Background())
    socket.Context = ctx
    socket.Cancel = cancel
    socket.Sock = deliver.NewClient(deliver.Mode(mode), url)
    if socket.Sock == nil {
        return socket, errors.New("create listen error")
    }
    return socket, nil
}