554325746@qq.com
2019-07-22 40d864645ff608e3a81d115fef26bba87954be2c
add log and fix pubusb
8个文件已修改
200 ■■■■■ 已修改文件
camera/camera.go 55 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
logger/logger.go 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.go 31 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
sdk/sdk.go 60 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
tasktag/tasktag.go 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test 补丁 | 查看 | 原始文档 | blame | 历史
util/sqlite.go 36 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
util/util.go 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
camera/camera.go
@@ -4,6 +4,7 @@
    "errors"
    "basic.com/valib/deliver.git"
    "github.com/long/test/logger"
    "github.com/long/test/sdk"
    "github.com/long/test/util"
@@ -14,7 +15,6 @@
    "fmt"
    "sync"
    //"time"
    "os"
)
@@ -27,10 +27,10 @@
}
func Init() {
    fmt.Println("============ camera info ====================")
    logger.Info("============ camera info ====================")
    for _, cd := range util.CameraIds {
        fmt.Println(cd)
        fmt.Println()
        logger.Info(cd)
        logger.Info()
    }
    // 摄像机初始化
@@ -41,7 +41,7 @@
    //  web端初始化
    CreateCamera("virtual-faceextract-sdk-pull_2" , "web")
//    go AutoDelCamera(util.Cameraflag)
    go AutoDelCamera(util.Cameraflag)
}
// camera 接受数据
@@ -51,7 +51,7 @@
            socketlisten, err := NewCamerSocketListen(deliver.PushPull, id, url)
            if err != nil {
                fmt.Println("create socket error")
                logger.Error("create socket error")
                return 
            }
        
@@ -63,7 +63,7 @@
func AutoDelCamera(cameraflag chan bool) {
    for _ = range cameraflag {
        fmt.Println("test autodelcameraflag")
        logger.Info("test autodelcameraflag")
        var oldcamera []string
        SocketManage.Range(func(k, v interface{}) bool {
@@ -74,16 +74,20 @@
        })
        var newcamera []string
        newcamera = append(newcamera, util.Sdklist...)
        for _, camnew := range util.CameraIds {
            newcamera = append(newcamera, camnew.Id)
        }
        newcamera = append(newcamera, "virtual-faceextract-sdk-pull_2")
        cameraChanDel := util.Difference(oldcamera, newcamera)
        fmt.Println(cameraChanDel)
        logger.Info(cameraChanDel)
        for key, op := range cameraChanDel {
            if op == "add" {
               CreateCamera(key, "camera") 
               fmt.Println("add new camera id=========================")
               os.Exit(1)
               logger.Info("add new camera id=========================", key)
            } else {
                if sock, ok := SocketManage.Load(key); ok {
                    if socket, sok := sock.(SocketContext); sok {
@@ -91,7 +95,7 @@
                        SocketManage.Delete(key)
                    }
                }
                fmt.Println("删除camera server : ", key)
                logger.Info("删除camera server : ", key)
            }
        }
    }
@@ -105,7 +109,7 @@
    socket.Cancel = cancel
    socket.Sock = deliver.NewServer(deliver.Mode(mode), url)
    fmt.Println("new socket.Sock: ", socket.Sock)
    logger.Info("new socket.Sock: ", socket.Sock)
    if socket.Sock == nil {
        return socket, errors.New("create listen error")
@@ -122,37 +126,34 @@
    for {
        select {
        case <-socket.Context.Done():
            fmt.Println("listen recv quit")
            logger.Error("listen recv quit")
            return
        default:
            if recvmessage, err = socket.Sock.Recv(); err != nil {
                fmt.Println("err is: ", err)
                //logger.Error("[camera] err is: ", err)
                continue
            }
            unmsg, err := util.UnCompress(recvmessage)
            if err != nil {
                fmt.Println(err)
                logger.Error(err)
                continue
            }
            if  err := proto.Unmarshal(unmsg,&imagemsg);  err != nil {
               fmt.Println("recv msg is not protomsgImage")
               logger.Error("recv msg is not protomsgImage")
               continue
            }
        //    fmt.Println("============== one msg input ==========")
         //   fmt.Println(imagemsg.Cid)
            switch remote {
               case "camera":
                    fmt.Printf("=== cid: has %d task\n", len(GetAlltask(imagemsg.Cid)))
                    for _, taskid := range GetAlltask(imagemsg.Cid) {
                        //time.Sleep(5 * time.Second)
                        fmt.Println("id: ", imagemsg.Cid, " taskid: ", taskid)
                        logger.Debug("id: ", imagemsg.Cid, " taskid: ", taskid)
                        Taskdolist(imagemsg.Cid, "", taskid, recvmessage)
                    }
               case  "web":
                        fmt.Println("id: ", imagemsg.Cid , " taskid: ", "92496BDF-2BFA-98F2-62E8-96DD9866ABD2")
                        logger.Debug("id: ", imagemsg.Cid , " taskid: ", "92496BDF-2BFA-98F2-62E8-96DD9866ABD2")
                        Taskdolist(imagemsg.Cid,"", "92496BDF-2BFA-98F2-62E8-96DD9866ABD2", recvmessage)
           }
        }
@@ -175,20 +176,20 @@
func Taskdolist(cid string, caddr string,  taskid string, data []byte) {
    //  数据加工(打标签)
    fmt.Printf("taskid %s: has %d data[]byte\n", taskid, len(data))
    logger.Debug("taskid: ",taskid, "has ", len(data), "data[]byte")
    sdkmsg := sdk.SdkData(cid, caddr, taskid, data)
    if sdkmsg.Tasklab == nil {
        fmt.Printf("cid:%s 没有任务%s\n", cid, taskid)
        logger.Error(cid, " not have taskid: ", taskid )
        return
    }
    //  计算分发的主题
    SendTopic := sdk.SdkSendTopic(sdkmsg)
    fmt.Println(SendTopic)
    logger.Debug(SendTopic)
    if _, ok := sdk.SdkMap[SendTopic]; ok {
        sdk.SdkMap[SendTopic] <- sdkmsg
         fmt.Println("dispute sendtopic success", SendTopic)
         logger.Debug("dispute sendtopic success", SendTopic)
    } else {
        fmt.Println("分发的主题不存在")
        logger.Debug("分发的主题不存在")
    }
}
logger/logger.go
@@ -214,4 +214,4 @@
        }
        me.fileFd = nil
    }
}
}
main.go
@@ -1,8 +1,6 @@
package main
import (
    "fmt"
    "log"
    "net/http"
    "flag"
    _ "net/http/pprof"
@@ -12,24 +10,21 @@
    "github.com/long/test/sdk"
    "github.com/long/test/tasktag"
    "github.com/long/test/util"
    //"github.com/long/test/logger"
    "github.com/long/test/logger"
    //    "github.com/long/test/camera"
    //    "github.com/long/test/sdk"
    //    "github.com/long/test/tasktag"
)
var initchan = make(chan bool)
//func init(){
//    var logFile = "./taskpubsub.log"
//    var logSaveDays    =    15
//
//    // 日志初始化
//    logger.Config(logFile, logger.DebugLevel)
//    logger.SetSaveDays(logSaveDays)
//    logger.Info("loginit success !")
//}
func init(){
    var logFile = "./taskpubsub.log"
    var logSaveDays    =    15
    // 日志初始化
    logger.Config(logFile, logger.InfoLevel)
    logger.SetSaveDays(logSaveDays)
    logger.Info("loginit success !")
}
        
func     main() {
    flag.Parse()
@@ -37,14 +32,12 @@
    // pprof 用于分析性能
    go func() {
        log.Println(http.ListenAndServe("0.0.0.0:6061", nil))
        logger.Info(http.ListenAndServe("0.0.0.0:6061", nil))
    }()
    go util.Init(initchan)
    fmt.Println("init ok !!!!, start sdk, task, camera init process ....", <-initchan)
    fmt.Println()
    fmt.Println()
    logger.Info("init ok !!!!, start sdk, task, camera init process ....", <-initchan)
    sdk.Init()        //  获取所有算法id ,建立 sdk 主题, 建立sdk server(send, recv 运行)
    tasktag.Init()   // 获取所有任务,建立任务标签, 在数据进入时, 打标签
    camera.Init()   //获取cid, taskid, sdkid ,关系
sdk/sdk.go
@@ -12,6 +12,7 @@
    "basic.com/pubsub/protomsg.git"
    "basic.com/valib/deliver.git"
    "github.com/long/test/logger"
)
const (
@@ -30,18 +31,18 @@
func Init() {
    fmt.Println("============= init sdk info =====================")
    logger.Info("============= init sdk info =====================")
    for _, sdkid := range util.Sdklist { // 创建sdk server
        CreatesdkTopicandServer(sdkid)
        fmt.Println()
        logger.Info()
    }
    // 手动输入的主题
    SdkMap["es"] = make(chan protomsg.SdkMessage)
    fmt.Println("create es channel: ")
    logger.Info("create es channel: ")
    SdkMap["virtual-faceextract-sdk-pull"] = make(chan protomsg.SdkMessage)
    fmt.Println("create virtual-faceextract-sdk-pull")
    logger.Info("create virtual-faceextract-sdk-pull")
    Createwebserver("virtual-faceextract-sdk-pull")
@@ -52,13 +53,13 @@
func CreatesdkTopicandServer(sdkid string) {
    SdkMap[sdkid] = make(chan protomsg.SdkMessage)
    fmt.Println("create sdk channel:  ", sdkid)
    logger.Info("create sdk channel:  ", sdkid)
    url := fmt.Sprintf("ipc:///tmp/%s%s", sdkid, postPull)
    socketser, err := NewSdkSocketListen(deliver.PushPull, sdkid, url)
    if err != nil {
        delete(SdkMap, sdkid)
        fmt.Println(sdkid, "create server error!")
        logger.Error(sdkid, "create server error!")
        return
    }
@@ -69,7 +70,7 @@
    socketdial, err := NewSdkSocketListen(deliver.PushPull, sdkid, url)
    if err != nil {
        delete(SdkMap, sdkid)
        fmt.Println(sdkid, "create dial error!")
        logger.Error(sdkid, "create dial error!")
        return
    }
    go Recv(socketdial)
@@ -81,7 +82,7 @@
    url := fmt.Sprintf("ipc:///tmp/%s%s", webid, postPull)
    socketser, err := NewSdkSocketListen(deliver.PushPull, webid, url)
    if err != nil {
        fmt.Println(webid, "create server error!")
        logger.Error(webid, "create server error!")
        return
    }
    go Send(webid, socketser, SdkMap[webid])
@@ -90,11 +91,11 @@
func DeletesdkTopicandServer(sdkid string) {
    close(SdkMap[sdkid])
    delete(SdkMap, sdkid)
    fmt.Println("删除主题 sdk: ", sdkid)
    logger.Info("删除主题 sdk: ", sdkid)
    SocketManage[sdkid].Cancel()
    delete(SocketManage, sdkid)
    fmt.Println("删除server sdk: ", sdkid)
    logger.Info("删除server sdk: ", sdkid)
}
//单独处理   es 主题的情况
@@ -102,7 +103,7 @@
    for {
        select {
            case <-SdkMap["es"]:
                fmt.Println("es finanl sdk!")
                //logger.Info("es finanl sdk!")
        }
    }
}
@@ -111,7 +112,7 @@
func AutoDelSdk(sdkflag chan bool) {
    for _ = range sdkflag {
        fmt.Println("test autodelsdk")
        logger.Info("test autodelsdk")
        var oldSdk []string
        for key, _ := range SdkMap {
            oldSdk = append(oldSdk, key)
@@ -119,20 +120,19 @@
        util.Sdklist = append(util.Sdklist, "es")
        util.Sdklist = append(util.Sdklist, "virtual-faceextract-sdk-pull")
        sdkChanDel := util.Difference(oldSdk, util.Sdklist)
        fmt.Println(sdkChanDel)
        logger.Info(sdkChanDel)
        for key, op := range sdkChanDel {
            if op == "add" {
                CreatesdkTopicandServer(key)
            } else {
                DeletesdkTopicandServer(key)
                fmt.Println("删除主题 sdk: ", key)
                logger.Info("删除主题 sdk: ", key)
            }
        }
    }
}
//主题
//sdk数据 加工器
func SdkData(cid string,caddr string, taskid string, data []byte) protomsg.SdkMessage {
@@ -148,7 +148,6 @@
        sdkmsg.Tasklab = val.(*protomsg.TaskLabel)
        sdkmsg.Data = data
    }
    //sdkmsg.Tasklab = tasktag.TaskMapLab[taskid]
    return sdkmsg
}
@@ -159,18 +158,17 @@
    } else {
        sdksend = "es"
    }
    fmt.Printf("分发的主题是:%s 位置 %d/%d\n ", sdksend, int(sdkmsg.Tasklab.Index)+1, len(sdkmsg.Tasklab.Sdkinfos))
    logger.Debug("分发的主题: ", sdksend , "位置:", int(sdkmsg.Tasklab.Index)+1,"/",  len(sdkmsg.Tasklab.Sdkinfos))
    return
}
// create server
func NewSdkSocketListen(mode int, sdkid string, url string) (socket SocketContext, err error) {
    fmt.Println("url is: ", url)
    logger.Info("url is: ", url)
    ctx, cancel := context.WithCancel(context.Background())
    socket.Context = ctx
    socket.Cancel = cancel
    socket.Sock = deliver.NewServer(deliver.Mode(mode), url)
    if socket.Sock == nil {
@@ -182,7 +180,7 @@
}
func NewSdkSocketDial(mode int, sdkid string, url string) (sid string, socket SocketContext, err error) {
    fmt.Println("url is: ", url)
    logger.Info("url is: ", url)
    ctx, cancel := context.WithCancel(context.Background())
    socket.Context = ctx
@@ -203,17 +201,15 @@
    for {
        select {
        case <-socket.Context.Done():
            fmt.Println("socket close")
            logger.Info("socket close")
            return
        default:
            if msg, err := socket.Sock.Recv(); err != nil {
                //fmt.Printf("%s ", err)
                continue
            } else {
                err = proto.Unmarshal(msg, &repsdkmsg)
                fmt.Println("receive len: ", len(msg))
                if err != nil {
                    fmt.Println("unmarshal error: ", err)
                    logger.Error("unmarshal error: ", err)
                    continue
                }
                repsdkmsg.Tasklab.Index++
@@ -226,33 +222,27 @@
}
func Send(sdkid string, socket SocketContext, in chan protomsg.SdkMessage) {
//    var v *protomsg.SdkMessage
//    var ok bool
    for {
        select {
        case <-socket.Context.Done():
            fmt.Println("socket is close")
            logger.Info("socket is close")
            return
        case v, ok := <-in:
            if ok {
                data, err :=v.Marshal()
                if err != nil {
                    fmt.Println("proto marshal error ", err)
                    logger.Error("proto marshal error ", err)
                    continue
                }
                
                fmt.Printf("从管道sdkid=%s 接受数据 %d\n", sdkid, len(data))
                fmt.Println()
                if err := socket.Sock.Send(data); err != nil {
                    fmt.Println(socket.Sock)
                    fmt.Println("failed send")
                    logger.Error("failed send")
                    continue
                }
                fmt.Printf("sdkid = %s ,send success:%d \n", sdkid, len(data))
                logger.Debug(sdkid, " send success: ", len(data))
            } else {
                fmt.Println(sdkid, " 主题关闭, 关闭send()")
                logger.Debug(sdkid, " 主题关闭, 关闭send()")
                return
            }
        }
tasktag/tasktag.go
@@ -1,23 +1,23 @@
package tasktag
import (
    "fmt"
    "sync"
    "basic.com/pubsub/protomsg.git"
    "github.com/long/test/util"
    "github.com/long/test/logger"
)
var TaskMapLab sync.Map
func Init() {
    fmt.Println("============= init tasktag info =====================")
    logger.Info("============= init tasktag info =====================")
    GenTaskMap()
    go func(taskflag chan bool) {
        for _ = range taskflag {
            GenTaskMap()
            fmt.Println("update task finished!")
            logger.Info("update task finished!")
        }
    }(util.TaskSdkflag)
@@ -43,7 +43,7 @@
    }
    GenTasklab(tls)
    TaskMapLab.Range(func(k, v interface{}) bool {
        fmt.Println(k, v)
        logger.Info(k, v)
        return true
    })
}
test
Binary files differ
util/sqlite.go
@@ -1,14 +1,13 @@
package util
import (
    "fmt"
    "flag"
    "basic.com/pubsub/protomsg.git"
    "basic.com/dbapi.git"
    "github.com/gogo/protobuf/proto"
    "basic.com/valib/gopherdiscovery.git"
    "os"
    "github.com/long/test/logger"
)
/*************************
@@ -32,7 +31,7 @@
var Sdkinfos []protomsg.Sdk
var urlServer = flag.String("urlServer","tcp://127.0.0.1:40007","heartbeat address of url server")
var urlPubSub = flag.String("urlPubsub","tcp://127.0.0.1:5007", "heartbeat pubsub address of url server")
var urlPubSub = flag.String("urlPubsub","tcp://127.0.0.1:50007", "heartbeat pubsub address of url server")
var dbip = flag.String("dbip","127.0.0.1","address of database ip")
var dbport = flag.Int("dbport", 8001, "port of database port")
@@ -48,9 +47,9 @@
func processinit(initchan chan bool) {
    CameraIds = camval.FindAll()
    fmt.Println("==============camera camera with task ================")
    logger.Info("==============camera camera with task ================")
    CameraTasks = camval.FindAllCameraAndTask()
    fmt.Println(CameraTasks)
    logger.Info(CameraTasks)
    TaskSdks = taskapi.FindAllTaskSdkRun()
@@ -61,55 +60,50 @@
func Getdata(opt []byte) {
    if err := proto.Unmarshal(opt, newsdkmsg); err != nil {
        fmt.Println("publichshMessage ", err)
        logger.Error("publichshMessage ", err)
        return
    }
    switch newsdkmsg.Table {
    case protomsg.TableChanged_T_Camera:
        fmt.Println("update camera")
        os.Exit(1)
        logger.Info("update camera")
        CameraIds = camval.FindAll()
        Cameraflag <- true
        logger.Info("update camera finish.")
    case protomsg.TableChanged_T_CameraTask:
        fmt.Println("update cameratask")
        os.Exit(1)
        logger.Info("update cameratask")
        CameraTasks = camval.FindAllCameraAndTask()
        fmt.Println("update cameratask finished!")
        logger.Info("update cameratask finished!")
    case protomsg.TableChanged_T_TaskSdk:
        fmt.Println("update tasksdk")
        os.Exit(1)
        logger.Info("update tasksdk")
        TaskSdks = taskapi.FindAllTaskSdkRun()
        TaskSdkflag <- true
    case protomsg.TableChanged_T_Sdk:
        fmt.Println("update sdk")
        os.Exit(1)
        logger.Info("update sdk")
        Sdklist = sdkapi.GetAllSdkIds()
        Sdkinfos = sdkapi.FindAll("")
        Sdkflag <- true
        fmt.Println("update finished!")
    default:
        fmt.Println("unknow type operation")
        os.Exit(1)
        logger.Info("unknow type operation")
    }
    fmt.Println(newsdkmsg)
    logger.Info(newsdkmsg)
}
func Init(initchan chan bool) {
    dbapi.Init(*dbip, *dbport)
    clientOne, _ := gopherdiscovery.ClientWithSub(*urlServer, *urlPubSub, "ip:local")
    recvinit := clientOne.HeartBeatMsg()
    fmt.Println(<-recvinit)
    _ = <-recvinit
    processinit(initchan)
    peers, _ := clientOne.Peers()
    for x := range peers {
        Getdata(x)
        os.Exit(1)
    }
}
util/util.go
@@ -2,7 +2,7 @@
import(
        "github.com/pierrec/lz4"
        "fmt"
        "github.com/long/test/logger"
       )
@@ -44,7 +44,7 @@
    out := make([]byte, 10*len(in))
         n, err := lz4.UncompressBlock(in, out)
         if err != nil {
             fmt.Println("uncompress error: ", err)
             logger.Error("uncompress error: ", err)
             return nil, err
         }
     out = out[:n] // uncompressed data
@@ -57,11 +57,11 @@
         ht := make([]int, 64<<10) // buffer for the compression table
         n, err := lz4.CompressBlock(in, out, ht)
         if err != nil {
             fmt.Println("compress: ", err)
             logger.Error("compress: ", err)
                 return nil, err
         }
     if n >= len(in) {
         fmt.Println("image is not compressible")
         logger.Error("image is not compressible")
     }
     out = out[:n] // compressed data
         return out, nil