task with zhangmeng ok release 2
| | |
| | | import ( |
| | | "errors" |
| | | |
| | | "basic.com/dbapi.git" |
| | | "basic.com/valib/deliver.git" |
| | | |
| | | "github.com/long/test/sdk" |
| | | "github.com/long/test/util" |
| | | "basic.com/pubsub/protomsg.git" |
| | | |
| | | "context" |
| | | "fmt" |
| | | "sync" |
| | | //"time" |
| | | // "time" |
| | | ) |
| | | |
| | | //var SocketManage = make(map[string]SocketContext) |
| | | var SocketManage sync.Map |
| | | |
| | | var Initchannel = make(chan string) |
| | | var Initchannel = make(chan protomsg.Camera ) |
| | | |
| | | type SocketContext struct { | // SocketContext is the type that entries loaded from SocketManage are asserted to.
| | | Sock deliver.Deliver | // the deliver socket held by this entry
| | | Context context.Context | // context kept alongside the socket
| | | Cancel context.CancelFunc | // cancel function kept with Context; presumably cancels it (confirm where the entry is created)
| | | } |
| | | |
| | | var camval dbapi.CameraApi |
| | | |
| | | func Init() { |
| | | |
| | |
| | | go AutoDelCamera(util.Cameraflag) |
| | | |
| | | for _, cam := range util.CameraIds { |
| | | Initchannel <- cam.Id |
| | | Initchannel <- cam |
| | | } |
| | | } |
| | | |
| | | func CreateCamera(camera chan string) { |
| | | for camid := range camera { |
| | | func CreateCamera(camera chan protomsg.Camera) { |
| | | for cam := range camera { |
| | | camid := cam.Id |
| | | caddr := cam.Addr |
| | | if _, ok := SocketManage.Load(camid); !ok { |
| | | url := fmt.Sprintf("ipc:///tmp/%s.ipc", camid) |
| | | |
| | |
| | | continue |
| | | } |
| | | |
| | | go func(cid string, sock SocketContext) { |
| | | Recv(cid, sock) |
| | | }(id, socketlisten) |
| | | go func(cid string, cameraaddr string, sock SocketContext) { |
| | | Recv(cid, cameraaddr, sock) |
| | | }(id, caddr, socketlisten) |
| | | } |
| | | } |
| | | } |
| | |
| | | |
| | | for key, op := range cameraChanDel { |
| | | if op == "add" { |
| | | Initchannel <- key |
| | | for _, value := range util.CameraIds { |
| | | if key == value.Id{ |
| | | Initchannel <- value |
| | | } |
| | | } |
| | | } else { |
| | | if sock, ok := SocketManage.Load(key); ok { |
| | | if socket, sok := sock.(SocketContext); sok { |
| | |
| | | return cameraid, socket, nil |
| | | } |
| | | |
| | | func Recv(cameraid string, socket SocketContext) { |
| | | func Recv(cameraid string, caddr string , socket SocketContext) { |
| | | var msg []byte |
| | | var err error |
| | | for { |
| | |
| | | fmt.Println() |
| | | fmt.Println("============== one msg input ==========") |
| | | for _, taskid := range GetAlltask(cameraid) { |
| | | //time.Sleep(5 * time.Second) |
| | | // time.Sleep(5 * time.Second) |
| | | fmt.Println("cameraid: ", cameraid, " taskid: ", taskid) |
| | | Taskdolist(cameraid, taskid, msg) |
| | | Taskdolist(cameraid, caddr, taskid, msg) |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | // 根据cid 获取 所有的任务 |
| | | // Get all tasks for the given cid. |
| | | func GetAlltask(cid string) (tasks []string) { |
| | | for _, camsingle := range util.CameraTasks { |
| | | if cid == camsingle.Camera.Id { |
| | |
| | | return |
| | | } |
| | | |
| | | func Taskdolist(cid string, taskid string, data []byte) { |
| | | func Taskdolist(cid string, caddr string, taskid string, data []byte) { |
| | | |
| | | // 数据加工(打标签) |
| | | sdkmsg := sdk.SdkData(cid, taskid, data) |
| | | sdkmsg := sdk.SdkData(cid, caddr, taskid, data) |
| | | if sdkmsg.Tasklab == nil { |
| | | fmt.Printf("cid:%s 没有任务%s\n", cid, taskid) |
| | | return |
| | |
| | | // 计算分发的主题 |
| | | SendTopic := sdk.SdkSendTopic(sdkmsg) |
| | | if _, ok := sdk.SdkMap[SendTopic]; ok { |
| | | fmt.Println("分发的主题存在") |
| | | sdk.SdkMap[SendTopic] <- sdkmsg |
| | | //fmt.Println("重新开始循环: ", sdk.SdkMap) |
| | | } else { |
| | | fmt.Println("分发的主题不存在") |
| | | } |
| | |
| | | go 1.12 |
| | | |
| | | require ( |
| | | basic.com/dbapi.git v0.0.0-20190531051326-e49473afa5e9 |
| | | basic.com/pubsub/protomsg.git v0.0.0-20190530084829-0ea842491a96 |
| | | basic.com/dbapi.git v0.0.0-20190620073851-30a7d0574bbc |
| | | basic.com/pubsub/protomsg.git v0.0.0-20190621090107-c5cf390e19bb |
| | | basic.com/valib/deliver.git v0.0.0-20190529080650-3e64847c9bce |
| | | github.com/ajg/form v1.5.1 // indirect |
| | | github.com/gogo/protobuf v1.2.1 |
| | | github.com/golang/protobuf v1.3.1 |
| | | github.com/gorilla/websocket v1.4.0 // indirect |
| | |
| | | basic.com/dbapi.git v0.0.0-20190531051326-e49473afa5e9 h1:1HIG2sEEYVUKL7nyJvURj/p24JpDwFwKE/XtatPsXVQ= |
| | | basic.com/dbapi.git v0.0.0-20190531051326-e49473afa5e9/go.mod h1:eDXPnxaz6jZPDvBSk7ya7oSASWPCuUEgRTJCjsfKt/Q= |
| | | basic.com/pubsub/protomsg.git v0.0.0-20190530084829-0ea842491a96 h1:7nkipxWDbIK4wRbLCZeUoGxzdEIxZFumSTM6xhfWiWM= |
| | | basic.com/pubsub/protomsg.git v0.0.0-20190530084829-0ea842491a96/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU= |
| | | basic.com/dbapi.git v0.0.0-20190620073851-30a7d0574bbc h1:Dd4aSg+S8E1Kj49kTtHkRZpTeIJiM521i8wCkWMAmOg= |
| | | basic.com/dbapi.git v0.0.0-20190620073851-30a7d0574bbc/go.mod h1:eDXPnxaz6jZPDvBSk7ya7oSASWPCuUEgRTJCjsfKt/Q= |
| | | basic.com/pubsub/protomsg.git v0.0.0-20190621090107-c5cf390e19bb h1:TCo2Oo2ZqHn+/WvfVCyQFSKHHps9gJZkC0D8MtTVcRs= |
| | | basic.com/pubsub/protomsg.git v0.0.0-20190621090107-c5cf390e19bb/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU= |
| | | basic.com/valib/deliver.git v0.0.0-20190529080650-3e64847c9bce h1:/D6k+FVN1sMqLz6tMlsIl9bKwE2Mpc5d4QfPh0y4DSQ= |
| | | basic.com/valib/deliver.git v0.0.0-20190529080650-3e64847c9bce/go.mod h1:bkYiTUGzckyNOjAgn9rB/DOjFzwoSHJlruuWQ6hu6IY= |
| | | github.com/ajg/form v1.5.1 h1:t9c7v8JUKu/XxOGBU0yjNpaMloxGEJhUkqFRq0ibGeU= |
| | | github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY= |
| | | github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= |
| | | github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= |
| | | github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= |
| | |
| | | "context" |
| | | "errors" |
| | | "fmt" |
| | | "os" |
| | | |
| | | // "github.com/long/test/httpclient" |
| | | "github.com/long/test/tasktag" |
| | |
| | | ) |
| | | |
| | | var SocketManage = make(map[string]SocketContext) |
| | | var SdkMap = make(map[string]chan *protomsg.SdkMessage) |
| | | var SdkMap = make(map[string]chan protomsg.SdkMessage) |
| | | |
| | | type SocketContext struct { |
| | | Sock deliver.Deliver |
| | |
| | | fmt.Println() |
| | | } |
| | | |
| | | SdkMap["es"] = make(chan *protomsg.SdkMessage) |
| | | SdkMap["es"] = make(chan protomsg.SdkMessage) |
| | | fmt.Println("create es channel: ") |
| | | go es(SdkMap["es"]) |
| | | go AutoDelSdk(util.Sdkflag) |
| | | } |
| | | |
| | | func CreatesdkTopicandServer(sdkid string) { |
| | | SdkMap[sdkid] = make(chan *protomsg.SdkMessage) |
| | | SdkMap[sdkid] = make(chan protomsg.SdkMessage) |
| | | fmt.Println("create sdk channel: ", sdkid) |
| | | |
| | | url := fmt.Sprintf("ipc:///tmp/%s%s", sdkid, postPull) |
| | |
| | | } |
| | | |
| | | //单独处理 es 主题的情况 |
| | | func es(sdkmsgchan chan *protomsg.SdkMessage) { |
| | | func es(sdkmsgchan chan protomsg.SdkMessage) { |
| | | for _ = range sdkmsgchan { |
| | | fmt.Println("this data is finish all sdk! ") |
| | | } |
| | |
| | | //主题 |
| | | |
| | | //sdk数据 加工器 |
| | | func SdkData(cid string, taskid string, data []byte) *protomsg.SdkMessage { |
| | | var sdkmsg = &protomsg.SdkMessage{} |
| | | func SdkData(cid string,caddr string, taskid string, data []byte) protomsg.SdkMessage { |
| | | var sdkmsg = protomsg.SdkMessage{} |
| | | sdkmsg.Cid = cid |
| | | //if _, ok := tasktag.TaskMapLab[taskid]; !ok { |
| | | // sdkmsg.Tasklab = nil |
| | | // return sdkmsg |
| | | //} |
| | | |
| | | sdkmsg.Caddr =caddr |
| | | if val, ok := tasktag.TaskMapLab.Load(taskid); !ok { |
| | | sdkmsg.Tasklab = nil |
| | | return sdkmsg |
| | |
| | | } |
| | | |
| | | // SdkSendTopic chooses the topic for sdkmsg: the SDK id at position Tasklab.Index in the task label's SDK list, or "es" once Index is at or past the end of that list; it prints the chosen topic and returns it. |
| | | func SdkSendTopic(sdkmsg *protomsg.SdkMessage) (sdksend string) { |
| | | if int(sdkmsg.Tasklab.Index) < len(sdkmsg.Tasklab.Sdkids) { |
| | | sdksend = sdkmsg.Tasklab.Sdkids[sdkmsg.Tasklab.Index] |
| | | func SdkSendTopic(sdkmsg protomsg.SdkMessage) (sdksend string) { |
| | | if int(sdkmsg.Tasklab.Index) < len(sdkmsg.Tasklab.Sdkinfos) { |
| | | sdksend = sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Sdkid |
| | | } else { |
| | | sdksend = "es" |
| | | } |
| | | fmt.Println("分发的主题是: ", sdksend) |
| | | fmt.Printf("分发的主题是:%s 位置 %d/%d\n ", sdksend, int(sdkmsg.Tasklab.Index)+1, len(sdkmsg.Tasklab.Sdkinfos)) |
| | | return |
| | | } |
| | | |
| | |
| | | |
| | | func Recv(socket SocketContext) { |
| | | |
| | | var repsdkmsg = &protomsg.SdkMessage{} |
| | | var repsdkmsg = protomsg.SdkMessage{} |
| | | for { |
| | | select { |
| | | case <-socket.Context.Done(): |
| | |
| | | //fmt.Printf("%s ", err) |
| | | continue |
| | | } else { |
| | | err = proto.Unmarshal(msg, repsdkmsg) |
| | | err = proto.Unmarshal(msg, &repsdkmsg) |
| | | fmt.Println("receive len: ", len(msg)) |
| | | if err != nil { |
| | | fmt.Println("unmarshal error: ", err) |
| | | os.Exit(1) |
| | | continue |
| | | } |
| | | repsdkmsg.Tasklab.Index++ |
| | | //调用计算函数, 分发给下一个主题 |
| | | nexttopic := SdkSendTopic(repsdkmsg) |
| | | SdkMap[nexttopic] <- repsdkmsg |
| | |
| | | } |
| | | } |
| | | |
| | | func Send(sdkid string, socket SocketContext, in chan *protomsg.SdkMessage) { |
| | | var v *protomsg.SdkMessage |
| | | var ok bool |
| | | func Send(sdkid string, socket SocketContext, in chan protomsg.SdkMessage) { |
| | | // var v *protomsg.SdkMessage |
| | | // var ok bool |
| | | |
| | | for { |
| | | select { |
| | | case <-socket.Context.Done(): |
| | | fmt.Println("socket is close") |
| | | return |
| | | case v, ok = <-in: |
| | | case v, ok := <-in: |
| | | if ok { |
| | | data, err := proto.Marshal(v) |
| | | data, err :=v.Marshal() |
| | | if err != nil { |
| | | fmt.Println("proto marshal error ", err) |
| | | continue |
| | |
| | | "github.com/long/test/util" |
| | | ) |
| | | |
| | | //var TaskMapLab = make(map[string]*protomsg.TaskLabel) |
| | | var TaskMapLab sync.Map |
| | | |
| | | func Init() { |
| | |
| | | for _, taskSdk := range util.TaskSdks { |
| | | var tl protomsg.TaskLabel |
| | | tl.Taskid = taskSdk.Task.Taskid |
| | | tl.Taskname = taskSdk.Task.Taskname |
| | | for _, sdkinfo := range taskSdk.Sdks { |
| | | tl.Sdkids = append(tl.Sdkids, sdkinfo.Id) |
| | | sdkinfowithtask := new(protomsg.SdkmsgWithTask) |
| | | sdkinfowithtask.Sdkid = sdkinfo.Id |
| | | sdkinfowithtask.Sdktype = sdkinfo.SdkType |
| | | sdkinfowithtask.SdkName = sdkinfo.SdkName |
| | | sdkinfowithtask.Sdkdata = make([]byte, 1) |
| | | tl.Sdkinfos = append(tl.Sdkinfos, sdkinfowithtask) |
| | | } |
| | | tl.Index = int32(0) |
| | | tls = append(tls, tl) |
| | | } |
| | | |
| | | GenTasklab(tls) |
| | | |
| | | TaskMapLab.Range(func(k, v interface{}) bool { |
| | | fmt.Println(k, v) |
| | | return true |
| | | }) |
| | | |
| | | //for key, value := range TaskMapLab { |
| | | // fmt.Println() |
| | | // fmt.Println(key, value) |
| | | //} |
| | | } |
| | | |
| | | // 从sqlite 接口拿到所有的任务, 每一个任务都有自己的几个算法 |
| | | //以 taskid 作为key, 对应的算法组合作为 value |
| | | func GenTasklab(tasklab []protomsg.TaskLabel) { |
| | | // TaskMapLab = nil |
| | | // TaskMapLab = make(map[string]*protomsg.TaskLabel) |
| | | TaskMapLab.Range(func(key interface{}, value interface{}) bool { |
| | | TaskMapLab.Delete(key) |
| | | return true |
| | |
| | | for _, value := range tasklab { |
| | | pv := value |
| | | TaskMapLab.Store(value.Taskid, &pv) |
| | | //TaskMapLab[value.Taskid] = &pv |
| | | } |
| | | } |
| | |
| | | var CameraTasks []protomsg.CameraAndTaskInfo |
| | | |
| | | var TaskSdks []protomsg.TaskSdkInfo |
| | | |
| | | var Sdklist []string |
| | | var Sdkinfos []protomsg.Sdk |
| | | |
| | | var urlServer = "tcp://192.168.1.11:40007" |
| | | var urlPubSub = "tcp://192.168.1.11:50007" |
| | |
| | | TaskSdks = taskapi.FindAll() |
| | | |
| | | Sdklist = sdkapi.GetAllSdkIds() |
| | | Sdkinfos = sdkapi.FindAll("") |
| | | initchan <- true |
| | | } |
| | | |
| | |
| | | case protomsg.TableChanged_T_Sdk: |
| | | fmt.Println("update sdk") |
| | | Sdklist = sdkapi.GetAllSdkIds() |
| | | Sdkinfos = sdkapi.FindAll("") |
| | | Sdkflag <- true |
| | | fmt.Println("update finished!") |
| | | |
| | |
| | | |
| | | peers, _ := clientOne.Peers() |
| | | for x := range peers { |
| | | fmt.Println("client: ", x) |
| | | Getdata(x) |
| | | } |
| | | } |