From 5fdc5059555e7a4cf533b7bfdb79025f23fa2b6a Mon Sep 17 00:00:00 2001
From: 554325746@qq.com <554325746@qq.com>
Date: 星期三, 26 六月 2019 14:53:46 +0800
Subject: [PATCH] task with zhangmeng ok release 2
---
/dev/null | 0
go.sum | 8 ++
test | 0
tasktag/tasktag.go | 19 ++----
go.mod | 5 +
sdk/sdk.go | 43 +++++++-------
util/sqlite.go | 5 +
camera/camera.go | 42 +++++++------
8 files changed, 63 insertions(+), 59 deletions(-)
diff --git a/camera/camera.go b/camera/camera.go
index 6432434..b24b337 100644
--- a/camera/camera.go
+++ b/camera/camera.go
@@ -3,30 +3,28 @@
import (
"errors"
- "basic.com/dbapi.git"
"basic.com/valib/deliver.git"
"github.com/long/test/sdk"
"github.com/long/test/util"
+ "basic.com/pubsub/protomsg.git"
"context"
"fmt"
"sync"
- //"time"
+ // "time"
)
//var SocketManage = make(map[string]SocketContext)
var SocketManage sync.Map
-var Initchannel = make(chan string)
+var Initchannel = make(chan protomsg.Camera )
type SocketContext struct {
Sock deliver.Deliver
Context context.Context
Cancel context.CancelFunc
}
-
-var camval dbapi.CameraApi
func Init() {
@@ -40,12 +38,14 @@
go AutoDelCamera(util.Cameraflag)
for _, cam := range util.CameraIds {
- Initchannel <- cam.Id
+ Initchannel <- cam
}
}
-func CreateCamera(camera chan string) {
- for camid := range camera {
+func CreateCamera(camera chan protomsg.Camera) {
+ for cam := range camera {
+ camid := cam.Id
+ caddr := cam.Addr
if _, ok := SocketManage.Load(camid); !ok {
url := fmt.Sprintf("ipc:///tmp/%s.ipc", camid)
@@ -55,9 +55,9 @@
continue
}
- go func(cid string, sock SocketContext) {
- Recv(cid, sock)
- }(id, socketlisten)
+ go func(cid string, cameraaddr string, sock SocketContext) {
+ Recv(cid, cameraaddr, sock)
+ }(id, caddr, socketlisten)
}
}
}
@@ -87,7 +87,11 @@
for key, op := range cameraChanDel {
if op == "add" {
- Initchannel <- key
+ for _, value := range util.CameraIds {
+ if key == value.Id{
+ Initchannel <- value
+ }
+ }
} else {
if sock, ok := SocketManage.Load(key); ok {
if socket, sok := sock.(SocketContext); sok {
@@ -118,7 +122,7 @@
return cameraid, socket, nil
}
-func Recv(cameraid string, socket SocketContext) {
+func Recv(cameraid string, caddr string , socket SocketContext) {
var msg []byte
var err error
for {
@@ -134,16 +138,16 @@
fmt.Println()
fmt.Println("============== one msg input ==========")
for _, taskid := range GetAlltask(cameraid) {
- //time.Sleep(5 * time.Second)
+ // time.Sleep(5 * time.Second)
fmt.Println("cameraid: ", cameraid, " taskid: ", taskid)
- Taskdolist(cameraid, taskid, msg)
+ Taskdolist(cameraid, caddr, taskid, msg)
}
}
}
}
}
-// 鏍规嵁cid 鑾峰彇 鎵�鏈夌殑浠诲姟
+// 鎹甤id 鑾峰彇 鎵�鏈夌殑浠诲姟
func GetAlltask(cid string) (tasks []string) {
for _, camsingle := range util.CameraTasks {
if cid == camsingle.Camera.Id {
@@ -156,10 +160,10 @@
return
}
-func Taskdolist(cid string, taskid string, data []byte) {
+func Taskdolist(cid string, caddr string, taskid string, data []byte) {
// 鏁版嵁鍔犲伐(鎵撴爣绛�)
- sdkmsg := sdk.SdkData(cid, taskid, data)
+ sdkmsg := sdk.SdkData(cid, caddr, taskid, data)
if sdkmsg.Tasklab == nil {
fmt.Printf("cid:%s 娌℃湁浠诲姟%s\n", cid, taskid)
return
@@ -168,9 +172,7 @@
// 璁$畻鍒嗗彂鐨勪富棰�
SendTopic := sdk.SdkSendTopic(sdkmsg)
if _, ok := sdk.SdkMap[SendTopic]; ok {
- fmt.Println("鍒嗗彂鐨勪富棰樺瓨鍦�")
sdk.SdkMap[SendTopic] <- sdkmsg
- //fmt.Println("閲嶆柊寮�濮嬪惊鐜�: ", sdk.SdkMap)
} else {
fmt.Println("鍒嗗彂鐨勪富棰樹笉瀛樺湪")
}
diff --git a/go.mod b/go.mod
index 9051b1b..42e398d 100644
--- a/go.mod
+++ b/go.mod
@@ -3,9 +3,10 @@
go 1.12
require (
- basic.com/dbapi.git v0.0.0-20190531051326-e49473afa5e9
- basic.com/pubsub/protomsg.git v0.0.0-20190530084829-0ea842491a96
+ basic.com/dbapi.git v0.0.0-20190620073851-30a7d0574bbc
+ basic.com/pubsub/protomsg.git v0.0.0-20190621090107-c5cf390e19bb
basic.com/valib/deliver.git v0.0.0-20190529080650-3e64847c9bce
+ github.com/ajg/form v1.5.1 // indirect
github.com/gogo/protobuf v1.2.1
github.com/golang/protobuf v1.3.1
github.com/gorilla/websocket v1.4.0 // indirect
diff --git a/go.sum b/go.sum
index aa563a6..4a4fa0d 100644
--- a/go.sum
+++ b/go.sum
@@ -1,9 +1,13 @@
basic.com/dbapi.git v0.0.0-20190531051326-e49473afa5e9 h1:1HIG2sEEYVUKL7nyJvURj/p24JpDwFwKE/XtatPsXVQ=
basic.com/dbapi.git v0.0.0-20190531051326-e49473afa5e9/go.mod h1:eDXPnxaz6jZPDvBSk7ya7oSASWPCuUEgRTJCjsfKt/Q=
-basic.com/pubsub/protomsg.git v0.0.0-20190530084829-0ea842491a96 h1:7nkipxWDbIK4wRbLCZeUoGxzdEIxZFumSTM6xhfWiWM=
-basic.com/pubsub/protomsg.git v0.0.0-20190530084829-0ea842491a96/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU=
+basic.com/dbapi.git v0.0.0-20190620073851-30a7d0574bbc h1:Dd4aSg+S8E1Kj49kTtHkRZpTeIJiM521i8wCkWMAmOg=
+basic.com/dbapi.git v0.0.0-20190620073851-30a7d0574bbc/go.mod h1:eDXPnxaz6jZPDvBSk7ya7oSASWPCuUEgRTJCjsfKt/Q=
+basic.com/pubsub/protomsg.git v0.0.0-20190621090107-c5cf390e19bb h1:TCo2Oo2ZqHn+/WvfVCyQFSKHHps9gJZkC0D8MtTVcRs=
+basic.com/pubsub/protomsg.git v0.0.0-20190621090107-c5cf390e19bb/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU=
basic.com/valib/deliver.git v0.0.0-20190529080650-3e64847c9bce h1:/D6k+FVN1sMqLz6tMlsIl9bKwE2Mpc5d4QfPh0y4DSQ=
basic.com/valib/deliver.git v0.0.0-20190529080650-3e64847c9bce/go.mod h1:bkYiTUGzckyNOjAgn9rB/DOjFzwoSHJlruuWQ6hu6IY=
+github.com/ajg/form v1.5.1 h1:t9c7v8JUKu/XxOGBU0yjNpaMloxGEJhUkqFRq0ibGeU=
+github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY=
github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
diff --git a/ls b/ls
deleted file mode 100644
index e69de29..0000000
--- a/ls
+++ /dev/null
diff --git a/sdk/sdk.go b/sdk/sdk.go
index bd933e7..7a1c554 100644
--- a/sdk/sdk.go
+++ b/sdk/sdk.go
@@ -4,6 +4,7 @@
"context"
"errors"
"fmt"
+ "os"
// "github.com/long/test/httpclient"
"github.com/long/test/tasktag"
@@ -21,7 +22,7 @@
)
var SocketManage = make(map[string]SocketContext)
-var SdkMap = make(map[string]chan *protomsg.SdkMessage)
+var SdkMap = make(map[string]chan protomsg.SdkMessage)
type SocketContext struct {
Sock deliver.Deliver
@@ -37,14 +38,14 @@
fmt.Println()
}
- SdkMap["es"] = make(chan *protomsg.SdkMessage)
+ SdkMap["es"] = make(chan protomsg.SdkMessage)
fmt.Println("create es channel: ")
go es(SdkMap["es"])
go AutoDelSdk(util.Sdkflag)
}
func CreatesdkTopicandServer(sdkid string) {
- SdkMap[sdkid] = make(chan *protomsg.SdkMessage)
+ SdkMap[sdkid] = make(chan protomsg.SdkMessage)
fmt.Println("create sdk channel: ", sdkid)
url := fmt.Sprintf("ipc:///tmp/%s%s", sdkid, postPull)
@@ -77,7 +78,7 @@
}
//鍗曠嫭澶勭悊 es 涓婚鐨勬儏鍐�
-func es(sdkmsgchan chan *protomsg.SdkMessage) {
+func es(sdkmsgchan chan protomsg.SdkMessage) {
for _ = range sdkmsgchan {
fmt.Println("this data is finish all sdk! ")
}
@@ -110,14 +111,10 @@
//涓婚
//sdk鏁版嵁 鍔犲伐鍣�
-func SdkData(cid string, taskid string, data []byte) *protomsg.SdkMessage {
- var sdkmsg = &protomsg.SdkMessage{}
+func SdkData(cid string,caddr string, taskid string, data []byte) protomsg.SdkMessage {
+ var sdkmsg = protomsg.SdkMessage{}
sdkmsg.Cid = cid
- //if _, ok := tasktag.TaskMapLab[taskid]; !ok {
- // sdkmsg.Tasklab = nil
- // return sdkmsg
- //}
-
+ sdkmsg.Caddr =caddr
if val, ok := tasktag.TaskMapLab.Load(taskid); !ok {
sdkmsg.Tasklab = nil
return sdkmsg
@@ -132,13 +129,13 @@
}
//sdk鏁版嵁鍒嗗彂鍣�
-func SdkSendTopic(sdkmsg *protomsg.SdkMessage) (sdksend string) {
- if int(sdkmsg.Tasklab.Index) < len(sdkmsg.Tasklab.Sdkids) {
- sdksend = sdkmsg.Tasklab.Sdkids[sdkmsg.Tasklab.Index]
+func SdkSendTopic(sdkmsg protomsg.SdkMessage) (sdksend string) {
+ if int(sdkmsg.Tasklab.Index) < len(sdkmsg.Tasklab.Sdkinfos) {
+ sdksend = sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Sdkid
} else {
sdksend = "es"
}
- fmt.Println("鍒嗗彂鐨勪富棰樻槸锛� ", sdksend)
+ fmt.Printf("鍒嗗彂鐨勪富棰樻槸锛�%s 浣嶇疆 %d/%d\n ", sdksend, int(sdkmsg.Tasklab.Index)+1, len(sdkmsg.Tasklab.Sdkinfos))
return
}
@@ -179,7 +176,7 @@
func Recv(socket SocketContext) {
- var repsdkmsg = &protomsg.SdkMessage{}
+ var repsdkmsg = protomsg.SdkMessage{}
for {
select {
case <-socket.Context.Done():
@@ -190,12 +187,14 @@
//fmt.Printf("%s ", err)
continue
} else {
- err = proto.Unmarshal(msg, repsdkmsg)
+ err = proto.Unmarshal(msg, &repsdkmsg)
fmt.Println("receive len: ", len(msg))
if err != nil {
fmt.Println("unmarshal error: ", err)
+ os.Exit(1)
continue
}
+ repsdkmsg.Tasklab.Index++
//璋冪敤璁$畻鍑芥暟锛� 鍒嗗彂缁欎笅涓�涓富棰�
nexttopic := SdkSendTopic(repsdkmsg)
SdkMap[nexttopic] <- repsdkmsg
@@ -204,18 +203,18 @@
}
}
-func Send(sdkid string, socket SocketContext, in chan *protomsg.SdkMessage) {
- var v *protomsg.SdkMessage
- var ok bool
+func Send(sdkid string, socket SocketContext, in chan protomsg.SdkMessage) {
+// var v *protomsg.SdkMessage
+// var ok bool
for {
select {
case <-socket.Context.Done():
fmt.Println("socket is close")
return
- case v, ok = <-in:
+ case v, ok := <-in:
if ok {
- data, err := proto.Marshal(v)
+ data, err :=v.Marshal()
if err != nil {
fmt.Println("proto marshal error ", err)
continue
diff --git a/tasktag/tasktag.go b/tasktag/tasktag.go
index 5f6cb8e..8afacbb 100644
--- a/tasktag/tasktag.go
+++ b/tasktag/tasktag.go
@@ -8,7 +8,6 @@
"github.com/long/test/util"
)
-//var TaskMapLab = make(map[string]*protomsg.TaskLabel)
var TaskMapLab sync.Map
func Init() {
@@ -29,31 +28,28 @@
for _, taskSdk := range util.TaskSdks {
var tl protomsg.TaskLabel
tl.Taskid = taskSdk.Task.Taskid
+ tl.Taskname = taskSdk.Task.Taskname
for _, sdkinfo := range taskSdk.Sdks {
- tl.Sdkids = append(tl.Sdkids, sdkinfo.Id)
+ sdkinfowithtask := new(protomsg.SdkmsgWithTask)
+ sdkinfowithtask.Sdkid = sdkinfo.Id
+ sdkinfowithtask.Sdktype = sdkinfo.SdkType
+ sdkinfowithtask.SdkName = sdkinfo.SdkName
+ sdkinfowithtask.Sdkdata = make([]byte, 1)
+ tl.Sdkinfos = append(tl.Sdkinfos, sdkinfowithtask)
}
tl.Index = int32(0)
tls = append(tls, tl)
}
-
GenTasklab(tls)
-
TaskMapLab.Range(func(k, v interface{}) bool {
fmt.Println(k, v)
return true
})
-
- //for key, value := range TaskMapLab {
- // fmt.Println()
- // fmt.Println(key, value)
- //}
}
// 浠巗qlite 鎺ュ彛鎷垮埌鎵�鏈夌殑浠诲姟, 姣忎竴涓换鍔¢兘鏈夎嚜宸辩殑鍑犱釜绠楁硶
//浠� taskid 浣滀负key, 瀵瑰簲鐨勭畻娉曠粍鍚堜綔涓� value
func GenTasklab(tasklab []protomsg.TaskLabel) {
- // TaskMapLab = nil
- // TaskMapLab = make(map[string]*protomsg.TaskLabel)
TaskMapLab.Range(func(key interface{}, value interface{}) bool {
TaskMapLab.Delete(key)
return true
@@ -62,6 +58,5 @@
for _, value := range tasklab {
pv := value
TaskMapLab.Store(value.Taskid, &pv)
- //TaskMapLab[value.Taskid] = &pv
}
}
diff --git a/test b/test
index c3d91da..451b075 100755
--- a/test
+++ b/test
Binary files differ
diff --git a/util/sqlite.go b/util/sqlite.go
index 35a6a9a..f06b3e3 100644
--- a/util/sqlite.go
+++ b/util/sqlite.go
@@ -24,7 +24,9 @@
var CameraTasks []protomsg.CameraAndTaskInfo
var TaskSdks []protomsg.TaskSdkInfo
+
var Sdklist []string
+var Sdkinfos []protomsg.Sdk
var urlServer = "tcp://192.168.1.11:40007"
var urlPubSub = "tcp://192.168.1.11:50007"
@@ -48,6 +50,7 @@
TaskSdks = taskapi.FindAll()
Sdklist = sdkapi.GetAllSdkIds()
+ Sdkinfos = sdkapi.FindAll("")
initchan <- true
}
@@ -76,6 +79,7 @@
case protomsg.TableChanged_T_Sdk:
fmt.Println("update sdk")
Sdklist = sdkapi.GetAllSdkIds()
+ Sdkinfos = sdkapi.FindAll("")
Sdkflag <- true
fmt.Println("update finished!")
@@ -93,7 +97,6 @@
peers, _ := clientOne.Peers()
for x := range peers {
- fmt.Println("client: ", x)
Getdata(x)
}
}
--
Gitblit v1.8.0