554325746@qq.com
2019-06-26 5fdc5059555e7a4cf533b7bfdb79025f23fa2b6a
task with zhangmeng ok release 2
7个文件已修改
1个文件已删除
122 ■■■■ 已修改文件
camera/camera.go 42 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.mod 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.sum 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ls 补丁 | 查看 | 原始文档 | blame | 历史
sdk/sdk.go 43 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
tasktag/tasktag.go 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test 补丁 | 查看 | 原始文档 | blame | 历史
util/sqlite.go 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
camera/camera.go
@@ -3,30 +3,28 @@
import (
    "errors"
    "basic.com/dbapi.git"
    "basic.com/valib/deliver.git"
    "github.com/long/test/sdk"
    "github.com/long/test/util"
    "basic.com/pubsub/protomsg.git"
    "context"
    "fmt"
    "sync"
    //"time"
    // "time"
)
//var SocketManage = make(map[string]SocketContext)
var SocketManage sync.Map
var Initchannel = make(chan string)
var Initchannel = make(chan protomsg.Camera )
type SocketContext struct {
    Sock    deliver.Deliver
    Context context.Context
    Cancel  context.CancelFunc
}
var camval dbapi.CameraApi
func Init() {
@@ -40,12 +38,14 @@
    go AutoDelCamera(util.Cameraflag)
    for _, cam := range util.CameraIds {
        Initchannel <- cam.Id
        Initchannel <- cam
    }
}
func CreateCamera(camera chan string) {
    for camid := range camera {
func CreateCamera(camera chan protomsg.Camera) {
    for cam := range camera {
        camid := cam.Id
        caddr := cam.Addr
        if _, ok := SocketManage.Load(camid); !ok {
            url := fmt.Sprintf("ipc:///tmp/%s.ipc", camid)
@@ -55,9 +55,9 @@
                continue
            }
            go func(cid string, sock SocketContext) {
                Recv(cid, sock)
            }(id, socketlisten)
            go func(cid string, cameraaddr string,  sock SocketContext) {
                Recv(cid, cameraaddr, sock)
            }(id, caddr, socketlisten)
        }
    }
}
@@ -87,7 +87,11 @@
        for key, op := range cameraChanDel {
            if op == "add" {
                Initchannel <- key
                for _, value := range util.CameraIds {
                    if key == value.Id{
                        Initchannel <- value
                    }
                }
            } else {
                if sock, ok := SocketManage.Load(key); ok {
                    if socket, sok := sock.(SocketContext); sok {
@@ -118,7 +122,7 @@
    return cameraid, socket, nil
}
func Recv(cameraid string, socket SocketContext) {
func Recv(cameraid string, caddr string , socket SocketContext) {
    var msg []byte
    var err error
    for {
@@ -134,16 +138,16 @@
                fmt.Println()
                fmt.Println("============== one msg input ==========")
                for _, taskid := range GetAlltask(cameraid) {
                    //time.Sleep(5 * time.Second)
                    // time.Sleep(5 * time.Second)
                    fmt.Println("cameraid: ", cameraid, " taskid: ", taskid)
                    Taskdolist(cameraid, taskid, msg)
                     Taskdolist(cameraid, caddr, taskid, msg)
                }
            }
        }
    }
}
//   根据cid 获取 所有的任务
//   据cid 获取 所有的任务
func GetAlltask(cid string) (tasks []string) {
    for _, camsingle := range util.CameraTasks {
        if cid == camsingle.Camera.Id {
@@ -156,10 +160,10 @@
    return
}
func Taskdolist(cid string, taskid string, data []byte) {
func Taskdolist(cid string, caddr string,  taskid string, data []byte) {
    //  数据加工(打标签)
    sdkmsg := sdk.SdkData(cid, taskid, data)
    sdkmsg := sdk.SdkData(cid, caddr, taskid, data)
    if sdkmsg.Tasklab == nil {
        fmt.Printf("cid:%s 没有任务%s\n", cid, taskid)
        return
@@ -168,9 +172,7 @@
    //  计算分发的主题
    SendTopic := sdk.SdkSendTopic(sdkmsg)
    if _, ok := sdk.SdkMap[SendTopic]; ok {
        fmt.Println("分发的主题存在")
        sdk.SdkMap[SendTopic] <- sdkmsg
        //fmt.Println("重新开始循环: ", sdk.SdkMap)
    } else {
        fmt.Println("分发的主题不存在")
    }
go.mod
@@ -3,9 +3,10 @@
go 1.12
require (
    basic.com/dbapi.git v0.0.0-20190531051326-e49473afa5e9
    basic.com/pubsub/protomsg.git v0.0.0-20190530084829-0ea842491a96
    basic.com/dbapi.git v0.0.0-20190620073851-30a7d0574bbc
    basic.com/pubsub/protomsg.git v0.0.0-20190621090107-c5cf390e19bb
    basic.com/valib/deliver.git v0.0.0-20190529080650-3e64847c9bce
    github.com/ajg/form v1.5.1 // indirect
    github.com/gogo/protobuf v1.2.1
    github.com/golang/protobuf v1.3.1
    github.com/gorilla/websocket v1.4.0 // indirect
go.sum
@@ -1,9 +1,13 @@
basic.com/dbapi.git v0.0.0-20190531051326-e49473afa5e9 h1:1HIG2sEEYVUKL7nyJvURj/p24JpDwFwKE/XtatPsXVQ=
basic.com/dbapi.git v0.0.0-20190531051326-e49473afa5e9/go.mod h1:eDXPnxaz6jZPDvBSk7ya7oSASWPCuUEgRTJCjsfKt/Q=
basic.com/pubsub/protomsg.git v0.0.0-20190530084829-0ea842491a96 h1:7nkipxWDbIK4wRbLCZeUoGxzdEIxZFumSTM6xhfWiWM=
basic.com/pubsub/protomsg.git v0.0.0-20190530084829-0ea842491a96/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU=
basic.com/dbapi.git v0.0.0-20190620073851-30a7d0574bbc h1:Dd4aSg+S8E1Kj49kTtHkRZpTeIJiM521i8wCkWMAmOg=
basic.com/dbapi.git v0.0.0-20190620073851-30a7d0574bbc/go.mod h1:eDXPnxaz6jZPDvBSk7ya7oSASWPCuUEgRTJCjsfKt/Q=
basic.com/pubsub/protomsg.git v0.0.0-20190621090107-c5cf390e19bb h1:TCo2Oo2ZqHn+/WvfVCyQFSKHHps9gJZkC0D8MtTVcRs=
basic.com/pubsub/protomsg.git v0.0.0-20190621090107-c5cf390e19bb/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU=
basic.com/valib/deliver.git v0.0.0-20190529080650-3e64847c9bce h1:/D6k+FVN1sMqLz6tMlsIl9bKwE2Mpc5d4QfPh0y4DSQ=
basic.com/valib/deliver.git v0.0.0-20190529080650-3e64847c9bce/go.mod h1:bkYiTUGzckyNOjAgn9rB/DOjFzwoSHJlruuWQ6hu6IY=
github.com/ajg/form v1.5.1 h1:t9c7v8JUKu/XxOGBU0yjNpaMloxGEJhUkqFRq0ibGeU=
github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY=
github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
ls
sdk/sdk.go
@@ -4,6 +4,7 @@
    "context"
    "errors"
    "fmt"
    "os"
    //    "github.com/long/test/httpclient"
    "github.com/long/test/tasktag"
@@ -21,7 +22,7 @@
)
var SocketManage = make(map[string]SocketContext)
var SdkMap = make(map[string]chan *protomsg.SdkMessage)
var SdkMap = make(map[string]chan protomsg.SdkMessage)
type SocketContext struct {
    Sock    deliver.Deliver
@@ -37,14 +38,14 @@
        fmt.Println()
    }
    SdkMap["es"] = make(chan *protomsg.SdkMessage)
    SdkMap["es"] = make(chan protomsg.SdkMessage)
    fmt.Println("create es channel:  ")
    go es(SdkMap["es"])
    go AutoDelSdk(util.Sdkflag)
}
func CreatesdkTopicandServer(sdkid string) {
    SdkMap[sdkid] = make(chan *protomsg.SdkMessage)
    SdkMap[sdkid] = make(chan protomsg.SdkMessage)
    fmt.Println("create sdk channel:  ", sdkid)
    url := fmt.Sprintf("ipc:///tmp/%s%s", sdkid, postPull)
@@ -77,7 +78,7 @@
}
//单独处理   es 主题的情况
func es(sdkmsgchan chan *protomsg.SdkMessage) {
func es(sdkmsgchan chan protomsg.SdkMessage) {
    for _ = range sdkmsgchan {
        fmt.Println("this data is finish all sdk! ")
    }
@@ -110,14 +111,10 @@
//主题
//sdk数据 加工器
func SdkData(cid string, taskid string, data []byte) *protomsg.SdkMessage {
    var sdkmsg = &protomsg.SdkMessage{}
func SdkData(cid string,caddr string, taskid string, data []byte) protomsg.SdkMessage {
    var sdkmsg = protomsg.SdkMessage{}
    sdkmsg.Cid = cid
    //if _, ok := tasktag.TaskMapLab[taskid]; !ok {
    //    sdkmsg.Tasklab = nil
    //    return sdkmsg
    //}
    sdkmsg.Caddr =caddr
    if val, ok := tasktag.TaskMapLab.Load(taskid); !ok {
        sdkmsg.Tasklab = nil
        return sdkmsg
@@ -132,13 +129,13 @@
}
//sdk数据分发器
func SdkSendTopic(sdkmsg *protomsg.SdkMessage) (sdksend string) {
    if int(sdkmsg.Tasklab.Index) < len(sdkmsg.Tasklab.Sdkids) {
        sdksend = sdkmsg.Tasklab.Sdkids[sdkmsg.Tasklab.Index]
func SdkSendTopic(sdkmsg protomsg.SdkMessage) (sdksend string) {
    if int(sdkmsg.Tasklab.Index) < len(sdkmsg.Tasklab.Sdkinfos) {
        sdksend = sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Sdkid
    } else {
        sdksend = "es"
    }
    fmt.Println("分发的主题是: ", sdksend)
    fmt.Printf("分发的主题是:%s 位置 %d/%d\n ", sdksend, int(sdkmsg.Tasklab.Index)+1, len(sdkmsg.Tasklab.Sdkinfos))
    return
}
@@ -179,7 +176,7 @@
func Recv(socket SocketContext) {
    var repsdkmsg = &protomsg.SdkMessage{}
    var repsdkmsg = protomsg.SdkMessage{}
    for {
        select {
        case <-socket.Context.Done():
@@ -190,12 +187,14 @@
                //fmt.Printf("%s ", err)
                continue
            } else {
                err = proto.Unmarshal(msg, repsdkmsg)
                err = proto.Unmarshal(msg, &repsdkmsg)
                fmt.Println("receive len: ", len(msg))
                if err != nil {
                    fmt.Println("unmarshal error: ", err)
                    os.Exit(1)
                    continue
                }
                repsdkmsg.Tasklab.Index++
                //调用计算函数, 分发给下一个主题
                nexttopic := SdkSendTopic(repsdkmsg)
                SdkMap[nexttopic] <- repsdkmsg
@@ -204,18 +203,18 @@
    }
}
func Send(sdkid string, socket SocketContext, in chan *protomsg.SdkMessage) {
    var v *protomsg.SdkMessage
    var ok bool
func Send(sdkid string, socket SocketContext, in chan protomsg.SdkMessage) {
//    var v *protomsg.SdkMessage
//    var ok bool
    for {
        select {
        case <-socket.Context.Done():
            fmt.Println("socket is close")
            return
        case v, ok = <-in:
        case v, ok := <-in:
            if ok {
                data, err := proto.Marshal(v)
                data, err :=v.Marshal()
                if err != nil {
                    fmt.Println("proto marshal error ", err)
                    continue
tasktag/tasktag.go
@@ -8,7 +8,6 @@
    "github.com/long/test/util"
)
//var TaskMapLab = make(map[string]*protomsg.TaskLabel)
var TaskMapLab sync.Map
func Init() {
@@ -29,31 +28,28 @@
    for _, taskSdk := range util.TaskSdks {
        var tl protomsg.TaskLabel
        tl.Taskid = taskSdk.Task.Taskid
        tl.Taskname = taskSdk.Task.Taskname
        for _, sdkinfo := range taskSdk.Sdks {
            tl.Sdkids = append(tl.Sdkids, sdkinfo.Id)
            sdkinfowithtask := new(protomsg.SdkmsgWithTask)
            sdkinfowithtask.Sdkid = sdkinfo.Id
            sdkinfowithtask.Sdktype = sdkinfo.SdkType
            sdkinfowithtask.SdkName = sdkinfo.SdkName
            sdkinfowithtask.Sdkdata = make([]byte, 1)
            tl.Sdkinfos = append(tl.Sdkinfos, sdkinfowithtask)
        }
        tl.Index = int32(0)
        tls = append(tls, tl)
    }
    GenTasklab(tls)
    TaskMapLab.Range(func(k, v interface{}) bool {
        fmt.Println(k, v)
        return true
    })
    //for key, value := range TaskMapLab {
    //    fmt.Println()
    //    fmt.Println(key, value)
    //}
}
// 从sqlite 接口拿到所有的任务, 每一个任务都有自己的几个算法
//以 taskid 作为key, 对应的算法组合作为 value
func GenTasklab(tasklab []protomsg.TaskLabel) {
    // TaskMapLab = nil
    // TaskMapLab = make(map[string]*protomsg.TaskLabel)
    TaskMapLab.Range(func(key interface{}, value interface{}) bool {
        TaskMapLab.Delete(key)
        return true
@@ -62,6 +58,5 @@
    for _, value := range tasklab {
        pv := value
        TaskMapLab.Store(value.Taskid, &pv)
        //TaskMapLab[value.Taskid] = &pv
    }
}
test
Binary files differ
util/sqlite.go
@@ -24,7 +24,9 @@
var CameraTasks []protomsg.CameraAndTaskInfo
var TaskSdks []protomsg.TaskSdkInfo
var Sdklist []string
var Sdkinfos []protomsg.Sdk
var urlServer = "tcp://192.168.1.11:40007"
var urlPubSub = "tcp://192.168.1.11:50007"
@@ -48,6 +50,7 @@
    TaskSdks = taskapi.FindAll()
    Sdklist = sdkapi.GetAllSdkIds()
    Sdkinfos = sdkapi.FindAll("")
    initchan <- true
}
@@ -76,6 +79,7 @@
    case protomsg.TableChanged_T_Sdk:
        fmt.Println("update sdk")
        Sdklist = sdkapi.GetAllSdkIds()
        Sdkinfos = sdkapi.FindAll("")
        Sdkflag <- true
        fmt.Println("update finished!")
@@ -93,7 +97,6 @@
    peers, _ := clientOne.Peers()
    for x := range peers {
        fmt.Println("client: ", x)
        Getdata(x)
    }
}