From 5fdc5059555e7a4cf533b7bfdb79025f23fa2b6a Mon Sep 17 00:00:00 2001
From: 554325746@qq.com <554325746@qq.com>
Date: 星期三, 26 六月 2019 14:53:46 +0800
Subject: [PATCH] task with zhangmeng ok release 2

---
 /dev/null          |    0 
 go.sum             |    8 ++
 test               |    0 
 tasktag/tasktag.go |   19 ++----
 go.mod             |    5 +
 sdk/sdk.go         |   43 +++++++-------
 util/sqlite.go     |    5 +
 camera/camera.go   |   42 +++++++------
 8 files changed, 63 insertions(+), 59 deletions(-)

diff --git a/camera/camera.go b/camera/camera.go
index 6432434..b24b337 100644
--- a/camera/camera.go
+++ b/camera/camera.go
@@ -3,30 +3,28 @@
 import (
 	"errors"
 
-	"basic.com/dbapi.git"
 	"basic.com/valib/deliver.git"
 
 	"github.com/long/test/sdk"
 	"github.com/long/test/util"
+    "basic.com/pubsub/protomsg.git"
 
 	"context"
 	"fmt"
 	"sync"
-	//"time"
+	// "time"
 )
 
 //var SocketManage = make(map[string]SocketContext)
 var SocketManage sync.Map
 
-var Initchannel = make(chan string)
+var Initchannel = make(chan protomsg.Camera )
 
 type SocketContext struct {
 	Sock    deliver.Deliver
 	Context context.Context
 	Cancel  context.CancelFunc
 }
-
-var camval dbapi.CameraApi
 
 func Init() {
 
@@ -40,12 +38,14 @@
 	go AutoDelCamera(util.Cameraflag)
 
 	for _, cam := range util.CameraIds {
-		Initchannel <- cam.Id
+		Initchannel <- cam
 	}
 }
 
-func CreateCamera(camera chan string) {
-	for camid := range camera {
+func CreateCamera(camera chan protomsg.Camera) {
+	for cam := range camera {
+        camid := cam.Id
+        caddr := cam.Addr
 		if _, ok := SocketManage.Load(camid); !ok {
 			url := fmt.Sprintf("ipc:///tmp/%s.ipc", camid)
 
@@ -55,9 +55,9 @@
 				continue
 			}
 
-			go func(cid string, sock SocketContext) {
-				Recv(cid, sock)
-			}(id, socketlisten)
+			go func(cid string, cameraaddr string,  sock SocketContext) {
+				Recv(cid, cameraaddr, sock)
+			}(id, caddr, socketlisten)
 		}
 	}
 }
@@ -87,7 +87,11 @@
 
 		for key, op := range cameraChanDel {
 			if op == "add" {
-				Initchannel <- key
+                for _, value := range util.CameraIds {
+                    if key == value.Id{ 
+        				Initchannel <- value
+                    }
+                }
 			} else {
 				if sock, ok := SocketManage.Load(key); ok {
 					if socket, sok := sock.(SocketContext); sok {
@@ -118,7 +122,7 @@
 	return cameraid, socket, nil
 }
 
-func Recv(cameraid string, socket SocketContext) {
+func Recv(cameraid string, caddr string , socket SocketContext) {
 	var msg []byte
 	var err error
 	for {
@@ -134,16 +138,16 @@
 				fmt.Println()
 				fmt.Println("============== one msg input ==========")
 				for _, taskid := range GetAlltask(cameraid) {
-					//time.Sleep(5 * time.Second)
+					// time.Sleep(5 * time.Second)
 					fmt.Println("cameraid: ", cameraid, " taskid: ", taskid)
-					Taskdolist(cameraid, taskid, msg)
+					 Taskdolist(cameraid, caddr, taskid, msg)
 				}
 			}
 		}
 	}
 }
 
-//   鏍规嵁cid 鑾峰彇 鎵�鏈夌殑浠诲姟
+//   鎹甤id 鑾峰彇 鎵�鏈夌殑浠诲姟
 func GetAlltask(cid string) (tasks []string) {
 	for _, camsingle := range util.CameraTasks {
 		if cid == camsingle.Camera.Id {
@@ -156,10 +160,10 @@
 	return
 }
 
-func Taskdolist(cid string, taskid string, data []byte) {
+func Taskdolist(cid string, caddr string,  taskid string, data []byte) {
 
 	//  鏁版嵁鍔犲伐(鎵撴爣绛�)
-	sdkmsg := sdk.SdkData(cid, taskid, data)
+	sdkmsg := sdk.SdkData(cid, caddr, taskid, data)
 	if sdkmsg.Tasklab == nil {
 		fmt.Printf("cid:%s 娌℃湁浠诲姟%s\n", cid, taskid)
 		return
@@ -168,9 +172,7 @@
 	//  璁$畻鍒嗗彂鐨勪富棰�
 	SendTopic := sdk.SdkSendTopic(sdkmsg)
 	if _, ok := sdk.SdkMap[SendTopic]; ok {
-		fmt.Println("鍒嗗彂鐨勪富棰樺瓨鍦�")
 		sdk.SdkMap[SendTopic] <- sdkmsg
-		//fmt.Println("閲嶆柊寮�濮嬪惊鐜�: ", sdk.SdkMap)
 	} else {
 		fmt.Println("鍒嗗彂鐨勪富棰樹笉瀛樺湪")
 	}
diff --git a/go.mod b/go.mod
index 9051b1b..42e398d 100644
--- a/go.mod
+++ b/go.mod
@@ -3,9 +3,10 @@
 go 1.12
 
 require (
-	basic.com/dbapi.git v0.0.0-20190531051326-e49473afa5e9
-	basic.com/pubsub/protomsg.git v0.0.0-20190530084829-0ea842491a96
+	basic.com/dbapi.git v0.0.0-20190620073851-30a7d0574bbc
+	basic.com/pubsub/protomsg.git v0.0.0-20190621090107-c5cf390e19bb
 	basic.com/valib/deliver.git v0.0.0-20190529080650-3e64847c9bce
+	github.com/ajg/form v1.5.1 // indirect
 	github.com/gogo/protobuf v1.2.1
 	github.com/golang/protobuf v1.3.1
 	github.com/gorilla/websocket v1.4.0 // indirect
diff --git a/go.sum b/go.sum
index aa563a6..4a4fa0d 100644
--- a/go.sum
+++ b/go.sum
@@ -1,9 +1,13 @@
 basic.com/dbapi.git v0.0.0-20190531051326-e49473afa5e9 h1:1HIG2sEEYVUKL7nyJvURj/p24JpDwFwKE/XtatPsXVQ=
 basic.com/dbapi.git v0.0.0-20190531051326-e49473afa5e9/go.mod h1:eDXPnxaz6jZPDvBSk7ya7oSASWPCuUEgRTJCjsfKt/Q=
-basic.com/pubsub/protomsg.git v0.0.0-20190530084829-0ea842491a96 h1:7nkipxWDbIK4wRbLCZeUoGxzdEIxZFumSTM6xhfWiWM=
-basic.com/pubsub/protomsg.git v0.0.0-20190530084829-0ea842491a96/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU=
+basic.com/dbapi.git v0.0.0-20190620073851-30a7d0574bbc h1:Dd4aSg+S8E1Kj49kTtHkRZpTeIJiM521i8wCkWMAmOg=
+basic.com/dbapi.git v0.0.0-20190620073851-30a7d0574bbc/go.mod h1:eDXPnxaz6jZPDvBSk7ya7oSASWPCuUEgRTJCjsfKt/Q=
+basic.com/pubsub/protomsg.git v0.0.0-20190621090107-c5cf390e19bb h1:TCo2Oo2ZqHn+/WvfVCyQFSKHHps9gJZkC0D8MtTVcRs=
+basic.com/pubsub/protomsg.git v0.0.0-20190621090107-c5cf390e19bb/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU=
 basic.com/valib/deliver.git v0.0.0-20190529080650-3e64847c9bce h1:/D6k+FVN1sMqLz6tMlsIl9bKwE2Mpc5d4QfPh0y4DSQ=
 basic.com/valib/deliver.git v0.0.0-20190529080650-3e64847c9bce/go.mod h1:bkYiTUGzckyNOjAgn9rB/DOjFzwoSHJlruuWQ6hu6IY=
+github.com/ajg/form v1.5.1 h1:t9c7v8JUKu/XxOGBU0yjNpaMloxGEJhUkqFRq0ibGeU=
+github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY=
 github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE=
 github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
 github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
diff --git a/ls b/ls
deleted file mode 100644
index e69de29..0000000
--- a/ls
+++ /dev/null
diff --git a/sdk/sdk.go b/sdk/sdk.go
index bd933e7..7a1c554 100644
--- a/sdk/sdk.go
+++ b/sdk/sdk.go
@@ -4,6 +4,7 @@
 	"context"
 	"errors"
 	"fmt"
+    "os"
 
 	//	"github.com/long/test/httpclient"
 	"github.com/long/test/tasktag"
@@ -21,7 +22,7 @@
 )
 
 var SocketManage = make(map[string]SocketContext)
-var SdkMap = make(map[string]chan *protomsg.SdkMessage)
+var SdkMap = make(map[string]chan protomsg.SdkMessage)
 
 type SocketContext struct {
 	Sock    deliver.Deliver
@@ -37,14 +38,14 @@
 		fmt.Println()
 	}
 
-	SdkMap["es"] = make(chan *protomsg.SdkMessage)
+	SdkMap["es"] = make(chan protomsg.SdkMessage)
 	fmt.Println("create es channel:  ")
 	go es(SdkMap["es"])
 	go AutoDelSdk(util.Sdkflag)
 }
 
 func CreatesdkTopicandServer(sdkid string) {
-	SdkMap[sdkid] = make(chan *protomsg.SdkMessage)
+	SdkMap[sdkid] = make(chan protomsg.SdkMessage)
 	fmt.Println("create sdk channel:  ", sdkid)
 
 	url := fmt.Sprintf("ipc:///tmp/%s%s", sdkid, postPull)
@@ -77,7 +78,7 @@
 }
 
 //鍗曠嫭澶勭悊   es 涓婚鐨勬儏鍐�
-func es(sdkmsgchan chan *protomsg.SdkMessage) {
+func es(sdkmsgchan chan protomsg.SdkMessage) {
 	for _ = range sdkmsgchan {
 		fmt.Println("this data is finish all sdk! ")
 	}
@@ -110,14 +111,10 @@
 //涓婚
 
 //sdk鏁版嵁 鍔犲伐鍣�
-func SdkData(cid string, taskid string, data []byte) *protomsg.SdkMessage {
-	var sdkmsg = &protomsg.SdkMessage{}
+func SdkData(cid string,caddr string, taskid string, data []byte) protomsg.SdkMessage {
+	var sdkmsg = protomsg.SdkMessage{}
 	sdkmsg.Cid = cid
-	//if _, ok := tasktag.TaskMapLab[taskid]; !ok {
-	//	sdkmsg.Tasklab = nil
-	//	return sdkmsg
-	//}
-
+    sdkmsg.Caddr =caddr 
 	if val, ok := tasktag.TaskMapLab.Load(taskid); !ok {
 		sdkmsg.Tasklab = nil
 		return sdkmsg
@@ -132,13 +129,13 @@
 }
 
 //sdk鏁版嵁鍒嗗彂鍣�
-func SdkSendTopic(sdkmsg *protomsg.SdkMessage) (sdksend string) {
-	if int(sdkmsg.Tasklab.Index) < len(sdkmsg.Tasklab.Sdkids) {
-		sdksend = sdkmsg.Tasklab.Sdkids[sdkmsg.Tasklab.Index]
+func SdkSendTopic(sdkmsg protomsg.SdkMessage) (sdksend string) {
+	if int(sdkmsg.Tasklab.Index) < len(sdkmsg.Tasklab.Sdkinfos) {
+		sdksend = sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Sdkid
 	} else {
 		sdksend = "es"
 	}
-	fmt.Println("鍒嗗彂鐨勪富棰樻槸锛� ", sdksend)
+	fmt.Printf("鍒嗗彂鐨勪富棰樻槸锛�%s 浣嶇疆 %d/%d\n ", sdksend, int(sdkmsg.Tasklab.Index)+1, len(sdkmsg.Tasklab.Sdkinfos))
 	return
 }
 
@@ -179,7 +176,7 @@
 
 func Recv(socket SocketContext) {
 
-	var repsdkmsg = &protomsg.SdkMessage{}
+	var repsdkmsg = protomsg.SdkMessage{}
 	for {
 		select {
 		case <-socket.Context.Done():
@@ -190,12 +187,14 @@
 				//fmt.Printf("%s ", err)
 				continue
 			} else {
-				err = proto.Unmarshal(msg, repsdkmsg)
+				err = proto.Unmarshal(msg, &repsdkmsg)
 				fmt.Println("receive len: ", len(msg))
 				if err != nil {
 					fmt.Println("unmarshal error: ", err)
+                    os.Exit(1)
 					continue
 				}
+                repsdkmsg.Tasklab.Index++
 				//璋冪敤璁$畻鍑芥暟锛� 鍒嗗彂缁欎笅涓�涓富棰�
 				nexttopic := SdkSendTopic(repsdkmsg)
 				SdkMap[nexttopic] <- repsdkmsg
@@ -204,18 +203,18 @@
 	}
 }
 
-func Send(sdkid string, socket SocketContext, in chan *protomsg.SdkMessage) {
-	var v *protomsg.SdkMessage
-	var ok bool
+func Send(sdkid string, socket SocketContext, in chan protomsg.SdkMessage) {
+//	var v *protomsg.SdkMessage
+//	var ok bool
 
 	for {
 		select {
 		case <-socket.Context.Done():
 			fmt.Println("socket is close")
 			return
-		case v, ok = <-in:
+        case v, ok := <-in:
 			if ok {
-				data, err := proto.Marshal(v)
+				data, err :=v.Marshal()
 				if err != nil {
 					fmt.Println("proto marshal error ", err)
 					continue
diff --git a/tasktag/tasktag.go b/tasktag/tasktag.go
index 5f6cb8e..8afacbb 100644
--- a/tasktag/tasktag.go
+++ b/tasktag/tasktag.go
@@ -8,7 +8,6 @@
 	"github.com/long/test/util"
 )
 
-//var TaskMapLab = make(map[string]*protomsg.TaskLabel)
 var TaskMapLab sync.Map
 
 func Init() {
@@ -29,31 +28,28 @@
 	for _, taskSdk := range util.TaskSdks {
 		var tl protomsg.TaskLabel
 		tl.Taskid = taskSdk.Task.Taskid
+        tl.Taskname = taskSdk.Task.Taskname
 		for _, sdkinfo := range taskSdk.Sdks {
-			tl.Sdkids = append(tl.Sdkids, sdkinfo.Id)
+            sdkinfowithtask := new(protomsg.SdkmsgWithTask) 
+            sdkinfowithtask.Sdkid = sdkinfo.Id
+            sdkinfowithtask.Sdktype = sdkinfo.SdkType
+            sdkinfowithtask.SdkName = sdkinfo.SdkName
+            sdkinfowithtask.Sdkdata = make([]byte, 1)
+            tl.Sdkinfos = append(tl.Sdkinfos, sdkinfowithtask)
 		}
 		tl.Index = int32(0)
 		tls = append(tls, tl)
 	}
-
 	GenTasklab(tls)
-
 	TaskMapLab.Range(func(k, v interface{}) bool {
 		fmt.Println(k, v)
 		return true
 	})
-
-	//for key, value := range TaskMapLab {
-	//	fmt.Println()
-	//	fmt.Println(key, value)
-	//}
 }
 
 // 浠巗qlite 鎺ュ彛鎷垮埌鎵�鏈夌殑浠诲姟, 姣忎竴涓换鍔¢兘鏈夎嚜宸辩殑鍑犱釜绠楁硶
 //浠� taskid 浣滀负key, 瀵瑰簲鐨勭畻娉曠粍鍚堜綔涓� value
 func GenTasklab(tasklab []protomsg.TaskLabel) {
-	// TaskMapLab = nil
-	// TaskMapLab = make(map[string]*protomsg.TaskLabel)
 	TaskMapLab.Range(func(key interface{}, value interface{}) bool {
 		TaskMapLab.Delete(key)
 		return true
@@ -62,6 +58,5 @@
 	for _, value := range tasklab {
 		pv := value
 		TaskMapLab.Store(value.Taskid, &pv)
-		//TaskMapLab[value.Taskid] = &pv
 	}
 }
diff --git a/test b/test
index c3d91da..451b075 100755
--- a/test
+++ b/test
Binary files differ
diff --git a/util/sqlite.go b/util/sqlite.go
index 35a6a9a..f06b3e3 100644
--- a/util/sqlite.go
+++ b/util/sqlite.go
@@ -24,7 +24,9 @@
 var CameraTasks []protomsg.CameraAndTaskInfo
 
 var TaskSdks []protomsg.TaskSdkInfo
+
 var Sdklist []string
+var Sdkinfos []protomsg.Sdk
 
 var urlServer = "tcp://192.168.1.11:40007"
 var urlPubSub = "tcp://192.168.1.11:50007"
@@ -48,6 +50,7 @@
 	TaskSdks = taskapi.FindAll()
 
 	Sdklist = sdkapi.GetAllSdkIds()
+    Sdkinfos = sdkapi.FindAll("")
 	initchan <- true
 }
 
@@ -76,6 +79,7 @@
 	case protomsg.TableChanged_T_Sdk:
 		fmt.Println("update sdk")
 		Sdklist = sdkapi.GetAllSdkIds()
+        Sdkinfos = sdkapi.FindAll("")
 		Sdkflag <- true
 		fmt.Println("update finished!")
 
@@ -93,7 +97,6 @@
 
 	peers, _ := clientOne.Peers()
 	for x := range peers {
-		fmt.Println("client: ", x)
 		Getdata(x)
 	}
 }

--
Gitblit v1.8.0